Add workspace export-dir command #449

Merged: 14 commits, Jun 8, 2023
46 changes: 46 additions & 0 deletions cmd/workspace/workspace/events.go
@@ -0,0 +1,46 @@
package workspace

type fileIOEvent struct {
SourcePath string `json:"source_path,omitempty"`
TargetPath string `json:"target_path,omitempty"`
Type EventType `json:"type"`
}

type EventType string

const (
EventTypeFileExported = EventType("FILE_EXPORTED")
EventTypeExportStarted = EventType("EXPORT_STARTED")
EventTypeExportCompleted = EventType("EXPORT_COMPLETED")
EventTypeFileSkipped = EventType("FILE_SKIPPED")
)

func newFileExportedEvent(sourcePath, targetPath string) fileIOEvent {
return fileIOEvent{
SourcePath: sourcePath,
TargetPath: targetPath,
Type: EventTypeFileExported,
}
}

func newExportCompletedEvent(targetPath string) fileIOEvent {
return fileIOEvent{
TargetPath: targetPath,
Type: EventTypeExportCompleted,
}
}

func newFileSkippedEvent(sourcePath, targetPath string) fileIOEvent {
return fileIOEvent{
SourcePath: sourcePath,
TargetPath: targetPath,
Type: EventTypeFileSkipped,
}
}

func newExportStartedEvent(sourcePath string) fileIOEvent {
return fileIOEvent{
SourcePath: sourcePath,
Type: EventTypeExportStarted,
}
}
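
These events are emitted through cmdio either as JSON or via a text template; the `omitempty` tags mean that fields not set for a given event (for example, the source path on `EXPORT_COMPLETED`) are dropped from the JSON output. Below is a minimal, self-contained sketch of how such an event serializes with `encoding/json`; the struct is a copy of the one above for illustration only and assumes nothing beyond the standard library.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the event struct from events.go, for illustration only.
type fileIOEvent struct {
	SourcePath string `json:"source_path,omitempty"`
	TargetPath string `json:"target_path,omitempty"`
	Type       string `json:"type"`
}

func main() {
	// An EXPORT_COMPLETED event carries no source path, so omitempty drops it.
	b, _ := json.Marshal(fileIOEvent{TargetPath: "./out", Type: "EXPORT_COMPLETED"})
	fmt.Println(string(b)) // {"target_path":"./out","type":"EXPORT_COMPLETED"}

	// A FILE_EXPORTED event carries both paths.
	b, _ = json.Marshal(fileIOEvent{SourcePath: "/Workspace/nb", TargetPath: "./out/nb.py", Type: "FILE_EXPORTED"})
	fmt.Println(string(b)) // {"source_path":"/Workspace/nb","target_path":"./out/nb.py","type":"FILE_EXPORTED"}
}
```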
125 changes: 125 additions & 0 deletions cmd/workspace/workspace/export_dir.go
@@ -0,0 +1,125 @@
package workspace

import (
"context"
"io"
"io/fs"
"os"
"path"
"path/filepath"

"github.com/databricks/cli/cmd/root"
"github.com/databricks/cli/libs/cmdio"
"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go/service/workspace"
"github.com/spf13/cobra"
)

// exportFileCallback returns a callback that exports the file at relPath. It is
// meant to be used in conjunction with fs.WalkDir.
func exportFileCallback(ctx context.Context, workspaceFiler filer.Filer, sourceDir, targetDir string) func(string, fs.DirEntry, error) error {
return func(relPath string, d fs.DirEntry, err error) error {
if err != nil {
return err
}

sourcePath := path.Join(sourceDir, relPath)
targetPath := filepath.Join(targetDir, relPath)

// create directory and return early
if d.IsDir() {
return os.MkdirAll(targetPath, 0755)
}

// Add extension to local file path if the file is a notebook
info, err := d.Info()
if err != nil {
return err
}
objectInfo := info.Sys().(workspace.ObjectInfo)
if objectInfo.ObjectType == workspace.ObjectTypeNotebook {
switch objectInfo.Language {
case workspace.LanguagePython:
targetPath += ".py"
case workspace.LanguageR:
targetPath += ".r"
case workspace.LanguageScala:
targetPath += ".scala"
case workspace.LanguageSql:
targetPath += ".sql"
default:
// Do not add any extension to the file name
}
}

// Skip the file if a file already exists at targetPath and --overwrite is not set.
// os.Stat returns fs.ErrNotExist if no file exists at the path, in which case we
// proceed with the export.
if _, err := os.Stat(targetPath); err == nil && !exportOverwrite {
// Log event that this file/directory has been skipped
return cmdio.RenderWithTemplate(ctx, newFileSkippedEvent(relPath, targetPath), "{{.SourcePath}} -> {{.TargetPath}} (skipped; already exists)\n")
}

// create the file
f, err := os.Create(targetPath)
if err != nil {
return err
}
defer f.Close()

// Write content to the local file
r, err := workspaceFiler.Read(ctx, relPath)
if err != nil {
return err
}
_, err = io.Copy(f, r)
if err != nil {
return err
}
return cmdio.RenderWithTemplate(ctx, newFileExportedEvent(sourcePath, targetPath), "{{.SourcePath}} -> {{.TargetPath}}\n")
}
}

var exportDirCommand = &cobra.Command{
Use: "export-dir SOURCE_PATH TARGET_PATH",
Short: `Export a directory from a Databricks workspace to the local file system.`,
Long: `
Export a directory recursively from a Databricks workspace to the local file system.
Notebooks will have one of the following extensions added: .scala, .py, .sql, or .r,
based on the language type.
`,
PreRunE: root.MustWorkspaceClient,
Args: cobra.ExactArgs(2),
RunE: func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()
w := root.WorkspaceClient(ctx)
sourceDir := args[0]
targetDir := args[1]

// Initialize a filer and a file system on the source directory
workspaceFiler, err := filer.NewWorkspaceFilesClient(w, sourceDir)
if err != nil {
return err
}
workspaceFS := filer.NewFS(ctx, workspaceFiler)

// TODO: print progress events on stderr instead: https://github.com/databricks/cli/issues/448
err = cmdio.RenderJson(ctx, newExportStartedEvent(sourceDir))
if err != nil {
return err
}

err = fs.WalkDir(workspaceFS, ".", exportFileCallback(ctx, workspaceFiler, sourceDir, targetDir))
if err != nil {
return err
}
return cmdio.RenderJson(ctx, newExportCompletedEvent(targetDir))
},
}

var exportOverwrite bool

func init() {
exportDirCommand.Flags().BoolVar(&exportOverwrite, "overwrite", false, "overwrite existing local files")
Cmd.AddCommand(exportDirCommand)
}
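
The command wires the workspace filer into Go's standard fs.WalkDir via filer.NewFS, and exportFileCallback is a factory that closes over the context, the filer, and the two root directories so the per-entry function matches the fs.WalkDirFunc signature. The sketch below shows the same factory-plus-WalkDir pattern over a local directory, with os.DirFS standing in for the workspace file system; everything else is standard library, and the paths are made up for illustration. Assuming the standard CLI layout where this command tree is mounted under `databricks workspace`, an invocation would look like `databricks workspace export-dir /Users/me/project ./project --overwrite`.

```go
package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// makeCallback returns an fs.WalkDirFunc that closes over the source and target
// roots, mirroring how exportFileCallback closes over ctx, the filer, and the dirs.
func makeCallback(sourceDir, targetDir string) fs.WalkDirFunc {
	return func(relPath string, d fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if d.IsDir() {
			// Directories are (re)created eagerly; files are handled below.
			return os.MkdirAll(filepath.Join(targetDir, relPath), 0755)
		}
		fmt.Printf("%s -> %s\n", filepath.Join(sourceDir, relPath), filepath.Join(targetDir, relPath))
		return nil
	}
}

func main() {
	// os.DirFS plays the role of filer.NewFS(ctx, workspaceFiler) in export-dir.
	if err := fs.WalkDir(os.DirFS("."), ".", makeCallback(".", "/tmp/export")); err != nil {
		fmt.Fprintln(os.Stderr, err)
	}
}
```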
6 changes: 6 additions & 0 deletions internal/filer_test.go
@@ -53,6 +53,12 @@ func runFilerReadWriteTest(t *testing.T, ctx context.Context, f filer.Filer) {
assert.True(t, errors.As(err, &filer.FileDoesNotExistError{}))
assert.True(t, errors.Is(err, fs.ErrNotExist))

// Read should fail because the path points to a directory
err = f.Mkdir(ctx, "/dir")
require.NoError(t, err)
_, err = f.Read(ctx, "/dir")
assert.ErrorIs(t, err, fs.ErrInvalid)

// Write with CreateParentDirectories flag should succeed.
err = f.Write(ctx, "/foo/bar", strings.NewReader(`hello world`), filer.CreateParentDirectories)
assert.NoError(t, err)
111 changes: 111 additions & 0 deletions internal/workspace_test.go
@@ -1,9 +1,19 @@
package internal

import (
"context"
"errors"
"net/http"
"os"
"path/filepath"
"strings"
"testing"

"github.com/databricks/cli/libs/filer"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestAccWorkspaceList(t *testing.T) {
@@ -27,3 +37,104 @@ func TestWorkpaceGetStatusErrorWhenNoArguments(t *testing.T) {
_, _, err := RequireErrorRun(t, "workspace", "get-status")
assert.Equal(t, "accepts 1 arg(s), received 0", err.Error())
}

func setupWorkspaceImportExportTest(t *testing.T) (context.Context, filer.Filer, string) {
t.Log(GetEnvOrSkipTest(t, "CLOUD_ENV"))

ctx := context.Background()
w := databricks.Must(databricks.NewWorkspaceClient())
tmpdir := temporaryWorkspaceDir(t, w)
f, err := filer.NewWorkspaceFilesClient(w, tmpdir)
require.NoError(t, err)

// Check if we can use this API here, skip test if we cannot.
_, err = f.Read(ctx, "we_use_this_call_to_test_if_this_api_is_enabled")
var aerr *apierr.APIError
if errors.As(err, &aerr) && aerr.StatusCode == http.StatusBadRequest {
t.Skip(aerr.Message)
}

return ctx, f, tmpdir
}

// TODO: add tests for the progress event output logs: https://github.com/databricks/cli/issues/447
func assertFileContents(t *testing.T, path string, content string) {
require.FileExists(t, path)
b, err := os.ReadFile(path)
require.NoError(t, err)
assert.Contains(t, string(b), content)
}

func TestAccExportDir(t *testing.T) {
ctx, f, sourceDir := setupWorkspaceImportExportTest(t)
targetDir := t.TempDir()

var err error

// Write test data to the workspace
err = f.Write(ctx, "file-a", strings.NewReader("abc"))
require.NoError(t, err)
err = f.Write(ctx, "pyNotebook.py", strings.NewReader("# Databricks notebook source"))
require.NoError(t, err)
err = f.Write(ctx, "sqlNotebook.sql", strings.NewReader("-- Databricks notebook source"))
require.NoError(t, err)
err = f.Write(ctx, "scalaNotebook.scala", strings.NewReader("// Databricks notebook source"))
require.NoError(t, err)
err = f.Write(ctx, "rNotebook.r", strings.NewReader("# Databricks notebook source"))
require.NoError(t, err)
err = f.Write(ctx, "a/b/c/file-b", strings.NewReader("def"), filer.CreateParentDirectories)
require.NoError(t, err)

// Run Export
RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir)

// Assert files were exported
assertFileContents(t, filepath.Join(targetDir, "file-a"), "abc")
assertFileContents(t, filepath.Join(targetDir, "pyNotebook.py"), "# Databricks notebook source")
assertFileContents(t, filepath.Join(targetDir, "sqlNotebook.sql"), "-- Databricks notebook source")
assertFileContents(t, filepath.Join(targetDir, "rNotebook.r"), "# Databricks notebook source")
assertFileContents(t, filepath.Join(targetDir, "scalaNotebook.scala"), "// Databricks notebook source")
assertFileContents(t, filepath.Join(targetDir, "a/b/c/file-b"), "def")
}

func TestAccExportDirDoesNotOverwrite(t *testing.T) {
ctx, f, sourceDir := setupWorkspaceImportExportTest(t)
targetDir := t.TempDir()

var err error

// Write remote file
err = f.Write(ctx, "file-a", strings.NewReader("content from workspace"))
require.NoError(t, err)

// Write local file
err = os.WriteFile(filepath.Join(targetDir, "file-a"), []byte("local content"), os.ModePerm)
require.NoError(t, err)

// Run Export
RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir)

// Assert file is not overwritten
assertFileContents(t, filepath.Join(targetDir, "file-a"), "local content")
}

func TestAccExportDirWithOverwriteFlag(t *testing.T) {
ctx, f, sourceDir := setupWorkspaceImportExportTest(t)
targetDir := t.TempDir()

var err error

// Write remote file
err = f.Write(ctx, "file-a", strings.NewReader("content from workspace"))
require.NoError(t, err)

// Write local file
err = os.WriteFile(filepath.Join(targetDir, "file-a"), []byte("local content"), os.ModePerm)
require.NoError(t, err)

// Run Export
RequireSuccessfulRun(t, "workspace", "export-dir", sourceDir, targetDir, "--overwrite")

// Assert file has been overwritten
assertFileContents(t, filepath.Join(targetDir, "file-a"), "content from workspace")
}
8 changes: 8 additions & 0 deletions libs/cmdio/io.go
@@ -87,6 +87,14 @@ func RenderWithTemplate(ctx context.Context, v any, template string) error {
}
}

func RenderJson(ctx context.Context, v any) error {
c := fromContext(ctx)
if c.outputFormat == flags.OutputJSON {
return renderJson(c.out, v)
}
return nil
}

func RenderReader(ctx context.Context, r io.Reader) error {
c := fromContext(ctx)
switch c.outputFormat {
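
export_dir.go emits its progress lines through cmdio.RenderWithTemplate with the template `{{.SourcePath}} -> {{.TargetPath}}\n`, while RenderJson (added above) only writes when the configured output format is JSON. The sketch below shows what that template itself produces for an event value, using only the standard text/template package and independent of cmdio's internals; the struct is a trimmed stand-in for the event type in events.go.

```go
package main

import (
	"os"
	"text/template"
)

// Trimmed stand-in for the fileIOEvent type in events.go.
type fileIOEvent struct {
	SourcePath string
	TargetPath string
}

func main() {
	// Same template string that export_dir.go passes to cmdio.RenderWithTemplate.
	tmpl := template.Must(template.New("event").Parse("{{.SourcePath}} -> {{.TargetPath}}\n"))
	ev := fileIOEvent{SourcePath: "/Workspace/Users/me/nb", TargetPath: "./nb.py"}
	// Prints: /Workspace/Users/me/nb -> ./nb.py
	_ = tmpl.Execute(os.Stdout, ev)
}
```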
27 changes: 12 additions & 15 deletions libs/filer/dbfs_client.go
@@ -59,7 +59,7 @@ func (info dbfsFileInfo) IsDir() bool {
}

func (info dbfsFileInfo) Sys() any {
return nil
return info.fi
}

// DbfsClient implements the [Filer] interface for the DBFS backend.
Expand Down Expand Up @@ -145,24 +145,21 @@ func (w *DbfsClient) Read(ctx context.Context, name string) (io.Reader, error) {
return nil, err
}

handle, err := w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead)
// This stat call serves two purposes:
// 1. It checks that a file exists at the path, and returns an error if it does not.
// 2. It lets us error out if the path is a directory, which is needed because the
// Dbfs.Open method on the SDK does not return an error when the path is a directory.
// TODO(added 8 June 2023): remove this stat call on go sdk bump. https://github.com/databricks/cli/issues/450
stat, err := w.Stat(ctx, name)
Reviewer (Contributor): Fixed in databricks/databricks-sdk-go#415. I would prefer not adding a stat call for every read if we can avoid it.

Author (Contributor Author): I have added a todo with a follow-up issue filed. I would rather not block on a Go SDK release or remove test coverage.

if err != nil {
var aerr *apierr.APIError
if !errors.As(err, &aerr) {
return nil, err
}

// This API returns a 404 if the file doesn't exist.
if aerr.StatusCode == http.StatusNotFound {
if aerr.ErrorCode == "RESOURCE_DOES_NOT_EXIST" {
return nil, FileDoesNotExistError{absPath}
}
}

return nil, err
}
if stat.IsDir() {
return nil, NotAFile{absPath}
}

return handle, nil
return w.workspaceClient.Dbfs.Open(ctx, absPath, files.FileModeRead)
}

func (w *DbfsClient) Delete(ctx context.Context, name string, mode ...DeleteMode) error {
12 changes: 12 additions & 0 deletions libs/filer/filer.go
@@ -68,6 +68,18 @@ func (err NotADirectory) Is(other error) bool {
return other == fs.ErrInvalid
}

type NotAFile struct {
path string
}

func (err NotAFile) Error() string {
return fmt.Sprintf("not a file: %s", err.path)
}

func (err NotAFile) Is(other error) bool {
return other == fs.ErrInvalid
}

type DirectoryNotEmptyError struct {
path string
}
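
NotAFile implements Is(fs.ErrInvalid) so callers can match it with errors.Is without importing the filer error types, which is what the new filer_test assertion (assert.ErrorIs(t, err, fs.ErrInvalid)) relies on. A minimal sketch of that mechanism follows; the type is a copy of NotAFile above and the paths are invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

// NotAFile mirrors the filer error type above, for illustration only.
type NotAFile struct {
	path string
}

func (err NotAFile) Error() string {
	return fmt.Sprintf("not a file: %s", err.path)
}

// Is makes errors.Is(err, fs.ErrInvalid) return true for NotAFile values.
func (err NotAFile) Is(other error) bool {
	return other == fs.ErrInvalid
}

func main() {
	var err error = NotAFile{path: "/dir"}
	fmt.Println(errors.Is(err, fs.ErrInvalid))  // true
	fmt.Println(errors.Is(err, fs.ErrNotExist)) // false
}
```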