Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Working prototype of experiment sequence #2461

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion hydra/_internal/core_plugins/basic_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Sequence
from typing import List, Optional, Sequence, Union
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
from typing import List, Optional, Sequence, Union
from typing import List, Optional, Sequence


from omegaconf import DictConfig, open_dict

Expand All @@ -14,6 +14,7 @@
run_job,
setup_globals,
)
from hydra.plugins.experiment_sequence import ExperimentSequence
from hydra.plugins.launcher import Launcher
from hydra.types import HydraContext, TaskFunction

Expand Down Expand Up @@ -65,6 +66,7 @@ def launch(
idx = initial_job_idx + idx
lst = " ".join(filter_overrides(overrides))
log.info(f"\t#{idx} : {lst}")

sweep_config = self.hydra_context.config_loader.load_sweep_config(
self.config, list(overrides)
)
Expand All @@ -81,3 +83,40 @@ def launch(
runs.append(ret)
configure_log(self.config.hydra.hydra_logging, self.config.hydra.verbose)
return runs

def launch_experiment_sequence(
self, job_overrides: ExperimentSequence, initial_job_idx: int
) -> Sequence[JobReturn]:
setup_globals()
assert self.hydra_context is not None
assert self.config is not None
assert self.task_function is not None

configure_log(self.config.hydra.hydra_logging, self.config.hydra.verbose)
sweep_dir = self.config.hydra.sweep.dir
Path(str(sweep_dir)).mkdir(parents=True, exist_ok=True)
log.info(f"Launching {len(job_overrides)} jobs locally")
runs: List[JobReturn] = []
for idx, overrides in enumerate(job_overrides):
idx = initial_job_idx + idx
lst = " ".join(filter_overrides(overrides))
log.info(f"\t#{idx} : {lst}")

sweep_config = self.hydra_context.config_loader.load_sweep_config(
self.config, list(overrides)
)
with open_dict(sweep_config):
sweep_config.hydra.job.id = idx
sweep_config.hydra.job.num = idx
ret = run_job(
hydra_context=self.hydra_context,
task_function=self.task_function,
config=sweep_config,
job_dir_key="hydra.sweep.dir",
job_subdir_key="hydra.sweep.subdir",
)
runs.append(ret)
if isinstance(job_overrides, ExperimentSequence):
job_overrides.update_sequence((overrides, ret))
configure_log(self.config.hydra.hydra_logging, self.config.hydra.verbose)
return runs
38 changes: 38 additions & 0 deletions hydra/plugins/experiment_sequence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from abc import abstractmethod
import typing

from collections.abc import Iterator
from typing import Any, Sequence, Tuple


class ExperimentSequence(Iterator):
@abstractmethod
def __next__(self):
"""Return tuple of experiment id, optional trial object and experiment overrides."""
raise NotImplementedError()

def __iter__(self) -> typing.Iterator[Sequence[str]]:
return self

@abstractmethod
def update_sequence(self, experiment_result: Tuple[Sequence[str], Any]):
"""Update experiment generator(study) with experiment results"""
raise NotImplementedError()

def __len__(self):
"""Return maximum number of experiments sequence can produce"""
raise NotImplementedError()
27 changes: 26 additions & 1 deletion hydra/plugins/launcher.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
"""
Launcher plugin interface
Expand All @@ -8,7 +22,7 @@
from omegaconf import DictConfig

from hydra.core.utils import JobReturn

from hydra.plugins.experiment_sequence import ExperimentSequence
from hydra.types import TaskFunction, HydraContext

from .plugin import Plugin
Expand Down Expand Up @@ -37,3 +51,14 @@ def launch(
:param initial_job_idx: Initial job idx. used by sweepers that executes several batches
"""
raise NotImplementedError()

def launch_experiment_sequence(
self, job_overrides: ExperimentSequence, initial_job_idx: int
) -> Sequence[JobReturn]:
"""
:param job_overrides: a batch of job arguments
:param initial_job_idx: Initial job idx. used by sweepers that executes several batches
"""
raise NotImplementedError(
"This launcher doesn't support launching experiment sequence."
)
14 changes: 14 additions & 0 deletions hydra/plugins/sweeper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
Comment on lines +1 to 15
Copy link
Collaborator

@Jasha10 Jasha10 Dec 18, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

No need to add the license to sweeper.py since sweeper.py is not otherwise modified.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let me check, maybe I have added some changes and haven't committed them.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I originally had ExperimentSequence in sweeper file and forgot to remove license, sorry. I think this is otherwise good to go (except we can refactor multiprocessing launcher, but it would take too much time, so I think it will be next PR)

"""
Sweeper plugin interface
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
from pathlib import Path
from typing import Any, Dict, List, Sequence
from typing import Any, Dict, Union, List, Sequence

from hydra.core.hydra_config import HydraConfig
from hydra.core.singleton import Singleton
Expand All @@ -12,9 +12,11 @@
run_job,
setup_globals,
)
from hydra.plugins.experiment_sequence import ExperimentSequence
from hydra.types import HydraContext, TaskFunction
from joblib import Parallel, delayed # type: ignore
from omegaconf import DictConfig, open_dict
import multiprocessing as mp

from .joblib_launcher import JoblibLauncher

Expand Down Expand Up @@ -63,13 +65,22 @@ def process_joblib_cfg(joblib_cfg: Dict[str, Any]) -> None:
pass


def _batch_sequence(sequence, batch_size=1):
while True:
overrides = [experiment_config for _, experiment_config in zip(range(batch_size), sequence)]
if overrides:
yield overrides
if len(overrides) != batch_size:
raise StopIteration


def launch(
launcher: JoblibLauncher,
job_overrides: Sequence[Sequence[str]],
job_overrides: Union[Sequence[Sequence[str]], ExperimentSequence],
initial_job_idx: int,
) -> Sequence[JobReturn]:
"""
:param job_overrides: a List of List<String>, where each inner list is the arguments for one job run.
:param job_overrides: an Iterable of List<String>, where each inner list is the arguments for one job run.
:param initial_job_idx: Initial job idx in batch.
:return: an array of return values from run_job with indexes corresponding to the input list indexes.
"""
Expand All @@ -87,30 +98,54 @@ def launch(
joblib_cfg = launcher.joblib
joblib_cfg["backend"] = "loky"
process_joblib_cfg(joblib_cfg)

log.info(
"Joblib.Parallel({}) is launching {} jobs".format(
",".join([f"{k}={v}" for k, v in joblib_cfg.items()]),
len(job_overrides),
)
)
log.info("Launching jobs, sweep output dir : {}".format(sweep_dir))
for idx, overrides in enumerate(job_overrides):
log.info("\t#{} : {}".format(idx, " ".join(filter_overrides(overrides))))

singleton_state = Singleton.get_state()

runs = Parallel(**joblib_cfg)(
delayed(execute_job)(
initial_job_idx + idx,
overrides,
launcher.hydra_context,
launcher.config,
launcher.task_function,
singleton_state,
if isinstance(job_overrides, ExperimentSequence):
log.info(
"Joblib.Parallel({}) is launching {} jobs".format(
",".join([f"{k}={v}" for k, v in joblib_cfg.items()]),
'generator of',
)
)
batch_size = v if (v := joblib_cfg['n_jobs']) != -1 else mp.cpu_count()
runs = []
overrides = []
for idx, overrides in enumerate(_batch_sequence(job_overrides, batch_size)):
results = Parallel(**joblib_cfg)(
delayed(execute_job)(
initial_job_idx + idx,
override,
launcher.hydra_context,
launcher.config,
launcher.task_function,
singleton_state,
)
for override in overrides
)
for experiment_result in zip(overrides, results):
job_overrides.update_sequence(experiment_result)
else:
log.info(
"Joblib.Parallel({}) is launching {} jobs".format(
",".join([f"{k}={v}" for k, v in joblib_cfg.items()]),
len(job_overrides),
)
)
log.info("Launching jobs, sweep output dir : {}".format(sweep_dir))
for idx, overrides in enumerate(job_overrides):
log.info("\t#{} : {}".format(idx, " ".join(filter_overrides(overrides))))

runs = Parallel(**joblib_cfg)(
delayed(execute_job)(
initial_job_idx + idx,
overrides,
launcher.hydra_context,
launcher.config,
launcher.task_function,
singleton_state,
)
for idx, overrides in enumerate(job_overrides)
)
for idx, overrides in enumerate(job_overrides)
)

assert isinstance(runs, List)
for run in runs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from hydra.core.utils import JobReturn
from hydra.plugins.launcher import Launcher
from hydra.plugins.experiment_sequence import ExperimentSequence
from hydra.types import HydraContext, TaskFunction
from omegaconf import DictConfig

Expand Down Expand Up @@ -45,3 +46,12 @@ def launch(
return _core.launch(
launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx
)

def launch_experiment_sequence(
self, job_overrides: ExperimentSequence, initial_job_idx: int
) -> Sequence[JobReturn]:
from . import _core

return _core.launch(
launcher=self, job_overrides=job_overrides, initial_job_idx=initial_job_idx
)
3 changes: 3 additions & 0 deletions plugins/hydra_loky_launcher/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
global-exclude *.pyc
global-exclude __pycache__
recursive-include hydra_plugins/* *.yaml py.typed
1 change: 1 addition & 0 deletions plugins/hydra_loky_launcher/NEWS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

3 changes: 3 additions & 0 deletions plugins/hydra_loky_launcher/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Hydra loky Launcher
Provides a [loky](link) based Hydra Launcher supporting parallel worker pool execution.

9 changes: 9 additions & 0 deletions plugins/hydra_loky_launcher/example/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
defaults:
- override hydra/launcher: loky

task: 1

hydra:
launcher:
# override the number of jobs for loky
max_workers: 10
20 changes: 20 additions & 0 deletions plugins/hydra_loky_launcher/example/my_app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved
import logging
import os
import time

import hydra
from omegaconf import DictConfig

log = logging.getLogger(__name__)


@hydra.main(config_name="config")
def my_app(cfg: DictConfig) -> None:
log.info(f"Process ID {os.getpid()} executing task {cfg.task} ...")

time.sleep(1)


if __name__ == "__main__":
my_app()
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

__version__ = "1.2.0"
Loading