From 355ed0c739a7fec97ce6b91a5db2b6021c1a74f8 Mon Sep 17 00:00:00 2001 From: Amir Mofakhar Date: Wed, 7 Sep 2022 08:46:26 +0100 Subject: [PATCH] [AP-1254] Refactored partialsync to use merge (#1010) * refactored partialsync to use merge * fixed unify * fix unity * fixed pep8 --- docs/user_guide/metadata_columns.rst | 2 +- pipelinewise/fastsync/commons/tap_mysql.py | 31 ++- pipelinewise/fastsync/commons/tap_postgres.py | 39 ++-- .../fastsync/commons/target_snowflake.py | 29 +++ .../partialsync/mysql_to_snowflake.py | 52 ++++- .../partialsync/postgres_to_snowflake.py | 46 ++++- pipelinewise/fastsync/partialsync/utils.py | 104 +++++++--- tests/end_to_end/helpers/assertions.py | 97 +++++++-- tests/end_to_end/helpers/env.py | 39 +++- tests/end_to_end/target_snowflake/__init__.py | 4 +- .../test_partial_sync_mariadb_to_sf.py | 172 ++++++++++++++-- .../test_partial_sync_pg_to_sf.py | 191 +++++++++++++++--- .../tap_mysql_to_sf_soft_delete.yml.template | 113 +++++++++++ ...ap_postgres_to_sf_soft_delete.yml.template | 123 +++++++++++ .../commons/test_fastsync_target_snowflake.py | 37 ++++ .../tmp/target_config_tmp.json | 4 +- .../test_partial_sync_utils/__init__.py | 0 .../sample_sf_columns.py | 107 ++++++++++ .../partialsync/test_mysql_to_snowflake.py | 38 ++-- .../partialsync/test_partial_sync_utils.py | 139 ++++++++++--- .../partialsync/test_postgres_to_snowflake.py | 32 ++- tests/units/partialsync/utils.py | 5 +- 22 files changed, 1198 insertions(+), 206 deletions(-) create mode 100644 tests/end_to_end/test-project/tap_mysql_to_sf_soft_delete.yml.template create mode 100644 tests/end_to_end/test-project/tap_postgres_to_sf_soft_delete.yml.template create mode 100644 tests/units/partialsync/resources/test_partial_sync_utils/__init__.py create mode 100644 tests/units/partialsync/resources/test_partial_sync_utils/sample_sf_columns.py diff --git a/docs/user_guide/metadata_columns.rst b/docs/user_guide/metadata_columns.rst index 225e2a750..8119d5386 100644 --- a/docs/user_guide/metadata_columns.rst +++ b/docs/user_guide/metadata_columns.rst @@ -43,7 +43,7 @@ at the end of the table: target database as well. Please also note that Only :ref:`log_based` replication method detects delete row events. - To turn off **Hard Delete** mode add ``hard_delete: False`` to the target :ref:`targets_list` + To turn off **Hard Delete** mode add ``hard_delete: false`` to the target :ref:`targets_list` YAML config file. In this case when a deleted row captured in source then ``_SDC_DELETED_AT`` column will only get flagged and not get deleted in the target. Please also note that Only :ref:`log_based` replication method detects delete row events. 
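For reference, ``hard_delete: false`` is set in the tap YAML alongside the ``target`` key. Below is a minimal sketch of the relevant keys only, mirroring the soft-delete test templates added later in this patch (``tap_mysql_to_sf_soft_delete.yml.template``); schema and table names are illustrative::

    id: "mariadb_to_sf_soft_delete"
    type: "tap-mysql"
    target: "snowflake"       # ID of the target connector where the data will be loaded
    hard_delete: false        # deleted source rows are only flagged via _SDC_DELETED_AT, not removed
    schemas:
      - source_schema: "mysql_source_db"
        target_schema: "ppw_e2e_tap_mysql"
        tables:
          - table_name: "weight_unit"
            replication_method: "LOG_BASED"
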
diff --git a/pipelinewise/fastsync/commons/tap_mysql.py b/pipelinewise/fastsync/commons/tap_mysql.py index 179c659d5..220ea9572 100644 --- a/pipelinewise/fastsync/commons/tap_mysql.py +++ b/pipelinewise/fastsync/commons/tap_mysql.py @@ -8,7 +8,7 @@ import pymysql.cursors from argparse import Namespace -from typing import Tuple, Dict, Callable, Union +from typing import Tuple, Dict, Callable from pymysql import InterfaceError, OperationalError, Connection from ...utils import safe_column_name @@ -280,11 +280,11 @@ def fetch_current_incremental_key_pos(self, table, replication_key): Get the actual incremental key position in the table """ result = self.query( - 'SELECT MAX({}) AS key_value FROM {}'.format(replication_key, table) + f'SELECT MAX({replication_key}) AS key_value FROM {table}' ) if not result: raise Exception( - 'Cannot get replication key value for table: {}'.format(table) + f'Cannot get replication key value for table: {table}' ) mysql_key_value = result[0].get('key_value') @@ -311,9 +311,8 @@ def get_primary_keys(self, table_name): Get the primary key of a table """ table_dict = utils.tablename_to_dict(table_name) - sql = "SHOW KEYS FROM `{}`.`{}` WHERE Key_name = 'PRIMARY'".format( - table_dict['schema_name'], table_dict['table_name'] - ) + sql = f"SHOW KEYS FROM `{table_dict['schema_name']}`.`{table_dict['table_name']}` WHERE Key_name = 'PRIMARY'" + pk_specs = self.query(sql) if len(pk_specs) > 0: return [ @@ -417,7 +416,7 @@ def copy_table( split_file_chunk_size_mb=1000, split_file_max_chunks=20, compress=True, - where_clause_setting=None + where_clause_sql='', ): """ Export data from table to a zipped csv @@ -437,16 +436,14 @@ def copy_table( raise Exception('{} table not found.'.format(table_name)) table_dict = utils.tablename_to_dict(table_name) - where_clause_sql = '' - if where_clause_setting: - where_clause_sql = f' WHERE {where_clause_setting["column"]} >= \'{where_clause_setting["start_value"]}\'' - if where_clause_setting['end_value']: - where_clause_sql += f' AND {where_clause_setting["column"]} <= \'{where_clause_setting["end_value"]}\'' + + column_safe_sql_values = column_safe_sql_values + [ + "CONVERT_TZ( NOW(),@@session.time_zone,'+00:00') AS `_SDC_EXTRACTED_AT`", + "CONVERT_TZ( NOW(),@@session.time_zone,'+00:00') AS `_SDC_BATCHED_AT`", + 'null AS `_SDC_DELETED_AT`' + ] sql = """SELECT {} - ,CONVERT_TZ( NOW(),@@session.time_zone,'+00:00') AS _SDC_EXTRACTED_AT - ,CONVERT_TZ( NOW(),@@session.time_zone,'+00:00') AS _SDC_BATCHED_AT - ,null AS _SDC_DELETED_AT FROM `{}`.`{}` {} """.format( ','.join(column_safe_sql_values), @@ -500,7 +497,7 @@ def copy_table( ) def export_source_table_data( - self, args: Namespace, tap_id: str, where_clause_setting: Union[Dict, None] = None) -> list: + self, args: Namespace, tap_id: str, where_clause_sql: str = '') -> list: """Export source table data""" filename = utils.gen_export_filename(tap_id=tap_id, table=args.table, sync_type='partialsync') filepath = os.path.join(args.temp_dir, filename) @@ -513,7 +510,7 @@ def export_source_table_data( split_large_files=args.target.get('split_large_files'), split_file_chunk_size_mb=args.target.get('split_file_chunk_size_mb'), split_file_max_chunks=args.target.get('split_file_max_chunks'), - where_clause_setting=where_clause_setting + where_clause_sql=where_clause_sql, ) file_parts = glob.glob(f'{filepath}*') return file_parts diff --git a/pipelinewise/fastsync/commons/tap_postgres.py b/pipelinewise/fastsync/commons/tap_postgres.py index ee85850f9..1807f7b2f 100644 --- 
a/pipelinewise/fastsync/commons/tap_postgres.py +++ b/pipelinewise/fastsync/commons/tap_postgres.py @@ -337,7 +337,7 @@ def fetch_current_incremental_key_pos(self, table, replication_key): ) if not result: raise Exception( - 'Cannot get replication key value for table: {}'.format(table) + f'Cannot get replication key value for table: {table}' ) postgres_key_value = result[0].get('key_value') @@ -477,7 +477,7 @@ def copy_table( split_file_chunk_size_mb=1000, split_file_max_chunks=20, compress=True, - where_clause_setting=None + where_clause_sql='', ): """ Export data from table to a zipped csv @@ -494,24 +494,20 @@ def copy_table( # If self.get_table_columns returns zero row then table not exist if len(column_safe_sql_values) == 0: - raise Exception('{} table not found.'.format(table_name)) + raise Exception(f'{table_name} table not found.') schema_name, table_name = table_name.split('.') - where_clause_sql = '' - if where_clause_setting: - where_clause_sql = f' WHERE {where_clause_setting["column"]} >= \'{where_clause_setting["start_value"]}\'' - if where_clause_setting['end_value']: - where_clause_sql += f' AND {where_clause_setting["column"]} <= \'{where_clause_setting["end_value"]}\'' - - sql = """COPY (SELECT {} - ,now() AT TIME ZONE 'UTC' - ,now() AT TIME ZONE 'UTC' - ,null - FROM {}."{}"{}) TO STDOUT with CSV DELIMITER ',' - """.format( - ','.join(column_safe_sql_values), schema_name, table_name, where_clause_sql - ) + column_safe_sql_values = column_safe_sql_values + [ + "now() AT TIME ZONE 'UTC' AS _SDC_EXTRACTED_AT", + "now() AT TIME ZONE 'UTC' AS _SDC_BATCHED_AT", + 'null _SDC_DELETED_AT' + ] + + sql = f"""COPY (SELECT {','.join(column_safe_sql_values)} + FROM {schema_name}."{table_name}"{where_clause_sql}) TO STDOUT with CSV DELIMITER ',' + """ + LOGGER.info('Exporting data: %s', sql) gzip_splitter = split_gzip.open( @@ -526,23 +522,18 @@ def copy_table( self.curr.copy_expert(sql, split_gzip_files, size=131072) def export_source_table_data( - self, args: Namespace, tap_id: str) -> list: + self, args: Namespace, tap_id: str, where_clause_sql: str = '') -> list: """Exporting data from the source table""" filename = utils.gen_export_filename(tap_id=tap_id, table=args.table, sync_type='partialsync') filepath = os.path.join(args.temp_dir, filename) - where_clause_setting = { - 'column': args.column, - 'start_value': args.start_value, - 'end_value': args.end_value - } self.copy_table( args.table, filepath, split_large_files=args.target.get('split_large_files'), split_file_chunk_size_mb=args.target.get('split_file_chunk_size_mb'), split_file_max_chunks=args.target.get('split_file_max_chunks'), - where_clause_setting=where_clause_setting + where_clause_sql=where_clause_sql ) file_parts = glob.glob(f'{filepath}*') return file_parts diff --git a/pipelinewise/fastsync/commons/target_snowflake.py b/pipelinewise/fastsync/commons/target_snowflake.py index 3008f3078..ef59dd7da 100644 --- a/pipelinewise/fastsync/commons/target_snowflake.py +++ b/pipelinewise/fastsync/commons/target_snowflake.py @@ -417,6 +417,28 @@ def obfuscate_columns(self, target_schema: str, table_name: str): LOGGER.info('Obfuscation rules applied.') + def merge_tables(self, schema, source_table, target_table, columns, primary_keys): + on_clause = ' AND '.join( + [f'"{source_table.upper()}".{p.upper()} = "{target_table.upper()}".{p.upper()}' for p in primary_keys] + ) + update_clause = ', '.join( + [f'"{target_table.upper()}".{c.upper()} = "{source_table.upper()}".{c.upper()}' for c in columns] + ) + columns_for_insert = ', 
'.join([f'{c.upper()}' for c in columns]) + values = ', '.join([f'"{source_table.upper()}".{c.upper()}' for c in columns]) + + query = f'MERGE INTO {schema}."{target_table.upper()}" USING {schema}."{source_table.upper()}"' \ + f' ON {on_clause}' \ + f' WHEN MATCHED THEN UPDATE SET {update_clause}' \ + f' WHEN NOT MATCHED THEN INSERT ({columns_for_insert})' \ + f' VALUES ({values})' + self.query(query) + + def partial_hard_delete(self, schema, table, where_clause_sql): + self.query( + f'DELETE FROM {schema}."{table.upper()}"{where_clause_sql} AND _SDC_DELETEd_AT IS NOT NULL' + ) + def swap_tables(self, schema, table_name) -> None: """ Swaps given target table with its temp version and drops the latter @@ -440,6 +462,13 @@ def swap_tables(self, schema, table_name) -> None: query_tag_props={'schema': schema, 'table': temp_table}, ) + def add_columns(self, schema: str, table_name: str, adding_columns: dict) -> None: + if adding_columns: + add_columns_list = [f'{column_name} {column_type}' for column_name, column_type in adding_columns.items()] + add_clause = ', '.join(add_columns_list) + query = f'ALTER TABLE {schema}."{table_name.upper()}" ADD {add_clause}' + self.query(query) + def __apply_transformations( self, transformations: List[Dict], target_schema: str, table_name: str ) -> None: diff --git a/pipelinewise/fastsync/partialsync/mysql_to_snowflake.py b/pipelinewise/fastsync/partialsync/mysql_to_snowflake.py index b55b8d1ce..0a8a2b02f 100644 --- a/pipelinewise/fastsync/partialsync/mysql_to_snowflake.py +++ b/pipelinewise/fastsync/partialsync/mysql_to_snowflake.py @@ -13,11 +13,13 @@ from pipelinewise.fastsync.partialsync import utils from pipelinewise.fastsync.mysql_to_snowflake import REQUIRED_CONFIG_KEYS, tap_type_to_target_type -from pipelinewise.fastsync.partialsync.utils import load_into_snowflake, upload_to_s3, update_state_file +from pipelinewise.fastsync.partialsync.utils import ( + upload_to_s3, update_state_file, diff_source_target_columns, load_into_snowflake) LOGGER = Logger().get_logger(__name__) +# pylint: disable=too-many-locals def partial_sync_table(args: Namespace) -> Union[bool, str]: """Partial sync table for MySQL to Snowflake""" snowflake = FastSyncTargetSnowflake(args.target, args.transform) @@ -26,22 +28,52 @@ def partial_sync_table(args: Namespace) -> Union[bool, str]: try: mysql = FastSyncTapMySql(args.tap, tap_type_to_target_type) - # Get bookmark - Binlog position or Incremental Key value mysql.open_connections() - bookmark = common_utils.get_bookmark_for_table(args.table, args.properties, mysql) - where_clause_setting = { - 'column': args.column, - 'start_value': args.start_value, - 'end_value': args.end_value + # Get column differences + target_schema = common_utils.get_target_schema(args.target, args.table) + table_dict = common_utils.tablename_to_dict(args.table) + target_table = table_dict.get('table_name') + + target_sf = { + 'sf_object': snowflake, + 'schema': target_schema, + 'table': target_table, + 'temp': table_dict.get('temp_table_name') } - file_parts = mysql.export_source_table_data(args, tap_id, where_clause_setting) + snowflake_types = mysql.map_column_types_to_target(args.table) + source_columns = snowflake_types.get('columns', []) + columns_diff = diff_source_target_columns(target_sf, source_columns=source_columns) + + # Get bookmark - Binlog position or Incremental Key value + bookmark = common_utils.get_bookmark_for_table(args.table, args.properties, mysql) + + where_clause_sql = f' WHERE {args.column} >= \'{args.start_value}\'' + if 
args.end_value: + where_clause_sql += f' AND {args.column} <= \'{args.end_value}\'' + + # export data from source + file_parts = mysql.export_source_table_data(args, tap_id, where_clause_sql) + + # mark partial data as deleted in the target + snowflake.query(f'UPDATE {target_schema}."{target_table.upper()}"' + f' SET _SDC_DELETEd_AT = CURRENT_TIMESTAMP(){where_clause_sql} AND _SDC_DELETED_AT IS NULL') + + # Creating temp table in Snowflake + primary_keys = snowflake_types.get('primary_key') + snowflake.create_schema(target_schema) + snowflake.create_table( + target_schema, args.table, source_columns, primary_keys, is_temporary=True + ) mysql.close_connections() + size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) - s3_keys, s3_key_pattern = upload_to_s3(snowflake, file_parts, args.temp_dir) - load_into_snowflake(snowflake, args, s3_keys, s3_key_pattern, size_bytes) + _, s3_key_pattern = upload_to_s3(snowflake, file_parts, args.temp_dir) + + load_into_snowflake(target_sf, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, where_clause_sql) + update_state_file(args, bookmark) return True diff --git a/pipelinewise/fastsync/partialsync/postgres_to_snowflake.py b/pipelinewise/fastsync/partialsync/postgres_to_snowflake.py index 556192cb2..bc500af8b 100644 --- a/pipelinewise/fastsync/partialsync/postgres_to_snowflake.py +++ b/pipelinewise/fastsync/partialsync/postgres_to_snowflake.py @@ -10,12 +10,13 @@ from pipelinewise.fastsync.postgres_to_snowflake import REQUIRED_CONFIG_KEYS, tap_type_to_target_type from pipelinewise.fastsync.commons import utils as common_utils from pipelinewise.fastsync.partialsync.utils import ( - load_into_snowflake, upload_to_s3, update_state_file, parse_args_for_partial_sync) + upload_to_s3, update_state_file, parse_args_for_partial_sync, diff_source_target_columns, load_into_snowflake) from pipelinewise.logger import Logger LOGGER = Logger().get_logger(__name__) +# pylint: disable=too-many-locals def partial_sync_table(args: Namespace) -> Union[bool, str]: """Partial sync table for Postgres to Snowflake""" snowflake = FastSyncTargetSnowflake(args.target, args.transform) @@ -26,13 +27,50 @@ def partial_sync_table(args: Namespace) -> Union[bool, str]: # Get bookmark - Binlog position or Incremental Key value postgres.open_connection() + + # Get column differences + target_schema = common_utils.get_target_schema(args.target, args.table) + table_dict = common_utils.tablename_to_dict(args.table) + target_table = table_dict.get('table_name') + + target_sf = { + 'sf_object': snowflake, + 'schema': target_schema, + 'table': target_table, + 'temp': table_dict.get('temp_table_name') + } + + snowflake_types = postgres.map_column_types_to_target(args.table) + source_columns = snowflake_types.get('columns', []) + columns_diff = diff_source_target_columns(target_sf, source_columns=source_columns) + bookmark = common_utils.get_bookmark_for_table(args.table, args.properties, postgres, dbname=dbname) - file_parts = postgres.export_source_table_data(args, tap_id) + where_clause_sql = f' WHERE {args.column} >= \'{args.start_value}\'' + if args.end_value: + where_clause_sql += f' AND {args.column} <= \'{args.end_value}\'' + + file_parts = postgres.export_source_table_data(args, tap_id, where_clause_sql) + + # mark partial data as deleted in the target + snowflake.query( + f'UPDATE {target_schema}."{target_table.upper()}"' + f' SET _SDC_DELETEd_AT = CURRENT_TIMESTAMP(){where_clause_sql} AND _SDC_DELETED_AT IS NULL') + + # Creating temp table in Snowflake 
+ primary_keys = snowflake_types.get('primary_key') + snowflake.create_schema(target_schema) + snowflake.create_table( + target_schema, args.table, source_columns, primary_keys, is_temporary=True + ) + postgres.close_connection() + size_bytes = sum([os.path.getsize(file_part) for file_part in file_parts]) - s3_keys, s3_key_pattern = upload_to_s3(snowflake, file_parts, args.temp_dir) - load_into_snowflake(snowflake, args, s3_keys, s3_key_pattern, size_bytes) + _, s3_key_pattern = upload_to_s3(snowflake, file_parts, args.temp_dir) + + load_into_snowflake(target_sf, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, where_clause_sql) + update_state_file(args, bookmark) return True diff --git a/pipelinewise/fastsync/partialsync/utils.py b/pipelinewise/fastsync/partialsync/utils.py index 83cecd13c..8fa91a142 100644 --- a/pipelinewise/fastsync/partialsync/utils.py +++ b/pipelinewise/fastsync/partialsync/utils.py @@ -1,3 +1,5 @@ +import json + import argparse import os import re @@ -25,37 +27,69 @@ def upload_to_s3(snowflake: FastSyncTargetSnowflake, file_parts: List, temp_dir: return s3_keys, s3_key_pattern -def load_into_snowflake( - snowflake: FastSyncTargetSnowflake, - args: argparse.Namespace, - s3_keys: List, s3_key_pattern: str, size_bytes: int) -> None: - """load data into Snowflake""" - - # delete partial data from the table - target_schema = common_utils.get_target_schema(args.target, args.table) - table_dict = common_utils.tablename_to_dict(args.table) - target_table = table_dict.get('table_name') - where_clause = f'WHERE {args.column} >= \'{args.start_value}\'' - if args.end_value: - where_clause += f' AND {args.column} <= \'{args.end_value}\'' - - snowflake.query(f'DELETE FROM {target_schema}."{target_table.upper()}" {where_clause}') - # copy partial data into the table - archive_load_files = args.target.get('archive_load_files', False) - tap_id = args.target.get('tap_id') +def diff_source_target_columns(target_sf: dict, source_columns: list) -> dict: + """Finding the diff between source and target columns""" + target_column = target_sf['sf_object'].query( + f'SHOW COLUMNS IN TABLE {target_sf["schema"]}."{target_sf["table"].upper()}"' + ) - # Load into Snowflake table + source_columns_dict = _get_source_columns_dict(source_columns) + target_columns_info = _get_target_columns_info(target_column) + added_columns = _get_added_columns(source_columns_dict, target_columns_info['columns_dict']) + removed_columns = _get_removed_columns(source_columns_dict, target_columns_info['columns_dict']) + + return { + 'added_columns': added_columns, + 'removed_columns': removed_columns, + 'target_columns': target_columns_info['column_names'], + 'source_columns': source_columns_dict + } + + +def _get_target_columns_info(target_column): + target_columns_dict = {} + list_of_target_column_names = [] + for column in target_column: + list_of_target_column_names.append(column['column_name']) + column_type_str = column['data_type'] + column_type_dict = json.loads(column_type_str) + target_columns_dict[f'"{column["column_name"]}"'] = column_type_dict['type'] + return { + 'column_names': list_of_target_column_names, + 'columns_dict': target_columns_dict + } + + +def _get_source_columns_dict(source_columns): + source_columns_dict = {} + for column in source_columns: + column_info = column.split(' ') + column_name = column_info[0] + column_type = ' '.join(column_info[1:]) + source_columns_dict[column_name] = column_type + return source_columns_dict + + +def load_into_snowflake(target, args, 
columns_diff, primary_keys, s3_key_pattern, size_bytes, + where_clause_sql): + """Loading data from S3 to the temp table in snowflake and then merge it with the target table""" + + snowflake = target['sf_object'] + # Load into Snowflake temp table snowflake.copy_to_table( - s3_key_pattern, target_schema, args.table, size_bytes, is_temporary=False + s3_key_pattern, target['schema'], args.table, size_bytes, is_temporary=True ) + # Obfuscate columns + snowflake.obfuscate_columns(target['schema'], args.table) - for s3_key in s3_keys: - if archive_load_files: - # Copy load file to archive - snowflake.copy_to_archive(s3_key, tap_id, args.table) - - # Delete all file parts from s3 - snowflake.s3.delete_object(Bucket=args.target.get('s3_bucket'), Key=s3_key) + snowflake.add_columns(target['schema'], target['table'], columns_diff['added_columns']) + added_metadata_columns = ['_SDC_EXTRACTED_AT', '_SDC_BATCHED_AT', '_SDC_DELETED_AT'] + snowflake.merge_tables( + target['schema'], target['temp'], target['table'], + list(columns_diff['source_columns'].keys()) + added_metadata_columns, primary_keys) + if args.target['hard_delete'] is True: + snowflake.partial_hard_delete(target['schema'], target['table'], where_clause_sql) + snowflake.drop_table(target['schema'], target['temp']) def update_state_file(args: argparse.Namespace, bookmark: Dict) -> None: @@ -117,3 +151,19 @@ def _get_args_parser_for_partialsync(): ) return parser + + +def _get_removed_columns(source_columns_dict, target_columns_dict): + # ignoring columns added by PPW + default_columns_added_by_ppw = {'"_SDC_EXTRACTED_AT"', '"_SDC_BATCHED_AT"', '"_SDC_DELETED_AT"'} + + removed_columns = set(target_columns_dict) - set(source_columns_dict) + removed_columns = removed_columns - default_columns_added_by_ppw + removed_columns = {key: target_columns_dict[key] for key in removed_columns} + return removed_columns + + +def _get_added_columns(source_columns_dict, target_columns_dict): + added_columns = set(source_columns_dict) - set(target_columns_dict) + added_columns = {key: source_columns_dict[key] for key in added_columns} + return added_columns diff --git a/tests/end_to_end/helpers/assertions.py b/tests/end_to_end/helpers/assertions.py index 1fb93e7c6..df80e73b9 100644 --- a/tests/end_to_end/helpers/assertions.py +++ b/tests/end_to_end/helpers/assertions.py @@ -4,6 +4,8 @@ from typing import List, Set, Union from pathlib import Path +from unittest import TestCase +from contextlib import contextmanager from . import tasks from . 
import db @@ -52,30 +54,83 @@ def assert_resync_tables_success(tap, target, profiling=False): ) -# pylint: disable=too-many-arguments -def assert_partial_sync_table_success(env, tap, tap_type, target, source_db, table, column, start_value, end_value): +# pylint: disable=invalid-name +def assert_partial_sync_table_success(tap_parameters, start_value, end_value, min_pk_value_for_target_missed_records): """Partial sync a specific tap and make sure that it finished successfully and state file is created with the right content""" - # It should be ran one time before for partial sync - assert_resync_tables_success(tap, target, profiling=False) + + # Deleting all records from the target with primary key greater than 5 + tap_parameters['env'].delete_record_from_target_snowflake( + tap_type=tap_parameters['tap_type'], + table=tap_parameters['table'], + where_clause=f'WHERE {tap_parameters["column"]}>{min_pk_value_for_target_missed_records}' + ) + + command = _get_command_for_partial_sync(tap_parameters, start_value, end_value) + + [return_code, stdout, stderr] = tasks.run_command(command) + log_file = tasks.find_run_tap_log_file(stdout, 'partialsync') + assert_command_success(return_code, stdout, stderr, log_file) + + +def assert_partial_sync_table_with_target_additional_columns( + tap_parameters, additional_column, + start_value, end_value, min_pk_value_for_target_missed_records): + """Assert partial sync table command with additional column in the target""" + + # Deleting all records from the target except the first one + tap_parameters['env'].delete_record_from_target_snowflake( + tap_type=tap_parameters['tap_type'], + table=tap_parameters['table'], + where_clause=f'WHERE {tap_parameters["column"]}>{min_pk_value_for_target_missed_records}' + ) + + # Add a new column in the target + tap_parameters['env'].add_column_into_target_sf( + tap_type=tap_parameters['tap_type'], + table=tap_parameters['table'], + new_column=additional_column + ) + + command = _get_command_for_partial_sync(tap_parameters, start_value, end_value) + + [return_code, stdout, stderr] = tasks.run_command(command) + log_file = tasks.find_run_tap_log_file(stdout, 'partialsync') + assert_command_success(return_code, stdout, stderr, log_file) + + +def assert_partial_sync_table_with_source_additional_columns( + tap_parameters, additional_column, + start_value, end_value, min_pk_value_for_target_missed_records): + """Assert partial sync table command with additional columns in the source""" # Deleting all records from the target except the first one - env.delete_record_from_target_snowflake(tap_type=tap_type, - table=table, - where_clause=f'WHERE {column}>1') + tap_parameters['env'].delete_record_from_target_snowflake( + tap_type=tap_parameters['tap_type'], + table=tap_parameters['table'], + where_clause=f'WHERE {tap_parameters["column"]}>{min_pk_value_for_target_missed_records}' + ) - command = f'pipelinewise partial_sync_table --tap {tap} --target {target}' \ - f' --table {source_db}.{table} --column {column} --start_value {start_value} --end_value {end_value}' + # Add a new column in the source + tap_parameters['env'].add_column_into_source( + tap_type=tap_parameters['tap_type'], + table=tap_parameters['table'], + new_column=additional_column + ) + + command = _get_command_for_partial_sync(tap_parameters, start_value, end_value) [return_code, stdout, stderr] = tasks.run_command(command) log_file = tasks.find_run_tap_log_file(stdout, 'partialsync') assert_command_success(return_code, stdout, stderr, log_file) -def 
assert_partial_sync_rows_in_target(env, tap_type, table, column_index, expected_column_values): +def assert_partial_sync_rows_in_target(env, tap_type, table, column, primary_key, expected_column_values): """Assert only expected rows are synced in the target snowflake""" - records = env.get_records_from_target_snowflake(tap_type=tap_type, table=table) - list_of_column_values = [column[column_index] for column in records] + records = env.get_records_from_target_snowflake( + tap_type=tap_type, table=table, column=column, primary_key=primary_key + ) + list_of_column_values = [column[0] for column in records] assert expected_column_values == list_of_column_values @@ -445,3 +500,21 @@ def assert_profiling_stats_files_created( if isinstance(tap, list): for tap_ in tap: assert f'tap_{tap_}.pstat' in pstat_files + +# pylint: disable=raise-missing-from +@contextmanager +def assert_not_raises(exc_type): + """Assert exception not raised""" + try: + yield None + except exc_type: + raise TestCase.failureException(f'{exc_type.__name__} raised!') + + +def _get_command_for_partial_sync(tap_parameters, start_value, end_value=None): + end_value_command = f' --end_value {end_value}' if end_value else '' + command = f'pipelinewise partial_sync_table --tap {tap_parameters["tap"]} --target {tap_parameters["target"]}' \ + f' --table {tap_parameters["source_db"]}.{tap_parameters["table"]} --column {tap_parameters["column"]}' \ + f' --start_value {start_value} --end_value {end_value}{end_value_command}' + + return command diff --git a/tests/end_to_end/helpers/env.py b/tests/end_to_end/helpers/env.py index 127cc4405..8887ed882 100644 --- a/tests/end_to_end/helpers/env.py +++ b/tests/end_to_end/helpers/env.py @@ -655,10 +655,45 @@ def delete_record_from_target_snowflake(self, tap_type, table, where_clause): f'DELETE from ppw_e2e_tap_{tap_type}{self.sf_schema_postfix}.{table} {where_clause}' ) - def get_records_from_target_snowflake(self, tap_type, table): + def add_column_into_target_sf(self, tap_type, table, new_column) : + """Add a record into the target""" + self.run_query_target_snowflake( + f'ALTER TABLE ppw_e2e_tap_{tap_type}{self.sf_schema_postfix}.{table} ADD {new_column["name"]} int' + ) + self.run_query_target_snowflake( + f'UPDATE ppw_e2e_tap_{tap_type}{self.sf_schema_postfix}.{table}' + f' SET {new_column["name"]}={new_column["value"]} WHERE 1=1' + ) + + def add_column_into_source(self, tap_type, table, new_column): + """Add a column into the source table""" + run_query_method = getattr(self, f'run_query_tap_{tap_type}') + run_query_method( + f'ALTER TABLE {table} ADD {new_column["name"]} int' + ) + run_query_method( + f'UPDATE {table} set {new_column["name"]}={new_column["value"]} where 1=1' + ) + + def delete_record_from_source(self, tap_type, table, where_clause): + """Delete a record from the source""" + run_query_method = getattr(self, f'run_query_tap_{tap_type}') + run_query_method( + f'DELETE FROM {table} {where_clause}' + ) + + def run_query_on_source(self, tap_type, query): + """Running a query on the source""" + run_query_method = getattr(self, f'run_query_tap_{tap_type}') + run_query_method( + query + ) + + def get_records_from_target_snowflake(self, tap_type, table, column, primary_key): """"Getting all records from a specific table of snowflake target""" records = self.run_query_target_snowflake( - f'SELECT * from ppw_e2e_tap_{tap_type}{self.sf_schema_postfix}.{table}' + f'SELECT {column} FROM ppw_e2e_tap_{tap_type}{self.sf_schema_postfix}.{table}' + f' ORDER BY "{primary_key.upper()}"' ) 
return records diff --git a/tests/end_to_end/target_snowflake/__init__.py b/tests/end_to_end/target_snowflake/__init__.py index 2e72051e3..91f419926 100644 --- a/tests/end_to_end/target_snowflake/__init__.py +++ b/tests/end_to_end/target_snowflake/__init__.py @@ -34,10 +34,12 @@ def setUp(self, tap_id: str, target_id: str, tap_type: str): self.check_validate_taps() self.check_import_config() + self.tap_type = tap_type def tearDown(self): self.remove_dir_from_config_dir(f'{self.target_id}/{self.tap_id}') - self.drop_sf_schema_if_exists(f'{self.tap_id}{self.e2e_env.sf_schema_postfix}') + self.drop_sf_schema_if_exists(f'ppw_e2e_{self.tap_type}{self.e2e_env.sf_schema_postfix}'.upper()) + self.drop_sf_schema_if_exists(f'ppw_e2e_{self.tap_type}_public2{self.e2e_env.sf_schema_postfix}'.upper()) super().tearDown() # pylint: disable=no-self-use diff --git a/tests/end_to_end/target_snowflake/tap_mariadb/test_partial_sync_mariadb_to_sf.py b/tests/end_to_end/target_snowflake/tap_mariadb/test_partial_sync_mariadb_to_sf.py index e3d40b9c1..3e18476b9 100644 --- a/tests/end_to_end/target_snowflake/tap_mariadb/test_partial_sync_mariadb_to_sf.py +++ b/tests/end_to_end/target_snowflake/tap_mariadb/test_partial_sync_mariadb_to_sf.py @@ -1,12 +1,6 @@ from tests.end_to_end.helpers import assertions from tests.end_to_end.target_snowflake.tap_mariadb import TapMariaDB - -TAP_ID = 'mariadb_to_sf' -TARGET_ID = 'snowflake' -TABLE = 'weight_unit' -COLUMN = 'weight_unit_id' -START_VALUE = '5' -END_VALUE = '7' +from datetime import datetime class TestPartialSyncMariaDBToSF(TapMariaDB): @@ -16,31 +10,165 @@ class TestPartialSyncMariaDBToSF(TapMariaDB): # pylint: disable=arguments-differ def setUp(self): - super().setUp(tap_id=TAP_ID, target_id=TARGET_ID) + self.table = 'weight_unit' + self.column = 'weight_unit_id' + super().setUp(tap_id='mariadb_to_sf', target_id='snowflake') + # It should be ran one time before for partial sync + assertions.assert_resync_tables_success(self.tap_id, self.target_id, profiling=False) + self.tap_parameters = { + 'env': self.e2e_env, + 'tap': self.tap_id, + 'tap_type': 'mysql', + 'target': self.target_id, + 'source_db': self.e2e_env.get_conn_env_var('TAP_MYSQL', 'DB'), + 'table': self.table, + 'column': self.column + } def test_partial_sync_mariadb_to_sf(self): """ Test partial sync table from MariaDB to Snowflake """ - source_db = self.e2e_env.get_conn_env_var('TAP_MYSQL', 'DB') + + assertions.assert_partial_sync_table_success( + self.tap_parameters, + start_value=4, + end_value=6, + min_pk_value_for_target_missed_records=1 + ) + + # for this test, all records with id > 1 are deleted from the target and then will do a partial sync + expected_records_for_column = [1, 4, 5, 6] + column_to_check = primary_key = self.column + + assertions.assert_partial_sync_rows_in_target( + self.e2e_env, 'mysql', self.table, column_to_check, primary_key, expected_records_for_column + ) + + def test_partial_sync_if_there_is_additional_column_in_source(self): + """ + Test partial sync table from MariaDB to Snowflake if there are additional columns in source + """ + + additional_column = 'FOO_NEW_COLUMN_SOURCE' + additional_column_value = 345 + assertions.assert_partial_sync_table_with_source_additional_columns( + self.tap_parameters, + additional_column={'name': additional_column, 'value': additional_column_value}, + start_value=4, + end_value=6, + min_pk_value_for_target_missed_records=1 + ) + + # for this test, all records with id > 1 are deleted from the target and then will do a partial sync + # records 
start_value to end_value will be with the value same as source because + # out of this range wont be touched and they will have None + + expected_records_for_column = [None, additional_column_value, additional_column_value, additional_column_value] + primary_key = self.column + assertions.assert_partial_sync_rows_in_target( + self.e2e_env, 'mysql', self.table, additional_column, primary_key, expected_records_for_column + ) + + def test_partial_sync_if_there_is_additional_column_in_target(self): + """ + Test partial sync table from MariaDB to Snowflake if there are additional columns in target + """ + + additional_column_value = 567 + additional_column = 'FOO_NEW_COLUMN_TARGET' + assertions.assert_partial_sync_table_with_target_additional_columns( + self.tap_parameters, + additional_column={'name': additional_column, 'value': additional_column_value}, + start_value=4, + end_value=6, + min_pk_value_for_target_missed_records=1 + ) + + # for this test, all records with id > 1 are deleted from the target and then will do a partial sync + # records start_value to end_value should be None because these columns do not exist in the source and records + # out of this range wont be touched and they will have their original value + expected_records_for_column = [additional_column_value, None, None, None] + primary_key = self.column + + assertions.assert_partial_sync_rows_in_target( + self.e2e_env, 'mysql', self.table, additional_column, primary_key, expected_records_for_column + ) + + def test_partial_sync_if_record_is_deleted_from_the_source_and_hard_delete(self): + """ + Test partial sync table from MariaDB to SF if hard delete is selected and a record is deleted from the source + """ + self.e2e_env.delete_record_from_source('mysql', self.table, 'WHERE weight_unit_id=5') + assertions.assert_partial_sync_table_success( - self.e2e_env, - self.tap_id, - 'mysql', - self.target_id, - source_db, - TABLE, - COLUMN, - START_VALUE, - END_VALUE + self.tap_parameters, + start_value=4, + end_value=6, + min_pk_value_for_target_missed_records=1 + ) + + # for this test, all records with id > 1 are deleted from the target and then will do a partial sync + expected_records_for_column = [1, 4, 6] + column_to_check = primary_key = self.column + + assertions.assert_partial_sync_rows_in_target( + self.e2e_env, 'mysql', self.table, column_to_check, primary_key, expected_records_for_column ) - index_of_column = 0 + +class TestPartialSyncMariaDBToSFSoftDelete(TapMariaDB): + """ + Test cases for Partial sync table from MariaDB to Snowflake if set to soft delete + """ + + # pylint: disable=arguments-differ + def setUp(self): + self.table = 'weight_unit' + self.column = 'weight_unit_id' + super().setUp(tap_id='mariadb_to_sf_soft_delete', target_id='snowflake') + # It should be ran one time before for partial sync + assertions.assert_resync_tables_success(self.tap_id, self.target_id, profiling=False) + self.tap_parameters = { + 'env': self.e2e_env, + 'tap': self.tap_id, + 'tap_type': 'mysql', + 'target': self.target_id, + 'source_db': self.e2e_env.get_conn_env_var('TAP_MYSQL', 'DB'), + 'table': self.table, + 'column': self.column + } + + def test_partial_sync_if_record_is_deleted_from_the_source_and_soft_delete(self): + """ + Test partial sync table from MariaDB to SF if soft delete is selected and a record is deleted from the source + """ + self.e2e_env.delete_record_from_source('mysql', self.table, 'WHERE weight_unit_id=5') + + assertions.assert_partial_sync_table_success( + self.tap_parameters, + start_value=4, + 
end_value=6, + min_pk_value_for_target_missed_records=5 + ) # for this test, all records with id > 1 are deleted from the target and then will do a partial sync - # for 7=> id =>5 - expected_records_for_column = [1, 5, 6, 7] + expected_records_for_column = [1, 2, 3, 4, 5, 6] + column_to_check = primary_key = 'weight_unit_id' assertions.assert_partial_sync_rows_in_target( - self.e2e_env, 'mysql', TABLE, index_of_column, expected_records_for_column + self.e2e_env, 'mysql', self.table, column_to_check, primary_key, expected_records_for_column + ) + + expected_metadata = [None, None, None, None, 'TIME_STAMP', None] + + records = self.e2e_env.get_records_from_target_snowflake( + tap_type='mysql', table=self.table, column='_SDC_DELETED_AT', primary_key=primary_key ) + list_of_column_values = [column[0] for column in records] + + *first_part, sdc_delete, end_part = list_of_column_values + self.assertListEqual(first_part, expected_metadata[:4]) + self.assertEqual(end_part, expected_metadata[-1]) + with assertions.assert_not_raises(ValueError): + datetime.strptime(sdc_delete[:19], '%Y-%m-%d %H:%M:%S') diff --git a/tests/end_to_end/target_snowflake/tap_postgres/test_partial_sync_pg_to_sf.py b/tests/end_to_end/target_snowflake/tap_postgres/test_partial_sync_pg_to_sf.py index dcf341b9e..b45808183 100644 --- a/tests/end_to_end/target_snowflake/tap_postgres/test_partial_sync_pg_to_sf.py +++ b/tests/end_to_end/target_snowflake/tap_postgres/test_partial_sync_pg_to_sf.py @@ -1,45 +1,186 @@ +from datetime import datetime from tests.end_to_end.helpers import assertions -from tests.end_to_end.target_snowflake.tap_mariadb import TapMariaDB +from tests.end_to_end.target_snowflake.tap_postgres import TapPostgres -TAP_ID = 'postgres_to_sf' -TARGET_ID = 'snowflake' -TABLE = 'edgydata' -COLUMN = 'cid' -START_VALUE = '3' -END_VALUE = '5' - -class TestPartialSyncMariaDBToSF(TapMariaDB): +class TestPartialSyncPGToSF(TapPostgres): """ - Test cases for Partial sync table from MariaDB to Snowflake + Test cases for Partial sync table from Postgres to Snowflake """ # pylint: disable=arguments-differ def setUp(self): - super().setUp(tap_id=TAP_ID, target_id=TARGET_ID) + self.table = 'edgydata' + self.column = 'cid' + super().setUp(tap_id='postgres_to_sf', target_id='snowflake') + + self.tap_parameters = { + 'env': self.e2e_env, + 'tap': self.tap_id, + 'tap_type': 'postgres', + 'target': self.target_id, + 'source_db': 'public', + 'table': self.table, + 'column': self.column, + } + # It should be ran one time before for partial sync + while True: + # Repeating resync until it is successful and there are records in target + assertions.assert_resync_tables_success(self.tap_id, self.target_id) + records = self.e2e_env.get_records_from_target_snowflake( + tap_type='postgres', table=self.table, column=self.column, primary_key=self.column + ) + if records: + break - def test_partial_sync_mariadb_to_sf(self): + def test_partial_sync_pg_to_sf(self): """ Test partial sync table from MariaDB to Snowflake """ + assertions.assert_partial_sync_table_success( - self.e2e_env, - self.tap_id, - 'postgres', - self.target_id, - 'public', - TABLE, - COLUMN, - START_VALUE, - END_VALUE + self.tap_parameters, + start_value=3, + end_value=7, + min_pk_value_for_target_missed_records=1 + ) + + # for this test, all records with id > 1 are deleted from the target and then will do a partial sync + expected_records_for_column = [1, 3, 4, 5, 6, 7] + column_to_check = primary_key = self.column + + assertions.assert_partial_sync_rows_in_target( + 
self.e2e_env, 'postgres', self.table, column_to_check, primary_key, expected_records_for_column + ) + + def test_partial_sync_if_there_is_additional_column_in_source(self): + """ + Test partial sync table from PG to Snowflake if there are additional columns in source + """ + additional_column = 'FOO_NEW_COLUMN_SOURCE' + additional_column_value = 567 + + assertions.assert_partial_sync_table_with_source_additional_columns( + self.tap_parameters, + additional_column={'name': additional_column, 'value': additional_column_value}, + start_value=4, + end_value=6, + min_pk_value_for_target_missed_records=2 + ) + + # for this test, all records with id > 2 are deleted from the target and then will do a partial sync + # It is expected records 4 to 6 be with the value same as source + # out of this range should have None in target + expected_records_for_column = [ + None, None, additional_column_value, additional_column_value, additional_column_value + ] + primary_key = self.column + assertions.assert_partial_sync_rows_in_target( + self.e2e_env, 'postgres', self.table, additional_column, primary_key, expected_records_for_column + ) + + def test_partial_sync_if_there_is_additional_column_in_target(self): + """ + Test partial sync table from PG to Snowflake if there are additional columns in target + """ + additional_column_value = 987 + additional_column = 'FOO_NEW_COLUMN_TARGET' + assertions.assert_partial_sync_table_with_target_additional_columns( + self.tap_parameters, + additional_column={'name': additional_column, 'value': additional_column_value }, + start_value=4, + end_value=7, + min_pk_value_for_target_missed_records=2 + ) + + # for this test, all records with id > 2 are deleted from the target and then will do a partial sync + # It is expected records 4 to 7 be None value because this column does not exist in the source and records + # out of this range wont be touched and they will have their original value + expected_records_for_column = [additional_column_value , additional_column_value , None, None, None, None] + primary_key = self.column + + assertions.assert_partial_sync_rows_in_target( + self.e2e_env, 'postgres', self.table, additional_column, primary_key, expected_records_for_column ) - index_of_column = 0 + def test_partial_sync_if_record_is_deleted_from_the_source_and_hard_delete(self): + """ + Test partial sync table from PG to Snowflake if hard delete is selected and a record is deleted from the source + """ + self.e2e_env.delete_record_from_source('postgres', self.table, 'WHERE cid=5') + assertions.assert_partial_sync_table_success( + self.tap_parameters, + start_value=4, + end_value=6, + min_pk_value_for_target_missed_records=1 + ) # for this test, all records with id > 1 are deleted from the target and then will do a partial sync - # for 5=> id =>3 - expected_records_for_column = [1, 3, 4, 5] + expected_records_for_column = [1, 4, 6] + column_to_check = primary_key = self.column + + assertions.assert_partial_sync_rows_in_target( + self.e2e_env, 'postgres', self.table, column_to_check, primary_key, expected_records_for_column + ) + + +class TestPartialSyncPGToSFSoftDelete(TapPostgres): + """ + Test cases for Partial sync table from Postgres to Snowflake if set to soft delete + """ + # pylint: disable=arguments-differ + def setUp(self): + self.table = 'edgydata' + self.column = 'cid' + super().setUp(tap_id='postgres_to_sf_soft_delete', target_id='snowflake') + self.tap_parameters = { + 'env': self.e2e_env, + 'tap': self.tap_id, + 'tap_type': 'postgres', + 'target': 
self.target_id, + 'source_db': 'public', + 'table': self.table, + 'column': self.column, + } + # It should be ran one time before for partial sync + while True: + assertions.assert_resync_tables_success(self.tap_id, self.target_id) + records = self.e2e_env.get_records_from_target_snowflake( + tap_type='postgres', table=self.table, column=self.column, primary_key=self.column + ) + if records: + break + + def test_partial_sync_if_record_is_deleted_from_the_source_and_soft_delete(self): + """ + Test partial sync table from PG to Snowflake if soft delete is selected and a record is deleted from the source + """ + self.e2e_env.delete_record_from_source('postgres', self.table, 'WHERE cid=5') + + assertions.assert_partial_sync_table_success( + self.tap_parameters, + start_value=4, + end_value=6, + min_pk_value_for_target_missed_records=5 + ) + + # for this test, all records with id > 3 are deleted from the target and then will do a partial sync + expected_records_for_column = [1, 2, 3, 4, 5, 6] + column_to_check = primary_key = self.column assertions.assert_partial_sync_rows_in_target( - self.e2e_env, 'postgres', TABLE, index_of_column, expected_records_for_column + self.e2e_env, 'postgres', self.table, column_to_check, primary_key, expected_records_for_column + ) + + expected_metadata = [None, None, None, None, 'TIME_STAMP', None] + + records = self.e2e_env.get_records_from_target_snowflake( + tap_type='postgres', table=self.table, column='_SDC_DELETED_AT', primary_key=primary_key ) + list_of_column_values = [column[0] for column in records] + + *first_part, sdc_delete, end_part = list_of_column_values + self.assertListEqual(first_part, expected_metadata[:4]) + self.assertEqual(end_part, expected_metadata[-1]) + with assertions.assert_not_raises(ValueError): + datetime.strptime(sdc_delete[:19], '%Y-%m-%d %H:%M:%S') diff --git a/tests/end_to_end/test-project/tap_mysql_to_sf_soft_delete.yml.template b/tests/end_to_end/test-project/tap_mysql_to_sf_soft_delete.yml.template new file mode 100644 index 000000000..8587d21d3 --- /dev/null +++ b/tests/end_to_end/test-project/tap_mysql_to_sf_soft_delete.yml.template @@ -0,0 +1,113 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "mariadb_to_sf_soft_delete" +name: "MariaDB source test database" +type: "tap-mysql" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - MySQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_MYSQL_HOST}" # MySQL host + port: ${TAP_MYSQL_PORT} # MySQL port + user: "${TAP_MYSQL_USER}" # MySQL user + password: "${TAP_MYSQL_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_MYSQL_DB}" # MySQL database name + use_gtid: true + engine: mariadb + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# ------------------------------------------------------------------------------ +target: "snowflake" # ID of the target connector where the data will be loaded +batch_size_rows: 20000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes +hard_delete: false + +# 
------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + - source_schema: "mysql_source_db" + target_schema: "ppw_e2e_tap_mysql${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + ### Table with LOG_BASED replication + - table_name: "weight_unit" + replication_method: "LOG_BASED" + transformations: + - column: "weight_unit_name" + type: "HASH-SKIP-FIRST-2" + + ### Table with INCREMENTAL replication + - table_name: "address" + replication_method: "INCREMENTAL" + replication_key: "date_updated" + transformations: + - column: "zip_code_zip_code_id" + type: "MASK-NUMBER" + when: + - column: 'street_number' + regex_match: '[801]' + + - column: "date_created" + type: "MASK-DATE" + + ### Table with FULL_TABLE replication + - table_name: "order" + replication_method: "FULL_TABLE" + + ### Table with no primary key + - table_name: "no_pk_table" + replication_method: "FULL_TABLE" + + ### Table with binary and varbinary columns + - table_name: "table_with_binary" + replication_method: "LOG_BASED" + + ### Table with reserved words and columns and special characters + - table_name: "edgydata" + replication_method: "LOG_BASED" + transformations: + - column: "case" + type: "HASH" + + - column: "group" + type: "MASK-NUMBER" + when: + - column: 'case' + equals: 'A' + - column: "group" + type: "SET-NULL" + when: + - column: 'case' + equals: 'B' + + ### Table with reserved word + - table_name: "full" + replication_method: "INCREMENTAL" + replication_key: "begin" + + ### Table with space and mixed upper and lowercase characters + - table_name: "table_with_space and UPPERCase" + replication_method: "LOG_BASED" + + ### Table with all possible data types + - table_name: "all_datatypes" + replication_method: "LOG_BASED" + + ### Table with LOG_BASED replication + - table_name: "customers" + replication_method: "LOG_BASED" + transformations: + - column: "phone" + type: "MASK-STRING-SKIP-ENDS-2" + + - column: "email" + type: "MASK-STRING-SKIP-ENDS-6" diff --git a/tests/end_to_end/test-project/tap_postgres_to_sf_soft_delete.yml.template b/tests/end_to_end/test-project/tap_postgres_to_sf_soft_delete.yml.template new file mode 100644 index 000000000..80781fd72 --- /dev/null +++ b/tests/end_to_end/test-project/tap_postgres_to_sf_soft_delete.yml.template @@ -0,0 +1,123 @@ +--- + +# ------------------------------------------------------------------------------ +# General Properties +# ------------------------------------------------------------------------------ +id: "postgres_to_sf_soft_delete" +name: "PostgreSQL source test database" +type: "tap-postgres" +owner: "test-runner" + + +# ------------------------------------------------------------------------------ +# Source (Tap) - PostgreSQL connection details +# ------------------------------------------------------------------------------ +db_conn: + host: "${TAP_POSTGRES_HOST}" # PostgreSQL host + logical_poll_total_seconds: 3 # Time out if no LOG_BASED changes received for 3 seconds + port: ${TAP_POSTGRES_PORT} # PostgreSQL port + user: "${TAP_POSTGRES_USER}" # PostgreSQL user + password: "${TAP_POSTGRES_PASSWORD}" # Plain string or vault encrypted + dbname: "${TAP_POSTGRES_DB}" # PostgreSQL database name + + +# ------------------------------------------------------------------------------ +# Destination (Target) - Target properties +# Connection details should be in the relevant target YAML file +# 
------------------------------------------------------------------------------ +target: "snowflake" # ID of the target connector where the data will be loaded +batch_size_rows: 1000 # Batch size for the stream to optimise load performance +stream_buffer_size: 0 # In-memory buffer size (MB) between taps and targets for asynchronous data pipes +hard_delete: false + + +# ------------------------------------------------------------------------------ +# Source to target Schema mapping +# ------------------------------------------------------------------------------ +schemas: + + ### SOURCE SCHEMA 1: public + - source_schema: "public" + target_schema: "ppw_e2e_tap_postgres${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + + ### Table with INCREMENTAL replication + - table_name: "city" + replication_method: "INCREMENTAL" + replication_key: "id" + + ### Table with FULL_TABLE replication + - table_name: "country" + replication_method: "FULL_TABLE" + + ### Table with no primary key + - table_name: "no_pk_table" + replication_method: "FULL_TABLE" + + ### Table with reserved words and columns and special characters + - table_name: "edgydata" + replication_method: "INCREMENTAL" + replication_key: "cid" + transformations: + - column: "cvarchar" + type: "HASH-SKIP-FIRST-3" + + ### Table with reserved word + - table_name: "order" + replication_method: "INCREMENTAL" + replication_key: "id" + + ### Table with space and mixed upper and lowercase characters + - table_name: "table_with_space and UPPERCase" + replication_method: "LOG_BASED" + + ### Table with space and mixed upper and lowercase characters + - table_name: "table_with_reserved_words" + replication_method: "FULL_TABLE" + + ### Table with INCREMENTAL replication + - table_name: "customers" + replication_method: "INCREMENTAL" + replication_key: "id" + transformations: + - column: "phone" + type: "MASK-STRING-SKIP-ENDS-2" + + - column: "email" + type: "MASK-STRING-SKIP-ENDS-6" + + ### Empty table + - table_name: "empty_table" + replication_method: "INCREMENTAL" + replication_key: "id" + + ### SOURCE SCHEMA 2: public2 + - source_schema: "public2" + target_schema: "ppw_e2e_tap_postgres_public2${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + + tables: + ### Table with FULL_TABLE replication + - table_name: "wearehere" + replication_method: "FULL_TABLE" + + ### Table with reserved words and columns and special characters + - table_name: "public2_edgydata" + replication_method: "INCREMENTAL" + replication_key: "cid" + + ### SOURCE SCHEMA 3: logical 1 + #- source_schema: "logical1" + # target_schema: "ppw_e2e_tap_postgres_logical1${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + # + # tables: + # - table_name: "logical1_table1" + # replication_method: "LOG_BASED" + # - table_name: "logical1_table2" + # - table_name: "logical1_edgydata" + + ### SOURCE SCHEMA 4: logical2 + #- source_schema: "logical2" + # target_schema: "ppw_e2e_tap_postgres_logical2${TARGET_SNOWFLAKE_SCHEMA_POSTFIX}" + # tables: + # - table_name: "logical2_table1" \ No newline at end of file diff --git a/tests/units/fastsync/commons/test_fastsync_target_snowflake.py b/tests/units/fastsync/commons/test_fastsync_target_snowflake.py index db88c2d12..1209a0afa 100644 --- a/tests/units/fastsync/commons/test_fastsync_target_snowflake.py +++ b/tests/units/fastsync/commons/test_fastsync_target_snowflake.py @@ -81,6 +81,43 @@ def test_drop_table(self): 'DROP TABLE IF EXISTS test_schema."TEST TABLE WITH SPACE_TEMP"', ]) + def test_merge_tables(self): + """Validate if merge tables query is generated correctly""" + schema = 
'test_schema' + source_table = 'source_table' + target_table = 'target_table' + list_of_columns = ['p1', 'col1', 'p2', 'col2', 'col3'] + primary_keys = ['p1', 'p2'] + on_clause = ' AND '.join( + [f'"{source_table.upper()}".{p.upper()} = "{target_table.upper()}".{p.upper()}' for p in primary_keys] + ) + update_clause = ', '.join( + [f'"{target_table.upper()}".{c.upper()} = "{source_table.upper()}".{c.upper()}' for c in list_of_columns] + ) + columns_for_insert = ', '.join([c.upper() for c in list_of_columns]) + values = ', '.join([f'"{source_table.upper()}".{c.upper()}' for c in list_of_columns]) + + expected_merge_query = f'MERGE INTO {schema}."{target_table.upper()}" USING {schema}."{source_table.upper()}"' \ + f' ON {on_clause}' \ + f' WHEN MATCHED THEN UPDATE SET {update_clause}' \ + f' WHEN NOT MATCHED THEN INSERT ({columns_for_insert})' \ + f' VALUES ({values})' + + self.snowflake.merge_tables(schema, source_table, target_table, list_of_columns, primary_keys) + + self.assertListEqual(self.snowflake.executed_queries, [expected_merge_query]) + + def test_add_columns(self): + """Test add_column method works as expected""" + schema = 'test_schema' + table = 'test_table' + adding_columns = {'col1': 'type1', 'col2': 'type2'} + columns_query = ', '.join([f'{col_name} {col_type}' for col_name, col_type in adding_columns.items()]) + expected_query = f'ALTER TABLE {schema}."{table.upper()}" ADD {columns_query}' + + self.snowflake.add_columns(schema, table, adding_columns) + self.assertListEqual(self.snowflake.executed_queries, [expected_query]) + def test_create_table(self): """Validate if create table queries generated correctly""" # Create table with standard table and column names diff --git a/tests/units/partialsync/resources/test_partial_sync/tmp/target_config_tmp.json b/tests/units/partialsync/resources/test_partial_sync/tmp/target_config_tmp.json index aba30b413..67ca9df09 100644 --- a/tests/units/partialsync/resources/test_partial_sync/tmp/target_config_tmp.json +++ b/tests/units/partialsync/resources/test_partial_sync/tmp/target_config_tmp.json @@ -33,5 +33,7 @@ "temp_dir": "/foo", "user": "FOO_USER", "validate_records": false, - "warehouse": "FOO_WAREHOUSE" + "warehouse": "FOO_WAREHOUSE", + "default_target_schema": "foo_schema" + } diff --git a/tests/units/partialsync/resources/test_partial_sync_utils/__init__.py b/tests/units/partialsync/resources/test_partial_sync_utils/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/units/partialsync/resources/test_partial_sync_utils/sample_sf_columns.py b/tests/units/partialsync/resources/test_partial_sync_utils/sample_sf_columns.py new file mode 100644 index 000000000..8ce878435 --- /dev/null +++ b/tests/units/partialsync/resources/test_partial_sync_utils/sample_sf_columns.py @@ -0,0 +1,107 @@ +SAMPLE_OUTPUT_FROM_SF = [ + { + 'table_name': 'BAR_TABLE', + 'schema_name': 'FOO_SCHEMA', + 'column_name': 'FOO_COLUMN_1', + 'data_type': '{"type":"FIXED","precision":38,"scale":0,"nullable":true}', + 'null?': 'true', + 'default': '', + 'kind': 'COLUMN', + 'expression': '', + 'comment': '', + 'database_name': 'FOO_DB', + 'autoincrement': '' + }, + { + 'table_name': 'BAR_TABLE', + 'schema_name': 'FOO_SCHEMA', + 'column_name': 'FOO_COLUMN_2', + 'data_type': '{"type":"TEXT","length":16777216,"byteLength":16777216,"nullable":true,"fixed":false}', + 'null?': 'true', + 'default': '', + 'kind': 'COLUMN', + 'expression': '', + 'comment': '', + 'database_name': 'FOO_DB', + 'autoincrement': '' + }, + { + 'table_name': 'BAR_TABLE', + 
'schema_name': 'FOO_SCHEMA', + 'column_name': 'FOO_COLUMN_3', + 'data_type': '{"type":"TEXT","length":16777216,"byteLength":16777216,"nullable":true,"fixed":false}', + 'null?': 'true', + 'default': '', + 'kind': 'COLUMN', + 'expression': '', + 'comment': '', + 'database_name': 'FOO_DB', + 'autoincrement': '' + }, + + { + 'table_name': 'BAR_TABLE', + 'schema_name': 'FOO_SCHEMA', + 'column_name': 'FOO_COLUMN_4', + 'data_type': '{"type":"NUMBER","nullable":false,"fixed":false}', + 'null?': 'true', + 'default': '', + 'kind': 'COLUMN', + 'expression': '', + 'comment': '', + 'database_name': 'FOO_DB', + 'autoincrement': '' + }, + { + 'table_name': 'BAR_TABLE', + 'schema_name': 'FOO_SCHEMA', + 'column_name': '_SDC_EXTRACTED_AT', + 'data_type': '{"type":"TIMESTAMP_NTZ","nullable":false,"fixed":false}', + 'null?': 'true', + 'default': '', + 'kind': 'COLUMN', + 'expression': '', + 'comment': '', + 'database_name': 'FOO_DB', + 'autoincrement': '' + }, + { + 'table_name': 'BAR_TABLE', + 'schema_name': 'FOO_SCHEMA', + 'column_name': '_SDC_BATCHED_AT', + 'data_type': '{"type":"TIMESTAMP_NTZ","nullable":false,"fixed":false}', + 'null?': 'true', + 'default': '', + 'kind': 'COLUMN', + 'expression': '', + 'comment': '', + 'database_name': 'FOO_DB', + 'autoincrement': '' + }, + { + 'table_name': 'BAR_TABLE', + 'schema_name': 'FOO_SCHEMA', + 'column_name': '_SDC_DELETED_AT', + 'data_type': '{"type":"TIMESTAMP_NTZ","nullable":false,"fixed":false}', + 'null?': 'true', + 'default': '', + 'kind': 'COLUMN', + 'expression': '', + 'comment': '', + 'database_name': 'FOO_DB', + 'autoincrement': '' + }, + { + 'table_name': 'BAR_TABLE', + 'schema_name': 'FOO_SCHEMA', + 'column_name': '_SDC_FOO_BAR', + 'data_type': '{"type":"TIMESTAMP_NTZ","nullable":false,"fixed":false}', + 'null?': 'true', + 'default': '', + 'kind': 'COLUMN', + 'expression': '', + 'comment': '', + 'database_name': 'FOO_DB', + 'autoincrement': '' + }, + ] diff --git a/tests/units/partialsync/test_mysql_to_snowflake.py b/tests/units/partialsync/test_mysql_to_snowflake.py index d46c0df7c..d78685d65 100644 --- a/tests/units/partialsync/test_mysql_to_snowflake.py +++ b/tests/units/partialsync/test_mysql_to_snowflake.py @@ -30,7 +30,7 @@ def test_mysql_to_snowflake_partial_sync_table_if_exception_happens(self): self.assertEqual(f'{args.table}: {exception_message}', actual_return) - def test_export_source_table_data_new(self): + def test_export_source_table_data(self): """Test export_source_table_data method""" expected_file_parts = [] @@ -49,12 +49,10 @@ def mocked_copy_table_method(table, filepath, **kwargs): mocked_copy_table.side_effect = mocked_copy_table_method test_fast_sync = FastSyncTapMySql({}, {}) - where_clause_setting = { - 'column': args.column, - 'start_value': args.start_value, - 'end_value': args.end_value - } - actual_file_parts = test_fast_sync.export_source_table_data(args, tap_id, where_clause_setting) + + where_clause = 'FOO WHERE' + actual_file_parts = test_fast_sync.export_source_table_data( + args, tap_id, where_clause) call_args = mocked_copy_table.call_args[0] call_kwargs = mocked_copy_table.call_args[1] @@ -63,7 +61,7 @@ def mocked_copy_table_method(table, filepath, **kwargs): 'split_large_files': False, 'split_file_chunk_size_mb': args.target['split_file_chunk_size_mb'], 'split_file_max_chunks': args.target['split_file_max_chunks'], - 'where_clause_setting': {'column': 'FOO_COLUMN', 'start_value': 'FOO_START', 'end_value': 'FOO_END'} + 'where_clause_sql': where_clause } self.assertEqual(2, len(call_args)) @@ -79,7 +77,6 @@ def 
mocked_copy_table_method(table, filepath, **kwargs): # pylint: disable=too-many-locals, too-many-arguments @mock.patch('pipelinewise.fastsync.commons.utils.save_state_file') - @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.load_into_snowflake') @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.upload_to_s3') @mock.patch('pipelinewise.fastsync.commons.utils.get_bookmark_for_table') @mock.patch('pipelinewise.fastsync.partialsync.mysql_to_snowflake.FastSyncTapMySql') @@ -89,7 +86,6 @@ def test_running_partial_sync_mysql_to_snowflake(self, mocked_fastsyncmysql, mocked_bookmark, mocked_upload_to_s3, - mocked_load_into_sf, mocked_save_state): """Test the whole partial_sync_mysql_to_snowflake module works as expected""" with TemporaryDirectory() as temp_directory: @@ -98,6 +94,10 @@ def test_running_partial_sync_mysql_to_snowflake(self, s3_keys = ['FOO_S3_KEYS', ] s3_key_pattern = 'BAR_S3_KEY_PATTERN' bookmark = 'foo_bookmark' + maped_column_types_to_target = { + 'columns': ['foo type1', 'bar type2'], + 'primary_key': 'foo_primary' + } def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument with open(f'{temp_directory}/t1', 'w', encoding='utf8') as exported_file: @@ -108,6 +108,7 @@ def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument mocked_upload_to_s3.return_value = (s3_keys, s3_key_pattern) mocked_bookmark.return_value = bookmark mocked_export_data = mocked_fastsyncmysql.return_value.export_source_table_data + mocked_fastsyncmysql.return_value.map_column_types_to_target.return_value = maped_column_types_to_target mocked_export_data.side_effect = export_data_to_file table_name = 'foo_table' @@ -153,18 +154,19 @@ def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument for message in log_messages: self.assertIn(message, actual_logs.output[log_index]) - where_clause_setting = { - 'column': arguments['column'], - 'start_value': arguments['start_value'], - 'end_value': arguments['end_value'] - } + expected_where_clause = f" WHERE {column} >= '{start_value}'" + if end_value: + expected_where_clause += f" AND {column} <= '10'" mocked_export_data.assert_called_with( - args_namespace, args_namespace.target.get('tap_id'), where_clause_setting + args_namespace, args_namespace.target.get('tap_id'), expected_where_clause ) mocked_upload_to_s3.assert_called_with(mocked_fastsync_sf(), file_parts, arguments['temp_dir']) - mocked_load_into_sf.assert_called_with( - mocked_fastsync_sf(), args_namespace, s3_keys, s3_key_pattern, file_size + mocked_fastsync_sf.return_value.merge_tables.assert_called_with( + 'foo_schema', f'{table_name}_temp', table_name, + ['foo', 'bar', '_SDC_EXTRACTED_AT', '_SDC_BATCHED_AT', '_SDC_DELETED_AT'], + maped_column_types_to_target['primary_key'] ) + mocked_fastsync_sf.return_value.drop_table.assert_called_with('foo_schema', f'{table_name}_temp') if end_value: mocked_save_state.assert_not_called() else: diff --git a/tests/units/partialsync/test_partial_sync_utils.py b/tests/units/partialsync/test_partial_sync_utils.py index 1a222b3f8..db931af84 100644 --- a/tests/units/partialsync/test_partial_sync_utils.py +++ b/tests/units/partialsync/test_partial_sync_utils.py @@ -2,7 +2,11 @@ from tempfile import TemporaryDirectory from pipelinewise.fastsync.partialsync.utils import load_into_snowflake, upload_to_s3, update_state_file +from pipelinewise.fastsync.partialsync.utils import diff_source_target_columns + + from tests.units.partialsync.utils import PartialSync2SFArgs +from 
tests.units.partialsync.resources.test_partial_sync_utils.sample_sf_columns import SAMPLE_OUTPUT_FROM_SF class PartialSyncUtilsTestCase(TestCase): @@ -25,41 +29,74 @@ def test_upload_to_s3(self): self.assertTupleEqual(([test_s3_key], test_s3_key), actual_return) mocked_upload_to_s3.assert_called_with(test_file_part, tmp_dir=temp_test_dir) - # pylint: disable=protected-access - def test_load_into_snowflake(self): + # pylint: disable=no-self-use + def test_load_into_snowflake_hard_delete(self): """Test load_into_snowflake method""" - test_table = 'weight_unit' - - with TemporaryDirectory() as temp_test_dir: - test_end_value_cases = (None, '30') - - for test_end_value in test_end_value_cases: - with self.subTest(endvalue= test_end_value): - args = PartialSync2SFArgs( - temp_test_dir=temp_test_dir, table=test_table, start_value='20', end_value=test_end_value - ) - test_target_schema = args.target['schema_mapping'][args.tap['dbname']]['target_schema'] - test_s3_key_pattern = ['s3_key_pattern_foo'] - test_size_byte = 4000 - test_s3_keys = ['s3_key_foo'] - test_tap_id = args.target['tap_id'] - test_bucket = args.target['s3_bucket'] - where_clause_for_end = f" AND {args.column} <= '{args.end_value}'" if args.end_value else '' + snowflake = mock.MagicMock() + target = { + 'sf_object': snowflake, + 'schema': 'FOO_SCHEMA', + 'table': 'FOO_TABLE', + 'temp': 'FOO_TEMP' + } + args = PartialSync2SFArgs( + temp_test_dir='temp_test_dir', start_value='20', end_value='30' + ) + columns_diff = { + 'added_columns': ['FOO_ADDED_COLUMN'], + 'source_columns': {'FOO_SOURCE_COLUMN': 'FOO_TYPE'} + } + primary_keys = ['FOO_PRIMARY'] + s3_key_pattern = 'FOO_PATTERN' + size_bytes = 3 + where_clause_sql = 'test' + load_into_snowflake(target, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, + where_clause_sql) + + snowflake.assert_has_calls([ + mock.call.copy_to_table(s3_key_pattern, target['schema'], args.table, size_bytes, is_temporary=True), + mock.call.obfuscate_columns(target['schema'], args.table), + mock.call.add_columns(target['schema'], target['table'], columns_diff['added_columns']), + mock.call.merge_tables( + target['schema'], target['temp'], target['table'], + ['FOO_SOURCE_COLUMN', '_SDC_EXTRACTED_AT', '_SDC_BATCHED_AT', '_SDC_DELETED_AT'], primary_keys), + mock.call.partial_hard_delete(target['schema'], target['table'], where_clause_sql), + mock.call.drop_table(target['schema'], target['temp']) + ]) - mocked_snowflake = mock.MagicMock() - - load_into_snowflake(mocked_snowflake, args, test_s3_keys, test_s3_key_pattern, test_size_byte) - - mocked_snowflake.query.assert_called_with( - f'DELETE FROM {test_target_schema}."{test_table.upper()}"' - f' WHERE {args.column} >= \'{args.start_value}\'{where_clause_for_end}') - - mocked_snowflake.copy_to_table.assert_called_with( - test_s3_key_pattern, test_target_schema, args.table, test_size_byte, is_temporary=False - ) - - mocked_snowflake.copy_to_archive.assert_called_with(test_s3_keys[0], test_tap_id, args.table) - mocked_snowflake.s3.delete_object.assert_called_with(Bucket=test_bucket, Key=test_s3_keys[0]) + # pylint: disable=no-self-use + def test_load_into_snowflake_soft_delete(self): + """Test load_into_snowflake method""" + snowflake = mock.MagicMock() + target = { + 'sf_object': snowflake, + 'schema': 'FOO_SCHEMA', + 'table': 'FOO_TABLE', + 'temp': 'FOO_TEMP' + } + args = PartialSync2SFArgs( + temp_test_dir='temp_test_dir', start_value='20', end_value='30', hard_delete=False + ) + columns_diff = { + 'added_columns': ['FOO_ADDED_COLUMN'], + 
'source_columns': {'FOO_SOURCE_COLUMN': 'FOO_TYPE'} + } + primary_keys = ['FOO_PRIMARY'] + s3_key_pattern = 'FOO_PATTERN' + size_bytes = 3 + where_clause_sql = 'test' + load_into_snowflake(target, args, columns_diff, primary_keys, s3_key_pattern, size_bytes, + where_clause_sql) + + snowflake.assert_has_calls([ + mock.call.copy_to_table(s3_key_pattern, target['schema'], args.table, size_bytes, is_temporary=True), + mock.call.obfuscate_columns(target['schema'], args.table), + mock.call.add_columns(target['schema'], target['table'], columns_diff['added_columns']), + mock.call.merge_tables(target['schema'], target['temp'], target['table'], + ['FOO_SOURCE_COLUMN', '_SDC_EXTRACTED_AT', '_SDC_BATCHED_AT', '_SDC_DELETED_AT'], + primary_keys), + mock.call.drop_table(target['schema'], target['temp']) + ]) # pylint: disable=no-self-use def test_update_state_file(self): @@ -78,3 +115,39 @@ def test_update_state_file(self): mocked_save_state_file.assert_not_called() else: mocked_save_state_file.assert_called_with(args.state, args.table, bookmark) + + def test_find_diff_columns(self): + """Test find_diff_columns method works as expected""" + sample_source_columns = [ + '"FOO_COLUMN_0" NUMBER', '"FOO_COLUMN_1" NUMBER', '"FOO_COLUMN_3" VARCHAR', '"FOO_COLUMN_5" VARCHAR' + ] + schema = 'FOO_SCHEMA' + table = 'BAR_TABLE' + mocked_snowflake = mock.MagicMock() + mocked_snowflake.query.return_value = SAMPLE_OUTPUT_FROM_SF + sample_target_sf = { + 'sf_object': mocked_snowflake, + 'schema': schema, + 'table': table + } + + expected_output = { + 'added_columns': {'"FOO_COLUMN_0"': 'NUMBER', + '"FOO_COLUMN_5"': 'VARCHAR'}, + 'removed_columns': { + '"FOO_COLUMN_2"': 'TEXT', + '"FOO_COLUMN_4"': 'NUMBER', + '"_SDC_FOO_BAR"': 'TIMESTAMP_NTZ' + }, + 'source_columns': { + '"FOO_COLUMN_0"': 'NUMBER', + '"FOO_COLUMN_1"': 'NUMBER', + '"FOO_COLUMN_3"': 'VARCHAR', + '"FOO_COLUMN_5"': 'VARCHAR' + }, + 'target_columns': ['FOO_COLUMN_1', 'FOO_COLUMN_2', + 'FOO_COLUMN_3', 'FOO_COLUMN_4', + '_SDC_EXTRACTED_AT', '_SDC_BATCHED_AT', '_SDC_DELETED_AT', '_SDC_FOO_BAR'], + } + actual_output = diff_source_target_columns(target_sf=sample_target_sf, source_columns=sample_source_columns) + self.assertDictEqual(actual_output, expected_output) diff --git a/tests/units/partialsync/test_postgres_to_snowflake.py b/tests/units/partialsync/test_postgres_to_snowflake.py index 169adcbd5..7f40d077f 100644 --- a/tests/units/partialsync/test_postgres_to_snowflake.py +++ b/tests/units/partialsync/test_postgres_to_snowflake.py @@ -50,7 +50,8 @@ def mocked_copy_table_method(table, filepath, **kwargs): test_fast_sync = FastSyncTapPostgres({}, {}) - actual_file_parts = test_fast_sync.export_source_table_data(args, tap_id) + where_clause = 'WHERE FOO' + actual_file_parts = test_fast_sync.export_source_table_data(args, tap_id, where_clause) call_args = mocked_copy_table.call_args[0] call_kwargs = mocked_copy_table.call_args[1] @@ -59,7 +60,7 @@ def mocked_copy_table_method(table, filepath, **kwargs): 'split_large_files': False, 'split_file_chunk_size_mb': args.target['split_file_chunk_size_mb'], 'split_file_max_chunks': args.target['split_file_max_chunks'], - 'where_clause_setting': {'column': 'FOO_COLUMN', 'start_value': 'FOO_START', 'end_value': 'FOO_END'} + 'where_clause_sql': where_clause } self.assertEqual(2, len(call_args)) @@ -75,7 +76,6 @@ def mocked_copy_table_method(table, filepath, **kwargs): # pylint: disable=too-many-locals, too-many-arguments @mock.patch('pipelinewise.fastsync.commons.utils.save_state_file') - 
@mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.load_into_snowflake') @mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.upload_to_s3') @mock.patch('pipelinewise.fastsync.commons.utils.get_bookmark_for_table') @mock.patch('pipelinewise.fastsync.partialsync.postgres_to_snowflake.FastSyncTapPostgres') @@ -85,7 +85,6 @@ def test_running_partial_sync_postgres_to_snowflake(self, mocked_fastsyncpostgres, mocked_bookmark, mocked_upload_to_s3, - mocked_load_into_sf, mocked_save_state): """Test the whole partial_sync_postgres_to_snowflake module works as expected""" with TemporaryDirectory() as temp_directory: @@ -94,6 +93,10 @@ def test_running_partial_sync_postgres_to_snowflake(self, s3_keys = ['FOO_S3_KEYS',] s3_key_pattern = 'BAR_S3_KEY_PATTERN' bookmark = 'foo_bookmark' + maped_column_types_to_target = { + 'columns': ['foo type1', 'bar type2'], + 'primary_key': 'foo_primary' + } def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument with open(f'{temp_directory}/t1', 'w', encoding='utf8') as exported_file: @@ -104,6 +107,7 @@ def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument mocked_upload_to_s3.return_value = (s3_keys, s3_key_pattern) mocked_bookmark.return_value = bookmark mocked_export_data = mocked_fastsyncpostgres.return_value.export_source_table_data + mocked_fastsyncpostgres.return_value.map_column_types_to_target.return_value = maped_column_types_to_target mocked_export_data.side_effect = export_data_to_file table_name = 'foo_table' @@ -149,13 +153,25 @@ def export_data_to_file(*args, **kwargs): # pylint: disable=unused-argument for message in log_messages: self.assertIn(message, actual_logs.output[log_index]) + expected_where_clause = f" WHERE {column} >= '{start_value}'" + if end_value: + expected_where_clause += f" AND {column} <= '10'" + + mocked_export_data.assert_called_with( - args_namespace, args_namespace.target.get('tap_id') + args_namespace, args_namespace.target.get('tap_id'), expected_where_clause ) mocked_upload_to_s3.assert_called_with(mocked_fastsync_sf(), file_parts, arguments['temp_dir']) - mocked_load_into_sf.assert_called_with( - mocked_fastsync_sf(), args_namespace, s3_keys, s3_key_pattern, file_size - ) + + mocked_fastsync_sf.return_value.merge_tables.assert_called_with('foo_schema', f'{table_name}_temp', + table_name, + ['foo', 'bar', '_SDC_EXTRACTED_AT', + '_SDC_BATCHED_AT', + '_SDC_DELETED_AT'], + maped_column_types_to_target[ + 'primary_key']) + mocked_fastsync_sf.return_value.drop_table.assert_called_with('foo_schema', f'{table_name}_temp') + if end_value: mocked_save_state.assert_not_called() else: diff --git a/tests/units/partialsync/utils.py b/tests/units/partialsync/utils.py index e1ac119aa..762623ef1 100644 --- a/tests/units/partialsync/utils.py +++ b/tests/units/partialsync/utils.py @@ -11,13 +11,16 @@ # pylint: disable=too-many-instance-attributes, too-few-public-methods class PartialSync2SFArgs: """Arguments for using in mysql to snowflake tests""" - def __init__(self, temp_test_dir, table='email', start_value='FOO_START', end_value='FOO_END', state='state.json'): + def __init__(self, temp_test_dir, table='email', + start_value='FOO_START', end_value='FOO_END', state='state.json', hard_delete=None): resources_dir = f'{os.path.dirname(__file__)}/resources' config_dir = f'{resources_dir}/test_partial_sync' tap_config = self._load_json_config(f'{config_dir}/target_snowflake/tap_mysql/config.json') target_config = 
self._load_json_config(f'{config_dir}/tmp/target_config_tmp.json') transform_config = self._load_json_config(f'{config_dir}/target_snowflake/tap_mysql/transformation.json') properties_config = self._load_json_config(f'{config_dir}/target_snowflake/tap_mysql/properties.json') + if hard_delete is not None: + target_config['hard_delete'] = hard_delete self.table = f'{tap_config["dbname"]}.{table}' self.column = 'FOO_COLUMN'
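
Note on the refactored merge path: the exact MERGE statement shape that the partial sync is now expected to emit is encoded in the new test_merge_tables unit test above. The standalone sketch below mirrors those assertions (upper-cased, double-quoted table names joined on the primary keys, UPDATE SET on match, INSERT otherwise). It is illustrative only, with placeholder names, and is not the production FastSyncTargetSnowflake.merge_tables implementation.

def build_expected_merge_query(schema, source_table, target_table, columns, primary_keys):
    """Build the MERGE statement shape asserted by test_merge_tables (illustrative sketch)."""
    src = source_table.upper()
    tgt = target_table.upper()
    # Join the temp (source) and permanent (target) tables on every primary key column
    on_clause = ' AND '.join(
        f'"{src}".{p.upper()} = "{tgt}".{p.upper()}' for p in primary_keys
    )
    # Overwrite all listed columns when a matching row already exists
    update_clause = ', '.join(
        f'"{tgt}".{c.upper()} = "{src}".{c.upper()}' for c in columns
    )
    insert_columns = ', '.join(c.upper() for c in columns)
    insert_values = ', '.join(f'"{src}".{c.upper()}' for c in columns)
    return (
        f'MERGE INTO {schema}."{tgt}" USING {schema}."{src}"'
        f' ON {on_clause}'
        f' WHEN MATCHED THEN UPDATE SET {update_clause}'
        f' WHEN NOT MATCHED THEN INSERT ({insert_columns})'
        f' VALUES ({insert_values})'
    )

# Example (hypothetical names):
# build_expected_merge_query('ppw_schema', 'foo_table_temp', 'foo_table',
#                            ['id', 'name', '_SDC_DELETED_AT'], ['id'])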