Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: batch time limit #125

Merged
merged 1 commit into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

# 0.13.2

* feat: batch time limit

# 0.13.1

* feat: add `BatchProcess` and related classes
Expand Down
5 changes: 5 additions & 0 deletions packages/rekapager-core/src/Batch/AbstractBatchProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;

/**
* @template TKey of array-key
Expand Down Expand Up @@ -53,4 +54,8 @@ public function afterPage(AfterPageEvent $event): void
public function onInterrupt(InterruptEvent $event): void
{
}

public function onTimeLimit(TimeLimitEvent $event): void
{
}
}
25 changes: 23 additions & 2 deletions packages/rekapager-core/src/Batch/BatchProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;
use Rekalogika\Rekapager\Contracts\PageIdentifierEncoderResolverInterface;

/**
Expand Down Expand Up @@ -56,10 +57,13 @@ final public function stop(): bool
/**
* @param int<1,max>|null $pageSize
*/
final public function process(
final public function run(
?string $resume = null,
?int $pageSize = null
?int $pageSize = null,
?int $timeLimit = null,
): void {
$startTime = time();

// determine start page identifier

if ($resume !== null) {
Expand Down Expand Up @@ -88,6 +92,21 @@ final public function process(
$pageIdentifier = $page->getPageIdentifier();
$pageIdentifierString = $this->pageableIdentifierResolver->encode($pageIdentifier);

// check time limit

if ($timeLimit !== null && time() - $startTime >= $timeLimit) {
$timeLimitEvent = new TimeLimitEvent(
pageable: $pageable,
nextPageIdentifier: $pageIdentifierString,
);

$this->batchProcessor->onTimeLimit($timeLimitEvent);

return;
}

// check stop flag

if ($this->stopFlag) {
$interruptEvent = new InterruptEvent(
pageable: $pageable,
Expand All @@ -99,6 +118,8 @@ final public function process(
return;
}

// process page

$beforePageEvent = new BeforePageEvent(
page: $page,
encodedPageIdentifier: $pageIdentifierString,
Expand Down
6 changes: 6 additions & 0 deletions packages/rekapager-core/src/Batch/BatchProcessorDecorator.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;

/**
* @template TKey of array-key
Expand Down Expand Up @@ -68,4 +69,9 @@ public function onInterrupt(InterruptEvent $event): void
{
$this->decorated->onInterrupt($event);
}

public function onTimeLimit(TimeLimitEvent $event): void
{
$this->decorated->onTimeLimit($event);
}
}
6 changes: 6 additions & 0 deletions packages/rekapager-core/src/Batch/BatchProcessorInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;

/**
* @template TKey of array-key
Expand Down Expand Up @@ -60,4 +61,9 @@ public function afterPage(AfterPageEvent $event): void;
* @param InterruptEvent<TKey,T> $event
*/
public function onInterrupt(InterruptEvent $event): void;

/**
* @param TimeLimitEvent<TKey,T> $event
*/
public function onTimeLimit(TimeLimitEvent $event): void;
}
45 changes: 45 additions & 0 deletions packages/rekapager-core/src/Batch/Event/TimeLimitEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

/*
* This file is part of rekalogika/rekapager package.
*
* (c) Priyadi Iman Nurcahyo <https://rekalogika.dev>
*
* For the full copyright and license information, please view the LICENSE file
* that was distributed with this source code.
*/

namespace Rekalogika\Rekapager\Batch\Event;

use Rekalogika\Contracts\Rekapager\PageableInterface;

/**
* @template TKey of array-key
* @template T
*/
final class TimeLimitEvent
{
/**
* @param PageableInterface<TKey,T> $pageable
*/
public function __construct(
private readonly PageableInterface $pageable,
private readonly ?string $nextPageIdentifier,
) {
}

/**
* @return PageableInterface<TKey,T>
*/
public function getPageable(): PageableInterface
{
return $this->pageable;
}

public function getNextPageIdentifier(): ?string
{
return $this->nextPageIdentifier;
}
}
21 changes: 18 additions & 3 deletions packages/rekapager-symfony-bridge/src/Batch/BatchCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ abstract class BatchCommand extends Command implements SignalableCommandInterfac
private ?SymfonyStyle $io = null;
private ?BatchProcessFactoryInterface $batchProcessFactory = null;

public function __construct(
) {
public function __construct()
{
parent::__construct();

$this->addOption('resume', 'r', InputOption::VALUE_OPTIONAL, 'Page identifier to resume from');
$this->addOption('pagesize', 'p', InputOption::VALUE_OPTIONAL, 'Batch/page/chunk size');
$this->addOption('progress-file', 'f', InputOption::VALUE_OPTIONAL, 'Temporary file to store progress data');
$this->addOption('time-limit', 't', InputOption::VALUE_OPTIONAL, 'Runs the batch up to the specified time limit (in seconds)');
}

#[Required]
Expand Down Expand Up @@ -78,6 +79,7 @@ final protected function execute(InputInterface $input, OutputInterface $output)
$resume = $input->getOption('resume');
$pageSize = $input->getOption('pagesize');
$progressFile = $input->getOption('progress-file');
$timeLimit = $input->getOption('time-limit');

/** @psalm-suppress TypeDoesNotContainType */
if (!\is_string($resume) && $resume !== null) {
Expand All @@ -98,6 +100,15 @@ final protected function execute(InputInterface $input, OutputInterface $output)
throw new InvalidArgumentException('Invalid progress-file option');
}

if (!is_numeric($timeLimit) && $timeLimit !== null) {
throw new InvalidArgumentException('Invalid time-limit option');
}

if ($timeLimit !== null) {
$timeLimit = (int) $timeLimit;
\assert($timeLimit > 0);
}

// check resuming

if ($progressFile !== null && file_exists($progressFile) && $resume === null) {
Expand All @@ -124,7 +135,11 @@ final protected function execute(InputInterface $input, OutputInterface $output)
batchProcessor: $batchProcessor,
);

$this->batchProcess->process($resume, $pageSize);
$this->batchProcess->run(
resume: $resume,
pageSize: $pageSize,
timeLimit: $timeLimit,
);

return Command::SUCCESS;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\ItemEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;
use Symfony\Component\Console\Helper\Helper;
use Symfony\Component\Console\Helper\ProgressIndicator;
use Symfony\Component\Console\Style\SymfonyStyle;
Expand Down Expand Up @@ -143,6 +144,12 @@ public function processItem(ItemEvent $itemEvent): void

public function onInterrupt(InterruptEvent $event): void
{
$nextPageIdentifier = $event->getNextPageIdentifier();

if ($this->progressFile !== null && $nextPageIdentifier !== null) {
file_put_contents($this->progressFile, $nextPageIdentifier);
}

$this->decorated->onInterrupt($event);

$nextPageIdentifier = $event->getNextPageIdentifier();
Expand All @@ -164,11 +171,35 @@ public function onInterrupt(InterruptEvent $event): void
$this->showStats($event);
}

public function onTimeLimit(TimeLimitEvent $event): void
{
$nextPageIdentifier = $event->getNextPageIdentifier();

if ($this->progressFile !== null && $nextPageIdentifier !== null) {
file_put_contents($this->progressFile, $nextPageIdentifier);
}

$this->decorated->onTimeLimit($event);

$nextPageIdentifier = $event->getNextPageIdentifier();

if ($nextPageIdentifier !== null) {
$this->io->warning(sprintf(
'Time limit reached. To resume, use the argument "-r %s"',
$nextPageIdentifier
));
} else {
$this->io->error('Time limit reached, but there does not seem to be a next page identifier for you to resume');
}

$this->showStats($event);
}

/**
* @param AfterPageEvent<TKey,T>|AfterProcessEvent<TKey,T>|InterruptEvent<TKey,T> $event
* @param AfterPageEvent<TKey,T>|AfterProcessEvent<TKey,T>|InterruptEvent<TKey,T>|TimeLimitEvent<TKey,T> $event
* @return void
*/
private function showStats(AfterPageEvent|AfterProcessEvent|InterruptEvent $event): void
private function showStats(AfterPageEvent|AfterProcessEvent|InterruptEvent|TimeLimitEvent $event): void
{
if ($event instanceof AfterPageEvent) {
$this->io->writeln('');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Rekalogika\Rekapager\Batch\Event\BeforePageEvent;
use Rekalogika\Rekapager\Batch\Event\BeforeProcessEvent;
use Rekalogika\Rekapager\Batch\Event\InterruptEvent;
use Rekalogika\Rekapager\Batch\Event\TimeLimitEvent;

/**
* @template TKey of array-key
Expand Down Expand Up @@ -57,4 +58,8 @@ public function afterPage(AfterPageEvent $event): void
public function onInterrupt(InterruptEvent $event): void
{
}

public function onTimeLimit(TimeLimitEvent $event): void
{
}
}