Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added transformation mutator for Python wheel task for them to work on DBR <13.1 #635

Merged
merged 10 commits into from
Aug 30, 2023
95 changes: 95 additions & 0 deletions bundle/config/mutator/trampoline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package mutator

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"text/template"

"github.com/databricks/cli/bundle"
"github.com/databricks/databricks-sdk-go/service/jobs"
)

type fnTemplateData func(task *jobs.Task) (map[string]any, error)
type fnCleanUp func(task *jobs.Task)
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
type fnTasks func(b *bundle.Bundle) []*jobs.Task
andrewnester marked this conversation as resolved.
Show resolved Hide resolved

type trampoline struct {
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
name string
getTasks fnTasks
templateData fnTemplateData
cleanUp fnCleanUp
template string
}

func NewTrampoline(
name string,
tasks fnTasks,
templateData fnTemplateData,
cleanUp fnCleanUp,
template string,
) *trampoline {
return &trampoline{name, tasks, templateData, cleanUp, template}
}

func (m *trampoline) Name() string {
return fmt.Sprintf("trampoline(%s)", m.name)
}

func (m *trampoline) Apply(ctx context.Context, b *bundle.Bundle) error {
tasks := m.getTasks(b)
for _, task := range tasks {
err := m.generateNotebookWrapper(b, task)
if err != nil {
return err
}
}
return nil
}

func (m *trampoline) generateNotebookWrapper(b *bundle.Bundle, task *jobs.Task) error {
internalDir, err := b.InternalDir()
if err != nil {
return err
}

notebookName := fmt.Sprintf("notebook_%s", task.TaskKey)
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
localNotebookPath := filepath.Join(internalDir, notebookName+".py")

err = os.MkdirAll(filepath.Dir(localNotebookPath), 0755)
if err != nil {
return err
}

f, err := os.Create(localNotebookPath)
if err != nil {
return err
}
defer f.Close()

data, err := m.templateData(task)
if err != nil {
return err
}

t, err := template.New(notebookName).Parse(m.template)
if err != nil {
return err
}

internalDirRel, err := filepath.Rel(b.Config.Path, internalDir)
if err != nil {
return err
}

m.cleanUp(task)
remotePath := path.Join(b.Config.Workspace.FilesPath, filepath.ToSlash(internalDirRel), notebookName)

task.NotebookTask = &jobs.NotebookTask{
NotebookPath: remotePath,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment on how we expect the wheel args, or task params, to be passed in the template?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a reference example, in Python trampoline we do it with sys.argv as you can see here
https://github.com/databricks/cli/pull/635/files#diff-8ddf2564fcd580399d61df5283d0c0c7b614c476e0803f63f9f785b62dc69a04R23

I'd expect it should work similarly for task params

}

return t.Execute(f, data)
}
90 changes: 90 additions & 0 deletions bundle/config/mutator/trampoline_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package mutator

import (
"context"
"fmt"
"os"
"path/filepath"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)

func getTasks(b *bundle.Bundle) []*jobs.Task {
tasks := make([]*jobs.Task, 0)
for k := range b.Config.Resources.Jobs["test"].Tasks {
tasks = append(tasks, &b.Config.Resources.Jobs["test"].Tasks[k])
}

return tasks
}

func templateData(task *jobs.Task) (map[string]any, error) {
if task.PythonWheelTask == nil {
return nil, fmt.Errorf("PythonWheelTask cannot be nil")
}

data := make(map[string]any)
data["MyName"] = "Trampoline"
return data, nil
}

func cleanUp(task *jobs.Task) {
task.PythonWheelTask = nil
}

func TestGenerateTrampoline(t *testing.T) {
tmpDir := t.TempDir()

tasks := []jobs.Task{
{
TaskKey: "to_trampoline",
PythonWheelTask: &jobs.PythonWheelTask{
PackageName: "test",
EntryPoint: "run",
}},
}

b := &bundle.Bundle{
Config: config.Root{
Path: tmpDir,
Bundle: config.Bundle{
Target: "development",
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"test": {
Paths: resources.Paths{
ConfigFilePath: tmpDir,
},
JobSettings: &jobs.JobSettings{
Tasks: tasks,
},
},
},
},
},
}
ctx := context.Background()

trampoline := NewTrampoline("test_trampoline", getTasks, templateData, cleanUp, "Hello from {{.MyName}}")
err := bundle.Apply(ctx, b, trampoline)
require.NoError(t, err)

dir, err := b.InternalDir()
require.NoError(t, err)
filename := filepath.Join(dir, "notebook_to_trampoline.py")

bytes, err := os.ReadFile(filename)
require.NoError(t, err)

require.Equal(t, "Hello from Trampoline", string(bytes))

task := b.Config.Resources.Jobs["test"].Tasks[0]
require.Equal(t, task.NotebookTask.NotebookPath, ".databricks/bundle/development/.internal/notebook_to_trampoline")
require.Nil(t, task.PythonWheelTask)
}
4 changes: 3 additions & 1 deletion bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/databricks/cli/bundle/deploy/lock"
"github.com/databricks/cli/bundle/deploy/terraform"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/bundle/python"
)

// The deploy phase deploys artifacts and resources.
Expand All @@ -17,10 +18,11 @@ func Deploy() bundle.Mutator {
bundle.Defer(
bundle.Seq(
mutator.ValidateGitDetails(),
files.Upload(),
libraries.MatchWithArtifacts(),
artifacts.CleanUp(),
artifacts.UploadAll(),
python.TransformWheelTask(),
files.Upload(),
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
Expand Down
85 changes: 85 additions & 0 deletions bundle/python/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package python

import (
"fmt"
"strconv"
"strings"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/databricks-sdk-go/service/jobs"
)

const NOTEBOOK_TEMPLATE = `# Databricks notebook source
%python
{{range .Libraries}}
%pip install --force-reinstall {{.Whl}}
{{end}}

from contextlib import redirect_stdout
import io
import sys
sys.argv = [{{.Params}}]

import pkg_resources
_func = pkg_resources.load_entry_point("{{.Task.PackageName}}", "console_scripts", "{{.Task.EntryPoint}}")

f = io.StringIO()
with redirect_stdout(f):
_func()
s = f.getvalue()
dbutils.notebook.exit(s)
`
pietern marked this conversation as resolved.
Show resolved Hide resolved

// This mutator takes the wheel task and transforms it into notebook
// which installs uploaded wheels using %pip and then calling corresponding
// entry point.
func TransformWheelTask() bundle.Mutator {
return mutator.NewTrampoline(
"python_wheel",
getTasks,
generateTemplateData,
cleanUpTask,
NOTEBOOK_TEMPLATE,
)
}

func getTasks(b *bundle.Bundle) []*jobs.Task {
return libraries.FindAllWheelTasks(b)
}

func generateTemplateData(task *jobs.Task) (map[string]any, error) {
params, err := generateParameters(task.PythonWheelTask)
if err != nil {
return nil, err
}

data := map[string]any{
"Libraries": task.Libraries,
"Params": params,
"Task": task.PythonWheelTask,
}

return data, nil
}

func generateParameters(task *jobs.PythonWheelTask) (string, error) {
if task.Parameters != nil && task.NamedParameters != nil {
return "", fmt.Errorf("not allowed to pass both paramaters and named_parameters")
}
params := append([]string{"python"}, task.Parameters...)
for k, v := range task.NamedParameters {
params = append(params, fmt.Sprintf("%s=%s", k, v))
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
}

for i := range params {
params[i] = strconv.Quote(params[i])
}
return strings.Join(params, ", "), nil
}

func cleanUpTask(task *jobs.Task) {
task.PythonWheelTask = nil
task.Libraries = nil
}
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
57 changes: 57 additions & 0 deletions bundle/python/transform_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package python

import (
"testing"

"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)

type testCase struct {
Actual []string
Expected string
}
type NamedParams map[string]string
type testCaseNamed struct {
Actual NamedParams
Expected string
}

var paramsTestCases []testCase = []testCase{
{[]string{}, `"python"`},
{[]string{"a"}, `"python", "a"`},
{[]string{"a", "b"}, `"python", "a", "b"`},
{[]string{"123!@#$%^&*()-="}, `"python", "123!@#$%^&*()-="`},
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
{[]string{`{"a": 1}`}, `"python", "{\"a\": 1}"`},
}

var paramsTestCasesNamed []testCaseNamed = []testCaseNamed{
{NamedParams{}, `"python"`},
{NamedParams{"a": "1"}, `"python", "a=1"`},
{NamedParams{"a": "1", "b": "2"}, `"python", "a=1", "b=2"`},
{NamedParams{"data": `{"a": 1}`}, `"python", "data={\"a\": 1}"`},
}

func TestGenerateParameters(t *testing.T) {
for _, c := range paramsTestCases {
task := &jobs.PythonWheelTask{Parameters: c.Actual}
result, err := generateParameters(task)
require.NoError(t, err)
require.Equal(t, c.Expected, result)
}
}

func TestGenerateNamedParameters(t *testing.T) {
for _, c := range paramsTestCasesNamed {
task := &jobs.PythonWheelTask{NamedParameters: c.Actual}
result, err := generateParameters(task)
require.NoError(t, err)
require.Equal(t, c.Expected, result)
}
}

func TestGenerateBoth(t *testing.T) {
task := &jobs.PythonWheelTask{NamedParameters: map[string]string{"a": "1"}, Parameters: []string{"b"}}
_, err := generateParameters(task)
require.Error(t, err)
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
}