
Add deduplication logic for kafka sink in case of spark task retries #240

Open
kevinwallimann opened this issue Sep 29, 2021 · 0 comments
kevinwallimann commented Sep 29, 2021

Problem description
Spark does not provide exactly-once semantics for the Kafka sink, only at-least-once, and probably never will (apache/spark#25618). Under certain assumptions (no concurrent producers, only one destination topic, sufficiently small micro-batches, messages that do not change between retries), idempotency can still be achieved. See #177.

The DeduplicateKafkaSinkTransformer only addresses retries at the application level. However, retries (and therefore duplicates) may also happen at lower levels, namely:

  • Retry of a DataWritingSparkTask (the Spark task that invokes the KafkaProducer)
  • Internal retry of the KafkaProducer (when encountering a RetriableException, e.g. server disconnected)

Duplicates caused by an internal retry of the KafkaProducer can be prevented by setting acks=all and enable.idempotence=true on the Kafka writer (see the sketch below). However, this does not cover retries of the DataWritingSparkTask that invokes the KafkaProducer. For example, on an intermittent TopicAuthorizationException the KafkaProducer fails without retrying, but the DataWritingSparkTask retries nevertheless, so duplicates are still possible. Another example is an executor exceeding its memory limits: if that happens during the DataWritingSparkTask, the executor is terminated and another executor retries the task, which may again lead to duplicates on the destination topic.
One workaround is to switch off Spark task retries, but that obviously causes other problems.
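For illustration, a minimal sketch of those producer settings on a Spark Structured Streaming Kafka writer, assuming a streaming DataFrame `df` that already contains binary key and value columns; the broker address, topic name and checkpoint location are placeholders:

```scala
import org.apache.spark.sql.DataFrame

// Sketch only: producer-level properties are passed to the Kafka sink with the "kafka." prefix.
// Broker address, topic name and checkpoint location below are placeholders.
object IdempotentKafkaWriterExample {
  def startQuery(df: DataFrame): Unit = {
    df.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "destination-topic")
      .option("kafka.acks", "all")                // wait for acknowledgement from all in-sync replicas
      .option("kafka.enable.idempotence", "true") // prevent duplicates from internal producer retries
      .option("checkpointLocation", "/tmp/checkpoint")
      .start()
  }
}
```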

Solution
It might be possible to implement an org.apache.kafka.clients.producer.ProducerInterceptor which would deduplicate retried messages in a similar way to the DeduplicateKafkaSinkTransformer. This would capture duplicates in the scenario where the retry happens on a different executor than the original one (memory-exceeded scenario). In addition, the interceptor should keep a lookup set in memory to capture duplicates caused by retries on the same executor (TopicAuthorizationException scenario).
A reattempt could be recognized through the attempt number of the Spark task.
If messages cannot be dropped through the ProducerInterceptor, they could at least be redirected to a trash topic (see the sketch below).
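A minimal sketch of such an interceptor, assuming each record carries a unique, non-null key and that the trash-topic name is passed through a hypothetical producer property "trash.topic". It only covers the same-executor case via an in-memory set; sharing or persisting state across executors is not addressed here:

```scala
import java.nio.ByteBuffer
import java.util.{Map => JMap}
import org.apache.kafka.clients.producer.{ProducerInterceptor, ProducerRecord, RecordMetadata}

// Sketch only: redirects records whose key has already been seen by this producer
// instance to a trash topic. Assumes unique, non-null record keys; the property
// name "trash.topic" is hypothetical.
class DeduplicatingProducerInterceptor extends ProducerInterceptor[Array[Byte], Array[Byte]] {

  private var trashTopic: String = _
  // In-memory lookup set of keys already sent by this producer instance
  private val seenKeys = java.util.concurrent.ConcurrentHashMap.newKeySet[ByteBuffer]()

  override def configure(configs: JMap[String, _]): Unit =
    trashTopic = configs.get("trash.topic").asInstanceOf[String]

  override def onSend(record: ProducerRecord[Array[Byte], Array[Byte]])
      : ProducerRecord[Array[Byte], Array[Byte]] = {
    if (seenKeys.add(ByteBuffer.wrap(record.key()))) {
      record // first occurrence: pass through unchanged
    } else {
      // Duplicate, e.g. from a retried DataWritingSparkTask on the same executor.
      // onSend must return a record, so instead of dropping it we redirect it.
      new ProducerRecord(trashTopic, record.key(), record.value())
    }
  }

  override def onAcknowledgement(metadata: RecordMetadata, exception: Exception): Unit = ()

  override def close(): Unit = seenKeys.clear()
}
```

The interceptor would be registered via the producer's interceptor.classes property; on the Spark Kafka sink, producer properties are typically passed with the kafka. prefix (e.g. kafka.interceptor.classes). Whether a custom property like trash.topic is forwarded to the interceptor unchanged still needs to be verified. The task's attempt number is available via org.apache.spark.TaskContext.get().attemptNumber(), though whether it is accessible from the thread that calls onSend is also something to verify.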
