From c10386010e5697348b3aa26b7464ab96927b5560 Mon Sep 17 00:00:00 2001 From: Rui Quintas Barbosa Date: Sun, 15 Oct 2023 19:07:58 +0100 Subject: [PATCH] tests: Add WaitForSpansAsync mechanism --- .../GlobalEventsTest.cs | 18 ++------ .../OpenTelemetryTests.cs | 44 +++++++++---------- 2 files changed, 26 insertions(+), 36 deletions(-) diff --git a/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs b/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs index 01c0c5e43..830412ccc 100644 --- a/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs +++ b/src/KafkaFlow.IntegrationTests/GlobalEventsTest.cs @@ -7,7 +7,6 @@ using AutoFixture; using KafkaFlow.Configuration; using KafkaFlow.IntegrationTests.Core; - using KafkaFlow.IntegrationTests.Core.Exceptions; using KafkaFlow.IntegrationTests.Core.Handlers; using KafkaFlow.IntegrationTests.Core.Messages; using KafkaFlow.IntegrationTests.Core.Middlewares; @@ -240,19 +239,10 @@ private async Task GetServiceProviderAsync( private async Task WaitForPartitionAssignmentAsync() { - var retryPolicy = Policy - .Handle() - .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))); - - await retryPolicy.ExecuteAsync(() => - { - if (!this.isPartitionAssigned) - { - throw new PartitionAssignmentException(); - } - - return Task.CompletedTask; - }); + await Policy + .HandleResult(isAvailable => !isAvailable) + .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))) + .ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned)); } } } diff --git a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs index 89d60fa3e..855083632 100644 --- a/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs +++ b/src/KafkaFlow.IntegrationTests/OpenTelemetryTests.cs @@ -13,7 +13,6 @@ using KafkaFlow.Compressor.Gzip; using KafkaFlow.Configuration; using KafkaFlow.IntegrationTests.Core; - using KafkaFlow.IntegrationTests.Core.Exceptions; using KafkaFlow.IntegrationTests.Core.Handlers; using KafkaFlow.IntegrationTests.Core.Middlewares; using KafkaFlow.IntegrationTests.Core.Producers; @@ -57,13 +56,9 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_TraceAndSpans await producer.ProduceAsync(null, message); // Assert - await Task.Delay(8000).ConfigureAwait(false); + var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); - - var producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer); - var consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer); - Assert.IsNull(producerSpan.ParentId); Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); @@ -102,13 +97,9 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp await producer.ProduceAsync(null, message); // Assert - await Task.Delay(10000).ConfigureAwait(false); + var (producerSpan, consumerSpan) = await this.WaitForSpansAsync(); Assert.IsNotNull(this.exportedItems); - - var producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer); - var consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer); - Assert.AreEqual(producerSpan.TraceId, consumerSpan.TraceId); Assert.AreEqual(consumerSpan.ParentSpanId, producerSpan.SpanId); Assert.AreEqual(producerSpan.GetBaggageItem(baggageName1), baggageValue1); @@ -119,7 +110,7 @@ public async Task AddOpenTelemetry_ProducingAndConsumingOneMessage_BaggageIsProp private async Task GetServiceProvider() { - string topicName = "Otel"; + var topicName = $"OpenTelemetryTestTopic_{Guid.NewGuid()}"; this.isPartitionAssigned = false; @@ -184,19 +175,28 @@ private async Task GetServiceProvider() private async Task WaitForPartitionAssignmentAsync() { - var retryPolicy = Policy - .Handle() - .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))); + await Policy + .HandleResult(isAvailable => !isAvailable) + .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))) + .ExecuteAsync(() => Task.FromResult(this.isPartitionAssigned)); + } + + private async Task<(Activity producerSpan, Activity consumerSpan)> WaitForSpansAsync() + { + Activity producerSpan = null, consumerSpan = null; - await retryPolicy.ExecuteAsync(() => - { - if (!this.isPartitionAssigned) + await Policy + .HandleResult(isAvailable => !isAvailable) + .WaitAndRetryAsync(Enumerable.Range(0, 6).Select(i => TimeSpan.FromSeconds(Math.Pow(i, 2)))) + .ExecuteAsync(() => { - throw new PartitionAssignmentException(); - } + producerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Producer); + consumerSpan = this.exportedItems.Find(x => x.Kind == ActivityKind.Consumer); + + return Task.FromResult(producerSpan != null && consumerSpan != null); + }); - return Task.CompletedTask; - }); + return (producerSpan, consumerSpan); } } }