Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: use one shot session to close domain ASAP #36558

Merged
merged 11 commits into from
Jul 26, 2022
16 changes: 12 additions & 4 deletions br/pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/conn"
berrors "github.com/pingcap/tidb/br/pkg/errors"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/logutil"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/redact"
Expand All @@ -36,7 +37,6 @@ import (
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/ranger"
Expand Down Expand Up @@ -473,7 +473,7 @@ func skipUnsupportedDDLJob(job *model.Job) bool {
}

// WriteBackupDDLJobs sends the ddl jobs are done in (lastBackupTS, backupTS] to metaWriter.
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context, store kv.Storage, lastBackupTS, backupTS uint64) error {
func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, g glue.Glue, store kv.Storage, lastBackupTS, backupTS uint64, needDomain bool) error {
snapshot := store.GetSnapshot(kv.NewVersion(backupTS))
snapMeta := meta.NewSnapshotMeta(snapshot)
lastSnapshot := store.GetSnapshot(kv.NewVersion(lastBackupTS))
Expand All @@ -492,11 +492,19 @@ func WriteBackupDDLJobs(metaWriter *metautil.MetaWriter, se sessionctx.Context,
return errors.Trace(err)
}
newestMeta := meta.NewSnapshotMeta(store.GetSnapshot(kv.NewVersion(version.Ver)))
allJobs, err := ddl.GetAllDDLJobs(se, newestMeta)
allJobs := make([]*model.Job, 0)
err = g.UseOneShotSession(store, !needDomain, func(se glue.Session) error {
allJobs, err = ddl.GetAllDDLJobs(se.GetSessionCtx(), newestMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))
return nil
})
if err != nil {
return errors.Trace(err)
}
log.Debug("get all jobs", zap.Int("jobs", len(allJobs)))

historyJobs, err := ddl.GetAllHistoryDDLJobs(newestMeta)
if err != nil {
return errors.Trace(err)
Expand Down
6 changes: 5 additions & 1 deletion br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/tidb/br/pkg/backup"
"github.com/pingcap/tidb/br/pkg/conn"
"github.com/pingcap/tidb/br/pkg/gluetidb"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/pdutil"
Expand All @@ -38,6 +39,7 @@ type testBackup struct {
cancel context.CancelFunc

mockPDClient pd.Client
mockGlue *gluetidb.MockGlue
backupClient *backup.Client

cluster *mock.Cluster
Expand All @@ -48,6 +50,7 @@ func createBackupSuite(t *testing.T) (s *testBackup, clean func()) {
tikvClient, _, pdClient, err := testutils.NewMockTiKV("", nil)
require.NoError(t, err)
s = new(testBackup)
s.mockGlue = &gluetidb.MockGlue{}
s.mockPDClient = pdClient
s.ctx, s.cancel = context.WithCancel(context.Background())
mockMgr := &conn.Mgr{PdController: &pdutil.PdController{}}
Expand Down Expand Up @@ -280,7 +283,8 @@ func TestSkipUnsupportedDDLJob(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.cluster.Storage, lastTS, ts)
s.mockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.cluster.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoError(t, err, "Flush failed", err)
Expand Down
6 changes: 6 additions & 0 deletions br/pkg/glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ type Glue interface {

// GetVersion gets BR package version to run backup/restore job
GetVersion() string

// UseOneShotSession temporary creates session from store when run backup job.
// because we don't have to own domain/session during the whole backup.
// we can close domain as soon as possible.
// and we must reuse the exists session and don't close it in SQL backup job.
UseOneShotSession(store kv.Storage, closeDomain bool, fn func(se Session) error) error
}

// Session is an abstraction of the session.Session interface.
Expand Down
159 changes: 159 additions & 0 deletions br/pkg/gluetidb/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,40 @@ func (g Glue) GetVersion() string {
return g.tikvGlue.GetVersion()
}

// UseOneShotSession implements glue.Glue.
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
se, err := session.CreateSession(store)
if err != nil {
return errors.Trace(err)
}
glueSession := &tidbSession{
se: se,
}
defer func() {
se.Close()
log.Info("one shot session closed")
}()
// dom will be created during session.CreateSession.
dom, err := session.GetDomain(store)
if err != nil {
return errors.Trace(err)
}
// because domain was created during the whole program exists.
// and it will register br info to info syncer.
// we'd better close it as soon as possible.
if closeDomain {
defer func() {
dom.Close()
log.Info("one shot domain closed")
}()
}
err = fn(glueSession)
if err != nil {
return errors.Trace(err)
}
return nil
}

// GetSessionCtx implements glue.Glue
func (gs *tidbSession) GetSessionCtx() sessionctx.Context {
return gs.se
Expand Down Expand Up @@ -266,3 +300,128 @@ func (gs *tidbSession) showCreateDatabase(db *model.DBInfo) (string, error) {
func (gs *tidbSession) showCreatePlacementPolicy(policy *model.PolicyInfo) string {
return executor.ConstructResultOfShowCreatePlacementPolicy(policy)
}

// mockSession is used for test.
type mockSession struct {
se session.Session
}

// GetSessionCtx implements glue.Glue
func (s *mockSession) GetSessionCtx() sessionctx.Context {
return s.se
}

// Execute implements glue.Session.
func (s *mockSession) Execute(ctx context.Context, sql string) error {
return s.ExecuteInternal(ctx, sql)
}

func (s *mockSession) ExecuteInternal(ctx context.Context, sql string, args ...interface{}) error {
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBR)
rs, err := s.se.ExecuteInternal(ctx, sql, args...)
if err != nil {
return err
}
// Some of SQLs (like ADMIN RECOVER INDEX) may lazily take effect
// when we polling the result set.
// At least call `next` once for triggering theirs side effect.
// (Maybe we'd better drain all returned rows?)
if rs != nil {
//nolint: errcheck
defer rs.Close()
c := rs.NewChunk(nil)
if err := rs.Next(ctx, c); err != nil {
return nil
}
}
return nil
}

// CreateDatabase implements glue.Session.
func (s *mockSession) CreateDatabase(ctx context.Context, schema *model.DBInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil

}

// CreatePlacementPolicy implements glue.Session.
func (s *mockSession) CreatePlacementPolicy(ctx context.Context, policy *model.PolicyInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTables implements glue.BatchCreateTableSession.
func (s *mockSession) CreateTables(ctx context.Context, tables map[string][]*model.TableInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// CreateTable implements glue.Session.
func (s *mockSession) CreateTable(ctx context.Context, dbName model.CIStr, table *model.TableInfo) error {
log.Fatal("unimplemented CreateDatabase for mock session")
return nil
}

// Close implements glue.Session.
func (s *mockSession) Close() {
s.se.Close()
}

// GetGlobalVariables implements glue.Session.
func (s *mockSession) GetGlobalVariable(name string) (string, error) {
return "true", nil
}

// MockGlue only used for test
type MockGlue struct {
se session.Session
}

func (m *MockGlue) SetSession(se session.Session) {
m.se = se
}

// GetDomain implements glue.Glue.
func (*MockGlue) GetDomain(store kv.Storage) (*domain.Domain, error) {
return nil, nil
}

// CreateSession implements glue.Glue.
func (m *MockGlue) CreateSession(store kv.Storage) (glue.Session, error) {
glueSession := &mockSession{
se: m.se,
}
return glueSession, nil
}

// Open implements glue.Glue.
func (*MockGlue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
return nil, nil
}

// OwnsStorage implements glue.Glue.
func (*MockGlue) OwnsStorage() bool {
return true
}

// StartProgress implements glue.Glue.
func (*MockGlue) StartProgress(ctx context.Context, cmdName string, total int64, redirectLog bool) glue.Progress {
return nil
}

// Record implements glue.Glue.
func (*MockGlue) Record(name string, value uint64) {
}

// GetVersion implements glue.Glue.
func (*MockGlue) GetVersion() string {
return "mock glue"
}

// UseOneShotSession implements glue.Glue.
func (m *MockGlue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
glueSession := &mockSession{
se: m.se,
}
return fn(glueSession)
}
5 changes: 5 additions & 0 deletions br/pkg/gluetikv/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,8 @@ func (Glue) Record(name string, val uint64) {
func (Glue) GetVersion() string {
return "BR\n" + build.Info()
}

// UseOneShotSession implements glue.Glue.
func (g Glue) UseOneShotSession(store kv.Storage, closeDomain bool, fn func(glue.Session) error) error {
return nil
}
12 changes: 8 additions & 4 deletions br/pkg/restore/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ import (
)

type testRestoreSchemaSuite struct {
mock *mock.Cluster
storage storage.ExternalStorage
mock *mock.Cluster
mockGlue *gluetidb.MockGlue
storage storage.ExternalStorage
}

func createRestoreSchemaSuite(t *testing.T) (s *testRestoreSchemaSuite, clean func()) {
var err error
s = new(testRestoreSchemaSuite)
s.mockGlue = &gluetidb.MockGlue{}
s.mock, err = mock.NewCluster()
require.NoError(t, err)
base := t.TempDir()
Expand Down Expand Up @@ -194,7 +196,8 @@ func TestFilterDDLJobs(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, false, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
s.mockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
Expand Down Expand Up @@ -258,7 +261,8 @@ func TestFilterDDLJobsV2(t *testing.T) {
metaWriter := metautil.NewMetaWriter(s.storage, metautil.MetaFileSize, true, "", &cipher)
ctx := context.Background()
metaWriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metaWriter, tk.Session(), s.mock.Storage, lastTS, ts)
s.mockGlue.SetSession(tk.Session())
err = backup.WriteBackupDDLJobs(metaWriter, s.mockGlue, s.mock.Storage, lastTS, ts, false)
require.NoErrorf(t, err, "Error get ddl jobs: %s", err)
err = metaWriter.FinishWriteMetas(ctx, metautil.AppendDDL)
require.NoErrorf(t, err, "Flush failed", err)
Expand Down
19 changes: 11 additions & 8 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,19 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
statsHandle = mgr.GetDomain().StatsHandle()
}

se, err := g.CreateSession(mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
newCollationEnable, err := se.GetGlobalVariable(tidbNewCollationEnabled)
var newCollationEnable string
err = g.UseOneShotSession(mgr.GetStorage(), !needDomain, func(se glue.Session) error {
newCollationEnable, err = se.GetGlobalVariable(tidbNewCollationEnabled)
if err != nil {
return errors.Trace(err)
}
log.Info("get new_collations_enabled_on_first_bootstrap config from system table",
zap.String(tidbNewCollationEnabled, newCollationEnable))
return nil
})
if err != nil {
return errors.Trace(err)
}
log.Info("get new_collations_enabled_on_first_bootstrap config from system table",
zap.String(tidbNewCollationEnabled, newCollationEnable))

client, err := backup.NewBackupClient(ctx, mgr)
if err != nil {
Expand Down Expand Up @@ -399,7 +402,7 @@ func RunBackup(c context.Context, g glue.Glue, cmdName string, cfg *BackupConfig
}

metawriter.StartWriteMetasAsync(ctx, metautil.AppendDDL)
err = backup.WriteBackupDDLJobs(metawriter, se.GetSessionCtx(), mgr.GetStorage(), cfg.LastBackupTS, backupTS)
err = backup.WriteBackupDDLJobs(metawriter, g, mgr.GetStorage(), cfg.LastBackupTS, backupTS, needDomain)
if err != nil {
return errors.Trace(err)
}
Expand Down
10 changes: 10 additions & 0 deletions br/tests/br_full_ddl/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,16 @@ if [ "${checksum_count}" -lt "1" ];then
exit 1
fi

# when we have backup stats during backup, we cannot close domain during one shot session.
# so we can check the log count of `one shot domain closed`.
# we will call UseOneShotSession once to get the value global variable.
one_shot_session_count=$(cat $LOG | grep "one shot session closed" | wc -l | xargs)
one_shot_domain_count=$(cat $LOG | grep "one shot domain closed" | wc -l | xargs)
if [ "${one_shot_session_count}" -ne "1" ] || [ "$one_shot_domain_count" -ne "0" ];then
echo "TEST: [$TEST_NAME] fail on one shot session check, $one_shot_session_count, $one_shot_domain_count"
exit 1
fi

echo "backup start without stats..."
run_br --pd $PD_ADDR backup full -s "local://$TEST_DIR/${DB}_disable_stats" --concurrency 4

Expand Down
Loading