Skip to content

Commit

Permalink
Support impersonation service account parameter for Dataflow runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Lukasz Wyszomirski committed May 27, 2022
1 parent 637a8b8 commit 6f3aa92
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
15 changes: 15 additions & 0 deletions airflow/providers/apache/beam/operators/beam.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class BeamDataflowMixin(metaclass=ABCMeta):
dataflow_config: DataflowConfiguration
gcp_conn_id: str
delegate_to: Optional[str]
dataflow_support_impersonation: bool = True

def _set_dataflow(
self,
Expand Down Expand Up @@ -91,6 +92,13 @@ def __get_dataflow_pipeline_options(
pipeline_options[job_name_key] = job_name
if self.dataflow_config.service_account:
pipeline_options["serviceAccount"] = self.dataflow_config.service_account
if self.dataflow_support_impersonation and self.dataflow_config.impersonation_chain:
if isinstance(self.dataflow_config.impersonation_chain, list):
pipeline_options["impersonateServiceAccount"] = ",".join(
self.dataflow_config.impersonation_chain
)
else:
pipeline_options["impersonateServiceAccount"] = self.dataflow_config.impersonation_chain
pipeline_options["project"] = self.dataflow_config.project_id
pipeline_options["region"] = self.dataflow_config.location
pipeline_options.setdefault("labels", {}).update(
Expand Down Expand Up @@ -549,6 +557,13 @@ def __init__(
**kwargs,
)

if self.dataflow_config.impersonation_chain:
self.log.info(
"Impersonation chain parameter is not supported for Apache Beam GO SDK and will be skipped "
"in the execution"
)
self.dataflow_support_impersonation = False

self.go_file = go_file
self.should_init_go_module = False
self.pipeline_options.setdefault("labels", {}).update(
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def write_version(filename: str = os.path.join(*[my_dir, "airflow", "git_version
'xmltodict<0.13.0',
]
apache_beam = [
'apache-beam>=2.33.0',
'apache-beam>=2.39.0',
]
arangodb = ['python-arango>=7.3.2']
asana = ['asana>=0.10']
Expand Down

0 comments on commit 6f3aa92

Please sign in to comment.