Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support /upgrade/start and /upgrade/finish APIs #45887

Merged
merged 7 commits into from
Aug 16, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/handler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_library(
name = "handler",
srcs = [
"tikv_handler.go",
"upgradehandler.go",
"util.go",
],
importpath = "github.com/pingcap/tidb/server/handler",
Expand All @@ -22,6 +23,7 @@ go_library(
"//tablecodec",
"//types",
"//util/codec",
"@com_github_gorilla_mux//:mux",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
Expand Down
2 changes: 1 addition & 1 deletion server/handler/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 35,
shard_count = 36,
deps = [
"//config",
"//ddl",
Expand Down
78 changes: 78 additions & 0 deletions server/handler/tests/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,3 +1241,81 @@ func TestSetLabelsConcurrentWithGetLabel(t *testing.T) {
conf.Labels = map[string]string{}
})
}

func TestUpgrade(t *testing.T) {
	ts := createBasicHTTPHandlerTestSuite()
	ts.startServer(t)
	defer ts.stopServer(t)

	// fetchUpgradeStatus issues a GET against the given upgrade path,
	// checks the HTTP status, and returns the raw response body.
	fetchUpgradeStatus := func(path string) string {
		resp, err := ts.FetchStatus(path)
		require.NoError(t, err)
		require.Equal(t, http.StatusOK, resp.StatusCode)
		b, err := httputil.DumpResponse(resp, true)
		require.NoError(t, err)
		require.Greater(t, len(b), 0)
		body, err := io.ReadAll(resp.Body)
		require.NoError(t, err)
		require.NoError(t, resp.Body.Close())
		return string(body)
	}
	// checkClusterState asserts whether the global cluster state is upgrading.
	checkClusterState := func(expectUpgrading bool) {
		se, err := session.CreateSession(ts.store)
		require.NoError(t, err)
		// Close the session so repeated checks don't leak sessions.
		defer se.Close()
		isUpgrading, err := session.IsUpgradingClusterState(se)
		require.NoError(t, err)
		require.Equal(t, expectUpgrading, isUpgrading)
	}

	// test /upgrade/start
	require.Equal(t, "\"success!\"", fetchUpgradeStatus("/upgrade/start"))
	checkClusterState(true)

	// Do start upgrade again: the handler reports that the cluster is
	// already upgrading, then still succeeds.
	require.Equal(t, "\"Be upgrading.\"\"success!\"", fetchUpgradeStatus("/upgrade/start"))
	checkClusterState(true)

	// test /upgrade/finish
	require.Equal(t, "\"success!\"", fetchUpgradeStatus("/upgrade/finish"))
	checkClusterState(false)

	// Do finish upgrade again: the handler reports that the cluster is
	// already normal, then still succeeds.
	require.Equal(t, "\"Be normal.\"\"success!\"", fetchUpgradeStatus("/upgrade/finish"))
	checkClusterState(false)
}
105 changes: 105 additions & 0 deletions server/handler/upgradehandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package handler
tangenta marked this conversation as resolved.
Show resolved Hide resolved

import (
"net/http"

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/session"
)

// ClusterUpgradeHandler is the HTTP handler for the cluster upgrade
// operations exposed at /upgrade/{op} ("start" and "finish").
type ClusterUpgradeHandler struct {
	// store is used to create sessions for reading/updating the global
	// cluster upgrade state.
	store kv.Storage
}

// NewClusterUpgradeHandler creates a new ClusterUpgradeHandler backed by the
// given storage.
func NewClusterUpgradeHandler(store kv.Storage) *ClusterUpgradeHandler {
	return &ClusterUpgradeHandler{store: store}
}

// ServeHTTP handles request of ddl server info.
func (h ClusterUpgradeHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// parse params
params := mux.Vars(req)

var err error
var hasDone bool
op := params[Operation]
switch op {
case "start":
hasDone, err = h.startUpgrade()
case "finish":
hasDone, err = h.finishUpgrade()
default:
WriteError(w, errors.Errorf("wrong operation:%s", op))
return

Check warning on line 51 in server/handler/upgradehandler.go

View check run for this annotation

Codecov / codecov/patch

server/handler/upgradehandler.go#L49-L51

Added lines #L49 - L51 were not covered by tests
}

if err != nil {
WriteError(w, err)
return
}

Check warning on line 57 in server/handler/upgradehandler.go

View check run for this annotation

Codecov / codecov/patch

server/handler/upgradehandler.go#L55-L57

Added lines #L55 - L57 were not covered by tests
if hasDone {
switch op {
case "start":
WriteData(w, "Be upgrading.")
case "finish":
WriteData(w, "Be normal.")
}
}
WriteData(w, "success!")
}

func (h ClusterUpgradeHandler) startUpgrade() (hasDone bool, err error) {
tangenta marked this conversation as resolved.
Show resolved Hide resolved
se, err := session.CreateSession(h.store)
if err != nil {
return false, err
}

Check warning on line 73 in server/handler/upgradehandler.go

View check run for this annotation

Codecov / codecov/patch

server/handler/upgradehandler.go#L72-L73

Added lines #L72 - L73 were not covered by tests
defer se.Close()

isUpgrading, err := session.IsUpgradingClusterState(se)
if err != nil {
return false, err
}

Check warning on line 79 in server/handler/upgradehandler.go

View check run for this annotation

Codecov / codecov/patch

server/handler/upgradehandler.go#L78-L79

Added lines #L78 - L79 were not covered by tests
if isUpgrading {
return true, nil
}

err = session.SyncUpgradeState(se)
return false, err
}

func (h ClusterUpgradeHandler) finishUpgrade() (hasDone bool, err error) {
se, err := session.CreateSession(h.store)
if err != nil {
return false, err
}

Check warning on line 92 in server/handler/upgradehandler.go

View check run for this annotation

Codecov / codecov/patch

server/handler/upgradehandler.go#L91-L92

Added lines #L91 - L92 were not covered by tests
defer se.Close()

isUpgrading, err := session.IsUpgradingClusterState(se)
if err != nil {
return false, err
}

Check warning on line 98 in server/handler/upgradehandler.go

View check run for this annotation

Codecov / codecov/patch

server/handler/upgradehandler.go#L97-L98

Added lines #L97 - L98 were not covered by tests
if !isUpgrading {
return true, nil
}

err = session.SyncNormalRunning(se)
return false, err
}
4 changes: 4 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/server/handler"
"github.com/pingcap/tidb/server/handler/optimizor"
"github.com/pingcap/tidb/server/handler/tikvhandler"
"github.com/pingcap/tidb/server/handler/ttlhandler"
Expand Down Expand Up @@ -246,6 +247,9 @@ func (s *Server) startHTTPServer() {
// HTTP path for get table tiflash replica info.
router.Handle("/tiflash/replica-deprecated", tikvhandler.NewFlashReplicaHandler(tikvHandlerTool))

// HTTP path for upgrade operations.
router.Handle("/upgrade/{op}", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations")

if s.cfg.Store == "tikv" {
// HTTP path for tikv.
router.Handle("/tables/{db}/{table}/regions", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRegions))
Expand Down
39 changes: 30 additions & 9 deletions session/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -1209,7 +1209,7 @@
}

if ver >= int64(SupportUpgradeStateVer) {
syncUpgradeState(s)
terror.MustNil(SyncUpgradeState(s))
}
if isNull {
upgradeToVer99Before(s)
Expand All @@ -1224,7 +1224,7 @@
upgradeToVer99After(s)
}
if ver >= int64(SupportUpgradeStateVer) {
syncNormalRunning(s)
terror.MustNil(SyncNormalRunning(s))
}

variable.DDLForce2Queue.Store(false)
Expand Down Expand Up @@ -1253,14 +1253,16 @@
}
}

func syncUpgradeState(s Session) {
// SyncUpgradeState syncs upgrade state to etcd.
func SyncUpgradeState(s Session) error {
totalInterval := time.Duration(internalSQLTimeout) * time.Second
ctx, cancelFunc := context.WithTimeout(context.Background(), totalInterval)
defer cancelFunc()
dom := domain.GetDomain(s)
err := dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateUpgrading))
if err != nil {
logutil.BgLogger().Fatal("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err))
logutil.BgLogger().Error("[upgrading] update global state failed", zap.String("state", syncer.StateUpgrading), zap.Error(err))
return err

Check warning on line 1265 in session/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

session/bootstrap.go#L1264-L1265

Added lines #L1264 - L1265 were not covered by tests
}

interval := 200 * time.Millisecond
Expand All @@ -1271,7 +1273,8 @@
break
}
if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
logutil.BgLogger().Error("[upgrading] get owner op failed", zap.Stringer("state", op), zap.Error(err))
return err

Check warning on line 1277 in session/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

session/bootstrap.go#L1276-L1277

Added lines #L1276 - L1277 were not covered by tests
}
if i%10 == 0 {
logutil.BgLogger().Warn("get owner op failed", zap.String("category", "upgrading"), zap.Stringer("state", op), zap.Error(err))
Expand All @@ -1298,21 +1301,24 @@
}

if i == retryTimes-1 {
logutil.BgLogger().Fatal("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
logutil.BgLogger().Error("[upgrading] pause all jobs failed", zap.Strings("errs", jobErrStrs), zap.Error(err))
return err

Check warning on line 1305 in session/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

session/bootstrap.go#L1304-L1305

Added lines #L1304 - L1305 were not covered by tests
}
logutil.BgLogger().Warn("pause all jobs failed", zap.String("category", "upgrading"), zap.Strings("errs", jobErrStrs), zap.Error(err))
time.Sleep(interval)
}
logutil.BgLogger().Info("update global state to upgrading", zap.String("category", "upgrading"), zap.String("state", syncer.StateUpgrading))
return nil
}

func syncNormalRunning(s Session) {
// SyncNormalRunning syncs normal state to etcd.
func SyncNormalRunning(s Session) error {
failpoint.Inject("mockResumeAllJobsFailed", func(val failpoint.Value) {
if val.(bool) {
dom := domain.GetDomain(s)
//nolint: errcheck
dom.DDL().StateSyncer().UpdateGlobalState(context.Background(), syncer.NewStateInfo(syncer.StateNormalRunning))
failpoint.Return()
failpoint.Return(nil)
}
})

Expand All @@ -1329,9 +1335,24 @@
dom := domain.GetDomain(s)
err = dom.DDL().StateSyncer().UpdateGlobalState(ctx, syncer.NewStateInfo(syncer.StateNormalRunning))
if err != nil {
logutil.BgLogger().Fatal("[upgrading] update global state to normal failed", zap.Error(err))
logutil.BgLogger().Error("[upgrading] update global state to normal failed", zap.Error(err))
return err

Check warning on line 1339 in session/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

session/bootstrap.go#L1338-L1339

Added lines #L1338 - L1339 were not covered by tests
}
logutil.BgLogger().Info("update global state to normal running finished", zap.String("category", "upgrading"))
return nil
}

// IsUpgradingClusterState checks whether the global state is upgrading.
func IsUpgradingClusterState(s Session) (bool, error) {
dom := domain.GetDomain(s)
ctx, cancelFunc := context.WithTimeout(context.Background(), 3*time.Second)
defer cancelFunc()
stateInfo, err := dom.DDL().StateSyncer().GetGlobalState(ctx)
if err != nil {
return false, err
}

Check warning on line 1353 in session/bootstrap.go

View check run for this annotation

Codecov / codecov/patch

session/bootstrap.go#L1352-L1353

Added lines #L1352 - L1353 were not covered by tests

return stateInfo.State == syncer.StateUpgrading, nil
}

// checkOwnerVersion is used to wait the DDL owner to be elected in the cluster and check it is the same version as this TiDB.
Expand Down