Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Outbox Pattern (GSI-742) #6

Merged
merged 11 commits into from
May 13, 2024
3 changes: 2 additions & 1 deletion .devcontainer/.dev_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,6 @@ notification_event_type: notification
notification_event_topic: notification_events
file_registered_event_topic: internal_file_registry
file_registered_event_type: file_registered
iva_events_topic: ivas
iva_state_changed_event_topic: ivas
iva_state_changed_event_type: iva_state_changed
user_events_topic: users
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ repos:
- id: no-commit-to-branch
args: [--branch, dev, --branch, int, --branch, main]
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.4.3
rev: v0.4.4
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ The service requires the following configuration parameters:
```


- **`user_events_topic`** *(string)*: The name of the topic containing user events. Default: `"users"`.

- **`access_request_events_topic`** *(string)*: Name of the event topic used to consume access request events.


Expand Down Expand Up @@ -151,7 +153,7 @@ The service requires the following configuration parameters:
```


- **`iva_events_topic`** *(string)*: The name of the topic containing IVA events.
- **`iva_state_changed_event_topic`** *(string)*: The name of the topic containing IVA events.


Examples:
Expand Down
12 changes: 9 additions & 3 deletions config_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@
"title": "Notification Event Type",
"type": "string"
},
"user_events_topic": {
"default": "users",
"description": "The name of the topic containing user events.",
"title": "User Events Topic",
"type": "string"
},
"access_request_events_topic": {
"description": "Name of the event topic used to consume access request events",
"examples": [
Expand Down Expand Up @@ -84,12 +90,12 @@
"title": "File Registered Event Type",
"type": "string"
},
"iva_events_topic": {
"iva_state_changed_event_topic": {
"description": "The name of the topic containing IVA events.",
"examples": [
"ivas"
],
"title": "Iva Events Topic",
"title": "Iva State Changed Event Topic",
"type": "string"
},
"iva_state_changed_event_type": {
Expand Down Expand Up @@ -221,7 +227,7 @@
"access_request_denied_event_type",
"file_registered_event_topic",
"file_registered_event_type",
"iva_events_topic",
"iva_state_changed_event_topic",
"iva_state_changed_event_type",
"service_instance_id",
"kafka_servers",
Expand Down
3 changes: 2 additions & 1 deletion example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ db_name: users
file_registered_event_topic: internal_file_registry
file_registered_event_type: file_registered
generate_correlation_id: true
iva_events_topic: ivas
iva_state_changed_event_topic: ivas
iva_state_changed_event_type: iva_state_changed
kafka_security_protocol: PLAINTEXT
kafka_servers:
Expand All @@ -23,3 +23,4 @@ notification_event_topic: notification_events
notification_event_type: notification
service_instance_id: '1'
service_name: nos
user_events_topic: users
418 changes: 209 additions & 209 deletions lock/requirements-dev.txt

Large diffs are not rendered by default.

346 changes: 173 additions & 173 deletions lock/requirements.txt

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion src/nos/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
from hexkit.providers.mongodb import MongoDbConfig
from pydantic import Field

from nos.translators.inbound.event_sub import EventSubTranslatorConfig
from nos.translators.inbound.event_sub import (
EventSubTranslatorConfig,
OutboxSubTranslatorConfig,
)
from nos.translators.outbound.event_pub import NotificationEmitterConfig

SERVICE_NAME: str = "nos"
Expand All @@ -32,6 +35,7 @@ class Config(
LoggingConfig,
KafkaConfig,
EventSubTranslatorConfig,
OutboxSubTranslatorConfig,
NotificationEmitterConfig,
MongoDbConfig,
):
Expand Down
58 changes: 0 additions & 58 deletions src/nos/core/models.py

This file was deleted.

26 changes: 9 additions & 17 deletions src/nos/core/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,6 @@

log = logging.getLogger(__name__)

__all__ = [
"Notification",
"ACCESS_REQUEST_CREATED_TO_USER",
"ACCESS_REQUEST_CREATED_TO_DS",
"ACCESS_REQUEST_ALLOWED_TO_USER",
"ACCESS_REQUEST_ALLOWED_TO_DS",
"ACCESS_REQUEST_DENIED_TO_USER",
"ACCESS_REQUEST_DENIED_TO_DS",
"FILE_REGISTERED_TO_DS",
"ALL_IVAS_INVALIDATED_TO_USER",
"IVA_CODE_REQUESTED_TO_USER",
"IVA_CODE_REQUESTED_TO_DS",
"IVA_CODE_TRANSMITTED_TO_USER",
"IVA_CODE_SUBMITTED_TO_DS",
"IVA_UNVERIFIED_TO_DS",
]


class NotificationError(RuntimeError):
"""Raised for notification-related errors."""
Expand Down Expand Up @@ -187,3 +170,12 @@ def formatted(self, **kwargs) -> "Notification":
The specified contact email address is: {email}.
""",
)

USER_REREGISTERED_TO_USER = Notification(
"Account Details Changed",
"""
Your account details were recently updated. The changed details include: {changed_details}.

If you did not make these changes or have questions, please contact the GHGA immediately at {support_email}.
""",
)
69 changes: 61 additions & 8 deletions src/nos/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

import logging
from collections.abc import Callable
from contextlib import suppress
from functools import partial

from ghga_event_schemas import pydantic_ as event_schemas

from nos.config import Config
from nos.core import notifications
from nos.core.models import User
from nos.ports.inbound.orchestrator import OrchestratorPort
from nos.ports.outbound.dao import ResourceNotFoundError, UserDaoPort
from nos.ports.outbound.notification_emitter import NotificationEmitterPort
Expand Down Expand Up @@ -79,7 +79,9 @@ async def process_access_request_notification(

await method_map[event_type](user=user, dataset_id=dataset_id)

async def _access_request_created(self, *, user: User, dataset_id: str):
async def _access_request_created(
self, *, user: event_schemas.User, dataset_id: str
):
"""Processes an Access Request Created event.

One notification is sent to the data requester to confirm that their request
Expand Down Expand Up @@ -112,7 +114,9 @@ async def _access_request_created(self, *, user: User, dataset_id: str):
)
log.info("Sent Access Request Created notification to data steward")

async def _access_request_allowed(self, *, user: User, dataset_id: str):
async def _access_request_allowed(
self, *, user: event_schemas.User, dataset_id: str
):
"""Process an Access Request Allowed event.

One notification is sent to the data requester to inform them that the request
Expand Down Expand Up @@ -146,7 +150,9 @@ async def _access_request_allowed(self, *, user: User, dataset_id: str):
)
log.info("Sent Access Request Allowed notification to data steward")

async def _access_request_denied(self, *, user: User, dataset_id: str):
async def _access_request_denied(
self, *, user: event_schemas.User, dataset_id: str
):
"""Process an Access Request Denied event.

One notification is sent to the data requester telling them that the request
Expand Down Expand Up @@ -189,7 +195,7 @@ async def process_file_registered_notification(self, *, file_id: str):
)
log.info("Sent File Upload Completed notification to data steward")

async def _iva_code_requested(self, *, user: User):
async def _iva_code_requested(self, *, user: event_schemas.User):
"""Send notifications relaying that an IVA code has been requested.

One notification is sent to the user to confirm that their request was received.
Expand All @@ -212,15 +218,15 @@ async def _iva_code_requested(self, *, user: User):
),
)

async def _iva_code_transmitted(self, *, user: User):
async def _iva_code_transmitted(self, *, user: event_schemas.User):
"""Send a notification that an IVA code has been transmitted to the user."""
await self._notification_emitter.notify(
email=user.email,
full_name=user.name,
notification=notifications.IVA_CODE_TRANSMITTED_TO_USER,
)

async def _iva_code_validated(self, *, user: User):
async def _iva_code_validated(self, *, user: event_schemas.User):
"""Send a notification to the data steward that an IVA code has been validated."""
await self._notification_emitter.notify(
email=self._config.central_data_stewardship_email,
Expand All @@ -234,7 +240,7 @@ async def _iva_unverified(
self,
*,
iva_type: str,
user: User,
user: event_schemas.User,
):
"""Send notifications for IVAs set to 'unverified'.

Expand Down Expand Up @@ -320,3 +326,50 @@ async def process_iva_state_change(self, *, user_iva: event_schemas.UserIvaState
raise error from err

await method_map[user_iva.state][0](user=user)

def _changed_info(
self, existing_user: event_schemas.User, new_user: event_schemas.User
) -> str:
"""Check what critical user information has changed (if any).

Critical information includes the user's email address and name.
"""
changed = []
for field in ["email", "name"]:
if getattr(existing_user, field) != getattr(new_user, field):
changed.append(field)
return " and ".join(changed)

async def upsert_user_data(
self, resource_id: str, update: event_schemas.User
) -> None:
"""Upsert the user data.

This method will also examine the user data and send out notifications for
user re-registration.
"""
with suppress(ResourceNotFoundError):
existing_user = await self._user_dao.get_by_id(resource_id)
if changed_details := self._changed_info(existing_user, update):
await self._notification_emitter.notify(
email=existing_user.email,
full_name=update.name,
notification=notifications.USER_REREGISTERED_TO_USER.formatted(
support_email=self._config.central_data_stewardship_email,
changed_details=changed_details,
),
)
log.info("Sent User Re-registered notification to user")
await self._user_dao.upsert(dto=update)

async def delete_user_data(self, resource_id: str) -> None:
"""Delete the user data.

In the case that the user ID does not exist in the database, this method will
log the fact but not raise an error.
"""
try:
await self._user_dao.delete(resource_id)
except ResourceNotFoundError:
# do not raise an error if the user is not found, just log it.
log.warning("User not found for deletion", extra={"user_id": resource_id})
35 changes: 33 additions & 2 deletions src/nos/inject.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@
from contextlib import asynccontextmanager

from ghga_service_commons.utils.context import asyncnullcontext
from hexkit.providers.akafka import KafkaEventPublisher, KafkaEventSubscriber
from hexkit.providers.akafka import (
KafkaEventPublisher,
KafkaEventSubscriber,
KafkaOutboxSubscriber,
)
from hexkit.providers.mongodb import MongoDbDaoFactory

from nos.config import Config
from nos.core.orchestrator import Orchestrator
from nos.ports.inbound.orchestrator import OrchestratorPort
from nos.translators.inbound.event_sub import EventSubTranslator
from nos.translators.inbound.event_sub import (
EventSubTranslator,
OutboxSubTranslator,
)
from nos.translators.outbound.dao import user_dao_factory
from nos.translators.outbound.event_pub import NotificationEmitter

Expand Down Expand Up @@ -87,3 +94,27 @@ async def prepare_event_subscriber(
config=config, translator=event_sub_translator
) as event_subscriber:
yield event_subscriber


@asynccontextmanager
async def prepare_outbox_subscriber(
*,
config: Config,
core_override: OrchestratorPort | None = None,
) -> AsyncGenerator[KafkaOutboxSubscriber, None]:
"""Construct and initialize an outbox subscriber with all its dependencies.
By default, the core dependencies are automatically prepared but you can also
provide them using the override parameter.
"""
async with prepare_core_with_override(
config=config, core_override=core_override
) as orchestrator:
outbox_sub_translator = OutboxSubTranslator(
orchestrator=orchestrator,
config=config,
)

async with KafkaOutboxSubscriber.construct(
config=config, translators=[outbox_sub_translator]
) as outbox_subscriber:
yield outbox_subscriber
Loading