diff --git a/pyflink-decodable/Makefile b/pyflink-decodable/Makefile index 84bf9df..df3d769 100644 --- a/pyflink-decodable/Makefile +++ b/pyflink-decodable/Makefile @@ -15,7 +15,7 @@ $(TARGET): $(VENV): requirements.txt $(PY) -m venv $(VENV) - $(BIN)/pip install -r requirements.txt --target=${PYTHON_LIBS} + $(BIN)/pip install --platform manylinux2014_aarch64 --only-binary=:all: --upgrade -r requirements.txt --target=${PYTHON_LIBS} touch $(VENV) $(LIBS)/flink-sql-connector-kafka-3.0.2-1.18.jar: | $(TARGET) diff --git a/pyflink-decodable/main.py b/pyflink-decodable/main.py index a27d959..bb84347 100644 --- a/pyflink-decodable/main.py +++ b/pyflink-decodable/main.py @@ -28,19 +28,30 @@ from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.udf import udf +import polars as pl +from datetime import datetime + @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.STRING()) def get_user_name(id): + df = pl.DataFrame( + { + "integer": [1, 2, 3], + "date": [ + datetime(2025, 1, 1), + datetime(2025, 1, 2), + datetime(2025, 1, 3), + ], + "float": [4.0, 5.0, 6.0], + "string": ["a", "b", "c"], + } + ) + + print(df) + r = requests.get('https://jsonplaceholder.typicode.com/users/' + str(id)) return jmespath.search("name", json.loads(r.text)) def process_todos(): - with open('/opt/pipeline-secrets/gm_todo_kafka_user_name', 'r') as file: - user_name = file.read() - with open('/opt/pipeline-secrets/gm_todo_kafka_password', 'r') as file: - password = file.read() - with open('/opt/pipeline-secrets/gm_todo_kafka_bootstrap_servers', 'r') as file: - bootstrap_servers = file.read() - env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) @@ -76,14 +87,7 @@ def process_todos(): due TIMESTAMP(3), user_name STRING ) WITH ( - 'connector' = 'kafka', - 'topic' = 'todos', - 'properties.bootstrap.servers' = '{bootstrap_servers}', - 'properties.sasl.mechanism' = 'SCRAM-SHA-256', - 'properties.security.protocol' = 'SASL_SSL', - 'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{user_name}\" password=\"{password}\";', - 'properties.group.id' = 'todos-sink', - 'format' = 'json' + 'connector' = 'blackhole' )""") t_env.execute_sql(""" diff --git a/pyflink-decodable/requirements.txt b/pyflink-decodable/requirements.txt index 9fe4885..a711142 100644 --- a/pyflink-decodable/requirements.txt +++ b/pyflink-decodable/requirements.txt @@ -1 +1,2 @@ jmespath==1.0.1 +polars==0.19.12 #Would like to use 0.20.31, but getting this: No matching distribution found for polars==0.20.31