SNOW-1151484 Use insertRows for schema evolution #866

Draft: wants to merge 2 commits into master

Conversation

@sfc-gh-wtrefon (Contributor) commented Jun 13, 2024

Overview

SNOW-1161484

Mostly a copy of old PR #796.

Create the channel with SKIP_BATCH instead of CONTINUE, to avoid the case where KC crashes right after adding the good rows to the table and the bad rows end up missing from the DLQ.
Update the schematization code to use insertRows instead of insertRow (see the sketch after the list below):

  • Better performance
  • If KC is configured with a longer buffer flush time, everything will be ingested as one batch, so the 1 second flush and 32 MB channel size limits won't apply
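
A rough sketch of both changes, using types from the snowflake-ingest-sdk; the class and method names below (openChannelWithSkipBatch, insertBuffer) are placeholders for illustration, not the connector's actual code:

import java.util.List;
import java.util.Map;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import net.snowflake.ingest.streaming.OpenChannelRequest;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestChannel;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;

class SchematizationBatchInsertSketch {
  // Open the channel with SKIP_BATCH: if any row in a batch fails, no row from
  // that batch is ingested, so a KC crash cannot leave the good rows committed
  // while the bad rows never reached the DLQ.
  static SnowflakeStreamingIngestChannel openChannelWithSkipBatch(
      SnowflakeStreamingIngestClient client, String db, String schema, String table, String name) {
    OpenChannelRequest request =
        OpenChannelRequest.builder(name)
            .setDBName(db)
            .setSchemaName(schema)
            .setTableName(table)
            .setOnErrorOption(OpenChannelRequest.OnErrorOption.SKIP_BATCH)
            .build();
    return client.openChannel(request);
  }

  // Ingest the whole buffer with a single insertRows call instead of looping
  // over insertRow, so one KC buffer maps to one batch in the channel.
  static InsertValidationResponse insertBuffer(
      SnowflakeStreamingIngestChannel channel, List<Map<String, Object>> rows, String lastOffset) {
    return channel.insertRows(rows, lastOffset);
  }
}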

Pre-review checklist

  • This change should be part of a Behavior Change Release. See go/behavior-change.
  • This change has passed Merge gate tests
  • Snowpipe Changes
  • Snowpipe Streaming Changes
  • This change is TEST-ONLY
  • This change is README/Javadocs only
  • This change is protected by a config parameter <PARAMETER_NAME>, e.g. snowflake.ingestion.method.
    • Yes - Added end-to-end and unit tests.
    • No - Suggest why it is not parameter protected
  • Is this change protected by parameter <PARAMETER_NAME> on the server side?
    • The parameter/feature is not yet active in production (partial rollout or PrPr, see Changes for Unreleased Features and Fixes).
    • If there is an issue, it can be safely mitigated by turning the parameter off. This is also verified by a test (See go/ppp).

if (response.hasErrors()) {
  handleInsertRowsFailures(
      response.getInsertErrors(), streamingBufferToInsert.getSinkRecords());
  insertRecords(

Contributor Author (sfc-gh-wtrefon):
Do we need to reset the offset if we also put the schema evolution records here? @sfc-gh-tzhang

Contributor:

I don't think so; why would we need to do that?

Contributor Author (sfc-gh-wtrefon):

We are setting the needToResetOffset flag to true in case of a schema evolution error. I'm not sure exactly what it does and whether we still need it now that we reinsert the rows by rebuilding the buffer with rebuildBufferWithoutErrorRows.

Contributor:

If you look at how needToResetOffset is used: when it's true, we reopen the channel and restart from the latest committed offset token.
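
For reference, a minimal sketch of what that reset path roughly does; the method and field names below are placeholders, not necessarily the connector's actual ones:

// Illustrative only: reopen the channel and roll back to the latest committed
// offset token, so Kafka redelivers everything after it.
private void resetChannelAndOffset() {
  this.channel = openChannelForTable(); // placeholder for however the channel is reopened
  String committedToken = this.channel.getLatestCommittedOffsetToken();
  // If nothing was ever committed, restart from the beginning of the partition.
  this.offsetToResetTo = committedToken == null ? 0L : Long.parseLong(committedToken) + 1;
}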

  needToResetOffset = true;
} else {
  // Simply added to the final response if it's not schema related errors
  finalResponse.addError(insertError);

Contributor Author (sfc-gh-wtrefon):
This filters the schema evolution errors out of the list of errors. This is the only difference compared with your old PR, @sfc-gh-tzhang.

Contributor:

In my old PR, I don't have finalResponse anymore. I simply return response if there are no schema-evolution-related errors, which should already include all the errors. Do you see any issue with that?

Contributor Author (sfc-gh-wtrefon):

Without the finalResponse, the response will contain the schema evolution errors as well. This means that in handleInsertRowsFailures we will put the schema evolution errors in the DLQ, which is a behaviour change.

Additionally, rebuildBufferWithoutErrorRows will filter out the schema evolution errors and such rows won't ever be inserted, right? We don't want such behaviour.

Contributor:

> This means that in handleInsertRowsFailures we will put the schema evolution errors in the DLQ, which is a behaviour change.

In my old PR, we will return the response before calling handleInsertRowsFailures, right?

> Additionally, rebuildBufferWithoutErrorRows will filter out the schema evolution errors and such rows won't ever be inserted, right? We don't want such behaviour.

Resetting the offset token will take care of that; Kafka will send us the same batch again.
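
To make the behaviour under discussion concrete, here is a hedged sketch of the error split being described; finalResponse, needToResetOffset and rebuildBufferWithoutErrorRows come from the diff, while the schema-error check is an assumption based on what the ingest SDK exposes:

// Split the insert errors: schema evolution errors trigger evolution and a retry
// of the affected rows, everything else is kept so it can be sent to the DLQ.
InsertValidationResponse finalResponse = new InsertValidationResponse();
boolean needToResetOffset = false;

for (InsertValidationResponse.InsertError insertError : response.getInsertErrors()) {
  // Assumed check: the SDK reports extra / missing not-null columns on the error.
  boolean schemaEvolutionError =
      insertError.getExtraColNames() != null
          || insertError.getMissingNotNullColNames() != null;
  if (schemaEvolutionError) {
    // Schema-related rows are not forwarded to the DLQ; the table is evolved and
    // the rows are retried, either by rebuilding the buffer without the error rows
    // (this draft) or by resetting the offset so Kafka resends the batch (old PR).
    needToResetOffset = true;
  } else {
    // Only non-schema errors reach handleInsertRowsFailures and therefore the DLQ.
    finalResponse.addError(insertError);
  }
}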


@@ -540,6 +546,22 @@ public InsertRowsResponse insertRecords(StreamingBuffer streamingBufferToInsert)
return response;
}

/** Building a new buffer which contains only the good rows from the original buffer */
private StreamingBuffer rebuildBufferWithoutErrorRows(

Contributor:

Let's add a unit test for this function.
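
A possible shape for that test; the StreamingBuffer construction, the record helper, and direct access to rebuildBufferWithoutErrorRows are placeholders for whatever the connector's test setup actually allows:

import static org.junit.Assert.assertEquals;

import java.util.Collections;
import net.snowflake.ingest.streaming.InsertValidationResponse;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.Test;
import org.mockito.Mockito;

public class RebuildBufferWithoutErrorRowsTest {

  // Placeholder: build a minimal SinkRecord the way the connector's existing tests do.
  private SinkRecord record(long offset) {
    return new SinkRecord("topic", 0, null, null, null, "value-" + offset, offset);
  }

  @Test
  public void keepsOnlyGoodRows() {
    // Hypothetical buffer with three records, where row index 1 failed to insert.
    StreamingBuffer original = new StreamingBuffer();
    original.insert(record(0));
    original.insert(record(1));
    original.insert(record(2));

    // Mock an insert error pointing at row index 1.
    InsertValidationResponse.InsertError badRow =
        Mockito.mock(InsertValidationResponse.InsertError.class);
    Mockito.when(badRow.getRowIndex()).thenReturn(1L);

    // Method under test; visibility relaxed for this sketch.
    StreamingBuffer rebuilt =
        rebuildBufferWithoutErrorRows(original, Collections.singletonList(badRow));

    // Only rows 0 and 2 survive, in their original order.
    assertEquals(2, rebuilt.getSinkRecords().size());
    assertEquals(0L, rebuilt.getSinkRecords().get(0).kafkaOffset());
    assertEquals(2L, rebuilt.getSinkRecords().get(1).kafkaOffset());
  }
}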


.equals("true")) {
.get(SnowflakeSinkConnectorConfig.ENABLE_SCHEMATIZATION_CONFIG)
.equals("true")
&& !useDoubleBuffer) {

Contributor:

I'm not sure what needs to be updated now without the double buffer.

@sfc-gh-wtrefon (Contributor Author):

@sfc-gh-tzhang another question: do we need this change? With the removal of the extra buffer we will get rid of this code anyway; do we want to invest time here?

@sfc-gh-akowalczyk (Contributor):

> @sfc-gh-tzhang another question: do we need this change? With the removal of the extra buffer we will get rid of this code anyway; do we want to invest time here?

@sfc-gh-tzhang friendly reminder about the question above ☝️

@sfc-gh-tzhang (Contributor):

> @sfc-gh-tzhang another question: do we need this change? With the removal of the extra buffer we will get rid of this code anyway; do we want to invest time here?

How would you do schema evolution with the removal of the extra buffer? I don't see a design doc, so I don't know the answer.
