From 7a06b33d476dda433271cbecaafa7ebb2db507ae Mon Sep 17 00:00:00 2001 From: Michael Kofi Armah Date: Tue, 15 Oct 2024 09:00:14 +0000 Subject: [PATCH] [Integration][AWS] | Improved Concurrency Control and Eliminated Likelihood of Thundering Herd (#1063) # Description **What:** - Refactored semaphore implementation to effectively limit concurrency across tasks. - Added a util `semaphore_async_iterator` to enable seamless control over concurrent executions per kind (can be re-used in other integrations). - Removed iterative calls to the cache for tracking token expiry, reducing the likelihood of a thundering herd problem. **Why:** - The previous implementation limited concurrency only within each task (account); this use case requires the concurrency limit to be global, i.e. applied across accounts. - Iterative cache calls could potentially cause a thundering herd problem when the cache expires and the token needs to be refreshed. **How:** - Applied the semaphore correctly by wrapping task creation with the semaphore, ensuring proper concurrency control. - Implemented unit tests using `pytest` to verify semaphore functionality and concurrency limits. - Optimized cache usage by eliminating unnecessary iterative calls for tracking expiry. ## Type of change Please leave one option from the following and delete the rest: - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [x] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation) #### All tests should be run against the port production environment (using a testing org). 
### Core testing checklist - [x] Integration able to create all default resources from scratch - [x] Resync finishes successfully - [x] Resync able to create entities - [x] Resync able to update entities - [x] Resync able to detect and delete entities - [x] Scheduled resync able to abort existing resync and start a new one - [x] Tested with at least 2 integrations from scratch - [ ] Tested with Kafka and Polling event listeners - [x] Tested deletion of entities that don't pass the selector ### Integration testing checklist - [x] Integration able to create all default resources from scratch - [x] Resync able to create entities - [x] Resync able to update entities - [x] Resync able to detect and delete entities - [x] Resync finishes successfully - [x] If new resource kind is added or updated in the integration, add example raw data, mapping, and expected result to the `examples` folder in the integration directory. - [x] If resource kind is updated, run the integration with the example data and check if the expected result is achieved - [x] If new resource kind is added or updated, validate that live-events for that resource are working as expected - [ ] Docs PR link [here](#) ### Preflight checklist - [x] Handled rate limiting - [x] Handled pagination - [x] Implemented the code in async - [x] Support Multi account ## Screenshots NB: Warning log in the screenshot below is just for demonstration purposes. image ## API Documentation Provide links to the API documentation used for this integration. 
--- integrations/aws/CHANGELOG.md | 9 ++ integrations/aws/main.py | 143 +++++++++++++++++----------- integrations/aws/pyproject.toml | 2 +- integrations/aws/utils/aws.py | 32 +++---- integrations/aws/utils/resources.py | 6 +- 5 files changed, 114 insertions(+), 78 deletions(-) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index 70388b1e59..7446787537 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.2.49 (2024-10-14) + + +### Improvements + +- Removed iterative calls to the cache for tracking expiry, reducing the likelihood of a thundering herd problem. +- Enhanced semaphore implementation to properly limit concurrency across tasks, rather than within tasks, improving performance and resource utilization. + + ## 0.2.48 (2024-10-14) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index b3b48249d7..065dd902af 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -36,7 +36,11 @@ is_server_error, semaphore, ) -from port_ocean.utils.async_iterators import stream_async_iterators_tasks +from port_ocean.utils.async_iterators import ( + stream_async_iterators_tasks, + semaphore_async_iterator, +) +import functools async def _handle_global_resource_resync( @@ -71,25 +75,23 @@ async def resync_resources_for_account( credentials: AwsCredentials, kind: str ) -> ASYNC_GENERATOR_RESYNC_TYPE: """Function to handle fetching resources for a single account.""" + errors, regions = [], [] - async with semaphore: # limit the number of concurrent tasks - errors, regions = [], [] - - if is_global_resource(kind): - async for batch in _handle_global_resource_resync(kind, credentials): - yield batch - else: - async for session in credentials.create_session_for_each_region(): - try: - async for batch in resync_cloudcontrol(kind, session): - yield batch - except Exception as exc: - 
regions.append(session.region_name) - errors.append(exc) - continue - if errors: - message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}" - raise ExceptionGroup(message, errors) + if is_global_resource(kind): + async for batch in _handle_global_resource_resync(kind, credentials): + yield batch + else: + async for session in credentials.create_session_for_each_region(): + try: + async for batch in resync_cloudcontrol(kind, session): + yield batch + except Exception as exc: + regions.append(session.region_name) + errors.append(exc) + continue + if errors: + message = f"Failed to fetch {kind} for these regions {regions} with {len(errors)} errors in account {credentials.account_id}" + raise ExceptionGroup(message, errors) @ocean.on_resync() @@ -99,11 +101,15 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_resources_for_account(credentials, kind) + semaphore_async_iterator( + semaphore, + functools.partial(resync_resources_for_account, credentials, kind), + ) async for credentials in get_accounts() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -119,18 +125,23 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "elasticache", - "describe_cache_clusters", - "CacheClusters", - "Marker", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "elasticache", + "describe_cache_clusters", + "CacheClusters", + "Marker", + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -139,19 +150,24 @@ async def resync_elv2_load_balancer(kind: str) -> 
ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "elbv2", - "describe_load_balancers", - "LoadBalancers", - "Marker", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "elbv2", + "describe_load_balancers", + "LoadBalancers", + "Marker", + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -160,19 +176,24 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "acm", - "list_certificates", - "CertificateSummaryList", - "NextToken", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "acm", + "list_certificates", + "CertificateSummaryList", + "NextToken", + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -180,19 +201,24 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "ec2", - "describe_images", - "Images", - "NextToken", - {"Owners": ["self"]}, + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "ec2", + "describe_images", + "Images", + "NextToken", + {"Owners": ["self"]}, + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch @@ -200,19 +226,24 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await 
update_available_access_credentials() tasks = [ - resync_custom_kind( - kind, - session, - "cloudformation", - "describe_stacks", - "Stacks", - "NextToken", + semaphore_async_iterator( + semaphore, + functools.partial( + resync_custom_kind, + kind, + session, + "cloudformation", + "describe_stacks", + "Stacks", + "NextToken", + ), ) async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): + await update_available_access_credentials() yield batch diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index 888ec408c3..2b619b48e3 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.48" +version = "0.2.49" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] diff --git a/integrations/aws/utils/aws.py b/integrations/aws/utils/aws.py index 840de59138..4de0a93a2b 100644 --- a/integrations/aws/utils/aws.py +++ b/integrations/aws/utils/aws.py @@ -11,7 +11,6 @@ from asyncio import Lock from port_ocean.utils.async_iterators import stream_async_iterators_tasks -from utils.misc import semaphore _session_manager: SessionManager = SessionManager() @@ -81,23 +80,20 @@ async def get_sessions( """ await update_available_access_credentials() - async with semaphore: - if custom_account_id: - credentials = _session_manager.find_credentials_by_account_id( - custom_account_id - ) - async for session in session_factory( - credentials, custom_region, use_default_region - ): - yield session - else: - tasks = [ - session_factory(credentials, custom_region, use_default_region) - async for credentials in get_accounts() - ] - if tasks: - async for batch in stream_async_iterators_tasks(*tasks): - yield batch + if custom_account_id: + credentials = _session_manager.find_credentials_by_account_id(custom_account_id) + async for session in 
session_factory( + credentials, custom_region, use_default_region + ): + yield session + else: + tasks = [ + session_factory(credentials, custom_region, use_default_region) + async for credentials in get_accounts() + ] + if tasks: + async for batch in stream_async_iterators_tasks(*tasks): + yield batch def validate_request(request: Request) -> tuple[bool, str]: diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index 0b9e26e393..d8f408f314 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -14,7 +14,7 @@ from utils.aws import get_sessions from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE -from utils.aws import _session_manager, update_available_access_credentials +from utils.aws import _session_manager from utils.overrides import AWSResourceConfig from botocore.config import Config as Boto3Config @@ -127,7 +127,7 @@ async def resync_custom_kind( next_token = None if not describe_method_params: describe_method_params = {} - while await update_available_access_credentials(): + while True: async with session.client(service_name) as client: try: params: dict[str, Any] = describe_method_params @@ -172,7 +172,7 @@ async def resync_cloudcontrol( account_id = await _session_manager.find_account_id_by_session(session) logger.info(f"Resyncing {kind} in account {account_id} in region {region}") next_token = None - while await update_available_access_credentials(): + while True: async with session.client("cloudcontrol") as cloudcontrol: try: params = {