Skip to content

Commit

Permalink
Use PENDING mode in BigQuery writes
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Feb 28, 2024
1 parent 786cacd commit 40fe929
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,16 @@

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;
import io.trino.spi.Page;
Expand All @@ -31,12 +35,14 @@
import org.json.JSONArray;
import org.json.JSONObject;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.PENDING;
import static com.google.common.base.Preconditions.checkArgument;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_BAD_WRITE;
import static io.trino.plugin.bigquery.BigQueryTypeUtils.readNativeValue;
Expand All @@ -49,10 +55,12 @@ public class BigQueryPageSink
{
private final BigQueryWriteClient client;
private final WriteStream writeStream;
private final JsonStreamWriter streamWriter;
private final List<String> columnNames;
private final List<Type> columnTypes;
private final ConnectorPageSinkId pageSinkId;
private final Optional<String> pageSinkIdColumnName;
private final TableName tableName;

public BigQueryPageSink(
BigQueryWriteClient client,
Expand All @@ -73,16 +81,21 @@ public BigQueryPageSink(
this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null");
checkArgument(temporaryTableName.isPresent() == pageSinkIdColumnName.isPresent(),
"temporaryTableName.isPresent is not equal to pageSinkIdColumn.isPresent");
TableName tableName = temporaryTableName
tableName = temporaryTableName
.map(table -> TableName.of(remoteTableName.getProjectId(), remoteTableName.getDatasetName(), table))
.orElseGet(remoteTableName::toTableName);
// TODO: Consider using PENDING mode
WriteStream stream = WriteStream.newBuilder().setType(COMMITTED).build();

CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
.setParent(tableName.toString())
.setWriteStream(stream)
.setWriteStream(WriteStream.newBuilder().setType(PENDING).build())
.build();
this.writeStream = client.createWriteStream(createWriteStreamRequest);
writeStream = client.createWriteStream(createWriteStreamRequest);
try {
streamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client).build();
}
catch (DescriptorValidationException | IOException | InterruptedException e) {
throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to create stream writer", e);
}
}

@Override
Expand All @@ -98,27 +111,39 @@ public CompletableFuture<?> appendPage(Page page)
batch.put(row);
}

insertWithCommitted(batch);
append(batch);
return NOT_BLOCKED;
}

private void insertWithCommitted(JSONArray batch)
private void append(JSONArray batch)
{
try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client).build()) {
ApiFuture<AppendRowsResponse> future = writer.append(batch);
try {
ApiFuture<AppendRowsResponse> future = streamWriter.append(batch);
AppendRowsResponse response = future.get(); // Throw error
if (response.hasError()) {
throw new TrinoException(BIGQUERY_BAD_WRITE, format("Response has error: %s", response.getError().getMessage()));
}
}
catch (Exception e) {
throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to insert rows", e);
catch (IOException | DescriptorValidationException | ExecutionException | InterruptedException | AppendSerializationError e) {
throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to append stream writer", e);
}
}

@Override
public CompletableFuture<Collection<Slice>> finish()
{
streamWriter.close();
client.finalizeWriteStream(streamWriter.getStreamName());

BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder()
.setParent(tableName.toString())
.addWriteStreams(writeStream.getName())
.build();
BatchCommitWriteStreamsResponse response = client.batchCommitWriteStreams(commitRequest);
if (response.getStreamErrorsCount() > 0) {
throw new TrinoException(BIGQUERY_BAD_WRITE, "Response has error: " + response.getStreamErrorsList());
}

client.close();
Slice value = Slices.allocate(Long.BYTES);
value.setLong(0, pageSinkId.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,9 +499,9 @@ private SqlDataTypeTest timestampTypeTest(String inputType, String literalPrefix
public void testUnsupportedDatetime()
{
try (TestTable table = new TestTable(getBigQuerySqlExecutor(), "test.unsupported_datetime", "(col datetime)")) {
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000')", "Failed to insert rows.*");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999')", "Failed to insert rows.*");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000')", "Failed to insert rows.*");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000')", "Failed to append stream writer");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999')", "Failed to append stream writer");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000')", "Failed to append stream writer");

assertThatThrownBy(() -> getBigQuerySqlExecutor().execute("INSERT INTO " + table.getName() + " VALUES (datetime '-0001-01-01 00:00:00.000000')"))
.hasMessageContaining("Invalid DATETIME literal");
Expand Down Expand Up @@ -642,10 +642,10 @@ private SqlDataTypeTest testTimestampWithTimeZone(String inputType)
public void testUnsupportedTimestampWithTimeZone()
{
try (TestTable table = new TestTable(getBigQuerySqlExecutor(), "test.unsupported_tz", "(col timestamp)")) {
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')", "Failed to insert rows.*");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000 UTC')", "Failed to insert rows.*");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999 UTC')", "Failed to insert rows.*");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000 UTC')", "Failed to insert rows.*");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')", "Failed to append stream writer");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000 UTC')", "Failed to append stream writer");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999 UTC')", "Failed to append stream writer");
assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000 UTC')", "Failed to append stream writer");

assertThatThrownBy(() -> getBigQuerySqlExecutor().execute("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')"))
.hasMessageContaining("Invalid TIMESTAMP literal");
Expand Down

0 comments on commit 40fe929

Please sign in to comment.