diff --git a/src/main/java/com/snowflake/kafka/connector/internal/PipeProgressRegistryTelemetry.java b/src/main/java/com/snowflake/kafka/connector/internal/PipeProgressRegistryTelemetry.java
index 6518782cc..a04ed384c 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/PipeProgressRegistryTelemetry.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/PipeProgressRegistryTelemetry.java
@@ -19,8 +19,12 @@ public PipeProgressRegistryTelemetry(
     this.telemetryService = telemetryService;
   }
 
-  public void reportKafkaPartitionUsage(boolean isClosing) {
-    telemetryService.reportKafkaPartitionUsage(pipeCreation, isClosing);
+  public void reportKafkaPartitionStart() {
+    telemetryService.reportKafkaPartitionUsage(pipeCreation, false);
+  }
+
+  public void reportKafkaPartitionUsage() {
+    telemetryService.reportKafkaPartitionUsage(pipeTelemetry, false);
   }
 
   public void reportKafkaConnectFatalError(String message) {
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
index 61cbaac90..616a90bd1 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.java
@@ -536,7 +536,12 @@ private ServiceContext(
 
       this.pipeStatus =
           new SnowflakeTelemetryPipeStatus(
-              tableName, stageName, pipeName, enableCustomJMXMonitoring, this.metricsJmxReporter);
+              tableName,
+              stageName,
+              pipeName,
+              partition,
+              enableCustomJMXMonitoring,
+              this.metricsJmxReporter);
 
       if (enableCustomJMXMonitoring) {
         partitionBufferCountHistogram =
@@ -593,8 +598,9 @@ private void init(long recordOffset) {
         } catch (Exception e) {
           LOGGER.warn("Cleaner and Flusher threads shut down before initialization");
         }
+        // with v2 cleaner enabled, this event is raised by the cleaner itself
+        telemetryService.reportKafkaPartitionStart(pipeCreation);
       }
-      telemetryService.reportKafkaPartitionStart(pipeCreation);
     }
 
     private boolean resetCleanerFiles() {
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java b/src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
index 140e145ba..40d718cc2 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/StageFilesProcessor.java
@@ -26,6 +26,7 @@
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import net.snowflake.ingest.connection.HistoryResponse;
@@ -165,7 +166,7 @@ public ProgressRegister trackFilesAsync() {
     close();
 
     SnowflakeTelemetryPipeCreation pipeCreation =
-        new SnowflakeTelemetryPipeCreation(tableName, stageName, pipeName);
+        preparePipeStartTelemetryEvent(tableName, stageName, pipeName);
     ProgressRegisterImpl register = new ProgressRegisterImpl(this);
 
     PipeProgressRegistryTelemetry telemetry =
@@ -200,13 +201,27 @@ void trackFiles(ProgressRegisterImpl register, PipeProgressRegistryTelemetry pro
     final String threadName =
         String.format("file-processor-[%s/%d:%s]", topic, partition, tableName);
 
+    progressTelemetry.reportKafkaPartitionStart();
+
     cleanerTaskHolder.set(
         schedulingExecutor.scheduleWithFixedDelay(
            () -> {
               Thread.currentThread().setName(threadName);
               try {
-                // update metrics
-                progressTelemetry.reportKafkaPartitionUsage(false);
+                // the cleaner starts along with the partition task, but until the table, stage
+                // and pipe are created there is no point in querying the stage.
+                if (isFirstRun.get()
+                    && checkPreRequisites() != CleanerPrerequisites.PIPE_COMPATIBLE) {
+                  LOGGER.debug(
+                      "neither table {} nor stage {} nor pipe {} have been initialized yet,"
+                          + " skipping cycle...",
+                      tableName,
+                      stageName,
+                      pipeName);
+                  return;
+                }
+
+                progressTelemetry.reportKafkaPartitionUsage();
 
                 // add all files which might have been collected during the last cycle for
                 // processing
@@ -422,7 +437,8 @@ private void moveFailedFiles(
         String.join(", ", failedFiles));
     // underlying code moves files one by one, so this is not going to impact performance.
     // we have been observing scenarios, when the file listed at the start of the process was
-    // removed - this would cause cleaner to fail - so instead we process file one by one to
+    // removed, or the process didn't have access to it - this would cause the cleaner to fail -
+    // so instead we process files one by one to
     // ensure all the ones which are still present will be actually moved
     failedFiles.stream()
         .map(Lists::newArrayList)
@@ -431,8 +447,11 @@
           try {
             conn.moveToTableStage(tableName, stageName, failedFile);
           } catch (SnowflakeKafkaConnectorException e) {
+            telemetryService.reportKafkaConnectFatalError(
+                String.format("[cleaner for pipe %s]: %s", pipeName, e.getMessage()));
             LOGGER.warn(
-                "Could not move file {} for pipe {} to table stage due to {} <{}>",
+                "Could not move file {} for pipe {} to table stage due to {} <{}>\n"
+                    + "File won't be tracked.",
                 failedFile.get(0),
                 pipeName,
                 e.getMessage(),
@@ -492,6 +511,71 @@ private void purgeDirtyFiles(Set<String> files) {
     }
   }
 
+  enum CleanerPrerequisites {
+    NONE,
+    TABLE_COMPATIBLE,
+    STAGE_COMPATIBLE,
+    PIPE_COMPATIBLE,
+  }
+
+  private SnowflakeTelemetryPipeCreation preparePipeStartTelemetryEvent(
+      String tableName, String stageName, String pipeName) {
+    SnowflakeTelemetryPipeCreation result =
+        new SnowflakeTelemetryPipeCreation(tableName, stageName, pipeName);
+    boolean canListFiles = false;
+
+    switch (checkPreRequisites()) {
+      case PIPE_COMPATIBLE:
+        result.setReusePipe(true);
+      case STAGE_COMPATIBLE:
+        result.setReuseStage(true);
+        canListFiles = true;
+      case TABLE_COMPATIBLE:
+        result.setReuseTable(true);
+      default:
+        break;
+    }
+
+    if (canListFiles) {
+      try {
+        List<String> stageFiles = conn.listStage(stageName, prefix);
+        result.setFileCountRestart(stageFiles.size());
+      } catch (Exception err) {
+        LOGGER.warn(
+            "could not list remote stage {} - {}<{}>\n{}",
+            stageName,
+            err.getMessage(),
+            err.getClass(),
+            err.getStackTrace());
+      }
+      // at this moment, file processor does not know how many files should be reprocessed...
+      result.setFileCountReprocessPurge(0);
+    }
+
+    return result;
+  }
+
+  private CleanerPrerequisites checkPreRequisites() {
+    CleanerPrerequisites result = CleanerPrerequisites.NONE;
+    Supplier<Boolean> tableCompatible =
+        () -> conn.tableExist(tableName) && conn.isTableCompatible(tableName);
+    Supplier<Boolean> stageCompatible =
+        () -> conn.stageExist(stageName) && conn.isStageCompatible(stageName);
+    Supplier<Boolean> pipeCompatible =
+        () -> conn.pipeExist(pipeName) && conn.isPipeCompatible(tableName, stageName, pipeName);
+
+    if (tableCompatible.get()) {
+      result = CleanerPrerequisites.TABLE_COMPATIBLE;
+      if (stageCompatible.get()) {
+        result = CleanerPrerequisites.STAGE_COMPATIBLE;
+        if (pipeCompatible.get()) {
+          result = CleanerPrerequisites.PIPE_COMPATIBLE;
+        }
+      }
+    }
+    return result;
+  }
+
   public static class FileCategorizer {
     private final Set dirtyFiles = new HashSet<>();
     private final Map stageFiles = new HashMap<>();
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java
index 27ab511fd..65f29ff1f 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryPipeStatus.java
@@ -22,6 +22,7 @@
 import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FILE_COUNT_TABLE_STAGE_INGEST_FAIL;
 import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.FLUSHED_OFFSET;
 import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.MEMORY_USAGE;
+import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PARTITION_ID;
 import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PIPE_NAME;
 import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PROCESSED_OFFSET;
 import static com.snowflake.kafka.connector.internal.telemetry.TelemetryConstants.PURGED_OFFSET;
@@ -125,16 +126,19 @@ public class SnowflakeTelemetryPipeStatus extends SnowflakeTelemetryBasicInfo {
 
   private final String stageName;
   private final String pipeName;
+  private final int partitionId;
 
   public SnowflakeTelemetryPipeStatus(
       final String tableName,
       final String stageName,
       final String pipeName,
+      final int partitionId,
       final boolean enableCustomJMXConfig,
       final MetricsJmxReporter metricsJmxReporter) {
     super(tableName, SnowflakeTelemetryService.TelemetryType.KAFKA_PIPE_USAGE);
     this.stageName = stageName;
     this.pipeName = pipeName;
+    this.partitionId = partitionId;
 
     // Initial value of processed/flushed/committed/purged offset should be set to -1,
     // because the offset stands for the last offset of the record that are at the status.
@@ -282,6 +286,7 @@ public void dumpTo(ObjectNode msg) {
     msg.put(TABLE_NAME, tableName);
     msg.put(STAGE_NAME, stageName);
     msg.put(PIPE_NAME, pipeName);
+    msg.put(PARTITION_ID, partitionId);
 
     msg.put(PROCESSED_OFFSET, processedOffset.get());
     msg.put(FLUSHED_OFFSET, flushedOffset.get());
diff --git a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java
index de02dda73..cad3d5968 100644
--- a/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java
+++ b/src/main/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryConstants.java
@@ -8,6 +8,7 @@ public final class TelemetryConstants {
   public static final String TABLE_NAME = "table_name";
   public static final String STAGE_NAME = "stage_name";
   public static final String PIPE_NAME = "pipe_name";
+  public static final String PARTITION_ID = "partition_id";
   public static final String CONNECTOR_NAME = "connector_name";
 
   public static final String PROCESSED_OFFSET = "processed-offset";
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
index 96cde1c8c..7a655d81c 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/StageFilesProcessorTest.java
@@ -15,6 +15,8 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.snowflake.kafka.connector.internal.streaming.IngestionMethodConfig;
+import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryBasicInfo;
 import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeCreation;
 import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeStatus;
 import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryService;
@@ -32,6 +34,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;
 import net.snowflake.client.jdbc.internal.joda.time.DateTime;
 import net.snowflake.client.jdbc.internal.joda.time.DateTimeZone;
 import org.junit.jupiter.api.BeforeEach;
@@ -53,10 +57,8 @@ class StageFilesProcessorTest {
   private StageFilesProcessor.ProgressRegisterImpl register;
   private SnowflakeTelemetryPipeCreation pipeCreation;
   private SnowflakeTelemetryPipeStatus pipeTelemetry;
-  private SnowflakeTelemetryService telemetryService;
-
+  private FakeSnowflakeTelemetryService telemetryService;
   private PipeProgressRegistryTelemetry telemetry;
-  // private AtomicInteger loops;
 
   private AtomicReference<Consumer<Long>> nextTickCallback;
   private AtomicReference<ScheduledFuture<?>> scheduledFuture;
@@ -69,8 +71,10 @@ void setUp() {
     conn = Mockito.mock(SnowflakeConnectionService.class);
     ingestionService = Mockito.mock(SnowflakeIngestionService.class);
     pipeCreation = new SnowflakeTelemetryPipeCreation(TABLE_NAME, STAGE_NAME, PIPE_NAME);
-    pipeTelemetry = Mockito.mock(SnowflakeTelemetryPipeStatus.class);
-    telemetryService = Mockito.mock(SnowflakeTelemetryService.class);
+    pipeCreation = new SnowflakeTelemetryPipeCreation(TABLE_NAME, STAGE_NAME, PIPE_NAME);
+    pipeTelemetry =
+        new SnowflakeTelemetryPipeStatus(TABLE_NAME, STAGE_NAME, PIPE_NAME, 0, false, null);
+    telemetryService = new FakeSnowflakeTelemetryService();
 
     telemetry = new PipeProgressRegistryTelemetry(pipeCreation, pipeTelemetry, telemetryService);
   }
@@ -104,17 +108,29 @@ void fileProcessor_WillTryToGetInitialStateFromStage_ExactlyOnce() {
     // lets simulate 1 hour (60 cleanup cycles)
     createFileProcessor(60);
+    configureInitialState();
 
     when(conn.listStage(STAGE_NAME, PREFIX)).thenReturn(new ArrayList<>());
 
     victim.trackFiles(register, telemetry);
 
+    assertThat(telemetryService.records).hasSize(61);
+    List<SnowflakeTelemetryPipeStatus> statuses =
+        telemetryService.records.stream()
+            .filter(r -> r instanceof SnowflakeTelemetryPipeStatus)
+            .map(SnowflakeTelemetryPipeStatus.class::cast)
+            .collect(Collectors.toList());
+    assertThat(statuses).hasSize(60);
+
     verify(conn, times(1)).listStage(STAGE_NAME, PREFIX);
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillTryGettingStateFromStage_OnError() {
     createFileProcessor(10);
+    configureInitialState();
+
     String ingestFile = String.format("connector/topic/0/1_9_%d.json.gz", currentTime.get());
     register.registerNewStageFile(ingestFile);
@@ -128,11 +144,13 @@ void fileProcessor_WillTryGettingStateFromStage_OnError() {
 
     verify(conn, times(2)).listStage(STAGE_NAME, PREFIX);
     verify(ingestionService, times(10)).readIngestHistoryForward(anyMap(), any(), any(), anyInt());
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillPurgeLoadedFiles_WhenHistoryIsAvailable_AfterFilesHaveBeenSubmitted() {
     createFileProcessor(10);
+    configureInitialState();
 
     String file1 = String.format("connector/topic/0/1_9_%d.json.gz", currentTime.get());
     String file2 = String.format("connector/topic/0/10_19_%d.json.gz", currentTime.get());
@@ -178,11 +196,13 @@ void fileProcessor_WillPurgeLoadedFiles_WhenHistoryIsAvailable_AfterFilesHaveBee
     verify(ingestionService, times(3)).readIngestHistoryForward(anyMap(), any(), any(), anyInt());
     verify(conn, times(3)).purgeStage(anyString(), anyList());
     assertThat(purgedFiles).containsOnly(file1, file2, file3);
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillPurgeLoadedFiles_WhenHistoryIsFetchedSooner_ThanFilesAreRegistered() {
     createFileProcessor(10);
+    configureInitialState();
 
     String file1 = String.format("connector/topic/0/1_9_%d.json.gz", currentTime.get());
     String file2 = String.format("connector/topic/0/10_19_%d.json.gz", currentTime.get());
@@ -230,11 +250,13 @@ void fileProcessor_WillPurgeLoadedFiles_WhenHistoryIsFetchedSooner_ThanFilesAreR
     verify(ingestionService, times(3)).readIngestHistoryForward(anyMap(), any(), any(), anyInt());
     verify(conn, times(3)).purgeStage(anyString(), anyList());
     assertThat(purgedFiles).containsOnly(file1, file2, file3);
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillMoveOldFilesToTableStage() {
     createFileProcessor(60);
+    configureInitialState();
 
     String file1 = String.format("connector/topic/0/1_9_%d.json.gz", currentTime.get());
     String file2 = String.format("connector/topic/0/10_19_%d.json.gz", currentTime.get());
@@ -264,11 +286,13 @@ void fileProcessor_WillMoveOldFilesToTableStage() {
     verify(conn, times(3)).moveToTableStage(anyString(), anyString(), failedFiles.capture());
     assertThat(failedFiles.getAllValues().stream().flatMap(Collection::stream))
         .containsOnly(file1, file2, file3);
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillMoveFailedFilesToStageEvenWhenOneFails() {
     createFileProcessor(61);
+    configureInitialState();
 
     String file1 = String.format("connector/topic/0/1_9_%d.json.gz", currentTime.get());
     String file2 = String.format("connector/topic/0/10_19_%d.json.gz", currentTime.get());
@@ -308,11 +332,14 @@ void fileProcessor_WillMoveFailedFilesToStageEvenWhenOneFails() {
     verify(conn, times(3)).moveToTableStage(anyString(), anyString(), failedFiles.capture());
     assertThat(failedFiles.getAllValues().stream().flatMap(Collection::stream))
         .containsOnly(file1, file2, file3);
+
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillTreatFreshStaleFileAsStageOne() {
     createFileProcessor(61);
+    configureInitialState();
 
     String file1 =
         String.format(
@@ -347,11 +374,13 @@ void fileProcessor_WillTreatFreshStaleFileAsStageOne() {
     // table stage
     verify(conn, times(1)).moveToTableStage(anyString(), anyString(), failedFiles.capture());
     assertThat(failedFiles.getValue()).containsOnly(file1);
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillProperlyHandleStaleFiles() {
     createFileProcessor(13);
+    configureInitialState();
 
     String fileOk = String.format("connector/topic/0/10_19_%d.json.gz", currentTime.get());
     String fileFailed = String.format("connector/topic/0/20_29_%d.json.gz", currentTime.get());
@@ -389,11 +418,13 @@ void fileProcessor_WillProperlyHandleStaleFiles() {
     assertThat(failedFiles.getValue()).containsOnly(fileFailed);
     assertThat(loadedFiles.getValue()).containsOnly(fileOk);
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillDeleteDirtyFiles() {
     createFileProcessor(10);
+    configureInitialState();
 
     String file1 = String.format("connector/topic/0/100_199_%d.json.gz", currentTime.get());
     String file2 = String.format("connector/topic/0/200_299_%d.json.gz", currentTime.get());
@@ -414,11 +445,14 @@ void fileProcessor_WillDeleteDirtyFiles() {
     verify(conn, times(1)).listStage(STAGE_NAME, PREFIX);
     verify(conn, times(1)).purgeStage(anyString(), purgedFiles.capture());
     assertThat(purgedFiles.getValue()).containsOnly(file1, file2, file3);
+
+    assertInitialStateWasConfigured();
   }
 
   @Test
   void fileProcessor_WillCleanHistoryEntries_OlderThanOneHour() {
     createFileProcessor(61);
+    configureInitialState();
 
     String file1 = String.format("connector/topic/0/1_9_%d.json.gz", currentTime.get());
     String file2 = String.format("connector/topic/0/10_19_%d.json.gz", currentTime.get());
@@ -457,9 +491,28 @@ void fileProcessor_WillCleanHistoryEntries_OlderThanOneHour() {
     // check if these old not matched files have been removed from history and do not result in
     // memory leak
     assertThat(register.currentProcessorContext.ingestHistory).isEmpty();
+    assertInitialStateWasConfigured();
+  }
+
+  private void configureInitialState() {
+    when(conn.tableExist(TABLE_NAME)).thenReturn(true);
+    when(conn.isTableCompatible(TABLE_NAME)).thenReturn(true);
+    when(conn.stageExist(STAGE_NAME)).thenReturn(true);
+    when(conn.isStageCompatible(STAGE_NAME)).thenReturn(true);
+    when(conn.pipeExist(PIPE_NAME)).thenReturn(true);
+    when(conn.isPipeCompatible(TABLE_NAME, STAGE_NAME, PIPE_NAME)).thenReturn(true);
   }
 
-  private static ScheduledExecutorService createTestScheduler(
+  private void assertInitialStateWasConfigured() {
+    verify(conn, times(1)).tableExist(TABLE_NAME);
+    verify(conn, times(1)).isTableCompatible(TABLE_NAME);
+    verify(conn, times(1)).stageExist(STAGE_NAME);
+    verify(conn, times(1)).isStageCompatible(STAGE_NAME);
+    verify(conn, times(1)).pipeExist(PIPE_NAME);
+    verify(conn, times(1)).isPipeCompatible(TABLE_NAME, STAGE_NAME, PIPE_NAME);
+  }
+
+  static ScheduledExecutorService createTestScheduler(
       int ticks,
       AtomicLong currentTime,
       AtomicReference<Consumer<Long>> nextTickCallback,
       AtomicReference<ScheduledFuture<?>> scheduledFuture,
@@ -493,4 +546,30 @@ private static ScheduledExecutorService createTestScheduler(
             });
     return service;
   }
+
+  class FakeSnowflakeTelemetryService extends SnowflakeTelemetryService {
+
+    List<Object> records = new ArrayList<>();
+
+    @Override
+    public ObjectNode getObjectNode() {
+      return getDefaultObjectNode(IngestionMethodConfig.SNOWPIPE);
+    }
+
+    @Override
+    public void reportKafkaConnectFatalError(String errorDetail) {
+      records.add(errorDetail);
+    }
+
+    @Override
+    public void reportKafkaPartitionUsage(
+        SnowflakeTelemetryBasicInfo partitionStatus, boolean isClosing) {
+      records.add(partitionStatus);
+    }
+
+    @Override
+    public void reportKafkaPartitionStart(SnowflakeTelemetryBasicInfo partitionCreation) {
+      records.add(partitionCreation);
+    }
+  }
 }
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceTest.java b/src/test/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceTest.java
index e44b52ef9..45d0764ad 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/telemetry/SnowflakeTelemetryServiceTest.java
@@ -206,7 +206,7 @@ public void testReportKafkaPartitionUsage(IngestionMethodConfig ingestionMethodC
     if (ingestionMethodConfig == IngestionMethodConfig.SNOWPIPE) {
       SnowflakeTelemetryPipeStatus pipeStatus =
           new SnowflakeTelemetryPipeStatus(
-              expectedTableName, expectedStageName, expectedPipeName, false, null);
+              expectedTableName, expectedStageName, expectedPipeName, 0, false, null);
       pipeStatus.setFlushedOffset(expectedFlushedOffset);
       pipeStatus.setProcessedOffset(expectedProcessedOffset);
       pipeStatus.setCommittedOffset(expectedCommittedOffset);
diff --git a/src/test/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryUnitTest.java b/src/test/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryUnitTest.java
index f3cd2d09d..f704c1df9 100644
--- a/src/test/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryUnitTest.java
+++ b/src/test/java/com/snowflake/kafka/connector/internal/telemetry/TelemetryUnitTest.java
@@ -16,7 +16,7 @@ public void testEmptyPipeStatusObject() {
         new MetricsJmxReporter(new MetricRegistry(), connectorName);
     SnowflakeTelemetryPipeStatus pipeStatus =
         new SnowflakeTelemetryPipeStatus(
-            table, stage, pipe, true /* Set true for test*/, metricsJmxReporter);
+            table, stage, pipe, 0, true /* Set true for test*/, metricsJmxReporter);
     assert pipeStatus.isEmpty();
     pipeStatus.setAverageCommitLagFileCount(1);
     assert !pipeStatus.isEmpty();
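For reviewers who want to see what the new `partition_id` field ends up looking like in the `KAFKA_PIPE_USAGE` payload, here is a minimal, hypothetical sketch (not part of the patch): it builds a `SnowflakeTelemetryPipeStatus` with the new partition argument and dumps it into an `ObjectNode`, the same way `TelemetryUnitTest` and `SnowflakeTelemetryServiceTest` construct the object. The class name, the table/stage/pipe names and the partition value are made up, and using the shaded `ObjectMapper` from the Snowflake JDBC driver to create the node is an assumption; the keys shown in the comment mirror `TelemetryConstants`.

```java
import com.snowflake.kafka.connector.internal.telemetry.SnowflakeTelemetryPipeStatus;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.client.jdbc.internal.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical illustration class, not part of the connector sources.
public class PipeStatusPayloadSketch {
  public static void main(String[] args) {
    // Hypothetical names and partition id, chosen only for illustration.
    SnowflakeTelemetryPipeStatus status =
        new SnowflakeTelemetryPipeStatus(
            "MY_TABLE", "MY_STAGE", "MY_PIPE", 3, false /* no JMX */, null);

    // Assumption: the shaded Jackson ObjectMapper can supply the ObjectNode that dumpTo fills.
    ObjectNode msg = new ObjectMapper().createObjectNode();
    status.dumpTo(msg);

    // Rough expected shape (abridged): the new partition_id key sits next to the existing
    // identifiers, and the offsets start at -1 until records flow, e.g.
    // {"table_name":"MY_TABLE","stage_name":"MY_STAGE","pipe_name":"MY_PIPE",
    //  "partition_id":3,"processed-offset":-1,"flushed-offset":-1, ...}
    System.out.println(msg);
  }
}
```

Since the cleaner (`StageFilesProcessor`) now raises both the partition-start and the periodic usage events, tagging each report with the partition that produced it lets otherwise identical per-partition pipe-status messages for the same topic be told apart.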