diff --git a/services/ucs/src/ucs/adapters/inbound/event_sub.py b/services/ucs/src/ucs/adapters/inbound/event_sub.py
index 24d757d5..4a8f2cb7 100644
--- a/services/ucs/src/ucs/adapters/inbound/event_sub.py
+++ b/services/ucs/src/ucs/adapters/inbound/event_sub.py
@@ -163,7 +163,8 @@ async def _consume_validated(
         *,
         payload: JsonObject,
         type_: Ascii,
-        topic: Ascii,  # pylint: disable=unused-argument
+        topic: Ascii,
+        key: Ascii,
     ) -> None:
         """Consume events from the topics of interest."""
         if type_ == self._config.file_metadata_event_type:
diff --git a/services/ucs/src/ucs/adapters/inbound/fastapi_/rest_models.py b/services/ucs/src/ucs/adapters/inbound/fastapi_/rest_models.py
index 4372d9a0..729b4346 100644
--- a/services/ucs/src/ucs/adapters/inbound/fastapi_/rest_models.py
+++ b/services/ucs/src/ucs/adapters/inbound/fastapi_/rest_models.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 """REST API-specific data models (not used by core package)"""
+
 from pydantic import ConfigDict
 
 try:  # workaround for https://github.com/pydantic/pydantic/issues/5821
diff --git a/services/ucs/src/ucs/ports/inbound/file_service.py b/services/ucs/src/ucs/ports/inbound/file_service.py
index 6f30d032..953c16d6 100644
--- a/services/ucs/src/ucs/ports/inbound/file_service.py
+++ b/services/ucs/src/ucs/ports/inbound/file_service.py
@@ -15,7 +15,6 @@
 
 """Interfaces for the main upload handling logic of this service."""
 
-
 from abc import ABC, abstractmethod
 from collections.abc import Iterable, Sequence
 
diff --git a/services/ucs/tests_ucs/conftest.py b/services/ucs/tests_ucs/conftest.py
new file mode 100644
index 00000000..caf21427
--- /dev/null
+++ b/services/ucs/tests_ucs/conftest.py
@@ -0,0 +1,32 @@
+# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
+# for the German Human Genome-Phenome Archive (GHGA)
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Set up session-scope fixtures for tests.""" + +from hexkit.providers.akafka.testutils import ( # noqa: F401 + kafka_container_fixture, + kafka_fixture, +) +from hexkit.providers.mongodb.testutils import ( # noqa: F401 + mongodb_container_fixture, + mongodb_fixture, +) +from hexkit.providers.s3.testutils import ( # noqa: F401 + s3_container_fixture, + s3_fixture, +) + +from tests_ucs.fixtures.joint import get_joint_fixture + +joint_fixture = get_joint_fixture() diff --git a/services/ucs/tests_ucs/fixtures/config.py b/services/ucs/tests_ucs/fixtures/config.py index 665bb0af..017d9115 100644 --- a/services/ucs/tests_ucs/fixtures/config.py +++ b/services/ucs/tests_ucs/fixtures/config.py @@ -39,7 +39,7 @@ def get_config( for source in sources: sources_dict.update(**source.model_dump()) - return Config(config_yaml=default_config_yaml, **sources_dict) # type: ignore + return Config(config_yaml=default_config_yaml, **sources_dict) DEFAULT_CONFIG = get_config() diff --git a/services/ucs/tests_ucs/fixtures/example_data.py b/services/ucs/tests_ucs/fixtures/example_data.py index ca338c1c..e355797a 100644 --- a/services/ucs/tests_ucs/fixtures/example_data.py +++ b/services/ucs/tests_ucs/fixtures/example_data.py @@ -93,7 +93,7 @@ class UploadDetails: submission_metadata=FILE_TO_REGISTER_1, ) UPLOAD_DETAILS_2 = UploadDetails( - storage_alias=STORAGE_ALIASES[1], + storage_alias=STORAGE_ALIASES[0], file_metadata=EXAMPLE_FILE_2, upload_attempt=EXAMPLE_UPLOAD_2, submission_metadata=FILE_TO_REGISTER_2, diff --git a/services/ucs/tests_ucs/fixtures/joint.py b/services/ucs/tests_ucs/fixtures/joint.py index 70adbac7..d5a26ada 100644 --- a/services/ucs/tests_ucs/fixtures/joint.py +++ b/services/ucs/tests_ucs/fixtures/joint.py @@ -15,14 +15,7 @@ """Join the functionality of all fixtures for API-level integration testing.""" -__all__ = [ - "joint_fixture", - "JointFixture", - "mongodb_fixture", - "kafka_fixture", - "s3_fixture", - "second_s3_fixture", -] +__all__ = ["JointFixture", "get_joint_fixture"] from collections.abc import AsyncGenerator from dataclasses import dataclass @@ -35,9 +28,9 @@ S3ObjectStoragesConfig, ) from hexkit.providers.akafka import KafkaEventSubscriber -from hexkit.providers.akafka.testutils import KafkaFixture, get_kafka_fixture -from hexkit.providers.mongodb.testutils import MongoDbFixture, get_mongodb_fixture -from hexkit.providers.s3.testutils import S3Fixture, get_s3_fixture +from hexkit.providers.akafka.testutils import KafkaFixture +from hexkit.providers.mongodb.testutils import MongoDbFixture +from hexkit.providers.s3.testutils import S3Fixture from pytest_asyncio.plugin import _ScopeName from ucs.adapters.outbound.dao import DaoCollectionTranslator from ucs.config import Config @@ -69,23 +62,14 @@ class JointFixture: mongodb: MongoDbFixture kafka: KafkaFixture s3: S3Fixture - second_s3: S3Fixture bucket_id: str inbox_inspector: StorageInspectorPort - async def reset_state(self): - """Completely reset fixture states""" - await self.s3.empty_buckets() - await self.second_s3.empty_buckets() - self.mongodb.empty_collections() - self.kafka.clear_topics() - async def joint_fixture_function( - mongodb_fixture: MongoDbFixture, - kafka_fixture: KafkaFixture, - s3_fixture: S3Fixture, - second_s3_fixture: S3Fixture, + mongodb: MongoDbFixture, + kafka: KafkaFixture, + s3: S3Fixture, ) -> AsyncGenerator[JointFixture, None]: """A fixture that embeds all other fixtures for API-level integration testing. 
@@ -93,29 +77,20 @@ async def joint_fixture_function(
     """
     bucket_id = "test-inbox"
 
-    node_config = S3ObjectStorageNodeConfig(
-        bucket=bucket_id, credentials=s3_fixture.config
-    )
-    second_node_config = S3ObjectStorageNodeConfig(
-        bucket=bucket_id, credentials=second_s3_fixture.config
-    )
+    node_config = S3ObjectStorageNodeConfig(bucket=bucket_id, credentials=s3.config)
 
     object_storages_config = S3ObjectStoragesConfig(
         object_storages={
             STORAGE_ALIASES[0]: node_config,
-            STORAGE_ALIASES[1]: second_node_config,
         }
     )
     # merge configs from different sources with the default one:
-    config = get_config(
-        sources=[mongodb_fixture.config, kafka_fixture.config, object_storages_config]
-    )
+    config = get_config(sources=[mongodb.config, kafka.config, object_storages_config])
 
-    daos = await DaoCollectionTranslator.construct(provider=mongodb_fixture.dao_factory)
-    await s3_fixture.populate_buckets([bucket_id])
-    await second_s3_fixture.populate_buckets([bucket_id])
+    daos = await DaoCollectionTranslator.construct(provider=mongodb.dao_factory)
+    await s3.populate_buckets([bucket_id])
 
-    # create a DI container instance:translators
+    # Assemble joint fixture with config injection
     async with (
         prepare_core(config=config) as (
             upload_service,
@@ -139,10 +114,9 @@ async def joint_fixture_function(
             file_metadata_service=file_metadata_service,
             rest_client=rest_client,
             event_subscriber=event_subscriber,
-            mongodb=mongodb_fixture,
-            kafka=kafka_fixture,
-            s3=s3_fixture,
-            second_s3=second_s3_fixture,
+            mongodb=mongodb,
+            kafka=kafka,
+            s3=s3,
             bucket_id=bucket_id,
             inbox_inspector=inbox_inspector,
         )
@@ -151,10 +125,3 @@
 def get_joint_fixture(scope: _ScopeName = "function"):
     """Produce a joint fixture with desired scope"""
     return pytest_asyncio.fixture(joint_fixture_function, scope=scope)
-
-
-joint_fixture = get_joint_fixture()
-mongodb_fixture = get_mongodb_fixture()
-kafka_fixture = get_kafka_fixture()
-s3_fixture = get_s3_fixture()
-second_s3_fixture = get_s3_fixture()
diff --git a/services/ucs/tests_ucs/fixtures/module_scope_fixtures.py b/services/ucs/tests_ucs/fixtures/module_scope_fixtures.py
deleted file mode 100644
index 0a2e1546..00000000
--- a/services/ucs/tests_ucs/fixtures/module_scope_fixtures.py
+++ /dev/null
@@ -1,42 +0,0 @@
-# Copyright 2021 - 2023 Universität Tübingen, DKFZ, EMBL, and Universität zu Köln
-# for the German Human Genome-Phenome Archive (GHGA)
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Contains module-scoped fixtures"""
-
-import asyncio
-
-import pytest
-from hexkit.providers.akafka.testutils import get_kafka_fixture
-from hexkit.providers.mongodb.testutils import get_mongodb_fixture
-from hexkit.providers.s3.testutils import get_s3_fixture
-
-from tests_ucs.fixtures.joint import JointFixture, get_joint_fixture
-
-
-@pytest.fixture(autouse=True, scope="function")
-def reset_state(joint_fixture: JointFixture):
-    """Clear joint_fixture state before tests that use this fixture.
-
-    This is a function-level fixture because it needs to run in each test.
-    """
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(joint_fixture.reset_state())
-
-
-mongodb_fixture = get_mongodb_fixture("module")
-kafka_fixture = get_kafka_fixture("module")
-s3_fixture = get_s3_fixture("module")
-second_s3_fixture = get_s3_fixture("module")
-joint_fixture = get_joint_fixture("module")
diff --git a/services/ucs/tests_ucs/test_edge_cases.py b/services/ucs/tests_ucs/test_edge_cases.py
index c7f65527..c6483895 100644
--- a/services/ucs/tests_ucs/test_edge_cases.py
+++ b/services/ucs/tests_ucs/test_edge_cases.py
@@ -28,22 +28,14 @@
 from hexkit.providers.s3.testutils import upload_part_via_url
 from ucs.core import models
 
-from tests_ucs.fixtures.example_data import UPLOAD_DETAILS_1, UPLOAD_DETAILS_2
-from tests_ucs.fixtures.module_scope_fixtures import (  # noqa: F401
-    JointFixture,
-    joint_fixture,
-    kafka_fixture,
-    mongodb_fixture,
-    reset_state,
-    s3_fixture,
-    second_s3_fixture,
-)
+from tests_ucs.fixtures.example_data import UPLOAD_DETAILS_1
+from tests_ucs.fixtures.joint import JointFixture
 
 pytestmark = pytest.mark.asyncio()
 
 
 async def create_multipart_upload_with_data(
-    joint_fixture: JointFixture,  # noqa: F811
+    joint_fixture: JointFixture,
     file_to_register: event_schemas.MetadataSubmissionFiles,
     storage_alias: str,
 ):
@@ -88,7 +80,7 @@ async def create_multipart_upload_with_data(
     return upload_details["object_id"]
 
 
-async def test_get_health(joint_fixture: JointFixture):  # noqa: F811
+async def test_get_health(joint_fixture: JointFixture):
     """Test the GET /health endpoint.
 
     reset_state fixture isn't needed because the test is unaffected by state.
@@ -99,7 +91,7 @@ async def test_get_health(joint_fixture: JointFixture):  # noqa: F811
     assert response.json() == {"status": "OK"}
 
 
-async def test_get_file_metadata_not_found(joint_fixture: JointFixture):  # noqa
+async def test_get_file_metadata_not_found(joint_fixture: JointFixture):
     """Test the get_file_metadata endpoint with an non-existing file id."""
     file_id = "myNonExistingFile001"
     response = await joint_fixture.rest_client.get(f"/files/{file_id}")
@@ -108,7 +100,7 @@ async def test_get_file_metadata_not_found(joint_fixture: JointFixture):  # noqa
     assert response.json()["exception_id"] == "fileNotRegistered"
 
 
-async def test_create_upload_not_found(joint_fixture: JointFixture):  # noqa: F811
+async def test_create_upload_not_found(joint_fixture: JointFixture):
     """Test the create_upload endpoint with an non-existing file id."""
     file_id = "myNonExistingFile001"
     response = await joint_fixture.rest_client.post(
@@ -137,7 +129,7 @@ async def test_create_upload_not_found(joint_fixture: JointFixture):  # noqa: F8
 )
 async def test_create_upload_other_active(
     existing_status: models.UploadStatus,
-    joint_fixture: JointFixture,  # noqa: F811
+    joint_fixture: JointFixture,
 ):
     """Test the create_upload endpoint when there is another active update already
     existing.
@@ -176,7 +168,7 @@
 )
 async def test_create_upload_accepted(
     existing_status: models.UploadStatus,
-    joint_fixture: JointFixture,  # noqa: F811
+    joint_fixture: JointFixture,
 ):
     """Test the create_upload endpoint when another update has already been accepted
     or is currently being evaluated.
@@ -207,9 +199,7 @@
     )
 
 
-async def test_create_upload_unknown_storage(
-    joint_fixture: JointFixture,  # noqa: F811
-):
+async def test_create_upload_unknown_storage(joint_fixture: JointFixture):
     """Test the create_upload endpoint with storage_alias missing in the request body"""
     # insert upload metadata into the database:
     await joint_fixture.daos.file_metadata.insert(UPLOAD_DETAILS_1.file_metadata)
@@ -229,7 +219,7 @@
     assert response_body["exception_id"] == "noSuchStorage"
 
 
-async def test_get_upload_not_found(joint_fixture: JointFixture):  # noqa: F811
+async def test_get_upload_not_found(joint_fixture: JointFixture):
     """Test the get_upload endpoint with non-existing upload ID."""
     upload_id = "myNonExistingUpload001"
     response = await joint_fixture.rest_client.get(f"/uploads/{upload_id}")
@@ -238,9 +228,7 @@
     assert response.json()["exception_id"] == "noSuchUpload"
 
 
-async def test_update_upload_status_not_found(
-    joint_fixture: JointFixture,  # noqa: F811
-):
+async def test_update_upload_status_not_found(joint_fixture: JointFixture):
     """Test the update_upload_status endpoint with non existing upload ID."""
     upload_id = "myNonExistingUpload001"
 
@@ -262,7 +250,7 @@
 )
 async def test_update_upload_status_invalid_new_status(
     new_status: models.UploadStatus,
-    joint_fixture: JointFixture,  # noqa: F811
+    joint_fixture: JointFixture,
 ):
     """Test the update_upload_status endpoint with invalid new status values."""
     upload_id = "myNonExistingUpload001"
@@ -286,7 +274,7 @@
 )
 async def test_update_upload_status_non_pending(
     old_status: models.UploadStatus,
-    joint_fixture: JointFixture,  # noqa: F811
+    joint_fixture: JointFixture,
 ):
     """Test the update_upload_status endpoint on non pending upload."""
     target_upload = UPLOAD_DETAILS_1.upload_attempt.model_copy(
@@ -308,9 +296,7 @@ async def test_update_upload_status_non_pending(
     assert response_body["data"]["current_upload_status"] == old_status.value
 
 
-async def test_create_presigned_url_not_found(
-    joint_fixture: JointFixture,  # noqa: F811
-):
+async def test_create_presigned_url_not_found(joint_fixture: JointFixture):
     """Test the create_presigned_url endpoint with non existing upload ID."""
     upload_id = "myNonExistingUpload001"
 
@@ -322,76 +308,69 @@
     assert response.json()["exception_id"] == "noSuchUpload"
 
 
-async def test_deletion_upload_ongoing(joint_fixture: JointFixture):  # noqa: F811
+async def test_deletion_upload_ongoing(joint_fixture: JointFixture):
     """Test file data deletion while upload is still ongoing.
 
     This mainly tests if abort multipart upload worked correctly in the deletion
     context.
""" - for s3, upload_details in zip( - (joint_fixture.s3, joint_fixture.second_s3), - (UPLOAD_DETAILS_1, UPLOAD_DETAILS_2), - ): - storage_alias = upload_details.storage_alias - file_to_register = upload_details.submission_metadata - file_id = file_to_register.file_id - - inbox_object_id = await create_multipart_upload_with_data( - joint_fixture=joint_fixture, - file_to_register=file_to_register, - storage_alias=storage_alias, - ) + storage_alias = UPLOAD_DETAILS_1.storage_alias + file_to_register = UPLOAD_DETAILS_1.submission_metadata + file_id = file_to_register.file_id - # Verify everything that should exist is present - assert not await s3.storage.does_object_exist( + inbox_object_id = await create_multipart_upload_with_data( + joint_fixture=joint_fixture, + file_to_register=file_to_register, + storage_alias=storage_alias, + ) + + # Verify everything that should exist is present + assert not await joint_fixture.s3.storage.does_object_exist( + bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id + ) + with suppress(joint_fixture.s3.storage.MultiPartUploadAlreadyExistsError): + await joint_fixture.s3.storage._assert_no_multipart_upload( bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id ) - with suppress(s3.storage.MultiPartUploadAlreadyExistsError): - await s3.storage._assert_no_multipart_upload( - bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id - ) - assert await joint_fixture.daos.file_metadata.get_by_id(id_=file_id) - - num_attempts = 0 - async for _ in joint_fixture.daos.upload_attempts.find_all( - mapping={"file_id": file_id} - ): - num_attempts += 1 - assert num_attempts == 1 - - # Request deletion - deletion_event = event_schemas.FileDeletionRequested(file_id=file_id) - await joint_fixture.kafka.publish_event( - payload=json.loads(deletion_event.model_dump_json()), - type_=joint_fixture.config.files_to_delete_type, - topic=joint_fixture.config.files_to_delete_topic, - ) + assert await joint_fixture.daos.file_metadata.get_by_id(id_=file_id) - # Consume inbound event and check outbound event - deletion_successful_event = event_schemas.FileDeletionSuccess(file_id=file_id) - async with joint_fixture.kafka.record_events( - in_topic=joint_fixture.config.file_deleted_event_topic - ) as recorder: - await joint_fixture.event_subscriber.run(forever=False) - - assert len(recorder.recorded_events) == 1 - assert ( - recorder.recorded_events[0].payload - == deletion_successful_event.model_dump() - ) + num_attempts = 0 + async for _ in joint_fixture.daos.upload_attempts.find_all( + mapping={"file_id": file_id} + ): + num_attempts += 1 + assert num_attempts == 1 - # Verify everything is gone - assert not await s3.storage.does_object_exist( - bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id - ) - await s3.storage._assert_no_multipart_upload( - bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id - ) - with suppress(ResourceNotFoundError): - await joint_fixture.daos.file_metadata.get_by_id(id_=file_id) - - num_attempts = 0 - async for _ in joint_fixture.daos.upload_attempts.find_all( - mapping={"file_id": file_id} - ): - num_attempts += 1 - assert num_attempts == 0 + # Request deletion + deletion_event = event_schemas.FileDeletionRequested(file_id=file_id) + await joint_fixture.kafka.publish_event( + payload=json.loads(deletion_event.model_dump_json()), + type_=joint_fixture.config.files_to_delete_type, + topic=joint_fixture.config.files_to_delete_topic, + ) + + # Consume inbound event and check outbound event + deletion_successful_event = 
+    deletion_successful_event = event_schemas.FileDeletionSuccess(file_id=file_id)
+    async with joint_fixture.kafka.record_events(
+        in_topic=joint_fixture.config.file_deleted_event_topic
+    ) as recorder:
+        await joint_fixture.event_subscriber.run(forever=False)
+
+    assert len(recorder.recorded_events) == 1
+    assert recorder.recorded_events[0].payload == deletion_successful_event.model_dump()
+
+    # Verify everything is gone
+    assert not await joint_fixture.s3.storage.does_object_exist(
+        bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id
+    )
+    await joint_fixture.s3.storage._assert_no_multipart_upload(
+        bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id
+    )
+    with suppress(ResourceNotFoundError):
+        await joint_fixture.daos.file_metadata.get_by_id(id_=file_id)
+
+    num_attempts = 0
+    async for _ in joint_fixture.daos.upload_attempts.find_all(
+        mapping={"file_id": file_id}
+    ):
+        num_attempts += 1
+    assert num_attempts == 0
diff --git a/services/ucs/tests_ucs/test_typical_journey.py b/services/ucs/tests_ucs/test_typical_journey.py
index 3ed57aa8..33963184 100644
--- a/services/ucs/tests_ucs/test_typical_journey.py
+++ b/services/ucs/tests_ucs/test_typical_journey.py
@@ -33,20 +33,15 @@
 from ucs.core.models import UploadStatus
 
 from tests_ucs.fixtures.example_data import UPLOAD_DETAILS_1, UPLOAD_DETAILS_2
-from tests_ucs.fixtures.joint import (  # noqa: F401
-    JointFixture,
-    joint_fixture,
-    kafka_fixture,
-    mongodb_fixture,
-    s3_fixture,
-    second_s3_fixture,
-)
+from tests_ucs.fixtures.joint import JointFixture
 
 TARGET_BUCKET_ID = "test-staging"
 
+pytestmark = pytest.mark.asyncio()
+
 
 async def run_until_uploaded(
-    joint_fixture: JointFixture,  # noqa: F811
+    joint_fixture: JointFixture,
     file_to_register: event_schemas.MetadataSubmissionFiles,
     storage_alias: str,
 ):
@@ -111,7 +106,7 @@ async def run_until_uploaded(
 
 
 async def perform_upload(
-    joint_fixture: JointFixture,  # noqa: F811
+    joint_fixture: JointFixture,
     *,
     file_id: str,
     final_status: Literal["cancelled", "uploaded"],
@@ -184,136 +179,118 @@ async def perform_upload(
     return upload_details["upload_id"]
 
 
-@pytest.mark.asyncio
-async def test_happy_journey(joint_fixture: JointFixture):  # noqa: F811
+async def test_happy_journey(joint_fixture: JointFixture):
     """Test the typical anticipated/successful journey through the service's APIs."""
-    for s3, upload_details in zip(
-        (joint_fixture.s3, joint_fixture.second_s3),
-        (UPLOAD_DETAILS_1, UPLOAD_DETAILS_2),
-    ):
-        storage_alias = upload_details.storage_alias
-        file_to_register = upload_details.submission_metadata
+    storage_alias = UPLOAD_DETAILS_1.storage_alias
+    file_to_register = UPLOAD_DETAILS_1.submission_metadata
 
-        inbox_object_id = await run_until_uploaded(
-            joint_fixture=joint_fixture,
-            file_to_register=file_to_register,
-            storage_alias=storage_alias,
-        )
+    inbox_object_id = await run_until_uploaded(
+        joint_fixture=joint_fixture,
+        file_to_register=file_to_register,
+        storage_alias=storage_alias,
+    )
 
-        # publish an event to mark the upload as accepted:
-        acceptance_event = event_schemas.FileInternallyRegistered(
-            s3_endpoint_alias=storage_alias,
-            file_id=file_to_register.file_id,
-            object_id=upload_details.upload_attempt.object_id,
-            bucket_id=TARGET_BUCKET_ID,
-            upload_date=now_as_utc().isoformat(),
-            decrypted_sha256=file_to_register.decrypted_sha256,
-            decrypted_size=file_to_register.decrypted_size,
-            decryption_secret_id="some-secret",
-            content_offset=123456,
-            encrypted_part_size=123456,
-            encrypted_parts_md5=["somechecksum", "anotherchecksum"],
-            encrypted_parts_sha256=["somechecksum", "anotherchecksum"],
-        )
-        await joint_fixture.kafka.publish_event(
-            payload=json.loads(acceptance_event.model_dump_json()),
-            type_=joint_fixture.config.upload_accepted_event_type,
-            topic=joint_fixture.config.upload_accepted_event_topic,
-        )
+    # publish an event to mark the upload as accepted:
+    acceptance_event = event_schemas.FileInternallyRegistered(
+        s3_endpoint_alias=storage_alias,
+        file_id=file_to_register.file_id,
+        object_id=UPLOAD_DETAILS_1.upload_attempt.object_id,
+        bucket_id=TARGET_BUCKET_ID,
+        upload_date=now_as_utc().isoformat(),
+        decrypted_sha256=file_to_register.decrypted_sha256,
+        decrypted_size=file_to_register.decrypted_size,
+        decryption_secret_id="some-secret",
+        content_offset=123456,
+        encrypted_part_size=123456,
+        encrypted_parts_md5=["somechecksum", "anotherchecksum"],
+        encrypted_parts_sha256=["somechecksum", "anotherchecksum"],
+    )
+    await joint_fixture.kafka.publish_event(
+        payload=json.loads(acceptance_event.model_dump_json()),
+        type_=joint_fixture.config.upload_accepted_event_type,
+        topic=joint_fixture.config.upload_accepted_event_topic,
+    )
 
-        # consume the acceptance event:
-        await joint_fixture.event_subscriber.run(forever=False)
+    # consume the acceptance event:
+    await joint_fixture.event_subscriber.run(forever=False)
 
-        # make sure that the latest upload of the corresponding file was marked as
-        # accepted:
-        # First get the ID of the latest upload for that file
-        response = await joint_fixture.rest_client.get(
-            f"/files/{file_to_register.file_id}"
-        )
-        assert response.status_code == status.HTTP_200_OK
-        latest_upload_id = response.json()["latest_upload_id"]
+    # make sure that the latest upload of the corresponding file was marked as
+    # accepted:
+    # First get the ID of the latest upload for that file
+    response = await joint_fixture.rest_client.get(f"/files/{file_to_register.file_id}")
+    assert response.status_code == status.HTTP_200_OK
+    latest_upload_id = response.json()["latest_upload_id"]
 
-        # Then get upload details
-        response = await joint_fixture.rest_client.get(f"/uploads/{latest_upload_id}")
-        assert response.status_code == status.HTTP_200_OK
-        assert response.json()["status"] == "accepted"
+    # Then get upload details
+    response = await joint_fixture.rest_client.get(f"/uploads/{latest_upload_id}")
+    assert response.status_code == status.HTTP_200_OK
+    assert response.json()["status"] == "accepted"
 
-        # Finally verify the corresponding object has been removed from object storage
-        assert not await s3.storage.does_object_exist(
-            bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id
-        )
+    # Finally verify the corresponding object has been removed from object storage
+    assert not await joint_fixture.s3.storage.does_object_exist(
+        bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id
+    )
 
 
-@pytest.mark.asyncio
-async def test_unhappy_journey(joint_fixture: JointFixture):  # noqa: F811
+async def test_unhappy_journey(joint_fixture: JointFixture):
     """Test the typical journey. Work through the service's APIs, but reject the
     upload attempt due to a file validation error.
""" - for s3, upload_details in zip( - (joint_fixture.s3, joint_fixture.second_s3), - (UPLOAD_DETAILS_1, UPLOAD_DETAILS_2), - ): - storage_alias = upload_details.storage_alias - file_to_register = upload_details.submission_metadata + storage_alias = UPLOAD_DETAILS_1.storage_alias + file_to_register = UPLOAD_DETAILS_1.submission_metadata - inbox_object_id = await run_until_uploaded( - joint_fixture=joint_fixture, - file_to_register=file_to_register, - storage_alias=storage_alias, - ) + inbox_object_id = await run_until_uploaded( + joint_fixture=joint_fixture, + file_to_register=file_to_register, + storage_alias=storage_alias, + ) - # publish an event to mark the upload as rejected due to validation failure - failure_event = event_schemas.FileUploadValidationFailure( - s3_endpoint_alias=storage_alias, - file_id=file_to_register.file_id, - object_id=upload_details.upload_attempt.object_id, - bucket_id=TARGET_BUCKET_ID, - upload_date=now_as_utc().isoformat(), - reason="Sorry, but this has to fail.", - ) + # publish an event to mark the upload as rejected due to validation failure + failure_event = event_schemas.FileUploadValidationFailure( + s3_endpoint_alias=storage_alias, + file_id=file_to_register.file_id, + object_id=UPLOAD_DETAILS_1.upload_attempt.object_id, + bucket_id=TARGET_BUCKET_ID, + upload_date=now_as_utc().isoformat(), + reason="Sorry, but this has to fail.", + ) - await joint_fixture.kafka.publish_event( - payload=json.loads(failure_event.model_dump_json()), - type_=joint_fixture.config.upload_rejected_event_type, - topic=joint_fixture.config.upload_rejected_event_topic, - ) + await joint_fixture.kafka.publish_event( + payload=json.loads(failure_event.model_dump_json()), + type_=joint_fixture.config.upload_rejected_event_type, + topic=joint_fixture.config.upload_rejected_event_topic, + ) - # consume the validation failure event: - await joint_fixture.event_subscriber.run(forever=False) + # consume the validation failure event: + await joint_fixture.event_subscriber.run(forever=False) - # make sure that the latest upload of the corresponding file was marked as rejected: - # First get the ID of the latest upload for that file - response = await joint_fixture.rest_client.get( - f"/files/{file_to_register.file_id}" - ) - assert response.status_code == status.HTTP_200_OK - latest_upload_id = response.json()["latest_upload_id"] + # make sure that the latest upload of the corresponding file was marked as rejected: + # First get the ID of the latest upload for that file + response = await joint_fixture.rest_client.get(f"/files/{file_to_register.file_id}") + assert response.status_code == status.HTTP_200_OK + latest_upload_id = response.json()["latest_upload_id"] - # Then get upload details - response = await joint_fixture.rest_client.get(f"/uploads/{latest_upload_id}") - assert response.status_code == status.HTTP_200_OK - assert response.json()["status"] == "rejected" + # Then get upload details + response = await joint_fixture.rest_client.get(f"/uploads/{latest_upload_id}") + assert response.status_code == status.HTTP_200_OK + assert response.json()["status"] == "rejected" - # Finally verify the corresponding object has been removed from object storage - assert not await s3.storage.does_object_exist( - bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id - ) + # Finally verify the corresponding object has been removed from object storage + assert not await joint_fixture.s3.storage.does_object_exist( + bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id + ) 
 
 
-@pytest.mark.asyncio
-async def test_inbox_inspector(
-    caplog,
-    joint_fixture: JointFixture,  # noqa: F811
-):
+async def test_inbox_inspector(caplog, joint_fixture: JointFixture):
     """Sanity check for inbox inspection functionality."""
     # Run upload for both files and store object IDs
     inbox_object_ids = []
-    for upload_details in (UPLOAD_DETAILS_1, UPLOAD_DETAILS_2):
-        storage_alias = upload_details.storage_alias
-        file_to_register = upload_details.submission_metadata
+    for upload in (UPLOAD_DETAILS_1, UPLOAD_DETAILS_2):
+        storage_alias = upload.storage_alias
+        file_to_register = upload.submission_metadata
         inbox_object_id = await run_until_uploaded(
             joint_fixture=joint_fixture,
             file_to_register=file_to_register,
@@ -364,7 +341,7 @@ async def test_inbox_inspector(
     # Verify ID matches and object is present to be properly logged as stale
     stale_id = inbox_object_ids[1]
     assert stale_id == attempt.object_id
-    assert await joint_fixture.second_s3.storage.does_object_exist(
+    assert await joint_fixture.s3.storage.does_object_exist(
         bucket_id=joint_fixture.bucket_id, object_id=stale_id
     )
 
@@ -381,69 +358,62 @@ async def test_inbox_inspector(
     assert expected_message in caplog.messages
 
 
-@pytest.mark.asyncio
-async def test_happy_deletion(joint_fixture: JointFixture):  # noqa: F811
+async def test_happy_deletion(joint_fixture: JointFixture):
     """Test happy path for file data/metadata deletion, with file metadata, two
     upload attempts and a file still in the inbox.
     """
-    for s3, upload_details in zip(
-        (joint_fixture.s3, joint_fixture.second_s3),
-        (UPLOAD_DETAILS_1, UPLOAD_DETAILS_2),
+    s3 = joint_fixture.s3
+    storage_alias = UPLOAD_DETAILS_1.storage_alias
+    file_to_register = UPLOAD_DETAILS_1.submission_metadata
+    file_id = UPLOAD_DETAILS_1.file_metadata.file_id
+
+    inbox_object_id = await run_until_uploaded(
+        joint_fixture=joint_fixture,
+        file_to_register=file_to_register,
+        storage_alias=storage_alias,
+    )
+
+    # Verify everything is present
+    assert await s3.storage.does_object_exist(
+        bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id
+    )
+    assert await joint_fixture.daos.file_metadata.get_by_id(id_=file_id)
+
+    num_attempts = 0
+    async for _ in joint_fixture.daos.upload_attempts.find_all(
+        mapping={"file_id": file_id}
     ):
-        storage_alias = upload_details.storage_alias
-        file_to_register = upload_details.submission_metadata
-        file_id = upload_details.file_metadata.file_id
+        num_attempts += 1
+    assert num_attempts == 2
 
-        inbox_object_id = await run_until_uploaded(
-            joint_fixture=joint_fixture,
-            file_to_register=file_to_register,
-            storage_alias=storage_alias,
-        )
+    # Request deletion
+    deletion_event = event_schemas.FileDeletionRequested(file_id=file_id)
+    await joint_fixture.kafka.publish_event(
+        payload=json.loads(deletion_event.model_dump_json()),
+        type_=joint_fixture.config.files_to_delete_type,
+        topic=joint_fixture.config.files_to_delete_topic,
+    )
 
-        # Verify everything is present
-        assert await s3.storage.does_object_exist(
-            bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id
-        )
-        assert await joint_fixture.daos.file_metadata.get_by_id(id_=file_id)
-
-        num_attempts = 0
-        async for _ in joint_fixture.daos.upload_attempts.find_all(
-            mapping={"file_id": file_id}
-        ):
-            num_attempts += 1
-        assert num_attempts == 2
-
-        # Request deletion
-        deletion_event = event_schemas.FileDeletionRequested(file_id=file_id)
-        await joint_fixture.kafka.publish_event(
-            payload=json.loads(deletion_event.model_dump_json()),
-            type_=joint_fixture.config.files_to_delete_type,
-            topic=joint_fixture.config.files_to_delete_topic,
-        )
+    # Consume inbound event and check outbound event
+    deletion_successful_event = event_schemas.FileDeletionSuccess(file_id=file_id)
+    async with joint_fixture.kafka.record_events(
+        in_topic=joint_fixture.config.file_deleted_event_topic
+    ) as recorder:
+        await joint_fixture.event_subscriber.run(forever=False)
 
-        # Consume inbound event and check outbound event
-        deletion_successful_event = event_schemas.FileDeletionSuccess(file_id=file_id)
-        async with joint_fixture.kafka.record_events(
-            in_topic=joint_fixture.config.file_deleted_event_topic
-        ) as recorder:
-            await joint_fixture.event_subscriber.run(forever=False)
-
-        assert len(recorder.recorded_events) == 1
-        assert (
-            recorder.recorded_events[0].payload
-            == deletion_successful_event.model_dump()
-        )
+    assert len(recorder.recorded_events) == 1
+    assert recorder.recorded_events[0].payload == deletion_successful_event.model_dump()
 
-        # Verify everything is gone
-        assert not await s3.storage.does_object_exist(
-            bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id
-        )
-        with suppress(ResourceNotFoundError):
-            await joint_fixture.daos.file_metadata.get_by_id(id_=file_id)
-
-        num_attempts = 0
-        async for _ in joint_fixture.daos.upload_attempts.find_all(
-            mapping={"file_id": file_id}
-        ):
-            num_attempts += 1
-        assert num_attempts == 0
+    # Verify everything is gone
+    assert not await s3.storage.does_object_exist(
+        bucket_id=joint_fixture.bucket_id, object_id=inbox_object_id
+    )
+    with suppress(ResourceNotFoundError):
+        await joint_fixture.daos.file_metadata.get_by_id(id_=file_id)
+
+    num_attempts = 0
+    async for _ in joint_fixture.daos.upload_attempts.find_all(
+        mapping={"file_id": file_id}
+    ):
+        num_attempts += 1
+    assert num_attempts == 0