From fdaa3a64ca036544df3aa0ff23d43dc234a83572 Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Tue, 18 Jan 2022 17:31:52 +0100 Subject: [PATCH 01/17] test: add framework tests for previously supported Confluent.Kafka 1.1.0 --- Kafka.TestFramework.sln | 17 +- src/Kafka.TestFramework/NetworkStream.cs | 52 +++++++ .../BytesExtensions.cs | 13 ++ .../Confluent.Kafka.1.1.0.Tests.csproj | 34 ++++ ...g_to_the_server_and_producing_a_message.cs | 145 ++++++++++++++++++ ...the_client_sends_the_message_subscribed.cs | 142 +++++++++++++++++ .../LogExtensions.cs | 50 ++++++ .../TestSpecificationAsync.cs | 41 +++++ .../appsettings.json | 26 ++++ 9 files changed, 518 insertions(+), 2 deletions(-) create mode 100644 src/Kafka.TestFramework/NetworkStream.cs create mode 100644 tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs create mode 100644 tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj create mode 100644 tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs create mode 100644 tests/Confluent.Kafka.1.1.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs create mode 100644 tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs create mode 100644 tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs create mode 100644 tests/Confluent.Kafka.1.1.0.Tests/appsettings.json diff --git a/Kafka.TestFramework.sln b/Kafka.TestFramework.sln index 0cd3255..fce0fb4 100644 --- a/Kafka.TestFramework.sln +++ b/Kafka.TestFramework.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.30011.22 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.31903.59 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{0EEC4A41-7C67-4580-8A21-BFA01B2415F1}" EndProject @@ -16,6 +16,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework.Tests", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework", "src\Kafka.TestFramework\Kafka.TestFramework.csproj", "{161B63C2-B654-4611-AAC8-28B54B5F3C9D}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.Protocol", "..\Kafka.Protocol\Kafka.Protocol\Kafka.Protocol.csproj", "{2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Confluent.Kafka.1.1.0.Tests", "tests\Confluent.Kafka.1.1.0.Tests\Confluent.Kafka.1.1.0.Tests.csproj", "{12646283-CC9E-4D45-8E89-F52337C790B6}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -30,12 +34,21 @@ Global {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Debug|Any CPU.Build.0 = Debug|Any CPU {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Release|Any CPU.ActiveCfg = Release|Any CPU {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Release|Any CPU.Build.0 = Release|Any CPU + {2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}.Release|Any CPU.Build.0 = Release|Any CPU + {12646283-CC9E-4D45-8E89-F52337C790B6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {12646283-CC9E-4D45-8E89-F52337C790B6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {FCFCC73C-B0A6-4D4E-A765-9A94303D00AD} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1} + {12646283-CC9E-4D45-8E89-F52337C790B6} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B432BD60-300C-4955-9D44-011413D52F2D} diff --git a/src/Kafka.TestFramework/NetworkStream.cs b/src/Kafka.TestFramework/NetworkStream.cs new file mode 100644 index 0000000..39ac8f4 --- /dev/null +++ b/src/Kafka.TestFramework/NetworkStream.cs @@ -0,0 +1,52 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Kafka.TestFramework +{ + internal class NetworkStream : Stream + { + private readonly INetworkClient _networkClient; + + public NetworkStream(INetworkClient networkClient) + { + _networkClient = networkClient; + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new System.NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new System.NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new System.NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new System.NotImplementedException(); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = new CancellationToken()) + { + return new ValueTask(_networkClient.SendAsync(buffer, cancellationToken).AsTask()); + } + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Length => 0; + public override long Position { get; set; } = 0; + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs b/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs new file mode 100644 index 0000000..b1d6757 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs @@ -0,0 +1,13 @@ +using System.Text; +using Kafka.Protocol; + +namespace Kafka.TestFramework.Tests +{ + internal static class BytesExtensions + { + internal static string EncodeToString(this byte[] bytes, Encoding encoding) + { + return encoding.GetString(bytes); + } + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj new file mode 100644 index 0000000..a25ecd8 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj @@ -0,0 +1,34 @@ + + + + netcoreapp3.0 + + false + + + + + + + + + + + + + + + + + + + + + + + + Always + + + + diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs new file mode 100644 index 0000000..8bc03b2 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -0,0 +1,145 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using FluentAssertions; +using Kafka.Protocol; +using Log.It; +using Xunit; +using Xunit.Abstractions; +using Int32 = Kafka.Protocol.Int32; +using Int64 = Kafka.Protocol.Int64; +using Record = Kafka.Protocol.Records.Record; +using String = Kafka.Protocol.String; + +namespace Kafka.TestFramework.Tests +{ + public partial class Given_a_socket_based_test_server + { + public class When_connecting_to_the_server_and_producing_a_message : TestSpecificationAsync + { + private SocketBasedKafkaTestFramework _testServer; + private IEnumerable _records; + + public When_connecting_to_the_server_and_producing_a_message( + ITestOutputHelper testOutputHelper) + : base(testOutputHelper) + { + } + + protected override Task GivenAsync() + { + _testServer = KafkaTestFramework.WithSocket(); + + _testServer.On( + request => request.Respond() + .WithAllApiKeys()); + + _testServer.On( + request => request.Respond() + .WithTopicsCollection( + request.TopicsCollection?.Select(topic => + new Func( + responseTopic => + responseTopic + .WithName(topic.Name) + .WithPartitionsCollection(partition => + partition + .WithLeaderId(Int32.From(0)) + .WithPartitionIndex(Int32.From(0)) + .WithReplicaNodesCollection(new[] { Int32.From(0) })))) + .ToArray() ?? new Func[0]) + .WithControllerId(Int32.From(0)) + .WithClusterId(String.From("test")) + .WithBrokersCollection(broker => broker + .WithRack(String.From("testrack")) + .WithNodeId(Int32.From(0)) + .WithHost(String.From("localhost")) + .WithPort(Int32.From(_testServer.Port))) + ); + + _testServer.On(request => + { + _records = request.TopicDataCollection.SelectMany(pair => + pair.Value.PartitionDataCollection.Where(data => data.Records != null) + .SelectMany(data => data.Records!.Records)); + return request + .Respond() + .WithResponsesCollection(request.TopicDataCollection.Select(topicProduceData => + new Func( + topicProduceResponse => + topicProduceResponse + .WithName(topicProduceData.Value.Name) + .WithPartitionResponsesCollection(topicProduceData.Value + .PartitionDataCollection.Select(partitionProduceData => + new Func( + partitionProduceResponse => + partitionProduceResponse + .WithIndex(partitionProduceData.Index) + .WithLogAppendTimeMs(Int64.From(-1)))) + .ToArray()))) + .ToArray()); + }); + + return Task.CompletedTask; + } + + protected override async Task WhenAsync() + { + await using (_testServer.Start() + .ConfigureAwait(false)) + { + await ProduceMessageFromClientAsync("localhost", _testServer.Port) + .ConfigureAwait(false); + } + } + + [Fact] + public void It_should_have_read_one_record() + { + _records.Should().HaveCount(1); + } + + [Fact] + public void It_should_have_read_the_message_sent() + { + _records.First().Value.EncodeToString(Encoding.UTF8).Should().Be("test"); + } + + private static async Task ProduceMessageFromClientAsync(string host, + int port) + { + var producerConfig = new ProducerConfig(new Dictionary + { + { "log_level", "7"} + }) + { + BootstrapServers = $"{host}:{port}", + MessageTimeoutMs = 5000, + SocketTimeoutMs = 30000, + Debug = "all" + }; + + using var producer = + new ProducerBuilder(producerConfig) + .SetLogHandler(LogExtensions.UseLogIt) + .Build(); + + var report = await producer + .ProduceAsync("my-topic", + new Message { Value = "test" }) + .ConfigureAwait(false); + LogFactory.Create("producer").Info("Produce report {@report}", report); + + producer.Flush(); + } + } + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs b/tests/Confluent.Kafka.1.1.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs new file mode 100644 index 0000000..edab6e7 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs @@ -0,0 +1,142 @@ +using System.Threading.Tasks; +using FluentAssertions; +using Kafka.Protocol; +using Xunit; +using Xunit.Abstractions; + +namespace Kafka.TestFramework.Tests +{ + public partial class Given_an_inmemory_kafka_test_framework_and_a_message_subscription + { + public partial class + When_the_client_sends_the_message_subscribed : + TestSpecificationAsync + { + private readonly InMemoryKafkaTestFramework _testServer = + KafkaTestFramework.InMemory(); + + private ResponsePayload _response; + + public When_the_client_sends_the_message_subscribed( + ITestOutputHelper testOutputHelper) + : base(testOutputHelper) + { + } + + protected override Task GivenAsync() + { + _testServer.On( + request => request.Respond() + .WithThrottleTimeMs(Int32.From(100)) + .WithApiKeysCollection( + key => key + .WithApiKey(FetchRequest.ApiKey) + .WithMinVersion(FetchRequest.MinVersion) + .WithMaxVersion(FetchRequest.MaxVersion))); + + return Task.CompletedTask; + } + + protected override async Task WhenAsync() + { + await using (_testServer.Start() + .ConfigureAwait(false)) + { + var client = await _testServer + .CreateRequestClientAsync() + .ConfigureAwait(false); + + var message = new ApiVersionsRequest(ApiVersionsRequest.MaxVersion); + var requestPayload = new RequestPayload( + new RequestHeader(message.HeaderVersion) + .WithRequestApiKey(ApiVersionsRequest.ApiKey) + .WithRequestApiVersion( + message.Version) + .WithCorrelationId(Int32.From(12)), + message + ); + + await client + .SendAsync(requestPayload) + .ConfigureAwait(false); + + _response = await client + .ReadAsync(requestPayload) + .ConfigureAwait(false); + } + } + + [Fact] + public void + The_subscription_should_receive_a_api_versions_response() + { + _response.Message.Should().BeOfType(); + } + + [Fact] + public void + The_subscription_should_receive_a_api_versions_response_with_correlation_id() + { + _response.Header.CorrelationId.Should().Be(Int32.From(12)); + } + + [Fact] + public void + The_subscription_should_receive_a_api_versions_response_with_throttle_time() + { + _response.Message.As() + .ThrottleTimeMs.Should().Be(Int32.From(100)); + } + + [Fact] + public void + The_subscription_should_receive_a_api_versions_response_with_one_api_key() + { + _response.Message.As() + .ApiKeysCollection.Should().HaveCount(1); + } + + [Fact] + public void + The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_key() + { + _response.Message.As() + .ApiKeysCollection.Value.Should().ContainKey(FetchRequest.ApiKey); + } + + [Fact] + public void + The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_index() + { + _response.Message.As() + .ApiKeysCollection[FetchRequest.ApiKey].ApiKey.Should() + .Be(FetchRequest.ApiKey); + } + + [Fact] + public void + The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_min_version() + { + _response.Message.As() + .ApiKeysCollection[FetchRequest.ApiKey].MinVersion.Should() + .Be(FetchRequest.MinVersion); + } + + [Fact] + public void + The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_max_version() + { + _response.Message.As() + .ApiKeysCollection[FetchRequest.ApiKey].MaxVersion.Should() + .Be(FetchRequest.MaxVersion); + } + + protected override async Task TearDownAsync() + { + await _testServer + .DisposeAsync() + .ConfigureAwait(false); + } + } + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs b/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs new file mode 100644 index 0000000..6f56fd7 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs @@ -0,0 +1,50 @@ +using System; +using Confluent.Kafka; +using Log.It; + +namespace Kafka.TestFramework.Tests +{ + internal static class LogExtensions + { + internal static void UseLogIt( + IProducer producer, LogMessage logMessage) + { + var logger = LogFactory.Create(producer.GetType().GetPrettyName()); + + switch (logMessage.Level) + { + case SyslogLevel.Debug: + LogTo(logger.Debug); + break; + case SyslogLevel.Notice: + case SyslogLevel.Info: + LogTo(logger.Info); + break; + case SyslogLevel.Warning: + LogTo(logger.Warning); + break; + case SyslogLevel.Error: + LogTo(logger.Error); + break; + case SyslogLevel.Alert: + case SyslogLevel.Critical: + case SyslogLevel.Emergency: + LogTo(logger.Fatal); + break; + default: + throw new ArgumentOutOfRangeException($"{logMessage.Level} is not supported"); + } + + void LogTo(Action log) + { + log("{name} {facility}: {message}", + new object[] + { + logMessage.Name, + logMessage.Facility, + logMessage.Message + }); + } + } + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs b/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs new file mode 100644 index 0000000..22adb82 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs @@ -0,0 +1,41 @@ +using System.Threading.Tasks; +using Log.It; +using Log.It.With.NLog; +using Microsoft.Extensions.Configuration; +using NLog.Extensions.Logging; +using Test.It.With.XUnit; +using Xunit.Abstractions; + +namespace Kafka.TestFramework.Tests +{ + public class TestSpecificationAsync : XUnit2SpecificationAsync + { + static TestSpecificationAsync() + { + var config = new ConfigurationBuilder() + .SetBasePath(System.IO.Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true) + .Build(); + + NLog.LogManager.Configuration = new NLogLoggingConfiguration(config.GetSection("NLog")); + LogFactory.Initialize(new NLogFactory(new LogicalThreadContext())); + } + + public TestSpecificationAsync(ITestOutputHelper testOutputHelper) : base(testOutputHelper) + { + NLogCapturingTarget.Subscribe += TestOutputHelper.WriteLine; + } + + protected virtual Task TearDownAsync() + { + return Task.CompletedTask; + } + + protected sealed override async Task DisposeAsync(bool disposing) + { + NLogCapturingTarget.Subscribe -= TestOutputHelper.WriteLine; + await TearDownAsync(); + await base.DisposeAsync(disposing); + } + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/appsettings.json b/tests/Confluent.Kafka.1.1.0.Tests/appsettings.json new file mode 100644 index 0000000..712e57e --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/appsettings.json @@ -0,0 +1,26 @@ +{ + "NLog": { + "throwConfigExceptions": true, + "internalLogLevel": "info", + "internalLogFile": "${basedir}/internal-nlog.txt", + "extensions": [ + { "assembly": "Log.It.With.NLog" } + ], + "variables": { + "simplePipeSeparatorLayout": "${date:format=yyyy-MM-dd HH\\:mm\\:ss.fff} | ${logger} | ${level} | ${message} ${onexception:| ${exception:format=type} | ${exception:format=method} | ${exception:format=message} | ${exception:format=stacktrace} | ${exception:method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}" + }, + "targets": { + "NLogCapturing": { + "type": "NLogCapturing", + "layout": "${simplePipeSeparatorLayout}" + } + }, + "rules": [ + { + "logger": "*", + "minLevel": "Trace", + "writeTo": "NLogCapturing" + } + ] + } +} \ No newline at end of file From c8c40e25dd103fe25f90151ed08314414d45e61a Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Tue, 18 Jan 2022 17:33:23 +0100 Subject: [PATCH 02/17] feat: support the latest kafka protocol BREAKING CHANGE: The test framework has no breaking changes but the protocol has --- src/Kafka.TestFramework/Client.cs | 53 ++--------------- .../Kafka.TestFramework.csproj | 5 +- src/Kafka.TestFramework/KafkaTestFramework.cs | 4 +- src/Kafka.TestFramework/RequestClient.cs | 10 +++- src/Kafka.TestFramework/ResponseClient.cs | 9 ++- .../BytesExtensions.cs | 4 +- ...g_to_the_server_and_producing_a_message.cs | 53 +++++++++-------- ...the_client_sends_the_message_subscribed.cs | 14 +++-- .../Kafka.TestFramework.Tests.csproj | 3 +- .../ProduceRequestExtensions.cs | 57 ------------------- 10 files changed, 66 insertions(+), 146 deletions(-) delete mode 100644 tests/Kafka.TestFramework.Tests/ProduceRequestExtensions.cs diff --git a/src/Kafka.TestFramework/Client.cs b/src/Kafka.TestFramework/Client.cs index f4655de..b5b851e 100644 --- a/src/Kafka.TestFramework/Client.cs +++ b/src/Kafka.TestFramework/Client.cs @@ -8,8 +8,7 @@ namespace Kafka.TestFramework { - internal abstract class Client : IAsyncDisposable - where TSendPayload : IPayload + internal abstract class Client : IAsyncDisposable { private readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource(); private readonly Pipe _pipe = new Pipe(); @@ -19,53 +18,13 @@ internal abstract class Client : IAsyncDisposable protected Client(INetworkClient networkClient) { _networkClient = networkClient; - Reader = new KafkaReader(_pipe.Reader); - } - - protected IKafkaReader Reader { get; } - - public async ValueTask SendAsync( - TSendPayload payload, - CancellationToken cancellationToken = default) - { - var buffer = new MemoryStream(); - await using var _ = buffer.ConfigureAwait(false); - var writer = new KafkaWriter(buffer); - await using (writer.ConfigureAwait(false)) - { - await payload - .WriteToAsync(writer, cancellationToken) - .ConfigureAwait(false); - } - - var lengthBuffer = new MemoryStream(); - await using (lengthBuffer - .ConfigureAwait(false)) - { - var lengthWriter = new KafkaWriter(lengthBuffer); - await using (lengthWriter.ConfigureAwait(false)) - { - await lengthWriter - .WriteInt32Async(Int32.From((int)buffer.Length), cancellationToken) - .ConfigureAwait(false); - } - - await _networkClient.SendAsync( - lengthBuffer - .GetBuffer() - .AsMemory() - .Slice(0, (int)lengthBuffer.Length), - cancellationToken) - .ConfigureAwait(false); - } - - await _networkClient.SendAsync( - buffer.GetBuffer().AsMemory() - .Slice(0, (int)buffer.Length), - cancellationToken) - .ConfigureAwait(false); + NetworkClient = new NetworkStream(networkClient); + Reader = _pipe.Reader; } + protected NetworkStream NetworkClient { get; } + protected PipeReader Reader { get; } + protected void StartReceiving() { _sendAndReceiveBackgroundTask = Task.Run( diff --git a/src/Kafka.TestFramework/Kafka.TestFramework.csproj b/src/Kafka.TestFramework/Kafka.TestFramework.csproj index 3fa8669..986bc2b 100644 --- a/src/Kafka.TestFramework/Kafka.TestFramework.csproj +++ b/src/Kafka.TestFramework/Kafka.TestFramework.csproj @@ -20,13 +20,16 @@ - all runtime; build; native; contentfiles; analyzers; buildtransitive + + + + false diff --git a/src/Kafka.TestFramework/KafkaTestFramework.cs b/src/Kafka.TestFramework/KafkaTestFramework.cs index 128aaae..259897e 100644 --- a/src/Kafka.TestFramework/KafkaTestFramework.cs +++ b/src/Kafka.TestFramework/KafkaTestFramework.cs @@ -105,8 +105,8 @@ private void ReceiveMessagesFor(INetworkClient networkClient) await client .SendAsync( new ResponsePayload( - requestPayload, - new ResponseHeader(requestPayload.Header.Version) + new ResponseHeader( + Messages.GetResponseHeaderVersionFor(requestPayload)) .WithCorrelationId(requestPayload.Header.CorrelationId), response), _cancellationTokenSource.Token) diff --git a/src/Kafka.TestFramework/RequestClient.cs b/src/Kafka.TestFramework/RequestClient.cs index 1fe0a27..f1bf3c1 100644 --- a/src/Kafka.TestFramework/RequestClient.cs +++ b/src/Kafka.TestFramework/RequestClient.cs @@ -1,10 +1,11 @@ using System.Threading; using System.Threading.Tasks; using Kafka.Protocol; +using Int32 = Kafka.Protocol.Int32; namespace Kafka.TestFramework { - internal class RequestClient : Client, IRequestClient + internal class RequestClient : Client, IRequestClient { private RequestClient(INetworkClient networkClient) : base(networkClient) { @@ -17,6 +18,13 @@ internal static RequestClient Start(INetworkClient networkClient) return client; } + public ValueTask SendAsync( + RequestPayload payload, + CancellationToken cancellationToken = default) + { + return payload.WriteToAsync(NetworkClient, cancellationToken); + } + public async ValueTask ReadAsync( RequestPayload requestPayload, CancellationToken cancellationToken = default) diff --git a/src/Kafka.TestFramework/ResponseClient.cs b/src/Kafka.TestFramework/ResponseClient.cs index bc56ec8..ef12f73 100644 --- a/src/Kafka.TestFramework/ResponseClient.cs +++ b/src/Kafka.TestFramework/ResponseClient.cs @@ -4,7 +4,7 @@ namespace Kafka.TestFramework { - internal class ResponseClient : Client + internal class ResponseClient : Client { private ResponseClient(INetworkClient networkClient) : base(networkClient) { @@ -17,6 +17,13 @@ internal static ResponseClient Start(INetworkClient networkClient) return client; } + internal ValueTask SendAsync( + ResponsePayload payload, + CancellationToken cancellationToken = default) + { + return payload.WriteToAsync(NetworkClient, cancellationToken); + } + internal async Task ReadAsync( CancellationToken cancellationToken = default) { diff --git a/tests/Kafka.TestFramework.Tests/BytesExtensions.cs b/tests/Kafka.TestFramework.Tests/BytesExtensions.cs index 64075b3..b1d6757 100644 --- a/tests/Kafka.TestFramework.Tests/BytesExtensions.cs +++ b/tests/Kafka.TestFramework.Tests/BytesExtensions.cs @@ -5,9 +5,9 @@ namespace Kafka.TestFramework.Tests { internal static class BytesExtensions { - internal static string EncodeToString(this Bytes bytes, Encoding encoding) + internal static string EncodeToString(this byte[] bytes, Encoding encoding) { - return encoding.GetString(bytes.Value); + return encoding.GetString(bytes); } } } \ No newline at end of file diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs index cc55336..8bc03b2 100644 --- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs +++ b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -61,32 +61,32 @@ protected override Task GivenAsync() .WithHost(String.From("localhost")) .WithPort(Int32.From(_testServer.Port))) ); - - _testServer.On(async request => (await request - .WithActionAsync(produceRequest => Task.Run(async () => - { - _records = await produceRequest - .ExtractRecordsAsync(CancellationToken.None) - .ToListAsync() - .ConfigureAwait(false); - })) - .ConfigureAwait(false)) - .Respond() - .WithResponsesCollection(request.TopicsCollection.Select(topicProduceData => - new Func( - topicProduceResponse => - topicProduceResponse - .WithName(topicProduceData.Name) - .WithPartitionsCollection(topicProduceData.PartitionsCollection.Select(partitionProduceData => - new Func( - partitionProduceResponse => - partitionProduceResponse - .WithPartitionIndex(partitionProduceData.PartitionIndex) - .WithLogAppendTimeMs(Int64.From(-1)))) - .ToArray()))) - .ToArray())); + + _testServer.On(request => + { + _records = request.TopicDataCollection.SelectMany(pair => + pair.Value.PartitionDataCollection.Where(data => data.Records != null) + .SelectMany(data => data.Records!.Records)); + return request + .Respond() + .WithResponsesCollection(request.TopicDataCollection.Select(topicProduceData => + new Func( + topicProduceResponse => + topicProduceResponse + .WithName(topicProduceData.Value.Name) + .WithPartitionResponsesCollection(topicProduceData.Value + .PartitionDataCollection.Select(partitionProduceData => + new Func( + partitionProduceResponse => + partitionProduceResponse + .WithIndex(partitionProduceData.Index) + .WithLogAppendTimeMs(Int64.From(-1)))) + .ToArray()))) + .ToArray()); + }); return Task.CompletedTask; } @@ -123,7 +123,6 @@ private static async Task ProduceMessageFromClientAsync(string host, { BootstrapServers = $"{host}:{port}", MessageTimeoutMs = 5000, - MetadataRequestTimeoutMs = 5000, SocketTimeoutMs = 30000, Debug = "all" }; diff --git a/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs b/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs index 8082b93..edab6e7 100644 --- a/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs +++ b/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs @@ -30,7 +30,7 @@ protected override Task GivenAsync() .WithThrottleTimeMs(Int32.From(100)) .WithApiKeysCollection( key => key - .WithIndex(FetchRequest.ApiKey) + .WithApiKey(FetchRequest.ApiKey) .WithMinVersion(FetchRequest.MinVersion) .WithMaxVersion(FetchRequest.MaxVersion))); @@ -46,13 +46,15 @@ protected override async Task WhenAsync() .CreateRequestClientAsync() .ConfigureAwait(false); + var message = new ApiVersionsRequest(ApiVersionsRequest.MaxVersion); var requestPayload = new RequestPayload( - new RequestHeader(RequestHeader.MaxVersion) + new RequestHeader(message.HeaderVersion) .WithRequestApiKey(ApiVersionsRequest.ApiKey) .WithRequestApiVersion( - ApiVersionsRequest.MaxVersion) + message.Version) .WithCorrelationId(Int32.From(12)), - new ApiVersionsRequest(ApiVersionsRequest.MaxVersion)); + message + ); await client .SendAsync(requestPayload) @@ -99,7 +101,7 @@ public void The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_key() { _response.Message.As() - .ApiKeysCollection.Should().ContainKey(FetchRequest.ApiKey); + .ApiKeysCollection.Value.Should().ContainKey(FetchRequest.ApiKey); } [Fact] @@ -107,7 +109,7 @@ public void The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_index() { _response.Message.As() - .ApiKeysCollection[FetchRequest.ApiKey].Index.Should() + .ApiKeysCollection[FetchRequest.ApiKey].ApiKey.Should() .Be(FetchRequest.ApiKey); } diff --git a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj index 73a44e8..ab45425 100644 --- a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj +++ b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj @@ -7,9 +7,8 @@ - + - diff --git a/tests/Kafka.TestFramework.Tests/ProduceRequestExtensions.cs b/tests/Kafka.TestFramework.Tests/ProduceRequestExtensions.cs deleted file mode 100644 index 2be7fec..0000000 --- a/tests/Kafka.TestFramework.Tests/ProduceRequestExtensions.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Collections.Generic; -using System.IO.Pipelines; -using System.Linq; -using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; -using Kafka.Protocol; -using Kafka.Protocol.Records; -using Int16 = Kafka.Protocol.Int16; - -namespace Kafka.TestFramework.Tests -{ - internal static class ProduceRequestExtensions - { - internal static async IAsyncEnumerable ExtractRecordBatchesAsync( - this ProduceRequest produceRequest, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - var records = produceRequest.TopicsCollection.SelectMany(data => - data.PartitionsCollection.Select(produceData => - produceData.Records)) - .Where(record => record.HasValue); - - var pipe = new Pipe(); - var reader = new KafkaReader(pipe.Reader); - foreach (var record in records) - { - await pipe.Writer.WriteAsync( - record.Value.Value.AsMemory(), - cancellationToken); - - yield return await RecordBatch.ReadFromAsync(Int16.Default, reader, - cancellationToken); - } - } - - internal static async IAsyncEnumerable ExtractRecordsAsync( - this ProduceRequest request, - [EnumeratorCancellation] CancellationToken cancellationToken) - { - await foreach (var batch in request - .ExtractRecordBatchesAsync(cancellationToken) - .ConfigureAwait(false)) - { - if (batch.Records == null) - continue; - - foreach (var record in batch.Records) - { - yield return record; - } - } - } - - } -} \ No newline at end of file From 22fbede94a4df6645c798159db82b1fc29a0db4f Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Wed, 19 Jan 2022 19:29:42 +0100 Subject: [PATCH 03/17] request payloads no longer needs header version when reading as headers can figure out the version by looking up the message and version --- src/Kafka.TestFramework/KafkaTestFramework.cs | 1 - src/Kafka.TestFramework/ResponseClient.cs | 1 - ...the_client_sends_the_message_subscribed.cs | 142 ------------------ 3 files changed, 144 deletions(-) delete mode 100644 tests/Confluent.Kafka.1.1.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs diff --git a/src/Kafka.TestFramework/KafkaTestFramework.cs b/src/Kafka.TestFramework/KafkaTestFramework.cs index 259897e..cd2e47a 100644 --- a/src/Kafka.TestFramework/KafkaTestFramework.cs +++ b/src/Kafka.TestFramework/KafkaTestFramework.cs @@ -117,7 +117,6 @@ await client return; } } - }); _backgroundTasks.Add(task); } diff --git a/src/Kafka.TestFramework/ResponseClient.cs b/src/Kafka.TestFramework/ResponseClient.cs index ef12f73..081c1f9 100644 --- a/src/Kafka.TestFramework/ResponseClient.cs +++ b/src/Kafka.TestFramework/ResponseClient.cs @@ -29,7 +29,6 @@ internal async Task ReadAsync( { return await RequestPayload .ReadFromAsync( - RequestHeader.MaxVersion, Reader, cancellationToken) .ConfigureAwait(false); diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs b/tests/Confluent.Kafka.1.1.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs deleted file mode 100644 index edab6e7..0000000 --- a/tests/Confluent.Kafka.1.1.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs +++ /dev/null @@ -1,142 +0,0 @@ -using System.Threading.Tasks; -using FluentAssertions; -using Kafka.Protocol; -using Xunit; -using Xunit.Abstractions; - -namespace Kafka.TestFramework.Tests -{ - public partial class Given_an_inmemory_kafka_test_framework_and_a_message_subscription - { - public partial class - When_the_client_sends_the_message_subscribed : - TestSpecificationAsync - { - private readonly InMemoryKafkaTestFramework _testServer = - KafkaTestFramework.InMemory(); - - private ResponsePayload _response; - - public When_the_client_sends_the_message_subscribed( - ITestOutputHelper testOutputHelper) - : base(testOutputHelper) - { - } - - protected override Task GivenAsync() - { - _testServer.On( - request => request.Respond() - .WithThrottleTimeMs(Int32.From(100)) - .WithApiKeysCollection( - key => key - .WithApiKey(FetchRequest.ApiKey) - .WithMinVersion(FetchRequest.MinVersion) - .WithMaxVersion(FetchRequest.MaxVersion))); - - return Task.CompletedTask; - } - - protected override async Task WhenAsync() - { - await using (_testServer.Start() - .ConfigureAwait(false)) - { - var client = await _testServer - .CreateRequestClientAsync() - .ConfigureAwait(false); - - var message = new ApiVersionsRequest(ApiVersionsRequest.MaxVersion); - var requestPayload = new RequestPayload( - new RequestHeader(message.HeaderVersion) - .WithRequestApiKey(ApiVersionsRequest.ApiKey) - .WithRequestApiVersion( - message.Version) - .WithCorrelationId(Int32.From(12)), - message - ); - - await client - .SendAsync(requestPayload) - .ConfigureAwait(false); - - _response = await client - .ReadAsync(requestPayload) - .ConfigureAwait(false); - } - } - - [Fact] - public void - The_subscription_should_receive_a_api_versions_response() - { - _response.Message.Should().BeOfType(); - } - - [Fact] - public void - The_subscription_should_receive_a_api_versions_response_with_correlation_id() - { - _response.Header.CorrelationId.Should().Be(Int32.From(12)); - } - - [Fact] - public void - The_subscription_should_receive_a_api_versions_response_with_throttle_time() - { - _response.Message.As() - .ThrottleTimeMs.Should().Be(Int32.From(100)); - } - - [Fact] - public void - The_subscription_should_receive_a_api_versions_response_with_one_api_key() - { - _response.Message.As() - .ApiKeysCollection.Should().HaveCount(1); - } - - [Fact] - public void - The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_key() - { - _response.Message.As() - .ApiKeysCollection.Value.Should().ContainKey(FetchRequest.ApiKey); - } - - [Fact] - public void - The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_index() - { - _response.Message.As() - .ApiKeysCollection[FetchRequest.ApiKey].ApiKey.Should() - .Be(FetchRequest.ApiKey); - } - - [Fact] - public void - The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_min_version() - { - _response.Message.As() - .ApiKeysCollection[FetchRequest.ApiKey].MinVersion.Should() - .Be(FetchRequest.MinVersion); - } - - [Fact] - public void - The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_max_version() - { - _response.Message.As() - .ApiKeysCollection[FetchRequest.ApiKey].MaxVersion.Should() - .Be(FetchRequest.MaxVersion); - } - - protected override async Task TearDownAsync() - { - await _testServer - .DisposeAsync() - .ConfigureAwait(false); - } - } - } -} \ No newline at end of file From d5730ef4cdc030f6d6df6b35ab155d0338ef91ab Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 19:03:24 +0100 Subject: [PATCH 04/17] ci: move to github actions --- .github/CODEOWNERS | 1 + .github/version_config.yml | 38 +++++++++++++ .github/workflows/ci.yml | 109 +++++++++++++++++++++++++++++++++++++ 3 files changed, 148 insertions(+) create mode 100644 .github/CODEOWNERS create mode 100644 .github/version_config.yml create mode 100644 .github/workflows/ci.yml diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..a580509 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @Fresa \ No newline at end of file diff --git a/.github/version_config.yml b/.github/version_config.yml new file mode 100644 index 0000000..c569c97 --- /dev/null +++ b/.github/version_config.yml @@ -0,0 +1,38 @@ +mode: ContinuousDelivery +# Conventional Commits https://www.conventionalcommits.org/en/v1.0.0/ +# https://regex101.com/r/Ms7Vx6/2 +major-version-bump-message: "(build|chore|ci|docs|doc|feat|fix|perf|refactor|revert|style|test)(\\([a-z]+\\))?(!: .+|: (.+\\n\\n)+BREAKING CHANGE: .+)" +# https://regex101.com/r/Oqhi2m/1 +minor-version-bump-message: "(feat)(\\([a-z]+\\))?: .+" +# https://regex101.com/r/f5C4fP/1 +patch-version-bump-message: "(build|chore|ci|docs|doc|fix|perf|refactor|revert|style|test)(\\([a-z]+\\))?: .+" +# Match nothing +no-bump-message: ^\b$ +continuous-delivery-fallback-tag: '' +branches: + development: + increment: Patch + # Everything except main and master + regex: ^(?!(main|master)$) + track-merge-target: true + source-branches: [] + feature: + # Match nothing + regex: ^\b$ + develop: + # Match nothing + regex: ^\b$ + main: + source-branches: [] + release: + # Match nothing + regex: ^\b$ + pull-request: + # Match nothing + regex: ^\b$ + hotfix: + # Match nothing + regex: ^\b$ + support: + # Match nothing + regex: ^\b$ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..9062a1d --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,109 @@ +name: Continuous Delivery + +on: + push: + branches: + - '**' + +jobs: + test: + name: Build & Test + runs-on: ${{ matrix.os }} + timeout-minutes: 10 + strategy: + matrix: + os: [windows-latest] + steps: + - uses: actions/checkout@v2 + - name: Setup .NET + uses: actions/setup-dotnet@v1 + with: + dotnet-version: 5.0.x + - name: Build + run: dotnet build -c Release + - name: Test + run: dotnet test -c Release --no-build --verbosity normal + + release: + name: Create Release + needs: [test] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + # Fetches entire history, so we can analyze commits since last tag + fetch-depth: 0 + - name: Install GitVersion + uses: gittools/actions/gitversion/setup@v0.9.7 + with: + versionSpec: '5.x' + - name: Determine Version + id: gitversion + uses: gittools/actions/gitversion/execute@v0 + with: + useConfigFile: true + configFilePath: .github/version_config.yml + - name: Determine Release Info + id: release + run: | + from_tag=$(git tag --points-at ${{ steps.gitversion.outputs.versionSourceSha }} | grep -m 1 ^v[0-9]*\.[0-9]*\.[0-9]* | head -1) + [[ -z "$from_tag" ]] && \ + from_ref_exclusive=${{ steps.gitversion.outputs.versionSourceSha }} || \ + from_ref_exclusive=$from_tag + + [[ -z "${{ steps.gitversion.outputs.preReleaseTag }}" ]] && \ + is_prerelease=false || \ + is_prerelease=true + + [[ $is_prerelease == true ]] && \ + version=${{ steps.gitversion.outputs.majorMinorPatch }}-pre-${{ steps.gitversion.outputs.commitsSinceVersionSource }} || \ + version=${{ steps.gitversion.outputs.majorMinorPatch }} + + echo "::set-output name=is_prerelease::$is_prerelease" + echo "::set-output name=tag::v$version" + echo "::set-output name=version::$version" + echo "::set-output name=from_ref_exclusive::$from_ref_exclusive" + - name: Create Tag + uses: actions/github-script@v3 + with: + script: | + github.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: "refs/tags/${{ steps.release.outputs.tag }}", + sha: "${{ steps.gitversion.outputs.sha }}" + }); + - name: Generate Release Notes + id: release_notes + uses: Fresa/release-notes-generator@v0 + with: + version: ${{ steps.release.outputs.tag }} + from_ref_exclusive: ${{ steps.release.outputs.from_ref_exclusive }} + to_ref_inclusive: ${{ steps.release.outputs.tag }} + - name: Create Release + id: create_release + uses: softprops/action-gh-release@v1 + with: + body: ${{ steps.release_notes.outputs.release_notes }} + tag_name: ${{ steps.release.outputs.tag }} + prerelease: ${{ steps.release.outputs.is_prerelease }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Update Latest Minor Tag + uses: EndBug/latest-tag@v1 + if: steps.release.outputs.is_prerelease == 'false' + with: + tag-name: v${{ steps.gitversion.outputs.major }} + description: ${{ steps.release.outputs.tag }} + - name: Update Latest Patch Tag + uses: EndBug/latest-tag@v1 + if: steps.release.outputs.is_prerelease == 'false' + with: + tag-name: v${{ steps.gitversion.outputs.major }}.${{ steps.gitversion.outputs.minor }} + description: ${{ steps.release.outputs.tag }} + - name: Pack + env: + release_notes: ${{ steps.release_notes.outputs.release_notes }} + run: dotnet pack Kafka.TestFramework/Kafka.TestFramework.csproj -c Release -o nuget-packages -p:PackageVersion=${{ steps.release.outputs.version }} -p:PackageReleaseNotes="$release_notes" + - name: Publish to nuget.org + run: dotnet nuget push nuget-packages/Kafka.TestFramework.${{ steps.release.outputs.version }}.nupkg --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json From f106c88feca2c1cd099d048478ddf65b2de2fd4a Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 19:07:52 +0100 Subject: [PATCH 05/17] use the latest Kafka.Protocol pre-release --- Kafka.TestFramework.sln | 6 ------ src/Kafka.TestFramework/Kafka.TestFramework.csproj | 5 +---- 2 files changed, 1 insertion(+), 10 deletions(-) diff --git a/Kafka.TestFramework.sln b/Kafka.TestFramework.sln index fce0fb4..dd44be3 100644 --- a/Kafka.TestFramework.sln +++ b/Kafka.TestFramework.sln @@ -16,8 +16,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework.Tests", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework", "src\Kafka.TestFramework\Kafka.TestFramework.csproj", "{161B63C2-B654-4611-AAC8-28B54B5F3C9D}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.Protocol", "..\Kafka.Protocol\Kafka.Protocol\Kafka.Protocol.csproj", "{2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Confluent.Kafka.1.1.0.Tests", "tests\Confluent.Kafka.1.1.0.Tests\Confluent.Kafka.1.1.0.Tests.csproj", "{12646283-CC9E-4D45-8E89-F52337C790B6}" EndProject Global @@ -34,10 +32,6 @@ Global {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Debug|Any CPU.Build.0 = Debug|Any CPU {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Release|Any CPU.ActiveCfg = Release|Any CPU {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Release|Any CPU.Build.0 = Release|Any CPU - {2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}.Debug|Any CPU.Build.0 = Debug|Any CPU - {2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}.Release|Any CPU.ActiveCfg = Release|Any CPU - {2317A4BE-9250-4FCE-91B6-B6A7BFD421B7}.Release|Any CPU.Build.0 = Release|Any CPU {12646283-CC9E-4D45-8E89-F52337C790B6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {12646283-CC9E-4D45-8E89-F52337C790B6}.Debug|Any CPU.Build.0 = Debug|Any CPU {12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.ActiveCfg = Release|Any CPU diff --git a/src/Kafka.TestFramework/Kafka.TestFramework.csproj b/src/Kafka.TestFramework/Kafka.TestFramework.csproj index 986bc2b..f24cb99 100644 --- a/src/Kafka.TestFramework/Kafka.TestFramework.csproj +++ b/src/Kafka.TestFramework/Kafka.TestFramework.csproj @@ -20,16 +20,13 @@ + all runtime; build; native; contentfiles; analyzers; buildtransitive - - - - false From 62a96335f4c3922001b7c2578776320b913b800a Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 19:22:22 +0100 Subject: [PATCH 06/17] ci: build and test with .NET Core 3.1 SDK on linux --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9062a1d..9124a94 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -12,13 +12,13 @@ jobs: timeout-minutes: 10 strategy: matrix: - os: [windows-latest] + os: [ubuntu-latest] steps: - uses: actions/checkout@v2 - name: Setup .NET uses: actions/setup-dotnet@v1 with: - dotnet-version: 5.0.x + dotnet-version: 3.1.x - name: Build run: dotnet build -c Release - name: Test From bab6ddbf60d5abcf9d2af5f23809b1d9c8bf0dcd Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 19:30:00 +0100 Subject: [PATCH 07/17] ci: fix the path to the nuget project --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9124a94..bb368d6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -104,6 +104,6 @@ jobs: - name: Pack env: release_notes: ${{ steps.release_notes.outputs.release_notes }} - run: dotnet pack Kafka.TestFramework/Kafka.TestFramework.csproj -c Release -o nuget-packages -p:PackageVersion=${{ steps.release.outputs.version }} -p:PackageReleaseNotes="$release_notes" + run: dotnet pack src/Kafka.TestFramework/Kafka.TestFramework.csproj -c Release -o nuget-packages -p:PackageVersion=${{ steps.release.outputs.version }} -p:PackageReleaseNotes="$release_notes" - name: Publish to nuget.org run: dotnet nuget push nuget-packages/Kafka.TestFramework.${{ steps.release.outputs.version }}.nupkg --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json From dcdd7b377b4dfabf8b0fcb56150742ba6cc23b40 Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 19:40:57 +0100 Subject: [PATCH 08/17] increase the timeout for the socket based tests as the CI VM seems to be slow --- ...ver.When_connecting_to_the_server_and_producing_a_message.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs index 8bc03b2..9549340 100644 --- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs +++ b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -122,7 +122,7 @@ private static async Task ProduceMessageFromClientAsync(string host, }) { BootstrapServers = $"{host}:{port}", - MessageTimeoutMs = 5000, + MessageTimeoutMs = 15000, SocketTimeoutMs = 30000, Debug = "all" }; From 8a9a010f78f96e5583965114247c8d967c5bcbfa Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 20:02:07 +0100 Subject: [PATCH 09/17] further increase timeout for CI --- ...er.When_connecting_to_the_server_and_producing_a_message.cs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs index 9549340..3c7c860 100644 --- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs +++ b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -122,8 +122,7 @@ private static async Task ProduceMessageFromClientAsync(string host, }) { BootstrapServers = $"{host}:{port}", - MessageTimeoutMs = 15000, - SocketTimeoutMs = 30000, + ApiVersionRequestTimeoutMs = 30000, Debug = "all" }; From 3dafc7b52331d3a679465ac102ce721372f512ca Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 23:54:43 +0100 Subject: [PATCH 10/17] upgrade sourcelink --- src/Kafka.TestFramework/Kafka.TestFramework.csproj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Kafka.TestFramework/Kafka.TestFramework.csproj b/src/Kafka.TestFramework/Kafka.TestFramework.csproj index f24cb99..8ed063b 100644 --- a/src/Kafka.TestFramework/Kafka.TestFramework.csproj +++ b/src/Kafka.TestFramework/Kafka.TestFramework.csproj @@ -21,7 +21,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive From 73e0b244c80ded7a511bda62eae2a53c22b4da86 Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 23:55:20 +0100 Subject: [PATCH 11/17] dispose clients properly when running in memory --- .../AsyncDisposableExtensions.cs | 15 ++++++++ .../InMemoryKafkaTestFramework.cs | 34 +++++++++++++++++-- .../ValueTaskExtensions.cs | 17 ++++++++++ 3 files changed, 64 insertions(+), 2 deletions(-) create mode 100644 src/Kafka.TestFramework/AsyncDisposableExtensions.cs create mode 100644 src/Kafka.TestFramework/ValueTaskExtensions.cs diff --git a/src/Kafka.TestFramework/AsyncDisposableExtensions.cs b/src/Kafka.TestFramework/AsyncDisposableExtensions.cs new file mode 100644 index 0000000..9654bfc --- /dev/null +++ b/src/Kafka.TestFramework/AsyncDisposableExtensions.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Kafka.TestFramework +{ + internal static class AsyncDisposableExtensions + { + internal static Task DisposeAllAsync( + this IEnumerable disposables) + => disposables.Select(client => client.DisposeAsync()) + .WhenAllAsync(); + } +} \ No newline at end of file diff --git a/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs b/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs index 1a49656..102bd23 100644 --- a/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs +++ b/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs @@ -1,6 +1,9 @@ -using System.Threading; +using System; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; +using Kafka.Protocol; namespace Kafka.TestFramework { @@ -25,7 +28,34 @@ public async Task CreateRequestClientAsync( await _clients .SendAsync(responseClient, cancellationToken) .ConfigureAwait(false); - return RequestClient.Start(requestClient); + return new DisposableRequestClientDecorator(RequestClient.Start(requestClient), responseClient, second, first); + } + + private class DisposableRequestClientDecorator : IRequestClient + { + private readonly IRequestClient _requestClient; + private readonly List _disposables; + + public DisposableRequestClientDecorator(IRequestClient requestClient, params IAsyncDisposable[] disposables) + { + _requestClient = requestClient; + _disposables = new List(disposables) { requestClient }; + } + public async ValueTask DisposeAsync() + { + await _disposables.DisposeAllAsync() + .ConfigureAwait(false); + } + + public ValueTask ReadAsync(RequestPayload requestPayload, CancellationToken cancellationToken = default) + { + return _requestClient.ReadAsync(requestPayload, cancellationToken); + } + + public ValueTask SendAsync(RequestPayload payload, CancellationToken cancellationToken = default) + { + return _requestClient.SendAsync(payload, cancellationToken); + } } } } \ No newline at end of file diff --git a/src/Kafka.TestFramework/ValueTaskExtensions.cs b/src/Kafka.TestFramework/ValueTaskExtensions.cs new file mode 100644 index 0000000..add6b12 --- /dev/null +++ b/src/Kafka.TestFramework/ValueTaskExtensions.cs @@ -0,0 +1,17 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Kafka.TestFramework +{ + internal static class ValueTaskExtensions + { + internal static Task WhenAllAsync( + this IEnumerable tasks) + => Task.WhenAll( + tasks.Where( + valueTask + => !valueTask.IsCompletedSuccessfully) + .Select(valueTask => valueTask.AsTask())); + } +} \ No newline at end of file From 21138fd5ee7ada67caff0035d8684562d681675a Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Thu, 20 Jan 2022 23:56:02 +0100 Subject: [PATCH 12/17] explicitly set kafka.protocol reference in test projects --- .../Confluent.Kafka.1.1.0.Tests.csproj | 1 + tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj index a25ecd8..9586ade 100644 --- a/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj +++ b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj @@ -9,6 +9,7 @@ + diff --git a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj index ab45425..bb93857 100644 --- a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj +++ b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj @@ -9,6 +9,7 @@ + From 52c77c99f23e3dddd83c0d92effaf9d860f1b90a Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Fri, 21 Jan 2022 19:04:25 +0100 Subject: [PATCH 13/17] feat: expose cancellation token triggering when the test framework is stopping in order to be able to abort communication with the test server if it is for example panicing BREAKING CHANGE: The test framework is no longer disposable. It is teared down when calling the disposable returned from Start. --- src/Kafka.TestFramework/Client.cs | 8 +-- .../InMemoryKafkaTestFramework.cs | 4 +- src/Kafka.TestFramework/KafkaTestFramework.cs | 61 +++++++++++++------ src/Kafka.TestFramework/RequestClient.cs | 8 +-- src/Kafka.TestFramework/ResponseClient.cs | 7 ++- ...g_to_the_server_and_producing_a_message.cs | 8 +-- ...g_to_the_server_and_producing_a_message.cs | 10 +-- ...the_client_sends_the_message_subscribed.cs | 7 --- 8 files changed, 65 insertions(+), 48 deletions(-) diff --git a/src/Kafka.TestFramework/Client.cs b/src/Kafka.TestFramework/Client.cs index b5b851e..8bd708f 100644 --- a/src/Kafka.TestFramework/Client.cs +++ b/src/Kafka.TestFramework/Client.cs @@ -1,25 +1,23 @@ using System; -using System.IO; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; -using Kafka.Protocol; -using Int32 = Kafka.Protocol.Int32; namespace Kafka.TestFramework { internal abstract class Client : IAsyncDisposable { - private readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource(); + private readonly CancellationTokenSource _cancellationSource; private readonly Pipe _pipe = new Pipe(); private readonly INetworkClient _networkClient; private Task _sendAndReceiveBackgroundTask = default!; - protected Client(INetworkClient networkClient) + protected Client(INetworkClient networkClient, CancellationToken cancellationToken) { _networkClient = networkClient; NetworkClient = new NetworkStream(networkClient); Reader = _pipe.Reader; + _cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); } protected NetworkStream NetworkClient { get; } diff --git a/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs b/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs index 102bd23..4b96ed6 100644 --- a/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs +++ b/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs @@ -26,9 +26,9 @@ public async Task CreateRequestClientAsync( var requestClient = new CrossWiredMemoryNetworkClient(first, second); var responseClient = new CrossWiredMemoryNetworkClient(second, first); await _clients - .SendAsync(responseClient, cancellationToken) + .SendAsync(responseClient, CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, Stopping).Token) .ConfigureAwait(false); - return new DisposableRequestClientDecorator(RequestClient.Start(requestClient), responseClient, second, first); + return new DisposableRequestClientDecorator(RequestClient.Start(requestClient, Stopping), responseClient, second, first); } private class DisposableRequestClientDecorator : IRequestClient diff --git a/src/Kafka.TestFramework/KafkaTestFramework.cs b/src/Kafka.TestFramework/KafkaTestFramework.cs index cd2e47a..435ba56 100644 --- a/src/Kafka.TestFramework/KafkaTestFramework.cs +++ b/src/Kafka.TestFramework/KafkaTestFramework.cs @@ -8,17 +8,23 @@ namespace Kafka.TestFramework { - public abstract class KafkaTestFramework : IAsyncDisposable + public abstract class KafkaTestFramework { private readonly INetworkServer _networkServer; private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private readonly List _backgroundTasks = new List(); - private const int Stopped = 0; - private const int Started = 1; - private int _status = Stopped; + private const int HasStopped = 0; + private const int HasStarted = 1; + private int _status = HasStopped; + + /// + /// Triggered when the test framework is stopping + /// + public CancellationToken Stopping => _cancellationTokenSource.Token; public static InMemoryKafkaTestFramework InMemory() { @@ -47,10 +53,10 @@ internal KafkaTestFramework(INetworkServer networkServer) public IAsyncDisposable Start() { - var previousStatus = Interlocked.Exchange(ref _status, Started); - if (previousStatus == Started) + var previousStatus = Interlocked.Exchange(ref _status, HasStarted); + if (previousStatus == HasStarted) { - return this; + return new StopOnDispose(this); } var task = Task.Run( @@ -61,7 +67,7 @@ public IAsyncDisposable Start() try { var client = await _networkServer - .WaitForConnectedClientAsync(_cancellationTokenSource.Token) + .WaitForConnectedClientAsync(Stopping) .ConfigureAwait(false); ReceiveMessagesFor(client); } @@ -69,10 +75,15 @@ public IAsyncDisposable Start() { return; } + catch + { + _cancellationTokenSource.Cancel(); + throw; + } } }); _backgroundTasks.Add(task); - return this; + return new StopOnDispose(this); } private void ReceiveMessagesFor(INetworkClient networkClient) @@ -80,14 +91,14 @@ private void ReceiveMessagesFor(INetworkClient networkClient) var task = Task.Run( async () => { - var client = ResponseClient.Start(networkClient); + var client = ResponseClient.Start(networkClient, Stopping); await using var _ = client.ConfigureAwait(false); while (_cancellationTokenSource.IsCancellationRequested == false) { try { var requestPayload = await client - .ReadAsync(_cancellationTokenSource.Token) + .ReadAsync(Stopping) .ConfigureAwait(false); if (!_subscriptions.TryGetValue( @@ -95,12 +106,12 @@ private void ReceiveMessagesFor(INetworkClient networkClient) out var subscription)) { throw new InvalidOperationException( - $"Missing subscription for {requestPayload.Message.GetType()}"); + $"Missing subscription for {requestPayload.Message.GetType()}"); } var response = await subscription( - requestPayload.Message, - _cancellationTokenSource.Token); + requestPayload.Message, + Stopping); await client .SendAsync( @@ -109,13 +120,18 @@ await client Messages.GetResponseHeaderVersionFor(requestPayload)) .WithCorrelationId(requestPayload.Header.CorrelationId), response), - _cancellationTokenSource.Token) + Stopping) .ConfigureAwait(false); } catch when (_cancellationTokenSource.IsCancellationRequested) { return; } + catch + { + _cancellationTokenSource.Cancel(); + throw; + } } }); _backgroundTasks.Add(task); @@ -163,11 +179,20 @@ public KafkaTestFramework On( return this; } - public async ValueTask DisposeAsync() + internal sealed class StopOnDispose : IAsyncDisposable { - _cancellationTokenSource.Cancel(); + private readonly KafkaTestFramework _testFramework; - await Task.WhenAll(_backgroundTasks); + public StopOnDispose(KafkaTestFramework testFramework) + { + _testFramework = testFramework; + } + public async ValueTask DisposeAsync() + { + _testFramework._cancellationTokenSource.Cancel(); + await Task.WhenAll(_testFramework._backgroundTasks) + .ConfigureAwait(false); + } } } } \ No newline at end of file diff --git a/src/Kafka.TestFramework/RequestClient.cs b/src/Kafka.TestFramework/RequestClient.cs index f1bf3c1..867698a 100644 --- a/src/Kafka.TestFramework/RequestClient.cs +++ b/src/Kafka.TestFramework/RequestClient.cs @@ -1,19 +1,19 @@ using System.Threading; using System.Threading.Tasks; using Kafka.Protocol; -using Int32 = Kafka.Protocol.Int32; namespace Kafka.TestFramework { internal class RequestClient : Client, IRequestClient { - private RequestClient(INetworkClient networkClient) : base(networkClient) + private RequestClient(INetworkClient networkClient, CancellationToken cancellationToken) : + base(networkClient, cancellationToken) { } - internal static RequestClient Start(INetworkClient networkClient) + internal static RequestClient Start(INetworkClient networkClient, CancellationToken cancellationToken) { - var client = new RequestClient(networkClient); + var client = new RequestClient(networkClient, cancellationToken); client.StartReceiving(); return client; } diff --git a/src/Kafka.TestFramework/ResponseClient.cs b/src/Kafka.TestFramework/ResponseClient.cs index 081c1f9..757f478 100644 --- a/src/Kafka.TestFramework/ResponseClient.cs +++ b/src/Kafka.TestFramework/ResponseClient.cs @@ -6,13 +6,14 @@ namespace Kafka.TestFramework { internal class ResponseClient : Client { - private ResponseClient(INetworkClient networkClient) : base(networkClient) + private ResponseClient(INetworkClient networkClient, CancellationToken cancellationToken) : + base(networkClient, cancellationToken) { } - internal static ResponseClient Start(INetworkClient networkClient) + internal static ResponseClient Start(INetworkClient networkClient, CancellationToken cancellationToken) { - var client = new ResponseClient(networkClient); + var client = new ResponseClient(networkClient, cancellationToken); client.StartReceiving(); return client; } diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs index 8bc03b2..51e873f 100644 --- a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs +++ b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -52,7 +52,7 @@ protected override Task GivenAsync() .WithLeaderId(Int32.From(0)) .WithPartitionIndex(Int32.From(0)) .WithReplicaNodesCollection(new[] { Int32.From(0) })))) - .ToArray() ?? new Func[0]) + .ToArray() ?? Array.Empty>()) .WithControllerId(Int32.From(0)) .WithClusterId(String.From("test")) .WithBrokersCollection(broker => broker @@ -96,7 +96,7 @@ protected override async Task WhenAsync() await using (_testServer.Start() .ConfigureAwait(false)) { - await ProduceMessageFromClientAsync("localhost", _testServer.Port) + await ProduceMessageFromClientAsync("localhost", _testServer.Port, _testServer.Stopping) .ConfigureAwait(false); } } @@ -114,7 +114,7 @@ public void It_should_have_read_the_message_sent() } private static async Task ProduceMessageFromClientAsync(string host, - int port) + int port, CancellationToken testServerStopping) { var producerConfig = new ProducerConfig(new Dictionary { @@ -138,7 +138,7 @@ private static async Task ProduceMessageFromClientAsync(string host, .ConfigureAwait(false); LogFactory.Create("producer").Info("Produce report {@report}", report); - producer.Flush(); + producer.Flush(testServerStopping); } } } diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs index 3c7c860..b8c0a64 100644 --- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs +++ b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -52,7 +52,7 @@ protected override Task GivenAsync() .WithLeaderId(Int32.From(0)) .WithPartitionIndex(Int32.From(0)) .WithReplicaNodesCollection(new[] { Int32.From(0) })))) - .ToArray() ?? new Func[0]) + .ToArray() ?? Array.Empty>()) .WithControllerId(Int32.From(0)) .WithClusterId(String.From("test")) .WithBrokersCollection(broker => broker @@ -96,7 +96,7 @@ protected override async Task WhenAsync() await using (_testServer.Start() .ConfigureAwait(false)) { - await ProduceMessageFromClientAsync("localhost", _testServer.Port) + await ProduceMessageFromClientAsync("localhost", _testServer.Port, _testServer.Stopping) .ConfigureAwait(false); } } @@ -114,7 +114,7 @@ public void It_should_have_read_the_message_sent() } private static async Task ProduceMessageFromClientAsync(string host, - int port) + int port, CancellationToken testServerStopping) { var producerConfig = new ProducerConfig(new Dictionary { @@ -133,11 +133,11 @@ private static async Task ProduceMessageFromClientAsync(string host, var report = await producer .ProduceAsync("my-topic", - new Message { Value = "test" }) + new Message { Value = "test" }, testServerStopping) .ConfigureAwait(false); LogFactory.Create("producer").Info("Produce report {@report}", report); - producer.Flush(); + producer.Flush(testServerStopping); } } } diff --git a/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs b/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs index edab6e7..07822f9 100644 --- a/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs +++ b/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs @@ -130,13 +130,6 @@ public void .ApiKeysCollection[FetchRequest.ApiKey].MaxVersion.Should() .Be(FetchRequest.MaxVersion); } - - protected override async Task TearDownAsync() - { - await _testServer - .DisposeAsync() - .ConfigureAwait(false); - } } } } \ No newline at end of file From e96dc3374c4d2e4c3f845b70f886f7f02a3231e2 Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Fri, 21 Jan 2022 19:15:22 +0100 Subject: [PATCH 14/17] ci: remove no longer used pipeline tools --- appveyor.yml | 34 ------------- scripts/set-version-tag-in-git.ps1 | 7 --- scripts/set-version.ps1 | 77 ------------------------------ scripts/update-release-notes.ps1 | 20 -------- version | 1 - 5 files changed, 139 deletions(-) delete mode 100644 appveyor.yml delete mode 100644 scripts/set-version-tag-in-git.ps1 delete mode 100644 scripts/set-version.ps1 delete mode 100644 scripts/update-release-notes.ps1 delete mode 100644 version diff --git a/appveyor.yml b/appveyor.yml deleted file mode 100644 index f539dc8..0000000 --- a/appveyor.yml +++ /dev/null @@ -1,34 +0,0 @@ -version: '{build}' -skip_tags: true -image: Visual Studio 2019 -configuration: Release -dotnet_csproj: - patch: true - file: '**\Kafka.TestFramework.csproj' - package_version: '{version}' -environment: - git_access_token: - secure: V8i7UZRY6IQFpngUi2sgnunCFZKvE4al96flaBKyWqXyRLmTabX0/1Z0Tpl8BWKr - appveyor_api_token: - secure: eMpCyScSyam7rRtm3bSRAzt29qATXHWxk8mfgdTpyh0= -install: -- ps: scripts/set-version.ps1 -before_build: -- cmd: dotnet restore -build_script: -- dotnet build -c %CONFIGURATION% -test_script: -- dotnet test -c %CONFIGURATION% -after_test: -- ps: >- - scripts/update-release-notes.ps1 - - dotnet pack src/Kafka.TestFramework/Kafka.TestFramework.csproj -c %CONFIGURATION% -o nupkgs -artifacts: -- path: nupkgs\*.nupkg -deploy: -- provider: NuGet - api_key: - secure: CrJHCcuXYVvE4Qwk+1VfIboKDImZyj+NUE5fZQj300ctK0uUCICPgrVNA7YfLRl+ -after_deploy: -- ps: scripts/set-version-tag-in-git.ps1 \ No newline at end of file diff --git a/scripts/set-version-tag-in-git.ps1 b/scripts/set-version-tag-in-git.ps1 deleted file mode 100644 index 31026d9..0000000 --- a/scripts/set-version-tag-in-git.ps1 +++ /dev/null @@ -1,7 +0,0 @@ -$ErrorActionPreference = "Stop" -git config --global credential.helper store -Add-Content -Path "$env:USERPROFILE\.git-credentials" -Value "https://$($env:git_access_token):x-oauth-basic@github.com`n" -NoNewline -git config --global user.email "fresa@fresa.se" -git config --global user.name "Fredrik Arvidsson" -git tag v$($env:APPVEYOR_BUILD_VERSION) $($env:APPVEYOR_REPO_COMMIT) -git push origin --tags --quiet \ No newline at end of file diff --git a/scripts/set-version.ps1 b/scripts/set-version.ps1 deleted file mode 100644 index a5bbca3..0000000 --- a/scripts/set-version.ps1 +++ /dev/null @@ -1,77 +0,0 @@ -$ErrorActionPreference = "Stop" -Write-Host "Versioning is built on the previous build version which is expected to be in semver format, ex: 1.1.6-alpha" -$token = $env:appveyor_api_token #should be defined as a secure variable -$branch = $env:APPVEYOR_REPO_BRANCH - -$headers = @{ - "Authorization" = "Bearer $token" - "Content-type" = "application/json" -} -$apiURL = "https://ci.appveyor.com/api/projects/$env:APPVEYOR_ACCOUNT_NAME/$env:APPVEYOR_PROJECT_SLUG" -$history = Invoke-RestMethod -Uri "$apiURL/history?recordsNumber=2" -Headers $headers -Method Get - -$version = (Get-Content .\version) -Write-Host "Current version specified: $version" -[int]$major = $version.Substring(0, $version.IndexOf(".")) -[int]$minor = $version.Substring($version.IndexOf(".") + 1) - -# apply versioning strategy if this is not the first build -if ($history.builds.Count -eq 2) -{ - $previousVersion = $history.builds[1].version - Write-Host "Previous version: $previousVersion" - [int]$previousMajor = $previousVersion.Substring(0, $previousVersion.IndexOf(".")) - Write-Host "Previous major version: $previousMajor" - [int]$previousMinor = $previousVersion.Substring($previousVersion.IndexOf(".") + 1, $previousVersion.LastIndexOf(".") - ($previousVersion.IndexOf(".") + 1)) - Write-Host "Previous minor version: $previousMinor" - # handle suffix, eg. 1.2.3-alpha - if ([int]$previousVersion.IndexOf("-") -eq -1){ - [int]$previousPatch = $previousVersion.Substring($previousVersion.LastIndexOf(".") + 1) - } else { - [int]$previousPatch = $previousVersion.Substring($previousVersion.LastIndexOf(".") + 1, $previousVersion.IndexOf("-")-($previousVersion.LastIndexOf(".") + 1)) - } - Write-Host "Previous patch version: $previousPatch" - - $patch = $previousPatch + 1 - - if ($previousMajor -ne $major) - { - if ($major -ne $previousMajor + 1) - { - throw "Major version identity $major can only be incremented by one in regards to previous major $previousMajor" - } - - if ($minor -ne 0) - { - throw "Minor version has to be set to 0 when incrementing major version" - } - - Write-Warning "Major version has been changed, setting patch to 0" - $patch = 0 - } - if ($previousMinor -ne $minor) - { - if ($minor -ne $previousMinor + 1) - { - throw "Minor version identity $minor can only be incremented by one in regards to previous minor $previousMinor" - } - - Write-Warning "Minor version has been changed, setting patch to 0" - $patch = 0 - } -} else -{ - # first build - Write-Warning "No previous builds found, setting patch to 0" - $patch = 0 -} - -$versionSuffix = "" -if ($branch -ne "master") -{ - $versionSuffix="-alpha" -} - -$currentBuildVersion = "$version.$patch$versionSuffix" -Write-Host "Setting build version to $currentBuildVersion" -Update-AppveyorBuild -Version "$currentBuildVersion" \ No newline at end of file diff --git a/scripts/update-release-notes.ps1 b/scripts/update-release-notes.ps1 deleted file mode 100644 index b78e69b..0000000 --- a/scripts/update-release-notes.ps1 +++ /dev/null @@ -1,20 +0,0 @@ -$ErrorActionPreference = "Stop" -git fetch --tags -$tags = git tag -l v* -if ($tags) -{ - $releaseNotes = git log "$(git describe --tags --match v* --abbrev=0)..$($env:APPVEYOR_REPO_COMMIT)" --pretty=format:"-%s" --no-merges -} -else -{ - $releaseNotes = git log $($env:APPVEYOR_REPO_COMMIT) --pretty=format:"-%s" --no-merges -} - -$releaseNotesAsString = if ($releaseNotes -eq $null) { "" } else { [string]::join("`n", $releaseNotes) } - -Write-Host "Release notes: '$($releaseNotesAsString)'" - -$path = "src/$env:APPVEYOR_PROJECT_NAME/$env:APPVEYOR_PROJECT_NAME.csproj" -[xml]$xml = Get-Content -Path $path -$xml.GetElementsByTagName("PackageReleaseNotes").set_InnerXML("$releaseNotesAsString") -Set-Content $path -Value $xml.InnerXml -Force \ No newline at end of file diff --git a/version b/version deleted file mode 100644 index 9f8e9b6..0000000 --- a/version +++ /dev/null @@ -1 +0,0 @@ -1.0 \ No newline at end of file From c01cc91714d46c1e33aecb9a2be1d0d4e5e5fff4 Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Fri, 21 Jan 2022 19:16:08 +0100 Subject: [PATCH 15/17] doc: add v2.x documentation --- README.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 112a19e..c324820 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,13 @@ # Kafka.TestFramework An in-memory test framework for Kafka clients which can be used to subscribe on request messages and respond with response messages. The test server can be used in-memory or connected to clients over TCP. -[![Build status](https://ci.appveyor.com/api/projects/status/3by56qq57a8or5a2?svg=true)](https://ci.appveyor.com/project/Fresa/kafka-testframework) - -[![Build history](https://buildstats.info/appveyor/chart/Fresa/kafka-testframework)](https://ci.appveyor.com/project/Fresa/kafka-testframework/history) +[![Continuous Delivery](https://github.com/Fresa/Kafka.TestFramework/actions/workflows/ci.yml/badge.svg)](https://github.com/Fresa/Kafka.TestFramework/actions/workflows/ci.yml) ## Download https://www.nuget.org/packages/kafka.testframework ## Getting Started The test framework can be used in-memory or by setting up a TCP socket that the kafka client can connect to. See the [`integration tests`](https://github.com/Fresa/Kafka.TestFramework/blob/master/tests/Kafka.TestFramework.Tests). + +### v2.x +Now supports [Kafka.Protocol](https://github.com/Fresa/Kafka.Protocol) v2.x. \ No newline at end of file From 8697fefa6d010a38b0fc095f2921d83d1ad45a2f Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Fri, 21 Jan 2022 19:40:42 +0100 Subject: [PATCH 16/17] update to v2.0.0 of Kafka.Protocol --- Kafka.TestFramework.sln | 5 +++-- src/Kafka.TestFramework/Kafka.TestFramework.csproj | 2 +- .../Confluent.Kafka.1.1.0.Tests.csproj | 2 +- .../Kafka.TestFramework.Tests.csproj | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Kafka.TestFramework.sln b/Kafka.TestFramework.sln index dd44be3..7b8b432 100644 --- a/Kafka.TestFramework.sln +++ b/Kafka.TestFramework.sln @@ -7,9 +7,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{0EEC4A41 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{93A69F6A-D895-4A0E-9241-655EF7A9EEDA}" ProjectSection(SolutionItems) = preProject - appveyor.yml = appveyor.yml + .github\workflows\ci.yml = .github\workflows\ci.yml + .github\CODEOWNERS = .github\CODEOWNERS README.md = README.md - version = version + .github\version_config.yml = .github\version_config.yml EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework.Tests", "tests\Kafka.TestFramework.Tests\Kafka.TestFramework.Tests.csproj", "{FCFCC73C-B0A6-4D4E-A765-9A94303D00AD}" diff --git a/src/Kafka.TestFramework/Kafka.TestFramework.csproj b/src/Kafka.TestFramework/Kafka.TestFramework.csproj index 8ed063b..c4cf0fe 100644 --- a/src/Kafka.TestFramework/Kafka.TestFramework.csproj +++ b/src/Kafka.TestFramework/Kafka.TestFramework.csproj @@ -20,7 +20,7 @@ - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj index 9586ade..cb679ae 100644 --- a/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj +++ b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj @@ -9,7 +9,7 @@ - + diff --git a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj index bb93857..bf80eb0 100644 --- a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj +++ b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj @@ -9,7 +9,7 @@ - + From 10df6ef251f4249506d6097b7c0b3648635ee5b2 Mon Sep 17 00:00:00 2001 From: Fredrik Arvidsson Date: Fri, 21 Jan 2022 19:52:35 +0100 Subject: [PATCH 17/17] add AsValueTask extension method to cast from ValueTask to ValueTask --- src/Kafka.TestFramework/NetworkStream.cs | 6 ++---- src/Kafka.TestFramework/ValueTaskExtensions.cs | 3 +++ 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Kafka.TestFramework/NetworkStream.cs b/src/Kafka.TestFramework/NetworkStream.cs index 39ac8f4..51d9cc1 100644 --- a/src/Kafka.TestFramework/NetworkStream.cs +++ b/src/Kafka.TestFramework/NetworkStream.cs @@ -38,10 +38,8 @@ public override void Write(byte[] buffer, int offset, int count) throw new System.NotImplementedException(); } - public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = new CancellationToken()) - { - return new ValueTask(_networkClient.SendAsync(buffer, cancellationToken).AsTask()); - } + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = new CancellationToken()) => + _networkClient.SendAsync(buffer, cancellationToken).AsValueTask(); public override bool CanRead => false; public override bool CanSeek => false; diff --git a/src/Kafka.TestFramework/ValueTaskExtensions.cs b/src/Kafka.TestFramework/ValueTaskExtensions.cs index add6b12..c6eeb72 100644 --- a/src/Kafka.TestFramework/ValueTaskExtensions.cs +++ b/src/Kafka.TestFramework/ValueTaskExtensions.cs @@ -13,5 +13,8 @@ internal static Task WhenAllAsync( valueTask => !valueTask.IsCompletedSuccessfully) .Select(valueTask => valueTask.AsTask())); + + internal static ValueTask AsValueTask(this ValueTask valueTask) => + valueTask.IsCompletedSuccessfully ? default : new ValueTask(valueTask.AsTask()); } } \ No newline at end of file