diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 0000000..a580509 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1 @@ +* @Fresa \ No newline at end of file diff --git a/.github/version_config.yml b/.github/version_config.yml new file mode 100644 index 0000000..c569c97 --- /dev/null +++ b/.github/version_config.yml @@ -0,0 +1,38 @@ +mode: ContinuousDelivery +# Conventional Commits https://www.conventionalcommits.org/en/v1.0.0/ +# https://regex101.com/r/Ms7Vx6/2 +major-version-bump-message: "(build|chore|ci|docs|doc|feat|fix|perf|refactor|revert|style|test)(\\([a-z]+\\))?(!: .+|: (.+\\n\\n)+BREAKING CHANGE: .+)" +# https://regex101.com/r/Oqhi2m/1 +minor-version-bump-message: "(feat)(\\([a-z]+\\))?: .+" +# https://regex101.com/r/f5C4fP/1 +patch-version-bump-message: "(build|chore|ci|docs|doc|fix|perf|refactor|revert|style|test)(\\([a-z]+\\))?: .+" +# Match nothing +no-bump-message: ^\b$ +continuous-delivery-fallback-tag: '' +branches: + development: + increment: Patch + # Everything except main and master + regex: ^(?!(main|master)$) + track-merge-target: true + source-branches: [] + feature: + # Match nothing + regex: ^\b$ + develop: + # Match nothing + regex: ^\b$ + main: + source-branches: [] + release: + # Match nothing + regex: ^\b$ + pull-request: + # Match nothing + regex: ^\b$ + hotfix: + # Match nothing + regex: ^\b$ + support: + # Match nothing + regex: ^\b$ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..bb368d6 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,109 @@ +name: Continuous Delivery + +on: + push: + branches: + - '**' + +jobs: + test: + name: Build & Test + runs-on: ${{ matrix.os }} + timeout-minutes: 10 + strategy: + matrix: + os: [ubuntu-latest] + steps: + - uses: actions/checkout@v2 + - name: Setup .NET + uses: actions/setup-dotnet@v1 + with: + dotnet-version: 3.1.x + - name: Build + run: dotnet build -c Release + - name: Test + run: dotnet test -c Release --no-build --verbosity normal + + release: + name: Create Release + needs: [test] + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + # Fetches entire history, so we can analyze commits since last tag + fetch-depth: 0 + - name: Install GitVersion + uses: gittools/actions/gitversion/setup@v0.9.7 + with: + versionSpec: '5.x' + - name: Determine Version + id: gitversion + uses: gittools/actions/gitversion/execute@v0 + with: + useConfigFile: true + configFilePath: .github/version_config.yml + - name: Determine Release Info + id: release + run: | + from_tag=$(git tag --points-at ${{ steps.gitversion.outputs.versionSourceSha }} | grep -m 1 ^v[0-9]*\.[0-9]*\.[0-9]* | head -1) + [[ -z "$from_tag" ]] && \ + from_ref_exclusive=${{ steps.gitversion.outputs.versionSourceSha }} || \ + from_ref_exclusive=$from_tag + + [[ -z "${{ steps.gitversion.outputs.preReleaseTag }}" ]] && \ + is_prerelease=false || \ + is_prerelease=true + + [[ $is_prerelease == true ]] && \ + version=${{ steps.gitversion.outputs.majorMinorPatch }}-pre-${{ steps.gitversion.outputs.commitsSinceVersionSource }} || \ + version=${{ steps.gitversion.outputs.majorMinorPatch }} + + echo "::set-output name=is_prerelease::$is_prerelease" + echo "::set-output name=tag::v$version" + echo "::set-output name=version::$version" + echo "::set-output name=from_ref_exclusive::$from_ref_exclusive" + - name: Create Tag + uses: actions/github-script@v3 + with: + script: | + github.git.createRef({ + owner: context.repo.owner, + repo: context.repo.repo, + ref: 
"refs/tags/${{ steps.release.outputs.tag }}", + sha: "${{ steps.gitversion.outputs.sha }}" + }); + - name: Generate Release Notes + id: release_notes + uses: Fresa/release-notes-generator@v0 + with: + version: ${{ steps.release.outputs.tag }} + from_ref_exclusive: ${{ steps.release.outputs.from_ref_exclusive }} + to_ref_inclusive: ${{ steps.release.outputs.tag }} + - name: Create Release + id: create_release + uses: softprops/action-gh-release@v1 + with: + body: ${{ steps.release_notes.outputs.release_notes }} + tag_name: ${{ steps.release.outputs.tag }} + prerelease: ${{ steps.release.outputs.is_prerelease }} + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} + - name: Update Latest Minor Tag + uses: EndBug/latest-tag@v1 + if: steps.release.outputs.is_prerelease == 'false' + with: + tag-name: v${{ steps.gitversion.outputs.major }} + description: ${{ steps.release.outputs.tag }} + - name: Update Latest Patch Tag + uses: EndBug/latest-tag@v1 + if: steps.release.outputs.is_prerelease == 'false' + with: + tag-name: v${{ steps.gitversion.outputs.major }}.${{ steps.gitversion.outputs.minor }} + description: ${{ steps.release.outputs.tag }} + - name: Pack + env: + release_notes: ${{ steps.release_notes.outputs.release_notes }} + run: dotnet pack src/Kafka.TestFramework/Kafka.TestFramework.csproj -c Release -o nuget-packages -p:PackageVersion=${{ steps.release.outputs.version }} -p:PackageReleaseNotes="$release_notes" + - name: Publish to nuget.org + run: dotnet nuget push nuget-packages/Kafka.TestFramework.${{ steps.release.outputs.version }}.nupkg --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json diff --git a/Kafka.TestFramework.sln b/Kafka.TestFramework.sln index 0cd3255..7b8b432 100644 --- a/Kafka.TestFramework.sln +++ b/Kafka.TestFramework.sln @@ -1,21 +1,24 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.30011.22 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.31903.59 MinimumVisualStudioVersion = 10.0.40219.1 Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{0EEC4A41-7C67-4580-8A21-BFA01B2415F1}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{93A69F6A-D895-4A0E-9241-655EF7A9EEDA}" ProjectSection(SolutionItems) = preProject - appveyor.yml = appveyor.yml + .github\workflows\ci.yml = .github\workflows\ci.yml + .github\CODEOWNERS = .github\CODEOWNERS README.md = README.md - version = version + .github\version_config.yml = .github\version_config.yml EndProjectSection EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework.Tests", "tests\Kafka.TestFramework.Tests\Kafka.TestFramework.Tests.csproj", "{FCFCC73C-B0A6-4D4E-A765-9A94303D00AD}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Kafka.TestFramework", "src\Kafka.TestFramework\Kafka.TestFramework.csproj", "{161B63C2-B654-4611-AAC8-28B54B5F3C9D}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Confluent.Kafka.1.1.0.Tests", "tests\Confluent.Kafka.1.1.0.Tests\Confluent.Kafka.1.1.0.Tests.csproj", "{12646283-CC9E-4D45-8E89-F52337C790B6}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -30,12 +33,17 @@ Global {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Debug|Any CPU.Build.0 = Debug|Any CPU {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Release|Any CPU.ActiveCfg = Release|Any CPU {161B63C2-B654-4611-AAC8-28B54B5F3C9D}.Release|Any CPU.Build.0 = 
Release|Any CPU + {12646283-CC9E-4D45-8E89-F52337C790B6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {12646283-CC9E-4D45-8E89-F52337C790B6}.Debug|Any CPU.Build.0 = Debug|Any CPU + {12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.ActiveCfg = Release|Any CPU + {12646283-CC9E-4D45-8E89-F52337C790B6}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {FCFCC73C-B0A6-4D4E-A765-9A94303D00AD} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1} + {12646283-CC9E-4D45-8E89-F52337C790B6} = {0EEC4A41-7C67-4580-8A21-BFA01B2415F1} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {B432BD60-300C-4955-9D44-011413D52F2D} diff --git a/README.md b/README.md index 112a19e..c324820 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,13 @@ # Kafka.TestFramework An in-memory test framework for Kafka clients which can be used to subscribe on request messages and respond with response messages. The test server can be used in-memory or connected to clients over TCP. -[![Build status](https://ci.appveyor.com/api/projects/status/3by56qq57a8or5a2?svg=true)](https://ci.appveyor.com/project/Fresa/kafka-testframework) - -[![Build history](https://buildstats.info/appveyor/chart/Fresa/kafka-testframework)](https://ci.appveyor.com/project/Fresa/kafka-testframework/history) +[![Continuous Delivery](https://github.com/Fresa/Kafka.TestFramework/actions/workflows/ci.yml/badge.svg)](https://github.com/Fresa/Kafka.TestFramework/actions/workflows/ci.yml) ## Download https://www.nuget.org/packages/kafka.testframework ## Getting Started The test framework can be used in-memory or by setting up a TCP socket that the kafka client can connect to. See the [`integration tests`](https://github.com/Fresa/Kafka.TestFramework/blob/master/tests/Kafka.TestFramework.Tests). + +### v2.x +Now supports [Kafka.Protocol](https://github.com/Fresa/Kafka.Protocol) v2.x. 
\ No newline at end of file diff --git a/appveyor.yml b/appveyor.yml deleted file mode 100644 index f539dc8..0000000 --- a/appveyor.yml +++ /dev/null @@ -1,34 +0,0 @@ -version: '{build}' -skip_tags: true -image: Visual Studio 2019 -configuration: Release -dotnet_csproj: - patch: true - file: '**\Kafka.TestFramework.csproj' - package_version: '{version}' -environment: - git_access_token: - secure: V8i7UZRY6IQFpngUi2sgnunCFZKvE4al96flaBKyWqXyRLmTabX0/1Z0Tpl8BWKr - appveyor_api_token: - secure: eMpCyScSyam7rRtm3bSRAzt29qATXHWxk8mfgdTpyh0= -install: -- ps: scripts/set-version.ps1 -before_build: -- cmd: dotnet restore -build_script: -- dotnet build -c %CONFIGURATION% -test_script: -- dotnet test -c %CONFIGURATION% -after_test: -- ps: >- - scripts/update-release-notes.ps1 - - dotnet pack src/Kafka.TestFramework/Kafka.TestFramework.csproj -c %CONFIGURATION% -o nupkgs -artifacts: -- path: nupkgs\*.nupkg -deploy: -- provider: NuGet - api_key: - secure: CrJHCcuXYVvE4Qwk+1VfIboKDImZyj+NUE5fZQj300ctK0uUCICPgrVNA7YfLRl+ -after_deploy: -- ps: scripts/set-version-tag-in-git.ps1 \ No newline at end of file diff --git a/scripts/set-version-tag-in-git.ps1 b/scripts/set-version-tag-in-git.ps1 deleted file mode 100644 index 31026d9..0000000 --- a/scripts/set-version-tag-in-git.ps1 +++ /dev/null @@ -1,7 +0,0 @@ -$ErrorActionPreference = "Stop" -git config --global credential.helper store -Add-Content -Path "$env:USERPROFILE\.git-credentials" -Value "https://$($env:git_access_token):x-oauth-basic@github.com`n" -NoNewline -git config --global user.email "fresa@fresa.se" -git config --global user.name "Fredrik Arvidsson" -git tag v$($env:APPVEYOR_BUILD_VERSION) $($env:APPVEYOR_REPO_COMMIT) -git push origin --tags --quiet \ No newline at end of file diff --git a/scripts/set-version.ps1 b/scripts/set-version.ps1 deleted file mode 100644 index a5bbca3..0000000 --- a/scripts/set-version.ps1 +++ /dev/null @@ -1,77 +0,0 @@ -$ErrorActionPreference = "Stop" -Write-Host "Versioning is built on the previous build version which is expected to be in semver format, ex: 1.1.6-alpha" -$token = $env:appveyor_api_token #should be defined as a secure variable -$branch = $env:APPVEYOR_REPO_BRANCH - -$headers = @{ - "Authorization" = "Bearer $token" - "Content-type" = "application/json" -} -$apiURL = "https://ci.appveyor.com/api/projects/$env:APPVEYOR_ACCOUNT_NAME/$env:APPVEYOR_PROJECT_SLUG" -$history = Invoke-RestMethod -Uri "$apiURL/history?recordsNumber=2" -Headers $headers -Method Get - -$version = (Get-Content .\version) -Write-Host "Current version specified: $version" -[int]$major = $version.Substring(0, $version.IndexOf(".")) -[int]$minor = $version.Substring($version.IndexOf(".") + 1) - -# apply versioning strategy if this is not the first build -if ($history.builds.Count -eq 2) -{ - $previousVersion = $history.builds[1].version - Write-Host "Previous version: $previousVersion" - [int]$previousMajor = $previousVersion.Substring(0, $previousVersion.IndexOf(".")) - Write-Host "Previous major version: $previousMajor" - [int]$previousMinor = $previousVersion.Substring($previousVersion.IndexOf(".") + 1, $previousVersion.LastIndexOf(".") - ($previousVersion.IndexOf(".") + 1)) - Write-Host "Previous minor version: $previousMinor" - # handle suffix, eg. 
1.2.3-alpha - if ([int]$previousVersion.IndexOf("-") -eq -1){ - [int]$previousPatch = $previousVersion.Substring($previousVersion.LastIndexOf(".") + 1) - } else { - [int]$previousPatch = $previousVersion.Substring($previousVersion.LastIndexOf(".") + 1, $previousVersion.IndexOf("-")-($previousVersion.LastIndexOf(".") + 1)) - } - Write-Host "Previous patch version: $previousPatch" - - $patch = $previousPatch + 1 - - if ($previousMajor -ne $major) - { - if ($major -ne $previousMajor + 1) - { - throw "Major version identity $major can only be incremented by one in regards to previous major $previousMajor" - } - - if ($minor -ne 0) - { - throw "Minor version has to be set to 0 when incrementing major version" - } - - Write-Warning "Major version has been changed, setting patch to 0" - $patch = 0 - } - if ($previousMinor -ne $minor) - { - if ($minor -ne $previousMinor + 1) - { - throw "Minor version identity $minor can only be incremented by one in regards to previous minor $previousMinor" - } - - Write-Warning "Minor version has been changed, setting patch to 0" - $patch = 0 - } -} else -{ - # first build - Write-Warning "No previous builds found, setting patch to 0" - $patch = 0 -} - -$versionSuffix = "" -if ($branch -ne "master") -{ - $versionSuffix="-alpha" -} - -$currentBuildVersion = "$version.$patch$versionSuffix" -Write-Host "Setting build version to $currentBuildVersion" -Update-AppveyorBuild -Version "$currentBuildVersion" \ No newline at end of file diff --git a/scripts/update-release-notes.ps1 b/scripts/update-release-notes.ps1 deleted file mode 100644 index b78e69b..0000000 --- a/scripts/update-release-notes.ps1 +++ /dev/null @@ -1,20 +0,0 @@ -$ErrorActionPreference = "Stop" -git fetch --tags -$tags = git tag -l v* -if ($tags) -{ - $releaseNotes = git log "$(git describe --tags --match v* --abbrev=0)..$($env:APPVEYOR_REPO_COMMIT)" --pretty=format:"-%s" --no-merges -} -else -{ - $releaseNotes = git log $($env:APPVEYOR_REPO_COMMIT) --pretty=format:"-%s" --no-merges -} - -$releaseNotesAsString = if ($releaseNotes -eq $null) { "" } else { [string]::join("`n", $releaseNotes) } - -Write-Host "Release notes: '$($releaseNotesAsString)'" - -$path = "src/$env:APPVEYOR_PROJECT_NAME/$env:APPVEYOR_PROJECT_NAME.csproj" -[xml]$xml = Get-Content -Path $path -$xml.GetElementsByTagName("PackageReleaseNotes").set_InnerXML("$releaseNotesAsString") -Set-Content $path -Value $xml.InnerXml -Force \ No newline at end of file diff --git a/src/Kafka.TestFramework/AsyncDisposableExtensions.cs b/src/Kafka.TestFramework/AsyncDisposableExtensions.cs new file mode 100644 index 0000000..9654bfc --- /dev/null +++ b/src/Kafka.TestFramework/AsyncDisposableExtensions.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Kafka.TestFramework +{ + internal static class AsyncDisposableExtensions + { + internal static Task DisposeAllAsync( + this IEnumerable disposables) + => disposables.Select(client => client.DisposeAsync()) + .WhenAllAsync(); + } +} \ No newline at end of file diff --git a/src/Kafka.TestFramework/Client.cs b/src/Kafka.TestFramework/Client.cs index f4655de..8bd708f 100644 --- a/src/Kafka.TestFramework/Client.cs +++ b/src/Kafka.TestFramework/Client.cs @@ -1,71 +1,28 @@ using System; -using System.IO; using System.IO.Pipelines; using System.Threading; using System.Threading.Tasks; -using Kafka.Protocol; -using Int32 = Kafka.Protocol.Int32; namespace Kafka.TestFramework { - internal abstract class Client : IAsyncDisposable - 
where TSendPayload : IPayload + internal abstract class Client : IAsyncDisposable { - private readonly CancellationTokenSource _cancellationSource = new CancellationTokenSource(); + private readonly CancellationTokenSource _cancellationSource; private readonly Pipe _pipe = new Pipe(); private readonly INetworkClient _networkClient; private Task _sendAndReceiveBackgroundTask = default!; - protected Client(INetworkClient networkClient) + protected Client(INetworkClient networkClient, CancellationToken cancellationToken) { _networkClient = networkClient; - Reader = new KafkaReader(_pipe.Reader); - } - - protected IKafkaReader Reader { get; } - - public async ValueTask SendAsync( - TSendPayload payload, - CancellationToken cancellationToken = default) - { - var buffer = new MemoryStream(); - await using var _ = buffer.ConfigureAwait(false); - var writer = new KafkaWriter(buffer); - await using (writer.ConfigureAwait(false)) - { - await payload - .WriteToAsync(writer, cancellationToken) - .ConfigureAwait(false); - } - - var lengthBuffer = new MemoryStream(); - await using (lengthBuffer - .ConfigureAwait(false)) - { - var lengthWriter = new KafkaWriter(lengthBuffer); - await using (lengthWriter.ConfigureAwait(false)) - { - await lengthWriter - .WriteInt32Async(Int32.From((int)buffer.Length), cancellationToken) - .ConfigureAwait(false); - } - - await _networkClient.SendAsync( - lengthBuffer - .GetBuffer() - .AsMemory() - .Slice(0, (int)lengthBuffer.Length), - cancellationToken) - .ConfigureAwait(false); - } - - await _networkClient.SendAsync( - buffer.GetBuffer().AsMemory() - .Slice(0, (int)buffer.Length), - cancellationToken) - .ConfigureAwait(false); + NetworkClient = new NetworkStream(networkClient); + Reader = _pipe.Reader; + _cancellationSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); } + protected NetworkStream NetworkClient { get; } + protected PipeReader Reader { get; } + protected void StartReceiving() { _sendAndReceiveBackgroundTask = Task.Run( diff --git a/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs b/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs index 1a49656..4b96ed6 100644 --- a/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs +++ b/src/Kafka.TestFramework/InMemoryKafkaTestFramework.cs @@ -1,6 +1,9 @@ -using System.Threading; +using System; +using System.Collections.Generic; +using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Dataflow; +using Kafka.Protocol; namespace Kafka.TestFramework { @@ -23,9 +26,36 @@ public async Task CreateRequestClientAsync( var requestClient = new CrossWiredMemoryNetworkClient(first, second); var responseClient = new CrossWiredMemoryNetworkClient(second, first); await _clients - .SendAsync(responseClient, cancellationToken) + .SendAsync(responseClient, CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, Stopping).Token) .ConfigureAwait(false); - return RequestClient.Start(requestClient); + return new DisposableRequestClientDecorator(RequestClient.Start(requestClient, Stopping), responseClient, second, first); + } + + private class DisposableRequestClientDecorator : IRequestClient + { + private readonly IRequestClient _requestClient; + private readonly List _disposables; + + public DisposableRequestClientDecorator(IRequestClient requestClient, params IAsyncDisposable[] disposables) + { + _requestClient = requestClient; + _disposables = new List(disposables) { requestClient }; + } + public async ValueTask DisposeAsync() + { + await 
_disposables.DisposeAllAsync() + .ConfigureAwait(false); + } + + public ValueTask ReadAsync(RequestPayload requestPayload, CancellationToken cancellationToken = default) + { + return _requestClient.ReadAsync(requestPayload, cancellationToken); + } + + public ValueTask SendAsync(RequestPayload payload, CancellationToken cancellationToken = default) + { + return _requestClient.SendAsync(payload, cancellationToken); + } } } } \ No newline at end of file diff --git a/src/Kafka.TestFramework/Kafka.TestFramework.csproj b/src/Kafka.TestFramework/Kafka.TestFramework.csproj index 3fa8669..c4cf0fe 100644 --- a/src/Kafka.TestFramework/Kafka.TestFramework.csproj +++ b/src/Kafka.TestFramework/Kafka.TestFramework.csproj @@ -20,8 +20,8 @@ - - + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/Kafka.TestFramework/KafkaTestFramework.cs b/src/Kafka.TestFramework/KafkaTestFramework.cs index 128aaae..435ba56 100644 --- a/src/Kafka.TestFramework/KafkaTestFramework.cs +++ b/src/Kafka.TestFramework/KafkaTestFramework.cs @@ -8,17 +8,23 @@ namespace Kafka.TestFramework { - public abstract class KafkaTestFramework : IAsyncDisposable + public abstract class KafkaTestFramework { private readonly INetworkServer _networkServer; private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); + private readonly List _backgroundTasks = new List(); - private const int Stopped = 0; - private const int Started = 1; - private int _status = Stopped; + private const int HasStopped = 0; + private const int HasStarted = 1; + private int _status = HasStopped; + + /// + /// Triggered when the test framework is stopping + /// + public CancellationToken Stopping => _cancellationTokenSource.Token; public static InMemoryKafkaTestFramework InMemory() { @@ -47,10 +53,10 @@ internal KafkaTestFramework(INetworkServer networkServer) public IAsyncDisposable Start() { - var previousStatus = Interlocked.Exchange(ref _status, Started); - if (previousStatus == Started) + var previousStatus = Interlocked.Exchange(ref _status, HasStarted); + if (previousStatus == HasStarted) { - return this; + return new StopOnDispose(this); } var task = Task.Run( @@ -61,7 +67,7 @@ public IAsyncDisposable Start() try { var client = await _networkServer - .WaitForConnectedClientAsync(_cancellationTokenSource.Token) + .WaitForConnectedClientAsync(Stopping) .ConfigureAwait(false); ReceiveMessagesFor(client); } @@ -69,10 +75,15 @@ public IAsyncDisposable Start() { return; } + catch + { + _cancellationTokenSource.Cancel(); + throw; + } } }); _backgroundTasks.Add(task); - return this; + return new StopOnDispose(this); } private void ReceiveMessagesFor(INetworkClient networkClient) @@ -80,14 +91,14 @@ private void ReceiveMessagesFor(INetworkClient networkClient) var task = Task.Run( async () => { - var client = ResponseClient.Start(networkClient); + var client = ResponseClient.Start(networkClient, Stopping); await using var _ = client.ConfigureAwait(false); while (_cancellationTokenSource.IsCancellationRequested == false) { try { var requestPayload = await client - .ReadAsync(_cancellationTokenSource.Token) + .ReadAsync(Stopping) .ConfigureAwait(false); if (!_subscriptions.TryGetValue( @@ -95,29 +106,33 @@ private void ReceiveMessagesFor(INetworkClient networkClient) out var subscription)) { throw new InvalidOperationException( - $"Missing subscription for {requestPayload.Message.GetType()}"); + $"Missing subscription for {requestPayload.Message.GetType()}"); } var response = await subscription( - 
requestPayload.Message, - _cancellationTokenSource.Token); + requestPayload.Message, + Stopping); await client .SendAsync( new ResponsePayload( - requestPayload, - new ResponseHeader(requestPayload.Header.Version) + new ResponseHeader( + Messages.GetResponseHeaderVersionFor(requestPayload)) .WithCorrelationId(requestPayload.Header.CorrelationId), response), - _cancellationTokenSource.Token) + Stopping) .ConfigureAwait(false); } catch when (_cancellationTokenSource.IsCancellationRequested) { return; } + catch + { + _cancellationTokenSource.Cancel(); + throw; + } } - }); _backgroundTasks.Add(task); } @@ -164,11 +179,20 @@ public KafkaTestFramework On( return this; } - public async ValueTask DisposeAsync() + internal sealed class StopOnDispose : IAsyncDisposable { - _cancellationTokenSource.Cancel(); + private readonly KafkaTestFramework _testFramework; - await Task.WhenAll(_backgroundTasks); + public StopOnDispose(KafkaTestFramework testFramework) + { + _testFramework = testFramework; + } + public async ValueTask DisposeAsync() + { + _testFramework._cancellationTokenSource.Cancel(); + await Task.WhenAll(_testFramework._backgroundTasks) + .ConfigureAwait(false); + } } } } \ No newline at end of file diff --git a/src/Kafka.TestFramework/NetworkStream.cs b/src/Kafka.TestFramework/NetworkStream.cs new file mode 100644 index 0000000..51d9cc1 --- /dev/null +++ b/src/Kafka.TestFramework/NetworkStream.cs @@ -0,0 +1,50 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Kafka.TestFramework +{ + internal class NetworkStream : Stream + { + private readonly INetworkClient _networkClient; + + public NetworkStream(INetworkClient networkClient) + { + _networkClient = networkClient; + } + + public override void Flush() + { + } + + public override int Read(byte[] buffer, int offset, int count) + { + throw new System.NotImplementedException(); + } + + public override long Seek(long offset, SeekOrigin origin) + { + throw new System.NotImplementedException(); + } + + public override void SetLength(long value) + { + throw new System.NotImplementedException(); + } + + public override void Write(byte[] buffer, int offset, int count) + { + throw new System.NotImplementedException(); + } + + public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = new CancellationToken()) => + _networkClient.SendAsync(buffer, cancellationToken).AsValueTask(); + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Length => 0; + public override long Position { get; set; } = 0; + } +} \ No newline at end of file diff --git a/src/Kafka.TestFramework/RequestClient.cs b/src/Kafka.TestFramework/RequestClient.cs index 1fe0a27..867698a 100644 --- a/src/Kafka.TestFramework/RequestClient.cs +++ b/src/Kafka.TestFramework/RequestClient.cs @@ -4,19 +4,27 @@ namespace Kafka.TestFramework { - internal class RequestClient : Client, IRequestClient + internal class RequestClient : Client, IRequestClient { - private RequestClient(INetworkClient networkClient) : base(networkClient) + private RequestClient(INetworkClient networkClient, CancellationToken cancellationToken) : + base(networkClient, cancellationToken) { } - internal static RequestClient Start(INetworkClient networkClient) + internal static RequestClient Start(INetworkClient networkClient, CancellationToken cancellationToken) { - var client = new RequestClient(networkClient); + var client = new 
RequestClient(networkClient, cancellationToken); client.StartReceiving(); return client; } + public ValueTask SendAsync( + RequestPayload payload, + CancellationToken cancellationToken = default) + { + return payload.WriteToAsync(NetworkClient, cancellationToken); + } + public async ValueTask ReadAsync( RequestPayload requestPayload, CancellationToken cancellationToken = default) diff --git a/src/Kafka.TestFramework/ResponseClient.cs b/src/Kafka.TestFramework/ResponseClient.cs index bc56ec8..757f478 100644 --- a/src/Kafka.TestFramework/ResponseClient.cs +++ b/src/Kafka.TestFramework/ResponseClient.cs @@ -4,25 +4,32 @@ namespace Kafka.TestFramework { - internal class ResponseClient : Client + internal class ResponseClient : Client { - private ResponseClient(INetworkClient networkClient) : base(networkClient) + private ResponseClient(INetworkClient networkClient, CancellationToken cancellationToken) : + base(networkClient, cancellationToken) { } - internal static ResponseClient Start(INetworkClient networkClient) + internal static ResponseClient Start(INetworkClient networkClient, CancellationToken cancellationToken) { - var client = new ResponseClient(networkClient); + var client = new ResponseClient(networkClient, cancellationToken); client.StartReceiving(); return client; } + internal ValueTask SendAsync( + ResponsePayload payload, + CancellationToken cancellationToken = default) + { + return payload.WriteToAsync(NetworkClient, cancellationToken); + } + internal async Task ReadAsync( CancellationToken cancellationToken = default) { return await RequestPayload .ReadFromAsync( - RequestHeader.MaxVersion, Reader, cancellationToken) .ConfigureAwait(false); diff --git a/src/Kafka.TestFramework/ValueTaskExtensions.cs b/src/Kafka.TestFramework/ValueTaskExtensions.cs new file mode 100644 index 0000000..c6eeb72 --- /dev/null +++ b/src/Kafka.TestFramework/ValueTaskExtensions.cs @@ -0,0 +1,20 @@ +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; + +namespace Kafka.TestFramework +{ + internal static class ValueTaskExtensions + { + internal static Task WhenAllAsync( + this IEnumerable tasks) + => Task.WhenAll( + tasks.Where( + valueTask + => !valueTask.IsCompletedSuccessfully) + .Select(valueTask => valueTask.AsTask())); + + internal static ValueTask AsValueTask(this ValueTask valueTask) => + valueTask.IsCompletedSuccessfully ? 
default : new ValueTask(valueTask.AsTask()); + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs b/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs new file mode 100644 index 0000000..b1d6757 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/BytesExtensions.cs @@ -0,0 +1,13 @@ +using System.Text; +using Kafka.Protocol; + +namespace Kafka.TestFramework.Tests +{ + internal static class BytesExtensions + { + internal static string EncodeToString(this byte[] bytes, Encoding encoding) + { + return encoding.GetString(bytes); + } + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj new file mode 100644 index 0000000..cb679ae --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/Confluent.Kafka.1.1.0.Tests.csproj @@ -0,0 +1,35 @@ + + + + netcoreapp3.0 + + false + + + + + + + + + + + + + + + + + + + + + + + + + Always + + + + diff --git a/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs new file mode 100644 index 0000000..51e873f --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -0,0 +1,145 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Confluent.Kafka; +using FluentAssertions; +using Kafka.Protocol; +using Log.It; +using Xunit; +using Xunit.Abstractions; +using Int32 = Kafka.Protocol.Int32; +using Int64 = Kafka.Protocol.Int64; +using Record = Kafka.Protocol.Records.Record; +using String = Kafka.Protocol.String; + +namespace Kafka.TestFramework.Tests +{ + public partial class Given_a_socket_based_test_server + { + public class When_connecting_to_the_server_and_producing_a_message : TestSpecificationAsync + { + private SocketBasedKafkaTestFramework _testServer; + private IEnumerable _records; + + public When_connecting_to_the_server_and_producing_a_message( + ITestOutputHelper testOutputHelper) + : base(testOutputHelper) + { + } + + protected override Task GivenAsync() + { + _testServer = KafkaTestFramework.WithSocket(); + + _testServer.On( + request => request.Respond() + .WithAllApiKeys()); + + _testServer.On( + request => request.Respond() + .WithTopicsCollection( + request.TopicsCollection?.Select(topic => + new Func( + responseTopic => + responseTopic + .WithName(topic.Name) + .WithPartitionsCollection(partition => + partition + .WithLeaderId(Int32.From(0)) + .WithPartitionIndex(Int32.From(0)) + .WithReplicaNodesCollection(new[] { Int32.From(0) })))) + .ToArray() ?? 
Array.Empty>()) + .WithControllerId(Int32.From(0)) + .WithClusterId(String.From("test")) + .WithBrokersCollection(broker => broker + .WithRack(String.From("testrack")) + .WithNodeId(Int32.From(0)) + .WithHost(String.From("localhost")) + .WithPort(Int32.From(_testServer.Port))) + ); + + _testServer.On(request => + { + _records = request.TopicDataCollection.SelectMany(pair => + pair.Value.PartitionDataCollection.Where(data => data.Records != null) + .SelectMany(data => data.Records!.Records)); + return request + .Respond() + .WithResponsesCollection(request.TopicDataCollection.Select(topicProduceData => + new Func( + topicProduceResponse => + topicProduceResponse + .WithName(topicProduceData.Value.Name) + .WithPartitionResponsesCollection(topicProduceData.Value + .PartitionDataCollection.Select(partitionProduceData => + new Func( + partitionProduceResponse => + partitionProduceResponse + .WithIndex(partitionProduceData.Index) + .WithLogAppendTimeMs(Int64.From(-1)))) + .ToArray()))) + .ToArray()); + }); + + return Task.CompletedTask; + } + + protected override async Task WhenAsync() + { + await using (_testServer.Start() + .ConfigureAwait(false)) + { + await ProduceMessageFromClientAsync("localhost", _testServer.Port, _testServer.Stopping) + .ConfigureAwait(false); + } + } + + [Fact] + public void It_should_have_read_one_record() + { + _records.Should().HaveCount(1); + } + + [Fact] + public void It_should_have_read_the_message_sent() + { + _records.First().Value.EncodeToString(Encoding.UTF8).Should().Be("test"); + } + + private static async Task ProduceMessageFromClientAsync(string host, + int port, CancellationToken testServerStopping) + { + var producerConfig = new ProducerConfig(new Dictionary + { + { "log_level", "7"} + }) + { + BootstrapServers = $"{host}:{port}", + MessageTimeoutMs = 5000, + SocketTimeoutMs = 30000, + Debug = "all" + }; + + using var producer = + new ProducerBuilder(producerConfig) + .SetLogHandler(LogExtensions.UseLogIt) + .Build(); + + var report = await producer + .ProduceAsync("my-topic", + new Message { Value = "test" }) + .ConfigureAwait(false); + LogFactory.Create("producer").Info("Produce report {@report}", report); + + producer.Flush(testServerStopping); + } + } + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs b/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs new file mode 100644 index 0000000..6f56fd7 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/LogExtensions.cs @@ -0,0 +1,50 @@ +using System; +using Confluent.Kafka; +using Log.It; + +namespace Kafka.TestFramework.Tests +{ + internal static class LogExtensions + { + internal static void UseLogIt( + IProducer producer, LogMessage logMessage) + { + var logger = LogFactory.Create(producer.GetType().GetPrettyName()); + + switch (logMessage.Level) + { + case SyslogLevel.Debug: + LogTo(logger.Debug); + break; + case SyslogLevel.Notice: + case SyslogLevel.Info: + LogTo(logger.Info); + break; + case SyslogLevel.Warning: + LogTo(logger.Warning); + break; + case SyslogLevel.Error: + LogTo(logger.Error); + break; + case SyslogLevel.Alert: + case SyslogLevel.Critical: + case SyslogLevel.Emergency: + LogTo(logger.Fatal); + break; + default: + throw new ArgumentOutOfRangeException($"{logMessage.Level} is not supported"); + } + + void LogTo(Action log) + { + log("{name} {facility}: {message}", + new object[] + { + logMessage.Name, + logMessage.Facility, + logMessage.Message + }); + } + } + } +} \ No newline at end of file diff --git 
a/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs b/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs new file mode 100644 index 0000000..22adb82 --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/TestSpecificationAsync.cs @@ -0,0 +1,41 @@ +using System.Threading.Tasks; +using Log.It; +using Log.It.With.NLog; +using Microsoft.Extensions.Configuration; +using NLog.Extensions.Logging; +using Test.It.With.XUnit; +using Xunit.Abstractions; + +namespace Kafka.TestFramework.Tests +{ + public class TestSpecificationAsync : XUnit2SpecificationAsync + { + static TestSpecificationAsync() + { + var config = new ConfigurationBuilder() + .SetBasePath(System.IO.Directory.GetCurrentDirectory()) + .AddJsonFile("appsettings.json", optional: true, reloadOnChange: true) + .Build(); + + NLog.LogManager.Configuration = new NLogLoggingConfiguration(config.GetSection("NLog")); + LogFactory.Initialize(new NLogFactory(new LogicalThreadContext())); + } + + public TestSpecificationAsync(ITestOutputHelper testOutputHelper) : base(testOutputHelper) + { + NLogCapturingTarget.Subscribe += TestOutputHelper.WriteLine; + } + + protected virtual Task TearDownAsync() + { + return Task.CompletedTask; + } + + protected sealed override async Task DisposeAsync(bool disposing) + { + NLogCapturingTarget.Subscribe -= TestOutputHelper.WriteLine; + await TearDownAsync(); + await base.DisposeAsync(disposing); + } + } +} \ No newline at end of file diff --git a/tests/Confluent.Kafka.1.1.0.Tests/appsettings.json b/tests/Confluent.Kafka.1.1.0.Tests/appsettings.json new file mode 100644 index 0000000..712e57e --- /dev/null +++ b/tests/Confluent.Kafka.1.1.0.Tests/appsettings.json @@ -0,0 +1,26 @@ +{ + "NLog": { + "throwConfigExceptions": true, + "internalLogLevel": "info", + "internalLogFile": "${basedir}/internal-nlog.txt", + "extensions": [ + { "assembly": "Log.It.With.NLog" } + ], + "variables": { + "simplePipeSeparatorLayout": "${date:format=yyyy-MM-dd HH\\:mm\\:ss.fff} | ${logger} | ${level} | ${message} ${onexception:| ${exception:format=type} | ${exception:format=method} | ${exception:format=message} | ${exception:format=stacktrace} | ${exception:method:maxInnerExceptionLevel=5:innerFormat=shortType,message,method}}" + }, + "targets": { + "NLogCapturing": { + "type": "NLogCapturing", + "layout": "${simplePipeSeparatorLayout}" + } + }, + "rules": [ + { + "logger": "*", + "minLevel": "Trace", + "writeTo": "NLogCapturing" + } + ] + } +} \ No newline at end of file diff --git a/tests/Kafka.TestFramework.Tests/BytesExtensions.cs b/tests/Kafka.TestFramework.Tests/BytesExtensions.cs index 64075b3..b1d6757 100644 --- a/tests/Kafka.TestFramework.Tests/BytesExtensions.cs +++ b/tests/Kafka.TestFramework.Tests/BytesExtensions.cs @@ -5,9 +5,9 @@ namespace Kafka.TestFramework.Tests { internal static class BytesExtensions { - internal static string EncodeToString(this Bytes bytes, Encoding encoding) + internal static string EncodeToString(this byte[] bytes, Encoding encoding) { - return encoding.GetString(bytes.Value); + return encoding.GetString(bytes); } } } \ No newline at end of file diff --git a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs index cc55336..b8c0a64 100644 --- a/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs +++ 
b/tests/Kafka.TestFramework.Tests/Given_a_socket_based_test_server.When_connecting_to_the_server_and_producing_a_message.cs @@ -52,7 +52,7 @@ protected override Task GivenAsync() .WithLeaderId(Int32.From(0)) .WithPartitionIndex(Int32.From(0)) .WithReplicaNodesCollection(new[] { Int32.From(0) })))) - .ToArray() ?? new Func[0]) + .ToArray() ?? Array.Empty>()) .WithControllerId(Int32.From(0)) .WithClusterId(String.From("test")) .WithBrokersCollection(broker => broker @@ -61,32 +61,32 @@ protected override Task GivenAsync() .WithHost(String.From("localhost")) .WithPort(Int32.From(_testServer.Port))) ); - - _testServer.On(async request => (await request - .WithActionAsync(produceRequest => Task.Run(async () => - { - _records = await produceRequest - .ExtractRecordsAsync(CancellationToken.None) - .ToListAsync() - .ConfigureAwait(false); - })) - .ConfigureAwait(false)) - .Respond() - .WithResponsesCollection(request.TopicsCollection.Select(topicProduceData => - new Func( - topicProduceResponse => - topicProduceResponse - .WithName(topicProduceData.Name) - .WithPartitionsCollection(topicProduceData.PartitionsCollection.Select(partitionProduceData => - new Func( - partitionProduceResponse => - partitionProduceResponse - .WithPartitionIndex(partitionProduceData.PartitionIndex) - .WithLogAppendTimeMs(Int64.From(-1)))) - .ToArray()))) - .ToArray())); + + _testServer.On(request => + { + _records = request.TopicDataCollection.SelectMany(pair => + pair.Value.PartitionDataCollection.Where(data => data.Records != null) + .SelectMany(data => data.Records!.Records)); + return request + .Respond() + .WithResponsesCollection(request.TopicDataCollection.Select(topicProduceData => + new Func( + topicProduceResponse => + topicProduceResponse + .WithName(topicProduceData.Value.Name) + .WithPartitionResponsesCollection(topicProduceData.Value + .PartitionDataCollection.Select(partitionProduceData => + new Func( + partitionProduceResponse => + partitionProduceResponse + .WithIndex(partitionProduceData.Index) + .WithLogAppendTimeMs(Int64.From(-1)))) + .ToArray()))) + .ToArray()); + }); return Task.CompletedTask; } @@ -96,7 +96,7 @@ protected override async Task WhenAsync() await using (_testServer.Start() .ConfigureAwait(false)) { - await ProduceMessageFromClientAsync("localhost", _testServer.Port) + await ProduceMessageFromClientAsync("localhost", _testServer.Port, _testServer.Stopping) .ConfigureAwait(false); } } @@ -114,7 +114,7 @@ public void It_should_have_read_the_message_sent() } private static async Task ProduceMessageFromClientAsync(string host, - int port) + int port, CancellationToken testServerStopping) { var producerConfig = new ProducerConfig(new Dictionary { @@ -122,9 +122,7 @@ private static async Task ProduceMessageFromClientAsync(string host, }) { BootstrapServers = $"{host}:{port}", - MessageTimeoutMs = 5000, - MetadataRequestTimeoutMs = 5000, - SocketTimeoutMs = 30000, + ApiVersionRequestTimeoutMs = 30000, Debug = "all" }; @@ -135,11 +133,11 @@ private static async Task ProduceMessageFromClientAsync(string host, var report = await producer .ProduceAsync("my-topic", - new Message { Value = "test" }) + new Message { Value = "test" }, testServerStopping) .ConfigureAwait(false); LogFactory.Create("producer").Info("Produce report {@report}", report); - producer.Flush(); + producer.Flush(testServerStopping); } } } diff --git a/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs 
b/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs index 8082b93..07822f9 100644 --- a/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs +++ b/tests/Kafka.TestFramework.Tests/Given_an_inmemory_kafka_test_framework_and_a_message_subscription.When_the_client_sends_the_message_subscribed.cs @@ -30,7 +30,7 @@ protected override Task GivenAsync() .WithThrottleTimeMs(Int32.From(100)) .WithApiKeysCollection( key => key - .WithIndex(FetchRequest.ApiKey) + .WithApiKey(FetchRequest.ApiKey) .WithMinVersion(FetchRequest.MinVersion) .WithMaxVersion(FetchRequest.MaxVersion))); @@ -46,13 +46,15 @@ protected override async Task WhenAsync() .CreateRequestClientAsync() .ConfigureAwait(false); + var message = new ApiVersionsRequest(ApiVersionsRequest.MaxVersion); var requestPayload = new RequestPayload( - new RequestHeader(RequestHeader.MaxVersion) + new RequestHeader(message.HeaderVersion) .WithRequestApiKey(ApiVersionsRequest.ApiKey) .WithRequestApiVersion( - ApiVersionsRequest.MaxVersion) + message.Version) .WithCorrelationId(Int32.From(12)), - new ApiVersionsRequest(ApiVersionsRequest.MaxVersion)); + message + ); await client .SendAsync(requestPayload) @@ -99,7 +101,7 @@ public void The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_key() { _response.Message.As() - .ApiKeysCollection.Should().ContainKey(FetchRequest.ApiKey); + .ApiKeysCollection.Value.Should().ContainKey(FetchRequest.ApiKey); } [Fact] @@ -107,7 +109,7 @@ public void The_subscription_should_receive_a_api_versions_response_with_fetch_request_api_index() { _response.Message.As() - .ApiKeysCollection[FetchRequest.ApiKey].Index.Should() + .ApiKeysCollection[FetchRequest.ApiKey].ApiKey.Should() .Be(FetchRequest.ApiKey); } @@ -128,13 +130,6 @@ public void .ApiKeysCollection[FetchRequest.ApiKey].MaxVersion.Should() .Be(FetchRequest.MaxVersion); } - - protected override async Task TearDownAsync() - { - await _testServer - .DisposeAsync() - .ConfigureAwait(false); - } } } } \ No newline at end of file diff --git a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj index 73a44e8..bf80eb0 100644 --- a/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj +++ b/tests/Kafka.TestFramework.Tests/Kafka.TestFramework.Tests.csproj @@ -7,9 +7,9 @@ - + - + diff --git a/tests/Kafka.TestFramework.Tests/ProduceRequestExtensions.cs b/tests/Kafka.TestFramework.Tests/ProduceRequestExtensions.cs deleted file mode 100644 index 2be7fec..0000000 --- a/tests/Kafka.TestFramework.Tests/ProduceRequestExtensions.cs +++ /dev/null @@ -1,57 +0,0 @@ -using System; -using System.Collections.Generic; -using System.IO.Pipelines; -using System.Linq; -using System.Runtime.CompilerServices; -using System.Threading; -using System.Threading.Tasks; -using Kafka.Protocol; -using Kafka.Protocol.Records; -using Int16 = Kafka.Protocol.Int16; - -namespace Kafka.TestFramework.Tests -{ - internal static class ProduceRequestExtensions - { - internal static async IAsyncEnumerable ExtractRecordBatchesAsync( - this ProduceRequest produceRequest, - [EnumeratorCancellation] CancellationToken cancellationToken = default) - { - var records = produceRequest.TopicsCollection.SelectMany(data => - data.PartitionsCollection.Select(produceData => - produceData.Records)) - .Where(record => 
record.HasValue); - - var pipe = new Pipe(); - var reader = new KafkaReader(pipe.Reader); - foreach (var record in records) - { - await pipe.Writer.WriteAsync( - record.Value.Value.AsMemory(), - cancellationToken); - - yield return await RecordBatch.ReadFromAsync(Int16.Default, reader, - cancellationToken); - } - } - - internal static async IAsyncEnumerable ExtractRecordsAsync( - this ProduceRequest request, - [EnumeratorCancellation] CancellationToken cancellationToken) - { - await foreach (var batch in request - .ExtractRecordBatchesAsync(cancellationToken) - .ConfigureAwait(false)) - { - if (batch.Records == null) - continue; - - foreach (var record in batch.Records) - { - yield return record; - } - } - } - - } -} \ No newline at end of file diff --git a/version b/version deleted file mode 100644 index 9f8e9b6..0000000 --- a/version +++ /dev/null @@ -1 +0,0 @@ -1.0 \ No newline at end of file
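
The updated tests in this change reduce to a short end-to-end flow against the in-memory server. The sketch below condenses that flow for reference; the generic arguments to `On<...>` and the exact payload types are inferred from the test code and the Kafka.Protocol builders shown above, so treat the signatures as indicative rather than authoritative.

```csharp
using System.Threading.Tasks;
using Kafka.Protocol;
using Kafka.TestFramework;
using Int32 = Kafka.Protocol.Int32;

public static class InMemoryUsageSketch
{
    public static async Task RunAsync()
    {
        var testServer = KafkaTestFramework.InMemory();

        // Subscribe to ApiVersions requests and answer with all supported api keys,
        // mirroring the subscription set up in the tests above.
        testServer.On<ApiVersionsRequest, ApiVersionsResponse>(request =>
            request.Respond().WithAllApiKeys());

        // Start() returns an IAsyncDisposable that stops the framework on dispose.
        await using (testServer.Start().ConfigureAwait(false))
        {
            var client = await testServer
                .CreateRequestClientAsync()
                .ConfigureAwait(false);

            var message = new ApiVersionsRequest(ApiVersionsRequest.MaxVersion);
            var requestPayload = new RequestPayload(
                new RequestHeader(message.HeaderVersion)
                    .WithRequestApiKey(ApiVersionsRequest.ApiKey)
                    .WithRequestApiVersion(message.Version)
                    .WithCorrelationId(Int32.From(12)),
                message);

            await client.SendAsync(requestPayload).ConfigureAwait(false);

            // ReadAsync correlates the response against the request payload that was sent.
            var response = await client
                .ReadAsync(requestPayload)
                .ConfigureAwait(false);
            // response.Message holds the ApiVersionsResponse produced by the subscription.
        }
    }
}
```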