From e91bf49590879485fae5f1953470b34c197553f8 Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 10 Feb 2020 21:03:35 +0800 Subject: [PATCH 1/6] *: add tiflash replica sync progress Signed-off-by: crazycs --- domain/infosync/info.go | 67 +++++++++++++++++++++++++++++++++++++++ infoschema/tables.go | 14 ++++++-- infoschema/tables_test.go | 4 +-- server/http_handler.go | 12 ++++++- 4 files changed, 92 insertions(+), 5 deletions(-) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 3d9c2a1ba2fd0..f79f36b826b39 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -44,6 +44,8 @@ const ( ServerInformationPath = "/tidb/server/info" // ServerMinStartTSPath store the server min start timestamp. ServerMinStartTSPath = "/tidb/server/minstartts" + // TiFlashTableSyncProgressPath store the tiflash table replica sync progress. + TiFlashTableSyncProgressPath = "/tiflash/table/sync" // keyOpDefaultRetryCnt is the default retry count for etcd store. keyOpDefaultRetryCnt = 5 // keyOpDefaultTimeout is the default time out for etcd store. @@ -169,6 +171,71 @@ func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { return is.getAllServerInfo(ctx) } +// UpdateTiFlashTableSyncProgress uses to update the tiflash table replica sync progress. +func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progress float64) error { + is, err := getGlobalInfoSyncer() + if err != nil { + return err + } + if is.etcdCli == nil { + return nil + } + key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid) + return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, key, strconv.FormatFloat(progress, 'f', 2, 64)) +} + +// DeleteTiFlashTableSyncProgress uses to delete the tiflash table replica sync progress. +func DeleteTiFlashTableSyncProgress(tid int64) error { + is, err := getGlobalInfoSyncer() + if err != nil { + return err + } + if is.etcdCli == nil { + return nil + } + key := fmt.Sprintf("%s/%v", TiFlashTableSyncProgressPath, tid) + return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) +} + +// DeleteTiFlashTableSyncProgress uses to delete the tiflash table replica sync progress. +func GetTiFlashTableSyncProgress(ctx context.Context) (map[int64]float64, error) { + is, err := getGlobalInfoSyncer() + if err != nil { + return nil, err + } + progressMap := make(map[int64]float64) + if is.etcdCli == nil { + return progressMap, nil + } + for i := 0; i < keyOpDefaultRetryCnt; i++ { + resp, err := is.etcdCli.Get(ctx, TiFlashTableSyncProgressPath, clientv3.WithPrefix()) + if err != nil { + logutil.BgLogger().Info("get tiflash table replica sync progress failed, continue checking.", zap.Error(err)) + continue + } + for _, kv := range resp.Kvs { + if len(kv.Key) <= len(TiFlashTableSyncProgressPath)+1 { + logutil.BgLogger().Info("invalid tiflash table replica sync progress key.", zap.String("key", string(kv.Key))) + continue + } + tid, err := strconv.ParseInt(string(kv.Key[len(TiFlashTableSyncProgressPath)+1:]), 10, 64) + if err != nil { + logutil.BgLogger().Info("invalid tiflash table replica sync progress key.", zap.String("key", string(kv.Key))) + continue + } + progress, err := strconv.ParseFloat(string(kv.Value), 64) + if err != nil { + logutil.BgLogger().Info("invalid tiflash table replica sync progress value.", + zap.String("key", string(kv.Key)), zap.String("value", string(kv.Value))) + continue + } + progressMap[tid] = progress + } + break + } + return progressMap, nil +} + func (is *InfoSyncer) getAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { allInfo := make(map[string]*ServerInfo) if is.etcdCli == nil { diff --git a/infoschema/tables.go b/infoschema/tables.go index e5492d63c8346..ca451697fb559 100755 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1115,6 +1115,7 @@ var tableTableTiFlashReplicaCols = []columnInfo{ {"REPLICA_COUNT", mysql.TypeLonglong, 64, 0, nil, nil}, {"LOCATION_LABELS", mysql.TypeVarchar, 64, 0, nil, nil}, {"AVAILABLE", mysql.TypeTiny, 1, 0, nil, nil}, + {"PROGRESS", mysql.TypeDouble, 22, 0, nil, nil}, } var tableInspectionResultCols = []columnInfo{ @@ -2276,13 +2277,21 @@ func dataForTiDBClusterInfo(ctx sessionctx.Context) ([][]types.Datum, error) { } // dataForTableTiFlashReplica constructs data for table tiflash replica info. -func dataForTableTiFlashReplica(schemas []*model.DBInfo) [][]types.Datum { +func dataForTableTiFlashReplica(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum { var rows [][]types.Datum + progressMap, err := infosync.GetTiFlashTableSyncProgress(context.Background()) + if err != nil { + ctx.GetSessionVars().StmtCtx.AppendWarning(err) + } for _, schema := range schemas { for _, tbl := range schema.Tables { if tbl.TiFlashReplica == nil { continue } + progress := 1.0 + if !tbl.TiFlashReplica.Available { + progress = progressMap[tbl.ID] + } record := types.MakeDatums( schema.Name.O, // TABLE_SCHEMA tbl.Name.O, // TABLE_NAME @@ -2290,6 +2299,7 @@ func dataForTableTiFlashReplica(schemas []*model.DBInfo) [][]types.Datum { int64(tbl.TiFlashReplica.Count), // REPLICA_COUNT strings.Join(tbl.TiFlashReplica.LocationLabels, ","), // LOCATION_LABELS tbl.TiFlashReplica.Available, // AVAILABLE + progress, // PROGRESS ) rows = append(rows, record) } @@ -2455,7 +2465,7 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column) case TableClusterInfo: fullRows, err = dataForTiDBClusterInfo(ctx) case tableTiFlashReplica: - fullRows = dataForTableTiFlashReplica(dbs) + fullRows = dataForTableTiFlashReplica(ctx, dbs) // Data for cluster memory table. case clusterTableSlowLog, clusterTableProcesslist: fullRows, err = getClusterMemTableRows(ctx, it.meta.Name.O) diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index b0a027cc8646a..f118c6ea5adcb 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -967,11 +967,11 @@ func (s *testTableSuite) TestForTableTiFlashReplica(c *C) { tk.MustExec("drop table if exists t") tk.MustExec("create table t (a int, b int, index idx(a))") tk.MustExec("alter table t set tiflash replica 2 location labels 'a','b';") - tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0")) + tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE, PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 0 0")) tbl, err := domain.GetDomain(tk.Se).InfoSchema().TableByName(model.NewCIStr("test"), model.NewCIStr("t")) c.Assert(err, IsNil) tbl.Meta().TiFlashReplica.Available = true - tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1")) + tk.MustQuery("select TABLE_SCHEMA,TABLE_NAME,REPLICA_COUNT,LOCATION_LABELS,AVAILABLE, PROGRESS from information_schema.tiflash_replica").Check(testkit.Rows("test t 2 a,b 1 1")) } func (s *testClusterTableSuite) TestForClusterServerInfo(c *C) { diff --git a/server/http_handler.go b/server/http_handler.go index 04534377117ce..8b4ed84f0af20 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -779,10 +779,20 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http writeError(w, err) return } - err = do.DDL().UpdateTableReplicaInfo(s, status.ID, status.checkTableFlashReplicaAvailable()) + available := status.checkTableFlashReplicaAvailable() + err = do.DDL().UpdateTableReplicaInfo(s, status.ID, available) if err != nil { writeError(w, err) } + if available { + err = infosync.DeleteTiFlashTableSyncProgress(status.ID) + } else { + err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, float64(status.RegionCount)/float64(status.FlashRegionCount)) + } + if err != nil { + writeError(w, err) + } + logutil.BgLogger().Info("handle flash replica report", zap.Int64("table ID", status.ID), zap.Uint64("region count", status.RegionCount), zap.Uint64("flash region count", status.FlashRegionCount), From a1f45a38f0c4cb4f5673a93a399d7859dc076aca Mon Sep 17 00:00:00 2001 From: crazycs Date: Mon, 10 Feb 2020 21:27:37 +0800 Subject: [PATCH 2/6] make ci happy Signed-off-by: crazycs --- domain/infosync/info.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index f79f36b826b39..a8522bb71acf9 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -197,7 +197,7 @@ func DeleteTiFlashTableSyncProgress(tid int64) error { return util.DeleteKeyFromEtcd(key, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout) } -// DeleteTiFlashTableSyncProgress uses to delete the tiflash table replica sync progress. +// GetTiFlashTableSyncProgress uses to get all the tiflash table replica sync progress. func GetTiFlashTableSyncProgress(ctx context.Context) (map[int64]float64, error) { is, err := getGlobalInfoSyncer() if err != nil { From 9a85c8559c991fe9ffb049cf3015ba1efea62e94 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 12 Feb 2020 10:24:05 +0800 Subject: [PATCH 3/6] address comment and support partition table Signed-off-by: crazycs --- domain/infosync/info.go | 6 +----- infoschema/tables.go | 11 ++++++++++- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index a8522bb71acf9..71ac97c82cf91 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -208,16 +208,12 @@ func GetTiFlashTableSyncProgress(ctx context.Context) (map[int64]float64, error) return progressMap, nil } for i := 0; i < keyOpDefaultRetryCnt; i++ { - resp, err := is.etcdCli.Get(ctx, TiFlashTableSyncProgressPath, clientv3.WithPrefix()) + resp, err := is.etcdCli.Get(ctx, TiFlashTableSyncProgressPath+"/", clientv3.WithPrefix()) if err != nil { logutil.BgLogger().Info("get tiflash table replica sync progress failed, continue checking.", zap.Error(err)) continue } for _, kv := range resp.Kvs { - if len(kv.Key) <= len(TiFlashTableSyncProgressPath)+1 { - logutil.BgLogger().Info("invalid tiflash table replica sync progress key.", zap.String("key", string(kv.Key))) - continue - } tid, err := strconv.ParseInt(string(kv.Key[len(TiFlashTableSyncProgressPath)+1:]), 10, 64) if err != nil { logutil.BgLogger().Info("invalid tiflash table replica sync progress key.", zap.String("key", string(kv.Key))) diff --git a/infoschema/tables.go b/infoschema/tables.go index ca451697fb559..e93b3e2a8e655 100755 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -2290,7 +2290,16 @@ func dataForTableTiFlashReplica(ctx sessionctx.Context, schemas []*model.DBInfo) } progress := 1.0 if !tbl.TiFlashReplica.Available { - progress = progressMap[tbl.ID] + if pi := tbl.GetPartitionInfo(); pi != nil && len(pi.Definitions) > 0 { + progress = 0 + for _, p := range pi.Definitions { + // TODO: need check partition replica available. + progress += progressMap[p.ID] + } + progress = progress / float64(len(pi.Definitions)) + } else { + progress = progressMap[tbl.ID] + } } record := types.MakeDatums( schema.Name.O, // TABLE_SCHEMA From 64abe0933755bc54b453b368b55cf7c548d56fcd Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 12 Feb 2020 18:50:22 +0800 Subject: [PATCH 4/6] address comment Signed-off-by: crazycs --- domain/infosync/info.go | 4 ++-- server/http_handler.go | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 71ac97c82cf91..09d21f4c97c4b 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -171,7 +171,7 @@ func GetAllServerInfo(ctx context.Context) (map[string]*ServerInfo, error) { return is.getAllServerInfo(ctx) } -// UpdateTiFlashTableSyncProgress uses to update the tiflash table replica sync progress. +// UpdateTiFlashTableSyncProgress is used to update the tiflash table replica sync progress. func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progress float64) error { is, err := getGlobalInfoSyncer() if err != nil { @@ -184,7 +184,7 @@ func UpdateTiFlashTableSyncProgress(ctx context.Context, tid int64, progress flo return util.PutKVToEtcd(ctx, is.etcdCli, keyOpDefaultRetryCnt, key, strconv.FormatFloat(progress, 'f', 2, 64)) } -// DeleteTiFlashTableSyncProgress uses to delete the tiflash table replica sync progress. +// DeleteTiFlashTableSyncProgress is used to delete the tiflash table replica sync progress. func DeleteTiFlashTableSyncProgress(tid int64) error { is, err := getGlobalInfoSyncer() if err != nil { diff --git a/server/http_handler.go b/server/http_handler.go index 8b4ed84f0af20..c76215c161208 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -752,8 +752,10 @@ func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) type tableFlashReplicaStatus struct { // Modifying the field name needs to negotiate with TiFlash colleague. - ID int64 `json:"id"` - RegionCount uint64 `json:"region_count"` + ID int64 `json:"id"` + // RegionCount is the total resion number that need sync. + RegionCount uint64 `json:"region_count"` + // FlashRegionCount is the regions number that already sync completed. FlashRegionCount uint64 `json:"flash_region_count"` } @@ -787,7 +789,7 @@ func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http if available { err = infosync.DeleteTiFlashTableSyncProgress(status.ID) } else { - err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, float64(status.RegionCount)/float64(status.FlashRegionCount)) + err = infosync.UpdateTiFlashTableSyncProgress(context.Background(), status.ID, float64(status.FlashRegionCount)/float64(status.RegionCount)) } if err != nil { writeError(w, err) From 1a531af4395295bbb1352cd85bc74e4bdb5fb093 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 12 Feb 2020 19:06:12 +0800 Subject: [PATCH 5/6] Update server/http_handler.go Co-Authored-By: Arenatlx --- server/http_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/http_handler.go b/server/http_handler.go index c76215c161208..a4cd41d340456 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -753,7 +753,7 @@ func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) type tableFlashReplicaStatus struct { // Modifying the field name needs to negotiate with TiFlash colleague. ID int64 `json:"id"` - // RegionCount is the total resion number that need sync. + // RegionCount is the number of regions that need sync. RegionCount uint64 `json:"region_count"` // FlashRegionCount is the regions number that already sync completed. FlashRegionCount uint64 `json:"flash_region_count"` From 2b23b620634d25d22e7f808e1ca3f52499bff1a4 Mon Sep 17 00:00:00 2001 From: crazycs Date: Wed, 12 Feb 2020 19:06:20 +0800 Subject: [PATCH 6/6] Update server/http_handler.go Co-Authored-By: Arenatlx --- server/http_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/http_handler.go b/server/http_handler.go index a4cd41d340456..7e03f58fc1782 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -755,7 +755,7 @@ type tableFlashReplicaStatus struct { ID int64 `json:"id"` // RegionCount is the number of regions that need sync. RegionCount uint64 `json:"region_count"` - // FlashRegionCount is the regions number that already sync completed. + // FlashRegionCount is the number of regions that already sync completed. FlashRegionCount uint64 `json:"flash_region_count"` }