feat(data-warehouse): Use compact and vacuum on delta tables + s3 wrapper (#24335)
Gilbert09 authored Aug 13, 2024
1 parent 83996e1 commit 90071b9
Showing 12 changed files with 187 additions and 29 deletions.
2 changes: 1 addition & 1 deletion latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0016_rolemembership_organization_member
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0453_alter_errortrackinggroup_fingerprint_and_more
posthog: 0454_alter_datawarehousetable_format
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
25 changes: 25 additions & 0 deletions posthog/hogql/database/s3_table.py
@@ -34,6 +34,31 @@ def return_expr(expr: str) -> str:

return f"{substitute_params(expr, raw_params)})"

# DeltaS3Wrapper format
if format == "DeltaS3Wrapper":
if url.endswith("/"):
escaped_url = add_param(f"{url}*.zstd.parquet")
else:
escaped_url = add_param(f"{url}/*.zstd.parquet")

if structure:
escaped_structure = add_param(structure, False)

expr = f"s3({escaped_url}"

if access_key and access_secret:
escaped_access_key = add_param(access_key)
escaped_access_secret = add_param(access_secret)

expr += f", {escaped_access_key}, {escaped_access_secret}"

expr += ", 'Parquet'"

if structure:
expr += f", {escaped_structure}"

return return_expr(expr)

# Delta format
if format == "Delta":
escaped_url = add_param(url)
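For reference, a minimal usage sketch of the new branch (not part of the diff): the DeltaS3Wrapper format points ClickHouse's s3() table function at the compacted *.zstd.parquet files, while the existing Delta format keeps using deltaLake() on the table root. The signature and enum value follow the tests below; the import paths and example URL are assumptions.

# Sketch only - import paths and the example URL are assumptions.
from posthog.hogql.database.s3_table import build_function_call
from posthog.warehouse.models import DataWarehouseTable

# Wrapper format: query the compacted zstd Parquet files directly via s3().
expr = build_function_call(
    "https://bucket.s3.amazonaws.com/some_table",
    DataWarehouseTable.TableFormat.DeltaS3Wrapper,
    "key",
    "secret",
    None,
    None,
)
# expr == "s3('https://bucket.s3.amazonaws.com/some_table/*.zstd.parquet', 'key', 'secret', 'Parquet')"

# Plain Delta format: unchanged, still reads the Delta log via deltaLake().
expr = build_function_call(
    "https://bucket.s3.amazonaws.com/some_table",
    DataWarehouseTable.TableFormat.Delta,
    "key",
    "secret",
    None,
    None,
)
# expr == "deltaLake('https://bucket.s3.amazonaws.com/some_table', 'key', 'secret')"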
18 changes: 18 additions & 0 deletions posthog/hogql/database/test/test_s3_table.py
@@ -219,6 +219,24 @@ def test_s3_build_function_call_without_context_and_delta_format(self):
res = build_function_call("http://url.com", DataWarehouseTable.TableFormat.Delta, "key", "secret", None, None)
assert res == "deltaLake('http://url.com', 'key', 'secret')"

def test_s3_build_function_call_without_context_and_deltaS3Wrapper_format(self):
res = build_function_call(
"http://url.com", DataWarehouseTable.TableFormat.DeltaS3Wrapper, "key", "secret", None, None
)
assert res == "s3('http://url.com/*.zstd.parquet', 'key', 'secret', 'Parquet')"

def test_s3_build_function_call_without_context_and_deltaS3Wrapper_format_with_slash(self):
res = build_function_call(
"http://url.com/", DataWarehouseTable.TableFormat.DeltaS3Wrapper, "key", "secret", None, None
)
assert res == "s3('http://url.com/*.zstd.parquet', 'key', 'secret', 'Parquet')"

def test_s3_build_function_call_without_context_and_deltaS3Wrapper_format_with_structure(self):
res = build_function_call(
"http://url.com/", DataWarehouseTable.TableFormat.DeltaS3Wrapper, "key", "secret", "some structure", None
)
assert res == "s3('http://url.com/*.zstd.parquet', 'key', 'secret', 'Parquet', 'some structure')"

def test_s3_build_function_call_without_context_and_delta_format_and_with_structure(self):
res = build_function_call(
"http://url.com", DataWarehouseTable.TableFormat.Delta, "key", "secret", "some structure", None
27 changes: 27 additions & 0 deletions posthog/migrations/0454_alter_datawarehousetable_format.py
@@ -0,0 +1,27 @@
# Generated by Django 4.2.14 on 2024-08-13 10:54

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0453_alter_errortrackinggroup_fingerprint_and_more"),
]

operations = [
migrations.AlterField(
model_name="datawarehousetable",
name="format",
field=models.CharField(
choices=[
("CSV", "CSV"),
("CSVWithNames", "CSVWithNames"),
("Parquet", "Parquet"),
("JSONEachRow", "JSON"),
("Delta", "Delta"),
("DeltaS3Wrapper", "DeltaS3Wrapper"),
],
max_length=128,
),
),
]
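The migration only widens the choices on the existing format column; the model itself presumably gains a matching TableFormat entry. A sketch of what that enum would look like (assumed shape, not the actual PostHog model code):

from django.db import models

class TableFormat(models.TextChoices):
    # Existing formats, mirrored from the migration's choices list.
    CSV = "CSV", "CSV"
    CSVWithNames = "CSVWithNames", "CSVWithNames"
    Parquet = "Parquet", "Parquet"
    JSON = "JSONEachRow", "JSON"
    Delta = "Delta", "Delta"
    # New: Delta tables exposed through the compacted-Parquet s3() wrapper.
    DeltaS3Wrapper = "DeltaS3Wrapper", "DeltaS3Wrapper"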
32 changes: 30 additions & 2 deletions posthog/temporal/data_imports/pipelines/pipeline.py
@@ -10,6 +10,8 @@
import asyncio
from posthog.settings.base_variables import TEST
from structlog.typing import FilteringBoundLogger
from dlt.common.libs.deltalake import get_delta_tables
from dlt.load.exceptions import LoadClientJobRetry
from dlt.sources import DltSource
from deltalake.exceptions import DeltaError
from collections import Counter
@@ -116,7 +118,10 @@ def _run(self) -> dict[str, int]:
)
except PipelineStepFailed as e:
# Remove once DLT supports writing empty Delta files
if isinstance(e.exception, DeltaError):
if isinstance(e.exception, LoadClientJobRetry):
if "Generic S3 error" not in e.exception.retry_message:
raise
elif isinstance(e.exception, DeltaError):
if e.exception.args[0] != "Generic error: No data source supplied to write command.":
raise
else:
@@ -132,12 +137,22 @@ def _run(self) -> dict[str, int]:
total_counts = counts + total_counts

if total_counts.total() > 0:
delta_tables = get_delta_tables(pipeline)
file_len = 0

for table in delta_tables.values():
# Compact doesn't work on single file tables, so we can't use the wrapper for these
file_len = len(table.files())
table.optimize.compact()
table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)

async_to_sync(validate_schema_and_update_table)(
run_id=self.inputs.run_id,
team_id=self.inputs.team_id,
schema_id=self.inputs.schema_id,
table_schema=self.source.schema.tables,
row_count=total_counts.total(),
use_delta_wrapper=file_len != 1,
)

pipeline_runs = pipeline_runs + 1
@@ -151,7 +166,10 @@ def _run(self) -> dict[str, int]:
)
except PipelineStepFailed as e:
# Remove once DLT supports writing empty Delta files
if isinstance(e.exception, DeltaError):
if isinstance(e.exception, LoadClientJobRetry):
if "Generic S3 error" not in e.exception.retry_message:
raise
elif isinstance(e.exception, DeltaError):
if e.exception.args[0] != "Generic error: No data source supplied to write command.":
raise
else:
@@ -167,12 +185,22 @@ def _run(self) -> dict[str, int]:
total_counts = total_counts + counts

if total_counts.total() > 0:
delta_tables = get_delta_tables(pipeline)
file_len = 0

for table in delta_tables.values():
# Compact doesn't work on single file tables, so we can't use the wrapper for these
file_len = len(table.files())
table.optimize.compact()
table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)

async_to_sync(validate_schema_and_update_table)(
run_id=self.inputs.run_id,
team_id=self.inputs.team_id,
schema_id=self.inputs.schema_id,
table_schema=self.source.schema.tables,
row_count=total_counts.total(),
use_delta_wrapper=file_len != 1,
)

return dict(total_counts)
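The compact-and-vacuum pass above runs on the DeltaTable objects that dlt hands back from get_delta_tables(pipeline). The same maintenance step can be sketched with the deltalake package directly; the table URI and credentials below are placeholders, not values from this PR.

from deltalake import DeltaTable

# Placeholder location and credentials; in the pipeline these come from dlt.
table = DeltaTable(
    "s3://my-bucket/team_1/customer",
    storage_options={"AWS_ACCESS_KEY_ID": "key", "AWS_SECRET_ACCESS_KEY": "secret"},
)

# Capture the file count first: compact doesn't work on single-file tables,
# so those keep the plain Delta format (use_delta_wrapper=False).
file_len = len(table.files())

# Merge small Parquet files into larger ones, then delete superseded files
# older than 24 hours, overriding the default retention safeguard.
table.optimize.compact()
table.vacuum(retention_hours=24, enforce_retention_duration=False, dry_run=False)

use_delta_wrapper = file_len != 1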
2 changes: 2 additions & 0 deletions posthog/temporal/data_imports/pipelines/test/test_pipeline.py
@@ -77,6 +77,7 @@ def mock_create_pipeline(local_self: Any):
patch(
"posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table"
) as mock_validate_schema_and_update_table,
patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"),
):
pipeline = await self._create_pipeline("Customer", False)
res = await pipeline.run()
@@ -97,6 +98,7 @@ def mock_create_pipeline(local_self: Any):
patch(
"posthog.temporal.data_imports.pipelines.pipeline.validate_schema_and_update_table"
) as mock_validate_schema_and_update_table,
patch("posthog.temporal.data_imports.pipelines.pipeline.get_delta_tables"),
):
pipeline = await self._create_pipeline("Customer", True)
res = await pipeline.run()
11 changes: 11 additions & 0 deletions posthog/temporal/tests/data_imports/test_end_to_end.py
@@ -99,6 +99,16 @@ def mock_to_session_credentials(class_self):
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

def mock_to_object_store_rs_credentials(class_self):
return {
"aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
"aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
"endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
"region": "us-east-1",
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

with (
mock.patch.object(RESTClient, "paginate", mock_paginate),
override_settings(
@@ -109,6 +119,7 @@ def mock_to_session_credentials(class_self):
AIRBYTE_BUCKET_DOMAIN="objectstorage:19000",
),
mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
):
async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
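The new to_object_store_rs_credentials mock returns the storage options that delta-rs (the Rust object_store backend used for the Delta destination) accepts for S3-compatible storage; presumably dlt passes this dict through when it opens the Delta table. A hedged sketch of how such a dict is consumed, with placeholder values rather than the test settings:

from deltalake import DeltaTable

# Same keys as the mock above; the values here are placeholders.
storage_options = {
    "aws_access_key_id": "minio-access-key",
    "aws_secret_access_key": "minio-secret-key",
    "endpoint_url": "http://objectstorage:19000",
    "region": "us-east-1",
    "AWS_ALLOW_HTTP": "true",              # MinIO runs over plain HTTP in tests
    "AWS_S3_ALLOW_UNSAFE_RENAME": "true",  # no S3 locking provider configured
}

# delta-rs reads these options when opening a table on S3-compatible storage.
table = DeltaTable("s3://test-pipeline/team_1/customer", storage_options=storage_options)
print(table.files())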
56 changes: 50 additions & 6 deletions posthog/temporal/tests/external_data/test_external_data_job.py
@@ -476,6 +476,16 @@ def mock_to_session_credentials(class_self):
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

def mock_to_object_store_rs_credentials(class_self):
return {
"aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
"aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
"endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
"region": "us-east-1",
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

with (
mock.patch.object(RESTClient, "paginate", mock_customers_paginate),
override_settings(
@@ -490,6 +500,7 @@ def mock_to_session_credentials(class_self):
return_value={"clickhouse": {"id": "string", "name": "string"}},
),
mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
):
await asyncio.gather(
activity_environment.run(import_data_activity, job_1_inputs),
@@ -500,7 +511,7 @@ def mock_to_session_credentials(class_self):
Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
)

assert len(job_1_customer_objects["Contents"]) == 2
assert len(job_1_customer_objects["Contents"]) == 3

with (
mock.patch.object(RESTClient, "paginate", mock_charges_paginate),
@@ -516,6 +527,7 @@ def mock_to_session_credentials(class_self):
return_value={"clickhouse": {"id": "string", "name": "string"}},
),
mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
):
await asyncio.gather(
activity_environment.run(import_data_activity, job_2_inputs),
Expand All @@ -524,7 +536,7 @@ def mock_to_session_credentials(class_self):
job_2_charge_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{job_2.folder_path()}/charge/"
)
assert len(job_2_charge_objects["Contents"]) == 2
assert len(job_2_charge_objects["Contents"]) == 3


@pytest.mark.django_db(transaction=True)
@@ -586,12 +598,21 @@ def mock_to_session_credentials(class_self):
"aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
"aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
"endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
"region_name": settings.AIRBYTE_BUCKET_REGION,
"aws_session_token": None,
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

def mock_to_object_store_rs_credentials(class_self):
return {
"aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
"aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
"endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
"region": "us-east-1",
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

with (
mock.patch.object(RESTClient, "paginate", mock_customers_paginate),
override_settings(
@@ -606,6 +627,7 @@ def mock_to_session_credentials(class_self):
return_value={"clickhouse": {"id": "string", "name": "string"}},
),
mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
):
await asyncio.gather(
activity_environment.run(import_data_activity, job_1_inputs),
Expand All @@ -617,7 +639,7 @@ def mock_to_session_credentials(class_self):
)

# if job was not canceled, this job would run indefinitely
assert len(job_1_customer_objects.get("Contents", [])) == 0
assert len(job_1_customer_objects.get("Contents", [])) == 1

await sync_to_async(job_1.refresh_from_db)()
assert job_1.rows_synced == 0
@@ -692,6 +714,16 @@ def mock_to_session_credentials(class_self):
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

def mock_to_object_store_rs_credentials(class_self):
return {
"aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
"aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
"endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
"region": "us-east-1",
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

with (
mock.patch.object(RESTClient, "paginate", mock_customers_paginate),
override_settings(
@@ -706,6 +738,7 @@ def mock_to_session_credentials(class_self):
return_value={"clickhouse": {"id": "string", "name": "string"}},
),
mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
):
await asyncio.gather(
activity_environment.run(import_data_activity, job_1_inputs),
Expand All @@ -716,7 +749,7 @@ def mock_to_session_credentials(class_self):
Bucket=BUCKET_NAME, Prefix=f"{folder_path}/customer/"
)

assert len(job_1_customer_objects["Contents"]) == 2
assert len(job_1_customer_objects["Contents"]) == 3

await sync_to_async(job_1.refresh_from_db)()
assert job_1.rows_synced == 1
@@ -851,6 +884,16 @@ def mock_to_session_credentials(class_self):
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

def mock_to_object_store_rs_credentials(class_self):
return {
"aws_access_key_id": settings.OBJECT_STORAGE_ACCESS_KEY_ID,
"aws_secret_access_key": settings.OBJECT_STORAGE_SECRET_ACCESS_KEY,
"endpoint_url": settings.OBJECT_STORAGE_ENDPOINT,
"region": "us-east-1",
"AWS_ALLOW_HTTP": "true",
"AWS_S3_ALLOW_UNSAFE_RENAME": "true",
}

with (
override_settings(
BUCKET_URL=f"s3://{BUCKET_NAME}",
@@ -861,6 +904,7 @@ def mock_to_session_credentials(class_self):
BUCKET_NAME=BUCKET_NAME,
),
mock.patch.object(AwsCredentials, "to_session_credentials", mock_to_session_credentials),
mock.patch.object(AwsCredentials, "to_object_store_rs_credentials", mock_to_object_store_rs_credentials),
):
await asyncio.gather(
activity_environment.run(import_data_activity, job_1_inputs),
Expand All @@ -870,7 +914,7 @@ def mock_to_session_credentials(class_self):
job_1_team_objects = await minio_client.list_objects_v2(
Bucket=BUCKET_NAME, Prefix=f"{folder_path}/posthog_test/"
)
assert len(job_1_team_objects["Contents"]) == 2
assert len(job_1_team_objects["Contents"]) == 3


@pytest.mark.django_db(transaction=True)
6 changes: 5 additions & 1 deletion posthog/warehouse/data_load/validate_schema.py
@@ -72,6 +72,7 @@ async def validate_schema_and_update_table(
schema_id: uuid.UUID,
table_schema: TSchemaTables,
row_count: int,
use_delta_wrapper: bool,
) -> None:
"""
@@ -84,6 +85,7 @@
schema_id: The schema for which the data job relates to
table_schema: The DLT schema from the data load stage
table_row_counts: The count of synced rows from DLT
use_delta_wrapper: Whether to use the S3 wrapper for Delta tables
"""

logger = await bind_temporal_worker_logger(team_id=team_id)
@@ -117,7 +119,9 @@
table_params = {
"credential": credential,
"name": table_name,
"format": DataWarehouseTable.TableFormat.Delta,
"format": DataWarehouseTable.TableFormat.DeltaS3Wrapper
if use_delta_wrapper
else DataWarehouseTable.TableFormat.Delta,
"url_pattern": new_url_pattern,
"team_id": team_id,
"row_count": row_count,