From 75ecf74d14b46f292a1da611e880deec117c6ea9 Mon Sep 17 00:00:00 2001 From: Sergey Lebedev Date: Wed, 17 Jan 2024 07:08:17 +0100 Subject: [PATCH] EC/CUDA: enable support for multiple contexts (#848) * EC/CUDA: add support for multiple context * MC/CUDA: add support for multiple contexts * MC/CUDA: support multiple contexts * BUILD: fix ucc parser --- config/m4/ucx.m4 | 4 + src/components/ec/cuda/Makefile.am | 6 +- src/components/ec/cuda/ec_cuda.c | 260 +++++---------- src/components/ec/cuda/ec_cuda.h | 85 +---- src/components/ec/cuda/ec_cuda_executor.c | 10 +- .../ec/cuda/ec_cuda_executor_interruptible.c | 39 ++- .../ec/cuda/ec_cuda_executor_persistent.c | 9 +- src/components/ec/cuda/ec_cuda_resources.c | 197 ++++++++++++ src/components/ec/cuda/ec_cuda_resources.h | 158 ++++++++++ src/components/ec/ucc_ec.c | 7 + src/components/mc/cuda/Makefile.am | 6 +- src/components/mc/cuda/mc_cuda.c | 298 +++++++----------- src/components/mc/cuda/mc_cuda.h | 34 +- src/components/mc/cuda/mc_cuda_resources.c | 92 ++++++ src/components/mc/cuda/mc_cuda_resources.h | 84 +++++ src/components/tl/ucp/reduce/reduce_dbt.c | 2 +- src/core/ucc_constructor.c | 165 +++++----- src/core/ucc_global_opts.h | 4 +- src/utils/arch/cuda_def.h | 9 + src/utils/ucc_parser.h | 26 +- 20 files changed, 917 insertions(+), 578 deletions(-) create mode 100644 src/components/ec/cuda/ec_cuda_resources.c create mode 100644 src/components/ec/cuda/ec_cuda_resources.h create mode 100644 src/components/mc/cuda/mc_cuda_resources.c create mode 100644 src/components/mc/cuda/mc_cuda_resources.h diff --git a/config/m4/ucx.m4 b/config/m4/ucx.m4 index fa608685cd..ba57dae303 100644 --- a/config/m4/ucx.m4 +++ b/config/m4/ucx.m4 @@ -128,6 +128,10 @@ AS_IF([test "x$ucx_checked" != "xyes"],[ [AC_DEFINE([UCS_HAVE_PARSER_SET_VALUE_TABLE_PREFIX], [1], [flags for ucs_rcache_get])], []) + AC_CHECK_MEMBER(ucs_config_parser_t.doc, + [AC_DEFINE([UCS_HAVE_PARSER_CONFIG_DOC], [1], [flags for ucs_rcache_get])], + [], + [#include ]) ], [ 
AS_IF([test "x$with_ucx" != "xguess"], diff --git a/src/components/ec/cuda/Makefile.am b/src/components/ec/cuda/Makefile.am index 3d7a862ef4..83f478d797 100644 --- a/src/components/ec/cuda/Makefile.am +++ b/src/components/ec/cuda/Makefile.am @@ -1,5 +1,5 @@ # -# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # if HAVE_CUDA @@ -12,7 +12,9 @@ sources = \ ec_cuda_executor.c \ ec_cuda_executor_interruptible.c \ ec_cuda_executor_persistent.c \ - ec_cuda_executor_persistent_wait.c + ec_cuda_executor_persistent_wait.c \ + ec_cuda_resources.c \ + ec_cuda_resources.h module_LTLIBRARIES = libucc_ec_cuda.la libucc_ec_cuda_la_SOURCES = $(sources) diff --git a/src/components/ec/cuda/ec_cuda.c b/src/components/ec/cuda/ec_cuda.c index 2357023fed..dd721e1f50 100644 --- a/src/components/ec/cuda/ec_cuda.c +++ b/src/components/ec/cuda/ec_cuda.c @@ -75,116 +75,6 @@ static ucc_config_field_t ucc_ec_cuda_config_table[] = { }; -static ucc_status_t ucc_ec_cuda_ee_executor_mpool_chunk_malloc(ucc_mpool_t *mp, //NOLINT: mp is unused - size_t *size_p, - void ** chunk_p) -{ - return CUDA_FUNC(cudaHostAlloc((void**)chunk_p, *size_p, - cudaHostAllocMapped)); -} - -static void ucc_ec_cuda_ee_executor_mpool_chunk_free(ucc_mpool_t *mp, //NOLINT: mp is unused - void *chunk) -{ - CUDA_FUNC(cudaFreeHost(chunk)); -} - -static void ucc_ec_cuda_executor_chunk_init(ucc_mpool_t *mp, void *obj, //NOLINT: mp is unused - void *chunk) //NOLINT: chunk is unused -{ - ucc_ec_cuda_executor_t *eee = (ucc_ec_cuda_executor_t*) obj; - int max_tasks = EC_CUDA_CONFIG->exec_max_tasks; - - CUDA_FUNC(cudaHostGetDevicePointer( - (void**)(&eee->dev_state), (void *)&eee->state, 0)); - CUDA_FUNC(cudaHostGetDevicePointer( - (void**)(&eee->dev_pidx), (void *)&eee->pidx, 0)); - CUDA_FUNC(cudaMalloc((void**)&eee->dev_cidx, sizeof(*eee->dev_cidx))); - CUDA_FUNC(cudaHostAlloc((void**)&eee->tasks, - max_tasks * MAX_SUBTASKS * 
- sizeof(ucc_ee_executor_task_args_t), - cudaHostAllocMapped)); - CUDA_FUNC(cudaHostGetDevicePointer( - (void**)(&eee->dev_tasks), (void *)eee->tasks, 0)); - if (ucc_ec_cuda.thread_mode == UCC_THREAD_MULTIPLE) { - ucc_spinlock_init(&eee->tasks_lock, 0); - } -} - -static void ucc_ec_cuda_executor_chunk_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused -{ - ucc_ec_cuda_executor_t *eee = (ucc_ec_cuda_executor_t*) obj; - - CUDA_FUNC(cudaFree((void*)eee->dev_cidx)); - CUDA_FUNC(cudaFreeHost((void*)eee->tasks)); - if (ucc_ec_cuda.thread_mode == UCC_THREAD_MULTIPLE) { - ucc_spinlock_destroy(&eee->tasks_lock); - } -} - - -static ucc_mpool_ops_t ucc_ec_cuda_ee_executor_mpool_ops = { - .chunk_alloc = ucc_ec_cuda_ee_executor_mpool_chunk_malloc, - .chunk_release = ucc_ec_cuda_ee_executor_mpool_chunk_free, - .obj_init = ucc_ec_cuda_executor_chunk_init, - .obj_cleanup = ucc_ec_cuda_executor_chunk_cleanup, -}; - -static void ucc_ec_cuda_event_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused -{ - ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj; - - CUDA_FUNC(cudaEventCreateWithFlags(&base->event, cudaEventDisableTiming)); -} - -static void ucc_ec_cuda_event_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused -{ - ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj; - - CUDA_FUNC(cudaEventDestroy(base->event)); -} - -static ucc_mpool_ops_t ucc_ec_cuda_event_mpool_ops = { - .chunk_alloc = ucc_mpool_hugetlb_malloc, - .chunk_release = ucc_mpool_hugetlb_free, - .obj_init = ucc_ec_cuda_event_init, - .obj_cleanup = ucc_ec_cuda_event_cleanup, -}; - -static void ucc_ec_cuda_graph_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused -{ - ucc_ec_cuda_executor_interruptible_task_t *task = - (ucc_ec_cuda_executor_interruptible_task_t *) obj; - cudaGraphNode_t memcpy_node; - int i; - - CUDA_FUNC(cudaGraphCreate(&task->graph, 0)); - for (i = 0; i < UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; i++) { - CUDA_FUNC( - 
cudaGraphAddMemcpyNode1D(&memcpy_node, task->graph, NULL, 0, - (void*)1, (void*)1, 1, cudaMemcpyDefault)); - } - - CUDA_FUNC( - cudaGraphInstantiateWithFlags(&task->graph_exec, task->graph, 0)); -} - -static void ucc_ec_cuda_graph_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused -{ - ucc_ec_cuda_executor_interruptible_task_t *task = - (ucc_ec_cuda_executor_interruptible_task_t *) obj; - - CUDA_FUNC(cudaGraphExecDestroy(task->graph_exec)); - CUDA_FUNC(cudaGraphDestroy(task->graph)); -} - -static ucc_mpool_ops_t ucc_ec_cuda_interruptible_task_mpool_ops = { - .chunk_alloc = ucc_mpool_hugetlb_malloc, - .chunk_release = ucc_mpool_hugetlb_free, - .obj_init = ucc_ec_cuda_graph_init, - .obj_cleanup = ucc_ec_cuda_graph_cleanup, -}; - static inline void ucc_ec_cuda_set_threads_nbr(int *nt, int maxThreadsPerBlock) { if (*nt != UCC_ULUNITS_AUTO) { @@ -208,15 +98,14 @@ static inline void ucc_ec_cuda_set_threads_nbr(int *nt, int maxThreadsPerBlock) static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) { - ucc_ec_cuda_config_t *cfg = EC_CUDA_CONFIG; - ucc_status_t status; + ucc_ec_cuda_config_t *cfg = EC_CUDA_CONFIG; + int supports_coop_launch = 0; int device, num_devices; cudaError_t cuda_st; struct cudaDeviceProp prop; - int supportsCoopLaunch = 0; - ucc_ec_cuda.stream = NULL; - ucc_ec_cuda.stream_initialized = 0; + ucc_ec_cuda_config = ucc_derived_of(ucc_ec_cuda.super.config, + ucc_ec_cuda_config_t); ucc_ec_cuda.exec_streams_initialized = 0; ucc_strncpy_safe(ucc_ec_cuda.super.config->log_component.name, ucc_ec_cuda.super.super.name, @@ -228,9 +117,7 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) return UCC_ERR_NO_RESOURCE; } CUDA_CHECK(cudaGetDevice(&device)); - CUDA_CHECK(cudaGetDeviceProperties(&prop, device)); - ucc_ec_cuda_set_threads_nbr((int *)&cfg->exec_num_threads, prop.maxThreadsPerBlock); ucc_ec_cuda_set_threads_nbr(&cfg->reduce_num_threads, @@ -253,52 +140,6 @@ static ucc_status_t ucc_ec_cuda_init(const 
ucc_ec_params_t *ec_params) cfg->exec_num_streams = 1; } - /*create event pool */ - ucc_ec_cuda.exec_streams = ucc_calloc(cfg->exec_num_streams, - sizeof(cudaStream_t), - "ec cuda streams"); - if (!ucc_ec_cuda.exec_streams) { - ec_error(&ucc_ec_cuda.super, "failed to allocate streams array"); - return UCC_ERR_NO_MEMORY; - } - status = ucc_mpool_init(&ucc_ec_cuda.events, 0, sizeof(ucc_ec_cuda_event_t), - 0, UCC_CACHE_LINE_SIZE, 16, UINT_MAX, - &ucc_ec_cuda_event_mpool_ops, UCC_THREAD_MULTIPLE, - "CUDA Event Objects"); - if (status != UCC_OK) { - ec_error(&ucc_ec_cuda.super, "failed to create event pool"); - return status; - } - - status = ucc_mpool_init( - &ucc_ec_cuda.executors, 0, sizeof(ucc_ec_cuda_executor_t), 0, - UCC_CACHE_LINE_SIZE, 16, UINT_MAX, &ucc_ec_cuda_ee_executor_mpool_ops, - UCC_THREAD_MULTIPLE, "EE executor Objects"); - if (status != UCC_OK) { - ec_error(&ucc_ec_cuda.super, "failed to create executors pool"); - return status; - } - - status = ucc_mpool_init( - &ucc_ec_cuda.executor_interruptible_tasks, 0, - sizeof(ucc_ec_cuda_executor_interruptible_task_t), 0, UCC_CACHE_LINE_SIZE, - 16, UINT_MAX, &ucc_ec_cuda_interruptible_task_mpool_ops, - UCC_THREAD_MULTIPLE, "interruptible executor tasks"); - if (status != UCC_OK) { - ec_error(&ucc_ec_cuda.super, "failed to create interruptible tasks pool"); - return status; - } - - status = ucc_mpool_init( - &ucc_ec_cuda.executor_persistent_tasks, 0, - sizeof(ucc_ec_cuda_executor_persistent_task_t), 0, UCC_CACHE_LINE_SIZE, - 16, UINT_MAX, NULL, UCC_THREAD_MULTIPLE, - "persistent executor tasks"); - if (status != UCC_OK) { - ec_error(&ucc_ec_cuda.super, "failed to create persistent tasks pool"); - return status; - } - if (cfg->strm_task_mode == UCC_EC_CUDA_TASK_KERNEL) { ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_KERNEL; } else { @@ -335,16 +176,17 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params) } if (cfg->use_cooperative_launch == 1) { - cudaDeviceGetAttribute(&supportsCoopLaunch, + 
cudaDeviceGetAttribute(&supports_coop_launch, cudaDevAttrCooperativeLaunch, device); - if (!supportsCoopLaunch) { + if (!supports_coop_launch) { cfg->use_cooperative_launch = 0; ec_warn(&ucc_ec_cuda.super, - "CUDA cooperative groups are not supported. " - "Fall back to non cooperative launch."); + "CUDA cooperative groups are not supported. " + "Fall back to non cooperative launch."); } } + ucc_ec_cuda.resources_hash = kh_init(ucc_ec_cuda_resources_hash); ucc_spinlock_init(&ucc_ec_cuda.init_spinlock, 0); return UCC_OK; } @@ -359,9 +201,15 @@ static ucc_status_t ucc_ec_cuda_get_attr(ucc_ec_attr_t *ec_attr) ucc_status_t ucc_ec_cuda_event_create(void **event) { - ucc_ec_cuda_event_t *cuda_event; + ucc_ec_cuda_event_t *cuda_event; + ucc_ec_cuda_resources_t *resources; + ucc_status_t status; - cuda_event = ucc_mpool_get(&ucc_ec_cuda.events); + status = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } + cuda_event = ucc_mpool_get(&resources->events); if (ucc_unlikely(!cuda_event)) { ec_error(&ucc_ec_cuda.super, "failed to get event from mpool"); return UCC_ERR_NO_MEMORY; @@ -390,8 +238,8 @@ ucc_status_t ucc_ec_cuda_event_post(void *ee_context, void *event) ucc_status_t ucc_ec_cuda_event_test(void *event) { - cudaError_t cu_err; ucc_ec_cuda_event_t *cuda_event = event; + cudaError_t cu_err; cu_err = cudaEventQuery(cuda_event->event); @@ -404,26 +252,68 @@ ucc_status_t ucc_ec_cuda_event_test(void *event) static ucc_status_t ucc_ec_cuda_finalize() { - int i; + ucc_ec_cuda_resources_t *resources; - if (ucc_ec_cuda.stream_initialized) { - CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.stream)); - ucc_ec_cuda.stream_initialized = 0; + resources = ec_cuda_resources_hash_pop(ucc_ec_cuda.resources_hash); + while (resources) { + ucc_ec_cuda_resources_cleanup(resources); + resources = ec_cuda_resources_hash_pop(ucc_ec_cuda.resources_hash); } - if (ucc_ec_cuda.exec_streams_initialized) { - for (i = 0; i < EC_CUDA_CONFIG->exec_num_streams; 
i++) { - CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.exec_streams[i])); - } - ucc_ec_cuda.exec_streams_initialized = 0; + ucc_spinlock_destroy(&ucc_ec_cuda.init_spinlock); + + return UCC_OK; +} + +ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources) +{ + CUcontext cu_ctx; + unsigned long long int cu_ctx_id; + ucc_status_t status; + + status = CUDADRV_FUNC(cuCtxGetCurrent(&cu_ctx)); + if (ucc_unlikely(status != UCC_OK)) { + ec_error(&ucc_ec_cuda.super, "failed to get current CUDA context"); + return status; } - ucc_mpool_cleanup(&ucc_ec_cuda.events, 1); - ucc_mpool_cleanup(&ucc_ec_cuda.executors, 1); - ucc_mpool_cleanup(&ucc_ec_cuda.executor_interruptible_tasks, 1); - ucc_mpool_cleanup(&ucc_ec_cuda.executor_persistent_tasks, 1); - ucc_free(ucc_ec_cuda.exec_streams); +#if CUDA_VERSION < 12000 + cu_ctx_id = 1; +#else + status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id)); + if (ucc_unlikely(status != UCC_OK)) { + ec_error(&ucc_ec_cuda.super, "failed to get correct CUDA context ID"); + } +#endif + *resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash, + cu_ctx_id); + if (ucc_unlikely(*resources == NULL)) { + ucc_spin_lock(&ucc_ec_cuda.init_spinlock); + *resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash, + cu_ctx_id); + if (*resources == NULL) { + *resources = ucc_malloc(sizeof(ucc_ec_cuda_resources_t), + "ec cuda resources"); + if (*resources == NULL) { + ec_error(&ucc_ec_cuda.super, + "failed to allocate %zd bytes for resources", + sizeof(ucc_ec_cuda_resources_t)); + ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); + return UCC_ERR_NO_MEMORY; + } + status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super, + *resources); + if (status != UCC_OK) { + ucc_free(*resources); + ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); + return status; + } + ec_cuda_resources_hash_put(ucc_ec_cuda.resources_hash, cu_ctx_id, + *resources); + } + ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); + } return UCC_OK; } @@ -455,5 +345,7 @@ ucc_ec_cuda_t 
ucc_ec_cuda = { .super.executor_ops.finalize = ucc_cuda_executor_finalize, }; +ucc_ec_cuda_config_t *ucc_ec_cuda_config; + UCC_CONFIG_REGISTER_TABLE_ENTRY(&ucc_ec_cuda.super.config_table, &ucc_config_global_list); diff --git a/src/components/ec/cuda/ec_cuda.h b/src/components/ec/cuda/ec_cuda.h index d732669f12..84b8588605 100644 --- a/src/components/ec/cuda/ec_cuda.h +++ b/src/components/ec/cuda/ec_cuda.h @@ -11,109 +11,30 @@ #include "components/ec/ucc_ec_log.h" #include "utils/arch/cuda_def.h" #include "utils/ucc_mpool.h" +#include "ec_cuda_resources.h" #include #define WARP_SIZE 32 -#define MAX_SUBTASKS 12 - -typedef enum ucc_ec_cuda_strm_task_mode { - UCC_EC_CUDA_TASK_KERNEL, - UCC_EC_CUDA_TASK_MEM_OPS, - UCC_EC_CUDA_TASK_AUTO, - UCC_EC_CUDA_TASK_LAST, -} ucc_ec_cuda_strm_task_mode_t; - -typedef enum ucc_ec_cuda_executor_state { - UCC_EC_CUDA_EXECUTOR_INITIALIZED, - UCC_EC_CUDA_EXECUTOR_POSTED, - UCC_EC_CUDA_EXECUTOR_STARTED, - UCC_EC_CUDA_EXECUTOR_SHUTDOWN, - UCC_EC_CUDA_EXECUTOR_SHUTDOWN_ACK -} ucc_ec_cuda_executor_state_t; - -typedef enum ucc_ec_cuda_executor_mode { - UCC_EC_CUDA_EXECUTOR_MODE_PERSISTENT, - UCC_EC_CUDA_EXECUTOR_MODE_INTERRUPTIBLE -} ucc_ec_cuda_executor_mode_t; typedef ucc_status_t (*ucc_ec_cuda_task_post_fn) (uint32_t *dev_status, int blocking_wait, cudaStream_t stream); -typedef struct ucc_ec_cuda_config { - ucc_ec_config_t super; - ucc_ec_cuda_strm_task_mode_t strm_task_mode; - unsigned long exec_num_workers; - unsigned long exec_num_threads; - unsigned long exec_max_tasks; - unsigned long exec_num_streams; - unsigned long reduce_num_blocks; - int reduce_num_threads; - int use_cooperative_launch; - unsigned long exec_copy_thresh; -} ucc_ec_cuda_config_t; - typedef struct ucc_ec_cuda { ucc_ec_base_t super; - int stream_initialized; - cudaStream_t stream; int exec_streams_initialized; - cudaStream_t *exec_streams; - ucc_mpool_t events; - ucc_mpool_t executors; - ucc_mpool_t executor_interruptible_tasks; - ucc_mpool_t 
executor_persistent_tasks; + ucc_ec_cuda_resources_hash_t *resources_hash; ucc_thread_mode_t thread_mode; ucc_ec_cuda_strm_task_mode_t strm_task_mode; ucc_spinlock_t init_spinlock; } ucc_ec_cuda_t; -typedef struct ucc_ec_cuda_event { - cudaEvent_t event; -} ucc_ec_cuda_event_t; - typedef struct ucc_ec_cuda_stream_request { uint32_t status; uint32_t *dev_status; cudaStream_t stream; } ucc_ec_cuda_stream_request_t; -typedef struct ucc_ec_cuda_executor_interruptible_task { - ucc_ee_executor_task_t super; - void *event; - cudaGraph_t graph; - cudaGraphExec_t graph_exec; -} ucc_ec_cuda_executor_interruptible_task_t; - -typedef struct ucc_ec_cuda_executor_persistent_task { - ucc_ee_executor_task_t super; - int num_subtasks; - ucc_ee_executor_task_args_t *subtasks[MAX_SUBTASKS]; -} ucc_ec_cuda_executor_persistent_task_t; - -typedef struct ucc_ec_cuda_executor_task_ops { - ucc_status_t (*task_post)(ucc_ee_executor_t *executor, - const ucc_ee_executor_task_args_t *task_args, - ucc_ee_executor_task_t **task); - ucc_status_t (*task_test)(const ucc_ee_executor_task_t *task); - ucc_status_t (*task_finalize)(ucc_ee_executor_task_t *task); -} ucc_ec_cuda_executor_task_ops_t; - -typedef struct ucc_ec_cuda_executor { - ucc_ee_executor_t super; - ucc_ec_cuda_executor_mode_t mode; - uint64_t requested_ops; - ucc_ec_cuda_executor_task_ops_t ops; - ucc_spinlock_t tasks_lock; - ucc_ec_cuda_executor_state_t state; - int pidx; - ucc_ee_executor_task_args_t *tasks; - ucc_ec_cuda_executor_state_t *dev_state; - ucc_ee_executor_task_args_t *dev_tasks; - int *dev_pidx; - int *dev_cidx; -} ucc_ec_cuda_executor_t; - ucc_status_t ucc_ec_cuda_event_create(void **event); ucc_status_t ucc_ec_cuda_event_destroy(void *event); @@ -122,6 +43,8 @@ ucc_status_t ucc_ec_cuda_event_post(void *ee_context, void *event); ucc_status_t ucc_ec_cuda_event_test(void *event); +ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources); + extern ucc_ec_cuda_t ucc_ec_cuda; #define EC_CUDA_CONFIG \ diff 
--git a/src/components/ec/cuda/ec_cuda_executor.c b/src/components/ec/cuda/ec_cuda_executor.c index 49ae469140..1349187b71 100644 --- a/src/components/ec/cuda/ec_cuda_executor.c +++ b/src/components/ec/cuda/ec_cuda_executor.c @@ -23,8 +23,16 @@ ucc_status_t ucc_cuda_executor_persistent_wait_stop(ucc_ee_executor_t *executor) ucc_status_t ucc_cuda_executor_init(const ucc_ee_executor_params_t *params, ucc_ee_executor_t **executor) { - ucc_ec_cuda_executor_t *eee = ucc_mpool_get(&ucc_ec_cuda.executors); + ucc_ec_cuda_executor_t *eee; + ucc_ec_cuda_resources_t *resources; + ucc_status_t status; + status = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } + + eee = ucc_mpool_get(&resources->executors); if (ucc_unlikely(!eee)) { ec_error(&ucc_ec_cuda.super, "failed to allocate executor"); return UCC_ERR_NO_MEMORY; diff --git a/src/components/ec/cuda/ec_cuda_executor_interruptible.c b/src/components/ec/cuda/ec_cuda_executor_interruptible.c index 74cc80b96e..7272b2439f 100644 --- a/src/components/ec/cuda/ec_cuda_executor_interruptible.c +++ b/src/components/ec/cuda/ec_cuda_executor_interruptible.c @@ -9,37 +9,43 @@ ucc_status_t ucc_cuda_executor_interruptible_get_stream(cudaStream_t *stream) { - static uint32_t last_used = 0; - int num_streams = EC_CUDA_CONFIG->exec_num_streams; - ucc_status_t st; - int i, j; - uint32_t id; + static uint32_t last_used = 0; + int num_streams = EC_CUDA_CONFIG->exec_num_streams; + ucc_ec_cuda_resources_t *resources; + ucc_status_t st; + int i, j; + uint32_t id; ucc_assert(num_streams > 0); - if (ucc_unlikely(!ucc_ec_cuda.exec_streams_initialized)) { + st = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(st != UCC_OK)) { + return st; + } + + if (ucc_unlikely(!resources->streams_initialized)) { ucc_spin_lock(&ucc_ec_cuda.init_spinlock); - if (ucc_ec_cuda.exec_streams_initialized) { + if (resources->streams_initialized) { goto unlock; } for(i = 0; i < num_streams; i++) { - st = 
CUDA_FUNC(cudaStreamCreateWithFlags(&ucc_ec_cuda.exec_streams[i], + st = CUDA_FUNC(cudaStreamCreateWithFlags(&resources->exec_streams[i], cudaStreamNonBlocking)); if (st != UCC_OK) { for (j = 0; j < i; j++) { - CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.exec_streams[j])); + CUDA_FUNC(cudaStreamDestroy(resources->exec_streams[j])); } ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); return st; } } - ucc_ec_cuda.exec_streams_initialized = 1; + resources->streams_initialized = 1; unlock: ucc_spin_unlock(&ucc_ec_cuda.init_spinlock); } id = ucc_atomic_fadd32(&last_used, 1); - *stream = ucc_ec_cuda.exec_streams[id % num_streams]; + *stream = resources->exec_streams[id % num_streams]; return UCC_OK; } @@ -52,20 +58,25 @@ ucc_cuda_executor_interruptible_task_post(ucc_ee_executor_t *executor, const ucc_ee_executor_task_args_t *task_args, ucc_ee_executor_task_t **task) { - cudaStream_t stream = NULL; + cudaStream_t stream = NULL; + size_t num_nodes = UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; ucc_ec_cuda_executor_interruptible_task_t *ee_task; ucc_status_t status; cudaGraphNode_t nodes[UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS]; - size_t num_nodes = UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; + ucc_ec_cuda_resources_t *resources; int i; + status = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } status = ucc_cuda_executor_interruptible_get_stream(&stream); if (ucc_unlikely(status != UCC_OK)) { return status; } - ee_task = ucc_mpool_get(&ucc_ec_cuda.executor_interruptible_tasks); + ee_task = ucc_mpool_get(&resources->executor_interruptible_tasks); if (ucc_unlikely(!ee_task)) { return UCC_ERR_NO_MEMORY; } diff --git a/src/components/ec/cuda/ec_cuda_executor_persistent.c b/src/components/ec/cuda/ec_cuda_executor_persistent.c index b937a89680..c43b132e12 100644 --- a/src/components/ec/cuda/ec_cuda_executor_persistent.c +++ b/src/components/ec/cuda/ec_cuda_executor_persistent.c @@ -18,12 +18,19 @@ ucc_cuda_executor_persistent_task_post(ucc_ee_executor_t 
*executor, ucc_ee_executor_task_args_t *subtask_args; ucc_ec_cuda_executor_persistent_task_t *ee_task; int i; + ucc_ec_cuda_resources_t *resources; + ucc_status_t status; + + status = ucc_ec_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; + } if (ucc_ec_cuda.thread_mode == UCC_THREAD_MULTIPLE) { ucc_spin_lock(&eee->tasks_lock); } - ee_task = ucc_mpool_get(&ucc_ec_cuda.executor_persistent_tasks); + ee_task = ucc_mpool_get(&resources->executor_persistent_tasks); if (ucc_unlikely(!ee_task)) { return UCC_ERR_NO_MEMORY; } diff --git a/src/components/ec/cuda/ec_cuda_resources.c b/src/components/ec/cuda/ec_cuda_resources.c new file mode 100644 index 0000000000..5bc0043f1f --- /dev/null +++ b/src/components/ec/cuda/ec_cuda_resources.c @@ -0,0 +1,197 @@ +#include "ec_cuda_resources.h" +#include "components/ec/ucc_ec_log.h" +#include "utils/ucc_malloc.h" + +static void ucc_ec_cuda_event_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused +{ + ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj; + + CUDA_FUNC(cudaEventCreateWithFlags(&base->event, cudaEventDisableTiming)); +} + +static void ucc_ec_cuda_event_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused +{ + ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj; + + CUDA_FUNC(cudaEventDestroy(base->event)); +} + +static ucc_mpool_ops_t ucc_ec_cuda_event_mpool_ops = { + .chunk_alloc = ucc_mpool_hugetlb_malloc, + .chunk_release = ucc_mpool_hugetlb_free, + .obj_init = ucc_ec_cuda_event_init, + .obj_cleanup = ucc_ec_cuda_event_cleanup, +}; + +static ucc_status_t ucc_ec_cuda_ee_executor_mpool_chunk_malloc(ucc_mpool_t *mp, //NOLINT: mp is unused + size_t *size_p, + void ** chunk_p) +{ + return CUDA_FUNC(cudaHostAlloc((void**)chunk_p, *size_p, + cudaHostAllocMapped)); +} + +static void ucc_ec_cuda_ee_executor_mpool_chunk_free(ucc_mpool_t *mp, //NOLINT: mp is unused + void *chunk) +{ + CUDA_FUNC(cudaFreeHost(chunk)); +} + +static void 
ucc_ec_cuda_executor_chunk_init(ucc_mpool_t *mp, void *obj, //NOLINT: mp is unused + void *chunk) //NOLINT: chunk is unused +{ + ucc_ec_cuda_executor_t *eee = (ucc_ec_cuda_executor_t*) obj; + int max_tasks = ucc_ec_cuda_config->exec_max_tasks; + + CUDA_FUNC(cudaHostGetDevicePointer( + (void**)(&eee->dev_state), (void *)&eee->state, 0)); + CUDA_FUNC(cudaHostGetDevicePointer( + (void**)(&eee->dev_pidx), (void *)&eee->pidx, 0)); + CUDA_FUNC(cudaMalloc((void**)&eee->dev_cidx, sizeof(*eee->dev_cidx))); + CUDA_FUNC(cudaHostAlloc((void**)&eee->tasks, + max_tasks * MAX_SUBTASKS * + sizeof(ucc_ee_executor_task_args_t), + cudaHostAllocMapped)); + CUDA_FUNC(cudaHostGetDevicePointer( + (void**)(&eee->dev_tasks), (void *)eee->tasks, 0)); + ucc_spinlock_init(&eee->tasks_lock, 0); +} + +static void ucc_ec_cuda_executor_chunk_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused +{ + ucc_ec_cuda_executor_t *eee = (ucc_ec_cuda_executor_t*) obj; + + CUDA_FUNC(cudaFree((void*)eee->dev_cidx)); + CUDA_FUNC(cudaFreeHost((void*)eee->tasks)); + ucc_spinlock_destroy(&eee->tasks_lock); +} + +static ucc_mpool_ops_t ucc_ec_cuda_ee_executor_mpool_ops = { + .chunk_alloc = ucc_ec_cuda_ee_executor_mpool_chunk_malloc, + .chunk_release = ucc_ec_cuda_ee_executor_mpool_chunk_free, + .obj_init = ucc_ec_cuda_executor_chunk_init, + .obj_cleanup = ucc_ec_cuda_executor_chunk_cleanup, +}; + +static void ucc_ec_cuda_graph_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused +{ + ucc_ec_cuda_executor_interruptible_task_t *task = + (ucc_ec_cuda_executor_interruptible_task_t *) obj; + cudaGraphNode_t memcpy_node; + int i; + + CUDA_FUNC(cudaGraphCreate(&task->graph, 0)); + for (i = 0; i < UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; i++) { + CUDA_FUNC( + cudaGraphAddMemcpyNode1D(&memcpy_node, task->graph, NULL, 0, + (void*)1, (void*)1, 1, cudaMemcpyDefault)); + } + + CUDA_FUNC( + cudaGraphInstantiateWithFlags(&task->graph_exec, task->graph, 0)); +} + +static void ucc_ec_cuda_graph_cleanup(ucc_mpool_t 
*mp, void *obj) //NOLINT: mp is unused +{ + ucc_ec_cuda_executor_interruptible_task_t *task = + (ucc_ec_cuda_executor_interruptible_task_t *) obj; + + CUDA_FUNC(cudaGraphExecDestroy(task->graph_exec)); + CUDA_FUNC(cudaGraphDestroy(task->graph)); +} + +static ucc_mpool_ops_t ucc_ec_cuda_interruptible_task_mpool_ops = { + .chunk_alloc = ucc_mpool_hugetlb_malloc, + .chunk_release = ucc_mpool_hugetlb_free, + .obj_init = ucc_ec_cuda_graph_init, + .obj_cleanup = ucc_ec_cuda_graph_cleanup, +}; + +ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, + ucc_ec_cuda_resources_t *resources) +{ + ucc_status_t status; + int num_streams; + + CUDADRV_CHECK(cuCtxGetCurrent(&resources->cu_ctx)); + status = ucc_mpool_init(&resources->events, 0, sizeof(ucc_ec_cuda_event_t), + 0, UCC_CACHE_LINE_SIZE, 16, UINT_MAX, + &ucc_ec_cuda_event_mpool_ops, UCC_THREAD_MULTIPLE, + "CUDA Event Objects"); + if (status != UCC_OK) { + ec_error(ec, "failed to create CUDA events pool"); + goto exit_err; + } + + status = ucc_mpool_init(&resources->executors, 0, + sizeof(ucc_ec_cuda_executor_t), 0, + UCC_CACHE_LINE_SIZE, 16, UINT_MAX, + &ucc_ec_cuda_ee_executor_mpool_ops, + UCC_THREAD_MULTIPLE, "CUDA EE executor objects"); + if (status != UCC_OK) { + ec_error(ec, "failed to create executors pool"); + goto free_events_mpool; + } + + status = ucc_mpool_init(&resources->executor_interruptible_tasks, 0, + sizeof(ucc_ec_cuda_executor_interruptible_task_t), + 0, UCC_CACHE_LINE_SIZE, 16, UINT_MAX, + &ucc_ec_cuda_interruptible_task_mpool_ops, + UCC_THREAD_MULTIPLE, "interruptible executor tasks"); + if (status != UCC_OK) { + ec_error(ec, "failed to create interruptible tasks pool"); + goto free_executors_mpool; + } + + status = ucc_mpool_init(&resources->executor_persistent_tasks, 0, + sizeof(ucc_ec_cuda_executor_persistent_task_t), 0, + UCC_CACHE_LINE_SIZE, 16, UINT_MAX, NULL, + UCC_THREAD_MULTIPLE, "persistent executor tasks"); + if (status != UCC_OK) { + ec_error(ec, "failed to create persistent tasks 
pool"); + goto free_interruptible_tasks_mpool; + } + + num_streams = ucc_ec_cuda_config->exec_num_streams; + resources->exec_streams = ucc_calloc(num_streams, sizeof(cudaStream_t), + "ec cuda streams"); + if (!resources->exec_streams) { + ec_error(ec, "failed to allocate %zd bytes for executor streams", + sizeof(cudaStream_t) * num_streams); + status = UCC_ERR_NO_MEMORY; + goto free_persistent_tasks_mpool; + } + + return UCC_OK; + +free_persistent_tasks_mpool: + ucc_mpool_cleanup(&resources->executor_persistent_tasks, 0); +free_interruptible_tasks_mpool: + ucc_mpool_cleanup(&resources->executor_persistent_tasks, 0); +free_executors_mpool: + ucc_mpool_cleanup(&resources->executors, 0); +free_events_mpool: + ucc_mpool_cleanup(&resources->events, 0); +exit_err: + return status; +} + +void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources) +{ + int i; + CUcontext tmp_context; + + cuCtxPushCurrent(resources->cu_ctx); + for (i = 0; i < ucc_ec_cuda_config->exec_num_streams; i++) { + if (resources->exec_streams[i] != NULL) { + CUDA_FUNC(cudaStreamDestroy(resources->exec_streams[i])); + } + } + ucc_mpool_cleanup(&resources->events, 1); + ucc_mpool_cleanup(&resources->executors, 1); + ucc_mpool_cleanup(&resources->executor_interruptible_tasks, 1); + ucc_mpool_cleanup(&resources->executor_persistent_tasks, 1); + + ucc_free(resources->exec_streams); + cuCtxPopCurrent(&tmp_context); +} diff --git a/src/components/ec/cuda/ec_cuda_resources.h b/src/components/ec/cuda/ec_cuda_resources.h new file mode 100644 index 0000000000..1390f76cdd --- /dev/null +++ b/src/components/ec/cuda/ec_cuda_resources.h @@ -0,0 +1,158 @@ +/** + * Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. 
+ */ + +#ifndef UCC_EC_CUDA_RESOURCES_H_ +#define UCC_EC_CUDA_RESOURCES_H_ + +#include "components/ec/base/ucc_ec_base.h" +#include "utils/arch/cuda_def.h" +#include "utils/ucc_mpool.h" + +#define MAX_SUBTASKS 12 + +typedef enum ucc_ec_cuda_executor_state { + UCC_EC_CUDA_EXECUTOR_INITIALIZED, + UCC_EC_CUDA_EXECUTOR_POSTED, + UCC_EC_CUDA_EXECUTOR_STARTED, + UCC_EC_CUDA_EXECUTOR_SHUTDOWN, + UCC_EC_CUDA_EXECUTOR_SHUTDOWN_ACK +} ucc_ec_cuda_executor_state_t; + +typedef enum ucc_ec_cuda_executor_mode { + UCC_EC_CUDA_EXECUTOR_MODE_PERSISTENT, + UCC_EC_CUDA_EXECUTOR_MODE_INTERRUPTIBLE +} ucc_ec_cuda_executor_mode_t; + +typedef struct ucc_ec_cuda_event { + cudaEvent_t event; +} ucc_ec_cuda_event_t; + +typedef struct ucc_ec_cuda_executor_task_ops { + ucc_status_t (*task_post)(ucc_ee_executor_t *executor, + const ucc_ee_executor_task_args_t *task_args, + ucc_ee_executor_task_t **task); + ucc_status_t (*task_test)(const ucc_ee_executor_task_t *task); + ucc_status_t (*task_finalize)(ucc_ee_executor_task_t *task); +} ucc_ec_cuda_executor_task_ops_t; + +typedef struct ucc_ec_cuda_executor { + ucc_ee_executor_t super; + ucc_ec_cuda_executor_mode_t mode; + uint64_t requested_ops; + ucc_ec_cuda_executor_task_ops_t ops; + ucc_spinlock_t tasks_lock; + ucc_ec_cuda_executor_state_t state; + int pidx; + ucc_ee_executor_task_args_t *tasks; + ucc_ec_cuda_executor_state_t *dev_state; + ucc_ee_executor_task_args_t *dev_tasks; + int *dev_pidx; + int *dev_cidx; +} ucc_ec_cuda_executor_t; + +typedef struct ucc_ec_cuda_executor_interruptible_task { + ucc_ee_executor_task_t super; + void *event; + cudaGraph_t graph; + cudaGraphExec_t graph_exec; +} ucc_ec_cuda_executor_interruptible_task_t; + +typedef struct ucc_ec_cuda_executor_persistent_task { + ucc_ee_executor_task_t super; + int num_subtasks; + ucc_ee_executor_task_args_t *subtasks[MAX_SUBTASKS]; +} ucc_ec_cuda_executor_persistent_task_t; + +typedef struct ucc_ec_cuda_resources { + CUcontext cu_ctx; + ucc_mpool_t events; + ucc_mpool_t 
executors; + ucc_mpool_t executor_interruptible_tasks; + ucc_mpool_t executor_persistent_tasks; + int streams_initialized; + int num_streams; + cudaStream_t *exec_streams; +} ucc_ec_cuda_resources_t; + +typedef enum ucc_ec_cuda_strm_task_mode { + UCC_EC_CUDA_TASK_KERNEL, + UCC_EC_CUDA_TASK_MEM_OPS, + UCC_EC_CUDA_TASK_AUTO, + UCC_EC_CUDA_TASK_LAST, +} ucc_ec_cuda_strm_task_mode_t; + +typedef struct ucc_ec_cuda_config { + ucc_ec_config_t super; + ucc_ec_cuda_strm_task_mode_t strm_task_mode; + unsigned long exec_num_workers; + unsigned long exec_num_threads; + unsigned long exec_max_tasks; + unsigned long exec_num_streams; + unsigned long reduce_num_blocks; + int reduce_num_threads; + int use_cooperative_launch; + unsigned long exec_copy_thresh; +} ucc_ec_cuda_config_t; + +extern ucc_ec_cuda_config_t *ucc_ec_cuda_config; + +ucc_status_t ucc_ec_cuda_resources_init(ucc_ec_base_t *ec, + ucc_ec_cuda_resources_t *resources); + +void ucc_ec_cuda_resources_cleanup(ucc_ec_cuda_resources_t *resources); + +KHASH_INIT(ucc_ec_cuda_resources_hash, unsigned long long, void*, 1, \ + kh_int64_hash_func, kh_int64_hash_equal); +#define ucc_ec_cuda_resources_hash_t khash_t(ucc_ec_cuda_resources_hash) + +static inline +void* ec_cuda_resources_hash_get(ucc_ec_cuda_resources_hash_t *h, + unsigned long long key) +{ + khiter_t k; + void *value; + + k = kh_get(ucc_ec_cuda_resources_hash, h , key); + if (k == kh_end(h)) { + return NULL; + } + value = kh_value(h, k); + return value; +} + +static inline +void ec_cuda_resources_hash_put(ucc_ec_cuda_resources_hash_t *h, + unsigned long long key, + void *value) +{ + int ret; + khiter_t k; + k = kh_put(ucc_ec_cuda_resources_hash, h, key, &ret); + kh_value(h, k) = value; +} + +static inline +void* ec_cuda_resources_hash_pop(ucc_ec_cuda_resources_hash_t *h) +{ + void *resources = NULL; + khiter_t k; + + k = kh_begin(h); + while (k != kh_end(h)) { + if (kh_exist(h, k)) { + resources = kh_value(h, k); + break; + } + k++; + } + + if (resources) { + 
kh_del(ucc_ec_cuda_resources_hash, h, k); + } + return resources; +} + +#endif diff --git a/src/components/ec/ucc_ec.c b/src/components/ec/ucc_ec.c index af83e301b4..42cc096a0c 100644 --- a/src/components/ec/ucc_ec.c +++ b/src/components/ec/ucc_ec.c @@ -4,6 +4,7 @@ * See file LICENSE for terms. */ +#include #include "config.h" #include "base/ucc_ec_base.h" #include "ucc_ec.h" @@ -13,6 +14,7 @@ static const ucc_ec_ops_t *ec_ops[UCC_EE_LAST]; static const ucc_ee_executor_ops_t *executor_ops[UCC_EE_LAST]; +static pthread_mutex_t ucc_ec_mutex = PTHREAD_MUTEX_INITIALIZER; #define UCC_CHECK_EC_AVAILABLE(ee) \ do { \ @@ -28,6 +30,7 @@ ucc_status_t ucc_ec_init(const ucc_ec_params_t *ec_params) ucc_status_t status; ucc_ec_attr_t attr; + pthread_mutex_lock(&ucc_ec_mutex); memset(ec_ops, 0, UCC_EE_LAST * sizeof(ucc_ec_ops_t *)); n_ecs = ucc_global_config.ec_framework.n_components; for (i = 0; i < n_ecs; i++) { @@ -62,6 +65,7 @@ ucc_status_t ucc_ec_init(const ucc_ec_params_t *ec_params) attr.field_mask = UCC_EC_ATTR_FIELD_THREAD_MODE; status = ec->get_attr(&attr); if (status != UCC_OK) { + pthread_mutex_unlock(&ucc_ec_mutex); return status; } if (attr.thread_mode < ec_params->thread_mode) { @@ -75,6 +79,7 @@ ucc_status_t ucc_ec_init(const ucc_ec_params_t *ec_params) ec_ops[ec->type] = &ec->ops; executor_ops[ec->type] = &ec->executor_ops; } + pthread_mutex_unlock(&ucc_ec_mutex); return UCC_OK; } @@ -102,6 +107,7 @@ ucc_status_t ucc_ec_finalize() ucc_ee_type_t et; ucc_ec_base_t *ec; + pthread_mutex_lock(&ucc_ec_mutex); for (et = UCC_EE_FIRST; et < UCC_EE_LAST; et++) { if (NULL != ec_ops[et]) { ec = ucc_container_of(ec_ops[et], ucc_ec_base_t, ops); @@ -115,6 +121,7 @@ ucc_status_t ucc_ec_finalize() } } } + pthread_mutex_unlock(&ucc_ec_mutex); return UCC_OK; } diff --git a/src/components/mc/cuda/Makefile.am b/src/components/mc/cuda/Makefile.am index 1e25e2109a..d8e1dbe55e 100644 --- a/src/components/mc/cuda/Makefile.am +++ b/src/components/mc/cuda/Makefile.am @@ -5,8 +5,10 @@ if 
HAVE_CUDA sources = \ - mc_cuda.h \ - mc_cuda.c + mc_cuda.h \ + mc_cuda.c \ + mc_cuda_resources.c \ + mc_cuda_resources.h module_LTLIBRARIES = libucc_mc_cuda.la libucc_mc_cuda_la_SOURCES = $(sources) diff --git a/src/components/mc/cuda/mc_cuda.c b/src/components/mc/cuda/mc_cuda.c index a4b0a51330..aa2638b9da 100644 --- a/src/components/mc/cuda/mc_cuda.c +++ b/src/components/mc/cuda/mc_cuda.c @@ -50,8 +50,8 @@ static ucc_status_t ucc_mc_cuda_init(const ucc_mc_params_t *mc_params) int num_devices, driver_ver; cudaError_t cuda_st; - ucc_mc_cuda.stream = NULL; - ucc_mc_cuda.stream_initialized = 0; + ucc_mc_cuda_config = ucc_derived_of(ucc_mc_cuda.super.config, + ucc_mc_cuda_config_t); ucc_strncpy_safe(ucc_mc_cuda.super.config->log_component.name, ucc_mc_cuda.super.super.name, sizeof(ucc_mc_cuda.super.config->log_component.name)); @@ -100,6 +100,7 @@ static ucc_status_t ucc_mc_cuda_init(const ucc_mc_params_t *mc_params) "with driver version %d", driver_ver); } #endif + ucc_mc_cuda.resources_hash = kh_init(ucc_mc_cuda_resources_hash); // lock assures single mpool initiation when multiple threads concurrently execute // different collective operations thus concurrently entering init function. 
ucc_spinlock_init(&ucc_mc_cuda.init_spinlock, 0); @@ -127,8 +128,9 @@ static ucc_status_t ucc_mc_cuda_mem_alloc(ucc_mc_buffer_header_t **h_ptr, ucc_memory_type_t mt) { cudaError_t st; - ucc_mc_buffer_header_t *h = - ucc_malloc(sizeof(ucc_mc_buffer_header_t), "mc cuda"); + ucc_mc_buffer_header_t *h; + + h = ucc_malloc(sizeof(ucc_mc_buffer_header_t), "mc cuda"); if (ucc_unlikely(!h)) { mc_error(&ucc_mc_cuda.super, "failed to allocate %zd bytes", sizeof(ucc_mc_buffer_header_t)); @@ -139,13 +141,13 @@ static ucc_status_t ucc_mc_cuda_mem_alloc(ucc_mc_buffer_header_t **h_ptr, cudaMemAttachGlobal); if (ucc_unlikely(st != cudaSuccess)) { cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to allocate %zd bytes, " + mc_error(&ucc_mc_cuda.super, "failed to allocate %zd bytes, " "cuda error %d(%s)", size, st, cudaGetErrorString(st)); ucc_free(h); return UCC_ERR_NO_MEMORY; } + h->from_pool = 0; h->mt = UCC_MEMORY_TYPE_CUDA; *h_ptr = h; @@ -158,22 +160,25 @@ static ucc_status_t ucc_mc_cuda_mem_pool_alloc(ucc_mc_buffer_header_t **h_ptr, size_t size, ucc_memory_type_t mt) { - ucc_mc_buffer_header_t *h = NULL; - - if (size <= MC_CUDA_CONFIG->mpool_elem_size) { - if (mt == UCC_MEMORY_TYPE_CUDA) { - h = (ucc_mc_buffer_header_t *)ucc_mpool_get(&ucc_mc_cuda.mpool); - } else if (mt == UCC_MEMORY_TYPE_CUDA_MANAGED) { - h = (ucc_mc_buffer_header_t *)ucc_mpool_get(&ucc_mc_cuda.mpool_managed); - } else { - return UCC_ERR_INVALID_PARAM; + ucc_mc_buffer_header_t *h = NULL; + ucc_mc_cuda_resources_t *resources; + ucc_status_t status; + + if ((size <= MC_CUDA_CONFIG->mpool_elem_size) && + (mt != UCC_MEMORY_TYPE_CUDA_MANAGED)) { + status = ucc_mc_cuda_get_resources(&resources); + if (ucc_unlikely(status != UCC_OK)) { + return status; } + + h = (ucc_mc_buffer_header_t *)ucc_mpool_get(&resources->scratch_mpool); } if (!h) { // Slow path return ucc_mc_cuda_mem_alloc(h_ptr, size, mt); } + if (ucc_unlikely(!h->addr)){ return UCC_ERR_NO_MEMORY; } @@ -182,90 +187,6 @@ static ucc_status_t 
ucc_mc_cuda_mem_pool_alloc(ucc_mc_buffer_header_t **h_ptr, return UCC_OK; } -static ucc_status_t ucc_mc_cuda_chunk_alloc(ucc_mpool_t *mp, //NOLINT - size_t *size_p, - void **chunk_p) -{ - *chunk_p = ucc_malloc(*size_p, "mc cuda"); - if (!*chunk_p) { - mc_error(&ucc_mc_cuda.super, "failed to allocate %zd bytes", *size_p); - return UCC_ERR_NO_MEMORY; - } - - return UCC_OK; -} - -static void ucc_mc_cuda_chunk_init(ucc_mpool_t *mp, //NOLINT - void *obj, void *chunk) //NOLINT -{ - ucc_mc_buffer_header_t *h = (ucc_mc_buffer_header_t *)obj; - cudaError_t st = cudaMalloc(&h->addr, MC_CUDA_CONFIG->mpool_elem_size); - if (st != cudaSuccess) { - // h->addr will be 0 so ucc_mc_cuda_mem_alloc_pool function will - // return UCC_ERR_NO_MEMORY. As such mc_error message is suffice. - h->addr = NULL; - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to allocate %zd bytes, " - "cuda error %d(%s)", - MC_CUDA_CONFIG->mpool_elem_size, st, cudaGetErrorString(st)); - } - h->from_pool = 1; - h->mt = UCC_MEMORY_TYPE_CUDA; -} - -static void ucc_mc_cuda_chunk_release(ucc_mpool_t *mp, void *chunk) //NOLINT: mp is unused -{ - ucc_free(chunk); -} - -static void ucc_mc_cuda_chunk_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused -{ - ucc_mc_buffer_header_t *h = (ucc_mc_buffer_header_t *)obj; - cudaError_t st; - st = cudaFree(h->addr); - if (st != cudaSuccess) { - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to free mem at %p, " - "cuda error %d(%s)", - obj, st, cudaGetErrorString(st)); - } -} - -static void ucc_mc_cuda_chunk_init_managed(ucc_mpool_t *mp, //NOLINT - void *obj, void *chunk) //NOLINT -{ - ucc_mc_buffer_header_t *h = (ucc_mc_buffer_header_t *)obj; - cudaError_t st; - - st = cudaMallocManaged(&h->addr, MC_CUDA_CONFIG->mpool_elem_size, - cudaMemAttachGlobal); - if (st != cudaSuccess) { - // h->addr will be 0 so ucc_mc_cuda_mem_alloc_pool function will - // return UCC_ERR_NO_MEMORY. As such mc_error message is suffice. 
- h->addr = NULL; - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to allocate %zd bytes, " - "cuda error %d(%s)", - MC_CUDA_CONFIG->mpool_elem_size, st, cudaGetErrorString(st)); - } - h->from_pool = 1; - h->mt = UCC_MEMORY_TYPE_CUDA_MANAGED; -} - - -static ucc_mpool_ops_t ucc_mc_ops = {.chunk_alloc = ucc_mc_cuda_chunk_alloc, - .chunk_release = ucc_mc_cuda_chunk_release, - .obj_init = ucc_mc_cuda_chunk_init, - .obj_cleanup = ucc_mc_cuda_chunk_cleanup}; - -static ucc_mpool_ops_t ucc_mc_managed_ops = {.chunk_alloc = ucc_mc_cuda_chunk_alloc, - .chunk_release = ucc_mc_cuda_chunk_release, - .obj_init = ucc_mc_cuda_chunk_init_managed, - .obj_cleanup = ucc_mc_cuda_chunk_cleanup}; - static ucc_status_t ucc_mc_cuda_mem_free(ucc_mc_buffer_header_t *h_ptr) { cudaError_t st; @@ -293,105 +214,72 @@ static ucc_status_t ucc_mc_cuda_mem_pool_free(ucc_mc_buffer_header_t *h_ptr) static ucc_status_t ucc_mc_cuda_mem_pool_alloc_with_init(ucc_mc_buffer_header_t **h_ptr, - size_t size, - ucc_memory_type_t mt) + size_t size, + ucc_memory_type_t mt) { - ucc_status_t status; - - // lock assures single mpool initiation when multiple threads concurrently execute - // different collective operations thus concurrently entering init function. 
- ucc_spin_lock(&ucc_mc_cuda.init_spinlock); - if (MC_CUDA_CONFIG->mpool_max_elems == 0) { ucc_mc_cuda.super.ops.mem_alloc = ucc_mc_cuda_mem_alloc; ucc_mc_cuda.super.ops.mem_free = ucc_mc_cuda_mem_free; - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); return ucc_mc_cuda_mem_alloc(h_ptr, size, mt); - } - - if (!ucc_mc_cuda.mpool_init_flag) { - status = ucc_mpool_init( - &ucc_mc_cuda.mpool, 0, sizeof(ucc_mc_buffer_header_t), 0, - UCC_CACHE_LINE_SIZE, 1, MC_CUDA_CONFIG->mpool_max_elems, - &ucc_mc_ops, ucc_mc_cuda.thread_mode, "mc cuda mpool buffers"); - if (status != UCC_OK) { - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); - return status; - } - - status = ucc_mpool_init( - &ucc_mc_cuda.mpool_managed, 0, sizeof(ucc_mc_buffer_header_t), 0, - UCC_CACHE_LINE_SIZE, 1, MC_CUDA_CONFIG->mpool_max_elems, - &ucc_mc_managed_ops, ucc_mc_cuda.thread_mode, "mc cuda mpool buffers"); - if (status != UCC_OK) { - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); - return status; - } - - + } else { ucc_mc_cuda.super.ops.mem_alloc = ucc_mc_cuda_mem_pool_alloc; - ucc_mc_cuda.mpool_init_flag = 1; + ucc_mc_cuda.super.ops.mem_free = ucc_mc_cuda_mem_pool_free; + return ucc_mc_cuda_mem_pool_alloc(h_ptr, size, mt); } - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); - return ucc_mc_cuda_mem_pool_alloc(h_ptr, size, mt); } static ucc_status_t ucc_mc_cuda_memcpy(void *dst, const void *src, size_t len, ucc_memory_type_t dst_mem, ucc_memory_type_t src_mem) { - cudaError_t st; + ucc_status_t status; + ucc_mc_cuda_resources_t *resources; + ucc_assert(dst_mem == UCC_MEMORY_TYPE_CUDA || src_mem == UCC_MEMORY_TYPE_CUDA || dst_mem == UCC_MEMORY_TYPE_CUDA_MANAGED || src_mem == UCC_MEMORY_TYPE_CUDA_MANAGED); - UCC_MC_CUDA_INIT_STREAM(); - st = cudaMemcpyAsync(dst, src, len, cudaMemcpyDefault, ucc_mc_cuda.stream); - if (ucc_unlikely(st != cudaSuccess)) { - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to launch cudaMemcpyAsync, dst %p, src %p, len %zd " - "cuda error %d(%s)", - dst, src, len, st, 
cudaGetErrorString(st)); - return UCC_ERR_NO_MESSAGE; + status = ucc_mc_cuda_get_resources(&resources); + if (ucc_unlikely(status) != UCC_OK) { + return status; } - st = cudaStreamSynchronize(ucc_mc_cuda.stream); - if (ucc_unlikely(st != cudaSuccess)) { - cudaGetLastError(); + + status = CUDA_FUNC(cudaMemcpyAsync(dst, src, len, cudaMemcpyDefault, + resources->stream)); + if (ucc_unlikely(status != UCC_OK)) { mc_error(&ucc_mc_cuda.super, - "failed to synchronize mc_cuda.stream " - "cuda error %d(%s)", - st, cudaGetErrorString(st)); - return UCC_ERR_NO_MESSAGE; + "failed to launch cudaMemcpyAsync, dst %p, src %p, len %zd", + dst, src, len); + return status; } - return UCC_OK; + + status = CUDA_FUNC(cudaStreamSynchronize(resources->stream)); + + return status; } ucc_status_t ucc_mc_cuda_memset(void *ptr, int val, size_t len) { - cudaError_t st; + ucc_status_t status; + ucc_mc_cuda_resources_t *resources; - UCC_MC_CUDA_INIT_STREAM(); - st = cudaMemsetAsync(ptr, val, len, ucc_mc_cuda.stream); - if (ucc_unlikely(st != cudaSuccess)) { - cudaGetLastError(); - mc_error(&ucc_mc_cuda.super, - "failed to launch cudaMemsetAsync, dst %p, len %zd " - "cuda error %d(%s)", - ptr, len, st, cudaGetErrorString(st)); - return UCC_ERR_NO_MESSAGE; + status = ucc_mc_cuda_get_resources(&resources); + if (ucc_unlikely(status) != UCC_OK) { + return status; } - st = cudaStreamSynchronize(ucc_mc_cuda.stream); - if (ucc_unlikely(st != cudaSuccess)) { - cudaGetLastError(); + + status = CUDA_FUNC(cudaMemsetAsync(ptr, val, len, resources->stream)); + if (ucc_unlikely(status != UCC_OK)) { mc_error(&ucc_mc_cuda.super, - "failed to synchronize mc_cuda.stream " - "cuda error %d(%s)", - st, cudaGetErrorString(st)); - return UCC_ERR_NO_MESSAGE; + "failed to launch cudaMemsetAsync, dst %p, len %zd", + ptr, len); + return status; } - return UCC_OK; + + status = CUDA_FUNC(cudaStreamSynchronize(resources->stream)); + + return status; } static ucc_status_t ucc_mc_cuda_mem_query(const void *ptr, @@ -463,18 
+351,69 @@ static ucc_status_t ucc_mc_cuda_mem_query(const void *ptr, return UCC_OK; } -static ucc_status_t ucc_mc_cuda_finalize() +ucc_status_t ucc_mc_cuda_get_resources(ucc_mc_cuda_resources_t **resources) { - if (ucc_mc_cuda.stream != NULL) { - CUDA_CHECK(cudaStreamDestroy(ucc_mc_cuda.stream)); - ucc_mc_cuda.stream = NULL; + CUcontext cu_ctx; + unsigned long long int cu_ctx_id; + ucc_status_t status; + + status = CUDADRV_FUNC(cuCtxGetCurrent(&cu_ctx)); + if (ucc_unlikely(status != UCC_OK)) { + mc_error(&ucc_mc_cuda.super, "failed to get current CUDA context"); + return status; + } + +#if CUDA_VERSION < 12000 + cu_ctx_id = 1; +#else + status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id)); + if (ucc_unlikely(status != UCC_OK)) { + mc_error(&ucc_mc_cuda.super, "failed to get currect CUDA context ID"); + } +#endif + + *resources = mc_cuda_resources_hash_get(ucc_mc_cuda.resources_hash, + cu_ctx_id); + if (ucc_unlikely(*resources == NULL)) { + ucc_spin_lock(&ucc_mc_cuda.init_spinlock); + *resources = mc_cuda_resources_hash_get(ucc_mc_cuda.resources_hash, + cu_ctx_id); + if (*resources == NULL) { + *resources = ucc_malloc(sizeof(ucc_mc_cuda_resources_t), + "mc cuda resources"); + if (*resources == NULL) { + mc_error(&ucc_mc_cuda.super, + "failed to allocate %zd bytes for resources", + sizeof(ucc_mc_cuda_resources_t)); + ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); + return UCC_ERR_NO_MEMORY; + } + status = ucc_mc_cuda_resources_init(&ucc_mc_cuda.super, + *resources); + if (status != UCC_OK) { + ucc_free(*resources); + ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); + return status; + } + mc_cuda_resources_hash_put(ucc_mc_cuda.resources_hash, cu_ctx_id, + *resources); + } + ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); } - if (ucc_mc_cuda.mpool_init_flag) { - ucc_mpool_cleanup(&ucc_mc_cuda.mpool, 1); - ucc_mpool_cleanup(&ucc_mc_cuda.mpool_managed, 1); - ucc_mc_cuda.mpool_init_flag = 0; - ucc_mc_cuda.super.ops.mem_alloc = ucc_mc_cuda_mem_pool_alloc_with_init; + return 
UCC_OK; +} + +static ucc_status_t ucc_mc_cuda_finalize() +{ + ucc_mc_cuda_resources_t *resources; + + resources = mc_cuda_resources_hash_pop(ucc_mc_cuda.resources_hash); + while (resources) { + ucc_mc_cuda_resources_cleanup(resources); + resources = mc_cuda_resources_hash_pop(ucc_mc_cuda.resources_hash); } + + ucc_mc_cuda.super.ops.mem_alloc = ucc_mc_cuda_mem_pool_alloc_with_init; ucc_spinlock_destroy(&ucc_mc_cuda.init_spinlock); return UCC_OK; } @@ -500,8 +439,9 @@ ucc_mc_cuda_t ucc_mc_cuda = { .table = ucc_mc_cuda_config_table, .size = sizeof(ucc_mc_cuda_config_t), }, - .mpool_init_flag = 0, }; +ucc_mc_cuda_config_t *ucc_mc_cuda_config; + UCC_CONFIG_REGISTER_TABLE_ENTRY(&ucc_mc_cuda.super.config_table, &ucc_config_global_list); diff --git a/src/components/mc/cuda/mc_cuda.h b/src/components/mc/cuda/mc_cuda.h index 63c730fa73..10779c27cb 100644 --- a/src/components/mc/cuda/mc_cuda.h +++ b/src/components/mc/cuda/mc_cuda.h @@ -7,30 +7,18 @@ #ifndef UCC_MC_CUDA_H_ #define UCC_MC_CUDA_H_ -#include +#include #include "components/mc/base/ucc_mc_base.h" #include "components/mc/ucc_mc_log.h" #include "utils/ucc_mpool.h" #include "utils/arch/cuda_def.h" -#include - -typedef struct ucc_mc_cuda_config { - ucc_mc_config_t super; - size_t mpool_elem_size; - int mpool_max_elems; -} ucc_mc_cuda_config_t; +#include "mc_cuda_resources.h" typedef struct ucc_mc_cuda { ucc_mc_base_t super; - int stream_initialized; - cudaStream_t stream; - ucc_mpool_t events; - ucc_mpool_t strm_reqs; - ucc_mpool_t mpool; - ucc_mpool_t mpool_managed; - int mpool_init_flag; ucc_spinlock_t init_spinlock; ucc_thread_mode_t thread_mode; + ucc_mc_cuda_resources_hash_t *resources_hash; } ucc_mc_cuda_t; extern ucc_mc_cuda_t ucc_mc_cuda; @@ -38,21 +26,7 @@ extern ucc_mc_cuda_t ucc_mc_cuda; #define MC_CUDA_CONFIG \ (ucc_derived_of(ucc_mc_cuda.super.config, ucc_mc_cuda_config_t)) -#define UCC_MC_CUDA_INIT_STREAM() do { \ - if (!ucc_mc_cuda.stream_initialized) { \ - cudaError_t cuda_st = cudaSuccess; \ - 
ucc_spin_lock(&ucc_mc_cuda.init_spinlock); \ - if (!ucc_mc_cuda.stream_initialized) { \ - cuda_st = cudaStreamCreateWithFlags(&ucc_mc_cuda.stream, \ - cudaStreamNonBlocking); \ - ucc_mc_cuda.stream_initialized = 1; \ - } \ - ucc_spin_unlock(&ucc_mc_cuda.init_spinlock); \ - if (ucc_unlikely(cudaSuccess != cuda_st)) { \ - return cuda_error_to_ucc_status(cuda_st); \ - } \ - } \ -} while(0) +ucc_status_t ucc_mc_cuda_get_resources(ucc_mc_cuda_resources_t **resources); ucc_status_t ucc_mc_cuda_memset(void *ptr, int val, size_t len); diff --git a/src/components/mc/cuda/mc_cuda_resources.c b/src/components/mc/cuda/mc_cuda_resources.c new file mode 100644 index 0000000000..398b83784e --- /dev/null +++ b/src/components/mc/cuda/mc_cuda_resources.c @@ -0,0 +1,92 @@ +#include "mc_cuda_resources.h" +#include "components/mc/ucc_mc_log.h" +#include "utils/ucc_malloc.h" + +static ucc_status_t ucc_mc_cuda_chunk_alloc(ucc_mpool_t *mp, //NOLINT + size_t *size_p, + void **chunk_p) +{ + *chunk_p = ucc_malloc(*size_p, "mc cuda"); + if (!*chunk_p) { + return UCC_ERR_NO_MEMORY; + } + + return UCC_OK; +} + +static void ucc_mc_cuda_chunk_init(ucc_mpool_t *mp, //NOLINT + void *obj, void *chunk) //NOLINT +{ + ucc_mc_buffer_header_t *h = (ucc_mc_buffer_header_t *)obj; + cudaError_t st; + + st = cudaMalloc(&h->addr, ucc_mc_cuda_config->mpool_elem_size); + if (st != cudaSuccess) { + // h->addr will be 0 so ucc_mc_cuda_mem_alloc_pool function will + // return UCC_ERR_NO_MEMORY. As such mc_error message is suffice. 
+ cudaGetLastError(); + } + h->from_pool = 1; + h->mt = UCC_MEMORY_TYPE_CUDA; +} + +static void ucc_mc_cuda_chunk_release(ucc_mpool_t *mp, void *chunk) //NOLINT: mp is unused +{ + ucc_free(chunk); +} + +static void ucc_mc_cuda_chunk_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused +{ + ucc_mc_buffer_header_t *h = (ucc_mc_buffer_header_t *)obj; + cudaError_t st; + + st = cudaFree(h->addr); + if (st != cudaSuccess) { + cudaGetLastError(); + } +} + +static ucc_mpool_ops_t ucc_mc_ops = {.chunk_alloc = ucc_mc_cuda_chunk_alloc, + .chunk_release = ucc_mc_cuda_chunk_release, + .obj_init = ucc_mc_cuda_chunk_init, + .obj_cleanup = ucc_mc_cuda_chunk_cleanup}; + +ucc_status_t ucc_mc_cuda_resources_init(ucc_mc_base_t *mc, + ucc_mc_cuda_resources_t *resources) +{ + ucc_status_t status; + + CUDADRV_CHECK(cuCtxGetCurrent(&resources->cu_ctx)); + status = ucc_mpool_init(&resources->scratch_mpool, 0, + sizeof(ucc_mc_buffer_header_t), 0, + UCC_CACHE_LINE_SIZE, 1, + ucc_mc_cuda_config->mpool_max_elems, &ucc_mc_ops, + UCC_THREAD_MULTIPLE, "mc cuda mpool buffers"); + if (status != UCC_OK) { + mc_error(mc, "failed to create scratch buffers mpool"); + return status; + } + + status = CUDA_FUNC(cudaStreamCreateWithFlags(&resources->stream, + cudaStreamNonBlocking)); + if (status != UCC_OK) { + mc_error(mc, "failed to create CUDA stream"); + goto free_scratch_mpool; + } + + return UCC_OK; + +free_scratch_mpool: + ucc_mpool_cleanup(&resources->scratch_mpool, 0); + return status; +} + +void ucc_mc_cuda_resources_cleanup(ucc_mc_cuda_resources_t *resources) +{ + CUcontext tmp_context; + + cuCtxPushCurrent(resources->cu_ctx); + ucc_mpool_cleanup(&resources->scratch_mpool, 1); + CUDA_FUNC(cudaStreamDestroy(resources->stream)); + cuCtxPopCurrent(&tmp_context); +} diff --git a/src/components/mc/cuda/mc_cuda_resources.h b/src/components/mc/cuda/mc_cuda_resources.h new file mode 100644 index 0000000000..557effe3c0 --- /dev/null +++ b/src/components/mc/cuda/mc_cuda_resources.h @@ -0,0 +1,84 @@ 
+/** + * Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * + * See file LICENSE for terms. + */ + +#ifndef UCC_MC_CUDA_RESOURCES_H_ +#define UCC_MC_CUDA_RESOURCES_H_ + +#include "components/mc/base/ucc_mc_base.h" +#include "utils/arch/cuda_def.h" +#include "utils/ucc_mpool.h" + +typedef struct ucc_mc_cuda_config { + ucc_mc_config_t super; + size_t mpool_elem_size; + int mpool_max_elems; +} ucc_mc_cuda_config_t; + +typedef struct ucc_mc_cuda_resources { + CUcontext cu_ctx; + cudaStream_t stream; + ucc_mpool_t scratch_mpool; +} ucc_mc_cuda_resources_t; + +extern ucc_mc_cuda_config_t *ucc_mc_cuda_config; + +ucc_status_t ucc_mc_cuda_resources_init(ucc_mc_base_t *mc, + ucc_mc_cuda_resources_t *resources); + +void ucc_mc_cuda_resources_cleanup(ucc_mc_cuda_resources_t *resources); + +KHASH_INIT(ucc_mc_cuda_resources_hash, unsigned long long, void*, 1, \ + kh_int64_hash_func, kh_int64_hash_equal); +#define ucc_mc_cuda_resources_hash_t khash_t(ucc_mc_cuda_resources_hash) + +static inline +void* mc_cuda_resources_hash_get(ucc_mc_cuda_resources_hash_t *h, + unsigned long long key) +{ + khiter_t k; + void *value; + + k = kh_get(ucc_mc_cuda_resources_hash, h , key); + if (k == kh_end(h)) { + return NULL; + } + value = kh_value(h, k); + return value; +} + +static inline +void mc_cuda_resources_hash_put(ucc_mc_cuda_resources_hash_t *h, + unsigned long long key, + void *value) +{ + int ret; + khiter_t k; + k = kh_put(ucc_mc_cuda_resources_hash, h, key, &ret); + kh_value(h, k) = value; +} + +static inline +void* mc_cuda_resources_hash_pop(ucc_mc_cuda_resources_hash_t *h) +{ + void *resources = NULL; + khiter_t k; + + k = kh_begin(h); + while (k != kh_end(h)) { + if (kh_exist(h, k)) { + resources = kh_value(h, k); + break; + } + k++; + } + + if (resources) { + kh_del(ucc_mc_cuda_resources_hash, h, k); + } + return resources; +} + +#endif diff --git a/src/components/tl/ucp/reduce/reduce_dbt.c b/src/components/tl/ucp/reduce/reduce_dbt.c index 
cfa5d2ff22..08e8774974 100644 --- a/src/components/tl/ucp/reduce/reduce_dbt.c +++ b/src/components/tl/ucp/reduce/reduce_dbt.c @@ -226,7 +226,7 @@ void ucc_tl_ucp_reduce_dbt_progress(ucc_coll_task_t *coll_task) for (i = 0; i < 2; i++) { if (is_root && rank == trees[i].root) { UCPCHECK_GOTO(ucc_mc_memcpy(PTR_OFFSET(args->dst.info.buffer, - i * counts[i - 1] * ucc_dt_size(dt)), + i * counts[(i + 1) % 2] * ucc_dt_size(dt)), rbuf[i], counts[i] * ucc_dt_size(dt), mtype, mtype), task, out); } diff --git a/src/core/ucc_constructor.c b/src/core/ucc_constructor.c index 2cabdc2f32..c113d2ea56 100644 --- a/src/core/ucc_constructor.c +++ b/src/core/ucc_constructor.c @@ -15,6 +15,7 @@ #include "utils/profile/ucc_profile.h" #include "ucc/api/ucc_version.h" #include +#include static ucc_status_t ucc_check_config_file(void) { @@ -93,100 +94,106 @@ static ucc_status_t init_lib_paths(void) UCC_CONFIG_REGISTER_TABLE(ucc_global_config_table, "UCC global", NULL, ucc_global_config, &ucc_config_global_list) +static pthread_mutex_t ucc_constructor_mutex = PTHREAD_MUTEX_INITIALIZER; + ucc_status_t ucc_constructor(void) { - ucc_global_config_t *cfg = &ucc_global_config; - ucc_status_t status; + ucc_global_config_t *cfg = &ucc_global_config; + ucc_status_t status = UCC_OK; Dl_info dl_info; int ret; - if (!cfg->initialized) { - cfg->initialized = 1; - status = ucc_config_parser_fill_opts( - &ucc_global_config, UCC_CONFIG_GET_TABLE(ucc_global_config_table), - "UCC_", 1); - if (UCC_OK != status) { - ucc_error("failed to parse global options"); - return status; - } + pthread_mutex_lock(&ucc_constructor_mutex); + if (cfg->initialized) { + goto exit_unlock_mutex; + } - if (UCC_OK != (status = init_lib_paths())) { - ucc_error("failed to init ucc components path"); - return status; - } + cfg->initialized = 1; + status = ucc_config_parser_fill_opts( + &ucc_global_config, UCC_CONFIG_GET_TABLE(ucc_global_config_table), + "UCC_", 1); + if (UCC_OK != status) { + ucc_error("failed to parse global 
options"); + goto exit_unlock_mutex; + } - status = ucc_check_config_file(); - if (UCC_OK != status && UCC_ERR_NOT_FOUND != status) { - /* bail only in case of real error */ - return status; - } + if (UCC_OK != (status = init_lib_paths())) { + ucc_error("failed to init ucc components path"); + goto exit_unlock_mutex; + } - status = ucc_components_load("cl", &cfg->cl_framework); - if (UCC_OK != status) { - ucc_error("no CL components were found in the " - "ucc modules dir: %s", - cfg->component_path); - return status; - } - status = ucc_component_check_scores_uniq(&cfg->cl_framework); - if (UCC_OK != status) { - ucc_error("CLs must have distinct uniq default scores"); - return status; - } - status = ucc_components_load("tl", &cfg->tl_framework); - if (UCC_OK != status) { - /* not critical - some CLs may operate w/o use of TL */ - ucc_debug("no TL components were found in the " - "ucc modules dir: %s", - cfg->component_path); - } - status = ucc_component_check_scores_uniq(&cfg->tl_framework); - if (UCC_OK != status) { - ucc_error("TLs must have distinct uniq default scores"); - return status; - } - status = ucc_components_load("mc", &cfg->mc_framework); - if (UCC_OK != status) { - ucc_error("no memory components were found in the " - "ucc modules dir: %s", + status = ucc_check_config_file(); + if (UCC_OK != status && UCC_ERR_NOT_FOUND != status) { + /* bail only in case of real error */ + goto exit_unlock_mutex; + } + + status = ucc_components_load("cl", &cfg->cl_framework); + if (UCC_OK != status) { + ucc_error("no CL components were found in the " + "ucc modules dir: %s", cfg->component_path); + goto exit_unlock_mutex; + } + status = ucc_component_check_scores_uniq(&cfg->cl_framework); + if (UCC_OK != status) { + ucc_error("CLs must have distinct uniq default scores"); + goto exit_unlock_mutex; + } + status = ucc_components_load("tl", &cfg->tl_framework); + if (UCC_OK != status) { + /* not critical - some CLs may operate w/o use of TL */ + ucc_debug("no TL 
components were found in the " + "ucc modules dir: %s", cfg->component_path); + } + status = ucc_component_check_scores_uniq(&cfg->tl_framework); + if (UCC_OK != status) { + ucc_error("TLs must have distinct uniq default scores"); + goto exit_unlock_mutex; + } + status = ucc_components_load("mc", &cfg->mc_framework); + if (UCC_OK != status) { + ucc_error("no memory components were found in the " + "ucc modules dir: %s", cfg->component_path); + goto exit_unlock_mutex; + } + status = ucc_components_load("ec", &cfg->ec_framework); + if (status != UCC_OK) { + if (status == UCC_ERR_NOT_FOUND) { + ucc_info("no execution components were found in the " + "ucc modules dir: %s. " + "Triggered operations might not work", cfg->component_path); - return status; - } - status = ucc_components_load("ec", &cfg->ec_framework); - if (status != UCC_OK) { - if (status == UCC_ERR_NOT_FOUND) { - ucc_info("no execution components were found in the " - "ucc modules dir: %s. " - "Triggered operations might not work", - cfg->component_path); - } else { - ucc_error("failed to load execution components %d (%s)", - status, ucc_status_string(status)); - return status; - } + } else { + ucc_error("failed to load execution components %d (%s)", + status, ucc_status_string(status)); + goto exit_unlock_mutex; } + } - if (UCC_OK != ucc_local_proc_info_init()) { - ucc_error("failed to initialize local proc info"); - return status; - } + if (UCC_OK != ucc_local_proc_info_init()) { + ucc_error("failed to initialize local proc info"); + goto exit_unlock_mutex; + } #ifdef HAVE_PROFILING - ucc_profile_init(cfg->profile_mode, cfg->profile_file, - cfg->profile_log_size); + ucc_profile_init(cfg->profile_mode, cfg->profile_file, + cfg->profile_log_size); #endif - if (ucc_global_config.log_component.log_level >= UCC_LOG_LEVEL_INFO) { - ret = dladdr(ucc_init_version, &dl_info); - if (ret == 0) { - ucc_error("failed to get ucc_init_version handler"); - return UCC_ERR_NO_MESSAGE; - } - ucc_info("version: %s, loaded 
from: %s, cfg file: %s", - ucc_get_version_string(), dl_info.dli_fname, - ucc_global_config.file_cfg ? - ucc_global_config.file_cfg->filename: "n/a"); + if (ucc_global_config.log_component.log_level >= UCC_LOG_LEVEL_INFO) { + ret = dladdr(ucc_init_version, &dl_info); + if (ret == 0) { + ucc_error("failed to get ucc_init_version handler"); + status = UCC_ERR_NO_RESOURCE; + goto exit_unlock_mutex; } + ucc_info("version: %s, loaded from: %s, cfg file: %s", + ucc_get_version_string(), dl_info.dli_fname, + ucc_global_config.file_cfg ? + ucc_global_config.file_cfg->filename: "n/a"); } - return UCC_OK; + +exit_unlock_mutex: + pthread_mutex_unlock(&ucc_constructor_mutex); + return status; } __attribute__((destructor)) static void ucc_destructor(void) diff --git a/src/core/ucc_global_opts.h b/src/core/ucc_global_opts.h index 54079ad6fc..203ca65e9d 100644 --- a/src/core/ucc_global_opts.h +++ b/src/core/ucc_global_opts.h @@ -35,8 +35,8 @@ typedef struct ucc_global_config { /* Limit for profiling log size */ size_t profile_log_size; - char * cfg_filename; - ucc_file_config_t * file_cfg; + char *cfg_filename; + ucc_file_config_t *file_cfg; } ucc_global_config_t; extern ucc_global_config_t ucc_global_config; diff --git a/src/utils/arch/cuda_def.h b/src/utils/arch/cuda_def.h index 7f690531e2..d758846c9d 100644 --- a/src/utils/arch/cuda_def.h +++ b/src/utils/arch/cuda_def.h @@ -74,6 +74,15 @@ static inline ucc_status_t cuda_error_to_ucc_status(cudaError_t cuda_status) } \ } while(0) +#define CUDADRV_CHECK(_cmd) \ + /* coverity[dead_error_line] */ \ + do { \ + ucc_status_t _cuda_status = CUDADRV_FUNC(_cmd); \ + if (ucc_unlikely(_cuda_status != UCC_OK)) { \ + return _cuda_status; \ + } \ + } while(0) + #define CUDA_CHECK_GOTO(_cmd, _label, _cuda_status) \ do { \ _cuda_status = CUDA_FUNC(_cmd); \ diff --git a/src/utils/ucc_parser.h b/src/utils/ucc_parser.h index f8bd2e7ede..517dd88be8 100644 --- a/src/utils/ucc_parser.h +++ b/src/utils/ucc_parser.h @@ -268,8 +268,29 @@ int 
ucc_config_sprintf_uint_ranged(char *buf, size_t max, const void *src, ucs_status_t ucc_config_clone_uint_ranged(const void *src, void *dest, const void *arg); -void ucc_config_release_uint_ranged(void *ptr, const void *arg); +void ucc_config_release_uint_ranged(void *ptr, const void *arg); +#ifdef UCS_HAVE_PARSER_CONFIG_DOC +#define UCC_CONFIG_TYPE_UINT_RANGED \ + { \ + ucc_config_sscanf_uint_ranged, ucc_config_sprintf_uint_ranged, \ + ucc_config_clone_uint_ranged, ucc_config_release_uint_ranged, \ + ucs_config_help_generic, ucs_config_doc_nop, \ + "[-:[mtype]:value," \ + "-:[mtype]:value,...,]default_value\n" \ + "# value and default_value can be \"auto\"" \ + } + +#define UCC_CONFIG_TYPE_PIPELINE_PARAMS \ + { \ + ucc_config_sscanf_pipeline_params, ucc_config_sprintf_pipeline_params, \ + ucc_config_clone_pipeline_params, \ + ucc_config_release_pipeline_params, ucs_config_help_generic, \ + ucs_config_doc_nop, \ + "thresh=:fragsize=:nfrags=" \ + ":pdepth=:" \ + } +#else #define UCC_CONFIG_TYPE_UINT_RANGED \ { \ ucc_config_sscanf_uint_ranged, ucc_config_sprintf_uint_ranged, \ @@ -285,7 +306,8 @@ void ucc_config_release_uint_ranged(void *ptr, const void *arg); ucc_config_clone_pipeline_params, \ ucc_config_release_pipeline_params, ucs_config_help_generic, \ "thresh=:fragsize=:nfrags=" \ - ":pdepth=:" \ + ":pdepth=:" \ } +#endif #endif