Skip to content

Commit

Permalink
upserts no entities save
Browse files Browse the repository at this point in the history
  • Loading branch information
yairsimantov20 committed Oct 10, 2024
1 parent c5ec0c6 commit de83870
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 74 deletions.
79 changes: 40 additions & 39 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,33 +56,33 @@ async def upsert_entity(
f"entity: {entity.identifier} of "
f"blueprint: {entity.blueprint}"
)
handle_status_code(response, should_raise)
result = response.json()

result_entity = (
Entity.parse_obj(result["entity"]) if result.get("entity") else entity
)

# Happens when upsert fails and search identifier is defined.
# We return None to ignore the entity later in the delete process
if result_entity.is_using_search_identifier:
return None

# In order to save memory we'll keep only the identifier, blueprint and relations of the
# upserted entity result for later calculations
reduced_entity = Entity(
identifier=result_entity.identifier, blueprint=result_entity.blueprint
)

# Turning dict typed relations (raw search relations) is required
# for us to be able to successfully calculate the participation related entities
# and ignore the ones that don't as they weren't upserted
reduced_entity.relations = {
key: None if isinstance(relation, dict) else relation
for key, relation in result_entity.relations.items()
}

return reduced_entity
# handle_status_code(response, should_raise)
# result = response.json()
#
# result_entity = (
# Entity.parse_obj(result["entity"]) if result.get("entity") else entity
# )
#
# # Happens when upsert fails and search identifier is defined.
# # We return None to ignore the entity later in the delete process
# if result_entity.is_using_search_identifier:
# return None
#
# # In order to save memory we'll keep only the identifier, blueprint and relations of the
# # upserted entity result for later calculations
# reduced_entity = Entity(
# identifier=result_entity.identifier, blueprint=result_entity.blueprint
# )
#
# # Turning dict typed relations (raw search relations) is required
# # for us to be able to successfully calculate the participation related entities
# # and ignore the ones that don't as they weren't upserted
# reduced_entity.relations = {
# key: None if isinstance(relation, dict) else relation
# for key, relation in result_entity.relations.items()
# }

return None

async def batch_upsert_entities(
self,
Expand All @@ -91,7 +91,7 @@ async def batch_upsert_entities(
user_agent_type: UserAgentType | None = None,
should_raise: bool = True,
) -> list[Entity]:
modified_entities_results = await asyncio.gather(
await asyncio.gather(
*(
self.upsert_entity(
entity,
Expand All @@ -103,17 +103,18 @@ async def batch_upsert_entities(
),
return_exceptions=True,
)
entity_results = [
entity for entity in modified_entities_results if isinstance(entity, Entity)
]
if not should_raise:
return entity_results

for entity_result in modified_entities_results:
if isinstance(entity_result, Exception):
raise entity_result

return entity_results
return []
# entity_results = [
# entity for entity in modified_entities_results if isinstance(entity, Entity)
# ]
# if not should_raise:
# return entity_results
#
# for entity_result in modified_entities_results:
# if isinstance(entity_result, Exception):
# raise entity_result
#
# return entity_results

async def delete_entity(
self,
Expand Down
63 changes: 32 additions & 31 deletions port_ocean/core/handlers/entities_state_applier/port/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,37 +97,38 @@ async def upsert(
self, entities: list[Entity], user_agent_type: UserAgentType
) -> list[Entity]:
logger.info(f"Upserting {len(entities)} entities")
modified_entities: list[Entity] = []
if event.port_app_config.create_missing_related_entities:
modified_entities = await self.context.port_client.batch_upsert_entities(
entities,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
)
else:
entities_with_search_identifier: list[Entity] = []
entities_without_search_identifier: list[Entity] = []
for entity in entities:
if entity.is_using_search_identifier:
entities_with_search_identifier.append(entity)
else:
entities_without_search_identifier.append(entity)

ordered_created_entities = reversed(
entities_with_search_identifier
+ order_by_entities_dependencies(entities_without_search_identifier)
)
for entity in ordered_created_entities:
upsertedEntity = await self.context.port_client.upsert_entity(
entity,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
)
if upsertedEntity:
modified_entities.append(upsertedEntity)
return modified_entities
# modified_entities: list[Entity] = []
# if event.port_app_config.create_missing_related_entities:
await self.context.port_client.batch_upsert_entities(
entities,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
)
# else:
# entities_with_search_identifier: list[Entity] = []
# entities_without_search_identifier: list[Entity] = []
# for entity in entities:
# if entity.is_using_search_identifier:
# entities_with_search_identifier.append(entity)
# else:
# entities_without_search_identifier.append(entity)
#
# ordered_created_entities = reversed(
# entities_with_search_identifier
# + order_by_entities_dependencies(entities_without_search_identifier)
# )
# for entity in ordered_created_entities:
# upsertedEntity = await self.context.port_client.upsert_entity(
# entity,
# event.port_app_config.get_port_request_options(),
# user_agent_type,
# should_raise=False,
# )
# if upsertedEntity:
# modified_entities.append(upsertedEntity)
# return modified_entities
return []

async def delete(
self, entities: list[Entity], user_agent_type: UserAgentType
Expand Down
6 changes: 3 additions & 3 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ async def _register_resource_raw(
objects_diff = await self._calculate_raw(
[(resource, results)], parse_all, send_raw_data_examples_amount
)
# modified_objects = await self.entities_state_applier.upsert(
# objects_diff[0].entity_selector_diff.passed, user_agent_type
# )
await self.entities_state_applier.upsert(
objects_diff[0].entity_selector_diff.passed, user_agent_type
)
return CalculationResult(
objects_diff[0].entity_selector_diff._replace(passed=objects_diff[0].entity_selector_diff.passed),
errors=objects_diff[0].errors,
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "port-ocean"
version = "0.12.2-dev04"
version = "0.12.2-dev05"
description = "Port Ocean is a CLI tool for managing your Port projects."
readme = "README.md"
homepage = "https://app.getport.io"
Expand Down

0 comments on commit de83870

Please sign in to comment.