From 04daa8da206e3952abe3ba95c248d42201af110a Mon Sep 17 00:00:00 2001
From: Daniil Gusev
Date: Tue, 22 Oct 2024 12:52:02 +0200
Subject: [PATCH 1/2] Iceberg sink docs

---
 docs/connectors/sinks/README.md              |  1 +
 docs/connectors/sinks/apache-iceberg-sink.md | 66 ++++++++++++++++++++
 mkdocs.yml                                   |  1 +
 quixstreams/sinks/community/iceberg.py       | 15 ++++-
 4 files changed, 80 insertions(+), 3 deletions(-)
 create mode 100644 docs/connectors/sinks/apache-iceberg-sink.md

diff --git a/docs/connectors/sinks/README.md b/docs/connectors/sinks/README.md
index ccf091f06..ffb6a6697 100644
--- a/docs/connectors/sinks/README.md
+++ b/docs/connectors/sinks/README.md
@@ -78,6 +78,7 @@ sdf = sdf.apply() # continue different operations with another branch...
 
 Currently, Quix Streams provides these sinks out of the box:
 
+- [Apache Iceberg Sink](apache-iceberg-sink.md) - a sink to write data in Apache Iceberg format.
 - [CSV Sink](csv-sink.md) - a simple CSV sinks that writes data to a single CSV file.
 - [InfluxDB 3 Sink](influxdb3-sink.md) - a sink to write data to InfluxDB 3.
 
diff --git a/docs/connectors/sinks/apache-iceberg-sink.md b/docs/connectors/sinks/apache-iceberg-sink.md
new file mode 100644
index 000000000..daa115942
--- /dev/null
+++ b/docs/connectors/sinks/apache-iceberg-sink.md
@@ -0,0 +1,66 @@
+# Apache Iceberg Sink
+
+TODO: Intro
+
+!!! info
+
+    This is a **Community** connector. Test it before using in production.
+
+    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.
+
+
+## How To Use Iceberg Sink
+
+Create an instance of `IcebergSink` and pass
+it to the `StreamingDataFrame.sink()` method.
+
+For the full description of expected parameters, see the [Iceberg Sink API](../../api-reference/sinks.md#icebergsink) page.
+
+```python
+from quixstreams import Application
+from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig
+
+# Configure S3 bucket credentials
+iceberg_config = AWSIcebergConfig(
+    aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
+)
+
+# Configure the sink to write data to S3 with the AWS Glue catalog spec
+iceberg_sink = IcebergSink(
+    table_name="glue.sink-test",
+    config=iceberg_config,
+    data_catalog_spec="aws_glue",
+)
+
+app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
+topic = app.topic('sink_topic')
+
+# Do some processing here
+sdf = app.dataframe(topic=topic).print(metadata=True)
+
+# Sink results to the IcebergSink
+sdf.sink(iceberg_sink)
+
+
+if __name__ == "__main__":
+    # Start the application
+    app.run()
+```
+
+## How the Iceberg Sink Works
+`IcebergSink` is a batching sink.
+It batches processed records in memory per topic partition, and writes them to the configured destination when a checkpoint is committed.
+
+## Schema and Partition Spec
+TODO
+
+
+## Data Format
+TODO
+
+
+## Retrying Failures
+`IcebergSink` will retry failed commits automatically with a random delay up to 5 seconds.
+
+## Delivery Guarantees
+`IcebergSink` provides at-least-once guarantees, and the results may contain duplicated rows of data if there were errors during processing.
diff --git a/mkdocs.yml b/mkdocs.yml index 01b81105c..74e17d1b5 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -49,6 +49,7 @@ nav: - 'Connectors [beta]': - Sinks: - 'connectors/sinks/README.md' + - Apache Iceberg Sink: connectors/sinks/apache-iceberg-sink.md - CSV Sink: connectors/sinks/csv-sink.md - InfluxDB v3 Sink: connectors/sinks/influxdb3-sink.md - Creating a Custom Sink: connectors/sinks/custom-sinks.md diff --git a/quixstreams/sinks/community/iceberg.py b/quixstreams/sinks/community/iceberg.py index 4785d5354..cf5e70180 100644 --- a/quixstreams/sinks/community/iceberg.py +++ b/quixstreams/sinks/community/iceberg.py @@ -17,7 +17,7 @@ from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.schema import Schema, NestedField from pyiceberg.types import StringType, TimestampType - from pyiceberg.exceptions import CommitFailedException # Import the exception + from pyiceberg.exceptions import CommitFailedException except ImportError as exc: raise ImportError( f"Package {exc.name} is missing: " @@ -101,12 +101,15 @@ class IcebergSink(BatchingSink): Example setup using an AWS-hosted Iceberg with AWS Glue: ``` - from quixstreams.sinks.community.iceberg import IcebergSink, IcebergAWSConfig + from quixstreams import Application + from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig - iceberg_config = IcebergAWSConfig( + # Configure S3 bucket credentials + iceberg_config = AWSIcebergConfig( aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key="" ) + # Configure the sink to write data to S3 with the AWS Glue catalog spec iceberg_sink = IcebergSink( table_name="glue.sink-test", config=iceberg_config, @@ -115,10 +118,16 @@ class IcebergSink(BatchingSink): app = Application(broker_address='localhost:9092', auto_offset_reset="earliest") topic = app.topic('sink_topic') + + # Do some processing here sdf = app.dataframe(topic=topic).print(metadata=True) + + # Sink results to the IcebergSink 
sdf.sink(iceberg_sink) + if __name__ == "__main__": + # Start the application app.run() ``` """ From 527d28f95950ed90edc7ffbb868e3dfeff5c26de Mon Sep 17 00:00:00 2001 From: Daniil Gusev Date: Tue, 22 Oct 2024 15:01:56 +0200 Subject: [PATCH 2/2] Iceberg sink docs --- docs/connectors/sinks/apache-iceberg-sink.md | 31 ++++++++++---------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/docs/connectors/sinks/apache-iceberg-sink.md b/docs/connectors/sinks/apache-iceberg-sink.md index daa115942..f0a89b975 100644 --- a/docs/connectors/sinks/apache-iceberg-sink.md +++ b/docs/connectors/sinks/apache-iceberg-sink.md @@ -1,6 +1,4 @@ -# Apache Iceberg Sink - -TODO: Intro +# Apache Iceberg Sink !!! info @@ -8,6 +6,21 @@ TODO: Intro To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page. +This sink writes batches of data to an Apache Iceberg table. +By default, the data will include the kafka message key, value, and timestamp. + +Currently, supports Apache Iceberg hosted in: + +- AWS + +Supported data catalogs: + +- AWS Glue + +## How the Iceberg Sink Works +`IcebergSink` is a batching sink. + +It batches processed records in memory per topic partition, serializes incoming data batches into Parquet format, and appends them to the Iceberg table, updating the table schema as necessary. ## How To Use Iceberg Sink @@ -47,18 +60,6 @@ if __name__ == "__main__": app.run() ``` -## How the Iceberg Sink Works -`IcebergSink` is a batching sink. -It batches processed records in memory per topic partition, and writes them to the configured destination when a checkpoint is committed. - -## Schema and Partition Spec -TODO - - -## Data Format -TODO - - ## Retrying Failures `IcebergSink` will retry failed commits automatically with a random delay up to 5 seconds.