Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enhance bulk insert speed using copy command #370

Open
wants to merge 23 commits into
base: main
Choose a base branch
from

Conversation

kinghuang
Copy link
Contributor

@kinghuang kinghuang commented Jun 14, 2024

I've been doing very large loads into PostgreSQL, and found target-postgres to be slow at bulk inserts. In PostgreSQL, the COPY command is preferable for high performance loading of data.

This PR changes the implementation of bulk_insert_records() by using COPY with CSV data instead of an INSERT statement with values. When combined with larger batches (batch_size_rows), I am able to achieve 2 to 10× bulk insert speeds compared to the existing implementation.

Here's an example difference for moving 1 million rows via tap-snowflake target-postgres.

new  48.46s user 4.19s system 19% cpu  4:36.91 total
old 189.69s user 6.76s system 28% cpu 11:33.89 total

Useful Links:

@kinghuang kinghuang marked this pull request as ready for review June 14, 2024 22:36
@edgarrmondragon
Copy link
Member

@visch did we document somewhere the steps needed to regenerate the SSL cert files so CI can pass?

@visch
Copy link
Member

visch commented Jun 18, 2024

@visch did we document somewhere the steps needed to regenerate the SSL cert files so CI can pass?

Member

We didn't :/ we should have at least linked to https://www.postgresql.org/docs/current/ssl-tcp.html#SSL-CERTIFICATE-CREATION which has the steps. Really our steps should setup them to be 250 years or something.

@sebastianswms
Copy link
Contributor

I've benchmarked the time savings on my local machine. I'm loading 1,000,000 rows directly from a .singer file and using 100,000 for batch_size_rows.

Average running time of 3 invocations using COPY (kinghuang:bulk-insert-copy): 1m46.281s

Timing Results
real    1m45.533s
user    1m34.119s
sys     0m2.153s
real    1m45.344s
user    1m34.108s
sys     0m2.395s
real    1m47.967s
user    1m37.298s
sys     0m2.201s

Average of three invocations using INSERT (MeltanoLabs:main): 2m7.645s

Timing Results
real    2m4.723s
user    1m48.083s
sys     0m1.913s
real    2m7.008s
user    1m51.234s
sys     0m1.511s
real    2m11.203s
user    1m52.054s
sys     0m1.993s

A noticeable improvement, which I imagine would be even more pronounced with a more complicated data set (my benchmark used only a few columns).

That said, I do notice that the current COPY setup fails to handle certain types of data. For example, the /target_postgres/tests/data_files/encoded_strings.singer file demonstrates this.

Log Output
(.venv) root@ssmileyAutoIDM:/git/target-postgres# cat target_postgres/tests/data_files/encoded_strings.singer | meltano invoke target-postgres
2024-06-24T20:34:37.500430Z [info     ] Environment 'dev' is active   
2024-06-24 16:34:37,936 | INFO     | target-postgres      | Target 'target-postgres' is listening for input from tap.
2024-06-24 16:34:37,936 | INFO     | target-postgres      | Initializing 'target-postgres' target sink...
2024-06-24 16:34:37,936 | INFO     | target-postgres.test_strings | Initializing target sink for stream 'test_strings'...
2024-06-24 16:34:37,975 | INFO     | target-postgres      | Initializing 'target-postgres' target sink...
2024-06-24 16:34:37,975 | INFO     | target-postgres.test_strings_in_objects | Initializing target sink for stream 'test_strings_in_objects'...
2024-06-24 16:34:37,988 | INFO     | target-postgres      | Initializing 'target-postgres' target sink...
2024-06-24 16:34:37,989 | INFO     | target-postgres.test_strings_in_arrays | Initializing target sink for stream 'test_strings_in_arrays'...
2024-06-24 16:34:38,000 | INFO     | target-postgres      | Target 'target-postgres' completed reading 32 lines of input (3 schemas, 28 records, 0 batch manifests, 1 state messages).
2024-06-24 16:34:38,059 | INFO     | target-postgres.test_strings_in_objects | Inserting with SQL: copy "f233d2ab_161d_422d_8211_8d99660066d1" ("id", "info", "_sdc_extracted_at", "_sdc_received_at", "_sdc_batched_at", "_sdc_deleted_at", "_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at") from stdin with csv
2024-06-24 16:34:38,061 | INFO     | target-postgres.test_strings_in_arrays | Inserting with SQL: copy "69e0cb26_e5f0_4559_ae3b_797057adfafe" ("id", "strings", "_sdc_extracted_at", "_sdc_received_at", "_sdc_batched_at", "_sdc_deleted_at", "_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at") from stdin with csv
2024-06-24 16:34:38,061 | INFO     | target-postgres.test_strings | Inserting with SQL: copy "9e3a2e80_1aad_4ab3_b528_c444869c1f38" ("id", "info", "_sdc_extracted_at", "_sdc_received_at", "_sdc_batched_at", "_sdc_deleted_at", "_sdc_sequence", "_sdc_table_version", "_sdc_sync_started_at") from stdin with csv
joblib.externals.loky.process_executor._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/_utils.py", line 72, in __call__
    return self.func(**kwargs)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/parallel.py", line 598, in __call__
    return [func(*args, **kwargs)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/parallel.py", line 598, in <listcomp>
    return [func(*args, **kwargs)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/singer_sdk/target_base.py", line 522, in _drain_sink
    self.drain_one(sink)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/singer_sdk/target_base.py", line 512, in drain_one
    sink.process_batch(draining_status)
  File "/git/target-postgres/target_postgres/sinks.py", line 102, in process_batch
    self.bulk_insert_records(
  File "/git/target-postgres/target_postgres/sinks.py", line 221, in bulk_insert_records
    cur.copy_expert(sql=copy_statement, file=buffer)
psycopg2.errors.InvalidTextRepresentation: malformed array literal: "{"\"cha\u00eene simple\"","\"quoted \\"string\\"\""}"
DETAIL:  Unexpected array element.
CONTEXT:  COPY 69e0cb26_e5f0_4559_ae3b_797057adfafe, line 2, column strings: "{"\"cha\u00eene simple\"","\"quoted \\"string\\"\""}"

"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/bin/target-postgres", line 8, in <module>
    sys.exit(TargetPostgres.cli())
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/click/core.py", line 1157, in __call__
    return self.main(*args, **kwargs)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/click/core.py", line 1078, in main
    rv = self.invoke(ctx)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/singer_sdk/plugin_base.py", line 80, in invoke
    return super().invoke(ctx)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/click/core.py", line 1434, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/click/core.py", line 783, in invoke
    return __callback(*args, **kwargs)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/singer_sdk/target_base.py", line 567, in invoke
    target.listen(file_input)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/singer_sdk/io_base.py", line 36, in listen
    self._process_endofpipe()
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/singer_sdk/target_base.py", line 326, in _process_endofpipe
    self.drain_all(is_endofpipe=True)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/singer_sdk/target_base.py", line 492, in drain_all
    self._drain_all(list(self._sinks_active.values()), self.max_parallelism)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/singer_sdk/target_base.py", line 525, in _drain_all
    Parallel()(delayed(_drain_sink)(sink=sink) for sink in sink_list)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/parallel.py", line 2007, in __call__
    return output if self.return_generator else list(output)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/parallel.py", line 1650, in _get_outputs
    yield from self._retrieve()
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/parallel.py", line 1754, in _retrieve
    self._raise_error_fast()
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/parallel.py", line 1789, in _raise_error_fast
    error_job.get_result(self.timeout)
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/parallel.py", line 745, in get_result
    return self._return_or_raise()
  File "/git/target-postgres/.meltano/loaders/target-postgres/venv/lib/python3.9/site-packages/joblib/parallel.py", line 763, in _return_or_raise
    raise self._result
psycopg2.errors.InvalidTextRepresentation: malformed array literal: "{"\"cha\u00eene simple\"","\"quoted \\"string\\"\""}"
DETAIL:  Unexpected array element.
CONTEXT:  COPY 69e0cb26_e5f0_4559_ae3b_797057adfafe, line 2, column strings: "{"\"cha\u00eene simple\"","\"quoted \\"string\\"\""}"

'"{'
+ ",".join('""' + v.replace('"', r'\""') + '""' for v in value)
+ '}"'
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think once the tests are passing, we're good to go here. This method of escaping arrays stands out as awkward, but I think it's necessary. I tried putting together an implementation that avoids it using a custom dialect of csv writer. But I think that anything I come up with there is going to be even more janky.

Thoughts @visch ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, surprising but if this is better that makes sense to me!

CREATE TABLE public.testing_table (
    id SERIAL PRIMARY KEY,
    jsonb_data JSONB[]
);

INSERT INTO public.testing_table --succeeds, but breaks backward compatibility
(jsonb_data)
VALUES('{"[\"string1\", \"string2\", 1, 2, 3]"}'::jsonb[]);

INSERT INTO public.testing_table --succeeds
(jsonb_data)
VALUES('{\"string1\", \"string2\", 1, 2, 3}'::jsonb[]);

INSERT INTO public.testing_table --fails
(jsonb_data)
VALUES('[\"string1\", \"string2\", 1, 2, 3]'::jsonb[]);

--ERROR: malformed array literal: "[\"string1\", \"string2\", 1, 2, 3]"
--  Detail: "[" must introduce explicitly-specified array dimensions.

INSERT INTO public.testing_table --fails
(jsonb_data)
VALUES('"[\"string1\", \"string2\", 1, 2, 3]"'::jsonb[]);

--ERROR: malformed array literal: ""[\"string1\", \"string2\", 1, 2, 3]""
--  Detail: Array value must start with "{" or dimension information.

@kinghuang

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll check those out. The csv.QUOTE_NOTNULL and csv.QUOTE_STRINGS would both be more convenient, but they are new in Python 3.12.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kinghuang are you interested in getting this PR to the finish line? Is there anything I can do to help?

I'll check those out. The csv.QUOTE_NOTNULL and csv.QUOTE_STRINGS would both be more convenient, but they are new in Python 3.12.

Would it be worth using those to make the tests pass on Python 3.12? If they significantly simplify the implementation, we can think about backporting the csv library from Python 3.12+ and publish the wheels to PyPI.

Copy link
Member

@visch visch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

refer to @sebastianswms 's comments

Create a variation of generate_insert_statement that returns a PostgreSQL copy statement, suitable for bulk loading of data formatted as csv.
Use copy instead of insert to bulk insert records. In PostgreSQL, copy is the fastest way to insert bulk data.
The override of generate_insert_statement is no longer used.
Use the type bind processors to generate values to ensure that values are represented correctly to PostgreSQL, especially for things like ARRAY and JSONB. Also, handle null values.
Always quote strings and handle array values.
@pnadolny13
Copy link

@kinghuang thanks for this PR! Is there anything you need to get it across the finish line? We're motivated to get this feature in 😄 . I see some comments/feedback but I'm not sure what is needed.

@visch and @sebastianswms said theyre able to assist with getting this merged if thats something of interest.

@SpaceCondor
Copy link
Contributor

@edgarrmondragon @pnadolny13 Yes we are definitely interested in getting this released as well!

@visch
Copy link
Member

visch commented Sep 18, 2024

@edgarrmondragon @pnadolny13 Yes we are definitely interested in getting this released as well!

FYI to get a massive speed increase turn off validate_records

@kinghuang
Copy link
Contributor Author

kinghuang commented Sep 19, 2024

@kinghuang thanks for this PR! Is there anything you need to get it across the finish line? We're motivated to get this feature in 😄 . I see some comments/feedback but I'm not sure what is needed.

@visch and @sebastianswms said theyre able to assist with getting this merged if thats something of interest.

Sorry, I haven't had any time to finish this off. It's been quite busy at work! There's some failing tests that indicate an escaping/encoding error somewhere.

One thing that I've thought about trying is changing the delimiters used. The blog post CSVs Are Kinda Bad. DSVs Are Kinda Good makes me think that it might be simpler and better to use the record and unit separator characters instead of trying to deal with escaping quotes and commas. The escape and quote characters are configurable on the COPY command.

@SpaceCondor
Copy link
Contributor

@kinghuang Thoughts on using the copy functionality in psycopg3?

https://www.psycopg.org/articles/2020/11/15/psycopg3-copy/

It should avoid many of the issues mentioned above.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants