Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Favor ByteBuf Duplicates, NettyJsonContentAuthSigner and StreamChannelConnectionCaptureSerializer Refactoring #615

Merged
merged 18 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
66f9e58
Prefer duplicate to slices
AndreKurait Apr 26, 2024
3d28553
Simplify ParsedHttpMessagesAsDicts Base64 Encoding
AndreKurait Apr 26, 2024
4bf9a0f
Fix release in NettyJsonContentCompressor
AndreKurait Apr 26, 2024
e2d7aaf
Refactor NettyJsonContentAuthSigner for safer operation
AndreKurait Apr 26, 2024
05271b2
Remove reader index modification in HttpJsonTransformingConsumer
AndreKurait Apr 26, 2024
daf2b23
Add duplicate before each byteBuf read
AndreKurait Apr 26, 2024
7943c26
Add StreamChannelConnectionCaptureSerializerTest for buffer with fewe…
AndreKurait Apr 29, 2024
5afd1a0
Fix StreamChannelConnectionCaptureSerializer behavior for nioBuffers …
AndreKurait Apr 30, 2024
bcc1c97
Log when NettyJsonContentAuthSigner fails to sign message
AndreKurait May 1, 2024
578cb0b
Fill headers before body in ParsedHttpMessagesAsDicts
AndreKurait May 1, 2024
171dea0
Add StreamChannelConnectionCaptureSerializerTests around segmentedObs…
AndreKurait May 2, 2024
9f78caf
Fix StreamChannelConnectionCaptureSerializer edge case around segment…
AndreKurait May 2, 2024
8bc47ca
Fix ResultsToLogsConsumerTest with body at end of tuple output
AndreKurait May 2, 2024
7fbd7a5
Add LeakDetection to StreamChannelConnectionCaptureSerializerTest
AndreKurait May 2, 2024
1b66165
Remove memory leaks in ConditionallyReliableLoggingHttpHandlerTest
AndreKurait May 2, 2024
0325b24
Add PessimisticallyCalculateMaxWritableSpace
AndreKurait May 3, 2024
61dbc8f
Address PR comments
AndreKurait May 3, 2024
7b270fa
Simplify TestUtils getStringFromContent
AndreKurait May 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions TrafficCapture/captureOffloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation group: 'org.projectlombok', name: 'lombok', version: '1.18.26'
implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'

testImplementation testFixtures(project(path: ':testUtilities'))
testImplementation project(':coreUtilities')
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Timestamp;
import io.netty.buffer.ByteBuf;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

import java.nio.ByteBuffer;
import java.time.Instant;

/**
Expand All @@ -28,25 +28,41 @@ public static int getSizeOfTimestamp(Instant t) {

/**
* This function calculates the maximum bytes that would be needed to store a [Read/Write]SegmentObservation, if constructed
* from the given ByteBuffer and associated segment field numbers and values passed in. This estimate is essentially
* the max size needed in the CodedOutputStream to store the provided ByteBuffer data and its associated TrafficStream
* from the given ByteBuf and associated segment field numbers and values passed in. This estimate is essentially
* the max size needed in the CodedOutputStream to store the provided ByteBuf data and its associated TrafficStream
* overhead. The actual required bytes could be marginally smaller.
*/
public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int observationFieldNumber,
int dataFieldNumber, ByteBuffer buffer) {
int dataFieldNumber, ByteBuf buf) {
// Timestamp required bytes
int tsContentSize = getSizeOfTimestamp(timestamp);
int tsTagAndContentSize = CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, tsContentSize) + tsContentSize;

// Capture required bytes
int dataSize = CodedOutputStream.computeByteBufferSize(dataFieldNumber, buffer);
int dataSize = computeByteBufRemainingSize(dataFieldNumber, buf);
int captureTagAndContentSize = CodedOutputStream.computeInt32Size(observationFieldNumber, dataSize) + dataSize;

// Observation and closing index required bytes
return bytesNeededForObservationAndClosingIndex(tsTagAndContentSize + captureTagAndContentSize,
Integer.MAX_VALUE);
}

/**
* This function determines the number of bytes needed to write the readable bytes in a byteBuf and its tag.
*/
public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buf) {
return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buf);
}

/**
* This function determines the number of bytes needed to write the readable bytes in a byteBuf.
*/
public static int computeByteBufRemainingSizeNoTag(ByteBuf buf) {
int bufSize = buf.readableBytes();
return CodedOutputStream.computeUInt32SizeNoTag(bufSize) + bufSize;
}


/**
* This function determines the number of bytes needed to store a TrafficObservation and a closing index for a
* TrafficStream, from the provided input.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.opensearch.migrations.trafficcapture;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Instant;

class CodedOutputStreamSizeUtilTest {

@Test
void testGetSizeOfTimestamp() {
// Timestamp with only seconds (no explicit nanoseconds)
Instant timestampSecondsOnly = Instant.parse("2024-01-01T00:00:00Z");
int sizeSecondsOnly = CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestampSecondsOnly);
Assertions.assertEquals( 6, sizeSecondsOnly);

// Timestamp with both seconds and nanoseconds
Instant timestampWithNanos = Instant.parse("2024-12-31T23:59:59.123456789Z");
int sizeWithNanos = CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestampWithNanos);
Assertions.assertEquals( 11, sizeWithNanos);
}

@Test
void testMaxBytesNeededForASegmentedObservation() {
Instant timestamp = Instant.parse("2024-01-01T00:00:00Z");
ByteBuf buf = Unpooled.buffer(100);
buf.writeCharSequence("Test", StandardCharsets.UTF_8);
int result = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp, 1, 2, buf);
Assertions.assertEquals(24, result);
}

@Test
void test_computeByteBufRemainingSize_emptyBufWithCapacity() {
var buf = Unpooled.directBuffer(100);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(2, result);
}

@Test
void test_computeByteBufRemainingSize() {
var buf = Unpooled.directBuffer();
buf.writeCharSequence("hello_test", StandardCharsets.UTF_8);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(12, result);
}

@Test
void test_computeByteBufRemainingSize_largeBuf() {
var buf = Unpooled.directBuffer();
buf.writeCharSequence("1234567890".repeat(100000), StandardCharsets.UTF_8);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(1000004, result);
}


@Test
void test_computeByteBufRemainingSize_ByteBufAtCapacity() {
ByteBuf buf = Unpooled.buffer(4);
buf.writeCharSequence("Test", StandardCharsets.UTF_8);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(6, result);
}

@Test
void test_computeByteBufRemainingSize_EmptyByteBuf() {
ByteBuf buf = Unpooled.buffer(0, 0);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(2, result);
}

@Test
void testBytesNeededForObservationAndClosingIndex() {
int observationContentSize = 50;
int numberOfTrafficStreamsSoFar = 10;

int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numberOfTrafficStreamsSoFar);
Assertions.assertEquals(54, result);
}

@Test
void testBytesNeededForObservationAndClosingIndex_WithZeroContent() {
int observationContentSize = 0;
int numberOfTrafficStreamsSoFar = 0;

int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numberOfTrafficStreamsSoFar);
Assertions.assertEquals(4, result);
}

@Test
void testBytesNeededForObservationAndClosingIndex_VariousIndices() {
int observationContentSize = 20;

// Test with increasing indices to verify scaling of index size
int[] indices = new int[]{1, 1000, 100000};
int[] expectedResults = new int[]{24, 25, 26};

for (int i = 0; i < indices.length; i++) {
int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, indices[i]);
Assertions.assertEquals(expectedResults[i], result);
}
}

}
Loading