Skip to content

Commit

Permalink
feat: project 10
Browse files Browse the repository at this point in the history
  • Loading branch information
j0tunn committed Mar 23, 2023
1 parent 1db7bea commit 070d91d
Show file tree
Hide file tree
Showing 21 changed files with 641 additions and 1 deletion.
26 changes: 26 additions & 0 deletions projects/10/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
cmake_minimum_required(VERSION 3.10)

include(${CMAKE_CURRENT_SOURCE_DIR}/../../common/CMakeLists.txt)
set(CMAKE_CXX_STANDARD 20)

project(bulk_server VERSION ${PROJECT_VERSION})

set(Boost_USE_STATIC_LIBS ON)
find_package(Boost REQUIRED system regex)

add_executable(${PROJECT_NAME}
main.cpp
cmd_processor.cpp
bulk_reader.cpp
bulk_router.cpp
flush_observer.cpp
stream_logger.cpp
file_logger.cpp
reader_state.cpp
thread_logger.cpp
)

target_link_libraries(${PROJECT_NAME} PRIVATE Boost::system Boost::regex)

# Package
setupCPack(${PROJECT_NAME})
34 changes: 34 additions & 0 deletions projects/10/bulk_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
#include "bulk_reader.h"

using namespace std;

BulkReader::BulkReader(unsigned int bulkSize)
: state_(new AutoModeState([this](unique_ptr<ReaderState>&& newState) { this->setNewState(move(newState)); }, bulkSize))
{}

void BulkReader::addCmd(const string& cmd) {
if (cmd == "{") {
state_->startBulk();
return;
}

if (cmd == "}") {
state_->finishBulk();
return;
}

state_->addCmd(cmd);
}

void BulkReader::eof() {
state_->eof();
}

bool BulkReader::isInStaticMode() const {
return state_->isAutoMode();
}

void BulkReader::setNewState(unique_ptr<ReaderState>&& newState) {
notifyFlush_(state_->getBulk());
state_ = move(newState);
}
21 changes: 21 additions & 0 deletions projects/10/bulk_reader.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include <string>
#include <vector>
#include <memory>
#include "flush_observer.h"
#include "command.h"
#include "reader_state.h"

class BulkReader : public FlushObservable {
public:
explicit BulkReader(unsigned int bulkSize);
void addCmd(const std::string& cmd);
void eof();
bool isInStaticMode() const;

private:
void setNewState(std::unique_ptr<ReaderState>&& state);

std::unique_ptr<ReaderState> state_;
};
59 changes: 59 additions & 0 deletions projects/10/bulk_router.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#include "bulk_router.h"
#include <utility>

using namespace std;

BulkRouter::BulkRouter(unsigned int bulkSize)
: bulkSize_(bulkSize)
, pStaticBulkReader_(nullptr)
{
}

void BulkRouter::addCmd(const string& cmd, unsigned long sessionId) {
if (!addDynamicCmd(cmd, sessionId)) {
addStaticCmd(cmd, sessionId);
}
}

bool BulkRouter::addDynamicCmd(const string& cmd, unsigned long sessionId) {
auto itReader = dynamicBulkReaders_.find(sessionId);
if (itReader == dynamicBulkReaders_.end()) {
return false;
}

auto& pReader = itReader->second;
pReader->addCmd(cmd);

if (pReader->isInStaticMode()) {
dynamicBulkReaders_.erase(itReader);
}

return true;
}

void BulkRouter::addStaticCmd(const string& cmd, unsigned long sessionId) {
if (!pStaticBulkReader_) {
pStaticBulkReader_ = make_unique<BulkReader>(bulkSize_);
pStaticBulkReader_->addFlushObserver(this);
}

pStaticBulkReader_->addCmd(cmd);

if (!pStaticBulkReader_->isInStaticMode()) {
dynamicBulkReaders_.emplace(make_pair(sessionId, move(pStaticBulkReader_)));
}
}

void BulkRouter::eof() {
if (pStaticBulkReader_) {
pStaticBulkReader_->eof();
}

for (auto& pair : dynamicBulkReaders_) {
pair.second->eof();
}
}

void BulkRouter::onFlush(const vector<Command>& bulk) {
notifyFlush_(bulk);
};
24 changes: 24 additions & 0 deletions projects/10/bulk_router.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <string>
#include <map>
#include <memory>
#include "flush_observer.h"
#include "bulk_reader.h"

class BulkRouter : public FlushObservable, public IFlushObserver {
public:
explicit BulkRouter(unsigned int bulkSize);
void addCmd(const std::string& cmd, unsigned long sessionId);
void eof();

void onFlush(const std::vector<Command>& bulk) override;

private:
void addStaticCmd(const std::string& cmd, unsigned long sessionId);
bool addDynamicCmd(const std::string& cmd, unsigned long sessionId);

unsigned int bulkSize_;
std::unique_ptr<BulkReader> pStaticBulkReader_;
std::map<unsigned long, std::unique_ptr<BulkReader> > dynamicBulkReaders_;
};
23 changes: 23 additions & 0 deletions projects/10/cmd_processor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "cmd_processor.h"
#include "stream_logger.h"
#include "file_logger.h"
#include <iostream>

using namespace std;

CmdProcessor::CmdProcessor(unsigned int bulkSize)
: bulkRouter_(bulkSize)
, consoleLogger_(1, []() { return new StreamLogger(cout); })
, fileLogger_(2, []() { return new FileLogger(filesystem::current_path()); })
{
bulkRouter_.addFlushObserver(&consoleLogger_);
bulkRouter_.addFlushObserver(&fileLogger_);
}

CmdProcessor::~CmdProcessor() {
bulkRouter_.eof();
}

void CmdProcessor::handleCmd(const string& cmd, unsigned long sessionId) {
bulkRouter_.addCmd(cmd, sessionId);
}
18 changes: 18 additions & 0 deletions projects/10/cmd_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#pragma once

#include "bulk_router.h"
#include "thread_logger.h"
#include <string>

class CmdProcessor {
public:
CmdProcessor(unsigned int bulkSize);
~CmdProcessor();

void handleCmd(const std::string& cmd, unsigned long sessionId);

private:
BulkRouter bulkRouter_;
ThreadLogger consoleLogger_;
ThreadLogger fileLogger_;
};
16 changes: 16 additions & 0 deletions projects/10/command.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <string>
#include <ctime>

struct Command {
explicit Command(const std::string& cmd)
: val(cmd)
, timestamp(std::time(0))
{}

Command(const Command&) = default;

std::string val;
std::time_t timestamp;
};
24 changes: 24 additions & 0 deletions projects/10/file_logger.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#include <sstream>
#include <fstream>
#include <thread>
#include "file_logger.h"
#include "stream_logger.h"

using namespace std;

FileLogger::FileLogger(const filesystem::path& dir)
: logDir_(dir)
{}

void FileLogger::onFlush(const vector<Command>& bulk) {
if (bulk.size() == 0) {
return;
}

stringstream fileName;
fileName << "bulk" << bulk[0].timestamp << "_" << this_thread::get_id() << ".log";

ofstream out(logDir_ / fileName.str());
StreamLogger subLogger(out);
subLogger.onFlush(bulk);
}
14 changes: 14 additions & 0 deletions projects/10/file_logger.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once

#include <string>
#include <filesystem>
#include "flush_observer.h"

class FileLogger : public IFlushObserver {
public:
FileLogger(const std::filesystem::path& dir);
void onFlush(const std::vector<Command>& bulk) override;

private:
const std::filesystem::path logDir_;
};
13 changes: 13 additions & 0 deletions projects/10/flush_observer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include "flush_observer.h"

using namespace std;

void FlushObservable::addFlushObserver(IFlushObserver* observer) {
observers_.push_back(observer);
}

void FlushObservable::notifyFlush_(const vector<Command>& bulk) {
for (auto observer : observers_) {
observer->onFlush(bulk);
}
}
21 changes: 21 additions & 0 deletions projects/10/flush_observer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#pragma once

#include <vector>
#include "command.h"

class IFlushObserver {
public:
virtual ~IFlushObserver() = default;
virtual void onFlush(const std::vector<Command>& bulk) = 0;
};

class FlushObservable {
public:
void addFlushObserver(IFlushObserver* observer);

protected:
void notifyFlush_(const std::vector<Command>& bulk);

private:
std::vector<IFlushObserver*> observers_;
};
81 changes: 81 additions & 0 deletions projects/10/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#include "cmd_processor.h"
#include <boost/asio.hpp>
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/algorithm/string/regex.hpp>
#include <iostream>

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;

using namespace std;

void processCmd(const string& cmd) {
cout << cmd << endl;
}

awaitable<void> handleSession(tcp::socket socket, unsigned long sessionId, CmdProcessor& cmdProcessor) {
string data;
try {
for (;;) {
size_t n = co_await boost::asio::async_read_until(socket, boost::asio::dynamic_buffer(data), boost::regex("(\r?\n)+"), use_awaitable);

cmdProcessor.handleCmd(data.substr(0, n - 1), sessionId);
data.erase(0, n);
}
} catch (boost::system::system_error& e) {
if (e.code() == boost::asio::error::eof) {
if (data.size() > 0) {
cmdProcessor.handleCmd(data, sessionId);
}
} else {
throw e;
}
} catch (exception& e) {
cerr << "session handle error: " << e.what() << "\n";
}
}

awaitable<void> listen(unsigned short port, CmdProcessor& cmdProcessor) {
auto executor = co_await boost::asio::this_coro::executor;
tcp::acceptor acceptor(executor, {tcp::v4(), port});

for (unsigned long sessionId = 0;; ++sessionId) {
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(executor, handleSession(move(socket), sessionId, cmdProcessor), detached);
}
}

int main(int argc, char* argv[]) {
if (argc != 3) {
cerr << "Usage: bulk_server <port> <bulk_size>\n";
return 1;
}

try {
const unsigned short port = atoi(argv[1]);
const unsigned int bulkSize = atoi(argv[2]);
CmdProcessor cmdProcessor(bulkSize);

boost::asio::io_context io_context(1);

boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ io_context.stop(); });

co_spawn(io_context, listen(port, cmdProcessor), detached);

io_context.run();
} catch (exception& e) {
cerr << "Exception: " << e.what() << "\n";

return 1;
}

return 0;
}
3 changes: 3 additions & 0 deletions projects/10/prebuild
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

sudo apt-get install -y libboost-all-dev
Loading

0 comments on commit 070d91d

Please sign in to comment.