AIP-47 - Migrate databricks DAGs to new design #22442 #24203

Merged 1 commit on Jun 5, 2022
17 changes: 0 additions & 17 deletions airflow/providers/databricks/example_dags/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion docs/apache-airflow-providers-databricks/index.rst
@@ -39,7 +39,7 @@ Content
:maxdepth: 1
:caption: Resources

-Example DAGs <https://github.com/apache/airflow/tree/main/airflow/providers/databricks/example_dags>
+Example DAGs <https://github.com/apache/airflow/tree/main/tests/system/providers/databricks>
PyPI Repository <https://pypi.org/project/apache-airflow-providers-databricks/>
Installing from sources <installing-providers-from-sources>

@@ -46,7 +46,7 @@ Importing CSV data

An example usage of the DatabricksCopyIntoOperator to import CSV data into a table is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_copy_into]
:end-before: [END howto_operator_databricks_copy_into]
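
For readers without the example file at hand, here is a minimal sketch of what that included section demonstrates (the connection id, endpoint name, table name, and file location are illustrative placeholders, not values from this PR):

.. code-block:: python

    from airflow.providers.databricks.operators.databricks_sql import DatabricksCopyIntoOperator

    # Bulk-load CSV files from a storage location into a table via COPY INTO.
    # Meant to live inside a ``with DAG(...):`` block, as in the example file.
    import_csv = DatabricksCopyIntoOperator(
        task_id="import_csv",
        databricks_conn_id="databricks_default",
        sql_endpoint_name="my-endpoint",
        table_name="my_airflow_table",
        file_format="CSV",
        file_location="dbfs:/tmp/my-data/csv/",
        format_options={"header": "true"},
        force_copy=True,
    )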
@@ -63,7 +63,7 @@ Create a Databricks Repo

An example usage of the DatabricksReposCreateOperator is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
:language: python
:start-after: [START howto_operator_databricks_repo_create]
:end-before: [END howto_operator_databricks_repo_create]
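
A minimal sketch of the pattern that section shows (repo path and Git URL are illustrative placeholders):

.. code-block:: python

    from airflow.providers.databricks.operators.databricks_repos import DatabricksReposCreateOperator

    # Clone a Git repository into the Databricks Repos workspace; the Git
    # provider is detected from the URL when not given explicitly.
    create_repo = DatabricksReposCreateOperator(
        task_id="create_repo",
        repo_path="/Repos/user@example.com/demo-repo",
        git_url="https://github.com/example/demo-repo",
    )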
@@ -55,7 +55,7 @@ Deleting Databricks Repo by specifying path

An example usage of the DatabricksReposDeleteOperator is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
:language: python
:start-after: [START howto_operator_databricks_repo_delete]
:end-before: [END howto_operator_databricks_repo_delete]
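
Sketched with the same placeholder repo path, deletion only needs the workspace path of the repo:

.. code-block:: python

    from airflow.providers.databricks.operators.databricks_repos import DatabricksReposDeleteOperator

    # Remove an existing Databricks repo, addressed by its workspace path.
    delete_repo = DatabricksReposDeleteOperator(
        task_id="delete_repo",
        repo_path="/Repos/user@example.com/demo-repo",
    )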
@@ -60,7 +60,7 @@ Updating Databricks Repo by specifying path

An example usage of the DatabricksReposUpdateOperator is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_repos.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_repos.py
:language: python
:start-after: [START howto_operator_databricks_repo_update]
:end-before: [END howto_operator_databricks_repo_update]
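
Again as a placeholder-based sketch, an update checks out a branch (or tag) in an existing repo:

.. code-block:: python

    from airflow.providers.databricks.operators.databricks_repos import DatabricksReposUpdateOperator

    # Switch the repo to a given branch before downstream tasks use it.
    update_repo = DatabricksReposUpdateOperator(
        task_id="update_repo",
        repo_path="/Repos/user@example.com/demo-repo",
        branch="releases",
    )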
8 changes: 4 additions & 4 deletions docs/apache-airflow-providers-databricks/operators/sql.rst
@@ -49,7 +49,7 @@ Selecting data

An example usage of the DatabricksSqlOperator to select data from a table is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_sql_select]
:end-before: [END howto_operator_databricks_sql_select]
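
A minimal sketch of the selecting pattern (connection id, endpoint name, and table are placeholders):

.. code-block:: python

    from airflow.providers.databricks.operators.databricks_sql import DatabricksSqlOperator

    # Run a single SELECT statement against a Databricks SQL endpoint.
    select = DatabricksSqlOperator(
        task_id="select_data",
        databricks_conn_id="databricks_default",
        sql_endpoint_name="my-endpoint",
        sql="SELECT * FROM my_airflow_table",
    )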
@@ -59,7 +59,7 @@ Selecting data into a file

An example usage of the DatabricksSqlOperator to select data from a table and store in a file is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_sql_select_file]
:end-before: [END howto_operator_databricks_sql_select_file]
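
Continuing the same sketch, the file variant adds ``output_path`` and ``output_format``:

.. code-block:: python

    # Same query, but the result set is written to a local CSV file.
    select_into_file = DatabricksSqlOperator(
        task_id="select_data_into_file",
        sql_endpoint_name="my-endpoint",
        sql="SELECT * FROM my_airflow_table",
        output_path="/tmp/select_results.csv",
        output_format="csv",
    )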
@@ -69,7 +69,7 @@ Executing multiple statements

An example usage of the DatabricksSqlOperator to perform multiple SQL statements is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_sql_multiple]
:end-before: [END howto_operator_databricks_sql_multiple]
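
Passing a list instead of a single string runs the statements in order, sketched here with placeholder SQL:

.. code-block:: python

    multi_statements = DatabricksSqlOperator(
        task_id="multi_statements",
        sql_endpoint_name="my-endpoint",
        sql=[
            "CREATE TABLE IF NOT EXISTS my_airflow_table (id INT, name STRING)",
            "INSERT INTO my_airflow_table VALUES (1, 'one'), (2, 'two')",
        ],
    )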
@@ -80,7 +80,7 @@ Executing multiple statements from a file

An example usage of the DatabricksSqlOperator to perform statements from a file is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks_sql.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks_sql.py
:language: python
:start-after: [START howto_operator_databricks_sql_multiple_file]
:end-before: [END howto_operator_databricks_sql_multiple_file]
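
Because ``.sql`` is one of the operator's template extensions, a value ending in ``.sql`` is read as a file of statements at render time; the file name below is a placeholder:

.. code-block:: python

    statements_from_file = DatabricksSqlOperator(
        task_id="statements_from_file",
        sql_endpoint_name="my-endpoint",
        sql="my_statements.sql",  # looked up via the DAG's template search path
    )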
@@ -61,7 +61,7 @@ Specifying parameters as JSON

An example usage of the DatabricksSubmitRunOperator is as follows:

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks.py
:language: python
:start-after: [START howto_operator_databricks_json]
:end-before: [END howto_operator_databricks_json]
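
A minimal sketch of the JSON form (cluster spec and notebook path are illustrative placeholders):

.. code-block:: python

    from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

    # One ``json`` dict carrying everything the Runs Submit API expects.
    new_cluster = {
        "spark_version": "9.1.x-scala2.12",
        "node_type_id": "r3.xlarge",
        "num_workers": 2,
    }
    notebook_task = DatabricksSubmitRunOperator(
        task_id="notebook_task",
        json={
            "new_cluster": new_cluster,
            "notebook_task": {"notebook_path": "/Users/airflow@example.com/PrepareData"},
        },
    )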
@@ -71,7 +71,7 @@ Using named parameters

You can also use named parameters to initialize the operator and run the job.

-.. exampleinclude:: /../../airflow/providers/databricks/example_dags/example_databricks.py
+.. exampleinclude:: /../../tests/system/providers/databricks/example_databricks.py
:language: python
:start-after: [START howto_operator_databricks_named]
:end-before: [END howto_operator_databricks_named]
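
In the named form, each top-level API field becomes a keyword argument; this sketch reuses ``new_cluster`` from the previous one (jar path and class name are placeholders):

.. code-block:: python

    spark_jar_task = DatabricksSubmitRunOperator(
        task_id="spark_jar_task",
        new_cluster=new_cluster,
        spark_jar_task={"main_class_name": "com.example.ProcessData"},
        libraries=[{"jar": "dbfs:/lib/etl-0.1.jar"}],
    )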
@@ -31,13 +31,17 @@
https://docs.databricks.com/api/latest/jobs.html#runstate
"""

+import os
from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_operator"
+
with DAG(
-    dag_id='example_databricks_operator',
+    dag_id=DAG_ID,
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    tags=['example'],
@@ -73,3 +77,14 @@
    )
    # [END howto_operator_databricks_named]
    notebook_task >> spark_jar_task
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.

+import os
from datetime import datetime

from airflow import DAG
@@ -30,8 +31,11 @@
    'databricks_conn_id': 'databricks',
}

+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_repos_operator"
+
with DAG(
-    dag_id='example_databricks_repos_operator',
+    dag_id=DAG_ID,
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    default_args=default_args,
@@ -72,3 +76,14 @@
    # [END howto_operator_databricks_repo_delete]

    (create_repo >> update_repo >> notebook_task >> delete_repo)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
@@ -31,6 +31,7 @@
https://docs.databricks.com/api/latest/jobs.html#runstate
"""

+import os
from datetime import datetime

from airflow import DAG
@@ -39,8 +40,11 @@
    DatabricksSqlOperator,
)

+ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
+DAG_ID = "example_databricks_sql_operator"
+
with DAG(
-    dag_id='example_databricks_sql_operator',
+    dag_id=DAG_ID,
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    tags=['example'],
@@ -111,3 +115,14 @@
    # [END howto_operator_databricks_copy_into]

    (create >> create_file >> import_csv >> select >> select_into_file)
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)