diff --git a/metadata-ingestion/examples/recipes/file_to_datahub-jobs-golden.dhub.yaml b/metadata-ingestion/examples/recipes/file_to_datahub-jobs-golden.dhub.yaml new file mode 100644 index 0000000000000..bdad337b607de --- /dev/null +++ b/metadata-ingestion/examples/recipes/file_to_datahub-jobs-golden.dhub.yaml @@ -0,0 +1,11 @@ +--- +# see https://datahubproject.io/docs/generated/ingestion/sources/file for complete documentation +source: + type: "file" + config: + filename: ./examples/test_examples/via_node_test_example_fivetran.json +# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation +sink: + type: "datahub-rest" + config: + server: "http://localhost:8080" diff --git a/metadata-ingestion/examples/test_examples/via_node_test_example_fivetran.json b/metadata-ingestion/examples/test_examples/via_node_test_example_fivetran.json new file mode 100644 index 0000000000000..886ad2e2005ca --- /dev/null +++ b/metadata-ingestion/examples/test_examples/via_node_test_example_fivetran.json @@ -0,0 +1,731 @@ +[ +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "dataFlowInfo", + "aspect": { + "json": { + "customProperties": {}, + "name": "postgres" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { 
+ "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "paused": "False", + "sync_frequency": "1440", + "destination_id": "'interval_unconstitutional'" + }, + "name": "postgres", + "type": { + "string": "COMMAND" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "project-id-1.bigquery-dataset-1.table-1", + "platform": "urn:li:dataPlatform:bigquery", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [{ + "fieldPath": "id", + "nullable": true, + "description": "mock comment for column", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "NUMBER(asdecimal=False)", + "recursive": false, + "isPartOfKey": true + }] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "project-id-1.bigquery-dataset-1.table-1", + "platform": "urn:li:dataPlatform:bigquery", + 
"version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.MySqlDDL": { + "tableSchema": "" + } + }, + "fields": [{ + "fieldPath": "id", + "nullable": true, + "description": "mock comment for column", + "type": { + "type": { + "com.linkedin.pegasus2avro.schema.NumberType": {} + } + }, + "nativeDataType": "NUMBER(asdecimal=False)", + "recursive": false, + "isPartOfKey": true + }] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "bigquery-2022_02_03-07_00_00" + } + }, + { + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD),field_bar)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1643871600000, + "runId": "oracle-2022_02_03-07_00_00", + "lastRunId": "no-run-id-provided" + } + }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)" + ], + 
"outputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)" + ], + "inputDatajobs": [], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:kafka,SampleKafkaDataset,PROD),field_foo_2)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV),id)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD),field_bar)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, + +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + 
"owner": "urn:li:corpuser:Shubham Jagtap", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "4c9a03d6-eded-4422-a46a-163266e58243", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1695191853000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + 
"urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191853000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:ee88d32dbe3133a23a9023c097050190", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1695191885000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": 
"urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "f773d1e9-c791-48f4-894f-8cf9b3dfc834", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1696343730000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": 
"powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1696343730000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:be36f55c13ec4e313c7510770e50784a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1696343732000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "COMPLETE", + "result": { + "type": "SKIPPED", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": {}, + "name": "63c2fc85-600b-455f-9ba0-f576522465be", + "type": "BATCH_SCHEDULED", + "created": { + "time": 1696343755000, + "actor": "urn:li:corpuser:datahub" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": 
"urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "upstreamInstances": [] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.employee,DEV)", + "urn:li:dataset:(urn:li:dataPlatform:postgres,postgres_db.public.company,DEV)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.employee,PROD)", + "urn:li:dataset:(urn:li:dataPlatform:bigquery,test.postgres_public.company,PROD)" + ] + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1696343755000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "STARTED" + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:d8f100271d2dc3fa905717f82d083c8d", + "changeType": "UPSERT", + "aspectName": 
"dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1696343790000, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "COMPLETE", + "result": { + "type": "FAILURE", + "nativeResultType": "fivetran" + } + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(fivetran,calendar_elected,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1654621200000, + "runId": "powerbi-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java index 5fc3dfc779fa4..8aa27363e985d 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ESGraphQueryDAO.java @@ -55,11 +55,13 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.client.RequestOptions; import org.opensearch.client.RestHighLevelClient; +import org.opensearch.common.lucene.search.function.CombineFunction; import org.opensearch.index.query.BoolQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.SearchHit; import 
org.opensearch.search.builder.SearchSourceBuilder; +import org.opensearch.search.rescore.QueryRescorerBuilder; /** A search DAO for Elasticsearch backend. */ @Slf4j @@ -116,6 +118,9 @@ private SearchResponse executeSearchQuery( searchSourceBuilder.size(count); searchSourceBuilder.query(query); + if (graphQueryConfiguration.isBoostViaNodes()) { + addViaNodeBoostQuery(searchSourceBuilder); + } searchRequest.source(searchSourceBuilder); @@ -457,7 +462,7 @@ private List getLineageRelationships( } @VisibleForTesting - public static QueryBuilder getLineageQuery( + public QueryBuilder getLineageQuery( @Nonnull Map> urnsPerEntityType, @Nonnull Map> edgesPerEntityType, @Nonnull GraphFilters graphFilters, @@ -497,7 +502,7 @@ public static QueryBuilder getLineageQuery( } @VisibleForTesting - public static QueryBuilder getLineageQueryForEntityType( + public QueryBuilder getLineageQueryForEntityType( @Nonnull List urns, @Nonnull List lineageEdges, @Nonnull GraphFilters graphFilters) { @@ -520,6 +525,25 @@ public static QueryBuilder getLineageQueryForEntityType( return query; } + /** + * Replaces score from initial lineage query against the graph index with score from whether a via + * edge exists or not. We don't currently sort the results for the graph query for anything else, + * we just do a straight filter, but this will need to be re-evaluated if we do. 
+ * + * @param sourceBuilder source builder for the lineage query + */ + private void addViaNodeBoostQuery(final SearchSourceBuilder sourceBuilder) { + // Build the via-exists scoring query once; it is only consumed by the rescorer below. + final QueryBuilder viaExistsQuery = + QueryBuilders.functionScoreQuery(QueryBuilders.existsQuery(EDGE_FIELD_VIA)) + .boostMode(CombineFunction.REPLACE); + final QueryRescorerBuilder queryRescorerBuilder = + new QueryRescorerBuilder(viaExistsQuery); + queryRescorerBuilder.windowSize( + graphQueryConfiguration.getMaxResult()); // Will rescore all results + sourceBuilder.addRescorer(queryRescorerBuilder); + } + /** * Adds an individual relationship edge to a running set of unique paths to each node in the * graph. diff --git a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java index 90f46190ac18e..0235edbcd30cb 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/graph/elastic/ElasticSearchGraphService.java @@ -36,12 +36,8 @@ import com.linkedin.structured.StructuredPropertyDefinition; import io.opentelemetry.extension.annotations.WithSpan; import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Base64; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -67,8 +63,6 @@ public class ElasticSearchGraphService implements GraphService, ElasticSearchInd private final ESGraphWriteDAO _graphWriteDAO; private final ESGraphQueryDAO _graphReadDAO; private final ESIndexBuilder _indexBuilder; - - private static final String DOC_DELIMETER = "--"; public static final String INDEX_NAME = "graph_service_v1"; private static final Map 
EMPTY_HASH = new HashMap<>(); @@ -123,25 +117,6 @@ private String toDocument(@Nonnull final Edge edge) { return searchDocument.toString(); } - private String toDocId(@Nonnull final Edge edge) { - String rawDocId = - edge.getSource().toString() - + DOC_DELIMETER - + edge.getRelationshipType() - + DOC_DELIMETER - + edge.getDestination().toString(); - - try { - byte[] bytesOfRawDocID = rawDocId.getBytes(StandardCharsets.UTF_8); - MessageDigest md = MessageDigest.getInstance("MD5"); - byte[] thedigest = md.digest(bytesOfRawDocID); - return Base64.getEncoder().encodeToString(thedigest); - } catch (NoSuchAlgorithmException e) { - e.printStackTrace(); - return rawDocId; - } - } - @Override public LineageRegistry getLineageRegistry() { return _lineageRegistry; @@ -149,7 +124,7 @@ public LineageRegistry getLineageRegistry() { @Override public void addEdge(@Nonnull final Edge edge) { - String docId = toDocId(edge); + String docId = edge.toDocId(); String edgeDocument = toDocument(edge); _graphWriteDAO.upsertDocument(docId, edgeDocument); } @@ -161,7 +136,7 @@ public void upsertEdge(@Nonnull final Edge edge) { @Override public void removeEdge(@Nonnull final Edge edge) { - String docId = toDocId(edge); + String docId = edge.toDocId(); _graphWriteDAO.deleteDocument(docId); } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java index 2de61c8ed31bb..ca5fbfcd27a28 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBase.java @@ -92,11 +92,25 @@ public int compare(RelatedEntity left, RelatedEntity right) { protected static String datasetFiveUrnString = "urn:li:" + datasetType + ":(urn:li:dataPlatform:type,SampleDatasetFive,PROD)"; + protected static final String schemaFieldUrnOneString = + 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:type,SampleDatasetFive,PROD),fieldOne)"; + protected static final String schemaFieldUrnTwoString = + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:type,SampleDatasetFour,PROD),fieldTwo)"; + + protected static final String lifeCycleOwnerOneString = + "urn:li:dataJob:(urn:li:dataFlow:(fivetran,calendar_elected,PROD),calendar_elected)"; + protected static final String lifeCycleOwnerTwoString = + "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"; + protected static Urn datasetOneUrn = createFromString(datasetOneUrnString); protected static Urn datasetTwoUrn = createFromString(datasetTwoUrnString); protected static Urn datasetThreeUrn = createFromString(datasetThreeUrnString); protected static Urn datasetFourUrn = createFromString(datasetFourUrnString); protected static Urn datasetFiveUrn = createFromString(datasetFiveUrnString); + protected static final Urn schemaFieldUrnOne = createFromString(schemaFieldUrnOneString); + protected static final Urn schemaFieldUrnTwo = createFromString(schemaFieldUrnTwoString); + protected static final Urn lifeCycleOwnerOne = createFromString(lifeCycleOwnerOneString); + protected static final Urn lifeCycleOwnerTwo = createFromString(lifeCycleOwnerTwoString); protected static String unknownUrnString = "urn:li:unknown:(urn:li:unknown:Unknown)"; @@ -139,6 +153,14 @@ public int compare(RelatedEntity left, RelatedEntity right) { new RelatedEntity(downstreamOf, datasetThreeUrnString); protected static RelatedEntity downstreamOfDatasetFourRelatedEntity = new RelatedEntity(downstreamOf, datasetFourUrnString); + protected static final RelatedEntity downstreamOfSchemaFieldOneVia = + new RelatedEntity(downstreamOf, schemaFieldUrnOneString, lifeCycleOwnerOneString); + protected static final RelatedEntity downstreamOfSchemaFieldOne = + new RelatedEntity(downstreamOf, schemaFieldUrnOneString); + protected static final RelatedEntity downstreamOfSchemaFieldTwoVia = + 
new RelatedEntity(downstreamOf, schemaFieldUrnTwoString, lifeCycleOwnerOneString); + protected static final RelatedEntity downstreamOfSchemaFieldTwo = + new RelatedEntity(downstreamOf, schemaFieldUrnTwoString); protected static RelatedEntity hasOwnerDatasetOneRelatedEntity = new RelatedEntity(hasOwner, datasetOneUrnString); @@ -244,7 +266,29 @@ protected GraphService getPopulatedGraphService() throws Exception { new Edge(datasetThreeUrn, userTwoUrn, hasOwner, null, null, null, null, null), new Edge(datasetFourUrn, userTwoUrn, hasOwner, null, null, null, null, null), new Edge(userOneUrn, userTwoUrn, knowsUser, null, null, null, null, null), - new Edge(userTwoUrn, userOneUrn, knowsUser, null, null, null, null, null)); + new Edge(userTwoUrn, userOneUrn, knowsUser, null, null, null, null, null), + new Edge( + schemaFieldUrnOne, + schemaFieldUrnTwo, + downstreamOf, + 0L, + null, + 0L, + null, + null, + lifeCycleOwnerOne, + lifeCycleOwnerOne), + new Edge( + schemaFieldUrnOne, + schemaFieldUrnTwo, + downstreamOf, + 0L, + null, + 0L, + null, + null, + lifeCycleOwnerTwo, + null)); edges.forEach(service::addEdge); syncAfterWrite(); @@ -412,12 +456,14 @@ public void testPopulatedGraphService() throws Exception { outgoingRelationships, 0, 100); + // All downstreamOf, hasOwner, or knowsUser relationships, outgoing assertEqualsAnyOrder( relatedOutgoingEntitiesBeforeRemove, Arrays.asList( downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity, hasOwnerUserOneRelatedEntity, hasOwnerUserTwoRelatedEntity, - knowsUserOneRelatedEntity, knowsUserTwoRelatedEntity)); + knowsUserOneRelatedEntity, knowsUserTwoRelatedEntity, + downstreamOfSchemaFieldTwoVia, downstreamOfSchemaFieldTwo)); RelatedEntitiesResult relatedIncomingEntitiesBeforeRemove = service.findRelatedEntities( anyType, @@ -428,6 +474,7 @@ public void testPopulatedGraphService() throws Exception { incomingRelationships, 0, 100); + // All downstreamOf, hasOwner, or knowsUser relationships, incoming 
assertEqualsAnyOrder( relatedIncomingEntitiesBeforeRemove, Arrays.asList( @@ -439,7 +486,44 @@ public void testPopulatedGraphService() throws Exception { hasOwnerDatasetThreeRelatedEntity, hasOwnerDatasetFourRelatedEntity, knowsUserOneRelatedEntity, - knowsUserTwoRelatedEntity)); + knowsUserTwoRelatedEntity, + downstreamOfSchemaFieldOneVia, + downstreamOfSchemaFieldOne)); + EntityLineageResult viaNodeResult = + service.getLineage( + schemaFieldUrnOne, + LineageDirection.UPSTREAM, + new GraphFilters(List.of("schemaField")), + 0, + 1000, + 100, + null, + null); + // Multi-path enabled + assertEquals(viaNodeResult.getRelationships().size(), 2); + // First one is via node + assertTrue( + viaNodeResult.getRelationships().get(0).getPaths().get(0).contains(lifeCycleOwnerOne)); + EntityLineageResult viaNodeResultNoMulti = + getGraphService(false) + .getLineage( + schemaFieldUrnOne, + LineageDirection.UPSTREAM, + new GraphFilters(List.of("schemaField")), + 0, + 1000, + 100, + null, + null); + + // Multi-path disabled, still has two because via flow creates both edges in response + assertEquals(viaNodeResultNoMulti.getRelationships().size(), 2); + // First one is via node (checked on the multi-path-disabled result, not the earlier one) + assertTrue( + viaNodeResultNoMulti.getRelationships().get(0).getPaths().get(0).contains(lifeCycleOwnerOne)); + + // reset graph service + getGraphService(); } @Test @@ -685,12 +769,18 @@ private void doTestFindRelatedEntities( @DataProvider(name = "FindRelatedEntitiesSourceTypeTests") public Object[][] getFindRelatedEntitiesSourceTypeTests() { return new Object[][] { + // All DownstreamOf relationships, outgoing new Object[] { null, Arrays.asList(downstreamOf), outgoingRelationships, - Arrays.asList(downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity) + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + downstreamOfSchemaFieldTwoVia, + downstreamOfSchemaFieldTwo) }, + // All DownstreamOf relationships, incoming new Object[] { null, 
Arrays.asList(downstreamOf), @@ -698,15 +788,20 @@ public Object[][] getFindRelatedEntitiesSourceTypeTests() { Arrays.asList( downstreamOfDatasetTwoRelatedEntity, downstreamOfDatasetThreeRelatedEntity, - downstreamOfDatasetFourRelatedEntity) + downstreamOfDatasetFourRelatedEntity, + downstreamOfSchemaFieldOneVia, + downstreamOfSchemaFieldOne) }, + // All DownstreamOf relationships, both directions new Object[] { null, Arrays.asList(downstreamOf), undirectedRelationships, Arrays.asList( downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity, - downstreamOfDatasetThreeRelatedEntity, downstreamOfDatasetFourRelatedEntity) + downstreamOfDatasetThreeRelatedEntity, downstreamOfDatasetFourRelatedEntity, + downstreamOfSchemaFieldTwoVia, downstreamOfSchemaFieldTwo, + downstreamOfSchemaFieldOneVia, downstreamOfSchemaFieldOne) }, // "" used to be any type before v0.9.0, which is now encoded by null @@ -789,16 +884,24 @@ public Object[][] getFindRelatedEntitiesDestinationTypeTests() { null, Arrays.asList(downstreamOf), outgoingRelationships, - Arrays.asList(downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity) + // All DownstreamOf relationships, outgoing + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + downstreamOfSchemaFieldTwoVia, + downstreamOfSchemaFieldTwo) }, new Object[] { null, Arrays.asList(downstreamOf), incomingRelationships, + // All DownstreamOf relationships, incoming Arrays.asList( downstreamOfDatasetTwoRelatedEntity, downstreamOfDatasetThreeRelatedEntity, - downstreamOfDatasetFourRelatedEntity) + downstreamOfDatasetFourRelatedEntity, + downstreamOfSchemaFieldOneVia, + downstreamOfSchemaFieldOne) }, new Object[] { null, @@ -806,7 +909,9 @@ public Object[][] getFindRelatedEntitiesDestinationTypeTests() { undirectedRelationships, Arrays.asList( downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity, - downstreamOfDatasetThreeRelatedEntity, 
downstreamOfDatasetFourRelatedEntity) + downstreamOfDatasetThreeRelatedEntity, downstreamOfDatasetFourRelatedEntity, + downstreamOfSchemaFieldOneVia, downstreamOfSchemaFieldOne, + downstreamOfSchemaFieldTwoVia, downstreamOfSchemaFieldTwo) }, new Object[] { "", Arrays.asList(downstreamOf), outgoingRelationships, Collections.emptyList() @@ -1035,12 +1140,14 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { outgoingRelationships, 0, 100); + // All DownstreamOf relationships, outgoing (destination) assertEqualsAnyOrder( allOutgoingRelatedEntities, Arrays.asList( downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity, hasOwnerUserOneRelatedEntity, hasOwnerUserTwoRelatedEntity, - knowsUserOneRelatedEntity, knowsUserTwoRelatedEntity)); + knowsUserOneRelatedEntity, knowsUserTwoRelatedEntity, + downstreamOfSchemaFieldTwoVia, downstreamOfSchemaFieldTwo)); RelatedEntitiesResult allIncomingRelatedEntities = service.findRelatedEntities( @@ -1052,6 +1159,7 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { incomingRelationships, 0, 100); + // All DownstreamOf relationships, incoming (source) assertEqualsAnyOrder( allIncomingRelatedEntities, Arrays.asList( @@ -1063,7 +1171,9 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { hasOwnerDatasetThreeRelatedEntity, hasOwnerDatasetFourRelatedEntity, knowsUserOneRelatedEntity, - knowsUserTwoRelatedEntity)); + knowsUserTwoRelatedEntity, + downstreamOfSchemaFieldOneVia, + downstreamOfSchemaFieldOne)); RelatedEntitiesResult allUnknownRelationshipTypeRelatedEntities = service.findRelatedEntities( @@ -1087,9 +1197,14 @@ public void testFindRelatedEntitiesRelationshipTypes() throws Exception { outgoingRelationships, 0, 100); + // All DownstreamOf relationships, outgoing (destination) assertEqualsAnyOrder( someUnknownRelationshipTypeRelatedEntities, - Arrays.asList(downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity)); + 
Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + downstreamOfSchemaFieldTwoVia, + downstreamOfSchemaFieldTwo)); } @Test @@ -1517,6 +1632,7 @@ public void testRemoveNode() throws Exception { syncAfterWrite(); // assert the modified graph + // All downstreamOf, hasOwner, knowsUser relationships minus datasetTwo's, outgoing assertEqualsAnyOrder( service.findRelatedEntities( anyType, @@ -1529,7 +1645,8 @@ public void testRemoveNode() throws Exception { 100), Arrays.asList( hasOwnerUserOneRelatedEntity, hasOwnerUserTwoRelatedEntity, - knowsUserOneRelatedEntity, knowsUserTwoRelatedEntity)); + knowsUserOneRelatedEntity, knowsUserTwoRelatedEntity, + downstreamOfSchemaFieldTwoVia, downstreamOfSchemaFieldTwo)); } @Test diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBaseNoVia.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBaseNoVia.java new file mode 100644 index 0000000000000..19ca2e85e8c54 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/GraphServiceTestBaseNoVia.java @@ -0,0 +1,386 @@ +package com.linkedin.metadata.graph; + +import static com.linkedin.metadata.search.utils.QueryUtils.*; +import static org.testng.Assert.*; + +import java.util.Arrays; +import java.util.Collections; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public abstract class GraphServiceTestBaseNoVia extends GraphServiceTestBase { + + @DataProvider(name = "NoViaFindRelatedEntitiesDestinationTypeTests") + public Object[][] getNoViaFindRelatedEntitiesDestinationTypeTests() { + return new Object[][] { + new Object[] { + null, + Arrays.asList(downstreamOf), + outgoingRelationships, + // All DownstreamOf relationships, outgoing + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + // TODO: Via not supported in Neo4J and DGraph + downstreamOfSchemaFieldTwo) + }, + new Object[] { + null, 
Arrays.asList(downstreamOf), + incomingRelationships, + // All DownstreamOf relationships, incoming + Arrays.asList( + downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, + downstreamOfDatasetFourRelatedEntity, + // TODO: Via not supported in Neo4J and DGraph + downstreamOfSchemaFieldOne) + }, + new Object[] { + null, + Arrays.asList(downstreamOf), + undirectedRelationships, + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, + downstreamOfDatasetFourRelatedEntity, + // TODO: Via not supported in Neo4J and DGraph + downstreamOfSchemaFieldOne, + downstreamOfSchemaFieldTwo) + }, + new Object[] { + "", Arrays.asList(downstreamOf), outgoingRelationships, Collections.emptyList() + }, + new Object[] { + "", Arrays.asList(downstreamOf), incomingRelationships, Collections.emptyList() + }, + new Object[] { + "", Arrays.asList(downstreamOf), undirectedRelationships, Collections.emptyList() + }, + new Object[] { + datasetType, + Arrays.asList(downstreamOf), + outgoingRelationships, + Arrays.asList(downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity) + }, + new Object[] { + datasetType, + Arrays.asList(downstreamOf), + incomingRelationships, + Arrays.asList( + downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, + downstreamOfDatasetFourRelatedEntity) + }, + new Object[] { + datasetType, + Arrays.asList(downstreamOf), + undirectedRelationships, + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, downstreamOfDatasetFourRelatedEntity) + }, + new Object[] {datasetType, Arrays.asList(hasOwner), outgoingRelationships, Arrays.asList()}, + new Object[] { + datasetType, + Arrays.asList(hasOwner), + incomingRelationships, + Arrays.asList( + hasOwnerDatasetOneRelatedEntity, hasOwnerDatasetTwoRelatedEntity, + hasOwnerDatasetThreeRelatedEntity, 
hasOwnerDatasetFourRelatedEntity) + }, + new Object[] { + datasetType, + Arrays.asList(hasOwner), + undirectedRelationships, + Arrays.asList( + hasOwnerDatasetOneRelatedEntity, hasOwnerDatasetTwoRelatedEntity, + hasOwnerDatasetThreeRelatedEntity, hasOwnerDatasetFourRelatedEntity) + }, + new Object[] { + userType, + Arrays.asList(hasOwner), + outgoingRelationships, + Arrays.asList(hasOwnerUserOneRelatedEntity, hasOwnerUserTwoRelatedEntity) + }, + new Object[] {userType, Arrays.asList(hasOwner), incomingRelationships, Arrays.asList()}, + new Object[] { + userType, + Arrays.asList(hasOwner), + undirectedRelationships, + Arrays.asList(hasOwnerUserOneRelatedEntity, hasOwnerUserTwoRelatedEntity) + } + }; + } + + @DataProvider(name = "NoViaFindRelatedEntitiesSourceTypeTests") + public Object[][] getNoViaFindRelatedEntitiesSourceTypeTests() { + return new Object[][] { + // All DownstreamOf relationships, outgoing + new Object[] { + null, + Arrays.asList(downstreamOf), + outgoingRelationships, + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + // TODO: DGraph and Neo4J do not support via + downstreamOfSchemaFieldTwo) + }, + // All DownstreamOf relationships, incoming + new Object[] { + null, + Arrays.asList(downstreamOf), + incomingRelationships, + Arrays.asList( + downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, + downstreamOfDatasetFourRelatedEntity, + // TODO: DGraph and Neo4J do not support via + downstreamOfSchemaFieldOne) + }, + // All DownstreamOf relationships, both directions + new Object[] { + null, + Arrays.asList(downstreamOf), + undirectedRelationships, + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, + downstreamOfDatasetFourRelatedEntity, + // TODO: DGraph and Neo4J do not support via + downstreamOfSchemaFieldTwo, + downstreamOfSchemaFieldOne) + }, + + // "" used to be any type before v0.9.0, which 
is now encoded by null + new Object[] { + "", Arrays.asList(downstreamOf), outgoingRelationships, Collections.emptyList() + }, + new Object[] { + "", Arrays.asList(downstreamOf), incomingRelationships, Collections.emptyList() + }, + new Object[] { + "", Arrays.asList(downstreamOf), undirectedRelationships, Collections.emptyList() + }, + new Object[] { + datasetType, + Arrays.asList(downstreamOf), + outgoingRelationships, + Arrays.asList(downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity) + }, + new Object[] { + datasetType, + Arrays.asList(downstreamOf), + incomingRelationships, + Arrays.asList( + downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, + downstreamOfDatasetFourRelatedEntity) + }, + new Object[] { + datasetType, + Arrays.asList(downstreamOf), + undirectedRelationships, + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, downstreamOfDatasetFourRelatedEntity) + }, + new Object[] {userType, Arrays.asList(downstreamOf), outgoingRelationships, Arrays.asList()}, + new Object[] {userType, Arrays.asList(downstreamOf), incomingRelationships, Arrays.asList()}, + new Object[] { + userType, Arrays.asList(downstreamOf), undirectedRelationships, Arrays.asList() + }, + new Object[] {userType, Arrays.asList(hasOwner), outgoingRelationships, Arrays.asList()}, + new Object[] { + userType, + Arrays.asList(hasOwner), + incomingRelationships, + Arrays.asList( + hasOwnerDatasetOneRelatedEntity, hasOwnerDatasetTwoRelatedEntity, + hasOwnerDatasetThreeRelatedEntity, hasOwnerDatasetFourRelatedEntity) + }, + new Object[] { + userType, + Arrays.asList(hasOwner), + undirectedRelationships, + Arrays.asList( + hasOwnerDatasetOneRelatedEntity, hasOwnerDatasetTwoRelatedEntity, + hasOwnerDatasetThreeRelatedEntity, hasOwnerDatasetFourRelatedEntity) + } + }; + } + + @Test + @Override + public void testFindRelatedEntitiesRelationshipTypes() throws Exception { + 
GraphService service = getPopulatedGraphService(); + + RelatedEntitiesResult allOutgoingRelatedEntities = + service.findRelatedEntities( + anyType, + EMPTY_FILTER, + anyType, + EMPTY_FILTER, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + outgoingRelationships, + 0, + 100); + // All downstreamOf, hasOwner, or knowsUser relationships, outgoing (destination) + assertEqualsAnyOrder( + allOutgoingRelatedEntities, + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + hasOwnerUserOneRelatedEntity, + hasOwnerUserTwoRelatedEntity, + knowsUserOneRelatedEntity, + knowsUserTwoRelatedEntity, + // TODO: DGraph and Neo4J do not support via + downstreamOfSchemaFieldTwo)); + + RelatedEntitiesResult allIncomingRelatedEntities = + service.findRelatedEntities( + anyType, + EMPTY_FILTER, + anyType, + EMPTY_FILTER, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + incomingRelationships, + 0, + 100); + // All downstreamOf, hasOwner, or knowsUser relationships, incoming (source) + assertEqualsAnyOrder( + allIncomingRelatedEntities, + Arrays.asList( + downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, + downstreamOfDatasetFourRelatedEntity, + hasOwnerDatasetOneRelatedEntity, + hasOwnerDatasetTwoRelatedEntity, + hasOwnerDatasetThreeRelatedEntity, + hasOwnerDatasetFourRelatedEntity, + knowsUserOneRelatedEntity, + knowsUserTwoRelatedEntity, + // TODO: DGraph and Neo4J do not support via + downstreamOfSchemaFieldOne)); + + RelatedEntitiesResult allUnknownRelationshipTypeRelatedEntities = + service.findRelatedEntities( + anyType, + EMPTY_FILTER, + anyType, + EMPTY_FILTER, + Arrays.asList("unknownRelationshipType", "unseenRelationshipType"), + outgoingRelationships, + 0, + 100); + assertEqualsAnyOrder(allUnknownRelationshipTypeRelatedEntities, Collections.emptyList()); + + RelatedEntitiesResult someUnknownRelationshipTypeRelatedEntities = + service.findRelatedEntities( + anyType, + EMPTY_FILTER, + anyType, + EMPTY_FILTER, + 
Arrays.asList("unknownRelationshipType", downstreamOf), + outgoingRelationships, + 0, + 100); + // All DownstreamOf relationships, outgoing (destination) + assertEqualsAnyOrder( + someUnknownRelationshipTypeRelatedEntities, + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + // TODO: DGraph and Neo4J do not support via + downstreamOfSchemaFieldTwo)); + } + + @Test + @Override + public void testPopulatedGraphService() throws Exception { + GraphService service = getPopulatedGraphService(); + + RelatedEntitiesResult relatedOutgoingEntitiesBeforeRemove = + service.findRelatedEntities( + anyType, + EMPTY_FILTER, + anyType, + EMPTY_FILTER, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + outgoingRelationships, + 0, + 100); + // All downstreamOf, hasOwner, or knowsUser relationships, outgoing + assertEqualsAnyOrder( + relatedOutgoingEntitiesBeforeRemove, + Arrays.asList( + downstreamOfDatasetOneRelatedEntity, + downstreamOfDatasetTwoRelatedEntity, + hasOwnerUserOneRelatedEntity, + hasOwnerUserTwoRelatedEntity, + knowsUserOneRelatedEntity, + knowsUserTwoRelatedEntity, + // TODO: DGraph and Neo4j do not support via + downstreamOfSchemaFieldTwo)); + RelatedEntitiesResult relatedIncomingEntitiesBeforeRemove = + service.findRelatedEntities( + anyType, + EMPTY_FILTER, + anyType, + EMPTY_FILTER, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + incomingRelationships, + 0, + 100); + // All downstreamOf, hasOwner, or knowsUser relationships, incoming + assertEqualsAnyOrder( + relatedIncomingEntitiesBeforeRemove, + Arrays.asList( + downstreamOfDatasetTwoRelatedEntity, + downstreamOfDatasetThreeRelatedEntity, + downstreamOfDatasetFourRelatedEntity, + hasOwnerDatasetOneRelatedEntity, + hasOwnerDatasetTwoRelatedEntity, + hasOwnerDatasetThreeRelatedEntity, + hasOwnerDatasetFourRelatedEntity, + knowsUserOneRelatedEntity, + knowsUserTwoRelatedEntity, + // TODO: DGraph and Neo4j do not support via + downstreamOfSchemaFieldOne)); + // 
TODO: DGraph and Neo4j do not support via + // The getLineage via-node checks of the base test are intentionally omitted here + } + + @Test + @Override + public void testRemoveNode() throws Exception { + GraphService service = getPopulatedGraphService(); + + service.removeNode(datasetTwoUrn); + syncAfterWrite(); + + // assert the modified graph + // All downstreamOf, hasOwner, knowsUser relationships minus datasetTwo's, outgoing + assertEqualsAnyOrder( + service.findRelatedEntities( + anyType, + EMPTY_FILTER, + anyType, + EMPTY_FILTER, + Arrays.asList(downstreamOf, hasOwner, knowsUser), + outgoingRelationships, + 0, + 100), + Arrays.asList( + hasOwnerUserOneRelatedEntity, + hasOwnerUserTwoRelatedEntity, + knowsUserOneRelatedEntity, + knowsUserTwoRelatedEntity, + // TODO: DGraph and Neo4j do not support via + downstreamOfSchemaFieldTwo)); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java index 1ccf018a74c3a..7e683502dd958 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/dgraph/DgraphGraphServiceTest.java @@ -9,11 +9,12 @@ import com.google.common.collect.ImmutableList; import com.linkedin.metadata.graph.GraphService; -import com.linkedin.metadata.graph.GraphServiceTestBase; +import com.linkedin.metadata.graph.GraphServiceTestBaseNoVia; import com.linkedin.metadata.graph.RelatedEntity; import com.linkedin.metadata.models.registry.LineageRegistry; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; import com.linkedin.metadata.query.filter.RelationshipDirection; +import com.linkedin.metadata.query.filter.RelationshipFilter; import io.dgraph.DgraphClient; import io.dgraph.DgraphGrpc; import io.grpc.CallOptions; @@ -28,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import 
java.util.HashSet; +import java.util.List; import 
java.util.Set; import java.util.concurrent.TimeUnit; import javax.annotation.Nonnull; @@ -41,7 +43,7 @@ @SuppressWarnings("ArraysAsListWithZeroOrOneArgument") @Slf4j -public class DgraphGraphServiceTest extends GraphServiceTestBase { +public class DgraphGraphServiceTest extends GraphServiceTestBaseNoVia { private ManagedChannel _channel; private DgraphGraphService _service; @@ -823,4 +825,28 @@ public void testGetDestinationUrnsFromResponseData() { public void testPopulatedGraphServiceGetLineageMultihop(boolean attemptMultiHop) { // TODO: Remove this overridden method once the multihop for dGraph is implemented! } + + @Override + @Test(dataProvider = "NoViaFindRelatedEntitiesDestinationTypeTests") + public void testFindRelatedEntitiesDestinationType( + String datasetType, + List relationshipTypes, + RelationshipFilter relationships, + List expectedRelatedEntities) + throws Exception { + super.testFindRelatedEntitiesDestinationType( + datasetType, relationshipTypes, relationships, expectedRelatedEntities); + } + + @Override + @Test(dataProvider = "NoViaFindRelatedEntitiesSourceTypeTests") + public void testFindRelatedEntitiesSourceType( + String datasetType, + List relationshipTypes, + RelationshipFilter relationships, + List expectedRelatedEntities) + throws Exception { + super.testFindRelatedEntitiesSourceType( + datasetType, relationshipTypes, relationships, expectedRelatedEntities); + } } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java index f1113368601c6..cff79618b8e09 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/neo4j/Neo4jGraphServiceTest.java @@ -12,6 +12,7 @@ import com.linkedin.metadata.graph.EntityLineageResult; import com.linkedin.metadata.graph.GraphService; import 
com.linkedin.metadata.graph.GraphServiceTestBase; +import com.linkedin.metadata.graph.GraphServiceTestBaseNoVia; import com.linkedin.metadata.graph.LineageDirection; import com.linkedin.metadata.graph.RelatedEntitiesResult; import com.linkedin.metadata.graph.RelatedEntity; @@ -35,7 +36,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -public class Neo4jGraphServiceTest extends GraphServiceTestBase { +public class Neo4jGraphServiceTest extends GraphServiceTestBaseNoVia { private Neo4jTestServerBuilder _serverBuilder; private Driver _driver; @@ -90,6 +91,7 @@ protected void assertEqualsAnyOrder( } @Override + @Test(dataProvider = "NoViaFindRelatedEntitiesSourceTypeTests") public void testFindRelatedEntitiesSourceType( String datasetType, List relationshipTypes, @@ -110,6 +112,7 @@ public void testFindRelatedEntitiesSourceType( } @Override + @Test(dataProvider = "NoViaFindRelatedEntitiesDestinationTypeTests") public void testFindRelatedEntitiesDestinationType( String datasetType, List relationshipTypes, diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java index 5b7f880e6d83a..8ae2725b749d1 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/ESGraphQueryDAOTest.java @@ -7,6 +7,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.metadata.Constants; +import com.linkedin.metadata.config.search.GraphQueryConfiguration; import com.linkedin.metadata.graph.GraphFilters; import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO; import com.linkedin.metadata.models.registry.LineageRegistry; @@ -99,19 +100,21 @@ private static void testGetQueryForLineageFullArguments() throws Exception { Long startTime = 0L; Long endTime = 1L; + ESGraphQueryDAO 
graphQueryDAO = + new ESGraphQueryDAO(null, null, null, new GraphQueryConfiguration()); QueryBuilder limitedBuilder = - ESGraphQueryDAO.getLineageQueryForEntityType(urns, edgeInfos, graphFilters); + graphQueryDAO.getLineageQueryForEntityType(urns, edgeInfos, graphFilters); QueryBuilder fullBuilder = - ESGraphQueryDAO.getLineageQuery( + graphQueryDAO.getLineageQuery( urnsPerEntityType, edgesPerEntityType, graphFilters, startTime, endTime); QueryBuilder fullBuilderEmptyFilters = - ESGraphQueryDAO.getLineageQuery( + graphQueryDAO.getLineageQuery( urnsPerEntityType, edgesPerEntityType, GraphFilters.emptyGraphFilters, null, null); QueryBuilder fullBuilderMultipleFilters = - ESGraphQueryDAO.getLineageQuery( + graphQueryDAO.getLineageQuery( urnsPerEntityTypeMultiple, edgesPerEntityTypeMultiple, graphFiltersMultiple, diff --git a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java index 71f247ebfc29a..8c184055a6b0d 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/graph/search/SearchGraphServiceTestBase.java @@ -20,7 +20,10 @@ import com.linkedin.metadata.graph.elastic.ESGraphQueryDAO; import com.linkedin.metadata.graph.elastic.ESGraphWriteDAO; import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; +import com.linkedin.metadata.models.registry.ConfigEntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistryException; import com.linkedin.metadata.models.registry.LineageRegistry; +import com.linkedin.metadata.models.registry.MergedEntityRegistry; import com.linkedin.metadata.models.registry.SnapshotEntityRegistry; import com.linkedin.metadata.query.filter.Filter; import com.linkedin.metadata.query.filter.RelationshipDirection; @@ -30,6 +33,7 @@ import 
com.linkedin.metadata.utils.elasticsearch.IndexConvention; import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl; import io.datahubproject.test.search.SearchTestUtils; +import io.datahubproject.test.search.config.SearchCommonTestConfiguration; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; @@ -76,7 +80,20 @@ public void wipe() throws Exception { @Nonnull private ElasticSearchGraphService buildService(boolean enableMultiPathSearch) { - LineageRegistry lineageRegistry = new LineageRegistry(SnapshotEntityRegistry.getInstance()); + ConfigEntityRegistry configEntityRegistry = + new ConfigEntityRegistry( + SearchCommonTestConfiguration.class + .getClassLoader() + .getResourceAsStream("entity-registry.yml")); + SnapshotEntityRegistry snapshotEntityRegistry = SnapshotEntityRegistry.getInstance(); + LineageRegistry lineageRegistry; + try { + MergedEntityRegistry mergedEntityRegistry = + new MergedEntityRegistry(snapshotEntityRegistry).apply(configEntityRegistry); + lineageRegistry = new LineageRegistry(mergedEntityRegistry); + } catch (EntityRegistryException e) { + throw new RuntimeException(e); + } GraphQueryConfiguration configuration = GraphQueryConfiguration.testDefaults; configuration.setEnableMultiPathSearch(enableMultiPathSearch); ESGraphQueryDAO readDAO = diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java index cae67108b4ca0..ddfcf4b72776e 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/UpdateIndicesHookTest.java @@ -140,10 +140,23 @@ public void testFineGrainedLineageEdgesAreAdded() throws Exception { Urn downstreamUrn = UrnUtils.getUrn( 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)"); + Urn lifeCycleOwner = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)"); MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn); updateIndicesHook.invoke(event); - Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null, null); + Edge edge = + new Edge( + downstreamUrn, + upstreamUrn, + DOWNSTREAM_OF, + null, + null, + null, + null, + null, + lifeCycleOwner, + null); Mockito.verify(mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); Mockito.verify(mockGraphService, Mockito.times(1)) .removeEdgesFromNode( @@ -164,11 +177,24 @@ public void testFineGrainedLineageEdgesAreAddedRestate() throws Exception { Urn downstreamUrn = UrnUtils.getUrn( "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD),field_foo)"); + Urn lifeCycleOwner = + UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleCypressHiveDataset,PROD)"); MetadataChangeLog event = createUpstreamLineageMCL(upstreamUrn, downstreamUrn, ChangeType.RESTATE); updateIndicesHook.invoke(event); - Edge edge = new Edge(downstreamUrn, upstreamUrn, DOWNSTREAM_OF, null, null, null, null, null); + Edge edge = + new Edge( + downstreamUrn, + upstreamUrn, + DOWNSTREAM_OF, + null, + null, + null, + null, + null, + lifeCycleOwner, + null); Mockito.verify(mockGraphService, Mockito.times(1)).addEdge(Mockito.eq(edge)); Mockito.verify(mockGraphService, Mockito.times(1)) .removeEdgesFromNode( diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/GraphQueryConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/GraphQueryConfiguration.java index 4da50f47e2feb..cd869a61bf3ab 100644 --- 
a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/GraphQueryConfiguration.java +++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/GraphQueryConfiguration.java @@ -12,6 +12,12 @@ public class GraphQueryConfiguration { // will return all paths between the source and destination nodes within the hops limit. private boolean enableMultiPathSearch; + /** + * Adds a boosting query for via nodes being present on a lineage search hit, allows these nodes + * to be prioritized in the case of a multiple path situation with multi-path search disabled + */ + private boolean boostViaNodes; + public static GraphQueryConfiguration testDefaults; static { @@ -20,5 +26,6 @@ public class GraphQueryConfiguration { testDefaults.setTimeoutSeconds(10); testDefaults.setMaxResult(10000); testDefaults.setEnableMultiPathSearch(true); + testDefaults.setBoostViaNodes(true); } } diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml index 467b1cf109dee..c0f82d8536922 100644 --- a/metadata-service/configuration/src/main/resources/application.yml +++ b/metadata-service/configuration/src/main/resources/application.yml @@ -236,7 +236,8 @@ elasticsearch: timeoutSeconds: ${ELASTICSEARCH_SEARCH_GRAPH_TIMEOUT_SECONDS:50} # graph dao timeout seconds batchSize: ${ELASTICSEARCH_SEARCH_GRAPH_BATCH_SIZE:1000} # graph dao batch size maxResult: ${ELASTICSEARCH_SEARCH_GRAPH_MAX_RESULT:10000} # graph dao max result size - enableMultiPathSearch: ${ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH:false} + enableMultiPathSearch: ${ELASTICSEARCH_SEARCH_GRAPH_MULTI_PATH_SEARCH:false} # allows a path to be retraversed to walk all paths to the node instead of just shortest, avoids cycles by not rewalking the visited edge + boostViaNodes: ${ELASTICSEARCH_SEARCH_GRAPH_BOOST_VIA_NODES:true} # adds a boosting query that ranks graph edges with via nodes higher, used to 
allow via paths to be prioritized when multi path search is disabled # TODO: Kafka topic convention kafka: diff --git a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/RelationshipController.java b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/RelationshipController.java index 3550a86163f51..09bd9f8bb09e5 100644 --- a/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/RelationshipController.java +++ b/metadata-service/openapi-servlet/src/main/java/io/datahubproject/openapi/v2/controller/RelationshipController.java @@ -9,6 +9,7 @@ import com.linkedin.common.urn.Urn; import com.linkedin.common.urn.UrnUtils; import com.linkedin.metadata.authorization.PoliciesConfig; +import com.linkedin.metadata.graph.Edge; import com.linkedin.metadata.graph.RelatedEntities; import com.linkedin.metadata.graph.RelatedEntitiesScrollResult; import com.linkedin.metadata.graph.elastic.ElasticSearchGraphService; @@ -49,16 +50,14 @@ name = "Generic Relationships", description = "APIs for ingesting and accessing entity relationships.") public class RelationshipController { - - private static final String[] SORT_FIELDS = {"source.urn", "destination.urn", "relationshipType"}; - private static final String[] SORT_ORDERS = {"ASCENDING", "ASCENDING", "ASCENDING"}; + private static final String[] SORT_ORDERS = {"ASCENDING", "ASCENDING", "ASCENDING", "ASCENDING"}; private static final List EDGE_SORT_CRITERION; static { EDGE_SORT_CRITERION = - IntStream.range(0, SORT_FIELDS.length) + IntStream.range(0, Edge.KEY_FIELDS.length) .mapToObj( - idx -> SearchUtil.sortBy(SORT_FIELDS[idx], SortOrder.valueOf(SORT_ORDERS[idx]))) + idx -> SearchUtil.sortBy(Edge.KEY_FIELDS[idx], SortOrder.valueOf(SORT_ORDERS[idx]))) .collect(Collectors.toList()); } diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/graph/Edge.java 
b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/Edge.java index 458b23317c6c8..cb74ae5acd6a6 100644 --- a/metadata-service/services/src/main/java/com/linkedin/metadata/graph/Edge.java +++ b/metadata-service/services/src/main/java/com/linkedin/metadata/graph/Edge.java @@ -1,13 +1,20 @@ package com.linkedin.metadata.graph; import com.linkedin.common.urn.Urn; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Base64; import java.util.Map; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; @Data @AllArgsConstructor +@Slf4j public class Edge { @EqualsAndHashCode.Include private Urn source; @EqualsAndHashCode.Include private Urn destination; @@ -18,7 +25,7 @@ public class Edge { @EqualsAndHashCode.Exclude private Urn updatedActor; @EqualsAndHashCode.Exclude private Map properties; // The entity who owns the lifecycle of this edge - @EqualsAndHashCode.Exclude private Urn lifecycleOwner; + @EqualsAndHashCode.Include private Urn lifecycleOwner; // An entity through which the edge between source and destination is created @EqualsAndHashCode.Include private Urn via; @@ -44,4 +51,42 @@ public Edge( null, null); } + + /** + * Builds the document id for this edge from its source, relationship type, destination and, + * when present and non-blank, its lifecycle owner, joined with {@code --}. The via node is not + * part of the id. + * + * @return the Base64-encoded MD5 digest of the joined key, or the unhashed key itself if the + * MD5 algorithm is unavailable + */ + public String toDocId() { + StringBuilder rawDocId = new StringBuilder(); + rawDocId + .append(getSource().toString()) + .append(DOC_DELIMITER) + .append(getRelationshipType()) + .append(DOC_DELIMITER) + .append(getDestination().toString()); + if (getLifecycleOwner() != null && StringUtils.isNotBlank(getLifecycleOwner().toString())) { + rawDocId.append(DOC_DELIMITER).append(getLifecycleOwner().toString()); + } + + try { + byte[] bytesOfRawDocID = rawDocId.toString().getBytes(StandardCharsets.UTF_8); + MessageDigest md = MessageDigest.getInstance("MD5"); + byte[] thedigest = md.digest(bytesOfRawDocID); + return 
Base64.getEncoder().encodeToString(thedigest); + } catch (NoSuchAlgorithmException e) { + log.error("Unable to hash document ID, returning unhashed id: {}", rawDocId, e); + return rawDocId.toString(); + } + } + + // NOTE(review): "lifeCycleOwner" is cased differently from the lifecycleOwner field above; + // confirm it matches the indexed document field name. + public static final String[] KEY_FIELDS = { + "source.urn", "destination.urn", "relationshipType", "lifeCycleOwner" + }; + private static final String DOC_DELIMITER = "--"; }