Expand RMM options for Python API (#266)
* add rmm args

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* working code

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* update best practices

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* add to localcluster

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* run black

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

* remove rmm_track_allocations and add link

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>

---------

Signed-off-by: Sarah Yurick <sarahyurick@gmail.com>
Signed-off-by: Sarah Yurick <53962159+sarahyurick@users.noreply.github.com>
sarahyurick authored Oct 9, 2024
1 parent fa4befc commit 06c388d
Showing 2 changed files with 63 additions and 5 deletions.
14 changes: 13 additions & 1 deletion docs/user-guide/bestpractices.rst
@@ -33,7 +33,19 @@ Here are some features which can help optimize memory usage:
 * Enable asynchronous memory allocation: Use the ``--rmm-async`` flag to allow RMM to handle memory allocation more efficiently, by allocating and deallocating GPU memory asynchronously.
 * Set a memory release threshold: For example, ``--rmm-release-threshold 50GB`` can help prevent holding onto excess memory, releasing unused memory when a certain limit is reached. Please keep in mind that using this flag may degrade performance slightly as RMM is busy releasing the unused memory.

-You can set these flags while initializing your own Dask client, for example:
+You can pass these parameters directly into NeMo Curator's ``get_client`` function, which initializes a Dask client for you:
+
+.. code-block:: python
+
+    from nemo_curator.utils.distributed_utils import get_client
+
+    client = get_client(
+        cluster_type="gpu",
+        rmm_async=True,
+        rmm_release_threshold="50GB",
+    )
+
+Alternatively, you can set these flags while initializing your own Dask client, for example:

 .. code-block:: python
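
The example under the final ``.. code-block:: python`` directive above is truncated in this view. For reference, a minimal sketch of such a manual setup (assuming ``dask_cuda``'s ``LocalCUDACluster``, which accepts the same ``rmm_async`` and ``rmm_release_threshold`` options) might look like:

.. code-block:: python

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # Illustrative sketch only: start a local GPU cluster with RMM's
    # asynchronous allocator and a 50 GB release threshold, then attach
    # a Dask client to it.
    cluster = LocalCUDACluster(
        rmm_async=True,
        rmm_release_threshold="50GB",
    )
    client = Client(cluster)
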
54 changes: 50 additions & 4 deletions nemo_curator/utils/distributed_utils.py
@@ -62,6 +62,11 @@ def start_dask_gpu_local_cluster(
     rmm_pool_size="1024M",
     enable_spilling=True,
     set_torch_to_use_rmm=True,
+    rmm_async=True,
+    rmm_maximum_pool_size=None,
+    rmm_managed_memory=False,
+    rmm_release_threshold=None,
+    **cluster_kwargs,
 ) -> Client:
     """
     This function sets up a Dask cluster across all the
@@ -82,9 +87,13 @@
     cluster = LocalCUDACluster(
         rmm_pool_size=rmm_pool_size,
         protocol=protocol,
-        rmm_async=True,
         enable_cudf_spill=enable_spilling,
+        rmm_async=rmm_async,
+        rmm_maximum_pool_size=rmm_maximum_pool_size,
+        rmm_managed_memory=rmm_managed_memory,
+        rmm_release_threshold=rmm_release_threshold,
         **extra_kwargs,
+        **cluster_kwargs,
     )
     client = Client(cluster)

@@ -101,14 +110,18 @@


 def start_dask_cpu_local_cluster(
-    n_workers=os.cpu_count(), threads_per_worker=1
+    n_workers=os.cpu_count(), threads_per_worker=1, **cluster_kwargs
 ) -> Client:
     """
     This function sets up a Dask cluster across all the
     CPUs present on the machine.
     """
-    cluster = LocalCluster(n_workers=n_workers, threads_per_worker=threads_per_worker)
+    cluster = LocalCluster(
+        n_workers=n_workers,
+        threads_per_worker=threads_per_worker,
+        **cluster_kwargs,
+    )
     client = Client(cluster)
     return client

@@ -124,6 +137,11 @@ def get_client(
     rmm_pool_size="1024M",
     enable_spilling=True,
     set_torch_to_use_rmm=False,
+    rmm_async=True,
+    rmm_maximum_pool_size=None,
+    rmm_managed_memory=False,
+    rmm_release_threshold=None,
+    **cluster_kwargs,
 ) -> Client:
     """
     Initializes or connects to a Dask cluster.
@@ -155,6 +173,27 @@
             host to enable out-of-memory computation, i.e., computing on objects that occupy more memory than is available on the GPU.
         set_torch_to_use_rmm: For GPU-based clusters only. Sets up the PyTorch memory pool to be the same as the RAPIDS memory pool.
             This helps avoid OOM errors when using both PyTorch and RAPIDS on the same GPU.
+        rmm_async: For GPU-based clusters only. Initializes each worker with RAPIDS Memory Manager (RMM)
+            (see RMM documentation for more information: https://docs.rapids.ai/api/rmm/stable/)
+            and sets it to use RMM's asynchronous allocator. Warning: The asynchronous allocator requires CUDA Toolkit 11.2 or newer.
+            It is also incompatible with RMM pools and managed memory. Trying to enable both will result in an exception.
+        rmm_maximum_pool_size: For GPU-based clusters only. When rmm_pool_size is set, this argument indicates the maximum pool size.
+            Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M") or None.
+            By default, the total available memory on the GPU is used.
+            rmm_pool_size must be specified to use RMM pool and to set the maximum pool size.
+            Note: When paired with --enable-rmm-async the maximum size cannot be guaranteed due to fragmentation.
+            Note: This size is a per-worker configuration, and not cluster-wide.
+        rmm_managed_memory: For GPU-based clusters only. Initialize each worker with RMM and set it to use managed memory.
+            If disabled, RMM may still be used by specifying rmm_pool_size.
+            Warning: Managed memory is currently incompatible with NVLink. Trying to enable both will result in an exception.
+        rmm_release_threshold: For GPU-based clusters only. When rmm.async is True and the pool size grows beyond this value,
+            unused memory held by the pool will be released at the next synchronization point.
+            Can be an integer (bytes), float (fraction of total device memory), string (like "5GB" or "5000M") or None.
+            By default, this feature is disabled.
+            Note: This size is a per-worker configuration, and not cluster-wide.
+        cluster_kwargs: Additional keyword arguments for the LocalCluster or LocalCUDACluster configuration.
+            See API documentation https://docs.dask.org/en/stable/deploying-python.html#distributed.deploy.local.LocalCluster
+            for all LocalCluster parameters, or https://docs.rapids.ai/api/dask-cuda/nightly/api/ for all LocalCUDACluster parameters.
     Returns:
         A Dask client object.
@@ -179,10 +218,17 @@
             rmm_pool_size=rmm_pool_size,
             enable_spilling=enable_spilling,
             set_torch_to_use_rmm=set_torch_to_use_rmm,
+            rmm_async=rmm_async,
+            rmm_maximum_pool_size=rmm_maximum_pool_size,
+            rmm_managed_memory=rmm_managed_memory,
+            rmm_release_threshold=rmm_release_threshold,
+            **cluster_kwargs,
         )
     else:
         return start_dask_cpu_local_cluster(
-            n_workers=n_workers, threads_per_worker=threads_per_worker
+            n_workers=n_workers,
+            threads_per_worker=threads_per_worker,
+            **cluster_kwargs,
         )


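
Taken together, these changes let callers tune RMM behavior through ``get_client`` and forward any extra keyword arguments to the underlying ``LocalCUDACluster`` or ``LocalCluster``. A usage sketch (illustrative only: the pool sizes and worker counts are arbitrary example values, and ``memory_limit`` is assumed to be a pass-through ``LocalCluster`` option handled by ``cluster_kwargs``):

.. code-block:: python

    from nemo_curator.utils.distributed_utils import get_client

    # GPU cluster: a 4 GB RMM pool per worker, capped at 16 GB per worker.
    # The async allocator is left disabled here because it is incompatible
    # with RMM pools and managed memory (see the docstring above).
    gpu_client = get_client(
        cluster_type="gpu",
        rmm_pool_size="4GB",
        rmm_maximum_pool_size="16GB",
        rmm_managed_memory=True,
        rmm_async=False,
    )

    # CPU cluster: unrecognized keyword arguments such as memory_limit are
    # forwarded to LocalCluster through **cluster_kwargs.
    cpu_client = get_client(
        cluster_type="cpu",
        n_workers=8,
        threads_per_worker=1,
        memory_limit="8GB",
    )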
