From b3c505edc96c114f1234fea0e94fb81ef3e648cc Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 26 Jul 2022 13:19:42 +0800 Subject: [PATCH 1/7] br: use one shot session to close domain ASAP --- br/pkg/backup/client.go | 16 +++- br/pkg/backup/client_test.go | 7 +- br/pkg/glue/glue.go | 6 ++ br/pkg/gluetidb/glue.go | 150 +++++++++++++++++++++++++++++++++++ br/pkg/gluetikv/glue.go | 5 ++ br/pkg/restore/db_test.go | 13 ++- br/pkg/task/backup.go | 19 +++-- executor/brie.go | 5 ++ 8 files changed, 204 insertions(+), 17 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index 598f9d50c3006..ecd6aa3e5a364 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/conn" berrors "github.com/pingcap/tidb/br/pkg/errors" + "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/logutil" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/redact" @@ -36,7 +37,6 @@ import ( "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/parser/model" - "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/ranger" @@ -473,7 +473,7 @@ func skipUnsupportedDDLJob(job *model.Job) bool { } // WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter. -func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, store kv.Storage, lastBackupTS, backupTS uint64) error { +func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64) error { snapshot := store.GetSnapshot(kv.NewVersion(backupTS)) snapMeta := meta.NewSnapshotMeta(snapshot) lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS)) @@ -492,11 +492,19 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, return errors.Trace(err) } newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver))) - allJobs, err := ddl.GetAllDDLJobs(se, newestMeta) + allJobs := make([]*model.Job, 0) + err = g.UseOneShotSession(store, func(se glue.Session) error { + allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx(), newestMeta) + if err != nil { + return errors.Trace(err) + } + log.Debug("get all jobs", zap.Int("jobs", len(allJobs))) + return nil + }) if err != nil { return errors.Trace(err) } - log.Debug("get all jobs", zap.Int("jobs", len(allJobs))) + historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta) if err != nil { return errors.Trace(err) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 60cf42f94998a..84d642f5fbf0d 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -5,6 +5,7 @@ package backup_test import ( "context" "encoding/json" + "github.com/pingcap/tidb/br/pkg/gluetidb" "math" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/tidb/br/pkg/backup" "github.com/pingcap/tidb/br/pkg/conn" + "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" @@ -38,6 +40,7 @@ type testBackup struct { cancel context.CancelFunc mockPDClient pd.Client + mockGlue *gluetidb.MockGlue backupClient *backup.Client cluster *mock.Cluster @@ -48,6 +51,7 @@ func createBackupSuite(t *testing.T) (s *testBackup, clean func()) { tikvClient, _, pdClient, err := testutils.NewMockTiKV("", nil) require.NoError(t, err) s = new(testBackup) + s.mockGlue = &gluetidb.MockGlue{} s.mockPDClient = pdClient s.ctx, s.cancel = context.WithCancel(context.Background()) mockMgr := &conn.Mgr{PdController: &pdutil.PdController{}} @@ -280,7 +284,8 @@ func TestSkipUnsupportedDDLJob(t *testing.T) { metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher) ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.cluster.Storage, lastTS, ts) + s.mockGlue.SetSession(tk.Session()) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoError(t, err, "Flush failed", err) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 4c3f18714f9a3..b4ae582874cd4 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -29,6 +29,12 @@ type Glue interface { // GetVersion gets BR package version to run backup/restore job GetVersion() string + + // UseOneShotSession temporary creates session from store when run backup job. + // because we don't have to own domain/session during the whole backup. + // we can close domain as soon as possible. + // and in SQL backup job. we must reuse the exists session and don't close it. + UseOneShotSession(store kv.Storage, fn func(se Session) error) error } // Session is an abstraction of the session.Session interface. diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index d07594f1f842d..e0498d5e9ef2b 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -114,6 +114,31 @@ func (g Glue) GetVersion() string { return g.tikvGlue.GetVersion() } +// UseOneShotSession implements glue.Glue. +func (g Glue) UseOneShotSession(store kv.Storage, fn func(glue.Session) error) error { + se, err := session.CreateSession(store) + if err != nil { + return errors.Trace(err) + } + glueSession := &tidbSession{ + se: se, + } + // dom will be created during session.CreateSession. + dom, err := session.GetDomain(store) + if err != nil { + return errors.Trace(err) + } + // because domain was created during the whole program exists. + // and it will register br info to info syncer. + // we'd better close it as soon as possible. + defer dom.Close() + err = fn(glueSession) + if err != nil { + return errors.Trace(err) + } + return nil +} + // GetSessionCtx implements glue.Glue func (gs *tidbSession) GetSessionCtx() sessionctx.Context { return gs.se @@ -266,3 +291,128 @@ func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) { func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string { return executor.ConstructResultOfShowCreatePlacementPolicy(policy) } + +// mockSession is used for test. +type mockSession struct { + se session.Session +} + +// GetSessionCtx implements glue.Glue +func (s *mockSession) GetSessionCtx() sessionctx.Context { + return s.se +} + +// Execute implements glue.Session. +func (s *mockSession) Execute(ctx context.Context, sql string) error { + return s.ExecuteInternal(ctx, sql) +} + +func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error { + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR) + rs, err := s.se.ExecuteInternal(ctx, sql, args...) + if err != nil { + return err + } + // Some of SQLs (like ADMIN RECOVER INDEX) may lazily take effect + // when we polling the result set. + // At least call `next` once for triggering theirs side effect. + // (Maybe we'd better drain all returned rows?) + if rs != nil { + //nolint: errcheck + defer rs.Close() + c := rs.NewChunk(nil) + if err := rs.Next(ctx, c); err != nil { + return nil + } + } + return nil +} + +// CreateDatabase implements glue.Session. +func (s *mockSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil + +} + +// CreatePlacementPolicy implements glue.Session. +func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil +} + +// CreateTables implements glue.BatchCreateTableSession. +func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil +} + +// CreateTable implements glue.Session. +func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error { + log.Fatal("unimplemented CreateDatabase for mock session") + return nil +} + +// Close implements glue.Session. +func (s *mockSession) Close() { + s.se.Close() +} + +// GetGlobalVariables implements glue.Session. +func (s *mockSession) GetGlobalVariable(name string) (string, error) { + return "true", nil +} + +// MockGlue only used for test +type MockGlue struct { + se session.Session +} + +func (m *MockGlue) SetSession(se session.Session) { + m.se = se +} + +// GetDomain implements glue.Glue. +func (*MockGlue) GetDomain(store kv.Storage) (*domain.Domain, error) { + return nil, nil +} + +// CreateSession implements glue.Glue. +func (m *MockGlue) CreateSession(store kv.Storage) (glue.Session, error) { + glueSession := &mockSession{ + se: m.se, + } + return glueSession, nil +} + +// Open implements glue.Glue. +func (*MockGlue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { + return nil, nil +} + +// OwnsStorage implements glue.Glue. +func (*MockGlue) OwnsStorage() bool { + return true +} + +// StartProgress implements glue.Glue. +func (*MockGlue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress { + return nil +} + +// Record implements glue.Glue. +func (*MockGlue) Record(name string, value uint64) { +} + +// GetVersion implements glue.Glue. +func (*MockGlue) GetVersion() string { + return "mock glue" +} + +// UseOneShotSession implements glue.Glue. +func (m *MockGlue) UseOneShotSession(store kv.Storage, fn func(glue.Session) error) error { + glueSession := &mockSession{ + se: m.se, + } + return fn(glueSession) +} diff --git a/br/pkg/gluetikv/glue.go b/br/pkg/gluetikv/glue.go index 69c18c3c50277..ef3b895dc7624 100644 --- a/br/pkg/gluetikv/glue.go +++ b/br/pkg/gluetikv/glue.go @@ -68,3 +68,8 @@ func (Glue) Record(name string, val uint64) { func (Glue) GetVersion() string { return "BR\n" + build.Info() } + +// UseOneShotSession implements glue.Glue. +func (g Glue) UseOneShotSession(store kv.Storage, fn func(glue.Session) error) error { + return nil +} diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 89ff7a9ab62a4..7124343188722 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -5,6 +5,7 @@ package restore_test import ( "context" "encoding/json" + "github.com/pingcap/tidb/br/pkg/glue" "math" "strconv" "testing" @@ -30,13 +31,15 @@ import ( ) type testRestoreSchemaSuite struct { - mock *mock.Cluster - storage storage.ExternalStorage + mock *mock.Cluster + mockGlue *gluetidb.MockGlue + storage storage.ExternalStorage } func createRestoreSchemaSuite(t *testing.T) (s *testRestoreSchemaSuite, clean func()) { var err error s = new(testRestoreSchemaSuite) + s.mockGlue = &gluetidb.MockGlue{} s.mock, err = mock.NewCluster() require.NoError(t, err) base := t.TempDir() @@ -194,7 +197,8 @@ func TestFilterDDLJobs(t *testing.T) { metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher) ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts) + s.mockGlue.SetSession(tk.Session()) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoErrorf(t, err, "Flush failed", err) @@ -258,7 +262,8 @@ func TestFilterDDLJobsV2(t *testing.T) { metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, true, "", &cipher) ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts) + s.mockGlue.SetSession(tk.Session()) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoErrorf(t, err, "Flush failed", err) diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index a80806b09e236..611e35b615c1e 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -263,16 +263,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig statsHandle = mgr.GetDomain().StatsHandle() } - se, err := g.CreateSession(mgr.GetStorage()) - if err != nil { - return errors.Trace(err) - } - newCollationEnable, err := se.GetGlobalVariable(tidbNewCollationEnabled) + var newCollationEnable string + err = g.UseOneShotSession(mgr.GetStorage(), func(se glue.Session) error { + newCollationEnable, err = se.GetGlobalVariable(tidbNewCollationEnabled) + if err != nil { + return errors.Trace(err) + } + log.Info("get new_collations_enabled_on_first_bootstrap config from system table", + zap.String(tidbNewCollationEnabled, newCollationEnable)) + return nil + }) if err != nil { return errors.Trace(err) } - log.Info("get new_collations_enabled_on_first_bootstrap config from system table", - zap.String(tidbNewCollationEnabled, newCollationEnable)) client, err := backup.NewBackupClient(ctx, mgr) if err != nil { @@ -399,7 +402,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metawriter, se.GetSessionCtx(), mgr.GetStorage(), cfg.LastBackupTS, backupTS) + err = backup.WriteBackupDDLJobs(metawriter, g, mgr.GetStorage(), cfg.LastBackupTS, backupTS) if err != nil { return errors.Trace(err) } diff --git a/executor/brie.go b/executor/brie.go index 690497da83b54..4b491a438dc2b 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -568,3 +568,8 @@ func (gs *tidbGlueSession) Record(name string, value uint64) { func (gs *tidbGlueSession) GetVersion() string { return "TiDB\n" + printer.GetTiDBInfo() } + +// UseOneShotSession implements glue.Glue +func (gs *tidbGlueSession) UseOneShotSession(store kv.Storage, fn func(se glue.Session) error) error { + return fn(gs) +} From 351442a53434573419eab29d100654cec0ef195f Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 26 Jul 2022 13:22:59 +0800 Subject: [PATCH 2/7] fmt --- br/pkg/backup/client_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index 84d642f5fbf0d..e86491c3d95da 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -5,7 +5,6 @@ package backup_test import ( "context" "encoding/json" - "github.com/pingcap/tidb/br/pkg/gluetidb" "math" "testing" "time" @@ -16,7 +15,7 @@ import ( "github.com/pingcap/kvproto/pkg/errorpb" "github.com/pingcap/tidb/br/pkg/backup" "github.com/pingcap/tidb/br/pkg/conn" - "github.com/pingcap/tidb/br/pkg/glue" + "github.com/pingcap/tidb/br/pkg/gluetidb" "github.com/pingcap/tidb/br/pkg/metautil" "github.com/pingcap/tidb/br/pkg/mock" "github.com/pingcap/tidb/br/pkg/pdutil" From 35eec734c4f3e6cb2053400f1ae060c2df19766a Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 26 Jul 2022 13:35:54 +0800 Subject: [PATCH 3/7] fmt --- br/pkg/glue/glue.go | 2 +- br/pkg/restore/db_test.go | 1 - executor/brie.go | 1 + 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index b4ae582874cd4..655d3d5d9e826 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -33,7 +33,7 @@ type Glue interface { // UseOneShotSession temporary creates session from store when run backup job. // because we don't have to own domain/session during the whole backup. // we can close domain as soon as possible. - // and in SQL backup job. we must reuse the exists session and don't close it. + // and we must reuse the exists session and don't close it in SQL backup job. UseOneShotSession(store kv.Storage, fn func(se Session) error) error } diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 7124343188722..2903594f6671a 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -5,7 +5,6 @@ package restore_test import ( "context" "encoding/json" - "github.com/pingcap/tidb/br/pkg/glue" "math" "strconv" "testing" diff --git a/executor/brie.go b/executor/brie.go index 4b491a438dc2b..9c419c3879ad6 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -571,5 +571,6 @@ func (gs *tidbGlueSession) GetVersion() string { // UseOneShotSession implements glue.Glue func (gs *tidbGlueSession) UseOneShotSession(store kv.Storage, fn func(se glue.Session) error) error { + // in SQL backup. we don't need to close domain. return fn(gs) } From e682b9827eca82ff490f63a3ce4e451b9b682b14 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 26 Jul 2022 15:50:49 +0800 Subject: [PATCH 4/7] fix --- br/pkg/backup/client.go | 4 ++-- br/pkg/backup/client_test.go | 2 +- br/pkg/glue/glue.go | 2 +- br/pkg/gluetidb/glue.go | 8 +++++--- br/pkg/gluetikv/glue.go | 2 +- br/pkg/restore/db_test.go | 4 ++-- br/pkg/task/backup.go | 4 ++-- executor/brie.go | 2 +- 8 files changed, 15 insertions(+), 13 deletions(-) diff --git a/br/pkg/backup/client.go b/br/pkg/backup/client.go index ecd6aa3e5a364..17058f5a03722 100644 --- a/br/pkg/backup/client.go +++ b/br/pkg/backup/client.go @@ -473,7 +473,7 @@ func skipUnsupportedDDLJob(job *model.Job) bool { } // WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter. -func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64) error { +func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64, needDomain bool) error { snapshot := store.GetSnapshot(kv.NewVersion(backupTS)) snapMeta := meta.NewSnapshotMeta(snapshot) lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS)) @@ -493,7 +493,7 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.S } newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver))) allJobs := make([]*model.Job, 0) - err = g.UseOneShotSession(store, func(se glue.Session) error { + err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error { allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx(), newestMeta) if err != nil { return errors.Trace(err) diff --git a/br/pkg/backup/client_test.go b/br/pkg/backup/client_test.go index e86491c3d95da..ae5e24ac3002b 100644 --- a/br/pkg/backup/client_test.go +++ b/br/pkg/backup/client_test.go @@ -284,7 +284,7 @@ func TestSkipUnsupportedDDLJob(t *testing.T) { ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) s.mockGlue.SetSession(tk.Session()) - err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoError(t, err, "Flush failed", err) diff --git a/br/pkg/glue/glue.go b/br/pkg/glue/glue.go index 655d3d5d9e826..09dfce094af07 100644 --- a/br/pkg/glue/glue.go +++ b/br/pkg/glue/glue.go @@ -34,7 +34,7 @@ type Glue interface { // because we don't have to own domain/session during the whole backup. // we can close domain as soon as possible. // and we must reuse the exists session and don't close it in SQL backup job. - UseOneShotSession(store kv.Storage, fn func(se Session) error) error + UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error } // Session is an abstraction of the session.Session interface. diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index e0498d5e9ef2b..491ae436b6c9f 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -115,7 +115,7 @@ func (g Glue) GetVersion() string { } // UseOneShotSession implements glue.Glue. -func (g Glue) UseOneShotSession(store kv.Storage, fn func(glue.Session) error) error { +func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { se, err := session.CreateSession(store) if err != nil { return errors.Trace(err) @@ -131,7 +131,9 @@ func (g Glue) UseOneShotSession(store kv.Storage, fn func(glue.Session) error) e // because domain was created during the whole program exists. // and it will register br info to info syncer. // we'd better close it as soon as possible. - defer dom.Close() + if closeDomain { + defer dom.Close() + } err = fn(glueSession) if err != nil { return errors.Trace(err) @@ -410,7 +412,7 @@ func (*MockGlue) GetVersion() string { } // UseOneShotSession implements glue.Glue. -func (m *MockGlue) UseOneShotSession(store kv.Storage, fn func(glue.Session) error) error { +func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { glueSession := &mockSession{ se: m.se, } diff --git a/br/pkg/gluetikv/glue.go b/br/pkg/gluetikv/glue.go index ef3b895dc7624..a8c020528c771 100644 --- a/br/pkg/gluetikv/glue.go +++ b/br/pkg/gluetikv/glue.go @@ -70,6 +70,6 @@ func (Glue) GetVersion() string { } // UseOneShotSession implements glue.Glue. -func (g Glue) UseOneShotSession(store kv.Storage, fn func(glue.Session) error) error { +func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error { return nil } diff --git a/br/pkg/restore/db_test.go b/br/pkg/restore/db_test.go index 2903594f6671a..b5c52895c0ac1 100644 --- a/br/pkg/restore/db_test.go +++ b/br/pkg/restore/db_test.go @@ -197,7 +197,7 @@ func TestFilterDDLJobs(t *testing.T) { ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) s.mockGlue.SetSession(tk.Session()) - err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts, false) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoErrorf(t, err, "Flush failed", err) @@ -262,7 +262,7 @@ func TestFilterDDLJobsV2(t *testing.T) { ctx := context.Background() metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) s.mockGlue.SetSession(tk.Session()) - err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts) + err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts, false) require.NoErrorf(t, err, "Error get ddl jobs: %s", err) err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL) require.NoErrorf(t, err, "Flush failed", err) diff --git a/br/pkg/task/backup.go b/br/pkg/task/backup.go index 611e35b615c1e..c7a70877f3254 100644 --- a/br/pkg/task/backup.go +++ b/br/pkg/task/backup.go @@ -264,7 +264,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } var newCollationEnable string - err = g.UseOneShotSession(mgr.GetStorage(), func(se glue.Session) error { + err = g.UseOneShotSession(mgr.GetStorage(), !needDomain, func(se glue.Session) error { newCollationEnable, err = se.GetGlobalVariable(tidbNewCollationEnabled) if err != nil { return errors.Trace(err) @@ -402,7 +402,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig } metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL) - err = backup.WriteBackupDDLJobs(metawriter, g, mgr.GetStorage(), cfg.LastBackupTS, backupTS) + err = backup.WriteBackupDDLJobs(metawriter, g, mgr.GetStorage(), cfg.LastBackupTS, backupTS, needDomain) if err != nil { return errors.Trace(err) } diff --git a/executor/brie.go b/executor/brie.go index 9c419c3879ad6..f77f1567f7b64 100644 --- a/executor/brie.go +++ b/executor/brie.go @@ -570,7 +570,7 @@ func (gs *tidbGlueSession) GetVersion() string { } // UseOneShotSession implements glue.Glue -func (gs *tidbGlueSession) UseOneShotSession(store kv.Storage, fn func(se glue.Session) error) error { +func (gs *tidbGlueSession) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se glue.Session) error) error { // in SQL backup. we don't need to close domain. return fn(gs) } From 7ba9ba9c73eaf0f71087fe5bae1514c49e672d3c Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 26 Jul 2022 15:54:56 +0800 Subject: [PATCH 5/7] fix --- br/pkg/gluetidb/glue.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 491ae436b6c9f..0c06291ebded5 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -123,6 +123,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue glueSession := &tidbSession{ se: se, } + defer func() { + se.Close() + log.Info("one shot session closed") + }() // dom will be created during session.CreateSession. dom, err := session.GetDomain(store) if err != nil { @@ -132,7 +136,10 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue // and it will register br info to info syncer. // we'd better close it as soon as possible. if closeDomain { - defer dom.Close() + defer func() { + dom.Close() + log.Info("one shot session domain closed") + }() } err = fn(glueSession) if err != nil { From ff7dbfdd9a71f4a3f971cfb092d685c65470e495 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 26 Jul 2022 16:19:27 +0800 Subject: [PATCH 6/7] add integration test --- br/pkg/gluetidb/glue.go | 2 +- br/tests/br_full_ddl/run.sh | 10 ++++++++++ br/tests/br_incremental_ddl/run.sh | 32 +++++++++++++++++++++++++++--- 3 files changed, 40 insertions(+), 4 deletions(-) diff --git a/br/pkg/gluetidb/glue.go b/br/pkg/gluetidb/glue.go index 0c06291ebded5..dfe5ae62639bd 100644 --- a/br/pkg/gluetidb/glue.go +++ b/br/pkg/gluetidb/glue.go @@ -138,7 +138,7 @@ func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue if closeDomain { defer func() { dom.Close() - log.Info("one shot session domain closed") + log.Info("one shot domain closed") }() } err = fn(glueSession) diff --git a/br/tests/br_full_ddl/run.sh b/br/tests/br_full_ddl/run.sh index 1433c1f71e9a6..ae056ad5206e5 100755 --- a/br/tests/br_full_ddl/run.sh +++ b/br/tests/br_full_ddl/run.sh @@ -82,6 +82,16 @@ if [ "${checksum_count}" -lt "1" ];then exit 1 fi +# when we have backup stats during backup, we cannot close domain during one shot session. +# so we can check the log count of `one shot domain closed`. +# we will call UseOneShotSession once to get the value global variable. +one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs) +one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs) +if [ "${one_shot_session_count}" -ne "1" ] || [ "$one_shot_domain_count" -ne "0" ];then + echo "TEST: [$TEST_NAME] fail on one shot session check, $one_shot_session_count, $one_shot_domain_count" + exit 1 +fi + echo "backup start without stats..." run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/${DB}_disable_stats" --concurrency 4 diff --git a/br/tests/br_incremental_ddl/run.sh b/br/tests/br_incremental_ddl/run.sh index 49b825498e2af..9a2f6db13c14a 100755 --- a/br/tests/br_incremental_ddl/run.sh +++ b/br/tests/br_incremental_ddl/run.sh @@ -19,6 +19,7 @@ DB="$TEST_NAME" TABLE="usertable" ROW_COUNT=100 PATH="tests/$TEST_NAME:bin:$PATH" +LOG=/$TEST_DIR/backup.log echo "load data..." # create database @@ -32,7 +33,19 @@ done # full backup echo "full backup start..." -run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE +run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --log-file $LOG + +# when we backup, we should close domain in one shot session. +# so we can check the log count of `one shot domain closed` to be 1. +# we will call UseOneShotSession once to get the value global variable. +one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs) +one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs) +if [ "${one_shot_session_count}" -ne "1" ] || [ "$one_shot_domain_count" -ne "1" ];then + echo "TEST: [$TEST_NAME] fail on one shot session check during backup, $one_shot_session_count, $one_shot_domain_count" + exit 1 +fi +rm -rf $LOG + # run ddls echo "run ddls..." run_sql "RENAME TABLE ${DB}.${TABLE} to ${DB}.${TABLE}1;" @@ -54,7 +67,20 @@ done # incremental backup echo "incremental backup start..." last_backup_ts=$(run_br validate decode --field="end-version" -s "local://$TEST_DIR/$DB/full" | grep -oE "^[0-9]+") -run_br --pd $PD_ADDR backup db -s "local://$TEST_DIR/$DB/inc" --db $DB --lastbackupts $last_backup_ts +run_br --pd $PD_ADDR backup db -s "local://$TEST_DIR/$DB/inc" --db $DB --lastbackupts $last_backup_ts --log-file $LOG + +# when we doing incremental backup, we should close domain in one shot session. +# so we can check the log count of `one shot domain closed` to be 2. +# we will call UseOneShotSession twice +# 1. to get the value global variable. +# 2. to get all ddl jobs with session. +one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs) +one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs) +if [ "${one_shot_session_count}" -ne "2" ] || [ "$one_shot_domain_count" -ne "2" ];then + echo "TEST: [$TEST_NAME] fail on one shot session check during inc backup, $one_shot_session_count, $one_shot_domain_count" + exit 1 +fi +rm -rf $LOG run_sql "DROP DATABASE $DB;" # full restore @@ -101,4 +127,4 @@ fi run_sql "INSERT INTO ${DB}.${TABLE}(c2) VALUES ('1');" run_sql "INSERT INTO ${DB}.${TABLE}_rename2(c) VALUES ('1');" -run_sql "DROP DATABASE $DB;" \ No newline at end of file +run_sql "DROP DATABASE $DB;" From 1b07bc744e5403fe6bbd638fc03e0e4ea3d1b96c Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 26 Jul 2022 16:55:54 +0800 Subject: [PATCH 7/7] fix integration test --- br/tests/br_incremental_ddl/run.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/br/tests/br_incremental_ddl/run.sh b/br/tests/br_incremental_ddl/run.sh index 9a2f6db13c14a..df5a478f6733e 100755 --- a/br/tests/br_incremental_ddl/run.sh +++ b/br/tests/br_incremental_ddl/run.sh @@ -31,6 +31,9 @@ for i in $(seq $ROW_COUNT); do run_sql "INSERT INTO ${DB}.${TABLE}(c1) VALUES ($i);" done + +# Do not log to terminal +unset BR_LOG_TO_TERM # full backup echo "full backup start..." run_br --pd $PD_ADDR backup table -s "local://$TEST_DIR/$DB/full" --db $DB -t $TABLE --log-file $LOG @@ -81,6 +84,7 @@ if [ "${one_shot_session_count}" -ne "2" ] || [ "$one_shot_domain_count" -ne "2" exit 1 fi rm -rf $LOG +BR_LOG_TO_TERM=1 run_sql "DROP DATABASE $DB;" # full restore