Skip to content

Commit

Permalink
As per PR feedback, just use an InMemoryMetricReader instead.
Browse files Browse the repository at this point in the history
Signed-off-by: Greg Schohn <greg.schohn@gmail.com>
  • Loading branch information
gregschohn committed Mar 1, 2024
1 parent 4b6fa93 commit 7eaf632
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 78 deletions.
Original file line number Diff line number Diff line change
@@ -1,112 +1,50 @@
package org.opensearch.migrations.tracing;

import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import lombok.Getter;
import lombok.Lombok;
import lombok.NonNull;
import lombok.SneakyThrows;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class InMemoryInstrumentationBundle implements AutoCloseable {

public static final Duration DEFAULT_COLLECTION_PERIOD = Duration.ofMillis(1);
private static final int MIN_MILLIS_TO_WAIT_FOR_FINISH = 10;

public static class LastMetricsExporter implements MetricExporter {
private final Queue<MetricData> finishedMetricItems = new ConcurrentLinkedQueue<>();
boolean isStopped;

public List<MetricData> getFinishedMetricItems() {
return Collections.unmodifiableList(new ArrayList<>(finishedMetricItems));
}

@Override
public CompletableResultCode export(@NonNull Collection<MetricData> metrics) {
if (isStopped) {
return CompletableResultCode.ofFailure();
}
finishedMetricItems.clear();
finishedMetricItems.addAll(metrics);
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode flush() {
return CompletableResultCode.ofSuccess();
}

@Override
public CompletableResultCode shutdown() {
isStopped = true;
return CompletableResultCode.ofSuccess();
}

@Override
public AggregationTemporality getAggregationTemporality(@NonNull InstrumentType instrumentType) {
return AggregationTemporality.CUMULATIVE;
}
}

@Getter
public final OpenTelemetrySdk openTelemetrySdk;
private final InMemorySpanExporter testSpanExporter;
private final LastMetricsExporter testMetricExporter;
private final PeriodicMetricReader periodicMetricReader;
private final Duration collectionPeriod;
private boolean alreadyWaitedForMetrics;
private final InMemoryMetricReader testMetricReader;

public InMemoryInstrumentationBundle(boolean collectTraces,
boolean collectMetrics) {
this(collectTraces ? InMemorySpanExporter.create() : null,
collectMetrics ? new LastMetricsExporter() : null);
collectMetrics ? InMemoryMetricReader.create() : null);
}

public InMemoryInstrumentationBundle(InMemorySpanExporter testSpanExporter,
LastMetricsExporter testMetricExporter) {
this(testSpanExporter, testMetricExporter, DEFAULT_COLLECTION_PERIOD);
}

public InMemoryInstrumentationBundle(InMemorySpanExporter testSpanExporter,
LastMetricsExporter testMetricExporter,
Duration collectionPeriod) {
InMemoryMetricReader testMetricReader) {
this.testSpanExporter = testSpanExporter;
this.testMetricExporter = testMetricExporter;
this.collectionPeriod = collectionPeriod;
this.testMetricReader = testMetricReader;

var otelBuilder = OpenTelemetrySdk.builder();
if (testSpanExporter != null) {
otelBuilder = otelBuilder.setTracerProvider(SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(testSpanExporter)).build());
}
if (testMetricExporter != null) {
this.periodicMetricReader = PeriodicMetricReader.builder(testMetricExporter)
.setInterval(Duration.ofMillis(collectionPeriod.toMillis()))
.build();
if (testMetricReader != null) {
otelBuilder = otelBuilder.setMeterProvider(SdkMeterProvider.builder()
.registerMetricReader(periodicMetricReader)
.registerMetricReader(testMetricReader)
.build());
} else {
this.periodicMetricReader = null;
}
openTelemetrySdk = otelBuilder.build();
}
Expand All @@ -124,25 +62,20 @@ public List<SpanData> getFinishedSpans() {
*/
@SneakyThrows
public Collection<MetricData> getFinishedMetrics() {
if (testMetricExporter == null) {
if (testMetricReader == null) {
throw new IllegalStateException("Metrics collector was not configured");
}
if (!alreadyWaitedForMetrics) {
Thread.sleep(Math.max(collectionPeriod.toMillis() * 2, MIN_MILLIS_TO_WAIT_FOR_FINISH));
alreadyWaitedForMetrics = true;
}
return testMetricExporter.getFinishedMetricItems();
return testMetricReader.collectAllMetrics();
}

@Override
public void close() {
Optional.ofNullable(testMetricExporter).ifPresent(me -> {
Optional.ofNullable(testMetricReader).ifPresent(me -> {
try {
periodicMetricReader.close();
me.close();
} catch (IOException e) {
throw Lombok.sneakyThrow(e);
}
me.close();
});
Optional.ofNullable(testSpanExporter).ifPresent(te -> {
te.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public void testThatConnectionsAreKeptAliveAndShared(boolean useTls)
@WrapWithNettyLeakDetection(repetitions = 1)
public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean useTls) throws Exception {
testThatConnectionsAreKeptAliveAndShared(useTls);
Thread.sleep(200); // let metrics settle down
var allMetricData = rootContext.inMemoryInstrumentationBundle.getFinishedMetrics();
long tcpOpenConnectionCount = allMetricData.stream().filter(md->md.getName().startsWith("tcpConnectionCount"))
.reduce((a,b)->b).get().getLongSumData().getPoints().stream().reduce((a,b)->b).get().getValue();
Expand Down

0 comments on commit 7eaf632

Please sign in to comment.