Skip to content

Commit

Permalink
[MINOR] improvement(server): Record more grpc process time and total …
Browse files Browse the repository at this point in the history
…metrics (#2167)

### What changes were proposed in this pull request?

Record more grpc process time and total metrics

### Why are the changes needed?

Especially, the reportShuffleResult and getShuffleResultForMultiPart would be the most cost time call, this PR could measure  the performance of these methods.

### Does this PR introduce _any_ user-facing change?

Introduced new metrics.
- grpc_get_shuffle_result_for_multi_part_process_latency
- grpc_report_shuffle_result_process_latency
- grpc_get_shuffle_result_for_multi_part_total

### How was this patch tested?

Locally.


<img width="530" alt="image" src="https://github.com/user-attachments/assets/372efc17-585c-47ad-a2e2-edb4552141b9">

<img width="642" alt="image" src="https://github.com/user-attachments/assets/a574c5da-c31b-4c26-80e6-1e1f24dfcc5e">

<img width="638" alt="image" src="https://github.com/user-attachments/assets/d2dee73e-8b2b-4eb4-ae0f-fbfb494c958c">
  • Loading branch information
maobaolong authored Oct 12, 2024
1 parent be23f61 commit 43e8a7d
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
public static final String GET_SHUFFLE_DATA_METHOD = "getLocalShuffleData";
public static final String GET_MEMORY_SHUFFLE_DATA_METHOD = "getMemoryShuffleData";
public static final String GET_SHUFFLE_INDEX_METHOD = "getLocalShuffleIndex";
public static final String GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD =
"getShuffleResultForMultiPart";

private static final String GRPC_REGISTERED_SHUFFLE = "grpc_registered_shuffle";
private static final String GRPC_SEND_SHUFFLE_DATA = "grpc_send_shuffle_data";
Expand All @@ -57,6 +59,8 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_TOTAL =
"grpc_get_memory_shuffle_data_total";
private static final String GRPC_GET_SHUFFLE_INDEX_TOTAL = "grpc_get_local_shuffle_index_total";
private static final String GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_TOTAL =
"grpc_get_shuffle_result_for_multi_part_total";

private static final String GRPC_SEND_SHUFFLE_DATA_TRANSPORT_LATENCY =
"grpc_send_shuffle_data_transport_latency";
Expand All @@ -71,6 +75,10 @@ public class ShuffleServerGrpcMetrics extends GRPCMetrics {
"grpc_get_local_shuffle_data_process_latency";
private static final String GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY =
"grpc_get_memory_shuffle_data_process_latency";
private static final String GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_PROCESS_LATENCY =
"grpc_get_shuffle_result_for_multi_part_process_latency";
private static final String GRPC_REPORT_SHUFFLE_RESULT_PROCESS_LATENCY =
"grpc_report_shuffle_result_process_latency";

public ShuffleServerGrpcMetrics(ShuffleServerConf shuffleServerConf, String tags) {
super(shuffleServerConf, tags);
Expand Down Expand Up @@ -126,6 +134,9 @@ public void registerMetrics() {
metricsManager.addLabeledCounter(GRPC_GET_MEMORY_SHUFFLE_DATA_TOTAL));
counterMap.putIfAbsent(
GET_SHUFFLE_INDEX_METHOD, metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_INDEX_TOTAL));
counterMap.putIfAbsent(
GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD,
metricsManager.addLabeledCounter(GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_TOTAL));

transportTimeSummaryMap.putIfAbsent(
SEND_SHUFFLE_DATA_METHOD,
Expand All @@ -146,5 +157,11 @@ public void registerMetrics() {
processTimeSummaryMap.putIfAbsent(
GET_MEMORY_SHUFFLE_DATA_METHOD,
metricsManager.addLabeledSummary(GRPC_GET_MEMORY_SHUFFLE_DATA_PROCESS_LATENCY));
processTimeSummaryMap.putIfAbsent(
REPORT_SHUFFLE_RESULT_METHOD,
metricsManager.addLabeledSummary(GRPC_REPORT_SHUFFLE_RESULT_PROCESS_LATENCY));
processTimeSummaryMap.putIfAbsent(
GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD,
metricsManager.addLabeledSummary(GRPC_GET_SHUFFLE_RESULT_FOR_MULTI_PART_PROCESS_LATENCY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -852,6 +852,7 @@ public void reportShuffleResult(
+ "]";

try {
long start = System.currentTimeMillis();
int expectedBlockCount =
partitionToBlockIds.values().stream().mapToInt(x -> x.length).sum();
LOG.info(
Expand All @@ -863,6 +864,10 @@ public void reportShuffleResult(
shuffleServer
.getShuffleTaskManager()
.addFinishedBlockIds(appId, shuffleId, partitionToBlockIds, bitmapNum);
long costTime = System.currentTimeMillis() - start;
shuffleServer
.getGrpcMetrics()
.recordProcessTime(ShuffleServerGrpcMetrics.REPORT_SHUFFLE_RESULT_METHOD, costTime);
if (expectedBlockCount != updatedBlockCount) {
LOG.warn(
"Existing {} duplicated blockIds on blockId report for appId: {}, shuffleId: {}",
Expand Down Expand Up @@ -993,6 +998,7 @@ public void getShuffleResultForMultiPart(
ByteString serializedBlockIdsBytes = ByteString.EMPTY;

try {
long start = System.currentTimeMillis();
serializedBlockIds =
shuffleServer
.getShuffleTaskManager()
Expand All @@ -1004,6 +1010,11 @@ public void getShuffleResultForMultiPart(
LOG.warn(msg);
} else {
serializedBlockIdsBytes = UnsafeByteOperations.unsafeWrap(serializedBlockIds);
long costTime = System.currentTimeMillis() - start;
shuffleServer
.getGrpcMetrics()
.recordProcessTime(
ShuffleServerGrpcMetrics.GET_SHUFFLE_RESULT_FOR_MULTI_PART_METHOD, costTime);
}
} catch (Exception e) {
status = StatusCode.INTERNAL_ERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void testLatencyMetrics() throws InterruptedException {
Map<String, Summary.Child> sendTimeSummaryTime = metrics.getTransportTimeSummaryMap();
Map<String, Summary.Child> processTimeSummaryTime = metrics.getProcessTimeSummaryMap();
assertEquals(3, sendTimeSummaryTime.size());
assertEquals(3, processTimeSummaryTime.size());
assertEquals(5, processTimeSummaryTime.size());

Thread.sleep(1000L);
assertEquals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ public void testGrpcMetrics() throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode actualObj = mapper.readTree(content);
assertEquals(2, actualObj.size());
assertEquals(69, actualObj.get("metrics").size());
assertEquals(84, actualObj.get("metrics").size());
}

@Test
Expand Down

0 comments on commit 43e8a7d

Please sign in to comment.