diff --git a/proto/heartbeat.proto b/proto/heartbeat.proto index 693bb6a9c5..77adadb293 100644 --- a/proto/heartbeat.proto +++ b/proto/heartbeat.proto @@ -150,8 +150,6 @@ message CopySetConf { // 表示待删除节点。 // chunkserver收到CHANGE_PEER,根据peers,configchangeItem,oldPeer拼出新的conf optional common.Peer oldPeer = 7; - // copyset availflag - optional bool availflag = 8; }; enum HeartbeatStatusCode { diff --git a/proto/topology.proto b/proto/topology.proto index 69057e7424..5c6031f6e9 100644 --- a/proto/topology.proto +++ b/proto/topology.proto @@ -48,6 +48,7 @@ enum ChunkServerStatus { enum DiskState { DISKNORMAL = 0; DISKERROR = 1; + DISKFULL = 2; } enum OnlineState { diff --git a/src/chunkserver/heartbeat.cpp b/src/chunkserver/heartbeat.cpp index 8db742ab62..4970bd7b81 100644 --- a/src/chunkserver/heartbeat.cpp +++ b/src/chunkserver/heartbeat.cpp @@ -307,6 +307,14 @@ int Heartbeat::BuildRequest(HeartbeatRequest* req) { int leaders = 0; for (CopysetNodePtr copyset : copysets) { + + // 如果磁盘空间不足设为readonly + if (diskState->errtype() == curve::mds::heartbeat::DISKFULL) { + copyset->SetReadOnly(true); + }else { + copyset->SetReadOnly(false); + } + curve::mds::heartbeat::CopySetInfo* info = req->add_copysetinfos(); ret = BuildCopysetInfo(info, copyset); @@ -439,9 +447,6 @@ int Heartbeat::ExecTask(const HeartbeatResponse& response) { continue; } - // 判断copyset是否avail,否则设置readonly - copyset->SetReadOnly(!conf.availflag()); - // 解析该chunkserver上的copyset是否需要删除 // 需要删除则清理copyset if (HeartbeatHelper::NeedPurge(csEp_, conf, copyset)) { diff --git a/src/client/chunk_closure.cpp b/src/client/chunk_closure.cpp index 5cd2acde1b..ebed92ae88 100644 --- a/src/client/chunk_closure.cpp +++ b/src/client/chunk_closure.cpp @@ -239,6 +239,7 @@ void ClientClosure::Run() { break; case CHUNK_OP_STATUS::CHUNK_OP_STATUS_READONLY: + needRetry = true; OnReadOnly(); break; diff --git a/src/mds/heartbeat/copyset_conf_generator.cpp b/src/mds/heartbeat/copyset_conf_generator.cpp index a056da1eb1..9139275e4e 100644 --- a/src/mds/heartbeat/copyset_conf_generator.cpp +++ b/src/mds/heartbeat/copyset_conf_generator.cpp @@ -52,9 +52,6 @@ bool CopysetConfGenerator::GenCopysetConf( return true; } - // set copyset availflag - copysetConf->set_availflag(recordCopySetInfo.IsAvailable()); - if (reportCopySetInfo.GetLeader() == reportId) { ChunkServerIdType candidate = LeaderGenCopysetConf(reportCopySetInfo, configChInfo, copysetConf); diff --git a/src/mds/heartbeat/heartbeat_manager.cpp b/src/mds/heartbeat/heartbeat_manager.cpp index 31ad7f6743..8e17baa0a5 100644 --- a/src/mds/heartbeat/heartbeat_manager.cpp +++ b/src/mds/heartbeat/heartbeat_manager.cpp @@ -102,40 +102,31 @@ void HeartbeatManager::UpdateChunkServerDiskStatus( // update ChunkServerState status (disk status) ChunkServerState state; - switch (request.diskstate().errtype()) - { - case curve::mds::heartbeat::DISKFULL: - { + curve::mds::heartbeat::ErrorType errType = request.diskstate().errtype(); + + if (errType == curve::mds::heartbeat::DISKFULL) { // 当chunkserver磁盘接近满,需要将copyset availflag设为false,避免新空间从这些copyset分配 CopySetFilter filter = [](const curve::mds::topology::CopySetInfo &cs) { return cs.IsAvailable(); }; std::vector keys = topology_->GetCopySetsInChunkServer(request.chunkserverid(), filter); for (auto key : keys) { - topology_->SetCopySetAvalFlag(key, false); + topology_->SetCopySetAvalFlag(key, false); } // 设置disk error,copyset就不会迁移到这个chunkserver - state.SetDiskState(curve::mds::topology::DISKERROR); + state.SetDiskState(curve::mds::topology::DISKFULL); LOG(ERROR) << "heartbeat report disk full error, " << "diskused = " << request.diskused() << "capacity = " << request.diskcapacity() << "chunkserverid =" << request.chunkserverid(); - break; - } - - case curve::mds::heartbeat::NORMAL: - { + + }else if (errType == curve::mds::heartbeat::NORMAL) { state.SetDiskState(curve::mds::topology::DISKNORMAL); - break; - } - default: - { + }else { state.SetDiskState(curve::mds::topology::DISKERROR); LOG(ERROR) << "heartbeat report disk error, " << "errortype = " << request.diskstate().errtype() << "errmsg = " << request.diskstate().errmsg(); - break; - } } state.SetDiskCapacity(request.diskcapacity()); diff --git a/tools-v2/go.mod b/tools-v2/go.mod index f48f77e0dc..9dfbc11562 100644 --- a/tools-v2/go.mod +++ b/tools-v2/go.mod @@ -2,37 +2,34 @@ module github.com/opencurve/curve/tools-v2 go 1.19 -replace github.com/optiopay/kafka => github.com/cilium/kafka v0.0.0-20180809090225-01ce283b732b - require ( - github.com/cilium/cilium v1.13.7 - github.com/deckarep/golang-set/v2 v2.3.0 - github.com/docker/cli v24.0.2+incompatible + github.com/cilium/cilium v1.14.3 + github.com/deckarep/golang-set/v2 v2.3.1 + github.com/docker/cli v24.0.7+incompatible github.com/dustin/go-humanize v1.0.1 - github.com/gookit/color v1.5.3 + github.com/gookit/color v1.5.4 github.com/moby/term v0.5.0 github.com/olekukonko/tablewriter v0.0.5 github.com/pkg/xattr v0.4.9 - github.com/schollz/progressbar/v3 v3.13.1 - github.com/smartystreets/goconvey v1.8.0 - github.com/spf13/cobra v1.7.0 + github.com/schollz/progressbar/v3 v3.14.0 + github.com/smartystreets/goconvey v1.8.1 + github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 - github.com/spf13/viper v1.16.0 - golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 - golang.org/x/sys v0.13.0 - google.golang.org/grpc v1.55.0 - google.golang.org/protobuf v1.30.0 + github.com/spf13/viper v1.17.0 + golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 + golang.org/x/sys v0.14.0 + google.golang.org/grpc v1.59.0 + google.golang.org/protobuf v1.31.0 ) require ( - github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect - github.com/Microsoft/go-winio v0.6.1 // indirect - github.com/Shopify/logrus-bugsnag v0.0.0-20171204204709-577dee27f20d // indirect + github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect + github.com/Microsoft/go-winio v0.6.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/docker/distribution v2.8.2+incompatible // indirect github.com/docker/docker v24.0.2+incompatible // indirect - github.com/docker/docker-credential-helpers v0.7.0 // indirect + github.com/docker/docker-credential-helpers v0.8.0 // indirect github.com/docker/go v1.5.1-1.0.20160303222718-d30aec9fd63c // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-metrics v0.0.1 // indirect @@ -47,39 +44,42 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect github.com/magiconair/properties v1.8.7 // indirect - github.com/mattn/go-runewidth v0.0.14 // indirect + github.com/mattn/go-runewidth v0.0.9 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect - github.com/miekg/pkcs11 v1.1.1 // indirect + github.com/miekg/pkcs11 v1.0.2 // indirect github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect github.com/moby/sys/sequential v0.5.0 // indirect github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect - github.com/pelletier/go-toml/v2 v2.0.8 // indirect + github.com/pelletier/go-toml/v2 v2.1.0 // indirect github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.15.1 // indirect + github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.4.0 // indirect - github.com/prometheus/common v0.44.0 // indirect - github.com/prometheus/procfs v0.10.1 // indirect + github.com/prometheus/common v0.42.0 // indirect + github.com/prometheus/procfs v0.11.0 // indirect github.com/rivo/uniseg v0.4.4 // indirect + github.com/sagikazarmark/locafero v0.3.0 // indirect + github.com/sagikazarmark/slog-shim v0.1.0 // indirect github.com/sirupsen/logrus v1.9.3 // indirect - github.com/smartystreets/assertions v1.13.1 // indirect - github.com/spf13/afero v1.9.5 // indirect + github.com/smarty/assertions v1.15.0 // indirect + github.com/sourcegraph/conc v0.3.0 // indirect + github.com/spf13/afero v1.10.0 // indirect github.com/spf13/cast v1.5.1 // indirect - github.com/spf13/jwalterweatherman v1.1.0 // indirect - github.com/subosito/gotenv v1.4.2 // indirect + github.com/subosito/gotenv v1.6.0 // indirect github.com/theupdateframework/notary v0.7.0 // indirect - github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect - golang.org/x/crypto v0.14.0 // indirect - golang.org/x/mod v0.10.0 // indirect - golang.org/x/net v0.17.0 // indirect - golang.org/x/sync v0.2.0 // indirect - golang.org/x/term v0.13.0 // indirect - golang.org/x/text v0.13.0 // indirect - golang.org/x/tools v0.9.3 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc // indirect + github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect + go.uber.org/multierr v1.11.0 // indirect + golang.org/x/crypto v0.15.0 // indirect + golang.org/x/mod v0.14.0 // indirect + golang.org/x/net v0.18.0 // indirect + golang.org/x/sync v0.5.0 // indirect + golang.org/x/term v0.14.0 // indirect + golang.org/x/text v0.14.0 // indirect + golang.org/x/tools v0.15.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gotest.tools/v3 v3.0.3 // indirect + gotest.tools/v3 v3.5.1 // indirect ) diff --git a/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go b/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go new file mode 100644 index 0000000000..70c06cc2b6 --- /dev/null +++ b/tools-v2/pkg/cli/command/curvebs/delete/copyset/copyset.go @@ -0,0 +1,102 @@ +package copyset + +import ( + "context" + cmderror "github.com/opencurve/curve/tools-v2/internal/error" + basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" + "github.com/opencurve/curve/tools-v2/pkg/config" + "github.com/opencurve/curve/tools-v2/pkg/output" + "github.com/opencurve/curve/tools-v2/proto/proto/topology" + "github.com/spf13/cobra" + "google.golang.org/grpc" + "strconv" +) + +const ( + deleteBrokenCopySetExample = `$ curve bs delete broken-copyset --chunkserverid=1` +) + +type DeleteBrokenCopySetInChunkServerRpc struct { + Info *basecmd.Rpc + Request *topology.DeleteBrokenCopysetInChunkServerRequest + topologyClient topology.TopologyServiceClient +} + +var _ basecmd.RpcFunc = (*DeleteBrokenCopySetInChunkServerRpc)(nil) // check interface + +type DeleteBrokenCopySetCommand struct { + basecmd.FinalCurveCmd + Rpc *DeleteBrokenCopySetInChunkServerRpc + Servers []*topology.ServerInfo + chunkserverid uint32 +} + +func (d *DeleteBrokenCopySetInChunkServerRpc) Stub_Func(ctx context.Context) (interface{}, error) { + return d.topologyClient.DeleteBrokenCopysetInChunkServer(ctx, d.Request) +} + +func NewDeleteCommand() *cobra.Command { + return NewDeleteBrokenCopySetCommand().Cmd +} + +func NewDeleteBrokenCopySetCommand() *DeleteBrokenCopySetCommand { + cmd := &DeleteBrokenCopySetCommand{ + FinalCurveCmd: basecmd.FinalCurveCmd{ + Use: "broken-copyset", + Short: "delete broken copyset in chunkserver", + Example: deleteBrokenCopySetExample, + }, + } + + basecmd.NewFinalCurveCli(&cmd.FinalCurveCmd, cmd) + return cmd +} + +func (d *DeleteBrokenCopySetCommand) AddFlags() { + config.AddBsMdsFlagOption(d.Cmd) + config.AddRpcRetryTimesFlag(d.Cmd) + config.AddRpcTimeoutFlag(d.Cmd) + config.AddBsChunkServerIdFlag(d.Cmd) +} + +func (d *DeleteBrokenCopySetInChunkServerRpc) NewRpcClient(cc grpc.ClientConnInterface) { + d.topologyClient = topology.NewTopologyServiceClient(cc) +} + +func (d *DeleteBrokenCopySetCommand) Init(cmd *cobra.Command, args []string) error { + mdsAddrs, err := config.GetBsMdsAddrSlice(d.Cmd) + if err.TypeCode() != cmderror.CODE_SUCCESS { + return err.ToError() + } + timeout := config.GetFlagDuration(d.Cmd, config.RPCTIMEOUT) + retrytimes := config.GetFlagInt32(d.Cmd, config.RPCRETRYTIMES) + strid, e := strconv.Atoi(config.GetBsFlagString(d.Cmd, config.CURVEBS_CHUNKSERVER_ID)) + if e != nil { + return e + } + d.chunkserverid = uint32(strid) + d.Rpc = &DeleteBrokenCopySetInChunkServerRpc{ + Info: basecmd.NewRpc(mdsAddrs, timeout, retrytimes, "DeleteBrokenCopysetInChunkServer"), + Request: &topology.DeleteBrokenCopysetInChunkServerRequest{ + ChunkServerID: &d.chunkserverid, + }, + } + return nil +} + +func (d *DeleteBrokenCopySetCommand) RunCommand(cmd *cobra.Command, args []string) error { + result, errCmd := basecmd.GetRpcResponse(d.Rpc.Info, d.Rpc) + if errCmd.TypeCode() != cmderror.CODE_SUCCESS { + return errCmd.ToError() + } + d.Result = result + return nil +} + +func (d *DeleteBrokenCopySetCommand) Print(cmd *cobra.Command, args []string) error { + return output.FinalCmdOutput(&d.FinalCurveCmd, d) +} + +func (d *DeleteBrokenCopySetCommand) ResultPlainOutput() error { + return output.FinalCmdOutputPlain(&d.FinalCurveCmd) +} diff --git a/tools-v2/pkg/cli/command/curvebs/delete/delete.go b/tools-v2/pkg/cli/command/curvebs/delete/delete.go index fe57b58ef0..83c2b1e57a 100644 --- a/tools-v2/pkg/cli/command/curvebs/delete/delete.go +++ b/tools-v2/pkg/cli/command/curvebs/delete/delete.go @@ -7,6 +7,7 @@ package delete import ( + "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/delete/copyset" "github.com/spf13/cobra" basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" @@ -26,6 +27,7 @@ func (dCmd *DeleteCommand) AddSubCommands() { file.NewFileCommand(), peer.NewCommand(), volume.NewVolumeCommand(), + copyset.NewDeleteCommand(), ) } diff --git a/tools-v2/pkg/cli/command/curvebs/list/list.go b/tools-v2/pkg/cli/command/curvebs/list/list.go index 44472a01a9..61d269b790 100644 --- a/tools-v2/pkg/cli/command/curvebs/list/list.go +++ b/tools-v2/pkg/cli/command/curvebs/list/list.go @@ -34,6 +34,7 @@ import ( "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/server" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/snapshot" "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/space" + "github.com/opencurve/curve/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets" "github.com/spf13/cobra" ) @@ -55,6 +56,7 @@ func (listCmd *ListCommand) AddSubCommands() { may_broken_vol.NewMayBrokenVolCommand(), formatstatus.NewFormatStatusCommand(), snapshot.NewSnapShotCommand(), + unavailcopysets.NewUnAvailCopySetsCommand(), ) } diff --git a/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets/unavailcopysets.go b/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets/unavailcopysets.go index f9e9f5a103..ffa50c4c36 100644 --- a/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets/unavailcopysets.go +++ b/tools-v2/pkg/cli/command/curvebs/list/unavailcopysets/unavailcopysets.go @@ -24,6 +24,8 @@ package unavailcopysets import ( "context" + "fmt" + cobrautil "github.com/opencurve/curve/tools-v2/internal/utils" cmderror "github.com/opencurve/curve/tools-v2/internal/error" basecmd "github.com/opencurve/curve/tools-v2/pkg/cli/command" @@ -36,6 +38,10 @@ import ( "google.golang.org/grpc" ) +const ( + listUnAvailCopySetExample = `$ curve bs list unavail-copyset` +) + type ListUnAvailCopySets struct { Info *basecmd.Rpc Request *topology.ListUnAvailCopySetsRequest @@ -66,7 +72,11 @@ func NewUnAvailCopySetsCommand() *cobra.Command { func NewListUnAvailCopySetsCommand() *UnAvailCopySetsCommand { uCmd := &UnAvailCopySetsCommand{ - FinalCurveCmd: basecmd.FinalCurveCmd{}, + FinalCurveCmd: basecmd.FinalCurveCmd{ + Use: "unavail-copyset", + Short: "list unavail copyset", + Example: listUnAvailCopySetExample, + }, } basecmd.NewFinalCurveCli(&uCmd.FinalCurveCmd, uCmd) @@ -90,6 +100,8 @@ func (uCmd *UnAvailCopySetsCommand) Init(cmd *cobra.Command, args []string) erro Request: &topology.ListUnAvailCopySetsRequest{}, Info: basecmd.NewRpc(mdsAddrs, timeout, retrytimes, "ListUnAvailCopySets"), } + header := []string{cobrautil.ROW_LOGICALPOOL, cobrautil.ROW_COPYSET} + uCmd.SetHeader(header) return nil } @@ -109,6 +121,18 @@ func (uCmd *UnAvailCopySetsCommand) RunCommand(cmd *cobra.Command, args []string return cmderror.ErrBsListPhysicalPoolRpc(code).ToError() } uCmd.response = response.Copysets + uCmd.Result = response.Copysets + rows := make([]map[string]string, 0) + for _, info := range response.Copysets { + row := make(map[string]string) + row[cobrautil.ROW_LOGICALPOOL] = fmt.Sprintf("%d", info.GetLogicalPoolId()) + row[cobrautil.ROW_COPYSET] = fmt.Sprintf("%d", info.GetCopysetId()) + rows = append(rows, row) + } + list := cobrautil.ListMap2ListSortByKeys(rows, uCmd.Header, []string{ + cobrautil.ROW_LOGICALPOOL, cobrautil.ROW_COPYSET, + }) + uCmd.TableNew.AppendBulk(list) return nil }