diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java index daae92df74..96f892962c 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java @@ -208,12 +208,8 @@ private void flushToFile(ShuffleDataFlushEvent event) { // just log the error, don't throw the exception and stop the flush thread LOG.error("Exception happened when process flush shuffle data for " + event, e); } finally { - ShuffleBuffer shuffleBuffer = event.getShuffleBuffer(); - if (shuffleBuffer != null) { - shuffleBuffer.clearInFlushBuffer(event.getEventId()); - } + cleanupFlushEventData(event); if (shuffleServer != null) { - shuffleServer.getShuffleBufferManager().releaseMemory(event.getSize(), true, false); long duration = System.currentTimeMillis() - start; if (writeSuccess) { LOG.debug("Flush to file success in " + duration + " ms and release " + event.getSize() + " bytes"); @@ -310,14 +306,22 @@ void processPendingEvents() throws Exception { addPendingEventsInternal(event); } - private void dropPendingEvent(PendingShuffleFlushEvent event) { - ShuffleServerMetrics.counterTotalDroppedEventNum.inc(); + private void cleanupFlushEventData(ShuffleDataFlushEvent event) { + ShuffleBuffer shuffleBuffer = event.getShuffleBuffer(); + if (shuffleBuffer != null) { + shuffleBuffer.clearInFlushBuffer(event.getEventId()); + } if (shuffleServer != null) { shuffleServer.getShuffleBufferManager().releaseMemory( - event.getEvent().getSize(), true, false); + event.getSize(), true, false); } } + private void dropPendingEvent(PendingShuffleFlushEvent event) { + ShuffleServerMetrics.counterTotalDroppedEventNum.inc(); + cleanupFlushEventData(event.getEvent()); + } + @VisibleForTesting void addPendingEvents(ShuffleDataFlushEvent event) { addPendingEventsInternal(new PendingShuffleFlushEvent(event));