From fb6338560ed2f70a1897b6093586235f863d78d9 Mon Sep 17 00:00:00 2001 From: ti-srebot <66930949+ti-srebot@users.noreply.github.com> Date: Thu, 28 Jul 2022 12:45:10 +0800 Subject: [PATCH] br: use one shot session to close domain ASAP (#36558) (#36594) ref pingcap/tidb#36546 --- br/pkg/backup/client.go | 16 ++- br/pkg/backup/client_test.go | 6 +- br/pkg/glue/glue.go | 6 ++ br/pkg/gluetidb/glue.go | 159 +++++++++++++++++++++++++++++ br/pkg/gluetikv/glue.go | 5 + br/pkg/restore/db_test.go | 12 ++- br/pkg/task/backup.go | 19 ++-- br/tests/br_full_ddl/run.sh | 10 ++ br/tests/br_incremental_ddl/run.sh | 36 ++++++- executor/brie.go | 6 ++ 10 files changed, 255 insertions(+), 20 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 598f9d50c3006..17058f5a03722 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/redact" @@ -36,7 +37,6 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/ranger" @@ -473,7 +473,7 @@ func skipUnsupportedDDLJob(job *model.Job) bool { } // WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter. 
-func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, store kv.Storage, lastBackupTS, backupTS uint64) error { +func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64, needDomain bool) error { snapshot := store.GetSnapshot(kv.NewVersion(backupTS)) snapMeta := meta.NewSnapshotMeta(snapshot) lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS)) @@ -492,11 +492,19 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, return errors.Trace(err) } newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver))) - allJobs, err := ddl.GetAllDDLJobs(se, newestMeta) + allJobs := make([]*model.Job, 0) + err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error { + allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx(), newestMeta) + if err != nil { + return errors.Trace(err) + } + log.Debug("get all jobs", zap.Int("jobs", len(allJobs))) + return nil + }) if err != nil { return errors.Trace(err) } - log.Debug("get all jobs", zap.Int("jobs", len(allJobs))) + historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta) if err != nil { return errors.Trace(err) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 60cf42f94998a..ae5e24ac3002b 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -15,6 +15,7 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/tidb/br/pkg/backup" "github.com/pingcap/tidb/br/pkg/conn" + "github.com/pingcap/tidb/br/pkg/gluetidb" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" @@ -38,6 +39,7 @@ type testBackup struct { cancel context.CancelFunc mockPDClient pd.Client + mockGlue *gluetidb.MockGlue backupClient *backup.Client cluster *mock.Cluster @@ -48,6 +50,7 @@ func createBackupSuite(t *testing.T) (s *testBackup, clean func()) { tikvClient, _, pdClient, err 
:= testutils.NewMockTiKV("", nil) require.NoError(t, err) s = new(testBackup) + s.mockGlue = &gluetidb.MockGlue{} s.mockPDClient = pdClient s.ctx, s.cancel = context.WithCancel(context.Background()) mockMgr := &conn.Mgr{PdController: &pdutil.PdController{}} @@ -280,7 +283,8 @@ func TestSkipUnsupportedDDLJob(t *testing.T) { metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher) ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.cluster.Storage, lastTS, ts) + s.mockGlue.SetSession(tk.Session()) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoError(t, err, "Flush failed", err) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 4c3f18714f9a3..09dfce094af07 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -29,6 +29,12 @@ type Glue interface { // GetVersion gets BR package version to run backup/restore job GetVersion() string + + // UseOneShotSession temporarily creates a session from store when running a backup job. + // Because we don't have to own the domain/session during the whole backup, + // we can close the domain as soon as possible. + // In a SQL backup job we must reuse the existing session and must not close it. + UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error } // Session is an abstraction of the session.Session interface. diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index d07594f1f842d..dfe5ae62639bd 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -114,6 +114,40 @@ func (g Glue) GetVersion() string { return g.tikvGlue.GetVersion() } +// UseOneShotSession implements glue.Glue. 
+func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { + se, err := session.CreateSession(store) + if err != nil { + return errors.Trace(err) + } + glueSession := &tidbSession{ + se: se, + } + defer func() { + se.Close() + log.Info("one shot session closed") + }() + // dom will be created during session.CreateSession. + dom, err := session.GetDomain(store) + if err != nil { + return errors.Trace(err) + } + // because the domain would otherwise stay open for as long as the program exists, + // and it will register br info to the info syncer, + // we'd better close it as soon as possible. + if closeDomain { + defer func() { + dom.Close() + log.Info("one shot domain closed") + }() + } + err = fn(glueSession) + if err != nil { + return errors.Trace(err) + } + return nil +} + // GetSessionCtx implements glue.Glue func (gs *tidbSession) GetSessionCtx() sessionctx.Context { return gs.se @@ -266,3 +300,128 @@ func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) { func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string { return executor.ConstructResultOfShowCreatePlacementPolicy(policy) } + +// mockSession is used for test. +type mockSession struct { + se session.Session +} + +// GetSessionCtx implements glue.Session. +func (s *mockSession) GetSessionCtx() sessionctx.Context { + return s.se +} + +// Execute implements glue.Session. +func (s *mockSession) Execute(ctx context.Context, sql string) error { + return s.ExecuteInternal(ctx, sql) +} + +func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error { + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR) + rs, err := s.se.ExecuteInternal(ctx, sql, args...) + if err != nil { + return err + } + // Some SQLs (like ADMIN RECOVER INDEX) may lazily take effect + // when we poll the result set. + // At least call `next` once to trigger their side effects. + // (Maybe we'd better drain all returned rows?) 
+ if rs != nil { + //nolint: errcheck + defer rs.Close() + c := rs.NewChunk(nil) + if err := rs.Next(ctx, c); err != nil { + return err + } + } + return nil +} + +// CreateDatabase implements glue.Session. +func (s *mockSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil + +} + +// CreatePlacementPolicy implements glue.Session. +func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error { + log.Fatal("unimplemented CreatePlacementPolicy for mock session") + return nil +} + +// CreateTables implements glue.BatchCreateTableSession. +func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { + log.Fatal("unimplemented CreateTables for mock session") + return nil +} + +// CreateTable implements glue.Session. +func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { + log.Fatal("unimplemented CreateTable for mock session") + return nil +} + +// Close implements glue.Session. +func (s *mockSession) Close() { + s.se.Close() +} + +// GetGlobalVariable implements glue.Session; the mock always returns "true". +func (s *mockSession) GetGlobalVariable(name string) (string, error) { + return "true", nil +} + +// MockGlue is a glue.Glue implementation that is only used for tests. +type MockGlue struct { + se session.Session +} + +func (m *MockGlue) SetSession(se session.Session) { + m.se = se +} + +// GetDomain implements glue.Glue. +func (*MockGlue) GetDomain(store kv.Storage) (*domain.Domain, error) { + return nil, nil +} + +// CreateSession implements glue.Glue. +func (m *MockGlue) CreateSession(store kv.Storage) (glue.Session, error) { + glueSession := &mockSession{ + se: m.se, + } + return glueSession, nil +} + +// Open implements glue.Glue. +func (*MockGlue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { + return nil, nil +} + +// OwnsStorage implements glue.Glue. 
+func (*MockGlue) OwnsStorage() bool { + return true +} + +// StartProgress implements glue.Glue. +func (*MockGlue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { + return nil +} + +// Record implements glue.Glue. +func (*MockGlue) Record(name string, value uint64) { +} + +// GetVersion implements glue.Glue. +func (*MockGlue) GetVersion() string { + return "mock glue" +} + +// UseOneShotSession implements glue.Glue. It runs fn on the session given to SetSession; store and closeDomain are ignored. +func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { + glueSession := &mockSession{ + se: m.se, + } + return fn(glueSession) +} diff --git a/br/pkg/gluetikv/glue.go b/br/pkg/gluetikv/glue.go index 69c18c3c50277..a8c020528c771 100644 --- a/br/pkg/gluetikv/glue.go +++ b/br/pkg/gluetikv/glue.go @@ -68,3 +68,8 @@ func (Glue) Record(name string, val uint64) { func (Glue) GetVersion() string { return "BR\n" + build.Info() } + +// UseOneShotSession implements glue.Glue. It is a no-op here: fn is never invoked and nil is always returned. +func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { + return nil +} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 89ff7a9ab62a4..b5c52895c0ac1 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -30,13 +30,15 @@ import ( ) type testRestoreSchemaSuite struct { - mock *mock.Cluster - storage storage.ExternalStorage + mock *mock.Cluster + mockGlue *gluetidb.MockGlue + storage storage.ExternalStorage } func createRestoreSchemaSuite(t *testing.T) (s *testRestoreSchemaSuite, clean func()) { var err error s = new(testRestoreSchemaSuite) + s.mockGlue = &gluetidb.MockGlue{} s.mock, err = mock.NewCluster() require.NoError(t, err) base := t.TempDir() @@ -194,7 +196,8 @@ func TestFilterDDLJobs(t *testing.T) { metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher) ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = 
backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts) + s.mockGlue.SetSession(tk.Session()) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts, false) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoErrorf(t, err, "Flush failed", err) @@ -258,7 +261,8 @@ func TestFilterDDLJobsV2(t *testing.T) { metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, true, "", &cipher) ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts) + s.mockGlue.SetSession(tk.Session()) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts, false) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoErrorf(t, err, "Flush failed", err) diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index a80806b09e236..c7a70877f3254 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -263,16 +263,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig statsHandle = mgr.GetDomain().StatsHandle() } - se, err := g.CreateSession(mgr.GetStorage()) - if err != nil { - return errors.Trace(err) - } - newCollationEnable, err := se.GetGlobalVariable(tidbNewCollationEnabled) + var newCollationEnable string + err = g.UseOneShotSession(mgr.GetStorage(), !needDomain, func(se glue.Session) error { + newCollationEnable, err = se.GetGlobalVariable(tidbNewCollationEnabled) + if err != nil { + return errors.Trace(err) + } + log.Info("get new_collations_enabled_on_first_bootstrap config from system table", + zap.String(tidbNewCollationEnabled, newCollationEnable)) + return nil + }) if err != nil { return errors.Trace(err) } - log.Info("get new_collations_enabled_on_first_bootstrap config from system table", 
- zap.String(tidbNewCollationEnabled, newCollationEnable)) client, err := backup.NewBackupClient(ctx, mgr) if err != nil { @@ -399,7 +402,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metawriter, se.GetSessionCtx(), mgr.GetStorage(), cfg.LastBackupTS, backupTS) + err = backup.WriteBackupDDLJobs(metawriter, g, mgr.GetStorage(), cfg.LastBackupTS, backupTS, needDomain) if err != nil { return errors.Trace(err) } diff --git a/br/tests/br_full_ddl/run.sh b/br/tests/br_full_ddl/run.sh index 1433c1f71e9a6..ae056ad5206e5 100755 --- a/br/tests/br_full_ddl/run.sh +++ b/br/tests/br_full_ddl/run.sh @@ -82,6 +82,16 @@ if [ "${checksum_count}" -lt "1" ];then exit 1 fi +# when we back up stats during backup, we cannot close the domain in the one shot session. +# so we can check that the log count of `one shot domain closed` is 0. +# we will call UseOneShotSession once to get the value of the global variable. +one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs) +one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs) +if [ "${one_shot_session_count}" -ne "1" ] || [ "$one_shot_domain_count" -ne "0" ];then + echo "TEST: [$TEST_NAME] fail on one shot session check, $one_shot_session_count, $one_shot_domain_count" + exit 1 +fi + echo "backup start without stats..." run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/${DB}_disable_stats" --concurrency 4 diff --git a/br/tests/br_incremental_ddl/run.sh b/br/tests/br_incremental_ddl/run.sh index 49b825498e2af..df5a478f6733e 100755 --- a/br/tests/br_incremental_ddl/run.sh +++ b/br/tests/br_incremental_ddl/run.sh @@ -19,6 +19,7 @@ DB="$TEST_NAME" TABLE="usertable" ROW_COUNT=100 PATH="tests/$TEST_NAME:bin:$PATH" +LOG=/$TEST_DIR/backup.log echo "load data..." 
# create database @@ -30,9 +31,24 @@ for i in $(seq $ROW_COUNT); do run_sql "INSERT INTO ${DB}.${TABLE}(c1) VALUES ($i);" done + +# Do not log to terminal +unset BR_LOG_TO_TERM # full backup echo "full backup start..." -run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --log-file $LOG + +# when we back up, we should close the domain in the one shot session. +# so we can check that the log count of `one shot domain closed` is 1. +# we will call UseOneShotSession once to get the value of the global variable. +one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs) +one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs) +if [ "${one_shot_session_count}" -ne "1" ] || [ "$one_shot_domain_count" -ne "1" ];then + echo "TEST: [$TEST_NAME] fail on one shot session check during backup, $one_shot_session_count, $one_shot_domain_count" + exit 1 +fi +rm -rf $LOG + # run ddls echo "run ddls..." run_sql "RENAME TABLE ${DB}.${TABLE} to ${DB}.${TABLE}1;" @@ -54,7 +70,21 @@ done # incremental backup echo "incremental backup start..." last_backup_ts=$(run_br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | grep -oE "^[0-9]+") -run_br --pd $PD_ADDR backup db -s "local://$TEST_DIR/$DB/inc" --db $DB --lastbackupts $last_backup_ts +run_br --pd $PD_ADDR backup db -s "local://$TEST_DIR/$DB/inc" --db $DB --lastbackupts $last_backup_ts --log-file $LOG + +# when we are doing an incremental backup, we should close the domain in each one shot session. +# so we can check that the log count of `one shot domain closed` is 2. +# we will call UseOneShotSession twice: +# 1. to get the value of the global variable. +# 2. to get all ddl jobs with the session. 
+one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs) +one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs) +if [ "${one_shot_session_count}" -ne "2" ] || [ "$one_shot_domain_count" -ne "2" ];then + echo "TEST: [$TEST_NAME] fail on one shot session check during inc backup, $one_shot_session_count, $one_shot_domain_count" + exit 1 +fi +rm -rf $LOG +export BR_LOG_TO_TERM=1 run_sql "DROP DATABASE $DB;" # full restore @@ -101,4 +131,4 @@ fi run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('1');" run_sql "INSERT INTO ${DB}.${TABLE}_rename2(c) VALUES ('1');" -run_sql "DROP DATABASE $DB;" \ No newline at end of file +run_sql "DROP DATABASE $DB;" diff --git a/executor/brie.go b/executor/brie.go index 690497da83b54..f77f1567f7b64 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -568,3 +568,9 @@ func (gs *tidbGlueSession) Record(name string, value uint64) { func (gs *tidbGlueSession) GetVersion() string { return "TiDB\n" + printer.GetTiDBInfo() } + +// UseOneShotSession implements glue.Glue. +func (gs *tidbGlueSession) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se glue.Session) error) error { + // in SQL backup we run fn on the current session and don't need to close the domain. + return fn(gs) +}