From b51d9df21dc5619eaed5610276178e3352fe832c Mon Sep 17 00:00:00 2001 From: CRIMX Date: Tue, 30 Apr 2024 17:20:01 +0800 Subject: [PATCH] fix: dispose stale tasks --- src/async-seq.ts | 24 +++++---- test/async-seq.test.ts | 110 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 125 insertions(+), 9 deletions(-) diff --git a/src/async-seq.ts b/src/async-seq.ts index 55aedbb..b674732 100644 --- a/src/async-seq.ts +++ b/src/async-seq.ts @@ -98,18 +98,24 @@ export class AsyncSeq { let fn: AsyncSeqFn | undefined; while ((fn = this.#fns[0])) { if (this.#disposer) { - await tryCall(this.#disposer); + tryCall(this.#disposer); + this.#disposer = null; } const disposer = await tryCall(fn); - this.#disposer = disposer; - if (isAbortable(disposer)) { - disposer.abortable(() => { - if (this.#disposer === disposer) { - this.#disposer = null; - } - }); + if (fn === this.#fns[0]) { + this.#disposer = disposer; + if (isAbortable(disposer)) { + disposer.abortable(() => { + if (this.#disposer === disposer) { + this.#disposer = null; + } + }); + } + this.#fns.shift(); + } else { + // stale task + disposer?.(); } - this.#fns.shift(); } this.#pRunning = null; } diff --git a/test/async-seq.test.ts b/test/async-seq.test.ts index 98c1dfb..4cbf96f 100644 --- a/test/async-seq.test.ts +++ b/test/async-seq.test.ts @@ -61,6 +61,57 @@ it("should drop item from the tail if the sequence is full", async () => { } }); +it("should drop item from the tail when adding to a sequence", async () => { + const window = 3; + const s = seq({ window }); + + const disposers2 = Array.from({ length: 10 }).map(() => vi.fn()); + const spies2 = Array.from({ length: 10 }).map((_, i) => + vi.fn(async () => { + await new Promise(resolve => setTimeout(resolve, 100)); + return disposers2[i]; + }) + ); + + const disposers1 = Array.from({ length: 3 }).map(() => vi.fn()); + const spies1 = Array.from({ length: 3 }).map((_, i) => + vi.fn(async () => { + if (i === 1) { + s.add(...spies2); + } + await new Promise(resolve => setTimeout(resolve, 100)); + return disposers1[i]; + }) + ); + + const p = s.add(...spies1); + expect(s.size).toBe(window); + expect(s.full).toBe(true); + expect(s.running).toBe(true); + + await p; + expect(s.size).toBe(0); + expect(s.full).toBe(false); + expect(s.running).toBe(false); + + for (const [i, spy] of spies1.entries()) { + expect(spy, `spy${i}`).toBeCalledTimes(1); + } + + for (const [i, disposer] of disposers1.entries()) { + expect(disposer, `disposer${i}`).toBeCalledTimes(1); + } + + for (const [i, spy] of spies2.entries()) { + expect(spy, `spy${i}`).toBeCalledTimes(i === 0 ? 1 : 0); + } + + await s.dispose(); + for (const [i, disposer] of disposers2.entries()) { + expect(disposer, `disposer${i}`).toBeCalledTimes(i === 0 ? 1 : 0); + } +}); + it("should wait for the sequence to finish", async () => { const s = seq(); const spy = vi.fn(async () => { @@ -97,6 +148,65 @@ it("should drop item from the head if the sequence is full", async () => { } }); +it("should drop item from the head when adding to a sequence", async () => { + const window = 3; + const s = seq({ window, dropHead: true }); + const disposers2 = Array.from({ length: 10 }).map(() => vi.fn()); + const spies2 = Array.from({ length: 10 }).map((_, i) => + vi.fn(async () => { + await new Promise(resolve => setTimeout(resolve, 100)); + return disposers2[i]; + }) + ); + const disposers1 = Array.from({ length: 3 }).map(() => vi.fn()); + const spies1 = Array.from({ length: 3 }).map((_, i) => + vi.fn(async () => { + if (i === 1) { + s.add(...spies2); + } + await new Promise(resolve => setTimeout(resolve, 100)); + return disposers1[i]; + }) + ); + + const p = s.add(...spies1); + expect(s.size).toBe(window); + expect(s.full).toBe(true); + expect(s.running).toBe(true); + + await p; + expect(s.size).toBe(0); + expect(s.full).toBe(false); + expect(s.running).toBe(false); + + for (const [i, spy] of spies1.entries()) { + expect(spy, `spy${i} of 0-${spies1.length - 1}`).toBeCalledTimes( + i <= 1 ? 1 : 0 + ); + } + + for (const [i, disposer] of disposers1.entries()) { + expect( + disposer, + `disposer${i} of 0-${disposers1.length - 1}` + ).toBeCalledTimes(i <= 1 ? 1 : 0); + } + + for (const [i, spy] of spies2.entries()) { + expect(spy, `spy${i} of 0-${spies2.length - 1}`).toBeCalledTimes( + i >= spies2.length - 3 ? 1 : 0 + ); + } + + await s.dispose(); + for (const [i, disposer] of disposers2.entries()) { + expect( + disposer, + `disposer${i} of 0-${disposers2.length - 1}` + ).toBeCalledTimes(i >= spies2.length - 3 ? 1 : 0); + } +}); + it("should catch error in tasks", async () => { const spy = vi .spyOn(globalThis.console, "error")