diff --git a/src/runtime/HalideRuntime.h b/src/runtime/HalideRuntime.h index efdb54feba63..df6b4f127249 100644 --- a/src/runtime/HalideRuntime.h +++ b/src/runtime/HalideRuntime.h @@ -194,7 +194,7 @@ struct halide_mutex { /** Cross platform condition variable. Must be initialized to 0. */ struct halide_cond { - uintptr_t _private[1]; + uintptr_t _private[2]; }; /** A basic set of mutex and condition variable functions, which call diff --git a/src/runtime/synchronization_common.h b/src/runtime/synchronization_common.h index 778c423e4046..0fc89a692ed7 100644 --- a/src/runtime/synchronization_common.h +++ b/src/runtime/synchronization_common.h @@ -816,11 +816,15 @@ struct wait_parking_control final : public parking_control { class fast_cond { uintptr_t state = 0; + uintptr_t counter = 0; public: ALWAYS_INLINE void signal() { if_tsan_pre_signal(this); + // Release any spinning waiters + atomic_fetch_add_acquire_release(&counter, (uintptr_t)1); + uintptr_t val; atomic_load_relaxed(&state, &val); if (val == 0) { @@ -834,6 +838,10 @@ class fast_cond { ALWAYS_INLINE void broadcast() { if_tsan_pre_signal(this); + + // Release any spinning waiters + atomic_fetch_add_acquire_release(&counter, (uintptr_t)1); + uintptr_t val; atomic_load_relaxed(&state, &val); if (val == 0) { @@ -846,6 +854,35 @@ class fast_cond { } ALWAYS_INLINE void wait(fast_mutex *mutex) { + // Spin for a bit, waiting to see if someone else calls signal or + // broadcast. + uintptr_t initial; + atomic_load_relaxed(&counter, &initial); + mutex->unlock(); + spin_control spinner; + while (spinner.should_spin()) { + halide_thread_yield(); + uintptr_t current; + atomic_load_relaxed(&counter, &current); + if (current != initial) { + mutex->lock(); + return; + } + } + + mutex->lock(); // Can locking and then immediately waiting be + // optimized? + + // Check one final time with the lock held. This guarantees we won't + // miss an increment of the counter because it is only ever incremented + // with the lock held. 
+ uintptr_t current; + atomic_load_relaxed(&counter, &current); + if (current != initial) { + return; + } + + // Go to sleep until signaled wait_parking_control control(&state, mutex); uintptr_t result = control.park((uintptr_t)this); if (result != (uintptr_t)mutex) { diff --git a/src/runtime/thread_pool_common.h b/src/runtime/thread_pool_common.h index 5dbfe9191bab..a50536d54a79 100644 --- a/src/runtime/thread_pool_common.h +++ b/src/runtime/thread_pool_common.h @@ -203,9 +203,6 @@ WEAK void dump_job_state() { WEAK void worker_thread(void *); WEAK void worker_thread_already_locked(work *owned_job) { - int spin_count = 0; - const int max_spin_count = 40; - while (owned_job ? owned_job->running() : !work_queue.shutdown) { work *job = work_queue.jobs; work **prev_ptr = &work_queue.jobs; @@ -283,18 +280,11 @@ WEAK void worker_thread_already_locked(work *owned_job) { if (!job) { // There is no runnable job. Go to sleep. if (owned_job) { - if (spin_count++ < max_spin_count) { - // Give the workers a chance to finish up before sleeping - halide_mutex_unlock(&work_queue.mutex); - halide_thread_yield(); - halide_mutex_lock(&work_queue.mutex); - } else { - work_queue.owners_sleeping++; - owned_job->owner_is_sleeping = true; - halide_cond_wait(&work_queue.wake_owners, &work_queue.mutex); - owned_job->owner_is_sleeping = false; - work_queue.owners_sleeping--; - } + work_queue.owners_sleeping++; + owned_job->owner_is_sleeping = true; + halide_cond_wait(&work_queue.wake_owners, &work_queue.mutex); + owned_job->owner_is_sleeping = false; + work_queue.owners_sleeping--; } else { work_queue.workers_sleeping++; if (work_queue.a_team_size > work_queue.target_a_team_size) { @@ -302,19 +292,12 @@ WEAK void worker_thread_already_locked(work *owned_job) { work_queue.a_team_size--; halide_cond_wait(&work_queue.wake_b_team, &work_queue.mutex); work_queue.a_team_size++; - } else if (spin_count++ < max_spin_count) { - // Spin waiting for new work - halide_mutex_unlock(&work_queue.mutex); - 
halide_thread_yield(); - halide_mutex_lock(&work_queue.mutex); } else { halide_cond_wait(&work_queue.wake_a_team, &work_queue.mutex); } work_queue.workers_sleeping--; } continue; - } else { - spin_count = 0; } log_message("Working on job " << job->task.name);