Skip to content

Commit

Permalink
Merge branch 'unstable' into hash-field-expiry
Browse files Browse the repository at this point in the history
  • Loading branch information
moticless committed Apr 7, 2024
2 parents f0c0481 + 4581d43 commit b2a4c38
Show file tree
Hide file tree
Showing 15 changed files with 367 additions and 56 deletions.
135 changes: 125 additions & 10 deletions src/bio.c
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/* Background I/O service for Redis.
*
* This file implements operations that we need to perform in the background.
* Currently there is only a single operation, that is a background close(2)
* system call. This is needed as when the process is the last owner of a
* reference to a file closing it means unlinking it, and the deletion of the
* file is slow, blocking the server.
* Currently there are 3 operations:
* 1) a background close(2) system call. This is needed when the process is
* the last owner of a reference to a file closing it means unlinking it, and
* the deletion of the file is slow, blocking the server.
* 2) AOF fsync
* 3) lazyfree of memory
*
* In the future we'll either continue implementing new things we need or
* we'll switch to libeio. However there are probably long term uses for this
* file as we may want to put here Redis specific background tasks (for instance
* it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
* implementation).
* file as we may want to put here Redis specific background tasks.
*
* DESIGN
* ------
Expand All @@ -26,8 +26,13 @@
* least-recently-inserted to the most-recently-inserted (older jobs processed
* first).
*
* Currently there is no way for the creator of the job to be notified about
* the completion of the operation, this will only be added when/if needed.
* To let the creator of a job be notified of its completion, it needs to
* submit an additional dummy job, coined a "completion job request", which
* will eventually be written back by the background thread into a
* completion job response queue. This notification layout simplifies flows
* that submit more than one job, such as FLUSHALL, which submits multiple
* jobs for a single command. It is also correct because jobs are processed
* in FIFO fashion.
*
* ----------------------------------------------------------------------------
*
Expand All @@ -38,9 +43,9 @@
* (RSALv2) or the Server Side Public License v1 (SSPLv1).
*/


#include "server.h"
#include "bio.h"
#include <fcntl.h>

static char* bio_worker_title[] = {
"bio_close_file",
Expand All @@ -55,6 +60,9 @@ static unsigned int bio_job_to_worker[] = {
[BIO_AOF_FSYNC] = 1,
[BIO_CLOSE_AOF] = 1,
[BIO_LAZY_FREE] = 2,
[BIO_COMP_RQ_CLOSE_FILE] = 0,
[BIO_COMP_RQ_AOF_FSYNC] = 1,
[BIO_COMP_RQ_LAZY_FREE] = 2
};

static pthread_t bio_threads[BIO_WORKER_NUM];
Expand All @@ -63,6 +71,18 @@ static pthread_cond_t bio_newjob_cond[BIO_WORKER_NUM];
static list *bio_jobs[BIO_WORKER_NUM];
static unsigned long bio_jobs_counter[BIO_NUM_OPS] = {0};

/* The bio_comp_list holds completion job responses produced by the
 * background threads; they are handed over to the main thread, which invokes
 * the registered callback as a job-completion notification. The main thread
 * is woken up to read the list by a byte written to job_comp_pipe. */
static list *bio_comp_list;
static pthread_mutex_t bio_mutex_comp;   /* Guards bio_comp_list (shared between bio threads and main thread) */
static int job_comp_pipe[2];  /* Pipe used to awake the event loop */

/* A single completion-job response, queued on bio_comp_list. */
typedef struct bio_comp_item {
comp_fn *func; /* callback after completion job will be processed */
uint64_t arg; /* user data to be passed to the function */
} bio_comp_item;

/* This structure represents a background Job. It is only used locally to this
* file as the API does not expose the internals at all. */
typedef union bio_job {
Expand All @@ -86,9 +106,15 @@ typedef union bio_job {
lazy_free_fn *free_fn; /* Function that will free the provided arguments */
void *free_args[]; /* List of arguments to be passed to the free function */
} free_args;
struct {
int type; /* header */
comp_fn *fn; /* callback. Handover to main thread to cb as notify for job completion */
uint64_t arg; /* callback arguments */
} comp_rq;
} bio_job;

void *bioProcessBackgroundJobs(void *arg);
void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask);

/* Make sure we have enough stack to perform all the things we do in the
* main thread. */
Expand All @@ -108,6 +134,27 @@ void bioInit(void) {
bio_jobs[j] = listCreate();
}

/* init jobs comp responses */
bio_comp_list = listCreate();
pthread_mutex_init(&bio_mutex_comp, NULL);

/* Create a pipe for background thread to be able to wake up the redis main thread.
* Make the pipe non-blocking. This is just a best-effort mechanism
* and we do not want to block in either the read or the write half.
* Enable close-on-exec flag on pipes in case of the fork-exec system calls in
* sentinels or redis servers. */
if (anetPipe(job_comp_pipe, O_CLOEXEC|O_NONBLOCK, O_CLOEXEC|O_NONBLOCK) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for bio thread: %s", strerror(errno));
exit(1);
}

/* Register a readable event for the pipe used to awake the event loop on job completion */
if (aeCreateFileEvent(server.el, job_comp_pipe[0], AE_READABLE,
bioPipeReadJobCompList, NULL) == AE_ERR) {
serverPanic("Error registering the readable event for the bio pipe.");
}

/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
Expand Down Expand Up @@ -153,6 +200,28 @@ void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
bioSubmitJob(BIO_LAZY_FREE, job);
}

/* Submit a completion-request job onto the queue of the given worker.
 * Once the background thread dequeues it (after every job submitted to that
 * worker before it, since queues are FIFO), `func(user_data)` is scheduled
 * to run on the main thread via the completion pipe. */
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data) {
    int job_type;

    /* Map the target worker to the completion-request opcode queued on it. */
    if (assigned_worker == BIO_WORKER_CLOSE_FILE) {
        job_type = BIO_COMP_RQ_CLOSE_FILE;
    } else if (assigned_worker == BIO_WORKER_AOF_FSYNC) {
        job_type = BIO_COMP_RQ_AOF_FSYNC;
    } else if (assigned_worker == BIO_WORKER_LAZY_FREE) {
        job_type = BIO_COMP_RQ_LAZY_FREE;
    } else {
        serverPanic("Invalid worker type in bioCreateCompRq().");
    }

    bio_job *rq = zmalloc(sizeof(*rq));
    rq->comp_rq.fn = func;
    rq->comp_rq.arg = user_data;
    bioSubmitJob(job_type, rq);
}

void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache) {
bio_job *job = zmalloc(sizeof(*job));
job->fd_args.fd = fd;
Expand Down Expand Up @@ -264,6 +333,21 @@ void *bioProcessBackgroundJobs(void *arg) {
close(job->fd_args.fd);
} else if (job_type == BIO_LAZY_FREE) {
job->free_args.free_fn(job->free_args.free_args);
} else if ((job_type == BIO_COMP_RQ_CLOSE_FILE) ||
(job_type == BIO_COMP_RQ_AOF_FSYNC) ||
(job_type == BIO_COMP_RQ_LAZY_FREE)) {
bio_comp_item *comp_rsp = zmalloc(sizeof(bio_comp_item));
comp_rsp->func = job->comp_rq.fn;
comp_rsp->arg = job->comp_rq.arg;

/* just write it to completion job responses */
pthread_mutex_lock(&bio_mutex_comp);
listAddNodeTail(bio_comp_list, comp_rsp);
pthread_mutex_unlock(&bio_mutex_comp);

if (write(job_comp_pipe[1],"A",1) != 1) {
/* Pipe is non-blocking, write() may fail if it's full. */
}
} else {
serverPanic("Wrong job type in bioProcessBackgroundJobs().");
}
Expand Down Expand Up @@ -322,3 +406,34 @@ void bioKillThreads(void) {
}
}
}

/* Event-loop read handler for the completion pipe. Fired on the main thread
 * when a background thread signals that completion job responses are pending:
 * drains the pipe, detaches the pending responses under the mutex, then runs
 * each callback in FIFO order without holding the lock. */
void bioPipeReadJobCompList(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    char drain[128];
    list *pending = NULL;

    /* Consume all the wake-up bytes written to the pipe so far. */
    while (read(fd, drain, sizeof(drain)) == sizeof(drain));

    /* Swap out the shared list so the callbacks run lock-free. */
    pthread_mutex_lock(&bio_mutex_comp);
    if (listLength(bio_comp_list) != 0) {
        pending = bio_comp_list;
        bio_comp_list = listCreate();
    }
    pthread_mutex_unlock(&bio_mutex_comp);

    if (pending == NULL) return;

    /* Invoke each completion callback, oldest first. */
    listNode *ln;
    while ((ln = listFirst(pending)) != NULL) {
        bio_comp_item *item = ln->value;
        listDelNode(pending, ln);
        item->func(item->arg);
        zfree(item);
    }
    listRelease(pending);
}
29 changes: 21 additions & 8 deletions src/bio.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,26 @@
#define __BIO_H

typedef void lazy_free_fn(void *args[]);
typedef void comp_fn(uint64_t user_data);

/* Identifiers of the background worker threads. Each worker owns its own job
 * queue; job types are statically assigned to a worker (see bio_job_to_worker
 * in bio.c). NOTE(review): values are used as array indices — the declaration
 * order must not change. */
typedef enum bio_worker_t {
BIO_WORKER_CLOSE_FILE = 0,
BIO_WORKER_AOF_FSYNC,
BIO_WORKER_LAZY_FREE,
BIO_WORKER_NUM  /* Number of workers; must remain last. */
} bio_worker_t;

/* Background job opcodes. NOTE(review): values index bio_jobs_counter in
 * bio.c — the declaration order must not change. */
typedef enum bio_job_type_t {
BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
BIO_AOF_FSYNC, /* Deferred AOF fsync. */
BIO_LAZY_FREE, /* Deferred objects freeing. */
BIO_CLOSE_AOF, /* Deferred close for AOF files. */
BIO_COMP_RQ_CLOSE_FILE, /* Job completion request, registered on close-file worker's queue */
BIO_COMP_RQ_AOF_FSYNC, /* Job completion request, registered on aof-fsync worker's queue */
BIO_COMP_RQ_LAZY_FREE, /* Job completion request, registered on lazy-free worker's queue */
BIO_NUM_OPS /* Number of job types; must remain last. */
} bio_job_type_t;

/* Exported API */
void bioInit(void);
Expand All @@ -20,14 +40,7 @@ void bioCreateCloseJob(int fd, int need_fsync, int need_reclaim_cache);
void bioCreateCloseAofJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateFsyncJob(int fd, long long offset, int need_reclaim_cache);
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...);
void bioCreateCompRq(bio_worker_t assigned_worker, comp_fn *func, uint64_t user_data);

/* Background job opcodes */
enum {
BIO_CLOSE_FILE = 0, /* Deferred close(2) syscall. */
BIO_AOF_FSYNC, /* Deferred AOF fsync. */
BIO_LAZY_FREE, /* Deferred objects freeing. */
BIO_CLOSE_AOF, /* Deferred close for AOF files. */
BIO_NUM_OPS
};

#endif
20 changes: 16 additions & 4 deletions src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ void blockClient(client *c, int btype) {
/* Master client should never be blocked unless pause or module */
serverAssert(!(c->flags & CLIENT_MASTER &&
btype != BLOCKED_MODULE &&
btype != BLOCKED_LAZYFREE &&
btype != BLOCKED_POSTPONE));

c->flags |= CLIENT_BLOCKED;
Expand Down Expand Up @@ -175,6 +176,8 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->postponed_list_node = NULL;
} else if (c->bstate.btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else if (c->bstate.btype == BLOCKED_LAZYFREE) {
/* No special cleanup. */
} else {
serverPanic("Unknown btype in unblockClient().");
}
Expand Down Expand Up @@ -206,7 +209,9 @@ void unblockClient(client *c, int queue_for_reprocessing) {
* send it a reply of some kind. After this function is called,
* unblockClient() will be called with the same client as argument. */
void replyToBlockedClientTimedOut(client *c) {
if (c->bstate.btype == BLOCKED_LIST ||
if (c->bstate.btype == BLOCKED_LAZYFREE) {
addReply(c, shared.ok); /* No reason lazy-free to fail */
} else if (c->bstate.btype == BLOCKED_LIST ||
c->bstate.btype == BLOCKED_ZSET ||
c->bstate.btype == BLOCKED_STREAM) {
addReplyNullArray(c);
Expand Down Expand Up @@ -263,9 +268,16 @@ void disconnectAllBlockedClients(void) {
if (c->bstate.btype == BLOCKED_POSTPONE)
continue;

unblockClientOnError(c,
"-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
if (c->bstate.btype == BLOCKED_LAZYFREE) {
addReply(c, shared.ok); /* No reason lazy-free to fail */
c->flags &= ~CLIENT_PENDING_COMMAND;
unblockClient(c, 1);
} else {

unblockClientOnError(c,
"-UNBLOCKED force unblock from blocking operation, "
"instance state changed (master -> replica?)");
}
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
}
Expand Down
Loading

0 comments on commit b2a4c38

Please sign in to comment.