diff --git a/src/pytest_celery/vendors/worker/Dockerfile b/src/pytest_celery/vendors/worker/Dockerfile index ad885a31..23fce2d4 100644 --- a/src/pytest_celery/vendors/worker/Dockerfile +++ b/src/pytest_celery/vendors/worker/Dockerfile @@ -25,6 +25,8 @@ RUN pip install --no-cache-dir --upgrade pip \ # The workdir must be /app WORKDIR /app +COPY app.py app.py + # Switch to the test_user USER test_user diff --git a/src/pytest_celery/vendors/worker/app.py b/src/pytest_celery/vendors/worker/app.py index d92ecdfb..521fdf65 100644 --- a/src/pytest_celery/vendors/worker/app.py +++ b/src/pytest_celery/vendors/worker/app.py @@ -1,13 +1,23 @@ +import json import logging import sys from celery import Celery from celery.signals import after_setup_logger -# Will be replaced with the import strings at runtime -{} +config_updates = None +name = "celery_test_app" # Default name if not provided by the initial content -app = Celery("celery_test_app") +# Will be populated accoring to the initial content +{0} +{1} +app = Celery(name) + +{2} + +if config_updates: + app.config_from_object(config_updates) + print(f"Config updates from default_worker_app fixture: {json.dumps(config_updates, indent=4)}") @after_setup_logger.connect diff --git a/src/pytest_celery/vendors/worker/container.py b/src/pytest_celery/vendors/worker/container.py index 131fc590..89c1965b 100644 --- a/src/pytest_celery/vendors/worker/container.py +++ b/src/pytest_celery/vendors/worker/container.py @@ -1,6 +1,9 @@ import inspect from typing import Union +from celery import Celery +from celery.app.base import PendingConfiguration + from pytest_celery.api.container import CeleryTestContainer from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_ENV from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_LOG_LEVEL @@ -47,7 +50,12 @@ def env(cls, celery_worker_cluster_config: dict) -> dict: return {**DEFAULT_WORKER_ENV, **env} @classmethod - def initial_content(cls, worker_tasks: set, worker_signals: Union[set, None] = None) -> dict: + def initial_content( + cls, + worker_tasks: set, + worker_signals: Union[set, None] = None, + worker_app: Union[Celery, None] = None, + ) -> dict: from pytest_celery.vendors.worker import app as app_module app_module_src = inspect.getsource(app_module) @@ -58,10 +66,38 @@ def initial_content(cls, worker_tasks: set, worker_signals: Union[set, None] = N if worker_signals: initial_content.update(cls._initial_content_worker_signals(worker_signals)) imports["signals_imports"] = initial_content.pop("signals_imports") + if worker_app: + # Accessing the worker_app.conf.changes.data property will trigger the PendingConfiguration to be resolved + # and the changes will be applied to the worker_app.conf, so we make a clone app to avoid affecting the + # original app object. + app = Celery(worker_app.main) + app.conf = worker_app.conf + config_changes_from_defaults = app.conf.changes.copy() + if isinstance(config_changes_from_defaults, PendingConfiguration): + config_changes_from_defaults = config_changes_from_defaults.data.changes + if not isinstance(config_changes_from_defaults, dict): + raise TypeError(f"Unexpected type for config_changes: {type(config_changes_from_defaults)}") + del config_changes_from_defaults["deprecated_settings"] + + name_code = f'name = "{worker_app.main}"' + else: + config_changes_from_defaults = {} + name_code = f'name = "{cls.worker_name()}"' imports_format = "{%s}" % "}{".join(imports.keys()) - app_module_src = app_module_src.format(imports_format) - app_module_src = app_module_src.format(**imports) + imports_format = imports_format.format(**imports) + app_module_src = app_module_src.replace("{0}", imports_format) + + app_module_src = app_module_src.replace("{1}", name_code) + + config_items = (f" {repr(key)}: {repr(value)}" for key, value in config_changes_from_defaults.items()) + config_code = ( + "config_updates = {\n" + ",\n".join(config_items) + "\n}" + if config_changes_from_defaults + else "config_updates = {}" + ) + app_module_src = app_module_src.replace("{2}", config_code) + initial_content["app.py"] = app_module_src.encode() return initial_content diff --git a/src/pytest_celery/vendors/worker/fixtures.py b/src/pytest_celery/vendors/worker/fixtures.py index ae7bd8da..7a66eae3 100644 --- a/src/pytest_celery/vendors/worker/fixtures.py +++ b/src/pytest_celery/vendors/worker/fixtures.py @@ -20,11 +20,11 @@ def celery_setup_worker( default_worker_cls: Type[CeleryTestWorker], default_worker_container: CeleryWorkerContainer, - celery_setup_app: Celery, + default_worker_app: Celery, ) -> CeleryTestWorker: worker = default_worker_cls( container=default_worker_container, - app=celery_setup_app, + app=default_worker_app, ) yield worker worker.teardown() @@ -103,10 +103,12 @@ def default_worker_initial_content( default_worker_container_cls: Type[CeleryWorkerContainer], default_worker_tasks: set, default_worker_signals: set, + default_worker_app: Celery, ) -> dict: yield default_worker_container_cls.initial_content( worker_tasks=default_worker_tasks, worker_signals=default_worker_signals, + worker_app=default_worker_app, ) @@ -118,3 +120,8 @@ def default_worker_tasks(default_worker_container_cls: Type[CeleryWorkerContaine @pytest.fixture def default_worker_signals(default_worker_container_cls: Type[CeleryWorkerContainer]) -> set: yield default_worker_container_cls.signals_modules() + + +@pytest.fixture +def default_worker_app(celery_setup_app: Celery) -> Celery: + yield celery_setup_app diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index 6d4c6b93..acf1b416 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -79,8 +79,12 @@ def default_worker_container_cls() -> Type[CeleryWorkerContainer]: def worker_test_container_initial_content( default_worker_container_cls: Type[CeleryWorkerContainer], worker_test_container_tasks: set, + worker_test_container_signals: set, ) -> dict: - yield default_worker_container_cls.initial_content(worker_test_container_tasks) + yield default_worker_container_cls.initial_content( + worker_tasks=worker_test_container_tasks, + worker_signals=worker_test_container_signals, + ) @pytest.fixture(scope="session") @@ -88,6 +92,11 @@ def worker_test_container_tasks(default_worker_container_cls: Type[CeleryWorkerC yield default_worker_container_cls.tasks_modules() +@pytest.fixture(scope="session") +def worker_test_container_signals(default_worker_container_cls: Type[CeleryWorkerContainer]) -> set: + yield default_worker_container_cls.signals_modules() + + worker_test_container = container( image="{celery_unit_worker_image.id}", scope="session", diff --git a/tox.ini b/tox.ini index 2902cc4d..7b8de70e 100644 --- a/tox.ini +++ b/tox.ini @@ -57,7 +57,7 @@ setenv = PYTHONDONTWRITEBYTECODE = 1 commands = tox -e py311-unit,py311-integration,py311-smoke -p auto -o -- --exitfirst -n auto \ - --reruns 10 --reruns-delay 20 --rerun-except AssertionError {posargs} + --reruns 15 --reruns-delay 10 --rerun-except AssertionError {posargs} [testenv:mypy] commands_pre =