Skip to content

Commit

Permalink
Added psutil support to worker container (#114)
Browse files Browse the repository at this point in the history
* Refactored src/pytest_celery/vendors/worker/Dockerfile: Split pip packages

* Refactored WorkerInitialContent.generate(): Split logic into private methods

* src/pytest_celery/vendors/worker/app.py -> src/pytest_celery/vendors/worker/content/app.py

* Added 'utils.py' to /app in the worker's container

* Added psutil package and CeleryTestWorker.get_running_processes_info() API

* Fixed unit tests

* Fixed error with type annotations in Python < 3.10

* Fixed mypy errors

* Reduced reruns for parallel CI runs to default (a.k.a tox.ini)

* Reduced reruns for integration & smoke CI runs from 10 -> 2 reruns
  • Loading branch information
Nusnus authored Dec 25, 2023
1 parent 1337916 commit ea2f656
Show file tree
Hide file tree
Showing 15 changed files with 372 additions and 256 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/parallel-support.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ jobs:
poetry install --with dev,test,ci
- name: Run tox for all environments in parallel
timeout-minutes: 20
timeout-minutes: 15
run: |
tox -e xdist -- --reruns 10
tox -e xdist
parallel:
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -112,6 +112,6 @@ jobs:
poetry install --with dev,test,ci
- name: Run tox for all environments in parallel
timeout-minutes: 20
timeout-minutes: 15
run: |
tox -e parallel -- --reruns 10
tox -e parallel
4 changes: 2 additions & 2 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ jobs:
- name: Run tox for "${{ matrix.python-version }}-integration"
timeout-minutes: 15
run: |
tox --verbose --verbose -e "${{ matrix.python-version }}-integration" -- -n auto --reruns 10 --rerun-except AssertionError
tox --verbose --verbose -e "${{ matrix.python-version }}-integration" -- -n auto --reruns 2 --rerun-except AssertionError
Smoke:
needs:
Expand Down Expand Up @@ -172,4 +172,4 @@ jobs:
- name: Run tox for "${{ matrix.python-version }}-smoke"
timeout-minutes: 15
run: |
tox --verbose --verbose -e "${{ matrix.python-version }}-smoke" -- -n auto --reruns 10 --rerun-except AssertionError
tox --verbose --verbose -e "${{ matrix.python-version }}-smoke" -- -n auto --reruns 2 --rerun-except AssertionError
470 changes: 249 additions & 221 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ celery = { version = "^5", extras = ["redis", "memcache", "pymemcache"] }
retry = "^0.9.2"
pytest-docker-tools = "^3.1.3"
docker = "^7.0.0"
psutil = "^5.9.7"

[tool.poetry.group.dev]

Expand Down
22 changes: 22 additions & 0 deletions src/pytest_celery/api/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import json

from celery import Celery

from pytest_celery.api.base import CeleryTestCluster
Expand Down Expand Up @@ -34,6 +36,26 @@ def worker_queue(self) -> str:
def hostname(self) -> str:
return f"{self.worker_name}@{super().hostname()}"

def get_running_processes_info(
self,
columns: list[str] | None = None,
filters: dict[str, str] | None = None,
) -> list[dict]:
exit_code, output = self.container.exec_run(
f'python -c "from utils import get_running_processes_info; print(get_running_processes_info({columns!r}))"'
)

if exit_code != 0:
raise RuntimeError(f"Failed to get processes info: {output}")

decoded_str = output.decode("utf-8")
output = json.loads(decoded_str)

if filters:
output = [item for item in output if all(item.get(key) == value for key, value in filters.items())]

return output


class CeleryWorkerCluster(CeleryTestCluster):
def __init__(self, *workers: tuple[CeleryTestWorker | CeleryTestContainer]) -> None:
Expand Down
8 changes: 5 additions & 3 deletions src/pytest_celery/vendors/worker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

# Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir celery[redis,memcache,pymemcache]${CELERY_VERSION:+==$CELERY_VERSION}
RUN pip install --no-cache-dir --upgrade \
pip \
celery[redis,memcache,pymemcache]${CELERY_VERSION:+==$CELERY_VERSION} \
psutil

# The workdir must be /app
WORKDIR /app

COPY app.py app.py
COPY content/ .

# Switch to the test_user
USER test_user
Expand Down
13 changes: 12 additions & 1 deletion src/pytest_celery/vendors/worker/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,16 @@ def worker_queue(cls) -> str:

@classmethod
def app_module(cls) -> ModuleType:
from pytest_celery.vendors.worker import app
from pytest_celery.vendors.worker.content import app

return app

@classmethod
def utils_module(cls) -> ModuleType:
from pytest_celery.vendors.worker.content import utils

return utils

@classmethod
def tasks_modules(cls) -> set:
from pytest_celery.vendors.worker import tasks
Expand Down Expand Up @@ -88,15 +94,20 @@ def initial_content(
worker_signals: set | None = None,
worker_app: Celery | None = None,
app_module: ModuleType | None = None,
utils_module: ModuleType | None = None,
) -> dict:
if app_module is None:
app_module = cls.app_module()

if utils_module is None:
utils_module = cls.utils_module()

if worker_tasks is None:
worker_tasks = cls.tasks_modules()

content = WorkerInitialContent()
content.set_app_module(app_module)
content.set_utils_module(utils_module)
content.add_modules("tasks", worker_tasks)
if worker_signals:
content.add_modules("signals", worker_signals)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""" Template for Celery worker application. """
from __future__ import annotations

import json
import logging
Expand Down
20 changes: 20 additions & 0 deletions src/pytest_celery/vendors/worker/content/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from __future__ import annotations

import json

import psutil


def get_running_processes_info(columns: list[str] | None = None) -> str:
if not columns:
columns = [
"pid",
"name",
"username",
"cmdline",
"cpu_percent",
"memory_percent",
"create_time",
]
processes = [proc.info for proc in psutil.process_iter(columns)]
return json.dumps(processes)
7 changes: 7 additions & 0 deletions src/pytest_celery/vendors/worker/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ def default_worker_env(
def default_worker_initial_content(
default_worker_container_cls: type[CeleryWorkerContainer],
default_worker_app_module: ModuleType,
default_worker_utils_module: ModuleType,
default_worker_tasks: set,
default_worker_signals: set,
default_worker_app: Celery,
) -> dict:
yield default_worker_container_cls.initial_content(
app_module=default_worker_app_module,
utils_module=default_worker_utils_module,
worker_tasks=default_worker_tasks,
worker_signals=default_worker_signals,
worker_app=default_worker_app,
Expand All @@ -121,6 +123,11 @@ def default_worker_app_module(default_worker_container_cls: type[CeleryWorkerCon
yield default_worker_container_cls.app_module()


@pytest.fixture
def default_worker_utils_module(default_worker_container_cls: type[CeleryWorkerContainer]) -> ModuleType:
yield default_worker_container_cls.utils_module()


@pytest.fixture
def default_worker_tasks(default_worker_container_cls: type[CeleryWorkerContainer]) -> set:
yield default_worker_container_cls.tasks_modules()
Expand Down
52 changes: 37 additions & 15 deletions src/pytest_celery/vendors/worker/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,34 @@ def config(self, app: Celery | None = None) -> str:
config = "config = None"
return config

def __init__(self, app_module: ModuleType | None = None) -> None:
def __init__(
self,
app_module: ModuleType | None = None,
utils_module: ModuleType | None = None,
) -> None:
self.parser = self.Parser()
self._initial_content = {
"__init__.py": b"",
"imports": dict(),
"imports": dict(), # Placeholder item
}
self.set_app_module(app_module)
self.set_utils_module(utils_module)
self.set_app_name()
self.set_config_from_object()

def __eq__(self, __value: object) -> bool:
if not isinstance(__value, WorkerInitialContent):
def __eq__(self, other: object) -> bool:
if not isinstance(other, WorkerInitialContent):
return False
try:
return self.generate() == __value.generate()
return self.generate() == other.generate()
except ValueError:
return all(
[
self._app_module_src == __value._app_module_src,
self._initial_content == __value._initial_content,
self._app == __value._app,
self._config == __value._config,
self._app_module_src == other._app_module_src,
self._utils_module_src == other._utils_module_src,
self._initial_content == other._initial_content,
self._app == other._app,
self._config == other._config,
]
)

Expand All @@ -81,6 +87,14 @@ def set_app_module(self, app_module: ModuleType | None = None) -> None:
else:
self._app_module_src = None

def set_utils_module(self, utils_module: ModuleType | None = None) -> None:
self._utils_module_src: str | None

if utils_module:
self._utils_module_src = inspect.getsource(utils_module)
else:
self._utils_module_src = None

def add_modules(self, name: str, modules: set[ModuleType]) -> None:
if not name:
raise ValueError("name cannot be empty")
Expand All @@ -99,13 +113,17 @@ def set_config_from_object(self, app: Celery | None = None) -> None:
self._config = self.parser.config(app)

def generate(self) -> dict:
if not self._app_module_src:
raise ValueError("Please set_app_module() before calling generate()")

initial_content = self._initial_content.copy()
initial_content["app.py"] = self._generate_app_py(initial_content)
initial_content["utils.py"] = self._generate_utils_py()
return initial_content

def _generate_app_py(self, initial_content: dict) -> bytes:
if not self._app_module_src:
raise ValueError("Please set_app_module() before generating initial content")

if not initial_content["imports"]:
raise ValueError("Please add_modules() before calling generate()")
raise ValueError("Please add_modules() before generating initial content")

_imports: dict | Any = initial_content.pop("imports")
imports = "{%s}" % "}{".join(_imports.keys())
Expand All @@ -122,6 +140,10 @@ def generate(self) -> dict:
self._app_module_src = self._app_module_src.replace(replacement_args["app"], app)
self._app_module_src = self._app_module_src.replace(replacement_args["config"], config)

initial_content["app.py"] = self._app_module_src.encode()
return self._app_module_src.encode()

return initial_content
def _generate_utils_py(self) -> bytes:
if not self._utils_module_src:
raise ValueError("Please set_utils_module() before generating initial content")

return self._utils_module_src.encode()
2 changes: 1 addition & 1 deletion tests/integration/vendors/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def default_worker_app_module(self, request: pytest.FixtureRequest) -> ModuleTyp
if request.param == "Default":
yield request.getfixturevalue("default_worker_app_module")
else:
from pytest_celery.vendors.worker import app
from pytest_celery.vendors.worker.content import app

yield app

Expand Down
18 changes: 10 additions & 8 deletions tests/unit/vendors/test_worker/test_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_init(self):
assert actual_content._config == "config = None"

def test_eq(self):
from pytest_celery.vendors.worker import app as app_module
from pytest_celery.vendors.worker.content import app as app_module

actual_content = WorkerInitialContent()

Expand All @@ -87,7 +87,7 @@ def test_eq(self):
assert actual_content != WorkerInitialContent()

def test_set_app_module(self):
from pytest_celery.vendors.worker import app as app_module
from pytest_celery.vendors.worker.content import app as app_module

actual_content = WorkerInitialContent()

Expand All @@ -114,13 +114,16 @@ def test_set_app_module(self):
],
)
def test_add_modules(self, modules: dict[str, set[ModuleType]]):
from pytest_celery.vendors.worker import app as app_module
from pytest_celery.vendors.worker.content import app as app_module
from pytest_celery.vendors.worker.content import utils as utils_module

actual_content = WorkerInitialContent()

expected_content = WorkerInitialContent()
actual_content.set_app_module(app_module)
actual_content.set_utils_module(utils_module)
expected_content.set_app_module(app_module)
expected_content.set_utils_module(utils_module)

for module_name, module_set in modules.items():
actual_content.add_modules(module_name, module_set)
Expand Down Expand Up @@ -172,15 +175,14 @@ def test_generate_default(self):
],
)
def test_generate(self, app: Celery | None):
from pytest_celery.vendors.worker import app as app_module
from pytest_celery.vendors.worker.content import app as app_module
from pytest_celery.vendors.worker.content import utils as utils_module

actual_content = WorkerInitialContent()

actual_content.set_app_module(app_module)
actual_content = WorkerInitialContent(app_module, utils_module)
actual_content.add_modules("tests_modules", {pytest, pytest_celery})
actual_content.set_config_from_object(app)

expected_content = WorkerInitialContent(app_module)
expected_content = WorkerInitialContent(app_module, utils_module)
expected_content.add_modules("tests_modules", {pytest, pytest_celery})
expected_content.set_config_from_object(app)

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/vendors/test_worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_worker_queue(self):
assert CeleryWorkerContainer.worker_queue() == DEFAULT_WORKER_QUEUE

def test_app_module(self):
from pytest_celery.vendors.worker import app
from pytest_celery.vendors.worker.content import app

assert CeleryWorkerContainer.app_module() == app

Expand Down

0 comments on commit ea2f656

Please sign in to comment.