Skip to content

Commit

Permalink
Merge branch 'main' into FixFlakyTests
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreKurait authored Apr 16, 2024
2 parents 85c91cd + 6fd892c commit 3bdecac
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,12 @@ class CommonUtils {
}

copyFile("jars", "/jars")
def jvmParams = "-XX:MaxRAMPercentage=80.0 -XX:+ExitOnOutOfMemoryError"
// can't set the environment variable from the runtimeClasspath because the Dockerfile is
// constructed in the configuration phase and the classpath won't be realized until the
// execution phase. Therefore, we need to have docker run the command to resolve the classpath
// and it's simplest to pack that up into a helper script.
runCommand("printf \"#!/bin/sh\\njava -XX:MaxRAMPercentage=80.0 -cp `echo /jars/*.jar | tr \\ :` \\\"\\\$@\\\" \" > /runJavaWithClasspath.sh");
runCommand("printf \"#!/bin/sh\\njava ${jvmParams} -cp `echo /jars/*.jar | tr \\ :` \\\"\\\$@\\\" \" > /runJavaWithClasspath.sh");
runCommand("chmod +x /runJavaWithClasspath.sh")
// container stay-alive
defaultCommand('tail', '-f', '/dev/null')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ public ChannelFuture getOutboundConnectionFuture(EventLoop eventLoop) {
private void logProgressAtInterval(Level logLevel, EventLoop eventLoop,
ExpiringSubstitutableItemPool<ChannelFuture, Void> channelPoolMap,
Duration frequency) {
eventLoop.schedule(() -> {
log.atLevel(logLevel).log(channelPoolMap.getStats().toString());
logProgressAtInterval(logLevel, eventLoop, channelPoolMap, frequency);
}, frequency.toMillis(), TimeUnit.MILLISECONDS);
eventLoop.scheduleAtFixedRate(() -> log.atLevel(logLevel).log(channelPoolMap.getStats().toString()),
frequency.toMillis(), frequency.toMillis(), TimeUnit.MILLISECONDS);
}

private ChannelFuture buildConnectionFuture(EventLoop eventLoop) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
import lombok.SneakyThrows;
Expand All @@ -23,15 +24,18 @@
import org.opensearch.migrations.replay.util.StringTrackableCompletableFuture;

import java.net.URI;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

@Slf4j
public class ClientConnectionPool {

private final URI serverUri;
private final SslContext sslContext;
public final NioEventLoopGroup eventLoopGroup;
private final LoadingCache<Key, ConnectionReplaySession> connectionId2ChannelCache;
private final Duration timeout;

@EqualsAndHashCode
@AllArgsConstructor
Expand All @@ -44,9 +48,14 @@ private Key getKey(String connectionId, int sessionNumber) {
return new Key(connectionId, sessionNumber);
}

public ClientConnectionPool(URI serverUri, SslContext sslContext, String targetConnectionPoolName, int numThreads) {
public ClientConnectionPool(@NonNull URI serverUri,
SslContext sslContext,
@NonNull String targetConnectionPoolName,
int numThreads,
@NonNull Duration timeout) {
this.serverUri = serverUri;
this.sslContext = sslContext;
this.timeout = timeout;
this.eventLoopGroup =
new NioEventLoopGroup(numThreads, new DefaultThreadFactory(targetConnectionPoolName));

Expand All @@ -55,7 +64,7 @@ public ClientConnectionPool(URI serverUri, SslContext sslContext, String targetC
}));
}

public ConnectionReplaySession buildConnectionReplaySession(final IReplayContexts.IChannelKeyContext channelKeyCtx) {
public ConnectionReplaySession buildConnectionReplaySession(IReplayContexts.IChannelKeyContext channelKeyCtx) {
if (eventLoopGroup.isShuttingDown()) {
throw new IllegalStateException("Event loop group is shutting down. Not creating a new session.");
}
Expand All @@ -74,7 +83,7 @@ public ConnectionReplaySession buildConnectionReplaySession(final IReplayContext
return new AdaptiveRateLimiter<String, ChannelFuture>()
.get(() -> {
var channelFuture = NettyPacketToHttpConsumer.createClientConnection(eventLoop,
sslContext, serverUri, connectionContext);
sslContext, serverUri, connectionContext, timeout);
return getCompletedChannelFutureAsCompletableFuture(connectionContext, channelFuture);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.opensearch.migrations.replay.tracing.RootReplayerContext;
import org.opensearch.migrations.replay.traffic.source.TrafficStreamLimiter;
import org.opensearch.migrations.replay.util.ActiveContextMonitor;
import org.opensearch.migrations.replay.util.OrderedWorkerTracker;
import org.opensearch.migrations.tracing.ActiveContextTracker;
Expand Down Expand Up @@ -128,6 +129,28 @@ public static class Parameters {
String useSigV4ServiceAndRegion;


@Parameter(required = false,
names = "--transformer-config",
arity = 1,
description = "Configuration of message transformers. Either as a string that identifies the " +
"transformer that should be run (with default settings) or as json to specify options " +
"as well as multiple transformers to run in sequence. " +
"For json, keys are the (simple) names of the loaded transformers and values are the " +
"configuration passed to each of the transformers.")
String transformerConfig;
@Parameter(required = false,
names = "--transformer-config-file",
arity = 1,
description = "Path to the JSON configuration file of message transformers.")
String transformerConfigFile;
@Parameter(required = false,
names = "--user-agent",
arity = 1,
description = "For HTTP requests to the target cluster, append this string (after \"; \") to" +
"the existing user-agent field or if the field wasn't present, simply use this value")
String userAgent;


@Parameter(required = false,
names = {"-i", "--input"},
arity=1,
Expand Down Expand Up @@ -162,6 +185,14 @@ public static class Parameters {
description = "Number of threads to use to send requests from.")
int numClientThreads = 0;


// https://github.com/opensearch-project/opensearch-java/blob/main/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5TransportBuilder.java#L49-L54
@Parameter(required = false,
names = {"--target-response-timeout"},
arity = 1,
description = "Seconds to wait before timing out a replayed request to the target.")
int targetServerResponseTimeoutSeconds = 30;

@Parameter(required = false,
names = {"--kafka-traffic-brokers"},
arity=1,
Expand Down Expand Up @@ -194,26 +225,6 @@ public static class Parameters {
description = "Endpoint (host:port) for the OpenTelemetry Collector to which metrics logs should be" +
"forwarded. If no value is provided, metrics will not be forwarded.")
String otelCollectorEndpoint;
@Parameter(required = false,
names = "--transformer-config",
arity = 1,
description = "Configuration of message transformers. Either as a string that identifies the " +
"transformer that should be run (with default settings) or as json to specify options " +
"as well as multiple transformers to run in sequence. " +
"For json, keys are the (simple) names of the loaded transformers and values are the " +
"configuration passed to each of the transformers.")
String transformerConfig;
@Parameter(required = false,
names = "--transformer-config-file",
arity = 1,
description = "Path to the JSON configuration file of message transformers.")
String transformerConfigFile;
@Parameter(required = false,
names = "--user-agent",
arity = 1,
description = "For HTTP requests to the target cluster, append this string (after \"; \") to" +
"the existing user-agent field or if the field wasn't present, simply use this value")
String userAgent;
}

private static Parameters parseArgs(String[] args) {
Expand Down Expand Up @@ -300,16 +311,18 @@ public static void main(String[] args) throws Exception {
var orderedRequestTracker = new OrderedWorkerTracker<Void>();
var tr = new TrafficReplayerTopLevel(topContext, uri, authTransformer,
new TransformationLoader().getTransformerFactoryLoader(uri.getHost(), params.userAgent, transformerConfig),
params.allowInsecureConnections, params.numClientThreads, params.maxConcurrentRequests,
orderedRequestTracker);
TrafficReplayerTopLevel.makeClientConnectionPool(
uri, params.allowInsecureConnections, params.numClientThreads,
Duration.ofSeconds(params.targetServerResponseTimeoutSeconds)),
new TrafficStreamLimiter(params.maxConcurrentRequests), orderedRequestTracker);
activeContextMonitor = new ActiveContextMonitor(
globalContextTracker, perContextTracker, orderedRequestTracker, 64,
cf->cf.formatAsString(TrafficReplayerTopLevel::formatWorkItem), activeContextLogger);
ActiveContextMonitor finalActiveContextMonitor = activeContextMonitor;
scheduledExecutorService.scheduleAtFixedRate(() -> {
activeContextLogger.atInfo().setMessage(() -> "Total requests outstanding: " + tr.requestWorkTracker.size()).log();
scheduledExecutorService.scheduleAtFixedRate(()->{
activeContextLogger.atInfo().setMessage(()->"Total requests outstanding: " + tr.requestWorkTracker.size()).log();
finalActiveContextMonitor.run();
},
},
ACTIVE_WORK_MONITOR_CADENCE_MS, ACTIVE_WORK_MONITOR_CADENCE_MS, TimeUnit.MILLISECONDS);

setupShutdownHookForReplayer(tr);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,79 +82,6 @@ public int size() {
private final AtomicReference<Error> shutdownReasonRef;
private final AtomicReference<CompletableFuture<Void>> shutdownFutureRef;

public TrafficReplayerTopLevel(IRootReplayerContext context,
URI serverUri,
IAuthTransformerFactory authTransformerFactory,
boolean allowInsecureConnections)
throws SSLException {
this(context, serverUri, authTransformerFactory,
new TransformationLoader().getTransformerFactoryLoader(serverUri.getHost()), allowInsecureConnections);
}

public TrafficReplayerTopLevel(IRootReplayerContext topContext,
URI uri,
IAuthTransformerFactory authTransformerFactory,
IJsonTransformer jsonTransformer,
boolean allowInsecureConnections)
throws SSLException {
this(topContext, uri, authTransformerFactory, jsonTransformer, allowInsecureConnections, 0,
1024);
}

public TrafficReplayerTopLevel(IRootReplayerContext topContext,
URI uri,
IAuthTransformerFactory authTransformer,
IJsonTransformer jsonTransformer,
boolean allowInsecureConnections,
int numClientThreads,
int maxConcurrentRequests) throws SSLException {
this(topContext, uri, authTransformer, jsonTransformer, allowInsecureConnections, numClientThreads,
maxConcurrentRequests, new ConcurrentHashMapWorkTracker<>());
}

public TrafficReplayerTopLevel(IRootReplayerContext topContext,
URI uri,
IAuthTransformerFactory authTransformer,
IJsonTransformer jsonTransformer,
boolean allowInsecureConnections,
int numClientThreads,
int maxConcurrentRequests,
IStreamableWorkTracker<Void> workTracker) throws SSLException {
this(topContext, uri, authTransformer, jsonTransformer, allowInsecureConnections,
numClientThreads, maxConcurrentRequests,
getTargetConnectionPoolName(targetConnectionPoolUniqueCounter.getAndIncrement()),
workTracker);
}

public TrafficReplayerTopLevel(IRootReplayerContext topContext,
URI uri,
IAuthTransformerFactory authTransformer,
IJsonTransformer jsonTransformer,
boolean allowInsecureConnections,
int numClientThreads,
int maxConcurrentRequests,
String connectionPoolName) throws SSLException {
this(topContext, uri, authTransformer, jsonTransformer, allowInsecureConnections,
numClientThreads, maxConcurrentRequests,
connectionPoolName,
new ConcurrentHashMapWorkTracker<>());
}

public TrafficReplayerTopLevel(IRootReplayerContext context,
URI serverUri,
IAuthTransformerFactory authTransformerFactory,
IJsonTransformer jsonTransformer,
boolean allowInsecureConnections,
int numSendingThreads,
int maxConcurrentOutstandingRequests,
String connectionPoolName,
IStreamableWorkTracker<Void> workTracker) throws SSLException {
this(context, serverUri, authTransformerFactory, jsonTransformer,
new ClientConnectionPool(serverUri,
loadSslContext(serverUri, allowInsecureConnections), connectionPoolName, numSendingThreads),
new TrafficStreamLimiter(maxConcurrentOutstandingRequests), workTracker);
}

public TrafficReplayerTopLevel(IRootReplayerContext context,
URI serverUri,
IAuthTransformerFactory authTransformerFactory,
Expand All @@ -169,7 +96,28 @@ public TrafficReplayerTopLevel(IRootReplayerContext context,
shutdownFutureRef = new AtomicReference<>();
}

private static SslContext loadSslContext(URI serverUri, boolean allowInsecureConnections) throws SSLException {
public static ClientConnectionPool makeClientConnectionPool(URI serverUri, boolean allowInsecureConnections,
int numSendingThreads, Duration timeout)
throws SSLException {
return makeClientConnectionPool(serverUri, allowInsecureConnections, numSendingThreads, null, timeout);
}

public static ClientConnectionPool makeClientConnectionPool(URI serverUri,
boolean allowInsecureConnections,
int numSendingThreads,
String connectionPoolName,
Duration timeout) throws SSLException {
return new ClientConnectionPool(serverUri, loadSslContext(serverUri, allowInsecureConnections),
connectionPoolName != null ? connectionPoolName :
getTargetConnectionPoolName(targetConnectionPoolUniqueCounter.getAndIncrement()),
numSendingThreads, timeout);
}

public static String getTargetConnectionPoolName(int i) {
return TARGET_CONNECTION_POOL_NAME + (i == 0 ? "" : Integer.toString(i));
}

public static SslContext loadSslContext(URI serverUri, boolean allowInsecureConnections) throws SSLException {
if (serverUri.getScheme().equalsIgnoreCase("https")) {
var sslContextBuilder = SslContextBuilder.forClient();
if (allowInsecureConnections) {
Expand All @@ -181,10 +129,6 @@ private static SslContext loadSslContext(URI serverUri, boolean allowInsecureCon
}
}

private static String getTargetConnectionPoolName(int i) {
return TARGET_CONNECTION_POOL_NAME + (i == 0 ? "" : Integer.toString(i));
}

public void setupRunAndWaitForReplayToFinish(Duration observedPacketConnectionTimeout,
BlockingTrafficSource trafficSource,
TimeShifter timeShifter,
Expand Down
Loading

0 comments on commit 3bdecac

Please sign in to comment.