From 2e13596373e376aea0f1fe0834f42e66606e4e10 Mon Sep 17 00:00:00 2001 From: Riya Verma Date: Mon, 3 Jun 2024 16:53:43 -0700 Subject: [PATCH] Add deepCopy test case --- .../streaming/state/RocksDBSuite.scala | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala index 6086fd43846f6..ce5dcd841dc27 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming.state import java.io._ import java.nio.charset.Charset +import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import scala.language.implicitConversions @@ -874,6 +875,48 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared ) } + testWithChangelogCheckpointingEnabled("RocksDBFileManager: deepCopy") { + val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") + val originalFileManager = new RocksDBFileManager( + dfsRootDir.getAbsolutePath, Utils.createTempDir(), new Configuration) + + val file1 = RocksDBSstFile("001.sst", "dfs1", 100L) + val file2 = RocksDBSstFile("002.sst", "dfs2", 100L) + + // Get access to the private ConcurrentHashMap + val origVersionToRocksDBFilesMap: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]] + = getPrivateField(originalFileManager, "versionToRocksDBFiles") + + // Add sample files to hash maps + originalFileManager.localFilesToDfsFiles + .put("001.sst", file1) + origVersionToRocksDBFilesMap + .put(1L, Seq(file1)) + + // Check deepCopy duplicates the original hashmaps + val copiedFileManager = originalFileManager.deepCopy() + val copiedVersionToRocksDBFilesMap: ConcurrentHashMap[Long, Seq[RocksDBImmutableFile]] + = getPrivateField(copiedFileManager, "versionToRocksDBFiles") + + assert(origVersionToRocksDBFilesMap.equals(copiedVersionToRocksDBFilesMap)) + assert(originalFileManager.localFilesToDfsFiles + .equals(copiedFileManager.localFilesToDfsFiles)) + + // Add new values to original file manager hash maps + originalFileManager.localFilesToDfsFiles + .put("002.sst", file2) + + origVersionToRocksDBFilesMap + .put(2L, Seq(file2)) + + // Check deep copied and original file manager states differ + assert(origVersionToRocksDBFilesMap.containsKey(2L)) + assert(!copiedVersionToRocksDBFilesMap.containsKey(2L)) + + assert(originalFileManager.localFilesToDfsFiles.containsKey("002.sst")) + assert(!copiedFileManager.localFilesToDfsFiles.containsKey("002.sst")) + } + testWithChangelogCheckpointingEnabled("RocksDBFileManager: read and write changelog") { val dfsRootDir = new File(Utils.createTempDir().getAbsolutePath + "/state/1/1") val fileManager = new RocksDBFileManager( @@ -2303,6 +2346,12 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared } def listFiles(file: String): Seq[File] = listFiles(new File(file)) + + def getPrivateField[T](obj: Any, fieldName: String): T = { + val field = obj.getClass.getDeclaredField(fieldName) + field.setAccessible(true) + field.get(obj).asInstanceOf[T] + } } object RocksDBSuite {