Skip to content

Commit

Permalink
Workaround for TF issue 62466
Browse files Browse the repository at this point in the history
  • Loading branch information
ekuznetsov139 authored and pemeliya committed Mar 18, 2024
1 parent 8c94034 commit 6700094
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions tensorflow/core/common_runtime/base_collective_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -391,16 +391,20 @@ void BaseCollectiveExecutor::CompleteParamsAsync(
cp->instance.impl_details.timeout_seconds * 1'000'000);
if (timeout_microseconds > 0) {
// TODO(xldrx): Share the timeout watchdog thread among collectives.
int timeout = cp->instance.impl_details.timeout_seconds;
SchedNonBlockingClosureAfter(
timeout_microseconds, [this, is_callback_called, done]() {
bool called = is_callback_called->exchange(true);
if (!called) {
Status status(
absl::StatusCode::kDeadlineExceeded,
"Collective has timed out waiting for other workers.");
StartAbort(status);
done(status);
}
1'000'000, [this, is_callback_called, done, timeout]() {
for(int count = 0; count < timeout; count++) {
bool called = is_callback_called->exchange(false);
if(called)
return;
usleep(1000000);
}
Status status(
absl::StatusCode::kDeadlineExceeded,
"Collective has timed out waiting for other workers.");
StartAbort(status);
done(status);
});
}
cem_->GetParamResolver()->CompleteParamsAsync(device, cp, cancel_mgr,
Expand Down

0 comments on commit 6700094

Please sign in to comment.