Skip to content

Commit

Permalink
Add deepCopy test case
Browse files Browse the repository at this point in the history
  • Loading branch information
riyaverm-db committed Jun 3, 2024
1 parent 42a7752 commit 2e13596
Showing 1 changed file with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state

import java.io._
import java.nio.charset.Charset
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable
import scala.language.implicitConversions
Expand Down Expand Up @@ -874,6 +875,48 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
)
}

testWithChangelogCheckpointingEnabled("RocksDBFileManager: deepCopy") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
val originalFileManager = new RocksDBFileManager(
dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration)

val file1 = RocksDBSstFile("001.sst", "dfs1", 100L)
val file2 = RocksDBSstFile("002.sst", "dfs2", 100L)

// Get access to the private ConcurrentHashMap
val origVersionToRocksDBFilesMap: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
= getPrivateField(originalFileManager, "versionToRocksDBFiles")

// Add sample files to hash maps
originalFileManager.localFilesToDfsFiles
.put("001.sst", file1)
origVersionToRocksDBFilesMap
.put(1L, Seq(file1))

// Check deepCopy duplicates the original hashmaps
val copiedFileManager = originalFileManager.deepCopy()
val copiedVersionToRocksDBFilesMap: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]]
= getPrivateField(copiedFileManager, "versionToRocksDBFiles")

assert(origVersionToRocksDBFilesMap.equals(copiedVersionToRocksDBFilesMap))
assert(originalFileManager.localFilesToDfsFiles
.equals(copiedFileManager.localFilesToDfsFiles))

// Add new values to original file manager hash maps
originalFileManager.localFilesToDfsFiles
.put("002.sst", file2)

origVersionToRocksDBFilesMap
.put(2L, Seq(file2))

// Check deep copied and original file manager states differ
assert(origVersionToRocksDBFilesMap.containsKey(2L))
assert(!copiedVersionToRocksDBFilesMap.containsKey(2L))

assert(originalFileManager.localFilesToDfsFiles.containsKey("002.sst"))
assert(!copiedFileManager.localFilesToDfsFiles.containsKey("002.sst"))
}

testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") {
val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1")
val fileManager = new RocksDBFileManager(
Expand Down Expand Up @@ -2303,6 +2346,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared
}

def listFiles(file: String): Seq[File] = listFiles(new File(file))

def getPrivateField[T](obj: Any, fieldName: String): T = {
val field = obj.getClass.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(obj).asInstanceOf[T]
}
}

object RocksDBSuite {
Expand Down

0 comments on commit 2e13596

Please sign in to comment.