diff --git a/port_ocean/context/ocean.py b/port_ocean/context/ocean.py index d07e57af84..e8e73fc8f1 100644 --- a/port_ocean/context/ocean.py +++ b/port_ocean/context/ocean.py @@ -10,6 +10,7 @@ RESYNC_EVENT_LISTENER, START_EVENT_LISTENER, RawEntityDiff, + EntityDiff, ) from port_ocean.exceptions.context import PortOceanContextNotFoundError @@ -67,6 +68,13 @@ async def update_raw_diff( ) -> None: await self.integration.update_raw_diff(kind, raw_diff, user_agent_type) + async def update_diff( + self, + diff: EntityDiff, + user_agent_type: UserAgentType = UserAgentType.exporter, + ) -> None: + await self.integration.update_diff(diff, user_agent_type) + async def register_raw( self, kind: str, diff --git a/port_ocean/core/event_listener/kafka/event_listener.py b/port_ocean/core/event_listener/kafka/event_listener.py index 309e95eac6..4011f6af7c 100644 --- a/port_ocean/core/event_listener/kafka/event_listener.py +++ b/port_ocean/core/event_listener/kafka/event_listener.py @@ -65,7 +65,7 @@ async def _handle_message(self, message: dict[Any, Any], topic: str) -> None: if not self.should_be_processed(message, topic): return - if "change.log" in topic: + if "change.log" in topic and message is not None: await self.events["on_resync"](message) def wrapped_start( diff --git a/port_ocean/core/integrations/mixins/sync.py b/port_ocean/core/integrations/mixins/sync.py index e330206603..e12fe5ffd0 100644 --- a/port_ocean/core/integrations/mixins/sync.py +++ b/port_ocean/core/integrations/mixins/sync.py @@ -25,12 +25,23 @@ ASYNC_GENERATOR_RESYNC_TYPE, ) from port_ocean.core.utils import zip_and_sum +from port_ocean.exceptions.core import OceanAbortException class SyncMixin(HandlerMixin): def __init__(self) -> None: HandlerMixin.__init__(self) + async def update_diff( + self, + desired_state: EntityDiff, + user_agent_type: UserAgentType, + ) -> None: + await self.entities_state_applier.apply_diff( + {"before": desired_state["before"], "after": desired_state["after"]}, + user_agent_type, + ) + async def register( self, entities: list[Entity], @@ -201,12 +212,15 @@ async def _register_in_batches( ) ) for generator in async_generators: - async for item in generator: - registered_entities_results.append( - await self._register_resource_raw( - resource_config, [item], user_agent_type + try: + async for item in generator: + registered_entities_results.append( + await self._register_resource_raw( + resource_config, [item], user_agent_type + ) ) - ) + except* OceanAbortException as error: + errors.append(error) entities: list[Entity] = sum(registered_entities_results, []) logger.info( diff --git a/port_ocean/core/integrations/mixins/utils.py b/port_ocean/core/integrations/mixins/utils.py index 5ba240f44e..8329186949 100644 --- a/port_ocean/core/integrations/mixins/utils.py +++ b/port_ocean/core/integrations/mixins/utils.py @@ -37,10 +37,17 @@ async def resync_generator_wrapper( fn: Callable[[str], ASYNC_GENERATOR_RESYNC_TYPE], kind: str ) -> ASYNC_GENERATOR_RESYNC_TYPE: generator = fn(kind) + errors = [] try: while True: - with resync_error_handling(): - result = await anext(generator) - yield validate_result([result])[0] + try: + with resync_error_handling(): + result = await anext(generator) + yield validate_result([result])[0] + except OceanAbortException as error: + errors.append(error) except StopAsyncIteration: - pass + if errors: + raise ExceptionGroup( + "At least one of the resync generator iterations failed", errors + ) diff --git a/port_ocean/port_defaults.py b/port_ocean/port_defaults.py index 6c97496f27..98a3c84ec3 100644 --- a/port_ocean/port_defaults.py +++ b/port_ocean/port_defaults.py @@ -1,11 +1,11 @@ import asyncio import json -from pathlib import Path from typing import Type, Any, TypedDict, Optional import httpx import yaml from loguru import logger +from pathlib import Path from pydantic import BaseModel, Field from starlette import status @@ -213,7 +213,8 @@ def get_port_integration_defaults( if path.stem in allowed_file_names: if path.suffix in YAML_EXTENSIONS: default_jsons[path.stem] = yaml.safe_load(path.read_text()) - default_jsons[path.stem] = json.loads(path.read_text()) + else: + default_jsons[path.stem] = json.loads(path.read_text()) return Defaults( blueprints=default_jsons.get("blueprints", []),