Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added psutil support to worker container #114

Merged
merged 10 commits into from
Dec 25, 2023
8 changes: 4 additions & 4 deletions .github/workflows/parallel-support.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ jobs:
poetry install --with dev,test,ci

- name: Run tox for all environments in parallel
timeout-minutes: 20
timeout-minutes: 15
run: |
tox -e xdist -- --reruns 10
tox -e xdist

parallel:
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -112,6 +112,6 @@ jobs:
poetry install --with dev,test,ci

- name: Run tox for all environments in parallel
timeout-minutes: 20
timeout-minutes: 15
run: |
tox -e parallel -- --reruns 10
tox -e parallel
4 changes: 2 additions & 2 deletions .github/workflows/python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ jobs:
- name: Run tox for "${{ matrix.python-version }}-integration"
timeout-minutes: 15
run: |
tox --verbose --verbose -e "${{ matrix.python-version }}-integration" -- -n auto --reruns 10 --rerun-except AssertionError
tox --verbose --verbose -e "${{ matrix.python-version }}-integration" -- -n auto --reruns 2 --rerun-except AssertionError

Smoke:
needs:
Expand Down Expand Up @@ -172,4 +172,4 @@ jobs:
- name: Run tox for "${{ matrix.python-version }}-smoke"
timeout-minutes: 15
run: |
tox --verbose --verbose -e "${{ matrix.python-version }}-smoke" -- -n auto --reruns 10 --rerun-except AssertionError
tox --verbose --verbose -e "${{ matrix.python-version }}-smoke" -- -n auto --reruns 2 --rerun-except AssertionError
470 changes: 249 additions & 221 deletions poetry.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ celery = { version = "^5", extras = ["redis", "memcache", "pymemcache"] }
retry = "^0.9.2"
pytest-docker-tools = "^3.1.3"
docker = "^7.0.0"
psutil = "^5.9.7"

[tool.poetry.group.dev]

Expand Down
22 changes: 22 additions & 0 deletions src/pytest_celery/api/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import json

Check warning on line 3 in src/pytest_celery/api/worker.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/api/worker.py#L3

Added line #L3 was not covered by tests

from celery import Celery

from pytest_celery.api.base import CeleryTestCluster
Expand Down Expand Up @@ -34,6 +36,26 @@
def hostname(self) -> str:
return f"{self.worker_name}@{super().hostname()}"

def get_running_processes_info(

Check warning on line 39 in src/pytest_celery/api/worker.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/api/worker.py#L39

Added line #L39 was not covered by tests
self,
columns: list[str] | None = None,
filters: dict[str, str] | None = None,
) -> list[dict]:
exit_code, output = self.container.exec_run(

Check warning on line 44 in src/pytest_celery/api/worker.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/api/worker.py#L44

Added line #L44 was not covered by tests
f'python -c "from utils import get_running_processes_info; print(get_running_processes_info({columns!r}))"'
)

if exit_code != 0:
raise RuntimeError(f"Failed to get processes info: {output}")

Check warning on line 49 in src/pytest_celery/api/worker.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/api/worker.py#L49

Added line #L49 was not covered by tests

decoded_str = output.decode("utf-8")
output = json.loads(decoded_str)

Check warning on line 52 in src/pytest_celery/api/worker.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/api/worker.py#L51-L52

Added lines #L51 - L52 were not covered by tests

if filters:
output = [item for item in output if all(item.get(key) == value for key, value in filters.items())]

return output

Check warning on line 57 in src/pytest_celery/api/worker.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/api/worker.py#L57

Added line #L57 was not covered by tests


class CeleryWorkerCluster(CeleryTestCluster):
def __init__(self, *workers: tuple[CeleryTestWorker | CeleryTestContainer]) -> None:
Expand Down
8 changes: 5 additions & 3 deletions src/pytest_celery/vendors/worker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@ ENV PYTHONUNBUFFERED=1
ENV PYTHONDONTWRITEBYTECODE=1

# Install Python dependencies
RUN pip install --no-cache-dir --upgrade pip \
&& pip install --no-cache-dir celery[redis,memcache,pymemcache]${CELERY_VERSION:+==$CELERY_VERSION}
RUN pip install --no-cache-dir --upgrade \
pip \
celery[redis,memcache,pymemcache]${CELERY_VERSION:+==$CELERY_VERSION} \
psutil

# The workdir must be /app
WORKDIR /app

COPY app.py app.py
COPY content/ .

# Switch to the test_user
USER test_user
Expand Down
13 changes: 12 additions & 1 deletion src/pytest_celery/vendors/worker/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,16 @@

@classmethod
def app_module(cls) -> ModuleType:
from pytest_celery.vendors.worker import app
from pytest_celery.vendors.worker.content import app

return app

@classmethod

Check warning on line 46 in src/pytest_celery/vendors/worker/container.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/container.py#L46

Added line #L46 was not covered by tests
def utils_module(cls) -> ModuleType:
from pytest_celery.vendors.worker.content import utils

return utils

@classmethod
def tasks_modules(cls) -> set:
from pytest_celery.vendors.worker import tasks
Expand Down Expand Up @@ -88,15 +94,20 @@
worker_signals: set | None = None,
worker_app: Celery | None = None,
app_module: ModuleType | None = None,
utils_module: ModuleType | None = None,
) -> dict:
if app_module is None:
app_module = cls.app_module()

if utils_module is None:
utils_module = cls.utils_module()

if worker_tasks is None:
worker_tasks = cls.tasks_modules()

content = WorkerInitialContent()
content.set_app_module(app_module)
content.set_utils_module(utils_module)
content.add_modules("tasks", worker_tasks)
if worker_signals:
content.add_modules("signals", worker_signals)
Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""" Template for Celery worker application. """
from __future__ import annotations

import json
import logging
Expand Down
20 changes: 20 additions & 0 deletions src/pytest_celery/vendors/worker/content/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from __future__ import annotations

import json

import psutil


def get_running_processes_info(columns: list[str] | None = None) -> str:
if not columns:
columns = [

Check warning on line 10 in src/pytest_celery/vendors/worker/content/utils.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/content/utils.py#L10

Added line #L10 was not covered by tests
"pid",
"name",
"username",
"cmdline",
"cpu_percent",
"memory_percent",
"create_time",
]
processes = [proc.info for proc in psutil.process_iter(columns)]
return json.dumps(processes)

Check warning on line 20 in src/pytest_celery/vendors/worker/content/utils.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/content/utils.py#L20

Added line #L20 was not covered by tests
7 changes: 7 additions & 0 deletions src/pytest_celery/vendors/worker/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@
def default_worker_initial_content(
default_worker_container_cls: type[CeleryWorkerContainer],
default_worker_app_module: ModuleType,
default_worker_utils_module: ModuleType,
default_worker_tasks: set,
default_worker_signals: set,
default_worker_app: Celery,
) -> dict:
yield default_worker_container_cls.initial_content(
app_module=default_worker_app_module,
utils_module=default_worker_utils_module,
worker_tasks=default_worker_tasks,
worker_signals=default_worker_signals,
worker_app=default_worker_app,
Expand All @@ -121,6 +123,11 @@
yield default_worker_container_cls.app_module()


@pytest.fixture

Check warning on line 126 in src/pytest_celery/vendors/worker/fixtures.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/fixtures.py#L126

Added line #L126 was not covered by tests
def default_worker_utils_module(default_worker_container_cls: type[CeleryWorkerContainer]) -> ModuleType:
yield default_worker_container_cls.utils_module()

Check warning on line 128 in src/pytest_celery/vendors/worker/fixtures.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/fixtures.py#L128

Added line #L128 was not covered by tests


@pytest.fixture
def default_worker_tasks(default_worker_container_cls: type[CeleryWorkerContainer]) -> set:
yield default_worker_container_cls.tasks_modules()
Expand Down
52 changes: 37 additions & 15 deletions src/pytest_celery/vendors/worker/volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,34 @@
config = "config = None"
return config

def __init__(self, app_module: ModuleType | None = None) -> None:
def __init__(

Check warning on line 51 in src/pytest_celery/vendors/worker/volume.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/volume.py#L51

Added line #L51 was not covered by tests
self,
app_module: ModuleType | None = None,
utils_module: ModuleType | None = None,
) -> None:
self.parser = self.Parser()
self._initial_content = {
"__init__.py": b"",
"imports": dict(),
"imports": dict(), # Placeholder item
}
self.set_app_module(app_module)
self.set_utils_module(utils_module)
self.set_app_name()
self.set_config_from_object()

def __eq__(self, __value: object) -> bool:
if not isinstance(__value, WorkerInitialContent):
def __eq__(self, other: object) -> bool:

Check warning on line 66 in src/pytest_celery/vendors/worker/volume.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/volume.py#L66

Added line #L66 was not covered by tests
if not isinstance(other, WorkerInitialContent):
return False
try:
return self.generate() == __value.generate()
return self.generate() == other.generate()
except ValueError:
return all(
[
self._app_module_src == __value._app_module_src,
self._initial_content == __value._initial_content,
self._app == __value._app,
self._config == __value._config,
self._app_module_src == other._app_module_src,
self._utils_module_src == other._utils_module_src,
self._initial_content == other._initial_content,
self._app == other._app,
self._config == other._config,
]
)

Expand All @@ -81,6 +87,14 @@
else:
self._app_module_src = None

def set_utils_module(self, utils_module: ModuleType | None = None) -> None:

Check warning on line 90 in src/pytest_celery/vendors/worker/volume.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/volume.py#L90

Added line #L90 was not covered by tests
self._utils_module_src: str | None

if utils_module:
self._utils_module_src = inspect.getsource(utils_module)
else:
self._utils_module_src = None

def add_modules(self, name: str, modules: set[ModuleType]) -> None:
if not name:
raise ValueError("name cannot be empty")
Expand All @@ -99,13 +113,17 @@
self._config = self.parser.config(app)

def generate(self) -> dict:
if not self._app_module_src:
raise ValueError("Please set_app_module() before calling generate()")

initial_content = self._initial_content.copy()
initial_content["app.py"] = self._generate_app_py(initial_content)
initial_content["utils.py"] = self._generate_utils_py()
return initial_content

def _generate_app_py(self, initial_content: dict) -> bytes:

Check warning on line 121 in src/pytest_celery/vendors/worker/volume.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/volume.py#L121

Added line #L121 was not covered by tests
if not self._app_module_src:
raise ValueError("Please set_app_module() before generating initial content")

if not initial_content["imports"]:
raise ValueError("Please add_modules() before calling generate()")
raise ValueError("Please add_modules() before generating initial content")

Check warning on line 126 in src/pytest_celery/vendors/worker/volume.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/volume.py#L126

Added line #L126 was not covered by tests

_imports: dict | Any = initial_content.pop("imports")
imports = "{%s}" % "}{".join(_imports.keys())
Expand All @@ -122,6 +140,10 @@
self._app_module_src = self._app_module_src.replace(replacement_args["app"], app)
self._app_module_src = self._app_module_src.replace(replacement_args["config"], config)

initial_content["app.py"] = self._app_module_src.encode()
return self._app_module_src.encode()

return initial_content
def _generate_utils_py(self) -> bytes:

Check warning on line 145 in src/pytest_celery/vendors/worker/volume.py

View check run for this annotation

Codecov / codecov/patch

src/pytest_celery/vendors/worker/volume.py#L145

Added line #L145 was not covered by tests
if not self._utils_module_src:
raise ValueError("Please set_utils_module() before generating initial content")

return self._utils_module_src.encode()
2 changes: 1 addition & 1 deletion tests/integration/vendors/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def default_worker_app_module(self, request: pytest.FixtureRequest) -> ModuleTyp
if request.param == "Default":
yield request.getfixturevalue("default_worker_app_module")
else:
from pytest_celery.vendors.worker import app
from pytest_celery.vendors.worker.content import app

yield app

Expand Down
18 changes: 10 additions & 8 deletions tests/unit/vendors/test_worker/test_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def test_init(self):
assert actual_content._config == "config = None"

def test_eq(self):
from pytest_celery.vendors.worker import app as app_module
from pytest_celery.vendors.worker.content import app as app_module

actual_content = WorkerInitialContent()

Expand All @@ -87,7 +87,7 @@ def test_eq(self):
assert actual_content != WorkerInitialContent()

def test_set_app_module(self):
from pytest_celery.vendors.worker import app as app_module
from pytest_celery.vendors.worker.content import app as app_module

actual_content = WorkerInitialContent()

Expand All @@ -114,13 +114,16 @@ def test_set_app_module(self):
],
)
def test_add_modules(self, modules: dict[str, set[ModuleType]]):
from pytest_celery.vendors.worker import app as app_module
from pytest_celery.vendors.worker.content import app as app_module
from pytest_celery.vendors.worker.content import utils as utils_module

actual_content = WorkerInitialContent()

expected_content = WorkerInitialContent()
actual_content.set_app_module(app_module)
actual_content.set_utils_module(utils_module)
expected_content.set_app_module(app_module)
expected_content.set_utils_module(utils_module)

for module_name, module_set in modules.items():
actual_content.add_modules(module_name, module_set)
Expand Down Expand Up @@ -172,15 +175,14 @@ def test_generate_default(self):
],
)
def test_generate(self, app: Celery | None):
from pytest_celery.vendors.worker import app as app_module
from pytest_celery.vendors.worker.content import app as app_module
from pytest_celery.vendors.worker.content import utils as utils_module

actual_content = WorkerInitialContent()

actual_content.set_app_module(app_module)
actual_content = WorkerInitialContent(app_module, utils_module)
actual_content.add_modules("tests_modules", {pytest, pytest_celery})
actual_content.set_config_from_object(app)

expected_content = WorkerInitialContent(app_module)
expected_content = WorkerInitialContent(app_module, utils_module)
expected_content.add_modules("tests_modules", {pytest, pytest_celery})
expected_content.set_config_from_object(app)

Expand Down
2 changes: 1 addition & 1 deletion tests/unit/vendors/test_worker/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def test_worker_queue(self):
assert CeleryWorkerContainer.worker_queue() == DEFAULT_WORKER_QUEUE

def test_app_module(self):
from pytest_celery.vendors.worker import app
from pytest_celery.vendors.worker.content import app

assert CeleryWorkerContainer.app_module() == app

Expand Down