Skip to content

Commit

Permalink
Pack down redundant code in the orchestrator
Browse files Browse the repository at this point in the history
  • Loading branch information
TheByronHimes committed Mar 8, 2024
1 parent cb405c8 commit 1ba97d2
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 101 deletions.
92 changes: 40 additions & 52 deletions src/nos/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@

import logging

from nos.config import Config
from nos.core import notifications
from nos.core.models import User
from nos.ports.inbound.orchestrator import OrchestratorPort
from nos.ports.outbound.dao import ResourceNotFoundError, UserDaoPort
from nos.ports.outbound.notification_emitter import NotificationEmitterPort
Expand All @@ -34,27 +36,28 @@ class Orchestrator(OrchestratorPort):
def __init__(
self,
*,
config,
config: Config,
user_dao: UserDaoPort,
notification_emitter: NotificationEmitterPort,
):
self._config = config
self._user_dao = user_dao
self._notification_emitter = notification_emitter

async def process_access_request_created(self, *, user_id: str, dataset_id: str):
"""Processes an Access Request Created event.
One notification is sent to the data requester to confirm that their request
was created.
Another notification is sent to the data steward to inform them of the request.
async def process_access_request_notification(
self, *, event_type: str, user_id: str, dataset_id: str
):
"""Handle notifications for access requests.
Raises:
- MissingUserError: When the provided user ID does not exist in the DB.
- NotificationInterpolationError: When there is a problem with the values
used to perform the notification text interpolation.
- MissingUserError:
When the provided user ID does not exist in the DB.
"""
method_map = {
self._config.access_request_created_type: self._access_request_created,
self._config.access_request_allowed_type: self._access_request_allowed,
self._config.access_request_denied_type: self._access_request_denied,
}
extra = { # for error logging
"user_id": user_id,
"dataset_id": dataset_id,
Expand All @@ -70,6 +73,21 @@ async def process_access_request_created(self, *, user_id: str, dataset_id: str)
log.error(error, extra=extra)
raise error from err

await method_map[event_type](user=user, dataset_id=dataset_id)

async def _access_request_created(self, *, user: User, dataset_id: str):
"""Processes an Access Request Created event.
One notification is sent to the data requester to confirm that their request
was created.
Another notification is sent to the data steward to inform them of the request.
Raises:
- NotificationInterpolationError:
When there is a problem with the values used to perform the notification
text interpolation.
"""
# Send a confirmation email notification to the Data Requester
await self._notification_emitter.notify(
email=user.email,
Expand All @@ -82,15 +100,15 @@ async def process_access_request_created(self, *, user_id: str, dataset_id: str)

# Send a notification to the data steward
await self._notification_emitter.notify(
email=self._config.central_data_steward_email,
email=self._config.central_data_stewardship_email,
full_name=DATA_STEWARD_NAME,
notification=notifications.ACCESS_REQUEST_CREATED_TO_DS.formatted(
full_user_name=user.name, email=user.email, dataset_id=dataset_id
),
)
log.info("Sent Access Request Created notification to data steward")

async def process_access_request_allowed(self, *, user_id: str, dataset_id: str):
async def _access_request_allowed(self, *, user: User, dataset_id: str):
"""Process an Access Request Allowed event.
One notification is sent to the data requester to inform them that the request
Expand All @@ -100,25 +118,10 @@ async def process_access_request_allowed(self, *, user_id: str, dataset_id: str)
was allowed.
Raises:
- MissingUserError: When the provided user ID does not exist in the DB.
- NotificationInterpolationError: When there is a problem with the values
used to perform the notification text interpolation.
- NotificationInterpolationError:
When there is a problem with the values used to perform the notification
text interpolation.
"""
extra = { # for error logging
"user_id": user_id,
"dataset_id": dataset_id,
"notification_name": "Access Request Allowed",
}

try:
user = await self._user_dao.get_by_id(user_id)
except ResourceNotFoundError as err:
error = self.MissingUserError(
user_id=user_id, notification_name=extra["notification_name"]
)
log.error(error, extra=extra)
raise error from err

# Send a notification to the data requester
await self._notification_emitter.notify(
email=user.email,
Expand All @@ -131,15 +134,15 @@ async def process_access_request_allowed(self, *, user_id: str, dataset_id: str)

# Send a confirmation email to the data steward
await self._notification_emitter.notify(
email=self._config.central_data_steward_email,
email=self._config.central_data_stewardship_email,
full_name=DATA_STEWARD_NAME,
notification=notifications.ACCESS_REQUEST_ALLOWED_TO_DS.formatted(
full_user_name=user.name, dataset_id=dataset_id
),
)
log.info("Sent Access Request Allowed notification to data steward")

async def process_access_request_denied(self, *, user_id: str, dataset_id: str):
async def _access_request_denied(self, *, user: User, dataset_id: str):
"""Process an Access Request Denied event.
One notification is sent to the data requester telling them that the request
Expand All @@ -148,25 +151,10 @@ async def process_access_request_denied(self, *, user_id: str, dataset_id: str):
Another confirmation notification is sent to the data steward.
Raises:
- MissingUserError: When the provided user ID does not exist in the DB.
- NotificationInterpolationError: When there is a problem with the values
used to perform the notification text interpolation.
- NotificationInterpolationError:
When there is a problem with the values used to perform the notification
text interpolation.
"""
extra = { # for error logging
"user_id": user_id,
"dataset_id": dataset_id,
"notification_name": "Access Request Denied",
}

try:
user = await self._user_dao.get_by_id(user_id)
except ResourceNotFoundError as err:
error = self.MissingUserError(
user_id=user_id, notification_name=extra["notification_name"]
)
log.error(error, extra=extra)
raise error from err

# Send a notification to the data requester
await self._notification_emitter.notify(
email=user.email,
Expand All @@ -179,7 +167,7 @@ async def process_access_request_denied(self, *, user_id: str, dataset_id: str):

# Send a confirmation email to the data steward
await self._notification_emitter.notify(
email=self._config.central_data_steward_email,
email=self._config.central_data_stewardship_email,
full_name=DATA_STEWARD_NAME,
notification=notifications.ACCESS_REQUEST_DENIED_TO_DS.formatted(
full_user_name=user.name, dataset_id=dataset_id
Expand Down
47 changes: 6 additions & 41 deletions src/nos/ports/inbound/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,47 +36,12 @@ def __init__(self, *, user_id: str, notification_name: str) -> None:
super().__init__(message)

@abstractmethod
async def process_access_request_created(self, *, user_id: str, dataset_id: str):
"""Processes an Access Request Created event.
One notification is sent to the data requester to confirm that their request
was created.
Another notification is sent to the data steward to inform them of the request.
Raises:
- MissingUserError: When the provided user ID does not exist in the DB.
- NotificationInterpolationError: When there is a problem with the values
used to perform the notification text interpolation.
"""

@abstractmethod
async def process_access_request_allowed(self, *, user_id: str, dataset_id: str):
"""Process an Access Request Allowed event.
One notification is sent to the data requester to inform them that the request
has been approved/allowed.
Another notification is sent to the data steward confirming that the request
was allowed.
Raises:
- MissingUserError: When the provided user ID does not exist in the DB.
- NotificationInterpolationError: When there is a problem with the values
used to perform the notification text interpolation.
"""

@abstractmethod
async def process_access_request_denied(self, *, user_id: str, dataset_id: str):
"""Process an Access Request Denied event.
One notification is sent to the data requester telling them that the request
was denied.
Another confirmation notification is sent to the data steward.
async def process_access_request_notification(
self, *, event_type: str, user_id: str, dataset_id: str
):
"""Handle notifications for access requests.
Raises:
- MissingUserError: When the provided user ID does not exist in the DB.
- NotificationInterpolationError: When there is a problem with the values
used to perform the notification text interpolation.
- MissingUserError:
When the provided user ID does not exist in the DB.
"""
17 changes: 9 additions & 8 deletions src/nos/translators/inbound/event_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,23 @@ def __init__(
]
self._config = config
self._orchestrator = orchestrator
self._access_request_map = {
config.access_request_created_type: self._orchestrator.process_access_request_created,
config.access_request_allowed_type: self._orchestrator.process_access_request_allowed,
config.access_request_denied_type: self._orchestrator.process_access_request_denied,
}

async def _handle_access_request(self, type_: str, payload: JsonObject) -> None:
"""Send notifications for an access request-related event."""
validated_payload = get_validated_payload(payload, AccessRequestDetails)
await self._access_request_map[type_](
user_id=validated_payload.user_id, dataset_id=validated_payload.dataset_id
await self._orchestrator.process_access_request_notification(
event_type=type_,
user_id=validated_payload.user_id,
dataset_id=validated_payload.dataset_id,
)

async def _consume_validated(
self, *, payload: JsonObject, type_: Ascii, topic: Ascii
) -> None:
"""Consumes an event"""
if type_ in self._access_request_map:
if type_ in (
self._config.access_request_created_type,
self._config.access_request_allowed_type,
self._config.access_request_denied_type,
):
await self._handle_access_request(type_, payload)

0 comments on commit 1ba97d2

Please sign in to comment.