Skip to content

Commit

Permalink
priorities (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
tlaziuk authored Feb 13, 2018
1 parent 8dc4d00 commit 02fb58e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 16 deletions.
21 changes: 12 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ The main goal is to provide lightweight and modern library for queuing tasks.
The name was inspired by the [asap](https://github.com/kriskowal/asap) library.
There is already a few libraries with similar functionality, yet this is another one.

| lib | async | sync | concurrency | browser | server | size | license |
| lib | async | sync | concurrency | priority | size | license |
| ---: | :---: | :---: | :---: | :---: | :---: | :---: | :--- |
| [asap-es](https://github.com/tlaziuk/asap-es) | ✔️ | ✔️ | ✔️ | ✔️ | ✔️ | 652 B | MIT |
| [asap](https://github.com/kriskowal/asap) | ✖️ | ✔️ | ✖️ | ✔️ || 848 B | MIT |
| [d3-queue](https://github.com/d3/d3-queue) | ✔️ | ✔️ | ✔️ | ✔️ || 968 B | BSD-3-Clause |
| [aurelia-task-queue](https://github.com/aurelia/task-queue) | ✔️ | ✔️ | ✖️ | ✖️ | ✔️ | 3.11 kB | MIT |
| [kueue](https://github.com/jasonkneen/kueue) | ✔️ | ✔️ | ✖️ | ✔️ || 555 B | Apache 2.0 |
| [asap-es](https://github.com/tlaziuk/asap-es) | ✔️ | ✔️ | ✔️ | ✔️ | 652 B | MIT |
| [asap](https://github.com/kriskowal/asap) | ✖️ | ✔️ | ✖️ | | 848 B | MIT |
| [d3-queue](https://github.com/d3/d3-queue) | ✔️ | ✔️ | ✔️ | | 968 B | BSD-3-Clause |
| [aurelia-task-queue](https://github.com/aurelia/task-queue) | ✔️ | ✔️ | ✖️ | ✖️ | 3.11 kB | MIT |
| [kueue](https://github.com/jasonkneen/kueue) | ✔️ | ✔️ | ✖️ | | 555 B | Apache 2.0 |

## api

| name | description |
| ---: | :--- |
| `new <ctor>()` | create new _asap-es_ instance, concurrency can be passed as argument |
| `<ctor>()` | same as above |
| `new <ctor>(c)` | create new _asap-es_ instance, optinal concurrency can be passed as argument |
| `<ctor>(c)` | same as above |
| `<instance>.c` | the number of tasks to run simultaneously (`1` by default), set to `< 1` to pause the queue |
| `<instance>.q(task)` | enqueue new task, returns a promise which resolves or rejects when execution of the task is finished |
| `<instance>.q(task, priority)` | enqueue a new _task_, returns a promise which resolves or rejects when execution of the task is finished, optionally pass _priority_ |
| _task_ | task is a function which may return a value or a promise (task awaits for promise completion) |

## usage example
Expand Down Expand Up @@ -69,6 +69,9 @@ queue.q(async () => {
// do some async things
});

// task with higher priority
queue.q(() => void 0, -1);

// set concurrency and resume the queue
queue.c = 2;

Expand Down
60 changes: 60 additions & 0 deletions index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,66 @@ describe(ASAP.name, () => {
expect(promSpy.callCount).to.be.equal(1);
}));
}).timeout(1e4);
it("should the task priorities be working", async () => {
const asap = new ASAP();
const spyFn1 = spy(() => delay(5));
const spyFn2 = spy(() => delay(5));
const spyFn3 = spy(() => delay(5));
const spyFn4 = spy(() => delay(5));
const spyFn5 = spy(() => delay(5));
const spyFn6 = spy(() => delay(5));
const spyFn7 = spy(() => delay(5));
await Promise.all([
asap.q(spyFn1),
asap.q(spyFn2, 3),
asap.q(spyFn3, 22),
asap.q(spyFn4, 1),
asap.q(spyFn5, -1),
asap.q(spyFn6, -65),
asap.q(spyFn7, 0),
]);
expect(spyFn1.calledBefore(spyFn5)).to.be.equal(true);
expect(spyFn1.calledBefore(spyFn6)).to.be.equal(true);
expect(spyFn1.calledBefore(spyFn7)).to.be.equal(true);
expect(spyFn2.calledBefore(spyFn3)).to.be.equal(true);
expect(spyFn3.calledAfter(spyFn6)).to.be.equal(true);
expect(spyFn4.calledBefore(spyFn2)).to.be.equal(true);
expect(spyFn5.calledBefore(spyFn3)).to.be.equal(true);
expect(spyFn6.calledBefore(spyFn5)).to.be.equal(true);
expect(spyFn7.calledBefore(spyFn2)).to.be.equal(true);
}).timeout(1e4);
it("should the task priorities be working with concurrency", async () => {
const asap = new ASAP();
asap.c = 2;
const spyFn1 = spy(() => delay(5));
const spyFn2 = spy(() => delay(5));
const spyFn3 = spy(() => delay(5));
const spyFn4 = spy(() => delay(5));
const spyFn5 = spy(() => delay(5));
const spyFn6 = spy(() => delay(5));
const spyFn7 = spy(() => delay(5));
await Promise.all([
asap.q(spyFn1, 3),
asap.q(spyFn2, 2),
asap.q(spyFn3, 1),
asap.q(spyFn4, 0),
asap.q(spyFn5, -1),
asap.q(spyFn6, -2),
asap.q(spyFn7, -3),
]);
expect(spyFn1.calledBefore(spyFn3)).to.be.equal(true);
expect(spyFn1.calledBefore(spyFn4)).to.be.equal(true);
expect(spyFn2.calledBefore(spyFn3)).to.be.equal(true);
expect(spyFn2.calledBefore(spyFn4)).to.be.equal(true);
expect(spyFn7.calledBefore(spyFn3)).to.be.equal(true);
expect(spyFn7.calledBefore(spyFn4)).to.be.equal(true);
expect(spyFn6.calledBefore(spyFn3)).to.be.equal(true);
expect(spyFn6.calledBefore(spyFn4)).to.be.equal(true);
expect(spyFn5.calledBefore(spyFn3)).to.be.equal(true);
expect(spyFn5.calledBefore(spyFn4)).to.be.equal(true);
expect(spyFn7.calledAfter(spyFn1)).to.be.equal(true);
expect(spyFn7.calledAfter(spyFn2)).to.be.equal(true);
}).timeout(1e4);
it("should default concurrency be set", () => {
const asap = new ASAP();
expect(asap.c).to.be.equal(1);
Expand Down
18 changes: 11 additions & 7 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export interface IASAP {
/**
* enqueue a new task
* @param fn task to run
* @param priority task priority, smaller value means higher priority
* @returns a Promise resolves when the task gets executed
*
* example:
Expand All @@ -28,7 +29,7 @@ export interface IASAP {
* }
* ```
*/
q<T>(fn: task<T> | PromiseLike<task<T>>): Promise<T>;
q<T>(fn: task<T> | PromiseLike<task<T>>, priority?: number): Promise<T>;
}

function ASAP(this: any, c: boolean | number = 1): any {
Expand All @@ -47,7 +48,7 @@ function ASAP(this: any, c: boolean | number = 1): any {
/**
* array of functions which returns promises
*/
const heap = [] as Array<() => Promise<any>>;
const heap = [] as Array<[() => Promise<any>, number]>;

/**
* Set of resolved or rejected promise methods
Expand All @@ -66,11 +67,14 @@ function ASAP(this: any, c: boolean | number = 1): any {
if (pending.size < concurrency) {
heap.filter(
// filter the heap to get only not completed nor pending (running) tasks
(v) => !complete.has(v) && !pending.has(v),
([v]) => !complete.has(v) && !pending.has(v),
).sort(
// sort the heap from highest to lowest priority
([, a], [, b]) => a - b,
).slice(
0,
concurrency, // slice the array to the size of concurrency value
).forEach((v) => {
).forEach(([v]) => {
// mark the promise function as pending
pending.add(v);

Expand Down Expand Up @@ -99,7 +103,7 @@ function ASAP(this: any, c: boolean | number = 1): any {
},
},
q: {
value: <T>(fn: task<T> | PromiseLike<task<T>>) => new Promise<T>((resolve, reject) => {
value: <T>(fn: task<T> | PromiseLike<task<T>>, priority?: number) => new Promise<T>((resolve, reject) => {
const promFn = () => {
// create a new promise in case when the `fn` throws anything
const prom = Promise.resolve(fn).then((v) => v());
Expand All @@ -108,8 +112,8 @@ function ASAP(this: any, c: boolean | number = 1): any {
return prom.then(resolve, reject).then(() => { complete.add(promFn); });
};

// push the promise function to the task list
heap.push(promFn);
// push the promise function and priority to the task list
heap.push([promFn, priority || 0]);

// process the task list
process();
Expand Down

0 comments on commit 02fb58e

Please sign in to comment.