From 9c970f08da016d88821004a9ba9e3a7097d83b05 Mon Sep 17 00:00:00 2001 From: "Mateusz \"Serafin\" Gajewski" Date: Thu, 31 Oct 2024 14:26:33 +0100 Subject: [PATCH] Capture and propagate serialization exceptions --- .../java/io/trino/server/protocol/JsonBytesQueryData.java | 8 ++++++-- .../trino/server/protocol/JsonBytesQueryDataProducer.java | 2 +- .../java/io/trino/server/protocol/JsonEncodingUtils.java | 5 +++-- .../protocol/spooling/encoding/JsonQueryDataEncoder.java | 2 +- 4 files changed, 11 insertions(+), 6 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryData.java b/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryData.java index 2692c636461a0..5df81885b650c 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryData.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryData.java @@ -18,9 +18,11 @@ import io.trino.client.QueryData; import io.trino.server.protocol.JsonEncodingUtils.TypeEncoder; import io.trino.spi.Page; +import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import java.util.List; +import java.util.function.Consumer; import static io.trino.server.protocol.JsonEncodingUtils.writePagesToJsonGenerator; import static java.util.Objects.requireNonNull; @@ -32,10 +34,12 @@ public class JsonBytesQueryData private final TypeEncoder[] typeEncoders; private final int[] sourcePageChannels; private final List pages; + private final Consumer exceptionHandler; - public JsonBytesQueryData(ConnectorSession connectorSession, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List pages) + public JsonBytesQueryData(ConnectorSession connectorSession, Consumer exceptionHandler, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List pages) { this.connectorSession = requireNonNull(connectorSession, "connectorSession"); + this.exceptionHandler = requireNonNull(exceptionHandler, "exceptionHandler is null"); this.typeEncoders = requireNonNull(typeEncoders, "typeEncoders is null"); this.sourcePageChannels = requireNonNull(sourcePageChannels, "sourcePageChannels is null"); this.pages = ImmutableList.copyOf(pages); @@ -43,7 +47,7 @@ public JsonBytesQueryData(ConnectorSession connectorSession, TypeEncoder[] typeE public void writeTo(JsonGenerator generator) { - writePagesToJsonGenerator(connectorSession, generator, typeEncoders, sourcePageChannels, pages); + writePagesToJsonGenerator(connectorSession, exceptionHandler, generator, typeEncoders, sourcePageChannels, pages); } @Override diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryDataProducer.java b/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryDataProducer.java index 34c2b65842b94..c7216f9a85e9b 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryDataProducer.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/JsonBytesQueryDataProducer.java @@ -50,6 +50,6 @@ public QueryData produce(ExternalUriInfo uriInfo, Session session, QueryResultRo } // Write to a buffer so we can capture and propagate the exception - return new JsonBytesQueryData(session.toConnectorSession(), typeEncoders, sourcePageChannels, rows.getPages()); + return new JsonBytesQueryData(session.toConnectorSession(), throwableConsumer, typeEncoders, sourcePageChannels, rows.getPages()); } } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/JsonEncodingUtils.java b/core/trino-main/src/main/java/io/trino/server/protocol/JsonEncodingUtils.java index 10e4b5634ae6e..8783fc641e423 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/JsonEncodingUtils.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/JsonEncodingUtils.java @@ -50,6 +50,7 @@ import java.io.IOException; import java.math.BigDecimal; import java.util.List; +import java.util.function.Consumer; import static com.google.common.base.Verify.verify; import static io.trino.spi.StandardErrorCode.SERIALIZATION_ERROR; @@ -116,7 +117,7 @@ public static TypeEncoder createTypeEncoder(Type type, boolean supportsParametri }; } - public static void writePagesToJsonGenerator(ConnectorSession connectorSession, JsonGenerator generator, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List pages) + public static void writePagesToJsonGenerator(ConnectorSession connectorSession, Consumer throwableConsumer, JsonGenerator generator, TypeEncoder[] typeEncoders, int[] sourcePageChannels, List pages) { verify(typeEncoders.length == sourcePageChannels.length, "Source page channels and type encoders must have the same length"); try { @@ -135,7 +136,7 @@ public static void writePagesToJsonGenerator(ConnectorSession connectorSession, generator.flush(); // final flush to have the data written to the output stream } catch (Exception e) { - throw new TrinoException(SERIALIZATION_ERROR, "Could not serialize data to JSON", e); + throwableConsumer.accept(new TrinoException(SERIALIZATION_ERROR, "Could not serialize data to JSON", e)); } } diff --git a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/JsonQueryDataEncoder.java b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/JsonQueryDataEncoder.java index 8372296b8198c..81b58988f9170 100644 --- a/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/JsonQueryDataEncoder.java +++ b/core/trino-main/src/main/java/io/trino/server/protocol/spooling/encoding/JsonQueryDataEncoder.java @@ -62,7 +62,7 @@ public DataAttributes encodeTo(OutputStream output, List pages) JsonFactory jsonFactory = jsonFactory(); ConnectorSession connectorSession = session.toConnectorSession(); try (CountingOutputStream wrapper = new CountingOutputStream(output); JsonGenerator generator = jsonFactory.createGenerator(wrapper)) { - writePagesToJsonGenerator(connectorSession, generator, typeEncoders, sourcePageChannels, pages); + writePagesToJsonGenerator(connectorSession, e -> { throw e; }, generator, typeEncoders, sourcePageChannels, pages); return DataAttributes.builder() .set(SEGMENT_SIZE, toIntExact(wrapper.getCount())) .build();