From ed86f245e00094c168dce1e054a000ad17b17ba4 Mon Sep 17 00:00:00 2001 From: Michael Kofi Armah Date: Mon, 7 Oct 2024 09:04:22 +0000 Subject: [PATCH] [Core] | Added util for concurrency control (#1064) # Description What - Added a util `semaphore_async_iterator` to enable seamless control over concurrent executions per kind. Why - Simplifies implementing a concurrency limit when streaming async tasks. Works seamlessly with the existing `stream_async_iterators_tasks` util for concurrency control. How - Utilized the asyncio Semaphore async context manager; works with both bounded and non-bounded semaphores. ## Type of change Please leave one option from the following and delete the rest: - [ ] Bug fix (non-breaking change which fixes an issue) - [x] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

### Core testing checklist - [x] Integration able to create all default resources from scratch - [x] Resync finishes successfully - [x] Resync able to create entities - [x] Resync able to update entities - [x] Resync able to detect and delete entities - [x] Scheduled resync able to abort existing resync and start a new one - [x] Tested with at least 2 integrations from scratch - [x] Tested with Kafka and Polling event listeners - [x] Tested deletion of entities that don't pass the selector ### Integration testing checklist - [ ] Integration able to create all default resources from scratch - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Resync finishes successfully - [ ] If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the `examples` folder in the integration directory. - [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved - [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected - [ ] Docs PR link [here](#) ### Preflight checklist - [ ] Handled rate limiting - [ ] Handled pagination - [ ] Implemented the code in async - [ ] Support Multi account ## Screenshots Include screenshots from your environment showing how the resources of the integration will look. ## API Documentation Provide links to the API documentation used for this integration. 
--- CHANGELOG.md | 7 +++ .../tests/utils/test_async_iterators.py | 45 ++++++++++++++ port_ocean/utils/async_iterators.py | 60 +++++++++++++++++++ pyproject.toml | 2 +- 4 files changed, 113 insertions(+), 1 deletion(-) create mode 100644 port_ocean/tests/utils/test_async_iterators.py diff --git a/CHANGELOG.md b/CHANGELOG.md index f1cc2cf805..a854027cc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.12.2 (2024-10-06) + +### Improvements + +- Added a util `semaphore_async_iterator` to enable seamless control over concurrent executions. + + ## 0.12.1 (2024-10-02) ### Bug Fixes diff --git a/port_ocean/tests/utils/test_async_iterators.py b/port_ocean/tests/utils/test_async_iterators.py new file mode 100644 index 0000000000..e9e483dd7d --- /dev/null +++ b/port_ocean/tests/utils/test_async_iterators.py @@ -0,0 +1,45 @@ +from typing import Any, AsyncGenerator +import asyncio +from port_ocean.utils.async_iterators import semaphore_async_iterator +import pytest + + +@pytest.mark.asyncio +async def test_semaphore_async_iterator() -> None: + max_concurrency = 5 + semaphore = asyncio.BoundedSemaphore(max_concurrency) + + concurrent_tasks = 0 + max_concurrent_tasks = 0 + lock = asyncio.Lock() # Protect shared variables + + num_tasks = 20 + + async def mock_function() -> AsyncGenerator[str, None]: + nonlocal concurrent_tasks, max_concurrent_tasks + + async with lock: + concurrent_tasks += 1 + if concurrent_tasks > max_concurrent_tasks: + max_concurrent_tasks = concurrent_tasks + + await asyncio.sleep(0.1) + yield "result" + + async with lock: + concurrent_tasks -= 1 + + async def consume_iterator(async_iterator: Any) -> None: + async for _ in async_iterator: + pass + + tasks = [ + consume_iterator(semaphore_async_iterator(semaphore, mock_function)) + for _ in range(num_tasks) + ] + await asyncio.gather(*tasks) + + assert ( + max_concurrent_tasks <= max_concurrency + ), f"Max 
concurrent tasks {max_concurrent_tasks} exceeded semaphore limit {max_concurrency}" + assert concurrent_tasks == 0, "Not all tasks have completed" diff --git a/port_ocean/utils/async_iterators.py b/port_ocean/utils/async_iterators.py index 0d1158ab74..9b50458517 100644 --- a/port_ocean/utils/async_iterators.py +++ b/port_ocean/utils/async_iterators.py @@ -2,6 +2,9 @@ import aiostream +if typing.TYPE_CHECKING: + from asyncio import Semaphore + async def stream_async_iterators_tasks( *tasks: typing.AsyncIterable[typing.Any], @@ -47,3 +50,60 @@ async def main(): async with combine.stream() as streamer: async for batch_items in streamer: yield batch_items + + +async def semaphore_async_iterator( + semaphore: "Semaphore", + function: typing.Callable[[], typing.AsyncIterator[typing.Any]], +) -> typing.AsyncIterator[typing.Any]: + """ + Executes an asynchronous iterator function under a semaphore to limit concurrency. + + This function ensures that the provided asynchronous iterator function is executed + while respecting the concurrency limit imposed by the semaphore. It acquires the + semaphore before executing the function and releases it after the function completes, + thus controlling the number of concurrent executions. + + Parameters: + semaphore (asyncio.Semaphore | asyncio.BoundedSemaphore): The semaphore used to limit concurrency. + function (Callable[[], AsyncIterator[Any]]): A nullary function that returns an asynchronous iterator; + bind any arguments with `functools.partial` or an anonymous function (lambda). This function is executed under the semaphore. + + Yields: + Any: The items yielded by the asynchronous iterator function. + + Usage: + ```python + import asyncio + + async def async_iterator_function(param1, param2): + # Your async code here + yield ... 
+ + async def async_generator_function(): + # Your async code to retrieve items + param1 = "your_param1" + yield param1 + + async def main(): + semaphore = asyncio.BoundedSemaphore(50) + param2 = "your_param2" + + tasks = [ + semaphore_async_iterator( + semaphore, + lambda param1=param1: async_iterator_function(param1, param2) # bind param1 now (a bare lambda late-binds the loop variable); or functools.partial(async_iterator_function, param1, param2) + ) + async for param1 in async_generator_function() + ] + + async for batch in stream_async_iterators_tasks(*tasks): + # Process each batch + pass + + asyncio.run(main()) + ``` + """ + async with semaphore: + async for result in function(): + yield result diff --git a/pyproject.toml b/pyproject.toml index 488ad67c48..d05c183f8e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.12.1" +version = "0.12.2" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"