From c3bd62e66cfa0733e6fa89fb0f51a213db556818 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sim=C3=A3o=20Ribeiro?= Date: Tue, 21 Nov 2023 22:00:21 +0000 Subject: [PATCH] feat: upgrade exception tags in otel Change how the exceptions are being recorded in otel activities to be in accordance to the official documentation. Make KafkaFlow ActivitySource name public so the users of manual instrumentation can easialy refer to without having to type it. --- .../OpenTelemetryTests.cs | 5 ++-- .../ActivitySourceAccessor.cs | 24 +++---------------- .../KafkaFlowInstrumentation.cs | 19 +++++++++++++++ .../OpenTelemetryConsumerEventsHandler.cs | 6 ++--- .../OpenTelemetryProducerEventsHandler.cs | 6 ++--- 5 files changed, 31 insertions(+), 29 deletions(-) create mode 100644 src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentation.cs diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs index 855083632..05a61f306 100644 --- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs +++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -16,6 +16,7 @@ using KafkaFlow.IntegrationTests.Core.Handlers; using KafkaFlow.IntegrationTests.Core.Middlewares; using KafkaFlow.IntegrationTests.Core.Producers; + using KafkaFlow.OpenTelemetry; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -45,7 +46,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans MessageStorage.Clear(); using var tracerProvider = Sdk.CreateTracerProviderBuilder() - .AddSource("KafkaFlow.OpenTelemetry") + .AddSource(KafkaFlowInstrumentation.ActivitySourceName) .AddInMemoryExporter(this.exportedItems) .Build(); @@ -78,7 +79,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp var baggageValue2 = "TestBaggageValue2"; using var tracerProvider = Sdk.CreateTracerProviderBuilder() - .AddSource("KafkaFlow.OpenTelemetry") + .AddSource(KafkaFlowInstrumentation.ActivitySourceName) .AddSource(kafkaFlowTestString) .AddInMemoryExporter(this.exportedItems) .Build(); diff --git a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs index 9a965dee5..66c801536 100644 --- a/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs +++ b/src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs @@ -2,40 +2,22 @@ namespace KafkaFlow.OpenTelemetry { - using System; - using System.Collections.Generic; using System.Diagnostics; - using System.Reflection; using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions; internal static class ActivitySourceAccessor { internal const string ActivityString = "otel_activity"; - internal const string ExceptionEventKey = "exception"; internal const string MessagingSystemId = "kafka"; internal const string AttributeMessagingOperation = "messaging.operation"; internal const string AttributeMessagingKafkaMessageKey = "messaging.kafka.message.key"; internal const string AttributeMessagingKafkaMessageOffset = "messaging.kafka.message.offset"; - internal static readonly AssemblyName AssemblyName = typeof(ActivitySourceAccessor).Assembly.GetName(); - internal static readonly string ActivitySourceName = AssemblyName.Name; - internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString(); - internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version); - public static void SetGenericTags(Activity activity) - { - activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId); - } + internal static readonly ActivitySource ActivitySource = new(KafkaFlowInstrumentation.ActivitySourceName, KafkaFlowInstrumentation.Version); - public static ActivityEvent CreateExceptionEvent(Exception exception) + internal static void SetGenericTags(Activity activity) { - var activityTagCollection = new ActivityTagsCollection( - new[] - { - new KeyValuePair(Conventions.AttributeExceptionMessage, exception.Message), - new KeyValuePair(Conventions.AttributeExceptionStacktrace, exception.StackTrace), - }); - - return new ActivityEvent(ExceptionEventKey, DateTimeOffset.UtcNow, activityTagCollection); + activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId); } } } diff --git a/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentation.cs b/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentation.cs new file mode 100644 index 000000000..a0096cc9c --- /dev/null +++ b/src/KafkaFlow.OpenTelemetry/KafkaFlowInstrumentation.cs @@ -0,0 +1,19 @@ +namespace KafkaFlow.OpenTelemetry +{ + using System.Reflection; + + /// + /// KafkaFlow OTEL instrumentation properties + /// + public static class KafkaFlowInstrumentation + { + internal static readonly AssemblyName AssemblyName = typeof(KafkaFlowInstrumentation).Assembly.GetName(); + internal static readonly string Version = AssemblyName.Version.ToString(); + + /// + /// ActivitySource name to be used when adding + /// KafkaFlow as source to an OTEL listener + /// + public static readonly string ActivitySourceName = AssemblyName.Name; + } +} diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs index 10fa794e9..cea4febc2 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryConsumerEventsHandler.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; + using global::OpenTelemetry.Trace; internal static class OpenTelemetryConsumerEventsHandler { @@ -67,9 +68,8 @@ public static Task OnConsumeError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex); - - activity?.AddEvent(exceptionEvent); + activity?.SetStatus(ActivityStatusCode.Error, ex.Message); + activity?.RecordException(ex); activity?.Dispose(); } diff --git a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs index 3630157c1..403e257d1 100644 --- a/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs +++ b/src/KafkaFlow.OpenTelemetry/OpenTelemetryProducerEventsHandler.cs @@ -7,6 +7,7 @@ using System.Threading.Tasks; using global::OpenTelemetry; using global::OpenTelemetry.Context.Propagation; + using global::OpenTelemetry.Trace; internal static class OpenTelemetryProducerEventsHandler { @@ -77,9 +78,8 @@ public static Task OnProducerError(IMessageContext context, Exception ex) { if (context.Items.TryGetValue(ActivitySourceAccessor.ActivityString, out var value) && value is Activity activity) { - var exceptionEvent = ActivitySourceAccessor.CreateExceptionEvent(ex); - - activity?.AddEvent(exceptionEvent); + activity?.SetStatus(ActivityStatusCode.Error, ex.Message); + activity?.RecordException(ex); activity?.Dispose(); }