From 1322aa9e2e9f67655a64eb0c175b6b4285bebb3c Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 1 Jun 2023 22:44:29 +0200 Subject: [PATCH 01/12] Use filer in sync command This deprecates usage of the `repofiles` package in favor of the filer package and consolidates the code paths into WSFS. Note: one potentially breaking change here is the following. If a file at `foo/bar.txt` is created and removed, the directory `foo` is kept around because we do not perform directory tracking. If subsequently we need to write a file at `foo`, it will result in an `fs.ErrExist` because it is impossible to overwrite a directory. The previous implementation performed a recursive delete of the path if this happened, where this implementation will return the `fs.ErrExist` error to the user. We can mitigate this in one of two ways: * Track directories to remove as part of a `diff` and remove them * Attempt to remove an empty directory tree if we see this error * ...? --- libs/sync/repofiles/repofiles.go | 159 -------------------------- libs/sync/repofiles/repofiles_test.go | 88 -------------- libs/sync/sync.go | 23 ++-- libs/sync/watchdog.go | 16 ++- 4 files changed, 27 insertions(+), 259 deletions(-) delete mode 100644 libs/sync/repofiles/repofiles.go delete mode 100644 libs/sync/repofiles/repofiles_test.go diff --git a/libs/sync/repofiles/repofiles.go b/libs/sync/repofiles/repofiles.go deleted file mode 100644 index 8fcabc113e..0000000000 --- a/libs/sync/repofiles/repofiles.go +++ /dev/null @@ -1,159 +0,0 @@ -package repofiles - -import ( - "context" - "errors" - "fmt" - "net/http" - "net/url" - "os" - "path" - "path/filepath" - "strings" - - "github.com/databricks/databricks-sdk-go" - "github.com/databricks/databricks-sdk-go/apierr" - "github.com/databricks/databricks-sdk-go/client" - "github.com/databricks/databricks-sdk-go/service/workspace" -) - -// RepoFiles wraps reading and writing into a remote repo with safeguards to prevent -// accidental deletion of repos and 
more robust methods to overwrite workspace files -type RepoFiles struct { - repoRoot string - localRoot string - workspaceClient *databricks.WorkspaceClient -} - -func Create(repoRoot, localRoot string, workspaceClient *databricks.WorkspaceClient) *RepoFiles { - return &RepoFiles{ - repoRoot: repoRoot, - localRoot: localRoot, - workspaceClient: workspaceClient, - } -} - -func (r *RepoFiles) remotePath(relativePath string) (string, error) { - fullPath := path.Join(r.repoRoot, relativePath) - cleanFullPath := path.Clean(fullPath) - if !strings.HasPrefix(cleanFullPath, r.repoRoot) { - return "", fmt.Errorf("relative file path is not inside repo root: %s", relativePath) - } - // path.Clean will remove any trailing / so it's enough to check cleanFullPath == r.repoRoot - if cleanFullPath == r.repoRoot { - return "", fmt.Errorf("file path relative to repo root cannot be empty: %s", relativePath) - } - return cleanFullPath, nil -} - -func (r *RepoFiles) readLocal(relativePath string) ([]byte, error) { - localPath := filepath.Join(r.localRoot, relativePath) - return os.ReadFile(localPath) -} - -func (r *RepoFiles) writeRemote(ctx context.Context, relativePath string, content []byte) error { - apiClientConfig := r.workspaceClient.Config - apiClientConfig.HTTPTimeoutSeconds = 600 - apiClient, err := client.New(apiClientConfig) - if err != nil { - return err - } - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - escapedPath := url.PathEscape(strings.TrimLeft(remotePath, "/")) - apiPath := fmt.Sprintf("/api/2.0/workspace-files/import-file/%s?overwrite=true", escapedPath) - - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - - // Handling some edge cases when an upload might fail - // - // We cannot do more precise error scoping here because the API does not - // provide descriptive errors yet - // - // TODO: narrow down the error condition scope of this "if" block to only - // trigger for the specific edge cases instead of all 
errors once the API - // implements them - if err != nil { - // Delete any artifact files incase non overwriteable by the current file - // type and thus are failing the PUT request. - // files, folders and notebooks might not have been cleaned up and they - // can't overwrite each other. If a folder `foo` exists, then attempts to - // PUT a file `foo` will fail - err := r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: true, - }, - ) - // ignore RESOURCE_DOES_NOT_EXIST here incase nothing existed at remotePath - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - if err != nil { - return err - } - - // Mkdir parent dirs incase they are what's causing the PUT request to - // fail - err = r.workspaceClient.Workspace.MkdirsByPath(ctx, path.Dir(remotePath)) - if err != nil { - return fmt.Errorf("could not mkdir to put file: %s", err) - } - - // Attempt to upload file again after cleanup/setup - err = apiClient.Do(ctx, http.MethodPost, apiPath, content, nil) - if err != nil { - return err - } - } - return nil -} - -func (r *RepoFiles) deleteRemote(ctx context.Context, relativePath string) error { - remotePath, err := r.remotePath(relativePath) - if err != nil { - return err - } - return r.workspaceClient.Workspace.Delete(ctx, - workspace.Delete{ - Path: remotePath, - Recursive: false, - }, - ) -} - -// The API calls for a python script foo.py would be -// `PUT foo.py` -// `DELETE foo.py` -// -// The API calls for a python notebook foo.py would be -// `PUT foo.py` -// `DELETE foo` -// -// The workspace file system backend strips .py from the file name if the python -// file is a notebook -func (r *RepoFiles) PutFile(ctx context.Context, relativePath string) error { - content, err := r.readLocal(relativePath) - if err != nil { - return err - } - - return r.writeRemote(ctx, relativePath, content) -} - -func (r *RepoFiles) DeleteFile(ctx context.Context, 
relativePath string) error { - err := r.deleteRemote(ctx, relativePath) - - // We explictly ignore RESOURCE_DOES_NOT_EXIST error to make delete idempotent - var aerr *apierr.APIError - if errors.As(err, &aerr) && aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" { - err = nil - } - return nil -} - -// TODO: write integration tests for all non happy path cases that rely on -// specific behaviour of the workspace apis diff --git a/libs/sync/repofiles/repofiles_test.go b/libs/sync/repofiles/repofiles_test.go deleted file mode 100644 index 2a881d90d0..0000000000 --- a/libs/sync/repofiles/repofiles_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package repofiles - -import ( - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestRepoFilesRemotePath(t *testing.T) { - repoRoot := "/Repos/doraemon/bar" - repoFiles := Create(repoRoot, "/doraemon/foo/bar", nil) - - remotePath, err := repoFiles.remotePath("a/b/c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/../d") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/d", remotePath) - - remotePath, err = repoFiles.remotePath("a/../c") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/.") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/c", remotePath) - - remotePath, err = repoFiles.remotePath("a/b/c/d/./../../f/g") - assert.NoError(t, err) - assert.Equal(t, repoRoot+"/a/b/f/g", remotePath) - - _, err = repoFiles.remotePath("..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ..`) - - _, err = repoFiles.remotePath("a/../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: a/../..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("/./.././..") - assert.ErrorContains(t, err, 
`relative file path is not inside repo root: /./.././..`) - - _, err = repoFiles.remotePath("./../.") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../.`) - - _, err = repoFiles.remotePath("./..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./..`) - - _, err = repoFiles.remotePath("./../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../../..`) - - _, err = repoFiles.remotePath("./../a/./b../../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ./../a/./b../../..`) - - _, err = repoFiles.remotePath("../..") - assert.ErrorContains(t, err, `relative file path is not inside repo root: ../..`) - - _, err = repoFiles.remotePath(".//a/..//./b/..") - assert.ErrorContains(t, err, `file path relative to repo root cannot be empty`) - - _, err = repoFiles.remotePath("a/b/../..") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath(".") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") - - _, err = repoFiles.remotePath("/") - assert.ErrorContains(t, err, "file path relative to repo root cannot be empty") -} - -func TestRepoReadLocal(t *testing.T) { - tempDir := t.TempDir() - helloPath := filepath.Join(tempDir, "hello.txt") - err := os.WriteFile(helloPath, []byte("my name is doraemon :P"), os.ModePerm) - assert.NoError(t, err) - - repoFiles := Create("/Repos/doraemon/bar", tempDir, nil) - bytes, err := repoFiles.readLocal("./a/../hello.txt") - assert.NoError(t, err) - assert.Equal(t, "my name is doraemon :P", string(bytes)) -} diff --git a/libs/sync/sync.go b/libs/sync/sync.go index 54d0624e77..5c4c9d8f68 100644 --- a/libs/sync/sync.go +++ b/libs/sync/sync.go @@ -5,9 +5,9 @@ import ( "fmt" "time" + "github.com/databricks/cli/libs/filer" 
"github.com/databricks/cli/libs/git" "github.com/databricks/cli/libs/log" - "github.com/databricks/cli/libs/sync/repofiles" "github.com/databricks/databricks-sdk-go" ) @@ -29,9 +29,9 @@ type SyncOptions struct { type Sync struct { *SyncOptions - fileSet *git.FileSet - snapshot *Snapshot - repoFiles *repofiles.RepoFiles + fileSet *git.FileSet + snapshot *Snapshot + filer filer.Filer // Synchronization progress events are sent to this event notifier. notifier EventNotifier @@ -77,16 +77,19 @@ func New(ctx context.Context, opts SyncOptions) (*Sync, error) { } } - repoFiles := repofiles.Create(opts.RemotePath, opts.LocalPath, opts.WorkspaceClient) + filer, err := filer.NewWorkspaceFilesClient(opts.WorkspaceClient, opts.RemotePath) + if err != nil { + return nil, err + } return &Sync{ SyncOptions: &opts, - fileSet: fileSet, - snapshot: snapshot, - repoFiles: repoFiles, - notifier: &NopNotifier{}, - seq: 0, + fileSet: fileSet, + snapshot: snapshot, + filer: filer, + notifier: &NopNotifier{}, + seq: 0, }, nil } diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 3e7acccc2c..94bd9f3384 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -1,8 +1,12 @@ package sync import ( + "bytes" "context" + "os" + "path/filepath" + "github.com/databricks/cli/libs/filer" "golang.org/x/sync/errgroup" ) @@ -21,7 +25,7 @@ func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteNam group.Go(func() error { s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) - err := s.repoFiles.DeleteFile(ctx, remoteName) + err := s.filer.Delete(ctx, remoteName) if err != nil { return err } @@ -42,10 +46,18 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st group.Go(func() error { s.notifyProgress(ctx, EventActionPut, localName, 0.0) - err := s.repoFiles.PutFile(ctx, localName) + + contents, err := os.ReadFile(filepath.Join(s.LocalPath, localName)) + if err != nil { + return err + } + + opts := 
[]filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} + err = s.filer.Write(ctx, localName, bytes.NewReader(contents), opts...) if err != nil { return err } + s.notifyProgress(ctx, EventActionPut, localName, 1.0) return nil }) From 91a3d4d4f4f2e81b504805f056c41cba81f70fd7 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Thu, 1 Jun 2023 22:51:52 +0200 Subject: [PATCH 02/12] Pass file handle instead of buffer --- libs/sync/watchdog.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 94bd9f3384..84ccc418d4 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -1,7 +1,6 @@ package sync import ( - "bytes" "context" "os" "path/filepath" @@ -47,13 +46,15 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st group.Go(func() error { s.notifyProgress(ctx, EventActionPut, localName, 0.0) - contents, err := os.ReadFile(filepath.Join(s.LocalPath, localName)) + localFile, err := os.Open(filepath.Join(s.LocalPath, localName)) if err != nil { return err } + defer localFile.Close() + opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} - err = s.filer.Write(ctx, localName, bytes.NewReader(contents), opts...) + err = s.filer.Write(ctx, localName, localFile, opts...) 
if err != nil { return err } From 952d5ecc2c3493bf6a12db76fbcd6e7e3d4714d6 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 00:08:47 +0200 Subject: [PATCH 03/12] Ignore ErrNotExist on deletes --- libs/sync/watchdog.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 84ccc418d4..828eb1183d 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -2,6 +2,8 @@ package sync import ( "context" + "errors" + "io/fs" "os" "path/filepath" @@ -25,7 +27,7 @@ func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteNam group.Go(func() error { s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) err := s.filer.Delete(ctx, remoteName) - if err != nil { + if err != nil && !errors.Is(err, fs.ErrNotExist) { return err } s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) return nil }) From ccb69806d45b1ae286e317e1b28731e76563910f Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 00:09:48 +0200 Subject: [PATCH 04/12] Delete directories when they become empty Sync currently doesn't clean up remote empty directories. This change computes the set of directories that have been removed on an incremental update and removes those as well. 
--- libs/sync/diff.go | 31 ++++++++++++++++++++++++++++ libs/sync/diff_test.go | 28 +++++++++++++++++++++++++ libs/sync/snapshot.go | 42 ++++++++++++++++++++++++++++++++++++++ libs/sync/snapshot_test.go | 3 ++- libs/sync/watchdog.go | 21 +++++++++++++++---- 5 files changed, 120 insertions(+), 5 deletions(-) create mode 100644 libs/sync/diff_test.go diff --git a/libs/sync/diff.go b/libs/sync/diff.go index 72c1d5aab6..9534bb6e6f 100644 --- a/libs/sync/diff.go +++ b/libs/sync/diff.go @@ -1,5 +1,12 @@ package sync +import ( + "sort" + "strings" + + "golang.org/x/exp/maps" +) + type diff struct { put []string delete []string @@ -8,3 +15,27 @@ type diff struct { func (d diff) IsEmpty() bool { return len(d.put) == 0 && len(d.delete) == 0 } + +func (d diff) GroupDeletesByNestingLevel() [][]string { + // Group the paths to delete by their nesting level. + // We need a directory to be empty before we can remove it, so a file at + // level 5 must be deleted before deleting its directory at level 4. + deletesByLevel := make(map[int][]string) + for _, remoteName := range d.delete { + level := len(strings.Split(remoteName, "/")) + deletesByLevel[level] = append(deletesByLevel[level], remoteName) + } + + // Get a sorted list of nesting levels. + levels := maps.Keys(deletesByLevel) + sort.Ints(levels) + + // Return slice ordered by descending level. + // Each slice contains paths at the same level. 
+ var out [][]string + for i := len(levels) - 1; i >= 0; i-- { + out = append(out, deletesByLevel[levels[i]]) + } + + return out +} diff --git a/libs/sync/diff_test.go b/libs/sync/diff_test.go new file mode 100644 index 0000000000..55b6d370b6 --- /dev/null +++ b/libs/sync/diff_test.go @@ -0,0 +1,28 @@ +package sync + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestDiffGroupDeletesByNestingLevel(t *testing.T) { + d := diff{ + delete: []string{ + "foo/bar/baz1", + "foo/bar1", + "foo/bar/baz2", + "foo/bar2", + "foo1", + "foo2", + }, + } + + expected := [][]string{ + {"foo/bar/baz1", "foo/bar/baz2"}, + {"foo/bar1", "foo/bar2"}, + {"foo1", "foo2"}, + } + + assert.Equal(t, expected, d.GroupDeletesByNestingLevel()) +} diff --git a/libs/sync/snapshot.go b/libs/sync/snapshot.go index 1ea7b18bdb..bf442068f4 100644 --- a/libs/sync/snapshot.go +++ b/libs/sync/snapshot.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "os" + "path" "path/filepath" "strings" "time" @@ -15,6 +16,7 @@ import ( "github.com/databricks/cli/libs/fileset" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/notebook" + "golang.org/x/exp/maps" ) // Bump it up every time a potentially breaking change is made to the snapshot schema @@ -252,6 +254,46 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e // add them to a delete batch change.delete = append(change.delete, remoteName) } + + // Gather all directories previously present. + previousDirectories := map[string]struct{}{} + for _, localName := range maps.Keys(lastModifiedTimes) { + dir := filepath.ToSlash(filepath.Dir(localName)) + for dir != "." { + if _, ok := previousDirectories[dir]; ok { + break + } + previousDirectories[dir] = struct{}{} + dir = path.Dir(dir) + } + } + + // Gather all directories currently present. + currentDirectories := map[string]struct{}{} + for _, f := range all { + dir := filepath.ToSlash(filepath.Dir(f.Relative)) + for dir != "." 
{ + if _, ok := currentDirectories[dir]; ok { + break + } + currentDirectories[dir] = struct{}{} + dir = path.Dir(dir) + } + } + + // Gather all directories that are no longer present. + for dir := range previousDirectories { + // Look for dir and all its parents. + for dir != "." { + if _, ok := currentDirectories[dir]; ok { + break + } + + change.delete = append(change.delete, dir) + dir = path.Dir(dir) + } + } + // and remove them from the snapshot for _, remoteName := range change.delete { // we do note assert that remoteName exists in remoteToLocalNames since it diff --git a/libs/sync/snapshot_test.go b/libs/sync/snapshot_test.go index 8154b79141..cb7e7e0899 100644 --- a/libs/sync/snapshot_test.go +++ b/libs/sync/snapshot_test.go @@ -147,9 +147,10 @@ func TestFolderDiff(t *testing.T) { assert.NoError(t, err) change, err = state.diff(ctx, files) assert.NoError(t, err) - assert.Len(t, change.delete, 1) + assert.Len(t, change.delete, 2) assert.Len(t, change.put, 0) assert.Contains(t, change.delete, "foo/bar") + assert.Contains(t, change.delete, "foo") } func TestPythonNotebookDiff(t *testing.T) { diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 828eb1183d..bc9fbd9ea0 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -67,13 +67,26 @@ func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName st } func (s *Sync) applyDiff(ctx context.Context, d diff) error { - group, ctx := errgroup.WithContext(ctx) - group.SetLimit(MaxRequestsInFlight) + // Delete paths in descending order of nesting level, + // while deleting paths at the same level in parallel. 
+ for _, deletes := range d.GroupDeletesByNestingLevel() { + group, ctx := errgroup.WithContext(ctx) + group.SetLimit(MaxRequestsInFlight) - for _, remoteName := range d.delete { - s.applyDelete(ctx, group, remoteName) + for _, remoteName := range deletes { + s.applyDelete(ctx, group, remoteName) + } + + // Wait for goroutines to finish and return first non-nil error return if any. + err := group.Wait() + if err != nil { + return err + } } + group, ctx := errgroup.WithContext(ctx) + group.SetLimit(MaxRequestsInFlight) + for _, localName := range d.put { s.applyPut(ctx, group, localName) } From 5e57dc9c26a07fa8bf95b47443b27126ec909076 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 2 Jun 2023 00:13:56 +0200 Subject: [PATCH 05/12] Simplify dir diff logic --- libs/sync/snapshot.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/libs/sync/snapshot.go b/libs/sync/snapshot.go index bf442068f4..2f3eacc3dc 100644 --- a/libs/sync/snapshot.go +++ b/libs/sync/snapshot.go @@ -283,14 +283,8 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e // Gather all directories that are no longer present. for dir := range previousDirectories { - // Look for dir and all its parents. - for dir != "." 
{ - if _, ok := currentDirectories[dir]; ok { - break - } - + if _, ok := currentDirectories[dir]; !ok { change.delete = append(change.delete, dir) - dir = path.Dir(dir) } } From b3a57273a504e6beb1da1812e02bf89498dc776a Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 6 Jun 2023 00:06:18 +0200 Subject: [PATCH 06/12] Track both directories to create and directories to remove --- libs/sync/diff.go | 99 +++++++++++++++++++++++------ libs/sync/diff_test.go | 62 +++++++++++++++--- libs/sync/snapshot.go | 45 ++++++++----- libs/sync/snapshot_test.go | 9 ++- libs/sync/watchdog.go | 127 ++++++++++++++++++++++--------------- 5 files changed, 243 insertions(+), 99 deletions(-) diff --git a/libs/sync/diff.go b/libs/sync/diff.go index 9534bb6e6f..26e99b3483 100644 --- a/libs/sync/diff.go +++ b/libs/sync/diff.go @@ -1,40 +1,99 @@ package sync import ( - "sort" - "strings" - - "golang.org/x/exp/maps" + "path" ) type diff struct { - put []string delete []string + rmdir []string + mkdir []string + put []string } func (d diff) IsEmpty() bool { return len(d.put) == 0 && len(d.delete) == 0 } -func (d diff) GroupDeletesByNestingLevel() [][]string { - // Group the paths to delete by their nesting level. - // We need a directory to be empty before we can remove it, so a file at - // level 5 must be deleted before deleting its directory at level 4. - deletesByLevel := make(map[int][]string) - for _, remoteName := range d.delete { - level := len(strings.Split(remoteName, "/")) - deletesByLevel[level] = append(deletesByLevel[level], remoteName) +// groupedMkdir returns a slice of slices of paths to create. +// Because the underlying mkdir calls create intermediate directories, +// we can group them together to reduce the total number of calls. +// This returns a slice of a slice for parity with [groupedRmdir]. +func (d diff) groupedMkdir() [][]string { + // Compute the set of prefixes of all paths to create. 
+ prefixes := make(map[string]bool) + for _, name := range d.mkdir { + dir := path.Dir(name) + for dir != "." && dir != "/" { + prefixes[dir] = true + dir = path.Dir(dir) + } } - // Get a sorted list of nesting levels. - levels := maps.Keys(deletesByLevel) - sort.Ints(levels) + var out []string + + // Collect all paths that are not a prefix of another path. + for _, name := range d.mkdir { + if !prefixes[name] { + out = append(out, name) + } + } + + return [][]string{out} +} + +// groupedRmdir returns a slice of slices of paths to delete. +// The outer slice is ordered such that each inner slice can be +// deleted in parallel, as long as it is processed in order. +// The first entry will contain leaf directories, the second entry +// will contain intermediate directories, and so on. +func (d diff) groupedRmdir() [][]string { + // Compute the number of times each directory is a prefix of another directory. + prefixes := make(map[string]int) + for _, dir := range d.rmdir { + prefixes[dir] = 0 + } + for _, dir := range d.rmdir { + dir = path.Dir(dir) + for dir != "." && dir != "/" { + // Increment the prefix count for this directory, only if it + // is one of the directories we are deleting. + if _, ok := prefixes[dir]; ok { + prefixes[dir]++ + } + dir = path.Dir(dir) + } + } - // Return slice ordered by descending level. - // Each slice contains paths at the same level. var out [][]string - for i := len(levels) - 1; i >= 0; i-- { - out = append(out, deletesByLevel[levels[i]]) + + for len(prefixes) > 0 { + var toDelete []string + + // Find directories which are not a prefix of another directory. + // These are the directories we can delete. + for dir, count := range prefixes { + if count == 0 { + toDelete = append(toDelete, dir) + delete(prefixes, dir) + } + } + + // Remove these directories from the prefixes map. + for _, dir := range toDelete { + dir = path.Dir(dir) + for dir != "." 
&& dir != "/" { + // Decrement the prefix count for this directory, only if it + // is one of the directories we are deleting. + if _, ok := prefixes[dir]; ok { + prefixes[dir]-- + } + dir = path.Dir(dir) + } + } + + // Add these directories to the output. + out = append(out, toDelete) } return out diff --git a/libs/sync/diff_test.go b/libs/sync/diff_test.go index 55b6d370b6..a874cb8f72 100644 --- a/libs/sync/diff_test.go +++ b/libs/sync/diff_test.go @@ -6,9 +6,56 @@ import ( "github.com/stretchr/testify/assert" ) -func TestDiffGroupDeletesByNestingLevel(t *testing.T) { +func TestDiffGroupedMkdir(t *testing.T) { d := diff{ - delete: []string{ + mkdir: []string{ + "foo", + "foo/bar", + "foo/bar/baz1", + "foo/bar/baz2", + "foo1", + "a/b", + "a/b/c/d/e/f", + }, + } + + // Expect only leaf directories to be included. + out := d.groupedMkdir() + assert.ElementsMatch(t, []string{ + "foo/bar/baz1", + "foo/bar/baz2", + "foo1", + "a/b/c/d/e/f", + }, out[0]) +} + +func TestDiffGroupedRmdir(t *testing.T) { + d := diff{ + rmdir: []string{ + "a/b/c/d/e/f", + "a/b/c/d/e", + "a/b/c/d", + "a/b/c", + "a/b/e/f/g/h", + "a/b/e/f/g", + "a/b/e/f", + "a/b/e", + "a/b", + }, + } + + out := d.groupedRmdir() + assert.Len(t, out, 5) + assert.ElementsMatch(t, []string{"a/b/c/d/e/f", "a/b/e/f/g/h"}, out[0]) + assert.ElementsMatch(t, []string{"a/b/c/d/e", "a/b/e/f/g"}, out[1]) + assert.ElementsMatch(t, []string{"a/b/c/d", "a/b/e/f"}, out[2]) + assert.ElementsMatch(t, []string{"a/b/c", "a/b/e"}, out[3]) + assert.ElementsMatch(t, []string{"a/b"}, out[4]) +} + +func TestDiffGroupedRmdirWithLeafsOnly(t *testing.T) { + d := diff{ + rmdir: []string{ "foo/bar/baz1", "foo/bar1", "foo/bar/baz2", @@ -18,11 +65,8 @@ func TestDiffGroupDeletesByNestingLevel(t *testing.T) { }, } - expected := [][]string{ - {"foo/bar/baz1", "foo/bar/baz2"}, - {"foo/bar1", "foo/bar2"}, - {"foo1", "foo2"}, - } - - assert.Equal(t, expected, d.GroupDeletesByNestingLevel()) + // Expect all directories to be included. 
+ out := d.groupedRmdir() + assert.Len(t, out, 1) + assert.ElementsMatch(t, d.rmdir, out[0]) } diff --git a/libs/sync/snapshot.go b/libs/sync/snapshot.go index 2f3eacc3dc..ff39b56463 100644 --- a/libs/sync/snapshot.go +++ b/libs/sync/snapshot.go @@ -185,6 +185,11 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e localFileSet[f.Relative] = struct{}{} } + // Capture both previous and current set of files. + // This is needed to detect directories that were added or removed. + previousFiles := maps.Keys(lastModifiedTimes) + currentFiles := maps.Keys(localFileSet) + for _, f := range all { // get current modified timestamp modified := f.Modified() @@ -255,10 +260,21 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e change.delete = append(change.delete, remoteName) } + // and remove them from the snapshot + for _, remoteName := range change.delete { + // we do note assert that remoteName exists in remoteToLocalNames since it + // will be missing for files with remote name changed + localName := remoteToLocalNames[remoteName] + + delete(lastModifiedTimes, localName) + delete(remoteToLocalNames, remoteName) + delete(localToRemoteNames, localName) + } + // Gather all directories previously present. previousDirectories := map[string]struct{}{} - for _, localName := range maps.Keys(lastModifiedTimes) { - dir := filepath.ToSlash(filepath.Dir(localName)) + for _, f := range previousFiles { + dir := filepath.ToSlash(filepath.Dir(f)) for dir != "." { if _, ok := previousDirectories[dir]; ok { break @@ -270,8 +286,8 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e // Gather all directories currently present. currentDirectories := map[string]struct{}{} - for _, f := range all { - dir := filepath.ToSlash(filepath.Dir(f.Relative)) + for _, f := range currentFiles { + dir := filepath.ToSlash(filepath.Dir(f)) for dir != "." 
{ if _, ok := currentDirectories[dir]; ok { break @@ -281,22 +297,19 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e } } - // Gather all directories that are no longer present. + // Create all new directories. + for dir := range currentDirectories { + if _, ok := previousDirectories[dir]; !ok { + change.mkdir = append(change.mkdir, dir) + } + } + + // Remove all directories that are no longer present. for dir := range previousDirectories { if _, ok := currentDirectories[dir]; !ok { - change.delete = append(change.delete, dir) + change.rmdir = append(change.rmdir, dir) } } - // and remove them from the snapshot - for _, remoteName := range change.delete { - // we do note assert that remoteName exists in remoteToLocalNames since it - // will be missing for files with remote name changed - localName := remoteToLocalNames[remoteName] - - delete(lastModifiedTimes, localName) - delete(remoteToLocalNames, remoteName) - delete(localToRemoteNames, localName) - } return } diff --git a/libs/sync/snapshot_test.go b/libs/sync/snapshot_test.go index cb7e7e0899..c2e8f6b80c 100644 --- a/libs/sync/snapshot_test.go +++ b/libs/sync/snapshot_test.go @@ -139,7 +139,10 @@ func TestFolderDiff(t *testing.T) { change, err := state.diff(ctx, files) assert.NoError(t, err) assert.Len(t, change.delete, 0) + assert.Len(t, change.rmdir, 0) + assert.Len(t, change.mkdir, 1) assert.Len(t, change.put, 1) + assert.Contains(t, change.mkdir, "foo") assert.Contains(t, change.put, "foo/bar.py") f1.Remove(t) @@ -147,10 +150,12 @@ func TestFolderDiff(t *testing.T) { assert.NoError(t, err) change, err = state.diff(ctx, files) assert.NoError(t, err) - assert.Len(t, change.delete, 2) + assert.Len(t, change.delete, 1) + assert.Len(t, change.rmdir, 1) + assert.Len(t, change.mkdir, 0) assert.Len(t, change.put, 0) assert.Contains(t, change.delete, "foo/bar") - assert.Contains(t, change.delete, "foo") + assert.Contains(t, change.rmdir, "foo") } func TestPythonNotebookDiff(t 
*testing.T) { diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index bc9fbd9ea0..0a4b582b1a 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -14,83 +14,106 @@ import ( // Maximum number of concurrent requests during sync. const MaxRequestsInFlight = 20 -// Perform a DELETE of the specified remote path. -func (s *Sync) applyDelete(ctx context.Context, group *errgroup.Group, remoteName string) { - // Return early if the context has already been cancelled. - select { - case <-ctx.Done(): - return - default: - // Proceed. +// Delete the specified path. +func (s *Sync) applyDelete(ctx context.Context, remoteName string) error { + s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) + + err := s.filer.Delete(ctx, remoteName) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + return err } - group.Go(func() error { - s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) - err := s.filer.Delete(ctx, remoteName) - if err != nil && !errors.Is(err, fs.ErrNotExist) { - return err - } - s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) - return nil - }) + s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) + return nil +} + +// Create the specified path. +func (s *Sync) applyMkdir(ctx context.Context, localName string) error { + s.notifyProgress(ctx, EventActionPut, localName, 0.0) + + err := s.filer.Mkdir(ctx, localName) + if err != nil { + return err + } + + s.notifyProgress(ctx, EventActionPut, localName, 1.0) + return nil } // Perform a PUT of the specified local path. 
-func (s *Sync) applyPut(ctx context.Context, group *errgroup.Group, localName string) { +func (s *Sync) applyPut(ctx context.Context, localName string) error { + s.notifyProgress(ctx, EventActionPut, localName, 0.0) + + localFile, err := os.Open(filepath.Join(s.LocalPath, localName)) + if err != nil { + return err + } + + defer localFile.Close() + + opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} + err = s.filer.Write(ctx, localName, localFile, opts...) + if err != nil { + return err + } + + s.notifyProgress(ctx, EventActionPut, localName, 1.0) + return nil +} + +func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context.Context, string) error, path string) { // Return early if the context has already been cancelled. select { case <-ctx.Done(): - return + break default: // Proceed. } group.Go(func() error { - s.notifyProgress(ctx, EventActionPut, localName, 0.0) - - localFile, err := os.Open(filepath.Join(s.LocalPath, localName)) - if err != nil { - return err - } + return fn(ctx, path) + }) +} - defer localFile.Close() +func groupRunParallel(ctx context.Context, paths []string, fn func(context.Context, string) error) error { + group, ctx := errgroup.WithContext(ctx) + group.SetLimit(MaxRequestsInFlight) - opts := []filer.WriteMode{filer.CreateParentDirectories, filer.OverwriteIfExists} - err = s.filer.Write(ctx, localName, localFile, opts...) - if err != nil { - return err - } + for _, path := range paths { + groupRunSingle(ctx, group, fn, path) + } - s.notifyProgress(ctx, EventActionPut, localName, 1.0) - return nil - }) + // Wait for goroutines to finish and return first non-nil error return if any. + return group.Wait() } func (s *Sync) applyDiff(ctx context.Context, d diff) error { - // Delete paths in descending order of nesting level, - // while deleting paths at the same level in parallel. 
- for _, deletes := range d.GroupDeletesByNestingLevel() { - group, ctx := errgroup.WithContext(ctx) - group.SetLimit(MaxRequestsInFlight) - - for _, remoteName := range deletes { - s.applyDelete(ctx, group, remoteName) - } + var err error + + // Delete files in parallel. + err = groupRunParallel(ctx, d.delete, s.applyDelete) + if err != nil { + return err + } - // Wait for goroutines to finish and return first non-nil error return if any. - err := group.Wait() + // Delete directories ordered by depth from leaf to root. + for _, group := range d.groupedRmdir() { + err = groupRunParallel(ctx, group, s.applyDelete) if err != nil { return err } } - group, ctx := errgroup.WithContext(ctx) - group.SetLimit(MaxRequestsInFlight) - - for _, localName := range d.put { - s.applyPut(ctx, group, localName) + // Create directories (leafs only because intermediates are created automatically). + for _, group := range d.groupedMkdir() { + err = groupRunParallel(ctx, group, s.applyMkdir) + if err != nil { + return err + } } - // Wait for goroutines to finish and return first non-nil error return if any. - return group.Wait() + // Put files in parallel. 
+ err = groupRunParallel(ctx, d.put, s.applyPut) + + return err } From 811c52360a3baa77804b082e90dee154b6297733 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 6 Jun 2023 00:29:12 +0200 Subject: [PATCH 07/12] Update sync integration test --- internal/sync_test.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/internal/sync_test.go b/internal/sync_test.go index 89b318a970..4fe433e694 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -3,8 +3,10 @@ package internal import ( "context" "encoding/json" + "errors" "fmt" "io" + "io/fs" "net/http" "os" "os/exec" @@ -15,6 +17,7 @@ import ( "time" _ "github.com/databricks/cli/cmd/sync" + "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/sync" "github.com/databricks/cli/libs/testfile" "github.com/databricks/databricks-sdk-go" @@ -63,6 +66,7 @@ type syncTest struct { t *testing.T c *cobraTestRunner w *databricks.WorkspaceClient + f filer.Filer localRoot string remoteRoot string } @@ -73,6 +77,8 @@ func setupSyncTest(t *testing.T, args ...string) *syncTest { w := databricks.Must(databricks.NewWorkspaceClient()) localRoot := t.TempDir() remoteRoot := temporaryWorkspaceDir(t, w) + f, err := filer.NewWorkspaceFilesClient(w, remoteRoot) + require.NoError(t, err) // Prepend common arguments. 
args = append([]string{ @@ -90,6 +96,7 @@ func setupSyncTest(t *testing.T, args ...string) *syncTest { t: t, c: c, w: w, + f: f, localRoot: localRoot, remoteRoot: remoteRoot, } @@ -160,6 +167,13 @@ func (a *syncTest) remoteFileContent(ctx context.Context, relativePath string, e }, 30*time.Second, 5*time.Second) } +func (a *syncTest) notExist(ctx context.Context, relativePath string) { + a.c.Eventually(func() bool { + _, err := a.f.Stat(ctx, relativePath) + return errors.Is(err, fs.ErrNotExist) + }, 30*time.Second, 5*time.Second) +} + func (a *syncTest) objectType(ctx context.Context, relativePath string, expected string) { path := path.Join(a.remoteRoot, relativePath) @@ -297,8 +311,7 @@ func TestAccSyncNestedFolderSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - // directories are not cleaned up right now. This is not ideal - assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{}) + assertSync.notExist(ctx, "dir1") assertSync.snapshotContains(append(repoFiles, ".gitignore")) } @@ -326,12 +339,10 @@ func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - // directories are not cleaned up right now. This is not ideal - assertSync.remoteDirContent(ctx, "dir1/a b+c/c+d e", []string{}) + assertSync.notExist(ctx, "dir1/a b+c/c+d e") assertSync.snapshotContains(append(repoFiles, ".gitignore")) } -// sync does not clean up empty directories from the workspace file system. // This is a check for the edge case when a user does the following: // // 1. 
Add file foo/bar.txt @@ -359,8 +370,7 @@ func TestAccSyncIncrementalFileOverwritesFolder(t *testing.T) { f.Remove(t) os.Remove(filepath.Join(assertSync.localRoot, "foo")) assertSync.waitForCompletionMarker() - assertSync.remoteDirContent(ctx, "foo", []string{}) - assertSync.objectType(ctx, "foo", "DIRECTORY") + assertSync.notExist(ctx, "foo") assertSync.snapshotContains(append(repoFiles, ".gitignore")) f2 := testfile.CreateFile(t, filepath.Join(assertSync.localRoot, "foo")) From 5a822945ac795880dfaa2747b968491c259d3889 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 6 Jun 2023 11:13:31 +0200 Subject: [PATCH 08/12] Failures deleting a directory can be ignored --- internal/sync_test.go | 57 ++++++++++++++++++++++++++++++++++++------- libs/sync/watchdog.go | 19 +++++++++++++-- 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/internal/sync_test.go b/internal/sync_test.go index 4fe433e694..f84cf4916e 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -3,7 +3,6 @@ package internal import ( "context" "encoding/json" - "errors" "fmt" "io" "io/fs" @@ -167,11 +166,19 @@ func (a *syncTest) remoteFileContent(ctx context.Context, relativePath string, e }, 30*time.Second, 5*time.Second) } -func (a *syncTest) notExist(ctx context.Context, relativePath string) { - a.c.Eventually(func() bool { - _, err := a.f.Stat(ctx, relativePath) - return errors.Is(err, fs.ErrNotExist) - }, 30*time.Second, 5*time.Second) +func (a *syncTest) remoteNotExist(ctx context.Context, relativePath string) { + _, err := a.f.Stat(ctx, relativePath) + require.ErrorIs(a.t, err, fs.ErrNotExist) +} + +func (a *syncTest) remoteExists(ctx context.Context, relativePath string) { + _, err := a.f.Stat(ctx, relativePath) + require.NoError(a.t, err) +} + +func (a *syncTest) touchFile(ctx context.Context, path string) { + err := a.f.Write(ctx, path, strings.NewReader("contents"), filer.CreateParentDirectories) + require.NoError(a.t, err) } func (a *syncTest) objectType(ctx 
context.Context, relativePath string, expected string) { @@ -311,10 +318,42 @@ func TestAccSyncNestedFolderSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - assertSync.notExist(ctx, "dir1") + assertSync.remoteNotExist(ctx, "dir1") assertSync.snapshotContains(append(repoFiles, ".gitignore")) } +func TestAccSyncNestedFolderDoesntFailOnNonEmptyDirectory(t *testing.T) { + ctx := context.Background() + assertSync := setupSyncTest(t, "--watch") + + // .gitignore is created by the sync process to enforce .databricks is not synced + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "", append(repoFiles, ".gitignore")) + + // New file + localFilePath := filepath.Join(assertSync.localRoot, "dir1/dir2/dir3/foo.txt") + err := os.MkdirAll(filepath.Dir(localFilePath), 0o755) + assert.NoError(t, err) + f := testfile.CreateFile(t, localFilePath) + defer f.Close(t) + assertSync.waitForCompletionMarker() + assertSync.remoteDirContent(ctx, "dir1/dir2/dir3", []string{"foo.txt"}) + + // Add file to dir1 to simulate a user writing to the workspace directly. + assertSync.touchFile(ctx, "dir1/foo.txt") + + // Remove original file. + f.Remove(t) + assertSync.waitForCompletionMarker() + + // Sync should have removed these directories. + assertSync.remoteNotExist(ctx, "dir1/dir2/dir3") + assertSync.remoteNotExist(ctx, "dir1/dir2") + + // Sync should have ignored not being able to delete dir1. 
+ assertSync.remoteExists(ctx, "dir1") +} + func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { ctx := context.Background() assertSync := setupSyncTest(t, "--watch") @@ -339,7 +378,7 @@ func TestAccSyncNestedSpacePlusAndHashAreEscapedSync(t *testing.T) { // delete f.Remove(t) assertSync.waitForCompletionMarker() - assertSync.notExist(ctx, "dir1/a b+c/c+d e") + assertSync.remoteNotExist(ctx, "dir1/a b+c/c+d e") assertSync.snapshotContains(append(repoFiles, ".gitignore")) } @@ -370,7 +409,7 @@ func TestAccSyncIncrementalFileOverwritesFolder(t *testing.T) { f.Remove(t) os.Remove(filepath.Join(assertSync.localRoot, "foo")) assertSync.waitForCompletionMarker() - assertSync.notExist(ctx, "foo") + assertSync.remoteNotExist(ctx, "foo") assertSync.snapshotContains(append(repoFiles, ".gitignore")) f2 := testfile.CreateFile(t, filepath.Join(assertSync.localRoot, "foo")) diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index 0a4b582b1a..c2fa84feec 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -8,6 +8,7 @@ import ( "path/filepath" "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" "golang.org/x/sync/errgroup" ) @@ -27,7 +28,21 @@ func (s *Sync) applyDelete(ctx context.Context, remoteName string) error { return nil } -// Create the specified path. +// Remove the directory at the specified path. +func (s *Sync) applyRmdir(ctx context.Context, remoteName string) error { + s.notifyProgress(ctx, EventActionDelete, remoteName, 0.0) + + err := s.filer.Delete(ctx, remoteName) + if err != nil { + // Directory deletion is opportunistic, so we ignore errors. + log.Debugf(ctx, "error removing directory %s: %s", remoteName, err) + } + + s.notifyProgress(ctx, EventActionDelete, remoteName, 1.0) + return nil +} + +// Create a directory at the specified path. 
func (s *Sync) applyMkdir(ctx context.Context, localName string) error { s.notifyProgress(ctx, EventActionPut, localName, 0.0) @@ -98,7 +113,7 @@ func (s *Sync) applyDiff(ctx context.Context, d diff) error { // Delete directories ordered by depth from leaf to root. for _, group := range d.groupedRmdir() { - err = groupRunParallel(ctx, group, s.applyDelete) + err = groupRunParallel(ctx, group, s.applyRmdir) if err != nil { return err } From 5cbae81139145d89d1f2a048430c6dc0a4d810ed Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Tue, 6 Jun 2023 11:20:15 +0200 Subject: [PATCH 09/12] Assert file exists --- internal/sync_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/sync_test.go b/internal/sync_test.go index f84cf4916e..09418a8556 100644 --- a/internal/sync_test.go +++ b/internal/sync_test.go @@ -351,6 +351,7 @@ func TestAccSyncNestedFolderDoesntFailOnNonEmptyDirectory(t *testing.T) { assertSync.remoteNotExist(ctx, "dir1/dir2") // Sync should have ignored not being able to delete dir1. + assertSync.remoteExists(ctx, "dir1/foo.txt") assertSync.remoteExists(ctx, "dir1") } From e8b34731d1eb243b4e1adc6f478b0d496e8a6456 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 9 Jun 2023 14:17:31 +0200 Subject: [PATCH 10/12] Factor out directory processing code into dedicated type --- libs/sync/dirset.go | 54 ++++++++++++++++++++++++++++++++++++++++ libs/sync/dirset_test.go | 37 +++++++++++++++++++++++++++ libs/sync/snapshot.go | 50 ++++++------------------------------- 3 files changed, 99 insertions(+), 42 deletions(-) create mode 100644 libs/sync/dirset.go create mode 100644 libs/sync/dirset_test.go diff --git a/libs/sync/dirset.go b/libs/sync/dirset.go new file mode 100644 index 0000000000..3c37c97cfa --- /dev/null +++ b/libs/sync/dirset.go @@ -0,0 +1,54 @@ +package sync + +import ( + "path" + "path/filepath" + "sort" +) + +// DirSet is a set of directories. 
+type DirSet map[string]struct{} + +// MakeDirSet turns a list of file paths into the complete set of directories +// that is needed to store them (including parent directories). +func MakeDirSet(files []string) DirSet { + out := map[string]struct{}{} + + // Iterate over all files. + for _, f := range files { + // Get the directory of the file in /-separated form. + dir := filepath.ToSlash(filepath.Dir(f)) + + // Add this directory and its parents until it is either "." or already in the set. + for dir != "." { + if _, ok := out[dir]; ok { + break + } + out[dir] = struct{}{} + dir = path.Dir(dir) + } + } + + return out +} + +// Slice returns a sorted copy of the dirset elements as a slice. +func (dirset DirSet) Slice() []string { + out := make([]string, 0, len(dirset)) + for dir := range dirset { + out = append(out, dir) + } + sort.Strings(out) + return out +} + +// Remove returns the set difference of two DirSets. +func (dirset DirSet) Remove(other DirSet) DirSet { + out := map[string]struct{}{} + for dir := range dirset { + if _, ok := other[dir]; !ok { + out[dir] = struct{}{} + } + } + return out +} diff --git a/libs/sync/dirset_test.go b/libs/sync/dirset_test.go new file mode 100644 index 0000000000..7e920819c0 --- /dev/null +++ b/libs/sync/dirset_test.go @@ -0,0 +1,37 @@ +package sync + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "golang.org/x/exp/maps" +) + +func TestMakeDirSet(t *testing.T) { + assert.ElementsMatch(t, + []string{ + "a", + "a/b", + "a/b/c", + "a/b/d", + "a/e", + "b", + }, + maps.Keys( + MakeDirSet([]string{ + "./a/b/c/file1", + "./a/b/c/file2", + "./a/b/d/file", + "./a/e/file", + "b/file", + }), + ), + ) +} + +func TestDirSetRemove(t *testing.T) { + a := MakeDirSet([]string{"./a/b/c/file1"}) + b := MakeDirSet([]string{"./a/b/d/file2"}) + assert.ElementsMatch(t, []string{"a/b/c"}, a.Remove(b).Slice()) + assert.ElementsMatch(t, []string{"a/b/d"}, b.Remove(a).Slice()) +} diff --git a/libs/sync/snapshot.go 
b/libs/sync/snapshot.go index ff39b56463..1680f04626 100644 --- a/libs/sync/snapshot.go +++ b/libs/sync/snapshot.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "os" - "path" "path/filepath" "strings" "time" @@ -186,10 +185,17 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e } // Capture both previous and current set of files. - // This is needed to detect directories that were added or removed. previousFiles := maps.Keys(lastModifiedTimes) currentFiles := maps.Keys(localFileSet) + // Build directory sets to figure out which directories to create and which to remove. + previousDirectories := MakeDirSet(previousFiles) + currentDirectories := MakeDirSet(currentFiles) + + // Create new directories; remove stale directories. + change.mkdir = currentDirectories.Remove(previousDirectories).Slice() + change.rmdir = previousDirectories.Remove(currentDirectories).Slice() + for _, f := range all { // get current modified timestamp modified := f.Modified() @@ -271,45 +277,5 @@ func (s *Snapshot) diff(ctx context.Context, all []fileset.File) (change diff, e delete(localToRemoteNames, localName) } - // Gather all directories previously present. - previousDirectories := map[string]struct{}{} - for _, f := range previousFiles { - dir := filepath.ToSlash(filepath.Dir(f)) - for dir != "." { - if _, ok := previousDirectories[dir]; ok { - break - } - previousDirectories[dir] = struct{}{} - dir = path.Dir(dir) - } - } - - // Gather all directories currently present. - currentDirectories := map[string]struct{}{} - for _, f := range currentFiles { - dir := filepath.ToSlash(filepath.Dir(f)) - for dir != "." { - if _, ok := currentDirectories[dir]; ok { - break - } - currentDirectories[dir] = struct{}{} - dir = path.Dir(dir) - } - } - - // Create all new directories. 
- for dir := range currentDirectories { - if _, ok := previousDirectories[dir]; !ok { - change.mkdir = append(change.mkdir, dir) - } - } - - // Remove all directories that are no longer present. - for dir := range previousDirectories { - if _, ok := currentDirectories[dir]; !ok { - change.rmdir = append(change.rmdir, dir) - } - } - return } From d9fb6c49eac3c2344ebed34039f88b05f62edea2 Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Fri, 9 Jun 2023 14:18:22 +0200 Subject: [PATCH 11/12] Add assert --- libs/sync/diff_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/sync/diff_test.go b/libs/sync/diff_test.go index a874cb8f72..ff44887218 100644 --- a/libs/sync/diff_test.go +++ b/libs/sync/diff_test.go @@ -21,6 +21,7 @@ func TestDiffGroupedMkdir(t *testing.T) { // Expect only leaf directories to be included. out := d.groupedMkdir() + assert.Len(t, out, 1) assert.ElementsMatch(t, []string{ "foo/bar/baz1", "foo/bar/baz2", From 4a5de1796c70df3d8b7f759ac4c88803e1fdbfbd Mon Sep 17 00:00:00 2001 From: Pieter Noordhuis Date: Mon, 12 Jun 2023 13:32:41 +0200 Subject: [PATCH 12/12] break -> return --- libs/sync/watchdog.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/sync/watchdog.go b/libs/sync/watchdog.go index c2fa84feec..b0c96e01c0 100644 --- a/libs/sync/watchdog.go +++ b/libs/sync/watchdog.go @@ -80,7 +80,7 @@ func groupRunSingle(ctx context.Context, group *errgroup.Group, fn func(context. // Return early if the context has already been cancelled. select { case <-ctx.Done(): - break + return default: // Proceed. }