Skip to content

Commit

Permalink
Merge pull request #615 from AndreKurait/ByteBufImprovements
Browse files Browse the repository at this point in the history
Favor ByteBuf Duplicates, NettyJsonContentAuthSigner and StreamChannelConnectionCaptureSerializer Refactoring
  • Loading branch information
AndreKurait authored May 3, 2024
2 parents d1b3e27 + 7b270fa commit d1771cc
Show file tree
Hide file tree
Showing 16 changed files with 643 additions and 130 deletions.
1 change: 1 addition & 0 deletions TrafficCapture/captureOffloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation group: 'org.projectlombok', name: 'lombok', version: '1.18.26'
implementation group: 'org.slf4j', name: 'slf4j-api', version: '2.0.7'

testImplementation testFixtures(project(path: ':testUtilities'))
testImplementation project(':coreUtilities')
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.20.0'
testImplementation group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.20.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Timestamp;
import io.netty.buffer.ByteBuf;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

import java.nio.ByteBuffer;
import java.time.Instant;

/**
Expand All @@ -28,25 +28,41 @@ public static int getSizeOfTimestamp(Instant t) {

/**
* This function calculates the maximum bytes that would be needed to store a [Read/Write]SegmentObservation, if constructed
* from the given ByteBuffer and associated segment field numbers and values passed in. This estimate is essentially
* the max size needed in the CodedOutputStream to store the provided ByteBuffer data and its associated TrafficStream
* from the given ByteBuf and associated segment field numbers and values passed in. This estimate is essentially
* the max size needed in the CodedOutputStream to store the provided ByteBuf data and its associated TrafficStream
* overhead. The actual required bytes could be marginally smaller.
*/
public static int maxBytesNeededForASegmentedObservation(Instant timestamp, int observationFieldNumber,
int dataFieldNumber, ByteBuffer buffer) {
int dataFieldNumber, ByteBuf buf) {
// Timestamp required bytes
int tsContentSize = getSizeOfTimestamp(timestamp);
int tsTagAndContentSize = CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, tsContentSize) + tsContentSize;

// Capture required bytes
int dataSize = CodedOutputStream.computeByteBufferSize(dataFieldNumber, buffer);
int dataSize = computeByteBufRemainingSize(dataFieldNumber, buf);
int captureTagAndContentSize = CodedOutputStream.computeInt32Size(observationFieldNumber, dataSize) + dataSize;

// Observation and closing index required bytes
return bytesNeededForObservationAndClosingIndex(tsTagAndContentSize + captureTagAndContentSize,
Integer.MAX_VALUE);
}

/**
* This function determines the number of bytes needed to write the readable bytes in a byteBuf and its tag.
*/
public static int computeByteBufRemainingSize(int fieldNumber, ByteBuf buf) {
return CodedOutputStream.computeTagSize(fieldNumber) + computeByteBufRemainingSizeNoTag(buf);
}

/**
* This function determines the number of bytes needed to write the readable bytes in a byteBuf.
*/
public static int computeByteBufRemainingSizeNoTag(ByteBuf buf) {
int bufSize = buf.readableBytes();
return CodedOutputStream.computeUInt32SizeNoTag(bufSize) + bufSize;
}


/**
* This function determines the number of bytes needed to store a TrafficObservation and a closing index for a
* TrafficStream, from the provided input.
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.opensearch.migrations.trafficcapture;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.nio.charset.StandardCharsets;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.time.Instant;

class CodedOutputStreamSizeUtilTest {

@Test
void testGetSizeOfTimestamp() {
// Timestamp with only seconds (no explicit nanoseconds)
Instant timestampSecondsOnly = Instant.parse("2024-01-01T00:00:00Z");
int sizeSecondsOnly = CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestampSecondsOnly);
Assertions.assertEquals( 6, sizeSecondsOnly);

// Timestamp with both seconds and nanoseconds
Instant timestampWithNanos = Instant.parse("2024-12-31T23:59:59.123456789Z");
int sizeWithNanos = CodedOutputStreamSizeUtil.getSizeOfTimestamp(timestampWithNanos);
Assertions.assertEquals( 11, sizeWithNanos);
}

@Test
void testMaxBytesNeededForASegmentedObservation() {
Instant timestamp = Instant.parse("2024-01-01T00:00:00Z");
ByteBuf buf = Unpooled.buffer(100);
buf.writeCharSequence("Test", StandardCharsets.UTF_8);
int result = CodedOutputStreamSizeUtil.maxBytesNeededForASegmentedObservation(timestamp, 1, 2, buf);
Assertions.assertEquals(24, result);
}

@Test
void test_computeByteBufRemainingSize_emptyBufWithCapacity() {
var buf = Unpooled.directBuffer(100);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(2, result);
}

@Test
void test_computeByteBufRemainingSize() {
var buf = Unpooled.directBuffer();
buf.writeCharSequence("hello_test", StandardCharsets.UTF_8);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(12, result);
}

@Test
void test_computeByteBufRemainingSize_largeBuf() {
var buf = Unpooled.directBuffer();
buf.writeCharSequence("1234567890".repeat(100000), StandardCharsets.UTF_8);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(1000004, result);
}


@Test
void test_computeByteBufRemainingSize_ByteBufAtCapacity() {
ByteBuf buf = Unpooled.buffer(4);
buf.writeCharSequence("Test", StandardCharsets.UTF_8);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(6, result);
}

@Test
void test_computeByteBufRemainingSize_EmptyByteBuf() {
ByteBuf buf = Unpooled.buffer(0, 0);
int result = CodedOutputStreamSizeUtil.computeByteBufRemainingSize(2, buf);
Assertions.assertEquals(2, result);
}

@Test
void testBytesNeededForObservationAndClosingIndex() {
int observationContentSize = 50;
int numberOfTrafficStreamsSoFar = 10;

int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numberOfTrafficStreamsSoFar);
Assertions.assertEquals(54, result);
}

@Test
void testBytesNeededForObservationAndClosingIndex_WithZeroContent() {
int observationContentSize = 0;
int numberOfTrafficStreamsSoFar = 0;

int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, numberOfTrafficStreamsSoFar);
Assertions.assertEquals(4, result);
}

@Test
void testBytesNeededForObservationAndClosingIndex_VariousIndices() {
int observationContentSize = 20;

// Test with increasing indices to verify scaling of index size
int[] indices = new int[]{1, 1000, 100000};
int[] expectedResults = new int[]{24, 25, 26};

for (int i = 0; i < indices.length; i++) {
int result = CodedOutputStreamSizeUtil.bytesNeededForObservationAndClosingIndex(observationContentSize, indices[i]);
Assertions.assertEquals(expectedResults[i], result);
}
}

}
Loading

0 comments on commit d1771cc

Please sign in to comment.