From 627f4e4d8eeb30f7453e615b4432c0d8ad3eeb58 Mon Sep 17 00:00:00 2001 From: Daniel Tai Date: Tue, 14 Nov 2023 13:25:22 +0800 Subject: [PATCH] !feat(client): uds support --- README.md | 24 ++++++++++++++++-------- src/client.rs | 44 +++++++++++++++++++++++++++++++------------- src/lib.rs | 8 ++++---- src/worker.rs | 12 +++++++----- 4 files changed, 58 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 0f5eaa7..720a538 100644 --- a/README.md +++ b/README.md @@ -27,10 +27,18 @@ use tokio_fluent::record_map; #[tokio::main] async fn main() { - let client = Client::new(&Config { - addr: "127.0.0.1:24224".parse().unwrap(), - ..Default::default() - }) + // Connect to server using TCP + let client = Client::new_tcp( + "127.0.0.1:24224".parse().unwrap(), + &Config {..Default::default()} + ) + .await + .unwrap(); + // Or connecting using unix socket + let client_unix = Client::new_unix( + "/path/to/fluentd.sock", + &Config {..Default::default()} + ) .await .unwrap(); @@ -59,10 +67,10 @@ async fn main() { ## Setting config values ```rust -let client = Client::new(&Config { - addr: "127.0.0.1:24224".parse().unwrap(), - ..Default::default() - }) +let client = Client::new_tcp( + "127.0.0.1:24224".parse().unwrap(), + &Config {..Default::default()} + ) .await .unwrap(); ``` diff --git a/src/client.rs b/src/client.rs index de1eebf..02f15aa 100644 --- a/src/client.rs +++ b/src/client.rs @@ -8,10 +8,10 @@ //! //! #[tokio::main] //! async fn main() { -//! let client = Client::new(&Config { -//! addr: "127.0.0.1:24224".parse().unwrap(), -//! ..Default::default() -//! }) +//! let client = Client::new_tcp( +//! "127.0.0.1:24224".parse().unwrap(), +//! &Config{..Default::default()}, +//! ) //! .await //! .unwrap(); //! @@ -22,11 +22,12 @@ //! ``` use std::net::SocketAddr; +use std::path::Path; use std::time::Duration; use base64::{engine::general_purpose, Engine}; use tokio::{ - net::TcpStream, + net::{TcpStream, UnixStream}, sync::broadcast::{channel, Sender}, time::timeout, }; @@ -51,9 +52,6 @@ impl std::fmt::Display for SendError { #[derive(Debug, Clone)] /// Config for a client. pub struct Config { - /// The address of the fluentd server. - /// The default is `127.0.0.1:24224`. - pub addr: SocketAddr, /// The timeout value to connect to the fluentd server. /// The default is 3 seconds. pub timeout: Duration, @@ -72,7 +70,6 @@ pub struct Config { impl Default for Config { fn default() -> Self { Self { - addr: "127.0.0.1:24224".parse().unwrap(), timeout: Duration::new(3, 0), retry_wait: 500, max_retry: 10, @@ -93,9 +90,31 @@ pub struct Client { } impl Client { - /// Connect to the fluentd server and create a worker with tokio::spawn. - pub async fn new(config: &Config) -> tokio::io::Result { - let stream = timeout(config.timeout, TcpStream::connect(config.addr)).await??; + /// Connect to the fluentd server using TCP and create a worker with tokio::spawn. + pub async fn new_tcp(addr: SocketAddr, config: &Config) -> tokio::io::Result { + let stream = timeout(config.timeout, TcpStream::connect(addr)).await??; + let (sender, receiver) = channel(1024); + + let config = config.clone(); + let _ = tokio::spawn(async move { + let mut worker = Worker::new( + stream, + receiver, + RetryConfig { + initial_wait: config.retry_wait, + max: config.max_retry, + max_wait: config.max_retry_wait, + }, + ); + worker.run().await + }); + + Ok(Self { sender }) + } + + /// Connect to the fluentd server using unix domain socket and create a worker with tokio::spawn. + pub async fn new_unix>(path: P, config: &Config) -> tokio::io::Result { + let stream = timeout(config.timeout, UnixStream::connect(path)).await??; let (sender, receiver) = channel(1024); let config = config.clone(); @@ -239,7 +258,6 @@ mod tests { #[test] fn test_default_config() { let config: Config = Default::default(); - assert_eq!(config.addr, "127.0.0.1:24224".parse().unwrap()); assert_eq!(config.timeout, Duration::new(3, 0)); assert_eq!(config.retry_wait, 500); assert_eq!(config.max_retry, 10); diff --git a/src/lib.rs b/src/lib.rs index 978cac7..3133599 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,10 +11,10 @@ //! //! #[tokio::main] //! async fn main() { -//! let client = Client::new(&Config { -//! addr: "127.0.0.1:24224".parse().unwrap(), -//! ..Default::default() -//! }) +//! let client = Client::new_tcp( +//! "127.0.0.1:24224".parse().unwrap(), +//! &Config{..Default::default()}, +//! ) //! .await //! .unwrap(); //! diff --git a/src/worker.rs b/src/worker.rs index 2a8c12e..0ff222c 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -4,7 +4,6 @@ use rmp_serde::Serializer; use serde::{ser::SerializeMap, Deserialize, Serialize}; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, sync::broadcast::{error::RecvError, Receiver}, time::Duration, }; @@ -84,14 +83,17 @@ pub struct RetryConfig { pub max_wait: u64, } -pub struct Worker { - stream: TcpStream, +pub struct Worker { + stream: StreamType, receiver: Receiver, retry_config: RetryConfig, } -impl Worker { - pub fn new(stream: TcpStream, receiver: Receiver, retry_config: RetryConfig) -> Self { +impl Worker +where + StreamType: AsyncReadExt + AsyncWriteExt + Unpin, +{ + pub fn new(stream: StreamType, receiver: Receiver, retry_config: RetryConfig) -> Self { Self { stream, receiver,