Replayer instrumentation #475

Merged 100 commits from ReplayerInstrumentation into main on Feb 9, 2024
Changes from 92 commits

Commits
a4caca7
Move addMetricsIfPresent into the metrics builder as a first class me…
gregschohn Nov 16, 2023
c026588
WIP to play with OpenTelemetry metric instruments and tracer spans.
gregschohn Nov 17, 2023
f3c0077
Get gradle files and docker-compose in order to support otlp exports …
gregschohn Nov 27, 2023
7fb8e2e
WIP
gregschohn Nov 27, 2023
a8ae3d1
Restore the docker-compose single-node/multi-node split docker-compos…
gregschohn Nov 28, 2023
da9d36b
Add labels to each metric instrument so that multiple values can be p…
gregschohn Nov 28, 2023
06618ca
Move the MetricsClosure into its own class and stop stuffing the metr…
gregschohn Nov 28, 2023
aba1aab
WIP - Cleanup + get Jaeger to work by switching the endpoint. Also i…
gregschohn Nov 28, 2023
900bc6d
Start moving away from ThreadLocal and 'current contexts' and toward …
gregschohn Nov 29, 2023
3746a8e
Get span parenting to work.
gregschohn Nov 30, 2023
e0e7bf1
Merge branch 'main' into DoNotMerge_MoreMetrics
gregschohn Nov 30, 2023
4b43262
Attempt to fix a failing unit test.
gregschohn Nov 30, 2023
322e12f
Refactor. Couple name changes, class package changes, and moved IRep…
gregschohn Nov 30, 2023
723bf77
Bundle all of the offloader spans with the netty handler spans.
gregschohn Dec 1, 2023
15a1705
Improve the tracing story for the capture proxy.
gregschohn Dec 2, 2023
8a6f52a
Tracing change: Flatten the flush span and just record it as 'blocked'.
gregschohn Dec 2, 2023
c50e01d
Minor cleanup - stop setting the namespace or trying to change in a p…
gregschohn Dec 4, 2023
17c517d
Start instrumenting the replayer with more contexts so that traces an…
gregschohn Dec 4, 2023
6288844
Double down on using Context objects in lieu of String labels and fix…
gregschohn Dec 11, 2023
09e849c
Merge branch 'FixKafkaResume' into OtelMetricsAndTraces
gregschohn Dec 11, 2023
9cf2540
Merge branch 'main' into OtelMetricsAndTraces
gregschohn Dec 12, 2023
c14da6a
Update the Http Logging Handler to suppress response packet captures …
gregschohn Dec 12, 2023
7de4009
File rename since the LoggingHttpRequest handler now handles both req…
gregschohn Dec 12, 2023
a5bfc7d
Shuffling lots of details of Contexts and the relationships between d…
gregschohn Dec 15, 2023
3d60106
Lots of refactoring to get a couple more test cases to pass.
gregschohn Dec 15, 2023
ad6bd13
Test fixes
gregschohn Dec 15, 2023
07ae016
Begin to cleanup endspans for some of the contexts. Lots of bugs rem…
gregschohn Dec 16, 2023
ccb517c
More work to get context chains to work better together.
gregschohn Dec 17, 2023
8bda2e3
Two critical bugfixes around handling close observations that were di…
gregschohn Dec 17, 2023
d9df3fa
Fix some test code where the nodeId and connectionId got reversed, ca…
gregschohn Dec 18, 2023
ab3dfb4
Extra guards to try to make tests more reliable, but one of the FullT…
gregschohn Dec 19, 2023
273c5aa
More test fixes, including fixing a regression that I had caused in a…
gregschohn Dec 19, 2023
e0167f5
Fix a race condition with commitKafkaKey.
gregschohn Dec 19, 2023
3d81ad8
Two changes to kafka interactions. Add trace spans for traffic sourc…
gregschohn Dec 20, 2023
ae45a6a
Extract an IInstrumentationAttributes interface from IScopedInstrumen…
gregschohn Dec 20, 2023
195d0ba
Checkpoint/WIP - More spans across the board, specifically through th…
gregschohn Dec 21, 2023
bef9944
Test bugfix. toString() wasn't threadsafe. Now it is.
gregschohn Dec 22, 2023
8b50c89
Refactoring and code consolidation around context management.
gregschohn Dec 22, 2023
0e5fe09
Refactoring. Which classes emit metrics, all scopes have names, and …
gregschohn Dec 22, 2023
613a504
Refactor TestContext so that it doesn't use statics and allows caller…
gregschohn Dec 23, 2023
7df0dcc
Remove a hardcoded path to my local directory
gregschohn Dec 23, 2023
de0d482
Fixed bugs in trace management and forced a lot more test code to tak…
gregschohn Dec 23, 2023
72b1ca8
Add another scheduled span before the request is sent.
gregschohn Dec 23, 2023
6043eed
Move all span names into virtual interface functions so that they can…
gregschohn Dec 26, 2023
37b99eb
Pass more contexts, make contexts able to express more metrics, and e…
gregschohn Jan 2, 2024
6684486
Minor bugfixes that make a huge difference. Fix a broken unit test a…
gregschohn Jan 3, 2024
7bf4388
Some refactoring to increase the typesafety and to support greater co…
gregschohn Jan 3, 2024
8ef0376
Fix mend security issue for json-path CVE by updating opensearch-secu…
gregschohn Jan 4, 2024
37ae548
Remove zipkin as a tracing sink.
gregschohn Jan 4, 2024
1172912
Make attribute name filtering more generic and fix a bug in negation …
gregschohn Jan 5, 2024
22296b7
Minor tweaks to the otel collector (including renaming from 'demo') a…
gregschohn Jan 8, 2024
d1b237a
Set the aggregation temporality to delta rather than cumulative.
gregschohn Jan 8, 2024
fdd8141
Wrap all metric emissions within the context's span so that the metri…
gregschohn Jan 8, 2024
490521d
In progress changes. I'm trying to track down a regression and want …
gregschohn Jan 9, 2024
f199e98
In-progress checkpoint (code won't compile). Setting up separate met…
gregschohn Jan 10, 2024
e110540
Another in-progress checkpoint (still won't compile) where I'm moving…
gregschohn Jan 10, 2024
156ae72
Another checkpoint that still doesn't compile, but less files (I thin…
gregschohn Jan 11, 2024
5dc32d9
Stop passing the Root telemetry scope as a generic parameter to all o…
gregschohn Jan 11, 2024
320e9d8
Working on updating proxy code to get everything to compile.
gregschohn Jan 12, 2024
9601a68
More refactoring, still doesn't all compile, but most of it does (top…
gregschohn Jan 12, 2024
5f6bb3f
Fix the last of the compilation errors though tests are failing still.
gregschohn Jan 13, 2024
ccc0e2a
Bugfixes and test fixes to get all of the unit tests to pass.
gregschohn Jan 14, 2024
0744424
Bugfixes. Stop metering double events in a couple spots and fix a co…
gregschohn Jan 15, 2024
82f8ebb
Merge branch 'main' into ReplayerInstrumentation
gregschohn Jan 15, 2024
35a9185
Upgrade otel libraries to 1.34.1 from 1.32 and add the enable_open_me…
gregschohn Jan 16, 2024
0e8379d
Fix a bug where the current scope's attributes weren't being added in…
gregschohn Jan 16, 2024
a076bc3
Add a TestContext for every replayer test via inheritance on the Test…
gregschohn Jan 18, 2024
d3ee4f1
Change how MetricInstruments classes are instantiated.
gregschohn Jan 19, 2024
54c5e27
Build fix - When refactoring to use TestContexts more globally, a tes…
gregschohn Jan 19, 2024
0ce6694
Spin up a grafana container in the docker solution with simple creden…
gregschohn Jan 19, 2024
c8674eb
Start to get Source/Target comparison metrics in place and more refac…
gregschohn Jan 21, 2024
ffcc09b
Minor cleanup on exception tracking
gregschohn Jan 22, 2024
dd89336
Bugfix - a class was inheriting from the Connection context's MetricI…
gregschohn Jan 22, 2024
a15d08a
Cleanup build.gradle files' open-telemetry dependencies. Embrace ote…
gregschohn Jan 22, 2024
5c454ca
Partial checkin to delete dead code and clean up imports and style is…
gregschohn Jan 22, 2024
bf8ea86
Addressing PR Feedback with some localized cleanups
gregschohn Jan 23, 2024
b641c5a
aws cli wasn't functional within my arm64 container because the docke…
gregschohn Jan 24, 2024
cd34b32
Rework otel-collector container packaging.
gregschohn Jan 26, 2024
a4173c0
PR feedback including:
gregschohn Jan 27, 2024
2072f69
Change path from otelcol to otelCollector and enable the collector an…
gregschohn Jan 27, 2024
be689b5
Split the implementations of fillAttributes into two.
gregschohn Jan 28, 2024
607ff05
Fix the dependencies for logging leaves and add 'processors' and 'rec…
gregschohn Jan 28, 2024
d480ed8
Setting the docker command for the otel-collector service to use the …
gregschohn Jan 28, 2024
59db42f
Set the permissions for the otel container to write to cloudwatch and…
gregschohn Jan 28, 2024
76c8c31
Minor cleanup
gregschohn Jan 31, 2024
2cc67fd
Fix the runTestBenchmarks script to work when the endpoint uses http …
gregschohn Feb 1, 2024
6e70d1a
Merge branch 'main' into ReplayerInstrumentation
gregschohn Feb 1, 2024
9425ab6
Aesthetic formatting changes
gregschohn Feb 3, 2024
deb19a4
IInstrumentationAttributes no longer has scope related functionality.…
gregschohn Feb 4, 2024
a5f947e
Merge branch 'main' into ReplayerInstrumentation
gregschohn Feb 5, 2024
9a31210
README documentation for the Instrumentation + some cleanup.
gregschohn Feb 6, 2024
805e13b
When the first bucket size is <=0 for the CommonScopedMetricInstrumen…
gregschohn Feb 6, 2024
9aa3432
Add tracing and metrics for replayer sockets as they're created and c…
gregschohn Feb 7, 2024
48a45ab
Bugfix, test fix, lint fix.
gregschohn Feb 7, 2024
16a9010
Fix an edge case where a socketChannel might not have been created ev…
gregschohn Feb 7, 2024
4b102f7
Handle SocketContexts as first class contexts rather than trying to i…
gregschohn Feb 8, 2024
1cb8927
Fix an issue with when to close the SocketContext and some memory lea…
gregschohn Feb 8, 2024
441ca40
Tie off more loose ends for memory leaks during test runs.
gregschohn Feb 8, 2024
dc76239
Test fixes + make scheduled contexts use System.nanotime instead of I…
gregschohn Feb 9, 2024
8a875b3
Set the x-ray exporter attribute index_all_attributes=true so that at…
gregschohn Feb 9, 2024
18 changes: 10 additions & 8 deletions TrafficCapture/captureKafkaOffloader/build.gradle
@@ -9,20 +9,22 @@ repositories {
}

dependencies {
api 'io.netty:netty-buffer:4.1.100.Final'
implementation project(':captureOffloader')
implementation project(':coreUtilities')
implementation 'org.projectlombok:lombok:1.18.26'
implementation 'com.google.protobuf:protobuf-java:3.22.2'
implementation 'org.apache.kafka:kafka-clients:3.6.0'
implementation 'software.amazon.msk:aws-msk-iam-auth:1.1.9'
implementation 'org.slf4j:slf4j-api:2.0.7'
implementation group: 'com.google.protobuf', name:'protobuf-java', version:'3.22.2'
api group:'io.netty', name:'netty-buffer', version: '4.1.100.Final'
implementation group: 'org.projectlombok', name:'lombok', version:'1.18.26'
implementation group: 'org.apache.kafka', name:'kafka-clients', version:'3.6.0'
implementation group: 'org.slf4j', name:'slf4j-api', version:'2.0.7'
implementation group: 'software.amazon.msk', name:'aws-msk-iam-auth', version:'1.1.9'

testImplementation project(':captureProtobufs')
testImplementation 'org.mockito:mockito-core:4.6.1'
testImplementation 'org.mockito:mockito-junit-jupiter:4.6.1'
testImplementation testFixtures(project(path: ':coreUtilities'))
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-slf4j2-impl', version: '2.20.0'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '4.6.1'
testImplementation group: 'org.mockito', name: 'mockito-junit-jupiter', version: '4.6.1'
testImplementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'
}

TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactory.java
@@ -7,17 +7,22 @@
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.coreutils.MetricsAttributeKey;
import org.opensearch.migrations.coreutils.MetricsEvent;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;
import org.opensearch.migrations.trafficcapture.CodedOutputStreamHolder;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.IConnectionCaptureFactory;
import org.opensearch.migrations.trafficcapture.OrderedStreamLifecyleManager;
import org.opensearch.migrations.trafficcapture.StreamChannelConnectionCaptureSerializer;
import org.opensearch.migrations.coreutils.MetricsLogger;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.IRootKafkaOffloaderContext;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.KafkaRecordContext;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;

@@ -32,27 +37,31 @@
// general Kafka message overhead
public static final int KAFKA_MESSAGE_OVERHEAD_BYTES = 500;

private final IRootKafkaOffloaderContext rootScope;
private final String nodeId;
// Potential future optimization here to use a direct buffer (e.g. nio) instead of byte array
private final Producer<String, byte[]> producer;
private final String topicNameForTraffic;
private final int bufferSize;

public KafkaCaptureFactory(String nodeId, Producer<String, byte[]> producer,
public KafkaCaptureFactory(IRootKafkaOffloaderContext rootScope, String nodeId, Producer<String, byte[]> producer,
String topicNameForTraffic, int messageSize) {
this.rootScope = rootScope;
this.nodeId = nodeId;
this.producer = producer;
this.topicNameForTraffic = topicNameForTraffic;
this.bufferSize = messageSize - KAFKA_MESSAGE_OVERHEAD_BYTES;
}

public KafkaCaptureFactory(String nodeId, Producer<String, byte[]> producer, int messageSize) {
this(nodeId, producer, DEFAULT_TOPIC_NAME_FOR_TRAFFIC, messageSize);
public KafkaCaptureFactory(IRootKafkaOffloaderContext rootScope, String nodeId, Producer<String, byte[]> producer, int messageSize) {
this(rootScope, nodeId, producer, DEFAULT_TOPIC_NAME_FOR_TRAFFIC, messageSize);
}

@Override
public IChannelConnectionCaptureSerializer<RecordMetadata> createOffloader(String connectionId) {
return new StreamChannelConnectionCaptureSerializer<>(nodeId, connectionId, new StreamManager(connectionId));
public IChannelConnectionCaptureSerializer<RecordMetadata>
createOffloader(IConnectionContext ctx) {
return new StreamChannelConnectionCaptureSerializer<>(nodeId, ctx.getConnectionId(),
new StreamManager(rootScope, ctx));
}

@AllArgsConstructor
@@ -65,12 +74,22 @@
}
}

@AllArgsConstructor
class StreamManager extends OrderedStreamLifecyleManager<RecordMetadata> {
String connectionId;
IConnectionContext telemetryContext;
IRootKafkaOffloaderContext rootScope;
Instant startTime;

public StreamManager(IRootKafkaOffloaderContext rootScope, IConnectionContext ctx) {
// TODO - add https://opentelemetry.io/blog/2022/instrument-kafka-clients/
this.rootScope = rootScope;
this.telemetryContext = ctx;
this.startTime = Instant.now();
}

@Override
public CodedOutputStreamWrapper createStream() {
telemetryContext.getCurrentSpan().addEvent("streamCreated");

ByteBuffer bb = ByteBuffer.allocate(bufferSize);
return new CodedOutputStreamWrapper(CodedOutputStream.newInstance(bb), bb);
}
@@ -84,7 +103,7 @@
}
var osh = (CodedOutputStreamWrapper) outputStreamHolder;

// Structured context for MetricsLogger
final var connectionId = telemetryContext.getConnectionId();
try {
String recordId = String.format("%s.%d", connectionId, index);
var byteBuffer = osh.byteBuffer;
Expand All @@ -93,8 +112,12 @@
// Used to essentially wrap Future returned by Producer to CompletableFuture
var cf = new CompletableFuture<RecordMetadata>();
log.debug("Sending Kafka producer record: {} for topic: {}", recordId, topicNameForTraffic);

var flushContext = rootScope.createKafkaRecordContext(telemetryContext,
topicNameForTraffic, recordId, kafkaRecord.value().length);

// Async request to Kafka cluster
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId));
producer.send(kafkaRecord, handleProducerRecordSent(cf, recordId, flushContext));
metricsLogger.atSuccess(MetricsEvent.RECORD_SENT_TO_KAFKA)
.setAttribute(MetricsAttributeKey.CHANNEL_ID, connectionId)
.setAttribute(MetricsAttributeKey.TOPIC_NAME, topicNameForTraffic)
@@ -108,29 +131,35 @@
throw e;
}
}

/**
* The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the
* documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE,
* and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries."
* <p>
* Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId) {
return (metadata, exception) -> {
if (exception != null) {
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
recordId, metadata.topic(), metadata.partition());
cf.complete(metadata);
}
};
}
}

/**
* The default KafkaProducer comes with built-in retry and error-handling logic that suits many cases. From the
* documentation here for retry: https://kafka.apache.org/35/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
* "If the request fails, the producer can automatically retry. The retries setting defaults to Integer.MAX_VALUE,
* and it's recommended to use delivery.timeout.ms to control retry behavior, instead of retries."
* <p>
* Apart from this the KafkaProducer has logic for deciding whether an error is transient and should be
* retried or not retried at all: https://kafka.apache.org/35/javadoc/org/apache/kafka/common/errors/RetriableException.html
* as well as basic retry backoff
*/
private Callback handleProducerRecordSent(CompletableFuture<RecordMetadata> cf, String recordId,
KafkaRecordContext flushContext) {
// Keep this callback definition out of the inner class: the inner class carries
// context fields that shouldn't be used here, and defining the callback at this
// level keeps those fields out of scope.
return (metadata, exception) -> {
log.atInfo().setMessage(()->"kafka completed sending a record").log();
if (exception != null) {
flushContext.addException(exception);
log.error("Error sending producer record: {}", recordId, exception);
cf.completeExceptionally(exception);

// Codecov (codecov/patch): added lines L154-L156 were not covered by tests
} else {
log.debug("Kafka producer record: {} has finished sending for topic: {} and partition {}",
recordId, metadata.topic(), metadata.partition());
cf.complete(metadata);
}
flushContext.close();
};
}
}
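
The retry guidance quoted in the comment above lives in producer configuration rather than in this class. Below is a minimal sketch of how a producer for this offloader might be configured, using standard kafka-clients ProducerConfig keys; the class name, bootstrap address, and timeout value are placeholders, not taken from this PR:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

class OffloaderProducerFactory {
    static Producer<String, byte[]> makeProducer() {
        var props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Per the KafkaProducer javadoc quoted above: bound the total send time
        // (retries included) with delivery.timeout.ms rather than tuning `retries`.
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120_000);
        return new KafkaProducer<>(props);
    }
}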
TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/tracing/IRootKafkaOffloaderContext.java
@@ -0,0 +1,15 @@
package org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing;

import org.opensearch.migrations.tracing.IInstrumentConstructor;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;

public interface IRootKafkaOffloaderContext extends IInstrumentConstructor {
KafkaRecordContext.MetricInstruments getKafkaOffloadingInstruments();

default KafkaRecordContext createKafkaRecordContext(IConnectionContext telemetryContext,
String topicNameForTraffic,
String recordId,
int length) {
return new KafkaRecordContext(this, telemetryContext, topicNameForTraffic, recordId, length);
}
}
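
A concrete root scope only has to hold the per-activity instruments that this getter exposes. A hypothetical minimal implementation follows; the class name is assumed, and the span-construction methods inherited from IInstrumentConstructor are not shown:

import io.opentelemetry.api.metrics.Meter;

public class SimpleRootKafkaOffloaderContext implements IRootKafkaOffloaderContext {
    private final KafkaRecordContext.MetricInstruments kafkaOffloadingInstruments;

    public SimpleRootKafkaOffloaderContext(Meter meter) {
        // Instruments are built once per root scope and shared by every KafkaRecordContext
        this.kafkaOffloadingInstruments = KafkaRecordContext.makeMetrics(meter);
    }

    @Override
    public KafkaRecordContext.MetricInstruments getKafkaOffloadingInstruments() {
        return kafkaOffloadingInstruments;
    }

    // ...plus the IInstrumentConstructor methods (span creation), omitted in this sketch
}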
TrafficCapture/captureKafkaOffloader/src/main/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/tracing/KafkaRecordContext.java
@@ -0,0 +1,60 @@
package org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.metrics.Meter;
import lombok.Getter;
import lombok.NonNull;
import org.opensearch.migrations.tracing.BaseNestedSpanContext;
import org.opensearch.migrations.tracing.CommonScopedMetricInstruments;
import org.opensearch.migrations.tracing.IScopedInstrumentationAttributes;
import org.opensearch.migrations.tracing.commoncontexts.IConnectionContext;

public class KafkaRecordContext extends
BaseNestedSpanContext<IRootKafkaOffloaderContext, IConnectionContext>
implements IScopedInstrumentationAttributes {
public static final String ACTIVITY_NAME = "kafkaCommit";

static final AttributeKey<String> TOPIC_ATTR = AttributeKey.stringKey("topic");
static final AttributeKey<String> RECORD_ID_ATTR = AttributeKey.stringKey("recordId");
static final AttributeKey<Long> RECORD_SIZE_ATTR = AttributeKey.longKey("recordSize");

@Getter
public final String topic;
@Getter
public final String recordId;

public KafkaRecordContext(IRootKafkaOffloaderContext rootScope, IConnectionContext enclosingScope,
String topic, String recordId, int recordSize) {
super(rootScope, enclosingScope);
this.topic = topic;
this.recordId = recordId;
initializeSpan();
getCurrentSpan().setAttribute(RECORD_SIZE_ATTR, recordSize);
}

public static class MetricInstruments extends CommonScopedMetricInstruments {
private MetricInstruments(Meter meter, String activityName) {
super(meter, activityName);
}
}

public static @NonNull MetricInstruments makeMetrics(Meter meter) {
return new MetricInstruments(meter, ACTIVITY_NAME);
}

@Override
public @NonNull MetricInstruments getMetrics() {
return getRootInstrumentationScope().getKafkaOffloadingInstruments();
}

@Override
public String getActivityName() { return "stream_flush_called"; }

@Override
public AttributesBuilder fillAttributesForSpansBelow(AttributesBuilder builder) {
return super.fillAttributesForSpansBelow(builder)
.put(TOPIC_ATTR, getTopic())
.put(RECORD_ID_ATTR, getRecordId());
}
}
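
The factory diff above shows the intended lifecycle for this context: it is opened immediately before producer.send and closed from the send callback. A condensed sketch of that flow, with the CompletableFuture and metrics-logging plumbing from the real code left out and the method name assumed:

void sendWithFlushTracking(IRootKafkaOffloaderContext rootScope, IConnectionContext connectionCtx,
                           Producer<String, byte[]> producer, ProducerRecord<String, byte[]> record,
                           String topic, String recordId) {
    var flushContext = rootScope.createKafkaRecordContext(
            connectionCtx, topic, recordId, record.value().length);
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            flushContext.addException(exception); // recorded on the span before it ends
        }
        flushContext.close(); // ends the span whether or not the send succeeded
    });
}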
TrafficCapture/captureKafkaOffloader/src/test/java/org/opensearch/migrations/trafficcapture/kafkaoffloader/KafkaCaptureFactoryTest.java
@@ -18,15 +18,15 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.migrations.trafficcapture.IChannelConnectionCaptureSerializer;
import org.opensearch.migrations.trafficcapture.kafkaoffloader.tracing.TestRootKafkaOffloaderContext;
import org.opensearch.migrations.trafficcapture.tracing.ConnectionContext;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.mockito.ArgumentMatchers.any;
@@ -42,26 +42,24 @@ public class KafkaCaptureFactoryTest {
private String connectionId = "0242c0fffea82008-0000000a-00000003-62993a3207f92af6-9093ce33";
private String topic = "test_topic";


@Test
public void testLargeRequestIsWithinKafkaMessageSizeLimit() throws IOException, ExecutionException, InterruptedException {
final var referenceTimestamp = Instant.now(Clock.systemUTC());

int maxAllowableMessageSize = 1024*1024;
MockProducer<String, byte[]> producer = new MockProducer<>(true, new StringSerializer(), new ByteArraySerializer());
KafkaCaptureFactory kafkaCaptureFactory =
new KafkaCaptureFactory(TEST_NODE_ID_STRING, producer, maxAllowableMessageSize);
IChannelConnectionCaptureSerializer serializer = kafkaCaptureFactory.createOffloader(connectionId);

StringBuilder sb = new StringBuilder();
for (int i = 0; i < 15000; i++) {
sb.append("{ \"create\": { \"_index\": \"office-index\" } }\n{ \"title\": \"Malone's Cones\", \"year\": 2013 }\n");
}
Assertions.assertTrue(sb.toString().getBytes().length > 1024*1024);
byte[] fakeDataBytes = sb.toString().getBytes(StandardCharsets.UTF_8);
new KafkaCaptureFactory(TestRootKafkaOffloaderContext.noTracking(),
TEST_NODE_ID_STRING, producer, maxAllowableMessageSize);
var serializer = kafkaCaptureFactory.createOffloader(createCtx());

var testStr = "{ \"create\": { \"_index\": \"office-index\" } }\n{ \"title\": \"Malone's Cones\", \"year\": 2013 }\n"
.repeat(15000);
var fakeDataBytes = testStr.getBytes(StandardCharsets.UTF_8);
Assertions.assertTrue(fakeDataBytes.length > 1024*1024);
var bb = Unpooled.wrappedBuffer(fakeDataBytes);
serializer.addReadEvent(referenceTimestamp, bb);
CompletableFuture future = serializer.flushCommitAndResetStream(true);
var future = serializer.flushCommitAndResetStream(true);
future.get();
for (ProducerRecord<String, byte[]> record : producer.history()) {
int recordSize = calculateRecordSize(record, null);
@@ -73,6 +71,10 @@ public void testLargeRequestIsWithinKafkaMessageSizeLimit() throws IOException,
producer.close();
}

private static ConnectionContext createCtx() {
return new ConnectionContext(new TestRootKafkaOffloaderContext(), "test", "test");
}

/**
* This size calculation is based off the KafkaProducer client request size validation check done when Producer
* records are sent. This validation appears to be consistent for several versions now, here is a reference to
@@ -96,8 +98,9 @@ private int calculateRecordSize(ProducerRecord<String, byte[]> record, String re
@Test
public void testLinearOffloadingIsSuccessful() throws IOException {
KafkaCaptureFactory kafkaCaptureFactory =
new KafkaCaptureFactory(TEST_NODE_ID_STRING, mockProducer, 1024*1024);
IChannelConnectionCaptureSerializer offloader = kafkaCaptureFactory.createOffloader(connectionId);
new KafkaCaptureFactory(TestRootKafkaOffloaderContext.noTracking(),
TEST_NODE_ID_STRING, mockProducer, 1024*1024);
var offloader = kafkaCaptureFactory.createOffloader(createCtx());

List<Callback> recordSentCallbacks = new ArrayList<>(3);
when(mockProducer.send(any(), any())).thenAnswer(invocation -> {
@@ -112,11 +115,11 @@ public void testLinearOffloadingIsSuccessful() throws IOException {
byte[] fakeDataBytes = "FakeData".getBytes(StandardCharsets.UTF_8);
var bb = Unpooled.wrappedBuffer(fakeDataBytes);
offloader.addReadEvent(ts, bb);
CompletableFuture cf1 = offloader.flushCommitAndResetStream(false);
var cf1 = offloader.flushCommitAndResetStream(false);
offloader.addReadEvent(ts, bb);
CompletableFuture cf2 = offloader.flushCommitAndResetStream(false);
var cf2 = offloader.flushCommitAndResetStream(false);
offloader.addReadEvent(ts, bb);
CompletableFuture cf3 = offloader.flushCommitAndResetStream(false);
var cf3 = offloader.flushCommitAndResetStream(false);
bb.release();

Assertions.assertEquals(false, cf1.isDone());