diff --git a/.gitignore b/.gitignore
index c182a89..70664ac 100644
--- a/.gitignore
+++ b/.gitignore
@@ -33,3 +33,5 @@ api-key*
secret
snow*.log*
.DS_Store
+
+.venv

diff --git a/README.md b/README.md
index fcb5e22..9bcdc30 100644
--- a/README.md
+++ b/README.md
@@ -35,6 +35,8 @@ This repository contains examples of use cases that utilize Decodable streaming
|[Flink SQL Troubleshooting](troubleshooting-flinksql)| A set of Docker Compose environments for demonstrating various Flink SQL troubleshooting scenarios (see [related blog](https://www.decodable.co/blog/flink-sql-misconfiguration-misunderstanding-and-mishaps?utm_medium=github&utm_source=examples_repo&utm_campaign=blog&utm_content=troubleshooting-flinksql))|
|[Array Aggregation](array-agg)| Using the `array_agg()` UDF for denormalizing data in a pipeline from MySQL to OpenSearch |
|[Kafka with ngrok](kafka-ngrok)| Docker Compose for running Apache Kafka locally, accessible from the internet using ngrok|
+|[PyFlink on Decodable](pyflink-decodable)| Running a PyFlink job as a Custom Pipeline on Decodable|
+
## License

diff --git a/pyflink-decodable/.secret_kafka_bootstrap_servers b/pyflink-decodable/.secret_kafka_bootstrap_servers
new file mode 100644
index 0000000..1c40b05
--- /dev/null
+++ b/pyflink-decodable/.secret_kafka_bootstrap_servers
@@ -0,0 +1 @@
+ 
\ No newline at end of file

diff --git a/pyflink-decodable/.secret_kafka_password b/pyflink-decodable/.secret_kafka_password
new file mode 100644
index 0000000..1c40b05
--- /dev/null
+++ b/pyflink-decodable/.secret_kafka_password
@@ -0,0 +1 @@
+ 
\ No newline at end of file

diff --git a/pyflink-decodable/.secret_kafka_user_name b/pyflink-decodable/.secret_kafka_user_name
new file mode 100644
index 0000000..1c40b05
--- /dev/null
+++ b/pyflink-decodable/.secret_kafka_user_name
@@ -0,0 +1 @@
+ 
\ No newline at end of file

diff --git a/pyflink-decodable/Makefile b/pyflink-decodable/Makefile
new file mode 100644
index 0000000..84bf9df
--- /dev/null
+++ b/pyflink-decodable/Makefile
@@ -0,0 +1,37 @@
.PHONY: build clean deploy

TARGET = target
LIBS = $(TARGET)/libs
PYTHON_LIBS = $(TARGET)/python-libs

PY = python3
VENV = $(TARGET)/venv
BIN = $(VENV)/bin

build: $(TARGET) $(LIBS)/flink-sql-connector-kafka-3.0.2-1.18.jar $(LIBS)/flink-python-1.18.1.jar $(TARGET)/pyflink-job.zip $(VENV)

$(TARGET):
	mkdir $(TARGET)

$(VENV): requirements.txt
	$(PY) -m venv $(VENV)
	$(BIN)/pip install -r requirements.txt --target=$(PYTHON_LIBS)
	touch $(VENV)

$(LIBS)/flink-sql-connector-kafka-3.0.2-1.18.jar: | $(TARGET)
	mkdir -p $(LIBS)
	wget -N -P $(LIBS) https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/3.0.2-1.18/flink-sql-connector-kafka-3.0.2-1.18.jar

$(LIBS)/flink-python-1.18.1.jar: | $(TARGET)
	mkdir -p $(LIBS)
	wget -N -P $(LIBS) https://repo1.maven.org/maven2/org/apache/flink/flink-python/1.18.1/flink-python-1.18.1.jar

$(TARGET)/pyflink-job.zip: main.py $(LIBS)/flink-sql-connector-kafka-3.0.2-1.18.jar $(LIBS)/flink-python-1.18.1.jar $(VENV)
	cp main.py $(TARGET)
	cd $(TARGET) && zip -r pyflink-job.zip main.py libs python-libs

clean:
	@rm -rf $(TARGET)

deploy: build
	decodable apply decodable-resources.yaml

diff --git a/pyflink-decodable/README.md b/pyflink-decodable/README.md
new file mode 100644
index 0000000..b7d39cd
--- /dev/null
+++ b/pyflink-decodable/README.md
@@ -0,0 +1,64 @@
# Getting Started With PyFlink On Decodable

This example project shows how to run PyFlink jobs on Decodable.

By deploying your PyFlink jobs as [Custom Pipelines](https://docs.decodable.co/pipelines/create-pipelines-using-your-own-apache-flink-jobs.html) onto Decodable, you can focus solely on implementing your job,
while leaving everything involved in running it, such as provisioning Flink clusters and the underlying hardware,
keeping them secure and up to date, scaling them, and monitoring and observing them, to the fully managed Decodable platform.

## Prerequisites

You'll need the following things in place to run this example:

* A free Decodable account ([sign up](https://app.decodable.co/-/accounts/create))
* The [Decodable CLI](https://docs.decodable.co/cli.html)
* [GNU Make](https://www.gnu.org/software/make/)
* A Kafka cluster which can be accessed from Decodable; for instance, the free tier of [Upstash](https://upstash.com/) can be used

Make sure you are logged into your Decodable account on the CLI by running `decodable login`.

## Description

This example shows how to run a simple [PyFlink](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/overview/) job on Decodable.
With the help of the built-in DataGen connector, one random `Todo` event is emitted per second.
A user-defined function (UDF) enriches each event with user information retrieved from a remote REST API,
leveraging the `requests` and `jmespath` third-party libraries.
The enriched events are sent to a Kafka topic.

## Running the Example

After checking out the project, provide the broker address and credentials for your Kafka cluster in the files _.secret\_kafka\_bootstrap\_servers_, _.secret\_kafka\_user\_name_, and _.secret\_kafka\_password_, respectively.
If your cluster uses a security protocol other than SASL_SSL with SCRAM-SHA-256,
adjust the connector configuration of the `enriched_todos` table in _main.py_ accordingly.

Next, build the PyFlink job and deploy it to your Decodable account:

```
$ make
$ make deploy
```

Take note of the id generated for the `pyflink_on_decodable` pipeline.
Then activate this pipeline using the Decodable CLI:

```
$ decodable pipeline activate <pipeline id>
```

Once the pipeline is running (use `decodable pipeline get <pipeline id>` to query its state),
you can observe the enriched `Todo` events in the Kafka topic, for instance via the web console when using Upstash, or with the small consumer script sketched below.
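
If you prefer a script over the web console, the events can also be inspected with a small Kafka consumer.
The following is a minimal sketch and not part of this project; it assumes the third-party `kafka-python` package (`pip install kafka-python`) and the same SASL_SSL/SCRAM-SHA-256 settings that _main.py_ configures for the `enriched_todos` table:

```python
# Hypothetical helper script, not part of this example: prints a few enriched Todo events.
import json

from kafka import KafkaConsumer  # assumes: pip install kafka-python

consumer = KafkaConsumer(
    "todos",
    bootstrap_servers="<your bootstrap servers>",  # same value as in .secret_kafka_bootstrap_servers
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_plain_username="<your user name>",        # same value as in .secret_kafka_user_name
    sasl_plain_password="<your password>",         # same value as in .secret_kafka_password
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

# Print the first ten enriched Todo events, then stop.
for i, record in enumerate(consumer):
    print(record.value)
    if i >= 9:
        break
```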

## Clean-Up

To shut down the pipeline and clean up all the resources in your Decodable account,
run the following commands:

```
$ decodable pipeline deactivate <pipeline id>
$ decodable pipeline delete <pipeline id>

# Obtain secret ids via decodable secret list
$ decodable secret delete <secret id>
$ decodable secret delete <secret id>
$ decodable secret delete <secret id>
```

diff --git a/pyflink-decodable/decodable-resources.yaml b/pyflink-decodable/decodable-resources.yaml
new file mode 100644
index 0000000..ee48da7
--- /dev/null
+++ b/pyflink-decodable/decodable-resources.yaml
@@ -0,0 +1,36 @@
---
kind: secret
metadata:
  name: todo_kafka_user_name
spec_version: v1
spec:
  value_file: .secret_kafka_user_name
---
kind: secret
metadata:
  name: todo_kafka_password
spec_version: v1
spec:
  value_file: .secret_kafka_password
---
kind: secret
metadata:
  name: todo_kafka_bootstrap_servers
spec_version: v1
spec:
  value_file: .secret_kafka_bootstrap_servers
---
kind: pipeline
metadata:
  name: pyflink_on_decodable
spec_version: v1
spec:
  type: PYTHON
  job_file_path: target/pyflink-job.zip
  properties:
    secrets:
      - todo_kafka_user_name
      - todo_kafka_password
      - todo_kafka_bootstrap_servers
    flink_version: 1.18-python310
    additional_metrics: []
\ No newline at end of file

diff --git a/pyflink-decodable/main.py b/pyflink-decodable/main.py
new file mode 100644
index 0000000..a27d959
--- /dev/null
+++ b/pyflink-decodable/main.py
@@ -0,0 +1,94 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+################################################################################ +import logging +import sys +import os + +import requests +import jmespath +import json +import sys + +from pyflink.datastream import StreamExecutionEnvironment +from pyflink.table import StreamTableEnvironment, DataTypes +from pyflink.table.udf import udf + +@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.STRING()) +def get_user_name(id): + r = requests.get('https://jsonplaceholder.typicode.com/users/' + str(id)) + return jmespath.search("name", json.loads(r.text)) + +def process_todos(): + with open('/opt/pipeline-secrets/gm_todo_kafka_user_name', 'r') as file: + user_name = file.read() + with open('/opt/pipeline-secrets/gm_todo_kafka_password', 'r') as file: + password = file.read() + with open('/opt/pipeline-secrets/gm_todo_kafka_bootstrap_servers', 'r') as file: + bootstrap_servers = file.read() + + env = StreamExecutionEnvironment.get_execution_environment() + env.set_parallelism(1) + + t_env = StreamTableEnvironment.create(stream_execution_environment=env) + t_env.create_temporary_system_function("user_name", get_user_name) + + kafka_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)) + "/libs", + 'flink-sql-connector-kafka-3.0.2-1.18.jar') + + flink_python_jar = os.path.join(os.path.abspath(os.path.dirname(__file__)) + "/libs", + 'flink-python-1.18.1.jar') + + t_env.get_config()\ + .get_configuration()\ + .set_string("pipeline.jars", "file://{}".format(kafka_jar) + ";file://{}".format(flink_python_jar)) + + t_env.execute_sql(""" + CREATE TABLE todos ( + id BIGINT, + text STRING, + user_id BIGINT, + due TIMESTAMP(3) + ) WITH ( + 'connector' = 'datagen', + 'rows-per-second' = '1' + )""") + + t_env.execute_sql(f""" + CREATE TABLE enriched_todos ( + id BIGINT, + text STRING, + user_id BIGINT, + due TIMESTAMP(3), + user_name STRING + ) WITH ( + 'connector' = 'kafka', + 'topic' = 'todos', + 'properties.bootstrap.servers' = '{bootstrap_servers}', + 'properties.sasl.mechanism' = 'SCRAM-SHA-256', + 'properties.security.protocol' = 'SASL_SSL', + 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{user_name}\" password=\"{password}\";', + 'properties.group.id' = 'todos-sink', + 'format' = 'json' + )""") + + t_env.execute_sql(""" + INSERT INTO enriched_todos SELECT *, user_name(ABS(MOD(todos.user_id, 10))) FROM todos""") + +if __name__ == '__main__': + logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") + process_todos() diff --git a/pyflink-decodable/requirements.txt b/pyflink-decodable/requirements.txt new file mode 100644 index 0000000..9fe4885 --- /dev/null +++ b/pyflink-decodable/requirements.txt @@ -0,0 +1 @@ +jmespath==1.0.1