Skip to content

Commit

Permalink
Do not buffer results
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Nov 1, 2024
1 parent 61a79dd commit 423abcb
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,27 +14,36 @@
package io.trino.server.protocol;

import com.fasterxml.jackson.core.JsonGenerator;
import com.google.common.collect.ImmutableList;
import io.trino.client.QueryData;
import io.trino.server.protocol.JsonEncodingUtils.TypeEncoder;
import io.trino.spi.Page;
import io.trino.spi.connector.ConnectorSession;

import java.io.IOException;
import java.util.List;

import static java.nio.charset.StandardCharsets.UTF_8;
import static io.trino.server.protocol.JsonEncodingUtils.writePagesToJsonGenerator;
import static java.util.Objects.requireNonNull;

public class JsonBytesQueryData
implements QueryData
{
private final byte[] json;
private final ConnectorSession connectorSession;
private final TypeEncoder[] typeEncoders;
private final int[] sourcePageChannels;
private final List<Page> pages;

public JsonBytesQueryData(byte[] json)
public JsonBytesQueryData(ConnectorSession connectorSession, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List<Page> pages)
{
this.json = requireNonNull(json, "json is null");
this.connectorSession = requireNonNull(connectorSession, "connectorSession");
this.typeEncoders = requireNonNull(typeEncoders, "typeEncoders is null");
this.sourcePageChannels = requireNonNull(sourcePageChannels, "sourcePageChannels is null");
this.pages = ImmutableList.copyOf(pages);
}

public void writeTo(JsonGenerator generator)
throws IOException
{
generator.writeRawValue(new String(json, UTF_8));
writePagesToJsonGenerator(connectorSession, generator, typeEncoders, sourcePageChannels, pages);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,17 @@
*/
package io.trino.server.protocol;

import com.fasterxml.jackson.core.JsonGenerator;
import io.trino.Session;
import io.trino.client.QueryData;
import io.trino.server.ExternalUriInfo;
import io.trino.server.protocol.JsonEncodingUtils.TypeEncoder;
import io.trino.server.protocol.spooling.QueryDataProducer;
import io.trino.spi.TrinoException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.function.Consumer;

import static io.trino.plugin.base.util.JsonUtils.jsonFactory;
import static io.trino.server.protocol.JsonEncodingUtils.createTypeEncoders;
import static io.trino.server.protocol.JsonEncodingUtils.writePagesToJsonGenerator;
import static java.util.Objects.requireNonNull;

public class JsonBytesQueryDataProducer
Expand All @@ -56,15 +50,6 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo
}

// Write to a buffer so we can capture and propagate the exception
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); JsonGenerator generator = jsonFactory().createGenerator(outputStream)) {
writePagesToJsonGenerator(session.toConnectorSession(), generator, typeEncoders, sourcePageChannels, rows.getPages());
return new JsonBytesQueryData(outputStream.toByteArray());
}
catch (TrinoException e) {
return null;
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return new JsonBytesQueryData(session.toConnectorSession(), typeEncoders, sourcePageChannels, rows.getPages());
}
}

0 comments on commit 423abcb

Please sign in to comment.