
Commit 9da5700

Merge branch 'datahub-project:master' into master

anshbansal authored May 22, 2024
2 parents ad05fe4 + ad3b8f9
Showing 31 changed files with 493 additions and 236 deletions.
166 changes: 117 additions & 49 deletions docs/api/tutorials/lineage.md
This guide will show you how to

- Add lineage between datasets.
- Add column-level lineage between datasets.
- Read lineage.

## Prerequisites

Expected Response:
</TabItem>
</Tabs>

### Expected Outcome

You can now see the lineage between `fct_users_deleted` and `logging_events`.

<p align="center">
<img width="70%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/apis/tutorials/lineage-added.png"/>
</p>


## Add Column-level Lineage

<Tabs>
</TabItem>
</Tabs>

### Expected Outcome

You can now see the column-level lineage between datasets. Note that you need to enable `Show Columns` in the lineage view to see the column-level lineage.

<p align="center">
<img width="70%" src="https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/apis/tutorials/column-level-lineage-added.png"/>
</p>

## Add Lineage to Non-Dataset Entities

You can also add lineage to non-dataset entities, such as DataJobs, Charts, and Dashboards.
Please refer to the following examples.

| Connection | Examples | A.K.A. |
|---------------------|-------------------|-----------------|
| DataJob to DataFlow | [lineage_job_dataflow.py](../../../metadata-ingestion/examples/library/lineage_job_dataflow.py) | |
| DataJob to Dataset | [lineage_dataset_job_dataset.py](../../../metadata-ingestion/examples/library/lineage_dataset_job_dataset.py) | Pipeline Lineage |
| Chart to Dashboard | [lineage_chart_dashboard.py](../../../metadata-ingestion/examples/library/lineage_chart_dashboard.py) | |
| Chart to Dataset | [lineage_dataset_chart.py](../../../metadata-ingestion/examples/library/lineage_dataset_chart.py) | |
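For the pipeline-lineage case (DataJob to Dataset), a job's lineage boils down to lists of input and output dataset URNs. The dict below is a hand-built sketch of that shape for illustration — it mirrors the `dataJobInputOutput` aspect's field names, but it is not SDK output:

```python
# A hand-built sketch (not SDK output) of the shape pipeline lineage takes for
# a DataJob: the lineage aspect is essentially lists of dataset URNs flowing
# into and out of the job. Treat the dict as illustrative only.
datajob_urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)"

datajob_input_output = {
    "inputDatasets": [
        "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)",
    ],
    "outputDatasets": [
        "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)",
    ],
}

# Print the edges this lineage implies, upstream first.
for upstream in datajob_input_output["inputDatasets"]:
    print(f"{upstream} -> {datajob_urn}")
for downstream in datajob_input_output["outputDatasets"]:
    print(f"{datajob_urn} -> {downstream}")
```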


## Read Lineage (Lineage Impact Analysis)

<Tabs>
<TabItem value="graphql" label="GraphQL" default>

```graphql
query scrollAcrossLineage {
  scrollAcrossLineage(
    input: {
      query: "*"
      urn: "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)"
      count: 10
      direction: DOWNSTREAM
      orFilters: [
        {
          and: [
            {
              condition: EQUAL
              negated: false
              field: "degree"
              values: ["1", "2", "3+"]
            }
          ]
        }
      ]
    }
  ) {
    searchResults {
      degree
      entity {
        urn
        type
      }
    }
  }
}
```
:::info Degree
Note that `degree` means the number of hops in the lineage. For example, `degree: 1` means the immediate downstream entities, `degree: 2` means the entities that are two hops away, and so on.
:::

This will perform a multi-hop lineage search on the URN specified. For more information about the `scrollAcrossLineage` query, please refer to [scrollAcrossLineage](https://datahubproject.io/docs/graphql/queries/#scrollacrosslineage).

This example shows using lineage degrees as a filter, but additional search filters can be included here as well.

</TabItem>
<TabItem value="curl" label="Curl">

```shell
curl --location --request POST 'http://localhost:8080/api/graphql' \
--header 'Authorization: Bearer <my-access-token>' \
--header 'Content-Type: application/json' --data-raw '{ "query": "query scrollAcrossLineage { scrollAcrossLineage( input: { query: \"*\" urn: \"urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)\" count: 10 direction: DOWNSTREAM orFilters: [ { and: [ { condition: EQUAL negated: false field: \"degree\" values: [\"1\", \"2\", \"3+\"] } ] } ] } ) { searchResults { degree entity { urn type } } }}" }'
```

</TabItem>
<TabItem value="python" label="Python">

```python
{{ inline /metadata-ingestion/examples/library/read_lineage_execute_graphql.py show_path_as_comment }}
```
The Python SDK example shows how to read lineage of a dataset. Note that the `aspect_type` parameter varies by entity type.
Below are a few examples of `aspect_type` for different entities.

| Entity | Aspect Type | Reference |
|-----------|---------------------------|--------------------------------------------------------------------------|
| Dataset | `UpstreamLineageClass` | [Link](/docs/generated/metamodel/entities/dataset.md#upstreamlineage) |
| DataJob | `DataJobInputOutputClass` | [Link](/docs/generated/metamodel/entities/dataJob.md#datajobinputoutput) |
| Dashboard | `DashboardInfoClass` | [Link](/docs/generated/metamodel/entities/dashboard.md#dashboardinfo) |
| DataFlow | `DataFlowInfoClass` | [Link](/docs/generated/metamodel/entities/dataFlow.md#dataflowinfo) |

Learn more about lineage for different entities in the [Add Lineage to Non-Dataset Entities](#add-lineage-to-non-dataset-entities) section.
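As a rough illustration of the table above, a small helper (our own, not part of the SDK) can pick the lineage-bearing aspect class name from the entity-type segment of a URN:

```python
# Map entity types to the SDK class names that carry their lineage, per the
# table above. The lookup helper itself is ours, for illustration only.
LINEAGE_ASPECT_BY_ENTITY = {
    "dataset": "UpstreamLineageClass",
    "dataJob": "DataJobInputOutputClass",
    "dashboard": "DashboardInfoClass",
    "dataFlow": "DataFlowInfoClass",
}


def lineage_aspect_for(urn: str) -> str:
    # URNs look like urn:li:<entityType>:(...), so the entity type is the
    # third colon-separated field.
    entity_type = urn.split(":")[2]
    try:
        return LINEAGE_ASPECT_BY_ENTITY[entity_type]
    except KeyError:
        raise ValueError(f"no lineage aspect mapping for entity type {entity_type!r}")


print(lineage_aspect_for("urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)"))
```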

</TabItem>
</Tabs>
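To make the degree semantics concrete, here is a small, self-contained sketch that groups lineage search results by hop count — the sample entries are hypothetical, but shaped like `searchResults` items:

```python
from collections import defaultdict

# Hypothetical entries shaped like scrollAcrossLineage searchResults.
search_results = [
    {"degree": 1, "entity": {"urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)", "type": "DATA_JOB"}},
    {"degree": 1, "entity": {"urn": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)", "type": "DATASET"}},
    {"degree": 2, "entity": {"urn": "urn:li:mlPrimaryKey:(user_analytics,user_name)", "type": "MLPRIMARY_KEY"}},
]

# degree == number of hops from the source entity: 1 is immediately
# downstream, 2 is two hops away, and so on.
by_degree = defaultdict(list)
for result in search_results:
    by_degree[result["degree"]].append(result["entity"]["urn"])

for degree in sorted(by_degree):
    print(f"degree {degree}: {len(by_degree[degree])} result(s)")
```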


### Expected Outcome

As an outcome, you should see the downstream entities of `logging_events`.

```json
{
  "data": {
    "scrollAcrossLineage": {
      "searchResults": [
        {
          "degree": 1,
          "entity": {
            "urn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)",
            "type": "DATA_JOB"
          }
        },
        ...
        {
          "degree": 2,
          "entity": {
            "urn": "urn:li:mlPrimaryKey:(user_analytics,user_name)",
            "type": "MLPRIMARY_KEY"
          }
        }
      ]
    }
  },
  "extensions": {}
}
```

## Read Column-level Lineage

You can also read column-level lineage via the Python SDK.

<Tabs>
<TabItem value="python" label="Python">

```python
{{ inline /metadata-ingestion/examples/library/read_lineage_dataset_rest.py show_path_as_comment }}
```

</TabItem>
</Tabs>
### Expected Outcome

As a response, you will get the full lineage information, like this:

```json
{
"UpstreamLineageClass": {
"upstreams": [
{
"UpstreamClass": {
"auditStamp": {
"AuditStampClass": {
"time": 0,
"actor": "urn:li:corpuser:unknown",
"impersonator": null,
"message": null
}
},
"created": null,
"dataset": "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)",
"type": "TRANSFORMED",
"properties": null,
"query": null
}
}
],
"fineGrainedLineages": [
{
"FineGrainedLineageClass": {
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD),browser_id)",
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD),browser)"
],
"transformOperation": null,
"confidenceScore": 1.0,
"query": null
}
}
]
}
}
```
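Each `fineGrainedLineages` entry above maps upstream columns to downstream columns using schemaField URNs, which pack a dataset URN and a column path together. A minimal sketch of unpacking them — the helper is our own, and the sample data mirrors the response above:

```python
def split_schema_field_urn(urn: str) -> tuple[str, str]:
    """Split urn:li:schemaField:(<dataset urn>,<field path>) into its parts."""
    prefix = "urn:li:schemaField:("
    if not (urn.startswith(prefix) and urn.endswith(")")):
        raise ValueError(f"not a schemaField URN: {urn}")
    body = urn[len(prefix):-1]
    # The dataset URN itself contains commas inside parentheses, so split on
    # the last comma at parenthesis depth zero, scanning right to left.
    depth = 0
    for i in range(len(body) - 1, -1, -1):
        if body[i] == ")":
            depth += 1
        elif body[i] == "(":
            depth -= 1
        elif body[i] == "," and depth == 0:
            return body[:i], body[i + 1 :]
    raise ValueError(f"could not split schemaField URN: {urn}")


# Sample fine-grained entry, shaped like the response shown above.
fine_grained = {
    "upstreams": [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD),browser_id)",
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD),user_id)",
    ],
    "downstreams": [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD),browser)",
    ],
}

for up in fine_grained["upstreams"]:
    up_dataset, up_field = split_schema_field_urn(up)
    for down in fine_grained["downstreams"]:
        down_dataset, down_field = split_schema_field_urn(down)
        print(f"{up_field} ({up_dataset}) -> {down_field} ({down_dataset})")
```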

@@ -23,7 +23,7 @@ private UrnUtils() {}
   @Nonnull
   public static DatasetUrn toDatasetUrn(
       @Nonnull String platformName, @Nonnull String datasetName, @Nonnull String origin) {
-    return new DatasetUrn(new DataPlatformUrn(platformName), datasetName, toFabricType(origin));
+    return new DatasetUrn(new DataPlatformUrn(platformName), datasetName, FabricType.valueOf(origin.toUpperCase()));
   }

   /**
@@ -203,7 +203,7 @@ def _parse_sql_into_task_metadata(
         task_name = f"{self.operator.dag_id}.{self.operator.task_id}"

         run_facets = {}
-        job_facets = {"sql": SqlJobFacet(query=self._normalize_sql(sql))}
+        job_facets = {"sql": SqlJobFacet(query=SqlExtractor._normalize_sql(sql))}

         # Prepare to run the SQL parser.
         graph = self.context.get(_DATAHUB_GRAPH_CONTEXT_KEY, None)
13 changes: 13 additions & 0 deletions metadata-ingestion/examples/library/read_lineage_datajob_rest.py
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Imports for metadata model classes
from datahub.metadata.schema_classes import DataJobInputOutputClass

# Get the current lineage for a datajob
gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))

urn = "urn:li:dataJob:(urn:li:dataFlow:(airflow,dag_abc,PROD),task_123)"
result = graph.get_aspect(entity_urn=urn, aspect_type=DataJobInputOutputClass)

print(result)
13 changes: 13 additions & 0 deletions metadata-ingestion/examples/library/read_lineage_dataset_rest.py
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

# Imports for metadata model classes
from datahub.metadata.schema_classes import UpstreamLineageClass

# Get the current lineage for a dataset
gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))

urn = "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)"
result = graph.get_aspect(entity_urn=urn, aspect_type=UpstreamLineageClass)

print(result)
44 changes: 44 additions & 0 deletions metadata-ingestion/examples/library/read_lineage_execute_graphql.py
# read-modify-write requires access to the DataHubGraph (RestEmitter is not enough)
from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph

gms_endpoint = "http://localhost:8080"
graph = DataHubGraph(DatahubClientConfig(server=gms_endpoint))

# Run a multi-hop lineage (impact analysis) query via GraphQL
query = """
query scrollAcrossLineage($input: ScrollAcrossLineageInput!) {
  scrollAcrossLineage(input: $input) {
    searchResults {
      degree
      entity {
        urn
        type
      }
    }
  }
}
"""

variables = {
    "input": {
        "query": "*",
        "urn": "urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)",
        "count": 10,
        "direction": "DOWNSTREAM",
        "orFilters": [
            {
                "and": [
                    {
                        "condition": "EQUAL",
                        "negated": False,
                        "field": "degree",
                        "values": ["1", "2", "3+"],
                    }
                ]
            }
        ],
    }
}
result = graph.execute_graphql(query=query, variables=variables)

print(result)
43 changes: 0 additions & 43 deletions metadata-ingestion/examples/library/read_lineage_rest.py

This file was deleted.
