Skip to content

Commit

Permalink
Feature: Support for dynamic worker args (#203)
Browse files Browse the repository at this point in the history
* Bugfix: RedisContainer.command() was skipping the fixtures layer

* Feature: New worker fixture "default_worker_command"
  • Loading branch information
Nusnus authored Feb 13, 2024
1 parent 66ceab8 commit f791921
Show file tree
Hide file tree
Showing 12 changed files with 96 additions and 6 deletions.
1 change: 1 addition & 0 deletions docs/userguide/examples/django.rst
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ and `container <https://github.com/Jc2k/pytest-docker-tools?tab=readme-ov-file#c
},
wrapper_class=DjangoWorkerContainer,
timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=fxtr("default_worker_command"),
)
In this case, we also mount the project directory to ``/src`` in the container, so that we can install the project
Expand Down
1 change: 1 addition & 0 deletions examples/django/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def worker_queue(cls) -> str:
},
wrapper_class=DjangoWorkerContainer,
timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=fxtr("default_worker_command"),
)


Expand Down
3 changes: 0 additions & 3 deletions src/pytest_celery/api/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,6 @@ def command(cls, *args: str) -> list[str]:
the first element, followed by the command-line arguments.
"""

# To be used with pytest_docker_tools.container using the command
# kwarg with the class method as value
# e.g. command=MyContainer.command()
raise NotImplementedError("CeleryTestContainer.command")

def teardown(self) -> None:
Expand Down
15 changes: 14 additions & 1 deletion src/pytest_celery/vendors/redis/backend/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,23 @@ def default_redis_backend_cls() -> type[RedisContainer]:
network="{default_pytest_celery_network.name}",
wrapper_class=RedisContainer,
timeout=REDIS_CONTAINER_TIMEOUT,
command=RedisContainer.command("--maxclients", "100000"),
command=fxtr("default_redis_backend_command"),
)


@pytest.fixture
def default_redis_backend_command(default_redis_backend_cls: type[RedisContainer]) -> list[str]:
"""Command to run the container.
Args:
default_redis_backend_cls (type[RedisContainer]): See also: :ref:`vendor-class`.
Returns:
list[str]: Docker CMD instruction.
"""
return default_redis_backend_cls.command("--maxclients", "100000")


@pytest.fixture
def default_redis_backend_env(default_redis_backend_cls: type[RedisContainer]) -> dict:
"""Environment variables for this vendor.
Expand Down
15 changes: 14 additions & 1 deletion src/pytest_celery/vendors/redis/broker/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,23 @@ def default_redis_broker_cls() -> type[RedisContainer]:
network="{default_pytest_celery_network.name}",
wrapper_class=RedisContainer,
timeout=REDIS_CONTAINER_TIMEOUT,
command=RedisContainer.command("--maxclients", "100000"),
command=fxtr("default_redis_broker_command"),
)


@pytest.fixture
def default_redis_broker_command(default_redis_broker_cls: type[RedisContainer]) -> list[str]:
"""Command to run the container.
Args:
default_redis_broker_cls (type[RedisContainer]): See also: :ref:`vendor-class`.
Returns:
list[str]: Docker CMD instruction.
"""
return default_redis_broker_cls.command("--maxclients", "100000")


@pytest.fixture
def default_redis_broker_env(default_redis_broker_cls: type[RedisContainer]) -> dict:
"""Environment variables for this vendor.
Expand Down
16 changes: 16 additions & 0 deletions src/pytest_celery/vendors/worker/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,22 @@ class CeleryWorkerContainer(CeleryTestContainer):
dependencies of your project.
"""

@classmethod
def command(cls, *args: str) -> list[str]:
args = args or tuple()
return [
"celery",
"-A",
"app",
"worker",
f"--loglevel={cls.log_level()}",
"-n",
f"{cls.worker_name()}@%h",
"-Q",
f"{cls.worker_queue()}",
*args,
]

def _wait_port(self, port: str) -> int:
# Not needed for worker container
raise NotImplementedError
Expand Down
16 changes: 15 additions & 1 deletion src/pytest_celery/vendors/worker/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
wrapper_class=CeleryWorkerContainer,
timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=fxtr("default_worker_command"),
)

celery_base_worker_image = build(
Expand Down Expand Up @@ -161,6 +162,19 @@ def default_worker_celery_worker_queue(default_worker_container_session_cls: typ
return default_worker_container_session_cls.worker_queue()


@pytest.fixture
def default_worker_command(default_worker_container_cls: type[CeleryWorkerContainer]) -> list[str]:
"""Command to run the container.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Returns:
list[str]: Docker CMD instruction.
"""
return default_worker_container_cls.command()


@pytest.fixture
def default_worker_env(
default_worker_container_cls: type[CeleryWorkerContainer],
Expand Down Expand Up @@ -201,7 +215,7 @@ def default_worker_initial_content(
.. note::
Move volumes may be added additionally.
More volumes may be added additionally.
Args:
default_worker_container_cls (type[CeleryWorkerContainer]): See also: :ref:`vendor-class`.
Expand Down
1 change: 1 addition & 0 deletions tests/integration/api/custom_setup/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
wrapper_class=Celery5WorkerContainer,
timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=fxtr("default_worker_command"),
)


Expand Down
1 change: 1 addition & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,5 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
wrapper_class=IntegrationWorkerContainer,
timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=fxtr("default_worker_command"),
)
8 changes: 8 additions & 0 deletions tests/integration/vendors/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ def default_worker_app_module(self, request: pytest.FixtureRequest) -> ModuleTyp
def test_replacing_app_module(self, container: CeleryWorkerContainer, default_worker_app_module: ModuleType):
assert container.app_module() == default_worker_app_module

class test_replacing_command:
@pytest.fixture
def default_worker_command(self, default_worker_command: list[str]) -> list[str]:
return [*default_worker_command, "--pool", "solo"]

def test_replacing_command(self, container: CeleryWorkerContainer):
assert container.logs().count("solo") == 1


class test_base_test_worker:
def test_config(self, celery_setup_worker: CeleryTestWorker):
Expand Down
1 change: 1 addition & 0 deletions tests/smoke/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ def default_worker_container_session_cls() -> type[CeleryWorkerContainer]:
volumes={"{default_worker_volume.name}": DEFAULT_WORKER_VOLUME},
wrapper_class=SmokeWorkerContainer,
timeout=DEFAULT_WORKER_CONTAINER_TIMEOUT,
command=fxtr("default_worker_command"),
)


Expand Down
24 changes: 24 additions & 0 deletions tests/smoke/test_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from __future__ import annotations

import pytest

from pytest_celery import CeleryTestSetup


@pytest.mark.parametrize(
"pool",
[
"solo",
"prefork",
"threads",
],
)
class test_replacing_pool:
@pytest.fixture
def default_worker_command(self, default_worker_command: list[str], pool: str) -> list[str]:
return [*default_worker_command, "--pool", pool]

def test_pool_from_celery_banner(self, celery_setup: CeleryTestSetup, pool: str):
if pool == "threads":
pool = "thread"
celery_setup.worker.assert_log_exists(pool)

0 comments on commit f791921

Please sign in to comment.