From 2e2c6e4d60ef5ad255fc03c9651a4cce152f7bab Mon Sep 17 00:00:00 2001 From: Ailton Silva Date: Tue, 11 Jun 2024 11:36:10 -0300 Subject: [PATCH 1/2] fix: production delivery result errors --- src/KafkaFlow/Producers/MessageProducer.cs | 32 ++++++++++++++++++---- 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 4894023ea..e601f5e7f 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -258,7 +258,17 @@ private IProducer EnsureProducer() { foreach (var handler in _configuration.StatisticsHandlers) { - handler.Invoke(statistics); + try + { + handler?.Invoke(statistics); + } + catch (Exception ex) + { + _logHandler.Error("Kafka Producer StatisticsHandler Error", ex, new + { + ProducerName = _configuration.Name, + }); + } } }); @@ -351,14 +361,24 @@ private void InternalProduce( void Handler(DeliveryReport report) { - if (report.Error.IsFatal) + try { - this.InvalidateProducer(report.Error, report); - } + if (report.Error.IsFatal) + { + this.InvalidateProducer(report.Error, report); + } - FillContextWithResultMetadata(context, report); + FillContextWithResultMetadata(context, report); + } + catch (Exception ex) + { + _logHandler.Error("Delivery Result Handler Error", ex, new + { + Report = report, + }); + } - deliveryHandler(report); + deliveryHandler?.Invoke(report); } } From 129b58d2e5c749e89479925e0d292c53580752a9 Mon Sep 17 00:00:00 2001 From: Ailton Silva Date: Tue, 11 Jun 2024 11:38:12 -0300 Subject: [PATCH 2/2] chore: add logs for throubleshooting --- src/KafkaFlow/Producers/MessageProducer.cs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index e601f5e7f..6cf6e5288 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -188,9 +188,19 @@ public void Dispose() _producer?.Dispose(); } - private static void FillContextWithResultMetadata(IMessageContext context, DeliveryResult result) + private void FillContextWithResultMetadata(IMessageContext context, DeliveryResult result) { - var concreteProducerContext = (ProducerContext)context.ProducerContext; + var concreteProducerContext = context.ProducerContext as ProducerContext; + + if (concreteProducerContext is null) + { + _logHandler.Warning("Producer context is null on FillContextWithResultMetadata", new + { + DeliveryResult = result, + }); + + return; + } concreteProducerContext.Offset = result.Offset; concreteProducerContext.Partition = result.Partition;