from typing import Any, AsyncGenerator
import asyncio
from port_ocean.utils.async_iterators import semaphore_async_iterator
import pytest


@pytest.mark.asyncio
async def test_semaphore_async_iterator() -> None:
    """Verify that ``semaphore_async_iterator`` caps concurrency without serialising work.

    Twenty wrapped iterators are consumed concurrently under a semaphore of
    size five. The test checks that:

    * no more than ``max_concurrency`` iterators are ever active at once,
    * the limit is actually reached (the wrapper does not run them one by one),
    * every wrapped iterator passes its items through unchanged, and
    * every iterator has finished by the time ``gather`` returns.
    """
    max_concurrency = 5
    num_tasks = 20
    semaphore = asyncio.BoundedSemaphore(max_concurrency)

    concurrent_tasks = 0
    max_concurrent_tasks = 0

    async def mock_function() -> AsyncGenerator[str, None]:
        nonlocal concurrent_tasks, max_concurrent_tasks

        # No lock is needed: there is no await between reading and writing the
        # counters, so no other task can interleave with these updates.
        concurrent_tasks += 1
        max_concurrent_tasks = max(max_concurrent_tasks, concurrent_tasks)
        try:
            # Suspend so that other tasks get a chance to enter while this
            # one still counts as active.
            await asyncio.sleep(0.01)
            yield "result"
        finally:
            # Decrement even if the consumer stops iterating early.
            concurrent_tasks -= 1

    async def consume_iterator(async_iterator: Any) -> list[str]:
        return [item async for item in async_iterator]

    tasks = [
        consume_iterator(semaphore_async_iterator(semaphore, mock_function))
        for _ in range(num_tasks)
    ]
    results = await asyncio.gather(*tasks)

    assert (
        max_concurrent_tasks <= max_concurrency
    ), f"Max concurrent tasks {max_concurrent_tasks} exceeded semaphore limit {max_concurrency}"
    # An upper bound alone would also pass for a wrapper that serialises
    # everything, so require that the limit was actually reached.
    assert (
        max_concurrent_tasks == max_concurrency
    ), f"Expected {max_concurrency} concurrent tasks, observed {max_concurrent_tasks}"
    assert concurrent_tasks == 0, "Not all tasks have completed"
    assert results == [["result"]] * num_tasks, "Items were not passed through"
async def semaphore_async_iterator(
    semaphore: "Semaphore",
    function: typing.Callable[[], typing.AsyncIterator[typing.Any]],
) -> typing.AsyncIterator[typing.Any]:
    """
    Executes an asynchronous iterator function under a semaphore to limit concurrency.

    The semaphore is acquired before `function` is called and is released only after
    iteration has ended - by exhaustion, by an exception, or because the consumer
    stopped early. The semaphore therefore stays held while the consumer is processing
    each yielded item, not only while the next item is being produced.

    If iteration ends early, the inner iterator is closed (when it exposes `aclose`)
    before the semaphore is released, so its cleanup code runs while the concurrency
    slot is still held.

    Parameters:
        semaphore (asyncio.Semaphore | asyncio.BoundedSemaphore): The semaphore used to limit concurrency.
        function (Callable[[], AsyncIterator[Any]]): A nullary callable that returns an asynchronous
            iterator. It is called only once iteration of the returned wrapper starts, so bind its
            arguments eagerly with `functools.partial`. A plain `lambda` created inside a loop or
            comprehension looks up the loop variable when it is finally called, and every wrapper
            would then see the last value (use `lambda param=param: ...` if a lambda is required).

    Yields:
        Any: The items yielded by the asynchronous iterator function.

    Usage:
        ```python
        import asyncio
        import functools

        async def async_iterator_function(param1, param2):
            # Your async code here
            yield ...

        async def async_generator_function():
            # Your async code to retrieve items
            param1 = "your_param1"
            yield param1

        async def main():
            semaphore = asyncio.BoundedSemaphore(50)
            param2 = "your_param2"

            tasks = [
                semaphore_async_iterator(
                    semaphore,
                    # partial binds the current param1 now, not when the wrapper is iterated
                    functools.partial(async_iterator_function, param1, param2),
                )
                async for param1 in async_generator_function()
            ]

            async for batch in stream_async_iterators_tasks(*tasks):
                # Process each batch
                pass

        asyncio.run(main())
        ```
    """
    async with semaphore:
        iterator = function()
        try:
            async for result in iterator:
                yield result
        finally:
            # When the consumer stops early (aclose(), break, cancellation) the
            # inner iterator is still suspended. Close it here so its cleanup
            # runs before the semaphore is released instead of being deferred
            # to the event loop's async-generator finaliser. After normal
            # exhaustion this is a no-op for async generators.
            aclose = getattr(iterator, "aclose", None)
            if aclose is not None:
                await aclose()