Skip to content

Commit

Permalink
fix(CSI-270): filesystem-backed volumes cannot be deleted due to stal…
Browse files Browse the repository at this point in the history
…e NFS permissions
  • Loading branch information
sergeyberezansky committed Sep 27, 2024
1 parent fc73750 commit 7325dae
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 4 deletions.
22 changes: 22 additions & 0 deletions pkg/wekafs/apiclient/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,28 @@ func (a *ApiClient) DeleteFileSystem(ctx context.Context, r *FileSystemDeleteReq
return nil
}

func (a *ApiClient) EnsureNoNfsPermissionsForFilesystem(ctx context.Context, fsName string) error {
logger := log.Ctx(ctx)
logger.Trace().Str("filesystem", fsName).Msg("Ensuring no NFS permissions for filesystem")
permissions := &[]NfsPermission{}
err := a.FindNfsPermissionsByFilesystem(ctx, fsName, permissions)
if err != nil {
logger.Error().Err(err).Str("filesystem", fsName).Msg("Failed to list NFS permissions")
}
for _, p := range *permissions {
err = a.DeleteNfsPermission(ctx, &NfsPermissionDeleteRequest{Uid: p.Uid})
if err != nil {
logger.Error().Err(err).Str("permission", p.Uid.String()).Str("filesystem", p.Filesystem).Str("client_group", p.Group).Msg("Failed to delete NFS permission")
return err
}
}
if len(*permissions) > 0 {
time.Sleep(time.Second * 5) // wait for NFS permissions reconfiguration
logger.Trace().Str("filesystem", fsName).Msg("Deleted NFS permissions")
}
return nil
}

func (a *ApiClient) GetFileSystemMountToken(ctx context.Context, r *FileSystemMountTokenRequest, token *FileSystemMountToken) error {
op := "GetFileSystemMountToken"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
Expand Down
88 changes: 85 additions & 3 deletions pkg/wekafs/apiclient/nfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/helm/pkg/urlutil"
"strconv"
"strings"
"time"
)

type NfsPermissionType string
Expand Down Expand Up @@ -134,6 +135,26 @@ func (a *ApiClient) FindNfsPermissionsByFilter(ctx context.Context, query *NfsPe
return nil
}

func (a *ApiClient) FindNfsPermissionsByFilesystem(ctx context.Context, fsName string, resultSet *[]NfsPermission) error {
op := "FindNfsPermissionsByFilter"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
ret := &[]NfsPermission{}
query := &NfsPermission{Filesystem: fsName}
q, _ := qs.Values(query)
err := a.Get(ctx, query.GetBasePath(a), q, ret)
if err != nil {
return err
}
for _, r := range *ret {
if r.Filesystem == query.Filesystem {
*resultSet = append(*resultSet, r)
}
}
return nil
}

// GetNfsPermissionByFilter expected to return exactly one result of FindNfsPermissionsByFilter (error)
func (a *ApiClient) GetNfsPermissionByFilter(ctx context.Context, query *NfsPermission) (*NfsPermission, error) {
rs := &[]NfsPermission{}
Expand Down Expand Up @@ -199,16 +220,22 @@ func (a *ApiClient) CreateNfsPermission(ctx context.Context, r *NfsPermissionCre
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
logger := log.Ctx(ctx).With().Str("nfs_permission", r.String()).Logger()
if !r.hasRequiredFields() {
return RequestMissingParams
}
payload, err := json.Marshal(r)
if err != nil {
logger.Error().Err(err).Msg("Failed to marshal request")
return err
}

logger.Trace().Msg("Creating permission")
err = a.Post(ctx, r.getRelatedObject().GetBasePath(a), &payload, nil, p)
return err
if err != nil {
logger.Error().Err(err).Msg("Failed to create NFS permission")
return err
}
return nil
}

func EnsureNfsPermission(ctx context.Context, fsName string, group string, version NfsVersionString, apiClient *ApiClient) error {
Expand All @@ -235,12 +262,66 @@ func EnsureNfsPermission(ctx context.Context, fsName string, group string, versi
AnonUid: 65534,
SupportedVersions: &[]string{string(NfsVersionV4)},
}
return apiClient.CreateNfsPermission(ctx, req, perm)
if err := apiClient.CreateNfsPermission(ctx, req, perm); err != nil {
return err
}
time.Sleep(5 * time.Second) // wait for the permission to be applied
return nil
}
}
return err
}

type NfsPermissionDeleteRequest struct {
Uid uuid.UUID `json:"-"`
}

func (pd *NfsPermissionDeleteRequest) getApiUrl(a *ApiClient) string {
return pd.getRelatedObject().GetApiUrl(a)
}

func (pd *NfsPermissionDeleteRequest) getRelatedObject() ApiObject {
return &NfsPermission{Uid: pd.Uid}
}

func (pd *NfsPermissionDeleteRequest) getRequiredFields() []string {
return []string{"Uid"}
}

func (pd *NfsPermissionDeleteRequest) hasRequiredFields() bool {
return ObjectRequestHasRequiredFields(pd)
}

func (pd *NfsPermissionDeleteRequest) String() string {
return fmt.Sprintln("NfsPermissionDeleteRequest(uid:", pd.Uid)
}

func (a *ApiClient) DeleteNfsPermission(ctx context.Context, r *NfsPermissionDeleteRequest) error {
op := "DeleteNfsPermission"
ctx, span := otel.Tracer(TracerName).Start(ctx, op)
defer span.End()
ctx = log.With().Str("trace_id", span.SpanContext().TraceID().String()).Str("span_id", span.SpanContext().SpanID().String()).Str("op", op).Logger().WithContext(ctx)
if !r.hasRequiredFields() {
return RequestMissingParams
}
apiResponse := &ApiResponse{}
err := a.Delete(ctx, r.getApiUrl(a), nil, nil, apiResponse)
if err != nil {
switch t := err.(type) {
case *ApiNotFoundError:
return ObjectNotFoundError
case *ApiBadRequestError:
for _, c := range t.ApiResponse.ErrorCodes {
if c == "PermissionDoesNotExistException" {
return ObjectNotFoundError
}
}
}
}
time.Sleep(5 * time.Second) // wait for the permission to be removed
return nil
}

type NfsClientGroup struct {
Uid uuid.UUID `json:"uid,omitempty" url:"-"`
Rules []NfsClientGroupRule `json:"rules,omitempty" url:"-"`
Expand Down Expand Up @@ -739,6 +820,7 @@ func (a *ApiClient) EnsureNfsPermissions(ctx context.Context, ip string, fsName
logger.Trace().Str("ip_address", ip).Msg("Ensuring NFS Client Group Rule for IP")
err = a.EnsureNfsClientGroupRuleForIp(ctx, cg, ip)
if err != nil {
logger.Error().Err(err).Str("ip_address", ip).Msg("Failed to ensure NFS client group rule for IP")
return err
}
// Ensure NFS permission
Expand Down
11 changes: 10 additions & 1 deletion pkg/wekafs/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -1364,14 +1364,23 @@ func (v *Volume) deleteFilesystem(ctx context.Context) error {
logger.Debug().Str("filesystem", v.FilesystemName).Msg("Deleting filesystem")
fsObj, err := v.getFilesystemObj(ctx)
if err != nil {
return status.Errorf(codes.Internal, "Failed to delete filesystem %s", v.FilesystemName)
logger.Error().Err(err).Str("filesystem", v.FilesystemName).Msg("Failed to fetch filesystem for deletion")
return status.Errorf(codes.Internal, "Failed to fetch filesystem for deletion %s", v.FilesystemName)
}
if fsObj == nil || fsObj.Uid == uuid.Nil {
logger.Warn().Str("filesystem", v.FilesystemName).Msg("Apparently filesystem not exists, returning OK")
// FS doesn't exist already, return OK for idempotence
return nil
}
if !fsObj.IsRemoving { // if filesystem is already removing, just wait
if v.server.getMounter().getTransport() == dataTransportNfs {
logger.Trace().Str("filesystem", v.FilesystemName).Msg("Ensuring no NFS permissions exist that could block filesystem deletion")
err := v.apiClient.EnsureNoNfsPermissionsForFilesystem(ctx, fsObj.Name)
if err != nil {
logger.Error().Str("filesystem", v.FilesystemName).Err(err).Msg("Failed to remove NFS permissions, cannot delete filesystem")
return err
}
}
logger.Trace().Str("filesystem", v.FilesystemName).Msg("Attempting deletion of filesystem")
fsd := &apiclient.FileSystemDeleteRequest{Uid: fsObj.Uid}
err = v.apiClient.DeleteFileSystem(ctx, fsd)
Expand Down

0 comments on commit 7325dae

Please sign in to comment.