Skip to content

Commit

Permalink
Address PR Comments in NettyPacketToHttpConsumerTest
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <akurait@amazon.com>
  • Loading branch information
AndreKurait committed May 9, 2024
1 parent 9c367d2 commit 2ae2959
Showing 1 changed file with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ private void testPeerResets(boolean useTls, boolean withServerReadTimeout,
Duration readTimeout, Duration resultWaitTimeout) throws Exception {
ClientConnectionPool clientConnectionPool = null;
try (var testServer = SimpleNettyHttpServer.makeServer(useTls,
withServerReadTimeout ? Duration.ofMillis(100) : null,
withServerReadTimeout ? readTimeout : null,
NettyPacketToHttpConsumerTest::makeResponseContext)) {
log.atError().setMessage("Got port " + testServer.port).log();
var sslContext = !useTls ? null :
Expand All @@ -208,7 +208,7 @@ private void testPeerResets(boolean useTls, boolean withServerReadTimeout,

var reqCtx = rootContext.getTestConnectionRequestContext(1);
var nphc = new NettyPacketToHttpConsumer(clientConnectionPool
.buildConnectionReplaySession(reqCtx.getChannelKeyContext()), reqCtx, REGULAR_RESPONSE_TIMEOUT);
.buildConnectionReplaySession(reqCtx.getChannelKeyContext()), reqCtx, readTimeout);
// purposefully send ONLY the beginning of a request
nphc.consumeBytes("GET ".getBytes(StandardCharsets.UTF_8));
if (resultWaitTimeout.minus(readTimeout).isNegative()) {
Expand Down Expand Up @@ -318,9 +318,7 @@ public void testMetricCountsFor_testThatConnectionsAreKeptAliveAndShared(boolean

@ParameterizedTest
@CsvSource({"false", "true"})
@Tag("longTest")
public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception {
var testDurationBuffer = Duration.ofSeconds(10);
var responseTimeout = Duration.ofMillis(50);
// Response shouldn't come back before everything else finishes
var responseDuration = Duration.ofHours(1);
Expand All @@ -337,7 +335,9 @@ public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception
"targetPool for testReadTimeoutHandler_responseTakesLongerThanTimeout", 1);

var timeShifter = new TimeShifter();
timeShifter.setFirstTimestamp(Instant.now());
var firstRequestTime = Instant.now();
timeShifter.setFirstTimestamp(firstRequestTime);
log.atInfo().setMessage("Initial Timestamp: " + firstRequestTime).log();

var sendingFactory = new ReplayEngine(
new RequestSenderOrchestrator(clientConnectionPool,
Expand All @@ -348,18 +348,20 @@ public void testResponseTakesLongerThanTimeout(boolean useTls) throws Exception
var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest(transformingHttpHandlerFactory,
sendingFactory, ctx, Instant.now(), Instant.now(),
() -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8)));
// Make sure finalize is triggered shortly after the response timeout
var aggregatedResponse = requestFinishFuture.get(responseTimeout.plus(testDurationBuffer));
var maxTimeToWaitForTimeoutOrResponse = Duration.ofSeconds(10);
var aggregatedResponse = requestFinishFuture.get(maxTimeToWaitForTimeoutOrResponse);
log.atInfo().setMessage("RequestFinishFuture finished").log();
Assertions.assertInstanceOf(ReadTimeoutException.class, aggregatedResponse.getError());
}
}

@ParameterizedTest
@CsvSource({"false", "true"})
public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) throws Exception {
var testDurationBuffer = Duration.ofSeconds(10);
var responseTimeout = Duration.ofMillis(100);
var timeBetweenRequests = responseTimeout.plus(Duration.ofMillis(10));
log.atInfo().setMessage("Running testTimeBetweenRequestsLongerThanResponseTimeout with responseTimeout " +
responseTimeout + " and timeBetweenRequests" + timeBetweenRequests).log();
try (var testServer = SimpleNettyHttpServer.makeServer(useTls,
NettyPacketToHttpConsumerTest::makeResponseContext)) {
var sslContext = !testServer.localhostEndpoint().getScheme().equalsIgnoreCase("https") ? null :
Expand All @@ -371,24 +373,32 @@ public void testTimeBetweenRequestsLongerThanResponseTimeout(boolean useTls) thr
"targetPool for testTimeBetweenRequestsLongerThanResponseTimeout", 1);

var timeShifter = new TimeShifter();
timeShifter.setFirstTimestamp(Instant.now());
var firstRequestTime = Instant.now();
timeShifter.setFirstTimestamp(firstRequestTime);
log.atInfo().setMessage("Initial Timestamp: " + firstRequestTime).log();
var sendingFactory = new ReplayEngine(
new RequestSenderOrchestrator(clientConnectionPool,
(replaySession, ctx1) -> new NettyPacketToHttpConsumer(replaySession, ctx1, responseTimeout)),
new TestFlowController(), timeShifter);
for (int i = 0; i < 2; ++i) {
if (i != 0) {
parkForAtLeast(timeBetweenRequests);
}
int i = 0;
while (true) {
var ctx = rootContext.getTestConnectionRequestContext("TEST", i);
log.atInfo().setMessage("Starting transformAndSendRequest for request " + i).log();
var requestFinishFuture = TrafficReplayerTopLevel.transformAndSendRequest(
transformingHttpHandlerFactory,
sendingFactory, ctx, Instant.now(), Instant.now(),
() -> Stream.of(EXPECTED_REQUEST_STRING.getBytes(StandardCharsets.UTF_8)));
var aggregatedResponse = requestFinishFuture.get(testDurationBuffer);
var maxTimeToWaitForTimeoutOrResponse = Duration.ofSeconds(10);
var aggregatedResponse = requestFinishFuture.get(maxTimeToWaitForTimeoutOrResponse);
log.atInfo().setMessage("RequestFinishFuture finished for request " + i).log();
Assertions.assertNull(aggregatedResponse.getError());
var responseAsString = getResponsePacketsAsString(aggregatedResponse);
Assertions.assertEquals(EXPECTED_RESPONSE_STRING, responseAsString);
if (i > 1) {
break;
}
parkForAtLeast(timeBetweenRequests);
i++;
}
}
}
Expand Down

0 comments on commit 2ae2959

Please sign in to comment.