From ed9e45e9284b8787c9d5da061f7771540fe5e718 Mon Sep 17 00:00:00 2001 From: Ivan De Marino Date: Mon, 8 Jan 2024 20:52:01 +0000 Subject: [PATCH] Removing dependency on the `async-trait` crate (#115) * Removing dependency on the `async-trait` crate As per Rust 1.75, I don't need to make use of this crate anymore to define traits that have `async` methods in them. See: https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html#where-the-gaps-lie. * Updated `CHANGELOG` with recent changes --- CHANGELOG.md | 7 +++++++ Cargo.lock | 1 - Cargo.toml | 1 - src/cluster_status/emitter.rs | 2 -- src/cluster_status/register.rs | 2 -- src/consumer_groups/emitter.rs | 2 -- src/internals/awaitable.rs | 2 -- src/internals/emitter.rs | 2 -- src/konsumer_offsets_data/emitter.rs | 2 -- src/lag_register/register.rs | 28 +++++++++++----------------- src/partition_offsets/emitter.rs | 2 -- src/partition_offsets/register.rs | 2 -- 12 files changed, 18 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 40f4d8b..7fcc90b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# vNEXT + +## Notes + +* Updated [`axum`](https://crates.io/crates/axum) (a key dependency) to `v0.7` ([PR#113](https://github.com/kafkesc/kommitted/pull/113)) +* Removed dependency on [`async-trait`](https://crates.io/crates/async-trait) (as per [Rust 1.75](https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html#where-the-gaps-lie)) ([PR#115](https://github.com/kafkesc/kommitted/pull/115)) + # v0.2.2 (2023-11-19) ## Features diff --git a/Cargo.lock b/Cargo.lock index c7bad82..668ce42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -617,7 +617,6 @@ dependencies = [ name = "kommitted" version = "0.2.2" dependencies = [ - "async-trait", "axum", "chrono", "clap", diff --git a/Cargo.toml b/Cargo.toml index f7d0278..aaf8370 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ exclude = [ # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.77" axum = { version = "0.7.3", features = ["http2"] } chrono = "0.4.31" clap = { version = "4.4.13", features = ["derive", "deprecated", "env", "wrap_help"] } diff --git a/src/cluster_status/emitter.rs b/src/cluster_status/emitter.rs index 114ced1..ec2cb0f 100644 --- a/src/cluster_status/emitter.rs +++ b/src/cluster_status/emitter.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; use prometheus::{ register_histogram_with_registry, register_int_gauge_with_registry, Histogram, IntGauge, Registry, @@ -101,7 +100,6 @@ impl ClusterStatusEmitter { } } -#[async_trait] impl Emitter for ClusterStatusEmitter { type Emitted = ClusterStatus; diff --git a/src/cluster_status/register.rs b/src/cluster_status/register.rs index 75384b8..11a411a 100644 --- a/src/cluster_status/register.rs +++ b/src/cluster_status/register.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; use prometheus::{ register_int_gauge_vec_with_registry, register_int_gauge_with_registry, IntGauge, IntGaugeVec, Registry, @@ -193,7 +192,6 @@ impl ClusterStatusRegister { } } -#[async_trait] impl Awaitable for ClusterStatusRegister { /// [`Self`] ready when its internal copy of [`ClusterStatus`] has been populated. async fn is_ready(&self) -> bool { diff --git a/src/consumer_groups/emitter.rs b/src/consumer_groups/emitter.rs index cfac30f..e25df14 100644 --- a/src/consumer_groups/emitter.rs +++ b/src/consumer_groups/emitter.rs @@ -3,7 +3,6 @@ use std::{ sync::Arc, }; -use async_trait::async_trait; use konsumer_offsets::ConsumerProtocolAssignment; use prometheus::{ register_histogram_with_registry, register_int_gauge_vec_with_registry, @@ -156,7 +155,6 @@ impl ConsumerGroupsEmitter { } } -#[async_trait] impl Emitter for ConsumerGroupsEmitter { type Emitted = ConsumerGroups; diff --git a/src/internals/awaitable.rs b/src/internals/awaitable.rs index 67b63da..a8e483a 100644 --- a/src/internals/awaitable.rs +++ b/src/internals/awaitable.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use thiserror::Error; use tokio::{time::interval, time::Duration}; use tokio_util::sync::CancellationToken; @@ -10,7 +9,6 @@ const READYNESS_CHECK_INTERVAL: Duration = Duration::from_secs(1); /// An [`Self`] is ready once [`Self::is_ready`] returns `true`. /// The trait is built on _async/await_. The waiting is done by calling [`Self::await_ready`]: /// a [`CancellationToken`] is provided in case the _awaiting loop_ has to be interrupted. -#[async_trait] pub trait Awaitable { /// Returns `true` if [`Self`] is ready, `false` otherwise. /// diff --git a/src/internals/emitter.rs b/src/internals/emitter.rs index e3ad7d8..214c81b 100644 --- a/src/internals/emitter.rs +++ b/src/internals/emitter.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use tokio::{sync::mpsc, task::JoinHandle, time::Interval}; use tokio_util::sync::CancellationToken; @@ -8,7 +7,6 @@ use tokio_util::sync::CancellationToken; /// It terminates itself when [`CancellationToken`] is cancelled (elsewhere). /// /// Awaiting for its termination should be done via the returned [`JoinHandle`]. -#[async_trait] pub trait Emitter { type Emitted: Send; diff --git a/src/konsumer_offsets_data/emitter.rs b/src/konsumer_offsets_data/emitter.rs index 3212775..353bc2d 100644 --- a/src/konsumer_offsets_data/emitter.rs +++ b/src/konsumer_offsets_data/emitter.rs @@ -1,4 +1,3 @@ -use async_trait::async_trait; use konsumer_offsets::KonsumerOffsetsData; use rdkafka::error::KafkaError; use rdkafka::{ @@ -125,7 +124,6 @@ impl ConsumerContext for KonsumerOffsetsDataContext { type KonsumerOffsetsDataConsumer = StreamConsumer; -#[async_trait] impl Emitter for KonsumerOffsetsDataEmitter { type Emitted = KonsumerOffsetsData; diff --git a/src/lag_register/register.rs b/src/lag_register/register.rs index a149ebf..c83965f 100644 --- a/src/lag_register/register.rs +++ b/src/lag_register/register.rs @@ -3,7 +3,6 @@ use std::{ sync::Arc, }; -use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use konsumer_offsets::{GroupMetadata, KonsumerOffsetsData, OffsetCommit}; use log::Level::Trace; @@ -216,29 +215,25 @@ async fn process_offset_commit( let l = Lag { offset: oc.offset as u64, offset_timestamp: oc.commit_timestamp, - offset_lag: match po_reg.estimate_offset_lag(&tp, oc.offset as u64).await { - Ok(ol) => ol, - Err(e) => { - debug!( + offset_lag: po_reg.estimate_offset_lag(&tp, oc.offset as u64) + .await + .unwrap_or_else(|e| { + debug!( "Failed to estimate Offset Lag of Group '{}' for Topic Partition '{}': {}", oc.group, tp, e ); - 0 - }, - }, - time_lag: match po_reg + 0 + }), + time_lag: po_reg .estimate_time_lag(&tp, oc.offset as u64, oc.commit_timestamp) .await - { - Ok(tl) => tl, - Err(e) => { - debug!( + .unwrap_or_else(|e| { + debug!( "Failed to estimate Time Lag of Group '{}' for Topic Partition '{}': {}", oc.group, tp, e ); - Duration::zero() - }, - }, + Duration::zero() + }), }; // Create or update entry `TopicPartition -> LagWithOwner`: @@ -334,7 +329,6 @@ async fn process_group_metadata( } } -#[async_trait] impl Awaitable for LagRegister { async fn is_ready(&self) -> bool { // TODO https://github.com/kafkesc/kommitted/issues/59 diff --git a/src/partition_offsets/emitter.rs b/src/partition_offsets/emitter.rs index 8253c65..914748d 100644 --- a/src/partition_offsets/emitter.rs +++ b/src/partition_offsets/emitter.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; use chrono::{DateTime, Utc}; use prometheus::{ register_histogram_vec_with_registry, register_int_gauge_with_registry, HistogramVec, IntGauge, @@ -92,7 +91,6 @@ impl PartitionOffsetsEmitter { } } -#[async_trait] impl Emitter for PartitionOffsetsEmitter { type Emitted = PartitionOffset; diff --git a/src/partition_offsets/register.rs b/src/partition_offsets/register.rs index 33054bc..d05bfa1 100644 --- a/src/partition_offsets/register.rs +++ b/src/partition_offsets/register.rs @@ -1,6 +1,5 @@ use std::{collections::HashMap, sync::Arc}; -use async_trait::async_trait; use chrono::{DateTime, Duration, Utc}; use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry}; use tokio::sync::{mpsc::Receiver, RwLock}; @@ -300,7 +299,6 @@ impl PartitionOffsetsRegister { } } -#[async_trait] impl Awaitable for PartitionOffsetsRegister { async fn is_ready(&self) -> bool { let (min, max, avg, count) = self.get_usage().await;