diff --git a/docs/NFS.md b/docs/NFS.md index 3edbf33dd..b880a8129 100644 --- a/docs/NFS.md +++ b/docs/NFS.md @@ -164,7 +164,7 @@ This is done by setting the `pluginConfig.mountProtocol.allowNfsFailback` parame The parameter `pluginConfig.mountProtocol.useNfs` enforces the use of NFS transport even if Weka client is installed on the node, and recommended to be set to `true` ONLY for testing. -Follow the [Helm installation instructions](./charts/csi-wekafsplugin/README.md) to install the Weka CSI Plugin. +Follow the [Helm installation instructions](../charts/csi-wekafsplugin/README.md) to install the Weka CSI Plugin. Most of the installation steps are the same as for the native WekaFS driver, however, additional parameters should be set in the `values.yaml` file, or passed as command line arguments to the `helm install` command. diff --git a/docs/usage.md b/docs/usage.md index 3de0d75c7..dd331e010 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -352,7 +352,7 @@ pvc-ee54de25-14f3-4024-98d0-12225e4b8215 4Gi RWX Delete ``` -1. Check that configuration was applied +2. Check that configuration was applied ```shell script $ kubectl get pv NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE diff --git a/pkg/wekafs/apiclient/filesystem.go b/pkg/wekafs/apiclient/filesystem.go index ad6558cfe..0ce0ee009 100644 --- a/pkg/wekafs/apiclient/filesystem.go +++ b/pkg/wekafs/apiclient/filesystem.go @@ -282,6 +282,7 @@ func (fs *FileSystem) GetType() string { return "filesystem" } +//goland:noinspection GoUnusedParameter func (fs *FileSystem) GetBasePath(a *ApiClient) string { return "fileSystems" } diff --git a/pkg/wekafs/apiclient/interfacegroup.go b/pkg/wekafs/apiclient/interfacegroup.go index 5d3a1d0bf..82ce4f604 100644 --- a/pkg/wekafs/apiclient/interfacegroup.go +++ b/pkg/wekafs/apiclient/interfacegroup.go @@ -41,6 +41,7 @@ func (i *InterfaceGroup) GetType() string { return "interfaceGroup" } +//goland:noinspection GoUnusedParameter func (i *InterfaceGroup) GetBasePath(client *ApiClient) string { return "interfaceGroups" } diff --git a/pkg/wekafs/apiclient/nfs.go b/pkg/wekafs/apiclient/nfs.go index b44fbb1f4..838955a9f 100644 --- a/pkg/wekafs/apiclient/nfs.go +++ b/pkg/wekafs/apiclient/nfs.go @@ -34,6 +34,7 @@ func (n NfsVersionString) AsWeka() NfsVersionString { type NfsAuthType string +//goland:noinspection GoUnusedConst const ( NfsPermissionTypeReadWrite NfsPermissionType = "RW" NfsPermissionTypeReadOnly NfsPermissionType = "RO" @@ -76,13 +77,14 @@ func (n *NfsPermission) GetType() string { return "nfsPermission" } +//goland:noinspection GoUnusedParameter func (n *NfsPermission) GetBasePath(a *ApiClient) string { return "nfs/permissions" } func (n *NfsPermission) GetApiUrl(a *ApiClient) string { url, err := urlutil.URLJoin(n.GetBasePath(a), n.Uid.String()) - if err != nil { + if err == nil { return url } return "" @@ -106,16 +108,6 @@ func (n *NfsPermission) IsEligibleForCsi() bool { n.SquashMode == NfsPermissionSquashModeNone } -func (a *ApiClient) GetNfsPermissions(ctx context.Context, fsUid uuid.UUID, permissions *[]NfsPermission) error { - n := &NfsPermission{} - - err := a.Get(ctx, n.GetBasePath(a), nil, permissions) - if err != nil { - return err - } - return nil -} - func (a *ApiClient) FindNfsPermissionsByFilter(ctx context.Context, query *NfsPermission, resultSet *[]NfsPermission) error { op := "FindNfsPermissionsByFilter" ctx, span := otel.Tracer(TracerName).Start(ctx, op) @@ -333,6 +325,7 @@ func (g *NfsClientGroup) GetType() string { return "clientGroup" } +//goland:noinspection GoUnusedParameter func (g *NfsClientGroup) GetBasePath(a *ApiClient) string { return "nfs/clientGroups" } diff --git a/pkg/wekafs/apiclient/nfs_test.go b/pkg/wekafs/apiclient/nfs_test.go index 046447afb..c5606629f 100644 --- a/pkg/wekafs/apiclient/nfs_test.go +++ b/pkg/wekafs/apiclient/nfs_test.go @@ -16,9 +16,9 @@ var fsName string var client *ApiClient func TestMain(m *testing.M) { - flag.StringVar(&endpoint, "api-endpoint", "vm125-1726039130891528-0.lan:14000", "API endpoint for tests") + flag.StringVar(&endpoint, "api-endpoint", "localhost:14000", "API endpoint for tests") flag.StringVar(&creds.Username, "api-username", "admin", "API username for tests") - flag.StringVar(&creds.Password, "api-password", "AAbb1234", "API password for tests") + flag.StringVar(&creds.Password, "api-password", "Qwerty1@", "API password for tests") flag.StringVar(&creds.Organization, "api-org", "Root", "API org for tests") flag.StringVar(&creds.HttpScheme, "api-scheme", "https", "API scheme for tests") flag.StringVar(&fsName, "fs-name", "default", "Filesystem name for tests") @@ -81,16 +81,27 @@ func GetApiClientForTest(t *testing.T) *ApiClient { // assert.NoError(t, err) // assert.NotNil(t, result) //} -// -//func TestGetNfsPermissionsByFilesystemName(t *testing.T) { -// apiClient := GetApiClientForTest(t) -// -// -// var permissions []NfsPermission -// err := apiClient.GetNfsPermissionsByFilesystemName(context.Background(), "fs1", &permissions) -// assert.NoError(t, err) -// assert.NotEmpty(t, permissions) -//} + +func TestFindNfsPermissionsByFilesystemName(t *testing.T) { + apiClient := GetApiClientForTest(t) + + var permissions []NfsPermission + err := apiClient.FindNfsPermissionsByFilesystem(context.Background(), "snapvolFilesystem", &permissions) + assert.NoError(t, err) + assert.NotEmpty(t, permissions) + if len(permissions) > 0 { + for _, p := range permissions { + r := &NfsPermissionDeleteRequest{Uid: p.Uid} + err := apiClient.DeleteNfsPermission(context.Background(), r) + assert.NoError(t, err) + } + } + err = apiClient.FindNfsPermissionsByFilesystem(context.Background(), "snapvolFilesystem", &permissions) + assert.NoError(t, err) + assert.Empty(t, permissions) + +} + // //func TestGetNfsPermissionByUid(t *testing.T) { // apiClient := GetApiClientForTest(t) diff --git a/pkg/wekafs/apiclient/quota.go b/pkg/wekafs/apiclient/quota.go index 943d0aa1e..27288d70f 100644 --- a/pkg/wekafs/apiclient/quota.go +++ b/pkg/wekafs/apiclient/quota.go @@ -17,14 +17,17 @@ import ( type QuotaType string type QuotaStatus string -const QuotaTypeHard QuotaType = "HARD" -const QuotaTypeSoft QuotaType = "SOFT" -const QuotaTypeDefault = QuotaTypeHard -const QuotaStatusActive = "ACTIVE" -const QuotaStatusPending = "ADDING" -const QuotaStatusError = "ERROR" -const QuotaStatusDeleting = "DELETING" -const MaxQuotaSize uint64 = 9223372036854775807 +//goland:noinspection GoUnusedConst +const ( + QuotaTypeHard QuotaType = "HARD" + QuotaTypeSoft QuotaType = "SOFT" + QuotaTypeDefault = QuotaTypeHard + QuotaStatusActive = "ACTIVE" + QuotaStatusPending = "ADDING" + QuotaStatusError = "ERROR" + QuotaStatusDeleting = "DELETING" + MaxQuotaSize uint64 = 9223372036854775807 +) type Quota struct { FilesystemUid uuid.UUID `json:"-"` diff --git a/pkg/wekafs/apiclient/snapshot.go b/pkg/wekafs/apiclient/snapshot.go index 4bc415c5b..9da559570 100644 --- a/pkg/wekafs/apiclient/snapshot.go +++ b/pkg/wekafs/apiclient/snapshot.go @@ -168,6 +168,7 @@ func (snap *Snapshot) GetType() string { return "snapshot" } +//goland:noinspection GoUnusedParameter func (snap *Snapshot) GetBasePath(a *ApiClient) string { return "snapshots" } diff --git a/pkg/wekafs/apiclient/utils.go b/pkg/wekafs/apiclient/utils.go index 9eede7da8..0275d8058 100644 --- a/pkg/wekafs/apiclient/utils.go +++ b/pkg/wekafs/apiclient/utils.go @@ -156,7 +156,7 @@ func GetNodeIpAddressByRouting(targetHost string) (string, error) { if err != nil { return "", err } - defer conn.Close() + defer func() { _ = conn.Close() }() // Set a deadline for the connection err = conn.SetDeadline(time.Now().Add(1 * time.Second)) diff --git a/pkg/wekafs/identityserver.go b/pkg/wekafs/identityserver.go index 132cf746d..191d0ff25 100644 --- a/pkg/wekafs/identityserver.go +++ b/pkg/wekafs/identityserver.go @@ -36,6 +36,7 @@ type identityServer struct { config *DriverConfig } +//goland:noinspection GoExportedFuncWithUnexportedType func NewIdentityServer(name, version string, config *DriverConfig) *identityServer { return &identityServer{ name: name, @@ -44,6 +45,7 @@ func NewIdentityServer(name, version string, config *DriverConfig) *identityServ } } +//goland:noinspection GoUnusedParameter func (ids *identityServer) GetPluginInfo(ctx context.Context, req *csi.GetPluginInfoRequest) (*csi.GetPluginInfoResponse, error) { op := "GetPluginInfo" result := "SUCCESS" @@ -78,6 +80,7 @@ func (ids *identityServer) getConfig() *DriverConfig { return ids.config } +//goland:noinspection GoUnusedParameter func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*csi.ProbeResponse, error) { logger := log.Ctx(ctx) isReady := ids.getConfig().isInDevMode() || isWekaInstalled() @@ -96,6 +99,7 @@ func (ids *identityServer) Probe(ctx context.Context, req *csi.ProbeRequest) (*c }, nil } +//goland:noinspection GoUnusedParameter func (ids *identityServer) GetPluginCapabilities(ctx context.Context, req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { op := "GetPluginCapabilities" result := "SUCCESS" diff --git a/pkg/wekafs/nfsmount.go b/pkg/wekafs/nfsmount.go index 9abf05286..b2fb8cff8 100644 --- a/pkg/wekafs/nfsmount.go +++ b/pkg/wekafs/nfsmount.go @@ -173,7 +173,7 @@ func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, } if apiClient.EnsureNfsPermissions(ctx, nodeIP, m.fsName, apiclient.NfsVersionV4, m.clientGroupName) != nil { - logger.Error().Msg("Failed to ensure NFS permissions") + logger.Error().Err(err).Msg("Failed to ensure NFS permissions") return errors.New("failed to ensure NFS permissions") } diff --git a/pkg/wekafs/nodeserver.go b/pkg/wekafs/nodeserver.go index 0d66991a4..321961ff6 100644 --- a/pkg/wekafs/nodeserver.go +++ b/pkg/wekafs/nodeserver.go @@ -92,8 +92,16 @@ func (ns *NodeServer) NodeGetVolumeStats(ctx context.Context, req *csi.NodeGetVo volumePath := req.GetVolumePath() // Validate request fields - if volumeID == "" || volumePath == "" { - return nil, status.Error(codes.InvalidArgument, "Volume ID and path must be provided") + if volumeID == "" { + return nil, status.Error(codes.InvalidArgument, "Volume ID must be provided") + } + if volumePath == "" { + return nil, status.Error(codes.InvalidArgument, "Volume path must be provided") + } + if req.GetStagingTargetPath() != "" { + if !PathExists(req.GetStagingTargetPath()) { + return nil, status.Error(codes.NotFound, "Staging area path not found") + } } // Check if the volume path exists @@ -185,7 +193,6 @@ func NewNodeServer(nodeId string, maxVolumesPerNode int64, api *ApiStore, mounte csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, csi.NodeServiceCapability_RPC_VOLUME_CONDITION, - //csi.NodeServiceCapability_RPC_EXPAND_VOLUME, }, ), nodeID: nodeId, @@ -454,6 +461,10 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu } else { msg := fmt.Sprintf("Directory %s exists, but not a weka mount, assuming already unpublished", targetPath) logger.Warn().Msg(msg) + if err := os.Remove(targetPath); err != nil { + result = "FAILURE" + return NodeUnpublishVolumeError(ctx, codes.Internal, err.Error()) + } result = "SUCCESS_WITH_WARNING" return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -475,75 +486,14 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return &csi.NodeUnpublishVolumeResponse{}, nil } -func NodeStageVolumeError(ctx context.Context, errorCode codes.Code, errorMessage string) (*csi.NodeStageVolumeResponse, error) { - err := status.Error(errorCode, strings.ToLower(errorMessage)) - log.Ctx(ctx).Err(err).CallerSkipFrame(1).Msg("Error staging volume") - return &csi.NodeStageVolumeResponse{}, err -} - +//goland:noinspection GoUnusedParameter func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - op := "NodeStageVolume" - ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot()) - defer span.End() - ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) - - volumeId := req.GetVolumeId() - logger := log.Ctx(ctx) - result := "FAILURE" - logger.Info().Str("volume_id", volumeId).Msg(">>>> Received request") - defer func() { - level := zerolog.InfoLevel - if result != "SUCCESS" { - level = zerolog.ErrorLevel - } - logger.WithLevel(level).Str("result", result).Msg("<<<< Completed processing request") - }() - - // Check arguments - if len(req.GetStagingTargetPath()) == 0 { - return NodeStageVolumeError(ctx, codes.InvalidArgument, "Target path missing in request") - } - - if req.GetVolumeCapability() == nil { - return NodeStageVolumeError(ctx, codes.InvalidArgument, "Error occured, volume Capability missing in request") - } - - if req.GetVolumeCapability().GetBlock() != nil { - return NodeStageVolumeError(ctx, codes.InvalidArgument, "Block accessType is unsupported") - } - result = "SUCCESS" - return &csi.NodeStageVolumeResponse{}, nil -} - -func NodeUnstageVolumeError(ctx context.Context, errorCode codes.Code, errorMessage string) (*csi.NodeUnstageVolumeResponse, error) { - err := status.Error(errorCode, strings.ToLower(errorMessage)) - log.Ctx(ctx).Err(err).CallerSkipFrame(1).Msg("Error unstaging volume") - return &csi.NodeUnstageVolumeResponse{}, err + return nil, status.Error(codes.Unimplemented, "NodeStageVolume is not supported") } +//goland:noinspection GoUnusedParameter func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { - op := "NodeUnstageVolume" - result := "FAILURE" - volumeId := req.GetVolumeId() - ctx, span := otel.Tracer(TracerName).Start(ctx, op, trace.WithNewRoot()) - defer span.End() - ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) - - logger := log.Ctx(ctx) - logger.Info().Str("volume_id", volumeId).Msg(">>>> Received request") - defer func() { - level := zerolog.InfoLevel - if result != "SUCCESS" { - level = zerolog.ErrorLevel - } - logger.WithLevel(level).Str("result", result).Msg("<<<< Completed processing request") - }() - - if len(req.GetStagingTargetPath()) == 0 { - return NodeUnstageVolumeError(ctx, codes.InvalidArgument, "Target path missing in request") - } - result = "SUCCESS" - return &csi.NodeUnstageVolumeResponse{}, nil + return nil, status.Error(codes.Unimplemented, "NodeUnstageVolume is not supported") } //goland:noinspection GoUnusedParameter diff --git a/pkg/wekafs/server.go b/pkg/wekafs/server.go index 440c107dc..9877505bb 100644 --- a/pkg/wekafs/server.go +++ b/pkg/wekafs/server.go @@ -35,6 +35,7 @@ const ( xattrVolumeName = "user.weka_k8s_volname" ) +//goland:noinspection GoExportedFuncWithUnexportedType func NewNonBlockingGRPCServer(mode CsiPluginMode) *nonBlockingGRPCServer { return &nonBlockingGRPCServer{ csiMmode: mode, diff --git a/pkg/wekafs/utilities.go b/pkg/wekafs/utilities.go index 07b77d315..cdb3a7eb1 100644 --- a/pkg/wekafs/utilities.go +++ b/pkg/wekafs/utilities.go @@ -281,6 +281,7 @@ func fileExists(filename string) bool { return false } +//goland:noinspection GoUnusedParameter func PathIsWekaMount(ctx context.Context, path string) bool { file, err := os.Open("/proc/mounts") if err != nil { diff --git a/pkg/wekafs/volume.go b/pkg/wekafs/volume.go index e6df52f09..9a3305fb7 100644 --- a/pkg/wekafs/volume.go +++ b/pkg/wekafs/volume.go @@ -59,6 +59,7 @@ type Volume struct { server AnyServer } +//goland:noinspection GoUnusedParameter func (v *Volume) getCsiContentSource(ctx context.Context) *csi.VolumeContentSource { if v.srcVolume != nil { return &csi.VolumeContentSource{ @@ -103,10 +104,12 @@ func (v *Volume) pruneUnsupportedMountOptions(ctx context.Context) { } } +//goland:noinspection GoUnusedParameter func (v *Volume) setMountOptions(ctx context.Context, mountOptions MountOptions) { v.mountOptions.Merge(mountOptions, v.server.getConfig().mutuallyExclusiveOptions) } +//goland:noinspection GoUnusedParameter func (v *Volume) getMountOptions(ctx context.Context) MountOptions { return v.mountOptions } @@ -643,6 +646,8 @@ func (v *Volume) getInnerPath() string { } // GetFullPath returns a full path on which volume is accessible including snapshot subdir and inner path +// +//goland:noinspection GoUnusedParameter func (v *Volume) GetFullPath(ctx context.Context) string { mountParts := []string{v.mountPath} if v.isOnSnapshot() { @@ -1513,6 +1518,8 @@ func (v *Volume) waitForSnapshotDeletion(ctx context.Context, logger zerolog.Log // ObtainRequestParams takes additional optional params from storage class params and applies them to Volume object // those params then need to be set during actual volume creation via UpdateParams function +// +//goland:noinspection GoUnusedParameter func (v *Volume) ObtainRequestParams(ctx context.Context, params map[string]string) error { // set explicit mount options if were passed in storageclass if val, ok := params["mountOptions"]; ok { @@ -1578,10 +1585,10 @@ func (v *Volume) CreateSnapshot(ctx context.Context, name string) (*Snapshot, er ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx) s, err := NewSnapshotFromVolumeCreate(ctx, name, v, v.apiClient, v.server) - logger := log.Ctx(ctx).With().Str("volume_id", v.GetId()).Str("snapshot_id", s.GetId()).Logger() if err != nil { return &Snapshot{}, err } + logger := log.Ctx(ctx).With().Str("volume_id", v.GetId()).Str("snapshot_id", s.GetId()).Logger() // check if snapshot with this name already exists exists, err := s.Exists(ctx) if err != nil { diff --git a/pkg/wekafs/volumeconstructors.go b/pkg/wekafs/volumeconstructors.go index b74eeb62c..dfc463285 100644 --- a/pkg/wekafs/volumeconstructors.go +++ b/pkg/wekafs/volumeconstructors.go @@ -139,7 +139,7 @@ func NewVolumeForBlankVolumeRequest(ctx context.Context, req *csi.CreateVolumeRe if client == nil && !cs.config.alwaysAllowSnapshotVolumes { return nil, status.Error(codes.FailedPrecondition, "Quota enforcement is supported only with API-bound volumes") } - if !client.SupportsQuotaOnSnapshots() && !cs.config.alwaysAllowSnapshotVolumes { + if client != nil && !client.SupportsQuotaOnSnapshots() && !cs.config.alwaysAllowSnapshotVolumes { return nil, status.Error(codes.FailedPrecondition, "Quota enforcement is not supported for snapshot-backed volumes by current Weka software version, please upgrade Weka cluster") } snapName = generateWekaSnapNameForSnapBasedVol(cs.getConfig().VolumePrefix, requestedVolumeName)