From 9dc8ae413b4a52495876c5efb86e01186efdc111 Mon Sep 17 00:00:00 2001 From: Lei Zhang Date: Mon, 26 Feb 2024 20:21:15 -0800 Subject: [PATCH] [cuda][hip] Fix launch host func and worker thread state update (#16568) This commits fixes a few issues in pending action queue to resolve driver deadlock issues: * In host launch func, which is called from a driver thread, we cannot invoke any GPU API. Otherwise we might see deadlock. This includes cleaning up the actions after execution--it may involve buffer releasing/unregistering which was the issue causing hip driver hang. Now move this cleanup into the worker thread. This is done by adding a state field to each action to indicate whether it's alive or zombie. We enqueue each action again after done execution by flipping its state to zombie to let the worker thread to cleanup. * The worker thread can have five states--two normal states (idle waiting or workload pending), three exit states (requested, committed, error). They have increasing priorities w.r.t. overwriting. We cannot overwrite state later in the list without checking. This guarantees that exit requests are properly respected and not dropping to the floor so to have clean exit. * When the worker thread is waken to process ready list, we need to immediately flip the worker state from workload pending to idle waiting, before any real processing. This makes sure we don't drop new workload enqueued while we are processing, and the worker thread can be waken up again properly later. With the above fixes, we can pass all stablehlo/tosa e2e op tests on hip driver without hang or crashes. The same change is mirrored to the cuda pending action queue. Fixes https://github.com/openxla/iree/issues/15790 Progress towards https://github.com/openxla/iree/issues/16504 --- experimental/hip/pending_queue_actions.c | 201 ++++++++++++------ .../hal/drivers/cuda/pending_queue_actions.c | 201 ++++++++++++------ 2 files changed, 278 insertions(+), 124 deletions(-) diff --git a/experimental/hip/pending_queue_actions.c b/experimental/hip/pending_queue_actions.c index cf63c45f06af..7f346a3d334f 100644 --- a/experimental/hip/pending_queue_actions.c +++ b/experimental/hip/pending_queue_actions.c @@ -17,6 +17,7 @@ #include "iree/base/api.h" #include "iree/base/internal/arena.h" #include "iree/base/internal/atomic_slist.h" +#include "iree/base/internal/atomics.h" #include "iree/base/internal/synchronization.h" #include "iree/base/internal/threading.h" #include "iree/hal/api.h" @@ -35,6 +36,13 @@ typedef enum iree_hal_hip_queue_action_kind_e { // TODO: Add support for queue alloca and dealloca. } iree_hal_hip_queue_action_kind_t; +typedef enum iree_hal_hip_queue_action_state_e { + // The current action is active as waiting for or under execution. + IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE, + // The current action is done execution and waiting for destruction. + IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE, +} iree_hal_hip_queue_action_state_t; + // A pending queue action. // // Note that this struct does not have internal synchronization; it's expected @@ -49,6 +57,11 @@ typedef struct iree_hal_hip_queue_action_t { // Retained to make sure it outlives the current action. iree_hal_hip_pending_queue_actions_t* owning_actions; + // The current state of this action. When an action is initially created it + // will be alive and enqueued to wait for releasing to the GPU. After done + // execution, it will be flipped into zombie state and enqueued again for + // destruction. + iree_hal_hip_queue_action_state_t state; // The callback to run after completing this action and before freeing // all resources. Can be NULL. iree_hal_hip_pending_action_cleanup_callback_t cleanup_callback; @@ -83,6 +96,7 @@ typedef struct iree_hal_hip_queue_action_t { // Scratch fields for analyzing whether actions are ready to issue. hipEvent_t events[IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT]; iree_host_size_t event_count; + // Whether the current action is still not ready for releasing to the GPU. bool is_pending; } iree_hal_hip_queue_action_t; @@ -139,7 +153,7 @@ static void iree_hal_hip_queue_action_list_erase( static void iree_hal_hip_queue_action_list_take_all( iree_hal_hip_queue_action_list_t* available_list, iree_hal_hip_queue_action_list_t* ready_list) { - IREE_ASSERT(available_list != ready_list); + IREE_ASSERT_NE(available_list, ready_list); ready_list->head = available_list->head; ready_list->tail = available_list->tail; available_list->head = NULL; @@ -173,12 +187,16 @@ IREE_TYPED_ATOMIC_SLIST_WRAPPER(iree_hal_hip_ready_action, slist_next)); // The ready-list processing worker's working/exiting state. +// +// States in the list has increasing priorities--meaning normally ones appearing +// earlier can overwrite ones appearing later without checking; but not the +// reverse order. typedef enum iree_hal_hip_worker_state_e { - IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING = 0, - IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING = 1, - IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED = -1, - IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED = -2, - IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR = -3, + IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING = 0, // Worker to main thread + IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING = 1, // Main to worker thread + IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED = -1, // Main to worker thread + IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED = -2, // Worker to main thread + IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR = -3, // Worker to main thread } iree_hal_hip_worker_state_t; // The data structure needed by a ready-list processing worker thread to issue @@ -329,6 +347,7 @@ void iree_hal_hip_pending_queue_actions_destroy( iree_memory_order_acq_rel); iree_notification_post(&working_area->state_notification, IREE_ALL_WAITERS); + // Check potential exit states from the worker. if (prev_state != IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR) { // Wait until the worker acknowledged exiting. iree_notification_await( @@ -424,12 +443,16 @@ iree_status_t iree_hal_hip_pending_queue_actions_enqueue_execution( z0, iree_allocator_malloc(actions->host_allocator, sizeof(*action), (void**)&action)); - action->kind = IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION; + action->owning_actions = actions; + action->state = IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE; action->cleanup_callback = cleanup_callback; action->callback_user_data = callback_user_data; + action->kind = IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION; action->device = device; action->dispatch_hip_stream = dispatch_stream; action->callback_hip_stream = callback_stream; + + // Initialize scratch fields. action->event_count = 0; action->is_pending = true; @@ -451,6 +474,9 @@ iree_status_t iree_hal_hip_pending_queue_actions_enqueue_execution( iree_hal_resource_set_insert(resource_set, signal_semaphore_list.count, signal_semaphore_list.semaphores); } + if (IREE_LIKELY(iree_status_is_ok(status))) { + action->resource_set = resource_set; + } // Copy the command buffer list for later access. // TODO: avoid host allocator malloc; use some pool for the allocation. @@ -475,11 +501,10 @@ iree_status_t iree_hal_hip_pending_queue_actions_enqueue_execution( } if (IREE_LIKELY(iree_status_is_ok(status))) { - action->owning_actions = actions; + // Retain the owning queue to make sure the action outlives it. iree_hal_resource_retain(actions); - action->resource_set = resource_set; - + // Now everything is okay and we can enqueue the action. iree_slim_mutex_lock(&actions->action_mutex); iree_hal_hip_queue_action_list_push_back(&actions->action_list, action); iree_slim_mutex_unlock(&actions->action_mutex); @@ -498,34 +523,42 @@ iree_status_t iree_hal_hip_pending_queue_actions_enqueue_execution( return status; } -static void iree_hal_hip_pending_queue_actions_cleanup_execution( - iree_hal_hip_queue_action_t* action); - // Releases resources after action completion on the GPU and advances timeline // and pending actions queue. // -// This is the HIP host function callback to hipLaunchHostFunc, invoked by a -// HIP driver thread. +// This is the HIP host function callback to hipLaunchHostFunc(), invoked by a +// HIP driver thread. Note that code in this function MUST NOT invoke any GPU +// API under the hood to avoid potential deadlock. static void iree_hal_hip_execution_device_signal_host_callback( void* user_data) { IREE_TRACE_ZONE_BEGIN(z0); iree_hal_hip_queue_action_t* action = (iree_hal_hip_queue_action_t*)user_data; + IREE_ASSERT_EQ(action->kind, IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION); + IREE_ASSERT_EQ(action->state, IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE); iree_hal_hip_pending_queue_actions_t* actions = action->owning_actions; + + // Flip the action state to zombie and enqueue it again so that we can let + // the worker thread clean it up. Note that this is necessary because cleanup + // may involve GPU API calls like buffer releasing or unregistering, so we can + // not inline it here. + action->state = IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE; + iree_slim_mutex_lock(&actions->action_mutex); + iree_hal_hip_queue_action_list_push_back(&actions->action_list, action); + iree_slim_mutex_unlock(&actions->action_mutex); + // Advance semaphore timelines by calling into the host signaling function. + // This will internally try to release more workload to the GPU. IREE_IGNORE_ERROR( iree_hal_semaphore_list_signal(action->signal_semaphore_list)); - // Destroy the current action given its done now--this also frees all retained - // resources. - iree_hal_hip_pending_queue_actions_cleanup_execution(action); - // Try to release more pending actions to the GPU now. - IREE_IGNORE_ERROR(iree_hal_hip_pending_queue_actions_issue(actions)); + IREE_TRACE_ZONE_END(z0); } // Issues the given kernel dispatch |action| to the GPU. static iree_status_t iree_hal_hip_pending_queue_actions_issue_execution( iree_hal_hip_queue_action_t* action) { - IREE_ASSERT(action->is_pending == false); + IREE_ASSERT_EQ(action->kind, IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION); + IREE_ASSERT_EQ(action->is_pending, false); const iree_hal_hip_dynamic_symbols_t* symbols = action->owning_actions->symbols; IREE_TRACE_ZONE_BEGIN(z0); @@ -605,27 +638,32 @@ static iree_status_t iree_hal_hip_pending_queue_actions_issue_execution( return iree_ok_status(); } -// Releases resources after completing the given kernel dispatch |action|. -static void iree_hal_hip_pending_queue_actions_cleanup_execution( +// Performs the given cleanup |action| on the CPU. +static iree_status_t iree_hal_hip_pending_queue_actions_issue_cleanup( iree_hal_hip_queue_action_t* action) { iree_hal_hip_pending_queue_actions_t* actions = action->owning_actions; iree_allocator_t host_allocator = actions->host_allocator; IREE_TRACE_ZONE_BEGIN(z0); + // Call user provided callback before releasing any resource. if (action->cleanup_callback) { action->cleanup_callback(action->callback_user_data); } + // Only release resources after callbacks have been issued. iree_hal_resource_set_free(action->resource_set); iree_hal_hip_free_semaphore_list(host_allocator, &action->wait_semaphore_list); iree_hal_hip_free_semaphore_list(host_allocator, &action->signal_semaphore_list); + + // Drop reference to the pending action queue given now we are done. iree_hal_resource_release(actions); iree_allocator_free(host_allocator, action); IREE_TRACE_ZONE_END(z0); + return iree_ok_status(); } iree_status_t iree_hal_hip_pending_queue_actions_issue( @@ -657,35 +695,39 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue( action->event_count = 0; action->is_pending = false; - // Look at all wait semaphores. - for (iree_host_size_t i = 0; i < semaphore_count; ++i) { - // If this semaphore has already signaled past the desired value, we can - // just ignore it. - uint64_t value = 0; - status = iree_hal_semaphore_query(semaphores[i], &value); - if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; - if (value >= values[i]) continue; - - // Try to acquire a hipEvent_t from a device wait timepoint. If so, we can - // use that hipEvent_t to wait on the device. Otherwise, this action is - // still not ready. - hipEvent_t event = NULL; - status = iree_hal_hip_event_semaphore_acquire_timepoint_device_wait( - semaphores[i], values[i], &event); - if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; - if (!event) { - // Clear the scratch fields. - action->event_count = 0; - action->is_pending = true; - break; - } - if (IREE_UNLIKELY(action->event_count >= - IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT)) { - status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED, - "exceeded max wait hipEvent_t limit"); - break; + // Cleanup actions are immediately ready to release. Otherwise, look at all + // wait semaphores to make sure that they are either already ready or we can + // wait on a device event. + if (action->state == IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE) { + for (iree_host_size_t i = 0; i < semaphore_count; ++i) { + // If this semaphore has already signaled past the desired value, we can + // just ignore it. + uint64_t value = 0; + status = iree_hal_semaphore_query(semaphores[i], &value); + if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; + if (value >= values[i]) continue; + + // Try to acquire a hipEvent_t from a device wait timepoint. If so, we + // can use that hipEvent_t to wait on the device. Otherwise, this action + // is still not ready. + hipEvent_t event = NULL; + status = iree_hal_hip_event_semaphore_acquire_timepoint_device_wait( + semaphores[i], values[i], &event); + if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; + if (!event) { + // Clear the scratch fields. + action->event_count = 0; + action->is_pending = true; + break; + } + if (IREE_UNLIKELY(action->event_count >= + IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT)) { + status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED, + "exceeded max wait hipEvent_t limit"); + break; + } + action->events[action->event_count++] = event; } - action->events[action->event_count++] = event; } if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; @@ -712,6 +754,12 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue( iree_slim_mutex_unlock(&actions->action_mutex); + if (ready_list.head == NULL) { + // Nothing ready yet. Just return. + IREE_TRACE_ZONE_END(z0); + return status; + } + iree_hal_hip_atomic_slist_entry_t* entry = NULL; // TODO: avoid host allocator malloc; use some pool for the allocation. if (iree_status_is_ok(status)) { @@ -723,6 +771,7 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue( // Release all actions in the ready list to avoid leaking. iree_hal_hip_queue_action_list_free_actions(actions->host_allocator, &ready_list); + iree_allocator_free(actions->host_allocator, entry); IREE_TRACE_ZONE_END(z0); return status; } @@ -732,11 +781,17 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue( entry->ready_list_head = ready_list.head; iree_hal_hip_ready_action_slist_push(&actions->working_area.ready_worklist, entry); + + // We can only overwrite the worker state if the previous state is idle + // waiting; we cannot overwrite exit related states. so we need to perform + // atomic compare and exchange here. iree_hal_hip_worker_state_t prev_state = - (iree_hal_hip_worker_state_t)iree_atomic_exchange_int32( - &actions->working_area.worker_state, - IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING, - iree_memory_order_acq_rel); + IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING; + iree_atomic_compare_exchange_strong_int32( + &actions->working_area.worker_state, /*expected=*/&prev_state, + /*desired=*/IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING, + /*order_succ=*/iree_memory_order_acq_rel, + /*order_fail=*/iree_memory_order_acquire); iree_notification_post(&actions->working_area.state_notification, IREE_ALL_WAITERS); @@ -759,6 +814,8 @@ static bool iree_hal_hip_worker_has_incoming_request( iree_hal_hip_working_area_t* working_area) { iree_hal_hip_worker_state_t value = iree_atomic_load_int32( &working_area->worker_state, iree_memory_order_acquire); + // These are the only two possible states that set from the main thread to + // the worker thread. return value == IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING || value == IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED; } @@ -788,7 +845,14 @@ static iree_status_t iree_hal_hip_worker_process_ready_list( iree_hal_hip_queue_action_t* next_action = action->next; action->next = NULL; - status = iree_hal_hip_pending_queue_actions_issue_execution(action); + switch (action->state) { + case IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE: + status = iree_hal_hip_pending_queue_actions_issue_execution(action); + break; + case IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE: + status = iree_hal_hip_pending_queue_actions_issue_cleanup(action); + break; + } if (!iree_status_is_ok(status)) break; action->event_count = 0; @@ -814,11 +878,26 @@ static int iree_hal_hip_worker_execute( (iree_condition_fn_t)iree_hal_hip_worker_has_incoming_request, working_area, iree_infinite_timeout()); + // Immediately flip the state to idle waiting if and only if the previous + // state is workload pending. We do it before processing ready list to make + // sure that we don't accidentally ignore new workload pushed after done + // ready list processing but before overwriting the state from this worker + // thread. Also we don't want to overwrite other exit states. So we need to + // perform atomic compare and exchange here. + iree_hal_hip_worker_state_t prev_state = + IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING; + iree_atomic_compare_exchange_strong_int32( + &working_area->worker_state, /*expected=*/&prev_state, + /*desired=*/IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING, + /*order_succ=*/iree_memory_order_acq_rel, + /*order_fail=*/iree_memory_order_acquire); + // Check if we received request to stop processing and exit this thread. bool should_exit = iree_atomic_load_int32(&working_area->worker_state, iree_memory_order_acquire) == IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED; + // Process the ready list. We also want this even requested to exit. iree_status_t status = iree_hal_hip_worker_process_ready_list( working_area->host_allocator, worklist); if (IREE_UNLIKELY(!iree_status_is_ok(status))) { @@ -826,6 +905,7 @@ static int iree_hal_hip_worker_execute( iree_atomic_store_int32(&working_area->error_code, iree_status_code(status), iree_memory_order_release); + // This state has the highest priority so just overwrite. iree_atomic_store_int32(&working_area->worker_state, IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR, iree_memory_order_release); @@ -835,7 +915,9 @@ static int iree_hal_hip_worker_execute( } if (should_exit) { - // Signal that this thread is committed to exit. + // Signal that this thread is committed to exit. This state has a priority + // that is only lower than error exit. And we just checked error exit in + // the above. So also just overwrite. iree_atomic_store_int32(&working_area->worker_state, IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED, iree_memory_order_release); @@ -843,11 +925,6 @@ static int iree_hal_hip_worker_execute( IREE_ALL_WAITERS); return 0; } - - // Signal that this thread is done processing and now waiting for more. - iree_atomic_store_int32(&working_area->worker_state, - IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING, - iree_memory_order_release); } return 0; } diff --git a/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c b/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c index 61b6576cdf26..211056285b88 100644 --- a/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c +++ b/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c @@ -12,6 +12,7 @@ #include "iree/base/api.h" #include "iree/base/internal/arena.h" #include "iree/base/internal/atomic_slist.h" +#include "iree/base/internal/atomics.h" #include "iree/base/internal/synchronization.h" #include "iree/base/internal/threading.h" #include "iree/hal/api.h" @@ -35,6 +36,13 @@ typedef enum iree_hal_cuda_queue_action_kind_e { // TODO: Add support for queue alloca and dealloca. } iree_hal_cuda_queue_action_kind_t; +typedef enum iree_hal_cuda_queue_action_state_e { + // The current action is active as waiting for or under execution. + IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE, + // The current action is done execution and waiting for destruction. + IREE_HAL_cuda_QUEUE_ACTION_STATE_ZOMBIE, +} iree_hal_cuda_queue_action_state_t; + // A pending queue action. // // Note that this struct does not have internal synchronization; it's expected @@ -49,6 +57,11 @@ typedef struct iree_hal_cuda_queue_action_t { // Retained to make sure it outlives the current action. iree_hal_cuda_pending_queue_actions_t* owning_actions; + // The current state of this action. When an action is initially created it + // will be alive and enqueued to wait for releasing to the GPU. After done + // execution, it will be flipped into zombie state and enqueued again for + // destruction. + iree_hal_cuda_queue_action_state_t state; // The callback to run after completing this action and before freeing // all resources. Can be NULL. iree_hal_cuda_pending_action_cleanup_callback_t cleanup_callback; @@ -84,6 +97,7 @@ typedef struct iree_hal_cuda_queue_action_t { CUevent events[IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT]; iree_host_size_t event_count; bool is_pending; + // Whether the current action is still not ready for releasing to the GPU. } iree_hal_cuda_queue_action_t; //===----------------------------------------------------------------------===// @@ -139,7 +153,7 @@ static void iree_hal_cuda_queue_action_list_erase( static void iree_hal_cuda_queue_action_list_take_all( iree_hal_cuda_queue_action_list_t* available_list, iree_hal_cuda_queue_action_list_t* ready_list) { - IREE_ASSERT(available_list != ready_list); + IREE_ASSERT_NE(available_list, ready_list); ready_list->head = available_list->head; ready_list->tail = available_list->tail; available_list->head = NULL; @@ -173,12 +187,16 @@ IREE_TYPED_ATOMIC_SLIST_WRAPPER(iree_hal_cuda_ready_action, slist_next)); // The ready-list processing worker's working/exiting state. +// +// States in the list has increasing priorities--meaning normally ones appearing +// earlier can overwrite ones appearing later without checking; but not the +// reverse order. typedef enum iree_hal_cuda_worker_state_e { - IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING = 0, - IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING = 1, - IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED = -1, - IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED = -2, - IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR = -3, + IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING = 0, // Worker to main thread + IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING = 1, // Main to worker thread + IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED = -1, // Main to worker thread + IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED = -2, // Worker to main thread + IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR = -3, // Worker to main thread } iree_hal_cuda_worker_state_t; // The data structure needed by a ready-list processing worker thread to issue @@ -329,6 +347,7 @@ void iree_hal_cuda_pending_queue_actions_destroy( IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED, iree_memory_order_acq_rel); iree_notification_post(&working_area->state_notification, IREE_ALL_WAITERS); + // Check potential exit states from the worker. if (prev_state != IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR) { // Wait until the worker acknowledged exiting. iree_notification_await( @@ -424,12 +443,16 @@ iree_status_t iree_hal_cuda_pending_queue_actions_enqueue_execution( z0, iree_allocator_malloc(actions->host_allocator, sizeof(*action), (void**)&action)); - action->kind = IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION; + action->owning_actions = actions; + action->state = IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE; action->cleanup_callback = cleanup_callback; action->callback_user_data = callback_user_data; + action->kind = IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION; action->device = device; action->dispatch_cu_stream = dispatch_stream; action->callback_cu_stream = callback_stream; + + // Initialize scratch fields. action->event_count = 0; action->is_pending = true; @@ -451,6 +474,9 @@ iree_status_t iree_hal_cuda_pending_queue_actions_enqueue_execution( iree_hal_resource_set_insert(resource_set, signal_semaphore_list.count, signal_semaphore_list.semaphores); } + if (IREE_LIKELY(iree_status_is_ok(status))) { + action->resource_set = resource_set; + } // Copy the command buffer list for later access. // TODO: avoid host allocator malloc; use some pool for the allocation. @@ -475,11 +501,10 @@ iree_status_t iree_hal_cuda_pending_queue_actions_enqueue_execution( } if (IREE_LIKELY(iree_status_is_ok(status))) { - action->owning_actions = actions; + // Retain the owning queue to make sure the action outlives it. iree_hal_resource_retain(actions); - action->resource_set = resource_set; - + // Now everything is okay and we can enqueue the action. iree_slim_mutex_lock(&actions->action_mutex); iree_hal_cuda_queue_action_list_push_back(&actions->action_list, action); iree_slim_mutex_unlock(&actions->action_mutex); @@ -498,35 +523,43 @@ iree_status_t iree_hal_cuda_pending_queue_actions_enqueue_execution( return status; } -static void iree_hal_cuda_pending_queue_actions_cleanup_execution( - iree_hal_cuda_queue_action_t* action); - // Releases resources after action completion on the GPU and advances timeline // and pending actions queue. // -// This is the CUDA host function callback to cudaLaunchHostFunc, invoked by a -// CUDA driver thread. +// This is the CUDA host function callback to cudaLaunchHostFunc(), invoked by a +// CUDA driver thread. Note that code in this function MUST NOT invoke any GPU +// API under the hood to avoid potential deadlock. static void iree_hal_cuda_execution_device_signal_host_callback( void* user_data) { IREE_TRACE_ZONE_BEGIN(z0); iree_hal_cuda_queue_action_t* action = (iree_hal_cuda_queue_action_t*)user_data; + IREE_ASSERT_EQ(action->kind, IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION); + IREE_ASSERT_EQ(action->state, IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE); iree_hal_cuda_pending_queue_actions_t* actions = action->owning_actions; + + // Flip the action state to zombie and enqueue it again so that we can let + // the worker thread clean it up. Note that this is necessary because cleanup + // may involve GPU API calls like buffer releasing or unregistering, so we can + // not inline it here. + action->state = IREE_HAL_cuda_QUEUE_ACTION_STATE_ZOMBIE; + iree_slim_mutex_lock(&actions->action_mutex); + iree_hal_cuda_queue_action_list_push_back(&actions->action_list, action); + iree_slim_mutex_unlock(&actions->action_mutex); + // Advance semaphore timelines by calling into the host signaling function. + // This will internally try to release more workload to the GPU. IREE_IGNORE_ERROR( iree_hal_semaphore_list_signal(action->signal_semaphore_list)); - // Destroy the current action given its done now--this also frees all retained - // resources. - iree_hal_cuda_pending_queue_actions_cleanup_execution(action); - // Try to release more pending actions to the GPU now. - IREE_IGNORE_ERROR(iree_hal_cuda_pending_queue_actions_issue(actions)); + IREE_TRACE_ZONE_END(z0); } // Issues the given kernel dispatch |action| to the GPU. static iree_status_t iree_hal_cuda_pending_queue_actions_issue_execution( iree_hal_cuda_queue_action_t* action) { - IREE_ASSERT(action->is_pending == false); + IREE_ASSERT_EQ(action->kind, IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION); + IREE_ASSERT_EQ(action->is_pending, false); const iree_hal_cuda_dynamic_symbols_t* symbols = action->owning_actions->symbols; IREE_TRACE_ZONE_BEGIN(z0); @@ -607,27 +640,32 @@ static iree_status_t iree_hal_cuda_pending_queue_actions_issue_execution( return iree_ok_status(); } -// Releases resources after completing the given kernel dispatch |action|. -static void iree_hal_cuda_pending_queue_actions_cleanup_execution( +// Performs the given cleanup |action| on the CPU. +static iree_status_t iree_hal_cuda_pending_queue_actions_issue_cleanup( iree_hal_cuda_queue_action_t* action) { iree_hal_cuda_pending_queue_actions_t* actions = action->owning_actions; iree_allocator_t host_allocator = actions->host_allocator; IREE_TRACE_ZONE_BEGIN(z0); + // Call user provided callback before releasing any resource. if (action->cleanup_callback) { action->cleanup_callback(action->callback_user_data); } + // Only release resources after callbacks have been issued. iree_hal_resource_set_free(action->resource_set); iree_hal_cuda_free_semaphore_list(host_allocator, &action->wait_semaphore_list); iree_hal_cuda_free_semaphore_list(host_allocator, &action->signal_semaphore_list); + + // Drop reference to the pending action queue given now we are done. iree_hal_resource_release(actions); iree_allocator_free(host_allocator, action); IREE_TRACE_ZONE_END(z0); + return iree_ok_status(); } iree_status_t iree_hal_cuda_pending_queue_actions_issue( @@ -659,35 +697,39 @@ iree_status_t iree_hal_cuda_pending_queue_actions_issue( action->event_count = 0; action->is_pending = false; - // Look at all wait semaphores. - for (iree_host_size_t i = 0; i < semaphore_count; ++i) { - // If this semaphore has already signaled past the desired value, we can - // just ignore it. - uint64_t value = 0; - status = iree_hal_semaphore_query(semaphores[i], &value); - if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; - if (value >= values[i]) continue; - - // Try to acquire a CUevent from a device wait timepoint. If so, we can - // use that CUevent to wait on the device. Otherwise, this action is still - // not ready. - CUevent event = NULL; - status = iree_hal_cuda_event_semaphore_acquire_timepoint_device_wait( - semaphores[i], values[i], &event); - if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; - if (!event) { - // Clear the scratch fields. - action->event_count = 0; - action->is_pending = true; - break; + // Cleanup actions are immediately ready to release. Otherwise, look at all + // wait semaphores to make sure that they are either already ready or we can + // wait on a device event. + if (action->state == IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE) { + for (iree_host_size_t i = 0; i < semaphore_count; ++i) { + // If this semaphore has already signaled past the desired value, we can + // just ignore it. + uint64_t value = 0; + status = iree_hal_semaphore_query(semaphores[i], &value); + if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; + if (value >= values[i]) continue; + + // Try to acquire a CUevent from a device wait timepoint. If so, we can + // use that CUevent to wait on the device. Otherwise, this action is + // still not ready. + CUevent event = NULL; + status = iree_hal_cuda_event_semaphore_acquire_timepoint_device_wait( + semaphores[i], values[i], &event); + if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; + if (!event) { + // Clear the scratch fields. + action->event_count = 0; + action->is_pending = true; + break; + } + if (IREE_UNLIKELY(action->event_count >= + IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT)) { + status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED, + "exceeded max wait CUevent limit"); + break; + } + action->events[action->event_count++] = event; } - if (IREE_UNLIKELY(action->event_count >= - IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT)) { - status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED, - "exceeded max wait CUevent limit"); - break; - } - action->events[action->event_count++] = event; } if (IREE_UNLIKELY(!iree_status_is_ok(status))) break; @@ -714,6 +756,12 @@ iree_status_t iree_hal_cuda_pending_queue_actions_issue( iree_slim_mutex_unlock(&actions->action_mutex); + if (ready_list.head == NULL) { + // Nothing ready yet. Just return. + IREE_TRACE_ZONE_END(z0); + return status; + } + iree_hal_cuda_atomic_slist_entry_t* entry = NULL; // TODO: avoid host allocator malloc; use some pool for the allocation. if (iree_status_is_ok(status)) { @@ -725,6 +773,7 @@ iree_status_t iree_hal_cuda_pending_queue_actions_issue( // Release all actions in the ready list to avoid leaking. iree_hal_cuda_queue_action_list_free_actions(actions->host_allocator, &ready_list); + iree_allocator_free(actions->host_allocator, entry); IREE_TRACE_ZONE_END(z0); return status; } @@ -734,11 +783,17 @@ iree_status_t iree_hal_cuda_pending_queue_actions_issue( entry->ready_list_head = ready_list.head; iree_hal_cuda_ready_action_slist_push(&actions->working_area.ready_worklist, entry); + + // We can only overwrite the worker state if the previous state is idle + // waiting; we cannot overwrite exit related states. so we need to perform + // atomic compare and exchange here. iree_hal_cuda_worker_state_t prev_state = - (iree_hal_cuda_worker_state_t)iree_atomic_exchange_int32( - &actions->working_area.worker_state, - IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING, - iree_memory_order_acq_rel); + IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING; + iree_atomic_compare_exchange_strong_int32( + &actions->working_area.worker_state, /*expected=*/&prev_state, + /*desired=*/IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING, + /*order_succ=*/iree_memory_order_acq_rel, + /*order_fail=*/iree_memory_order_acquire); iree_notification_post(&actions->working_area.state_notification, IREE_ALL_WAITERS); @@ -761,6 +816,8 @@ static bool iree_hal_cuda_worker_has_incoming_request( iree_hal_cuda_working_area_t* working_area) { iree_hal_cuda_worker_state_t value = iree_atomic_load_int32( &working_area->worker_state, iree_memory_order_acquire); + // These are the only two possible states that set from the main thread to + // the worker thread. return value == IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING || value == IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED; } @@ -790,7 +847,14 @@ static iree_status_t iree_hal_cuda_worker_process_ready_list( iree_hal_cuda_queue_action_t* next_action = action->next; action->next = NULL; - status = iree_hal_cuda_pending_queue_actions_issue_execution(action); + switch (action->state) { + case IREE_HAL_cuda_QUEUE_ACTION_STATE_ALIVE: + status = iree_hal_cuda_pending_queue_actions_issue_execution(action); + break; + case IREE_HAL_cuda_QUEUE_ACTION_STATE_ZOMBIE: + status = iree_hal_cuda_pending_queue_actions_issue_cleanup(action); + break; + } if (!iree_status_is_ok(status)) break; action->event_count = 0; @@ -816,11 +880,26 @@ static int iree_hal_cuda_worker_execute( (iree_condition_fn_t)iree_hal_cuda_worker_has_incoming_request, working_area, iree_infinite_timeout()); + // Immediately flip the state to idle waiting if and only if the previous + // state is workload pending. We do it before processing ready list to make + // sure that we don't accidentally ignore new workload pushed after done + // ready list processing but before overwriting the state from this worker + // thread. Also we don't want to overwrite other exit states. So we need to + // perform atomic compare and exchange here. + iree_hal_cuda_worker_state_t prev_state = + IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING; + iree_atomic_compare_exchange_strong_int32( + &working_area->worker_state, /*expected=*/&prev_state, + /*desired=*/IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING, + /*order_succ=*/iree_memory_order_acq_rel, + /*order_fail=*/iree_memory_order_acquire); + // Check if we received request to stop processing and exit this thread. bool should_exit = iree_atomic_load_int32(&working_area->worker_state, iree_memory_order_acquire) == IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED; + // Process the ready list. We also want this even requested to exit. iree_status_t status = iree_hal_cuda_worker_process_ready_list( working_area->host_allocator, worklist); if (IREE_UNLIKELY(!iree_status_is_ok(status))) { @@ -828,6 +907,7 @@ static int iree_hal_cuda_worker_execute( iree_atomic_store_int32(&working_area->error_code, iree_status_code(status), iree_memory_order_release); + // This state has the highest priority so just overwrite. iree_atomic_store_int32(&working_area->worker_state, IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR, iree_memory_order_release); @@ -837,7 +917,9 @@ static int iree_hal_cuda_worker_execute( } if (should_exit) { - // Signal that this thread is committed to exit. + // Signal that this thread is committed to exit. This state has a priority + // that is only lower than error exit. And we just checked error exit in + // the above. So also just overwrite. iree_atomic_store_int32(&working_area->worker_state, IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED, iree_memory_order_release); @@ -845,11 +927,6 @@ static int iree_hal_cuda_worker_execute( IREE_ALL_WAITERS); return 0; } - - // Signal that this thread is done processing and now waiting for more. - iree_atomic_store_int32(&working_area->worker_state, - IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING, - iree_memory_order_release); } return 0; }