diff --git a/extensions/aws/src/main/java/io/opentelemetry/extension/aws/AwsXrayPropagator.java b/extensions/aws/src/main/java/io/opentelemetry/extension/aws/AwsXrayPropagator.java index 928cc68af99..1ffb275d2a3 100644 --- a/extensions/aws/src/main/java/io/opentelemetry/extension/aws/AwsXrayPropagator.java +++ b/extensions/aws/src/main/java/io/opentelemetry/extension/aws/AwsXrayPropagator.java @@ -5,6 +5,9 @@ package io.opentelemetry.extension.aws; +import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.baggage.BaggageBuilder; +import io.opentelemetry.api.baggage.BaggageEntry; import io.opentelemetry.api.internal.StringUtils; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; @@ -18,6 +21,7 @@ import io.opentelemetry.context.propagation.TextMapSetter; import java.util.Collection; import java.util.Collections; +import java.util.function.BiConsumer; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -107,19 +111,49 @@ public void inject(Context context, @Nullable C carrier, TextMapSetter se char samplingFlag = spanContext.isSampled() ? IS_SAMPLED : NOT_SAMPLED; // TODO: Add OT trace state to the X-Ray trace header - String traceHeader = - TRACE_ID_KEY - + KV_DELIMITER - + xrayTraceId - + TRACE_HEADER_DELIMITER - + PARENT_ID_KEY - + KV_DELIMITER - + parentId - + TRACE_HEADER_DELIMITER - + SAMPLED_FLAG_KEY - + KV_DELIMITER - + samplingFlag; - setter.set(carrier, TRACE_HEADER_KEY, traceHeader); + StringBuilder traceHeader = new StringBuilder(); + traceHeader + .append(TRACE_ID_KEY) + .append(KV_DELIMITER) + .append(xrayTraceId) + .append(TRACE_HEADER_DELIMITER) + .append(PARENT_ID_KEY) + .append(KV_DELIMITER) + .append(parentId) + .append(TRACE_HEADER_DELIMITER) + .append(SAMPLED_FLAG_KEY) + .append(KV_DELIMITER) + .append(samplingFlag); + + Baggage baggage = Baggage.fromContext(context); + // Truncate baggage to 256 chars per X-Ray spec. + baggage.forEach( + new BiConsumer() { + + private int baggageWrittenBytes; + + @Override + public void accept(String key, BaggageEntry entry) { + if (key.equals(TRACE_ID_KEY) + || key.equals(PARENT_ID_KEY) + || key.equals(SAMPLED_FLAG_KEY)) { + return; + } + // Size is key/value pair, excludes delimiter. + int size = key.length() + entry.getValue().length() + 1; + if (baggageWrittenBytes + size > 256) { + return; + } + traceHeader + .append(TRACE_HEADER_DELIMITER) + .append(key) + .append(KV_DELIMITER) + .append(entry.getValue()); + baggageWrittenBytes += size; + } + }); + + setter.set(carrier, TRACE_HEADER_KEY, traceHeader.toString()); } @Override @@ -131,25 +165,23 @@ public Context extract(Context context, @Nullable C carrier, TextMapGetter SpanContext getSpanContextFromHeader( - @Nullable C carrier, TextMapGetter getter) { + private static Context getContextFromHeader( + Context context, @Nullable C carrier, TextMapGetter getter) { String traceHeader = getter.get(carrier, TRACE_HEADER_KEY); if (traceHeader == null || traceHeader.isEmpty()) { - return SpanContext.getInvalid(); + return context; } String traceId = TraceId.getInvalid(); String spanId = SpanId.getInvalid(); Boolean isSampled = false; + BaggageBuilder baggage = null; + int baggageReadBytes = 0; + int pos = 0; while (pos < traceHeader.length()) { int delimiterIndex = traceHeader.indexOf(TRACE_HEADER_DELIMITER, pos); @@ -165,11 +197,8 @@ private static SpanContext getSpanContextFromHeader( String trimmedPart = part.trim(); int equalsIndex = trimmedPart.indexOf(KV_DELIMITER); if (equalsIndex < 0) { - logger.fine( - "Error parsing X-Ray trace header. Invalid key value pair: " - + part - + " Returning INVALID span context."); - return SpanContext.getInvalid(); + logger.fine("Error parsing X-Ray trace header. Invalid key value pair: " + part); + return context; } String value = trimmedPart.substring(equalsIndex + 1); @@ -180,8 +209,13 @@ private static SpanContext getSpanContextFromHeader( spanId = parseSpanId(value); } else if (trimmedPart.startsWith(SAMPLED_FLAG_KEY)) { isSampled = parseTraceFlag(value); + } else if (baggageReadBytes + trimmedPart.length() <= 256) { + if (baggage == null) { + baggage = Baggage.builder(); + } + baggage.put(trimmedPart.substring(0, equalsIndex), value); + baggageReadBytes += trimmedPart.length(); } - // TODO: Put the arbitrary TraceHeader keys in OT trace state } if (isSampled == null) { logger.fine( @@ -189,15 +223,22 @@ private static SpanContext getSpanContextFromHeader( + TRACE_HEADER_KEY + "' with value " + traceHeader - + "'. Returning INVALID span context."); - return SpanContext.getInvalid(); + + "'."); + return context; } - return SpanContext.createFromRemoteParent( - StringUtils.padLeft(traceId, TraceId.getLength()), - spanId, - isSampled ? TraceFlags.getSampled() : TraceFlags.getDefault(), - TraceState.getDefault()); + SpanContext spanContext = + SpanContext.createFromRemoteParent( + StringUtils.padLeft(traceId, TraceId.getLength()), + spanId, + isSampled ? TraceFlags.getSampled() : TraceFlags.getDefault(), + TraceState.getDefault()); + + context = context.with(Span.wrap(spanContext)); + if (baggage != null) { + context = context.with(baggage.build()); + } + return context; } private static String parseTraceId(String xrayTraceId) { diff --git a/extensions/aws/src/test/java/io/opentelemetry/extension/aws/AwsXrayPropagatorTest.java b/extensions/aws/src/test/java/io/opentelemetry/extension/aws/AwsXrayPropagatorTest.java index ed63dbd00b0..3ef7f16e08b 100644 --- a/extensions/aws/src/test/java/io/opentelemetry/extension/aws/AwsXrayPropagatorTest.java +++ b/extensions/aws/src/test/java/io/opentelemetry/extension/aws/AwsXrayPropagatorTest.java @@ -8,6 +8,7 @@ import static io.opentelemetry.extension.aws.AwsXrayPropagator.TRACE_HEADER_KEY; import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.TraceFlags; @@ -18,6 +19,8 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; import org.junit.jupiter.api.Test; @@ -74,6 +77,63 @@ void inject_NotSampledContext() { "Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=0"); } + @Test + void inject_WithBaggage() { + Map carrier = new LinkedHashMap<>(); + xrayPropagator.inject( + withSpanContext( + SpanContext.create( + TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault()), + Context.current()) + .with( + Baggage.builder() + .put("cat", "meow") + .put("dog", "bark") + .put("Root", "ignored") + .put("Parent", "ignored") + .put("Sampled", "ignored") + .build()), + carrier, + setter); + + assertThat(carrier) + .containsEntry( + TRACE_HEADER_KEY, + "Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=0;" + + "cat=meow;dog=bark"); + } + + @Test + void inject_WithBaggage_LimitTruncates() { + Map carrier = new LinkedHashMap<>(); + // Limit is 256 characters for all baggage. We add a 254-character key/value pair and a + // 3 character key value pair. + String key1 = Stream.generate(() -> "a").limit(252).collect(Collectors.joining()); + String value1 = "a"; // 252 + 1 (=) + 1 = 254 + + String key2 = "b"; + String value2 = "b"; // 1 + 1 (=) + 1 = 3 + + Baggage baggage = Baggage.builder().put(key1, value1).put(key2, value2).build(); + + xrayPropagator.inject( + withSpanContext( + SpanContext.create( + TRACE_ID, SPAN_ID, TraceFlags.getDefault(), TraceState.getDefault()), + Context.current()) + .with(baggage), + carrier, + setter); + + assertThat(carrier) + .containsEntry( + TRACE_HEADER_KEY, + "Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=0;" + + key1 + + '=' + + value1); + } + @Test void inject_WithTraceState() { Map carrier = new LinkedHashMap<>(); @@ -88,7 +148,8 @@ void inject_WithTraceState() { carrier, setter); - // TODO: assert trace state when the propagator supports it + // TODO: assert trace state when the propagator supports it, for general key/value pairs we are + // mapping with baggage. assertThat(carrier) .containsEntry( TRACE_HEADER_KEY, @@ -168,11 +229,43 @@ void extract_AdditionalFields() { TRACE_HEADER_KEY, "Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=1;Foo=Bar"); - // TODO: assert additional fields when the propagator supports it - assertThat(getSpanContext(xrayPropagator.extract(Context.current(), carrier, getter))) + Context context = xrayPropagator.extract(Context.current(), carrier, getter); + assertThat(getSpanContext(context)) + .isEqualTo( + SpanContext.createFromRemoteParent( + TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault())); + assertThat(Baggage.fromContext(context).getEntryValue("Foo")).isEqualTo("Bar"); + } + + @Test + void extract_Baggage_LimitTruncates() { + // Limit is 256 characters for all baggage. We add a 254-character key/value pair and a + // 3 character key value pair. + String key1 = Stream.generate(() -> "a").limit(252).collect(Collectors.joining()); + String value1 = "a"; // 252 + 1 (=) + 1 = 254 + + String key2 = "b"; + String value2 = "b"; // 1 + 1 (=) + 1 = 3 + + Map carrier = new LinkedHashMap<>(); + carrier.put( + TRACE_HEADER_KEY, + "Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=53995c3f42cd8ad8;Sampled=1;" + + key1 + + '=' + + value1 + + ';' + + key2 + + '=' + + value2); + + Context context = xrayPropagator.extract(Context.current(), carrier, getter); + assertThat(getSpanContext(context)) .isEqualTo( SpanContext.createFromRemoteParent( TRACE_ID, SPAN_ID, TraceFlags.getSampled(), TraceState.getDefault())); + assertThat(Baggage.fromContext(context).getEntryValue(key1)).isEqualTo(value1); + assertThat(Baggage.fromContext(context).getEntryValue(key2)).isNull(); } @Test