diff --git a/include/alpaka/core/CallbackThread.hpp b/include/alpaka/core/CallbackThread.hpp index aa873c709de2..24967a53db8a 100644 --- a/include/alpaka/core/CallbackThread.hpp +++ b/include/alpaka/core/CallbackThread.hpp @@ -16,7 +16,10 @@ namespace alpaka::core { class CallbackThread { + // std::packaged_task is used because std::function requires that the wrapped callable is copyable. + //! \todo with C++23 std::move_only_function should be used using Task = std::packaged_task; + using TaskPackage = std::pair>; public: ~CallbackThread() @@ -38,11 +41,8 @@ namespace alpaka::core } } - // Note: due to different std lib implementations of packaged_task, the lifetime of the passed function either - // ends when the packaged_task is destroyed (while the returned future is still alive) or when both are - // destroyed. Therefore, ensure that a submitted task does not extend the lifetime of any object that - // (transitively) holds the returned future. E.g. don't capture a shared_ptr to an alpaka object that stores - // the returned future. This is a cyclic dependency and creates a leak. + //! It is guaranteed that the task is fully destroyed before the future's result is set. + //! @{ template auto submit(NullaryFunction&& nf) -> std::future { @@ -54,11 +54,13 @@ namespace alpaka::core auto submit(Task task) -> std::future { - auto f = task.get_future(); + // We do not use the future of std::packed_task because the future will keep the task alive + // and we can not control the moment the future is set. + auto tp = std::make_pair(std::move(task), std::promise{}); + auto f = tp.second.get_future(); { std::unique_lock lock{m_mutex}; - ++m_tasksInProgress; - m_tasks.emplace(std::move(task)); + m_tasks.emplace(std::move(tp)); if(!m_thread.joinable()) startWorkerThread(); m_cond.notify_one(); @@ -67,9 +69,15 @@ namespace alpaka::core return f; } - [[nodiscard]] auto empty() const + //! @} + + //! @return True if queue is empty and no task is executed else false. + //! If only one tasks is enqueued and the task is executed the task will see the queue as not empty. + //! During the destruction of this single enqueued task the queue will already be accounted as empty. + [[nodiscard]] auto empty() { - return m_tasksInProgress == 0; + std::unique_lock lock{m_mutex}; + return m_tasks.empty(); } private: @@ -77,8 +85,7 @@ namespace alpaka::core std::condition_variable m_cond; std::mutex m_mutex; bool m_stop{false}; - std::queue m_tasks; - std::atomic m_tasksInProgress{0}; + std::queue m_tasks; auto startWorkerThread() -> void { @@ -87,9 +94,9 @@ namespace alpaka::core { while(true) { + std::promise taskPromise; { - // Do not move the tasks out of the loop else the lifetime could be extended until the - // moment where the callback thread is destructed. + // Task is destroyed before promise is updated but after the queue state is up to date. Task task; { std::unique_lock lock{m_mutex}; @@ -98,13 +105,21 @@ namespace alpaka::core if(m_stop && m_tasks.empty()) break; - task = std::move(m_tasks.front()); - m_tasks.pop(); + task = std::move(m_tasks.front().first); + taskPromise = std::move(m_tasks.front().second); } - task(); + { + std::unique_lock lock{m_mutex}; + // Pop empty data from the queue, task and promise will be destroyed later in a + // well-defined order. + m_tasks.pop(); + } + // Task will be destroyed here, the queue status is already updated. } - --m_tasksInProgress; + // In case the executed tasks is the last task in the queue the waiting threads will see the + // queue as empty. + taskPromise.set_value(); } }); } diff --git a/include/alpaka/event/EventGenericThreads.hpp b/include/alpaka/event/EventGenericThreads.hpp index 032bc7e9400a..aa9aa5b2f48f 100644 --- a/include/alpaka/event/EventGenericThreads.hpp +++ b/include/alpaka/event/EventGenericThreads.hpp @@ -154,8 +154,6 @@ namespace alpaka { spEventImpl->m_LastReadyEnqueueCount = spEventImpl->m_enqueueCount; } - spEventImpl - .reset(); // avoid keeping the event alive as part of the background thread task's future }); } }; @@ -304,8 +302,6 @@ namespace alpaka { std::unique_lock lk2(spEventImpl->m_mutex); spEventImpl->wait(enqueueCount, lk2); - spEventImpl.reset(); // avoid keeping the event alive as part of the background thread - // task's future }); } } @@ -376,9 +372,12 @@ namespace alpaka { ALPAKA_FN_HOST static auto currentThreadWaitFor(QueueGenericThreadsNonBlocking const& queue) -> void { - EventGenericThreads event(getDev(queue)); - alpaka::enqueue(const_cast&>(queue), event); - wait(event); + // Enqueue a dummy tasks into the worker thread of the queue will provide a future we can wait for. + // Previously we enqueued an event into the queue but this will not guarantee that queue is empty + // after the event is finished because the event handling can be finished before the event task is + // fully removed from the queue. + auto f = queue.m_spQueueImpl->m_workerThread.submit([]() noexcept {}); + f.wait(); } }; } // namespace trait diff --git a/include/alpaka/queue/cuda_hip/QueueUniformCudaHipRt.hpp b/include/alpaka/queue/cuda_hip/QueueUniformCudaHipRt.hpp index 29a5c66a5976..a68b5ea2cb71 100644 --- a/include/alpaka/queue/cuda_hip/QueueUniformCudaHipRt.hpp +++ b/include/alpaka/queue/cuda_hip/QueueUniformCudaHipRt.hpp @@ -203,14 +203,9 @@ namespace alpaka { auto data = std::unique_ptr(reinterpret_cast(arg)); auto& queue = data->q; - auto f = queue.m_callbackThread.submit( - [data = std::move(data)]() mutable - { - data->t(); - data.reset(); // destroy the task - }); + auto f = queue.m_callbackThread.submit([data = std::move(data)] { data->t(); }); f.wait(); - } // destroys the future `f`, destroying the packaged task and the above lambda + } ALPAKA_FN_HOST static auto enqueue( uniform_cuda_hip::detail::QueueUniformCudaHipRt& queue, diff --git a/include/alpaka/test/event/EventHostManualTrigger.hpp b/include/alpaka/test/event/EventHostManualTrigger.hpp index b6318527c59d..85891c0fbe82 100644 --- a/include/alpaka/test/event/EventHostManualTrigger.hpp +++ b/include/alpaka/test/event/EventHostManualTrigger.hpp @@ -185,8 +185,6 @@ namespace alpaka::trait lk2, [spEventImpl, enqueueCount] { return (enqueueCount != spEventImpl->m_enqueueCount) || spEventImpl->m_bIsReady; }); - spEventImpl - .reset(); // avoid keeping the event alive as part of the background thread task's future }); } };