Skip to content

Commit

Permalink
Move success condition to new function shouldSkipDeletion
Browse files Browse the repository at this point in the history
  • Loading branch information
riyaverm-db committed Jul 22, 2024
1 parent c7a4a78 commit 5363da3
Showing 1 changed file with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,40 @@ class RocksDBFileManager(
}
}

/**
* Determines whether batch deletion of stale version files should be skipped
* based on the following parameters and estimates of maximum and minimum
* versions present in the checkpoint directory.
*
* @param numVersionsToRetain Number of versions to retain for rollbacks.
* @param minVersionsToDelete Minimum number of stale versions required to trigger deletion.
* @return `true` if insufficient stale versions present, otherwise `false`.
*/
private def shouldSkipDeletion(numVersionsToRetain: Int, minVersionsToDelete: Long): Boolean = {
// If minVersionsToDelete <= 0, we call list every time maintenance is invoked
// This is the original behaviour without list api call optimization
if (minVersionsToDelete > 0) {
// When maxSeenVersion is defined, we check the if number of stale version files
// are at least the value of minVersionsToDelete for batch deletion of files
// We still proceed with deletion if maxSeenVersion isn't set to ensure the fallback
// is to clean up files if maxSeenVersion fails to be initialized
if (maxSeenVersion.isDefined) {
logInfo(log"Estimated maximum version is " +
log"${MDC(LogKeys.MAX_SEEN_VERSION, maxSeenVersion.get)}" +
log" and minimum version is ${MDC(LogKeys.MIN_SEEN_VERSION, minSeenVersion)}")
val versionsToDelete = maxSeenVersion.get - minSeenVersion + 1 - numVersionsToRetain
if (versionsToDelete < minVersionsToDelete) {
logInfo(log"Skipping deleting files." +
log" Need at least ${MDC(LogKeys.MIN_VERSIONS_TO_DELETE, minVersionsToDelete)}" +
log" stale versions for batch deletion but found only" +
log" ${MDC(LogKeys.VERSIONS_TO_DELETE, versionsToDelete)}.")
return true
}
}
}
false
}

/**
* Delete old versions by deleting the associated version and SST files.
* At a high-level, this method finds which versions to delete, and which SST files that were
Expand Down Expand Up @@ -442,27 +476,8 @@ class RocksDBFileManager(
* set of SST files.
*/
def deleteOldVersions(numVersionsToRetain: Int, minVersionsToDelete: Long = 0): Unit = {
// If minVersionsToDelete <= 0, we call list every time maintenance is invoked
// This is the original behaviour without list api call optimization
if (minVersionsToDelete > 0) {
// When maxSeenVersion is defined, we check the if number of stale version files
// are at least the value of minVersionsToDelete for batch deletion of files
// We still proceed with deletion if maxSeenVersion isn't set to ensure the fallback
// is to clean up files if maxSeenVersion fails to be initialized
if (maxSeenVersion.isDefined) {
logInfo(log"Estimated maximum version is " +
log"${MDC(LogKeys.MAX_SEEN_VERSION, maxSeenVersion.get)}" +
log" and minimum version is ${MDC(LogKeys.MIN_SEEN_VERSION, minSeenVersion)}")
val versionsToDelete = maxSeenVersion.get - minSeenVersion + 1 - numVersionsToRetain
if (versionsToDelete < minVersionsToDelete) {
logInfo(log"Skipping deleting files." +
log" Need at least ${MDC(LogKeys.MIN_VERSIONS_TO_DELETE, minVersionsToDelete)}" +
log" stale versions for batch deletion but found only" +
log" ${MDC(LogKeys.VERSIONS_TO_DELETE, versionsToDelete)}.")
return
}
}
}
// Check if enough stale version files present
if (shouldSkipDeletion(numVersionsToRetain, minVersionsToDelete)) return

val path = new Path(dfsRootDir)
val allFiles = fm.list(path).map(_.getPath)
Expand Down

0 comments on commit 5363da3

Please sign in to comment.