Skip to content

Commit

Permalink
Merge pull request #47 from danielsig727/uds_support
Browse files Browse the repository at this point in the history
!feat(client): uds support
  • Loading branch information
johnmanjiro13 authored Nov 17, 2023
2 parents a059ac9 + 627f4e4 commit f307f0b
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 30 deletions.
24 changes: 16 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,18 @@ use tokio_fluent::record_map;

#[tokio::main]
async fn main() {
let client = Client::new(&Config {
addr: "127.0.0.1:24224".parse().unwrap(),
..Default::default()
})
// Connect to server using TCP
let client = Client::new_tcp(
"127.0.0.1:24224".parse().unwrap(),
&Config {..Default::default()}
)
.await
.unwrap();
// Or connecting using unix socket
let client_unix = Client::new_unix(
"/path/to/fluentd.sock",
&Config {..Default::default()}
)
.await
.unwrap();

Expand Down Expand Up @@ -59,10 +67,10 @@ async fn main() {
## Setting config values

```rust
let client = Client::new(&Config {
addr: "127.0.0.1:24224".parse().unwrap(),
..Default::default()
})
let client = Client::new_tcp(
"127.0.0.1:24224".parse().unwrap(),
&Config {..Default::default()}
)
.await
.unwrap();
```
Expand Down
44 changes: 31 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let client = Client::new(&Config {
//! addr: "127.0.0.1:24224".parse().unwrap(),
//! ..Default::default()
//! })
//! let client = Client::new_tcp(
//! "127.0.0.1:24224".parse().unwrap(),
//! &Config{..Default::default()},
//! )
//! .await
//! .unwrap();
//!
Expand All @@ -22,11 +22,12 @@
//! ```

use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;

use base64::{engine::general_purpose, Engine};
use tokio::{
net::TcpStream,
net::{TcpStream, UnixStream},
sync::broadcast::{channel, Sender},
time::timeout,
};
Expand All @@ -51,9 +52,6 @@ impl std::fmt::Display for SendError {
#[derive(Debug, Clone)]
/// Config for a client.
pub struct Config {
/// The address of the fluentd server.
/// The default is `127.0.0.1:24224`.
pub addr: SocketAddr,
/// The timeout value to connect to the fluentd server.
/// The default is 3 seconds.
pub timeout: Duration,
Expand All @@ -72,7 +70,6 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
addr: "127.0.0.1:24224".parse().unwrap(),
timeout: Duration::new(3, 0),
retry_wait: 500,
max_retry: 10,
Expand All @@ -93,9 +90,31 @@ pub struct Client {
}

impl Client {
/// Connect to the fluentd server and create a worker with tokio::spawn.
pub async fn new(config: &Config) -> tokio::io::Result<Client> {
let stream = timeout(config.timeout, TcpStream::connect(config.addr)).await??;
/// Connect to the fluentd server using TCP and create a worker with tokio::spawn.
pub async fn new_tcp(addr: SocketAddr, config: &Config) -> tokio::io::Result<Client> {
let stream = timeout(config.timeout, TcpStream::connect(addr)).await??;
let (sender, receiver) = channel(1024);

let config = config.clone();
let _ = tokio::spawn(async move {
let mut worker = Worker::new(
stream,
receiver,
RetryConfig {
initial_wait: config.retry_wait,
max: config.max_retry,
max_wait: config.max_retry_wait,
},
);
worker.run().await
});

Ok(Self { sender })
}

/// Connect to the fluentd server using unix domain socket and create a worker with tokio::spawn.
pub async fn new_unix<P: AsRef<Path>>(path: P, config: &Config) -> tokio::io::Result<Client> {
let stream = timeout(config.timeout, UnixStream::connect(path)).await??;
let (sender, receiver) = channel(1024);

let config = config.clone();
Expand Down Expand Up @@ -239,7 +258,6 @@ mod tests {
#[test]
fn test_default_config() {
let config: Config = Default::default();
assert_eq!(config.addr, "127.0.0.1:24224".parse().unwrap());
assert_eq!(config.timeout, Duration::new(3, 0));
assert_eq!(config.retry_wait, 500);
assert_eq!(config.max_retry, 10);
Expand Down
8 changes: 4 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@
//!
//! #[tokio::main]
//! async fn main() {
//! let client = Client::new(&Config {
//! addr: "127.0.0.1:24224".parse().unwrap(),
//! ..Default::default()
//! })
//! let client = Client::new_tcp(
//! "127.0.0.1:24224".parse().unwrap(),
//! &Config{..Default::default()},
//! )
//! .await
//! .unwrap();
//!
Expand Down
12 changes: 7 additions & 5 deletions src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use rmp_serde::Serializer;
use serde::{ser::SerializeMap, Deserialize, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpStream,
sync::broadcast::{error::RecvError, Receiver},
time::Duration,
};
Expand Down Expand Up @@ -84,14 +83,17 @@ pub struct RetryConfig {
pub max_wait: u64,
}

pub struct Worker {
stream: TcpStream,
pub struct Worker<StreamType> {
stream: StreamType,
receiver: Receiver<Message>,
retry_config: RetryConfig,
}

impl Worker {
pub fn new(stream: TcpStream, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
impl<StreamType> Worker<StreamType>
where
StreamType: AsyncReadExt + AsyncWriteExt + Unpin,
{
pub fn new(stream: StreamType, receiver: Receiver<Message>, retry_config: RetryConfig) -> Self {
Self {
stream,
receiver,
Expand Down

0 comments on commit f307f0b

Please sign in to comment.