Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

What could be causing inconsistent message processing in a Kafka-based system used to synchronize data between MSSQL and MySQL databases, despite successful message delivery to the Kafka topic #780

Open
nithinmurali94 opened this issue May 21, 2024 · 0 comments

Comments

@nithinmurali94
Copy link

Hi,
I'm encountering an issue with my application. It involves using Kafka to synchronize data between MSSQL and MySQL databases. Specifically, I've employed the Kafka JDBC connector to retrieve data from MSSQL. Additionally, I've developed a Faust application to process messages from the Kafka topic, which in this instance is named 'orderentry'.

In my testing, I noticed that when I input 100 records (modifying the MSSQL database) into the Kafka topic 'orderentry', I verified that 100 messages successfully reached the topic by monitoring it through the terminal. However, I've observed inconsistent behavior with Faust processing. Sometimes it successfully processes all 100 messages, but at other times, it only processes a subset, such as 75 or 80 messages, without any apparent errors logged.

I'm uncertain why this discrepancy occurs. Below, I've included a portion of the relevant code for reference.

app = faust.App('alpha100', broker='kafka://localhost')

@app.agent(app.topic('OrderEntry', key_serializer='json', value_serializer='json'), concurrency=10)
async def alpha2_OrderEntry_handler(stream):
async for key, value in stream.items():
try:
print("[{}] alpha2 OrderEntry Topic Handler |".format(
utils.get_current_datetime_in_ms()))
ariel_inventory.order_detail(value)
print("Sync using Topic From alpha2 to Ariel4 | alpha2.OrderEntry | OrderEntry data is : {}".format(value['payload']))
if utils.check_alpha2_sync_flag(value):
await app.loop.run_in_executor(thread_pool, test_order, value)
except Exception as ex :
print(f"{key} error in processing: {ex}")

Please help! I'm stuck

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant