Skip to content

Commit

Permalink
fix(CSI-224): race condition on multiple volume deletion in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Jul 23, 2024
1 parent 30c6c34 commit ac5a71b
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
14 changes: 10 additions & 4 deletions pkg/wekafs/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
path, err, unmount := gc.mounter.Mount(ctx, fsName, volume.apiClient)
defer unmount()
volumeTrashLoc := filepath.Join(path, garbagePath)
gc.Lock()
if err := os.MkdirAll(volumeTrashLoc, DefaultVolumePermissions); err != nil {
logger.Error().Err(err).Msg("Failed to create garbage collector directory")
} else {
Expand All @@ -70,6 +71,7 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
logger.Error().Err(err).Str("full_path", fullPath).
Str("volume_trash_location", volumeTrashLoc).Msg("Failed to move volume contents to volumeTrashLoc")
}
gc.Unlock()
// NOTE: there is a problem of directory leaks here. If the volume innerPath is deeper than /csi-volumes/vol-name,
// e.g. if using statically provisioned volume, we move only the deepest directory
// so if the volume is dir/v1/<filesystem>/this/is/a/path/to/volume, we might move only the `volume`
Expand All @@ -81,15 +83,15 @@ func (gc *innerPathVolGc) purgeVolume(ctx context.Context, volume *Volume) {
logger.Error().Err(err).Msg("Failed to mount filesystem for GC processing")
return
}
if err := purgeDirectory(ctx, volumeTrashLoc); err != nil {
if err := purgeDirectory(ctx, volumeTrashLoc, volumeTrashLoc); err != nil {
logger.Error().Err(err).Str("purge_path", volumeTrashLoc).Msg("Failed to remove directory")
return
}

logger.Debug().Msg("Volume purged")
}

func purgeDirectory(ctx context.Context, path string) error {
func purgeDirectory(ctx context.Context, path string, rootPath string) error {
logger := log.Ctx(ctx).With().Str("path", path).Logger()
if !PathExists(path) {
logger.Error().Str("path", path).Msg("Failed to remove existing directory")
Expand All @@ -104,7 +106,7 @@ func purgeDirectory(ctx context.Context, path string) error {
for _, f := range files {
fp := filepath.Join(path, f.Name())
if f.IsDir() {
if err := purgeDirectory(ctx, fp); err != nil {
if err := purgeDirectory(ctx, fp, rootPath); err != nil {
logger.Error().Err(err).Msg("")
return err
}
Expand All @@ -113,7 +115,11 @@ func purgeDirectory(ctx context.Context, path string) error {
}
}
}
return os.Remove(path)
if path != rootPath {
// we only want to remove the directory if it's not the root directory (always retain the .__weka_async_delete)
return os.Remove(path)
}
return nil
}

func (gc *innerPathVolGc) purgeLeftovers(ctx context.Context, fs string, apiClient *apiclient.ApiClient) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/wekafs/utilities.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ func PathExists(p string) bool {

// PathIsWekaMount reports whether path shows up in the output of
// `mount -t wekafs`.
//
// The mount listing is piped through `grep -w` so that only lines containing
// path as a whole word are kept; the function returns true when that filtered
// output contains path. Errors from the pipeline (including grep's non-zero
// exit status when nothing matches) are intentionally ignored: they leave the
// captured output empty, which yields false for any non-empty path.
func PathIsWekaMount(ctx context.Context, path string) bool {
	log.Ctx(ctx).Trace().Str("full_path", path).Msg("Checking if path is wekafs mount")
	// path is handed to the shell as positional parameter $1 instead of being
	// concatenated into the command string. This keeps a separator between
	// `-w` and the pattern (gluing them together makes grep parse the path as
	// option characters and fail), and it prevents spaces or shell
	// metacharacters in path from altering the command. -F makes grep treat
	// the path literally rather than as a regular expression, and `--` stops
	// option parsing so a path beginning with '-' is still a pattern.
	const mountcmd = `mount -t wekafs | grep -wF -- "$1"`
	// The extra "sh" argument becomes $0 of the inline script; path becomes $1.
	res, _ := exec.Command("sh", "-c", mountcmd, "sh", path).Output()
	return strings.Contains(string(res), path)
}
Expand Down

0 comments on commit ac5a71b

Please sign in to comment.