Skip to content

Commit

Permalink
Fix serialization bwc broken in #511 (#515)
Browse files Browse the repository at this point in the history
  • Loading branch information
gtopper authored Apr 21, 2024
1 parent bbc644f commit 5c9ff31
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,16 +797,17 @@ def __init__(self, status, body):

class _ConcurrentJobExecution(Flow):
_BACKOFF_MAX = 120
_DEFAULT_MAX_IN_FLIGHT = 8

def __init__(self, max_in_flight=None, retries=None, backoff_factor=None, **kwargs):
Flow.__init__(self, **kwargs)
if max_in_flight is not None and max_in_flight < 1:
raise ValueError(f"max_in_flight may not be less than 1 (got {max_in_flight})")
self.max_in_flight = max_in_flight
self.retries = retries
self.backoff_factor = backoff_factor

self._max_in_flight = max_in_flight or 8
self._queue_size = self._max_in_flight - 1
self._queue_size = (max_in_flight or self._DEFAULT_MAX_IN_FLIGHT) - 1

def _init(self):
super()._init()
Expand Down Expand Up @@ -963,9 +964,9 @@ def __init__(

self._executor = None
if concurrency_mechanism == "threading":
self._executor = ThreadPoolExecutor(max_workers=self._max_in_flight)
self._executor = ThreadPoolExecutor(max_workers=self.max_in_flight or self._DEFAULT_MAX_IN_FLIGHT)
elif concurrency_mechanism == "multiprocessing":
self._executor = ProcessPoolExecutor(max_workers=self._max_in_flight)
self._executor = ProcessPoolExecutor(max_workers=self.max_in_flight or self._DEFAULT_MAX_IN_FLIGHT)

self._pass_context = pass_context

Expand Down

0 comments on commit 5c9ff31

Please sign in to comment.