Skip to content

Commit

Permalink
Merge pull request #39 from myckhel/ft-job_cancel
Browse files Browse the repository at this point in the history
add feature: job cancellation
  • Loading branch information
SimonErm authored Feb 24, 2021
2 parents 36463ce + 3453fde commit be3f2dc
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 29 deletions.
59 changes: 58 additions & 1 deletion docs/classes/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ queue.addJob("testWorker",{text:"Job example palyoad content text",delay:5000})
### Methods

* [addJob](queue.md#addjob)
* [cancelJob](queue.md#canceljob)
* [addWorker](queue.md#addworker)
* [configure](queue.md#configure)
* [getJobs](queue.md#getjobs)
Expand Down Expand Up @@ -112,10 +113,66 @@ Name | Type | Default |
`priority` | number | 0 |
`timeout` | number | 0 |

***note*** if a job timeout and it has `react-native-job-queue` `CANCEL` method then the cancel method will be invoked

`Default value` **startQueue**: *boolean*= true

**Returns:** *string*

___

### canceljob
**cancelJob**(`jobId`: string, `exception`: Error): *void*

*Defined in [Queue.ts:191](https://github.com/SimonErm/react-native-job-queue/blob/acf0a20/src/Queue.ts#L191)*

cancel running job by id

**Parameters:**

Name | Type | Description |
------ | ------ | ------ |
`jobId` | *string* | currently running job id |
`exception?` | Error<*any*> | Error object |

**Returns:** *void*

***note*** before you cancel a job, you must have implemented a cancel method for the returned promise of the [Worker](worker.md).
see example below

**Example:**
```jsx
import queue, {Worker, CANCEL} from 'react-native-job-queue';

queue.addWorker(
new Worker('testWorker', (payload) => {
let cancel
const promise = new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
console.log(payload);
resolve();
}, 5000);

cancel = () => {
clearTimeout(timeout)
reject({message: 'canceled'})
}
});

promise[CANCEL] = cancel
return promise
},{
onStart: ({id}) => {
/* cancel the job after 2sec */
setTimeout(() => {
queue.cancelJob(id, {message: 'Canceled'})
}, 2000)
},
})
);


```
___

### addWorker
Expand Down Expand Up @@ -203,4 +260,4 @@ ___

stop the queue from executing queued jobs

**Returns:** *void*
**Returns:** *void*
36 changes: 30 additions & 6 deletions example/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ import * as React from 'react';
import { Button, View } from 'react-native';

import queue from '../src/Queue';
import { Worker } from '../src/Worker';
import { Worker, CANCEL, CancellablePromise } from '../src/Worker';

export interface IAppProps {}

export interface IAppState {}
export interface IAppState {jobId?: string|null}
let counter = 0;
export default class App extends React.Component<IAppProps, IAppState> {
constructor(props: IAppProps) {
super(props);
this.state = {};
this.state = {
jobId: null,
};
}
componentDidMount() {
queue.configure({
Expand All @@ -20,13 +22,25 @@ export default class App extends React.Component<IAppProps, IAppState> {
}
});
queue.addWorker(
new Worker('testWorker', async (payload) => {
return new Promise((resolve) => {
setTimeout(() => {
new Worker('testWorker', (payload) => {
let cancel
const promise: CancellablePromise<any> = new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
console.log(payload);
resolve();
}, 5000);

cancel = () => {
clearTimeout(timeout)
reject(new Error('canceled'))
}
});

promise[CANCEL] = cancel
return promise
},{
onStart: ({id}) => this.setState({jobId: id}),
onCompletion: () => this.setState({jobId: null}),
})
);
}
Expand All @@ -39,6 +53,16 @@ export default class App extends React.Component<IAppProps, IAppState> {
queue.addJob('testWorker', { id: counter++ }, undefined, false);
}}
/>
<Button
title='cancel Job'
onPress={() => {
if(this.state.jobId){
queue.cancelJob(this.state.jobId, {message: 'Canceled'})
} else {
console.log("no job running");
}
}}
/>
<Button
title='start Queue'
onPress={() => {
Expand Down
40 changes: 37 additions & 3 deletions src/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { NativeModules } from 'react-native';
import { FALSE, Job, RawJob } from './models/Job';
import { JobStore } from './models/JobStore';
import { Uuid } from './utils/Uuid';
import { Worker } from './Worker';
import { Worker, CANCEL } from './Worker';

/**
* Options to configure the queue
Expand Down Expand Up @@ -75,10 +75,12 @@ export class Queue {
private onQueueFinish: (executedJobs: Array<Job<any>>) => void;

private queuedJobExecuter: any[] = [];
private runningJobPromises: {[key: string]: any};

private constructor() {
this.jobStore = NativeModules.JobQueue;
this.workers = {};
this.runningJobPromises = {};
this.isActive = false;

this.timeoutId = 0;
Expand Down Expand Up @@ -136,6 +138,7 @@ export class Queue {
* @param [payload={}] payload which is passed as parameter to the executer
* @param [options={ attempts: 0, timeout: 0, priority: 0 }] options to set max attempts, a timeout and a priority
* @param [startQueue=true] if set to false the queue won't start automaticly when adding a job
* @returns job id
*/
addJob<P extends object>(
workerName: string,
Expand All @@ -144,8 +147,9 @@ export class Queue {
startQueue: boolean = true
) {
const { attempts = 0, timeout = 0, priority = 0 } = options;
const id: string = Uuid.v4();
const job: RawJob = {
id: Uuid.v4(),
id,
payload: JSON.stringify(payload || {}),
metaData: JSON.stringify({ failedAttempts: 0, errors: [] }),
active: FALSE,
Expand All @@ -164,6 +168,8 @@ export class Queue {
if (startQueue && !this.isActive) {
this.start();
}

return id;
}
/**
* starts the queue to execute queued jobs
Expand All @@ -182,6 +188,20 @@ export class Queue {
stop() {
this.isActive = false;
}

/**
* cancel running job
*/
cancelJob(jobId: string, exception?: Error) {
const promise = this.runningJobPromises[jobId]
if (promise && typeof promise[CANCEL] === 'function') {
promise[CANCEL](exception || new Error(`canceled`))
} else if(!promise[CANCEL]){
console.warn("Worker does not have a cancel method implemented")
} else {
throw new Error(`Job with id ${jobId} not currently running`);
}
}
private resetActiveJob = async (job: RawJob) => {
this.jobStore.updateJob({ ...job, ...{ active: FALSE } });
};
Expand Down Expand Up @@ -277,14 +297,25 @@ export class Queue {
}

private excuteJob = async (rawJob: RawJob) => {
const worker = this.workers[rawJob.workerName];
const payload = JSON.parse(rawJob.payload);
const job = { ...rawJob, ...{ payload } };

try {
this.activeJobCount++;
if (!this.workers[rawJob.workerName]) {
throw new Error(`Missing worker with name ${rawJob.workerName}`);
}
await this.workers[rawJob.workerName].execute(rawJob);
const promise = worker.execute(rawJob);

this.runningJobPromises[rawJob.id] = promise
await promise

worker.triggerSuccess(job)

this.jobStore.removeJob(rawJob);
} catch (error) {
worker.triggerFailure(job, error);
const { attempts } = rawJob;
// tslint:disable-next-line: prefer-const
let { errors, failedAttempts } = JSON.parse(rawJob.metaData);
Expand All @@ -296,6 +327,9 @@ export class Queue {
const metaData = JSON.stringify({ errors: [...errors, error], failedAttempts });
this.jobStore.updateJob({ ...rawJob, ...{ active: FALSE, metaData, failed } });
} finally {
delete this.runningJobPromises[job.id]
worker.decreaseExecutionCount();
worker.triggerCompletion(job);
this.executedJobs.push(rawJob);
this.activeJobCount--;
}
Expand Down
64 changes: 46 additions & 18 deletions src/Worker.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import { Job, RawJob } from './models/Job';

export const CANCEL = 'rn_job_queue_cancel'

export interface WorkerOptions<P extends object> {
onStart?: (job: Job<P>) => void;
onSuccess?: (job: Job<P>) => void;
onFailure?: (job: Job<P>, error: Error) => void;
onCompletion?: (job: Job<P>) => void;
concurrency?: number;
}

export interface CancellablePromise<T> extends Promise<T> {
rn_job_queue_cancel?: () => void
}
/**
* @typeparam P specifies the Type of the Job-Payload.
*/
Expand All @@ -15,7 +21,7 @@ export class Worker<P extends object> {
public readonly concurrency: number;

private executionCount: number;
private executer: (payload: P) => Promise<any>;
private executer: (payload: P) => CancellablePromise<any>;

private onStart: (job: Job<P>) => void;
private onSuccess: (job: Job<P>) => void;
Expand Down Expand Up @@ -66,33 +72,55 @@ export class Worker<P extends object> {
* This method should not be invoked manually and is used by the queue to execute jobs
* @param job to be executed
*/
async execute(rawJob: RawJob) {
execute(rawJob: RawJob) {
const { timeout } = rawJob;
const payload: P = JSON.parse(rawJob.payload);
const job = { ...rawJob, ...{ payload } };
this.executionCount++;
try {
this.onStart(job);
if (timeout > 0) {
await this.executeWithTimeout(job, timeout);
} else {
await this.executer(payload);
}
this.onSuccess(job);
} catch (error) {
this.onFailure(job, error);
throw error;
} finally {
this.executionCount--;
this.onCompletion(job);
this.onStart(job);
if (timeout > 0) {
return this.executeWithTimeout(job, timeout);
} else {
return this.executer(payload);
}
}
private async executeWithTimeout(job: Job<P>, timeout: number) {
private executeWithTimeout(job: Job<P>, timeout: number) {
let cancel
const promise: CancellablePromise<any> = new Promise(async (resolve, reject) => {
const timeoutPromise = new Promise((resolve, reject) => {
setTimeout(() => {
reject(new Error(`Job ${job.id} timed out`));
}, timeout);
});
await Promise.race([timeoutPromise, this.executer(job.payload)]);
const executerPromise = this.executer(job.payload)
if (executerPromise) {
cancel = executerPromise[CANCEL]
try {
await Promise.race([timeoutPromise, executerPromise]);
resolve()
} catch (error) {
// cancel task if has cancel method
if (executerPromise[CANCEL] && typeof executerPromise[CANCEL] === 'function') {
executerPromise[CANCEL]!()
}
reject(error);
}
}
})
promise[CANCEL] = cancel
return promise
}

triggerSuccess(job: Job<P>) {
this.onSuccess(job)
}
triggerFailure(job: Job<P>, error: Error) {
this.onFailure(job, error)
}
triggerCompletion(job: Job<P>) {
this.onCompletion(job)
}
decreaseExecutionCount() {
this.executionCount--;
}
}
Loading

0 comments on commit be3f2dc

Please sign in to comment.