-
-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
New Example: examples/worker_pool (#242)
- Loading branch information
Showing
10 changed files
with
307 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
.. _examples_worker_pool: | ||
|
||
============= | ||
worker_pool | ||
============= | ||
|
||
:Release: |version| | ||
:Date: |today| | ||
|
||
.. contents:: | ||
:local: | ||
:depth: 2 | ||
|
||
Description | ||
=========== | ||
|
||
This example project demonstrates how to use a different `worker pool <https://docs.celeryq.dev/en/stable/reference/cli.html#cmdoption-celery-worker-P>`_. | ||
The example uses two different methods to run the Celery worker with different pools. | ||
|
||
The following guide will explain each method and how they are used. | ||
|
||
.. tip:: | ||
|
||
See first the :ref:`examples_myworker` example before continuing with this one. | ||
|
||
Breakdown | ||
========= | ||
|
||
File Structure | ||
~~~~~~~~~~~~~~ | ||
|
||
The following diagram lists the relevant files in the project. | ||
|
||
.. code-block:: text | ||
rabbitmq_management/ | ||
├── tests/ | ||
│ ├── __init__.py | ||
│ └── test_gevent_pool.py | ||
│ └── test_solo_pool.py | ||
└── Dockerfile | ||
└── tasks.py | ||
└── requirements.txt | ||
Dockerfile | ||
~~~~~~~~~~ | ||
|
||
To use the gevent pool, we create our own image using a similar Dockerfile to the one in the :ref:`examples_myworker` example. | ||
The purpose of this worker is to ensure the gevent dependency is installed. | ||
|
||
.. literalinclude:: ../../../examples/worker_pool/Dockerfile | ||
:language: docker | ||
:caption: examples.worker_pool.Dockerfile | ||
|
||
.. literalinclude:: ../../../examples/worker_pool/requirements.txt | ||
:language: docker | ||
:caption: examples.worker_pool.requirements.txt | ||
|
||
tasks.py | ||
~~~~~~~~ | ||
|
||
Our tasks module is using the example task from the `Celery gevent example <https://github.com/celery/celery/blob/main/examples/gevent/README.rst>`_. | ||
|
||
.. literalinclude:: ../../../examples/worker_pool/tasks.py | ||
:language: python | ||
:caption: examples.worker_pool.tasks.py | ||
|
||
test_gevent_pool.py | ||
~~~~~~~~~~~~~~~~~~~ | ||
|
||
To add a new gevent worker, we create a new :class:`CeleryWorkerContainer <pytest_celery.vendors.worker.container.CeleryWorkerContainer>` to | ||
configure the worker with the gevent pool. | ||
|
||
.. literalinclude:: ../../../examples/worker_pool/tests/test_gevent_pool.py | ||
:language: python | ||
:caption: examples.worker_pool.tests.test_gevent_pool.py | ||
:end-before: # ---------------------------- | ||
|
||
And then we can just use it in our tests. | ||
|
||
.. literalinclude:: ../../../examples/worker_pool/tests/test_gevent_pool.py | ||
:language: python | ||
:caption: examples.worker_pool.tests.test_gevent_pool.py | ||
:start-after: # ---------------------------- | ||
|
||
test_solo_pool.py | ||
~~~~~~~~~~~~~~~~~ | ||
|
||
The solo pool example on the other hand, reconfigures the default :ref:`built-in-worker` | ||
as it does not require any additional dependencies. | ||
|
||
.. literalinclude:: ../../../examples/worker_pool/tests/test_solo_pool.py | ||
:language: python | ||
:caption: examples.worker_pool.tests.test_solo_pool.py |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,29 @@ | ||
FROM python:3.11-bookworm | ||
|
||
# Create a user to run the worker | ||
RUN adduser --disabled-password --gecos "" test_user | ||
|
||
# Install system dependencies | ||
RUN apt-get update && apt-get install -y build-essential git libevent-dev | ||
|
||
# Set arguments | ||
ARG CELERY_LOG_LEVEL=INFO | ||
ARG CELERY_WORKER_NAME=my_worker | ||
ARG CELERY_WORKER_QUEUE=celery | ||
ENV LOG_LEVEL=$CELERY_LOG_LEVEL | ||
ENV WORKER_NAME=$CELERY_WORKER_NAME | ||
ENV WORKER_QUEUE=$CELERY_WORKER_QUEUE | ||
|
||
# Install packages | ||
COPY --chown=test_user:test_user requirements.txt . | ||
RUN pip install --no-cache-dir --upgrade pip | ||
RUN pip install -r ./requirements.txt | ||
|
||
# The workdir must be /app | ||
WORKDIR /app | ||
|
||
# Switch to the test_user | ||
USER test_user | ||
|
||
# Start the celery worker | ||
CMD celery -A app worker --loglevel=$LOG_LEVEL -n $WORKER_NAME@%h -Q $WORKER_QUEUE |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
[pytest] | ||
log_cli = true | ||
log_cli_level = INFO | ||
log_cli_format = %(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s) | ||
log_cli_date_format = %Y-%m-%d %H:%M:%S |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
pytest>=7.4.4 | ||
pytest-xdist>=3.5.0 | ||
pytest-subtests>=0.11.0 | ||
celery[gevent] | ||
pytest-celery[all]@git+https://github.com/celery/pytest-celery.git |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
# Based on https://github.com/celery/celery/blob/main/examples/gevent/tasks.py | ||
|
||
import requests | ||
from celery import shared_task | ||
|
||
|
||
@shared_task(ignore_result=True) | ||
def urlopen(url): | ||
print(f"Opening: {url}") | ||
try: | ||
requests.get(url) | ||
except requests.exceptions.RequestException as exc: | ||
print(f"Exception for {url}: {exc!r}") | ||
return url, 0 | ||
print(f"Done with: {url}") | ||
return url, 1 |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
from __future__ import annotations | ||
|
||
import pytest | ||
import tasks | ||
from celery import Celery | ||
from celery.canvas import Signature | ||
from celery.canvas import group | ||
from celery.result import AsyncResult | ||
from pytest_docker_tools import build | ||
from pytest_docker_tools import container | ||
from pytest_docker_tools import fxtr | ||
|
||
from pytest_celery import RESULT_TIMEOUT | ||
from pytest_celery import CeleryTestSetup | ||
from pytest_celery import CeleryTestWorker | ||
from pytest_celery import CeleryWorkerCluster | ||
from pytest_celery import CeleryWorkerContainer | ||
from pytest_celery import defaults | ||
from pytest_celery import ping | ||
|
||
|
||
class GeventWorkerContainer(CeleryWorkerContainer): | ||
@classmethod | ||
def command(cls, *args: str) -> list[str]: | ||
return super().command("-P", "gevent", "-c", "1000") | ||
|
||
|
||
gevent_worker_image = build( | ||
path=".", | ||
dockerfile="Dockerfile", | ||
tag="pytest-celery/examples/worker_pool:gevent", | ||
buildargs=GeventWorkerContainer.buildargs(), | ||
) | ||
|
||
|
||
gevent_worker_container = container( | ||
image="{gevent_worker_image.id}", | ||
environment=fxtr("default_worker_env"), | ||
network="{default_pytest_celery_network.name}", | ||
volumes={"{default_worker_volume.name}": defaults.DEFAULT_WORKER_VOLUME}, | ||
wrapper_class=GeventWorkerContainer, | ||
timeout=defaults.DEFAULT_WORKER_CONTAINER_TIMEOUT, | ||
command=GeventWorkerContainer.command(), | ||
) | ||
|
||
|
||
@pytest.fixture | ||
def gevent_worker(gevent_worker_container: GeventWorkerContainer, celery_setup_app: Celery) -> CeleryTestWorker: | ||
worker = CeleryTestWorker(gevent_worker_container, app=celery_setup_app) | ||
yield worker | ||
worker.teardown() | ||
|
||
|
||
@pytest.fixture | ||
def celery_worker_cluster(gevent_worker: CeleryTestWorker) -> CeleryWorkerCluster: | ||
cluster = CeleryWorkerCluster(gevent_worker) | ||
yield cluster | ||
cluster.teardown() | ||
|
||
|
||
@pytest.fixture | ||
def default_worker_tasks(default_worker_tasks: set) -> set: | ||
default_worker_tasks.add(tasks) | ||
return default_worker_tasks | ||
|
||
|
||
# ---------------------------- | ||
|
||
|
||
class TestGeventPool: | ||
def test_celery_banner(self, gevent_worker: CeleryTestWorker): | ||
gevent_worker.assert_log_exists("concurrency: 1000 (gevent)") | ||
|
||
def test_ping(self, celery_setup: CeleryTestSetup): | ||
sig: Signature = ping.s() | ||
res: AsyncResult = sig.apply_async() | ||
assert res.get(timeout=RESULT_TIMEOUT) == "pong" | ||
|
||
def test_celery_gevent_example(self, celery_setup: CeleryTestSetup): | ||
"""Based on https://github.com/celery/celery/tree/main/examples/gevent""" | ||
LIST_OF_URLS = [ | ||
"https://github.com/celery", | ||
"https://github.com/celery/celery", | ||
"https://github.com/celery/pytest-celery", | ||
] | ||
group(tasks.urlopen.s(url) for url in LIST_OF_URLS).apply_async() | ||
celery_setup.worker.assert_log_does_not_exist("Exception for") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from __future__ import annotations | ||
|
||
import pytest | ||
from celery.canvas import Signature | ||
from celery.result import AsyncResult | ||
|
||
from pytest_celery import RESULT_TIMEOUT | ||
from pytest_celery import CeleryTestSetup | ||
from pytest_celery import CeleryTestWorker | ||
from pytest_celery import CeleryWorkerContainer | ||
from pytest_celery import ping | ||
|
||
|
||
class SoloPoolWorker(CeleryWorkerContainer): | ||
@classmethod | ||
def command(cls, *args: str) -> list[str]: | ||
return super().command("-P", "solo") | ||
|
||
|
||
@pytest.fixture | ||
def default_worker_container_cls() -> type[CeleryWorkerContainer]: | ||
return SoloPoolWorker | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
def default_worker_container_session_cls() -> type[CeleryWorkerContainer]: | ||
return SoloPoolWorker | ||
|
||
|
||
class TestSoloPool: | ||
def test_celery_banner(self, celery_worker: CeleryTestWorker): | ||
celery_worker.assert_log_exists("solo") | ||
|
||
def test_ping(self, celery_setup: CeleryTestSetup): | ||
sig: Signature = ping.s() | ||
res: AsyncResult = sig.apply_async() | ||
assert res.get(timeout=RESULT_TIMEOUT) == "pong" |