diff --git a/.config/dictionary.txt b/.config/dictionary.txt index 92c29c9..c8ce714 100644 --- a/.config/dictionary.txt +++ b/.config/dictionary.txt @@ -3,9 +3,12 @@ bindir bthornto caplog capsys +cauthor +cdescription cnamespace cpart cpath +crepository fileh fqcn levelname diff --git a/.vscode/settings.json b/.vscode/settings.json index 454b3fa..ac23230 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -14,6 +14,5 @@ "mypy.targets": ["src", "tests"], "[jsonc]": { "editor.defaultFormatter": "esbenp.prettier-vscode" - }, - "python.formatting.provider": "none" + } } diff --git a/src/pip4a/cli.py b/src/pip4a/cli.py index 69fe00d..011c066 100644 --- a/src/pip4a/cli.py +++ b/src/pip4a/cli.py @@ -138,10 +138,13 @@ def run(self: Cli) -> None: """Run the application.""" logging.getLogger("pip4a") - term_features = TermFeatures( - color=False if os.environ.get("NO_COLOR") else not self.args.no_ansi, - links=not self.args.no_ansi, - ) + if not sys.stdout.isatty(): + term_features = TermFeatures(color=False, links=False) + else: + term_features = TermFeatures( + color=False if os.environ.get("NO_COLOR") else not self.args.no_ansi, + links=not self.args.no_ansi, + ) self.config = Config(args=self.args, term_features=term_features) self.config.init() diff --git a/src/pip4a/collection.py b/src/pip4a/collection.py new file mode 100644 index 0000000..fd12429 --- /dev/null +++ b/src/pip4a/collection.py @@ -0,0 +1,181 @@ +"""A collection abstraction.""" +from __future__ import annotations + +import logging +import re +import sys + +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING + +import yaml + +from .utils import hint + + +if TYPE_CHECKING: + from .config import Config + + +logger = logging.getLogger(__name__) + + +@dataclass +class Collection: + """A collection request specification.""" + + config: Config + path: Path | None = None + opt_deps: str | None = None + local: bool | None = None + cnamespace: str | None = None + cname: str | None = None + specifier: str | None = None + + @property + def name(self: Collection) -> str: + """Return the collection name.""" + return f"{self.cnamespace}.{self.cname}" + + @property + def cache_dir(self: Collection) -> Path: + """Return the collection cache directory.""" + collection_cache_dir = self.config.venv_cache_dir / self.name + if not collection_cache_dir.exists(): + collection_cache_dir.mkdir() + return collection_cache_dir + + @property + def build_dir(self: Collection) -> Path: + """Return the collection cache directory.""" + collection_build_dir = self.cache_dir / "build" + if not collection_build_dir.exists(): + collection_build_dir.mkdir() + return collection_build_dir + + @property + def site_pkg_path(self: Collection) -> Path: + """Return the site packages collection path.""" + if not self.cnamespace or not self.cname: + msg = "Collection namespace or name not set." + raise RuntimeError(msg) + return self.config.site_pkg_collections_path / self.cnamespace / self.cname + + +def parse_collection_request( # noqa: PLR0915 + string: str, + config: Config, +) -> Collection: + """Parse a collection request str.""" + collection = Collection(config=config) + # spec with dep, local + if "[" in string and "]" in string: + msg = f"Found optional dependencies in collection request: {string}" + logger.debug(msg) + path = Path(string.split("[")[0]).expanduser().resolve() + if not path.exists(): + msg = "Provide an existing path to a collection when specifying optional dependencies." + hint(msg) + msg = f"Failed to find collection path: {path}" + logger.critical(msg) + msg = f"Found local collection request with dependencies: {string}" + logger.debug(msg) + collection.path = path + msg = f"Setting collection path: {collection.path}" + collection.opt_deps = string.split("[")[1].split("]")[0] + msg = f"Setting optional dependencies: {collection.opt_deps}" + logger.debug(msg) + collection.local = True + msg = "Setting request as local" + logger.debug(msg) + get_galaxy(collection) + return collection + # spec without dep, local + path = Path(string).expanduser().resolve() + if path.exists(): + msg = f"Found local collection request without dependencies: {string}" + logger.debug(msg) + msg = f"Setting collection path: {path}" + logger.debug(msg) + collection.path = path + msg = "Setting request as local" + logger.debug(msg) + collection.local = True + get_galaxy(collection) + return collection + non_local_re = re.compile( + r""" + (?P[A-Za-z0-9]+) # collection name + \. # dot + (?P[A-Za-z0-9]+) # collection name + (?P[^A-Za-z0-9].*)? # optional specifier + """, + re.VERBOSE, + ) + matched = non_local_re.match(string) + if not matched: + msg = ( + "Specify a valid collection name (ns.n) with an optional version specifier" + ) + hint(msg) + msg = f"Failed to parse collection request: {string}" + logger.critical(msg) + sys.exit(1) + msg = f"Found non-local collection request: {string}" + logger.debug(msg) + + collection.cnamespace = matched.group("cnamespace") + msg = f"Setting collection namespace: {collection.cnamespace}" + logger.debug(msg) + + collection.cname = matched.group("cname") + msg = f"Setting collection name: {collection.cname}" + logger.debug(msg) + + if matched.group("specifier"): + collection.specifier = matched.group("specifier") + msg = f"Setting collection specifier: {collection.specifier}" + logger.debug(msg) + + collection.local = False + msg = "Setting request as non-local" + logger.debug(msg) + + return collection + + +def get_galaxy(collection: Collection) -> None: + """Retrieve the collection name from the galaxy.yml file. + + Args: + collection: A collection object + Raises: + SystemExit: If the collection name is not found + """ + if collection is None or collection.path is None: + msg = "get_galaxy called without a collection or path" + raise RuntimeError(msg) + file_name = collection.path / "galaxy.yml" + if not file_name.exists(): + err = f"Failed to find {file_name} in {collection.path}" + logger.critical(err) + + with file_name.open(encoding="utf-8") as fileh: + try: + yaml_file = yaml.safe_load(fileh) + except yaml.YAMLError as exc: + err = f"Failed to load yaml file: {exc}" + logger.critical(err) + + try: + collection.cnamespace = yaml_file["namespace"] + collection.cname = yaml_file["name"] + msg = f"Found collection name: {collection.name} from {file_name}." + logger.debug(msg) + except KeyError as exc: + err = f"Failed to find collection name in {file_name}: {exc}" + logger.critical(err) + else: + return + raise SystemExit(1) # We shouldn't be here diff --git a/src/pip4a/config.py b/src/pip4a/config.py index 09513ec..a27f5ba 100644 --- a/src/pip4a/config.py +++ b/src/pip4a/config.py @@ -11,15 +11,13 @@ from pathlib import Path from typing import TYPE_CHECKING -import yaml - -from .utils import parse_collection_request, subprocess_run +from .utils import subprocess_run if TYPE_CHECKING: from argparse import Namespace - from .utils import CollectionSpec, TermFeatures + from .utils import TermFeatures logger = logging.getLogger(__name__) @@ -40,23 +38,12 @@ def __init__( self.python_path: Path self.site_pkg_path: Path self.venv_interpreter: Path - self.collection: CollectionSpec self.term_features: TermFeatures = term_features def init(self: Config) -> None: """Initialize the configuration.""" if self.args.venv: self._create_venv = True - if self.args.subcommand == "install" and not self.args.requirement: - self.collection = parse_collection_request(self.args.collection_specifier) - if self.collection.path: - self._get_galaxy() - - elif self.args.subcommand == "uninstall" and not self.args.requirement: - self.collection = parse_collection_request(self.args.collection_specifier) - if self.collection.path: - err = "Please use a collection name for uninstallation." - logger.critical(err) self._set_interpreter() self._set_site_pkg_path() @@ -86,22 +73,6 @@ def venv_cache_dir(self: Config) -> Path: """Return the virtual environment cache directory.""" return self.cache_dir - @property - def collection_cache_dir(self: Config) -> Path: - """Return the collection cache directory.""" - collection_cache_dir = self.venv_cache_dir / self.collection.name - if not collection_cache_dir.exists(): - collection_cache_dir.mkdir() - return collection_cache_dir - - @property - def collection_build_dir(self: Config) -> Path: - """Return the collection cache directory.""" - collection_build_dir = self.collection_cache_dir / "build" - if not collection_build_dir.exists(): - collection_build_dir.mkdir() - return collection_build_dir - @property def discovered_python_reqs(self: Config) -> Path: """Return the discovered python requirements file.""" @@ -120,18 +91,6 @@ def site_pkg_collections_path(self: Config) -> Path: site_pkg_collections_path.mkdir() return site_pkg_collections_path - @property - def site_pkg_collection_path(self: Config) -> Path: - """Return the site packages collection path.""" - if not self.collection.cnamespace or not self.collection.cname: - msg = "Collection namespace or name not set." - raise RuntimeError(msg) - return ( - self.site_pkg_collections_path - / self.collection.cnamespace - / self.collection.cname - ) - @property def venv_bindir(self: Config) -> Path: """Return the virtual environment bin directory.""" @@ -142,42 +101,6 @@ def interpreter(self: Config) -> Path: """Return the current interpreter.""" return Path(sys.executable) - def _get_galaxy(self: Config) -> None: - """Retrieve the collection name from the galaxy.yml file. - - Returns: - str: The collection name and dependencies - - Raises: - SystemExit: If the collection name is not found - """ - if self.collection is None or self.collection.path is None: - msg = "_get_galaxy called without a collection or path" - raise RuntimeError(msg) - file_name = self.collection.path / "galaxy.yml" - if not file_name.exists(): - err = f"Failed to find {file_name} in {self.collection.path}" - logger.critical(err) - - with file_name.open(encoding="utf-8") as fileh: - try: - yaml_file = yaml.safe_load(fileh) - except yaml.YAMLError as exc: - err = f"Failed to load yaml file: {exc}" - logger.critical(err) - - try: - self.collection.cnamespace = yaml_file["namespace"] - self.collection.cname = yaml_file["name"] - msg = f"Found collection name: {self.collection.name} from {file_name}." - logger.debug(msg) - except KeyError as exc: - err = f"Failed to find collection name in {file_name}: {exc}" - logger.critical(err) - else: - return - raise SystemExit(1) # We shouldn't be here - def _set_interpreter( self: Config, ) -> None: @@ -187,8 +110,14 @@ def _set_interpreter( msg = f"Creating virtual environment: {self.venv}" logger.debug(msg) command = f"python -m venv {self.venv}" + work = "Creating virtual environment" try: - subprocess_run(command=command, verbose=self.args.verbose) + subprocess_run( + command=command, + verbose=self.args.verbose, + msg=work, + term_features=self.term_features, + ) msg = f"Created virtual environment: {self.venv}" logger.info(msg) except subprocess.CalledProcessError as exc: @@ -214,8 +143,14 @@ def _set_site_pkg_path(self: Config) -> None: f"{self.venv_interpreter} -c" " 'import json,site; print(json.dumps(site.getsitepackages()))'" ) + work = "Locating site packages directory" try: - proc = subprocess_run(command=command, verbose=self.args.verbose) + proc = subprocess_run( + command=command, + verbose=self.args.verbose, + msg=work, + term_features=self.term_features, + ) except subprocess.CalledProcessError as exc: err = f"Failed to find site packages path: {exc}" logger.critical(err) diff --git a/src/pip4a/subcommands/checker.py b/src/pip4a/subcommands/checker.py index 6cf8eb4..d300465 100644 --- a/src/pip4a/subcommands/checker.py +++ b/src/pip4a/subcommands/checker.py @@ -137,9 +137,15 @@ def _python_deps(self: Checker) -> None: f" {self._config.discovered_python_reqs} --dry-run" f" --report {missing_file}" ) + work = "Building python package dependency tree" try: - subprocess_run(command=command, verbose=self._config.args.verbose) + subprocess_run( + command=command, + verbose=self._config.args.verbose, + msg=work, + term_features=self._config.term_features, + ) except subprocess.CalledProcessError as exc: err = f"Failed to check python dependencies: {exc}" logger.critical(err) diff --git a/src/pip4a/subcommands/inspector.py b/src/pip4a/subcommands/inspector.py index 1ccac38..377287f 100644 --- a/src/pip4a/subcommands/inspector.py +++ b/src/pip4a/subcommands/inspector.py @@ -4,7 +4,6 @@ import json import logging -import os from typing import TYPE_CHECKING @@ -41,7 +40,7 @@ def run(self: Inspector) -> None: ) output = json.dumps(collections, indent=4, sort_keys=True) - if HAS_RICH and not os.environ.get("NOCOLOR"): + if HAS_RICH and self._config.term_features.color: print_json(output) else: print(output) # noqa: T201 diff --git a/src/pip4a/subcommands/installer.py b/src/pip4a/subcommands/installer.py index 281f5d1..338435e 100644 --- a/src/pip4a/subcommands/installer.py +++ b/src/pip4a/subcommands/installer.py @@ -10,6 +10,7 @@ from pathlib import Path from typing import TYPE_CHECKING +from pip4a.collection import Collection, parse_collection_request from pip4a.utils import ( builder_introspect, collections_from_requirements, @@ -36,10 +37,11 @@ def __init__(self: Installer, config: Config) -> None: config: The application configuration. """ self._config = config + self._collection: Collection def run(self: Installer) -> None: """Run the installer.""" - if self._config.args.editable and not self._config.collection.local: + if self._config.args.editable and not self._collection.local: err = "Editable installs are only supported for local collections." logger.critical(err) @@ -54,12 +56,17 @@ def run(self: Installer) -> None: if self._config.args.requirement: self._install_galaxy_requirements() - elif self._config.collection.local: - self._install_local_collection() - if self._config.args.editable: - self._swap_editable_collection() - elif not self._config.collection.local: - self._install_galaxy_collection() + elif self._config.args.collection_specifier: + self._collection = parse_collection_request( + string=self._config.args.collection_specifier, + config=self._config, + ) + if self._collection.local: + self._install_local_collection() + if self._config.args.editable: + self._swap_editable_collection() + elif not self._collection.local: + self._install_galaxy_collection() builder_introspect(config=self._config) self._pip_install() @@ -88,7 +95,12 @@ def _install_core(self: Installer) -> None: logger.debug(msg) command = f"{self._config.venv_interpreter} -m pip install ansible-core" try: - subprocess_run(command=command, verbose=self._config.args.verbose) + subprocess_run( + command=command, + verbose=self._config.args.verbose, + msg=msg, + term_features=self._config.term_features, + ) except subprocess.CalledProcessError as exc: err = f"Failed to install ansible-core: {exc}" logger.critical(err) @@ -98,13 +110,13 @@ def _install_galaxy_collection(self: Installer) -> None: msg = f"Installing collection from galaxy: {self._config.args.collection_specifier}" logger.info(msg) - if self._config.site_pkg_collection_path.exists(): - msg = f"Removing installed {self._config.site_pkg_collection_path}" + if self._collection.site_pkg_path.exists(): + msg = f"Removing installed {self._collection.site_pkg_path}" logger.debug(msg) - if self._config.site_pkg_collection_path.is_symlink(): - self._config.site_pkg_collection_path.unlink() + if self._collection.site_pkg_path.is_symlink(): + self._collection.site_pkg_path.unlink() else: - shutil.rmtree(self._config.site_pkg_collection_path) + shutil.rmtree(self._collection.site_pkg_path) command = ( f"{self._config.venv_bindir / 'ansible-galaxy'} collection" @@ -122,6 +134,8 @@ def _install_galaxy_collection(self: Installer) -> None: command=command, env=env, verbose=self._config.args.verbose, + msg=msg, + term_features=self._config.term_features, ) except subprocess.CalledProcessError as exc: err = f"Failed to install collection: {exc} {exc.stderr}" @@ -155,8 +169,14 @@ def _install_galaxy_requirements(self: Installer) -> None: f" -p {self._config.site_pkg_path}" " --force" ) + work = "Install collections from requirements file" try: - proc = subprocess_run(command=command, verbose=self._config.args.verbose) + proc = subprocess_run( + command=command, + verbose=self._config.args.verbose, + msg=work, + term_features=self._config.term_features, + ) except subprocess.CalledProcessError as exc: err = f"Failed to install collections: {exc} {exc.stderr}" logger.critical(err) @@ -171,29 +191,31 @@ def _install_local_collection(self: Installer) -> None: # noqa: PLR0912, PLR091 Raises: RuntimeError: If tarball is not found or if more than one tarball is found. """ - msg = f"Installing local collection from: {self._config.collection_build_dir}" + msg = f"Installing local collection from: {self._collection.build_dir}" logger.info(msg) command = ( "cp -r --parents $(git ls-files 2> /dev/null || ls)" - f" {self._config.collection_build_dir}" + f" {self._collection.build_dir}" ) msg = "Copying collection to build directory using git ls-files." logger.debug(msg) try: subprocess_run( command=command, - cwd=self._config.collection.path, + cwd=self._collection.path, verbose=self._config.args.verbose, + msg=msg, + term_features=self._config.term_features, ) except subprocess.CalledProcessError as exc: err = f"Failed to copy collection to build directory: {exc} {exc.stderr}" logger.critical(err) command = ( - f"cd {self._config.collection_build_dir} &&" + f"cd {self._collection.build_dir} &&" f" {self._config.venv_bindir / 'ansible-galaxy'} collection build" - f" --output-path {self._config.collection_build_dir}" + f" --output-path {self._collection.build_dir}" " --force" ) @@ -201,38 +223,43 @@ def _install_local_collection(self: Installer) -> None: # noqa: PLR0912, PLR091 logger.debug(msg) try: - subprocess_run(command=command, verbose=self._config.args.verbose) + subprocess_run( + command=command, + verbose=self._config.args.verbose, + msg=msg, + term_features=self._config.term_features, + ) except subprocess.CalledProcessError as exc: err = f"Failed to build collection: {exc} {exc.stderr}" logger.critical(err) built = [ f - for f in Path(self._config.collection_build_dir).iterdir() + for f in Path(self._collection.build_dir).iterdir() if f.is_file() and f.name.endswith(".tar.gz") ] if len(built) != 1: err = ( "Expected to find one collection tarball in" - f"{self._config.collection_build_dir}, found {len(built)}" + f"{self._collection.build_dir}, found {len(built)}" ) raise RuntimeError(err) tarball = built[0] - if self._config.site_pkg_collection_path.exists(): - msg = f"Removing installed {self._config.site_pkg_collection_path}" + if self._collection.site_pkg_path.exists(): + msg = f"Removing installed {self._collection.site_pkg_path}" logger.debug(msg) - if self._config.site_pkg_collection_path.is_symlink(): - self._config.site_pkg_collection_path.unlink() + if self._config.site_pkg_path.is_symlink(): + self._config.site_pkg_path.unlink() else: - shutil.rmtree(self._config.site_pkg_collection_path) + shutil.rmtree(self._config.site_pkg_path) info_dirs = [ entry for entry in self._config.site_pkg_collections_path.iterdir() if entry.is_dir() and entry.name.endswith(".info") - and entry.name.startswith(self._config.collection.name) + and entry.name.startswith(self._collection.name) ] for info_dir in info_dirs: msg = f"Removing installed {info_dir}" @@ -254,6 +281,8 @@ def _install_local_collection(self: Installer) -> None: # noqa: PLR0912, PLR091 command=command, env=env, verbose=self._config.args.verbose, + msg=msg, + term_features=self._config.term_features, ) except subprocess.CalledProcessError as exc: err = f"Failed to install collection: {exc} {exc.stderr}" @@ -265,13 +294,13 @@ def _install_local_collection(self: Installer) -> None: # noqa: PLR0912, PLR091 # preserve the MANIFEST.json file for editable installs if not self._config.args.editable: shutil.copy( - self._config.collection_build_dir / "galaxy.yml", - self._config.site_pkg_collection_path / "galaxy.yml", + self._collection.build_dir / "galaxy.yml", + self._config.site_pkg_path / "galaxy.yml", ) else: shutil.copy( - self._config.site_pkg_collection_path / "MANIFEST.json", - self._config.collection_cache_dir / "MANIFEST.json", + self._config.site_pkg_path / "MANIFEST.json", + self._collection.cache_dir / "MANIFEST.json", ) installed = re.findall(r"(\w+\.\w+):.*installed", proc.stdout) @@ -284,26 +313,23 @@ def _swap_editable_collection(self: Installer) -> None: Raises: RuntimeError: If the collection path is not set. """ - msg = f"Swapping {self._config.collection.name} with {self._config.collection.path}" + msg = f"Swapping {self._collection.name} with {self._collection.path}" logger.info(msg) - if self._config.collection.path is None: + if self._collection.path is None: msg = "Collection path not set" raise RuntimeError(msg) - msg = f"Removing installed {self._config.site_pkg_collection_path}" + msg = f"Removing installed {self._config.site_pkg_path}" logger.debug(msg) - if self._config.site_pkg_collection_path.exists(): - if self._config.site_pkg_collection_path.is_symlink(): - self._config.site_pkg_collection_path.unlink() + if self._config.site_pkg_path.exists(): + if self._config.site_pkg_path.is_symlink(): + self._config.site_pkg_path.unlink() else: - shutil.rmtree(self._config.site_pkg_collection_path) + shutil.rmtree(self._config.site_pkg_path) - msg = ( - f"Symlinking {self._config.site_pkg_collection_path}" - f" to {self._config.collection.path}" - ) + msg = f"Symlinking {self._collection.site_pkg_path} to {self._collection.path}" logger.debug(msg) - self._config.site_pkg_collection_path.symlink_to(self._config.collection.path) + self._collection.site_pkg_path.symlink_to(self._collection.path) def _pip_install(self: Installer) -> None: """Install the dependencies.""" @@ -319,8 +345,14 @@ def _pip_install(self: Installer) -> None: f"Installing python requirements from {self._config.discovered_python_reqs}" ) logger.debug(msg) + work = "Installing python requirements" try: - subprocess_run(command=command, verbose=self._config.args.verbose) + subprocess_run( + command=command, + verbose=self._config.args.verbose, + msg=work, + term_features=self._config.term_features, + ) except subprocess.CalledProcessError as exc: err = ( "Failed to install requirements from" @@ -337,8 +369,14 @@ def _check_bindep(self: Installer) -> None: logger.info(msg) command = f"bindep -b -f {self._config.discovered_bindep_reqs}" + work = "Checking system package requirements" try: - subprocess_run(command=command, verbose=self._config.args.verbose) + subprocess_run( + command=command, + verbose=self._config.args.verbose, + msg=work, + term_features=self._config.term_features, + ) except subprocess.CalledProcessError as exc: lines = exc.stdout.splitlines() msg = ( diff --git a/src/pip4a/subcommands/uninstaller.py b/src/pip4a/subcommands/uninstaller.py index 458a0cd..85fbd4d 100644 --- a/src/pip4a/subcommands/uninstaller.py +++ b/src/pip4a/subcommands/uninstaller.py @@ -8,7 +8,8 @@ from pathlib import Path from typing import TYPE_CHECKING -from pip4a.utils import collections_from_requirements, note, parse_collection_request +from pip4a.collection import Collection, parse_collection_request +from pip4a.utils import collections_from_requirements, note if TYPE_CHECKING: @@ -28,6 +29,7 @@ def __init__(self: UnInstaller, config: Config) -> None: config: The application configuration. """ self._config = config + self._collection: Collection def run(self: UnInstaller) -> None: """Run the uninstaller.""" @@ -38,32 +40,37 @@ def run(self: UnInstaller) -> None: logger.critical(err) collections = collections_from_requirements(requirements_path) for collection in collections: - self._config.collection = parse_collection_request( - collection["name"], + self._collection = parse_collection_request( + string=collection["name"], + config=self._config, ) self._remove_collection() else: + self._collection = parse_collection_request( + string=self._config.args.collection_specifier, + config=self._config, + ) self._remove_collection() def _remove_collection(self: UnInstaller) -> None: """Remove the collection.""" - msg = f"Checking {self._config.collection.name} at {self._config.site_pkg_collection_path}" + msg = f"Checking {self._collection.name} at {self._collection.site_pkg_path}" logger.debug(msg) - if self._config.site_pkg_collection_path.exists(): - msg = f"Exists: {self._config.site_pkg_collection_path}" + if self._collection.site_pkg_path.exists(): + msg = f"Exists: {self._collection.site_pkg_path}" logger.debug(msg) - if self._config.site_pkg_collection_path.is_symlink(): - self._config.site_pkg_collection_path.unlink() + if self._collection.site_pkg_path.is_symlink(): + self._collection.site_pkg_path.unlink() else: - shutil.rmtree(self._config.site_pkg_collection_path) - msg = f"Removed {self._config.collection.name}" + shutil.rmtree(self._collection.site_pkg_path) + msg = f"Removed {self._collection.name}" note(msg) else: err = ( - f"Failed to find {self._config.collection.name}:" - f" {self._config.site_pkg_collection_path}" + f"Failed to find {self._collection.name}:" + f" {self._collection.site_pkg_path}" ) logger.warning(err) @@ -71,15 +78,15 @@ def _remove_collection(self: UnInstaller) -> None: if all( ( entry.is_dir(), - entry.name.startswith(self._config.collection.name), + entry.name.startswith(self._collection.name), entry.suffix == ".info", ), ): shutil.rmtree(entry) - msg = f"Removed {self._config.collection.name}*.info: {entry}" + msg = f"Removed {self._collection.name}*.info: {entry}" logger.debug(msg) - collection_namespace_root = self._config.site_pkg_collection_path.parent + collection_namespace_root = self._collection.site_pkg_path.parent try: collection_namespace_root.rmdir() msg = f"Removed collection namespace root: {collection_namespace_root}" diff --git a/src/pip4a/utils.py b/src/pip4a/utils.py index 8313614..b18ed02 100644 --- a/src/pip4a/utils.py +++ b/src/pip4a/utils.py @@ -2,12 +2,14 @@ from __future__ import annotations +import itertools import json import logging import os -import re import subprocess import sys +import threading +import time from dataclasses import dataclass from typing import TYPE_CHECKING, Union @@ -17,9 +19,11 @@ if TYPE_CHECKING: + from pathlib import Path + from types import TracebackType + from .config import Config -from pathlib import Path from typing import Any @@ -80,17 +84,20 @@ class Ansi: UNDERLINE = "\x1B[4m" WHITE = "\x1B[37m" YELLOW = "\x1B[33m" + GREY = "\x1B[90m" -def subprocess_run( +def subprocess_run( # noqa: PLR0913 command: str, verbose: int, + msg: str, + term_features: TermFeatures, cwd: Path | None = None, env: dict[str, str] | None = None, ) -> subprocess.CompletedProcess[str]: """Run a subprocess command.""" - msg = f"Running command: {command}" - logger.debug(msg) + cmd = f"Running command: {command}" + logger.debug(cmd) log_level = logging.ERROR - (verbose * 10) if log_level == logging.DEBUG: return subprocess_tee.run( @@ -101,15 +108,16 @@ def subprocess_run( shell=True, # noqa: S604 text=True, ) - return subprocess.run( - command, - check=True, - cwd=cwd, - env=env, - shell=True, # noqa: S602 - capture_output=True, - text=True, - ) + with Spinner(message=msg, term_features=term_features): + return subprocess.run( + command, + check=True, + cwd=cwd, + env=env, + shell=True, # noqa: S602 + capture_output=True, + text=True, + ) def oxford_join(words: list[str]) -> str: @@ -261,8 +269,14 @@ def builder_introspect(config: Config) -> None: logger.debug(msg) msg = f"Writing discovered system requirements to: {config.discovered_bindep_reqs}" logger.debug(msg) + work = "Persisting requirements to file system" try: - subprocess_run(command=command, verbose=config.args.verbose) + subprocess_run( + command=command, + verbose=config.args.verbose, + msg=work, + term_features=config.term_features, + ) except subprocess.CalledProcessError as exc: err = f"Failed to discover requirements: {exc} {exc.stderr}" logger.critical(err) @@ -294,100 +308,6 @@ def hint(string: str) -> None: print(f"\033[95m{_hint}\033[0m") # noqa: T201 -@dataclass -class CollectionSpec: - """A collection request specification.""" - - path: Path | None = None - opt_deps: str | None = None - local: bool | None = None - cnamespace: str | None = None - cname: str | None = None - specifier: str | None = None - - @property - def name(self: CollectionSpec) -> str: - """Return the collection name.""" - return f"{self.cnamespace}.{self.cname}" - - -def parse_collection_request(string: str) -> CollectionSpec: # noqa: PLR0915 - """Parse a collection request str.""" - collection_spec = CollectionSpec() - # spec with dep, local - if "[" in string and "]" in string: - msg = f"Found optional dependencies in collection request: {string}" - logger.debug(msg) - path = Path(string.split("[")[0]).expanduser().resolve() - if not path.exists(): - msg = "Provide an existing path to a collection when specifying optional dependencies." - hint(msg) - msg = f"Failed to find collection path: {path}" - logger.critical(msg) - msg = f"Found local collection request with dependencies: {string}" - logger.debug(msg) - collection_spec.path = path - msg = f"Setting collection path: {collection_spec.path}" - collection_spec.opt_deps = string.split("[")[1].split("]")[0] - msg = f"Setting optional dependencies: {collection_spec.opt_deps}" - logger.debug(msg) - collection_spec.local = True - msg = "Setting request as local" - logger.debug(msg) - return collection_spec - # spec without dep, local - path = Path(string).expanduser().resolve() - if path.exists(): - msg = f"Found local collection request without dependencies: {string}" - logger.debug(msg) - msg = f"Setting collection path: {path}" - logger.debug(msg) - collection_spec.path = path - msg = "Setting request as local" - logger.debug(msg) - collection_spec.local = True - return collection_spec - non_local_re = re.compile( - r""" - (?P[A-Za-z0-9]+) # collection name - \. # dot - (?P[A-Za-z0-9]+) # collection name - (?P[^A-Za-z0-9].*)? # optional specifier - """, - re.VERBOSE, - ) - matched = non_local_re.match(string) - if not matched: - msg = ( - "Specify a valid collection name (ns.n) with an optional version specifier" - ) - hint(msg) - msg = f"Failed to parse collection request: {string}" - logger.critical(msg) - sys.exit(1) - msg = f"Found non-local collection request: {string}" - logger.debug(msg) - - collection_spec.cnamespace = matched.group("cnamespace") - msg = f"Setting collection namespace: {collection_spec.cnamespace}" - logger.debug(msg) - - collection_spec.cname = matched.group("cname") - msg = f"Setting collection name: {collection_spec.cname}" - logger.debug(msg) - - if matched.group("specifier"): - collection_spec.specifier = matched.group("specifier") - msg = f"Setting collection specifier: {collection_spec.specifier}" - logger.debug(msg) - - collection_spec.local = False - msg = "Setting request as non-local" - logger.debug(msg) - - return collection_spec - - def collections_from_requirements(file: Path) -> list[dict[str, str]]: """Build a list of collections from a requirements file.""" collections = [] @@ -459,3 +379,118 @@ def collections_meta(config: Config) -> dict[str, dict[str, Any]]: "dependencies": [], } return collections + + +class Spinner: # pylint: disable=too-many-instance-attributes + """A spinner.""" + + def __init__( + self: Spinner, + message: str, + term_features: TermFeatures, + delay: float = 0.1, + ) -> None: + """Initialize the spinner. + + Args: + message: The message to display + term_features: Terminal features + delay: The delay between characters + """ + self.spinner = itertools.cycle(["-", "/", "|", "\\"]) + self.delay = delay + self.busy = False + self.spinner_visible = False + self._term_features = term_features + self._screen_lock = threading.Lock() + self._start_time: float | None = None + self.thread: threading.Thread + self.msg: str = message.rstrip(".").rstrip(":").rstrip() + + def write_next(self: Spinner) -> None: + """Write the next char.""" + with self._screen_lock: + if not self.spinner_visible: + if self._term_features.color: + char = f"{Ansi.GREY}{next(self.spinner)}{Ansi.RESET}" + else: + char = next(self.spinner) + sys.stdout.write(char) + self.spinner_visible = True + sys.stdout.flush() + + def remove_spinner( + self: Spinner, + cleanup: bool = False, # noqa: FBT001,FBT002 + ) -> None: + """Remove the spinner. + + https://github.com/Tagar/stuff/blob/master/spinner.py + + Args: + cleanup: Should we cleanup after the spinner + """ + with self._screen_lock: + if self.spinner_visible: + sys.stdout.write("\b") + self.spinner_visible = False + if cleanup: + sys.stdout.write(" ") # overwrite spinner with blank + sys.stdout.write("\r") # move to next line + sys.stdout.write("\033[K") # clear line + sys.stdout.flush() + + def spinner_task(self: Spinner) -> None: + """Spin the spinner.""" + while self.busy: + self.write_next() + time.sleep(self.delay) + self.remove_spinner() + + def __enter__(self: Spinner) -> None: + """Enter the context handler.""" + # set the start time + self._start_time = time.time() + if not self._term_features.any_enabled(): + return + if self._term_features.color: + sys.stdout.write(f"{Ansi.GREY}{self.msg}:{Ansi.RESET} ") + else: + sys.stdout.write(f"{self.msg}: ") + # hide the cursor + sys.stdout.write("\033[?25l") + if self._term_features.any_enabled(): + self.busy = True + self.thread = threading.Thread(target=self.spinner_task) + self.thread.start() + + def __exit__( + self: Spinner, + typ: type[BaseException] | None, + exc: BaseException | None, + tb: TracebackType | None, + ) -> None: + """Exit the context handler. + + Args: + typ: The exception + exc: The exception value + tb: The traceback + + + """ + # delay if less than 1 second has elapsed + if not self._term_features.any_enabled(): + return + min_show_time = 1 + if self._start_time: + elapsed = time.time() - self._start_time + if elapsed < min_show_time: + time.sleep(1 - elapsed) + if self._term_features.any_enabled(): + self.busy = False + self.remove_spinner(cleanup=True) + else: + sys.stdout.write("\r") + # show the cursor + sys.stdout.write("\033[?25h") diff --git a/tests/fixtures/galaxy.yml b/tests/fixtures/galaxy.yml new file mode 100644 index 0000000..e731778 --- /dev/null +++ b/tests/fixtures/galaxy.yml @@ -0,0 +1,10 @@ +name: cname +namespace: cnamespace +version: 0.0.1 +license: + - MIT +description: cdescription +readme: README.md +authors: + - cauthor +repository: crepository diff --git a/tests/integration/test_basic.py b/tests/integration/test_basic.py index 03ecc6b..811487d 100644 --- a/tests/integration/test_basic.py +++ b/tests/integration/test_basic.py @@ -7,7 +7,7 @@ import pytest from pip4a.cli import main -from pip4a.utils import subprocess_run +from pip4a.utils import TermFeatures, subprocess_run def test_venv( @@ -17,12 +17,12 @@ def test_venv( ) -> None: """Basic smoke test.""" # disable color for json output - monkeypatch.setenv("NOCOLOR", "1") + term_features = TermFeatures(color=False, links=False) command = ( "git clone https://github.com/ansible-collections/cisco.nxos.git" f" {tmp_path/ 'cisco.nxos'}" ) - subprocess_run(command=command, verbose=True) + subprocess_run(command=command, verbose=True, msg="", term_features=term_features) monkeypatch.chdir(tmp_path) monkeypatch.setattr( @@ -57,7 +57,7 @@ def test_venv( captured = capsys.readouterr() assert "Removed ansible.utils" in captured.out - monkeypatch.setattr("sys.argv", ["pip4a", "inspect", "--venv=venv"]) + monkeypatch.setattr("sys.argv", ["pip4a", "inspect", "--venv=venv", "--no-ansi"]) with pytest.raises(SystemExit): main() captured = capsys.readouterr() @@ -67,7 +67,7 @@ def test_venv( assert "ansible.utils" not in captured_json command = f"{tmp_path / 'venv' / 'bin' / 'python'} -m pip uninstall xmltodict -y" - subprocess_run(command=command, verbose=True) + subprocess_run(command=command, verbose=True, msg="", term_features=term_features) monkeypatch.setattr("sys.argv", ["pip4a", "check", "--venv=venv"]) with pytest.raises(SystemExit): @@ -98,12 +98,6 @@ def test_non_local( with pytest.raises(SystemExit): main() captured = capsys.readouterr() - string = "\x1b[34m\x1b]8;;https://github.com/ansible-collections/ansible.scm\x1b\\ansible.scm\x1b]8;;\x1b\\\x1b[0m\nā””ā”€ā”€\x1b[34m\x1b]8;;https://github.com/ansible-collections/ansible.utils\x1b\\ansible.utils\x1b]8;;\x1b\\\x1b[0m\n\n" - assert string == captured.out - monkeypatch.setattr( - "sys.argv", - ["pip4a", "tree", "--no-ansi", f"--venv={tmp_path / 'venv'}"], - ) with pytest.raises(SystemExit): main() captured = capsys.readouterr() diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py index 4bcac2c..41ce897 100644 --- a/tests/unit/test_utils.py +++ b/tests/unit/test_utils.py @@ -2,18 +2,28 @@ from __future__ import annotations +from argparse import Namespace from pathlib import Path import pytest -from pip4a.utils import CollectionSpec, parse_collection_request +from pip4a.collection import Collection, parse_collection_request +from pip4a.config import Config +from pip4a.utils import TermFeatures +config = Config(args=Namespace(), term_features=TermFeatures(color=False, links=False)) + +FIXTURE_DIR = Path(__file__).parent.parent.resolve() / "fixtures" scenarios = ( - ("ansible.utils", CollectionSpec(cname="utils", cnamespace="ansible", local=False)), + ( + "ansible.utils", + Collection(config=config, cname="utils", cnamespace="ansible", local=False), + ), ( "ansible.utils:1.0.0", - CollectionSpec( + Collection( + config=config, cname="utils", cnamespace="ansible", specifier=":1.0.0", @@ -22,7 +32,8 @@ ), ( "ansible.utils>=1.0.0", - CollectionSpec( + Collection( + config=config, cname="utils", cnamespace="ansible", specifier=">=1.0.0", @@ -30,20 +41,26 @@ ), ), ( - "/", - CollectionSpec( - specifier=None, + str(FIXTURE_DIR), + Collection( + cname="cname", + cnamespace="cnamespace", + config=config, local=True, - path=Path("/"), + path=FIXTURE_DIR, + specifier=None, ), ), ( - "/[test]", - CollectionSpec( - specifier=None, + str(FIXTURE_DIR) + "/[test]", + Collection( + cname="cname", + cnamespace="cnamespace", + config=config, local=True, - path=Path("/"), opt_deps="test", + path=FIXTURE_DIR, + specifier=None, ), ), ( @@ -58,11 +75,11 @@ @pytest.mark.parametrize("scenario", scenarios) -def test_parse_collection_request(scenario: tuple[str, CollectionSpec | None]) -> None: +def test_parse_collection_request(scenario: tuple[str, Collection | None]) -> None: """Test that the parse_collection_request function works as expected.""" string, spec = scenario if spec is None: with pytest.raises(SystemExit): - parse_collection_request(string) + parse_collection_request(string=string, config=config) else: - assert parse_collection_request(string) == spec + assert parse_collection_request(string=string, config=config) == spec