Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
makemake-kbo committed Apr 1, 2024
2 parents 49a05f3 + 33560bc commit 496e2a1
Show file tree
Hide file tree
Showing 29 changed files with 279 additions and 120 deletions.
22 changes: 7 additions & 15 deletions src/admin/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,7 @@ use crate::{
Settings,
};

// use tokio::net::TcpListener;
// use hyper::{
// server::conn::http1,
// service::service_fn,
// };
// use hyper_util_blutgang::rt::TokioIo;

// For decoding JWT
/// For decoding JWT
#[derive(Debug, Serialize, Deserialize)]
struct Claims {
id: Value,
Expand All @@ -63,10 +56,10 @@ struct Claims {
exp: usize,
}

// Macro for getting responses from either the cache or RPC nodes.
//
// Since we don't cache the admin request responses, this functions
// quite differently from the one you'll find in `blutgang/balancer/accept_http.rs`
/// Macro for getting responses from either the cache or RPC nodes.
///
/// Since we don't cache the admin request responses, this functions
/// quite differently from the one you'll find in `blutgang/balancer/accept_http.rs`
macro_rules! get_response {
(
$tx:expr,
Expand Down Expand Up @@ -101,7 +94,7 @@ macro_rules! get_response {
}};
}

// Execute request and construct a HTTP response
/// Execute request and construct a HTTP response
async fn forward_body(
mut tx: Value,
rpc_list_rwlock: &Arc<RwLock<Vec<Rpc>>>,
Expand Down Expand Up @@ -131,7 +124,7 @@ async fn forward_body(
Ok(res)
}

// Accept admin request, self explanatory
/// Accept admin request, self explanatory
pub async fn accept_admin_request(
tx: Request<hyper::body::Incoming>,
rpc_list_rwlock: Arc<RwLock<Vec<Rpc>>>,
Expand Down Expand Up @@ -204,7 +197,6 @@ mod tests {
// Helper function to create a test cache
fn create_test_cache() -> Db {
let db = sled::Config::new().temporary(true);


db.open().unwrap()
}
Expand Down
3 changes: 2 additions & 1 deletion src/admin/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Errors
//! Admin specific errors

use std::error::Error;

#[derive(Debug)]
Expand Down
8 changes: 4 additions & 4 deletions src/admin/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ async fn admin_api_server(
}
}

// Used for listening to admin requests as its own tokio task.
// Also used for k8s liveness/readiness probes.
//
// Similar to what you'd find in main/balancer
/// Used for listening to admin requests as its own tokio task.
/// Also used for k8s liveness/readiness probes.
///
/// Similar to what you'd find in main/balancer
pub async fn listen_for_admin_requests(
rpc_list_rwlock: Arc<RwLock<Vec<Rpc>>>,
poverty_list_rwlock: Arc<RwLock<Vec<Rpc>>>,
Expand Down
28 changes: 16 additions & 12 deletions src/admin/liveready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@ pub enum ReadinessState {
Setup,
}

/// `HealthState` represents what state blutgang is in:
/// - Healthy, Everything nominal
/// - MissingRpcs, Some RPCs are not following the head but otherwise ok
/// - Unhealthy, Nothing works
#[derive(Debug, PartialEq, Clone, Copy, Default)]
pub enum HealthState {
#[default]
Healthy, // Everything nominal
Healthy, // Everything nominal
MissingRpcs, // Some RPCs are not following the head but otherwise ok
Unhealthy, // Nothing works
}
Expand All @@ -42,19 +46,19 @@ pub enum LiveReadyUpdate {
Health(HealthState),
}

// These 2 are used to send and receive updates related to the current
// health of blutgang.
/// These 2 are used to send and receive updates related to the current
/// health of blutgang.
pub type LiveReadyUpdateRecv = mpsc::Receiver<LiveReadyUpdate>;
pub type LiveReadyUpdateSnd = mpsc::Sender<LiveReadyUpdate>;

// These are used to request/return updates about health
// pub type LiveReadyRecv = oneshot::Receiver<LiveReady>;
/// These are used to request/return updates about health
pub type LiveReadySnd = oneshot::Sender<LiveReady>;
// pub type LiveReadyRecv = oneshot::Receiver<LiveReady>;

pub type LiveReadyRequestRecv = mpsc::Receiver<LiveReadySnd>;
pub type LiveReadyRequestSnd = mpsc::Sender<LiveReadySnd>;

// Macros to make returning statuses less ugly in code
/// Macros to make returning statuses less ugly in code
macro_rules! ok {
() => {
Ok(hyper::Response::builder()
Expand Down Expand Up @@ -82,7 +86,7 @@ macro_rules! nok {
};
}

// Listen for liveness update messages and update the current status accordingly
/// Listen for liveness update messages and update the current status accordingly
async fn liveness_listener(
mut liveness_receiver: LiveReadyUpdateRecv,
liveness_status: Arc<RwLock<LiveReady>>,
Expand All @@ -101,7 +105,7 @@ async fn liveness_listener(
}
}

// Receives requests about current status updates and returns the current liveness
/// Receives requests about current status updates and returns the current liveness
async fn liveness_request_processor(
mut liveness_request_receiver: LiveReadyRequestRecv,
liveness_status: Arc<RwLock<LiveReady>>,
Expand All @@ -114,9 +118,9 @@ async fn liveness_request_processor(
}
}

// Monitor for new liveness updates and update the statuses accordingly.
//
// Also handles incoming requests about the current status.
/// Monitor for new liveness updates and update the statuses accordingly.
///
/// Also handles incoming requests about the current status.
pub(in crate::r#admin) async fn liveness_monitor(
liveness_receiver: LiveReadyUpdateRecv,
liveness_request_receiver: LiveReadyRequestRecv,
Expand Down Expand Up @@ -176,7 +180,7 @@ pub async fn accept_health_request(
}
}

// Just a sink used to immediately discard request in cases where admin is disabled
/// Sink used to immediately discard request in cases where admin is disabled
pub async fn liveness_update_sink(mut liveness_rx: LiveReadyUpdateRecv) {
loop {
while (liveness_rx.recv().await).is_some() {
Expand Down
45 changes: 20 additions & 25 deletions src/admin/methods.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use serde_json::{

use sled::Db;

// Extract the method, call the appropriate function and return the response
/// Extract the method, call the appropriate function and return the response
pub async fn execute_method(
tx: Value,
rpc_list: &Arc<RwLock<Vec<Rpc>>>,
Expand Down Expand Up @@ -101,8 +101,8 @@ pub async fn execute_method(
}
}

// Quit Blutgang upon receiving this method
// We're returning a Null and allowing unreachable code so rustc doesnt cry
/// Quit Blutgang upon receiving this method
/// We're returning a Null and allowing unreachable code so rustc doesnt cry
#[allow(unreachable_code)]
async fn admin_blutgang_quit(cache: Db) -> Result<Value, AdminError> {
// We're doing something not-good so flush everything to disk
Expand All @@ -117,7 +117,7 @@ async fn admin_blutgang_quit(cache: Db) -> Result<Value, AdminError> {
Ok(Value::Null)
}

// Flushes sled cache to disk
/// Flushes sled cache to disk
async fn admin_flush_cache(cache: Db) -> Result<Value, AdminError> {
let time = Instant::now();
let _ = cache.flush_async().await;
Expand All @@ -132,7 +132,7 @@ async fn admin_flush_cache(cache: Db) -> Result<Value, AdminError> {
Ok(rx)
}

// Respond with the config we started blutgang with
/// Respond with the config we started blutgang with
fn admin_config(config: Arc<RwLock<Settings>>) -> Result<Value, AdminError> {
let guard = config.read().unwrap();
let rx = json!({
Expand All @@ -154,8 +154,8 @@ fn admin_config(config: Arc<RwLock<Settings>>) -> Result<Value, AdminError> {
Ok(rx)
}

// List generic Fn to retrieve RPCs from an Arc<RwLock<Vec<Rpc>>>
// Used for `blutgang_rpc_list` and `blutgang_poverty_list`
/// List generic Fn to retrieve RPCs from an Arc<RwLock<Vec<Rpc>>>
/// Used for `blutgang_rpc_list` and `blutgang_poverty_list`
fn admin_list_rpc(rpc_list: &Arc<RwLock<Vec<Rpc>>>) -> Result<Value, AdminError> {
// Read the RPC list, handling errors
let rpc_list = rpc_list.read().map_err(|_| AdminError::Inaccessible)?;
Expand Down Expand Up @@ -185,12 +185,11 @@ fn admin_list_rpc(rpc_list: &Arc<RwLock<Vec<Rpc>>>) -> Result<Value, AdminError>
Ok(rx)
}

// Pushes an RPC to the end of the list
//
// param[0] - RPC url
// param[1] - max_consecutive
// param[2] - ma_len
// param[3] - ma_len
/// Pushes an RPC to the end of the list:
/// - param[0] - RPC url
/// - param[1] - max_consecutive
/// - param[2] - ma_len
/// - param[3] - ma_len
fn admin_add_rpc(
rpc_list: &Arc<RwLock<Vec<Rpc>>>,
params: Option<&Vec<Value>>,
Expand Down Expand Up @@ -259,9 +258,8 @@ fn admin_add_rpc(
Ok(rx)
}

// Remove RPC at a specified index, return the url of the removed RPC
//
// param[0] - RPC index
/// Remove RPC at a specified index, return the url of the removed RPC:
/// - param[0] - RPC index
fn admin_remove_rpc(
rpc_list: &Arc<RwLock<Vec<Rpc>>>,
params: Option<&Vec<Value>>,
Expand Down Expand Up @@ -301,7 +299,7 @@ fn admin_remove_rpc(

// TODO: change the following 4 fn so theyre generic

// Responds with health_check_ttl
/// Responds with health_check_ttl
fn admin_blutgang_health_check_ttl(config: Arc<RwLock<Settings>>) -> Result<Value, AdminError> {
let guard = config.read().unwrap();
let rx = json!({
Expand All @@ -313,7 +311,7 @@ fn admin_blutgang_health_check_ttl(config: Arc<RwLock<Settings>>) -> Result<Valu
Ok(rx)
}

// Responds with ttl
/// Responds with ttl
fn admin_blutgang_ttl(config: Arc<RwLock<Settings>>) -> Result<Value, AdminError> {
let guard = config.read().unwrap();
let rx = json!({
Expand All @@ -325,9 +323,8 @@ fn admin_blutgang_ttl(config: Arc<RwLock<Settings>>) -> Result<Value, AdminError
Ok(rx)
}

// Sets health_check_ttl
//
// param[0] - health_check_ttl
/// Sets health_check_ttl:
/// - param[0] - health_check_ttl
fn admin_blutgang_set_health_check_ttl(
config: Arc<RwLock<Settings>>,
params: Option<&Vec<Value>>,
Expand Down Expand Up @@ -358,9 +355,8 @@ fn admin_blutgang_set_health_check_ttl(
Ok(rx)
}

// Sets ttl
//
// param[0] - ttl
/// Sets ttl:
/// param[0] - ttl
fn admin_blutgang_set_ttl(
config: Arc<RwLock<Settings>>,
params: Option<&Vec<Value>>,
Expand Down Expand Up @@ -429,7 +425,6 @@ mod tests {
// Helper function to create a test cache
fn create_test_cache() -> Db {
let db = sled::Config::new().temporary(true);


db.open().unwrap()
}
Expand Down
7 changes: 7 additions & 0 deletions src/admin/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
//! # `admin` module
//!
//! The admin module is used to get info about the current status of
//! Blutgang, as well as modify settings, and add and remove RPCs.
//!
//! For detailed notes on how to use it, please check the wiki.

mod accept;
mod error;
pub mod listener;
Expand Down
20 changes: 11 additions & 9 deletions src/balancer/accept_http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Clone for RequestChannels {
}
}

// Macros for accepting requests
/// Macros for accepting requests
#[macro_export]
macro_rules! accept {
(
Expand All @@ -173,7 +173,7 @@ macro_rules! accept {
};
}

// Macro for getting responses from either the cache or RPC nodes
/// Macro for getting responses from either the cache or RPC nodes
macro_rules! get_response {
(
$tx:expr,
Expand Down Expand Up @@ -300,8 +300,8 @@ macro_rules! fetch_from_rpc {
}};
}

// Pick RPC and send request to it. In case the result is cached,
// read and return from the cache.
/// Pick RPC and send request to it. In case the result is cached,
/// read and return from the cache.
async fn forward_body(
tx: Request<hyper::body::Incoming>,
rpc_list_rwlock: &Arc<RwLock<Vec<Rpc>>>,
Expand All @@ -317,7 +317,9 @@ async fn forward_body(
// Check if body has application/json
//
// Can be toggled via the config. Should be on if we want blutgang to be JSON-RPC compliant.
if params.header_check && tx.headers().get("content-type") != Some(&HeaderValue::from_static("application/json")) {
if params.header_check
&& tx.headers().get("content-type") != Some(&HeaderValue::from_static("application/json"))
{
return (
Ok(hyper::Response::builder()
.status(400)
Expand Down Expand Up @@ -386,10 +388,10 @@ async fn forward_body(
(Ok(res), rpc_position)
}

// Forward the request to *a* RPC picked by the algo set by the user.
// Measures the time needed for a request, and updates the respective
// RPC lself.
// In case of a timeout, returns an error.
/// Forward the request to *a* RPC picked by the algo set by the user.
/// Measures the time needed for a request, and updates the respective
/// RPC lself.
/// In case of a timeout, returns an error.
pub async fn accept_request(
mut tx: Request<hyper::body::Incoming>,
connection_params: ConnectionParams,
Expand Down
10 changes: 6 additions & 4 deletions src/balancer/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ enum NamedNumber {
Null,
}

// Returns the corresponding NamedNumber enum value for the named number
// Null if n/a
/// Returns the corresponding NamedNumber enum value for the named number
/// Null if n/a.
fn has_named_number(param: &str) -> NamedNumber {
let named_list = ["latest", "earliest", "safe", "finalized", "pending"];

Expand All @@ -50,7 +50,8 @@ fn has_named_number(param: &str) -> NamedNumber {
NamedNumber::Null
}

// Return the blocknumber from a json-rpc request as a Option<String>, returning None if it cant find anything
/// Return the blocknumber from a json-rpc request as a Option<String>,
/// returning None if it cant find anything.
pub fn get_block_number_from_request(
tx: Value,
named_blocknumbers: &Arc<RwLock<NamedBlocknumbers>>,
Expand Down Expand Up @@ -108,7 +109,7 @@ pub fn get_block_number_from_request(
Some(block_number)
}

// Replaces block tags with a hex number and return the request
/// Replaces block tags with a hex number and return the request
pub fn replace_block_tags(
tx: &mut Value,
named_blocknumbers: &Arc<RwLock<NamedBlocknumbers>>,
Expand Down Expand Up @@ -161,6 +162,7 @@ pub fn replace_block_tags(
tx.to_owned()
}

/// *Converts* a hyper `Incoming` request to a `serde_json::Value`.
pub async fn incoming_to_value(tx: Request<Incoming>) -> Result<Value, hyper::Error> {
#[cfg(feature = "debug-verbose")]
println!("Incoming request: {:?}", tx);
Expand Down
Loading

0 comments on commit 496e2a1

Please sign in to comment.