Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added custom sampler support based on action in request (#10136) #12223

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added Support for dynamically adding SearchRequestOperationsListeners with SearchRequestOperationsCompositeListenerFactory ([#11526](https://github.com/opensearch-project/OpenSearch/pull/11526))
- [Query Insights] Query Insights Framework which currently supports retrieving the most time-consuming queries within the last configured time window ([#11903](https://github.com/opensearch-project/OpenSearch/pull/11903))
- [Query Insights] Implement Top N Queries feature to collect and gather information about high latency queries in a window ([#11904](https://github.com/opensearch-project/OpenSearch/pull/11904))
- Add override support for sampling based on action ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))
- Added custom sampler support based on transport action in request ([#9621](https://github.com/opensearch-project/OpenSearch/issues/9621))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ public List<Setting<?>> getSettings() {
OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING,
OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING,
OTelTelemetrySettings.OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING,
OTelTelemetrySettings.OTEL_METRICS_EXPORTER_CLASS_SETTING
OTelTelemetrySettings.OTEL_TRACER_SPAN_SAMPLER_CLASS_SETTINGS,
OTelTelemetrySettings.OTEL_METRICS_EXPORTER_CLASS_SETTING,
OTelTelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,21 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.telemetry.metrics.exporter.OTelMetricsExporterFactory;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.sampler.OTelSamplerFactory;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticTransportActionSampler;

import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;

import io.opentelemetry.exporter.logging.LoggingMetricExporter;
import io.opentelemetry.exporter.logging.LoggingSpanExporter;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import io.opentelemetry.sdk.trace.samplers.Sampler;

/**
* OTel specific telemetry settings.
Expand Down Expand Up @@ -110,4 +116,40 @@
Setting.Property.NodeScope,
Setting.Property.Final
);

/**
* Samplers orders setting.
*/
@SuppressWarnings("unchecked")
public static final Setting<List<Class<Sampler>>> OTEL_TRACER_SPAN_SAMPLER_CLASS_SETTINGS = Setting.listSetting(
"telemetry.otel.tracer.span.sampler.classes",
Arrays.asList(ProbabilisticTransportActionSampler.class.getName(), ProbabilisticSampler.class.getName()),
sampler -> {
// Check we ourselves are not being called by unprivileged code.
SpecialPermission.check();
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<Class<Sampler>>) () -> {
final ClassLoader loader = OTelSamplerFactory.class.getClassLoader();
return (Class<Sampler>) loader.loadClass(sampler);
});
} catch (PrivilegedActionException ex) {
throw new IllegalStateException("Unable to load sampler class: " + sampler, ex.getCause());

Check warning on line 136 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java#L135-L136

Added lines #L135 - L136 were not covered by tests
}
},
Setting.Property.NodeScope,
Setting.Property.Final
);

/**
* Probability of action based sampler
*/
public static final Setting<Double> TRACER_SAMPLER_ACTION_PROBABILITY = Setting.doubleSetting(
"telemetry.tracer.action.sampler.probability",
0.001d,
0.000d,
1.00d,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.telemetry.metrics.exporter.OTelMetricsExporterFactory;
import org.opensearch.telemetry.tracing.exporter.OTelSpanExporterFactory;
import org.opensearch.telemetry.tracing.sampler.ProbabilisticSampler;
import org.opensearch.telemetry.tracing.sampler.OTelSamplerFactory;
import org.opensearch.telemetry.tracing.sampler.RequestSampler;

import java.security.AccessController;
Expand Down Expand Up @@ -60,7 +60,7 @@ public static OpenTelemetrySdk get(TelemetrySettings telemetrySettings, Settings
settings,
OTelSpanExporterFactory.create(settings),
ContextPropagators.create(W3CTraceContextPropagator.getInstance()),
Sampler.parentBased(new RequestSampler(new ProbabilisticSampler(telemetrySettings)))
Sampler.parentBased(new RequestSampler(OTelSamplerFactory.create(telemetrySettings, settings)))
)
);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.SpecialPermission;
import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.OTelTelemetrySettings;
import org.opensearch.telemetry.TelemetrySettings;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.security.AccessController;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.ListIterator;

import io.opentelemetry.sdk.trace.samplers.Sampler;

/**
* Factory class to create the instance of OTelSampler
*/
public class OTelSamplerFactory {

/**
* Logger instance for logging messages related to the OTelSamplerFactory.
*/
private static final Logger logger = LogManager.getLogger(OTelSamplerFactory.class);

/**
* Base constructor.
*/
private OTelSamplerFactory() {

}

/**
* Creates the {@link Sampler} instances based on the TRACER_SPAN_SAMPLER_CLASSES value.
*
* @param telemetrySettings TelemetrySettings.
* @param settings the settings
* @return list of samplers.
*/
public static Sampler create(TelemetrySettings telemetrySettings, Settings settings) {
List<Class<Sampler>> samplersNameList = OTelTelemetrySettings.OTEL_TRACER_SPAN_SAMPLER_CLASS_SETTINGS.get(settings);
ListIterator<Class<Sampler>> li = samplersNameList.listIterator(samplersNameList.size());

Sampler fallbackSampler = null;

// Iterating samplers list in reverse order to create chain of sampler
while (li.hasPrevious()) {
Class<Sampler> samplerName = li.previous();
fallbackSampler = instantiateSampler(samplerName, telemetrySettings, settings, fallbackSampler);
}

return fallbackSampler;
}

private static Sampler instantiateSampler(
Class<Sampler> samplerClassName,
TelemetrySettings telemetrySettings,
Settings settings,
Sampler fallbackSampler
) {
try {
// Check we ourselves are not being called by unprivileged code.
SpecialPermission.check();

return AccessController.doPrivileged((PrivilegedExceptionAction<Sampler>) () -> {
try {
// Define the method type which receives TelemetrySettings & Sampler as arguments
MethodType methodType = MethodType.methodType(Sampler.class, TelemetrySettings.class, Settings.class, Sampler.class);

return (Sampler) MethodHandles.publicLookup()
.findStatic(samplerClassName, "create", methodType)
.invokeExact(telemetrySettings, settings, fallbackSampler);
} catch (Throwable e) {

Check warning on line 84 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java#L84

Added line #L84 was not covered by tests
if (e.getCause() instanceof NoSuchMethodException) {
throw new IllegalStateException("No create method exist in [" + samplerClassName + "]", e.getCause());

Check warning on line 86 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java#L86

Added line #L86 was not covered by tests
} else {
throw new IllegalStateException("Sampler instantiation failed for class [" + samplerClassName + "]", e.getCause());

Check warning on line 88 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java#L88

Added line #L88 was not covered by tests
}
}
});
} catch (Exception e) {
throw new IllegalStateException("Sampler instantiation failed for class [" + samplerClassName + "]", e.getCause());

Check warning on line 93 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/OTelSamplerFactory.java#L92-L93

Added lines #L92 - L93 were not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.TelemetrySettings;

import java.util.List;
Expand All @@ -18,36 +19,43 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

/**
* ProbabilisticSampler implements a head-based sampling strategy based on provided settings.
* ProbabilisticSampler implements a probability sampling strategy based on configured sampling ratio.
*/
public class ProbabilisticSampler implements Sampler {
private Sampler defaultSampler;
private final TelemetrySettings telemetrySettings;
private final Settings settings;
private final Sampler fallbackSampler;

private double samplingRatio;

/**
* Constructor
*
* @param telemetrySettings Telemetry settings.
*/
public ProbabilisticSampler(TelemetrySettings telemetrySettings) {
private ProbabilisticSampler(TelemetrySettings telemetrySettings, Settings settings, Sampler fallbackSampler) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.settings = Objects.requireNonNull(settings);
this.samplingRatio = telemetrySettings.getSamplingProbability();
this.defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
this.fallbackSampler = fallbackSampler;
}

Sampler getSampler() {
double newSamplingRatio = telemetrySettings.getSamplingProbability();
if (isSamplingRatioChanged(newSamplingRatio)) {
synchronized (this) {
this.samplingRatio = newSamplingRatio;
defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}
}
return defaultSampler;
/**
* Create probabilistic sampler.
*
* @param telemetrySettings the telemetry settings
* @param settings the settings
* @param fallbackSampler the fallback sampler
* @return the probabilistic sampler
*/
public static Sampler create(TelemetrySettings telemetrySettings, Settings settings, Sampler fallbackSampler) {
return new ProbabilisticSampler(telemetrySettings, settings, fallbackSampler);
}

private boolean isSamplingRatioChanged(double newSamplingRatio) {
Expand All @@ -67,7 +75,19 @@
Attributes attributes,
List<LinkData> parentLinks
) {
return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
double newSamplingRatio = telemetrySettings.getSamplingProbability();
if (isSamplingRatioChanged(newSamplingRatio)) {
synchronized (this) {
this.samplingRatio = newSamplingRatio;
defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}
}
final SamplingResult result = defaultSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
if (result.getDecision() != SamplingDecision.DROP && fallbackSampler != null) {
return fallbackSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);

Check warning on line 87 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java#L87

Added line #L87 was not covered by tests
} else {
return result;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.telemetry.tracing.sampler;

import org.opensearch.common.settings.Settings;
import org.opensearch.telemetry.OTelTelemetrySettings;
import org.opensearch.telemetry.TelemetrySettings;

import java.util.List;
import java.util.Objects;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingDecision;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

import static org.opensearch.telemetry.tracing.AttributeNames.TRANSPORT_ACTION;

/**
* ProbabilisticTransportActionSampler sampler samples request with action based on defined probability
*/
public class ProbabilisticTransportActionSampler implements Sampler {

private final Sampler fallbackSampler;
private Sampler actionSampler;
private final TelemetrySettings telemetrySettings;
private final Settings settings;
private double actionSamplingRatio;

/**
* Creates ProbabilisticTransportActionSampler sampler
* @param telemetrySettings TelemetrySettings
*/
private ProbabilisticTransportActionSampler(TelemetrySettings telemetrySettings, Settings settings, Sampler fallbackSampler) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.settings = Objects.requireNonNull(settings);
this.actionSamplingRatio = OTelTelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY.get(settings);
this.actionSampler = Sampler.traceIdRatioBased(actionSamplingRatio);
this.fallbackSampler = fallbackSampler;
}

/**
* Create probabilistic transport action sampler.
*
* @param telemetrySettings the telemetry settings
* @param settings the settings
* @param fallbackSampler the fallback sampler
* @return the probabilistic transport action sampler
*/
public static Sampler create(TelemetrySettings telemetrySettings, Settings settings, Sampler fallbackSampler) {
return new ProbabilisticTransportActionSampler(telemetrySettings, settings, fallbackSampler);
}

@Override
public SamplingResult shouldSample(
Context parentContext,
String traceId,
String name,
SpanKind spanKind,
Attributes attributes,
List<LinkData> parentLinks
) {
final String action = attributes.get(AttributeKey.stringKey(TRANSPORT_ACTION));
if (action != null) {
final SamplingResult result = actionSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
if (result.getDecision() != SamplingDecision.DROP && fallbackSampler != null) {
return fallbackSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}
return result;
}
if (fallbackSampler != null) return fallbackSampler.shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);

return SamplingResult.drop();

Check warning on line 83 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java#L83

Added line #L83 was not covered by tests
}

double getSamplingRatio() {
return actionSamplingRatio;
}

@Override
public String getDescription() {
return "Transport Action Sampler";

Check warning on line 92 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java#L92

Added line #L92 was not covered by tests
}

@Override
public String toString() {
return getDescription();

Check warning on line 97 in plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java

View check run for this annotation

Codecov / codecov/patch

plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticTransportActionSampler.java#L97

Added line #L97 was not covered by tests
}
}
Loading
Loading