Skip to content

Commit

Permalink
Removing dependency on the async-trait crate (#115)
Browse files Browse the repository at this point in the history
* Removing dependency on the `async-trait` crate

As per Rust 1.75, I don't need to make use of this crate anymore to define traits that have `async` methods in them.
See: https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html#where-the-gaps-lie.

* Updated `CHANGELOG` with recent changes
  • Loading branch information
detro authored Jan 8, 2024
1 parent a0baf98 commit ed9e45e
Show file tree
Hide file tree
Showing 12 changed files with 18 additions and 35 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
# vNEXT

## Notes

* Updated [`axum`](https://crates.io/crates/axum) (a key dependency) to `v0.7` ([PR#113](https://github.com/kafkesc/kommitted/pull/113))
* Removed dependency on [`async-trait`](https://crates.io/crates/async-trait) (as per [Rust 1.75](https://blog.rust-lang.org/2023/12/21/async-fn-rpit-in-traits.html#where-the-gaps-lie)) ([PR#115](https://github.com/kafkesc/kommitted/pull/115))

# v0.2.2 (2023-11-19)

## Features
Expand Down
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ exclude = [
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0.1.77"
axum = { version = "0.7.3", features = ["http2"] }
chrono = "0.4.31"
clap = { version = "4.4.13", features = ["derive", "deprecated", "env", "wrap_help"] }
Expand Down
2 changes: 0 additions & 2 deletions src/cluster_status/emitter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use async_trait::async_trait;
use prometheus::{
register_histogram_with_registry, register_int_gauge_with_registry, Histogram, IntGauge,
Registry,
Expand Down Expand Up @@ -101,7 +100,6 @@ impl ClusterStatusEmitter {
}
}

#[async_trait]
impl Emitter for ClusterStatusEmitter {
type Emitted = ClusterStatus;

Expand Down
2 changes: 0 additions & 2 deletions src/cluster_status/register.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use async_trait::async_trait;
use prometheus::{
register_int_gauge_vec_with_registry, register_int_gauge_with_registry, IntGauge, IntGaugeVec,
Registry,
Expand Down Expand Up @@ -193,7 +192,6 @@ impl ClusterStatusRegister {
}
}

#[async_trait]
impl Awaitable for ClusterStatusRegister {
/// [`Self`] ready when its internal copy of [`ClusterStatus`] has been populated.
async fn is_ready(&self) -> bool {
Expand Down
2 changes: 0 additions & 2 deletions src/consumer_groups/emitter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
sync::Arc,
};

use async_trait::async_trait;
use konsumer_offsets::ConsumerProtocolAssignment;
use prometheus::{
register_histogram_with_registry, register_int_gauge_vec_with_registry,
Expand Down Expand Up @@ -156,7 +155,6 @@ impl ConsumerGroupsEmitter {
}
}

#[async_trait]
impl Emitter for ConsumerGroupsEmitter {
type Emitted = ConsumerGroups;

Expand Down
2 changes: 0 additions & 2 deletions src/internals/awaitable.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use thiserror::Error;
use tokio::{time::interval, time::Duration};
use tokio_util::sync::CancellationToken;
Expand All @@ -10,7 +9,6 @@ const READYNESS_CHECK_INTERVAL: Duration = Duration::from_secs(1);
/// An [`Self`] is ready once [`Self::is_ready`] returns `true`.
/// The trait is built on _async/await_. The waiting is done by calling [`Self::await_ready`]:
/// a [`CancellationToken`] is provided in case the _awaiting loop_ has to be interrupted.
#[async_trait]
pub trait Awaitable {
/// Returns `true` if [`Self`] is ready, `false` otherwise.
///
Expand Down
2 changes: 0 additions & 2 deletions src/internals/emitter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use tokio::{sync::mpsc, task::JoinHandle, time::Interval};
use tokio_util::sync::CancellationToken;

Expand All @@ -8,7 +7,6 @@ use tokio_util::sync::CancellationToken;
/// It terminates itself when [`CancellationToken`] is cancelled (elsewhere).
///
/// Awaiting for its termination should be done via the returned [`JoinHandle`].
#[async_trait]
pub trait Emitter {
type Emitted: Send;

Expand Down
2 changes: 0 additions & 2 deletions src/konsumer_offsets_data/emitter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use async_trait::async_trait;
use konsumer_offsets::KonsumerOffsetsData;
use rdkafka::error::KafkaError;
use rdkafka::{
Expand Down Expand Up @@ -125,7 +124,6 @@ impl ConsumerContext for KonsumerOffsetsDataContext {

type KonsumerOffsetsDataConsumer = StreamConsumer<KonsumerOffsetsDataContext>;

#[async_trait]
impl Emitter for KonsumerOffsetsDataEmitter {
type Emitted = KonsumerOffsetsData;

Expand Down
28 changes: 11 additions & 17 deletions src/lag_register/register.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::{
sync::Arc,
};

use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use konsumer_offsets::{GroupMetadata, KonsumerOffsetsData, OffsetCommit};
use log::Level::Trace;
Expand Down Expand Up @@ -216,29 +215,25 @@ async fn process_offset_commit(
let l = Lag {
offset: oc.offset as u64,
offset_timestamp: oc.commit_timestamp,
offset_lag: match po_reg.estimate_offset_lag(&tp, oc.offset as u64).await {
Ok(ol) => ol,
Err(e) => {
debug!(
offset_lag: po_reg.estimate_offset_lag(&tp, oc.offset as u64)
.await
.unwrap_or_else(|e| {
debug!(
"Failed to estimate Offset Lag of Group '{}' for Topic Partition '{}': {}",
oc.group, tp, e
);
0
},
},
time_lag: match po_reg
0
}),
time_lag: po_reg
.estimate_time_lag(&tp, oc.offset as u64, oc.commit_timestamp)
.await
{
Ok(tl) => tl,
Err(e) => {
debug!(
.unwrap_or_else(|e| {
debug!(
"Failed to estimate Time Lag of Group '{}' for Topic Partition '{}': {}",
oc.group, tp, e
);
Duration::zero()
},
},
Duration::zero()
}),
};

// Create or update entry `TopicPartition -> LagWithOwner`:
Expand Down Expand Up @@ -334,7 +329,6 @@ async fn process_group_metadata(
}
}

#[async_trait]
impl Awaitable for LagRegister {
async fn is_ready(&self) -> bool {
// TODO https://github.com/kafkesc/kommitted/issues/59
Expand Down
2 changes: 0 additions & 2 deletions src/partition_offsets/emitter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use prometheus::{
register_histogram_vec_with_registry, register_int_gauge_with_registry, HistogramVec, IntGauge,
Expand Down Expand Up @@ -92,7 +91,6 @@ impl PartitionOffsetsEmitter {
}
}

#[async_trait]
impl Emitter for PartitionOffsetsEmitter {
type Emitted = PartitionOffset;

Expand Down
2 changes: 0 additions & 2 deletions src/partition_offsets/register.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use prometheus::{register_int_gauge_vec_with_registry, IntGaugeVec, Registry};
use tokio::sync::{mpsc::Receiver, RwLock};
Expand Down Expand Up @@ -300,7 +299,6 @@ impl PartitionOffsetsRegister {
}
}

#[async_trait]
impl Awaitable for PartitionOffsetsRegister {
async fn is_ready(&self) -> bool {
let (min, max, avg, count) = self.get_usage().await;
Expand Down

0 comments on commit ed9e45e

Please sign in to comment.