Skip to content

Commit

Permalink
Worker container Celery app can now be pre-configured before the work…
Browse files Browse the repository at this point in the history
…er starts (#64)

* Improved parallel tox env resiliency (again)

* Added signals support to unit tests

* Added app.py to worker Dockerfile

* New default_worker_app() to override worker config (before the worker runs)
  • Loading branch information
Nusnus authored Oct 14, 2023
1 parent 0ae9a51 commit 01aafda
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 10 deletions.
2 changes: 2 additions & 0 deletions src/pytest_celery/vendors/worker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ RUN pip install --no-cache-dir --upgrade pip \
# The workdir must be /app
WORKDIR /app

COPY app.py app.py

# Switch to the test_user
USER test_user

Expand Down
16 changes: 13 additions & 3 deletions src/pytest_celery/vendors/worker/app.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
import json
import logging
import sys

from celery import Celery
from celery.signals import after_setup_logger

# Will be replaced with the import strings at runtime
{}
config_updates = None
name = "celery_test_app" # Default name if not provided by the initial content

app = Celery("celery_test_app")
# Will be populated accoring to the initial content
{0}
{1}
app = Celery(name)

{2}

if config_updates:
app.config_from_object(config_updates)
print(f"Config updates from default_worker_app fixture: {json.dumps(config_updates, indent=4)}")


@after_setup_logger.connect
Expand Down
42 changes: 39 additions & 3 deletions src/pytest_celery/vendors/worker/container.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import inspect
from typing import Union

from celery import Celery
from celery.app.base import PendingConfiguration

from pytest_celery.api.container import CeleryTestContainer
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_ENV
from pytest_celery.vendors.worker.defaults import DEFAULT_WORKER_LOG_LEVEL
Expand Down Expand Up @@ -47,7 +50,12 @@ def env(cls, celery_worker_cluster_config: dict) -> dict:
return {**DEFAULT_WORKER_ENV, **env}

@classmethod
def initial_content(cls, worker_tasks: set, worker_signals: Union[set, None] = None) -> dict:
def initial_content(
cls,
worker_tasks: set,
worker_signals: Union[set, None] = None,
worker_app: Union[Celery, None] = None,
) -> dict:
from pytest_celery.vendors.worker import app as app_module

app_module_src = inspect.getsource(app_module)
Expand All @@ -58,10 +66,38 @@ def initial_content(cls, worker_tasks: set, worker_signals: Union[set, None] = N
if worker_signals:
initial_content.update(cls._initial_content_worker_signals(worker_signals))
imports["signals_imports"] = initial_content.pop("signals_imports")
if worker_app:
# Accessing the worker_app.conf.changes.data property will trigger the PendingConfiguration to be resolved
# and the changes will be applied to the worker_app.conf, so we make a clone app to avoid affecting the
# original app object.
app = Celery(worker_app.main)
app.conf = worker_app.conf
config_changes_from_defaults = app.conf.changes.copy()
if isinstance(config_changes_from_defaults, PendingConfiguration):
config_changes_from_defaults = config_changes_from_defaults.data.changes
if not isinstance(config_changes_from_defaults, dict):
raise TypeError(f"Unexpected type for config_changes: {type(config_changes_from_defaults)}")
del config_changes_from_defaults["deprecated_settings"]

name_code = f'name = "{worker_app.main}"'
else:
config_changes_from_defaults = {}
name_code = f'name = "{cls.worker_name()}"'

imports_format = "{%s}" % "}{".join(imports.keys())
app_module_src = app_module_src.format(imports_format)
app_module_src = app_module_src.format(**imports)
imports_format = imports_format.format(**imports)
app_module_src = app_module_src.replace("{0}", imports_format)

app_module_src = app_module_src.replace("{1}", name_code)

config_items = (f" {repr(key)}: {repr(value)}" for key, value in config_changes_from_defaults.items())
config_code = (
"config_updates = {\n" + ",\n".join(config_items) + "\n}"
if config_changes_from_defaults
else "config_updates = {}"
)
app_module_src = app_module_src.replace("{2}", config_code)

initial_content["app.py"] = app_module_src.encode()
return initial_content

Expand Down
11 changes: 9 additions & 2 deletions src/pytest_celery/vendors/worker/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
def celery_setup_worker(
default_worker_cls: Type[CeleryTestWorker],
default_worker_container: CeleryWorkerContainer,
celery_setup_app: Celery,
default_worker_app: Celery,
) -> CeleryTestWorker:
worker = default_worker_cls(
container=default_worker_container,
app=celery_setup_app,
app=default_worker_app,
)
yield worker
worker.teardown()
Expand Down Expand Up @@ -103,10 +103,12 @@ def default_worker_initial_content(
default_worker_container_cls: Type[CeleryWorkerContainer],
default_worker_tasks: set,
default_worker_signals: set,
default_worker_app: Celery,
) -> dict:
yield default_worker_container_cls.initial_content(
worker_tasks=default_worker_tasks,
worker_signals=default_worker_signals,
worker_app=default_worker_app,
)


Expand All @@ -118,3 +120,8 @@ def default_worker_tasks(default_worker_container_cls: Type[CeleryWorkerContaine
@pytest.fixture
def default_worker_signals(default_worker_container_cls: Type[CeleryWorkerContainer]) -> set:
yield default_worker_container_cls.signals_modules()


@pytest.fixture
def default_worker_app(celery_setup_app: Celery) -> Celery:
yield celery_setup_app
11 changes: 10 additions & 1 deletion tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,24 @@ def default_worker_container_cls() -> Type[CeleryWorkerContainer]:
def worker_test_container_initial_content(
default_worker_container_cls: Type[CeleryWorkerContainer],
worker_test_container_tasks: set,
worker_test_container_signals: set,
) -> dict:
yield default_worker_container_cls.initial_content(worker_test_container_tasks)
yield default_worker_container_cls.initial_content(
worker_tasks=worker_test_container_tasks,
worker_signals=worker_test_container_signals,
)


@pytest.fixture(scope="session")
def worker_test_container_tasks(default_worker_container_cls: Type[CeleryWorkerContainer]) -> set:
yield default_worker_container_cls.tasks_modules()


@pytest.fixture(scope="session")
def worker_test_container_signals(default_worker_container_cls: Type[CeleryWorkerContainer]) -> set:
yield default_worker_container_cls.signals_modules()


worker_test_container = container(
image="{celery_unit_worker_image.id}",
scope="session",
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ setenv =
PYTHONDONTWRITEBYTECODE = 1
commands =
tox -e py311-unit,py311-integration,py311-smoke -p auto -o -- --exitfirst -n auto \
--reruns 10 --reruns-delay 20 --rerun-except AssertionError {posargs}
--reruns 15 --reruns-delay 10 --rerun-except AssertionError {posargs}

[testenv:mypy]
commands_pre =
Expand Down

0 comments on commit 01aafda

Please sign in to comment.