Add extra links for google dataproc #10343

Merged · 17 commits · May 7, 2021 · Changes from all commits
104 changes: 103 additions & 1 deletion airflow/providers/google/cloud/operators/dataproc.py
@@ -35,12 +35,57 @@
from google.protobuf.field_mask_pb2 import FieldMask

from airflow.exceptions import AirflowException
-from airflow.models import BaseOperator
+from airflow.models import BaseOperator, BaseOperatorLink
from airflow.models.taskinstance import TaskInstance
from airflow.providers.google.cloud.hooks.dataproc import DataprocHook, DataProcJobBuilder
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults

DATAPROC_BASE_LINK = "https://console.cloud.google.com/dataproc"
DATAPROC_JOB_LOG_LINK = DATAPROC_BASE_LINK + "/jobs/{job_id}?region={region}&project={project_id}"
DATAPROC_CLUSTER_LINK = (
    DATAPROC_BASE_LINK + "/clusters/{cluster_name}/monitoring?region={region}&project={project_id}"
)


class DataprocJobLink(BaseOperatorLink):
    """Helper class for constructing Dataproc Job link"""

    name = "Dataproc Job"

    def get_link(self, operator, dttm):
        ti = TaskInstance(task=operator, execution_date=dttm)
        job_conf = ti.xcom_pull(task_ids=operator.task_id, key="job_conf")
        return (
            DATAPROC_JOB_LOG_LINK.format(
                job_id=job_conf["job_id"],
                region=job_conf["region"],
                project_id=job_conf["project_id"],
            )
            if job_conf
            else ""
        )


class DataprocClusterLink(BaseOperatorLink):
    """Helper class for constructing Dataproc Cluster link"""

    name = "Dataproc Cluster"

    def get_link(self, operator, dttm):
        ti = TaskInstance(task=operator, execution_date=dttm)
        cluster_conf = ti.xcom_pull(task_ids=operator.task_id, key="cluster_conf")
        return (
            DATAPROC_CLUSTER_LINK.format(
                cluster_name=cluster_conf["cluster_name"],
                region=cluster_conf["region"],
                project_id=cluster_conf["project_id"],
            )
            if cluster_conf
            else ""
        )
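The two link classes resolve lazily: the webserver calls get_link() at render time, pulls the
conf dict that execute() pushed to XCom, and formats one of the URL templates above, or returns
an empty string if the task never pushed the conf. A minimal sketch of what that amounts to,
with made-up values:

    # Stand-in values; in Airflow these come from the "job_conf" XCom.
    job_conf = {"job_id": "job-123", "region": "us-central1", "project_id": "my-project"}

    DATAPROC_JOB_LOG_LINK = (
        "https://console.cloud.google.com/dataproc/jobs/{job_id}?region={region}&project={project_id}"
    )
    url = DATAPROC_JOB_LOG_LINK.format(**job_conf) if job_conf else ""
    print(url)
    # https://console.cloud.google.com/dataproc/jobs/job-123?region=us-central1&project=my-project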


# pylint: disable=too-many-instance-attributes
class ClusterGenerator:
@@ -478,6 +523,8 @@ class DataprocCreateClusterOperator(BaseOperator):
    )
    template_fields_renderers = {'cluster_config': 'json'}

    operator_extra_links = (DataprocClusterLink(),)

    @apply_defaults
    def __init__(  # pylint: disable=too-many-arguments
        self,
Expand Down Expand Up @@ -620,6 +667,16 @@ def _wait_for_cluster_in_creating_state(self, hook: DataprocHook) -> Cluster:
    def execute(self, context) -> dict:
        self.log.info('Creating cluster: %s', self.cluster_name)
        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
        # Save data required to display extra link no matter what the cluster status will be
        self.xcom_push(
            context,
            key="cluster_conf",
            value={
                "cluster_name": self.cluster_name,
                "region": self.region,
                "project_id": self.project_id,
            },
        )
        try:
            # First try to create a new cluster
            cluster = self._create_cluster(hook)
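Note that the XCom push happens before the create call, so the "Dataproc Cluster" button can
still render a working console URL even if cluster creation later fails. Conceptually, the round
trip between execute() and DataprocClusterLink.get_link() is just this (a toy sketch; the real
XCom store is the metadata database):

    pushed = {}  # stand-in for the XCom table

    def xcom_push(key, value):  # what execute() does first
        pushed[key] = value

    def xcom_pull(key):  # what get_link() does at render time
        return pushed.get(key)

    xcom_push("cluster_conf", {"cluster_name": "etl", "region": "us-east1", "project_id": "demo"})
    conf = xcom_pull("cluster_conf")
    url = (
        "https://console.cloud.google.com/dataproc/clusters/{cluster_name}/monitoring"
        "?region={region}&project={project_id}".format(**conf)
        if conf
        else ""
    )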
@@ -694,6 +751,8 @@ class DataprocScaleClusterOperator(BaseOperator):

    template_fields = ['cluster_name', 'project_id', 'region', 'impersonation_chain']

    operator_extra_links = (DataprocClusterLink(),)

    @apply_defaults
    def __init__(
        self,
@@ -773,6 +832,16 @@ def execute(self, context) -> None:
        update_mask = ["config.worker_config.num_instances", "config.secondary_worker_config.num_instances"]

        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
        # Save data required to display extra link no matter what the cluster status will be
        self.xcom_push(
            context,
            key="cluster_conf",
            value={
                "cluster_name": self.cluster_name,
                "region": self.region,
                "project_id": self.project_id,
            },
        )
        operation = hook.update_cluster(
            project_id=self.project_id,
            location=self.region,
@@ -931,6 +1000,8 @@ class DataprocJobBaseOperator(BaseOperator):

job_type = ""

operator_extra_links = (DataprocJobLink(),)

@apply_defaults
def __init__(
self,
@@ -1005,6 +1076,12 @@ def execute(self, context):
        )
        job_id = job_object.reference.job_id
        self.log.info('Job %s submitted successfully.', job_id)
        # Save data required for extra links no matter what the job status will be
        self.xcom_push(
            context,
            key='job_conf',
            value={'job_id': job_id, 'region': self.region, 'project_id': self.project_id},
        )

        if not self.asynchronous:
            self.log.info('Waiting for job %s to complete', job_id)
@@ -1082,6 +1159,8 @@ class DataprocSubmitPigJobOperator(DataprocJobBaseOperator):
    ui_color = '#0273d4'
    job_type = 'pig_job'

    operator_extra_links = (DataprocJobLink(),)

    @apply_defaults
    def __init__(
        self,
@@ -1871,6 +1950,8 @@ class DataprocSubmitJobOperator(BaseOperator):
    template_fields = ('project_id', 'location', 'job', 'impersonation_chain', 'request_id')
    template_fields_renderers = {"job": "json"}

    operator_extra_links = (DataprocJobLink(),)

    @apply_defaults
    def __init__(
        self,
@@ -1919,6 +2000,16 @@ def execute(self, context: Dict):
        )
        job_id = job_object.reference.job_id
        self.log.info('Job %s submitted successfully.', job_id)
        # Save data required by extra links no matter what the job status will be
        self.xcom_push(
            context,
            key="job_conf",
            value={
                "job_id": job_id,
                "region": self.location,
                "project_id": self.project_id,
            },
        )

        if not self.asynchronous:
            self.log.info('Waiting for job %s to complete', job_id)
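For reference, a minimal usage sketch (all project, bucket, and cluster names here are made up):
once this task has run at least once, the "Dataproc Job" button shows up on its task instances,
including asynchronous submissions, because the conf is pushed right after submission.

    from airflow import DAG
    from airflow.providers.google.cloud.operators.dataproc import DataprocSubmitJobOperator
    from airflow.utils.dates import days_ago

    with DAG("dataproc_links_demo", start_date=days_ago(1), schedule_interval=None) as dag:
        submit = DataprocSubmitJobOperator(
            task_id="submit_pyspark",
            project_id="my-project",
            location="us-central1",
            job={
                "placement": {"cluster_name": "my-cluster"},
                "pyspark_job": {"main_python_file_uri": "gs://my-bucket/job.py"},
            },
        )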
@@ -1988,6 +2079,7 @@ class DataprocUpdateClusterOperator(BaseOperator):
"""

template_fields = ('impersonation_chain', 'cluster_name')
operator_extra_links = (DataprocClusterLink(),)

@apply_defaults
def __init__( # pylint: disable=too-many-arguments
@@ -2023,6 +2115,16 @@ def __init__(  # pylint: disable=too-many-arguments

    def execute(self, context: Dict):
        hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
        # Save data required by extra links no matter what the cluster status will be
        self.xcom_push(
            context,
            key="cluster_conf",
            value={
                "cluster_name": self.cluster_name,
                "region": self.location,
                "project_id": self.project_id,
            },
        )
        self.log.info("Updating %s cluster.", self.cluster_name)
        operation = hook.update_cluster(
            project_id=self.project_id,
2 changes: 2 additions & 0 deletions airflow/providers/google/provider.yaml
@@ -743,6 +743,8 @@ extra-links:
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink
  - airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink
  - airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink
  - airflow.providers.google.cloud.operators.dataproc.DataprocJobLink
  - airflow.providers.google.cloud.operators.dataproc.DataprocClusterLink

additional-extras:
  apache.beam: apache-beam[gcp]
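The provider.yaml entries are what make the new link classes discoverable. A quick local check
(a sketch; requires an Airflow install with the google provider, and assumes the 2.x-era
ProvidersManager attribute name):

    from airflow.providers_manager import ProvidersManager

    for class_name in sorted(ProvidersManager().extra_links_class_names):
        print(class_name)
    # The two dataproc entries should appear among the six registered links.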
9 changes: 9 additions & 0 deletions airflow/serialization/serialized_objects.py
@@ -72,6 +72,15 @@
"airflow.sensors.external_task_sensor.ExternalTaskSensorLink",
}

BUILTIN_OPERATOR_EXTRA_LINKS: List[str] = [
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink",
"airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink",
"airflow.providers.google.cloud.operators.dataproc.DataprocJobLink",
"airflow.providers.google.cloud.operators.dataproc.DataprocClusterLink",
"airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink",
"airflow.providers.qubole.operators.qubole.QDSLink",
]


@cache
def get_operator_extra_links():
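This registry exists because serialized DAGs only re-instantiate extra-link classes they
recognize; links missing from it cannot be restored on the webserver side. A conceptual sketch
of the lookup (simplified; not the actual serializer code):

    import importlib

    def load_builtin_link(path, whitelist):
        """Import and instantiate a registered extra-link class, else return None."""
        if path not in whitelist:
            return None  # unregistered links are skipped on deserialization
        module_path, class_name = path.rsplit(".", 1)
        return getattr(importlib.import_module(module_path), class_name)()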
@@ -137,7 +137,7 @@ function discover_all_extra_links() {
    group_start "Listing available extra links via 'airflow providers links'"
    COLUMNS=180 airflow providers links

-    local expected_number_of_extra_links=4
+    local expected_number_of_extra_links=6
    local actual_number_of_extra_links
    actual_number_of_extra_links=$(airflow providers links --output table | grep -c ^airflow.providers | xargs)
    if [[ ${actual_number_of_extra_links} != "${expected_number_of_extra_links}" ]]; then
2 changes: 2 additions & 0 deletions tests/core/test_providers_manager.py
@@ -227,6 +227,8 @@
EXTRA_LINKS = [
    'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleIndexableLink',
    'airflow.providers.google.cloud.operators.bigquery.BigQueryConsoleLink',
    'airflow.providers.google.cloud.operators.dataproc.DataprocClusterLink',
    'airflow.providers.google.cloud.operators.dataproc.DataprocJobLink',
    'airflow.providers.google.cloud.operators.mlengine.AIPlatformConsoleLink',
    'airflow.providers.qubole.operators.qubole.QDSLink',
]