Skip to content

Commit

Permalink
Allow disabling clusters by returning None when overriding fixtures: …
Browse files Browse the repository at this point in the history
…celery_broker_cluster, celery_backend_cluster
  • Loading branch information
Nusnus committed Nov 27, 2023
1 parent eb13ca9 commit ae1b208
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 13 deletions.
30 changes: 21 additions & 9 deletions src/pytest_celery/api/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,14 @@ def config(cls, celery_worker_cluster_config: dict) -> dict:
if not celery_worker_cluster_config:
raise ValueError("celery_worker_cluster_config is empty")

celery_broker_cluster_config: dict = celery_worker_cluster_config["celery_broker_cluster_config"]
celery_backend_cluster_config: dict = celery_worker_cluster_config["celery_backend_cluster_config"]
return {
"broker_url": ";".join(celery_broker_cluster_config["local_urls"]),
"result_backend": ";".join(celery_backend_cluster_config["local_urls"]),
}
celery_broker_cluster_config: dict = celery_worker_cluster_config.get("celery_broker_cluster_config", {})
celery_backend_cluster_config: dict = celery_worker_cluster_config.get("celery_backend_cluster_config", {})
config = {}
if celery_broker_cluster_config:
config["broker_url"] = ";".join(celery_broker_cluster_config["local_urls"])
if celery_backend_cluster_config:
config["result_backend"] = ";".join(celery_backend_cluster_config["local_urls"])
return config

@classmethod
def update_app_config(cls, app: Celery) -> None:
Expand Down Expand Up @@ -110,8 +112,12 @@ def ready(self, ping: bool = False, control: bool = False, docker: bool = True)
ready = True

if docker and ready:
ready = all([self.broker_cluster.ready(), self.backend_cluster.ready()])
ready = ready and self.worker_cluster.ready()
if self.broker_cluster:
ready = ready and self.broker_cluster.ready()
if self.backend_cluster:
ready = ready and self.backend_cluster.ready()
if self.worker_cluster:
ready = ready and self.worker_cluster.ready()

if control and ready:
r = self.app.control.ping()
Expand All @@ -125,7 +131,13 @@ def ready(self, ping: bool = False, control: bool = False, docker: bool = True)
ready = ready and res.get(timeout=RESULT_TIMEOUT) == "pong"

# Set app for all nodes
nodes = self.broker_cluster.nodes + self.backend_cluster.nodes
nodes: tuple = tuple()
if self.broker_cluster:
nodes += self.broker_cluster.nodes
if self.backend_cluster:
nodes += self.backend_cluster.nodes
if self.worker_cluster:
nodes += self.worker_cluster.nodes
for node in nodes:
node._app = self.app

Expand Down
6 changes: 4 additions & 2 deletions src/pytest_celery/fixtures/backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# mypy: disable-error-code="misc"

from typing import Any

Check warning on line 3 in src/pytest_celery/fixtures/backend.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/fixtures/backend.py#L3

Added line #L3 was not covered by tests

import pytest

from pytest_celery.api.backend import CeleryBackendCluster
Expand All @@ -23,10 +25,10 @@ def celery_backend_cluster(celery_backend: CeleryTestBackend) -> CeleryBackendCl


@pytest.fixture
def celery_backend_cluster_config(request: pytest.FixtureRequest) -> dict:
def celery_backend_cluster_config(request: pytest.FixtureRequest) -> dict[Any, Any] | None:
try:
use_default_config = pytest.fail.Exception
cluster: CeleryBackendCluster = request.getfixturevalue(CELERY_BACKEND_CLUSTER)
return cluster.config()
return cluster.config() if cluster else None
except use_default_config:
return CeleryBackendCluster.default_config()
6 changes: 4 additions & 2 deletions src/pytest_celery/fixtures/broker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# mypy: disable-error-code="misc"

from typing import Any

Check warning on line 3 in src/pytest_celery/fixtures/broker.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/fixtures/broker.py#L3

Added line #L3 was not covered by tests

import pytest

from pytest_celery.api.broker import CeleryBrokerCluster
Expand All @@ -23,10 +25,10 @@ def celery_broker_cluster(celery_broker: CeleryTestBroker) -> CeleryBrokerCluste


@pytest.fixture
def celery_broker_cluster_config(request: pytest.FixtureRequest) -> dict:
def celery_broker_cluster_config(request: pytest.FixtureRequest) -> dict[Any, Any] | None:
try:
use_default_config = pytest.fail.Exception
cluster: CeleryBrokerCluster = request.getfixturevalue(CELERY_BROKER_CLUSTER)
return cluster.config()
return cluster.config() if cluster else None
except use_default_config:
return CeleryBrokerCluster.default_config()
4 changes: 4 additions & 0 deletions src/pytest_celery/vendors/worker/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,12 @@ def env(cls, celery_worker_cluster_config: dict) -> dict:
env = {}
if celery_broker_cluster_config:
env["CELERY_BROKER_URL"] = ";".join(celery_broker_cluster_config["urls"])
else:
del DEFAULT_WORKER_ENV["CELERY_BROKER_URL"]

Check warning on line 64 in src/pytest_celery/vendors/worker/container.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/container.py#L64

Added line #L64 was not covered by tests
if celery_backend_cluster_config:
env["CELERY_RESULT_BACKEND"] = ";".join(celery_backend_cluster_config["urls"])
else:
del DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"]

Check warning on line 68 in src/pytest_celery/vendors/worker/container.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/container.py#L68

Added line #L68 was not covered by tests
return {**DEFAULT_WORKER_ENV, **env}

@classmethod
Expand Down

0 comments on commit ae1b208

Please sign in to comment.