Skip to content

Commit

Permalink
Add more pre-task distribution statistics in query completion event
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Apr 25, 2024
1 parent 2e1759a commit 741dfec
Show file tree
Hide file tree
Showing 5 changed files with 597 additions and 7 deletions.
107 changes: 101 additions & 6 deletions core/trino-main/src/main/java/io/trino/event/QueryMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@
import io.trino.server.BasicQueryInfo;
import io.trino.spi.ErrorCode;
import io.trino.spi.QueryId;
import io.trino.spi.eventlistener.DoubleSymmetricDistribution;
import io.trino.spi.eventlistener.LongDistribution;
import io.trino.spi.eventlistener.LongSymmetricDistribution;
import io.trino.spi.eventlistener.OutputColumnMetadata;
import io.trino.spi.eventlistener.QueryCompletedEvent;
import io.trino.spi.eventlistener.QueryContext;
Expand Down Expand Up @@ -780,25 +782,77 @@ private List<StageTaskStatistics> getStageTaskStatistics(QueryInfo queryInfo)
}

ImmutableList.Builder<StageTaskStatistics> builder = ImmutableList.builder();
populateStageTaskStatistics(queryInfo.getOutputStage().get(), builder);
populateStageTaskStatistics(queryInfo, queryInfo.getOutputStage().get(), builder);

return builder.build();
}

private void populateStageTaskStatistics(StageInfo stageInfo, ImmutableList.Builder<StageTaskStatistics> builder)
private void populateStageTaskStatistics(QueryInfo queryInfo, StageInfo stageInfo, ImmutableList.Builder<StageTaskStatistics> builder)
{
builder.add(computeStageTaskStatistics(stageInfo));
builder.add(computeStageTaskStatistics(queryInfo, stageInfo));
for (StageInfo subStage : stageInfo.getSubStages()) {
populateStageTaskStatistics(subStage, builder);
populateStageTaskStatistics(queryInfo, subStage, builder);
}
}

private StageTaskStatistics computeStageTaskStatistics(StageInfo stageInfo)
private StageTaskStatistics computeStageTaskStatistics(QueryInfo queryInfo, StageInfo stageInfo)
{
long queryCreateTimeMillis = queryInfo.getQueryStats().getCreateTime().getMillis();
LongSymmetricDistribution createTimeMillisDistribution = getTasksSymmetricDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getCreateTime().getMillis() - queryCreateTimeMillis));
LongSymmetricDistribution firstStartTimeMillisDistribution = getTasksSymmetricDistribution(stageInfo, taskInfo -> Optional.ofNullable(taskInfo.getStats().getFirstStartTime()).map(value -> value.getMillis() - queryCreateTimeMillis));
LongSymmetricDistribution lastStartTimeMillisDistribution = getTasksSymmetricDistribution(stageInfo, taskInfo -> Optional.ofNullable(taskInfo.getStats().getLastStartTime()).map(value -> value.getMillis() - queryCreateTimeMillis));
LongSymmetricDistribution terminatingStartTimeMillisDistribution = getTasksSymmetricDistribution(stageInfo, taskInfo -> Optional.ofNullable(taskInfo.getStats().getTerminatingStartTime()).map(value -> value.getMillis() - queryCreateTimeMillis));
LongSymmetricDistribution lastEndTimeMillisDistribution = getTasksSymmetricDistribution(stageInfo, taskInfo -> Optional.ofNullable(taskInfo.getStats().getLastEndTime()).map(value -> value.getMillis() - queryCreateTimeMillis));
LongSymmetricDistribution endTimeMillisDistribution = getTasksSymmetricDistribution(stageInfo, taskInfo -> Optional.ofNullable(taskInfo.getStats().getEndTime()).map(value -> value.getMillis() - queryCreateTimeMillis));

Optional<Long> queryExecutionTime = Optional.ofNullable(queryInfo.getQueryStats().getEndTime()).map(value -> value.getMillis() - queryCreateTimeMillis);
DoubleSymmetricDistribution createTimeScaledDistribution;
DoubleSymmetricDistribution firstStartTimeScaledDistribution;
DoubleSymmetricDistribution lastStartTimeScaledDistribution;
DoubleSymmetricDistribution terminatingStartTimeScaledDistribution;
DoubleSymmetricDistribution lastEndTimeScaledDistribution;
DoubleSymmetricDistribution endTimeScaledDistribution;
if (queryExecutionTime.isPresent()) {
createTimeScaledDistribution = scaleDistribution(createTimeMillisDistribution, queryExecutionTime.orElseThrow());
firstStartTimeScaledDistribution = scaleDistribution(firstStartTimeMillisDistribution, queryExecutionTime.orElseThrow());
lastStartTimeScaledDistribution = scaleDistribution(lastStartTimeMillisDistribution, queryExecutionTime.orElseThrow());
terminatingStartTimeScaledDistribution = scaleDistribution(terminatingStartTimeMillisDistribution, queryExecutionTime.orElseThrow());
lastEndTimeScaledDistribution = scaleDistribution(lastEndTimeMillisDistribution, queryExecutionTime.orElseThrow());
endTimeScaledDistribution = scaleDistribution(endTimeMillisDistribution, queryExecutionTime.orElseThrow());
}
else {
createTimeScaledDistribution = DoubleSymmetricDistribution.ZERO;
firstStartTimeScaledDistribution = DoubleSymmetricDistribution.ZERO;
lastStartTimeScaledDistribution = DoubleSymmetricDistribution.ZERO;
terminatingStartTimeScaledDistribution = DoubleSymmetricDistribution.ZERO;
lastEndTimeScaledDistribution = DoubleSymmetricDistribution.ZERO;
endTimeScaledDistribution = DoubleSymmetricDistribution.ZERO;
}
return new StageTaskStatistics(
stageInfo.getStageId().getId(),
stageInfo.getTasks().size(),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getTotalCpuTime().toMillis())));
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getTotalCpuTime().toMillis())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getTotalScheduledTime().toMillis())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getPeakUserMemoryReservation().toBytes())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getRawInputDataSize().toBytes())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getRawInputPositions())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getProcessedInputDataSize().toBytes())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getProcessedInputPositions())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getOutputDataSize().toBytes())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of(taskInfo.getStats().getOutputPositions())),
getTasksDistribution(stageInfo, taskInfo -> Optional.of((long) taskInfo.getStats().getTotalDrivers())),
createTimeMillisDistribution,
firstStartTimeMillisDistribution,
lastStartTimeMillisDistribution,
terminatingStartTimeMillisDistribution,
lastEndTimeMillisDistribution,
endTimeMillisDistribution,
createTimeScaledDistribution,
firstStartTimeScaledDistribution,
lastStartTimeScaledDistribution,
terminatingStartTimeScaledDistribution,
lastEndTimeScaledDistribution,
endTimeScaledDistribution);
}

private static LongDistribution getTasksDistribution(StageInfo stageInfo, Function<TaskInfo, Optional<Long>> metricFunction)
Expand All @@ -821,6 +875,47 @@ private static LongDistribution getTasksDistribution(StageInfo stageInfo, Functi
firstNonNaN(snapshot.getTotal() / snapshot.getCount(), 0.0));
}

private static LongSymmetricDistribution getTasksSymmetricDistribution(StageInfo stageInfo, Function<TaskInfo, Optional<Long>> metricFunction)
{
Distribution distribution = new Distribution();
for (TaskInfo taskInfo : stageInfo.getTasks()) {
metricFunction.apply(taskInfo).ifPresent(distribution::add);
}
DistributionSnapshot snapshot = distribution.snapshot();
return new LongSymmetricDistribution(
(long) snapshot.getP01(),
(long) snapshot.getP05(),
(long) snapshot.getP10(),
(long) snapshot.getP25(),
(long) snapshot.getP50(),
(long) snapshot.getP75(),
(long) snapshot.getP90(),
(long) snapshot.getP95(),
(long) snapshot.getP99(),
(long) snapshot.getMin(),
(long) snapshot.getMax(),
(long) snapshot.getTotal(),
firstNonNaN(snapshot.getTotal() / snapshot.getCount(), 0.0));
}

private static DoubleSymmetricDistribution scaleDistribution(LongSymmetricDistribution distribution, long scaleFactor)
{
return new DoubleSymmetricDistribution(
(double) distribution.getP01() / scaleFactor,
(double) distribution.getP05() / scaleFactor,
(double) distribution.getP10() / scaleFactor,
(double) distribution.getP25() / scaleFactor,
(double) distribution.getP50() / scaleFactor,
(double) distribution.getP75() / scaleFactor,
(double) distribution.getP90() / scaleFactor,
(double) distribution.getP95() / scaleFactor,
(double) distribution.getP99() / scaleFactor,
(double) distribution.getMin() / scaleFactor,
(double) distribution.getMax() / scaleFactor,
(double) distribution.getTotal(),
(double) distribution.getAverage() / scaleFactor);
}

private static class FragmentNode
{
private final PlanFragmentId fragmentId;
Expand Down
6 changes: 6 additions & 0 deletions core/trino-spi/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@
<code>java.class.removed</code>
<old>@interface io.trino.spi.function.RemoveInputFunction</old>
</item>
<item>
<ignore>true</ignore>
<code>java.method.numberOfParametersChanged</code>
<old>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageOutputBufferUtilization&gt;, java.util.function.Supplier&lt;java.util.List&lt;java.lang.String&gt;&gt;, java.util.List&lt;io.trino.spi.eventlistener.QueryPlanOptimizerStatistics&gt;, java.util.Optional&lt;java.lang.String&gt;)</old>
<new>method void io.trino.spi.eventlistener.QueryStatistics::&lt;init&gt;(java.time.Duration, java.time.Duration, java.time.Duration, java.time.Duration, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, java.util.Optional&lt;java.time.Duration&gt;, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, long, double, double, java.util.List&lt;io.trino.spi.eventlistener.StageGcStatistics&gt;, int, boolean, java.util.List&lt;io.trino.spi.eventlistener.StageCpuDistribution&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageOutputBufferUtilization&gt;, java.util.List&lt;io.trino.spi.eventlistener.StageTaskStatistics&gt;, java.util.function.Supplier&lt;java.util.List&lt;java.lang.String&gt;&gt;, java.util.List&lt;io.trino.spi.eventlistener.QueryPlanOptimizerStatistics&gt;, java.util.Optional&lt;java.lang.String&gt;)</new>
</item>
</differences>
</revapi.differences>
</analysisConfiguration>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.spi.eventlistener;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.trino.spi.Unstable;

public class DoubleSymmetricDistribution
{
public static final DoubleSymmetricDistribution ZERO = new DoubleSymmetricDistribution(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0);

private final double p01;
private final double p05;
private final double p10;
private final double p25;
private final double p50;
private final double p75;
private final double p90;
private final double p95;
private final double p99;
private final double min;
private final double max;
private final double total;
private final double average;

@JsonCreator
@Unstable
public DoubleSymmetricDistribution(
@JsonProperty("p01") double p01,
@JsonProperty("p05") double p05,
@JsonProperty("p10") double p10,
@JsonProperty("p25") double p25,
@JsonProperty("p50") double p50,
@JsonProperty("p75") double p75,
@JsonProperty("p90") double p90,
@JsonProperty("p95") double p95,
@JsonProperty("p99") double p99,
@JsonProperty("min") double min,
@JsonProperty("max") double max,
@JsonProperty("total") double total,
@JsonProperty("average") double average)
{
this.p01 = p01;
this.p05 = p05;
this.p10 = p10;
this.p25 = p25;
this.p50 = p50;
this.p75 = p75;
this.p90 = p90;
this.p95 = p95;
this.p99 = p99;
this.min = min;
this.max = max;
this.total = total;
this.average = average;
}

@JsonProperty
public double getP01()
{
return p01;
}

@JsonProperty
public double getP05()
{
return p05;
}

@JsonProperty
public double getP10()
{
return p10;
}

@JsonProperty
public double getP25()
{
return p25;
}

@JsonProperty
public double getP50()
{
return p50;
}

@JsonProperty
public double getP75()
{
return p75;
}

@JsonProperty
public double getP90()
{
return p90;
}

@JsonProperty
public double getP95()
{
return p95;
}

@JsonProperty
public double getP99()
{
return p99;
}

@JsonProperty
public double getMin()
{
return min;
}

@JsonProperty
public double getMax()
{
return max;
}

@JsonProperty
public double getTotal()
{
return total;
}

@JsonProperty
public double getAverage()
{
return average;
}
}
Loading

0 comments on commit 741dfec

Please sign in to comment.