Skip to content

Commit

Permalink
Merge pull request #517 from gregschohn/FixFlakyOtelTests
Browse files Browse the repository at this point in the history
Fix some flaky tests that were checking if metrics were being emitted.
  • Loading branch information
gregschohn authored Mar 1, 2024
2 parents 7987740 + 7eaf632 commit f35c54a
Show file tree
Hide file tree
Showing 8 changed files with 203 additions and 197 deletions.
Original file line number Diff line number Diff line change
@@ -1,100 +1,82 @@
package org.opensearch.migrations.tracing;

import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricExporter;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import lombok.Getter;
import lombok.NonNull;
import lombok.Lombok;
import lombok.SneakyThrows;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

@Getter
public class InMemoryInstrumentationBundle implements AutoCloseable {

public static class LastMetricsExporter implements MetricExporter {
private final Queue<MetricData> finishedMetricItems = new ConcurrentLinkedQueue<>();
boolean isStopped;

public List<MetricData> getFinishedMetricItems() {
return Collections.unmodifiableList(new ArrayList<>(finishedMetricItems));
}

@Override
public CompletableResultCode export(@NonNull Collection<MetricData> metrics) {
if (isStopped) {
return CompletableResultCode.ofFailure();
}
finishedMetricItems.clear();
finishedMetricItems.addAll(metrics);
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
isStopped = true;
return CompletableResultCode.ofSuccess();
}

@Override
public AggregationTemporality getAggregationTemporality(@NonNull InstrumentType instrumentType) {
return AggregationTemporality.CUMULATIVE;
}
}

@Getter
public final OpenTelemetrySdk openTelemetrySdk;
public final InMemorySpanExporter testSpanExporter;
public final LastMetricsExporter testMetricExporter;
private final InMemorySpanExporter testSpanExporter;
private final InMemoryMetricReader testMetricReader;

public InMemoryInstrumentationBundle(boolean collectTraces,
boolean collectMetrics) {
this(collectTraces ? InMemorySpanExporter.create() : null,
collectMetrics ? new LastMetricsExporter() : null);
collectMetrics ? InMemoryMetricReader.create() : null);
}

public InMemoryInstrumentationBundle(InMemorySpanExporter testSpanExporter,
LastMetricsExporter testMetricExporter) {
InMemoryMetricReader testMetricReader) {
this.testSpanExporter = testSpanExporter;
this.testMetricExporter = testMetricExporter;
this.testMetricReader = testMetricReader;

var otelBuilder = OpenTelemetrySdk.builder();
if (testSpanExporter != null) {
otelBuilder = otelBuilder.setTracerProvider(SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)).build());
}
if (testMetricExporter != null) {
if (testMetricReader != null) {
otelBuilder = otelBuilder.setMeterProvider(SdkMeterProvider.builder()
.registerMetricReader(PeriodicMetricReader.builder(testMetricExporter)
.setInterval(Duration.ofMillis(100))
.build())
.registerMetricReader(testMetricReader)
.build());
}
openTelemetrySdk = otelBuilder.build();
}

public List<SpanData> getFinishedSpans() {
if (testSpanExporter == null) {
throw new IllegalStateException("Metrics collector was not configured");
}
return testSpanExporter.getFinishedSpanItems();
}

/**
* Waits double the collectionPeriod time (once) before returning the collected metrics
* @return
*/
@SneakyThrows
public Collection<MetricData> getFinishedMetrics() {
if (testMetricReader == null) {
throw new IllegalStateException("Metrics collector was not configured");
}
return testMetricReader.collectAllMetrics();
}

@Override
public void close() {
Optional.ofNullable(testMetricExporter).ifPresent(MetricExporter::close);
Optional.ofNullable(testMetricReader).ifPresent(me -> {
try {
me.close();
} catch (IOException e) {
throw Lombok.sneakyThrow(e);
}
});
Optional.ofNullable(testSpanExporter).ifPresent(te -> {
te.close();
te.reset();
Expand Down
1 change: 1 addition & 0 deletions TrafficCapture/nettyWireLogging/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,5 @@ dependencies {
testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'

testImplementation testFixtures(project(path: ':testUtilities'))
testImplementation testFixtures(project(path: ':coreUtilities'))
}
Loading

0 comments on commit f35c54a

Please sign in to comment.