Merge branch 'datahub-project:master' into master
anshbansal authored May 9, 2024
2 parents bc89773 + 27d0b92 commit 2f04c58
Showing 8 changed files with 61 additions and 26 deletions.
11 changes: 11 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -37,6 +37,7 @@
 )
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
+from datahub.metadata.schema_classes import UpstreamLineageClass
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.type_annotations import get_class_from_annotation

@@ -70,6 +71,9 @@ class SourceReport(Report):
     aspects: Dict[str, Dict[str, int]] = field(
         default_factory=lambda: defaultdict(lambda: defaultdict(int))
     )
+    aspect_urn_samples: Dict[str, Dict[str, LossyList[str]]] = field(
+        default_factory=lambda: defaultdict(lambda: defaultdict(LossyList))
+    )

     warnings: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict)
     failures: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict)
@@ -96,6 +100,13 @@ def report_workunit(self, wu: WorkUnit) -> None:

         if aspectName is not None:  # usually true
             self.aspects[entityType][aspectName] += 1
+            self.aspect_urn_samples[entityType][aspectName].append(urn)
+            if isinstance(mcp.aspect, UpstreamLineageClass):
+                upstream_lineage = cast(UpstreamLineageClass, mcp.aspect)
+                if upstream_lineage.fineGrainedLineages:
+                    self.aspect_urn_samples[entityType][
+                        "fineGrainedLineages"
+                    ].append(urn)

     def report_warning(self, key: str, reason: str) -> None:
         warnings = self.warnings.get(key, LossyList())
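Note: the new `aspect_urn_samples` field mirrors the existing `aspects` counter but records sample URNs rather than counts. A minimal sketch of the accumulation pattern, with plain lists standing in for `LossyList` (the real collection caps each list at a bounded sample so reports stay small on large ingestions):

```python
from collections import defaultdict
from typing import Dict, List

# Plain lists stand in for LossyList here; the real field keeps only a
# bounded sample of URNs per (entityType, aspectName) pair.
aspect_urn_samples: Dict[str, Dict[str, List[str]]] = defaultdict(
    lambda: defaultdict(list)
)

def record(entity_type: str, aspect_name: str, urn: str) -> None:
    # Mirrors report_workunit: each aspect collects the URNs it was emitted for.
    aspect_urn_samples[entity_type][aspect_name].append(urn)

record(
    "dataset",
    "upstreamLineage",
    "urn:li:dataset:(urn:li:dataPlatform:dbt,db.schema.tbl,PROD)",
)
print(dict(aspect_urn_samples["dataset"]))
# {'upstreamLineage': ['urn:li:dataset:(urn:li:dataPlatform:dbt,db.schema.tbl,PROD)']}
```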
29 changes: 17 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -795,14 +795,17 @@ def make_mapping_upstream_lineage(


 def get_column_type(
-    report: DBTSourceReport, dataset_name: str, column_type: str, dbt_adapter: str
+    report: DBTSourceReport,
+    dataset_name: str,
+    column_type: Optional[str],
+    dbt_adapter: str,
 ) -> SchemaFieldDataType:
     """
     Maps known DBT types to datahub types
     """
-    TypeClass: Any = _field_type_mapping.get(column_type)
+    TypeClass: Any = _field_type_mapping.get(column_type) if column_type else None

-    if TypeClass is None:
+    if TypeClass is None and column_type:
         # resolve a modified type
         if dbt_adapter == "trino":
             TypeClass = resolve_trino_modified_type(column_type)
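With `column_type` now `Optional[str]`, both the direct mapping lookup and the adapter-specific fallback are guarded against `None`. A simplified sketch of the new control flow — `_field_type_mapping` and the resolver below are hypothetical stand-ins, not the real dbt source code:

```python
from typing import Any, Optional

# Stand-in: the real _field_type_mapping maps dbt type names to DataHub
# SchemaField type classes; adapters like trino add a regex-based fallback
# that would fail if handed None.
_field_type_mapping = {"text": "StringType", "integer": "NumberType"}

def get_column_type_sketch(column_type: Optional[str]) -> Any:
    # New guard: skip the lookup entirely when dbt reports no type.
    type_class: Any = _field_type_mapping.get(column_type) if column_type else None
    if type_class is None and column_type:
        # Adapter-specific resolution (e.g. resolve_trino_modified_type)
        # only runs when there is an actual string to parse.
        type_class = f"resolved:{column_type}"
    return type_class  # None ultimately falls back to a NullType field

assert get_column_type_sketch(None) is None
assert get_column_type_sketch("text") == "StringType"
assert get_column_type_sketch("varchar(16)") == "resolved:varchar(16)"
```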
@@ -934,12 +937,14 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
     def _make_data_platform_instance_aspect(self) -> DataPlatformInstanceClass:
         return DataPlatformInstanceClass(
             platform=mce_builder.make_data_platform_urn(DBT_PLATFORM),
-            instance=mce_builder.make_dataplatform_instance_urn(
-                mce_builder.make_data_platform_urn(DBT_PLATFORM),
-                self.config.platform_instance,
-            )
-            if self.config.platform_instance
-            else None,
+            instance=(
+                mce_builder.make_dataplatform_instance_urn(
+                    mce_builder.make_data_platform_urn(DBT_PLATFORM),
+                    self.config.platform_instance,
+                )
+                if self.config.platform_instance
+                else None
+            ),
         )

     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
@@ -1809,9 +1814,9 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
                 )
                 for upstream in upstream_urns
             ],
-            fineGrainedLineages=(cll or None)
-            if self.config.include_column_lineage
-            else None,
+            fineGrainedLineages=(
+                (cll or None) if self.config.include_column_lineage else None
+            ),
         )

     # This method attempts to read-modify and return the owners of a dataset.
@@ -1089,7 +1089,7 @@ def transform_connector_config(
     for k, v in connector_config.items():
         for key, value in lookupsByProvider.items():
             if key in v:
-                connector_config[k] = v.replace(key, value)
+                connector_config[k] = connector_config[k].replace(key, value)


 @platform_name("Kafka Connect")
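This one-line change fixes a clobbering bug: replacing on the loop variable `v` (the original value) discarded earlier substitutions whenever a single config value contained several placeholders. A small self-contained reproduction, with simplified names:

```python
# Minimal reproduction of the bug fixed above: with the old line, `v` never
# sees the first replacement, so only the last matching placeholder survives.
def apply_provided_configs(connector_config: dict, lookups: dict) -> None:
    for k, v in connector_config.items():
        for key, value in lookups.items():
            if key in v:
                # Old (buggy): connector_config[k] = v.replace(key, value)
                connector_config[k] = connector_config[k].replace(key, value)

config = {"connection.url": "jdbc:mysql://host:${env:MYSQL_PORT}/${env:MYSQL_DB}"}
apply_provided_configs(
    config, {"${env:MYSQL_PORT}": "3306", "${env:MYSQL_DB}": "librarydb"}
)
assert config["connection.url"] == "jdbc:mysql://host:3306/librarydb"
```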
@@ -79,6 +79,7 @@
"regtype": None,
"regrole": None,
"regnamespace": None,
"super": None,
"uuid": StringType,
"pg_lsn": None,
"tsvector": None, # text search vector
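An explicit `None` entry marks a type as recognized but not directly mappable; sources typically render such columns as NullType rather than warning about an unknown type. A rough illustration under that assumption (`_type_mapping` below is a trimmed stand-in, and the NullType fallback is assumed behavior):

```python
# Trimmed stand-in for the source's type map. "super" is the semi-structured
# SUPER type (assumption: relevant for Redshift-style deployments that reuse
# this Postgres mapping).
_type_mapping = {
    "uuid": "StringType",
    "super": None,
}

for t in ("uuid", "super", "geometry"):
    if t not in _type_mapping:
        print(f"{t}: unrecognized type, source would log a warning")
    else:
        print(f"{t}: maps to {_type_mapping[t] or 'NullType'}")
```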
@@ -756,16 +756,13 @@
         }
     },
     {
-        "entityType": "dataJob",
-        "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source4,PROD),unknown_source.query-topic)",
+        "entityType": "dataFlow",
+        "entityUrn": "urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD)",
         "changeType": "UPSERT",
-        "aspectName": "dataJobInputOutput",
+        "aspectName": "status",
         "aspect": {
             "json": {
-                "inputDatasets": [],
-                "outputDatasets": [
-                    "urn:li:dataset:(urn:li:dataPlatform:kafka,query-topic,PROD)"
-                ]
+                "removed": false
             }
         },
         "systemMetadata": {
@@ -774,13 +771,16 @@
         }
     },
     {
-        "entityType": "dataFlow",
-        "entityUrn": "urn:li:dataFlow:(kafka-connect,debezium-mysql-connector,PROD)",
+        "entityType": "dataJob",
+        "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(kafka-connect,mysql_source4,PROD),unknown_source.query-topic)",
         "changeType": "UPSERT",
-        "aspectName": "status",
+        "aspectName": "dataJobInputOutput",
         "aspect": {
             "json": {
-                "removed": false
+                "inputDatasets": [],
+                "outputDatasets": [
+                    "urn:li:dataset:(urn:li:dataPlatform:kafka,query-topic,PROD)"
+                ]
             }
         },
         "systemMetadata": {
@@ -11,6 +11,12 @@ source:
       - source_mongodb_connector
       - confluent_s3_sink_connector
     provided_configs:
+      - provider: env
+        path_key: MYSQL_PORT
+        value: 3306
+      - provider: env
+        path_key: MYSQL_DB
+        value: librarydb
       - provider: env
         path_key: MYSQL_CONNECTION_URL
         value: jdbc:mysql://test_mysql:3306/librarydb
@@ -23,5 +23,7 @@ CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components, /usr/local/share/kafka/
 CONNECT_CONFIG_PROVIDERS=env
 CONNECT_CONFIG_PROVIDERS_ENV_CLASS=io.strimzi.kafka.EnvVarConfigProvider
 MYSQL_CONNECTION_URL=jdbc:mysql://foo:datahub@test_mysql:3306/librarydb
+MYSQL_PORT=3306
+MYSQL_DB=librarydb
 POSTGRES_CONNECTION_URL=jdbc:postgresql://test_postgres:5432/postgres?user=postgres&password=datahub
 S3_ENDPOINT_URL=http://s3mock:9090
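The two new variables back the `${env:...}` placeholders that `io.strimzi.kafka.EnvVarConfigProvider` resolves from the worker environment. A rough Python approximation of that substitution, for illustration only (the provider's exact matching rules may differ):

```python
import os
import re

# Kafka Connect expands ${env:VAR} placeholders from the worker's
# environment, so connector configs can reference MYSQL_PORT / MYSQL_DB
# instead of hardcoding them.
os.environ.setdefault("MYSQL_PORT", "3306")
os.environ.setdefault("MYSQL_DB", "librarydb")

def resolve_env_placeholders(value: str) -> str:
    return re.sub(
        r"\$\{env:([A-Za-z_][A-Za-z0-9_]*)\}",
        lambda m: os.environ[m.group(1)],
        value,
    )

print(resolve_env_placeholders(
    "jdbc:mysql://foo:datahub@test_mysql:${env:MYSQL_PORT}/${env:MYSQL_DB}"
))
# jdbc:mysql://foo:datahub@test_mysql:3306/librarydb
```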
@@ -167,7 +167,7 @@ def loaded_kafka_connect(kafka_connect_runner):
"query": "select * from member",
"topic.prefix": "query-topic",
"tasks.max": "1",
"connection.url": "${env:MYSQL_CONNECTION_URL}"
"connection.url": "jdbc:mysql://foo:datahub@test_mysql:${env:MYSQL_PORT}/${env:MYSQL_DB}"
}
}
""",
@@ -419,7 +419,17 @@ def test_kafka_connect_ingest_stateful(
"provider": "env",
"path_key": "MYSQL_CONNECTION_URL",
"value": "jdbc:mysql://test_mysql:3306/librarydb",
}
},
{
"provider": "env",
"path_key": "MYSQL_PORT",
"value": "3306",
},
{
"provider": "env",
"path_key": "MYSQL_DB",
"value": "librarydb",
},
],
"stateful_ingestion": {
"enabled": True,
