diff --git a/src/Kafka.TestFramework/Client.cs b/src/Kafka.TestFramework/Client.cs index b5b851e..8bd708f 100644 --- a/src/Kafka.TestFramework/Client.cs +++ b/src/Kafka.TestFramework/Client.cs @@ -1,25 +1,23 @@ using System; -using System.IO; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; -using Kafka.Protocol; -using Int32 = Kafka.Protocol.Int32; namespace Kafka.TestFramework { internal abstract class Client : IAsyncDisposable { - private readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource(); + private readonly CancellationTokenSource _cancellationSource; private readonly Pipe _pipe = new Pipe(); private readonly INetworkClient _networkClient; private Task _sendAndReceiveBackgroundTask = default!; - protected Client(INetworkClient networkClient) + protected Client(INetworkClient networkClient, CancellationToken cancellationToken) { _networkClient = networkClient; NetworkClient = new NetworkStream(networkClient); Reader = _pipe.Reader; + _cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); } protected NetworkStream NetworkClient { get; } diff --git a/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs b/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs index 102bd23..4b96ed6 100644 --- a/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs +++ b/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs @@ -26,9 +26,9 @@ public async Task CreateRequestClientAsync( var requestClient = new CrossWiredMemoryNetworkClient(first, second); var responseClient = new CrossWiredMemoryNetworkClient(second, first); await _clients - .SendAsync(responseClient, cancellationToken) + .SendAsync(responseClient, CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, Stopping).Token) .ConfigureAwait(false); - return new DisposableRequestClientDecorator(RequestClient.Start(requestClient), responseClient, second, first); + return new DisposableRequestClientDecorator(RequestClient.Start(requestClient, Stopping), responseClient, second, first); } private class DisposableRequestClientDecorator : IRequestClient diff --git a/src/Kafka.TestFramework/KafkaTestFramework.cs b/src/Kafka.TestFramework/KafkaTestFramework.cs index cd2e47a..435ba56 100644 --- a/src/Kafka.TestFramework/KafkaTestFramework.cs +++ b/src/Kafka.TestFramework/KafkaTestFramework.cs @@ -8,17 +8,23 @@ namespace Kafka.TestFramework { - public abstract class KafkaTestFramework : IAsyncDisposable + public abstract class KafkaTestFramework { private readonly INetworkServer _networkServer; private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private readonly List _backgroundTasks = new List(); - private const int Stopped = 0; - private const int Started = 1; - private int _status = Stopped; + private const int HasStopped = 0; + private const int HasStarted = 1; + private int _status = HasStopped; + + /// + /// Triggered when the test framework is stopping + /// + public CancellationToken Stopping => _cancellationTokenSource.Token; public static InMemoryKafkaTestFramework InMemory() { @@ -47,10 +53,10 @@ internal KafkaTestFramework(INetworkServer networkServer) public IAsyncDisposable Start() { - var previousStatus = Interlocked.Exchange(ref _status, Started); - if (previousStatus == Started) + var previousStatus = Interlocked.Exchange(ref _status, HasStarted); + if (previousStatus == HasStarted) { - return this; + return new StopOnDispose(this); } var task = Task.Run( @@ -61,7 +67,7 @@ public IAsyncDisposable Start() try { var client = await _networkServer - .WaitForConnectedClientAsync(_cancellationTokenSource.Token) + .WaitForConnectedClientAsync(Stopping) .ConfigureAwait(false); ReceiveMessagesFor(client); } @@ -69,10 +75,15 @@ public IAsyncDisposable Start() { return; } + catch + { + _cancellationTokenSource.Cancel(); + throw; + } } }); _backgroundTasks.Add(task); - return this; + return new StopOnDispose(this); } private void ReceiveMessagesFor(INetworkClient networkClient) @@ -80,14 +91,14 @@ private void ReceiveMessagesFor(INetworkClient networkClient) var task = Task.Run( async () => { - var client = ResponseClient.Start(networkClient); + var client = ResponseClient.Start(networkClient, Stopping); await using var _ = client.ConfigureAwait(false); while (_cancellationTokenSource.IsCancellationRequested == false) { try { var requestPayload = await client - .ReadAsync(_cancellationTokenSource.Token) + .ReadAsync(Stopping) .ConfigureAwait(false); if (!_subscriptions.TryGetValue( @@ -95,12 +106,12 @@ private void ReceiveMessagesFor(INetworkClient networkClient) out var subscription)) { throw new InvalidOperationException( - $"Missing subscription for {requestPayload.Message.GetType()}"); + $"Missing subscription for {requestPayload.Message.GetType()}"); } var response = await subscription( - requestPayload.Message, - _cancellationTokenSource.Token); + requestPayload.Message, + Stopping); await client .SendAsync( @@ -109,13 +120,18 @@ await client Messages.GetResponseHeaderVersionFor(requestPayload)) .WithCorrelationId(requestPayload.Header.CorrelationId), response), - _cancellationTokenSource.Token) + Stopping) .ConfigureAwait(false); } catch when (_cancellationTokenSource.IsCancellationRequested) { return; } + catch + { + _cancellationTokenSource.Cancel(); + throw; + } } }); _backgroundTasks.Add(task); @@ -163,11 +179,20 @@ public KafkaTestFramework On( return this; } - public async ValueTask DisposeAsync() + internal sealed class StopOnDispose : IAsyncDisposable { - _cancellationTokenSource.Cancel(); + private readonly KafkaTestFramework _testFramework; - await Task.WhenAll(_backgroundTasks); + public StopOnDispose(KafkaTestFramework testFramework) + { + _testFramework = testFramework; + } + public async ValueTask DisposeAsync() + { + _testFramework._cancellationTokenSource.Cancel(); + await Task.WhenAll(_testFramework._backgroundTasks) + .ConfigureAwait(false); + } } } } \ No newline at end of file diff --git a/src/Kafka.TestFramework/RequestClient.cs b/src/Kafka.TestFramework/RequestClient.cs index f1bf3c1..867698a 100644 --- a/src/Kafka.TestFramework/RequestClient.cs +++ b/src/Kafka.TestFramework/RequestClient.cs @@ -1,19 +1,19 @@ using System.Threading; using System.Threading.Tasks; using Kafka.Protocol; -using Int32 = Kafka.Protocol.Int32; namespace Kafka.TestFramework { internal class RequestClient : Client, IRequestClient { - private RequestClient(INetworkClient networkClient) : base(networkClient) + private RequestClient(INetworkClient networkClient, CancellationToken cancellationToken) : + base(networkClient, cancellationToken) { } - internal static RequestClient Start(INetworkClient networkClient) + internal static RequestClient Start(INetworkClient networkClient, CancellationToken cancellationToken) { - var client = new RequestClient(networkClient); + var client = new RequestClient(networkClient, cancellationToken); client.StartReceiving(); return client; } diff --git a/src/Kafka.TestFramework/ResponseClient.cs b/src/Kafka.TestFramework/ResponseClient.cs index 081c1f9..757f478 100644 --- a/src/Kafka.TestFramework/ResponseClient.cs +++ b/src/Kafka.TestFramework/ResponseClient.cs @@ -6,13 +6,14 @@ namespace Kafka.TestFramework { internal class ResponseClient : Client { - private ResponseClient(INetworkClient networkClient) : base(networkClient) + private ResponseClient(INetworkClient networkClient, CancellationToken cancellationToken) : + base(networkClient, cancellationToken) { } - internal static ResponseClient Start(INetworkClient networkClient) + internal static ResponseClient Start(INetworkClient networkClient, CancellationToken cancellationToken) { - var client = new ResponseClient(networkClient); + var client = new ResponseClient(networkClient, cancellationToken); client.StartReceiving(); return client; } diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs index 8bc03b2..51e873f 100644 --- a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs +++ b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -52,7 +52,7 @@ protected override Task GivenAsync() .WithLeaderId(Int32.From(0)) .WithPartitionIndex(Int32.From(0)) .WithReplicaNodesCollection(new[] { Int32.From(0) })))) - .ToArray() ?? new Func[0]) + .ToArray() ?? Array.Empty>()) .WithControllerId(Int32.From(0)) .WithClusterId(String.From("test")) .WithBrokersCollection(broker => broker @@ -96,7 +96,7 @@ protected override async Task WhenAsync() await using (_testServer.Start() .ConfigureAwait(false)) { - await ProduceMessageFromClientAsync("localhost", _testServer.Port) + await ProduceMessageFromClientAsync("localhost", _testServer.Port, _testServer.Stopping) .ConfigureAwait(false); } } @@ -114,7 +114,7 @@ public void It_should_have_read_the_message_sent() } private static async Task ProduceMessageFromClientAsync(string host, - int port) + int port, CancellationToken testServerStopping) { var producerConfig = new ProducerConfig(new Dictionary { @@ -138,7 +138,7 @@ private static async Task ProduceMessageFromClientAsync(string host, .ConfigureAwait(false); LogFactory.Create("producer").Info("Produce report {@report}", report); - producer.Flush(); + producer.Flush(testServerStopping); } } } diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs index 3c7c860..b8c0a64 100644 --- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs +++ b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -52,7 +52,7 @@ protected override Task GivenAsync() .WithLeaderId(Int32.From(0)) .WithPartitionIndex(Int32.From(0)) .WithReplicaNodesCollection(new[] { Int32.From(0) })))) - .ToArray() ?? new Func[0]) + .ToArray() ?? Array.Empty>()) .WithControllerId(Int32.From(0)) .WithClusterId(String.From("test")) .WithBrokersCollection(broker => broker @@ -96,7 +96,7 @@ protected override async Task WhenAsync() await using (_testServer.Start() .ConfigureAwait(false)) { - await ProduceMessageFromClientAsync("localhost", _testServer.Port) + await ProduceMessageFromClientAsync("localhost", _testServer.Port, _testServer.Stopping) .ConfigureAwait(false); } } @@ -114,7 +114,7 @@ public void It_should_have_read_the_message_sent() } private static async Task ProduceMessageFromClientAsync(string host, - int port) + int port, CancellationToken testServerStopping) { var producerConfig = new ProducerConfig(new Dictionary { @@ -133,11 +133,11 @@ private static async Task ProduceMessageFromClientAsync(string host, var report = await producer .ProduceAsync("my-topic", - new Message { Value = "test" }) + new Message { Value = "test" }, testServerStopping) .ConfigureAwait(false); LogFactory.Create("producer").Info("Produce report {@report}", report); - producer.Flush(); + producer.Flush(testServerStopping); } } } diff --git a/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs b/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs index edab6e7..07822f9 100644 --- a/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs +++ b/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs @@ -130,13 +130,6 @@ public void .ApiKeysCollection[FetchRequest.ApiKey].MaxVersion.Should() .Be(FetchRequest.MaxVersion); } - - protected override async Task TearDownAsync() - { - await _testServer - .DisposeAsync() - .ConfigureAwait(false); - } } } } \ No newline at end of file