
Commit

Merge pull request #118 from chunfuchen/enhance/job_autorecover
enable job auto-recovery for remote backends
woodsp-ibm authored Sep 17, 2018
2 parents b7f0036 + 4034209 commit 12355f4
Showing 5 changed files with 175 additions and 87 deletions.
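At the heart of this change, the per-caller chunking and q_execute logic in operator.py and quantumalgorithm.py is replaced by a single qiskit_aqua.utils.run_circuits helper, which submits circuits in chunks and, on remote (non-simulator) backends, keeps retrying or resubmitting jobs until a result is obtained. Below is a minimal usage sketch of the new helper, not part of the commit itself; it assumes a provider exposing the named backend has already been registered, and the backend name, shot count, and job polling values are illustrative placeholders.

from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit
from qiskit_aqua.utils import run_circuits

# Build a trivial Bell-state circuit to run through the new helper.
q = QuantumRegister(2, 'q')
c = ClassicalRegister(2, 'c')
bell = QuantumCircuit(q, c)
bell.h(q[0])
bell.cx(q[0], q[1])
bell.measure(q, c)

# Backend name, shots, and job polling settings are placeholder values.
result = run_circuits(bell,
                      backend='ibmq_qasm_simulator',
                      execute_config={'shots': 1024},
                      qjob_config={'timeout': None, 'wait': 5},
                      max_circuits_per_job=300,
                      show_circuit_summary=True)
print(result.get_counts(bell))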
16 changes: 7 additions & 9 deletions qiskit_aqua/__init__.py
@@ -19,24 +19,27 @@

from .utils import cnx
from .algorithmerror import AlgorithmError
from .preferences import Preferences
from .operator import Operator
from .preferences import Preferences
from .quantumalgorithm import QuantumAlgorithm
from ._discover import (refresh_pluggables,
local_pluggables_types,
local_pluggables,
get_pluggable_configuration)


__version__ = '0.2.0'

__all__ = ['AlgorithmError',
'Preferences',
'Operator',
'Preferences',
'QuantumAlgorithm',
'refresh_pluggables',
'local_pluggables_types',
'local_pluggables',
'get_pluggable_configuration']
'get_pluggable_configuration',
'run_algorithm',
'run_algorithm_to_json']

from ._discover import _PLUGGABLES

@@ -61,9 +64,4 @@
exec(prefix + method)
__all__.append(method)

from .algomethods import run_algorithm
__all__.append('run_algorithm')
from .algomethods import run_algorithm_to_json
__all__.append('run_algorithm_to_json')
from .operator import Operator
__all__.append('Operator')
from .algomethods import run_algorithm, run_algorithm_to_json
68 changes: 18 additions & 50 deletions qiskit_aqua/operator.py
@@ -27,12 +27,11 @@
from scipy import sparse as scisparse
from scipy import linalg as scila
from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit
from qiskit import execute as q_execute
from qiskit.tools.qi.pauli import Pauli, label_to_pauli, sgn_prod
from qiskit.qasm import pi

from qiskit_aqua import AlgorithmError
from qiskit_aqua.utils import PauliGraph, summarize_circuits
from qiskit_aqua.utils import PauliGraph, summarize_circuits, run_circuits

logger = logging.getLogger(__name__)

@@ -564,12 +563,9 @@ def _eval_with_statevector(self, operator_mode, input_circuit, backend, execute_
if self._dia_matrix is None:
self._to_dia_matrix(mode='matrix')

job = q_execute(input_circuit, backend=backend, **execute_config)

if self._summarize_circuits and logger.isEnabledFor(logging.DEBUG):
logger.debug(summarize_circuits(input_circuit))

result = job.result()
result = run_circuits(input_circuit, backend=backend, execute_config=execute_config,
max_circuits_per_job=self.MAX_CIRCUITS_PER_JOB,
show_circuit_summary=self._summarize_circuits)
quantum_state = np.asarray(result.get_statevector(input_circuit))

if self._dia_matrix is not None:
@@ -581,8 +577,10 @@ def _eval_with_statevector(self, operator_mode, input_circuit, backend, execute_
self._check_representation("paulis")
n_qubits = self.num_qubits

input_job = q_execute(input_circuit, backend=backend, **execute_config)
simulator_initial_state = np.asarray(input_job.result().get_statevector(input_circuit))
result = run_circuits(input_circuit, backend=backend, execute_config=execute_config,
max_circuits_per_job=self.MAX_CIRCUITS_PER_JOB,
show_circuit_summary=self._summarize_circuits)
simulator_initial_state = np.asarray(result.get_statevector(input_circuit))

temp_config = copy.deepcopy(execute_config)

@@ -613,20 +611,9 @@ def _eval_with_statevector(self, operator_mode, input_circuit, backend, execute_
if len(circuit) != 0:
circuits_to_simulate.append(circuit)

jobs = []
chunks = int(np.ceil(len(circuits_to_simulate) / self.MAX_CIRCUITS_PER_JOB))
for i in range(chunks):
sub_circuits = circuits_to_simulate[i*self.MAX_CIRCUITS_PER_JOB:(i+1)*self.MAX_CIRCUITS_PER_JOB]
jobs.append(q_execute(sub_circuits, backend=backend, **temp_config))

if self._summarize_circuits and logger.isEnabledFor(logging.DEBUG):
logger.debug(summarize_circuits(circuits_to_simulate))

results = []
for job in jobs:
results.append(job.result())
if len(results) != 0:
result = reduce(lambda x, y: x + y, results)
result = run_circuits(circuits_to_simulate, backend=backend, execute_config=temp_config,
max_circuits_per_job=self.MAX_CIRCUITS_PER_JOB,
show_circuit_summary=self._summarize_circuits)

for idx, pauli in enumerate(self._paulis):
circuit = all_circuits[idx]
@@ -654,6 +641,7 @@ def _eval_multiple_shots(self, operator_mode, input_circuit, backend, execute_co
Returns:
float, float: mean and standard deviation of evaluation results
"""

num_shots = execute_config.get("shots", 1)
avg, std_dev, variance = 0.0, 0.0, 0.0
n_qubits = self.num_qubits
@@ -683,19 +671,9 @@ def _eval_multiple_shots(self, operator_mode, input_circuit, backend, execute_co

circuits.append(circuit)

jobs = []
chunks = int(np.ceil(len(circuits) / self.MAX_CIRCUITS_PER_JOB))
for i in range(chunks):
sub_circuits = circuits[i*self.MAX_CIRCUITS_PER_JOB:(i+1)*self.MAX_CIRCUITS_PER_JOB]
jobs.append(q_execute(sub_circuits, backend=backend, **execute_config))

if self._summarize_circuits and logger.isEnabledFor(logging.DEBUG):
logger.debug(summarize_circuits(circuits))

results = []
for job in jobs:
results.append(job.result(**qjob_config))
result = reduce(lambda x, y: x + y, results)
result = run_circuits(circuits, backend=backend, execute_config=execute_config,
qjob_config=qjob_config, max_circuits_per_job=self.MAX_CIRCUITS_PER_JOB,
show_circuit_summary=self._summarize_circuits)

avg_paulis = []
for idx, pauli in enumerate(self._paulis):
@@ -724,19 +702,9 @@ def _eval_multiple_shots(self, operator_mode, input_circuit, backend, execute_co
circuits.append(circuit)

# Execute all the stacked quantum circuits - one for each TPB set
jobs = []
chunks = int(np.ceil(len(circuits) / self.MAX_CIRCUITS_PER_JOB))
for i in range(chunks):
sub_circuits = circuits[i*self.MAX_CIRCUITS_PER_JOB:(i+1)*self.MAX_CIRCUITS_PER_JOB]
jobs.append(q_execute(sub_circuits, backend=backend, **execute_config))

if self._summarize_circuits and logger.isEnabledFor(logging.DEBUG):
logger.debug(summarize_circuits(circuits))

results = []
for job in jobs:
results.append(job.result(**qjob_config))
result = reduce(lambda x, y: x + y, results)
result = run_circuits(circuits, backend=backend, execute_config=execute_config,
qjob_config=qjob_config, max_circuits_per_job=self.MAX_CIRCUITS_PER_JOB,
show_circuit_summary=self._summarize_circuits)

for tpb_idx, tpb_set in enumerate(self._grouped_paulis):
avg_paulis = []
33 changes: 7 additions & 26 deletions qiskit_aqua/quantumalgorithm.py
@@ -25,19 +25,17 @@
from abc import ABC, abstractmethod
import logging
import sys
import functools

import numpy as np
from qiskit import __version__ as qiskit_version
from qiskit import register as q_register
from qiskit import unregister as q_unregister
from qiskit import registered_providers as q_registered_providers
from qiskit import execute as q_execute
from qiskit import available_backends, get_backend
from qiskit.backends.ibmq import IBMQProvider

from qiskit_aqua import AlgorithmError
from qiskit_aqua.utils import summarize_circuits
from qiskit_aqua.utils import run_circuits
from qiskit_aqua import Preferences

logger = logging.getLogger(__name__)
@@ -195,31 +193,14 @@ def execute(self, circuits):
circuits (QuantumCircuit or list[QuantumCircuit]): circuits to execute
Returns:
Result or [Result]: Result objects it will be a list if number of circuits
exceed the maximum number (300)
Result: Result object
"""

if not isinstance(circuits, list):
circuits = [circuits]
jobs = []
chunks = int(np.ceil(len(circuits) / self.MAX_CIRCUITS_PER_JOB))
for i in range(chunks):
sub_circuits = circuits[i *
self.MAX_CIRCUITS_PER_JOB:(i + 1) * self.MAX_CIRCUITS_PER_JOB]
jobs.append(q_execute(sub_circuits, self._backend,
**self._execute_config))

if logger.isEnabledFor(logging.DEBUG) and self._show_circuit_summary:
logger.debug(summarize_circuits(circuits))

result = run_circuits(circuits, self._backend, self._execute_config,
self._qjob_config, max_circuits_per_job=self.MAX_CIRCUITS_PER_JOB,
show_circuit_summary=self._show_circuit_summary)
if self._show_circuit_summary:
self.disable_circuit_summary()

results = []
for job in jobs:
results.append(job.result(**self._qjob_config))

result = functools.reduce(lambda x, y: x + y, results)
return result

@staticmethod
@@ -233,7 +214,7 @@ def register_and_get_operational_backends(*args, provider_class=IBMQProvider, **
break
except Exception as e:
logger.debug(
"Failed to unregister provider '{}' with Qiskit: {}".format(provider_class,str(e)))
"Failed to unregister provider '{}' with Qiskit: {}".format(provider_class, str(e)))

preferences = Preferences()
if args or kwargs or preferences.get_token() is not None:
@@ -243,7 +224,7 @@
"Provider '{}' registered with Qiskit successfully.".format(provider_class))
except Exception as e:
logger.debug(
"Failed to register provider '{}' with Qiskit: {}".format(provider_class,str(e)))
"Failed to register provider '{}' with Qiskit: {}".format(provider_class, str(e)))

backends = available_backends()
backends = [
5 changes: 3 additions & 2 deletions qiskit_aqua/utils/__init__.py
@@ -27,7 +27,7 @@
split_dataset_to_data_and_labels, map_label_to_class_name,
reduce_dim_to_via_pca)
from .qpsolver import optimize_svm

from .run_circuits import run_circuits

__all__ = ['tensorproduct',
'PauliGraph',
@@ -46,4 +46,5 @@
'split_dataset_to_data_and_labels',
'map_label_to_class_name',
'reduce_dim_to_via_pca',
'optimize_svm']
'optimize_svm',
'run_circuits']
140 changes: 140 additions & 0 deletions qiskit_aqua/utils/run_circuits.py
@@ -0,0 +1,140 @@
# -*- coding: utf-8 -*-

# Copyright 2018 IBM.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# =============================================================================

import sys
import logging
import time
import functools

import numpy as np
from qiskit import get_backend, compile as q_compile
from qiskit.backends.jobstatus import JobStatus
from qiskit.backends import JobError

from qiskit_aqua.algorithmerror import AlgorithmError
from qiskit_aqua.utils import summarize_circuits

logger = logging.getLogger(__name__)


def run_circuits(circuits, backend, execute_config, qjob_config={},
max_circuits_per_job=sys.maxsize, show_circuit_summary=False):
"""
An execution wrapper for Qiskit Terra, with job auto-recovery capability.
The auto-recovery feature is applied only to non-simulator backends.
This wrapper keeps trying to retrieve the result, no matter how long it takes.
Args:
circuits (QuantumCircuit or list[QuantumCircuit]): circuits to execute
backend (str): name of backend
execute_config (dict): settings for qiskit execute (or compile)
qjob_config (dict): settings for job object, like timeout and wait
max_circuits_per_job (int): the maximum number of circuits per job; the default is
unlimited, but remote backends limit it to 300
show_circuit_summary (bool): whether to log a summary of the submitted circuits.
Returns:
Result: Result object
Raises:
AlgorithmError: Any error except for JobError raised by Qiskit Terra
"""

if not isinstance(circuits, list):
circuits = [circuits]

my_backend = get_backend(backend)
with_autorecover = not my_backend.configuration()['simulator']

qobjs = []
jobs = []
chunks = int(np.ceil(len(circuits) / max_circuits_per_job))

for i in range(chunks):
sub_circuits = circuits[i * max_circuits_per_job:(i + 1) * max_circuits_per_job]
qobj = q_compile(sub_circuits, my_backend, **execute_config)
job = my_backend.run(qobj)
jobs.append(job)
qobjs.append(qobj)

if logger.isEnabledFor(logging.DEBUG) and show_circuit_summary:
logger.debug(summarize_circuits(circuits))

results = []
if with_autorecover:

logger.info("There are {} circuits and they are chunked into "
"{} chunks, each with {} circutis.".format(len(circuits), chunks, max_circuits_per_job))

for idx in range(len(jobs)):
job = jobs[idx]
job_id = job.id()
logger.info("Running {}-th chunk circuits, job id: {}".format(idx, job_id))
while True:
try:
result = job.result(**qjob_config)
if result.status == 'COMPLETED':
results.append(result)
logger.info("COMPLETED the {}-th chunk of circuits, job id: {}".format(idx, job_id))
break
else:
logger.warning("FAILURE: the {}-th chunk of circuits, job id: {}".format(idx, job_id))
except JobError as e:
# if Terra raises any error, something went wrong; re-run the job
logger.warning("FAILURE: the {}-th chunk of circuits, job id: {}, "
"Terra job error: {} ".format(idx, job_id, e))
except Exception as e:
raise AlgorithmError("FAILURE: the {}-th chunk of circuits, job id: {}, "
"Terra unknown error: {} ".format(idx, job_id, e)) from e

# keep querying the status until it is okay.
while True:
try:
job_status = job.status()
break
except JobError as e:
logger.warning("FAILURE: job id: {}, "
"status: 'FAIL_TO_GET_STATUS' Terra job error: {}".format(job_id, e))
time.sleep(5)
except Exception as e:
raise AlgorithmError("FAILURE: job id: {}, "
"status: 'FAIL_TO_GET_STATUS' ({})".format(job_id, e)) from e

logger.info("Job status: {}".format(job_status))
# reaching here means the job failed; check what kind of failure it was.
if job_status == JobStatus.DONE:
logger.info("Job ({}) is completed anyway; retrieving the result from the backend.".format(job_id))
job = my_backend.retrieve_job(job_id)
elif job_status == JobStatus.RUNNING or job_status == JobStatus.QUEUED:
logger.info("Job ({}) is {}, but encounter an exception, "
"recover it from backend.".format(job_id, job_status))
job = my_backend.retrieve_job(job_id)
else:
logger.info("Fail to run Job ({}), resubmit it.".format(job_id))
qobj = qobjs[idx]
job = my_backend.run(qobj)
else:
results = []
for job in jobs:
results.append(job.result(**qjob_config))

if len(results) != 0:
result = functools.reduce(lambda x, y: x + y, results)
else:
result = None
return result
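
As a sketch of the chunking behaviour, separate from the commit itself: when more circuits are passed than max_circuits_per_job, the helper compiles and submits them as several jobs and folds the partial Result objects back into one with functools.reduce, so callers can still look up counts per circuit. The local 'qasm_simulator' backend name and the shot count below are assumptions for illustration, not something this diff prescribes.

from qiskit import QuantumRegister, ClassicalRegister, QuantumCircuit
from qiskit_aqua.utils import run_circuits

# Ten one-qubit circuits, half of them flipped to |1> before measurement.
circuits = []
for i in range(10):
    q = QuantumRegister(1, 'q')
    c = ClassicalRegister(1, 'c')
    circ = QuantumCircuit(q, c)
    if i % 2 == 1:
        circ.x(q[0])
    circ.measure(q, c)
    circuits.append(circ)

# Force several chunks: 10 circuits at 3 per job -> 4 jobs, combined into one Result.
result = run_circuits(circuits,
                      backend='qasm_simulator',
                      execute_config={'shots': 100},
                      max_circuits_per_job=3)
for circ in circuits:
    print(result.get_counts(circ))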
