diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 1282d1a34..9c8d09a88 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -16,10 +16,10 @@ jobs: python-version: ["3.10"] steps: - name: Checkout Repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install dependencies diff --git a/.github/workflows/add-untriaged.yml b/.github/workflows/add-untriaged.yml index 9dcc7020d..74e494be1 100644 --- a/.github/workflows/add-untriaged.yml +++ b/.github/workflows/add-untriaged.yml @@ -8,7 +8,7 @@ jobs: apply-label: runs-on: ubuntu-latest steps: - - uses: actions/github-script@v6 + - uses: actions/github-script@v7 with: script: | github.rest.issues.addLabels({ diff --git a/.github/workflows/backport.yml b/.github/workflows/backport.yml index 5a75d2c87..0400f8daf 100644 --- a/.github/workflows/backport.yml +++ b/.github/workflows/backport.yml @@ -16,14 +16,14 @@ jobs: steps: - name: GitHub App token id: github_app_token - uses: tibdex/github-app-token@v1.5.0 + uses: tibdex/github-app-token@v2 with: app_id: ${{ secrets.APP_ID }} private_key: ${{ secrets.APP_PRIVATE_KEY }} installation_id: 22958780 - name: Backport - uses: VachaShah/backport@v2.1.0 + uses: VachaShah/backport@v2 with: github_token: ${{ steps.github_app_token.outputs.token }} head_template: backport/backport-<%= number %>-to-<%= base %> diff --git a/.github/workflows/e2eTest.yml b/.github/workflows/e2eTest.yml index 7393ca28c..7e4419dc6 100644 --- a/.github/workflows/e2eTest.yml +++ b/.github/workflows/e2eTest.yml @@ -14,22 +14,27 @@ jobs: steps: - name: Check out code - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up JDK - uses: actions/setup-java@v2 + uses: actions/setup-java@v4 with: java-version: '11' - distribution: 'adopt' + distribution: 'corretto' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + with: + gradle-version: 8.0.2 + gradle-home-cache-cleanup: true + cache-read-only: false - name: Start Docker Solution - run: | - cd TrafficCapture - chmod +x ./gradlew - ./gradlew dockerSolution:ComposeUp + run: ./gradlew dockerSolution:ComposeUp -x test + working-directory: TrafficCapture - name: Set up Python - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: '3.10' diff --git a/.github/workflows/gradle-build-and-test.yml b/.github/workflows/gradle-build-and-test.yml index 5ddda0506..fc3b82db4 100644 --- a/.github/workflows/gradle-build-and-test.yml +++ b/.github/workflows/gradle-build-and-test.yml @@ -8,15 +8,23 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - name: Checkout project sources + uses: actions/checkout@v4 + - name: Set up JDK 11 - uses: actions/setup-java@v2 + uses: actions/setup-java@v4 with: java-version: '11' - distribution: 'adopt' + distribution: 'corretto' + + - name: Setup Gradle + uses: gradle/actions/setup-gradle@v3 + with: + gradle-version: 8.0.2 + gradle-home-cache-cleanup: true - name: Run Gradle Build - run: ./gradlew build -x test + run: ./gradlew build -x test --info working-directory: TrafficCapture - name: Run Tests with Coverage @@ -24,7 +32,7 @@ jobs: working-directory: TrafficCapture - name: Upload to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: files: "TrafficCapture/**/jacocoTestReport.xml" flags: unittests diff --git a/.github/workflows/linkCheck.yml b/.github/workflows/linkCheck.yml index 382a2a6bd..23af1a6da 100644 --- a/.github/workflows/linkCheck.yml +++ b/.github/workflows/linkCheck.yml @@ -13,10 +13,10 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - name: lychee Link Checker id: lychee - uses: lycheeverse/lychee-action@v1.5.0 + uses: lycheeverse/lychee-action@v1 with: args: --verbose --accept=200,403,429 "**/*.html" "**/*.md" "**/*.txt" "**/*.json" --exclude "file:///github/workspace/*" diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml index dfbd65288..fcfe5b1a4 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/python-tests.yml @@ -20,9 +20,9 @@ jobs: working-directory: ./experimental/cluster_migration_core steps: - name: Checkout Repository - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Set up Python ${{ matrix.python-version }} - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: python-version: ${{ matrix.python-version }} - name: Install Dependencies for Framework and Test Coverage @@ -32,6 +32,6 @@ jobs: run: | python -m pytest unit_tests/ --cov=cluster_migration_core --cov-report=xml --cov-branch - name: Upload Coverage Report - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: files: cluster_migration_core/coverage.xml diff --git a/.github/workflows/release-drafter.yml b/.github/workflows/release-drafter.yml index 1c91b32f0..5f9c786c6 100644 --- a/.github/workflows/release-drafter.yml +++ b/.github/workflows/release-drafter.yml @@ -11,7 +11,7 @@ jobs: name: Draft a release runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - id: get_data run: | echo "approvers=$(cat .github/CODEOWNERS | grep @ | tr -d '*\n ' | sed 's/@/,/g' | sed 's/,//1')" >> $GITHUB_OUTPUT diff --git a/TrafficCapture/build.gradle b/TrafficCapture/build.gradle index d8c352490..58afc0aa0 100644 --- a/TrafficCapture/build.gradle +++ b/TrafficCapture/build.gradle @@ -3,8 +3,10 @@ plugins { id 'org.owasp.dependencycheck' version '8.2.1' } -repositories { - mavenCentral() +allprojects { + repositories { + mavenCentral() + } } allprojects { @@ -19,38 +21,34 @@ allprojects { } jacoco { - toolVersion = '0.8.9' + toolVersion = '0.8.11' } - test { + tasks.withType(Test) { // Provide way to exclude particular tests from CLI // e.g. ./gradlew test -PexcludeTests=**/KafkaProtobufConsumerLongTermTest* if (project.hasProperty('excludeTests')) { exclude project.property('excludeTests') } + useJUnitPlatform { + systemProperty 'junit.jupiter.execution.parallel.enabled', 'true' + systemProperty 'junit.jupiter.execution.parallel.mode.default', "concurrent" + systemProperty 'junit.jupiter.execution.parallel.mode.classes.default', 'concurrent' + } + } + + test { systemProperty 'disableMemoryLeakTests', 'true' useJUnitPlatform { excludeTags 'longTest' } - jvmArgs '-ea' - jacoco { enabled = false } } task slowTest(type: Test) { - dependsOn test - // Provide way to exclude particular tests from CLI - // e.g. ./gradlew test -PexcludeTests=**/KafkaProtobufConsumerLongTermTest* - if (project.hasProperty('excludeTests')) { - exclude project.property('excludeTests') - } systemProperty 'disableMemoryLeakTests', 'false' - useJUnitPlatform { - } - jvmArgs '-ea' - jacoco { enabled = true } @@ -86,4 +84,4 @@ jacocoTestReport { html.required = true html.destination file("${buildDir}/reports/jacoco/test/html") } -} \ No newline at end of file +} diff --git a/TrafficCapture/gradle.properties b/TrafficCapture/gradle.properties new file mode 100644 index 000000000..5ca1b8ea9 --- /dev/null +++ b/TrafficCapture/gradle.properties @@ -0,0 +1,7 @@ +org.gradle.caching=true +org.gradle.configuration-cache=true +org.gradle.configureondemand=true + +# Set Gradle Daemon's idle timeout to 30 minutes +org.gradle.daemon.idletimeout=1800000 +org.gradle.parallel=true \ No newline at end of file diff --git a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java index f4fe7b893..9fecf3e17 100644 --- a/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java +++ b/TrafficCapture/nettyWireLogging/src/test/java/org/opensearch/migrations/trafficcapture/netty/ConditionallyReliableLoggingHttpHandlerTest.java @@ -11,6 +11,7 @@ import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.migrations.testutils.TestUtilities; @@ -128,6 +129,7 @@ private static byte[] consumeIntoArray(ByteBuf m) { @ParameterizedTest @ValueSource(booleans = {true, false}) + @ResourceLock("OpenTelemetryExtension") public void testThatAPostInASinglePacketBlocksFutureActivity(boolean usePool) throws IOException { byte[] fullTrafficBytes = SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8); var bb = TestUtilities.getByteBuf(fullTrafficBytes, usePool); @@ -137,6 +139,7 @@ public void testThatAPostInASinglePacketBlocksFutureActivity(boolean usePool) th @ParameterizedTest @ValueSource(booleans = {true, false}) + @ResourceLock("OpenTelemetryExtension") public void testThatAPostInTinyPacketsBlocksFutureActivity(boolean usePool) throws IOException { byte[] fullTrafficBytes = SimpleRequests.SMALL_POST.getBytes(StandardCharsets.UTF_8); writeMessageAndVerify(fullTrafficBytes, getSingleByteAtATimeWriter(usePool, fullTrafficBytes)); @@ -261,6 +264,7 @@ private Consumer getWriter(boolean singleBytes, boolean usePool @ParameterizedTest @ValueSource(booleans = {true, false}) @WrapWithNettyLeakDetection(repetitions = 16) + @ResourceLock("OpenTelemetryExtension") public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { testThatAPostInTinyPacketsBlocksFutureActivity(usePool); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); @@ -269,6 +273,7 @@ public void testThatAPostInTinyPacketsBlocksFutureActivity_withLeakDetection(boo @ParameterizedTest @ValueSource(booleans = {true, false}) @WrapWithNettyLeakDetection(repetitions = 32) + @ResourceLock("OpenTelemetryExtension") public void testThatAPostInASinglePacketBlocksFutureActivity_withLeakDetection(boolean usePool) throws Exception { testThatAPostInASinglePacketBlocksFutureActivity(usePool); //MyResourceLeakDetector.dumpHeap("nettyWireLogging_"+COUNT+"_"+ Instant.now() +".hprof", true); diff --git a/TrafficCapture/replayerPlugins/build.gradle b/TrafficCapture/replayerPlugins/build.gradle index fb3c3f88a..6b416abc4 100644 --- a/TrafficCapture/replayerPlugins/build.gradle +++ b/TrafficCapture/replayerPlugins/build.gradle @@ -2,4 +2,4 @@ subprojects { repositories { mavenCentral() } -} \ No newline at end of file +} diff --git a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java index 3a5d5112b..7ef0bae24 100644 --- a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java +++ b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/PortFinder.java @@ -1,6 +1,8 @@ package org.opensearch.migrations.testutils; +import java.io.IOException; +import java.net.ServerSocket; import lombok.extern.slf4j.Slf4j; import java.util.Random; @@ -17,7 +19,6 @@ public class PortFinder { private PortFinder() {} private static final int MAX_PORT_TRIES = 100; - private static final Random random = new Random(); public static class ExceededMaxPortAssigmentAttemptException extends Exception { public ExceededMaxPortAssigmentAttemptException(Throwable cause) { @@ -30,11 +31,11 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r) int numTries = 0; while (true) { try { - int port = random.nextInt((2 << 15) - 1025) + 1025; - r.accept(Integer.valueOf(port)); + int port = findOpenPort(); + r.accept(port); return port; } catch (Exception e) { - if (++numTries <= MAX_PORT_TRIES) { + if (++numTries >= MAX_PORT_TRIES) { log.atError().setCause(e).setMessage(()->"Exceeded max tries {} giving up") .addArgument(MAX_PORT_TRIES).log(); throw new ExceededMaxPortAssigmentAttemptException(e); @@ -44,4 +45,14 @@ public static int retryWithNewPortUntilNoThrow(IntConsumer r) } } + public static int findOpenPort() { + try (ServerSocket serverSocket = new ServerSocket(0)) { + int port = serverSocket.getLocalPort(); + log.info("Open port found: " + port); + return port; + } catch (IOException e) { + log.error("Failed to find an open port: " + e.getMessage()); + throw new RuntimeException(e); + } + } } \ No newline at end of file diff --git a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java index a2bba88f2..13799499b 100644 --- a/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java +++ b/TrafficCapture/testUtilities/src/testFixtures/java/org/opensearch/migrations/testutils/SimpleHttpClientForTesting.java @@ -5,6 +5,8 @@ import lombok.AllArgsConstructor; import org.apache.hc.client5.http.classic.methods.HttpGet; import org.apache.hc.client5.http.classic.methods.HttpPut; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.RequestConfig; import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; @@ -19,6 +21,8 @@ import org.apache.hc.core5.http.io.entity.InputStreamEntity; import org.apache.hc.core5.http.io.entity.StringEntity; import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.util.Timeout; + import java.nio.charset.Charset; import java.io.IOException; @@ -38,6 +42,9 @@ */ public class SimpleHttpClientForTesting implements AutoCloseable { + private final static Timeout DEFAULT_RESPONSE_TIMEOUT = Timeout.ofSeconds(5); + private final static Timeout DEFAULT_CONNECTION_TIMEOUT = Timeout.ofSeconds(5); + private final CloseableHttpClient httpClient; private static BasicHttpClientConnectionManager getInsecureTlsConnectionManager() @@ -65,7 +72,15 @@ public SimpleHttpClientForTesting(boolean useTlsAndInsecurelyInsteadOfClearText) } private SimpleHttpClientForTesting(BasicHttpClientConnectionManager connectionManager) { - httpClient = HttpClients.custom().setConnectionManager(connectionManager).build(); + var requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(DEFAULT_CONNECTION_TIMEOUT) + .setResponseTimeout(DEFAULT_RESPONSE_TIMEOUT) + .build(); + + httpClient = HttpClients.custom() + .setConnectionManager(connectionManager) + .setDefaultRequestConfig(requestConfig) + .build(); } @AllArgsConstructor diff --git a/TrafficCapture/trafficCaptureProxyServer/build.gradle b/TrafficCapture/trafficCaptureProxyServer/build.gradle index 74fd3dfe8..fdec3d131 100644 --- a/TrafficCapture/trafficCaptureProxyServer/build.gradle +++ b/TrafficCapture/trafficCaptureProxyServer/build.gradle @@ -41,6 +41,11 @@ dependencies { testImplementation testFixtures(project(path: ':testUtilities')) testImplementation testFixtures(project(path: ':captureOffloader')) testImplementation testFixtures(project(path: ':coreUtilities')) + testImplementation group: 'eu.rekawek.toxiproxy', name: 'toxiproxy-java', version: '2.1.7' + testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'toxiproxy', version: '1.19.5' } tasks.withType(Tar){ diff --git a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java index 46a20b517..6c165e39b 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/main/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPool.java @@ -319,8 +319,6 @@ private String toStringOnThread() { sb.append(", numInProgressItems=").append(inProgressItems.size()); sb.append(", numReadyItems=").append(readyItems.size()); } - sb.append(", inProgressItems=").append(inProgressItems); - sb.append(", readyItems=").append(readyItems); sb.append(", itemSupplier=").append(itemSupplier); sb.append(", onExpirationConsumer=").append(onExpirationConsumer); sb.append(", eventLoop=").append(eventLoop); diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java new file mode 100644 index 000000000..cfae95c80 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/KafkaConfigurationCaptureProxyTest.java @@ -0,0 +1,217 @@ +package org.opensearch.migrations.trafficcapture.proxyserver; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.model.ToxicDirection; +import java.io.IOException; +import java.net.URI; +import java.time.Duration; +import java.util.stream.IntStream; +import java.util.stream.Stream; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.ThrowingConsumer; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.opensearch.migrations.testutils.SimpleHttpClientForTesting; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.CaptureProxyContainer; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.HttpdContainerTestBase; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.KafkaContainerTestBase; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.ToxiproxyContainerTestBase; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.HttpdContainerTest; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.KafkaContainerTest; +import org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations.ToxiproxyContainerTest; + +@Slf4j +@KafkaContainerTest +@HttpdContainerTest +@ToxiproxyContainerTest +public class KafkaConfigurationCaptureProxyTest { + + private static final KafkaContainerTestBase kafkaTestBase = new KafkaContainerTestBase(); + private static final HttpdContainerTestBase httpdTestBase = new HttpdContainerTestBase(); + private static final ToxiproxyContainerTestBase toxiproxyTestBase = new ToxiproxyContainerTestBase(); + private static final String HTTPD_GET_EXPECTED_RESPONSE = "

It works!

\n"; + private static final int DEFAULT_NUMBER_OF_CALLS = 3; + private static final long PROXY_EXPECTED_MAX_LATENCY_MS = Duration.ofSeconds(1).toMillis(); + private Proxy kafkaProxy; + private Proxy destinationProxy; + + @BeforeAll + public static void setUp() { + kafkaTestBase.start(); + httpdTestBase.start(); + toxiproxyTestBase.start(); + } + + @AfterAll + public static void tearDown() { + kafkaTestBase.stop(); + httpdTestBase.stop(); + toxiproxyTestBase.stop(); + } + + private static void assertLessThan(long ceiling, long actual) { + Assertions.assertTrue(actual < ceiling, + () -> "Expected actual value to be less than " + ceiling + " but was " + actual + "."); + } + + @BeforeEach + public void setUpTest() { + kafkaProxy = toxiproxyTestBase.getProxy(kafkaTestBase.getContainer()); + destinationProxy = toxiproxyTestBase.getProxy(httpdTestBase.getContainer()); + } + + @AfterEach + public void tearDownTest() { + toxiproxyTestBase.deleteProxy(kafkaProxy); + toxiproxyTestBase.deleteProxy(destinationProxy); + } + + @ParameterizedTest + @EnumSource(FailureMode.class) + @Disabled + // TODO: Fix proxy bug and enable test + public void testCaptureProxyWithKafkaImpairedBeforeStart(FailureMode failureMode) { + try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) { + failureMode.apply(kafkaProxy); + + captureProxy.start(); + + var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS); + + assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis()); + } + } + + @ParameterizedTest + @EnumSource(FailureMode.class) + public void testCaptureProxyWithKafkaImpairedAfterStart(FailureMode failureMode) { + try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) { + captureProxy.start(); + + failureMode.apply(kafkaProxy); + + var latency = assertBasicCalls(captureProxy, DEFAULT_NUMBER_OF_CALLS); + + assertLessThan(PROXY_EXPECTED_MAX_LATENCY_MS, latency.toMillis()); + } + } + + @ParameterizedTest + @EnumSource(FailureMode.class) + public void testCaptureProxyWithKafkaImpairedDoesNotAffectRequest_proxysRequest(FailureMode failureMode) { + try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) { + captureProxy.start(); + final int numberOfTests = 20; + + // Performance is different for first few calls so throw them away + assertBasicCalls(captureProxy, 3); + + var averageBaselineDuration = assertBasicCalls(captureProxy, numberOfTests); + + failureMode.apply(kafkaProxy); + + // Calculate average duration of impaired calls + var averageImpairedDuration = assertBasicCalls(captureProxy, numberOfTests); + + long acceptableDifference = Duration.ofMillis(25).toMillis(); + + log.info("Baseline Duration: {}ms, Impaired Duration: {}ms", averageBaselineDuration.toMillis(), + averageImpairedDuration.toMillis()); + + assertEquals(averageBaselineDuration.toMillis(), averageImpairedDuration.toMillis(), acceptableDifference, + "The average durations are not close enough"); + } + } + + @Test + public void testCaptureProxyLatencyAddition() { + try (var captureProxy = new CaptureProxyContainer(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + toxiproxyTestBase.getProxyUrlHttp(kafkaProxy))) { + captureProxy.start(); + final int numberOfTests = 25; + + // Performance is different for first few calls so throw them away + assertBasicCalls(captureProxy, 3); + + var averageRequestDurationWithProxy = assertBasicCalls(captureProxy, numberOfTests); + + var averageNoProxyDuration = assertBasicCalls(toxiproxyTestBase.getProxyUrlHttp(destinationProxy), + numberOfTests); + + var acceptableProxyLatencyAdd = Duration.ofMillis(25); + + assertLessThan(averageNoProxyDuration.plus(acceptableProxyLatencyAdd).toMillis(), + averageRequestDurationWithProxy.toMillis()); + } + } + + private Duration assertBasicCalls(CaptureProxyContainer proxy, int numberOfCalls) { + return assertBasicCalls(CaptureProxyContainer.getUriFromContainer(proxy), numberOfCalls); + } + + private Duration assertBasicCalls(String endpoint, int numberOfCalls) { + return IntStream.range(0, numberOfCalls).mapToObj(i -> assertBasicCall(endpoint)) + .reduce(Duration.ZERO, Duration::plus).dividedBy(numberOfCalls); + } + + + private Duration assertBasicCall(String endpoint) { + try (var client = new SimpleHttpClientForTesting()) { + long startTimeNanos = System.nanoTime(); + var response = client.makeGetRequest(URI.create(endpoint), Stream.empty()); + long endTimeNanos = System.nanoTime(); + + var responseBody = new String(response.payloadBytes); + assertEquals(HTTPD_GET_EXPECTED_RESPONSE, responseBody); + return Duration.ofNanos(endTimeNanos - startTimeNanos); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public enum FailureMode { + LATENCY( + (proxy) -> proxy.toxics().latency("latency", ToxicDirection.UPSTREAM, 5000)), + BANDWIDTH( + (proxy) -> proxy.toxics().bandwidth("bandwidth", ToxicDirection.DOWNSTREAM, 1)), + TIMEOUT( + (proxy) -> proxy.toxics().timeout("timeout", ToxicDirection.UPSTREAM, 5000)), + SLICER( + (proxy) -> { + proxy.toxics().slicer("slicer_down", ToxicDirection.DOWNSTREAM, 1, 1000); + proxy.toxics().slicer("slicer_up", ToxicDirection.UPSTREAM, 1, 1000); + }), + SLOW_CLOSE( + (proxy) -> proxy.toxics().slowClose("slow_close", ToxicDirection.UPSTREAM, 5000)), + RESET_PEER( + (proxy) -> proxy.toxics().resetPeer("reset_peer", ToxicDirection.UPSTREAM, 5000)), + LIMIT_DATA( + (proxy) -> proxy.toxics().limitData("limit_data", ToxicDirection.UPSTREAM, 10)), + DISCONNECT(Proxy::disable); + private final ThrowingConsumer failureModeApplier; + + FailureMode(ThrowingConsumer applier) { + this.failureModeApplier = applier; + } + + public void apply(Proxy proxy) { + try { + this.failureModeApplier.accept(proxy); + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java index 47944c922..03f162fe1 100644 --- a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/netty/ExpiringSubstitutableItemPoolTest.java @@ -20,8 +20,10 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.junit.jupiter.api.parallel.Isolated; @Slf4j +@Isolated("Isolation based on temporal checks") class ExpiringSubstitutableItemPoolTest { public static final int NUM_POOLED_ITEMS = 5; @@ -69,7 +71,6 @@ void get() throws Exception { return rval; }, item->{ - expireCountdownLatch.countDown(); log.info("Expiring item: "+item); try { expiredItems.add(item.get()); @@ -79,6 +80,7 @@ void get() throws Exception { } catch (ExecutionException e) { throw Lombok.sneakyThrow(e); } + expireCountdownLatch.countDown(); }); for (int i = 0; i destinationUriSupplier; + private final Supplier kafkaUriSupplier; + private Integer listeningPort; + private Thread serverThread; + + public CaptureProxyContainer(final Supplier destinationUriSupplier, + final Supplier kafkaUriSupplier) { + this.destinationUriSupplier = destinationUriSupplier; + this.kafkaUriSupplier = kafkaUriSupplier; + } + + public CaptureProxyContainer(final String destinationUri, final String kafkaUri) { + this.destinationUriSupplier = () -> destinationUri; + this.kafkaUriSupplier = () -> kafkaUri; + } + + public CaptureProxyContainer(final Container destination, final KafkaContainer kafka) { + this(() -> getUriFromContainer(destination), () -> getUriFromContainer(kafka)); + } + + public static String getUriFromContainer(final Container container) { + return "http://" + container.getHost() + ":" + container.getFirstMappedPort(); + } + + @Override + public void start() { + this.listeningPort = PortFinder.findOpenPort(); + serverThread = new Thread(() -> { + try { + String[] args = { + "--kafkaConnection", kafkaUriSupplier.get(), + "--destinationUri", destinationUriSupplier.get(), + "--listenPort", String.valueOf(listeningPort), + "--insecureDestination" + }; + + CaptureProxy.main(args); + } catch (Exception e) { + throw new AssertionError("Should not have exception", e); + } + }); + + serverThread.start(); + new HttpWaitStrategy().forPort(listeningPort) + .withStartupTimeout(TIMEOUT_DURATION) + .waitUntilReady(this); + } + + @Override + public boolean isRunning() { + return serverThread != null; + } + + @Override + public void stop() { + if (serverThread != null) { + serverThread.interrupt(); + this.serverThread = null; + } + this.listeningPort = null; + close(); + } + + @Override + public void close() { + } + + @Override + public Set getLivenessCheckPortNumbers() { + return getExposedPorts() + .stream() + .map(this::getMappedPort) + .collect(Collectors.toSet()); + } + + @Override + public String getHost() { + return "localhost"; + } + + @Override + public Integer getMappedPort(int originalPort) { + if (getExposedPorts().contains(originalPort)) { + return listeningPort; + } + return null; + } + + @Override + public List getExposedPorts() { + // Internal and External ports are the same + return List.of(listeningPort); + } + + @Override + public InspectContainerResponse getContainerInfo() { + return new InspectNonContainerResponse("captureProxy"); + } + + @AllArgsConstructor + static class InspectNonContainerResponse extends InspectContainerResponse { + + private String name; + + @Override + public String getName() { + return name; + } + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java new file mode 100644 index 000000000..f16993eb7 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/HttpdContainerTestBase.java @@ -0,0 +1,13 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import org.testcontainers.containers.GenericContainer; + +public class HttpdContainerTestBase extends TestContainerTestBase> { + + private static final GenericContainer httpd = new GenericContainer("httpd:alpine") + .withExposedPorts(80); // Container Port + + public GenericContainer getContainer() { + return httpd; + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java new file mode 100644 index 000000000..be90048b6 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/KafkaContainerTestBase.java @@ -0,0 +1,14 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.utility.DockerImageName; + +public class KafkaContainerTestBase extends TestContainerTestBase { + + private static final KafkaContainer kafka = new KafkaContainer( + DockerImageName.parse("confluentinc/cp-kafka:latest")); + + public KafkaContainer getContainer() { + return kafka; + } +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java new file mode 100644 index 000000000..0d1e23f50 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/TestContainerTestBase.java @@ -0,0 +1,16 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import org.testcontainers.containers.GenericContainer; + +abstract class TestContainerTestBase> { + + public void start() { + getContainer().start(); + } + + public void stop() { + getContainer().start(); + } + + abstract T getContainer(); +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java new file mode 100644 index 000000000..de773a9b9 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/ToxiproxyContainerTestBase.java @@ -0,0 +1,76 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers; + +import eu.rekawek.toxiproxy.Proxy; +import eu.rekawek.toxiproxy.ToxiproxyClient; +import java.io.IOException; +import java.util.HashSet; +import java.util.concurrent.ConcurrentSkipListSet; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.ToxiproxyContainer; + +public class ToxiproxyContainerTestBase extends TestContainerTestBase { + + private static final ToxiproxyContainer toxiproxy = new ToxiproxyContainer( + "ghcr.io/shopify/toxiproxy:latest") + .withAccessToHost(true); + + final ConcurrentSkipListSet toxiproxyUnusedExposedPorts = new ConcurrentSkipListSet<>(); + + static int getListeningPort(Proxy proxy) { + return Integer.parseInt(proxy.getListen().replaceAll(".*:", "")); + } + + public ToxiproxyContainer getContainer() { + return toxiproxy; + } + + @Override + public void start() { + final int TOXIPROXY_CONTROL_PORT = 8474; + getContainer().start(); + var concurrentPortSet = new HashSet<>(getContainer().getExposedPorts()); + concurrentPortSet.remove(TOXIPROXY_CONTROL_PORT); + toxiproxyUnusedExposedPorts.addAll(concurrentPortSet); + } + + @Override + public void stop() { + toxiproxyUnusedExposedPorts.clear(); + getContainer().stop(); + } + + public void deleteProxy(Proxy proxy) { + var proxyPort = getListeningPort(proxy); + try { + proxy.delete(); + toxiproxyUnusedExposedPorts.add(proxyPort); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public Proxy getProxy(GenericContainer container) { + var containerPort = container.getFirstMappedPort(); + final ToxiproxyClient toxiproxyClient = new ToxiproxyClient(toxiproxy.getHost(), + getContainer().getControlPort()); + org.testcontainers.Testcontainers.exposeHostPorts(containerPort); + try { + var containerName = (container.getDockerImageName() + "_" + container.getContainerName() + "_" + + Thread.currentThread().getId()).replaceAll("[^a-zA-Z0-9_]+", "_"); + synchronized (toxiproxyUnusedExposedPorts) { + var proxyPort = toxiproxyUnusedExposedPorts.first(); + var proxy = toxiproxyClient.createProxy(containerName, "0.0.0.0:" + proxyPort, + "host.testcontainers.internal" + ":" + containerPort); + toxiproxyUnusedExposedPorts.remove(proxyPort); + proxy.enable(); + return proxy; + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public String getProxyUrlHttp(Proxy proxy) { + return "http://" + getContainer().getHost() + ":" + getContainer().getMappedPort(getListeningPort(proxy)); + } +} \ No newline at end of file diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java new file mode 100644 index 000000000..719069bae --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/HttpdContainerTest.java @@ -0,0 +1,11 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations; + +import java.lang.annotation.Inherited; +import org.junit.jupiter.api.parallel.ResourceLock; + +@Inherited +@ResourceLock("HttpdContainer") +@TestContainerTest +public @interface HttpdContainerTest { + +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java new file mode 100644 index 000000000..30c5a2cd2 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/KafkaContainerTest.java @@ -0,0 +1,11 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations; + +import java.lang.annotation.Inherited; +import org.junit.jupiter.api.parallel.ResourceLock; + +@Inherited +@ResourceLock("KafkaContainer") +@TestContainerTest +public @interface KafkaContainerTest { + +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java new file mode 100644 index 000000000..1163dff3e --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/TestContainerTest.java @@ -0,0 +1,11 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations; + +import java.lang.annotation.Inherited; +import org.junit.jupiter.api.Tag; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Inherited +@Tag("longTest") +@Testcontainers(disabledWithoutDocker = true, parallel = true) +public @interface TestContainerTest { +} diff --git a/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java new file mode 100644 index 000000000..85146b080 --- /dev/null +++ b/TrafficCapture/trafficCaptureProxyServer/src/test/java/org/opensearch/migrations/trafficcapture/proxyserver/testcontainers/annotations/ToxiproxyContainerTest.java @@ -0,0 +1,10 @@ +package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers.annotations; + +import java.lang.annotation.Inherited; +import org.junit.jupiter.api.parallel.ResourceLock; + +@Inherited +@ResourceLock("ToxiproxyContainerTestBase") +@TestContainerTest +public @interface ToxiproxyContainerTest { +} diff --git a/TrafficCapture/trafficReplayer/build.gradle b/TrafficCapture/trafficReplayer/build.gradle index 368eb1901..eda22f161 100644 --- a/TrafficCapture/trafficReplayer/build.gradle +++ b/TrafficCapture/trafficReplayer/build.gradle @@ -82,9 +82,9 @@ dependencies { testImplementation group: 'org.apache.httpcomponents.client5', name: 'httpclient5', version: '5.2.1' testImplementation group: 'org.junit.jupiter', name:'junit-jupiter-api', version:'5.x.x' - testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.0' - testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.0' - testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.0' + testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'kafka', version: '1.19.5' + testImplementation group: 'org.testcontainers', name: 'testcontainers', version: '1.19.5' testImplementation group: 'org.mockito', name:'mockito-core', version:'4.6.1' testImplementation group: 'org.mockito', name:'mockito-junit-jupiter', version:'4.6.1' testRuntimeOnly group:'org.junit.jupiter', name:'junit-jupiter-engine', version:'5.x.x' @@ -119,3 +119,10 @@ jar { attributes 'Main-Class': application.mainClass } } + +// TODO: Fix Parallel Execution for junit trafficReplayer tests +// with resourceSharing of TrafficReplayerRunner to eliminate forking and rely on threads +slowTest { + forkEvery(1) + maxParallelForks = Runtime.runtime.availableProcessors() +} diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java index a38bf273f..c674797b5 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ResultsToLogsConsumer.java @@ -19,8 +19,17 @@ public class ResultsToLogsConsumer implements BiConsumerblockingSource.readNextTrafficStreamChunk(rootContext::createReadChunkContext) - .get(10, TimeUnit.MILLISECONDS)); + .get(10000, TimeUnit.MILLISECONDS)); Assertions.assertInstanceOf(EOFException.class, exception.getCause()); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java index 74fbb8c49..b3ea65f49 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullReplayerWithTracingChecksTest.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.opensearch.migrations.replay.traffic.source.BlockingTrafficSource; @@ -41,6 +42,7 @@ protected TestContext makeInstrumentationContext() { } @Test + @ResourceLock("TrafficReplayerRunner") public void testSingleStreamWithCloseIsCommitted() throws Throwable { var random = new Random(1); var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2), @@ -57,11 +59,13 @@ public void testSingleStreamWithCloseIsCommitted() throws Throwable { () -> TestContext.withAllTracking(), trafficSourceSupplier); Assertions.assertEquals(1, trafficSourceSupplier.nextReadCursor.get()); + httpServer.close(); log.info("done"); } @ParameterizedTest @ValueSource(ints = {1,2}) + @ResourceLock("TrafficReplayerRunner") public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) throws Throwable { var random = new Random(1); var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2), @@ -108,7 +112,8 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro Assertions.assertTrue(wasNew); }); } finally { - tr.shutdown(null); + tr.shutdown(null).join(); + httpServer.close(); } Assertions.assertEquals(numRequests, tuplesReceived.size()); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java index b5bdb7e5c..5188af001 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/FullTrafficReplayerTest.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.parallel.ResourceLock; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.opensearch.migrations.replay.datatypes.ITrafficStreamKey; @@ -74,6 +75,7 @@ public Consumer get() { "-1,true", }) @Tag("longTest") + @ResourceLock("TrafficReplayerRunner") public void fullTest(int testSize, boolean randomize) throws Throwable { var random = new Random(1); var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(200), diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java index 9dd8164c7..8b6f16a19 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/KafkaRestartingTrafficReplayerTest.java @@ -10,6 +10,8 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.parallel.ResourceLock; +import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.opensearch.migrations.replay.kafka.KafkaTestUtils; import org.opensearch.migrations.replay.kafka.KafkaTrafficCaptureSource; @@ -76,7 +78,7 @@ public Consumer get() { } } - //@ParameterizedTest + @ParameterizedTest @CsvSource(value = { "3,false", "-1,false", @@ -84,6 +86,7 @@ public Consumer get() { "-1,true", }) @Tag("longTest") + @ResourceLock("TrafficReplayerRunner") public void fullTest(int testSize, boolean randomize) throws Throwable { var random = new Random(1); var httpServer = SimpleNettyHttpServer.makeServer(false, Duration.ofMillis(2), @@ -102,6 +105,7 @@ public void fullTest(int testSize, boolean randomize) throws Throwable { rootContext -> new SentinelSensingTrafficSource( new KafkaTrafficCaptureSource(rootContext, buildKafkaConsumer(), TEST_TOPIC_NAME, Duration.ofMillis(DEFAULT_POLL_INTERVAL_MS)))); + httpServer.close(); log.info("done"); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index 7f01c7d56..fe335a252 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -8,6 +8,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; import org.opensearch.migrations.replay.util.DiagnosticTrackableCompletableFuture; import org.opensearch.migrations.testutils.SimpleHttpServer; import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; @@ -30,6 +32,7 @@ class RequestSenderOrchestratorTest extends InstrumentationTest { @Test @Tag("longTest") + @Execution(ExecutionMode.SAME_THREAD) public void testThatSchedulingWorks() throws Exception { var httpServer = SimpleHttpServer.makeServer(false, r -> TestHttpServerContext.makeResponse(r, Duration.ofMillis(100))); @@ -77,6 +80,7 @@ public void testThatSchedulingWorks() throws Exception { } } closeFuture.get(); + httpServer.close(); } private List makeRequest(int i) { diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java index 3ff102293..3a294e8bc 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/ResultsToLogsConsumerTest.java @@ -3,15 +3,23 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.netty.buffer.Unpooled; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.core.LogEvent; -import org.apache.logging.log4j.core.Logger; -import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.appender.AbstractAppender; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.ResourceLock; import org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumerTest; import org.opensearch.migrations.replay.datatypes.HttpRequestTransformationStatus; import org.opensearch.migrations.replay.datatypes.PojoTrafficStreamKeyAndContext; @@ -20,15 +28,7 @@ import org.opensearch.migrations.testutils.WrapWithNettyLeakDetection; import org.opensearch.migrations.tracing.InstrumentationTest; import org.opensearch.migrations.tracing.TestContext; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.time.Duration; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Collectors; +import org.slf4j.LoggerFactory; @Slf4j @WrapWithNettyLeakDetection(repetitions = 4) @@ -43,28 +43,38 @@ protected TestContext makeInstrumentationContext() { } private static class CloseableLogSetup implements AutoCloseable { - List logEvents = new ArrayList<>(); + List logEvents = Collections.synchronizedList(new ArrayList<>()); AbstractAppender testAppender; + org.slf4j.Logger testLogger; + org.apache.logging.log4j.core.Logger internalLogger; + + final String instanceName; + public CloseableLogSetup() { - testAppender = new AbstractAppender(ResultsToLogsConsumer.OUTPUT_TUPLE_JSON_LOGGER, + instanceName = this.getClass().getName() + ".Thread" + Thread.currentThread().getId(); + + testAppender = new AbstractAppender(instanceName, null, null, false, null) { @Override public void append(LogEvent event) { logEvents.add(event.getMessage().getFormattedMessage()); } }; - var tupleLogger = (Logger) LogManager.getLogger(ResultsToLogsConsumer.OUTPUT_TUPLE_JSON_LOGGER); - tupleLogger.setLevel(Level.ALL); + testAppender.start(); - tupleLogger.setAdditive(false); - tupleLogger.addAppender(testAppender); - var loggerCtx = ((LoggerContext) LogManager.getContext(false)); + + internalLogger = (org.apache.logging.log4j.core.Logger) LogManager.getLogger(instanceName); + testLogger = LoggerFactory.getLogger(instanceName); + + // Cast to core.Logger to access internal methods + internalLogger.setLevel(Level.ALL); + internalLogger.setAdditive(false); + internalLogger.addAppender(testAppender); } @Override public void close() { - var tupleLogger = (Logger) LogManager.getLogger(ResultsToLogsConsumer.OUTPUT_TUPLE_JSON_LOGGER); - tupleLogger.removeAppender(testAppender); + internalLogger.removeAppender(testAppender); testAppender.stop(); } } @@ -80,6 +90,7 @@ public void testTupleNewWithNullKeyThrows() { } @Test + @ResourceLock("TestContext") public void testOutputterWithNulls() throws IOException { var urk = new UniqueReplayerRequestKey(PojoTrafficStreamKeyAndContext.build(NODE_ID, "c", 0, @@ -87,7 +98,8 @@ public void testOutputterWithNulls() throws IOException { var emptyTuple = new SourceTargetCaptureTuple(rootContext.getTestTupleContext(), null, null, null, null, null, null); try (var closeableLogSetup = new CloseableLogSetup()) { - var consumer = new TupleParserChainConsumer(new ResultsToLogsConsumer()); + var resultsToLogsConsumer = new ResultsToLogsConsumer(closeableLogSetup.testLogger, null); + var consumer = new TupleParserChainConsumer(resultsToLogsConsumer); consumer.accept(emptyTuple); Assertions.assertEquals(1, closeableLogSetup.logEvents.size()); var contents = closeableLogSetup.logEvents.get(0); @@ -97,13 +109,15 @@ public void testOutputterWithNulls() throws IOException { } @Test - public void testOutputterWithException() throws IOException { + @ResourceLock("TestContext") + public void testOutputterWithException() { var exception = new Exception(TEST_EXCEPTION_MESSAGE); var emptyTuple = new SourceTargetCaptureTuple(rootContext.getTestTupleContext(), null, null, null, null, exception, null); try (var closeableLogSetup = new CloseableLogSetup()) { - var consumer = new TupleParserChainConsumer(new ResultsToLogsConsumer()); + var resultsToLogsConsumer = new ResultsToLogsConsumer(closeableLogSetup.testLogger, null); + var consumer = new TupleParserChainConsumer(resultsToLogsConsumer); consumer.accept(emptyTuple); Assertions.assertEquals(1, closeableLogSetup.logEvents.size()); var contents = closeableLogSetup.logEvents.get(0); @@ -113,13 +127,14 @@ public void testOutputterWithException() throws IOException { } } - public static byte[] loadResourceAsBytes(String path) throws IOException { + private static byte[] loadResourceAsBytes(String path) throws IOException { try (InputStream inputStream = ResultsToLogsConsumerTest.class.getResourceAsStream(path)) { return inputStream.readAllBytes(); } } @Test + @ResourceLock("TestContext") public void testOutputterForGet() throws IOException { final String EXPECTED_LOGGED_OUTPUT = "" + @@ -178,6 +193,7 @@ public void testOutputterForGet() throws IOException { } @Test + @ResourceLock("TestContext") public void testOutputterForPost() throws IOException { final String EXPECTED_LOGGED_OUTPUT = "" + "{\n" + @@ -232,7 +248,7 @@ public void testOutputterForPost() throws IOException { testOutputterForRequest("post_formUrlEncoded_withFixedLength.txt", EXPECTED_LOGGED_OUTPUT); } - public void testOutputterForRequest(String requestResourceName, String expected) throws IOException { + private void testOutputterForRequest(String requestResourceName, String expected) throws IOException { var trafficStreamKey = PojoTrafficStreamKeyAndContext.build(NODE_ID, "c", 0, rootContext::createTrafficStreamContextForTest); var sourcePair = new RequestResponsePacketPair(trafficStreamKey, Instant.EPOCH, @@ -251,7 +267,7 @@ public void testOutputterForRequest(String requestResourceName, String expected) var closeableLogSetup = new CloseableLogSetup()) { var tuple = new SourceTargetCaptureTuple(tupleContext, sourcePair, targetRequest, targetResponse, HttpRequestTransformationStatus.SKIPPED, null, Duration.ofMillis(267)); - var streamConsumer = new ResultsToLogsConsumer(); + var streamConsumer = new ResultsToLogsConsumer(closeableLogSetup.testLogger, null); var consumer = new TupleParserChainConsumer(streamConsumer); consumer.accept(tuple); Assertions.assertEquals(1, closeableLogSetup.logEvents.size()); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index c79986c89..049c9b527 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -10,8 +10,11 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.internal.matchers.Same; import org.opensearch.migrations.replay.ClientConnectionPool; import org.opensearch.migrations.replay.PacketToTransformingHttpHandlerFactory; import org.opensearch.migrations.replay.ReplayEngine; diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java index 81ad5d8ef..7790f16f0 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceLongTermTest.java @@ -61,7 +61,7 @@ public void testTrafficCaptureSource() throws Exception { for (int i = 0; i < TEST_RECORD_COUNT; ) { Thread.sleep(getSleepAmountMsForProducerRun(i)); var nextChunkFuture = kafkaTrafficCaptureSource.readNextTrafficStreamChunk(rootContext::createReadChunkContext); - var recordsList = nextChunkFuture.get((2 + TEST_RECORD_COUNT) * PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS); + var recordsList = nextChunkFuture.get((2 * TEST_RECORD_COUNT) * PRODUCER_SLEEP_INTERVAL_MS, TimeUnit.MILLISECONDS); for (int j = 0; j < recordsList.size(); ++j) { Assertions.assertEquals(KafkaTestUtils.getConnectionId(i + j), recordsList.get(j).getStream().getConnectionId()); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java index e2b13fdca..521943f01 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/kafka/KafkaTrafficCaptureSourceTest.java @@ -38,6 +38,8 @@ class KafkaTrafficCaptureSourceTest extends InstrumentationTest { public static final int NUM_READ_ITEMS_BOUND = 1000; public static final String TEST_TOPIC_NAME = "TEST_TOPIC_NAME"; + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5); + @Test public void testRecordToString() { var ts = TrafficStream.newBuilder() @@ -74,7 +76,7 @@ public void testSupplyTrafficFromSource() { // were missing traffic streams. Its task currently is limited to the numTrafficStreams where it will stop the stream var tsCount = new AtomicInteger(); - Assertions.assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { + Assertions.assertTimeoutPreemptively(TEST_TIMEOUT, () -> { while (tsCount.get() < numTrafficStreams) { protobufConsumer.readNextTrafficStreamChunk(rootContext::createReadChunkContext).get().stream() .forEach(streamWithKey -> { @@ -125,7 +127,7 @@ public void testSupplyTrafficWithUnformattedMessages() { // were missing traffic streams. Its task currently is limited to the numTrafficStreams where it will stop the stream var tsCount = new AtomicInteger(); - Assertions.assertTimeoutPreemptively(Duration.ofSeconds(1), () -> { + Assertions.assertTimeoutPreemptively(TEST_TIMEOUT, () -> { while (tsCount.get() < numTrafficStreams) { protobufConsumer.readNextTrafficStreamChunk(rootContext::createReadChunkContext).get().stream() .forEach(streamWithKey->{