Skip to content

Commit

Permalink
feat: add event notification feature
Browse files Browse the repository at this point in the history
  • Loading branch information
ailtonguitar authored and filipeesch committed Oct 23, 2023
1 parent 677241d commit d5a9c21
Show file tree
Hide file tree
Showing 16 changed files with 234 additions and 203 deletions.
16 changes: 6 additions & 10 deletions src/KafkaFlow.Abstractions/Consumers/IWorker.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
namespace KafkaFlow
{
using System;
using KafkaFlow.Observer;

/// <summary>
/// Represents the interface of a internal worker
/// </summary>
Expand All @@ -14,19 +11,18 @@ public interface IWorker
int Id { get; }

/// <summary>
/// This handler is called immediately after a worker completes the consumption of a message
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// </summary>
/// <param name="handler"><see cref="Action"/> to be executed</param>
void OnTaskCompleted(Action handler);
IEvent WorkerStopping { get; }

/// <summary>
/// Gets the subject for worker stopping events where observers can subscribe to receive notifications.
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppingSubject, VoidObject> WorkerStopping { get; }
IEvent WorkerStopped { get; }

/// <summary>
/// Gets the subject for worker stopped events where observers can subscribe to receive notifications.
/// Gets the subject for worker consumption completed events where observers can subscribe to receive notifications.
/// </summary>
ISubject<WorkerStoppedSubject, VoidObject> WorkerStopped { get; }
IEvent<IMessageContext> WorkerProcessingEnded { get; }
}
}
19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppedSubject.cs

This file was deleted.

19 changes: 0 additions & 19 deletions src/KafkaFlow.Abstractions/Consumers/WorkerStoppingSubject.cs

This file was deleted.

17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/Observer/ISubject.cs

This file was deleted.

18 changes: 0 additions & 18 deletions src/KafkaFlow.Abstractions/Observer/ISubjectObserver.cs

This file was deleted.

53 changes: 0 additions & 53 deletions src/KafkaFlow.Abstractions/Observer/Subject.cs

This file was deleted.

17 changes: 0 additions & 17 deletions src/KafkaFlow.Abstractions/VoidObject.cs

This file was deleted.

10 changes: 2 additions & 8 deletions src/KafkaFlow.BatchConsume/BatchConsumeMiddleware.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@
using System.Threading.Tasks;
using KafkaFlow.Configuration;
using KafkaFlow.Consumers;
using KafkaFlow.Observer;

internal class BatchConsumeMiddleware
: IMessageMiddleware,
ISubjectObserver<WorkerStoppedSubject, VoidObject>,
IDisposable
internal class BatchConsumeMiddleware : IMessageMiddleware, IDisposable
{
private readonly SemaphoreSlim dispatchSemaphore = new(1, 1);

Expand All @@ -37,7 +33,7 @@ public BatchConsumeMiddleware(
this.batch = new(batchSize);
this.consumerConfiguration = middlewareContext.Consumer.Configuration;

middlewareContext.Worker.WorkerStopped.Subscribe(this);
middlewareContext.Worker.WorkerStopped.Subscribe(() => this.TriggerDispatchAndWaitAsync());
}

public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
Expand Down Expand Up @@ -67,8 +63,6 @@ public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
}
}

public async Task OnNotification(WorkerStoppedSubject subject, VoidObject arg) => await this.TriggerDispatchAndWaitAsync();

public void Dispose()
{
this.dispatchTask?.Dispose();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void Setup()

workerMock
.SetupGet(x => x.WorkerStopped)
.Returns(new WorkerStoppedSubject(this.logHandlerMock.Object));
.Returns(new Event(this.logHandlerMock.Object));

consumerConfigurationMock
.SetupGet(x => x.AutoMessageCompletion)
Expand Down
Loading

0 comments on commit d5a9c21

Please sign in to comment.