Skip to content

Commit

Permalink
Merge branch 'datahub-project:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored May 9, 2024
2 parents 4fe9ef4 + a86ec7b commit 14ce5de
Show file tree
Hide file tree
Showing 37 changed files with 2,035 additions and 223 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ jobs:
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow==2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow==2.9.0 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.10.txt'
extra_pip_extras: plugin-v2
fail-fast: false
steps:
- name: Set up JDK 17
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ project.ext.externalDependency = [
'jettison': 'org.codehaus.jettison:jettison:1.5.4',
'jgrapht': 'org.jgrapht:jgrapht-core:1.5.1',
'jna': 'net.java.dev.jna:jna:5.12.1',
'jsonPatch': 'com.github.java-json-tools:json-patch:1.13',
'jsonPatch': 'jakarta.json:jakarta.json-api:2.1.3',
'jsonPathImpl': 'org.eclipse.parsson:parsson:1.1.6',
'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1',
'jsonSmart': 'net.minidev:json-smart:2.4.9',
'json': 'org.json:json:20231013',
Expand Down
6 changes: 5 additions & 1 deletion entity-registry/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ dependencies {
implementation externalDependency.jacksonDataBind
implementation externalDependency.jacksonDataFormatYaml
implementation externalDependency.reflections
api externalDependency.jsonPatch

implementation externalDependency.jsonPatch
implementation externalDependency.jsonPathImpl

constraints {
implementation(externalDependency.snakeYaml) {
because("previous versions are vulnerable to CVE-2022-25857")
Expand All @@ -28,6 +31,7 @@ dependencies {

testImplementation project(':test-models')
testImplementation project(path: ':test-models', configuration: 'testDataTemplate')
testImplementation project(':metadata-utils')
testImplementation externalDependency.testng
testImplementation externalDependency.mockito
testImplementation externalDependency.mockitoInline
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.linkedin.metadata.aspect.batch;

import com.github.fge.jsonpatch.Patch;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.aspect.AspectRetriever;
import jakarta.json.JsonPatch;

/**
* A change proposal represented as a patch to an exiting stored object in the primary data store.
Expand All @@ -17,5 +17,5 @@ public interface PatchMCP extends MCPItem {
*/
ChangeMCP applyPatch(RecordTemplate recordTemplate, AspectRetriever aspectRetriever);

Patch getPatch();
JsonPatch getPatch();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package com.linkedin.metadata.aspect.patch;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jsonpatch.JsonPatch;
import java.io.IOException;
import com.linkedin.util.Pair;
import jakarta.json.Json;
import jakarta.json.JsonArrayBuilder;
import jakarta.json.JsonPatch;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
Expand All @@ -21,15 +24,35 @@
public class GenericJsonPatch {
@Nullable private Map<String, List<String>> arrayPrimaryKeys;

@Nonnull private JsonNode patch;
@Nonnull private List<PatchOp> patch;

@Nonnull
public Map<String, List<String>> getArrayPrimaryKeys() {
return arrayPrimaryKeys == null ? Collections.emptyMap() : arrayPrimaryKeys;
}

@JsonIgnore
public JsonPatch getJsonPatch() throws IOException {
return JsonPatch.fromJson(patch);
public JsonPatch getJsonPatch() {
JsonArrayBuilder arrayBuilder = Json.createArrayBuilder();
patch.forEach(op -> arrayBuilder.add(Json.createObjectBuilder(op.toMap())));
return Json.createPatch(arrayBuilder.build());
}

@Data
@NoArgsConstructor
public static class PatchOp {
@Nonnull private String op;
@Nonnull private String path;
@Nullable private Object value;

public Map<String, ?> toMap() {
if (value != null) {
return Stream.of(Pair.of("op", op), Pair.of("path", path), Pair.of("value", value))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
} else {
return Stream.of(Pair.of("op", op), Pair.of("path", path))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.github.fge.jsonpatch.JsonPatchException;
import com.github.fge.jsonpatch.Patch;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.models.AspectSpec;
import jakarta.json.JsonPatch;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -75,12 +74,11 @@ public RecordTemplate getDefaultTemplate(String aspectSpecName) {
* @param aspectSpec aspectSpec of the template
* @return a {@link RecordTemplate} with the patch applied
* @throws JsonProcessingException if there is an issue with processing the record template's json
* @throws JsonPatchException if there is an issue with applying the json patch
*/
@Nonnull
public <T extends RecordTemplate> RecordTemplate applyPatch(
RecordTemplate recordTemplate, Patch jsonPatch, AspectSpec aspectSpec)
throws JsonProcessingException, JsonPatchException {
RecordTemplate recordTemplate, JsonPatch jsonPatch, AspectSpec aspectSpec)
throws JsonProcessingException {
Template<T> template = getTemplate(aspectSpec);
return template.applyPatch(recordTemplate, jsonPatch);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,29 @@
package com.linkedin.metadata.aspect.patch.template;

import static com.linkedin.metadata.aspect.patch.template.TemplateUtil.OBJECT_MAPPER;
import static com.linkedin.metadata.aspect.patch.template.TemplateUtil.populateTopLevelKeys;

import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jsonpatch.JsonPatchException;
import com.github.fge.jsonpatch.Patch;
import com.linkedin.data.template.RecordTemplate;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonPatch;
import java.io.StringReader;

public abstract class CompoundKeyTemplate<T extends RecordTemplate>
implements ArrayMergingTemplate<T> {

@Override
public T applyPatch(RecordTemplate recordTemplate, Patch jsonPatch)
throws JsonProcessingException, JsonPatchException {
public T applyPatch(RecordTemplate recordTemplate, JsonPatch jsonPatch)
throws JsonProcessingException {
JsonNode transformed = populateTopLevelKeys(preprocessTemplate(recordTemplate), jsonPatch);
JsonNode patched = jsonPatch.apply(transformed);
JsonNode postProcessed = rebaseFields(patched);
JsonObject patched =
jsonPatch.apply(
Json.createReader(new StringReader(OBJECT_MAPPER.writeValueAsString(transformed)))
.readObject());
JsonNode postProcessed = rebaseFields(OBJECT_MAPPER.readTree(patched.toString()));
return RecordUtils.toRecordTemplate(getTemplateType(), postProcessed.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jsonpatch.JsonPatchException;
import com.github.fge.jsonpatch.Patch;
import com.linkedin.data.template.RecordTemplate;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonPatch;
import java.io.StringReader;
import javax.annotation.Nonnull;

public interface Template<T extends RecordTemplate> {
Expand Down Expand Up @@ -45,18 +47,21 @@ default T getSubtype(RecordTemplate recordTemplate) throws ClassCastException {
* @param jsonPatch patch to apply
* @return patched value
* @throws JsonProcessingException if there is an issue converting the input to JSON
* @throws JsonPatchException if there is an issue applying the patch
*/
default T applyPatch(RecordTemplate recordTemplate, Patch jsonPatch)
throws JsonProcessingException, JsonPatchException {

default T applyPatch(RecordTemplate recordTemplate, JsonPatch jsonPatch)
throws JsonProcessingException {
TemplateUtil.validatePatch(jsonPatch);

JsonNode transformed = populateTopLevelKeys(preprocessTemplate(recordTemplate), jsonPatch);
try {
JsonNode patched = jsonPatch.apply(transformed);
JsonNode postProcessed = rebaseFields(patched);
// Hack in a more efficient patcher. Even with the serialization overhead 140% faster
JsonObject patched =
jsonPatch.apply(
Json.createReader(new StringReader(OBJECT_MAPPER.writeValueAsString(transformed)))
.readObject());
JsonNode postProcessed = rebaseFields(OBJECT_MAPPER.readTree(patched.toString()));
return RecordUtils.toRecordTemplate(getTemplateType(), postProcessed.toString());
} catch (JsonPatchException e) {
} catch (JsonProcessingException e) {
throw new RuntimeException(
String.format(
"Error performing JSON PATCH on aspect %s. Patch: %s Target: %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.fge.jsonpatch.Patch;
import com.linkedin.metadata.aspect.patch.PatchOperationType;
import com.linkedin.util.Pair;
import jakarta.json.JsonPatch;
import jakarta.json.JsonValue;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -30,34 +31,32 @@ private TemplateUtil() {}
.setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build());
}

public static List<Pair<PatchOperationType, String>> getPaths(Patch jsonPatch) {
JsonNode patchNode = OBJECT_MAPPER.valueToTree(jsonPatch);
public static List<Pair<PatchOperationType, String>> getPaths(JsonPatch jsonPatch) {
List<Pair<PatchOperationType, String>> paths = new ArrayList<>();
patchNode
.elements()
.forEachRemaining(
jsonPatch.toJsonArray().stream()
.map(JsonValue::asJsonObject)
.forEach(
node ->
paths.add(
Pair.of(
PatchOperationType.valueOf(node.get("op").asText().toUpperCase()),
node.get("path").asText())));
PatchOperationType.valueOf(node.getString("op").toUpperCase()),
node.getString("path"))));
return paths;
}

public static void validatePatch(Patch jsonPatch) {
public static void validatePatch(JsonPatch jsonPatch) {
// ensure supported patch operations
JsonNode patchNode = OBJECT_MAPPER.valueToTree(jsonPatch);
patchNode
.elements()
.forEachRemaining(
node -> {
jsonPatch.toJsonArray().stream()
.map(JsonValue::asJsonObject)
.forEach(
jsonObject -> {
try {
PatchOperationType.valueOf(node.get("op").asText().toUpperCase());
PatchOperationType.valueOf(jsonObject.getString("op").toUpperCase());
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Unsupported PATCH operation: `%s` Operation `%s`",
node.get("op").asText(), node),
jsonObject.getString("op"), jsonObject),
e);
}
});
Expand All @@ -70,7 +69,7 @@ public static void validatePatch(Patch jsonPatch) {
* @param transformedNode transformed node to have keys populated
* @return transformed node that has top level keys populated
*/
public static JsonNode populateTopLevelKeys(JsonNode transformedNode, Patch jsonPatch) {
public static JsonNode populateTopLevelKeys(JsonNode transformedNode, JsonPatch jsonPatch) {
JsonNode transformedNodeClone = transformedNode.deepCopy();
List<Pair<PatchOperationType, String>> paths = getPaths(jsonPatch);
for (Pair<PatchOperationType, String> operationPath : paths) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.linkedin.metadata.aspect.patch.template.common;

import com.fasterxml.jackson.databind.JsonNode;
import com.github.fge.jsonpatch.JsonPatchException;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.aspect.patch.GenericJsonPatch;
import com.linkedin.metadata.aspect.patch.template.CompoundKeyTemplate;
Expand Down Expand Up @@ -53,7 +52,7 @@ public JsonNode rebaseFields(JsonNode patched) {
return transformedNode;
}

public T applyPatch(RecordTemplate recordTemplate) throws IOException, JsonPatchException {
public T applyPatch(RecordTemplate recordTemplate) throws IOException {
return super.applyPatch(recordTemplate, genericJsonPatch.getJsonPatch());
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
package com.linkedin.metadata.aspect.patch.template;

import static com.fasterxml.jackson.databind.node.JsonNodeFactory.*;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.fge.jackson.jsonpointer.JsonPointer;
import com.github.fge.jsonpatch.AddOperation;
import com.github.fge.jsonpatch.JsonPatch;
import com.github.fge.jsonpatch.JsonPatchOperation;
import com.linkedin.chart.ChartInfo;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.aspect.patch.template.chart.ChartInfoTemplate;
import java.util.ArrayList;
import java.util.List;
import jakarta.json.Json;
import jakarta.json.JsonObjectBuilder;
import jakarta.json.JsonPatchBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -21,18 +15,16 @@ public class ChartInfoTemplateTest {
public void testChartInfoTemplate() throws Exception {
ChartInfoTemplate chartInfoTemplate = new ChartInfoTemplate();
ChartInfo dashboardInfo = chartInfoTemplate.getDefault();
List<JsonPatchOperation> patchOperations = new ArrayList<>();
ObjectNode edgeNode = instance.objectNode();
edgeNode.put(
JsonPatchBuilder patchOperations = Json.createPatchBuilder();

JsonObjectBuilder edgeNode = Json.createObjectBuilder();
edgeNode.add(
"destinationUrn", "urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)");
JsonPatchOperation operation =
new AddOperation(
new JsonPointer(
"/inputEdges/urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"),
edgeNode);
patchOperations.add(operation);
JsonPatch patch = new JsonPatch(patchOperations);
ChartInfo result = chartInfoTemplate.applyPatch(dashboardInfo, patch);

patchOperations.add(
"/inputEdges/urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)",
edgeNode.build());
ChartInfo result = chartInfoTemplate.applyPatch(dashboardInfo, patchOperations.build());

Assert.assertEquals(
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)"),
Expand Down
Loading

0 comments on commit 14ce5de

Please sign in to comment.