[BUG] Potential memory leak when encountering disk unhealthy (#370)
### What changes were proposed in this pull request?

Fix a potential memory leak that occurs when a disk becomes unhealthy.

### Why are the changes needed?

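When a disk is unhealthy and a pending flush event (`PendingShuffleFlushEvent`) exceeds its timeout, the event is dropped and its memory is released in the buffer manager's accounting. In the current codebase, however, the drop path does not release the reference to the buffered data, which causes a memory leak.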
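
To make the failure mode concrete, here is a minimal sketch. It does not use the project's classes: `FlushLeakSketch`, `BufferSketch`, `MemoryCounter`, and both `drop...` methods are hypothetical stand-ins. It only illustrates the general point that decrementing a memory counter and dropping the reference to the data are two separate steps.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in types; they model the idea and are not the project's API.
class FlushLeakSketch {

  // Models a buffer that keeps data reachable while a flush is in progress.
  static class BufferSketch {
    private final Map<Long, byte[]> inFlush = new HashMap<>();

    void startFlush(long eventId, byte[] data) {
      inFlush.put(eventId, data);
    }

    void clearInFlush(long eventId) {
      inFlush.remove(eventId);
    }

    int inFlushCount() {
      return inFlush.size();
    }
  }

  // Models the bookkeeping of how much memory is considered in use.
  static class MemoryCounter {
    private long used;

    void reserve(long size) {
      used += size;
    }

    void release(long size) {
      used -= size;
    }

    long used() {
      return used;
    }
  }

  // Drop path that only corrects the accounting (the leaking behavior).
  static void dropAccountingOnly(MemoryCounter counter, long size) {
    counter.release(size);
  }

  // Drop path that also removes the data reference.
  static void dropWithCleanup(BufferSketch buffer, MemoryCounter counter,
                              long eventId, long size) {
    buffer.clearInFlush(eventId);
    counter.release(size);
  }

  public static void main(String[] args) {
    BufferSketch buffer = new BufferSketch();
    MemoryCounter counter = new MemoryCounter();
    byte[] data = new byte[1024];

    counter.reserve(data.length);
    buffer.startFlush(1L, data);

    dropAccountingOnly(counter, data.length);
    // The counter reports zero, yet the buffer still references the array.
    System.out.println("used=" + counter.used() + ", inFlush=" + buffer.inFlushCount());
  }
}
```

In this sketch the counter returns to zero while the array stays reachable through the map, so the garbage collector cannot reclaim it. That is the kind of divergence between accounting and actual heap usage described above.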

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

No need.
zuston authored and kaijchen committed Nov 30, 2022
1 parent 753f7b3 commit 36759f3
Showing 1 changed file with 12 additions and 8 deletions.
@@ -208,12 +208,8 @@ private void flushToFile(ShuffleDataFlushEvent event) {
       // just log the error, don't throw the exception and stop the flush thread
       LOG.error("Exception happened when process flush shuffle data for " + event, e);
     } finally {
-      ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
-      if (shuffleBuffer != null) {
-        shuffleBuffer.clearInFlushBuffer(event.getEventId());
-      }
+      cleanupFlushEventData(event);
       if (shuffleServer != null) {
-        shuffleServer.getShuffleBufferManager().releaseMemory(event.getSize(), true, false);
         long duration = System.currentTimeMillis() - start;
         if (writeSuccess) {
           LOG.debug("Flush to file success in " + duration + " ms and release " + event.getSize() + " bytes");
@@ -310,14 +306,22 @@ void processPendingEvents() throws Exception {
     addPendingEventsInternal(event);
   }
 
-  private void dropPendingEvent(PendingShuffleFlushEvent event) {
-    ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+  private void cleanupFlushEventData(ShuffleDataFlushEvent event) {
+    ShuffleBuffer shuffleBuffer = event.getShuffleBuffer();
+    if (shuffleBuffer != null) {
+      shuffleBuffer.clearInFlushBuffer(event.getEventId());
+    }
     if (shuffleServer != null) {
       shuffleServer.getShuffleBufferManager().releaseMemory(
-          event.getEvent().getSize(), true, false);
+          event.getSize(), true, false);
     }
   }
 
+  private void dropPendingEvent(PendingShuffleFlushEvent event) {
+    ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
+    cleanupFlushEventData(event.getEvent());
+  }
+
   @VisibleForTesting
   void addPendingEvents(ShuffleDataFlushEvent event) {
     addPendingEventsInternal(new PendingShuffleFlushEvent(event));
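
The shape of the change, one cleanup routine shared by the normal flush path and the drop path, can be sketched in isolation. This is a simplified model under stated assumptions, not the project's code: `SharedCleanupSketch`, `register`, `flush`, `drop`, and `cleanup` are hypothetical names, and a `Runnable` stands in for the actual write.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model: one cleanup routine used by every exit path of an event.
class SharedCleanupSketch {
  private final Set<Long> inFlush = new HashSet<>();
  private long usedBytes;

  // Records that an event's data is held and accounted for.
  void register(long eventId, long size) {
    inFlush.add(eventId);
    usedBytes += size;
  }

  // Single place that undoes register(): drops the reference and the accounting.
  private void cleanup(long eventId, long size) {
    inFlush.remove(eventId);
    usedBytes -= size;
  }

  // Normal path: cleanup runs in finally, even if the write fails.
  void flush(long eventId, long size, Runnable write) {
    try {
      write.run();
    } catch (RuntimeException e) {
      // Log and continue so the caller's flush loop is not stopped.
      System.err.println("flush failed: " + e.getMessage());
    } finally {
      cleanup(eventId, size);
    }
  }

  // Drop path: the event is abandoned without a write, but cleanup is identical.
  void drop(long eventId, long size) {
    cleanup(eventId, size);
  }

  int inFlushCount() {
    return inFlush.size();
  }

  long usedBytes() {
    return usedBytes;
  }

  public static void main(String[] args) {
    SharedCleanupSketch manager = new SharedCleanupSketch();
    manager.register(1L, 100);
    manager.register(2L, 200);

    // A failing write still ends in cleanup.
    manager.flush(1L, 100, () -> {
      throw new IllegalStateException("disk unhealthy");
    });
    // A dropped event goes through the same cleanup.
    manager.drop(2L, 200);

    System.out.println("used=" + manager.usedBytes() + ", inFlush=" + manager.inFlushCount());
  }
}
```

Routing both paths through one helper means a future exit path only has to call that helper, rather than remember each release step separately.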
