From 4b6fa9335a83ff91533c8ab6c8b63f8dee406f99 Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Tue, 20 Feb 2024 23:07:03 -0500 Subject: [PATCH 1/2] Fix some flaky tests that were checking if metrics were being emitted. They had a resource lock around them because they were using some static constructs provided by the Otel SDK. Now the resource locks on tests are gone for the ConditionallyReliableLoggingHttpHandlerTest and each test constructs its own non-static otel sdk. I've also tried to make tests that confirm otel instrumentation a bit safer. They properly shut down metric exporter threads now. They also wait a couple cycles before returning the in-memory exported metrics. I really wish that there were an exporter that didn't have to wait, but at least for now, this seems like the only option. At some point, it might be worth implementing a different MeterProvider which would in turn build out a different implementation hierarchy to return instruments that have getters. Using mock classes to spy on calls could work too. Signed-off-by: Greg Schohn --- .../InMemoryInstrumentationBundle.java | 65 ++++- TrafficCapture/nettyWireLogging/build.gradle | 1 + ...ionallyReliableLoggingHttpHandlerTest.java | 272 ++++++++++-------- .../FullReplayerWithTracingChecksTest.java | 16 +- .../replay/ParsedHttpMessagesAsDictsTest.java | 4 - .../replay/ResultsToLogsConsumerTest.java | 2 +- .../NettyPacketToHttpConsumerTest.java | 3 +- .../replay/tracing/TracingTest.java | 7 +- 8 files changed, 221 insertions(+), 149 deletions(-) diff --git a/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java b/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java index 3d1ea5930..2ed8900c5 100644 --- a/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java +++ b/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java @@ -8,13 +8,16 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.export.MetricExporter; import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; -import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter; import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; import io.opentelemetry.sdk.trace.SdkTracerProvider; +import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import lombok.Getter; +import lombok.Lombok; import lombok.NonNull; +import lombok.SneakyThrows; +import java.io.IOException; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -24,9 +27,11 @@ import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; -@Getter public class InMemoryInstrumentationBundle implements AutoCloseable { + public static final Duration DEFAULT_COLLECTION_PERIOD = Duration.ofMillis(1); + private static final int MIN_MILLIS_TO_WAIT_FOR_FINISH = 10; + public static class LastMetricsExporter implements MetricExporter { private final Queue finishedMetricItems = new ConcurrentLinkedQueue<>(); boolean isStopped; @@ -62,9 +67,13 @@ public AggregationTemporality getAggregationTemporality(@NonNull InstrumentType } } + @Getter public final OpenTelemetrySdk openTelemetrySdk; - public final InMemorySpanExporter testSpanExporter; - public final LastMetricsExporter testMetricExporter; + private final InMemorySpanExporter testSpanExporter; + private final LastMetricsExporter testMetricExporter; + private final PeriodicMetricReader periodicMetricReader; + private final Duration collectionPeriod; + private boolean alreadyWaitedForMetrics; public InMemoryInstrumentationBundle(boolean collectTraces, boolean collectMetrics) { @@ -74,8 +83,15 @@ public InMemoryInstrumentationBundle(boolean collectTraces, public InMemoryInstrumentationBundle(InMemorySpanExporter testSpanExporter, LastMetricsExporter testMetricExporter) { + this(testSpanExporter, testMetricExporter, DEFAULT_COLLECTION_PERIOD); + } + + public InMemoryInstrumentationBundle(InMemorySpanExporter testSpanExporter, + LastMetricsExporter testMetricExporter, + Duration collectionPeriod) { this.testSpanExporter = testSpanExporter; this.testMetricExporter = testMetricExporter; + this.collectionPeriod = collectionPeriod; var otelBuilder = OpenTelemetrySdk.builder(); if (testSpanExporter != null) { @@ -83,18 +99,51 @@ public InMemoryInstrumentationBundle(InMemorySpanExporter testSpanExporter, .addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)).build()); } if (testMetricExporter != null) { + this.periodicMetricReader = PeriodicMetricReader.builder(testMetricExporter) + .setInterval(Duration.ofMillis(collectionPeriod.toMillis())) + .build(); otelBuilder = otelBuilder.setMeterProvider(SdkMeterProvider.builder() - .registerMetricReader(PeriodicMetricReader.builder(testMetricExporter) - .setInterval(Duration.ofMillis(100)) - .build()) + .registerMetricReader(periodicMetricReader) .build()); + } else { + this.periodicMetricReader = null; } openTelemetrySdk = otelBuilder.build(); } + public List getFinishedSpans() { + if (testSpanExporter == null) { + throw new IllegalStateException("Metrics collector was not configured"); + } + return testSpanExporter.getFinishedSpanItems(); + } + + /** + * Waits double the collectionPeriod time (once) before returning the collected metrics + * @return + */ + @SneakyThrows + public Collection getFinishedMetrics() { + if (testMetricExporter == null) { + throw new IllegalStateException("Metrics collector was not configured"); + } + if (!alreadyWaitedForMetrics) { + Thread.sleep(Math.max(collectionPeriod.toMillis() * 2, MIN_MILLIS_TO_WAIT_FOR_FINISH)); + alreadyWaitedForMetrics = true; + } + return testMetricExporter.getFinishedMetricItems(); + } + @Override public void close() { - Optional.ofNullable(testMetricExporter).ifPresent(MetricExporter::close); + Optional.ofNullable(testMetricExporter).ifPresent(me -> { + try { + periodicMetricReader.close(); + } catch (IOException e) { + throw Lombok.sneakyThrow(e); + } + me.close(); + }); Optional.ofNullable(testSpanExporter).ifPresent(te -> { te.close(); te.reset(); diff --git a/TrafficCapture/nettyWireLogging/build.gradle b/TrafficCapture/nettyWireLogging/build.gradle index 4675c5cfd..850a6c56d 100644 --- a/TrafficCapture/nettyWireLogging/build.gradle +++ b/TrafficCapture/nettyWireLogging/build.gradle @@ -29,4 +29,5 @@ dependencies { testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7' testImplementation testFixtures(project(path: ':testUtilities')) + testImplementation testFixtures(project(path: ':coreUtilities')) } diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java index 9fecf3e17..4d17893fe 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java @@ -4,18 +4,17 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; -import io.opentelemetry.sdk.testing.junit5.OpenTelemetryExtension; +import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.migrations.testutils.TestUtilities; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; +import org.opensearch.migrations.tracing.InMemoryInstrumentationBundle; import org.opensearch.migrations.trafficcapture.CodedOutputStreamAndByteBufferWrapper; import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder; import org.opensearch.migrations.trafficcapture.OrderedStreamLifecyleManager; @@ -43,12 +42,26 @@ @Slf4j public class ConditionallyReliableLoggingHttpHandlerTest { - @RegisterExtension - static final OpenTelemetryExtension otelTesting = OpenTelemetryExtension.create(); - private static class TestRootContext extends RootWireLoggingContext { + private static class TestRootContext extends RootWireLoggingContext implements AutoCloseable { + @Getter + InMemoryInstrumentationBundle instrumentationBundle; + public TestRootContext() { - super(otelTesting.getOpenTelemetry()); + this(false, false); + } + public TestRootContext(boolean trackMetrics, boolean trackTraces) { + this(new InMemoryInstrumentationBundle(trackTraces, trackTraces)); + } + + public TestRootContext(InMemoryInstrumentationBundle inMemoryInstrumentationBundle) { + super(inMemoryInstrumentationBundle.openTelemetrySdk); + this.instrumentationBundle = inMemoryInstrumentationBundle; + } + + @Override + public void close() { + instrumentationBundle.close(); } } @@ -83,41 +96,45 @@ public CodedOutputStreamAndByteBufferWrapper createStream() { } } - private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer channelWriter) + private static void writeMessageAndVerify(byte[] fullTrafficBytes, Consumer channelWriter, + boolean checkInstrumentation) throws IOException { - var rootInstrumenter = new TestRootContext(); - var streamManager = new TestStreamManager(); - var offloader = new StreamChannelConnectionCaptureSerializer("Test", "c", streamManager); - - EmbeddedChannel channel = new EmbeddedChannel( - new ConditionallyReliableLoggingHttpHandler(rootInstrumenter, - "n", "c", ctx -> offloader, - new RequestCapturePredicate(), x->true)); // true: block every request - channelWriter.accept(channel); - - // we wrote the correct data to the downstream handler/channel - var outputData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() - .map(m->new ByteArrayInputStream(consumeIntoArray((ByteBuf)m))) - .collect(Collectors.toList()))) - .readAllBytes(); - Assertions.assertArrayEquals(fullTrafficBytes, outputData); - - Assertions.assertNotNull(streamManager.byteBufferAtomicReference, - "This would be null if the handler didn't block until the output was written"); - // we wrote the correct data to the offloaded stream - var trafficStream = TrafficStream.parseFrom(streamManager.byteBufferAtomicReference.get()); - Assertions.assertTrue(trafficStream.getSubStreamCount() > 0 && - trafficStream.getSubStream(0).hasRead()); - var combinedTrafficPacketsSteam = - new SequenceInputStream(Collections.enumeration(trafficStream.getSubStreamList().stream() - .filter(TrafficObservation::hasRead) - .map(to->new ByteArrayInputStream(to.getRead().getData().toByteArray())) - .collect(Collectors.toList()))); - Assertions.assertArrayEquals(fullTrafficBytes, combinedTrafficPacketsSteam.readAllBytes()); - Assertions.assertEquals(1, streamManager.flushCount.get()); - - Assertions.assertTrue(!otelTesting.getSpans().isEmpty()); - Assertions.assertTrue(!otelTesting.getMetrics().isEmpty()); + try (var rootContext = new TestRootContext(checkInstrumentation, checkInstrumentation)) { + var streamManager = new TestStreamManager(); + var offloader = new StreamChannelConnectionCaptureSerializer("Test", "c", streamManager); + + EmbeddedChannel channel = new EmbeddedChannel( + new ConditionallyReliableLoggingHttpHandler(rootContext, + "n", "c", ctx -> offloader, + new RequestCapturePredicate(), x -> true)); // true: block every request + channelWriter.accept(channel); + + // we wrote the correct data to the downstream handler/channel + var outputData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() + .map(m -> new ByteArrayInputStream(consumeIntoArray((ByteBuf) m))) + .collect(Collectors.toList()))) + .readAllBytes(); + Assertions.assertArrayEquals(fullTrafficBytes, outputData); + + Assertions.assertNotNull(streamManager.byteBufferAtomicReference, + "This would be null if the handler didn't block until the output was written"); + // we wrote the correct data to the offloaded stream + var trafficStream = TrafficStream.parseFrom(streamManager.byteBufferAtomicReference.get()); + Assertions.assertTrue(trafficStream.getSubStreamCount() > 0 && + trafficStream.getSubStream(0).hasRead()); + var combinedTrafficPacketsSteam = + new SequenceInputStream(Collections.enumeration(trafficStream.getSubStreamList().stream() + .filter(TrafficObservation::hasRead) + .map(to -> new ByteArrayInputStream(to.getRead().getData().toByteArray())) + .collect(Collectors.toList()))); + Assertions.assertArrayEquals(fullTrafficBytes, combinedTrafficPacketsSteam.readAllBytes()); + Assertions.assertEquals(1, streamManager.flushCount.get()); + + if (checkInstrumentation) { + Assertions.assertTrue(!rootContext.instrumentationBundle.getFinishedSpans().isEmpty()); + Assertions.assertTrue(!rootContext.instrumentationBundle.getFinishedMetrics().isEmpty()); + } + } } private static byte[] consumeIntoArray(ByteBuf m) { @@ -129,20 +146,29 @@ private static byte[] consumeIntoArray(ByteBuf m) { @ParameterizedTest @ValueSource(booleans = {true, false}) - @ResourceLock("OpenTelemetryExtension") public void testThatAPostInASinglePacketBlocksFutureActivity(boolean usePool) throws IOException { + testThatAPostInASinglePacketBlocksFutureActivity(usePool, true); + } + + public void testThatAPostInASinglePacketBlocksFutureActivity(boolean usePool, boolean checkInstrumentation) + throws IOException { byte[] fullTrafficBytes = SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8); var bb = TestUtilities.getByteBuf(fullTrafficBytes, usePool); - writeMessageAndVerify(fullTrafficBytes, w -> w.writeInbound(bb)); + writeMessageAndVerify(fullTrafficBytes, w -> w.writeInbound(bb), checkInstrumentation); log.info("buf.refCnt="+bb.refCnt()); } @ParameterizedTest @ValueSource(booleans = {true, false}) - @ResourceLock("OpenTelemetryExtension") public void testThatAPostInTinyPacketsBlocksFutureActivity(boolean usePool) throws IOException { + testThatAPostInTinyPacketsBlocksFutureActivity(usePool, true); + } + + public void testThatAPostInTinyPacketsBlocksFutureActivity(boolean usePool, boolean checkInstrumentation) + throws IOException { byte[] fullTrafficBytes = SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8); - writeMessageAndVerify(fullTrafficBytes, getSingleByteAtATimeWriter(usePool, fullTrafficBytes)); + writeMessageAndVerify(fullTrafficBytes, getSingleByteAtATimeWriter(usePool, fullTrafficBytes), + checkInstrumentation); } private static Consumer getSingleByteAtATimeWriter(boolean usePool, byte[] fullTrafficBytes) { @@ -161,86 +187,88 @@ private static Consumer getSingleByteAtATimeWriter(boolean useP @Test @ValueSource(booleans = {false, true}) public void testThatSuppressedCaptureWorks() throws Exception { - var rootInstrumenter = new TestRootContext(); - var streamMgr = new TestStreamManager(); - var offloader = new StreamChannelConnectionCaptureSerializer("Test", "connection", streamMgr); - - var headerCapturePredicate = new HeaderValueFilteringCapturePredicate(Map.of("user-Agent", "uploader")); - EmbeddedChannel channel = new EmbeddedChannel( - new ConditionallyReliableLoggingHttpHandler(rootInstrumenter,"n", "c", - ctx -> offloader, headerCapturePredicate, x->true)); - getWriter(false, true, SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8)).accept(channel); - channel.close(); - var requestBytes = SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8); - - Assertions.assertEquals(0, streamMgr.flushCount.get()); - // we wrote the correct data to the downstream handler/channel - var outputData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() - .map(m->new ByteArrayInputStream(consumeIntoArray((ByteBuf)m))) - .collect(Collectors.toList()))) - .readAllBytes(); - log.info("outputdata = " + new String(outputData, StandardCharsets.UTF_8)); - Assertions.assertArrayEquals(requestBytes, outputData); + try (var rootInstrumenter = new TestRootContext()) { + var streamMgr = new TestStreamManager(); + var offloader = new StreamChannelConnectionCaptureSerializer("Test", "connection", streamMgr); + + var headerCapturePredicate = new HeaderValueFilteringCapturePredicate(Map.of("user-Agent", "uploader")); + EmbeddedChannel channel = new EmbeddedChannel( + new ConditionallyReliableLoggingHttpHandler(rootInstrumenter, "n", "c", + ctx -> offloader, headerCapturePredicate, x -> true)); + getWriter(false, true, SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8)).accept(channel); + channel.close(); + var requestBytes = SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8); + + Assertions.assertEquals(0, streamMgr.flushCount.get()); + // we wrote the correct data to the downstream handler/channel + var outputData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() + .map(m -> new ByteArrayInputStream(consumeIntoArray((ByteBuf) m))) + .collect(Collectors.toList()))) + .readAllBytes(); + log.info("outputdata = " + new String(outputData, StandardCharsets.UTF_8)); + Assertions.assertArrayEquals(requestBytes, outputData); + } } @ParameterizedTest @ValueSource(booleans = {false, true}) public void testThatHealthCheckCaptureCanBeSuppressed(boolean singleBytes) throws Exception { - var rootInstrumenter = new TestRootContext(); - var streamMgr = new TestStreamManager(); - var offloader = new StreamChannelConnectionCaptureSerializer("Test", "connection", streamMgr); - - var headerCapturePredicate = new HeaderValueFilteringCapturePredicate(Map.of("user-Agent", ".*uploader.*")); - EmbeddedChannel channel = new EmbeddedChannel( - new ConditionallyReliableLoggingHttpHandler(rootInstrumenter,"n", "c", - ctx -> offloader, headerCapturePredicate, x->false)); - getWriter(singleBytes, true, SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8)).accept(channel); - channel.writeOutbound(Unpooled.wrappedBuffer("response1".getBytes(StandardCharsets.UTF_8))); - getWriter(singleBytes, true, SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8)).accept(channel); - var bytesForResponsePreserved = "response2".getBytes(StandardCharsets.UTF_8); - channel.writeOutbound(Unpooled.wrappedBuffer(bytesForResponsePreserved)); - channel.close(); - var requestBytes = (SimpleRequests.HEALTH_CHECK + SimpleRequests.SMALL_POST).getBytes(StandardCharsets.UTF_8); - - // we wrote the correct data to the downstream handler/channel - var consumedData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() - .map(m->new ByteArrayInputStream(consumeIntoArray((ByteBuf)m))) - .collect(Collectors.toList()))) - .readAllBytes(); - log.info("captureddata = " + new String(consumedData, StandardCharsets.UTF_8)); - Assertions.assertArrayEquals(requestBytes, consumedData); - - Assertions.assertNotNull(streamMgr.byteBufferAtomicReference, - "This would be null if the handler didn't block until the output was written"); - // we wrote the correct data to the offloaded stream - var trafficStream = TrafficStream.parseFrom(streamMgr.byteBufferAtomicReference.get()); - Assertions.assertTrue(trafficStream.getSubStreamCount() > 0 && - trafficStream.getSubStream(0).hasRead()); - Assertions.assertEquals(1, streamMgr.flushCount.get()); - var observations = trafficStream.getSubStreamList(); - { - var readObservationStreamToUse = singleBytes ? skipReadsBeforeDrop(observations) : observations.stream(); - var combinedTrafficPacketsSteam = - new SequenceInputStream(Collections.enumeration(readObservationStreamToUse - .filter(to -> to.hasRead()) - .map(to -> new ByteArrayInputStream(to.getRead().getData().toByteArray())) - .collect(Collectors.toList()))); - var reconstitutedTrafficStreamReads = combinedTrafficPacketsSteam.readAllBytes(); - Assertions.assertArrayEquals(SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8), - reconstitutedTrafficStreamReads); - } + try (var rootInstrumenter = new TestRootContext()) { + var streamMgr = new TestStreamManager(); + var offloader = new StreamChannelConnectionCaptureSerializer("Test", "connection", streamMgr); + + var headerCapturePredicate = new HeaderValueFilteringCapturePredicate(Map.of("user-Agent", ".*uploader.*")); + EmbeddedChannel channel = new EmbeddedChannel( + new ConditionallyReliableLoggingHttpHandler(rootInstrumenter, "n", "c", + ctx -> offloader, headerCapturePredicate, x -> false)); + getWriter(singleBytes, true, SimpleRequests.HEALTH_CHECK.getBytes(StandardCharsets.UTF_8)).accept(channel); + channel.writeOutbound(Unpooled.wrappedBuffer("response1".getBytes(StandardCharsets.UTF_8))); + getWriter(singleBytes, true, SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8)).accept(channel); + var bytesForResponsePreserved = "response2".getBytes(StandardCharsets.UTF_8); + channel.writeOutbound(Unpooled.wrappedBuffer(bytesForResponsePreserved)); + channel.close(); + var requestBytes = (SimpleRequests.HEALTH_CHECK + SimpleRequests.SMALL_POST).getBytes(StandardCharsets.UTF_8); + + // we wrote the correct data to the downstream handler/channel + var consumedData = new SequenceInputStream(Collections.enumeration(channel.inboundMessages().stream() + .map(m -> new ByteArrayInputStream(consumeIntoArray((ByteBuf) m))) + .collect(Collectors.toList()))) + .readAllBytes(); + log.info("captureddata = " + new String(consumedData, StandardCharsets.UTF_8)); + Assertions.assertArrayEquals(requestBytes, consumedData); + + Assertions.assertNotNull(streamMgr.byteBufferAtomicReference, + "This would be null if the handler didn't block until the output was written"); + // we wrote the correct data to the offloaded stream + var trafficStream = TrafficStream.parseFrom(streamMgr.byteBufferAtomicReference.get()); + Assertions.assertTrue(trafficStream.getSubStreamCount() > 0 && + trafficStream.getSubStream(0).hasRead()); + Assertions.assertEquals(1, streamMgr.flushCount.get()); + var observations = trafficStream.getSubStreamList(); + { + var readObservationStreamToUse = singleBytes ? skipReadsBeforeDrop(observations) : observations.stream(); + var combinedTrafficPacketsSteam = + new SequenceInputStream(Collections.enumeration(readObservationStreamToUse + .filter(to -> to.hasRead()) + .map(to -> new ByteArrayInputStream(to.getRead().getData().toByteArray())) + .collect(Collectors.toList()))); + var reconstitutedTrafficStreamReads = combinedTrafficPacketsSteam.readAllBytes(); + Assertions.assertArrayEquals(SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8), + reconstitutedTrafficStreamReads); + } - // check that we only got one response - { - var combinedTrafficPacketsSteam = - new SequenceInputStream(Collections.enumeration(observations.stream() - .filter(to->to.hasWrite()) - .map(to->new ByteArrayInputStream(to.getWrite().getData().toByteArray())) - .collect(Collectors.toList()))); - var reconstitutedTrafficStreamWrites = combinedTrafficPacketsSteam.readAllBytes(); - log.info("reconstitutedTrafficStreamWrites="+ - new String(reconstitutedTrafficStreamWrites, StandardCharsets.UTF_8)); - Assertions.assertArrayEquals(bytesForResponsePreserved, reconstitutedTrafficStreamWrites); + // check that we only got one response + { + var combinedTrafficPacketsSteam = + new SequenceInputStream(Collections.enumeration(observations.stream() + .filter(to -> to.hasWrite()) + .map(to -> new ByteArrayInputStream(to.getWrite().getData().toByteArray())) + .collect(Collectors.toList()))); + var reconstitutedTrafficStreamWrites = combinedTrafficPacketsSteam.readAllBytes(); + log.info("reconstitutedTrafficStreamWrites=" + + new String(reconstitutedTrafficStreamWrites, StandardCharsets.UTF_8)); + Assertions.assertArrayEquals(bytesForResponsePreserved, reconstitutedTrafficStreamWrites); + } } } @@ -264,18 +292,16 @@ private Consumer getWriter(boolean singleBytes, boolean usePool @ParameterizedTest @ValueSource(booleans = {true, false}) @WrapWithNettyLeakDetection(repetitions = 16) - @ResourceLock("OpenTelemetryExtension") public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { - testThatAPostInTinyPacketsBlocksFutureActivity(usePool); + testThatAPostInTinyPacketsBlocksFutureActivity(usePool, false); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); } @ParameterizedTest @ValueSource(booleans = {true, false}) @WrapWithNettyLeakDetection(repetitions = 32) - @ResourceLock("OpenTelemetryExtension") public void testThatAPostInASinglePacketBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { - testThatAPostInASinglePacketBlocksFutureActivity(usePool); + testThatAPostInASinglePacketBlocksFutureActivity(usePool, false); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java index 871306376..29fa6219c 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java @@ -2,7 +2,6 @@ import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; -import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; import io.opentelemetry.sdk.trace.data.SpanData; import lombok.Lombok; import lombok.extern.slf4j.Slf4j; @@ -14,6 +13,7 @@ import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource; import org.opensearch.migrations.testutils.SimpleNettyHttpServer; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; +import org.opensearch.migrations.tracing.InMemoryInstrumentationBundle; import org.opensearch.migrations.tracing.TestContext; import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication; @@ -118,15 +118,14 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro } Assertions.assertEquals(numRequests, tuplesReceived.size()); - checkSpansForSimpleReplayedTransactions(rootContext.inMemoryInstrumentationBundle.testSpanExporter, - numRequests); + checkSpansForSimpleReplayedTransactions(rootContext.inMemoryInstrumentationBundle, numRequests); log.info("done"); } private static class TraceProcessor { Map> byName; - public TraceProcessor(InMemorySpanExporter testSpanExporter) { - byName = testSpanExporter.getFinishedSpanItems().stream().collect(Collectors.groupingBy(SpanData::getName)); + public TraceProcessor(List finishedSpans) { + byName = finishedSpans.stream().collect(Collectors.groupingBy(SpanData::getName)); } private int getCountAndRemoveSpan(String k) { @@ -144,8 +143,9 @@ public String getRemainingItemsString() { * This function is written like this rather than with a loop so that the backtrace will show WHICH * key was corrupted. */ - private void checkSpansForSimpleReplayedTransactions(InMemorySpanExporter testSpanExporter, int numRequests) { - var traceProcessor = new TraceProcessor(testSpanExporter); + private void checkSpansForSimpleReplayedTransactions(InMemoryInstrumentationBundle inMemoryBundle, + int numRequests) { + var traceProcessor = new TraceProcessor(inMemoryBundle.getFinishedSpans()); for (int numTries=1; ; ++numTries) { final String TCP_CONNECTION_SCOPE_NAME = "tcpConnection"; var numTraces = traceProcessor.getCountAndRemoveSpan(TCP_CONNECTION_SCOPE_NAME); @@ -153,7 +153,7 @@ private void checkSpansForSimpleReplayedTransactions(InMemorySpanExporter testSp case 1: break; case -1: - traceProcessor = new TraceProcessor(testSpanExporter); + traceProcessor = new TraceProcessor(inMemoryBundle.getFinishedSpans()); if (numTries > 5) { Assertions.fail("Even after waiting/polling, no " + TCP_CONNECTION_SCOPE_NAME + " was found."); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java index 056e33a64..f889dcc44 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDictsTest.java @@ -8,10 +8,6 @@ class ParsedHttpMessagesAsDictsTest extends InstrumentationTest { - ParsedHttpMessagesAsDicts makeTestData() { - return makeTestData(null, null); - } - @Override protected TestContext makeInstrumentationContext() { return TestContext.withTracking(false, true); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java index 0f395d310..d7f99c476 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java @@ -280,7 +280,7 @@ private void testOutputterForRequest(String requestResourceName, String expected } catch (InterruptedException e) { throw new RuntimeException(e); } - var allMetricData = rootContext.inMemoryInstrumentationBundle.testMetricExporter.getFinishedMetricItems(); + var allMetricData = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics(); var filteredMetrics = allMetricData.stream().filter(md->md.getName().startsWith("tupleResult")) .collect(Collectors.toList()); // TODO - find out how to verify these metrics diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index 049c9b527..792ef2c5b 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -194,8 +194,7 @@ public void testThatConnectionsAreKeptAliveAndShared(boolean useTls) @WrapWithNettyLeakDetection(repetitions = 1) public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean useTls) throws Exception { testThatConnectionsAreKeptAliveAndShared(useTls); - Thread.sleep(200); // let metrics settle down - var allMetricData = rootContext.inMemoryInstrumentationBundle.testMetricExporter.getFinishedMetricItems(); + var allMetricData = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics(); long tcpOpenConnectionCount = allMetricData.stream().filter(md->md.getName().startsWith("tcpConnectionCount")) .reduce((a,b)->b).get().getLongSumData().getPoints().stream().reduce((a,b)->b).get().getValue(); long connectionsOpenedCount = allMetricData.stream().filter(md->md.getName().startsWith("connectionsOpened")) diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/tracing/TracingTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/tracing/TracingTest.java index d6cbd2fc2..941fe0bfe 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/tracing/TracingTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/tracing/TracingTest.java @@ -14,6 +14,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -59,8 +60,8 @@ public void tracingWorks() { } } - var recordedSpans = rootContext.inMemoryInstrumentationBundle.testSpanExporter.getFinishedSpanItems(); - var recordedMetrics = rootContext.inMemoryInstrumentationBundle.testMetricExporter.getFinishedMetricItems(); + var recordedSpans = rootContext.inMemoryInstrumentationBundle.getFinishedSpans(); + var recordedMetrics = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics(); checkSpans(recordedSpans); checkMetrics(recordedMetrics); @@ -68,7 +69,7 @@ public void tracingWorks() { Assertions.assertTrue(rootContext.contextTracker.getAllRemainingActiveScopes().isEmpty()); } - private void checkMetrics(List recordedMetrics) { + private void checkMetrics(Collection recordedMetrics) { } private void checkSpans(List recordedSpans) { From 7eaf6324e7840b8c263504242b7fafd450816f9d Mon Sep 17 00:00:00 2001 From: Greg Schohn Date: Wed, 21 Feb 2024 15:09:12 -0500 Subject: [PATCH 2/2] As per PR feedback, just use an InMemoryMetricReader instead. Signed-off-by: Greg Schohn --- .../InMemoryInstrumentationBundle.java | 89 +++---------------- .../NettyPacketToHttpConsumerTest.java | 1 + 2 files changed, 12 insertions(+), 78 deletions(-) diff --git a/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java b/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java index 2ed8900c5..a63dc0bd7 100644 --- a/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java +++ b/TrafficCapture/coreUtilities/src/testFixtures/java/org/opensearch/migrations/tracing/InMemoryInstrumentationBundle.java @@ -1,112 +1,50 @@ package org.opensearch.migrations.tracing; import io.opentelemetry.sdk.OpenTelemetrySdk; -import io.opentelemetry.sdk.common.CompletableResultCode; -import io.opentelemetry.sdk.metrics.InstrumentType; import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.metrics.data.AggregationTemporality; import io.opentelemetry.sdk.metrics.data.MetricData; -import io.opentelemetry.sdk.metrics.export.MetricExporter; -import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; import lombok.Getter; import lombok.Lombok; -import lombok.NonNull; import lombok.SneakyThrows; import java.io.IOException; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; public class InMemoryInstrumentationBundle implements AutoCloseable { - public static final Duration DEFAULT_COLLECTION_PERIOD = Duration.ofMillis(1); - private static final int MIN_MILLIS_TO_WAIT_FOR_FINISH = 10; - - public static class LastMetricsExporter implements MetricExporter { - private final Queue finishedMetricItems = new ConcurrentLinkedQueue<>(); - boolean isStopped; - - public List getFinishedMetricItems() { - return Collections.unmodifiableList(new ArrayList<>(finishedMetricItems)); - } - - @Override - public CompletableResultCode export(@NonNull Collection metrics) { - if (isStopped) { - return CompletableResultCode.ofFailure(); - } - finishedMetricItems.clear(); - finishedMetricItems.addAll(metrics); - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode flush() { - return CompletableResultCode.ofSuccess(); - } - - @Override - public CompletableResultCode shutdown() { - isStopped = true; - return CompletableResultCode.ofSuccess(); - } - - @Override - public AggregationTemporality getAggregationTemporality(@NonNull InstrumentType instrumentType) { - return AggregationTemporality.CUMULATIVE; - } - } - @Getter public final OpenTelemetrySdk openTelemetrySdk; private final InMemorySpanExporter testSpanExporter; - private final LastMetricsExporter testMetricExporter; - private final PeriodicMetricReader periodicMetricReader; - private final Duration collectionPeriod; - private boolean alreadyWaitedForMetrics; + private final InMemoryMetricReader testMetricReader; public InMemoryInstrumentationBundle(boolean collectTraces, boolean collectMetrics) { this(collectTraces ? InMemorySpanExporter.create() : null, - collectMetrics ? new LastMetricsExporter() : null); + collectMetrics ? InMemoryMetricReader.create() : null); } public InMemoryInstrumentationBundle(InMemorySpanExporter testSpanExporter, - LastMetricsExporter testMetricExporter) { - this(testSpanExporter, testMetricExporter, DEFAULT_COLLECTION_PERIOD); - } - - public InMemoryInstrumentationBundle(InMemorySpanExporter testSpanExporter, - LastMetricsExporter testMetricExporter, - Duration collectionPeriod) { + InMemoryMetricReader testMetricReader) { this.testSpanExporter = testSpanExporter; - this.testMetricExporter = testMetricExporter; - this.collectionPeriod = collectionPeriod; + this.testMetricReader = testMetricReader; var otelBuilder = OpenTelemetrySdk.builder(); if (testSpanExporter != null) { otelBuilder = otelBuilder.setTracerProvider(SdkTracerProvider.builder() .addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)).build()); } - if (testMetricExporter != null) { - this.periodicMetricReader = PeriodicMetricReader.builder(testMetricExporter) - .setInterval(Duration.ofMillis(collectionPeriod.toMillis())) - .build(); + if (testMetricReader != null) { otelBuilder = otelBuilder.setMeterProvider(SdkMeterProvider.builder() - .registerMetricReader(periodicMetricReader) + .registerMetricReader(testMetricReader) .build()); - } else { - this.periodicMetricReader = null; } openTelemetrySdk = otelBuilder.build(); } @@ -124,25 +62,20 @@ public List getFinishedSpans() { */ @SneakyThrows public Collection getFinishedMetrics() { - if (testMetricExporter == null) { + if (testMetricReader == null) { throw new IllegalStateException("Metrics collector was not configured"); } - if (!alreadyWaitedForMetrics) { - Thread.sleep(Math.max(collectionPeriod.toMillis() * 2, MIN_MILLIS_TO_WAIT_FOR_FINISH)); - alreadyWaitedForMetrics = true; - } - return testMetricExporter.getFinishedMetricItems(); + return testMetricReader.collectAllMetrics(); } @Override public void close() { - Optional.ofNullable(testMetricExporter).ifPresent(me -> { + Optional.ofNullable(testMetricReader).ifPresent(me -> { try { - periodicMetricReader.close(); + me.close(); } catch (IOException e) { throw Lombok.sneakyThrow(e); } - me.close(); }); Optional.ofNullable(testSpanExporter).ifPresent(te -> { te.close(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index 792ef2c5b..3d5901e58 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -194,6 +194,7 @@ public void testThatConnectionsAreKeptAliveAndShared(boolean useTls) @WrapWithNettyLeakDetection(repetitions = 1) public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean useTls) throws Exception { testThatConnectionsAreKeptAliveAndShared(useTls); + Thread.sleep(200); // let metrics settle down var allMetricData = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics(); long tcpOpenConnectionCount = allMetricData.stream().filter(md->md.getName().startsWith("tcpConnectionCount")) .reduce((a,b)->b).get().getLongSumData().getPoints().stream().reduce((a,b)->b).get().getValue();