insights: More robust find-and-replace for series_ids in oob settings migration (sourcegraph#28877)
CristinaBirkel committed Dec 13, 2021
1 parent d02f28e commit 99bd0e3
Showing 2 changed files with 42 additions and 12 deletions.
27 changes: 19 additions & 8 deletions enterprise/internal/insights/migration/discovery.go
@@ -18,6 +18,7 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/store"
"github.com/sourcegraph/sourcegraph/enterprise/internal/insights/types"
"github.com/sourcegraph/sourcegraph/internal/api"
"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/insights"
)

@@ -215,7 +216,7 @@ func (m *migrator) migrateInsights(ctx context.Context, toMigrate []insights.Sea
count++
continue
}
err = migrateSeries(ctx, m.insightStore, d, batch)
err = migrateSeries(ctx, m.insightStore, m.workerBaseStore, d, batch)
if err != nil {
errs = multierror.Append(errs, err)
continue
@@ -302,7 +303,7 @@ func migrateLangStatSeries(ctx context.Context, insightStore *store.InsightStore
return nil
}

func migrateSeries(ctx context.Context, insightStore *store.InsightStore, from insights.SearchInsight, batch migrationBatch) (err error) {
func migrateSeries(ctx context.Context, insightStore *store.InsightStore, workerStore *basestore.Store, from insights.SearchInsight, batch migrationBatch) (err error) {
tx, err := insightStore.Transact(ctx)
if err != nil {
return err
@@ -364,15 +365,25 @@ func migrateSeries(ctx context.Context, insightStore *store.InsightStore, from i

// Also match/replace old series_points ids with the new series id
oldId := discovery.Encode(timeSeries)
silentErr := updateTimeSeriesReferences(tx.Handle().DB(), ctx, oldId, temp.SeriesID)
countUpdated, silentErr := updateTimeSeriesReferences(tx.Handle().DB(), ctx, oldId, temp.SeriesID)
if silentErr != nil {
// If the find-replace fails, it's not a big deal. It will just need to be calculated again.
log15.Error("error updating series_id", "series_id", temp.SeriesID, "err", silentErr)
log15.Error("error updating series_id for series_points", "series_id", temp.SeriesID, "err", silentErr)
} else if countUpdated == 0 {
// If find-replace doesn't match any records, we still need to backfill, so just continue
} else {
// If the find-replace succeeded, we can stamp the backfill_queued_at on the new series.
series, err = tx.StampBackfill(ctx, series)
if err != nil {
return errors.Wrapf(err, "unable to migrate insight unique_id: %s series_id: %s", from.ID, temp.SeriesID)
// If the find-replace succeeded, we can do a similar find-replace on the jobs in the queue,
// and then stamp the backfill_queued_at on the new series.
silentErr = updateTimeSeriesJobReferences(workerStore, ctx, oldId, temp.SeriesID)
if silentErr != nil {
// If the find-replace fails, it's not a big deal. It will just need to be calculated again.
log15.Error("error updating series_id for jobs", "series_id", temp.SeriesID, "err", silentErr)
} else {
series, silentErr = tx.StampBackfill(ctx, series)
if silentErr != nil {
// If the stamp fails, skip it. It will just need to be calculated again.
log15.Error("error updating backfill_queued_at", "series_id", temp.SeriesID, "err", silentErr)
}
}
}
}
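The hunk above reworks how the migration reacts to the series_id find-and-replace. Below is a minimal, self-contained sketch of that branching, assuming stand-in types and stubbed store calls (the real store.InsightStore, basestore.Store, and log15 APIs are not used here); only the control flow mirrors the diff.

package main

import (
    "context"
    "fmt"
    "log"
)

// series stands in for the real types.InsightSeries; only the ID matters here.
type series struct{ SeriesID string }

// The helpers below are hypothetical stubs for the real store calls.
func updateTimeSeriesReferences(ctx context.Context, oldID, newID string) (int, error) { return 1, nil }
func updateTimeSeriesJobReferences(ctx context.Context, oldID, newID string) error     { return nil }
func stampBackfill(ctx context.Context, s series) (series, error)                      { return s, nil }

// relinkSeries mirrors the branching introduced above: errors from either
// find-replace are logged and tolerated (the series data will just be
// recalculated), a zero-row update falls through to a normal backfill, and
// backfill_queued_at is only stamped once both updates have succeeded.
func relinkSeries(ctx context.Context, oldID string, s series) series {
    countUpdated, err := updateTimeSeriesReferences(ctx, oldID, s.SeriesID)
    switch {
    case err != nil:
        log.Printf("error updating series_id for series_points: series_id=%s err=%v", s.SeriesID, err)
    case countUpdated == 0:
        // No existing points matched; the series still needs a backfill, so nothing to stamp.
    default:
        if jobErr := updateTimeSeriesJobReferences(ctx, oldID, s.SeriesID); jobErr != nil {
            log.Printf("error updating series_id for jobs: series_id=%s err=%v", s.SeriesID, jobErr)
        } else if stamped, stampErr := stampBackfill(ctx, s); stampErr != nil {
            log.Printf("error updating backfill_queued_at: series_id=%s err=%v", s.SeriesID, stampErr)
        } else {
            s = stamped
        }
    }
    return s
}

func main() {
    fmt.Println(relinkSeries(context.Background(), "old-id", series{SeriesID: "new-id"}))
}

The practical effect is that a failed or empty find-replace never aborts the migration; the series simply goes through the normal backfill path instead.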
27 changes: 23 additions & 4 deletions enterprise/internal/insights/migration/migration.go
@@ -40,6 +40,7 @@ type migrator struct {
insightStore *store.InsightStore
dashboardStore *store.DBDashboardStore
orgStore database.OrgStore
workerBaseStore *basestore.Store
}

func NewMigrator(insightsDB dbutil.DB, postgresDB dbutil.DB) oobmigration.Migrator {
@@ -51,6 +52,7 @@ func NewMigrator(insightsDB dbutil.DB, postgresDB dbutil.DB) oobmigration.Migrat
insightStore: store.NewInsightStore(insightsDB),
dashboardStore: store.NewDashboardStore(insightsDB),
orgStore: database.Orgs(postgresDB),
workerBaseStore: basestore.NewWithDB(postgresDB, sql.TxOptions{}),
}
}

@@ -404,12 +406,29 @@ func (m *migrator) migrateDashboard(ctx context.Context, from insights.SettingDa
return nil
}

func updateTimeSeriesReferences(handle dbutil.DB, ctx context.Context, oldId, newId string) error {
q := sqlf.Sprintf("update series_points set series_id = %s where series_id = %s", newId, oldId)
func updateTimeSeriesReferences(handle dbutil.DB, ctx context.Context, oldId, newId string) (int, error) {
q := sqlf.Sprintf(`
WITH updated AS (
UPDATE series_points sp
SET series_id = %s
WHERE series_id = %s
RETURNING sp.series_id
)
SELECT count(*) FROM updated;
`, newId, oldId)
tempStore := basestore.NewWithDB(handle, sql.TxOptions{})
err := tempStore.Exec(ctx, q)
count, _, err := basestore.ScanFirstInt(tempStore.Query(ctx, q))
if err != nil {
return errors.Wrap(err, "updateTimeSeriesReferences")
return 0, errors.Wrap(err, "updateTimeSeriesReferences")
}
return count, nil
}

func updateTimeSeriesJobReferences(workerStore *basestore.Store, ctx context.Context, oldId, newId string) error {
q := sqlf.Sprintf("update insights_query_runner_jobs set series_id = %s where series_id = %s", newId, oldId)
err := workerStore.Exec(ctx, q)
if err != nil {
return errors.Wrap(err, "updateTimeSeriesJobReferences")
}
return nil
}
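The counting query in updateTimeSeriesReferences wraps the UPDATE in a data-modifying CTE and selects count(*) from it, so a single round trip both rewrites the rows and reports how many matched. As a rough standalone illustration of the same pattern using only database/sql (the driver import, connection string, and helper name are placeholders, not part of the commit):

package main

import (
    "context"
    "database/sql"
    "fmt"

    _ "github.com/lib/pq" // any Postgres driver works; lib/pq is just an example
)

// renameSeriesID applies the same pattern as updateTimeSeriesReferences: the
// UPDATE runs inside a CTE so the outer SELECT can report the number of rows
// rewritten, letting the caller distinguish "nothing matched" from "updated".
func renameSeriesID(ctx context.Context, db *sql.DB, oldID, newID string) (int, error) {
    const q = `
WITH updated AS (
    UPDATE series_points sp
    SET series_id = $1
    WHERE series_id = $2
    RETURNING sp.series_id
)
SELECT count(*) FROM updated;`

    var count int
    if err := db.QueryRowContext(ctx, q, newID, oldID).Scan(&count); err != nil {
        return 0, err
    }
    return count, nil
}

func main() {
    // Placeholder DSN; point this at a real database to try it.
    db, err := sql.Open("postgres", "postgres://localhost/sourcegraph?sslmode=disable")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    count, err := renameSeriesID(context.Background(), db, "old-series-id", "new-series-id")
    fmt.Println(count, err)
}

A zero count tells the caller that no existing points referenced the old ID, which is exactly the case the new else-if branch in migrateSeries handles by falling back to a normal backfill.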
