Add basic Iceberg/Hive dataset parsing in Spark instrumentation #8152

Open

yiliangzhou wants to merge 21 commits into master from liangzhou.yi/djm-spark-table-lineage
Commits (21):
9292a6c
a spark instrumentation poc to collect table dataset run by spark sql
yiliangzhou Dec 20, 2024
a700298
compile sparklistener on spark 2.4 and use reflection to parse logica…
yiliangzhou Jan 2, 2025
4650049
add spark datasets to application span
yiliangzhou Jan 2, 2025
0433b59
parse ReplaceData node from LogicalPlan to support dataset referenced…
yiliangzhou Jan 2, 2025
593e004
add hive table parsing in spark listener for insert and select
yiliangzhou Jan 3, 2025
5b921e4
serialize hive/iceberg table stats and properties as json string for …
yiliangzhou Jan 6, 2025
7e49737
add configuration to turn on and off data lineage collection for spar…
yiliangzhou Jan 8, 2025
91aaedb
add configuration to limit the dataset count can be collected for spa…
yiliangzhou Jan 8, 2025
6a58146
refactor to use variable to track spark lineage dataset parsed and sent
yiliangzhou Jan 8, 2025
fb281d4
refactor
yiliangzhou Jan 8, 2025
dc6199b
cleanup
yiliangzhou Jan 10, 2025
710836e
refactoring to ensure parsed dataset are added to application span ev…
yiliangzhou Jan 10, 2025
46eaeb6
clean up
yiliangzhou Jan 10, 2025
6bf6cc0
refactor
yiliangzhou Jan 10, 2025
8bf3596
refactor
yiliangzhou Jan 10, 2025
98451dd
refactor by moving lineage dataset parsing into onSQLExecutionEnd
yiliangzhou Jan 13, 2025
dd16045
cleanup
yiliangzhou Jan 13, 2025
1f32994
refactor
yiliangzhou Jan 13, 2025
584be1a
Merge branch 'master' into liangzhou.yi/djm-spark-table-lineage
yiliangzhou Jan 13, 2025
e505a7c
Merge branch 'master' into liangzhou.yi/djm-spark-table-lineage
yiliangzhou Jan 14, 2025
825c722
fix muzzle complains
yiliangzhou Jan 14, 2025
Files changed:
Spark listener, Scala 2.12 variant:
@@ -3,9 +3,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
import scala.collection.JavaConverters;
@@ -53,6 +56,15 @@ protected List<SQLMetricInfo> getPlanInfoMetrics(SparkPlanInfo info) {
return JavaConverters.seqAsJavaList(info.metrics());
}

@Override
protected List<SparkSQLUtils.LineageDataset> parseDatasetsFromLogicalPlan(
LogicalPlan logicalPlan) {
return JavaConverters.seqAsJavaList(logicalPlan.collect(SparkSQLUtils.logicalPlanToDataset))
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

@Override
protected int[] getStageParentIds(StageInfo info) {
int[] parentIds = new int[info.parentIds().length()];
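The helper `SparkSQLUtils.logicalPlanToDataset` referenced above is not part of this diff; it is passed to `LogicalPlan.collect(...)`, which expects a Scala `PartialFunction`. A minimal sketch, assuming it is implemented in Java by extending `scala.runtime.AbstractPartialFunction` (class and return type below are hypothetical; the real helper returns `SparkSQLUtils.LineageDataset`):

```java
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import scala.runtime.AbstractPartialFunction;

// Hypothetical partial function usable with LogicalPlan.collect(...): it matches
// only the plan nodes it knows how to translate and maps them to a table reference.
public class LogicalPlanToTableName extends AbstractPartialFunction<LogicalPlan, String> {

  @Override
  public boolean isDefinedAt(LogicalPlan plan) {
    // Only Hive table scans are handled in this sketch
    return plan instanceof HiveTableRelation;
  }

  @Override
  public String apply(LogicalPlan plan) {
    // e.g. "db.table" for a Hive relation in the analyzed plan
    return ((HiveTableRelation) plan).tableMeta().identifier().unquotedString();
  }
}

// Usage, mirroring the overridden method above (Scala 2.12 conversion API):
//   List<String> tables =
//       JavaConverters.seqAsJavaList(logicalPlan.collect(new LogicalPlanToTableName()));
```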
Spark instrumentation helper classes, Scala 2.12 variant:
@@ -23,6 +23,8 @@ public String[] helperClassNames() {
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
packageName + ".SparkSQLUtils$LineageDataset",
packageName + ".SparkSQLUtils$1",
};
}

Spark listener, Scala 2.13 variant:
@@ -3,9 +3,12 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
import scala.jdk.javaapi.CollectionConverters;
@@ -53,6 +56,15 @@ protected List<SQLMetricInfo> getPlanInfoMetrics(SparkPlanInfo info) {
return CollectionConverters.asJava(info.metrics());
}

@Override
protected List<SparkSQLUtils.LineageDataset> parseDatasetsFromLogicalPlan(
LogicalPlan logicalPlan) {
return CollectionConverters.asJava(logicalPlan.collect(SparkSQLUtils.logicalPlanToDataset))
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList());
}

@Override
protected int[] getStageParentIds(StageInfo info) {
int[] parentIds = new int[info.parentIds().length()];
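The only substantive difference from the Scala 2.12 variant above is the Seq-to-List conversion API. A small self-contained sketch of the 2.13-style round trip (the value names are made up):

```java
import java.util.Arrays;
import java.util.List;
import scala.collection.immutable.Seq;
import scala.jdk.javaapi.CollectionConverters;

// Round-trips a Java list through a Scala Seq with the Scala 2.13 API
// (scala.jdk.javaapi.CollectionConverters). On Scala 2.12 the equivalent
// conversion is scala.collection.JavaConverters.seqAsJavaList(seq).
public class ConvertersExample {
  public static void main(String[] args) {
    Seq<String> scalaSeq =
        CollectionConverters.asScala(Arrays.asList("a", "b")).toSeq();
    List<String> javaList = CollectionConverters.asJava(scalaSeq);
    System.out.println(javaList); // [a, b]
  }
}
```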
Spark instrumentation helper classes, Scala 2.13 variant:
@@ -23,6 +23,8 @@ public String[] helperClassNames() {
packageName + ".SparkSQLUtils",
packageName + ".SparkSQLUtils$SparkPlanInfoForStage",
packageName + ".SparkSQLUtils$AccumulatorWithStage",
packageName + ".SparkSQLUtils$LineageDataset",
packageName + ".SparkSQLUtils$1",
};
}

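The `SparkSQLUtils$1` entry in both helper-class lists suggests the partial function is implemented as an anonymous inner class of `SparkSQLUtils`: anonymous classes compile to their own `Outer$N` class files, and each injected class has to be listed as a helper. A tiny illustration with a hypothetical class (not from the PR):

```java
// An anonymous inner class compiles to a separate class file named Outer$1,
// which is why such synthetic names appear in helperClassNames().
public class Outer {
  static final Runnable HELPER =
      new Runnable() {            // compiles to Outer$1.class
        @Override
        public void run() {}
      };

  public static void main(String[] args) {
    System.out.println(HELPER.getClass().getName()); // prints Outer$1
  }
}
```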
AbstractDatadogSparkListener:
@@ -23,11 +23,13 @@
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.reflect.Field;
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -36,10 +38,13 @@
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.spark.ExceptionFailure;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskFailedReason;
import org.apache.spark.scheduler.*;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.execution.QueryExecution;
import org.apache.spark.sql.execution.SQLExecution;
import org.apache.spark.sql.execution.SparkPlanInfo;
import org.apache.spark.sql.execution.metric.SQLMetricInfo;
@@ -103,6 +108,9 @@ public abstract class AbstractDatadogSparkListener extends SparkListener {
protected final HashMap<Long, SparkPlanInfo> sqlPlans = new HashMap<>();
private final HashMap<String, SparkListenerExecutorAdded> liveExecutors = new HashMap<>();

private final List<SparkSQLUtils.LineageDataset> inputLineageDatasets = new ArrayList<>();
private final List<SparkSQLUtils.LineageDataset> outputLineageDatasets = new ArrayList<>();

// There is no easy way to know if an accumulator is not useful anymore (meaning it is not part of
// an active SQL query)
// so capping the size of the collection storing them
@@ -173,6 +181,9 @@ public AbstractDatadogSparkListener(SparkConf sparkConf, String appId, String sp
/** Parent Ids of a Stage. Provide an implementation based on a specific scala version */
protected abstract int[] getStageParentIds(StageInfo info);

protected abstract List<SparkSQLUtils.LineageDataset> parseDatasetsFromLogicalPlan(
LogicalPlan logicalPlan);

@Override
public synchronized void onApplicationStart(SparkListenerApplicationStart applicationStart) {
this.applicationStart = applicationStart;
@@ -758,6 +769,7 @@ private synchronized void onSQLExecutionStart(SparkListenerSQLExecutionStart sql
private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
AgentSpan span = sqlSpans.remove(sqlEnd.executionId());
SparkAggregatedTaskMetrics metrics = sqlMetrics.remove(sqlEnd.executionId());

sqlQueries.remove(sqlEnd.executionId());
sqlPlans.remove(sqlEnd.executionId());

@@ -768,6 +780,123 @@ private synchronized void onSQLExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd)

span.finish(sqlEnd.time() * 1000);
}

try {
addLineageDatasetsToSpan(sqlEnd);
} catch (Throwable ignored) {
}
}

private void addLineageDatasetsToSpan(SparkListenerSQLExecutionEnd sqlEnd) {
if (!Config.get().isSparkDataLineageEnabled()) {
return;
}

if (null == applicationSpan) {
return;
}

if (inputLineageDatasets.size() + outputLineageDatasets.size()
>= Config.get().getSparkDataLineageLimit()) {
return;
}

Optional<LogicalPlan> logicalPlan = getAnalyzedLogicalPlan(sqlEnd);
if (!logicalPlan.isPresent()) {
return;
}

List<SparkSQLUtils.LineageDataset> datasets =
adjustForLimit(parseDatasetsFromLogicalPlan(logicalPlan.get()));
if (datasets.isEmpty()) {
return;
}

log.info(
"adding {} datasets to span for query execution id {}",
datasets.size(),
sqlEnd.executionId());

List<SparkSQLUtils.LineageDataset> inputDatasets =
datasets.stream()
.filter(dataset -> dataset.type.equals("input"))
.collect(Collectors.toList());
List<SparkSQLUtils.LineageDataset> outputDatasets =
datasets.stream()
.filter(dataset -> dataset.type.equals("output"))
.collect(Collectors.toList());

updateLineageDatasetTag(
applicationSpan, inputDatasets, inputLineageDatasets, "spark.sql.dataset.input");
updateLineageDatasetTag(
applicationSpan, outputDatasets, outputLineageDatasets, "spark.sql.dataset.output");
}

private void updateLineageDatasetTag(
AgentSpan span,
List<SparkSQLUtils.LineageDataset> datasets,
List<SparkSQLUtils.LineageDataset> lineageDatasets,
String tagPrefix) {
if (datasets.isEmpty()) {
return;
}

final int lastLineageDatasetIndex = lineageDatasets.size();
for (int i = 0; i < datasets.size(); i++) {
SparkSQLUtils.LineageDataset dataset = datasets.get(i);
int datasetIndex = lastLineageDatasetIndex + i;

span.setTag(tagPrefix + ".details." + datasetIndex + ".name", dataset.name);
span.setTag(tagPrefix + ".details." + datasetIndex + ".schema", dataset.schema);
span.setTag(tagPrefix + ".details." + datasetIndex + ".stats", dataset.stats);
span.setTag(tagPrefix + ".details." + datasetIndex + ".properties", dataset.properties);
span.setTag(tagPrefix + ".details." + datasetIndex + ".type", dataset.type);
}

lineageDatasets.addAll(datasets);
try {
applicationSpan.setTag(tagPrefix + ".json", objectMapper.writeValueAsString(lineageDatasets));
applicationSpan.setTag(tagPrefix + ".count", lineageDatasets.size());
} catch (Exception ignored) {
}
}

private List<SparkSQLUtils.LineageDataset> adjustForLimit(
List<SparkSQLUtils.LineageDataset> datasets) {
final int currentLineageDatasetSize =
inputLineageDatasets.size() + outputLineageDatasets.size();
if (currentLineageDatasetSize >= Config.get().getSparkDataLineageLimit()) {
return Collections.emptyList();
}

final int datasetAboveLimit =
currentLineageDatasetSize + datasets.size() - Config.get().getSparkDataLineageLimit();
if (datasetAboveLimit > 0) {
datasets = datasets.subList(0, datasets.size() - datasetAboveLimit);
}

return datasets;
}

private Optional<LogicalPlan> getAnalyzedLogicalPlan(SparkListenerSQLExecutionEnd sqlEnd) {
long sqlExecutionId = sqlEnd.executionId();
QueryExecution queryExecution = SQLExecution.getQueryExecution(sqlExecutionId);

if (queryExecution != null) {
return Optional.ofNullable(queryExecution.analyzed());
}

try {
Field qeField = sqlEnd.getClass().getDeclaredField("qe");
qeField.setAccessible(true);
QueryExecution qe = (QueryExecution) qeField.get(sqlEnd);
if (qe != null) {
return Optional.ofNullable(qe.analyzed());
}
} catch (Exception ignored) {
}

return Optional.empty();
}

private synchronized void onStreamingQueryStartedEvent(
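For reference, a standalone sketch (not part of the PR) of the tag layout produced by `updateLineageDatasetTag`: the per-dataset index keeps growing across SQL executions, so detail tags written for earlier queries are never overwritten, while the accumulated `.count` (and `.json`) tags are rewritten each time. Dataset names here are made up.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrates the "<prefix>.details.<index>.<field>" indexing scheme used when
// tagging lineage datasets on the application span.
public class LineageTagNamingExample {
  public static void main(String[] args) {
    Map<String, Object> tags = new LinkedHashMap<>();
    List<String> accumulated = new ArrayList<>(Arrays.asList("db.orders")); // from an earlier query
    List<String> fromThisQuery = Arrays.asList("db.customers", "db.payments");

    int offset = accumulated.size();
    for (int i = 0; i < fromThisQuery.size(); i++) {
      // Mirrors tagPrefix + ".details." + datasetIndex + ".name" in updateLineageDatasetTag
      tags.put("spark.sql.dataset.input.details." + (offset + i) + ".name", fromThisQuery.get(i));
    }
    accumulated.addAll(fromThisQuery);
    tags.put("spark.sql.dataset.input.count", accumulated.size());

    tags.forEach((k, v) -> System.out.println(k + " = " + v));
    // spark.sql.dataset.input.details.1.name = db.customers
    // spark.sql.dataset.input.details.2.name = db.payments
    // spark.sql.dataset.input.count = 3
  }
}
```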