From ac5a71b48a19255e60119bf6db54f17247314cf1 Mon Sep 17 00:00:00 2001 From: Sergey Berezansky Date: Tue, 23 Jul 2024 12:47:37 +0300 Subject: [PATCH] fix(CSI-224): race condition on multiple volume deletion in parallel --- pkg/wekafs/gc.go | 14 ++++++++++---- pkg/wekafs/utilities.go | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/pkg/wekafs/gc.go b/pkg/wekafs/gc.go index beb102dcf..891a56e0a 100644 --- a/pkg/wekafs/gc.go +++ b/pkg/wekafs/gc.go @@ -58,6 +58,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { path, err, unmount := gc.mounter.Mount(ctx, fsName, volume.apiClient) defer unmount() volumeTrashLoc := filepath.Join(path, garbagePath) + gc.Lock() if err := os.MkdirAll(volumeTrashLoc, DefaultVolumePermissions); err != nil { logger.Error().Err(err).Msg("Failed to create garbage collector directory") } else { @@ -70,6 +71,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { logger.Error().Err(err).Str("full_path", fullPath). Str("volume_trash_location", volumeTrashLoc).Msg("Failed to move volume contents to volumeTrashLoc") } + gc.Unlock() // NOTE: there is a problem of directory leaks here. If the volume innerPath is deeper than /csi-volumes/vol-name, // e.g. if using statically provisioned volume, we move only the deepest directory // so if the volume is dir/v1//this/is/a/path/to/volume, we might move only the `volume` @@ -81,7 +83,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { logger.Error().Err(err).Msg("Failed to mount filesystem for GC processing") return } - if err := purgeDirectory(ctx, volumeTrashLoc); err != nil { + if err := purgeDirectory(ctx, volumeTrashLoc, volumeTrashLoc); err != nil { logger.Error().Err(err).Str("purge_path", volumeTrashLoc).Msg("Failed to remove directory") return } @@ -89,7 +91,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) { logger.Debug().Msg("Volume purged") } -func purgeDirectory(ctx context.Context, path string) error { +func purgeDirectory(ctx context.Context, path string, rootPath string) error { logger := log.Ctx(ctx).With().Str("path", path).Logger() if !PathExists(path) { logger.Error().Str("path", path).Msg("Failed to remove existing directory") @@ -104,7 +106,7 @@ func purgeDirectory(ctx context.Context, path string) error { for _, f := range files { fp := filepath.Join(path, f.Name()) if f.IsDir() { - if err := purgeDirectory(ctx, fp); err != nil { + if err := purgeDirectory(ctx, fp, rootPath); err != nil { logger.Error().Err(err).Msg("") return err } @@ -113,7 +115,11 @@ func purgeDirectory(ctx context.Context, path string) error { } } } - return os.Remove(path) + if path != rootPath { + // we only want to remove the directory if it's not the root directory (always retain the .__weka_async_delete) + return os.Remove(path) + } + return nil } func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) { diff --git a/pkg/wekafs/utilities.go b/pkg/wekafs/utilities.go index 0a33de5e2..6153f9240 100644 --- a/pkg/wekafs/utilities.go +++ b/pkg/wekafs/utilities.go @@ -273,7 +273,7 @@ func PathExists(p string) bool { func PathIsWekaMount(ctx context.Context, path string) bool { log.Ctx(ctx).Trace().Str("full_path", path).Msg("Checking if path is wekafs mount") - mountcmd := "mount -t wekafs | grep " + path + mountcmd := "mount -t wekafs | grep -w" + path res, _ := exec.Command("sh", "-c", mountcmd).Output() return strings.Contains(string(res), path) }