Skip to content

Commit

Permalink
ci: moves ActiveMQ to docker based tests
Browse files Browse the repository at this point in the history
Signed-off-by: Adrian Cole <adrian@tetrate.io>
  • Loading branch information
Adrian Cole committed Dec 13, 2023
1 parent 9ca8a6c commit cfceb8b
Show file tree
Hide file tree
Showing 5 changed files with 162 additions and 105 deletions.
13 changes: 3 additions & 10 deletions activemq-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,9 @@
</dependency>

<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<version>${activemq.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
<version>${activemq.version}</version>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 The OpenZipkin Authors
* Copyright 2016-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand Down Expand Up @@ -32,7 +32,7 @@
* This sends (usually json v2) encoded spans to an ActiveMQ queue.
*
* <h3>Usage</h3>
*
* <p>
* This type is designed for {@link AsyncReporter.Builder#builder(Sender) the async reporter}.
*
* <p>Here's a simple configuration, configured for json:
Expand Down Expand Up @@ -132,7 +132,6 @@ public final ActiveMQSender build() {
}

/** get and close are typically called from different threads */
volatile ActiveMQConn conn;
volatile boolean closeCalled;

@Override public Encoding encoding() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2016-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package zipkin2.reporter.activemq;

import java.time.Duration;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.opentest4j.TestAbortedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;

import static org.testcontainers.utility.DockerImageName.parse;

class ActiveMQExtension implements BeforeAllCallback, AfterAllCallback {
static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQExtension.class);
static final int ACTIVEMQ_PORT = 61616;

ActiveMQContainer container = new ActiveMQContainer();

@Override public void beforeAll(ExtensionContext context) {
if (context.getRequiredTestClass().getEnclosingClass() != null) {
// Only run once in outermost scope.
return;
}

container.start();
LOGGER.info("Using brokerURL " + brokerURL());
}

@Override public void afterAll(ExtensionContext context) {
if (context.getRequiredTestClass().getEnclosingClass() != null) {
// Only run once in outermost scope.
return;
}

container.stop();
}

ActiveMQSender.Builder newSenderBuilder(String queue) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL(brokerURL());
return ActiveMQSender.newBuilder().queue(queue).connectionFactory(connectionFactory);
}

String brokerURL() {
return "failover:tcp://" + container.getHost() + ":" + container.getMappedPort(ACTIVEMQ_PORT);
}

// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class ActiveMQContainer extends GenericContainer<ActiveMQContainer> {
ActiveMQContainer() {
super(parse("openzipkin/zipkin-activemq:test"));
if ("true".equals(System.getProperty("docker.skip"))) {
throw new TestAbortedException("${docker.skip} == true");
}
withExposedPorts(ACTIVEMQ_PORT);
waitStrategy = Wait.forListeningPorts(ACTIVEMQ_PORT);
withStartupTimeout(Duration.ofSeconds(60));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2020 The OpenZipkin Authors
* Copyright 2016-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
Expand All @@ -16,14 +16,12 @@
import java.io.IOException;
import java.util.stream.Stream;
import javax.jms.BytesMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import zipkin2.Call;
import zipkin2.CheckResult;
import zipkin2.Span;
Expand All @@ -38,90 +36,70 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static zipkin2.TestObjects.CLIENT_SPAN;

public class ITActiveMQSender {
@ClassRule public static EmbeddedActiveMQBroker activemq = new EmbeddedActiveMQBroker();
@Rule public TestName testName = new TestName();
@TestInstance(TestInstance.Lifecycle.PER_CLASS) @Timeout(60) public class ITActiveMQSender {
@RegisterExtension ActiveMQExtension activemq = new ActiveMQExtension();

ActiveMQSender sender;

@Before public void start() {
sender = builder().build();
}

@After public void stop() throws IOException {
sender.close();
}

@Test public void checkPasses() {
assertThat(sender.check().ok()).isTrue();
@Test void checkPasses() {
try (ActiveMQSender sender = activemq.newSenderBuilder("checkPasses").build()) {
assertThat(sender.check().ok()).isTrue();
}
}

@Test public void checkFalseWhenBrokerIsDown() throws IOException {
sender.close();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
@Test void checkFalseWhenBrokerIsDown() {
// we can be pretty certain ActiveMQ isn't running on localhost port 80
connectionFactory.setBrokerURL("tcp://localhost:80");
sender = builder().connectionFactory(connectionFactory).build();

CheckResult check = sender.check();
assertThat(check.ok()).isFalse();
assertThat(check.error()).isInstanceOf(IOException.class);
try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) {
CheckResult check = sender.check();
assertThat(check.ok()).isFalse();
assertThat(check.error()).isInstanceOf(IOException.class);
}
}

@Test public void sendFailsWithInvalidActiveMqServer() throws Exception {
sender.close();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
@Test void sendFailsWithInvalidActiveMqServer() {
// we can be pretty certain ActiveMQ isn't running on localhost port 80
connectionFactory.setBrokerURL("tcp://localhost:80");
sender = builder().connectionFactory(connectionFactory).build();

assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute())
.isInstanceOf(IOException.class)
.hasMessageContaining("Unable to establish connection to ActiveMQ broker");
}

@Test public void sendsSpans() throws Exception {
send(CLIENT_SPAN, CLIENT_SPAN).execute();

assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage()))
.containsExactly(CLIENT_SPAN, CLIENT_SPAN);
try (ActiveMQSender sender = ActiveMQSender.create("tcp://localhost:80")) {
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf(
IOException.class)
.hasMessageContaining("Unable to establish connection to ActiveMQ broker");
}
}

@Test public void sendsSpans_PROTO3() throws Exception {
sender.close();
sender = builder().encoding(Encoding.PROTO3).build();
@Test void sendsSpans() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans").build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();

send(CLIENT_SPAN, CLIENT_SPAN).execute();

assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage()))
.containsExactly(CLIENT_SPAN, CLIENT_SPAN);
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
}
}

@Test public void sendsSpans_THRIFT() throws Exception {
sender.close();
sender = builder().encoding(Encoding.THRIFT).build();

send(CLIENT_SPAN, CLIENT_SPAN).execute();
@Test void sendsSpans_PROTO3() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_PROTO3")
.encoding(Encoding.PROTO3)
.build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();

assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage()))
.containsExactly(CLIENT_SPAN, CLIENT_SPAN);
assertThat(SpanBytesDecoder.PROTO3.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
}
}

@Test public void sendsSpansToCorrectQueue() throws Exception {
sender.close();
sender = builder().queue("customzipkinqueue").build();

send(CLIENT_SPAN, CLIENT_SPAN).execute();
@Test void sendsSpans_THRIFT() throws Exception {
try (ActiveMQSender sender = activemq.newSenderBuilder("sendsSpans_THRIFT")
.encoding(Encoding.THRIFT)
.build()) {
send(sender, CLIENT_SPAN, CLIENT_SPAN).execute();

assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage()))
.containsExactly(CLIENT_SPAN, CLIENT_SPAN);
assertThat(SpanBytesDecoder.THRIFT.decodeList(readMessage(sender))).containsExactly(
CLIENT_SPAN, CLIENT_SPAN);
}
}

@Test public void illegalToSendWhenClosed() {
sender.close();

assertThatThrownBy(() -> send(CLIENT_SPAN, CLIENT_SPAN).execute())
.isInstanceOf(IllegalStateException.class);
@Test void illegalToSendWhenClosed() {
try (ActiveMQSender sender = activemq.newSenderBuilder("illegalToSendWhenClosed").build()) {
sender.close();
assertThatThrownBy(() -> send(sender, CLIENT_SPAN, CLIENT_SPAN).execute()).isInstanceOf(
IllegalStateException.class);
}
}

/**
Expand All @@ -130,13 +108,14 @@ public class ITActiveMQSender {
* tools, care should be taken to ensure the toString() output is a reasonable length and does not
* contain sensitive information.
*/
@Test public void toStringContainsOnlySummaryInformation() {
assertThat(sender).hasToString(String.format("ActiveMQSender{brokerURL=%s, queue=%s}",
activemq.getVmURL(), testName.getMethodName())
);
@Test void toStringContainsOnlySummaryInformation() {
try (ActiveMQSender sender = activemq.newSenderBuilder("toString").build()) {
assertThat(sender).hasToString(
String.format("ActiveMQSender{brokerURL=%s, queue=toString}", activemq.brokerURL()));
}
}

Call<Void> send(Span... spans) {
Call<Void> send(ActiveMQSender sender, Span... spans) {
SpanBytesEncoder bytesEncoder;
switch (sender.encoding()) {
case JSON:
Expand All @@ -154,17 +133,14 @@ Call<Void> send(Span... spans) {
return sender.sendSpans(Stream.of(spans).map(bytesEncoder::encode).collect(toList()));
}

private byte[] readMessage() throws Exception {
BytesMessage message = activemq.peekBytesMessage(sender.lazyInit.queue);
byte[] result = new byte[(int) message.getBodyLength()];
message.readBytes(result);
return result;
}

ActiveMQSender.Builder builder() {
return ActiveMQSender.newBuilder()
.connectionFactory(activemq.createConnectionFactory())
// prevent test flakes by having each run in an individual queue
.queue(testName.getMethodName());
byte[] readMessage(ActiveMQSender sender) throws Exception {
ActiveMQConn conn = sender.lazyInit.get();
Queue queue = conn.sender.getQueue();
try (MessageConsumer consumer = conn.session.createConsumer(queue)) {
BytesMessage message = (BytesMessage) consumer.receive(1000L);
byte[] result = new byte[(int) message.getBodyLength()];
message.readBytes(result);
return result;
}
}
}
13 changes: 13 additions & 0 deletions activemq-client/src/test/resources/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
</encoder>
</appender>

<root level="WARN">
<appender-ref ref="STDOUT" />
</root>
</configuration>

0 comments on commit cfceb8b

Please sign in to comment.