diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index bb368d6..3fed914 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -46,10 +46,26 @@ jobs: - name: Determine Release Info id: release run: | - from_tag=$(git tag --points-at ${{ steps.gitversion.outputs.versionSourceSha }} | grep -m 1 ^v[0-9]*\.[0-9]*\.[0-9]* | head -1) - [[ -z "$from_tag" ]] && \ - from_ref_exclusive=${{ steps.gitversion.outputs.versionSourceSha }} || \ - from_ref_exclusive=$from_tag + default_branch=$(git remote show origin | awk '/HEAD branch/ {print $NF}') + + if [ "${{ github.ref_name }}" == "$default_branch" ]; then + # Extract the branch name of the branch that merged into the current commit + commit_subject=$(git log -1 --pretty=format:%s) + regex='Merge pull request #[0-9]+ from .+/(.+)$' + [[ $commit_subject =~ $regex ]] + merged_branch=${BASH_REMATCH[1]} + + [[ -z "$merged_branch" ]] && \ + # Committed directly on default branch, use the previous commit + from_ref_exclusive=$(git log -2 --pretty=format:"%H" | tail -1) + # Find what commit the merged branch branched from originally + from_ref_exclusive=$(diff -u <(git rev-list --first-parent $merged_branch) \ + <(git rev-list --first-parent $default_branch) | \ + sed -ne 's/^ //p' | head -1) + else + # Get the commit this branch branched from + from_ref_exclusive=$(git rev-list $(git rev-list $default_branch.. | tail -1)^ -n 1) + fi [[ -z "${{ steps.gitversion.outputs.preReleaseTag }}" ]] && \ is_prerelease=false || \ diff --git a/src/Kafka.TestFramework/Client.cs b/src/Kafka.TestFramework/Client.cs index 64284cf..6ad2a1e 100644 --- a/src/Kafka.TestFramework/Client.cs +++ b/src/Kafka.TestFramework/Client.cs @@ -42,52 +42,49 @@ protected void StartReceiving() { var cancellationToken = _signalNoMoreDataToWrite.Token; var writer = _pipe.Writer; - await using (_networkClient.ConfigureAwait(false)) + try { - try + FlushResult result; + do { - FlushResult result; - do - { - var memory = writer.GetMemory(MinimumBufferSize); - var bytesRead = await _networkClient.ReceiveAsync( - memory, - cancellationToken) - .ConfigureAwait(false); + var memory = writer.GetMemory(MinimumBufferSize); + var bytesRead = await _networkClient.ReceiveAsync( + memory, + cancellationToken) + .ConfigureAwait(false); - if (bytesRead == 0) - { - break; - } + if (bytesRead == 0) + { + break; + } - Logger.Debug("Received {bytesRead} bytes", bytesRead); - writer.Advance(bytesRead); + Logger.Debug("Received {bytesRead} bytes", bytesRead); + writer.Advance(bytesRead); - result = await writer - .FlushAsync(cancellationToken) - .ConfigureAwait(false); - } while (result.IsCanceled == false && - result.IsCompleted == false); - } - catch when (_signalNoMoreDataToWrite.IsCancellationRequested) - { - // Shutdown in progress - } - catch (OperationCanceledException) - { - } - catch (Exception ex) - { - writer.Complete(ex); - throw; - } - finally - { - _signalNoMoreDataToWrite.Cancel(); - } - - writer.Complete(); + result = await writer + .FlushAsync(cancellationToken) + .ConfigureAwait(false); + } while (result.IsCanceled == false && + result.IsCompleted == false); + } + catch when (_signalNoMoreDataToWrite.IsCancellationRequested) + { + // Shutdown in progress } + catch (OperationCanceledException) + { + } + catch (Exception ex) + { + writer.Complete(ex); + throw; + } + finally + { + _signalNoMoreDataToWrite.Cancel(); + } + + writer.Complete(); }); } @@ -98,6 +95,8 @@ public async ValueTask DisposeAsync() await _sendAndReceiveBackgroundTask .ConfigureAwait(false); + await _networkClient.DisposeAsync() + .ConfigureAwait(false); } } } \ No newline at end of file diff --git a/src/Kafka.TestFramework/KafkaTestFramework.cs b/src/Kafka.TestFramework/KafkaTestFramework.cs index 5d81f57..da554e0 100644 --- a/src/Kafka.TestFramework/KafkaTestFramework.cs +++ b/src/Kafka.TestFramework/KafkaTestFramework.cs @@ -192,11 +192,13 @@ public StopOnDispose(KafkaTestFramework testFramework) { _testFramework = testFramework; } - public async ValueTask DisposeAsync() + + public ValueTask DisposeAsync() { _testFramework._cancellationTokenSource.Cancel(); - await Task.WhenAll(_testFramework._backgroundTasks) - .ConfigureAwait(false); + return Task.WhenAll(_testFramework._backgroundTasks) + .ThrowAllExceptions() + .AsValueTask(); } } } diff --git a/src/Kafka.TestFramework/TaskExtensions.cs b/src/Kafka.TestFramework/TaskExtensions.cs new file mode 100644 index 0000000..142c165 --- /dev/null +++ b/src/Kafka.TestFramework/TaskExtensions.cs @@ -0,0 +1,28 @@ +using System.Runtime.ExceptionServices; +using System.Threading.Tasks; + +namespace Kafka.TestFramework +{ + internal static class TaskExtensions + { + internal static async Task ThrowAllExceptions(this Task task) + { + try + { + await task.ConfigureAwait(false); + } + catch + { + if (task.Exception?.InnerExceptions.Count > 1) + { + ExceptionDispatchInfo.Capture(task.Exception).Throw(); + } + + throw; + } + } + + internal static ValueTask AsValueTask(this Task task) => + task.IsCompletedSuccessfully ? default : new ValueTask(task); + } +} \ No newline at end of file