diff --git a/Kafka.TestFramework.sln b/Kafka.TestFramework.sln
index 7b8b432..702f55e 100644
--- a/Kafka.TestFramework.sln
+++ b/Kafka.TestFramework.sln
@@ -19,6 +19,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework", "src\
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Confluent.Kafka.1.1.0.Tests", "tests\Confluent.Kafka.1.1.0.Tests\Confluent.Kafka.1.1.0.Tests.csproj", "{12646283-CC9E-4D45-8E89-F52337C790B6}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestSpecification", "tests\Kafka.TestSpecification\Kafka.TestSpecification.csproj", "{4676B0E5-6641-426B-986D-49D336B0E5E6}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Confluent.Kafka.2.4.0.Tests", "tests\Confluent.Kafka.2.4.0.Tests\Confluent.Kafka.2.4.0.Tests.csproj", "{B06C41D0-DB18-47F2-A280-0104FE5843CC}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -37,6 +41,14 @@ Global
{12646283-CC9E-4D45-8E89-F52337C790B6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.Build.0 = Release|Any CPU
+ {4676B0E5-6641-426B-986D-49D336B0E5E6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {4676B0E5-6641-426B-986D-49D336B0E5E6}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {4676B0E5-6641-426B-986D-49D336B0E5E6}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {4676B0E5-6641-426B-986D-49D336B0E5E6}.Release|Any CPU.Build.0 = Release|Any CPU
+ {B06C41D0-DB18-47F2-A280-0104FE5843CC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {B06C41D0-DB18-47F2-A280-0104FE5843CC}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {B06C41D0-DB18-47F2-A280-0104FE5843CC}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {B06C41D0-DB18-47F2-A280-0104FE5843CC}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -44,6 +56,8 @@ Global
GlobalSection(NestedProjects) = preSolution
{FCFCC73C-B0A6-4D4E-A765-9A94303D00AD} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1}
{12646283-CC9E-4D45-8E89-F52337C790B6} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1}
+ {4676B0E5-6641-426B-986D-49D336B0E5E6} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1}
+ {B06C41D0-DB18-47F2-A280-0104FE5843CC} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {B432BD60-300C-4955-9D44-011413D52F2D}
diff --git a/Kafka.TestFramework.v3.ncrunchsolution b/Kafka.TestFramework.v3.ncrunchsolution
index 10420ac..13107d3 100644
--- a/Kafka.TestFramework.v3.ncrunchsolution
+++ b/Kafka.TestFramework.v3.ncrunchsolution
@@ -1,6 +1,8 @@
True
+ True
+ True
True
\ No newline at end of file
diff --git a/src/Kafka.TestFramework/SocketServer.cs b/src/Kafka.TestFramework/SocketServer.cs
index d2c7f65..f6098d2 100644
--- a/src/Kafka.TestFramework/SocketServer.cs
+++ b/src/Kafka.TestFramework/SocketServer.cs
@@ -57,7 +57,27 @@ private void StartAcceptingClients()
var clientSocket = await _clientAcceptingSocket
.AcceptAsync()
.ConfigureAwait(false);
- Logger.Debug("Client connected {@clientSocket}", clientSocket);
+ Logger.Debug("Client connected {@clientSocket}", new
+ {
+ clientSocket.AddressFamily,
+ clientSocket.Available,
+ clientSocket.Connected,
+ clientSocket.ProtocolType,
+ clientSocket.ReceiveTimeout,
+ clientSocket.ReceiveBufferSize,
+ clientSocket.Ttl,
+ clientSocket.SendBufferSize,
+ clientSocket.SendTimeout,
+ RemoteEndPoint = new
+ {
+ clientSocket.RemoteEndPoint.AddressFamily
+ },
+ clientSocket.SocketType,
+ LocalEndPoint = new
+ {
+ clientSocket.LocalEndPoint.AddressFamily
+ }
+ });
await _waitingClients
.SendAsync(
diff --git a/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs b/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs
deleted file mode 100644
index b1d6757..0000000
--- a/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs
+++ /dev/null
@@ -1,13 +0,0 @@
-using System.Text;
-using Kafka.Protocol;
-
-namespace Kafka.TestFramework.Tests
-{
- internal static class BytesExtensions
- {
- internal static string EncodeToString(this byte[] bytes, Encoding encoding)
- {
- return encoding.GetString(bytes);
- }
- }
-}
\ No newline at end of file
diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj
index 2f242dd..525ca9d 100644
--- a/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj
+++ b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj
@@ -7,14 +7,13 @@
-
+
-
@@ -24,6 +23,7 @@
+
diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs
new file mode 100644
index 0000000..a6f80b4
--- /dev/null
+++ b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs
@@ -0,0 +1,222 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using FluentAssertions;
+using Kafka.Protocol;
+using Kafka.Protocol.Records;
+using Xunit;
+using Xunit.Abstractions;
+using Int32 = Kafka.Protocol.Int32;
+using Record = Kafka.Protocol.Records.Record;
+
+namespace Kafka.TestFramework.Tests
+{
+ public partial class Given_a_socket_based_test_server
+ {
+ public class When_connecting_to_the_server_and_consuming_a_message : TestSpecificationAsync
+ {
+ private SocketBasedKafkaTestFramework _testServer;
+ private readonly List<string> _result = new List<string>();
+ private const int NumberOfMessage = 5;
+
+ public When_connecting_to_the_server_and_consuming_a_message(
+ ITestOutputHelper testOutputHelper)
+ : base(testOutputHelper)
+ {
+ }
+
+ protected override Task GivenAsync()
+ {
+ _testServer = KafkaTestFramework.WithSocket();
+
+ _testServer.On(
+ request => request.Respond()
+ .WithAllApiKeys());
+
+ _testServer.On(
+ request => request.Respond()
+ .WithTopicsCollection(
+ request.TopicsCollection?.Select(topic =>
+ new Func(
+ responseTopic =>
+ responseTopic
+ .WithName(topic.Name)
+ .WithPartitionsCollection(partition =>
+ partition
+ .WithLeaderId(0)
+ .WithPartitionIndex(0))))
+ .ToArray() ??
+ Array.Empty>())
+ .WithBrokersCollection(broker => broker
+ .WithHost("localhost")
+ .WithPort(_testServer.Port))
+ );
+
+ _testServer.On(
+ request => request.Respond()
+ .WithHost("localhost")
+ .WithPort(_testServer.Port)
+ );
+
+ _testServer.On(
+ request => request.Respond()
+ .WithProtocolName(request.ProtocolsCollection.First().Value.Name));
+
+ _testServer.On(
+ async (request, cancellationToken) => request.Respond()
+ .WithAssignment(
+ await new ConsumerProtocolAssignment(ConsumerProtocolAssignment.MaxVersion)
+ .WithAssignedPartitionsCollection(partition => partition
+ .WithTopic("topic1")
+ .WithPartitionsCollection(new Int32[] { 0 }))
+ .ToBytesAsync(cancellationToken)
+ .ConfigureAwait(false))
+ );
+
+ _testServer.On(request => request.Respond()
+ .WithTopicsCollection(
+ request.TopicsCollection?.Select(topic =>
+ new Func(responseTopic =>
+ responseTopic
+ .WithName(topic.Name)
+ .WithPartitionsCollection(topic.PartitionIndexesCollection
+ .Select(partitionIndex =>
+ new Func(
+ partition => partition
+ .WithPartitionIndex(partitionIndex)))
+ .ToArray())))
+ .ToArray() ??
+ Array.Empty>()));
+
+ var records = new Dictionary();
+ for (var i = 0; i < NumberOfMessage; i++)
+ {
+ records.Add(i, new Record
+ {
+ OffsetDelta = i,
+ Value = Encoding.UTF8.GetBytes(
+ $"data{i} fetched from broker")
+ });
+ }
+
+ _testServer.On(async (request, cancellationToken) =>
+ {
+ var returnsData = false;
+ var response = request.Respond()
+ .WithResponsesCollection(
+ request.TopicsCollection.Select(topic =>
+ new Func(
+ response => response
+ .WithTopic(topic.Topic)
+ .WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
+ new Func(data =>
+ {
+ var recordBatch = new NullableRecordBatch
+ {
+ LastOffsetDelta = (int)partition.FetchOffset,
+ Magic = 2,
+ Records = records.TryGetValue(partition.FetchOffset.Value,
+ out var record)
+ ? new NullableArray(record)
+ : NullableArray.Default
+ };
+ returnsData = recordBatch.Records != NullableArray.Default;
+ return returnsData ? data.WithRecords(recordBatch) : data;
+ })).ToArray()
+ ))).ToArray());
+ if (!returnsData)
+ {
+ await Task.Delay(request.MaxWaitMs, cancellationToken)
+ .ConfigureAwait(false);
+ }
+
+ return response;
+ });
+
+ _testServer.On(request => request.Respond()
+ .WithTopicsCollection(request.TopicsCollection.Select(topic =>
+ new Func(responseTopic => responseTopic
+ .WithName(topic.Name)
+ .WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
+ new Func(responsePartition => responsePartition
+ .WithPartitionIndex(partition.PartitionIndex))).ToArray())
+ )).ToArray()
+ ));
+
+ _testServer.On(request => request.Respond());
+ _testServer.On(request => request.Respond());
+
+ return Task.CompletedTask;
+ }
+
+ protected override async Task WhenAsync()
+ {
+ await using (_testServer.Start()
+ .ConfigureAwait(false))
+ {
+ ConsumeMessages("localhost", _testServer.Port, _testServer.Stopping);
+ }
+ }
+
+ [Fact]
+ public void It_should_have_read_the_messages_sent()
+ {
+ _result.Should().HaveCount(NumberOfMessage);
+ for (var i = 0; i < NumberOfMessage; i++)
+ {
+ _result.Should().Contain($"data{i} fetched from broker");
+ }
+ }
+
+ private void ConsumeMessages(string host,
+ int port, CancellationToken testServerStopping)
+ {
+ var consumerConfig = new ConsumerConfig(new Dictionary<string, string>
+ {
+ { "log_level", "7" }
+ })
+ {
+ BootstrapServers = $"{host}:{port}",
+ ApiVersionRequestTimeoutMs = 30000,
+ Debug = "all",
+ GroupId = "group1",
+ };
+
+ using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
+ .SetLogHandler(this.Log)
+ .Build();
+
+ consumer.Subscribe("topic1");
+ var cancellationToken = CancellationTokenSource
+ .CreateLinkedTokenSource(testServerStopping, TimeoutCancellationToken).Token;
+ try
+ {
+ for (var i = 0; i < NumberOfMessage; i++)
+ {
+ _result.Add(consumer.Consume(cancellationToken).Message.Value);
+ }
+ }
+ finally
+ {
+ consumer.Close();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs
index 51e873f..42b7f77 100644
--- a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs
+++ b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs
@@ -113,7 +113,7 @@ public void It_should_have_read_the_message_sent()
_records.First().Value.EncodeToString(Encoding.UTF8).Should().Be("test");
}
- private static async Task ProduceMessageFromClientAsync(string host,
+ private async Task ProduceMessageFromClientAsync(string host,
int port, CancellationToken testServerStopping)
{
var producerConfig = new ProducerConfig(new Dictionary<string, string>
@@ -129,12 +129,12 @@ private static async Task ProduceMessageFromClientAsync(string host,
using var producer =
new ProducerBuilder<Null, string>(producerConfig)
- .SetLogHandler(LogExtensions.UseLogIt)
+ .SetLogHandler(this.Log)
.Build();
var report = await producer
.ProduceAsync("my-topic",
- new Message<Null, string> { Value = "test" })
+ new Message<Null, string> { Value = "test" }, testServerStopping)
.ConfigureAwait(false);
LogFactory.Create("producer").Info("Produce report {@report}", report);
diff --git a/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs b/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs
index 6f56fd7..90ff015 100644
--- a/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs
+++ b/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs
@@ -1,50 +1,29 @@
using System;
using Confluent.Kafka;
-using Log.It;
namespace Kafka.TestFramework.Tests
{
internal static class LogExtensions
{
- internal static void UseLogIt(
+ internal static void Log(this TestSpecificationAsync test,
IProducer<Null, string> producer, LogMessage logMessage)
{
- var logger = LogFactory.Create(producer.GetType().GetPrettyName());
+ test.Log(logMessage);
+ }
- switch (logMessage.Level)
- {
- case SyslogLevel.Debug:
- LogTo(logger.Debug);
- break;
- case SyslogLevel.Notice:
- case SyslogLevel.Info:
- LogTo(logger.Info);
- break;
- case SyslogLevel.Warning:
- LogTo(logger.Warning);
- break;
- case SyslogLevel.Error:
- LogTo(logger.Error);
- break;
- case SyslogLevel.Alert:
- case SyslogLevel.Critical:
- case SyslogLevel.Emergency:
- LogTo(logger.Fatal);
- break;
- default:
- throw new ArgumentOutOfRangeException($"{logMessage.Level} is not supported");
- }
+ internal static void Log(this TestSpecificationAsync test,
+ IConsumer<Ignore, string> consumer, LogMessage logMessage)
+ {
+ test.Log(logMessage);
+ }
- void LogTo(Action log)
- {
- log("{name} {facility}: {message}",
- new object[]
- {
- logMessage.Name,
- logMessage.Facility,
- logMessage.Message
- });
- }
+ private static void Log(this TestSpecificationAsync test,
+ LogMessage logMessage)
+ {
+ test.WriteKafkaLogMessage(logMessage.Name,
+ logMessage.Facility,
+ Enum.GetName(logMessage.Level),
+ logMessage.Message);
}
}
}
\ No newline at end of file
diff --git a/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs b/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs
deleted file mode 100644
index 4096dad..0000000
--- a/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs
+++ /dev/null
@@ -1,39 +0,0 @@
-using System;
-using System.Threading.Tasks;
-using Log.It;
-using Log.It.With.NLog;
-using Microsoft.Extensions.Configuration;
-using NLog.Extensions.Logging;
-using Test.It.With.XUnit;
-using Xunit.Abstractions;
-
-namespace Kafka.TestFramework.Tests
-{
- public class TestSpecificationAsync : XUnit2SpecificationAsync
- {
- private readonly IDisposable _logWriter;
-
- static TestSpecificationAsync()
- {
- var config = new ConfigurationBuilder()
- .SetBasePath(System.IO.Directory.GetCurrentDirectory())
- .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
- .Build();
-
- NLog.LogManager.Configuration = new NLogLoggingConfiguration(config.GetSection("NLog"));
- LogFactory.Initialize(new NLogFactory(new LogicalThreadContext()));
- NLogCapturingTarget.Subscribe += Output.Writer.WriteLine;
- }
-
- public TestSpecificationAsync(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
- {
- _logWriter = Output.WriteTo(testOutputHelper);
- }
-
- protected sealed override async Task DisposeAsync(bool disposing)
- {
- await base.DisposeAsync(disposing);
- _logWriter.Dispose();
- }
- }
-}
\ No newline at end of file
diff --git a/tests/Confluent.Kafka.2.4.0.Tests/Confluent.Kafka.2.4.0.Tests.csproj b/tests/Confluent.Kafka.2.4.0.Tests/Confluent.Kafka.2.4.0.Tests.csproj
new file mode 100644
index 0000000..a16fc7d
--- /dev/null
+++ b/tests/Confluent.Kafka.2.4.0.Tests/Confluent.Kafka.2.4.0.Tests.csproj
@@ -0,0 +1,37 @@
+
+
+
+ net7.0
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
+
diff --git a/tests/Confluent.Kafka.2.4.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs b/tests/Confluent.Kafka.2.4.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs
new file mode 100644
index 0000000..33abc61
--- /dev/null
+++ b/tests/Confluent.Kafka.2.4.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs
@@ -0,0 +1,236 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using FluentAssertions;
+using Kafka.Protocol;
+using Kafka.Protocol.Records;
+using Xunit;
+using Xunit.Abstractions;
+using static Kafka.Protocol.OffsetFetchResponse;
+using Int32 = Kafka.Protocol.Int32;
+using Record = Kafka.Protocol.Records.Record;
+using Uuid = Kafka.Protocol.Uuid;
+
+namespace Kafka.TestFramework.Tests
+{
+ public partial class Given_a_socket_based_test_server
+ {
+ public class When_connecting_to_the_server_and_consuming_a_message : TestSpecificationAsync
+ {
+ private SocketBasedKafkaTestFramework _testServer;
+ private readonly List<string> _result = new List<string>();
+ private const int NumberOfMessage = 5;
+
+ public When_connecting_to_the_server_and_consuming_a_message(
+ ITestOutputHelper testOutputHelper)
+ : base(testOutputHelper)
+ {
+ }
+
+ protected override Task GivenAsync()
+ {
+ _testServer = KafkaTestFramework.WithSocket();
+
+ _testServer.On(
+ request => request.Respond()
+ .WithAllApiKeys());
+
+ var topicId = Uuid.From(Guid.NewGuid());
+ _testServer.On(
+ request => request.Respond()
+ .WithTopicsCollection(
+ request.TopicsCollection?.Select(topic =>
+ new Func(
+ responseTopic =>
+ responseTopic
+ .WithName(topic.Name)
+ .WithTopicId(topicId)
+ .WithPartitionsCollection(partition =>
+ partition
+ .WithLeaderId(0)
+ .WithPartitionIndex(0))))
+ .ToArray() ??
+ Array.Empty>())
+ .WithBrokersCollection(broker => broker
+ .WithHost("localhost")
+ .WithPort(_testServer.Port))
+ );
+
+ _testServer.On(
+ request => request.Respond()
+ .WithHost("localhost")
+ .WithPort(_testServer.Port)
+ );
+
+ _testServer.On(
+ request => request.Respond()
+ .WithProtocolName(request.ProtocolsCollection.First().Value.Name));
+
+ _testServer.On(
+ async (request, cancellationToken) => request.Respond()
+ .WithAssignment(
+ await new ConsumerProtocolAssignment(ConsumerProtocolAssignment.MaxVersion)
+ .WithAssignedPartitionsCollection(partition => partition
+ .WithTopic("topic1")
+ .WithPartitionsCollection(new Int32[] { 0 }))
+ .ToBytesAsync(cancellationToken)
+ .ConfigureAwait(false))
+ );
+
+ _testServer.On(request => request.Respond()
+ .WithGroupsCollection(request.GroupsCollection.Select(group =>
+ new Func(offsetFetchResponseGroup =>
+ offsetFetchResponseGroup
+ .WithGroupId(group.GroupId)
+ .WithTopicsCollection(
+ group.TopicsCollection?
+ .Select(topic =>
+ new Func(
+ responseTopic =>
+ responseTopic
+ .WithName(topic.Name)
+ .WithPartitionsCollection(topic
+ .PartitionIndexesCollection
+ .Select(partitionIndex =>
+ new Func(
+ partition => partition
+ .WithPartitionIndex(partitionIndex)))
+ .ToArray())))
+ .ToArray() ??
+ Array.Empty>()))).ToArray()));
+
+ var records = new Dictionary();
+ for (var i = 0; i < NumberOfMessage; i++)
+ {
+ records.Add(i, new Record
+ {
+ OffsetDelta = i,
+ Value = Encoding.UTF8.GetBytes(
+ $"data{i} fetched from broker")
+ });
+ }
+
+ _testServer.On(async (request, cancellationToken) =>
+ {
+ var returnsData = false;
+ var response = request.Respond()
+ .WithResponsesCollection(
+ request.TopicsCollection.Select(topic =>
+ new Func(
+ response => response
+ .WithTopic(topic.Topic)
+ .WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
+ new Func(data =>
+ {
+ var recordBatch = new NullableRecordBatch
+ {
+ LastOffsetDelta = (int)partition.FetchOffset,
+ Magic = 2,
+ Records = records.TryGetValue(partition.FetchOffset.Value,
+ out var record)
+ ? new NullableArray(record)
+ : NullableArray.Default
+ };
+ returnsData = recordBatch.Records != NullableArray.Default;
+ return returnsData ? data.WithRecords(recordBatch) : data;
+ })).ToArray()
+ ))).ToArray());
+ if (!returnsData)
+ {
+ await Task.Delay(request.MaxWaitMs, cancellationToken)
+ .ConfigureAwait(false);
+ }
+
+ return response;
+ });
+
+ _testServer.On(request => request.Respond()
+ .WithTopicsCollection(request.TopicsCollection.Select(topic =>
+ new Func(responseTopic => responseTopic
+ .WithName(topic.Name)
+ .WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
+ new Func(responsePartition => responsePartition
+ .WithPartitionIndex(partition.PartitionIndex))).ToArray())
+ )).ToArray()
+ ));
+
+ _testServer.On(request => request.Respond());
+ _testServer.On(request => request.Respond());
+
+ return Task.CompletedTask;
+ }
+
+ protected override async Task WhenAsync()
+ {
+ await using (_testServer.Start()
+ .ConfigureAwait(false))
+ {
+ ConsumeMessages("localhost", _testServer.Port, _testServer.Stopping);
+ }
+ }
+
+ [Fact]
+ public void It_should_have_read_the_messages_sent()
+ {
+ _result.Should().HaveCount(NumberOfMessage);
+ for (var i = 0; i < NumberOfMessage; i++)
+ {
+ _result.Should().Contain($"data{i} fetched from broker");
+ }
+ }
+
+ private void ConsumeMessages(string host,
+ int port, CancellationToken testServerStopping)
+ {
+ var consumerConfig = new ConsumerConfig(new Dictionary<string, string>
+ {
+ { "log_level", "7" }
+ })
+ {
+ BootstrapServers = $"{host}:{port}",
+ ApiVersionRequestTimeoutMs = 30000,
+ Debug = "all",
+ GroupId = "group1",
+ LogThreadName = false
+ };
+
+ using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
+ .SetLogHandler(this.Log)
+ .Build();
+
+ consumer.Subscribe("topic1");
+ var cancellationToken = CancellationTokenSource
+ .CreateLinkedTokenSource(testServerStopping, TimeoutCancellationToken).Token;
+ try
+ {
+ for (var i = 0; i < NumberOfMessage; i++)
+ {
+ _result.Add(consumer.Consume(cancellationToken).Message.Value);
+ }
+ }
+ finally
+ {
+ consumer.Close();
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Confluent.Kafka.2.4.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Confluent.Kafka.2.4.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs
new file mode 100644
index 0000000..3e3c31b
--- /dev/null
+++ b/tests/Confluent.Kafka.2.4.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs
@@ -0,0 +1,144 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using Confluent.Kafka;
+using FluentAssertions;
+using Kafka.Protocol;
+using Log.It;
+using Xunit;
+using Xunit.Abstractions;
+using Int32 = Kafka.Protocol.Int32;
+using Int64 = Kafka.Protocol.Int64;
+using Record = Kafka.Protocol.Records.Record;
+using String = Kafka.Protocol.String;
+
+namespace Kafka.TestFramework.Tests
+{
+ public partial class Given_a_socket_based_test_server
+ {
+ public class When_connecting_to_the_server_and_producing_a_message : TestSpecificationAsync
+ {
+ private SocketBasedKafkaTestFramework _testServer;
+ private IEnumerable<Record> _records;
+
+ public When_connecting_to_the_server_and_producing_a_message(
+ ITestOutputHelper testOutputHelper)
+ : base(testOutputHelper)
+ {
+ }
+
+ protected override Task GivenAsync()
+ {
+ _testServer = KafkaTestFramework.WithSocket();
+
+ _testServer.On(
+ request => request.Respond()
+ .WithAllApiKeys());
+
+ _testServer.On(
+ request => request.Respond()
+ .WithTopicsCollection(
+ request.TopicsCollection?.Select(topic =>
+ new Func(
+ responseTopic =>
+ responseTopic
+ .WithName(topic.Name)
+ .WithPartitionsCollection(partition =>
+ partition
+ .WithLeaderId(Int32.From(0))
+ .WithPartitionIndex(Int32.From(0))
+ .WithReplicaNodesCollection(new[] { Int32.From(0) }))))
+ .ToArray() ?? Array.Empty>())
+ .WithControllerId(Int32.From(0))
+ .WithClusterId(String.From("test"))
+ .WithBrokersCollection(broker => broker
+ .WithRack(String.From("testrack"))
+ .WithNodeId(Int32.From(0))
+ .WithHost(String.From("localhost"))
+ .WithPort(Int32.From(_testServer.Port)))
+ );
+
+ _testServer.On(request =>
+ {
+ _records = request.TopicDataCollection.SelectMany(pair =>
+ pair.Value.PartitionDataCollection.Where(data => data.Records != null)
+ .SelectMany(data => data.Records!.Records));
+ return request
+ .Respond()
+ .WithResponsesCollection(request.TopicDataCollection.Select(topicProduceData =>
+ new Func(
+ topicProduceResponse =>
+ topicProduceResponse
+ .WithName(topicProduceData.Value.Name)
+ .WithPartitionResponsesCollection(topicProduceData.Value
+ .PartitionDataCollection.Select(partitionProduceData =>
+ new Func(
+ partitionProduceResponse =>
+ partitionProduceResponse
+ .WithIndex(partitionProduceData.Index)
+ .WithLogAppendTimeMs(Int64.From(-1))))
+ .ToArray())))
+ .ToArray());
+ });
+
+ return Task.CompletedTask;
+ }
+
+ protected override async Task WhenAsync()
+ {
+ await using (_testServer.Start()
+ .ConfigureAwait(false))
+ {
+ await ProduceMessageFromClientAsync("localhost", _testServer.Port, _testServer.Stopping)
+ .ConfigureAwait(false);
+ }
+ }
+
+ [Fact]
+ public void It_should_have_read_one_record()
+ {
+ _records.Should().HaveCount(1);
+ }
+
+ [Fact]
+ public void It_should_have_read_the_message_sent()
+ {
+ _records.First().Value.EncodeToString(Encoding.UTF8).Should().Be("test");
+ }
+
+ private async Task ProduceMessageFromClientAsync(string host,
+ int port, CancellationToken testServerStopping)
+ {
+ var producerConfig = new ProducerConfig(new Dictionary<string, string>
+ {
+ { "log_level", "7"}
+ })
+ {
+ BootstrapServers = $"{host}:{port}",
+ ApiVersionRequestTimeoutMs = 30000,
+ Debug = "all"
+ };
+
+ using var producer =
+ new ProducerBuilder<Null, string>(producerConfig)
+ .SetLogHandler(this.Log)
+ .Build();
+
+ var report = await producer
+ .ProduceAsync("my-topic",
+ new Message<Null, string> { Value = "test" }, testServerStopping)
+ .ConfigureAwait(false);
+ LogFactory.Create("producer").Info("Produce report {@report}", report);
+
+ producer.Flush(testServerStopping);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Confluent.Kafka.2.4.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs b/tests/Confluent.Kafka.2.4.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs
new file mode 100644
index 0000000..07822f9
--- /dev/null
+++ b/tests/Confluent.Kafka.2.4.0.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs
@@ -0,0 +1,135 @@
+using System.Threading.Tasks;
+using FluentAssertions;
+using Kafka.Protocol;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Kafka.TestFramework.Tests
+{
+ public partial class Given_an_inmemory_kafka_test_framework_and_a_message_subscription
+ {
+ public partial class
+ When_the_client_sends_the_message_subscribed :
+ TestSpecificationAsync
+ {
+ private readonly InMemoryKafkaTestFramework _testServer =
+ KafkaTestFramework.InMemory();
+
+ private ResponsePayload _response;
+
+ public When_the_client_sends_the_message_subscribed(
+ ITestOutputHelper testOutputHelper)
+ : base(testOutputHelper)
+ {
+ }
+
+ protected override Task GivenAsync()
+ {
+ _testServer.On(
+ request => request.Respond()
+ .WithThrottleTimeMs(Int32.From(100))
+ .WithApiKeysCollection(
+ key => key
+ .WithApiKey(FetchRequest.ApiKey)
+ .WithMinVersion(FetchRequest.MinVersion)
+ .WithMaxVersion(FetchRequest.MaxVersion)));
+
+ return Task.CompletedTask;
+ }
+
+ protected override async Task WhenAsync()
+ {
+ await using (_testServer.Start()
+ .ConfigureAwait(false))
+ {
+ var client = await _testServer
+ .CreateRequestClientAsync()
+ .ConfigureAwait(false);
+
+ var message = new ApiVersionsRequest(ApiVersionsRequest.MaxVersion);
+ var requestPayload = new RequestPayload(
+ new RequestHeader(message.HeaderVersion)
+ .WithRequestApiKey(ApiVersionsRequest.ApiKey)
+ .WithRequestApiVersion(
+ message.Version)
+ .WithCorrelationId(Int32.From(12)),
+ message
+ );
+
+ await client
+ .SendAsync(requestPayload)
+ .ConfigureAwait(false);
+
+ _response = await client
+ .ReadAsync(requestPayload)
+ .ConfigureAwait(false);
+ }
+ }
+
+ [Fact]
+ public void
+ The_subscription_should_receive_a_api_versions_response()
+ {
+ _response.Message.Should().BeOfType<ApiVersionsResponse>();
+ }
+
+ [Fact]
+ public void
+ The_subscription_should_receive_a_api_versions_response_with_correlation_id()
+ {
+ _response.Header.CorrelationId.Should().Be(Int32.From(12));
+ }
+
+ [Fact]
+ public void
+ The_subscription_should_receive_a_api_versions_response_with_throttle_time()
+ {
+ _response.Message.As<ApiVersionsResponse>()
+ .ThrottleTimeMs.Should().Be(Int32.From(100));
+ }
+
+ [Fact]
+ public void
+ The_subscription_should_receive_a_api_versions_response_with_one_api_key()
+ {
+ _response.Message.As<ApiVersionsResponse>()
+ .ApiKeysCollection.Should().HaveCount(1);
+ }
+
+ [Fact]
+ public void
+ The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_key()
+ {
+ _response.Message.As<ApiVersionsResponse>()
+ .ApiKeysCollection.Value.Should().ContainKey(FetchRequest.ApiKey);
+ }
+
+ [Fact]
+ public void
+ The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_index()
+ {
+ _response.Message.As<ApiVersionsResponse>()
+ .ApiKeysCollection[FetchRequest.ApiKey].ApiKey.Should()
+ .Be(FetchRequest.ApiKey);
+ }
+
+ [Fact]
+ public void
+ The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_min_version()
+ {
+ _response.Message.As<ApiVersionsResponse>()
+ .ApiKeysCollection[FetchRequest.ApiKey].MinVersion.Should()
+ .Be(FetchRequest.MinVersion);
+ }
+
+ [Fact]
+ public void
+ The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_max_version()
+ {
+ _response.Message.As<ApiVersionsResponse>()
+ .ApiKeysCollection[FetchRequest.ApiKey].MaxVersion.Should()
+ .Be(FetchRequest.MaxVersion);
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Confluent.Kafka.2.4.0.Tests/LogExtensions.cs b/tests/Confluent.Kafka.2.4.0.Tests/LogExtensions.cs
new file mode 100644
index 0000000..e51f65b
--- /dev/null
+++ b/tests/Confluent.Kafka.2.4.0.Tests/LogExtensions.cs
@@ -0,0 +1,30 @@
+using System;
+using Confluent.Kafka;
+using Log.It;
+
+namespace Kafka.TestFramework.Tests
+{
+ internal static class LogExtensions
+ {
+ internal static void Log(this TestSpecificationAsync test,
+ IProducer<Null, string> producer, LogMessage logMessage)
+ {
+ test.Log(logMessage);
+ }
+
+ internal static void Log(this TestSpecificationAsync test,
+ IConsumer<Ignore, string> consumer, LogMessage logMessage)
+ {
+ test.Log(logMessage);
+ }
+
+ private static void Log(this TestSpecificationAsync test,
+ LogMessage logMessage)
+ {
+ test.WriteKafkaLogMessage(logMessage.Name,
+ logMessage.Facility,
+ Enum.GetName(logMessage.Level),
+ logMessage.Message);
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Confluent.Kafka.2.4.0.Tests/appsettings.json b/tests/Confluent.Kafka.2.4.0.Tests/appsettings.json
new file mode 100644
index 0000000..e2dfec9
--- /dev/null
+++ b/tests/Confluent.Kafka.2.4.0.Tests/appsettings.json
@@ -0,0 +1,26 @@
+{
+ "NLog": {
+ "throwConfigExceptions": true,
+ "internalLogLevel": "info",
+ "internalLogFile": "${basedir}/internal-nlog.txt",
+ "extensions": [
+ { "assembly": "Log.It.With.NLog" }
+ ],
+ "variables": {
+ "simplePipeSeparatorLayout": "${date:format=HH\\:mm\\:ss.fff} | ${logger} | ${level} | ${message} ${onexception:| ${exception:format=type} | ${exception:format=method} | ${exception:format=message} | ${exception:format=stacktrace} | ${exception:method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}"
+ },
+ "targets": {
+ "NLogCapturing": {
+ "type": "NLogCapturing",
+ "layout": "${simplePipeSeparatorLayout}"
+ }
+ },
+ "rules": [
+ {
+ "logger": "*",
+ "minLevel": "Trace",
+ "writeTo": "NLogCapturing"
+ }
+ ]
+ }
+}
\ No newline at end of file
diff --git a/tests/Kafka.TestFramework.Tests/BytesExtensions.cs b/tests/Kafka.TestFramework.Tests/BytesExtensions.cs
deleted file mode 100644
index b1d6757..0000000
--- a/tests/Kafka.TestFramework.Tests/BytesExtensions.cs
+++ /dev/null
@@ -1,13 +0,0 @@
-using System.Text;
-using Kafka.Protocol;
-
-namespace Kafka.TestFramework.Tests
-{
- internal static class BytesExtensions
- {
- internal static string EncodeToString(this byte[] bytes, Encoding encoding)
- {
- return encoding.GetString(bytes);
- }
- }
-}
\ No newline at end of file
diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs
index 5e40f85..21c58d8 100644
--- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs
+++ b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_consuming_a_message.cs
@@ -10,8 +10,10 @@
using Kafka.Protocol.Records;
using Xunit;
using Xunit.Abstractions;
+using static Kafka.Protocol.OffsetFetchResponse;
using Int32 = Kafka.Protocol.Int32;
using Record = Kafka.Protocol.Records.Record;
+using Uuid = Kafka.Protocol.Uuid;
namespace Kafka.TestFramework.Tests
{
@@ -37,6 +39,7 @@ protected override Task GivenAsync()
request => request.Respond()
.WithAllApiKeys());
+ var topicId = Uuid.From(Guid.NewGuid());
_testServer.On(
request => request.Respond()
.WithTopicsCollection(
@@ -46,6 +49,7 @@ protected override Task GivenAsync()
responseTopic =>
responseTopic
.WithName(topic.Name)
+ .WithTopicId(topicId)
.WithPartitionsCollection(partition =>
partition
.WithLeaderId(0)
@@ -80,24 +84,33 @@ protected override Task GivenAsync()
);
_testServer.On(request => request.Respond()
- .WithTopicsCollection(
- request.TopicsCollection?.Select(topic =>
- new Func(responseTopic =>
- responseTopic
- .WithName(topic.Name)
- .WithPartitionsCollection(topic.PartitionIndexesCollection
- .Select(partitionIndex =>
- new Func(
- partition => partition
- .WithPartitionIndex(partitionIndex)))
- .ToArray())))
- .ToArray() ??
- Array.Empty>()));
+ .WithGroupsCollection(request.GroupsCollection.Select(group =>
+ new Func(offsetFetchResponseGroup =>
+ offsetFetchResponseGroup
+ .WithGroupId(group.GroupId)
+ .WithTopicsCollection(
+ group.TopicsCollection?
+ .Select(topic =>
+ new Func(
+ responseTopic =>
+ responseTopic
+ .WithName(topic.Name)
+ .WithPartitionsCollection(topic
+ .PartitionIndexesCollection
+ .Select(partitionIndex =>
+ new Func(
+ partition => partition
+ .WithPartitionIndex(partitionIndex)))
+ .ToArray())))
+ .ToArray() ??
+ Array.Empty>()))).ToArray()));
var records = new Dictionary();
for (var i = 0; i < NumberOfMessage; i++)
@@ -118,11 +131,16 @@ protected override Task GivenAsync()
request.TopicsCollection.Select(topic =>
new Func(
response => response
- .WithTopic(topic.Topic)
+ .WithTopicId(topic.TopicId)
.WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
new Func(data =>
{
+ data.WithCurrentLeader(epoch => epoch
+ .WithLeaderEpoch(epoch.LeaderEpoch)
+ .WithLeaderId(0))
+ .WithPartitionIndex(partition.Partition)
+ .WithHighWatermark(NumberOfMessage - 1);
var recordBatch = new NullableRecordBatch
{
LastOffsetDelta = (int)partition.FetchOffset,
@@ -162,6 +180,8 @@ await Task.Delay(request.MaxWaitMs, cancellationToken)
_testServer.On(request => request.Respond());
_testServer.On(request => request.Respond());
+ _testServer.On(request => request.Respond()
+ .WithPushIntervalMs(1000));
return Task.CompletedTask;
}
@@ -196,10 +216,11 @@ private void ConsumeMessages(string host,
ApiVersionRequestTimeoutMs = 30000,
Debug = "all",
GroupId = "group1",
+ LogThreadName = false
};
using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
- .SetLogHandler(LogExtensions.UseLogIt)
+ .SetLogHandler(this.Log)
.Build();
consumer.Subscribe("topic1");
diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs
index b8c0a64..3cddedc 100644
--- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs
+++ b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs
@@ -88,6 +88,8 @@ protected override Task GivenAsync()
.ToArray());
});
+ _testServer.On(request => request.Respond()
+ .WithPushIntervalMs(1000));
return Task.CompletedTask;
}
@@ -113,7 +115,7 @@ public void It_should_have_read_the_message_sent()
_records.First().Value.EncodeToString(Encoding.UTF8).Should().Be("test");
}
- private static async Task ProduceMessageFromClientAsync(string host,
+ private async Task ProduceMessageFromClientAsync(string host,
int port, CancellationToken testServerStopping)
{
var producerConfig = new ProducerConfig(new Dictionary<string, string>
@@ -128,7 +130,7 @@ private static async Task ProduceMessageFromClientAsync(string host,
using var producer =
new ProducerBuilder<Null, string>(producerConfig)
- .SetLogHandler(LogExtensions.UseLogIt)
+ .SetLogHandler(this.Log)
.Build();
var report = await producer
diff --git a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj
index 58f1397..10fe799 100644
--- a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj
+++ b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj
@@ -7,14 +7,15 @@
-
+
-
+
-
+
+
@@ -24,6 +25,7 @@
+
diff --git a/tests/Kafka.TestFramework.Tests/LogExtensions.cs b/tests/Kafka.TestFramework.Tests/LogExtensions.cs
index 1552c3b..e51f65b 100644
--- a/tests/Kafka.TestFramework.Tests/LogExtensions.cs
+++ b/tests/Kafka.TestFramework.Tests/LogExtensions.cs
@@ -6,56 +6,25 @@ namespace Kafka.TestFramework.Tests
{
internal static class LogExtensions
{
- internal static void UseLogIt(
+ internal static void Log(this TestSpecificationAsync test,
IProducer<Null, string> producer, LogMessage logMessage)
{
- var logger = LogFactory.Create(producer.GetType().GetPrettyName());
- UseLogIt(logger, logMessage);
+ test.Log(logMessage);
}
- internal static void UseLogIt(
+ internal static void Log(this TestSpecificationAsync test,
IConsumer<Ignore, string> consumer, LogMessage logMessage)
{
- var logger = LogFactory.Create(consumer.GetType().GetPrettyName());
- UseLogIt(logger, logMessage);
+ test.Log(logMessage);
}
- private static void UseLogIt(ILogger logger, LogMessage logMessage)
+ private static void Log(this TestSpecificationAsync test,
+ LogMessage logMessage)
{
- switch (logMessage.Level)
- {
- case SyslogLevel.Debug:
- LogTo(logger.Debug);
- break;
- case SyslogLevel.Notice:
- case SyslogLevel.Info:
- LogTo(logger.Info);
- break;
- case SyslogLevel.Warning:
- LogTo(logger.Warning);
- break;
- case SyslogLevel.Error:
- LogTo(logger.Error);
- break;
- case SyslogLevel.Alert:
- case SyslogLevel.Critical:
- case SyslogLevel.Emergency:
- LogTo(logger.Fatal);
- break;
- default:
- throw new ArgumentOutOfRangeException($"{logMessage.Level} is not supported");
- }
-
- void LogTo(Action log)
- {
- log("{name} {facility}: {message}",
- new object[]
- {
- logMessage.Name,
- logMessage.Facility,
- logMessage.Message
- });
- }
+ test.WriteKafkaLogMessage(logMessage.Name,
+ logMessage.Facility,
+ Enum.GetName(logMessage.Level),
+ logMessage.Message);
}
}
}
\ No newline at end of file
diff --git a/tests/Kafka.TestFramework.Tests/appsettings.json b/tests/Kafka.TestFramework.Tests/appsettings.json
index 712e57e..e2dfec9 100644
--- a/tests/Kafka.TestFramework.Tests/appsettings.json
+++ b/tests/Kafka.TestFramework.Tests/appsettings.json
@@ -7,7 +7,7 @@
{ "assembly": "Log.It.With.NLog" }
],
"variables": {
- "simplePipeSeparatorLayout": "${date:format=yyyy-MM-dd HH\\:mm\\:ss.fff} | ${logger} | ${level} | ${message} ${onexception:| ${exception:format=type} | ${exception:format=method} | ${exception:format=message} | ${exception:format=stacktrace} | ${exception:method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}"
+ "simplePipeSeparatorLayout": "${date:format=HH\\:mm\\:ss.fff} | ${logger} | ${level} | ${message} ${onexception:| ${exception:format=type} | ${exception:format=method} | ${exception:format=message} | ${exception:format=stacktrace} | ${exception:method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}"
},
"targets": {
"NLogCapturing": {
diff --git a/tests/Kafka.TestSpecification/BytesExtensions.cs b/tests/Kafka.TestSpecification/BytesExtensions.cs
new file mode 100644
index 0000000..723f8a0
--- /dev/null
+++ b/tests/Kafka.TestSpecification/BytesExtensions.cs
@@ -0,0 +1,12 @@
+using System.Text;
+
+namespace Kafka.TestFramework
+{
+ public static class BytesExtensions
+ {
+ public static string EncodeToString(this byte[] bytes, Encoding encoding)
+ {
+ return encoding.GetString(bytes);
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Kafka.TestSpecification/Kafka.TestSpecification.csproj b/tests/Kafka.TestSpecification/Kafka.TestSpecification.csproj
new file mode 100644
index 0000000..2753acf
--- /dev/null
+++ b/tests/Kafka.TestSpecification/Kafka.TestSpecification.csproj
@@ -0,0 +1,35 @@
+
+
+
+ net7.0
+
+ false
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
+
diff --git a/tests/Kafka.TestFramework.Tests/ObjectExtensions.cs b/tests/Kafka.TestSpecification/ObjectExtensions.cs
similarity index 88%
rename from tests/Kafka.TestFramework.Tests/ObjectExtensions.cs
rename to tests/Kafka.TestSpecification/ObjectExtensions.cs
index 46196b6..5bb0c30 100644
--- a/tests/Kafka.TestFramework.Tests/ObjectExtensions.cs
+++ b/tests/Kafka.TestSpecification/ObjectExtensions.cs
@@ -1,7 +1,7 @@
using System;
using System.Threading.Tasks;
-namespace Kafka.TestFramework.Tests
+namespace Kafka.TestFramework
{
internal static class ObjectExtensions
{
diff --git a/tests/Kafka.TestFramework.Tests/TestSpecificationAsync.cs b/tests/Kafka.TestSpecification/TestSpecificationAsync.cs
similarity index 67%
rename from tests/Kafka.TestFramework.Tests/TestSpecificationAsync.cs
rename to tests/Kafka.TestSpecification/TestSpecificationAsync.cs
index 903c6c1..5b46274 100644
--- a/tests/Kafka.TestFramework.Tests/TestSpecificationAsync.cs
+++ b/tests/Kafka.TestSpecification/TestSpecificationAsync.cs
@@ -1,6 +1,7 @@
using System;
using System.Threading;
using System.Threading.Tasks;
+using Confluent.Kafka;
using Log.It;
using Log.It.With.NLog;
using Microsoft.Extensions.Configuration;
@@ -8,10 +9,11 @@
using Test.It.With.XUnit;
using Xunit.Abstractions;
-namespace Kafka.TestFramework.Tests
+namespace Kafka.TestFramework
{
- public class TestSpecificationAsync : XUnit2SpecificationAsync
+ public abstract class TestSpecificationAsync : XUnit2SpecificationAsync
{
+ private readonly ITestOutputHelper _testOutputHelper;
private readonly IDisposable _logWriter;
static TestSpecificationAsync()
@@ -28,11 +30,22 @@ static TestSpecificationAsync()
public TestSpecificationAsync(ITestOutputHelper testOutputHelper) : base(testOutputHelper)
{
+ _testOutputHelper = testOutputHelper;
_logWriter = Output.WriteTo(testOutputHelper);
}
+ public void WriteKafkaLogMessage(
+ string name,
+ string facility,
+ string level,
+ string message)
+ {
+ _testOutputHelper?.WriteLine(
+ $"{DateTimeOffset.Now:HH:mm:ss.fff} | {name}:{facility} | {level} | {message}");
+ }
+
protected CancellationToken TimeoutCancellationToken =>
- new CancellationTokenSource(TimeSpan.FromSeconds(20)).Token;
+ new CancellationTokenSource(TimeSpan.FromSeconds(5)).Token;
protected sealed override async Task DisposeAsync(bool disposing)
{
diff --git a/tests/Kafka.TestSpecification/appsettings.json b/tests/Kafka.TestSpecification/appsettings.json
new file mode 100644
index 0000000..e2dfec9
--- /dev/null
+++ b/tests/Kafka.TestSpecification/appsettings.json
@@ -0,0 +1,26 @@
+{
+ "NLog": {
+ "throwConfigExceptions": true,
+ "internalLogLevel": "info",
+ "internalLogFile": "${basedir}/internal-nlog.txt",
+ "extensions": [
+ { "assembly": "Log.It.With.NLog" }
+ ],
+ "variables": {
+ "simplePipeSeparatorLayout": "${date:format=HH\\:mm\\:ss.fff} | ${logger} | ${level} | ${message} ${onexception:| ${exception:format=type} | ${exception:format=method} | ${exception:format=message} | ${exception:format=stacktrace} | ${exception:method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}"
+ },
+ "targets": {
+ "NLogCapturing": {
+ "type": "NLogCapturing",
+ "layout": "${simplePipeSeparatorLayout}"
+ }
+ },
+ "rules": [
+ {
+ "logger": "*",
+ "minLevel": "Trace",
+ "writeTo": "NLogCapturing"
+ }
+ ]
+ }
+}
\ No newline at end of file