Skip to content

Commit

Permalink
Capture and propagate serialization exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
wendigo committed Nov 1, 2024
1 parent 423abcb commit 9c970f0
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import io.trino.client.QueryData;
import io.trino.server.protocol.JsonEncodingUtils.TypeEncoder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;

import java.util.List;
import java.util.function.Consumer;

import static io.trino.server.protocol.JsonEncodingUtils.writePagesToJsonGenerator;
import static java.util.Objects.requireNonNull;
Expand All @@ -32,18 +34,20 @@ public class JsonBytesQueryData
private final TypeEncoder[] typeEncoders;
private final int[] sourcePageChannels;
private final List<Page> pages;
private final Consumer<TrinoException> exceptionHandler;

public JsonBytesQueryData(ConnectorSession connectorSession, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List<Page> pages)
public JsonBytesQueryData(ConnectorSession connectorSession, Consumer<TrinoException> exceptionHandler, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List<Page> pages)
{
this.connectorSession = requireNonNull(connectorSession, "connectorSession");
this.exceptionHandler = requireNonNull(exceptionHandler, "exceptionHandler is null");
this.typeEncoders = requireNonNull(typeEncoders, "typeEncoders is null");
this.sourcePageChannels = requireNonNull(sourcePageChannels, "sourcePageChannels is null");
this.pages = ImmutableList.copyOf(pages);
}

public void writeTo(JsonGenerator generator)
{
writePagesToJsonGenerator(connectorSession, generator, typeEncoders, sourcePageChannels, pages);
writePagesToJsonGenerator(connectorSession, exceptionHandler, generator, typeEncoders, sourcePageChannels, pages);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,6 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo
}

// Write to a buffer so we can capture and propagate the exception
return new JsonBytesQueryData(session.toConnectorSession(), typeEncoders, sourcePageChannels, rows.getPages());
return new JsonBytesQueryData(session.toConnectorSession(), throwableConsumer, typeEncoders, sourcePageChannels, rows.getPages());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.function.Consumer;

import static com.google.common.base.Verify.verify;
import static io.trino.spi.StandardErrorCode.SERIALIZATION_ERROR;
Expand Down Expand Up @@ -116,7 +117,7 @@ public static TypeEncoder createTypeEncoder(Type type, boolean supportsParametri
};
}

public static void writePagesToJsonGenerator(ConnectorSession connectorSession, JsonGenerator generator, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List<Page> pages)
public static void writePagesToJsonGenerator(ConnectorSession connectorSession, Consumer<TrinoException> throwableConsumer, JsonGenerator generator, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List<Page> pages)
{
verify(typeEncoders.length == sourcePageChannels.length, "Source page channels and type encoders must have the same length");
try {
Expand All @@ -135,7 +136,7 @@ public static void writePagesToJsonGenerator(ConnectorSession connectorSession,
generator.flush(); // final flush to have the data written to the output stream
}
catch (Exception e) {
throw new TrinoException(SERIALIZATION_ERROR, "Could not serialize data to JSON", e);
throwableConsumer.accept(new TrinoException(SERIALIZATION_ERROR, "Could not serialize data to JSON", e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public DataAttributes encodeTo(OutputStream output, List<Page> pages)
JsonFactory jsonFactory = jsonFactory();
ConnectorSession connectorSession = session.toConnectorSession();
try (CountingOutputStream wrapper = new CountingOutputStream(output); JsonGenerator generator = jsonFactory.createGenerator(wrapper)) {
writePagesToJsonGenerator(connectorSession, generator, typeEncoders, sourcePageChannels, pages);
writePagesToJsonGenerator(connectorSession, e -> { throw e; }, generator, typeEncoders, sourcePageChannels, pages);
return DataAttributes.builder()
.set(SEGMENT_SIZE, toIntExact(wrapper.getCount()))
.build();
Expand Down

0 comments on commit 9c970f0

Please sign in to comment.