generateUrnFromStreamingDescription(
+ String description, SparkLineageConf sparkLineageConf) {
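+ // Note (assumption): Spark streaming source/sink descriptions typically look like
+ // "KafkaV2[Subscribe[my-topic]]" or "FileSink[/path/to/dir]"; capture the class name and the
+ // bracketed detail so they can be mapped to a DataHub platform and dataset path.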
+ String pattern = "(.*?)\\[(.*)]";
+ Pattern r = Pattern.compile(pattern);
+ Matcher m = r.matcher(description);
+ if (m.find()) {
+ String namespace = m.group(1);
+ String platform = getDatahubPlatform(namespace);
+ String path = m.group(2);
+ log.debug("Streaming description Platform: {}, Path: {}", platform, path);
+ if (platform.equals(KAFKA_PLATFORM)) {
+ path = getKafkaTopicFromPath(m.group(2));
+ } else if (platform.equals(FILE_PLATFORM) || platform.equals(DELTA_LAKE_PLATFORM)) {
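+ // File and Delta Lake sinks report a URI-style path; delegate URN construction to
+ // HdfsPathDataset so the configured path handling (e.g. path specs) is applied.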
+ try {
+ DatasetUrn urn =
+ HdfsPathDataset.create(new URI(path), sparkLineageConf.getOpenLineageConf()).urn();
+ return Optional.of(urn);
+ } catch (InstantiationException e) {
+ return Optional.empty();
+ } catch (URISyntaxException e) {
+ log.error("Failed to parse path {}", path, e);
+ return Optional.empty();
+ }
+ }
+ return Optional.of(
+ new DatasetUrn(
+ new DataPlatformUrn(platform),
+ path,
+ sparkLineageConf.getOpenLineageConf().getFabricType()));
+ } else {
+ return Optional.empty();
+ }
+ }
+
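+ /**
+ * Maps a Spark streaming source/sink class name (e.g. "KafkaV2", "DeltaSink") to the
+ * corresponding DataHub platform id; unrecognized names are returned unchanged.
+ */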
+ public static String getDatahubPlatform(String namespace) {
+ switch (namespace) {
+ case "KafkaV2":
+ return "kafka";
+ case "DeltaSink":
+ return "delta-lake";
+ case "CloudFilesSource":
+ return "dbfs";
+ case "FileSink":
+ case "FileStreamSource":
+ return "file";
+ default:
+ return namespace;
+ }
+ }
+
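+ /**
+ * Extracts the topic name from a Kafka path such as "Subscribe[my-topic]" (assumed format) by
+ * taking the substring between the first "[" and the following "]".
+ */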
+ public static String getKafkaTopicFromPath(String path) {
+ return StringUtils.substringBetween(path, "[", "]");
+ }
+}
diff --git a/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/OpenLineageRunEventBuilder.java b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/OpenLineageRunEventBuilder.java
new file mode 100644
index 0000000000000..99643592dc200
--- /dev/null
+++ b/metadata-integration/java/spark-lineage-beta/src/main/java/io/openlineage/spark/agent/lifecycle/OpenLineageRunEventBuilder.java
@@ -0,0 +1,493 @@
+/*
+/* Copyright 2018-2023 contributors to the OpenLineage project
+/* SPDX-License-Identifier: Apache-2.0
+*/
+
+package io.openlineage.spark.agent.lifecycle;
+
+import static io.openlineage.client.OpenLineageClientUtils.mergeFacets;
+import static io.openlineage.spark.agent.util.ScalaConversionUtils.fromSeq;
+import static io.openlineage.spark.agent.util.ScalaConversionUtils.toScalaFn;
+
+import io.openlineage.client.OpenLineage;
+import io.openlineage.client.OpenLineage.DatasetFacet;
+import io.openlineage.client.OpenLineage.DatasetFacets;
+import io.openlineage.client.OpenLineage.InputDataset;
+import io.openlineage.client.OpenLineage.InputDatasetFacet;
+import io.openlineage.client.OpenLineage.InputDatasetInputFacets;
+import io.openlineage.client.OpenLineage.JobBuilder;
+import io.openlineage.client.OpenLineage.JobFacet;
+import io.openlineage.client.OpenLineage.OutputDataset;
+import io.openlineage.client.OpenLineage.OutputDatasetFacet;
+import io.openlineage.client.OpenLineage.OutputDatasetOutputFacets;
+import io.openlineage.client.OpenLineage.ParentRunFacet;
+import io.openlineage.client.OpenLineage.RunEvent;
+import io.openlineage.client.OpenLineage.RunEventBuilder;
+import io.openlineage.client.OpenLineage.RunFacet;
+import io.openlineage.client.OpenLineage.RunFacets;
+import io.openlineage.client.OpenLineage.RunFacetsBuilder;
+import io.openlineage.spark.agent.hooks.HookUtils;
+import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageUtils;
+import io.openlineage.spark.agent.lifecycle.plan.column.ColumnLevelLineageVisitor;
+import io.openlineage.spark.agent.util.FacetUtils;
+import io.openlineage.spark.agent.util.PlanUtils;
+import io.openlineage.spark.agent.util.ScalaConversionUtils;
+import io.openlineage.spark.api.CustomFacetBuilder;
+import io.openlineage.spark.api.OpenLineageContext;
+import io.openlineage.spark.api.OpenLineageEventHandlerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import lombok.AllArgsConstructor;
+import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.spark.rdd.RDD;
+import org.apache.spark.scheduler.ActiveJob;
+import org.apache.spark.scheduler.JobFailed;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
+import org.apache.spark.scheduler.SparkListenerJobStart;
+import org.apache.spark.scheduler.SparkListenerStageCompleted;
+import org.apache.spark.scheduler.SparkListenerStageSubmitted;
+import org.apache.spark.scheduler.Stage;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
+import scala.Function1;
+import scala.PartialFunction;
+
+/**
+ * Event handler that accepts various {@link org.apache.spark.scheduler.SparkListener} events and
+ * helps build up an {@link RunEvent} by passing event components to partial functions that know
+ * how to convert those event components into {@link RunEvent} properties.
+ *
+ * <p>The event types that can be consumed to generate {@link OpenLineage.RunEvent} properties have
+ * no common supertype, so the generic argument for the function input is simply {@link Object}.
+ * The types of arguments that may be found include
+ *
+ * <ul>
+ *   <li>{@link org.apache.spark.scheduler.StageInfo}
+ *   <li>{@link Stage}
+ *   <li>{@link RDD}
+ *   <li>{@link ActiveJob}
+ *   <li>{@link org.apache.spark.sql.execution.QueryExecution}
+ * </ul>
+ *
+ * <p>These components are extracted from various {@link org.apache.spark.scheduler.SparkListener}
+ * events, such as {@link SparkListenerStageCompleted}, {@link SparkListenerJobStart}, and {@link
+ * org.apache.spark.scheduler.SparkListenerTaskEnd}.
+ *
+ * <p>{@link RDD} chains will be <i>flattened</i> so each {@link RDD} dependency is passed to the
+ * builders one at a time. This means a builder can directly specify the type of {@link RDD} it
+ * handles, such as a {@link org.apache.spark.rdd.HadoopRDD} or a {@link
+ * org.apache.spark.sql.execution.datasources.FileScanRDD}, without having to check the
+ * dependencies of every {@link org.apache.spark.rdd.MapPartitionsRDD} or {@link
+ * org.apache.spark.sql.execution.SQLExecutionRDD}.
+ *
+ * <p>Any {@link RunFacet}s and {@link JobFacet}s returned by the {@link CustomFacetBuilder}s are
+ * appended to the {@link OpenLineage.Run} and {@link OpenLineage.Job}, respectively.
+ *
+ * <p>If any {@link OpenLineage.InputDatasetBuilder}s or {@link
+ * OpenLineage.OutputDatasetBuilder}s are returned from the partial functions, the {@link
+ * #inputDatasetBuilders} or {@link #outputDatasetBuilders} will be invoked using the same input
+ * arguments in order to construct any {@link InputDatasetFacet}s or {@link OutputDatasetFacet}s
+ * for the returned dataset. {@link InputDatasetFacet}s and {@link OutputDatasetFacet}s will be
+ * attached to any {@link OpenLineage.InputDatasetBuilder} or {@link OpenLineage.OutputDatasetBuilder}
+ * found for the event. This is because facets may be constructed from generic information that is
+ * not specifically tied to a Dataset. For example, {@link
+ * OpenLineage.OutputStatisticsOutputDatasetFacet}s are created from {@link
+ * org.apache.spark.executor.TaskMetrics} attached to the last {@link
+ * org.apache.spark.scheduler.StageInfo} for a given job execution. However, the {@link
+ * OutputDataset} is constructed by reading the {@link LogicalPlan}. There's no way to tie the
+ * output metrics in the {@link org.apache.spark.scheduler.StageInfo} to the {@link OutputDataset}
+ * in the {@link LogicalPlan} except by inference. Similarly, input metrics can be found in the
+ * {@link org.apache.spark.scheduler.StageInfo} for the stage that reads a dataset and the {@link
+ * InputDataset} can usually be constructed by walking the {@link RDD} dependency tree for that
+ * {@link Stage} and finding a {@link org.apache.spark.sql.execution.datasources.FileScanRDD} or
+ * other concrete implementation. But while there is typically only one {@link InputDataset} read
+ * in a given stage, there's no guarantee of that and the {@link
+ * org.apache.spark.executor.TaskMetrics} in the {@link org.apache.spark.scheduler.StageInfo} won't
+ * disambiguate.
+ *
+ * <p>If a facet needs to be attached to a specific dataset, the user must take care to construct
+ * both the Dataset and the Facet in the same builder.
+ */
+@Slf4j
+@AllArgsConstructor
+class OpenLineageRunEventBuilder {
+
+ @NonNull private final OpenLineageContext openLineageContext;
+
+ @NonNull
+ private final Collection<PartialFunction<Object, List<InputDataset>>> inputDatasetBuilders;
+
+ @NonNull
+ private final Collection<PartialFunction<LogicalPlan, List<InputDataset>>>
+ inputDatasetQueryPlanVisitors;
+
+ @NonNull
+ private final Collection<PartialFunction<Object, List<OutputDataset>>> outputDatasetBuilders;
+
+ @NonNull
+ private final Collection<PartialFunction<LogicalPlan, List<OutputDataset>>>
+ outputDatasetQueryPlanVisitors;
+
+ @NonNull
+ private final Collection<CustomFacetBuilder<?, ? extends DatasetFacet>> datasetFacetBuilders;
+
+ @NonNull
+ private final Collection<CustomFacetBuilder<?, ? extends InputDatasetFacet>>
+ inputDatasetFacetBuilders;
+
+ @NonNull
+ private final Collection<CustomFacetBuilder<?, ? extends OutputDatasetFacet>>
+ outputDatasetFacetBuilders;
+
+ @NonNull private final Collection<CustomFacetBuilder<?, ? extends RunFacet>> runFacetBuilders;
+ @NonNull private final Collection<CustomFacetBuilder<?, ? extends JobFacet>> jobFacetBuilders;
+ @NonNull private final Collection<ColumnLevelLineageVisitor> columnLineageVisitors;
+ private final UnknownEntryFacetListener unknownEntryFacetListener =
+ UnknownEntryFacetListener.getInstance();
+ private final Map<Integer, ActiveJob> jobMap = new HashMap<>();
+ private final Map<Integer, Stage> stageMap = new HashMap<>();
+
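+ /**
+ * Convenience constructor that wires the dataset builders, query-plan visitors, and facet
+ * builders from the supplied {@link OpenLineageEventHandlerFactory}.
+ */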
+ OpenLineageRunEventBuilder(OpenLineageContext context, OpenLineageEventHandlerFactory factory) {
+ this(
+ context,
+ factory.createInputDatasetBuilder(context),
+ factory.createInputDatasetQueryPlanVisitors(context),
+ factory.createOutputDatasetBuilder(context),
+ factory.createOutputDatasetQueryPlanVisitors(context),
+ factory.createDatasetFacetBuilders(context),
+ factory.createInputDatasetFacetBuilders(context),
+ factory.createOutputDatasetFacetBuilders(context),
+ factory.createRunFacetBuilders(context),
+ factory.createJobFacetBuilders(context),
+ factory.createColumnLevelLineageVisitors(context));
+ }
+
+ /**
+ * Add an {@link ActiveJob} and all of its {@link Stage}s to the maps so we can look them up by id
+ * later.
+ *
+ * @param job the job whose final stage and parent stages should be registered
+ */
+ void registerJob(ActiveJob job) {
+ jobMap.put(job.jobId(), job);
+ stageMap.put(job.finalStage().id(), job.finalStage());
+ job.finalStage()
+ .parents()
+ .forall(
+ toScalaFn(
+ stage -> {
+ stageMap.put(stage.id(), stage);
+ return true;
+ }));
+ }
+
+ RunEvent buildRun(
+ Optional<ParentRunFacet> parentRunFacet,
+ RunEventBuilder runEventBuilder,
+ JobBuilder jobBuilder,
+ SparkListenerStageSubmitted event) {
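+ // Look up the Stage cached by registerJob(ActiveJob) for this stage id so its RDD lineage
+ // can be inspected.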
+ Stage stage = stageMap.get(event.stageInfo().stageId());
+ RDD<?> rdd = stage.rdd();
+
+ List