diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java index ce0306333..06a5da490 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java @@ -75,19 +75,20 @@ private static class ConnectionClosedListenerHandler extends ChannelInboundHandl ConnectionClosedListenerHandler(IReplayContexts.IChannelKeyContext channelKeyContext) { socketContext = channelKeyContext.createSocketContext(); } + @Override - public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { socketContext.close(); - super.channelUnregistered(ctx); + super.channelInactive(ctx); } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - socketContext.close(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + socketContext.addTraceException(cause, true); log.atDebug().setMessage("Exception caught in ConnectionClosedListenerHandler." + - "Closing channel due to exception").setCause(cause).log(); + "Closing channel due to exception").setCause(cause).log(); ctx.close(); - ctx.fireExceptionCaught(cause); + super.exceptionCaught(ctx, cause); } } diff --git a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java index e6557b754..4ca89cb44 100644 --- a/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java +++ b/TrafficCapture/trafficReplayer/src/test/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumerTest.java @@ -7,6 +7,7 @@ import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.locks.LockSupport; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; @@ -315,6 +316,90 @@ public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean Assertions.assertEquals(2, connectionsClosedCount); } + @ParameterizedTest + @CsvSource({"false", "true"}) + @Tag("longTest") + public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception { + var testDurationBuffer = Duration.ofSeconds(10); + var responseTimeout = Duration.ofMillis(50); + // Response shouldn't come back before everything else finishes + var responseDuration = Duration.ofHours(1); + try (var testServer = SimpleNettyHttpServer.makeServer(useTls, (requestFirstLine) -> { + parkForAtLeast(responseDuration); + return NettyPacketToHttpConsumerTest.makeResponseContext(requestFirstLine); + })) { + var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null : + SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + var transformingHttpHandlerFactory = new PacketToTransformingHttpHandlerFactory( + new TransformationLoader().getTransformerFactoryLoader(null), null); + + var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext, + "targetPool for testReadTimeoutHandler_responseTakesLongerThanTimeout", 1); + + var timeShifter = new TimeShifter(); + timeShifter.setFirstTimestamp(Instant.now()); + + var sendingFactory = new ReplayEngine( + new RequestSenderOrchestrator(clientConnectionPool, + (replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)), + new TestFlowController(), timeShifter); + + var ctx = rootContext.getTestConnectionRequestContext("TEST", 0); + var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest(transformingHttpHandlerFactory, + sendingFactory, ctx, Instant.now(), Instant.now(), + () -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8))); + // Make sure finalize is triggered shortly after the response timeout + var aggregatedResponse = requestFinishFuture.get(responseTimeout.plus(testDurationBuffer)); + Assertions.assertInstanceOf(ReadTimeoutException.class, aggregatedResponse.getError()); + } + } + + @ParameterizedTest + @CsvSource({"false", "true"}) + public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) throws Exception { + var testDurationBuffer = Duration.ofSeconds(10); + var responseTimeout = Duration.ofMillis(100); + var timeBetweenRequests = responseTimeout.plus(Duration.ofMillis(10)); + try (var testServer = SimpleNettyHttpServer.makeServer(useTls, + NettyPacketToHttpConsumerTest::makeResponseContext)) { + var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null : + SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build(); + var transformingHttpHandlerFactory = new PacketToTransformingHttpHandlerFactory( + new TransformationLoader().getTransformerFactoryLoader(null), null); + + var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext, + "targetPool for testTimeBetweenRequestsLongerThanResponseTimeout", 1); + + var timeShifter = new TimeShifter(); + timeShifter.setFirstTimestamp(Instant.now()); + var sendingFactory = new ReplayEngine( + new RequestSenderOrchestrator(clientConnectionPool, + (replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)), + new TestFlowController(), timeShifter); + for (int i = 0; i < 2; ++i) { + if (i != 0) { + parkForAtLeast(timeBetweenRequests); + } + var ctx = rootContext.getTestConnectionRequestContext("TEST", i); + var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest( + transformingHttpHandlerFactory, + sendingFactory, ctx, Instant.now(), Instant.now(), + () -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8))); + var aggregatedResponse = requestFinishFuture.get(testDurationBuffer); + Assertions.assertNull(aggregatedResponse.getError()); + var responseAsString = getResponsePacketsAsString(aggregatedResponse); + Assertions.assertEquals(EXPECTED_RESPONSE_STRING, responseAsString); + } + } + } + + private static void parkForAtLeast(Duration waitDuration) { + var responseTime = Instant.now().toEpochMilli() + waitDuration.toMillis(); + while (Instant.now().toEpochMilli() < responseTime) { + LockSupport.parkUntil(responseTime); + } + } + private class TestFlowController implements BufferedFlowController { @Override public void stopReadsPast(Instant pointInTime) {}