From 767435a3df26fd5013d86c93da173355f1de7752 Mon Sep 17 00:00:00 2001 From: Ailton Silva Date: Fri, 29 Sep 2023 11:12:10 -0300 Subject: [PATCH] feat: add event notification feature --- .../Consumers/IWorker.cs | 6 +- .../Consumers/WorkerStoppedSubject.cs | 19 -- .../Consumers/WorkerStoppingSubject.cs | 19 -- src/KafkaFlow.Abstractions/IEvent.cs | 30 +++ .../Observer/ISubject.cs | 17 -- .../Observer/ISubjectObserver.cs | 18 -- .../Observer/Subject.cs | 53 ------ src/KafkaFlow.Abstractions/VoidObject.cs | 17 -- .../BatchConsumeMiddleware.cs | 10 +- .../BatchConsumeMiddlewareTests.cs | 2 +- src/KafkaFlow.UnitTests/EventTests.cs | 171 ++++++++++++++++++ src/KafkaFlow/ConsumerManagerFactory.cs | 2 +- src/KafkaFlow/Consumers/ConsumerWorker.cs | 13 +- src/KafkaFlow/Consumers/ConsumerWorkerPool.cs | 9 +- .../Consumers/WorkerPoolStoppedSubject.cs | 22 +-- src/KafkaFlow/Event.cs | 63 +++++++ src/KafkaFlow/MiddlewareExecutor.cs | 8 +- 17 files changed, 293 insertions(+), 186 deletions(-) delete mode 100644 src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs delete mode 100644 src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs create mode 100644 src/KafkaFlow.Abstractions/IEvent.cs delete mode 100644 src/KafkaFlow.Abstractions/Observer/ISubject.cs delete mode 100644 src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs delete mode 100644 src/KafkaFlow.Abstractions/Observer/Subject.cs delete mode 100644 src/KafkaFlow.Abstractions/VoidObject.cs create mode 100644 src/KafkaFlow.UnitTests/EventTests.cs create mode 100644 src/KafkaFlow/Event.cs diff --git a/src/KafkaFlow.Abstractions/Consumers/IWorker.cs b/src/KafkaFlow.Abstractions/Consumers/IWorker.cs index 482218597..bbcbb91c4 100644 --- a/src/KafkaFlow.Abstractions/Consumers/IWorker.cs +++ b/src/KafkaFlow.Abstractions/Consumers/IWorker.cs @@ -1,8 +1,6 @@ namespace KafkaFlow { using System; - using KafkaFlow.Configuration; - using KafkaFlow.Observer; /// /// Represents the interface of a internal worker @@ -23,11 +21,11 @@ public interface IWorker /// /// Gets the subject for worker stopping events where observers can subscribe to receive notifications. /// - ISubject WorkerStopping { get; } + IEvent WorkerStopping { get; } /// /// Gets the subject for worker stopped events where observers can subscribe to receive notifications. /// - ISubject WorkerStopped { get; } + IEvent WorkerStopped { get; } } } diff --git a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs b/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs deleted file mode 100644 index 4d92d1908..000000000 --- a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace KafkaFlow -{ - using KafkaFlow.Observer; - - /// - /// Represents a subject specific to worker stopped events where observers can subscribe to receive notifications. - /// - public class WorkerStoppedSubject : Subject - { - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - public WorkerStoppedSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} diff --git a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs b/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs deleted file mode 100644 index 816ce5f5f..000000000 --- a/src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs +++ /dev/null @@ -1,19 +0,0 @@ -namespace KafkaFlow -{ - using KafkaFlow.Observer; - - /// - /// Represents a subject specific to worker stopping events where observers can subscribe to receive notifications. - /// - public class WorkerStoppingSubject : Subject - { - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - public WorkerStoppingSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} diff --git a/src/KafkaFlow.Abstractions/IEvent.cs b/src/KafkaFlow.Abstractions/IEvent.cs new file mode 100644 index 000000000..cf172f234 --- /dev/null +++ b/src/KafkaFlow.Abstractions/IEvent.cs @@ -0,0 +1,30 @@ +namespace KafkaFlow +{ + using System; + using System.Threading.Tasks; + + /// + /// Represents an Event to be subscribed. + /// + public interface IEvent + { + /// + /// Subscribes to the event. + /// + /// The handler to be called when the event is fired. + void Subscribe(Func handler); + } + + /// + /// Represents an Event to be subscribed. + /// + /// The argument expected by the event. + public interface IEvent + { + /// + /// Subscribes to the event. + /// + /// The handler to be called when the event is fired. + void Subscribe(Func handler); + } +} diff --git a/src/KafkaFlow.Abstractions/Observer/ISubject.cs b/src/KafkaFlow.Abstractions/Observer/ISubject.cs deleted file mode 100644 index 669d24591..000000000 --- a/src/KafkaFlow.Abstractions/Observer/ISubject.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace KafkaFlow.Observer -{ - /// - /// Represents a subject in the observer design pattern that can be observed by observers. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public interface ISubject - where TSubject : Subject - { - /// - /// Subscribes an observer to the subject. - /// - /// The observer to subscribe. - void Subscribe(ISubjectObserver observer); - } -} diff --git a/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs b/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs deleted file mode 100644 index 223cd863f..000000000 --- a/src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs +++ /dev/null @@ -1,18 +0,0 @@ -namespace KafkaFlow.Observer -{ - using System.Threading.Tasks; - - /// - /// Represents an observer in the observer design pattern that can receive notifications from a subject. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public interface ISubjectObserver - { - /// - /// Called when a notification is received from the subject. - /// - /// A task representing the asynchronous notification handling. - Task OnNotification(TSubject subject, TArg arg); - } -} diff --git a/src/KafkaFlow.Abstractions/Observer/Subject.cs b/src/KafkaFlow.Abstractions/Observer/Subject.cs deleted file mode 100644 index 31fb1bfe9..000000000 --- a/src/KafkaFlow.Abstractions/Observer/Subject.cs +++ /dev/null @@ -1,53 +0,0 @@ -namespace KafkaFlow.Observer -{ - using System; - using System.Collections.Generic; - using System.Threading.Tasks; - - /// - /// A generic implementation that should be extended to help the use of the notification system. - /// - /// The type of the subject. - /// An argument type that will be passed to the observers - public abstract class Subject : ISubject - where TSubject : Subject - { - private readonly ILogHandler logHandler; - private readonly List> observers = new(); - - /// - /// Initializes a new instance of the class. - /// - /// The log handler object to be used - protected Subject(ILogHandler logHandler) - { - this.logHandler = logHandler; - } - - /// - /// Subscribes an observer to the subject, allowing it to receive notifications. - /// - /// The observer to subscribe. - public void Subscribe(ISubjectObserver observer) => this.observers.Add(observer); - - /// - /// Notifies all subscribed observers asynchronously. - /// - /// The parameter passed by the client. - /// A task representing the asynchronous notification operation. - public async Task NotifyAsync(TArg arg) - { - foreach (var observer in this.observers) - { - try - { - await observer.OnNotification((TSubject)this, arg); - } - catch (Exception e) - { - this.logHandler.Error("Error notifying observer", e, new { Subject = this.GetType().Name }); - } - } - } - } -} diff --git a/src/KafkaFlow.Abstractions/VoidObject.cs b/src/KafkaFlow.Abstractions/VoidObject.cs deleted file mode 100644 index d59912d4c..000000000 --- a/src/KafkaFlow.Abstractions/VoidObject.cs +++ /dev/null @@ -1,17 +0,0 @@ -namespace KafkaFlow; - -/// -/// A type that represents an empty object that should be ignored -/// -public class VoidObject -{ - /// - /// Gets the unique instance value - /// - public static readonly VoidObject Value = new(); - - private VoidObject() - { - // Empty - } -} diff --git a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs index 70cc87530..a6f0621df 100644 --- a/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs +++ b/src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs @@ -7,12 +7,8 @@ using System.Threading.Tasks; using KafkaFlow.Configuration; using KafkaFlow.Consumers; - using KafkaFlow.Observer; - internal class BatchConsumeMiddleware - : IMessageMiddleware, - ISubjectObserver, - IDisposable + internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable { private readonly SemaphoreSlim dispatchSemaphore = new(1, 1); @@ -37,7 +33,7 @@ public BatchConsumeMiddleware( this.batch = new(batchSize); this.consumerConfiguration = middlewareContext.Consumer.Configuration; - middlewareContext.Worker.WorkerStopped.Subscribe(this); + middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync()); } public async Task Invoke(IMessageContext context, MiddlewareDelegate next) @@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next) } } - public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync(); - public void Dispose() { this.dispatchTask?.Dispose(); diff --git a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs index e55e85ae8..2269db974 100644 --- a/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs +++ b/src/KafkaFlow.UnitTests/BatchConsume/BatchConsumeMiddlewareTests.cs @@ -51,7 +51,7 @@ public void Setup() workerMock .SetupGet(x => x.WorkerStopped) - .Returns(new WorkerStoppedSubject(this.logHandlerMock.Object)); + .Returns(new Event(this.logHandlerMock.Object)); consumerConfigurationMock .SetupGet(x => x.AutoMessageCompletion) diff --git a/src/KafkaFlow.UnitTests/EventTests.cs b/src/KafkaFlow.UnitTests/EventTests.cs new file mode 100644 index 000000000..edc68c3f0 --- /dev/null +++ b/src/KafkaFlow.UnitTests/EventTests.cs @@ -0,0 +1,171 @@ +namespace KafkaFlow.UnitTests +{ + using System; + using System.Collections.Generic; + using System.Linq; + using System.Threading.Tasks; + using Microsoft.VisualStudio.TestTools.UnitTesting; + using Moq; + + [TestClass] + public class EventTests + { + [TestMethod] + public async Task FireAsync_EventSubscribed_CallDelegateWithSuccess() + { + // Arrange + var numberOfCalls = 0; + var log = new Mock(); + + var event1 = new Event(log.Object); + + event1.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await event1.FireAsync(); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObservers_CallAllDelegatesWithSuccess() + { + // Arrange + var numberOfCalls = 0; + var log = new Mock(); + + var event1 = new Event(log.Object); + + event1.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + event1.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await event1.FireAsync(); + + // Assert + Assert.AreEqual(2, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObserversAndErrors_CallAllDelegatesAndContinueWithoutErrors() + { + // Arrange + var numberOfCalls = 0; + var log = new Mock(); + + var event1 = new Event(log.Object); + + event1.Subscribe(() => + { + throw new System.Exception(); + }); + + event1.Subscribe(() => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await event1.FireAsync(); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + + [TestMethod] + public async Task FireAsync_EventSubscribedWithArgument_CallDelegateWithSuccess() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArgument = string.Empty; + var log = new Mock(); + + var event1 = new Event(log.Object); + + event1.Subscribe(arg => + { + receivedArgument = arg; + return Task.CompletedTask; + }); + + // Act + await event1.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(expectedArgument, receivedArgument); + } + + [TestMethod] + public async Task FireAsync_EventWithMultipleObserversAndArgument_CallAllDelegatesWithSuccess() + { + // Arrange + var expectedArgument = Guid.NewGuid().ToString(); + var receivedArguments = new List(); + + var log = new Mock(); + + var event1 = new Event(log.Object); + + event1.Subscribe(arg => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }); + + event1.Subscribe(arg => + { + receivedArguments.Add(arg); + return Task.CompletedTask; + }); + + // Act + await event1.FireAsync(expectedArgument); + + // Assert + Assert.AreEqual(2, receivedArguments.Count); + Assert.IsTrue(receivedArguments.All(x => x == expectedArgument)); + } + + [TestMethod] + public async Task FireAsync_TypedEventWithMultipleObserversAndErrors_CallAllDelegatesAndContinueWithoutErrors() + { + // Arrange + var numberOfCalls = 0; + var log = new Mock(); + + var event1 = new Event(log.Object); + + event1.Subscribe(_ => + { + throw new System.Exception(); + }); + + event1.Subscribe(_ => + { + numberOfCalls++; + return Task.CompletedTask; + }); + + // Act + await event1.FireAsync(Guid.NewGuid().ToString()); + + // Assert + Assert.AreEqual(1, numberOfCalls); + } + } +} diff --git a/src/KafkaFlow/ConsumerManagerFactory.cs b/src/KafkaFlow/ConsumerManagerFactory.cs index a3c752bc2..33bce7adb 100644 --- a/src/KafkaFlow/ConsumerManagerFactory.cs +++ b/src/KafkaFlow/ConsumerManagerFactory.cs @@ -22,7 +22,7 @@ public IConsumerManager Create(IConsumerConfiguration configuration, IDependency configuration, logHandler); - consumerWorkerPool.WorkerPoolStopped.Subscribe(middlewareExecutor); + consumerWorkerPool.WorkerPoolStopped.Subscribe(() => middlewareExecutor.OnWorkerPoolStopped()); var feeder = new WorkerPoolFeeder( consumer, diff --git a/src/KafkaFlow/Consumers/ConsumerWorker.cs b/src/KafkaFlow/Consumers/ConsumerWorker.cs index 56e5978eb..81041dc69 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorker.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorker.cs @@ -4,7 +4,6 @@ namespace KafkaFlow.Consumers using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; - using KafkaFlow.Observer; internal class ConsumerWorker : IConsumerWorker { @@ -15,8 +14,8 @@ internal class ConsumerWorker : IConsumerWorker private readonly Channel messagesBuffer; - private readonly WorkerStoppingSubject workerStoppingSubject; - private readonly WorkerStoppedSubject workerStoppedSubject; + private readonly Event workerStoppingSubject; + private readonly Event workerStoppedSubject; private CancellationTokenSource stopCancellationTokenSource; private Task backgroundTask; @@ -51,9 +50,9 @@ public ConsumerWorker( public IDependencyResolver WorkerDependencyResolver => this.workerDependencyResolverScope.Resolver; - public ISubject WorkerStopping => this.workerStoppingSubject; + public IEvent WorkerStopping => this.workerStoppingSubject; - public ISubject WorkerStopped => this.workerStoppedSubject; + public IEvent WorkerStopped => this.workerStoppedSubject; public ValueTask EnqueueAsync( IMessageContext context, @@ -98,7 +97,7 @@ public Task StartAsync() public async Task StopAsync() { - await this.workerStoppingSubject.NotifyAsync(VoidObject.Value); + await this.workerStoppingSubject.FireAsync(); this.messagesBuffer.Writer.TryComplete(); @@ -109,7 +108,7 @@ public async Task StopAsync() await this.backgroundTask.ConfigureAwait(false); - await this.workerStoppedSubject.NotifyAsync(VoidObject.Value); + await this.workerStoppedSubject.FireAsync(); } public void Dispose() diff --git a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs index f82f494b9..5774d2c9e 100644 --- a/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs +++ b/src/KafkaFlow/Consumers/ConsumerWorkerPool.cs @@ -7,7 +7,6 @@ namespace KafkaFlow.Consumers using System.Threading.Tasks; using Confluent.Kafka; using KafkaFlow.Configuration; - using KafkaFlow.Observer; internal class ConsumerWorkerPool : IConsumerWorkerPool { @@ -18,7 +17,7 @@ internal class ConsumerWorkerPool : IConsumerWorkerPool private readonly Factory distributionStrategyFactory; private readonly IOffsetCommitter offsetCommitter; - private readonly WorkerPoolStoppedSubject workerPoolStoppedSubject; + private readonly Event workerPoolStoppedSubject; private TaskCompletionSource startedTaskSource = new(); private List workers = new(); @@ -38,7 +37,7 @@ public ConsumerWorkerPool( this.middlewareExecutor = middlewareExecutor; this.logHandler = logHandler; this.distributionStrategyFactory = consumerConfiguration.DistributionStrategyFactory; - this.workerPoolStoppedSubject = new(logHandler); + this.workerPoolStoppedSubject = new Event(logHandler); this.offsetCommitter = consumer.Configuration.NoStoreOffsets ? new NullOffsetCommitter() : @@ -52,7 +51,7 @@ public ConsumerWorkerPool( public int CurrentWorkersCount { get; private set; } - public ISubject WorkerPoolStopped => this.workerPoolStoppedSubject; + public IEvent WorkerPoolStopped => this.workerPoolStoppedSubject; public async Task StartAsync(IReadOnlyCollection partitions, int workersCount) { @@ -121,7 +120,7 @@ public async Task StopAsync() this.offsetManager = null; - await this.workerPoolStoppedSubject.NotifyAsync(VoidObject.Value); + await this.workerPoolStoppedSubject.FireAsync(); await this.offsetCommitter.StopAsync(); } diff --git a/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs b/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs index 0a3a5124c..51f85875d 100644 --- a/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs +++ b/src/KafkaFlow/Consumers/WorkerPoolStoppedSubject.cs @@ -1,12 +1,12 @@ -namespace KafkaFlow.Consumers -{ - using KafkaFlow.Observer; +//namespace KafkaFlow.Consumers +//{ +// using KafkaFlow.Observer; - internal class WorkerPoolStoppedSubject : Subject - { - public WorkerPoolStoppedSubject(ILogHandler logHandler) - : base(logHandler) - { - } - } -} +// internal class WorkerPoolStoppedSubject : Subject +// { +// public WorkerPoolStoppedSubject(ILogHandler logHandler) +// : base(logHandler) +// { +// } +// } +//} diff --git a/src/KafkaFlow/Event.cs b/src/KafkaFlow/Event.cs new file mode 100644 index 000000000..aa366030f --- /dev/null +++ b/src/KafkaFlow/Event.cs @@ -0,0 +1,63 @@ +namespace KafkaFlow +{ + using System; + using System.Collections.Generic; + using System.Threading.Tasks; + + internal class Event : IEvent + { + private readonly ILogHandler logHandler; + + private readonly IList> handlers = new List>(); + + public Event(ILogHandler logHandler) + { + this.logHandler = logHandler; + } + + public void Subscribe(Func handler) + { + this.handlers.Add(handler); + } + + internal async Task FireAsync(TArg arg) + { + foreach (var handler in this.handlers) + { + try + { + if (handler is null) + { + continue; + } + + await handler.Invoke(arg); + } + catch (Exception e) + { + this.logHandler.Error("Error firing event", e, new { Event = this.GetType().Name }); + } + } + } + } + + internal class Event : IEvent + { + private readonly Event evt; + + public Event(ILogHandler logHandler) + { + this.evt = new Event(logHandler); + } + + public void Subscribe(Func handle) + { + this.evt.Subscribe(_ => handle?.Invoke()); + } + + internal Task FireAsync() + { + return this.evt.FireAsync(null); + } + } +} diff --git a/src/KafkaFlow/MiddlewareExecutor.cs b/src/KafkaFlow/MiddlewareExecutor.cs index feaf54619..1c528be31 100644 --- a/src/KafkaFlow/MiddlewareExecutor.cs +++ b/src/KafkaFlow/MiddlewareExecutor.cs @@ -5,12 +5,8 @@ namespace KafkaFlow using System.Linq; using System.Threading.Tasks; using KafkaFlow.Configuration; - using KafkaFlow.Consumers; - using KafkaFlow.Observer; - internal class MiddlewareExecutor - : IMiddlewareExecutor, - ISubjectObserver + internal class MiddlewareExecutor : IMiddlewareExecutor { private readonly IReadOnlyList configurations; @@ -27,7 +23,7 @@ public Task Execute(IMessageContext context, Func nextOpe return this.ExecuteDefinition(0, context, nextOperation); } - public Task OnNotification(WorkerPoolStoppedSubject subject, VoidObject arg) + internal Task OnWorkerPoolStopped() { this.workersMiddlewares.Clear(); return Task.CompletedTask;