Skip to content

Commit

Permalink
feat(CSI-245): allow specifying client group for NFS
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Sep 3, 2024
1 parent 8bf9785 commit 34ecc36
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ spec:
{{- if .Values.pluginConfig.mountProtocol.interfaceGroupName }}
- "--interfacegroupname={{ .Values.pluginConfig.mountProtocol.interfaceGroupName }}"
{{- end }}
{{- if .Values.pluginConfig.mountProtocol.clientGroupName }}
- "--clientgroupname={{ .Values.pluginConfig.mountProtocol.clientGroupName }}"
{{- end }}
ports:
- containerPort: 9898
name: healthz
Expand Down
3 changes: 3 additions & 0 deletions charts/csi-wekafsplugin/templates/nodeserver-daemonset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ spec:
{{- if .Values.pluginConfig.mountProtocol.interfaceGroupName }}
- "--interfacegroupname={{ .Values.pluginConfig.mountProtocol.interfaceGroupName }}"
{{- end }}
{{- if .Values.pluginConfig.mountProtocol.clientGroupName }}
- "--clientgroupname={{ .Values.pluginConfig.mountProtocol.clientGroupName }}"
{{- end }}
ports:
- containerPort: 9899
name: healthz
Expand Down
2 changes: 2 additions & 0 deletions charts/csi-wekafsplugin/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,7 @@ pluginConfig:
allowNfsFailback: false
# -- Specify name of NFS interface group to use for mounting Weka filesystems. If not set, first NFS interface group will be used
interfaceGroupName: ""
# -- Specify existing client group name for NFS configuration. If not set, "WekaCSIPluginClients" group will be created
clientGroupName: ""


2 changes: 2 additions & 0 deletions cmd/wekafsplugin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var (
allowNfsFailback = flag.Bool("allownfsfailback", false, "Allow NFS failback")
useNfs = flag.Bool("usenfs", false, "Use NFS for mounting volumes")
interfaceGroupName = flag.String("interfacegroupname", "", "Name of the NFS interface group to use for mounting volumes")
clientGroupName = flag.String("clientgroupname", "", "Name of the NFS client group to use for managing NFS permissions")
// Set by the build process
version = ""
)
Expand Down Expand Up @@ -223,6 +224,7 @@ func handle() {
*allowNfsFailback,
*useNfs,
*interfaceGroupName,
*clientGroupName,
)
driver, err := wekafs.NewWekaFsDriver(
*driverName, *nodeID, *endpoint, *maxVolumesPerNode, version, *debugPath, csiMode, *selinuxSupport, config)
Expand Down
23 changes: 13 additions & 10 deletions pkg/wekafs/apiclient/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,22 +363,25 @@ func (a *ApiClient) CreateNfsClientGroup(ctx context.Context, r *NfsClientGroupC
return err
}

func (a *ApiClient) EnsureCsiPluginNfsClientGroup(ctx context.Context) (*NfsClientGroup, error) {
func (a *ApiClient) EnsureCsiPluginNfsClientGroup(ctx context.Context, clientGroupName string) (*NfsClientGroup, error) {
op := "EnsureCsiPluginNfsClientGroup"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
var ret *NfsClientGroup
logger.Trace().Str("client_group_name", NfsClientGroupName).Msg("Getting client group by name")
ret, err := a.GetNfsClientGroupByName(ctx, NfsClientGroupName)
if clientGroupName == "" {
clientGroupName = NfsClientGroupName
}
logger.Trace().Str("client_group_name", clientGroupName).Msg("Getting client group by name")
ret, err := a.GetNfsClientGroupByName(ctx, clientGroupName)
if err != nil {
if err != ObjectNotFoundError {
logger.Error().Err(err).Msg("Failed to get client group by name")
return ret, err
} else {
logger.Trace().Str("client_group_name", NfsClientGroupName).Msg("Existing client group not found, creating client group")
err = a.CreateNfsClientGroup(ctx, NewNfsClientGroupCreateRequest(NfsClientGroupName), ret)
logger.Trace().Str("client_group_name", clientGroupName).Msg("Existing client group not found, creating client group")
err = a.CreateNfsClientGroup(ctx, NewNfsClientGroupCreateRequest(clientGroupName), ret)
}
}
return ret, nil
Expand Down Expand Up @@ -540,12 +543,12 @@ func (r *NfsClientGroupRule) IsEligibleForIP(ip string) bool {
return network.ContainsIPAddress(ip)
}

func (a *ApiClient) GetNfsClientGroupRules(ctx context.Context, rules *[]NfsClientGroupRule) error {
func (a *ApiClient) GetNfsClientGroupRules(ctx context.Context, clientGroupName string, rules *[]NfsClientGroupRule) error {
op := "GetNfsClientGroupRules"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
cg, err := a.EnsureCsiPluginNfsClientGroup(ctx)
cg, err := a.EnsureCsiPluginNfsClientGroup(ctx, clientGroupName)
if err != nil {
return err
}
Expand Down Expand Up @@ -705,16 +708,16 @@ func (a *ApiClient) EnsureNfsClientGroupRuleForIp(ctx context.Context, cg *NfsCl
return err
}

func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName string) error {
func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName string, clientGroupName string) error {
op := "EnsureNfsPermissions"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx)
logger.Debug().Str("ip", ip).Str("filesystem", fsName).Msg("Ensuring NFS permissions")
logger.Debug().Str("ip", ip).Str("filesystem", fsName).Str("client_group_name", clientGroupName).Msg("Ensuring NFS permissions")
// Ensure client group
logger.Trace().Msg("Ensuring CSI Plugin NFS Client Group")
cg, err := a.EnsureCsiPluginNfsClientGroup(ctx)
cg, err := a.EnsureCsiPluginNfsClientGroup(ctx, clientGroupName)
if err != nil {
logger.Error().Err(err).Msg("Failed to ensure NFS client group")
return err
Expand Down
8 changes: 4 additions & 4 deletions pkg/wekafs/apiclient/nfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,14 @@ func TestNfsClientGroup(t *testing.T) {

func TestEnsureCsiPluginNfsClientGroup(t *testing.T) {
apiClient := GetApiClientForTest(t)
result, err := apiClient.EnsureCsiPluginNfsClientGroup(context.Background())
result, err := apiClient.EnsureCsiPluginNfsClientGroup(context.Background(), NfsClientGroupName)
assert.NoError(t, err)
assert.NotNil(t, result)
}

func TestNfsClientGroupRules(t *testing.T) {
apiClient := GetApiClientForTest(t)
cg, err := apiClient.EnsureCsiPluginNfsClientGroup(context.Background())
cg, err := apiClient.EnsureCsiPluginNfsClientGroup(context.Background(), NfsClientGroupName)
assert.NoError(t, err)
assert.NotNil(t, cg)

Expand All @@ -236,7 +236,7 @@ outerLoop:
assert.NoError(t, err)
}
rules := &[]NfsClientGroupRule{}
err = apiClient.GetNfsClientGroupRules(context.Background(), rules)
err = apiClient.GetNfsClientGroupRules(context.Background(), NfsClientGroupName, rules)
assert.NoError(t, err)
assert.NotEmpty(t, rules)
for _, rule := range *rules {
Expand All @@ -251,7 +251,7 @@ func TestNfsEnsureNfsPermissions(t *testing.T) {
apiClient := GetApiClientForTest(t)

// Test EnsureNfsPermission
err := apiClient.EnsureNfsPermissions(context.Background(), "172.16.5.147", "default")
err := apiClient.EnsureNfsPermissions(context.Background(), "172.16.5.147", "default", NfsClientGroupName)
assert.NoError(t, err)
}

Expand Down
20 changes: 7 additions & 13 deletions pkg/wekafs/driverconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,11 @@ type DriverConfig struct {
allowProtocolContainers bool
allowNfsFailback bool
useNfs bool
interfaceGroupName *string
interfaceGroupName string
clientGroupName string
}

func (dc *DriverConfig) Log() {
igName := "<<default>>"
if dc.interfaceGroupName != nil {
igName = *dc.interfaceGroupName
}
log.Info().Str("dynamic_vol_path", dc.DynamicVolPath).
Str("volume_prefix", dc.VolumePrefix).Str("snapshot_prefix", dc.SnapshotPrefix).Str("seed_snapshot_prefix", dc.SnapshotPrefix).
Bool("allow_auto_fs_creation", dc.allowAutoFsCreation).Bool("allow_auto_fs_expansion", dc.allowAutoFsExpansion).
Expand All @@ -60,7 +57,8 @@ func (dc *DriverConfig) Log() {
Bool("allow_protocol_containers", dc.allowProtocolContainers).
Bool("allow_nfs_failback", dc.allowNfsFailback).
Bool("use_nfs", dc.useNfs).
Str("interface_group_name", igName).
Str("interface_group_name", dc.interfaceGroupName).
Str("client_group_name", dc.clientGroupName).
Msg("Starting driver with the following configuration")

}
Expand All @@ -72,7 +70,7 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP
grpcRequestTimeoutSeconds int,
allowProtocolContainers bool,
allowNfsFailback, useNfs bool,
interfaceGroupName string,
interfaceGroupName, clientGroupName string,
) *DriverConfig {

var MutuallyExclusiveMountOptions []mutuallyExclusiveMountOptionSet
Expand All @@ -95,11 +93,6 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP
concurrency["NodePublishVolume"] = maxNodePublishVolumeReqs
concurrency["NodeUnpublishVolume"] = maxNodeUnpublishVolumeReqs

igName := &[]string{interfaceGroupName}[0]
if interfaceGroupName == "" {
igName = nil
}

return &DriverConfig{
DynamicVolPath: dynamicVolPath,
VolumePrefix: VolumePrefix,
Expand All @@ -119,7 +112,8 @@ func NewDriverConfig(dynamicVolPath, VolumePrefix, SnapshotPrefix, SeedSnapshotP
allowProtocolContainers: allowProtocolContainers,
allowNfsFailback: allowNfsFailback,
useNfs: useNfs,
interfaceGroupName: igName,
interfaceGroupName: interfaceGroupName,
clientGroupName: clientGroupName,
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type nfsMount struct {
lastUsed time.Time
mountIpAddress string
interfaceGroupName *string
clientGroupName string
}

func (m *nfsMount) getMountPoint() string {
Expand Down Expand Up @@ -131,7 +132,7 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient,
}

nodeIP := apiclient.GetNodeIpAddress()
if apiClient.EnsureNfsPermissions(ctx, nodeIP, m.fsName) != nil {
if apiClient.EnsureNfsPermissions(ctx, nodeIP, m.fsName, m.clientGroupName) != nil {
logger.Error().Msg("Failed to ensure NFS permissions")
return errors.New("failed to ensure NFS permissions")
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/wekafs/nfsmounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type nfsMounter struct {
selinuxSupport *bool
gc *innerPathVolGc
interfaceGroupName *string
clientGroupName string
}

func (m *nfsMounter) getGarbageCollector() *innerPathVolGc {
Expand All @@ -32,7 +33,8 @@ func newNfsMounter(driver *WekaFsDriver) *nfsMounter {
mounter := &nfsMounter{mountMap: mountsMap{}, debugPath: driver.debugPath, selinuxSupport: selinuxSupport}
mounter.gc = initInnerPathVolumeGc(mounter)
mounter.schedulePeriodicMountGc()
mounter.interfaceGroupName = driver.config.interfaceGroupName
mounter.interfaceGroupName = &driver.config.interfaceGroupName
mounter.clientGroupName = driver.config.clientGroupName

return mounter
}
Expand All @@ -54,6 +56,7 @@ func (m *nfsMounter) NewMount(fsName string, options MountOptions) AnyMount {
mountPoint: "/run/weka-fs-mounts/" + getAsciiPart(fsName, 64) + "-" + uniqueId,
mountOptions: options,
interfaceGroupName: m.interfaceGroupName,
clientGroupName: m.clientGroupName,
}
m.mountMap[fsName][options.String()] = wMount
}
Expand Down

0 comments on commit 34ecc36

Please sign in to comment.