Skip to content

Commit

Permalink
[Framework] Add ingestion of kind examples to port (#640)
Browse files Browse the repository at this point in the history
# Description

What - Add ingestion of kind examples to port
Why - Improve building mapping experience
How - Send one example that passed the selector, from each batch, to
port

## Type of change

Please leave one option from the following and delete the rest:

- [ ] Bug fix (non-breaking change which fixes an issue)
- [x] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.
  • Loading branch information
talsabagport authored May 26, 2024
1 parent 3053ddb commit 20307b9
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 25 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.5.21 (2024-05-26)

### Features

- Added `send_raw_data_examples` integration config to allow sending raw data examples from the third party API to port (on resync), for testing and managing the integration mapping


## 0.5.20 (2024-05-26)


Expand Down
16 changes: 16 additions & 0 deletions port_ocean/clients/port/mixins/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_status_code
from port_ocean.log.sensetive import sensitive_log_filter

if TYPE_CHECKING:
from port_ocean.core.handlers.port_app_config.models import PortAppConfig
Expand Down Expand Up @@ -137,3 +138,18 @@ async def ingest_integration_logs(self, logs: list[dict[str, Any]]) -> None:
)
handle_status_code(response)
logger.debug("Logs successfully ingested")

async def ingest_integration_kind_examples(
self, kind: str, data: list[dict[str, Any]], should_log: bool = True
):
logger.debug(f"Ingesting examples for kind: {kind}")
headers = await self.auth.headers()
response = await self.client.post(
f"{self.auth.api_url}/integration/{self.integration_identifier}/kinds/{kind}/examples",
headers=headers,
json={
"examples": sensitive_log_filter.mask_object(data, full_hide=True),
},
)
handle_status_code(response, should_log=should_log)
logger.debug(f"Examples for kind {kind} successfully ingested")
1 change: 1 addition & 0 deletions port_ocean/config/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class IntegrationConfiguration(BaseOceanSettings, extra=Extra.allow):
initialize_port_resources: bool = True
scheduled_resync_interval: int | None = None
client_timeout: int = 30
send_raw_data_examples: bool = True
port: PortSettings
event_listener: EventListenerSettingsType
integration: IntegrationSettings
7 changes: 6 additions & 1 deletion port_ocean/core/handlers/entity_processor/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ async def _parse_items(
mapping: ResourceConfig,
raw_data: list[RAW_ITEM],
parse_all: bool = False,
send_raw_data_examples_amount: int = 0,
) -> EntitySelectorDiff:
pass

Expand All @@ -48,16 +49,20 @@ async def parse_items(
mapping: ResourceConfig,
raw_data: list[RAW_ITEM],
parse_all: bool = False,
send_raw_data_examples_amount: int = 0,
) -> EntitySelectorDiff:
"""Public method to parse raw entity data and map it to an EntityDiff.
Args:
mapping (ResourceConfig): The configuration for entity mapping.
raw_data (list[RawEntity]): The raw data to be parsed.
parse_all (bool): Whether to parse all data or just data that passed the selector.
send_raw_data_examples_amount (bool): Whether to send example data to the integration service.
Returns:
EntityDiff: The parsed entity differences.
"""
with logger.contextualize(kind=mapping.kind):
return await self._parse_items(mapping, raw_data, parse_all)
return await self._parse_items(
mapping, raw_data, parse_all, send_raw_data_examples_amount
)
62 changes: 51 additions & 11 deletions port_ocean/core/handlers/entity_processor/jq_entity_processor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import asyncio
import functools
from dataclasses import dataclass, field
from functools import lru_cache
from typing import Any
from loguru import logger
from typing import Any, Optional

import pyjq as jq # type: ignore
from loguru import logger

from port_ocean.context.ocean import ocean
from port_ocean.core.handlers.entity_processor.base import BaseEntityProcessor
from port_ocean.core.handlers.port_app_config.models import ResourceConfig
from port_ocean.core.models import Entity
Expand All @@ -17,6 +19,18 @@
from port_ocean.utils.queue_utils import process_in_queue


@dataclass
class MappedEntity:
"""Represents the entity after applying the mapping
This class holds the mapping entity along with the selector boolean value and optionally the raw data.
"""

entity: dict[str, Any] = field(default_factory=dict)
did_entity_pass_selector: bool = False
raw_data: Optional[dict[str, Any]] = None


class JQEntityProcessor(BaseEntityProcessor):
"""Processes and parses entities using JQ expressions.
Expand Down Expand Up @@ -78,13 +92,17 @@ async def _get_mapped_entity(
raw_entity_mappings: dict[str, Any],
selector_query: str,
parse_all: bool = False,
) -> tuple[dict[str, Any], bool]:
) -> MappedEntity:
should_run = await self._search_as_bool(data, selector_query)
if parse_all or should_run:
mapped_entity = await self._search_as_object(data, raw_entity_mappings)
return mapped_entity, should_run
return MappedEntity(
mapped_entity,
did_entity_pass_selector=should_run,
raw_data=data if should_run else None,
)

return {}, False
return MappedEntity()

async def _calculate_entity(
self,
Expand All @@ -93,7 +111,7 @@ async def _calculate_entity(
items_to_parse: str | None,
selector_query: str,
parse_all: bool = False,
) -> list[tuple[dict[str, Any], bool]]:
) -> list[MappedEntity]:
if items_to_parse:
items = await self._search(data, items_to_parse)
if isinstance(items, list):
Expand All @@ -118,13 +136,27 @@ async def _calculate_entity(
data, raw_entity_mappings, selector_query, parse_all
)
]
return [({}, False)]
return [MappedEntity()]

@staticmethod
async def _send_examples(data: list[dict[str, Any]], kind: str) -> None:
try:
if data:
await ocean.port_client.ingest_integration_kind_examples(
kind, data, should_log=False
)
except Exception as ex:
logger.warning(
f"Failed to send raw data example {ex}",
exc_info=True,
)

async def _parse_items(
self,
mapping: ResourceConfig,
raw_results: list[RAW_ITEM],
parse_all: bool = False,
send_raw_data_examples_amount: int = 0,
) -> EntitySelectorDiff:
raw_entity_mappings: dict[str, Any] = mapping.port.entity.mappings.dict(
exclude_unset=True
Expand All @@ -141,13 +173,21 @@ async def _parse_items(

passed_entities = []
failed_entities = []
examples_to_send: list[dict[str, Any]] = []
for entities_results in calculated_entities_results:
for entity, did_entity_pass_selector in entities_results:
if entity.get("identifier") and entity.get("blueprint"):
parsed_entity = Entity.parse_obj(entity)
if did_entity_pass_selector:
for result in entities_results:
if result.entity.get("identifier") and result.entity.get("blueprint"):
parsed_entity = Entity.parse_obj(result.entity)
if result.did_entity_pass_selector:
passed_entities.append(parsed_entity)
if (
len(examples_to_send) < send_raw_data_examples_amount
and result.raw_data is not None
):
examples_to_send.append(result.raw_data)
else:
failed_entities.append(parsed_entity)

await self._send_examples(examples_to_send, mapping.kind)

return EntitySelectorDiff(passed=passed_entities, failed=failed_entities)
30 changes: 26 additions & 4 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@
from port_ocean.exceptions.core import OceanAbortException


SEND_RAW_DATA_EXAMPLES_AMOUNT = 5


class SyncRawMixin(HandlerMixin, EventsMixin):
"""Mixin class for synchronization of raw constructed entities.
Expand Down Expand Up @@ -124,10 +127,13 @@ async def _calculate_raw(
self,
raw_diff: list[tuple[ResourceConfig, list[RAW_ITEM]]],
parse_all: bool = False,
send_raw_data_examples_amount: int = 0,
) -> list[EntitySelectorDiff]:
return await asyncio.gather(
*(
self.entity_processor.parse_items(mapping, results, parse_all)
self.entity_processor.parse_items(
mapping, results, parse_all, send_raw_data_examples_amount
)
for mapping, results in raw_diff
)
)
Expand All @@ -138,8 +144,11 @@ async def _register_resource_raw(
results: list[dict[Any, Any]],
user_agent_type: UserAgentType,
parse_all: bool = False,
send_raw_data_examples_amount: int = 0,
) -> EntitySelectorDiff:
objects_diff = await self._calculate_raw([(resource, results)], parse_all)
objects_diff = await self._calculate_raw(
[(resource, results)], parse_all, send_raw_data_examples_amount
)
await self.entities_state_applier.upsert(
objects_diff[0].passed, user_agent_type
)
Expand Down Expand Up @@ -171,19 +180,32 @@ async def _register_in_batches(
else:
async_generators.append(result)

send_raw_data_examples_amount = (
SEND_RAW_DATA_EXAMPLES_AMOUNT if ocean.config.send_raw_data_examples else 0
)
entities = (
await self._register_resource_raw(
resource_config, raw_results, user_agent_type
resource_config,
raw_results,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount,
)
).passed

for generator in async_generators:
try:
async for items in generator:
if send_raw_data_examples_amount > 0:
send_raw_data_examples_amount = max(
0, send_raw_data_examples_amount - len(entities)
)
entities.extend(
(
await self._register_resource_raw(
resource_config, items, user_agent_type
resource_config,
items,
user_agent_type,
send_raw_data_examples_amount=send_raw_data_examples_amount,
)
).passed
)
Expand Down
33 changes: 24 additions & 9 deletions port_ocean/log/sensetive.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import re
from typing import Callable, TYPE_CHECKING
from typing import Any, Callable, TYPE_CHECKING

if TYPE_CHECKING:
from loguru import Record
Expand Down Expand Up @@ -35,16 +35,31 @@ def hide_sensitive_strings(self, *tokens: str) -> None:
[re.compile(re.escape(token.strip())) for token in tokens if token.strip()]
)

def mask_string(self, string: str, full_hide: bool = False) -> str:
masked_string = string
for pattern in self.compiled_patterns:
replace: Callable[[re.Match[str]], str] | str = (
"[REDACTED]"
if full_hide
else lambda match: match.group()[:6] + "[REDACTED]"
)
masked_string = pattern.sub(replace, masked_string)
return masked_string

def mask_object(self, obj: Any, full_hide: bool = False) -> Any:
if isinstance(obj, str):
return self.mask_string(obj, full_hide)
if isinstance(obj, list):
return [self.mask_object(o, full_hide) for o in obj]
if isinstance(obj, dict):
for k, v in obj.items():
obj[k] = self.mask_object(v, full_hide)

return obj

def create_filter(self, full_hide: bool = False) -> Callable[["Record"], bool]:
def _filter(record: "Record") -> bool:
for pattern in self.compiled_patterns:
replace: Callable[[re.Match[str]], str] | str = (
"[REDACTED]"
if full_hide
else lambda match: match.group()[:6] + "[REDACTED]"
)
record["message"] = pattern.sub(replace, record["message"])

record["message"] = self.mask_string(record["message"], full_hide)
return True

return _filter
Expand Down

0 comments on commit 20307b9

Please sign in to comment.