Skip to content

Commit

Permalink
[3816] BlobCompressionProofCoordinator will retry/requeue blob when blob handling fails (#3848)
Browse files Browse the repository at this point in the history

* [3816] BlobCompressionProofCoordinator will retry/requeue blob when blob handling fails

* fix test

* fix spotless

* Remove redundant error log in BlobCompressionProofCoordinator

* Use log warn instead of error on retry
  • Loading branch information
gauravahuja authored Sep 4, 2024
1 parent 50a5d9b commit b7822ec
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import net.consensys.zkevm.persistence.blob.BlobsRepository
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.CompletableFuture
import java.util.concurrent.LinkedBlockingDeque
import kotlin.time.Duration

class BlobCompressionProofCoordinator(
Expand All @@ -37,7 +37,7 @@ class BlobCompressionProofCoordinator(
) : BlobCreationHandler, LongRunningService {
private val log: Logger = LogManager.getLogger(this::class.java)
private val defaultQueueCapacity = 1000 // Should be more than blob submission limit
private val blobsToHandle = ArrayBlockingQueue<Blob>(defaultQueueCapacity)
private val blobsToHandle = LinkedBlockingDeque<Blob>(defaultQueueCapacity)
private var timerId: Long? = null
private lateinit var blobPollingAction: Handler<Long>
private val blobsCounter = metricsFacade.createCounter(
Expand Down Expand Up @@ -181,10 +181,7 @@ class BlobCompressionProofCoordinator(
override fun start(): CompletableFuture<Unit> {
if (timerId == null) {
blobPollingAction = Handler<Long> {
handleBlobsFromTheQueue().whenComplete { _, error ->
error?.let {
log.error("Error polling blobs for aggregation: errorMessage={}", error.message, error)
}
handleBlobFromTheQueue().whenComplete { _, _ ->
timerId = vertx.setTimer(config.pollingInterval.inWholeMilliseconds, blobPollingAction)
}
}
Expand All @@ -193,22 +190,22 @@ class BlobCompressionProofCoordinator(
return SafeFuture.completedFuture(Unit)
}

private fun handleBlobsFromTheQueue(): SafeFuture<Unit> {
var blobsHandlingFuture = SafeFuture.completedFuture(Unit)
if (blobsToHandle.isNotEmpty()) {
private fun handleBlobFromTheQueue(): SafeFuture<Unit> {
return if (blobsToHandle.isNotEmpty()) {
val blobToHandle = blobsToHandle.poll()
blobsHandlingFuture = blobsHandlingFuture.thenCompose {
sendBlobToCompressionProver(blobToHandle).whenException { exception ->
log.error(
"Error in sending blob to compression prover: blob={} errorMessage={} ",
sendBlobToCompressionProver(blobToHandle)
.whenException { exception ->
blobsToHandle.putFirst(blobToHandle)
log.warn(
"Error handling blob from BlobCompressionProofCoordinator queue: blob={} errorMessage={}",
blobToHandle.intervalString(),
exception.message,
exception
)
}
}
} else {
SafeFuture.completedFuture(Unit)
}
return blobsHandlingFuture
}

override fun stop(): CompletableFuture<Unit> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.junit.jupiter.api.assertThrows
import org.mockito.kotlin.any
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.reset
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
Expand All @@ -24,7 +25,7 @@ class RollingBlobShnarfCalculatorTest {
private lateinit var mockBlobsRepository: BlobsRepository
private lateinit var firstBlob: BlobRecord
private val firstBlobEndBlockNumber = 100UL
private val fakeBlobShnarfCalculator = FakeBlobShnarfCalculator()
private val mockBlobShnarfCalculator = mock<BlobShnarfCalculator>()

@BeforeEach
fun beforeEach() {
Expand All @@ -44,8 +45,21 @@ class RollingBlobShnarfCalculatorTest {
}
}

whenever(mockBlobShnarfCalculator.calculateShnarf(any(), any(), any(), any(), any())).thenReturn(
ShnarfResult(
dataHash = Random.nextBytes(32),
snarkHash = Random.nextBytes(32),
expectedX = Random.nextBytes(32),
expectedY = Random.nextBytes(32),
expectedShnarf = Random.nextBytes(32),
commitment = Random.nextBytes(48),
kzgProofContract = Random.nextBytes(48),
kzgProofSideCar = Random.nextBytes(48)
)
)

rollingBlobShnarfCalculator = RollingBlobShnarfCalculator(
blobShnarfCalculator = fakeBlobShnarfCalculator,
blobShnarfCalculator = mockBlobShnarfCalculator,
blobsRepository = mockBlobsRepository,
genesisShnarf = ByteArray(32)
)
Expand Down Expand Up @@ -125,4 +139,29 @@ class RollingBlobShnarfCalculatorTest {
"is not equal to parent blob end block number=$secondBlobEndBlockNumber + 1"
)
}

@Test
fun `returns failed future when shnarf calculator throws exception`() {
reset(mockBlobShnarfCalculator)
whenever(mockBlobShnarfCalculator.calculateShnarf(any(), any(), any(), any(), any()))
.thenThrow(RuntimeException("Error while calculating Shnarf"))
val secondBlobStartBlockNumber = firstBlobEndBlockNumber + 1UL
val secondBlobEndBlockNumber = firstBlobEndBlockNumber + 100UL

val exception = assertThrows<ExecutionException> {
rollingBlobShnarfCalculator.calculateShnarf(
compressedData = Random.nextBytes(100),
parentStateRootHash = Random.nextBytes(32),
finalStateRootHash = Random.nextBytes(32),
conflationOrder = BlockIntervals(secondBlobStartBlockNumber, listOf(secondBlobEndBlockNumber))
).get()
}

assertThat(exception).isNotNull()
assertThat(exception).hasCauseInstanceOf(RuntimeException::class.java)
assertThat(exception.cause).hasMessage("Error while calculating Shnarf")

verify(mockBlobsRepository, times(1)).findBlobByEndBlockNumber(any())
verify(mockBlobsRepository, times(1)).findBlobByEndBlockNumber(eq(firstBlobEndBlockNumber.toLong()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -360,4 +360,73 @@ class BlobCompressionProofCoordinatorIntTest : CleanDbTestSuiteParallel() {
}
testContext.completeNow()
}

@Test
fun `test blob handle failures re-queue's the blob`(
testContext: VertxTestContext
) {
val prevBlobRecord = createBlobRecord(
startBlockNumber = expectedStartBlock,
endBlockNumber = expectedEndBlock,
startBlockTime = expectedStartBlockTime
)
timeToReturn = Clock.System.now()
blobsPostgresDao.saveNewBlob(prevBlobRecord).get()

val blobs = createConsecutiveBlobs(
numberOfBlobs = maxBlobsToReturn.toInt() - 1,
startBlockNumber = expectedEndBlock + 1UL,
startBlockTime = prevBlobRecord.endBlockTime.plus(12.seconds)
)
val maxMockedBlobZkStateFailures = 10
var blobZkStateFailures = 0
var blobZkStateCount = 0

Mockito.reset(blobZkStateProvider)
whenever(blobZkStateProvider.getBlobZKState(any())).thenAnswer {
blobZkStateCount += 1
if (blobZkStateFailures <= maxMockedBlobZkStateFailures && blobZkStateCount % 2 == 0) {
blobZkStateFailures += 1
SafeFuture.failedFuture(RuntimeException("Forced mock blobZkStateProvider failure"))
} else {
SafeFuture.completedFuture(
BlobZkState(
parentStateRootHash = Bytes32.random().toArray(),
finalStateRootHash = Bytes32.random().toArray()
)
)
}
}

timeToReturn = Clock.System.now()
SafeFuture.allOf(
blobs.map {
blobCompressionProofCoordinator.handleBlob(it)
}.stream()
).get()

waitAtMost(100.seconds.toJavaDuration())
.pollInterval(200.milliseconds.toJavaDuration())
.untilAsserted {
val actualBlobs = blobsPostgresDao.getConsecutiveBlobsFromBlockNumber(
expectedStartBlock,
blobs.last().endBlockTime.plus(1.seconds)
).get()

assertThat(actualBlobs).size().isEqualTo(blobs.size + 1)
verify(blobsRepositorySpy, times(1)).findBlobByEndBlockNumber(any())
verify(blobsRepositorySpy, times(1)).findBlobByEndBlockNumber(eq(expectedEndBlock.toLong()))

var previousBlob = actualBlobs.first()
actualBlobs.drop(1).forEach { blobRecord ->
val blobCompressionProof = blobRecord.blobCompressionProof!!
testContext.verify {
assertThat(blobCompressionProof.parentDataHash).isEqualTo(previousBlob.blobHash)
assertThat(blobCompressionProof.prevShnarf).isEqualTo(previousBlob.expectedShnarf)
}
previousBlob = blobRecord
}
}
testContext.completeNow()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ import org.mockito.Mockito
import org.mockito.kotlin.any
import org.mockito.kotlin.eq
import org.mockito.kotlin.mock
import org.mockito.kotlin.times
import org.mockito.kotlin.verify
import org.mockito.kotlin.whenever
import tech.pegasys.teku.infrastructure.async.SafeFuture
import java.lang.RuntimeException
import kotlin.random.Random
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
Expand Down Expand Up @@ -169,4 +171,113 @@ class BlobCompressionProofCoordinatorTest {
)
}
}

@Test
fun `verify failed blob is re-queued and processed in order`() {
val startBlockTime = fixedClock.now()
val expectedParentDataHash = Random.nextBytes(32)
val expectedPrevShnarf = Random.nextBytes(32)
val parentStateRootHash = Random.nextBytes(32)
val finalStateRootHash = Random.nextBytes(32)

val blob1 = Blob(
conflations = listOf(
ConflationCalculationResult(
startBlockNumber = 1uL,
endBlockNumber = 10uL,
conflationTrigger = ConflationTrigger.TRACES_LIMIT,
tracesCounters = TracesCountersV1.EMPTY_TRACES_COUNT
)
),
compressedData = Random.nextBytes(128),
startBlockTime = startBlockTime,
endBlockTime = fixedClock.now().plus((12 * (10 - 1)).seconds)
)

val blob2 = Blob(
conflations = listOf(
ConflationCalculationResult(
startBlockNumber = 11uL,
endBlockNumber = 20uL,
conflationTrigger = ConflationTrigger.TRACES_LIMIT,
tracesCounters = TracesCountersV1.EMPTY_TRACES_COUNT
)
),
compressedData = Random.nextBytes(128),
startBlockTime = startBlockTime,
endBlockTime = fixedClock.now().plus((12 * (20 - 11)).seconds)
)

whenever(blobZkStateProvider.getBlobZKState(any()))
.thenReturn(
SafeFuture.failedFuture(RuntimeException("Forced blobZkStateProvider mock failure")),
SafeFuture.completedFuture(
BlobZkState(
parentStateRootHash = parentStateRootHash,
finalStateRootHash = finalStateRootHash
)
),
SafeFuture.completedFuture(
BlobZkState(
parentStateRootHash = parentStateRootHash,
finalStateRootHash = finalStateRootHash
)
)
)

val shnarfResult = ShnarfResult(
dataHash = Random.nextBytes(32),
snarkHash = Random.nextBytes(32),
expectedX = Random.nextBytes(32),
expectedY = Random.nextBytes(32),
expectedShnarf = Random.nextBytes(32),
commitment = Random.nextBytes(48),
kzgProofContract = Random.nextBytes(48),
kzgProofSideCar = Random.nextBytes(48)
)

whenever(rollingBlobShnarfCalculator.calculateShnarf(any(), any(), any(), any()))
.thenAnswer {
SafeFuture.completedFuture(
RollingBlobShnarfResult(
shnarfResult = shnarfResult,
parentBlobHash = expectedParentDataHash,
parentBlobShnarf = expectedPrevShnarf
)
)
}

blobCompressionProofCoordinator.handleBlob(blob1).get()
blobCompressionProofCoordinator.handleBlob(blob2).get()

await()
.untilAsserted {
verify(blobCompressionProverClient, times(1))
.requestBlobCompressionProof(
compressedData = eq(blob1.compressedData),
conflations = eq(blob1.conflations),
parentStateRootHash = eq(parentStateRootHash),
finalStateRootHash = eq(finalStateRootHash),
parentDataHash = eq(expectedParentDataHash),
prevShnarf = eq(expectedPrevShnarf),
expectedShnarfResult = eq(shnarfResult),
commitment = eq(shnarfResult.commitment),
kzgProofContract = eq(shnarfResult.kzgProofContract),
kzgProofSideCar = eq(shnarfResult.kzgProofSideCar)
)
verify(blobCompressionProverClient, times(1))
.requestBlobCompressionProof(
compressedData = eq(blob2.compressedData),
conflations = eq(blob2.conflations),
parentStateRootHash = eq(parentStateRootHash),
finalStateRootHash = eq(finalStateRootHash),
parentDataHash = eq(expectedParentDataHash),
prevShnarf = eq(expectedPrevShnarf),
expectedShnarfResult = eq(shnarfResult),
commitment = eq(shnarfResult.commitment),
kzgProofContract = eq(shnarfResult.kzgProofContract),
kzgProofSideCar = eq(shnarfResult.kzgProofSideCar)
)
}
}
}

0 comments on commit b7822ec

Please sign in to comment.