Skip to content

Commit

Permalink
Merge pull request #19 from j0tunn/feat/prj.09
Browse files Browse the repository at this point in the history
ДЗ-09. Многопоточная асинхронная обработка команд
  • Loading branch information
j0tunn authored Apr 3, 2023
2 parents 1db7bea + 17eba27 commit df6fc91
Show file tree
Hide file tree
Showing 18 changed files with 502 additions and 5 deletions.
10 changes: 6 additions & 4 deletions common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@ function(setupGTest target)
gtest_discover_tests(${target})
endfunction()


function(setupCPack target)
install(TARGETS ${target} RUNTIME DESTINATION bin)

function(setupDebPkg)
set(CPACK_GENERATOR DEB)
set(CPACK_PACKAGE_VERSION_MAJOR "${PROJECT_VERSION_MAJOR}")
set(CPACK_PACKAGE_VERSION_MINOR "${PROJECT_VERSION_MINOR}")
set(CPACK_PACKAGE_VERSION_PATCH "${PROJECT_VERSION_PATCH}")
set(CPACK_PACKAGE_CONTACT j0tunn@ya.ru)
include(CPack) # generates target `package`
endfunction()

function(setupCPack target)
install(TARGETS ${target} RUNTIME DESTINATION bin)
setupDebPkg()
endfunction()
24 changes: 24 additions & 0 deletions projects/09/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
cmake_minimum_required(VERSION 3.10)

include(${CMAKE_CURRENT_SOURCE_DIR}/../../common/CMakeLists.txt)

project(async VERSION ${PROJECT_VERSION})

add_library(async SHARED
async.cpp
bulk_reader.cpp
stream_logger.cpp
file_logger.cpp
reader_state.cpp
thread_logger.cpp
)

add_executable(async_cli
main.cpp
)
target_link_libraries(async_cli async)

# Package
install(TARGETS async LIBRARY DESTINATION lib)
install(TARGETS async_cli RUNTIME DESTINATION bin)
setupDebPkg()
50 changes: 50 additions & 0 deletions projects/09/async.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include "async.h"
#include "bulk_reader.h"
#include "stream_logger.h"
#include "file_logger.h"
#include "thread_logger.h"
#include <filesystem>
#include <string>

using namespace std;

namespace async {

class Handle {
public:
Handle(size_t bulkSize)
: reader_(bulkSize)
, consoleLogger_(1, []() { return new StreamLogger(cout); })
, fileLogger_(2, []() { return new FileLogger(filesystem::current_path()); })
{
reader_.addFlushObserver(&consoleLogger_);
reader_.addFlushObserver(&fileLogger_);
}

~Handle() {
reader_.eof();
}

void addCmd(const string& cmd) {
reader_.addCmd(cmd);
}

private:
BulkReader reader_;
ThreadLogger consoleLogger_;
ThreadLogger fileLogger_;
};

Handle* connect(size_t bulkSize) {
return new Handle(bulkSize);
}

void receive(Handle* h, const char *data, size_t size) {
h->addCmd(string(data, size));
}

void disconnect(Handle* h) {
delete h;
}

}
13 changes: 13 additions & 0 deletions projects/09/async.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

#include <cstddef>

namespace async {

class Handle;

Handle* connect(std::size_t bulkSize);
void receive(Handle* h, const char *data, std::size_t size);
void disconnect(Handle* h);

}
41 changes: 41 additions & 0 deletions projects/09/bulk_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#include <iostream>
#include "bulk_reader.h"

using namespace std;

BulkReader::BulkReader(unsigned int bulkSize)
: state_(new AutoModeState([this](unique_ptr<ReaderState>&& newState) { this->setNewState(move(newState)); }, bulkSize))
{}

void BulkReader::addCmd(const string& cmd) {
if (cmd == "{") {
state_->startBulk();
return;
}

if (cmd == "}") {
state_->finishBulk();
return;
}

state_->addCmd(cmd);
}

void BulkReader::eof() {
state_->eof();
}

void BulkReader::setNewState(unique_ptr<ReaderState>&& newState) {
notifyFlush_(state_->getBulk());
state_ = move(newState);
}

void BulkReader::addFlushObserver(IFlushObserver* observer) {
observers_.push_back(observer);
}

void BulkReader::notifyFlush_(const vector<Command>& bulk) {
for (auto observer : observers_) {
observer->onFlush(bulk);
}
}
24 changes: 24 additions & 0 deletions projects/09/bulk_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <string>
#include <vector>
#include <memory>
#include "flush_observer.h"
#include "command.h"
#include "reader_state.h"

class BulkReader {
public:
explicit BulkReader(unsigned int bulkSize);
void addCmd(const std::string& cmd);
void eof();

void addFlushObserver(IFlushObserver* observer);

private:
void notifyFlush_(const std::vector<Command>& bulk);
void setNewState(std::unique_ptr<ReaderState>&& state);

std::vector<IFlushObserver*> observers_;
std::unique_ptr<ReaderState> state_;
};
16 changes: 16 additions & 0 deletions projects/09/command.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <string>
#include <ctime>

struct Command {
explicit Command(const std::string& cmd)
: val(cmd)
, timestamp(std::time(0))
{}

Command(const Command&) = default;

std::string val;
std::time_t timestamp;
};
24 changes: 24 additions & 0 deletions projects/09/file_logger.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include <sstream>
#include <fstream>
#include <thread>
#include "file_logger.h"
#include "stream_logger.h"

using namespace std;

FileLogger::FileLogger(const filesystem::path& dir)
: logDir_(dir)
{}

void FileLogger::onFlush(const vector<Command>& bulk) {
if (bulk.size() == 0) {
return;
}

stringstream fileName;
fileName << "bulk" << bulk[0].timestamp << "_" << this_thread::get_id() << ".log";

ofstream out(logDir_ / fileName.str());
StreamLogger subLogger(out);
subLogger.onFlush(bulk);
}
14 changes: 14 additions & 0 deletions projects/09/file_logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <string>
#include <filesystem>
#include "flush_observer.h"

class FileLogger : public IFlushObserver {
public:
FileLogger(const std::filesystem::path& dir);
void onFlush(const std::vector<Command>& bulk) override;

private:
const std::filesystem::path logDir_;
};
10 changes: 10 additions & 0 deletions projects/09/flush_observer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#pragma once

#include <vector>
#include "command.h"

class IFlushObserver {
public:
virtual ~IFlushObserver() = default;
virtual void onFlush(const std::vector<Command>& bulk) = 0;
};
27 changes: 27 additions & 0 deletions projects/09/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#include <string>
#include <iostream>

#include "async.h"

using namespace std;

int main(int argc, char** argv) {
if (argc < 2) {
cerr << "No bulk size passed" << endl;
return 1;
}

const unsigned int bulkSize = stoi(argv[1]);
async::Handle* h = async::connect(bulkSize);

for (string line; getline(cin, line);) {
async::receive(h, &line[0], line.size());
if (line == "exit") {
break;
}
}

async::disconnect(h);

return 0;
}
65 changes: 65 additions & 0 deletions projects/09/reader_state.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "reader_state.h"

using namespace std;

ReaderState::ReaderState(StateSwitchFn switchState, unsigned int bulkSize)
: bulk_()
, switchState_(switchState)
, bulkSize_(bulkSize)
{}

template <typename T>
void ReaderState::setNewState() {
switchState_(unique_ptr<ReaderState>(new T(switchState_, bulkSize_)));
}

const vector<Command>& ReaderState::getBulk() const {
return bulk_;
};

void ReaderState::addCmd(const std::string& cmd) {
bulk_.emplace_back(Command{cmd});
}

/// AutoModeState
AutoModeState::AutoModeState(StateSwitchFn switchState, unsigned int bulkSize)
: ReaderState(switchState, bulkSize)
{}

void AutoModeState::addCmd(const std::string& cmd) {
ReaderState::addCmd(cmd);

if (bulk_.size() >= bulkSize_) {
setNewState<AutoModeState>();
}
}

void AutoModeState::startBulk() {
setNewState<ManualModeState>();
}

void AutoModeState::finishBulk() {
}

void AutoModeState::eof() {
setNewState<AutoModeState>();
}

/// ManualModeState
ManualModeState::ManualModeState(StateSwitchFn switchState, unsigned int bulkSize)
: ReaderState(switchState, bulkSize)
, startCounter_(1)
{}

void ManualModeState::startBulk() {
++startCounter_;
}

void ManualModeState::finishBulk() {
if (--startCounter_ == 0) {
setNewState<AutoModeState>();
}
}

void ManualModeState::eof() {
}
58 changes: 58 additions & 0 deletions projects/09/reader_state.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include <string>
#include <functional>
#include <memory>
#include <vector>
#include "command.h"

class ReaderState;
using StateSwitchFn = std::function<void(std::unique_ptr<ReaderState>&&)>;

///
class ReaderState {
public:
ReaderState(StateSwitchFn switchState, unsigned int bulkSize);
virtual ~ReaderState() = default;

const std::vector<Command>& getBulk() const;

virtual void addCmd(const std::string& cmd);

virtual void startBulk() = 0;
virtual void finishBulk() = 0;
virtual void eof() = 0;

protected:
template <typename T>
void setNewState();

std::vector<Command> bulk_;
StateSwitchFn switchState_;
unsigned int bulkSize_;
};

///
class AutoModeState : public ReaderState {
public:
AutoModeState(StateSwitchFn switchState, unsigned int bulkSize);
~AutoModeState() = default;

void addCmd(const std::string& cmd) override;
void startBulk() override;
void finishBulk() override;
void eof() override;
};

///
class ManualModeState : public ReaderState {
public:
ManualModeState(StateSwitchFn switchState, unsigned int bulkSize);

void startBulk() override;
void finishBulk() override;
void eof() override;

private:
unsigned int startCounter_;
};
Loading

0 comments on commit df6fc91

Please sign in to comment.