diff --git a/docs/userguide/examples/django.rst b/docs/userguide/examples/django.rst index 7305aa18..7c135541 100644 --- a/docs/userguide/examples/django.rst +++ b/docs/userguide/examples/django.rst @@ -144,6 +144,7 @@ and `container str: }, wrapper_class=DjangoWorkerContainer, timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/src/pytest_celery/api/container.py b/src/pytest_celery/api/container.py index 609e2fde..1906cb01 100644 --- a/src/pytest_celery/api/container.py +++ b/src/pytest_celery/api/container.py @@ -71,9 +71,6 @@ def command(cls, *args: str) -> list[str]: the first element, followed by the command-line arguments. """ - # To be used with pytest_docker_tools.container using the command - # kwarg with the class method as value - # e.g. command=MyContainer.command() raise NotImplementedError("CeleryTestContainer.command") def teardown(self) -> None: diff --git a/src/pytest_celery/vendors/redis/backend/fixtures.py b/src/pytest_celery/vendors/redis/backend/fixtures.py index c0caf5ce..c9ac834f 100644 --- a/src/pytest_celery/vendors/redis/backend/fixtures.py +++ b/src/pytest_celery/vendors/redis/backend/fixtures.py @@ -50,10 +50,23 @@ def default_redis_backend_cls() -> type[RedisContainer]: network="{default_pytest_celery_network.name}", wrapper_class=RedisContainer, timeout=REDIS_CONTAINER_TIMEOUT, - command=RedisContainer.command("--maxclients", "100000"), + command=fxtr("default_redis_backend_command"), ) +@pytest.fixture +def default_redis_backend_command(default_redis_backend_cls: type[RedisContainer]) -> list[str]: + """Command to run the container. + + Args: + default_redis_backend_cls (type[RedisContainer]): See also: :ref:`vendor-class`. + + Returns: + list[str]: Docker CMD instruction. + """ + return default_redis_backend_cls.command("--maxclients", "100000") + + @pytest.fixture def default_redis_backend_env(default_redis_backend_cls: type[RedisContainer]) -> dict: """Environment variables for this vendor. diff --git a/src/pytest_celery/vendors/redis/broker/fixtures.py b/src/pytest_celery/vendors/redis/broker/fixtures.py index 1a978133..10bada61 100644 --- a/src/pytest_celery/vendors/redis/broker/fixtures.py +++ b/src/pytest_celery/vendors/redis/broker/fixtures.py @@ -50,10 +50,23 @@ def default_redis_broker_cls() -> type[RedisContainer]: network="{default_pytest_celery_network.name}", wrapper_class=RedisContainer, timeout=REDIS_CONTAINER_TIMEOUT, - command=RedisContainer.command("--maxclients", "100000"), + command=fxtr("default_redis_broker_command"), ) +@pytest.fixture +def default_redis_broker_command(default_redis_broker_cls: type[RedisContainer]) -> list[str]: + """Command to run the container. + + Args: + default_redis_broker_cls (type[RedisContainer]): See also: :ref:`vendor-class`. + + Returns: + list[str]: Docker CMD instruction. + """ + return default_redis_broker_cls.command("--maxclients", "100000") + + @pytest.fixture def default_redis_broker_env(default_redis_broker_cls: type[RedisContainer]) -> dict: """Environment variables for this vendor. diff --git a/src/pytest_celery/vendors/worker/container.py b/src/pytest_celery/vendors/worker/container.py index 600311b0..f973e8ce 100644 --- a/src/pytest_celery/vendors/worker/container.py +++ b/src/pytest_celery/vendors/worker/container.py @@ -32,6 +32,22 @@ class CeleryWorkerContainer(CeleryTestContainer): dependencies of your project. """ + @classmethod + def command(cls, *args: str) -> list[str]: + args = args or tuple() + return [ + "celery", + "-A", + "app", + "worker", + f"--loglevel={cls.log_level()}", + "-n", + f"{cls.worker_name()}@%h", + "-Q", + f"{cls.worker_queue()}", + *args, + ] + def _wait_port(self, port: str) -> int: # Not needed for worker container raise NotImplementedError diff --git a/src/pytest_celery/vendors/worker/fixtures.py b/src/pytest_celery/vendors/worker/fixtures.py index 1e622665..636bf56b 100644 --- a/src/pytest_celery/vendors/worker/fixtures.py +++ b/src/pytest_celery/vendors/worker/fixtures.py @@ -91,6 +91,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=CeleryWorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) celery_base_worker_image = build( @@ -161,6 +162,19 @@ def default_worker_celery_worker_queue(default_worker_container_session_cls: typ return default_worker_container_session_cls.worker_queue() +@pytest.fixture +def default_worker_command(default_worker_container_cls: type[CeleryWorkerContainer]) -> list[str]: + """Command to run the container. + + Args: + default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`. + + Returns: + list[str]: Docker CMD instruction. + """ + return default_worker_container_cls.command() + + @pytest.fixture def default_worker_env( default_worker_container_cls: type[CeleryWorkerContainer], @@ -201,7 +215,7 @@ def default_worker_initial_content( .. note:: - Move volumes may be added additionally. + More volumes may be added additionally. Args: default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`. diff --git a/tests/integration/api/custom_setup/conftest.py b/tests/integration/api/custom_setup/conftest.py index 47e1d0af..e9c1a571 100644 --- a/tests/integration/api/custom_setup/conftest.py +++ b/tests/integration/api/custom_setup/conftest.py @@ -110,6 +110,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=Celery5WorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 1bee7de4..03f1a0a1 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -56,4 +56,5 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=IntegrationWorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/tests/integration/vendors/test_worker.py b/tests/integration/vendors/test_worker.py index 5b59a394..ea28fed3 100644 --- a/tests/integration/vendors/test_worker.py +++ b/tests/integration/vendors/test_worker.py @@ -50,6 +50,14 @@ def default_worker_app_module(self, request: pytest.FixtureRequest) -> ModuleTyp def test_replacing_app_module(self, container: CeleryWorkerContainer, default_worker_app_module: ModuleType): assert container.app_module() == default_worker_app_module + class test_replacing_command: + @pytest.fixture + def default_worker_command(self, default_worker_command: list[str]) -> list[str]: + return [*default_worker_command, "--pool", "solo"] + + def test_replacing_command(self, container: CeleryWorkerContainer): + assert container.logs().count("solo") == 1 + class test_base_test_worker: def test_config(self, celery_setup_worker: CeleryTestWorker): diff --git a/tests/smoke/conftest.py b/tests/smoke/conftest.py index 1e7683c1..aef23de9 100644 --- a/tests/smoke/conftest.py +++ b/tests/smoke/conftest.py @@ -102,6 +102,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME}, wrapper_class=SmokeWorkerContainer, timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT, + command=fxtr("default_worker_command"), ) diff --git a/tests/smoke/test_worker.py b/tests/smoke/test_worker.py new file mode 100644 index 00000000..199e45bd --- /dev/null +++ b/tests/smoke/test_worker.py @@ -0,0 +1,24 @@ +from __future__ import annotations + +import pytest + +from pytest_celery import CeleryTestSetup + + +@pytest.mark.parametrize( + "pool", + [ + "solo", + "prefork", + "threads", + ], +) +class test_replacing_pool: + @pytest.fixture + def default_worker_command(self, default_worker_command: list[str], pool: str) -> list[str]: + return [*default_worker_command, "--pool", pool] + + def test_pool_from_celery_banner(self, celery_setup: CeleryTestSetup, pool: str): + if pool == "threads": + pool = "thread" + celery_setup.worker.assert_log_exists(pool)