diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index cad17103b7..934b99110b 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -63,6 +63,8 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/executable_list.cpp src/rclcpp/executor.cpp src/rclcpp/executors.cpp + src/rclcpp/executors/cbg_executor.cpp + src/rclcpp/executors/detail/weak_executable_with_rcl_handle_cache.cpp src/rclcpp/executors/executor_entities_collection.cpp src/rclcpp/executors/executor_entities_collector.cpp src/rclcpp/executors/executor_notify_waitable.cpp diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index 3d8cd3c3e9..9480153a95 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -485,7 +485,7 @@ class Executor RCLCPP_PUBLIC static void execute_subscription( - rclcpp::SubscriptionBase::SharedPtr subscription); + const rclcpp::SubscriptionBase::SharedPtr &subscription); /// Run timer executable. /** @@ -503,7 +503,7 @@ class Executor */ RCLCPP_PUBLIC static void - execute_service(rclcpp::ServiceBase::SharedPtr service); + execute_service(const rclcpp::ServiceBase::SharedPtr &service); /// Run service client executable. /** @@ -512,7 +512,7 @@ class Executor */ RCLCPP_PUBLIC static void - execute_client(rclcpp::ClientBase::SharedPtr client); + execute_client(const rclcpp::ClientBase::SharedPtr &client); protected: /// Block until more work becomes avilable or timeout is reached. diff --git a/rclcpp/include/rclcpp/executors/cbg_executor.hpp b/rclcpp/include/rclcpp/executors/cbg_executor.hpp new file mode 100644 index 0000000000..e7f9e90f6d --- /dev/null +++ b/rclcpp/include/rclcpp/executors/cbg_executor.hpp @@ -0,0 +1,592 @@ +// Copyright 2024 Cellumation GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
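The executor.hpp hunk above switches the static execute_* helpers from pass-by-value SharedPtr to const SharedPtr &. A minimal sketch of what that buys on the dispatch hot path (illustrative names only, not part of the patch):

#include <memory>

struct Entity { void handle() {} };

// By value: every dispatch copies the shared_ptr, i.e. one atomic refcount
// increment plus one atomic decrement per call.
void execute_by_value(std::shared_ptr<Entity> e) { e->handle(); }

// By const reference: no refcount traffic; the caller's own reference keeps
// the entity alive for the duration of the call.
void execute_by_cref(const std::shared_ptr<Entity> & e) { e->handle(); }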
+#pragma once
+
+#include <chrono>
+#include <condition_variable>
+#include <deque>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+// #include "rclcpp/executors/detail/any_executable_weak_ref.hpp"
+#include "rclcpp/executors/detail/weak_executable_with_rcl_handle.hpp"
+#include "rclcpp/executor.hpp"
+// #include "rclcpp/executors/callback_group_state.hpp"
+#include "rclcpp/macros.hpp"
+#include "rclcpp/memory_strategies.hpp"
+#include "rclcpp/visibility_control.hpp"
+
+namespace rclcpp
+{
+namespace executors
+{
+
+struct AnyExecutableCbg
+{
+  std::function<void()> execute_function;
+
+  bool more_executables_ready_in_cbg = false;
+};
+
+template<class ExecutableRef>
+class ExecutionGroup
+{
+public:
+  ExecutionGroup(size_t expected_size = 0)
+  {
+    ready_executables.reserve(expected_size);
+  }
+
+  void clear_and_prepare(size_t expected_size)
+  {
+    ready_executables.clear();
+    ready_executables.reserve(expected_size);
+    next_unprocessed_ready_executable = 0;
+  }
+
+  void add_ready_executable(ExecutableRef & e)
+  {
+    ready_executables.push_back(&e);
+  }
+
+  bool has_unprocessed_executables()
+  {
+    for (; next_unprocessed_ready_executable < ready_executables.size();
+      next_unprocessed_ready_executable++)
+    {
+      auto & ready_executable = ready_executables[next_unprocessed_ready_executable];
+
+      if (ready_executable->executable_alive()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  bool get_unprocessed_executable(AnyExecutableCbg & any_executable)
+  {
+    for (; next_unprocessed_ready_executable < ready_executables.size();
+      next_unprocessed_ready_executable++)
+    {
+      const auto & ready_executable = ready_executables[next_unprocessed_ready_executable];
+
+      if (fill_any_executable(any_executable, ready_executable->executable)) {
+        // mark the current element as processed
+        next_unprocessed_ready_executable++;
+
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  bool execute_unprocessed_executable_until(
+    const std::chrono::time_point<std::chrono::steady_clock> & stop_time)
+  {
+    bool executed_executable = false;
+
+    for (; next_unprocessed_ready_executable < ready_executables.size();
+      next_unprocessed_ready_executable++)
+    {
+      const auto & ready_executable = ready_executables[next_unprocessed_ready_executable];
+
+      AnyExecutableCbg any_executable;
+      if (fill_any_executable(any_executable, ready_executable->executable)) {
+        any_executable.execute_function();
+
+        executed_executable = true;
+
+        if (std::chrono::steady_clock::now() >= stop_time) {
+          // mark the current element as processed
+          next_unprocessed_ready_executable++;
+
+          return true;
+        }
+      }
+    }
+
+    return executed_executable;
+  }
+
+  bool get_unprocessed_executable(AnyExecutable & any_executable)
+  {
+    for (; next_unprocessed_ready_executable < ready_executables.size();
+      next_unprocessed_ready_executable++)
+    {
+      const auto & ready_executable = ready_executables[next_unprocessed_ready_executable];
+
+      if (fill_any_executable(any_executable, ready_executable->executable)) {
+        // mark the current element as processed
+        next_unprocessed_ready_executable++;
+
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+private:
+  bool fill_any_executable(
+    AnyExecutable & any_executable,
+    const rclcpp::SubscriptionBase::WeakPtr & ptr)
+  {
+    any_executable.subscription = ptr.lock();
+/*
+    if (any_executable.subscription) {
+      //RCUTILS_LOG_INFO("Executing subscription");
+    }*/
+
+    return any_executable.subscription.operator bool();
+  }
+
+  bool fill_any_executable(
+    AnyExecutableCbg & any_executable,
+    const rclcpp::SubscriptionBase::WeakPtr & ptr)
+  {
+    auto shr_ptr = ptr.lock();
+    if (!shr_ptr) {
+      return false;
+    }
+
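+    // The locked shared_ptr is moved into the closure built below, so the
+    // entity stays alive until the closure has run, even if it is removed
+    // from its callback group in the meantime.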
any_executable.execute_function = [shr_ptr = std::move(shr_ptr)] () { + rclcpp::Executor::execute_subscription(shr_ptr); + }; + + return true; + } + bool fill_any_executable(AnyExecutableCbg & any_executable, const rclcpp::TimerBase::WeakPtr & ptr) + { + auto shr_ptr = ptr.lock(); + if(!shr_ptr) + { + return false; + } + auto data = shr_ptr->call(); + if (!data) { + // timer was cancelled, skip it. + return false; + } + + any_executable.execute_function = [shr_ptr = std::move(shr_ptr)] () { + shr_ptr->execute_callback(); +// rclcpp::Executor::execute_timer(shr_ptr, data); + }; + + return true; + } + bool fill_any_executable( + AnyExecutableCbg & any_executable, + const rclcpp::ServiceBase::WeakPtr & ptr) + { + auto shr_ptr = ptr.lock(); + if(!shr_ptr) + { + return false; + } + any_executable.execute_function = [shr_ptr = std::move(shr_ptr)] () { + rclcpp::Executor::execute_service(shr_ptr); + }; + + return true; + } + bool fill_any_executable( + AnyExecutableCbg & any_executable, + const rclcpp::ClientBase::WeakPtr & ptr) + { + auto shr_ptr = ptr.lock(); + if(!shr_ptr) + { + return false; + } + any_executable.execute_function = [shr_ptr = std::move(shr_ptr)] () { + rclcpp::Executor::execute_client(shr_ptr); + }; + + return true; + } + bool fill_any_executable( + AnyExecutableCbg & any_executable, + const rclcpp::Waitable::WeakPtr & ptr) + { + auto shr_ptr = ptr.lock(); + if(!shr_ptr) + { + return false; + } + auto data = shr_ptr->take_data(); + + any_executable.execute_function = [shr_ptr = std::move(shr_ptr), data = std::move(data)] () { + std::shared_ptr cpy = std::move(data); + shr_ptr->execute(cpy); + }; + + return true; + } + bool fill_any_executable(AnyExecutable & any_executable, const rclcpp::TimerBase::WeakPtr & ptr) + { + any_executable.timer = ptr.lock(); + if (any_executable.timer) { + auto data = any_executable.timer->call(); + if (!data) { + // timer was cancelled, skip it. 
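+        // (call() advances the timer state as a side effect, so it must be
+        // invoked exactly once per wakeup and must not be repeated when the
+        // executable is handed out later.)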
+        return false;
+      }
+
+//      any_executable.data = *data;
+
+      return true;
+      //RCUTILS_LOG_INFO("Executing timer");
+    }
+    return false;
+  }
+  bool fill_any_executable(AnyExecutable & any_executable, const rclcpp::ServiceBase::WeakPtr & ptr)
+  {
+    any_executable.service = ptr.lock();
+//     if (any_executable.service) {
+//       //RCUTILS_LOG_INFO("Executing service");
+//     }
+    return any_executable.service.operator bool();
+  }
+  bool fill_any_executable(AnyExecutable & any_executable, const rclcpp::ClientBase::WeakPtr & ptr)
+  {
+    any_executable.client = ptr.lock();
+//     if (any_executable.client) {
+//       //RCUTILS_LOG_INFO("Executing client");
+//     }
+    return any_executable.client.operator bool();
+  }
+  bool fill_any_executable(AnyExecutable & any_executable, const rclcpp::Waitable::WeakPtr & ptr)
+  {
+    any_executable.waitable = ptr.lock();
+    if (any_executable.waitable) {
+      any_executable.data = any_executable.waitable->take_data();
+      return true;
+    }
+
+    return false;
+  }
+
+  std::vector<ExecutableRef *> ready_executables;
+  size_t next_unprocessed_ready_executable = 0;
+};
+
+class CallbackGroupScheduler
+{
+public:
+  enum SchedulingPolicy
+  {
+    // Execute all ready events in priority order
+    PrioritizedFirstInFirstOut,
+    // Only execute the highest ready event
+    Prioritized,
+  };
+
+  CallbackGroupScheduler(
+    SchedulingPolicy sched_policy = SchedulingPolicy::PrioritizedFirstInFirstOut)
+  : sched_policy(sched_policy)
+  {
+  }
+
+  void clear_and_prepare(
+    const size_t max_timer, const size_t max_subs, const size_t max_services,
+    const size_t max_clients, const size_t max_waitables);
+
+  void add_ready_executable(TimerRef & executable);
+  void add_ready_executable(SubscriberRef & executable);
+  void add_ready_executable(ServiceRef & executable);
+  void add_ready_executable(ClientRef & executable);
+  void add_ready_executable(WaitableRef & executable);
+
+  enum Priorities
+  {
+    Timer = 0,
+    Subscription,
+    Service,
+    Client,
+    Waitable
+  };
+
+  bool get_unprocessed_executable(AnyExecutable & any_executable, enum Priorities for_priority);
+  bool execute_unprocessed_executable_until(
+    const std::chrono::time_point<std::chrono::steady_clock> & stop_time,
+    enum Priorities for_priority);
+
+  bool has_unprocessed_executables();
+
+private:
+  ExecutionGroup<TimerRef> ready_timers;
+  ExecutionGroup<SubscriberRef> ready_subscriptions;
+  ExecutionGroup<ServiceRef> ready_services;
+  ExecutionGroup<ClientRef> ready_clients;
+  ExecutionGroup<WaitableRef> ready_waitables;
+
+  SchedulingPolicy sched_policy;
+};
+
+class CBGExecutor : public rclcpp::Executor
+{
+public:
+  RCLCPP_SMART_PTR_DEFINITIONS(CBGExecutor)
+
+  /// Create a callback-group driven multi-threaded executor.
+  /**
+   * Ready work is collected per callback group and executed by a pool of
+   * worker threads, with each callback group being processed by at most one
+   * thread at a time.
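+   *
+   * A usage sketch (illustrative only, not part of the original patch):
+   *
+   *   auto node = std::make_shared<rclcpp::Node>("worker");
+   *   rclcpp::executors::CBGExecutor executor(rclcpp::ExecutorOptions(), 4);
+   *   executor.add_node(node);
+   *   executor.spin();  // blocks; callbacks run on the 4 worker threads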
+   *
+   * \param options common options for all executors
+   * \param number_of_threads number of threads to have in the thread pool,
+   *   the default 0 will use the number of cpu cores found (minimum of 2)
+   * \param timeout maximum time to wait for work
+   */
+  RCLCPP_PUBLIC
+  explicit CBGExecutor(
+    const rclcpp::ExecutorOptions & options = rclcpp::ExecutorOptions(),
+    size_t number_of_threads = 0,
+    std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
+
+  RCLCPP_PUBLIC
+  virtual ~CBGExecutor();
+
+  RCLCPP_PUBLIC
+  virtual void
+  add_callback_group(
+    rclcpp::CallbackGroup::SharedPtr group_ptr,
+    rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
+    bool notify = true);
+
+  RCLCPP_PUBLIC
+  virtual std::vector<rclcpp::CallbackGroup::WeakPtr>
+  get_all_callback_groups();
+
+  RCLCPP_PUBLIC
+  virtual void
+  remove_callback_group(
+    rclcpp::CallbackGroup::SharedPtr group_ptr,
+    bool notify = true);
+
+  RCLCPP_PUBLIC
+  virtual void
+  add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
+
+  /// Convenience function which takes Node and forwards NodeBaseInterface.
+  /**
+   * \see rclcpp::Executor::add_node
+   */
+  RCLCPP_PUBLIC
+  virtual void
+  add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);
+
+  RCLCPP_PUBLIC
+  virtual void
+  remove_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify = true);
+
+  /// Convenience function which takes Node and forwards NodeBaseInterface.
+  /**
+   * \see rclcpp::Executor::remove_node
+   */
+  RCLCPP_PUBLIC
+  virtual void
+  remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify = true);
+
+
+  // add a callback group to the executor, not bound to any node
+  void add_callback_group_only(rclcpp::CallbackGroup::SharedPtr group_ptr);
+
+  /**
+   * \sa rclcpp::Executor::spin() for more details
+   * \throws std::runtime_error when spin() called while already spinning
+   */
+  RCLCPP_PUBLIC
+  void
+  spin();
+
+  virtual void
+  spin_once(std::chrono::nanoseconds timeout = std::chrono::nanoseconds(-1));
+
+  RCLCPP_PUBLIC
+  virtual void
+  spin_some(std::chrono::nanoseconds max_duration = std::chrono::nanoseconds(0));
+
+  /**
+   * \return true if work was available and executed
+   */
+  bool collect_and_execute_ready_events(
+    std::chrono::nanoseconds max_duration,
+    bool recollect_if_no_work_available);
+
+  virtual void
+  spin_all(std::chrono::nanoseconds max_duration);
+
+  RCLCPP_PUBLIC
+  void
+  execute_any_executable(AnyExecutable & any_exec);
+
+
+  /// Cancel any running spin* function, causing it to return.
+  /**
+   * This function can be called asynchronously from any thread.
+   * \throws std::runtime_error if there is an issue triggering the guard condition
+   */
+  RCLCPP_PUBLIC
+  void
+  cancel();
+
+  RCLCPP_PUBLIC
+  size_t
+  get_number_of_threads();
+
+  bool
+  is_spinning()
+  {
+    return spinning;
+  }
+
+  template<typename FutureT, typename TimeRepT = int64_t, typename TimeT = std::milli>
+  FutureReturnCode
+  spin_until_future_complete(
+    const FutureT & future,
+    std::chrono::duration<TimeRepT, TimeT> timeout = std::chrono::duration<TimeRepT, TimeT>(-1))
+  {
+    // TODO(wjwwood): does not work recursively; can't call spin_node_until_future_complete
+    // inside a callback executed by an executor.
+
+    // Check the future before entering the while loop.
+    // If the future is already complete, don't try to spin.
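+    // The loop below mirrors rclcpp::Executor::spin_until_future_complete:
+    // re-check the future after every piece of executed work, recompute the
+    // remaining budget from a fixed end_time, and treat a negative timeout
+    // as "block until the future completes or the context shuts down".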
+ std::future_status status = future.wait_for(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + + auto end_time = std::chrono::steady_clock::now(); + std::chrono::nanoseconds timeout_ns = std::chrono::duration_cast( + timeout); + if (timeout_ns > std::chrono::nanoseconds::zero()) { + end_time += timeout_ns; + } + std::chrono::nanoseconds timeout_left = timeout_ns; + + if (spinning.exchange(true)) { + throw std::runtime_error("spin_until_future_complete() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + while (rclcpp::ok(this->context_) && spinning.load()) { + // Do one item of work. + spin_once_internal(timeout_left); + + // Check if the future is set, return SUCCESS if it is. + status = future.wait_for(std::chrono::seconds(0)); + if (status == std::future_status::ready) { + return FutureReturnCode::SUCCESS; + } + // If the original timeout is < 0, then this is blocking, never TIMEOUT. + if (timeout_ns < std::chrono::nanoseconds::zero()) { + continue; + } + // Otherwise check if we still have time to wait, return TIMEOUT if not. + auto now = std::chrono::steady_clock::now(); + if (now >= end_time) { + return FutureReturnCode::TIMEOUT; + } + // Subtract the elapsed time from the original timeout. + timeout_left = std::chrono::duration_cast(end_time - now); + } + + // The future did not complete before ok() returned false, return INTERRUPTED. + return FutureReturnCode::INTERRUPTED; + } + +protected: + RCLCPP_PUBLIC + void + run(size_t this_thread_number); + + void wait_for_work(std::chrono::nanoseconds timeout, bool do_not_wait_if_all_groups_busy = true); + + struct CallbackGroupData + { + CallbackGroup::WeakPtr callback_group; + + std::unique_ptr scheduler; + std::unique_ptr executable_cache; + }; + + bool + get_next_ready_executable(AnyExecutable & any_executable); + + bool execute_ready_executables_until(const std::chrono::time_point &stop_time); + +private: + void sync_callback_groups(); + + void spin_once_internal(std::chrono::nanoseconds timeout); + + RCLCPP_DISABLE_COPY(CBGExecutor) + + std::deque> update_functions; + + std::mutex added_callback_groups_mutex_; + std::vector added_callback_groups; + + std::mutex added_nodes_mutex_; + std::vector added_nodes; + + std::mutex callback_groups_mutex; + + //FIXME make unique_ptr + std::vector callback_groups; + + std::mutex wait_mutex_; + size_t number_of_threads_; + + std::chrono::nanoseconds next_exec_timeout_; + + std::atomic_bool needs_callback_group_resync; + + /// Spinning state, used to prevent multi threaded calls to spin and to cancel blocking spins. + std::atomic_bool spinning; + + /// Guard condition for signaling the rmw layer to wake up for special events. + std::shared_ptr interrupt_guard_condition_; + + /// Guard condition for signaling the rmw layer to wake up for system shutdown. + std::shared_ptr shutdown_guard_condition_; + + /// Wait set for managing entities that the rmw layer waits on. + rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set(); + + /// shutdown callback handle registered to Context + rclcpp::OnShutdownCallbackHandle shutdown_callback_handle_; + + /// The context associated with this executor. + std::shared_ptr context_; + + /// Stores the executables for the internal guard conditions + /// e.g. 
interrupt_guard_condition_ and shutdown_guard_condition_
+  std::unique_ptr<WeakExecutableWithRclHandleCache> global_executable_cache;
+
+  /// Stores the executables for guard conditions of the nodes
+  std::unique_ptr<WeakExecutableWithRclHandleCache> nodes_executable_cache;
+
+  std::mutex conditional_mutex;
+  std::condition_variable work_ready_conditional;
+
+};
+
+}  // namespace executors
+}  // namespace rclcpp
diff --git a/rclcpp/include/rclcpp/executors/detail/weak_executable_with_rcl_handle.hpp b/rclcpp/include/rclcpp/executors/detail/weak_executable_with_rcl_handle.hpp
new file mode 100644
index 0000000000..d7fecf5b33
--- /dev/null
+++ b/rclcpp/include/rclcpp/executors/detail/weak_executable_with_rcl_handle.hpp
@@ -0,0 +1,697 @@
+#pragma once
+#include "rclcpp/subscription_base.hpp"
+#include "rclcpp/timer.hpp"
+#include "rclcpp/service.hpp"
+#include "rclcpp/client.hpp"
+#include "rclcpp/waitable.hpp"
+#include "rclcpp/guard_condition.hpp"
+#include "rclcpp/callback_group.hpp"
+#include <deque>
+#include <variant>
+#include <vector>
+
+namespace rclcpp::executors
+{
+
+class CallbackGroupScheduler;
+
+template<class ExecutableType_T, class RclHandleType_T>
+struct WeakExecutableWithRclHandle
+{
+  WeakExecutableWithRclHandle(
+    const std::shared_ptr<ExecutableType_T> & executable,
+    std::shared_ptr<RclHandleType_T> && rcl_handle, CallbackGroupScheduler * scheduler)
+  : executable(executable), executable_ptr(executable.get()),
+    rcl_handle(std::move(rcl_handle)), scheduler(scheduler), processed(false)
+  {
+  }
+
+  using RclHandleType = RclHandleType_T;
+  using ExecutableType = ExecutableType_T;
+
+  std::weak_ptr<ExecutableType> executable;
+  const ExecutableType * executable_ptr;
+  std::shared_ptr<RclHandleType> rcl_handle;
+  CallbackGroupScheduler * scheduler;
+  bool processed;
+
+  bool executable_alive()
+  {
+    auto use_cnt = rcl_handle.use_count();
+    if (use_cnt == 0) {
+      return false;
+    }
+
+    if (use_cnt <= 1) {
+      executable.reset();
+      rcl_handle.reset();
+      return false;
+    }
+
+    return true;
+  }
+
+};
+
+struct GuardConditionWithFunction
+{
+  GuardConditionWithFunction(rclcpp::GuardCondition::SharedPtr gc, std::function<void()> fun)
+  : guard_condition(std::move(gc)), handle_guard_condition_fun(std::move(fun))
+  {
+  }
+
+  rclcpp::GuardCondition::SharedPtr guard_condition;
+
+  // A function that should be executed if the guard_condition is ready
+  std::function<void()> handle_guard_condition_fun;
+};
+
+using TimerRef = WeakExecutableWithRclHandle<rclcpp::TimerBase, const rcl_timer_t>;
+
+using SubscriberRef = WeakExecutableWithRclHandle<rclcpp::SubscriptionBase, rcl_subscription_t>;
+using ClientRef = WeakExecutableWithRclHandle<rclcpp::ClientBase, rcl_client_t>;
+using ServiceRef = WeakExecutableWithRclHandle<rclcpp::ServiceBase, rcl_service_t>;
+using WaitableRef = WeakExecutableWithRclHandle<rclcpp::Waitable, rclcpp::Waitable>;
+
+using AnyRef = std::variant<TimerRef, SubscriberRef, ClientRef, ServiceRef, WaitableRef,
+    GuardConditionWithFunction>;
+
+/// Helper class to compute the size of a waitset
+struct WaitSetSize
+{
+  size_t subscriptions = 0;
+  size_t clients = 0;
+  size_t services = 0;
+  size_t timers = 0;
+  size_t guard_conditions = 0;
+  size_t events = 0;
+
+  void clear()
+  {
+    subscriptions = 0;
+    clients = 0;
+    services = 0;
+    timers = 0;
+    guard_conditions = 0;
+    events = 0;
+  }
+
+  void addWaitable(const rclcpp::Waitable::SharedPtr & waitable_ptr)
+  {
+    if (!waitable_ptr) {
+      return;
+    }
+
+    subscriptions += waitable_ptr->get_number_of_ready_subscriptions();
+    clients += waitable_ptr->get_number_of_ready_clients();
+
services += waitable_ptr->get_number_of_ready_services(); + timers += waitable_ptr->get_number_of_ready_timers(); + guard_conditions += waitable_ptr->get_number_of_ready_guard_conditions(); + events += waitable_ptr->get_number_of_ready_events(); + } + + void add(const WaitSetSize &waitset) + { + subscriptions += waitset.subscriptions; + clients += waitset.clients; + services += waitset.services; + timers += waitset.timers; + guard_conditions += waitset.guard_conditions; + events += waitset.events; +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "add: waitset new size : t %lu, s %lu, c %lu, s %lu, gc %lu", timers, subscriptions, clients, services, guard_conditions); + } + + void clear_and_resize_wait_set(rcl_wait_set_s & wait_set) const + { + // clear wait set + rcl_ret_t ret = rcl_wait_set_clear(&wait_set); + if (ret != RCL_RET_OK) { + exceptions::throw_from_rcl_error(ret, "Couldn't clear wait set"); + } + +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "Needed size of waitset : t %lu, s %lu, c %lu, s %lu, gc %lu", timers, subscriptions, clients, services, guard_conditions); + if(wait_set.size_of_subscriptions < subscriptions || wait_set.size_of_guard_conditions < guard_conditions || + wait_set.size_of_timers < timers || wait_set.size_of_clients < clients || wait_set.size_of_services < services || wait_set.size_of_events < events) + { + // The size of waitables are accounted for in size of the other entities + ret = rcl_wait_set_resize( + &wait_set, subscriptions, + guard_conditions, timers, + clients, services, events); + if (RCL_RET_OK != ret) { + exceptions::throw_from_rcl_error(ret, "Couldn't resize the wait set"); + } + } + } +}; + + +class InsertHelper +{ + + + size_t idx = 0; + size_t final_size = 0; + bool valid = true; + std::vector &container; +public: + + InsertHelper(std::vector &con) : + container(con) + { + }; + + template< class RefType, class Pointer> + inline bool exists(const Pointer &s) + { + final_size++; +// std::cout << "exists called " << final_size << "times" << std::endl; + if(valid && idx < container.size()) + { + if(s.get() == std::get(container[idx]).executable_ptr) + { +// std::cout << idx << " found reusing element" << std::endl; + idx++; + return true; + } + else + { +// std::cout << idx << " Not found, deleting old size " << container.size() << std::endl; + container.erase(container.begin() + idx, container.end()); +// std::cout << idx << " Not found, deleting new size " << container.size() << std::endl; + valid = false; +// final_size = idx; + return false; + } + + } + valid = false; + +// final_size++; + + return false; + } + + void resize() + { +// std::cout << "Container has size " << container.size() << " should be " << final_size << std::endl; + container.erase(container.begin() + final_size, container.end()); + } + +}; + +struct WeakExecutableWithRclHandleCache +{ + WeakExecutableWithRclHandleCache(CallbackGroupScheduler &scheduler); + WeakExecutableWithRclHandleCache(); + + std::vector timers; + std::vector subscribers; + std::vector clients; + std::vector services; + std::vector waitables; + std::vector guard_conditions; +// std::vector timers_tmp; +// std::vector subscribers_tmp; +// std::vector clients_tmp; +// std::vector services_tmp; +// std::vector waitables_tmp; +// +// InsertHelper subHelper; +// InsertHelper timerHelper; +// InsertHelper clientHelper; +// InsertHelper serviceHelper; +// InsertHelper waitableHelper; + + + CallbackGroupScheduler &scheduler; + + bool cache_ditry = true; + + WaitSetSize wait_set_size; + + void clear() + { + timers.clear(); + 
subscribers.clear(); + clients.clear(); + services.clear(); + waitables.clear(); + guard_conditions.clear(); + + wait_set_size.clear(); + } + + template + inline bool move_matching_element(const std::shared_ptr &exec_ptr, std::deque> &source, std::deque> &target) + { + //we run with the assumption, that the ordering of elements did not change since the last call + //therefore we just delete the first elements in case we did not find anything + while(!source.empty()) + { + if(exec_ptr.get() == std::get(*source.front()).executable_ptr) + { + target.push_back(std::move(source.front())); + source.pop_front(); + return true; + } + +// std::cout << "Elemente not found " << std::endl; + source.pop_front(); + } + + return false; + } + + + void regenerate(rclcpp::CallbackGroup & callback_group) + { +// std::cout << "Regenerate callback_group_ptr " << &callback_group << std::endl; + +// timers.clear(); +// subscribers.clear(); +// clients.clear(); +// services.clear(); +// waitables.clear(); +// guard_conditions.clear(); + + wait_set_size.clear(); + + // we reserve to much memory here, this this should be fine + if(timers.capacity() == 0) + { + timers.reserve(callback_group.size()); + subscribers.reserve(callback_group.size()); + clients.reserve(callback_group.size()); + services.reserve(callback_group.size()); + waitables.reserve(callback_group.size()); + +// timers_tmp.reserve(callback_group.size()); +// subscribers_tmp.reserve(callback_group.size()); +// clients_tmp.reserve(callback_group.size()); +// services_tmp.reserve(callback_group.size()); +// waitables_tmp.reserve(callback_group.size()); + } + +// timers.swap(timers_tmp); +// subscribers.swap(subscribers_tmp); +// clients.swap(clients_tmp); +// services.swap(services_tmp); +// waitables.swap(waitables_tmp); + +// timers.clear(); +// subscribers.clear(); +// clients.clear(); +// services.clear(); +// waitables.clear(); + + + InsertHelper sub_helper(subscribers); + InsertHelper timer_helper(timers); + InsertHelper client_helper(clients); + InsertHelper service_helper(services); + InsertHelper waitable_helper(waitables); + + const auto add_sub = [this, &sub_helper](const rclcpp::SubscriptionBase::SharedPtr &s) + { +// std::cout << "add_sub called" << std::endl; + + if(sub_helper.exists(s)) + { + return; + } + +// if(sub_idx >= 0 && sub_idx < static_cast(subscribers.size())) +// { +// if(s.get() && s.get() == std::get(subscribers[sub_idx]).executable_ptr) +// { +// // std::cout << sub_idx << " Reusiong sub" << std::endl; +// sub_idx++; +// return; +// } +// else +// { +// // std::cout << sub_idx << " Not found, deleting" << std::endl; +// subscribers.erase(subscribers.begin() + sub_idx, subscribers.end()); +// sub_idx = -1; +// } +// +// } + +// if(move_matching_element(s, subscribers_tmp, subscribers)) +// { +// return; +// } + + //element not found, add new one +// auto handle_shr_ptr = ; + auto &entry = subscribers.emplace_back(SubscriberRef(s, s->get_subscription_handle(), &scheduler)); + const_cast::type *>(std::get(entry).rcl_handle.get())->user_data = &entry; +// if(subHelper.exists(s.get())) +// { +// return; +// } + +// std::cout << "Adding new subscriber" << std::endl; + +// auto handle_shr_ptr = s->get_subscription_handle(); +// auto &entry = subscribers.emplace_back(SubscriberRef(s, std::move(handle_shr_ptr), &scheduler)); +// const_cast::type *>(std::get(entry).rcl_handle.get())->user_data = &entry; + }; + const auto add_timer = [this, &timer_helper](const rclcpp::TimerBase::SharedPtr &s) + { + if(timer_helper.exists(s)) + { + 
return; + } +// if(move_matching_element(s, timers_tmp, timers)) +// { +// return; +// } + +// auto handle_shr_ptr = ; + auto &entry = timers.emplace_back(TimerRef(s, s->get_timer_handle(), &scheduler)); + const_cast::type *>(std::get(entry).rcl_handle.get())->user_data = &entry; + }; + + const auto add_client = [this, &client_helper](const rclcpp::ClientBase::SharedPtr &s) + { + if(client_helper.exists(s)) + { + return; + } +// if(move_matching_element(s, clients_tmp, clients)) +// { +// return; +// } + + auto &entry = clients.emplace_back(ClientRef(s, s->get_client_handle(), &scheduler)); + const_cast::type *>(std::get(entry).rcl_handle.get())->user_data = &entry; + }; + const auto add_service = [this, &service_helper](const rclcpp::ServiceBase::SharedPtr &s) + { + if(service_helper.exists(s)) + { + return; + } +// if(move_matching_element(s, services_tmp, services)) +// { +// return; +// } + +// auto handle_shr_ptr = s->get_service_handle(); + auto &entry = services.emplace_back(ServiceRef(s, s->get_service_handle(), &scheduler)); + const_cast::type *>(std::get(entry).rcl_handle.get())->user_data = &entry; + }; + + wait_set_size.guard_conditions = 0; + + if(guard_conditions.empty()) + { +// guard_conditions.reserve(1); + + add_guard_condition(callback_group.get_notify_guard_condition(), [this]() { + // RCUTILS_LOG_ERROR_NAMED("rclcpp", "GC: Callback group was changed"); + + cache_ditry = true; + }); + } + else + { + wait_set_size.guard_conditions++; + } + + const auto add_waitable = [this, &waitable_helper](const rclcpp::Waitable::SharedPtr &s) + { + if(waitable_helper.exists(s)) + { + wait_set_size.addWaitable(s); + return; + } +// if(move_matching_element(s, waitables_tmp, waitables)) +// { +// wait_set_size.addWaitable(s); +// return; +// } + + rclcpp::Waitable::SharedPtr cpy(s); + wait_set_size.addWaitable(s); + waitables.emplace_back(WaitableRef(s, std::move(cpy), &scheduler)); + }; + + + // as we hold waitable pointers, we keep them implicitly alive. 
We need to drop them before the recollect + for(AnyRef &any : waitables) + { + std::get(any).executable_alive(); + } + + callback_group.collect_all_ptrs(add_sub, add_service, add_client, add_timer, add_waitable); + +// subHelper.remove_missing(subscribers); +// timerHelper.remove_missing(timers); +// clientHelper.remove_missing(clients); +// serviceHelper.remove_missing(services); +// waitableHelper.remove_missing(waitables); + + sub_helper.resize(); + timer_helper.resize(); + client_helper.resize(); + service_helper.resize(); + waitable_helper.resize(); +// // subscribers_tmp.clear(); +// timers_tmp.clear(); +// services_tmp.clear(); +// clients_tmp.clear(); +// waitables_tmp.clear(); + + wait_set_size.timers += timers.size(); + wait_set_size.subscriptions += subscribers.size(); + wait_set_size.clients += clients.size(); + wait_set_size.services += services.size(); + +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "GC: regeneraetd, new size : t %lu, sub %lu, c %lu, s %lu, gc %lu, waitables %lu", wait_set_size.timers, wait_set_size.subscriptions, wait_set_size.clients, wait_set_size.services, wait_set_size.guard_conditions, waitables.size()); + + + cache_ditry = false; + } + + void add_guard_condition(rclcpp::GuardCondition::SharedPtr ptr, std::function fun) + { + guard_conditions.emplace_back(GuardConditionWithFunction(std::move(ptr), std::move(fun))); + + for(auto &entry : guard_conditions) + { + GuardConditionWithFunction &gcwf = std::get(entry); + gcwf.guard_condition->get_rcl_guard_condition().user_data = &entry; + } + + wait_set_size.guard_conditions++; + } + + template + inline bool add_container_to_wait_set(const ContainerType &container, rcl_wait_set_s & ws) const + { + for(const auto &any_ref: container) + { + const RefType &ref = std::get(*any_ref); + if(!add_to_wait_set(ws, ref.rcl_handle.get())) + { + return false; + } + } + + return true; + } + + template + inline bool add_container_to_wait_set(const std::vector &container, rcl_wait_set_s & ws) const + { + for(const auto &any_ref: container) + { + const RefType &ref = std::get(any_ref); + if(!add_to_wait_set(ws, ref.rcl_handle.get())) + { + return false; + } + } + + return true; + } + + bool add_to_wait_set(rcl_wait_set_s & ws) + { + if(!add_container_to_wait_set(timers, ws)) + { + return false; + } + if(!add_container_to_wait_set(subscribers, ws)) + { + return false; + } + if(!add_container_to_wait_set(services, ws)) + { + return false; + } + if(!add_container_to_wait_set(clients, ws)) + { + return false; + } + for(auto &any_ref: waitables) + { + WaitableRef &ref = std::get(any_ref); + add_to_wait_set(ws, ref); + } + + //RCUTILS_LOG_ERROR_NAMED("rclcpp", "Adding %lu guard conditions", guard_conditions.size()); + + for(auto &any_ref: guard_conditions) + { + const GuardConditionWithFunction &ref = std::get(any_ref); + add_to_wait_set(ws, ref.guard_condition); + } + + return true; + } + + inline bool add_to_wait_set( + rcl_wait_set_s & ws, + rcl_subscription_t *handle_ptr) const + { + if (rcl_wait_set_add_subscription( + &ws, handle_ptr, + nullptr) != RCL_RET_OK) + { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add subscription to wait set: %s", rcl_get_error_string().str); + return false; + } + + return true; + } + + inline bool add_to_wait_set( + rcl_wait_set_s & ws, + const rcl_timer_t *handle_ptr) const + { + if (rcl_wait_set_add_timer(&ws, handle_ptr, nullptr) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add timer to wait set: %s", rcl_get_error_string().str); + return false; + } + + return true; + } + 
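All of these add_to_wait_set overloads rely on the same back-pointer scheme: before waiting, every rcl handle's user_data is pointed at its AnyRef entry, so a ready handle coming out of rcl_wait() can be mapped back to its scheduler in O(1). A reduced sketch of that round trip (Handle and Entry are stand-ins for the rcl handles and AnyRef, not part of the patch):

struct Entry { int priority; };
struct Handle { void * user_data = nullptr; };

// Before waiting: each handle points back at its bookkeeping entry.
void attach(Handle & h, Entry & e) { h.user_data = &e; }

// After the wait: ready handles are resolved without any lookup table.
Entry & resolve(const Handle & h) { return *static_cast<Entry *>(h.user_data); }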
inline bool add_to_wait_set( + rcl_wait_set_s & ws, + const rcl_service_t *handle_ptr) const + { + if (rcl_wait_set_add_service(&ws, handle_ptr, nullptr) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add service to wait set: %s", rcl_get_error_string().str); + return false; + } + + return true; + } + inline bool add_to_wait_set( + rcl_wait_set_s & ws, + const rclcpp::GuardCondition::SharedPtr &gc_ptr) const + { + rcl_ret_t ret = rcl_wait_set_add_guard_condition(&ws, &(gc_ptr->get_rcl_guard_condition()), nullptr); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to add guard condition to wait set"); + } + + return true; + } + + + inline bool add_to_wait_set( + rcl_wait_set_s & ws, + const rcl_client_t *handle_ptr) const + { + if (rcl_wait_set_add_client(&ws, handle_ptr, nullptr) != RCL_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "Couldn't add client to wait set: %s", rcl_get_error_string().str); + return false; + } + + return true; + } + inline bool add_to_wait_set( + rcl_wait_set_s & ws, + WaitableRef &waitable) + { + const size_t nr_of_added_clients = ws.nr_of_added_clients; + const size_t nr_of_added_events = ws.nr_of_added_events; + const size_t old_nr_of_added_guard_conditions = ws.nr_of_added_guard_conditions; + const size_t old_nr_of_added_services = ws.nr_of_added_services; + const size_t old_nr_of_added_subscriptions = ws.nr_of_added_subscriptions; + const size_t old_nr_of_added_timers = ws.nr_of_added_timers; + + // reset the processed flag, used to make sure, that a waitable + // will not get added two times to the scheduler + waitable.processed = false; + waitable.rcl_handle->add_to_wait_set(&ws); + + { + for (size_t i = nr_of_added_clients; i < ws.nr_of_added_clients; i++) { + const_cast(ws.clients[i])->user_data = &waitable; + } + } + + { + for (size_t i = nr_of_added_events; i < ws.nr_of_added_events; i++) { + const_cast(ws.events[i])->user_data = &waitable; + } + } + + { + for (size_t i = old_nr_of_added_guard_conditions; i < ws.nr_of_added_guard_conditions; i++) { + const_cast(ws.guard_conditions[i])->user_data = &waitable; + } + } + + { + for (size_t i = old_nr_of_added_services; i < ws.nr_of_added_services; i++) { + const_cast(ws.services[i])->user_data = &waitable; + } + } + + { + for (size_t i = old_nr_of_added_subscriptions; i < ws.nr_of_added_subscriptions; i++) { + const_cast(ws.subscriptions[i])->user_data = &waitable; + } + } + + { + for (size_t i = old_nr_of_added_timers; i < ws.nr_of_added_timers; i++) { + const_cast(ws.timers[i])->user_data = &waitable; + } + } + + return true; + } + + static void collect_ready_from_waitset(rcl_wait_set_s & ws); +}; + + + +} diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 4dd0c9a358..094616f908 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -612,7 +612,7 @@ take_and_do_error_handling( } void -Executor::execute_subscription(rclcpp::SubscriptionBase::SharedPtr subscription) +Executor::execute_subscription(const rclcpp::SubscriptionBase::SharedPtr &subscription) { using rclcpp::dynamic_typesupport::DynamicMessage; @@ -720,7 +720,7 @@ Executor::execute_timer(rclcpp::TimerBase::SharedPtr timer) } void -Executor::execute_service(rclcpp::ServiceBase::SharedPtr service) +Executor::execute_service(const rclcpp::ServiceBase::SharedPtr &service) { auto request_header = service->create_request_header(); std::shared_ptr request = service->create_request(); @@ -733,7 +733,7 @@ 
Executor::execute_service(rclcpp::ServiceBase::SharedPtr service) void Executor::execute_client( - rclcpp::ClientBase::SharedPtr client) + const rclcpp::ClientBase::SharedPtr &client) { auto request_header = client->create_request_header(); std::shared_ptr response = client->create_response(); diff --git a/rclcpp/src/rclcpp/executors/cbg_executor.cpp b/rclcpp/src/rclcpp/executors/cbg_executor.cpp new file mode 100644 index 0000000000..e1ba0a3021 --- /dev/null +++ b/rclcpp/src/rclcpp/executors/cbg_executor.cpp @@ -0,0 +1,922 @@ +// Copyright 2024 Cellumation GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/executors/cbg_executor.hpp" + +#include +#include +#include +#include + +#include "rcpputils/scope_exit.hpp" +#include "rclcpp/exceptions/exceptions.hpp" +#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp" + +#include "rclcpp/logging.hpp" +#include "rclcpp/node.hpp" +#include +// #include "rclcpp/executors/detail/rcl_to_rclcpp_map.hpp" +// #include "rclcpp/executors/detail/any_executable_weak_ref.hpp" + +namespace rclcpp::executors +{ + +CBGExecutor::CBGExecutor( + const rclcpp::ExecutorOptions & options, + size_t number_of_threads, + std::chrono::nanoseconds next_exec_timeout) + : next_exec_timeout_(next_exec_timeout), + + spinning(false), + interrupt_guard_condition_(std::make_shared(options.context)), + shutdown_guard_condition_(std::make_shared(options.context)), + context_(options.context), + global_executable_cache(std::make_unique()), + nodes_executable_cache(std::make_unique()) +{ + + global_executable_cache->add_guard_condition( + interrupt_guard_condition_, + std::function()); + global_executable_cache->add_guard_condition( + shutdown_guard_condition_, [this] () + { + work_ready_conditional.notify_all(); + }); + + + number_of_threads_ = number_of_threads > 0 ? 
+ number_of_threads : + std::max(std::thread::hardware_concurrency(), 2U); + + shutdown_callback_handle_ = context_->add_on_shutdown_callback( + [weak_gc = std::weak_ptr {shutdown_guard_condition_}]() { + auto strong_gc = weak_gc.lock(); + if (strong_gc) { + strong_gc->trigger(); + } + }); + + rcl_allocator_t allocator = options.memory_strategy->get_allocator(); + + rcl_ret_t ret = rcl_wait_set_init( + &wait_set_, + 0, 0, 0, 0, 0, 0, + context_->get_rcl_context().get(), + allocator); + if (RCL_RET_OK != ret) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "failed to create wait set: %s", rcl_get_error_string().str); + rcl_reset_error(); + exceptions::throw_from_rcl_error(ret, "Failed to create wait set in Executor constructor"); + } + +} + +CBGExecutor::~CBGExecutor() +{ + + work_ready_conditional.notify_all(); + + std::vector added_nodes_cpy; + { + std::lock_guard lock{added_nodes_mutex_}; + added_nodes_cpy = added_nodes; + } + + for (const node_interfaces::NodeBaseInterface::WeakPtr & node_weak_ptr : added_nodes_cpy) { + const node_interfaces::NodeBaseInterface::SharedPtr & node_ptr = node_weak_ptr.lock(); + if (node_ptr) { + remove_node(node_ptr, false); + } + } + + std::vector added_cbgs_cpy; + { + std::lock_guard lock{added_callback_groups_mutex_}; + added_cbgs_cpy = added_callback_groups; + } + + for (const auto & weak_ptr : added_cbgs_cpy) { + auto shr_ptr = weak_ptr.lock(); + if (shr_ptr) { + remove_callback_group(shr_ptr, false); + } + } + + // Remove shutdown callback handle registered to Context + if (!context_->remove_on_shutdown_callback(shutdown_callback_handle_)) { + RCUTILS_LOG_ERROR_NAMED( + "rclcpp", + "failed to remove registered on_shutdown callback"); + rcl_reset_error(); + } + +} + +void CallbackGroupScheduler::clear_and_prepare(const size_t max_timer, const size_t max_subs, const size_t max_services, const size_t max_clients, const size_t max_waitables) +{ + ready_timers.clear_and_prepare(max_timer); + ready_subscriptions.clear_and_prepare(max_subs); + ready_services.clear_and_prepare(max_services); + ready_clients.clear_and_prepare(max_clients); + ready_waitables.clear_and_prepare(max_waitables); +} + +bool CallbackGroupScheduler::execute_unprocessed_executable_until(const std::chrono::time_point &stop_time, enum Priorities for_priority) +{ + switch (for_priority) { + case Client: + return ready_clients.execute_unprocessed_executable_until(stop_time); + break; + case Service: + return ready_services.execute_unprocessed_executable_until(stop_time); + break; + case Subscription: + return ready_subscriptions.execute_unprocessed_executable_until(stop_time); + break; + case Timer: + return ready_timers.execute_unprocessed_executable_until(stop_time); + break; + case Waitable: + return ready_waitables.execute_unprocessed_executable_until(stop_time); + break; + } + return false; +} + +bool CallbackGroupScheduler::get_unprocessed_executable( + AnyExecutable & any_executable, + enum Priorities for_priority) +{ + switch (for_priority) { + case Client: + return ready_clients.get_unprocessed_executable(any_executable); + break; + case Service: + return ready_services.get_unprocessed_executable(any_executable); + break; + case Subscription: + return ready_subscriptions.get_unprocessed_executable(any_executable); + break; + case Timer: + return ready_timers.get_unprocessed_executable(any_executable); + break; + case Waitable: + return ready_waitables.get_unprocessed_executable(any_executable); + break; + } + return false; +} + +bool 
CallbackGroupScheduler::has_unprocessed_executables() +{ + return ready_subscriptions.has_unprocessed_executables() || + ready_timers.has_unprocessed_executables() || + ready_clients.has_unprocessed_executables() || + ready_services.has_unprocessed_executables() || + ready_waitables.has_unprocessed_executables(); +} + + +void CallbackGroupScheduler::add_ready_executable(SubscriberRef & executable) +{ + ready_subscriptions.add_ready_executable(executable); +} +void CallbackGroupScheduler::add_ready_executable(ServiceRef & executable) +{ + ready_services.add_ready_executable(executable); +} +void CallbackGroupScheduler::add_ready_executable(TimerRef & executable) +{ + ready_timers.add_ready_executable(executable); +} +void CallbackGroupScheduler::add_ready_executable(ClientRef & executable) +{ + ready_clients.add_ready_executable(executable); +} +void CallbackGroupScheduler::add_ready_executable(WaitableRef & executable) +{ + ready_waitables.add_ready_executable(executable); +} + +bool CBGExecutor::execute_ready_executables_until(const std::chrono::time_point &stop_time) +{ + bool found_work = false; + + for (size_t i = CallbackGroupScheduler::Priorities::Timer; + i <= CallbackGroupScheduler::Priorities::Waitable; i++) + { + CallbackGroupScheduler::Priorities cur_prio(static_cast(i)); + + for (CallbackGroupData & cbg_with_data : callback_groups) + { + rclcpp::AnyExecutable any_executable; + if (cbg_with_data.scheduler->execute_unprocessed_executable_until(stop_time, cur_prio)) { + if(std::chrono::steady_clock::now() >= stop_time) + { + return true; + } + } + } + } + return found_work; +} + + +bool CBGExecutor::get_next_ready_executable(AnyExecutable & any_executable) +{ + if(!spinning.load()) + { + return false; + } + + std::lock_guard g(callback_groups_mutex); +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "get_next_ready_executable"); + + struct ReadyCallbacksWithSharedPtr + { + CallbackGroupData * data; + rclcpp::CallbackGroup::SharedPtr callback_group; + bool ready = true; + }; + + std::vector ready_callbacks; + ready_callbacks.reserve(callback_groups.size()); + + + for (auto it = callback_groups.begin(); it != callback_groups.end(); ) { + CallbackGroupData & cbg_with_data(*it); + + ReadyCallbacksWithSharedPtr e; + e.callback_group = cbg_with_data.callback_group.lock(); + if (!e.callback_group) { + it = callback_groups.erase(it); + continue; + } + + if (e.callback_group->can_be_taken_from().load()) { + e.data = &cbg_with_data; + ready_callbacks.push_back(std::move(e)); + } + + it++; + } + + bool found_work = false; + + for (size_t i = CallbackGroupScheduler::Priorities::Timer; + i <= CallbackGroupScheduler::Priorities::Waitable; i++) + { + CallbackGroupScheduler::Priorities cur_prio(static_cast(i)); + for (ReadyCallbacksWithSharedPtr & ready_elem: ready_callbacks) { + + if(!found_work) + { + if (ready_elem.data->scheduler->get_unprocessed_executable(any_executable, cur_prio)) { + any_executable.callback_group = ready_elem.callback_group; + // mark callback group as in use + any_executable.callback_group->can_be_taken_from().store(false); + found_work = true; + ready_elem.ready = false; +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "get_next_ready_executable : found ready executable"); + } + } + else + { + if (ready_elem.ready && ready_elem.data->scheduler->has_unprocessed_executables()) + { + //wake up worker thread + work_ready_conditional.notify_one(); + return true; + } + } + } + } + + return found_work; +} + +size_t +CBGExecutor::get_number_of_threads() +{ + return number_of_threads_; +} + +void 
CBGExecutor::sync_callback_groups()
+{
+  if (!needs_callback_group_resync.exchange(false)) {
+    return;
+  }
+//   RCUTILS_LOG_ERROR_NAMED("rclcpp", "sync_callback_groups");
+
+  std::vector<std::pair<CallbackGroupData *, rclcpp::CallbackGroup::SharedPtr>> cur_group_data;
+  cur_group_data.reserve(callback_groups.size());
+
+  for (CallbackGroupData & d : callback_groups) {
+    auto p = d.callback_group.lock();
+    if (p) {
+      cur_group_data.emplace_back(&d, std::move(p));
+    }
+  }
+
+  std::scoped_lock lk(callback_groups_mutex);
+  std::vector<CallbackGroupData> next_group_data;
+
+  std::set<rclcpp::CallbackGroup *> added_cbgs;
+
+  auto insert_data =
+    [&cur_group_data, &next_group_data, &added_cbgs](rclcpp::CallbackGroup::SharedPtr && cbg)
+    {
+      // nodes may share callback groups, therefore we need to make sure we only add them once
+      if (added_cbgs.find(cbg.get()) != added_cbgs.end()) {
+        return;
+      }
+
+      added_cbgs.insert(cbg.get());
+
+      for (const auto & pair : cur_group_data) {
+        if (pair.second == cbg) {
+//           RCUTILS_LOG_INFO("Using existing callback group");
+          next_group_data.push_back(std::move(*pair.first));
+          return;
+        }
+      }
+
+//       RCUTILS_LOG_INFO("Using new callback group");
+
+      CallbackGroupData new_entry;
+      new_entry.scheduler = std::make_unique<CallbackGroupScheduler>();
+      new_entry.executable_cache =
+        std::make_unique<WeakExecutableWithRclHandleCache>(*new_entry.scheduler);
+      new_entry.executable_cache->regenerate(*cbg);
+      new_entry.callback_group = std::move(cbg);
+      next_group_data.push_back(std::move(new_entry));
+    };
+
+  {
+    std::vector<rclcpp::CallbackGroup::WeakPtr> added_cbgs_cpy;
+    {
+      std::lock_guard lock{added_callback_groups_mutex_};
+      added_cbgs_cpy = added_callback_groups;
+    }
+
+    std::vector<node_interfaces::NodeBaseInterface::WeakPtr> added_nodes_cpy;
+    {
+      std::lock_guard lock{added_nodes_mutex_};
+      added_nodes_cpy = added_nodes;
+    }
+
+    // *3 is a rough estimate of how many callback groups a node may have
+    next_group_data.reserve(added_cbgs_cpy.size() + added_nodes_cpy.size() * 3);
+
+    nodes_executable_cache->clear();
+//     nodes_executable_cache->guard_conditions.reserve(added_nodes_cpy.size());
+
+//     RCUTILS_LOG_INFO("Added node size is %lu", added_nodes_cpy.size());
+
+    for (const node_interfaces::NodeBaseInterface::WeakPtr & node_weak_ptr : added_nodes_cpy) {
+      auto node_ptr = node_weak_ptr.lock();
+      if (node_ptr) {
+        node_ptr->for_each_callback_group(
+          [&insert_data](rclcpp::CallbackGroup::SharedPtr cbg)
+          {
+            if (cbg->automatically_add_to_executor_with_node()) {
+              insert_data(std::move(cbg));
+            }
+          });
+
+        // register node guard condition, and trigger resync on node change event
+        nodes_executable_cache->add_guard_condition(
+          node_ptr->get_shared_notify_guard_condition(),
+          [this]() {
+//             RCUTILS_LOG_INFO("Node changed GC triggered");
+            needs_callback_group_resync.store(true);
+          });
+      }
+    }
+
+    for (const rclcpp::CallbackGroup::WeakPtr & cbg : added_cbgs_cpy) {
+      auto p = cbg.lock();
+      if (p) {
+        insert_data(std::move(p));
+      }
+    }
+  }
+
+  callback_groups.swap(next_group_data);
+}
+
+void CBGExecutor::wait_for_work(
+  std::chrono::nanoseconds timeout,
+  bool do_not_wait_if_all_groups_busy)
+{
+  using namespace rclcpp::exceptions;
+
+  WaitSetSize wait_set_size;
+
+  sync_callback_groups();
+
+  std::vector<CallbackGroupData *> idle_callback_groups;
+  idle_callback_groups.reserve(callback_groups.size());
+
+  {
+    std::lock_guard g(callback_groups_mutex);
+    for (CallbackGroupData & cbg_with_data: callback_groups) {
+
+      // don't add anything to a waitset that has unprocessed data
+      if (cbg_with_data.scheduler->has_unprocessed_executables()) {
+        continue;
+      }
+
+      auto cbg_shr_ptr = cbg_with_data.callback_group.lock();
+      if (!cbg_shr_ptr) {
+        continue;
+      }
+
+      if (!cbg_shr_ptr->can_be_taken_from()) {
+        continue;
+      }
+
+//       CallbackGroupState
& cbg_state = *cbg_with_data.callback_group_state; + // regenerate the state data + if (cbg_with_data.executable_cache->cache_ditry) + { + // RCUTILS_LOG_INFO("Regenerating callback group"); + // Regenerating clears the dirty flag + cbg_with_data.executable_cache->regenerate(*cbg_shr_ptr); + } + + wait_set_size.add(cbg_with_data.executable_cache->wait_set_size); + + + // setup the groups for the next round + cbg_with_data.scheduler->clear_and_prepare(cbg_with_data.executable_cache->timers.size(), + cbg_with_data.executable_cache->subscribers.size(), + cbg_with_data.executable_cache->services.size(), + cbg_with_data.executable_cache->clients.size(), + cbg_with_data.executable_cache->waitables.size()); + + idle_callback_groups.push_back(&cbg_with_data); + } + } + + if (do_not_wait_if_all_groups_busy && idle_callback_groups.empty()) { + return; + } + +// RCUTILS_LOG_WARN("Adding global_executable_cache"); + + // interrupt_guard_condition_ and shutdown_guard_condition_ + wait_set_size.add(global_executable_cache->wait_set_size); + +// RCUTILS_LOG_WARN("Adding nodes_executable_cache"); + // guard conditions of the nodes + wait_set_size.add(nodes_executable_cache->wait_set_size); + + // prepare the wait set + wait_set_size.clear_and_resize_wait_set(wait_set_); + + // add all ready callback groups + for (CallbackGroupData *cbg_with_data: idle_callback_groups) { + // no locking needed here, as we don't access the scheduler + + if(!cbg_with_data->executable_cache->add_to_wait_set(wait_set_)) + { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "Adding of objects to waitset failed. This should never happen."); + } + } + + // add guard_conditions for node or callback change + global_executable_cache->add_to_wait_set(wait_set_); + nodes_executable_cache->add_to_wait_set(wait_set_); + + if(!spinning.load()) + { + return; + } + + rcl_ret_t status = + rcl_wait(&wait_set_, timeout.count()); + if (status == RCL_RET_WAIT_SET_EMPTY) { + RCUTILS_LOG_WARN_NAMED( + "rclcpp", + "empty wait set received in rcl_wait(). This should never happen."); + } else if (status != RCL_RET_OK && status != RCL_RET_TIMEOUT) { + using rclcpp::exceptions::throw_from_rcl_error; + throw_from_rcl_error(status, "rcl_wait() failed"); + } + + { + std::lock_guard g(callback_groups_mutex); + // super duper expensive + // This will add all ready events to the associated schedulers + WeakExecutableWithRclHandleCache::collect_ready_from_waitset(wait_set_); + } + + //at this point we don't need the wait_set_ any more +} + +void +CBGExecutor::run(size_t this_thread_number) +{ + (void)this_thread_number; + + bool had_work = false; + + while (rclcpp::ok(this->context_) && spinning.load()) { + rclcpp::AnyExecutable any_exec; + + if (!get_next_ready_executable(any_exec)) + { + if(wait_mutex_.try_lock()) + { +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "calling wait_for_work"); + wait_for_work(next_exec_timeout_, false); + wait_mutex_.unlock(); + } + else + { + //some other thread is blocking the wait_for_work function. 
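+          // Hand-off design: at any time exactly one thread owns the wait
+          // set (guarded by wait_mutex_). Threads that lose the try_lock
+          // park on work_ready_conditional; the owning thread wakes one of
+          // them per extra ready executable it finds in
+          // get_next_ready_executable(), and cancel()/shutdown wake all.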
+ + if(had_work) + { +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "triggering waitset"); + + // Wake the wait, because it may need to be recalculated + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition from execute_any_executable: ") + ex.what()); + } + } + + had_work = false; + std::unique_lock lk(conditional_mutex); + + if(spinning.load()) + { +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "going to sleep"); + work_ready_conditional.wait(lk); + } +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "woken up"); + } + continue; + } + + had_work = true; + + execute_any_executable(any_exec); + + // Clear the callback_group to prevent the AnyExecutable destructor from + // resetting the callback group `can_be_taken_from` + any_exec.callback_group.reset(); + } + +// RCUTILS_LOG_INFO("Stopping execution thread"); +} + +void CBGExecutor::spin_once_internal(std::chrono::nanoseconds timeout) +{ + rclcpp::AnyExecutable any_exec; + { + if (!rclcpp::ok(this->context_) || !spinning.load()) { + return; + } + + if (!get_next_ready_executable(any_exec)) { + + wait_for_work(timeout); + + if (!get_next_ready_executable(any_exec)) { + return; + } + } + + any_exec.callback_group->can_be_taken_from().store(false); + } + + execute_any_executable(any_exec); + + // Clear the callback_group to prevent the AnyExecutable destructor from + // resetting the callback group `can_be_taken_from` + any_exec.callback_group.reset(); +} + +void +CBGExecutor::spin_once(std::chrono::nanoseconds timeout) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("spin_once() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + spin_once_internal(timeout); +} + +void +CBGExecutor::execute_any_executable(AnyExecutable & any_exec) +{ + if (any_exec.timer) { +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "Executing Timer"); + rclcpp::Executor::execute_timer(any_exec.timer);//, any_exec.data); + } + if (any_exec.subscription) { +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "Executing subscription"); + rclcpp::Executor::execute_subscription(any_exec.subscription); + } + if (any_exec.service) { +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "Executing service"); + rclcpp::Executor::execute_service(any_exec.service); + } + if (any_exec.client) { +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "Executing client"); + rclcpp::Executor::execute_client(any_exec.client); + } + if (any_exec.waitable) { +// RCUTILS_LOG_ERROR_NAMED("rclcpp", "Executing waitable"); + any_exec.waitable->execute(any_exec.data); + } + // Reset the callback_group, regardless of type + if(any_exec.callback_group) + { + any_exec.callback_group->can_be_taken_from().store(true); + } +} + +bool CBGExecutor::collect_and_execute_ready_events( + std::chrono::nanoseconds max_duration, + bool recollect_if_no_work_available) +{ + if (spinning.exchange(true)) { + throw std::runtime_error("collect_and_execute_ready_events() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + + const auto start = std::chrono::steady_clock::now(); + const auto end_time = start + max_duration; + auto cur_time = start; + + // collect any work, that is already ready +// wait_for_work(std::chrono::nanoseconds::zero(), true); + + bool got_work_since_collect = false; + bool first_collect = true; + bool had_work = false; + + while (rclcpp::ok(this->context_) && spinning && cur_time < end_time) { + if (!execute_ready_executables_until(end_time)) { + + if 
(!first_collect && !recollect_if_no_work_available) { + // we are done + return had_work; + } + + if (first_collect || got_work_since_collect) { + // collect any work, that is already ready + wait_for_work(std::chrono::nanoseconds::zero(), !first_collect); + + first_collect = false; + got_work_since_collect = false; + continue; + } + + return had_work; + } else { + got_work_since_collect = true; + } + + cur_time = std::chrono::steady_clock::now(); + } + + return had_work; +} + +void +CBGExecutor::spin_some(std::chrono::nanoseconds max_duration) +{ + collect_and_execute_ready_events(max_duration, false); +} + +void CBGExecutor::spin_all(std::chrono::nanoseconds max_duration) +{ + if (max_duration < std::chrono::nanoseconds::zero()) { + throw std::invalid_argument("max_duration must be greater than or equal to 0"); + } + + collect_and_execute_ready_events(max_duration, true); +} + +void +CBGExecutor::spin() +{ +// RCUTILS_LOG_ERROR_NAMED( +// "rclcpp", +// "CBGExecutor::spin()"); + + + if (spinning.exchange(true)) { + throw std::runtime_error("spin() called while already spinning"); + } + RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); + std::vector threads; + size_t thread_id = 0; + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < number_of_threads_ - 1; ++thread_id) { + auto func = std::bind(&CBGExecutor::run, this, thread_id); + threads.emplace_back(func); + } + } + + run(thread_id); + for (auto & thread : threads) { + thread.join(); + } +} + +void +CBGExecutor::add_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + rclcpp::node_interfaces::NodeBaseInterface::SharedPtr /*node_ptr*/, + bool notify) +{ + { + std::lock_guard lock{added_callback_groups_mutex_}; + added_callback_groups.push_back(group_ptr); + } + needs_callback_group_resync = true; + if (notify) { + // Interrupt waiting to handle new node + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); + } + } +} + +void +CBGExecutor::cancel() +{ + spinning.store(false); + + work_ready_conditional.notify_all(); + + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string("Failed to trigger guard condition in cancel: ") + ex.what()); + } + work_ready_conditional.notify_all(); +} + +std::vector +CBGExecutor::get_all_callback_groups() +{ + std::lock_guard lock{added_callback_groups_mutex_}; + return added_callback_groups; +} + +void +CBGExecutor::remove_callback_group( + rclcpp::CallbackGroup::SharedPtr group_ptr, + bool notify) +{ + bool found = false; + { + std::lock_guard lock{added_callback_groups_mutex_}; + added_callback_groups.erase( + std::remove_if( + added_callback_groups.begin(), added_callback_groups.end(), + [&group_ptr, &found](const auto & weak_ptr) + { + auto shr_ptr = weak_ptr.lock(); + if (!shr_ptr) { + return true; + } + + if (group_ptr == shr_ptr) { + found = true; + return true; + } + return false; + }), added_callback_groups.end()); + added_callback_groups.push_back(group_ptr); + } + + if (found) { + needs_callback_group_resync = true; + } + + if (notify) { + // Interrupt waiting to handle new node + try { + interrupt_guard_condition_->trigger(); + } catch (const rclcpp::exceptions::RCLError & ex) { + throw std::runtime_error( + std::string( + "Failed to trigger guard condition on callback group add: ") + ex.what()); + } + } +} + +void 
+void
+CBGExecutor::add_node(rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr, bool notify)
+{
+  // If the node already has an executor
+  std::atomic_bool & has_executor = node_ptr->get_associated_with_executor_atomic();
+  if (has_executor.exchange(true)) {
+    throw std::runtime_error(
+            std::string("Node '") + node_ptr->get_fully_qualified_name() +
+            "' has already been added to an executor.");
+  }
+
+  {
+    std::lock_guard<std::mutex> lock{added_nodes_mutex_};
+    added_nodes.push_back(node_ptr);
+  }
+
+  needs_callback_group_resync = true;
+
+  if (notify) {
+    // Interrupt waiting to handle the new node
+    try {
+      interrupt_guard_condition_->trigger();
+    } catch (const rclcpp::exceptions::RCLError & ex) {
+      throw std::runtime_error(
+              std::string(
+                "Failed to trigger guard condition on node add: ") + ex.what());
+    }
+  }
+}
+
+void
+CBGExecutor::add_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
+{
+  add_node(node_ptr->get_node_base_interface(), notify);
+}
+
+void
+CBGExecutor::remove_node(
+  rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_ptr,
+  bool notify)
+{
+  {
+    std::lock_guard<std::mutex> lock{added_nodes_mutex_};
+    added_nodes.erase(
+      std::remove_if(
+        added_nodes.begin(), added_nodes.end(), [&node_ptr](const auto & weak_ptr) {
+          const auto shr_ptr = weak_ptr.lock();
+          if (shr_ptr && shr_ptr == node_ptr) {
+            return true;
+          }
+          return false;
+        }), added_nodes.end());
+  }
+
+  needs_callback_group_resync = true;
+
+  if (notify) {
+    // Interrupt waiting to handle the removed node
+    try {
+      interrupt_guard_condition_->trigger();
+    } catch (const rclcpp::exceptions::RCLError & ex) {
+      throw std::runtime_error(
+              std::string(
+                "Failed to trigger guard condition on node remove: ") + ex.what());
+    }
+  }
+
+  node_ptr->get_associated_with_executor_atomic().store(false);
+}
+
+void
+CBGExecutor::remove_node(std::shared_ptr<rclcpp::Node> node_ptr, bool notify)
+{
+  remove_node(node_ptr->get_node_base_interface(), notify);
+}
+
+// add a callback group to the executor, not bound to any node
+void CBGExecutor::add_callback_group_only(rclcpp::CallbackGroup::SharedPtr group_ptr)
+{
+  add_callback_group(group_ptr, nullptr, true);
+}
+}
diff --git a/rclcpp/src/rclcpp/executors/detail/weak_executable_with_rcl_handle_cache.cpp b/rclcpp/src/rclcpp/executors/detail/weak_executable_with_rcl_handle_cache.cpp
new file mode 100644
index 0000000000..fb2826c57f
--- /dev/null
+++ b/rclcpp/src/rclcpp/executors/detail/weak_executable_with_rcl_handle_cache.cpp
@@ -0,0 +1,114 @@
+#include <rclcpp/executors/detail/weak_executable_with_rcl_handle_cache.hpp>
+#include <rclcpp/executors/cbg_executor.hpp>
+
+namespace rclcpp::executors
+{
+
+WeakExecutableWithRclHandleCache::WeakExecutableWithRclHandleCache(CallbackGroupScheduler & scheduler)
+: scheduler(scheduler)
+{
+}
+
+// Default construction only exists to satisfy container requirements; the
+// resulting object has no valid scheduler and must never be used.
+WeakExecutableWithRclHandleCache::WeakExecutableWithRclHandleCache()
+: scheduler(*static_cast<CallbackGroupScheduler *>(nullptr))
+{
+}
+
+template<class RefType, class RclType>
+inline void add_to_scheduler(const RclType * entry, rcl_wait_set_s & ws)
+{
+  if (entry) {
+    AnyRef * any_ref = static_cast<AnyRef *>(entry->user_data);
+
+    if (any_ref->index() != 4) {
+      RefType & ref_ptr = std::get<RefType>(*any_ref);
+
+      ref_ptr.scheduler->add_ready_executable(ref_ptr);
+    } else {
+      WaitableRef & ref = std::get<WaitableRef>(*any_ref);
+
+      if (ref.processed) {
+        return;
+      }
+
+      ref.processed = true;
+
+      if (ref.rcl_handle->is_ready(&ws)) {
+        ref.scheduler->add_ready_executable(ref);
+      }
+    }
+  }
+}
+
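+// Routing note: every rcl handle in the waitset carries a user_data pointer
+// back to its AnyRef variant entry, set up while the waitset was built, so a
+// ready handle reaches its CallbackGroupScheduler without any lookup. The
+// index() != 4 test above assumes WaitableRef is the fifth alternative of
+// the AnyRef variant; waitables get the extra processed/is_ready() handling
+// because a single waitable may contribute several rcl handles.
+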
+void
+WeakExecutableWithRclHandleCache::collect_ready_from_waitset(rcl_wait_set_s & ws)
+{
+  for (size_t i = 0; i < ws.nr_of_added_timers; i++) {
+    add_to_scheduler<TimerRef>(ws.timers[i], ws);
+  }
+  for (size_t i = 0; i < ws.nr_of_added_subscriptions; i++) {
+    add_to_scheduler<SubscriberRef>(ws.subscriptions[i], ws);
+  }
+  for (size_t i = 0; i < ws.nr_of_added_clients; i++) {
+    add_to_scheduler<ClientRef>(ws.clients[i], ws);
+  }
+  for (size_t i = 0; i < ws.nr_of_added_services; i++) {
+    add_to_scheduler<ServiceRef>(ws.services[i], ws);
+  }
+  for (size_t i = 0; i < ws.nr_of_added_events; i++) {
+    add_to_scheduler<WaitableRef>(ws.events[i], ws);
+  }
+  for (size_t i = 0; i < ws.nr_of_added_guard_conditions; i++) {
+    const auto & entry = ws.guard_conditions[i];
+    if (!entry) {
+      continue;
+    }
+
+    AnyRef * any_ref = static_cast<AnyRef *>(entry->user_data);
+
+    if (GuardConditionWithFunction * ref_ptr = std::get_if<GuardConditionWithFunction>(any_ref)) {
+      // if it is a guard condition, see if we have a function
+      // attached to it and execute it
+      if (ref_ptr->handle_guard_condition_fun) {
+        ref_ptr->handle_guard_condition_fun();
+      }
+    } else {
+      // a guard condition owned by a waitable
+      WaitableRef & ref = std::get<WaitableRef>(*any_ref);
+
+      if (ref.processed) {
+        continue;
+      }
+
+      ref.processed = true;
+
+      if (ref.rcl_handle->is_ready(&ws)) {
+        ref.scheduler->add_ready_executable(ref);
+      }
+    }
+  }
+}
+}
diff --git a/rclcpp/test/benchmark/benchmark_executor.cpp b/rclcpp/test/benchmark/benchmark_executor.cpp
index a6abf52d70..412b9cba3f 100644
--- a/rclcpp/test/benchmark/benchmark_executor.cpp
+++ b/rclcpp/test/benchmark/benchmark_executor.cpp
@@ -20,6 +20,7 @@
 #include "performance_test_fixture/performance_test_fixture.hpp"
 
 #include "rclcpp/rclcpp.hpp"
+#include "rclcpp/executors/cbg_executor.hpp"
 #include "rcpputils/scope_exit.hpp"
 #include "test_msgs/msg/empty.hpp"
 
@@ -27,6 +28,7 @@
 using namespace std::chrono_literals;
 using performance_test_fixture::PerformanceTest;
 
 constexpr unsigned int kNumberOfNodes = 10;
+constexpr unsigned int kNumberOfPubSubs = 10;
 
 class PerformanceTestExecutor : public PerformanceTest
 {
@@ -38,14 +40,16 @@ class PerformanceTestExecutor : public PerformanceTest
     for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
       nodes.push_back(std::make_shared<rclcpp::Node>("my_node_" + std::to_string(i)));
 
-      publishers.push_back(
-        nodes[i]->create_publisher<test_msgs::msg::Empty>(
-          "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10)));
+      for (unsigned int j = 0u; j < kNumberOfPubSubs; j++) {
+        publishers.push_back(
+          nodes[i]->create_publisher<test_msgs::msg::Empty>(
+            "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + "_" + std::to_string(j), rclcpp::QoS(10)));
 
-      auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;};
-      subscriptions.push_back(
-        nodes[i]->create_subscription<test_msgs::msg::Empty>(
-          "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10), std::move(callback)));
+        auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;};
+        subscriptions.push_back(
+          nodes[i]->create_subscription<test_msgs::msg::Empty>(
+            "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + "_" + std::to_string(j), rclcpp::QoS(10), std::move(callback)));
+      }
     }
     PerformanceTest::SetUp(st);
   }
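+  // The topic names above include the benchmark thread index so that
+  // benchmark instances running in parallel threads do not deliver
+  // messages into each other's subscriptions.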
@@ -64,8 +68,18 @@ class PerformanceTestExecutor : public PerformanceTest
     Executer executor;
     for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
       executor.add_node(nodes[i]);
-      publishers[i]->publish(empty_msgs);
-      executor.spin_some(100ms);
+    }
+    while (subscriptions.front()->get_publisher_count() == 0) {
+      std::this_thread::sleep_for(10ms);
+    }
+
+    // precompute executor internals
+    publishers[0]->publish(empty_msgs);
+    executor.spin_some(100ms);
+
+    if (callback_count == 0) {
+      st.SkipWithError("No message was received");
     }
 
     callback_count = 0;
@@ -74,14 +88,18 @@ class PerformanceTestExecutor : public PerformanceTest
     for (auto _ : st) {
       (void)_;
       st.PauseTiming();
-      for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
-        publishers[i]->publish(empty_msgs);
+      for (auto & pub : publishers) {
+        pub->publish(empty_msgs);
       }
+      std::this_thread::yield();
       st.ResumeTiming();
 
       callback_count = 0;
-      while (callback_count < kNumberOfNodes) {
-        executor.spin_some(100ms);
+      executor.spin_some(1000ms);
+      if (callback_count < publishers.size()) {
+        st.SkipWithError("Not all messages were received");
       }
     }
     if (callback_count == 0) {
@@ -104,10 +122,17 @@ class PerformanceTestExecutor : public PerformanceTest
     ExecuterDerived executor;
     for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
       executor.add_node(nodes[i]);
-      executor.spin_some(100ms);
+    }
+    while (subscriptions.front()->get_publisher_count() == 0) {
+      std::this_thread::sleep_for(10ms);
     }
     // we need one ready event
     publishers[0]->publish(empty_msgs);
+    executor.spin_some(100ms);
+    if (callback_count == 0) {
+      st.SkipWithError("No message was received");
+    }
 
     reset_heap_counters();
@@ -127,6 +152,60 @@ class PerformanceTestExecutor : public PerformanceTest
     }
   }
 
+  template<class Executer>
+  void benchmark_wait_for_work_force_rebuild(benchmark::State & st)
+  {
+    class ExecuterDerived : public Executer
+    {
+public:
+      void call_wait_for_work(std::chrono::nanoseconds timeout)
+      {
+        Executer::wait_for_work(timeout);
+      }
+    };
+
+    ExecuterDerived executor;
+    for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
+      executor.add_node(nodes[i]);
+    }
+
+    while (subscriptions.front()->get_publisher_count() == 0) {
+      std::this_thread::sleep_for(10ms);
+    }
+
+    // we need one ready event
+    publishers[0]->publish(empty_msgs);
+
+    executor.spin_some(100ms);
+    if (callback_count == 0) {
+      st.SkipWithError("No message was received");
+    }
+
+    // only need to force a waitset rebuild
+    rclcpp::PublisherBase::SharedPtr somePub;
+
+    reset_heap_counters();
+
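+    // Creating a fresh publisher while timing is paused invalidates the
+    // cached waitset, so the timed call_wait_for_work() below always pays
+    // for a full waitset rebuild.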
+    for (auto _ : st) {
+      (void)_;
+
+      st.PauseTiming();
+      somePub = nodes[0]->create_publisher<test_msgs::msg::Empty>(
+        "/foo", rclcpp::QoS(10));
+      // we need one ready event
+      publishers[0]->publish(empty_msgs);
+      st.ResumeTiming();
+
+      executor.call_wait_for_work(std::chrono::nanoseconds::zero());
+      st.PauseTiming();
+      executor.spin_some(100ms);
+      st.ResumeTiming();
+    }
+  }
 
   test_msgs::msg::Empty empty_msgs;
   std::vector<rclcpp::Node::SharedPtr> nodes;
   std::vector<rclcpp::Publisher<test_msgs::msg::Empty>::SharedPtr> publishers;
@@ -140,23 +219,146 @@ BENCHMARK_F(PerformanceTestExecutor, single_thread_executor_spin_some)(benchmark::State & st)
   executor_spin_some<rclcpp::executors::SingleThreadedExecutor>(st);
 }
 
+BENCHMARK_F(PerformanceTestExecutor, static_single_thread_executor_spin_some)(benchmark::State & st)
+{
+  executor_spin_some<rclcpp::executors::StaticSingleThreadedExecutor>(st);
+}
+
 BENCHMARK_F(PerformanceTestExecutor, multi_thread_executor_spin_some)(benchmark::State & st)
 {
   executor_spin_some<rclcpp::executors::MultiThreadedExecutor>(st);
 }
 
+BENCHMARK_F(PerformanceTestExecutor, cbg_executor_spin_some)(benchmark::State & st)
+{
+  executor_spin_some<rclcpp::executors::CBGExecutor>(st);
+}
+
 BENCHMARK_F(PerformanceTestExecutor, single_thread_executor_wait_for_work)(benchmark::State & st)
 {
   benchmark_wait_for_work<rclcpp::executors::SingleThreadedExecutor>(st);
 }
 
+// BENCHMARK_F(PerformanceTestExecutor, static_single_thread_executor_wait_for_work)(benchmark::State & st)
+// {
+//   benchmark_wait_for_work<rclcpp::executors::StaticSingleThreadedExecutor>(st);
+// }
+
 BENCHMARK_F(PerformanceTestExecutor, multi_thread_executor_wait_for_work)(benchmark::State & st)
 {
   benchmark_wait_for_work<rclcpp::executors::MultiThreadedExecutor>(st);
 }
 
+BENCHMARK_F(PerformanceTestExecutor, cbg_executor_wait_for_work)(benchmark::State & st)
+{
+  benchmark_wait_for_work<rclcpp::executors::CBGExecutor>(st);
+}
+
+BENCHMARK_F(PerformanceTestExecutor, single_thread_executor_wait_for_work_rebuild)(benchmark::State & st)
+{
+  benchmark_wait_for_work_force_rebuild<rclcpp::executors::SingleThreadedExecutor>(st);
+}
+
+// BENCHMARK_F(PerformanceTestExecutor, static_single_thread_executor_wait_for_work_rebuild)(benchmark::State & st)
+// {
+//   benchmark_wait_for_work_force_rebuild<rclcpp::executors::StaticSingleThreadedExecutor>(st);
+// }
+
+BENCHMARK_F(PerformanceTestExecutor, multi_thread_executor_wait_for_work_rebuild)(benchmark::State & st)
+{
+  benchmark_wait_for_work_force_rebuild<rclcpp::executors::MultiThreadedExecutor>(st);
+}
+
+BENCHMARK_F(PerformanceTestExecutor, cbg_executor_wait_for_work_rebuild)(benchmark::State & st)
+{
+  benchmark_wait_for_work_force_rebuild<rclcpp::executors::CBGExecutor>(st);
+}
+
+class CallbackWaitable : public rclcpp::Waitable
+{
+public:
+  CallbackWaitable() = default;
+
+  void
+  add_to_wait_set(rcl_wait_set_t * wait_set) override
+  {
+    rcl_ret_t ret = rcl_wait_set_add_guard_condition(
+      wait_set, &gc.get_rcl_guard_condition(), &gc_waitset_idx);
+    if (RCL_RET_OK != ret) {
+      rclcpp::exceptions::throw_from_rcl_error(
+        ret, "failed to add guard condition to wait set");
+    }
+
+    if (has_trigger) {
+      gc.trigger();
+    }
+  }
+
+  void trigger()
+  {
+    has_trigger = true;
+    gc.trigger();
+  }
+
+  bool
+  is_ready(rcl_wait_set_t * wait_set) override
+  {
+    return wait_set->guard_conditions[gc_waitset_idx];
+  }
+
+  std::shared_ptr<void>
+  take_data() override
+  {
+    return nullptr;
+  }
+
+  std::shared_ptr<void>
+  take_data_by_entity_id(size_t id) override
+  {
+    (void) id;
+    return nullptr;
+  }
+
+  void
+  execute(std::shared_ptr<void> & data) override
+  {
+    has_trigger = false;
+    (void) data;
+    if (cb_fun) {
+      cb_fun();
+    }
+  }
+
+  void set_execute_callback_function(std::function<void()> cb_fun_in)
+  {
+    cb_fun = std::move(cb_fun_in);
+  }
+
+  size_t
+  get_number_of_ready_guard_conditions() override {return 1;}
+
+private:
+  std::atomic<bool> has_trigger = false;
+  std::function<void()> cb_fun;
+  size_t gc_waitset_idx = 0;
+  rclcpp::GuardCondition gc;
+};
+
 class CascadedPerformanceTestExecutor : public PerformanceTest
 {
+  std::condition_variable cascade_done;
+  std::vector<std::shared_ptr<CallbackWaitable>> waitables;
+  std::atomic<bool> last_cb_triggered = false;
 public:
   void SetUp(benchmark::State & st)
   {
@@ -165,20 +367,44 @@ class CascadedPerformanceTestExecutor : public PerformanceTest
     for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
       nodes.push_back(std::make_shared<rclcpp::Node>("my_node_" + std::to_string(i)));
 
+      auto waitable_interfaces = nodes.back()->get_node_waitables_interface();
+
+      auto callback_waitable = std::make_shared<CallbackWaitable>();
+      waitables.push_back(callback_waitable);
+      waitable_interfaces->add_waitable(callback_waitable, nullptr);
+
       publishers.push_back(
         nodes[i]->create_publisher<test_msgs::msg::Empty>(
-          "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10)));
+          "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10)));
 
       auto callback = [this, i](test_msgs::msg::Empty::ConstSharedPtr) {
           if (i == kNumberOfNodes - 1) {
            this->callback_count++;
+            cascade_done.notify_all();
           } else {
            publishers[i + 1]->publish(empty_msgs);
           }
         };
       subscriptions.push_back(
         nodes[i]->create_subscription<test_msgs::msg::Empty>(
-          "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10), std::move(callback)));
+          "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10), std::move(callback)));
+    }
+    for (unsigned int i = 0u; i < waitables.size(); i++) {
+      if (i == waitables.size() - 1) {
+        waitables[i]->set_execute_callback_function(
+          [this]() {
+            last_cb_triggered = true;
+            cascade_done.notify_all();
+          });
+      } else {
+        waitables[i]->set_execute_callback_function(
+          [this, i]() {
+            waitables[i + 1]->trigger();
+          });
+      }
     }
     PerformanceTest::SetUp(st);
   }
@@ -188,6 +414,8 @@ class CascadedPerformanceTestExecutor : public PerformanceTest
     subscriptions.clear();
     publishers.clear();
     nodes.clear();
+    waitables.clear();
+    last_cb_triggered = false;
     rclcpp::shutdown();
   }
 
@@ -197,23 +425,52 @@ class CascadedPerformanceTestExecutor : public PerformanceTest
     Executer executor;
     for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
       executor.add_node(nodes[i]);
-      executor.spin_some(100ms);
+    }
+
+    // precompute the executor internals
+    executor.spin_some(100ms);
+
+    while (subscriptions.front()->get_publisher_count() == 0) {
+      std::this_thread::sleep_for(10ms);
     }
 
     callback_count = 0;
     reset_heap_counters();
 
+    std::mutex cond_mutex;
+    std::unique_lock<std::mutex> lk(cond_mutex);
+
+    auto thread = std::thread([&executor] {executor.spin();});
+
     for (auto _ : st) {
       (void)_;
+      last_cb_triggered = false;
+
       // start the cascade
-      publishers[0]->publish(empty_msgs);
+      waitables[0]->trigger();
 
-      callback_count = 0;
-      while (callback_count == 0) {
-        executor.spin_some(100ms);
+      cascade_done.wait_for(lk, 500ms);
+
+      if (!last_cb_triggered) {
+        st.SkipWithError("No message was received");
       }
+    }
 
-    if (callback_count == 0) {
+    executor.cancel();
+
+    thread.join();
+
+    if (!last_cb_triggered) {
       st.SkipWithError("No message was received");
     }
   }
@@ -228,21 +485,33 @@ class CascadedPerformanceTestExecutor : public PerformanceTest
 BENCHMARK_F(
   CascadedPerformanceTestExecutor,
-  single_thread_executor_spin_some)(benchmark::State & st)
+  single_thread_executor_spin)(benchmark::State & st)
 {
   executor_spin_some<rclcpp::executors::SingleThreadedExecutor>(st);
 }
 
-BENCHMARK_F(CascadedPerformanceTestExecutor, multi_thread_executor_spin_some)(benchmark::State & st)
+BENCHMARK_F(
+  CascadedPerformanceTestExecutor,
+  static_single_thread_executor_spin)(benchmark::State & st)
+{
+  executor_spin_some<rclcpp::executors::StaticSingleThreadedExecutor>(st);
+}
+
+BENCHMARK_F(CascadedPerformanceTestExecutor, multi_thread_executor_spin)(benchmark::State & st)
 {
   executor_spin_some<rclcpp::executors::MultiThreadedExecutor>(st);
 }
 
+BENCHMARK_F(CascadedPerformanceTestExecutor, cbg_executor_spin)(benchmark::State & st)
+{
+  executor_spin_some<rclcpp::executors::CBGExecutor>(st);
+}
 
 class PerformanceTestExecutorMultipleCallbackGroups : public PerformanceTest
 {
 public:
+//  static constexpr unsigned int kNumberOfNodes = 20;
   void SetUp(benchmark::State & st)
   {
@@ -251,25 +520,28 @@ class PerformanceTestExecutorMultipleCallbackGroups : public PerformanceTest
     for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
       nodes.push_back(std::make_shared<rclcpp::Node>("my_node_" + std::to_string(i)));
 
-      callback_groups.push_back(
+      for (unsigned int j = 0u; j < kNumberOfPubSubs; j++) {
+        callback_groups.push_back(
           nodes[i]->create_callback_group(
-            rclcpp::CallbackGroupType::
-            MutuallyExclusive));
+            rclcpp::CallbackGroupType::
+            MutuallyExclusive));
 
-      rclcpp::PublisherOptions pubOps;
-      pubOps.callback_group = callback_groups[i];
+        rclcpp::PublisherOptions pubOps;
+        pubOps.callback_group = callback_groups.back();
 
-      publishers.push_back(
-        nodes[i]->create_publisher<test_msgs::msg::Empty>(
-          "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10), pubOps));
+        publishers.push_back(
+          nodes[i]->create_publisher<test_msgs::msg::Empty>(
+            "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + "_" + std::to_string(j), rclcpp::QoS(10), pubOps));
 
-      rclcpp::SubscriptionOptions subOps;
-      subOps.callback_group = callback_groups[i];
+        rclcpp::SubscriptionOptions subOps;
+        subOps.callback_group = callback_groups.back();
 
-      auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;};
-      subscriptions.push_back(
-        nodes[i]->create_subscription<test_msgs::msg::Empty>(
-          "/empty_msgs_" + std::to_string(i), rclcpp::QoS(10), std::move(callback), subOps));
+        auto callback = [this](test_msgs::msg::Empty::ConstSharedPtr) {this->callback_count++;};
+        subscriptions.push_back(
+          nodes[i]->create_subscription<test_msgs::msg::Empty>(
+            "/thread" + std::to_string(st.thread_index()) + "/empty_msgs_" + std::to_string(i) + "_" + std::to_string(j), rclcpp::QoS(10), std::move(callback), subOps));
+      }
     }
     PerformanceTest::SetUp(st);
   }
@@ -289,24 +561,29 @@ class PerformanceTestExecutorMultipleCallbackGroups : public PerformanceTest
     Executer executor;
     for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
       executor.add_node(nodes[i]);
-      publishers[i]->publish(empty_msgs);
-      executor.spin_some(100ms);
     }
+    publishers.front()->publish(empty_msgs);
+    executor.spin_some(100ms);
+
     callback_count = 0;
     reset_heap_counters();
 
     for (auto _ : st) {
       (void)_;
       st.PauseTiming();
-      for (unsigned int i = 0u; i < kNumberOfNodes; i++) {
-        publishers[i]->publish(empty_msgs);
+      for (auto & pub : publishers) {
+        pub->publish(empty_msgs);
       }
+      // wait for the messages to arrive
+      std::this_thread::yield();
       st.ResumeTiming();
 
      callback_count = 0;
-      while (callback_count < kNumberOfNodes) {
-        executor.spin_some(100ms);
+      executor.spin_some(500ms);
+      if (callback_count != publishers.size()) {
+        st.SkipWithError("Not all messages were received");
       }
     }
     if (callback_count == 0) {
@@ -330,12 +607,25 @@ BENCHMARK_F(
   executor_spin_some<rclcpp::executors::SingleThreadedExecutor>(st);
 }
 
+BENCHMARK_F(
+  PerformanceTestExecutorMultipleCallbackGroups,
+  static_single_thread_executor_spin_some)(benchmark::State & st)
+{
+  executor_spin_some<rclcpp::executors::StaticSingleThreadedExecutor>(st);
+}
+
 BENCHMARK_F(
   PerformanceTestExecutorMultipleCallbackGroups,
   multi_thread_executor_spin_some)(benchmark::State & st)
 {
   executor_spin_some<rclcpp::executors::MultiThreadedExecutor>(st);
 }
+
+BENCHMARK_F(
+  PerformanceTestExecutorMultipleCallbackGroups,
+  cbg_executor_spin_some)(benchmark::State & st)
+{
+  executor_spin_some<rclcpp::executors::CBGExecutor>(st);
+}
 
 class PerformanceTestExecutorSimple : public PerformanceTest
 
@@ -465,6 +755,16 @@ BENCHMARK_F(PerformanceTestExecutorSimple, multi_thread_executor_remove_node)(benchmark::State & st)
   remove_node<rclcpp::executors::MultiThreadedExecutor>(st);
 }
 
+BENCHMARK_F(PerformanceTestExecutorSimple, cbg_executor_add_node)(benchmark::State & st)
+{
+  add_node<rclcpp::executors::CBGExecutor>(st);
+}
+
+BENCHMARK_F(PerformanceTestExecutorSimple, cbg_executor_remove_node)(benchmark::State & st)
+{
+  remove_node<rclcpp::executors::CBGExecutor>(st);
+}
+
 BENCHMARK_F(
   PerformanceTestExecutorSimple,
   static_single_thread_executor_add_node)(benchmark::State & st)
@@ -493,6 +793,13 @@ BENCHMARK_F(
   spin_node_until_future_complete<rclcpp::executors::MultiThreadedExecutor>(st);
 }
 
+BENCHMARK_F(
+  PerformanceTestExecutorSimple,
+  cbg_executor_spin_node_until_future_complete)(benchmark::State & st)
+{
+  spin_node_until_future_complete<rclcpp::executors::CBGExecutor>(st);
+}
+
 BENCHMARK_F(
   PerformanceTestExecutorSimple,
   static_single_thread_executor_spin_node_until_future_complete)(benchmark::State & st)
@@ -500,6 +807,341 @@ BENCHMARK_F(
   spin_node_until_future_complete<rclcpp::executors::StaticSingleThreadedExecutor>(st);
 }
 
+class SharedPtrHolder : public PerformanceTest
+{
+public:
+  void SetUp(benchmark::State & st)
+  {
+    shared_ptr = std::make_shared<std::string>("foo");
+    weak_ptr = shared_ptr;
+    PerformanceTest::SetUp(st);
+  }
+  void TearDown(benchmark::State & st)
+  {
+    PerformanceTest::TearDown(st);
+    shared_ptr.reset();
+    weak_ptr.reset();
+  }
+  std::shared_ptr<std::string> shared_ptr;
+  std::weak_ptr<std::string> weak_ptr;
+};
+
+BENCHMARK_F(SharedPtrHolder, shared_ptr_use_cnt)(benchmark::State & st)
+{
+  for (auto _ : st) {
+    (void)_;
+    size_t cnt = shared_ptr.use_count();
+    benchmark::DoNotOptimize(cnt);
+  }
+}
+
+BENCHMARK_F(SharedPtrHolder, weak_ptr_lock)(benchmark::State & st)
+{
+  for (auto _ : st) {
+    (void)_;
+    std::shared_ptr<std::string> ptr = weak_ptr.lock();
+    benchmark::DoNotOptimize(ptr);
+  }
+}
+
+class Foo
+{
+public:
+  Foo(size_t i)
+  : num(i) {}
+  Foo()
+  : num(0) {}
+
+  ~Foo() {}
+
+  size_t num = 0;
+  std::shared_ptr<void> foo_ptr;
+  rclcpp::AnyExecutable exec;
+};
+
+BENCHMARK_F(SharedPtrHolder, clear_std_vector)(benchmark::State & st)
+{
+  const size_t size = 80;
+
+  std::vector<Foo> tmp;
+  tmp.reserve(size);
+
+  for (auto _ : st) {
+    (void)_;
+    tmp.clear();
+    tmp.reserve(size);
+
+    for (size_t i = 0; i < size; i++) {
+      tmp.push_back(i);
+    }
+
+    benchmark::DoNotOptimize(tmp);
+  }
+}
+
+BENCHMARK_F(SharedPtrHolder, resize_zero_std_vector)(benchmark::State & st)
+{
+  const size_t size = 80;
+
+  std::vector<Foo> tmp;
+  tmp.reserve(size);
+
+  for (auto _ : st) {
+    (void)_;
+    tmp.resize(0);
+    tmp.reserve(size);
+
+    for (size_t i = 0; i < size; i++) {
+      tmp.push_back(i);
+    }
+
+    benchmark::DoNotOptimize(tmp);
+  }
+}
+
+using any_exec = std::variant<
+  rclcpp::SubscriptionBase::WeakPtr,
+  rclcpp::ServiceBase::WeakPtr,
+  rclcpp::TimerBase::WeakPtr,
+  rclcpp::ClientBase::WeakPtr,
+  rclcpp::Waitable::WeakPtr,
+  rclcpp::GuardCondition::WeakPtr>;
+
+struct StupidVariant
+{
+  enum type {
+    Subscription,
+    Service,
+    Timer,
+    Client,
+    Waitable,
+    GuardCondition,
+  };
+
+  type type;
+
+  std::weak_ptr<void> type_ptr;
+
+  StupidVariant(rclcpp::SubscriptionBase::WeakPtr ptr)
+  : type(Subscription),
+    type_ptr(std::move(ptr))
+  {
+  }
+
+  StupidVariant(rclcpp::TimerBase::WeakPtr ptr)
+  : type(Timer),
+    type_ptr(std::move(ptr))
+  {
+  }
+
+  template<class T>
+  std::shared_ptr<T> as_share_ptr() const
+  {
+    return std::static_pointer_cast<T>(type_ptr.lock());
+  }
+};
+
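+// The benchmarks below compare strategies for storing a type-erased
+// reference to an executable entity: any_exec (a std::variant of weak
+// pointers), StupidVariant (a hand-rolled tag enum plus std::weak_ptr<void>,
+// recovered via as_share_ptr<T>()), and, commented out, the executor's own
+// SubscriberRef wrapper. Insertion and retrieval costs are measured
+// separately.
+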
+BENCHMARK_F(SharedPtrHolder, variant_insert)(benchmark::State & st)
+{
+  rclcpp::init(0, nullptr);
+
+  rclcpp::Node node("FooNode");
+
+  const size_t size = 80;
+
+  std::vector<any_exec> tmp;
+
+  std::vector<rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr> ptrs;
+  ptrs.reserve(size);
+  for (size_t i = 0; i < size; i++) {
+    ptrs.emplace_back(
+      node.create_subscription<test_msgs::msg::Empty>(
+        "/foo" + std::to_string(i), rclcpp::QoS(10),
+        [](const test_msgs::msg::Empty & /*msg*/) {}));
+  }
+
+  for (auto _ : st) {
+    (void)_;
+    tmp.clear();
+    tmp.reserve(size);
+
+    for (size_t i = 0; i < size; i++) {
+      tmp.emplace_back(ptrs[i]);
+    }
+  }
+  rclcpp::shutdown();
+}
+
+BENCHMARK_F(SharedPtrHolder, push_ref)(benchmark::State & st)
+{
+  rclcpp::init(0, nullptr);
+
+  rclcpp::Node node("FooNode");
+
+  const size_t size = 80;
+
+  std::vector<any_exec> execs;
+
+  std::vector<rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr> ptrs;
+  ptrs.reserve(size);
+  for (size_t i = 0; i < size; i++) {
+    ptrs.emplace_back(
+      node.create_subscription<test_msgs::msg::Empty>(
+        "/foo" + std::to_string(i), rclcpp::QoS(10),
+        [](const test_msgs::msg::Empty & /*msg*/) {}));
+  }
+  execs.clear();
+  execs.reserve(size);
+
+  for (size_t i = 0; i < size; i++) {
+    execs.emplace_back(ptrs[i]);
+  }
+
+  std::vector<rclcpp::executors::SubscriberRef> subscribers;
+
+  for (auto _ : st) {
+    (void)_;
+
+    subscribers.clear();
+    subscribers.reserve(size);
+    for (size_t i = 0; i < size; i++) {
+      auto handle_shr_ptr = ptrs[i]->get_subscription_handle();
+
+//      subscribers.push_back(rclcpp::executors::SubscriberRef(ptrs[i], handle_shr_ptr, nullptr));
+    }
+  }
+
+  rclcpp::shutdown();
+}
+
+BENCHMARK_F(SharedPtrHolder, emplace_ref)(benchmark::State & st)
+{
+  rclcpp::init(0, nullptr);
+
+  rclcpp::Node node("FooNode");
+
+  const size_t size = 80;
+
+  std::vector<any_exec> execs;
+
+  std::vector<rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr> ptrs;
+  ptrs.reserve(size);
+  for (size_t i = 0; i < size; i++) {
+    ptrs.emplace_back(
+      node.create_subscription<test_msgs::msg::Empty>(
+        "/foo" + std::to_string(i), rclcpp::QoS(10),
+        [](const test_msgs::msg::Empty & /*msg*/) {}));
+  }
+  execs.clear();
+  execs.reserve(size);
+
+  for (size_t i = 0; i < size; i++) {
+    execs.emplace_back(ptrs[i]);
+  }
+
+  std::vector<rclcpp::executors::SubscriberRef> subscribers;
+
+  for (auto _ : st) {
+    (void)_;
+
+    subscribers.clear();
+    subscribers.reserve(size);
+    for (size_t i = 0; i < size; i++) {
+      auto handle_shr_ptr = ptrs[i]->get_subscription_handle();
+
+//      subscribers.emplace_back(rclcpp::executors::SubscriberRef(ptrs[i], handle_shr_ptr, nullptr));
+    }
+  }
+
+  rclcpp::shutdown();
+}
+
+BENCHMARK_F(SharedPtrHolder, switch_and_cast_insert)(benchmark::State & st)
+{
+  rclcpp::init(0, nullptr);
+
+  rclcpp::Node node("FooNode");
+
+  const size_t size = 80;
+
+  std::vector<StupidVariant> tmp;
+
+  std::vector<rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr> ptrs;
+  ptrs.reserve(size);
+  for (size_t i = 0; i < size; i++) {
+    ptrs.emplace_back(
+      node.create_subscription<test_msgs::msg::Empty>(
+        "/foo" + std::to_string(i), rclcpp::QoS(10),
+        [](const test_msgs::msg::Empty & /*msg*/) {}));
+  }
+
+  for (auto _ : st) {
+    (void)_;
+    tmp.clear();
+    tmp.reserve(size);
+
+    for (size_t i = 0; i < size; i++) {
+      tmp.emplace_back(ptrs[i]);
+    }
+  }
+  rclcpp::shutdown();
+}
+
+BENCHMARK_F(SharedPtrHolder, stupid_variant_get)(benchmark::State & st)
+{
+  rclcpp::init(0, nullptr);
+
+  rclcpp::Node node("FooNode");
+
+  const size_t size = 80;
+
+  std::vector<StupidVariant> execs;
+
+  std::vector<rclcpp::Subscription<test_msgs::msg::Empty>::SharedPtr> ptrs;
+  ptrs.reserve(size);
+  for (size_t i = 0; i < size; i++) {
+    ptrs.emplace_back(
+      node.create_subscription<test_msgs::msg::Empty>(
+        "/foo" + std::to_string(i), rclcpp::QoS(10),
+        [](const test_msgs::msg::Empty & /*msg*/) {}));
+  }
+  execs.clear();
+  execs.reserve(size);
+
+  for (size_t i = 0; i < size; i++) {
+    execs.emplace_back(ptrs[i]);
+  }
+
+  for (auto _ : st) {
+    (void)_;
+    for (size_t i = 0; i < size; i++) {
+      const rclcpp::SubscriptionBase::SharedPtr ref =
+        execs[i].as_share_ptr<rclcpp::SubscriptionBase>();
+
+      benchmark::DoNotOptimize(ref);
+    }
+  }
+
+  rclcpp::shutdown();
+}
+
 BENCHMARK_F(
   PerformanceTestExecutorSimple,
   static_single_thread_executor_spin_until_future_complete)(benchmark::State & st)
diff --git a/rclcpp/test/rclcpp/executors/test_executors.cpp b/rclcpp/test/rclcpp/executors/test_executors.cpp
index 35eea01038..9d99df204e 100644
--- a/rclcpp/test/rclcpp/executors/test_executors.cpp
+++ b/rclcpp/test/rclcpp/executors/test_executors.cpp
@@ -40,6 +40,8 @@
 #include "test_msgs/msg/empty.hpp"
 #include "rosgraph_msgs/msg/clock.hpp"
 
+#include "rclcpp/executors/cbg_executor.hpp"
+#include "test_msgs/srv/empty.hpp"
 
 using namespace std::chrono_literals;
 
@@ -91,6 +93,7 @@ using ExecutorTypes =
     rclcpp::executors::SingleThreadedExecutor,
     rclcpp::executors::MultiThreadedExecutor,
     rclcpp::executors::StaticSingleThreadedExecutor,
+    rclcpp::executors::CBGExecutor,
     rclcpp::experimental::executors::EventsExecutor>;
 
 class ExecutorTypeNames
@@ -112,6 +115,10 @@ class ExecutorTypeNames
       return "StaticSingleThreadedExecutor";
     }
 
+    if (std::is_same<T, rclcpp::executors::CBGExecutor>()) {
+      return "CallbackGroupExecutor";
+    }
+
     if (std::is_same<T, rclcpp::experimental::executors::EventsExecutor>()) {
       return "EventsExecutor";
     }
@@ -130,6 +137,7 @@
 using StandardExecutors =
   ::testing::Types<
     rclcpp::executors::SingleThreadedExecutor,
     rclcpp::executors::MultiThreadedExecutor,
+    rclcpp::executors::CBGExecutor,
     rclcpp::experimental::executors::EventsExecutor>;
 TYPED_TEST_SUITE(TestExecutorsStable, StandardExecutors, ExecutorTypeNames);
 
@@ -607,6 +615,143 @@ TYPED_TEST(TestExecutors, testSpinUntilFutureCompleteInterrupted)
   spinner.join();
 }
 
+// Check if services work as expected
+TYPED_TEST(TestExecutors, testService)
+{
+  using ExecutorType = TypeParam;
+  // rmw_connextdds doesn't support events-executor
+  if (
+    std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
+    std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
+  {
+    GTEST_SKIP();
+  }
+
+  ExecutorType executor;
+  executor.add_node(this->node);
+
+  bool spin_exited = false;
+
+  rclcpp::Node::SharedPtr node = this->node;
+
+  const std::string service_name("/test/test_service");
+
+  using Service = test_msgs::srv::Empty;
+
+  bool gotCallback = false;
+
+  auto service_cb =
+    [&gotCallback](const std::shared_ptr<Service::Request> /*request*/,
+      std::shared_ptr<Service::Response> /*response*/)
+    {
+      gotCallback = true;
+    };
+
+  auto service = node->create_service<Service>(service_name, service_cb, rclcpp::ServicesQoS());
+
+  // Long timeout
+  std::thread spinner(
+    [&spin_exited, &executor]() {
+      executor.spin();
+      spin_exited = true;
+    });
+
+  std::this_thread::sleep_for(1ms);
+
+  const std::shared_ptr<Service::Request> req = std::make_shared<Service::Request>();
+
+  auto client = node->create_client<Service>(service_name);
+
+  EXPECT_TRUE(client->wait_for_service(30ms));
+
+  auto handle = client->async_send_request(req);
+
+  auto retCode = handle.wait_for(500ms);
+  EXPECT_EQ(retCode, std::future_status::ready);
+
+  EXPECT_TRUE(gotCallback);
+
+  // Force interruption
+  rclcpp::shutdown();
+
+  // Give it time to exit
+  auto start = std::chrono::steady_clock::now();
+  while (!spin_exited && (std::chrono::steady_clock::now() - start) < 1s) {
+    std::this_thread::sleep_for(1ms);
+  }
+
+  EXPECT_TRUE(spin_exited);
+  spinner.join();
+}
+
+// test if objects work that were added after spinning started
+TYPED_TEST(TestExecutors, addAfterSpin)
+{
+  using ExecutorType = TypeParam;
+  // rmw_connextdds doesn't support events-executor
+  if (
+    std::is_same<ExecutorType, rclcpp::experimental::executors::EventsExecutor>() &&
+    std::string(rmw_get_implementation_identifier()).find("rmw_connextdds") == 0)
+  {
+    GTEST_SKIP();
+  }
+
+  ExecutorType executor;
+  executor.add_node(this->node);
+
+  bool spin_exited = false;
+
+  // Long timeout
+  std::thread spinner(
+    [&spin_exited, &executor]() {
+      executor.spin();
+      spin_exited = true;
+    });
+
+  std::this_thread::sleep_for(10ms);
+
+  rclcpp::Node::SharedPtr node = this->node;
+
+  const std::string service_name("/test/test_service");
+
+  using Service = test_msgs::srv::Empty;
+
+  bool gotCallback = false;
+
+  auto service_cb =
+    [&gotCallback](const std::shared_ptr<Service::Request> /*request*/,
+      std::shared_ptr<Service::Response> /*response*/)
+    {
+      gotCallback = true;
+    };
+
+  auto service = node->create_service<Service>(service_name, service_cb, rclcpp::ServicesQoS());
+
+  const std::shared_ptr<Service::Request> req = std::make_shared<Service::Request>();
+
+  auto client = node->create_client<Service>(service_name);
+
+  EXPECT_TRUE(client->wait_for_service(30ms));
+
+  auto handle = client->async_send_request(req);
+
+  auto retCode = handle.wait_for(500ms);
+  EXPECT_EQ(retCode, std::future_status::ready);
+
+  EXPECT_TRUE(gotCallback);
+
+  // Force interruption
+  rclcpp::shutdown();
+
+  // Give it time to exit
+  auto start = std::chrono::steady_clock::now();
+  while (!spin_exited && (std::chrono::steady_clock::now() - start) < 1s) {
+    std::this_thread::sleep_for(1ms);
+  }
+
+  EXPECT_TRUE(spin_exited);
+  spinner.join();
+}
+
 // This test verifies that the add_node operation is robust wrt race conditions.
 // It's mostly meant to prevent regressions in the events-executor, but the operation should be
 // thread-safe in all executor implementations.