Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added transformation mutator for Python wheel task for them to work on DBR <13.1 #635

Merged
merged 10 commits into from
Aug 30, 2023
44 changes: 33 additions & 11 deletions bundle/libraries/libraries.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,46 @@ func (a *match) Name() string {
}

func (a *match) Apply(ctx context.Context, b *bundle.Bundle) error {
tasks := findAllTasks(b)
for _, task := range tasks {
if isMissingRequiredLibraries(task) {
return fmt.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey)
}
for j := range task.Libraries {
lib := &task.Libraries[j]
err := findArtifactsAndMarkForUpload(ctx, lib, b)
if err != nil {
return err
}
}
}
return nil
}

func findAllTasks(b *bundle.Bundle) []*jobs.Task {
r := b.Config.Resources
result := make([]*jobs.Task, 0)
for k := range b.Config.Resources.Jobs {
tasks := r.Jobs[k].JobSettings.Tasks
for i := range tasks {
task := &tasks[i]
if isMissingRequiredLibraries(task) {
return fmt.Errorf("task '%s' is missing required libraries. Please include your package code in task libraries block", task.TaskKey)
}
for j := range task.Libraries {
lib := &task.Libraries[j]
err := findArtifactsAndMarkForUpload(ctx, lib, b)
if err != nil {
return err
}
}
result = append(result, task)
}
}
return nil

return result
}

func FindAllWheelTasks(b *bundle.Bundle) []*jobs.Task {
tasks := findAllTasks(b)
wheelTasks := make([]*jobs.Task, 0)
for _, task := range tasks {
if task.PythonWheelTask != nil {
wheelTasks = append(wheelTasks, task)
}
}

return wheelTasks
}

func isMissingRequiredLibraries(task *jobs.Task) bool {
Expand Down
4 changes: 3 additions & 1 deletion bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/databricks/cli/bundle/deploy/lock"
"github.com/databricks/cli/bundle/deploy/terraform"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/cli/bundle/python"
)

// The deploy phase deploys artifacts and resources.
Expand All @@ -17,10 +18,11 @@ func Deploy() bundle.Mutator {
bundle.Defer(
bundle.Seq(
mutator.ValidateGitDetails(),
files.Upload(),
libraries.MatchWithArtifacts(),
artifacts.CleanUp(),
artifacts.UploadAll(),
python.TransformWheelTask(),
files.Upload(),
terraform.Interpolate(),
terraform.Write(),
terraform.StatePull(),
Expand Down
147 changes: 147 additions & 0 deletions bundle/python/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package python

import (
"context"
"fmt"
"os"
"path"
"path/filepath"
"strings"
"text/template"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/libraries"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
)

const NOTEBOOK_TEMPLATE = `# Databricks notebook source
%python
{{range .Libraries}}
%pip install --force-reinstall {{.Whl}}
{{end}}

from contextlib import redirect_stdout
import io
import sys
sys.argv = [{{.Params}}]

import pkg_resources
_func = pkg_resources.load_entry_point("{{.Task.PackageName}}", "console_scripts", "{{.Task.EntryPoint}}")

f = io.StringIO()
with redirect_stdout(f):
_func()
s = f.getvalue()
dbutils.notebook.exit(s)
`
pietern marked this conversation as resolved.
Show resolved Hide resolved

// This mutator takes the wheel task and transforms it into notebook
// which installs uploaded wheels using %pip and then calling corresponding
// entry point.
func TransformWheelTask() bundle.Mutator {
return &transform{}
}

type transform struct {
}

func (m *transform) Name() string {
return "python.TransformWheelTask"
}

func (m *transform) Apply(ctx context.Context, b *bundle.Bundle) error {
wheelTasks := libraries.FindAllWheelTasks(b)
for _, wheelTask := range wheelTasks {
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
taskDefinition := wheelTask.PythonWheelTask
libraries := wheelTask.Libraries

wheelTask.PythonWheelTask = nil
wheelTask.Libraries = nil

filename, err := generateNotebookWrapper(b, taskDefinition, libraries)
if err != nil {
return err
}

internalDir, err := getInternalDir(b)
if err != nil {
return err
}

internalDirRel, err := filepath.Rel(b.Config.Path, internalDir)
if err != nil {
return err
}

parts := []string{b.Config.Workspace.FilesPath}
parts = append(parts, strings.Split(internalDirRel, string(os.PathSeparator))...)
parts = append(parts, filename)
andrewnester marked this conversation as resolved.
Show resolved Hide resolved

wheelTask.NotebookTask = &jobs.NotebookTask{
NotebookPath: path.Join(parts...),
}
}
return nil
}
andrewnester marked this conversation as resolved.
Show resolved Hide resolved

func getInternalDir(b *bundle.Bundle) (string, error) {
cacheDir, err := b.CacheDir()
if err != nil {
return "", err
}
internalDir := filepath.Join(cacheDir, ".internal")
return internalDir, nil
}

func generateNotebookWrapper(b *bundle.Bundle, task *jobs.PythonWheelTask, libraries []compute.Library) (string, error) {
internalDir, err := getInternalDir(b)
if err != nil {
return "", err
}

notebookName := fmt.Sprintf("notebook_%s_%s", task.PackageName, task.EntryPoint)
path := filepath.Join(internalDir, notebookName+".py")

err = os.MkdirAll(filepath.Dir(path), 0755)
if err != nil {
return "", err
}

f, err := os.Create(path)
if err != nil {
return "", err
}
defer f.Close()

params, err := generateParameters(task)
if err != nil {
return "", err
}

data := map[string]any{
"Libraries": libraries,
"Params": params,
"Task": task,
}

t, err := template.New("notebook").Parse(NOTEBOOK_TEMPLATE)
if err != nil {
return "", err
}
return notebookName, t.Execute(f, data)
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
}

func generateParameters(task *jobs.PythonWheelTask) (string, error) {
if task.Parameters != nil && task.NamedParameters != nil {
return "", fmt.Errorf("not allowed to pass both paramaters and named_parameters")
}
params := append([]string{"python"}, task.Parameters...)
for k, v := range task.NamedParameters {
params = append(params, fmt.Sprintf("%s=%s", k, v))
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
}
for i := range params {
params[i] = `"` + params[i] + `"`
}
return strings.Join(params, ", "), nil
}
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
55 changes: 55 additions & 0 deletions bundle/python/transform_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package python

import (
"testing"

"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/require"
)

type testCase struct {
Actual []string
Expected string
}
type NamedParams map[string]string
type testCaseNamed struct {
Actual NamedParams
Expected string
}

var paramsTestCases []testCase = []testCase{
{[]string{}, `"python"`},
{[]string{"a"}, `"python", "a"`},
{[]string{"a", "b"}, `"python", "a", "b"`},
{[]string{"123!@#$%^&*()-="}, `"python", "123!@#$%^&*()-="`},
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
}

var paramsTestCasesNamed []testCaseNamed = []testCaseNamed{
{NamedParams{}, `"python"`},
{NamedParams{"a": "1"}, `"python", "a=1"`},
{NamedParams{"a": "1", "b": "2"}, `"python", "a=1", "b=2"`},
}

func TestGenerateParameters(t *testing.T) {
for _, c := range paramsTestCases {
task := &jobs.PythonWheelTask{Parameters: c.Actual}
result, err := generateParameters(task)
require.NoError(t, err)
require.Equal(t, c.Expected, result)
}
}

func TestGenerateNamedParameters(t *testing.T) {
for _, c := range paramsTestCasesNamed {
task := &jobs.PythonWheelTask{NamedParameters: c.Actual}
result, err := generateParameters(task)
require.NoError(t, err)
require.Equal(t, c.Expected, result)
}
}

func TestGenerateBoth(t *testing.T) {
task := &jobs.PythonWheelTask{NamedParameters: map[string]string{"a": "1"}, Parameters: []string{"b"}}
_, err := generateParameters(task)
require.Error(t, err)
andrewnester marked this conversation as resolved.
Show resolved Hide resolved
}