Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add prompt when a pipeline recreation or deletion happens #1672

Merged
merged 10 commits into from
Sep 4, 2024
92 changes: 66 additions & 26 deletions bundle/phases/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,55 +19,95 @@ import (
"github.com/databricks/cli/bundle/scripts"
"github.com/databricks/cli/libs/cmdio"
terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
)

func approvalForUcSchemaDelete(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
}

// read plan file
plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
if err != nil {
return false, err
}

actions := make([]terraformlib.Action, 0)
for _, rc := range plan.ResourceChanges {
// We only care about destructive actions on UC schema resources.
if rc.Type != "databricks_schema" {
// parseTerraformActions converts planned Terraform resource changes into
// terraformlib.Action values.
//
// A change is considered only when toInclude returns true for its resource
// type and planned actions. Of the considered changes, a delete is reported
// as ActionTypeDelete and a replace as ActionTypeRecreate; changes with any
// other action type are skipped. Each reported action carries the resource
// type and name of the change it came from.
func parseTerraformActions(changes []*tfjson.ResourceChange, toInclude func(typ string, actions tfjson.Actions) bool) []terraformlib.Action {
res := make([]terraformlib.Action, 0)
for _, rc := range changes {
// Let the caller decide which resource types / actions are of interest.
if !toInclude(rc.Type, rc.Change.Actions) {
continue
}

var actionType terraformlib.ActionType

switch {
case rc.Change.Actions.Delete():
actionType = terraformlib.ActionTypeDelete
case rc.Change.Actions.Replace():
actionType = terraformlib.ActionTypeRecreate
default:
// No use case for other action types yet.
continue
}

// NOTE(review): the next line refers to `actions`, which is not declared in
// this function; it looks like a leftover of the removed
// approvalForUcSchemaDelete body -- confirm against the merged file.
actions = append(actions, terraformlib.Action{
res = append(res, terraformlib.Action{
Action: actionType,
ResourceType: rc.Type,
ResourceName: rc.Name,
})
}

// NOTE(review): the `len(actions)` check below also looks like a leftover of
// the removed function (original comment: "No restricted actions planned. No
// need for approval.") -- confirm against the merged file.
if len(actions) == 0 {
return res
}

func approvalForDeploy(ctx context.Context, b *bundle.Bundle) (bool, error) {
tf := b.Terraform
if tf == nil {
return false, fmt.Errorf("terraform not initialized")
}

// read plan file
plan, err := tf.ShowPlanFile(ctx, b.Plan.Path)
if err != nil {
return false, err
}

schemaActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only UC schema resources.
if typ != "databricks_schema" {
return false
}

// We only display prompts for destructive actions like deleting or
// recreating a schema.
return actions.Delete() || actions.Replace()
})

dltActions := parseTerraformActions(plan.ResourceChanges, func(typ string, actions tfjson.Actions) bool {
// Filter in only DLT pipeline resources.
if typ != "databricks_pipeline" {
return false
}

// Recreating a DLT pipeline leads to metadata loss, and for a transient period
// the underlying tables will be unavailable.
return actions.Replace() || actions.Delete()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this always true per the switch/case on the action type in parseTerraformActions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are correct. This structure is from when this PR was just for recreates of DLT. I can clean this up in a followup.

})

// We don't need to display any prompts in this case.
if len(dltActions) == 0 && len(schemaActions) == 0 {
return true, nil
}

cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range actions {
cmdio.Log(ctx, action)
// One or more UC schema resources will be deleted or recreated.
if len(schemaActions) != 0 {
cmdio.LogString(ctx, "The following UC schemas will be deleted or recreated. Any underlying data may be lost:")
for _, action := range schemaActions {
cmdio.Log(ctx, action)
}
}

// One or more DLT pipelines are being deleted or recreated.
if len(dltActions) != 0 {
msg := `
This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The note about recreation should only be shown on actual recreation.

Can be a follow-up.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack. Will do.

cmdio.LogString(ctx, msg)
for _, action := range dltActions {
cmdio.Log(ctx, action)
}
}

if b.AutoApprove {
Expand Down Expand Up @@ -126,7 +166,7 @@ func Deploy() bundle.Mutator {
terraform.CheckRunningResource(),
terraform.Plan(terraform.PlanGoal("deploy")),
bundle.If(
approvalForUcSchemaDelete,
approvalForDeploy,
deployCore,
bundle.LogString("Deployment cancelled!"),
),
Expand Down
67 changes: 67 additions & 0 deletions bundle/phases/deploy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package phases

import (
"testing"

terraformlib "github.com/databricks/cli/libs/terraform"
tfjson "github.com/hashicorp/terraform-json"
"github.com/stretchr/testify/assert"
)

// TestParseTerraformActions feeds parseTerraformActions a mix of pipeline
// changes (create, delete, recreate) plus a recreate of an unrelated resource
// type, and expects only the pipeline delete and the pipeline recreate to be
// reported, in input order.
func TestParseTerraformActions(t *testing.T) {
	// Accept only destructive changes (delete or replace) on pipeline resources.
	destructivePipelineChange := func(typ string, actions tfjson.Actions) bool {
		return typ == "databricks_pipeline" && (actions.Delete() || actions.Replace())
	}

	input := []*tfjson.ResourceChange{
		{
			Type:   "databricks_pipeline",
			Name:   "create pipeline",
			Change: &tfjson.Change{Actions: tfjson.Actions{tfjson.ActionCreate}},
		},
		{
			Type:   "databricks_pipeline",
			Name:   "delete pipeline",
			Change: &tfjson.Change{Actions: tfjson.Actions{tfjson.ActionDelete}},
		},
		{
			Type:   "databricks_pipeline",
			Name:   "recreate pipeline",
			Change: &tfjson.Change{Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate}},
		},
		{
			Type:   "databricks_whatever",
			Name:   "recreate whatever",
			Change: &tfjson.Change{Actions: tfjson.Actions{tfjson.ActionDelete, tfjson.ActionCreate}},
		},
	}

	want := []terraformlib.Action{
		{
			Action:       terraformlib.ActionTypeDelete,
			ResourceType: "databricks_pipeline",
			ResourceName: "delete pipeline",
		},
		{
			Action:       terraformlib.ActionTypeRecreate,
			ResourceType: "databricks_pipeline",
			ResourceName: "recreate pipeline",
		},
	}

	got := parseTerraformActions(input, destructivePipelineChange)
	assert.Equal(t, want, got)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{
"properties": {
"unique_id": {
"type": "string",
"description": "Unique ID for the schema and pipeline names"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
bundle:
name: "bundle-playground"

variables:
catalog:
description: The catalog the DLT pipeline should use.
default: main


resources:
pipelines:
foo:
name: test-pipeline-{{.unique_id}}
libraries:
- notebook:
path: ./nb.sql
development: true
catalog: ${var.catalog}

include:
- "*.yml"

targets:
development:
default: true
2 changes: 2 additions & 0 deletions internal/bundle/bundles/recreate_pipeline/template/nb.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- Databricks notebook source
select 1
91 changes: 90 additions & 1 deletion internal/bundle/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,97 @@ func TestAccBundleDeployUcSchemaFailsWithoutAutoApprove(t *testing.T) {
t.Setenv("BUNDLE_ROOT", bundleRoot)
t.Setenv("TERM", "dumb")
c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
stdout, _, err := c.Run()
stdout, stderr, err := c.Run()

assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
assert.Contains(t, stderr.String(), "The following UC schemas will be deleted or recreated. Any underlying data may be lost:\n delete schema bar")
assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}

// TestAccBundlePipelineDeleteWithoutAutoApprove deploys a bundle that creates
// a job and a pipeline, deletes resources.yml from the bundle root, and then
// checks that redeploying without --auto-approve fails with
// root.ErrAlreadyPrinted after printing the pipeline deletion warning.
func TestAccBundlePipelineDeleteWithoutAutoApprove(t *testing.T) {
	ctx, wt := acc.WorkspaceTest(t)
	w := wt.W

	nodeTypeId := internal.GetNodeTypeId(env.Get(ctx, "CLOUD_ENV"))
	uniqueId := uuid.New().String()
	bundleRoot, err := initTestTemplate(t, ctx, "deploy_then_remove_resources", map[string]any{
		"unique_id":     uniqueId,
		"node_type_id":  nodeTypeId,
		"spark_version": defaultSparkVersion,
	})
	require.NoError(t, err)

	// deploy pipeline
	err = deployBundle(t, ctx, bundleRoot)
	require.NoError(t, err)

	// The redeploy below is expected to be rejected, and nothing else in this
	// test tears down what the first deploy created, so destroy the bundle on
	// exit (same pattern as TestAccBundlePipelineRecreateWithoutAutoApprove).
	t.Cleanup(func() {
		destroyBundle(t, ctx, bundleRoot)
	})

	// assert pipeline is created
	pipelineName := "test-bundle-pipeline-" + uniqueId
	pipeline, err := w.Pipelines.GetByName(ctx, pipelineName)
	require.NoError(t, err)
	// testify expects (expected, actual); the order only affects the failure message.
	assert.Equal(t, pipelineName, pipeline.Name)

	// assert job is created
	jobName := "test-bundle-job-" + uniqueId
	job, err := w.Jobs.GetBySettingsName(ctx, jobName)
	require.NoError(t, err)
	assert.Equal(t, jobName, job.Settings.Name)

	// delete resources.yml
	err = os.Remove(filepath.Join(bundleRoot, "resources.yml"))
	require.NoError(t, err)

	// Redeploy the bundle. Expect it to fail because deleting the pipeline requires --auto-approve.
	t.Setenv("BUNDLE_ROOT", bundleRoot)
	// TERM=dumb: the assertion on stdout below expects the "console does not
	// support prompting" error path.
	t.Setenv("TERM", "dumb")
	c := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock")
	stdout, stderr, err := c.Run()

	assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
	assert.Contains(t, stderr.String(), `This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:
delete pipeline bar`)
	assert.Contains(t, stdout.String(), "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed")
}

// TestAccBundlePipelineRecreateWithoutAutoApprove deploys the
// "recreate_pipeline" template, confirms the pipeline exists, and then
// redeploys with a different value for the "catalog" variable. Without
// --auto-approve the redeploy must fail with root.ErrAlreadyPrinted after
// printing the pipeline recreation warning.
func TestAccBundlePipelineRecreateWithoutAutoApprove(t *testing.T) {
	// Output expected from the rejected redeploy.
	const (
		recreateWarning = `This action will result in the deletion or recreation of the following DLT Pipelines along with the
Streaming Tables (STs) and Materialized Views (MVs) managed by them. Recreating the Pipelines will
restore the defined STs and MVs through full refresh. Note that recreation is necessary when pipeline
properties such as the 'catalog' or 'storage' are changed:
recreate pipeline foo`
		promptUnsupported = "the deployment requires destructive actions, but current console does not support prompting. Please specify --auto-approve if you would like to skip prompts and proceed"
	)

	ctx, wt := acc.UcWorkspaceTest(t)
	workspace := wt.W
	uniqueId := uuid.New().String()

	templateParams := map[string]any{
		"unique_id": uniqueId,
	}
	bundleRoot, err := initTestTemplate(t, ctx, "recreate_pipeline", templateParams)
	require.NoError(t, err)

	// Initial deployment; the bundle is destroyed again when the test ends.
	require.NoError(t, deployBundle(t, ctx, bundleRoot))
	t.Cleanup(func() {
		destroyBundle(t, ctx, bundleRoot)
	})

	// The pipeline must exist under the templated name.
	wantName := "test-pipeline-" + uniqueId
	created, err := workspace.Pipelines.GetByName(ctx, wantName)
	require.NoError(t, err)
	require.Equal(t, wantName, created.Name)

	// Redeploy with the "catalog" variable overridden on the command line.
	t.Setenv("BUNDLE_ROOT", bundleRoot)
	t.Setenv("TERM", "dumb")
	runner := internal.NewCobraTestRunnerWithContext(t, ctx, "bundle", "deploy", "--force-lock", "--var=\"catalog=whatever\"")
	outBuf, errBuf, err := runner.Run()

	assert.EqualError(t, err, root.ErrAlreadyPrinted.Error())
	assert.Contains(t, errBuf.String(), recreateWarning)
	assert.Contains(t, outBuf.String(), promptUnsupported)
}

Expand Down
Loading