Skip to content

Commit

Permalink
EC/CUDA: enable support for multiple contexts (openucx#848)
Browse files Browse the repository at this point in the history
* EC/CUDA: add support for multiple context

* MC/CUDA: add support for multiple contexts

* MC/CUDA: support multiple contexts

* BUILD: fix ucc parser
  • Loading branch information
Sergei-Lebedev authored Jan 17, 2024
1 parent 2058f67 commit 75ecf74
Show file tree
Hide file tree
Showing 20 changed files with 917 additions and 578 deletions.
4 changes: 4 additions & 0 deletions config/m4/ucx.m4
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ AS_IF([test "x$ucx_checked" != "xyes"],[
[AC_DEFINE([UCS_HAVE_PARSER_SET_VALUE_TABLE_PREFIX], [1], [flags for ucs_rcache_get])],
[])
AC_CHECK_MEMBER(ucs_config_parser_t.doc,
[AC_DEFINE([UCS_HAVE_PARSER_CONFIG_DOC], [1], [flags for ucs_rcache_get])],
[],
[#include <ucs/memory/rcache.h>])
],
[
AS_IF([test "x$with_ucx" != "xguess"],
Expand Down
6 changes: 4 additions & 2 deletions src/components/ec/cuda/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#

if HAVE_CUDA
Expand All @@ -12,7 +12,9 @@ sources = \
ec_cuda_executor.c \
ec_cuda_executor_interruptible.c \
ec_cuda_executor_persistent.c \
ec_cuda_executor_persistent_wait.c
ec_cuda_executor_persistent_wait.c \
ec_cuda_resources.c \
ec_cuda_resources.h

module_LTLIBRARIES = libucc_ec_cuda.la
libucc_ec_cuda_la_SOURCES = $(sources)
Expand Down
260 changes: 76 additions & 184 deletions src/components/ec/cuda/ec_cuda.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,116 +75,6 @@ static ucc_config_field_t ucc_ec_cuda_config_table[] = {

};

static ucc_status_t ucc_ec_cuda_ee_executor_mpool_chunk_malloc(ucc_mpool_t *mp, //NOLINT: mp is unused
size_t *size_p,
void ** chunk_p)
{
return CUDA_FUNC(cudaHostAlloc((void**)chunk_p, *size_p,
cudaHostAllocMapped));
}

static void ucc_ec_cuda_ee_executor_mpool_chunk_free(ucc_mpool_t *mp, //NOLINT: mp is unused
void *chunk)
{
CUDA_FUNC(cudaFreeHost(chunk));
}

static void ucc_ec_cuda_executor_chunk_init(ucc_mpool_t *mp, void *obj, //NOLINT: mp is unused
void *chunk) //NOLINT: chunk is unused
{
ucc_ec_cuda_executor_t *eee = (ucc_ec_cuda_executor_t*) obj;
int max_tasks = EC_CUDA_CONFIG->exec_max_tasks;

CUDA_FUNC(cudaHostGetDevicePointer(
(void**)(&eee->dev_state), (void *)&eee->state, 0));
CUDA_FUNC(cudaHostGetDevicePointer(
(void**)(&eee->dev_pidx), (void *)&eee->pidx, 0));
CUDA_FUNC(cudaMalloc((void**)&eee->dev_cidx, sizeof(*eee->dev_cidx)));
CUDA_FUNC(cudaHostAlloc((void**)&eee->tasks,
max_tasks * MAX_SUBTASKS *
sizeof(ucc_ee_executor_task_args_t),
cudaHostAllocMapped));
CUDA_FUNC(cudaHostGetDevicePointer(
(void**)(&eee->dev_tasks), (void *)eee->tasks, 0));
if (ucc_ec_cuda.thread_mode == UCC_THREAD_MULTIPLE) {
ucc_spinlock_init(&eee->tasks_lock, 0);
}
}

static void ucc_ec_cuda_executor_chunk_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused
{
ucc_ec_cuda_executor_t *eee = (ucc_ec_cuda_executor_t*) obj;

CUDA_FUNC(cudaFree((void*)eee->dev_cidx));
CUDA_FUNC(cudaFreeHost((void*)eee->tasks));
if (ucc_ec_cuda.thread_mode == UCC_THREAD_MULTIPLE) {
ucc_spinlock_destroy(&eee->tasks_lock);
}
}


static ucc_mpool_ops_t ucc_ec_cuda_ee_executor_mpool_ops = {
.chunk_alloc = ucc_ec_cuda_ee_executor_mpool_chunk_malloc,
.chunk_release = ucc_ec_cuda_ee_executor_mpool_chunk_free,
.obj_init = ucc_ec_cuda_executor_chunk_init,
.obj_cleanup = ucc_ec_cuda_executor_chunk_cleanup,
};

static void ucc_ec_cuda_event_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused
{
ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj;

CUDA_FUNC(cudaEventCreateWithFlags(&base->event, cudaEventDisableTiming));
}

static void ucc_ec_cuda_event_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused
{
ucc_ec_cuda_event_t *base = (ucc_ec_cuda_event_t *) obj;

CUDA_FUNC(cudaEventDestroy(base->event));
}

static ucc_mpool_ops_t ucc_ec_cuda_event_mpool_ops = {
.chunk_alloc = ucc_mpool_hugetlb_malloc,
.chunk_release = ucc_mpool_hugetlb_free,
.obj_init = ucc_ec_cuda_event_init,
.obj_cleanup = ucc_ec_cuda_event_cleanup,
};

static void ucc_ec_cuda_graph_init(ucc_mpool_t *mp, void *obj, void *chunk) //NOLINT: mp is unused
{
ucc_ec_cuda_executor_interruptible_task_t *task =
(ucc_ec_cuda_executor_interruptible_task_t *) obj;
cudaGraphNode_t memcpy_node;
int i;

CUDA_FUNC(cudaGraphCreate(&task->graph, 0));
for (i = 0; i < UCC_EE_EXECUTOR_MULTI_OP_NUM_BUFS; i++) {
CUDA_FUNC(
cudaGraphAddMemcpyNode1D(&memcpy_node, task->graph, NULL, 0,
(void*)1, (void*)1, 1, cudaMemcpyDefault));
}

CUDA_FUNC(
cudaGraphInstantiateWithFlags(&task->graph_exec, task->graph, 0));
}

static void ucc_ec_cuda_graph_cleanup(ucc_mpool_t *mp, void *obj) //NOLINT: mp is unused
{
ucc_ec_cuda_executor_interruptible_task_t *task =
(ucc_ec_cuda_executor_interruptible_task_t *) obj;

CUDA_FUNC(cudaGraphExecDestroy(task->graph_exec));
CUDA_FUNC(cudaGraphDestroy(task->graph));
}

static ucc_mpool_ops_t ucc_ec_cuda_interruptible_task_mpool_ops = {
.chunk_alloc = ucc_mpool_hugetlb_malloc,
.chunk_release = ucc_mpool_hugetlb_free,
.obj_init = ucc_ec_cuda_graph_init,
.obj_cleanup = ucc_ec_cuda_graph_cleanup,
};

static inline void ucc_ec_cuda_set_threads_nbr(int *nt, int maxThreadsPerBlock)
{
if (*nt != UCC_ULUNITS_AUTO) {
Expand All @@ -208,15 +98,14 @@ static inline void ucc_ec_cuda_set_threads_nbr(int *nt, int maxThreadsPerBlock)

static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
{
ucc_ec_cuda_config_t *cfg = EC_CUDA_CONFIG;
ucc_status_t status;
ucc_ec_cuda_config_t *cfg = EC_CUDA_CONFIG;
int supports_coop_launch = 0;
int device, num_devices;
cudaError_t cuda_st;
struct cudaDeviceProp prop;
int supportsCoopLaunch = 0;

ucc_ec_cuda.stream = NULL;
ucc_ec_cuda.stream_initialized = 0;
ucc_ec_cuda_config = ucc_derived_of(ucc_ec_cuda.super.config,
ucc_ec_cuda_config_t);
ucc_ec_cuda.exec_streams_initialized = 0;
ucc_strncpy_safe(ucc_ec_cuda.super.config->log_component.name,
ucc_ec_cuda.super.super.name,
Expand All @@ -228,9 +117,7 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
return UCC_ERR_NO_RESOURCE;
}
CUDA_CHECK(cudaGetDevice(&device));

CUDA_CHECK(cudaGetDeviceProperties(&prop, device));

ucc_ec_cuda_set_threads_nbr((int *)&cfg->exec_num_threads,
prop.maxThreadsPerBlock);
ucc_ec_cuda_set_threads_nbr(&cfg->reduce_num_threads,
Expand All @@ -253,52 +140,6 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
cfg->exec_num_streams = 1;
}

/*create event pool */
ucc_ec_cuda.exec_streams = ucc_calloc(cfg->exec_num_streams,
sizeof(cudaStream_t),
"ec cuda streams");
if (!ucc_ec_cuda.exec_streams) {
ec_error(&ucc_ec_cuda.super, "failed to allocate streams array");
return UCC_ERR_NO_MEMORY;
}
status = ucc_mpool_init(&ucc_ec_cuda.events, 0, sizeof(ucc_ec_cuda_event_t),
0, UCC_CACHE_LINE_SIZE, 16, UINT_MAX,
&ucc_ec_cuda_event_mpool_ops, UCC_THREAD_MULTIPLE,
"CUDA Event Objects");
if (status != UCC_OK) {
ec_error(&ucc_ec_cuda.super, "failed to create event pool");
return status;
}

status = ucc_mpool_init(
&ucc_ec_cuda.executors, 0, sizeof(ucc_ec_cuda_executor_t), 0,
UCC_CACHE_LINE_SIZE, 16, UINT_MAX, &ucc_ec_cuda_ee_executor_mpool_ops,
UCC_THREAD_MULTIPLE, "EE executor Objects");
if (status != UCC_OK) {
ec_error(&ucc_ec_cuda.super, "failed to create executors pool");
return status;
}

status = ucc_mpool_init(
&ucc_ec_cuda.executor_interruptible_tasks, 0,
sizeof(ucc_ec_cuda_executor_interruptible_task_t), 0, UCC_CACHE_LINE_SIZE,
16, UINT_MAX, &ucc_ec_cuda_interruptible_task_mpool_ops,
UCC_THREAD_MULTIPLE, "interruptible executor tasks");
if (status != UCC_OK) {
ec_error(&ucc_ec_cuda.super, "failed to create interruptible tasks pool");
return status;
}

status = ucc_mpool_init(
&ucc_ec_cuda.executor_persistent_tasks, 0,
sizeof(ucc_ec_cuda_executor_persistent_task_t), 0, UCC_CACHE_LINE_SIZE,
16, UINT_MAX, NULL, UCC_THREAD_MULTIPLE,
"persistent executor tasks");
if (status != UCC_OK) {
ec_error(&ucc_ec_cuda.super, "failed to create persistent tasks pool");
return status;
}

if (cfg->strm_task_mode == UCC_EC_CUDA_TASK_KERNEL) {
ucc_ec_cuda.strm_task_mode = UCC_EC_CUDA_TASK_KERNEL;
} else {
Expand Down Expand Up @@ -335,16 +176,17 @@ static ucc_status_t ucc_ec_cuda_init(const ucc_ec_params_t *ec_params)
}

if (cfg->use_cooperative_launch == 1) {
cudaDeviceGetAttribute(&supportsCoopLaunch,
cudaDeviceGetAttribute(&supports_coop_launch,
cudaDevAttrCooperativeLaunch, device);
if (!supportsCoopLaunch) {
if (!supports_coop_launch) {
cfg->use_cooperative_launch = 0;
ec_warn(&ucc_ec_cuda.super,
"CUDA cooperative groups are not supported. "
"Fall back to non cooperative launch.");
"CUDA cooperative groups are not supported. "
"Fall back to non cooperative launch.");
}
}

ucc_ec_cuda.resources_hash = kh_init(ucc_ec_cuda_resources_hash);
ucc_spinlock_init(&ucc_ec_cuda.init_spinlock, 0);
return UCC_OK;
}
Expand All @@ -359,9 +201,15 @@ static ucc_status_t ucc_ec_cuda_get_attr(ucc_ec_attr_t *ec_attr)

ucc_status_t ucc_ec_cuda_event_create(void **event)
{
ucc_ec_cuda_event_t *cuda_event;
ucc_ec_cuda_event_t *cuda_event;
ucc_ec_cuda_resources_t *resources;
ucc_status_t status;

cuda_event = ucc_mpool_get(&ucc_ec_cuda.events);
status = ucc_ec_cuda_get_resources(&resources);
if (ucc_unlikely(status != UCC_OK)) {
return status;
}
cuda_event = ucc_mpool_get(&resources->events);
if (ucc_unlikely(!cuda_event)) {
ec_error(&ucc_ec_cuda.super, "failed to get event from mpool");
return UCC_ERR_NO_MEMORY;
Expand Down Expand Up @@ -390,8 +238,8 @@ ucc_status_t ucc_ec_cuda_event_post(void *ee_context, void *event)

ucc_status_t ucc_ec_cuda_event_test(void *event)
{
cudaError_t cu_err;
ucc_ec_cuda_event_t *cuda_event = event;
cudaError_t cu_err;

cu_err = cudaEventQuery(cuda_event->event);

Expand All @@ -404,26 +252,68 @@ ucc_status_t ucc_ec_cuda_event_test(void *event)

static ucc_status_t ucc_ec_cuda_finalize()
{
int i;
ucc_ec_cuda_resources_t *resources;

if (ucc_ec_cuda.stream_initialized) {
CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.stream));
ucc_ec_cuda.stream_initialized = 0;
resources = ec_cuda_resources_hash_pop(ucc_ec_cuda.resources_hash);
while (resources) {
ucc_ec_cuda_resources_cleanup(resources);
resources = ec_cuda_resources_hash_pop(ucc_ec_cuda.resources_hash);
}

if (ucc_ec_cuda.exec_streams_initialized) {
for (i = 0; i < EC_CUDA_CONFIG->exec_num_streams; i++) {
CUDA_FUNC(cudaStreamDestroy(ucc_ec_cuda.exec_streams[i]));
}
ucc_ec_cuda.exec_streams_initialized = 0;
ucc_spinlock_destroy(&ucc_ec_cuda.init_spinlock);

return UCC_OK;
}

ucc_status_t ucc_ec_cuda_get_resources(ucc_ec_cuda_resources_t **resources)
{
CUcontext cu_ctx;
unsigned long long int cu_ctx_id;
ucc_status_t status;

status = CUDADRV_FUNC(cuCtxGetCurrent(&cu_ctx));
if (ucc_unlikely(status != UCC_OK)) {
ec_error(&ucc_ec_cuda.super, "failed to get current CUDA context");
return status;
}

ucc_mpool_cleanup(&ucc_ec_cuda.events, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executors, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executor_interruptible_tasks, 1);
ucc_mpool_cleanup(&ucc_ec_cuda.executor_persistent_tasks, 1);
ucc_free(ucc_ec_cuda.exec_streams);
#if CUDA_VERSION < 12000
cu_ctx_id = 1;
#else
status = CUDADRV_FUNC(cuCtxGetId(cu_ctx, &cu_ctx_id));
if (ucc_unlikely(status != UCC_OK)) {
ec_error(&ucc_ec_cuda.super, "failed to get currect CUDA context ID");
}
#endif

*resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash,
cu_ctx_id);
if (ucc_unlikely(*resources == NULL)) {
ucc_spin_lock(&ucc_ec_cuda.init_spinlock);
*resources = ec_cuda_resources_hash_get(ucc_ec_cuda.resources_hash,
cu_ctx_id);
if (*resources == NULL) {
*resources = ucc_malloc(sizeof(ucc_ec_cuda_resources_t),
"ec cuda resources");
if (*resources == NULL) {
ec_error(&ucc_ec_cuda.super,
"failed to allocate %zd bytes for resources",
sizeof(ucc_ec_cuda_resources_t));
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
return UCC_ERR_NO_MEMORY;
}
status = ucc_ec_cuda_resources_init(&ucc_ec_cuda.super,
*resources);
if (status != UCC_OK) {
ucc_free(*resources);
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
return status;
}
ec_cuda_resources_hash_put(ucc_ec_cuda.resources_hash, cu_ctx_id,
*resources);
}
ucc_spin_unlock(&ucc_ec_cuda.init_spinlock);
}
return UCC_OK;
}

Expand Down Expand Up @@ -455,5 +345,7 @@ ucc_ec_cuda_t ucc_ec_cuda = {
.super.executor_ops.finalize = ucc_cuda_executor_finalize,
};

ucc_ec_cuda_config_t *ucc_ec_cuda_config;

UCC_CONFIG_REGISTER_TABLE_ENTRY(&ucc_ec_cuda.super.config_table,
&ucc_config_global_list);
Loading

0 comments on commit 75ecf74

Please sign in to comment.