Skip to content

Commit

Permalink
fix: opentelemetry broken traces
Browse files Browse the repository at this point in the history
Activity.Current is a static variable that uses AsyncLocal internally,
which means that it flows into async calls,
but not back out into the caller (message consumer and producer).

As so, the activity.current is null after the produce and consume of the
message, which means that all the spans created after it will be
generate a new trace because the context is not being propagated.
  • Loading branch information
simaoribeiro committed Nov 16, 2023
1 parent 21db5e3 commit 8172b54
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 43 deletions.
12 changes: 12 additions & 0 deletions src/KafkaFlow.Abstractions/ActivityOperationType.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace KafkaFlow
{
/// <summary>Activity operation names enum values</summary>
public enum ActivityOperationType
{
/// <summary>PUBLISH</summary>
Publish,

/// <summary>PROCESS</summary>
Process,
}
}
28 changes: 28 additions & 0 deletions src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Diagnostics;

Check warning on line 1 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive should appear within a namespace declaration (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1200.md)

Check warning on line 1 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / build

Using directive should appear within a namespace declaration (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1200.md) [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / build

Using directive should appear within a namespace declaration (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1200.md) [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / build

Field '_activityString' should not begin with an underscore (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1309.md) [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]

Check warning on line 1 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / build

Field '_activitySource' should not begin with an underscore (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1309.md) [/home/runner/work/kafkaflow/kafkaflow/src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj]
using System.Reflection;

Check warning on line 2 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Using directive should appear within a namespace declaration (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1200.md)

namespace KafkaFlow
{
/// <summary>
/// ActivitySource properties
/// </summary>
public static class ActivitySourceAccessor
{
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();

private const string _activityString = "otel_activity";

Check warning on line 13 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Field '_activityString' should not begin with an underscore (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1309.md)

private static readonly ActivitySource _activitySource = new("KafkaFlow.OpenTelemetry", Version);

Check warning on line 15 in src/KafkaFlow.Abstractions/ActivitySourceAccessor.cs

View workflow job for this annotation

GitHub Actions / Test deployment

Field '_activitySource' should not begin with an underscore (https://github.com/DotNetAnalyzers/StyleCopAnalyzers/blob/master/documentation/SA1309.md)

/// <summary>
/// Gets the name of the OpenTelemetry Activity that is used as a key
/// in MessageContext.Items dictionary
/// </summary>
public static string ActivityString => _activityString;

/// <summary>
/// Gets the ActivitySource name that is used in KafkaFlow
/// </summary>
public static ActivitySource ActivitySource => _activitySource;
}
}
4 changes: 4 additions & 0 deletions src/KafkaFlow.Abstractions/KafkaFlow.Abstractions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,8 @@
<Description>Contains all KafkaFlow extendable interfaces</Description>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.1" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
namespace KafkaFlow.IntegrationTests.Core.Middlewares
{
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow.IntegrationTests.Core.Handlers;

internal class GzipMiddleware : IMessageMiddleware
{
public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
{
using var activity = ActivitySourceAccessor.ActivitySource.StartActivity("integration-test", ActivityKind.Internal);

MessageStorage.Add((byte[]) context.Message.Value);
await next(context);
}
Expand Down
40 changes: 35 additions & 5 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,43 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_CreateActivityOnConsumingMessage_TraceIsPropagatedToCreatedActivity()
{
// Arrange
var provider = await this.GetServiceProvider();
MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddInMemoryExporter(this.exportedItems)
.Build();

var producer = provider.GetRequiredService<IMessageProducer<GzipProducer>>();
var message = this.fixture.Create<byte[]>();

// Act
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.IsNull(producerSpan.ParentId);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId);
Assert.AreEqual(internalSpan.TraceId, consumerSpan.TraceId);
Assert.AreEqual(internalSpan.ParentSpanId, consumerSpan.SpanId);
}

[TestMethod]
public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsPropagatedFromTestActivityToConsumer()
{
Expand Down Expand Up @@ -97,7 +126,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
await producer.ProduceAsync(null, message);

// Assert
var (producerSpan, consumerSpan) = await this.WaitForSpansAsync();
var (producerSpan, consumerSpan, internalSpan) = await this.WaitForSpansAsync();

Assert.IsNotNull(this.exportedItems);
Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId);
Expand Down Expand Up @@ -181,9 +210,9 @@ await Policy
.ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned));
}

private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync()
private async Task<(Activity producerSpan, Activity consumerSpan, Activity internalSpan)> WaitForSpansAsync()
{
Activity producerSpan = null, consumerSpan = null;
Activity producerSpan = null, consumerSpan = null, internalSpan = null;

await Policy
.HandleResult<bool>(isAvailable => !isAvailable)
Expand All @@ -192,11 +221,12 @@ await Policy
{
producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer);
consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer);
internalSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Internal);
return Task.FromResult(producerSpan != null && consumerSpan != null);
});

return (producerSpan, consumerSpan);
return (producerSpan, consumerSpan, internalSpan);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,15 @@ namespace KafkaFlow.OpenTelemetry
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

internal static class ActivitySourceAccessor
internal static class ActivityAccessor
{
internal const string ActivityString = "otel_activity";
internal const string ExceptionEventKey = "exception";
internal const string MessagingSystemId = "kafka";
internal const string AttributeMessagingOperation = "messaging.operation";
internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key";
internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset";
internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName();
internal static readonly string ActivitySourceName = AssemblyName.Name;
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version);

public static void SetGenericTags(Activity activity)
{
Expand Down
28 changes: 13 additions & 15 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,23 @@ public static Task OnConsumeStarted(IMessageContext context)
{
try
{
var activityName = !string.IsNullOrEmpty(context?.ConsumerContext.Topic) ? $"{context?.ConsumerContext.Topic} {ProcessString}" : ProcessString;

// Extract the PropagationContext of the upstream parent from the message headers.
var parentContext = Propagator.Extract(new PropagationContext(default, Baggage.Current), context, ExtractTraceContextIntoBasicProperties);
Baggage.Current = parentContext.Baggage;

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Consumer, parentContext.ActivityContext);
var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity;

if (parentContext.ActivityContext.IsValid())
{
activity.SetParentId(parentContext.ActivityContext.TraceId, parentContext.ActivityContext.SpanId, parentContext.ActivityContext.TraceFlags);
}

foreach (var item in Baggage.Current)
{
activity?.AddBaggage(item.Key, item.Value);
}

context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);

ActivitySourceAccessor.SetGenericTags(activity);
ActivityAccessor.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
{
Expand All @@ -57,7 +55,7 @@ public static Task OnConsumeCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Dispose();
activity?.Stop();
}

return Task.CompletedTask;
Expand All @@ -67,11 +65,11 @@ public static Task OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);
var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);

activity?.Dispose();
activity?.Stop();
}

return Task.CompletedTask;
Expand All @@ -86,11 +84,11 @@ private static void SetConsumerTags(IMessageContext context, Activity activity)
{
var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);

activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString);
activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Process.ToString().ToLower());
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
activity.SetTag(AttributeMessagingKafkaConsumerGroup, context.ConsumerContext.GroupId);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, messageKey);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, messageKey);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context.ConsumerContext.Offset);
activity.SetTag(AttributeMessagingKafkaSourcePartition, context.ConsumerContext.Partition);
}
}
Expand Down
24 changes: 8 additions & 16 deletions src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

internal static class OpenTelemetryProducerEventsHandler
{
private const string PublishString = "publish";
private const string AttributeMessagingDestinationName = "messaging.destination.name";
private const string AttributeMessagingKafkaDestinationPartition = "messaging.kafka.destination.partition";
private static readonly TextMapPropagator Propagator = Propagators.DefaultTextMapPropagator;
Expand All @@ -19,12 +18,7 @@ public static Task OnProducerStarted(IMessageContext context)
{
try
{
var activityName = !string.IsNullOrEmpty(context?.ProducerContext.Topic) ? $"{context?.ProducerContext.Topic} {PublishString}" : PublishString;

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
var activity = ActivitySourceAccessor.ActivitySource.StartActivity(activityName, ActivityKind.Producer);
var activity = context.Items[ActivitySourceAccessor.ActivityString] as Activity;

// Depending on Sampling (and whether a listener is registered or not), the
// activity above may not be created.
Expand All @@ -34,8 +28,6 @@ public static Task OnProducerStarted(IMessageContext context)

if (activity != null)
{
context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);

contextToInject = activity.Context;
}
else if (Activity.Current != null)
Expand All @@ -48,7 +40,7 @@ public static Task OnProducerStarted(IMessageContext context)
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties);

ActivitySourceAccessor.SetGenericTags(activity);
ActivityAccessor.SetGenericTags(activity);

if (activity != null && activity.IsAllDataRequested)
{
Expand All @@ -67,7 +59,7 @@ public static Task OnProducerCompleted(IMessageContext context)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
activity?.Dispose();
activity?.Stop();
}

return Task.CompletedTask;
Expand All @@ -77,11 +69,11 @@ public static Task OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);
var exceptionEvent = ActivityAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);

activity?.Dispose();
activity?.Stop();
}

return Task.CompletedTask;
Expand All @@ -97,11 +89,11 @@ private static void InjectTraceContextIntoBasicProperties(IMessageContext contex

private static void SetProducerTags(IMessageContext context, Activity activity)
{
activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, PublishString);
activity.SetTag(ActivityAccessor.AttributeMessagingOperation, ActivityOperationType.Publish.ToString().ToLower());
activity.SetTag(AttributeMessagingDestinationName, context?.ProducerContext.Topic);
activity.SetTag(AttributeMessagingKafkaDestinationPartition, context?.ProducerContext.Partition);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
activity.SetTag(ActivitySourceAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageKey, context?.Message.Key);
activity.SetTag(ActivityAccessor.AttributeMessagingKafkaMessageOffset, context?.ProducerContext.Offset);
}
}
}
17 changes: 17 additions & 0 deletions src/KafkaFlow/ActivityFactory.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
using System.Diagnostics;

namespace KafkaFlow
{
internal static class ActivityFactory
{
public static Activity Start(string topicName, ActivityOperationType activityOperationType, ActivityKind activityKind)
{
var activityName = !string.IsNullOrEmpty(topicName) ? $"{topicName} {activityOperationType}" : activityOperationType.ToString().ToLower();

// Start an activity with a name following the semantic convention of the OpenTelemetry messaging specification.
// The convention also defines a set of attributes (in .NET they are mapped as `tags`) to be populated in the activity.
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md
return ActivitySourceAccessor.ActivitySource.StartActivity(activityName, activityKind);
}
}
}
5 changes: 5 additions & 0 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
namespace KafkaFlow.Consumers
{
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
Expand Down Expand Up @@ -106,6 +107,8 @@ public void OnTaskCompleted(Action handler)

private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, CancellationToken cancellationToken)
{
var activity = ActivityFactory.Start(message.Topic, ActivityOperationType.Process, ActivityKind.Consumer);

try
{
var context = new MessageContext(
Expand All @@ -119,6 +122,8 @@ private async Task ProcessMessageAsync(ConsumeResult<byte[], byte[]> message, Ca
this.Id),
null);

context.Items.Add(ActivitySourceAccessor.ActivityString, activity);

try
{
var scope = this.dependencyResolver.CreateScope();
Expand Down
1 change: 1 addition & 0 deletions src/KafkaFlow/KafkaFlow.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.1.1" />
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="6.0.1" />
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>

Expand Down
Loading

0 comments on commit 8172b54

Please sign in to comment.