Skip to content

Commit

Permalink
feat(entity-client): batch entity-client ingestProposals
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker committed Nov 5, 2024
1 parent e706d15 commit 05be5ff
Show file tree
Hide file tree
Showing 17 changed files with 411 additions and 206 deletions.
13 changes: 8 additions & 5 deletions datahub-frontend/app/auth/AuthModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.models.registry.EmptyEntityRegistry;
Expand Down Expand Up @@ -213,11 +214,13 @@ protected SystemEntityClient provideEntityClient(

return new SystemRestliEntityClient(
buildRestliClient(),
new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)),
configs.getInt(ENTITY_CLIENT_NUM_RETRIES),
configurationProvider.getCache().getClient().getEntityClient(),
Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE)),
Math.max(1, configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY)));
EntityClientConfig.builder()
.backoffPolicy(new ExponentialBackoff(configs.getInt(ENTITY_CLIENT_RETRY_INTERVAL)))
.retryCount(configs.getInt(ENTITY_CLIENT_NUM_RETRIES))
.batchGetV2Size(configs.getInt(ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE))
.batchGetV2Concurrency(2)
.build(),
configurationProvider.getCache().getClient().getEntityClient());
}

@Provides
Expand Down
2 changes: 2 additions & 0 deletions li-utils/src/main/java/com/linkedin/metadata/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -464,5 +464,7 @@ public class Constants {
public static final String MDC_ENTITY_TYPE = "entityType";
public static final String MDC_CHANGE_TYPE = "changeType";

public static final String RESTLI_SUCCESS = "success";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.testng.AssertJUnit.assertNotNull;
import static org.testng.AssertJUnit.assertTrue;

import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar;
import com.linkedin.metadata.kafka.hook.UpdateIndicesHook;
Expand Down Expand Up @@ -36,6 +37,12 @@ public class MCLMAESpringTest extends AbstractTestNGSpringContextTests {

@Autowired private UpdateIndicesService updateIndicesService;

static {
PathSpecBasedSchemaAnnotationVisitor.class
.getClassLoader()
.setClassAssertionStatus(PathSpecBasedSchemaAnnotationVisitor.class.getName(), false);
}

@Test
public void testHooks() {
MCLKafkaListenerRegistrar registrar =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.datahub.authentication.Authentication;
import com.datahub.metadata.ingestion.IngestionScheduler;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.gms.factory.plugins.SpringStandardPluginConfiguration;
import com.linkedin.metadata.boot.kafka.DataHubUpgradeKafkaListener;
Expand Down Expand Up @@ -58,6 +59,11 @@ public class MCLSpringCommonTestConfiguration {

@MockBean public IngestionScheduler ingestionScheduler;

@Bean
public EntityClientConfig entityClientConfig() {
return EntityClientConfig.builder().build();
}

@MockBean(name = "systemEntityClient")
public SystemEntityClient systemEntityClient;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.metadata.kafka;

import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
Expand Down Expand Up @@ -39,16 +40,25 @@ public class MceConsumerApplicationTestConfiguration {
@Bean
@Primary
public SystemEntityClient systemEntityClient(
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) {
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider,
final EntityClientConfig entityClientConfig) {
String selfUri = restTemplate.getRootUri();
final Client restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(selfUri), null);
return new SystemRestliEntityClient(
restClient,
new ExponentialBackoff(1),
1,
configurationProvider.getCache().getClient().getEntityClient(),
1,
2);
entityClientConfig,
configurationProvider.getCache().getClient().getEntityClient());
}

@Bean
@Primary
public EntityClientConfig entityClientConfig() {
return EntityClientConfig.builder()
.backoffPolicy(new ExponentialBackoff(1))
.retryCount(1)
.batchGetV2Size(1)
.batchGetV2Concurrency(2)
.build();
}

@MockBean public Database ebeanServer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,10 +457,19 @@ entityClient:
java:
get:
batchSize: ${ENTITY_CLIENT_JAVA_GET_BATCH_SIZE:375} # matches EbeanAspectDao batch size
ingest:
batchSize: ${ENTITY_CLIENT_JAVA_INGEST_BATCH_SIZE:375}
restli:
get:
batchSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_SIZE:100} # limited to prevent exceeding restli URI size limit
batchConcurrency: ${ENTITY_CLIENT_RESTLI_GET_BATCH_CONCURRENCY:2} # parallel threads
batchQueueSize: ${ENTITY_CLIENT_RESTLI_GET_BATCH_QUEUE_SIZE:500}
batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_GET_BATCH_THREAD_KEEP_ALIVE:60}
ingest:
batchSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_SIZE:50} # limited to prevent exceeding restli timeouts
batchConcurrency: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_CONCURRENCY:2} # parallel threads
batchQueueSize: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_QUEUE_SIZE:500}
batchThreadKeepAlive: ${ENTITY_CLIENT_RESTLI_INGEST_BATCH_THREAD_KEEP_ALIVE:60}

usageClient:
retryInterval: ${USAGE_CLIENT_RETRY_INTERVAL:2}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package com.linkedin.gms.factory.entityclient;

import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -14,4 +17,30 @@ public EntityClientCacheConfig entityClientCacheConfig(
@Qualifier("configurationProvider") final ConfigurationProvider configurationProvider) {
return configurationProvider.getCache().getClient().getEntityClient();
}

@Bean
public EntityClientConfig entityClientConfig(
final @Value("${entityClient.retryInterval:2}") int retryInterval,
final @Value("${entityClient.numRetries:3}") int numRetries,
final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency,
final @Value("${entityClient.restli.get.batchQueueSize}") int batchGetV2QueueSize,
final @Value("${entityClient.restli.get.batchThreadKeepAlive}") int batchGetV2KeepAlive,
final @Value("${entityClient.restli.ingest.batchSize}") int batchIngestSize,
final @Value("${entityClient.restli.ingest.batchConcurrency}") int batchIngestConcurrency,
final @Value("${entityClient.restli.ingest.batchQueueSize}") int batchIngestQueueSize,
final @Value("${entityClient.restli.ingest.batchThreadKeepAlive}") int batchIngestKeepAlive) {
return EntityClientConfig.builder()
.backoffPolicy(new ExponentialBackoff(retryInterval))
.retryCount(numRetries)
.batchGetV2Size(batchGetV2Size)
.batchGetV2Concurrency(batchGetV2Concurrency)
.batchGetV2QueueSize(batchGetV2QueueSize)
.batchGetV2KeepAlive(batchGetV2KeepAlive)
.batchIngestSize(batchIngestSize)
.batchIngestConcurrency(batchIngestConcurrency)
.batchIngestQueueSize(batchIngestQueueSize)
.batchIngestKeepAlive(batchIngestKeepAlive)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.gms.factory.entityclient;

import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.metadata.client.JavaEntityClient;
import com.linkedin.metadata.client.SystemJavaEntityClient;
Expand All @@ -16,7 +17,6 @@
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import javax.inject.Singleton;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -39,7 +39,7 @@ public EntityClient entityClient(
final @Qualifier("relationshipSearchService") LineageSearchService _lineageSearchService,
final @Qualifier("kafkaEventProducer") EventProducer _eventProducer,
final RollbackService rollbackService,
final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) {
final EntityClientConfig entityClientConfig) {
return new JavaEntityClient(
_entityService,
_deleteEntityService,
Expand All @@ -50,7 +50,7 @@ public EntityClient entityClient(
_timeseriesAspectService,
rollbackService,
_eventProducer,
batchGetV2Size);
entityClientConfig.getBatchGetV2Size());
}

@Bean("systemEntityClient")
Expand All @@ -67,7 +67,7 @@ public SystemEntityClient systemEntityClient(
final @Qualifier("kafkaEventProducer") EventProducer _eventProducer,
final RollbackService rollbackService,
final EntityClientCacheConfig entityClientCacheConfig,
final @Value("${entityClient.restli.get.batchSize:375}") int batchGetV2Size) {
final EntityClientConfig entityClientConfig) {
return new SystemJavaEntityClient(
_entityService,
_deleteEntityService,
Expand All @@ -79,6 +79,6 @@ public SystemEntityClient systemEntityClient(
rollbackService,
_eventProducer,
entityClientCacheConfig,
batchGetV2Size);
entityClientConfig.getBatchGetV2Size());
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.linkedin.gms.factory.entityclient;

import com.linkedin.entity.client.EntityClient;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.entity.client.RestliEntityClient;
import com.linkedin.entity.client.SystemEntityClient;
import com.linkedin.entity.client.SystemRestliEntityClient;
import com.linkedin.metadata.config.cache.client.EntityClientCacheConfig;
import com.linkedin.metadata.restli.DefaultRestliClientFactory;
import com.linkedin.parseq.retry.backoff.ExponentialBackoff;
import com.linkedin.restli.client.Client;
import java.net.URI;
import javax.inject.Singleton;
Expand All @@ -28,23 +28,15 @@ public EntityClient entityClient(
@Value("${datahub.gms.useSSL}") boolean gmsUseSSL,
@Value("${datahub.gms.uri}") String gmsUri,
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final EntityClientConfig entityClientConfig) {
final Client restClient;
if (gmsUri != null) {
restClient = DefaultRestliClientFactory.getRestLiClient(URI.create(gmsUri), gmsSslProtocol);
} else {
restClient =
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
}
return new RestliEntityClient(
restClient,
new ExponentialBackoff(retryInterval),
numRetries,
batchGetV2Size,
batchGetV2Concurrency);
return new RestliEntityClient(restClient, entityClientConfig);
}

@Bean("systemEntityClient")
Expand All @@ -55,11 +47,8 @@ public SystemEntityClient systemEntityClient(
@Value("${datahub.gms.useSSL}") boolean gmsUseSSL,
@Value("${datahub.gms.uri}") String gmsUri,
@Value("${datahub.gms.sslContext.protocol}") String gmsSslProtocol,
@Value("${entityClient.retryInterval:2}") int retryInterval,
@Value("${entityClient.numRetries:3}") int numRetries,
final EntityClientCacheConfig entityClientCacheConfig,
final @Value("${entityClient.restli.get.batchSize}") int batchGetV2Size,
final @Value("${entityClient.restli.get.batchConcurrency}") int batchGetV2Concurrency) {
final EntityClientConfig entityClientConfig) {

final Client restClient;
if (gmsUri != null) {
Expand All @@ -68,12 +57,6 @@ public SystemEntityClient systemEntityClient(
restClient =
DefaultRestliClientFactory.getRestLiClient(gmsHost, gmsPort, gmsUseSSL, gmsSslProtocol);
}
return new SystemRestliEntityClient(
restClient,
new ExponentialBackoff(retryInterval),
numRetries,
entityClientCacheConfig,
batchGetV2Size,
batchGetV2Concurrency);
return new SystemRestliEntityClient(restClient, entityClientConfig, entityClientCacheConfig);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import com.datahub.authentication.Authentication;
import com.linkedin.common.callback.FutureCallback;
import com.linkedin.entity.client.EntityClientConfig;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.parseq.retry.backoff.BackoffPolicy;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.restli.client.AbstractRequestBuilder;
import com.linkedin.restli.client.Client;
Expand All @@ -19,17 +19,15 @@
@Slf4j
public abstract class BaseClient implements AutoCloseable {

protected final Client _client;
protected final BackoffPolicy _backoffPolicy;
protected final int _retryCount;
protected final Client client;
protected final EntityClientConfig entityClientConfig;

protected static final Set<String> NON_RETRYABLE =
Set.of("com.linkedin.data.template.RequiredFieldNotPresentException");

protected BaseClient(@Nonnull Client restliClient, BackoffPolicy backoffPolicy, int retryCount) {
_client = Objects.requireNonNull(restliClient);
_backoffPolicy = backoffPolicy;
_retryCount = retryCount;
protected BaseClient(@Nonnull Client restliClient, EntityClientConfig entityClientConfig) {
client = Objects.requireNonNull(restliClient);
this.entityClientConfig = entityClientConfig;
}

protected <T> Response<T> sendClientRequest(
Expand All @@ -52,9 +50,9 @@ protected <T> Response<T> sendClientRequest(

int attemptCount = 0;

while (attemptCount < _retryCount + 1) {
while (attemptCount < entityClientConfig.getRetryCount() + 1) {
try {
return _client.sendRequest(requestBuilder.build()).getResponse();
return client.sendRequest(requestBuilder.build()).getResponse();
} catch (Throwable ex) {
MetricUtils.counter(
BaseClient.class,
Expand All @@ -66,12 +64,13 @@ protected <T> Response<T> sendClientRequest(
|| (ex.getCause() != null
&& NON_RETRYABLE.contains(ex.getCause().getClass().getCanonicalName()));

if (attemptCount == _retryCount || skipRetry) {
if (attemptCount == entityClientConfig.getRetryCount() || skipRetry) {
throw ex;
} else {
attemptCount = attemptCount + 1;
try {
Thread.sleep(_backoffPolicy.nextBackoff(attemptCount, ex) * 1000);
Thread.sleep(
entityClientConfig.getBackoffPolicy().nextBackoff(attemptCount, ex) * 1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
Expand All @@ -84,6 +83,6 @@ protected <T> Response<T> sendClientRequest(

@Override
public void close() {
_client.shutdown(new FutureCallback<>());
client.shutdown(new FutureCallback<>());
}
}
Loading

0 comments on commit 05be5ff

Please sign in to comment.