From 368c38792e633c96af0a8bd36a8fe4c7286f28bf Mon Sep 17 00:00:00 2001 From: cswpy Date: Fri, 10 Jun 2022 14:59:34 +0800 Subject: [PATCH 1/3] Fixed BigQueryCreateExternalTableOperator and its unit test (#24160) --- .../google/cloud/operators/bigquery.py | 52 +++++++++++++------ .../google/cloud/operators/test_bigquery.py | 46 ++++++++++------ 2 files changed, 64 insertions(+), 34 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 81cce5ca2b5817..8fecfbf149ed96 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -33,7 +33,7 @@ from airflow.models import BaseOperator, BaseOperatorLink from airflow.models.xcom import XCom from airflow.operators.sql import SQLCheckOperator, SQLIntervalCheckOperator, SQLValueCheckOperator -from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob +from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob, _split_tablename from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url from airflow.providers.google.cloud.links.bigquery import BigQueryDatasetLink, BigQueryTableLink @@ -1143,23 +1143,41 @@ def execute(self, context: 'Context') -> None: source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects] - table = bq_hook.create_external_table( - external_project_dataset_table=self.destination_project_dataset_table, - schema_fields=schema_fields, - source_uris=source_uris, - source_format=self.source_format, - autodetect=self.autodetect, - compression=self.compression, - skip_leading_rows=self.skip_leading_rows, - field_delimiter=self.field_delimiter, - max_bad_records=self.max_bad_records, - quote_character=self.quote_character, - allow_quoted_newlines=self.allow_quoted_newlines, - allow_jagged_rows=self.allow_jagged_rows, - src_fmt_configs=self.src_fmt_configs, - 
labels=self.labels, - encryption_configuration=self.encryption_configuration, + project_id, dataset_id, table_id = _split_tablename( + table_input=self.destination_project_dataset_table, + default_project_id=bq_hook.project_id or '', + ) + + table_resource = { + "tableReference": { + "projectId": project_id, + "datasetId": dataset_id, + "tableId": table_id, + }, + "labels": self.labels, + "schema": {"fields": schema_fields}, + "externalDataConfiguration": { + "sourceUris": source_uris, + "sourceFormat": self.source_format, + "maxBadRecords": self.max_bad_records, + "autodetect": self.autodetect, + "compression": self.compression, + "csvOptions": { + "fieldDelimiter": self.field_delimiter, + "skipLeadingRows": self.skip_leading_rows, + "quote": self.quote_character, + "allowQuotedNewlines": self.allow_quoted_newlines, + "allowJaggedRows": self.allow_jagged_rows, + }, + }, + "location": self.location, + "encryptionConfiguration": self.encryption_configuration, + } + + table = bq_hook.create_empty_table( + table_resource=table_resource, ) + BigQueryTableLink.persist( context=context, task_instance=self, diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index 9a060d2f6e3674..58722819749e94 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -190,7 +190,7 @@ class TestBigQueryCreateExternalTableOperator(unittest.TestCase): def test_execute(self, mock_hook): operator = BigQueryCreateExternalTableOperator( task_id=TASK_ID, - destination_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}', + destination_project_dataset_table=f'{TEST_GCP_PROJECT_ID}.{TEST_DATASET}.{TEST_TABLE_ID}', schema_fields=[], bucket=TEST_GCS_BUCKET, source_objects=TEST_GCS_DATA, @@ -199,22 +199,34 @@ def test_execute(self, mock_hook): ) operator.execute(context=MagicMock()) - mock_hook.return_value.create_external_table.assert_called_once_with( - external_project_dataset_table=f'{TEST_DATASET}.{TEST_TABLE_ID}', - schema_fields=[], - source_uris=[f'gs://{TEST_GCS_BUCKET}/{source_object}' for source_object in TEST_GCS_DATA], - source_format=TEST_SOURCE_FORMAT, - autodetect=True, - compression='NONE', - skip_leading_rows=0, - field_delimiter=',', - max_bad_records=0, - quote_character=None, - allow_quoted_newlines=False, - allow_jagged_rows=False, - src_fmt_configs={}, - labels=None, - encryption_configuration=None, + mock_hook.return_value.create_empty_table.assert_called_once_with( + table_resource={ + "tableReference": { + "projectId": TEST_GCP_PROJECT_ID, + "datasetId": TEST_DATASET, + "tableId": TEST_TABLE_ID, + }, + "labels": None, + "schema": {"fields": []}, + "externalDataConfiguration": { + "sourceUris": [ + f'gs://{TEST_GCS_BUCKET}/{source_object}' for source_object in TEST_GCS_DATA + ], + "sourceFormat": TEST_SOURCE_FORMAT, + "maxBadRecords": 0, + "autodetect": True, + "compression": 'NONE', + "csvOptions": { + "fieldDelimiter": ',', + "skipLeadingRows": 0, + "quote": None, + "allowQuotedNewlines": False, + "allowJaggedRows": False, + }, + }, + "location": None, + "encryptionConfiguration": None, + } ) From 920a81c5b66923563673034cf282389048779d09 Mon Sep 17 00:00:00 2001 From: cswpy Date: Mon, 20 Jun 2022 04:27:37 +0800 Subject: [PATCH 2/3] Updated split_tablename function call --- airflow/providers/google/cloud/operators/bigquery.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py index 8fecfbf149ed96..4c78f6d5ba526b 100644 --- a/airflow/providers/google/cloud/operators/bigquery.py +++ b/airflow/providers/google/cloud/operators/bigquery.py @@ -33,7 +33,7 @@ from airflow.models import BaseOperator, BaseOperatorLink from airflow.models.xcom import XCom from airflow.operators.sql import 
SQLCheckOperator, SQLIntervalCheckOperator, SQLValueCheckOperator -from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob, _split_tablename +from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook, BigQueryJob from airflow.providers.google.cloud.hooks.gcs import GCSHook, _parse_gcs_url from airflow.providers.google.cloud.links.bigquery import BigQueryDatasetLink, BigQueryTableLink @@ -1143,7 +1143,7 @@ def execute(self, context: 'Context') -> None: source_uris = [f"gs://{self.bucket}/{source_object}" for source_object in self.source_objects] - project_id, dataset_id, table_id = _split_tablename( + project_id, dataset_id, table_id = bq_hook.split_tablename( table_input=self.destination_project_dataset_table, default_project_id=bq_hook.project_id or '', ) From 420bdeb404056354b16f6ef0170e41aaaf870547 Mon Sep 17 00:00:00 2001 From: cswpy Date: Wed, 22 Jun 2022 10:06:56 +0800 Subject: [PATCH 3/3] Fixed TestBigQueryCreateExternalTableOperator unit test --- tests/providers/google/cloud/operators/test_bigquery.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py index 58722819749e94..7be855a9a0e6f4 100644 --- a/tests/providers/google/cloud/operators/test_bigquery.py +++ b/tests/providers/google/cloud/operators/test_bigquery.py @@ -198,6 +198,12 @@ def test_execute(self, mock_hook): autodetect=True, ) + mock_hook.return_value.split_tablename.return_value = ( + TEST_GCP_PROJECT_ID, + TEST_DATASET, + TEST_TABLE_ID, + ) + operator.execute(context=MagicMock()) mock_hook.return_value.create_empty_table.assert_called_once_with( table_resource={