Reviewer note (free-form text ahead of the first "diff --git" line; patch tools skip leading text).
  * Layout: this patch had been collapsed onto a few very long lines. Line breaks were restored from
    the hunk headers. Indentation and whitespace-only context lines are reconstructed from the "@@"
    line counts and may not match the target tree byte-for-byte - verify, or apply with whitespace
    leniency.
  * NOTE(review): generic type arguments (Map<String, Sampler>, Map<String, Double>,
    Setting.AffixSetting<Double>, List<LinkData>) looked stripped in the collapsed text and were
    restored by inference - TODO confirm against the original change.
  * Fix: in ProbabilisticSampler.getSampler(Attributes) the action sampler was returned only from
    inside the "needs update" branch, so every later call with an unchanged override fell through to
    the default sampler. The return now sits outside that branch (hunk line counts unchanged).
  * Tests: added an assertSame regression check for the fix (+2 lines, final test hunk is now 8 -> 47)
    and corrected two comments that did not describe what the assertions check.
  * The post-image blob ids on the "index" lines of the two edited files predate these edits.
  * Not changed here, worth a follow-up: the HashMap-backed sampler/probability maps are read outside
    the synchronized sections, and setActionSamplingProbability only ever adds entries.

diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java
index 774070aa39df6..213bf73dec0a1 100644
--- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java
+++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSampler.java
@@ -10,9 +10,12 @@
 
 import org.opensearch.telemetry.TelemetrySettings;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import io.opentelemetry.api.common.AttributeKey;
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.trace.SpanKind;
 import io.opentelemetry.context.Context;
@@ -20,6 +23,8 @@
 import io.opentelemetry.sdk.trace.samplers.Sampler;
 import io.opentelemetry.sdk.trace.samplers.SamplingResult;
 
+import static org.opensearch.telemetry.tracing.AttributeNames.TRANSPORT_ACTION;
+
 /**
  * ProbabilisticSampler implements a head-based sampling strategy based on provided settings.
  */
@@ -27,6 +32,8 @@ public class ProbabilisticSampler implements Sampler {
     private Sampler defaultSampler;
     private final TelemetrySettings telemetrySettings;
     private double samplingRatio;
+    private final Map<String, Sampler> actionBasedSampler;
+    private final Map<String, Double> actionBasedSamplingProbability;
 
     /**
      * Constructor
@@ -37,9 +44,15 @@ public ProbabilisticSampler(TelemetrySettings telemetrySettings) {
         this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
         this.samplingRatio = telemetrySettings.getSamplingProbability();
         this.defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
+        this.actionBasedSampler = new HashMap<>();
+        this.actionBasedSamplingProbability = new HashMap<>();
     }
 
-    Sampler getSampler() {
+    /**
+     * Returns custom sampler based on the type of request
+     * @param attributes Telemetry attributes
+     */
+    Sampler getSampler(Attributes attributes) {
         double newSamplingRatio = telemetrySettings.getSamplingProbability();
         if (isSamplingRatioChanged(newSamplingRatio)) {
             synchronized (this) {
@@ -47,6 +60,17 @@ Sampler getSampler() {
                 defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
             }
         }
+
+        final String action = attributes.get(AttributeKey.stringKey(TRANSPORT_ACTION));
+        if (action != null && telemetrySettings.isActionSamplingOverrideSet(action)) {
+            if (actionSamplerNeedsUpdate(action)) {
+                synchronized (this) {
+                    updateActionSampler(action);
+                }
+            }
+            return actionBasedSampler.get(action);
+        }
+
         return defaultSampler;
     }
 
@@ -54,10 +78,28 @@ private boolean isSamplingRatioChanged(double newSamplingRatio) {
         return Double.compare(this.samplingRatio, newSamplingRatio) != 0;
     }
 
+    private boolean actionSamplerNeedsUpdate(String action) {
+        return (!actionBasedSampler.containsKey(action)
+            || actionBasedSamplingProbability.get(action) != telemetrySettings.getActionSamplingProbability(action));
+    }
+
+    private void updateActionSampler(String action) {
+        double samplingRatio = telemetrySettings.getActionSamplingProbability(action);
+        this.actionBasedSamplingProbability.put(action, samplingRatio);
+        this.actionBasedSampler.put(action, Sampler.traceIdRatioBased(samplingRatio));
+    }
+
     double getSamplingRatio() {
         return samplingRatio;
     }
 
+    double getActionSamplingRatio(String action) {
+        if (actionBasedSamplingProbability.containsKey(action)) {
+            return actionBasedSamplingProbability.get(action);
+        }
+        return samplingRatio;
+    }
+
     @Override
     public SamplingResult shouldSample(
         Context parentContext,
@@ -67,7 +109,7 @@ public SamplingResult shouldSample(
         Attributes attributes,
         List<LinkData> parentLinks
     ) {
-        return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
+        return getSampler(attributes).shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
     }
 
     @Override
diff --git a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSamplerTests.java b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSamplerTests.java
index 639dc341ef0db..bad81881f2915 100644
--- a/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSamplerTests.java
+++ b/plugins/telemetry-otel/src/test/java/org/opensearch/telemetry/tracing/sampler/ProbabilisticSamplerTests.java
@@ -13,12 +13,17 @@
 import org.opensearch.telemetry.TelemetrySettings;
 import org.opensearch.test.OpenSearchTestCase;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.sdk.internal.AttributesMap;
 import io.opentelemetry.sdk.trace.samplers.Sampler;
 
 import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
 import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
+import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY;
 import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;
 
 public class ProbabilisticSamplerTests extends OpenSearchTestCase {
@@ -35,11 +40,12 @@ public void testDefaultGetSampler() {
             Settings.EMPTY,
             new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
         );
+        AttributesMap attributes = AttributesMap.create(1, 100);
 
         // Probabilistic Sampler
         ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);
 
-        assertNotNull(probabilisticSampler.getSampler());
+        assertNotNull(probabilisticSampler.getSampler(attributes));
         assertEquals(0.01, probabilisticSampler.getSamplingRatio(), 0.0d);
     }
 
@@ -49,6 +55,7 @@ public void testGetSamplerWithUpdatedSamplingRatio() {
             Settings.EMPTY,
             new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
         );
+        AttributesMap attributes = AttributesMap.create(1, 100);
 
         // Probabilistic Sampler
         ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);
@@ -57,8 +64,47 @@ public void testGetSamplerWithUpdatedSamplingRatio() {
 
         telemetrySettings.setSamplingProbability(0.02);
         // Need to call getSampler() to update the value of tracerHeadSamplerSamplingRatio
-        Sampler updatedProbabilisticSampler = probabilisticSampler.getSampler();
+        Sampler updatedProbabilisticSampler = probabilisticSampler.getSampler(attributes);
         assertEquals(0.02, probabilisticSampler.getSamplingRatio(), 0.0d);
     }
 
+    public void testGetSamplerWithCustomActionSamplingRatio() {
+        Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
+        TelemetrySettings telemetrySettings = new TelemetrySettings(
+            Settings.EMPTY,
+            new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY))
+        );
+        telemetrySettings.setSamplingProbability(0.02);
+        AttributesMap attributes = AttributesMap.create(1, 100);
+
+        // Probabilistic Sampler
+        ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);
+
+        Map<String, Double> filters = new HashMap<>();
+        // Setting 100% sampling for indexing request
+        filters.put("indexing", 1.00);
+        // Setting 50% sampling rate for query request
+        filters.put("query", 0.5);
+        telemetrySettings.setActionSamplingProbability(filters);
+
+        attributes.put(AttributeKey.stringKey("action"), "indexing");
+        Sampler indexingSampler = probabilisticSampler.getSampler(attributes);
+        // A repeated lookup with an unchanged override must return the same action sampler, not the default one.
+        assertSame(indexingSampler, probabilisticSampler.getSampler(attributes));
+
+        // Validates sampling probability for general request.
+        assertEquals(0.02, probabilisticSampler.getActionSamplingRatio("dummy"), 0.0d);
+        // Validates sampling probability for indexing request as override is present for it.
+        assertEquals(1.00, probabilisticSampler.getActionSamplingRatio("indexing"), 0.0d);
+        // No sampler has been requested for the query action yet, so the default ratio is still reported for it.
+        assertEquals(0.02, probabilisticSampler.getActionSamplingRatio("query"), 0.0d);
+
+        filters.put("indexing", 0.30);
+        telemetrySettings.setActionSamplingProbability(filters);
+        // Need to call getSampler() again so the sampler picks up the changed action sampling probability
+        probabilisticSampler.getSampler(attributes);
+        // Validates sampling probability for indexing request as override is present for it.
+        assertEquals(0.30, probabilisticSampler.getActionSamplingRatio("indexing"), 0.0d);
+    }
+
 }
diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
index 74224d66400da..dc7144cd55c2b 100644
--- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
+++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java
@@ -690,6 +690,10 @@ public void apply(Settings value, Settings current, Settings previous) {
             SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
         ),
         List.of(FeatureFlags.TELEMETRY),
-        List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY)
+        List.of(
+            TelemetrySettings.TRACER_ENABLED_SETTING,
+            TelemetrySettings.TRACER_SAMPLER_PROBABILITY,
+            TelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY
+        )
     );
 }
diff --git a/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java b/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java
index dc0b04244296f..b1296d2a0c91d 100644
--- a/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java
+++ b/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java
@@ -13,6 +13,9 @@
 import org.opensearch.common.settings.Setting;
 import org.opensearch.common.settings.Settings;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Wrapper class to encapsulate tracing related settings
  *
@@ -39,15 +42,27 @@ public class TelemetrySettings {
         Setting.Property.Dynamic
     );
 
+    /**
+     * Probability of action based sampler
+     */
+    public static final Setting.AffixSetting<Double> TRACER_SAMPLER_ACTION_PROBABILITY = Setting.affixKeySetting(
+        "telemetry.tracer.sampler.",
+        "probability",
+        (ns, key) -> Setting.doubleSetting(key, 0.00d, 0.00d, 1.00d, Setting.Property.Dynamic, Setting.Property.NodeScope)
+    );
+
     private volatile boolean tracingEnabled;
     private volatile double samplingProbability;
+    private volatile Map<String, Double> affixSamplingProbability;
 
     public TelemetrySettings(Settings settings, ClusterSettings clusterSettings) {
         this.tracingEnabled = TRACER_ENABLED_SETTING.get(settings);
         this.samplingProbability = TRACER_SAMPLER_PROBABILITY.get(settings);
+        this.affixSamplingProbability = new HashMap<>();
 
         clusterSettings.addSettingsUpdateConsumer(TRACER_ENABLED_SETTING, this::setTracingEnabled);
         clusterSettings.addSettingsUpdateConsumer(TRACER_SAMPLER_PROBABILITY, this::setSamplingProbability);
+        clusterSettings.addAffixMapUpdateConsumer(TRACER_SAMPLER_ACTION_PROBABILITY, this::setActionSamplingProbability, (a, b) -> {});
     }
 
     public void setTracingEnabled(boolean tracingEnabled) {
@@ -66,10 +81,44 @@ public void setSamplingProbability(double samplingProbability) {
         this.samplingProbability = samplingProbability;
     }
 
+    /**
+     * Set sampling ratio for action
+     * @param filters map of action and corresponding value
+     */
+    public void setActionSamplingProbability(Map<String, Double> filters) {
+        synchronized (this) {
+            for (String name : filters.keySet()) {
+                this.affixSamplingProbability.put(name, filters.get(name));
+            }
+        }
+    }
+
     /**
      * Get sampling ratio
+     * @return double
      */
     public double getSamplingProbability() {
         return samplingProbability;
     }
+
+    /**
+     * Returns if sampling override is set for action
+     * @param action string
+     * @return boolean if sampling override is present for an action
+     */
+    public boolean isActionSamplingOverrideSet(String action) {
+        return affixSamplingProbability.containsKey(action);
+    }
+
+    /**
+     * Get action sampling ratio
+     * @param action string
+     * @return double value of sampling probability for that action
+     */
+    public double getActionSamplingProbability(String action) {
+        if (affixSamplingProbability.containsKey(action)) {
+            return this.affixSamplingProbability.get(action);
+        }
+        return samplingProbability;
+    }
 }