get(DataFetchingEnvironment environment) {
sanitizedQuery,
start,
count,
- context.getAuthentication());
+ context.getAuthentication(),
+ searchFlags);
return mapBrowseResults(browseResults);
} catch (Exception e) {
throw new RuntimeException("Failed to execute browse V2", e);
diff --git a/datahub-graphql-core/src/main/resources/entity.graphql b/datahub-graphql-core/src/main/resources/entity.graphql
index 0074dc3fcb44c..762514b480fca 100644
--- a/datahub-graphql-core/src/main/resources/entity.graphql
+++ b/datahub-graphql-core/src/main/resources/entity.graphql
@@ -1763,6 +1763,12 @@ input AspectParams {
Only fetch auto render aspects
"""
autoRenderOnly: Boolean
+
+ """
+ Fetch using aspect names
+ If absent, returns all aspects matching other inputs
+ """
+ aspectNames: [String!]
}
@@ -6788,6 +6794,12 @@ type Assertion implements EntityWithRelationships & Entity {
Edges extending from this entity grouped by direction in the lineage graph
"""
lineage(input: LineageInput!): EntityLineageResult
+
+ """
+ Experimental API.
+ For fetching extra aspects that do not have custom UI code yet
+ """
+ aspects(input: AspectParams): [RawAspect!]
}
"""
diff --git a/datahub-graphql-core/src/main/resources/search.graphql b/datahub-graphql-core/src/main/resources/search.graphql
index 2b921601058fb..a906362cee185 100644
--- a/datahub-graphql-core/src/main/resources/search.graphql
+++ b/datahub-graphql-core/src/main/resources/search.graphql
@@ -1230,6 +1230,11 @@ input BrowseV2Input {
The search query string
"""
query: String
+
+ """
+ Flags controlling search options
+ """
+ searchFlags: SearchFlags
}
"""
diff --git a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/browse/BrowseV2ResolverTest.java b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/browse/BrowseV2ResolverTest.java
index c565e771a0475..41797fac636f1 100644
--- a/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/browse/BrowseV2ResolverTest.java
+++ b/datahub-graphql-core/src/test/java/com/linkedin/datahub/graphql/resolvers/browse/BrowseV2ResolverTest.java
@@ -21,6 +21,7 @@
import com.linkedin.metadata.browse.BrowseResultGroupV2Array;
import com.linkedin.metadata.browse.BrowseResultMetadata;
import com.linkedin.metadata.browse.BrowseResultV2;
+import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
@@ -262,7 +263,8 @@ private static EntityClient initMockEntityClient(
Mockito.eq(query),
Mockito.eq(start),
Mockito.eq(limit),
- Mockito.any(Authentication.class)))
+ Mockito.any(Authentication.class),
+ Mockito.nullable(SearchFlags.class)))
.thenReturn(result);
return client;
}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java
index ff8bd542fbdff..50847da07be73 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/UpgradeCliApplication.java
@@ -2,6 +2,10 @@
import com.linkedin.gms.factory.auth.AuthorizerChainFactory;
import com.linkedin.gms.factory.auth.DataHubAuthorizerFactory;
+import com.linkedin.gms.factory.graphql.GraphQLEngineFactory;
+import com.linkedin.gms.factory.kafka.KafkaEventConsumerFactory;
+import com.linkedin.gms.factory.kafka.SimpleKafkaConsumerFactory;
+import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.telemetry.ScheduledAnalyticsFactory;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -24,7 +28,11 @@
classes = {
ScheduledAnalyticsFactory.class,
AuthorizerChainFactory.class,
- DataHubAuthorizerFactory.class
+ DataHubAuthorizerFactory.class,
+ SimpleKafkaConsumerFactory.class,
+ KafkaEventConsumerFactory.class,
+ InternalSchemaRegistryFactory.class,
+ GraphQLEngineFactory.class
})
})
public class UpgradeCliApplication {
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
index 406963c58fd71..2b2f4648f76e7 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/BackfillBrowsePathsV2Config.java
@@ -3,6 +3,7 @@
import com.linkedin.datahub.upgrade.system.entity.steps.BackfillBrowsePathsV2;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.SearchService;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -11,7 +12,12 @@ public class BackfillBrowsePathsV2Config {
@Bean
public BackfillBrowsePathsV2 backfillBrowsePathsV2(
- EntityService<?> entityService, SearchService searchService) {
- return new BackfillBrowsePathsV2(entityService, searchService);
+ EntityService<?> entityService,
+ SearchService searchService,
+ @Value("${systemUpdate.browsePathsV2.enabled}") final boolean enabled,
+ @Value("${systemUpdate.browsePathsV2.reprocess.enabled}") final boolean reprocessEnabled,
+ @Value("${systemUpdate.browsePathsV2.batchSize}") final Integer batchSize) {
+ return new BackfillBrowsePathsV2(
+ entityService, searchService, enabled, reprocessEnabled, batchSize);
}
}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java
index 06311e1853874..83dad80944f5f 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/ReindexDataJobViaNodesCLLConfig.java
@@ -2,6 +2,7 @@
import com.linkedin.datahub.upgrade.system.via.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.EntityService;
+import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -9,7 +10,10 @@
public class ReindexDataJobViaNodesCLLConfig {
@Bean
- public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(EntityService<?> entityService) {
- return new ReindexDataJobViaNodesCLL(entityService);
+ public ReindexDataJobViaNodesCLL _reindexDataJobViaNodesCLL(
+ EntityService<?> entityService,
+ @Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled,
+ @Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize) {
+ return new ReindexDataJobViaNodesCLL(entityService, enabled, batchSize);
}
}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java
new file mode 100644
index 0000000000000..ea432dfa9f7df
--- /dev/null
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateCondition.java
@@ -0,0 +1,14 @@
+package com.linkedin.datahub.upgrade.config;
+
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+
+public class SystemUpdateCondition implements Condition {
+ @Override
+ public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
+ return context.getBeanFactory().getBean(ApplicationArguments.class).getNonOptionArgs().stream()
+ .anyMatch("SystemUpdate"::equals);
+ }
+}
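
A minimal, standalone sketch (not part of the patch; class name hypothetical) of how the condition above evaluates. Spring Boot treats any argument that does not start with "--" as a non-option argument, so launching the upgrade with "-u SystemUpdate" (the same args the test below passes) puts "SystemUpdate" into getNonOptionArgs() and the condition matches.

import org.springframework.boot.DefaultApplicationArguments;

public class SystemUpdateConditionSketch {
  public static void main(String[] args) {
    // "-u" does not start with "--", so both tokens are non-option arguments.
    DefaultApplicationArguments systemUpdateRun =
        new DefaultApplicationArguments("-u", "SystemUpdate");
    // Same predicate as SystemUpdateCondition.matches(): true only for a SystemUpdate run.
    System.out.println(
        systemUpdateRun.getNonOptionArgs().stream().anyMatch("SystemUpdate"::equals)); // true

    DefaultApplicationArguments otherRun =
        new DefaultApplicationArguments("-u", "RestoreIndices");
    System.out.println(
        otherRun.getNonOptionArgs().stream().anyMatch("SystemUpdate"::equals)); // false
  }
}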
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
index 177d4b531ba86..cde3a29248fb5 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/config/SystemUpdateConfig.java
@@ -8,6 +8,7 @@
import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
+import com.linkedin.gms.factory.kafka.schemaregistry.InternalSchemaRegistryFactory;
import com.linkedin.gms.factory.kafka.schemaregistry.SchemaRegistryConfig;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
@@ -21,9 +22,12 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
@Slf4j
@Configuration
@@ -74,4 +78,23 @@ protected KafkaEventProducer duheKafkaEventProducer(
duheSchemaRegistryConfig, kafkaConfiguration, properties));
return new KafkaEventProducer(producer, topicConvention, kafkaHealthChecker);
}
+
+ /**
+ * The ReindexDataJobViaNodesCLLConfig step requires publishing to MCL. This overrides the
+ * default producer with this special producer, which doesn't require an active schema registry.
+ *
+ * Use when the schema registry type is INTERNAL and the job is SystemUpdate.
+ *
+ * This forces this producer into the EntityService.
+ */
+ @Primary
+ @Bean(name = "kafkaEventProducer")
+ @Conditional(SystemUpdateCondition.class)
+ @ConditionalOnProperty(
+ name = "kafka.schemaRegistry.type",
+ havingValue = InternalSchemaRegistryFactory.TYPE)
+ protected KafkaEventProducer kafkaEventProducer(
+ @Qualifier("duheKafkaEventProducer") KafkaEventProducer kafkaEventProducer) {
+ return kafkaEventProducer;
+ }
}
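
The kafkaEventProducer bean above relies on a simple aliasing trick: a @Primary bean method that just returns another, qualified bean, so any unqualified injection of that type resolves to the aliased instance. A self-contained sketch of that pattern, with hypothetical names and without the @Conditional / @ConditionalOnProperty gates used in the patch:

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

class Producer {}

@Configuration
class ProducerOverrideConfig {
  @Bean(name = "duheProducer")
  Producer duheProducer() {
    return new Producer();
  }

  // Re-exposes the DUHE bean under the default name; @Primary makes every
  // unqualified Producer injection point resolve to this same instance.
  @Primary
  @Bean(name = "producer")
  Producer producer(@Qualifier("duheProducer") Producer duhe) {
    return duhe;
  }
}

public class PrimaryOverrideSketch {
  public static void main(String[] args) {
    try (AnnotationConfigApplicationContext ctx =
        new AnnotationConfigApplicationContext(ProducerOverrideConfig.class)) {
      // The unqualified lookup returns the @Primary bean, i.e. the DUHE instance itself.
      System.out.println(ctx.getBean(Producer.class) == ctx.getBean("duheProducer")); // true
    }
  }
}

During a SystemUpdate run with the INTERNAL schema registry, this is what makes the producer wired into EntityServiceImpl the DUHE producer, which the new testSystemUpdateKafkaProducerOverride test asserts below.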
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
index 4b9fc5bba0204..9b023e1e239a2 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2.java
@@ -11,8 +11,20 @@ public class BackfillBrowsePathsV2 implements Upgrade {
private final List<UpgradeStep> _steps;
- public BackfillBrowsePathsV2(EntityService<?> entityService, SearchService searchService) {
- _steps = ImmutableList.of(new BackfillBrowsePathsV2Step(entityService, searchService));
+ public BackfillBrowsePathsV2(
+ EntityService<?> entityService,
+ SearchService searchService,
+ boolean enabled,
+ boolean reprocessEnabled,
+ Integer batchSize) {
+ if (enabled) {
+ _steps =
+ ImmutableList.of(
+ new BackfillBrowsePathsV2Step(
+ entityService, searchService, reprocessEnabled, batchSize));
+ } else {
+ _steps = ImmutableList.of();
+ }
}
@Override
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java
index 601ce4d25493c..2d64e0052ae82 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/entity/steps/BackfillBrowsePathsV2Step.java
@@ -16,6 +16,7 @@
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.utils.DefaultAspectsUtil;
+import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Condition;
@@ -37,9 +38,8 @@
@Slf4j
public class BackfillBrowsePathsV2Step implements UpgradeStep {
- public static final String BACKFILL_BROWSE_PATHS_V2 = "BACKFILL_BROWSE_PATHS_V2";
- public static final String REPROCESS_DEFAULT_BROWSE_PATHS_V2 =
- "REPROCESS_DEFAULT_BROWSE_PATHS_V2";
+ private static final String UPGRADE_ID = "BackfillBrowsePathsV2Step";
+ private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
public static final String DEFAULT_BROWSE_PATH_V2 = "␟Default";
private static final Set<String> ENTITY_TYPES_TO_MIGRATE =
@@ -53,14 +53,22 @@ public class BackfillBrowsePathsV2Step implements UpgradeStep {
Constants.ML_MODEL_GROUP_ENTITY_NAME,
Constants.ML_FEATURE_TABLE_ENTITY_NAME,
Constants.ML_FEATURE_ENTITY_NAME);
- private static final Integer BATCH_SIZE = 5000;
- private final EntityService<?> _entityService;
- private final SearchService _searchService;
-
- public BackfillBrowsePathsV2Step(EntityService<?> entityService, SearchService searchService) {
- _searchService = searchService;
- _entityService = entityService;
+ private final EntityService<?> entityService;
+ private final SearchService searchService;
+
+ private final boolean reprocessEnabled;
+ private final Integer batchSize;
+
+ public BackfillBrowsePathsV2Step(
+ EntityService<?> entityService,
+ SearchService searchService,
+ boolean reprocessEnabled,
+ Integer batchSize) {
+ this.searchService = searchService;
+ this.entityService = entityService;
+ this.reprocessEnabled = reprocessEnabled;
+ this.batchSize = batchSize;
}
@Override
@@ -78,11 +86,14 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
log.info(
String.format(
"Upgrading batch %s-%s of browse paths for entity type %s",
- migratedCount, migratedCount + BATCH_SIZE, entityType));
+ migratedCount, migratedCount + batchSize, entityType));
scrollId = backfillBrowsePathsV2(entityType, auditStamp, scrollId);
- migratedCount += BATCH_SIZE;
+ migratedCount += batchSize;
} while (scrollId != null);
}
+
+ BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);
+
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
@@ -91,27 +102,27 @@ private String backfillBrowsePathsV2(String entityType, AuditStamp auditStamp, String scrollId)
final Filter filter;
- if (System.getenv().containsKey(REPROCESS_DEFAULT_BROWSE_PATHS_V2)
- && Boolean.parseBoolean(System.getenv(REPROCESS_DEFAULT_BROWSE_PATHS_V2))) {
+ if (reprocessEnabled) {
filter = backfillDefaultBrowsePathsV2Filter();
} else {
filter = backfillBrowsePathsV2Filter();
}
final ScrollResult scrollResult =
- _searchService.scrollAcrossEntities(
+ searchService.scrollAcrossEntities(
ImmutableList.of(entityType),
"*",
filter,
null,
scrollId,
null,
- BATCH_SIZE,
+ batchSize,
new SearchFlags()
.setFulltext(true)
.setSkipCache(true)
.setSkipHighlighting(true)
.setSkipAggregates(true));
+
if (scrollResult.getNumEntities() == 0 || scrollResult.getEntities().size() == 0) {
return null;
}
@@ -183,7 +194,7 @@ private Filter backfillDefaultBrowsePathsV2Filter() {
private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
BrowsePathsV2 browsePathsV2 =
- DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, _entityService);
+ DefaultAspectsUtil.buildDefaultBrowsePathV2(urn, true, entityService);
log.debug(String.format("Adding browse path v2 for urn %s with value %s", urn, browsePathsV2));
MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
@@ -193,12 +204,12 @@ private void ingestBrowsePathsV2(Urn urn, AuditStamp auditStamp) throws Exception {
proposal.setSystemMetadata(
new SystemMetadata().setRunId(DEFAULT_RUN_ID).setLastObserved(System.currentTimeMillis()));
proposal.setAspect(GenericRecordUtils.serializeAspect(browsePathsV2));
- _entityService.ingestProposal(proposal, auditStamp, true);
+ entityService.ingestProposal(proposal, auditStamp, true);
}
@Override
public String id() {
- return "BackfillBrowsePathsV2Step";
+ return UPGRADE_ID;
}
/**
@@ -211,7 +222,22 @@ public boolean isOptional() {
}
@Override
+ /**
+ * Returns whether the upgrade should be skipped. Uses previous run history or the environment
+ * variables REPROCESS_DEFAULT_BROWSE_PATHS_V2 & BACKFILL_BROWSE_PATHS_V2 to determine whether to
+ * skip.
+ */
public boolean skip(UpgradeContext context) {
- return !Boolean.parseBoolean(System.getenv(BACKFILL_BROWSE_PATHS_V2));
+ boolean envEnabled = Boolean.parseBoolean(System.getenv("BACKFILL_BROWSE_PATHS_V2"));
+
+ if (reprocessEnabled && envEnabled) {
+ return false;
+ }
+
+ boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true);
+ if (previouslyRun) {
+ log.info("{} was already run. Skipping.", id());
+ }
+ return (previouslyRun || !envEnabled);
}
}
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java
index 41179a50c4b54..59975693322d1 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLL.java
@@ -18,8 +18,13 @@ public class ReindexDataJobViaNodesCLL implements Upgrade {
private final List<UpgradeStep> _steps;
- public ReindexDataJobViaNodesCLL(EntityService<?> entityService) {
- _steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService));
+ public ReindexDataJobViaNodesCLL(
+ EntityService<?> entityService, boolean enabled, Integer batchSize) {
+ if (enabled) {
+ _steps = ImmutableList.of(new ReindexDataJobViaNodesCLLStep(entityService, batchSize));
+ } else {
+ _steps = ImmutableList.of();
+ }
}
@Override
diff --git a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java
index 70afbc3d205b2..56166caf5b57e 100644
--- a/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java
+++ b/datahub-upgrade/src/main/java/com/linkedin/datahub/upgrade/system/via/ReindexDataJobViaNodesCLLStep.java
@@ -11,7 +11,6 @@
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesResult;
-import java.net.URISyntaxException;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
@@ -21,12 +20,12 @@ public class ReindexDataJobViaNodesCLLStep implements UpgradeStep {
private static final String UPGRADE_ID = "via-node-cll-reindex-datajob";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
- private static final Integer BATCH_SIZE = 5000;
+ private final EntityService<?> entityService;
+ private final Integer batchSize;
- private final EntityService _entityService;
-
- public ReindexDataJobViaNodesCLLStep(EntityService entityService) {
- _entityService = entityService;
+ public ReindexDataJobViaNodesCLLStep(EntityService<?> entityService, Integer batchSize) {
+ this.entityService = entityService;
+ this.batchSize = batchSize;
}
@Override
@@ -35,17 +34,16 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
RestoreIndicesArgs args =
new RestoreIndicesArgs()
.setAspectName(DATA_JOB_INPUT_OUTPUT_ASPECT_NAME)
- .setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%");
+ .setUrnLike("urn:li:" + DATA_JOB_ENTITY_NAME + ":%")
+ .setBatchSize(batchSize);
RestoreIndicesResult result =
- _entityService.restoreIndices(args, x -> context.report().addLine((String) x));
+ entityService.restoreIndices(args, x -> context.report().addLine((String) x));
context.report().addLine("Rows migrated: " + result.rowsMigrated);
context.report().addLine("Rows ignored: " + result.ignored);
- try {
- BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, _entityService);
- context.report().addLine("State updated: " + UPGRADE_ID_URN);
- } catch (URISyntaxException e) {
- throw new RuntimeException(e);
- }
+
+ BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, entityService);
+ context.report().addLine("State updated: " + UPGRADE_ID_URN);
+
return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
};
}
@@ -70,7 +68,7 @@ public boolean isOptional() {
* variable SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT to determine whether to skip.
*/
public boolean skip(UpgradeContext context) {
- boolean previouslyRun = _entityService.exists(UPGRADE_ID_URN, true);
+ boolean previouslyRun = entityService.exists(UPGRADE_ID_URN, true);
boolean envFlagRecommendsSkip =
Boolean.parseBoolean(System.getenv("SKIP_REINDEX_DATA_JOB_INPUT_OUTPUT"));
if (previouslyRun) {
diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java
index 83b8e028727ce..4c9e12c0ed151 100644
--- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java
+++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/DatahubUpgradeNoSchemaRegistryTest.java
@@ -4,6 +4,8 @@
import static org.testng.AssertJUnit.assertNotNull;
import com.linkedin.datahub.upgrade.system.SystemUpdate;
+import com.linkedin.metadata.dao.producer.KafkaEventProducer;
+import com.linkedin.metadata.entity.EntityServiceImpl;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -19,19 +21,37 @@
classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class},
properties = {
"kafka.schemaRegistry.type=INTERNAL",
- "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic"
- })
+ "DATAHUB_UPGRADE_HISTORY_TOPIC_NAME=test_due_topic",
+ "METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME=test_mcl_versioned_topic"
+ },
+ args = {"-u", "SystemUpdate"})
public class DatahubUpgradeNoSchemaRegistryTest extends AbstractTestNGSpringContextTests {
@Autowired
@Named("systemUpdate")
private SystemUpdate systemUpdate;
+ @Autowired
+ @Named("kafkaEventProducer")
+ private KafkaEventProducer kafkaEventProducer;
+
+ @Autowired
+ @Named("duheKafkaEventProducer")
+ private KafkaEventProducer duheKafkaEventProducer;
+
+ @Autowired private EntityServiceImpl entityService;
+
@Test
public void testSystemUpdateInit() {
assertNotNull(systemUpdate);
}
+ @Test
+ public void testSystemUpdateKafkaProducerOverride() {
+ assertEquals(kafkaEventProducer, duheKafkaEventProducer);
+ assertEquals(entityService.get_producer(), duheKafkaEventProducer);
+ }
+
@Test
public void testSystemUpdateSend() {
UpgradeStepResult.Result result =
diff --git a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java
index be28b7f739cf5..5c2d6fff0f07c 100644
--- a/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java
+++ b/datahub-upgrade/src/test/java/com/linkedin/datahub/upgrade/UpgradeCliApplicationTestConfiguration.java
@@ -1,15 +1,21 @@
package com.linkedin.datahub.upgrade;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
-import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphService;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
+import com.linkedin.metadata.registry.SchemaRegistryService;
import com.linkedin.metadata.search.SearchService;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.EntityIndexBuilders;
import io.ebean.Database;
+import java.util.Optional;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.mock.mockito.MockBean;
+import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
@TestConfiguration
@@ -20,8 +26,6 @@ public class UpgradeCliApplicationTestConfiguration {
@MockBean private Database ebeanServer;
- @MockBean private EntityService<?> _entityService;
-
@MockBean private SearchService searchService;
@MockBean private GraphService graphService;
@@ -31,4 +35,11 @@ public class UpgradeCliApplicationTestConfiguration {
@MockBean ConfigEntityRegistry configEntityRegistry;
@MockBean public EntityIndexBuilders entityIndexBuilders;
+
+ @Bean
+ public SchemaRegistryService schemaRegistryService() {
+ SchemaRegistryService mockService = mock(SchemaRegistryService.class);
+ when(mockService.getSchemaIdForTopic(anyString())).thenReturn(Optional.of(0));
+ return mockService;
+ }
}
diff --git a/docs-website/src/pages/slack/index.js b/docs-website/src/pages/slack/index.js
index c85a1eefe5545..5989224191112 100644
--- a/docs-website/src/pages/slack/index.js
+++ b/docs-website/src/pages/slack/index.js
@@ -36,7 +36,7 @@ function SlackSurvey() {
Join the DataHub Slack Community!
- We will send the link to join our Slack community to your email.
+ We'd love to find out a little more about you!
diff --git a/docs/lineage/airflow.md b/docs/lineage/airflow.md
index da3a36bc87be5..94eb69a2ed827 100644
--- a/docs/lineage/airflow.md
+++ b/docs/lineage/airflow.md
@@ -135,6 +135,8 @@ conn_id = datahub_rest_default # or datahub_kafka_default
| capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. |
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
+| datajob_url_link | taskinstance | If taskinstance, the datajob URL will be the task instance link in Airflow. It can also be set to grid to use the grid view link. |
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
#### Validate that the plugin is working
diff --git a/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java
index 645c2fe210e09..adff32d5d336d 100644
--- a/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java
+++ b/metadata-events/mxe-utils-avro/src/main/java/com/linkedin/metadata/EventUtils.java
@@ -57,7 +57,7 @@ public class EventUtils {
private static final Schema ORIGINAL_MCP_AVRO_SCHEMA =
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeProposal.avsc");
- private static final Schema ORIGINAL_MCL_AVRO_SCHEMA =
+ public static final Schema ORIGINAL_MCL_AVRO_SCHEMA =
getAvroSchemaFromResource("avro/com/linkedin/mxe/MetadataChangeLog.avsc");
private static final Schema ORIGINAL_FMCL_AVRO_SCHEMA =
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py
index 67843da2ba995..48d462b85702a 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py
@@ -1,3 +1,4 @@
+from enum import Enum
from typing import TYPE_CHECKING, Optional
import datahub.emitter.mce_builder as builder
@@ -8,6 +9,11 @@
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
+class DatajobUrl(Enum):
+ GRID = "grid"
+ TASKINSTANCE = "taskinstance"
+
+
class DatahubLineageConfig(ConfigModel):
# This class is shared between the lineage backend and the Airflow plugin.
# The defaults listed here are only relevant for the lineage backend.
@@ -41,6 +47,8 @@ class DatahubLineageConfig(ConfigModel):
# The Airflow plugin behaves as if it were set to True.
graceful_exceptions: bool = True
+ datajob_url_link: DatajobUrl = DatajobUrl.TASKINSTANCE
+
def make_emitter_hook(self) -> "DatahubGenericHook":
# This is necessary to avoid issues with circular imports.
from datahub_airflow_plugin.hooks.datahub import DatahubGenericHook
@@ -65,6 +73,9 @@ def get_lineage_config() -> DatahubLineageConfig:
disable_openlineage_plugin = conf.get(
"datahub", "disable_openlineage_plugin", fallback=True
)
+ datajob_url_link = conf.get(
+ "datahub", "datajob_url_link", fallback=DatajobUrl.TASKINSTANCE.value
+ )
return DatahubLineageConfig(
enabled=enabled,
@@ -77,4 +88,5 @@ def get_lineage_config() -> DatahubLineageConfig:
log_level=log_level,
debug_emitter=debug_emitter,
disable_openlineage_plugin=disable_openlineage_plugin,
+ datajob_url_link=datajob_url_link,
)
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
index e1d53be7bae6b..2fa15f13e848b 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/client/airflow_generator.py
@@ -13,6 +13,7 @@
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub_airflow_plugin._airflow_compat import AIRFLOW_PATCHED
+from datahub_airflow_plugin._config import DatahubLineageConfig, DatajobUrl
assert AIRFLOW_PATCHED
@@ -208,6 +209,7 @@ def generate_datajob(
set_dependencies: bool = True,
capture_owner: bool = True,
capture_tags: bool = True,
+ config: Optional[DatahubLineageConfig] = None,
) -> DataJob:
"""
@@ -217,6 +219,7 @@ def generate_datajob(
:param set_dependencies: bool - whether to extract dependencies from airflow task
:param capture_owner: bool - whether to extract owner from airflow task
:param capture_tags: bool - whether to set tags automatically from airflow task
+ :param config: DatahubLineageConfig
:return: DataJob - returns the generated DataJob object
"""
dataflow_urn = DataFlowUrn.create_from_ids(
@@ -267,7 +270,11 @@ def generate_datajob(
datajob.properties = job_property_bag
base_url = conf.get("webserver", "base_url")
- datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"
+
+ if config and config.datajob_url_link == DatajobUrl.GRID:
+ datajob.url = f"{base_url}/dags/{datajob.flow_urn.get_flow_id()}/grid?task_id={task.task_id}"
+ else:
+ datajob.url = f"{base_url}/taskinstance/list/?flt1_dag_id_equals={datajob.flow_urn.get_flow_id()}&_flt_3_task_id={task.task_id}"
if capture_owner and dag.owner:
datajob.owners.add(dag.owner)
@@ -290,9 +297,12 @@ def create_datajob_instance(
task: "Operator",
dag: "DAG",
data_job: Optional[DataJob] = None,
+ config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
if data_job is None:
- data_job = AirflowGenerator.generate_datajob(cluster, task=task, dag=dag)
+ data_job = AirflowGenerator.generate_datajob(
+ cluster, task=task, dag=dag, config=config
+ )
dpi = DataProcessInstance.from_datajob(
datajob=data_job, id=task.task_id, clone_inlets=True, clone_outlets=True
)
@@ -407,9 +417,12 @@ def run_datajob(
datajob: Optional[DataJob] = None,
attempt: Optional[int] = None,
emit_templates: bool = True,
+ config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
if datajob is None:
- datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag)
+ datajob = AirflowGenerator.generate_datajob(
+ cluster, ti.task, dag, config=config
+ )
assert dag_run.run_id
dpi = DataProcessInstance.from_datajob(
@@ -480,6 +493,7 @@ def complete_datajob(
end_timestamp_millis: Optional[int] = None,
result: Optional[InstanceRunResult] = None,
datajob: Optional[DataJob] = None,
+ config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
"""
@@ -491,10 +505,13 @@ def complete_datajob(
:param end_timestamp_millis: Optional[int]
:param result: Optional[str] One of the result from datahub.metadata.schema_class.RunResultTypeClass
:param datajob: Optional[DataJob]
+ :param config: Optional[DatahubLineageConfig]
:return: DataProcessInstance
"""
if datajob is None:
- datajob = AirflowGenerator.generate_datajob(cluster, ti.task, dag)
+ datajob = AirflowGenerator.generate_datajob(
+ cluster, ti.task, dag, config=config
+ )
if end_timestamp_millis is None:
if ti.end_date:
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
index a7f588a166dde..475f3791bc0c8 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py
@@ -376,6 +376,7 @@ def on_task_instance_running(
dag=dag,
capture_tags=self.config.capture_tags_info,
capture_owner=self.config.capture_ownership_info,
+ config=self.config,
)
# TODO: Make use of get_task_location to extract github urls.
@@ -397,6 +398,7 @@ def on_task_instance_running(
dag_run=dagrun,
datajob=datajob,
emit_templates=False,
+ config=self.config,
)
logger.debug(f"Emitted DataHub DataProcess Instance start: {dpi}")
@@ -419,6 +421,7 @@ def on_task_instance_finish(
dag=dag,
capture_tags=self.config.capture_tags_info,
capture_owner=self.config.capture_ownership_info,
+ config=self.config,
)
# Add lineage info.
@@ -436,6 +439,7 @@ def on_task_instance_finish(
dag_run=dagrun,
datajob=datajob,
result=status,
+ config=self.config,
)
logger.debug(
f"Emitted DataHub DataProcess Instance with status {status}: {dpi}"
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py
index 51a4151bc8207..7b8d719712d10 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py
@@ -120,6 +120,7 @@ def datahub_task_status_callback(context, status):
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
+ config=config,
)
datajob.inlets.extend(
entities_to_dataset_urn_list([let.urn for let in task_inlets])
@@ -143,6 +144,7 @@ def datahub_task_status_callback(context, status):
dag_run=context["dag_run"],
datajob=datajob,
start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
+ config=config,
)
task.log.info(f"Emitted Start Datahub Dataprocess Instance: {dpi}")
@@ -185,6 +187,7 @@ def datahub_pre_execution(context):
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
+ config=config,
)
datajob.inlets.extend(
entities_to_dataset_urn_list([let.urn for let in task_inlets])
@@ -208,6 +211,7 @@ def datahub_pre_execution(context):
dag_run=context["dag_run"],
datajob=datajob,
start_timestamp_millis=int(ti.start_date.timestamp() * 1000),
+ config=config,
)
task.log.info(f"Emitting Datahub Dataprocess Instance: {dpi}")
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py
index 75fc79443e49e..daf45e1cd83f8 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py
@@ -51,6 +51,7 @@ def send_lineage_to_datahub(
dag=dag,
capture_tags=config.capture_tags_info,
capture_owner=config.capture_ownership_info,
+ config=config,
)
datajob.inlets.extend(entities_to_dataset_urn_list([let.urn for let in inlets]))
datajob.outlets.extend(entities_to_dataset_urn_list([let.urn for let in outlets]))
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py
index 3ebe7831d08f9..6f81812ea766e 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/datahub.py
@@ -71,7 +71,11 @@ def send_lineage(
try:
context = context or {} # ensure not None to satisfy mypy
send_lineage_to_datahub(
- config, operator, operator.inlets, operator.outlets, context
+ config,
+ operator,
+ operator.inlets,
+ operator.outlets,
+ context,
)
except Exception as e:
operator.log.error(e)
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index bbbab73fd1cf5..74dcde5e066b3 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -245,6 +245,10 @@
powerbi_report_server = {"requests", "requests_ntlm"}
+slack = {
+ "slack-sdk==3.18.1"
+}
+
databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
"databricks-sdk>=0.9.0",
@@ -367,6 +371,7 @@
"snowflake": snowflake_common | usage_common | sqlglot_lib,
"sqlalchemy": sql_common,
"sql-queries": usage_common | sqlglot_lib,
+ "slack": slack,
"superset": {
"requests",
"sqlalchemy",
@@ -503,6 +508,7 @@
"redshift",
"s3",
"snowflake",
+ "slack",
"tableau",
"teradata",
"trino",
@@ -543,6 +549,7 @@
"kafka-connect",
"ldap",
"mongodb",
+ "slack",
"mssql",
"mysql",
"mariadb",
@@ -597,6 +604,7 @@
"postgres = datahub.ingestion.source.sql.postgres:PostgresSource",
"redash = datahub.ingestion.source.redash:RedashSource",
"redshift = datahub.ingestion.source.redshift.redshift:RedshiftSource",
+ "slack = datahub.ingestion.source.slack.slack:SlackSource",
"snowflake = datahub.ingestion.source.snowflake.snowflake_v2:SnowflakeV2Source",
"superset = datahub.ingestion.source.superset:SupersetSource",
"tableau = datahub.ingestion.source.tableau:TableauSource",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/slack/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/slack/__init__.py
new file mode 100644
index 0000000000000..e69de29bb2d1d
diff --git a/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py b/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py
new file mode 100644
index 0000000000000..ed425cc25d98f
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/slack/slack.py
@@ -0,0 +1,181 @@
+import logging
+import textwrap
+from dataclasses import dataclass
+from typing import Iterable, Optional
+
+from pydantic import Field, SecretStr
+from slack_sdk import WebClient
+
+from datahub.configuration.common import ConfigModel
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.api.decorators import (
+ SupportStatus,
+ config_class,
+ platform_name,
+ support_status,
+)
+from datahub.ingestion.api.source import (
+ SourceReport,
+ TestableSource,
+ TestConnectionReport,
+)
+from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.metadata.schema_classes import CorpUserEditableInfoClass
+from datahub.utilities.urns.urn import Urn
+
+logger: logging.Logger = logging.getLogger(__name__)
+
+
+@dataclass
+class CorpUser:
+ urn: Optional[str] = None
+ email: Optional[str] = None
+ slack_id: Optional[str] = None
+ title: Optional[str] = None
+ image_url: Optional[str] = None
+ phone: Optional[str] = None
+
+
+class SlackSourceConfig(ConfigModel):
+ bot_token: SecretStr = Field(
+ description="Bot token for the Slack workspace. Needs `users:read`, `users:read.email` and `users.profile:read` scopes.",
+ )
+
+
+@platform_name("Slack")
+@config_class(SlackSourceConfig)
+@support_status(SupportStatus.TESTING)
+class SlackSource(TestableSource):
+ def __init__(self, ctx: PipelineContext, config: SlackSourceConfig):
+ self.ctx = ctx
+ self.config = config
+ self.report = SourceReport()
+
+ @classmethod
+ def create(cls, config_dict, ctx):
+ config = SlackSourceConfig.parse_obj(config_dict)
+ return cls(ctx, config)
+
+ @staticmethod
+ def test_connection(config_dict: dict) -> TestConnectionReport:
+ raise NotImplementedError("This class does not implement this method")
+
+ def get_slack_client(self) -> WebClient:
+ return WebClient(token=self.config.bot_token.get_secret_value())
+
+ def get_workunits_internal(
+ self,
+ ) -> Iterable[MetadataWorkUnit]:
+ assert self.ctx.graph is not None
+ auth_resp = self.get_slack_client().auth_test()
+ logger.info("Successfully connected to Slack")
+ logger.info(auth_resp.data)
+ for user_obj in self.get_user_to_be_updated():
+ self.populate_slack_id_from_email(user_obj)
+ if user_obj.slack_id is None:
+ continue
+ self.populate_user_profile(user_obj)
+ if user_obj.urn is None:
+ continue
+ logger.info(f"User: {user_obj}")
+ corpuser_editable_info = (
+ self.ctx.graph.get_aspect(
+ entity_urn=user_obj.urn, aspect_type=CorpUserEditableInfoClass
+ )
+ or CorpUserEditableInfoClass()
+ )
+ corpuser_editable_info.email = user_obj.email
+ corpuser_editable_info.slack = user_obj.slack_id
+ corpuser_editable_info.title = user_obj.title
+ if user_obj.image_url:
+ corpuser_editable_info.pictureLink = user_obj.image_url
+ if user_obj.phone:
+ corpuser_editable_info.phone = user_obj.phone
+ yield MetadataWorkUnit(
+ id=f"{user_obj.urn}",
+ mcp=MetadataChangeProposalWrapper(
+ entityUrn=user_obj.urn,
+ aspect=corpuser_editable_info,
+ ),
+ )
+
+ def populate_user_profile(self, user_obj: CorpUser) -> None:
+ try:
+ # https://api.slack.com/methods/users.profile.get
+ user_profile_res = self.get_slack_client().users_profile_get(
+ user=user_obj.slack_id
+ )
+ user_profile = user_profile_res.get("profile", {})
+ user_obj.title = user_profile.get("title")
+ user_obj.image_url = user_profile.get("image_192")
+ user_obj.phone = user_profile.get("phone")
+ except Exception as e:
+ if "missing_scope" in str(e):
+ raise e
+ return
+
+ def populate_slack_id_from_email(self, user_obj: CorpUser) -> None:
+ if user_obj.email is None:
+ return
+ try:
+ # https://api.slack.com/methods/users.lookupByEmail
+ user_info_res = self.get_slack_client().users_lookupByEmail(
+ email=user_obj.email
+ )
+ user_info = user_info_res.get("user", {})
+ user_obj.slack_id = user_info.get("id")
+ except Exception as e:
+ if "users_not_found" in str(e):
+ return
+ raise e
+
+ def get_user_to_be_updated(self) -> Iterable[CorpUser]:
+ graphql_query = textwrap.dedent(
+ """
+ query listUsers($input: ListUsersInput!) {
+ listUsers(input: $input) {
+ total
+ users {
+ urn
+ editableProperties {
+ email
+ slack
+ }
+ }
+ }
+ }
+ """
+ )
+ start = 0
+ count = 10
+ total = count
+
+ assert self.ctx.graph is not None
+
+ while start < total:
+ variables = {"input": {"start": start, "count": count}}
+ response = self.ctx.graph.execute_graphql(
+ query=graphql_query, variables=variables
+ )
+ list_users = response.get("listUsers", {})
+ total = list_users.get("total", 0)
+ users = list_users.get("users", [])
+ for user in users:
+ user_obj = CorpUser()
+ editable_properties = user.get("editableProperties", {})
+ user_obj.urn = user.get("urn")
+ if user_obj.urn is None:
+ continue
+ if editable_properties is not None:
+ user_obj.email = editable_properties.get("email")
+ if user_obj.email is None:
+ urn_id = Urn.from_string(user_obj.urn).get_entity_id_as_string()
+ if "@" in urn_id:
+ user_obj.email = urn_id
+ if user_obj.email is not None:
+ yield user_obj
+ start += count
+
+ def get_report(self) -> SourceReport:
+ return self.report
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java
index 9a3bc9e319d2b..15de029340a3c 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java
@@ -229,9 +229,11 @@ public BrowseResultV2 browseV2(
@Nonnull String input,
int start,
int count,
- @Nonnull Authentication authentication) {
+ @Nonnull Authentication authentication,
+ @Nullable SearchFlags searchFlags) {
// TODO: cache browseV2 results
- return _entitySearchService.browseV2(entityName, path, filter, input, start, count);
+ return _entitySearchService.browseV2(
+ entityName, path, filter, input, start, count, searchFlags);
}
/**
@@ -253,9 +255,11 @@ public BrowseResultV2 browseV2(
@Nonnull String input,
int start,
int count,
- @Nonnull Authentication authentication) {
+ @Nonnull Authentication authentication,
+ @Nullable SearchFlags searchFlags) {
// TODO: cache browseV2 results
- return _entitySearchService.browseV2(entityNames, path, filter, input, start, count);
+ return _entitySearchService.browseV2(
+ entityNames, path, filter, input, start, count, searchFlags);
}
@SneakyThrows
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
index 7f15e3a7fd8fc..eec5c6120886d 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java
@@ -15,6 +15,7 @@
import com.codahale.metrics.Timer;
import com.datahub.util.RecordUtils;
import com.datahub.util.exception.ModelConversionException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
@@ -146,7 +147,8 @@ public class EntityServiceImpl implements EntityService {
private static final int DEFAULT_MAX_TRANSACTION_RETRY = 3;
protected final AspectDao _aspectDao;
- private final EventProducer _producer;
+
+ @VisibleForTesting @Getter private final EventProducer _producer;
private final EntityRegistry _entityRegistry;
private final Map<String, Set<String>> _entityToValidAspects;
private RetentionService _retentionService;
@@ -637,10 +639,15 @@ public List<UpdateAspectResult> ingestAspects(
@Override
public List<UpdateAspectResult> ingestAspects(
@Nonnull final AspectsBatch aspectsBatch, boolean emitMCL, boolean overwrite) {
+ Set items = new HashSet<>(aspectsBatch.getItems());
+
+ // Generate additional items as needed
+ items.addAll(DefaultAspectsUtil.getAdditionalChanges(aspectsBatch, this, enableBrowseV2));
+ AspectsBatch withDefaults = AspectsBatchImpl.builder().items(items).build();
Timer.Context ingestToLocalDBTimer =
MetricUtils.timer(this.getClass(), "ingestAspectsToLocalDB").time();
- List<UpdateAspectResult> ingestResults = ingestAspectsToLocalDB(aspectsBatch, overwrite);
+ List<UpdateAspectResult> ingestResults = ingestAspectsToLocalDB(withDefaults, overwrite);
List<UpdateAspectResult> mclResults = emitMCL(ingestResults, emitMCL);
ingestToLocalDBTimer.stop();
@@ -964,7 +971,7 @@ public IngestResult ingestProposal(
*/
@Override
public Set<IngestResult> ingestProposal(AspectsBatch aspectsBatch, final boolean async) {
- Stream<IngestResult> timeseriesIngestResults = ingestTimeseriesProposal(aspectsBatch);
+ Stream timeseriesIngestResults = ingestTimeseriesProposal(aspectsBatch, async);
Stream<IngestResult> nonTimeseriesIngestResults =
async ? ingestProposalAsync(aspectsBatch) : ingestProposalSync(aspectsBatch);
@@ -978,7 +985,8 @@ public Set<IngestResult> ingestProposal(AspectsBatch aspectsBatch, final boolean
* @param aspectsBatch timeseries upserts batch
* @return returns ingest proposal result, however was never in the MCP topic
*/
- private Stream<IngestResult> ingestTimeseriesProposal(AspectsBatch aspectsBatch) {
+ private Stream<IngestResult> ingestTimeseriesProposal(
+ AspectsBatch aspectsBatch, final boolean async) {
List<? extends BatchItem> unsupported =
aspectsBatch.getItems().stream()
.filter(
@@ -992,6 +1000,20 @@ private Stream<IngestResult> ingestTimeseriesProposal(AspectsBatch aspectsBatch)
+ unsupported.stream().map(BatchItem::getChangeType).collect(Collectors.toSet()));
}
+ if (!async) {
+ // Create default non-timeseries aspects for timeseries aspects
+ List timeseriesItems =
+ aspectsBatch.getItems().stream()
+ .filter(item -> item.getAspectSpec().isTimeseries())
+ .collect(Collectors.toList());
+
+ List defaultAspects =
+ DefaultAspectsUtil.getAdditionalChanges(
+ AspectsBatchImpl.builder().items(timeseriesItems).build(), this, enableBrowseV2);
+ ingestProposalSync(AspectsBatchImpl.builder().items(defaultAspects).build());
+ }
+
+ // Emit timeseries MCLs
List, Boolean>>>> timeseriesResults =
aspectsBatch.getItems().stream()
.filter(item -> item.getAspectSpec().isTimeseries())
@@ -1080,17 +1102,10 @@ private Stream<IngestResult> ingestProposalAsync(AspectsBatch aspectsBatch) {
}
private Stream<IngestResult> ingestProposalSync(AspectsBatch aspectsBatch) {
- Set items = new HashSet<>(aspectsBatch.getItems());
-
- // Generate additional items as needed
- items.addAll(DefaultAspectsUtil.getAdditionalChanges(aspectsBatch, this, enableBrowseV2));
-
- AspectsBatch withDefaults = AspectsBatchImpl.builder().items(items).build();
-
AspectsBatchImpl nonTimeseries =
AspectsBatchImpl.builder()
.items(
- withDefaults.getItems().stream()
+ aspectsBatch.getItems().stream()
.filter(item -> !item.getAspectSpec().isTimeseries())
.collect(Collectors.toList()))
.build();
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java
index 7cba2e0ecc8cb..c20c16e0ea7d1 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/ElasticSearchService.java
@@ -215,8 +215,9 @@ public BrowseResultV2 browseV2(
@Nullable Filter filter,
@Nonnull String input,
int start,
- int count) {
- return esBrowseDAO.browseV2(entityName, path, filter, input, start, count);
+ int count,
+ @Nullable SearchFlags searchFlags) {
+ return esBrowseDAO.browseV2(entityName, path, filter, input, start, count, searchFlags);
}
@Nonnull
@@ -227,8 +228,9 @@ public BrowseResultV2 browseV2(
@Nullable Filter filter,
@Nonnull String input,
int start,
- int count) {
- return esBrowseDAO.browseV2(entityNames, path, filter, input, start, count);
+ int count,
+ @Nullable SearchFlags searchFlags) {
+ return esBrowseDAO.browseV2(entityNames, path, filter, input, start, count, searchFlags);
}
@Nonnull
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAO.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAO.java
index 0a9a9fbbad086..b808588520089 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAO.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/ESBrowseDAO.java
@@ -21,6 +21,7 @@
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.annotation.SearchableAnnotation;
import com.linkedin.metadata.models.registry.EntityRegistry;
+import com.linkedin.metadata.query.SearchFlags;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.metadata.search.elasticsearch.query.request.SearchRequestHandler;
import com.linkedin.metadata.search.utils.ESUtils;
@@ -34,6 +35,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
@@ -399,14 +401,15 @@ public BrowseResultV2 browseV2(
@Nullable Filter filter,
@Nonnull String input,
int start,
- int count) {
+ int count,
+ @Nullable SearchFlags searchFlags) {
try {
final SearchResponse groupsResponse;
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), "esGroupSearch").time()) {
final String finalInput = input.isEmpty() ? "*" : input;
groupsResponse =
client.search(
- constructGroupsSearchRequestV2(entityName, path, filter, finalInput),
+ constructGroupsSearchRequestV2(entityName, path, filter, finalInput, searchFlags),
RequestOptions.DEFAULT);
}
@@ -435,7 +438,8 @@ public BrowseResultV2 browseV2(
@Nullable Filter filter,
@Nonnull String input,
int start,
- int count) {
+ int count,
+ @Nullable SearchFlags searchFlags) {
try {
final SearchResponse groupsResponse;
@@ -444,7 +448,7 @@ public BrowseResultV2 browseV2(
groupsResponse =
client.search(
constructGroupsSearchRequestBrowseAcrossEntities(
- entities, path, filter, finalInput),
+ entities, path, filter, finalInput, searchFlags),
RequestOptions.DEFAULT);
}
@@ -472,7 +476,8 @@ private SearchRequest constructGroupsSearchRequestV2(
@Nonnull String entityName,
@Nonnull String path,
@Nullable Filter filter,
- @Nonnull String input) {
+ @Nonnull String input,
+ @Nullable SearchFlags searchFlags) {
final String indexName = indexConvention.getIndexName(entityRegistry.getEntitySpec(entityName));
final SearchRequest searchRequest = new SearchRequest(indexName);
final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
@@ -482,7 +487,8 @@ private SearchRequest constructGroupsSearchRequestV2(
entityName,
path,
SearchUtil.transformFilterForEntities(filter, indexConvention),
- input));
+ input,
+ searchFlags));
searchSourceBuilder.aggregation(buildAggregationsV2(path));
searchRequest.source(searchSourceBuilder);
return searchRequest;
@@ -493,7 +499,8 @@ private SearchRequest constructGroupsSearchRequestBrowseAcrossEntities(
@Nonnull List<String> entities,
@Nonnull String path,
@Nullable Filter filter,
- @Nonnull String input) {
+ @Nonnull String input,
+ @Nullable SearchFlags searchFlags) {
List<EntitySpec> entitySpecs =
entities.stream().map(entityRegistry::getEntitySpec).collect(Collectors.toList());
@@ -509,7 +516,8 @@ private SearchRequest constructGroupsSearchRequestBrowseAcrossEntities(
entitySpecs,
path,
SearchUtil.transformFilterForEntities(filter, indexConvention),
- input));
+ input,
+ searchFlags));
searchSourceBuilder.aggregation(buildAggregationsV2(path));
searchRequest.source(searchSourceBuilder);
return searchRequest;
@@ -537,7 +545,10 @@ private QueryBuilder buildQueryStringV2(
@Nonnull String entityName,
@Nonnull String path,
@Nullable Filter filter,
- @Nonnull String input) {
+ @Nonnull String input,
+ @Nullable SearchFlags searchFlags) {
+ SearchFlags finalSearchFlags =
+ Optional.ofNullable(searchFlags).orElse(new SearchFlags().setFulltext(true));
final int browseDepthVal = getPathDepthV2(path);
final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
@@ -545,7 +556,7 @@ private QueryBuilder buildQueryStringV2(
EntitySpec entitySpec = entityRegistry.getEntitySpec(entityName);
QueryBuilder query =
SearchRequestHandler.getBuilder(entitySpec, searchConfiguration, customSearchConfiguration)
- .getQuery(input, false);
+ .getQuery(input, Boolean.TRUE.equals(finalSearchFlags.isFulltext()));
queryBuilder.must(query);
filterSoftDeletedByDefault(filter, queryBuilder);
@@ -567,14 +578,17 @@ private QueryBuilder buildQueryStringBrowseAcrossEntities(
@Nonnull List<EntitySpec> entitySpecs,
@Nonnull String path,
@Nullable Filter filter,
- @Nonnull String input) {
+ @Nonnull String input,
+ @Nullable SearchFlags searchFlags) {
+ SearchFlags finalSearchFlags =
+ Optional.ofNullable(searchFlags).orElse(new SearchFlags().setFulltext(true));
final int browseDepthVal = getPathDepthV2(path);
final BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery();
QueryBuilder query =
SearchRequestHandler.getBuilder(entitySpecs, searchConfiguration, customSearchConfiguration)
- .getQuery(input, false);
+ .getQuery(input, Boolean.TRUE.equals(finalSearchFlags.isFulltext()));
queryBuilder.must(query);
if (!path.isEmpty()) {
diff --git a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchQueryBuilder.java b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchQueryBuilder.java
index 7ddccb0d56724..4c704f81b4c13 100644
--- a/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchQueryBuilder.java
+++ b/metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/SearchQueryBuilder.java
@@ -135,14 +135,10 @@ private QueryBuilder buildInternalQuery(
query.startsWith(STRUCTURED_QUERY_PREFIX)
? query.substring(STRUCTURED_QUERY_PREFIX.length())
: query;
-
- QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(withoutQueryPrefix);
- queryBuilder.defaultOperator(Operator.AND);
- getStandardFields(entitySpecs)
- .forEach(entitySpec -> queryBuilder.field(entitySpec.fieldName(), entitySpec.boost()));
- finalQuery.should(queryBuilder);
+ getStructuredQuery(customQueryConfig, entitySpecs, withoutQueryPrefix)
+ .ifPresent(finalQuery::should);
if (exactMatchConfiguration.isEnableStructured()) {
- getPrefixAndExactMatchQuery(null, entitySpecs, withoutQueryPrefix)
+ getPrefixAndExactMatchQuery(customQueryConfig, entitySpecs, withoutQueryPrefix)
.ifPresent(finalQuery::should);
}
}
@@ -415,6 +411,29 @@ private Optional<QueryBuilder> getPrefixAndExactMatchQuery(
return finalQuery.should().size() > 0 ? Optional.of(finalQuery) : Optional.empty();
}
+ private Optional<QueryBuilder> getStructuredQuery(
+ @Nullable QueryConfiguration customQueryConfig,
+ List<EntitySpec> entitySpecs,
+ String sanitizedQuery) {
+ Optional<QueryBuilder> result = Optional.empty();
+
+ final boolean executeStructuredQuery;
+ if (customQueryConfig != null) {
+ executeStructuredQuery = customQueryConfig.isStructuredQuery();
+ } else {
+ executeStructuredQuery = !(isQuoted(sanitizedQuery) && exactMatchConfiguration.isExclusive());
+ }
+
+ if (executeStructuredQuery) {
+ QueryStringQueryBuilder queryBuilder = QueryBuilders.queryStringQuery(sanitizedQuery);
+ queryBuilder.defaultOperator(Operator.AND);
+ getStandardFields(entitySpecs)
+ .forEach(entitySpec -> queryBuilder.field(entitySpec.fieldName(), entitySpec.boost()));
+ result = Optional.of(queryBuilder);
+ }
+ return result;
+ }
+
private FunctionScoreQueryBuilder buildScoreFunctions(
@Nullable QueryConfiguration customQueryConfig,
@Nonnull List<EntitySpec> entitySpecs,
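For reference, the decision extracted into getStructuredQuery above reduces to: honor customQueryConfig.isStructuredQuery() when a custom config matches, otherwise skip the structured clause only for quoted queries when exact matching is exclusive. A standalone sketch of that rule (class, method, and parameter names are illustrative, not part of the codebase):

class StructuredQueryDecision {
  // customStructuredQuery stands in for customQueryConfig.isStructuredQuery();
  // null means no custom query configuration matched.
  static boolean shouldRunStructuredQuery(
      Boolean customStructuredQuery, boolean isQuoted, boolean exactMatchExclusive) {
    if (customStructuredQuery != null) {
      return customStructuredQuery;
    }
    return !(isQuoted && exactMatchExclusive);
  }
}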
diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
index ea4e97d264bca..384b54c7a1c8d 100644
--- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
+++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java
@@ -479,7 +479,7 @@ public void testIngestAspectsGetLatestAspects() throws Exception {
assertTrue(DataTemplateUtil.areEqual(writeAspect1, latestAspects.get(aspectName1)));
assertTrue(DataTemplateUtil.areEqual(writeAspect2, latestAspects.get(aspectName2)));
- verify(_mockProducer, times(2))
+ verify(_mockProducer, times(3))
.produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.any());
verifyNoMoreInteractions(_mockProducer);
@@ -772,6 +772,12 @@ public void testUpdateGetAspect() throws AssertionError {
.produceMetadataChangeLog(
Mockito.eq(entityUrn), Mockito.eq(corpUserInfoSpec), Mockito.any());
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")),
+ Mockito.any());
+
verifyNoMoreInteractions(_mockProducer);
}
@@ -824,6 +830,13 @@ public void testGetAspectAtVersion() throws AssertionError {
readAspect1 = _entityServiceImpl.getVersionedAspect(entityUrn, aspectName, -1);
assertFalse(DataTemplateUtil.areEqual(writtenVersionedAspect1, readAspect1));
+ // check key aspect
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpuser").getAspectSpec("corpUserKey")),
+ Mockito.any());
+
verifyNoMoreInteractions(_mockProducer);
}
@@ -1094,13 +1107,22 @@ public void testIngestGetLatestAspect() throws AssertionError {
ArgumentCaptor<MetadataChangeLog> mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class);
verify(_mockProducer, times(1))
- .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture());
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")),
+ mclCaptor.capture());
MetadataChangeLog mcl = mclCaptor.getValue();
assertEquals(mcl.getEntityType(), "corpuser");
assertNull(mcl.getPreviousAspectValue());
assertNull(mcl.getPreviousSystemMetadata());
assertEquals(mcl.getChangeType(), ChangeType.UPSERT);
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")),
+ Mockito.any());
+
verifyNoMoreInteractions(_mockProducer);
reset(_mockProducer);
@@ -1201,7 +1223,16 @@ public void testIngestGetLatestEnvelopedAspect() throws Exception {
EntityUtils.parseSystemMetadata(readAspectDao1.getSystemMetadata()), metadata1));
verify(_mockProducer, times(2))
- .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), Mockito.any());
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")),
+ Mockito.any());
+
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")),
+ Mockito.any());
verifyNoMoreInteractions(_mockProducer);
}
@@ -1234,9 +1265,18 @@ public void testIngestSameAspect() throws AssertionError {
RecordTemplate readAspect1 = _entityServiceImpl.getLatestAspect(entityUrn, aspectName);
assertTrue(DataTemplateUtil.areEqual(writeAspect1, readAspect1));
+ verify(_mockProducer, times(1))
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserKey")),
+ Mockito.any());
+
ArgumentCaptor<MetadataChangeLog> mclCaptor = ArgumentCaptor.forClass(MetadataChangeLog.class);
verify(_mockProducer, times(1))
- .produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture());
+ .produceMetadataChangeLog(
+ Mockito.eq(entityUrn),
+ Mockito.eq(_testEntityRegistry.getEntitySpec("corpUser").getAspectSpec("corpUserInfo")),
+ mclCaptor.capture());
MetadataChangeLog mcl = mclCaptor.getValue();
assertEquals(mcl.getEntityType(), "corpuser");
assertNull(mcl.getPreviousAspectValue());
diff --git a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/custom/QueryConfiguration.java b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/custom/QueryConfiguration.java
index 901bf803d2bca..e3a9d076dbef2 100644
--- a/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/custom/QueryConfiguration.java
+++ b/metadata-service/configuration/src/main/java/com/linkedin/metadata/config/search/custom/QueryConfiguration.java
@@ -19,6 +19,13 @@ public class QueryConfiguration {
private String queryRegex;
@Builder.Default private boolean simpleQuery = true;
+
+ /**
+ * Determines whether standard structured query logic should be applied when relevant, i.e. when
+ * the fulltext flag is false. Not applied in cases where simpleQuery would be the standard.
+ */
+ @Builder.Default private boolean structuredQuery = true;
+
@Builder.Default private boolean exactMatchQuery = true;
@Builder.Default private boolean prefixMatchQuery = true;
private BoolQueryConfiguration boolQuery;
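The new structuredQuery flag defaults to true, so existing custom search configurations keep their behavior. A sketch of how a custom configuration could opt out of the structured clause via the Lombok builder implied by @Builder.Default above (field names come from the class; the regex value is illustrative only):

import com.linkedin.metadata.config.search.custom.QueryConfiguration;

class CustomQueryConfigExample {
  static QueryConfiguration noStructuredClause() {
    return QueryConfiguration.builder()
        .queryRegex(".*")        // illustrative: apply this config to all queries
        .structuredQuery(false)  // new flag: skip the standard structured clause
        .exactMatchQuery(true)
        .prefixMatchQuery(true)
        .build();
  }
}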
diff --git a/metadata-service/configuration/src/main/resources/application.yml b/metadata-service/configuration/src/main/resources/application.yml
index d4c11d4aa53bd..c2a0d508b57d6 100644
--- a/metadata-service/configuration/src/main/resources/application.yml
+++ b/metadata-service/configuration/src/main/resources/application.yml
@@ -314,6 +314,14 @@ systemUpdate:
maxBackOffs: ${BOOTSTRAP_SYSTEM_UPDATE_MAX_BACK_OFFS:50}
backOffFactor: ${BOOTSTRAP_SYSTEM_UPDATE_BACK_OFF_FACTOR:2} # Multiplicative factor for back off, default values will result in waiting 5min 15s
waitForSystemUpdate: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:true}
+ dataJobNodeCLL:
+ enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:true}
+ batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:200}
+ browsePathsV2:
+ enabled: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_ENABLED:true}
+ batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_BROWSE_PATHS_V2_BATCH_SIZE:5000}
+ reprocess:
+ enabled: ${REPROCESS_DEFAULT_BROWSE_PATHS_V2:false}
structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings
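The new systemUpdate blocks are driven entirely by environment variables with the defaults shown. If they are consumed through Spring property binding, reading one of them might look roughly like this sketch (the property path is assumed from the YAML nesting above and not verified against the factories):

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
class BrowsePathsV2UpgradeSettings {
  // Assumed property paths derived from the YAML structure above.
  @Value("${systemUpdate.browsePathsV2.enabled}")
  private boolean enabled;

  @Value("${systemUpdate.browsePathsV2.batchSize}")
  private int batchSize;
}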
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
index 871f16d97be33..2ccdee5fb1dbf 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entity/EntityServiceFactory.java
@@ -1,20 +1,15 @@
package com.linkedin.gms.factory.entity;
import com.linkedin.datahub.graphql.featureflags.FeatureFlags;
-import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.dao.producer.KafkaEventProducer;
-import com.linkedin.metadata.dao.producer.KafkaHealthChecker;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityServiceImpl;
import com.linkedin.metadata.entity.ebean.batch.MCPUpsertBatchItem;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.service.UpdateIndicesService;
-import com.linkedin.mxe.TopicConvention;
import javax.annotation.Nonnull;
-import org.apache.avro.generic.IndexedRecord;
-import org.apache.kafka.clients.producer.Producer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
@@ -28,26 +23,16 @@ public class EntityServiceFactory {
private Integer _ebeanMaxTransactionRetry;
@Bean(name = "entityService")
- @DependsOn({
- "entityAspectDao",
- "kafkaEventProducer",
- "kafkaHealthChecker",
- TopicConventionFactory.TOPIC_CONVENTION_BEAN,
- "entityRegistry"
- })
+ @DependsOn({"entityAspectDao", "kafkaEventProducer", "entityRegistry"})
@Nonnull
protected EntityService createInstance(
- Producer<String, ? extends IndexedRecord> producer,
- TopicConvention convention,
- KafkaHealthChecker kafkaHealthChecker,
+ @Qualifier("kafkaEventProducer") final KafkaEventProducer eventProducer,
@Qualifier("entityAspectDao") AspectDao aspectDao,
EntityRegistry entityRegistry,
ConfigurationProvider configurationProvider,
UpdateIndicesService updateIndicesService,
@Value("${featureFlags.showBrowseV2}") final boolean enableBrowsePathV2) {
- final KafkaEventProducer eventProducer =
- new KafkaEventProducer(producer, convention, kafkaHealthChecker);
FeatureFlags featureFlags = configurationProvider.getFeatureFlags();
return new EntityServiceImpl(
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java
deleted file mode 100644
index 4819984307af9..0000000000000
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/DUHESchemaRegistryFactory.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package com.linkedin.gms.factory.kafka.schemaregistry;
-
-import static com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener.TOPIC_NAME;
-
-import com.linkedin.gms.factory.config.ConfigurationProvider;
-import com.linkedin.metadata.boot.kafka.MockDUHEDeserializer;
-import com.linkedin.metadata.boot.kafka.MockDUHESerializer;
-import com.linkedin.metadata.config.kafka.KafkaConfiguration;
-import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
-import java.util.HashMap;
-import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Slf4j
-@Configuration
-public class DUHESchemaRegistryFactory {
-
- public static final String DUHE_SCHEMA_REGISTRY_TOPIC_KEY = "duheTopicName";
-
- @Value(TOPIC_NAME)
- private String duheTopicName;
-
- /** Configure Kafka Producer/Consumer processes with a custom schema registry. */
- @Bean("duheSchemaRegistryConfig")
- protected SchemaRegistryConfig duheSchemaRegistryConfig(ConfigurationProvider provider) {
- Map<String, Object> props = new HashMap<>();
- KafkaConfiguration kafkaConfiguration = provider.getKafka();
-
- props.put(
- AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
- kafkaConfiguration.getSchemaRegistry().getUrl());
- props.put(DUHE_SCHEMA_REGISTRY_TOPIC_KEY, duheTopicName);
-
- log.info("DataHub System Update Registry");
- return new SchemaRegistryConfig(MockDUHESerializer.class, MockDUHEDeserializer.class, props);
- }
-}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java
index 8c814e5054758..46b27195ecc67 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/InternalSchemaRegistryFactory.java
@@ -1,11 +1,7 @@
package com.linkedin.gms.factory.kafka.schemaregistry;
-import com.linkedin.gms.factory.common.TopicConventionFactory;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.kafka.KafkaConfiguration;
-import com.linkedin.metadata.registry.SchemaRegistryService;
-import com.linkedin.metadata.registry.SchemaRegistryServiceImpl;
-import com.linkedin.mxe.TopicConvention;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
@@ -17,7 +13,6 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.DependsOn;
@Slf4j
@Configuration
@@ -45,11 +40,4 @@ protected SchemaRegistryConfig getInstance(
kafkaConfiguration.getSchemaRegistry().getUrl());
return new SchemaRegistryConfig(KafkaAvroSerializer.class, KafkaAvroDeserializer.class, props);
}
-
- @Bean(name = "schemaRegistryService")
- @Nonnull
- @DependsOn({TopicConventionFactory.TOPIC_CONVENTION_BEAN})
- protected SchemaRegistryService schemaRegistryService(TopicConvention convention) {
- return new SchemaRegistryServiceImpl(convention);
- }
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java
new file mode 100644
index 0000000000000..a6869321d796f
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SchemaRegistryServiceFactory.java
@@ -0,0 +1,20 @@
+package com.linkedin.gms.factory.kafka.schemaregistry;
+
+import com.linkedin.gms.factory.common.TopicConventionFactory;
+import com.linkedin.metadata.registry.SchemaRegistryService;
+import com.linkedin.metadata.registry.SchemaRegistryServiceImpl;
+import com.linkedin.mxe.TopicConvention;
+import javax.annotation.Nonnull;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.DependsOn;
+
+@Configuration
+public class SchemaRegistryServiceFactory {
+ @Bean(name = "schemaRegistryService")
+ @Nonnull
+ @DependsOn({TopicConventionFactory.TOPIC_CONVENTION_BEAN})
+ protected SchemaRegistryService schemaRegistryService(TopicConvention convention) {
+ return new SchemaRegistryServiceImpl(convention);
+ }
+}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java
new file mode 100644
index 0000000000000..d02cdc0e68f52
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/kafka/schemaregistry/SystemUpdateSchemaRegistryFactory.java
@@ -0,0 +1,66 @@
+package com.linkedin.gms.factory.kafka.schemaregistry;
+
+import static com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener.TOPIC_NAME;
+
+import com.linkedin.gms.factory.config.ConfigurationProvider;
+import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
+import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
+import com.linkedin.metadata.config.kafka.KafkaConfiguration;
+import com.linkedin.metadata.registry.SchemaRegistryService;
+import com.linkedin.mxe.Topics;
+import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Slf4j
+@Configuration
+public class SystemUpdateSchemaRegistryFactory {
+
+ public static final String SYSTEM_UPDATE_TOPIC_KEY_PREFIX = "data-hub.system-update.topic-key.";
+ public static final String SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX = ".id";
+
+ public static final String DUHE_SCHEMA_REGISTRY_TOPIC_KEY =
+ SYSTEM_UPDATE_TOPIC_KEY_PREFIX + "duhe";
+ public static final String MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY =
+ SYSTEM_UPDATE_TOPIC_KEY_PREFIX + "mcl-versioned";
+
+ @Value(TOPIC_NAME)
+ private String duheTopicName;
+
+ @Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}")
+ private String mclTopicName;
+
+ /** Configure Kafka Producer/Consumer processes with a custom schema registry. */
+ @Bean("duheSchemaRegistryConfig")
+ protected SchemaRegistryConfig duheSchemaRegistryConfig(
+ final ConfigurationProvider provider, final SchemaRegistryService schemaRegistryService) {
+ Map<String, Object> props = new HashMap<>();
+ KafkaConfiguration kafkaConfiguration = provider.getKafka();
+
+ props.put(
+ AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
+ kafkaConfiguration.getSchemaRegistry().getUrl());
+
+ // topic names
+ props.putAll(
+ Map.of(
+ DUHE_SCHEMA_REGISTRY_TOPIC_KEY, duheTopicName,
+ MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY, mclTopicName));
+
+ // topic ordinals
+ props.putAll(
+ Map.of(
+ DUHE_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX,
+ schemaRegistryService.getSchemaIdForTopic(duheTopicName).get().toString(),
+ MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX,
+ schemaRegistryService.getSchemaIdForTopic(mclTopicName).get().toString()));
+
+ log.info("DataHub System Update Registry");
+ return new SchemaRegistryConfig(
+ MockSystemUpdateSerializer.class, MockSystemUpdateDeserializer.class, props);
+ }
+}
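For orientation, the properties assembled by duheSchemaRegistryConfig above are keyed by the topic-key prefix plus an .id suffix; the mock serializer and deserializer further down read them back to pre-register schemas under fixed ids. An illustrative shape of that map (topic names and ids are examples only):

import java.util.Map;

class SystemUpdateRegistryPropsExample {
  // Example values only; real topic names and schema ids come from configuration
  // and SchemaRegistryService.getSchemaIdForTopic(...).
  static final Map<String, String> EXAMPLE_PROPS =
      Map.of(
          "data-hub.system-update.topic-key.duhe", "DataHubUpgradeHistory_v1",
          "data-hub.system-update.topic-key.duhe.id", "1",
          "data-hub.system-update.topic-key.mcl-versioned", "MetadataChangeLog_Versioned_v1",
          "data-hub.system-update.topic-key.mcl-versioned.id", "2");
}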
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java
index a79bdacfc55e9..2dccda4243bca 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/BootstrapStep.java
@@ -1,16 +1,15 @@
package com.linkedin.metadata.boot;
-import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
+import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeResult;
-import java.net.URISyntaxException;
import javax.annotation.Nonnull;
/** A single step in the Bootstrap process. */
@@ -40,24 +39,10 @@ static Urn getUpgradeUrn(String upgradeId) {
new DataHubUpgradeKey().setId(upgradeId), Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
}
- static void setUpgradeResult(Urn urn, EntityService<?> entityService) throws URISyntaxException {
- final AuditStamp auditStamp =
- new AuditStamp()
- .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
- .setTime(System.currentTimeMillis());
+ static void setUpgradeResult(Urn urn, EntityService<?> entityService) {
final DataHubUpgradeResult upgradeResult =
new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis());
- // Workaround because entity service does not auto-generate the key aspect for us
- final MetadataChangeProposal keyProposal = new MetadataChangeProposal();
- final DataHubUpgradeKey upgradeKey = new DataHubUpgradeKey().setId(urn.getId());
- keyProposal.setEntityUrn(urn);
- keyProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
- keyProposal.setAspectName(Constants.DATA_HUB_UPGRADE_KEY_ASPECT_NAME);
- keyProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeKey));
- keyProposal.setChangeType(ChangeType.UPSERT);
- entityService.ingestProposal(keyProposal, auditStamp, false);
-
// Ingest the upgrade result
final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
upgradeProposal.setEntityUrn(urn);
@@ -65,6 +50,6 @@ static void setUpgradeResult(Urn urn, EntityService> entityService) throws URI
upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME);
upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult));
upgradeProposal.setChangeType(ChangeType.UPSERT);
- entityService.ingestProposal(upgradeProposal, auditStamp, false);
+ entityService.ingestProposal(upgradeProposal, AuditStampUtils.createDefaultAuditStamp(), false);
}
}
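The manual key-aspect proposal is removed because the entity service now emits key aspects itself (the EntityServiceTest changes above verify the extra MCLs), and the hand-built audit stamp is replaced by AuditStampUtils.createDefaultAuditStamp(). Assuming that helper produces a system-actor stamp at the current time, it is roughly equivalent to the removed inline code:

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.Constants;
import java.net.URISyntaxException;

class DefaultAuditStampSketch {
  // Sketch of what the removed inline code did; assumed to match createDefaultAuditStamp().
  static AuditStamp systemStamp() throws URISyntaxException {
    return new AuditStamp()
        .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
        .setTime(System.currentTimeMillis());
  }
}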
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java
deleted file mode 100644
index 36fe514d5536f..0000000000000
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHESerializer.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.linkedin.metadata.boot.kafka;
-
-import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
-
-import com.linkedin.metadata.EventUtils;
-import io.confluent.kafka.schemaregistry.avro.AvroSchema;
-import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import java.io.IOException;
-import java.util.Map;
-import lombok.extern.slf4j.Slf4j;
-
-/** Used for early bootstrap to avoid contact with not yet existing schema registry */
-@Slf4j
-public class MockDUHESerializer extends KafkaAvroSerializer {
-
- private static final String DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT_SUFFIX = "-value";
-
- private String topicName;
-
- public MockDUHESerializer() {
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- public MockDUHESerializer(SchemaRegistryClient client) {
- super(client);
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- public MockDUHESerializer(SchemaRegistryClient client, Map<String, ?> props) {
- super(client, props);
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- @Override
- public void configure(Map<String, ?> configs, boolean isKey) {
- super.configure(configs, isKey);
- topicName = configs.get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY).toString();
- }
-
- private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
- MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
- try {
- schemaRegistry.register(
- topicToSubjectName(topicName), new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
- return schemaRegistry;
- } catch (IOException | RestClientException e) {
- throw new RuntimeException(e);
- }
- }
-
- public static String topicToSubjectName(String topicName) {
- return topicName + DATAHUB_UPGRADE_HISTORY_EVENT_SUBJECT_SUFFIX;
- }
-}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
similarity index 57%
rename from metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java
rename to metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
index e631f776abd08..74a20cdacbb21 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockDUHEDeserializer.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateDeserializer.java
@@ -1,50 +1,49 @@
package com.linkedin.metadata.boot.kafka;
-import static com.linkedin.gms.factory.kafka.schemaregistry.DUHESchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
-import static com.linkedin.metadata.boot.kafka.MockDUHESerializer.topicToSubjectName;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX;
+import static com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer.topicToSubjectName;
import com.linkedin.metadata.EventUtils;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.IOException;
import java.util.Map;
import lombok.extern.slf4j.Slf4j;
-/** Used for early bootstrap to avoid contact with not yet existing schema registry */
+/**
+ * Used for early bootstrap to avoid contact with the not yet existing schema registry. Only
+ * supports the DUHE topic.
+ */
@Slf4j
-public class MockDUHEDeserializer extends KafkaAvroDeserializer {
+public class MockSystemUpdateDeserializer extends KafkaAvroDeserializer {
private String topicName;
-
- public MockDUHEDeserializer() {
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- public MockDUHEDeserializer(SchemaRegistryClient client) {
- super(client);
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
-
- public MockDUHEDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
- super(client, props);
- this.schemaRegistry = buildMockSchemaRegistryClient();
- }
+ private Integer schemaId;
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
super.configure(configs, isKey);
topicName = configs.get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY).toString();
+ schemaId =
+ Integer.valueOf(
+ configs
+ .get(DUHE_SCHEMA_REGISTRY_TOPIC_KEY + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX)
+ .toString());
+ this.schemaRegistry = buildMockSchemaRegistryClient();
}
private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
- MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient2();
+ MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient2(schemaId);
try {
schemaRegistry.register(
- topicToSubjectName(topicName), new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA));
+ topicToSubjectName(topicName),
+ new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA),
+ 0,
+ schemaId);
return schemaRegistry;
} catch (IOException | RestClientException e) {
throw new RuntimeException(e);
@@ -52,13 +51,19 @@ private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
}
public static class MockSchemaRegistryClient2 extends MockSchemaRegistryClient {
+ private final int schemaId;
+
+ public MockSchemaRegistryClient2(int schemaId) {
+ this.schemaId = schemaId;
+ }
+
/**
* Previously used topics can have schema ids > 1 which fully match however we are replacing
* that registry so force schema id to 1
*/
@Override
public synchronized ParsedSchema getSchemaById(int id) throws IOException, RestClientException {
- return super.getSchemaById(1);
+ return super.getSchemaById(schemaId);
}
}
}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java
new file mode 100644
index 0000000000000..14aac2758a69d
--- /dev/null
+++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/kafka/MockSystemUpdateSerializer.java
@@ -0,0 +1,76 @@
+package com.linkedin.metadata.boot.kafka;
+
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.DUHE_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX;
+import static com.linkedin.gms.factory.kafka.schemaregistry.SystemUpdateSchemaRegistryFactory.SYSTEM_UPDATE_TOPIC_KEY_PREFIX;
+
+import com.linkedin.metadata.EventUtils;
+import com.linkedin.util.Pair;
+import io.confluent.kafka.schemaregistry.avro.AvroSchema;
+import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
+import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+
+/** Used for early bootstrap to avoid contact with not yet existing schema registry */
+@Slf4j
+public class MockSystemUpdateSerializer extends KafkaAvroSerializer {
+
+ private static final String DATAHUB_SYSTEM_UPDATE_SUBJECT_SUFFIX = "-value";
+
+ private static final Map<String, AvroSchema> AVRO_SCHEMA_MAP =
+ Map.of(
+ DUHE_SCHEMA_REGISTRY_TOPIC_KEY, new AvroSchema(EventUtils.ORIGINAL_DUHE_AVRO_SCHEMA),
+ MCL_VERSIONED_SCHEMA_REGISTRY_TOPIC_KEY,
+ new AvroSchema(EventUtils.ORIGINAL_MCL_AVRO_SCHEMA));
+
+ private Map<String, Pair<AvroSchema, Integer>> topicNameToAvroSchemaMap;
+
+ @Override
+ public void configure(Map<String, ?> configs, boolean isKey) {
+ super.configure(configs, isKey);
+ topicNameToAvroSchemaMap =
+ configs.entrySet().stream()
+ .filter(
+ e ->
+ e.getKey().startsWith(SYSTEM_UPDATE_TOPIC_KEY_PREFIX)
+ && !e.getKey().endsWith(SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX)
+ && e.getValue() instanceof String)
+ .map(
+ e -> {
+ Integer id =
+ Integer.valueOf(
+ (String) configs.get(e.getKey() + SYSTEM_UPDATE_TOPIC_KEY_ID_SUFFIX));
+ return Pair.of(
+ (String) e.getValue(), Pair.of(AVRO_SCHEMA_MAP.get(e.getKey()), id));
+ })
+ .collect(Collectors.toMap(Pair::getKey, Pair::getValue));
+ this.schemaRegistry = buildMockSchemaRegistryClient();
+ }
+
+ private MockSchemaRegistryClient buildMockSchemaRegistryClient() {
+ MockSchemaRegistryClient schemaRegistry = new MockSchemaRegistryClient();
+
+ if (topicNameToAvroSchemaMap != null) {
+ topicNameToAvroSchemaMap.forEach(
+ (topicName, schemaId) -> {
+ try {
+ schemaRegistry.register(
+ topicToSubjectName(topicName), schemaId.getFirst(), 0, schemaId.getSecond());
+ } catch (IOException | RestClientException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ return schemaRegistry;
+ }
+
+ public static String topicToSubjectName(String topicName) {
+ return topicName + DATAHUB_SYSTEM_UPDATE_SUBJECT_SUFFIX;
+ }
+}
diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java
index b1b24ac97f0b8..676b80c8bea32 100644
--- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java
+++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java
@@ -154,7 +154,8 @@ public BrowseResultV2 browseV2(
@Nonnull String input,
int start,
int count,
- @Nonnull Authentication authentication)
+ @Nonnull Authentication authentication,
+ @Nullable SearchFlags searchFlags)
throws RemoteInvocationException;
/**
@@ -176,7 +177,8 @@ public BrowseResultV2 browseV2(
@Nonnull String input,
int start,
int count,
- @Nonnull Authentication authentication)
+ @Nonnull Authentication authentication,
+ @Nullable SearchFlags searchFlags)
throws RemoteInvocationException;
@Deprecated
diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
index 3108345bd3937..653ef046ffc02 100644
--- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
+++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
@@ -378,7 +378,8 @@ public BrowseResultV2 browseV2(
@Nonnull String input,
int start,
int count,
- @Nonnull Authentication authentication) {
+ @Nonnull Authentication authentication,
+ @Nullable SearchFlags searchFlags) {
throw new NotImplementedException("BrowseV2 is not implemented in Restli yet");
}
@@ -391,7 +392,8 @@ public BrowseResultV2 browseV2(
@Nonnull String input,
int start,
int count,
- @Nonnull Authentication authentication)
+ @Nonnull Authentication authentication,
+ @Nullable SearchFlags searchFlags)
throws RemoteInvocationException {
throw new NotImplementedException("BrowseV2 is not implemented in Restli yet");
}
diff --git a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
index 1678fe92ec70e..17c5160494722 100644
--- a/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
+++ b/metadata-service/restli-servlet-impl/src/test/java/com/linkedin/metadata/resources/entity/AspectResourceTest.java
@@ -122,7 +122,7 @@ public void testAsyncDefaultAspects() throws URISyntaxException {
.request(req)
.build())));
_aspectResource.ingestProposal(mcp, "false");
- verify(_producer, times(5))
+ verify(_producer, times(10))
.produceMetadataChangeLog(eq(urn), any(AspectSpec.class), any(MetadataChangeLog.class));
verifyNoMoreInteractions(_producer);
}
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java
index 2fec88ad221fd..0d1c031db136e 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/search/EntitySearchService.java
@@ -197,6 +197,7 @@ BrowseResult browse(
* @param input search query
* @param start start offset of first group
* @param count max number of results requested
+ * @param searchFlags configuration options for search
*/
@Nonnull
public BrowseResultV2 browseV2(
@@ -205,7 +206,8 @@ public BrowseResultV2 browseV2(
@Nullable Filter filter,
@Nonnull String input,
int start,
- int count);
+ int count,
+ @Nullable SearchFlags searchFlags);
/**
* Gets browse snapshot of a given path
@@ -216,6 +218,7 @@ public BrowseResultV2 browseV2(
* @param input search query
* @param start start offset of first group
* @param count max number of results requested
+ * @param searchFlags configuration options for search
*/
@Nonnull
public BrowseResultV2 browseV2(
@@ -224,7 +227,8 @@ public BrowseResultV2 browseV2(
@Nullable Filter filter,
@Nonnull String input,
int start,
- int count);
+ int count,
+ @Nullable SearchFlags searchFlags);
/**
* Gets a list of paths for a given urn.