From b89109854843e7d9e8a241e26978e6c845e6491c Mon Sep 17 00:00:00 2001 From: Peter Nied Date: Fri, 6 Oct 2023 21:20:02 +0000 Subject: [PATCH] Sharable channel handlers Signed-off-by: Peter Nied --- .../security/ResourceFocusedTests.java | 44 ++++++++++++++----- .../SecurityNonSslHttpServerTransport.java | 18 ++++---- .../netty/Netty4ConditionalDecompressor.java | 2 + .../Netty4HttpRequestHeaderVerifier.java | 2 + .../SecuritySSLNettyHttpServerTransport.java | 10 +++-- 5 files changed, 52 insertions(+), 24 deletions(-) diff --git a/src/integrationTest/java/org/opensearch/security/ResourceFocusedTests.java b/src/integrationTest/java/org/opensearch/security/ResourceFocusedTests.java index 0826e3a5e4..2ab19d15ce 100644 --- a/src/integrationTest/java/org/opensearch/security/ResourceFocusedTests.java +++ b/src/integrationTest/java/org/opensearch/security/ResourceFocusedTests.java @@ -25,7 +25,6 @@ import org.apache.hc.core5.http.ContentType; import org.apache.hc.core5.http.io.entity.ByteArrayEntity; import org.apache.hc.core5.http.message.BasicHeader; -import org.apache.http.HttpHeaders; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -113,19 +112,28 @@ public void testUnauthenticatedTonsSmall() { runResourceTest(size, requestPath, parrallelism, totalNumberOfRequests, statsPrinter); } - private Long runResourceTest(final RequestBodySize size, final String requestPath, final int parrallelism, final int totalNumberOfRequests, final boolean statsPrinter) { + private Long runResourceTest( + final RequestBodySize size, + final String requestPath, + final int parrallelism, + final int totalNumberOfRequests, + final boolean statsPrinter + ) { final byte[] compressedRequestBody = createCompressedRequestBody(size); try (final TestRestClient client = cluster.getRestClient(new BasicHeader("Content-Encoding", "gzip"))) { - if (statsPrinter) { printStats(); } + if (statsPrinter) { + printStats(); + } final HttpPost post = new HttpPost(client.getHttpServerUri() + requestPath); post.setEntity(new ByteArrayEntity(compressedRequestBody, ContentType.APPLICATION_JSON)); final ForkJoinPool forkJoinPool = new ForkJoinPool(parrallelism); - final List> waitingOn = IntStream.rangeClosed(1, totalNumberOfRequests).boxed().map( i -> - CompletableFuture.runAsync(() -> client.executeRequest(post), forkJoinPool) - ).collect(Collectors.toList()); + final List> waitingOn = IntStream.rangeClosed(1, totalNumberOfRequests) + .boxed() + .map(i -> CompletableFuture.runAsync(() -> client.executeRequest(post), forkJoinPool)) + .collect(Collectors.toList()); Supplier getCount = () -> waitingOn.stream().filter(cf -> cf.isDone() && !cf.isCompletedExceptionally()).count(); CompletableFuture statPrinter = statsPrinter ? CompletableFuture.runAsync(() -> { @@ -140,7 +148,6 @@ private Long runResourceTest(final RequestBodySize size, final String requestPat } }, forkJoinPool) : CompletableFuture.completedFuture(null); - final CompletableFuture allOfThem = CompletableFuture.allOf(waitingOn.toArray(new CompletableFuture[0])); try { @@ -162,7 +169,9 @@ static enum RequestBodySize { Small(1), Medium(1_000), XLarge(1_000_000); + public final int elementCount; + private RequestBodySize(final int elementCount) { this.elementCount = elementCount; } @@ -172,14 +181,16 @@ private byte[] createCompressedRequestBody(final RequestBodySize size) { final int repeatCount = size.elementCount; final String prefix = "{ \"items\": ["; final String repeatedElement = IntStream.range(0, 20) - .mapToObj(n -> ('a' + n)+"") + .mapToObj(n -> ('a' + n) + "") .map(n -> '"' + n + '"' + ": 123") .collect(Collectors.joining(",", "{", "}")); final String postfix = "]}"; long uncompressedBytesSize = 0; - try (final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream)) { + try ( + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream) + ) { final byte[] prefixBytes = prefix.getBytes(StandardCharsets.UTF_8); final byte[] repeatedElementBytes = repeatedElement.getBytes(StandardCharsets.UTF_8); @@ -196,7 +207,15 @@ private byte[] createCompressedRequestBody(final RequestBodySize size) { gzipOutputStream.finish(); final byte[] compressedRequestBody = byteArrayOutputStream.toByteArray(); - System.out.println("^^^" + String.format("Original size was %,d bytes, compressed to %,d bytes, ratio %,.2f", uncompressedBytesSize, compressedRequestBody.length, ((double)uncompressedBytesSize / compressedRequestBody.length))); + System.out.println( + "^^^" + + String.format( + "Original size was %,d bytes, compressed to %,d bytes, ratio %,.2f", + uncompressedBytesSize, + compressedRequestBody.length, + ((double) uncompressedBytesSize / compressedRequestBody.length) + ) + ); return compressedRequestBody; } catch (final IOException ioe) { throw new RuntimeException(ioe); @@ -227,6 +246,7 @@ private void printMemoryPools() { System.out.println(" " + memoryPool.getName() + " USED: " + usage.getUsed() + " MAX: " + usage.getMax()); } } + private void printGCPools() { List garbageCollectors = ManagementFactory.getGarbageCollectorMXBeans(); for (GarbageCollectorMXBean garbageCollector : garbageCollectors) { @@ -234,4 +254,4 @@ private void printGCPools() { } } -} \ No newline at end of file +} diff --git a/src/main/java/org/opensearch/security/http/SecurityNonSslHttpServerTransport.java b/src/main/java/org/opensearch/security/http/SecurityNonSslHttpServerTransport.java index b00e73a7b3..71586a2dff 100644 --- a/src/main/java/org/opensearch/security/http/SecurityNonSslHttpServerTransport.java +++ b/src/main/java/org/opensearch/security/http/SecurityNonSslHttpServerTransport.java @@ -46,7 +46,8 @@ public class SecurityNonSslHttpServerTransport extends Netty4HttpServerTransport { - private SecurityRestFilter restFilter; + private final ChannelInboundHandlerAdapter headerVerifier; + private final ChannelInboundHandlerAdapter conditionalDecompressor; public SecurityNonSslHttpServerTransport( final Settings settings, @@ -55,10 +56,10 @@ public SecurityNonSslHttpServerTransport( final ThreadPool threadPool, final NamedXContentRegistry namedXContentRegistry, final Dispatcher dispatcher, - ClusterSettings clusterSettings, - SharedGroupFactory sharedGroupFactory, - Tracer tracer, - SecurityRestFilter restFilter + final ClusterSettings clusterSettings, + final SharedGroupFactory sharedGroupFactory, + final Tracer tracer, + final SecurityRestFilter restFilter ) { super( settings, @@ -71,7 +72,8 @@ public SecurityNonSslHttpServerTransport( sharedGroupFactory, tracer ); - this.restFilter = restFilter; + headerVerifier = new Netty4HttpRequestHeaderVerifier(restFilter, threadPool, settings); + conditionalDecompressor = new Netty4ConditionalDecompressor(); } @Override @@ -93,11 +95,11 @@ protected void initChannel(Channel ch) throws Exception { @Override protected ChannelInboundHandlerAdapter createHeaderVerifier() { - return new Netty4HttpRequestHeaderVerifier(restFilter, threadPool, settings); + return headerVerifier; } @Override protected ChannelInboundHandlerAdapter createDecompressor() { - return new Netty4ConditionalDecompressor(); + return conditionalDecompressor; } } diff --git a/src/main/java/org/opensearch/security/ssl/http/netty/Netty4ConditionalDecompressor.java b/src/main/java/org/opensearch/security/ssl/http/netty/Netty4ConditionalDecompressor.java index 1eec49add0..c8059fad5d 100644 --- a/src/main/java/org/opensearch/security/ssl/http/netty/Netty4ConditionalDecompressor.java +++ b/src/main/java/org/opensearch/security/ssl/http/netty/Netty4ConditionalDecompressor.java @@ -8,6 +8,7 @@ package org.opensearch.security.ssl.http.netty; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.http.HttpContentDecompressor; @@ -16,6 +17,7 @@ import org.opensearch.security.filter.NettyAttribute; +@Sharable public class Netty4ConditionalDecompressor extends HttpContentDecompressor { @Override diff --git a/src/main/java/org/opensearch/security/ssl/http/netty/Netty4HttpRequestHeaderVerifier.java b/src/main/java/org/opensearch/security/ssl/http/netty/Netty4HttpRequestHeaderVerifier.java index 2563e0ff9b..b7c0aedebd 100644 --- a/src/main/java/org/opensearch/security/ssl/http/netty/Netty4HttpRequestHeaderVerifier.java +++ b/src/main/java/org/opensearch/security/ssl/http/netty/Netty4HttpRequestHeaderVerifier.java @@ -17,6 +17,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.common.util.concurrent.ThreadContext; +import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import org.opensearch.http.netty4.Netty4HttpChannel; import org.opensearch.security.filter.SecurityRequestChannel; @@ -42,6 +43,7 @@ import static org.opensearch.security.http.SecurityHttpServerTransport.SHOULD_DECOMPRESS; import static org.opensearch.security.http.SecurityHttpServerTransport.IS_AUTHENTICATED; +@Sharable public class Netty4HttpRequestHeaderVerifier extends SimpleChannelInboundHandler { private final SecurityRestFilter restFilter; private final ThreadPool threadPool; diff --git a/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java b/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java index 8ed3bdd59f..41e44ce371 100644 --- a/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java +++ b/src/main/java/org/opensearch/security/ssl/http/netty/SecuritySSLNettyHttpServerTransport.java @@ -48,7 +48,8 @@ public class SecuritySSLNettyHttpServerTransport extends Netty4HttpServerTranspo private static final Logger logger = LogManager.getLogger(SecuritySSLNettyHttpServerTransport.class); private final SecurityKeyStore sks; private final SslExceptionHandler errorHandler; - private final SecurityRestFilter restFilter; + private final ChannelInboundHandlerAdapter headerVerifier; + private final ChannelInboundHandlerAdapter conditionalDecompressor; public SecuritySSLNettyHttpServerTransport( final Settings settings, @@ -77,7 +78,8 @@ public SecuritySSLNettyHttpServerTransport( ); this.sks = sks; this.errorHandler = errorHandler; - this.restFilter = restFilter; + headerVerifier = new Netty4HttpRequestHeaderVerifier(restFilter, threadPool, settings); + conditionalDecompressor = new Netty4ConditionalDecompressor(); } @Override @@ -157,11 +159,11 @@ protected void configurePipeline(Channel ch) { @Override protected ChannelInboundHandlerAdapter createHeaderVerifier() { - return new Netty4HttpRequestHeaderVerifier(restFilter, threadPool, settings); + return headerVerifier; } @Override protected ChannelInboundHandlerAdapter createDecompressor() { - return new Netty4ConditionalDecompressor(); + return conditionalDecompressor; } }