Skip to content

Commit

Permalink
feat: add wait
Browse files Browse the repository at this point in the history
  • Loading branch information
crimx committed Apr 30, 2024
1 parent 513943f commit c200763
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 36 deletions.
77 changes: 41 additions & 36 deletions src/async-seq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ export interface AsyncSeqFn {

export interface AsyncSeqOptions {
/**
* Max size of the sequence.
* Max size of the sequence. Default unlimited.
*/
window?: number;
/**
* Tail is dropped by default if the sequence is full.
* Set this to true to drop the head instead.
* New tasks are added to the sequence tail. By default they are dropped if the sequence is full.
* Set this to `true` to drop old tasks from sequence head instead.
*/
dropHead?: boolean;
}
Expand All @@ -27,86 +27,91 @@ export interface AsyncSeqOptions {
* Run async functions in sequence.
*/
export class AsyncSeq {
private readonly window_: number;
private readonly dropHead_: boolean;
readonly #window: number;
readonly #dropHead: boolean;

private readonly fns_: AsyncSeqFn[];
private pRunning_?: Promise<void> | null;
private disposer_?: (() => any | Promise<any>) | null | void;
readonly #fns: AsyncSeqFn[];
#pRunning?: Promise<void> | null;
#disposer?: (() => any | Promise<any>) | null | void;

public constructor(options?: AsyncSeqOptions) {
this.fns_ = [];
this.window_ = options?.window ?? Infinity;
this.dropHead_ = options?.dropHead ?? false;
this.#fns = [];
this.#window = options?.window ?? Infinity;
this.#dropHead = options?.dropHead ?? false;
}

/**
* Is sequence running.
*/
public get running(): boolean {
return !!this.pRunning_;
return !!this.#pRunning;
}

/**
* Size of pending tasks in the sequence.
*/
public get size(): number {
return this.fns_.length;
return this.#fns.length;
}

/**
* Is sequence full.
*/
public get full(): boolean {
return this.size >= this.window_;
return this.size >= this.#window;
}

/**
* Add task executors to the sequence.
* @param fns Task executors. Optionally returns a disposer function that cleans up side effects.
* @returns Promise that resolves when all tasks in the sequence are executed.
*/
public async add(...fns: AsyncSeqFn[]): Promise<void> {
this.fns_.push(...fns);
const diff = this.fns_.length - this.window_;
if (diff >= 0) {
this.fns_.splice(this.dropHead_ ? 0 : this.window_, diff);
this.#fns.push(...fns);
const diff = this.#fns.length - this.#window;
if (diff > 0) {
this.#fns.splice(this.#dropHead ? 0 : this.#window, diff);
}
if (!this.pRunning_) {
this.pRunning_ = this.run_();
}
await this.pRunning_;
await (this.#pRunning ||= this.#run());
}

/**
* Wait for the sequence to finish.
*/
public async wait(): Promise<void> {
await this.#pRunning;
}

/**
* Dispose the sequence.
*/
public async dispose(): Promise<void> {
this.fns_.length = 0;
await this.pRunning_;
if (this.disposer_) {
await tryCall(this.disposer_);
this.#fns.length = 0;
await this.#pRunning;
if (this.#disposer) {
await tryCall(this.#disposer);
this.#disposer = null;
}
}

private async run_(): Promise<void> {
async #run(): Promise<void> {
let fn: AsyncSeqFn | undefined;
while ((fn = this.fns_[0])) {
if (this.disposer_) {
await tryCall(this.disposer_);
while ((fn = this.#fns[0])) {
if (this.#disposer) {
await tryCall(this.#disposer);
}
const disposer = await tryCall(fn);
this.disposer_ = disposer;
this.#disposer = disposer;
if (isAbortable(disposer)) {
disposer.abortable(() => {
if (this.disposer_ === disposer) {
this.disposer_ = null;
if (this.#disposer === disposer) {
this.#disposer = null;
}
});
}
this.fns_.shift();
this.#fns.shift();
}

this.pRunning_ = null;
this.#pRunning = null;
}
}

Expand Down
14 changes: 14 additions & 0 deletions test/async-seq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,20 @@ it("should drop item from the tail if the sequence is full", async () => {
}
});

it("should wait for the sequence to finish", async () => {
const s = seq();
const spy = vi.fn(async () => {
await new Promise(resolve => setTimeout(resolve, 100));
});
s.add(spy);
expect(s.running).toBe(true);
await s.wait();
expect(s.running).toBe(false);
expect(spy).toBeCalledTimes(1);
await s.dispose();
expect(spy).toBeCalledTimes(1);
});

it("should drop item from the head if the sequence is full", async () => {
const window = 3;
const s = seq({ window, dropHead: true });
Expand Down

0 comments on commit c200763

Please sign in to comment.