From 22b0e371f80c8028568c09915461d5c2fe84a2e9 Mon Sep 17 00:00:00 2001 From: Crypt Keeper <64215+codefromthecrypt@users.noreply.github.com> Date: Sun, 14 Jan 2024 15:23:10 +0800 Subject: [PATCH] Deprecates Sender for much simpler BytesMessageSender (#244) async reporters never actually use the async code of the senders. The async side was only used by zipkin-server, as the async reporter design intentionally uses a blocking loop to flush the span backlog. So, this makes things simpler by only requiring a blocking implementation: `BytesMessageSender.send` (chosen to not create symbol collisions and similar to `BytesMessageEncoder`. The side effect is that new senders can use this interface and completely avoid the complicated `Call` then `execute` chain, deferring to whatever the library-specific blocking path is. As older spring libraries like sleuth may not upgrade, this maintains the old types until reporter 4. However, new senders can simply implement the new type and be done with it. Signed-off-by: Adrian Cole --- RATIONALE.md | 55 ++++++++ README.md | 20 ++- activemq-client/pom.xml | 2 +- .../reporter/activemq/ActiveMQSender.java | 46 ++++--- .../reporter/activemq/ITActiveMQSender.java | 56 ++++---- amqp-client/pom.xml | 2 +- .../zipkin2/reporter/amqp/RabbitMQSender.java | 34 +++-- .../reporter/amqp/ITRabbitMQSender.java | 35 +++-- .../reporter/amqp/RabbitMQSenderTest.java | 24 ++-- benchmarks/pom.xml | 2 +- .../reporter/HttpSenderBenchmarks.java | 4 +- .../reporter/KafkaSenderBenchmarks.java | 2 +- .../reporter/OkHttpSenderBenchmarks.java | 4 +- .../URLConnectionSenderBenchmarks.java | 4 +- .../amqp/RabbitMQSenderBenchmarks.java | 32 +++-- .../zipkin2/reporter/internal/NoopSender.java | 30 +---- .../reporter/internal/SenderBenchmarks.java | 13 +- bom/pom.xml | 2 +- brave/pom.xml | 2 +- .../AsyncZipkinSpanHandlerTest.java | 2 +- .../brave/AsyncZipkinSpanHandler.java | 9 +- .../brave/AsyncZipkinSpanHandlerTest.java | 114 ++++++++++++++++ .../brave/ConvertingSpanReporterTest.java | 66 +++++----- .../zipkin2/reporter/brave/FakeSender.java | 23 +--- core/pom.xml | 2 +- .../java/zipkin2/reporter/AsyncReporter.java | 26 ++-- .../zipkin2/reporter/AwaitableCallback.java | 4 +- .../zipkin2/reporter/BytesMessageSender.java | 120 +++++++++++++++++ core/src/main/java/zipkin2/reporter/Call.java | 6 + .../reporter/ClosedSenderException.java | 4 +- .../main/java/zipkin2/reporter/Component.java | 5 + .../zipkin2/reporter/ReporterMetrics.java | 4 +- .../main/java/zipkin2/reporter/Sender.java | 47 ++----- .../reporter/internal/AsyncReporter.java | 43 +++--- .../zipkin2/reporter/AsyncReporterTest.java | 122 ++++++++++++++++++ .../reporter/{internal => }/FakeSender.java | 35 +---- .../reporter/internal/AsyncReporterTest.java | 1 + kafka/pom.xml | 2 +- .../zipkin2/reporter/kafka/KafkaSender.java | 33 +++-- .../zipkin2/reporter/kafka/ITKafkaSender.java | 47 ++++--- libthrift/pom.xml | 2 +- .../reporter/libthrift/LibthriftSender.java | 32 +++-- .../reporter/libthrift/ITLibthriftSender.java | 34 ++--- .../libthrift/InternalScribeCodecTest.java | 2 +- metrics-micrometer/pom.xml | 2 +- okhttp3/pom.xml | 2 +- .../reporter/okhttp3/OkHttpSender.java | 22 +++- .../reporter/okhttp3/ITOkHttpSender.java | 73 ++++++----- pom.xml | 4 +- spring-beans/pom.xml | 2 +- .../reporter/beans/BaseAsyncFactoryBean.java | 7 +- .../beans/ActiveMQSenderFactoryBeanTest.java | 2 +- .../beans/AsyncReporterFactoryBeanTest.java | 5 +- ...AsyncZipkinSpanHandlerFactoryBeanTest.java | 5 +- .../zipkin2/reporter/beans/FakeSender.java | 16 +-- .../beans/KafkaSenderFactoryBeanTest.java | 2 +- .../beans/LibthriftSenderFactoryBeanTest.java | 4 +- .../beans/OkHttpSenderFactoryBeanTest.java | 2 +- .../beans/RabbitMQSenderFactoryBeanTest.java | 2 +- .../URLConnectionSenderFactoryBeanTest.java | 2 +- urlconnection/pom.xml | 2 +- .../urlconnection/URLConnectionSender.java | 28 ++-- .../urlconnection/ITURLConnectionSender.java | 98 +++++++------- 63 files changed, 931 insertions(+), 503 deletions(-) create mode 100644 RATIONALE.md create mode 100644 core/src/main/java/zipkin2/reporter/BytesMessageSender.java create mode 100644 core/src/test/java/zipkin2/reporter/AsyncReporterTest.java rename core/src/test/java/zipkin2/reporter/{internal => }/FakeSender.java (71%) diff --git a/RATIONALE.md b/RATIONALE.md new file mode 100644 index 00000000..380ce514 --- /dev/null +++ b/RATIONALE.md @@ -0,0 +1,55 @@ +# zipkin-reporter rationale + +## Sending an empty list is permitted + +Historically, we had a `Sender.check()` function for fail fast reasons, but it was rarely used and +rarely implemented correctly. In some cases, people returned `OK` having no knowledge of if the +health was good or not. In one case, Stackdriver, a seemingly good implementation was avoided for +directly sending an empty list of spans, until `check()` was changed to do the same. Rather than +define a poorly implementable `Sender.check()` which would likely still require sending an empty +list, we decided to document a call to send no spans should pass through. + +Two known examples of using `check()` were in server modules that forward spans with zipkin reporter +and finagle. `zipkin-finagle` is no longer maintained, so we'll focus on the server modules. + +zipkin-stackdriver (now zipkin-gcp) was both important to verify and difficult to implement a +meaningful `check()`. First attempts looked good, but would pass even when users had no permission +to write spans. For this reason, people ignored the check and did out-of-band sending zero spans to +the POST endpoint. Later, this logic was made the primary impl of `check()`. + +In HTTP senders a `check()` would be invalid for non-intuitive reasons unless you also just posted +no spans. For example, while zipkin has a `/health` endpoint, most clones do not implement that or +put it at a different path. So, you can't check with `/health` and are left with either falsely +returning `OK` or sending an empty list of spans. + +Note that zipkin server does obviate calls to storage when incoming lists are empty. This is not +just for things like this, but 3rd party instrumentation which bugged out and sent no spans. + +Messaging senders came close to implementing health except would suffer similar problems as +Stackdriver did. For example, verifying broker connectivity doesn't mean the queue or topic works. +While you can dig around and solve this for some brokers, it ends up the same situation. + +Another way could be to catch an exception from a prior "POST", and if that failed, return a +corresponding status. This could not be for fail-fast because the caller wouldn't have any spans to +send, yet. It is complicated code for a function uncommon in instrumentation and the impl would be +hard to reason with concurrently. + +The main problem is that we used the same `Component` type in reporter as we did for zipkin server, +which defined `check()` in a hardly used and hardly implementable way except sending no spans. + +We had the following choices: + +* force implementation of `check()` knowing its problems and that it is usual in instrumentation +* document that implementors can skip `send(empty)` even though call sites use this today +* document that you should not skip `send(empty)`, so that the few callers can use it for fail-fast + +The main driving points were how niche this function is (not called by many, or on interval), +and how much work it is to implement a `check()` vs allowing an empty send to proceed. In the +current code base, the only work required for the latter was documentation, as all senders would +pass an empty list. Secondary driving force was that the `BytesMessageSender` main goal is easier +implementation and re-introducing a bad `check()` api gets in the way of this. + +Due to the complexity of this problem, we decided that rather to leave empty undefined, document +sending empty is ok. This allows a couple users to implement a fail-fast in a portable way, without +burdening implementers of `BytesMessageSender` with an unimplementable or wrong `check()` function +for most platforms. diff --git a/README.md b/README.md index 5a830ece..64c21252 100644 --- a/README.md +++ b/README.md @@ -29,7 +29,7 @@ Reporter.CONSOLE.report(span); ## AsyncReporter AsyncReporter is how you actually get spans to zipkin. By default, it waits up to a second -before flushes any pending spans out of process via a Sender. +before flushes any pending spans out of process via a BytesMessageSender. ```java reporter = AsyncReporter.create(URLConnectionSender.create("http://localhost:9411/api/v2/spans")); @@ -67,7 +67,7 @@ Here are the most important properties to understand when tuning. Property | Description --- | --- `queuedMaxBytes` | Maximum backlog of span bytes reported vs sent. Corresponds to `ReporterMetrics.updateQueuedBytes`. Default 1% of heap -`messageMaxBytes` | Maximum bytes sendable per message including overhead. Default `500,000` bytes (`500KB`). Defined by `Sender.messageMaxBytes` +`messageMaxBytes` | Maximum bytes sendable per message including overhead. Default `500,000` bytes (`500KB`). Defined by `BytesMessageSender.messageMaxBytes` `messageTimeout` | Maximum time to wait for messageMaxBytes to accumulate before sending. Default 1 second `closeTimeout` | Maximum time to block for in-flight spans to send on close. Default 1 second @@ -84,11 +84,12 @@ by a large `messageTimeout` or `messageMaxBytes`. Consider lowering the `messageMaxBytes` if this occurs, as it will result in less work per message. -## Sender +## BytesMessageSender The sender component handles the last step of sending a list of encoded spans onto a transport. -This involves I/O, so you can call `Sender.check()` to check its health on a given frequency. +This involves I/O, so you can call `sender.send(Collections.emptyList())` to check it works before +using. -Sender is used by AsyncReporter, but you can also create your own if you need to. +BytesMessageSender is used by AsyncReporter, but you can also create your own if you need to. ```java class CustomReporter implements Flushable { @@ -99,7 +100,12 @@ class CustomReporter implements Flushable { // Is the connection healthy? public boolean ok() { - return sender.check().ok(); + try { + sender.send(Collections.emptyList()); + return true; + } catch (Exception e) { + return false; + } } public void report(Span span) { @@ -113,7 +119,7 @@ class CustomReporter implements Flushable { pending.drainTo(drained); if (drained.isEmpty()) return; - sender.sendSpans(drained, callback).execute(); + sender.send(drained); } ``` diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml index 66d7c8c4..b6a1e50f 100644 --- a/activemq-client/pom.xml +++ b/activemq-client/pom.xml @@ -18,7 +18,7 @@ zipkin-reporter-parent io.zipkin.reporter2 - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT 4.0.0 diff --git a/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java b/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java index 6735271a..dedfa3e0 100644 --- a/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java +++ b/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java @@ -19,7 +19,9 @@ import javax.jms.JMSException; import javax.jms.QueueSender; import org.apache.activemq.ActiveMQConnectionFactory; +import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.BytesMessageEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; import zipkin2.reporter.CheckResult; @@ -32,7 +34,7 @@ * *

Usage

*

- * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async * reporter}. * *

Here's a simple configuration, configured for json: @@ -150,13 +152,33 @@ public final ActiveMQSender build() { return encoding.listSizeInBytes(encodedSpans); } - @Override public Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); byte[] message = encoder.encode(encodedSpans); return new ActiveMQCall(message); } - @Override public CheckResult check() { + /** {@inheritDoc} */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + send(encoder.encode(encodedSpans)); + } + + void send(byte[] message) throws IOException { + try { + ActiveMQConn conn = lazyInit.get(); + QueueSender sender = conn.sender; + BytesMessage bytesMessage = conn.session.createBytesMessage(); + bytesMessage.writeBytes(message); + sender.send(bytesMessage); + } catch (JMSException e) { + throw ioException("Unable to send message: ", e); + } + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { lazyInit.get(); } catch (Throwable t) { @@ -171,7 +193,7 @@ public final ActiveMQSender build() { lazyInit.close(); } - @Override public final String toString() { + @Override public String toString() { return "ActiveMQSender{" + "brokerURL=" + lazyInit.connectionFactory.getBrokerURL() + ", queue=" + lazyInit.queue @@ -186,29 +208,17 @@ final class ActiveMQCall extends Call.Base { // ActiveMQCall is not cancel } @Override protected Void doExecute() throws IOException { - send(); + send(message); return null; } - void send() throws IOException { - try { - ActiveMQConn conn = lazyInit.get(); - QueueSender sender = conn.sender; - BytesMessage bytesMessage = conn.session.createBytesMessage(); - bytesMessage.writeBytes(message); - sender.send(bytesMessage); - } catch (JMSException e) { - throw ioException("Unable to send message: ", e); - } - } - @Override public Call clone() { return new ActiveMQCall(message); } @Override protected void doEnqueue(Callback callback) { try { - send(); + send(message); callback.onSuccess(null); } catch (Throwable t) { Call.propagateIfFatal(t); diff --git a/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java b/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java index 4a23094e..f363e9e7 100644 --- a/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java +++ b/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java @@ -14,6 +14,7 @@ package zipkin2.reporter.activemq; import java.io.IOException; +import java.util.Collections; import java.util.stream.Stream; import javax.jms.BytesMessage; import javax.jms.MessageConsumer; @@ -25,12 +26,10 @@ import org.testcontainers.junit.jupiter.Testcontainers; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; +import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; @@ -43,55 +42,46 @@ class ITActiveMQSender { @Container ActiveMQContainer activemq = new ActiveMQContainer(); - @Test void checkPasses() { + @Test void emptyOk() throws Exception { try (ActiveMQSender sender = activemq.newSenderBuilder("checkPasses").build()) { - assertThat(sender.check().ok()).isTrue(); - } - } - - @Test void checkFalseWhenBrokerIsDown() { - // we can be pretty certain ActiveMQ isn't running on localhost port 80 - try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) { - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()).isInstanceOf(IOException.class); + sender.send(Collections.emptyList()); } } @Test void sendFailsWithInvalidActiveMqServer() { // we can be pretty certain ActiveMQ isn't running on localhost port 80 try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) { - assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf( - IOException.class) + assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(IOException.class) .hasMessageContaining("Unable to establish connection to ActiveMQ broker"); } } - @Test void sendsSpans() throws Exception { - try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans").build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void send() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("send").build()) { + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly( CLIENT_SPAN, CLIENT_SPAN); } } - @Test void sendsSpans_PROTO3() throws Exception { - try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_PROTO3") + @Test void send_PROTO3() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("send_PROTO3") .encoding(Encoding.PROTO3) .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage(sender))).containsExactly( CLIENT_SPAN, CLIENT_SPAN); } } - @Test void sendsSpans_THRIFT() throws Exception { - try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_THRIFT") + @Test void send_THRIFT() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("send_THRIFT") .encoding(Encoding.THRIFT) .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage(sender))).containsExactly( CLIENT_SPAN, CLIENT_SPAN); @@ -101,16 +91,16 @@ class ITActiveMQSender { @Test void illegalToSendWhenClosed() { try (ActiveMQSender sender = activemq.newSenderBuilder("illegalToSendWhenClosed").build()) { sender.close(); - assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf( + assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN)).isInstanceOf( IllegalStateException.class); } } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySummaryInformation() { try (ActiveMQSender sender = activemq.newSenderBuilder("toString").build()) { @@ -119,7 +109,7 @@ class ITActiveMQSender { } } - Call send(ActiveMQSender sender, Span... spans) { + void send(ActiveMQSender sender, Span... spans) throws IOException { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -134,7 +124,7 @@ Call send(ActiveMQSender sender, Span... spans) { default: throw new UnsupportedOperationException("encoding: " + sender.encoding()); } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } byte[] readMessage(ActiveMQSender sender) throws Exception { diff --git a/amqp-client/pom.xml b/amqp-client/pom.xml index fc694056..d2323453 100644 --- a/amqp-client/pom.xml +++ b/amqp-client/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-amqp-client diff --git a/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java b/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java index baed1fa1..490ee764 100644 --- a/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java +++ b/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java @@ -21,7 +21,9 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; +import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.BytesMessageEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; import zipkin2.reporter.CheckResult; @@ -36,7 +38,7 @@ * *

Usage

*

- * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async * reporter}. * *

Here's a simple configuration, configured for json: @@ -69,7 +71,7 @@ * RabbitMQ failure. * *

This sender is thread-safe: a channel is created for each thread that calls - * {@link #sendSpans(List)}. + * {@link #send(List)}. */ public final class RabbitMQSender extends Sender { /** Creates a sender that sends {@link Encoding#JSON} messages. */ @@ -213,15 +215,25 @@ public Builder toBuilder() { return encoding.listSizeInBytes(encodedSizeInBytes); } - /** This sends all of the spans as a single message. */ - @Override public Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); byte[] message = encoder.encode(encodedSpans); return new RabbitMQCall(message); } - /** Ensures there are no connection issues. */ - @Override public CheckResult check() { + /** {@inheritDoc} */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + publish(encoder.encode(encodedSpans)); + } + + void publish(byte[] message) throws IOException { + localChannel().basicPublish("", queue, null, message); + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { if (localChannel().isOpen()) return CheckResult.OK; throw new IllegalStateException("Not Open"); @@ -266,7 +278,7 @@ Connection newConnection() { final ThreadLocal CHANNEL = new ThreadLocal(); /** - * In most circumstances there will only be one thread calling {@link #sendSpans(List)}, the + * In most circumstances there will only be one thread calling {@link #send(List)}, the * {@link AsyncReporter}. Just in case someone is flushing manually, we use a thread-local. All of * this is to avoid recreating a channel for each publish, as that costs two additional network * roundtrips. @@ -288,17 +300,13 @@ class RabbitMQCall extends Call.Base { // RabbitMQFuture is not cancelable } @Override protected Void doExecute() throws IOException { - publish(); + publish(message); return null; } - void publish() throws IOException { - localChannel().basicPublish("", queue, null, message); - } - @Override protected void doEnqueue(Callback callback) { try { - publish(); + publish(message); callback.onSuccess(null); } catch (Throwable t) { Call.propagateIfFatal(t); diff --git a/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java b/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java index 6cfe8674..0d4e4c2b 100644 --- a/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java +++ b/amqp-client/src/test/java/zipkin2/reporter/amqp/ITRabbitMQSender.java @@ -17,6 +17,8 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; +import java.io.IOException; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -28,9 +30,8 @@ import org.testcontainers.junit.jupiter.Testcontainers; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.Call; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; @@ -43,34 +44,40 @@ public class ITRabbitMQSender { // public for use in src/it @Container RabbitMQContainer rabbit = new RabbitMQContainer(); - @Test void sendsSpans() throws Exception { - try (RabbitMQSender sender = rabbit.newSenderBuilder("sendsSpans").build()) { + @Test void emptyOk() throws Exception { + try (RabbitMQSender sender = rabbit.newSenderBuilder("emptyOk").build()) { + sender.send(Collections.emptyList()); + } + } + + @Test void send() throws Exception { + try (RabbitMQSender sender = rabbit.newSenderBuilder("send").build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } } - @Test void sendsSpans_PROTO3() throws Exception { - try (RabbitMQSender sender = rabbit.newSenderBuilder("sendsSpans_PROTO3") + @Test void send_PROTO3() throws Exception { + try (RabbitMQSender sender = rabbit.newSenderBuilder("send_PROTO3") .encoding(Encoding.PROTO3) .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage(sender))) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } } - @Test void sendsSpans_configuredQueueDoesntExist() throws Exception { + @Test void send_configuredQueueDoesntExist() throws Exception { try (RabbitMQSender sender = rabbit.newSenderBuilder("ignored") - .queue("sendsSpans_configuredQueueDoesntExist") + .queue("send_configuredQueueDoesntExist") .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); // doesn't raise exception + send(sender, CLIENT_SPAN, CLIENT_SPAN); // doesn't raise exception } } @@ -78,7 +85,7 @@ public class ITRabbitMQSender { // public for use in src/it try (RabbitMQSender sender = rabbit.newSenderBuilder("shouldCloseRabbitMQConnectionOnClose") .build()) { - send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); + send(sender, CLIENT_SPAN, CLIENT_SPAN); sender.close(); @@ -88,10 +95,10 @@ public class ITRabbitMQSender { // public for use in src/it } /** Blocks until the callback completes to allow read-your-writes consistency during tests. */ - static Call send(Sender sender, Span... spans) { + static void send(BytesMessageSender sender, Span... spans) throws IOException { SpanBytesEncoder bytesEncoder = sender.encoding() == Encoding.JSON ? SpanBytesEncoder.JSON_V2 : SpanBytesEncoder.PROTO3; - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } byte[] readMessage(RabbitMQSender sender) throws Exception { diff --git a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java index ec46540c..ca223f46 100644 --- a/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java +++ b/amqp-client/src/test/java/zipkin2/reporter/amqp/RabbitMQSenderTest.java @@ -15,9 +15,8 @@ import org.junit.jupiter.api.Test; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ClosedSenderException; -import zipkin2.reporter.Sender; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -29,11 +28,14 @@ class RabbitMQSenderTest { RabbitMQSender sender = RabbitMQSender.newBuilder() .connectionTimeout(100).addresses("localhost:80").build(); - @Test void checkFalseWhenRabbitMQIsDown() { - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()) - .isInstanceOf(RuntimeException.class); + @Test void sendFailsWhenRabbitMQIsDown() { + // We can be pretty certain RabbitMQ isn't running on localhost port 80 + RabbitMQSender sender = RabbitMQSender.newBuilder() + .connectionTimeout(100).addresses("localhost:80").build(); + + assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("Unable to establish connection to RabbitMQ server"); } @Test void illegalToSendWhenClosed() throws Exception { @@ -44,10 +46,10 @@ class RabbitMQSenderTest { } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySummaryInformation() { assertThat(sender).hasToString( diff --git a/benchmarks/pom.xml b/benchmarks/pom.xml index 4f12a332..07524801 100644 --- a/benchmarks/pom.xml +++ b/benchmarks/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT benchmarks diff --git a/benchmarks/src/main/java/zipkin2/reporter/HttpSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/HttpSenderBenchmarks.java index 74a7c3ed..37beea53 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/HttpSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/HttpSenderBenchmarks.java @@ -26,7 +26,7 @@ public abstract class HttpSenderBenchmarks extends SenderBenchmarks { Server server; - @Override protected Sender createSender() { + @Override protected BytesMessageSender createSender() { Route v2JsonSpans = Route.builder().methods(POST).consumes(JSON).path("/api/v2/spans").build(); server = Server.builder() .http(0) @@ -37,7 +37,7 @@ public abstract class HttpSenderBenchmarks extends SenderBenchmarks { return newHttpSender(url("/api/v2/spans")); } - abstract Sender newHttpSender(String endpoint); + abstract BytesMessageSender newHttpSender(String endpoint); @Override protected void afterSenderClose() { server.stop().join(); diff --git a/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java index 52d20f2d..b8805911 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/KafkaSenderBenchmarks.java @@ -57,7 +57,7 @@ String bootstrapServer() { KafkaContainer kafka; KafkaConsumer consumer; - @Override protected Sender createSender() { + @Override protected BytesMessageSender createSender() { kafka = new KafkaContainer(); kafka.start(); diff --git a/benchmarks/src/main/java/zipkin2/reporter/OkHttpSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/OkHttpSenderBenchmarks.java index 55b14df7..4f607873 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/OkHttpSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/OkHttpSenderBenchmarks.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,7 +21,7 @@ public class OkHttpSenderBenchmarks extends HttpSenderBenchmarks { - @Override Sender newHttpSender(String endpoint) { + @Override BytesMessageSender newHttpSender(String endpoint) { return OkHttpSender.create(endpoint); } diff --git a/benchmarks/src/main/java/zipkin2/reporter/URLConnectionSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/URLConnectionSenderBenchmarks.java index 789e8693..2dbf9c7a 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/URLConnectionSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/URLConnectionSenderBenchmarks.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -21,7 +21,7 @@ public class URLConnectionSenderBenchmarks extends HttpSenderBenchmarks { - @Override Sender newHttpSender(String endpoint) { + @Override BytesMessageSender newHttpSender(String endpoint) { return URLConnectionSender.create(endpoint); } diff --git a/benchmarks/src/main/java/zipkin2/reporter/amqp/RabbitMQSenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/amqp/RabbitMQSenderBenchmarks.java index 0c2b5068..2ea1dbcb 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/amqp/RabbitMQSenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/amqp/RabbitMQSenderBenchmarks.java @@ -16,42 +16,40 @@ import com.rabbitmq.client.Channel; import com.rabbitmq.client.DefaultConsumer; import java.io.IOException; +import java.util.Collections; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; -import zipkin2.reporter.CheckResult; -import zipkin2.reporter.Sender; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.internal.SenderBenchmarks; public class RabbitMQSenderBenchmarks extends SenderBenchmarks { private Channel channel; - @Override protected Sender createSender() throws Exception { - RabbitMQSender result = RabbitMQSender.newBuilder() - .queue("zipkin-jmh") - .addresses("localhost:5672").build(); + @Override protected BytesMessageSender createSender() throws Exception { + RabbitMQSender sender = RabbitMQSender.newBuilder() + .queue("zipkin-jmh") + .addresses("localhost:5672").build(); - CheckResult check = result.check(); - if (!check.ok()) { - throw new IllegalStateException(check.error().getMessage(), check.error()); - } + // check sender works at all + sender.send(Collections.emptyList()); - channel = result.localChannel(); - channel.queueDelete(result.queue); - channel.queueDeclare(result.queue, false, true, true, null); + channel = sender.localChannel(); + channel.queueDelete(sender.queue); + channel.queueDeclare(sender.queue, false, true, true, null); Thread.sleep(500L); new Thread(() -> { try { - channel.basicConsume(result.queue, true, new DefaultConsumer(channel)); + channel.basicConsume(sender.queue, true, new DefaultConsumer(channel)); } catch (IOException e) { e.printStackTrace(); } }).start(); - return result; + return sender; } @Override protected void afterSenderClose() { @@ -61,8 +59,8 @@ public class RabbitMQSenderBenchmarks extends SenderBenchmarks { // Convenience main entry-point public static void main(String[] args) throws RunnerException { Options opt = new OptionsBuilder() - .include(".*" + RabbitMQSenderBenchmarks.class.getSimpleName() + ".*") - .build(); + .include(".*" + RabbitMQSenderBenchmarks.class.getSimpleName() + ".*") + .build(); new Runner(opt).run(); } diff --git a/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java b/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java index 7b444a68..f11afcb4 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java +++ b/benchmarks/src/main/java/zipkin2/reporter/internal/NoopSender.java @@ -15,21 +15,18 @@ import java.util.List; import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -final class NoopSender extends Sender { - - final Encoding encoding; +/** Encodes messages on {@link #send(List)}, but doesn't do anything else. */ +final class NoopSender extends BytesMessageSender.Base { final BytesMessageEncoder messageEncoder; /** close is typically called from a different thread */ volatile boolean closeCalled; NoopSender(Encoding encoding) { - this.encoding = encoding; + super(encoding); this.messageEncoder = BytesMessageEncoder.forEncoding(encoding); } @@ -37,25 +34,8 @@ final class NoopSender extends Sender { return Integer.MAX_VALUE; } - @Override public Encoding encoding() { - return encoding; - } - - @Override public int messageSizeInBytes(List encodedSpans) { - return encoding().listSizeInBytes(encodedSpans); - } - - @Override public int messageSizeInBytes(int encodedSizeInBytes) { - return encoding().listSizeInBytes(encodedSizeInBytes); - } - - @Override public Call sendSpans(List encodedSpans) { + @Override public void send(List encodedSpans) { messageEncoder.encode(encodedSpans); - return Call.create(null); - } - - @Override public CheckResult check() { - return CheckResult.OK; } @Override public void close() { diff --git a/benchmarks/src/main/java/zipkin2/reporter/internal/SenderBenchmarks.java b/benchmarks/src/main/java/zipkin2/reporter/internal/SenderBenchmarks.java index f32c570d..2f460c28 100644 --- a/benchmarks/src/main/java/zipkin2/reporter/internal/SenderBenchmarks.java +++ b/benchmarks/src/main/java/zipkin2/reporter/internal/SenderBenchmarks.java @@ -13,6 +13,7 @@ */ package zipkin2.reporter.internal; +import java.util.Collections; import java.util.concurrent.TimeUnit; import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; @@ -31,9 +32,9 @@ import org.openjdk.jmh.annotations.Warmup; import zipkin2.Span; import zipkin2.TestObjects; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.InMemoryReporterMetrics; -import zipkin2.reporter.Sender; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.SpanBytesEncoder; /** @@ -89,7 +90,7 @@ public void clean() { } } - Sender sender; + BytesMessageSender sender; AsyncReporter.BoundedAsyncReporter reporter; @@ -97,8 +98,8 @@ public void clean() { public void setup() throws Throwable { sender = createSender(); - CheckResult senderCheck = sender.check(); - if (!senderCheck.ok()) throw senderCheck.error(); + // check sender works at all + sender.send(Collections.emptyList()); reporter = (AsyncReporter.BoundedAsyncReporter) AsyncReporter.newBuilder(sender) .messageMaxBytes(messageMaxBytes) @@ -106,7 +107,7 @@ public void setup() throws Throwable { .metrics(metrics).build(SpanBytesEncoder.JSON_V2); } - protected abstract Sender createSender() throws Exception; + protected abstract BytesMessageSender createSender() throws Exception; @Setup(Level.Iteration) public void fillQueue() { diff --git a/bom/pom.xml b/bom/pom.xml index 57dbd94e..699e758e 100644 --- a/bom/pom.xml +++ b/bom/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-bom Zipkin Reporter BOM - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT pom Bill Of Materials POM for all Zipkin reporter artifacts diff --git a/brave/pom.xml b/brave/pom.xml index bb4c0c61..13903a79 100644 --- a/brave/pom.xml +++ b/brave/pom.xml @@ -18,7 +18,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT 4.0.0 diff --git a/brave/src/it/no_zipkin_deps/src/test/java/zipkin2/reporter/brave/no_zipkin_deps/AsyncZipkinSpanHandlerTest.java b/brave/src/it/no_zipkin_deps/src/test/java/zipkin2/reporter/brave/no_zipkin_deps/AsyncZipkinSpanHandlerTest.java index 11e73ecd..8eff74fa 100644 --- a/brave/src/it/no_zipkin_deps/src/test/java/zipkin2/reporter/brave/no_zipkin_deps/AsyncZipkinSpanHandlerTest.java +++ b/brave/src/it/no_zipkin_deps/src/test/java/zipkin2/reporter/brave/no_zipkin_deps/AsyncZipkinSpanHandlerTest.java @@ -40,7 +40,7 @@ class AsyncZipkinSpanHandlerTest { OkHttpSender sender = OkHttpSender.newBuilder().endpoint(endpoint).compressionEnabled(false).build(); - @Test void sendsSpans() throws Exception { + @Test void send() throws Exception { server.enqueue(new MockResponse()); try (AsyncZipkinSpanHandler zipkinSpanHandler = AsyncZipkinSpanHandler.newBuilder(sender) diff --git a/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java b/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java index c52d7d20..f8594c2f 100644 --- a/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java +++ b/brave/src/main/java/zipkin2/reporter/brave/AsyncZipkinSpanHandler.java @@ -22,6 +22,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; import zipkin2.reporter.Reporter; import zipkin2.reporter.ReporterMetrics; @@ -31,7 +32,7 @@ /** * A {@link brave.handler.SpanHandler} that queues spans on {@link #end} to bundle and send as a * bulk Zipkin JSON V2 message. When the {@link - * Sender} is HTTP, the endpoint is usually "http://zipkinhost:9411/api/v2/spans". + * BytesMessageSender} is HTTP, the endpoint is usually "http://zipkinhost:9411/api/v2/spans". * *

Example: *

{@code
@@ -46,12 +47,12 @@
  */
 public final class AsyncZipkinSpanHandler extends SpanHandler implements Closeable, Flushable {
   /** @since 2.14 */
-  public static AsyncZipkinSpanHandler create(Sender sender) {
+  public static AsyncZipkinSpanHandler create(BytesMessageSender sender) {
     return newBuilder(sender).build();
   }
 
   /** @since 2.14 */
-  public static Builder newBuilder(Sender sender) {
+  public static Builder newBuilder(BytesMessageSender sender) {
     if (sender == null) throw new NullPointerException("sender == null");
     return new Builder(sender);
   }
@@ -80,7 +81,7 @@ public static final class Builder extends ZipkinSpanHandler.Builder {
       this.errorTag = handler.errorTag;
     }
 
-    Builder(Sender sender) {
+    Builder(BytesMessageSender sender) {
       this.delegate = AsyncReporter.newBuilder(sender);
       this.encoding = sender.encoding();
     }
diff --git a/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java b/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java
index 575ee58c..482639ff 100644
--- a/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java
+++ b/brave/src/test/java/zipkin2/reporter/brave/AsyncZipkinSpanHandlerTest.java
@@ -14,9 +14,21 @@
 package zipkin2.reporter.brave;
 
 import brave.handler.MutableSpan;
+import brave.handler.SpanHandler;
+import brave.propagation.TraceContext;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
 import org.junit.jupiter.api.Test;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.reporter.BytesEncoder;
+import zipkin2.reporter.Call;
+import zipkin2.reporter.Callback;
 import zipkin2.reporter.Encoding;
+import zipkin2.reporter.Sender;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -50,4 +62,106 @@ class AsyncZipkinSpanHandlerTest {
       assertThat(spanReporter).isNotNull();
     }
   }
+
+  @Test void example() {
+    AtomicInteger sentSpans = new AtomicInteger();
+    try (AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler.newBuilder(FakeSender.create()
+        .onSpans(spans -> sentSpans.addAndGet(spans.size())))
+      .messageTimeout(0, TimeUnit.MILLISECONDS) // no thread
+      .build()) {
+
+      TraceContext context = TraceContext.newBuilder().traceId(1).spanId(2).sampled(true).build();
+      MutableSpan span = new MutableSpan();
+      span.traceId("1");
+      span.id("2");
+      span.name("test");
+      spanHandler.end(context, span, SpanHandler.Cause.FINISHED);
+      spanHandler.flush();
+    }
+
+    assertThat(sentSpans.get()).isEqualTo(1);
+  }
+
+  @Deprecated @Test void example_deprecatedSender() {
+    AtomicInteger sentSpans = new AtomicInteger();
+    try (AsyncZipkinSpanHandler spanHandler = AsyncZipkinSpanHandler.newBuilder(
+        new DeprecatedCheatingSender(
+          spans -> sentSpans.addAndGet(spans.size())
+        ))
+      .messageTimeout(0, TimeUnit.MILLISECONDS) // no thread
+      .build()) {
+
+      TraceContext context = TraceContext.newBuilder().traceId(1).spanId(2).sampled(true).build();
+      MutableSpan span = new MutableSpan();
+      span.traceId("1");
+      span.id("2");
+      span.name("test");
+      spanHandler.end(context, span, SpanHandler.Cause.FINISHED);
+      spanHandler.flush();
+    }
+
+    assertThat(sentSpans.get()).isEqualTo(1);
+  }
+
+  @Deprecated static class DeprecatedCheatingSender extends Sender {
+    final Consumer> onSpans;
+
+    DeprecatedCheatingSender(Consumer> onSpans) {
+      this.onSpans = onSpans;
+    }
+
+    @Override public Encoding encoding() {
+      return Encoding.JSON;
+    }
+
+    @Override public int messageMaxBytes() {
+      return 500;
+    }
+
+    @Override public int messageSizeInBytes(List encodedSpans) {
+      return Encoding.JSON.listSizeInBytes(encodedSpans);
+    }
+
+    @Override public int messageSizeInBytes(int encodedSizeInBytes) {
+      return Encoding.JSON.listSizeInBytes(encodedSizeInBytes);
+    }
+
+    @Override public Call sendSpans(List encodedSpans) {
+      List decoded = encodedSpans.stream()
+        .map(SpanBytesDecoder.JSON_V2::decodeOne).
+        collect(Collectors.toList());
+      return new CheatingVoidCall(onSpans, decoded);
+    }
+  }
+
+  @Deprecated static class CheatingVoidCall extends Call {
+    final Consumer> onSpans;
+    final List spans;
+
+    CheatingVoidCall(Consumer> onSpans, List spans) {
+      this.onSpans = onSpans;
+      this.spans = spans;
+    }
+
+    @Override public Void execute() {
+      onSpans.accept(spans);
+      return null;
+    }
+
+    @Override public void enqueue(Callback callback) {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override public void cancel() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override public boolean isCanceled() {
+      return false;
+    }
+
+    @Override public Call clone() {
+      throw new UnsupportedOperationException();
+    }
+  }
 }
diff --git a/brave/src/test/java/zipkin2/reporter/brave/ConvertingSpanReporterTest.java b/brave/src/test/java/zipkin2/reporter/brave/ConvertingSpanReporterTest.java
index 27c4ece7..05c4f8c0 100644
--- a/brave/src/test/java/zipkin2/reporter/brave/ConvertingSpanReporterTest.java
+++ b/brave/src/test/java/zipkin2/reporter/brave/ConvertingSpanReporterTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-2023 The OpenZipkin Authors
+ * Copyright 2016-2024 The OpenZipkin Authors
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
  * in compliance with the License. You may obtain a copy of the License at
@@ -54,24 +54,24 @@ static class ListSpanReporter extends ArrayList implements Reporter
 
   @Test void generateKindMap() {
     assertThat(ConvertingSpanReporter.generateKindMap()).containsExactly(
-        entry(CLIENT, Span.Kind.CLIENT),
-        entry(SERVER, Span.Kind.SERVER),
-        entry(PRODUCER, Span.Kind.PRODUCER),
-        entry(CONSUMER, Span.Kind.CONSUMER)
+      entry(CLIENT, Span.Kind.CLIENT),
+      entry(SERVER, Span.Kind.SERVER),
+      entry(PRODUCER, Span.Kind.PRODUCER),
+      entry(CONSUMER, Span.Kind.CONSUMER)
     );
   }
 
   @Test void equalsAndHashCode() {
     assertThat(spanReporter)
-        .hasSameHashCodeAs(spans)
-        .isEqualTo(new ConvertingSpanReporter(spans, Tags.ERROR));
+      .hasSameHashCodeAs(spans)
+      .isEqualTo(new ConvertingSpanReporter(spans, Tags.ERROR));
 
     ConvertingSpanReporter otherReporter = new ConvertingSpanReporter(spans::add, Tags.ERROR);
 
     assertThat(spanReporter)
-        .isNotEqualTo(otherReporter)
-        .extracting(Objects::hashCode)
-        .isNotEqualTo(otherReporter.hashCode());
+      .isNotEqualTo(otherReporter)
+      .extracting(Objects::hashCode)
+      .isNotEqualTo(otherReporter.hashCode());
   }
 
   @Test void convertsSampledSpan() {
@@ -79,10 +79,10 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0)).usingRecursiveComparison().isEqualTo(
-        Span.newBuilder()
-            .traceId("1")
-            .id("2")
-            .build()
+      Span.newBuilder()
+        .traceId("1")
+        .id("2")
+        .build()
     );
   }
 
@@ -92,11 +92,11 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0)).usingRecursiveComparison().isEqualTo(
-        Span.newBuilder()
-            .traceId("0000000000000001")
-            .id("0000000000000002")
-            .debug(true)
-            .build()
+      Span.newBuilder()
+        .traceId("0000000000000001")
+        .id("0000000000000002")
+        .debug(true)
+        .build()
     );
   }
 
@@ -121,10 +121,10 @@ static class ListSpanReporter extends ArrayList implements Reporter
 
     spanReporter.report(span);
     assertThat(spans.get(0).tags()).containsOnly(
-        entry("1", "1"),
-        entry("foo", "baz"),
-        entry("2", "2"),
-        entry("3", "3")
+      entry("1", "1"),
+      entry("foo", "baz"),
+      entry("2", "2"),
+      entry("3", "3")
     );
   }
 
@@ -138,7 +138,7 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0).tags())
-        .containsOnly(entry("error", "RuntimeException"));
+      .containsOnly(entry("error", "RuntimeException"));
   }
 
   @Test void doesntOverwriteErrorTag() {
@@ -150,7 +150,7 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0).tags())
-        .containsOnly(entry("error", ""));
+      .containsOnly(entry("error", ""));
   }
 
   @Test void addsAnnotations() {
@@ -163,7 +163,7 @@ static class ListSpanReporter extends ArrayList implements Reporter
     spanReporter.report(span);
 
     assertThat(spans.get(0).annotations())
-        .containsOnly(Annotation.create(2L, "foo"));
+      .containsOnly(Annotation.create(2L, "foo"));
   }
 
   @Test void finished_client() {
@@ -232,10 +232,10 @@ void flush(brave.Span.Kind braveKind, Span.Kind span2Kind) {
     MutableSpan span = new MutableSpan(context, null);
 
     Endpoint endpoint = Endpoint.newBuilder()
-        .serviceName("fooService")
-        .ip("1.2.3.4")
-        .port(80)
-        .build();
+      .serviceName("fooService")
+      .ip("1.2.3.4")
+      .port(80)
+      .build();
 
     span.kind(CLIENT);
     span.remoteServiceName(endpoint.serviceName());
@@ -246,7 +246,7 @@ void flush(brave.Span.Kind braveKind, Span.Kind span2Kind) {
     spanReporter.report(span);
 
     assertThat(spans.get(0).remoteEndpoint())
-        .isEqualTo(endpoint);
+      .isEqualTo(endpoint);
   }
 
   // This prevents the server startTimestamp from overwriting the client one on the collector
@@ -261,7 +261,7 @@ void flush(brave.Span.Kind braveKind, Span.Kind span2Kind) {
     spanReporter.report(span);
 
     assertThat(spans.get(0).shared())
-        .isTrue();
+      .isTrue();
   }
 
   @Test void flushUnstartedNeitherSetsTimestampNorDuration() {
@@ -271,6 +271,6 @@ void flush(brave.Span.Kind braveKind, Span.Kind span2Kind) {
     spanReporter.report(flushed);
 
     assertThat(spans.get(0)).extracting(Span::timestampAsLong, Span::durationAsLong)
-        .allSatisfy(u -> assertThat(u).isEqualTo(0L));
+      .allSatisfy(u -> assertThat(u).isEqualTo(0L));
   }
 }
diff --git a/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java b/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java
index 5d071c32..d9792361 100644
--- a/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java
+++ b/brave/src/test/java/zipkin2/reporter/brave/FakeSender.java
@@ -21,13 +21,12 @@
 import zipkin2.codec.SpanBytesDecoder;
 import zipkin2.reporter.BytesEncoder;
 import zipkin2.reporter.BytesMessageEncoder;
-import zipkin2.reporter.Call;
+import zipkin2.reporter.BytesMessageSender;
 import zipkin2.reporter.ClosedSenderException;
 import zipkin2.reporter.Encoding;
-import zipkin2.reporter.Sender;
 import zipkin2.reporter.SpanBytesEncoder;
 
-public final class FakeSender extends Sender {
+public final class FakeSender extends BytesMessageSender.Base {
 
   public static FakeSender create() {
     return new FakeSender(Encoding.JSON, Integer.MAX_VALUE,
@@ -36,7 +35,6 @@ public static FakeSender create() {
     });
   }
 
-  final Encoding encoding;
   final int messageMaxBytes;
   final BytesMessageEncoder messageEncoder;
   final BytesEncoder encoder;
@@ -45,7 +43,7 @@ public static FakeSender create() {
 
   FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder,
     BytesEncoder encoder, BytesDecoder decoder, Consumer> onSpans) {
-    this.encoding = encoding;
+    super(encoding);
     this.messageMaxBytes = messageMaxBytes;
     this.messageEncoder = messageEncoder;
     this.encoder = encoder;
@@ -60,30 +58,21 @@ FakeSender encoding(Encoding encoding) {
       onSpans);
   }
 
-  @Override public Encoding encoding() {
-    return encoding;
+  FakeSender onSpans(Consumer> onSpans) {
+    return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans);
   }
 
   @Override public int messageMaxBytes() {
     return messageMaxBytes;
   }
 
-  @Override public int messageSizeInBytes(List encodedSpans) {
-    return encoding.listSizeInBytes(encodedSpans);
-  }
-
-  @Override public int messageSizeInBytes(int encodedSizeInBytes) {
-    return encoding.listSizeInBytes(encodedSizeInBytes);
-  }
-
   /** close is typically called from a different thread */
   volatile boolean closeCalled;
 
-  @Override public Call sendSpans(List encodedSpans) {
+  @Override public void send(List encodedSpans) {
     if (closeCalled) throw new ClosedSenderException();
     List decoded = encodedSpans.stream().map(decoder::decodeOne).collect(Collectors.toList());
     onSpans.accept(decoded);
-    return Call.create(null);
   }
 
   @Override public void close() {
diff --git a/core/pom.xml b/core/pom.xml
index ce560f7c..640231d6 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -20,7 +20,7 @@
   
     io.zipkin.reporter2
     zipkin-reporter-parent
-    3.1.2-SNAPSHOT
+    3.2.0-SNAPSHOT
   
 
   zipkin-reporter
diff --git a/core/src/main/java/zipkin2/reporter/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/AsyncReporter.java
index a655afe4..733e85fa 100644
--- a/core/src/main/java/zipkin2/reporter/AsyncReporter.java
+++ b/core/src/main/java/zipkin2/reporter/AsyncReporter.java
@@ -13,6 +13,7 @@
  */
 package zipkin2.reporter;
 
+import java.io.Closeable;
 import java.io.Flushable;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
@@ -25,7 +26,7 @@
  *
  * 

Spans are bundled into messages based on size in bytes or a timeout, whichever happens first. * - *

The thread that sends flushes spans to the {@linkplain Sender} does so in a synchronous loop. + *

The thread that sends flushes spans to the {@linkplain BytesMessageSender} does so in a synchronous loop. * This means that even asynchronous transports will wait for an ack before sending a next message. * We do this so that a surge of spans doesn't overrun memory or bandwidth via hundreds or * thousands of in-flight messages. The downside of this is that reporting is limited in speed to @@ -34,21 +35,21 @@ * @param type of the span, usually {@link zipkin2.Span} */ // This is effectively, but not explicitly final as it was not final in version 2.x. -public class AsyncReporter extends Component implements Reporter, Flushable { +public class AsyncReporter extends Component implements Reporter, Closeable, Flushable { /** * Builds a json reporter for Zipkin V2. If http, * the endpoint of the sender is usually "http://zipkinhost:9411/api/v2/spans". * - *

After a certain threshold, spans are drained and {@link Sender#sendSpans(List) sent} to - * Zipkin collectors. + *

After a certain threshold, spans are drained and {@link BytesMessageSender#send(List) sent} + * to Zipkin collectors. */ - public static AsyncReporter create(Sender sender) { + public static AsyncReporter create(BytesMessageSender sender) { return new Builder(sender).build(); } - /** Like {@link #create(Sender)}, except you can configure settings such as the timeout. */ - public static Builder builder(Sender sender) { + /** Like {@link #create(BytesMessageSender)}, except you can configure settings such as the timeout. */ + public static Builder builder(BytesMessageSender sender) { return new Builder(sender); } @@ -87,7 +88,7 @@ public static final class Builder { final zipkin2.reporter.internal.AsyncReporter.Builder delegate; final Encoding encoding; - Builder(Sender sender) { + Builder(BytesMessageSender sender) { this.delegate = zipkin2.reporter.internal.AsyncReporter.newBuilder(sender); this.encoding = sender.encoding(); } @@ -110,7 +111,7 @@ public Builder metrics(ReporterMetrics metrics) { /** * Maximum bytes sendable per message including overhead. Defaults to, and is limited by {@link - * Sender#messageMaxBytes()}. + * BytesMessageSender#messageMaxBytes()}. */ public Builder messageMaxBytes(int messageMaxBytes) { this.delegate.messageMaxBytes(messageMaxBytes); @@ -121,8 +122,8 @@ public Builder messageMaxBytes(int messageMaxBytes) { * Default 1 second. 0 implies spans are {@link #flush() flushed} externally. * *

Instead of sending one message at a time, spans are bundled into messages, up to {@link - * Sender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an incomplete - * message. + * BytesMessageSender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an + * incomplete message. * *

Note: this timeout starts when the first unsent span is reported. */ @@ -170,8 +171,9 @@ public AsyncReporter build(BytesEncoder encoder) { } } - static final class BytesEncoderAdapterimplements BytesEncoder { + static final class BytesEncoderAdapter implements BytesEncoder { final BytesEncoder delegate; + BytesEncoderAdapter(BytesEncoder delegate) { this.delegate = delegate; } diff --git a/core/src/main/java/zipkin2/reporter/AwaitableCallback.java b/core/src/main/java/zipkin2/reporter/AwaitableCallback.java index 34bfae83..a630956c 100644 --- a/core/src/main/java/zipkin2/reporter/AwaitableCallback.java +++ b/core/src/main/java/zipkin2/reporter/AwaitableCallback.java @@ -17,8 +17,10 @@ /** * Blocks until {@link Callback#onSuccess(Object)} or {@link Callback#onError(Throwable)}. + * + * @deprecated since 3.2 this is no longer used. */ -public final class AwaitableCallback implements Callback { +@Deprecated public final class AwaitableCallback implements Callback { final CountDownLatch countDown = new CountDownLatch(1); Throwable throwable; // thread visibility guaranteed by the countdown latch diff --git a/core/src/main/java/zipkin2/reporter/BytesMessageSender.java b/core/src/main/java/zipkin2/reporter/BytesMessageSender.java new file mode 100644 index 00000000..0012fb87 --- /dev/null +++ b/core/src/main/java/zipkin2/reporter/BytesMessageSender.java @@ -0,0 +1,120 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * Sends a list of encoded spans to a transport such as HTTP or Kafka. Usually, this involves + * encoding them into a message and enqueueing them for transport in a corresponding client library. + * The typical end recipient is a zipkin collector. + * + *

Unless mentioned otherwise, senders are not thread-safe. They were designed to be used by a + * single reporting thread, hence the operation is blocking + * + *

Those looking to initialize eagerly can {@link #send(List)} with an empty list. This can be + * used to reduce latency on the first send operation, or to fail fast. + * + *

Implementation notes + * + *

The parameter is a list of encoded spans as opposed to an encoded message. This allows + * implementations flexibility on how to encode spans into a message. For example, a large span + * might need to be sent as a separate message to avoid kafka limits. Also, logging transports like + * scribe will likely write each span as a separate log line. + * + *

This accepts a list of {@link BytesEncoder#encode(Object) encoded spans}, as opposed a list of + * spans like {@code zipkin2.Span}. This allows senders to be re-usable as model shapes change. This + * also allows them to use their most natural message type. For example, kafka would more naturally + * send messages as byte arrays. + * + * @since 3.2 + */ +public interface BytesMessageSender extends Closeable { + + /** + * Base class for implementation, which implements {@link #messageSizeInBytes(List)} and + * {@link #messageSizeInBytes(List)} with a given {@linkplain Encoding} + */ + abstract class Base implements BytesMessageSender { + protected final Encoding encoding; + + protected Base(Encoding encoding) { + this.encoding = encoding; + } + + /** {@inheritDoc} */ + @Override public Encoding encoding() { + return encoding; + } + + /** {@inheritDoc} */ + @Override public int messageSizeInBytes(List encodedSpans) { + return encoding.listSizeInBytes(encodedSpans); + } + + /** {@inheritDoc} */ + @Override public int messageSizeInBytes(int encodedSizeInBytes) { + return encoding.listSizeInBytes(encodedSizeInBytes); + } + } + + /** Returns the encoding this sender requires spans to have. */ + Encoding encoding(); + + /** + * Maximum bytes sendable per message including overhead. This can be calculated using {@link + * #messageSizeInBytes(List)} + *

+ * Defaults to 500KB as a conservative default. You may get better or reduced performance + * by changing this value based on, e.g., machine size or network bandwidth in your + * infrastructure. Finding a perfect value will require trying out different values in production, + * but the default should work well enough in most cases. + */ + int messageMaxBytes(); + + /** + * Before invoking {@link BytesMessageSender#send(List)}, callers must consider message overhead, + * which might be more than encoding overhead. This is used to not exceed {@link + * BytesMessageSender#messageMaxBytes()}. + * + *

Note this is not always {@link Encoding#listSizeInBytes(List)}, as some senders have + * inefficient list encoding. For example, Scribe base64's then tags each span with a category. + */ + int messageSizeInBytes(List encodedSpans); + + /** + * Like {@link #messageSizeInBytes(List)}, except for a single-span. This is used to ensure a span + * is never accepted that can never be sent. + * + *

Note this is not always {@link Encoding#listSizeInBytes(int)}, as some senders have + * inefficient list encoding. For example, Stackdriver's proto message contains other fields. + * + * @param encodedSizeInBytes the {@link BytesEncoder#sizeInBytes(Object) encoded size} of a span + */ + int messageSizeInBytes(int encodedSizeInBytes); + + /** + * Sends a list of encoded spans to a transport such as HTTP or Kafka. + * + *

Empty input is permitted. While async reporters in this repository will always send + * a non-empty list. Some external callers might use an empty send for fail-fast checking. If you + * obviate empty lists, you might break them. See /RATIONALE.md for more. + * + * @param encodedSpans a potentially empty list of encoded spans. + * @throws IllegalStateException if {@link #close() close} was called. + */ + void send(List encodedSpans) throws IOException; +} diff --git a/core/src/main/java/zipkin2/reporter/Call.java b/core/src/main/java/zipkin2/reporter/Call.java index a15a777c..9f566804 100644 --- a/core/src/main/java/zipkin2/reporter/Call.java +++ b/core/src/main/java/zipkin2/reporter/Call.java @@ -48,7 +48,9 @@ * * @param the success type, typically not null except when {@code V} is {@linkplain Void}. * @since 3.0 + * @deprecated since 3.2 this is no longer used. This will be removed in v4.0. */ +@Deprecated public abstract class Call implements Cloneable { /** * Returns a completed call which has the supplied value. This is useful when input parameters @@ -373,6 +375,10 @@ public void onError(Throwable t) { } } + /** + * @deprecated since 3.2 this is no longer used. This will be removed in v4.0. + */ + @Deprecated public static abstract class Base extends Call { volatile boolean canceled; boolean executed; diff --git a/core/src/main/java/zipkin2/reporter/ClosedSenderException.java b/core/src/main/java/zipkin2/reporter/ClosedSenderException.java index d8bda4e9..3e5fb6ee 100644 --- a/core/src/main/java/zipkin2/reporter/ClosedSenderException.java +++ b/core/src/main/java/zipkin2/reporter/ClosedSenderException.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -13,7 +13,7 @@ */ package zipkin2.reporter; -/** An exception thrown when a {@link Sender} is used after it has been closed. */ +/** An exception thrown when a {@link BytesMessageSender} is used after it has been closed. */ public final class ClosedSenderException extends IllegalStateException { static final long serialVersionUID = -4636520624634625689L; } diff --git a/core/src/main/java/zipkin2/reporter/Component.java b/core/src/main/java/zipkin2/reporter/Component.java index 1eac9481..8bcb56ed 100644 --- a/core/src/main/java/zipkin2/reporter/Component.java +++ b/core/src/main/java/zipkin2/reporter/Component.java @@ -24,7 +24,9 @@ * avoid crashing the application graph if a network service is unavailable. * * @since 3.0 + * @deprecated since 3.2 this is no longer used. This will be removed in v4.0. */ +@Deprecated public abstract class Component implements Closeable { /** @@ -35,7 +37,10 @@ public abstract class Component implements Closeable { * possible to establish a meaningful result, and be safe to call many times, even concurrently. * * @see CheckResult#OK + * @deprecated since 3.2 this is no longer used. If you need to check a sender, send a zero-length + * list of spans. This will be removed in v4.0. */ + @Deprecated public CheckResult check() { return CheckResult.OK; } diff --git a/core/src/main/java/zipkin2/reporter/ReporterMetrics.java b/core/src/main/java/zipkin2/reporter/ReporterMetrics.java index cbce7a81..b7365aea 100644 --- a/core/src/main/java/zipkin2/reporter/ReporterMetrics.java +++ b/core/src/main/java/zipkin2/reporter/ReporterMetrics.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -70,7 +70,7 @@ public interface ReporterMetrics { * *

This is a function of span bytes per message and overhead * - * @see Sender#messageSizeInBytes + * @see BytesMessageSender#messageSizeInBytes */ void incrementMessageBytes(int quantity); diff --git a/core/src/main/java/zipkin2/reporter/Sender.java b/core/src/main/java/zipkin2/reporter/Sender.java index 071bc65c..2b3861fb 100644 --- a/core/src/main/java/zipkin2/reporter/Sender.java +++ b/core/src/main/java/zipkin2/reporter/Sender.java @@ -13,6 +13,7 @@ */ package zipkin2.reporter; +import java.io.IOException; import java.util.Collections; import java.util.List; @@ -40,42 +41,13 @@ * send messages as byte arrays. * * @since 3.0 + * @deprecated since 3.2, use {@link BytesMessageSender} instead. This will be removed in v4.0. */ -public abstract class Sender extends Component { +@Deprecated +public abstract class Sender extends Component implements BytesMessageSender { - /** Returns the encoding this sender requires spans to have. */ - public abstract Encoding encoding(); - - /** - * Maximum bytes sendable per message including overhead. This can be calculated using {@link - * #messageSizeInBytes(List)} - *

- * Defaults to 500KB as a conservative default. You may get better or reduced performance - * by changing this value based on, e.g., machine size or network bandwidth in your - * infrastructure. Finding a perfect value will require trying out different values in production, - * but the default should work well enough in most cases. - */ - public abstract int messageMaxBytes(); - - /** - * Before invoking {@link Sender#sendSpans(List)}, callers must consider message overhead, which - * might be more than encoding overhead. This is used to not exceed {@link - * Sender#messageMaxBytes()}. - * - *

Note this is not always {@link Encoding#listSizeInBytes(List)}, as some senders have - * inefficient list encoding. For example, Scribe base64's then tags each span with a category. - */ - public abstract int messageSizeInBytes(List encodedSpans); - - /** - * Like {@link #messageSizeInBytes(List)}, except for a single-span. This is used to ensure a span - * is never accepted that can never be sent. - * - *

Always override this, which is only abstract as added after version 2.0 - * - * @param encodedSizeInBytes the {@link BytesEncoder#sizeInBytes(Object) encoded size} of a span - */ - public int messageSizeInBytes(int encodedSizeInBytes) { + /** {@inheritDoc} */ + @Override public int messageSizeInBytes(int encodedSizeInBytes) { return messageSizeInBytes(Collections.singletonList(new byte[encodedSizeInBytes])); } @@ -84,6 +56,13 @@ public int messageSizeInBytes(int encodedSizeInBytes) { * * @param encodedSpans list of encoded spans. * @throws IllegalStateException if {@link #close() close} was called. + * @deprecated since 3.2, use {@link BytesMessageSender} instead. This will be removed in v4.0. */ + @Deprecated public abstract Call sendSpans(List encodedSpans); + + /** {@inheritDoc} */ + @Override public void send(List encodedSpans) throws IOException { + sendSpans(encodedSpans).execute(); + } } diff --git a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java index 6b8330fb..2c7cb3aa 100644 --- a/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java +++ b/core/src/main/java/zipkin2/reporter/internal/AsyncReporter.java @@ -13,8 +13,11 @@ */ package zipkin2.reporter.internal; +import java.io.Closeable; import java.io.Flushable; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; @@ -23,6 +26,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import zipkin2.reporter.BytesEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.CheckResult; import zipkin2.reporter.ClosedSenderException; @@ -42,17 +46,18 @@ * *

Spans are bundled into messages based on size in bytes or a timeout, whichever happens first. * - *

The thread that sends flushes spans to the {@linkplain Sender} does so in a synchronous loop. - * This means that even asynchronous transports will wait for an ack before sending a next message. - * We do this so that a surge of spans doesn't overrun memory or bandwidth via hundreds or - * thousands of in-flight messages. The downside of this is that reporting is limited in speed to - * what a single thread can clear. When a thread cannot clear the backlog, new spans are dropped. + *

The thread that sends flushes spans to the {@linkplain BytesMessageSender} does so in a + * synchronous loop. This means that even asynchronous transports will wait for an ack before + * sending a next message. We do this so that a surge of spans doesn't overrun memory or bandwidth + * via hundreds or thousands of in-flight messages. The downside of this is that reporting is + * limited in speed to what a single thread can clear. When a thread cannot clear the backlog, new + * spans are dropped. * * @param type of the span, usually {@code zipkin2.Span} * @since 3.0 */ -public abstract class AsyncReporter extends Component implements Reporter, Flushable { - public static Builder newBuilder(Sender sender) { +public abstract class AsyncReporter extends Component implements Reporter, Closeable, Flushable { + public static Builder newBuilder(BytesMessageSender sender) { return new Builder(sender); } @@ -72,7 +77,7 @@ public static Builder newBuilder(Sender sender) { public abstract Builder toBuilder(); public static final class Builder { - final Sender sender; + final BytesMessageSender sender; ThreadFactory threadFactory = Executors.defaultThreadFactory(); ReporterMetrics metrics = ReporterMetrics.NOOP_METRICS; int messageMaxBytes; @@ -98,7 +103,7 @@ static int onePercentOfMemory() { return (int) Math.max(Math.min(Integer.MAX_VALUE, result), Integer.MIN_VALUE); } - Builder(Sender sender) { + Builder(BytesMessageSender sender) { if (sender == null) throw new NullPointerException("sender == null"); this.sender = sender; this.messageMaxBytes = sender.messageMaxBytes(); @@ -124,7 +129,7 @@ public Builder metrics(ReporterMetrics metrics) { /** * Maximum bytes sendable per message including overhead. Defaults to, and is limited by {@link - * Sender#messageMaxBytes()}. + * BytesMessageSender#messageMaxBytes()}. */ public Builder messageMaxBytes(int messageMaxBytes) { if (messageMaxBytes < 0) { @@ -138,8 +143,8 @@ public Builder messageMaxBytes(int messageMaxBytes) { * Default 1 second. 0 implies spans are {@link #flush() flushed} externally. * *

Instead of sending one message at a time, spans are bundled into messages, up to {@link - * Sender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an incomplete - * message. + * BytesMessageSender#messageMaxBytes()}. This timeout ensures that spans are not stuck in an + * incomplete message. * *

Note: this timeout starts when the first unsent span is reported. */ @@ -188,7 +193,7 @@ static final class BoundedAsyncReporter extends AsyncReporter { final AtomicBoolean started, closed; final BytesEncoder encoder; final ByteBoundedQueue pending; - final Sender sender; + final BytesMessageSender sender; final int messageMaxBytes; final long messageTimeoutNanos, closeTimeoutNanos; final CountDownLatch close; @@ -273,7 +278,7 @@ void flush(BufferNextMessage bundler) { }); try { - sender.sendSpans(nextMessage).execute(); + sender.send(nextMessage); } catch (Throwable t) { // In failure case, we increment messages and spans dropped. int count = nextMessage.size(); @@ -307,8 +312,14 @@ void flush(BufferNextMessage bundler) { } } - @Override public CheckResult check() { - return sender.check(); + @Override @Deprecated public CheckResult check() { + try { + sender.send(Collections.emptyList()); + return CheckResult.OK; + } catch (Throwable t) { + Call.propagateIfFatal(t); + return CheckResult.failed(t); + } } @Override public void close() { diff --git a/core/src/test/java/zipkin2/reporter/AsyncReporterTest.java b/core/src/test/java/zipkin2/reporter/AsyncReporterTest.java new file mode 100644 index 00000000..8db12229 --- /dev/null +++ b/core/src/test/java/zipkin2/reporter/AsyncReporterTest.java @@ -0,0 +1,122 @@ +/* + * Copyright 2016-2024 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; +import zipkin2.Span; +import zipkin2.TestObjects; +import zipkin2.codec.SpanBytesDecoder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Only tests entry points as {@link zipkin2.reporter.internal.AsyncReporter} tests covers the rest. + */ +class AsyncReporterTest { + @Test void example() { + AtomicInteger sentSpans = new AtomicInteger(); + try (AsyncReporter reporter = AsyncReporter.builder(FakeSender.create() + .onSpans(spans -> sentSpans.addAndGet(spans.size()))) + .messageTimeout(0, TimeUnit.MILLISECONDS) // no thread + .build(SpanBytesEncoder.JSON_V2)) { + + reporter.report(TestObjects.CLIENT_SPAN); + reporter.flush(); + } + + assertThat(sentSpans.get()).isEqualTo(1); + } + + @Deprecated @Test void example_deprecatedSender() { + AtomicInteger sentSpans = new AtomicInteger(); + try (AsyncReporter reporter = AsyncReporter.builder(new DeprecatedCheatingSender( + spans -> sentSpans.addAndGet(spans.size()) + )) + .messageTimeout(0, TimeUnit.MILLISECONDS) // no thread + .build(SpanBytesEncoder.JSON_V2)) { + + reporter.report(TestObjects.CLIENT_SPAN); + reporter.flush(); + } + + assertThat(sentSpans.get()).isEqualTo(1); + } + + @Deprecated static class DeprecatedCheatingSender extends Sender { + final Consumer> onSpans; + + DeprecatedCheatingSender(Consumer> onSpans) { + this.onSpans = onSpans; + } + + @Override public Encoding encoding() { + return Encoding.JSON; + } + + @Override public int messageMaxBytes() { + return 500; + } + + @Override public int messageSizeInBytes(List encodedSpans) { + return Encoding.JSON.listSizeInBytes(encodedSpans); + } + + @Override public int messageSizeInBytes(int encodedSizeInBytes) { + return Encoding.JSON.listSizeInBytes(encodedSizeInBytes); + } + + @Override public Call sendSpans(List encodedSpans) { + List decoded = encodedSpans.stream() + .map(SpanBytesDecoder.JSON_V2::decodeOne). + collect(Collectors.toList()); + return new CheatingVoidCall(onSpans, decoded); + } + } + + @Deprecated static class CheatingVoidCall extends Call { + final Consumer> onSpans; + final List spans; + + CheatingVoidCall(Consumer> onSpans, List spans) { + this.onSpans = onSpans; + this.spans = spans; + } + + @Override public Void execute() { + onSpans.accept(spans); + return null; + } + + @Override public void enqueue(Callback callback) { + throw new UnsupportedOperationException(); + } + + @Override public void cancel() { + throw new UnsupportedOperationException(); + } + + @Override public boolean isCanceled() { + return false; + } + + @Override public Call clone() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/core/src/test/java/zipkin2/reporter/internal/FakeSender.java b/core/src/test/java/zipkin2/reporter/FakeSender.java similarity index 71% rename from core/src/test/java/zipkin2/reporter/internal/FakeSender.java rename to core/src/test/java/zipkin2/reporter/FakeSender.java index 4ca9b8bf..05427fa6 100644 --- a/core/src/test/java/zipkin2/reporter/internal/FakeSender.java +++ b/core/src/test/java/zipkin2/reporter/FakeSender.java @@ -11,7 +11,7 @@ * or implied. See the License for the specific language governing permissions and limitations under * the License. */ -package zipkin2.reporter.internal; +package zipkin2.reporter; import java.util.List; import java.util.function.Consumer; @@ -19,15 +19,8 @@ import zipkin2.Span; import zipkin2.codec.BytesDecoder; import zipkin2.codec.SpanBytesDecoder; -import zipkin2.reporter.BytesEncoder; -import zipkin2.reporter.BytesMessageEncoder; -import zipkin2.reporter.Call; -import zipkin2.reporter.ClosedSenderException; -import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -import zipkin2.reporter.SpanBytesEncoder; -public final class FakeSender extends Sender { +public final class FakeSender extends BytesMessageSender.Base { public static FakeSender create() { return new FakeSender(Encoding.JSON, Integer.MAX_VALUE, @@ -36,7 +29,6 @@ public static FakeSender create() { }); } - final Encoding encoding; final int messageMaxBytes; final BytesMessageEncoder messageEncoder; final BytesEncoder encoder; @@ -45,7 +37,7 @@ public static FakeSender create() { FakeSender(Encoding encoding, int messageMaxBytes, BytesMessageEncoder messageEncoder, BytesEncoder encoder, BytesDecoder decoder, Consumer> onSpans) { - this.encoding = encoding; + super(encoding); this.messageMaxBytes = messageMaxBytes; this.messageEncoder = messageEncoder; this.encoder = encoder; @@ -53,45 +45,32 @@ public static FakeSender create() { this.onSpans = onSpans; } - FakeSender encoding(Encoding encoding) { + public FakeSender encoding(Encoding encoding) { return new FakeSender(encoding, messageMaxBytes, messageEncoder, // invalid but not needed, yet encoder, // invalid but not needed, yet decoder, // invalid but not needed, yet onSpans); } - FakeSender onSpans(Consumer> onSpans) { + public FakeSender onSpans(Consumer> onSpans) { return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans); } - FakeSender messageMaxBytes(int messageMaxBytes) { + public FakeSender messageMaxBytes(int messageMaxBytes) { return new FakeSender(encoding, messageMaxBytes, messageEncoder, encoder, decoder, onSpans); } - @Override public Encoding encoding() { - return encoding; - } - @Override public int messageMaxBytes() { return messageMaxBytes; } - @Override public int messageSizeInBytes(List encodedSpans) { - return encoding.listSizeInBytes(encodedSpans); - } - - @Override public int messageSizeInBytes(int encodedSizeInBytes) { - return encoding.listSizeInBytes(encodedSizeInBytes); - } - /** close is typically called from a different thread */ volatile boolean closeCalled; - @Override public Call sendSpans(List encodedSpans) { + @Override public void send(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); List decoded = encodedSpans.stream().map(decoder::decodeOne).collect(Collectors.toList()); onSpans.accept(decoded); - return Call.create(null); } @Override public void close() { diff --git a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java index 81c96857..065ec0b4 100644 --- a/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java +++ b/core/src/test/java/zipkin2/reporter/internal/AsyncReporterTest.java @@ -32,6 +32,7 @@ import zipkin2.reporter.BytesEncoder; import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; +import zipkin2.reporter.FakeSender; import zipkin2.reporter.InMemoryReporterMetrics; import zipkin2.reporter.SpanBytesEncoder; import zipkin2.reporter.internal.AsyncReporter.BoundedAsyncReporter; diff --git a/kafka/pom.xml b/kafka/pom.xml index 32da5917..0ae5e15f 100644 --- a/kafka/pom.xml +++ b/kafka/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-kafka diff --git a/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java index df88069f..55ac8949 100644 --- a/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java +++ b/kafka/src/main/java/zipkin2/reporter/kafka/KafkaSender.java @@ -26,8 +26,10 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.serialization.ByteArraySerializer; +import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.AwaitableCallback; import zipkin2.reporter.BytesMessageEncoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Call; import zipkin2.reporter.Callback; import zipkin2.reporter.CheckResult; @@ -40,7 +42,7 @@ * *

Usage

*

- * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async * reporter}. * *

Here's a simple configuration, configured for json: @@ -255,19 +257,32 @@ public Builder toBuilder() { return messageMaxBytes; } + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { + if (closeCalled) throw new ClosedSenderException(); + byte[] message = encoder.encode(encodedSpans); + return new KafkaCall(message); + } + /** * This sends all the spans as a single message. * *

NOTE: this blocks until the metadata server is available. */ - @Override public Call sendSpans(List encodedSpans) { + @Override public void send(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); - byte[] message = encoder.encode(encodedSpans); - return new KafkaCall(message); + send(encoder.encode(encodedSpans)); } - /** Ensures there are no problems reading metadata about the topic. */ - @Override public CheckResult check() { + void send(byte[] message) { + if (closeCalled) throw new ClosedSenderException(); + AwaitableCallback callback = new AwaitableCallback(); + get().send(new ProducerRecord(topic, message), new CallbackAdapter(callback)); + callback.await(); + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { KafkaFuture maybeClusterId = getAdminClient().describeCluster().clusterId(); maybeClusterId.get(1, TimeUnit.SECONDS); @@ -308,7 +323,7 @@ AdminClient getAdminClient() { closeCalled = true; } - @Override public final String toString() { + @Override public String toString() { return "KafkaSender{" + "bootstrapServers=" + properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) + ", topic=" + topic @@ -323,9 +338,7 @@ class KafkaCall extends Call.Base { // KafkaFuture is not cancelable } @Override protected Void doExecute() { - AwaitableCallback callback = new AwaitableCallback(); - get().send(new ProducerRecord(topic, message), new CallbackAdapter(callback)); - callback.await(); + send(message); return null; } diff --git a/kafka/src/test/java/zipkin2/reporter/kafka/ITKafkaSender.java b/kafka/src/test/java/zipkin2/reporter/kafka/ITKafkaSender.java index c87bd857..a4a6b7fc 100644 --- a/kafka/src/test/java/zipkin2/reporter/kafka/ITKafkaSender.java +++ b/kafka/src/test/java/zipkin2/reporter/kafka/ITKafkaSender.java @@ -20,13 +20,13 @@ import java.util.Map; import java.util.Properties; import java.util.Set; -import java.util.concurrent.TimeoutException; import java.util.stream.Stream; import javax.management.ObjectName; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.errors.RecordTooLargeException; +import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.junit.jupiter.api.AfterEach; @@ -39,10 +39,8 @@ import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.Call; -import zipkin2.reporter.CheckResult; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; @@ -69,49 +67,49 @@ class ITKafkaSender { sender.close(); } - @Test void sendsSpans() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void send() { + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_PROTO3() throws Exception { + @Test void send_PROTO3() { sender.close(); sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_THRIFT() throws Exception { + @Test void send_THRIFT() { sender.close(); sender = sender.toBuilder().encoding(Encoding.THRIFT).build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage())) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpansToCorrectTopic() throws Exception { + @Test void sendToCorrectTopic() { sender.close(); kafka.prepareTopics("customzipkintopic"); sender = sender.toBuilder().topic("customzipkintopic").build(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage("customzipkintopic"))) .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void checkFalseWhenKafkaIsDown() { + @Test void sendWhenKafkaIsDown() { kafka.stop(); // Make a new tracer that fails faster than 60 seconds @@ -120,20 +118,19 @@ class ITKafkaSender { overrides.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100"); sender = sender.toBuilder().overrides(overrides).build(); - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()).isInstanceOf(TimeoutException.class); + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(TimeoutException.class); } @Test void illegalToSendWhenClosed() { sender.close(); - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute()) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) .isInstanceOf(IllegalStateException.class); } @Test void shouldCloseKafkaProducerOnClose() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); sender.producer.flush(); final ObjectName kafkaProducerMXBeanName = new ObjectName("kafka.producer:*"); @@ -152,15 +149,15 @@ class ITKafkaSender { sender.close(); sender = sender.toBuilder().messageMaxBytes(1).build(); - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute()) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) .isInstanceOf(RecordTooLargeException.class); } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySummaryInformation() { assertThat(sender.toString()).isEqualTo( @@ -190,7 +187,7 @@ class ITKafkaSender { assertThat(filteredProperties.get(ProducerConfig.SECURITY_PROVIDERS_CONFIG)).isNotNull(); } - Call send(Span... spans) { + void sendSpans(Span... spans) { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -205,7 +202,7 @@ Call send(Span... spans) { default: throw new UnsupportedOperationException("encoding: " + sender.encoding()); } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } byte[] readMessage(String topic) { diff --git a/libthrift/pom.xml b/libthrift/pom.xml index b6b40e79..137f65f3 100644 --- a/libthrift/pom.xml +++ b/libthrift/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-libthrift diff --git a/libthrift/src/main/java/zipkin2/reporter/libthrift/LibthriftSender.java b/libthrift/src/main/java/zipkin2/reporter/libthrift/LibthriftSender.java index 8c8c375e..6d438d5f 100644 --- a/libthrift/src/main/java/zipkin2/reporter/libthrift/LibthriftSender.java +++ b/libthrift/src/main/java/zipkin2/reporter/libthrift/LibthriftSender.java @@ -84,7 +84,7 @@ public Builder messageMaxBytes(int messageMaxBytes) { return this; } - public final LibthriftSender build() { + public LibthriftSender build() { return new LibthriftSender(this); } @@ -129,11 +129,24 @@ public int messageSizeInBytes(List encodedSpans) { return ScribeClient.messageSizeInBytes(encodedSpans); } - @Override public Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); return new ScribeCall(encodedSpans); } + /** {@inheritDoc} */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + try { + if (!get().log(encodedSpans)) { + throw new IllegalStateException("try later"); + } + } catch (TException e) { + throw new IOException(e); + } + } + ScribeClient get() { if (client == null) { synchronized (this) { @@ -149,9 +162,8 @@ ScribeClient get() { private volatile boolean closeCalled; private volatile ScribeClient client; - /** Sends an empty log message to the configured host. */ - @Override - public CheckResult check() { + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { if (get().log(Collections.emptyList())) { return CheckResult.OK; @@ -169,7 +181,7 @@ public CheckResult check() { if (client != null) client.close(); } - @Override public final String toString() { + @Override public String toString() { return "LibthriftSender(" + host + ":" + port + ")"; } @@ -181,13 +193,7 @@ class ScribeCall extends Call.Base { } @Override protected Void doExecute() throws IOException { - try { - if (!get().log(encodedSpans)) { - throw new IllegalStateException("try later"); - } - } catch (TException e) { - throw new IOException(e); - } + send(encodedSpans); return null; } diff --git a/libthrift/src/test/java/zipkin2/reporter/libthrift/ITLibthriftSender.java b/libthrift/src/test/java/zipkin2/reporter/libthrift/ITLibthriftSender.java index 7b2e072d..f53bcb67 100644 --- a/libthrift/src/test/java/zipkin2/reporter/libthrift/ITLibthriftSender.java +++ b/libthrift/src/test/java/zipkin2/reporter/libthrift/ITLibthriftSender.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; @@ -25,6 +26,8 @@ import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; import zipkin2.Span; +import zipkin2.reporter.AsyncReporter; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.SpanBytesEncoder; import static java.util.stream.Collectors.toList; @@ -51,8 +54,8 @@ class ITLibthriftSender { sender.close(); } - @Test void sendsSpans() throws Exception { - send(CLIENT_SPAN); + @Test void send() throws Exception { + sendSpans(CLIENT_SPAN); assertThat(zipkin.get("/api/v2/trace/" + CLIENT_SPAN.traceId()).isSuccessful()) .isTrue(); @@ -61,9 +64,9 @@ class ITLibthriftSender { /** * This will help verify sequence ID and response parsing logic works */ - @Test void sendsSpans_multipleTimes() throws Exception { + @Test void send_multipleTimes() throws Exception { for (int i = 0; i < 5; i++) { // Have client send 5 messages - send(Arrays.copyOfRange(LOTS_OF_SPANS, i, (i * 10) + 10)); + sendSpans(Arrays.copyOfRange(LOTS_OF_SPANS, i, (i * 10) + 10)); } for (int i = 0; i < 5; i++) { // Try the last ID of each @@ -73,11 +76,11 @@ class ITLibthriftSender { } } - @Test void check_okWhenScribeIsListening() { - assertThat(sender.check().ok()).isTrue(); + @Test void emptyOk() throws Exception { + sender.send(Collections.emptyList()); } - @Test void check_notOkWhenScribeIsDown() { + @Test void sendFailsWhenScribeIsDown() { sender.close(); // Reconfigure to a valid host but invalid port. @@ -85,7 +88,8 @@ class ITLibthriftSender { host(zipkin.host()). port(zipkin.httpPort()).build(); - assertThat(sender.check().ok()).isFalse(); + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(IOException.class); } @Test void reconnects() throws Exception { @@ -96,12 +100,12 @@ class ITLibthriftSender { host(zipkin.host()). port(9999).build(); - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN)) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) .isInstanceOf(IOException.class); open(); - send(CLIENT_SPAN, CLIENT_SPAN); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(zipkin.get("/api/v2/trace/" + CLIENT_SPAN.traceId()).isSuccessful()) .isTrue(); @@ -110,13 +114,13 @@ class ITLibthriftSender { @Test void illegalToSendWhenClosed() { sender.close(); - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN)) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) .isInstanceOf(IllegalStateException.class); } /** - * The output of toString() on {@link zipkin2.reporter.Sender} implementations appears in thread - * names created by {@link zipkin2.reporter.AsyncReporter}. Since thread names are likely to be + * The output of toString() on {@link BytesMessageSender} implementations appears in thread + * names created by {@link AsyncReporter}. Since thread names are likely to be * exposed in logs and other monitoring tools, care should be taken to ensure the toString() * output is a reasonable length and does not contain sensitive information. */ @@ -128,9 +132,9 @@ class ITLibthriftSender { /** * Blocks until the callback completes to allow read-your-writes consistency during tests. */ - void send(Span... spans) throws IOException { + void sendSpans(Span... spans) throws IOException { List encodedSpans = Stream.of(spans).map(SpanBytesEncoder.THRIFT::encode).collect(toList()); - sender.sendSpans(encodedSpans).execute(); + sender.send(encodedSpans); } } diff --git a/libthrift/src/test/java/zipkin2/reporter/libthrift/InternalScribeCodecTest.java b/libthrift/src/test/java/zipkin2/reporter/libthrift/InternalScribeCodecTest.java index 12926dd0..a0fc7402 100644 --- a/libthrift/src/test/java/zipkin2/reporter/libthrift/InternalScribeCodecTest.java +++ b/libthrift/src/test/java/zipkin2/reporter/libthrift/InternalScribeCodecTest.java @@ -40,7 +40,7 @@ class InternalScribeCodecTest { } } - @Test void sendsSpansExpectedMetrics() throws Exception { + @Test void sendExpectedMetrics() throws Exception { byte[] thrift = SpanBytesEncoder.THRIFT.encode(CLIENT_SPAN); List encodedSpans = asList(thrift, thrift); diff --git a/metrics-micrometer/pom.xml b/metrics-micrometer/pom.xml index d7744d1c..448aa0c8 100644 --- a/metrics-micrometer/pom.xml +++ b/metrics-micrometer/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-reporter-metrics-micrometer diff --git a/okhttp3/pom.xml b/okhttp3/pom.xml index bc6041b5..93a97e5b 100644 --- a/okhttp3/pom.xml +++ b/okhttp3/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-okhttp3 diff --git a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java index 53d39f6f..5209e623 100644 --- a/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java +++ b/okhttp3/src/main/java/zipkin2/reporter/okhttp3/OkHttpSender.java @@ -19,6 +19,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import okhttp3.Call; import okhttp3.Dispatcher; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -30,19 +31,22 @@ import okio.BufferedSink; import okio.GzipSink; import okio.Okio; +import zipkin2.reporter.AsyncReporter; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.CheckResult; import zipkin2.reporter.ClosedSenderException; import zipkin2.reporter.Encoding; import zipkin2.reporter.Sender; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static zipkin2.reporter.okhttp3.HttpCall.parseResponse; /** * Reports spans to Zipkin, using its POST endpoint. * *

Usage

*

- * This type is designed for {@link zipkin2.reporter.AsyncReporter.Builder#builder(Sender) the async + * This type is designed for {@link AsyncReporter.Builder#builder(BytesMessageSender) the async * reporter}. * *

Here's a simple configuration, configured for json: @@ -268,8 +272,8 @@ public Builder toBuilder() { /** close is typically called from a different thread */ volatile boolean closeCalled; - /** The returned call sends spans as a POST to {@link Builder#endpoint(String)}. */ - @Override public zipkin2.reporter.Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public zipkin2.reporter.Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); Request request; try { @@ -280,8 +284,16 @@ public Builder toBuilder() { return new HttpCall(client.newCall(request)); } - /** Sends an empty json message to the configured endpoint. */ - @Override public CheckResult check() { + /** Sends spans as a POST to {@link Builder#endpoint(String)}. */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + Request request = newRequest(encoder.encode(encodedSpans)); + Call call = client.newCall(request); + parseResponse(call.execute()); + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { Request request = new Request.Builder().url(endpoint) .post(RequestBody.create(MediaType.parse("application/json"), "[]")).build(); diff --git a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java index 0750f1bf..c71fe6db 100644 --- a/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java +++ b/okhttp3/src/test/java/zipkin2/reporter/okhttp3/ITOkHttpSender.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -29,6 +30,7 @@ import org.junit.jupiter.api.Test; import zipkin2.Span; import zipkin2.codec.SpanBytesDecoder; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; import zipkin2.reporter.AwaitableCallback; @@ -73,15 +75,15 @@ public class ITOkHttpSender { // public for use in src/it server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); assertThat(called.get()).isTrue(); } - @Test void sendsSpans() throws Exception { + @Test void send() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); @@ -91,12 +93,27 @@ public class ITOkHttpSender { // public for use in src/it .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_PROTO3() throws Exception { + @Test void emptyOk() throws Exception{ + server.enqueue(new MockResponse()); + + sender.send(Collections.emptyList()); + + assertThat(server.getRequestCount()).isEqualTo(1); + } + + @Test void sendFailsOnDisconnect() { + server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)) + .isInstanceOf(IOException.class); + } + + @Test void send_PROTO3() throws Exception { sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); @@ -106,12 +123,12 @@ public class ITOkHttpSender { // public for use in src/it .containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_THRIFT() throws Exception { + @Test void send_THRIFT() throws Exception { sender = sender.toBuilder().encoding(Encoding.THRIFT).build(); server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); @@ -128,7 +145,7 @@ public class ITOkHttpSender { // public for use in src/it server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // block until the request arrived requests.add(server.takeRequest()); @@ -142,7 +159,7 @@ public class ITOkHttpSender { // public for use in src/it @Test void ensuresProxiesDontTrace() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // If the Zipkin endpoint is proxied and instrumented, it will know "0" means don't trace. assertThat(server.takeRequest().getHeader("b3")).isEqualTo("0"); @@ -151,13 +168,14 @@ public class ITOkHttpSender { // public for use in src/it @Test void mediaTypeBasedOnSpanEncoding() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // block until the request arrived assertThat(server.takeRequest().getHeader("Content-Type")) .isEqualTo("application/json"); } + @Deprecated @Test void closeWhileRequestInFlight_cancelsRequest() throws Exception { server.shutdown(); // shutdown the normal zipkin rule sender.close(); @@ -197,6 +215,7 @@ public class ITOkHttpSender { // public for use in src/it /** * Each message by default is up to 5MiB, make sure these go out of process as soon as they can. */ + @Deprecated @Test void messagesSendImmediately() throws Exception { server.shutdown(); // shutdown the normal zipkin rule sender.close(); @@ -234,6 +253,7 @@ public class ITOkHttpSender { // public for use in src/it } } + @Deprecated @Test void closeWhileRequestInFlight_graceful() throws Exception { server.shutdown(); // shutdown the normal zipkin rule sender.close(); @@ -264,10 +284,11 @@ public class ITOkHttpSender { // public for use in src/it } } + @Deprecated @Test void noExceptionWhenServerErrors() { server.enqueue(new MockResponse().setResponseCode(500)); - send().enqueue(new Callback() { + sender.sendSpans(Collections.emptyList()).enqueue(new Callback() { @Override public void onSuccess(Void aVoid) { } @@ -277,38 +298,24 @@ public class ITOkHttpSender { // public for use in src/it } @Test void outOfBandCancel() { - HttpCall call = (HttpCall) send(CLIENT_SPAN, CLIENT_SPAN); + HttpCall call = (HttpCall) sender.sendSpans(Collections.emptyList()); call.cancel(); assertThat(call.isCanceled()).isTrue(); } - @Test void check_ok() { - server.enqueue(new MockResponse()); - - assertThat(sender.check().ok()).isTrue(); - - assertThat(server.getRequestCount()).isEqualTo(1); - } - - @Test void check_fail() { - server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); - - assertThat(sender.check().ok()).isFalse(); - } - @Test void illegalToSendWhenClosed() { sender.close(); - assertThatThrownBy(() -> send(CLIENT_SPAN)) + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN)) .isInstanceOf(IllegalStateException.class); } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySenderTypeAndEndpoint() { assertThat(sender.toString()).isEqualTo("OkHttpSender{" + endpoint + "}"); @@ -320,7 +327,7 @@ public class ITOkHttpSender { // public for use in src/it .isNull(); } - Call send(Span... spans) { + void sendSpans(Span... spans) throws IOException { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -335,6 +342,6 @@ Call send(Span... spans) { default: throw new UnsupportedOperationException("encoding: " + sender.encoding()); } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } } diff --git a/pom.xml b/pom.xml index 57ebf426..e059a2b0 100755 --- a/pom.xml +++ b/pom.xml @@ -19,7 +19,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT pom @@ -93,7 +93,7 @@ 3.0.1 3.5.1 3.3.0 - 3.2.3 + 3.2.5 1.6.13 diff --git a/spring-beans/pom.xml b/spring-beans/pom.xml index d4f3d102..2c3997b0 100644 --- a/spring-beans/pom.xml +++ b/spring-beans/pom.xml @@ -18,7 +18,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT 4.0.0 diff --git a/spring-beans/src/main/java/zipkin2/reporter/beans/BaseAsyncFactoryBean.java b/spring-beans/src/main/java/zipkin2/reporter/beans/BaseAsyncFactoryBean.java index 5326aeaa..914de351 100644 --- a/spring-beans/src/main/java/zipkin2/reporter/beans/BaseAsyncFactoryBean.java +++ b/spring-beans/src/main/java/zipkin2/reporter/beans/BaseAsyncFactoryBean.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -14,11 +14,12 @@ package zipkin2.reporter.beans; import org.springframework.beans.factory.config.AbstractFactoryBean; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ReporterMetrics; import zipkin2.reporter.Sender; abstract class BaseAsyncFactoryBean extends AbstractFactoryBean { - Sender sender; + BytesMessageSender sender; ReporterMetrics metrics; Integer messageMaxBytes; Integer messageTimeout; @@ -30,7 +31,7 @@ abstract class BaseAsyncFactoryBean extends AbstractFactoryBean { return true; } - public void setSender(Sender sender) { + public void setSender(BytesMessageSender sender) { this.sender = sender; } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/ActiveMQSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/ActiveMQSenderFactoryBeanTest.java index 3bec3c21..4ac1eb7a 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/ActiveMQSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/ActiveMQSenderFactoryBeanTest.java @@ -134,7 +134,7 @@ class ActiveMQSenderFactoryBeanTest { ActiveMQSender sender = context.getBean("sender", ActiveMQSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[]{'{', '}'})); + sender.send(Arrays.asList(new byte[]{'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncReporterFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncReporterFactoryBeanTest.java index 7ca97bcb..201cf39c 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncReporterFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncReporterFactoryBeanTest.java @@ -17,6 +17,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import zipkin2.reporter.AsyncReporter; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; import zipkin2.reporter.ReporterMetrics; import zipkin2.reporter.Sender; @@ -25,8 +26,8 @@ import static org.assertj.core.api.Assertions.assertThat; class AsyncReporterFactoryBeanTest { - public static Sender SENDER = new FakeSender(); - public static Sender PROTO3_SENDER = new FakeSender() { + public static BytesMessageSender SENDER = new FakeSender(); + public static BytesMessageSender PROTO3_SENDER = new FakeSender() { @Override public Encoding encoding() { return Encoding.PROTO3; } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncZipkinSpanHandlerFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncZipkinSpanHandlerFactoryBeanTest.java index 29837bc1..2c61d488 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncZipkinSpanHandlerFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/AsyncZipkinSpanHandlerFactoryBeanTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.ReporterMetrics; import zipkin2.reporter.Sender; import zipkin2.reporter.brave.AsyncZipkinSpanHandler; @@ -31,7 +32,7 @@ class AsyncZipkinSpanHandlerFactoryBeanTest { return null; } }; - public static Sender SENDER = new FakeSender(); + public static BytesMessageSender SENDER = new FakeSender(); public static ReporterMetrics METRICS = ReporterMetrics.NOOP_METRICS; XmlBeans context; diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/FakeSender.java b/spring-beans/src/test/java/zipkin2/reporter/beans/FakeSender.java index 73cce760..6f3cf4f9 100644 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/FakeSender.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/FakeSender.java @@ -13,25 +13,23 @@ */ package zipkin2.reporter.beans; +import java.io.IOException; import java.util.List; -import zipkin2.reporter.Call; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; -class FakeSender extends Sender { - @Override public Encoding encoding() { - return Encoding.JSON; +class FakeSender extends BytesMessageSender.Base { + FakeSender() { + super(Encoding.JSON); } @Override public int messageMaxBytes() { return 1024; } - @Override public int messageSizeInBytes(List encodedSpans) { - return 1024; + @Override public void send(List encodedSpans) { } - @Override public Call sendSpans(List encodedSpans) { - return Call.create(null); + @Override public void close() throws IOException { } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/KafkaSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/KafkaSenderFactoryBeanTest.java index 579d66fc..8816fe7e 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/KafkaSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/KafkaSenderFactoryBeanTest.java @@ -95,7 +95,7 @@ class KafkaSenderFactoryBeanTest { KafkaSender sender = context.getBean("sender", KafkaSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[] {'{', '}'})); + sender.send(Arrays.asList(new byte[] {'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/LibthriftSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/LibthriftSenderFactoryBeanTest.java index 9fc6f976..f2284759 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/LibthriftSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/LibthriftSenderFactoryBeanTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2023 The OpenZipkin Authors + * Copyright 2016-2024 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -110,7 +110,7 @@ class LibthriftSenderFactoryBeanTest { LibthriftSender sender = context.getBean("sender", LibthriftSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[0])); + sender.send(Arrays.asList(new byte[0])); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java index e02f589d..70805aab 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/OkHttpSenderFactoryBeanTest.java @@ -144,7 +144,7 @@ class OkHttpSenderFactoryBeanTest { OkHttpSender sender = context.getBean("sender", OkHttpSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[]{'{', '}'})); + sender.send(Arrays.asList(new byte[]{'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/RabbitMQSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/RabbitMQSenderFactoryBeanTest.java index 02d8cfa5..d130f497 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/RabbitMQSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/RabbitMQSenderFactoryBeanTest.java @@ -132,7 +132,7 @@ class RabbitMQSenderFactoryBeanTest { RabbitMQSender sender = context.getBean("sender", RabbitMQSender.class); context.close(); - sender.sendSpans(asList(new byte[] {'{', '}'})); + sender.send(asList(new byte[] {'{', '}'})); }); } } diff --git a/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java b/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java index 8aeb0621..3231fed8 100755 --- a/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java +++ b/spring-beans/src/test/java/zipkin2/reporter/beans/URLConnectionSenderFactoryBeanTest.java @@ -124,7 +124,7 @@ class URLConnectionSenderFactoryBeanTest { URLConnectionSender sender = context.getBean("sender", URLConnectionSender.class); context.close(); - sender.sendSpans(Arrays.asList(new byte[]{'{', '}'})); + sender.send(Arrays.asList(new byte[]{'{', '}'})); }); } } diff --git a/urlconnection/pom.xml b/urlconnection/pom.xml index 00865da7..55ffac50 100644 --- a/urlconnection/pom.xml +++ b/urlconnection/pom.xml @@ -20,7 +20,7 @@ io.zipkin.reporter2 zipkin-reporter-parent - 3.1.2-SNAPSHOT + 3.2.0-SNAPSHOT zipkin-sender-urlconnection diff --git a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java index 6e76f428..a779e16e 100644 --- a/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java +++ b/urlconnection/src/main/java/zipkin2/reporter/urlconnection/URLConnectionSender.java @@ -117,7 +117,7 @@ public Builder encoding(Encoding encoding) { return this; } - public final URLConnectionSender build() { + public URLConnectionSender build() { return new URLConnectionSender(this); } @@ -182,14 +182,20 @@ public Builder toBuilder() { return messageMaxBytes; } - /** The returned call sends spans as a POST to {@link Builder#endpoint}. */ - @Override public Call sendSpans(List encodedSpans) { + /** {@inheritDoc} */ + @Override @Deprecated public Call sendSpans(List encodedSpans) { if (closeCalled) throw new ClosedSenderException(); return new HttpPostCall(encoder.encode(encodedSpans)); } - /** Sends an empty json message to the configured endpoint. */ - @Override public CheckResult check() { + /** Sends spans as a POST to {@link Builder#endpoint}. */ + @Override public void send(List encodedSpans) throws IOException { + if (closeCalled) throw new ClosedSenderException(); + send(encoder.encode(encodedSpans), mediaType); + } + + /** {@inheritDoc} */ + @Override @Deprecated public CheckResult check() { try { send(new byte[] {'[', ']'}, "application/json"); return CheckResult.OK; @@ -199,12 +205,8 @@ public Builder toBuilder() { } } - @Override public void close() { - closeCalled = true; - } - void send(byte[] body, String mediaType) throws IOException { - // intentionally not closing the connection, so as to use keep-alives + // intentionally not closing the connection, to use keep-alives HttpURLConnection connection = (HttpURLConnection) endpoint.openConnection(); connection.setConnectTimeout(connectTimeout); connection.setReadTimeout(readTimeout); @@ -255,7 +257,11 @@ static IOException skipAndSuppress(InputStream in) { } } - @Override public final String toString() { + @Override public void close() { + closeCalled = true; + } + + @Override public String toString() { return "URLConnectionSender{" + endpoint + "}"; } diff --git a/urlconnection/src/test/java/zipkin2/reporter/urlconnection/ITURLConnectionSender.java b/urlconnection/src/test/java/zipkin2/reporter/urlconnection/ITURLConnectionSender.java index f401d9f0..5980c185 100644 --- a/urlconnection/src/test/java/zipkin2/reporter/urlconnection/ITURLConnectionSender.java +++ b/urlconnection/src/test/java/zipkin2/reporter/urlconnection/ITURLConnectionSender.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.stream.Stream; import okhttp3.mockwebserver.MockResponse; @@ -28,10 +29,9 @@ import zipkin2.codec.SpanBytesDecoder; import zipkin2.codec.SpanBytesEncoder; import zipkin2.reporter.AsyncReporter; -import zipkin2.reporter.Call; +import zipkin2.reporter.BytesMessageSender; import zipkin2.reporter.Callback; import zipkin2.reporter.Encoding; -import zipkin2.reporter.Sender; import static java.util.Arrays.asList; import static java.util.stream.Collectors.toList; @@ -50,59 +50,70 @@ class ITURLConnectionSender { String endpoint = server.url("/api/v2/spans").toString(); @BeforeEach void setUp() { - sender = URLConnectionSender.newBuilder() - .endpoint(endpoint) - .compressionEnabled(false) - .build(); + sender = URLConnectionSender.newBuilder().endpoint(endpoint).compressionEnabled(false).build(); } @Test void badUrlIsAnIllegalArgument() { - assertThatThrownBy(() -> URLConnectionSender.create("htp://localhost:9411/api/v1/spans")) - .isInstanceOf(IllegalArgumentException.class) - .hasMessage("unknown protocol: htp"); + assertThatThrownBy( + () -> URLConnectionSender.create("htp://localhost:9411/api/v1/spans")).isInstanceOf( + IllegalArgumentException.class).hasMessage("unknown protocol: htp"); } - @Test void sendsSpans() throws Exception { + @Test void send() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); // Now, let's read back the spans we sent! - assertThat(SpanBytesDecoder.JSON_V2.decodeList(server.takeRequest().getBody().readByteArray())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.JSON_V2.decodeList( + server.takeRequest().getBody().readByteArray())).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_PROTO3() throws Exception { + @Test void emptyOk() throws Exception { + server.enqueue(new MockResponse()); + + sender.send(Collections.emptyList()); + + assertThat(server.getRequestCount()).isEqualTo(1); + } + + @Test void sendFailsOnDisconnect() { + server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); + + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN, CLIENT_SPAN)).isInstanceOf(IOException.class); + } + + @Test void send_PROTO3() throws Exception { sender = sender.toBuilder().encoding(Encoding.PROTO3).build(); server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); // Now, let's read back the spans we sent! - assertThat(SpanBytesDecoder.PROTO3.decodeList(server.takeRequest().getBody().readByteArray())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.PROTO3.decodeList( + server.takeRequest().getBody().readByteArray())).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } - @Test void sendsSpans_THRIFT() throws Exception { + @Test void send_THRIFT() throws Exception { sender = sender.toBuilder().encoding(Encoding.THRIFT).build(); server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // Ensure only one request was sent assertThat(server.getRequestCount()).isEqualTo(1); // Now, let's read back the spans we sent! - assertThat(SpanBytesDecoder.THRIFT.decodeList(server.takeRequest().getBody().readByteArray())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.THRIFT.decodeList( + server.takeRequest().getBody().readByteArray())).containsExactly(CLIENT_SPAN, CLIENT_SPAN); } @Test void compression() throws Exception { @@ -112,21 +123,20 @@ class ITURLConnectionSender { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // block until the request arrived requests.add(server.takeRequest()); } // we expect the first compressed request to be smaller than the uncompressed one. - assertThat(requests.get(0).getBodySize()) - .isLessThan(requests.get(1).getBodySize()); + assertThat(requests.get(0).getBodySize()).isLessThan(requests.get(1).getBodySize()); } @Test void ensuresProxiesDontTrace() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // If the Zipkin endpoint is proxied and instrumented, it will know "0" means don't trace. assertThat(server.takeRequest().getHeader("b3")).isEqualTo("0"); @@ -135,17 +145,16 @@ class ITURLConnectionSender { @Test void mediaTypeBasedOnSpanEncoding() throws Exception { server.enqueue(new MockResponse()); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + sendSpans(CLIENT_SPAN, CLIENT_SPAN); // block until the request arrived - assertThat(server.takeRequest().getHeader("Content-Type")) - .isEqualTo("application/json"); + assertThat(server.takeRequest().getHeader("Content-Type")).isEqualTo("application/json"); } - @Test void noExceptionWhenServerErrors() { + @Deprecated @Test void noExceptionWhenServerErrors() { server.enqueue(new MockResponse().setResponseCode(500)); - send().enqueue(new Callback() { + sender.sendSpans(Collections.emptyList()).enqueue(new Callback() { @Override public void onSuccess(Void aVoid) { } @@ -154,38 +163,23 @@ class ITURLConnectionSender { }); } - @Test void check_ok() { - server.enqueue(new MockResponse()); - - assertThat(sender.check().ok()).isTrue(); - - assertThat(server.getRequestCount()).isEqualTo(1); - } - - @Test void check_fail() { - server.enqueue(new MockResponse().setSocketPolicy(SocketPolicy.DISCONNECT_AT_START)); - - assertThat(sender.check().ok()).isFalse(); - } - @Test void illegalToSendWhenClosed() { sender.close(); - assertThatThrownBy(() -> send(CLIENT_SPAN).execute()) - .isInstanceOf(IllegalStateException.class); + assertThatThrownBy(() -> sendSpans(CLIENT_SPAN)).isInstanceOf(IllegalStateException.class); } /** - * The output of toString() on {@link Sender} implementations appears in thread names created by - * {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other monitoring - * tools, care should be taken to ensure the toString() output is a reasonable length and does not - * contain sensitive information. + * The output of toString() on {@link BytesMessageSender} implementations appears in thread names + * created by {@link AsyncReporter}. Since thread names are likely to be exposed in logs and other + * monitoring tools, care should be taken to ensure the toString() output is a reasonable length + * and does not contain sensitive information. */ @Test void toStringContainsOnlySenderTypeAndEndpoint() { assertThat(sender.toString()).isEqualTo("URLConnectionSender{" + endpoint + "}"); } - Call send(Span... spans) { + void sendSpans(Span... spans) throws IOException { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -200,6 +194,6 @@ Call send(Span... spans) { default: throw new UnsupportedOperationException("encoding: " + sender.encoding()); } - return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); + sender.send(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } }