Skip to content

Commit

Permalink
br: use one shot session to close domain ASAP (#36558) (#36594)
Browse files Browse the repository at this point in the history
ref #36546
  • Loading branch information
ti-srebot authored Jul 28, 2022
1 parent 5772997 commit fb63385
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 20 deletions.
16 changes: 12 additions & 4 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/redact"
Expand All @@ -36,7 +37,6 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
Expand Down Expand Up @@ -473,7 +473,7 @@ func skipUnsupportedDDLJob(job *model.Job) bool {
}

// WriteBackupDDLJobs sends the DDL jobs that were done in (lastBackupTS, backupTS] to metaWriter.
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, store kv.Storage, lastBackupTS, backupTS uint64) error {
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64, needDomain bool) error {
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
snapMeta := meta.NewSnapshotMeta(snapshot)
lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS))
Expand All @@ -492,11 +492,19 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context,
return errors.Trace(err)
}
newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs, err := ddl.GetAllDDLJobs(se, newestMeta)
allJobs := make([]*model.Job, 0)
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx(), newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))
return nil
})
if err != nil {
return errors.Trace(err)
}
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))

historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
if err != nil {
return errors.Trace(err)
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/gluetidb"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
Expand All @@ -38,6 +39,7 @@ type testBackup struct {
cancel context.CancelFunc

mockPDClient pd.Client
mockGlue *gluetidb.MockGlue
backupClient *backup.Client

cluster *mock.Cluster
Expand All @@ -48,6 +50,7 @@ func createBackupSuite(t *testing.T) (s *testBackup, clean func()) {
tikvClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
s = new(testBackup)
s.mockGlue = &gluetidb.MockGlue{}
s.mockPDClient = pdClient
s.ctx, s.cancel = context.WithCancel(context.Background())
mockMgr := &conn.Mgr{PdController: &pdutil.PdController{}}
Expand Down Expand Up @@ -280,7 +283,8 @@ func TestSkipUnsupportedDDLJob(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.cluster.Storage, lastTS, ts)
s.mockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoError(t, err, "Flush failed", err)
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type Glue interface {

// GetVersion gets BR package version to run backup/restore job
GetVersion() string

// UseOneShotSession temporarily creates a session from store while running a backup job.
// A backup does not have to own the domain/session for its whole duration,
// so the domain can be closed as soon as possible.
// In a SQL backup job the existing session must be reused and must not be closed.
UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error
}

// Session is an abstraction of the session.Session interface.
Expand Down
159 changes: 159 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,40 @@ func (g Glue) GetVersion() string {
return g.tikvGlue.GetVersion()
}

// UseOneShotSession implements glue.Glue.
//
// It creates a session on store, hands it to fn wrapped as a glue.Session and
// closes the session again before returning. When closeDomain is true, the
// domain returned by session.GetDomain(store) is closed as well. Errors from
// creating the session, looking up the domain, or from fn are returned traced.
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
	se, err := session.CreateSession(store)
	if err != nil {
		return errors.Trace(err)
	}
	// Registered first, so it runs last: the session is closed after the
	// domain (if the domain is closed at all).
	defer func() {
		se.Close()
		log.Info("one shot session closed")
	}()

	// The domain is created as part of session.CreateSession.
	dom, err := session.GetDomain(store)
	if err != nil {
		return errors.Trace(err)
	}
	// The domain otherwise stays around for the whole lifetime of the program
	// and registers BR info to the info syncer, so close it as early as the
	// caller allows.
	if closeDomain {
		defer func() {
			dom.Close()
			log.Info("one shot domain closed")
		}()
	}

	if cbErr := fn(&tidbSession{se: se}); cbErr != nil {
		return errors.Trace(cbErr)
	}
	return nil
}

// GetSessionCtx implements glue.Session.
// It returns the wrapped session as a sessionctx.Context.
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
return gs.se
}
// showCreatePlacementPolicy returns the string built by
// executor.ConstructResultOfShowCreatePlacementPolicy for policy.
func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
}

// mockSession is a glue.Session used only in tests.
// It wraps an already existing session.Session instead of creating its own.
type mockSession struct {
// se is the wrapped session that calls are forwarded to.
se session.Session
}

// GetSessionCtx implements glue.Session.
// It returns the wrapped session as a sessionctx.Context.
func (s *mockSession) GetSessionCtx() sessionctx.Context {
return s.se
}

// Execute implements glue.Session.
// It forwards sql to ExecuteInternal without any extra arguments.
func (s *mockSession) Execute(ctx context.Context, sql string) error {
return s.ExecuteInternal(ctx, sql)
}

// ExecuteInternal tags ctx with the BR internal source type and runs sql with
// args on the wrapped session.
//
// An error from running the statement is returned to the caller. An error
// from fetching the first chunk of the result set is deliberately discarded,
// so the method still returns nil in that case.
func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
	rs, err := s.se.ExecuteInternal(ctx, sql, args...)
	if err != nil {
		return err
	}
	if rs == nil {
		return nil
	}
	//nolint: errcheck
	defer rs.Close()

	// Some SQL statements (like ADMIN RECOVER INDEX) may take effect lazily,
	// only while the result set is being polled. Fetch one chunk so that
	// their side effects are triggered.
	// (Maybe all returned rows should be drained instead?)
	// The error of this fetch is intentionally ignored.
	_ = rs.Next(ctx, rs.NewChunk(nil))
	return nil
}

// CreateDatabase implements glue.Session.
// The mock does not support it and reports that through log.Fatal.
func (s *mockSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
	log.Fatal("unimplemented CreateDatabase for mock session")
	return nil
}

// CreatePlacementPolicy implements glue.Session.
// The mock does not support it and reports that through log.Fatal.
func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
	// Name the method that was actually called so the failure is traceable.
	log.Fatal("unimplemented CreatePlacementPolicy for mock session")
	return nil
}

// CreateTables implements glue.BatchCreateTableSession.
// The mock does not support it and reports that through log.Fatal.
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
	// Name the method that was actually called so the failure is traceable.
	log.Fatal("unimplemented CreateTables for mock session")
	return nil
}

// CreateTable implements glue.Session.
// The mock does not support it and reports that through log.Fatal.
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
	// Name the method that was actually called so the failure is traceable.
	log.Fatal("unimplemented CreateTable for mock session")
	return nil
}

// Close implements glue.Session.
// It closes the wrapped session; the mock does not create that session itself,
// it only holds the one it was constructed with.
func (s *mockSession) Close() {
s.se.Close()
}

// GetGlobalVariable implements glue.Session.
// The mock ignores name and always returns "true" with a nil error.
func (s *mockSession) GetGlobalVariable(name string) (string, error) {
return "true", nil
}

// MockGlue is a glue.Glue implementation that is only used in tests.
// The sessions it hands out wrap the session stored in se.
type MockGlue struct {
// se is the session wrapped by every mockSession this glue creates.
se session.Session
}

// SetSession sets the session that sessions handed out by this glue wrap.
func (m *MockGlue) SetSession(se session.Session) {
m.se = se
}

// GetDomain implements glue.Glue.
// The mock provides no domain: it always returns a nil domain and a nil error.
func (*MockGlue) GetDomain(store kv.Storage) (*domain.Domain, error) {
return nil, nil
}

// CreateSession implements glue.Glue.
// It ignores store and returns a mockSession wrapping the session currently
// set on the glue; it never fails.
func (m *MockGlue) CreateSession(store kv.Storage) (glue.Session, error) {
	return &mockSession{se: m.se}, nil
}

// Open implements glue.Glue.
// The mock opens nothing: it always returns a nil storage and a nil error.
func (*MockGlue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
return nil, nil
}

// OwnsStorage implements glue.Glue.
// The mock always reports true.
func (*MockGlue) OwnsStorage() bool {
return true
}

// StartProgress implements glue.Glue.
// The mock tracks no progress and always returns nil.
func (*MockGlue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return nil
}

// Record implements glue.Glue.
// The mock discards the value; this is a no-op.
func (*MockGlue) Record(name string, value uint64) {
}

// GetVersion implements glue.Glue.
// The mock returns the fixed string "mock glue".
func (*MockGlue) GetVersion() string {
return "mock glue"
}

// UseOneShotSession implements glue.Glue.
// It calls fn with a mockSession wrapping the session currently set on the
// glue and returns fn's error unchanged. store and closeDomain are ignored,
// and the wrapped session is not closed afterwards.
func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
	return fn(&mockSession{se: m.se})
}
5 changes: 5 additions & 0 deletions br/pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,8 @@ func (Glue) Record(name string, val uint64) {
func (Glue) GetVersion() string {
return "BR\n" + build.Info()
}

// UseOneShotSession implements glue.Glue.
// This implementation creates no session: all arguments are ignored and nil is
// returned.
// NOTE(review): fn is never invoked here, so callers relying on its side
// effects get none with this glue — confirm that is intended.
func (Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
	return nil
}
12 changes: 8 additions & 4 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ import (
)

type testRestoreSchemaSuite struct {
mock *mock.Cluster
storage storage.ExternalStorage
mock *mock.Cluster
mockGlue *gluetidb.MockGlue
storage storage.ExternalStorage
}

func createRestoreSchemaSuite(t *testing.T) (s *testRestoreSchemaSuite, clean func()) {
var err error
s = new(testRestoreSchemaSuite)
s.mockGlue = &gluetidb.MockGlue{}
s.mock, err = mock.NewCluster()
require.NoError(t, err)
base := t.TempDir()
Expand Down Expand Up @@ -194,7 +196,8 @@ func TestFilterDDLJobs(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
s.mockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
Expand Down Expand Up @@ -258,7 +261,8 @@ func TestFilterDDLJobsV2(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, true, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
s.mockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
Expand Down
19 changes: 11 additions & 8 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
statsHandle = mgr.GetDomain().StatsHandle()
}

se, err := g.CreateSession(mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
newCollationEnable, err := se.GetGlobalVariable(tidbNewCollationEnabled)
var newCollationEnable string
err = g.UseOneShotSession(mgr.GetStorage(), !needDomain, func(se glue.Session) error {
newCollationEnable, err = se.GetGlobalVariable(tidbNewCollationEnabled)
if err != nil {
return errors.Trace(err)
}
log.Info("get new_collations_enabled_on_first_bootstrap config from system table",
zap.String(tidbNewCollationEnabled, newCollationEnable))
return nil
})
if err != nil {
return errors.Trace(err)
}
log.Info("get new_collations_enabled_on_first_bootstrap config from system table",
zap.String(tidbNewCollationEnabled, newCollationEnable))

client, err := backup.NewBackupClient(ctx, mgr)
if err != nil {
Expand Down Expand Up @@ -399,7 +402,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}

metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metawriter, se.GetSessionCtx(), mgr.GetStorage(), cfg.LastBackupTS, backupTS)
err = backup.WriteBackupDDLJobs(metawriter, g, mgr.GetStorage(), cfg.LastBackupTS, backupTS, needDomain)
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 10 additions & 0 deletions br/tests/br_full_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ if [ "${checksum_count}" -lt "1" ];then
exit 1
fi

# The backup above included stats, so the domain could not be closed together
# with the one shot session: expect no `one shot domain closed` line in the log.
# UseOneShotSession is expected to be called exactly once (to read the global
# variable), so expect exactly one `one shot session closed` line.
one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs)
one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs)
if [ "${one_shot_session_count}" -ne "1" ] || [ "$one_shot_domain_count" -ne "0" ];then
echo "TEST: [$TEST_NAME] fail on one shot session check, $one_shot_session_count, $one_shot_domain_count"
exit 1
fi

echo "backup start without stats..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/${DB}_disable_stats" --concurrency 4

Expand Down
Loading

0 comments on commit fb63385

Please sign in to comment.