From 06c388d09fdeb03b719c21168652534a5fc9a6ce Mon Sep 17 00:00:00 2001 From: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> Date: Wed, 9 Oct 2024 11:17:25 -0700 Subject: [PATCH] Expand RMM options for Python API (#266) * add rmm args Signed-off-by: Sarah Yurick * working code Signed-off-by: Sarah Yurick * update best practices Signed-off-by: Sarah Yurick * add to localcluster Signed-off-by: Sarah Yurick * run black Signed-off-by: Sarah Yurick * remove rmm_track_allocations and add link Signed-off-by: Sarah Yurick --------- Signed-off-by: Sarah Yurick Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com> --- docs/user-guide/bestpractices.rst | 14 ++++++- nemo_curator/utils/distributed_utils.py | 54 +++++++++++++++++++++++-- 2 files changed, 63 insertions(+), 5 deletions(-) diff --git a/docs/user-guide/bestpractices.rst b/docs/user-guide/bestpractices.rst index 533777b2..bc151c68 100644 --- a/docs/user-guide/bestpractices.rst +++ b/docs/user-guide/bestpractices.rst @@ -33,7 +33,19 @@ Here are some features which can help optimize memory usage: * Enable asynchronous memory allocation: Use the ``--rmm-async`` flag to allow RMM to handle memory allocation more efficiently, by allocating and deallocating GPU memory asynchronously. * Set a memory release threshold: For example, ``--rmm-release-threshold 50GB`` can help prevent holding onto excess memory, releasing unused memory when a certain limit is reached. Please keep in mind that using this flag may degrade performance slightly as RMM is busy releasing the unused memory. -You can set these flags while initializing your own Dask client, for example: +You can pass these parameters directly into NeMo Curator's ``get_client`` function, which initializes a Dask client for you: + +.. code-block:: python + + from nemo_curator.utils.distributed_utils import get_client + + client = get_client( + cluster_type="gpu", + rmm_async=True, + rmm_release_threshold="50GB", + ) + +Alternatively, you can set these flags while initializing your own Dask client, for example: .. code-block:: python diff --git a/nemo_curator/utils/distributed_utils.py b/nemo_curator/utils/distributed_utils.py index 3f37eb90..c41b5ea9 100644 --- a/nemo_curator/utils/distributed_utils.py +++ b/nemo_curator/utils/distributed_utils.py @@ -62,6 +62,11 @@ def start_dask_gpu_local_cluster( rmm_pool_size="1024M", enable_spilling=True, set_torch_to_use_rmm=True, + rmm_async=True, + rmm_maximum_pool_size=None, + rmm_managed_memory=False, + rmm_release_threshold=None, + **cluster_kwargs, ) -> Client: """ This function sets up a Dask cluster across all the @@ -82,9 +87,13 @@ def start_dask_gpu_local_cluster( cluster = LocalCUDACluster( rmm_pool_size=rmm_pool_size, protocol=protocol, - rmm_async=True, enable_cudf_spill=enable_spilling, + rmm_async=rmm_async, + rmm_maximum_pool_size=rmm_maximum_pool_size, + rmm_managed_memory=rmm_managed_memory, + rmm_release_threshold=rmm_release_threshold, **extra_kwargs, + **cluster_kwargs, ) client = Client(cluster) @@ -101,14 +110,18 @@ def start_dask_gpu_local_cluster( def start_dask_cpu_local_cluster( - n_workers=os.cpu_count(), threads_per_worker=1 + n_workers=os.cpu_count(), threads_per_worker=1, **cluster_kwargs ) -> Client: """ This function sets up a Dask cluster across all the CPUs present on the machine. """ - cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker) + cluster = LocalCluster( + n_workers=n_workers, + threads_per_worker=threads_per_worker, + **cluster_kwargs, + ) client = Client(cluster) return client @@ -124,6 +137,11 @@ def get_client( rmm_pool_size="1024M", enable_spilling=True, set_torch_to_use_rmm=False, + rmm_async=True, + rmm_maximum_pool_size=None, + rmm_managed_memory=False, + rmm_release_threshold=None, + **cluster_kwargs, ) -> Client: """ Initializes or connects to a Dask cluster. @@ -155,6 +173,27 @@ def get_client( host to enable out-of-memory computation, i.e., computing on objects that occupy more memory than is available on the GPU. set_torch_to_use_rmm: For GPU-based clusters only. Sets up the PyTorch memory pool to be the same as the RAPIDS memory pool. This helps avoid OOM errors when using both PyTorch and RAPIDS on the same GPU. + rmm_async: For GPU-based clusters only. Initializes each worker with RAPIDS Memory Manager (RMM) + (see RMM documentation for more information: https://docs.rapids.ai/api/rmm/stable/) + and sets it to use RMM's asynchronous allocator. Warning: The asynchronous allocator requires CUDA Toolkit 11.2 or newer. + It is also incompatible with RMM pools and managed memory. Trying to enable both will result in an exception. + rmm_maximum_pool_size: For GPU-based clusters only. When rmm_pool_size is set, this argument indicates the maximum pool size. + Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M") or None. + By default, the total available memory on the GPU is used. + rmm_pool_size must be specified to use RMM pool and to set the maximum pool size. + Note: When paired with --enable-rmm-async the maximum size cannot be guaranteed due to fragmentation. + Note: This size is a per-worker configuration, and not cluster-wide. + rmm_managed_memory: For GPU-based clusters only. Initialize each worker with RMM and set it to use managed memory. + If disabled, RMM may still be used by specifying rmm_pool_size. + Warning: Managed memory is currently incompatible with NVLink. Trying to enable both will result in an exception. + rmm_release_threshold: For GPU-based clusters only. When rmm.async is True and the pool size grows beyond this value, + unused memory held by the pool will be released at the next synchronization point. + Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M") or None. + By default, this feature is disabled. + Note: This size is a per-worker configuration, and not cluster-wide. + cluster_kwargs: Additional keyword arguments for the LocalCluster or LocalCUDACluster configuration. + See API documentation https://docs.dask.org/en/stable/deploying-python.html#distributed.deploy.local.LocalCluster + for all LocalCluster parameters, or https://docs.rapids.ai/api/dask-cuda/nightly/api/ for all LocalCUDACluster parameters. Returns: A Dask client object. @@ -179,10 +218,17 @@ def get_client( rmm_pool_size=rmm_pool_size, enable_spilling=enable_spilling, set_torch_to_use_rmm=set_torch_to_use_rmm, + rmm_async=rmm_async, + rmm_maximum_pool_size=rmm_maximum_pool_size, + rmm_managed_memory=rmm_managed_memory, + rmm_release_threshold=rmm_release_threshold, + **cluster_kwargs, ) else: return start_dask_cpu_local_cluster( - n_workers=n_workers, threads_per_worker=threads_per_worker + n_workers=n_workers, + threads_per_worker=threads_per_worker, + **cluster_kwargs, )