SNOW-1151484 Use insertRows for schema evolution #866

Draft: wants to merge 2 commits into master
@@ -507,8 +507,6 @@ public InsertRowsResponse insertRecords(StreamingBuffer streamingBufferToInsert)
InsertRowsResponse response = null;
try {
response = insertRowsWithFallback(streamingBufferToInsert);
// Updates the flush time (last time we called insertRows API)
this.previousFlushTimeStampMs = System.currentTimeMillis();

LOGGER.info(
"Successfully called insertRows for channel:{}, buffer:{}, insertResponseHasErrors:{},"
@@ -517,17 +515,25 @@ public InsertRowsResponse insertRecords(StreamingBuffer streamingBufferToInsert)
streamingBufferToInsert,
response.hasErrors(),
response.needToResetOffset());
if (response.hasErrors()) {
handleInsertRowsFailures(
response.getInsertErrors(), streamingBufferToInsert.getSinkRecords());
}

// Due to schema evolution, we may need to reopen the channel and reset the offset in kafka
// since it's possible that not all rows are ingested
if (response.needToResetOffset()) {
streamingApiFallbackSupplier(
StreamingApiFallbackInvoker.INSERT_ROWS_SCHEMA_EVOLUTION_FALLBACK);
}
// If there are errors other than schema mismatch, we need to handle them and reinsert the
// good rows
if (response.hasErrors()) {
handleInsertRowsFailures(
response.getInsertErrors(), streamingBufferToInsert.getSinkRecords());
insertRecords(
Contributor Author: do we need to reset the offset if we put the schema evolution records here as well? @sfc-gh-tzhang

Contributor: I don't think so; why would we need to do that?

Contributor Author: we set the needToResetOffset flag to true in case of a schema evolution error. I'm not sure exactly what it does, and whether we still need it now that we reinsert the rows by rebuilding the buffer with rebuildBufferWithoutErrorRows.

Contributor: if you look at how needToResetOffset is used: when it's true, we reopen the channel and restart from the latest committed offset token.

rebuildBufferWithoutErrorRows(streamingBufferToInsert, response.getInsertErrors()));
}

// Updates the flush time (last time we successfully inserted some rows)
this.previousFlushTimeStampMs = System.currentTimeMillis();

return response;
} catch (TopicPartitionChannelInsertionException ex) {
// Suppressing the exception because other channels might still continue to ingest
@@ -540,6 +546,22 @@ public InsertRowsResponse insertRecords(StreamingBuffer streamingBufferToInsert)
return response;
}
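As the review thread above explains, when needToResetOffset is true the connector reopens the channel and restarts from the latest committed offset token, so Kafka redelivers the rows that were not ingested, including the ones held back for schema evolution. Below is a minimal, self-contained sketch of that restart arithmetic against the standard Kafka Connect API; the class and method names are hypothetical, and this is not the connector's actual fallback code.

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

// Hypothetical illustration of the offset reset described in the review thread;
// in the connector the real logic sits behind streamingApiFallbackSupplier.
final class OffsetResetSketch {

  /** Rewinds the Kafka consumer so every record after the last committed row is redelivered. */
  static void rewindToCommitted(
      SinkTaskContext context, TopicPartition topicPartition, String latestCommittedOffsetToken) {
    // No committed token yet: restart from 0 (for the purpose of this sketch);
    // otherwise restart at the offset right after the last committed one.
    long restartOffset =
        latestCommittedOffsetToken == null
            ? 0L
            : Long.parseLong(latestCommittedOffsetToken) + 1L;
    // Kafka Connect seeks the consumer for this partition, so the skipped rows,
    // including the schema-evolution rows, come back in a later put() call.
    context.offset(topicPartition, restartOffset);
  }
}
```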

/** Builds a new buffer containing only the good rows from the original buffer. */
private StreamingBuffer rebuildBufferWithoutErrorRows(
Contributor: let's add a unit test for this function.

StreamingBuffer streamingBufferToInsert,
List<InsertValidationResponse.InsertError> insertErrors) {
StreamingBuffer buffer = new StreamingBuffer();
int errorIdx = 0;
for (long rowIdx = 0; rowIdx < streamingBufferToInsert.getNumOfRecords(); rowIdx++) {
if (errorIdx < insertErrors.size() && rowIdx == insertErrors.get(errorIdx).getRowIndex()) {
errorIdx++;
} else {
buffer.insert(streamingBufferToInsert.getSinkRecord(rowIdx));
}
}
return buffer;
}
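The method above walks the rows and the insert errors in lockstep and keeps only the rows whose index is not reported in an error; note that this relies on the insert errors being ordered by ascending row index. A standalone sketch of that walk on plain lists, which could serve as a starting point for the unit test requested above (names here are hypothetical, and the connector's StreamingBuffer is not used):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone illustration of the "keep only the good rows" walk used by
// rebuildBufferWithoutErrorRows: rows are plain strings and the error positions
// are a list of row indexes sorted in ascending order. Names are hypothetical.
public class RebuildWithoutErrorRowsSketch {

  /** Returns the rows whose index does not appear in errorRowIndexes (sorted ascending). */
  static List<String> keepGoodRows(List<String> rows, List<Long> errorRowIndexes) {
    List<String> good = new ArrayList<>();
    int errorIdx = 0;
    for (long rowIdx = 0; rowIdx < rows.size(); rowIdx++) {
      if (errorIdx < errorRowIndexes.size() && rowIdx == errorRowIndexes.get(errorIdx)) {
        errorIdx++; // this row failed validation; skip it and move to the next expected error
      } else {
        good.add(rows.get((int) rowIdx));
      }
    }
    return good;
  }

  public static void main(String[] args) {
    // Rows 1 and 3 failed, so only rows 0, 2 and 4 survive the rebuild.
    List<String> rebuilt =
        keepGoodRows(Arrays.asList("r0", "r1", "r2", "r3", "r4"), Arrays.asList(1L, 3L));
    System.out.println(rebuilt); // prints [r0, r2, r4]
  }
}
```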

/**
* Uses {@link Fallback} API to reopen the channel if insertRows throws {@link SFException}.
*
@@ -620,65 +642,40 @@ public InsertRowsResponse get() throws Throwable {
this.insertRowsStreamingBuffer);
Pair<List<Map<String, Object>>, List<Long>> recordsAndOffsets =
this.insertRowsStreamingBuffer.getData();
List<Map<String, Object>> records = recordsAndOffsets.getKey();
List<Long> offsets = recordsAndOffsets.getValue();
InsertValidationResponse finalResponse = new InsertValidationResponse();
List<Map<String, Object>> records = recordsAndOffsets.getKey();
boolean needToResetOffset = false;
InsertValidationResponse response =
this.channel.insertRows(
records,
Long.toString(this.insertRowsStreamingBuffer.getFirstOffset()),
Long.toString(this.insertRowsStreamingBuffer.getLastOffset()));
if (!enableSchemaEvolution) {
finalResponse =
this.channel.insertRows(
records,
Long.toString(this.insertRowsStreamingBuffer.getFirstOffset()),
Long.toString(this.insertRowsStreamingBuffer.getLastOffset()));
finalResponse = response;
} else {
for (int idx = 0; idx < records.size(); idx++) {
// For schema evolution, we need to call the insertRows API row by row in order to
// preserve the original order, for anything after the first schema mismatch error we will
// retry after the evolution
InsertValidationResponse response =
this.channel.insertRow(records.get(idx), Long.toString(offsets.get(idx)));
if (response.hasErrors()) {
InsertValidationResponse.InsertError insertError = response.getInsertErrors().get(0);
List<String> extraColNames = insertError.getExtraColNames();

List<String> missingNotNullColNames = insertError.getMissingNotNullColNames();
Set<String> nonNullableColumns =
new HashSet<>(
missingNotNullColNames != null
? missingNotNullColNames
: Collections.emptySet());

List<String> nullValueForNotNullColNames = insertError.getNullValueForNotNullColNames();
nonNullableColumns.addAll(
nullValueForNotNullColNames != null
? nullValueForNotNullColNames
: Collections.emptySet());

long originalSinkRecordIdx =
offsets.get(idx) - this.insertRowsStreamingBuffer.getFirstOffset();

if (extraColNames == null && nonNullableColumns.isEmpty()) {
InsertValidationResponse.InsertError newInsertError =
new InsertValidationResponse.InsertError(
insertError.getRowContent(), originalSinkRecordIdx);
newInsertError.setException(insertError.getException());
newInsertError.setExtraColNames(insertError.getExtraColNames());
newInsertError.setMissingNotNullColNames(insertError.getMissingNotNullColNames());
newInsertError.setNullValueForNotNullColNames(
insertError.getNullValueForNotNullColNames());
// Simply added to the final response if it's not schema related errors
finalResponse.addError(insertError);
} else {
SchematizationUtils.evolveSchemaIfNeeded(
this.conn,
this.channel.getTableName(),
new ArrayList<>(nonNullableColumns),
extraColNames,
this.insertRowsStreamingBuffer.getSinkRecord(originalSinkRecordIdx));
// Offset reset needed since it's possible that we successfully ingested partial batch
needToResetOffset = true;
break;
}
for (InsertValidationResponse.InsertError insertError : response.getInsertErrors()) {
List<String> extraColNames = insertError.getExtraColNames();
List<String> missingNotNullColNames = insertError.getMissingNotNullColNames();
Set<String> nonNullableColumns =
new HashSet<>(
missingNotNullColNames != null ? missingNotNullColNames : Collections.emptySet());

List<String> nullValueForNotNullColNames = insertError.getNullValueForNotNullColNames();
nonNullableColumns.addAll(
nullValueForNotNullColNames != null
? nullValueForNotNullColNames
: Collections.emptySet());
if (extraColNames != null || !nonNullableColumns.isEmpty()) {
SchematizationUtils.evolveSchemaIfNeeded(
this.conn,
this.channel.getTableName(),
new ArrayList<>(nonNullableColumns),
extraColNames,
this.insertRowsStreamingBuffer.getSinkRecord(insertError.getRowIndex()));
needToResetOffset = true;
} else {
// Simply add the error to the final response if it's not a schema-related error
finalResponse.addError(insertError);
Contributor Author: this filters out schema evolution errors from the list of errors. This is the only difference compared with your old PR. @sfc-gh-tzhang

Contributor: In my old PR I don't have finalResponse anymore. I simply return response when there are no schema-evolution-related errors, and that response should already include all the errors. Do you see any issue with that?

Contributor Author: without finalResponse, the response will contain the schema evolution errors as well. That means handleInsertRowsFailures will put the schema evolution errors in the DLQ, which is a behaviour change. Additionally, rebuildBufferWithoutErrorRows will filter out the rows with schema evolution errors, so such rows will never be inserted, right? We don't want that behaviour.

Contributor:
> That means handleInsertRowsFailures will put the schema evolution errors in the DLQ, which is a behaviour change.

In my old PR, we return the response before calling handleInsertRowsFailures, right?

> Additionally, rebuildBufferWithoutErrorRows will filter out the rows with schema evolution errors, so such rows will never be inserted, right?

Resetting the offset token takes care of that: Kafka will send us the same batch again.

}
}
}
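The thread above turns on which insert errors count as schema related. The decision in the hunk above, condensed into a standalone predicate (a sketch with hypothetical names, not the connector's code): an error triggers schema evolution and a later offset reset only when it reports extra columns or non-nullable columns; everything else stays in finalResponse and ends up on the DLQ path.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Condensed restatement of the classification performed error by error above.
// Class and method names are hypothetical.
public class SchemaErrorClassificationSketch {

  /** True when the error is schema related (extra columns or non-nullable columns reported). */
  static boolean isSchemaRelated(
      List<String> extraColNames,
      List<String> missingNotNullColNames,
      List<String> nullValueForNotNullColNames) {
    Set<String> nonNullableColumns =
        new HashSet<>(
            missingNotNullColNames != null ? missingNotNullColNames : Collections.emptySet());
    nonNullableColumns.addAll(
        nullValueForNotNullColNames != null
            ? nullValueForNotNullColNames
            : Collections.emptySet());
    return extraColNames != null || !nonNullableColumns.isEmpty();
  }

  public static void main(String[] args) {
    // A new "gender" column in the record: schema related, so evolve the table and reset the offset.
    System.out.println(isSchemaRelated(Collections.singletonList("gender"), null, null)); // true
    // No schema information in the error: a regular failure that is routed to the DLQ path.
    System.out.println(isSchemaRelated(null, null, null)); // false
  }
}
```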
@@ -998,7 +995,7 @@ private SnowflakeStreamingIngestChannel openChannelForTable() {
.setDBName(this.sfConnectorConfig.get(Utils.SF_DATABASE))
.setSchemaName(this.sfConnectorConfig.get(Utils.SF_SCHEMA))
.setTableName(this.tableName)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
.setOnErrorOption(OpenChannelRequest.OnErrorOption.SKIP_BATCH)
.setOffsetTokenVerificationFunction(StreamingUtils.offsetTokenVerificationFunction)
.build();
LOGGER.info(
@@ -10,10 +10,12 @@
import com.snowflake.kafka.connector.internal.BufferThreshold;
import com.snowflake.kafka.connector.internal.SnowflakeConnectionService;
import com.snowflake.kafka.connector.internal.TestUtils;
import com.snowflake.kafka.connector.internal.streaming.channel.TopicPartitionChannel;
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
import com.snowflake.kafka.connector.records.RecordService;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -27,6 +29,7 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -326,4 +329,107 @@ public void testInsertRows_ValidationResponseHasErrors_ErrorTolerance_ALL() thro

assert kafkaRecordErrorReporter.getReportedRecords().size() == 1;
}

@Test
public void testInsertRowsWithSchemaEvolution() throws Exception {
if (this.sfConnectorConfig
.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)
.equals("true")) {
InsertValidationResponse notSchemaEvolutionErrorResponse = new InsertValidationResponse();
InsertValidationResponse.InsertError notSchemaEvolutionError =
new InsertValidationResponse.InsertError("CONTENT", 0);
notSchemaEvolutionError.setException(SF_EXCEPTION);
notSchemaEvolutionErrorResponse.addError(notSchemaEvolutionError);

InsertValidationResponse schemaEvolutionRecoverableErrorResponse =
new InsertValidationResponse();
InsertValidationResponse.InsertError schemaEvolutionRecoverableError =
new InsertValidationResponse.InsertError("CONTENT", 0);
schemaEvolutionRecoverableError.setException(SF_EXCEPTION);
schemaEvolutionRecoverableError.setExtraColNames(Collections.singletonList("gender"));
schemaEvolutionRecoverableErrorResponse.addError(schemaEvolutionRecoverableError);

Mockito.when(
mockStreamingChannel.insertRows(
ArgumentMatchers.any(), ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenReturn(schemaEvolutionRecoverableErrorResponse)
.thenReturn(notSchemaEvolutionErrorResponse)
.thenReturn(new InsertValidationResponse()); // last insert with correct batch

Mockito.when(mockStreamingChannel.getLatestCommittedOffsetToken()).thenReturn("0");

SnowflakeConnectionService conn = Mockito.mock(SnowflakeConnectionService.class);
Mockito.when(
conn.hasSchemaEvolutionPermission(ArgumentMatchers.any(), ArgumentMatchers.any()))
.thenReturn(true);
Mockito.doNothing()
.when(conn)
.appendColumnsToTable(ArgumentMatchers.any(), ArgumentMatchers.any());

long bufferFlushTimeSeconds = 5L;
StreamingBufferThreshold bufferThreshold =
new StreamingBufferThreshold(bufferFlushTimeSeconds, 1_000 /* < 1KB */, 10000000L);

Map<String, String> sfConnectorConfigWithErrors = new HashMap<>(sfConnectorConfig);
sfConnectorConfigWithErrors.put(
ERRORS_TOLERANCE_CONFIG, SnowflakeSinkConnectorConfig.ErrorTolerance.ALL.toString());
sfConnectorConfigWithErrors.put(ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG, "test_DLQ");
InMemoryKafkaRecordErrorReporter kafkaRecordErrorReporter =
new InMemoryKafkaRecordErrorReporter();

TopicPartitionChannel topicPartitionChannel =
new BufferedTopicPartitionChannel(
mockStreamingClient,
topicPartition,
TEST_CHANNEL_NAME,
TEST_TABLE_NAME,
this.enableSchematization,
bufferThreshold,
sfConnectorConfigWithErrors,
kafkaRecordErrorReporter,
mockSinkTaskContext,
conn,
new RecordService(),
mockTelemetryService,
false,
null);

final int noOfRecords = 3;
List<SinkRecord> records =
TestUtils.createNativeJsonSinkRecords(0, noOfRecords, TOPIC, PARTITION);

for (int idx = 0; idx < records.size(); idx++) {
topicPartitionChannel.insertRecord(records.get(idx), idx == 0);
}

// In an ideal world, the put API would invoke this to check whether the flush time threshold
// has been reached. We mimic that call here after sleeping slightly past the flush time.
Thread.sleep(bufferFlushTimeSeconds * 1000 + 10);

topicPartitionChannel.insertBufferedRecordsIfFlushTimeThresholdReached();

// Verify that the buffer is cleaned up and nothing went to the DLQ because of the schematization error
Assert.assertTrue(topicPartitionChannel.isPartitionBufferEmpty());
Assert.assertEquals(0, kafkaRecordErrorReporter.getReportedRecords().size());

// Do it again without any schematization error, and we should have a row in the DLQ
for (int idx = 0; idx < records.size(); idx++) {
topicPartitionChannel.insertRecord(records.get(idx), idx == 0);
}

// In an ideal world, the put API would invoke this to check whether the flush time threshold
// has been reached. We mimic that call here after sleeping slightly past the flush time.
Thread.sleep(bufferFlushTimeSeconds * 1000 + 10);

topicPartitionChannel.insertBufferedRecordsIfFlushTimeThresholdReached();

// Verify that the buffer is cleaned up and one record is in the DLQ
Assert.assertTrue(topicPartitionChannel.isPartitionBufferEmpty());
Assert.assertEquals(1, kafkaRecordErrorReporter.getReportedRecords().size());
}
}
}
Original file line number Diff line number Diff line change
@@ -806,10 +806,11 @@ public void testInsertRows_SuccessAfterReopenChannel() throws Exception {
}

@Test
public void testInsertRowsWithSchemaEvolution() throws Exception {
public void testInsertRowsWithSchemaEvolution_onlySingleBuffer() throws Exception {
if (this.sfConnectorConfig
.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)
.equals("true")) {
.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)
.equals("true")
&& !useDoubleBuffer) {
Contributor: I'm not sure what needs to be updated without the double buffer now.

InsertValidationResponse validationResponse1 = new InsertValidationResponse();
InsertValidationResponse.InsertError insertError1 =
new InsertValidationResponse.InsertError("CONTENT", 0);