diff --git a/src/consumer_groups/emitter.rs b/src/consumer_groups/emitter.rs index 93ed7fa..4d5f6de 100644 --- a/src/consumer_groups/emitter.rs +++ b/src/consumer_groups/emitter.rs @@ -6,7 +6,10 @@ use std::{ use async_trait::async_trait; use const_format::formatcp; use konsumer_offsets::ConsumerProtocolAssignment; -use prometheus::{Histogram, HistogramOpts, IntGauge, IntGaugeVec, Opts, Registry}; +use prometheus::{ + register_histogram_with_registry, register_int_gauge_vec_with_registry, + register_int_gauge_with_registry, Histogram, IntGauge, IntGaugeVec, Registry, +}; use rdkafka::{admin::AdminClient, client::DefaultClientContext, groups::GroupList, ClientConfig}; use tokio::{ sync::mpsc, @@ -32,7 +35,7 @@ const M_CG_MEMBERS_HELP: &str = "Members of consumer groups currently in the clu const M_CG_FETCH_NAME: &str = formatcp!("{NAMESPACE}_consumer_groups_emitter_fetch_time_milliseconds"); const M_CG_FETCH_HELP: &str = - "Time (in milliseconds) taken to fetch information about all consumer groups in cluster"; + "Time (ms) taken to fetch information about all consumer groups in cluster"; const M_CG_CH_CAP_NAME: &str = formatcp!("{NAMESPACE}_consumer_groups_emitter_channel_capacity"); const M_CG_CH_CAP_HELP: &str = "Capacity of internal channel used to send consumer groups metadata to rest of the service"; @@ -128,38 +131,29 @@ impl ConsumerGroupsEmitter { /// /// * `admin_client_config` - Kafka admin client configuration, used to fetch Consumer Groups pub fn new(admin_client_config: ClientConfig, metrics: Arc) -> Self { - // Create metrics - let metric_cg = IntGauge::new(M_CG_NAME, M_CG_HELP) - .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_NAME}")); - let metric_cg_members = - IntGaugeVec::new(Opts::new(M_CG_MEMBERS_NAME, M_CG_MEMBERS_HELP), &[LABEL_GROUP]) - .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_MEMBERS_NAME}")); - let metric_cg_fetch = - Histogram::with_opts(HistogramOpts::new(M_CG_FETCH_NAME, M_CG_FETCH_HELP)) - .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_FETCH_NAME}")); - let metric_cg_ch_cap = IntGauge::new(M_CG_CH_CAP_NAME, M_CG_CH_CAP_HELP) - .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_CH_CAP_NAME}")); - - // Register metrics - metrics - .register(Box::new(metric_cg.clone())) - .unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_NAME}")); - metrics - .register(Box::new(metric_cg_members.clone())) - .unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_MEMBERS_NAME}")); - metrics - .register(Box::new(metric_cg_fetch.clone())) - .unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_FETCH_NAME}")); - metrics - .register(Box::new(metric_cg_ch_cap.clone())) - .unwrap_or_else(|_| panic!("Failed to register metric: {M_CG_CH_CAP_NAME}")); - Self { admin_client_config, - metric_cg, - metric_cg_members, - metric_cg_fetch, - metric_cg_ch_cap, + metric_cg: register_int_gauge_with_registry!(M_CG_NAME, M_CG_HELP, metrics) + .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_NAME}")), + metric_cg_members: register_int_gauge_vec_with_registry!( + M_CG_MEMBERS_NAME, + M_CG_MEMBERS_HELP, + &[LABEL_GROUP], + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_MEMBERS_NAME}")), + metric_cg_fetch: register_histogram_with_registry!( + M_CG_FETCH_NAME, + M_CG_FETCH_HELP, + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_FETCH_NAME}")), + metric_cg_ch_cap: register_int_gauge_with_registry!( + M_CG_CH_CAP_NAME, + M_CG_CH_CAP_HELP, + metrics + ) + .unwrap_or_else(|_| panic!("Failed to create metric: {M_CG_CH_CAP_NAME}")), } } }