diff --git a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java
index a6dd78859f7b6..7c58902fbcd6f 100644
--- a/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java
+++ b/plugin/trino-bigquery/src/main/java/io/trino/plugin/bigquery/BigQueryPageSink.java
@@ -15,12 +15,16 @@
 
 import com.google.api.core.ApiFuture;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
 import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
 import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
 import com.google.cloud.bigquery.storage.v1.TableName;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import io.trino.spi.Page;
@@ -31,13 +35,15 @@
 import org.json.JSONArray;
 import org.json.JSONObject;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutionException;
 
-import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
+import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.PENDING;
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_BAD_WRITE;
 import static io.trino.plugin.bigquery.BigQueryTypeUtils.readNativeValue;
@@ -49,12 +55,13 @@ public class BigQueryPageSink
         implements ConnectorPageSink
 {
     private final BigQueryWriteClient client;
-    private final CreateWriteStreamRequest createWriteStreamRequest;
-    private final AtomicReference<WriteStream> writeStream = new AtomicReference<>();
+    private final WriteStream writeStream;
+    private final JsonStreamWriter streamWriter;
     private final List<String> columnNames;
     private final List<Type> columnTypes;
     private final ConnectorPageSinkId pageSinkId;
     private final Optional<String> pageSinkIdColumnName;
+    private final TableName tableName;
 
     public BigQueryPageSink(
             BigQueryWriteClient client,
@@ -75,15 +82,21 @@ public BigQueryPageSink(
         this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null");
         checkArgument(temporaryTableName.isPresent() == pageSinkIdColumnName.isPresent(),
                 "temporaryTableName.isPresent is not equal to pageSinkIdColumn.isPresent");
-        TableName tableName = temporaryTableName
+        tableName = temporaryTableName
                 .map(table -> TableName.of(remoteTableName.projectId(), remoteTableName.datasetName(), table))
                 .orElseGet(remoteTableName::toTableName);
-        // TODO: Consider using PENDING mode
-        WriteStream stream = WriteStream.newBuilder().setType(COMMITTED).build();
-        createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
+
+        CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
                 .setParent(tableName.toString())
-                .setWriteStream(stream)
+                .setWriteStream(WriteStream.newBuilder().setType(PENDING).build())
                 .build();
+        writeStream = client.createWriteStream(createWriteStreamRequest);
+        try {
+            streamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client).build();
+        }
+        catch (DescriptorValidationException | IOException | InterruptedException e) {
+            throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to create stream writer: " + firstNonNull(e.getMessage(), e), e);
+        }
     }
 
     @Override
@@ -99,36 +112,39 @@ public CompletableFuture<?> appendPage(Page page)
             batch.put(row);
         }
 
-        insertWithCommitted(batch);
+        append(batch);
         return NOT_BLOCKED;
     }
 
-    private void insertWithCommitted(JSONArray batch)
+    private void append(JSONArray batch)
     {
-        WriteStream stream = writeStream.updateAndGet(this::getOrCreateWriteStream);
-        try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(stream.getName(), stream.getTableSchema(), client).build()) {
-            ApiFuture<AppendRowsResponse> future = writer.append(batch);
+        try {
+            ApiFuture<AppendRowsResponse> future = streamWriter.append(batch);
             AppendRowsResponse response = future.get();
             // Throw error
             if (response.hasError()) {
                 throw new TrinoException(BIGQUERY_BAD_WRITE, format("Response has error: %s", response.getError().getMessage()));
             }
         }
-        catch (Exception e) {
-            throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to insert rows", e);
-        }
-    }
-
-    private WriteStream getOrCreateWriteStream(WriteStream current)
-    {
-        if (current == null) {
-            return client.createWriteStream(createWriteStreamRequest);
+        catch (IOException | DescriptorValidationException | ExecutionException | InterruptedException | AppendSerializationError e) {
+            throw new TrinoException(BIGQUERY_BAD_WRITE, "Write failed: " + firstNonNull(e.getMessage(), e), e);
         }
-        return current;
     }
 
     @Override
     public CompletableFuture<Collection<Slice>> finish()
     {
+        streamWriter.close();
+        client.finalizeWriteStream(streamWriter.getStreamName());
+
+        BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder()
+                .setParent(tableName.toString())
+                .addWriteStreams(writeStream.getName())
+                .build();
+        BatchCommitWriteStreamsResponse response = client.batchCommitWriteStreams(commitRequest);
+        if (response.getStreamErrorsCount() > 0) {
+            throw new TrinoException(BIGQUERY_BAD_WRITE, "Committing write failed: " + response.getStreamErrorsList());
+        }
+        client.close();
         Slice value = Slices.allocate(Long.BYTES);
         value.setLong(0, pageSinkId.getId());
diff --git a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java
index 9724da1b9b554..19fe7f8f61256 100644
--- a/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java
+++ b/plugin/trino-bigquery/src/test/java/io/trino/plugin/bigquery/BaseBigQueryTypeMapping.java
@@ -500,9 +500,9 @@ private SqlDataTypeTest timestampTypeTest(String inputType, String literalPrefix
     public void testUnsupportedDatetime()
     {
         try (TestTable table = new TestTable(getBigQuerySqlExecutor(), "test.unsupported_datetime", "(col datetime)")) {
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000')", "Failed to insert rows.*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000')", "Write failed: .*");
 
             assertThatThrownBy(() -> getBigQuerySqlExecutor().execute("INSERT INTO " + table.getName() + " VALUES (datetime '-0001-01-01 00:00:00.000000')"))
                     .hasMessageContaining("Invalid DATETIME literal");
@@ -643,10 +643,10 @@ private SqlDataTypeTest testTimestampWithTimeZone(String inputType)
     public void testUnsupportedTimestampWithTimeZone()
    {
         try (TestTable table = new TestTable(getBigQuerySqlExecutor(), "test.unsupported_tz", "(col timestamp)")) {
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000 UTC')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999 UTC')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000 UTC')", "Failed to insert rows.*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000 UTC')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999 UTC')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000 UTC')", "Write failed: .*");
 
             assertThatThrownBy(() -> getBigQuerySqlExecutor().execute("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')"))
                     .hasMessageContaining("Invalid TIMESTAMP literal");
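
Note: the patch above replaces the COMMITTED write stream, where rows become visible as soon as each append succeeds, with a PENDING one, where appended rows are buffered server-side and only become visible once the stream is finalized and batch-committed in finish(), making the whole insert atomic. For reference, below is a minimal standalone sketch of that create/append/finalize/commit lifecycle against the BigQuery Storage Write API; the project, dataset, table, and column names are hypothetical placeholders, and error handling is reduced to bare exceptions rather than the connector's TrinoException wrapping.

import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import org.json.JSONArray;
import org.json.JSONObject;

public class PendingStreamSketch
{
    public static void main(String[] args)
            throws Exception
    {
        // Hypothetical placeholders; not taken from the patch.
        TableName table = TableName.of("my-project", "my_dataset", "my_table");

        try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
            // 1. Create a PENDING stream: appended rows are buffered by the
            //    service and stay invisible to readers until commit.
            WriteStream stream = client.createWriteStream(CreateWriteStreamRequest.newBuilder()
                    .setParent(table.toString())
                    .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                    .build());

            // 2. Append JSON rows through a writer bound to the stream.
            try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(stream.getName(), stream.getTableSchema(), client).build()) {
                JSONArray rows = new JSONArray().put(new JSONObject().put("col", "value"));
                if (writer.append(rows).get().hasError()) {
                    throw new IllegalStateException("append failed");
                }
            }

            // 3. Finalize the stream (no further appends allowed), then commit
            //    atomically; rows from every stream listed in the request
            //    become visible together.
            client.finalizeWriteStream(stream.getName());
            BatchCommitWriteStreamsResponse commit = client.batchCommitWriteStreams(BatchCommitWriteStreamsRequest.newBuilder()
                    .setParent(table.toString())
                    .addWriteStreams(stream.getName())
                    .build());
            if (commit.getStreamErrorsCount() > 0) {
                throw new IllegalStateException("commit failed: " + commit.getStreamErrorsList());
            }
        }
    }
}

Because the commit either publishes all buffered rows or none, a query that fails mid-write leaves no partial rows behind, which is why the page sink can fold its error handling into the single "Write failed:" / "Committing write failed:" pair asserted by the updated tests.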