Skip to content

Commit

Permalink
Add new SFLUSH Command to Cluster for Slot-Based Flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Sep 23, 2024
1 parent ac03e37 commit d179b2b
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 22 deletions.
12 changes: 8 additions & 4 deletions src/bio.c
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ static pthread_mutex_t bio_mutex_comp;
static int job_comp_pipe[2]; /* Pipe used to awake the event loop */

typedef struct bio_comp_item {
comp_fn *func; /* callback after completion job will be processed */
uint64_t arg; /* user data to be passed to the function */
comp_fn *func; /* callback after completion job will be processed */
uint64_t arg; /* user data to be passed to the function */
void *ptr; /* user pointer to be passed to the function */
} bio_comp_item;

/* This structure represents a background Job. It is only used locally to this
Expand Down Expand Up @@ -110,6 +111,7 @@ typedef union bio_job {
int type; /* header */
comp_fn *fn; /* callback. Handover to main thread to cb as notify for job completion */
uint64_t arg; /* callback arguments */
void *ptr; /* callback pointer */
} comp_rq;
} bio_job;

Expand Down Expand Up @@ -200,7 +202,7 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
bioSubmitJob(BIO_LAZY_FREE, job);
}

void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data) {
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data, void *user_ptr) {
int type;
switch (assigned_worker) {
case BIO_WORKER_CLOSE_FILE:
Expand All @@ -219,6 +221,7 @@ void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_
bio_job *job = zmalloc(sizeof(*job));
job->comp_rq.fn = func;
job->comp_rq.arg = user_data;
job->comp_rq.ptr = user_ptr;
bioSubmitJob(type, job);
}

Expand Down Expand Up @@ -339,6 +342,7 @@ void *bioProcessBackgroundJobs(void *arg) {
bio_comp_item *comp_rsp = zmalloc(sizeof(bio_comp_item));
comp_rsp->func = job->comp_rq.fn;
comp_rsp->arg = job->comp_rq.arg;
comp_rsp->ptr = job->comp_rq.ptr;

/* just write it to completion job responses */
pthread_mutex_lock(&bio_mutex_comp);
Expand Down Expand Up @@ -432,7 +436,7 @@ void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask) {
listNode *ln = listFirst(tmp_list);
bio_comp_item *rsp = ln->value;
listDelNode(tmp_list, ln);
rsp->func(rsp->arg);
rsp->func(rsp->arg, rsp->ptr);
zfree(rsp);
}
listRelease(tmp_list);
Expand Down
4 changes: 2 additions & 2 deletions src/bio.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#define __BIO_H

typedef void lazy_free_fn(void *args[]);
typedef void comp_fn(uint64_t user_data);
typedef void comp_fn(uint64_t user_data, void *user_ptr);

typedef enum bio_worker_t {
BIO_WORKER_CLOSE_FILE = 0,
Expand Down Expand Up @@ -40,7 +40,7 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data);
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data, void *user_ptr);


#endif
118 changes: 118 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1558,6 +1558,124 @@ void readonlyCommand(client *c) {
addReply(c,shared.ok);
}

/* Reply with an array of <first,last> slot-range pairs recorded in 'sflush'.
 * Takes ownership of 'sflush' and releases it once the reply is emitted. */
void replySlotsFlush(client *c, SlotsFlush *sflush) {
    int numRanges = sflush->numRanges;
    addReplyArrayLen(c, numRanges);
    for (int idx = 0; idx < numRanges; ++idx) {
        SlotRange *range = &sflush->ranges[idx];
        addReplyArrayLen(c, 2);
        addReplyLongLong(c, range->first);
        addReplyLongLong(c, range->last);
    }
    zfree(sflush);
}

/* Partially flush destination DB in a cluster node, based on the slot range.
 *
 * Usage: SFLUSH <start-slot> <end-slot> [<start-slot> <end-slot>]* [SYNC|ASYNC]
 *
 * This is an initial implementation of SFLUSH (slots flush) which is limited to
 * flushing a single shard as a whole, but in the future the same command may be
 * used to partially flush a shard based on hash slots. Currently only if provided
 * slots cover entirely the slots of a node, the node will be flushed and the
 * return value will be pairs of slot ranges and the node. Otherwise, a single empty
 * set will be returned. If possible, SFLUSH SYNC will be run as blocking ASYNC.
 */
void sflushCommand(client *c) {
    int flags = EMPTYDB_NO_FLAGS, argc = c->argc;

    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    /* check if last argument is SYNC or ASYNC */
    if (!strcasecmp(c->argv[c->argc-1]->ptr,"sync")) {
        argc--;
    } else if (!strcasecmp(c->argv[c->argc-1]->ptr,"async")) {
        flags |= EMPTYDB_ASYNC;
        argc--;
    }

    /* After stripping the optional trailing modifier, the remaining args must
     * be the command name plus an even number of <first,last> slot values. */
    if (argc % 2 == 0) {
        addReplyErrorArity(c);
        return;
    }

    /* Verify <first, last> slot pairs are valid and not overlapping */
    long long j, first, last;
    unsigned char slotsToFlushRq[CLUSTER_SLOTS] = {0};
    for (j = 1; j < argc; j += 2) {
        /* check if the first slot is valid */
        if (getLongLongFromObject(c->argv[j], &first) != C_OK || first < 0 || first >= CLUSTER_SLOTS) {
            addReplyError(c,"Invalid or out of range slot");
            return;
        }

        /* check if the last slot is valid */
        if (getLongLongFromObject(c->argv[j+1], &last) != C_OK || last < 0 || last >= CLUSTER_SLOTS) {
            addReplyError(c,"Invalid or out of range slot");
            return;
        }

        if (first > last) {
            addReplyErrorFormat(c,"start slot number %lld is greater than end slot number %lld", first, last);
            return;
        }

        /* Mark the slots in slotsToFlushRq[], rejecting overlapping ranges. */
        for (int i = first; i <= last; i++) {
            if (slotsToFlushRq[i]) {
                addReplyErrorFormat(c, "Slot %d specified multiple times", i);
                return;
            }
            slotsToFlushRq[i] = 1;
        }
    }

    /* Verify slotsToFlushRq[] covers ALL slots of myNode. */
    clusterNode *myNode = getMyClusterNode();
    /* During iteration trace also the slot range pairs and save in SlotsFlush
     * Allocated on heap since there is a chance that FLUSH SYNC will be run as
     * blocking ASYNC and only later reply with slot ranges */
    int capacity = 32; /* Initial capacity */
    SlotsFlush *sflush = zmalloc(sizeof(SlotsFlush) + sizeof(SlotRange) * capacity);
    sflush->numRanges = 0;
    int inSlotRange = 0;
    for (int i = 0; i < CLUSTER_SLOTS; i++) {
        if (myNode == getNodeBySlot(i)) {
            if (!slotsToFlushRq[i]) {
                addReplySetLen(c, 0); /* Not all slots of my node got covered */
                zfree(sflush);
                return;
            }

            if (!inSlotRange) { /* If start another slot range */
                sflush->ranges[sflush->numRanges].first = i;
                inSlotRange = 1;
            }
        } else {
            if (inSlotRange) { /* If end another slot range */
                sflush->ranges[sflush->numRanges++].last = i - 1;
                inSlotRange = 0;
                /* If reached 'sflush' capacity, double the capacity */
                if (sflush->numRanges >= capacity) {
                    capacity *= 2;
                    sflush = zrealloc(sflush, sizeof(SlotsFlush) + sizeof(SlotRange) * capacity);
                }
            }
        }
    }

    /* Update last pair if last cluster slot is also end of last range */
    if (inSlotRange) sflush->ranges[sflush->numRanges++].last = CLUSTER_SLOTS - 1;
    /* Flush all selected slots. Pass the parsed 'flags' so an explicit ASYNC
     * modifier is honored (previously EMPTYDB_NO_FLAGS was passed, silently
     * discarding the ASYNC request). */
    if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, sflush) == 0) {
        /* If not running as blocking ASYNC, reply with ranges.
         * replySlotsFlush() takes ownership of 'sflush' and frees it; do not
         * free it again here (doing so was a double free). */
        replySlotsFlush(c, sflush);
    }
}

/* The READWRITE command just clears the READONLY command state. */
void readwriteCommand(client *c) {
if (server.cluster_enabled == 0) {
Expand Down
65 changes: 65 additions & 0 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,34 @@ struct COMMAND_ARG CLUSTER_SETSLOT_Args[] = {
{MAKE_ARG("subcommand",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,4,NULL),.subargs=CLUSTER_SETSLOT_subcommand_Subargs},
};

/********** CLUSTER SFLUSH ********************/
/* NOTE(review): commands.def is typically auto-generated from the JSON specs
 * in src/commands/ — confirm and regenerate rather than hand-editing. */

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER SFLUSH history */
#define CLUSTER_SFLUSH_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* CLUSTER SFLUSH tips */
#define CLUSTER_SFLUSH_Tips NULL
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* CLUSTER SFLUSH key specs */
#define CLUSTER_SFLUSH_Keyspecs NULL
#endif

/* CLUSTER SFLUSH range argument table: repeated <start-slot end-slot> pairs. */
struct COMMAND_ARG CLUSTER_SFLUSH_range_Subargs[] = {
{MAKE_ARG("start-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("end-slot",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER SFLUSH argument table */
struct COMMAND_ARG CLUSTER_SFLUSH_Args[] = {
{MAKE_ARG("range",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,2,NULL),.subargs=CLUSTER_SFLUSH_range_Subargs},
};

/********** CLUSTER SHARDS ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -970,6 +998,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args},
{MAKE_CMD("setslot","Binds a hash slot to a node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SETSLOT_History,0,CLUSTER_SETSLOT_Tips,0,clusterCommand,-4,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SETSLOT_Keyspecs,0,NULL,2),.args=CLUSTER_SETSLOT_Args},
{MAKE_CMD("sflush","Flush selected range of slots","O(N)+O(K) where N is the total number of keys in K selected slots.","8.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SFLUSH_History,0,CLUSTER_SFLUSH_Tips,0,clusterCommand,-4,CMD_ADMIN|CMD_STALE,0,CLUSTER_SFLUSH_Keyspecs,0,NULL,1),.args=CLUSTER_SFLUSH_Args},
{MAKE_CMD("shards","Returns the mapping of cluster slots to shards.","O(N) where N is the total number of cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SHARDS_History,0,CLUSTER_SHARDS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SHARDS_Keyspecs,0,NULL,0)},
{MAKE_CMD("slaves","Lists the replica nodes of a master node.","O(N) where N is the number of replicas.","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER REPLICAS`","5.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLAVES_History,0,CLUSTER_SLAVES_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_SLAVES_Keyspecs,0,NULL,1),.args=CLUSTER_SLAVES_Args},
{MAKE_CMD("slots","Returns the mapping of cluster slots to nodes.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_DEPRECATED,"`CLUSTER SHARDS`","7.0.0","cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SLOTS_History,2,CLUSTER_SLOTS_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_SLOTS_Keyspecs,0,NULL,0)},
Expand Down Expand Up @@ -7735,6 +7764,41 @@ struct COMMAND_ARG RESTORE_ASKING_Args[] = {
#define SAVE_Keyspecs NULL
#endif

/********** SFLUSH ********************/
/* NOTE(review): commands.def is typically auto-generated from the JSON specs
 * in src/commands/ — confirm and regenerate rather than hand-editing. */

#ifndef SKIP_CMD_HISTORY_TABLE
/* SFLUSH history */
#define SFLUSH_History NULL
#endif

#ifndef SKIP_CMD_TIPS_TABLE
/* SFLUSH tips */
#define SFLUSH_Tips NULL
#endif

#ifndef SKIP_CMD_KEY_SPECS_TABLE
/* SFLUSH key specs */
#define SFLUSH_Keyspecs NULL
#endif

/* SFLUSH data argument table: repeated <slot-start slot-last> pairs. */
struct COMMAND_ARG SFLUSH_data_Subargs[] = {
{MAKE_ARG("slot-start",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("slot-last",ARG_TYPE_INTEGER,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* SFLUSH flush_type argument table: optional trailing ASYNC|SYNC token. */
struct COMMAND_ARG SFLUSH_flush_type_Subargs[] = {
{MAKE_ARG("async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* SFLUSH argument table */
struct COMMAND_ARG SFLUSH_Args[] = {
{MAKE_ARG("data",ARG_TYPE_BLOCK,-1,NULL,NULL,NULL,CMD_ARG_MULTIPLE,2,NULL),.subargs=SFLUSH_data_Subargs},
{MAKE_ARG("flush-type",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,2,NULL),.subargs=SFLUSH_flush_type_Subargs},
};

/********** SHUTDOWN ********************/

#ifndef SKIP_CMD_HISTORY_TABLE
Expand Down Expand Up @@ -11130,6 +11194,7 @@ struct COMMAND_STRUCT redisCommandTable[] = {
{MAKE_CMD("restore-asking","An internal command for migrating keys in a cluster.","O(1) to create the new key and additional O(N*M) to reconstruct the serialized value, where N is the number of Redis objects composing the value and M their average size. For small string values the time complexity is thus O(1)+O(1*M) where M is small, so simply O(1). However for sorted set values the complexity is O(N*M*log(N)) because inserting values into sorted sets is O(log(N)).","3.0.0",CMD_DOC_SYSCMD,NULL,NULL,"server",COMMAND_GROUP_SERVER,RESTORE_ASKING_History,3,RESTORE_ASKING_Tips,0,restoreCommand,-4,CMD_WRITE|CMD_DENYOOM|CMD_ASKING,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,RESTORE_ASKING_Keyspecs,1,NULL,7),.args=RESTORE_ASKING_Args},
{MAKE_CMD("role","Returns the replication role.","O(1)","2.8.12",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,ROLE_History,0,ROLE_Tips,0,roleCommand,1,CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_SENTINEL,ACL_CATEGORY_ADMIN|ACL_CATEGORY_DANGEROUS,ROLE_Keyspecs,0,NULL,0)},
{MAKE_CMD("save","Synchronously saves the database(s) to disk.","O(N) where N is the total number of keys in all databases","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SAVE_History,0,SAVE_Tips,0,saveCommand,1,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_NO_MULTI,0,SAVE_Keyspecs,0,NULL,0)},
{MAKE_CMD("sflush","Remove all keys from selected range of slots.","O(N)+O(k) where N is the number of keys and k is the number of slots.","8.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SFLUSH_History,0,SFLUSH_Tips,0,sflushCommand,-3,CMD_WRITE,ACL_CATEGORY_KEYSPACE|ACL_CATEGORY_DANGEROUS,SFLUSH_Keyspecs,0,NULL,2),.args=SFLUSH_Args},
{MAKE_CMD("shutdown","Synchronously saves the database(s) to disk and shuts down the Redis server.","O(N) when saving, where N is the total number of keys in all databases when saving data, otherwise O(1)","1.0.0",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SHUTDOWN_History,1,SHUTDOWN_Tips,0,shutdownCommand,-1,CMD_ADMIN|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_NO_MULTI|CMD_SENTINEL|CMD_ALLOW_BUSY,0,SHUTDOWN_Keyspecs,0,NULL,4),.args=SHUTDOWN_Args},
{MAKE_CMD("slaveof","Sets a Redis server as a replica of another, or promotes it to being a master.","O(1)","1.0.0",CMD_DOC_DEPRECATED,"`REPLICAOF`","5.0.0","server",COMMAND_GROUP_SERVER,SLAVEOF_History,0,SLAVEOF_Tips,0,replicaofCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_NOSCRIPT|CMD_STALE,0,SLAVEOF_Keyspecs,0,NULL,1),.args=SLAVEOF_Args},
{MAKE_CMD("slowlog","A container for slow log commands.","Depends on subcommand.","2.2.12",CMD_DOC_NONE,NULL,NULL,"server",COMMAND_GROUP_SERVER,SLOWLOG_History,0,SLOWLOG_Tips,0,NULL,-2,0,0,SLOWLOG_Keyspecs,0,NULL,0),.subcommands=SLOWLOG_Subcommands},
Expand Down
Loading

0 comments on commit d179b2b

Please sign in to comment.