Skip to content

Commit

Permalink
PoC integration of Metrics (#569)
Browse files Browse the repository at this point in the history
* feat: Implement metrics recording in lurk-metrics package

- Added a new `lurk-metrics` package to monitor and record application's metrics in a thread-safe manner.
- This module hooks up [metrics](https://docs.rs/metrics) to a thread-local sink, that in turn is drained into logs by a global recorder on a fixed cadence (/5s).
- Made modifications to `Cargo.toml` to introduce the `lurk-metrics` package and its relevant dependencies.

* feat: Integrate lurk-metrics package into project workspace (PoC)

- Added `lurk-metrics` package to the project and updated `metrics` package to version `0.21.1` in workspace
- Modified `lurk-metrics` project to use workspace `metrics` dependency instead of previous external version
- Integrated `lurk_metrics::MetricsSink` initialization into the main function, displaying metrics in the log.
  • Loading branch information
huitseeker authored Jul 28, 2023
1 parent eccd914 commit c1960b7
Show file tree
Hide file tree
Showing 9 changed files with 652 additions and 14 deletions.
77 changes: 77 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 8 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ indexmap = { version = "1.9.3", features = ["rayon"] }
itertools = "0.9"
log = { workspace = true }
lurk-macros = { path = "lurk-macros" }
lurk-metrics = { path = "lurk-metrics" }
metrics = { workspace = true }
neptune = { workspace = true, features = ["arity2","arity4","arity8","arity16","pasta","bls"] }
nom = "7.1.3"
nom_locate = "4.1.0"
Expand Down Expand Up @@ -88,8 +90,11 @@ tempfile = "3.6.0"

[workspace]
resolver = "2"
members = ["clutch",
"fcomm", "lurk-macros"
members = [
"clutch",
"fcomm",
"lurk-macros",
"lurk-metrics"
]

# Dependencies that should be kept in sync through the whole workspace
Expand All @@ -102,6 +107,7 @@ blstrs = "0.7.0"
clap = "4.3.17"
ff = "0.13"
log = "0.4.19"
metrics = "0.21.1"
neptune = { version = "10.0.0" }
nova = { package = "nova-snark", version = "0.22", default-features = false }
once_cell = "1.18.0"
Expand Down
19 changes: 19 additions & 0 deletions lurk-metrics/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "lurk-metrics"
authors = ["Lurk Lab <engineering@lurk-lab.com>"]
version = "0.1.0"
edition = "2021"
license = "MIT OR Apache-2.0"
description = "Metrics Sink for lurk"
repository = "https://github.com/lurk-lab/lurk-rs"

[dependencies]
metrics = { workspace = true }
once_cell = { workspace = true }
log = { workspace = true }
hdrhistogram = { version = "7.5.2", default-features = false }


[dev-dependencies]
expect-test = "1"
testing_logger = "0.1.1"
171 changes: 171 additions & 0 deletions lurk-metrics/src/data.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
use std::collections::HashMap;
use std::fmt::{self, Display, Formatter};

use log::info;
use metrics::Key;

pub const METRICS_TARGET_NAME: &str = "lurk::metrics";

/// A map of metrics data
#[derive(Debug, Default)]
pub struct Metrics(HashMap<Key, Metric>);

impl Metrics {
/// Get a mutable reference to a metric, creating it if it doesn't already exist in the map
pub fn get_mut(&mut self, typ: MetricType, key: &Key) -> &mut Metric {
if !self.0.contains_key(key) {
self.0.insert(key.clone(), Metric::new(typ));
}
self.0.get_mut(key).unwrap()
}

/// Aggregate another [Metrics] into this one
pub fn aggregate(&mut self, other: Metrics) {
for (key, data) in other.0 {
match self.0.get_mut(&key) {
Some(me) => {
me.aggregate(data);
}
None => {
self.0.insert(key, data);
}
}
}
}

/// Emit this [Metrics] object
pub fn emit(self) {
let mut keys = self.0.keys().collect::<Vec<_>>();
keys.sort();
for key in keys {
let metric = self.0.get(key).unwrap();
let labels = if key.labels().len() == 0 {
String::new()
} else {
format!(
"[{}]",
key.labels()
.map(|label| format!("{}={}", label.key(), label.value()))
.collect::<Vec<_>>()
.join(",")
)
};
info!(
target: METRICS_TARGET_NAME,
"{}{}: {}",
key.name(),
labels,
metric,
);
}
}

#[cfg(test)]
pub fn iter(&self) -> impl Iterator<Item = (&Key, &Metric)> {
self.0.iter()
}
}

#[derive(Debug)]
pub enum MetricType {
Counter,
Gauge,
Histogram,
}

#[derive(Debug)]
pub enum Metric {
Counter(ValueAndCount<u64>),
Gauge(ValueAndCount<f64>),
// Fixed scaling configuration for histograms, tuned for
// microsecond-scale latency timers. It saturates at 60 seconds.
Histogram(hdrhistogram::Histogram<u64>),
}

impl Metric {
fn new(typ: MetricType) -> Self {
match typ {
MetricType::Counter => Metric::Counter(Default::default()),
MetricType::Gauge => Metric::Gauge(Default::default()),
MetricType::Histogram => Metric::Histogram(
hdrhistogram::Histogram::new_with_bounds(1, 60 * 1000 * 1000, 2).unwrap(),
),
}
}

pub fn increment(&mut self, value: u64) {
match self {
Metric::Counter(inner) => {
inner.sum += value;
inner.n += 1;
}
Metric::Gauge(_inner) => {
panic!("increment gauge values are not supported");
}
Metric::Histogram(inner) => {
inner.saturating_record(value);
}
}
}

pub fn set(&mut self, value: f64) {
match self {
Metric::Counter(_inner) => panic!("set counter values are not supported"),
Metric::Gauge(inner) => {
inner.sum = value;
inner.n = 1;
}
Metric::Histogram(_inner) => panic!("set histogram values are not supported"),
}
}

fn aggregate(&mut self, other: Metric) {
match (self, other) {
(Metric::Counter(me), Metric::Counter(other)) => {
me.sum += other.sum;
me.n += other.n;
}
(Metric::Gauge(me), Metric::Gauge(other)) => {
me.sum += other.sum;
me.n += other.n;
}
(Metric::Histogram(me), Metric::Histogram(other)) => {
me.add(other).unwrap();
}
_ => debug_assert!(false, "can't aggregate different types"),
}
}
}

impl Display for Metric {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
Metric::Counter(inner) => {
if inner.sum == inner.n {
f.write_fmt(format_args!("{}", inner.sum))
} else {
f.write_fmt(format_args!("{} (n={})", inner.sum, inner.n))
}
}
Metric::Gauge(inner) => f.write_fmt(format_args!("{} (n={})", inner.sum, inner.n)),
Metric::Histogram(inner) => f.write_fmt(format_args!(
"n={}: min={} p10={} p50={} avg={:.2} p90={} p99={} p99.9={} max={}",
inner.len(),
inner.min(),
inner.value_at_quantile(0.1),
inner.value_at_quantile(0.5),
inner.mean(),
inner.value_at_quantile(0.9),
inner.value_at_quantile(0.99),
inner.value_at_quantile(0.999),
inner.max(),
)),
}
}
}

#[derive(Debug, Default)]
pub struct ValueAndCount<T> {
pub sum: T,
pub n: u64,
}
Loading

0 comments on commit c1960b7

Please sign in to comment.