Skip to content

Commit

Permalink
feat(ingestion): Copy urns from previous checkpoint state on ingestio…
Browse files Browse the repository at this point in the history
…n failure (datahub-project#10347)
  • Loading branch information
shubhamjagtap639 authored May 7, 2024
1 parent d08f36f commit ae3f0fd
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ class IngestionCheckpointingProviderBase(StatefulCommittable[CheckpointJobStates
The base class for all checkpointing state provider implementations.
"""

def __init__(
self, name: str, commit_policy: CommitPolicy = CommitPolicy.ON_NO_ERRORS
):
def __init__(self, name: str, commit_policy: CommitPolicy = CommitPolicy.ALWAYS):
# Set the initial state to an empty dict.
super().__init__(name, commit_policy, {})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ def set_job_id(self, unique_id):
def is_checkpointing_enabled(self) -> bool:
return self.checkpointing_enabled

def _get_state_obj(self):
return self.state_type_class()

def create_checkpoint(self) -> Optional[Checkpoint]:
if self.is_checkpointing_enabled() and not self._ignore_new_state():
assert self.stateful_ingestion_config is not None
Expand All @@ -172,7 +175,7 @@ def create_checkpoint(self) -> Optional[Checkpoint]:
job_name=self.job_id,
pipeline_name=self.pipeline_name,
run_id=self.run_id,
state=self.state_type_class(),
state=self._get_state_obj(),
)
return None

Expand Down Expand Up @@ -255,9 +258,13 @@ def gen_removed_entity_workunits(self) -> Iterable[MetadataWorkUnit]:
# If the source already had a failure, skip soft-deletion.
# TODO: Eventually, switch this to check if anything in the pipeline had a failure so far.
if self.source.get_report().failures:
for urn in last_checkpoint_state.get_urns_not_in(
type="*", other_checkpoint_state=cur_checkpoint_state
):
self.add_entity_to_state("", urn)
self.source.get_report().report_warning(
"stale-entity-removal",
"Skipping stale entity soft-deletion since source already had failures.",
"Skipping stale entity soft-deletion and copying urns from last state since source already had failures.",
)
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "base85-bz2-json",
"payload": "LRx4!F+o`-Q(1w>5G4QrYoCBnWH=B60MH7jr`{?c0BA?5L)2-AGyu>6y;V<9hz%Mv0Bt1*)lOMzr>a0|Iq-4VtTsYONQsFPLn1EpdQS;HIy|&CvSAlRvAJwmtCEM+Rx(v_)~sVvkx3V@WX4O`=losC6yZWb2OL0@"
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "base85-bz2-json",
"payload": "LRx4!F+o`-Q(317h`0a%NgsevWH1l}0MH7jr`{?c0B9vdZ9%mLfYG4P6;f$2G%+v`9z&~6n|e(JEPC2_Iix~CA_im)jR-zsjEK*yo|HQz#IUUHtf@DYVEme-lUW9{Xmmt~y^2jCdyY95az!{$kf#WUxB"
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)",
"changeType": "UPSERT",
"aspectName": "datahubIngestionCheckpoint",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"pipelineName": "dummy_stateful",
"platformInstanceId": "",
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[
{
"entityType": "dataJob",
"entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(file,dummy_stateful,prod),default_stale_entity_removal)",
"changeType": "UPSERT",
"aspectName": "datahubIngestionCheckpoint",
"aspect": {
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
},
"pipelineName": "dummy_stateful",
"platformInstanceId": "",
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
[
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
}
]
Loading

0 comments on commit ae3f0fd

Please sign in to comment.