From 43e8a7df5e287f05a522c70f0e08262788f3f556 Mon Sep 17 00:00:00 2001 From: maobaolong Date: Sat, 12 Oct 2024 11:48:01 +0800 Subject: [PATCH] [MINOR] improvement(server): Record more grpc process time and total metrics (#2167) ### What changes were proposed in this pull request? Record more grpc process time and total metrics ### Why are the changes needed? Especially, the reportShuffleResult and getShuffleResultForMultiPart would be the most cost time call, this PR could measure the performance of these methods. ### Does this PR introduce _any_ user-facing change? Introduced new metrics. - grpc_get_shuffle_result_for_multi_part_process_latency - grpc_report_shuffle_result_process_latency - grpc_get_shuffle_result_for_multi_part_total ### How was this patch tested? Locally. image image image --- .../server/ShuffleServerGrpcMetrics.java | 17 +++++++++++++++++ .../server/ShuffleServerGrpcService.java | 11 +++++++++++ .../server/ShuffleServerGrpcMetricsTest.java | 2 +- .../server/ShuffleServerMetricsTest.java | 2 +- 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java index f4c53ad76f..929e03d489 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcMetrics.java @@ -32,6 +32,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics { public static final String GET_SHUFFLE_DATA_METHOD = "getLocalShuffleData"; public static final String GET_MEMORY_SHUFFLE_DATA_METHOD = "getMemoryShuffleData"; public static final String GET_SHUFFLE_INDEX_METHOD = "getLocalShuffleIndex"; + public static final String GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD = + "getShuffleResultForMultiPart"; private static final String GRPC_REGISTERED_SHUFFLE = "grpc_registered_shuffle"; private static final String GRPC_SEND_SHUFFLE_DATA = "grpc_send_shuffle_data"; @@ -57,6 +59,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics { private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_TOTAL = "grpc_get_memory_shuffle_data_total"; private static final String GRPC_GET_SHUFFLE_INDEX_TOTAL = "grpc_get_local_shuffle_index_total"; + private static final String GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_TOTAL = + "grpc_get_shuffle_result_for_multi_part_total"; private static final String GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY = "grpc_send_shuffle_data_transport_latency"; @@ -71,6 +75,10 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics { "grpc_get_local_shuffle_data_process_latency"; private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY = "grpc_get_memory_shuffle_data_process_latency"; + private static final String GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_PROCESS_LATENCY = + "grpc_get_shuffle_result_for_multi_part_process_latency"; + private static final String GRPC_REPORT_SHUFFLE_RESULT_PROCESS_LATENCY = + "grpc_report_shuffle_result_process_latency"; public ShuffleServerGrpcMetrics(ShuffleServerConf shuffleServerConf, String tags) { super(shuffleServerConf, tags); @@ -126,6 +134,9 @@ public void registerMetrics() { metricsManager.addLabeledCounter(GRPC_GET_MEMORY_SHUFFLE_DATA_TOTAL)); counterMap.putIfAbsent( GET_SHUFFLE_INDEX_METHOD, metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL)); + counterMap.putIfAbsent( + GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD, + metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_TOTAL)); transportTimeSummaryMap.putIfAbsent( SEND_SHUFFLE_DATA_METHOD, @@ -146,5 +157,11 @@ public void registerMetrics() { processTimeSummaryMap.putIfAbsent( GET_MEMORY_SHUFFLE_DATA_METHOD, metricsManager.addLabeledSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY)); + processTimeSummaryMap.putIfAbsent( + REPORT_SHUFFLE_RESULT_METHOD, + metricsManager.addLabeledSummary(GRPC_REPORT_SHUFFLE_RESULT_PROCESS_LATENCY)); + processTimeSummaryMap.putIfAbsent( + GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD, + metricsManager.addLabeledSummary(GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_PROCESS_LATENCY)); } } diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java index a2b4d47d60..c9375123f4 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java @@ -852,6 +852,7 @@ public void reportShuffleResult( + "]"; try { + long start = System.currentTimeMillis(); int expectedBlockCount = partitionToBlockIds.values().stream().mapToInt(x -> x.length).sum(); LOG.info( @@ -863,6 +864,10 @@ public void reportShuffleResult( shuffleServer .getShuffleTaskManager() .addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum); + long costTime = System.currentTimeMillis() - start; + shuffleServer + .getGrpcMetrics() + .recordProcessTime(ShuffleServerGrpcMetrics.REPORT_SHUFFLE_RESULT_METHOD, costTime); if (expectedBlockCount != updatedBlockCount) { LOG.warn( "Existing {} duplicated blockIds on blockId report for appId: {}, shuffleId: {}", @@ -993,6 +998,7 @@ public void getShuffleResultForMultiPart( ByteString serializedBlockIdsBytes = ByteString.EMPTY; try { + long start = System.currentTimeMillis(); serializedBlockIds = shuffleServer .getShuffleTaskManager() @@ -1004,6 +1010,11 @@ public void getShuffleResultForMultiPart( LOG.warn(msg); } else { serializedBlockIdsBytes = UnsafeByteOperations.unsafeWrap(serializedBlockIds); + long costTime = System.currentTimeMillis() - start; + shuffleServer + .getGrpcMetrics() + .recordProcessTime( + ShuffleServerGrpcMetrics.GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD, costTime); } } catch (Exception e) { status = StatusCode.INTERNAL_ERROR; diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java index 27ef4807b2..033494349c 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerGrpcMetricsTest.java @@ -43,7 +43,7 @@ public void testLatencyMetrics() throws InterruptedException { Map sendTimeSummaryTime = metrics.getTransportTimeSummaryMap(); Map processTimeSummaryTime = metrics.getProcessTimeSummaryMap(); assertEquals(3, sendTimeSummaryTime.size()); - assertEquals(3, processTimeSummaryTime.size()); + assertEquals(5, processTimeSummaryTime.size()); Thread.sleep(1000L); assertEquals( diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java index 58a31b47a8..2e960b74b1 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java @@ -231,7 +231,7 @@ public void testGrpcMetrics() throws Exception { ObjectMapper mapper = new ObjectMapper(); JsonNode actualObj = mapper.readTree(content); assertEquals(2, actualObj.size()); - assertEquals(69, actualObj.get("metrics").size()); + assertEquals(84, actualObj.get("metrics").size()); } @Test