Skip to content

Commit

Permalink
ByteBuf String encoding simplification
Browse files Browse the repository at this point in the history
Signed-off-by: Andre Kurait <akurait@amazon.com>
  • Loading branch information
AndreKurait committed Apr 26, 2024
1 parent 6d1712a commit 9134710
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -370,18 +370,18 @@ protected CompletableFuture<Void> kickoffCloseStream(CodedOutputStreamHolder out
"Unknown outputStreamHolder sent back to StreamManager: " + outputStreamHolder);
}
var osh = (CodedOutputStreamAndByteBufferWrapper) outputStreamHolder;
log.trace("Getting ready to flush for " + osh);
log.trace("Bytes written so far... " + StandardCharsets.UTF_8.decode(osh.getByteBuffer().duplicate()));
log.atTrace().log(() -> "Getting ready to flush for " + osh);
log.atTrace().log(() -> "Bytes written so far... " + StandardCharsets.UTF_8.decode(osh.getByteBuffer().slice()));

return CompletableFuture.runAsync(() -> {
try {
osh.getOutputStream().flush();
log.trace("Just flushed for " + osh.getOutputStream());
log.atTrace().log(() -> "Just flushed for " + osh.getOutputStream());
var bb = osh.getByteBuffer();
bb.position(0);
var bytesWritten = osh.getOutputStream().getTotalBytesWritten();
bb.limit(bytesWritten);
log.trace("Adding " + StandardCharsets.UTF_8.decode(bb.duplicate()));
log.atTrace().log(() -> "Adding " + StandardCharsets.UTF_8.decode(bb.slice()));
outputBuffers.add(bb);
} catch (IOException e) {
throw Lombok.sneakyThrow(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package org.opensearch.migrations.replay;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.HttpHeaders;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -100,18 +101,16 @@ public static void fillStatusCodeMetrics(@NonNull IReplayContexts.ITupleHandling
targetResponseOp.ifPresent(r -> context.setTargetStatus((Integer) r.get(STATUS_CODE_KEY)));
}

private static byte[] getBytesFromByteBuf(ByteBuf buf) {
var bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
return bytes;
}

private static Map<String, Object> fillMap(LinkedHashMap<String, Object> map,
HttpHeaders headers, ByteBuf content) {
String base64body = Base64.getEncoder().encodeToString(getBytesFromByteBuf(content));
map.put("body", base64body);
headers.entries().stream().forEach(kvp -> map.put(kvp.getKey(), kvp.getValue()));
return map;
try (var bufHolder = RefSafeHolder.create(Base64.encode(content))) {
var buf = bufHolder.get();
assert buf != null : "Base64.encode should not return null";
String base64body = buf.toString(StandardCharsets.UTF_8);
map.put("body", base64body);
headers.entries().forEach(kvp -> map.put(kvp.getKey(), kvp.getValue()));
return map;
}
}

private static Map<String, Object> makeSafeMap(@NonNull IReplayContexts.ITupleHandlingContext context,
Expand Down

0 comments on commit 9134710

Please sign in to comment.