Skip to content

Commit

Permalink
Merge pull request #284 from acryldata/ab-merge-upstream-2024-05-07
Browse files Browse the repository at this point in the history
Merge upstream 2024 05 07
  • Loading branch information
anshbansal authored May 8, 2024
2 parents 9508c47 + 313d42b commit c044257
Show file tree
Hide file tree
Showing 146 changed files with 10,651 additions and 7,497 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/docker-unified.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ jobs:
if: ${{ steps.ci-optimize.outputs.smoke-test-change == 'true' }}
run: |
python ./.github/scripts/check_python_package.py
./gradlew :smoke-test:lint
./gradlew :smoke-test:pythonLint
./gradlew :smoke-test:cypressLint
gms_build:
name: Build and Push DataHub GMS Docker Image
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.5.0'
ext.openLineageVersion = '1.13.1'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
Expand Down
2 changes: 2 additions & 0 deletions datahub-frontend/app/auth/sso/oidc/OidcCallbackLogic.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ private Result handleOidcCallback(
"Failed to perform post authentication steps. Error message: %s", e.getMessage()));
}

log.info("OIDC callback authentication successful for user: {}", userName);

// Successfully logged in - Generate GMS login token
final String accessToken = authClient.generateSessionTokenForUser(corpUserUrn.getId());
return result
Expand Down
6 changes: 4 additions & 2 deletions datahub-frontend/app/client/AuthServiceClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
CloseableHttpResponse response = null;

try {

final String protocol = this.metadataServiceUseSsl ? "https" : "http";
final HttpPost request =
new HttpPost(
Expand All @@ -86,6 +85,8 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
this.metadataServicePort,
GENERATE_SESSION_TOKEN_ENDPOINT));

log.info("Requesting session token for user: {}", userId);

// Build JSON request to generate a token on behalf of a user.
final ObjectMapper objectMapper = new ObjectMapper();
final ObjectNode objectNode = objectMapper.createObjectNode();
Expand All @@ -100,7 +101,7 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
response = httpClient.execute(request);
final HttpEntity entity = response.getEntity();
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK && entity != null) {
// Successfully generated a token for the User
log.info("Successfully received session token for user: {}", userId);
final String jsonStr = EntityUtils.toString(entity);
return getAccessTokenFromJson(jsonStr);
} else {
Expand All @@ -110,6 +111,7 @@ public String generateSessionTokenForUser(@Nonnull final String userId) {
response.getStatusLine().toString(), response.getEntity().toString()));
}
} catch (Exception e) {
log.error("Failed to generate session token for user: {}", userId, e);
throw new RuntimeException("Failed to generate session token for user", e);
} finally {
try {
Expand Down
12 changes: 7 additions & 5 deletions datahub-frontend/app/controllers/AuthenticationController.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import play.mvc.Results;
import security.AuthenticationManager;

// TODO add logging.
public class AuthenticationController extends Controller {
public static final String AUTH_VERBOSE_LOGGING = "auth.verbose.logging";
private static final String AUTH_REDIRECT_URI_PARAM = "redirect_uri";
Expand Down Expand Up @@ -183,10 +182,12 @@ public Result logIn(Http.Request request) {
boolean loginSucceeded = tryLogin(username, password);

if (!loginSucceeded) {
_logger.info("Login failed for user: {}", username);
return Results.badRequest(invalidCredsJson);
}

final Urn actorUrn = new CorpuserUrn(username);
_logger.info("Login successful for user: {}, urn: {}", username, actorUrn);
final String accessToken = _authClient.generateSessionTokenForUser(actorUrn.getId());
return createSession(actorUrn.toString(), accessToken);
}
Expand Down Expand Up @@ -250,6 +251,7 @@ public Result signUp(Http.Request request) {
final Urn userUrn = new CorpuserUrn(email);
final String userUrnString = userUrn.toString();
_authClient.signUp(userUrnString, fullName, email, title, password, inviteToken);
_logger.info("Signed up user {} using invite tokens", userUrnString);
final String accessToken = _authClient.generateSessionTokenForUser(userUrn.getId());
return createSession(userUrnString, accessToken);
}
Expand Down Expand Up @@ -351,15 +353,15 @@ private boolean tryLogin(String username, String password) {
// First try jaas login, if enabled
if (_jaasConfigs.isJAASEnabled()) {
try {
_logger.debug("Attempting jaas authentication");
_logger.debug("Attempting JAAS authentication for user: {}", username);
AuthenticationManager.authenticateJaasUser(username, password);
_logger.debug("Jaas authentication successful. Login succeeded");
_logger.debug("JAAS authentication successful. Login succeeded");
loginSucceeded = true;
} catch (Exception e) {
if (_verbose) {
_logger.debug("Jaas authentication error. Login failed", e);
_logger.debug("JAAS authentication error. Login failed", e);
} else {
_logger.debug("Jaas authentication error. Login failed");
_logger.debug("JAAS authentication error. Login failed");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.datahub.upgrade.system.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
Expand All @@ -15,13 +16,14 @@ public class ReindexDataJobViaNodesCLLConfig {

@Bean
public NonBlockingSystemUpgrade reindexDataJobViaNodesCLL(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.dataJobNodeCLL.enabled}") final boolean enabled,
@Value("${systemUpdate.dataJobNodeCLL.batchSize}") final Integer batchSize,
@Value("${systemUpdate.dataJobNodeCLL.delayMs}") final Integer delayMs,
@Value("${systemUpdate.dataJobNodeCLL.limit}") final Integer limit) {
return new ReindexDataJobViaNodesCLL(
entityService, aspectDao, enabled, batchSize, delayMs, limit);
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package com.linkedin.datahub.upgrade.system;

import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.EntityUtils;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

/**
 * Generic upgrade step class for generating MCLs for a given aspect in order to update ES documents
 *
 * <p>Scans aspect rows via {@link AspectDao#streamAspectBatches}, emits one UPSERT MCL per row,
 * then runs a restore-indices pass and records the upgrade result so a later run is skipped (see
 * {@link #skip}).
 */
@Slf4j
public abstract class AbstractMCLStep implements UpgradeStep {
  private final OperationContext opContext;
  private final EntityService<?> entityService;
  private final AspectDao aspectDao;

  // Scan tuning: rows per batch, optional sleep between batches (ms), and overall row limit.
  private final int batchSize;
  private final int batchDelayMs;
  private final int limit;

  /**
   * @param opContext operation context; supplies the retriever context and is passed to MCL emission
   * @param entityService used to emit MCLs, run restore-indices, and persist the upgrade result
   * @param aspectDao used to stream raw aspect rows in batches
   * @param batchSize number of rows per scanned batch
   * @param batchDelayMs delay between batches in milliseconds; values &lt;= 0 disable the delay
   * @param limit maximum number of rows to scan
   */
  public AbstractMCLStep(
      OperationContext opContext,
      EntityService<?> entityService,
      AspectDao aspectDao,
      Integer batchSize,
      Integer batchDelayMs,
      Integer limit) {
    this.opContext = opContext;
    this.entityService = entityService;
    this.aspectDao = aspectDao;
    this.batchSize = batchSize;
    this.batchDelayMs = batchDelayMs;
    this.limit = limit;
  }

  /** Name of the aspect whose rows are scanned and re-emitted as MCLs. */
  @Nonnull
  protected abstract String getAspectName();

  /** Urn under which this upgrade's completion state is stored and checked (see {@link #skip}). */
  protected Urn getUpgradeIdUrn() {
    return BootstrapStep.getUpgradeUrn(id());
  }

  /** Optionally apply an urn-like sql filter, otherwise all urns */
  @Nullable
  protected abstract String getUrnLike();

  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return (context) -> {

      // re-using for configuring the sql scan
      RestoreIndicesArgs args =
          new RestoreIndicesArgs().aspectName(getAspectName()).batchSize(batchSize).limit(limit);

      if (getUrnLike() != null) {
        args = args.urnLike(getUrnLike());
      }

      // Phase 1: stream the aspect rows in batches and asynchronously emit one MCL per row.
      aspectDao
          .streamAspectBatches(args)
          .forEach(
              batch -> {
                log.info("Processing batch({}) of size {}.", getAspectName(), batchSize);

                // Convert the raw ebean rows to system aspects and emit an UPSERT MCL for each.
                // The runId/lastObserved restamp marks the rows as touched by this upgrade.
                List<Pair<Future<?>, Boolean>> futures =
                    EntityUtils.toSystemAspectFromEbeanAspects(
                            opContext.getRetrieverContext().get(),
                            batch.collect(Collectors.toList()))
                        .stream()
                        .map(
                            systemAspect ->
                                entityService.alwaysProduceMCLAsync(
                                    opContext,
                                    systemAspect.getUrn(),
                                    systemAspect.getUrn().getEntityType(),
                                    getAspectName(),
                                    systemAspect.getAspectSpec(),
                                    null,
                                    systemAspect.getRecordTemplate(),
                                    null,
                                    systemAspect
                                        .getSystemMetadata()
                                        .setRunId(id())
                                        .setLastObserved(System.currentTimeMillis()),
                                    AuditStampUtils.createDefaultAuditStamp(),
                                    ChangeType.UPSERT))
                        .collect(Collectors.toList());

                // Block until every MCL in this batch has been produced before continuing.
                futures.forEach(
                    f -> {
                      try {
                        f.getFirst().get();
                      } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                      }
                    });

                // Optional throttle between batches.
                if (batchDelayMs > 0) {
                  log.info("Sleeping for {} ms", batchDelayMs);
                  try {
                    Thread.sleep(batchDelayMs);
                  } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                  }
                }
              });

      // Phase 2: restore-indices pass over the same args, forwarding progress to the report.
      entityService
          .streamRestoreIndices(opContext, args, x -> context.report().addLine((String) x))
          .forEach(
              result -> {
                context.report().addLine("Rows migrated: " + result.rowsMigrated);
                context.report().addLine("Rows ignored: " + result.ignored);
              });

      // Phase 3: persist the upgrade-result aspect so subsequent runs are skipped (see skip()).
      BootstrapStep.setUpgradeResult(opContext, getUpgradeIdUrn(), entityService);
      context.report().addLine("State updated: " + getUpgradeIdUrn());

      return new DefaultUpgradeStepResult(id(), UpgradeStepResult.Result.SUCCEEDED);
    };
  }

  /**
   * Returns whether the upgrade should be skipped: true when a previous run already persisted the
   * upgrade-result aspect under {@link #getUpgradeIdUrn()}.
   */
  @Override
  public boolean skip(UpgradeContext context) {
    boolean previouslyRun =
        entityService.exists(
            opContext, getUpgradeIdUrn(), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
    if (previouslyRun) {
      log.info("{} was already run. Skipping.", id());
    }
    return previouslyRun;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;

/**
Expand All @@ -18,6 +20,7 @@ public class ReindexDataJobViaNodesCLL implements NonBlockingSystemUpgrade {
private final List<UpgradeStep> _steps;

public ReindexDataJobViaNodesCLL(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Expand All @@ -28,7 +31,7 @@ public ReindexDataJobViaNodesCLL(
_steps =
ImmutableList.of(
new ReindexDataJobViaNodesCLLStep(
entityService, aspectDao, batchSize, batchDelayMs, limit));
opContext, entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
Expand Down
Loading

0 comments on commit c044257

Please sign in to comment.