From 60e3410857cbae64ed46b41588406fd502f89cb0 Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Wed, 8 May 2024 12:23:16 -0500 Subject: [PATCH 1/4] Split ReadTimeoutHandler from ConnectionClosedListenerHandler Signed-off-by: Andre Kurait --- .../replay/ClientConnectionPool.java | 8 ++--- .../migrations/replay/TrafficReplayer.java | 7 +++-- .../replay/TrafficReplayerTopLevel.java | 19 +++++++----- .../NettyPacketToHttpConsumer.java | 29 ++++++++++++++----- .../replay/RequestSenderOrchestratorTest.java | 11 +++---- .../NettyPacketToHttpConsumerTest.java | 14 ++++----- .../FullReplayerWithTracingChecksTest.java | 2 +- .../e2etests/FullTrafficReplayerTest.java | 2 +- ...ficStreamBecomesTwoTargetChannelsTest.java | 5 ++-- .../e2etests/TrafficReplayerRunner.java | 4 +-- .../RootReplayerConstructorExtensions.java | 22 +++----------- 11 files changed, 61 insertions(+), 62 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java index 7c6cd2912..c7b50aac8 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ClientConnectionPool.java @@ -22,7 +22,6 @@ import org.opensearch.migrations.replay.util.TrackedFuture; import java.net.URI; -import java.time.Duration; import java.util.concurrent.CompletableFuture; @Slf4j @@ -32,7 +31,6 @@ public class ClientConnectionPool { private final SslContext sslContext; public final NioEventLoopGroup eventLoopGroup; private final LoadingCache connectionId2ChannelCache; - private final Duration timeout; @EqualsAndHashCode @AllArgsConstructor @@ -48,11 +46,9 @@ private Key getKey(String connectionId, int sessionNumber) { public ClientConnectionPool(@NonNull URI serverUri, SslContext sslContext, @NonNull String targetConnectionPoolName, - int numThreads, - @NonNull Duration timeout) { + int numThreads) { this.serverUri = serverUri; this.sslContext = sslContext; - this.timeout = timeout; this.eventLoopGroup = new NioEventLoopGroup(numThreads, new DefaultThreadFactory(targetConnectionPoolName)); @@ -80,7 +76,7 @@ public ConnectionReplaySession buildConnectionReplaySession(IReplayContexts.ICha return new AdaptiveRateLimiter() .get(() -> NettyPacketToHttpConsumer.createClientConnection(eventLoop, - sslContext, serverUri, connectionContext, timeout) + sslContext, serverUri, connectionContext) .whenComplete((v,t)-> { if (t == null) { log.atDebug().setMessage(() -> "New network connection result for " + diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 85450010d..2a86a1074 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -309,8 +309,7 @@ public static void main(String[] args) throws Exception { var tr = new TrafficReplayerTopLevel(topContext, uri, authTransformer, new TransformationLoader().getTransformerFactoryLoader(uri.getHost(), params.userAgent, transformerConfig), TrafficReplayerTopLevel.makeClientConnectionPool( - uri, params.allowInsecureConnections, params.numClientThreads, - Duration.ofSeconds(params.targetServerResponseTimeoutSeconds)), + uri, params.allowInsecureConnections, params.numClientThreads), new TrafficStreamLimiter(params.maxConcurrentRequests), orderedRequestTracker); activeContextMonitor = new ActiveContextMonitor( globalContextTracker, perContextTracker, orderedRequestTracker, 64, @@ -326,7 +325,9 @@ public static void main(String[] args) throws Exception { setupShutdownHookForReplayer(tr); var tupleWriter = new TupleParserChainConsumer(new ResultsToLogsConsumer()); var timeShifter = new TimeShifter(params.speedupFactor); - tr.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(params.observedPacketConnectionTimeout), + tr.setupRunAndWaitForReplayWithShutdownChecks( + Duration.ofSeconds(params.observedPacketConnectionTimeout), + Duration.ofSeconds(params.targetServerResponseTimeoutSeconds), blockingTrafficSource, timeShifter, tupleWriter); log.info("Done processing TrafficStreams"); } finally { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerTopLevel.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerTopLevel.java index d24dd9e34..cb78c08e2 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerTopLevel.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerTopLevel.java @@ -97,20 +97,19 @@ public TrafficReplayerTopLevel(IRootReplayerContext context, } public static ClientConnectionPool makeClientConnectionPool(URI serverUri, boolean allowInsecureConnections, - int numSendingThreads, Duration timeout) + int numSendingThreads) throws SSLException { - return makeClientConnectionPool(serverUri, allowInsecureConnections, numSendingThreads, null, timeout); + return makeClientConnectionPool(serverUri, allowInsecureConnections, numSendingThreads, null); } public static ClientConnectionPool makeClientConnectionPool(URI serverUri, boolean allowInsecureConnections, int numSendingThreads, - String connectionPoolName, - Duration timeout) throws SSLException { + String connectionPoolName) throws SSLException { return new ClientConnectionPool(serverUri, loadSslContext(serverUri, allowInsecureConnections), connectionPoolName != null ? connectionPoolName : getTargetConnectionPoolName(targetConnectionPoolUniqueCounter.getAndIncrement()), - numSendingThreads, timeout); + numSendingThreads); } public static String getTargetConnectionPoolName(int i) { @@ -130,12 +129,15 @@ public static SslContext loadSslContext(URI serverUri, boolean allowInsecureConn } public void setupRunAndWaitForReplayToFinish(Duration observedPacketConnectionTimeout, + Duration targetServerResponseTimeout, BlockingTrafficSource trafficSource, TimeShifter timeShifter, Consumer resultTupleConsumer) throws InterruptedException, ExecutionException { - var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool, NettyPacketToHttpConsumer::new); + var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool, + (replaySession, ctx) -> new NettyPacketToHttpConsumer(replaySession, ctx, + targetServerResponseTimeout)); var replayEngine = new ReplayEngine(senderOrchestrator, trafficSource, timeShifter); CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator = @@ -204,13 +206,14 @@ protected void wrapUpWorkAndEmitSummary(ReplayEngine replayEngine, } public void setupRunAndWaitForReplayWithShutdownChecks(Duration observedPacketConnectionTimeout, + Duration targetServerResponseTimeout, BlockingTrafficSource trafficSource, TimeShifter timeShifter, Consumer resultTupleConsumer) throws TrafficReplayer.TerminationException, ExecutionException, InterruptedException { try { - setupRunAndWaitForReplayToFinish(observedPacketConnectionTimeout, trafficSource, - timeShifter, resultTupleConsumer); + setupRunAndWaitForReplayToFinish(observedPacketConnectionTimeout, targetServerResponseTimeout, + trafficSource, timeShifter, resultTupleConsumer); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new TrafficReplayer.TerminationException(shutdownReasonRef.get(), e); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java index be046c885..ce0306333 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java @@ -5,6 +5,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; @@ -53,6 +54,7 @@ public class NettyPacketToHttpConsumer implements IPacketFinalizingConsumer currentRequestContextUnion; + Duration readTimeoutDuration; - private static class ConnectionClosedListenerHandler extends ReadTimeoutHandler { + private static class ConnectionClosedListenerHandler extends ChannelInboundHandlerAdapter { private final IReplayContexts.ISocketContext socketContext; - ConnectionClosedListenerHandler(IReplayContexts.IChannelKeyContext channelKeyContext, - Duration timeout) { - super(timeout.toMillis(), TimeUnit.MILLISECONDS); + ConnectionClosedListenerHandler(IReplayContexts.IChannelKeyContext channelKeyContext) { socketContext = channelKeyContext.createSocketContext(); } @Override @@ -79,16 +80,27 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { socketContext.close(); super.channelUnregistered(ctx); } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + socketContext.close(); + log.atDebug().setMessage("Exception caught in ConnectionClosedListenerHandler." + + "Closing channel due to exception").setCause(cause).log(); + ctx.close(); + ctx.fireExceptionCaught(cause); + } } public NettyPacketToHttpConsumer(ConnectionReplaySession replaySession, - IReplayContexts.IReplayerHttpTransactionContext ctx) { + IReplayContexts.IReplayerHttpTransactionContext ctx, + Duration readTimeoutDuration) { this.replaySession = replaySession; var parentContext = ctx.createTargetRequestContext(); this.setCurrentMessageContext(parentContext.createHttpSendingContext()); responseBuilder = AggregatedRawResponse.builder(Instant.now()); log.atDebug().setMessage(() -> "C'tor: incoming session=" + replaySession).log(); this.activeChannelFuture = activateLiveChannel(); + this.readTimeoutDuration = readTimeoutDuration; } private TrackedFuture activateLiveChannel() { @@ -141,8 +153,7 @@ public IReplayContexts.ITargetRequestContext getParentContext() { createClientConnection(EventLoopGroup eventLoopGroup, SslContext sslContext, URI serverUri, - IReplayContexts.IChannelKeyContext channelKeyContext, - Duration timeout) { + IReplayContexts.IChannelKeyContext channelKeyContext) { String host = serverUri.getHost(); int port = serverUri.getPort(); log.atTrace().setMessage(()->"Active - setting up backend connection to " + host + ":" + port).log(); @@ -153,7 +164,7 @@ public IReplayContexts.ITargetRequestContext getParentContext() { @Override protected void initChannel(@NonNull Channel ch) throws Exception { ch.pipeline().addFirst(CONNECTION_CLOSE_HANDLER_NAME, - new ConnectionClosedListenerHandler(channelKeyContext, timeout)); + new ConnectionClosedListenerHandler(channelKeyContext)); } }) .channel(NioSocketChannel.class) @@ -223,6 +234,8 @@ private void initializeChannelPipeline() { } getParentContext().onBytesReceived(size); })); + pipeline.addLast(READ_TIMEOUT_HANDLER_NAME, + new ReadTimeoutHandler(this.readTimeoutDuration.toMillis(), TimeUnit.MILLISECONDS)); addLoggingHandlerLast(pipeline, "B"); pipeline.addLast(new BacksideSnifferHandler(responseBuilder)); addLoggingHandlerLast(pipeline, "C"); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java index ac55a3d7d..775578d09 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/RequestSenderOrchestratorTest.java @@ -1,5 +1,7 @@ package org.opensearch.migrations.replay; +import static org.opensearch.migrations.replay.datahandlers.NettyPacketToHttpConsumerTest.REGULAR_RESPONSE_TIMEOUT; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.FullHttpResponse; @@ -90,8 +92,7 @@ public void testFutureGraphBuildout() throws Exception { final int NUM_PACKETS = 3; var clientConnectionPool = TrafficReplayerTopLevel.makeClientConnectionPool(new URI("http://localhost"), - false, 1, "testFutureGraphBuildout targetConnectionPool", - Duration.ofSeconds(30)); + false, 1, "testFutureGraphBuildout targetConnectionPool"); var connectionToConsumerMap = new HashMap(); var senderOrchestrator = new RequestSenderOrchestrator(clientConnectionPool, (s,c) -> connectionToConsumerMap.get(c.getSourceRequestIndex())); @@ -168,10 +169,10 @@ public void testThatSchedulingWorks() throws Exception { r -> TestHttpServerContext.makeResponse(r, Duration.ofMillis(100)))) { var testServerUri = httpServer.localhostEndpoint(); var clientConnectionPool = TrafficReplayerTopLevel.makeClientConnectionPool(testServerUri, false, - 1, "targetConnectionPool for testThatSchedulingWorks", - Duration.ofSeconds(30)); + 1, "targetConnectionPool for testThatSchedulingWorks"); var senderOrchestrator = - new RequestSenderOrchestrator(clientConnectionPool, NettyPacketToHttpConsumer::new); + new RequestSenderOrchestrator(clientConnectionPool, + (replaySession, ctx) -> new NettyPacketToHttpConsumer(replaySession, ctx, REGULAR_RESPONSE_TIMEOUT)); var baseTime = Instant.now(); Instant lastEndTime = baseTime; var scheduledItems = new ArrayList>(); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index b0c749f19..e6557b754 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -150,8 +150,8 @@ public void testHttpResponseIsSuccessfullyCaptured(boolean useTls, boolean large var eventLoop = new NioEventLoopGroup(1, new DefaultThreadFactory("test")).next(); var replaySession = new ConnectionReplaySession(eventLoop, channelContext, () -> NettyPacketToHttpConsumer.createClientConnection(eventLoop, sslContext, - testServer.localhostEndpoint(), channelContext, REGULAR_RESPONSE_TIMEOUT)); - var nphc = new NettyPacketToHttpConsumer(replaySession, httpContext); + testServer.localhostEndpoint(), channelContext)); + var nphc = new NettyPacketToHttpConsumer(replaySession, httpContext, REGULAR_RESPONSE_TIMEOUT); nphc.consumeBytes((EXPECTED_REQUEST_STRING).getBytes(StandardCharsets.UTF_8)); var aggregatedResponse = nphc.finalizeRequest().get(); var responseBytePackets = aggregatedResponse.getCopyOfPackets(); @@ -203,12 +203,11 @@ private void testPeerResets(boolean useTls, boolean withServerReadTimeout, var timeShifter = new TimeShifter(); timeShifter.setFirstTimestamp(Instant.now()); clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext, - "targetPool for testThatConnectionsAreKeptAliveAndShared", 1, - readTimeout); + "targetPool for testThatConnectionsAreKeptAliveAndShared", 1); var reqCtx = rootContext.getTestConnectionRequestContext(1); var nphc = new NettyPacketToHttpConsumer(clientConnectionPool - .buildConnectionReplaySession(reqCtx.getChannelKeyContext()), reqCtx); + .buildConnectionReplaySession(reqCtx.getChannelKeyContext()), reqCtx, REGULAR_RESPONSE_TIMEOUT); // purposefully send ONLY the beginning of a request nphc.consumeBytes("GET ".getBytes(StandardCharsets.UTF_8)); if (resultWaitTimeout.minus(readTimeout).isNegative()) { @@ -260,9 +259,10 @@ public void testThatConnectionsAreKeptAliveAndShared(boolean useTls, boolean lar var timeShifter = new TimeShifter(); timeShifter.setFirstTimestamp(Instant.now()); var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext, - "targetPool for testThatConnectionsAreKeptAliveAndShared", 1, REGULAR_RESPONSE_TIMEOUT); + "targetPool for testThatConnectionsAreKeptAliveAndShared", 1); var sendingFactory = new ReplayEngine( - new RequestSenderOrchestrator(clientConnectionPool, NettyPacketToHttpConsumer::new), + new RequestSenderOrchestrator(clientConnectionPool, + (replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, REGULAR_RESPONSE_TIMEOUT)), new TestFlowController(), timeShifter); for (int j = 0; j < 2; ++j) { for (int i = 0; i < 2; ++i) { diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullReplayerWithTracingChecksTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullReplayerWithTracingChecksTest.java index 2620c7218..8757bc0a3 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullReplayerWithTracingChecksTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullReplayerWithTracingChecksTest.java @@ -97,7 +97,7 @@ public void testStreamWithRequestsWithCloseIsCommittedOnce(int numRequests) thro RootReplayerConstructorExtensions.makeClientConnectionPool(serverUri, 10), 10 * 1024 ); var blockingTrafficSource = new BlockingTrafficSource(trafficSource, Duration.ofMinutes(2))) { - tr.setupRunAndWaitForReplayToFinish(Duration.ofSeconds(70), blockingTrafficSource, + tr.setupRunAndWaitForReplayToFinish(Duration.ofSeconds(70), Duration.ofSeconds(30), blockingTrafficSource, new TimeShifter(10 * 1000), t -> { var wasNew = tuplesReceived.add(t.getRequestKey().toString()); diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java index 1020d9757..e09971520 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/FullTrafficReplayerTest.java @@ -85,7 +85,7 @@ public TrafficReplayerWithWaitOnClose(Duration maxWaitTime, String targetConnectionPoolName) throws SSLException { super(context, serverUri, authTransformerFactory, jsonTransformer, TrafficReplayerTopLevel.makeClientConnectionPool(serverUri, allowInsecureConnections, numSendingThreads, - targetConnectionPoolName, Duration.ofSeconds(30)), + targetConnectionPoolName), new TrafficStreamLimiter(maxConcurrentOutstandingRequests), new OrderedWorkerTracker<>()); this.maxWaitTime = maxWaitTime; } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/SlowAndExpiredTrafficStreamBecomesTwoTargetChannelsTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/SlowAndExpiredTrafficStreamBecomesTwoTargetChannelsTest.java index 68f27b711..1cc5a55e1 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/SlowAndExpiredTrafficStreamBecomesTwoTargetChannelsTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/SlowAndExpiredTrafficStreamBecomesTwoTargetChannelsTest.java @@ -103,11 +103,10 @@ public void test() throws Exception { new StaticAuthTransformerFactory("TEST"), new TransformationLoader().getTransformerFactoryLoader("localhost"), RootReplayerConstructorExtensions.makeClientConnectionPool(httpServer.localhostEndpoint(), true, - 0, "targetConnectionPool for SlowAndExpiredTrafficStreamBecomesTwoTargetChannelsTest", - Duration.ofSeconds(30)))) { + 0, "targetConnectionPool for SlowAndExpiredTrafficStreamBecomesTwoTargetChannelsTest"))) { new Thread(()->responseTracker.onCountDownFinished(Duration.ofSeconds(10), ()->replayer.shutdown(null).join())); - replayer.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofMillis(1), + replayer.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofMillis(1), Duration.ofSeconds(30), trafficSource, new TimeShifter(TIME_SPEEDUP_FACTOR), t->{}); } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/TrafficReplayerRunner.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/TrafficReplayerRunner.java index 1e312f1e2..7b70d60f0 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/TrafficReplayerRunner.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/e2etests/TrafficReplayerRunner.java @@ -195,8 +195,8 @@ private static void runTrafficReplayer(TrafficReplayerTopLevel trafficReplayer, try (var os = new NullOutputStream(); var trafficSource = captureSourceSupplier.get(); var blockingTrafficSource = new BlockingTrafficSource(trafficSource, Duration.ofMinutes(2))) { - trafficReplayer.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(70), blockingTrafficSource, - timeShifter, tupleReceiver); + trafficReplayer.setupRunAndWaitForReplayWithShutdownChecks(Duration.ofSeconds(70), Duration.ofSeconds(30), + blockingTrafficSource, timeShifter, tupleReceiver); } } diff --git a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/RootReplayerConstructorExtensions.java b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/RootReplayerConstructorExtensions.java index 4e56a3fcc..c6445ffd0 100644 --- a/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/RootReplayerConstructorExtensions.java +++ b/TrafficCapture/trafficReplayer/src/testFixtures/java/org/opensearch/migrations/replay/RootReplayerConstructorExtensions.java @@ -7,12 +7,9 @@ import javax.net.ssl.SSLException; import java.net.URI; -import java.time.Duration; public class RootReplayerConstructorExtensions extends TrafficReplayerTopLevel { - public static final Duration RESPONSE_TIMEOUT = Duration.ofSeconds(30); - public RootReplayerConstructorExtensions(IRootReplayerContext topContext, URI uri, IAuthTransformerFactory authTransformerFactory, @@ -43,27 +40,16 @@ public RootReplayerConstructorExtensions(IRootReplayerContext context, } public static ClientConnectionPool makeClientConnectionPool(URI serverUri) throws SSLException { - return makeClientConnectionPool(serverUri, null, RESPONSE_TIMEOUT); - } - - public static ClientConnectionPool makeClientConnectionPool(URI serverUri, String poolPrefix) throws SSLException { - return makeClientConnectionPool(serverUri, poolPrefix, RESPONSE_TIMEOUT); + return makeClientConnectionPool(serverUri, null); } public static ClientConnectionPool makeClientConnectionPool(URI serverUri, - String poolPrefix, - Duration timeout) throws SSLException { - return makeClientConnectionPool(serverUri, true, 0, poolPrefix, timeout); + String poolPrefix) throws SSLException { + return makeClientConnectionPool(serverUri, true, 0, poolPrefix); } public static ClientConnectionPool makeClientConnectionPool(URI serverUri, int numSendingThreads) throws SSLException { - return makeClientConnectionPool(serverUri, numSendingThreads, RESPONSE_TIMEOUT); - } - - public static ClientConnectionPool makeClientConnectionPool(URI serverUri, - int numSendingThreads, - Duration timeout) throws SSLException { - return makeClientConnectionPool(serverUri, true, numSendingThreads, null, timeout); + return makeClientConnectionPool(serverUri, true, numSendingThreads, null); } } From 14e56229050e81cb1059cb6575d03643f8330a6a Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Wed, 8 May 2024 14:46:07 -0500 Subject: [PATCH 2/4] Fix BacksideHttpWatcherHandler to not pass exceptions downstream Signed-off-by: Andre Kurait --- .../migrations/replay/netty/BacksideHttpWatcherHandler.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/netty/BacksideHttpWatcherHandler.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/netty/BacksideHttpWatcherHandler.java index 5bfd8d932..aa180b37e 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/netty/BacksideHttpWatcherHandler.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/netty/BacksideHttpWatcherHandler.java @@ -1,12 +1,9 @@ package org.opensearch.migrations.replay.netty; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.SimpleChannelInboundHandler; -import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.util.ReferenceCountUtil; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import org.opensearch.migrations.replay.AggregatedRawResponse; @@ -54,7 +51,8 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { aggregatedRawResponseBuilder.addErrorCause(cause); triggerResponseCallbackAndRemoveCallback(); - super.exceptionCaught(ctx, cause); + // AggregatedRawResponseBuilder will contain exception context so + // Exception caught event should not to propagate downstream } private void triggerResponseCallbackAndRemoveCallback() { From 9c367d20c1c52176fadd7d3c02390333f7e6a2dc Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Wed, 8 May 2024 14:46:23 -0500 Subject: [PATCH 3/4] Fix socketContext closed multiple times in NettyPacketToHttpConsumer Signed-off-by: Andre Kurait --- .../NettyPacketToHttpConsumer.java | 13 +-- .../NettyPacketToHttpConsumerTest.java | 85 +++++++++++++++++++ 2 files changed, 92 insertions(+), 6 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java index ce0306333..06a5da490 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java @@ -75,19 +75,20 @@ private static class ConnectionClosedListenerHandler extends ChannelInboundHandl ConnectionClosedListenerHandler(IReplayContexts.IChannelKeyContext channelKeyContext) { socketContext = channelKeyContext.createSocketContext(); } + @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { socketContext.close(); - super.channelUnregistered(ctx); + super.channelInactive(ctx); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - socketContext.close(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + socketContext.addTraceException(cause, true); log.atDebug().setMessage("Exception caught in ConnectionClosedListenerHandler." + - "Closing channel due to exception").setCause(cause).log(); + "Closing channel due to exception").setCause(cause).log(); ctx.close(); - ctx.fireExceptionCaught(cause); + super.exceptionCaught(ctx, cause); } } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index e6557b754..4ca89cb44 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -7,6 +7,7 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.locks.LockSupport; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; @@ -315,6 +316,90 @@ public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean Assertions.assertEquals(2, connectionsClosedCount); } + @ParameterizedTest + @CsvSource({"false", "true"}) + @Tag("longTest") + public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception { + var testDurationBuffer = Duration.ofSeconds(10); + var responseTimeout = Duration.ofMillis(50); + // Response shouldn't come back before everything else finishes + var responseDuration = Duration.ofHours(1); + try (var testServer = SimpleNettyHttpServer.makeServer(useTls, (requestFirstLine) -> { + parkForAtLeast(responseDuration); + return NettyPacketToHttpConsumerTest.makeResponseContext(requestFirstLine); + })) { + var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null : + SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + var transformingHttpHandlerFactory = new PacketToTransformingHttpHandlerFactory( + new TransformationLoader().getTransformerFactoryLoader(null), null); + + var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext, + "targetPool for testReadTimeoutHandler_responseTakesLongerThanTimeout", 1); + + var timeShifter = new TimeShifter(); + timeShifter.setFirstTimestamp(Instant.now()); + + var sendingFactory = new ReplayEngine( + new RequestSenderOrchestrator(clientConnectionPool, + (replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)), + new TestFlowController(), timeShifter); + + var ctx = rootContext.getTestConnectionRequestContext("TEST", 0); + var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest(transformingHttpHandlerFactory, + sendingFactory, ctx, Instant.now(), Instant.now(), + () -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8))); + // Make sure finalize is triggered shortly after the response timeout + var aggregatedResponse = requestFinishFuture.get(responseTimeout.plus(testDurationBuffer)); + Assertions.assertInstanceOf(ReadTimeoutException.class, aggregatedResponse.getError()); + } + } + + @ParameterizedTest + @CsvSource({"false", "true"}) + public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) throws Exception { + var testDurationBuffer = Duration.ofSeconds(10); + var responseTimeout = Duration.ofMillis(100); + var timeBetweenRequests = responseTimeout.plus(Duration.ofMillis(10)); + try (var testServer = SimpleNettyHttpServer.makeServer(useTls, + NettyPacketToHttpConsumerTest::makeResponseContext)) { + var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null : + SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + var transformingHttpHandlerFactory = new PacketToTransformingHttpHandlerFactory( + new TransformationLoader().getTransformerFactoryLoader(null), null); + + var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext, + "targetPool for testTimeBetweenRequestsLongerThanResponseTimeout", 1); + + var timeShifter = new TimeShifter(); + timeShifter.setFirstTimestamp(Instant.now()); + var sendingFactory = new ReplayEngine( + new RequestSenderOrchestrator(clientConnectionPool, + (replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)), + new TestFlowController(), timeShifter); + for (int i = 0; i < 2; ++i) { + if (i != 0) { + parkForAtLeast(timeBetweenRequests); + } + var ctx = rootContext.getTestConnectionRequestContext("TEST", i); + var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest( + transformingHttpHandlerFactory, + sendingFactory, ctx, Instant.now(), Instant.now(), + () -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8))); + var aggregatedResponse = requestFinishFuture.get(testDurationBuffer); + Assertions.assertNull(aggregatedResponse.getError()); + var responseAsString = getResponsePacketsAsString(aggregatedResponse); + Assertions.assertEquals(EXPECTED_RESPONSE_STRING, responseAsString); + } + } + } + + private static void parkForAtLeast(Duration waitDuration) { + var responseTime = Instant.now().toEpochMilli() + waitDuration.toMillis(); + while (Instant.now().toEpochMilli() < responseTime) { + LockSupport.parkUntil(responseTime); + } + } + private class TestFlowController implements BufferedFlowController { @Override public void stopReadsPast(Instant pointInTime) {} From 2ae2959d5fb2486f1feae68f37182706f8105b5a Mon Sep 17 00:00:00 2001 From: Andre Kurait Date: Thu, 9 May 2024 00:32:16 -0500 Subject: [PATCH 4/4] Address PR Comments in NettyPacketToHttpConsumerTest Signed-off-by: Andre Kurait --- .../NettyPacketToHttpConsumerTest.java | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index 4ca89cb44..5f106787b 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -196,7 +196,7 @@ private void testPeerResets(boolean useTls, boolean withServerReadTimeout, Duration readTimeout, Duration resultWaitTimeout) throws Exception { ClientConnectionPool clientConnectionPool = null; try (var testServer = SimpleNettyHttpServer.makeServer(useTls, - withServerReadTimeout ? Duration.ofMillis(100) : null, + withServerReadTimeout ? readTimeout : null, NettyPacketToHttpConsumerTest::makeResponseContext)) { log.atError().setMessage("Got port " + testServer.port).log(); var sslContext = !useTls ? null : @@ -208,7 +208,7 @@ private void testPeerResets(boolean useTls, boolean withServerReadTimeout, var reqCtx = rootContext.getTestConnectionRequestContext(1); var nphc = new NettyPacketToHttpConsumer(clientConnectionPool - .buildConnectionReplaySession(reqCtx.getChannelKeyContext()), reqCtx, REGULAR_RESPONSE_TIMEOUT); + .buildConnectionReplaySession(reqCtx.getChannelKeyContext()), reqCtx, readTimeout); // purposefully send ONLY the beginning of a request nphc.consumeBytes("GET ".getBytes(StandardCharsets.UTF_8)); if (resultWaitTimeout.minus(readTimeout).isNegative()) { @@ -318,9 +318,7 @@ public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean @ParameterizedTest @CsvSource({"false", "true"}) - @Tag("longTest") public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception { - var testDurationBuffer = Duration.ofSeconds(10); var responseTimeout = Duration.ofMillis(50); // Response shouldn't come back before everything else finishes var responseDuration = Duration.ofHours(1); @@ -337,7 +335,9 @@ public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception "targetPool for testReadTimeoutHandler_responseTakesLongerThanTimeout", 1); var timeShifter = new TimeShifter(); - timeShifter.setFirstTimestamp(Instant.now()); + var firstRequestTime = Instant.now(); + timeShifter.setFirstTimestamp(firstRequestTime); + log.atInfo().setMessage("Initial Timestamp: " + firstRequestTime).log(); var sendingFactory = new ReplayEngine( new RequestSenderOrchestrator(clientConnectionPool, @@ -348,8 +348,9 @@ public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest(transformingHttpHandlerFactory, sendingFactory, ctx, Instant.now(), Instant.now(), () -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8))); - // Make sure finalize is triggered shortly after the response timeout - var aggregatedResponse = requestFinishFuture.get(responseTimeout.plus(testDurationBuffer)); + var maxTimeToWaitForTimeoutOrResponse = Duration.ofSeconds(10); + var aggregatedResponse = requestFinishFuture.get(maxTimeToWaitForTimeoutOrResponse); + log.atInfo().setMessage("RequestFinishFuture finished").log(); Assertions.assertInstanceOf(ReadTimeoutException.class, aggregatedResponse.getError()); } } @@ -357,9 +358,10 @@ public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception @ParameterizedTest @CsvSource({"false", "true"}) public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) throws Exception { - var testDurationBuffer = Duration.ofSeconds(10); var responseTimeout = Duration.ofMillis(100); var timeBetweenRequests = responseTimeout.plus(Duration.ofMillis(10)); + log.atInfo().setMessage("Running testTimeBetweenRequestsLongerThanResponseTimeout with responseTimeout " + + responseTimeout + " and timeBetweenRequests" + timeBetweenRequests).log(); try (var testServer = SimpleNettyHttpServer.makeServer(useTls, NettyPacketToHttpConsumerTest::makeResponseContext)) { var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null : @@ -371,24 +373,32 @@ public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) thr "targetPool for testTimeBetweenRequestsLongerThanResponseTimeout", 1); var timeShifter = new TimeShifter(); - timeShifter.setFirstTimestamp(Instant.now()); + var firstRequestTime = Instant.now(); + timeShifter.setFirstTimestamp(firstRequestTime); + log.atInfo().setMessage("Initial Timestamp: " + firstRequestTime).log(); var sendingFactory = new ReplayEngine( new RequestSenderOrchestrator(clientConnectionPool, (replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)), new TestFlowController(), timeShifter); - for (int i = 0; i < 2; ++i) { - if (i != 0) { - parkForAtLeast(timeBetweenRequests); - } + int i = 0; + while (true) { var ctx = rootContext.getTestConnectionRequestContext("TEST", i); + log.atInfo().setMessage("Starting transformAndSendRequest for request " + i).log(); var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest( transformingHttpHandlerFactory, sendingFactory, ctx, Instant.now(), Instant.now(), () -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8))); - var aggregatedResponse = requestFinishFuture.get(testDurationBuffer); + var maxTimeToWaitForTimeoutOrResponse = Duration.ofSeconds(10); + var aggregatedResponse = requestFinishFuture.get(maxTimeToWaitForTimeoutOrResponse); + log.atInfo().setMessage("RequestFinishFuture finished for request " + i).log(); Assertions.assertNull(aggregatedResponse.getError()); var responseAsString = getResponsePacketsAsString(aggregatedResponse); Assertions.assertEquals(EXPECTED_RESPONSE_STRING, responseAsString); + if (i > 1) { + break; + } + parkForAtLeast(timeBetweenRequests); + i++; } } }