diff --git a/README.md b/README.md
index 9bcdc30..4766e19 100644
--- a/README.md
+++ b/README.md
@@ -30,8 +30,8 @@ This repository contains examples of use cases that utilize Decodable streaming
 |[ Logical Decoding Message Examples](postgres-logical-decoding)| We show how to retrieve logical decoding messages from the Postgres WAL |
 |[GitHub Webhooks](github-webhooks)| We show how to process GitHub Webhook events using the Decodable REST source connector |
 |[PyFlink](pyflink)| We run a basic PyFlink job on Kubernetes |
-|[Kafka / Flink / Iceberg](kafka-iceberg/apache-flink)| Integrating Apache Kafka with Apache Iceberg through Apache Flink. As presented at Kafka Summit London 2024|
-|[Kafka / Flink / Iceberg](kafka-iceberg/decodable)| Streaming from Apache Kafka to Apache Iceberg with Decodable|
+|[Kafka / Flink / Iceberg](kafka-iceberg/apache-flink)| Integrating Apache Kafka with Apache Iceberg through Apache Flink. _As presented at Kafka Summit London 2024_|
+|[Kafka / Flink / Iceberg](kafka-iceberg/decodable) (with Decodable)| Streaming from Apache Kafka to Apache Iceberg with Decodable|
 |[Flink SQL Troubleshooting](troubleshooting-flinksql)| A set of Docker Compose environments for demonstrating various Flink SQL troubleshooting scenarios (see [related blog](https://www.decodable.co/blog/flink-sql-misconfiguration-misunderstanding-and-mishaps?utm_medium=github&utm_source=examples_repo&utm_campaign=blog&utm_content=troubleshooting-flinksql))|
 |[Array Aggregation](array-agg)| Using the `array_agg()` UDF for denormalizing data in a pipeline from MySQL to OpenSearch |
 |[Kafka with ngrok](kafka-ngrok)| Docker Compose for running Apache Kafka locally, accessible from the internet using ngrok|
diff --git a/kafka-iceberg/apache-flink/README.md b/kafka-iceberg/apache-flink/README.md
new file mode 100644
index 0000000..7c47ef4
--- /dev/null
+++ b/kafka-iceberg/apache-flink/README.md
@@ -0,0 +1,334 @@
+# Streaming data from Kafka to Iceberg with Apache Flink
+
+_👉 See the supporting blog post at https://www.decodable.co/blog/_
+
+## Run it all
+
+The end-to-end example does the following:
+
+* Brings up a Flink cluster, Kafka broker, and MinIO object store
+* Generates dummy data to the Kafka `orders` topic
+* Uses Flink SQL to write the Kafka `orders` topic to a table in Iceberg format on MinIO
+
+_NB. test data is generated using [ShadowTraffic](https://shadowtraffic.io/). You can get a free trial licence; put your `license.env` file in the `shadowtraffic` folder. If you don't want to use ShadowTraffic, you can produce your own dummy data to the Kafka `orders` topic instead._
+
+```bash
+# Bring up the stack
+docker compose up
+
+# Once launched, run this:
+docker compose exec -it jobmanager bash -c "./bin/sql-client.sh -f /data/kafka-to-iceberg.sql"
+
+# Check for data (you should see a mix of parquet, json, and avro files under default_database.db/t_i_orders):
+docker compose exec mc bash -c \
+    "mc ls -r minio/warehouse/"
+```
+
+Check the data in DuckDB:
+
+1. Build a query that reads from the latest metadata file
+
+   ```bash
+   docker exec mc bash -c \
+       "mc ls -r minio/warehouse/" | grep orders | grep json | tail -n1 | \
+       awk '{print "SELECT count(*), strftime(to_timestamp(max(create_ts)/1000),'\''%Y-%m-%d %H:%M:%S'\'') as max_ts, \n avg(cost), min(cost) \n FROM iceberg_scan('\''s3://warehouse/" $6"'\'');"}'
+   ```
+
+2. Run it
+
+   ```sql
+   ⚫◗ SELECT count(*), strftime(to_timestamp(max(create_ts)/1000),'%Y-%m-%d %H:%M:%S') as max_ts,
+             avg(cost), min(cost)
+      FROM iceberg_scan('s3://warehouse/default_database.db/t_i_orders/metadata/00002-46e26aab-b843-4d45-aa2d-66804870a39e.metadata.json');
+   ┌──────────────┬─────────────────────┬───────────────────┬─────────────┐
+   │ count_star() │       max_ts        │    avg("cost")    │ min("cost") │
+   │    int64     │       varchar       │      double       │    float    │
+   ├──────────────┼─────────────────────┼───────────────────┼─────────────┤
+   │           37 │ 2024-06-28 16:40:46 │ 115.5715142327386 │  100.209236 │
+   └──────────────┴─────────────────────┴───────────────────┴─────────────┘
+   ```
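+_NB. the query above assumes a DuckDB session in which the `httpfs` and `iceberg` extensions are loaded and the MinIO credentials are registered. The [Use DuckDB to query the data](#use-duckdb-to-query-the-data) section below walks through this; for quick reference, the setup is:_
+
+```sql
+-- Same setup as in the step-by-step DuckDB section below
+INSTALL httpfs;
+INSTALL iceberg;
+LOAD httpfs;
+LOAD iceberg;
+CREATE SECRET secret1 (
+    TYPE S3,
+    KEY_ID 'admin',
+    SECRET 'password',
+    REGION 'us-east-1',
+    ENDPOINT 'minio:9000',
+    URL_STYLE 'path',
+    USE_SSL 'false'
+);
+```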
+## Step-by-step
+
+### Set up Kafka source
+
+```sql
+CREATE TABLE t_k_orders
+  (
+    orderId STRING,
+    customerId STRING,
+    orderNumber INT,
+    product STRING,
+    backordered BOOLEAN,
+    cost FLOAT,
+    description STRING,
+    create_ts BIGINT,
+    creditCardNumber STRING,
+    discountPercent INT
+  ) WITH (
+    'connector' = 'kafka',
+    'topic' = 'orders',
+    'properties.bootstrap.servers' = 'broker:29092',
+    'scan.startup.mode' = 'earliest-offset',
+    'format' = 'json'
+  );
+```
+
+```sql
+SELECT * FROM t_k_orders LIMIT 10;
+```
+
+```
+                                         SQL Query Result (Table)
+ Refresh: 1 s                               Page: Last of 1                        Updated: 15:20:43.336
+
+ orderId                        customerId                     orderNumber product                   backordered cost
+ 89a3bf3e-12e5-4386-ff1e-2de88~ f2e72581-d19b-6253-aa52-ce57f~           0 Intelligent Granite Chair       FALSE 130.58978 Blanditiis qu
+ fb9b04bd-d1a5-43dc-fa90-1ed75~ 5d3f2d00-8715-7b8d-1abd-7db76~           1 Mediocre Silk Bench             FALSE  79.25486 Fuga reprehen
+ 4fee16e6-1326-6aa6-2a8a-b3919~ 21df0e3c-5e43-cd00-1c3e-258d0~           2 Aerodynamic Aluminum Coat       FALSE  83.89926 Possimus labo
+ de7c84ca-8b7b-c13a-4fb8-16179~ b1303f73-5ce4-7da3-ab41-d9b04~           3 Gorgeous Plastic Bottle         FALSE 140.99934 Vero explicab
+ 67ab9269-b0ff-7e7f-deba-eaf8a~ 5d3f2d00-8715-7b8d-1abd-7db76~           4 Practical Plastic Hat           FALSE  86.2369  Placeat nemo
+ 3e9b5fc5-d5d6-62b4-3abb-244f0~ b1303f73-5ce4-7da3-ab41-d9b04~           5 Fantastic Granite Hat           FALSE 106.13418 Quod numquam
+ 58af6095-3aa5-eca8-c00c-98dfa~ 6878a7d0-1bb4-5817-485a-c6b85~           6 Gorgeous Iron Bag               FALSE  94.56349 Dolorem magna
+ 0562400e-7b51-ccbb-85a1-09349~ 5d3f2d00-8715-7b8d-1abd-7db76~           7 Enormous Silk Hat               FALSE 106.08421 Fugit omnis l
+ 2d772926-979d-d054-5bb0-e867f~ 6878a7d0-1bb4-5817-485a-c6b85~           8 Heavy Duty Bronze Lamp          FALSE  67.12055 Nobis tempori
+ b76e8915-7922-cd3b-0486-f4a42~ fd11ce95-358b-c682-994c-27246~           9 Intelligent Linen Watch         FALSE 103.01574 Consequatur v
+
+```
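+_Optional: `create_ts` is an epoch-milliseconds `BIGINT`. If you want to use it as a real event-time attribute (for example for windowed aggregations), one approach is a derived table with a computed column and a watermark. This is a sketch, not needed for the rest of the walkthrough; `t_k_orders_ts` is just an illustrative name:_
+
+```sql
+-- Derive a copy of the Kafka source table that exposes create_ts as a
+-- TIMESTAMP_LTZ(3) event-time column with a 5-second watermark.
+CREATE TABLE t_k_orders_ts (
+    create_time AS TO_TIMESTAMP_LTZ(create_ts, 3),
+    WATERMARK FOR create_time AS create_time - INTERVAL '5' SECOND
+) LIKE t_k_orders;
+```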
+### Set up Iceberg sink
+
+Set checkpointing to run every minute (the Iceberg sink commits, and so creates a new snapshot, on each checkpoint):
+
+```sql
+SET 'execution.checkpointing.interval' = '60sec';
+```
+
+Set this so that the operators are shown separately in the Flink Web UI:
+
+```sql
+SET 'pipeline.operator-chaining.enabled' = 'false';
+```
+
+Create the Iceberg table, populated with a filtered stream from the Kafka source table:
+
+```sql
+CREATE TABLE t_i_orders
+  WITH (
+    'connector' = 'iceberg',
+    'catalog-type'='hive',
+    'catalog-name'='dev',
+    'warehouse' = 's3a://warehouse',
+    'hive-conf-dir' = './conf')
+  AS
+  SELECT * FROM t_k_orders
+  WHERE cost > 100;
+```
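+_Note: `CREATE TABLE … AS SELECT` creates the Iceberg table and submits the continuous streaming job in one statement. If the table already exists (for example when re-running the demo), a plain `INSERT INTO` starts the same job without the DDL; a sketch:_
+
+```sql
+-- Assumes t_i_orders was already created by a previous run
+INSERT INTO t_i_orders
+SELECT * FROM t_k_orders
+WHERE cost > 100;
+```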
+### Examine the data in MinIO
+
+Check the data:
+
+```bash
+❯ docker exec mc bash -c \
+        "mc ls -r minio/warehouse/"
+[2024-06-28 15:23:45 UTC] 6.3KiB STANDARD default_database.db/t_i_orders/data/00000-0-131b86c6-f4fc-4f26-9541-674ec3101ea8-00001.parquet
+[2024-06-28 15:22:55 UTC] 2.0KiB STANDARD default_database.db/t_i_orders/metadata/00000-59d5c01b-1ab2-457b-9365-bf1cd056bf1d.metadata.json
+[2024-06-28 15:23:47 UTC] 3.1KiB STANDARD default_database.db/t_i_orders/metadata/00001-5affbf21-7bb7-4360-9d65-d547211d63ab.metadata.json
+[2024-06-28 15:23:46 UTC] 7.2KiB STANDARD default_database.db/t_i_orders/metadata/6bf97c2e-0e10-410f-8db8-c6cc279e3475-m0.avro
+[2024-06-28 15:23:46 UTC] 4.1KiB STANDARD default_database.db/t_i_orders/metadata/snap-3773022978137163897-1-6bf97c2e-0e10-410f-8db8-c6cc279e3475.avro
+```
+
+### Look at the data with PyIceberg
+
+```bash
+docker compose exec pyiceberg bash
+```
+
+```bash
+root@3e3ebb9c0be1:/# pyiceberg list
+default
+default_database
+
+root@3e3ebb9c0be1:/# pyiceberg list default_database
+default_database.t_i_orders
+
+root@3e3ebb9c0be1:/# pyiceberg describe default_database.t_i_orders
+Table format version  2
+Metadata location     s3a://warehouse/default_database.db/t_i_orders/metadata/00010-e7d5499e-f73c-4ff3-a036-f17f644ac1ca.metadata.json
+Table UUID            72b165e4-11f9-4a75-8a1b-e1bbfde06bae
+Last Updated          1719842483459
+Partition spec        []
+Sort order            []
+Current schema        Schema, id=0
+                      ├── 1: orderId: optional string
+                      ├── 2: customerId: optional string
+                      ├── 3: orderNumber: optional int
+                      ├── 4: product: optional string
+                      ├── 5: backordered: optional boolean
+                      ├── 6: cost: optional float
+                      ├── 7: description: optional string
+                      ├── 8: create_ts: optional long
+                      ├── 9: creditCardNumber: optional string
+                      └── 10: discountPercent: optional int
+Current snapshot      Operation.APPEND: id=9116831331988708639, parent_id=9098627110859091234, schema_id=0
+Snapshots             Snapshots
+                      ├── Snapshot 5681413802900792746, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-5681413802900792746-1-f7670cb3-af47-478d-a90a-0b4e0074aabe.avro
+                      ├── Snapshot 3079059435875923863, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-3079059435875923863-1-42e24305-3c5f-4eea-9df3-2bf529704740.avro
+                      ├── Snapshot 1110224315320183294, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-1110224315320183294-1-08ba7134-ab55-4ae2-995f-085f83b62a05.avro
+                      ├── Snapshot 5859436771394135890, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-5859436771394135890-1-8c1bbf78-3f8e-4d7e-b444-107874a29360.avro
+                      ├── Snapshot 8505813483884320524, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-8505813483884320524-1-3f2f0738-67a2-4807-8565-dedd67cddb12.avro
+                      ├── Snapshot 4956548979990641944, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-4956548979990641944-1-e669a94c-805c-4f85-89d3-3be3bad231f9.avro
+                      ├── Snapshot 2916878419900541694, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-2916878419900541694-1-c39affb0-81b0-4f37-93be-198651dcd432.avro
+                      ├── Snapshot 2521637909894096219, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-2521637909894096219-1-fa225a5f-a609-4844-95e6-6ccf16bb32f0.avro
+                      ├── Snapshot 9098627110859091234, schema 0:
+                      │   s3a://warehouse/default_database.db/t_i_orders/metadata/snap-9098627110859091234-1-a76147f2-4162-46df-968e-5192fbf6edaf.avro
+                      └── Snapshot 9116831331988708639, schema 0:
+                          s3a://warehouse/default_database.db/t_i_orders/metadata/snap-9116831331988708639-1-022a8006-ae0c-48c1-a61c-de9f3ca8daee.avro
+Properties            hive-conf-dir                    ./conf
+                      connector                        iceberg
+                      write.parquet.compression-codec  zstd
+                      catalog-type                     hive
+                      catalog-name                     dev
+                      warehouse                        s3a://warehouse
+root@3e3ebb9c0be1:/#
+```
+
+### Use DuckDB to query the data
+
+Start DuckDB in the `jobmanager` container:
+
+```bash
+docker exec -it jobmanager bash -c "duckdb"
+```
+
+Install the required extensions and configure the S3/MinIO connection:
+
+```sql
+.prompt '⚫◗ '
+INSTALL httpfs;
+INSTALL iceberg;
+LOAD httpfs;
+LOAD iceberg;
+CREATE SECRET secret1 (
+    TYPE S3,
+    KEY_ID 'admin',
+    SECRET 'password',
+    REGION 'us-east-1',
+    ENDPOINT 'minio:9000',
+    URL_STYLE 'path',
+    USE_SSL 'false'
+);
+```
+
+Run this bash snippet to generate a DuckDB SQL statement that queries the latest version of the Iceberg table ([ref](https://duckdb.org/docs/guides/import/s3_iceberg_import#loading-iceberg-tables-from-s3)):
+
+```bash
+docker exec mc bash -c \
+    "mc ls -r minio/warehouse/" | grep orders | grep json | tail -n1 | \
+    awk '{print "SELECT count(*) AS row_ct, strftime(to_timestamp(max(create_ts)/1000),'\''%Y-%m-%d %H:%M:%S'\'') AS max_ts, \n AVG(cost) AS avg_cost, MIN(cost) AS min_cost \n FROM iceberg_scan('\''s3://warehouse/" $6"'\'');"}'
+```
+
+```sql
+⚫◗ SELECT count(*), strftime(to_timestamp(max(create_ts)/1000),'%Y-%m-%d %H:%M:%S') as max_ts,
+          avg(cost), min(cost)
+   FROM iceberg_scan('s3://warehouse/default_database.db/t_i_orders/metadata/00001-5affbf21-7bb7-4360-9d65-d547211d63ab.metadata.json');
+┌──────────────┬─────────────────────┬────────────────────┬─────────────┐
+│ count_star() │       max_ts        │    avg("cost")     │ min("cost") │
+│    int64     │       varchar       │       double       │    float    │
+├──────────────┼─────────────────────┼────────────────────┼─────────────┤
+│           23 │ 2024-06-28 15:11:11 │ 119.38902548085089 │   103.01574 │
+└──────────────┴─────────────────────┴────────────────────┴─────────────┘
+```
+
+Wait for the next checkpoint, then get the latest metadata file for the Iceberg table:
+
+```bash
+docker exec mc bash -c \
+    "mc ls -r minio/warehouse/" | grep orders | grep json | tail -n1 | \
+    awk '{print "SELECT count(*), strftime(to_timestamp(max(create_ts)/1000),'\''%Y-%m-%d %H:%M:%S'\'') as max_ts, \n avg(cost), min(cost) \n FROM iceberg_scan('\''s3://warehouse/" $6"'\'');"}'
+```
+
+Run it to see the changed data:
+
+```sql
+⚫◗ SELECT count(*), strftime(to_timestamp(max(create_ts)/1000),'%Y-%m-%d %H:%M:%S') as max_ts,
+          avg(cost), min(cost)
+   FROM iceberg_scan('s3://warehouse/default_database.db/t_i_orders/metadata/00003-36444b19-3cd6-4c06-ab77-b05e14af40c5.metadata.json');
+┌──────────────┬─────────────────────┬───────────────────┬─────────────┐
+│ count_star() │       max_ts        │    avg("cost")    │ min("cost") │
+│    int64     │       varchar       │      double       │    float    │
+├──────────────┼─────────────────────┼───────────────────┼─────────────┤
+│           43 │ 2024-06-28 15:35:18 │ 117.6643137377362 │   100.03383 │
+└──────────────┴─────────────────────┴───────────────────┴─────────────┘
+```
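+_Aside: the DuckDB `iceberg` extension also exposes `iceberg_snapshots()`, which lists a table's snapshot history so you don't have to eyeball the MinIO listing. A sketch, assuming it accepts the same explicit metadata file path that `iceberg_scan()` does above (scanning the bare table directory instead relies on a `version-hint.text` file, which not every writer produces):_
+
+```sql
+-- List the snapshots recorded in the table's metadata (path as used above)
+SELECT * FROM iceberg_snapshots('s3://warehouse/default_database.db/t_i_orders/metadata/00003-36444b19-3cd6-4c06-ab77-b05e14af40c5.metadata.json');
+```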
+## Smoke-Testing Flink Dependencies for Iceberg (with Hive metastore)
+
+Create a test table from a dummy source:
+
+```sql
+CREATE TABLE iceberg_test WITH (
+  'connector' = 'iceberg',
+  'catalog-type'='hive',
+  'catalog-name'='dev',
+  'warehouse' = 's3a://warehouse',
+  'hive-conf-dir' = './conf')
+AS
+  SELECT name, COUNT(*) AS cnt
+  FROM (VALUES ('Never'), ('Gonna'), ('Give'), ('You'), ('Up')) AS NameTable(name)
+  GROUP BY name;
+```
+
+## Changing Iceberg table config
+
+Iceberg table properties can be set in the `WITH` clause when creating the table; see the [Iceberg write properties documentation](https://iceberg.apache.org/docs/1.5.2/configuration/#write-properties).
+
+e.g. `'write.format.default'='orc'`:
+
+```sql
+CREATE TABLE iceberg_test WITH (
+  'connector' = 'iceberg',
+  'catalog-type'='hive',
+  'catalog-name'='dev',
+  'warehouse' = 's3a://warehouse',
+  'hive-conf-dir' = './conf',
+  'write.format.default'='orc')
+AS
+  SELECT name, COUNT(*) AS cnt
+  FROM (VALUES ('Never'), ('Gonna'), ('Give'), ('You'), ('Up')) AS NameTable(name)
+  GROUP BY name;
+```
+
+```bash
+❯ docker exec mc bash -c \
+    "mc ls -r minio/warehouse/"
+[2024-07-01 10:41:49 UTC]   398B STANDARD default_database.db/iceberg_test/data/00000-0-023674bd-dc7d-4249-8c50-8c1238881e57-00001.orc
+[2024-07-01 10:41:44 UTC] 1.2KiB STANDARD default_database.db/iceberg_test/metadata/00000-bf7cc294-fe04-4e2d-af8b-722e20cfca97.metadata.json
+[2024-07-01 10:41:50 UTC] 2.4KiB STANDARD default_database.db/iceberg_test/metadata/00001-0f8296eb-8e0b-4c0b-b7ab-c3bbbbcf2ff9.metadata.json
+[2024-07-01 10:41:49 UTC] 6.5KiB STANDARD default_database.db/iceberg_test/metadata/279b6a97-ac90-492c-bbe7-7514af4f2a36-m0.avro
+[2024-07-01 10:41:50 UTC] 4.1KiB STANDARD default_database.db/iceberg_test/metadata/snap-2795270994728078488-1-279b6a97-ac90-492c-bbe7-7514af4f2a36.avro
+```
+
+Change an existing table:
+
+```sql
+Flink SQL> ALTER TABLE iceberg_test SET ('write.format.default'='avro');
+[INFO] Execute statement succeed.
+```
+
+or reset it to its default value:
+
+```sql
+Flink SQL> ALTER TABLE iceberg_test RESET ('write.format.default');
+[INFO] Execute statement succeed.
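+
+-- Not part of the original transcript: to confirm which value is now in
+-- effect, you can inspect the table's DDL and properties (output omitted):
+Flink SQL> SHOW CREATE TABLE iceberg_test;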
+``` \ No newline at end of file diff --git a/kafka-iceberg/apache-flink/docker-compose.yml b/kafka-iceberg/apache-flink/docker-compose.yml index d64355a..22dffdc 100644 --- a/kafka-iceberg/apache-flink/docker-compose.yml +++ b/kafka-iceberg/apache-flink/docker-compose.yml @@ -1,5 +1,3 @@ -version: "3.8" - services: jobmanager: build: ./flink @@ -8,6 +6,8 @@ services: ports: - "8081:8081" command: jobmanager + volumes: + - .:/data/ environment: - | FLINK_PROPERTIES= @@ -27,15 +27,6 @@ services: jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 4 - # flink: - #container_name: flink - #build: ./flink - #depends_on: - #- minio - #- hive-metastore - #ports: - #- "8081:8081" - zookeeper: image: confluentinc/cp-zookeeper:7.5.1 container_name: zookeeper @@ -92,6 +83,22 @@ services: environment: - HMS_LOGLEVEL=INFO + pyiceberg: + image: python:3.12-bookworm + container_name: pyiceberg + environment: + PYICEBERG_CATALOG__DEFAULT__URI: thrift://hms:9083 + PYICEBERG_CATALOG__DEFAULT__S3__ACCESS_KEY_ID: admin + PYICEBERG_CATALOG__DEFAULT__S3__SECRET_ACCESS_KEY: password + PYICEBERG_CATALOG__DEFAULT__S3__PATH_STYLE_ACCESS: true + PYICEBERG_CATALOG__DEFAULT__S3__ENDPOINT: http://minio:9000 + entrypoint: > + /bin/sh -c " + pip install pyiceberg["s3fs,hive,pyarrow"]; + sleep infinity + " + + duckdb: image: davidgasquez/duckdb container_name: duckdb @@ -105,15 +112,15 @@ services: entrypoint: tail -f /dev/null shadowtraffic: - image: shadowtraffic/shadowtraffic:0.1.35 + # watch 'docker exec shadowtraffic curl -s localhost:9400/metrics |grep events_sent' + image: shadowtraffic/shadowtraffic:0.6.0 container_name: shadowtraffic # profiles: ["shadowtraffic"] env_file: - shadowtraffic/license.env volumes: - ./shadowtraffic:/data - command: --show-progress --config /data/kafka-retail.json - + command: --config /data/kafka-retail.json # Without a network explicitly defined, you hit this Hive/Thrift error # java.net.URISyntaxException Illegal character in hostname diff --git a/kafka-iceberg/apache-flink/flink/Dockerfile b/kafka-iceberg/apache-flink/flink/Dockerfile index bd532ff..1ea329e 100644 --- a/kafka-iceberg/apache-flink/flink/Dockerfile +++ b/kafka-iceberg/apache-flink/flink/Dockerfile @@ -10,7 +10,7 @@ RUN echo "Purge apt artifacts" && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* -RUN wget https://github.com/duckdb/duckdb/releases/download/v0.10.0/duckdb_cli-linux-amd64.zip \ +RUN wget https://github.com/duckdb/duckdb/releases/download/v1.0.0/duckdb_cli-linux-amd64.zip \ && unzip duckdb_cli-linux-amd64.zip -d /usr/local/bin \ && rm duckdb_cli-linux-amd64.zip @@ -20,18 +20,12 @@ WORKDIR /opt/flink # Set up Hive config COPY conf/hive-site.xml ./conf/hive-site.xml -# Pre-seed the SQL history because I'm nice like that -#COPY flink-sql-history /root/.flink-sql-history # Enable SQL Client to find the job manager when running it from this image RUN sed -i "s/jobmanager.rpc.address: localhost/jobmanager.rpc.address: flink-jobmanager/g" ./conf/flink-conf.yaml # Install JARs -RUN echo "Add Flink S3 Plugin" && \ - mkdir ./plugins/s3-fs-hadoop && \ - cp ./opt/flink-s3-fs-hadoop-1.18.1.jar ./plugins/s3-fs-hadoop/ - RUN echo "-> Install JARs: Flink's Kafka connector" && \ mkdir -p ./lib/kafka && pushd $_ && \ curl https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.1.0-1.18/flink-sql-connector-kafka-3.1.0-1.18.jar -O && \ diff --git a/kafka-iceberg/apache-flink/kafka-to-iceberg-multi-statement.sql b/kafka-iceberg/apache-flink/kafka-to-iceberg-multi-statement.sql new file mode 
index 0000000..3d079f1
--- /dev/null
+++ b/kafka-iceberg/apache-flink/kafka-to-iceberg-multi-statement.sql
@@ -0,0 +1,11 @@
+INSERT INTO t_i_orders
+SELECT *
+  FROM t_k_orders
+  WHERE cost > 100
+UNION ALL
+SELECT ko.*
+  FROM t_k_orders ko
+         INNER JOIN
+         customers c
+         ON ko.customerId = c.customerId
+  WHERE c.vip_status = 'Gold';
\ No newline at end of file
diff --git a/kafka-iceberg/apache-flink/kafka-to-iceberg.sql b/kafka-iceberg/apache-flink/kafka-to-iceberg.sql
new file mode 100644
index 0000000..e168366
--- /dev/null
+++ b/kafka-iceberg/apache-flink/kafka-to-iceberg.sql
@@ -0,0 +1,34 @@
+CREATE TABLE t_k_orders
+  (
+    orderId STRING,
+    customerId STRING,
+    orderNumber INT,
+    product STRING,
+    backordered BOOLEAN,
+    cost FLOAT,
+    description STRING,
+    create_ts BIGINT,
+    creditCardNumber STRING,
+    discountPercent INT
+  ) WITH (
+    'connector' = 'kafka',
+    'topic' = 'orders',
+    'properties.bootstrap.servers' = 'broker:29092',
+    'scan.startup.mode' = 'earliest-offset',
+    'format' = 'json'
+  );
+
+
+SET 'execution.checkpointing.interval' = '60sec';
+SET 'pipeline.operator-chaining.enabled' = 'false';
+
+CREATE TABLE t_i_orders
+  WITH (
+    'connector' = 'iceberg',
+    'catalog-type'='hive',
+    'catalog-name'='dev',
+    'warehouse' = 's3a://warehouse',
+    'hive-conf-dir' = './conf')
+  AS
+  SELECT * FROM t_k_orders
+  WHERE cost > 100;
\ No newline at end of file
diff --git a/kafka-iceberg/apache-flink/ksl-demo.adoc b/kafka-iceberg/apache-flink/ksl-demo.adoc
index a587afd..4b24342 100644
--- a/kafka-iceberg/apache-flink/ksl-demo.adoc
+++ b/kafka-iceberg/apache-flink/ksl-demo.adoc
@@ -15,6 +15,11 @@ docker compose up -d
 
 == Demo 1: Exploring the SQL Client
 
+[source,bash]
+----
+docker compose exec -it jobmanager bash -c "./bin/sql-client.sh"
+----
+
 === Result Mode
 
 ==== table
@@ -188,7 +193,7 @@ CREATE TABLE t_i_orders
    WHERE cost > 100;
 ----
 
-View the Flink dashboard: http://localhost:8081/=/overview
+View the Flink dashboard: http://localhost:8081/
 
 View the MinIO browser and see that there are objects there: http://localhost:9001 (login:admin / password:password)
 
diff --git a/kafka-iceberg/apache-flink/shadowtraffic/kafka-retail.json b/kafka-iceberg/apache-flink/shadowtraffic/kafka-retail.json
index c1c7e7e..486fc3f 100644
--- a/kafka-iceberg/apache-flink/shadowtraffic/kafka-retail.json
+++ b/kafka-iceberg/apache-flink/shadowtraffic/kafka-retail.json
@@ -1,18 +1,43 @@
 {
     "globalConfigs": {
-        "throttle": {
-            "ms": {
-                "_gen": "uniformDistribution",
-                "bounds": [50, 200]
-            }
+        "throttleMs": {
+            "_gen": "uniformDistribution",
+            "bounds": [
+                500,
+                2000
+            ]
         }
     },
-
+    "schedule": {
+        "stages": [
+            {
+                "generators": [
+                    "customers"
+                ],
+                "overrides": {
+                    "customers": {
+                        "localConfigs": {
+                            "maxEvents": 20,
+                            "throttleMs": 0
+                        }
+                    }
+                }
+            },
+            {
+                "generators": [
+                    "orders"
+                ]
+            }
+        ]
+    },
     "generators": [
         {
+            "name": "customers",
             "topic": "customers",
             "value": {
-                "customerId": { "_gen": "uuid" },
+                "customerId": {
+                    "_gen": "uuid"
+                },
                 "name": {
                     "_gen": "string",
                     "expr": "#{Name.fullName}"
@@ -26,7 +51,11 @@
                 },
                 "membershipLevel": {
                     "_gen": "oneOf",
-                    "choices": ["free", "pro", "elite"]
+                    "choices": [
+                        "free",
+                        "pro",
+                        "elite"
+                    ]
                 },
                 "shippingAddress": {
                     "_gen": "string",
@@ -36,29 +65,52 @@
                     "_gen": "formatDateTime",
                     "ms": {
                         "_gen": "uniformDistribution",
-                        "bounds": [1710176905, { "_gen": "now" }]
+                        "bounds": [
+                            1710176905,
+                            {
+                                "_gen": "now"
+                            }
+                        ]
                     }
                 }
             }
         },
         {
+            "localConfigs": {
+                "maxMs": 3600000
+            },
+            "name": "orders",
             "topic": "orders",
             "value": {
-                "orderId": { "_gen": "uuid" },
+                "orderId": {
"orderId": { + "_gen": "uuid" + }, "customerId": { "_gen": "lookup", "topic": "customers", - "path": ["value", "customerId"] + "path": [ + "value", + "customerId" + ] }, "orderNumber": { "_gen": "sequentialInteger" }, - "product": { "_gen": "string", "expr": "#{Commerce.productName}" }, + "product": { + "_gen": "string", + "expr": "#{Commerce.productName}" + }, "backordered": { "_gen": "weightedOneOf", "choices": [ - { "weight": 19, "value": false }, - { "weight": 1, "value": true } + { + "weight": 19, + "value": false + }, + { + "weight": 1, + "value": true + } ] }, "cost": { @@ -66,15 +118,23 @@ "mean": 100, "sd": 20 }, - "description": { "_gen": "string", "expr": "#{Lorem.paragraph}" }, - "create_ts": { "_gen": "now" }, + "description": { + "_gen": "string", + "expr": "#{Lorem.paragraph}" + }, + "create_ts": { + "_gen": "now" + }, "creditCardNumber": { "_gen": "string", "expr": "#{Business.creditCardNumber}" }, "discountPercent": { "_gen": "uniformDistribution", - "bounds": [0, 10], + "bounds": [ + 0, + 10 + ], "decimals": 0 } } @@ -90,4 +150,4 @@ } } } -} +} \ No newline at end of file diff --git a/kafka-iceberg/decodable/shadowtraffic/config.json b/kafka-iceberg/decodable/shadowtraffic/config.json index cbc279a..9c62319 100644 --- a/kafka-iceberg/decodable/shadowtraffic/config.json +++ b/kafka-iceberg/decodable/shadowtraffic/config.json @@ -66,14 +66,11 @@ "expr": "#{Commerce.productName}" }, "quantity": { - "_gen": "integer", - "n": { - "_gen": "uniformDistribution", - "bounds": [ - 1, - 5 - ] - } + "_gen": "uniformDistribution", + "bounds": [ + 1, + 5 + ] }, "unitPrice": { "_gen": "uniformDistribution", @@ -105,14 +102,11 @@ ] }, { - "_gen": "integer", - "n": { - "_gen": "uniformDistribution", - "bounds": [ - 60000, - 600000 - ] - } + "_gen": "uniformDistribution", + "bounds": [ + 60000, + 600000 + ] } ] }