diff --git a/src/KafkaFlow.Abstractions/Consumers/IWorker.cs b/src/KafkaFlow.Abstractions/Consumers/IWorker.cs index a90dc7209..f54a374ac 100644 --- a/src/KafkaFlow.Abstractions/Consumers/IWorker.cs +++ b/src/KafkaFlow.Abstractions/Consumers/IWorker.cs @@ -1,8 +1,5 @@ namespace KafkaFlow { - using System; - using KafkaFlow.Observer; - /// /// Represents the interface of a internal worker /// @@ -14,19 +11,18 @@ public interface IWorker int Id { get; } /// - /// This handler is called immediately after a worker completes the consumption of a message + /// Gets the subject for worker stopping events where observers can subscribe to receive notifications. /// - /// to be executed - void OnTaskCompleted(Action handler); + IEvent WorkerStopping { get; } /// - /// Gets the subject for worker stopping events where observers can subscribe to receive notifications. + /// Gets the subject for worker stopped events where observers can subscribe to receive notifications. /// - ISubject WorkerStopping { get; } + IEvent WorkerStopped { get; } /// - /// Gets the subject for worker stopped events where observers can subscribe to receive notifications. + /// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications. /// - ISubject WorkerStopped { get; } + IEvent WorkerProcessingEnded { get; } } } diff --git a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs b/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs deleted file mode 100644 index 4d92d1908..000000000 --- a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace KafkaFlow -{ - using KafkaFlow.Observer; - - /// - /// Represents a subject specific to worker stopped events where observers can subscribe to receive notifications. - /// - public class WorkerStoppedSubject : Subject - { - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - public WorkerStoppedSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} diff --git a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs b/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs deleted file mode 100644 index 816ce5f5f..000000000 --- a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace KafkaFlow -{ - using KafkaFlow.Observer; - - /// - /// Represents a subject specific to worker stopping events where observers can subscribe to receive notifications. - /// - public class WorkerStoppingSubject : Subject - { - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - public WorkerStoppingSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} diff --git a/src/KafkaFlow.Abstractions/Observer/ISubject.cs b/src/KafkaFlow.Abstractions/Observer/ISubject.cs deleted file mode 100644 index 669d24591..000000000 --- a/src/KafkaFlow.Abstractions/Observer/ISubject.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace KafkaFlow.Observer -{ - /// - /// Represents a subject in the observer design pattern that can be observed by observers. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public interface ISubject - where TSubject : Subject - { - /// - /// Subscribes an observer to the subject. - /// - /// The observer to subscribe. - void Subscribe(ISubjectObserver observer); - } -} diff --git a/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs b/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs deleted file mode 100644 index 223cd863f..000000000 --- a/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs +++ /dev/null @@ -1,18 +0,0 @@ -namespace KafkaFlow.Observer -{ - using System.Threading.Tasks; - - /// - /// Represents an observer in the observer design pattern that can receive notifications from a subject. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public interface ISubjectObserver - { - /// - /// Called when a notification is received from the subject. - /// - /// A task representing the asynchronous notification handling. - Task OnNotification(TSubject subject, TArg arg); - } -} diff --git a/src/KafkaFlow.Abstractions/Observer/Subject.cs b/src/KafkaFlow.Abstractions/Observer/Subject.cs deleted file mode 100644 index 31fb1bfe9..000000000 --- a/src/KafkaFlow.Abstractions/Observer/Subject.cs +++ /dev/null @@ -1,53 +0,0 @@ -namespace KafkaFlow.Observer -{ - using System; - using System.Collections.Generic; - using System.Threading.Tasks; - - /// - /// A generic implementation that should be extended to help the use of the notification system. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public abstract class Subject : ISubject - where TSubject : Subject - { - private readonly ILogHandler logHandler; - private readonly List> observers = new(); - - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - protected Subject(ILogHandler logHandler) - { - this.logHandler = logHandler; - } - - /// - /// Subscribes an observer to the subject, allowing it to receive notifications. - /// - /// The observer to subscribe. - public void Subscribe(ISubjectObserver observer) => this.observers.Add(observer); - - /// - /// Notifies all subscribed observers asynchronously. - /// - /// The parameter passed by the client. - /// A task representing the asynchronous notification operation. - public async Task NotifyAsync(TArg arg) - { - foreach (var observer in this.observers) - { - try - { - await observer.OnNotification((TSubject)this, arg); - } - catch (Exception e) - { - this.logHandler.Error("Error notifying observer", e, new { Subject = this.GetType().Name }); - } - } - } - } -} diff --git a/src/KafkaFlow.Abstractions/VoidObject.cs b/src/KafkaFlow.Abstractions/VoidObject.cs deleted file mode 100644 index d59912d4c..000000000 --- a/src/KafkaFlow.Abstractions/VoidObject.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace KafkaFlow; - -/// -/// A type that represents an empty object that should be ignored -/// -public class VoidObject -{ - /// - /// Gets the unique instance value - /// - public static readonly VoidObject Value = new(); - - private VoidObject() - { - // Empty - } -} diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs index 70cc87530..a6f0621df 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs @@ -7,12 +7,8 @@ using System.Threading.Tasks; using KafkaFlow.Configuration; using KafkaFlow.Consumers; - using KafkaFlow.Observer; - internal class BatchConsumeMiddleware - : IMessageMiddleware, - ISubjectObserver, - IDisposable + internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable { private readonly SemaphoreSlim dispatchSemaphore = new(1, 1); @@ -37,7 +33,7 @@ public BatchConsumeMiddleware( this.batch = new(batchSize); this.consumerConfiguration = middlewareContext.Consumer.Configuration; - middlewareContext.Worker.WorkerStopped.Subscribe(this); + middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync()); } public async Task Invoke(IMessageContext context, MiddlewareDelegate next) @@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) } } - public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync(); - public void Dispose() { this.dispatchTask?.Dispose(); diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index e55e85ae8..2269db974 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -51,7 +51,7 @@ public void Setup() workerMock .SetupGet(x => x.WorkerStopped) - .Returns(new WorkerStoppedSubject(this.logHandlerMock.Object)); + .Returns(new Event(this.logHandlerMock.Object)); consumerConfigurationMock .SetupGet(x => x.AutoMessageCompletion) diff --git a/src/KafkaFlow.UnitTests/EventTests.cs b/src/KafkaFlow.UnitTests/EventTests.cs new file mode 100644 index 000000000..53aabdee0 --- /dev/null +++ b/src/KafkaFlow.UnitTests/EventTests.cs @@ -0,0 +1,204 @@ +namespace KafkaFlow.UnitTests +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class EventTests + { + private readonly Event target; + private readonly Event typedTarget; + + public EventTests() + { + var log = new Mock(); + this.target = new Event(log.Object); + this.typedTarget = new Event(log.Object); + } + + [TestMethod] + public async Task FireAsync_EventSubscribed_CallDelegateWithSuccess() + { + // Arrange + var numberOfCalls = 0; + + this.target.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await this.target.FireAsync(); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObservers_CallAllDelegatesWithSuccess() + { + // Arrange + var numberOfCalls = 0; + + this.target.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + this.target.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await this.target.FireAsync(); + + // Assert + Assert.AreEqual(2, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObserversAndErrors_CallAllDelegatesAndContinueWithoutErrors() + { + // Arrange + var numberOfCalls = 0; + + this.target.Subscribe(() => throw new NotImplementedException()); + + this.target.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await this.target.FireAsync(); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventSubscribedWithArgument_CallDelegateWithSuccess() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArgument = string.Empty; + + this.typedTarget.Subscribe(arg => + { + receivedArgument = arg; + return Task.CompletedTask; + }); + + // Act + await this.typedTarget.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(expectedArgument, receivedArgument); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObserversAndArgument_CallAllDelegatesWithSuccess() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArguments = new List(); + + this.typedTarget.Subscribe(arg => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }); + + this.typedTarget.Subscribe(arg => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }); + + // Act + await this.typedTarget.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(2, receivedArguments.Count); + Assert.IsTrue(receivedArguments.All(x => x == expectedArgument)); + } + + [TestMethod] + public async Task FireAsync_TypedEventWithMultipleObserversAndErrors_CallAllDelegatesAndContinueWithoutErrors() + { + // Arrange + var numberOfCalls = 0; + + this.typedTarget.Subscribe(_ => throw new NotImplementedException()); + + this.typedTarget.Subscribe(_ => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await this.typedTarget.FireAsync(Guid.NewGuid().ToString()); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_DuplicatedEventHandler_CallHandlerOnce() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArguments = new List(); + + Func handler = (arg) => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }; + + this.typedTarget.Subscribe(handler); + this.typedTarget.Subscribe(handler); + + // Act + await this.typedTarget.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(1, receivedArguments.Count); + Assert.IsTrue(receivedArguments.All(x => x == expectedArgument)); + } + + [TestMethod] + public async Task FireAsync_UnsubscribeEventHandler_DoesNotCallHandler() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArguments = new List(); + + Func handler = (arg) => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }; + + var subscription = this.typedTarget.Subscribe(handler); + + subscription.Cancel(); + + // Act + await this.typedTarget.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(0, receivedArguments.Count); + } + } +} diff --git a/src/KafkaFlow/ConsumerManagerFactory.cs b/src/KafkaFlow/ConsumerManagerFactory.cs index a3c752bc2..33bce7adb 100644 --- a/src/KafkaFlow/ConsumerManagerFactory.cs +++ b/src/KafkaFlow/ConsumerManagerFactory.cs @@ -22,7 +22,7 @@ public IConsumerManager Create(IConsumerConfiguration configuration, IDependency configuration, logHandler); - consumerWorkerPool.WorkerPoolStopped.Subscribe(middlewareExecutor); + consumerWorkerPool.WorkerPoolStopped.Subscribe(() => middlewareExecutor.OnWorkerPoolStopped()); var feeder = new WorkerPoolFeeder( consumer, diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index 81ade8577..ea0dfa6c1 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -4,7 +4,6 @@ namespace KafkaFlow.Consumers using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; - using KafkaFlow.Observer; internal class ConsumerWorker : IConsumerWorker { @@ -16,12 +15,12 @@ internal class ConsumerWorker : IConsumerWorker private readonly Channel messagesBuffer; - private readonly WorkerStoppingSubject workerStoppingSubject; - private readonly WorkerStoppedSubject workerStoppedSubject; + private readonly Event workerStoppingEvent; + private readonly Event workerStoppedEvent; + private readonly Event workerProcessingEnded; private CancellationTokenSource stopCancellationTokenSource; private Task backgroundTask; - private Action onMessageFinishedHandler; public ConsumerWorker( IConsumer consumer, @@ -38,8 +37,9 @@ public ConsumerWorker( this.messagesBuffer = Channel.CreateBounded(consumer.Configuration.BufferSize); this.globalEvents = consumerDependencyResolver.Resolve(); - this.workerStoppingSubject = new(logHandler); - this.workerStoppedSubject = new(logHandler); + this.workerStoppingEvent = new(logHandler); + this.workerStoppedEvent = new(logHandler); + this.workerProcessingEnded = new Event(logHandler); var middlewareContext = this.workerDependencyResolverScope.Resolver.Resolve(); @@ -53,9 +53,11 @@ public ConsumerWorker( public IDependencyResolver WorkerDependencyResolver => this.workerDependencyResolverScope.Resolver; - public ISubject WorkerStopping => this.workerStoppingSubject; + public IEvent WorkerStopping => this.workerStoppingEvent; - public ISubject WorkerStopped => this.workerStoppedSubject; + public IEvent WorkerStopped => this.workerStoppedEvent; + + public IEvent WorkerProcessingEnded => this.workerProcessingEnded; public ValueTask EnqueueAsync( IMessageContext context, @@ -100,7 +102,7 @@ public Task StartAsync() public async Task StopAsync() { - await this.workerStoppingSubject.NotifyAsync(VoidObject.Value); + await this.workerStoppingEvent.FireAsync(); this.messagesBuffer.Writer.TryComplete(); @@ -111,7 +113,7 @@ public async Task StopAsync() await this.backgroundTask.ConfigureAwait(false); - await this.workerStoppedSubject.NotifyAsync(VoidObject.Value); + await this.workerStoppedEvent.FireAsync(); } public void Dispose() @@ -121,11 +123,6 @@ public void Dispose() this.stopCancellationTokenSource.Dispose(); } - public void OnTaskCompleted(Action handler) - { - this.onMessageFinishedHandler = handler; - } - private async Task ProcessMessageAsync(IMessageContext context, CancellationToken cancellationToken) { try @@ -166,7 +163,7 @@ await this.middlewareExecutor context.ConsumerContext.Complete(); } - this.onMessageFinishedHandler?.Invoke(); + await this.workerProcessingEnded.FireAsync(context); } } catch (Exception ex) diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs index f82f494b9..5774d2c9e 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs @@ -7,7 +7,6 @@ namespace KafkaFlow.Consumers using System.Threading.Tasks; using Confluent.Kafka; using KafkaFlow.Configuration; - using KafkaFlow.Observer; internal class ConsumerWorkerPool : IConsumerWorkerPool { @@ -18,7 +17,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool private readonly Factory distributionStrategyFactory; private readonly IOffsetCommitter offsetCommitter; - private readonly WorkerPoolStoppedSubject workerPoolStoppedSubject; + private readonly Event workerPoolStoppedSubject; private TaskCompletionSource startedTaskSource = new(); private List workers = new(); @@ -38,7 +37,7 @@ public ConsumerWorkerPool( this.middlewareExecutor = middlewareExecutor; this.logHandler = logHandler; this.distributionStrategyFactory = consumerConfiguration.DistributionStrategyFactory; - this.workerPoolStoppedSubject = new(logHandler); + this.workerPoolStoppedSubject = new Event(logHandler); this.offsetCommitter = consumer.Configuration.NoStoreOffsets ? new NullOffsetCommitter() : @@ -52,7 +51,7 @@ public ConsumerWorkerPool( public int CurrentWorkersCount { get; private set; } - public ISubject WorkerPoolStopped => this.workerPoolStoppedSubject; + public IEvent WorkerPoolStopped => this.workerPoolStoppedSubject; public async Task StartAsync(IReadOnlyCollection partitions, int workersCount) { @@ -121,7 +120,7 @@ public async Task StopAsync() this.offsetManager = null; - await this.workerPoolStoppedSubject.NotifyAsync(VoidObject.Value); + await this.workerPoolStoppedSubject.FireAsync(); await this.offsetCommitter.StopAsync(); } diff --git a/src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs b/src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs index 4db57f195..e9761ae90 100644 --- a/src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs +++ b/src/KafkaFlow/Consumers/DistributionStrategies/FreeWorkerDistributionStrategy.cs @@ -18,7 +18,7 @@ public void Init(IReadOnlyList workers) { foreach (var worker in workers) { - worker.OnTaskCompleted(() => this.freeWorkers.Writer.WriteAsync(worker)); + worker.WorkerProcessingEnded.Subscribe(_ => Task.FromResult(this.freeWorkers.Writer.WriteAsync(worker))); this.freeWorkers.Writer.TryWrite(worker); } } diff --git a/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs b/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs deleted file mode 100644 index 0a3a5124c..000000000 --- a/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs +++ /dev/null @@ -1,12 +0,0 @@ -namespace KafkaFlow.Consumers -{ - using KafkaFlow.Observer; - - internal class WorkerPoolStoppedSubject : Subject - { - public WorkerPoolStoppedSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} diff --git a/src/KafkaFlow/MiddlewareExecutor.cs b/src/KafkaFlow/MiddlewareExecutor.cs index b759d6c37..b2710ff9d 100644 --- a/src/KafkaFlow/MiddlewareExecutor.cs +++ b/src/KafkaFlow/MiddlewareExecutor.cs @@ -5,12 +5,8 @@ namespace KafkaFlow using System.Linq; using System.Threading.Tasks; using KafkaFlow.Configuration; - using KafkaFlow.Consumers; - using KafkaFlow.Observer; - internal class MiddlewareExecutor - : IMiddlewareExecutor, - ISubjectObserver + internal class MiddlewareExecutor : IMiddlewareExecutor { private readonly IReadOnlyList configurations; @@ -27,7 +23,7 @@ public Task Execute(IMessageContext context, Func nextOpe return this.ExecuteDefinition(0, context, nextOperation); } - public Task OnNotification(WorkerPoolStoppedSubject subject, VoidObject arg) + internal Task OnWorkerPoolStopped() { this.workersMiddlewares.Clear(); return Task.CompletedTask;