Skip to content

Commit

Permalink
feat: project 11
Browse files Browse the repository at this point in the history
  • Loading branch information
j0tunn committed Feb 9, 2023
1 parent 3d9a936 commit 7f465dd
Show file tree
Hide file tree
Showing 6 changed files with 253 additions and 1 deletion.
27 changes: 27 additions & 0 deletions projects/11/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
cmake_minimum_required(VERSION 3.10)

include(${CMAKE_CURRENT_SOURCE_DIR}/../../common/CMakeLists.txt)
set(CMAKE_CXX_STANDARD 20)

project(join_server VERSION ${PROJECT_VERSION})

find_package(Boost REQUIRED system regex)

include(FetchContent)
FetchContent_Declare(sqlite3 URL "https://www.sqlite.org/snapshot/sqlite-snapshot-202205041843.tar.gz")
FetchContent_MakeAvailable(sqlite3)
if (NOT sqlite3_POPULATED)
FetchContent_Populate(sqlite3)
endif()

add_executable(join_server
main.cpp
db.cpp
"${sqlite3_SOURCE_DIR}/sqlite3.c"
)

target_include_directories(join_server PUBLIC "${sqlite3_SOURCE_DIR}")
target_link_libraries(join_server PRIVATE Boost::system Boost::regex)

# Package
setupCPack(join_server)
69 changes: 69 additions & 0 deletions projects/11/db.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#include <stdexcept>
#include <string>
#include <sstream>
#include "db.h"

using namespace std;

DataBase::DataBase()
: handle_(nullptr)
{
if (sqlite3_open_v2("db.sqlite", &handle_, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE | SQLITE_OPEN_NOMUTEX, NULL)) {
throw logic_error(string("Can't open database: ") + sqlite3_errmsg(handle_));
}

exec_("CREATE TABLE IF NOT EXISTS A (id int primary key, name text);");
exec_("CREATE TABLE IF NOT EXISTS B (id int primary key, name text);");
}

DataBase::~DataBase() {
if (handle_) {
sqlite3_close(handle_);
}
}

void DataBase::insert(const string& table, const string& id, const string& name) {
exec_((stringstream() << "INSERT INTO " << table << " VALUES(" << id << ", '" << name << "');").str());
}

void DataBase::truncate(const string& table) {
exec_((stringstream() << "DELETE FROM " << table << "; VACUUM;").str());
}

string DataBase::getIntersection() {
return execRead_("SELECT A.id, A.name, B.name FROM A, B WHERE A.id == B.id");
}

string DataBase::getSymmetricDifference() {
return execRead_("SELECT CASE WHEN A.id IS NOT NULL THEN A.id ELSE B.id END as id, A.name, B.name FROM"
" A FULL OUTER JOIN B ON A.id == B.id WHERE A.name IS NULL OR B.name IS NULL");
}

string DataBase::execRead_(const string& sql) {
stringstream outStream;

exec_(sql, [](void* pCtx, int columns, char **data, char **names) -> int {
stringstream* pOut = reinterpret_cast<stringstream*>(pCtx);

for (int i = 0; i < columns; ++i) {
*pOut << (data[i] ? data[i] : "") << (i < columns - 1 ? "," : "");
}

*pOut << endl;

return 0;
}, &outStream);

return outStream.str();
}

void DataBase::exec_(const string& sql, DataBase::CallbackPtr pCb, stringstream* pOut) {
char* errMsg;
int rc = sqlite3_exec(handle_, sql.data(), pCb, pOut, &errMsg);
if (rc != SQLITE_OK) {
const string err = string("Can't execute query: ") + errMsg;
sqlite3_free(errMsg);

throw logic_error(err);
}
}
24 changes: 24 additions & 0 deletions projects/11/db.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#pragma once

#include <string>
#include <functional>
#include <sstream>
#include "sqlite3.h"

class DataBase {
public:
DataBase();
~DataBase();

void insert(const std::string& table, const std::string& id, const std::string& name);
void truncate(const std::string& table);
std::string getIntersection();
std::string getSymmetricDifference();

private:
std::string execRead_(const std::string& sql);

typedef int(*CallbackPtr)(void*,int,char**,char**);
void exec_(const std::string& sql, CallbackPtr pCb = nullptr, std::stringstream* pOut = nullptr);
sqlite3* handle_;
};
129 changes: 129 additions & 0 deletions projects/11/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#include <boost/asio/co_spawn.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/write.hpp>
#include <boost/algorithm/string/regex.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <iostream>
#include <sstream>
#include <stdexcept>
#include <regex>
#include "db.h"

using boost::asio::ip::tcp;
using boost::asio::awaitable;
using boost::asio::co_spawn;
using boost::asio::detached;
using boost::asio::use_awaitable;

using namespace std;

string processCmd(const string& cmd, DataBase& db) {
cout << cmd << endl;
smatch match;

if (regex_match(cmd, match, regex("^INSERT (A|B) ([0-9]+) (.+)$"))) {
db.insert(match[1], match[2], match[3]);
return "";
}

if (regex_match(cmd, match, regex("^TRUNCATE (A|B)$"))) {
db.truncate(match[1]);
return "";
}

if (cmd == "INTERSECTION") {
return db.getIntersection();
}

if (cmd == "SYMMETRIC_DIFFERENCE") {
return db.getSymmetricDifference();
}

throw logic_error(string("Invalid command: ") + cmd);
}

awaitable<string> getRequest(tcp::socket& socket) {
char data[256] = {0};
size_t n = co_await socket.async_read_some(boost::asio::buffer(data), use_awaitable);

string res(data, n);
if (n && data[n - 1] != '\n') {
res += co_await getRequest(socket);
}

boost::trim(res);

co_return res;
}

awaitable<void> handleRequest(tcp::socket& socket, DataBase& db) {
const string req = co_await getRequest(socket);

vector<string> commands;
boost::split_regex(commands, req, boost::regex("(\r?\n)+"));

for (const string& cmd : commands) {
string resp;

try {
resp = processCmd(cmd, db) + "OK";
} catch (exception& e) {
resp = string("ERR: ") + e.what();
}

co_await async_write(socket, boost::asio::buffer(resp + "\n"), use_awaitable);
}
}

awaitable<void> handleSession(tcp::socket socket) {
try {
DataBase db;

for (;;) {
co_await handleRequest(socket, db);
}
} catch (boost::system::system_error& e) {
if (e.code() != boost::asio::error::eof) {
throw e;
}
} catch (exception& e) {
cerr << "session handle error: " << e.what() << "\n";
}
}

awaitable<void> listen(unsigned short port) {
auto executor = co_await boost::asio::this_coro::executor;
tcp::acceptor acceptor(executor, {tcp::v4(), port});

for (;;) {
tcp::socket socket = co_await acceptor.async_accept(use_awaitable);
co_spawn(executor, handleSession(move(socket)), detached);
}
}

int main(int argc, char* argv[]) {
if (argc != 2) {
cerr << "Usage: join_server <port>\n";
return 1;
}

try {
boost::asio::io_context io_context(1);

boost::asio::signal_set signals(io_context, SIGINT, SIGTERM);
signals.async_wait([&](auto, auto){ io_context.stop(); });

co_spawn(io_context, listen(atoi(argv[1])), detached);

io_context.run();
} catch (exception& e) {
cerr << "Exception: " << e.what() << "\n";

return 1;
}

return 0;
}
3 changes: 3 additions & 0 deletions projects/11/prebuild
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/bash

sudo apt-get install -y libboost-all-dev
2 changes: 1 addition & 1 deletion projects/current

0 comments on commit 7f465dd

Please sign in to comment.