From 2bb92cfbcad90b5060d1b32ecfb141f5030ecbb7 Mon Sep 17 00:00:00 2001 From: Daniel Cloran Date: Sun, 14 Apr 2024 14:54:13 -0400 Subject: [PATCH 1/9] paired on seg fault issues Co-authored-by: Alec Pannunzio Co-authored-by: Garrett Chandler --- .vscode/settings.json | 7 +- CMakeLists.txt | 101 ++----- include/Connection.h | 100 +++++-- include/ParsingEngine.h | 33 +++ include/SocketClusterClient.h | 15 +- include/utility/ThreadSafeList.h | 97 ++++++ include/utility/ThreadSafeQueue.h | 92 ++++++ src/Connection.cpp | 471 +++++++++++++++++++++++++++--- src/ParsingEngine.cpp | 39 ++- src/SocketClusterClient.cpp | 8 +- test/examples/client_classed.cpp | 53 ++++ test/examples/client_simple.cpp | 0 test/test.cpp | 15 +- 13 files changed, 855 insertions(+), 176 deletions(-) create mode 100644 include/ParsingEngine.h create mode 100644 include/utility/ThreadSafeList.h create mode 100644 include/utility/ThreadSafeQueue.h delete mode 100644 test/examples/client_simple.cpp diff --git a/.vscode/settings.json b/.vscode/settings.json index f4b89af..b3a9770 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -93,6 +93,9 @@ "span": "cpp", "stop_token": "cpp", "cfenv": "cpp", - "typeindex": "cpp" - } + "typeindex": "cpp", + "forward_list": "cpp", + "unordered_set": "cpp" + }, + "cmake.configureOnOpen": true } \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 20b60cc..1af474b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,11 +1,6 @@ cmake_minimum_required(VERSION 3.10) # Example minimum version set(CMAKE_CXX_STANDARD 17) -# set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -coverage") -# set(CMAKE_CXX_FLAGS " ${CMAKE_CXX_FLAGS} -coverage") -set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fprofile-arcs -ftest-coverage -O0") -set(CMAKE_CXX_FLAGS " ${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage -O0") - project(SocketClusterClientCPP) include(FetchContent) @@ -18,55 +13,32 @@ FetchContent_Declare(JsonCpp FetchContent_MakeAvailable(JsonCpp) # === === -# Fetch WebSocket++ -FetchContent_Declare( - websocketpp - GIT_REPOSITORY https://github.com/zaphoyd/websocketpp.git - GIT_TAG master # It's better to use a specific commit or tag for reproducibility -) - -# Make WebSocket++ available for #include -FetchContent_GetProperties(websocketpp) -if(NOT websocketpp_POPULATED) - FetchContent_Populate(websocketpp) - include_directories(${websocketpp_SOURCE_DIR}) -endif() - -# Fetch Asio (standalone version, without Boost) +set(BOOST_ENABLE_CMAKE ON) +include(FetchContent) FetchContent_Declare( - asio - GIT_REPOSITORY https://github.com/chriskohlhoff/asio.git - GIT_TAG asio-1-18-1 # Use a specific tag or commit to ensure reproducibility + Boost + GIT_REPOSITORY https://github.com/boostorg/boost.git + GIT_TAG boost-1.80.0 ) +FetchContent_MakeAvailable(Boost) -# Make Asio available for #include -FetchContent_GetProperties(asio) -if(NOT asio_POPULATED) - FetchContent_Populate(asio) - include_directories(${asio_SOURCE_DIR}/asio/include) - add_definitions(-DASIO_STANDALONE) -endif() +find_package(Boost 1.76.0 REQUIRED) # header only libraries must not be added here -# Build Library # Find source files file(GLOB_RECURSE LIBRARY_SOURCES src/*.cpp) - -# Create the library add_library(SocketClusterClientCPP SHARED ${LIBRARY_SOURCES}) - -# Link against json-c target_link_libraries(SocketClusterClientCPP PRIVATE jsoncpp_static) # Update include directories to find json-c headers target_include_directories(SocketClusterClientCPP PUBLIC $ $ - $ - $ - $ $ + $ ) +target_include_directories(SocketClusterClientCPP PUBLIC "${Boost_INCLUDE_DIRS}") + # Install targets and headers install(TARGETS SocketClusterClientCPP ARCHIVE DESTINATION lib @@ -76,9 +48,13 @@ install(TARGETS SocketClusterClientCPP install(DIRECTORY src/ DESTINATION include) +add_executable(simple_example test/examples/client_classed.cpp) +target_link_libraries(simple_example PUBLIC SocketClusterClientCPP) + + # Coverage -option(BUILD_TESTING "Builds only the test executable." ON) -option(CODE_COVERAGE "Collect coverage from test library" ON) +option(BUILD_TESTING "Builds only the test executable." OFF) +option(CODE_COVERAGE "Collect coverage from test library" OFF) if(BUILD_TESTING) enable_testing() @@ -86,6 +62,8 @@ if(BUILD_TESTING) add_test(NAME project_tests COMMAND ./bin/tests) if(CODE_COVERAGE AND CMAKE_BUILD_TYPE MATCHES Debug) + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fprofile-arcs -ftest-coverage -O0") + set(CMAKE_CXX_FLAGS " ${CMAKE_CXX_FLAGS} -fprofile-arcs -ftest-coverage -O0") # Set the coverage report output directory set(COVERAGE_DIR ${CMAKE_BINARY_DIR}/coverage) @@ -120,52 +98,11 @@ if(BUILD_TESTING) endif() - - - # if(CODE_COVERAGE) - # set(CODE_COVERAGE_VERBOSE TRUE) - # include(CodeCoverage.cmake) - # # append_coverage_compiler_flags() - # append_coverage_compiler_flags_to_target(SocketClusterClientCPP) - # # set(COVERAGE_EXCLUDES "src/*") - # setup_target_for_coverage_lcov( - # NAME coverage - # EXECUTABLE ./bin/tests - # # BASE_DIRECTORY "${PROJECT_SOURCE_DIR}/src/" - # # EXCLUDE "*.cpp" - # ) - - # # setup_target_for_coverage_lcov( - # # NAME coverage - # # EXECUTABLE ./bin/tests - # # EXCLUDE "${PROJECT_SOURCE_DIR}/_deps/*" - # # "${PROJECT_BINARY_DIR}/*" - # # "${PROJECT_SOURCE_DIR}/test/*" - # # ) - # endif() endif() -# # === websocket++ === -# FetchContent_Declare(websocketpp -# GIT_REPOSITORY https://github.com/zaphoyd/websocketpp.git -# GIT_TAG 0.8.2 -# ) -# FetchContent_GetProperties(websocketpp) -# if(NOT websocketpp_POPULATED) -# FetchContent_Populate(websocketpp) -# add_subdirectory(${websocketpp_SOURCE_DIR} ${websocketpp_BINARY_DIR} EXCLUDE_FROM_ALL) -# endif() -# add_library(Websockets INTERFACE) -# # === === -# # === boost++ === -# FetchContent_Declare( -# Boost -# GIT_REPOSITORY https://github.com/boostorg/boost.git -# GIT_TAG boost-1.80.0 # Replace with your desired version -# ) -# FetchContent_MakeAvailable(Boost) + diff --git a/include/Connection.h b/include/Connection.h index 4299af0..84c00d3 100644 --- a/include/Connection.h +++ b/include/Connection.h @@ -1,49 +1,87 @@ -#ifndef Connection_h -#define Connection_h +#ifndef CONNECTION_H +#define CONNECTION_H #pragma clang diagnostic push -#pragma clang diagnostic ignored "-Weverything" //ignore warnings in external libs +#pragma clang diagnostic ignored "-Weverything" -#include "websocketpp/client.hpp" -#include "websocketpp/config/asio_no_tls_client.hpp" +// Boost libs +#include +#include +#include -#include #include +#include +#include -typedef websocketpp::client Client; -typedef Client::connection_ptr ConnectionPtr; -typedef websocketpp::connection_hdl ConnectionHDL; -typedef websocketpp::lib::error_code ErrorCode; +#include "utility/ThreadSafeQueue.h" +#include "utility/ThreadSafeList.h" -typedef websocketpp::config::asio_client::message_type::ptr MessagePtr; +// Callback and Subscription types +typedef std::function socketCallback; +typedef std::tuple subscription; -enum class ConnectionResult { - SUCCESS = 0, - ERROR_TIMEOUT = -1, - ERROR_UNKNOWN = -2, - // ... add more specific error codes as needed -}; +namespace beast = boost::beast; +namespace http = beast::http; +namespace websocket = beast::websocket; +namespace net = boost::asio; +using tcp = boost::asio::ip::tcp; + +// enum class ConnectionResult { +// SUCCESS = 0, +// ERROR_TIMEOUT = -1, +// ERROR_UNKNOWN = -2, +// // ... add more specific error codes as needed +// }; -class Connection { +class Connection : public std::enable_shared_from_this { public: - Connection(const std::string& url, int port); - ~Connection(); - bool connect(); + Connection(net::io_context& ioc); + ~Connection() {}; + + std::thread launch_socket(const char * host, const char * port); + void stop(); + + // void subscribe(std::string channel, socketCallback callback); + // void unsubscribe(std::string channel); + // void publish(std::string channel, std::string data); + // void publish(std::string channel, Json::Value *data); private: - const std::string& m_url; - int m_port; + void fail(beast::error_code ec, char const* what); + void on_resolve(beast::error_code ec, tcp::resolver::results_type results); + void on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep); + void on_handshake(beast::error_code ec); + void on_write(beast::error_code ec, std::size_t bytes_transferred); + void on_read(beast::error_code ec, std::size_t bytes_transferred); + void on_close(beast::error_code ec); + + void run(char const* host, char const* port); - Client * m_client; - ConnectionHDL m_connectionHdl; + std::string m_uri; + int msgCounter = 1; + std::thread messageThread; + void message_processing(); + net::io_context& the_io_context; + tcp::resolver resolver_; + websocket::stream ws_; + beast::flat_buffer buffer_; + + std::string host_; + std::string port_; + + std::condition_variable writable_cv; + std::mutex writable_m; + volatile bool writable = false; + volatile bool socket_connected = false; + volatile bool socket_closed = false; - void close(); - void send(const std::string& data); - void onWebSocketOpen(ConnectionHDL hdl); - void onWebSocketClose(ConnectionHDL hdl); - void onWebSocketMessage(ConnectionHDL hdl, Client::message_ptr msg); + + + // Storing Connections and Subscription + std::unique_ptr> message_queue; + std::unique_ptr> subscription_list; }; -#endif /* Connection_h */ +#endif /* CONNECTION_H */ diff --git a/include/ParsingEngine.h b/include/ParsingEngine.h new file mode 100644 index 0000000..91bc07b --- /dev/null +++ b/include/ParsingEngine.h @@ -0,0 +1,33 @@ +// #ifndef PARSING_ENGINE_HPP +// #define PARSING_ENGINE_HPP + +// #include "json/json.h" + +// namespace scccpp +// { + +// class ParsingEngine +// { +// private: +// const std::istream json_template("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); +// public: +// ParsingEngine(/* args */); +// ~ParsingEngine(); + +// int get_parse( std::string channel, std::string data, std::string event, int msgCounter); + +// }; + +// ParsingEngine::ParsingEngine(/* args */) {} +// ParsingEngine::~ParsingEngine(){} + +// } // namespace scccpp + + + + + + + + +// #endif \ No newline at end of file diff --git a/include/SocketClusterClient.h b/include/SocketClusterClient.h index 1ffb59e..42671e2 100644 --- a/include/SocketClusterClient.h +++ b/include/SocketClusterClient.h @@ -1,23 +1,28 @@ #ifndef SocketClusterClient_h #define SocketClusterClient_h + +// STD libs +#include #include +#include +#include #include +#include +#include +#include #include -#include +// JSON Library #include #include "Connection.h" - -typedef std::function Callback; - class SocketClusterClient { public: SocketClusterClient(); ~SocketClusterClient(); - std::shared_ptr createConnection(const std::string& url, int port); + std::shared_ptr createConnection(const char * url, const char * port); std::list>& getConnections(); private: diff --git a/include/utility/ThreadSafeList.h b/include/utility/ThreadSafeList.h new file mode 100644 index 0000000..5692c3e --- /dev/null +++ b/include/utility/ThreadSafeList.h @@ -0,0 +1,97 @@ +#ifndef ThreadSafeList_h +#define ThreadSafeList_h + +template +class ThreadSafeList +{ + struct node + { + std::mutex m; + std::shared_ptr data; + std::unique_ptr next; + node() : next() {} + + node(T const &value) : data(std::make_shared(value)) {} + }; + + node head; + +public: + ThreadSafeList() {} + + ~ThreadSafeList() + { + remove_if([](node const &) + { return true; }); + } + + ThreadSafeList(ThreadSafeList const &other) = delete; + ThreadSafeList &operator=(ThreadSafeList const &other) = delete; + + void push_front(T const &value) + { + std::unique_ptr new_node(new node(value)); // 4 + std::lock_guard lk(head.m); + new_node->next = std::move(head.next); // 5 + head.next = std::move(new_node); // 6 + } + + template + void for_each(Function f) // 7 + { + node *current = &head; + std::unique_lock lk(head.m); // 8 + while (node *const next = current->next.get()) // 9 + { + std::unique_lock next_lk(next->m); // 10 + lk.unlock(); // 11 + f(*next->data); // 12 + current = next; + lk = std::move(next_lk); // 13 + } + } + + template + std::shared_ptr find_first_if(Predicate p) // 14 + { + node *current = &head; + std::unique_lock lk(head.m); + while (node *const next = current->next.get()) + { + std::unique_lock next_lk(next->m); + lk.unlock(); + if (p(*next->data)) // 15 + { + return next->data; // 16 + } + current = next; + lk = std::move(next_lk); + } + return std::shared_ptr(); + } + + template + void remove_if(Predicate p) // 17 + { + node *current = &head; + std::unique_lock lk(head.m); + while (node *const next = current->next.get()) + { + std::unique_lock next_lk(next->m); + if (p(*next->data)) // 18 + { + std::unique_ptr old_next = std::move(current->next); + current->next = std::move(next->next); + next_lk.unlock(); + } // 20 + else + { + lk.unlock(); // 21 + current = next; + lk = std::move(next_lk); + } + } + } +}; + +#endif // ThreadSafeList_h diff --git a/include/utility/ThreadSafeQueue.h b/include/utility/ThreadSafeQueue.h new file mode 100644 index 0000000..09f7029 --- /dev/null +++ b/include/utility/ThreadSafeQueue.h @@ -0,0 +1,92 @@ +#ifndef ThreadSafeQueue_h +#define ThreadSafeQueue_h + +#include +#include +#include +#include + +template +class ThreadSafeQueue +{ +public: + ThreadSafeQueue(int maxSize = 200) : maxSize(maxSize), q(), m(), c() {} + ~ThreadSafeQueue() {} + + // Add an element to the queue. + void enqueue(T t) + { + std::lock_guard lock(m); + if (q.size() < maxSize) { + q.push_back(t); + } + else { + /* + If this error is thrown, the write rate is too high, or the socket + has disconnected without recognizing and the queue is backing up. + */ + auto currentClock = std::chrono::system_clock::now(); + std::time_t currentTime = std::chrono::system_clock::to_time_t(currentClock); + std::cerr << std::ctime(¤tTime) << "Message Queue Full. Dumping oldest message." << std::endl; + q.pop_back(); + q.push_back(t); + } + // std::cout<< "Adding to msg Queue size is now: " << q.size() << std::endl; + c.notify_one(); + } + + void push_front(T t) + { + std::lock_guard lock(m); + if (q.size() >= maxSize) + { + /* + If this error is thrown, the write rate is too high, or the socket + has disconnected without recognizing and the queue is backing up. + */ + auto currentClock = std::chrono::system_clock::now(); + std::time_t currentTime = std::chrono::system_clock::to_time_t(currentClock); + std::cerr << std::ctime(¤tTime) << "Push_front exceeds max queue size: allowing anyway." << std::endl;\ + } + q.push_front(t); + c.notify_one(); + } + + + // Get the front element. + // If the queue is empty, wait till a element is avaiable. + T dequeue(void) + { + std::unique_lock lock(m); + if (q.empty()) return ""; + T val = q.front(); + q.pop_front(); + // std::cout<< "Removing from msg Queue size is now: " << q.size() << std::endl; + return val; + } + + int block_until_value(void) + { + std::unique_lock lock(m); + while (q.empty()) + { + // release lock as long as the wait and reaquire it afterwards. + c.wait(lock); + } + return true; + } + + void clear(void) + { + std::unique_lock lock(m); + q.clear(); + } + +private: + std::deque q; + mutable std::mutex m; + std::condition_variable c; + int maxSize; +}; + +#endif // ThreadSafeQueue_h diff --git a/src/Connection.cpp b/src/Connection.cpp index 26dfcae..80f990a 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -1,68 +1,447 @@ #include "Connection.h" -#include // For error codes -#include // Temporary, for basic error reporting +// // #include // For error codes +// // #include // Temporary, for basic error reporting -using websocketpp::lib::placeholders::_1; -using websocketpp::lib::placeholders::_2; -using websocketpp::lib::bind; +// TODO: create context +// -Connection::Connection(const std::string& url, int port) : m_url(url), m_port(port) +Connection::Connection(net::io_context& ioc) : resolver_(net::make_strand(ioc)), ws_(net::make_strand(ioc)), the_io_context(ioc) { + message_queue = std::make_unique>(); + subscription_list = std::make_unique>(); + + messageThread = std::thread(&Connection::message_processing, this); + messageThread.detach(); +} + +std::thread Connection::launch_socket(const char * host, const char * port) { - // Initialize client endpoint (more setup might be needed later) - m_client = new Client(); - m_client->init_asio(); - m_client->set_access_channels(websocketpp::log::alevel::all); // Optional: suppress logs + std::thread socketThread([this, host, port] + { + while (!socket_closed) + { + run(host, port); + // Run the I/O service. The call will return when + // the socket is closed. + the_io_context.run(); + + // set to false as safety in case ::fail wasn't called. + socket_connected = false; + the_io_context.reset(); // reset the socket + } + }); + while (!socket_connected) + { + // Busy wait until socket has connected for the first time + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + return socketThread; } -Connection::~Connection() { - // Ensure clean disconnect here (if connected) +// Start the asynchronous operation +void Connection::run(char const* host, char const* port) +{ + writable = false; + host_ = host; + port_ = port; + + // Look up the domain name + resolver_.async_resolve(host, port, beast::bind_front_handler(&Connection::on_resolve, shared_from_this())); } -bool Connection::connect() { - // 1. Create a connection URI - std::cout << "Url: " << m_url << std::endl; - std::string uri = m_url + ":" + std::to_string(m_port); - std::cout << "Uri: " << uri << std::endl; - ErrorCode ec; - ConnectionPtr con = m_client->get_connection(uri, ec); - - if (ec) { - std::cout << "Could not create connection: " << ec.message() << std::endl; - return false; - } - - m_client->set_open_handler(bind(&Connection::onWebSocketOpen, this, _1)); - m_client->set_close_handler(bind(&Connection::onWebSocketClose, this, _1)); - m_client->set_message_handler(bind(&Connection::onWebSocketMessage, this, _1, _2)); +// Report a failure +void Connection::fail(beast::error_code ec, char const* what) +{ + socket_connected = false; + std::cerr << "Socket Fail / Closing: " << what << ": " << ec.message() << "\n"; +} + +void Connection::stop() { + // Close the WebSocket connection + socket_closed = true; + ws_.async_close(websocket::close_code::normal, beast::bind_front_handler(&Connection::on_close, shared_from_this())); +} - // 3. Start the connection - m_client->connect(con); +void Connection::on_close(beast::error_code ec) +{ + if(ec) return fail(ec, "close"); + std::string s(boost::asio::buffer_cast(buffer_.data()), buffer_.size()); +} + +void Connection::on_resolve(beast::error_code ec, tcp::resolver::results_type results) +{ + if(ec) return fail(ec, "resolve"); - // 4. Start the ASIO io_service run loop - m_client->run(); + // Set the timeout for the operation + beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(5)); - return true; // Replace with proper success/failure logic + // Make the connection on the IP address we get from a lookup + beast::get_lowest_layer(ws_).async_connect(results, beast::bind_front_handler(&Connection::on_connect, shared_from_this())); } -void Connection::close() { - // Add logic to gracefully close the WebSocket connection +void Connection::on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) +{ + if(ec) return fail(ec, "connect"); + + // Turn off the timeout on the tcp_stream, because + // the websocket stream has its own timeout system. + beast::get_lowest_layer(ws_).expires_never(); + + // Set suggested timeout settings for the websocket + ws_.set_option(websocket::stream_base::timeout::suggested(beast::role_type::client)); + + // Set a decorator to change the User-Agent of the handshake + ws_.set_option(websocket::stream_base::decorator( + [](websocket::request_type& req) + { + req.set(http::field::user_agent, + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-client-async"); + })); + + host_ += ':' + std::to_string(ep.port()); + // Perform the websocket handshake + ws_.async_handshake(host_, "/socketcluster/", beast::bind_front_handler(&Connection::on_handshake, shared_from_this())); + + std::cout << "Successfully Connected to Socket." << std::endl; } -void Connection::send(const std::string& data) { - // Use m_client to send data over the WebSocket connection +void Connection::on_handshake(beast::error_code ec) +{ + std::cout << "Handshake Attempt" << std::endl; + if(ec) + return fail(ec, "handshake"); + + message_queue->push_front("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); + msgCounter = 1; + std::unique_lock ul(writable_m); + writable = true; + writable_cv.notify_one(); + + socket_connected = true; + // Kick off Async Reading. + // Read a message into our buffer + ws_.async_read(buffer_, beast::bind_front_handler(&Connection::on_read, shared_from_this())); + + // TODO: Resubscribe to all channels after reconnect + // subscription_list->for_each([this](subscription const &n) + // {resubscribe(std::get<0>(n), std::get<1>(n));}); + std::cout << "Handshake Succeeded" << std::endl; } -void Connection::onWebSocketOpen(ConnectionHDL hdl) { - m_connectionHdl = hdl; - std::cout << "Connection established." << std::endl; + +// CORE MESSAGE PROCESSING +void Connection::message_processing() { + while (!socket_closed) { + /* + Only one call to async_write is allowed at a time. + on_write (callback of async_write) sets writable = true again. + */ + + message_queue->block_until_value(); + std::unique_lock ul(writable_m); + writable_cv.wait(ul,[this] {return writable;}); + + std::string message = message_queue->dequeue(); + writable = false; + + // Post into the main processing thread + net::post(the_io_context, [this, message] + { + ws_.async_write(net::buffer(message), beast::bind_front_handler(&Connection::on_write, shared_from_this())); + std::cout << "Wrote: " << message << std::endl; + }); + } } -void Connection::onWebSocketClose(ConnectionHDL hdl) { - std::cout << "Connection closed." << std::endl; +void Connection::on_write(beast::error_code ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + if(ec) + return fail(ec, "write"); + + std::unique_lock ul(writable_m); //lock is applied on mutex m by thread t2 + writable = true; + writable_cv.notify_one(); //notify to condition variable } -void Connection::onWebSocketMessage(ConnectionHDL hdl, Client::message_ptr msg) { - std::cout << "Incoming message." << std::endl; - // ... Process incoming message (payload is in msg->get_payload()) -} +void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) +{ + boost::ignore_unused(bytes_transferred); + + if(ec) return fail(ec, "read"); + if (bytes_transferred == 0) + { + // Send ping + std::cout << "Ping about to send" << std::endl; + message_queue->enqueue(""); + } + else + { + std::string s(boost::asio::buffer_cast(buffer_.data()), buffer_.size()); + + std::cout << "Response From Server: " << s << std::endl; + // Json::Value root; + // Json::Reader reader; + // bool parsingSuccessful = reader.parse( s.c_str(), root); + // if (!parsingSuccessful) + // { + // std::cout << "[ScClient Service] data received is either null or not json parsable." << std::endl; + // } + // else + // { + // Json::Value msgData; + + // // int exists = json_object_object_get_ex(jobj, "data", &msgData); + // if (root.isMember("data")) + // { + // std::string channel = root["channel"].asString(); + // Json::Value data = root["data"]; + + // if (!channel.empty()) + // { + // std::shared_ptr sub = subscription_list->find_first_if([channel](subscription const &t) + // { return std::get<0>(t) == channel; }); + // if (sub != nullptr) + // { + // std::get<1>(*sub)(channel, data); + // } + // } + // } + // json_object_put(jobj); + // } + } + buffer_.clear(); + // Async_read recursively calls itself, so kick off once here. + ws_.async_read(buffer_, beast::bind_front_handler(&Connection::on_read, shared_from_this())); +} + + +// void Connection::publish(std::string channel, Json::Value* data) { +// // publish(channel, json_object_to_json_string(data)); +// } + +// void Connection::publish(std::string channel, std::string data) { +// //ParsingEngine.get_publish_json_object(); + +// // channel = test_channel_1 + +// // {event: #publish, channel: test_channel_1, cid: 72, data: {*user_data*}} + +// // If JSON String // + +// // Json::Value output = ParsingEngine.get_json_object(channel, data, event, msgCounter) + +// Json::Value dataobj; +// Json::Reader reader; + +// bool test = reader.parse(data, dataobj); + +// if(!test) +// { +// std::cout << reader.getFormattedErrorMessages(); +// return 0; +// } + +// Json::Value obj; + +// obj["channel"] = channel; +// obj["data"] = dataobj; +// obj["event"] = "#publish"; +// obj["msgCounter"] = msgCounter; + +// return obj; + + +// // "{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }" + +// //////////////////// + +// // json_object *dataObj = json_tokener_parse((char *)data.c_str()); +// // json_object *jobj = json_object_new_object(); +// // json_object *eventobject = json_object_new_string("#publish"); +// // json_object *jobj1 = json_object_new_object(); +// // json_object *cnt = json_object_new_int(++msgCounter); +// // json_object *channelobject = json_object_new_string(channel.c_str()); +// // json_object_object_add(jobj1, "channel", channelobject); +// // json_object_object_add(jobj1, "data", dataObj); +// // json_object_object_add(jobj, "event", eventobject); +// // json_object_object_add(jobj, "data", jobj1); +// // json_object_object_add(jobj, "cid", cnt); +// // message_queue->enqueue((char *)json_object_to_json_string(jobj)); +// // json_object_put(jobj); + +// } + + + + + + + + + + + + + + + +// // using websocketpp::lib::placeholders::_1; +// // using websocketpp::lib::placeholders::_2; +// // using websocketpp::lib::bind; + +// // Connection::Connection(const std::string& url, int port) : m_url(url), m_port(port) +// // { +// // // Initialize client endpoint (more setup might be needed later) +// // m_client = new Client(); +// // m_client->init_asio(); +// // m_client->set_access_channels(websocketpp::log::alevel::all); // Optional: suppress logs +// // } + +// // Connection::~Connection() { +// // // Ensure clean disconnect here (if connected) +// // } + +// // bool Connection::connect() { +// // // 1. Create a connection URI +// // std::cout << "Url: " << m_url << std::endl; +// // std::string uri = m_url + ":" + std::to_string(m_port); +// // std::cout << "Uri: " << uri << std::endl; + +// // ErrorCode ec; +// // ConnectionPtr con = m_client->get_connection(uri, ec); + +// // if (ec) { +// // std::cout << "Could not create connection: " << ec.message() << std::endl; +// // return false; +// // } + +// // m_client->set_open_handler(bind(&Connection::onWebSocketOpen, this, _1)); +// // m_client->set_close_handler(bind(&Connection::onWebSocketClose, this, _1)); +// // m_client->set_message_handler(bind(&Connection::onWebSocketMessage, this, _1, _2)); + +// // // 3. Start the connection +// // m_client->connect(con); + +// // // 4. Start the ASIO io_service run loop +// // m_client->run(); + +// // return true; // Replace with proper success/failure logic +// // } + +// // void Connection::close() { +// // // Add logic to gracefully close the WebSocket connection +// // } + +// // void Connection::send(const std::string& data) { +// // // Use m_client to send data over the WebSocket connection +// // } + +// // void Connection::onWebSocketOpen(ConnectionHDL hdl) { +// // m_connectionHdl = hdl; +// // std::cout << "Connection established." << std::endl; +// // } + +// // void Connection::onWebSocketClose(ConnectionHDL hdl) { +// // std::cout << "Connection closed." << std::endl; +// // } + +// // void Connection::onWebSocketMessage(ConnectionHDL hdl, Client::message_ptr msg) { +// // std::cout << "Incoming message." << std::endl; +// // // ... Process incoming message (payload is in msg->get_payload()) +// // } + +// #include "Connection.h" + +// Connection::Connection(const std::string& uri) : m_uri(uri), m_isConnected(false) { +// // Initialize the client +// m_client.init_asio(); +// m_client.set_access_channels(websocketpp::log::alevel::all); +// m_client.clear_access_channels(websocketpp::log::alevel::frame_payload); +// // m_client.clear_access_channels(websocketpp::log::alevel::all); +// // m_client.set_access_channels(websocketpp::log::alevel::connect); +// // m_client.set_access_channels(websocketpp::log::alevel::disconnect); +// // m_client.set_access_channels(websocketpp::log::alevel::app); + +// // Bind the handlers +// m_client.set_open_handler(std::bind(&Connection::on_open, this, std::placeholders::_1)); +// m_client.set_close_handler(std::bind(&Connection::on_close, this, std::placeholders::_1)); +// m_client.set_message_handler(std::bind(&Connection::on_message, this, std::placeholders::_1, std::placeholders::_2)); +// m_client.set_fail_handler(std::bind(&Connection::on_fail, this, std::placeholders::_1)); +// } + +// Connection::~Connection() { +// close(); +// } + +// ConnectionResult Connection::connect() { +// websocketpp::lib::error_code ec; +// ConnectionPtr con = m_client.get_connection(m_uri, ec); +// if (ec) { +// std::cout << "Could not create connection because: " << ec.message() << std::endl; +// return ConnectionResult::ERROR_UNKNOWN; +// } + +// // Save the connection handle for later +// m_connectionHdl = con->get_handle(); + +// // Connect +// m_client.connect(con); + + +// // Start ASIO io_service run loop in the background +// // Note: In a real application, consider running this on a separate thread +// m_client.run(); + +// return ConnectionResult::SUCCESS; // You may want to adjust this based on actual success or failure +// } + +// void Connection::close() { +// if (m_isConnected) { +// websocketpp::lib::error_code ec; +// m_client.close(m_connectionHdl, websocketpp::close::status::normal, "", ec); +// if (ec) { +// std::cout << "Error closing WebSocket: " << ec.message() << std::endl; +// } +// } +// } + +// void Connection::send(const std::string& message) { +// if (m_isConnected) { +// m_client.send(m_connectionHdl, message, websocketpp::frame::opcode::text); +// } else { +// std::cout << "Cannot send message, WebSocket not connected." << std::endl; +// } +// } + +// void Connection::on_open(ConnectionHDL hdl) { +// m_isConnected = true; +// std::cout << "WebSocket connection opened." << std::endl; + +// // Here, you could also perform the initial handshake or send a message +// // Construct the handshake message +// std::string handshakeMsg = "{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\":1}"; + +// // Send the handshake message +// std::cout << "trying to send handshake" << std::endl; +// send(handshakeMsg); +// } + +// void Connection::on_close(ConnectionHDL hdl) { +// m_isConnected = false; +// std::cout << "WebSocket connection closed." << std::endl; +// } + +// void Connection::on_message(ConnectionHDL hdl, MessagePtr msg) { +// std::cout << "Received message: " << msg->get_payload() << std::endl; +// // Here, you could process the message, parse JSON, etc. +// } + +// void Connection::on_fail(ConnectionHDL hdl) { +// m_isConnected = false; +// std::cout << "WebSocket connection failed to open." << std::endl; +// // You can attempt a reconnect here or notify the user/application +// } + diff --git a/src/ParsingEngine.cpp b/src/ParsingEngine.cpp index ec5740c..f15a281 100644 --- a/src/ParsingEngine.cpp +++ b/src/ParsingEngine.cpp @@ -1 +1,38 @@ -// Static Class (Pass Message -> Get Object) \ No newline at end of file +// // Static Class (Pass Message -> Get Object) + + +// #include "ParsingEngine.h" +// #include +// #include + +// namespace scccpp +// { + + +// int ParsingEngine::get_parse( std::string channel, std::string data, std::string event, int msgCounter) { + +// Json::Value root; +// json_template >> root; + +// // json_object *dataObj = json_tokener_parse((char *)data.c_str()); +// // json_object *jobj = json_object_new_object(); +// // json_object *eventobject = json_object_new_string("#publish"); +// // json_object *jobj1 = json_object_new_object(); +// // json_object *cnt = json_object_new_int(++msgCounter); +// // json_object *channelobject = json_object_new_string(channel.c_str()); +// // json_object_object_add(jobj1, "channel", channelobject); +// // json_object_object_add(jobj1, "data", dataObj); +// // json_object_object_add(jobj, "event", eventobject); +// // json_object_object_add(jobj, "data", jobj1); +// // json_object_object_add(jobj, "cid", cnt); +// // message_queue->enqueue((char *)json_object_to_json_string(jobj)); +// // json_object_put(jobj); + +// return 1; +// } + + +// } // namespace scccpp + + + diff --git a/src/SocketClusterClient.cpp b/src/SocketClusterClient.cpp index e3705eb..280fa75 100644 --- a/src/SocketClusterClient.cpp +++ b/src/SocketClusterClient.cpp @@ -12,8 +12,12 @@ SocketClusterClient::~SocketClusterClient() { // No explicit cleanup needed! Smart pointers handle deletion } -std::shared_ptr SocketClusterClient::createConnection(const std::string& url, int port) { - auto connection = std::make_shared(url, port); +std::shared_ptr SocketClusterClient::createConnection(const char * url, const char * port) { + boost::asio::io_context the_io_context; + + auto connection = std::make_shared(the_io_context); + std::thread socketThread = connection->launch_socket(url, port); + socketThread.detach(); m_connections.push_back(connection); return connection; } diff --git a/test/examples/client_classed.cpp b/test/examples/client_classed.cpp index e69de29..2283ce8 100644 --- a/test/examples/client_classed.cpp +++ b/test/examples/client_classed.cpp @@ -0,0 +1,53 @@ +#include "SocketClusterClient.h" +#include "Connection.h" +#include +#include + +#define ENDPOINT "127.0.0.1" + +// These callbacks are not responsible for freeing the memory of the data. +// The data is freed by the client class. +// void test(std::string event, json_object *data) +// { +// std::cout << "In the event callback, data: " << json_object_to_json_string(data) << std::endl; +// } + +// void test_publish(std::shared_ptr theWebsocket) +// { +// json_object *data = json_object_new_object();json_object *jobj = json_object_new_object(); +// json_object *isCodeRunning = json_object_new_int(true); +// json_object_object_add(jobj, "isCodeRunning", isCodeRunning); +// theWebsocket->publish("test", json_object_to_json_string(jobj)); +// json_object_put(jobj); +// } + +int main() +{ + // boost::asio::io_context the_io_context; + std::shared_ptr client = std::make_shared(); + auto socket = client->createConnection(ENDPOINT, "8000"); + + + // Go ahead and continue running and doing stuff in the main thread after launching socket. + + /* + Subscribe Callbacks + Note: these can also be member functions using: + client->connected_callback = std::bind(&myClass::myFunc, myInstance, std::placeholders::_1); + See example_client_classed.cpp for an example of this. + */ + // theWebsocket->subscribe("test", test); + + // usleep(1000000); + // test_publish(theWebsocket); + // usleep(1000000); + + // // Unsubscribe from the event (optional) + // theWebsocket->unsubscribe("test"); + + // // Close the socket + // theWebsocket->stop(); + // socketThread.join(); + + return EXIT_SUCCESS; +} \ No newline at end of file diff --git a/test/examples/client_simple.cpp b/test/examples/client_simple.cpp deleted file mode 100644 index e69de29..0000000 diff --git a/test/test.cpp b/test/test.cpp index 8d0fa95..a0a3381 100644 --- a/test/test.cpp +++ b/test/test.cpp @@ -2,17 +2,18 @@ #include #include -#include "SocketClusterClient.h" -#include "Connection.h" +// #include "SocketClusterClient.h" +// #include "Connection.h" int connect_to_socket() { - SocketClusterClient *c = new SocketClusterClient(); - std::string url = "ws://127.0.0.1"; - std::shared_ptr con = c->createConnection(url, 8000); + // SocketClusterClient *c = new SocketClusterClient(); + // std::string url = "ws://127.0.0.1"; + // std::shared_ptr con = c->createConnection(url, 8000); + // con->connect(); - std::promise connection_promise; - auto connection_future = connection_promise.get_future(); + // std::promise connection_promise; + // auto connection_future = connection_promise.get_future(); return 0; } From 7e28b6e938e0632ef791cfa83d05801da10dbbd2 Mon Sep 17 00:00:00 2001 From: Daniel Cloran Date: Sun, 14 Apr 2024 15:43:28 -0400 Subject: [PATCH 2/9] started parsing engine pair programming --- CMakeLists.txt | 16 +- include/Connection.h | 9 +- include/ParsingEngine.h | 36 ++-- include/SocketClusterClient.h | 2 + src/Connection.cpp | 281 +++++++++++++++++-------------- src/ParsingEngine.cpp | 54 +++--- src/SocketClusterClient.cpp | 4 +- test/examples/client_classed.cpp | 21 ++- 8 files changed, 230 insertions(+), 193 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 1af474b..4c09741 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,14 +13,14 @@ FetchContent_Declare(JsonCpp FetchContent_MakeAvailable(JsonCpp) # === === -set(BOOST_ENABLE_CMAKE ON) -include(FetchContent) -FetchContent_Declare( - Boost - GIT_REPOSITORY https://github.com/boostorg/boost.git - GIT_TAG boost-1.80.0 -) -FetchContent_MakeAvailable(Boost) +# set(BOOST_ENABLE_CMAKE ON) +# include(FetchContent) +# FetchContent_Declare( +# Boost +# GIT_REPOSITORY https://github.com/boostorg/boost.git +# GIT_TAG boost-1.80.0 +# ) +# FetchContent_MakeAvailable(Boost) find_package(Boost 1.76.0 REQUIRED) # header only libraries must not be added here diff --git a/include/Connection.h b/include/Connection.h index 84c00d3..e11e1c5 100644 --- a/include/Connection.h +++ b/include/Connection.h @@ -39,12 +39,14 @@ class Connection : public std::enable_shared_from_this { Connection(net::io_context& ioc); ~Connection() {}; + std::thread socketThread; + std::thread launch_socket(const char * host, const char * port); void stop(); - // void subscribe(std::string channel, socketCallback callback); - // void unsubscribe(std::string channel); - // void publish(std::string channel, std::string data); + void subscribe(std::string channel, socketCallback callback); + void unsubscribe(std::string channel); + void publish(std::string channel, std::string data); // void publish(std::string channel, Json::Value *data); private: @@ -61,6 +63,7 @@ class Connection : public std::enable_shared_from_this { std::string m_uri; int msgCounter = 1; std::thread messageThread; + void message_processing(); net::io_context& the_io_context; diff --git a/include/ParsingEngine.h b/include/ParsingEngine.h index 91bc07b..27d0cd6 100644 --- a/include/ParsingEngine.h +++ b/include/ParsingEngine.h @@ -1,27 +1,27 @@ -// #ifndef PARSING_ENGINE_HPP -// #define PARSING_ENGINE_HPP +#ifndef PARSING_ENGINE_HPP +#define PARSING_ENGINE_HPP -// #include "json/json.h" +#include "json/json.h" -// namespace scccpp -// { +namespace scccpp +{ -// class ParsingEngine -// { -// private: -// const std::istream json_template("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); -// public: -// ParsingEngine(/* args */); -// ~ParsingEngine(); + class ParsingEngine + { + private: + const std::istream json_template("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); + public: + ParsingEngine(/* args */); + ~ParsingEngine(); -// int get_parse( std::string channel, std::string data, std::string event, int msgCounter); + int get_parse( std::string channel, std::string data, std::string event, int msgCounter); -// }; + }; -// ParsingEngine::ParsingEngine(/* args */) {} -// ParsingEngine::~ParsingEngine(){} + ParsingEngine::ParsingEngine(/* args */) {} + ParsingEngine::~ParsingEngine(){} -// } // namespace scccpp +} // namespace scccpp @@ -30,4 +30,4 @@ -// #endif \ No newline at end of file +#endif \ No newline at end of file diff --git a/include/SocketClusterClient.h b/include/SocketClusterClient.h index 42671e2..89186b8 100644 --- a/include/SocketClusterClient.h +++ b/include/SocketClusterClient.h @@ -17,6 +17,8 @@ #include #include "Connection.h" +boost::asio::io_context the_io_context; + class SocketClusterClient { public: SocketClusterClient(); diff --git a/src/Connection.cpp b/src/Connection.cpp index 80f990a..3cab17f 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -3,9 +3,10 @@ // // #include // Temporary, for basic error reporting // TODO: create context -// +// -Connection::Connection(net::io_context& ioc) : resolver_(net::make_strand(ioc)), ws_(net::make_strand(ioc)), the_io_context(ioc) { +Connection::Connection(net::io_context &ioc) : resolver_(net::make_strand(ioc)), ws_(net::make_strand(ioc)), the_io_context(ioc) +{ message_queue = std::make_unique>(); subscription_list = std::make_unique>(); @@ -13,22 +14,22 @@ Connection::Connection(net::io_context& ioc) : resolver_(net::make_strand(ioc)), messageThread.detach(); } -std::thread Connection::launch_socket(const char * host, const char * port) +std::thread Connection::launch_socket(const char *host, const char *port) { std::thread socketThread([this, host, port] - { + { while (!socket_closed) { run(host, port); // Run the I/O service. The call will return when // the socket is closed. + the_io_context.run(); - // set to false as safety in case ::fail wasn't called. - socket_connected = false; - the_io_context.reset(); // reset the socket - } - }); + // // set to false as safety in case ::fail wasn't called. + // socket_connected = false; + // the_io_context.reset(); // reset the socket + } }); while (!socket_connected) { // Busy wait until socket has connected for the first time @@ -38,7 +39,7 @@ std::thread Connection::launch_socket(const char * host, const char * port) } // Start the asynchronous operation -void Connection::run(char const* host, char const* port) +void Connection::run(char const *host, char const *port) { writable = false; host_ = host; @@ -48,15 +49,15 @@ void Connection::run(char const* host, char const* port) resolver_.async_resolve(host, port, beast::bind_front_handler(&Connection::on_resolve, shared_from_this())); } - // Report a failure -void Connection::fail(beast::error_code ec, char const* what) +void Connection::fail(beast::error_code ec, char const *what) { socket_connected = false; std::cerr << "Socket Fail / Closing: " << what << ": " << ec.message() << "\n"; } -void Connection::stop() { +void Connection::stop() +{ // Close the WebSocket connection socket_closed = true; ws_.async_close(websocket::close_code::normal, beast::bind_front_handler(&Connection::on_close, shared_from_this())); @@ -64,13 +65,15 @@ void Connection::stop() { void Connection::on_close(beast::error_code ec) { - if(ec) return fail(ec, "close"); - std::string s(boost::asio::buffer_cast(buffer_.data()), buffer_.size()); + if (ec) + return fail(ec, "close"); + std::string s(boost::asio::buffer_cast(buffer_.data()), buffer_.size()); } void Connection::on_resolve(beast::error_code ec, tcp::resolver::results_type results) { - if(ec) return fail(ec, "resolve"); + if (ec) + return fail(ec, "resolve"); // Set the timeout for the operation beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(5)); @@ -81,7 +84,8 @@ void Connection::on_resolve(beast::error_code ec, tcp::resolver::results_type re void Connection::on_connect(beast::error_code ec, tcp::resolver::results_type::endpoint_type ep) { - if(ec) return fail(ec, "connect"); + if (ec) + return fail(ec, "connect"); // Turn off the timeout on the tcp_stream, because // the websocket stream has its own timeout system. @@ -92,11 +96,11 @@ void Connection::on_connect(beast::error_code ec, tcp::resolver::results_type::e // Set a decorator to change the User-Agent of the handshake ws_.set_option(websocket::stream_base::decorator( - [](websocket::request_type& req) + [](websocket::request_type &req) { req.set(http::field::user_agent, - std::string(BOOST_BEAST_VERSION_STRING) + - " websocket-client-async"); + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-client-async"); })); host_ += ':' + std::to_string(ep.port()); @@ -108,8 +112,7 @@ void Connection::on_connect(beast::error_code ec, tcp::resolver::results_type::e void Connection::on_handshake(beast::error_code ec) { - std::cout << "Handshake Attempt" << std::endl; - if(ec) + if (ec) return fail(ec, "handshake"); message_queue->push_front("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); @@ -126,13 +129,13 @@ void Connection::on_handshake(beast::error_code ec) // TODO: Resubscribe to all channels after reconnect // subscription_list->for_each([this](subscription const &n) // {resubscribe(std::get<0>(n), std::get<1>(n));}); - std::cout << "Handshake Succeeded" << std::endl; } - // CORE MESSAGE PROCESSING -void Connection::message_processing() { - while (!socket_closed) { +void Connection::message_processing() +{ + while (!socket_closed) + { /* Only one call to async_write is allowed at a time. on_write (callback of async_write) sets writable = true again. @@ -140,17 +143,17 @@ void Connection::message_processing() { message_queue->block_until_value(); std::unique_lock ul(writable_m); - writable_cv.wait(ul,[this] {return writable;}); + writable_cv.wait(ul, [this] + { return writable; }); std::string message = message_queue->dequeue(); writable = false; // Post into the main processing thread net::post(the_io_context, [this, message] - { + { ws_.async_write(net::buffer(message), beast::bind_front_handler(&Connection::on_write, shared_from_this())); - std::cout << "Wrote: " << message << std::endl; - }); + std::cout << "Wrote: " << message << std::endl; }); } } @@ -158,141 +161,161 @@ void Connection::on_write(beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); - if(ec) + if (ec) return fail(ec, "write"); - std::unique_lock ul(writable_m); //lock is applied on mutex m by thread t2 + std::unique_lock ul(writable_m); // lock is applied on mutex m by thread t2 writable = true; - writable_cv.notify_one(); //notify to condition variable + writable_cv.notify_one(); // notify to condition variable } void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) { boost::ignore_unused(bytes_transferred); - if(ec) return fail(ec, "read"); + if (ec) + return fail(ec, "read"); if (bytes_transferred == 0) { // Send ping - std::cout << "Ping about to send" << std::endl; message_queue->enqueue(""); } else { - std::string s(boost::asio::buffer_cast(buffer_.data()), buffer_.size()); + std::string s(boost::asio::buffer_cast(buffer_.data()), buffer_.size()); std::cout << "Response From Server: " << s << std::endl; - // Json::Value root; - // Json::Reader reader; - // bool parsingSuccessful = reader.parse( s.c_str(), root); - // if (!parsingSuccessful) - // { - // std::cout << "[ScClient Service] data received is either null or not json parsable." << std::endl; - // } - // else - // { - // Json::Value msgData; - - // // int exists = json_object_object_get_ex(jobj, "data", &msgData); - // if (root.isMember("data")) - // { - // std::string channel = root["channel"].asString(); - // Json::Value data = root["data"]; - - // if (!channel.empty()) - // { - // std::shared_ptr sub = subscription_list->find_first_if([channel](subscription const &t) - // { return std::get<0>(t) == channel; }); - // if (sub != nullptr) - // { - // std::get<1>(*sub)(channel, data); - // } - // } - // } - // json_object_put(jobj); - // } + Json::Value root; + Json::Reader reader; + bool parsingSuccessful = reader.parse(s.c_str(), root); + if (!parsingSuccessful) + { + std::cout << "[ScClient Service] data received is either null or not json parsable." << std::endl; + } + else + { + // std:: cout << root["data"].asString() << std::endl; + // Json::Value msgData; + + // // int exists = json_object_object_get_ex(jobj, "data", &msgData); + // if (root.isMember("data")) + // { + // std::string channel = root["channel"].asString(); + // Json::Value data = root["data"]; + + // if (!channel.empty()) + // { + // std::shared_ptr sub = subscription_list->find_first_if([channel](subscription const &t) + // { return std::get<0>(t) == channel; }); + // if (sub != nullptr) + // { + // std::get<1>(*sub)(channel, data); + // } + // } + // } + // json_object_put(jobj); + } } buffer_.clear(); // Async_read recursively calls itself, so kick off once here. ws_.async_read(buffer_, beast::bind_front_handler(&Connection::on_read, shared_from_this())); } - // void Connection::publish(std::string channel, Json::Value* data) { // // publish(channel, json_object_to_json_string(data)); // } -// void Connection::publish(std::string channel, std::string data) { -// //ParsingEngine.get_publish_json_object(); - -// // channel = test_channel_1 - -// // {event: #publish, channel: test_channel_1, cid: 72, data: {*user_data*}} - -// // If JSON String // - -// // Json::Value output = ParsingEngine.get_json_object(channel, data, event, msgCounter) - -// Json::Value dataobj; -// Json::Reader reader; - -// bool test = reader.parse(data, dataobj); - -// if(!test) -// { -// std::cout << reader.getFormattedErrorMessages(); -// return 0; -// } - -// Json::Value obj; - -// obj["channel"] = channel; -// obj["data"] = dataobj; -// obj["event"] = "#publish"; -// obj["msgCounter"] = msgCounter; - -// return obj; - - -// // "{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }" - -// //////////////////// - -// // json_object *dataObj = json_tokener_parse((char *)data.c_str()); -// // json_object *jobj = json_object_new_object(); -// // json_object *eventobject = json_object_new_string("#publish"); -// // json_object *jobj1 = json_object_new_object(); -// // json_object *cnt = json_object_new_int(++msgCounter); -// // json_object *channelobject = json_object_new_string(channel.c_str()); -// // json_object_object_add(jobj1, "channel", channelobject); -// // json_object_object_add(jobj1, "data", dataObj); -// // json_object_object_add(jobj, "event", eventobject); -// // json_object_object_add(jobj, "data", jobj1); -// // json_object_object_add(jobj, "cid", cnt); -// // message_queue->enqueue((char *)json_object_to_json_string(jobj)); -// // json_object_put(jobj); - -// } - - - - +void Connection::subscribe(std::string channel, socketCallback callback) { + std::shared_ptr sub = subscription_list->find_first_if([channel](subscription const &t) + { return std::get<0>(t) == channel; }); + if (sub == nullptr) + { + + // json_object *jobj = json_object_new_object(); + // json_object *eventobject = json_object_new_string("#subscribe"); + // json_object *jobj1 = json_object_new_object(); + // json_object *channelobject = json_object_new_string(channel.c_str()); + // json_object_object_add(jobj, "event", eventobject); + // json_object_object_add(jobj1, "channel", channelobject); + // json_object_object_add(jobj, "data", jobj1); + // json_object *cnt = json_object_new_int(++msgCounter); + // json_object_object_add(jobj, "cid", cnt); + // message_queue->enqueue(json_object_to_json_string(jobj)); + // json_object_put(jobj); + subscription_list->push_front(std::make_tuple(channel, callback)); + std::cout << "Subscribed to " << channel << "." << std::endl; + } + else + { + std::cout << "Duplicate Subscribe to " << channel << " attempted" << std::endl; + } +} +void Connection::unsubscribe(std::string channel) { + subscription_list->remove_if([channel](subscription const &n) + { return std::get<0>(n) == channel; }); + std::cout << "Unsubscribed from " << channel << std::endl; +} +void Connection::publish(std::string channel, std::string data) +{ + std::cout << "Publishing: " << channel << ":" << data << std::endl; + // Json::Value output = ParsingEngine.get_json_object(channel, data, event, msgCounter) + Json::Value userData; + Json::Reader reader; + bool parse_succeeded = reader.parse(data, userData); + if (!parse_succeeded) + { + std::cout << reader.getFormattedErrorMessages(); + return 0; + } + // { "event": "#publish", "data": { "channel": "test", "data": { "isCodeRunning": 1 } }, "cid": 3 } + // { "channel" : "test", "event" : "#publish", "msgCounter" : 1} + // { "channel" : "test","data" : { "amIAlive" : true},"event" : "#publish","msgCounter" : 1 + Json::Value obj; + obj["event"] = "#publish"; + obj["channel"] = channel; + obj["cid"] = ++msgCounter; + obj["data"] = obj["channel"]; + + Json::StreamWriterBuilder builder; + const std::string output = Json::writeString(builder, obj); + std::cout << "JSON OUTPUT: " << output << std::endl; + + message_queue->enqueue(output.c_str()); + + // "{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }" + + //////////////////// + + // json_object *dataObj = json_tokener_parse((char *)data.c_str()); + // json_object *jobj = json_object_new_object(); + // json_object *eventobject = json_object_new_string("#publish"); + // json_object *jobj1 = json_object_new_object(); + // json_object *cnt = json_object_new_int(++msgCounter); + // json_object *channelobject = json_object_new_string(channel.c_str()); + // json_object_object_add(jobj1, "channel", channelobject); + // json_object_object_add(jobj1, "data", dataObj); + // json_object_object_add(jobj, "event", eventobject); + // json_object_object_add(jobj, "data", jobj1); + // json_object_object_add(jobj, "cid", cnt); + // message_queue->enqueue((char *)json_object_to_json_string(jobj)); + // json_object_put(jobj); +} // // using websocketpp::lib::placeholders::_1; // // using websocketpp::lib::placeholders::_2; // // using websocketpp::lib::bind; -// // Connection::Connection(const std::string& url, int port) : m_url(url), m_port(port) +// // Connection::Connection(const std::string& url, int port) : m_url(url), m_port(port) // // { // // // Initialize client endpoint (more setup might be needed later) // // m_client = new Client(); @@ -312,15 +335,15 @@ void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) // // ErrorCode ec; // // ConnectionPtr con = m_client->get_connection(uri, ec); - + // // if (ec) { // // std::cout << "Could not create connection: " << ec.message() << std::endl; // // return false; // // } - + // // m_client->set_open_handler(bind(&Connection::onWebSocketOpen, this, _1)); // // m_client->set_close_handler(bind(&Connection::onWebSocketClose, this, _1)); -// // m_client->set_message_handler(bind(&Connection::onWebSocketMessage, this, _1, _2)); +// // m_client->set_message_handler(bind(&Connection::onWebSocketMessage, this, _1, _2)); // // // 3. Start the connection // // m_client->connect(con); @@ -350,8 +373,8 @@ void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) // // void Connection::onWebSocketMessage(ConnectionHDL hdl, Client::message_ptr msg) { // // std::cout << "Incoming message." << std::endl; -// // // ... Process incoming message (payload is in msg->get_payload()) -// // } +// // // ... Process incoming message (payload is in msg->get_payload()) +// // } // #include "Connection.h" @@ -359,7 +382,7 @@ void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) // // Initialize the client // m_client.init_asio(); // m_client.set_access_channels(websocketpp::log::alevel::all); -// m_client.clear_access_channels(websocketpp::log::alevel::frame_payload); +// m_client.clear_access_channels(websocketpp::log::alevel::frame_payload); // // m_client.clear_access_channels(websocketpp::log::alevel::all); // // m_client.set_access_channels(websocketpp::log::alevel::connect); // // m_client.set_access_channels(websocketpp::log::alevel::disconnect); @@ -390,7 +413,6 @@ void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) // // Connect // m_client.connect(con); - // // Start ASIO io_service run loop in the background // // Note: In a real application, consider running this on a separate thread // m_client.run(); @@ -444,4 +466,3 @@ void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) // std::cout << "WebSocket connection failed to open." << std::endl; // // You can attempt a reconnect here or notify the user/application // } - diff --git a/src/ParsingEngine.cpp b/src/ParsingEngine.cpp index f15a281..df08278 100644 --- a/src/ParsingEngine.cpp +++ b/src/ParsingEngine.cpp @@ -1,38 +1,38 @@ -// // Static Class (Pass Message -> Get Object) +// Static Class (Pass Message -> Get Object) -// #include "ParsingEngine.h" -// #include -// #include +#include "ParsingEngine.h" +#include +#include -// namespace scccpp -// { +namespace scccpp +{ - -// int ParsingEngine::get_parse( std::string channel, std::string data, std::string event, int msgCounter) { + // Data here could be null for #subscribe events + int ParsingEngine::get_parse( std::string channel, std::string data, std::string event, int msgCounter) { -// Json::Value root; -// json_template >> root; + Json::Value root; + json_template >> root; -// // json_object *dataObj = json_tokener_parse((char *)data.c_str()); -// // json_object *jobj = json_object_new_object(); -// // json_object *eventobject = json_object_new_string("#publish"); -// // json_object *jobj1 = json_object_new_object(); -// // json_object *cnt = json_object_new_int(++msgCounter); -// // json_object *channelobject = json_object_new_string(channel.c_str()); -// // json_object_object_add(jobj1, "channel", channelobject); -// // json_object_object_add(jobj1, "data", dataObj); -// // json_object_object_add(jobj, "event", eventobject); -// // json_object_object_add(jobj, "data", jobj1); -// // json_object_object_add(jobj, "cid", cnt); -// // message_queue->enqueue((char *)json_object_to_json_string(jobj)); -// // json_object_put(jobj); - -// return 1; -// } + // json_object *dataObj = json_tokener_parse((char *)data.c_str()); + // json_object *jobj = json_object_new_object(); + // json_object *eventobject = json_object_new_string("#publish"); + // json_object *jobj1 = json_object_new_object(); + // json_object *cnt = json_object_new_int(++msgCounter); + // json_object *channelobject = json_object_new_string(channel.c_str()); + // json_object_object_add(jobj1, "channel", channelobject); + // json_object_object_add(jobj1, "data", dataObj); + // json_object_object_add(jobj, "event", eventobject); + // json_object_object_add(jobj, "data", jobj1); + // json_object_object_add(jobj, "cid", cnt); + // message_queue->enqueue((char *)json_object_to_json_string(jobj)); + // json_object_put(jobj); + + return 1; + } -// } // namespace scccpp +} // namespace scccpp diff --git a/src/SocketClusterClient.cpp b/src/SocketClusterClient.cpp index 280fa75..babbb19 100644 --- a/src/SocketClusterClient.cpp +++ b/src/SocketClusterClient.cpp @@ -13,11 +13,9 @@ SocketClusterClient::~SocketClusterClient() { } std::shared_ptr SocketClusterClient::createConnection(const char * url, const char * port) { - boost::asio::io_context the_io_context; auto connection = std::make_shared(the_io_context); - std::thread socketThread = connection->launch_socket(url, port); - socketThread.detach(); + // connection->launch_socket(url, port); m_connections.push_back(connection); return connection; } diff --git a/test/examples/client_classed.cpp b/test/examples/client_classed.cpp index 2283ce8..675b0b6 100644 --- a/test/examples/client_classed.cpp +++ b/test/examples/client_classed.cpp @@ -7,10 +7,10 @@ // These callbacks are not responsible for freeing the memory of the data. // The data is freed by the client class. -// void test(std::string event, json_object *data) -// { -// std::cout << "In the event callback, data: " << json_object_to_json_string(data) << std::endl; -// } +void test(std::string event, Json::Value data) +{ + std::cout << "In the event callback, data: " << json_object_to_json_string(data) << std::endl; +} // void test_publish(std::shared_ptr theWebsocket) // { @@ -26,6 +26,16 @@ int main() // boost::asio::io_context the_io_context; std::shared_ptr client = std::make_shared(); auto socket = client->createConnection(ENDPOINT, "8000"); + std::thread socketThread = socket->launch_socket(ENDPOINT, "8000"); + + Json::Value test; + test["amIAlive"] = true; + Json::StreamWriterBuilder builder; + const std::string output = Json::writeString(builder, test); + + socket->subscribe("test") + + socket->publish("test", output); // Go ahead and continue running and doing stuff in the main thread after launching socket. @@ -48,6 +58,9 @@ int main() // // Close the socket // theWebsocket->stop(); // socketThread.join(); + socketThread.join(); + + std::cout << "Example ending" << std::endl; return EXIT_SUCCESS; } \ No newline at end of file From 56db7bc7334792dba1a1faac11e9d12d5499580b Mon Sep 17 00:00:00 2001 From: Daniel Cloran Date: Sun, 14 Apr 2024 15:59:25 -0400 Subject: [PATCH 3/9] working version --- include/ParsingEngine.h | 36 ++++++++++----------- src/ParsingEngine.cpp | 54 ++++++++++++++++---------------- test/examples/client_classed.cpp | 10 +++--- 3 files changed, 50 insertions(+), 50 deletions(-) diff --git a/include/ParsingEngine.h b/include/ParsingEngine.h index 27d0cd6..91bc07b 100644 --- a/include/ParsingEngine.h +++ b/include/ParsingEngine.h @@ -1,27 +1,27 @@ -#ifndef PARSING_ENGINE_HPP -#define PARSING_ENGINE_HPP +// #ifndef PARSING_ENGINE_HPP +// #define PARSING_ENGINE_HPP -#include "json/json.h" +// #include "json/json.h" -namespace scccpp -{ +// namespace scccpp +// { - class ParsingEngine - { - private: - const std::istream json_template("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); - public: - ParsingEngine(/* args */); - ~ParsingEngine(); +// class ParsingEngine +// { +// private: +// const std::istream json_template("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); +// public: +// ParsingEngine(/* args */); +// ~ParsingEngine(); - int get_parse( std::string channel, std::string data, std::string event, int msgCounter); +// int get_parse( std::string channel, std::string data, std::string event, int msgCounter); - }; +// }; - ParsingEngine::ParsingEngine(/* args */) {} - ParsingEngine::~ParsingEngine(){} +// ParsingEngine::ParsingEngine(/* args */) {} +// ParsingEngine::~ParsingEngine(){} -} // namespace scccpp +// } // namespace scccpp @@ -30,4 +30,4 @@ namespace scccpp -#endif \ No newline at end of file +// #endif \ No newline at end of file diff --git a/src/ParsingEngine.cpp b/src/ParsingEngine.cpp index df08278..f807b59 100644 --- a/src/ParsingEngine.cpp +++ b/src/ParsingEngine.cpp @@ -1,38 +1,38 @@ -// Static Class (Pass Message -> Get Object) +// // Static Class (Pass Message -> Get Object) -#include "ParsingEngine.h" -#include -#include +// #include "ParsingEngine.h" +// #include +// #include -namespace scccpp -{ +// namespace scccpp +// { - // Data here could be null for #subscribe events - int ParsingEngine::get_parse( std::string channel, std::string data, std::string event, int msgCounter) { +// // Data here could be null for #subscribe events +// int ParsingEngine::get_parse( std::string channel, std::string data, std::string event, int msgCounter) { - Json::Value root; - json_template >> root; +// Json::Value root; +// json_template >> root; - // json_object *dataObj = json_tokener_parse((char *)data.c_str()); - // json_object *jobj = json_object_new_object(); - // json_object *eventobject = json_object_new_string("#publish"); - // json_object *jobj1 = json_object_new_object(); - // json_object *cnt = json_object_new_int(++msgCounter); - // json_object *channelobject = json_object_new_string(channel.c_str()); - // json_object_object_add(jobj1, "channel", channelobject); - // json_object_object_add(jobj1, "data", dataObj); - // json_object_object_add(jobj, "event", eventobject); - // json_object_object_add(jobj, "data", jobj1); - // json_object_object_add(jobj, "cid", cnt); - // message_queue->enqueue((char *)json_object_to_json_string(jobj)); - // json_object_put(jobj); - - return 1; - } +// // json_object *dataObj = json_tokener_parse((char *)data.c_str()); +// // json_object *jobj = json_object_new_object(); +// // json_object *eventobject = json_object_new_string("#publish"); +// // json_object *jobj1 = json_object_new_object(); +// // json_object *cnt = json_object_new_int(++msgCounter); +// // json_object *channelobject = json_object_new_string(channel.c_str()); +// // json_object_object_add(jobj1, "channel", channelobject); +// // json_object_object_add(jobj1, "data", dataObj); +// // json_object_object_add(jobj, "event", eventobject); +// // json_object_object_add(jobj, "data", jobj1); +// // json_object_object_add(jobj, "cid", cnt); +// // message_queue->enqueue((char *)json_object_to_json_string(jobj)); +// // json_object_put(jobj); + +// return 1; +// } -} // namespace scccpp +// } // namespace scccpp diff --git a/test/examples/client_classed.cpp b/test/examples/client_classed.cpp index 675b0b6..f13caff 100644 --- a/test/examples/client_classed.cpp +++ b/test/examples/client_classed.cpp @@ -7,10 +7,10 @@ // These callbacks are not responsible for freeing the memory of the data. // The data is freed by the client class. -void test(std::string event, Json::Value data) -{ - std::cout << "In the event callback, data: " << json_object_to_json_string(data) << std::endl; -} +// void test(std::string event, Json::Value data) +// { +// std::cout << "In the event callback, data: " << json_object_to_json_string(data) << std::endl; +// } // void test_publish(std::shared_ptr theWebsocket) // { @@ -33,7 +33,7 @@ int main() Json::StreamWriterBuilder builder; const std::string output = Json::writeString(builder, test); - socket->subscribe("test") + // socket->subscribe("test") socket->publish("test", output); From 1342e3c80f1b532170b069f235f2f8e902029039 Mon Sep 17 00:00:00 2001 From: Alec Pannunzio Date: Sun, 14 Apr 2024 17:22:37 -0400 Subject: [PATCH 4/9] paired pregramming parsing engine to handle Json::value types Co-authored-by: Daniel Cloran Co-authored-by: Garrett Chandler --- .vscode/settings.json | 10 +- CMakeLists.txt | 4 +- include/Connection.h | 2 +- include/ParsingEngine.h | 41 ++--- src/Connection.cpp | 274 +++---------------------------- src/ParsingEngine.cpp | 71 ++++---- test/examples/client_classed.cpp | 19 ++- 7 files changed, 97 insertions(+), 324 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index b3a9770..e1c6a83 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -95,7 +95,15 @@ "cfenv": "cpp", "typeindex": "cpp", "forward_list": "cpp", - "unordered_set": "cpp" + "unordered_set": "cpp", + "csetjmp": "cpp", + "barrier": "cpp", + "slist": "cpp", + "latch": "cpp", + "ranges": "cpp", + "shared_mutex": "cpp", + "syncstream": "cpp", + "valarray": "cpp" }, "cmake.configureOnOpen": true } \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 4c09741..f552f08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,12 +22,12 @@ FetchContent_MakeAvailable(JsonCpp) # ) # FetchContent_MakeAvailable(Boost) -find_package(Boost 1.76.0 REQUIRED) # header only libraries must not be added here +find_package(Boost 1.74.0 REQUIRED) # header only libraries must not be added here # Find source files file(GLOB_RECURSE LIBRARY_SOURCES src/*.cpp) add_library(SocketClusterClientCPP SHARED ${LIBRARY_SOURCES}) -target_link_libraries(SocketClusterClientCPP PRIVATE jsoncpp_static) +target_link_libraries(SocketClusterClientCPP PUBLIC jsoncpp_lib) # Update include directories to find json-c headers target_include_directories(SocketClusterClientCPP PUBLIC diff --git a/include/Connection.h b/include/Connection.h index e11e1c5..3efb431 100644 --- a/include/Connection.h +++ b/include/Connection.h @@ -17,7 +17,7 @@ #include "utility/ThreadSafeList.h" // Callback and Subscription types -typedef std::function socketCallback; +typedef std::function socketCallback; typedef std::tuple subscription; namespace beast = boost::beast; diff --git a/include/ParsingEngine.h b/include/ParsingEngine.h index 91bc07b..0a50921 100644 --- a/include/ParsingEngine.h +++ b/include/ParsingEngine.h @@ -1,33 +1,18 @@ -// #ifndef PARSING_ENGINE_HPP -// #define PARSING_ENGINE_HPP +#ifndef PARSING_ENGINE_HPP +#define PARSING_ENGINE_HPP -// #include "json/json.h" +#include "json/json.h" -// namespace scccpp -// { +class ParsingEngine +{ +private: + // const std::istream json_template("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); +public: + ParsingEngine(/* args */) {}; + ~ParsingEngine() {}; -// class ParsingEngine -// { -// private: -// const std::istream json_template("{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }"); -// public: -// ParsingEngine(/* args */); -// ~ParsingEngine(); + static const std::string get_parse( std::string channel, std::string data, std::string event, int msgCounter); +}; -// int get_parse( std::string channel, std::string data, std::string event, int msgCounter); - -// }; -// ParsingEngine::ParsingEngine(/* args */) {} -// ParsingEngine::~ParsingEngine(){} - -// } // namespace scccpp - - - - - - - - -// #endif \ No newline at end of file +#endif \ No newline at end of file diff --git a/src/Connection.cpp b/src/Connection.cpp index 3cab17f..312e3fd 100644 --- a/src/Connection.cpp +++ b/src/Connection.cpp @@ -1,9 +1,5 @@ #include "Connection.h" -// // #include // For error codes -// // #include // Temporary, for basic error reporting - -// TODO: create context -// +#include "ParsingEngine.h" Connection::Connection(net::io_context &ioc) : resolver_(net::make_strand(ioc)), ws_(net::make_strand(ioc)), the_io_context(ioc) { @@ -153,7 +149,7 @@ void Connection::message_processing() net::post(the_io_context, [this, message] { ws_.async_write(net::buffer(message), beast::bind_front_handler(&Connection::on_write, shared_from_this())); - std::cout << "Wrote: " << message << std::endl; }); + /* std::cout << "Wrote: " << message << std::endl; */ }); } } @@ -183,8 +179,6 @@ void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) else { std::string s(boost::asio::buffer_cast(buffer_.data()), buffer_.size()); - - std::cout << "Response From Server: " << s << std::endl; Json::Value root; Json::Reader reader; bool parsingSuccessful = reader.parse(s.c_str(), root); @@ -194,26 +188,26 @@ void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) } else { - // std:: cout << root["data"].asString() << std::endl; - // Json::Value msgData; - - // // int exists = json_object_object_get_ex(jobj, "data", &msgData); - // if (root.isMember("data")) - // { - // std::string channel = root["channel"].asString(); - // Json::Value data = root["data"]; - - // if (!channel.empty()) - // { - // std::shared_ptr sub = subscription_list->find_first_if([channel](subscription const &t) - // { return std::get<0>(t) == channel; }); - // if (sub != nullptr) - // { - // std::get<1>(*sub)(channel, data); - // } - // } - // } - // json_object_put(jobj); + if (root.isMember("data")) + { + if (!root["data"]["channel"].empty()) { + std::string channel = root["data"]["channel"].asString(); + Json::Value data = root["data"]["data"]; + + // Json::StreamWriterBuilder builder; + // std::cout << "Response from server, Channel: " << channel << "Data: " << Json::writeString(builder, data) << std::endl; + + if (!channel.empty()) + { + std::shared_ptr sub = subscription_list->find_first_if([channel](subscription const &t) + { return std::get<0>(t) == channel; }); + if (sub != nullptr) + { + std::get<1>(*sub)(channel, data); + } + } + } + } } } buffer_.clear(); @@ -221,28 +215,14 @@ void Connection::on_read(beast::error_code ec, std::size_t bytes_transferred) ws_.async_read(buffer_, beast::bind_front_handler(&Connection::on_read, shared_from_this())); } -// void Connection::publish(std::string channel, Json::Value* data) { -// // publish(channel, json_object_to_json_string(data)); -// } - void Connection::subscribe(std::string channel, socketCallback callback) { std::shared_ptr sub = subscription_list->find_first_if([channel](subscription const &t) { return std::get<0>(t) == channel; }); if (sub == nullptr) { - - // json_object *jobj = json_object_new_object(); - // json_object *eventobject = json_object_new_string("#subscribe"); - // json_object *jobj1 = json_object_new_object(); - // json_object *channelobject = json_object_new_string(channel.c_str()); - // json_object_object_add(jobj, "event", eventobject); - // json_object_object_add(jobj1, "channel", channelobject); - // json_object_object_add(jobj, "data", jobj1); - // json_object *cnt = json_object_new_int(++msgCounter); - // json_object_object_add(jobj, "cid", cnt); - // message_queue->enqueue(json_object_to_json_string(jobj)); - // json_object_put(jobj); + const std::string output = ParsingEngine::get_parse(channel, "", "#subscribe", ++msgCounter); + message_queue->enqueue(output); subscription_list->push_front(std::make_tuple(channel, callback)); std::cout << "Subscribed to " << channel << "." << std::endl; } @@ -261,208 +241,6 @@ void Connection::unsubscribe(std::string channel) { void Connection::publish(std::string channel, std::string data) { std::cout << "Publishing: " << channel << ":" << data << std::endl; - // Json::Value output = ParsingEngine.get_json_object(channel, data, event, msgCounter) - - Json::Value userData; - Json::Reader reader; - - bool parse_succeeded = reader.parse(data, userData); - if (!parse_succeeded) - { - std::cout << reader.getFormattedErrorMessages(); - return 0; - } - - // { "event": "#publish", "data": { "channel": "test", "data": { "isCodeRunning": 1 } }, "cid": 3 } - - // { "channel" : "test", "event" : "#publish", "msgCounter" : 1} - // { "channel" : "test","data" : { "amIAlive" : true},"event" : "#publish","msgCounter" : 1 - - Json::Value obj; - - obj["event"] = "#publish"; - obj["channel"] = channel; - obj["cid"] = ++msgCounter; - obj["data"] = obj["channel"]; - - - Json::StreamWriterBuilder builder; - const std::string output = Json::writeString(builder, obj); - std::cout << "JSON OUTPUT: " << output << std::endl; - - message_queue->enqueue(output.c_str()); - - // "{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\": 1 }" - - //////////////////// - - // json_object *dataObj = json_tokener_parse((char *)data.c_str()); - // json_object *jobj = json_object_new_object(); - // json_object *eventobject = json_object_new_string("#publish"); - // json_object *jobj1 = json_object_new_object(); - // json_object *cnt = json_object_new_int(++msgCounter); - // json_object *channelobject = json_object_new_string(channel.c_str()); - // json_object_object_add(jobj1, "channel", channelobject); - // json_object_object_add(jobj1, "data", dataObj); - // json_object_object_add(jobj, "event", eventobject); - // json_object_object_add(jobj, "data", jobj1); - // json_object_object_add(jobj, "cid", cnt); - // message_queue->enqueue((char *)json_object_to_json_string(jobj)); - // json_object_put(jobj); + const std::string output = ParsingEngine::get_parse(channel, data, "#publish", ++msgCounter); + message_queue->enqueue(output); // enqueues the output to the message queue } - -// // using websocketpp::lib::placeholders::_1; -// // using websocketpp::lib::placeholders::_2; -// // using websocketpp::lib::bind; - -// // Connection::Connection(const std::string& url, int port) : m_url(url), m_port(port) -// // { -// // // Initialize client endpoint (more setup might be needed later) -// // m_client = new Client(); -// // m_client->init_asio(); -// // m_client->set_access_channels(websocketpp::log::alevel::all); // Optional: suppress logs -// // } - -// // Connection::~Connection() { -// // // Ensure clean disconnect here (if connected) -// // } - -// // bool Connection::connect() { -// // // 1. Create a connection URI -// // std::cout << "Url: " << m_url << std::endl; -// // std::string uri = m_url + ":" + std::to_string(m_port); -// // std::cout << "Uri: " << uri << std::endl; - -// // ErrorCode ec; -// // ConnectionPtr con = m_client->get_connection(uri, ec); - -// // if (ec) { -// // std::cout << "Could not create connection: " << ec.message() << std::endl; -// // return false; -// // } - -// // m_client->set_open_handler(bind(&Connection::onWebSocketOpen, this, _1)); -// // m_client->set_close_handler(bind(&Connection::onWebSocketClose, this, _1)); -// // m_client->set_message_handler(bind(&Connection::onWebSocketMessage, this, _1, _2)); - -// // // 3. Start the connection -// // m_client->connect(con); - -// // // 4. Start the ASIO io_service run loop -// // m_client->run(); - -// // return true; // Replace with proper success/failure logic -// // } - -// // void Connection::close() { -// // // Add logic to gracefully close the WebSocket connection -// // } - -// // void Connection::send(const std::string& data) { -// // // Use m_client to send data over the WebSocket connection -// // } - -// // void Connection::onWebSocketOpen(ConnectionHDL hdl) { -// // m_connectionHdl = hdl; -// // std::cout << "Connection established." << std::endl; -// // } - -// // void Connection::onWebSocketClose(ConnectionHDL hdl) { -// // std::cout << "Connection closed." << std::endl; -// // } - -// // void Connection::onWebSocketMessage(ConnectionHDL hdl, Client::message_ptr msg) { -// // std::cout << "Incoming message." << std::endl; -// // // ... Process incoming message (payload is in msg->get_payload()) -// // } - -// #include "Connection.h" - -// Connection::Connection(const std::string& uri) : m_uri(uri), m_isConnected(false) { -// // Initialize the client -// m_client.init_asio(); -// m_client.set_access_channels(websocketpp::log::alevel::all); -// m_client.clear_access_channels(websocketpp::log::alevel::frame_payload); -// // m_client.clear_access_channels(websocketpp::log::alevel::all); -// // m_client.set_access_channels(websocketpp::log::alevel::connect); -// // m_client.set_access_channels(websocketpp::log::alevel::disconnect); -// // m_client.set_access_channels(websocketpp::log::alevel::app); - -// // Bind the handlers -// m_client.set_open_handler(std::bind(&Connection::on_open, this, std::placeholders::_1)); -// m_client.set_close_handler(std::bind(&Connection::on_close, this, std::placeholders::_1)); -// m_client.set_message_handler(std::bind(&Connection::on_message, this, std::placeholders::_1, std::placeholders::_2)); -// m_client.set_fail_handler(std::bind(&Connection::on_fail, this, std::placeholders::_1)); -// } - -// Connection::~Connection() { -// close(); -// } - -// ConnectionResult Connection::connect() { -// websocketpp::lib::error_code ec; -// ConnectionPtr con = m_client.get_connection(m_uri, ec); -// if (ec) { -// std::cout << "Could not create connection because: " << ec.message() << std::endl; -// return ConnectionResult::ERROR_UNKNOWN; -// } - -// // Save the connection handle for later -// m_connectionHdl = con->get_handle(); - -// // Connect -// m_client.connect(con); - -// // Start ASIO io_service run loop in the background -// // Note: In a real application, consider running this on a separate thread -// m_client.run(); - -// return ConnectionResult::SUCCESS; // You may want to adjust this based on actual success or failure -// } - -// void Connection::close() { -// if (m_isConnected) { -// websocketpp::lib::error_code ec; -// m_client.close(m_connectionHdl, websocketpp::close::status::normal, "", ec); -// if (ec) { -// std::cout << "Error closing WebSocket: " << ec.message() << std::endl; -// } -// } -// } - -// void Connection::send(const std::string& message) { -// if (m_isConnected) { -// m_client.send(m_connectionHdl, message, websocketpp::frame::opcode::text); -// } else { -// std::cout << "Cannot send message, WebSocket not connected." << std::endl; -// } -// } - -// void Connection::on_open(ConnectionHDL hdl) { -// m_isConnected = true; -// std::cout << "WebSocket connection opened." << std::endl; - -// // Here, you could also perform the initial handshake or send a message -// // Construct the handshake message -// std::string handshakeMsg = "{\"event\":\"#handshake\",\"data\":{\"authToken\":null},\"cid\":1}"; - -// // Send the handshake message -// std::cout << "trying to send handshake" << std::endl; -// send(handshakeMsg); -// } - -// void Connection::on_close(ConnectionHDL hdl) { -// m_isConnected = false; -// std::cout << "WebSocket connection closed." << std::endl; -// } - -// void Connection::on_message(ConnectionHDL hdl, MessagePtr msg) { -// std::cout << "Received message: " << msg->get_payload() << std::endl; -// // Here, you could process the message, parse JSON, etc. -// } - -// void Connection::on_fail(ConnectionHDL hdl) { -// m_isConnected = false; -// std::cout << "WebSocket connection failed to open." << std::endl; -// // You can attempt a reconnect here or notify the user/application -// } diff --git a/src/ParsingEngine.cpp b/src/ParsingEngine.cpp index f807b59..02cf84c 100644 --- a/src/ParsingEngine.cpp +++ b/src/ParsingEngine.cpp @@ -1,38 +1,37 @@ // // Static Class (Pass Message -> Get Object) - -// #include "ParsingEngine.h" -// #include -// #include - -// namespace scccpp -// { - -// // Data here could be null for #subscribe events -// int ParsingEngine::get_parse( std::string channel, std::string data, std::string event, int msgCounter) { - -// Json::Value root; -// json_template >> root; - -// // json_object *dataObj = json_tokener_parse((char *)data.c_str()); -// // json_object *jobj = json_object_new_object(); -// // json_object *eventobject = json_object_new_string("#publish"); -// // json_object *jobj1 = json_object_new_object(); -// // json_object *cnt = json_object_new_int(++msgCounter); -// // json_object *channelobject = json_object_new_string(channel.c_str()); -// // json_object_object_add(jobj1, "channel", channelobject); -// // json_object_object_add(jobj1, "data", dataObj); -// // json_object_object_add(jobj, "event", eventobject); -// // json_object_object_add(jobj, "data", jobj1); -// // json_object_object_add(jobj, "cid", cnt); -// // message_queue->enqueue((char *)json_object_to_json_string(jobj)); -// // json_object_put(jobj); - -// return 1; -// } - - -// } // namespace scccpp - - - +#include "ParsingEngine.h" +#include +#include + +// Data here could be null for #subscribe events +const std::string ParsingEngine::get_parse(std::string channel, std::string data, std::string event, int msgCounter) +{ + + Json::Value userData; + Json::Reader reader; + + if (!data.empty()) + { + bool parse_succeeded = reader.parse(data, userData); + if (!parse_succeeded) + { + std::cout << reader.getFormattedErrorMessages(); + return ""; + } + } + + Json::Value dataObj; + dataObj["channel"] = channel; + dataObj["data"] = userData; + + Json::Value obj; + obj["event"] = event; + obj["data"] = dataObj; + obj["cid"] = ++msgCounter; + + Json::StreamWriterBuilder builder; + const std::string output = Json::writeString(builder, obj); + // std::cout << "Parsing Engine Output: " << output << std::endl; + return output; +} diff --git a/test/examples/client_classed.cpp b/test/examples/client_classed.cpp index f13caff..f3fc568 100644 --- a/test/examples/client_classed.cpp +++ b/test/examples/client_classed.cpp @@ -7,10 +7,12 @@ // These callbacks are not responsible for freeing the memory of the data. // The data is freed by the client class. -// void test(std::string event, Json::Value data) -// { -// std::cout << "In the event callback, data: " << json_object_to_json_string(data) << std::endl; -// } +void test(std::string event, Json::Value data) +{ + Json::StreamWriterBuilder builder; + const std::string output = Json::writeString(builder, data); + std::cout << "In the event callback, data: " << output << std::endl; +} // void test_publish(std::shared_ptr theWebsocket) // { @@ -28,13 +30,14 @@ int main() auto socket = client->createConnection(ENDPOINT, "8000"); std::thread socketThread = socket->launch_socket(ENDPOINT, "8000"); - Json::Value test; - test["amIAlive"] = true; + Json::Value obj; + obj["amIAlive"] = true; Json::StreamWriterBuilder builder; - const std::string output = Json::writeString(builder, test); + const std::string output = Json::writeString(builder, obj); - // socket->subscribe("test") + socket->subscribe("test", test); + usleep(10000); socket->publish("test", output); From db9e5fc8add4709023d6c332c91724f1b7a53d65 Mon Sep 17 00:00:00 2001 From: Garrett Chandler Date: Wed, 17 Apr 2024 15:07:47 -0400 Subject: [PATCH 5/9] documentation: --- API.md | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ README.md | 31 ++++++++++++++++++- 2 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 API.md diff --git a/API.md b/API.md new file mode 100644 index 0000000..13075d8 --- /dev/null +++ b/API.md @@ -0,0 +1,92 @@ +# API Manual + +## Installation + +The library requires the Boost Beast and Jsoncpp libraries to handle WebSocket communications and JSON data manipulation respectively. Install these dependencies using CMake if they are not already present on your system. + +### Required Libraries +- **Boost Beast** (version >= 1.76.0): Used for WebSocket implementation. Available at [boost.org](https://www.boost.org/). +- **Jsoncpp**: Used for JSON parsing and serialization. Known for its performance and ease of use in C++ environments. + +## Examples +Two simple examples are provided demonstrating the Library in a simple raw C and a C++ classed implementation. Compile the examples using: +```bash +- mkdir build && cd build +- cmake .. +- make +- ./example_client_simple or ./example_client_classed +``` + +## Classes +### Connection +The `Connection` class manages WebSocket connections using Boost Beast. It handles asynchronous read, write, and connection management activities necessary for real-time communication. + +#### Constructor +- `Connection(net::io_context& ioc)`: Initializes a new connection with a given I/O context. This context is used to handle all I/O operations for the WebSocket. + +#### Methods +- `std::thread launch_socket(const char *host, const char *port)`: Starts the WebSocket connection on a separate thread. +- `void stop()`: Stops the WebSocket connection and cleans up resources. +- `void subscribe(std::string channel, socketCallback callback)`: Subscribes to a specific channel with a callback to handle incoming messages. +- `void unsubscribe(std::string channel)`: Unsubscribes from a specific channel. +- `void publish(std::string channel, std::string data)`: Publishes data to a specific channel. +- `void message_processing()`: Handles the internal message processing in its thread. + +#### Callbacks +- `socketCallback`: A function type that handles incoming messages. Takes an event as `std::string` and data as `Json::Value`. + +#### Members +- `websocket::stream ws_`: WebSocket stream for the connection. +- `beast::flat_buffer buffer_`: Buffer used for reading WebSocket messages. + +### SocketClusterClient + +The `SocketClusterClient` class manages multiple WebSocket connections and provides methods to create and retrieve these connections. + +#### Constructor +- `SocketClusterClient()`: Initializes a new client capable of handling WebSocket connections. + +#### Methods +- `std::shared_ptr createConnection(const char *url, const char *port)`: Creates and returns a new connection to the specified URL and port. +- `std::list>& getConnections()`: Returns a list of all active connections. + +#### Members +- `std::list> m_connections`: List storing all managed connections. + + +## Errors & Exceptions +- `1000 - Normal Closure` : Connection closed successfully. +- `1001 - Going Away` : Server or client is shutting down. +- `1002 - Protocol Error` : Protocol violation detected. +- `1003 - Unsupported Data` : The endpoint received data of a type it cannot accept. +- `1006 - Abnormal Closure` : Connection closed abnormally without a status code. + + +## Example Usage of SocketClusterClient Library + +This example demonstrates how to use the `SocketClusterClient` and `Connection` classes to connect to a WebSocket server, subscribe to a channel, and handle incoming messages. + +```cpp +#include +#include "SocketClusterClient.h" + +// Define a callback function to handle messages received on the WebSocket +void handleMessage(std::string event, Json::Value data) { + std::cout << "Event: " << event << "\nMessage received: " << data.toStyledString() << std::endl; +} + +int main() { + // Create a client instance + SocketClusterClient client; + + // Create a new WebSocket connection to the desired host and port + auto connection = client.createConnection("ws://example.com", "80"); + + // Subscribe to a channel with the defined callback + connection->subscribe("exampleChannel", handleMessage); + + // Publish to a channel + connection->publish("exampleChannel", "Hello World!"); + + return 0; +} diff --git a/README.md b/README.md index 5655b37..ee12af6 100644 --- a/README.md +++ b/README.md @@ -1 +1,30 @@ -# socketcluster-client-cpp \ No newline at end of file +# Socket Cluster C++ Client + +This repository hosts a [SocketCluster](https://socketcluster.io/) C++ Client designed to facilitate communication between C++ applications and SocketCluster servers. This client supports real-time, scalable, bi-directional communication, making it ideal for applications requiring high performance and efficient data exchange. +S +This client is developed using the [Boost Beast](https://github.com/boostorg/beast) and [jsoncpp](https://github.com/open-source-parsers/jsoncpp) libraries in C++. + +## Features + +- **Real-Time Communication**: Enables real-time connectivity with SocketCluster servers. +- **Bi-Directional Communication**: Supports both sending and receiving messages efficiently. +- **Scalability**: Designed to handle high-load scenarios, making it suitable for large-scale deployments. +- **WebSocket Support**: Utilizes WebSockets for low-latency communication. + + +## Getting Starting +A detailed list of the libraries API can be found [here](API.md). + +## Security / SSL +#### Tokens +To address the threat of unauthenticated connections, we will utilize SocketCluster's built-in JWT-based authentication mechanism. Each JWT is uniquely signed using a server-specific authKey, ensuring secure and verified connections right from the initial handshake. Follow the guide [here](https://socketcluster.io/docs/authentication/) to enable JWTs with SocketCluster. +#### WSS +To allow development and production runs a flag can be set to enable SSL assuming the SocketCluster server has been configured to accept SSL connections. To best achieve this a flag can be set in the client to put the data transfer into the secure mode. +```cpp +#define SOCKETCLUSTER_SLL 1 +``` + +## Performance +The maximum output and input rates have not been tested yet. This document will be updated with statistics after tests have been run. **Both example programs have been profiled and show no memory leaks.** + +The SocketCluster server has been thoroughly tested in an [academic paper](https://arxiv.org/pdf/1409.3367.pdf). This client library aims to match the results listed. From 3ad7164997dfcc0f5176886227bab605e302f387 Mon Sep 17 00:00:00 2001 From: Garrett Chandler Date: Wed, 17 Apr 2024 15:15:45 -0400 Subject: [PATCH 6/9] documentation --- CMakeLists.txt | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index f552f08..4cf7ed2 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,14 +13,14 @@ FetchContent_Declare(JsonCpp FetchContent_MakeAvailable(JsonCpp) # === === -# set(BOOST_ENABLE_CMAKE ON) -# include(FetchContent) -# FetchContent_Declare( -# Boost -# GIT_REPOSITORY https://github.com/boostorg/boost.git -# GIT_TAG boost-1.80.0 -# ) -# FetchContent_MakeAvailable(Boost) +set(BOOST_ENABLE_CMAKE ON) +include(FetchContent) +FetchContent_Declare( +Boost +GIT_REPOSITORY https://github.com/boostorg/boost.git +GIT_TAG boost-1.74.0 +) +FetchContent_MakeAvailable(Boost) find_package(Boost 1.74.0 REQUIRED) # header only libraries must not be added here From 2b339e79ac26bd6928eb71d60cd9ad06e3d335da Mon Sep 17 00:00:00 2001 From: Garrett Chandler Date: Wed, 17 Apr 2024 15:33:35 -0400 Subject: [PATCH 7/9] "cmake" --- CMakeLists.txt | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4cf7ed2..4dca352 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,13 +16,16 @@ FetchContent_MakeAvailable(JsonCpp) set(BOOST_ENABLE_CMAKE ON) include(FetchContent) FetchContent_Declare( -Boost -GIT_REPOSITORY https://github.com/boostorg/boost.git -GIT_TAG boost-1.74.0 + Boost + GIT_REPOSITORY https://github.com/boostorg/boost.git + GIT_TAG boost-1.74.0 ) FetchContent_MakeAvailable(Boost) find_package(Boost 1.74.0 REQUIRED) # header only libraries must not be added here +IF(Boost_FOUND) + include_directories(${Boost_INCLUDE_DIRS}) +ENDIF(Boost_FOUND) # Find source files file(GLOB_RECURSE LIBRARY_SOURCES src/*.cpp) From 56755a9541b29348e10568a8198c5d597d49e708 Mon Sep 17 00:00:00 2001 From: Garrett Chandler Date: Wed, 17 Apr 2024 15:48:27 -0400 Subject: [PATCH 8/9] boost cmake fix: --- CMakeLists.txt | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4dca352..4983f68 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,19 +13,18 @@ FetchContent_Declare(JsonCpp FetchContent_MakeAvailable(JsonCpp) # === === +# set(BOOST_INCLUDE_LIBRARIES thread filesystem system program_options) set(BOOST_ENABLE_CMAKE ON) include(FetchContent) FetchContent_Declare( Boost GIT_REPOSITORY https://github.com/boostorg/boost.git GIT_TAG boost-1.74.0 + GIT_SHALLOW TRUE ) FetchContent_MakeAvailable(Boost) -find_package(Boost 1.74.0 REQUIRED) # header only libraries must not be added here -IF(Boost_FOUND) - include_directories(${Boost_INCLUDE_DIRS}) -ENDIF(Boost_FOUND) +find_package(Boost 1.74.0 REQUIRED) # Find source files file(GLOB_RECURSE LIBRARY_SOURCES src/*.cpp) From cfc6ba42a7302f77a83a6e4ce81dbfc7350c9864 Mon Sep 17 00:00:00 2001 From: Garrett Chandler Date: Wed, 17 Apr 2024 15:54:11 -0400 Subject: [PATCH 9/9] boost cmake fix: --- .github/workflows/cpp-ci.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/cpp-ci.yml b/.github/workflows/cpp-ci.yml index a677a1e..23e2b85 100644 --- a/.github/workflows/cpp-ci.yml +++ b/.github/workflows/cpp-ci.yml @@ -13,6 +13,8 @@ jobs: steps: - uses: actions/checkout@v3 + - name: Install Boost + run: sudo apt-get install libboost-all-dev - name: Install dependencies run: sudo apt-get update && sudo apt-get install -y build-essential cmake - name: Configure CMake