Skip to content

Commit

Permalink
[Core] | Added util for concurrency control (#1064)
Browse files Browse the repository at this point in the history
# Description

What - Added a util `semaphore_async_iterator` to enable seamless
control over concurrent executions per kind.

Why - Simplifies the process of implementing concurrent limit when
streaming async tasks. Works magic with the existing
`stream_async_iterators_task` util for concurrency control.

How - Utilized asyncio Semaphore context manager, works with Bounded and
Non Bounded Semaphores

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>

### Core testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync finishes successfully
- [x] Resync able to create entities
- [x] Resync able to update entities
- [x] Resync able to detect and delete entities
- [x] Scheduled resync able to abort existing resync and start a new one
- [x] Tested with at least 2 integrations from scratch
- [x] Tested with Kafka and Polling event listeners
- [x] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
  • Loading branch information
mk-armah authored Oct 7, 2024
1 parent 71c41a9 commit ed86f24
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 1 deletion.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.12.12 (2024-10-06)

### Improvements

- Added a util `semaphore_async_iterator` to enable seamless control over concurrent executions.


## 0.12.1 (2024-10-02)

### Bug Fixes
Expand Down
45 changes: 45 additions & 0 deletions port_ocean/tests/utils/test_async_iterators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import Any, AsyncGenerator
import asyncio
from port_ocean.utils.async_iterators import semaphore_async_iterator
import pytest


@pytest.mark.asyncio
async def test_semaphore_async_iterator() -> None:
max_concurrency = 5
semaphore = asyncio.BoundedSemaphore(max_concurrency)

concurrent_tasks = 0
max_concurrent_tasks = 0
lock = asyncio.Lock() # Protect shared variables

num_tasks = 20

async def mock_function() -> AsyncGenerator[str, None]:
nonlocal concurrent_tasks, max_concurrent_tasks

async with lock:
concurrent_tasks += 1
if concurrent_tasks > max_concurrent_tasks:
max_concurrent_tasks = concurrent_tasks

await asyncio.sleep(0.1)
yield "result"

async with lock:
concurrent_tasks -= 1

async def consume_iterator(async_iterator: Any) -> None:
async for _ in async_iterator:
pass

tasks = [
consume_iterator(semaphore_async_iterator(semaphore, mock_function))
for _ in range(num_tasks)
]
await asyncio.gather(*tasks)

assert (
max_concurrent_tasks <= max_concurrency
), f"Max concurrent tasks {max_concurrent_tasks} exceeded semaphore limit {max_concurrency}"
assert concurrent_tasks == 0, "Not all tasks have completed"
60 changes: 60 additions & 0 deletions port_ocean/utils/async_iterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import aiostream

if typing.TYPE_CHECKING:
from asyncio import Semaphore


async def stream_async_iterators_tasks(
*tasks: typing.AsyncIterable[typing.Any],
Expand Down Expand Up @@ -47,3 +50,60 @@ async def main():
async with combine.stream() as streamer:
async for batch_items in streamer:
yield batch_items


async def semaphore_async_iterator(
semaphore: "Semaphore",
function: typing.Callable[[], typing.AsyncIterator[typing.Any]],
) -> typing.AsyncIterator[typing.Any]:
"""
Executes an asynchronous iterator function under a semaphore to limit concurrency.
This function ensures that the provided asynchronous iterator function is executed
while respecting the concurrency limit imposed by the semaphore. It acquires the
semaphore before executing the function and releases it after the function completes,
thus controlling the number of concurrent executions.
Parameters:
semaphore (asyncio.Semaphore | asyncio.BoundedSemaphore): The semaphore used to limit concurrency.
function (Callable[[], AsyncIterator[Any]]): A nullary asynchronous function, - apply arguments with `functools.partial` or an anonymous function (lambda)
that returns an asynchronous iterator. This function is executed under the semaphore.
Yields:
Any: The items yielded by the asynchronous iterator function.
Usage:
```python
import asyncio
async def async_iterator_function(param1, param2):
# Your async code here
yield ...
async def async_generator_function():
# Your async code to retrieve items
param1 = "your_param1"
yield param1
async def main():
semaphore = asyncio.BoundedSemaphore(50)
param2 = "your_param2"
tasks = [
semaphore_async_iterator(
semaphore,
lambda: async_iterator_function(param1, param2) # functools.partial(async_iterator_function, param1, param2)
)
async for param1 in async_generator_function()
]
async for batch in stream_async_iterators_tasks(*tasks):
# Process each batch
pass
asyncio.run(main())
```
"""
async with semaphore:
async for result in function():
yield result
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.12.1"
version = "0.12.2"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit ed86f24

Please sign in to comment.