From 8ebdf6cfac891a08e05822ad96db109fa31b71cf Mon Sep 17 00:00:00 2001 From: Adrian Cole Date: Wed, 13 Dec 2023 15:15:14 +0700 Subject: [PATCH] ci: moves ActiveMQ to docker based tests Signed-off-by: Adrian Cole --- activemq-client/pom.xml | 13 +- .../reporter/activemq/ActiveMQSender.java | 5 +- .../reporter/activemq/ActiveMQExtension.java | 76 +++++++++ .../reporter/activemq/ITActiveMQSender.java | 160 ++++++++---------- .../src/test/resources/logback.xml | 13 ++ 5 files changed, 163 insertions(+), 104 deletions(-) create mode 100644 activemq-client/src/test/java/zipkin2/reporter/activemq/ActiveMQExtension.java create mode 100644 activemq-client/src/test/resources/logback.xml diff --git a/activemq-client/pom.xml b/activemq-client/pom.xml index d86e03a3..946203bd 100644 --- a/activemq-client/pom.xml +++ b/activemq-client/pom.xml @@ -52,16 +52,9 @@ - org.apache.activemq - activemq-broker - ${activemq.version} - test - - - - org.apache.activemq.tooling - activemq-junit - ${activemq.version} + org.testcontainers + testcontainers + ${testcontainers.version} test diff --git a/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java b/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java index 6617cc57..fa5e1a44 100644 --- a/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java +++ b/activemq-client/src/main/java/zipkin2/reporter/activemq/ActiveMQSender.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 The OpenZipkin Authors + * Copyright 2016-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -32,7 +32,7 @@ * This sends (usually json v2) encoded spans to an ActiveMQ queue. * *

Usage

- * + *

* This type is designed for {@link AsyncReporter.Builder#builder(Sender) the async reporter}. * *

Here's a simple configuration, configured for json: @@ -132,7 +132,6 @@ public final ActiveMQSender build() { } /** get and close are typically called from different threads */ - volatile ActiveMQConn conn; volatile boolean closeCalled; @Override public Encoding encoding() { diff --git a/activemq-client/src/test/java/zipkin2/reporter/activemq/ActiveMQExtension.java b/activemq-client/src/test/java/zipkin2/reporter/activemq/ActiveMQExtension.java new file mode 100644 index 00000000..e97ea61d --- /dev/null +++ b/activemq-client/src/test/java/zipkin2/reporter/activemq/ActiveMQExtension.java @@ -0,0 +1,76 @@ +/* + * Copyright 2016-2023 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package zipkin2.reporter.activemq; + +import java.time.Duration; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.jupiter.api.extension.AfterAllCallback; +import org.junit.jupiter.api.extension.BeforeAllCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.opentest4j.TestAbortedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import static org.testcontainers.utility.DockerImageName.parse; + +class ActiveMQExtension implements BeforeAllCallback, AfterAllCallback { + static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQExtension.class); + static final int ACTIVEMQ_PORT = 61616; + + ActiveMQContainer container = new ActiveMQContainer(); + + @Override public void beforeAll(ExtensionContext context) { + if (context.getRequiredTestClass().getEnclosingClass() != null) { + // Only run once in outermost scope. + return; + } + + container.start(); + LOGGER.info("Using brokerURL " + brokerURL()); + } + + @Override public void afterAll(ExtensionContext context) { + if (context.getRequiredTestClass().getEnclosingClass() != null) { + // Only run once in outermost scope. + return; + } + + container.stop(); + } + + ActiveMQSender.Builder newSenderBuilder(String queue) { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL(brokerURL()); + return ActiveMQSender.newBuilder().queue(queue).connectionFactory(connectionFactory); + } + + String brokerURL() { + return "failover:tcp://" + container.getHost() + ":" + container.getMappedPort(ACTIVEMQ_PORT); + } + + // mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537 + static final class ActiveMQContainer extends GenericContainer { + ActiveMQContainer() { + super(parse("ghcr.io/openzipkin/zipkin-activemq:2.25.0")); + if ("true".equals(System.getProperty("docker.skip"))) { + throw new TestAbortedException("${docker.skip} == true"); + } + withExposedPorts(ACTIVEMQ_PORT); + waitStrategy = Wait.forListeningPorts(ACTIVEMQ_PORT); + withStartupTimeout(Duration.ofSeconds(60)); + } + } +} diff --git a/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java b/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java index 4db5783e..f4316e0b 100644 --- a/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java +++ b/activemq-client/src/test/java/zipkin2/reporter/activemq/ITActiveMQSender.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 The OpenZipkin Authors + * Copyright 2016-2023 The OpenZipkin Authors * * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except * in compliance with the License. You may obtain a copy of the License at @@ -16,14 +16,12 @@ import java.io.IOException; import java.util.stream.Stream; import javax.jms.BytesMessage; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.junit.EmbeddedActiveMQBroker; -import org.junit.After; -import org.junit.Before; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; import zipkin2.Call; import zipkin2.CheckResult; import zipkin2.Span; @@ -38,90 +36,72 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static zipkin2.TestObjects.CLIENT_SPAN; +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Timeout(60) public class ITActiveMQSender { - @ClassRule public static EmbeddedActiveMQBroker activemq = new EmbeddedActiveMQBroker(); - @Rule public TestName testName = new TestName(); + @RegisterExtension ActiveMQExtension activemq = new ActiveMQExtension(); - ActiveMQSender sender; - - @Before public void start() { - sender = builder().build(); - } - - @After public void stop() throws IOException { - sender.close(); - } - - @Test public void checkPasses() { - assertThat(sender.check().ok()).isTrue(); + @Test void checkPasses() { + try (ActiveMQSender sender = activemq.newSenderBuilder("checkPasses").build()) { + assertThat(sender.check().ok()).isTrue(); + } } - @Test public void checkFalseWhenBrokerIsDown() throws IOException { - sender.close(); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + @Test void checkFalseWhenBrokerIsDown() { // we can be pretty certain ActiveMQ isn't running on localhost port 80 - connectionFactory.setBrokerURL("tcp://localhost:80"); - sender = builder().connectionFactory(connectionFactory).build(); - - CheckResult check = sender.check(); - assertThat(check.ok()).isFalse(); - assertThat(check.error()).isInstanceOf(IOException.class); + try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) { + CheckResult check = sender.check(); + assertThat(check.ok()).isFalse(); + assertThat(check.error()).isInstanceOf(IOException.class); + } } - @Test public void sendFailsWithInvalidActiveMqServer() throws Exception { - sender.close(); - ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); + @Test void sendFailsWithInvalidActiveMqServer() { // we can be pretty certain ActiveMQ isn't running on localhost port 80 - connectionFactory.setBrokerURL("tcp://localhost:80"); - sender = builder().connectionFactory(connectionFactory).build(); - - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute()) - .isInstanceOf(IOException.class) - .hasMessageContaining("Unable to establish connection to ActiveMQ broker"); - } - - @Test public void sendsSpans() throws Exception { - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) { + assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf( + IOException.class) + .hasMessageContaining("Unable to establish connection to ActiveMQ broker"); + } } - @Test public void sendsSpans_PROTO3() throws Exception { - sender.close(); - sender = builder().encoding(Encoding.PROTO3).build(); + @Test void sendsSpans() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans").build()) { + send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); - send(CLIENT_SPAN, CLIENT_SPAN).execute(); - - assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly( + CLIENT_SPAN, CLIENT_SPAN); + } } - @Test public void sendsSpans_THRIFT() throws Exception { - sender.close(); - sender = builder().encoding(Encoding.THRIFT).build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void sendsSpans_PROTO3() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_PROTO3") + .encoding(Encoding.PROTO3) + .build()) { + send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); - assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage(sender))).containsExactly( + CLIENT_SPAN, CLIENT_SPAN); + } } - @Test public void sendsSpansToCorrectQueue() throws Exception { - sender.close(); - sender = builder().queue("customzipkinqueue").build(); - - send(CLIENT_SPAN, CLIENT_SPAN).execute(); + @Test void sendsSpans_THRIFT() throws Exception { + try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_THRIFT") + .encoding(Encoding.THRIFT) + .build()) { + send(sender, CLIENT_SPAN, CLIENT_SPAN).execute(); - assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage())) - .containsExactly(CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage(sender))).containsExactly( + CLIENT_SPAN, CLIENT_SPAN); + } } - @Test public void illegalToSendWhenClosed() { - sender.close(); - - assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute()) - .isInstanceOf(IllegalStateException.class); + @Test void illegalToSendWhenClosed() { + try (ActiveMQSender sender = activemq.newSenderBuilder("illegalToSendWhenClosed").build()) { + sender.close(); + assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf( + IllegalStateException.class); + } } /** @@ -130,13 +110,14 @@ public class ITActiveMQSender { * tools, care should be taken to ensure the toString() output is a reasonable length and does not * contain sensitive information. */ - @Test public void toStringContainsOnlySummaryInformation() { - assertThat(sender).hasToString(String.format("ActiveMQSender{brokerURL=%s, queue=%s}", - activemq.getVmURL(), testName.getMethodName()) - ); + @Test void toStringContainsOnlySummaryInformation() { + try (ActiveMQSender sender = activemq.newSenderBuilder("toString").build()) { + assertThat(sender).hasToString( + String.format("ActiveMQSender{brokerURL=%s, queue=toString}", activemq.brokerURL())); + } } - Call send(Span... spans) { + Call send(ActiveMQSender sender, Span... spans) { SpanBytesEncoder bytesEncoder; switch (sender.encoding()) { case JSON: @@ -154,17 +135,14 @@ Call send(Span... spans) { return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList())); } - private byte[] readMessage() throws Exception { - BytesMessage message = activemq.peekBytesMessage(sender.lazyInit.queue); - byte[] result = new byte[(int) message.getBodyLength()]; - message.readBytes(result); - return result; - } - - ActiveMQSender.Builder builder() { - return ActiveMQSender.newBuilder() - .connectionFactory(activemq.createConnectionFactory()) - // prevent test flakes by having each run in an individual queue - .queue(testName.getMethodName()); + byte[] readMessage(ActiveMQSender sender) throws Exception { + ActiveMQConn conn = sender.lazyInit.get(); + Queue queue = conn.sender.getQueue(); + try (MessageConsumer consumer = conn.session.createConsumer(queue)) { + BytesMessage message = (BytesMessage) consumer.receive(1000L); + byte[] result = new byte[(int) message.getBodyLength()]; + message.readBytes(result); + return result; + } } } diff --git a/activemq-client/src/test/resources/logback.xml b/activemq-client/src/test/resources/logback.xml new file mode 100644 index 00000000..492858ca --- /dev/null +++ b/activemq-client/src/test/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + +