
[Bug] A single bad message can cause the Flink CDC writer to get stuck in an infinite loop waiting for a schema change #4239

AshishKhatkar opened this issue Sep 23, 2024 · 5 comments · May be fixed by #4295
Labels
bug Something isn't working

Comments

@AshishKhatkar
Contributor

AshishKhatkar commented Sep 23, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Paimon version

0.9.0

Compute Engine

Flink 1.18.1

Minimal reproduce step

You can replicate this in a local environment with any schema, using the Flink Paimon action JAR to sink Kafka CDC data into Paimon. An example:

Schema : {'update_time' : DATE}
Produce data like : {'update_time': -2954}

This will cause the Flink ingestion job to run into an infinite loop waiting for the schema to update.
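For reference, a minimal sketch of producing such a record with the plain Kafka Java client (the topic name and bootstrap servers are placeholders, and the JSON is simplified from a real CDC payload):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BadDateProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The "bad message": a DATE column fed a bare integer.
            producer.send(new ProducerRecord<>("cdc-topic", "{\"update_time\": -2954}"));
        }
    }
}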

Error example :

2024-09-23 05:32:12,439 [] INFO  org.apache.paimon.flink.sink.cdc.CdcRecordUtils              [] - Failed to convert value [{"update_time": -2954}] to type ROW<update_time DATE 'update_time'>. Waiting for schema update.
java.lang.RuntimeException: Failed to parse Json String {"update_time": -2954}
	at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:192) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.paimon.utils.TypeUtils.castFromCdcValueString(TypeUtils.java:96) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow(CdcRecordUtils.java:105) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.paimon.flink.sink.cdc.CdcRecordStoreWriteOperator.processElement(CdcRecordStoreWriteOperator.java:80) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) ~[flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) ~[flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) ~[flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) ~[flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) ~[flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) ~[flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) [flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) [flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) [flink-dist-1.18.0.jar:1.18.0]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.18.0.jar:1.18.0]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: Failed to parse Json String {"update_time": -2954}
	at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:259) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:177) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	... 16 more
Caused by: java.time.DateTimeException: For input string: '-2954'.
	at org.apache.paimon.utils.BinaryStringUtils.toDate(BinaryStringUtils.java:289) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:157) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:243) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	at org.apache.paimon.utils.TypeUtils.castFromStringInternal(TypeUtils.java:177) ~[paimon-flink-1.18-0.9-SNAPSHOT.jar:0.9-SNAPSHOT]
	... 16 more
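The root cause is visible in the last frame: the DATE column is cast from its string representation, and a bare integer like -2954 is not a parseable date string. A standalone illustration of the failure mode using plain java.time (this is not Paimon's actual parsing code):

import java.time.LocalDate;
import java.time.format.DateTimeParseException;

public class DateParseDemo {
    public static void main(String[] args) {
        try {
            // A bare integer is not a valid ISO date string.
            LocalDate.parse("-2954"); // a valid value would be e.g. "2024-09-23"
        } catch (DateTimeParseException e) {
            // DateTimeParseException extends DateTimeException, matching the trace above.
            System.out.println("Parse failed: " + e.getMessage());
        }
    }
}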

What doesn't meet your expectations?

This caused the job to run into an infinite loop and fail checkpointing. I expected it to fail loudly with an exception saying the message it got is corrupted, instead of silently spinning in an infinite loop. All the exceptions in the Flink UI pointed to a checkpoint timeout.

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@AshishKhatkar added the bug (Something isn't working) label Sep 23, 2024
@AshishKhatkar
Contributor Author

A possible way to tackle this is to retry a configurable number of times before bubbling up the exception. We could add a config cdc.retry-num-times with a default of 3; if the conversion to GenericRow still fails after that, we fail the job by bubbling up the exception.
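A rough sketch of how the bounded retry could look (all names here are illustrative, including tryConvert and the constants; this is not Paimon's actual API):

import java.util.Optional;

final class RetryingConverter {
    static final int MAX_RETRIES = 3;           // proposed cdc.retry-num-times default
    static final long RETRY_SLEEP_MILLIS = 500; // cdc.retry-sleep-time is 0.5 seconds

    static Object convertOrFail(String record) throws InterruptedException {
        for (int attempt = 0; attempt <= MAX_RETRIES; attempt++) {
            Optional<Object> row = tryConvert(record);
            if (row.isPresent()) {
                return row.get();
            }
            // The schema may still be catching up, so wait before retrying.
            Thread.sleep(RETRY_SLEEP_MILLIS);
        }
        // After exhausting retries, fail loudly instead of looping forever.
        throw new RuntimeException(
                "Failed to convert record after " + MAX_RETRIES + " retries: " + record);
    }

    // Stand-in for the real conversion (CdcRecordUtils.toGenericRow in Paimon).
    static Optional<Object> tryConvert(String record) {
        return Optional.empty();
    }
}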

@AshishKhatkar
Contributor Author

@JingsongLi @zhuangchong any thoughts on this?

@JingsongLi
Contributor

Retry times looks good to me! Maybe a default of 3 is too small. cdc.retry-sleep-time is 0.5 seconds, so the default value could be 100?

@AshishKhatkar
Contributor Author

Retry times looks good to me! Maybe a default of 3 is too small. cdc.retry-sleep-time is 0.5 seconds, so the default value could be 100?

All right, I will work on the PR.

@AshishKhatkar
Contributor Author

@JingsongLi alternatively, if it is a corrupt record, it is going to keep failing unless the record is somehow skipped. Do you think we should also provide the ability to skip the record, with skipping disabled by default?
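For illustration, the exhausted-retries branch from the sketch above could be gated by a hypothetical skip flag (again, the name and default are only a suggestion, not an existing option):

final class SkipPolicy {
    static final boolean SKIP_CORRUPT_RECORDS = false; // proposed default: do not skip

    static void onRetriesExhausted(String record) {
        if (SKIP_CORRUPT_RECORDS) {
            // Drop the record, but log loudly so it is not lost silently.
            System.err.println("Skipping corrupt record: " + record);
            return;
        }
        throw new RuntimeException("Corrupt record, failing the job: " + record);
    }
}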
