Skip to content

Commit

Permalink
Added custom sampler support based on action in request
Browse files Browse the repository at this point in the history
Signed-off-by: Dev Agarwal <devagarwal1803@gmail.com>
  • Loading branch information
devagarwal1803 committed Sep 20, 2023
1 parent 2f7969a commit 09f3c9b
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,30 @@

import org.opensearch.telemetry.TelemetrySettings;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;

import static org.opensearch.telemetry.tracing.AttributeNames.TRANSPORT_ACTION;

/**
* ProbabilisticSampler implements a head-based sampling strategy based on provided settings.
*/
public class ProbabilisticSampler implements Sampler {
private Sampler defaultSampler;
private final TelemetrySettings telemetrySettings;
private double samplingRatio;
private final Map<String, Sampler> actionBasedSampler;
private final Map<String, Double> actionBasedSamplingProbability;

/**
* Constructor
Expand All @@ -37,27 +44,62 @@ public ProbabilisticSampler(TelemetrySettings telemetrySettings) {
this.telemetrySettings = Objects.requireNonNull(telemetrySettings);
this.samplingRatio = telemetrySettings.getSamplingProbability();
this.defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
this.actionBasedSampler = new HashMap<>();
this.actionBasedSamplingProbability = new HashMap<>();
}

Sampler getSampler() {
/**
* Returns custom sampler based on the type of request
* @param attributes Telemetry attributes
*/
Sampler getSampler(Attributes attributes) {
double newSamplingRatio = telemetrySettings.getSamplingProbability();
if (isSamplingRatioChanged(newSamplingRatio)) {
synchronized (this) {
this.samplingRatio = newSamplingRatio;
defaultSampler = Sampler.traceIdRatioBased(samplingRatio);
}
}

final String action = attributes.get(AttributeKey.stringKey(TRANSPORT_ACTION));
if (action != null && telemetrySettings.isActionSamplingOverrideSet(action)) {
if (actionSamplerNeedsUpdate(action)) {
synchronized (this) {
updateActionSampler(action);
}
return actionBasedSampler.get(action);
}
}

return defaultSampler;
}

private boolean isSamplingRatioChanged(double newSamplingRatio) {
return Double.compare(this.samplingRatio, newSamplingRatio) != 0;
}

private boolean actionSamplerNeedsUpdate(String action) {
return (!actionBasedSampler.containsKey(action)
|| actionBasedSamplingProbability.get(action) != telemetrySettings.getActionSamplingProbability(action));
}

private void updateActionSampler(String action) {
double samplingRatio = telemetrySettings.getActionSamplingProbability(action);
this.actionBasedSamplingProbability.put(action, samplingRatio);
this.actionBasedSampler.put(action, Sampler.traceIdRatioBased(samplingRatio));
}

double getSamplingRatio() {
return samplingRatio;
}

double getActionSamplingRatio(String action) {
if (actionBasedSamplingProbability.containsKey(action)) {
return actionBasedSamplingProbability.get(action);
}
return samplingRatio;
}

@Override
public SamplingResult shouldSample(
Context parentContext,
Expand All @@ -67,7 +109,7 @@ public SamplingResult shouldSample(
Attributes attributes,
List<LinkData> parentLinks
) {
return getSampler().shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
return getSampler(attributes).shouldSample(parentContext, traceId, name, spanKind, attributes, parentLinks);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,17 @@
import org.opensearch.telemetry.TelemetrySettings;
import org.opensearch.test.OpenSearchTestCase;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.sdk.internal.AttributesMap;
import io.opentelemetry.sdk.trace.samplers.Sampler;

import static org.opensearch.telemetry.OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_ENABLED_SETTING;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY;
import static org.opensearch.telemetry.TelemetrySettings.TRACER_SAMPLER_PROBABILITY;

public class ProbabilisticSamplerTests extends OpenSearchTestCase {
Expand All @@ -35,11 +40,12 @@ public void testDefaultGetSampler() {
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
);
AttributesMap attributes = AttributesMap.create(1, 100);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);

assertNotNull(probabilisticSampler.getSampler());
assertNotNull(probabilisticSampler.getSampler(attributes));
assertEquals(0.01, probabilisticSampler.getSamplingRatio(), 0.0d);
}

Expand All @@ -49,6 +55,7 @@ public void testGetSamplerWithUpdatedSamplingRatio() {
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING))
);
AttributesMap attributes = AttributesMap.create(1, 100);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);
Expand All @@ -57,8 +64,45 @@ public void testGetSamplerWithUpdatedSamplingRatio() {
telemetrySettings.setSamplingProbability(0.02);

// Need to call getSampler() to update the value of tracerHeadSamplerSamplingRatio
Sampler updatedProbabilisticSampler = probabilisticSampler.getSampler();
Sampler updatedProbabilisticSampler = probabilisticSampler.getSampler(attributes);
assertEquals(0.02, probabilisticSampler.getSamplingRatio(), 0.0d);
}

public void testGetSamplerWithCustomActionSamplingRatio() {
Settings settings = Settings.builder().put(TRACER_EXPORTER_DELAY_SETTING.getKey(), "1s").build();
TelemetrySettings telemetrySettings = new TelemetrySettings(
Settings.EMPTY,
new ClusterSettings(settings, Set.of(TRACER_SAMPLER_PROBABILITY, TRACER_ENABLED_SETTING, TRACER_SAMPLER_ACTION_PROBABILITY))
);
telemetrySettings.setSamplingProbability(0.02);
AttributesMap attributes = AttributesMap.create(1, 100);

// Probabilistic Sampler
ProbabilisticSampler probabilisticSampler = new ProbabilisticSampler(telemetrySettings);

Map<String, Double> filters = new HashMap<>();
// Setting 100% sampling for indexing request
filters.put("indexing", 1.00);
// Setting 50% sampling rate for query request
filters.put("query", 0.5);
telemetrySettings.setActionSamplingProbability(filters);

attributes.put(AttributeKey.stringKey("action"), "indexing");
probabilisticSampler.getSampler(attributes);

// Validates sampling probability for general request.
assertEquals(0.02, probabilisticSampler.getActionSamplingRatio("dummy"), 0.0d);
// Validates sampling probability for indexing request as override is present for it.
assertEquals(1.00, probabilisticSampler.getActionSamplingRatio("indexing"), 0.0d);
// Validates sampling probability for query request as override is present for it.
assertEquals(0.02, probabilisticSampler.getActionSamplingRatio("query"), 0.0d);

filters.put("indexing", 0.30);
telemetrySettings.setActionSamplingProbability(filters);
// Need to call getSampler() to update the value of tracerHeadSamplerSamplingRatio
probabilisticSampler.getSampler(attributes);
// Validates sampling probability for indexing request as override is present for it.
assertEquals(0.30, probabilisticSampler.getActionSamplingRatio("indexing"), 0.0d);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,10 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
),
List.of(FeatureFlags.TELEMETRY),
List.of(TelemetrySettings.TRACER_ENABLED_SETTING, TelemetrySettings.TRACER_SAMPLER_PROBABILITY)
List.of(
TelemetrySettings.TRACER_ENABLED_SETTING,
TelemetrySettings.TRACER_SAMPLER_PROBABILITY,
TelemetrySettings.TRACER_SAMPLER_ACTION_PROBABILITY
)
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

import java.util.HashMap;
import java.util.Map;

/**
* Wrapper class to encapsulate tracing related settings
*
Expand All @@ -39,15 +42,27 @@ public class TelemetrySettings {
Setting.Property.Dynamic
);

/**
* Probability of action based sampler
*/
public static final Setting.AffixSetting<Double> TRACER_SAMPLER_ACTION_PROBABILITY = Setting.affixKeySetting(
"telemetry.tracer.sampler.",
"probability",
(ns, key) -> Setting.doubleSetting(key, 0.00d, 0.00d, 1.00d, Setting.Property.Dynamic, Setting.Property.NodeScope)
);

private volatile boolean tracingEnabled;
private volatile double samplingProbability;
private volatile Map<String, Double> affixSamplingProbability;

public TelemetrySettings(Settings settings, ClusterSettings clusterSettings) {
this.tracingEnabled = TRACER_ENABLED_SETTING.get(settings);
this.samplingProbability = TRACER_SAMPLER_PROBABILITY.get(settings);
this.affixSamplingProbability = new HashMap<>();

clusterSettings.addSettingsUpdateConsumer(TRACER_ENABLED_SETTING, this::setTracingEnabled);
clusterSettings.addSettingsUpdateConsumer(TRACER_SAMPLER_PROBABILITY, this::setSamplingProbability);
clusterSettings.addAffixMapUpdateConsumer(TRACER_SAMPLER_ACTION_PROBABILITY, this::setActionSamplingProbability, (a, b) -> {});
}

public void setTracingEnabled(boolean tracingEnabled) {
Expand All @@ -66,10 +81,44 @@ public void setSamplingProbability(double samplingProbability) {
this.samplingProbability = samplingProbability;
}

/**
* Set sampling ratio for action
* @param filters map of action and corresponding value
*/
public void setActionSamplingProbability(Map<String, Double> filters) {
synchronized (this) {
for (String name : filters.keySet()) {
this.affixSamplingProbability.put(name, filters.get(name));
}
}
}

/**
* Get sampling ratio
* @return double
*/
public double getSamplingProbability() {
return samplingProbability;
}

/**
* Returns if sampling override is set for action
* @param action string
* @return boolean if sampling override is present for an action
*/
public boolean isActionSamplingOverrideSet(String action) {
return affixSamplingProbability.containsKey(action);
}

/**
* Get action sampling ratio
* @param action string
* @return double value of sampling probability for that action
*/
public double getActionSamplingProbability(String action) {
if (affixSamplingProbability.containsKey(action)) {
return this.affixSamplingProbability.get(action);
}
return samplingProbability;
}
}

0 comments on commit 09f3c9b

Please sign in to comment.