-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* adds json-rpc client V2 with simpler interface Signed-off-by: Pedro Novais <1478752+jpnovais@users.noreply.github.com> Co-authored-by: jonesho <81145364+jonesho@users.noreply.github.com> Co-authored-by: Roman Vaseev <4833306+Filter94@users.noreply.github.com>
- Loading branch information
1 parent
74529e6
commit 9fc7fa6
Showing
17 changed files
with
1,173 additions
and
107 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
37 changes: 33 additions & 4 deletions
37
jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcClient.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,27 +1,56 @@ | ||
package net.consensys.linea.jsonrpc.client | ||
|
||
import com.fasterxml.jackson.databind.JsonNode | ||
import com.fasterxml.jackson.databind.node.JsonNodeType | ||
import com.github.michaelbull.result.Ok | ||
import com.github.michaelbull.result.Result | ||
import io.vertx.core.Future | ||
import io.vertx.core.json.JsonArray | ||
import io.vertx.core.json.JsonObject | ||
import net.consensys.linea.jsonrpc.JsonRpcErrorResponse | ||
import net.consensys.linea.jsonrpc.JsonRpcRequest | ||
import net.consensys.linea.jsonrpc.JsonRpcSuccessResponse | ||
|
||
fun identityMapper(value: Any?): Any? = value | ||
fun toPrimitiveOrJacksonJsonNode(value: Any?): Any? = value | ||
|
||
@Suppress("UNCHECKED_CAST") | ||
fun toPrimitiveOrVertxJson(value: Any?): Any? { | ||
if (value == null) { | ||
return null | ||
} | ||
return when (value) { | ||
is String -> value | ||
is Number -> value | ||
is Boolean -> value | ||
is JsonNode -> { | ||
when (value.nodeType) { | ||
JsonNodeType.STRING, JsonNodeType.NUMBER, JsonNodeType.BOOLEAN, JsonNodeType.NULL -> | ||
value | ||
.toPrimitiveOrJsonNode() | ||
|
||
JsonNodeType.OBJECT -> JsonObject(objectMapper.convertValue(value, Map::class.java) as Map<String, Any?>) | ||
JsonNodeType.ARRAY -> JsonArray(objectMapper.convertValue(value, List::class.java) as List<Any?>) | ||
else -> throw IllegalArgumentException("Unsupported JsonNodeType: ${value.nodeType}") | ||
} | ||
} | ||
|
||
else -> throw IllegalArgumentException("Unsupported type: ${value::class.java}") | ||
} | ||
} | ||
|
||
interface JsonRpcClient { | ||
fun makeRequest( | ||
request: JsonRpcRequest, | ||
resultMapper: (Any?) -> Any? = ::identityMapper | ||
resultMapper: (Any?) -> Any? = ::toPrimitiveOrVertxJson // to keep backward compatibility | ||
): Future<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>> | ||
} | ||
|
||
fun isResultOk(result: Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>): Boolean = result is Ok | ||
fun isResultOk(result: Result<Any?, Any?>): Boolean = result is Ok | ||
|
||
interface JsonRpcClientWithRetries : JsonRpcClient { | ||
fun makeRequest( | ||
request: JsonRpcRequest, | ||
resultMapper: (Any?) -> Any? = ::identityMapper, | ||
resultMapper: (Any?) -> Any? = ::toPrimitiveOrVertxJson, // to keep backward compatibility | ||
stopRetriesPredicate: (result: Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>) -> Boolean = ::isResultOk | ||
): Future<Result<JsonRpcSuccessResponse, JsonRpcErrorResponse>> | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
109 changes: 109 additions & 0 deletions
109
...bs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcRequestRetryerV2.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
package net.consensys.linea.jsonrpc.client | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper | ||
import com.github.michaelbull.result.Err | ||
import com.github.michaelbull.result.Ok | ||
import com.github.michaelbull.result.Result | ||
import com.github.michaelbull.result.map | ||
import com.github.michaelbull.result.mapError | ||
import com.github.michaelbull.result.onFailure | ||
import io.vertx.core.Vertx | ||
import net.consensys.linea.async.AsyncRetryer | ||
import net.consensys.linea.async.RetriedExecutionException | ||
import net.consensys.linea.async.toSafeFuture | ||
import net.consensys.linea.jsonrpc.JsonRpcErrorResponse | ||
import net.consensys.linea.jsonrpc.JsonRpcRequest | ||
import net.consensys.linea.jsonrpc.JsonRpcSuccessResponse | ||
import org.apache.logging.log4j.Level | ||
import org.apache.logging.log4j.LogManager | ||
import org.apache.logging.log4j.Logger | ||
import tech.pegasys.teku.infrastructure.async.SafeFuture | ||
import java.util.concurrent.atomic.AtomicInteger | ||
import java.util.concurrent.atomic.AtomicReference | ||
import java.util.function.Predicate | ||
|
||
class JsonRpcRequestRetryerV2( | ||
private val vertx: Vertx, | ||
private val delegate: JsonRpcClient, | ||
private val requestRetry: RequestRetryConfig, | ||
private val requestObjectMapper: ObjectMapper = objectMapper, | ||
private val shallRetryRequestsClientBasePredicate: Predicate<Result<Any?, Throwable>>, | ||
private val log: Logger = LogManager.getLogger(JsonRpcRequestRetryer::class.java), | ||
private val failuresLogLevel: Level = Level.WARN | ||
) { | ||
fun <T> makeRequest( | ||
request: JsonRpcRequest, | ||
shallRetryRequestPredicate: Predicate<Result<T, Throwable>>, | ||
resultMapper: (Any?) -> T | ||
): SafeFuture<T> { | ||
return makeRequestWithRetryer(request, resultMapper, shallRetryRequestPredicate) | ||
} | ||
|
||
private fun shallWarnFailureRetries(retries: Int): Boolean { | ||
return requestRetry.failuresWarningThreshold > 0u && | ||
retries > 0 && | ||
(retries % requestRetry.failuresWarningThreshold.toInt()) == 0 | ||
} | ||
|
||
private fun <T> makeRequestWithRetryer( | ||
request: JsonRpcRequest, | ||
resultMapper: (Any?) -> T, | ||
shallRetryRequestPredicate: Predicate<Result<T, Throwable>> | ||
): SafeFuture<T> { | ||
val lastException = AtomicReference<Throwable>() | ||
val retriesCount = AtomicInteger(0) | ||
val requestPredicate = Predicate<Result<T, Throwable>> { result -> | ||
log.info("result: {}", result) | ||
shallRetryRequestsClientBasePredicate.test(result) || shallRetryRequestPredicate.test(result) | ||
} | ||
|
||
return AsyncRetryer.retry( | ||
vertx = vertx, | ||
backoffDelay = requestRetry.backoffDelay, | ||
maxRetries = requestRetry.maxRetries?.toInt(), | ||
timeout = requestRetry.timeout, | ||
stopRetriesPredicate = { result: Result<T, Throwable> -> | ||
result.onFailure(lastException::set) | ||
!requestPredicate.test(result) | ||
} | ||
) { | ||
if (shallWarnFailureRetries(retriesCount.get())) { | ||
log.log( | ||
failuresLogLevel, | ||
"Request '{}' already retried {} times. lastError={}", | ||
requestObjectMapper.writeValueAsString(request), | ||
retriesCount.get(), | ||
lastException.get() | ||
) | ||
} | ||
retriesCount.incrementAndGet() | ||
delegate.makeRequest(request, resultMapper).toSafeFuture().thenApply { unfoldResultValueOrException<T>(it) } | ||
.exceptionally { th -> | ||
if (th is Error || th.cause is Error) { | ||
// Very serious JVM error, we should stop retrying anyway | ||
throw th | ||
} else { | ||
Err(th.cause ?: th) | ||
} | ||
} | ||
}.handleComposed { result, throwable -> | ||
when { | ||
result is Ok -> SafeFuture.completedFuture(result.value) | ||
result is Err -> SafeFuture.failedFuture<T>(result.error) | ||
throwable != null && throwable is RetriedExecutionException -> SafeFuture.failedFuture(lastException.get()) | ||
else -> SafeFuture.failedFuture(throwable) | ||
} | ||
} | ||
} | ||
|
||
companion object { | ||
fun <T> unfoldResultValueOrException( | ||
response: Result<JsonRpcSuccessResponse, JsonRpcErrorResponse> | ||
): Result<T, Throwable> { | ||
@Suppress("UNCHECKED_CAST") | ||
return response | ||
.map { it.result as T } | ||
.mapError { it.error.asException() } | ||
} | ||
} | ||
} |
34 changes: 34 additions & 0 deletions
34
jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2Client.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
package net.consensys.linea.jsonrpc.client | ||
|
||
import com.github.michaelbull.result.Result | ||
import tech.pegasys.teku.infrastructure.async.SafeFuture | ||
import java.util.function.Predicate | ||
|
||
/** | ||
* JSON-RPC client that supports JSON-RPC v2.0. | ||
* It will automatically generate the request id and retry requests when JSON-RPC errors are received. | ||
* Please override default stopRetriesPredicate to customize the retry logic. | ||
* | ||
* JSON-RPC result/error.data serialization is done automatically to Jackson JsonNode or primitive types. | ||
*/ | ||
interface JsonRpcV2Client { | ||
/** | ||
* Makes a JSON-RPC request. | ||
* @param method The method to call. | ||
* @param params The parameters to pass to the method. It can be a List<Any?>, a Map<String, *> or a Pojo. | ||
* @param shallRetryRequestPredicate predicate to evaluate request retrying. It defaults to never retrying. | ||
* @param resultMapper Mapper to apply to successful JSON-RPC result. | ||
* the result is primary type (String, Number, Boolean, null) or (jackson's JsonNode or vertx JsonObject/JsonArray) | ||
* The underlying type will depend on the serialization configured on the concrete implementation. | ||
* @return A future that | ||
* - when success - resolves with mapped result | ||
* - when JSON-RPC error - rejects with JsonRpcErrorException with corresponding error code, message and data | ||
* - when other error - rejects with underlying exception | ||
*/ | ||
fun <T> makeRequest( | ||
method: String, | ||
params: Any, // List<Any?>, Map<String, Any?>, Pojo | ||
shallRetryRequestPredicate: Predicate<Result<T, Throwable>> = Predicate { false }, | ||
resultMapper: (Any?) -> T | ||
): SafeFuture<T> | ||
} |
28 changes: 28 additions & 0 deletions
28
jvm-libs/json-rpc/src/main/kotlin/net/consensys/linea/jsonrpc/client/JsonRpcV2ClientImpl.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
package net.consensys.linea.jsonrpc.client | ||
|
||
import com.github.michaelbull.result.Result | ||
import net.consensys.linea.jsonrpc.JsonRpcRequestData | ||
import tech.pegasys.teku.infrastructure.async.SafeFuture | ||
import java.util.function.Predicate | ||
import java.util.function.Supplier | ||
|
||
internal class JsonRpcV2ClientImpl( | ||
private val delegate: JsonRpcRequestRetryerV2, | ||
private val idSupplier: Supplier<Any> | ||
) : JsonRpcV2Client { | ||
|
||
override fun <T> makeRequest( | ||
method: String, | ||
params: Any, | ||
shallRetryRequestPredicate: Predicate<Result<T, Throwable>>, | ||
resultMapper: (Any?) -> T | ||
): SafeFuture<T> { | ||
val request = JsonRpcRequestData(jsonrpc = "2.0", id = idSupplier.get(), method, params) | ||
|
||
return delegate.makeRequest( | ||
request = request, | ||
shallRetryRequestPredicate = shallRetryRequestPredicate, | ||
resultMapper = resultMapper | ||
) | ||
} | ||
} |
Oops, something went wrong.