From f3e9bb30a36cfdb40a0019fe3a11526981e735fd Mon Sep 17 00:00:00 2001 From: Tomer Nosrati Date: Thu, 7 Dec 2023 19:13:09 +0200 Subject: [PATCH] Refactored worker intial content code (#98) * Fixed paths in .github/workflows/docker.yml * Refactored worker intial content code * Added unit tests * Added new fixture: default_worker_app_module() --- .github/workflows/docker.yml | 4 +- src/pytest_celery/vendors/worker/app.py | 19 +- src/pytest_celery/vendors/worker/container.py | 112 +++-------- src/pytest_celery/vendors/worker/fixtures.py | 9 + src/pytest_celery/vendors/worker/volume.py | 127 ++++++++++++ tests/conftest.py | 5 +- tests/integration/vendors/test_worker.py | 15 ++ tests/unit/vendors/test_worker/__init__.py | 0 tests/unit/vendors/test_worker/test_volume.py | 187 ++++++++++++++++++ .../vendors/{ => test_worker}/test_worker.py | 12 +- 10 files changed, 383 insertions(+), 107 deletions(-) create mode 100644 src/pytest_celery/vendors/worker/volume.py create mode 100644 tests/unit/vendors/test_worker/__init__.py create mode 100644 tests/unit/vendors/test_worker/test_volume.py rename tests/unit/vendors/{ => test_worker}/test_worker.py (91%) diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 224ab154..2f0c6c2d 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -4,13 +4,13 @@ on: pull_request: branches: [ 'main'] paths: - - '/src/pytest_celery/vendors/worker/**' + - 'src/pytest_celery/vendors/worker/**' - '.github/workflows/docker.yml' - 'Dockerfile' push: branches: [ 'main'] paths: - - '/src/pytest_celery/vendors/worker/**' + - 'src/pytest_celery/vendors/worker/**' - '.github/workflows/docker.yml' - 'Dockerfile' diff --git a/src/pytest_celery/vendors/worker/app.py b/src/pytest_celery/vendors/worker/app.py index 521fdf65..a3fa2624 100644 --- a/src/pytest_celery/vendors/worker/app.py +++ b/src/pytest_celery/vendors/worker/app.py @@ -1,3 +1,5 @@ +""" Template for Celery worker application. """ + import json import logging import sys @@ -5,19 +7,14 @@ from celery import Celery from celery.signals import after_setup_logger -config_updates = None -name = "celery_test_app" # Default name if not provided by the initial content - -# Will be populated accoring to the initial content -{0} -{1} -app = Celery(name) +imports = None -{2} +app = Celery("celery_test_app") +config = None -if config_updates: - app.config_from_object(config_updates) - print(f"Config updates from default_worker_app fixture: {json.dumps(config_updates, indent=4)}") +if config: + app.config_from_object(config) + print(f"Changed worker configuration: {json.dumps(config, indent=4)}") @after_setup_logger.connect diff --git a/src/pytest_celery/vendors/worker/container.py b/src/pytest_celery/vendors/worker/container.py index a2a7c410..7c0cc35c 100644 --- a/src/pytest_celery/vendors/worker/container.py +++ b/src/pytest_celery/vendors/worker/container.py @@ -1,9 +1,8 @@ from __future__ import annotations -import inspect +from types import ModuleType from celery import Celery -from celery.app.base import PendingConfiguration from pytest_celery.api.container import CeleryTestContainer from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_ENV @@ -11,6 +10,7 @@ from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_NAME from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_QUEUE from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_VERSION +from pytest_celery.vendors.worker.volume import WorkerInitialContent class CeleryWorkerContainer(CeleryTestContainer): @@ -37,9 +37,17 @@ def worker_name(cls) -> str: def worker_queue(cls) -> str: return DEFAULT_WORKER_QUEUE + @classmethod + def app_module(cls) -> ModuleType: + from pytest_celery.vendors.worker import app + + return app + @classmethod def tasks_modules(cls) -> set: - return set() + from pytest_celery.vendors.worker import tasks + + return {tasks} @classmethod def signals_modules(cls) -> set: @@ -76,96 +84,24 @@ def env(cls, celery_worker_cluster_config: dict, initial: dict | None = None) -> @classmethod def initial_content( cls, - worker_tasks: set, + worker_tasks: set | None = None, worker_signals: set | None = None, worker_app: Celery | None = None, + app_module: ModuleType | None = None, ) -> dict: - from pytest_celery.vendors.worker import app as app_module + if app_module is None: + app_module = cls.app_module() - app_module_src = inspect.getsource(app_module) + if worker_tasks is None: + worker_tasks = cls.tasks_modules() - imports = dict() - initial_content = cls._initial_content_worker_tasks(worker_tasks) - imports["tasks_imports"] = initial_content.pop("tasks_imports") + content = WorkerInitialContent() + content.set_app_module(app_module) + content.add_modules("tasks", worker_tasks) if worker_signals: - initial_content.update(cls._initial_content_worker_signals(worker_signals)) - imports["signals_imports"] = initial_content.pop("signals_imports") + content.add_modules("signals", worker_signals) if worker_app: - # Accessing the worker_app.conf.changes.data property will trigger the PendingConfiguration to be resolved - # and the changes will be applied to the worker_app.conf, so we make a clone app to avoid affecting the - # original app object. - app = Celery(worker_app.main) - app.conf = worker_app.conf - config_changes_from_defaults = app.conf.changes.copy() - if isinstance(config_changes_from_defaults, PendingConfiguration): - config_changes_from_defaults = config_changes_from_defaults.data.changes - if not isinstance(config_changes_from_defaults, dict): - raise TypeError(f"Unexpected type for config_changes: {type(config_changes_from_defaults)}") - del config_changes_from_defaults["deprecated_settings"] - - name_code = f'name = "{worker_app.main}"' - else: - config_changes_from_defaults = {} - name_code = f'name = "{cls.worker_name()}"' - - imports_format = "{%s}" % "}{".join(imports.keys()) - imports_format = imports_format.format(**imports) - app_module_src = app_module_src.replace("{0}", imports_format) - - app_module_src = app_module_src.replace("{1}", name_code) - - config_items = (f" {repr(key)}: {repr(value)}" for key, value in config_changes_from_defaults.items()) - config_code = ( - "config_updates = {\n" + ",\n".join(config_items) + "\n}" - if config_changes_from_defaults - else "config_updates = {}" - ) - app_module_src = app_module_src.replace("{2}", config_code) - - initial_content["app.py"] = app_module_src.encode() - return initial_content - - @classmethod - def _initial_content_worker_tasks(cls, worker_tasks: set) -> dict: - from pytest_celery.vendors.worker import tasks - - worker_tasks.add(tasks) + content.set_app_name(worker_app.main) + content.set_config_from_object(worker_app) - import_string = "" - - for module in worker_tasks: - import_string += f"from {module.__name__} import *\n" - - initial_content = { - "__init__.py": b"", - "tasks_imports": import_string, - } - if worker_tasks: - default_worker_tasks_src = { - f"{module.__name__.replace('.', '/')}.py": inspect.getsource(module).encode() for module in worker_tasks - } - initial_content.update(default_worker_tasks_src) - else: - print("No tasks found") - return initial_content - - @classmethod - def _initial_content_worker_signals(cls, worker_signals: set) -> dict: - import_string = "" - - for module in worker_signals: - import_string += f"from {module.__name__} import *\n" - - initial_content = { - "__init__.py": b"", - "signals_imports": import_string, - } - if worker_signals: - default_worker_signals_src = { - f"{module.__name__.replace('.', '/')}.py": inspect.getsource(module).encode() - for module in worker_signals - } - initial_content.update(default_worker_signals_src) - else: - print("No signals found") - return initial_content + return content.generate() diff --git a/src/pytest_celery/vendors/worker/fixtures.py b/src/pytest_celery/vendors/worker/fixtures.py index fe71baa7..2ca78e53 100644 --- a/src/pytest_celery/vendors/worker/fixtures.py +++ b/src/pytest_celery/vendors/worker/fixtures.py @@ -2,6 +2,8 @@ from __future__ import annotations +from types import ModuleType + import pytest from celery import Celery from pytest_docker_tools import build @@ -101,17 +103,24 @@ def default_worker_env( @pytest.fixture def default_worker_initial_content( default_worker_container_cls: type[CeleryWorkerContainer], + default_worker_app_module: ModuleType, default_worker_tasks: set, default_worker_signals: set, default_worker_app: Celery, ) -> dict: yield default_worker_container_cls.initial_content( + app_module=default_worker_app_module, worker_tasks=default_worker_tasks, worker_signals=default_worker_signals, worker_app=default_worker_app, ) +@pytest.fixture +def default_worker_app_module(default_worker_container_cls: type[CeleryWorkerContainer]) -> ModuleType: + yield default_worker_container_cls.app_module() + + @pytest.fixture def default_worker_tasks(default_worker_container_cls: type[CeleryWorkerContainer]) -> set: yield default_worker_container_cls.tasks_modules() diff --git a/src/pytest_celery/vendors/worker/volume.py b/src/pytest_celery/vendors/worker/volume.py new file mode 100644 index 00000000..aa8b4a9f --- /dev/null +++ b/src/pytest_celery/vendors/worker/volume.py @@ -0,0 +1,127 @@ +from __future__ import annotations + +import inspect +from types import ModuleType +from typing import Any + +from celery import Celery +from celery.app.base import PendingConfiguration + +from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_APP_NAME + + +class WorkerInitialContent: + class Parser: + def imports_str(self, modules: set[ModuleType]) -> str: + return "".join(f"from {module.__name__} import *\n" for module in modules) + + def imports_src(self, modules: set[ModuleType]) -> dict: + src = dict() + for module in modules: + src[f"{module.__name__.replace('.', '/')}.py"] = inspect.getsource(module).encode() + return src + + def app_name(self, name: str | None = None) -> str: + name = name or DEFAULT_WORKER_APP_NAME + return f"app = Celery('{name}')" + + def config(self, app: Celery | None = None) -> str: + app = app or Celery(DEFAULT_WORKER_APP_NAME) + + # Accessing the app.conf.changes.data property will trigger the PendingConfiguration to be resolved + # and the changes will be applied to the app.conf, so we make a clone app to avoid affecting the + # original app object. + tmp_app = Celery(app.main) + tmp_app.conf = app.conf + + changes = tmp_app.conf.changes.copy() + if isinstance(changes, PendingConfiguration): + changes = changes.data.changes + if not isinstance(changes, dict): + raise TypeError(f"Unexpected type for app.conf.changes: {type(changes)}") + del changes["deprecated_settings"] + + if changes: + changes = (f"\t{repr(key)}: {repr(value)}" for key, value in changes.items()) + config = "config = {\n" + ",\n".join(changes) + "\n}" if changes else "config = None" + else: + config = "config = None" + return config + + def __init__(self, app_module: ModuleType | None = None) -> None: + self.parser = self.Parser() + self._initial_content = { + "__init__.py": b"", + "imports": dict(), + } + self.set_app_module(app_module) + self.set_app_name() + self.set_config_from_object() + + def __eq__(self, __value: object) -> bool: + if not isinstance(__value, WorkerInitialContent): + return False + try: + return self.generate() == __value.generate() + except ValueError: + return all( + [ + self._app_module_src == __value._app_module_src, + self._initial_content == __value._initial_content, + self._app == __value._app, + self._config == __value._config, + ] + ) + + def set_app_module(self, app_module: ModuleType | None = None) -> None: + self._app_module_src: str | None + + if app_module: + self._app_module_src = inspect.getsource(app_module) + else: + self._app_module_src = None + + def add_modules(self, name: str, modules: set[ModuleType]) -> None: + if not name: + raise ValueError("name cannot be empty") + + if not modules: + raise ValueError("modules cannot be empty") + + self._initial_content["imports"][name] = self.parser.imports_str(modules) # type: ignore + self._initial_content.update(self.parser.imports_src(modules)) + + def set_app_name(self, name: str | None = None) -> None: + name = name or DEFAULT_WORKER_APP_NAME + self._app = self.parser.app_name(name) + + def set_config_from_object(self, app: Celery | None = None) -> None: + self._config = self.parser.config(app) + + def generate(self) -> dict: + if not self._app_module_src: + raise ValueError("Please set_app_module() before calling generate()") + + initial_content = self._initial_content.copy() + + if not initial_content["imports"]: + raise ValueError("Please add_modules() before calling generate()") + + _imports: dict | Any = initial_content.pop("imports") + imports = "{%s}" % "}{".join(_imports.keys()) + imports = imports.format(**_imports) + + app, config = self._app, self._config + + replacement_args = { + "imports": "imports = None", + "app": f'app = Celery("{DEFAULT_WORKER_APP_NAME}")', + "config": "config = None", + } + self._app_module_src = self._app_module_src.replace(replacement_args["imports"], imports) + self._app_module_src = self._app_module_src.replace(replacement_args["app"], app) + self._app_module_src = self._app_module_src.replace(replacement_args["config"], config) + + initial_content["app.py"] = self._app_module_src.encode() + + return initial_content diff --git a/tests/conftest.py b/tests/conftest.py index 24337780..5be90e65 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -2,7 +2,8 @@ @pytest.fixture -def default_worker_tasks() -> set: +def default_worker_tasks(default_worker_tasks: set) -> set: from tests import tasks - yield {tasks} + default_worker_tasks.add(tasks) + yield default_worker_tasks diff --git a/tests/integration/vendors/test_worker.py b/tests/integration/vendors/test_worker.py index 23d9412b..33a7c46d 100644 --- a/tests/integration/vendors/test_worker.py +++ b/tests/integration/vendors/test_worker.py @@ -1,5 +1,7 @@ from __future__ import annotations +from types import ModuleType + import pytest from pytest_lazyfixture import lazy_fixture @@ -32,6 +34,19 @@ def test_disabling_backend_cluster(self, container: CeleryWorkerContainer): results = DEFAULT_WORKER_ENV["CELERY_BROKER_URL"] assert container.logs().count(f"transport: {results}") + class test_replacing_app_module: + @pytest.fixture(params=["Default", "Custom"]) + def default_worker_app_module(self, request: pytest.FixtureRequest) -> ModuleType: + if request.param == "Default": + yield request.getfixturevalue("default_worker_app_module") + else: + from pytest_celery.vendors.worker import app + + yield app + + def test_replacing_app_module(self, container: CeleryWorkerContainer, default_worker_app_module: ModuleType): + assert container.app_module() == default_worker_app_module + @pytest.mark.parametrize("worker", [lazy_fixture(CELERY_SETUP_WORKER)]) class test_base_test_worker: diff --git a/tests/unit/vendors/test_worker/__init__.py b/tests/unit/vendors/test_worker/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/vendors/test_worker/test_volume.py b/tests/unit/vendors/test_worker/test_volume.py new file mode 100644 index 00000000..00568b0f --- /dev/null +++ b/tests/unit/vendors/test_worker/test_volume.py @@ -0,0 +1,187 @@ +from __future__ import annotations + +import inspect +from types import ModuleType + +import pytest +from celery import Celery + +import pytest_celery +from pytest_celery import DEFAULT_WORKER_APP_NAME +from pytest_celery import WorkerInitialContent + + +class test_worker_initial_content: + @pytest.mark.parametrize("parser", [WorkerInitialContent.Parser()]) + class test_parser: + @pytest.mark.parametrize( + "modules", + [ + set(), + {pytest}, + {pytest, pytest_celery}, + ], + ) + class test_imports: + def test_imports_str(self, parser: WorkerInitialContent.Parser, modules: set[ModuleType]): + assert parser.imports_str(modules) == "".join( + f"from {module.__name__} import *\n" for module in modules + ) + + def test_imports_src(self, parser: WorkerInitialContent.Parser, modules: set[ModuleType]): + assert parser.imports_src(modules) == { + f"{module.__name__.replace('.', '/')}.py": inspect.getsource(module).encode() for module in modules + } + + @pytest.mark.parametrize( + "name", + [ + None, + "app", + "test_app", + ], + ) + def test_app_name(self, parser: WorkerInitialContent.Parser, name: str): + assert parser.app_name(name) == f"app = Celery('{name or DEFAULT_WORKER_APP_NAME}')" + + @pytest.mark.parametrize( + "app,config", + [ + (None, None), + (Celery("test"), None), + (Celery(), {"broker_url": "420"}), + ], + ) + def test_config(self, parser: WorkerInitialContent.Parser, app: Celery | None, config: dict | None): + if app and config: + app.conf.update(config) + + if config: + changes = (f"\t{repr(key)}: {repr(value)}" for key, value in config.items()) + expected_config = "config = {\n" + ",\n".join(changes) + "\n}" if changes else "config = None" + else: + expected_config = "config = None" + + assert parser.config(app) == expected_config + + def test_init(self): + actual_content = WorkerInitialContent() + assert isinstance(actual_content.parser, WorkerInitialContent.Parser) + assert actual_content._initial_content == { + "__init__.py": b"", + "imports": dict(), + } + assert actual_content._app == f"app = Celery('{DEFAULT_WORKER_APP_NAME}')" + assert actual_content._config == "config = None" + + def test_eq(self): + from pytest_celery.vendors.worker import app as app_module + + actual_content = WorkerInitialContent() + + assert actual_content == WorkerInitialContent() + assert actual_content == actual_content + + actual_content.set_app_module(app_module) + actual_content.add_modules("pytest", {pytest}) + assert actual_content != WorkerInitialContent() + + def test_set_app_module(self): + from pytest_celery.vendors.worker import app as app_module + + actual_content = WorkerInitialContent() + + actual_content.set_app_module(app_module) + assert actual_content._app_module_src == inspect.getsource(app_module) + + actual_content.set_app_module(None) + assert actual_content._app_module_src is None + + @pytest.mark.parametrize( + "modules", + [ + { + "pytest": {pytest}, + }, + { + "pytest": {pytest}, + "pytest_celery": {pytest_celery}, + }, + { + "pytest": {pytest, pytest_celery}, + "pytest_celery": {pytest, pytest_celery}, + }, + ], + ) + def test_add_modules(self, modules: dict[str, set[ModuleType]]): + from pytest_celery.vendors.worker import app as app_module + + actual_content = WorkerInitialContent() + + expected_content = WorkerInitialContent() + actual_content.set_app_module(app_module) + expected_content.set_app_module(app_module) + + for module_name, module_set in modules.items(): + actual_content.add_modules(module_name, module_set) + + actual_intial_content = actual_content._initial_content + assert "imports" in actual_intial_content + assert modules.keys() == actual_intial_content["imports"].keys() + + for module_name, modules_set in modules.items(): + expected_content._initial_content["imports"][module_name] = actual_content.parser.imports_str(modules_set) + expected_content._initial_content.update(actual_content.parser.imports_src(modules_set)) + + assert actual_content == expected_content + + def test_set_app_name(self): + actual_content = WorkerInitialContent() + + actual_content.set_app_name() + assert actual_content._app == f"app = Celery('{DEFAULT_WORKER_APP_NAME}')" + + actual_content.set_app_name("test") + assert actual_content._app == "app = Celery('test')" + + def test_set_config_from_object(self): + actual_content = WorkerInitialContent() + + actual_content.set_config_from_object() + assert actual_content._config == "config = None" + + actual_content.set_config_from_object(Celery("test")) + assert actual_content._config == "config = None" + + actual_content.set_config_from_object(Celery("test", broker_url="420")) + assert actual_content._config == "config = {\n\t'broker_url': '420'\n}" + + class test_generate: + def test_generate_default(self): + actual_content = WorkerInitialContent() + + with pytest.raises(ValueError): + actual_content.generate() + + @pytest.mark.parametrize( + "app", + [ + None, + Celery("test"), + Celery("test", broker_url="420"), + ], + ) + def test_generate(self, app: Celery | None): + from pytest_celery.vendors.worker import app as app_module + + actual_content = WorkerInitialContent() + + actual_content.set_app_module(app_module) + actual_content.add_modules("tests_modules", {pytest, pytest_celery}) + actual_content.set_config_from_object(app) + + expected_content = WorkerInitialContent(app_module) + expected_content.add_modules("tests_modules", {pytest, pytest_celery}) + expected_content.set_config_from_object(app) + + assert actual_content.generate() == expected_content.generate() diff --git a/tests/unit/vendors/test_worker.py b/tests/unit/vendors/test_worker/test_worker.py similarity index 91% rename from tests/unit/vendors/test_worker.py rename to tests/unit/vendors/test_worker/test_worker.py index a1adca7e..75d85220 100644 --- a/tests/unit/vendors/test_worker.py +++ b/tests/unit/vendors/test_worker/test_worker.py @@ -27,8 +27,15 @@ def test_worker_name(self): def test_worker_queue(self): assert CeleryWorkerContainer.worker_queue() == DEFAULT_WORKER_QUEUE + def test_app_module(self): + from pytest_celery.vendors.worker import app + + assert CeleryWorkerContainer.app_module() == app + def test_tasks_modules(self): - assert CeleryWorkerContainer.tasks_modules() == set() + from pytest_celery.vendors.worker import tasks + + assert CeleryWorkerContainer.tasks_modules() == {tasks} def test_signals_modules(self): assert CeleryWorkerContainer.signals_modules() == set() @@ -77,6 +84,3 @@ def test_initial_content_import_formatting(self): actual_initial_content = CeleryWorkerContainer.initial_content({tasks}) assert "from tests.tasks import *" in str(actual_initial_content["app.py"]) - - def test_task_modules(self): - assert CeleryWorkerContainer.tasks_modules() == set()