Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PyFlink/Polars WIP #15

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyflink-decodable/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ $(TARGET):

$(VENV): requirements.txt
$(PY) -m venv $(VENV)
$(BIN)/pip install -r requirements.txt --target=${PYTHON_LIBS}
$(BIN)/pip install --platform manylinux2014_aarch64 --only-binary=:all: --upgrade -r requirements.txt --target=${PYTHON_LIBS}
touch $(VENV)

$(LIBS)/flink-sql-connector-kafka-3.0.2-1.18.jar: | $(TARGET)
Expand Down
34 changes: 19 additions & 15 deletions pyflink-decodable/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,30 @@
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

import polars as pl
from datetime import datetime

@udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.STRING())
def get_user_name(id):
    """Flink scalar UDF: resolve a numeric user id to a user name.

    Fetches ``https://jsonplaceholder.typicode.com/users/<id>`` and extracts
    the ``name`` field from the JSON response via jmespath.

    The Polars DataFrame built below is intentionally unused: this PR is a
    PyFlink/Polars packaging experiment, and constructing + printing the
    frame verifies that the bundled Polars wheel imports and executes inside
    the Flink Python worker.

    :param id: user id passed from the BIGINT column (``id`` shadows the
        builtin, kept as-is to preserve the existing signature)
    :return: the user's name as a string, or None if the response has no
        ``name`` key (jmespath returns None on a missing path)
    """
    # Polars smoke test — the frame itself is discarded after printing.
    df = pl.DataFrame(
        {
            "integer": [1, 2, 3],
            "date": [
                datetime(2025, 1, 1),
                datetime(2025, 1, 2),
                datetime(2025, 1, 3),
            ],
            "float": [4.0, 5.0, 6.0],
            "string": ["a", "b", "c"],
        }
    )

    print(df)

    # Fix: requests has NO default timeout, so a slow or unreachable
    # endpoint would hang this UDF (and the Flink task) forever. A bounded
    # timeout lets the failure surface instead of stalling the pipeline.
    r = requests.get(
        'https://jsonplaceholder.typicode.com/users/' + str(id),
        timeout=10,
    )
    return jmespath.search("name", json.loads(r.text))

def process_todos():
with open('/opt/pipeline-secrets/gm_todo_kafka_user_name', 'r') as file:
user_name = file.read()
with open('/opt/pipeline-secrets/gm_todo_kafka_password', 'r') as file:
password = file.read()
with open('/opt/pipeline-secrets/gm_todo_kafka_bootstrap_servers', 'r') as file:
bootstrap_servers = file.read()

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

Expand Down Expand Up @@ -76,14 +87,7 @@ def process_todos():
due TIMESTAMP(3),
user_name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'todos',
'properties.bootstrap.servers' = '{bootstrap_servers}',
'properties.sasl.mechanism' = 'SCRAM-SHA-256',
'properties.security.protocol' = 'SASL_SSL',
'properties.sasl.jaas.config' = 'org.apache.flink.kafka.shaded.org.apache.kafka.common.security.scram.ScramLoginModule required username=\"{user_name}\" password=\"{password}\";',
'properties.group.id' = 'todos-sink',
'format' = 'json'
'connector' = 'blackhole'
)""")

t_env.execute_sql("""
Expand Down
1 change: 1 addition & 0 deletions pyflink-decodable/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
jmespath==1.0.1
polars==0.19.12  # Pinned: pip reports "No matching distribution found" for polars==0.20.31 — presumably no wheel matches the Makefile's --platform manylinux2014_aarch64 / --only-binary=:all: install (verify); bump once a compatible wheel exists.
Loading