SNOW-0000: extended telemetry with partitionId, fixed the event type sen… #958

Merged: 3 commits, Oct 14, 2024 (diff shown: changes from 1 commit)
@@ -19,8 +19,12 @@ public PipeProgressRegistryTelemetry(
this.telemetryService = telemetryService;
}

public void reportKafkaPartitionUsage(boolean isClosing) {
telemetryService.reportKafkaPartitionUsage(pipeCreation, isClosing);
public void reportKafkaPartitionStart() {
telemetryService.reportKafkaPartitionUsage(pipeCreation, false);
}

public void reportKafkaPartitionUsage() {
telemetryService.reportKafkaPartitionUsage(pipeTelemetry, false);
}

public void reportKafkaConnectFatalError(String message) {
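For context, a minimal sketch of how the split is intended to be used by a caller: the pipe-start event is reported once when the partition task begins, and the usage event on every cleaner cycle. The scheduler wiring below is assumed for illustration and is not part of this diff.

// hypothetical caller, mirroring the pattern used in FileProcessor.trackFiles(...)
// assumes java.util.concurrent.ScheduledExecutorService and TimeUnit are imported
void startPartitionTask(
    PipeProgressRegistryTelemetry telemetry, ScheduledExecutorService scheduler) {
  // report the pipe-start event once, when the partition task starts
  telemetry.reportKafkaPartitionStart();
  scheduler.scheduleWithFixedDelay(
      () -> {
        // report the pipe-usage event on every cycle
        telemetry.reportKafkaPartitionUsage();
        // ... stage listing and file tracking ...
      },
      0,
      1,
      TimeUnit.MINUTES);
}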
@@ -536,7 +536,12 @@ private ServiceContext(

this.pipeStatus =
new SnowflakeTelemetryPipeStatus(
tableName, stageName, pipeName, enableCustomJMXMonitoring, this.metricsJmxReporter);
tableName,
stageName,
pipeName,
partition,
enableCustomJMXMonitoring,
this.metricsJmxReporter);

if (enableCustomJMXMonitoring) {
partitionBufferCountHistogram =
@@ -593,8 +598,8 @@ private void init(long recordOffset) {
} catch (Exception e) {
LOGGER.warn("Cleaner and Flusher threads shut down before initialization");
}
telemetryService.reportKafkaPartitionStart(pipeCreation);
}
telemetryService.reportKafkaPartitionStart(pipeCreation);
}

private boolean resetCleanerFiles() {
@@ -165,7 +165,7 @@ public ProgressRegister trackFilesAsync() {
close();

SnowflakeTelemetryPipeCreation pipeCreation =
new SnowflakeTelemetryPipeCreation(tableName, stageName, pipeName);
preparePipeStartTelemetryEvent(tableName, stageName, pipeName);
ProgressRegisterImpl register = new ProgressRegisterImpl(this);

PipeProgressRegistryTelemetry telemetry =
@@ -200,13 +200,27 @@ void trackFiles(ProgressRegisterImpl register, PipeProgressRegistryTelemetry pro
final String threadName =
String.format("file-processor-[%s/%d:%s]", topic, partition, tableName);

progressTelemetry.reportKafkaPartitionStart();

cleanerTaskHolder.set(
schedulingExecutor.scheduleWithFixedDelay(
() -> {
Thread.currentThread().setName(threadName);
try {
// update metrics
progressTelemetry.reportKafkaPartitionUsage(false);
// the cleaner starts along with the partition task, but until the table, stage and pipe
// have been created there is no point in querying the stage.
if (isFirstRun.get()
&& checkPreRequisites() != CleanerPrerequisites.PIPE_COMPATIBLE) {
LOGGER.debug(
"neither table {} nor stage {} nor pipe {} have been initialized yet,"
+ " skipping cycle...",
tableName,
stageName,
pipeName);
return;
}

progressTelemetry.reportKafkaPartitionUsage();

// add all files which might have been collected during the last cycle for
// processing
@@ -422,7 +436,8 @@ private void moveFailedFiles(
String.join(", ", failedFiles));
// underlying code moves files one by one, so this is not going to impact performance.
// we have been observing scenarios, when the file listed at the start of the process was
// removed - this would cause cleaner to fail - so instead we process file one by one to
// removed, or the process did not have access to it - this would cause the cleaner to fail - so
// instead we process files one by one to
// ensure all the ones which are still present will be actually moved
failedFiles.stream()
.map(Lists::newArrayList)
@@ -431,8 +446,11 @@
try {
conn.moveToTableStage(tableName, stageName, failedFile);
} catch (SnowflakeKafkaConnectorException e) {
telemetryService.reportKafkaConnectFatalError(
String.format("[cleaner for pipe %s]: %s", pipeName, e.getMessage()));
LOGGER.warn(
"Could not move file {} for pipe {} to table stage due to {} <{}>",
"Could not move file {} for pipe {} to table stage due to {} <{}>\n"
+ "File won't be tracked.",
failedFile.get(0),
pipeName,
e.getMessage(),
@@ -492,6 +510,70 @@ private void purgeDirtyFiles(Set<String> files) {
}
}

enum CleanerPrerequisites {
NONE,
TABLE_COMPATIBLE,
STAGE_COMPATIBLE,
PIPE_COMPATIBLE,
}

private SnowflakeTelemetryPipeCreation preparePipeStartTelemetryEvent(
String tableName, String stageName, String pipeName) {
SnowflakeTelemetryPipeCreation result =
new SnowflakeTelemetryPipeCreation(tableName, stageName, pipeName);
boolean canListFiles = false;

switch (checkPreRequisites()) {
case PIPE_COMPATIBLE:
result.setReusePipe(true);
// intentional fall-through: a compatible pipe also implies a compatible stage and table
case STAGE_COMPATIBLE:
result.setReuseStage(true);
canListFiles = true;
// intentional fall-through
case TABLE_COMPATIBLE:
result.setReuseTable(true);
default:
break;
}

if (canListFiles) {
try {
List<String> stageFiles = conn.listStage(stageName, prefix);
result.setFileCountRestart(stageFiles.size());
} catch (Exception err) {
LOGGER.warn(
"could not list remote stage {} - {}<{}>\n{}",
stageName,
err.getMessage(),
err.getClass(),
err.getStackTrace());
}
// at this moment, file processor does not know how many files should be reprocessed...
result.setFileCountReprocessPurge(0);
}

return result;
}

private CleanerPrerequisites checkPreRequisites() {
CleanerPrerequisites result = CleanerPrerequisites.NONE;
if (conn.tableExist(tableName)) {
if (conn.isTableCompatible(tableName)) {
result = CleanerPrerequisites.TABLE_COMPATIBLE;
if (conn.stageExist(stageName)) {
if (conn.isStageCompatible(stageName)) {
result = CleanerPrerequisites.STAGE_COMPATIBLE;
if (conn.pipeExist(pipeName)) {
if (conn.isPipeCompatible(tableName, stageName, pipeName)) {
result = CleanerPrerequisites.PIPE_COMPATIBLE;
}
}
}
}
}
}
return result;
}

public static class FileCategorizer {
private final Set<String> dirtyFiles = new HashSet<>();
private final Map<String, IngestEntry> stageFiles = new HashMap<>();
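Side note on the new prerequisite check: each level strictly requires the previous one (table, then stage, then pipe). An equivalent, flattened formulation of checkPreRequisites() is sketched below purely for illustration; it is not part of this diff and uses the same conn calls and fields shown above.

private CleanerPrerequisites checkPreRequisites() {
  // the table must exist and be compatible before the stage is even considered
  if (!conn.tableExist(tableName) || !conn.isTableCompatible(tableName)) {
    return CleanerPrerequisites.NONE;
  }
  // the stage must exist and be compatible before the pipe is considered
  if (!conn.stageExist(stageName) || !conn.isStageCompatible(stageName)) {
    return CleanerPrerequisites.TABLE_COMPATIBLE;
  }
  if (!conn.pipeExist(pipeName) || !conn.isPipeCompatible(tableName, stageName, pipeName)) {
    return CleanerPrerequisites.STAGE_COMPATIBLE;
  }
  return CleanerPrerequisites.PIPE_COMPATIBLE;
}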
@@ -22,6 +22,7 @@
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_INGEST_FAIL;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FLUSHED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.MEMORY_USAGE;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PARTITION_ID;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PIPE_NAME;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PROCESSED_OFFSET;
import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PURGED_OFFSET;
@@ -125,16 +126,19 @@ public class SnowflakeTelemetryPipeStatus extends SnowflakeTelemetryBasicInfo {

private final String stageName;
private final String pipeName;
private final int partitionId;

public SnowflakeTelemetryPipeStatus(
final String tableName,
final String stageName,
final String pipeName,
final int partitionId,
final boolean enableCustomJMXConfig,
final MetricsJmxReporter metricsJmxReporter) {
super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_PIPE_USAGE);
this.stageName = stageName;
this.pipeName = pipeName;
this.partitionId = partitionId;

// Initial value of processed/flushed/committed/purged offset should be set to -1,
// because the offset stands for the last offset of the record that are at the status.
@@ -282,6 +286,7 @@ public void dumpTo(ObjectNode msg) {
msg.put(TABLE_NAME, tableName);
msg.put(STAGE_NAME, stageName);
msg.put(PIPE_NAME, pipeName);
msg.put(PARTITION_ID, partitionId);

msg.put(PROCESSED_OFFSET, processedOffset.get());
msg.put(FLUSHED_OFFSET, flushedOffset.get());
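For illustration, a minimal, self-contained sketch of the payload shape produced by dumpTo(ObjectNode) after this change. The class name and values below are hypothetical; the keys mirror the TelemetryConstants used in this diff, and offset keys other than processed-offset are omitted because their constant values are not shown here.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class PipeUsagePayloadSketch {
  public static void main(String[] args) {
    ObjectNode msg = new ObjectMapper().createObjectNode();
    msg.put("table_name", "MY_TABLE");   // TABLE_NAME
    msg.put("stage_name", "MY_STAGE");   // STAGE_NAME
    msg.put("pipe_name", "MY_PIPE_0");   // PIPE_NAME
    msg.put("partition_id", 0);          // PARTITION_ID, newly added by this change
    msg.put("processed-offset", -1L);    // PROCESSED_OFFSET, initial value
    System.out.println(msg.toPrettyString());
  }
}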
@@ -8,6 +8,7 @@ public final class TelemetryConstants {
public static final String TABLE_NAME = "table_name";
public static final String STAGE_NAME = "stage_name";
public static final String PIPE_NAME = "pipe_name";
public static final String PARTITION_ID = "partition_id";
public static final String CONNECTOR_NAME = "connector_name";

public static final String PROCESSED_OFFSET = "processed-offset";