-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
78 additions
and
68 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,85 +1,97 @@ | ||
// tslint:disable:eofline | ||
export type task<T = any> = () => T | PromiseLike<T>; | ||
|
||
export default class ASAP { | ||
/** key for heap of all promises */ | ||
const heap = Symbol(); | ||
type THeap = Array<() => Promise<any>>; | ||
|
||
/** key for process method */ | ||
const process = Symbol(); | ||
type TProcess = () => void; | ||
|
||
/** key for pending promises */ | ||
const pending = Symbol(); | ||
type TPending = THeap; | ||
|
||
/** key for completed promises */ | ||
const complete = Symbol(); | ||
type TComplete = WeakMap<() => Promise<any>, Promise<any>>; | ||
|
||
/** key for completed promises */ | ||
const concurrency = Symbol(); | ||
|
||
export default class { | ||
/** | ||
* array of functions which returns promises | ||
*/ | ||
protected queue = [] as Array<() => Promise<any>>; | ||
// @ts-ignore no built-in symbol | ||
protected [heap] = [] as Array<() => Promise<any>>; | ||
|
||
/** | ||
* WeakMap of resolved or rejected promises | ||
*/ | ||
protected prom = new WeakMap<() => Promise<any>, Promise<any>>(); | ||
// @ts-ignore no built-in symbol | ||
protected [complete] = new WeakMap<() => Promise<any>, Promise<any>>(); | ||
|
||
/** | ||
* array of pending/running promise methods | ||
*/ | ||
protected pending = [] as Array<() => Promise<any>>; | ||
// @ts-ignore no built-in symbol | ||
protected [pending] = [] as Array<() => Promise<any>>; | ||
|
||
/** | ||
* concurrency protected var | ||
*/ | ||
protected c: number = 1; | ||
// @ts-ignore no built-in symbol | ||
protected [concurrency]: number = 1; | ||
|
||
/** | ||
* set concurrency | ||
*/ | ||
public set concurrency(v: number) { | ||
public set c(v: number) { | ||
if (v < 1) { | ||
throw new Error(`concurrency can not be lower than 1`); | ||
} | ||
this.c = v; | ||
this.process(); | ||
(this as any)[concurrency] = v; | ||
(this as any)[process](); | ||
} | ||
|
||
/** | ||
* get concurrency | ||
*/ | ||
public get concurrency(): number { | ||
return this.c; | ||
public get c(): number { | ||
return (this as any)[concurrency]; | ||
} | ||
|
||
/** | ||
* enqueue a new task | ||
* @param fn function to run | ||
*/ | ||
public enqueue<T>(fn: () => T | PromiseLike<T>): Promise<T> { | ||
public q<T>(fn: task<T>): Promise<T> { | ||
return new Promise<T>((resolve, reject) => { | ||
const promFn = async () => { | ||
const promFn = () => { | ||
const prom = new Promise<T>((result) => { result(fn()); }); | ||
prom.then(resolve, reject); | ||
try { | ||
await prom; | ||
} catch { | ||
// pass | ||
} finally { | ||
this.prom.set(promFn, prom); | ||
} | ||
const delFn = () => { (this as any)[complete].set(promFn, prom); }; | ||
return prom.then(resolve, reject).then(delFn, delFn); | ||
}; | ||
this.queue.push(promFn); | ||
this.process(); | ||
(this as any)[heap].push(promFn); | ||
(this as any)[process](); | ||
}); | ||
} | ||
|
||
/** | ||
* process the queue | ||
*/ | ||
public process(): void { | ||
if (this.pending.filter((v) => v).length < this.c) { | ||
this.queue.filter( | ||
(v) => !this.prom.has(v) && this.pending.indexOf(v) < 0, | ||
).slice(0, this.c).map(async (v) => { | ||
this.pending.push(v); | ||
try { | ||
await v(); | ||
} catch { | ||
// pass | ||
} finally { | ||
delete this.pending[this.pending.indexOf(v)]; | ||
} | ||
protected [process](): void { | ||
if (((this as any)[pending] as TPending).filter((v) => v).length < (this as any)[concurrency]) { | ||
((this as any)[heap] as THeap).filter( | ||
(v) => !(this as any)[complete].has(v) && (this as any)[pending].indexOf(v) < 0, | ||
).slice(0, (this as any)[concurrency]).map((v) => { | ||
(this as any)[pending].push(v); | ||
const delFn = () => { delete (this as any)[pending][(this as any)[pending].indexOf(v)]; }; | ||
return v().then(delFn, delFn); | ||
}).forEach((prom) => { | ||
prom.then(() => { this.process(); }); | ||
prom.then(() => { (this as any)[process](); }); | ||
}); | ||
} | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters