Skip to content

Commit

Permalink
Added unit tests coverage (#73)
Browse files Browse the repository at this point in the history
* New Tests: tests/unit/api/test_container.py

* New Tests: tests/unit/api/test_base.py

* New Tests: tests/unit/api/test_backend.py

* New Tests: tests/unit/api/test_broker.py

* New Tests: tests/unit/api/test_worker.py

* New Tests: tests/unit/api/test_setup.py

* New Tests: tests/unit/vendors/test_worker.py

* New Tests: tests/unit/vendors/test_memcached.py

* New Tests: tests/unit/vendors/test_rabbitmq.py

* New Tests: tests/unit/vendors/test_redis.py
  • Loading branch information
Nusnus authored Oct 25, 2023
1 parent ee0f988 commit a57dfdb
Show file tree
Hide file tree
Showing 17 changed files with 383 additions and 187 deletions.
17 changes: 9 additions & 8 deletions src/pytest_celery/api/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@
from pytest_celery.api.base import CeleryTestCluster
from pytest_celery.api.base import CeleryTestNode
from pytest_celery.api.container import CeleryTestContainer
from pytest_celery.defaults import WORKER_ENV
from pytest_celery.defaults import DEFAULT_WORKER_ENV


class CeleryTestBackend(CeleryTestNode):
@classmethod
def default_config(cls) -> dict:
return {
"url": WORKER_ENV["CELERY_RESULT_BACKEND"],
"local_url": WORKER_ENV["CELERY_RESULT_BACKEND"],
"url": DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"],
"local_url": DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"],
}

def restart(self) -> None:
super().restart()
self.app.conf.update(
result_backend=self.config()["local_url"],
)
if self.app:
self.app.conf.update(
result_backend=self.config()["local_url"],
)


class CeleryBackendCluster(CeleryTestCluster):
Expand All @@ -37,6 +38,6 @@ def _set_nodes(
@classmethod
def default_config(cls) -> dict:
return {
"urls": [WORKER_ENV["CELERY_RESULT_BACKEND"]],
"local_urls": [WORKER_ENV["CELERY_RESULT_BACKEND"]],
"urls": [DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"]],
"local_urls": [DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"]],
}
47 changes: 27 additions & 20 deletions src/pytest_celery/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,25 @@ def container(self) -> CeleryTestContainer:
def app(self) -> Celery:
return self._app

def ready(self) -> bool:
return self.container.ready()

def config(self, *args: tuple, **kwargs: dict) -> dict:
return self.container.celeryconfig
def __eq__(self, __value: object) -> bool:
if isinstance(__value, CeleryTestNode):
return all(
(
self.container == __value.container,
self.app == __value.app,
)
)
return False

@classmethod
def default_config(cls) -> dict:
return {}

def __eq__(self, __value: object) -> bool:
if isinstance(__value, CeleryTestNode):
return self.container == __value.container
return False
def ready(self) -> bool:
return self.container.ready()

def config(self, *args: tuple, **kwargs: dict) -> dict:
return self.container.celeryconfig

def logs(self) -> str:
return self.container.logs()
Expand Down Expand Up @@ -89,6 +94,14 @@ def __init__(self, *nodes: Tuple[Union[CeleryTestNode, CeleryTestContainer]]) ->

self.nodes = nodes # type: ignore

@property
def nodes(self) -> Tuple[CeleryTestNode]:
return self._nodes

@nodes.setter
def nodes(self, nodes: Tuple[Union[CeleryTestNode, CeleryTestContainer]]) -> None:
self._nodes = self._set_nodes(*nodes) # type: ignore

def __iter__(self) -> Iterator[CeleryTestNode]:
return iter(self.nodes)

Expand All @@ -105,13 +118,9 @@ def __eq__(self, __value: object) -> bool:
return False
return False

@property
def nodes(self) -> Tuple[CeleryTestNode]:
return self._nodes

@nodes.setter
def nodes(self, nodes: Tuple[Union[CeleryTestNode, CeleryTestContainer]]) -> None:
self._nodes = self._set_nodes(*nodes) # type: ignore
@classmethod
def default_config(cls) -> dict:
return {}

@abstractmethod
def _set_nodes(
Expand Down Expand Up @@ -139,9 +148,7 @@ def config(self, *args: tuple, **kwargs: dict) -> dict:
"local_urls": [c["local_url"] for c in config],
}

@classmethod
def default_config(cls) -> dict:
return {}

def teardown(self) -> None:
# Do not need to call teardown on the nodes
# but only tear down self
pass
17 changes: 9 additions & 8 deletions src/pytest_celery/api/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,23 @@
from pytest_celery.api.base import CeleryTestCluster
from pytest_celery.api.base import CeleryTestNode
from pytest_celery.api.container import CeleryTestContainer
from pytest_celery.defaults import WORKER_ENV
from pytest_celery.defaults import DEFAULT_WORKER_ENV


class CeleryTestBroker(CeleryTestNode):
@classmethod
def default_config(cls) -> dict:
return {
"url": WORKER_ENV["CELERY_BROKER_URL"],
"local_url": WORKER_ENV["CELERY_BROKER_URL"],
"url": DEFAULT_WORKER_ENV["CELERY_BROKER_URL"],
"local_url": DEFAULT_WORKER_ENV["CELERY_BROKER_URL"],
}

def restart(self) -> None:
super().restart()
self.app.conf.update(
broker_url=self.config()["local_url"],
)
if self.app:
self.app.conf.update(
broker_url=self.config()["local_url"],
)


class CeleryBrokerCluster(CeleryTestCluster):
Expand All @@ -37,6 +38,6 @@ def _set_nodes(
@classmethod
def default_config(cls) -> dict:
return {
"urls": [WORKER_ENV["CELERY_BROKER_URL"]],
"local_urls": [WORKER_ENV["CELERY_BROKER_URL"]],
"urls": [DEFAULT_WORKER_ENV["CELERY_BROKER_URL"]],
"local_urls": [DEFAULT_WORKER_ENV["CELERY_BROKER_URL"]],
}
3 changes: 3 additions & 0 deletions src/pytest_celery/api/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ def ready_prompt(self) -> Optional[str]:
return None

def _wait_port(self, port: str) -> int:
if not port:
raise ValueError("Port cannot be empty")

while not super().ready():
pass
_, p = self.get_addr(port)
Expand Down
66 changes: 33 additions & 33 deletions src/pytest_celery/api/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@ def app(self) -> Celery:
return self._app

@property
def worker_cluster(self) -> CeleryWorkerCluster:
return self._worker_cluster
def backend_cluster(self) -> CeleryBackendCluster:
return self._backend_cluster

@property
def worker(self) -> CeleryTestWorker:
return self._worker_cluster[0] # type: ignore
def backend(self) -> CeleryTestBackend:
return self._backend_cluster[0] # type: ignore

@property
def broker_cluster(self) -> CeleryBrokerCluster:
Expand All @@ -50,37 +50,12 @@ def broker(self) -> CeleryTestBroker:
return self._broker_cluster[0] # type: ignore

@property
def backend_cluster(self) -> CeleryBackendCluster:
return self._backend_cluster
def worker_cluster(self) -> CeleryWorkerCluster:
return self._worker_cluster

@property
def backend(self) -> CeleryTestBackend:
return self._backend_cluster[0] # type: ignore

def ready(self, ping: bool = False, control: bool = False, docker: bool = True) -> bool:
ready = True

if docker and ready:
ready = all([self.broker_cluster.ready(), self.backend_cluster.ready()])
ready = ready and self.worker_cluster.ready()

if control and ready:
r = self.app.control.ping()
ready = ready and all([all([res["ok"] == "pong" for _, res in response.items()]) for response in r])

if ping and ready:
# TODO: ignore mypy globally for type overriding
worker: CeleryTestWorker
for worker in self.worker_cluster: # type: ignore
res = self.ping.s().apply_async(queue=worker.worker_queue)
ready = ready and res.get(timeout=RESULT_TIMEOUT) == "pong"

# Set app for all nodes
nodes = self.broker_cluster.nodes + self.backend_cluster.nodes
for node in nodes:
node._app = self.app

return ready
def worker(self) -> CeleryTestWorker:
return self._worker_cluster[0] # type: ignore

@classmethod
def name(cls) -> str:
Expand Down Expand Up @@ -130,3 +105,28 @@ def chords_allowed(self) -> bool:

def teardown(self) -> None:
pass

def ready(self, ping: bool = False, control: bool = False, docker: bool = True) -> bool:
ready = True

if docker and ready:
ready = all([self.broker_cluster.ready(), self.backend_cluster.ready()])
ready = ready and self.worker_cluster.ready()

if control and ready:
r = self.app.control.ping()
ready = ready and all([all([res["ok"] == "pong" for _, res in response.items()]) for response in r])

if ping and ready:
# TODO: ignore mypy globally for type overriding
worker: CeleryTestWorker
for worker in self.worker_cluster: # type: ignore
res = self.ping.s().apply_async(queue=worker.worker_queue)
ready = ready and res.get(timeout=RESULT_TIMEOUT) == "pong"

# Set app for all nodes
nodes = self.broker_cluster.nodes + self.backend_cluster.nodes
for node in nodes:
node._app = self.app

return ready
30 changes: 15 additions & 15 deletions src/pytest_celery/vendors/worker/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@


class CeleryWorkerContainer(CeleryTestContainer):
def _wait_port(self, port: str) -> int:
raise NotImplementedError

@property
def ready_prompt(self) -> str:
return "ready."

@classmethod
def version(cls) -> str:
return DEFAULT_WORKER_VERSION
Expand All @@ -29,6 +36,14 @@ def worker_name(cls) -> str:
def worker_queue(cls) -> str:
return DEFAULT_WORKER_QUEUE

@classmethod
def tasks_modules(cls) -> set:
return set()

@classmethod
def signals_modules(cls) -> set:
return set()

@classmethod
def buildargs(cls) -> dict:
return {
Expand Down Expand Up @@ -145,18 +160,3 @@ def _initial_content_worker_signals(cls, worker_signals: set) -> dict:
else:
print("No signals found")
return initial_content

@classmethod
def tasks_modules(cls) -> set:
return set()

@classmethod
def signals_modules(cls) -> set:
return set()

def _wait_port(self, port: str) -> int:
raise NotImplementedError

@property
def ready_prompt(self) -> str:
return "ready."
20 changes: 16 additions & 4 deletions tests/unit/api/test_backend.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
import pytest
from celery import Celery
from pytest_lazyfixture import lazy_fixture

from pytest_celery import CELERY_BACKEND
from pytest_celery import CELERY_BACKEND_CLUSTER
from pytest_celery import DEFAULT_WORKER_ENV
from pytest_celery import CeleryBackendCluster
from pytest_celery import CeleryTestBackend


@pytest.mark.parametrize("backend", [lazy_fixture(CELERY_BACKEND)])
class test_celey_test_backend:
def test_default_config_format(self, backend: CeleryTestBackend):
expected_format = {"url", "local_url"}
assert set(backend.default_config().keys()) == expected_format
assert backend.default_config()["url"] == DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"]
assert backend.default_config()["local_url"] == DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"]

def test_restart_no_app(self, backend: CeleryTestBackend):
assert backend.app is None
backend.restart()

def test_restart_with_app(self, backend: CeleryTestBackend, celery_setup_app: Celery):
backend._app = celery_setup_app
assert "result_backend" not in celery_setup_app.conf.changes
backend.restart()
assert "result_backend" in celery_setup_app.conf.changes


@pytest.mark.parametrize("cluster", [lazy_fixture(CELERY_BACKEND_CLUSTER)])
class test_celery_backend_cluster:
def test_default_config_format(self, cluster: CeleryBackendCluster):
expected_format = {"urls", "local_urls"}
assert set(cluster.default_config().keys()) == expected_format
assert cluster.default_config()["urls"] == [DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"]]
assert cluster.default_config()["local_urls"] == [DEFAULT_WORKER_ENV["CELERY_RESULT_BACKEND"]]
Loading

0 comments on commit a57dfdb

Please sign in to comment.