Skip to content

Commit

Permalink
PORT-4295 Add update_diff to the ocean context for gitops (#17)
Browse files Browse the repository at this point in the history
* Add update_diff
  • Loading branch information
yairsimantov20 authored Jul 20, 2023
1 parent 217c514 commit 62b5d47
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 12 deletions.
8 changes: 8 additions & 0 deletions port_ocean/context/ocean.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
RESYNC_EVENT_LISTENER,
START_EVENT_LISTENER,
RawEntityDiff,
EntityDiff,
)
from port_ocean.exceptions.context import PortOceanContextNotFoundError

Expand Down Expand Up @@ -67,6 +68,13 @@ async def update_raw_diff(
) -> None:
await self.integration.update_raw_diff(kind, raw_diff, user_agent_type)

async def update_diff(
self,
diff: EntityDiff,
user_agent_type: UserAgentType = UserAgentType.exporter,
) -> None:
await self.integration.update_diff(diff, user_agent_type)

async def register_raw(
self,
kind: str,
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/event_listener/kafka/event_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def _handle_message(self, message: dict[Any, Any], topic: str) -> None:
if not self.should_be_processed(message, topic):
return

if "change.log" in topic:
if "change.log" in topic and message is not None:
await self.events["on_resync"](message)

def wrapped_start(
Expand Down
24 changes: 19 additions & 5 deletions port_ocean/core/integrations/mixins/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,23 @@
ASYNC_GENERATOR_RESYNC_TYPE,
)
from port_ocean.core.utils import zip_and_sum
from port_ocean.exceptions.core import OceanAbortException


class SyncMixin(HandlerMixin):
def __init__(self) -> None:
HandlerMixin.__init__(self)

async def update_diff(
self,
desired_state: EntityDiff,
user_agent_type: UserAgentType,
) -> None:
await self.entities_state_applier.apply_diff(
{"before": desired_state["before"], "after": desired_state["after"]},
user_agent_type,
)

async def register(
self,
entities: list[Entity],
Expand Down Expand Up @@ -201,12 +212,15 @@ async def _register_in_batches(
)
)
for generator in async_generators:
async for item in generator:
registered_entities_results.append(
await self._register_resource_raw(
resource_config, [item], user_agent_type
try:
async for item in generator:
registered_entities_results.append(
await self._register_resource_raw(
resource_config, [item], user_agent_type
)
)
)
except* OceanAbortException as error:
errors.append(error)

entities: list[Entity] = sum(registered_entities_results, [])
logger.info(
Expand Down
15 changes: 11 additions & 4 deletions port_ocean/core/integrations/mixins/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,17 @@ async def resync_generator_wrapper(
fn: Callable[[str], ASYNC_GENERATOR_RESYNC_TYPE], kind: str
) -> ASYNC_GENERATOR_RESYNC_TYPE:
generator = fn(kind)
errors = []
try:
while True:
with resync_error_handling():
result = await anext(generator)
yield validate_result([result])[0]
try:
with resync_error_handling():
result = await anext(generator)
yield validate_result([result])[0]
except OceanAbortException as error:
errors.append(error)
except StopAsyncIteration:
pass
if errors:
raise ExceptionGroup(
"At least one of the resync generator iterations failed", errors
)
5 changes: 3 additions & 2 deletions port_ocean/port_defaults.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import asyncio
import json
from pathlib import Path
from typing import Type, Any, TypedDict, Optional

import httpx
import yaml
from loguru import logger
from pathlib import Path
from pydantic import BaseModel, Field
from starlette import status

Expand Down Expand Up @@ -213,7 +213,8 @@ def get_port_integration_defaults(
if path.stem in allowed_file_names:
if path.suffix in YAML_EXTENSIONS:
default_jsons[path.stem] = yaml.safe_load(path.read_text())
default_jsons[path.stem] = json.loads(path.read_text())
else:
default_jsons[path.stem] = json.loads(path.read_text())

return Defaults(
blueprints=default_jsons.get("blueprints", []),
Expand Down

0 comments on commit 62b5d47

Please sign in to comment.