Skip to content

Commit

Permalink
Removed pytest_docker_tools.wrappers.container.wait_for_callable() in…
Browse files Browse the repository at this point in the history
… favor of CeleryTestWorker.wait_for_log() (#57)
  • Loading branch information
Nusnus authored Oct 13, 2023
1 parent cf50961 commit d33c685
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 59 deletions.
1 change: 1 addition & 0 deletions src/pytest_celery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

__version__ = "1.0.0a1"


from pytest_celery.api.backend import *
from pytest_celery.api.base import *
from pytest_celery.api.broker import *
Expand Down
7 changes: 4 additions & 3 deletions src/pytest_celery/api/container.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import sys
from typing import Any
from typing import Optional

Expand Down Expand Up @@ -47,8 +46,10 @@ def _wait_ready(self, timeout: int = 30) -> bool:
timeout=timeout,
)

sys.stdout.write(f"{self.__class__.__name__}::{self.name} is ready\n")

wait_for_callable(
f"{self.__class__.__name__}::{self.name} is ready",
lambda: self.ready_prompt or "" in self.logs(),
)
return True

def ready(self) -> bool:
Expand Down
6 changes: 6 additions & 0 deletions src/pytest_celery/api/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
from typing import Union

from celery import Celery
from pytest_docker_tools.wrappers.container import wait_for_callable

from pytest_celery.api.base import CeleryTestCluster
from pytest_celery.api.base import CeleryTestNode
from pytest_celery.api.container import CeleryTestContainer
from pytest_celery.defaults import RESULT_TIMEOUT
from pytest_celery.vendors.worker.container import CeleryWorkerContainer


Expand Down Expand Up @@ -40,6 +42,10 @@ def worker_name(self) -> str:
def worker_queue(self) -> str:
return self.container.worker_queue()

def wait_for_log(self, log: str, message: str = "", timeout: int = RESULT_TIMEOUT) -> None:
message = message or f"Waiting for worker container '{self.name()}' to log -> {log}"
wait_for_callable(message=message, func=lambda: log in self.logs(), timeout=timeout)


class CeleryWorkerCluster(CeleryTestCluster):
def __init__(self, *workers: Tuple[Union[CeleryTestWorker, CeleryTestContainer]]) -> None:
Expand Down
7 changes: 1 addition & 6 deletions tests/integration/api/custom_setup/test_custom_setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from celery import Celery
from pytest_docker_tools.wrappers.container import wait_for_callable

from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
Expand Down Expand Up @@ -34,11 +33,7 @@ def test_log_level_custom_setup(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
if worker.logs():
wait_for_callable(
"waiting for worker.log_level in worker.logs()",
lambda: worker.log_level in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log(worker.log_level)

def test_celery_setup_override(self, celery_setup: CeleryTestSetup):
assert celery_setup.app
Expand Down
8 changes: 2 additions & 6 deletions tests/integration/api/test_setup.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from celery import Celery
from pytest_docker_tools.wrappers.container import wait_for_callable

from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
Expand Down Expand Up @@ -31,13 +30,10 @@ def test_worker_is_connected_to_broker(self, celery_setup: CeleryTestSetup):
def test_log_level(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
wait_for_callable(
"waiting for worker.log_level in worker.logs()",
lambda: worker.log_level in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log(worker.log_level)

def test_ready(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
queue = worker.worker_queue
r = identity.s("test_ready").apply_async(queue=queue)
Expand Down
21 changes: 4 additions & 17 deletions tests/smoke/test_canvas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from celery.canvas import chord
from celery.canvas import group
from celery.canvas import signature
from pytest_docker_tools.wrappers.container import wait_for_callable

from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
Expand All @@ -15,17 +14,13 @@
class test_canvas:
def test_sanity(self, celery_setup: CeleryTestSetup):
assert celery_setup.ready(ping=True)
worker = celery_setup.worker_cluster[0]
worker: CeleryTestWorker = celery_setup.worker_cluster[0]
queue = worker.worker_queue
expected = "test_sanity"
res = identity.s(expected).apply_async(queue=queue)
assert res.get(timeout=RESULT_TIMEOUT) == expected

wait_for_callable(
f"waiting for {expected} in worker.logs()",
lambda: expected in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log(expected)

if len(celery_setup.worker_cluster) > 1:
queue = celery_setup.worker_cluster[1].worker_queue
Expand All @@ -36,17 +31,9 @@ def test_sanity(self, celery_setup: CeleryTestSetup):
if len(celery_setup.worker_cluster) > 1:
assert expected not in celery_setup.worker_cluster[1].logs()
expected = "succeeded"
wait_for_callable(
f"waiting for {expected} in celery_setup.worker_cluster[1].logs()",
lambda: expected in celery_setup.worker_cluster[1].logs(),
timeout=RESULT_TIMEOUT,
)
celery_setup.worker_cluster[1].wait_for_log(expected)
else:
wait_for_callable(
f"waiting for {expected} in worker.logs()",
lambda: expected in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log(expected)

def test_signature(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
Expand Down
32 changes: 5 additions & 27 deletions tests/smoke/test_signals.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import pytest
from celery.signals import after_task_publish
from celery.signals import before_task_publish
from pytest_docker_tools.wrappers.container import wait_for_callable

from pytest_celery import RESULT_TIMEOUT
from pytest_celery import CeleryTestSetup
from pytest_celery import CeleryTestWorker
from tests.tasks import identity
Expand Down Expand Up @@ -51,46 +49,26 @@ def after_task_publish_handler(*args, **kwargs):
def test_worker_init(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
wait_for_callable(
"waiting for worker_init_handler in worker.logs()",
lambda: "worker_init_handler" in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log("worker_init_handler")

def test_worker_process_init(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
wait_for_callable(
"waiting for worker_process_init_handler in worker.logs()",
lambda: "worker_process_init_handler" in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log("worker_process_init_handler")

def test_worker_ready(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
wait_for_callable(
"waiting for worker_ready_handler in worker.logs()",
lambda: "worker_ready_handler" in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log("worker_ready_handler")

def test_worker_process_shutdown(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
worker.app.control.broadcast("shutdown")
wait_for_callable(
"waiting for worker_process_shutdown_handler in worker.logs()",
lambda: "worker_process_shutdown_handler" in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log("worker_process_shutdown_handler")

def test_worker_shutdown(self, celery_setup: CeleryTestSetup):
worker: CeleryTestWorker
for worker in celery_setup.worker_cluster:
worker.app.control.broadcast("shutdown")
wait_for_callable(
"waiting for worker_shutdown_handler in worker.logs()",
lambda: "worker_shutdown_handler" in worker.logs(),
timeout=RESULT_TIMEOUT,
)
worker.wait_for_log("worker_shutdown_handler")

0 comments on commit d33c685

Please sign in to comment.