refactor(agent): various refactors (#126)
* refactor(agent): convert keypair service to api

* refactor(agent): use exit flag everywhere

* refactor(agent): StateApi -> Prices and refactor module

The StateApi is left over from the initial Adapter, but all of its functionality is
for pricing/product accounts. This refactors that module and fixes the cyclic
dependency between it and GlobalStore.

The new logic performs updates within the Prices API (which is where the state
relevant to subscriptions already lived, so it is the better place for them).

The file rename is left for a future commit to keep the diffs clean.

* refactor(agent): refactor all references to adapter to state

* refactor(agent): remove pythd module, raise pyth module

* refactor(agent): remove store module

* refactor(agent): convert to a tracing logger
Reisen authored Jun 6, 2024
1 parent 0cd76df commit 660b216
Showing 28 changed files with 1,090 additions and 1,397 deletions.
324 changes: 94 additions & 230 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions Cargo.toml
@@ -33,15 +33,11 @@ solana-account-decoder = "1.18.8"
solana-client = "1.18.8"
solana-sdk = "1.18.8"
bincode = "1.3.3"
slog = { version = "2.7.0", features = ["max_level_trace", "release_max_level_trace"] }
slog-term = "2.9.1"
rand = "0.8.5"
slog-async = "2.8.0"
config = "0.14.0"
thiserror = "1.0.58"
clap = { version = "4.5.4", features = ["derive"] }
humantime-serde = "1.1.1"
slog-envlogger = "2.2.0"
serde-this-or-that = "0.4.2"
# The public typed-html 0.2.2 release is causing a recursion limit
# error that cannot be fixed from outside the crate.
@@ -52,17 +48,17 @@ humantime = "2.1.0"
prometheus-client = "0.22.2"
lazy_static = "1.4.0"
toml_edit = "0.22.9"
slog-bunyan = "2.5.0"
winnow = "0.6.5"
proptest = "1.4.0"
tracing = { version = "0.1.40", features = ["log"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter", "json"] }

[dev-dependencies]
tokio-util = { version = "0.7.10", features = ["full"] }
soketto = "0.8.0"
portpicker = "0.1.1"
rand = "0.8.5"
tokio-retry = "0.3.0"
slog-extlog = "8.1.0"
iobuffer = "0.2.0"

[profile.release]
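The slog family of crates is dropped in favor of tracing. For context, here is a minimal sketch of how a binary typically wires up tracing-subscriber with the "env-filter" and "json" features enabled above; the agent's actual initialization code is not shown in this diff and may differ.

use tracing_subscriber::EnvFilter;

fn init_logging() {
    tracing_subscriber::fmt()
        // Emit structured JSON log lines, which the integration test below matches on.
        .json()
        // Honor RUST_LOG if set, defaulting to info-level output otherwise.
        .with_env_filter(
            EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
        )
        .init();
}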
2 changes: 1 addition & 1 deletion integration-tests/tests/test_integration.py
@@ -695,7 +695,7 @@ async def test_update_price_discards_unpermissioned(self, client: PythAgentClien
lines_found += 1
expected_unperm_pubkey = final_price_account_unperm["account"]
# Must point at the expected account as all other attempts must be valid
assert f"price_account: {expected_unperm_pubkey}" in line
assert f'"unpermissioned_price_account":"{expected_unperm_pubkey}"' in line

# Must find at least one log discarding the account
assert lines_found > 0
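The assertion changes because slog's "key: value" text format is replaced by tracing's JSON output. As a hypothetical illustration (the real call site in the oracle code may be shaped differently), a structured tracing field becomes a JSON key the test can grep for:

use solana_sdk::pubkey::Pubkey;

// Hypothetical call site: with the JSON subscriber, the structured field below
// renders as "unpermissioned_price_account":"<pubkey>" inside the log line.
fn log_discarded_update(pubkey: &Pubkey) {
    tracing::warn!(
        unpermissioned_price_account = %pubkey,
        "Discarding update for unpermissioned price account",
    );
}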
107 changes: 49 additions & 58 deletions src/agent.rs
@@ -61,32 +61,40 @@ Metrics Server:
Note that there is an Oracle and Exporter for each network, but only one Local Store and Global Store.
################################################################################################################################## */

pub mod legacy_schedule;
pub mod market_schedule;
pub mod metrics;
pub mod pythd;
pub mod remote_keypair_loader;
pub mod solana;
pub mod state;
pub mod store;
use {
self::{
config::Config,
pythd::api::rpc,
pyth::rpc,
solana::network,
state::notifier,
},
anyhow::Result,
futures_util::future::join_all,
slog::Logger,
lazy_static::lazy_static,
std::sync::Arc,
tokio::sync::{
broadcast,
mpsc,
},
tokio::sync::watch,
};

pub mod legacy_schedule;
pub mod market_schedule;
pub mod metrics;
pub mod pyth;
pub mod solana;
pub mod state;

lazy_static! {
/// A static exit flag to indicate to running threads that we're shutting down. This is used to
/// gracefully shut down the application.
///
/// We make this global based on the fact the:
/// - The `Sender` side does not rely on any async runtime.
/// - Exit logic doesn't really require carefully threading this value through the app.
/// - The `Receiver` side of a watch channel performs the detection based on if the change
/// happened after the subscribe, so it means all listeners should always be notified
/// correctly.
pub static ref EXIT: watch::Sender<bool> = watch::channel(false).0;
}

pub struct Agent {
config: Config,
}
@@ -96,86 +104,69 @@ impl Agent {
Agent { config }
}

pub async fn start(&self, logger: Logger) {
info!(logger, "Starting {}", env!("CARGO_PKG_NAME");
"config" => format!("{:?}", &self.config),
"version" => env!("CARGO_PKG_VERSION"),
"cwd" => std::env::current_dir().map(|p| format!("{}", p.display())).unwrap_or("<could not get current directory>".to_owned())
pub async fn start(&self) {
tracing::info!(
config = format!("{:?}", &self.config),
version = env!("CARGO_PKG_VERSION"),
cwd = std::env::current_dir()
.map(|p| format!("{}", p.display()))
.unwrap_or("<could not get current directory>".to_owned()),
"Starting {}",
env!("CARGO_PKG_NAME"),
);

if let Err(err) = self.spawn(logger.clone()).await {
error!(logger, "{}", err);
debug!(logger, "error context"; "context" => format!("{:?}", err));
if let Err(err) = self.spawn().await {
tracing::error!(err = ?err, "Agent spawn failed.");
};
}

async fn spawn(&self, logger: Logger) -> Result<()> {
async fn spawn(&self) -> Result<()> {
// job handles
let mut jhs = vec![];

// Create the channels
// TODO: make all components listen to shutdown signal
let (shutdown_tx, _) = broadcast::channel(self.config.channel_capacities.shutdown);
let (primary_keypair_loader_tx, primary_keypair_loader_rx) = mpsc::channel(10);
let (secondary_keypair_loader_tx, secondary_keypair_loader_rx) = mpsc::channel(10);

// Create the Pythd Adapter.
let adapter =
Arc::new(state::State::new(self.config.pythd_adapter.clone(), logger.clone()).await);
// Create the Application State.
let state = Arc::new(state::State::new(self.config.state.clone()).await);

// Spawn the primary network
jhs.extend(network::spawn_network(
self.config.primary_network.clone(),
network::Network::Primary,
primary_keypair_loader_tx,
logger.new(o!("primary" => true)),
adapter.clone(),
state.clone(),
)?);

// Spawn the secondary network, if needed
if let Some(config) = &self.config.secondary_network {
jhs.extend(network::spawn_network(
config.clone(),
network::Network::Secondary,
secondary_keypair_loader_tx,
logger.new(o!("primary" => false)),
adapter.clone(),
state.clone(),
)?);
}

// Create the Notifier task for the Pythd RPC.
jhs.push(tokio::spawn(notifier(
adapter.clone(),
shutdown_tx.subscribe(),
)));
jhs.push(tokio::spawn(notifier(state.clone())));

// Spawn the Pythd API Server
jhs.push(tokio::spawn(rpc::run(
self.config.pythd_api_server.clone(),
logger.clone(),
adapter.clone(),
shutdown_tx.subscribe(),
state.clone(),
)));

// Spawn the metrics server
jhs.push(tokio::spawn(metrics::MetricsServer::spawn(
jhs.push(tokio::spawn(metrics::spawn(
self.config.metrics_server.bind_address,
logger.clone(),
adapter,
)));

// Spawn the remote keypair loader endpoint for both networks
jhs.append(
&mut remote_keypair_loader::RemoteKeypairLoader::spawn(
primary_keypair_loader_rx,
secondary_keypair_loader_rx,
&mut state::keypairs::spawn(
self.config.primary_network.rpc_url.clone(),
self.config
.secondary_network
.as_ref()
.map(|c| c.rpc_url.clone()),
self.config.remote_keypair_loader.clone(),
logger,
state,
)
.await,
);
@@ -191,8 +182,7 @@ pub mod config {
use {
super::{
metrics,
pythd,
remote_keypair_loader,
pyth,
solana::network,
state,
},
@@ -214,13 +204,14 @@ pub mod config {
pub primary_network: network::Config,
pub secondary_network: Option<network::Config>,
#[serde(default)]
pub pythd_adapter: state::Config,
#[serde(rename = "pythd_adapter")]
pub state: state::Config,
#[serde(default)]
pub pythd_api_server: pythd::api::rpc::Config,
pub pythd_api_server: pyth::rpc::Config,
#[serde(default)]
pub metrics_server: metrics::Config,
#[serde(default)]
pub remote_keypair_loader: remote_keypair_loader::Config,
pub remote_keypair_loader: state::keypairs::Config,
}

impl Config {
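The broadcast-based shutdown channel is replaced by the global EXIT watch channel declared above. Below is a minimal sketch of the consumer pattern, modeled on the metrics server change later in this diff; the task body is an illustrative placeholder, not code from the commit.

async fn run_until_exit() {
    // Each task subscribes to the global flag; `changed()` resolves once the
    // sender flips the value after this receiver subscribed.
    let mut exit = crate::agent::EXIT.subscribe();
    loop {
        tokio::select! {
            _ = exit.changed() => break,
            _ = do_work() => {}
        }
    }
}

async fn do_work() { /* placeholder for the task's real work */ }

Shutdown then reduces to a single EXIT.send(true) from whatever handles the termination signal.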
84 changes: 30 additions & 54 deletions src/agent/metrics.rs
@@ -1,12 +1,6 @@
use {
super::state::{
local::PriceInfo,
State,
},
crate::agent::{
solana::oracle::PriceEntry,
store::PriceIdentifier,
},
super::state::local::PriceInfo,
crate::agent::solana::oracle::PriceEntry,
lazy_static::lazy_static,
prometheus_client::{
encoding::{
@@ -21,15 +15,13 @@ use {
registry::Registry,
},
serde::Deserialize,
slog::Logger,
solana_sdk::pubkey::Pubkey,
std::{
net::SocketAddr,
sync::{
atomic::AtomicU64,
Arc,
},
time::Instant,
},
tokio::sync::Mutex,
warp::{
@@ -64,49 +56,33 @@ lazy_static! {
Arc::new(Mutex::new(<Registry>::default()));
}

/// Internal metrics server state, holds state needed for serving
/// metrics.
pub struct MetricsServer {
pub start_time: Instant,
pub logger: Logger,
pub adapter: Arc<State>,
}

impl MetricsServer {
/// Instantiate a metrics API.
pub async fn spawn(addr: impl Into<SocketAddr> + 'static, logger: Logger, adapter: Arc<State>) {
let server = MetricsServer {
start_time: Instant::now(),
logger,
adapter,
};

let shared_state = Arc::new(Mutex::new(server));
let shared_state4metrics = shared_state.clone();
let metrics_route = warp::path("metrics")
.and(warp::path::end())
.and_then(move || {
let shared_state = shared_state4metrics.clone();
async move {
let locked_state = shared_state.lock().await;
let mut buf = String::new();
let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await)
.map_err(|e| -> Box<dyn std::error::Error> {
e.into()
})
.and_then(|_| -> Result<_, Box<dyn std::error::Error>> {
Ok(Box::new(reply::with_status(buf, StatusCode::OK)))
}).unwrap_or_else(|e| {
error!(locked_state.logger, "Metrics: Could not gather metrics from registry"; "error" => e.to_string());
Box::new(reply::with_status("Could not gather metrics. See logs for details".to_string(), StatusCode::INTERNAL_SERVER_ERROR))
});

Result::<Box<dyn Reply>, Rejection>::Ok(response)
}
});

warp::serve(metrics_route).bind(addr).await;
}
/// Instantiate a metrics API.
pub async fn spawn(addr: impl Into<SocketAddr> + 'static) {
let metrics_route = warp::path("metrics")
.and(warp::path::end())
.and_then(move || async move {
let mut buf = String::new();
let response = encode(&mut buf, &&PROMETHEUS_REGISTRY.lock().await)
.map_err(|e| -> Box<dyn std::error::Error> { e.into() })
.and_then(|_| -> Result<_, Box<dyn std::error::Error>> {
Ok(Box::new(reply::with_status(buf, StatusCode::OK)))
})
.unwrap_or_else(|e| {
tracing::error!(err = ?e, "Metrics: Could not gather metrics from registry");
Box::new(reply::with_status(
"Could not gather metrics. See logs for details".to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
))
});

Result::<Box<dyn Reply>, Rejection>::Ok(response)
});

let (_, serve) = warp::serve(metrics_route).bind_with_graceful_shutdown(addr, async {
let _ = crate::agent::EXIT.subscribe().changed().await;
});

serve.await
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
@@ -362,7 +338,7 @@ impl PriceLocalMetrics {
metrics
}

pub fn update(&self, price_id: &PriceIdentifier, price_info: &PriceInfo) {
pub fn update(&self, price_id: &pyth_sdk::Identifier, price_info: &PriceInfo) {
#[deny(unused_variables)]
let Self {
price,
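With the stateful MetricsServer gone, metrics flow through the shared PROMETHEUS_REGISTRY alone. A hypothetical sketch of how a component could register a counter with it, assuming the registry is exported from this module as shown above (prometheus-client appends the _total suffix to counters on export):

use prometheus_client::metrics::counter::Counter;

// Hypothetical helper: create a counter and register it in the shared registry
// so the /metrics endpoint picks it up.
async fn register_example_counter() -> Counter {
    let counter = Counter::default();
    let mut registry = PROMETHEUS_REGISTRY.lock().await;
    registry.register(
        "example_updates",
        "Number of example updates processed",
        counter.clone(),
    );
    counter
}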
4 changes: 2 additions & 2 deletions src/agent/pythd/api.rs → src/agent/pyth.rs
@@ -6,6 +6,8 @@ use {
std::collections::BTreeMap,
};

pub mod rpc;

pub type Pubkey = String;
pub type Attrs = BTreeMap<String, String>;

@@ -83,5 +85,3 @@ pub struct PriceUpdate {
pub valid_slot: Slot,
pub pub_slot: Slot,
}

pub mod rpc;