diff --git a/.gitignore b/.gitignore
index 70aed0e95..dea60018a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,7 @@ certificates/
 # environment
 env/
 venv/
+.venv/
 
 # build results
 /build/
diff --git a/docs/build/build.py b/docs/build/build.py
index a5aadb0f0..6d24d6118 100644
--- a/docs/build/build.py
+++ b/docs/build/build.py
@@ -114,6 +114,7 @@
     "sinks.md": {
         k: None
        for k in [
+            "quixstreams.sinks.community.iceberg",
             "quixstreams.sinks.core.influxdb3",
             "quixstreams.sinks.core.csv",
             "quixstreams.sinks.base.sink",
diff --git a/pyproject.toml b/pyproject.toml
index 806fbb61d..aa6dcdd58 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -28,14 +28,20 @@ Homepage = "https://github.com/quixio/quix-streams"
 
 [project.optional-dependencies]
 all = [
-    "fastavro>=1.8,<2.0",
-    "protobuf>=5.27.2,<6.0",
-    "influxdb3-python>=0.7,<1.0"
+    "fastavro>=1.8,<2.0",
+    "protobuf>=5.27.2,<6.0",
+    "influxdb3-python>=0.7,<1.0",
+    "pyarrow",
+    "pyiceberg",
+    "boto3",
+    "mypy_boto3_glue",
 ]
 
 avro = ["fastavro>=1.8,<2.0"]
 protobuf = ["protobuf>=5.27.2,<6.0"]
 influxdb3 = ["influxdb3-python>=0.7,<1.0"]
+iceberg = ["pyarrow", "pyiceberg"]
+iceberg_aws = ["pyarrow", "pyiceberg", "boto3", "mypy_boto3_glue"]
 
 [tool.setuptools.packages.find]
 include = ["quixstreams*"]
diff --git a/quixstreams/sinks/community/iceberg.py b/quixstreams/sinks/community/iceberg.py
new file mode 100644
index 000000000..4785d5354
--- /dev/null
+++ b/quixstreams/sinks/community/iceberg.py
@@ -0,0 +1,307 @@
+import logging
+import random
+import time
+from dataclasses import dataclass
+from datetime import datetime
+from importlib import import_module
+from io import BytesIO
+from typing import Optional, Literal, get_args, Type
+
+from quixstreams.sinks import SinkBatch, BatchingSink, SinkBackpressureError
+
+try:
+    import pyarrow as pa
+    import pyarrow.parquet as pq
+    from pyiceberg.catalog import MetastoreCatalog
+    from pyiceberg.transforms import DayTransform, IdentityTransform
+    from pyiceberg.partitioning import PartitionSpec, PartitionField
+    from pyiceberg.schema import Schema, NestedField
+    from pyiceberg.types import StringType, TimestampType
+    from pyiceberg.exceptions import CommitFailedException
+except ImportError as exc:
+    raise ImportError(
+        f"Package {exc.name} is missing: "
+        f'run "pip install quixstreams[iceberg]" to use IcebergSink'
+    ) from exc
+
+__all__ = ("IcebergSink", "AWSIcebergConfig")
+
+logger = logging.getLogger(__name__)
+
+DataCatalogSpec = Literal["aws_glue"]
+
+_SUPPORTED_DATA_CATALOG_SPECS = get_args(DataCatalogSpec)
+
+
+@dataclass
+class BaseIcebergConfig:
+    location: str
+    auth: dict
+
+
+class AWSIcebergConfig(BaseIcebergConfig):
+    def __init__(
+        self,
+        aws_s3_uri: str,
+        aws_region: Optional[str] = None,
+        aws_access_key_id: Optional[str] = None,
+        aws_secret_access_key: Optional[str] = None,
+        aws_session_token: Optional[str] = None,
+    ):
+        """
+        Configure IcebergSink to work with AWS Glue.
+
+        :param aws_s3_uri: The S3 URI where the table data will be stored
+            (e.g., 's3://your-bucket/warehouse/').
+        :param aws_region: The AWS region for the S3 bucket and Glue catalog.
+        :param aws_access_key_id: The AWS access key ID.
+            NOTE: can alternatively set the AWS_ACCESS_KEY_ID environment variable
+            when using AWS Glue.
+        :param aws_secret_access_key: The AWS secret access key.
+            NOTE: can alternatively set the AWS_SECRET_ACCESS_KEY environment variable
+            when using AWS Glue.
+        :param aws_session_token: A session token (or one will be generated for you).
+            NOTE: can alternatively set the AWS_SESSION_TOKEN environment variable
+            when using AWS Glue.
+        """
+        self.location = aws_s3_uri
+        self.auth = {
+            "client.region": aws_region,
+            "client.access-key-id": aws_access_key_id,
+            "client.secret-access-key": aws_secret_access_key,
+            "client.session-token": aws_session_token,
+        }
+
+
+class IcebergSink(BatchingSink):
+    """
+    IcebergSink writes batches of data to an Apache Iceberg table.
+
+    By default, the data will include the Kafka message key, value, and timestamp.
+
+    It serializes incoming data batches into Parquet format and appends them to the
+    Iceberg table, updating the table schema as necessary.
+
+    Currently supports Apache Iceberg hosted in:
+
+    - AWS
+
+    Supported data catalogs:
+
+    - AWS Glue
+
+
+    :param table_name: The name of the Iceberg table.
+    :param config: An AWSIcebergConfig (or other BaseIcebergConfig) with the
+        various connection parameters.
+    :param data_catalog_spec: The data catalog to use (e.g., "aws_glue" for AWS Glue).
+    :param schema: The Iceberg table schema. If None, a default schema is used.
+    :param partition_spec: The partition specification for the table.
+        If None, a default is used.
+
+    Example setup using an AWS-hosted Iceberg with AWS Glue:
+
+    ```
+    from quixstreams import Application
+    from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig
+
+    iceberg_config = AWSIcebergConfig(
+        aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
+    )
+
+    iceberg_sink = IcebergSink(
+        table_name="glue.sink-test",
+        config=iceberg_config,
+        data_catalog_spec="aws_glue",
+    )
+
+    app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
+    topic = app.topic('sink_topic')
+    sdf = app.dataframe(topic=topic).print(metadata=True)
+    sdf.sink(iceberg_sink)
+
+    if __name__ == "__main__":
+        app.run()
+    ```
+    """
+
+    def __init__(
+        self,
+        table_name: str,
+        config: BaseIcebergConfig,
+        data_catalog_spec: DataCatalogSpec,
+        schema: Optional[Schema] = None,
+        partition_spec: Optional[PartitionSpec] = None,
+    ):
+        super().__init__()
+        self.iceberg_config = config
+
+        # Configure the Iceberg catalog.
+        data_catalog_cls = _import_data_catalog(data_catalog_spec)
+        self.data_catalog = data_catalog_cls(
+            name=f"{data_catalog_spec}_catalog",
+            **self.iceberg_config.auth,
+        )
+
+        # Set up the schema.
+        if schema is None:
+            # Use a default schema if none is provided.
+            schema = self._get_default_schema()
+
+        # Set up the partition specification.
+        if partition_spec is None:
+            partition_spec = self._get_default_partition_spec(schema=schema)
+
+        # Create the Iceberg table if it doesn't exist.
+        self.table = self.data_catalog.create_table_if_not_exists(
+            identifier=table_name,
+            schema=schema,
+            location=self.iceberg_config.location,
+            partition_spec=partition_spec,
+            properties={"write.distribution-mode": "fanout"},
+        )
+        logger.info(
+            f"Loaded Iceberg table '{table_name}' at '{self.iceberg_config.location}'."
+        )
+
+    def write(self, batch: SinkBatch):
+        """
+        Writes a batch of data to the Iceberg table.
+        Handles concurrent write conflicts by signalling backpressure so the batch
+        is retried after a short backoff.
+
+        :param batch: The batch of data to write.
+        """
+        try:
+            # Serialize batch data into Parquet format.
+            data = self._serialize_batch_values(batch)
+
+            # Read the Parquet data into a PyArrow Table.
+            input_buffer = pa.BufferReader(data)
+            parquet_table = pq.read_table(input_buffer)
+
+            # Reload the table to get the latest metadata.
+            self.table = self.data_catalog.load_table(self.table.name())
+
+            # Update the table schema if necessary.
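+            # union_by_name() merges any columns present in the incoming Parquet
+            # data but missing from the Iceberg schema, leaving existing fields intact.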
+            with self.table.update_schema() as update:
+                update.union_by_name(parquet_table.schema)
+
+            append_start_epoch = time.time()
+            self.table.append(parquet_table)
+            logger.info(
+                f"Appended {batch.size} records to {self.table.name()} table "
+                f"in {time.time() - append_start_epoch}s."
+            )
+
+        except CommitFailedException as e:
+            # Handle a commit conflict: back off so the batch is retried later.
+            logger.warning(f"Commit conflict detected: {e}")
+            # Randomize the sleep to stagger retries across concurrent writers.
+            sleep_time = random.uniform(0, 5)  # noqa: S311
+            raise SinkBackpressureError(sleep_time, batch.topic, batch.partition)
+        except Exception as e:
+            logger.error(f"Error writing data to Iceberg table: {e}")
+            raise
+
+    def _get_default_schema(self) -> Schema:
+        """
+        Return a default Iceberg schema when none is provided.
+        """
+        return Schema(
+            fields=(
+                NestedField(
+                    field_id=1,
+                    name="_timestamp",
+                    field_type=TimestampType(),
+                    required=False,
+                ),
+                NestedField(
+                    field_id=2, name="_key", field_type=StringType(), required=False
+                ),
+            )
+        )
+
+    def _get_default_partition_spec(self, schema: Schema) -> PartitionSpec:
+        """
+        Set up a default partition specification if none is provided.
+        """
+        # Map field names to field IDs from the schema.
+        field_ids = {field.name: field.field_id for field in schema.fields}
+
+        # Create partition fields for the kafka key and timestamp.
+        partition_fields = (
+            PartitionField(
+                source_id=field_ids["_key"],
+                field_id=1000,  # Unique partition field ID.
+                transform=IdentityTransform(),
+                name="_key",
+            ),
+            PartitionField(
+                source_id=field_ids["_timestamp"],
+                field_id=1001,
+                transform=DayTransform(),
+                name="day",
+            ),
+        )
+
+        # Create the new PartitionSpec.
+        return PartitionSpec(fields=partition_fields)
+
+    def _serialize_batch_values(self, batch: SinkBatch) -> bytes:
+        """
+        Dynamically unpacks each kafka message's value (its dict keys/"columns") within
+        the provided batch and preps the messages for reading into a PyArrow Table.
+        """
+        # TODO: Handle data flattening. Nested properties will cause this to crash.
+        # TODO: possible optimizations with all the iterative batch transformations
+
+        # Get all unique "keys" (columns) across all rows.
+        all_keys = set()
+        for row in batch:
+            all_keys.update(row.value.keys())
+
+        # Normalize rows: ensure all rows have the same keys, filling missing ones with None.
+        normalized_values = [
+            {key: row.value.get(key, None) for key in all_keys} for row in batch
+        ]
+
+        columns = {
+            "_timestamp": [
+                datetime.fromtimestamp(row.timestamp / 1000.0) for row in batch
+            ],
+            "_key": [
+                row.key.decode() if isinstance(row.key, bytes) else row.key
+                for row in batch
+            ],
+        }
+
+        # Merge in the normalized value columns and convert everything to a PyArrow Table.
+        columns = {
+            **columns,
+            **{key: [row[key] for row in normalized_values] for key in all_keys},
+        }
+
+        table = pa.Table.from_pydict(columns)
+
+        with BytesIO() as f:
+            pq.write_table(table, f, compression="snappy")
+            return f.getvalue()
+
+
+def _import_data_catalog(data_catalog_spec: DataCatalogSpec) -> Type[MetastoreCatalog]:
+    """
+    Dynamically load a data catalog class, since each catalog may require extra
+    optional dependencies.
+    """
+    if data_catalog_spec not in _SUPPORTED_DATA_CATALOG_SPECS:
+        raise ValueError(f"Unsupported data_catalog_spec: {data_catalog_spec}")
+
+    data_catalogs = {"aws_glue": ("[iceberg_aws]", "glue.GlueCatalog")}
+
+    install_name, module_path = data_catalogs[data_catalog_spec]
+    module, catalog_cls_name = module_path.split(".")
+    try:
+        return getattr(import_module(f"pyiceberg.catalog.{module}"), catalog_cls_name)
+    except ImportError as exc:
+        raise ImportError(
+            f"Package {exc.name} is missing: "
+            f"run 'pip install quixstreams{install_name}' to use "
+            f"data_catalog_spec {data_catalog_spec}"
+        ) from exc
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 188248222..c00ec1238 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -5,3 +5,5 @@ docker>=7.1.0 # Required to use requests>=2.32
 fastavro>=1.8,<2.0
 protobuf>=5.27.2
 influxdb3-python>=0.7.0,<1.0
+pyarrow
+pyiceberg
\ No newline at end of file
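
For reference, a minimal sketch (not taken from the changeset) of how the new sink could be driven with a user-supplied schema and partition spec. The bucket, region, credentials, topic, table name, and the "temperature" column are illustrative placeholders; only the IcebergSink/AWSIcebergConfig parameters come from the module above.

```
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import NestedField, Schema
from pyiceberg.transforms import DayTransform
from pyiceberg.types import DoubleType, StringType, TimestampType

from quixstreams import Application
from quixstreams.sinks.community.iceberg import AWSIcebergConfig, IcebergSink

# Default "_timestamp"/"_key" fields plus one extra column the messages are assumed to carry.
schema = Schema(
    NestedField(field_id=1, name="_timestamp", field_type=TimestampType(), required=False),
    NestedField(field_id=2, name="_key", field_type=StringType(), required=False),
    NestedField(field_id=3, name="temperature", field_type=DoubleType(), required=False),
)

# Partition only by the day of the message timestamp.
partition_spec = PartitionSpec(
    PartitionField(source_id=1, field_id=1000, transform=DayTransform(), name="day"),
)

config = AWSIcebergConfig(
    aws_s3_uri="s3://my-bucket/warehouse/",  # placeholder bucket
    aws_region="eu-west-1",                  # placeholder region
    aws_access_key_id="...",
    aws_secret_access_key="...",
)

sink = IcebergSink(
    table_name="glue.sensor_readings",  # placeholder Glue database/table
    config=config,
    data_catalog_spec="aws_glue",
    schema=schema,
    partition_spec=partition_spec,
)

app = Application(broker_address="localhost:9092", auto_offset_reset="earliest")
sdf = app.dataframe(topic=app.topic("sensor_topic"))
sdf.sink(sink)

if __name__ == "__main__":
    app.run()
```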