
Iceberg sink docs #586

Merged · 2 commits · Oct 22, 2024
1 change: 1 addition & 0 deletions docs/connectors/sinks/README.md
@@ -78,6 +78,7 @@ sdf = sdf.apply() # continue different operations with another branch...
 
 Currently, Quix Streams provides these sinks out of the box:
 
+- [Apache Iceberg Sink](apache-iceberg-sink.md) - a sink to write data in Apache Iceberg format.
 - [CSV Sink](csv-sink.md) - a simple CSV sink that writes data to a single CSV file.
 - [InfluxDB 3 Sink](influxdb3-sink.md) - a sink to write data to InfluxDB 3.
 
67 changes: 67 additions & 0 deletions docs/connectors/sinks/apache-iceberg-sink.md
@@ -0,0 +1,67 @@
# Apache Iceberg Sink

!!! info

This is a **Community** connector. Test it before using in production.

To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

This sink writes batches of data to an Apache Iceberg table.
By default, the data will include the Kafka message key, value, and timestamp.

It currently supports Apache Iceberg hosted in:

- AWS

Supported data catalogs:

- AWS Glue

## How the Iceberg Sink Works
`IcebergSink` is a batching sink.

It batches processed records in memory per topic partition, serializes incoming data batches into Parquet format, and appends them to the Iceberg table, updating the table schema as necessary.
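
For intuition, here is a minimal sketch (not the sink's actual implementation) of what one batched write can look like with `pyiceberg` and `pyarrow`. The catalog name and configuration below are assumptions:

```python
import pyarrow as pa
from pyiceberg.catalog import load_catalog


def write_batch(records: list[dict], table_name: str) -> None:
    # Assumes a Glue catalog configured elsewhere (e.g., in .pyiceberg.yaml).
    catalog = load_catalog("glue")
    table = catalog.load_table(table_name)

    # Serialize the in-memory batch into an Arrow table;
    # Iceberg persists appended data as Parquet files.
    arrow_table = pa.Table.from_pylist(records)

    # Evolve the table schema to cover any new fields in this batch.
    with table.update_schema() as update:
        update.union_by_name(arrow_table.schema)

    # Commit the batch as an append to the table.
    table.append(arrow_table)
```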

## How To Use Iceberg Sink

Create an instance of `IcebergSink` and pass
it to the `StreamingDataFrame.sink()` method.

For the full description of expected parameters, see the [Iceberg Sink API](../../api-reference/sinks.md#icebergsink) page.

```python
from quixstreams import Application
from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig

# Configure S3 bucket credentials
iceberg_config = AWSIcebergConfig(
aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
)

# Configure the sink to write data to S3 with the AWS Glue catalog spec
iceberg_sink = IcebergSink(
table_name="glue.sink-test",
config=iceberg_config,
data_catalog_spec="aws_glue",
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
topic = app.topic('sink_topic')

# Do some processing here
sdf = app.dataframe(topic=topic).print(metadata=True)

# Sink results to the IcebergSink
sdf.sink(iceberg_sink)


if __name__ == "__main__":
# Start the application
app.run()
```

## Retrying Failures
`IcebergSink` retries failed commits automatically, with a random delay of up to 5 seconds between attempts.
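
For illustration, a retry loop of this shape behaves similarly; this is a sketch, not the sink's exact code, and `max_attempts` is an assumption:

```python
import random
import time

from pyiceberg.exceptions import CommitFailedException


def append_with_retry(table, arrow_table, max_attempts: int = 3) -> None:
    # Retry Iceberg commit conflicts with a random backoff of up to 5 seconds.
    for attempt in range(1, max_attempts + 1):
        try:
            table.append(arrow_table)
            return
        except CommitFailedException:
            if attempt == max_attempts:
                raise
            time.sleep(random.uniform(0, 5))
```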

## Delivery Guarantees
`IcebergSink` provides at-least-once guarantees, and the results may contain duplicated rows of data if there were errors during processing.
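
If duplicates matter downstream, they can be dropped when reading the table back. Below is a hypothetical example; the `_key` and `_timestamp` column names are assumptions rather than a guaranteed schema:

```python
from pyiceberg.catalog import load_catalog

# Catalog configuration is assumed to be set up elsewhere.
catalog = load_catalog("glue")
table = catalog.load_table("glue.sink-test")

# Read the table and keep one row per (key, timestamp) pair.
df = table.scan().to_pandas()
deduped = df.drop_duplicates(subset=["_key", "_timestamp"])
```
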
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -49,6 +49,7 @@ nav:
   - 'Connectors [beta]':
     - Sinks:
       - 'connectors/sinks/README.md'
+      - Apache Iceberg Sink: connectors/sinks/apache-iceberg-sink.md
       - CSV Sink: connectors/sinks/csv-sink.md
       - InfluxDB v3 Sink: connectors/sinks/influxdb3-sink.md
       - Creating a Custom Sink: connectors/sinks/custom-sinks.md
15 changes: 12 additions & 3 deletions quixstreams/sinks/community/iceberg.py
@@ -17,7 +17,7 @@
     from pyiceberg.partitioning import PartitionSpec, PartitionField
     from pyiceberg.schema import Schema, NestedField
     from pyiceberg.types import StringType, TimestampType
-    from pyiceberg.exceptions import CommitFailedException # Import the exception
+    from pyiceberg.exceptions import CommitFailedException
 except ImportError as exc:
     raise ImportError(
         f"Package {exc.name} is missing: "
@@ -101,12 +101,15 @@ class IcebergSink(BatchingSink):
     Example setup using an AWS-hosted Iceberg with AWS Glue:
 
     ```
-    from quixstreams.sinks.community.iceberg import IcebergSink, IcebergAWSConfig
+    from quixstreams import Application
+    from quixstreams.sinks.community.iceberg import IcebergSink, AWSIcebergConfig
 
-    iceberg_config = IcebergAWSConfig(
+    # Configure S3 bucket credentials
+    iceberg_config = AWSIcebergConfig(
         aws_s3_uri="", aws_region="", aws_access_key_id="", aws_secret_access_key=""
     )
 
+    # Configure the sink to write data to S3 with the AWS Glue catalog spec
     iceberg_sink = IcebergSink(
         table_name="glue.sink-test",
         config=iceberg_config,
@@ -115,10 +118,16 @@
 
     app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
     topic = app.topic('sink_topic')
+
+    # Do some processing here
     sdf = app.dataframe(topic=topic).print(metadata=True)
+
+    # Sink results to the IcebergSink
     sdf.sink(iceberg_sink)
 
+
     if __name__ == "__main__":
+        # Start the application
         app.run()
     ```
     """