From 7afe1dbb3b4132211ce2b5fafd904846c7862e10 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 8 Aug 2023 10:28:57 +0800 Subject: [PATCH 1/7] *: support upgrade start/finish APIs --- server/handler/tests/http_handler_test.go | 78 ++++++++++++++++++++ server/http_handler.go | 87 +++++++++++++++++++++++ server/http_status.go | 3 + session/bootstrap.go | 36 +++++++--- 4 files changed, 196 insertions(+), 8 deletions(-) diff --git a/server/handler/tests/http_handler_test.go b/server/handler/tests/http_handler_test.go index ddbe5d8b4e96d..592e9329c25fc 100644 --- a/server/handler/tests/http_handler_test.go +++ b/server/handler/tests/http_handler_test.go @@ -1241,3 +1241,81 @@ func TestSetLabelsConcurrentWithGetLabel(t *testing.T) { conf.Labels = map[string]string{} }) } + +func TestUpgrade(t *testing.T) { + ts := createBasicHTTPHandlerTestSuite() + ts.startServer(t) + defer ts.stopServer(t) + + // test /upgrade/start + resp, err := ts.FetchStatus("/upgrade/start") + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + b, err := httputil.DumpResponse(resp, true) + require.NoError(t, err) + require.Greater(t, len(b), 0) + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.Equal(t, "\"success!\"", string(body)) + // check the result + se, err := session.CreateSession(ts.store) + require.NoError(t, err) + isUpgrading, err := session.IsUpgrading(se) + require.NoError(t, err) + require.True(t, isUpgrading) + + // Do start upgrade again. + resp, err = ts.FetchStatus("/upgrade/start") + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + b, err = httputil.DumpResponse(resp, true) + require.NoError(t, err) + require.Greater(t, len(b), 0) + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.Equal(t, "\"Be upgrading.\"\"success!\"", string(body)) + // check the result + se, err = session.CreateSession(ts.store) + require.NoError(t, err) + isUpgrading, err = session.IsUpgrading(se) + require.NoError(t, err) + require.True(t, isUpgrading) + + // test /upgrade/finish + resp, err = ts.FetchStatus("/upgrade/finish") + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + b, err = httputil.DumpResponse(resp, true) + require.NoError(t, err) + require.Greater(t, len(b), 0) + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.Equal(t, "\"success!\"", string(body)) + // check the result + se, err = session.CreateSession(ts.store) + require.NoError(t, err) + isUpgrading, err = session.IsUpgrading(se) + require.NoError(t, err) + require.False(t, isUpgrading) + + // Do finish upgrade again. + resp, err = ts.FetchStatus("/upgrade/finish") + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + b, err = httputil.DumpResponse(resp, true) + require.NoError(t, err) + require.Greater(t, len(b), 0) + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) + require.Equal(t, "\"Be normal.\"\"success!\"", string(body)) + // check the result + se, err = session.CreateSession(ts.store) + require.NoError(t, err) + isUpgrading, err = session.IsUpgrading(se) + require.NoError(t, err) + require.False(t, isUpgrading) +} diff --git a/server/http_handler.go b/server/http_handler.go index d10e99af3314b..7fdb828c0047e 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -15,11 +15,17 @@ package server import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/pingcap/errors" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/server/handler" "github.com/pingcap/tidb/server/handler/optimizor" + "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/helper" ) @@ -65,3 +71,84 @@ func (s *Server) newPlanReplayerHandler() *optimizor.PlanReplayerHandler { } return optimizor.NewPlanReplayerHandler(is, statsHandle, infoGetter, cfg.AdvertiseAddress, cfg.Status.StatusPort) } + +// ClusterUpgradeHandler is the handler for upgrading cluster. +type ClusterUpgradeHandler struct { + store kv.Storage +} + +// NewClusterUpgradeHandler creates a new ClusterUpgradeHandler. +func NewClusterUpgradeHandler(store kv.Storage) *ClusterUpgradeHandler { + return &ClusterUpgradeHandler{store: store} +} + +// ServeHTTP handles request of ddl server info. +func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // parse params + params := mux.Vars(req) + + var err error + var hasDone bool + op := params[handler.Operation] + switch op { + case "start": + hasDone, err = h.startUpgrade() + case "finish": + hasDone, err = h.finishUpgrade() + default: + handler.WriteError(w, errors.Errorf("wrong operation:%s", op)) + return + } + + if err != nil { + handler.WriteError(w, err) + return + } + if hasDone { + switch op { + case "start": + handler.WriteData(w, "Be upgrading.") + case "finish": + handler.WriteData(w, "Be normal.") + } + } + handler.WriteData(w, "success!") +} + +func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) { + se, err := session.CreateSession(h.store) + if err != nil { + return false, err + } + defer se.Close() + + isUpgrading, err := session.IsUpgrading(se) + if err != nil { + return false, err + } + if isUpgrading { + return true, nil + } + + err = session.SyncUpgradeState(se) + return false, err +} + +func (h ClusterUpgradeHandler) finishUpgrade() (hasDone bool, err error) { + se, err := session.CreateSession(h.store) + if err != nil { + return false, err + } + defer se.Close() + + isUpgrading, err := session.IsUpgrading(se) + if err != nil { + return false, err + } + if !isUpgrading { + return true, nil + } + + err = session.SyncNormalRunning(se) + return false, err +} diff --git a/server/http_status.go b/server/http_status.go index 20bc5299b245d..545a8b2b2c55d 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -246,6 +246,9 @@ func (s *Server) startHTTPServer() { // HTTP path for get table tiflash replica info. router.Handle("/tiflash/replica-deprecated", tikvhandler.NewFlashReplicaHandler(tikvHandlerTool)) + // + router.Handle("/upgrade/{op}", NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations") + if s.cfg.Store == "tikv" { // HTTP path for tikv. router.Handle("/tables/{db}/{table}/regions", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRegions)) diff --git a/session/bootstrap.go b/session/bootstrap.go index 90a7d22b80899..cc4668d9aaf14 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -1209,7 +1209,7 @@ func upgrade(s Session) { } if ver >= int64(SupportUpgradeStateVer) { - syncUpgradeState(s) + terror.MustNil(SyncUpgradeState(s)) } if isNull { upgradeToVer99Before(s) @@ -1224,7 +1224,7 @@ func upgrade(s Session) { upgradeToVer99After(s) } if ver >= int64(SupportUpgradeStateVer) { - syncNormalRunning(s) + terror.MustNil(SyncNormalRunning(s)) } variable.DDLForce2Queue.Store(false) @@ -1253,14 +1253,16 @@ func upgrade(s Session) { } } -func syncUpgradeState(s Session) { +// SyncUpgradeState syncs upgrade state to etcd. +func SyncUpgradeState(s Session) error { totalInterval := time.Duration(internalSQLTimeout) * time.Second ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval) defer cancelFunc() dom := domain.GetDomain(s) err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading)) if err != nil { - logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err)) + logutil.BgLogger().Error("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err)) + return err } interval := 200 * time.Millisecond @@ -1271,7 +1273,8 @@ func syncUpgradeState(s Session) { break } if i == retryTimes-1 { - logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + logutil.BgLogger().Error("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + return err } if i%10 == 0 { logutil.BgLogger().Warn("get owner op failed", zap.String("category", "upgrading"), zap.Stringer("state", op), zap.Error(err)) @@ -1298,15 +1301,18 @@ func syncUpgradeState(s Session) { } if i == retryTimes-1 { - logutil.BgLogger().Fatal("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err)) + logutil.BgLogger().Error("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err)) + return err } logutil.BgLogger().Warn("pause all jobs failed", zap.String("category", "upgrading"), zap.Strings("errs", jobErrStrs), zap.Error(err)) time.Sleep(interval) } logutil.BgLogger().Info("update global state to upgrading", zap.String("category", "upgrading"), zap.String("state", syncer.StateUpgrading)) + return nil } -func syncNormalRunning(s Session) { +// SyncNormalRunning syncs normal state to etcd. +func SyncNormalRunning(s Session) error { failpoint.Inject("mockResumeAllJobsFailed", func(val failpoint.Value) { if val.(bool) { dom := domain.GetDomain(s) @@ -1329,9 +1335,23 @@ func syncNormalRunning(s Session) { dom := domain.GetDomain(s) err = dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateNormalRunning)) if err != nil { - logutil.BgLogger().Fatal("[upgrading] update global state to normal failed", zap.Error(err)) + logutil.BgLogger().Error("[upgrading] update global state to normal failed", zap.Error(err)) + return err } logutil.BgLogger().Info("update global state to normal running finished", zap.String("category", "upgrading")) + return nil +} + +func IsUpgrading(s Session) (bool, error) { + dom := domain.GetDomain(s) + ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) + defer cancelFunc() + stateInfo, err := dom.DDL().StateSyncer().GetGlobalState(ctx) + if err != nil { + return false, err + } + + return stateInfo.State == syncer.StateUpgrading, nil } // checkOwnerVersion is used to wait the DDL owner to be elected in the cluster and check it is the same version as this TiDB. From cd5ce6981cfd0b24af87439e43de7f99887ea4b0 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 8 Aug 2023 10:57:38 +0800 Subject: [PATCH 2/7] *: make nogo happy --- server/handler/tests/BUILD.bazel | 2 +- server/handler/tests/http_handler_test.go | 8 ++++---- server/http_handler.go | 4 ++-- session/bootstrap.go | 5 +++-- 4 files changed, 10 insertions(+), 9 deletions(-) diff --git a/server/handler/tests/BUILD.bazel b/server/handler/tests/BUILD.bazel index d8d4cde458cd4..1b0b5d772edec 100644 --- a/server/handler/tests/BUILD.bazel +++ b/server/handler/tests/BUILD.bazel @@ -9,7 +9,7 @@ go_test( "main_test.go", ], flaky = True, - shard_count = 35, + shard_count = 36, deps = [ "//config", "//ddl", diff --git a/server/handler/tests/http_handler_test.go b/server/handler/tests/http_handler_test.go index 592e9329c25fc..8d1d691b028ec 100644 --- a/server/handler/tests/http_handler_test.go +++ b/server/handler/tests/http_handler_test.go @@ -1261,7 +1261,7 @@ func TestUpgrade(t *testing.T) { // check the result se, err := session.CreateSession(ts.store) require.NoError(t, err) - isUpgrading, err := session.IsUpgrading(se) + isUpgrading, err := session.IsUpgradingClusterState(se) require.NoError(t, err) require.True(t, isUpgrading) @@ -1279,7 +1279,7 @@ func TestUpgrade(t *testing.T) { // check the result se, err = session.CreateSession(ts.store) require.NoError(t, err) - isUpgrading, err = session.IsUpgrading(se) + isUpgrading, err = session.IsUpgradingClusterState(se) require.NoError(t, err) require.True(t, isUpgrading) @@ -1297,7 +1297,7 @@ func TestUpgrade(t *testing.T) { // check the result se, err = session.CreateSession(ts.store) require.NoError(t, err) - isUpgrading, err = session.IsUpgrading(se) + isUpgrading, err = session.IsUpgradingClusterState(se) require.NoError(t, err) require.False(t, isUpgrading) @@ -1315,7 +1315,7 @@ func TestUpgrade(t *testing.T) { // check the result se, err = session.CreateSession(ts.store) require.NoError(t, err) - isUpgrading, err = session.IsUpgrading(se) + isUpgrading, err = session.IsUpgradingClusterState(se) require.NoError(t, err) require.False(t, isUpgrading) } diff --git a/server/http_handler.go b/server/http_handler.go index 7fdb828c0047e..c4858fbe04dd3 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -122,7 +122,7 @@ func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) { } defer se.Close() - isUpgrading, err := session.IsUpgrading(se) + isUpgrading, err := session.IsUpgradingClusterState(se) if err != nil { return false, err } @@ -141,7 +141,7 @@ func (h ClusterUpgradeHandler) finishUpgrade() (hasDone bool, err error) { } defer se.Close() - isUpgrading, err := session.IsUpgrading(se) + isUpgrading, err := session.IsUpgradingClusterState(se) if err != nil { return false, err } diff --git a/session/bootstrap.go b/session/bootstrap.go index cc4668d9aaf14..e03535ee419fa 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -1318,7 +1318,7 @@ func SyncNormalRunning(s Session) error { dom := domain.GetDomain(s) //nolint: errcheck dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), syncer.NewStateInfo(syncer.StateNormalRunning)) - failpoint.Return() + failpoint.Return(nil) } }) @@ -1342,7 +1342,8 @@ func SyncNormalRunning(s Session) error { return nil } -func IsUpgrading(s Session) (bool, error) { +// IsUpgradingClusterState checks whether the global state is upgrading. +func IsUpgradingClusterState(s Session) (bool, error) { dom := domain.GetDomain(s) ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second) defer cancelFunc() From a766cf5e7464f020ddc84f9a01b2da3c7a8b6641 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 8 Aug 2023 11:50:48 +0800 Subject: [PATCH 3/7] *: move upgrade handler to server/handler --- server/handler/BUILD.bazel | 2 + server/handler/upgradehandler.go | 91 ++++++++++++++++++++++++++++++++ server/http_handler.go | 87 ------------------------------ server/http_status.go | 5 +- 4 files changed, 96 insertions(+), 89 deletions(-) create mode 100644 server/handler/upgradehandler.go diff --git a/server/handler/BUILD.bazel b/server/handler/BUILD.bazel index 82af4c8affc86..846c308ac6cb8 100644 --- a/server/handler/BUILD.bazel +++ b/server/handler/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "handler", srcs = [ "tikv_handler.go", + "upgradehandler.go", "util.go", ], importpath = "github.com/pingcap/tidb/server/handler", @@ -22,6 +23,7 @@ go_library( "//tablecodec", "//types", "//util/codec", + "@com_github_gorilla_mux//:mux", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/kvrpcpb", diff --git a/server/handler/upgradehandler.go b/server/handler/upgradehandler.go new file mode 100644 index 0000000000000..67dbbb09d5b48 --- /dev/null +++ b/server/handler/upgradehandler.go @@ -0,0 +1,91 @@ +package handler + +import ( + "net/http" + + "github.com/gorilla/mux" + "github.com/pingcap/errors" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tidb/session" +) + +// ClusterUpgradeHandler is the handler for upgrading cluster. +type ClusterUpgradeHandler struct { + store kv.Storage +} + +// NewClusterUpgradeHandler creates a new ClusterUpgradeHandler. +func NewClusterUpgradeHandler(store kv.Storage) *ClusterUpgradeHandler { + return &ClusterUpgradeHandler{store: store} +} + +// ServeHTTP handles request of ddl server info. +func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + // parse params + params := mux.Vars(req) + + var err error + var hasDone bool + op := params[Operation] + switch op { + case "start": + hasDone, err = h.startUpgrade() + case "finish": + hasDone, err = h.finishUpgrade() + default: + WriteError(w, errors.Errorf("wrong operation:%s", op)) + return + } + + if err != nil { + WriteError(w, err) + return + } + if hasDone { + switch op { + case "start": + WriteData(w, "Be upgrading.") + case "finish": + WriteData(w, "Be normal.") + } + } + WriteData(w, "success!") +} + +func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) { + se, err := session.CreateSession(h.store) + if err != nil { + return false, err + } + defer se.Close() + + isUpgrading, err := session.IsUpgradingClusterState(se) + if err != nil { + return false, err + } + if isUpgrading { + return true, nil + } + + err = session.SyncUpgradeState(se) + return false, err +} + +func (h ClusterUpgradeHandler) finishUpgrade() (hasDone bool, err error) { + se, err := session.CreateSession(h.store) + if err != nil { + return false, err + } + defer se.Close() + + isUpgrading, err := session.IsUpgradingClusterState(se) + if err != nil { + return false, err + } + if !isUpgrading { + return true, nil + } + + err = session.SyncNormalRunning(se) + return false, err +} diff --git a/server/http_handler.go b/server/http_handler.go index c4858fbe04dd3..d10e99af3314b 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -15,17 +15,11 @@ package server import ( - "net/http" - - "github.com/gorilla/mux" - "github.com/pingcap/errors" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/server/handler" "github.com/pingcap/tidb/server/handler/optimizor" - "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/store/helper" ) @@ -71,84 +65,3 @@ func (s *Server) newPlanReplayerHandler() *optimizor.PlanReplayerHandler { } return optimizor.NewPlanReplayerHandler(is, statsHandle, infoGetter, cfg.AdvertiseAddress, cfg.Status.StatusPort) } - -// ClusterUpgradeHandler is the handler for upgrading cluster. -type ClusterUpgradeHandler struct { - store kv.Storage -} - -// NewClusterUpgradeHandler creates a new ClusterUpgradeHandler. -func NewClusterUpgradeHandler(store kv.Storage) *ClusterUpgradeHandler { - return &ClusterUpgradeHandler{store: store} -} - -// ServeHTTP handles request of ddl server info. -func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - // parse params - params := mux.Vars(req) - - var err error - var hasDone bool - op := params[handler.Operation] - switch op { - case "start": - hasDone, err = h.startUpgrade() - case "finish": - hasDone, err = h.finishUpgrade() - default: - handler.WriteError(w, errors.Errorf("wrong operation:%s", op)) - return - } - - if err != nil { - handler.WriteError(w, err) - return - } - if hasDone { - switch op { - case "start": - handler.WriteData(w, "Be upgrading.") - case "finish": - handler.WriteData(w, "Be normal.") - } - } - handler.WriteData(w, "success!") -} - -func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) { - se, err := session.CreateSession(h.store) - if err != nil { - return false, err - } - defer se.Close() - - isUpgrading, err := session.IsUpgradingClusterState(se) - if err != nil { - return false, err - } - if isUpgrading { - return true, nil - } - - err = session.SyncUpgradeState(se) - return false, err -} - -func (h ClusterUpgradeHandler) finishUpgrade() (hasDone bool, err error) { - se, err := session.CreateSession(h.store) - if err != nil { - return false, err - } - defer se.Close() - - isUpgrading, err := session.IsUpgradingClusterState(se) - if err != nil { - return false, err - } - if !isUpgrading { - return true, nil - } - - err = session.SyncNormalRunning(se) - return false, err -} diff --git a/server/http_status.go b/server/http_status.go index 545a8b2b2c55d..7f6a2aa0126a7 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -44,6 +44,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/server/handler" "github.com/pingcap/tidb/server/handler/optimizor" "github.com/pingcap/tidb/server/handler/tikvhandler" "github.com/pingcap/tidb/server/handler/ttlhandler" @@ -246,8 +247,8 @@ func (s *Server) startHTTPServer() { // HTTP path for get table tiflash replica info. router.Handle("/tiflash/replica-deprecated", tikvhandler.NewFlashReplicaHandler(tikvHandlerTool)) - // - router.Handle("/upgrade/{op}", NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations") + // HTTP path for upgrade operations. + router.Handle("/upgrade/{op}", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations") if s.cfg.Store == "tikv" { // HTTP path for tikv. From 757c19e8321bd4b23f13482cec325dd5564e43a2 Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 8 Aug 2023 14:01:52 +0800 Subject: [PATCH 4/7] server: address a comment --- server/handler/upgradehandler.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/server/handler/upgradehandler.go b/server/handler/upgradehandler.go index 67dbbb09d5b48..a86f990830e03 100644 --- a/server/handler/upgradehandler.go +++ b/server/handler/upgradehandler.go @@ -1,3 +1,17 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package handler import ( From 6049691d3a8e06bfd3b66019091d14cab2ee905a Mon Sep 17 00:00:00 2001 From: Lynn Date: Tue, 15 Aug 2023 10:48:50 +0800 Subject: [PATCH 5/7] *: address a comment --- server/handler/BUILD.bazel | 3 ++- server/handler/tests/http_handler_test.go | 19 +++++++++++++------ server/handler/upgradehandler.go | 13 +++++++++---- server/http_status.go | 2 +- 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/server/handler/BUILD.bazel b/server/handler/BUILD.bazel index 846c308ac6cb8..d443361eb22e6 100644 --- a/server/handler/BUILD.bazel +++ b/server/handler/BUILD.bazel @@ -23,11 +23,12 @@ go_library( "//tablecodec", "//types", "//util/codec", - "@com_github_gorilla_mux//:mux", + "//util/logutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_pingcap_kvproto//pkg/metapb", "@com_github_tikv_client_go_v2//tikv", + "@org_uber_go_zap//:zap", ], ) diff --git a/server/handler/tests/http_handler_test.go b/server/handler/tests/http_handler_test.go index 8d1d691b028ec..fa6000261c093 100644 --- a/server/handler/tests/http_handler_test.go +++ b/server/handler/tests/http_handler_test.go @@ -1247,8 +1247,15 @@ func TestUpgrade(t *testing.T) { ts.startServer(t) defer ts.stopServer(t) - // test /upgrade/start - resp, err := ts.FetchStatus("/upgrade/start") + resp, err := ts.FetchStatus("/upgrade") + require.NoError(t, err) + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + + require.NoError(t, err) + require.NotNil(t, resp) + // test upgrade start + resp, err = ts.PostStatus("/upgrade", "application/x-www-form-urlencoded", bytes.NewBuffer([]byte(`op=start`))) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) b, err := httputil.DumpResponse(resp, true) @@ -1266,7 +1273,7 @@ func TestUpgrade(t *testing.T) { require.True(t, isUpgrading) // Do start upgrade again. - resp, err = ts.FetchStatus("/upgrade/start") + resp, err = ts.PostStatus("/upgrade", "application/x-www-form-urlencoded", bytes.NewBuffer([]byte(`op=start`))) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) b, err = httputil.DumpResponse(resp, true) @@ -1283,8 +1290,8 @@ func TestUpgrade(t *testing.T) { require.NoError(t, err) require.True(t, isUpgrading) - // test /upgrade/finish - resp, err = ts.FetchStatus("/upgrade/finish") + // test upgrade finish + resp, err = ts.PostStatus("/upgrade", "application/x-www-form-urlencoded", bytes.NewBuffer([]byte(`op=finish`))) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) b, err = httputil.DumpResponse(resp, true) @@ -1302,7 +1309,7 @@ func TestUpgrade(t *testing.T) { require.False(t, isUpgrading) // Do finish upgrade again. - resp, err = ts.FetchStatus("/upgrade/finish") + resp, err = ts.PostStatus("/upgrade", "application/x-www-form-urlencoded", bytes.NewBuffer([]byte(`op=finish`))) require.NoError(t, err) require.Equal(t, http.StatusOK, resp.StatusCode) b, err = httputil.DumpResponse(resp, true) diff --git a/server/handler/upgradehandler.go b/server/handler/upgradehandler.go index a86f990830e03..e3957b97568f2 100644 --- a/server/handler/upgradehandler.go +++ b/server/handler/upgradehandler.go @@ -17,10 +17,11 @@ package handler import ( "net/http" - "github.com/gorilla/mux" "github.com/pingcap/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/session" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) // ClusterUpgradeHandler is the handler for upgrading cluster. @@ -35,12 +36,14 @@ func NewClusterUpgradeHandler(store kv.Storage) *ClusterUpgradeHandler { // ServeHTTP handles request of ddl server info. func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - // parse params - params := mux.Vars(req) + if req.Method != http.MethodPost { + WriteError(w, errors.Errorf("This API only support POST method")) + return + } var err error var hasDone bool - op := params[Operation] + op := req.FormValue("op") switch op { case "start": hasDone, err = h.startUpgrade() @@ -64,6 +67,8 @@ func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques } } WriteData(w, "success!") + logutil.Logger(req.Context()).Info("[upgrading] upgrade op success", + zap.String("op", req.FormValue("op")), zap.Bool("hasDone", hasDone)) } func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) { diff --git a/server/http_status.go b/server/http_status.go index 7f6a2aa0126a7..4f9bd410322c6 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -248,7 +248,7 @@ func (s *Server) startHTTPServer() { router.Handle("/tiflash/replica-deprecated", tikvhandler.NewFlashReplicaHandler(tikvHandlerTool)) // HTTP path for upgrade operations. - router.Handle("/upgrade/{op}", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations") + router.Handle("/upgrade", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations") if s.cfg.Store == "tikv" { // HTTP path for tikv. From 179fa9d245b3f2cb2575cf3fd6627023c6fc9c15 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 16 Aug 2023 15:25:15 +0800 Subject: [PATCH 6/7] server: address comments --- server/handler/BUILD.bazel | 2 +- server/handler/{upgradehandler.go => upgrade_handler.go} | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) rename server/handler/{upgradehandler.go => upgrade_handler.go} (93%) diff --git a/server/handler/BUILD.bazel b/server/handler/BUILD.bazel index d443361eb22e6..22ce358a1897a 100644 --- a/server/handler/BUILD.bazel +++ b/server/handler/BUILD.bazel @@ -4,7 +4,7 @@ go_library( name = "handler", srcs = [ "tikv_handler.go", - "upgradehandler.go", + "upgrade_handler.go", "util.go", ], importpath = "github.com/pingcap/tidb/server/handler", diff --git a/server/handler/upgradehandler.go b/server/handler/upgrade_handler.go similarity index 93% rename from server/handler/upgradehandler.go rename to server/handler/upgrade_handler.go index e3957b97568f2..51c13cfb40c17 100644 --- a/server/handler/upgradehandler.go +++ b/server/handler/upgrade_handler.go @@ -67,8 +67,8 @@ func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques } } WriteData(w, "success!") - logutil.Logger(req.Context()).Info("[upgrading] upgrade op success", - zap.String("op", req.FormValue("op")), zap.Bool("hasDone", hasDone)) + logutil.Logger(req.Context()).Info("upgrade op success", + zap.String("category", "upgrading"), zap.String("op", req.FormValue("op")), zap.Bool("hasDone", hasDone)) } func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) { From 521fe547f51559128631b5a5827bd2db09b98320 Mon Sep 17 00:00:00 2001 From: Lynn Date: Wed, 16 Aug 2023 15:31:07 +0800 Subject: [PATCH 7/7] session: update logs --- session/bootstrap.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/session/bootstrap.go b/session/bootstrap.go index e03535ee419fa..d31820a4d6a12 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -1261,7 +1261,7 @@ func SyncUpgradeState(s Session) error { dom := domain.GetDomain(s) err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading)) if err != nil { - logutil.BgLogger().Error("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err)) + logutil.BgLogger().Error("update global state failed", zap.String("category", "upgrading"), zap.String("state", syncer.StateUpgrading), zap.Error(err)) return err } @@ -1273,7 +1273,7 @@ func SyncUpgradeState(s Session) error { break } if i == retryTimes-1 { - logutil.BgLogger().Error("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err)) + logutil.BgLogger().Error("get owner op failed", zap.String("category", "upgrading"), zap.Stringer("state", op), zap.Error(err)) return err } if i%10 == 0 { @@ -1301,7 +1301,7 @@ func SyncUpgradeState(s Session) error { } if i == retryTimes-1 { - logutil.BgLogger().Error("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err)) + logutil.BgLogger().Error("pause all jobs failed", zap.String("category", "upgrading"), zap.Strings("errs", jobErrStrs), zap.Error(err)) return err } logutil.BgLogger().Warn("pause all jobs failed", zap.String("category", "upgrading"), zap.Strings("errs", jobErrStrs), zap.Error(err)) @@ -1335,7 +1335,7 @@ func SyncNormalRunning(s Session) error { dom := domain.GetDomain(s) err = dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateNormalRunning)) if err != nil { - logutil.BgLogger().Error("[upgrading] update global state to normal failed", zap.Error(err)) + logutil.BgLogger().Error("update global state to normal failed", zap.String("category", "upgrading"), zap.Error(err)) return err } logutil.BgLogger().Info("update global state to normal running finished", zap.String("category", "upgrading"))