diff --git a/CMakeLists.txt b/CMakeLists.txt index 5bd8af9..a52eeae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -96,6 +96,7 @@ list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake/ecaludp-module) # Add samples, if enabled if (ECALUDP_BUILD_SAMPLES) add_subdirectory(samples/ecaludp_sample) + add_subdirectory(samples/ecaludp_perftool) if (ECALUDP_ENABLE_NPCAP) add_subdirectory(samples/ecaludp_sample_npcap) endif() diff --git a/ecaludp/include_with_udpcap/ecaludp/socket_npcap.h b/ecaludp/include_with_udpcap/ecaludp/socket_npcap.h index fe2fdfe..e1c5870 100644 --- a/ecaludp/include_with_udpcap/ecaludp/socket_npcap.h +++ b/ecaludp/include_with_udpcap/ecaludp/socket_npcap.h @@ -90,12 +90,11 @@ namespace ecaludp ECALUDP_EXPORT std::shared_ptr receive_from(asio::ip::udp::endpoint& sender_endpoint, ecaludp::Error& error); ECALUDP_EXPORT void async_receive_from(asio::ip::udp::endpoint& sender_endpoint - , const std::function&, ecaludp::Error)>& completion_handler); - + , const std::function&, const ecaludp::Error&)>& completion_handler); private: void receive_next_datagram_from(asio::ip::udp::endpoint& sender_endpoint - , const std::function&, ecaludp::Error)>& completion_handler); + , const std::function&, const ecaludp::Error&)>& completion_handler); std::shared_ptr handle_datagram(const std::shared_ptr& buffer , const std::shared_ptr& sender_endpoint diff --git a/ecaludp/src/async_udpcap_socket.cpp b/ecaludp/src/async_udpcap_socket.cpp index f7f6ec6..3ce1fbe 100644 --- a/ecaludp/src/async_udpcap_socket.cpp +++ b/ecaludp/src/async_udpcap_socket.cpp @@ -72,7 +72,7 @@ namespace ecaludp , size_t max_buffer_size , Udpcap::HostAddress& sender_address , uint16_t& sender_port - , const std::function& read_handler) + , const std::function& read_handler) { const std::unique_lock lock(wait_thread_trigger_mutex_); async_receive_from_parameters_queue_.push_back({ buffer, max_buffer_size, &sender_address, &sender_port, read_handler }); diff --git a/ecaludp/src/async_udpcap_socket.h b/ecaludp/src/async_udpcap_socket.h index 8ddb509..c2cdfd8 100644 --- a/ecaludp/src/async_udpcap_socket.h +++ b/ecaludp/src/async_udpcap_socket.h @@ -68,13 +68,13 @@ namespace ecaludp , size_t max_buffer_size , Udpcap::HostAddress& sender_address , uint16_t& sender_port - , ecaludp::Error& error); + , ecaludp::Error& error); void asyncReceiveFrom( char* buffer , size_t max_buffer_size , Udpcap::HostAddress& sender_address , uint16_t& sender_port - , const std::function& read_handler); + , const std::function& read_handler); private: static void toEcaludpError(const Udpcap::Error& udpcap_error, ecaludp::Error& ecaludp_error); @@ -95,7 +95,7 @@ namespace ecaludp size_t max_buffer_size_; Udpcap::HostAddress* sender_address_; uint16_t* sender_port_; - std::function read_handler_; + std::function read_handler_; }; Udpcap::UdpcapSocket udpcap_socket_; diff --git a/ecaludp/src/socket_npcap.cpp b/ecaludp/src/socket_npcap.cpp index b8b693b..cacdab2 100644 --- a/ecaludp/src/socket_npcap.cpp +++ b/ecaludp/src/socket_npcap.cpp @@ -141,13 +141,13 @@ namespace ecaludp void SocketNpcap::async_receive_from(asio::ip::udp::endpoint& sender_endpoint - , const std::function&, ecaludp::Error)>& completion_handler) + , const std::function&, const ecaludp::Error&)>& completion_handler) { receive_next_datagram_from(sender_endpoint, completion_handler); } void SocketNpcap::receive_next_datagram_from(asio::ip::udp::endpoint& sender_endpoint - , const std::function&, ecaludp::Error)>& completion_handler) + , const std::function&, const ecaludp::Error&)>& completion_handler) { auto datagram_buffer = datagram_buffer_pool_->allocate(); @@ -164,7 +164,7 @@ namespace ecaludp , buffer->size() , *sender_address , *sender_port - , [this, buffer, completion_handler, sender_address, sender_port, &sender_endpoint](ecaludp::Error& error, size_t bytes_received) + , [this, buffer, completion_handler, sender_address, sender_port, &sender_endpoint](const ecaludp::Error& error, size_t bytes_received) { if (error) { diff --git a/samples/ecaludp_perftool/.clang-tidy b/samples/ecaludp_perftool/.clang-tidy new file mode 100644 index 0000000..2d7e379 --- /dev/null +++ b/samples/ecaludp_perftool/.clang-tidy @@ -0,0 +1,7 @@ +--- +# This disables the warning of non-private members. We use those for inheritance here (with protected visibility). + +Checks: "-cppcoreguidelines-non-private-member-variables-in-classes, +" + +InheritParentConfig: true diff --git a/samples/ecaludp_perftool/CMakeLists.txt b/samples/ecaludp_perftool/CMakeLists.txt new file mode 100644 index 0000000..7225c02 --- /dev/null +++ b/samples/ecaludp_perftool/CMakeLists.txt @@ -0,0 +1,65 @@ +################################################################################ +# Copyright (c) 2024 Continental Corporation +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# SPDX-License-Identifier: Apache-2.0 +################################################################################ + +cmake_minimum_required(VERSION 3.13) + +project(ecaludp_perftool) + +find_package(Threads REQUIRED) +find_package(ecaludp REQUIRED) + +set(sources + src/main.cpp + src/receiver.cpp + src/receiver.h + src/receiver_async.cpp + src/receiver_async.h + src/receiver_parameters.h + src/receiver_sync.cpp + src/receiver_sync.h + src/sender.cpp + src/sender.h + src/sender_async.cpp + src/sender_async.h + src/sender_parameters.h + src/sender_sync.cpp + src/sender_sync.h + src/socket_builder_asio.cpp + src/socket_builder_asio.h +) +if (${ECALUDP_ENABLE_NPCAP}) + list (APPEND sources + src/receiver_npcap_async.cpp + src/receiver_npcap_async.h + src/receiver_npcap_sync.cpp + src/receiver_npcap_sync.h + src/socket_builder_npcap.cpp + src/socket_builder_npcap.h + ) +endif() + +add_executable(${PROJECT_NAME} ${sources}) + +target_link_libraries(${PROJECT_NAME} + PRIVATE + ecaludp::ecaludp + Threads::Threads) + +target_compile_features(${PROJECT_NAME} PRIVATE cxx_std_14) + +source_group(TREE "${CMAKE_CURRENT_SOURCE_DIR}" FILES + ${sources} +) diff --git a/samples/ecaludp_perftool/Readme.md b/samples/ecaludp_perftool/Readme.md new file mode 100644 index 0000000..bb84bb3 --- /dev/null +++ b/samples/ecaludp_perftool/Readme.md @@ -0,0 +1,25 @@ +# ecaludp-perftool + +ecaludp_perftool is a simple tool for sending and receiving data using the eCAL protocol. + +``` +Usage: + ecaludp_perftool [PARAMETERS] + +With IMPLEMENTATION one of: + send Asio-based sender using send_to in a while-loop + sendasync Asio-based sender using async_send_to + receive Asio-based receiver using receive_from in a while-loop + receiveasync Asio-based receiver using async_receive_from + receivenpcap Npcap-based receiver using receive_from in a while-loop + receivenpcapasync Npcap-based receiver using async_receive_from + +Options: + -h, --help Show this help message and exit + + -i, --ip IP address to send to / receive from. Default to 127.0.0.1 + -p, --port Port to send to / receive from. Default to 14000 + -s, --size Message size to send. Default to 0 (-> empty messages) + -m, --max-udp-datagram-size Maximum UDP datagram size + -b, --buffer-size Buffer size for sending & receiving messages +``` \ No newline at end of file diff --git a/samples/ecaludp_perftool/src/main.cpp b/samples/ecaludp_perftool/src/main.cpp new file mode 100644 index 0000000..4fdd3fd --- /dev/null +++ b/samples/ecaludp_perftool/src/main.cpp @@ -0,0 +1,335 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include // IWYU pragma: keep + +#include "receiver.h" +#include "receiver_async.h" +#include "receiver_parameters.h" +#include "receiver_sync.h" +#include "sender.h" +#include "sender_async.h" +#include "sender_parameters.h" +#include "sender_sync.h" + +#if ECALUDP_UDPCAP_ENABLED + #include "receiver_npcap_sync.h" + #include "receiver_npcap_async.h" +#endif // ECALUDP_UDPCAP_ENABLED + +enum class Implementation +{ + NONE, + SEND, + SENDASYNC, + RECEIVE, + RECEIVEASYNC, + RECEIVENPCAP, + RECEIVENPCAPASYNC +}; + +void printUsage(const std::string& arg0) +{ + std::cout << "Usage:\n"; + std::cout << " " << arg0 << " [PARAMETERS]\n"; + std::cout << '\n'; + std::cout << "With IMPLEMENTATION one of:\n"; + std::cout << " send Asio-based sender using send_to in a while-loop\n"; + std::cout << " sendasync Asio-based sender using async_send_to\n"; + std::cout << " receive Asio-based receiver using receive_from in a while-loop\n"; + std::cout << " receiveasync Asio-based receiver using async_receive_from\n"; + std::cout << " receivenpcap Npcap-based receiver using receive_from in a while-loop\n"; + std::cout << " receivenpcapasync Npcap-based receiver using async_receive_from\n"; + std::cout << '\n'; + std::cout << "Options:\n"; + std::cout << " -h, --help Show this help message and exit\n"; + std::cout << '\n'; + std::cout << " -i, --ip IP address to send to / receive from. Default to 127.0.0.1\n"; + std::cout << " -p, --port Port to send to / receive from. Default to 14000\n"; + std::cout << " -s, --size Message size to send. Default to 0 (-> empty messages)\n"; + std::cout << " -m, --max-udp-datagram-size Maximum UDP datagram size\n"; + std::cout << " -b, --buffer-size Buffer size for sending & receiving messages\n"; + std::cout << '\n'; +} + +int main(int argc, char* argv[]) +{ + Implementation implementation = Implementation::NONE; + + ReceiverParameters receiver_parameters; + SenderParameters sender_parameters; + + // convert argc, argv to vector of strings + std::vector args; + args.reserve(static_cast(argc)); + for (int i = 0; i < argc; ++i) + { + args.emplace_back(argv[i]); + } + + // Check for -h / --help + if (args.size() < 2 + || std::find(args.begin(), args.end(), "-h") != args.end() + || std::find(args.begin(), args.end(), "--help") != args.end()) + { + printUsage(args[0]); + return 0; + } + + // Check for implementation + { + if (args[1] == "send") + { + implementation = Implementation::SEND; + } + else if (args[1] == "sendasync") + { + implementation = Implementation::SENDASYNC; + } + else if (args[1] == "receive") + { + implementation = Implementation::RECEIVE; + } + else if (args[1] == "receiveasync") + { + implementation = Implementation::RECEIVEASYNC; + } + else if (args[1] == "receivenpcap") + { + implementation = Implementation::RECEIVENPCAP; + } + else if (args[1] == "receivenpcapasync") + { + implementation = Implementation::RECEIVENPCAPASYNC; + } + else + { + printUsage(args[0]); + return 1; + } + } + + // Check for --ip / -i + { + auto it = std::find(args.begin(), args.end(), "--ip"); + if (it == args.end()) + { + it = std::find(args.begin(), args.end(), "-i"); + } + + if (it != args.end()) + { + if (it + 1 == args.end()) + { + std::cerr << "Error: --ip requires an argument\n"; + return 1; + } + sender_parameters.ip = *(it + 1); + receiver_parameters.ip = *(it + 1); + } + } + + // Check for --port / -p + { + auto it = std::find(args.begin(), args.end(), "--port"); + if (it == args.end()) + { + it = std::find(args.begin(), args.end(), "-p"); + } + if (it != args.end()) + { + if (it + 1 == args.end()) + { + std::cerr << "Error: --port requires an argument\n"; + return 1; + } + + unsigned long port {0}; + try + { + port = std::stoul(*(it + 1)); + } + catch (const std::exception& e) + { + std::cerr << "Error: --port requires a numeric argument: " << e.what() << '\n'; + return 1; + } + + // Check numeric limits and print error if out of range + if (port > std::numeric_limits::max()) + { + std::cerr << "Error: --port out of range\n"; + return 1; + } + + sender_parameters.port = static_cast(port); + receiver_parameters.port = static_cast(port); + } + } + + // Check for -s / --size + { + auto it = std::find(args.begin(), args.end(), "-s"); + if (it == args.end()) + { + it = std::find(args.begin(), args.end(), "--size"); + } + + if (it != args.end()) + { + if (it + 1 == args.end()) + { + std::cerr << "Error: -s / --size requires an argument\n"; + return 1; + } + + try + { + sender_parameters.message_size = std::stoul(*(it + 1)); + } + catch (const std::exception& e) + { + std::cerr << "Error: -s / --size requires a numeric argument: " << e.what() << '\n'; + return 1; + } + } + } + + // Check for -m / --max-udp-datagram-size + { + auto it = std::find(args.begin(), args.end(), "-m"); + if (it == args.end()) + { + it = std::find(args.begin(), args.end(), "--max-udp-datagram-size"); + } + + if (it != args.end()) + { + if (it + 1 == args.end()) + { + std::cerr << "Error: -m / --max-udp-datagram-size requires an argument\n"; + return 1; + } + + try + { + sender_parameters.max_udp_datagram_size = std::stoul(*(it + 1)); + } + catch (const std::exception& e) + { + std::cerr << "Error: -m / --max-udp-datagram-size requires a numeric argument: " << e.what() << '\n'; + return 1; + } + } + } + + // Check for -b / --buffer-size + { + auto it = std::find(args.begin(), args.end(), "--buffer-size"); + if (it == args.end()) + { + it = std::find(args.begin(), args.end(), "-b"); + } + if (it != args.end()) + { + if (it + 1 == args.end()) + { + std::cerr << "Error: --buffer-size requires an argument\n"; + return 1; + } + + unsigned long buffer_size {0}; + try + { + buffer_size = std::stoul(*(it + 1)); + } + catch (const std::exception& e) + { + std::cerr << "Error: --buffer-size requires a numeric argument: " << e.what() << '\n'; + return 1; + } + sender_parameters.buffer_size = buffer_size; + receiver_parameters.buffer_size = buffer_size; + } + } + + + // Run the selected implementation + std::shared_ptr sender; + std::shared_ptr receiver; + + switch (implementation) + { + case Implementation::NONE: + break; + case Implementation::SEND: + sender = std::make_shared(sender_parameters); + break; + case Implementation::SENDASYNC: + sender = std::make_shared(sender_parameters); + break; + case Implementation::RECEIVE: + receiver = std::make_shared(receiver_parameters); + break; + case Implementation::RECEIVEASYNC: + receiver = std::make_shared(receiver_parameters); + break; + case Implementation::RECEIVENPCAP: +#if ECALUDP_UDPCAP_ENABLED + receiver = std::make_shared(receiver_parameters); + break; +#else + std::cerr << "Error: Npcap-based receiver not enabled\n"; + return 1; +#endif // ECALUDP_UDPCAP_ENABLED + case Implementation::RECEIVENPCAPASYNC: +#if ECALUDP_UDPCAP_ENABLED + receiver = std::make_shared(receiver_parameters); + break; +#else + std::cerr << "Error: Npcap-based receiver not enabled\n"; + return 1; +#endif // ECALUDP_UDPCAP_ENABLED + default: + break; + } + + if (sender) + { + sender->start(); + } + else if (receiver) + { + receiver->start(); + } + + while(true) + std::this_thread::sleep_for(std::chrono::seconds(1)); + + return 0; +} diff --git a/samples/ecaludp_perftool/src/receiver.cpp b/samples/ecaludp_perftool/src/receiver.cpp new file mode 100644 index 0000000..23fcb49 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver.cpp @@ -0,0 +1,93 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "receiver.h" +#include "receiver_parameters.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +Receiver::Receiver(const ReceiverParameters& parameters) + : parameters_(parameters) + , statistics_thread_{std::make_unique([this]() { this->print_statistics(); })} +{ + // Print information for debug purposes + std::cout << parameters_.to_string(); +} + +Receiver::~Receiver() +{ + { + // Stop statistics thread + const std::lock_guard lock(statistics_mutex_); + is_stopped_ = true; + cv_.notify_all(); + } + + if (statistics_thread_->joinable()) + statistics_thread_->join(); +} + +void Receiver::print_statistics() +{ + std::chrono::steady_clock::time_point last_statistics_run; + + while(true) + { + long long bytes_payload {0}; + long long messages_received {0}; + + { + std::unique_lock lock(statistics_mutex_); + cv_.wait_for(lock, std::chrono::seconds(1), [this]() -> bool { return is_stopped_; }); + + if (is_stopped_) + return; + + std::swap(bytes_payload_, bytes_payload); + std::swap(messages_received_, messages_received); + } + + auto now = std::chrono::steady_clock::now(); + + // Calculate message per seconds -> frequency) + double frequency = 0.0; + auto duration = std::chrono::duration_cast>(now - last_statistics_run).count(); + if (duration > 0) + { + frequency = static_cast(messages_received) / duration; + } + + { + std::stringstream ss; + ss << "cnt: " << messages_received; + ss << " | "; + ss << "rcv pyld: " << bytes_payload; + ss << " | "; + ss << "freq: " << std::fixed << std::setprecision(1) << frequency; + + std::cout << ss.str() << '\n'; + } + + last_statistics_run = now; + } +} diff --git a/samples/ecaludp_perftool/src/receiver.h b/samples/ecaludp_perftool/src/receiver.h new file mode 100644 index 0000000..ac7bde9 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver.h @@ -0,0 +1,59 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include +#include +#include +#include + +#include "receiver_parameters.h" + +class Receiver +{ +public: + Receiver(const ReceiverParameters& parameters); + virtual ~Receiver(); + + // disable copy and move + Receiver(const Receiver&) = delete; + Receiver(Receiver&&) = delete; + Receiver& operator=(const Receiver&) = delete; + Receiver& operator=(Receiver&&) = delete; + + virtual void start() = 0; + +private: + void print_statistics(); + +/////////////////////////////////////////////////////////// +// Member variables +/////////////////////////////////////////////////////////// + +protected: + ReceiverParameters parameters_; + + bool is_stopped_ {false}; + mutable std::mutex statistics_mutex_; + std::condition_variable cv_; + + long long bytes_payload_ {0}; + long long messages_received_{0}; + +private: + std::unique_ptr statistics_thread_; +}; diff --git a/samples/ecaludp_perftool/src/receiver_async.cpp b/samples/ecaludp_perftool/src/receiver_async.cpp new file mode 100644 index 0000000..de467b0 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_async.cpp @@ -0,0 +1,101 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "receiver_async.h" + +#include +#include +#include +#include +#include +#include + +#include + +#include "ecaludp/socket.h" +#include "receiver.h" +#include "receiver_parameters.h" +#include "socket_builder_asio.h" + +ReceiverAsync::ReceiverAsync(const ReceiverParameters& parameters) + : Receiver(parameters) +{ + std::cout << "Receiver implementation: Asynchronous asio\n"; +} + +ReceiverAsync::~ReceiverAsync() +{ + if (socket_) + { + asio::error_code ec; + socket_->cancel(ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + + if (ec) + std::cerr << "Error cancelling socket: " << ec.message() << '\n'; + } + + if(work_) + work_.reset(); + + if (io_context_thread_->joinable()) + io_context_thread_->join(); +} + +void ReceiverAsync::start() +{ + try + { + socket_ = SocketBuilderAsio::CreateReceiveSocket(io_context_, parameters_); + } + catch (const std::exception& e) + { + std::cerr << "Error creating socket: " << e.what() << '\n'; + std::exit(1); + } + + auto endpoint = asio::ip::udp::endpoint(asio::ip::make_address(parameters_.ip), parameters_.port); + + receive_message(); + + work_ = std::make_unique(io_context_); + + io_context_thread_ = std::make_unique([this](){ io_context_.run(); }); +} + +void ReceiverAsync::receive_message() +{ + auto endpoint = std::make_shared(); + + socket_->async_receive_from(*endpoint, + [this, endpoint](const std::shared_ptr& message, const asio::error_code& ec) + { + if (ec) + { + std::cerr << "Error sending: " << ec.message() << '\n'; + socket_->close(); + return; + } + + { + const std::lock_guard lock(statistics_mutex_); + + bytes_payload_ += message->size(); + messages_received_ ++; + } + + receive_message(); + }); +} diff --git a/samples/ecaludp_perftool/src/receiver_async.h b/samples/ecaludp_perftool/src/receiver_async.h new file mode 100644 index 0000000..d749889 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_async.h @@ -0,0 +1,51 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include "receiver.h" +#include "receiver_parameters.h" + +#include +#include + +#include + +#include + +class ReceiverAsync : public Receiver +{ + public: + ReceiverAsync(const ReceiverParameters& parameters); + ~ReceiverAsync() override; + + // disable copy and move + ReceiverAsync(const ReceiverAsync&) = delete; + ReceiverAsync(ReceiverAsync&&) = delete; + ReceiverAsync& operator=(const ReceiverAsync&) = delete; + ReceiverAsync& operator=(ReceiverAsync&&) = delete; + + void start() override; + + private: + void receive_message(); + + private: + std::unique_ptr io_context_thread_; + asio::io_context io_context_; + std::shared_ptr socket_; + std::unique_ptr work_; +}; diff --git a/samples/ecaludp_perftool/src/receiver_npcap_async.cpp b/samples/ecaludp_perftool/src/receiver_npcap_async.cpp new file mode 100644 index 0000000..322b503 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_npcap_async.cpp @@ -0,0 +1,84 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "receiver_npcap_async.h" + +#include +#include +#include +#include +#include + +#include + +#include "ecaludp/socket.h" +#include "receiver.h" +#include "receiver_parameters.h" +#include "socket_builder_npcap.h" + +ReceiverNpcapAsync::ReceiverNpcapAsync(const ReceiverParameters& parameters) + : Receiver(parameters) +{ + std::cout << "Receiver implementation: Asynchronous NPCAP\n"; +} + +ReceiverNpcapAsync::~ReceiverNpcapAsync() +{ + if (socket_) + { + socket_->close(); + } +} + +void ReceiverNpcapAsync::start() +{ + try + { + socket_ = SocketBuilderNpcap::CreateReceiveSocket(parameters_); + } + catch (const std::exception& e) + { + std::cerr << "Error creating socket: " << e.what()<< '\n'; + std::exit(1); + } + + receive_message(); +} + +void ReceiverNpcapAsync::receive_message() +{ + auto endpoint = std::make_shared(); + + socket_->async_receive_from(*endpoint, + [this, endpoint](const std::shared_ptr& message, const ecaludp::Error& error) + { + if (error) + { + std::cerr << "Error sending: " << error.ToString()<< '\n'; + socket_->close(); + return; + } + + { + const std::lock_guard lock(statistics_mutex_); + + bytes_payload_ += message->size(); + messages_received_ ++; + } + + receive_message(); + }); +} diff --git a/samples/ecaludp_perftool/src/receiver_npcap_async.h b/samples/ecaludp_perftool/src/receiver_npcap_async.h new file mode 100644 index 0000000..d85eebb --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_npcap_async.h @@ -0,0 +1,47 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include "receiver.h" + +#include +#include + +#include + +#include + +class ReceiverNpcapAsync : public Receiver +{ + public: + ReceiverNpcapAsync(const ReceiverParameters& parameters); + ~ReceiverNpcapAsync() override; + + // disable copy and move + ReceiverNpcapAsync(const ReceiverNpcapAsync&) = delete; + ReceiverNpcapAsync(ReceiverNpcapAsync&&) = delete; + ReceiverNpcapAsync& operator=(const ReceiverNpcapAsync&) = delete; + ReceiverNpcapAsync& operator=(ReceiverNpcapAsync&&) = delete; + + void start() override; + + private: + void receive_message(); + + private: + std::shared_ptr socket_; +}; diff --git a/samples/ecaludp_perftool/src/receiver_npcap_sync.cpp b/samples/ecaludp_perftool/src/receiver_npcap_sync.cpp new file mode 100644 index 0000000..0f02b09 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_npcap_sync.cpp @@ -0,0 +1,90 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "receiver_npcap_sync.h" + +#include +#include +#include +#include +#include +#include + +#include + +#include "ecaludp/socket_npcap.h" +#include "receiver.h" +#include "receiver_parameters.h" +#include "socket_builder_npcap.h" + +ReceiverNpcapSync::ReceiverNpcapSync(const ReceiverParameters& parameters) + : Receiver(parameters) +{ + std::cout << "Receiver implementation: Synchronous NPCAP\n"; +} + +ReceiverNpcapSync::~ReceiverNpcapSync() +{ + if (receive_thread_ && receive_thread_->joinable()) + { + receive_thread_->join(); + } +} + +void ReceiverNpcapSync::start() +{ + receive_thread_ = std::make_unique(&ReceiverNpcapSync::receive_loop, this); +} + +void ReceiverNpcapSync::receive_loop() +{ + std::shared_ptr receive_socket; + try + { + receive_socket = SocketBuilderNpcap::CreateReceiveSocket(parameters_); + } + catch (const std::exception& e) + { + std::cerr << "Error creating socket: " << e.what()<< '\n'; + std::exit(1); + } + + asio::ip::udp::endpoint destination(asio::ip::address::from_string(parameters_.ip), parameters_.port); + + while (true) + { + { + ecaludp::Error error = ecaludp::Error::GENERIC_ERROR; + auto payload_buffer = receive_socket->receive_from(destination, error); + + if (error) + { + std::cerr << "Error receiving message: " << error.ToString()<< '\n'; + break; + } + + { + const std::lock_guard lock(statistics_mutex_); + + if (is_stopped_) + break; + + bytes_payload_ += payload_buffer->size(); + messages_received_ ++; + } + } + } +} diff --git a/samples/ecaludp_perftool/src/receiver_npcap_sync.h b/samples/ecaludp_perftool/src/receiver_npcap_sync.h new file mode 100644 index 0000000..3d970c1 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_npcap_sync.h @@ -0,0 +1,43 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include "receiver.h" + +#include +#include + +class ReceiverNpcapSync : public Receiver +{ + public: + ReceiverNpcapSync(const ReceiverParameters& parameters); + ~ReceiverNpcapSync() override; + + // disable copy and move + ReceiverNpcapSync(const ReceiverNpcapSync&) = delete; + ReceiverNpcapSync(ReceiverNpcapSync&&) = delete; + ReceiverNpcapSync& operator=(const ReceiverNpcapSync&) = delete; + ReceiverNpcapSync& operator=(ReceiverNpcapSync&&) = delete; + + void start() override; + + private: + void receive_loop(); + + private: + std::unique_ptr receive_thread_; +}; diff --git a/samples/ecaludp_perftool/src/receiver_parameters.h b/samples/ecaludp_perftool/src/receiver_parameters.h new file mode 100644 index 0000000..de455b1 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_parameters.h @@ -0,0 +1,40 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include +#include +#include + +struct ReceiverParameters +{ + std::string ip {"127.0.0.1"}; + uint16_t port {14000}; + int buffer_size {-1}; + + std::string to_string() const + { + std::stringstream ss; + + ss << "Receiver Parameters: \n"; + ss << " IP: " << ip << '\n'; + ss << " Port: " << port << '\n'; + ss << " Buffer Size: " << (buffer_size > 0 ? std::to_string(buffer_size) : "default") << '\n'; + + return ss.str(); + } +}; diff --git a/samples/ecaludp_perftool/src/receiver_sync.cpp b/samples/ecaludp_perftool/src/receiver_sync.cpp new file mode 100644 index 0000000..a5e8828 --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_sync.cpp @@ -0,0 +1,110 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "receiver_sync.h" + +#include +#include +#include +#include +#include +#include + +#include + +#include "ecaludp/socket.h" +#include "receiver.h" +#include "receiver_parameters.h" +#include "socket_builder_asio.h" + +ReceiverSync::ReceiverSync(const ReceiverParameters& parameters) + : Receiver(parameters) +{ + std::cout << "Receiver implementation: Synchronous asio\n"; +} + +ReceiverSync::~ReceiverSync() +{ + if (receive_thread_ && receive_thread_->joinable()) + { + receive_thread_->join(); + } +} + +void ReceiverSync::start() +{ + receive_thread_ = std::make_unique(&ReceiverSync::receive_loop, this); +} + +void ReceiverSync::receive_loop() +{ + asio::io_context io_context; + + std::shared_ptr receive_socket; + try + { + receive_socket = SocketBuilderAsio::CreateReceiveSocket(io_context, parameters_); + } + catch (const std::exception& e) + { + std::cerr << "Error creating socket: " << e.what() << '\n'; + std::exit(1); + } + + asio::ip::udp::endpoint destination; + + while (true) + { + { + asio::error_code ec; + auto payload_buffer = receive_socket->receive_from(destination, 0, ec); + + if (ec) + { + std::cerr << "Error receiving message: " << ec.message() << '\n'; + break; + } + + { + const std::lock_guard lock(statistics_mutex_); + + if (is_stopped_) + break; + + bytes_payload_ += payload_buffer->size(); + messages_received_ ++; + } + } + } + + { + asio::error_code ec; + receive_socket->shutdown(asio::socket_base::shutdown_both, ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + std::cerr << "Error shutting down socket: " << ec.message() << '\n'; + } + } + + { + asio::error_code ec; + receive_socket->close(ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + std::cerr << "Error closing socket: " << ec.message() << '\n'; + } + } +} diff --git a/samples/ecaludp_perftool/src/receiver_sync.h b/samples/ecaludp_perftool/src/receiver_sync.h new file mode 100644 index 0000000..300cc2c --- /dev/null +++ b/samples/ecaludp_perftool/src/receiver_sync.h @@ -0,0 +1,44 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include "receiver.h" +#include "receiver_parameters.h" + +#include +#include + +class ReceiverSync : public Receiver +{ + public: + ReceiverSync(const ReceiverParameters& parameters); + ~ReceiverSync() override; + + // disable copy and move + ReceiverSync(const ReceiverSync&) = delete; + ReceiverSync(ReceiverSync&&) = delete; + ReceiverSync& operator=(const ReceiverSync&) = delete; + ReceiverSync& operator=(ReceiverSync&&) = delete; + + void start() override; + + private: + void receive_loop(); + + private: + std::unique_ptr receive_thread_; +}; diff --git a/samples/ecaludp_perftool/src/sender.cpp b/samples/ecaludp_perftool/src/sender.cpp new file mode 100644 index 0000000..2577297 --- /dev/null +++ b/samples/ecaludp_perftool/src/sender.cpp @@ -0,0 +1,98 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "sender.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "sender_parameters.h" + +Sender::Sender(const SenderParameters& parameters) + : parameters_(parameters) + , statistics_thread_{std::make_unique([this]() { this->print_statistics(); })} +{ + // Print information for debug purposes + std::cout << parameters_.to_string(); +} + +Sender::~Sender() +{ + { + // Stop statistics thread + const std::lock_guard lock(statistics_mutex_); + is_stopped_ = true; + cv_.notify_all(); + } + + if (statistics_thread_->joinable()) + statistics_thread_->join(); +} + +void Sender::print_statistics() +{ + std::chrono::steady_clock::time_point last_statistics_run; + + while(true) + { + long long bytes_raw {0}; + long long bytes_payload{0}; + long long messages_sent{0}; + + + { + std::unique_lock lock(statistics_mutex_); + cv_.wait_for(lock, std::chrono::seconds(1), [this]() -> bool { return is_stopped_; }); + + if (is_stopped_) + return; + + std::swap(bytes_raw_, bytes_raw); + std::swap(bytes_payload_, bytes_payload); + std::swap(messages_sent_, messages_sent); + } + + auto now = std::chrono::steady_clock::now(); + + // Calculate message per seconds -> frequency) + double frequency = 0.0; + auto duration = std::chrono::duration_cast>(now - last_statistics_run).count(); + if (duration > 0) + { + frequency = static_cast(messages_sent) / duration; + } + + { + std::stringstream ss; + ss << "cnt: " << messages_sent; + ss << " | "; + //ss << "snt raw: " << bytes_raw << " pyld: " << bytes_payload; // TODO: The async send doesn't return the raw number of bytes sent + ss << "snt pyld: " << bytes_payload; + ss << " | "; + ss << "freq: " << std::fixed << std::setprecision(1) << frequency; + + std::cout << ss.str() << '\n'; + } + + last_statistics_run = now; + } +} diff --git a/samples/ecaludp_perftool/src/sender.h b/samples/ecaludp_perftool/src/sender.h new file mode 100644 index 0000000..27c0d6d --- /dev/null +++ b/samples/ecaludp_perftool/src/sender.h @@ -0,0 +1,60 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include +#include +#include +#include + +#include "sender_parameters.h" + +class Sender +{ +public: + Sender(const SenderParameters& parameters); + virtual ~Sender(); + + // disable copy and move + Sender(const Sender&) = delete; + Sender(Sender&&) = delete; + Sender& operator=(const Sender&) = delete; + Sender& operator=(Sender&&) = delete; + + virtual void start() = 0; + +private: + void print_statistics(); + +/////////////////////////////////////////////////////////// +// Member variables +/////////////////////////////////////////////////////////// + +protected: + SenderParameters parameters_; + + bool is_stopped_ {false}; + mutable std::mutex statistics_mutex_; + std::condition_variable cv_; + + long long bytes_raw_ {0}; + long long bytes_payload_ {0}; + long long messages_sent_ {0}; + +private: + std::unique_ptr statistics_thread_; +}; diff --git a/samples/ecaludp_perftool/src/sender_async.cpp b/samples/ecaludp_perftool/src/sender_async.cpp new file mode 100644 index 0000000..2cb886b --- /dev/null +++ b/samples/ecaludp_perftool/src/sender_async.cpp @@ -0,0 +1,99 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "sender_async.h" + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "sender.h" +#include "sender_parameters.h" +#include "socket_builder_asio.h" + +SenderAsync::SenderAsync(const SenderParameters& parameters) + : Sender(parameters) +{ + std::cout << "Sender implementation: Asynchronous asio\n"; +} + +SenderAsync::~SenderAsync() +{ + if (socket_) + { + asio::error_code ec; + socket_->cancel(ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + + if (ec) + std::cerr << "Error cancelling socket: " << ec.message() << '\n'; + } + + if(io_context_thread_->joinable()) + io_context_thread_->join(); +} + +void SenderAsync::start() +{ + try + { + socket_ = SocketBuilderAsio::CreateSendSocket(io_context_, parameters_); + } + catch (const std::exception& e) + { + std::cerr << "Error creating socket: " << e.what() << '\n'; + std::exit(1); + } + + auto message = std::make_shared(parameters_.message_size, 'a'); + auto endpoint = asio::ip::udp::endpoint(asio::ip::make_address(parameters_.ip), parameters_.port); + + send_message(message, endpoint); + + io_context_thread_ = std::make_unique([this](){ io_context_.run(); }); +} + +void SenderAsync::send_message(const std::shared_ptr& message, const asio::ip::udp::endpoint& endpoint) +{ + + socket_->async_send_to( asio::buffer(*message) + , endpoint + , [this, message, endpoint](asio::error_code ec) + { + if (ec) + { + std::cerr << "Error sending: " << ec.message() << '\n'; + socket_->close(); + return; + } + + { + const std::lock_guard lock(statistics_mutex_); + + //bytes_raw_ += bytes_sent; // TODO: the current implementation doesn't return the raw number of bytes sent + bytes_payload_ += message->size(); + messages_sent_ ++; + } + + this->send_message(message, endpoint); + }); + +} diff --git a/samples/ecaludp_perftool/src/sender_async.h b/samples/ecaludp_perftool/src/sender_async.h new file mode 100644 index 0000000..7cc0740 --- /dev/null +++ b/samples/ecaludp_perftool/src/sender_async.h @@ -0,0 +1,51 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include "sender.h" +#include "sender_parameters.h" + +#include +#include +#include + +#include + +#include + +class SenderAsync : public Sender +{ + public: + SenderAsync(const SenderParameters& parameters); + ~SenderAsync() override; + + // disable copy and move + SenderAsync(const SenderAsync&) = delete; + SenderAsync(SenderAsync&&) = delete; + SenderAsync& operator=(const SenderAsync&) = delete; + SenderAsync& operator=(SenderAsync&&) = delete; + + void start() override; + + private: + void send_message(const std::shared_ptr& message, const asio::ip::udp::endpoint& endpoint); + + private: + std::unique_ptr io_context_thread_; + asio::io_context io_context_; + std::shared_ptr socket_; +}; diff --git a/samples/ecaludp_perftool/src/sender_parameters.h b/samples/ecaludp_perftool/src/sender_parameters.h new file mode 100644 index 0000000..76111a9 --- /dev/null +++ b/samples/ecaludp_perftool/src/sender_parameters.h @@ -0,0 +1,45 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include +#include +#include +#include + +struct SenderParameters +{ + std::string ip { "127.0.0.1" }; + uint16_t port {14000}; + size_t message_size {0}; + int max_udp_datagram_size {-1}; + int buffer_size {-1}; + + std::string to_string() const + { + std::stringstream ss; + + ss << "SenderParameters: \n"; + ss << " ip: " << ip << '\n'; + ss << " port: " << port << '\n'; + ss << " message_size: " << message_size << '\n'; + ss << " max_udp_datagram_size: " << (max_udp_datagram_size > 0 ? std::to_string(max_udp_datagram_size) : "default") << '\n'; + ss << " buffer_size: " << (buffer_size > 0 ? std::to_string(buffer_size) : "default") << '\n'; + + return ss.str(); + } +}; diff --git a/samples/ecaludp_perftool/src/sender_sync.cpp b/samples/ecaludp_perftool/src/sender_sync.cpp new file mode 100644 index 0000000..9fbbc5c --- /dev/null +++ b/samples/ecaludp_perftool/src/sender_sync.cpp @@ -0,0 +1,113 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "sender_sync.h" + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ecaludp/socket.h" +#include "sender.h" +#include "sender_parameters.h" +#include "socket_builder_asio.h" + +SenderSync::SenderSync(const SenderParameters& parameters) + : Sender(parameters) +{ + std::cout << "Sender implementation: Synchronous asio\n"; +} + +SenderSync::~SenderSync() +{ + if (send_thread_ && send_thread_->joinable()) + { + send_thread_->join(); + } +} + +void SenderSync::start() +{ + send_thread_ = std::make_unique(&SenderSync::send_loop, this); +} + +void SenderSync::send_loop() +{ + asio::io_context io_context; + + std::shared_ptr send_socket; + try + { + send_socket = SocketBuilderAsio::CreateSendSocket(io_context, parameters_); + } + catch (const std::exception& e) + { + std::cerr << "Error creating socket: " << e.what() << '\n'; + std::exit(1); + } + + const std::string message = std::string(parameters_.message_size, 'a'); + const asio::ip::udp::endpoint destination(asio::ip::address::from_string(parameters_.ip), parameters_.port); + + while (true) + { + { + asio::error_code ec; + auto bytes_sent = send_socket->send_to(asio::buffer(message), destination, 0, ec); + + if (ec) + { + std::cerr << "Error sending message: " << ec.message() << '\n'; + break; + } + + { + const std::lock_guard lock(statistics_mutex_); + + if (is_stopped_) + break; + + bytes_raw_ += bytes_sent; + bytes_payload_ += message.size(); + messages_sent_ ++; + } + } + } + + { + asio::error_code ec; + send_socket->shutdown(asio::socket_base::shutdown_both, ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + std::cerr << "Error shutting down socket: " << ec.message() << '\n'; + } + } + + { + asio::error_code ec; + send_socket->close(ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + std::cerr << "Error closing socket: " << ec.message() << '\n'; + } + } +} diff --git a/samples/ecaludp_perftool/src/sender_sync.h b/samples/ecaludp_perftool/src/sender_sync.h new file mode 100644 index 0000000..e16a66c --- /dev/null +++ b/samples/ecaludp_perftool/src/sender_sync.h @@ -0,0 +1,44 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include "sender.h" +#include "sender_parameters.h" + +#include +#include + +class SenderSync : public Sender +{ + public: + SenderSync(const SenderParameters& parameters); + ~SenderSync() override; + + // disable copy and move + SenderSync(const SenderSync&) = delete; + SenderSync(SenderSync&&) = delete; + SenderSync& operator=(const SenderSync&) = delete; + SenderSync& operator=(SenderSync&&) = delete; + + void start() override; + + private: + void send_loop(); + + private: + std::unique_ptr send_thread_; +}; diff --git a/samples/ecaludp_perftool/src/socket_builder_asio.cpp b/samples/ecaludp_perftool/src/socket_builder_asio.cpp new file mode 100644 index 0000000..964526b --- /dev/null +++ b/samples/ecaludp_perftool/src/socket_builder_asio.cpp @@ -0,0 +1,179 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "socket_builder_asio.h" +#include "ecaludp/socket.h" +#include "receiver_parameters.h" +#include "sender_parameters.h" + +#include +#include + +#include +#include + +namespace SocketBuilderAsio +{ + std::shared_ptr CreateSendSocket(asio::io_context& io_context, const SenderParameters& parameters) + { + auto socket = std::make_shared(io_context, std::array{'E', 'C', 'A', 'L'}); + + asio::ip::address ip_address {}; + { + asio::error_code ec; + ip_address = asio::ip::make_address(parameters.ip, ec); + if (ec) + { + throw std::runtime_error("Invalid IP address: " + parameters.ip); + } + } + + const asio::ip::udp::endpoint destination(ip_address, parameters.port); + + if (parameters.max_udp_datagram_size > 0) + { + socket->set_max_udp_datagram_size(parameters.max_udp_datagram_size); + } + + { + asio::error_code ec; + socket->open(destination.protocol(), ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to open socket: " + ec.message()); + } + } + + // Set sent buffer size + if (parameters.buffer_size > 0) + { + const asio::socket_base::send_buffer_size option(parameters.buffer_size); + + asio::error_code ec; + socket->set_option(option, ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to set send buffer size: " + ec.message()); + } + } + + return socket; + } + + std::shared_ptr CreateReceiveSocket(asio::io_context& io_context, const ReceiverParameters& parameters) + { + auto socket = std::make_shared(io_context, std::array{'E', 'C', 'A', 'L'}); + + asio::ip::address ip_address {}; + { + asio::error_code ec; + ip_address = asio::ip::make_address(parameters.ip, ec); + if (ec) + { + throw std::runtime_error("Invalid IP address: " + parameters.ip); + } + } + + const asio::ip::udp::endpoint destination(ip_address, parameters.port); + + { + asio::error_code ec; + socket->open(destination.protocol(), ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to open socket: " + ec.message()); + } + } + + // Set reuse address + { + const asio::ip::udp::socket::reuse_address option(true); + + asio::error_code ec; + socket->set_option(option, ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to set reuse address: " + ec.message()); + } + } + + if (destination.address().is_multicast()) + { + { + // Set multicast loopback + asio::error_code ec; + const asio::ip::multicast::enable_loopback option(true); + socket->set_option(option, ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to set multicast loopback: " + ec.message()); + } + } + { + // "Bind" multicast address + asio::ip::udp::endpoint bind_endpoint; + if (ip_address.is_v4()) + { + bind_endpoint = asio::ip::udp::endpoint(asio::ip::address_v4(), destination.port()); + } + else + { + bind_endpoint = asio::ip::udp::endpoint(asio::ip::address_v6(), destination.port()); + } + + asio::error_code ec; + socket->bind(bind_endpoint, ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to bind socket: " + ec.message()); + } + } + { + // Join multicast group + asio::error_code ec; + socket->set_option(asio::ip::multicast::join_group(destination.address()), ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to join multicast group: " + ec.message()); + } + } + } + else + { + asio::error_code ec; + socket->bind(destination, ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to bind socket: " + ec.message()); + } + } + + // Set receive buffer size + if (parameters.buffer_size > 0) + { + const asio::socket_base::receive_buffer_size option(parameters.buffer_size); + + asio::error_code ec; + socket->set_option(option, ec); // NOLINT(bugprone-unused-return-value) The function also returns the error_code, but we already got it via the parameter + if (ec) + { + throw std::runtime_error("Failed to set receive buffer size: " + ec.message()); + } + } + + return socket; + } +} diff --git a/samples/ecaludp_perftool/src/socket_builder_asio.h b/samples/ecaludp_perftool/src/socket_builder_asio.h new file mode 100644 index 0000000..ffdc370 --- /dev/null +++ b/samples/ecaludp_perftool/src/socket_builder_asio.h @@ -0,0 +1,32 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include + +#include + +#include // IWYU pragma: keep + +#include "sender_parameters.h" +#include "receiver_parameters.h" + +namespace SocketBuilderAsio +{ + std::shared_ptr CreateSendSocket (asio::io_context& io_context, const SenderParameters& parameters); + std::shared_ptr CreateReceiveSocket(asio::io_context& io_context, const ReceiverParameters& parameters); +} diff --git a/samples/ecaludp_perftool/src/socket_builder_npcap.cpp b/samples/ecaludp_perftool/src/socket_builder_npcap.cpp new file mode 100644 index 0000000..c7983fa --- /dev/null +++ b/samples/ecaludp_perftool/src/socket_builder_npcap.cpp @@ -0,0 +1,95 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#include "socket_builder_npcap.h" + +#include +#include +#include + +#include +#include + +#include "receiver_parameters.h" + +namespace SocketBuilderNpcap +{ + std::shared_ptr CreateReceiveSocket(const ReceiverParameters& parameters) + { + auto socket = std::make_shared(std::array{'E', 'C', 'A', 'L'}); + + asio::ip::address ip_address {}; + { + asio::error_code ec; + ip_address = asio::ip::make_address(parameters.ip, ec); + if (ec) + { + throw std::runtime_error("Invalid IP address: " + parameters.ip); + } + } + + // only v4 is supported right now + if (!ip_address.is_v4()) + { + throw std::runtime_error("Only IPv4 is supported"); + } + + // Set receive buffer size + if (parameters.buffer_size > 0) + { + const bool success = socket->set_receive_buffer_size(parameters.buffer_size); + if (!success) + { + throw std::runtime_error("Failed to set receive buffer size"); + } + } + + + if (ip_address.is_multicast()) + { + socket->set_multicast_loopback_enabled(true); + + // "Bind" multicast address + { + const asio::ip::udp::endpoint bind_endpoint = asio::ip::udp::endpoint(asio::ip::address_v4(), parameters.port); + const bool success = socket->bind(bind_endpoint); + if (!success) + { + throw std::runtime_error("Failed to bind socket"); + } + } + + { + const bool success = socket->join_multicast_group(ip_address.to_v4()); + if (!success) + { + throw std::runtime_error("Failed to join multicast group"); + } + } + } + else + { + const asio::ip::udp::endpoint destination(ip_address, parameters.port); + const bool success = socket->bind(destination); + if (!success) + { + throw std::runtime_error("Failed to bind socket"); + } + } + + return socket; + } +} diff --git a/samples/ecaludp_perftool/src/socket_builder_npcap.h b/samples/ecaludp_perftool/src/socket_builder_npcap.h new file mode 100644 index 0000000..67a8101 --- /dev/null +++ b/samples/ecaludp_perftool/src/socket_builder_npcap.h @@ -0,0 +1,28 @@ +/******************************************************************************** + * Copyright (c) 2024 Continental Corporation + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +#pragma once + +#include + +#include + +#include "receiver_parameters.h" + +namespace SocketBuilderNpcap +{ + std::shared_ptr CreateReceiveSocket(const ReceiverParameters& parameters); +} diff --git a/samples/ecaludp_sample/src/main.cpp b/samples/ecaludp_sample/src/main.cpp index 970289e..373ce4c 100644 --- a/samples/ecaludp_sample/src/main.cpp +++ b/samples/ecaludp_sample/src/main.cpp @@ -43,7 +43,7 @@ void send_package() { if (ec) { - std::cout << "Error sending: " << ec.message() << std::endl; + std::cout << "Error sending: " << ec.message() << '\n'; return; } @@ -52,7 +52,7 @@ void send_package() { if (ec) { - std::cout << "Error waiting: " << ec.message() << std::endl; + std::cout << "Error waiting: " << ec.message() << '\n'; return; } @@ -70,12 +70,12 @@ void receive_package() { if (ec) { - std::cout << "Error receiving: " << ec.message() << std::endl; + std::cout << "Error receiving: " << ec.message() << '\n'; return; } std::string received_string(static_cast(buffer->data()), buffer->size()); - std::cout << "Received " << buffer->size() << " bytes from " << sender_endpoint->address().to_string() << ":" << sender_endpoint->port() << ": " << received_string << std::endl; + std::cout << "Received " << buffer->size() << " bytes from " << sender_endpoint->address().to_string() << ":" << sender_endpoint->port() << ": " << received_string << '\n'; receive_package(); @@ -96,7 +96,7 @@ int main(int argc, char** argv) if (ec) { - std::cout << "Error opening socket: " << ec.message() << std::endl; + std::cout << "Error opening socket: " << ec.message() << '\n'; return -1; } } @@ -107,7 +107,7 @@ int main(int argc, char** argv) if (ec) { - std::cout << "Error binding socket: " << ec.message() << std::endl; + std::cout << "Error binding socket: " << ec.message() << '\n'; return -1; } } diff --git a/samples/ecaludp_sample_npcap/src/main.cpp b/samples/ecaludp_sample_npcap/src/main.cpp index 5cb9e77..1fcf1af 100644 --- a/samples/ecaludp_sample_npcap/src/main.cpp +++ b/samples/ecaludp_sample_npcap/src/main.cpp @@ -67,7 +67,7 @@ void receive_package() auto sender_endpoint = std::make_shared(); receive_socket_->async_receive_from(*sender_endpoint - , [sender_endpoint](const std::shared_ptr& buffer, ecaludp::Error& error) + , [sender_endpoint](const std::shared_ptr& buffer, const ecaludp::Error& error) { if (error) { diff --git a/tests/ecaludp_test/src/ecaludp_socket_test.cpp b/tests/ecaludp_test/src/ecaludp_socket_test.cpp index b65414c..778c7dc 100644 --- a/tests/ecaludp_test/src/ecaludp_socket_test.cpp +++ b/tests/ecaludp_test/src/ecaludp_socket_test.cpp @@ -368,7 +368,7 @@ TEST(EcalUdpSocket, SyncHelloWorldMessage) asio::error_code ec; send_socket.send_to(asio::buffer(message_to_send), destination, 0, ec); if (ec) - std::cerr << ec.message() << std::endl; + std::cerr << ec.message() << '\n'; ASSERT_FALSE(ec); } @@ -460,7 +460,7 @@ TEST(EcalUdpSocket, SyncBigMessage) asio::error_code ec; send_socket.send_to(asio::buffer(message_to_send), destination, 0, ec); if (ec) - std::cerr << ec.message() << std::endl; + std::cerr << ec.message() << '\n'; ASSERT_FALSE(ec); } diff --git a/thirdparty/udpcap b/thirdparty/udpcap index 6b7622c..a381b68 160000 --- a/thirdparty/udpcap +++ b/thirdparty/udpcap @@ -1 +1 @@ -Subproject commit 6b7622c6ba51ddbbedd89b07e1b61d515a52d718 +Subproject commit a381b681af0559605d48421da18636b3964d861f