diff --git a/common/CMakeLists.txt b/common/CMakeLists.txt index db98851..6cc5043 100644 --- a/common/CMakeLists.txt +++ b/common/CMakeLists.txt @@ -22,10 +22,7 @@ function(setupGTest target) gtest_discover_tests(${target}) endfunction() - -function(setupCPack target) - install(TARGETS ${target} RUNTIME DESTINATION bin) - +function(setupDebPkg) set(CPACK_GENERATOR DEB) set(CPACK_PACKAGE_VERSION_MAJOR "${PROJECT_VERSION_MAJOR}") set(CPACK_PACKAGE_VERSION_MINOR "${PROJECT_VERSION_MINOR}") @@ -33,3 +30,8 @@ function(setupCPack target) set(CPACK_PACKAGE_CONTACT j0tunn@ya.ru) include(CPack) # generates target `package` endfunction() + +function(setupCPack target) + install(TARGETS ${target} RUNTIME DESTINATION bin) + setupDebPkg() +endfunction() diff --git a/projects/09/CMakeLists.txt b/projects/09/CMakeLists.txt new file mode 100644 index 0000000..8592a07 --- /dev/null +++ b/projects/09/CMakeLists.txt @@ -0,0 +1,24 @@ +cmake_minimum_required(VERSION 3.10) + +include(${CMAKE_CURRENT_SOURCE_DIR}/../../common/CMakeLists.txt) + +project(async VERSION ${PROJECT_VERSION}) + +add_library(async SHARED + async.cpp + bulk_reader.cpp + stream_logger.cpp + file_logger.cpp + reader_state.cpp + thread_logger.cpp +) + +add_executable(async_cli + main.cpp +) +target_link_libraries(async_cli async) + +# Package +install(TARGETS async LIBRARY DESTINATION lib) +install(TARGETS async_cli RUNTIME DESTINATION bin) +setupDebPkg() diff --git a/projects/09/async.cpp b/projects/09/async.cpp new file mode 100644 index 0000000..f0078f9 --- /dev/null +++ b/projects/09/async.cpp @@ -0,0 +1,50 @@ +#include "async.h" +#include "bulk_reader.h" +#include "stream_logger.h" +#include "file_logger.h" +#include "thread_logger.h" +#include +#include + +using namespace std; + +namespace async { + +class Handle { +public: + Handle(size_t bulkSize) + : reader_(bulkSize) + , consoleLogger_(1, []() { return new StreamLogger(cout); }) + , fileLogger_(2, []() { return new FileLogger(filesystem::current_path()); }) + { + reader_.addFlushObserver(&consoleLogger_); + reader_.addFlushObserver(&fileLogger_); + } + + ~Handle() { + reader_.eof(); + } + + void addCmd(const string& cmd) { + reader_.addCmd(cmd); + } + +private: + BulkReader reader_; + ThreadLogger consoleLogger_; + ThreadLogger fileLogger_; +}; + +Handle* connect(size_t bulkSize) { + return new Handle(bulkSize); +} + +void receive(Handle* h, const char *data, size_t size) { + h->addCmd(string(data, size)); +} + +void disconnect(Handle* h) { + delete h; +} + +} diff --git a/projects/09/async.h b/projects/09/async.h new file mode 100644 index 0000000..b2c9b28 --- /dev/null +++ b/projects/09/async.h @@ -0,0 +1,13 @@ +#pragma once + +#include + +namespace async { + +class Handle; + +Handle* connect(std::size_t bulkSize); +void receive(Handle* h, const char *data, std::size_t size); +void disconnect(Handle* h); + +} diff --git a/projects/09/bulk_reader.cpp b/projects/09/bulk_reader.cpp new file mode 100644 index 0000000..f6795fa --- /dev/null +++ b/projects/09/bulk_reader.cpp @@ -0,0 +1,41 @@ +#include +#include "bulk_reader.h" + +using namespace std; + +BulkReader::BulkReader(unsigned int bulkSize) + : state_(new AutoModeState([this](unique_ptr&& newState) { this->setNewState(move(newState)); }, bulkSize)) +{} + +void BulkReader::addCmd(const string& cmd) { + if (cmd == "{") { + state_->startBulk(); + return; + } + + if (cmd == "}") { + state_->finishBulk(); + return; + } + + state_->addCmd(cmd); +} + +void BulkReader::eof() { + state_->eof(); +} + +void BulkReader::setNewState(unique_ptr&& newState) { + notifyFlush_(state_->getBulk()); + state_ = move(newState); +} + +void BulkReader::addFlushObserver(IFlushObserver* observer) { + observers_.push_back(observer); +} + +void BulkReader::notifyFlush_(const vector& bulk) { + for (auto observer : observers_) { + observer->onFlush(bulk); + } +} diff --git a/projects/09/bulk_reader.h b/projects/09/bulk_reader.h new file mode 100644 index 0000000..219fed3 --- /dev/null +++ b/projects/09/bulk_reader.h @@ -0,0 +1,24 @@ +#pragma once + +#include +#include +#include +#include "flush_observer.h" +#include "command.h" +#include "reader_state.h" + +class BulkReader { +public: + explicit BulkReader(unsigned int bulkSize); + void addCmd(const std::string& cmd); + void eof(); + + void addFlushObserver(IFlushObserver* observer); + +private: + void notifyFlush_(const std::vector& bulk); + void setNewState(std::unique_ptr&& state); + + std::vector observers_; + std::unique_ptr state_; +}; diff --git a/projects/09/command.h b/projects/09/command.h new file mode 100644 index 0000000..425f5ed --- /dev/null +++ b/projects/09/command.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +struct Command { + explicit Command(const std::string& cmd) + : val(cmd) + , timestamp(std::time(0)) + {} + + Command(const Command&) = default; + + std::string val; + std::time_t timestamp; +}; diff --git a/projects/09/file_logger.cpp b/projects/09/file_logger.cpp new file mode 100644 index 0000000..866ef8d --- /dev/null +++ b/projects/09/file_logger.cpp @@ -0,0 +1,24 @@ +#include +#include +#include +#include "file_logger.h" +#include "stream_logger.h" + +using namespace std; + +FileLogger::FileLogger(const filesystem::path& dir) + : logDir_(dir) +{} + +void FileLogger::onFlush(const vector& bulk) { + if (bulk.size() == 0) { + return; + } + + stringstream fileName; + fileName << "bulk" << bulk[0].timestamp << "_" << this_thread::get_id() << ".log"; + + ofstream out(logDir_ / fileName.str()); + StreamLogger subLogger(out); + subLogger.onFlush(bulk); +} diff --git a/projects/09/file_logger.h b/projects/09/file_logger.h new file mode 100644 index 0000000..46358e7 --- /dev/null +++ b/projects/09/file_logger.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include +#include "flush_observer.h" + +class FileLogger : public IFlushObserver { +public: + FileLogger(const std::filesystem::path& dir); + void onFlush(const std::vector& bulk) override; + +private: + const std::filesystem::path logDir_; +}; diff --git a/projects/09/flush_observer.h b/projects/09/flush_observer.h new file mode 100644 index 0000000..8dcb43d --- /dev/null +++ b/projects/09/flush_observer.h @@ -0,0 +1,10 @@ +#pragma once + +#include +#include "command.h" + +class IFlushObserver { +public: + virtual ~IFlushObserver() = default; + virtual void onFlush(const std::vector& bulk) = 0; +}; diff --git a/projects/09/main.cpp b/projects/09/main.cpp new file mode 100644 index 0000000..44d5545 --- /dev/null +++ b/projects/09/main.cpp @@ -0,0 +1,27 @@ +#include +#include + +#include "async.h" + +using namespace std; + +int main(int argc, char** argv) { + if (argc < 2) { + cerr << "No bulk size passed" << endl; + return 1; + } + + const unsigned int bulkSize = stoi(argv[1]); + async::Handle* h = async::connect(bulkSize); + + for (string line; getline(cin, line);) { + async::receive(h, &line[0], line.size()); + if (line == "exit") { + break; + } + } + + async::disconnect(h); + + return 0; +} diff --git a/projects/09/reader_state.cpp b/projects/09/reader_state.cpp new file mode 100644 index 0000000..09eaafd --- /dev/null +++ b/projects/09/reader_state.cpp @@ -0,0 +1,65 @@ +#include "reader_state.h" + +using namespace std; + +ReaderState::ReaderState(StateSwitchFn switchState, unsigned int bulkSize) + : bulk_() + , switchState_(switchState) + , bulkSize_(bulkSize) +{} + +template +void ReaderState::setNewState() { + switchState_(unique_ptr(new T(switchState_, bulkSize_))); +} + +const vector& ReaderState::getBulk() const { + return bulk_; +}; + +void ReaderState::addCmd(const std::string& cmd) { + bulk_.emplace_back(Command{cmd}); +} + +/// AutoModeState +AutoModeState::AutoModeState(StateSwitchFn switchState, unsigned int bulkSize) + : ReaderState(switchState, bulkSize) +{} + +void AutoModeState::addCmd(const std::string& cmd) { + ReaderState::addCmd(cmd); + + if (bulk_.size() >= bulkSize_) { + setNewState(); + } +} + +void AutoModeState::startBulk() { + setNewState(); +} + +void AutoModeState::finishBulk() { +} + +void AutoModeState::eof() { + setNewState(); +} + +/// ManualModeState +ManualModeState::ManualModeState(StateSwitchFn switchState, unsigned int bulkSize) + : ReaderState(switchState, bulkSize) + , startCounter_(1) +{} + +void ManualModeState::startBulk() { + ++startCounter_; +} + +void ManualModeState::finishBulk() { + if (--startCounter_ == 0) { + setNewState(); + } +} + +void ManualModeState::eof() { +} diff --git a/projects/09/reader_state.h b/projects/09/reader_state.h new file mode 100644 index 0000000..93ea202 --- /dev/null +++ b/projects/09/reader_state.h @@ -0,0 +1,58 @@ +#pragma once + +#include +#include +#include +#include +#include "command.h" + +class ReaderState; +using StateSwitchFn = std::function&&)>; + +/// +class ReaderState { +public: + ReaderState(StateSwitchFn switchState, unsigned int bulkSize); + virtual ~ReaderState() = default; + + const std::vector& getBulk() const; + + virtual void addCmd(const std::string& cmd); + + virtual void startBulk() = 0; + virtual void finishBulk() = 0; + virtual void eof() = 0; + +protected: + template + void setNewState(); + + std::vector bulk_; + StateSwitchFn switchState_; + unsigned int bulkSize_; +}; + +/// +class AutoModeState : public ReaderState { +public: + AutoModeState(StateSwitchFn switchState, unsigned int bulkSize); + ~AutoModeState() = default; + + void addCmd(const std::string& cmd) override; + void startBulk() override; + void finishBulk() override; + void eof() override; +}; + +/// +class ManualModeState : public ReaderState { +public: + ManualModeState(StateSwitchFn switchState, unsigned int bulkSize); + + void startBulk() override; + void finishBulk() override; + void eof() override; + +private: + unsigned int startCounter_; +}; diff --git a/projects/09/stream_logger.cpp b/projects/09/stream_logger.cpp new file mode 100644 index 0000000..2ee3a02 --- /dev/null +++ b/projects/09/stream_logger.cpp @@ -0,0 +1,20 @@ +#include "stream_logger.h" + +using namespace std; + +StreamLogger::StreamLogger(ostream& out) + : out_(out) +{} + +void StreamLogger::onFlush(const vector& bulk) { + out_ << "bulk: "; + + for (unsigned int i = 0; i < bulk.size(); ++i) { + out_ << bulk[i].val; + if (i < bulk.size() - 1) { + out_ << ", "; + } + } + + out_ << endl; +} diff --git a/projects/09/stream_logger.h b/projects/09/stream_logger.h new file mode 100644 index 0000000..84d16b8 --- /dev/null +++ b/projects/09/stream_logger.h @@ -0,0 +1,13 @@ +#pragma once + +#include +#include "flush_observer.h" + +class StreamLogger : public IFlushObserver { +public: + explicit StreamLogger(std::ostream& out); + void onFlush(const std::vector& bulk) override; + +private: + std::ostream& out_; +}; diff --git a/projects/09/thread_logger.cpp b/projects/09/thread_logger.cpp new file mode 100644 index 0000000..13c0467 --- /dev/null +++ b/projects/09/thread_logger.cpp @@ -0,0 +1,65 @@ +#include "thread_logger.h" +#include + +using namespace std; + +ThreadLogger::ThreadLogger(size_t maxThreads, ThreadLogger::SubLoggerCreateFn createSubLogger) + : maxThreads_(maxThreads) + , createSubLogger_(createSubLogger) + , stopped_(false) +{ + threads_.reserve(maxThreads); +} + +ThreadLogger::~ThreadLogger() { + stop(); + + for (auto& t : threads_) { + t.join(); + } +} + +void ThreadLogger::onFlush(const vector& bulk) { + if (threads_.size() < maxThreads_) { + threads_.emplace_back(&ThreadLogger::workerThread, this); + } + + { + lock_guard guard{dataAccess_}; + bulks_.push(bulk); + } + + dataChanged_.notify_one(); +}; + +void ThreadLogger::workerThread() { + unique_ptr pSubLogger{createSubLogger_()}; + vector nextBulk; + + for (;;) { + { + unique_lock lck{dataAccess_}; + dataChanged_.wait(lck, [this]() { + return stopped_ || !bulks_.empty(); + }); + + if (stopped_) { + break; + } + + nextBulk = bulks_.front(); + bulks_.pop(); + } + + pSubLogger->onFlush(nextBulk); + } +} + +void ThreadLogger::stop() { + { + lock_guard guard{dataAccess_}; + stopped_ = true; + } + + dataChanged_.notify_all(); +} diff --git a/projects/09/thread_logger.h b/projects/09/thread_logger.h new file mode 100644 index 0000000..4d0d60c --- /dev/null +++ b/projects/09/thread_logger.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include "flush_observer.h" + +class ThreadLogger : public IFlushObserver { +public: + using SubLoggerCreateFn = std::function; + + ThreadLogger(std::size_t maxThreads, SubLoggerCreateFn createSubLogger); + ~ThreadLogger(); + + void onFlush(const std::vector& bulk) override; + +private: + void workerThread(); + void stop(); + + std::size_t maxThreads_; + SubLoggerCreateFn createSubLogger_; + std::vector threads_; + std::queue > bulks_; + std::mutex dataAccess_; + std::condition_variable dataChanged_; + bool stopped_; +}; diff --git a/projects/current b/projects/current index ca7bf83..aa2f0b2 120000 --- a/projects/current +++ b/projects/current @@ -1 +1 @@ -13 \ No newline at end of file +09 \ No newline at end of file