Skip to content

Commit

Permalink
fix callback thread task lifetime
Browse files Browse the repository at this point in the history
fix:  alpaka-group#2007 alpaka-group#1936

Rewrite the callback thread to have a well defined lifetime of the
enqueued tasks and the moment when the future will be set after the
tasks is executed.
The lifetime issue of data captured by an lambda introduced with alpaka-group#1870
is solved to and explicit destroying data is not required anymore.
  • Loading branch information
psychocoderHPC authored and j-stephan committed Jul 28, 2023
1 parent 164edf1 commit ab4eb3f
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 34 deletions.
51 changes: 33 additions & 18 deletions include/alpaka/core/CallbackThread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ namespace alpaka::core
{
class CallbackThread
{
// std::packaged_task is used because std::function requires that the wrapped callable is copyable.
//! \todo with C++23 std::move_only_function should be used
using Task = std::packaged_task<void()>;
using TaskPackage = std::pair<Task, std::promise<void>>;

public:
~CallbackThread()
Expand All @@ -38,11 +41,8 @@ namespace alpaka::core
}
}

// Note: due to different std lib implementations of packaged_task, the lifetime of the passed function either
// ends when the packaged_task is destroyed (while the returned future is still alive) or when both are
// destroyed. Therefore, ensure that a submitted task does not extend the lifetime of any object that
// (transitively) holds the returned future. E.g. don't capture a shared_ptr to an alpaka object that stores
// the returned future. This is a cyclic dependency and creates a leak.
//! It is guaranteed that the task is fully destroyed before the future's result is set.
//! @{
template<typename NullaryFunction>
auto submit(NullaryFunction&& nf) -> std::future<void>
{
Expand All @@ -54,11 +54,13 @@ namespace alpaka::core

auto submit(Task task) -> std::future<void>
{
auto f = task.get_future();
// We do not use the future of std::packed_task because the future will keep the task alive
// and we can not control the moment the future is set.
auto tp = std::make_pair(std::move(task), std::promise<void>{});
auto f = tp.second.get_future();
{
std::unique_lock<std::mutex> lock{m_mutex};
++m_tasksInProgress;
m_tasks.emplace(std::move(task));
m_tasks.emplace(std::move(tp));
if(!m_thread.joinable())
startWorkerThread();
m_cond.notify_one();
Expand All @@ -67,18 +69,23 @@ namespace alpaka::core
return f;
}

[[nodiscard]] auto empty() const
//! @}

//! @return True if queue is empty and no task is executed else false.
//! If only one tasks is enqueued and the task is executed the task will see the queue as not empty.
//! During the destruction of this single enqueued task the queue will already be accounted as empty.
[[nodiscard]] auto empty()
{
return m_tasksInProgress == 0;
std::unique_lock<std::mutex> lock{m_mutex};
return m_tasks.empty();
}

private:
std::thread m_thread;
std::condition_variable m_cond;
std::mutex m_mutex;
bool m_stop{false};
std::queue<Task> m_tasks;
std::atomic<int> m_tasksInProgress{0};
std::queue<TaskPackage> m_tasks;

auto startWorkerThread() -> void
{
Expand All @@ -87,9 +94,9 @@ namespace alpaka::core
{
while(true)
{
std::promise<void> taskPromise;
{
// Do not move the tasks out of the loop else the lifetime could be extended until the
// moment where the callback thread is destructed.
// Task is destroyed before promise is updated but after the queue state is up to date.
Task task;
{
std::unique_lock<std::mutex> lock{m_mutex};
Expand All @@ -98,13 +105,21 @@ namespace alpaka::core
if(m_stop && m_tasks.empty())
break;

task = std::move(m_tasks.front());
m_tasks.pop();
task = std::move(m_tasks.front().first);
taskPromise = std::move(m_tasks.front().second);
}

task();
{
std::unique_lock<std::mutex> lock{m_mutex};
// Pop empty data from the queue, task and promise will be destroyed later in a
// well-defined order.
m_tasks.pop();
}
// Task will be destroyed here, the queue status is already updated.
}
--m_tasksInProgress;
// In case the executed tasks is the last task in the queue the waiting threads will see the
// queue as empty.
taskPromise.set_value();
}
});
}
Expand Down
13 changes: 6 additions & 7 deletions include/alpaka/event/EventGenericThreads.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,6 @@ namespace alpaka
{
spEventImpl->m_LastReadyEnqueueCount = spEventImpl->m_enqueueCount;
}
spEventImpl
.reset(); // avoid keeping the event alive as part of the background thread task's future
});
}
};
Expand Down Expand Up @@ -304,8 +302,6 @@ namespace alpaka
{
std::unique_lock<std::mutex> lk2(spEventImpl->m_mutex);
spEventImpl->wait(enqueueCount, lk2);
spEventImpl.reset(); // avoid keeping the event alive as part of the background thread
// task's future
});
}
}
Expand Down Expand Up @@ -376,9 +372,12 @@ namespace alpaka
{
ALPAKA_FN_HOST static auto currentThreadWaitFor(QueueGenericThreadsNonBlocking<TDev> const& queue) -> void
{
EventGenericThreads<TDev> event(getDev(queue));
alpaka::enqueue(const_cast<QueueGenericThreadsNonBlocking<TDev>&>(queue), event);
wait(event);
// Enqueue a dummy tasks into the worker thread of the queue will provide a future we can wait for.
// Previously we enqueued an event into the queue but this will not guarantee that queue is empty
// after the event is finished because the event handling can be finished before the event task is
// fully removed from the queue.
auto f = queue.m_spQueueImpl->m_workerThread.submit([]() noexcept {});
f.wait();
}
};
} // namespace trait
Expand Down
9 changes: 2 additions & 7 deletions include/alpaka/queue/cuda_hip/QueueUniformCudaHipRt.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,14 +203,9 @@ namespace alpaka
{
auto data = std::unique_ptr<HostFuncData>(reinterpret_cast<HostFuncData*>(arg));
auto& queue = data->q;
auto f = queue.m_callbackThread.submit(
[data = std::move(data)]() mutable
{
data->t();
data.reset(); // destroy the task
});
auto f = queue.m_callbackThread.submit([data = std::move(data)] { data->t(); });
f.wait();
} // destroys the future `f`, destroying the packaged task and the above lambda
}

ALPAKA_FN_HOST static auto enqueue(
uniform_cuda_hip::detail::QueueUniformCudaHipRt<TApi, TBlocking>& queue,
Expand Down
2 changes: 0 additions & 2 deletions include/alpaka/test/event/EventHostManualTrigger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,6 @@ namespace alpaka::trait
lk2,
[spEventImpl, enqueueCount]
{ return (enqueueCount != spEventImpl->m_enqueueCount) || spEventImpl->m_bIsReady; });
spEventImpl
.reset(); // avoid keeping the event alive as part of the background thread task's future
});
}
};
Expand Down

0 comments on commit ab4eb3f

Please sign in to comment.