diff --git a/experimental/hip/pending_queue_actions.c b/experimental/hip/pending_queue_actions.c
index cf63c45f06af..7f346a3d334f 100644
--- a/experimental/hip/pending_queue_actions.c
+++ b/experimental/hip/pending_queue_actions.c
@@ -17,6 +17,7 @@
 #include "iree/base/api.h"
 #include "iree/base/internal/arena.h"
 #include "iree/base/internal/atomic_slist.h"
+#include "iree/base/internal/atomics.h"
 #include "iree/base/internal/synchronization.h"
 #include "iree/base/internal/threading.h"
 #include "iree/hal/api.h"
@@ -35,6 +36,13 @@ typedef enum iree_hal_hip_queue_action_kind_e {
   // TODO: Add support for queue alloca and dealloca.
 } iree_hal_hip_queue_action_kind_t;
 
+typedef enum iree_hal_hip_queue_action_state_e {
+  // The current action is alive: waiting for or under execution.
+  IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE,
+  // The current action has finished execution and is waiting for destruction.
+  IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE,
+} iree_hal_hip_queue_action_state_t;
+
 // A pending queue action.
 //
 // Note that this struct does not have internal synchronization; it's expected
@@ -49,6 +57,11 @@ typedef struct iree_hal_hip_queue_action_t {
   // Retained to make sure it outlives the current action.
   iree_hal_hip_pending_queue_actions_t* owning_actions;
 
+  // The current state of this action. When an action is initially created it
+  // will be alive and enqueued to wait for releasing to the GPU. After it
+  // finishes execution, it will be flipped into the zombie state and enqueued
+  // again for destruction.
+  iree_hal_hip_queue_action_state_t state;
   // The callback to run after completing this action and before freeing
   // all resources. Can be NULL.
   iree_hal_hip_pending_action_cleanup_callback_t cleanup_callback;
@@ -83,6 +96,7 @@ typedef struct iree_hal_hip_queue_action_t {
   // Scratch fields for analyzing whether actions are ready to issue.
   hipEvent_t events[IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT];
   iree_host_size_t event_count;
+  // Whether the current action is still not ready for releasing to the GPU.
   bool is_pending;
 } iree_hal_hip_queue_action_t;
 
@@ -139,7 +153,7 @@ static void iree_hal_hip_queue_action_list_erase(
 static void iree_hal_hip_queue_action_list_take_all(
     iree_hal_hip_queue_action_list_t* available_list,
     iree_hal_hip_queue_action_list_t* ready_list) {
-  IREE_ASSERT(available_list != ready_list);
+  IREE_ASSERT_NE(available_list, ready_list);
   ready_list->head = available_list->head;
   ready_list->tail = available_list->tail;
   available_list->head = NULL;
@@ -173,12 +187,16 @@ IREE_TYPED_ATOMIC_SLIST_WRAPPER(iree_hal_hip_ready_action,
                                          slist_next));
 
 // The ready-list processing worker's working/exiting state.
+//
+// States in the list have increasing priority--meaning normally a state
+// appearing later can overwrite one appearing earlier without checking, but
+// not the reverse.
 typedef enum iree_hal_hip_worker_state_e {
-  IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING = 0,
-  IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING = 1,
-  IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED = -1,
-  IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED = -2,
-  IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR = -3,
+  IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING = 0,      // Worker to main thread
+  IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING = 1,  // Main to worker thread
+  IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED = -1,   // Main to worker thread
+  IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED = -2,   // Worker to main thread
+  IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR = -3,       // Worker to main thread
 } iree_hal_hip_worker_state_t;
 
 // The data structure needed by a ready-list processing worker thread to issue
@@ -329,6 +347,7 @@ void iree_hal_hip_pending_queue_actions_destroy(
       iree_memory_order_acq_rel);
   iree_notification_post(&working_area->state_notification, IREE_ALL_WAITERS);
 
+  // Check potential exit states from the worker.
   if (prev_state != IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR) {
     // Wait until the worker acknowledged exiting.
     iree_notification_await(
@@ -424,12 +443,16 @@ iree_status_t iree_hal_hip_pending_queue_actions_enqueue_execution(
       z0, iree_allocator_malloc(actions->host_allocator, sizeof(*action),
                                 (void**)&action));
 
-  action->kind = IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION;
+  action->owning_actions = actions;
+  action->state = IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE;
   action->cleanup_callback = cleanup_callback;
   action->callback_user_data = callback_user_data;
+  action->kind = IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION;
   action->device = device;
   action->dispatch_hip_stream = dispatch_stream;
   action->callback_hip_stream = callback_stream;
+
+  // Initialize scratch fields.
   action->event_count = 0;
   action->is_pending = true;
 
@@ -451,6 +474,9 @@ iree_status_t iree_hal_hip_pending_queue_actions_enqueue_execution(
     iree_hal_resource_set_insert(resource_set, signal_semaphore_list.count,
                                  signal_semaphore_list.semaphores);
   }
+  if (IREE_LIKELY(iree_status_is_ok(status))) {
+    action->resource_set = resource_set;
+  }
 
   // Copy the command buffer list for later access.
   // TODO: avoid host allocator malloc; use some pool for the allocation.
@@ -475,11 +501,10 @@ iree_status_t iree_hal_hip_pending_queue_actions_enqueue_execution(
   }
 
   if (IREE_LIKELY(iree_status_is_ok(status))) {
-    action->owning_actions = actions;
+    // Retain the owning queue to make sure it outlives the action.
     iree_hal_resource_retain(actions);
 
-    action->resource_set = resource_set;
-
+    // Now everything is okay and we can enqueue the action.
     iree_slim_mutex_lock(&actions->action_mutex);
     iree_hal_hip_queue_action_list_push_back(&actions->action_list, action);
     iree_slim_mutex_unlock(&actions->action_mutex);
@@ -498,34 +523,42 @@ iree_status_t iree_hal_hip_pending_queue_actions_enqueue_execution(
   return status;
 }
 
-static void iree_hal_hip_pending_queue_actions_cleanup_execution(
-    iree_hal_hip_queue_action_t* action);
-
 // Releases resources after action completion on the GPU and advances timeline
 // and pending actions queue.
 //
-// This is the HIP host function callback to hipLaunchHostFunc, invoked by a
-// HIP driver thread.
+// This is the HIP host function callback to hipLaunchHostFunc(), invoked by a
+// HIP driver thread. Note that code in this function MUST NOT invoke any GPU
+// API under the hood to avoid potential deadlock.
 static void iree_hal_hip_execution_device_signal_host_callback(
     void* user_data) {
   IREE_TRACE_ZONE_BEGIN(z0);
   iree_hal_hip_queue_action_t* action =
       (iree_hal_hip_queue_action_t*)user_data;
+  IREE_ASSERT_EQ(action->kind, IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION);
+  IREE_ASSERT_EQ(action->state, IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE);
   iree_hal_hip_pending_queue_actions_t* actions = action->owning_actions;
+
+  // Flip the action state to zombie and enqueue it again so that we can let
+  // the worker thread clean it up. Note that this is necessary because
+  // cleanup may involve GPU API calls like buffer releasing or unregistering,
+  // so we cannot inline it here.
+  action->state = IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE;
+  iree_slim_mutex_lock(&actions->action_mutex);
+  iree_hal_hip_queue_action_list_push_back(&actions->action_list, action);
+  iree_slim_mutex_unlock(&actions->action_mutex);
+
   // Advance semaphore timelines by calling into the host signaling function.
+  // This will internally try to release more workload to the GPU.
   IREE_IGNORE_ERROR(
       iree_hal_semaphore_list_signal(action->signal_semaphore_list));
-  // Destroy the current action given its done now--this also frees all retained
-  // resources.
-  iree_hal_hip_pending_queue_actions_cleanup_execution(action);
-  // Try to release more pending actions to the GPU now.
-  IREE_IGNORE_ERROR(iree_hal_hip_pending_queue_actions_issue(actions));
+
   IREE_TRACE_ZONE_END(z0);
 }
 
 // Issues the given kernel dispatch |action| to the GPU.
 static iree_status_t iree_hal_hip_pending_queue_actions_issue_execution(
     iree_hal_hip_queue_action_t* action) {
-  IREE_ASSERT(action->is_pending == false);
+  IREE_ASSERT_EQ(action->kind, IREE_HAL_HIP_QUEUE_ACTION_TYPE_EXECUTION);
+  IREE_ASSERT_EQ(action->is_pending, false);
   const iree_hal_hip_dynamic_symbols_t* symbols =
       action->owning_actions->symbols;
   IREE_TRACE_ZONE_BEGIN(z0);
@@ -605,27 +638,32 @@ static iree_status_t iree_hal_hip_pending_queue_actions_issue_execution(
   return iree_ok_status();
 }
 
-// Releases resources after completing the given kernel dispatch |action|.
-static void iree_hal_hip_pending_queue_actions_cleanup_execution(
+// Performs the given cleanup |action| on the CPU.
+static iree_status_t iree_hal_hip_pending_queue_actions_issue_cleanup(
     iree_hal_hip_queue_action_t* action) {
   iree_hal_hip_pending_queue_actions_t* actions = action->owning_actions;
   iree_allocator_t host_allocator = actions->host_allocator;
   IREE_TRACE_ZONE_BEGIN(z0);
 
+  // Call the user-provided callback before releasing any resource.
  if (action->cleanup_callback) {
     action->cleanup_callback(action->callback_user_data);
   }
 
+  // Only release resources after callbacks have been issued.
   iree_hal_resource_set_free(action->resource_set);
   iree_hal_hip_free_semaphore_list(host_allocator,
                                    &action->wait_semaphore_list);
   iree_hal_hip_free_semaphore_list(host_allocator,
                                    &action->signal_semaphore_list);
+
+  // Drop our reference to the pending action queue now that we are done.
   iree_hal_resource_release(actions);
   iree_allocator_free(host_allocator, action);
 
   IREE_TRACE_ZONE_END(z0);
+  return iree_ok_status();
 }
 
 iree_status_t iree_hal_hip_pending_queue_actions_issue(
@@ -657,35 +695,39 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue(
     action->event_count = 0;
     action->is_pending = false;
 
-    // Look at all wait semaphores.
-    for (iree_host_size_t i = 0; i < semaphore_count; ++i) {
-      // If this semaphore has already signaled past the desired value, we can
-      // just ignore it.
-      uint64_t value = 0;
-      status = iree_hal_semaphore_query(semaphores[i], &value);
-      if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
-      if (value >= values[i]) continue;
-
-      // Try to acquire a hipEvent_t from a device wait timepoint. If so, we can
-      // use that hipEvent_t to wait on the device. Otherwise, this action is
-      // still not ready.
-      hipEvent_t event = NULL;
-      status = iree_hal_hip_event_semaphore_acquire_timepoint_device_wait(
-          semaphores[i], values[i], &event);
-      if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
-      if (!event) {
-        // Clear the scratch fields.
-        action->event_count = 0;
-        action->is_pending = true;
-        break;
-      }
-      if (IREE_UNLIKELY(action->event_count >=
-                        IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT)) {
-        status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED,
-                                  "exceeded max wait hipEvent_t limit");
-        break;
+    // Cleanup actions are immediately ready to release. Otherwise, look at
+    // all wait semaphores to make sure that each has either already signaled
+    // or we can wait on it via a device event.
+    if (action->state == IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE) {
+      for (iree_host_size_t i = 0; i < semaphore_count; ++i) {
+        // If this semaphore has already signaled past the desired value, we
+        // can just ignore it.
+        uint64_t value = 0;
+        status = iree_hal_semaphore_query(semaphores[i], &value);
+        if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
+        if (value >= values[i]) continue;
+
+        // Try to acquire a hipEvent_t from a device wait timepoint. If so, we
+        // can use that hipEvent_t to wait on the device. Otherwise, this
+        // action is still not ready.
+        hipEvent_t event = NULL;
+        status = iree_hal_hip_event_semaphore_acquire_timepoint_device_wait(
+            semaphores[i], values[i], &event);
+        if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
+        if (!event) {
+          // Clear the scratch fields.
+          action->event_count = 0;
+          action->is_pending = true;
+          break;
+        }
+        if (IREE_UNLIKELY(action->event_count >=
+                          IREE_HAL_HIP_MAX_WAIT_EVENT_COUNT)) {
+          status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED,
+                                    "exceeded max wait hipEvent_t limit");
+          break;
+        }
+        action->events[action->event_count++] = event;
       }
-      action->events[action->event_count++] = event;
     }
 
     if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
@@ -712,6 +754,12 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue(
 
   iree_slim_mutex_unlock(&actions->action_mutex);
 
+  if (ready_list.head == NULL) {
+    // Nothing ready yet. Just return.
+    IREE_TRACE_ZONE_END(z0);
+    return status;
+  }
+
   iree_hal_hip_atomic_slist_entry_t* entry = NULL;
   // TODO: avoid host allocator malloc; use some pool for the allocation.
   if (iree_status_is_ok(status)) {
@@ -723,6 +771,7 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue(
     // Release all actions in the ready list to avoid leaking.
     iree_hal_hip_queue_action_list_free_actions(actions->host_allocator,
                                                 &ready_list);
+    iree_allocator_free(actions->host_allocator, entry);
     IREE_TRACE_ZONE_END(z0);
     return status;
   }
@@ -732,11 +781,17 @@ iree_status_t iree_hal_hip_pending_queue_actions_issue(
   entry->ready_list_head = ready_list.head;
   iree_hal_hip_ready_action_slist_push(&actions->working_area.ready_worklist,
                                        entry);
+
+  // We can only overwrite the worker state if the previous state is idle
+  // waiting; we cannot overwrite exit-related states, so we need to perform
+  // an atomic compare and exchange here.
   iree_hal_hip_worker_state_t prev_state =
-      (iree_hal_hip_worker_state_t)iree_atomic_exchange_int32(
-          &actions->working_area.worker_state,
-          IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING,
-          iree_memory_order_acq_rel);
+      IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING;
+  iree_atomic_compare_exchange_strong_int32(
+      &actions->working_area.worker_state, /*expected=*/&prev_state,
+      /*desired=*/IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING,
+      /*order_succ=*/iree_memory_order_acq_rel,
+      /*order_fail=*/iree_memory_order_acquire);
   iree_notification_post(&actions->working_area.state_notification,
                          IREE_ALL_WAITERS);
 
@@ -759,6 +814,8 @@ static bool iree_hal_hip_worker_has_incoming_request(
     iree_hal_hip_working_area_t* working_area) {
   iree_hal_hip_worker_state_t value = iree_atomic_load_int32(
       &working_area->worker_state, iree_memory_order_acquire);
+  // These are the only two possible states that are set from the main thread
+  // to the worker thread.
   return value == IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING ||
          value == IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED;
 }
@@ -788,7 +845,14 @@ static iree_status_t iree_hal_hip_worker_process_ready_list(
       iree_hal_hip_queue_action_t* next_action = action->next;
       action->next = NULL;
 
-      status = iree_hal_hip_pending_queue_actions_issue_execution(action);
+      switch (action->state) {
+        case IREE_HAL_HIP_QUEUE_ACTION_STATE_ALIVE:
+          status = iree_hal_hip_pending_queue_actions_issue_execution(action);
+          break;
+        case IREE_HAL_HIP_QUEUE_ACTION_STATE_ZOMBIE:
+          status = iree_hal_hip_pending_queue_actions_issue_cleanup(action);
+          break;
+      }
       if (!iree_status_is_ok(status)) break;
       action->event_count = 0;
 
@@ -814,11 +878,26 @@ static int iree_hal_hip_worker_execute(
         (iree_condition_fn_t)iree_hal_hip_worker_has_incoming_request,
         working_area, iree_infinite_timeout());
 
+    // Immediately flip the state to idle waiting if and only if the previous
+    // state is workload pending. We do this before processing the ready list
+    // to make sure we don't accidentally ignore a new workload pushed after
+    // we finish processing but before we overwrite the state from this
+    // worker thread. We also don't want to overwrite any exit state, so we
+    // need to perform an atomic compare and exchange here.
+    iree_hal_hip_worker_state_t prev_state =
+        IREE_HAL_HIP_WORKER_STATE_WORKLOAD_PENDING;
+    iree_atomic_compare_exchange_strong_int32(
+        &working_area->worker_state, /*expected=*/&prev_state,
+        /*desired=*/IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING,
+        /*order_succ=*/iree_memory_order_acq_rel,
+        /*order_fail=*/iree_memory_order_acquire);
+
     // Check if we received request to stop processing and exit this thread.
     bool should_exit = iree_atomic_load_int32(&working_area->worker_state,
                                               iree_memory_order_acquire) ==
                        IREE_HAL_HIP_WORKER_STATE_EXIT_REQUESTED;
 
+    // Process the ready list; we still want this even when requested to exit.
     iree_status_t status = iree_hal_hip_worker_process_ready_list(
         working_area->host_allocator, worklist);
     if (IREE_UNLIKELY(!iree_status_is_ok(status))) {
@@ -826,6 +905,7 @@
       iree_atomic_store_int32(&working_area->error_code,
                               iree_status_code(status),
                               iree_memory_order_release);
+      // This state has the highest priority so just overwrite.
       iree_atomic_store_int32(&working_area->worker_state,
                               IREE_HAL_HIP_WORKER_STATE_EXIT_ERROR,
                               iree_memory_order_release);
@@ -835,7 +915,9 @@
     }
 
     if (should_exit) {
-      // Signal that this thread is committed to exit.
+      // Signal that this thread is committed to exit. This state's priority
+      // is only lower than the error exit state. And we just checked error
+      // exit above, so we can also just overwrite.
       iree_atomic_store_int32(&working_area->worker_state,
                               IREE_HAL_HIP_WORKER_STATE_EXIT_COMMITTED,
                               iree_memory_order_release);
@@ -843,11 +925,6 @@ static int iree_hal_hip_worker_execute(
       iree_notification_post(&working_area->exit_notification,
                              IREE_ALL_WAITERS);
       return 0;
     }
-
-    // Signal that this thread is done processing and now waiting for more.
-    iree_atomic_store_int32(&working_area->worker_state,
-                            IREE_HAL_HIP_WORKER_STATE_IDLE_WAITING,
-                            iree_memory_order_release);
   }
   return 0;
 }
diff --git a/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c b/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c
index 61b6576cdf26..211056285b88 100644
--- a/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c
+++ b/runtime/src/iree/hal/drivers/cuda/pending_queue_actions.c
@@ -12,6 +12,7 @@
 #include "iree/base/api.h"
 #include "iree/base/internal/arena.h"
 #include "iree/base/internal/atomic_slist.h"
+#include "iree/base/internal/atomics.h"
 #include "iree/base/internal/synchronization.h"
 #include "iree/base/internal/threading.h"
 #include "iree/hal/api.h"
@@ -35,6 +36,13 @@ typedef enum iree_hal_cuda_queue_action_kind_e {
   // TODO: Add support for queue alloca and dealloca.
 } iree_hal_cuda_queue_action_kind_t;
 
+typedef enum iree_hal_cuda_queue_action_state_e {
+  // The current action is alive: waiting for or under execution.
+  IREE_HAL_CUDA_QUEUE_ACTION_STATE_ALIVE,
+  // The current action has finished execution and is waiting for destruction.
+  IREE_HAL_CUDA_QUEUE_ACTION_STATE_ZOMBIE,
+} iree_hal_cuda_queue_action_state_t;
+
 // A pending queue action.
 //
 // Note that this struct does not have internal synchronization; it's expected
@@ -49,6 +57,11 @@ typedef struct iree_hal_cuda_queue_action_t {
   // Retained to make sure it outlives the current action.
   iree_hal_cuda_pending_queue_actions_t* owning_actions;
 
+  // The current state of this action. When an action is initially created it
+  // will be alive and enqueued to wait for releasing to the GPU. After it
+  // finishes execution, it will be flipped into the zombie state and enqueued
+  // again for destruction.
+  iree_hal_cuda_queue_action_state_t state;
   // The callback to run after completing this action and before freeing
   // all resources. Can be NULL.
   iree_hal_cuda_pending_action_cleanup_callback_t cleanup_callback;
@@ -84,6 +97,7 @@ typedef struct iree_hal_cuda_queue_action_t {
   CUevent events[IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT];
   iree_host_size_t event_count;
+  // Whether the current action is still not ready for releasing to the GPU.
   bool is_pending;
 } iree_hal_cuda_queue_action_t;
 
 //===----------------------------------------------------------------------===//
@@ -139,7 +153,7 @@ static void iree_hal_cuda_queue_action_list_erase(
 static void iree_hal_cuda_queue_action_list_take_all(
     iree_hal_cuda_queue_action_list_t* available_list,
     iree_hal_cuda_queue_action_list_t* ready_list) {
-  IREE_ASSERT(available_list != ready_list);
+  IREE_ASSERT_NE(available_list, ready_list);
   ready_list->head = available_list->head;
   ready_list->tail = available_list->tail;
   available_list->head = NULL;
@@ -173,12 +187,16 @@ IREE_TYPED_ATOMIC_SLIST_WRAPPER(iree_hal_cuda_ready_action,
                                          slist_next));
 
 // The ready-list processing worker's working/exiting state.
+//
+// States in the list have increasing priority--meaning normally a state
+// appearing later can overwrite one appearing earlier without checking, but
+// not the reverse.
 typedef enum iree_hal_cuda_worker_state_e {
-  IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING = 0,
-  IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING = 1,
-  IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED = -1,
-  IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED = -2,
-  IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR = -3,
+  IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING = 0,      // Worker to main thread
+  IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING = 1,  // Main to worker thread
+  IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED = -1,   // Main to worker thread
+  IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED = -2,   // Worker to main thread
+  IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR = -3,       // Worker to main thread
 } iree_hal_cuda_worker_state_t;
 
 // The data structure needed by a ready-list processing worker thread to issue
@@ -329,6 +347,7 @@ void iree_hal_cuda_pending_queue_actions_destroy(
                           IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED,
                           iree_memory_order_acq_rel);
   iree_notification_post(&working_area->state_notification, IREE_ALL_WAITERS);
 
+  // Check potential exit states from the worker.
   if (prev_state != IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR) {
     // Wait until the worker acknowledged exiting.
     iree_notification_await(
@@ -424,12 +443,16 @@ iree_status_t iree_hal_cuda_pending_queue_actions_enqueue_execution(
       z0, iree_allocator_malloc(actions->host_allocator, sizeof(*action),
                                 (void**)&action));
 
-  action->kind = IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION;
+  action->owning_actions = actions;
+  action->state = IREE_HAL_CUDA_QUEUE_ACTION_STATE_ALIVE;
   action->cleanup_callback = cleanup_callback;
   action->callback_user_data = callback_user_data;
+  action->kind = IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION;
   action->device = device;
   action->dispatch_cu_stream = dispatch_stream;
   action->callback_cu_stream = callback_stream;
+
+  // Initialize scratch fields.
   action->event_count = 0;
   action->is_pending = true;
 
@@ -451,6 +474,9 @@ iree_status_t iree_hal_cuda_pending_queue_actions_enqueue_execution(
     iree_hal_resource_set_insert(resource_set, signal_semaphore_list.count,
                                  signal_semaphore_list.semaphores);
   }
+  if (IREE_LIKELY(iree_status_is_ok(status))) {
+    action->resource_set = resource_set;
+  }
 
   // Copy the command buffer list for later access.
   // TODO: avoid host allocator malloc; use some pool for the allocation.
@@ -475,11 +501,10 @@ iree_status_t iree_hal_cuda_pending_queue_actions_enqueue_execution(
   }
 
   if (IREE_LIKELY(iree_status_is_ok(status))) {
-    action->owning_actions = actions;
+    // Retain the owning queue to make sure it outlives the action.
     iree_hal_resource_retain(actions);
 
-    action->resource_set = resource_set;
-
+    // Now everything is okay and we can enqueue the action.
     iree_slim_mutex_lock(&actions->action_mutex);
     iree_hal_cuda_queue_action_list_push_back(&actions->action_list, action);
     iree_slim_mutex_unlock(&actions->action_mutex);
@@ -498,35 +523,43 @@ iree_status_t iree_hal_cuda_pending_queue_actions_enqueue_execution(
   return status;
 }
 
-static void iree_hal_cuda_pending_queue_actions_cleanup_execution(
-    iree_hal_cuda_queue_action_t* action);
-
 // Releases resources after action completion on the GPU and advances timeline
 // and pending actions queue.
 //
-// This is the CUDA host function callback to cudaLaunchHostFunc, invoked by a
-// CUDA driver thread.
+// This is the CUDA host function callback to cudaLaunchHostFunc(), invoked by a
+// CUDA driver thread. Note that code in this function MUST NOT invoke any GPU
+// API under the hood to avoid potential deadlock.
 static void iree_hal_cuda_execution_device_signal_host_callback(
     void* user_data) {
   IREE_TRACE_ZONE_BEGIN(z0);
   iree_hal_cuda_queue_action_t* action =
       (iree_hal_cuda_queue_action_t*)user_data;
+  IREE_ASSERT_EQ(action->kind, IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION);
+  IREE_ASSERT_EQ(action->state, IREE_HAL_CUDA_QUEUE_ACTION_STATE_ALIVE);
   iree_hal_cuda_pending_queue_actions_t* actions = action->owning_actions;
+
+  // Flip the action state to zombie and enqueue it again so that we can let
+  // the worker thread clean it up. Note that this is necessary because
+  // cleanup may involve GPU API calls like buffer releasing or unregistering,
+  // so we cannot inline it here.
+  action->state = IREE_HAL_CUDA_QUEUE_ACTION_STATE_ZOMBIE;
+  iree_slim_mutex_lock(&actions->action_mutex);
+  iree_hal_cuda_queue_action_list_push_back(&actions->action_list, action);
+  iree_slim_mutex_unlock(&actions->action_mutex);
+
   // Advance semaphore timelines by calling into the host signaling function.
+  // This will internally try to release more workload to the GPU.
   IREE_IGNORE_ERROR(
       iree_hal_semaphore_list_signal(action->signal_semaphore_list));
-  // Destroy the current action given its done now--this also frees all retained
-  // resources.
-  iree_hal_cuda_pending_queue_actions_cleanup_execution(action);
-  // Try to release more pending actions to the GPU now.
-  IREE_IGNORE_ERROR(iree_hal_cuda_pending_queue_actions_issue(actions));
+
   IREE_TRACE_ZONE_END(z0);
 }
 
 // Issues the given kernel dispatch |action| to the GPU.
 static iree_status_t iree_hal_cuda_pending_queue_actions_issue_execution(
     iree_hal_cuda_queue_action_t* action) {
-  IREE_ASSERT(action->is_pending == false);
+  IREE_ASSERT_EQ(action->kind, IREE_HAL_CUDA_QUEUE_ACTION_TYPE_EXECUTION);
+  IREE_ASSERT_EQ(action->is_pending, false);
   const iree_hal_cuda_dynamic_symbols_t* symbols =
       action->owning_actions->symbols;
   IREE_TRACE_ZONE_BEGIN(z0);
@@ -607,27 +640,32 @@ static iree_status_t iree_hal_cuda_pending_queue_actions_issue_execution(
   return iree_ok_status();
 }
 
-// Releases resources after completing the given kernel dispatch |action|.
-static void iree_hal_cuda_pending_queue_actions_cleanup_execution(
+// Performs the given cleanup |action| on the CPU.
+static iree_status_t iree_hal_cuda_pending_queue_actions_issue_cleanup(
    iree_hal_cuda_queue_action_t* action) {
   iree_hal_cuda_pending_queue_actions_t* actions = action->owning_actions;
   iree_allocator_t host_allocator = actions->host_allocator;
   IREE_TRACE_ZONE_BEGIN(z0);
 
+  // Call the user-provided callback before releasing any resource.
   if (action->cleanup_callback) {
     action->cleanup_callback(action->callback_user_data);
   }
 
+  // Only release resources after callbacks have been issued.
   iree_hal_resource_set_free(action->resource_set);
   iree_hal_cuda_free_semaphore_list(host_allocator,
                                     &action->wait_semaphore_list);
   iree_hal_cuda_free_semaphore_list(host_allocator,
                                     &action->signal_semaphore_list);
+
+  // Drop our reference to the pending action queue now that we are done.
   iree_hal_resource_release(actions);
   iree_allocator_free(host_allocator, action);
 
   IREE_TRACE_ZONE_END(z0);
+  return iree_ok_status();
 }
 
 iree_status_t iree_hal_cuda_pending_queue_actions_issue(
@@ -659,35 +697,39 @@ iree_status_t iree_hal_cuda_pending_queue_actions_issue(
     action->event_count = 0;
     action->is_pending = false;
 
-    // Look at all wait semaphores.
-    for (iree_host_size_t i = 0; i < semaphore_count; ++i) {
-      // If this semaphore has already signaled past the desired value, we can
-      // just ignore it.
-      uint64_t value = 0;
-      status = iree_hal_semaphore_query(semaphores[i], &value);
-      if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
-      if (value >= values[i]) continue;
-
-      // Try to acquire a CUevent from a device wait timepoint. If so, we can
-      // use that CUevent to wait on the device. Otherwise, this action is still
-      // not ready.
-      CUevent event = NULL;
-      status = iree_hal_cuda_event_semaphore_acquire_timepoint_device_wait(
-          semaphores[i], values[i], &event);
-      if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
-      if (!event) {
-        // Clear the scratch fields.
-        action->event_count = 0;
-        action->is_pending = true;
-        break;
+    // Cleanup actions are immediately ready to release. Otherwise, look at
+    // all wait semaphores to make sure that each has either already signaled
+    // or we can wait on it via a device event.
+    if (action->state == IREE_HAL_CUDA_QUEUE_ACTION_STATE_ALIVE) {
+      for (iree_host_size_t i = 0; i < semaphore_count; ++i) {
+        // If this semaphore has already signaled past the desired value, we
+        // can just ignore it.
+        uint64_t value = 0;
+        status = iree_hal_semaphore_query(semaphores[i], &value);
+        if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
+        if (value >= values[i]) continue;
+
+        // Try to acquire a CUevent from a device wait timepoint. If so, we
+        // can use that CUevent to wait on the device. Otherwise, this action
+        // is still not ready.
+        CUevent event = NULL;
+        status = iree_hal_cuda_event_semaphore_acquire_timepoint_device_wait(
+            semaphores[i], values[i], &event);
+        if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
+        if (!event) {
+          // Clear the scratch fields.
+          action->event_count = 0;
+          action->is_pending = true;
+          break;
+        }
+        if (IREE_UNLIKELY(action->event_count >=
+                          IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT)) {
+          status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED,
+                                    "exceeded max wait CUevent limit");
+          break;
+        }
+        action->events[action->event_count++] = event;
      }
-      if (IREE_UNLIKELY(action->event_count >=
-                        IREE_HAL_CUDA_MAX_WAIT_EVENT_COUNT)) {
-        status = iree_make_status(IREE_STATUS_RESOURCE_EXHAUSTED,
-                                  "exceeded max wait CUevent limit");
-        break;
-      }
-      action->events[action->event_count++] = event;
     }
 
     if (IREE_UNLIKELY(!iree_status_is_ok(status))) break;
@@ -714,6 +756,12 @@ iree_status_t iree_hal_cuda_pending_queue_actions_issue(
 
   iree_slim_mutex_unlock(&actions->action_mutex);
 
+  if (ready_list.head == NULL) {
+    // Nothing ready yet. Just return.
+    IREE_TRACE_ZONE_END(z0);
+    return status;
+  }
+
   iree_hal_cuda_atomic_slist_entry_t* entry = NULL;
   // TODO: avoid host allocator malloc; use some pool for the allocation.
   if (iree_status_is_ok(status)) {
@@ -725,6 +773,7 @@ iree_status_t iree_hal_cuda_pending_queue_actions_issue(
     // Release all actions in the ready list to avoid leaking.
     iree_hal_cuda_queue_action_list_free_actions(actions->host_allocator,
                                                  &ready_list);
+    iree_allocator_free(actions->host_allocator, entry);
     IREE_TRACE_ZONE_END(z0);
     return status;
   }
@@ -734,11 +783,17 @@ iree_status_t iree_hal_cuda_pending_queue_actions_issue(
   entry->ready_list_head = ready_list.head;
   iree_hal_cuda_ready_action_slist_push(&actions->working_area.ready_worklist,
                                         entry);
+
+  // We can only overwrite the worker state if the previous state is idle
+  // waiting; we cannot overwrite exit-related states, so we need to perform
+  // an atomic compare and exchange here.
   iree_hal_cuda_worker_state_t prev_state =
-      (iree_hal_cuda_worker_state_t)iree_atomic_exchange_int32(
-          &actions->working_area.worker_state,
-          IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING,
-          iree_memory_order_acq_rel);
+      IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING;
+  iree_atomic_compare_exchange_strong_int32(
+      &actions->working_area.worker_state, /*expected=*/&prev_state,
+      /*desired=*/IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING,
+      /*order_succ=*/iree_memory_order_acq_rel,
+      /*order_fail=*/iree_memory_order_acquire);
   iree_notification_post(&actions->working_area.state_notification,
                          IREE_ALL_WAITERS);
 
@@ -761,6 +816,8 @@ static bool iree_hal_cuda_worker_has_incoming_request(
     iree_hal_cuda_working_area_t* working_area) {
   iree_hal_cuda_worker_state_t value = iree_atomic_load_int32(
      &working_area->worker_state, iree_memory_order_acquire);
+  // These are the only two possible states that are set from the main thread
+  // to the worker thread.
   return value == IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING ||
          value == IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED;
 }
@@ -790,7 +847,14 @@ static iree_status_t iree_hal_cuda_worker_process_ready_list(
      iree_hal_cuda_queue_action_t* next_action = action->next;
       action->next = NULL;
 
-      status = iree_hal_cuda_pending_queue_actions_issue_execution(action);
+      switch (action->state) {
+        case IREE_HAL_CUDA_QUEUE_ACTION_STATE_ALIVE:
+          status = iree_hal_cuda_pending_queue_actions_issue_execution(action);
+          break;
+        case IREE_HAL_CUDA_QUEUE_ACTION_STATE_ZOMBIE:
+          status = iree_hal_cuda_pending_queue_actions_issue_cleanup(action);
+          break;
+      }
       if (!iree_status_is_ok(status)) break;
       action->event_count = 0;
 
@@ -816,11 +880,26 @@ static int iree_hal_cuda_worker_execute(
         (iree_condition_fn_t)iree_hal_cuda_worker_has_incoming_request,
         working_area, iree_infinite_timeout());
 
+    // Immediately flip the state to idle waiting if and only if the previous
+    // state is workload pending. We do this before processing the ready list
+    // to make sure we don't accidentally ignore a new workload pushed after
+    // we finish processing but before we overwrite the state from this
+    // worker thread. We also don't want to overwrite any exit state, so we
+    // need to perform an atomic compare and exchange here.
+    iree_hal_cuda_worker_state_t prev_state =
+        IREE_HAL_CUDA_WORKER_STATE_WORKLOAD_PENDING;
+    iree_atomic_compare_exchange_strong_int32(
+        &working_area->worker_state, /*expected=*/&prev_state,
+        /*desired=*/IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING,
+        /*order_succ=*/iree_memory_order_acq_rel,
+        /*order_fail=*/iree_memory_order_acquire);
+
     // Check if we received request to stop processing and exit this thread.
     bool should_exit = iree_atomic_load_int32(&working_area->worker_state,
                                               iree_memory_order_acquire) ==
                        IREE_HAL_CUDA_WORKER_STATE_EXIT_REQUESTED;
 
+    // Process the ready list; we still want this even when requested to exit.
    iree_status_t status = iree_hal_cuda_worker_process_ready_list(
         working_area->host_allocator, worklist);
     if (IREE_UNLIKELY(!iree_status_is_ok(status))) {
@@ -828,6 +907,7 @@
       iree_atomic_store_int32(&working_area->error_code,
                               iree_status_code(status),
                               iree_memory_order_release);
+      // This state has the highest priority so just overwrite.
       iree_atomic_store_int32(&working_area->worker_state,
                               IREE_HAL_CUDA_WORKER_STATE_EXIT_ERROR,
                               iree_memory_order_release);
@@ -837,7 +917,9 @@
     }
 
     if (should_exit) {
-      // Signal that this thread is committed to exit.
+      // Signal that this thread is committed to exit. This state's priority
+      // is only lower than the error exit state. And we just checked error
+      // exit above, so we can also just overwrite.
       iree_atomic_store_int32(&working_area->worker_state,
                               IREE_HAL_CUDA_WORKER_STATE_EXIT_COMMITTED,
                               iree_memory_order_release);
@@ -845,11 +927,6 @@ static int iree_hal_cuda_worker_execute(
       iree_notification_post(&working_area->exit_notification,
                              IREE_ALL_WAITERS);
       return 0;
     }
-
-    // Signal that this thread is done processing and now waiting for more.
-    iree_atomic_store_int32(&working_area->worker_state,
-                            IREE_HAL_CUDA_WORKER_STATE_IDLE_WAITING,
-                            iree_memory_order_release);
   }
   return 0;
 }
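
Reviewer addendum (not part of the patch): both files replace the old unconditional iree_atomic_exchange_int32 with iree_atomic_compare_exchange_strong_int32 so that the low-priority transitions (to idle waiting or workload pending) can never clobber a racing exit state. Below is a minimal standalone sketch of that worker state machine, written against C11 <stdatomic.h> rather than the iree_atomic_* wrappers; all names in it are hypothetical, and the single-threaded driver only simulates the cross-thread interleaving.

// Standalone sketch (NOT part of the patch): the worker state machine that
// both pending_queue_actions.c files now rely on, modeled with plain C11
// atomics. All names below are hypothetical.
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

typedef enum {
  WORKER_STATE_IDLE_WAITING = 0,      // Worker to main thread.
  WORKER_STATE_WORKLOAD_PENDING = 1,  // Main to worker thread.
  WORKER_STATE_EXIT_REQUESTED = -1,   // Main to worker thread.
  WORKER_STATE_EXIT_COMMITTED = -2,   // Worker to main thread.
  WORKER_STATE_EXIT_ERROR = -3,       // Worker to main thread.
} worker_state_t;

// Installs |desired| only if the state is still |expected|; returns true on
// success. A lower-priority transition thus silently loses the race against
// a higher-priority (exit) state instead of overwriting it.
static bool try_transition(atomic_int* state, int expected, int desired) {
  return atomic_compare_exchange_strong_explicit(
      state, &expected, desired,
      memory_order_acq_rel, memory_order_acquire);
}

int main(void) {
  atomic_int state = WORKER_STATE_IDLE_WAITING;

  // Main thread publishing work: only IDLE_WAITING may become
  // WORKLOAD_PENDING, so a racing EXIT_REQUESTED is never overwritten.
  bool posted = try_transition(&state, WORKER_STATE_IDLE_WAITING,
                               WORKER_STATE_WORKLOAD_PENDING);
  printf("posted workload: %d\n", posted);

  // Worker thread: flip back to IDLE_WAITING *before* draining the ready
  // list; again only the expected previous state may be replaced.
  bool drained = try_transition(&state, WORKER_STATE_WORKLOAD_PENDING,
                                WORKER_STATE_IDLE_WAITING);
  printf("flipped to idle: %d\n", drained);

  // EXIT_ERROR has the highest priority, so a plain store suffices.
  atomic_store_explicit(&state, WORKER_STATE_EXIT_ERROR,
                        memory_order_release);
  printf("final state: %d\n", atomic_load(&state));
  return 0;
}

The sketch also shows why the worker flips WORKLOAD_PENDING back to IDLE_WAITING before draining the ready list: a producer that enqueues mid-drain then finds IDLE_WAITING, succeeds in its compare-exchange, and re-posts the notification, so its workload is never silently ignored.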