From de838705b93626efcd624e78400d8999d3295fa3 Mon Sep 17 00:00:00 2001 From: yair Date: Thu, 10 Oct 2024 15:46:50 +0300 Subject: [PATCH] upserts no entities save --- port_ocean/clients/port/mixins/entities.py | 79 ++++++++++--------- .../entities_state_applier/port/applier.py | 63 +++++++-------- .../core/integrations/mixins/sync_raw.py | 6 +- pyproject.toml | 2 +- 4 files changed, 76 insertions(+), 74 deletions(-) diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index f8d5f63fa3..6d3659dfcb 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -56,33 +56,33 @@ async def upsert_entity( f"entity: {entity.identifier} of " f"blueprint: {entity.blueprint}" ) - handle_status_code(response, should_raise) - result = response.json() - - result_entity = ( - Entity.parse_obj(result["entity"]) if result.get("entity") else entity - ) - - # Happens when upsert fails and search identifier is defined. - # We return None to ignore the entity later in the delete process - if result_entity.is_using_search_identifier: - return None - - # In order to save memory we'll keep only the identifier, blueprint and relations of the - # upserted entity result for later calculations - reduced_entity = Entity( - identifier=result_entity.identifier, blueprint=result_entity.blueprint - ) - - # Turning dict typed relations (raw search relations) is required - # for us to be able to successfully calculate the participation related entities - # and ignore the ones that don't as they weren't upserted - reduced_entity.relations = { - key: None if isinstance(relation, dict) else relation - for key, relation in result_entity.relations.items() - } - - return reduced_entity + # handle_status_code(response, should_raise) + # result = response.json() + # + # result_entity = ( + # Entity.parse_obj(result["entity"]) if result.get("entity") else entity + # ) + # + # # Happens when upsert fails and search identifier is defined. + # # We return None to ignore the entity later in the delete process + # if result_entity.is_using_search_identifier: + # return None + # + # # In order to save memory we'll keep only the identifier, blueprint and relations of the + # # upserted entity result for later calculations + # reduced_entity = Entity( + # identifier=result_entity.identifier, blueprint=result_entity.blueprint + # ) + # + # # Turning dict typed relations (raw search relations) is required + # # for us to be able to successfully calculate the participation related entities + # # and ignore the ones that don't as they weren't upserted + # reduced_entity.relations = { + # key: None if isinstance(relation, dict) else relation + # for key, relation in result_entity.relations.items() + # } + + return None async def batch_upsert_entities( self, @@ -91,7 +91,7 @@ async def batch_upsert_entities( user_agent_type: UserAgentType | None = None, should_raise: bool = True, ) -> list[Entity]: - modified_entities_results = await asyncio.gather( + await asyncio.gather( *( self.upsert_entity( entity, @@ -103,17 +103,18 @@ async def batch_upsert_entities( ), return_exceptions=True, ) - entity_results = [ - entity for entity in modified_entities_results if isinstance(entity, Entity) - ] - if not should_raise: - return entity_results - - for entity_result in modified_entities_results: - if isinstance(entity_result, Exception): - raise entity_result - - return entity_results + return [] + # entity_results = [ + # entity for entity in modified_entities_results if isinstance(entity, Entity) + # ] + # if not should_raise: + # return entity_results + # + # for entity_result in modified_entities_results: + # if isinstance(entity_result, Exception): + # raise entity_result + # + # return entity_results async def delete_entity( self, diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index c1ab47dd75..1badaf5022 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -97,37 +97,38 @@ async def upsert( self, entities: list[Entity], user_agent_type: UserAgentType ) -> list[Entity]: logger.info(f"Upserting {len(entities)} entities") - modified_entities: list[Entity] = [] - if event.port_app_config.create_missing_related_entities: - modified_entities = await self.context.port_client.batch_upsert_entities( - entities, - event.port_app_config.get_port_request_options(), - user_agent_type, - should_raise=False, - ) - else: - entities_with_search_identifier: list[Entity] = [] - entities_without_search_identifier: list[Entity] = [] - for entity in entities: - if entity.is_using_search_identifier: - entities_with_search_identifier.append(entity) - else: - entities_without_search_identifier.append(entity) - - ordered_created_entities = reversed( - entities_with_search_identifier - + order_by_entities_dependencies(entities_without_search_identifier) - ) - for entity in ordered_created_entities: - upsertedEntity = await self.context.port_client.upsert_entity( - entity, - event.port_app_config.get_port_request_options(), - user_agent_type, - should_raise=False, - ) - if upsertedEntity: - modified_entities.append(upsertedEntity) - return modified_entities + # modified_entities: list[Entity] = [] + # if event.port_app_config.create_missing_related_entities: + await self.context.port_client.batch_upsert_entities( + entities, + event.port_app_config.get_port_request_options(), + user_agent_type, + should_raise=False, + ) + # else: + # entities_with_search_identifier: list[Entity] = [] + # entities_without_search_identifier: list[Entity] = [] + # for entity in entities: + # if entity.is_using_search_identifier: + # entities_with_search_identifier.append(entity) + # else: + # entities_without_search_identifier.append(entity) + # + # ordered_created_entities = reversed( + # entities_with_search_identifier + # + order_by_entities_dependencies(entities_without_search_identifier) + # ) + # for entity in ordered_created_entities: + # upsertedEntity = await self.context.port_client.upsert_entity( + # entity, + # event.port_app_config.get_port_request_options(), + # user_agent_type, + # should_raise=False, + # ) + # if upsertedEntity: + # modified_entities.append(upsertedEntity) + # return modified_entities + return [] async def delete( self, entities: list[Entity], user_agent_type: UserAgentType diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 4402d5b3b0..afbf71c255 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -139,9 +139,9 @@ async def _register_resource_raw( objects_diff = await self._calculate_raw( [(resource, results)], parse_all, send_raw_data_examples_amount ) - # modified_objects = await self.entities_state_applier.upsert( - # objects_diff[0].entity_selector_diff.passed, user_agent_type - # ) + await self.entities_state_applier.upsert( + objects_diff[0].entity_selector_diff.passed, user_agent_type + ) return CalculationResult( objects_diff[0].entity_selector_diff._replace(passed=objects_diff[0].entity_selector_diff.passed), errors=objects_diff[0].errors, diff --git a/pyproject.toml b/pyproject.toml index 881dffa749..60a556be89 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.12.2-dev04" +version = "0.12.2-dev05" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"