Skip to content

Commit

Permalink
Using utility macros to create and register metrics for `consumer_gro…
Browse files Browse the repository at this point in the history
…ups` module

These were added in #91. Nothing changes: just the code to wire it up is a bit simpler/cleaner now.
  • Loading branch information
detro committed Nov 8, 2023
1 parent 32a5af6 commit 50d1426
Showing 1 changed file with 26 additions and 32 deletions.
58 changes: 26 additions & 32 deletions src/consumer_groups/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use std::{
use async_trait::async_trait;
use const_format::formatcp;
use konsumer_offsets::ConsumerProtocolAssignment;
use prometheus::{Histogram, HistogramOpts, IntGauge, IntGaugeVec, Opts, Registry};
use prometheus::{
register_histogram_with_registry, register_int_gauge_vec_with_registry,
register_int_gauge_with_registry, Histogram, IntGauge, IntGaugeVec, Registry,
};
use rdkafka::{admin::AdminClient, client::DefaultClientContext, groups::GroupList, ClientConfig};
use tokio::{
sync::mpsc,
Expand All @@ -32,7 +35,7 @@ const M_CG_MEMBERS_HELP: &str = "Members of consumer groups currently in the clu
const M_CG_FETCH_NAME: &str =
formatcp!("{NAMESPACE}_consumer_groups_emitter_fetch_time_milliseconds");
const M_CG_FETCH_HELP: &str =
"Time (in milliseconds) taken to fetch information about all consumer groups in cluster";
"Time (ms) taken to fetch information about all consumer groups in cluster";
const M_CG_CH_CAP_NAME: &str = formatcp!("{NAMESPACE}_consumer_groups_emitter_channel_capacity");
const M_CG_CH_CAP_HELP: &str =
"Capacity of internal channel used to send consumer groups metadata to rest of the service";
Expand Down Expand Up @@ -128,38 +131,29 @@ impl ConsumerGroupsEmitter {
///
/// * `admin_client_config` - Kafka admin client configuration, used to fetch Consumer Groups
pub fn new(admin_client_config: ClientConfig, metrics: Arc<Registry>) -> Self {
// Create metrics
let metric_cg = IntGauge::new(M_CG_NAME, M_CG_HELP)
.unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_NAME}"));
let metric_cg_members =
IntGaugeVec::new(Opts::new(M_CG_MEMBERS_NAME, M_CG_MEMBERS_HELP), &[LABEL_GROUP])
.unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_MEMBERS_NAME}"));
let metric_cg_fetch =
Histogram::with_opts(HistogramOpts::new(M_CG_FETCH_NAME, M_CG_FETCH_HELP))
.unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_FETCH_NAME}"));
let metric_cg_ch_cap = IntGauge::new(M_CG_CH_CAP_NAME, M_CG_CH_CAP_HELP)
.unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_CH_CAP_NAME}"));

// Register metrics
metrics
.register(Box::new(metric_cg.clone()))
.unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_NAME}"));
metrics
.register(Box::new(metric_cg_members.clone()))
.unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_MEMBERS_NAME}"));
metrics
.register(Box::new(metric_cg_fetch.clone()))
.unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_FETCH_NAME}"));
metrics
.register(Box::new(metric_cg_ch_cap.clone()))
.unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_CH_CAP_NAME}"));

Self {
admin_client_config,
metric_cg,
metric_cg_members,
metric_cg_fetch,
metric_cg_ch_cap,
metric_cg: register_int_gauge_with_registry!(M_CG_NAME, M_CG_HELP, metrics)
.unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_NAME}")),
metric_cg_members: register_int_gauge_vec_with_registry!(
M_CG_MEMBERS_NAME,
M_CG_MEMBERS_HELP,
&[LABEL_GROUP],
metrics
)
.unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_MEMBERS_NAME}")),
metric_cg_fetch: register_histogram_with_registry!(
M_CG_FETCH_NAME,
M_CG_FETCH_HELP,
metrics
)
.unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_FETCH_NAME}")),
metric_cg_ch_cap: register_int_gauge_with_registry!(
M_CG_CH_CAP_NAME,
M_CG_CH_CAP_HELP,
metrics
)
.unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_CH_CAP_NAME}")),
}
}
}
Expand Down

0 comments on commit 50d1426

Please sign in to comment.