Skip to content

Commit

Permalink
feat: adapt open telemetry to release 3.0
Browse files Browse the repository at this point in the history
Include peer.service tag for producer and consumer spans

Close consumer span when Complete() is invoked to have a correctly
time closing for both single and batch consumers
  • Loading branch information
simaoribeiro committed Oct 31, 2023
1 parent d5a9c21 commit f8840a9
Show file tree
Hide file tree
Showing 10 changed files with 52 additions and 13 deletions.
6 changes: 6 additions & 0 deletions src/KafkaFlow.Abstractions/IConsumerContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

Expand All @@ -24,6 +25,11 @@ public interface IConsumerContext
/// </summary>
int WorkerId { get; }

/// <summary>
/// Gets the brokers associated with the message
/// </summary>
public IReadOnlyCollection<string> Brokers { get; }

/// <summary>
/// Gets the topic associated with the message
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions src/KafkaFlow.Abstractions/IProducerContext.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace KafkaFlow;

using System.Collections.Generic;

/// <summary>
/// Some producer metadata
/// </summary>
Expand All @@ -26,4 +28,9 @@ public interface IProducerContext
/// that are scoped to the lifecycle of a single producer.
/// </summary>
IDependencyResolver DependencyResolver { get; }

/// <summary>
/// Gets the brokers associated with the message
/// </summary>
public IReadOnlyCollection<string> Brokers { get; }
}
3 changes: 2 additions & 1 deletion src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ internal static class ActivitySourceAccessor
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version);

public static void SetGenericTags(Activity activity)
public static void SetGenericTags(Activity activity, IEnumerable<string> bootstrapServers)
{
activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId);
activity?.SetTag(Conventions.AttributePeerService, string.Join(",", bootstrapServers));
}

public static ActivityEvent CreateExceptionEvent(Exception exception)
Expand Down
13 changes: 11 additions & 2 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static Task OnConsumeStarted(IMessageContext context)

context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);

ActivitySourceAccessor.SetGenericTags(activity);
ActivitySourceAccessor.SetGenericTags(activity, context?.ConsumerContext.Brokers);

if (activity != null && activity.IsAllDataRequested)
{
Expand Down Expand Up @@ -84,7 +84,16 @@ private static IEnumerable<string> ExtractTraceContextIntoBasicProperties(IMessa

private static void SetConsumerTags(IMessageContext context, Activity activity)
{
var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);
string messageKey = string.Empty;

try
{
messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);
}
catch
{
// Do nothing
}

activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString);
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Task OnProducerStarted(IMessageContext context)
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties);

ActivitySourceAccessor.SetGenericTags(activity);
ActivitySourceAccessor.SetGenericTags(activity, context?.ProducerContext.Brokers);

if (activity != null && activity.IsAllDataRequested)
{
Expand Down
3 changes: 3 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerContext.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Consumers
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
Expand Down Expand Up @@ -63,6 +64,8 @@ public ConsumerContext(

public Task<TopicPartitionOffset> Completion => this.completionSource.Task;

public IReadOnlyCollection<string> Brokers => this.consumer.Configuration.ClusterConfiguration.Brokers;

public void Complete()
{
if (this.ShouldStoreOffset)
Expand Down
13 changes: 11 additions & 2 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,20 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke
{
await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context));

_= context.ConsumerContext.Completion.ContinueWith(
async task =>
{
if (task.IsFaulted)
{
await this.globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, task.Exception));
}
await this.globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context));
});

await this.middlewareExecutor
.Execute(context, _ => Task.CompletedTask)
.ConfigureAwait(false);

await this.globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context));
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ private MessageContext CreateMessageContext(
headers,
messageScopedResolver,
null,
new ProducerContext(topic, this.producerDependencyScope.Resolver));
new ProducerContext(topic, this.producerDependencyScope.Resolver, this.configuration.Cluster.Brokers));
}
}
}
10 changes: 9 additions & 1 deletion src/KafkaFlow/Producers/ProducerContext.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
namespace KafkaFlow.Producers
{
using System.Collections.Generic;

internal class ProducerContext : IProducerContext
{
public ProducerContext(string topic, IDependencyResolver producerDependencyResolver)
public ProducerContext(
string topic,
IDependencyResolver producerDependencyResolver,
IReadOnlyCollection<string> brokers)
{
this.Topic = topic;
this.DependencyResolver = producerDependencyResolver;
this.Brokers = brokers;
}

public string Topic { get; }
Expand All @@ -15,5 +21,7 @@ public ProducerContext(string topic, IDependencyResolver producerDependencyResol
public long? Offset { get; set; }

public IDependencyResolver DependencyResolver { get; }

public IReadOnlyCollection<string> Brokers { get; }
}
}
6 changes: 1 addition & 5 deletions website/docs/guides/global-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ sidebar_position: 9

# Global Events

In this section, we will delve into the concept of Global Events in KafkaFlow, which provides a mechanism to subscribe to various events that are triggered during the message production and consumption processes.
In this section, we will delve into the concept of Global Events in KafkaFlow, which provides a mechanism to subscribe to various events that are triggered during the message production and consumption processes.

KafkaFlow offers a range of Global Events that can be subscribed to. These events can be used to monitor and react to different stages of message handling. Below is a list of available events:
- [Message Produce Started Event](#message-produce-started-event)
Expand Down Expand Up @@ -82,10 +82,6 @@ services.AddKafka(

The Message Consume Completed Event signals the successful completion of message consumption. By subscribing to this event, you can track when messages have been successfully processed.

:::info
Please note that the current event is not compatible with Batch Consume in the current version (v2). However, this limitation is expected to be addressed in future releases (v3+).
:::info

```csharp
services.AddKafka(
kafka => kafka
Expand Down

0 comments on commit f8840a9

Please sign in to comment.