Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: disable batch flushing by default #139

Merged
merged 3 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pyth-agent"
version = "2.10.3"
version = "2.10.4"
edition = "2021"

[[bin]]
Expand Down
54 changes: 9 additions & 45 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,58 +34,22 @@ through the `RUST_LOG` environment variable using the standard
`error|warn|info|debug|trace`.

#### Plain/JSON logging
By default, pyth-agent will print plaintext log statements. This can be switched to structured JSON output with `-l json`.

#### Code location in logs
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't work anymore.

For debugging purposes, you can specify `-L` to print file/line information with each log statement. This option is disabled by default.

### Key Store Config Migration [v1.x.x LEGACY]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

legacy.

Pyth agent v2.0.0 introduces a simplified program and mapping key configuration. This breaking change alters how you define program/mapping key options in your agent config:
```toml
# Old v1.x.x way
[primary network]
key_store.root_path = "/path/to/keystore"
key_store.publish_keypair_path = "publish_key_pair.json" # Relative path from root_path, "publish_key_pair.json" by default
key_store.program_key_path = "program_key.json" # Relative path from root_path, "program_key.json" by default
key_store.mapping_key_path = "mapping_key.json" # Relative path from root_path, "mapping_key.json" by default

# [...]

# New v2.0.0 way
[primary_network]
key_store.publish_keypair_path = "/path/to/keypair.json" # The root_path is gone, we specify the full path
# Not using separate files anymore
key_store.program_key = "LiteralProgramPubkeyInsideTheConfig" # contents of legacy program_key.json;
key_store.mapping_key = "LiteralMappingPubkeyInsideTheConfig" # contents of legacy mapping_key.json

# [...]

```

#### Automatic Migration
If you are upgrading to agent v2.0.0 with an existing config, you can use the provided automatic migrator program:
```shell
# Build
$ cargo build --release
# Run the migrator, making sure that the key store with previous keys is reachable
$ target/release/agent-migrate-config -c <existing_config_file>.toml > my_new_config.toml
```

#### `Could not open {mapping|program|...} key file`
This error can appear if some of your program/mapping/publish key
files are not reachable under their `key_store.*` setting values.

Ensure that your current working directory is correct for reaching the
key store path inside your config. You may also migrate manually by
changing `key_store.*_key_path` and `key_store.publish_keypair_path`
options by hand, as described in the config example above.
Pyth agent will print logs in plaintext in terminal and JSON format in non-terminal environments (e.g. when writing to a file).

## Run
`cargo run --release -- --config <your_config.toml>` will build and run the agent in a single step.

## Publishing API
A running agent will expose a WebSocket serving the JRPC publishing API documented [here](https://docs.pyth.network/documentation/publish-data/pyth-client-websocket-api). See `config/config.toml` for related settings.

## Best practices
If your publisher is publishing updates to more than 50 price feeds, it is recommended that you do the following to reduce the connection overhead to the agent:
- Batch your messages together and send them as a single request to the agent (as an array of messages). The agent will respond to the batch messages
with a single response containing an array of individual responses (in the same order). If batching is not possible, you can disable the `instant_flush` option
in the configuration file to let agent send the responses every `flush_interval` seconds.
- Do not use subscribe to the price schedule. Instead, define a schedule on the client side and send the messages based on your own schedule. Ideally
you should send price updates as soon as you have them to increase the latency of the data on the Pyth Network.

# Development
## Unit Testing
A collection of Rust unit tests is provided, ran with `cargo test`.
Expand Down
9 changes: 8 additions & 1 deletion config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ listen_address = "127.0.0.1:8910"
# received from the Price state.
# notify_price_sched_tx_buffer = 10000

# Whether flush messages and responses to the client immediately. Once disabled the
# messages will be flushed every `flush_interval_duration`. Disabling it is useful if
# there are many messages to be sent between the client and the server to avoid overloading
# the connection.
# instant_flush = true

# Flush interval for responses and notifications. This is the maximum time the
# server will wait before flushing the messages to the client.
# server will wait before flushing the messages to the client. It will have no
# effect if `instant_flush` is set to true.
# flush_interval_duration = "50ms"

# Configuration for the primary network this agent will publish data to. In most cases this should be a Pythnet endpoint.
Expand Down
40 changes: 35 additions & 5 deletions src/agent/pyth/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use {
anyhow,
Result,
},
futures::future::OptionFuture,
futures_util::{
stream::{
SplitSink,
Expand Down Expand Up @@ -50,7 +51,10 @@ use {
sync::Arc,
time::Duration,
},
tokio::sync::mpsc,
tokio::{
sync::mpsc,
time::Interval,
},
tracing::instrument,
warp::{
ws::{
Expand Down Expand Up @@ -111,11 +115,18 @@ enum ConnectionError {
WebsocketConnectionClosed,
}

#[derive(Debug)]
enum FlushStrategy {
Instant,
Interval(Interval),
}

async fn handle_connection<S>(
ws_conn: WebSocket,
state: Arc<S>,
notify_price_tx_buffer: usize,
notify_price_sched_tx_buffer: usize,
instant_flush: bool,
flush_interval_duration: Duration,
) where
S: state::Prices,
Expand All @@ -129,7 +140,10 @@ async fn handle_connection<S>(
let (mut notify_price_sched_tx, mut notify_price_sched_rx) =
mpsc::channel(notify_price_sched_tx_buffer);

let mut flush_interval = tokio::time::interval(flush_interval_duration);
let mut flush_strategy = match instant_flush {
true => FlushStrategy::Instant,
false => FlushStrategy::Interval(tokio::time::interval(flush_interval_duration)),
};

loop {
if let Err(err) = handle_next(
Expand All @@ -140,7 +154,7 @@ async fn handle_connection<S>(
&mut notify_price_rx,
&mut notify_price_sched_tx,
&mut notify_price_sched_rx,
&mut flush_interval,
&mut flush_strategy,
)
.await
{
Expand All @@ -156,6 +170,7 @@ async fn handle_connection<S>(
}
}

#[allow(clippy::too_many_arguments)]
async fn handle_next<S>(
state: &S,
ws_tx: &mut SplitSink<WebSocket, Message>,
Expand All @@ -164,11 +179,17 @@ async fn handle_next<S>(
notify_price_rx: &mut mpsc::Receiver<NotifyPrice>,
notify_price_sched_tx: &mut mpsc::Sender<NotifyPriceSched>,
notify_price_sched_rx: &mut mpsc::Receiver<NotifyPriceSched>,
flush_interval: &mut tokio::time::Interval,
flush_strategy: &mut FlushStrategy,
) -> Result<()>
where
S: state::Prices,
{
let optional_flush_tick: OptionFuture<_> = match flush_strategy {
FlushStrategy::Instant => None,
FlushStrategy::Interval(interval) => Some(interval.tick()),
}
.into();

tokio::select! {
msg = ws_rx.next() => {
match msg {
Expand Down Expand Up @@ -196,9 +217,14 @@ where
feed_notification(ws_tx, Method::NotifyPriceSched, Some(notify_price_sched))
.await
}
_ = flush_interval.tick() => {
Some(_) = optional_flush_tick => {
flush(ws_tx).await
}
}?;

match flush_strategy {
FlushStrategy::Interval(_) => Ok(()),
FlushStrategy::Instant => flush(ws_tx).await,
}
}

Expand Down Expand Up @@ -413,6 +439,8 @@ pub struct Config {
/// Size of the buffer of each Server's channel on which `notify_price_sched` events are
/// received from the Price state.
pub notify_price_sched_tx_buffer: usize,
/// Whether to flush immediately after sending a message or notification.
pub instant_flush: bool,
/// Flush interval duration for the notifications.
#[serde(with = "humantime_serde")]
pub flush_interval_duration: Duration,
Expand All @@ -424,6 +452,7 @@ impl Default for Config {
listen_address: "127.0.0.1:8910".to_string(),
notify_price_tx_buffer: 10000,
notify_price_sched_tx_buffer: 10000,
instant_flush: true,
flush_interval_duration: Duration::from_millis(50),
}
}
Expand Down Expand Up @@ -465,6 +494,7 @@ where
state,
config.notify_price_tx_buffer,
config.notify_price_sched_tx_buffer,
config.instant_flush,
config.flush_interval_duration,
)
.await
Expand Down
Loading