Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: backport fix for osx-* 6h timeouts #700

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions builtin/fsmonitor--daemon.c
Original file line number Diff line number Diff line change
Expand Up @@ -1208,9 +1208,9 @@ static int fsmonitor_run_daemon_1(struct fsmonitor_daemon_state *state)
* system event listener thread so that we have the IPC handle
* before we need it.
*/
if (ipc_server_run_async(&state->ipc_server_data,
state->path_ipc.buf, &ipc_opts,
handle_client, state))
if (ipc_server_init_async(&state->ipc_server_data,
state->path_ipc.buf, &ipc_opts,
handle_client, state))
return error_errno(
_("could not start IPC thread pool on '%s'"),
state->path_ipc.buf);
Expand Down
6 changes: 6 additions & 0 deletions compat/fsmonitor/fsm-listen-darwin.c
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,12 @@ void fsm_listen__loop(struct fsmonitor_daemon_state *state)
}
data->stream_started = 1;

/*
* Our fs event listener is now running, so it's safe to start
* serving client requests.
*/
ipc_server_start_async(state->ipc_server_data);

pthread_mutex_lock(&data->dq_lock);
pthread_cond_wait(&data->dq_finished, &data->dq_lock);
pthread_mutex_unlock(&data->dq_lock);
Expand Down
6 changes: 6 additions & 0 deletions compat/fsmonitor/fsm-listen-win32.c
Original file line number Diff line number Diff line change
Expand Up @@ -741,6 +741,12 @@ void fsm_listen__loop(struct fsmonitor_daemon_state *state)
start_rdcw_watch(data->watch_gitdir) == -1)
goto force_error_stop;

/*
* Now that we've established the rdcw watches, we can start
* serving clients.
*/
ipc_server_start_async(state->ipc_server_data);

for (;;) {
dwWait = WaitForMultipleObjects(data->nr_listener_handles,
data->hListener,
Expand Down
5 changes: 3 additions & 2 deletions compat/simple-ipc/ipc-shared.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ int ipc_server_run(const char *path, const struct ipc_server_opts *opts,
struct ipc_server_data *server_data = NULL;
int ret;

ret = ipc_server_run_async(&server_data, path, opts,
application_cb, application_data);
ret = ipc_server_init_async(&server_data, path, opts,
application_cb, application_data);
if (ret)
return ret;

ipc_server_start_async(server_data);
ret = ipc_server_await(server_data);

ipc_server_free(server_data);
Expand Down
28 changes: 23 additions & 5 deletions compat/simple-ipc/ipc-unix-socket.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ struct ipc_server_data {
int back_pos;
int front_pos;

int started;
int shutdown_requested;
int is_stopped;
};
Expand Down Expand Up @@ -824,10 +825,10 @@ static int setup_listener_socket(
/*
* Start IPC server in a pool of background threads.
*/
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data)
int ipc_server_init_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data)
{
struct unix_ss_socket *server_socket = NULL;
struct ipc_server_data *server_data;
Expand Down Expand Up @@ -888,6 +889,12 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
server_data->accept_thread->fd_send_shutdown = sv[0];
server_data->accept_thread->fd_wait_shutdown = sv[1];

/*
* Hold work-available mutex so that no work can start until
* we unlock it.
*/
pthread_mutex_lock(&server_data->work_available_mutex);

if (pthread_create(&server_data->accept_thread->pthread_id, NULL,
accept_thread_proc, server_data->accept_thread))
die_errno(_("could not start accept_thread '%s'"), path);
Expand Down Expand Up @@ -918,6 +925,15 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
return 0;
}

void ipc_server_start_async(struct ipc_server_data *server_data)
{
if (!server_data || server_data->started)
return;

server_data->started = 1;
pthread_mutex_unlock(&server_data->work_available_mutex);
}

/*
* Gently tell the IPC server treads to shutdown.
* Can be run on any thread.
Expand All @@ -933,7 +949,9 @@ int ipc_server_stop_async(struct ipc_server_data *server_data)

trace2_region_enter("ipc-server", "server-stop-async", NULL);

pthread_mutex_lock(&server_data->work_available_mutex);
/* If we haven't started yet, we are already holding lock. */
if (server_data->started)
pthread_mutex_lock(&server_data->work_available_mutex);

server_data->shutdown_requested = 1;

Expand Down
48 changes: 44 additions & 4 deletions compat/simple-ipc/ipc-win32.c
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ struct ipc_server_data {
HANDLE hEventStopRequested;
struct ipc_server_thread_data *thread_list;
int is_stopped;

pthread_mutex_t startup_barrier;
int started;
};

enum connect_result {
Expand Down Expand Up @@ -526,6 +529,16 @@ static int use_connection(struct ipc_server_thread_data *server_thread_data)
return ret;
}

static void wait_for_startup_barrier(struct ipc_server_data *server_data)
{
/*
* Temporarily hold the startup_barrier mutex before starting,
* which lets us know that it's OK to start serving requests.
*/
pthread_mutex_lock(&server_data->startup_barrier);
pthread_mutex_unlock(&server_data->startup_barrier);
}

/*
* Thread proc for an IPC server worker thread. It handles a series of
* connections from clients. It cleans and reuses the hPipe between each
Expand All @@ -550,6 +563,8 @@ static void *server_thread_proc(void *_server_thread_data)
memset(&oConnect, 0, sizeof(oConnect));
oConnect.hEvent = hEventConnected;

wait_for_startup_barrier(server_thread_data->server_data);

for (;;) {
cr = wait_for_connection(server_thread_data, &oConnect);

Expand Down Expand Up @@ -752,10 +767,10 @@ static HANDLE create_new_pipe(wchar_t *wpath, int is_first)
return hPipe;
}

int ipc_server_run_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data)
int ipc_server_init_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data)
{
struct ipc_server_data *server_data;
wchar_t wpath[MAX_PATH];
Expand Down Expand Up @@ -787,6 +802,13 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
strbuf_addstr(&server_data->buf_path, path);
wcscpy(server_data->wpath, wpath);

/*
* Hold the startup_barrier lock so that no threads will progress
* until ipc_server_start_async() is called.
*/
pthread_mutex_init(&server_data->startup_barrier, NULL);
pthread_mutex_lock(&server_data->startup_barrier);

if (nr_threads < 1)
nr_threads = 1;

Expand Down Expand Up @@ -837,6 +859,15 @@ int ipc_server_run_async(struct ipc_server_data **returned_server_data,
return 0;
}

void ipc_server_start_async(struct ipc_server_data *server_data)
{
if (!server_data || server_data->started)
return;

server_data->started = 1;
pthread_mutex_unlock(&server_data->startup_barrier);
}

int ipc_server_stop_async(struct ipc_server_data *server_data)
{
if (!server_data)
Expand All @@ -850,6 +881,13 @@ int ipc_server_stop_async(struct ipc_server_data *server_data)
* We DO NOT attempt to force them to drop an active connection.
*/
SetEvent(server_data->hEventStopRequested);

/*
* If we haven't yet told the threads they are allowed to run,
* do so now, so they can receive the shutdown event.
*/
ipc_server_start_async(server_data);

return 0;
}

Expand Down Expand Up @@ -900,5 +938,7 @@ void ipc_server_free(struct ipc_server_data *server_data)
free(std);
}

pthread_mutex_destroy(&server_data->startup_barrier);

free(server_data);
}
17 changes: 13 additions & 4 deletions simple-ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,20 @@ struct ipc_server_opts
* When a client IPC message is received, the `application_cb` will be
* called (possibly on a random thread) to handle the message and
* optionally compose a reply message.
*
* This initializes all threads but no actual work will be done until
* ipc_server_start_async() is called.
*/
int ipc_server_init_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data);

/*
* Let an async server start running. This needs to be called only once
* after initialization.
*/
int ipc_server_run_async(struct ipc_server_data **returned_server_data,
const char *path, const struct ipc_server_opts *opts,
ipc_server_application_cb *application_cb,
void *application_data);
void ipc_server_start_async(struct ipc_server_data *server_data);

/*
* Gently signal the IPC server pool to shutdown. No new client
Expand Down
Loading