Skip to content

Commit

Permalink
Handle underflow condition of network out slot stats metric (valkey-i…
Browse files Browse the repository at this point in the history
…o#840)

Fixes test failure
(https://github.com/valkey-io/valkey/actions/runs/10146979329/job/28056316421?pr=837)
on 32 bit system for slot stats metric underflow on the following
condition:

```
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
```
* Here listLength accesses `len` which is of `unsigned long` type and
multiplied with `len` (which could be negative). This is a risky
operation and behaves differently based on the architecture.

```
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));
```
* `sdslen` method returns `size_t`. applying `-` operation to decrement
network bytes out is also incorrect.

This change adds assertion on `len` being negative and handles the
wrapping of overall value.

---------

Signed-off-by: Harkrishn Patro <harkrisp@amazon.com>
  • Loading branch information
hpatro authored Jul 30, 2024
1 parent b4d96ca commit aaa7834
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 4 deletions.
16 changes: 15 additions & 1 deletion src/cluster_slot_stats.c
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,28 @@ void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c) {
}

/* Accumulates egress bytes upon sending replication stream. This only applies for primary nodes. */
void clusterSlotStatsAddNetworkBytesOutForReplication(int len) {
static void clusterSlotStatsUpdateNetworkBytesOutForReplication(long long len) {
client *c = server.current_client;
if (c == NULL || !canAddNetworkBytesOut(c)) return;

serverAssert(c->slot >= 0 && c->slot < CLUSTER_SLOTS);
if (len < 0) serverAssert(server.cluster->slot_stats[c->slot].network_bytes_out >= (uint64_t)llabs(len));
server.cluster->slot_stats[c->slot].network_bytes_out += (len * listLength(server.replicas));
}

/* Increment network bytes out for replication stream. This method will increment `len` value times the active replica
* count. */
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len) {
clusterSlotStatsUpdateNetworkBytesOutForReplication(len);
}

/* Decrement network bytes out for replication stream.
* This is used to remove accounting of data which doesn't belong to any particular slots e.g. SELECT command.
* This will decrement `len` value times the active replica count. */
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len) {
clusterSlotStatsUpdateNetworkBytesOutForReplication(-len);
}

/* Upon SPUBLISH, two egress events are triggered.
* 1) Internal propagation, for clients that are subscribed to the current node.
* 2) External propagation, for other nodes within the same shard (could either be a primary or replica).
Expand Down
3 changes: 2 additions & 1 deletion src/cluster_slot_stats.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ void clusterSlotStatsResetClusterMsgLength(void);

/* network-bytes-out metric. */
void clusterSlotStatsAddNetworkBytesOutForUserClient(client *c);
void clusterSlotStatsAddNetworkBytesOutForReplication(int len);
void clusterSlotStatsIncrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsDecrNetworkBytesOutForReplication(long long len);
void clusterSlotStatsAddNetworkBytesOutForShardedPubSubInternalPropagation(client *c, int slot);
4 changes: 2 additions & 2 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ void feedReplicationBuffer(char *s, size_t len) {

if (server.repl_backlog == NULL) return;

clusterSlotStatsAddNetworkBytesOutForReplication(len);
clusterSlotStatsIncrNetworkBytesOutForReplication(len);

while (len > 0) {
size_t start_pos = 0; /* The position of referenced block to start sending. */
Expand Down Expand Up @@ -574,7 +574,7 @@ void replicationFeedReplicas(int dictid, robj **argv, int argc) {
/* Although the SELECT command is not associated with any slot,
* its per-slot network-bytes-out accumulation is made by the above function call.
* To cancel-out this accumulation, below adjustment is made. */
clusterSlotStatsAddNetworkBytesOutForReplication(-sdslen(selectcmd->ptr));
clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr));

if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) decrRefCount(selectcmd);

Expand Down

0 comments on commit aaa7834

Please sign in to comment.