From 423abcb8f59e390106bcbf813b5ff7ca06e813be Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 31 Oct 2024 13:25:42 +0100 Subject: [PATCH] Do not buffer results --- .../server/protocol/JsonBytesQueryData.java | 23 +++++++++++++------ .../protocol/JsonBytesQueryDataProducer.java | 17 +------------- 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryData.java b/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryData.java index 004033cbf1744..2692c636461a0 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryData.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryData.java @@ -14,27 +14,36 @@ package io.trino.server.protocol; import com.fasterxml.jackson.core.JsonGenerator; +import com.google.common.collect.ImmutableList; import io.trino.client.QueryData; +import io.trino.server.protocol.JsonEncodingUtils.TypeEncoder; +import io.trino.spi.Page; +import io.trino.spi.connector.ConnectorSession; -import java.io.IOException; +import java.util.List; -import static java.nio.charset.StandardCharsets.UTF_8; +import static io.trino.server.protocol.JsonEncodingUtils.writePagesToJsonGenerator; import static java.util.Objects.requireNonNull; public class JsonBytesQueryData implements QueryData { - private final byte[] json; + private final ConnectorSession connectorSession; + private final TypeEncoder[] typeEncoders; + private final int[] sourcePageChannels; + private final List pages; - public JsonBytesQueryData(byte[] json) + public JsonBytesQueryData(ConnectorSession connectorSession, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List pages) { - this.json = requireNonNull(json, "json is null"); + this.connectorSession = requireNonNull(connectorSession, "connectorSession"); + this.typeEncoders = requireNonNull(typeEncoders, "typeEncoders is null"); + this.sourcePageChannels = requireNonNull(sourcePageChannels, "sourcePageChannels is null"); + this.pages = ImmutableList.copyOf(pages); } public void writeTo(JsonGenerator generator) - throws IOException { - generator.writeRawValue(new String(json, UTF_8)); + writePagesToJsonGenerator(connectorSession, generator, typeEncoders, sourcePageChannels, pages); } @Override diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryDataProducer.java b/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryDataProducer.java index 062c471c4e762..34c2b65842b94 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryDataProducer.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryDataProducer.java @@ -13,7 +13,6 @@ */ package io.trino.server.protocol; -import com.fasterxml.jackson.core.JsonGenerator; import io.trino.Session; import io.trino.client.QueryData; import io.trino.server.ExternalUriInfo; @@ -21,15 +20,10 @@ import io.trino.server.protocol.spooling.QueryDataProducer; import io.trino.spi.TrinoException; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.UncheckedIOException; import java.util.List; import java.util.function.Consumer; -import static io.trino.plugin.base.util.JsonUtils.jsonFactory; import static io.trino.server.protocol.JsonEncodingUtils.createTypeEncoders; -import static io.trino.server.protocol.JsonEncodingUtils.writePagesToJsonGenerator; import static java.util.Objects.requireNonNull; public class JsonBytesQueryDataProducer @@ -56,15 +50,6 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo } // Write to a buffer so we can capture and propagate the exception - try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); JsonGenerator generator = jsonFactory().createGenerator(outputStream)) { - writePagesToJsonGenerator(session.toConnectorSession(), generator, typeEncoders, sourcePageChannels, rows.getPages()); - return new JsonBytesQueryData(outputStream.toByteArray()); - } - catch (TrinoException e) { - return null; - } - catch (IOException e) { - throw new UncheckedIOException(e); - } + return new JsonBytesQueryData(session.toConnectorSession(), typeEncoders, sourcePageChannels, rows.getPages()); } }