Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add development runs #522

Merged
merged 26 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
48d6df3
Add support for "debug runs"
lennartkats-db Jun 18, 2023
27b2b77
Introduce DATABRICKS_COMPUTE environment variable
lennartkats-db Jun 18, 2023
823c868
Increase max concurrent runs for debug runs
lennartkats-db Jun 18, 2023
6414701
Merge remote-tracking branch 'upstream/main' into add-debug-runs
lennartkats-db Jun 18, 2023
bdd2124
Rename Deploy() to deploy()
lennartkats-db Jun 18, 2023
7c654ec
Pause schedule for debug jobs
lennartkats-db Jun 20, 2023
b7a80af
Merge remote-tracking branch 'upstream/main' into add-debug-runs
lennartkats-db Jun 20, 2023
567d76d
Rename to DATABRICKS_COMPUTE_ID
lennartkats-db Jun 22, 2023
30667d4
Change to DATABRICKS_CLUSTER_ID
lennartkats-db Jun 22, 2023
b7c53b2
Merge remote-tracking branch 'upstream/main' into add-debug-runs
lennartkats-db Jun 23, 2023
458a2d0
Dummy commit
lennartkats-db Jun 23, 2023
4762716
Rename "debug" to "development"
lennartkats-db Jul 3, 2023
15396bc
Implement --no-wait for pipelines
lennartkats-db Jul 3, 2023
5a7b556
Add docstring
lennartkats-db Jul 3, 2023
34a1698
Pass compute override as part of the bundle object
lennartkats-db Jul 3, 2023
ddcd613
Don't support databricks bundle run --deploy for now
lennartkats-db Jul 3, 2023
64edf39
Merge remote-tracking branch 'databricks/main' into add-debug-runs
lennartkats-db Jul 3, 2023
35f4d3b
Use latest config names
lennartkats-db Jul 3, 2023
36a6e93
Fix tests
lennartkats-db Jul 4, 2023
be2dd4f
Improve DATABRICKS_CLUSTER_ID handling for non-development modes
lennartkats-db Jul 7, 2023
d221d4d
Make it possible to override the compute id in an environment
lennartkats-db Jul 7, 2023
50b76bb
Cleanup
lennartkats-db Jul 7, 2023
6b221c0
Cleanup
lennartkats-db Jul 7, 2023
cbd3068
Support experiment names that use full paths
lennartkats-db Jul 7, 2023
54a66c0
Fix paths on Windows
lennartkats-db Jul 7, 2023
77fcc92
Add comment
lennartkats-db Jul 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bundle/config/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ type Bundle struct {
// Contains Git information like current commit, current branch and
// origin url. Automatically loaded by reading .git directory if not specified
Git Git `json:"git,omitempty"`

Mode Mode `json:"mode,omitempty"`
lennartkats-db marked this conversation as resolved.
Show resolved Hide resolved
}
10 changes: 10 additions & 0 deletions bundle/config/environment.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
package config

type Mode string

const (
Debug Mode = "debug"
Default Mode = "default"
PullRequest Mode = "pull-request"
)

// Environment defines overrides for a single environment.
// This structure is recursively merged into the root configuration.
type Environment struct {
// Default marks that this environment must be used if one isn't specified
// by the user (through environment variable or command line argument).
Default bool `json:"default,omitempty"`

Mode Mode `json:"mode,omitempty"`

Bundle *Bundle `json:"bundle,omitempty"`

Workspace *Workspace `json:"workspace,omitempty"`
Expand Down
50 changes: 50 additions & 0 deletions bundle/config/mutator/override_compute.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package mutator

import (
"context"
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/resources"
)

type overrideCompute struct {
compute string
}

func OverrideCompute(compute string) bundle.Mutator {
return &overrideCompute{compute: compute}
}

func (m *overrideCompute) Name() string {
return "OverrideCompute"
}

func (m *overrideCompute) overrideJobCompute(j *resources.Job) {
for i := range j.Tasks {
task := &j.Tasks[i]
if task.NewCluster != nil {
task.NewCluster = nil
task.ExistingClusterId = m.compute
} else if task.ExistingClusterId != "" {
task.ExistingClusterId = m.compute
}
}
}

func (m *overrideCompute) Apply(ctx context.Context, b *bundle.Bundle) error {
if m.compute == "" {
return nil
}
if b.Config.Bundle.Mode != config.Debug {
return fmt.Errorf("cannot override compute for an environment that does not use 'mode: debug'")
}

r := b.Config.Resources
for i := range r.Jobs {
m.overrideJobCompute(r.Jobs[i])
}

return nil
}
47 changes: 47 additions & 0 deletions bundle/config/mutator/override_compute_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mutator_test

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/compute"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestOverrideCompute(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Debug,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{
Name: "job1",
Tasks: []jobs.JobTaskSettings{
{
NewCluster: &compute.BaseClusterInfo{},
},
{
ExistingClusterId: "cluster2",
},
},
}},
},
},
},
}

m := mutator.OverrideCompute("newClusterID")
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Nil(t, bundle.Config.Resources.Jobs["job1"].Tasks[0].NewCluster)
assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[0].ExistingClusterId)
assert.Equal(t, "newClusterID", bundle.Config.Resources.Jobs["job1"].Tasks[1].ExistingClusterId)
}
4 changes: 4 additions & 0 deletions bundle/config/mutator/populate_current_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (m *populateCurrentUser) Name() string {
}

func (m *populateCurrentUser) Apply(ctx context.Context, b *bundle.Bundle) error {
if b.Config.Workspace.CurrentUser != nil {
return nil
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious: has this run twice in your testing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it would, but I think that may have only happened with databricks deploy --run. So I'll remove this.

We should actually have a general fix to make sure w.CurrentUser.Me() is cached, since it is very very slow and was still being called a second time in some other cases. This could be part of a general initiative to make things a bit faster.


w := b.WorkspaceClient()
me, err := w.CurrentUser.Me(ctx)
if err != nil {
Expand Down
82 changes: 82 additions & 0 deletions bundle/config/mutator/process_environment_mode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package mutator

import (
"context"
"fmt"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/databricks-sdk-go/service/ml"
)

type processEnvironmentMode struct{}

const debugConcurrentRuns = 4
lennartkats-db marked this conversation as resolved.
Show resolved Hide resolved

func ProcessEnvironmentMode() bundle.Mutator {
return &processEnvironmentMode{}
}

func (m *processEnvironmentMode) Name() string {
return "ProcessEnvironmentMode"
}

// Mark all resources as being for 'debug' purposes, i.e.
// changing their their name, adding tags, and (in the future)
// marking them as 'hidden' in the UI.
func processDebugMode(b *bundle.Bundle) error {
r := b.Config.Resources

for i := range r.Jobs {
r.Jobs[i].Name = "[debug] " + r.Jobs[i].Name
if r.Jobs[i].Tags == nil {
r.Jobs[i].Tags = make(map[string]string)
}
r.Jobs[i].Tags["debug"] = ""
if r.Jobs[i].MaxConcurrentRuns == 0 {
r.Jobs[i].MaxConcurrentRuns = debugConcurrentRuns
lennartkats-db marked this conversation as resolved.
Show resolved Hide resolved
}
if r.Jobs[i].Schedule != nil {
r.Jobs[i].Schedule.PauseStatus = "PAUSED"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These "PAUSED" strings should be the constant jobs.PauseStatusPaused.

Copy link
Contributor Author

@lennartkats-db lennartkats-db Jul 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Is that another SDK name that should be changed, btw? :( That doesn't seem like a very idiomatic name.

}
if r.Jobs[i].Continuous != nil {
r.Jobs[i].Continuous.PauseStatus = "PAUSED"
}
if r.Jobs[i].Trigger != nil {
r.Jobs[i].Trigger.PauseStatus = "PAUSED"
}
}

for i := range r.Pipelines {
r.Pipelines[i].Name = "[debug] " + r.Pipelines[i].Name
r.Pipelines[i].Development = true
// (pipelines don't yet support tags)
}

for i := range r.Models {
r.Models[i].Name = "[debug] " + r.Models[i].Name
r.Models[i].Tags = append(r.Models[i].Tags, ml.ModelTag{Key: "debug", Value: ""})
}

for i := range r.Experiments {
r.Experiments[i].Name = "[debug] " + r.Experiments[i].Name
r.Experiments[i].Tags = append(r.Experiments[i].Tags, ml.ExperimentTag{Key: "debug", Value: ""})
}

return nil
}

func (m *processEnvironmentMode) Apply(ctx context.Context, b *bundle.Bundle) error {
switch b.Config.Bundle.Mode {
case config.Debug:
return processDebugMode(b)
case config.Default, "":
// No action
case config.PullRequest:
return fmt.Errorf("not implemented")
default:
return fmt.Errorf("unsupported value specified for 'mode': %s", b.Config.Bundle.Mode)
}

return nil
}
75 changes: 75 additions & 0 deletions bundle/config/mutator/process_environment_mode_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package mutator_test

import (
"context"
"testing"

"github.com/databricks/cli/bundle"
"github.com/databricks/cli/bundle/config"
"github.com/databricks/cli/bundle/config/mutator"
"github.com/databricks/cli/bundle/config/resources"
"github.com/databricks/databricks-sdk-go/service/jobs"
"github.com/databricks/databricks-sdk-go/service/ml"
"github.com/databricks/databricks-sdk-go/service/pipelines"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestProcessEnvironmentModeApplyDebug(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Debug,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{Name: "job1"}},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}},
},
Experiments: map[string]*resources.MlflowExperiment{
"experiment1": {Experiment: &ml.Experiment{Name: "experiment1"}},
},
Models: map[string]*resources.MlflowModel{
"model1": {Model: &ml.Model{Name: "model1"}},
},
},
},
}

m := mutator.ProcessEnvironmentMode()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Equal(t, "[debug] job1", bundle.Config.Resources.Jobs["job1"].Name)
assert.Equal(t, "[debug] pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name)
assert.Equal(t, "[debug] experiment1", bundle.Config.Resources.Experiments["experiment1"].Name)
assert.Equal(t, "[debug] model1", bundle.Config.Resources.Models["model1"].Name)
assert.Equal(t, "debug", bundle.Config.Resources.Experiments["experiment1"].Experiment.Tags[0].Key)
assert.True(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development)
}

func TestProcessEnvironmentModeApplyDefault(t *testing.T) {
bundle := &bundle.Bundle{
Config: config.Root{
Bundle: config.Bundle{
Mode: config.Default,
},
Resources: config.Resources{
Jobs: map[string]*resources.Job{
"job1": {JobSettings: &jobs.JobSettings{Name: "job1"}},
},
Pipelines: map[string]*resources.Pipeline{
"pipeline1": {PipelineSpec: &pipelines.PipelineSpec{Name: "pipeline1"}},
},
},
},
}

m := mutator.ProcessEnvironmentMode()
err := m.Apply(context.Background(), bundle)
require.NoError(t, err)
assert.Equal(t, "job1", bundle.Config.Resources.Jobs["job1"].Name)
assert.Equal(t, "pipeline1", bundle.Config.Resources.Pipelines["pipeline1"].Name)
assert.False(t, bundle.Config.Resources.Pipelines["pipeline1"].PipelineSpec.Development)
}
4 changes: 4 additions & 0 deletions bundle/config/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,5 +190,9 @@ func (r *Root) MergeEnvironment(env *Environment) error {
}
}

if env.Mode != "" {
r.Bundle.Mode = env.Mode
}

return nil
}
9 changes: 9 additions & 0 deletions bundle/config/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,12 @@ func TestInitializeVariablesUndefinedVariables(t *testing.T) {
err := root.InitializeVariables([]string{"bar=567"})
assert.ErrorContains(t, err, "variable bar has not been defined")
}

func TestRootMergeEnvironmentWithMode(t *testing.T) {
root := &Root{
Bundle: Bundle{},
}
env := &Environment{Mode: Debug}
require.NoError(t, root.MergeEnvironment(env))
assert.Equal(t, Debug, root.Bundle.Mode)
}
4 changes: 3 additions & 1 deletion bundle/phases/initialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// The initialize phase fills in defaults and connects to the workspace.
// Interpolation of fields referring to the "bundle" and "workspace" keys
// happens upon completion of this phase.
func Initialize() bundle.Mutator {
func Initialize(overrideCompute string) bundle.Mutator {
lennartkats-db marked this conversation as resolved.
Show resolved Hide resolved
return newPhase(
"initialize",
[]bundle.Mutator{
Expand All @@ -25,6 +25,8 @@ func Initialize() bundle.Mutator {
interpolation.IncludeLookupsInPath("workspace"),
interpolation.IncludeLookupsInPath(variable.VariableReferencePrefix),
),
mutator.OverrideCompute(overrideCompute),
mutator.ProcessEnvironmentMode(),
mutator.TranslatePaths(),
terraform.Initialize(),
},
Expand Down
9 changes: 9 additions & 0 deletions bundle/run/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,15 @@ func (r *jobRunner) Run(ctx context.Context, opts *Options) (output.RunOutput, e
if err != nil {
return nil, fmt.Errorf("cannot start job")
}

if opts.NoWait {
details, err := w.Jobs.GetRun(ctx, jobs.GetRunRequest{
RunId: waiter.RunId,
})
progressLogger.Log(progress.NewJobRunUrlEvent(details.RunPageUrl))
return nil, err
}

run, err := waiter.OnProgress(func(r *jobs.Run) {
pullRunId(r)
logDebug(r)
Expand Down
1 change: 1 addition & 0 deletions bundle/run/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type Options struct {
Job JobOptions
Pipeline PipelineOptions
NoWait bool
}

func (o *Options) Define(fs *flag.FlagSet) {
Expand Down
4 changes: 4 additions & 0 deletions bundle/run/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func (r *pipelineRunner) Run(ctx context.Context, opts *Options) (output.RunOutp
return nil, fmt.Errorf("no progress logger found")
}

if opts.NoWait {
log.Warnf(ctx, "--no-wait is not yet implemented for pipelines")
}

// Log the pipeline update URL as soon as it is available.
progressLogger.Log(progress.NewPipelineUpdateUrlEvent(w.Config.Host, updateID, pipelineID))

Expand Down
1 change: 1 addition & 0 deletions bundle/tests/job_and_pipeline/bundle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ resources:

environments:
development:
mode: debug
resources:
pipelines:
nyc_taxi_pipeline:
Expand Down
2 changes: 2 additions & 0 deletions bundle/tests/job_and_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"path/filepath"
"testing"

"github.com/databricks/cli/bundle/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -15,6 +16,7 @@ func TestJobAndPipelineDevelopment(t *testing.T) {

p := b.Config.Resources.Pipelines["nyc_taxi_pipeline"]
assert.Equal(t, "job_and_pipeline/bundle.yml", filepath.ToSlash(p.ConfigFilePath))
assert.Equal(t, b.Config.Bundle.Mode, config.Debug)
assert.True(t, p.Development)
require.Len(t, p.Libraries, 1)
assert.Equal(t, "./dlt/nyc_taxi_loader", p.Libraries[0].Notebook.Path)
Expand Down
Loading