Skip to content

Commit

Permalink
coordinator: Generic prover client (#3847)
Browse files Browse the repository at this point in the history
* coordinator: Generic prover client WIP preparation to support prover switch between versions

* coordinator: WIP prover clients use GenericFileBasedProverClient

* coordinator: rename FileBasedProverConfig params

* coordinator: adds tests for GenericFileBasedProverClientTest

* coordinator: removes unnecessary annotation

* coordinator: removes legacy prover clients

* coordinator: small simplification

* coordinator: adds tests for ExecutionProofRequestDataDecorator

* coordinator: removes unused class ProverErrorType.kt

* Revert "coordinator: removes unused class ProverErrorType.kt"

This reverts commit e8ffbfb7a45c7e0e6287d2a0751ba94d0141529b.

* coordinator: naming consistency

* coordinator: adapt Blob coordinator to new prover interface

* coordinator: rename RequestResponse file

* coordinator: adapt to new prover aggregation client interface

* coordinator: adapt to new prover batch execution client interface

* coordinator: remove unnecessary prover client code

* coordinator: naming consistency

* Coordinator prover client v2 routing (#3872)

* coordinator: support for multi-prover

* coordinator: prover client inprove request reuse

* coordinator: small code readability improvement

* coordinator: small code readability improvement

* coordinator: small classes and comment renaming

* improve error handling in app stop

* remove duplicated default argument

* fix typo

* update coordinator image tag

* fix typo and add test assertion

* fix comment

* fix merge on BlobCompressionProofCoordinatorTest
  • Loading branch information
jpnovais authored Sep 4, 2024
1 parent b7822ec commit d8af29a
Show file tree
Hide file tree
Showing 61 changed files with 1,988 additions and 2,433 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
[prover]
fs-requests-directory="/data/prover-execution/v3/requests"
fs-responses-directory="/data/prover-execution/v3/responses"

[blob-compression.prover]
fs-requests-directory="/data/prover-compression/v3/requests"
fs-responses-directory="/data/prover-compression/v3/responses"

[proof-aggregation.prover]
fs-requests-directory="/data/prover-aggregation/v3/requests"
fs-responses-directory="/data/prover-aggregation/v3/responses"
[prover.execution]
fs-requests-directory = "/data/prover/v3/execution/requests"
fs-responses-directory = "/data/prover/v3/execution/responses"
[prover.blob-compression]
fs-requests-directory = "/data/prover/v3/compression/requests"
fs-responses-directory = "/data/prover/v3/compression/responses"
[prover.proof-aggregation]
fs-requests-directory = "/data/prover/v3/aggregation/requests"
fs-responses-directory = "/data/prover/v3/aggregation/responses"

[zk-traces]
eth-api="http://traces-node-v2:8545"
Expand Down
45 changes: 24 additions & 21 deletions config/coordinator/coordinator-docker.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,37 @@ duplicated-logs-debounce-time="PT15S"
eip4844-switch-l2-block-number=0

[prover]
fs-requests-directory="/data/prover-execution/v2/requests"
fs-responses-directory="/data/prover-execution/v2/responses"
fs-inprogress-request-writing-suffix=".inprogress_coordinator_writing"
fs-inprogress-proving-suffix-pattern=".*\\.inprogress\\.prover.*"
fs-polling-interval="PT1S"
fs-polling-timeout="PT10M"
fs-inprogress-request-writing-suffix = ".inprogress_coordinator_writing"
fs-inprogress-proving-suffix-pattern = ".*\\.inprogress\\.prover.*"
fs-polling-interval = "PT1S"
fs-polling-timeout = "PT10M"
[prover.execution]
fs-requests-directory = "/data/prover/v2/execution/requests"
fs-responses-directory = "/data/prover/v2/execution/responses"
[prover.blob-compression]
fs-requests-directory = "/data/prover/v2/compression/requests"
fs-responses-directory = "/data/prover/v2/compression/responses"
[prover.proof-aggregation]
fs-requests-directory = "/data/prover/v2/aggregation/requests"
fs-responses-directory = "/data/prover/v2/aggregation/responses"
#[prover.new]
#switch-block-number-inclusive=1000
#[prover.new.execution]
#fs-requests-directory = "/data/prover/v3/execution/requests"
#fs-responses-directory = "/data/prover/v3/execution/responses"
#[prover.new.blob-compression]
#fs-requests-directory = "/data/prover/v3/compression/requests"
#fs-responses-directory = "/data/prover/v3/compression/responses"
#[prover.new.proof-aggregation]
#fs-requests-directory = "/data/prover/v3/aggregation/requests"
#fs-responses-directory = "/data/prover/v3/aggregation/responses"

[blob-compression]
blob-size-limit=102400 # 100KB
handler-polling-interval="PT1S"
# default batches limit is aggregation-proofs-limit -1
# batches-limit must be less than or equal to aggregation-proofs-limit-1
batches-limit=1
[blob-compression.prover]
fs-requests-directory="/data/prover-compression/v2/requests"
fs-responses-directory="/data/prover-compression/v2/responses"
fs-inprogress-request-writing-suffix=".inprogress_coordinator_writing"
fs-inprogress-proving-suffix-pattern=".*\\.inprogress\\.prover.*"
fs-polling-interval="PT1S"
fs-polling-timeout="PT10M"

[zk-traces]
eth-api="http://traces-node:8545"
Expand Down Expand Up @@ -150,14 +161,6 @@ aggregation-coordinator-polling-interval="PT2S"
deadline-check-interval="PT8S"
target-end-blocks=[]

[proof-aggregation.prover]
fs-requests-directory="/data/prover-aggregation/v2/requests"
fs-responses-directory="/data/prover-aggregation/v2/responses"
fs-inprogress-request-writing-suffix=".inprogress_coordinator_writing"
fs-inprogress-proving-suffix-pattern=".*\\.inprogress\\.prover.*"
fs-polling-interval="PT20S"
fs-polling-timeout="PT20M"

[finalization-signer]
# Web3j/Web3signer
type="Web3j"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
[prover]
[prover.execution]
fs-requests-directory = "/data/prover/v3/execution/requests"
fs-responses-directory = "/data/prover/v3/execution/responses"
[prover.blob-compression]
fs-requests-directory = "/data/prover/v3/compression/requests"
fs-responses-directory = "/data/prover/v3/compression/responses"
[prover.proof-aggregation]
fs-requests-directory = "/data/prover/v3/aggregation/requests"
fs-responses-directory = "/data/prover/v3/aggregation/responses"

[zk-traces]
eth-api="http://127.0.0.1:8745"

Expand Down
20 changes: 9 additions & 11 deletions config/coordinator/coordinator-local-dev.config.overrides.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ endpoint="http://127.0.0.1:9000"
endpoint="http://127.0.0.1:9000"

[prover]
fs-requests-directory="tmp/local/prover-execution/v2/requests"
fs-responses-directory="tmp/local/prover-execution/v2/responses"

[blob-compression]
[blob-compression.prover]
fs-requests-directory="tmp/local/prover-compression/v2/requests"
fs-responses-directory="tmp/local/prover-compression/v2/responses"

[proof-aggregation.prover]
fs-requests-directory="tmp/local/prover-aggregation/v2/requests"
fs-responses-directory="tmp/local/prover-aggregation/v2/responses"
[prover.execution]
fs-requests-directory = "tmp/local/prover/v2/execution/requests"
fs-responses-directory = "tmp/local/prover/v2/execution/responses"
[prover.blob-compression]
fs-requests-directory = "tmp/local/prover/v2/compression/requests"
fs-responses-directory = "tmp/local/prover/v2/compression/responses"
[prover.proof-aggregation]
fs-requests-directory = "tmp/local/prover/v2/aggregation/requests"
fs-responses-directory = "tmp/local/prover/v2/aggregation/responses"

[zk-traces]
eth-api="http://127.0.0.1:8645"
Expand Down
6 changes: 3 additions & 3 deletions config/coordinator/log4j2-dev.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@
<!-- <Logger name="net.consensys.zkevm.ethereum.coordination.blob" level="TRACE" additivity="false">-->
<!-- <appender-ref ref="console"/>-->
<!-- </Logger>-->
<Logger name="net.consensys.zkevm.ethereum.submission" level="DEBUG" additivity="false">
<Logger name="net.consensys.zkevm.ethereum.submission" level="INFO" additivity="false">
<DebouncingFilter/>
<appender-ref ref="rewrite"/>
</Logger>
<Logger name="net.consensys.zkevm.ethereum.finalization" level="DEBUG" additivity="false">
<Logger name="net.consensys.zkevm.ethereum.finalization" level="INFO" additivity="false">
<DebouncingFilter/>
<appender-ref ref="rewrite"/>
</Logger>
<Logger name="net.consensys.linea.ethereum.gaspricing.dynamiccap" level="DEBUG" additivity="false">
<Logger name="net.consensys.linea.ethereum.gaspricing.dynamiccap" level="INFO" additivity="false">
<DebouncingFilter/>
<appender-ref ref="rewrite"/>
</Logger>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import net.consensys.linea.ethereum.gaspricing.FeesFetcher
import net.consensys.linea.ethereum.gaspricing.WMAGasProvider
import net.consensys.linea.httprest.client.VertxHttpRestClient
import net.consensys.linea.web3j.SmartContractErrors
import net.consensys.zkevm.coordinator.app.config.L1Config
import net.consensys.zkevm.coordinator.app.config.L2Config
import net.consensys.zkevm.coordinator.app.config.SignerConfig
import net.consensys.zkevm.coordinator.clients.smartcontract.LineaRollupSmartContractClient
import net.consensys.zkevm.ethereum.crypto.Web3SignerRestClient
import net.consensys.zkevm.ethereum.crypto.Web3SignerTxSignService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import io.vertx.core.json.jackson.DatabindCodec
import io.vertx.micrometer.backends.BackendRegistries
import io.vertx.sqlclient.SqlClient
import net.consensys.linea.async.toSafeFuture
import net.consensys.linea.contract.Web3JL2MessageServiceLogsClient
import net.consensys.linea.contract.Web3JLogsClient
import net.consensys.linea.jsonrpc.client.LoadBalancingJsonRpcClient
import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.metrics.micrometer.MicrometerMetricsFacade
import net.consensys.linea.vertx.loadVertxConfig
import net.consensys.linea.web3j.okHttpClientBuilder
import net.consensys.zkevm.coordinator.api.Api
import net.consensys.zkevm.coordinator.clients.prover.FileBasedExecutionProverClient
import net.consensys.zkevm.coordinator.app.config.CoordinatorConfig
import net.consensys.zkevm.coordinator.app.config.DatabaseConfig
import net.consensys.zkevm.fileio.DirectoryCleaner
import net.consensys.zkevm.persistence.dao.aggregation.AggregationsRepositoryImpl
import net.consensys.zkevm.persistence.dao.aggregation.PostgresAggregationsDao
import net.consensys.zkevm.persistence.dao.aggregation.RetryingPostgresAggregationsDao
Expand Down Expand Up @@ -79,28 +79,6 @@ class CoordinatorApp(private val configs: CoordinatorConfig) {
Async.defaultExecutorService()
)

private fun createExecutionProverClient(config: ProverConfig): FileBasedExecutionProverClient {
return FileBasedExecutionProverClient(
config = FileBasedExecutionProverClient.Config(
requestDirectory = config.fsRequestsDirectory,
responseDirectory = config.fsResponsesDirectory,
inprogressProvingSuffixPattern = config.fsInprogressProvingSuffixPattern,
pollingInterval = config.fsPollingInterval.toKotlinDuration(),
timeout = config.fsPollingTimeout.toKotlinDuration(),
tracesVersion = configs.traces.rawExecutionTracesVersion,
stateManagerVersion = configs.stateManager.version
),
l2MessageServiceLogsClient = Web3JL2MessageServiceLogsClient(
logsClient = Web3JLogsClient(vertx, l2Web3jClient),
l2MessageServiceAddress = configs.l2.messageServiceAddress
),
vertx = vertx,
l2Web3jClient = l2Web3jClient
)
}

private val proverClient: FileBasedExecutionProverClient = createExecutionProverClient(configs.prover)

private val persistenceRetryer = PersistenceRetryer(
vertx = vertx,
config = PersistenceRetryer.Config(
Expand Down Expand Up @@ -164,7 +142,6 @@ class CoordinatorApp(private val configs: CoordinatorConfig) {
vertx = vertx,
l2Web3jClient = l2Web3jClient,
httpJsonRpcClientFactory = httpJsonRpcClientFactory,
proverClientV2 = proverClient,
batchesRepository = batchesRepository,
blobsRepository = blobsRepository,
aggregationsRepository = aggregationsRepository,
Expand All @@ -175,16 +152,22 @@ class CoordinatorApp(private val configs: CoordinatorConfig) {

private val requestFileCleanup = DirectoryCleaner(
vertx = vertx,
directories = listOf(
configs.prover.fsRequestsDirectory, // Execution proof request directory
configs.blobCompression.prover.fsRequestsDirectory, // Compression proof request directory
configs.proofAggregation.prover.fsRequestsDirectory // Aggregation proof request directory
directories = listOfNotNull(
configs.proversConfig.proverA.execution.requestsDirectory,
configs.proversConfig.proverA.blobCompression.requestsDirectory,
configs.proversConfig.proverA.proofAggregation.requestsDirectory,
configs.proversConfig.proverB?.execution?.requestsDirectory,
configs.proversConfig.proverB?.blobCompression?.requestsDirectory,
configs.proversConfig.proverB?.proofAggregation?.requestsDirectory
),
fileFilters = DirectoryCleaner.getSuffixFileFilters(
listOf(
configs.prover.fsInprogressRequestWritingSuffix,
configs.blobCompression.prover.fsInprogressRequestWritingSuffix,
configs.proofAggregation.prover.fsInprogressRequestWritingSuffix
listOfNotNull(
configs.proversConfig.proverA.execution.inprogressRequestWritingSuffix,
configs.proversConfig.proverA.blobCompression.inprogressRequestWritingSuffix,
configs.proversConfig.proverA.proofAggregation.inprogressRequestWritingSuffix,
configs.proversConfig.proverB?.execution?.inprogressRequestWritingSuffix,
configs.proversConfig.proverB?.blobCompression?.inprogressRequestWritingSuffix,
configs.proversConfig.proverB?.proofAggregation?.inprogressRequestWritingSuffix
)
) + DirectoryCleaner.JSON_FILE_FILTER
)
Expand All @@ -203,20 +186,25 @@ class CoordinatorApp(private val configs: CoordinatorConfig) {
}

fun stop(): Int {
SafeFuture.allOf(
l1App.stop(),
SafeFuture.fromRunnable { l2Web3jClient.shutdown() },
api.stop().toSafeFuture()
).thenApply {
LoadBalancingJsonRpcClient.stop()
}.thenCompose {
requestFileCleanup.cleanup()
}.thenCompose {
vertx.close().toSafeFuture().thenApply { log.info("vertx Stopped") }
}.thenApply {
log.info("CoordinatorApp Stopped")
}.get()
return 0
return kotlin.runCatching {
SafeFuture.allOf(
l1App.stop(),
SafeFuture.fromRunnable { l2Web3jClient.shutdown() },
api.stop().toSafeFuture()
).thenApply {
LoadBalancingJsonRpcClient.stop()
}.thenCompose {
requestFileCleanup.cleanup()
}.thenCompose {
vertx.close().toSafeFuture().thenApply { log.info("vertx Stopped") }
}.thenApply {
log.info("CoordinatorApp Stopped")
}.get()
0
}.recover { e ->
log.error("CoordinatorApp Stopped with error: errorMessage={}", e.message, e)
1
}.getOrThrow()
}

private fun initDb(dbConfig: DatabaseConfig): SqlClient {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import com.sksamuel.hoplite.ConfigLoaderBuilder
import com.sksamuel.hoplite.addFileSource
import net.consensys.linea.traces.TracesCountersV1
import net.consensys.linea.traces.TracesCountersV2
import net.consensys.zkevm.coordinator.app.config.CoordinatorConfig
import net.consensys.zkevm.coordinator.app.config.CoordinatorConfigTomlDto
import net.consensys.zkevm.coordinator.app.config.GasPriceCapTimeOfDayMultipliersConfig
import net.consensys.zkevm.coordinator.app.config.SmartContractErrorCodesConfig
import net.consensys.zkevm.coordinator.app.config.TracesLimitsV1ConfigFile
import net.consensys.zkevm.coordinator.app.config.TracesLimitsV2ConfigFile
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.Logger
import picocli.CommandLine
Expand Down Expand Up @@ -175,7 +181,7 @@ internal constructor(private val errorWriter: PrintWriter, private val startActi
val gasPriceCapTimeOfDayMultipliers =
loadConfigsOrError<GasPriceCapTimeOfDayMultipliersConfig>(listOf(gasPriceCapTimeOfDayMultipliersFile))

val configs = loadConfigsOrError<CoordinatorConfig>(coordinatorConfigFiles)
val configs = loadConfigsOrError<CoordinatorConfigTomlDto>(coordinatorConfigFiles)

if (tracesLimitsV1Configs is Err) {
hasConfigError = true
Expand Down Expand Up @@ -223,7 +229,7 @@ internal constructor(private val errorWriter: PrintWriter, private val startActi
return if (hasConfigError) {
null
} else {
configs.get()?.let { config: CoordinatorConfig ->
configs.get()?.let { config: CoordinatorConfigTomlDto ->
config.copy(
conflation = config.conflation.copy(
_tracesLimitsV1 = tracesLimitsV1Configs?.get()?.tracesLimits?.let { TracesCountersV1(it) },
Expand All @@ -235,7 +241,7 @@ internal constructor(private val errorWriter: PrintWriter, private val startActi
timeOfDayMultipliers = gasPriceCapTimeOfDayMultipliers.get()?.gasPriceCapTimeOfDayMultipliers
)
)
)
).reified()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package net.consensys.zkevm.coordinator.app

import net.consensys.zkevm.coordinator.app.config.CoordinatorConfig
import org.apache.logging.log4j.LogManager
import org.apache.logging.log4j.core.LoggerContext
import org.apache.logging.log4j.core.config.Configurator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import net.consensys.linea.jsonrpc.client.VertxHttpJsonRpcClientFactory
import net.consensys.linea.web3j.Web3jBlobExtended
import net.consensys.toKWeiUInt
import net.consensys.zkevm.LongRunningService
import net.consensys.zkevm.coordinator.app.config.DynamicGasPriceServiceConfig
import org.apache.logging.log4j.LogManager
import org.web3j.protocol.Web3j
import tech.pegasys.teku.infrastructure.async.SafeFuture
Expand Down
Loading

0 comments on commit d8af29a

Please sign in to comment.