diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs index 0a8840e..5e40f85 100644 --- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs +++ b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs @@ -8,7 +8,6 @@ using FluentAssertions; using Kafka.Protocol; using Kafka.Protocol.Records; -using Log.It; using Xunit; using Xunit.Abstractions; using Int32 = Kafka.Protocol.Int32; @@ -21,7 +20,8 @@ public partial class Given_a_socket_based_test_server public class When_connecting_to_the_server_and_consuming_a_message : TestSpecificationAsync { private SocketBasedKafkaTestFramework _testServer; - private ConsumeResult _result; + private readonly List _result = new List(); + private const int NumberOfMessage = 5; public When_connecting_to_the_server_and_consuming_a_message( ITestOutputHelper testOutputHelper) @@ -99,26 +99,51 @@ protected override Task GivenAsync() Array.Empty>())); - _testServer.On(request => request.Respond() - .WithResponsesCollection( - request.TopicsCollection.Select(topic => - new Func( - response => response - .WithTopic(topic.Topic) - .WithPartitionsCollection(topic.PartitionsCollection.Select(partition => - new Func(data => data - .WithRecords(new NullableRecordBatch + var records = new Dictionary(); + for (var i = 0; i < NumberOfMessage; i++) + { + records.Add(i, new Record + { + OffsetDelta = i, + Value = Encoding.UTF8.GetBytes( + $"data{i} fetched from broker") + }); + } + + _testServer.On(async (request, cancellationToken) => + { + var returnsData = false; + var response = request.Respond() + .WithResponsesCollection( + request.TopicsCollection.Select(topic => + new Func( + response => response + .WithTopic(topic.Topic) + .WithPartitionsCollection(topic.PartitionsCollection.Select(partition => + new Func(data => { - Magic = 2, - Records = new NullableArray(new Record + var recordBatch = new NullableRecordBatch { - Value = Encoding.UTF8.GetBytes( - "data fetched from broker"), - }) - }))).ToArray() - ))).ToArray()) - ); + LastOffsetDelta = (int)partition.FetchOffset, + Magic = 2, + Records = records.TryGetValue(partition.FetchOffset.Value, + out var record) + ? new NullableArray(record) + : NullableArray.Default + }; + returnsData = recordBatch.Records != NullableArray.Default; + return returnsData ? data.WithRecords(recordBatch) : data; + })).ToArray() + ))).ToArray()); + if (!returnsData) + { + await Task.Delay(request.MaxWaitMs, cancellationToken) + .ConfigureAwait(false); + } + + return response; + }); _testServer.On(request => request.Respond() .WithTopicsCollection(request.TopicsCollection.Select(topic => @@ -145,17 +170,21 @@ protected override async Task WhenAsync() await using (_testServer.Start() .ConfigureAwait(false)) { - _result = ConsumeMessage("localhost", _testServer.Port, _testServer.Stopping); + ConsumeMessages("localhost", _testServer.Port, _testServer.Stopping); } } [Fact] - public void It_should_have_read_the_message_sent() + public void It_should_have_read_the_messages_sent() { - _result.Message.Value.Should().Be("data fetched from broker"); + _result.Should().HaveCount(NumberOfMessage); + for (var i = 0; i < NumberOfMessage; i++) + { + _result.Should().Contain($"data{i} fetched from broker"); + } } - private ConsumeResult ConsumeMessage(string host, + private void ConsumeMessages(string host, int port, CancellationToken testServerStopping) { var consumerConfig = new ConsumerConfig(new Dictionary @@ -166,7 +195,7 @@ private ConsumeResult ConsumeMessage(string host, BootstrapServers = $"{host}:{port}", ApiVersionRequestTimeoutMs = 30000, Debug = "all", - GroupId = "group1" + GroupId = "group1", }; using var consumer = new ConsumerBuilder(consumerConfig) @@ -176,12 +205,17 @@ private ConsumeResult ConsumeMessage(string host, consumer.Subscribe("topic1"); var cancellationToken = CancellationTokenSource .CreateLinkedTokenSource(testServerStopping, TimeoutCancellationToken).Token; - var result = consumer.Consume(cancellationToken); - consumer.Close(); - - LogFactory.Create("consumer").Info("Consumer consumed {@result}", result); - - return result; + try + { + for (var i = 0; i < NumberOfMessage; i++) + { + _result.Add(consumer.Consume(cancellationToken).Message.Value); + } + } + finally + { + consumer.Close(); + } } } }