[AP-680] Fastsync tables with composite primary key (#400)
koszti committed May 6, 2020
1 parent 5d425c1 commit 705afbf
Showing 9 changed files with 63 additions and 27 deletions.
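The gist of the change: the fastsync taps now hand the primary key to the targets as a list of safe (quoted) column names instead of a single column name, and the targets join that list into the PRIMARY KEY clause of the generated DDL, so tables with composite primary keys fast-sync correctly. A minimal sketch of the new contract (not part of the commit; quoting is illustrative):

```python
# Before: the taps returned a single safe column name (or None) and the targets
# embedded it directly in the DDL.
primary_key_before = '"id"'

# After: the taps return a list of safe column names, one per key column (or None)...
primary_key_after = ['"id"', '"num"']

# ...and the targets join the list when building the CREATE TABLE statement.
print('PRIMARY KEY ({})'.format(','.join(primary_key_after)))  # PRIMARY KEY ("id","num")
```
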
10 changes: 5 additions & 5 deletions pipelinewise/fastsync/commons/tap_mysql.py
@@ -141,16 +141,16 @@ def fetch_current_incremental_key_pos(self, table, replication_key):
'version': 1
}

def get_primary_key(self, table_name):
def get_primary_keys(self, table_name):
"""
Get the primary keys of a table
"""
table_dict = utils.tablename_to_dict(table_name)
sql = "SHOW KEYS FROM `{}`.`{}` WHERE Key_name = 'PRIMARY'".format(table_dict['schema_name'],
table_dict['table_name'])
primary_key = self.query(sql)
if len(primary_key) > 0:
return primary_key[0].get('Column_name')
pk_specs = self.query(sql)
if len(pk_specs) > 0:
return [safe_column_name(k.get('Column_name')) for k in pk_specs]

return None

@@ -203,7 +203,7 @@ def map_column_types_to_target(self, table_name):

return {
'columns': mapped_columns,
'primary_key': safe_column_name(self.get_primary_key(table_name))
'primary_key': self.get_primary_keys(table_name)
}

# pylint: disable=too-many-locals
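In tap_mysql.py, SHOW KEYS ... WHERE Key_name = 'PRIMARY' returns one row per column of the primary key, so a composite key yields several rows; the old code kept only the first row's Column_name, the new code collects them all. A standalone sketch of the rewritten logic with the MySQL call stubbed out (the backtick quoting in safe_column_name is an assumption of this sketch):

```python
def safe_column_name(name):
    # Assumed behaviour for this sketch: lower-case and backtick-quote the identifier.
    return '`{}`'.format(name.lower())


def query(sql):
    # Stub: pretend SHOW KEYS returned one row per column of a composite primary key.
    return [{'Column_name': 'id', 'Seq_in_index': 1},
            {'Column_name': 'num', 'Seq_in_index': 2}]


def get_primary_keys(table_name):
    sql = "SHOW KEYS FROM `my_db`.`{}` WHERE Key_name = 'PRIMARY'".format(table_name)
    pk_specs = query(sql)
    if len(pk_specs) > 0:
        return [safe_column_name(k.get('Column_name')) for k in pk_specs]
    return None


print(get_primary_keys('orders'))  # ['`id`', '`num`']
```
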
10 changes: 5 additions & 5 deletions pipelinewise/fastsync/commons/tap_postgres.py
@@ -236,7 +236,7 @@ def fetch_current_incremental_key_pos(self, table, replication_key):
'version': 1
}

def get_primary_key(self, table):
def get_primary_keys(self, table):
"""
Get the primary keys of a table
"""
@@ -251,9 +251,9 @@ def get_primary_key(self, table):
pg_attribute.attrelid = pg_class.oid AND
pg_attribute.attnum = any(pg_index.indkey)
AND indisprimary""".format(schema_name, table_name)
primary_key = self.query(sql)
if len(primary_key) > 0:
return primary_key[0][0]
pk_specs = self.query(sql)
if len(pk_specs) > 0:
return [safe_column_name(k[0]) for k in pk_specs]

return None

@@ -294,7 +294,7 @@ def map_column_types_to_target(self, table_name):

return {
'columns': mapped_columns,
'primary_key': safe_column_name(self.get_primary_key(table_name))
'primary_key': self.get_primary_keys(table_name)
}

def copy_table(self, table_name, path):
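tap_postgres.py follows the same pattern: the join on pg_index, pg_class and pg_attribute filtered by indisprimary also yields one row per key column, but query() returns plain tuples here, hence k[0]. A stubbed sketch (the double-quote quoting in safe_column_name is an assumption):

```python
def safe_column_name(name):
    # Assumed behaviour for this sketch: lower-case and double-quote the identifier.
    return '"{}"'.format(name.lower())


def query(sql):
    # Stub: one tuple per primary-key column, as the catalog join would return.
    return [('id',), ('num',)]


def get_primary_keys(table):
    pk_specs = query('SELECT pg_attribute.attname FROM pg_index ...')  # SQL shortened
    if len(pk_specs) > 0:
        return [safe_column_name(k[0]) for k in pk_specs]
    return None


print(get_primary_keys('public.orders'))  # ['"id"', '"num"']
```
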
4 changes: 2 additions & 2 deletions pipelinewise/fastsync/commons/tap_s3_csv.py
@@ -220,15 +220,15 @@ def fetch_current_incremental_key_pos(self, table: str,
replication_key: self.tables_last_modified[table].isoformat()
} if table in self.tables_last_modified else {}

def _get_primary_keys(self, table_specs: Dict) -> Optional[str]:
def _get_primary_keys(self, table_specs: Dict) -> Optional[List]:
"""
Returns the primary keys specified in the tap config by key_properties
The keys are made safe by wrapping them in quotes in case one or more are reserved words.
:param table_specs: properties of table
:return: a list of the safe keys if keys are given, otherwise None
"""
if table_specs.get('key_properties', False):
return ','.join({safe_column_name(k) for k in table_specs['key_properties']})
return [safe_column_name(k) for k in table_specs['key_properties']]

return None

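In the S3 CSV tap, _get_primary_keys used to return one comma-joined string built from a set comprehension, so the order of a composite key was not stable between runs; it now returns a list that preserves the order from key_properties (the updated unit test near the end of this diff still tolerates either order for that reason). A quick illustration, with safe_column_name assumed to upper-case and double-quote as the tests expect:

```python
def safe_column_name(name):
    # Assumed behaviour, matching the '"KEY_1"' expectations in the tests below.
    return '"{}"'.format(name.upper())


table_specs = {'key_properties': ['key_1', 'key_2']}

# Old behaviour: a comma-joined string built from a set, so ordering could flip.
old = ','.join({safe_column_name(k) for k in table_specs['key_properties']})

# New behaviour: a list preserving the order from the tap config.
new = [safe_column_name(k) for k in table_specs['key_properties']]

print(old)  # '"KEY_1","KEY_2"' or '"KEY_2","KEY_1"', depending on set ordering
print(new)  # ['"KEY_1"', '"KEY_2"']
```
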
8 changes: 5 additions & 3 deletions pipelinewise/fastsync/commons/target_postgres.py
@@ -57,7 +57,7 @@ def drop_table(self, target_schema, table_name, is_temporary=False):
sql = 'DROP TABLE IF EXISTS {}."{}"'.format(target_schema, target_table.lower())
self.query(sql)

def create_table(self, target_schema: str, table_name: str, columns: List[str], primary_key: str,
def create_table(self, target_schema: str, table_name: str, columns: List[str], primary_key: List[str],
is_temporary: bool = False, sort_columns=False):

table_dict = utils.tablename_to_dict(table_name)
@@ -78,9 +78,11 @@ def create_table(self, target_schema: str, table_name: str, columns: List[str],
if sort_columns:
columns.sort()

sql_columns = ','.join(columns).lower()
sql_primary_keys = ','.join(primary_key).lower() if primary_key else None
sql = f'CREATE TABLE IF NOT EXISTS {target_schema}."{target_table.lower()}" (' \
f'{",".join(columns).lower()}' \
f'{f", PRIMARY KEY ({primary_key.lower()}))" if primary_key else ")"}'
f'{sql_columns}' \
f'{f", PRIMARY KEY ({sql_primary_keys}))" if primary_key else ")"}'

self.query(sql)

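On the target side, create_table now takes primary_key as a List[str] and joins it into the DDL; hoisting the joins into sql_columns and sql_primary_keys also keeps the nested f-string readable. A runnable sketch of how the Postgres statement is assembled (the _sdc_* metadata columns are omitted and the names are illustrative):

```python
from typing import List, Optional


def build_create_table(target_schema: str, target_table: str,
                       columns: List[str], primary_key: Optional[List[str]]) -> str:
    # Mirrors the rewritten target_postgres.create_table DDL assembly.
    sql_columns = ','.join(columns).lower()
    sql_primary_keys = ','.join(primary_key).lower() if primary_key else None
    return f'CREATE TABLE IF NOT EXISTS {target_schema}."{target_table.lower()}" (' \
           f'{sql_columns}' \
           f'{f", PRIMARY KEY ({sql_primary_keys}))" if primary_key else ")"}'


print(build_create_table('test_schema', 'orders',
                         ['"id" INTEGER', '"num" INTEGER'], ['"id"', '"num"']))
# CREATE TABLE IF NOT EXISTS test_schema."orders" ("id" integer,"num" integer, PRIMARY KEY ("id","num"))
print(build_create_table('test_schema', 'orders', ['"id" INTEGER'], None))
# CREATE TABLE IF NOT EXISTS test_schema."orders" ("id" integer)
```
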
8 changes: 5 additions & 3 deletions pipelinewise/fastsync/commons/target_snowflake.py
@@ -107,7 +107,7 @@ def drop_table(self, target_schema, table_name, is_temporary=False):
sql = 'DROP TABLE IF EXISTS {}."{}"'.format(target_schema, target_table.upper())
self.query(sql)

def create_table(self, target_schema: str, table_name: str, columns: List[str], primary_key: str,
def create_table(self, target_schema: str, table_name: str, columns: List[str], primary_key: List[str],
is_temporary: bool = False, sort_columns=False):

table_dict = utils.tablename_to_dict(table_name)
@@ -128,9 +128,11 @@ def create_table(self, target_schema: str, table_name: str, columns: List[str],
if sort_columns:
columns.sort()

sql_columns = ','.join(columns)
sql_primary_keys = ','.join(primary_key) if primary_key else None
sql = f'CREATE OR REPLACE TABLE {target_schema}."{target_table.upper()}" (' \
f'{",".join(columns)}' \
f'{f", PRIMARY KEY ({primary_key}))" if primary_key else ")"}'
f'{sql_columns}' \
f'{f", PRIMARY KEY ({sql_primary_keys}))" if primary_key else ")"}'

self.query(sql)

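target_snowflake.py gets the same treatment; the notable differences are that the Snowflake target joins the columns and keys without lower-casing them, so the quoted identifiers keep their upper case, and that it issues CREATE OR REPLACE TABLE rather than CREATE TABLE IF NOT EXISTS. The key join in isolation:

```python
# Same join as the Postgres target, but without lower-casing the identifiers.
primary_key = ['"ID"', '"NUM"']
sql_primary_keys = ','.join(primary_key) if primary_key else None
print(f', PRIMARY KEY ({sql_primary_keys}))')  # , PRIMARY KEY ("ID","NUM"))
```
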
2 changes: 1 addition & 1 deletion singer-connectors/target-postgres/requirements.txt
@@ -1 +1 @@
pipelinewise-target-postgres==1.1.0
pipelinewise-target-postgres==2.0.0
22 changes: 19 additions & 3 deletions tests/units/fastsync/commons/test_fastsync_target_postgres.py
@@ -52,7 +52,7 @@ def test_create_table(self):
table_name='test_table',
columns=['"id" INTEGER',
'"txt" CHARACTER VARYING'],
primary_key='"id"')
primary_key=['"id"'])
assert self.postgres.executed_queries == [
'CREATE TABLE IF NOT EXISTS test_schema."test_table" ('
'"id" integer,"txt" character varying,'
@@ -68,7 +68,7 @@ def test_create_table(self):
columns=['"id" INTEGER',
'"txt" CHARACTER VARYING',
'"SELECT" CHARACTER VARYING'],
primary_key='"id"')
primary_key=['"id"'])
assert self.postgres.executed_queries == [
'CREATE TABLE IF NOT EXISTS test_schema."order" ('
'"id" integer,"txt" character varying,"select" character varying,'
@@ -83,7 +83,7 @@ def test_create_table(self):
table_name='TABLE with SPACE',
columns=['"id" INTEGER',
'"column_with space" CHARACTER VARYING'],
primary_key='"id"')
primary_key=['"id"'])
assert self.postgres.executed_queries == [
'CREATE TABLE IF NOT EXISTS test_schema."table with space" ('
'"id" integer,"column_with space" character varying,'
@@ -92,6 +92,22 @@ def test_create_table(self):
'_sdc_deleted_at character varying'
', PRIMARY KEY ("id"))']

# Create table with composite primary key
self.postgres.executed_queries = []
self.postgres.create_table(target_schema='test_schema',
table_name='TABLE with SPACE',
columns=['"id" INTEGER',
'"num" INTEGER',
'"column_with space" CHARACTER VARYING'],
primary_key=['"id"', '"num"'])
assert self.postgres.executed_queries == [
'CREATE TABLE IF NOT EXISTS test_schema."table with space" ('
'"id" integer,"num" integer,"column_with space" character varying,'
'_sdc_extracted_at timestamp without time zone,'
'_sdc_batched_at timestamp without time zone,'
'_sdc_deleted_at character varying'
', PRIMARY KEY ("id","num"))']

# Create table with no primary key
self.postgres.executed_queries = []
self.postgres.create_table(target_schema='test_schema',
22 changes: 19 additions & 3 deletions tests/units/fastsync/commons/test_fastsync_target_snowflake.py
@@ -69,7 +69,7 @@ def test_create_table(self):
table_name='test_table',
columns=['"ID" INTEGER',
'"TXT" VARCHAR'],
primary_key='"ID"')
primary_key=['"ID"'])
assert self.snowflake.executed_queries == [
'CREATE OR REPLACE TABLE test_schema."TEST_TABLE" ('
'"ID" INTEGER,"TXT" VARCHAR,'
@@ -85,7 +85,7 @@ def test_create_table(self):
columns=['"ID" INTEGER',
'"TXT" VARCHAR',
'"SELECT" VARCHAR'],
primary_key='"ID"')
primary_key=['"ID"'])
assert self.snowflake.executed_queries == [
'CREATE OR REPLACE TABLE test_schema."ORDER" ('
'"ID" INTEGER,"TXT" VARCHAR,"SELECT" VARCHAR,'
@@ -100,7 +100,7 @@ def test_create_table(self):
table_name='TABLE with SPACE',
columns=['"ID" INTEGER',
'"COLUMN WITH SPACE" CHARACTER VARYING'],
primary_key='"ID"')
primary_key=['"ID"'])
assert self.snowflake.executed_queries == [
'CREATE OR REPLACE TABLE test_schema."TABLE WITH SPACE" ('
'"ID" INTEGER,"COLUMN WITH SPACE" CHARACTER VARYING,'
@@ -109,6 +109,22 @@ def test_create_table(self):
'_SDC_DELETED_AT VARCHAR'
', PRIMARY KEY ("ID"))']

# Create table with composite primary key
self.snowflake.executed_queries = []
self.snowflake.create_table(target_schema='test_schema',
table_name='TABLE with SPACE',
columns=['"ID" INTEGER',
'"NUM" INTEGER',
'"COLUMN WITH SPACE" CHARACTER VARYING'],
primary_key=['"ID", "NUM"'])
assert self.snowflake.executed_queries == [
'CREATE OR REPLACE TABLE test_schema."TABLE WITH SPACE" ('
'"ID" INTEGER,"NUM" INTEGER,"COLUMN WITH SPACE" CHARACTER VARYING,'
'_SDC_EXTRACTED_AT TIMESTAMP_NTZ,'
'_SDC_BATCHED_AT TIMESTAMP_NTZ,'
'_SDC_DELETED_AT VARCHAR'
', PRIMARY KEY ("ID", "NUM"))']

# Create table with no primary key
self.snowflake.executed_queries = []
self.snowflake.create_table(target_schema='test_schema',
4 changes: 2 additions & 2 deletions tests/units/fastsync/test_fastsync_tap_s3_csv.py
@@ -172,11 +172,11 @@ def test_get_primary_keys_with_table_that_has_empty_keys_list_returns_none(self)
self.assertIsNone(self.fs_tap_s3_csv._get_primary_keys({'key_properties': []}))

def test_get_primary_keys_with_table_that_has_1_key_returns_one_safe_key(self):
self.assertEqual('"KEY_1"', self.fs_tap_s3_csv._get_primary_keys({'key_properties': ['key_1']}))
self.assertEqual(['"KEY_1"'], self.fs_tap_s3_csv._get_primary_keys({'key_properties': ['key_1']}))

def test_get_primary_keys_with_table_that_has_2_keys_returns_concatenated_keys(self):
self.assertIn(self.fs_tap_s3_csv._get_primary_keys({'key_properties': ['key_2', 'key_3']}),
['"KEY_2","KEY_3"', '"KEY_3","KEY_2"'])
[['"KEY_2"', '"KEY_3"'], ['"KEY_3"', '"KEY_2"']])

def test_get_table_columns(self):
output = list(
