Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade exception events for OpenTelemetry #476

Merged
merged 1 commit into from
Nov 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using KafkaFlow.IntegrationTests.Core.Handlers;
using KafkaFlow.IntegrationTests.Core.Middlewares;
using KafkaFlow.IntegrationTests.Core.Producers;
using KafkaFlow.OpenTelemetry;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
Expand Down Expand Up @@ -45,7 +46,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans
MessageStorage.Clear();

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddInMemoryExporter(this.exportedItems)
.Build();

Expand Down Expand Up @@ -78,7 +79,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp
var baggageValue2 = "TestBaggageValue2";

using var tracerProvider = Sdk.CreateTracerProviderBuilder()
.AddSource("KafkaFlow.OpenTelemetry")
.AddSource(KafkaFlowInstrumentation.ActivitySourceName)
.AddSource(kafkaFlowTestString)
.AddInMemoryExporter(this.exportedItems)
.Build();
Expand Down
24 changes: 3 additions & 21 deletions src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,22 @@

namespace KafkaFlow.OpenTelemetry
{
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

internal static class ActivitySourceAccessor
{
internal const string ActivityString = "otel_activity";
internal const string ExceptionEventKey = "exception";
internal const string MessagingSystemId = "kafka";
internal const string AttributeMessagingOperation = "messaging.operation";
internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key";
internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset";
internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName();
internal static readonly string ActivitySourceName = AssemblyName.Name;
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version);

public static void SetGenericTags(Activity activity)
{
activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId);
}
internal static readonly ActivitySource ActivitySource = new(KafkaFlowInstrumentation.ActivitySourceName, KafkaFlowInstrumentation.Version);

public static ActivityEvent CreateExceptionEvent(Exception exception)
internal static void SetGenericTags(Activity activity)
{
var activityTagCollection = new ActivityTagsCollection(
new[]
{
new KeyValuePair<string, object>(Conventions.AttributeExceptionMessage, exception.Message),
new KeyValuePair<string, object>(Conventions.AttributeExceptionStacktrace, exception.StackTrace),
});

return new ActivityEvent(ExceptionEventKey, DateTimeOffset.UtcNow, activityTagCollection);
activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId);
}
}
}
19 changes: 19 additions & 0 deletions src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentation.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
namespace KafkaFlow.OpenTelemetry
{
using System.Reflection;

/// <summary>
/// KafkaFlow OTEL instrumentation properties
/// </summary>
public static class KafkaFlowInstrumentation
{
internal static readonly AssemblyName AssemblyName = typeof(KafkaFlowInstrumentation).Assembly.GetName();
internal static readonly string Version = AssemblyName.Version.ToString();

/// <summary>
/// ActivitySource name to be used when adding
/// KafkaFlow as source to an OTEL listener
/// </summary>
public static readonly string ActivitySourceName = AssemblyName.Name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;
using global::OpenTelemetry.Trace;

internal static class OpenTelemetryConsumerEventsHandler
{
Expand Down Expand Up @@ -67,9 +68,8 @@ public static Task OnConsumeError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);

activity?.Dispose();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading.Tasks;
using global::OpenTelemetry;
using global::OpenTelemetry.Context.Propagation;
using global::OpenTelemetry.Trace;

internal static class OpenTelemetryProducerEventsHandler
{
Expand Down Expand Up @@ -77,9 +78,8 @@ public static Task OnProducerError(IMessageContext context, Exception ex)
{
if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity)
{
var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex);

activity?.AddEvent(exceptionEvent);
activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
activity?.RecordException(ex);

activity?.Dispose();
}
Expand Down
Loading