diff --git a/bluefog/common/operations.cc b/bluefog/common/operations.cc index 578291e0..74363952 100644 --- a/bluefog/common/operations.cc +++ b/bluefog/common/operations.cc @@ -1437,7 +1437,10 @@ Status EnqueueTensorAllreduce(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1473,7 +1476,10 @@ Status EnqueueTensorBroadcast(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1508,7 +1514,10 @@ Status EnqueueTensorAllgather(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1553,7 +1562,10 @@ Status EnqueueTensorNeighborAllgather(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1605,7 +1617,10 @@ Status EnqueueTensorNeighborAllreduce(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1646,7 +1661,10 @@ Status EnqueueTensorPairGossip(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1679,7 +1697,10 @@ Status EnqueueTensorWindowCreate( return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1704,7 +1725,10 @@ Status EnqueueTensorWindowFree(const std::string& name, int device, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1738,7 +1762,10 @@ Status EnqueueTensorWindowPut(std::shared_ptr tensor, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1770,7 +1797,10 @@ Status EnqueueTensorWindowAccumulate( return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -1798,7 +1828,10 @@ Status EnqueueTensorWindowGet(const std::string& name, return SUSPEND_ERROR; } Status status = bluefog_global.tensor_queue.AddToTensorQueue(e, message); - bluefog_global.loop_cv.notify_one(); + { + std::unique_lock lk(bluefog_global.loop_mutex); + bluefog_global.loop_cv.notify_one(); + } return status; } @@ -2048,8 +2081,8 @@ void SetSkipNegotiateStageState(bool value) { { std::lock_guard lk(bluefog_global.loop_mutex); global_skip_negotiate_stage = value; + bluefog_global.loop_cv.notify_one(); } - bluefog_global.loop_cv.notify_one(); } bool GetSkipNegotiateStageState() {