diff --git a/src/KafkaFlow.Abstractions/IConsumerContext.cs b/src/KafkaFlow.Abstractions/IConsumerContext.cs index b185d9c2b..9bded7972 100644 --- a/src/KafkaFlow.Abstractions/IConsumerContext.cs +++ b/src/KafkaFlow.Abstractions/IConsumerContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow { using System; + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; @@ -24,6 +25,11 @@ public interface IConsumerContext /// int WorkerId { get; } + /// + /// Gets the brokers associated with the message + /// + public IReadOnlyCollection Brokers { get; } + /// /// Gets the topic associated with the message /// diff --git a/src/KafkaFlow.Abstractions/IProducerContext.cs b/src/KafkaFlow.Abstractions/IProducerContext.cs index 1f63af400..bda9d4284 100644 --- a/src/KafkaFlow.Abstractions/IProducerContext.cs +++ b/src/KafkaFlow.Abstractions/IProducerContext.cs @@ -1,5 +1,7 @@ namespace KafkaFlow; +using System.Collections.Generic; + /// /// Some producer metadata /// @@ -26,4 +28,9 @@ public interface IProducerContext /// that are scoped to the lifecycle of a single producer. /// IDependencyResolver DependencyResolver { get; } + + /// + /// Gets the brokers associated with the message + /// + public IReadOnlyCollection Brokers { get; } } diff --git a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs index 9a965dee5..0508a010d 100644 --- a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs +++ b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs @@ -21,9 +21,10 @@ internal static class ActivitySourceAccessor internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString(); internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version); - public static void SetGenericTags(Activity activity) + public static void SetGenericTags(Activity activity, IEnumerable bootstrapServers) { activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId); + activity?.SetTag(Conventions.AttributePeerService, string.Join(",", bootstrapServers)); } public static ActivityEvent CreateExceptionEvent(Exception exception) diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index 10fa794e9..7b6b041c8 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -38,7 +38,7 @@ public static Task OnConsumeStarted(IMessageContext context) context?.Items.Add(ActivitySourceAccessor.ActivityString, activity); - ActivitySourceAccessor.SetGenericTags(activity); + ActivitySourceAccessor.SetGenericTags(activity, context?.ConsumerContext.Brokers); if (activity != null && activity.IsAllDataRequested) { @@ -84,7 +84,16 @@ private static IEnumerable ExtractTraceContextIntoBasicProperties(IMessa private static void SetConsumerTags(IMessageContext context, Activity activity) { - var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]); + string messageKey = string.Empty; + + try + { + messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]); + } + catch + { + // Do nothing + } activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString); activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic); diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index 3630157c1..b603ce0a5 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -48,7 +48,7 @@ public static Task OnProducerStarted(IMessageContext context) // Inject the ActivityContext into the message headers to propagate trace context to the receiving service. Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties); - ActivitySourceAccessor.SetGenericTags(activity); + ActivitySourceAccessor.SetGenericTags(activity, context?.ProducerContext.Brokers); if (activity != null && activity.IsAllDataRequested) { diff --git a/src/KafkaFlow/Consumers/ConsumerContext.cs b/src/KafkaFlow/Consumers/ConsumerContext.cs index 794dd7031..fe4bc4b91 100644 --- a/src/KafkaFlow/Consumers/ConsumerContext.cs +++ b/src/KafkaFlow/Consumers/ConsumerContext.cs @@ -1,6 +1,7 @@ namespace KafkaFlow.Consumers { using System; + using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using Confluent.Kafka; @@ -63,6 +64,8 @@ public ConsumerContext( public Task Completion => this.completionSource.Task; + public IReadOnlyCollection Brokers => this.consumer.Configuration.ClusterConfiguration.Brokers; + public void Complete() { if (this.ShouldStoreOffset) diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index ea0dfa6c1..a40f2143a 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -131,11 +131,20 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke { await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context)); + _= context.ConsumerContext.Completion.ContinueWith( + async task => + { + if (task.IsFaulted) + { + await this.globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, task.Exception)); + } + + await this.globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context)); + }); + await this.middlewareExecutor .Execute(context, _ => Task.CompletedTask) .ConfigureAwait(false); - - await this.globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context)); } catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) { diff --git a/src/KafkaFlow/Producers/MessageProducer.cs b/src/KafkaFlow/Producers/MessageProducer.cs index 124894916..dbb8813ea 100644 --- a/src/KafkaFlow/Producers/MessageProducer.cs +++ b/src/KafkaFlow/Producers/MessageProducer.cs @@ -360,7 +360,7 @@ private MessageContext CreateMessageContext( headers, messageScopedResolver, null, - new ProducerContext(topic, this.producerDependencyScope.Resolver)); + new ProducerContext(topic, this.producerDependencyScope.Resolver, this.configuration.Cluster.Brokers)); } } } diff --git a/src/KafkaFlow/Producers/ProducerContext.cs b/src/KafkaFlow/Producers/ProducerContext.cs index 98e8d17fc..8fd7b13d0 100644 --- a/src/KafkaFlow/Producers/ProducerContext.cs +++ b/src/KafkaFlow/Producers/ProducerContext.cs @@ -1,11 +1,17 @@ namespace KafkaFlow.Producers { + using System.Collections.Generic; + internal class ProducerContext : IProducerContext { - public ProducerContext(string topic, IDependencyResolver producerDependencyResolver) + public ProducerContext( + string topic, + IDependencyResolver producerDependencyResolver, + IReadOnlyCollection brokers) { this.Topic = topic; this.DependencyResolver = producerDependencyResolver; + this.Brokers = brokers; } public string Topic { get; } @@ -15,5 +21,7 @@ public ProducerContext(string topic, IDependencyResolver producerDependencyResol public long? Offset { get; set; } public IDependencyResolver DependencyResolver { get; } + + public IReadOnlyCollection Brokers { get; } } } diff --git a/website/docs/guides/global-events.md b/website/docs/guides/global-events.md index 87803b12d..b8eae9c2f 100644 --- a/website/docs/guides/global-events.md +++ b/website/docs/guides/global-events.md @@ -4,7 +4,7 @@ sidebar_position: 9 # Global Events -In this section, we will delve into the concept of Global Events in KafkaFlow, which provides a mechanism to subscribe to various events that are triggered during the message production and consumption processes. +In this section, we will delve into the concept of Global Events in KafkaFlow, which provides a mechanism to subscribe to various events that are triggered during the message production and consumption processes. KafkaFlow offers a range of Global Events that can be subscribed to. These events can be used to monitor and react to different stages of message handling. Below is a list of available events: - [Message Produce Started Event](#message-produce-started-event) @@ -82,10 +82,6 @@ services.AddKafka( The Message Consume Completed Event signals the successful completion of message consumption. By subscribing to this event, you can track when messages have been successfully processed. -:::info -Please note that the current event is not compatible with Batch Consume in the current version (v2). However, this limitation is expected to be addressed in future releases (v3+). -:::info - ```csharp services.AddKafka( kafka => kafka