From 295647cf9d7ddc93b9b516a45098765084a0c2d8 Mon Sep 17 00:00:00 2001 From: Brent Layne Date: Mon, 29 Jan 2024 03:25:26 -0500 Subject: [PATCH] fix(clustered pubsub): check that `client.isOpen` before calling `client.disconnect()` when unsubscribing (#2687) * Confirm the client isOpen before disconnecting * Write tests * fix tests * fix tests --------- Co-authored-by: Leibale Eidelman --- packages/client/lib/cluster/cluster-slots.ts | 4 +-- packages/client/lib/cluster/index.spec.ts | 27 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/packages/client/lib/cluster/cluster-slots.ts b/packages/client/lib/cluster/cluster-slots.ts index b540c2fa85f..b1cc49b4c82 100644 --- a/packages/client/lib/cluster/cluster-slots.ts +++ b/packages/client/lib/cluster/cluster-slots.ts @@ -562,7 +562,7 @@ export default class RedisClusterSlots< const client = await this.getPubSubClient(); await unsubscribe(client); - if (!client.isPubSubActive) { + if (!client.isPubSubActive && client.isOpen) { await client.disconnect(); this.pubSubNode = undefined; } @@ -613,7 +613,7 @@ export default class RedisClusterSlots< const client = await master.pubSubClient; await unsubscribe(client); - if (!client.isPubSubActive) { + if (!client.isPubSubActive && client.isOpen) { await client.disconnect(); master.pubSubClient = undefined; } diff --git a/packages/client/lib/cluster/index.spec.ts b/packages/client/lib/cluster/index.spec.ts index 8200375056a..569d716272a 100644 --- a/packages/client/lib/cluster/index.spec.ts +++ b/packages/client/lib/cluster/index.spec.ts @@ -235,6 +235,18 @@ describe('Cluster', () => { assert.equal(cluster.pubSubNode, undefined); }, GLOBAL.CLUSTERS.OPEN); + + testUtils.testWithCluster('concurrent UNSUBSCRIBE does not throw an error (#2685)', async cluster => { + const listener = spy(); + await Promise.all([ + cluster.subscribe('1', listener), + cluster.subscribe('2', listener) + ]); + await Promise.all([ + cluster.unsubscribe('1', listener), + cluster.unsubscribe('2', listener) + ]); + }, GLOBAL.CLUSTERS.OPEN); testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => { const listener = spy(); @@ -323,6 +335,21 @@ describe('Cluster', () => { minimumDockerVersion: [7] }); + testUtils.testWithCluster('concurrent SUNSUBCRIBE does not throw an error (#2685)', async cluster => { + const listener = spy(); + await Promise.all([ + await cluster.sSubscribe('1', listener), + await cluster.sSubscribe('2', listener) + ]); + await Promise.all([ + cluster.sUnsubscribe('1', listener), + cluster.sUnsubscribe('2', listener) + ]); + }, { + ...GLOBAL.CLUSTERS.OPEN, + minimumDockerVersion: [7] + }); + testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => { const SLOT = 10328, migrating = cluster.slots[SLOT].master,