diff --git a/core/trino-main/src/main/java/io/trino/execution/resourcegroups/InternalResourceGroup.java b/core/trino-main/src/main/java/io/trino/execution/resourcegroups/InternalResourceGroup.java index fe11ea4a6a67b..792da9a3241be 100644 --- a/core/trino-main/src/main/java/io/trino/execution/resourcegroups/InternalResourceGroup.java +++ b/core/trino-main/src/main/java/io/trino/execution/resourcegroups/InternalResourceGroup.java @@ -134,6 +134,7 @@ public class InternalResourceGroup @GuardedBy("root") private long lastStartMillis; private final CounterStat timeBetweenStartsSec = new CounterStat(); + private final CounterStat startedQueries = new CounterStat(); public InternalResourceGroup(String name, BiConsumer jmxExportListener, Executor executor) { @@ -513,6 +514,13 @@ public CounterStat getTimeBetweenStartsSec() return timeBetweenStartsSec; } + @Managed + @Nested + public CounterStat getStartedQueries() + { + return startedQueries; + } + @Managed @Override public int getSchedulingWeight() @@ -733,6 +741,10 @@ private void startInBackground(ManagedQueryExecution query) updateEligibility(); executor.execute(query::startWaitingForResources); } + // mark the time when the query was started for this group and all ancestors + for (InternalResourceGroup group = this; group != null; group = group.parent.orElse(null)) { + group.getStartedQueries().update(1); + } } public void updateGroupsAndProcessQueuedQueries() diff --git a/core/trino-main/src/test/java/io/trino/execution/resourcegroups/TestResourceGroups.java b/core/trino-main/src/test/java/io/trino/execution/resourcegroups/TestResourceGroups.java index 18211ca8a463a..6e9f45a263261 100644 --- a/core/trino-main/src/test/java/io/trino/execution/resourcegroups/TestResourceGroups.java +++ b/core/trino-main/src/test/java/io/trino/execution/resourcegroups/TestResourceGroups.java @@ -1291,6 +1291,48 @@ public void testGetResourceGroupStateInfo() assertThat(queryInfo.getResourceGroupId()).isEqualTo(Optional.of(rootB.getId())); } + @Test + public void testStartedQueries() + { + InternalResourceGroup root = new InternalResourceGroup("root", (_, _) -> {}, directExecutor()) + { + @Override + public void triggerProcessQueuedQueries() + { + // No op to allow the test fine-grained control about when to trigger the next query. + } + }; + var rootA = root.getOrCreateSubGroup("a"); + var rootA1 = rootA.getOrCreateSubGroup("1"); + var rootB = root.getOrCreateSubGroup("b"); + + var allGroups = List.of(root, rootB, rootA, rootA1); + allGroups.forEach(group -> { + group.setHardConcurrencyLimit(2); + group.setMaxQueuedQueries(100); + }); + + var queries = Stream.generate(() -> new MockManagedQueryExecutionBuilder().build()).limit(4).toArray(MockManagedQueryExecution[]::new); + + rootB.run(queries[0]); + // no values yet since there is no previous start time to compare against + assertThat(allGroups).extracting(group -> group.getStartedQueries().getTotalCount()).containsExactly(1L, 1L, 0L, 0L); + + rootA1.run(queries[1]); + assertThat(allGroups).extracting(group -> group.getStartedQueries().getTotalCount()).containsExactly(2L, 1L, 1L, 1L); + + // these should queue + rootA1.run(queries[2]); + rootA1.run(queries[3]); + assertThat(allGroups).extracting(group -> group.getStartedQueries().getTotalCount()).containsExactly(2L, 1L, 1L, 1L); + + // let q3/q4 run by draining q1/q2 + queries[0].complete(); + queries[1].complete(); + root.updateGroupsAndProcessQueuedQueries(); + assertThat(allGroups).extracting(group -> group.getStartedQueries().getTotalCount()).containsExactly(4L, 1L, 3L, 3L); + } + @Test public void testGetWaitingQueuedQueries() {