Skip to content

Commit

Permalink
[AP-1285] tap_s3_csv duplication bug fix (#1017)
Browse files (browse the repository at this point in the history)
* fixed s3_csv duplication bug

* fixed formatting

* fixed grouping imports

* fixed lint

* using chunks

* fixed lint

* fixed typing

* kill FastSync for s3-csv

* cleanup

* fixed pep8

* fixed log

* fixed e2e test
  • Loading branch information
amofakhar authored Oct 6, 2022
1 parent 365fe17 commit a0face9
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 19 deletions.
28 changes: 19 additions & 9 deletions pipelinewise/cli/pipelinewise.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,6 @@
ConnectorType.TARGET_POSTGRES,
ConnectorType.TARGET_BIGQUERY,
},
ConnectorType.TAP_S3_CSV: {
ConnectorType.TARGET_SNOWFLAKE,
ConnectorType.TARGET_REDSHIFT,
ConnectorType.TARGET_POSTGRES,
ConnectorType.TARGET_BIGQUERY,
},
ConnectorType.TAP_MONGODB: {
ConnectorType.TARGET_SNOWFLAKE,
ConnectorType.TARGET_POSTGRES,
Expand Down Expand Up @@ -1470,9 +1464,25 @@ def sync_tables(self):
target_id=target_id,
)

self.run_tap_fastsync(
tap=tap_params, target=target_params, transform=transform_params
)
if ConnectorType(target_type) in FASTSYNC_PAIRS.get(ConnectorType(tap_type), set()):
self.run_tap_fastsync(
tap=tap_params, target=target_params, transform=transform_params
)

else:
self.tap_run_log_file = os.path.join(
log_dir, f'{target_id}-{tap_id}-{current_time}.singer.log'
)
stream_buffer_size = self.tap.get(
'stream_buffer_size', commands.DEFAULT_STREAM_BUFFER_SIZE
)

self.run_tap_singer(
tap=tap_params,
target=target_params,
transform=transform_params,
stream_buffer_size=stream_buffer_size,
)

except pidfile.AlreadyRunningError:
self.logger.error('Another instance of the tap is already running.')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,14 @@ def test_replicate_s3_to_sf(self):
Replicate CSV files from S3 to Snowflake, then check that the return code is zero and that a success log file was created
"""

# 1. Run tap first time - both fastsync and a singer should be triggered
# 1. Run tap first time - singer should be triggered
assertions.assert_run_tap_success(
self.tap_id, self.target_id, ['fastsync', 'singer']
self.tap_id, self.target_id, ['singer']
)
self.assert_columns_exist()

# 2. Run tap second time - both fastsync and a singer should be triggered
# 2. Run tap second time - singer should be triggered
assertions.assert_run_tap_success(
self.tap_id, self.target_id, ['fastsync', 'singer']
self.tap_id, self.target_id, ['singer']
)
self.assert_columns_exist()
8 changes: 4 additions & 4 deletions tests/end_to_end/test_target_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,15 @@ def assert_columns_exist():
],
)

# 1. Run tap first time - both fastsync and a singer should be triggered
# 1. Run tap first time - singer should be triggered
assertions.assert_run_tap_success(
TAP_S3_CSV_ID, TARGET_ID, ['fastsync', 'singer']
TAP_S3_CSV_ID, TARGET_ID, ['singer']
)
assert_columns_exist()

# 2. Run tap second time - both fastsync and a singer should be triggered
# 2. Run tap second time - singer should be triggered
assertions.assert_run_tap_success(
TAP_S3_CSV_ID, TARGET_ID, ['fastsync', 'singer']
TAP_S3_CSV_ID, TARGET_ID, ['singer']
)
assert_columns_exist()

Expand Down
3 changes: 1 addition & 2 deletions tests/units/fastsync/commons/test_fastsync_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import argparse
import os
from unittest.mock import patch

import pytest

from unittest import TestCase, mock
from unittest.mock import patch

from pipelinewise.fastsync.commons import utils
from pipelinewise.fastsync.commons.utils import NotSelectedTableException
Expand Down

0 comments on commit a0face9

Please sign in to comment.