Commit 8c72192: Revert changes to mysql_to_snowflake.py

mlavoie-sm360 committed Jun 15, 2020
1 parent 53ffb74
Showing 1 changed file with 30 additions and 50 deletions.

pipelinewise/fastsync/mysql_to_snowflake.py: 80 changes (30 additions & 50 deletions)
@@ -3,14 +3,15 @@
 import os
 import sys
 import time
-from functools import partial
-from argparse import Namespace
 import multiprocessing
-from typing import Union
 
+from typing import Union
+from argparse import Namespace
+from functools import partial
+from datetime import datetime
+
 from .commons import utils
-from .commons.tap_mysql import FastSyncTapMySql
+from .commons.tap_mongodb import FastSyncTapMongoDB
 from .commons.target_snowflake import FastSyncTargetSnowflake
 
 LOGGER = logging.getLogger(__name__)
@@ -20,7 +21,9 @@
         'host',
         'port',
         'user',
-        'password'
+        'password',
+        'auth_database',
+        'dbname',
     ],
     'target': [
         'account',
@@ -37,78 +40,54 @@
 LOCK = multiprocessing.Lock()
 
 
-def tap_type_to_target_type(mysql_type, mysql_column_type):
-    """Data type mapping from MySQL to Snowflake"""
+def tap_type_to_target_type(mongo_type):
+    """Data type mapping from MongoDB to Snowflake"""
     return {
-        'char': 'VARCHAR',
-        'varchar': 'VARCHAR',
-        'binary': 'BINARY',
-        'varbinary': 'BINARY',
-        'blob': 'VARCHAR',
-        'tinyblob': 'VARCHAR',
-        'mediumblob': 'VARCHAR',
-        'longblob': 'VARCHAR',
-        'geometry': 'VARCHAR',
-        'text': 'VARCHAR',
-        'tinytext': 'VARCHAR',
-        'mediumtext': 'VARCHAR',
-        'longtext': 'VARCHAR',
-        'enum': 'VARCHAR',
-        'int': 'NUMBER',
-        'integer': 'NUMBER',
-        'tinyint': 'BOOLEAN' if mysql_column_type == 'tinyint(1)' else 'NUMBER',
-        'smallint': 'NUMBER',
-        'mediumint': 'NUMBER',
-        'bigint': 'NUMBER',
-        'bit': 'BOOLEAN',
-        'dec': 'FLOAT',
-        'decimal': 'FLOAT',
-        'double': 'FLOAT',
-        'float': 'FLOAT',
-        'bool': 'BOOLEAN',
-        'boolean': 'BOOLEAN',
+        'string': 'TEXT',
+        'object': 'VARIANT',
+        'array': 'VARIANT',
         'date': 'TIMESTAMP_NTZ',
         'datetime': 'TIMESTAMP_NTZ',
         'timestamp': 'TIMESTAMP_NTZ',
-        'year': 'NUMBER',
-        'json': 'VARIANT'
-    }.get(mysql_type, 'VARCHAR')
+    }.get(mongo_type, 'VARCHAR')
 
 
 # pylint: disable=too-many-locals
 def sync_table(table: str, args: Namespace) -> Union[bool, str]:
     """Sync one table"""
-    mysql = FastSyncTapMySql(args.tap, tap_type_to_target_type)
+    mongodb = FastSyncTapMongoDB(args.tap, tap_type_to_target_type)
     snowflake = FastSyncTargetSnowflake(args.target, args.transform)
 
     try:
-        filename = 'pipelinewise_fastsync_{}_{}.csv.gz'.format(table, time.strftime('%Y%m%d-%H%M%S'))
+        dbname = args.tap.get('dbname')
+        filename = 'pipelinewise_fastsync_{}_{}_{}.csv.gz'.format(dbname, table, time.strftime('%Y%m%d-%H%M%S'))
         filepath = os.path.join(args.temp_dir, filename)
         target_schema = utils.get_target_schema(args.target, table)
 
-        # Open connection and get binlog file position
-        mysql.open_connections()
+        # Open connection
+        mongodb.open_connection()
 
-        # Get bookmark - Binlog position or Incremental Key value
-        bookmark = utils.get_bookmark_for_table(table, args.properties, mysql)
+        # Get bookmark - LSN position or Incremental Key value
+        bookmark = utils.get_bookmark_for_table(table, args.properties, mongodb, dbname=dbname)
 
         # Exporting table data, get table definitions and close connection to avoid timeouts
-        mysql.copy_table(table, filepath)
+        mongodb.copy_table(table, filepath, args.temp_dir)
         size_bytes = os.path.getsize(filepath)
-        snowflake_types = mysql.map_column_types_to_target(table)
+        snowflake_types = mongodb.map_column_types_to_target()
         snowflake_columns = snowflake_types.get('columns', [])
-        primary_key = snowflake_types.get('primary_key')
-        mysql.close_connections()
+        primary_key = snowflake_types['primary_key']
+        mongodb.close_connection()
 
         # Uploading to S3
         s3_key = snowflake.upload_to_s3(filepath, table, tmp_dir=args.temp_dir)
-        os.remove(filepath)
+        # os.remove(filepath)
 
         # Creating temp table in Snowflake
         snowflake.create_schema(target_schema)
         snowflake.create_table(target_schema, table, snowflake_columns, primary_key, is_temporary=True)
 
         # Load into Snowflake table
-        snowflake.copy_to_table(s3_key, target_schema, table, size_bytes, is_temporary=True)
+        snowflake.copy_to_table(s3_key, target_schema, table, size_bytes, is_temporary=True, skip_csv_header=True)
 
         # Obfuscate columns
         snowflake.obfuscate_columns(target_schema, table)
@@ -170,6 +149,7 @@ def main_impl():
         Total tables selected to sync  : %s
         Tables loaded successfully     : %s
         Exceptions during table sync   : %s
+        CPU cores                      : %s
         Runtime                        : %s
         -------------------------------------------------------
@@ -186,4 +166,4 @@ def main():
         main_impl()
     except Exception as exc:
         LOGGER.critical(exc)
-        raise exc
+        raise exc
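
Note on the staged-file naming this diff introduces: with dbname added to the format string, syncing a collection named 'users' from a database named 'shop' at 2020-06-15 12:00:00 would stage a file called pipelinewise_fastsync_shop_users_20200615-120000.csv.gz ('users' and 'shop' are illustrative values; the pattern follows the format call shown in the diff above).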

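Note on the new type mapping: it covers only the handful of MongoDB types listed in the diff and falls back to VARCHAR for anything else. A minimal standalone sketch of that behavior (the function body is copied from the diff above; the sample lookups are illustrative):

    def tap_type_to_target_type(mongo_type):
        """Data type mapping from MongoDB to Snowflake"""
        return {
            'string': 'TEXT',
            'object': 'VARIANT',
            'array': 'VARIANT',
            'date': 'TIMESTAMP_NTZ',
            'datetime': 'TIMESTAMP_NTZ',
            'timestamp': 'TIMESTAMP_NTZ',
        }.get(mongo_type, 'VARCHAR')

    # Nested documents and arrays land in Snowflake VARIANT columns;
    # any unlisted type (e.g. 'decimal128') takes the VARCHAR default.
    assert tap_type_to_target_type('object') == 'VARIANT'
    assert tap_type_to_target_type('decimal128') == 'VARCHAR'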