From 439d7c165a70738725eda44f8395012e745266db Mon Sep 17 00:00:00 2001 From: Tomer Nosrati Date: Tue, 13 Feb 2024 21:59:51 +0200 Subject: [PATCH] Feature: New worker fixture "default_worker_command" --- docs/getting-started/next-steps.rst | 1 + docs/userguide/examples/django.rst | 1 + docs/userguide/examples/myworker.rst | 1 + examples/django/tests/conftest.py | 1 + examples/myworker/tests/myworker/myworker.py | 1 + src/pytest_celery/api/container.py | 3 --- src/pytest_celery/vendors/worker/container.py | 16 +++++++++++++ src/pytest_celery/vendors/worker/fixtures.py | 14 +++++++++++ .../integration/api/custom_setup/conftest.py | 2 ++ tests/integration/conftest.py | 1 + tests/integration/vendors/test_worker.py | 8 +++++++ tests/smoke/conftest.py | 2 ++ tests/smoke/test_worker.py | 24 +++++++++++++++++++ 13 files changed, 72 insertions(+), 3 deletions(-) create mode 100644 tests/smoke/test_worker.py diff --git a/docs/getting-started/next-steps.rst b/docs/getting-started/next-steps.rst index b6e65e71e..ec7c0546c 100644 --- a/docs/getting-started/next-steps.rst +++ b/docs/getting-started/next-steps.rst @@ -263,6 +263,7 @@ And build our container using the standard `pytest-docker-tools str: }, wrapper_class=DjangoWorkerContainer, timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/examples/myworker/tests/myworker/myworker.py b/examples/myworker/tests/myworker/myworker.py index 0e12a9f99..e4bcf5cba 100644 --- a/examples/myworker/tests/myworker/myworker.py +++ b/examples/myworker/tests/myworker/myworker.py @@ -50,6 +50,7 @@ def worker_queue(cls) -> str: volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME}, wrapper_class=MyWorkerContainer, timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/src/pytest_celery/api/container.py b/src/pytest_celery/api/container.py index 609e2fdef..1906cb012 100644 --- a/src/pytest_celery/api/container.py +++ b/src/pytest_celery/api/container.py @@ -71,9 +71,6 @@ def command(cls, *args: str) -> list[str]: the first element, followed by the command-line arguments. """ - # To be used with pytest_docker_tools.container using the command - # kwarg with the class method as value - # e.g. command=MyContainer.command() raise NotImplementedError("CeleryTestContainer.command") def teardown(self) -> None: diff --git a/src/pytest_celery/vendors/worker/container.py b/src/pytest_celery/vendors/worker/container.py index 600311b07..f973e8ce7 100644 --- a/src/pytest_celery/vendors/worker/container.py +++ b/src/pytest_celery/vendors/worker/container.py @@ -32,6 +32,22 @@ class CeleryWorkerContainer(CeleryTestContainer): dependencies of your project. """ + @classmethod + def command(cls, *args: str) -> list[str]: + args = args or tuple() + return [ + "celery", + "-A", + "app", + "worker", + f"--loglevel={cls.log_level()}", + "-n", + f"{cls.worker_name()}@%h", + "-Q", + f"{cls.worker_queue()}", + *args, + ] + def _wait_port(self, port: str) -> int: # Not needed for worker container raise NotImplementedError diff --git a/src/pytest_celery/vendors/worker/fixtures.py b/src/pytest_celery/vendors/worker/fixtures.py index 1e622665d..83625e67c 100644 --- a/src/pytest_celery/vendors/worker/fixtures.py +++ b/src/pytest_celery/vendors/worker/fixtures.py @@ -91,6 +91,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=CeleryWorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) celery_base_worker_image = build( @@ -161,6 +162,19 @@ def default_worker_celery_worker_queue(default_worker_container_session_cls: typ return default_worker_container_session_cls.worker_queue() +@pytest.fixture +def default_worker_command(default_worker_container_cls: type[CeleryWorkerContainer]) -> list[str]: + """Command to run the container. + + Args: + default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`. + + Returns: + list[str]: Docker CMD instruction. + """ + return default_worker_container_cls.command() + + @pytest.fixture def default_worker_env( default_worker_container_cls: type[CeleryWorkerContainer], diff --git a/tests/integration/api/custom_setup/conftest.py b/tests/integration/api/custom_setup/conftest.py index 47e1d0afb..8e5113804 100644 --- a/tests/integration/api/custom_setup/conftest.py +++ b/tests/integration/api/custom_setup/conftest.py @@ -52,6 +52,7 @@ def worker_queue(cls) -> str: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=Celery4WorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) @@ -110,6 +111,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=Celery5WorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1bee7de44..03f1a0a19 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -56,4 +56,5 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=IntegrationWorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/tests/integration/vendors/test_worker.py b/tests/integration/vendors/test_worker.py index 5b59a394e..ea28fed31 100644 --- a/tests/integration/vendors/test_worker.py +++ b/tests/integration/vendors/test_worker.py @@ -50,6 +50,14 @@ def default_worker_app_module(self, request: pytest.FixtureRequest) -> ModuleTyp def test_replacing_app_module(self, container: CeleryWorkerContainer, default_worker_app_module: ModuleType): assert container.app_module() == default_worker_app_module + class test_replacing_command: + @pytest.fixture + def default_worker_command(self, default_worker_command: list[str]) -> list[str]: + return [*default_worker_command, "--pool", "solo"] + + def test_replacing_command(self, container: CeleryWorkerContainer): + assert container.logs().count("solo") == 1 + class test_base_test_worker: def test_config(self, celery_setup_worker: CeleryTestWorker): diff --git a/tests/smoke/conftest.py b/tests/smoke/conftest.py index 1e7683c1f..6ab9cd263 100644 --- a/tests/smoke/conftest.py +++ b/tests/smoke/conftest.py @@ -48,6 +48,7 @@ def worker_queue(cls) -> str: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=CeleryLatestWorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) @@ -102,6 +103,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=SmokeWorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/tests/smoke/test_worker.py b/tests/smoke/test_worker.py new file mode 100644 index 000000000..199e45bda --- /dev/null +++ b/tests/smoke/test_worker.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import pytest + +from pytest_celery import CeleryTestSetup + + +@pytest.mark.parametrize( + "pool", + [ + "solo", + "prefork", + "threads", + ], +) +class test_replacing_pool: + @pytest.fixture + def default_worker_command(self, default_worker_command: list[str], pool: str) -> list[str]: + return [*default_worker_command, "--pool", pool] + + def test_pool_from_celery_banner(self, celery_setup: CeleryTestSetup, pool: str): + if pool == "threads": + pool = "thread" + celery_setup.worker.assert_log_exists(pool)