Skip to content

Commit

Permalink
Merge pull request #3 from Fresa/add-consumer-test
Browse files Browse the repository at this point in the history
Add consumer test
  • Loading branch information
Fresa authored Feb 5, 2022
2 parents 5bb6fc0 + 01dfe11 commit a935148
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 44 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,11 @@ https://www.nuget.org/packages/kafka.testframework
## Getting Started
The test framework can be used in-memory or by setting up a TCP socket that the kafka client can connect to. See the [`integration tests`](https://github.com/Fresa/Kafka.TestFramework/blob/master/tests/Kafka.TestFramework.Tests).

### Testing a Producer
[This](tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs) integration test shows how a setup of the test framework with a Kafka.Confluence producer could look like.

### Testing a Consumer
[This](tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs) integration test shows how a setup of the test framework with a Kafka.Confluence consumer could look like.

### v2.x
Now supports [Kafka.Protocol](https://github.com/Fresa/Kafka.Protocol) v2.x.
78 changes: 62 additions & 16 deletions src/Kafka.TestFramework/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,99 @@
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Log.It;

namespace Kafka.TestFramework
{
internal abstract class Client : IAsyncDisposable
{
private readonly CancellationTokenSource _cancellationSource;
private readonly Pipe _pipe = new Pipe();
private readonly CancellationTokenSource _signalNoMoreDataToWrite;
private readonly CancellationTokenSource _signalNoMoreDataToRead;

private readonly Pipe _pipe = new Pipe(new PipeOptions(
useSynchronizationContext: false));
private readonly INetworkClient _networkClient;
private Task _sendAndReceiveBackgroundTask = default!;
private const int MinimumBufferSize = 512;
private static readonly ILogger Logger =
LogFactory.Create<Client>();

protected Client(INetworkClient networkClient, CancellationToken cancellationToken)
{
_networkClient = networkClient;
NetworkClient = new NetworkStream(networkClient);
Reader = _pipe.Reader;
_cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_signalNoMoreDataToWrite = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_signalNoMoreDataToRead = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
_pipe.Writer.OnReaderCompleted((exception, _) => _signalNoMoreDataToWrite.Cancel(), null);
_pipe.Reader.OnWriterCompleted((exception, _) => _signalNoMoreDataToRead.Cancel(), null);
}

protected NetworkStream NetworkClient { get; }
protected PipeReader Reader { get; }


protected CancellationToken NoMoreDataToRead => _signalNoMoreDataToRead.Token;

protected void StartReceiving()
{
_sendAndReceiveBackgroundTask = Task.Run(
async () =>
{
var cancellationToken = _cancellationSource.Token;
try
var cancellationToken = _signalNoMoreDataToWrite.Token;
var writer = _pipe.Writer;
await using (_networkClient.ConfigureAwait(false))
{
var dataReceiver = new DataReceiver(_networkClient);
while (cancellationToken.IsCancellationRequested == false)
try
{
await dataReceiver
.ReceiveAsync(_pipe.Writer, cancellationToken)
.ConfigureAwait(false);
FlushResult result;
do
{
var memory = writer.GetMemory(MinimumBufferSize);
var bytesRead = await _networkClient.ReceiveAsync(
memory,
cancellationToken)
.ConfigureAwait(false);
if (bytesRead == 0)
{
break;
}
Logger.Debug("Received {bytesRead} bytes", bytesRead);
writer.Advance(bytesRead);
result = await writer
.FlushAsync(cancellationToken)
.ConfigureAwait(false);
} while (result.IsCanceled == false &&
result.IsCompleted == false);
}
}
catch when (_cancellationSource.IsCancellationRequested)
{
// Shutdown in progress
catch when (_signalNoMoreDataToWrite.IsCancellationRequested)
{
// Shutdown in progress
}
catch (OperationCanceledException)
{
}
catch (Exception ex)
{
writer.Complete(ex);
throw;
}
finally
{
_signalNoMoreDataToWrite.Cancel();
}
writer.Complete();
}
});
}

public async ValueTask DisposeAsync()
{
_cancellationSource.Cancel();
_signalNoMoreDataToWrite.Cancel();
_signalNoMoreDataToRead.Cancel();

await _sendAndReceiveBackgroundTask
.ConfigureAwait(false);
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka.TestFramework/DataReceiver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ internal async Task ReceiveAsync(PipeWriter writer, CancellationToken cancellati

if (bytesRead == 0)
{
return;
break;
}

Logger.Debug("Received {bytesRead} bytes", bytesRead);
Expand Down
5 changes: 5 additions & 0 deletions src/Kafka.TestFramework/KafkaTestFramework.cs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ await client
{
return;
}
catch (OperationCanceledException)
{
_cancellationTokenSource.Cancel();
return;
}
catch
{
_cancellationTokenSource.Cancel();
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka.TestFramework/RequestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public async ValueTask<ResponsePayload> ReadAsync(
.ReadFromAsync(
requestPayload,
Reader,
cancellationToken)
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, NoMoreDataToRead).Token)
.ConfigureAwait(false);
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka.TestFramework/ResponseClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ internal async Task<RequestPayload> ReadAsync(
return await RequestPayload
.ReadFromAsync(
Reader,
cancellationToken)
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, NoMoreDataToRead).Token)
.ConfigureAwait(false);
}
}
Expand Down
26 changes: 20 additions & 6 deletions src/Kafka.TestFramework/SocketNetworkClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,30 @@ public ValueTask DisposeAsync()

public async ValueTask<int> SendAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
{
return await _socket
.SendAsync(buffer, SocketFlags.None, cancellationToken)
.ConfigureAwait(false);
try
{
return await _socket
.SendAsync(buffer, SocketFlags.None, cancellationToken)
.ConfigureAwait(false);
}
catch (SocketException) when (!_socket.Connected)
{
throw new OperationCanceledException(cancellationToken);
}
}

public async ValueTask<int> ReceiveAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
{
return await _socket
.ReceiveAsync(buffer, SocketFlags.None, cancellationToken)
.ConfigureAwait(false);
try
{
return await _socket
.ReceiveAsync(buffer, SocketFlags.None, cancellationToken)
.ConfigureAwait(false);
}
catch (SocketException) when (!_socket.Connected)
{
throw new OperationCanceledException(cancellationToken);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.1.0" />
<PackageReference Include="FluentAssertions" Version="5.9.0" />
<PackageReference Include="Kafka.Protocol" Version="2.0.0" />
<PackageReference Include="Kafka.Protocol" Version="2.0.3" />
<PackageReference Include="Log.It.With.NLog" Version="2.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.0.0" />
Expand Down
16 changes: 7 additions & 9 deletions tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Threading.Tasks;
using System;
using System.Threading.Tasks;
using Log.It;
using Log.It.With.NLog;
using Microsoft.Extensions.Configuration;
Expand All @@ -10,6 +11,8 @@ namespace Kafka.TestFramework.Tests
{
public class TestSpecificationAsync : XUnit2SpecificationAsync
{
private readonly IDisposable _logWriter;

static TestSpecificationAsync()
{
var config = new ConfigurationBuilder()
Expand All @@ -19,23 +22,18 @@ static TestSpecificationAsync()

NLog.LogManager.Configuration = new NLogLoggingConfiguration(config.GetSection("NLog"));
LogFactory.Initialize(new NLogFactory(new LogicalThreadContext()));
NLogCapturingTarget.Subscribe += Output.Writer.WriteLine;
}

public TestSpecificationAsync(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
NLogCapturingTarget.Subscribe += TestOutputHelper.WriteLine;
}

protected virtual Task TearDownAsync()
{
return Task.CompletedTask;
_logWriter = Output.WriteTo(testOutputHelper);
}

protected sealed override async Task DisposeAsync(bool disposing)
{
NLogCapturingTarget.Subscribe -= TestOutputHelper.WriteLine;
await TearDownAsync();
await base.DisposeAsync(disposing);
_logWriter.Dispose();
}
}
}
Loading

0 comments on commit a935148

Please sign in to comment.