Skip to content

Commit

Permalink
Merge pull request #5 from Fresa/continous-consumer-test
Browse files Browse the repository at this point in the history
test: extend the consumer test to demonstrate a client consuming mult…
  • Loading branch information
Fresa authored Feb 1, 2023
2 parents 4cad71b + 8f574a5 commit 15acdc3
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 165 deletions.
38 changes: 0 additions & 38 deletions .github/version_config.yml

This file was deleted.

163 changes: 69 additions & 94 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ on:
push:
branches:
- '**'
env:
project_path: src/Kafka.TestFramework
project_name: Kafka.TestFramework

jobs:
test:
Expand All @@ -14,112 +17,84 @@ jobs:
matrix:
os: [ubuntu-latest]
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3
- name: Setup .NET
uses: actions/setup-dotnet@v1
with:
dotnet-version: 3.1.x
dotnet-version: 7.0.x
- name: Build
run: dotnet build -c Release
- name: Test
run: dotnet test -c Release --no-build --verbosity normal

release:
release:
name: Create Release
needs: [test]
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
# Fetches entire history, so we can analyze commits since last tag
fetch-depth: 0
- name: Install GitVersion
uses: gittools/actions/gitversion/setup@v0.9.7
with:
versionSpec: '5.x'
- name: Determine Version
id: gitversion
uses: gittools/actions/gitversion/execute@v0
with:
useConfigFile: true
configFilePath: .github/version_config.yml
- name: Determine Release Info
id: release
run: |
default_branch=$(git remote show origin | awk '/HEAD branch/ {print $NF}')
if [ "${{ github.ref_name }}" == "$default_branch" ]; then
# Extract the branch name of the branch that merged into the current commit
commit_subject=$(git log -1 --pretty=format:%s)
regex='Merge pull request #[0-9]+ from .+/(.+)$'
[[ $commit_subject =~ $regex ]]
merged_branch=${BASH_REMATCH[1]}
[[ -z "$merged_branch" ]] && \
# Committed directly on default branch, use the previous commit
from_ref_exclusive=$(git log -2 --pretty=format:"%H" | tail -1)
# Find what commit the merged branch branched from originally
from_ref_exclusive=$(diff -u <(git rev-list --first-parent $merged_branch) \
<(git rev-list --first-parent $default_branch) | \
sed -ne 's/^ //p' | head -1)
else
# Get the commit this branch branched from
from_ref_exclusive=$(git rev-list $(git rev-list $default_branch.. | tail -1)^ -n 1)
fi
- uses: actions/checkout@v3
with:
# Fetches entire history, so we can analyze commits since last tag
fetch-depth: 0
- name: Determine Release Versioning
id: versioning
uses: Fresa/trunk-based-release-versioning@v0
- name: Determine Release Version
id: release-tag
run: |
version=${{ steps.versioning.outputs.version }}
if ${{ steps.versioning.outputs.is-prerelease }}; then
version="$version-pre-$(echo ${{ steps.versioning.outputs.release-ref }} | cut -c1-8)"
fi
[[ -z "${{ steps.gitversion.outputs.preReleaseTag }}" ]] && \
is_prerelease=false || \
is_prerelease=true
[[ $is_prerelease == true ]] && \
version=${{ steps.gitversion.outputs.majorMinorPatch }}-pre-${{ steps.gitversion.outputs.commitsSinceVersionSource }} || \
version=${{ steps.gitversion.outputs.majorMinorPatch }}
echo "::set-output name=is_prerelease::$is_prerelease"
echo "::set-output name=tag::v$version"
echo "::set-output name=version::$version"
echo "::set-output name=from_ref_exclusive::$from_ref_exclusive"
- name: Create Tag
uses: actions/github-script@v3
with:
script: |
github.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: "refs/tags/${{ steps.release.outputs.tag }}",
sha: "${{ steps.gitversion.outputs.sha }}"
});
- name: Generate Release Notes
id: release_notes
uses: Fresa/release-notes-generator@v0
with:
version: ${{ steps.release.outputs.tag }}
from_ref_exclusive: ${{ steps.release.outputs.from_ref_exclusive }}
to_ref_inclusive: ${{ steps.release.outputs.tag }}
- name: Create Release
id: create_release
uses: softprops/action-gh-release@v1
with:
echo "tag=v$version" >> $GITHUB_OUTPUT
echo "version=$version" >> $GITHUB_OUTPUT
- name: Convert Commit Logs to JSON
id: convert-commit-logs
uses: mikefarah/yq@v4.30.6
with:
cmd: yq ea -o=json 'select(. != null) | [.]' "$(echo "${{ steps.versioning.outputs.commit-logs-path }}" | sed "s|^${{ github.workspace }}/||")" | tee commit_logs.json
- name: Generate Release Notes
id: release_notes
uses: Fresa/release-notes-generator@v2
with:
version: ${{ steps.release-tag.outputs.tag }}
last_release_ref: ${{ steps.versioning.outputs.last-release-ref }}
release_ref: ${{ steps.versioning.outputs.release-ref }}
path_to_commits: ./commit_logs.json
- name: Pack
env:
release_notes: ${{ steps.release_notes.outputs.release_notes }}
run: dotnet pack ${{ env.project_path }}/${{ env.project_name }}.csproj -c Release -o nuget-packages -p:PackageVersion=${{ steps.release-tag.outputs.version }} -p:PackageReleaseNotes="$release_notes"
- name: Create Tag
uses: actions/github-script@v6
with:
script: |
github.rest.git.createRef({
owner: context.repo.owner,
repo: context.repo.repo,
ref: "refs/tags/${{ steps.release-tag.outputs.tag }}",
sha: "${{ steps.versioning.outputs.release-ref }}"
});
- name: Create Release
uses: softprops/action-gh-release@v1
with:
body: ${{ steps.release_notes.outputs.release_notes }}
tag_name: ${{ steps.release.outputs.tag }}
prerelease: ${{ steps.release.outputs.is_prerelease }}
env:
tag_name: ${{ steps.release-tag.outputs.tag }}
prerelease: ${{ steps.versioning.outputs.is-prerelease }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
- name: Update Latest Minor Tag
uses: EndBug/latest-tag@v1
if: steps.release.outputs.is_prerelease == 'false'
with:
tag-name: v${{ steps.gitversion.outputs.major }}
description: ${{ steps.release.outputs.tag }}
- name: Update Latest Patch Tag
uses: EndBug/latest-tag@v1
if: steps.release.outputs.is_prerelease == 'false'
with:
tag-name: v${{ steps.gitversion.outputs.major }}.${{ steps.gitversion.outputs.minor }}
description: ${{ steps.release.outputs.tag }}
- name: Pack
env:
release_notes: ${{ steps.release_notes.outputs.release_notes }}
run: dotnet pack src/Kafka.TestFramework/Kafka.TestFramework.csproj -c Release -o nuget-packages -p:PackageVersion=${{ steps.release.outputs.version }} -p:PackageReleaseNotes="$release_notes"
- name: Publish to nuget.org
run: dotnet nuget push nuget-packages/Kafka.TestFramework.${{ steps.release.outputs.version }}.nupkg --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json
- name: Update Latest Minor Tag
uses: EndBug/latest-tag@v1
if: steps.versioning.outputs.is-prerelease == 'false'
with:
ref: v${{ steps.versioning.outputs.major-version }}
description: ${{ steps.release-tag.outputs.tag }}
- name: Update Latest Patch Tag
uses: EndBug/latest-tag@v1
if: steps.versioning.outputs.is-prerelease == 'false'
with:
ref: v${{ steps.versioning.outputs.major-version }}.${{ steps.versioning.outputs.minor-version }}
description: ${{ steps.release-tag.outputs.tag }}
- name: Publish to nuget.org
run: dotnet nuget push nuget-packages/${{ env.project_name }}.${{ steps.release-tag.outputs.version }}.nupkg --api-key ${{secrets.NUGET_API_KEY}} --source https://api.nuget.org/v3/index.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.0</TargetFramework>
<TargetFramework>net7.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
using FluentAssertions;
using Kafka.Protocol;
using Kafka.Protocol.Records;
using Log.It;
using Xunit;
using Xunit.Abstractions;
using Int32 = Kafka.Protocol.Int32;
Expand All @@ -21,7 +20,8 @@ public partial class Given_a_socket_based_test_server
public class When_connecting_to_the_server_and_consuming_a_message : TestSpecificationAsync
{
private SocketBasedKafkaTestFramework _testServer;
private ConsumeResult<Ignore, string> _result;
private readonly List<string> _result = new List<string>();
private const int NumberOfMessage = 5;

public When_connecting_to_the_server_and_consuming_a_message(
ITestOutputHelper testOutputHelper)
Expand Down Expand Up @@ -99,26 +99,51 @@ protected override Task GivenAsync()
Array.Empty<Func<OffsetFetchResponse.OffsetFetchResponseTopic,
OffsetFetchResponse.OffsetFetchResponseTopic>>()));

_testServer.On<FetchRequest, FetchResponse>(request => request.Respond()
.WithResponsesCollection(
request.TopicsCollection.Select(topic =>
new Func<FetchResponse.FetchableTopicResponse, FetchResponse.FetchableTopicResponse>(
response => response
.WithTopic(topic.Topic)
.WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
new Func<FetchResponse.FetchableTopicResponse.PartitionData,
FetchResponse.FetchableTopicResponse.PartitionData>(data => data
.WithRecords(new NullableRecordBatch
var records = new Dictionary<long, Record>();
for (var i = 0; i < NumberOfMessage; i++)
{
records.Add(i, new Record
{
OffsetDelta = i,
Value = Encoding.UTF8.GetBytes(
$"data{i} fetched from broker")
});
}

_testServer.On<FetchRequest, FetchResponse>(async (request, cancellationToken) =>
{
var returnsData = false;
var response = request.Respond()
.WithResponsesCollection(
request.TopicsCollection.Select(topic =>
new Func<FetchResponse.FetchableTopicResponse, FetchResponse.FetchableTopicResponse>(
response => response
.WithTopic(topic.Topic)
.WithPartitionsCollection(topic.PartitionsCollection.Select(partition =>
new Func<FetchResponse.FetchableTopicResponse.PartitionData,
FetchResponse.FetchableTopicResponse.PartitionData>(data =>
{
Magic = 2,
Records = new NullableArray<Record>(new Record
var recordBatch = new NullableRecordBatch
{
Value = Encoding.UTF8.GetBytes(
"data fetched from broker"),
})
}))).ToArray()
))).ToArray())
);
LastOffsetDelta = (int)partition.FetchOffset,
Magic = 2,
Records = records.TryGetValue(partition.FetchOffset.Value,
out var record)
? new NullableArray<Record>(record)
: NullableArray<Record>.Default
};
returnsData = recordBatch.Records != NullableArray<Record>.Default;
return returnsData ? data.WithRecords(recordBatch) : data;
})).ToArray()
))).ToArray());
if (!returnsData)
{
await Task.Delay(request.MaxWaitMs, cancellationToken)
.ConfigureAwait(false);
}
return response;
});

_testServer.On<OffsetCommitRequest, OffsetCommitResponse>(request => request.Respond()
.WithTopicsCollection(request.TopicsCollection.Select(topic =>
Expand All @@ -145,17 +170,21 @@ protected override async Task WhenAsync()
await using (_testServer.Start()
.ConfigureAwait(false))
{
_result = ConsumeMessage("localhost", _testServer.Port, _testServer.Stopping);
ConsumeMessages("localhost", _testServer.Port, _testServer.Stopping);
}
}

[Fact]
public void It_should_have_read_the_message_sent()
public void It_should_have_read_the_messages_sent()
{
_result.Message.Value.Should().Be("data fetched from broker");
_result.Should().HaveCount(NumberOfMessage);
for (var i = 0; i < NumberOfMessage; i++)
{
_result.Should().Contain($"data{i} fetched from broker");
}
}

private ConsumeResult<Ignore, string> ConsumeMessage(string host,
private void ConsumeMessages(string host,
int port, CancellationToken testServerStopping)
{
var consumerConfig = new ConsumerConfig(new Dictionary<string, string>
Expand All @@ -166,7 +195,7 @@ private ConsumeResult<Ignore, string> ConsumeMessage(string host,
BootstrapServers = $"{host}:{port}",
ApiVersionRequestTimeoutMs = 30000,
Debug = "all",
GroupId = "group1"
GroupId = "group1",
};

using var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
Expand All @@ -176,12 +205,17 @@ private ConsumeResult<Ignore, string> ConsumeMessage(string host,
consumer.Subscribe("topic1");
var cancellationToken = CancellationTokenSource
.CreateLinkedTokenSource(testServerStopping, TimeoutCancellationToken).Token;
var result = consumer.Consume(cancellationToken);
consumer.Close();

LogFactory.Create("consumer").Info("Consumer consumed {@result}", result);

return result;
try
{
for (var i = 0; i < NumberOfMessage; i++)
{
_result.Add(consumer.Consume(cancellationToken).Message.Value);
}
}
finally
{
consumer.Close();
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.0</TargetFramework>
<TargetFramework>net7.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>
Expand Down

0 comments on commit 15acdc3

Please sign in to comment.