From 3dd1c4cd414b0d807c128f4df8daaa2f01d399aa Mon Sep 17 00:00:00 2001
From: david-leifker <114954101+david-leifker@users.noreply.github.com>
Date: Thu, 6 Jun 2024 15:38:54 -0500
Subject: [PATCH 1/4] feat(entity-client): restli batchGetV2 batchSize fix and
concurrency (#10630)
---
datahub-frontend/app/auth/AuthModule.java | 4 +-
datahub-frontend/conf/application.conf | 6 +-
...eConsumerApplicationTestConfiguration.java | 3 +-
.../src/main/resources/application.yaml | 1 +
.../RestliEntityClientFactory.java | 15 +-
.../entity/client/RestliEntityClient.java | 180 +++++++++++-------
.../client/SystemRestliEntityClient.java | 5 +-
.../common/client/BaseClientTest.java | 6 +-
.../client/SystemRestliEntityClientTest.java | 12 +-
.../metadata/service/BaseService.java | 6 +-
10 files changed, 150 insertions(+), 88 deletions(-)
diff --git a/datahub-frontend/app/auth/AuthModule.java b/datahub-frontend/app/auth/AuthModule.java
index 7db8f5689ead5..32dfba00d47db 100644
--- a/datahub-frontend/app/auth/AuthModule.java
+++ b/datahub-frontend/app/auth/AuthModule.java
@@ -63,6 +63,7 @@ public class AuthModule extends AbstractModule {
private static final String ENTITY_CLIENT_RETRY_INTERVAL = "entityClient.retryInterval";
private static final String ENTITY_CLIENT_NUM_RETRIES = "entityClient.numRetries";
private static final String ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE = "entityClient.restli.get.batchSize";
+ private static final String ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY = "entityClient.restli.get.batchConcurrency";
private static final String GET_SSO_SETTINGS_ENDPOINT = "auth/getSsoSettings";
private final com.typesafe.config.Config _configs;
@@ -208,7 +209,8 @@ protected SystemEntityClient provideEntityClient(
new ExponentialBackoff(_configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
_configs.getInt(ENTITY_CLIENT_NUM_RETRIES),
configurationProvider.getCache().getClient().getEntityClient(),
- Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)));
+ Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)),
+ Math.max(1, _configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY)));
}
@Provides
diff --git a/datahub-frontend/conf/application.conf b/datahub-frontend/conf/application.conf
index 6aa58d5b13b2c..045175ba69f02 100644
--- a/datahub-frontend/conf/application.conf
+++ b/datahub-frontend/conf/application.conf
@@ -289,5 +289,7 @@ entityClient.retryInterval = 2
entityClient.retryInterval = ${?ENTITY_CLIENT_RETRY_INTERVAL}
entityClient.numRetries = 3
entityClient.numRetries = ${?ENTITY_CLIENT_NUM_RETRIES}
-entityClient.restli.get.batchSize = 100
-entityClient.restli.get.batchSize = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE}
\ No newline at end of file
+entityClient.restli.get.batchSize = 50
+entityClient.restli.get.batchSize = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE}
+entityClient.restli.get.batchConcurrency = 2
+entityClient.restli.get.batchConcurrency = ${?ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY}
\ No newline at end of file
diff --git a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java
index 08ff802c37e40..ba650c25a6117 100644
--- a/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java
+++ b/metadata-jobs/mce-consumer-job/src/test/java/com/linkedin/metadata/kafka/MceConsumerApplicationTestConfiguration.java
@@ -47,7 +47,8 @@ public SystemEntityClient systemEntityClient(
new ExponentialBackoff(1),
1,
configurationProvider.getCache().getClient().getEntityClient(),
- 1);
+ 1,
+ 2);
}
@MockBean public Database ebeanServer;
diff --git a/metadata-service/configuration/src/main/resources/application.yaml b/metadata-service/configuration/src/main/resources/application.yaml
index 19621dce767c6..4d188bd5c6183 100644
--- a/metadata-service/configuration/src/main/resources/application.yaml
+++ b/metadata-service/configuration/src/main/resources/application.yaml
@@ -386,6 +386,7 @@ entityClient:
restli:
get:
batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit
+ batchConcurrency: ${ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY:2} # parallel threads
usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
diff --git a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java
index 2d9f570e1b07d..9e7255bf43a34 100644
--- a/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java
+++ b/metadata-service/factories/src/main/java/com/linkedin/gms/factory/entityclient/RestliEntityClientFactory.java
@@ -30,7 +30,8 @@ public EntityClient entityClient(
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
- final @Value("${entityClient.restli.get.batchSize:150}") int batchGetV2Size) {
+ final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
+ final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final Client restClient;
if (gmsUri != null) {
restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol);
@@ -39,7 +40,11 @@ public EntityClient entityClient(
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
}
return new RestliEntityClient(
- restClient, new ExponentialBackoff(retryInterval), numRetries, batchGetV2Size);
+ restClient,
+ new ExponentialBackoff(retryInterval),
+ numRetries,
+ batchGetV2Size,
+ batchGetV2Concurrency);
}
@Bean("systemEntityClient")
@@ -53,7 +58,8 @@ public SystemEntityClient systemEntityClient(
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
final EntityClientCacheConfig entityClientCacheConfig,
- final @Value("${entityClient.restli.get.batchSize:150}") int batchGetV2Size) {
+ final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
+ final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final Client restClient;
if (gmsUri != null) {
@@ -67,6 +73,7 @@ public SystemEntityClient systemEntityClient(
new ExponentialBackoff(retryInterval),
numRetries,
entityClientCacheConfig,
- batchGetV2Size);
+ batchGetV2Size,
+ batchGetV2Concurrency);
}
}
diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
index 70fae208ad77a..fe1ca571efea5 100644
--- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
+++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java
@@ -85,8 +85,13 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.mail.MethodNotSupportedException;
@@ -110,14 +115,17 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
private static final RunsRequestBuilders RUNS_REQUEST_BUILDERS = new RunsRequestBuilders();
private final int batchGetV2Size;
+ private final int batchGetV2Concurrency;
public RestliEntityClient(
@Nonnull final Client restliClient,
@Nonnull final BackoffPolicy backoffPolicy,
int retryCount,
- int batchGetV2Size) {
+ int batchGetV2Size,
+ int batchGetV2Concurrency) {
super(restliClient, backoffPolicy, retryCount);
this.batchGetV2Size = Math.max(1, batchGetV2Size);
+ this.batchGetV2Concurrency = batchGetV2Concurrency;
}
@Override
@@ -150,7 +158,6 @@ public Entity get(@Nonnull OperationContext opContext, @Nonnull final Urn urn)
*
* Batch get a set of {@link Entity} objects by urn.
*
* @param urns the urns of the entities to batch get
- * @param authentication the authentication to include in the request to the Metadata Service
* @throws RemoteInvocationException when unable to execute request
*/
@Override
@@ -216,40 +223,54 @@ public Map<Urn, EntityResponse> batchGetV2(
throws RemoteInvocationException, URISyntaxException {
    Map<Urn, EntityResponse> responseMap = new HashMap<>();
+ ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, batchGetV2Concurrency));
- Iterators.partition(urns.iterator(), batchGetV2Size)
- .forEachRemaining(
- batch -> {
- try {
- final EntitiesV2BatchGetRequestBuilder requestBuilder =
- ENTITIES_V2_REQUEST_BUILDERS
- .batchGet()
- .aspectsParam(aspectNames)
- .ids(batch.stream().map(Urn::toString).collect(Collectors.toList()));
-
- responseMap.putAll(
- sendClientRequest(requestBuilder, opContext.getSessionAuthentication())
- .getEntity()
- .getResults()
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- entry -> {
- try {
- return Urn.createFromString(entry.getKey());
- } catch (URISyntaxException e) {
- throw new RuntimeException(
- String.format(
- "Failed to bind urn string with value %s into urn",
- entry.getKey()));
- }
- },
- entry -> entry.getValue().getEntity())));
- } catch (RemoteInvocationException e) {
- throw new RuntimeException(e);
- }
- });
+ try {
+      Iterable<List<Urn>> iterable = () -> Iterators.partition(urns.iterator(), batchGetV2Size);
+      List<Future<Map<Urn, EntityResponse>>> futures =
+ StreamSupport.stream(iterable.spliterator(), false)
+ .map(
+ batch ->
+ executor.submit(
+ () -> {
+ try {
+ log.debug("Executing batchGetV2 with batch size: {}", batch.size());
+ final EntitiesV2BatchGetRequestBuilder requestBuilder =
+ ENTITIES_V2_REQUEST_BUILDERS
+ .batchGet()
+ .aspectsParam(aspectNames)
+ .ids(
+ batch.stream()
+ .map(Urn::toString)
+ .collect(Collectors.toList()));
+
+ return sendClientRequest(
+ requestBuilder, opContext.getSessionAuthentication())
+ .getEntity()
+ .getResults()
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> UrnUtils.getUrn(entry.getKey()),
+ entry -> entry.getValue().getEntity()));
+ } catch (RemoteInvocationException e) {
+ throw new RuntimeException(e);
+ }
+ }))
+ .collect(Collectors.toList());
+
+ futures.forEach(
+ result -> {
+ try {
+ responseMap.putAll(result.get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ executor.shutdown();
+ }
return responseMap;
}
@@ -260,7 +281,6 @@ public Map<Urn, EntityResponse> batchGetV2(
* @param entityName the entity type to fetch
* @param versionedUrns the urns of the entities to batch get
* @param aspectNames the aspect names to batch get
- * @param authentication the authentication to include in the request to the Metadata Service
* @throws RemoteInvocationException when unable to execute request
*/
@Override
@@ -272,39 +292,62 @@ public Map<Urn, EntityResponse> batchGetVersionedV2(
      @Nullable final Set<String> aspectNames) {
    Map<Urn, EntityResponse> responseMap = new HashMap<>();
+ ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, batchGetV2Concurrency));
- Iterators.partition(versionedUrns.iterator(), batchGetV2Size)
- .forEachRemaining(
- batch -> {
- final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder =
- ENTITIES_VERSIONED_V2_REQUEST_BUILDERS
- .batchGet()
- .aspectsParam(aspectNames)
- .entityTypeParam(entityName)
- .ids(
- batch.stream()
- .map(
- versionedUrn ->
- com.linkedin.common.urn.VersionedUrn.of(
- versionedUrn.getUrn().toString(),
- versionedUrn.getVersionStamp()))
- .collect(Collectors.toSet()));
-
- try {
- responseMap.putAll(
- sendClientRequest(requestBuilder, opContext.getSessionAuthentication())
- .getEntity()
- .getResults()
- .entrySet()
- .stream()
- .collect(
- Collectors.toMap(
- entry -> UrnUtils.getUrn(entry.getKey().getUrn()),
- entry -> entry.getValue().getEntity())));
- } catch (RemoteInvocationException e) {
- throw new RuntimeException(e);
- }
- });
+ try {
+      Iterable<List<VersionedUrn>> iterable =
+          () -> Iterators.partition(versionedUrns.iterator(), batchGetV2Size);
+      List<Future<Map<Urn, EntityResponse>>> futures =
+ StreamSupport.stream(iterable.spliterator(), false)
+ .map(
+ batch ->
+ executor.submit(
+ () -> {
+ try {
+ log.debug(
+ "Executing batchGetVersionedV2 with batch size: {}",
+ batch.size());
+ final EntitiesVersionedV2BatchGetRequestBuilder requestBuilder =
+ ENTITIES_VERSIONED_V2_REQUEST_BUILDERS
+ .batchGet()
+ .aspectsParam(aspectNames)
+ .entityTypeParam(entityName)
+ .ids(
+ batch.stream()
+ .map(
+ versionedUrn ->
+ com.linkedin.common.urn.VersionedUrn.of(
+ versionedUrn.getUrn().toString(),
+ versionedUrn.getVersionStamp()))
+ .collect(Collectors.toSet()));
+
+ return sendClientRequest(
+ requestBuilder, opContext.getSessionAuthentication())
+ .getEntity()
+ .getResults()
+ .entrySet()
+ .stream()
+ .collect(
+ Collectors.toMap(
+ entry -> UrnUtils.getUrn(entry.getKey().getUrn()),
+ entry -> entry.getValue().getEntity()));
+ } catch (RemoteInvocationException e) {
+ throw new RuntimeException(e);
+ }
+ }))
+ .collect(Collectors.toList());
+
+ futures.forEach(
+ result -> {
+ try {
+ responseMap.putAll(result.get());
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } finally {
+ executor.shutdown();
+ }
return responseMap;
}
@@ -955,7 +998,6 @@ public VersionedAspect getAspectOrNull(
* @param startTimeMillis the earliest desired event time of the aspect value in milliseconds.
* @param endTimeMillis the latest desired event time of the aspect value in milliseconds.
* @param limit the maximum number of desired aspect values.
- * @param authentication the actor associated with the request [internal]
* @return the list of EnvelopedAspect values satisfying the input parameters.
* @throws RemoteInvocationException on remote request error.
*/
diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java
index 364ee9b0519d2..7546d1f0a3b54 100644
--- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java
+++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/SystemRestliEntityClient.java
@@ -27,8 +27,9 @@ public SystemRestliEntityClient(
@Nonnull final BackoffPolicy backoffPolicy,
int retryCount,
EntityClientCacheConfig cacheConfig,
- int batchGetV2Size) {
- super(restliClient, backoffPolicy, retryCount, batchGetV2Size);
+ int batchGetV2Size,
+ int batchGetV2Concurrency) {
+ super(restliClient, backoffPolicy, retryCount, batchGetV2Size, batchGetV2Concurrency);
this.operationContextMap = CacheBuilder.newBuilder().maximumSize(500).build();
this.entityClientCache = buildEntityClientCache(SystemRestliEntityClient.class, cacheConfig);
}
diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java
index 474bb24f9e16b..797ead10c1a66 100644
--- a/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java
+++ b/metadata-service/restli-client/src/test/java/com/linkedin/common/client/BaseClientTest.java
@@ -37,7 +37,7 @@ public void testZeroRetry() throws RemoteInvocationException {
when(mockRestliClient.sendRequest(any(ActionRequest.class))).thenReturn(mockFuture);
RestliEntityClient testClient =
- new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0, 10);
+ new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 0, 10, 2);
testClient.sendClientRequest(testRequestBuilder, AUTH);
// Expected 1 actual try and 0 retries
verify(mockRestliClient).sendRequest(any(ActionRequest.class));
@@ -56,7 +56,7 @@ public void testMultipleRetries() throws RemoteInvocationException {
.thenReturn(mockFuture);
RestliEntityClient testClient =
- new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10);
+ new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10, 2);
testClient.sendClientRequest(testRequestBuilder, AUTH);
// Expected 1 actual try and 1 retries
verify(mockRestliClient, times(2)).sendRequest(any(ActionRequest.class));
@@ -73,7 +73,7 @@ public void testNonRetry() {
.thenThrow(new RuntimeException(new RequiredFieldNotPresentException("value")));
RestliEntityClient testClient =
- new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10);
+ new RestliEntityClient(mockRestliClient, new ExponentialBackoff(1), 1, 10, 2);
assertThrows(
RuntimeException.class, () -> testClient.sendClientRequest(testRequestBuilder, AUTH));
}
diff --git a/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java b/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java
index 75614ca998f6a..e6d53fc98e2e3 100644
--- a/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java
+++ b/metadata-service/restli-client/src/test/java/com/linkedin/entity/client/SystemRestliEntityClientTest.java
@@ -45,7 +45,8 @@ public void testCache() throws RemoteInvocationException, URISyntaxException {
noCacheConfig.setEnabled(true);
SystemRestliEntityClient noCacheTest =
- new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1);
+ new SystemRestliEntityClient(
+ mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2);
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
@@ -83,7 +84,8 @@ public void testCache() throws RemoteInvocationException, URISyntaxException {
Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60)));
SystemRestliEntityClient cacheTest =
- new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1);
+ new SystemRestliEntityClient(
+ mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2);
mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(
@@ -117,7 +119,8 @@ public void testBatchCache() throws RemoteInvocationException, URISyntaxExceptio
noCacheConfig.setEnabled(true);
SystemRestliEntityClient noCacheTest =
- new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1);
+ new SystemRestliEntityClient(
+ mockRestliClient, new ConstantBackoff(0), 0, noCacheConfig, 1, 2);
com.linkedin.entity.EntityResponse responseStatusTrue = buildStatusResponse(true);
com.linkedin.entity.EntityResponse responseStatusFalse = buildStatusResponse(false);
@@ -155,7 +158,8 @@ public void testBatchCache() throws RemoteInvocationException, URISyntaxExceptio
Map.of(TEST_URN.getEntityType(), Map.of(Constants.STATUS_ASPECT_NAME, 60)));
SystemRestliEntityClient cacheTest =
- new SystemRestliEntityClient(mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1);
+ new SystemRestliEntityClient(
+ mockRestliClient, new ConstantBackoff(0), 0, cacheConfig, 1, 2);
mockResponse(mockRestliClient, responseStatusTrue);
assertEquals(
diff --git a/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java b/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java
index 3f9022b634c67..dc533e4aa5de5 100644
--- a/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java
+++ b/metadata-service/services/src/main/java/com/linkedin/metadata/service/BaseService.java
@@ -19,6 +19,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
@@ -61,8 +62,9 @@ protected Map getTagsAspects(
return finalResult;
} catch (Exception e) {
log.error(
- "Error retrieving global tags for entities. Entities: {} aspect: {}",
- entityUrns,
+ "Error retrieving global tags for {} entities. Sample Urns: {} aspect: {}",
+ entityUrns.size(),
+ entityUrns.stream().limit(10).collect(Collectors.toList()),
Constants.GLOSSARY_TERMS_ASPECT_NAME,
e);
return Collections.emptyMap();
From eda1db081b972b705737811242fb530fdd233d2f Mon Sep 17 00:00:00 2001
From: John Joyce
Date: Thu, 6 Jun 2024 14:05:44 -0700
Subject: [PATCH 2/4] docs(): Adding API docs for incidents, operations, and
assertions (#10522)
Co-authored-by: John Joyce
---
docs-website/sidebars.js | 7 +-
docs/api/tutorials/assertions.md | 1181 +++++++++++++++++
docs/api/tutorials/data-contracts.md | 217 +++
docs/api/tutorials/incidents.md | 164 +++
docs/api/tutorials/operations.md | 136 ++
.../library/dataset_read_operations.py | 19 +
.../library/dataset_report_operation.py | 19 +
.../examples/library/delete_assertion.py | 18 +
.../examples/library/run_assertion.py | 20 +
.../examples/library/run_assertions.py | 37 +
.../library/run_assertions_for_asset.py | 38 +
11 files changed, 1855 insertions(+), 1 deletion(-)
create mode 100644 docs/api/tutorials/assertions.md
create mode 100644 docs/api/tutorials/data-contracts.md
create mode 100644 docs/api/tutorials/incidents.md
create mode 100644 docs/api/tutorials/operations.md
create mode 100644 metadata-ingestion/examples/library/dataset_read_operations.py
create mode 100644 metadata-ingestion/examples/library/dataset_report_operation.py
create mode 100644 metadata-ingestion/examples/library/delete_assertion.py
create mode 100644 metadata-ingestion/examples/library/run_assertion.py
create mode 100644 metadata-ingestion/examples/library/run_assertions.py
create mode 100644 metadata-ingestion/examples/library/run_assertions_for_asset.py
diff --git a/docs-website/sidebars.js b/docs-website/sidebars.js
index 1f2208bc17978..2eb600eff74e8 100644
--- a/docs-website/sidebars.js
+++ b/docs-website/sidebars.js
@@ -754,7 +754,7 @@ module.exports = {
},
{
type: "category",
- label: "Datahub Actions",
+ label: "DataHub Actions",
link: { type: "doc", id: "docs/act-on-metadata" },
items: [
"docs/actions/README",
@@ -800,6 +800,11 @@ module.exports = {
"docs/api/tutorials/datasets",
"docs/api/tutorials/deprecation",
"docs/api/tutorials/descriptions",
+ "docs/api/tutorials/custom-properties",
+ "docs/api/tutorials/assertions",
+ "docs/api/tutorials/incidents",
+ "docs/api/tutorials/operations",
+ "docs/api/tutorials/data-contracts",
"docs/api/tutorials/domains",
"docs/api/tutorials/forms",
"docs/api/tutorials/lineage",
diff --git a/docs/api/tutorials/assertions.md b/docs/api/tutorials/assertions.md
new file mode 100644
index 0000000000000..08832ee19ff89
--- /dev/null
+++ b/docs/api/tutorials/assertions.md
@@ -0,0 +1,1181 @@
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+# Assertions
+
+
+
+This guide specifically covers how to use the Assertion APIs for **Acryl Cloud** native assertions, including:
+
+- [Freshness Assertions](/docs/managed-datahub/observe/freshness-assertions.md)
+- [Volume Assertions](/docs/managed-datahub/observe/volume-assertions.md)
+- [Column Assertions](/docs/managed-datahub/observe/column-assertions.md)
+- [Schema Assertions](/docs/managed-datahub/observe/schema-assertions.md)
+- [Custom SQL Assertions](/docs/managed-datahub/observe/custom-sql-assertions.md)
+
+## Why Would You Use Assertions APIs?
+
+The Assertions APIs allow you to create, schedule, run, and delete Assertions with Acryl Cloud.
+
+### Goal Of This Guide
+
+This guide will show you how to create, schedule, run and delete Assertions for a Table.
+
+## Prerequisites
+
+The actor making API calls must have the `Edit Assertions` and `Edit Monitors` privileges for the Tables at hand.
+
+## Create Assertions
+
+You can create new dataset Assertions in DataHub using the following APIs.
+
+
+
+
+### Freshness Assertion
+
+To create a new freshness assertion, use the `upsertDatasetFreshnessAssertionMonitor` GraphQL Mutation.
+
+```graphql
+mutation upsertDatasetFreshnessAssertionMonitor {
+ upsertDatasetFreshnessAssertionMonitor(
+ input: {
+ entityUrn: "",
+ schedule: {
+ type: FIXED_INTERVAL,
+ fixedInterval: { unit: HOUR, multiple: 8 }
+ }
+ evaluationSchedule: {
+ timezone: "America/Los_Angeles",
+ cron: "0 */8 * * *"
+ }
+ evaluationParameters: {
+ sourceType: INFORMATION_SCHEMA
+ }
+ mode: ACTIVE
+ }
+ ) {
+ urn
+ }
+}
+```
+
+This API will return a unique identifier (URN) for the new assertion if you were successful:
+
+```json
+{
+ "data": {
+ "upsertDatasetFreshnessAssertionMonitor": {
+ "urn": "urn:li:assertion:your-new-assertion-id"
+ }
+ },
+ "extensions": {}
+}
+```
+
+For more details, see the [Freshness Assertions](/docs/managed-datahub/observe/freshness-assertions.md) guide.
+
+### Volume Assertions
+
+To create a new volume assertion, use the `upsertDatasetVolumeAssertionMonitor` GraphQL Mutation.
+
+```graphql
+mutation upsertDatasetVolumeAssertionMonitor {
+ upsertDatasetVolumeAssertionMonitor(
+ input: {
+ entityUrn: ""
+ type: ROW_COUNT_TOTAL
+ rowCountTotal: {
+ operator: BETWEEN
+ parameters: {
+ minValue: {
+ value: "10"
+ type: NUMBER
+ }
+ maxValue: {
+ value: "20"
+ type: NUMBER
+ }
+ }
+ }
+ evaluationSchedule: {
+ timezone: "America/Los_Angeles"
+ cron: "0 */8 * * *"
+ }
+ evaluationParameters: {
+ sourceType: INFORMATION_SCHEMA
+ }
+ mode: ACTIVE
+ }
+ ) {
+ urn
+ }
+}
+```
+
+This API will return a unique identifier (URN) for the new assertion if you were successful:
+
+```json
+{
+ "data": {
+ "upsertDatasetVolumeAssertionMonitor": {
+ "urn": "urn:li:assertion:your-new-assertion-id"
+ }
+ },
+ "extensions": {}
+}
+```
+
+For more details, see the [Volume Assertions](/docs/managed-datahub/observe/volume-assertions.md) guide.
+
+### Column Assertions
+
+To create a new column assertion, use the `upsertDatasetFieldAssertionMonitor` GraphQL Mutation.
+
+```graphql
+mutation upsertDatasetFieldAssertionMonitor {
+ upsertDatasetFieldAssertionMonitor(
+ input: {
+ entityUrn: ""
+ type: FIELD_VALUES,
+ fieldValuesAssertion: {
+ field: {
+ path: "",
+ type: "NUMBER",
+ nativeType: "NUMBER(38,0)"
+ },
+ operator: GREATER_THAN,
+ parameters: {
+ value: {
+ type: NUMBER,
+ value: "10"
+ }
+ },
+ failThreshold: {
+ type: COUNT,
+ value: 0
+ },
+ excludeNulls: true
+ }
+ evaluationSchedule: {
+ timezone: "America/Los_Angeles"
+ cron: "0 */8 * * *"
+ }
+ evaluationParameters: {
+ sourceType: ALL_ROWS_QUERY
+ }
+ mode: ACTIVE
+ }
+ ){
+ urn
+ }
+}
+```
+
+This API will return a unique identifier (URN) for the new assertion if you were successful:
+
+```json
+{
+ "data": {
+ "upsertDatasetFieldAssertionMonitor": {
+ "urn": "urn:li:assertion:your-new-assertion-id"
+ }
+ },
+ "extensions": {}
+}
+```
+
+For more details, see the [Column Assertions](/docs/managed-datahub/observe/column-assertions.md) guide.
+
+### Custom SQL Assertions
+
+To create a new Custom SQL assertion, use the `upsertDatasetSqlAssertionMonitor` GraphQL Mutation.
+
+```graphql
+mutation upsertDatasetSqlAssertionMonitor {
+ upsertDatasetSqlAssertionMonitor(
+ assertionUrn: ""
+ input: {
+ entityUrn: ""
+ type: METRIC,
+ description: "",
+ statement: "",
+ operator: GREATER_THAN_OR_EQUAL_TO,
+ parameters: {
+ value: {
+ value: "100",
+ type: NUMBER
+ }
+ }
+ evaluationSchedule: {
+ timezone: "America/Los_Angeles"
+ cron: "0 */6 * * *"
+ }
+ mode: ACTIVE
+ }
+ ) {
+ urn
+ }
+}
+```
+
+This API will return a unique identifier (URN) for the new assertion if you were successful:
+
+```json
+{
+ "data": {
+ "upsertDatasetSqlAssertionMonitor": {
+ "urn": "urn:li:assertion:your-new-assertion-id"
+ }
+ },
+ "extensions": {}
+}
+```
+
+For more details, see the [Custom SQL Assertions](/docs/managed-datahub/observe/custom-sql-assertions.md) guide.
+
+### Schema Assertions
+
+To create a new schema assertion, use the `upsertDatasetSchemaAssertionMonitor` GraphQL Mutation.
+
+```graphql
+mutation upsertDatasetSchemaAssertionMonitor {
+ upsertDatasetSchemaAssertionMonitor(
+ assertionUrn: "urn:li:assertion:existing-assertion-id",
+ input: {
+ entityUrn: "",
+ assertion: {
+ compatibility: EXACT_MATCH,
+ fields: [
+ {
+ path: "id",
+ type: STRING
+ },
+ {
+ path: "count",
+ type: NUMBER
+ },
+ {
+ path: "struct",
+ type: STRUCT
+ },
+ {
+ path: "struct.nestedBooleanField",
+ type: BOOLEAN
+ }
+ ]
+ },
+ description: "",
+ mode: ACTIVE
+ }
+ )
+}
+```
+
+This API will return a unique identifier (URN) for the new assertion if you were successful:
+
+```json
+{
+ "data": {
+ "upsertDatasetSchemaAssertionMonitor": {
+ "urn": "urn:li:assertion:your-new-assertion-id"
+ }
+ },
+ "extensions": {}
+}
+```
+
+For more details, see the [Schema Assertions](/docs/managed-datahub/observe/schema-assertions.md) guide.
+
+
+
+
+
+## Run Assertions
+
+You can use the following APIs to trigger the assertions you've created to run on-demand. This is
+particularly useful for running assertions on a custom schedule, for example from your production
+data pipelines.
+
+> **Long-Running Assertions**: The timeout for synchronously running an assertion is currently limited to a maximum of 30 seconds.
+> Each of the following APIs support an `async` parameter, which can be set to `true` to run the assertion asynchronously.
+> When set to `true`, the API will kick off the assertion run and return null immediately. To view the result of the assertion,
+> simply fetch the `runEvents` field of the `assertion(urn: String!)` GraphQL query.
+
+
+
+
+### Run Assertion
+
+```graphql
+mutation runAssertion {
+ runAssertion(urn: "urn:li:assertion:your-assertion-id", saveResult: true) {
+ type
+ nativeResults {
+ key
+ value
+ }
+ }
+}
+```
+
+Where **type** will contain the Result of the assertion run, either `SUCCESS`, `FAILURE`, or `ERROR`.
+
+The `saveResult` argument determines whether the result of the assertion will be saved to DataHub's backend,
+and available to view through the DataHub UI. If this is set to false, the result will NOT be stored in DataHub's
+backend. The value defaults to `true`.
+
+If the assertion is external (not natively executed by Acryl), this API will return an error.
+
+If running the assertion is successful, the result will be returned as follows:
+
+```json
+{
+ "data": {
+ "runAssertion": {
+ "type": "SUCCESS",
+ "nativeResults": [
+ {
+ "key": "Value",
+ "value": "1382"
+ }
+ ]
+ }
+ },
+ "extensions": {}
+}
+```
+
+### Run Group of Assertions
+
+```graphql
+mutation runAssertions {
+ runAssertions(urns: ["urn:li:assertion:your-assertion-id-1", "urn:li:assertion:your-assertion-id-2"], saveResults: true) {
+ passingCount
+ failingCount
+ errorCount
+ results {
+ urn
+ result {
+ type
+ nativeResults {
+ key
+ value
+ }
+ }
+ }
+ }
+}
+```
+
+Where **type** will contain the Result of the assertion run, either `SUCCESS`, `FAILURE`, or `ERROR`.
+
+The `saveResults` argument determines whether the result of the assertion will be saved to DataHub's backend,
+and available to view through the DataHub UI. If this is set to false, the result will NOT be stored in DataHub's
+backend. The value defaults to `true`.
+
+If any of the assertions are external (not natively executed by Acryl), they will simply be omitted from the result set.
+
+If running the assertions is successful, the results will be returned as follows:
+
+```json
+{
+ "data": {
+ "runAssertions": {
+ "passingCount": 2,
+ "failingCount": 0,
+ "errorCount": 0,
+ "results": [
+ {
+ "urn": "urn:li:assertion:your-assertion-id-1",
+ "result": {
+ "type": "SUCCESS",
+ "nativeResults": [
+ {
+ "key": "Value",
+ "value": "1382"
+ }
+ ]
+ }
+ },
+ {
+ "urn": "urn:li:assertion:your-assertion-id-2",
+ "result": {
+ "type": "FAILURE",
+ "nativeResults": [
+ {
+ "key": "Value",
+ "value": "12323"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ },
+ "extensions": {}
+}
+```
+
+Where you should see one result object for each assertion.
+
+### Run All Assertions for Table
+
+You can also run all assertions for a specific data asset using the `runAssertionsForAsset` mutation.
+
+```graphql
+mutation runAssertionsForAsset {
+ runAssertionsForAsset(urn: "urn:li:dataset:(urn:li:dataPlatform:snowflake,purchase_events,PROD)", saveResults: true) {
+ passingCount
+ failingCount
+ errorCount
+ results {
+ urn
+ result {
+ type
+ nativeResults {
+ key
+ value
+ }
+ }
+ }
+ }
+}
+```
+
+Where `type` will contain the Result of the assertion run, either `SUCCESS`, `FAILURE`, or `ERROR`.
+
+The `saveResults` argument determines whether the result of the assertion will be saved to DataHub's backend,
+and available to view through the DataHub UI. If this is set to false, the result will NOT be stored in DataHub's
+backend. The value defaults to `true`.
+
+If any of the assertions are external (not natively executed by Acryl), they will simply be omitted from the result
+set.
+
+If running the assertions is successful, the results will be returned as follows:
+
+```json
+{
+ "data": {
+ "runAssertionsForAsset": {
+ "passingCount": 2,
+ "failingCount": 0,
+ "errorCount": 0,
+ "results": [
+ {
+ "urn": "urn:li:assertion:your-assertion-id-1",
+ "result": {
+ "type": "SUCCESS",
+ "nativeResults": [
+ {
+ "key": "Value",
+ "value": "1382"
+ }
+ ]
+ }
+ },
+ {
+ "urn": "urn:li:assertion:your-assertion-id-2",
+ "result": {
+ "type": "FAILURE",
+ "nativeResults": [
+ {
+ "key": "Value",
+ "value": "12323"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ },
+ "extensions": {}
+}
+```
+
+Where you should see one result object for each assertion.
+
+### Run Group of Assertions for Table
+
+If you don't always want to run _all_ assertions for a given table, you can also opt to run a subset of the
+table's assertions using *Assertion Tags*. First, you'll add tags to your assertions to group and categorize them,
+then you'll call the `runAssertionsForAsset` mutation with the `tagUrns` argument to filter for assertions having those tags.
+
+#### Step 1: Adding Tag to an Assertion
+
+Currently, you can add tags to an assertion only via the DataHub GraphQL API. You can do this using the following mutation:
+
+```graphql
+mutation addTags {
+ addTag(input: {
+ resourceUrn: "urn:li:assertion:your-assertion",
+ tagUrn: "urn:li:tag:my-important-tag",
+ })
+}
+```
+
+#### Step 2: Run All Assertions for a Table with Tags
+
+Now, you can run all of a table's assertions that have a specific tag (or tags) using the `runAssertionsForAsset` mutation with the
+`tagUrns` input parameter:
+
+```graphql
+mutation runAssertionsForAsset {
+ runAssertionsForAsset(urn: "urn:li:dataset:(urn:li:dataPlatform:snowflake,purchase_events,PROD)", tagUrns: ["urn:li:tag:my-important-tag"]) {
+ passingCount
+ failingCount
+ errorCount
+ results {
+ urn
+ result {
+ type
+ nativeResults {
+ key
+ value
+ }
+ }
+ }
+ }
+}
+```
+
+**Coming Soon**: Support for adding tags to assertions through the DataHub UI.
+
+
+
+
+
+### Run Assertion
+
+```python
+{{ inline /metadata-ingestion/examples/library/run_assertion.py show_path_as_comment }}
+```
+
+### Run Group of Assertions
+
+```python
+{{ inline /metadata-ingestion/examples/library/run_assertions.py show_path_as_comment }}
+```
+
+### Run All Assertions for Table
+
+```python
+{{ inline /metadata-ingestion/examples/library/run_assertions_for_asset.py show_path_as_comment }}
+```
+
+
+
+
+
+### Experimental: Providing Dynamic Parameters to Assertions
+
+You can provide **dynamic parameters** to your assertions to customize their behavior. This is particularly useful for
+assertions that require dynamic parameters, such as a threshold value that changes based on the time of day.
+
+Dynamic parameters can be injected into the SQL fragment portion of any Assertion. For example, it can appear
+in any part of the SQL statement in a [Custom SQL](/docs/managed-datahub/observe/custom-sql-assertions.md) Assertion,
+or it can appear in the **Advanced > Filter** section of a [Column](/docs/managed-datahub/observe/column-assertions.md),
+[Volume](/docs/managed-datahub/observe/volume-assertions.md), or [Freshness](/docs/managed-datahub/observe/freshness-assertions.md) Assertion.
+
+To do so, you'll first need to edit the SQL fragment to include the dynamic parameter. Dynamic parameters appear
+as `${parameterName}` in the SQL fragment.
+
+Next, you'll call the `runAssertion`, `runAssertions`, or `runAssertionsForAsset` mutations with the `parameters` input argument.
+This argument is a list of key-value tuples, where the key is the parameter name and the value is the parameter value:
+
+```graphql
+mutation runAssertion {
+ runAssertion(urn: "urn:li:assertion:your-assertion-id", parameters: [{key: "parameterName", value: "parameterValue"}]) {
+ type
+ nativeResults {
+ key
+ value
+ }
+ }
+}
+```
+
+At runtime, the `${parameterName}` placeholder in the SQL fragment will be replaced with the provided `parameterValue` before the query
+is sent to the database for execution.
+
+## Get Assertion Details
+
+You can use the following APIs to
+
+1. Fetch existing assertion definitions + run history
+2. Fetch the assertions associated with a given table + their run history.
+
+
+
+
+### Get Assertions for Table
+
+To retrieve all the assertions for a table, you can use the following GraphQL Query.
+
+```graphql
+query dataset {
+ dataset(urn: "urn:li:dataset:(urn:li:dataPlatform:snowflake,purchases,PROD)") {
+ assertions(start: 0, count: 1000) {
+ start
+ count
+ total
+ assertions {
+ # Fetch the last run of each associated assertion.
+ runEvents(status: COMPLETE, limit: 1) {
+ total
+ failed
+ succeeded
+ runEvents {
+ timestampMillis
+ status
+ result {
+ type
+ nativeResults {
+ key
+ value
+ }
+ }
+ }
+ }
+ info {
+ type
+ description
+ lastUpdated {
+ time
+ actor
+ }
+ datasetAssertion {
+ datasetUrn
+ scope
+ aggregation
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ fields {
+ urn
+ path
+ }
+ nativeType
+ nativeParameters {
+ key
+ value
+ }
+ logic
+ }
+ freshnessAssertion {
+ type
+ entityUrn
+ schedule {
+ type
+ cron {
+ cron
+ timezone
+ }
+ fixedInterval {
+ unit
+ multiple
+ }
+ }
+ filter {
+ type
+ sql
+ }
+ }
+ sqlAssertion {
+ type
+ entityUrn
+ statement
+ changeType
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ }
+ fieldAssertion {
+ type
+ entityUrn
+ filter {
+ type
+ sql
+ }
+ fieldValuesAssertion {
+ field {
+ path
+ type
+ nativeType
+ }
+ transform {
+ type
+ }
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ failThreshold {
+ type
+ value
+ }
+ excludeNulls
+ }
+ fieldMetricAssertion {
+ field {
+ path
+ type
+ nativeType
+ }
+ metric
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ }
+ }
+ volumeAssertion {
+ type
+ entityUrn
+ filter {
+ type
+ sql
+ }
+ rowCountTotal {
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ }
+ rowCountChange {
+ type
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ }
+ }
+ schemaAssertion {
+ entityUrn
+ compatibility
+ fields {
+ path
+ type
+ nativeType
+ }
+ schema {
+ fields {
+ fieldPath
+ type
+ nativeDataType
+ }
+ }
+ }
+ source {
+ type
+ created {
+ time
+ actor
+ }
+ }
+ }
+ }
+ }
+ }
+}
+```
+
+### Get Assertion Details
+
+You can use the following GraphQL query to fetch the details for an assertion along with its evaluation history by URN.
+
+```graphql
+query getAssertion {
+ assertion(urn: "urn:li:assertion:assertion-id") {
+ # Fetch the last 10 runs for the assertion.
+ runEvents(status: COMPLETE, limit: 10) {
+ total
+ failed
+ succeeded
+ runEvents {
+ timestampMillis
+ status
+ result {
+ type
+ nativeResults {
+ key
+ value
+ }
+ }
+ }
+ }
+ info {
+ type
+ description
+ lastUpdated {
+ time
+ actor
+ }
+ datasetAssertion {
+ datasetUrn
+ scope
+ aggregation
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ fields {
+ urn
+ path
+ }
+ nativeType
+ nativeParameters {
+ key
+ value
+ }
+ logic
+ }
+ freshnessAssertion {
+ type
+ entityUrn
+ schedule {
+ type
+ cron {
+ cron
+ timezone
+ }
+ fixedInterval {
+ unit
+ multiple
+ }
+ }
+ filter {
+ type
+ sql
+ }
+ }
+ sqlAssertion {
+ type
+ entityUrn
+ statement
+ changeType
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ }
+ fieldAssertion {
+ type
+ entityUrn
+ filter {
+ type
+ sql
+ }
+ fieldValuesAssertion {
+ field {
+ path
+ type
+ nativeType
+ }
+ transform {
+ type
+ }
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ failThreshold {
+ type
+ value
+ }
+ excludeNulls
+ }
+ fieldMetricAssertion {
+ field {
+ path
+ type
+ nativeType
+ }
+ metric
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ }
+ }
+ volumeAssertion {
+ type
+ entityUrn
+ filter {
+ type
+ sql
+ }
+ rowCountTotal {
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ }
+ rowCountChange {
+ type
+ operator
+ parameters {
+ value {
+ value
+ type
+ }
+ minValue {
+ value
+ type
+ }
+ maxValue {
+ value
+ type
+ }
+ }
+ }
+ }
+ schemaAssertion {
+ entityUrn
+ compatibility
+ fields {
+ path
+ type
+ nativeType
+ }
+ schema {
+ fields {
+ fieldPath
+ type
+ nativeDataType
+ }
+ }
+ }
+ source {
+ type
+ created {
+ time
+ actor
+ }
+ }
+ }
+ }
+}
+```
+
+
+
+
+
+```python
+Python support coming soon!
+```
+
+
+
+
+## Add Tag to Assertion
+
+You can add tags to individual assertions to group and categorize them, for example by their priority or severity.
+Note that the tag should already exist in DataHub, or the operation will fail.
+
+
+
+
+```graphql
+mutation addTags {
+ addTag(input: {
+ resourceUrn: "urn:li:assertion:your-assertion",
+ tagUrn: "urn:li:tag:my-important-tag",
+ })
+}
+```
+
+If you see the following response, the operation was successful:
+
+```json
+{
+ "data": {
+ "addTag": true
+ },
+ "extensions": {}
+}
+```
+
+You can create new tags using the `createTag` mutation or via the UI.
+
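+If you prefer to create the tag programmatically, here is a minimal Python sketch that issues the `createTag`
+mutation through the DataHub graph client. The server address, token, and tag id/name below are placeholders,
+and the exact `CreateTagInput` fields are an assumption.
+
+```python
+from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
+
+# Connect to DataHub (adjust server and token for your deployment).
+graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080", token="<your-access-token>"))
+
+# Issue the createTag mutation; on success it returns the new tag's URN.
+result = graph.execute_graphql(
+    query="""
+    mutation createTag($input: CreateTagInput!) {
+      createTag(input: $input)
+    }
+    """,
+    variables={"input": {"id": "my-important-tag", "name": "My Important Tag"}},
+)
+print(result["createTag"])
+```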
+
+
+
+## Delete Assertions
+
+You can delete Assertions from DataHub using the following APIs.
+
+
+
+
+```graphql
+mutation deleteAssertion {
+ deleteAssertion(urn: "urn:li:assertion:test")
+}
+```
+
+If you see the following response, the operation was successful:
+
+```json
+{
+ "data": {
+ "deleteAssertion": true
+ },
+ "extensions": {}
+}
+```
+
+
+
+
+
+```python
+{{ inline /metadata-ingestion/examples/library/delete_assertion.py show_path_as_comment }}
+```
+
+
+
+
+## (Advanced) Create and Report Results for Custom Assertions
+
+If you'd like to create and report results for your own custom assertions, e.g. those which are run and
+evaluated outside of Acryl, you need to generate 2 important Assertion Entity aspects, and give the assertion a unique
+URN of the following format:
+
+
+1. Generate a unique URN for your assertion
+
+```plaintext
+urn:li:assertion:<unique-assertion-id>
+```
+
+2. Generate the [**AssertionInfo**](/docs/generated/metamodel/entities/assertion.md#assertion-info) aspect for the assertion. You can do this using the Python SDK. Give your assertion a `type` and a `source`
+with type `EXTERNAL` to mark it as an external assertion, not run by DataHub itself.
+
+3. Generate the [**AssertionRunEvent**](/docs/generated/metamodel/entities/assertion.md#assertionrunevent-timeseries) timeseries aspect using the Python SDK. This aspect should contain the result of the assertion
+run at a given timestamp and will be shown on the results graph in DataHub's UI.
+
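+Below is a minimal Python sketch of steps 2 and 3, assuming the standard DataHub Python emitter and generated
+aspect classes; the URN, server address, and result values are placeholders to adapt to your use case.
+
+```python
+import time
+
+import datahub.metadata.schema_classes as models
+from datahub.emitter.mcp import MetadataChangeProposalWrapper
+from datahub.emitter.rest_emitter import DatahubRestEmitter
+
+emitter = DatahubRestEmitter(gms_server="http://localhost:8080", token="<your-access-token>")
+
+assertion_urn = "urn:li:assertion:my-custom-assertion-id"
+dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"
+
+# Step 2: AssertionInfo - describe the assertion and mark its source as EXTERNAL (not run by DataHub).
+assertion_info = models.AssertionInfoClass(
+    type=models.AssertionTypeClass.DATASET,
+    source=models.AssertionSourceClass(type=models.AssertionSourceTypeClass.EXTERNAL),
+    description="Row count matches the upstream source system",
+)
+emitter.emit_mcp(MetadataChangeProposalWrapper(entityUrn=assertion_urn, aspect=assertion_info))
+
+# Step 3: AssertionRunEvent - report the result of one assertion run against the dataset.
+run_event = models.AssertionRunEventClass(
+    timestampMillis=int(time.time() * 1000),
+    runId="2024-06-06T00:00:00Z",
+    assertionUrn=assertion_urn,
+    asserteeUrn=dataset_urn,
+    status=models.AssertionRunStatusClass.COMPLETE,
+    result=models.AssertionResultClass(type=models.AssertionResultTypeClass.SUCCESS),
+)
+emitter.emit_mcp(MetadataChangeProposalWrapper(entityUrn=assertion_urn, aspect=run_event))
+```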
diff --git a/docs/api/tutorials/data-contracts.md b/docs/api/tutorials/data-contracts.md
new file mode 100644
index 0000000000000..ac19920a5c4b7
--- /dev/null
+++ b/docs/api/tutorials/data-contracts.md
@@ -0,0 +1,217 @@
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+# Data Contracts
+
+
+
+This guide specifically covers how to use the Data Contract APIs with **Acryl Cloud**.
+
+## Why Would You Use Data Contract APIs?
+
+The Data Contract APIs allow you to create, update, and evaluate Data Contracts programmatically. This is particularly
+useful for automating the monitoring of data quality and schema compliance for your data.
+
+### Goal Of This Guide
+
+This guide will show you how to create, update, and check the status of a Data Contract.
+
+## Prerequisites
+
+### Privileges Required
+
+The actor making API calls must have the `Edit Data Contract` privileges for the Tables at hand.
+
+### Assertions
+
+Before creating a Data Contract, you should have already created the Assertions that you want to associate with the Data Contract.
+Check out the [Assertions](/docs/api/tutorials/assertions.md) guide for details on how to create DataHub Assertions.
+
+## Create & Update Data Contract
+
+You can create a new Data Contract, which is simply a bundle of "important" assertions, using the following APIs.
+
+
+
+
+To create or update a Data Contract, simply use the `upsertDataContract` GraphQL Mutation.
+
+```graphql
+mutation upsertDataContract {
+ upsertDataContract(
+ input: {
+ entityUrn: "urn:li:dataset:(urn:li:dataPlatform:snowflake,purchases,PROD)", # Table to Create Contract for
+ freshness: [
+ {
+ assertionUrn: "urn:li:assertion:your-freshness-assertion-id",
+ }
+ ],
+ schema: [
+ {
+ assertionUrn: "urn:li:assertion:your-schema-assertion-id",
+ }
+ ],
+ dataQuality: [
+ {
+ assertionUrn: "urn:li:assertion:your-column-assertion-id-1",
+ },
+ {
+ assertionUrn: "urn:li:assertion:your-column-assertion-id-2",
+ }
+ ]
+    }
+  ) {
+    urn
+  }
+}
+```
+
+This API will return a unique identifier (URN) for the Data Contract if you were successful:
+
+```json
+{
+ "data": {
+ "upsertDataContract": {
+ "urn": "urn:li:dataContract:your-new-contract-id"
+ }
+ },
+ "extensions": {}
+}
+```
+
+If you want to update an existing Data Contract, you can use the same API, but also pass the `urn` parameter in the
+`upsertDataContract` mutation.
+
+```graphql
+mutation upsertDataContract {
+ upsertDataContract(
+ urn: "urn:li:dataContract:your-existing-contract-id",
+ input: {
+ freshness: [
+ {
+ assertionUrn: "urn:li:assertion:your-freshness-assertion-id",
+ }
+ ],
+ schema: [
+ {
+ assertionUrn: "urn:li:assertion:your-schema-assertion-id",
+ }
+ ],
+ dataQuality: [
+ {
+ assertionUrn: "urn:li:assertion:your-column-assertion-id-1",
+ },
+ {
+ assertionUrn: "urn:li:assertion:your-column-assertion-id-2",
+ }
+ ]
+    }
+  ) {
+    urn
+  }
+}
+```
+
+
+
+
+## Check Contract Status
+
+You can use the following APIs to check whether a Data Contract is passing or failing, which is determined
+by the last status of the assertions associated with the contract.
+
+
+
+
+
+### Check Contract Status for Table
+
+```graphql
+query getTableContractStatus {
+ dataset(urn: "urn:li:dataset(urn:li:dataPlatform:snowflake,purchases,PROD") {
+ contract {
+ result {
+ type # Passing or Failing.
+ assertionResults { # Results of each contract assertion.
+ assertion {
+ urn
+ }
+ result {
+ type
+ nativeResults {
+ key
+ value
+ }
+ }
+ }
+ }
+ }
+ }
+}
+```
+
+You can also _force refresh_ all of the Contract Assertions by evaluating them on-demand by providing the `refresh` argument
+in your query.
+
+```graphql
+query getTableContractStatus {
+ dataset(urn: "urn:li:dataset(urn:li:dataPlatform:snowflake,purchases,PROD") {
+ contract(refresh: true) {
+ ... same
+ }
+ }
+}
+```
+
+This will run any native Acryl assertions comprising the Data Contract. Be careful! This can take a while depending on how many native assertions are part of the contract.
+
+If you're successful, you'll get the latest status for the Table Contract:
+
+```json
+{
+ "data": {
+ "dataset": {
+ "contract": {
+ "result": {
+ "type": "PASSING",
+ "assertionResults": [
+ {
+ "assertion": {
+ "urn": "urn:li:assertion:your-freshness-assertion-id"
+ },
+ "result": {
+ "type": "SUCCESS",
+ "nativeResults": [
+ {
+ "key": "Value",
+ "value": "1382"
+ }
+ ]
+ }
+ },
+ {
+ "assertion": {
+ "urn": "urn:li:assertion:your-volume-assertion-id"
+ },
+ "result": {
+ "type": "SUCCESS",
+ "nativeResults": [
+ {
+ "key": "Value",
+ "value": "12323"
+ }
+ ]
+ }
+ }
+ ]
+ }
+ }
+ }
+ },
+ "extensions": {}
+}
+```
+
+
+
+
diff --git a/docs/api/tutorials/incidents.md b/docs/api/tutorials/incidents.md
new file mode 100644
index 0000000000000..20a24d58a1db4
--- /dev/null
+++ b/docs/api/tutorials/incidents.md
@@ -0,0 +1,164 @@
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+# Incidents
+
+## Why Would You Use Incidents APIs?
+
+The Incidents APIs allow you to raise, retrieve, update and resolve data incidents via API. This is
+useful for raising or resolving data incidents programmatically, for example from Airflow, Prefect, or Dagster DAGs.
+Incidents are also useful for conditional Circuit Breaking in these pipelines.
+
+### Goal Of This Guide
+
+This guide will show you how to raise, retrieve, update and resolve data incidents via API.
+
+## Prerequisites
+
+The actor making API calls must have the `Edit Incidents` privileges for the Tables at hand.
+
+## Raise Incident
+
+You can raise a new Data Incident for an existing asset using the following APIs.
+
+
+
+
+```graphql
+mutation raiseIncident {
+ raiseIncident(
+ input: {
+ resourceUrn: "urn:li:dataset:(urn:li:dataPlatform:snowflake,public.prod.purchases,PROD)",
+ type: OPERATIONAL,
+ title: "Data is Delayed",
+ description: "Data is delayed on May 15, 2024 because of downtime in the Spark Cluster.",
+ }
+ )
+}
+```
+
+Where `resourceUrn` is the unique identifier for the data asset (dataset, dashboard, chart, data job, or data flow) you want to raise the incident on.
+
+Where supported Incident Types include
+
+- `OPERATIONAL`
+- `FRESHNESS`
+- `VOLUME`
+- `COLUMN`
+- `SQL`
+- `DATA_SCHEMA`
+- `CUSTOM`
+
+If the request is successful, a unique identifier for the new incident will be returned:
+
+```json
+{
+ "data": {
+ "raiseIncident": "urn:li:incident:new-incident-id"
+ },
+ "extensions": {}
+}
+```
+
+
+
+
+
+```
+Python SDK support coming soon!
+```
+
+
+
+
+
+## Get Incidents For Data Asset
+
+You can retrieve the incidents and their statuses for a given Data Asset using the following APIs.
+
+
+
+
+```graphql
+query getAssetIncidents {
+ dataset(urn: "urn:li:dataset:(urn:li:dataPlatform:snowflake,public.prod.purchases,PROD)") {
+ incidents(
+ state: ACTIVE, start: 0, count: 20
+ ) {
+ start
+ count
+ total
+ incidents {
+ urn
+ incidentType
+ title
+ description
+ status {
+ state
+ lastUpdated {
+ time
+ actor
+ }
+ }
+ }
+ }
+ }
+}
+```
+
+Where you can filter for active incidents by passing the `ACTIVE` state and resolved incidents by passing the `RESOLVED` state.
+This will return all relevant incidents for the dataset.
+
+
+
+
+
+```
+Python SDK support coming soon!
+```
+
+
+
+
+
+## Resolve Incidents
+
+You can update the status of an incident using the following APIs.
+
+
+
+
+```graphql
+mutation updateIncidentStatus {
+ updateIncidentStatus(
+    urn: "urn:li:incident:your-incident-id",
+    input: {
+ state: RESOLVED,
+ message: "The delayed data issue was resolved at 4:55pm on May 15."
+ }
+ )
+}
+```
+
+You can also reopen an incident by updating the state from `RESOLVED` to `ACTIVE`.
+
+If you see the following response, the operation was successful:
+
+```json
+{
+ "data": {
+ "updateIncidentStatus": true
+ },
+ "extensions": {}
+}
+```
+
+
+
+
+
+```
+Python SDK support coming soon!
+```
+
+
+
\ No newline at end of file
diff --git a/docs/api/tutorials/operations.md b/docs/api/tutorials/operations.md
new file mode 100644
index 0000000000000..70ede993ec95f
--- /dev/null
+++ b/docs/api/tutorials/operations.md
@@ -0,0 +1,136 @@
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+# Operations
+
+## Why Would You Use Operations APIs?
+
+The Operations APIs allow you to report operational changes that were made to a given Dataset or Table using the 'Operation' concept.
+These operations may be viewed on the Dataset Profile (e.g. as last modified time), accessed via the DataHub GraphQL API, or
+used as inputs to Acryl Cloud [Freshness Assertions](/docs/managed-datahub/observe/freshness-assertions.md).
+
+### Goal Of This Guide
+
+This guide will show you how to report and query Operations for a Dataset.
+
+## Prerequisites
+
+For this tutorial, you need to deploy DataHub Quickstart and ingest sample data.
+For detailed steps, please refer to [DataHub Quickstart Guide](/docs/quickstart.md).
+
+:::note
+Before reporting operations for a dataset, you need to ensure the targeted dataset is already present in DataHub.
+:::
+
+## Report Operations
+
+You can report dataset operations to DataHub using the following APIs.
+
+
+
+
+```graphql
+mutation reportOperation {
+ reportOperation(
+ input: {
+ urn: "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
+ operationType: INSERT,
+ sourceType: DATA_PROCESS
+ }
+ )
+}
+```
+
+Where supported operation types include
+
+- `INSERT`
+- `UPDATE`
+- `DELETE`
+- `CREATE`
+- `ALTER`
+- `DROP`
+- `CUSTOM`
+
+If you want to report an operation that happened at a specific time, you can also optionally provide
+the `timestampMillis` field. If not provided, the current server time will be used as the operation time.
+
+If you see the following response, the operation was successful:
+
+```json
+{
+ "data": {
+ "reportOperation": true
+ },
+ "extensions": {}
+}
+```
+
+
+
+
+
+```python
+{{ inline /metadata-ingestion/examples/library/dataset_report_operation.py show_path_as_comment }}
+```
+
+
+
+
+## Read Operations
+
+You can read dataset operations from DataHub using the following APIs.
+
+
+
+
+```graphql
+query dataset {
+ dataset(urn: "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)") {
+ operations(
+      limit: 10, filter: [], startTimeMillis: <start timestamp in millis>, endTimeMillis: <end timestamp in millis>
+ ) {
+ timestampMillis
+ operationType
+ sourceType
+ }
+ }
+}
+```
+
+Where startTimeMillis and endTimeMillis are optional. By default, operations are sorted by time descending.
+
+If you see the following response, the operation was successful:
+
+```json
+{
+ "data": {
+ "dataset": {
+ "operations": [
+ {
+ "timestampMillis": 1231232332,
+ "operationType": "INSERT",
+ "sourceType": "DATA_PROCESS"
+ }
+ ]
+ }
+ },
+ "extensions": {}
+}
+```
+
+
+
+
+
+```python
+{{ inline /metadata-ingestion/examples/library/dataset_read_operations.py show_path_as_comment }}
+```
+
+
+
+
+### Expected Outcomes of Reporting Operations
+
+Reported Operations will appear when displaying the Last Updated time for a Dataset on its DataHub Profile.
+They will also be used when selecting the `DataHub Operation` source type under the **Advanced** settings of a Freshness
+Assertion.
\ No newline at end of file
diff --git a/metadata-ingestion/examples/library/dataset_read_operations.py b/metadata-ingestion/examples/library/dataset_read_operations.py
new file mode 100644
index 0000000000000..78c9a92141cef
--- /dev/null
+++ b/metadata-ingestion/examples/library/dataset_read_operations.py
@@ -0,0 +1,19 @@
+from datahub.api.graphql import Operation
+
+DATAHUB_HOST = "https//:org.acryl.io/gms"
+DATAHUB_TOKEN = ",
+ # end_time_millis=
+)
diff --git a/metadata-ingestion/examples/library/dataset_report_operation.py b/metadata-ingestion/examples/library/dataset_report_operation.py
new file mode 100644
index 0000000000000..15ebc43dba60a
--- /dev/null
+++ b/metadata-ingestion/examples/library/dataset_report_operation.py
@@ -0,0 +1,19 @@
+from datahub.api.graphql import Operation
+
+DATAHUB_HOST = "https//:org.acryl.io/gms"
+DATAHUB_TOKEN = "
Date: Thu, 6 Jun 2024 16:11:45 -0500
Subject: [PATCH 3/4] feat(ci): fix conditionals and consolidate change
detection (#10649)
---
.github/actions/ci-optimization/action.yml | 5 ++
.github/workflows/docker-unified.yml | 95 +++++++---------------
2 files changed, 36 insertions(+), 64 deletions(-)
diff --git a/.github/actions/ci-optimization/action.yml b/.github/actions/ci-optimization/action.yml
index 0dcbdcecf34ad..2f677a0e552c2 100644
--- a/.github/actions/ci-optimization/action.yml
+++ b/.github/actions/ci-optimization/action.yml
@@ -17,6 +17,9 @@ outputs:
ingestion-change:
description: "Ingestion code has changed"
value: ${{ steps.filter.outputs.ingestion == 'true' }}
+ ingestion-base-change:
+ description: "Ingestion base image docker image has changed"
+ value: ${{ steps.filter.outputs.ingestion-base == 'true' }}
frontend-change:
description: "Frontend code has changed"
value: ${{ steps.filter.outputs.frontend == 'true' }}
@@ -56,6 +59,8 @@ runs:
- "metadata-models/**"
- "smoke-test/**"
- "docker/datahub-ingestion**"
+ ingestion-base:
+ - "docker/datahub-ingestion-base/**"
docker:
- "docker/**"
backend:
diff --git a/.github/workflows/docker-unified.yml b/.github/workflows/docker-unified.yml
index fe50ad67c6492..53ecb762912b7 100644
--- a/.github/workflows/docker-unified.yml
+++ b/.github/workflows/docker-unified.yml
@@ -52,6 +52,7 @@ jobs:
repository_name: ${{ steps.tag.outputs.repository_name }}
frontend_change: ${{ steps.ci-optimize.outputs.frontend-change == 'true' }}
ingestion_change: ${{ steps.ci-optimize.outputs.ingestion-change == 'true' }}
+ ingestion_base_change: ${{ steps.ci-optimize.outputs.ingestion-base-change == 'true' }}
backend_change: ${{ steps.ci-optimize.outputs.backend-change == 'true' }}
frontend_only: ${{ steps.ci-optimize.outputs.frontend-only == 'true' }}
ingestion_only: ${{ steps.ci-optimize.outputs.ingestion-only == 'true' }}
@@ -528,14 +529,8 @@ jobs:
steps:
- name: Check out the repo
uses: acryldata/sane-checkout-action@v3
- - uses: dorny/paths-filter@v2
- id: filter
- with:
- filters: |
- datahub-ingestion-base:
- - 'docker/datahub-ingestion-base/**'
- name: Build and push Base Image
- if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }}
+ if: ${{ needs.setup.outputs.ingestion_base_change == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
with:
target: base
@@ -550,7 +545,7 @@ jobs:
platforms: linux/amd64,linux/arm64/v8
- name: Compute DataHub Ingestion (Base) Tag
id: tag
- run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }}" >> $GITHUB_OUTPUT
datahub_ingestion_base_slim_build:
name: Build and Push DataHub Ingestion (Base-Slim) Docker Image
runs-on: ubuntu-latest
@@ -561,25 +556,19 @@ jobs:
steps:
- name: Check out the repo
uses: acryldata/sane-checkout-action@v3
- - uses: dorny/paths-filter@v2
- id: filter
- with:
- filters: |
- datahub-ingestion-base:
- - 'docker/datahub-ingestion-base/**'
- name: Download Base Image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
+ if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && needs.setup.outputs.ingestion_base_change == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
+ image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }}
- name: Login to DockerHub
uses: docker/login-action@v3
- if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }}
+ if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && needs.setup.outputs.ingestion_base_change == 'false' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- name: Build and push Base-Slim Image
- if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }}
+ if: ${{ needs.setup.outputs.ingestion_base_change == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
with:
target: slim-install
@@ -590,14 +579,14 @@ jobs:
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
build-args: |
APP_ENV=slim
- BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
+ BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }}
publish: ${{ needs.setup.outputs.publish == 'true' || needs.setup.outputs.pr-publish == 'true' }}
context: .
file: ./docker/datahub-ingestion-base/Dockerfile
platforms: linux/amd64,linux/arm64/v8
- name: Compute DataHub Ingestion (Base-Slim) Tag
id: tag
- run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}" >> $GITHUB_OUTPUT
datahub_ingestion_base_full_build:
name: Build and Push DataHub Ingestion (Base-Full) Docker Image
runs-on: ubuntu-latest
@@ -608,25 +597,19 @@ jobs:
steps:
- name: Check out the repo
uses: acryldata/sane-checkout-action@v3
- - uses: dorny/paths-filter@v2
- id: filter
- with:
- filters: |
- datahub-ingestion-base:
- - 'docker/datahub-ingestion-base/**'
- name: Download Base Image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
+ if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && needs.setup.outputs.ingestion_base_change == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
+ image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }}
- name: Login to DockerHub
uses: docker/login-action@v3
- if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }}
+ if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && needs.setup.outputs.ingestion_base_change == 'false' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- name: Build and push (Base-Full) Image
- if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' }}
+ if: ${{ needs.setup.outputs.ingestion_base_change == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
with:
target: full-install
@@ -637,21 +620,21 @@ jobs:
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
build-args: |
APP_ENV=full
- BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
+ BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }}
publish: ${{ needs.setup.outputs.publish == 'true' || needs.setup.outputs.pr-publish == 'true' }}
context: .
file: ./docker/datahub-ingestion-base/Dockerfile
platforms: linux/amd64,linux/arm64/v8
- name: Compute DataHub Ingestion (Base-Full) Tag
id: tag
- run: echo "tag=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_full_tag || 'head' }}" >> $GITHUB_OUTPUT
datahub_ingestion_slim_build:
name: Build and Push DataHub Ingestion Docker Images
runs-on: ubuntu-latest
outputs:
tag: ${{ steps.tag.outputs.tag }}
- needs_artifact_download: ${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true') }}
+ needs_artifact_download: ${{ needs.setup.outputs.ingestion_change == 'true' && ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true') }}
needs: [setup, datahub_ingestion_base_slim_build]
if: ${{ needs.setup.outputs.ingestion_change == 'true' || needs.setup.outputs.publish == 'true' }}
steps:
@@ -663,30 +646,22 @@ jobs:
- uses: gradle/gradle-build-action@v2
- name: Check out the repo
uses: acryldata/sane-checkout-action@v3
- - uses: dorny/paths-filter@v2
- id: filter
- with:
- filters: |
- datahub-ingestion-base:
- - 'docker/datahub-ingestion-base/**'
- datahub-ingestion:
- - 'docker/datahub-ingestion/**'
- name: Build codegen
- if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }}
+ if: ${{ needs.setup.outputs.ingestion_change == 'true' || needs.setup.outputs.publish == 'true' }}
run: ./gradlew :metadata-ingestion:codegen
- name: Download Base Image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
+ if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && needs.setup.outputs.ingestion_base_change == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}
+ image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}
- name: Login to DockerHub
uses: docker/login-action@v3
- if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }}
+ if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && needs.setup.outputs.ingestion_base_change == 'false' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- name: Build and push Slim Image
- if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }}
+ if: ${{ needs.setup.outputs.ingestion_change == 'true' || needs.setup.outputs.publish == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
with:
target: final
@@ -694,7 +669,7 @@ jobs:
${{ env.DATAHUB_INGESTION_IMAGE }}
build-args: |
BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}
- DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}
+ DOCKER_VERSION=${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}
RELEASE_VERSION=${{ needs.setup.outputs.python_release_version }}
APP_ENV=slim
tags: ${{ needs.setup.outputs.slim_tag }}
@@ -706,7 +681,7 @@ jobs:
platforms: linux/amd64,linux/arm64/v8
- name: Compute Tag
id: tag
- run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.unique_slim_tag || 'head-slim' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ needs.setup.outputs.ingestion_change == 'true' && needs.setup.outputs.unique_slim_tag || 'head-slim' }}" >> $GITHUB_OUTPUT
datahub_ingestion_slim_scan:
permissions:
contents: read # for actions/checkout to fetch code
@@ -746,7 +721,7 @@ jobs:
runs-on: ubuntu-latest
outputs:
tag: ${{ steps.tag.outputs.tag }}
- needs_artifact_download: ${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) }}
+ needs_artifact_download: ${{ needs.setup.outputs.ingestion_change == 'true' && ( needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' ) }}
needs: [setup, datahub_ingestion_base_full_build]
if: ${{ needs.setup.outputs.ingestion_change == 'true' || needs.setup.outputs.publish == 'true' }}
steps:
@@ -758,30 +733,22 @@ jobs:
- uses: gradle/gradle-build-action@v2
- name: Check out the repo
uses: acryldata/sane-checkout-action@v3
- - uses: dorny/paths-filter@v2
- id: filter
- with:
- filters: |
- datahub-ingestion-base:
- - 'docker/datahub-ingestion-base/**'
- datahub-ingestion:
- - 'docker/datahub-ingestion/**'
- name: Build codegen
- if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }}
+ if: ${{ needs.setup.outputs.ingestion_change == 'true' || needs.setup.outputs.publish == 'true' }}
run: ./gradlew :metadata-ingestion:codegen
- name: Download Base Image
uses: ishworkh/docker-image-artifact-download@v1
- if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && steps.filter.outputs.datahub-ingestion-base == 'true' }}
+ if: ${{ needs.setup.outputs.publish != 'true' && needs.setup.outputs.pr-publish != 'true' && needs.setup.outputs.ingestion_base_change == 'true' }}
with:
- image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
+ image: ${{ env.DATAHUB_INGESTION_BASE_IMAGE }}:${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }}
- name: Login to DockerHub
uses: docker/login-action@v3
- if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && steps.filter.outputs.datahub-ingestion-base == 'false' }}
+ if: ${{ needs.setup.outputs.docker-login == 'true' && needs.setup.outputs.publish == 'false' && needs.setup.outputs.pr-publish == 'false' && needs.setup.outputs.ingestion_base_change == 'false' }}
with:
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
password: ${{ secrets.ACRYL_DOCKER_PASSWORD }}
- name: Build and push Full Image
- if: ${{ steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true' || needs.setup.outputs.publish }}
+ if: ${{ needs.setup.outputs.ingestion_change == 'true' || needs.setup.outputs.publish == 'true' }}
uses: ./.github/actions/docker-custom-build-and-push
with:
target: final
@@ -789,7 +756,7 @@ jobs:
${{ env.DATAHUB_INGESTION_IMAGE }}
build-args: |
BASE_IMAGE=${{ env.DATAHUB_INGESTION_BASE_IMAGE }}
- DOCKER_VERSION=${{ steps.filter.outputs.datahub-ingestion-base == 'true' && needs.setup.outputs.unique_tag || 'head' }}
+ DOCKER_VERSION=${{ needs.setup.outputs.ingestion_base_change == 'true' && needs.setup.outputs.unique_tag || 'head' }}
RELEASE_VERSION=${{ needs.setup.outputs.python_release_version }}
tags: ${{ needs.setup.outputs.tag }}
username: ${{ secrets.ACRYL_DOCKER_USERNAME }}
@@ -800,7 +767,7 @@ jobs:
platforms: linux/amd64,linux/arm64/v8
- name: Compute Tag (Full)
id: tag
- run: echo "tag=${{ (steps.filter.outputs.datahub-ingestion-base == 'true' || steps.filter.outputs.datahub-ingestion == 'true') && needs.setup.outputs.unique_tag || 'head' }}" >> $GITHUB_OUTPUT
+ run: echo "tag=${{ needs.setup.outputs.ingestion_change == 'true' && needs.setup.outputs.unique_tag || 'head' }}" >> $GITHUB_OUTPUT
datahub_ingestion_full_scan:
permissions:
contents: read # for actions/checkout to fetch code
From e62e4b18c0b18d382d879d0a1e594dc3111d194c Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Thu, 6 Jun 2024 15:54:32 -0700
Subject: [PATCH 4/4] fix(ingest/snowflake): avoid overfetching schemas from
datahub (#10527)
---
.../ingestion/source/snowflake/snowflake_v2.py | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
index 9bb6226b4947a..f155ac24fea3f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
@@ -254,7 +254,18 @@ def __init__(self, ctx: PipelineContext, config: SnowflakeV2Config):
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
- graph=self.ctx.graph,
+ graph=(
+            # If we're ingesting schema metadata for tables/views, then we will populate
+ # schemas into the resolver as we go. We only need to do a bulk fetch
+ # if we're not ingesting schema metadata as part of ingestion.
+ self.ctx.graph
+ if not (
+ self.config.include_technical_schema
+ and self.config.include_tables
+ and self.config.include_views
+ )
+ else None
+ ),
generate_usage_statistics=False,
generate_operations=False,
format_queries=self.config.format_sql_queries,
@@ -1252,7 +1263,7 @@ def gen_schema_metadata(
foreignKeys=foreign_keys,
)
- if self.aggregator and self.config.parse_view_ddl:
+ if self.aggregator:
self.aggregator.register_schema(urn=dataset_urn, schema=schema_metadata)
return schema_metadata