diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java index bf378f654eb94..c170ef8a7777c 100644 --- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java +++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java @@ -15,12 +15,16 @@ import com.google.api.core.ApiFuture; import com.google.cloud.bigquery.storage.v1.AppendRowsResponse; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest; +import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse; import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest; +import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError; import com.google.cloud.bigquery.storage.v1.JsonStreamWriter; import com.google.cloud.bigquery.storage.v1.TableName; import com.google.cloud.bigquery.storage.v1.WriteStream; import com.google.common.collect.ImmutableList; +import com.google.protobuf.Descriptors.DescriptorValidationException; import io.airlift.slice.Slice; import io.airlift.slice.Slices; import io.trino.spi.Page; @@ -31,12 +35,14 @@ import org.json.JSONArray; import org.json.JSONObject; +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; -import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED; +import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.PENDING; import static com.google.common.base.Preconditions.checkArgument; import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_BAD_WRITE; import static io.trino.plugin.bigquery.BigQueryTypeUtils.readNativeValue; @@ -49,10 +55,12 @@ public class BigQueryPageSink { private final BigQueryWriteClient client; private final WriteStream writeStream; + private final JsonStreamWriter streamWriter; private final List columnNames; private final List columnTypes; private final ConnectorPageSinkId pageSinkId; private final Optional pageSinkIdColumnName; + private final TableName tableName; public BigQueryPageSink( BigQueryWriteClient client, @@ -73,16 +81,21 @@ public BigQueryPageSink( this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null"); checkArgument(temporaryTableName.isPresent() == pageSinkIdColumnName.isPresent(), "temporaryTableName.isPresent is not equal to pageSinkIdColumn.isPresent"); - TableName tableName = temporaryTableName + tableName = temporaryTableName .map(table -> TableName.of(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), table)) .orElseGet(remoteTableName::toTableName); - // TODO: Consider using PENDING mode - WriteStream stream = WriteStream.newBuilder().setType(COMMITTED).build(); + CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder() .setParent(tableName.toString()) - .setWriteStream(stream) + .setWriteStream(WriteStream.newBuilder().setType(PENDING).build()) .build(); - this.writeStream = client.createWriteStream(createWriteStreamRequest); + writeStream = client.createWriteStream(createWriteStreamRequest); + try { + streamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client).build(); + } + catch (DescriptorValidationException | IOException | InterruptedException e) { + throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to create stream writer", e); + } } @Override @@ -98,27 +111,39 @@ public CompletableFuture appendPage(Page page) batch.put(row); } - insertWithCommitted(batch); + append(batch); return NOT_BLOCKED; } - private void insertWithCommitted(JSONArray batch) + private void append(JSONArray batch) { - try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client).build()) { - ApiFuture future = writer.append(batch); + try { + ApiFuture future = streamWriter.append(batch); AppendRowsResponse response = future.get(); // Throw error if (response.hasError()) { throw new TrinoException(BIGQUERY_BAD_WRITE, format("Response has error: %s", response.getError().getMessage())); } } - catch (Exception e) { - throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to insert rows", e); + catch (IOException | DescriptorValidationException | ExecutionException | InterruptedException | AppendSerializationError e) { + throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to append stream writer", e); } } @Override public CompletableFuture> finish() { + streamWriter.close(); + client.finalizeWriteStream(streamWriter.getStreamName()); + + BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder() + .setParent(tableName.toString()) + .addWriteStreams(writeStream.getName()) + .build(); + BatchCommitWriteStreamsResponse response = client.batchCommitWriteStreams(commitRequest); + if (response.getStreamErrorsCount() > 0) { + throw new TrinoException(BIGQUERY_BAD_WRITE, "Response has error: " + response.getStreamErrorsList()); + } + client.close(); Slice value = Slices.allocate(Long.BYTES); value.setLong(0, pageSinkId.getId()); diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java index b3374bd27f3ca..089f9661bffbc 100644 --- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java +++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java @@ -499,9 +499,9 @@ private SqlDataTypeTest timestampTypeTest(String inputType, String literalPrefix public void testUnsupportedDatetime() { try (TestTable table = new TestTable(getBigQuerySqlExecutor(), "test.unsupported_datetime", "(col datetime)")) { - assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000')", "Failed to insert rows.*"); - assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999')", "Failed to insert rows.*"); - assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000')", "Failed to insert rows.*"); + assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000')", "Failed to append stream writer"); + assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999')", "Failed to append stream writer"); + assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000')", "Failed to append stream writer"); assertThatThrownBy(() -> getBigQuerySqlExecutor().execute("INSERT INTO " + table.getName() + " VALUES (datetime '-0001-01-01 00:00:00.000000')")) .hasMessageContaining("Invalid DATETIME literal"); @@ -642,10 +642,10 @@ private SqlDataTypeTest testTimestampWithTimeZone(String inputType) public void testUnsupportedTimestampWithTimeZone() { try (TestTable table = new TestTable(getBigQuerySqlExecutor(), "test.unsupported_tz", "(col timestamp)")) { - assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')", "Failed to insert rows.*"); - assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000 UTC')", "Failed to insert rows.*"); - assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999 UTC')", "Failed to insert rows.*"); - assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000 UTC')", "Failed to insert rows.*"); + assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')", "Failed to append stream writer"); + assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000 UTC')", "Failed to append stream writer"); + assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999 UTC')", "Failed to append stream writer"); + assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000 UTC')", "Failed to append stream writer"); assertThatThrownBy(() -> getBigQuerySqlExecutor().execute("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')")) .hasMessageContaining("Invalid TIMESTAMP literal");