DE-6354: Add Kafka-Iceberg with Flink example code (#16)
* Updates

* update

* Added readme, sample SQL files, fixed deps, etc

* Update readme

* update readme
rmoff authored Jul 2, 2024
1 parent 4103185 commit 36243a8
Showing 9 changed files with 497 additions and 58 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -30,8 +30,8 @@ This repository contains examples of use cases that utilize Decodable streaming
|[Logical Decoding Message Examples](postgres-logical-decoding)| We show how to retrieve logical decoding messages from the Postgres WAL |
|[GitHub Webhooks](github-webhooks)| We show how to process GitHub Webhook events using the Decodable REST source connector |
|[PyFlink](pyflink)| We run a basic PyFlink job on Kubernetes |
|[Kafka / Flink / Iceberg](kafka-iceberg/apache-flink)| Integrating Apache Kafka with Apache Iceberg through Apache Flink. As presented at Kafka Summit London 2024|
|[Kafka / Flink / Iceberg](kafka-iceberg/decodable)| Streaming from Apache Kafka to Apache Iceberg with Decodable|
|[Kafka / Flink / Iceberg](kafka-iceberg/apache-flink)| Integrating Apache Kafka with Apache Iceberg through Apache Flink. _As presented at Kafka Summit London 2024_|
|[Kafka / Flink / Iceberg](kafka-iceberg/decodable) (with Decodable)| Streaming from Apache Kafka to Apache Iceberg with Decodable|
|[Flink SQL Troubleshooting](troubleshooting-flinksql)| A set of Docker Compose environments for demonstrating various Flink SQL troubleshooting scenarios (see [related blog](https://www.decodable.co/blog/flink-sql-misconfiguration-misunderstanding-and-mishaps?utm_medium=github&utm_source=examples_repo&utm_campaign=blog&utm_content=troubleshooting-flinksql))|
|[Array Aggregation](array-agg)| Using the `array_agg()` UDF for denormalizing data in a pipeline from MySQL to OpenSearch |
|[Kafka with ngrok](kafka-ngrok)| Docker Compose for running Apache Kafka locally, accessible from the internet using ngrok|
334 changes: 334 additions & 0 deletions kafka-iceberg/apache-flink/README.md

Large diffs are not rendered by default.

35 changes: 21 additions & 14 deletions kafka-iceberg/apache-flink/docker-compose.yml
@@ -1,5 +1,3 @@
version: "3.8"

services:
jobmanager:
build: ./flink
@@ -8,6 +6,8 @@ services:
ports:
- "8081:8081"
command: jobmanager
volumes:
- .:/data/
environment:
- |
FLINK_PROPERTIES=
@@ -27,15 +27,6 @@ services:
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 4
# flink:
#container_name: flink
#build: ./flink
#depends_on:
#- minio
#- hive-metastore
#ports:
#- "8081:8081"
zookeeper:
image: confluentinc/cp-zookeeper:7.5.1
container_name: zookeeper
@@ -92,6 +83,22 @@ services:
environment:
- HMS_LOGLEVEL=INFO

pyiceberg:
image: python:3.12-bookworm
container_name: pyiceberg
environment:
PYICEBERG_CATALOG__DEFAULT__URI: thrift://hms:9083
PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID: admin
PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY: password
PYICEBERG_CATALOG__DEFAULT__S3__PATH_STYLE_ACCESS: true
PYICEBERG_CATALOG__DEFAULT__S3__ENDPOINT: http://minio:9000
entrypoint: >
/bin/sh -c "
pip install pyiceberg["s3fs,hive,pyarrow"];
sleep infinity
"
duckdb:
image: davidgasquez/duckdb
container_name: duckdb
@@ -105,15 +112,15 @@
entrypoint: tail -f /dev/null

shadowtraffic:
image: shadowtraffic/shadowtraffic:0.1.35
# watch 'docker exec shadowtraffic curl -s localhost:9400/metrics |grep events_sent'
image: shadowtraffic/shadowtraffic:0.6.0
container_name: shadowtraffic
# profiles: ["shadowtraffic"]
env_file:
- shadowtraffic/license.env
volumes:
- ./shadowtraffic:/data
command: --show-progress --config /data/kafka-retail.json

command: --config /data/kafka-retail.json

# Without a network explicitly defined, you hit this Hive/Thrift error
# java.net.URISyntaxException Illegal character in hostname
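As an aside, once the pipeline has written some data, the `duckdb` container defined above can be used to peek at the Iceberg files landed in MinIO. This is only a sketch — the `httpfs` and `iceberg` extensions must be available in the image, and the table path under `s3://warehouse` is an assumption to be checked against what the Hive Metastore actually reports:

```sql
-- Run inside the duckdb container; extension availability is assumed
INSTALL httpfs;  LOAD httpfs;
INSTALL iceberg; LOAD iceberg;

-- MinIO credentials as used elsewhere in this Compose stack
SET s3_endpoint = 'minio:9000';
SET s3_access_key_id = 'admin';
SET s3_secret_access_key = 'password';
SET s3_use_ssl = false;
SET s3_url_style = 'path';

-- Hypothetical table location; confirm the real path via the Hive Metastore
SELECT count(*) FROM iceberg_scan('s3://warehouse/default.db/t_i_orders');
```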
8 changes: 1 addition & 7 deletions kafka-iceberg/apache-flink/flink/Dockerfile
@@ -10,7 +10,7 @@ RUN echo "Purge apt artifacts" && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

RUN wget https://github.com/duckdb/duckdb/releases/download/v0.10.0/duckdb_cli-linux-amd64.zip \
RUN wget https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-linux-amd64.zip \
&& unzip duckdb_cli-linux-amd64.zip -d /usr/local/bin \
&& rm duckdb_cli-linux-amd64.zip

@@ -20,18 +20,12 @@ WORKDIR /opt/flink

# Set up Hive config
COPY conf/hive-site.xml ./conf/hive-site.xml
# Pre-seed the SQL history because I'm nice like that
#COPY flink-sql-history /root/.flink-sql-history

# Enable SQL Client to find the job manager when running it from this image
RUN sed -i "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: flink-jobmanager/g" ./conf/flink-conf.yaml

# Install JARs

RUN echo "Add Flink S3 Plugin" && \
mkdir ./plugins/s3-fs-hadoop && \
cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/

RUN echo "-> Install JARs: Flink's Kafka connector" && \
mkdir -p ./lib/kafka && pushd $_ && \
curl https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar -O && \
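For context, the `hive-site.xml` copied into the image above is what lets Flink resolve the Hive Metastore. A Flink SQL catalog declared against it might look roughly like this (a sketch — the catalog name is arbitrary, and the options simply mirror the ones used in the CTAS further down):

```sql
-- Hypothetical catalog definition; options mirror those in kafka-to-iceberg.sql
CREATE CATALOG c_iceberg_hive WITH (
    'type'          = 'iceberg',
    'catalog-type'  = 'hive',
    'warehouse'     = 's3a://warehouse',
    'hive-conf-dir' = './conf'
);

USE CATALOG c_iceberg_hive;
```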
11 changes: 11 additions & 0 deletions kafka-iceberg/apache-flink/kafka-to-iceberg-multi-statement.sql
@@ -0,0 +1,11 @@
INSERT INTO t_i_orders
SELECT *
FROM t_k_orders
WHERE cost > 100
UNION ALL
SELECT ko.*
FROM t_k_orders ko
INNER JOIN
customers c
ON ko.customerId = c.customerId
WHERE c.vip_status = 'Gold';
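Note that the second branch of the `UNION ALL` joins against a `customers` table that is not defined in this file. A hypothetical Kafka-backed definition it could resolve against is sketched below — the column list and connector options are assumptions, not part of the commit:

```sql
-- Hypothetical lookup table; adjust columns and options to match the real customers topic
CREATE TABLE customers (
    customerId STRING,
    name       STRING,
    vip_status STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'customers',
    'properties.bootstrap.servers' = 'broker:29092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
```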
34 changes: 34 additions & 0 deletions kafka-iceberg/apache-flink/kafka-to-iceberg.sql
@@ -0,0 +1,34 @@
CREATE TABLE t_k_orders
(
orderId STRING,
customerId STRING,
orderNumber INT,
product STRING,
backordered BOOLEAN,
cost FLOAT,
description STRING,
create_ts BIGINT,
creditCardNumber STRING,
discountPercent INT
) WITH (
'connector' = 'kafka',
'topic' = 'orders',
'properties.bootstrap.servers' = 'broker:29092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);


SET 'execution.checkpointing.interval' = '60sec';
SET 'pipeline.operator-chaining.enabled' = 'false';

CREATE TABLE t_i_orders
WITH (
'connector' = 'iceberg',
'catalog-type'='hive',
'catalog-name'='dev',
'warehouse' = 's3a://warehouse',
'hive-conf-dir' = './conf')
AS
SELECT * FROM t_k_orders
WHERE cost > 100;
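Once the CTAS job above has taken its first checkpoint, a quick check from the same SQL Client session should show rows arriving in the Iceberg table (a sketch; actual counts depend on the generated data):

```sql
-- Verify that filtered orders are landing in Iceberg
SELECT COUNT(*) AS order_count,
       MIN(cost) AS min_cost,
       MAX(cost) AS max_cost
FROM   t_i_orders;
```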
7 changes: 6 additions & 1 deletion kafka-iceberg/apache-flink/ksl-demo.adoc
@@ -15,6 +15,11 @@ docker compose up -d

== Demo 1: Exploring the SQL Client

[source,bash]
----
docker compose exec -it jobmanager bash -c "./bin/sql-client.sh"
----
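A quick sanity check that the client can reach the JobManager (editor's sketch; with nothing created yet, the table list will be empty):

[source,sql]
----
SHOW CATALOGS;
SHOW TABLES;
----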

=== Result Mode

==== table
@@ -188,7 +193,7 @@ CREATE TABLE t_i_orders
WHERE cost > 100;
----

View the Flink dashboard: http://localhost:8081/=/overview
View the Flink dashboard: http://localhost:8081/

View the MinIO browser and see that there are objects there: http://localhost:9001 (login:admin / password:password)

96 changes: 78 additions & 18 deletions kafka-iceberg/apache-flink/shadowtraffic/kafka-retail.json
@@ -1,18 +1,43 @@
{
"globalConfigs": {
"throttle": {
"ms": {
"_gen": "uniformDistribution",
"bounds": [50, 200]
}
"throttleMs": {
"_gen": "uniformDistribution",
"bounds": [
500,
2000
]
}
},

"schedule": {
"stages": [
{
"generators": [
"customers"
],
"overrides": {
"customers": {
"localConfigs": {
"maxEvents": 20,
"throttleMs": 0
}
}
}
},
{
"generators": [
"orders"
]
}
]
},
"generators": [
{
"name": "customers",
"topic": "customers",
"value": {
"customerId": { "_gen": "uuid" },
"customerId": {
"_gen": "uuid"
},
"name": {
"_gen": "string",
"expr": "#{Name.fullName}"
@@ -26,7 +51,11 @@
},
"membershipLevel": {
"_gen": "oneOf",
"choices": ["free", "pro", "elite"]
"choices": [
"free",
"pro",
"elite"
]
},
"shippingAddress": {
"_gen": "string",
@@ -36,45 +65,76 @@
"_gen": "formatDateTime",
"ms": {
"_gen": "uniformDistribution",
"bounds": [1710176905, { "_gen": "now" }]
"bounds": [
1710176905,
{
"_gen": "now"
}
]
}
}
}
},
{
"localConfigs": {
"maxMs": 3600000
},
"name": "orders",
"topic": "orders",
"value": {
"orderId": { "_gen": "uuid" },
"orderId": {
"_gen": "uuid"
},
"customerId": {
"_gen": "lookup",
"topic": "customers",
"path": ["value", "customerId"]
"path": [
"value",
"customerId"
]
},
"orderNumber": {
"_gen": "sequentialInteger"
},
"product": { "_gen": "string", "expr": "#{Commerce.productName}" },
"product": {
"_gen": "string",
"expr": "#{Commerce.productName}"
},
"backordered": {
"_gen": "weightedOneOf",
"choices": [
{ "weight": 19, "value": false },
{ "weight": 1, "value": true }
{
"weight": 19,
"value": false
},
{
"weight": 1,
"value": true
}
]
},
"cost": {
"_gen": "normalDistribution",
"mean": 100,
"sd": 20
},
"description": { "_gen": "string", "expr": "#{Lorem.paragraph}" },
"create_ts": { "_gen": "now" },
"description": {
"_gen": "string",
"expr": "#{Lorem.paragraph}"
},
"create_ts": {
"_gen": "now"
},
"creditCardNumber": {
"_gen": "string",
"expr": "#{Business.creditCardNumber}"
},
"discountPercent": {
"_gen": "uniformDistribution",
"bounds": [0, 10],
"bounds": [
0,
10
],
"decimals": 0
}
}
@@ -90,4 +150,4 @@
}
}
}
}
}
26 changes: 10 additions & 16 deletions kafka-iceberg/decodable/shadowtraffic/config.json
@@ -66,14 +66,11 @@
"expr": "#{Commerce.productName}"
},
"quantity": {
"_gen": "integer",
"n": {
"_gen": "uniformDistribution",
"bounds": [
1,
5
]
}
"_gen": "uniformDistribution",
"bounds": [
1,
5
]
},
"unitPrice": {
"_gen": "uniformDistribution",
@@ -105,14 +102,11 @@
]
},
{
"_gen": "integer",
"n": {
"_gen": "uniformDistribution",
"bounds": [
60000,
600000
]
}
"_gen": "uniformDistribution",
"bounds": [
60000,
600000
]
}
]
}
