Skip to content

Commit

Permalink
Revert "feat: official support for multiple controller server replicas (
Browse files Browse the repository at this point in the history
#47)"

This reverts commit 8b4e410.
  • Loading branch information
sergeyberezansky committed May 25, 2023
1 parent c5baece commit d3eca4d
Show file tree
Hide file tree
Showing 6 changed files with 2 additions and 40 deletions.
3 changes: 1 addition & 2 deletions cmd/wekafsplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ var (
allowInsecureHttps = flag.Bool("allowinsecurehttps", false, "Allow insecure HTTPS connection without cert validation")
alwaysAllowSnapshotVolumes = flag.Bool("alwaysallowsnapshotvolumes", false, "Allow snapshot-backed volumes even when Weka cluster doesn't support capacity enforcement")
usejsonlogging = flag.Bool("usejsonlogging", false, "Use structured JSON logging rather than human-readable console log formatting")
maxRandomWaitIntervalSecs = flag.Int("maxrandomwaitintervalsecs", 0, "Use random wait on prolonged controller operations to minimize risk of races. Used only if number of replicas is >=2")
maxConcurrentRequests = flag.Int64("maxconcurrentrequests", 10, "Do not allow more than X requests in parallel")
grpcRequestTimeoutSeconds = flag.Int("grpcrequesttimeoutseconds", 30, "Time out requests waiting in queue after X seconds")
// Set by the build process
Expand Down Expand Up @@ -196,7 +195,7 @@ func handle() {
*allowAutoFsCreation, *allowAutoFsExpansion,
*allowAutoSeedSnapshotCreation, *allowSnapshotsOfLegacyVolumes,
*suppressSnapshotsCapability, *suppressVolumeCloneCapability,
*allowInsecureHttps, *alwaysAllowSnapshotVolumes, *maxRandomWaitIntervalSecs, mutuallyExclusiveMountOptionsStrings,
*allowInsecureHttps, *alwaysAllowSnapshotVolumes, mutuallyExclusiveMountOptionsStrings,
*maxConcurrentRequests, *grpcRequestTimeoutSeconds)
driver, err := wekafs.NewWekaFsDriver(
*driverName, *nodeID, *endpoint, *maxVolumesPerNode, version, *debugPath, csiMode, *selinuxSupport, config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,6 @@ spec:
{{- if .Values.useJsonLogging }}
- "--usejsonlogging"
{{- end }}
{{- if ge (int (default 1 .Values.controller.replicas)) 2 }}
- "--maxrandomwaitintervalsecs={{ .Values.controller.maxRandomWaitIntervalSecs | default 0 }}"
{{- end}}
{{- if .Values.controller.grpcRequestTimeoutSeconds }}
- "--grpcrequesttimeoutseconds={{ .Values.controller.grpcRequestTimeoutSeconds | default "5" }}"
{{- end }}
Expand Down
3 changes: 0 additions & 3 deletions deploy/helm/csi-wekafsplugin/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@ nodeSelector: {}
controller:
# -- Controller number of replicas
replicas: 2
# -- when multiple controller replicas are triggered in parallel, each controller will issue a random wait of up to
# `maxRandomWaitIntervalSecs` to avoid multiple API calls. Can be increased on very busy / large Weka clusters
maxRandomWaitIntervalSecs: 5
# -- maximum concurrent operations per operation type (to avoid API starvation)
maxConcurrentRequests: 5
# -- Return GRPC Unavailable if request waits in queue for that long time (seconds)
Expand Down
16 changes: 0 additions & 16 deletions pkg/wekafs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// Check for maximum available capacity
capacity := req.GetCapacityRange().GetRequiredBytes()

// randomBackoff to minimize race conditions when multiple CSI controllers are run in parallel
waitRandomTime(ctx, cs.config.maxRandomWaitIntervalSecs)

// IDEMPOTENCE FLOW: If directory already exists, return the createResponse if size matches, or error
volExists, volMatchesCapacity, err := volumeExistsAndMatchesCapacity(ctx, volume, capacity)

Expand Down Expand Up @@ -346,9 +343,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return DeleteVolumeError(ctx, codes.Internal, err.Error())
}

// randomBackoff to minimize race conditions when multiple CSI controllers are run in parallel
waitRandomTime(ctx, cs.config.maxRandomWaitIntervalSecs)

err = volume.Trash(ctx)
if os.IsNotExist(err) {
logger.Debug().Str("volume_id", volume.GetId()).Msg("Volume not found, but returning success for idempotence")
Expand Down Expand Up @@ -424,9 +418,6 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
return ExpandVolumeError(ctx, codes.NotFound, fmt.Sprintf("Volume with id %s does not exist", req.GetVolumeId()))
}

// randomBackoff to minimize race conditions when multiple CSI controllers are run in parallel
waitRandomTime(ctx, cs.config.maxRandomWaitIntervalSecs)

maxStorageCapacity, err := volume.getMaxCapacity(ctx)
if err != nil {
return ExpandVolumeError(ctx, codes.Unknown, fmt.Sprintf("ExpandVolume: Cannot obtain free capacity for volume %s", volume.GetId()))
Expand Down Expand Up @@ -521,9 +512,6 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
return CreateSnapshotError(ctx, codes.FailedPrecondition, fmt.Sprintf("Could not find source volume %s", srcVolume.GetId()))
}

// randomBackoff to minimize race conditions when multiple CSI controllers are run in parallel
waitRandomTime(ctx, cs.config.maxRandomWaitIntervalSecs)

s, err := srcVolume.CreateSnapshot(ctx, snapName)
if err != nil {
return &csi.CreateSnapshotResponse{}, err
Expand Down Expand Up @@ -588,10 +576,6 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
if err != nil {
return DeleteSnapshotError(ctx, codes.Internal, fmt.Sprintln("Failed to initialize snapshot from ID", snapshotID, err.Error()))
}

// randomBackoff to minimize race conditions when multiple CSI controllers are run in parallel
waitRandomTime(ctx, cs.config.maxRandomWaitIntervalSecs)

err = existingSnap.Delete(ctx)
if err != nil {
return DeleteSnapshotError(ctx, codes.Internal, fmt.Sprintln("Failed to delete snapshot", snapshotID, err))
Expand Down
4 changes: 1 addition & 3 deletions pkg/wekafs/driverconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ type DriverConfig struct {
debugPath string
allowInsecureHttps bool
alwaysAllowSnapshotVolumes bool
maxRandomWaitIntervalSecs int
mutuallyExclusiveOptions []mutuallyExclusiveMountOptionSet
maxConcurrentRequests int64
grpcRequestTimeout time.Duration
Expand All @@ -48,7 +47,7 @@ func (dc *DriverConfig) Log() {
func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotPrefix, debugPath string,
allowAutoFsCreation, allowAutoFsExpansion, allowAutoSeedSnapshotCreation, allowSnapshotsOfLegacyVolumes bool,
suppressnapshotSupport, suppressVolumeCloneSupport,
allowInsecureHttps, alwaysAllowSnapshotVolumes bool, maxRandomWaitIntervalSecs int,
allowInsecureHttps, alwaysAllowSnapshotVolumes bool,
mutuallyExclusiveMountOptions MutuallyExclusiveMountOptsStrings, maxConcurrentRequests int64,
grpcRequestTimeoutSeconds int) *DriverConfig {

Expand Down Expand Up @@ -77,7 +76,6 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP
debugPath: debugPath,
allowInsecureHttps: allowInsecureHttps,
alwaysAllowSnapshotVolumes: alwaysAllowSnapshotVolumes,
maxRandomWaitIntervalSecs: maxRandomWaitIntervalSecs,
mutuallyExclusiveOptions: MutuallyExclusiveMountOptions,
maxConcurrentRequests: maxConcurrentRequests,
grpcRequestTimeout: grpcRequestTimeout,
Expand Down
13 changes: 0 additions & 13 deletions pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
timestamp "google.golang.org/protobuf/types/known/timestamppb"
"math/rand"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -470,15 +469,3 @@ func volumeExistsAndMatchesCapacity(ctx context.Context, v *Volume, capacity int
matches := reportedCapacity == capacity
return exists, matches, err
}

// waitRandomTime blocks the caller for a random whole number of seconds in the
// half-open range [0, maxInterval), logging the chosen delay at trace level
// through the logger attached to ctx.
//
// A non-positive maxInterval disables the backoff entirely. The wait is
// abandoned early if ctx is cancelled or its deadline expires, so that a
// request which is already dead does not keep its goroutine parked.
func waitRandomTime(ctx context.Context, maxInterval int) {
	// rand.Intn panics for n <= 0; the bound originates from a command-line
	// flag, so a negative value must be treated as "disabled", not crash.
	if maxInterval <= 0 {
		return
	}
	interval := rand.Intn(maxInterval)
	if interval == 0 {
		return
	}

	logger := log.Ctx(ctx)
	logger.Trace().Int("wait_seconds", interval).Msg("Issuing a random backoff before performing operation")

	// Use a timer instead of time.Sleep so the wait can observe cancellation.
	timer := time.NewTimer(time.Duration(interval) * time.Second)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-ctx.Done():
	}
}

0 comments on commit d3eca4d

Please sign in to comment.