From cf6c6bb6d358032db20440328bdfd06f31dbaf65 Mon Sep 17 00:00:00 2001 From: Joachim Jablon Date: Thu, 18 Mar 2021 00:13:11 +0100 Subject: [PATCH] Move dangling methods into class methods --- docs/howto.rst | 27 ++++- docs/reference.rst | 2 +- pypitoken/__init__.py | 6 +- pypitoken/token.py | 229 +++++++++++++++++++++++------------------- tests/test_token.py | 53 ++++++---- 5 files changed, 188 insertions(+), 129 deletions(-) diff --git a/docs/howto.rst b/docs/howto.rst index d2875b2..b280b9b 100644 --- a/docs/howto.rst +++ b/docs/howto.rst @@ -2,7 +2,9 @@ How to... ========= -This how-to section is divided into two parts: +This how-to section is divided into two parts: user and integrator doc. +User is for people who want to interact with their own tokens. +Integrator is for people interested into interacting this library in Warehouse. "User" documentation: you're a PyPI user ======================================== @@ -74,6 +76,29 @@ if you're a PyPI admin, you can find the key which is stored in the PyPI Databas Practically, PyPI admins don't go around looking at the token secrets keys. Your Macaroon keys are safe where they are, and it's best for everyone this way. +Introspect a token's restrictions +--------------------------------- + +`Token.restrictions` will give you a list of restriction objects. These objects +are dataclass instances that you can compare and introspect easily. + +There might also be cases where you want to interact with caveat values directly, +without a token. In this case, you can use the methods on the `Restriction` class:: + + import pypitoken + restriction = pypitoken.Restriction.load_json( + '{"version": 1, "permissions": "user"}' + ) + # or + restriction = pypitoken.Restriction.load( + {"version": 1, "permissions": "user"} + ) + # NoopRestriction() + + print(restriction.dump()) # outputs a dict + print(restriction.dump_json()) # outputs a json-encoded string + + "Integrator" documentation: you code for PyPI itself ==================================================== diff --git a/docs/reference.rst b/docs/reference.rst index dc801c4..0f5eb4c 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -16,7 +16,7 @@ should not have to call the methods directly. Use `Token.restrict` and `Token.restrictions` instead. .. autoclass:: pypitoken.token.Restriction - :members: load_value, dump, dump_value, get_schema, extract_kwargs, check + :members: load, load_json, dump_json, dump, check .. autoclass:: pypitoken.NoopRestriction :show-inheritance: diff --git a/pypitoken/__init__.py b/pypitoken/__init__.py index d9dd5bd..1f5a35e 100644 --- a/pypitoken/__init__.py +++ b/pypitoken/__init__.py @@ -1,10 +1,14 @@ from .exceptions import LoaderError, PyPITokenException, ValidationError -from .token import NoopRestriction, ProjectsRestriction, Token +from .token import NoopRestriction, ProjectsRestriction, Restriction, Token __all__ = [ + # Main classes "Token", + "Restriction", + # Restriction subclasses "NoopRestriction", "ProjectsRestriction", + # Exceptions "PyPITokenException", "LoaderError", "ValidationError", diff --git a/pypitoken/token.py b/pypitoken/token.py index 552cdc9..75e2df8 100644 --- a/pypitoken/token.py +++ b/pypitoken/token.py @@ -39,14 +39,8 @@ class Restriction: Expose lower-level methods for restriction/caveat introspection. """ - def dump(self) -> str: - """ - Transform a restriction into a JSON-encoded string - """ - return json.dumps(self.dump_value()) - @staticmethod - def get_schema() -> Dict: + def _get_schema() -> Dict: """ Return a jsonschema Dict object used to validate the format of a json restriction. @@ -54,7 +48,7 @@ def get_schema() -> Dict: raise NotImplementedError @classmethod - def load_value(cls: Type[T], value: Dict) -> T: + def _load_value(cls: Type[T], value: Dict) -> T: """ Create a Restriction from the JSON value stored in the caveat @@ -70,15 +64,75 @@ def load_value(cls: Type[T], value: Dict) -> T: try: jsonschema.validate( instance=value, - schema=cls.get_schema(), + schema=cls._get_schema(), ) except jsonschema.ValidationError as exc: raise exceptions.LoaderError() from exc - return cls(**cls.extract_kwargs(value=value)) # type: ignore + return cls(**cls._extract_kwargs(value=value)) # type: ignore + + @staticmethod + def _get_subclasses() -> List[Type["Restriction"]]: + """ + List all subclasses of Restriction that we want to match against + """ + # We could use __subclasses__ but that could lead to all kinds of funky things, + # especially in a security-sensistive library. + # Tests will check this against Restriction subclasses though. + return [NoopRestriction, ProjectsRestriction] + + @staticmethod + def _json_load_caveat(caveat: str) -> Any: + try: + value = json.loads(caveat) + except Exception as exc: + raise exceptions.LoaderError(f"Error while loading caveat: {exc}") from exc + + return value + + @classmethod + def load(cls, caveat: Dict) -> "Restriction": + """ + Create a Restriction from a raw caveat restriction JSON object. + + Raises + ------ + pypitokens.LoaderError + If the format cannot be understood + + Returns + ------- + `Restriction` + """ + for subclass in cls._get_subclasses(): + try: + return subclass._load_value(value=caveat) + except exceptions.LoaderError: + continue + + raise exceptions.LoaderError( + f"Could not find matching Restriction for {caveat}" + ) + + @classmethod + def load_json(cls, caveat: str) -> "Restriction": + """ + Create a Restriction from a raw caveat restriction JSON string. + + Raises + ------ + pypitokens.LoaderError + If the format cannot be understood + + Returns + ------- + `Restriction` + """ + caveat_obj = cls._json_load_caveat(caveat=caveat) + return cls.load(caveat=caveat_obj) @classmethod - def extract_kwargs(cls, value: Dict) -> Dict: + def _extract_kwargs(cls, value: Dict) -> Dict: """ Receive the parsed JSON value of a caveat for which the schema has been validated. Return the instantiation kwargs (``__init__`` parameters). @@ -101,12 +155,18 @@ def check(self, context: Context) -> None: """ raise NotImplementedError - def dump_value(self) -> Dict: + def dump(self) -> Dict: """ - Transform a restriction into a JSON object + Transform a restriction into a JSON-compatible dict object """ raise NotImplementedError + def dump_json(self) -> str: + """ + Transform a restriction into a JSON-encoded string + """ + return json.dumps(self.dump()) + @dataclasses.dataclass class NoopRestriction(Restriction): @@ -115,7 +175,7 @@ class NoopRestriction(Restriction): """ @staticmethod - def get_schema() -> Dict: + def _get_schema() -> Dict: return { "type": "object", "properties": { @@ -127,14 +187,14 @@ def get_schema() -> Dict: } @classmethod - def extract_kwargs(cls, value: Dict) -> Dict: + def _extract_kwargs(cls, value: Dict) -> Dict: return {} def check(self, context: Context) -> None: # Always passes return - def dump_value(self) -> Dict: + def dump(self) -> Dict: return {"version": 1, "permissions": "user"} @@ -152,7 +212,7 @@ class ProjectsRestriction(Restriction): projects: List[str] @staticmethod - def get_schema() -> Dict: + def _get_schema() -> Dict: return { "type": "object", "properties": { @@ -171,10 +231,10 @@ def get_schema() -> Dict: } @classmethod - def extract_kwargs(cls, value: Dict) -> Dict: + def _extract_kwargs(cls, value: Dict) -> Dict: return {"projects": value["permissions"]["projects"]} - def dump_value(self) -> Dict: + def dump(self) -> Dict: return {"version": 1, "permissions": {"projects": self.projects}} def check(self, context: Context) -> None: @@ -187,82 +247,6 @@ def check(self, context: Context) -> None: ) -# We could use __subclasses__ but that could lead to all kinds of funky things, -# especially in a security-sensistive library -RESTRICTION_CLASSES: List[Type[Restriction]] = [NoopRestriction, ProjectsRestriction] - - -def json_load_caveat(caveat: str) -> Any: - try: - value = json.loads(caveat) - except Exception as exc: - raise exceptions.LoaderError(f"Error while loading caveat: {exc}") from exc - - return value - - -def load_restriction( - caveat: Dict, classes: List[Type[Restriction]] = RESTRICTION_CLASSES -) -> "Restriction": - """ - Create a Restriction from a raw caveat restriction JSON object. - - Raises - ------ - pypitokens.LoaderError - If the format cannot be understood - - Returns - ------- - `Restriction` - """ - for subclass in classes: - try: - return subclass.load_value(value=caveat) - except exceptions.LoaderError: - continue - - raise exceptions.LoaderError(f"Could not find matching Restriction for {caveat}") - - -def check_caveat(caveat: str, context: Context, errors: List[Exception]) -> bool: - """ - This function follows the pymacaroon Verifier.satisfy_general API, except - that it takes a context parameter. It's expected to be used with `functools.partial` - - Parameters - ---------- - caveat : str - raw caveat string that will be turned into a restriction to be checked - context : - Dict describing what we're trying to use the Token for, in order to decide - whether the restrictions expressed by the caveat apply. - - Returns - ------- - bool - True if the caveat is met, False otherwise (should not raise). - """ - # The pymacaroons API tells us to return False when a validation fails, - # which keeps us from raising a more expressive error. - # To circumvent this, we store any exception in ``errors`` - - try: - value = json_load_caveat(caveat=caveat) - restriction = load_restriction(caveat=value) - except exceptions.LoaderError as exc: - errors.append(exc) - return False - - try: - restriction.check(context=context) - except exceptions.ValidationError as exc: - errors.append(exc) - return False - - return True - - class Token: """ Create a `Token` (as PyPI itself would do), load an existing @@ -430,7 +414,7 @@ def restrict( """ caveats: List[str] = [] if projects is not None: - caveats.append(ProjectsRestriction(projects).dump()) + caveats.append(ProjectsRestriction(projects).dump_json()) # Add other restrictions here @@ -438,7 +422,7 @@ def restrict( # It's actually not really useful to add a noop restriction, but # it's done that way in the original implementation, and has been kept so # far - caveats = [NoopRestriction().dump()] + caveats = [NoopRestriction().dump_json()] for caveat in caveats: self._macaroon.add_first_party_caveat(caveat) @@ -492,22 +476,59 @@ def check( errors: List[Exception] = [] verifier.satisfy_general( - functools.partial(check_caveat, context=context, errors=errors) + functools.partial(self._check_caveat, context=context, errors=errors) ) try: verifier.verify(self._macaroon, key) # https://github.com/ecordell/pymacaroons/issues/51 except Exception as exc: - # (we know it's actually a single item, there cannot be multiple items in - # this list) if errors: - raise exceptions.ValidationError( - f"Error while validating token: {errors[0]}" - ) from errors[0] + # (we know it's actually a single item, there cannot be multiple items + # in this list) + (exc,) = errors + raise exceptions.ValidationError( f"Error while validating token: {exc}" ) from exc + @staticmethod + def _check_caveat(caveat: str, context: Context, errors: List[Exception]) -> bool: + """ + This method follows the pymacaroon Verifier.satisfy_general API, except + that it takes a context parameter. It's expected to be used with + `functools.partial` + + Parameters + ---------- + caveat : + raw caveat string that will be turned into a restriction to be checked + context : + Dict describing what we're trying to use the Token for, in order to decide + whether the restrictions expressed by the caveat apply. + errors : + If any exception is raised, it will be appended to this list, so that + we'll be able to raise them properly later + + Returns + ------- + bool + True if the caveat is met, False otherwise (should not raise). + """ + + try: + restriction = Restriction.load_json(caveat=caveat) + except exceptions.LoaderError as exc: + errors.append(exc) + return False + + try: + restriction.check(context=context) + except exceptions.ValidationError as exc: + errors.append(exc) + return False + + return True + @property def restrictions(self) -> List[Restriction]: """ @@ -524,6 +545,6 @@ def restrictions(self) -> List[Restriction]: When the existing restrictions cannot be parsed """ return [ - load_restriction(caveat=json_load_caveat(caveat=caveat.caveat_id)) + Restriction.load_json(caveat=caveat.caveat_id) for caveat in self._macaroon.caveats ] diff --git a/tests/test_token.py b/tests/test_token.py index 42aff40..bde19ec 100644 --- a/tests/test_token.py +++ b/tests/test_token.py @@ -6,12 +6,12 @@ from pypitoken import exceptions, token -def test__Restriction__dump(): +def test__Restriction__dump_json(): class MyRestriction(token.Restriction): - def dump_value(self): + def dump(self): return {"a": ["b"]} - assert MyRestriction().dump() == '{"a": ["b"]}' + assert MyRestriction().dump_json() == '{"a": ["b"]}' def test__Restriction__load_value__pass(): @@ -93,9 +93,9 @@ def test__NoopRestriction__check(): assert noop.check(context=token.Context(project="foo")) is None -def test__NoopRestriction__dump_value(): +def test__NoopRestriction__dump(): noop = token.NoopRestriction() - assert noop.dump_value() == {"version": 1, "permissions": "user"} + assert noop.dump() == {"version": 1, "permissions": "user"} @pytest.mark.parametrize( @@ -156,31 +156,31 @@ def test__ProjectsRestriction__check__fail(): restriction.check(context=token.Context(project="c")) -def test__ProjectsRestriction__dump_value(): +def test__ProjectsRestriction__dump(): restriction = token.ProjectsRestriction(projects=["a", "b"]) - assert restriction.dump_value() == { + assert restriction.dump() == { "version": 1, "permissions": {"projects": ["a", "b"]}, } -def test__RESTRICTION_CLASSES(): +def test__Restriction__get_subclasses(): # This test ensures we didn't forget to add new restriction classes to # the set. - assert set(token.RESTRICTION_CLASSES) == { + assert set(token.Restriction._get_subclasses()) == { cls for cls in token.Restriction.__subclasses__() if cls.__module__ == "pypitoken.token" } -def test__json_load_caveat__pass(): - assert token.json_load_caveat('{"a": "b"}') == {"a": "b"} +def test__Restriction__json_load_caveat__pass(): + assert token.Restriction._json_load_caveat('{"a": "b"}') == {"a": "b"} -def test__json_load_caveat__fail(): +def test__Restriction__json_load_caveat__fail(): with pytest.raises(exceptions.LoaderError) as exc_info: - token.json_load_caveat(caveat='{"a": "b"') + token.Restriction._json_load_caveat(caveat='{"a": "b"') assert ( str(exc_info.value) == "Error while loading caveat: " "Expecting ',' delimiter: line 1 column 10 (char 9)" @@ -200,22 +200,29 @@ def test__json_load_caveat__fail(): ), ], ) -def test__load_restriction__pass(caveat, output): - assert token.load_restriction(caveat=caveat) == output +def test__Restriction__load__pass(caveat, output): + assert token.Restriction.load(caveat=caveat) == output -def test__load_restriction__fail(): +def test__Restriction__load__fail(): with pytest.raises(exceptions.LoaderError) as exc_info: - token.load_restriction(caveat={"version": 1, "permissions": "something"}) + token.Restriction.load(caveat={"version": 1, "permissions": "something"}) assert ( str(exc_info.value) == "Could not find matching Restriction for {'version': 1, 'permissions': 'something'}" ) -def test__check_caveat__pass(): +def test__Restriction__load_json(): + restriction = token.Restriction.load_json( + caveat='{"version": 1, "permissions": "user"}' + ) + assert restriction == token.NoopRestriction() + + +def test__Token__check_caveat__pass(): errors = [] - value = token.check_caveat( + value = token.Token._check_caveat( caveat='{"version": 1, "permissions": {"projects": ["a", "b"]}}', context=token.Context(project="a"), errors=errors, @@ -226,7 +233,9 @@ def test__check_caveat__pass(): def test__check_caveat__fail_load_json(): errors = [] - value = token.check_caveat("{", context=token.Context(project="a"), errors=errors) + value = token.Token._check_caveat( + "{", context=token.Context(project="a"), errors=errors + ) assert value is False messages = [str(e) for e in errors] assert messages == [ @@ -238,7 +247,7 @@ def test__check_caveat__fail_load_json(): def test__check_caveat__fail_load(): errors = [] - value = token.check_caveat( + value = token.Token._check_caveat( '{"version": 13}', context=token.Context(project="a"), errors=errors ) assert value is False @@ -249,7 +258,7 @@ def test__check_caveat__fail_load(): def test__check_caveat__fail_check(): errors = [] - value = token.check_caveat( + value = token.Token._check_caveat( '{"version": 1, "permissions": {"projects": ["a", "b"]}}', context=token.Context(project="c"), errors=errors,