Skip to content

Commit

Permalink
Added support for glob patterns in pipeline libraries section (#833)
Browse files Browse the repository at this point in the history
## Changes
Now it's possible to specify glob pattern in pipeline libraries section
and DAB will add all matched files as libraries

```
  pipelines:
    dummy:
      name: " DLT with Python files"
      target: "dlt_python_files"
      libraries:
        - file:
            path: ./*.py
```

## Tests
Added unit test
  • Loading branch information
andrewnester authored Oct 4, 2023
1 parent 9b6a847 commit aa54a86
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 0 deletions.
89 changes: 89 additions & 0 deletions bundle/config/mutator/expand_pipeline_glob_paths.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package mutator

import (
"context"
"fmt"
"path/filepath"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/databricks-sdk-go/service/pipelines"
)

type expandPipelineGlobPaths struct{}

func ExpandPipelineGlobPaths() bundle.Mutator {
return &expandPipelineGlobPaths{}
}

func (m *expandPipelineGlobPaths) Apply(_ context.Context, b *bundle.Bundle) error {
for key, pipeline := range b.Config.Resources.Pipelines {
dir, err := pipeline.ConfigFileDirectory()
if err != nil {
return fmt.Errorf("unable to determine directory for pipeline %s: %w", key, err)
}

expandedLibraries := make([]pipelines.PipelineLibrary, 0)
for i := 0; i < len(pipeline.Libraries); i++ {

library := &pipeline.Libraries[i]
path := getGlobPatternToExpand(library)
if path == "" || !libraries.IsLocalPath(path) {
expandedLibraries = append(expandedLibraries, *library)
continue
}

matches, err := filepath.Glob(filepath.Join(dir, path))
if err != nil {
return err
}

for _, match := range matches {
m, err := filepath.Rel(dir, match)
if err != nil {
return err
}
expandedLibraries = append(expandedLibraries, cloneWithPath(library, m))
}
}
pipeline.Libraries = expandedLibraries
}

return nil
}

func getGlobPatternToExpand(library *pipelines.PipelineLibrary) string {
if library.File != nil {
return library.File.Path
}

if library.Notebook != nil {
return library.Notebook.Path
}

return ""
}

func cloneWithPath(library *pipelines.PipelineLibrary, path string) pipelines.PipelineLibrary {
if library.File != nil {
return pipelines.PipelineLibrary{
File: &pipelines.FileLibrary{
Path: path,
},
}
}

if library.Notebook != nil {
return pipelines.PipelineLibrary{
Notebook: &pipelines.NotebookLibrary{
Path: path,
},
}
}

return pipelines.PipelineLibrary{}
}

func (*expandPipelineGlobPaths) Name() string {
return "ExpandPipelineGlobPaths"
}
154 changes: 154 additions & 0 deletions bundle/config/mutator/expand_pipeline_glob_paths_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package mutator

import (
"context"
"os"
"path/filepath"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/paths"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/require"
)

func touchEmptyFile(t *testing.T, path string) {
err := os.MkdirAll(filepath.Dir(path), 0700)
require.NoError(t, err)
f, err := os.Create(path)
require.NoError(t, err)
f.Close()
}

func TestExpandGlobPathsInPipelines(t *testing.T) {
dir := t.TempDir()

touchEmptyFile(t, filepath.Join(dir, "test1.ipynb"))
touchEmptyFile(t, filepath.Join(dir, "test/test2.ipynb"))
touchEmptyFile(t, filepath.Join(dir, "test/test3.ipynb"))
touchEmptyFile(t, filepath.Join(dir, "test1.jar"))
touchEmptyFile(t, filepath.Join(dir, "test/test2.jar"))
touchEmptyFile(t, filepath.Join(dir, "test/test3.jar"))
touchEmptyFile(t, filepath.Join(dir, "test1.py"))
touchEmptyFile(t, filepath.Join(dir, "test/test2.py"))
touchEmptyFile(t, filepath.Join(dir, "test/test3.py"))

b := &bundle.Bundle{
Config: config.Root{
Path: dir,
Resources: config.Resources{
Pipelines: map[string]*resources.Pipeline{
"pipeline": {
Paths: paths.Paths{
ConfigFilePath: filepath.Join(dir, "resource.yml"),
},
PipelineSpec: &pipelines.PipelineSpec{
Libraries: []pipelines.PipelineLibrary{
{
Notebook: &pipelines.NotebookLibrary{
Path: "./**/*.ipynb",
},
},
{
Jar: "./*.jar",
},
{
File: &pipelines.FileLibrary{
Path: "./**/*.py",
},
},
{
Maven: &compute.MavenLibrary{
Coordinates: "org.jsoup:jsoup:1.7.2",
},
},
{
Notebook: &pipelines.NotebookLibrary{
Path: "./test1.ipynb",
},
},
{
Notebook: &pipelines.NotebookLibrary{
Path: "/Workspace/Users/me@company.com/test.ipynb",
},
},
{
Notebook: &pipelines.NotebookLibrary{
Path: "dbfs:/me@company.com/test.ipynb",
},
},
},
},
},
},
},
},
}

m := ExpandPipelineGlobPaths()
err := bundle.Apply(context.Background(), b, m)
require.NoError(t, err)

libraries := b.Config.Resources.Pipelines["pipeline"].Libraries
require.Len(t, libraries, 9)

// Making sure glob patterns are expanded correctly
require.True(t, containsNotebook(libraries, filepath.Join("test", "test2.ipynb")))
require.True(t, containsNotebook(libraries, filepath.Join("test", "test3.ipynb")))
require.True(t, containsFile(libraries, filepath.Join("test", "test2.py")))
require.True(t, containsFile(libraries, filepath.Join("test", "test3.py")))

// Making sure exact file references work as well
require.True(t, containsNotebook(libraries, "test1.ipynb"))

// Making sure absolute pass to remote FS file references work as well
require.True(t, containsNotebook(libraries, "/Workspace/Users/me@company.com/test.ipynb"))
require.True(t, containsNotebook(libraries, "dbfs:/me@company.com/test.ipynb"))

// Making sure other libraries are not replaced
require.True(t, containsJar(libraries, "./*.jar"))
require.True(t, containsMaven(libraries, "org.jsoup:jsoup:1.7.2"))
}

func containsNotebook(libraries []pipelines.PipelineLibrary, path string) bool {
for _, l := range libraries {
if l.Notebook != nil && l.Notebook.Path == path {
return true
}
}

return false
}

func containsJar(libraries []pipelines.PipelineLibrary, path string) bool {
for _, l := range libraries {
if l.Jar == path {
return true
}
}

return false
}

func containsMaven(libraries []pipelines.PipelineLibrary, coordinates string) bool {
for _, l := range libraries {
if l.Maven != nil && l.Maven.Coordinates == coordinates {
return true
}
}

return false
}

func containsFile(libraries []pipelines.PipelineLibrary, path string) bool {
for _, l := range libraries {
if l.File != nil && l.File.Path == path {
return true
}
}

return false
}
4 changes: 4 additions & 0 deletions bundle/libraries/libraries.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ func isLocalLibrary(library *compute.Library) bool {
return false
}

return IsLocalPath(path)
}

func IsLocalPath(path string) bool {
if isExplicitFileScheme(path) {
return true
}
Expand Down
1 change: 1 addition & 0 deletions bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Initialize() bundle.Mutator {
),
mutator.OverrideCompute(),
mutator.ProcessTargetMode(),
mutator.ExpandPipelineGlobPaths(),
mutator.TranslatePaths(),
python.WrapperWarning(),
terraform.Initialize(),
Expand Down

0 comments on commit aa54a86

Please sign in to comment.