Skip to content

Commit

Permalink
test: extend the consumer test to demonstrate a client consuming mult…
Browse files Browse the repository at this point in the history
…iple messages
  • Loading branch information
Fresa committed Feb 1, 2023
1 parent 4cad71b commit 56421ce
Showing 1 changed file with 65 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using FluentAssertions;
using Kafka.Protocol;
using Kafka.Protocol.Records;
using Log.It;
using Xunit;
using Xunit.Abstractions;
using Int32 = Kafka.Protocol.Int32;
Expand All @@ -21,7 +20,8 @@ public partial class Given_a_socket_based_test_server
public class When_connecting_to_the_server_and_consuming_a_message : TestSpecificationAsync
{
private SocketBasedKafkaTestFramework _testServer;
private ConsumeResult<Ignore, string> _result;
private readonly List<string> _result = new List<string>();
private const int NumberOfMessage = 5;

public When_connecting_to_the_server_and_consuming_a_message(
ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -99,26 +99,51 @@ protected override Task GivenAsync()
Array.Empty<Func<OffsetFetchResponse.OffsetFetchResponseTopic,
OffsetFetchResponse.OffsetFetchResponseTopic>>()));

_testServer.On<FetchRequest, FetchResponse>(request => request.Respond()
.WithResponsesCollection(
request.TopicsCollection.Select(topic =>
new Func<FetchResponse.FetchableTopicResponse, FetchResponse.FetchableTopicResponse>(
response => response
.WithTopic(topic.Topic)
.WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
new Func<FetchResponse.FetchableTopicResponse.PartitionData,
FetchResponse.FetchableTopicResponse.PartitionData>(data => data
.WithRecords(new NullableRecordBatch
var records = new Dictionary<long, Record>();
for (var i = 0; i < NumberOfMessage; i++)
{
records.Add(i, new Record
{
OffsetDelta = i,
Value = Encoding.UTF8.GetBytes(
$"data{i} fetched from broker")
});
}

_testServer.On<FetchRequest, FetchResponse>(async (request, cancellationToken) =>
{
var returnsData = false;
var response = request.Respond()
.WithResponsesCollection(
request.TopicsCollection.Select(topic =>
new Func<FetchResponse.FetchableTopicResponse, FetchResponse.FetchableTopicResponse>(
response => response
.WithTopic(topic.Topic)
.WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
new Func<FetchResponse.FetchableTopicResponse.PartitionData,
FetchResponse.FetchableTopicResponse.PartitionData>(data =>
{
Magic = 2,
Records = new NullableArray<Record>(new Record
var recordBatch = new NullableRecordBatch
{
Value = Encoding.UTF8.GetBytes(
"data fetched from broker"),
})
}))).ToArray()
))).ToArray())
);
LastOffsetDelta = (int)partition.FetchOffset,
Magic = 2,
Records = records.TryGetValue(partition.FetchOffset.Value,
out var record)
? new NullableArray<Record>(record)
: NullableArray<Record>.Default
};
returnsData = recordBatch.Records != NullableArray<Record>.Default;
return returnsData ? data.WithRecords(recordBatch) : data;
})).ToArray()
))).ToArray());
if (!returnsData)
{
await Task.Delay(request.MaxWaitMs, cancellationToken)
.ConfigureAwait(false);
}
return response;
});

_testServer.On<OffsetCommitRequest, OffsetCommitResponse>(request => request.Respond()
.WithTopicsCollection(request.TopicsCollection.Select(topic =>
Expand All @@ -145,17 +170,21 @@ protected override async Task WhenAsync()
await using (_testServer.Start()
.ConfigureAwait(false))
{
_result = ConsumeMessage("localhost", _testServer.Port, _testServer.Stopping);
ConsumeMessages("localhost", _testServer.Port, _testServer.Stopping);
}
}

[Fact]
public void It_should_have_read_the_message_sent()
public void It_should_have_read_the_messages_sent()
{
_result.Message.Value.Should().Be("data fetched from broker");
_result.Should().HaveCount(NumberOfMessage);
for (var i = 0; i < NumberOfMessage; i++)
{
_result.Should().Contain($"data{i} fetched from broker");
}
}

private ConsumeResult<Ignore, string> ConsumeMessage(string host,
private void ConsumeMessages(string host,
int port, CancellationToken testServerStopping)
{
var consumerConfig = new ConsumerConfig(new Dictionary<string, string>
Expand All @@ -166,7 +195,7 @@ private ConsumeResult<Ignore, string> ConsumeMessage(string host,
BootstrapServers = $"{host}:{port}",
ApiVersionRequestTimeoutMs = 30000,
Debug = "all",
GroupId = "group1"
GroupId = "group1",
};

using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
Expand All @@ -176,12 +205,17 @@ private ConsumeResult<Ignore, string> ConsumeMessage(string host,
consumer.Subscribe("topic1");
var cancellationToken = CancellationTokenSource
.CreateLinkedTokenSource(testServerStopping, TimeoutCancellationToken).Token;
var result = consumer.Consume(cancellationToken);
consumer.Close();

LogFactory.Create("consumer").Info("Consumer consumed {@result}", result);

return result;
try
{
for (var i = 0; i < NumberOfMessage; i++)
{
_result.Add(consumer.Consume(cancellationToken).Message.Value);
}
}
finally
{
consumer.Close();
}
}
}
}
Expand Down

0 comments on commit 56421ce

Please sign in to comment.