Skip to content

Commit

Permalink
fix(clustered pubsub): check that client.isOpen before calling `cli…
Browse files Browse the repository at this point in the history
…ent.disconnect()` when unsubscribing (#2687)

* Confirm the client isOpen before disconnecting

* Write tests

* fix tests

* fix tests

---------

Co-authored-by: Leibale Eidelman <me@leibale.com>
  • Loading branch information
BrentLayne and leibale authored Jan 29, 2024
1 parent 5a96058 commit 295647c
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
4 changes: 2 additions & 2 deletions packages/client/lib/cluster/cluster-slots.ts
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ export default class RedisClusterSlots<
const client = await this.getPubSubClient();
await unsubscribe(client);

if (!client.isPubSubActive) {
if (!client.isPubSubActive && client.isOpen) {
await client.disconnect();
this.pubSubNode = undefined;
}
Expand Down Expand Up @@ -613,7 +613,7 @@ export default class RedisClusterSlots<
const client = await master.pubSubClient;
await unsubscribe(client);

if (!client.isPubSubActive) {
if (!client.isPubSubActive && client.isOpen) {
await client.disconnect();
master.pubSubClient = undefined;
}
Expand Down
27 changes: 27 additions & 0 deletions packages/client/lib/cluster/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,18 @@ describe('Cluster', () => {

assert.equal(cluster.pubSubNode, undefined);
}, GLOBAL.CLUSTERS.OPEN);

testUtils.testWithCluster('concurrent UNSUBSCRIBE does not throw an error (#2685)', async cluster => {
const listener = spy();
await Promise.all([
cluster.subscribe('1', listener),
cluster.subscribe('2', listener)
]);
await Promise.all([
cluster.unsubscribe('1', listener),
cluster.unsubscribe('2', listener)
]);
}, GLOBAL.CLUSTERS.OPEN);

testUtils.testWithCluster('psubscribe & punsubscribe', async cluster => {
const listener = spy();
Expand Down Expand Up @@ -323,6 +335,21 @@ describe('Cluster', () => {
minimumDockerVersion: [7]
});

testUtils.testWithCluster('concurrent SUNSUBCRIBE does not throw an error (#2685)', async cluster => {
const listener = spy();
await Promise.all([
await cluster.sSubscribe('1', listener),
await cluster.sSubscribe('2', listener)
]);
await Promise.all([
cluster.sUnsubscribe('1', listener),
cluster.sUnsubscribe('2', listener)
]);
}, {
...GLOBAL.CLUSTERS.OPEN,
minimumDockerVersion: [7]
});

testUtils.testWithCluster('should handle sharded-channel-moved events', async cluster => {
const SLOT = 10328,
migrating = cluster.slots[SLOT].master,
Expand Down

0 comments on commit 295647c

Please sign in to comment.