Use PENDING mode in BigQuery writes
ebyhr committed Nov 6, 2024
1 parent 8f66aef commit 3cb1e24
Showing 2 changed files with 47 additions and 31 deletions.
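In the BigQuery Storage Write API, a COMMITTED stream makes rows visible as soon as each append succeeds, so a failed INSERT can leave partial rows behind. A PENDING stream instead buffers appends invisibly until the stream is finalized and batch-committed, which makes the whole write atomic. The lifecycle the new code follows is roughly the sketch below, assembled from the same client calls that appear in the diff; the project, dataset, table, and column names are placeholders, and error handling is trimmed.

import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import org.json.JSONArray;
import org.json.JSONObject;

public class PendingWriteSketch
{
    public static void main(String[] args)
            throws Exception
    {
        // Placeholders: swap in a real project, dataset, and table
        TableName tableName = TableName.of("my-project", "my_dataset", "my_table");

        try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
            // 1. Create the stream in PENDING mode; appended rows stay invisible
            WriteStream stream = client.createWriteStream(CreateWriteStreamRequest.newBuilder()
                    .setParent(tableName.toString())
                    .setWriteStream(WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                    .build());

            // 2. Append batches through one long-lived writer (one per page sink in this commit)
            try (JsonStreamWriter writer = JsonStreamWriter
                    .newBuilder(stream.getName(), stream.getTableSchema(), client)
                    .build()) {
                JSONArray batch = new JSONArray().put(new JSONObject().put("col", 1));
                AppendRowsResponse response = writer.append(batch).get();
                if (response.hasError()) {
                    throw new IllegalStateException(response.getError().getMessage());
                }
            }

            // 3. Finalize the stream (no more appends), then commit it atomically
            client.finalizeWriteStream(stream.getName());
            BatchCommitWriteStreamsResponse commit = client.batchCommitWriteStreams(
                    BatchCommitWriteStreamsRequest.newBuilder()
                            .setParent(tableName.toString())
                            .addWriteStreams(stream.getName())
                            .build());
            if (commit.getStreamErrorsCount() > 0) {
                throw new IllegalStateException(commit.getStreamErrorsList().toString());
            }
        }
    }
}

Nothing is visible until batchCommitWriteStreams succeeds, so the sink can fail at any earlier point without cleanup. That is also what lets the diff drop the old "TODO: Consider using PENDING mode" comment and replace the per-batch JsonStreamWriter with a single writer created in the constructor.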
BigQueryPageSink.java

@@ -15,12 +15,16 @@
 
 import com.google.api.core.ApiFuture;
 import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
+import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
 import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
 import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
+import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
 import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
 import com.google.cloud.bigquery.storage.v1.TableName;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
 import com.google.common.collect.ImmutableList;
+import com.google.protobuf.Descriptors.DescriptorValidationException;
 import io.airlift.slice.Slice;
 import io.airlift.slice.Slices;
 import io.trino.spi.Page;
@@ -31,13 +31,15 @@
 import org.json.JSONArray;
 import org.json.JSONObject;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.ExecutionException;
 
-import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.COMMITTED;
+import static com.google.cloud.bigquery.storage.v1.WriteStream.Type.PENDING;
+import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_BAD_WRITE;
 import static io.trino.plugin.bigquery.BigQueryTypeUtils.readNativeValue;
@@ -49,12 +55,13 @@ public class BigQueryPageSink
         implements ConnectorPageSink
 {
     private final BigQueryWriteClient client;
-    private final CreateWriteStreamRequest createWriteStreamRequest;
-    private final AtomicReference<WriteStream> writeStream = new AtomicReference<>();
+    private final WriteStream writeStream;
+    private final JsonStreamWriter streamWriter;
     private final List<String> columnNames;
     private final List<Type> columnTypes;
     private final ConnectorPageSinkId pageSinkId;
     private final Optional<String> pageSinkIdColumnName;
+    private final TableName tableName;
 
     public BigQueryPageSink(
             BigQueryWriteClient client,
@@ -75,15 +82,21 @@ public BigQueryPageSink(
         this.pageSinkIdColumnName = requireNonNull(pageSinkIdColumnName, "pageSinkIdColumnName is null");
         checkArgument(temporaryTableName.isPresent() == pageSinkIdColumnName.isPresent(),
                 "temporaryTableName.isPresent is not equal to pageSinkIdColumn.isPresent");
-        TableName tableName = temporaryTableName
+        tableName = temporaryTableName
                 .map(table -> TableName.of(remoteTableName.projectId(), remoteTableName.datasetName(), table))
                 .orElseGet(remoteTableName::toTableName);
-        // TODO: Consider using PENDING mode
-        WriteStream stream = WriteStream.newBuilder().setType(COMMITTED).build();
-        createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
+
+        CreateWriteStreamRequest createWriteStreamRequest = CreateWriteStreamRequest.newBuilder()
                 .setParent(tableName.toString())
-                .setWriteStream(stream)
+                .setWriteStream(WriteStream.newBuilder().setType(PENDING).build())
                 .build();
+        writeStream = client.createWriteStream(createWriteStreamRequest);
+        try {
+            streamWriter = JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema(), client).build();
+        }
+        catch (DescriptorValidationException | IOException | InterruptedException e) {
+            throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to create stream writer: " + firstNonNull(e.getMessage(), e), e);
+        }
     }
 
     @Override
@@ -99,36 +112,39 @@ public CompletableFuture<?> appendPage(Page page)
             batch.put(row);
         }
 
-        insertWithCommitted(batch);
+        append(batch);
         return NOT_BLOCKED;
     }
 
-    private void insertWithCommitted(JSONArray batch)
+    private void append(JSONArray batch)
     {
-        WriteStream stream = writeStream.updateAndGet(this::getOrCreateWriteStream);
-        try (JsonStreamWriter writer = JsonStreamWriter.newBuilder(stream.getName(), stream.getTableSchema(), client).build()) {
-            ApiFuture<AppendRowsResponse> future = writer.append(batch);
+        try {
+            ApiFuture<AppendRowsResponse> future = streamWriter.append(batch);
            AppendRowsResponse response = future.get(); // Throw error
            if (response.hasError()) {
                throw new TrinoException(BIGQUERY_BAD_WRITE, format("Response has error: %s", response.getError().getMessage()));
            }
        }
-        catch (Exception e) {
-            throw new TrinoException(BIGQUERY_BAD_WRITE, "Failed to insert rows", e);
-        }
-    }
-
-    private WriteStream getOrCreateWriteStream(WriteStream current)
-    {
-        if (current == null) {
-            return client.createWriteStream(createWriteStreamRequest);
+        catch (IOException | DescriptorValidationException | ExecutionException | InterruptedException | AppendSerializationError e) {
+            throw new TrinoException(BIGQUERY_BAD_WRITE, "Write failed: " + firstNonNull(e.getMessage(), e), e);
        }
-        return current;
    }
 
    @Override
    public CompletableFuture<Collection<Slice>> finish()
    {
+        streamWriter.close();
+        client.finalizeWriteStream(streamWriter.getStreamName());
+
+        BatchCommitWriteStreamsRequest commitRequest = BatchCommitWriteStreamsRequest.newBuilder()
+                .setParent(tableName.toString())
+                .addWriteStreams(writeStream.getName())
+                .build();
+        BatchCommitWriteStreamsResponse response = client.batchCommitWriteStreams(commitRequest);
+        if (response.getStreamErrorsCount() > 0) {
+            throw new TrinoException(BIGQUERY_BAD_WRITE, "Committing write failed: " + response.getStreamErrorsList());
+        }
+
        client.close();
        Slice value = Slices.allocate(Long.BYTES);
        value.setLong(0, pageSinkId.getId());
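The new append() method catches the concrete failure modes of JsonStreamWriter.append() rather than a blanket Exception, and the wrapped message becomes "Write failed: ..." instead of "Failed to insert rows"; the test expectations in the second changed file below are updated to match. One of the caught types, AppendSerializationError, also carries row-level detail that a caller could surface. A minimal sketch, assuming the storage library's getRowIndexToErrorMessage() accessor (an assumption on my part; the commit itself only wraps the exception):

import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializationError;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import org.json.JSONArray;

public class AppendErrorSketch
{
    // Append a batch and, on serialization failure, report which rows were bad
    static void appendOrExplain(JsonStreamWriter writer, JSONArray batch)
            throws Exception
    {
        try {
            writer.append(batch).get();
        }
        catch (AppendSerializationError e) {
            // Assumption: the error maps each offending row's index within the
            // batch to a message describing why that row failed to serialize
            e.getRowIndexToErrorMessage().forEach((rowIndex, message) ->
                    System.err.printf("row %d rejected: %s%n", rowIndex, message));
            throw e;
        }
    }
}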
@@ -500,9 +500,9 @@ private SqlDataTypeTest timestampTypeTest(String inputType, String literalPrefix
     public void testUnsupportedDatetime()
     {
         try (TestTable table = new TestTable(getBigQuerySqlExecutor(), "test.unsupported_datetime", "(col datetime)")) {
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000')", "Failed to insert rows.*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000')", "Write failed: .*");
 
             assertThatThrownBy(() -> getBigQuerySqlExecutor().execute("INSERT INTO " + table.getName() + " VALUES (datetime '-0001-01-01 00:00:00.000000')"))
                     .hasMessageContaining("Invalid DATETIME literal");
@@ -643,10 +643,10 @@ private SqlDataTypeTest testTimestampWithTimeZone(String inputType)
     public void testUnsupportedTimestampWithTimeZone()
     {
         try (TestTable table = new TestTable(getBigQuerySqlExecutor(), "test.unsupported_tz", "(col timestamp)")) {
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000 UTC')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999 UTC')", "Failed to insert rows.*");
-            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000 UTC')", "Failed to insert rows.*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '-0001-01-01 00:00:00.000000 UTC')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '0000-12-31 23:59:59.999999 UTC')", "Write failed: .*");
+            assertQueryFails("INSERT INTO " + table.getName() + " VALUES (timestamp '10000-01-01 00:00:00.000000 UTC')", "Write failed: .*");
 
             assertThatThrownBy(() -> getBigQuerySqlExecutor().execute("INSERT INTO " + table.getName() + " VALUES (timestamp '-2021-09-07 23:59:59.999999 UTC')"))
                     .hasMessageContaining("Invalid TIMESTAMP literal");
