diff --git a/coordinator/core/src/main/kotlin/net/consensys/zkevm/ethereum/coordination/blob/BlobCompressionProofCoordinator.kt b/coordinator/core/src/main/kotlin/net/consensys/zkevm/ethereum/coordination/blob/BlobCompressionProofCoordinator.kt index 935c133b9..c3b55de48 100644 --- a/coordinator/core/src/main/kotlin/net/consensys/zkevm/ethereum/coordination/blob/BlobCompressionProofCoordinator.kt +++ b/coordinator/core/src/main/kotlin/net/consensys/zkevm/ethereum/coordination/blob/BlobCompressionProofCoordinator.kt @@ -21,8 +21,8 @@ import net.consensys.zkevm.persistence.blob.BlobsRepository import org.apache.logging.log4j.LogManager import org.apache.logging.log4j.Logger import tech.pegasys.teku.infrastructure.async.SafeFuture -import java.util.concurrent.ArrayBlockingQueue import java.util.concurrent.CompletableFuture +import java.util.concurrent.LinkedBlockingDeque import kotlin.time.Duration class BlobCompressionProofCoordinator( @@ -37,7 +37,7 @@ class BlobCompressionProofCoordinator( ) : BlobCreationHandler, LongRunningService { private val log: Logger = LogManager.getLogger(this::class.java) private val defaultQueueCapacity = 1000 // Should be more than blob submission limit - private val blobsToHandle = ArrayBlockingQueue(defaultQueueCapacity) + private val blobsToHandle = LinkedBlockingDeque(defaultQueueCapacity) private var timerId: Long? = null private lateinit var blobPollingAction: Handler private val blobsCounter = metricsFacade.createCounter( @@ -181,10 +181,7 @@ class BlobCompressionProofCoordinator( override fun start(): CompletableFuture { if (timerId == null) { blobPollingAction = Handler { - handleBlobsFromTheQueue().whenComplete { _, error -> - error?.let { - log.error("Error polling blobs for aggregation: errorMessage={}", error.message, error) - } + handleBlobFromTheQueue().whenComplete { _, _ -> timerId = vertx.setTimer(config.pollingInterval.inWholeMilliseconds, blobPollingAction) } } @@ -193,22 +190,22 @@ class BlobCompressionProofCoordinator( return SafeFuture.completedFuture(Unit) } - private fun handleBlobsFromTheQueue(): SafeFuture { - var blobsHandlingFuture = SafeFuture.completedFuture(Unit) - if (blobsToHandle.isNotEmpty()) { + private fun handleBlobFromTheQueue(): SafeFuture { + return if (blobsToHandle.isNotEmpty()) { val blobToHandle = blobsToHandle.poll() - blobsHandlingFuture = blobsHandlingFuture.thenCompose { - sendBlobToCompressionProver(blobToHandle).whenException { exception -> - log.error( - "Error in sending blob to compression prover: blob={} errorMessage={} ", + sendBlobToCompressionProver(blobToHandle) + .whenException { exception -> + blobsToHandle.putFirst(blobToHandle) + log.warn( + "Error handling blob from BlobCompressionProofCoordinator queue: blob={} errorMessage={}", blobToHandle.intervalString(), exception.message, exception ) } - } + } else { + SafeFuture.completedFuture(Unit) } - return blobsHandlingFuture } override fun stop(): CompletableFuture { diff --git a/coordinator/core/src/test/kotlin/net/consensys/zkevm/ethereum/coordination/blob/RollingBlobShnarfCalculatorTest.kt b/coordinator/core/src/test/kotlin/net/consensys/zkevm/ethereum/coordination/blob/RollingBlobShnarfCalculatorTest.kt index e19eca467..29ba57e94 100644 --- a/coordinator/core/src/test/kotlin/net/consensys/zkevm/ethereum/coordination/blob/RollingBlobShnarfCalculatorTest.kt +++ b/coordinator/core/src/test/kotlin/net/consensys/zkevm/ethereum/coordination/blob/RollingBlobShnarfCalculatorTest.kt @@ -10,6 +10,7 @@ import org.junit.jupiter.api.assertThrows import org.mockito.kotlin.any import org.mockito.kotlin.eq import org.mockito.kotlin.mock +import org.mockito.kotlin.reset import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.whenever @@ -24,7 +25,7 @@ class RollingBlobShnarfCalculatorTest { private lateinit var mockBlobsRepository: BlobsRepository private lateinit var firstBlob: BlobRecord private val firstBlobEndBlockNumber = 100UL - private val fakeBlobShnarfCalculator = FakeBlobShnarfCalculator() + private val mockBlobShnarfCalculator = mock() @BeforeEach fun beforeEach() { @@ -44,8 +45,21 @@ class RollingBlobShnarfCalculatorTest { } } + whenever(mockBlobShnarfCalculator.calculateShnarf(any(), any(), any(), any(), any())).thenReturn( + ShnarfResult( + dataHash = Random.nextBytes(32), + snarkHash = Random.nextBytes(32), + expectedX = Random.nextBytes(32), + expectedY = Random.nextBytes(32), + expectedShnarf = Random.nextBytes(32), + commitment = Random.nextBytes(48), + kzgProofContract = Random.nextBytes(48), + kzgProofSideCar = Random.nextBytes(48) + ) + ) + rollingBlobShnarfCalculator = RollingBlobShnarfCalculator( - blobShnarfCalculator = fakeBlobShnarfCalculator, + blobShnarfCalculator = mockBlobShnarfCalculator, blobsRepository = mockBlobsRepository, genesisShnarf = ByteArray(32) ) @@ -125,4 +139,29 @@ class RollingBlobShnarfCalculatorTest { "is not equal to parent blob end block number=$secondBlobEndBlockNumber + 1" ) } + + @Test + fun `returns failed future when shnarf calculator throws exception`() { + reset(mockBlobShnarfCalculator) + whenever(mockBlobShnarfCalculator.calculateShnarf(any(), any(), any(), any(), any())) + .thenThrow(RuntimeException("Error while calculating Shnarf")) + val secondBlobStartBlockNumber = firstBlobEndBlockNumber + 1UL + val secondBlobEndBlockNumber = firstBlobEndBlockNumber + 100UL + + val exception = assertThrows { + rollingBlobShnarfCalculator.calculateShnarf( + compressedData = Random.nextBytes(100), + parentStateRootHash = Random.nextBytes(32), + finalStateRootHash = Random.nextBytes(32), + conflationOrder = BlockIntervals(secondBlobStartBlockNumber, listOf(secondBlobEndBlockNumber)) + ).get() + } + + assertThat(exception).isNotNull() + assertThat(exception).hasCauseInstanceOf(RuntimeException::class.java) + assertThat(exception.cause).hasMessage("Error while calculating Shnarf") + + verify(mockBlobsRepository, times(1)).findBlobByEndBlockNumber(any()) + verify(mockBlobsRepository, times(1)).findBlobByEndBlockNumber(eq(firstBlobEndBlockNumber.toLong())) + } } diff --git a/coordinator/persistence/blob/src/integrationTest/kotlin/net/consensys/zkevm/ethereum/coordination/blob/BlobCompressionProofCoordinatorIntTest.kt b/coordinator/persistence/blob/src/integrationTest/kotlin/net/consensys/zkevm/ethereum/coordination/blob/BlobCompressionProofCoordinatorIntTest.kt index 7a45fec9a..ac63e7f55 100644 --- a/coordinator/persistence/blob/src/integrationTest/kotlin/net/consensys/zkevm/ethereum/coordination/blob/BlobCompressionProofCoordinatorIntTest.kt +++ b/coordinator/persistence/blob/src/integrationTest/kotlin/net/consensys/zkevm/ethereum/coordination/blob/BlobCompressionProofCoordinatorIntTest.kt @@ -360,4 +360,73 @@ class BlobCompressionProofCoordinatorIntTest : CleanDbTestSuiteParallel() { } testContext.completeNow() } + + @Test + fun `test blob handle failures re-queue's the blob`( + testContext: VertxTestContext + ) { + val prevBlobRecord = createBlobRecord( + startBlockNumber = expectedStartBlock, + endBlockNumber = expectedEndBlock, + startBlockTime = expectedStartBlockTime + ) + timeToReturn = Clock.System.now() + blobsPostgresDao.saveNewBlob(prevBlobRecord).get() + + val blobs = createConsecutiveBlobs( + numberOfBlobs = maxBlobsToReturn.toInt() - 1, + startBlockNumber = expectedEndBlock + 1UL, + startBlockTime = prevBlobRecord.endBlockTime.plus(12.seconds) + ) + val maxMockedBlobZkStateFailures = 10 + var blobZkStateFailures = 0 + var blobZkStateCount = 0 + + Mockito.reset(blobZkStateProvider) + whenever(blobZkStateProvider.getBlobZKState(any())).thenAnswer { + blobZkStateCount += 1 + if (blobZkStateFailures <= maxMockedBlobZkStateFailures && blobZkStateCount % 2 == 0) { + blobZkStateFailures += 1 + SafeFuture.failedFuture(RuntimeException("Forced mock blobZkStateProvider failure")) + } else { + SafeFuture.completedFuture( + BlobZkState( + parentStateRootHash = Bytes32.random().toArray(), + finalStateRootHash = Bytes32.random().toArray() + ) + ) + } + } + + timeToReturn = Clock.System.now() + SafeFuture.allOf( + blobs.map { + blobCompressionProofCoordinator.handleBlob(it) + }.stream() + ).get() + + waitAtMost(100.seconds.toJavaDuration()) + .pollInterval(200.milliseconds.toJavaDuration()) + .untilAsserted { + val actualBlobs = blobsPostgresDao.getConsecutiveBlobsFromBlockNumber( + expectedStartBlock, + blobs.last().endBlockTime.plus(1.seconds) + ).get() + + assertThat(actualBlobs).size().isEqualTo(blobs.size + 1) + verify(blobsRepositorySpy, times(1)).findBlobByEndBlockNumber(any()) + verify(blobsRepositorySpy, times(1)).findBlobByEndBlockNumber(eq(expectedEndBlock.toLong())) + + var previousBlob = actualBlobs.first() + actualBlobs.drop(1).forEach { blobRecord -> + val blobCompressionProof = blobRecord.blobCompressionProof!! + testContext.verify { + assertThat(blobCompressionProof.parentDataHash).isEqualTo(previousBlob.blobHash) + assertThat(blobCompressionProof.prevShnarf).isEqualTo(previousBlob.expectedShnarf) + } + previousBlob = blobRecord + } + } + testContext.completeNow() + } } diff --git a/coordinator/persistence/blob/src/test/kotlin/net/consensys/zkevm/ethereum/coordinator/blob/BlobCompressionProofCoordinatorTest.kt b/coordinator/persistence/blob/src/test/kotlin/net/consensys/zkevm/ethereum/coordinator/blob/BlobCompressionProofCoordinatorTest.kt index a6e708324..84f0fd8d8 100644 --- a/coordinator/persistence/blob/src/test/kotlin/net/consensys/zkevm/ethereum/coordinator/blob/BlobCompressionProofCoordinatorTest.kt +++ b/coordinator/persistence/blob/src/test/kotlin/net/consensys/zkevm/ethereum/coordinator/blob/BlobCompressionProofCoordinatorTest.kt @@ -27,9 +27,11 @@ import org.mockito.Mockito import org.mockito.kotlin.any import org.mockito.kotlin.eq import org.mockito.kotlin.mock +import org.mockito.kotlin.times import org.mockito.kotlin.verify import org.mockito.kotlin.whenever import tech.pegasys.teku.infrastructure.async.SafeFuture +import java.lang.RuntimeException import kotlin.random.Random import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.seconds @@ -169,4 +171,113 @@ class BlobCompressionProofCoordinatorTest { ) } } + + @Test + fun `verify failed blob is re-queued and processed in order`() { + val startBlockTime = fixedClock.now() + val expectedParentDataHash = Random.nextBytes(32) + val expectedPrevShnarf = Random.nextBytes(32) + val parentStateRootHash = Random.nextBytes(32) + val finalStateRootHash = Random.nextBytes(32) + + val blob1 = Blob( + conflations = listOf( + ConflationCalculationResult( + startBlockNumber = 1uL, + endBlockNumber = 10uL, + conflationTrigger = ConflationTrigger.TRACES_LIMIT, + tracesCounters = TracesCountersV1.EMPTY_TRACES_COUNT + ) + ), + compressedData = Random.nextBytes(128), + startBlockTime = startBlockTime, + endBlockTime = fixedClock.now().plus((12 * (10 - 1)).seconds) + ) + + val blob2 = Blob( + conflations = listOf( + ConflationCalculationResult( + startBlockNumber = 11uL, + endBlockNumber = 20uL, + conflationTrigger = ConflationTrigger.TRACES_LIMIT, + tracesCounters = TracesCountersV1.EMPTY_TRACES_COUNT + ) + ), + compressedData = Random.nextBytes(128), + startBlockTime = startBlockTime, + endBlockTime = fixedClock.now().plus((12 * (20 - 11)).seconds) + ) + + whenever(blobZkStateProvider.getBlobZKState(any())) + .thenReturn( + SafeFuture.failedFuture(RuntimeException("Forced blobZkStateProvider mock failure")), + SafeFuture.completedFuture( + BlobZkState( + parentStateRootHash = parentStateRootHash, + finalStateRootHash = finalStateRootHash + ) + ), + SafeFuture.completedFuture( + BlobZkState( + parentStateRootHash = parentStateRootHash, + finalStateRootHash = finalStateRootHash + ) + ) + ) + + val shnarfResult = ShnarfResult( + dataHash = Random.nextBytes(32), + snarkHash = Random.nextBytes(32), + expectedX = Random.nextBytes(32), + expectedY = Random.nextBytes(32), + expectedShnarf = Random.nextBytes(32), + commitment = Random.nextBytes(48), + kzgProofContract = Random.nextBytes(48), + kzgProofSideCar = Random.nextBytes(48) + ) + + whenever(rollingBlobShnarfCalculator.calculateShnarf(any(), any(), any(), any())) + .thenAnswer { + SafeFuture.completedFuture( + RollingBlobShnarfResult( + shnarfResult = shnarfResult, + parentBlobHash = expectedParentDataHash, + parentBlobShnarf = expectedPrevShnarf + ) + ) + } + + blobCompressionProofCoordinator.handleBlob(blob1).get() + blobCompressionProofCoordinator.handleBlob(blob2).get() + + await() + .untilAsserted { + verify(blobCompressionProverClient, times(1)) + .requestBlobCompressionProof( + compressedData = eq(blob1.compressedData), + conflations = eq(blob1.conflations), + parentStateRootHash = eq(parentStateRootHash), + finalStateRootHash = eq(finalStateRootHash), + parentDataHash = eq(expectedParentDataHash), + prevShnarf = eq(expectedPrevShnarf), + expectedShnarfResult = eq(shnarfResult), + commitment = eq(shnarfResult.commitment), + kzgProofContract = eq(shnarfResult.kzgProofContract), + kzgProofSideCar = eq(shnarfResult.kzgProofSideCar) + ) + verify(blobCompressionProverClient, times(1)) + .requestBlobCompressionProof( + compressedData = eq(blob2.compressedData), + conflations = eq(blob2.conflations), + parentStateRootHash = eq(parentStateRootHash), + finalStateRootHash = eq(finalStateRootHash), + parentDataHash = eq(expectedParentDataHash), + prevShnarf = eq(expectedPrevShnarf), + expectedShnarfResult = eq(shnarfResult), + commitment = eq(shnarfResult.commitment), + kzgProofContract = eq(shnarfResult.kzgProofContract), + kzgProofSideCar = eq(shnarfResult.kzgProofSideCar) + ) + } + } }