Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt OpenTelemetry to KafkaFlow 3.0 #460

Merged
merged 1 commit into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/KafkaFlow.Abstractions/IMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ public interface IMessageContext
/// </summary>
IDependencyResolver DependencyResolver { get; }

/// <summary>
/// Gets the brokers associated with the message.
/// </summary>
public IReadOnlyCollection<string> Brokers { get; }

/// <summary>
/// Creates a new <see cref="IMessageContext"/> with the new message
/// </summary>
Expand Down
4 changes: 3 additions & 1 deletion src/KafkaFlow.OpenTelemetry/ActivitySourceAccessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace KafkaFlow.OpenTelemetry
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using Conventions = SemanticConventions::OpenTelemetry.Trace.TraceSemanticConventions;

Expand All @@ -21,9 +22,10 @@ internal static class ActivitySourceAccessor
internal static readonly string Version = Assembly.GetExecutingAssembly().GetName().Version.ToString();
internal static readonly ActivitySource ActivitySource = new(ActivitySourceName, Version);

public static void SetGenericTags(Activity activity)
public static void SetGenericTags(Activity activity, IEnumerable<string> bootstrapServers)
ailtonguitar marked this conversation as resolved.
Show resolved Hide resolved
{
activity?.SetTag(Conventions.AttributeMessagingSystem, MessagingSystemId);
activity?.SetTag(Conventions.AttributePeerService, string.Join(",", bootstrapServers ?? Enumerable.Empty<string>()));
}

public static ActivityEvent CreateExceptionEvent(Exception exception)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public static Task OnConsumeStarted(IMessageContext context)

context?.Items.Add(ActivitySourceAccessor.ActivityString, activity);

ActivitySourceAccessor.SetGenericTags(activity);
ActivitySourceAccessor.SetGenericTags(activity, context?.Brokers);

if (activity != null && activity.IsAllDataRequested)
{
Expand Down Expand Up @@ -84,7 +84,7 @@ private static IEnumerable<string> ExtractTraceContextIntoBasicProperties(IMessa

private static void SetConsumerTags(IMessageContext context, Activity activity)
{
var messageKey = Encoding.UTF8.GetString(context.Message.Key as byte[]);
string messageKey = context.Message.Key != null ? Encoding.UTF8.GetString(context.Message.Key as byte[]) : string.Empty;

activity.SetTag(ActivitySourceAccessor.AttributeMessagingOperation, ProcessString);
activity.SetTag(AttributeMessagingSourceName, context.ConsumerContext.Topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Task OnProducerStarted(IMessageContext context)
// Inject the ActivityContext into the message headers to propagate trace context to the receiving service.
Propagator.Inject(new PropagationContext(contextToInject, Baggage.Current), context, InjectTraceContextIntoBasicProperties);

ActivitySourceAccessor.SetGenericTags(activity);
ActivitySourceAccessor.SetGenericTags(activity, context?.Brokers);

if (activity != null && activity.IsAllDataRequested)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
namespace KafkaFlow.UnitTests.BatchConsume
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using AutoFixture;
using FluentAssertions;
using KafkaFlow.Batching;
using KafkaFlow.Configuration;
Expand All @@ -16,6 +18,7 @@ public class BatchConsumeMiddlewareTests

private readonly TimeSpan batchTimeout = TimeSpan.FromMilliseconds(1000);
private readonly TimeSpan waitForTaskExecution = TimeSpan.FromMilliseconds(100);
private readonly Fixture fixture = new();

private Mock<ILogHandler> logHandlerMock;

Expand All @@ -37,6 +40,10 @@ public void Setup()
var consumerMock = new Mock<IConsumer>();
var consumerConfigurationMock = new Mock<IConsumerConfiguration>();

var clusterConfig = this.fixture.Create<ClusterConfiguration>();

consumerConfigurationMock.SetupGet(x => x.ClusterConfiguration).Returns(clusterConfig);

middlewareContextMock
.SetupGet(x => x.Worker)
.Returns(workerMock.Object);
Expand Down
6 changes: 5 additions & 1 deletion src/KafkaFlow/Batching/BatchConsumeMessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ internal class BatchConsumeMessageContext : IMessageContext, IDisposable

public BatchConsumeMessageContext(
IConsumerContext consumer,
IReadOnlyCollection<IMessageContext> batchMessage)
IReadOnlyCollection<IMessageContext> batchMessage,
IReadOnlyCollection<string> brokers)
{
this.ConsumerContext = consumer;
this.Message = new Message(null, batchMessage);
this.batchDependencyScope = consumer.WorkerDependencyResolver.CreateScope();
this.Items = new Dictionary<string, object>();
this.Brokers = brokers;
}

public Message Message { get; }
Expand All @@ -29,6 +31,8 @@ public BatchConsumeMessageContext(

public IDictionary<string, object> Items { get; }

public IReadOnlyCollection<string> Brokers { get; }

public IMessageContext SetMessage(object key, object value) =>
throw new NotSupportedException($"{nameof(BatchConsumeMessageContext)} does not allow to change the message");

Expand Down
2 changes: 1 addition & 1 deletion src/KafkaFlow/Batching/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private async Task DispatchAsync(IMessageContext context, MiddlewareDelegate nex
return;
}

var batchContext = new BatchConsumeMessageContext(context.ConsumerContext, localBatch);
var batchContext = new BatchConsumeMessageContext(context.ConsumerContext, localBatch, this.consumerConfiguration.ClusterConfiguration.Brokers);

await next(batchContext).ConfigureAwait(false);
}
Expand Down
13 changes: 11 additions & 2 deletions src/KafkaFlow/Consumers/ConsumerWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,20 @@ private async Task ProcessMessageAsync(IMessageContext context, CancellationToke
{
await this.globalEvents.FireMessageConsumeStartedAsync(new MessageEventContext(context));

_= context.ConsumerContext.Completion.ContinueWith(
async task =>
{
if (task.IsFaulted)
{
await this.globalEvents.FireMessageConsumeErrorAsync(new MessageErrorEventContext(context, task.Exception));
}

await this.globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context));
});

await this.middlewareExecutor
.Execute(context, _ => Task.CompletedTask)
.ConfigureAwait(false);

await this.globalEvents.FireMessageConsumeCompletedAsync(new MessageEventContext(context));
}
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
{
Expand Down
3 changes: 2 additions & 1 deletion src/KafkaFlow/Consumers/ConsumerWorkerPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ private MessageContext CreateMessageContext(ConsumeResult<byte[], byte[]> messag
worker,
messageDependencyScope,
this.consumerDependencyResolver),
null);
null,
this.consumer.Configuration.ClusterConfiguration.Brokers);
return context;
}
}
Expand Down
9 changes: 7 additions & 2 deletions src/KafkaFlow/MessageContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,16 @@ public MessageContext(
IMessageHeaders headers,
IDependencyResolver dependencyResolver,
IConsumerContext consumer,
IProducerContext producer)
IProducerContext producer,
IReadOnlyCollection<string> brokers)
{
this.Message = message;
this.DependencyResolver = dependencyResolver;
this.Headers = headers ?? new MessageHeaders();
this.ConsumerContext = consumer;
this.ProducerContext = producer;
this.Items = new Dictionary<string, object>();
this.Brokers = brokers;
}

public Message Message { get; }
Expand All @@ -31,11 +33,14 @@ public MessageContext(

public IDictionary<string, object> Items { get; }

public IReadOnlyCollection<string> Brokers { get; }

public IMessageContext SetMessage(object key, object value) => new MessageContext(
new Message(key, value),
this.Headers,
this.DependencyResolver,
this.ConsumerContext,
this.ProducerContext);
this.ProducerContext,
this.Brokers);
}
}
3 changes: 2 additions & 1 deletion src/KafkaFlow/Producers/MessageProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ private MessageContext CreateMessageContext(
headers,
messageScopedResolver,
null,
new ProducerContext(topic, this.producerDependencyScope.Resolver));
new ProducerContext(topic, this.producerDependencyScope.Resolver),
this.configuration.Cluster.Brokers);
}
}
}
6 changes: 1 addition & 5 deletions website/docs/guides/global-events.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ sidebar_position: 9

# Global Events

In this section, we will delve into the concept of Global Events in KafkaFlow, which provides a mechanism to subscribe to various events that are triggered during the message production and consumption processes.
In this section, we will delve into the concept of Global Events in KafkaFlow, which provides a mechanism to subscribe to various events that are triggered during the message production and consumption processes.

KafkaFlow offers a range of Global Events that can be subscribed to. These events can be used to monitor and react to different stages of message handling. Below is a list of available events:
- [Message Produce Started Event](#message-produce-started-event)
Expand Down Expand Up @@ -82,10 +82,6 @@ services.AddKafka(

The Message Consume Completed Event signals the successful completion of message consumption. By subscribing to this event, you can track when messages have been successfully processed.

:::info
Please note that the current event is not compatible with Batch Consume in the current version (v2). However, this limitation is expected to be addressed in future releases (v3+).
:::

```csharp
services.AddKafka(
kafka => kafka
Expand Down
Loading