Skip to content

Commit

Permalink
fix: dispose stale tasks (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
crimx authored Apr 30, 2024
1 parent ba71f95 commit eea3872
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 9 deletions.
24 changes: 15 additions & 9 deletions src/async-seq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,18 +98,24 @@ export class AsyncSeq {
let fn: AsyncSeqFn | undefined;
while ((fn = this.#fns[0])) {
if (this.#disposer) {
await tryCall(this.#disposer);
tryCall(this.#disposer);
this.#disposer = null;
}
const disposer = await tryCall(fn);
this.#disposer = disposer;
if (isAbortable(disposer)) {
disposer.abortable(() => {
if (this.#disposer === disposer) {
this.#disposer = null;
}
});
if (fn === this.#fns[0]) {
this.#disposer = disposer;
if (isAbortable(disposer)) {
disposer.abortable(() => {
if (this.#disposer === disposer) {
this.#disposer = null;
}
});
}
this.#fns.shift();
} else {
// stale task
disposer?.();
}
this.#fns.shift();
}
this.#pRunning = null;
}
Expand Down
110 changes: 110 additions & 0 deletions test/async-seq.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,57 @@ it("should drop item from the tail if the sequence is full", async () => {
}
});

it("should drop item from the tail when adding to a sequence", async () => {
const window = 3;
const s = seq({ window });

const disposers2 = Array.from({ length: 10 }).map(() => vi.fn());
const spies2 = Array.from({ length: 10 }).map((_, i) =>
vi.fn(async () => {
await new Promise(resolve => setTimeout(resolve, 100));
return disposers2[i];
})
);

const disposers1 = Array.from({ length: 3 }).map(() => vi.fn());
const spies1 = Array.from({ length: 3 }).map((_, i) =>
vi.fn(async () => {
if (i === 1) {
s.add(...spies2);
}
await new Promise(resolve => setTimeout(resolve, 100));
return disposers1[i];
})
);

const p = s.add(...spies1);
expect(s.size).toBe(window);
expect(s.full).toBe(true);
expect(s.running).toBe(true);

await p;
expect(s.size).toBe(0);
expect(s.full).toBe(false);
expect(s.running).toBe(false);

for (const [i, spy] of spies1.entries()) {
expect(spy, `spy${i}`).toBeCalledTimes(1);
}

for (const [i, disposer] of disposers1.entries()) {
expect(disposer, `disposer${i}`).toBeCalledTimes(1);
}

for (const [i, spy] of spies2.entries()) {
expect(spy, `spy${i}`).toBeCalledTimes(i === 0 ? 1 : 0);
}

await s.dispose();
for (const [i, disposer] of disposers2.entries()) {
expect(disposer, `disposer${i}`).toBeCalledTimes(i === 0 ? 1 : 0);
}
});

it("should wait for the sequence to finish", async () => {
const s = seq();
const spy = vi.fn(async () => {
Expand Down Expand Up @@ -97,6 +148,65 @@ it("should drop item from the head if the sequence is full", async () => {
}
});

it("should drop item from the head when adding to a sequence", async () => {
const window = 3;
const s = seq({ window, dropHead: true });
const disposers2 = Array.from({ length: 10 }).map(() => vi.fn());
const spies2 = Array.from({ length: 10 }).map((_, i) =>
vi.fn(async () => {
await new Promise(resolve => setTimeout(resolve, 100));
return disposers2[i];
})
);
const disposers1 = Array.from({ length: 3 }).map(() => vi.fn());
const spies1 = Array.from({ length: 3 }).map((_, i) =>
vi.fn(async () => {
if (i === 1) {
s.add(...spies2);
}
await new Promise(resolve => setTimeout(resolve, 100));
return disposers1[i];
})
);

const p = s.add(...spies1);
expect(s.size).toBe(window);
expect(s.full).toBe(true);
expect(s.running).toBe(true);

await p;
expect(s.size).toBe(0);
expect(s.full).toBe(false);
expect(s.running).toBe(false);

for (const [i, spy] of spies1.entries()) {
expect(spy, `spy${i} of 0-${spies1.length - 1}`).toBeCalledTimes(
i <= 1 ? 1 : 0
);
}

for (const [i, disposer] of disposers1.entries()) {
expect(
disposer,
`disposer${i} of 0-${disposers1.length - 1}`
).toBeCalledTimes(i <= 1 ? 1 : 0);
}

for (const [i, spy] of spies2.entries()) {
expect(spy, `spy${i} of 0-${spies2.length - 1}`).toBeCalledTimes(
i >= spies2.length - 3 ? 1 : 0
);
}

await s.dispose();
for (const [i, disposer] of disposers2.entries()) {
expect(
disposer,
`disposer${i} of 0-${disposers2.length - 1}`
).toBeCalledTimes(i >= spies2.length - 3 ? 1 : 0);
}
});

it("should catch error in tasks", async () => {
const spy = vi
.spyOn(globalThis.console, "error")
Expand Down

0 comments on commit eea3872

Please sign in to comment.