diff --git a/RATIONALE.md b/RATIONALE.md new file mode 100644 index 00000000..380ce514 --- /dev/null +++ b/RATIONALE.md @@ -0,0 +1,55 @@ +# zipkin-reporter rationale + +## Sending an empty list is permitted + +Historically, we had a `Sender.check()` function for fail fast reasons, but it was rarely used and +rarely implemented correctly. In some cases, people returned `OK` having no knowledge of if the +health was good or not. In one case, Stackdriver, a seemingly good implementation was avoided for +directly sending an empty list of spans, until `check()` was changed to do the same. Rather than +define a poorly implementable `Sender.check()` which would likely still require sending an empty +list, we decided to document a call to send no spans should pass through. + +Two known examples of using `check()` were in server modules that forward spans with zipkin reporter +and finagle. `zipkin-finagle` is no longer maintained, so we'll focus on the server modules. + +zipkin-stackdriver (now zipkin-gcp) was both important to verify and difficult to implement a +meaningful `check()`. First attempts looked good, but would pass even when users had no permission +to write spans. For this reason, people ignored the check and did out-of-band sending zero spans to +the POST endpoint. Later, this logic was made the primary impl of `check()`. + +In HTTP senders a `check()` would be invalid for non-intuitive reasons unless you also just posted +no spans. For example, while zipkin has a `/health` endpoint, most clones do not implement that or +put it at a different path. So, you can't check with `/health` and are left with either falsely +returning `OK` or sending an empty list of spans. + +Note that zipkin server does obviate calls to storage when incoming lists are empty. This is not +just for things like this, but 3rd party instrumentation which bugged out and sent no spans. + +Messaging senders came close to implementing health except would suffer similar problems as +Stackdriver did. For example, verifying broker connectivity doesn't mean the queue or topic works. +While you can dig around and solve this for some brokers, it ends up the same situation. + +Another way could be to catch an exception from a prior "POST", and if that failed, return a +corresponding status. This could not be for fail-fast because the caller wouldn't have any spans to +send, yet. It is complicated code for a function uncommon in instrumentation and the impl would be +hard to reason with concurrently. + +The main problem is that we used the same `Component` type in reporter as we did for zipkin server, +which defined `check()` in a hardly used and hardly implementable way except sending no spans. + +We had the following choices: + +* force implementation of `check()` knowing its problems and that it is usual in instrumentation +* document that implementors can skip `send(empty)` even though call sites use this today +* document that you should not skip `send(empty)`, so that the few callers can use it for fail-fast + +The main driving points were how niche this function is (not called by many, or on interval), +and how much work it is to implement a `check()` vs allowing an empty send to proceed. In the +current code base, the only work required for the latter was documentation, as all senders would +pass an empty list. Secondary driving force was that the `BytesMessageSender` main goal is easier +implementation and re-introducing a bad `check()` api gets in the way of this. + +Due to the complexity of this problem, we decided that rather to leave empty undefined, document +sending empty is ok. This allows a couple users to implement a fail-fast in a portable way, without +burdening implementers of `BytesMessageSender` with an unimplementable or wrong `check()` function +for most platforms. diff --git a/README.md b/README.md index 5a830ece..64c21252 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Reporter.CONSOLE.report(span); ## AsyncReporter AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second -before flushes any pending spans out of process via a Sender. +before flushes any pending spans out of process via a BytesMessageSender. ```java reporter = AsyncReporter.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans")); @@ -67,7 +67,7 @@ Here are the most important properties to understand when tuning. Property | Description --- | --- `queuedMaxBytes` | Maximum backlog of span bytes reported vs sent. Corresponds to `ReporterMetrics.updateQueuedBytes`. Default 1% of heap -`messageMaxBytes` | Maximum bytes sendable per message including overhead. Default `500,000` bytes (`500KB`). Defined by `Sender.messageMaxBytes` +`messageMaxBytes` | Maximum bytes sendable per message including overhead. Default `500,000` bytes (`500KB`). Defined by `BytesMessageSender.messageMaxBytes` `messageTimeout` | Maximum time to wait for messageMaxBytes to accumulate before sending. Default 1 second `closeTimeout` | Maximum time to block for in-flight spans to send on close. Default 1 second @@ -84,11 +84,12 @@ by a large `messageTimeout` or `messageMaxBytes`. Consider lowering the `messageMaxBytes` if this occurs, as it will result in less work per message. -## Sender +## BytesMessageSender The sender component handles the last step of sending a list of encoded spans onto a transport. -This involves I/O, so you can call `Sender.check()` to check its health on a given frequency. +This involves I/O, so you can call `sender.send(Collections.emptyList())` to check it works before +using. -Sender is used by AsyncReporter, but you can also create your own if you need to. +BytesMessageSender is used by AsyncReporter, but you can also create your own if you need to. ```java class CustomReporter implements Flushable { @@ -99,7 +100,12 @@ class CustomReporter implements Flushable { // Is the connection healthy? public boolean ok() { - return sender.check().ok(); + try { + sender.send(Collections.emptyList()); + return true; + } catch (Exception e) { + return false; + } } public void report(Span span) { @@ -113,7 +119,7 @@ class CustomReporter implements Flushable { pending.drainTo(drained); if (drained.isEmpty()) return; - sender.sendSpans(drained, callback).execute(); + sender.send(drained); } ``` diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml index 66d7c8c4..b6a1e50f 100644 --- a/activemq-client/pom.xml +++ b/activemq-client/pom.xml @@ -18,7 +18,7 @@ zipkin-reporter-parent io.zipkin.reporter2 - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT 4.0.0 diff --git a/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java b/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java index 6735271a..dedfa3e0 100644 --- a/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java +++ b/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java @@ -19,7 +19,9 @@ import javax.jms.JMSException; import javax.jms.QueueSender; import org.apache.activemq.ActiveMQConnectionFactory; +import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.BytesMessageEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; import zipkin2.reporter.CheckResult; @@ -32,7 +34,7 @@ * *

Usage

*

- * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async * reporter}. * *

Here's a simple configuration, configured for json: @@ -150,13 +152,33 @@ public final ActiveMQSender build() { return encoding.listSizeInBytes(encodedSpans); } - @Override public Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); byte[] message = encoder.encode(encodedSpans); return new ActiveMQCall(message); } - @Override public CheckResult check() { + /** {@inheritDoc} */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + send(encoder.encode(encodedSpans)); + } + + void send(byte[] message) throws IOException { + try { + ActiveMQConn conn = lazyInit.get(); + QueueSender sender = conn.sender; + BytesMessage bytesMessage = conn.session.createBytesMessage(); + bytesMessage.writeBytes(message); + sender.send(bytesMessage); + } catch (JMSException e) { + throw ioException("Unable to send message: ", e); + } + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { lazyInit.get(); } catch (Throwable t) { @@ -171,7 +193,7 @@ public final ActiveMQSender build() { lazyInit.close(); } - @Override public final String toString() { + @Override public String toString() { return "ActiveMQSender{" + "brokerURL=" + lazyInit.connectionFactory.getBrokerURL() + ", queue=" + lazyInit.queue @@ -186,29 +208,17 @@ final class ActiveMQCall extends Call.Base { // ActiveMQCall is not cancel } @Override protected Void doExecute() throws IOException { - send(); + send(message); return null; } - void send() throws IOException { - try { - ActiveMQConn conn = lazyInit.get(); - QueueSender sender = conn.sender; - BytesMessage bytesMessage = conn.session.createBytesMessage(); - bytesMessage.writeBytes(message); - sender.send(bytesMessage); - } catch (JMSException e) { - throw ioException("Unable to send message: ", e); - } - } - @Override public Call clone() { return new ActiveMQCall(message); } @Override protected void doEnqueue(Callback callback) { try { - send(); + send(message); callback.onSuccess(null); } catch (Throwable t) { Call.propagateIfFatal(t); diff --git a/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java b/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java index 4a23094e..f363e9e7 100644 --- a/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java +++ b/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java @@ -14,6 +14,7 @@ package zipkin2.reporter.activemq; import java.io.IOException; +import java.util.Collections; import java.util.stream.Stream; import javax.jms.BytesMessage; import javax.jms.MessageConsumer; @@ -25,12 +26,10 @@ import org.testcontainers.junit.jupiter.Testcontainers; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; +import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; @@ -43,55 +42,46 @@ class ITActiveMQSender { @Container ActiveMQContainer activemq = new ActiveMQContainer(); - @Test void checkPasses() { + @Test void emptyOk() throws Exception { try (ActiveMQSender sender = activemq.newSenderBuilder("checkPasses").build()) { - assertThat(sender.check().ok()).isTrue(); - } - } - - @Test void checkFalseWhenBrokerIsDown() { - // we can be pretty certain ActiveMQ isn't running on localhost port 80 - try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) { - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()).isInstanceOf(IOException.class); + sender.send(Collections.emptyList()); } } @Test void sendFailsWithInvalidActiveMqServer() { // we can be pretty certain ActiveMQ isn't running on localhost port 80 try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) { - assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf( - IOException.class) + assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(IOException.class) .hasMessageContaining("Unable to establish connection to ActiveMQ broker"); } } - @Test void sendsSpans() throws Exception { - try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans").build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void send() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("send").build()) { + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly( CLIENT_SPAN, CLIENT_SPAN); } } - @Test void sendsSpans_PROTO3() throws Exception { - try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_PROTO3") + @Test void send_PROTO3() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("send_PROTO3") .encoding(Encoding.PROTO3) .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage(sender))).containsExactly( CLIENT_SPAN, CLIENT_SPAN); } } - @Test void sendsSpans_THRIFT() throws Exception { - try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_THRIFT") + @Test void send_THRIFT() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("send_THRIFT") .encoding(Encoding.THRIFT) .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage(sender))).containsExactly( CLIENT_SPAN, CLIENT_SPAN); @@ -101,16 +91,16 @@ class ITActiveMQSender { @Test void illegalToSendWhenClosed() { try (ActiveMQSender sender = activemq.newSenderBuilder("illegalToSendWhenClosed").build()) { sender.close(); - assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf( + assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN)).isInstanceOf( IllegalStateException.class); } } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySummaryInformation() { try (ActiveMQSender sender = activemq.newSenderBuilder("toString").build()) { @@ -119,7 +109,7 @@ class ITActiveMQSender { } } - Call send(ActiveMQSender sender, Span... spans) { + void send(ActiveMQSender sender, Span... spans) throws IOException { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -134,7 +124,7 @@ Call send(ActiveMQSender sender, Span... spans) { default: throw new UnsupportedOperationException("encoding: " + sender.encoding()); } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } byte[] readMessage(ActiveMQSender sender) throws Exception { diff --git a/amqp-client/pom.xml b/amqp-client/pom.xml index fc694056..d2323453 100644 --- a/amqp-client/pom.xml +++ b/amqp-client/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-amqp-client diff --git a/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java b/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java index baed1fa1..490ee764 100644 --- a/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java +++ b/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java @@ -21,7 +21,9 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; +import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.BytesMessageEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; import zipkin2.reporter.CheckResult; @@ -36,7 +38,7 @@ * *

Usage

*

- * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async * reporter}. * *

Here's a simple configuration, configured for json: @@ -69,7 +71,7 @@ * RabbitMQ failure. * *

This sender is thread-safe: a channel is created for each thread that calls - * {@link #sendSpans(List)}. + * {@link #send(List)}. */ public final class RabbitMQSender extends Sender { /** Creates a sender that sends {@link Encoding#JSON} messages. */ @@ -213,15 +215,25 @@ public Builder toBuilder() { return encoding.listSizeInBytes(encodedSizeInBytes); } - /** This sends all of the spans as a single message. */ - @Override public Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); byte[] message = encoder.encode(encodedSpans); return new RabbitMQCall(message); } - /** Ensures there are no connection issues. */ - @Override public CheckResult check() { + /** {@inheritDoc} */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + publish(encoder.encode(encodedSpans)); + } + + void publish(byte[] message) throws IOException { + localChannel().basicPublish("", queue, null, message); + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { if (localChannel().isOpen()) return CheckResult.OK; throw new IllegalStateException("Not Open"); @@ -266,7 +278,7 @@ Connection newConnection() { final ThreadLocal CHANNEL = new ThreadLocal(); /** - * In most circumstances there will only be one thread calling {@link #sendSpans(List)}, the + * In most circumstances there will only be one thread calling {@link #send(List)}, the * {@link AsyncReporter}. Just in case someone is flushing manually, we use a thread-local. All of * this is to avoid recreating a channel for each publish, as that costs two additional network * roundtrips. @@ -288,17 +300,13 @@ class RabbitMQCall extends Call.Base { // RabbitMQFuture is not cancelable } @Override protected Void doExecute() throws IOException { - publish(); + publish(message); return null; } - void publish() throws IOException { - localChannel().basicPublish("", queue, null, message); - } - @Override protected void doEnqueue(Callback callback) { try { - publish(); + publish(message); callback.onSuccess(null); } catch (Throwable t) { Call.propagateIfFatal(t); diff --git a/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java b/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java index 6cfe8674..0d4e4c2b 100644 --- a/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java +++ b/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java @@ -17,6 +17,8 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; +import java.io.IOException; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -28,9 +30,8 @@ import org.testcontainers.junit.jupiter.Testcontainers; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.Call; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; @@ -43,34 +44,40 @@ public class ITRabbitMQSender { // public for use in src/it @Container RabbitMQContainer rabbit = new RabbitMQContainer(); - @Test void sendsSpans() throws Exception { - try (RabbitMQSender sender = rabbit.newSenderBuilder("sendsSpans").build()) { + @Test void emptyOk() throws Exception { + try (RabbitMQSender sender = rabbit.newSenderBuilder("emptyOk").build()) { + sender.send(Collections.emptyList()); + } + } + + @Test void send() throws Exception { + try (RabbitMQSender sender = rabbit.newSenderBuilder("send").build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } } - @Test void sendsSpans_PROTO3() throws Exception { - try (RabbitMQSender sender = rabbit.newSenderBuilder("sendsSpans_PROTO3") + @Test void send_PROTO3() throws Exception { + try (RabbitMQSender sender = rabbit.newSenderBuilder("send_PROTO3") .encoding(Encoding.PROTO3) .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage(sender))) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } } - @Test void sendsSpans_configuredQueueDoesntExist() throws Exception { + @Test void send_configuredQueueDoesntExist() throws Exception { try (RabbitMQSender sender = rabbit.newSenderBuilder("ignored") - .queue("sendsSpans_configuredQueueDoesntExist") + .queue("send_configuredQueueDoesntExist") .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); // doesn't raise exception + send(sender, CLIENT_SPAN, CLIENT_SPAN); // doesn't raise exception } } @@ -78,7 +85,7 @@ public class ITRabbitMQSender { // public for use in src/it try (RabbitMQSender sender = rabbit.newSenderBuilder("shouldCloseRabbitMQConnectionOnClose") .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); sender.close(); @@ -88,10 +95,10 @@ public class ITRabbitMQSender { // public for use in src/it } /** Blocks until the callback completes to allow read-your-writes consistency during tests. */ - static Call send(Sender sender, Span... spans) { + static void send(BytesMessageSender sender, Span... spans) throws IOException { SpanBytesEncoder bytesEncoder = sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } byte[] readMessage(RabbitMQSender sender) throws Exception { diff --git a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java index ec46540c..ca223f46 100644 --- a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java +++ b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java @@ -15,9 +15,8 @@ import org.junit.jupiter.api.Test; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ClosedSenderException; -import zipkin2.reporter.Sender; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -29,11 +28,14 @@ class RabbitMQSenderTest { RabbitMQSender sender = RabbitMQSender.newBuilder() .connectionTimeout(100).addresses("localhost:80").build(); - @Test void checkFalseWhenRabbitMQIsDown() { - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()) - .isInstanceOf(RuntimeException.class); + @Test void sendFailsWhenRabbitMQIsDown() { + // We can be pretty certain RabbitMQ isn't running on localhost port 80 + RabbitMQSender sender = RabbitMQSender.newBuilder() + .connectionTimeout(100).addresses("localhost:80").build(); + + assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Unable to establish connection to RabbitMQ server"); } @Test void illegalToSendWhenClosed() throws Exception { @@ -44,10 +46,10 @@ class RabbitMQSenderTest { } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySummaryInformation() { assertThat(sender).hasToString( diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 4f12a332..07524801 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT benchmarks diff --git a/benchmarks/src/main/java/zipkin2/reporter/HttpSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/HttpSenderBenchmarks.java index 74a7c3ed..37beea53 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/HttpSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/HttpSenderBenchmarks.java @@ -26,7 +26,7 @@ public abstract class HttpSenderBenchmarks extends SenderBenchmarks { Server server; - @Override protected Sender createSender() { + @Override protected BytesMessageSender createSender() { Route v2JsonSpans = Route.builder().methods(POST).consumes(JSON).path("/api/v2/spans").build(); server = Server.builder() .http(0) @@ -37,7 +37,7 @@ public abstract class HttpSenderBenchmarks extends SenderBenchmarks { return newHttpSender(url("/api/v2/spans")); } - abstract Sender newHttpSender(String endpoint); + abstract BytesMessageSender newHttpSender(String endpoint); @Override protected void afterSenderClose() { server.stop().join(); diff --git a/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java index 52d20f2d..b8805911 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java @@ -57,7 +57,7 @@ String bootstrapServer() { KafkaContainer kafka; KafkaConsumer consumer; - @Override protected Sender createSender() { + @Override protected BytesMessageSender createSender() { kafka = new KafkaContainer(); kafka.start(); diff --git a/benchmarks/src/main/java/zipkin2/reporter/OkHttpSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/OkHttpSenderBenchmarks.java index 55b14df7..4f607873 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/OkHttpSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/OkHttpSenderBenchmarks.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,7 +21,7 @@ public class OkHttpSenderBenchmarks extends HttpSenderBenchmarks { - @Override Sender newHttpSender(String endpoint) { + @Override BytesMessageSender newHttpSender(String endpoint) { return OkHttpSender.create(endpoint); } diff --git a/benchmarks/src/main/java/zipkin2/reporter/URLConnectionSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/URLConnectionSenderBenchmarks.java index 789e8693..2dbf9c7a 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/URLConnectionSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/URLConnectionSenderBenchmarks.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,7 +21,7 @@ public class URLConnectionSenderBenchmarks extends HttpSenderBenchmarks { - @Override Sender newHttpSender(String endpoint) { + @Override BytesMessageSender newHttpSender(String endpoint) { return URLConnectionSender.create(endpoint); } diff --git a/benchmarks/src/main/java/zipkin2/reporter/amqp/RabbitMQSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/amqp/RabbitMQSenderBenchmarks.java index 0c2b5068..2ea1dbcb 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/amqp/RabbitMQSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/amqp/RabbitMQSenderBenchmarks.java @@ -16,42 +16,40 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import java.io.IOException; +import java.util.Collections; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; -import zipkin2.reporter.CheckResult; -import zipkin2.reporter.Sender; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.internal.SenderBenchmarks; public class RabbitMQSenderBenchmarks extends SenderBenchmarks { private Channel channel; - @Override protected Sender createSender() throws Exception { - RabbitMQSender result = RabbitMQSender.newBuilder() - .queue("zipkin-jmh") - .addresses("localhost:5672").build(); + @Override protected BytesMessageSender createSender() throws Exception { + RabbitMQSender sender = RabbitMQSender.newBuilder() + .queue("zipkin-jmh") + .addresses("localhost:5672").build(); - CheckResult check = result.check(); - if (!check.ok()) { - throw new IllegalStateException(check.error().getMessage(), check.error()); - } + // check sender works at all + sender.send(Collections.emptyList()); - channel = result.localChannel(); - channel.queueDelete(result.queue); - channel.queueDeclare(result.queue, false, true, true, null); + channel = sender.localChannel(); + channel.queueDelete(sender.queue); + channel.queueDeclare(sender.queue, false, true, true, null); Thread.sleep(500L); new Thread(() -> { try { - channel.basicConsume(result.queue, true, new DefaultConsumer(channel)); + channel.basicConsume(sender.queue, true, new DefaultConsumer(channel)); } catch (IOException e) { e.printStackTrace(); } }).start(); - return result; + return sender; } @Override protected void afterSenderClose() { @@ -61,8 +59,8 @@ public class RabbitMQSenderBenchmarks extends SenderBenchmarks { // Convenience main entry-point public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() - .include(".*" + RabbitMQSenderBenchmarks.class.getSimpleName() + ".*") - .build(); + .include(".*" + RabbitMQSenderBenchmarks.class.getSimpleName() + ".*") + .build(); new Runner(opt).run(); } diff --git a/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java b/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java index 7b444a68..f11afcb4 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java +++ b/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java @@ -15,21 +15,18 @@ import java.util.List; import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -final class NoopSender extends Sender { - - final Encoding encoding; +/** Encodes messages on {@link #send(List)}, but doesn't do anything else. */ +final class NoopSender extends BytesMessageSender.Base { final BytesMessageEncoder messageEncoder; /** close is typically called from a different thread */ volatile boolean closeCalled; NoopSender(Encoding encoding) { - this.encoding = encoding; + super(encoding); this.messageEncoder = BytesMessageEncoder.forEncoding(encoding); } @@ -37,25 +34,8 @@ final class NoopSender extends Sender { return Integer.MAX_VALUE; } - @Override public Encoding encoding() { - return encoding; - } - - @Override public int messageSizeInBytes(List encodedSpans) { - return encoding().listSizeInBytes(encodedSpans); - } - - @Override public int messageSizeInBytes(int encodedSizeInBytes) { - return encoding().listSizeInBytes(encodedSizeInBytes); - } - - @Override public Call sendSpans(List encodedSpans) { + @Override public void send(List encodedSpans) { messageEncoder.encode(encodedSpans); - return Call.create(null); - } - - @Override public CheckResult check() { - return CheckResult.OK; } @Override public void close() { diff --git a/benchmarks/src/main/java/zipkin2/reporter/internal/SenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/internal/SenderBenchmarks.java index f32c570d..2f460c28 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/internal/SenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/internal/SenderBenchmarks.java @@ -13,6 +13,7 @@ */ package zipkin2.reporter.internal; +import java.util.Collections; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; @@ -31,9 +32,9 @@ import org.openjdk.jmh.annotations.Warmup; import zipkin2.Span; import zipkin2.TestObjects; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.InMemoryReporterMetrics; -import zipkin2.reporter.Sender; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.SpanBytesEncoder; /** @@ -89,7 +90,7 @@ public void clean() { } } - Sender sender; + BytesMessageSender sender; AsyncReporter.BoundedAsyncReporter reporter; @@ -97,8 +98,8 @@ public void clean() { public void setup() throws Throwable { sender = createSender(); - CheckResult senderCheck = sender.check(); - if (!senderCheck.ok()) throw senderCheck.error(); + // check sender works at all + sender.send(Collections.emptyList()); reporter = (AsyncReporter.BoundedAsyncReporter) AsyncReporter.newBuilder(sender) .messageMaxBytes(messageMaxBytes) @@ -106,7 +107,7 @@ public void setup() throws Throwable { .metrics(metrics).build(SpanBytesEncoder.JSON_V2); } - protected abstract Sender createSender() throws Exception; + protected abstract BytesMessageSender createSender() throws Exception; @Setup(Level.Iteration) public void fillQueue() { diff --git a/bom/pom.xml b/bom/pom.xml index 57dbd94e..699e758e 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-bom Zipkin Reporter BOM - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT pom Bill Of Materials POM for all Zipkin reporter artifacts diff --git a/brave/pom.xml b/brave/pom.xml index bb4c0c61..13903a79 100644 --- a/brave/pom.xml +++ b/brave/pom.xml @@ -18,7 +18,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT 4.0.0 diff --git a/brave/src/it/no_zipkin_deps/src/test/java/zipkin2/reporter/brave/no_zipkin_deps/AsyncZipkinSpanHandlerTest.java b/brave/src/it/no_zipkin_deps/src/test/java/zipkin2/reporter/brave/no_zipkin_deps/AsyncZipkinSpanHandlerTest.java index 11e73ecd..8eff74fa 100644 --- a/brave/src/it/no_zipkin_deps/src/test/java/zipkin2/reporter/brave/no_zipkin_deps/AsyncZipkinSpanHandlerTest.java +++ b/brave/src/it/no_zipkin_deps/src/test/java/zipkin2/reporter/brave/no_zipkin_deps/AsyncZipkinSpanHandlerTest.java @@ -40,7 +40,7 @@ class AsyncZipkinSpanHandlerTest { OkHttpSender sender = OkHttpSender.newBuilder().endpoint(endpoint).compressionEnabled(false).build(); - @Test void sendsSpans() throws Exception { + @Test void send() throws Exception { server.enqueue(new MockResponse()); try (AsyncZipkinSpanHandler zipkinSpanHandler = AsyncZipkinSpanHandler.newBuilder(sender) diff --git a/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java b/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java index c52d7d20..f8594c2f 100644 --- a/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java +++ b/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; import zipkin2.reporter.Reporter; import zipkin2.reporter.ReporterMetrics; @@ -31,7 +32,7 @@ /** * A {@link brave.handler.SpanHandler} that queues spans on {@link #end} to bundle and send as a * bulk Zipkin JSON V2 message. When the {@link - * Sender} is HTTP, the endpoint is usually "http://zipkinhost:9411/api/v2/spans". + * BytesMessageSender} is HTTP, the endpoint is usually "http://zipkinhost:9411/api/v2/spans". * *

Example: *

{@code
@@ -46,12 +47,12 @@
  */
 public final class AsyncZipkinSpanHandler extends SpanHandler implements Closeable, Flushable {
   /** @since 2.14 */
-  public static AsyncZipkinSpanHandler create(Sender sender) {
+  public static AsyncZipkinSpanHandler create(BytesMessageSender sender) {
     return newBuilder(sender).build();
   }
 
   /** @since 2.14 */
-  public static Builder newBuilder(Sender sender) {
+  public static Builder newBuilder(BytesMessageSender sender) {
     if (sender == null) throw new NullPointerException("sender == null");
     return new Builder(sender);
   }
@@ -80,7 +81,7 @@ public static final class Builder extends ZipkinSpanHandler.Builder {
       this.errorTag = handler.errorTag;
     }
 
-    Builder(Sender sender) {
+    Builder(BytesMessageSender sender) {
       this.delegate = AsyncReporter.newBuilder(sender);
       this.encoding = sender.encoding();
     }
diff --git a/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java b/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java
index 575ee58c..482639ff 100644
--- a/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java
+++ b/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java
@@ -14,9 +14,21 @@
 package zipkin2.reporter.brave;
 
 import brave.handler.MutableSpan;
+import brave.handler.SpanHandler;
+import brave.propagation.TraceContext;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.junit.jupiter.api.Test;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.reporter.BytesEncoder;
+import zipkin2.reporter.Call;
+import zipkin2.reporter.Callback;
 import zipkin2.reporter.Encoding;
+import zipkin2.reporter.Sender;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -50,4 +62,106 @@ class AsyncZipkinSpanHandlerTest {
       assertThat(spanReporter).isNotNull();
     }
   }
+
+  @Test void example() {
+    AtomicInteger sentSpans = new AtomicInteger();
+    try (AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler.newBuilder(FakeSender.create()
+        .onSpans(spans -> sentSpans.addAndGet(spans.size())))
+      .messageTimeout(0, TimeUnit.MILLISECONDS) // no thread
+      .build()) {
+
+      TraceContext context = TraceContext.newBuilder().traceId(1).spanId(2).sampled(true).build();
+      MutableSpan span = new MutableSpan();
+      span.traceId("1");
+      span.id("2");
+      span.name("test");
+      spanHandler.end(context, span, SpanHandler.Cause.FINISHED);
+      spanHandler.flush();
+    }
+
+    assertThat(sentSpans.get()).isEqualTo(1);
+  }
+
+  @Deprecated @Test void example_deprecatedSender() {
+    AtomicInteger sentSpans = new AtomicInteger();
+    try (AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler.newBuilder(
+        new DeprecatedCheatingSender(
+          spans -> sentSpans.addAndGet(spans.size())
+        ))
+      .messageTimeout(0, TimeUnit.MILLISECONDS) // no thread
+      .build()) {
+
+      TraceContext context = TraceContext.newBuilder().traceId(1).spanId(2).sampled(true).build();
+      MutableSpan span = new MutableSpan();
+      span.traceId("1");
+      span.id("2");
+      span.name("test");
+      spanHandler.end(context, span, SpanHandler.Cause.FINISHED);
+      spanHandler.flush();
+    }
+
+    assertThat(sentSpans.get()).isEqualTo(1);
+  }
+
+  @Deprecated static class DeprecatedCheatingSender extends Sender {
+    final Consumer> onSpans;
+
+    DeprecatedCheatingSender(Consumer> onSpans) {
+      this.onSpans = onSpans;
+    }
+
+    @Override public Encoding encoding() {
+      return Encoding.JSON;
+    }
+
+    @Override public int messageMaxBytes() {
+      return 500;
+    }
+
+    @Override public int messageSizeInBytes(List encodedSpans) {
+      return Encoding.JSON.listSizeInBytes(encodedSpans);
+    }
+
+    @Override public int messageSizeInBytes(int encodedSizeInBytes) {
+      return Encoding.JSON.listSizeInBytes(encodedSizeInBytes);
+    }
+
+    @Override public Call sendSpans(List encodedSpans) {
+      List decoded = encodedSpans.stream()
+        .map(SpanBytesDecoder.JSON_V2::decodeOne).
+        collect(Collectors.toList());
+      return new CheatingVoidCall(onSpans, decoded);
+    }
+  }
+
+  @Deprecated static class CheatingVoidCall extends Call {
+    final Consumer> onSpans;
+    final List spans;
+
+    CheatingVoidCall(Consumer> onSpans, List spans) {
+      this.onSpans = onSpans;
+      this.spans = spans;
+    }
+
+    @Override public Void execute() {
+      onSpans.accept(spans);
+      return null;
+    }
+
+    @Override public void enqueue(Callback callback) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override public void cancel() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override public boolean isCanceled() {
+      return false;
+    }
+
+    @Override public Call clone() {
+      throw new UnsupportedOperationException();
+    }
+  }
 }
diff --git a/brave/src/test/java/zipkin2/reporter/brave/ConvertingSpanReporterTest.java b/brave/src/test/java/zipkin2/reporter/brave/ConvertingSpanReporterTest.java
index 27c4ece7..05c4f8c0 100644
--- a/brave/src/test/java/zipkin2/reporter/brave/ConvertingSpanReporterTest.java
+++ b/brave/src/test/java/zipkin2/reporter/brave/ConvertingSpanReporterTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2023 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -54,24 +54,24 @@ static class ListSpanReporter extends ArrayList implements Reporter
 
   @Test void generateKindMap() {
     assertThat(ConvertingSpanReporter.generateKindMap()).containsExactly(
-        entry(CLIENT, Span.Kind.CLIENT),
-        entry(SERVER, Span.Kind.SERVER),
-        entry(PRODUCER, Span.Kind.PRODUCER),
-        entry(CONSUMER, Span.Kind.CONSUMER)
+      entry(CLIENT, Span.Kind.CLIENT),
+      entry(SERVER, Span.Kind.SERVER),
+      entry(PRODUCER, Span.Kind.PRODUCER),
+      entry(CONSUMER, Span.Kind.CONSUMER)
     );
   }
 
   @Test void equalsAndHashCode() {
     assertThat(spanReporter)
-        .hasSameHashCodeAs(spans)
-        .isEqualTo(new ConvertingSpanReporter(spans, Tags.ERROR));
+      .hasSameHashCodeAs(spans)
+      .isEqualTo(new ConvertingSpanReporter(spans, Tags.ERROR));
 
     ConvertingSpanReporter otherReporter = new ConvertingSpanReporter(spans::add, Tags.ERROR);
 
     assertThat(spanReporter)
-        .isNotEqualTo(otherReporter)
-        .extracting(Objects::hashCode)
-        .isNotEqualTo(otherReporter.hashCode());
+      .isNotEqualTo(otherReporter)
+      .extracting(Objects::hashCode)
+      .isNotEqualTo(otherReporter.hashCode());
   }
 
   @Test void convertsSampledSpan() {
@@ -79,10 +79,10 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0)).usingRecursiveComparison().isEqualTo(
-        Span.newBuilder()
-            .traceId("1")
-            .id("2")
-            .build()
+      Span.newBuilder()
+        .traceId("1")
+        .id("2")
+        .build()
     );
   }
 
@@ -92,11 +92,11 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0)).usingRecursiveComparison().isEqualTo(
-        Span.newBuilder()
-            .traceId("0000000000000001")
-            .id("0000000000000002")
-            .debug(true)
-            .build()
+      Span.newBuilder()
+        .traceId("0000000000000001")
+        .id("0000000000000002")
+        .debug(true)
+        .build()
     );
   }
 
@@ -121,10 +121,10 @@ static class ListSpanReporter extends ArrayList implements Reporter
 
     spanReporter.report(span);
     assertThat(spans.get(0).tags()).containsOnly(
-        entry("1", "1"),
-        entry("foo", "baz"),
-        entry("2", "2"),
-        entry("3", "3")
+      entry("1", "1"),
+      entry("foo", "baz"),
+      entry("2", "2"),
+      entry("3", "3")
     );
   }
 
@@ -138,7 +138,7 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0).tags())
-        .containsOnly(entry("error", "RuntimeException"));
+      .containsOnly(entry("error", "RuntimeException"));
   }
 
   @Test void doesntOverwriteErrorTag() {
@@ -150,7 +150,7 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0).tags())
-        .containsOnly(entry("error", ""));
+      .containsOnly(entry("error", ""));
   }
 
   @Test void addsAnnotations() {
@@ -163,7 +163,7 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0).annotations())
-        .containsOnly(Annotation.create(2L, "foo"));
+      .containsOnly(Annotation.create(2L, "foo"));
   }
 
   @Test void finished_client() {
@@ -232,10 +232,10 @@ void flush(brave.Span.Kind braveKind, Span.Kind span2Kind) {
     MutableSpan span = new MutableSpan(context, null);
 
     Endpoint endpoint = Endpoint.newBuilder()
-        .serviceName("fooService")
-        .ip("1.2.3.4")
-        .port(80)
-        .build();
+      .serviceName("fooService")
+      .ip("1.2.3.4")
+      .port(80)
+      .build();
 
     span.kind(CLIENT);
     span.remoteServiceName(endpoint.serviceName());
@@ -246,7 +246,7 @@ void flush(brave.Span.Kind braveKind, Span.Kind span2Kind) {
     spanReporter.report(span);
 
     assertThat(spans.get(0).remoteEndpoint())
-        .isEqualTo(endpoint);
+      .isEqualTo(endpoint);
   }
 
   // This prevents the server startTimestamp from overwriting the client one on the collector
@@ -261,7 +261,7 @@ void flush(brave.Span.Kind braveKind, Span.Kind span2Kind) {
     spanReporter.report(span);
 
     assertThat(spans.get(0).shared())
-        .isTrue();
+      .isTrue();
   }
 
   @Test void flushUnstartedNeitherSetsTimestampNorDuration() {
@@ -271,6 +271,6 @@ void flush(brave.Span.Kind braveKind, Span.Kind span2Kind) {
     spanReporter.report(flushed);
 
     assertThat(spans.get(0)).extracting(Span::timestampAsLong, Span::durationAsLong)
-        .allSatisfy(u -> assertThat(u).isEqualTo(0L));
+      .allSatisfy(u -> assertThat(u).isEqualTo(0L));
   }
 }
diff --git a/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java b/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java
index 5d071c32..d9792361 100644
--- a/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java
+++ b/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java
@@ -21,13 +21,12 @@
 import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.reporter.BytesEncoder;
 import zipkin2.reporter.BytesMessageEncoder;
-import zipkin2.reporter.Call;
+import zipkin2.reporter.BytesMessageSender;
 import zipkin2.reporter.ClosedSenderException;
 import zipkin2.reporter.Encoding;
-import zipkin2.reporter.Sender;
 import zipkin2.reporter.SpanBytesEncoder;
 
-public final class FakeSender extends Sender {
+public final class FakeSender extends BytesMessageSender.Base {
 
   public static FakeSender create() {
     return new FakeSender(Encoding.JSON, Integer.MAX_VALUE,
@@ -36,7 +35,6 @@ public static FakeSender create() {
     });
   }
 
-  final Encoding encoding;
   final int messageMaxBytes;
   final BytesMessageEncoder messageEncoder;
   final BytesEncoder encoder;
@@ -45,7 +43,7 @@ public static FakeSender create() {
 
   FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder,
     BytesEncoder encoder, BytesDecoder decoder, Consumer> onSpans) {
-    this.encoding = encoding;
+    super(encoding);
     this.messageMaxBytes = messageMaxBytes;
     this.messageEncoder = messageEncoder;
     this.encoder = encoder;
@@ -60,30 +58,21 @@ FakeSender encoding(Encoding encoding) {
       onSpans);
   }
 
-  @Override public Encoding encoding() {
-    return encoding;
+  FakeSender onSpans(Consumer> onSpans) {
+    return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans);
   }
 
   @Override public int messageMaxBytes() {
     return messageMaxBytes;
   }
 
-  @Override public int messageSizeInBytes(List encodedSpans) {
-    return encoding.listSizeInBytes(encodedSpans);
-  }
-
-  @Override public int messageSizeInBytes(int encodedSizeInBytes) {
-    return encoding.listSizeInBytes(encodedSizeInBytes);
-  }
-
   /** close is typically called from a different thread */
   volatile boolean closeCalled;
 
-  @Override public Call sendSpans(List encodedSpans) {
+  @Override public void send(List encodedSpans) {
     if (closeCalled) throw new ClosedSenderException();
     List decoded = encodedSpans.stream().map(decoder::decodeOne).collect(Collectors.toList());
     onSpans.accept(decoded);
-    return Call.create(null);
   }
 
   @Override public void close() {
diff --git a/core/pom.xml b/core/pom.xml
index ce560f7c..640231d6 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -20,7 +20,7 @@
   
     io.zipkin.reporter2
     zipkin-reporter-parent
-    3.1.2-SNAPSHOT
+    3.2.0-SNAPSHOT
   
 
   zipkin-reporter
diff --git a/core/src/main/java/zipkin2/reporter/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/AsyncReporter.java
index a655afe4..733e85fa 100644
--- a/core/src/main/java/zipkin2/reporter/AsyncReporter.java
+++ b/core/src/main/java/zipkin2/reporter/AsyncReporter.java
@@ -13,6 +13,7 @@
  */
 package zipkin2.reporter;
 
+import java.io.Closeable;
 import java.io.Flushable;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
@@ -25,7 +26,7 @@
  *
  * 

Spans are bundled into messages based on size in bytes or a timeout, whichever happens first. * - *

The thread that sends flushes spans to the {@linkplain Sender} does so in a synchronous loop. + *

The thread that sends flushes spans to the {@linkplain BytesMessageSender} does so in a synchronous loop. * This means that even asynchronous transports will wait for an ack before sending a next message. * We do this so that a surge of spans doesn't overrun memory or bandwidth via hundreds or * thousands of in-flight messages. The downside of this is that reporting is limited in speed to @@ -34,21 +35,21 @@ * @param type of the span, usually {@link zipkin2.Span} */ // This is effectively, but not explicitly final as it was not final in version 2.x. -public class AsyncReporter extends Component implements Reporter, Flushable { +public class AsyncReporter extends Component implements Reporter, Closeable, Flushable { /** * Builds a json reporter for Zipkin V2. If http, * the endpoint of the sender is usually "http://zipkinhost:9411/api/v2/spans". * - *

After a certain threshold, spans are drained and {@link Sender#sendSpans(List) sent} to - * Zipkin collectors. + *

After a certain threshold, spans are drained and {@link BytesMessageSender#send(List) sent} + * to Zipkin collectors. */ - public static AsyncReporter create(Sender sender) { + public static AsyncReporter create(BytesMessageSender sender) { return new Builder(sender).build(); } - /** Like {@link #create(Sender)}, except you can configure settings such as the timeout. */ - public static Builder builder(Sender sender) { + /** Like {@link #create(BytesMessageSender)}, except you can configure settings such as the timeout. */ + public static Builder builder(BytesMessageSender sender) { return new Builder(sender); } @@ -87,7 +88,7 @@ public static final class Builder { final zipkin2.reporter.internal.AsyncReporter.Builder delegate; final Encoding encoding; - Builder(Sender sender) { + Builder(BytesMessageSender sender) { this.delegate = zipkin2.reporter.internal.AsyncReporter.newBuilder(sender); this.encoding = sender.encoding(); } @@ -110,7 +111,7 @@ public Builder metrics(ReporterMetrics metrics) { /** * Maximum bytes sendable per message including overhead. Defaults to, and is limited by {@link - * Sender#messageMaxBytes()}. + * BytesMessageSender#messageMaxBytes()}. */ public Builder messageMaxBytes(int messageMaxBytes) { this.delegate.messageMaxBytes(messageMaxBytes); @@ -121,8 +122,8 @@ public Builder messageMaxBytes(int messageMaxBytes) { * Default 1 second. 0 implies spans are {@link #flush() flushed} externally. * *

Instead of sending one message at a time, spans are bundled into messages, up to {@link - * Sender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an incomplete - * message. + * BytesMessageSender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an + * incomplete message. * *

Note: this timeout starts when the first unsent span is reported. */ @@ -170,8 +171,9 @@ public AsyncReporter build(BytesEncoder encoder) { } } - static final class BytesEncoderAdapterimplements BytesEncoder { + static final class BytesEncoderAdapter implements BytesEncoder { final BytesEncoder delegate; + BytesEncoderAdapter(BytesEncoder delegate) { this.delegate = delegate; } diff --git a/core/src/main/java/zipkin2/reporter/AwaitableCallback.java b/core/src/main/java/zipkin2/reporter/AwaitableCallback.java index 34bfae83..a630956c 100644 --- a/core/src/main/java/zipkin2/reporter/AwaitableCallback.java +++ b/core/src/main/java/zipkin2/reporter/AwaitableCallback.java @@ -17,8 +17,10 @@ /** * Blocks until {@link Callback#onSuccess(Object)} or {@link Callback#onError(Throwable)}. + * + * @deprecated since 3.2 this is no longer used. */ -public final class AwaitableCallback implements Callback { +@Deprecated public final class AwaitableCallback implements Callback { final CountDownLatch countDown = new CountDownLatch(1); Throwable throwable; // thread visibility guaranteed by the countdown latch diff --git a/core/src/main/java/zipkin2/reporter/BytesMessageSender.java b/core/src/main/java/zipkin2/reporter/BytesMessageSender.java new file mode 100644 index 00000000..0012fb87 --- /dev/null +++ b/core/src/main/java/zipkin2/reporter/BytesMessageSender.java @@ -0,0 +1,120 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Sends a list of encoded spans to a transport such as HTTP or Kafka. Usually, this involves + * encoding them into a message and enqueueing them for transport in a corresponding client library. + * The typical end recipient is a zipkin collector. + * + *

Unless mentioned otherwise, senders are not thread-safe. They were designed to be used by a + * single reporting thread, hence the operation is blocking + * + *

Those looking to initialize eagerly can {@link #send(List)} with an empty list. This can be + * used to reduce latency on the first send operation, or to fail fast. + * + *

Implementation notes + * + *

The parameter is a list of encoded spans as opposed to an encoded message. This allows + * implementations flexibility on how to encode spans into a message. For example, a large span + * might need to be sent as a separate message to avoid kafka limits. Also, logging transports like + * scribe will likely write each span as a separate log line. + * + *

This accepts a list of {@link BytesEncoder#encode(Object) encoded spans}, as opposed a list of + * spans like {@code zipkin2.Span}. This allows senders to be re-usable as model shapes change. This + * also allows them to use their most natural message type. For example, kafka would more naturally + * send messages as byte arrays. + * + * @since 3.2 + */ +public interface BytesMessageSender extends Closeable { + + /** + * Base class for implementation, which implements {@link #messageSizeInBytes(List)} and + * {@link #messageSizeInBytes(List)} with a given {@linkplain Encoding} + */ + abstract class Base implements BytesMessageSender { + protected final Encoding encoding; + + protected Base(Encoding encoding) { + this.encoding = encoding; + } + + /** {@inheritDoc} */ + @Override public Encoding encoding() { + return encoding; + } + + /** {@inheritDoc} */ + @Override public int messageSizeInBytes(List encodedSpans) { + return encoding.listSizeInBytes(encodedSpans); + } + + /** {@inheritDoc} */ + @Override public int messageSizeInBytes(int encodedSizeInBytes) { + return encoding.listSizeInBytes(encodedSizeInBytes); + } + } + + /** Returns the encoding this sender requires spans to have. */ + Encoding encoding(); + + /** + * Maximum bytes sendable per message including overhead. This can be calculated using {@link + * #messageSizeInBytes(List)} + *

+ * Defaults to 500KB as a conservative default. You may get better or reduced performance + * by changing this value based on, e.g., machine size or network bandwidth in your + * infrastructure. Finding a perfect value will require trying out different values in production, + * but the default should work well enough in most cases. + */ + int messageMaxBytes(); + + /** + * Before invoking {@link BytesMessageSender#send(List)}, callers must consider message overhead, + * which might be more than encoding overhead. This is used to not exceed {@link + * BytesMessageSender#messageMaxBytes()}. + * + *

Note this is not always {@link Encoding#listSizeInBytes(List)}, as some senders have + * inefficient list encoding. For example, Scribe base64's then tags each span with a category. + */ + int messageSizeInBytes(List encodedSpans); + + /** + * Like {@link #messageSizeInBytes(List)}, except for a single-span. This is used to ensure a span + * is never accepted that can never be sent. + * + *

Note this is not always {@link Encoding#listSizeInBytes(int)}, as some senders have + * inefficient list encoding. For example, Stackdriver's proto message contains other fields. + * + * @param encodedSizeInBytes the {@link BytesEncoder#sizeInBytes(Object) encoded size} of a span + */ + int messageSizeInBytes(int encodedSizeInBytes); + + /** + * Sends a list of encoded spans to a transport such as HTTP or Kafka. + * + *

Empty input is permitted. While async reporters in this repository will always send + * a non-empty list. Some external callers might use an empty send for fail-fast checking. If you + * obviate empty lists, you might break them. See /RATIONALE.md for more. + * + * @param encodedSpans a potentially empty list of encoded spans. + * @throws IllegalStateException if {@link #close() close} was called. + */ + void send(List encodedSpans) throws IOException; +} diff --git a/core/src/main/java/zipkin2/reporter/Call.java b/core/src/main/java/zipkin2/reporter/Call.java index a15a777c..9f566804 100644 --- a/core/src/main/java/zipkin2/reporter/Call.java +++ b/core/src/main/java/zipkin2/reporter/Call.java @@ -48,7 +48,9 @@ * * @param the success type, typically not null except when {@code V} is {@linkplain Void}. * @since 3.0 + * @deprecated since 3.2 this is no longer used. This will be removed in v4.0. */ +@Deprecated public abstract class Call implements Cloneable { /** * Returns a completed call which has the supplied value. This is useful when input parameters @@ -373,6 +375,10 @@ public void onError(Throwable t) { } } + /** + * @deprecated since 3.2 this is no longer used. This will be removed in v4.0. + */ + @Deprecated public static abstract class Base extends Call { volatile boolean canceled; boolean executed; diff --git a/core/src/main/java/zipkin2/reporter/ClosedSenderException.java b/core/src/main/java/zipkin2/reporter/ClosedSenderException.java index d8bda4e9..3e5fb6ee 100644 --- a/core/src/main/java/zipkin2/reporter/ClosedSenderException.java +++ b/core/src/main/java/zipkin2/reporter/ClosedSenderException.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,7 +13,7 @@ */ package zipkin2.reporter; -/** An exception thrown when a {@link Sender} is used after it has been closed. */ +/** An exception thrown when a {@link BytesMessageSender} is used after it has been closed. */ public final class ClosedSenderException extends IllegalStateException { static final long serialVersionUID = -4636520624634625689L; } diff --git a/core/src/main/java/zipkin2/reporter/Component.java b/core/src/main/java/zipkin2/reporter/Component.java index 1eac9481..8bcb56ed 100644 --- a/core/src/main/java/zipkin2/reporter/Component.java +++ b/core/src/main/java/zipkin2/reporter/Component.java @@ -24,7 +24,9 @@ * avoid crashing the application graph if a network service is unavailable. * * @since 3.0 + * @deprecated since 3.2 this is no longer used. This will be removed in v4.0. */ +@Deprecated public abstract class Component implements Closeable { /** @@ -35,7 +37,10 @@ public abstract class Component implements Closeable { * possible to establish a meaningful result, and be safe to call many times, even concurrently. * * @see CheckResult#OK + * @deprecated since 3.2 this is no longer used. If you need to check a sender, send a zero-length + * list of spans. This will be removed in v4.0. */ + @Deprecated public CheckResult check() { return CheckResult.OK; } diff --git a/core/src/main/java/zipkin2/reporter/ReporterMetrics.java b/core/src/main/java/zipkin2/reporter/ReporterMetrics.java index cbce7a81..b7365aea 100644 --- a/core/src/main/java/zipkin2/reporter/ReporterMetrics.java +++ b/core/src/main/java/zipkin2/reporter/ReporterMetrics.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -70,7 +70,7 @@ public interface ReporterMetrics { * *

This is a function of span bytes per message and overhead * - * @see Sender#messageSizeInBytes + * @see BytesMessageSender#messageSizeInBytes */ void incrementMessageBytes(int quantity); diff --git a/core/src/main/java/zipkin2/reporter/Sender.java b/core/src/main/java/zipkin2/reporter/Sender.java index 071bc65c..2b3861fb 100644 --- a/core/src/main/java/zipkin2/reporter/Sender.java +++ b/core/src/main/java/zipkin2/reporter/Sender.java @@ -13,6 +13,7 @@ */ package zipkin2.reporter; +import java.io.IOException; import java.util.Collections; import java.util.List; @@ -40,42 +41,13 @@ * send messages as byte arrays. * * @since 3.0 + * @deprecated since 3.2, use {@link BytesMessageSender} instead. This will be removed in v4.0. */ -public abstract class Sender extends Component { +@Deprecated +public abstract class Sender extends Component implements BytesMessageSender { - /** Returns the encoding this sender requires spans to have. */ - public abstract Encoding encoding(); - - /** - * Maximum bytes sendable per message including overhead. This can be calculated using {@link - * #messageSizeInBytes(List)} - *

- * Defaults to 500KB as a conservative default. You may get better or reduced performance - * by changing this value based on, e.g., machine size or network bandwidth in your - * infrastructure. Finding a perfect value will require trying out different values in production, - * but the default should work well enough in most cases. - */ - public abstract int messageMaxBytes(); - - /** - * Before invoking {@link Sender#sendSpans(List)}, callers must consider message overhead, which - * might be more than encoding overhead. This is used to not exceed {@link - * Sender#messageMaxBytes()}. - * - *

Note this is not always {@link Encoding#listSizeInBytes(List)}, as some senders have - * inefficient list encoding. For example, Scribe base64's then tags each span with a category. - */ - public abstract int messageSizeInBytes(List encodedSpans); - - /** - * Like {@link #messageSizeInBytes(List)}, except for a single-span. This is used to ensure a span - * is never accepted that can never be sent. - * - *

Always override this, which is only abstract as added after version 2.0 - * - * @param encodedSizeInBytes the {@link BytesEncoder#sizeInBytes(Object) encoded size} of a span - */ - public int messageSizeInBytes(int encodedSizeInBytes) { + /** {@inheritDoc} */ + @Override public int messageSizeInBytes(int encodedSizeInBytes) { return messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes])); } @@ -84,6 +56,13 @@ public int messageSizeInBytes(int encodedSizeInBytes) { * * @param encodedSpans list of encoded spans. * @throws IllegalStateException if {@link #close() close} was called. + * @deprecated since 3.2, use {@link BytesMessageSender} instead. This will be removed in v4.0. */ + @Deprecated public abstract Call sendSpans(List encodedSpans); + + /** {@inheritDoc} */ + @Override public void send(List encodedSpans) throws IOException { + sendSpans(encodedSpans).execute(); + } } diff --git a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java index 6b8330fb..2c7cb3aa 100644 --- a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java @@ -13,8 +13,11 @@ */ package zipkin2.reporter.internal; +import java.io.Closeable; import java.io.Flushable; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -23,6 +26,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.CheckResult; import zipkin2.reporter.ClosedSenderException; @@ -42,17 +46,18 @@ * *

Spans are bundled into messages based on size in bytes or a timeout, whichever happens first. * - *

The thread that sends flushes spans to the {@linkplain Sender} does so in a synchronous loop. - * This means that even asynchronous transports will wait for an ack before sending a next message. - * We do this so that a surge of spans doesn't overrun memory or bandwidth via hundreds or - * thousands of in-flight messages. The downside of this is that reporting is limited in speed to - * what a single thread can clear. When a thread cannot clear the backlog, new spans are dropped. + *

The thread that sends flushes spans to the {@linkplain BytesMessageSender} does so in a + * synchronous loop. This means that even asynchronous transports will wait for an ack before + * sending a next message. We do this so that a surge of spans doesn't overrun memory or bandwidth + * via hundreds or thousands of in-flight messages. The downside of this is that reporting is + * limited in speed to what a single thread can clear. When a thread cannot clear the backlog, new + * spans are dropped. * * @param type of the span, usually {@code zipkin2.Span} * @since 3.0 */ -public abstract class AsyncReporter extends Component implements Reporter, Flushable { - public static Builder newBuilder(Sender sender) { +public abstract class AsyncReporter extends Component implements Reporter, Closeable, Flushable { + public static Builder newBuilder(BytesMessageSender sender) { return new Builder(sender); } @@ -72,7 +77,7 @@ public static Builder newBuilder(Sender sender) { public abstract Builder toBuilder(); public static final class Builder { - final Sender sender; + final BytesMessageSender sender; ThreadFactory threadFactory = Executors.defaultThreadFactory(); ReporterMetrics metrics = ReporterMetrics.NOOP_METRICS; int messageMaxBytes; @@ -98,7 +103,7 @@ static int onePercentOfMemory() { return (int) Math.max(Math.min(Integer.MAX_VALUE, result), Integer.MIN_VALUE); } - Builder(Sender sender) { + Builder(BytesMessageSender sender) { if (sender == null) throw new NullPointerException("sender == null"); this.sender = sender; this.messageMaxBytes = sender.messageMaxBytes(); @@ -124,7 +129,7 @@ public Builder metrics(ReporterMetrics metrics) { /** * Maximum bytes sendable per message including overhead. Defaults to, and is limited by {@link - * Sender#messageMaxBytes()}. + * BytesMessageSender#messageMaxBytes()}. */ public Builder messageMaxBytes(int messageMaxBytes) { if (messageMaxBytes < 0) { @@ -138,8 +143,8 @@ public Builder messageMaxBytes(int messageMaxBytes) { * Default 1 second. 0 implies spans are {@link #flush() flushed} externally. * *

Instead of sending one message at a time, spans are bundled into messages, up to {@link - * Sender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an incomplete - * message. + * BytesMessageSender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an + * incomplete message. * *

Note: this timeout starts when the first unsent span is reported. */ @@ -188,7 +193,7 @@ static final class BoundedAsyncReporter extends AsyncReporter { final AtomicBoolean started, closed; final BytesEncoder encoder; final ByteBoundedQueue pending; - final Sender sender; + final BytesMessageSender sender; final int messageMaxBytes; final long messageTimeoutNanos, closeTimeoutNanos; final CountDownLatch close; @@ -273,7 +278,7 @@ void flush(BufferNextMessage bundler) { }); try { - sender.sendSpans(nextMessage).execute(); + sender.send(nextMessage); } catch (Throwable t) { // In failure case, we increment messages and spans dropped. int count = nextMessage.size(); @@ -307,8 +312,14 @@ void flush(BufferNextMessage bundler) { } } - @Override public CheckResult check() { - return sender.check(); + @Override @Deprecated public CheckResult check() { + try { + sender.send(Collections.emptyList()); + return CheckResult.OK; + } catch (Throwable t) { + Call.propagateIfFatal(t); + return CheckResult.failed(t); + } } @Override public void close() { diff --git a/core/src/test/java/zipkin2/reporter/AsyncReporterTest.java b/core/src/test/java/zipkin2/reporter/AsyncReporterTest.java new file mode 100644 index 00000000..8db12229 --- /dev/null +++ b/core/src/test/java/zipkin2/reporter/AsyncReporterTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import zipkin2.Span; +import zipkin2.TestObjects; +import zipkin2.codec.SpanBytesDecoder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Only tests entry points as {@link zipkin2.reporter.internal.AsyncReporter} tests covers the rest. + */ +class AsyncReporterTest { + @Test void example() { + AtomicInteger sentSpans = new AtomicInteger(); + try (AsyncReporter reporter = AsyncReporter.builder(FakeSender.create() + .onSpans(spans -> sentSpans.addAndGet(spans.size()))) + .messageTimeout(0, TimeUnit.MILLISECONDS) // no thread + .build(SpanBytesEncoder.JSON_V2)) { + + reporter.report(TestObjects.CLIENT_SPAN); + reporter.flush(); + } + + assertThat(sentSpans.get()).isEqualTo(1); + } + + @Deprecated @Test void example_deprecatedSender() { + AtomicInteger sentSpans = new AtomicInteger(); + try (AsyncReporter reporter = AsyncReporter.builder(new DeprecatedCheatingSender( + spans -> sentSpans.addAndGet(spans.size()) + )) + .messageTimeout(0, TimeUnit.MILLISECONDS) // no thread + .build(SpanBytesEncoder.JSON_V2)) { + + reporter.report(TestObjects.CLIENT_SPAN); + reporter.flush(); + } + + assertThat(sentSpans.get()).isEqualTo(1); + } + + @Deprecated static class DeprecatedCheatingSender extends Sender { + final Consumer> onSpans; + + DeprecatedCheatingSender(Consumer> onSpans) { + this.onSpans = onSpans; + } + + @Override public Encoding encoding() { + return Encoding.JSON; + } + + @Override public int messageMaxBytes() { + return 500; + } + + @Override public int messageSizeInBytes(List encodedSpans) { + return Encoding.JSON.listSizeInBytes(encodedSpans); + } + + @Override public int messageSizeInBytes(int encodedSizeInBytes) { + return Encoding.JSON.listSizeInBytes(encodedSizeInBytes); + } + + @Override public Call sendSpans(List encodedSpans) { + List decoded = encodedSpans.stream() + .map(SpanBytesDecoder.JSON_V2::decodeOne). + collect(Collectors.toList()); + return new CheatingVoidCall(onSpans, decoded); + } + } + + @Deprecated static class CheatingVoidCall extends Call { + final Consumer> onSpans; + final List spans; + + CheatingVoidCall(Consumer> onSpans, List spans) { + this.onSpans = onSpans; + this.spans = spans; + } + + @Override public Void execute() { + onSpans.accept(spans); + return null; + } + + @Override public void enqueue(Callback callback) { + throw new UnsupportedOperationException(); + } + + @Override public void cancel() { + throw new UnsupportedOperationException(); + } + + @Override public boolean isCanceled() { + return false; + } + + @Override public Call clone() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/core/src/test/java/zipkin2/reporter/internal/FakeSender.java b/core/src/test/java/zipkin2/reporter/FakeSender.java similarity index 71% rename from core/src/test/java/zipkin2/reporter/internal/FakeSender.java rename to core/src/test/java/zipkin2/reporter/FakeSender.java index 4ca9b8bf..05427fa6 100644 --- a/core/src/test/java/zipkin2/reporter/internal/FakeSender.java +++ b/core/src/test/java/zipkin2/reporter/FakeSender.java @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin2.reporter.internal; +package zipkin2.reporter; import java.util.List; import java.util.function.Consumer; @@ -19,15 +19,8 @@ import zipkin2.Span; import zipkin2.codec.BytesDecoder; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.BytesEncoder; -import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.ClosedSenderException; -import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -import zipkin2.reporter.SpanBytesEncoder; -public final class FakeSender extends Sender { +public final class FakeSender extends BytesMessageSender.Base { public static FakeSender create() { return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, @@ -36,7 +29,6 @@ public static FakeSender create() { }); } - final Encoding encoding; final int messageMaxBytes; final BytesMessageEncoder messageEncoder; final BytesEncoder encoder; @@ -45,7 +37,7 @@ public static FakeSender create() { FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder, BytesEncoder encoder, BytesDecoder decoder, Consumer> onSpans) { - this.encoding = encoding; + super(encoding); this.messageMaxBytes = messageMaxBytes; this.messageEncoder = messageEncoder; this.encoder = encoder; @@ -53,45 +45,32 @@ public static FakeSender create() { this.onSpans = onSpans; } - FakeSender encoding(Encoding encoding) { + public FakeSender encoding(Encoding encoding) { return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet encoder, // invalid but not needed, yet decoder, // invalid but not needed, yet onSpans); } - FakeSender onSpans(Consumer> onSpans) { + public FakeSender onSpans(Consumer> onSpans) { return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans); } - FakeSender messageMaxBytes(int messageMaxBytes) { + public FakeSender messageMaxBytes(int messageMaxBytes) { return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans); } - @Override public Encoding encoding() { - return encoding; - } - @Override public int messageMaxBytes() { return messageMaxBytes; } - @Override public int messageSizeInBytes(List encodedSpans) { - return encoding.listSizeInBytes(encodedSpans); - } - - @Override public int messageSizeInBytes(int encodedSizeInBytes) { - return encoding.listSizeInBytes(encodedSizeInBytes); - } - /** close is typically called from a different thread */ volatile boolean closeCalled; - @Override public Call sendSpans(List encodedSpans) { + @Override public void send(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); List decoded = encodedSpans.stream().map(decoder::decodeOne).collect(Collectors.toList()); onSpans.accept(decoded); - return Call.create(null); } @Override public void close() { diff --git a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java index 81c96857..065ec0b4 100644 --- a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java @@ -32,6 +32,7 @@ import zipkin2.reporter.BytesEncoder; import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; +import zipkin2.reporter.FakeSender; import zipkin2.reporter.InMemoryReporterMetrics; import zipkin2.reporter.SpanBytesEncoder; import zipkin2.reporter.internal.AsyncReporter.BoundedAsyncReporter; diff --git a/kafka/pom.xml b/kafka/pom.xml index 32da5917..0ae5e15f 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-kafka diff --git a/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java index df88069f..55ac8949 100644 --- a/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java +++ b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java @@ -26,8 +26,10 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.serialization.ByteArraySerializer; +import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.AwaitableCallback; import zipkin2.reporter.BytesMessageEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; import zipkin2.reporter.CheckResult; @@ -40,7 +42,7 @@ * *

Usage

*

- * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async * reporter}. * *

Here's a simple configuration, configured for json: @@ -255,19 +257,32 @@ public Builder toBuilder() { return messageMaxBytes; } + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { + if (closeCalled) throw new ClosedSenderException(); + byte[] message = encoder.encode(encodedSpans); + return new KafkaCall(message); + } + /** * This sends all the spans as a single message. * *

NOTE: this blocks until the metadata server is available. */ - @Override public Call sendSpans(List encodedSpans) { + @Override public void send(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); - byte[] message = encoder.encode(encodedSpans); - return new KafkaCall(message); + send(encoder.encode(encodedSpans)); } - /** Ensures there are no problems reading metadata about the topic. */ - @Override public CheckResult check() { + void send(byte[] message) { + if (closeCalled) throw new ClosedSenderException(); + AwaitableCallback callback = new AwaitableCallback(); + get().send(new ProducerRecord(topic, message), new CallbackAdapter(callback)); + callback.await(); + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { KafkaFuture maybeClusterId = getAdminClient().describeCluster().clusterId(); maybeClusterId.get(1, TimeUnit.SECONDS); @@ -308,7 +323,7 @@ AdminClient getAdminClient() { closeCalled = true; } - @Override public final String toString() { + @Override public String toString() { return "KafkaSender{" + "bootstrapServers=" + properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + ", topic=" + topic @@ -323,9 +338,7 @@ class KafkaCall extends Call.Base { // KafkaFuture is not cancelable } @Override protected Void doExecute() { - AwaitableCallback callback = new AwaitableCallback(); - get().send(new ProducerRecord(topic, message), new CallbackAdapter(callback)); - callback.await(); + send(message); return null; } diff --git a/kafka/src/test/java/zipkin2/reporter/kafka/ITKafkaSender.java b/kafka/src/test/java/zipkin2/reporter/kafka/ITKafkaSender.java index c87bd857..a4a6b7fc 100644 --- a/kafka/src/test/java/zipkin2/reporter/kafka/ITKafkaSender.java +++ b/kafka/src/test/java/zipkin2/reporter/kafka/ITKafkaSender.java @@ -20,13 +20,13 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import javax.management.ObjectName; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.AfterEach; @@ -39,10 +39,8 @@ import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; @@ -69,49 +67,49 @@ class ITKafkaSender { sender.close(); } - @Test void sendsSpans() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void send() { + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_PROTO3() throws Exception { + @Test void send_PROTO3() { sender.close(); sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_THRIFT() throws Exception { + @Test void send_THRIFT() { sender.close(); sender = sender.toBuilder().encoding(Encoding.THRIFT).build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpansToCorrectTopic() throws Exception { + @Test void sendToCorrectTopic() { sender.close(); kafka.prepareTopics("customzipkintopic"); sender = sender.toBuilder().topic("customzipkintopic").build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage("customzipkintopic"))) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void checkFalseWhenKafkaIsDown() { + @Test void sendWhenKafkaIsDown() { kafka.stop(); // Make a new tracer that fails faster than 60 seconds @@ -120,20 +118,19 @@ class ITKafkaSender { overrides.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100"); sender = sender.toBuilder().overrides(overrides).build(); - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()).isInstanceOf(TimeoutException.class); + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(TimeoutException.class); } @Test void illegalToSendWhenClosed() { sender.close(); - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute()) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) .isInstanceOf(IllegalStateException.class); } @Test void shouldCloseKafkaProducerOnClose() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); final ObjectName kafkaProducerMXBeanName = new ObjectName("kafka.producer:*"); @@ -152,15 +149,15 @@ class ITKafkaSender { sender.close(); sender = sender.toBuilder().messageMaxBytes(1).build(); - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute()) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) .isInstanceOf(RecordTooLargeException.class); } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySummaryInformation() { assertThat(sender.toString()).isEqualTo( @@ -190,7 +187,7 @@ class ITKafkaSender { assertThat(filteredProperties.get(ProducerConfig.SECURITY_PROVIDERS_CONFIG)).isNotNull(); } - Call send(Span... spans) { + void sendSpans(Span... spans) { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -205,7 +202,7 @@ Call send(Span... spans) { default: throw new UnsupportedOperationException("encoding: " + sender.encoding()); } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } byte[] readMessage(String topic) { diff --git a/libthrift/pom.xml b/libthrift/pom.xml index b6b40e79..137f65f3 100644 --- a/libthrift/pom.xml +++ b/libthrift/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-libthrift diff --git a/libthrift/src/main/java/zipkin2/reporter/libthrift/LibthriftSender.java b/libthrift/src/main/java/zipkin2/reporter/libthrift/LibthriftSender.java index 8c8c375e..6d438d5f 100644 --- a/libthrift/src/main/java/zipkin2/reporter/libthrift/LibthriftSender.java +++ b/libthrift/src/main/java/zipkin2/reporter/libthrift/LibthriftSender.java @@ -84,7 +84,7 @@ public Builder messageMaxBytes(int messageMaxBytes) { return this; } - public final LibthriftSender build() { + public LibthriftSender build() { return new LibthriftSender(this); } @@ -129,11 +129,24 @@ public int messageSizeInBytes(List encodedSpans) { return ScribeClient.messageSizeInBytes(encodedSpans); } - @Override public Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); return new ScribeCall(encodedSpans); } + /** {@inheritDoc} */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + try { + if (!get().log(encodedSpans)) { + throw new IllegalStateException("try later"); + } + } catch (TException e) { + throw new IOException(e); + } + } + ScribeClient get() { if (client == null) { synchronized (this) { @@ -149,9 +162,8 @@ ScribeClient get() { private volatile boolean closeCalled; private volatile ScribeClient client; - /** Sends an empty log message to the configured host. */ - @Override - public CheckResult check() { + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { if (get().log(Collections.emptyList())) { return CheckResult.OK; @@ -169,7 +181,7 @@ public CheckResult check() { if (client != null) client.close(); } - @Override public final String toString() { + @Override public String toString() { return "LibthriftSender(" + host + ":" + port + ")"; } @@ -181,13 +193,7 @@ class ScribeCall extends Call.Base { } @Override protected Void doExecute() throws IOException { - try { - if (!get().log(encodedSpans)) { - throw new IllegalStateException("try later"); - } - } catch (TException e) { - throw new IOException(e); - } + send(encodedSpans); return null; } diff --git a/libthrift/src/test/java/zipkin2/reporter/libthrift/ITLibthriftSender.java b/libthrift/src/test/java/zipkin2/reporter/libthrift/ITLibthriftSender.java index 7b2e072d..f53bcb67 100644 --- a/libthrift/src/test/java/zipkin2/reporter/libthrift/ITLibthriftSender.java +++ b/libthrift/src/test/java/zipkin2/reporter/libthrift/ITLibthriftSender.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -25,6 +26,8 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import zipkin2.Span; +import zipkin2.reporter.AsyncReporter; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; @@ -51,8 +54,8 @@ class ITLibthriftSender { sender.close(); } - @Test void sendsSpans() throws Exception { - send(CLIENT_SPAN); + @Test void send() throws Exception { + sendSpans(CLIENT_SPAN); assertThat(zipkin.get("/api/v2/trace/" + CLIENT_SPAN.traceId()).isSuccessful()) .isTrue(); @@ -61,9 +64,9 @@ class ITLibthriftSender { /** * This will help verify sequence ID and response parsing logic works */ - @Test void sendsSpans_multipleTimes() throws Exception { + @Test void send_multipleTimes() throws Exception { for (int i = 0; i < 5; i++) { // Have client send 5 messages - send(Arrays.copyOfRange(LOTS_OF_SPANS, i, (i * 10) + 10)); + sendSpans(Arrays.copyOfRange(LOTS_OF_SPANS, i, (i * 10) + 10)); } for (int i = 0; i < 5; i++) { // Try the last ID of each @@ -73,11 +76,11 @@ class ITLibthriftSender { } } - @Test void check_okWhenScribeIsListening() { - assertThat(sender.check().ok()).isTrue(); + @Test void emptyOk() throws Exception { + sender.send(Collections.emptyList()); } - @Test void check_notOkWhenScribeIsDown() { + @Test void sendFailsWhenScribeIsDown() { sender.close(); // Reconfigure to a valid host but invalid port. @@ -85,7 +88,8 @@ class ITLibthriftSender { host(zipkin.host()). port(zipkin.httpPort()).build(); - assertThat(sender.check().ok()).isFalse(); + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(IOException.class); } @Test void reconnects() throws Exception { @@ -96,12 +100,12 @@ class ITLibthriftSender { host(zipkin.host()). port(9999).build(); - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN)) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) .isInstanceOf(IOException.class); open(); - send(CLIENT_SPAN, CLIENT_SPAN); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(zipkin.get("/api/v2/trace/" + CLIENT_SPAN.traceId()).isSuccessful()) .isTrue(); @@ -110,13 +114,13 @@ class ITLibthriftSender { @Test void illegalToSendWhenClosed() { sender.close(); - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN)) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) .isInstanceOf(IllegalStateException.class); } /** - * The output of toString() on {@link zipkin2.reporter.Sender} implementations appears in thread - * names created by {@link zipkin2.reporter.AsyncReporter}. Since thread names are likely to be + * The output of toString() on {@link BytesMessageSender} implementations appears in thread + * names created by {@link AsyncReporter}. Since thread names are likely to be * exposed in logs and other monitoring tools, care should be taken to ensure the toString() * output is a reasonable length and does not contain sensitive information. */ @@ -128,9 +132,9 @@ class ITLibthriftSender { /** * Blocks until the callback completes to allow read-your-writes consistency during tests. */ - void send(Span... spans) throws IOException { + void sendSpans(Span... spans) throws IOException { List encodedSpans = Stream.of(spans).map(SpanBytesEncoder.THRIFT::encode).collect(toList()); - sender.sendSpans(encodedSpans).execute(); + sender.send(encodedSpans); } } diff --git a/libthrift/src/test/java/zipkin2/reporter/libthrift/InternalScribeCodecTest.java b/libthrift/src/test/java/zipkin2/reporter/libthrift/InternalScribeCodecTest.java index 12926dd0..a0fc7402 100644 --- a/libthrift/src/test/java/zipkin2/reporter/libthrift/InternalScribeCodecTest.java +++ b/libthrift/src/test/java/zipkin2/reporter/libthrift/InternalScribeCodecTest.java @@ -40,7 +40,7 @@ class InternalScribeCodecTest { } } - @Test void sendsSpansExpectedMetrics() throws Exception { + @Test void sendExpectedMetrics() throws Exception { byte[] thrift = SpanBytesEncoder.THRIFT.encode(CLIENT_SPAN); List encodedSpans = asList(thrift, thrift); diff --git a/metrics-micrometer/pom.xml b/metrics-micrometer/pom.xml index d7744d1c..448aa0c8 100644 --- a/metrics-micrometer/pom.xml +++ b/metrics-micrometer/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-reporter-metrics-micrometer diff --git a/okhttp3/pom.xml b/okhttp3/pom.xml index bc6041b5..93a97e5b 100644 --- a/okhttp3/pom.xml +++ b/okhttp3/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-okhttp3 diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java index 53d39f6f..5209e623 100644 --- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java +++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java @@ -19,6 +19,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import okhttp3.Call; import okhttp3.Dispatcher; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -30,19 +31,22 @@ import okio.BufferedSink; import okio.GzipSink; import okio.Okio; +import zipkin2.reporter.AsyncReporter; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.CheckResult; import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; import zipkin2.reporter.Sender; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static zipkin2.reporter.okhttp3.HttpCall.parseResponse; /** * Reports spans to Zipkin, using its POST endpoint. * *

Usage

*

- * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async * reporter}. * *

Here's a simple configuration, configured for json: @@ -268,8 +272,8 @@ public Builder toBuilder() { /** close is typically called from a different thread */ volatile boolean closeCalled; - /** The returned call sends spans as a POST to {@link Builder#endpoint(String)}. */ - @Override public zipkin2.reporter.Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public zipkin2.reporter.Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); Request request; try { @@ -280,8 +284,16 @@ public Builder toBuilder() { return new HttpCall(client.newCall(request)); } - /** Sends an empty json message to the configured endpoint. */ - @Override public CheckResult check() { + /** Sends spans as a POST to {@link Builder#endpoint(String)}. */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + Request request = newRequest(encoder.encode(encodedSpans)); + Call call = client.newCall(request); + parseResponse(call.execute()); + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { Request request = new Request.Builder().url(endpoint) .post(RequestBody.create(MediaType.parse("application/json"), "[]")).build(); diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java index 0750f1bf..c71fe6db 100644 --- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java +++ b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -29,6 +30,7 @@ import org.junit.jupiter.api.Test; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.AwaitableCallback; @@ -73,15 +75,15 @@ public class ITOkHttpSender { // public for use in src/it server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(called.get()).isTrue(); } - @Test void sendsSpans() throws Exception { + @Test void send() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); @@ -91,12 +93,27 @@ public class ITOkHttpSender { // public for use in src/it .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_PROTO3() throws Exception { + @Test void emptyOk() throws Exception{ + server.enqueue(new MockResponse()); + + sender.send(Collections.emptyList()); + + assertThat(server.getRequestCount()).isEqualTo(1); + } + + @Test void sendFailsOnDisconnect() { + server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(IOException.class); + } + + @Test void send_PROTO3() throws Exception { sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); @@ -106,12 +123,12 @@ public class ITOkHttpSender { // public for use in src/it .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_THRIFT() throws Exception { + @Test void send_THRIFT() throws Exception { sender = sender.toBuilder().encoding(Encoding.THRIFT).build(); server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); @@ -128,7 +145,7 @@ public class ITOkHttpSender { // public for use in src/it server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // block until the request arrived requests.add(server.takeRequest()); @@ -142,7 +159,7 @@ public class ITOkHttpSender { // public for use in src/it @Test void ensuresProxiesDontTrace() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // If the Zipkin endpoint is proxied and instrumented, it will know "0" means don't trace. assertThat(server.takeRequest().getHeader("b3")).isEqualTo("0"); @@ -151,13 +168,14 @@ public class ITOkHttpSender { // public for use in src/it @Test void mediaTypeBasedOnSpanEncoding() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // block until the request arrived assertThat(server.takeRequest().getHeader("Content-Type")) .isEqualTo("application/json"); } + @Deprecated @Test void closeWhileRequestInFlight_cancelsRequest() throws Exception { server.shutdown(); // shutdown the normal zipkin rule sender.close(); @@ -197,6 +215,7 @@ public class ITOkHttpSender { // public for use in src/it /** * Each message by default is up to 5MiB, make sure these go out of process as soon as they can. */ + @Deprecated @Test void messagesSendImmediately() throws Exception { server.shutdown(); // shutdown the normal zipkin rule sender.close(); @@ -234,6 +253,7 @@ public class ITOkHttpSender { // public for use in src/it } } + @Deprecated @Test void closeWhileRequestInFlight_graceful() throws Exception { server.shutdown(); // shutdown the normal zipkin rule sender.close(); @@ -264,10 +284,11 @@ public class ITOkHttpSender { // public for use in src/it } } + @Deprecated @Test void noExceptionWhenServerErrors() { server.enqueue(new MockResponse().setResponseCode(500)); - send().enqueue(new Callback() { + sender.sendSpans(Collections.emptyList()).enqueue(new Callback() { @Override public void onSuccess(Void aVoid) { } @@ -277,38 +298,24 @@ public class ITOkHttpSender { // public for use in src/it } @Test void outOfBandCancel() { - HttpCall call = (HttpCall) send(CLIENT_SPAN, CLIENT_SPAN); + HttpCall call = (HttpCall) sender.sendSpans(Collections.emptyList()); call.cancel(); assertThat(call.isCanceled()).isTrue(); } - @Test void check_ok() { - server.enqueue(new MockResponse()); - - assertThat(sender.check().ok()).isTrue(); - - assertThat(server.getRequestCount()).isEqualTo(1); - } - - @Test void check_fail() { - server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); - - assertThat(sender.check().ok()).isFalse(); - } - @Test void illegalToSendWhenClosed() { sender.close(); - assertThatThrownBy(() -> send(CLIENT_SPAN)) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN)) .isInstanceOf(IllegalStateException.class); } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySenderTypeAndEndpoint() { assertThat(sender.toString()).isEqualTo("OkHttpSender{" + endpoint + "}"); @@ -320,7 +327,7 @@ public class ITOkHttpSender { // public for use in src/it .isNull(); } - Call send(Span... spans) { + void sendSpans(Span... spans) throws IOException { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -335,6 +342,6 @@ Call send(Span... spans) { default: throw new UnsupportedOperationException("encoding: " + sender.encoding()); } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } } diff --git a/pom.xml b/pom.xml index 57ebf426..e059a2b0 100755 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT pom @@ -93,7 +93,7 @@ 3.0.1 3.5.1 3.3.0 - 3.2.3 + 3.2.5 1.6.13 diff --git a/spring-beans/pom.xml b/spring-beans/pom.xml index d4f3d102..2c3997b0 100644 --- a/spring-beans/pom.xml +++ b/spring-beans/pom.xml @@ -18,7 +18,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT 4.0.0 diff --git a/spring-beans/src/main/java/zipkin2/reporter/beans/BaseAsyncFactoryBean.java b/spring-beans/src/main/java/zipkin2/reporter/beans/BaseAsyncFactoryBean.java index 5326aeaa..914de351 100644 --- a/spring-beans/src/main/java/zipkin2/reporter/beans/BaseAsyncFactoryBean.java +++ b/spring-beans/src/main/java/zipkin2/reporter/beans/BaseAsyncFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,11 +14,12 @@ package zipkin2.reporter.beans; import org.springframework.beans.factory.config.AbstractFactoryBean; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ReporterMetrics; import zipkin2.reporter.Sender; abstract class BaseAsyncFactoryBean extends AbstractFactoryBean { - Sender sender; + BytesMessageSender sender; ReporterMetrics metrics; Integer messageMaxBytes; Integer messageTimeout; @@ -30,7 +31,7 @@ abstract class BaseAsyncFactoryBean extends AbstractFactoryBean { return true; } - public void setSender(Sender sender) { + public void setSender(BytesMessageSender sender) { this.sender = sender; } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/ActiveMQSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/ActiveMQSenderFactoryBeanTest.java index 3bec3c21..4ac1eb7a 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/ActiveMQSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/ActiveMQSenderFactoryBeanTest.java @@ -134,7 +134,7 @@ class ActiveMQSenderFactoryBeanTest { ActiveMQSender sender = context.getBean("sender", ActiveMQSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[]{'{', '}'})); + sender.send(Arrays.asList(new byte[]{'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncReporterFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncReporterFactoryBeanTest.java index 7ca97bcb..201cf39c 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncReporterFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncReporterFactoryBeanTest.java @@ -17,6 +17,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import zipkin2.reporter.AsyncReporter; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; import zipkin2.reporter.ReporterMetrics; import zipkin2.reporter.Sender; @@ -25,8 +26,8 @@ import static org.assertj.core.api.Assertions.assertThat; class AsyncReporterFactoryBeanTest { - public static Sender SENDER = new FakeSender(); - public static Sender PROTO3_SENDER = new FakeSender() { + public static BytesMessageSender SENDER = new FakeSender(); + public static BytesMessageSender PROTO3_SENDER = new FakeSender() { @Override public Encoding encoding() { return Encoding.PROTO3; } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncZipkinSpanHandlerFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncZipkinSpanHandlerFactoryBeanTest.java index 29837bc1..2c61d488 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncZipkinSpanHandlerFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncZipkinSpanHandlerFactoryBeanTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ReporterMetrics; import zipkin2.reporter.Sender; import zipkin2.reporter.brave.AsyncZipkinSpanHandler; @@ -31,7 +32,7 @@ class AsyncZipkinSpanHandlerFactoryBeanTest { return null; } }; - public static Sender SENDER = new FakeSender(); + public static BytesMessageSender SENDER = new FakeSender(); public static ReporterMetrics METRICS = ReporterMetrics.NOOP_METRICS; XmlBeans context; diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/FakeSender.java b/spring-beans/src/test/java/zipkin2/reporter/beans/FakeSender.java index 73cce760..6f3cf4f9 100644 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/FakeSender.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/FakeSender.java @@ -13,25 +13,23 @@ */ package zipkin2.reporter.beans; +import java.io.IOException; import java.util.List; -import zipkin2.reporter.Call; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -class FakeSender extends Sender { - @Override public Encoding encoding() { - return Encoding.JSON; +class FakeSender extends BytesMessageSender.Base { + FakeSender() { + super(Encoding.JSON); } @Override public int messageMaxBytes() { return 1024; } - @Override public int messageSizeInBytes(List encodedSpans) { - return 1024; + @Override public void send(List encodedSpans) { } - @Override public Call sendSpans(List encodedSpans) { - return Call.create(null); + @Override public void close() throws IOException { } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/KafkaSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/KafkaSenderFactoryBeanTest.java index 579d66fc..8816fe7e 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/KafkaSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/KafkaSenderFactoryBeanTest.java @@ -95,7 +95,7 @@ class KafkaSenderFactoryBeanTest { KafkaSender sender = context.getBean("sender", KafkaSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[] {'{', '}'})); + sender.send(Arrays.asList(new byte[] {'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/LibthriftSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/LibthriftSenderFactoryBeanTest.java index 9fc6f976..f2284759 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/LibthriftSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/LibthriftSenderFactoryBeanTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -110,7 +110,7 @@ class LibthriftSenderFactoryBeanTest { LibthriftSender sender = context.getBean("sender", LibthriftSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[0])); + sender.send(Arrays.asList(new byte[0])); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java index e02f589d..70805aab 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java @@ -144,7 +144,7 @@ class OkHttpSenderFactoryBeanTest { OkHttpSender sender = context.getBean("sender", OkHttpSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[]{'{', '}'})); + sender.send(Arrays.asList(new byte[]{'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/RabbitMQSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/RabbitMQSenderFactoryBeanTest.java index 02d8cfa5..d130f497 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/RabbitMQSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/RabbitMQSenderFactoryBeanTest.java @@ -132,7 +132,7 @@ class RabbitMQSenderFactoryBeanTest { RabbitMQSender sender = context.getBean("sender", RabbitMQSender.class); context.close(); - sender.sendSpans(asList(new byte[] {'{', '}'})); + sender.send(asList(new byte[] {'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java index 8aeb0621..3231fed8 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java @@ -124,7 +124,7 @@ class URLConnectionSenderFactoryBeanTest { URLConnectionSender sender = context.getBean("sender", URLConnectionSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[]{'{', '}'})); + sender.send(Arrays.asList(new byte[]{'{', '}'})); }); } } diff --git a/urlconnection/pom.xml b/urlconnection/pom.xml index 00865da7..55ffac50 100644 --- a/urlconnection/pom.xml +++ b/urlconnection/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-urlconnection diff --git a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java index 6e76f428..a779e16e 100644 --- a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java +++ b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java @@ -117,7 +117,7 @@ public Builder encoding(Encoding encoding) { return this; } - public final URLConnectionSender build() { + public URLConnectionSender build() { return new URLConnectionSender(this); } @@ -182,14 +182,20 @@ public Builder toBuilder() { return messageMaxBytes; } - /** The returned call sends spans as a POST to {@link Builder#endpoint}. */ - @Override public Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); return new HttpPostCall(encoder.encode(encodedSpans)); } - /** Sends an empty json message to the configured endpoint. */ - @Override public CheckResult check() { + /** Sends spans as a POST to {@link Builder#endpoint}. */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + send(encoder.encode(encodedSpans), mediaType); + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { send(new byte[] {'[', ']'}, "application/json"); return CheckResult.OK; @@ -199,12 +205,8 @@ public Builder toBuilder() { } } - @Override public void close() { - closeCalled = true; - } - void send(byte[] body, String mediaType) throws IOException { - // intentionally not closing the connection, so as to use keep-alives + // intentionally not closing the connection, to use keep-alives HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection(); connection.setConnectTimeout(connectTimeout); connection.setReadTimeout(readTimeout); @@ -255,7 +257,11 @@ static IOException skipAndSuppress(InputStream in) { } } - @Override public final String toString() { + @Override public void close() { + closeCalled = true; + } + + @Override public String toString() { return "URLConnectionSender{" + endpoint + "}"; } diff --git a/urlconnection/src/test/java/zipkin2/reporter/urlconnection/ITURLConnectionSender.java b/urlconnection/src/test/java/zipkin2/reporter/urlconnection/ITURLConnectionSender.java index f401d9f0..5980c185 100644 --- a/urlconnection/src/test/java/zipkin2/reporter/urlconnection/ITURLConnectionSender.java +++ b/urlconnection/src/test/java/zipkin2/reporter/urlconnection/ITURLConnectionSender.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Stream; import okhttp3.mockwebserver.MockResponse; @@ -28,10 +29,9 @@ import zipkin2.codec.SpanBytesDecoder; import zipkin2.codec.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.Call; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Callback; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; import static java.util.Arrays.asList; import static java.util.stream.Collectors.toList; @@ -50,59 +50,70 @@ class ITURLConnectionSender { String endpoint = server.url("/api/v2/spans").toString(); @BeforeEach void setUp() { - sender = URLConnectionSender.newBuilder() - .endpoint(endpoint) - .compressionEnabled(false) - .build(); + sender = URLConnectionSender.newBuilder().endpoint(endpoint).compressionEnabled(false).build(); } @Test void badUrlIsAnIllegalArgument() { - assertThatThrownBy(() -> URLConnectionSender.create("htp://localhost:9411/api/v1/spans")) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("unknown protocol: htp"); + assertThatThrownBy( + () -> URLConnectionSender.create("htp://localhost:9411/api/v1/spans")).isInstanceOf( + IllegalArgumentException.class).hasMessage("unknown protocol: htp"); } - @Test void sendsSpans() throws Exception { + @Test void send() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); // Now, let's read back the spans we sent! - assertThat(SpanBytesDecoder.JSON_V2.decodeList(server.takeRequest().getBody().readByteArray())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.JSON_V2.decodeList( + server.takeRequest().getBody().readByteArray())).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_PROTO3() throws Exception { + @Test void emptyOk() throws Exception { + server.enqueue(new MockResponse()); + + sender.send(Collections.emptyList()); + + assertThat(server.getRequestCount()).isEqualTo(1); + } + + @Test void sendFailsOnDisconnect() { + server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)).isInstanceOf(IOException.class); + } + + @Test void send_PROTO3() throws Exception { sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); // Now, let's read back the spans we sent! - assertThat(SpanBytesDecoder.PROTO3.decodeList(server.takeRequest().getBody().readByteArray())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.PROTO3.decodeList( + server.takeRequest().getBody().readByteArray())).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_THRIFT() throws Exception { + @Test void send_THRIFT() throws Exception { sender = sender.toBuilder().encoding(Encoding.THRIFT).build(); server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); // Now, let's read back the spans we sent! - assertThat(SpanBytesDecoder.THRIFT.decodeList(server.takeRequest().getBody().readByteArray())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.THRIFT.decodeList( + server.takeRequest().getBody().readByteArray())).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } @Test void compression() throws Exception { @@ -112,21 +123,20 @@ class ITURLConnectionSender { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // block until the request arrived requests.add(server.takeRequest()); } // we expect the first compressed request to be smaller than the uncompressed one. - assertThat(requests.get(0).getBodySize()) - .isLessThan(requests.get(1).getBodySize()); + assertThat(requests.get(0).getBodySize()).isLessThan(requests.get(1).getBodySize()); } @Test void ensuresProxiesDontTrace() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // If the Zipkin endpoint is proxied and instrumented, it will know "0" means don't trace. assertThat(server.takeRequest().getHeader("b3")).isEqualTo("0"); @@ -135,17 +145,16 @@ class ITURLConnectionSender { @Test void mediaTypeBasedOnSpanEncoding() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // block until the request arrived - assertThat(server.takeRequest().getHeader("Content-Type")) - .isEqualTo("application/json"); + assertThat(server.takeRequest().getHeader("Content-Type")).isEqualTo("application/json"); } - @Test void noExceptionWhenServerErrors() { + @Deprecated @Test void noExceptionWhenServerErrors() { server.enqueue(new MockResponse().setResponseCode(500)); - send().enqueue(new Callback() { + sender.sendSpans(Collections.emptyList()).enqueue(new Callback() { @Override public void onSuccess(Void aVoid) { } @@ -154,38 +163,23 @@ class ITURLConnectionSender { }); } - @Test void check_ok() { - server.enqueue(new MockResponse()); - - assertThat(sender.check().ok()).isTrue(); - - assertThat(server.getRequestCount()).isEqualTo(1); - } - - @Test void check_fail() { - server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); - - assertThat(sender.check().ok()).isFalse(); - } - @Test void illegalToSendWhenClosed() { sender.close(); - assertThatThrownBy(() -> send(CLIENT_SPAN).execute()) - .isInstanceOf(IllegalStateException.class); + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN)).isInstanceOf(IllegalStateException.class); } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySenderTypeAndEndpoint() { assertThat(sender.toString()).isEqualTo("URLConnectionSender{" + endpoint + "}"); } - Call send(Span... spans) { + void sendSpans(Span... spans) throws IOException { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -200,6 +194,6 @@ Call send(Span... spans) { default: throw new UnsupportedOperationException("encoding: " + sender.encoding()); } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } }