Skip to content

Commit

Permalink
Merge pull request #1 from BlockProject3D/async-optimize
Browse files Browse the repository at this point in the history
Async optimize
  • Loading branch information
Yuri6037 authored Mar 27, 2024
2 parents 0f4ef2e + c79ce6e commit 330bbf0
Show file tree
Hide file tree
Showing 5 changed files with 407 additions and 120 deletions.
9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "bp3d-logger"
version = "1.1.0"
version = "2.0.0-rc.1.0.0"
authors = ["Yuri Edward <yuri6037@outlook.com>"]
edition = "2021"
description = "A flexible Log implementation intended to be used with BP3D software."
Expand All @@ -14,12 +14,11 @@ categories = []

[dependencies]
log = "0.4.14"
bp3d-fs = "1.0.0"
bp3d-os = { version = "1.0.0-rc.1.2.1", features=["dirs", "time"] }
crossbeam-channel = "0.5.2"
crossbeam-queue = "0.3.8"
once_cell = "1.10.0"
time = { version = "0.3.7", features = ["formatting", "macros"] }
time-tz = { version = "0.3.1", features = ["system"] }
time = { version = "0.3.0", features = ["formatting", "macros"] }
termcolor = "1.1.3"
atty = "0.2.14"

[features]
25 changes: 19 additions & 6 deletions src/backend.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021, BlockProject 3D
// Copyright (c) 2023, BlockProject 3D
//
// All rights reserved.
//
Expand Down Expand Up @@ -28,13 +28,12 @@

use crate::easy_termcolor::{color, EasyTermColor};
use crate::Colors;
use atty::Stream;
use log::Level;
use std::collections::HashMap;
use std::fmt::Display;
use std::fmt::Formatter;
use std::fs::{File, OpenOptions};
use std::io::{BufWriter, Write};
use std::io::{BufWriter, IsTerminal, Write};
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use termcolor::{ColorChoice, ColorSpec, StandardStream};
Expand Down Expand Up @@ -72,11 +71,25 @@ fn write_msg(stream: StandardStream, target: &str, msg: &str, level: Level) {
.color(color(level))
.write(level)
.reset()
.write(']')
.write(format!(" {}", msg))
.write("] ")
.write(msg)
.lf();
}

enum Stream {
Stdout,
Stderr,
}

impl Stream {
pub fn isatty(&self) -> bool {
match self {
Stream::Stdout => std::io::stdout().is_terminal(),
Stream::Stderr => std::io::stderr().is_terminal(),
}
}
}

impl StdBackend {
pub fn new(smart_stderr: bool, colors: Colors) -> StdBackend {
StdBackend {
Expand Down Expand Up @@ -108,7 +121,7 @@ impl Backend for StdBackend {
let use_termcolor = match self.colors {
Colors::Disabled => false,
Colors::Enabled => true,
Colors::Auto => atty::is(stream),
Colors::Auto => stream.isatty(),
};
match use_termcolor {
true => {
Expand Down
209 changes: 126 additions & 83 deletions src/internal.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2021, BlockProject 3D
// Copyright (c) 2023, BlockProject 3D
//
// All rights reserved.
//
Expand Down Expand Up @@ -28,20 +28,28 @@

use crate::backend::Backend;
use crate::{LogMsg, Logger};
use bp3d_os::time::LocalOffsetDateTime;
use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_queue::ArrayQueue;
use log::{Level, Log, Metadata, Record};
use std::fmt::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use time::macros::format_description;
use time::OffsetDateTime;
use time_tz::OffsetDateTimeExt;

const BUF_SIZE: usize = 128; // The maximum count of log messages in the channel.
const BUF_SIZE: usize = 16; // The maximum count of log messages in the channel.

//Disable large_enum_variant as using a Box will inevitably cause a small allocation on a critical path,
//allocating in a critical code path will most likely result in degraded performance.
//And yes, logging is a critical path when using bp3d-tracing.
#[allow(clippy::large_enum_variant)]
enum Command {
Flush,
Log(LogMsg),
Terminate,
EnableLogBuffer,
DisableLogBuffer,
}

fn log<T: Backend>(
Expand All @@ -57,58 +65,99 @@ fn log<T: Backend>(
}
}

fn exec_commad(cmd: Command, logger: &mut Logger) -> bool {
match cmd {
Command::Terminate => true,
Command::Flush => {
if let Some(file) = &mut logger.file {
if let Err(e) = file.flush() {
struct Thread {
logger: Logger,
recv_ch: Receiver<Command>,
enable_log_buffer: bool,
log_buffer: Arc<ArrayQueue<LogMsg>>,
}

impl Thread {
pub fn new(
logger: Logger,
recv_ch: Receiver<Command>,
log_buffer: Arc<ArrayQueue<LogMsg>>,
) -> Thread {
Thread {
logger,
recv_ch,
enable_log_buffer: false,
log_buffer,
}
}

fn exec_commad(&mut self, cmd: Command) -> bool {
match cmd {
Command::Terminate => true,
Command::Flush => {
if let Some(file) = &mut self.logger.file {
if let Err(e) = file.flush() {
let _ = log(
self.logger.std.as_mut(),
"bp3d-logger",
&format!("Could not flush file backend: {}", e),
Level::Error,
);
}
}
false
}
Command::Log(buffer) => {
let target = buffer.target();
let msg = buffer.msg();
let level = buffer.level();
if let Err(e) = log(self.logger.file.as_mut(), target, msg, level) {
let _ = log(
logger.std.as_mut(),
self.logger.std.as_mut(),
"bp3d-logger",
&format!("Could not flush file backend: {}", e),
&format!("Could not write to file backend: {}", e),
Level::Error,
);
}
let _ = log(self.logger.std.as_mut(), target, msg, level);
if self.enable_log_buffer {
self.log_buffer.force_push(buffer);
}
false
}
Command::EnableLogBuffer => {
self.enable_log_buffer = true;
false
}
Command::DisableLogBuffer => {
self.enable_log_buffer = false;
false
}
false
}
Command::Log(LogMsg { target, msg, level }) => {
if let Err(e) = log(logger.file.as_mut(), &target, &msg, level) {
let _ = log(
logger.std.as_mut(),
"bp3d-logger",
&format!("Could not write to file backend: {}", e),
Level::Error,
);
}

pub fn run(mut self) {
while let Ok(v) = self.recv_ch.recv() {
let flag = self.exec_commad(v);
if flag {
// The thread has requested to exit itself; drop out of the main loop.
break;
}
let _ = log(logger.std.as_mut(), &target, &msg, level);
false
}
}
}

pub struct LoggerImpl {
thread: Mutex<Option<std::thread::JoinHandle<()>>>,
send_ch: Sender<Command>,
recv_ch: Receiver<Command>,
log_buffer_send_ch: Sender<LogMsg>,
log_buffer_recv_ch: Receiver<LogMsg>,
enable_log_buffer: AtomicBool,
enabled: AtomicBool,
recv_ch: Receiver<Command>,
log_buffer: Arc<ArrayQueue<LogMsg>>,
thread: Mutex<Option<std::thread::JoinHandle<()>>>,
}

impl LoggerImpl {
pub fn new() -> LoggerImpl {
let (send_ch, recv_ch) = bounded(BUF_SIZE);
let (log_buffer_send_ch, log_buffer_recv_ch) = bounded(BUF_SIZE);
LoggerImpl {
thread: Mutex::new(None),
send_ch,
recv_ch,
log_buffer_send_ch,
log_buffer_recv_ch,
enable_log_buffer: AtomicBool::new(false),
log_buffer: Arc::new(ArrayQueue::new(BUF_SIZE)),
enabled: AtomicBool::new(false),
}
}
Expand All @@ -118,15 +167,27 @@ impl LoggerImpl {
}

pub fn enable_log_buffer(&self, flag: bool) {
self.enable_log_buffer.store(flag, Ordering::Release);
unsafe {
if flag {
self.send_ch
.send(Command::EnableLogBuffer)
.unwrap_unchecked();
} else {
self.send_ch
.send(Command::DisableLogBuffer)
.unwrap_unchecked();
}
}
}

#[inline]
pub fn clear_log_buffer(&self) {
while self.log_buffer_recv_ch.try_recv().is_ok() {} //Clear the entire log buffer.
while self.log_buffer.pop().is_some() {} //Clear the entire log buffer.
}

pub fn get_log_buffer(&self) -> Receiver<LogMsg> {
self.log_buffer_recv_ch.clone()
#[inline]
pub fn read_log(&self) -> Option<LogMsg> {
self.log_buffer.pop()
}

pub fn terminate(&self) {
Expand Down Expand Up @@ -160,15 +221,10 @@ impl LoggerImpl {
}
}
let recv_ch = self.recv_ch.clone();
let log_buffer = self.log_buffer.clone();
*thread = Some(std::thread::spawn(move || {
let mut logger = logger;
while let Ok(v) = recv_ch.recv() {
let flag = exec_commad(v, &mut logger);
if flag {
// The thread has requested to exit itself; drop out of the main loop.
break;
}
}
let thread = Thread::new(logger, recv_ch, log_buffer);
thread.run();
}));
}
if flag {
Expand All @@ -177,35 +233,27 @@ impl LoggerImpl {
// This cannot panic as send_ch is owned by LoggerImpl which is intended
// to be statically allocated.
self.send_ch
.send(Command::Log(LogMsg {
level: Level::Error,
msg: "The logging thread has panicked!".into(),
target: "bp3d-logger".into(),
}))
.send(Command::Log(LogMsg::from_msg(
"bp3d-logger",
Level::Error,
"The logging thread has panicked!",
)))
.unwrap_unchecked();
}
}
}

pub fn low_level_log(&self, msg: LogMsg) {
if self.enable_log_buffer.load(Ordering::Acquire) {
unsafe {
// This cannot panic as both send_ch and log_buffer_send_ch are owned by LoggerImpl
// which is intended to be statically allocated.
self.send_ch
.send(Command::Log(msg.clone()))
.unwrap_unchecked();
self.log_buffer_send_ch.send(msg).unwrap_unchecked();
}
} else {
unsafe {
// This cannot panic as send_ch is owned by LoggerImpl which is intended
// to be statically allocated.
self.send_ch.send(Command::Log(msg)).unwrap_unchecked();
}
pub fn low_level_log(&self, msg: &LogMsg) {
unsafe {
// This cannot panic as send_ch is owned by LoggerImpl which is intended
// to be statically allocated.
self.send_ch
.send(Command::Log(msg.clone()))
.unwrap_unchecked();
}
}

#[inline]
pub fn is_enabled(&self) -> bool {
self.enabled.load(Ordering::Acquire)
}
Expand All @@ -232,26 +280,21 @@ impl Log for LoggerImpl {
return;
}
let (target, module) = extract_target_module(record);
//In the future attempt to not update all the time https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=17c218f318826f55ab64535bfcd28ec6
let system_tz =
time_tz::system::get_timezone().unwrap_or(time_tz::timezones::db::us::CENTRAL);
let time = OffsetDateTime::now_local();
let format = format_description!("[weekday repr:short] [month repr:short] [day] [hour repr:12]:[minute]:[second] [period case:upper]");
//<error> is very unlikely to occur (only possibility is a weird io error).
let formatted = OffsetDateTime::now_utc()
.to_timezone(system_tz)
let formatted = time
.unwrap_or_else(OffsetDateTime::now_utc)
.format(format)
.unwrap_or_else(|_| "<error>".into());
let msg = LogMsg {
msg: format!(
"({}) {}: {}",
formatted,
module.unwrap_or("main"),
record.args()
),
target: target.into(),
level: record.level(),
};
self.low_level_log(msg);
.unwrap_or_default();
let mut msg = LogMsg::new(target, record.level());
let _ = write!(
msg,
"({}) {}: {}",
formatted,
module.unwrap_or("main"),
record.args()
);
self.low_level_log(&msg);
}

fn flush(&self) {
Expand Down
Loading

0 comments on commit 330bbf0

Please sign in to comment.