diff --git a/hydra/_internal/core_plugins/basic_launcher.py b/hydra/_internal/core_plugins/basic_launcher.py
index af1c3493282..0fab519bf6c 100644
--- a/hydra/_internal/core_plugins/basic_launcher.py
+++ b/hydra/_internal/core_plugins/basic_launcher.py
@@ -14,6 +14,7 @@
     run_job,
     setup_globals,
 )
+from hydra.plugins.experiment_sequence import ExperimentSequence
 from hydra.plugins.launcher import Launcher
 from hydra.types import HydraContext, TaskFunction
@@ -81,3 +82,40 @@ def launch(
             runs.append(ret)
         configure_log(self.config.hydra.hydra_logging, self.config.hydra.verbose)
         return runs
+
+    def launch_experiment_sequence(
+        self, job_overrides: ExperimentSequence, initial_job_idx: int
+    ) -> Sequence[JobReturn]:
+        setup_globals()
+        assert self.hydra_context is not None
+        assert self.config is not None
+        assert self.task_function is not None
+
+        configure_log(self.config.hydra.hydra_logging, self.config.hydra.verbose)
+        sweep_dir = self.config.hydra.sweep.dir
+        Path(str(sweep_dir)).mkdir(parents=True, exist_ok=True)
+        log.info(f"Launching {len(job_overrides)} jobs locally")
+        runs: List[JobReturn] = []
+        for idx, overrides in enumerate(job_overrides):
+            idx = initial_job_idx + idx
+            lst = " ".join(filter_overrides(overrides))
+            log.info(f"\t#{idx} : {lst}")
+
+            sweep_config = self.hydra_context.config_loader.load_sweep_config(
+                self.config, list(overrides)
+            )
+            with open_dict(sweep_config):
+                sweep_config.hydra.job.id = idx
+                sweep_config.hydra.job.num = idx
+            ret = run_job(
+                hydra_context=self.hydra_context,
+                task_function=self.task_function,
+                config=sweep_config,
+                job_dir_key="hydra.sweep.dir",
+                job_subdir_key="hydra.sweep.subdir",
+            )
+            runs.append(ret)
+            if isinstance(job_overrides, ExperimentSequence):
+                job_overrides.update_sequence((overrides, ret))
+        configure_log(self.config.hydra.hydra_logging, self.config.hydra.verbose)
+        return runs
\ No newline at end of file
diff --git a/hydra/plugins/experiment_sequence.py b/hydra/plugins/experiment_sequence.py
new file mode 100644
index 00000000000..40209a740c5
--- /dev/null
+++ b/hydra/plugins/experiment_sequence.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import abstractmethod
+import typing
+
+from collections.abc import Iterator
+from typing import Any, Sequence, Tuple
+
+
+class ExperimentSequence(Iterator):
+    @abstractmethod
+    def __next__(self) -> Sequence[str]:
+        """Return the override list for the next experiment, or raise StopIteration when done."""
+        raise NotImplementedError()
+
+    def __iter__(self) -> typing.Iterator[Sequence[str]]:
+        return self
+
+    @abstractmethod
+    def update_sequence(self, experiment_result: Tuple[Sequence[str], Any]):
+        """Update the experiment generator (e.g. an optimization study) with an (overrides, result) pair."""
+        raise NotImplementedError()
+
+    def __len__(self):
+        """Return the maximum number of experiments the sequence can produce."""
+        raise NotImplementedError()
\ No newline at end of file
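Reviewer note: the contract here is "yield override lists, optionally consume feedback". A minimal, hypothetical subclass may make that concrete — `RandomSearchSequence` and all of its parameter names are illustrative only, not part of this PR:

```python
import random
from typing import Any, Sequence, Tuple

from hydra.plugins.experiment_sequence import ExperimentSequence


class RandomSearchSequence(ExperimentSequence):
    """Samples `x` uniformly; ignores result feedback."""

    def __init__(self, num_experiments: int, seed: int = 0) -> None:
        self._rng = random.Random(seed)
        self._remaining = num_experiments
        self._total = num_experiments

    def __next__(self) -> Sequence[str]:
        if self._remaining == 0:
            raise StopIteration
        self._remaining -= 1
        return [f"x={self._rng.uniform(-5.0, 5.0)}"]

    def update_sequence(self, experiment_result: Tuple[Sequence[str], Any]) -> None:
        # An adaptive sequence (e.g. Bayesian optimization) would fold the
        # JobReturn back into its sampler here; random search has no state.
        overrides, job_return = experiment_result

    def __len__(self) -> int:
        return self._total
```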
diff --git a/hydra/plugins/launcher.py b/hydra/plugins/launcher.py
index b11ad4d32b8..4a28e82d28e 100644
--- a/hydra/plugins/launcher.py
+++ b/hydra/plugins/launcher.py
@@ -1,3 +1,17 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
 """
 Launcher plugin interface
 """
@@ -8,7 +22,7 @@
 from omegaconf import DictConfig
 
 from hydra.core.utils import JobReturn
-
+from hydra.plugins.experiment_sequence import ExperimentSequence
 from hydra.types import TaskFunction, HydraContext
 
 from .plugin import Plugin
@@ -37,3 +51,14 @@ def launch(
         :param initial_job_idx: Initial job idx. used by sweepers that executes several batches
         """
         raise NotImplementedError()
+
+    def launch_experiment_sequence(
+        self, job_overrides: ExperimentSequence, initial_job_idx: int
+    ) -> Sequence[JobReturn]:
+        """
+        :param job_overrides: an ExperimentSequence yielding the arguments for each job
+        :param initial_job_idx: Initial job idx. Used by sweepers that execute several batches.
+        """
+        raise NotImplementedError(
+            "This launcher doesn't support launching an experiment sequence."
+        )
diff --git a/hydra/plugins/sweeper.py b/hydra/plugins/sweeper.py
index 53e1b93d880..4f49957bfec 100644
--- a/hydra/plugins/sweeper.py
+++ b/hydra/plugins/sweeper.py
@@ -1,3 +1,17 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
 """
 Sweeper plugin interface
diff --git a/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/_core.py b/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/_core.py
index 8bee1491337..acc46d5ba5c 100644
--- a/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/_core.py
+++ b/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/_core.py
@@ -1,7 +1,7 @@
 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
 import logging
 from pathlib import Path
-from typing import Any, Dict, List, Sequence
+from typing import Any, Dict, Union, List, Sequence
 
 from hydra.core.hydra_config import HydraConfig
 from hydra.core.singleton import Singleton
@@ -12,9 +12,11 @@
     run_job,
     setup_globals,
 )
+from hydra.plugins.experiment_sequence import ExperimentSequence
 from hydra.types import HydraContext, TaskFunction
 from joblib import Parallel, delayed  # type: ignore
 from omegaconf import DictConfig, open_dict
+import multiprocessing as mp
 
 from .joblib_launcher import JoblibLauncher
 
@@ -63,13 +65,22 @@ def process_joblib_cfg(joblib_cfg: Dict[str, Any]) -> None:
             pass
 
 
+def _batch_sequence(sequence, batch_size=1):
+    while True:
+        overrides = [experiment_config for _, experiment_config in zip(range(batch_size), sequence)]
+        if overrides:
+            yield overrides
+        if len(overrides) != batch_size:
+            return  # PEP 479: a generator must end with `return`, not by raising StopIteration
+
+
 def launch(
     launcher: JoblibLauncher,
-    job_overrides: Sequence[Sequence[str]],
+    job_overrides: Union[Sequence[Sequence[str]], ExperimentSequence],
     initial_job_idx: int,
 ) -> Sequence[JobReturn]:
     """
-    :param job_overrides: a List of List, where each inner list is the arguments for one job run.
+    :param job_overrides: an Iterable of Lists, where each inner list holds the arguments for one job run.
     :param initial_job_idx: Initial job idx in batch.
     :return: an array of return values from run_job with indexes corresponding to the input list indexes.
""" @@ -87,30 +98,53 @@ def launch( joblib_cfg = launcher.joblib joblib_cfg["backend"] = "loky" process_joblib_cfg(joblib_cfg) - - log.info( - "Joblib.Parallel({}) is launching {} jobs".format( - ",".join(f"{k}={v}" for k, v in joblib_cfg.items()), - len(job_overrides), - ) - ) - log.info(f"Launching jobs, sweep output dir : {sweep_dir}") - for idx, overrides in enumerate(job_overrides): - log.info("\t#{} : {}".format(idx, " ".join(filter_overrides(overrides)))) - singleton_state = Singleton.get_state() - runs = Parallel(**joblib_cfg)( - delayed(execute_job)( - initial_job_idx + idx, - overrides, - launcher.hydra_context, - launcher.config, - launcher.task_function, - singleton_state, + if isinstance(job_overrides, ExperimentSequence): + log.info( + "Joblib.Parallel({}) is launching {} jobs".format( + ",".join([f"{k}={v}" for k, v in joblib_cfg.items()]), + 'generator of', + ) + ) + batch_size = v if (v := joblib_cfg['n_jobs']) != -1 else mp.cpu_count() + runs = [] + for idx, overrides in enumerate(_batch_sequence(job_overrides, batch_size)): + results = Parallel(**joblib_cfg)( + delayed(execute_job)( + initial_job_idx + idx, + override, + launcher.hydra_context, + launcher.config, + launcher.task_function, + singleton_state, + ) + for override in overrides + ) + for experiment_result in zip(overrides, results): + job_overrides.update_sequence(experiment_result) + else: + log.info( + "Joblib.Parallel({}) is launching {} jobs".format( + ",".join(f"{k}={v}" for k, v in joblib_cfg.items()), + len(job_overrides), + ) + ) + log.info(f"Launching jobs, sweep output dir : {sweep_dir}") + for idx, overrides in enumerate(job_overrides): + log.info("\t#{} : {}".format(idx, " ".join(filter_overrides(overrides)))) + + runs = Parallel(**joblib_cfg)( + delayed(execute_job)( + initial_job_idx + idx, + overrides, + launcher.hydra_context, + launcher.config, + launcher.task_function, + singleton_state, + ) + for idx, overrides in enumerate(job_overrides) ) - for idx, overrides in enumerate(job_overrides) - ) assert isinstance(runs, List) for run in runs: diff --git a/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/joblib_launcher.py b/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/joblib_launcher.py index 66cb99d741e..96df393e9e9 100644 --- a/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/joblib_launcher.py +++ b/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/joblib_launcher.py @@ -4,6 +4,7 @@ from hydra.core.utils import JobReturn from hydra.plugins.launcher import Launcher +from hydra.plugins.experiment_sequence import ExperimentSequence from hydra.types import HydraContext, TaskFunction from omegaconf import DictConfig @@ -45,3 +46,12 @@ def launch( return _core.launch( launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx ) + + def launch_experiment_sequence( + self, job_overrides: ExperimentSequence, initial_job_idx: int + ) -> Sequence[JobReturn]: + from . 
diff --git a/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/joblib_launcher.py b/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/joblib_launcher.py
index 66cb99d741e..96df393e9e9 100644
--- a/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/joblib_launcher.py
+++ b/plugins/hydra_joblib_launcher/hydra_plugins/hydra_joblib_launcher/joblib_launcher.py
@@ -4,6 +4,7 @@
 
 from hydra.core.utils import JobReturn
 from hydra.plugins.launcher import Launcher
+from hydra.plugins.experiment_sequence import ExperimentSequence
 from hydra.types import HydraContext, TaskFunction
 from omegaconf import DictConfig
 
@@ -45,3 +46,12 @@ def launch(
         return _core.launch(
             launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx
         )
+
+    def launch_experiment_sequence(
+        self, job_overrides: ExperimentSequence, initial_job_idx: int
+    ) -> Sequence[JobReturn]:
+        from . import _core
+
+        return _core.launch(
+            launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx
+        )
diff --git a/plugins/hydra_loky_launcher/MANIFEST.in b/plugins/hydra_loky_launcher/MANIFEST.in
new file mode 100644
index 00000000000..580709b1349
--- /dev/null
+++ b/plugins/hydra_loky_launcher/MANIFEST.in
@@ -0,0 +1,3 @@
+global-exclude *.pyc
+global-exclude __pycache__
+recursive-include hydra_plugins/* *.yaml py.typed
diff --git a/plugins/hydra_loky_launcher/NEWS.md b/plugins/hydra_loky_launcher/NEWS.md
new file mode 100644
index 00000000000..8b137891791
--- /dev/null
+++ b/plugins/hydra_loky_launcher/NEWS.md
@@ -0,0 +1 @@
+
diff --git a/plugins/hydra_loky_launcher/README.md b/plugins/hydra_loky_launcher/README.md
new file mode 100644
index 00000000000..69c386acef8
--- /dev/null
+++ b/plugins/hydra_loky_launcher/README.md
@@ -0,0 +1,3 @@
+# Hydra Loky Launcher
+Provides a [loky](https://loky.readthedocs.io/) based Hydra Launcher supporting parallel worker pool execution.
+
diff --git a/plugins/hydra_loky_launcher/example/config.yaml b/plugins/hydra_loky_launcher/example/config.yaml
new file mode 100644
index 00000000000..cce48112670
--- /dev/null
+++ b/plugins/hydra_loky_launcher/example/config.yaml
@@ -0,0 +1,9 @@
+defaults:
+  - override hydra/launcher: loky
+
+task: 1
+
+hydra:
+  launcher:
+    # override the number of workers for loky
+    max_workers: 10
diff --git a/plugins/hydra_loky_launcher/example/my_app.py b/plugins/hydra_loky_launcher/example/my_app.py
new file mode 100644
index 00000000000..b39236a19b6
--- /dev/null
+++ b/plugins/hydra_loky_launcher/example/my_app.py
@@ -0,0 +1,20 @@
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+import logging
+import os
+import time
+
+import hydra
+from omegaconf import DictConfig
+
+log = logging.getLogger(__name__)
+
+
+@hydra.main(config_name="config")
+def my_app(cfg: DictConfig) -> None:
+    log.info(f"Process ID {os.getpid()} executing task {cfg.task} ...")
+
+    time.sleep(1)
+
+
+if __name__ == "__main__":
+    my_app()
diff --git a/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/__init__.py b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/__init__.py
new file mode 100644
index 00000000000..aaea960e4fd
--- /dev/null
+++ b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/__init__.py
@@ -0,0 +1,17 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+
+__version__ = "1.2.0"
diff --git a/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/_core.py b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/_core.py
new file mode 100644
index 00000000000..6a44377e663
--- /dev/null
+++ b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/_core.py
@@ -0,0 +1,153 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+import logging
+from pathlib import Path
+from typing import Any, Dict, Union, List, Sequence
+from concurrent.futures import wait, FIRST_COMPLETED, ALL_COMPLETED
+
+from hydra.core.hydra_config import HydraConfig
+from hydra.core.singleton import Singleton
+from hydra.core.utils import (
+    JobReturn,
+    configure_log,
+    filter_overrides,
+    run_job,
+    setup_globals,
+)
+from hydra.plugins.experiment_sequence import ExperimentSequence
+from hydra.types import HydraContext, TaskFunction
+from loky import get_reusable_executor
+from omegaconf import DictConfig, open_dict
+import multiprocessing as mp
+
+from .loky_launcher import LokyLauncher
+
+log = logging.getLogger(__name__)
+
+
+def execute_job(
+    idx: int,
+    overrides: Sequence[str],
+    hydra_context: HydraContext,
+    config: DictConfig,
+    task_function: TaskFunction,
+    singleton_state: Dict[Any, Any],
+) -> JobReturn:
+    """Calls `run_job` in parallel"""
+    setup_globals()
+    Singleton.set_state(singleton_state)
+
+    sweep_config = hydra_context.config_loader.load_sweep_config(
+        config, list(overrides)
+    )
+    with open_dict(sweep_config):
+        sweep_config.hydra.job.id = "{}_{}".format(sweep_config.hydra.job.name, idx)
+        sweep_config.hydra.job.num = idx
+    HydraConfig.instance().set_config(sweep_config)
+
+    ret = run_job(
+        hydra_context=hydra_context,
+        config=sweep_config,
+        task_function=task_function,
+        job_dir_key="hydra.sweep.dir",
+        job_subdir_key="hydra.sweep.subdir",
+    )
+
+    return ret
+
+
+def process_loky_cfg(loky_cfg: Dict[str, Any]) -> None:
+    for k in ["timeout", "max_workers"]:
+        if k in loky_cfg.keys():
+            try:
+                val = loky_cfg.get(k)
+                if val:
+                    loky_cfg[k] = int(val)
+            except ValueError:
+                pass
+
+
+def launch(
+    launcher: LokyLauncher,
+    job_overrides: Union[Sequence[Sequence[str]], ExperimentSequence],
+    initial_job_idx: int,
+) -> Sequence[JobReturn]:
+    """
+    :param job_overrides: an Iterable of Lists, where each inner list holds the arguments for one job run.
+    :param initial_job_idx: Initial job idx in batch.
+    :return: an array of return values from run_job with indexes corresponding to the input list indexes.
+ """ + setup_globals() + assert launcher.config is not None + assert launcher.task_function is not None + assert launcher.hydra_context is not None + + configure_log(launcher.config.hydra.hydra_logging, launcher.config.hydra.verbose) + sweep_dir = Path(str(launcher.config.hydra.sweep.dir)) + sweep_dir.mkdir(parents=True, exist_ok=True) + + loky_cfg = launcher.loky + process_loky_cfg(loky_cfg) + singleton_state = Singleton.get_state() + + worker_pool = get_reusable_executor(**loky_cfg) + batch_size = v if (v := loky_cfg['max_workers']) is not None else mp.cpu_count() + + runs = [None for _ in range(len(job_overrides))] + log.info( + "ReusableExectutor({}) is launching {} jobs".format( + ",".join([f"{k}={v}" for k, v in loky_cfg.items()]), + 'generator of' if isinstance(job_overrides, ExperimentSequence) else len(job_overrides), + ) + ) + running_tasks = {} + for idx, override in enumerate(job_overrides): + log.info("\t#{} : {}".format(idx, " ".join(filter_overrides(override)))) + running_tasks[worker_pool.submit( + execute_job, + initial_job_idx + idx, + override, + launcher.hydra_context, + launcher.config, + launcher.task_function, + singleton_state + )] = (override, idx) + + if len(running_tasks) == batch_size: + finished, non_finished = wait(running_tasks, return_when=FIRST_COMPLETED) + overrides = [running_tasks[f] for f in finished] + results = [f.result() for f in finished] + running_tasks = {task: running_tasks[task] for task in non_finished} + for (_, idx), res in zip(overrides, results): + runs[idx] = res + if isinstance(job_overrides, ExperimentSequence): + for (override, _), res in zip(overrides, results): + job_overrides.update_sequence((override, res)) + + finished, _ = wait(running_tasks, return_when=ALL_COMPLETED) + overrides = [running_tasks[f] for f in finished] + results = [f.result() for f in finished] + + for (_, idx), res in zip(overrides, results): + runs[idx] = res + if isinstance(job_overrides, ExperimentSequence): + for (override, _), res in zip(overrides, results): + job_overrides.update_sequence((override, res)) + + assert isinstance(runs, List) + for run in runs: + assert isinstance(run, JobReturn) + return runs diff --git a/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/config.py b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/config.py new file mode 100644 index 00000000000..814db8e1473 --- /dev/null +++ b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/config.py @@ -0,0 +1,38 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved +from dataclasses import dataclass +from typing import Optional + +from hydra.core.config_store import ConfigStore + + +@dataclass +class LokyLauncherConf: + _target_: str = "hydra_plugins.hydra_loky_launcher.loky_launcher.LokyLauncher" + + # maximum number of concurrently running jobs. 
diff --git a/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/config.py b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/config.py
new file mode 100644
index 00000000000..814db8e1473
--- /dev/null
+++ b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/config.py
@@ -0,0 +1,38 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+from dataclasses import dataclass
+from typing import Optional
+
+from hydra.core.config_store import ConfigStore
+
+
+@dataclass
+class LokyLauncherConf:
+    _target_: str = "hydra_plugins.hydra_loky_launcher.loky_launcher.LokyLauncher"
+
+    # maximum number of concurrently running jobs; if None, all CPUs are used
+    max_workers: Optional[int] = None
+
+    # per-task timeout limit in seconds; enforcement depends on the backend implementation
+    timeout: Optional[float] = 10
+
+
+ConfigStore.instance().store(
+    group="hydra/launcher",
+    name="loky",
+    node=LokyLauncherConf,
+    provider="loky_launcher",
+)
diff --git a/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/loky_launcher.py b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/loky_launcher.py
new file mode 100644
index 00000000000..3f6a9d3efe5
--- /dev/null
+++ b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/loky_launcher.py
@@ -0,0 +1,70 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+import logging
+from typing import Any, Optional, Sequence, Union
+
+from hydra.core.utils import JobReturn
+from hydra.plugins.launcher import Launcher
+from hydra.plugins.experiment_sequence import ExperimentSequence
+from hydra.types import HydraContext, TaskFunction
+from omegaconf import DictConfig
+
+log = logging.getLogger(__name__)
+
+
+class LokyLauncher(Launcher):
+    def __init__(self, **kwargs: Any) -> None:
+        """Loky Launcher
+
+        Launches parallel jobs using loky's reusable process pool. For details, refer to:
+        https://loky.readthedocs.io/en/stable/API.html#loky.get_reusable_executor
+
+        This plugin is based on the idea and initial implementation of the joblib plugin.
+        """
+        self.config: Optional[DictConfig] = None
+        self.task_function: Optional[TaskFunction] = None
+        self.hydra_context: Optional[HydraContext] = None
+
+        self.loky = kwargs
+
+    def setup(
+        self,
+        *,
+        hydra_context: HydraContext,
+        task_function: TaskFunction,
+        config: DictConfig,
+    ) -> None:
+        self.config = config
+        self.task_function = task_function
+        self.hydra_context = hydra_context
+
+    def launch(
+        self, job_overrides: Sequence[Sequence[str]], initial_job_idx: int
+    ) -> Sequence[JobReturn]:
+        from . import _core
+
+        return _core.launch(
+            launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx
+        )
+
+    def launch_experiment_sequence(
+        self, job_overrides: ExperimentSequence, initial_job_idx: int
+    ) -> Sequence[JobReturn]:
+        from . import _core
+
+        return _core.launch(
+            launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx
+        )
diff --git a/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/py.typed b/plugins/hydra_loky_launcher/hydra_plugins/hydra_loky_launcher/py.typed
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/plugins/hydra_loky_launcher/news/.gitignore b/plugins/hydra_loky_launcher/news/.gitignore
new file mode 100644
index 00000000000..b722e9e13ef
--- /dev/null
+++ b/plugins/hydra_loky_launcher/news/.gitignore
@@ -0,0 +1 @@
+!.gitignore
\ No newline at end of file
diff --git a/plugins/hydra_loky_launcher/pyproject.toml b/plugins/hydra_loky_launcher/pyproject.toml
new file mode 100644
index 00000000000..3cf412b73fb
--- /dev/null
+++ b/plugins/hydra_loky_launcher/pyproject.toml
@@ -0,0 +1,43 @@
+[build-system]
+requires = ["setuptools", "wheel", "read-version"]
+build-backend = "setuptools.build_meta"
+
+
+[tool.towncrier]
+    package = "hydra_plugins.hydra_loky_launcher"
+    filename = "NEWS.md"
+    directory = "news/"
+    title_format = "{version} ({project_date})"
+    template = "../../news/_template.rst"
+    issue_format = "[#{issue}](https://github.com/facebookresearch/hydra/issues/{issue})"
+    start_string = "\n"
+
+    [[tool.towncrier.type]]
+        directory = "feature"
+        name = "Features"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "api_change"
+        name = "API Change (Renames, deprecations and removals)"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "bugfix"
+        name = "Bug Fixes"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "config"
+        name = "Configuration structure changes"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "docs"
+        name = "Improved Documentation"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "maintenance"
+        name = "Maintenance Changes"
+        showcontent = true
diff --git a/plugins/hydra_loky_launcher/setup.py b/plugins/hydra_loky_launcher/setup.py
new file mode 100644
index 00000000000..60af12f7fad
--- /dev/null
+++ b/plugins/hydra_loky_launcher/setup.py
@@ -0,0 +1,46 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates.
All Rights Reserved +# type: ignore +from pathlib import Path + +from read_version import read_version +from setuptools import find_namespace_packages, setup + +setup( + name="hydra-loky-launcher", + version=read_version("hydra_plugins/hydra_loky_launcher", "__init__.py"), + author="Dima Zhylko, Jan Bączek", + author_email="dzhylko@nvidia.com, jbaczek@nvidia.com", + long_description=(Path(__file__).parent / "README.md").read_text(), + long_description_content_type="text/markdown", + url="https://github.com/facebookresearch/hydra/", + packages=find_namespace_packages(include=["hydra_plugins.*"]), + classifiers=[ + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Operating System :: MacOS", + "Operating System :: Microsoft :: Windows", + "Operating System :: POSIX :: Linux", + ], + install_requires=[ + "hydra-core>=1.1.0.dev7", + "loky>=3.2.0", + ], + include_package_data=True, +) diff --git a/plugins/hydra_loky_launcher/tests/__init__.py b/plugins/hydra_loky_launcher/tests/__init__.py new file mode 100644 index 00000000000..630523d76de --- /dev/null +++ b/plugins/hydra_loky_launcher/tests/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved diff --git a/plugins/hydra_loky_launcher/tests/test_loky_launcher.py b/plugins/hydra_loky_launcher/tests/test_loky_launcher.py new file mode 100644 index 00000000000..480943e4fd7 --- /dev/null +++ b/plugins/hydra_loky_launcher/tests/test_loky_launcher.py @@ -0,0 +1,104 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Copyright (c) Facebook, Inc. and its affiliates. 
All Rights Reserved
+from typing import Any
+
+from hydra.core.plugins import Plugins
+from hydra.plugins.launcher import Launcher
+from hydra.test_utils.launcher_common_tests import (
+    IntegrationTestSuite,
+    LauncherTestSuite,
+)
+from hydra.test_utils.test_utils import TSweepRunner, chdir_plugin_root
+from pytest import mark
+
+from hydra_plugins.hydra_loky_launcher.loky_launcher import LokyLauncher
+
+chdir_plugin_root()
+
+
+def test_discovery() -> None:
+    # Tests that this plugin can be discovered via the plugins subsystem when looking for Launchers
+    assert LokyLauncher.__name__ in [
+        x.__name__ for x in Plugins.instance().discover(Launcher)
+    ]
+
+
+@mark.parametrize("launcher_name, overrides", [("loky", [])])
+class TestLokyLauncher(LauncherTestSuite):
+    """
+    Run the Launcher test suite on this launcher.
+    """
+
+    pass
+
+
+@mark.parametrize(
+    "task_launcher_cfg, extra_flags",
+    [
+        # loky with process-based backend (default)
+        (
+            {},
+            [
+                "-m",
+                "hydra/job_logging=hydra_debug",
+                "hydra/job_logging=disabled",
+                "hydra/launcher=loky",
+            ],
+        )
+    ],
+)
+class TestLokyLauncherIntegration(IntegrationTestSuite):
+    """
+    Run this launcher through the integration test suite.
+    """
+
+    pass
+
+
+def test_example_app(hydra_sweep_runner: TSweepRunner, tmpdir: Any) -> None:
+    with hydra_sweep_runner(
+        calling_file="example/my_app.py",
+        calling_module=None,
+        task_function=None,
+        config_path=".",
+        config_name="config",
+        overrides=["task=1,2,3,4", f"hydra.sweep.dir={tmpdir}"],
+    ) as sweep:
+        overrides = {("task=1",), ("task=2",), ("task=3",), ("task=4",)}
+
+        assert sweep.returns is not None and len(sweep.returns[0]) == 4
+        for ret in sweep.returns[0]:
+            assert tuple(ret.overrides) in overrides
+
+
+@mark.parametrize(
+    "overrides",
+    [
+        "hydra.launcher.timeout=10",
+    ],
+)
+def test_example_app_launcher_overrides(
+    hydra_sweep_runner: TSweepRunner, overrides: str
+) -> None:
+    with hydra_sweep_runner(
+        calling_file="example/my_app.py",
+        calling_module=None,
+        task_function=None,
+        config_path=".",
+        config_name="config",
+        overrides=[overrides],
+    ) as sweep:
+        assert sweep.returns is not None and len(sweep.returns[0]) == 1
diff --git a/plugins/hydra_multiprocessing_launcher/MANIFEST.in b/plugins/hydra_multiprocessing_launcher/MANIFEST.in
new file mode 100644
index 00000000000..580709b1349
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/MANIFEST.in
@@ -0,0 +1,3 @@
+global-exclude *.pyc
+global-exclude __pycache__
+recursive-include hydra_plugins/* *.yaml py.typed
diff --git a/plugins/hydra_multiprocessing_launcher/NEWS.md b/plugins/hydra_multiprocessing_launcher/NEWS.md
new file mode 100644
index 00000000000..8b137891791
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/NEWS.md
@@ -0,0 +1 @@
+
diff --git a/plugins/hydra_multiprocessing_launcher/README.md b/plugins/hydra_multiprocessing_launcher/README.md
new file mode 100644
index 00000000000..2cb7a907e58
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/README.md
@@ -0,0 +1,3 @@
+# Hydra Multiprocessing Launcher
+Provides a [multiprocessing.Pool](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool) based Hydra Launcher supporting parallel worker pool execution.
+
diff --git a/plugins/hydra_multiprocessing_launcher/example/config.yaml b/plugins/hydra_multiprocessing_launcher/example/config.yaml
new file mode 100644
index 00000000000..ba590cf449b
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/example/config.yaml
@@ -0,0 +1,9 @@
+defaults:
+  - override hydra/launcher: multiprocessing
+
+task: 1
+
+hydra:
+  launcher:
+    # override the number of worker processes for multiprocessing
+    processes: 10
diff --git a/plugins/hydra_multiprocessing_launcher/example/my_app.py b/plugins/hydra_multiprocessing_launcher/example/my_app.py
new file mode 100644
index 00000000000..b39236a19b6
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/example/my_app.py
@@ -0,0 +1,20 @@
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+import logging
+import os
+import time
+
+import hydra
+from omegaconf import DictConfig
+
+log = logging.getLogger(__name__)
+
+
+@hydra.main(config_name="config")
+def my_app(cfg: DictConfig) -> None:
+    log.info(f"Process ID {os.getpid()} executing task {cfg.task} ...")
+
+    time.sleep(1)
+
+
+if __name__ == "__main__":
+    my_app()
diff --git a/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/__init__.py b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/__init__.py
new file mode 100644
index 00000000000..aaea960e4fd
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/__init__.py
@@ -0,0 +1,17 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+
+__version__ = "1.2.0"
diff --git a/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/_core.py b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/_core.py
new file mode 100644
index 00000000000..c7bac466cb0
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/_core.py
@@ -0,0 +1,183 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+import logging
+from pathlib import Path
+from typing import Any, Dict, Union, List, Sequence
+from enum import Enum
+
+import cloudpickle
+
+from hydra.core.hydra_config import HydraConfig
+from hydra.core.singleton import Singleton
+from hydra.core.utils import (
+    JobReturn,
+    configure_log,
+    filter_overrides,
+    run_job,
+    setup_globals,
+)
+from hydra.plugins.experiment_sequence import ExperimentSequence
+from hydra.types import HydraContext, TaskFunction
+from omegaconf import DictConfig, open_dict
+import multiprocessing as mp
+
+from .multiprocessing_launcher import MultiprocessingLauncher
+
+log = logging.getLogger(__name__)
+
+
+class WaitingStrategy(Enum):
+    FIRST_COMPLETED = 'first_completed'
+    ALL_COMPLETED = 'all_completed'
+
+
+def execute_job(
+    idx: int,
+    overrides: Sequence[str],
+    hydra_context: HydraContext,
+    config: DictConfig,
+    task_function: TaskFunction,
+    singleton_state: Dict[Any, Any],
+) -> JobReturn:
+    """Calls `run_job` in parallel"""
+    setup_globals()
+    Singleton.set_state(singleton_state)
+
+    sweep_config = hydra_context.config_loader.load_sweep_config(
+        config, list(overrides)
+    )
+    with open_dict(sweep_config):
+        sweep_config.hydra.job.id = "{}_{}".format(sweep_config.hydra.job.name, idx)
+        sweep_config.hydra.job.num = idx
+    HydraConfig.instance().set_config(sweep_config)
+
+    ret = run_job(
+        hydra_context=hydra_context,
+        config=sweep_config,
+        task_function=task_function,
+        job_dir_key="hydra.sweep.dir",
+        job_subdir_key="hydra.sweep.subdir",
+    )
+
+    return ret
+
+
+def _proxy_fn_call(*args):
+    # cloudpickle round-trip lets the pool run closures and Hydra task
+    # functions that the default pickler cannot serialize
+    args = [cloudpickle.loads(obj) for obj in args]
+    return cloudpickle.dumps(args[0](*args[1:]))
+
+
+def process_multiprocessing_cfg(mp_cfg: Dict[str, Any]) -> None:
+    for k in ["processes", "maxtasksperchild"]:
+        if k in mp_cfg.keys():
+            try:
+                val = mp_cfg.get(k)
+                if val:
+                    mp_cfg[k] = int(val)
+            except ValueError:
+                pass
+
+
+def wait(async_result_iter, condition, return_when=WaitingStrategy.ALL_COMPLETED):
+    waiting_strategy = all if return_when is WaitingStrategy.ALL_COMPLETED else any
+    with condition:
+        condition.wait_for(lambda: waiting_strategy([res.ready() for res in async_result_iter]))
+    finished = [res for res in async_result_iter if res.ready()]
+    return finished
+
+
+def launch(
+    launcher: MultiprocessingLauncher,
+    job_overrides: Union[Sequence[Sequence[str]], ExperimentSequence],
+    initial_job_idx: int,
+) -> Sequence[JobReturn]:
+    """
+    :param job_overrides: an Iterable of Lists, where each inner list holds the arguments for one job run.
+    :param initial_job_idx: Initial job idx in batch.
+    :return: an array of return values from run_job with indexes corresponding to the input list indexes.
+ """ + setup_globals() + assert launcher.config is not None + assert launcher.task_function is not None + assert launcher.hydra_context is not None + + configure_log(launcher.config.hydra.hydra_logging, launcher.config.hydra.verbose) + sweep_dir = Path(str(launcher.config.hydra.sweep.dir)) + sweep_dir.mkdir(parents=True, exist_ok=True) + + # ProcessPoolExecutor's backend is hard-coded to loky since the threading + # backend is incompatible with Hydra + singleton_state = Singleton.get_state() + batch_size = v if (v := launcher.mp_config['processes']) else mp.cpu_count() + + runs = [None for _ in range(len(job_overrides))] + log.info( + "NestablePool({}) is launching {} jobs".format( + ",".join([f"{k}={v}" for k, v in launcher.mp_config.items()]), + 'generator of' if isinstance(job_overrides, ExperimentSequence) else len(job_overrides), + ) + ) + running_tasks = {} + + def notify_complete(_): + with launcher.condition: + launcher.condition.notify() + + for idx, override in enumerate(job_overrides): + log.info("\t#{} : {}".format(idx, " ".join(filter_overrides(override)))) + running_tasks[launcher.executor.apply_async( + _proxy_fn_call, + [cloudpickle.dumps(obj) + for obj in (execute_job, + initial_job_idx + idx, + override, + launcher.hydra_context, + launcher.config, + launcher.task_function, + singleton_state)], + callback=notify_complete, + error_callback=notify_complete + )] = (override, idx) + + if len(running_tasks) == batch_size: + finished = wait(running_tasks, condition=launcher.condition, return_when=WaitingStrategy.FIRST_COMPLETED) + overrides = [running_tasks[f] for f in finished] + results = [cloudpickle.loads(f.get()) for f in finished] + running_tasks = {task: running_tasks[task] for task in running_tasks if task not in finished} + + for (_, idx), res in zip(overrides, results): + runs[idx] = res + if isinstance(job_overrides, ExperimentSequence): + for (override, _), res in zip(overrides, results): + job_overrides.update_sequence((override, res)) + + finished = wait(running_tasks, condition=launcher.condition, return_when=WaitingStrategy.ALL_COMPLETED) + overrides = [running_tasks[f] for f in finished] + results = [cloudpickle.loads(f.get()) for f in finished] + + for (_, idx), res in zip(overrides, results): + runs[idx] = res + if isinstance(job_overrides, ExperimentSequence): + for (override, _), res in zip(overrides, results): + job_overrides.update_sequence((override, res)) + + #launcher.executor.close() + assert isinstance(runs, List) + for run in runs: + assert isinstance(run, JobReturn) + return runs diff --git a/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/config.py b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/config.py new file mode 100644 index 00000000000..4d1aba98488 --- /dev/null +++ b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/config.py @@ -0,0 +1,38 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+from dataclasses import dataclass
+from typing import Optional
+
+from hydra.core.config_store import ConfigStore
+
+
+@dataclass
+class MultiprocessingLauncherConf:
+    _target_: str = "hydra_plugins.hydra_multiprocessing_launcher.multiprocessing_launcher.MultiprocessingLauncher"
+
+    # maximum number of concurrently running jobs; if None, all CPUs are used
+    processes: Optional[int] = None
+
+    # the number of tasks a worker process can complete before it exits and is replaced with a fresh worker process
+    maxtasksperchild: Optional[int] = None
+
+
+ConfigStore.instance().store(
+    group="hydra/launcher",
+    name="multiprocessing",
+    node=MultiprocessingLauncherConf,
+    provider="multiprocessing_launcher",
+)
diff --git a/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/multiprocessing_launcher.py b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/multiprocessing_launcher.py
new file mode 100644
index 00000000000..92f68a2a82a
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/multiprocessing_launcher.py
@@ -0,0 +1,107 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+import logging
+from typing import Any, Optional, Sequence, Union
+import multiprocessing as mp
+import multiprocessing.pool
+
+from hydra.core.utils import JobReturn
+from hydra.plugins.launcher import Launcher
+from hydra.plugins.experiment_sequence import ExperimentSequence
+from hydra.types import HydraContext, TaskFunction
+from omegaconf import DictConfig
+
+log = logging.getLogger(__name__)
+
+
+class NoDaemonProcess(mp.context.SpawnProcess):
+    @property
+    def daemon(self):
+        return False
+
+    @daemon.setter
+    def daemon(self, value):
+        pass
+
+
+class NoDaemonContext(mp.context.BaseContext):
+    _name = 'spawn'
+    Process = NoDaemonProcess
+
+
+# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
+# because the latter is only a wrapper function, not a proper class.
+class NestablePool(mp.pool.Pool):
+    def __init__(self, *args, **kwargs):
+        kwargs['context'] = NoDaemonContext()
+        super(NestablePool, self).__init__(*args, **kwargs)
+
+
+class MultiprocessingLauncher(Launcher):
+    def __init__(self, **kwargs: Any) -> None:
+        """Multiprocessing Launcher
+
+        Launches parallel jobs using a modified multiprocessing process pool. For details, refer to:
+        https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool
+        and https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
+
+        The custom NestablePool allows spawned jobs to create their own worker
+        processes and threads (e.g. multi-worker DataLoaders).
+        WARNING: NestablePool uses non-daemonic processes, so resource management
+        is left to the user. We recommend setting `maxtasksperchild=1`.
+
+        This plugin is based on the idea and initial implementation of the joblib launcher.
+        """
+        self.config: Optional[DictConfig] = None
+        self.task_function: Optional[TaskFunction] = None
+        self.hydra_context: Optional[HydraContext] = None
+        self.executor = None
+        self.mp_config = kwargs
+
+    def setup(
+        self,
+        *,
+        hydra_context: HydraContext,
+        task_function: TaskFunction,
+        config: DictConfig,
+    ) -> None:
+        self.config = config
+        self.task_function = task_function
+        self.hydra_context = hydra_context
+        self.condition = mp.Condition()
+        self.executor = NestablePool(**self.mp_config)
+
+    def launch(
+        self, job_overrides: Sequence[Sequence[str]], initial_job_idx: int
+    ) -> Sequence[JobReturn]:
+        from . import _core
+
+        return _core.launch(
+            launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx
+        )
+
+    def launch_experiment_sequence(
+        self, job_overrides: ExperimentSequence, initial_job_idx: int
+    ) -> Sequence[JobReturn]:
+        from . import _core
+
+        return _core.launch(
+            launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx
+        )
+
+    def __del__(self):
+        if self.executor:
+            self.executor.close()
+            del self.executor
\ No newline at end of file
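Reviewer note: the restriction this pool works around is easy to reproduce. A minimal, self-contained demo (illustrative, not part of the PR) of why standard daemonic pool workers cannot spawn children:

```python
import multiprocessing as mp


def task(_):
    # In a standard Pool the worker is daemonic, so starting a child raises
    # "AssertionError: daemonic processes are not allowed to have children".
    child = mp.get_context("spawn").Process(target=print, args=("hello",))
    child.start()
    child.join()


if __name__ == "__main__":
    with mp.Pool(1) as pool:
        pool.map(task, [0])  # fails; NestablePool exists to make this legal
```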
diff --git a/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/py.typed b/plugins/hydra_multiprocessing_launcher/hydra_plugins/hydra_multiprocessing_launcher/py.typed
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/plugins/hydra_multiprocessing_launcher/news/.gitignore b/plugins/hydra_multiprocessing_launcher/news/.gitignore
new file mode 100644
index 00000000000..b722e9e13ef
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/news/.gitignore
@@ -0,0 +1 @@
+!.gitignore
\ No newline at end of file
diff --git a/plugins/hydra_multiprocessing_launcher/pyproject.toml b/plugins/hydra_multiprocessing_launcher/pyproject.toml
new file mode 100644
index 00000000000..3c4965e8b4c
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/pyproject.toml
@@ -0,0 +1,43 @@
+[build-system]
+requires = ["setuptools", "wheel", "read-version"]
+build-backend = "setuptools.build_meta"
+
+
+[tool.towncrier]
+    package = "hydra_plugins.hydra_multiprocessing_launcher"
+    filename = "NEWS.md"
+    directory = "news/"
+    title_format = "{version} ({project_date})"
+    template = "../../news/_template.rst"
+    issue_format = "[#{issue}](https://github.com/facebookresearch/hydra/issues/{issue})"
+    start_string = "\n"
+
+    [[tool.towncrier.type]]
+        directory = "feature"
+        name = "Features"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "api_change"
+        name = "API Change (Renames, deprecations and removals)"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "bugfix"
+        name = "Bug Fixes"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "config"
+        name = "Configuration structure changes"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "docs"
+        name = "Improved Documentation"
+        showcontent = true
+
+    [[tool.towncrier.type]]
+        directory = "maintenance"
+        name = "Maintenance Changes"
+        showcontent = true
diff --git a/plugins/hydra_multiprocessing_launcher/setup.py b/plugins/hydra_multiprocessing_launcher/setup.py
new file mode 100644
index 00000000000..48be54191c8
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/setup.py
@@ -0,0 +1,47 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
+# type: ignore
+from pathlib import Path
+
+from read_version import read_version
+from setuptools import find_namespace_packages, setup
+
+setup(
+    name="hydra-multiprocessing-launcher",
+    version=read_version("hydra_plugins/hydra_multiprocessing_launcher", "__init__.py"),
+    author="Dima Zhylko, Jan Bączek",
+    author_email="dzhylko@nvidia.com, jbaczek@nvidia.com",
+    description="Multiprocessing Launcher for Hydra apps",
+    long_description=(Path(__file__).parent / "README.md").read_text(),
+    long_description_content_type="text/markdown",
+    url="https://github.com/facebookresearch/hydra/",
+    packages=find_namespace_packages(include=["hydra_plugins.*"]),
+    classifiers=[
+        "License :: OSI Approved :: MIT License",
+        "Programming Language :: Python :: 3.7",
+        "Programming Language :: Python :: 3.8",
+        "Programming Language :: Python :: 3.9",
+        "Programming Language :: Python :: 3.10",
+        "Operating System :: MacOS",
+        "Operating System :: Microsoft :: Windows",
+        "Operating System :: POSIX :: Linux",
+    ],
+    install_requires=[
+        "hydra-core>=1.1.0.dev7",
+        "cloudpickle",
+    ],
+    include_package_data=True,
+)
diff --git a/plugins/hydra_multiprocessing_launcher/tests/__init__.py b/plugins/hydra_multiprocessing_launcher/tests/__init__.py
new file mode 100644
index 00000000000..630523d76de
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/tests/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
diff --git a/plugins/hydra_multiprocessing_launcher/tests/test_multiprocessing_launcher.py b/plugins/hydra_multiprocessing_launcher/tests/test_multiprocessing_launcher.py
new file mode 100644
index 00000000000..d4cb2c14265
--- /dev/null
+++ b/plugins/hydra_multiprocessing_launcher/tests/test_multiprocessing_launcher.py
@@ -0,0 +1,105 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved +from typing import Any + +from hydra.core.plugins import Plugins +from hydra.plugins.launcher import Launcher +from hydra.test_utils.launcher_common_tests import ( + IntegrationTestSuite, + LauncherTestSuite, +) +from hydra.test_utils.test_utils import TSweepRunner, chdir_plugin_root +from pytest import mark + +from hydra_plugins.hydra_multiprocessing_launcher.multiprocessing_launcher import MultiprocessingLauncher + +chdir_plugin_root() + + +def test_discovery() -> None: + # Tests that this plugin can be discovered via the plugins subsystem when looking for Launchers + assert MultiprocessingLauncher.__name__ in [ + x.__name__ for x in Plugins.instance().discover(Launcher) + ] + + +@mark.parametrize("launcher_name, overrides", [("multiprocessing", [])]) +class TestMultiprocessingLauncher(LauncherTestSuite): + """ + Run the Launcher test suite on this launcher. + """ + + pass + + +@mark.parametrize( + "task_launcher_cfg, extra_flags", + [ + # multiprocessing with process-based backend (default) + ( + {}, + [ + "-m", + "hydra/job_logging=hydra_debug", + "hydra/job_logging=disabled", + "hydra/launcher=multiprocessing", + ], + ) + ], +) +class TestMultiprocessingLauncherIntegration(IntegrationTestSuite): + """ + Run this launcher through the integration test suite. + """ + + pass + + +def test_example_app(hydra_sweep_runner: TSweepRunner, tmpdir: Any) -> None: + with hydra_sweep_runner( + calling_file="example/my_app.py", + calling_module=None, + task_function=None, + config_path=".", + config_name="config", + overrides=["task=1,2,3,4", f"hydra.sweep.dir={tmpdir}"], + ) as sweep: + overrides = {("task=1",), ("task=2",), ("task=3",), ("task=4",)} + + assert sweep.returns is not None and len(sweep.returns[0]) == 4 + for ret in sweep.returns[0]: + assert tuple(ret.overrides) in overrides + + +@mark.parametrize( + "overrides", + [ + "hydra.launcher.processes=1", + "hydra.launcher.maxtasksperchild=1" + ], +) +def test_example_app_launcher_overrides( + hydra_sweep_runner: TSweepRunner, overrides: str +) -> None: + with hydra_sweep_runner( + calling_file="example/my_app.py", + calling_module=None, + task_function=None, + config_path=".", + config_name="config", + overrides=[overrides], + ) as sweep: + assert sweep.returns is not None and len(sweep.returns[0]) == 1 diff --git a/plugins/hydra_optuna_sweeper/example/experiment-sequence-conf/config.yaml b/plugins/hydra_optuna_sweeper/example/experiment-sequence-conf/config.yaml new file mode 100644 index 00000000000..d32399e2072 --- /dev/null +++ b/plugins/hydra_optuna_sweeper/example/experiment-sequence-conf/config.yaml @@ -0,0 +1,23 @@ +defaults: + - override hydra/sweeper: optuna_v2 + - override hydra/sweeper/sampler: tpe + +hydra: + sweeper: + sampler: + seed: 123 + direction: minimize + study_name: sphere + storage: null + n_trials: 20 + max_failure_rate: 0.0 + params: + x: range(-5.5, 5.5, step=0.5) + y: choice(-5 ,0 ,5) + +x: 1 +y: 1 +z: 1 + +# if true, simulate a failure by raising an exception +error: false diff --git 
a/plugins/hydra_optuna_sweeper/example/sphere_sequence.py b/plugins/hydra_optuna_sweeper/example/sphere_sequence.py new file mode 100644 index 00000000000..2db8b5abd93 --- /dev/null +++ b/plugins/hydra_optuna_sweeper/example/sphere_sequence.py @@ -0,0 +1,32 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved +import hydra +from omegaconf import DictConfig + + +@hydra.main(version_base=None, config_path="experiment-sequence-conf", config_name="config") +def sphere(cfg: DictConfig) -> float: + x: float = cfg.x + y: float = cfg.y + + if cfg.get("error", False): + raise RuntimeError("cfg.error is True") + + return x**2 + y**2 + + +if __name__ == "__main__": + sphere() \ No newline at end of file diff --git a/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/_impl.py b/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/_impl.py index d17bbb1f70a..95963f71b32 100644 --- a/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/_impl.py +++ b/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/_impl.py @@ -1,3 +1,17 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + # Copyright (c) Facebook, Inc. and its affiliates. 
diff --git a/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/_impl.py b/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/_impl.py
index d17bbb1f70a..95963f71b32 100644
--- a/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/_impl.py
+++ b/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/_impl.py
@@ -1,3 +1,17 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
 import functools
 import logging
@@ -27,9 +41,10 @@
     Transformer,
 )
 from hydra.core.plugins import Plugins
-from hydra.plugins.sweeper import Sweeper
+from hydra.plugins.sweeper import Sweeper
+from hydra.plugins.experiment_sequence import ExperimentSequence
 from hydra.types import HydraContext, TaskFunction
-from hydra.utils import get_method
+from hydra.utils import get_method, instantiate
 from omegaconf import DictConfig, OmegaConf
 from optuna.distributions import (
     BaseDistribution,
@@ -146,6 +161,124 @@
     return search_space_distributions, fixed_params
 
 
+class OptunaExperimentSequence(ExperimentSequence):
+    def __init__(
+        self,
+        study,
+        num_experiments,
+        search_space_distributions,
+        fixed_params,
+        directions,
+        custom_search_space_extender,
+        max_failure_rate=0.0,
+        is_grid_sampler=False,
+        config=None,
+    ) -> None:
+        self.study = study
+        self.num_experiments = num_experiments
+        self.search_space_distributions = search_space_distributions
+        self.fixed_params = fixed_params
+        self.directions = directions
+        self.custom_search_space_extender = custom_search_space_extender
+        self.max_failure_rate = max_failure_rate
+        self.fault_tolerance = int(num_experiments * max_failure_rate)
+        self.is_grid_sampler = is_grid_sampler
+        self.config = config
+        self.idx = -1
+        self.override_trial_mapping = {}
+
+    def _configure_trial(
+        self,
+        trial: Trial,
+        search_space_distributions: Dict[str, BaseDistribution],
+        fixed_params: Dict[str, Any],
+    ) -> Sequence[str]:
+        for param_name, distribution in search_space_distributions.items():
+            assert type(param_name) is str
+            trial._suggest(param_name, distribution)
+        for param_name, value in fixed_params.items():
+            trial.set_user_attr(param_name, value)
+
+        if self.custom_search_space_extender:
+            assert self.config is not None
+            self.custom_search_space_extender(self.config, trial)
+
+        overlap = trial.params.keys() & trial.user_attrs
+        if len(overlap):
+            raise ValueError(
+                "Overlapping fixed parameters and search space parameters found!"
+                f" Overlapping parameters: {list(overlap)}"
+            )
+        params = dict(trial.params)
+        params.update(fixed_params)
+
+        return tuple(f"{name}={val}" for name, val in params.items())
+    def update_sequence(self, experiment_result: Tuple[Sequence[str], Any]):
+        """Update the experiment generator (study) with an experiment result."""
+        override, ret = experiment_result
+        trial = self.override_trial_mapping[override]
+        values: Optional[List[float]] = None
+        state: optuna.trial.TrialState = optuna.trial.TrialState.COMPLETE
+        try:
+            if len(self.directions) == 1:
+                try:
+                    values = [float(ret.return_value)]
+                except (ValueError, TypeError):
+                    raise ValueError(
+                        f"Return value must be float-castable. Got '{ret.return_value}'."
+                    ).with_traceback(sys.exc_info()[2])
+            else:
+                try:
+                    values = [float(v) for v in ret.return_value]
+                except (ValueError, TypeError):
+                    raise ValueError(
+                        "Return value must be a list or tuple of float-castable values."
+                        f" Got '{ret.return_value}'."
+                    ).with_traceback(sys.exc_info()[2])
+            if len(values) != len(self.directions):
+                raise ValueError(
+                    "The number of values and the number of objectives are"
+                    f" mismatched. Expected {len(self.directions)}, but got {len(values)}."
+                )
+
+            try:
+                self.study.tell(trial=trial, state=state, values=values)
+            except RuntimeError as e:
+                if (
+                    self.is_grid_sampler
+                    and "`Study.stop` is supposed to be invoked inside an objective function or a callback."
+                    in str(e)
+                ):
+                    pass
+                else:
+                    raise e
+
+        except Exception as e:
+            state = optuna.trial.TrialState.FAIL
+            self.study.tell(trial=trial, state=state, values=values)
+            log.warning(f"Failed experiment: {e}")
+            self.fault_tolerance -= 1
+
+        # raise if too many failures
+        if self.fault_tolerance < 0:
+            log.error(
+                f"Failed more than the allowed {int(self.num_experiments * self.max_failure_rate)} time(s) "
+                f"out of {self.num_experiments} total experiments with max_failure_rate={self.max_failure_rate}."
+            )
+            ret.return_value  # delegate raising to JobReturn, with actual traceback
+
+    def __next__(self) -> Sequence[str]:
+        self.idx += 1
+        if self.idx < self.num_experiments:
+            trial = self.study.ask()
+            override = self._configure_trial(
+                trial, self.search_space_distributions, self.fixed_params
+            )
+            self.override_trial_mapping[override] = trial
+            return override
+        else:
+            raise StopIteration
+
+    def __len__(self):
+        return self.num_experiments
+
+
 class OptunaSweeperImpl(Sweeper):
     def __init__(
         self,
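Stripped of the Hydra plumbing, `OptunaExperimentSequence` is a thin wrapper around Optuna's ask/tell interface: `__next__` wraps `study.ask()` plus parameter suggestion, and `update_sequence` wraps `study.tell(...)`. A standalone sketch of that underlying protocol (plain Optuna, no Hydra):

    import optuna

    study = optuna.create_study(direction="minimize")
    for _ in range(3):
        trial = study.ask()                      # mirrors __next__
        x = trial.suggest_float("x", -5.5, 5.5)  # mirrors _configure_trial
        result = x ** 2                          # the launcher runs the real job here
        study.tell(trial, result)                # mirrors update_sequence
    print(study.best_value)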
@@ -154,11 +287,12 @@ def __init__(
         storage: Optional[Any],
         study_name: Optional[str],
         n_trials: int,
-        n_jobs: int,
+        n_jobs: Optional[int],
         max_failure_rate: float,
         search_space: Optional[DictConfig],
         custom_search_space: Optional[str],
         params: Optional[DictConfig],
+        experiment_sequence: Optional[str] = None,
     ) -> None:
         self.sampler = sampler
         self.direction = direction
@@ -178,6 +312,7 @@ def __init__(
         self.params = params
         self.job_idx: int = 0
         self.search_space_distributions: Optional[Dict[str, BaseDistribution]] = None
+        self.experiment_sequence_inst = experiment_sequence
 
     def _process_searchspace_config(self) -> None:
         url = "https://hydra.cc/docs/upgrades/1.1_to_1.2/changes_to_sweeper_config/"
@@ -226,7 +361,7 @@ def _get_directions(self) -> List[str]:
         elif isinstance(self.direction, str):
             return [self.direction]
         return [self.direction.name]
-
+
     def _configure_trials(
         self,
         trials: List[Trial],
@@ -290,7 +425,7 @@ def sweep(self, arguments: List[str]) -> None:
 
         is_grid_sampler = (
             isinstance(self.sampler, functools.partial)
-            and self.sampler.func == optuna.samplers.GridSampler
+            and self.sampler.func == optuna.samplers.GridSampler  # type: ignore
        )
 
         (
@@ -340,71 +475,92 @@ def sweep(self, arguments: List[str]) -> None:
 
         batch_size = self.n_jobs
         n_trials_to_go = self.n_trials
 
-        while n_trials_to_go > 0:
-            batch_size = min(n_trials_to_go, batch_size)
-
-            trials = [study.ask() for _ in range(batch_size)]
-            overrides = self._configure_trials(
-                trials, search_space_distributions, fixed_params
-            )
-
-            returns = self.launcher.launch(overrides, initial_job_idx=self.job_idx)
-            self.job_idx += len(returns)
-            failures = []
-            for trial, ret in zip(trials, returns):
-                values: Optional[List[float]] = None
-                state: optuna.trial.TrialState = optuna.trial.TrialState.COMPLETE
-                try:
-                    if len(directions) == 1:
-                        try:
-                            values = [float(ret.return_value)]
-                        except (ValueError, TypeError):
-                            raise ValueError(
-                                f"Return value must be float-castable. Got '{ret.return_value}'."
-                            ).with_traceback(sys.exc_info()[2])
-                    else:
-                        try:
-                            values = [float(v) for v in ret.return_value]
-                        except (ValueError, TypeError):
-                            raise ValueError(
-                                "Return value must be a list or tuple of float-castable values."
-                                f" Got '{ret.return_value}'."
-                            ).with_traceback(sys.exc_info()[2])
-                    if len(values) != len(directions):
-                        raise ValueError(
-                            "The number of the values and the number of the objectives are"
-                            f" mismatched. Expect {len(directions)}, but actually {len(values)}."
-                        )
-
-                    try:
-                        study.tell(trial=trial, state=state, values=values)
-                    except RuntimeError as e:
-                        if (
-                            is_grid_sampler
-                            and "`Study.stop` is supposed to be invoked inside an objective function or a callback."
-                            in str(e)
-                        ):
-                            pass
-                        else:
-                            raise e
-
-                except Exception as e:
-                    state = optuna.trial.TrialState.FAIL
-                    study.tell(trial=trial, state=state, values=values)
-                    log.warning(f"Failed experiment: {e}")
-                    failures.append(e)
-
-            # raise if too many failures
-            if len(failures) / len(returns) > self.max_failure_rate:
-                log.error(
-                    f"Failed {failures} times out of {len(returns)} "
-                    f"with max_failure_rate={self.max_failure_rate}."
-                )
-                assert len(failures) > 0
-                for ret in returns:
-                    ret.return_value  # delegate raising to JobReturn, with actual traceback
-
-            n_trials_to_go -= batch_size
+        if self.experiment_sequence_inst is not None:
+            if batch_size is not None:
+                warnings.warn(
+                    "Parameter hydra.sweeper.n_jobs is unused for optuna_v2."
+                    "\nJob scheduling is delegated to the launcher; use the"
+                    " launcher's own concurrency setting (e.g. hydra.launcher.n_jobs)"
+                    " instead."
+                )
+
+            experiment_sequence = instantiate(
+                {
+                    "_target_": self.experiment_sequence_inst,
+                    "study": study,
+                    "num_experiments": n_trials_to_go,
+                    "search_space_distributions": search_space_distributions,
+                    "fixed_params": fixed_params,
+                    "directions": directions,
+                    "custom_search_space_extender": self.custom_search_space_extender,
+                    "max_failure_rate": self.max_failure_rate,
+                    "is_grid_sampler": is_grid_sampler,
+                    # pass the primary config through as-is; _configure_trial asserts
+                    # it is present whenever a custom_search_space extender is set
+                    "config": self.config,
+                    # do not recurse into the values above (notably self.config)
+                    "_recursive_": False,
+                }
+            )
+            self.launcher.launch_experiment_sequence(
+                experiment_sequence, initial_job_idx=self.job_idx
+            )
+        else:
+            while n_trials_to_go > 0:
+                batch_size = min(n_trials_to_go, batch_size)
+
+                trials = [study.ask() for _ in range(batch_size)]
+                overrides = self._configure_trials(
+                    trials, search_space_distributions, fixed_params
+                )
+                returns = self.launcher.launch(overrides, initial_job_idx=self.job_idx)
+                self.job_idx += len(returns)
+                failures = []
+                for trial, ret in zip(trials, returns):
+                    values: Optional[List[float]] = None
+                    state: optuna.trial.TrialState = optuna.trial.TrialState.COMPLETE
+                    try:
+                        if len(directions) == 1:
+                            try:
+                                values = [float(ret.return_value)]
+                            except (ValueError, TypeError):
+                                raise ValueError(
+                                    f"Return value must be float-castable. Got '{ret.return_value}'."
+                                ).with_traceback(sys.exc_info()[2])
+                        else:
+                            try:
+                                values = [float(v) for v in ret.return_value]
+                            except (ValueError, TypeError):
+                                raise ValueError(
+                                    "Return value must be a list or tuple of float-castable values."
+                                    f" Got '{ret.return_value}'."
+                                ).with_traceback(sys.exc_info()[2])
+                        if len(values) != len(directions):
+                            raise ValueError(
+                                "The number of the values and the number of the objectives are"
+                                f" mismatched. Expect {len(directions)}, but actually {len(values)}."
+                            )
+
+                        try:
+                            study.tell(trial=trial, state=state, values=values)
+                        except RuntimeError as e:
+                            if (
+                                is_grid_sampler
+                                and "`Study.stop` is supposed to be invoked inside an objective function or a callback."
+                                in str(e)
+                            ):
+                                pass
+                            else:
+                                raise e
+
+                    except Exception as e:
+                        state = optuna.trial.TrialState.FAIL
+                        study.tell(trial=trial, state=state, values=values)
+                        log.warning(f"Failed experiment: {e}")
+                        failures.append(e)
+
+                # raise if too many failures
+                if len(failures) / len(returns) > self.max_failure_rate:
+                    log.error(
+                        f"Failed {failures} times out of {len(returns)} "
+                        f"with max_failure_rate={self.max_failure_rate}."
+                    )
+                    assert len(failures) > 0
+                    for ret in returns:
+                        ret.return_value  # delegate raising to JobReturn, with actual traceback
+
+                n_trials_to_go -= batch_size
 
         results_to_serialize: Dict[str, Any]
         if len(directions) < 2:
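For context on the `instantiate({...})` call in the sweep above: `hydra.utils.instantiate` imports the dotted `_target_` and calls it with the remaining keys as keyword arguments. A simplified standalone equivalent (a sketch that ignores Hydra's `_recursive_`/`_convert_` handling and error reporting; `naive_instantiate` is a hypothetical name) behaves roughly like:

    import importlib
    from typing import Any, Dict


    def naive_instantiate(cfg: Dict[str, Any]) -> Any:
        """Rough sketch of hydra.utils.instantiate for flat dict configs."""
        module_path, _, attr_name = cfg["_target_"].rpartition(".")
        target = getattr(importlib.import_module(module_path), attr_name)
        # everything except reserved "_..."-keys becomes a keyword argument
        kwargs = {k: v for k, v in cfg.items() if not k.startswith("_")}
        return target(**kwargs)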
diff --git a/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/config.py b/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/config.py
index 03fde5a12d0..9f812af23df 100644
--- a/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/config.py
+++ b/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/config.py
@@ -143,6 +143,58 @@ class DistributionConfig:
 defaults = [{"sampler": "tpe"}]
 
 
+@dataclass
+class OptunaSweeperConfV2:
+    _target_: str = "hydra_plugins.hydra_optuna_sweeper.optuna_sweeper.OptunaSweeper"
+    defaults: List[Any] = field(default_factory=lambda: defaults)
+
+    # Sampling algorithm
+    # Please refer to the reference for further details
+    # https://optuna.readthedocs.io/en/stable/reference/samplers.html
+    sampler: SamplerConfig = MISSING
+
+    # Direction of optimization
+    # Union[Direction, List[Direction]]
+    direction: Any = Direction.minimize
+
+    # Storage URL to persist optimization results
+    # For example, you can use SQLite if you set 'sqlite:///example.db'
+    # Please refer to the reference for further details
+    # https://optuna.readthedocs.io/en/stable/reference/storages.html
+    storage: Optional[Any] = None
+
+    # Name of study to persist optimization results
+    study_name: Optional[str] = None
+
+    # Total number of function evaluations
+    n_trials: int = 20
+
+    # Number of parallel workers; unused in optuna_v2 because scheduling is
+    # delegated to the launcher
+    n_jobs: Optional[int] = None
+
+    # Maximum authorized failure rate for a batch of parameters
+    max_failure_rate: float = 0.0
+
+    search_space: Optional[Dict[str, Any]] = None
+
+    params: Optional[Dict[str, str]] = None
+
+    # Allow custom trial configuration via Python methods.
+    # If given, `custom_search_space` should be an instantiate-style dotpath targeting
+    # a callable with signature Callable[[DictConfig, optuna.trial.Trial], None].
+    # https://optuna.readthedocs.io/en/stable/tutorial/10_key_features/002_configurations.html
+    custom_search_space: Optional[str] = None
+
+    experiment_sequence: str = "hydra_plugins.hydra_optuna_sweeper._impl.OptunaExperimentSequence"
+
+
+ConfigStore.instance().store(
+    group="hydra/sweeper",
+    name="optuna_v2",
+    node=OptunaSweeperConfV2,
+    provider="optuna_sweeper",
+)
+
+
 @dataclass
 class OptunaSweeperConf:
     _target_: str = "hydra_plugins.hydra_optuna_sweeper.optuna_sweeper.OptunaSweeper"
diff --git a/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/optuna_sweeper.py b/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/optuna_sweeper.py
index c53d5d7b5d0..41506705108 100644
--- a/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/optuna_sweeper.py
+++ b/plugins/hydra_optuna_sweeper/hydra_plugins/hydra_optuna_sweeper/optuna_sweeper.py
@@ -1,3 +1,17 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
 from typing import Any, List, Optional
 
@@ -18,11 +32,12 @@ def __init__(
         storage: Optional[Any],
         study_name: Optional[str],
         n_trials: int,
-        n_jobs: int,
+        n_jobs: Optional[int],
         max_failure_rate: float,
         search_space: Optional[DictConfig],
         custom_search_space: Optional[str],
         params: Optional[DictConfig],
+        experiment_sequence: Optional[str] = None,
     ) -> None:
         from ._impl import OptunaSweeperImpl
 
@@ -37,6 +52,7 @@ def __init__(
             search_space,
             custom_search_space,
             params,
+            experiment_sequence,
         )
 
     def setup(
diff --git a/plugins/hydra_optuna_sweeper/setup.py b/plugins/hydra_optuna_sweeper/setup.py
index 23a5a5435c5..c79019b11fd 100644
--- a/plugins/hydra_optuna_sweeper/setup.py
+++ b/plugins/hydra_optuna_sweeper/setup.py
@@ -29,6 +29,7 @@
     install_requires=[
         "hydra-core>=1.1.0.dev7",
         "optuna>=2.10.0,<3.0.0",
+        "sqlalchemy==1.3.24",
     ],
     include_package_data=True,
 )
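The `sqlalchemy==1.3.24` pin keeps optuna 2.x's RDB storage layer on the SQLAlchemy 1.3 API (presumably the motivation here; the pin predates the SQLAlchemy 1.4/2.0 API changes). The tests below exercise that storage path end to end; a minimal standalone version of the same setup, for reference:

    import optuna

    storage = "sqlite:///test.db"
    study_name = "test-optuna-v2-example"
    study = optuna.create_study(
        study_name=study_name, storage=storage, direction="minimize"
    )
    # later (e.g. from the test process), the persisted study can be reloaded
    loaded = optuna.load_study(study_name=study_name, storage=storage)
    assert loaded.study_name == study.study_name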
diff --git a/plugins/hydra_optuna_sweeper/tests/test_optuna_sweeper_plugin.py b/plugins/hydra_optuna_sweeper/tests/test_optuna_sweeper_plugin.py
index f042937a8fd..f15bec1055c 100644
--- a/plugins/hydra_optuna_sweeper/tests/test_optuna_sweeper_plugin.py
+++ b/plugins/hydra_optuna_sweeper/tests/test_optuna_sweeper_plugin.py
@@ -1,3 +1,17 @@
+# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
 import os
 import sys
@@ -193,6 +207,48 @@ def test_optuna_example(with_commandline: bool, tmpdir: Path) -> None:
     assert returns["best_value"] <= 2.27
 
 
+@mark.parametrize("with_commandline", (True, False))
+def test_optuna_v2_example(with_commandline: bool, tmpdir: Path) -> None:
+    storage = "sqlite:///" + os.path.join(str(tmpdir), "test.db")
+    study_name = "test-optuna-v2-example"
+    cmd = [
+        "example/sphere_sequence.py",
+        "--multirun",
+        "hydra/sweeper=optuna_v2",
+        "hydra.sweep.dir=" + str(tmpdir),
+        "hydra.job.chdir=True",
+        "hydra.sweeper.n_trials=20",
+        f"hydra.sweeper.storage={storage}",
+        f"hydra.sweeper.study_name={study_name}",
+        "hydra/sweeper/sampler=tpe",
+        "hydra.sweeper.sampler.seed=123",
+        "~z",
+    ]
+    if with_commandline:
+        cmd += [
+            "x=choice(0, 1, 2)",
+            "y=0",  # Fixed parameter
+        ]
+    run_python_script(cmd)
+    returns = OmegaConf.load(f"{tmpdir}/optimization_results.yaml")
+    study = optuna.load_study(storage=storage, study_name=study_name)
+    best_trial = study.best_trial
+    assert isinstance(returns, DictConfig)
+    assert returns.name == "optuna"
+    assert returns["best_params"]["x"] == best_trial.params["x"]
+    if with_commandline:
+        assert "y" not in returns["best_params"]
+        assert "y" not in best_trial.params
+    else:
+        assert returns["best_params"]["y"] == best_trial.params["y"]
+    assert returns["best_value"] == best_trial.value
+    # Check the search performance of the TPE sampler.
+    # The threshold is the 95th percentile calculated with 1000 different seed values
+    # to make the test robust against the detailed implementation of the sampler.
+    # See https://github.com/facebookresearch/hydra/pull/1746#discussion_r681549830.
+    assert returns["best_value"] <= 2.27
+
+
 @mark.parametrize("num_trials", (10, 1))
 def test_example_with_grid_sampler(
     tmpdir: Path,