From 90071b90f4d84f660830bceed9654c1c886d1377 Mon Sep 17 00:00:00 2001
From: Tom Owers
Date: Tue, 13 Aug 2024 15:28:32 +0100
Subject: [PATCH] feat(data-warehouse): Use compact and vacuum on delta tables
 + s3 wrapper (#24335)

---
 latest_migrations.manifest                    |  2 +-
 posthog/hogql/database/s3_table.py            | 25 +++++++++
 posthog/hogql/database/test/test_s3_table.py  | 18 ++++++
 .../0454_alter_datawarehousetable_format.py   | 27 +++++++++
 .../data_imports/pipelines/pipeline.py        | 32 ++++++++++-
 .../pipelines/test/test_pipeline.py           |  2 +
 .../tests/data_imports/test_end_to_end.py     | 11 ++++
 .../external_data/test_external_data_job.py   | 56 +++++++++++++++++--
 .../warehouse/data_load/validate_schema.py    |  6 +-
 posthog/warehouse/models/table.py             |  1 +
 requirements.in                               |  6 +-
 requirements.txt                              | 30 +++++-----
 12 files changed, 187 insertions(+), 29 deletions(-)
 create mode 100644 posthog/migrations/0454_alter_datawarehousetable_format.py

diff --git a/latest_migrations.manifest b/latest_migrations.manifest
index 06630e5dead23..657eada2d02e6 100644
--- a/latest_migrations.manifest
+++ b/latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
 ee: 0016_rolemembership_organization_member
 otp_static: 0002_throttling
 otp_totp: 0002_auto_20190420_0723
-posthog: 0453_alter_errortrackinggroup_fingerprint_and_more
+posthog: 0454_alter_datawarehousetable_format
 sessions: 0001_initial
 social_django: 0010_uid_db_index
 two_factor: 0007_auto_20201201_1019
diff --git a/posthog/hogql/database/s3_table.py b/posthog/hogql/database/s3_table.py
index deecd715e4235..b29826e10b5d8 100644
--- a/posthog/hogql/database/s3_table.py
+++ b/posthog/hogql/database/s3_table.py
@@ -34,6 +34,31 @@ def return_expr(expr: str) -> str:
 
         return f"{substitute_params(expr, raw_params)})"
 
+    # DeltaS3Wrapper format
+    if format == "DeltaS3Wrapper":
+        if url.endswith("/"):
+            escaped_url = add_param(f"{url}*.zstd.parquet")
+        else:
+            escaped_url = add_param(f"{url}/*.zstd.parquet")
+
+        if structure:
+            escaped_structure = add_param(structure, False)
+
+        expr = f"s3({escaped_url}"
+
+        if access_key and access_secret:
+            escaped_access_key = add_param(access_key)
+            escaped_access_secret = add_param(access_secret)
+
+            expr += f", {escaped_access_key}, {escaped_access_secret}"
+
+        expr += ", 'Parquet'"
+
+        if structure:
+            expr += f", {escaped_structure}"
+
+        return return_expr(expr)
+
     # Delta format
     if format == "Delta":
         escaped_url = add_param(url)
diff --git a/posthog/hogql/database/test/test_s3_table.py b/posthog/hogql/database/test/test_s3_table.py
index d1c80c4505bb9..7db4a922c6677 100644
--- a/posthog/hogql/database/test/test_s3_table.py
+++ b/posthog/hogql/database/test/test_s3_table.py
@@ -219,6 +219,24 @@ def test_s3_build_function_call_without_context_and_delta_format(self):
         res = build_function_call("http://url.com", DataWarehouseTable.TableFormat.Delta, "key", "secret", None, None)
         assert res == "deltaLake('http://url.com', 'key', 'secret')"
 
+    def test_s3_build_function_call_without_context_and_deltaS3Wrapper_format(self):
+        res = build_function_call(
+            "http://url.com", DataWarehouseTable.TableFormat.DeltaS3Wrapper, "key", "secret", None, None
+        )
+        assert res == "s3('http://url.com/*.zstd.parquet', 'key', 'secret', 'Parquet')"
+
+    def test_s3_build_function_call_without_context_and_deltaS3Wrapper_format_with_slash(self):
+        res = build_function_call(
+            "http://url.com/", DataWarehouseTable.TableFormat.DeltaS3Wrapper, "key", "secret", None, None
+        )
+        assert res == "s3('http://url.com/*.zstd.parquet', 'key', 'secret', 'Parquet')"
+
+    def test_s3_build_function_call_without_context_and_deltaS3Wrapper_format_with_structure(self):
+        res = build_function_call(
+            "http://url.com/", DataWarehouseTable.TableFormat.DeltaS3Wrapper, "key", "secret", "some structure", None
+        )
+        assert res == "s3('http://url.com/*.zstd.parquet', 'key', 'secret', 'Parquet', 'some structure')"
+
     def test_s3_build_function_call_without_context_and_delta_format_and_with_structure(self):
         res = build_function_call(
             "http://url.com", DataWarehouseTable.TableFormat.Delta, "key", "secret", "some structure", None
diff --git a/posthog/migrations/0454_alter_datawarehousetable_format.py b/posthog/migrations/0454_alter_datawarehousetable_format.py
new file mode 100644
index 0000000000000..17f9cd6e071c0
--- /dev/null
+++ b/posthog/migrations/0454_alter_datawarehousetable_format.py
@@ -0,0 +1,27 @@
+# Generated by Django 4.2.14 on 2024-08-13 10:54
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("posthog", "0453_alter_errortrackinggroup_fingerprint_and_more"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="datawarehousetable",
+            name="format",
+            field=models.CharField(
+                choices=[
+                    ("CSV", "CSV"),
+                    ("CSVWithNames", "CSVWithNames"),
+                    ("Parquet", "Parquet"),
+                    ("JSONEachRow", "JSON"),
+                    ("Delta", "Delta"),
+                    ("DeltaS3Wrapper", "DeltaS3Wrapper"),
+                ],
+                max_length=128,
+            ),
+        ),
+    ]
diff --git a/posthog/temporal/data_imports/pipelines/pipeline.py b/posthog/temporal/data_imports/pipelines/pipeline.py
index 4c374e1c9e3d7..e1489ddc780b7 100644
--- a/posthog/temporal/data_imports/pipelines/pipeline.py
+++ b/posthog/temporal/data_imports/pipelines/pipeline.py
@@ -10,6 +10,8 @@ import asyncio
 from posthog.settings.base_variables import TEST
 from structlog.typing import FilteringBoundLogger
+from dlt.common.libs.deltalake import get_delta_tables
+from dlt.load.exceptions import LoadClientJobRetry
 from dlt.sources import DltSource
 from deltalake.exceptions import DeltaError
 from collections import Counter
@@ -116,7 +118,10 @@ def _run(self) -> dict[str, int]:
                     )
                 except PipelineStepFailed as e:
                     # Remove once DLT support writing empty Delta files
-                    if isinstance(e.exception, DeltaError):
+                    if isinstance(e.exception, LoadClientJobRetry):
+                        if "Generic S3 error" not in e.exception.retry_message:
+                            raise
+                    elif isinstance(e.exception, DeltaError):
                         if e.exception.args[0] != "Generic error: No data source supplied to write command.":
                             raise
                     else:
@@ -132,12 +137,22 @@ def _run(self) -> dict[str, int]:
                 total_counts = counts + total_counts
 
                 if total_counts.total() > 0:
+                    delta_tables = get_delta_tables(pipeline)
+                    file_len = 0
+
+                    for table in delta_tables.values():
+                        # Compact doesn't work on single file tables, so we can't use the wrapper for these
+                        file_len = len(table.files())
+                        table.optimize.compact()
+                        table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)
+
                     async_to_sync(validate_schema_and_update_table)(
                         run_id=self.inputs.run_id,
                         team_id=self.inputs.team_id,
                         schema_id=self.inputs.schema_id,
                         table_schema=self.source.schema.tables,
                         row_count=total_counts.total(),
+                        use_delta_wrapper=file_len != 1,
                     )
 
                 pipeline_runs = pipeline_runs + 1
@@ -151,7 +166,10 @@ def _run(self) -> dict[str, int]:
                 )
             except PipelineStepFailed as e:
                 # Remove once DLT support writing empty Delta files
-                if isinstance(e.exception, DeltaError):
+                if isinstance(e.exception, LoadClientJobRetry):
+                    if "Generic S3 error" not in e.exception.retry_message:
+                        raise
+                elif isinstance(e.exception, DeltaError):
                     if e.exception.args[0] != "Generic error: No data source supplied to write command.":
                         raise
                 else:
@@ -167,12 +185,22 @@ def _run(self) -> dict[str, int]:
             total_counts = total_counts + counts
 
             if total_counts.total() > 0:
+                delta_tables = get_delta_tables(pipeline)
+                file_len = 0
+
+                for table in delta_tables.values():
+                    # Compact doesn't work on single file tables, so we can't use the wrapper for these
+                    file_len = len(table.files())
+                    table.optimize.compact()
+                    table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)
+
                 async_to_sync(validate_schema_and_update_table)(
                     run_id=self.inputs.run_id,
                     team_id=self.inputs.team_id,
                     schema_id=self.inputs.schema_id,
                     table_schema=self.source.schema.tables,
                     row_count=total_counts.total(),
+                    use_delta_wrapper=file_len != 1,
                 )
 
         return dict(total_counts)
diff --git a/posthog/temporal/data_imports/pipelines/test/test_pipeline.py b/posthog/temporal/data_imports/pipelines/test/test_pipeline.py
index de4f2df7e8694..542309727d4e6 100644
--- a/posthog/temporal/data_imports/pipelines/test/test_pipeline.py
+++ b/posthog/temporal/data_imports/pipelines/test/test_pipeline.py
@@ -77,6 +77,7 @@ def mock_create_pipeline(local_self: Any):
             patch(
                 "posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table"
             ) as mock_validate_schema_and_update_table,
+            patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"),
         ):
             pipeline = await self._create_pipeline("Customer", False)
             res = await pipeline.run()
@@ -97,6 +98,7 @@ def mock_create_pipeline(local_self: Any):
             patch(
                 "posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table"
             ) as mock_validate_schema_and_update_table,
+            patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"),
         ):
             pipeline = await self._create_pipeline("Customer", True)
             res = await pipeline.run()
diff --git a/posthog/temporal/tests/data_imports/test_end_to_end.py b/posthog/temporal/tests/data_imports/test_end_to_end.py
index 678f4d3e0590c..2dd43a8f0cef5 100644
--- a/posthog/temporal/tests/data_imports/test_end_to_end.py
+++ b/posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -99,6 +99,16 @@ def mock_to_session_credentials(class_self):
             "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
         }
 
+    def mock_to_object_store_rs_credentials(class_self):
+        return {
+            "aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
+            "aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
+            "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
+            "region": "us-east-1",
+            "AWS_ALLOW_HTTP": "true",
+            "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
+        }
+
     with (
         mock.patch.object(RESTClient, "paginate", mock_paginate),
         override_settings(
             BUCKET_URL=f"s3://{BUCKET_NAME}",
             AIRBYTE_BUCKET_KEY=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
             AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
             AIRBYTE_BUCKET_REGION="us-east-1",
             AIRBYTE_BUCKET_DOMAIN="objectstorage:19000",
         ),
         mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
+        mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
     ):
         async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
             async with Worker(
diff --git a/posthog/temporal/tests/external_data/test_external_data_job.py b/posthog/temporal/tests/external_data/test_external_data_job.py
index aa0a83d9941a6..aa16aad42cba4 100644
--- a/posthog/temporal/tests/external_data/test_external_data_job.py
+++ b/posthog/temporal/tests/external_data/test_external_data_job.py
@@ -476,6 +476,16 @@ def mock_to_session_credentials(class_self):
             "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
         }
 
+    def mock_to_object_store_rs_credentials(class_self):
+        return {
+            "aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
+            "aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
+            "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
+            "region": "us-east-1",
+            "AWS_ALLOW_HTTP": "true",
+            "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
+        }
+
     with (
         mock.patch.object(RESTClient, "paginate", mock_customers_paginate),
         override_settings(
@@ -490,6 +500,7 @@ def mock_to_session_credentials(class_self):
             return_value={"clickhouse": {"id": "string", "name": "string"}},
         ),
         mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
+        mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
     ):
         await asyncio.gather(
             activity_environment.run(import_data_activity, job_1_inputs),
         )
 
         job_1_customer_objects = await minio_client.list_objects_v2(
             Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
         )
 
-        assert len(job_1_customer_objects["Contents"]) == 2
+        assert len(job_1_customer_objects["Contents"]) == 3
 
     with (
         mock.patch.object(RESTClient, "paginate", mock_charges_paginate),
         override_settings(
@@ -516,6 +527,7 @@ def mock_to_session_credentials(class_self):
             return_value={"clickhouse": {"id": "string", "name": "string"}},
         ),
         mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
+        mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
     ):
         await asyncio.gather(
             activity_environment.run(import_data_activity, job_2_inputs),
         )
 
         job_2_charge_objects = await minio_client.list_objects_v2(
             Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path()}/charge/"
         )
-        assert len(job_2_charge_objects["Contents"]) == 2
+        assert len(job_2_charge_objects["Contents"]) == 3
 
 
 @pytest.mark.django_db(transaction=True)
@@ -586,12 +598,21 @@ def mock_to_session_credentials(class_self):
             "aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
             "aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
             "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
-            "region_name": settings.AIRBYTE_BUCKET_REGION,
             "aws_session_token": None,
             "AWS_ALLOW_HTTP": "true",
             "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
         }
 
+    def mock_to_object_store_rs_credentials(class_self):
+        return {
+            "aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
+            "aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
+            "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
+            "region": "us-east-1",
+            "AWS_ALLOW_HTTP": "true",
+            "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
+        }
+
     with (
         mock.patch.object(RESTClient, "paginate", mock_customers_paginate),
         override_settings(
@@ -606,6 +627,7 @@ def mock_to_session_credentials(class_self):
             return_value={"clickhouse": {"id": "string", "name": "string"}},
         ),
         mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
+        mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
     ):
         await asyncio.gather(
             activity_environment.run(import_data_activity, job_1_inputs),
         )
 
         job_1_customer_objects = await minio_client.list_objects_v2(
             Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
         )
 
         # if job was not canceled, this job would run indefinitely
-        assert len(job_1_customer_objects.get("Contents", [])) == 0
+        assert len(job_1_customer_objects.get("Contents", [])) == 1
 
         await sync_to_async(job_1.refresh_from_db)()
         assert job_1.rows_synced == 0
@@ -692,6 +714,16 @@ def mock_to_session_credentials(class_self):
             "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
         }
 
+    def mock_to_object_store_rs_credentials(class_self):
+        return {
+            "aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
+            "aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
+            "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
+            "region": "us-east-1",
+            "AWS_ALLOW_HTTP": "true",
+            "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
+        }
+
     with (
         mock.patch.object(RESTClient, "paginate", mock_customers_paginate),
         override_settings(
@@ -706,6 +738,7 @@ def mock_to_session_credentials(class_self):
             return_value={"clickhouse": {"id": "string", "name": "string"}},
         ),
         mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
+        mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
     ):
         await asyncio.gather(
             activity_environment.run(import_data_activity, job_1_inputs),
         )
 
         job_1_customer_objects = await minio_client.list_objects_v2(
             Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
         )
 
-        assert len(job_1_customer_objects["Contents"]) == 2
+        assert len(job_1_customer_objects["Contents"]) == 3
 
         await sync_to_async(job_1.refresh_from_db)()
         assert job_1.rows_synced == 1
@@ -851,6 +884,16 @@ def mock_to_session_credentials(class_self):
             "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
         }
 
+    def mock_to_object_store_rs_credentials(class_self):
+        return {
+            "aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
+            "aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
+            "endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
+            "region": "us-east-1",
+            "AWS_ALLOW_HTTP": "true",
+            "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
+        }
+
     with (
         override_settings(
             BUCKET_URL=f"s3://{BUCKET_NAME}",
             AIRBYTE_BUCKET_KEY=settings.OBJECT_STORAGE_ACCESS_KEY_ID,
             AIRBYTE_BUCKET_SECRET=settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
             BUCKET_NAME=BUCKET_NAME,
         ),
         mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
+        mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
     ):
         await asyncio.gather(
             activity_environment.run(import_data_activity, job_1_inputs),
         )
 
         job_1_team_objects = await minio_client.list_objects_v2(
             Bucket=BUCKET_NAME, Prefix=f"{folder_path}/posthog_test/"
         )
-        assert len(job_1_team_objects["Contents"]) == 2
+        assert len(job_1_team_objects["Contents"]) == 3
 
 
 @pytest.mark.django_db(transaction=True)
diff --git a/posthog/warehouse/data_load/validate_schema.py b/posthog/warehouse/data_load/validate_schema.py
index 664d51ee68b6f..1f77082815180 100644
--- a/posthog/warehouse/data_load/validate_schema.py
+++ b/posthog/warehouse/data_load/validate_schema.py
@@ -72,6 +72,7 @@ async def validate_schema_and_update_table(
     schema_id: uuid.UUID,
     table_schema: TSchemaTables,
     row_count: int,
+    use_delta_wrapper: bool,
 ) -> None:
     """
 
@@ -84,6 +85,7 @@ async def validate_schema_and_update_table(
         schema_id: The schema for which the data job relates to
         table_schema: The DLT schema from the data load stage
         table_row_counts: The count of synced rows from DLT
+        use_delta_wrapper: Whether we wanna use the S3 wrapper for Delta tables
     """
 
     logger = await bind_temporal_worker_logger(team_id=team_id)
@@ -117,7 +119,9 @@ async def validate_schema_and_update_table(
         table_params = {
             "credential": credential,
             "name": table_name,
-            "format": DataWarehouseTable.TableFormat.Delta,
+            "format": DataWarehouseTable.TableFormat.DeltaS3Wrapper
+            if use_delta_wrapper
+            else DataWarehouseTable.TableFormat.Delta,
             "url_pattern": new_url_pattern,
             "team_id": team_id,
             "row_count": row_count,
diff --git a/posthog/warehouse/models/table.py b/posthog/warehouse/models/table.py
index b6454ea379d80..fe5376507bc25 100644
--- a/posthog/warehouse/models/table.py
+++ b/posthog/warehouse/models/table.py
@@ -78,6 +78,7 @@ class TableFormat(models.TextChoices):
         Parquet = "Parquet", "Parquet"
         JSON = "JSONEachRow", "JSON"
         Delta = "Delta", "Delta"
+        DeltaS3Wrapper = "DeltaS3Wrapper", "DeltaS3Wrapper"
 
     name: models.CharField = models.CharField(max_length=128)
     format: models.CharField = models.CharField(max_length=128, choices=TableFormat.choices)
diff --git a/requirements.in b/requirements.in
index 5d90ff90be7b9..14e4e597971b5 100644
--- a/requirements.in
+++ b/requirements.in
@@ -33,8 +33,8 @@ djangorestframework==3.15.1
 djangorestframework-csv==2.1.1
 djangorestframework-dataclasses==1.2.0
 django-fernet-encrypted-fields==0.1.3
-dlt==0.5.2a1
-dlt[deltalake]==0.5.2a1
+dlt==0.5.3
+dlt[deltalake]==0.5.3
 dnspython==2.2.1
 drf-exceptions-hog==0.4.0
 drf-extensions==0.7.0
@@ -59,7 +59,7 @@ posthoganalytics==3.5.0
 psycopg2-binary==2.9.7
 PyMySQL==1.1.1
 psycopg[binary]==3.1.18
-pyarrow==15.0.0
+pyarrow==17.0.0
 pydantic==2.5.3
 pyjwt==2.4.0
 python-dateutil>=2.8.2
diff --git a/requirements.txt b/requirements.txt
index 9319c6099f41a..bb89c434f2e67 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,12 +1,8 @@
-#
-# This file is autogenerated by pip-compile with Python 3.11
-# by the following command:
-#
-#    pip-compile requirements.in
-#
+# This file was autogenerated by uv via the following command:
+#    uv pip compile requirements.in -o requirements.txt
 aioboto3==12.0.0
     # via -r requirements.in
-aiobotocore[boto3]==2.7.0
+aiobotocore==2.7.0
     # via
     #   aioboto3
     #   s3fs
@@ -208,7 +204,7 @@ djangorestframework-csv==2.1.1
     # via -r requirements.in
 djangorestframework-dataclasses==1.2.0
     # via -r requirements.in
-dlt[deltalake]==0.5.2a1
+dlt==0.5.3
     # via -r requirements.in
 dnspython==2.2.1
     # via -r requirements.in
@@ -240,7 +236,7 @@ gitpython==3.1.40
     # via dlt
 giturlparse==0.12.0
     # via dlt
-google-api-core[grpc]==2.11.1
+google-api-core==2.11.1
     # via
     #   google-cloud-bigquery
     #   google-cloud-core
@@ -414,7 +410,7 @@ protobuf==4.22.1
     #   grpcio-status
     #   proto-plus
     #   temporalio
-psycopg[binary]==3.1.18
+psycopg==3.1.18
     # via -r requirements.in
 psycopg-binary==3.1.18
     # via psycopg
@@ -422,7 +418,7 @@ psycopg2-binary==2.9.7
     # via -r requirements.in
 py==1.11.0
     # via retry
-pyarrow==15.0.0
+pyarrow==17.0.0
     # via
     #   -r requirements.in
     #   deltalake
@@ -548,8 +544,13 @@ semantic-version==2.8.5
     # via -r requirements.in
 semver==3.0.2
     # via dlt
-sentry-sdk[celery,clickhouse-driver,django,openai]==1.44.1
+sentry-sdk==1.44.1
     # via -r requirements.in
+setuptools==72.1.0
+    # via
+    #   dlt
+    #   gunicorn
+    #   infi-clickhouse-orm
 simplejson==3.19.2
     # via dlt
 six==1.16.0
@@ -659,7 +660,7 @@ unicodecsv==0.14.1
     # via djangorestframework-csv
 uritemplate==4.1.1
     # via drf-spectacular
-urllib3[secure,socks]==1.26.18
+urllib3==1.26.18
     # via
     #   botocore
     #   django-revproxy
@@ -698,6 +699,3 @@ zstd==1.5.5.1
     # via -r requirements.in
 zxcvbn==4.4.28
     # via -r requirements.in
-
-# The following packages are considered to be unsafe in a requirements file:
-# setuptools