Skip to content

Commit

Permalink
Fix socketContext closed multiple times in NettyPacketToHttpConsumer
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <akurait@amazon.com>
  • Loading branch information
AndreKurait committed May 9, 2024
1 parent 14e5622 commit 9c367d2
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,20 @@ private static class ConnectionClosedListenerHandler extends ChannelInboundHandl
ConnectionClosedListenerHandler(IReplayContexts.IChannelKeyContext channelKeyContext) {
socketContext = channelKeyContext.createSocketContext();
}

@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
socketContext.close();
super.channelUnregistered(ctx);
super.channelInactive(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
socketContext.close();
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
socketContext.addTraceException(cause, true);
log.atDebug().setMessage("Exception caught in ConnectionClosedListenerHandler." +
"Closing channel due to exception").setCause(cause).log();
"Closing channel due to exception").setCause(cause).log();
ctx.close();
ctx.fireExceptionCaught(cause);
super.exceptionCaught(ctx, cause);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.locks.LockSupport;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -315,6 +316,90 @@ public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean
Assertions.assertEquals(2, connectionsClosedCount);
}

@ParameterizedTest
@CsvSource({"false", "true"})
@Tag("longTest")
public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception {
var testDurationBuffer = Duration.ofSeconds(10);
var responseTimeout = Duration.ofMillis(50);
// Response shouldn't come back before everything else finishes
var responseDuration = Duration.ofHours(1);
try (var testServer = SimpleNettyHttpServer.makeServer(useTls, (requestFirstLine) -> {
parkForAtLeast(responseDuration);
return NettyPacketToHttpConsumerTest.makeResponseContext(requestFirstLine);
})) {
var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null :
SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
var transformingHttpHandlerFactory = new PacketToTransformingHttpHandlerFactory(
new TransformationLoader().getTransformerFactoryLoader(null), null);

var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext,
"targetPool for testReadTimeoutHandler_responseTakesLongerThanTimeout", 1);

var timeShifter = new TimeShifter();
timeShifter.setFirstTimestamp(Instant.now());

var sendingFactory = new ReplayEngine(
new RequestSenderOrchestrator(clientConnectionPool,
(replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)),
new TestFlowController(), timeShifter);

var ctx = rootContext.getTestConnectionRequestContext("TEST", 0);
var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest(transformingHttpHandlerFactory,
sendingFactory, ctx, Instant.now(), Instant.now(),
() -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8)));
// Make sure finalize is triggered shortly after the response timeout
var aggregatedResponse = requestFinishFuture.get(responseTimeout.plus(testDurationBuffer));
Assertions.assertInstanceOf(ReadTimeoutException.class, aggregatedResponse.getError());
}
}

@ParameterizedTest
@CsvSource({"false", "true"})
public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) throws Exception {
var testDurationBuffer = Duration.ofSeconds(10);
var responseTimeout = Duration.ofMillis(100);
var timeBetweenRequests = responseTimeout.plus(Duration.ofMillis(10));
try (var testServer = SimpleNettyHttpServer.makeServer(useTls,
NettyPacketToHttpConsumerTest::makeResponseContext)) {
var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null :
SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
var transformingHttpHandlerFactory = new PacketToTransformingHttpHandlerFactory(
new TransformationLoader().getTransformerFactoryLoader(null), null);

var clientConnectionPool = new ClientConnectionPool(testServer.localhostEndpoint(), sslContext,
"targetPool for testTimeBetweenRequestsLongerThanResponseTimeout", 1);

var timeShifter = new TimeShifter();
timeShifter.setFirstTimestamp(Instant.now());
var sendingFactory = new ReplayEngine(
new RequestSenderOrchestrator(clientConnectionPool,
(replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)),
new TestFlowController(), timeShifter);
for (int i = 0; i < 2; ++i) {
if (i != 0) {
parkForAtLeast(timeBetweenRequests);
}
var ctx = rootContext.getTestConnectionRequestContext("TEST", i);
var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest(
transformingHttpHandlerFactory,
sendingFactory, ctx, Instant.now(), Instant.now(),
() -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8)));
var aggregatedResponse = requestFinishFuture.get(testDurationBuffer);
Assertions.assertNull(aggregatedResponse.getError());
var responseAsString = getResponsePacketsAsString(aggregatedResponse);
Assertions.assertEquals(EXPECTED_RESPONSE_STRING, responseAsString);
}
}
}

private static void parkForAtLeast(Duration waitDuration) {
var responseTime = Instant.now().toEpochMilli() + waitDuration.toMillis();
while (Instant.now().toEpochMilli() < responseTime) {
LockSupport.parkUntil(responseTime);
}
}

private class TestFlowController implements BufferedFlowController {
@Override
public void stopReadsPast(Instant pointInTime) {}
Expand Down

0 comments on commit 9c367d2

Please sign in to comment.