diff --git a/repository_service_tuf_worker/repository.py b/repository_service_tuf_worker/repository.py index e2065673..fcc893c8 100644 --- a/repository_service_tuf_worker/repository.py +++ b/repository_service_tuf_worker/repository.py @@ -1109,12 +1109,6 @@ def _trusted_root_update( f"Expected 'root', got '{new_root.signed.type}'" ) - # Verify that new root is signed by trusted root - current_root.verify_delegate(Root.type, new_root) - - # Verify that new root is signed by itself - new_root.verify_delegate(Root.type, new_root) - # Verify the new root version if new_root.signed.version != current_root.signed.version + 1: raise BadVersionNumberError( @@ -1122,6 +1116,12 @@ def _trusted_root_update( f" instead got version {new_root.signed.version}" ) + # Verify that new root is signed by trusted root + current_root.verify_delegate(Root.type, new_root) + + # Verify that new root is signed by itself + new_root.verify_delegate(Root.type, new_root) + def _root_metadata_update( self, new_root: Metadata[Root] ) -> Dict[str, Any]: @@ -1130,9 +1130,26 @@ def _root_metadata_update( try: self._trusted_root_update(current_root, new_root) + + except UnsignedMetadataError: + # TODO: Add missing sanity check - new root must have at least 1 + # and only valid signature - use `get_verification_status` (#367) + self.write_repository_settings("ROOT_SIGNING", new_root.to_dict()) + return self._task_result( + TaskName.METADATA_UPDATE, + True, + { + "message": "Metadata Update Processed", + "role": Root.type, + "update": ( + f"Root v{new_root.signed.version} is " + "pending signatures" + ), + }, + ) + except ( ValueError, - UnsignedMetadataError, TypeError, BadVersionNumberError, RepositoryError, @@ -1146,6 +1163,16 @@ def _root_metadata_update( }, ) + self._root_metadata_update_finalize(current_root, new_root) + return self._task_result( + TaskName.METADATA_UPDATE, + True, + {"message": "Metadata Update Processed", "role": Root.type}, + ) + + def _root_metadata_update_finalize( + self, current_root: Metadata[Root], new_root: Metadata[Root] + ) -> None: # We always persist the new root metadata, but we cannot persist # without verifying if the online key is rotated to avoid a mismatch # with the rest of the roles using the online key. @@ -1185,12 +1212,6 @@ def _root_metadata_update( f"({self._timeout} seconds)" ) - return self._task_result( - TaskName.METADATA_UPDATE, - True, - {"message": "Metadata Update Processed", "role": Root.type}, - ) - def metadata_update( self, payload: Dict[Literal["metadata"], Dict[Literal[Root.type], Any]], @@ -1262,20 +1283,24 @@ def metadata_rotation( return self.metadata_update(payload, update_state) @staticmethod - def _validate_signature(metadata: Metadata, signature: Signature) -> bool: + def _validate_signature( + metadata: Metadata, + signature: Signature, + delegator: Optional[Metadata] = None, + ) -> bool: """ - Validate signature over metadata using appropriate delegator. - - NOTE: In "metadata update" signing event, the public key and - authorization info is retrieved from "trusted root" - + Validate signature over metadata using appropriate delegator. + If no delegator is passed, the metadata itself is used as delegator. """ + if delegator is None: + delegator = metadata + keyid = signature.keyid - if keyid not in metadata.signed.roles[Root.type].keyids: + if keyid not in delegator.signed.roles[Root.type].keyids: logging.info(f"signature '{keyid}' not authorized") return False - key = metadata.signed.keys.get(signature.keyid) + key = delegator.signed.keys.get(signature.keyid) if not key: logging.info(f"no key for signature '{keyid}'") return False @@ -1292,25 +1317,18 @@ def _validate_signature(metadata: Metadata, signature: Signature) -> bool: return True @staticmethod - def _validate_threshold(metadata: Metadata) -> bool: + def _validate_threshold( + metadata: Metadata, delegator: Optional[Metadata] = None + ) -> bool: """ Validate signature threshold using appropriate delegator(s). - - NOTE: In "metadata update" signing event, the threshold for: - - root is validated with the passed metadata AND the trusted root; - - top-level targets is validated with the trusted root; - - delegated targets is validated with the delegating targets; - as delegator. + If no delegator is passed, the metadata itself is used as delegator. """ + if delegator is None: + delegator = metadata try: - # TODO: `verify_delegate` does not tell us if there are any - # superfluous valid or invalid signatures. Is this something we - # want to know, e.g. to detect mistakes? To detect superfluous - # signatures if verify_delegate succeeds, would be easy: `assert - # len(signatures) == threshold`. Anything, else would require a - # custom `verify_delegate` function. - metadata.verify_delegate(Root.type, metadata) + delegator.verify_delegate(Root.type, metadata) except UnsignedMetadataError as e: logging.info(e) @@ -1327,100 +1345,95 @@ def sign_metadata( ) -> Dict[str, Any]: """Add signature to metadata for pending signing event. - Add signature (from payload) to cached role metadata (from settings) - for the role that matches the passed rolename (from payload), if a - signing event exists for that role, and the signature is valid. + Add signature (from payload) to cached root metadata (from settings), + if a signing event exists, and the signature is valid. + + Signing event types are 'bootstrap' or 'metadata update'. - If the signature threshold is reached, the signing event is finalized. + If the signature threshold is reached, the signing event is finalized, + otherwise it remains in pending state. + """ - ** Signing event types (and details) ** + def _result(status, error=None, bootstrap=None, update=None): + details = {} + if status: + details["message"] = "Signature Processed" + else: + details["message"] = "Signature Failed" + if error: + details["error"] = error + elif bootstrap: + details["bootstrap"] = bootstrap + elif update: + details["update"] = update - BOOTSTRAP: Only root metadata can be updated in this event. To verify - the passed signature, the keys and threshold are read from the root - metadata to be updated itself. If the threshold is reached, the - bootstrap process is finalized. + return self._task_result(TaskName.SIGN_METADATA, status, details) - METADATA UPDATE: Root, targets or delegated targets metadata can be - updated in this event. Depending on the metadata type, the authorized - public keys and threshold are read from different delegating metadata. - """ + signature = Signature.from_dict(payload["signature"]) rolename = payload["role"] - signature_dict = payload["signature"] - # Assert pending signing event - metadata_dict = self._settings.get_fresh(f"{rolename.upper()}_SIGNING") - if metadata_dict is None: - return self._task_result( - TaskName.SIGN_METADATA, - False, - { - "message": "Signature Failed", - "error": f"No signatures pending for {rolename}", - }, - ) + # Assert requested metadata type is root + if rolename != Root.type: + msg = f"Expected '{Root.type}', got '{rolename}'" + return _result(False, error=msg) - # Assert signing event type (currently bootstrap only) - # TODO: repository-service-tuf/repository-service-tuf-worker#336 - bootstrap_state = self._settings.get_fresh("BOOTSTRAP") - if "signing" not in bootstrap_state: - return self._task_result( - TaskName.SIGN_METADATA, - False, - { - "message": "Signature Failed", - "error": "No bootstrap available for signing", - }, - ) + # Assert pending signing event exists + metadata_dict = self._settings.get_fresh("ROOT_SIGNING") + if metadata_dict is None: + msg = "No signatures pending for root" + return _result(False, error=msg) - # Assert metadata type is allowed for signing event + # Assert metadata type is root root = Metadata.from_dict(metadata_dict) if not isinstance(root.signed, Root): - return self._task_result( - TaskName.SIGN_METADATA, - False, - { - "message": "Signature Failed", - "error": f"Role {rolename} has wrong type", - }, - ) + msg = f"Expected 'root', got '{root.signed.type}'" + return _result(False, error=msg) - # Assert passed signature is valid for metadata - signature = Signature.from_dict(signature_dict) - if not self._validate_signature(root, signature): - return self._task_result( - TaskName.SIGN_METADATA, - False, - { - "message": "Signature Failed", - "error": "Invalid signature", - }, - ) + # If it isn't a "bootstrap" signing event, it must be "update metadata" + bootstrap_state = self._settings.get_fresh("BOOTSTRAP") + if "signing" in bootstrap_state: + # Signature and threshold of initial root can only self-validate, + # there is no "trusted root" at bootstrap time yet. + if not self._validate_signature(root, signature): + return _result(False, error="Invalid signature") + + root.signatures[signature.keyid] = signature + if not self._validate_threshold(root): + self.write_repository_settings("ROOT_SIGNING", root.to_dict()) + msg = f"Root v{root.signed.version} is pending signatures" + return _result(True, bootstrap=msg) + + bootstrap_task_id = bootstrap_state.split("signing-")[1] + self._bootstrap_finalize(root, bootstrap_task_id) + return _result(True, bootstrap="Bootstrap Finished") - # Check threshold with new signature included - root.signatures[signature.keyid] = signature - if not self._validate_threshold(root): - self.write_repository_settings("ROOT_SIGNING", root.to_dict()) - root_version = root.signed.version - return self._task_result( - TaskName.SIGN_METADATA, - True, - { - "message": "Signature Processed", - "bootstrap": f"Root v{root_version} is pending signatures", - }, + else: + # We need the "trusted root" when updating to a new root: + # - signature could come from a key, which is only in the trusted + # root, OR from a key, which is only in the new root + # - threshold must validate with the threshold of keys as defined + # in the trusted root AND as defined in the new root + trusted_root = self._storage_backend.get("root") + is_valid_trusted = self._validate_signature( + root, signature, trusted_root ) - - # Finalize bootstrap - bootstrap_task_id = bootstrap_state.split("signing-")[1] - self._bootstrap_finalize(root, bootstrap_task_id) - return self._task_result( - TaskName.SIGN_METADATA, - True, - { - "message": "Signature Processed", - "bootstrap": "Bootstrap Finished", - }, - ) + is_valid_new = self._validate_signature(root, signature) + + if not (is_valid_trusted or is_valid_new): + return _result(False, error="Invalid signature") + + root.signatures[signature.keyid] = signature + trusted_threshold = self._validate_threshold(root, trusted_root) + new_threshold = self._validate_threshold(root) + if not (trusted_threshold and new_threshold): + self.write_repository_settings("ROOT_SIGNING", root.to_dict()) + msg = f"Root v{root.signed.version} is pending signatures" + return _result(True, update=msg) + + # Threshold reached -> finalize event + self._root_metadata_update_finalize(trusted_root, root) + self.write_repository_settings("ROOT_SIGNING", None) + return _result(True, update="Metadata update finished") def delete_sign_metadata( self, diff --git a/tests/unit/tuf_repository_service_worker/test_repository.py b/tests/unit/tuf_repository_service_worker/test_repository.py index a6494d14..dbf3dc6a 100644 --- a/tests/unit/tuf_repository_service_worker/test_repository.py +++ b/tests/unit/tuf_repository_service_worker/test_repository.py @@ -2513,25 +2513,17 @@ def test__trusted_root_update_bad_version(self, test_repo): version=4, type=repository.Root.type, ), - verify_delegate=pretend.call_recorder(lambda *a: None), ) fake_old_root_md = pretend.stub( signed=pretend.stub( roles={"timestamp": pretend.stub(keyids={"k1": "v1"})}, version=1, ), - verify_delegate=pretend.call_recorder(lambda *a: None), ) with pytest.raises(repository.BadVersionNumberError) as err: test_repo._trusted_root_update(fake_old_root_md, fake_new_root_md) assert "Expected root version 2 instead got version 4" in str(err) - assert fake_new_root_md.verify_delegate.calls == [ - pretend.call(repository.Root.type, fake_new_root_md) - ] - assert fake_old_root_md.verify_delegate.calls == [ - pretend.call(repository.Root.type, fake_new_root_md) - ] def test__trusted_root_update_bad_type(self, test_repo): fake_new_root_md = pretend.stub( @@ -2593,6 +2585,53 @@ def test__root_metadata_update(self, test_repo, mocked_datetime): pretend.call(fake_new_root_md, repository.Root.type) ] + def test__root_metadata_update_signatures_pending( + self, test_repo, mocked_datetime + ): + fake_datetime = mocked_datetime + fake_new_root_md = pretend.stub( + signed=pretend.stub( + roles={"timestamp": pretend.stub(keyids={"k1": "v1"})}, + version=2, + ) + ) + fake_old_root_md = pretend.stub( + signed=pretend.stub( + roles={"timestamp": pretend.stub(keyids={"k1": "v1"})}, + version=1, + ) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda *a: fake_old_root_md + ) + test_repo._trusted_root_update = pretend.raiser( + repository.UnsignedMetadataError() + ) + + fake_new_root_md.to_dict = pretend.call_recorder(lambda: "fake dict") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + result = test_repo._root_metadata_update(fake_new_root_md) + + assert result == { + "task": "metadata_update", + "status": True, + "last_update": fake_datetime.now(), + "details": { + "message": "Metadata Update Processed", + "role": "root", + "update": "Root v2 is pending signatures", + }, + } + assert test_repo._storage_backend.get.calls == [ + pretend.call(repository.Root.type) + ] + assert test_repo.write_repository_settings.calls == [ + pretend.call("ROOT_SIGNING", "fake dict") + ] + def test__root_metadata_update_not_trusted( self, test_repo, mocked_datetime ): @@ -3103,49 +3142,6 @@ def test_sign_metadata_no_role_signing( pretend.call("ROOT_SIGNING"), ] - def test_sign_metadata_root_signing_no_bootstrap( - self, test_repo, monkeypatch, mocked_datetime - ): - fake_datetime = mocked_datetime - - def fake_get_fresh(key): - if key == "BOOTSTRAP": - return "" - if key == "ROOT_SIGNING": - return {"metadata": "fake"} - - fake_settings = pretend.stub( - get_fresh=pretend.call_recorder(fake_get_fresh), - ) - monkeypatch.setattr( - repository, - "get_repository_settings", - lambda *a, **kw: fake_settings, - ) - - payload = { - "role": "root", - "signature": {"keyid": "keyid2", "sig": "sig2"}, - } - result = test_repo.sign_metadata(payload) - - assert result == { - "task": "sign_metadata", - "status": False, - "last_update": fake_datetime.now(), - "details": { - "message": "Signature Failed", - "error": "No bootstrap available for signing", - }, - } - assert fake_settings.get_fresh.calls == [ - pretend.call("ROOT_SIGNING"), - pretend.call("BOOTSTRAP"), - ] - assert repository.Metadata.from_dict.calls == [ - pretend.call({"metadata": "fake"}) - ] - def test_sign_metadata_invalid_role_type( self, test_repo, monkeypatch, mocked_datetime ): @@ -3183,12 +3179,11 @@ def fake_get_fresh(key): "last_update": fake_datetime.now(), "details": { "message": "Signature Failed", - "error": f"Role {payload['role']} has wrong type", + "error": "Expected 'root', got 'targets'", }, } assert fake_settings.get_fresh.calls == [ pretend.call("ROOT_SIGNING"), - pretend.call("BOOTSTRAP"), ] def test_sign_metadata_invalid_signature( @@ -3315,6 +3310,342 @@ def fake_get_fresh(key): pretend.call("ROOT_SIGNING", "fake_metadata") ] + def test_sign_metadata__update__bad_role_type( + self, test_repo, monkeypatch, mocked_datetime + ): + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + payload = {"signature": "fake", "role": "foo"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": False, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Failed", + "error": "Expected 'root', got 'foo'", + }, + } + + def test_sign_metadata__update__invalid_signature( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + + # Use `next` below to mock subsequent calls + fake_signature_result = iter((False, False)) + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": False, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Failed", + "error": "Invalid signature", + }, + } + + def test_sign_metadata__update__invalid_threshold__trusted_and_new( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + # Use `next` below to mock subsequent calls + fake_signature_result = iter((True, False)) + fake_threshold_result = iter((False, False)) + + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_result) + ) + + # Call sign_metadata with fake payload + # All deserialization and validation is mocked + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + } + + def test_sign_metadata__update__invalid_threshold__trusted( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + # Use `next` below to mock subsequent calls + fake_signature_result = iter((False, True)) + fake_threshold_result = iter((False, True)) + + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_result) + ) + + # Call sign_metadata with fake payload + # All deserialization and validation is mocked + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + } + + def test_sign_metadata__update__invalid_threshold__new( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + """Test: New root does not meet signature threshold.""" + + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + # Use `next` below to mock subsequent calls + fake_signature_result = iter((True, True)) + fake_threshold_result = iter((True, False)) + + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_result) + ) + + # Call sign_metadata with fake payload + # All deserialization and validation is mocked + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Root v2 is pending signatures", + }, + } + + def test_sign_metadata__update__valid_threshold( + self, + test_repo, + monkeypatch, + mocked_datetime, + ): + def fake_get_fresh(key): + if key == "BOOTSTRAP": + return "" + if key == "ROOT_SIGNING": + return {"metadata": "fake"} + + fake_settings = pretend.stub( + get_fresh=pretend.call_recorder(fake_get_fresh), + ) + monkeypatch.setattr( + repository, + "get_repository_settings", + lambda *a, **kw: fake_settings, + ) + fake_signature = pretend.stub(keyid="fake") + repository.Signature.from_dict = pretend.call_recorder( + lambda *a: fake_signature + ) + fake_trusted_root = repository.Metadata( + signed=repository.Root(version=1) + ) + test_repo._storage_backend.get = pretend.call_recorder( + lambda r: fake_trusted_root + ) + fake_new_root = repository.Metadata(signed=repository.Root(version=2)) + repository.Metadata.from_dict = pretend.call_recorder( + lambda *a: fake_new_root + ) + fake_new_root.to_dict = pretend.call_recorder(lambda: "fake") + test_repo.write_repository_settings = pretend.call_recorder( + lambda *a: "fake" + ) + + # Use `next` below to mock subsequent calls + fake_signature_result = iter((True, True)) + fake_threshold_result = iter((True, True)) + + test_repo._validate_signature = pretend.call_recorder( + lambda *a: next(fake_signature_result) + ) + test_repo._validate_threshold = pretend.call_recorder( + lambda *a: next(fake_threshold_result) + ) + + # Call sign_metadata with fake payload + # All deserialization and validation is mocked + payload = {"signature": "fake", "role": "root"} + result = test_repo.sign_metadata(payload) + + assert result == { + "task": "sign_metadata", + "status": True, + "last_update": mocked_datetime.now(), + "details": { + "message": "Signature Processed", + "update": "Metadata update finished", + }, + } + def test_delete_sign_metadata_bootstrap_signing_state( self, test_repo, monkeypatch, mocked_datetime ):