Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

slow ranks search improvements #99

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/broadcast.cu
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t
const char **run_typenames;
int type_count;
int begin_root, end_root;
int step = 0;

if ((int)type != -1) {
type_count = 1;
Expand All @@ -97,15 +98,16 @@ testResult_t BroadcastRunTest(struct threadArgs* args, int root, ncclDataType_t
run_typenames = test_typenames;
}

if (root != -1) {
if (root >= 0) {
begin_root = end_root = root;
} else {
step = -root;
begin_root = 0;
end_root = args->nProcs*args->nThreads*args->nGpus-1;
}

for (int i=0; i<type_count; i++) {
for (int j=begin_root; j<=end_root; j++) {
for (int j=begin_root; j<=end_root; j+=step) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "", j));
}
}
Expand Down
49 changes: 46 additions & 3 deletions src/common.cu
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ int test_ncclVersion = 0; // init'd with ncclGetVersion()
int test_opnum = 4;
#endif

// Communication timeout, default is 30min
double comm_timeout = 1800;

thread_local int is_main_thread = 0;

// Command line parameter defaults
Expand All @@ -65,6 +68,8 @@ static int nccltype = ncclFloat;
static int ncclroot = 0;
static int parallel_init = 0;
static int blocking_coll = 0;
static int slow_rank = -1;
static int slow_rank_usec = 100000;
static int cudaGraphLaunches = 0;
// Report average iteration time: (0=RANK0,1=AVG,2=MIN,3=MAX)
static int average = 1;
Expand Down Expand Up @@ -469,8 +474,11 @@ testResult_t CheckData(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
testResult_t testStreamSynchronize(int ngpus, cudaStream_t* streams, ncclComm_t* comms) {
cudaError_t cudaErr;
int remaining = ngpus;
auto start = std::chrono::high_resolution_clock::time_point::min();
int checkTimeout = 0;
int* done = (int*)malloc(sizeof(int)*ngpus);
memset(done, 0, sizeof(int)*ngpus);

while (remaining) {
int idle = 1;
for (int i=0; i<ngpus; i++) {
Expand Down Expand Up @@ -503,7 +511,25 @@ testResult_t testStreamSynchronize(int ngpus, cudaStream_t* streams, ncclComm_t*
}

// We might want to let other threads (including NCCL threads) use the CPU.
if (idle) pthread_yield();
if (idle) {
if (checkTimeout) {
auto delta = std::chrono::high_resolution_clock::now() - start;
double deltaSec = std::chrono::duration_cast<std::chrono::duration<double>>(delta).count();
if (deltaSec > comm_timeout) {
char hostname[1024];
getHostName(hostname, 1024);
printf("COMMUNICATION TIMEOUT\n");
printf("%s: Test NCCL failure %s:%d communication deadline exceeded %.0f sec, timeout %.0f sec\n",
hostname, __FILE__,__LINE__, deltaSec, comm_timeout);
return testNcclError;
}

} else {
start = std::chrono::high_resolution_clock::now();
checkTimeout = 1;
}
pthread_yield();
}
}
free(done);
return testSuccess;
Expand All @@ -529,6 +555,10 @@ testResult_t startColl(struct threadArgs* args, ncclDataType_t type, ncclRedOp_t
char* sendBuff = ((char*)args->sendbuffs[i]) + shift;
ncclRedOp_t op;

if (slow_rank == rank) {
usleep(slow_rank_usec);
}

if(opIndex < ncclNumOps) {
op = opIndex;
}
Expand Down Expand Up @@ -882,13 +912,15 @@ int main(int argc, char* argv[]) {
{"blocking", required_argument, 0, 'z'},
{"cudagraph", required_argument, 0, 'G'},
{"average", required_argument, 0, 'a'},
{"slowrank", required_argument, 0, 'S'},
{"slowrank_delay", required_argument, 0, 'D'},
{"help", no_argument, 0, 'h'},
{}
};

while(1) {
int c;
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:hG:a:", longopts, &longindex);
c = getopt_long(argc, argv, "t:g:b:e:i:f:n:m:w:p:c:o:d:r:z:hG:a:S:D:", longopts, &longindex);

if (c == -1)
break;
Expand Down Expand Up @@ -953,6 +985,12 @@ int main(int argc, char* argv[]) {
case 'z':
blocking_coll = strtol(optarg, NULL, 0);
break;
case 'S':
slow_rank= (int)strtol(optarg, NULL, 0);
break;
case 'D':
slow_rank_usec= strtol(optarg, NULL, 0);
break;
case 'G':
#if (NCCL_MAJOR > 2 || (NCCL_MAJOR >= 2 && NCCL_MINOR >= 9)) && CUDART_VERSION >= 11030
cudaGraphLaunches = strtol(optarg, NULL, 0);
Expand Down Expand Up @@ -990,6 +1028,8 @@ int main(int argc, char* argv[]) {
"[-z,--blocking <0/1>] \n\t"
"[-G,--cudagraph <num graph launches>] \n\t"
"[-a,--average <0/1/2/3> report average iteration time <0=RANK0/1=AVG/2=MIN/3=MAX>] \n\t"
"[-S,--slowrank <rank>] slow rank (default is disabled) \n\t"
"[-D,--slowrank_delay <usec>] slow rank delay usec \n\t"
"[-h,--help]\n",
basename(argv[0]));
return 0;
Expand All @@ -1014,6 +1054,9 @@ testResult_t run() {
char hostname[1024];
getHostName(hostname, 1024);

char* str = getenv("NCCL_TESTS_COMM_TIMEOUT");
comm_timeout = str ? atof(str) : comm_timeout;

#ifdef MPI_SUPPORT
MPI_Comm_size(MPI_COMM_WORLD, &nProcs);
MPI_Comm_rank(MPI_COMM_WORLD, &proc);
Expand Down Expand Up @@ -1195,7 +1238,7 @@ testResult_t run() {
}
CUDACHECK(cudaFreeHost(delta));

char* str = getenv("NCCL_TESTS_MIN_BW");
str = getenv("NCCL_TESTS_MIN_BW");
double check_avg_bw = str ? atof(str) : -1;
bw[0] /= bw_count[0];

Expand Down
6 changes: 4 additions & 2 deletions src/gather.cu
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ testResult_t GatherRunTest(struct threadArgs* args, int root, ncclDataType_t typ
const char **run_typenames;
int type_count;
int begin_root, end_root;
int step = 0;

if ((int)type != -1) {
type_count = 1;
Expand All @@ -108,15 +109,16 @@ testResult_t GatherRunTest(struct threadArgs* args, int root, ncclDataType_t typ
run_typenames = test_typenames;
}

if (root != -1) {
if (root >= 0) {
begin_root = end_root = root;
} else {
step = -root;
begin_root = 0;
end_root = args->nProcs*args->nThreads*args->nGpus-1;
}

for (int i=0; i<type_count; i++) {
for (int j=begin_root; j<=end_root; j++) {
for (int j=begin_root; j<=end_root; j+=step) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "", j));
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/reduce.cu
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ testResult_t ReduceRunTest(struct threadArgs* args, int root, ncclDataType_t typ
const char **run_typenames, **run_opnames;
int type_count, op_count;
int begin_root, end_root;
int step = 0;

if ((int)type != -1) {
type_count = 1;
Expand All @@ -98,16 +99,17 @@ testResult_t ReduceRunTest(struct threadArgs* args, int root, ncclDataType_t typ
run_opnames = test_opnames;
}

if (root != -1) {
if (root >= 0) {
begin_root = end_root = root;
} else {
step = -root;
begin_root = 0;
end_root = args->nProcs*args->nThreads*args->nGpus-1;
}

for (int i=0; i<type_count; i++) {
for (int j=0; j<op_count; j++) {
for (int k=begin_root; k<=end_root; k++) {
for (int k=begin_root; k<=end_root; k+=step) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], run_ops[j], run_opnames[j], k));
}
}
Expand Down
6 changes: 4 additions & 2 deletions src/scatter.cu
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ testResult_t ScatterRunTest(struct threadArgs* args, int root, ncclDataType_t ty
const char **run_typenames;
int type_count;
int begin_root, end_root;
int step = 0;

if ((int)type != -1) {
type_count = 1;
Expand All @@ -102,15 +103,16 @@ testResult_t ScatterRunTest(struct threadArgs* args, int root, ncclDataType_t ty
run_typenames = test_typenames;
}

if (root != -1) {
if (root >= 0) {
begin_root = end_root = root;
} else {
step = -root;
begin_root = 0;
end_root = args->nProcs*args->nThreads*args->nGpus-1;
}

for (int i=0; i<type_count; i++) {
for (int j=begin_root; j<=end_root; j++) {
for (int j=begin_root; j<=end_root; j+=step) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], (ncclRedOp_t)0, "", j));
}
}
Expand Down
27 changes: 19 additions & 8 deletions src/sendrecv.cu
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@
#include "common.h"

void print_header() {
PRINT("# %10s %12s %8s out-of-place in-place \n", "", "", "");
PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type",
PRINT("# %10s %12s %8s %6s out-of-place in-place \n", "", "", "", "");
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "size", "count", "type", "dist",
"time", "algbw", "busbw", "error", "time", "algbw", "busbw", "error");
PRINT("# %10s %12s %8s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "",
PRINT("# %10s %12s %8s %6s %7s %6s %6s %5s %7s %6s %6s %5s\n", "(B)", "(elements)", "", "",
"(us)", "(GB/s)", "(GB/s)", "", "(us)", "(GB/s)", "(GB/s)", "");
}

void print_line_header (size_t size, size_t count, const char *typeName, const char *opName, int root) {
PRINT("%12li %12li %8s", size, count, typeName);
PRINT("%12li %12li %8s %6i", size, count, typeName, root);
}

void SendRecvGetCollByteCount(size_t *sendcount, size_t *recvcount, size_t *paramcount, size_t *sendInplaceOffset, size_t *recvInplaceOffset, size_t count, int nranks) {
Expand All @@ -39,7 +39,7 @@ testResult_t SendRecvInitData(struct threadArgs* args, ncclDataType_t type, nccl
CUDACHECK(cudaMemset(args->recvbuffs[i], 0, args->expectedBytes));
void* data = in_place ? args->recvbuffs[i] : args->sendbuffs[i];
TESTCHECK(InitData(data, sendcount, type, rep, rank));
int peer = (rank-1+nranks)%nranks;
int peer = (rank-root+nranks)%nranks;
TESTCHECK(InitData(args->expected[i], recvcount, type, rep, peer));
CUDACHECK(cudaDeviceSynchronize());
}
Expand All @@ -61,8 +61,8 @@ testResult_t SendRecvRunColl(void* sendbuff, void* recvbuff, size_t count, ncclD
NCCLCHECK(ncclCommCount(comm, &nRanks));
int rank;
NCCLCHECK(ncclCommUserRank(comm, &rank));
int recvPeer = (rank-1+nRanks) % nRanks;
int sendPeer = (rank+1) % nRanks;
int recvPeer = (rank-root+nRanks) % nRanks;
int sendPeer = (rank+root) % nRanks;

NCCLCHECK(ncclGroupStart());
NCCLCHECK(ncclSend(sendbuff, count, type, sendPeer, comm, stream));
Expand Down Expand Up @@ -90,6 +90,8 @@ testResult_t SendRecvRunTest(struct threadArgs* args, int root, ncclDataType_t t
ncclRedOp_t *run_ops;
const char **run_typenames, **run_opnames;
int type_count, op_count;
int begin_root, end_root;
int step = 0;

if ((int)type != -1) {
type_count = 1;
Expand All @@ -110,10 +112,19 @@ testResult_t SendRecvRunTest(struct threadArgs* args, int root, ncclDataType_t t
run_ops = test_ops;
run_opnames = test_opnames;
}
if (root >= 0) {
begin_root = end_root = root;
} else {
step = -root;
begin_root = step;
end_root = args->nProcs*args->nThreads*args->nGpus-1;
}

for (int i=0; i<type_count; i++) {
for (int j=0; j<op_count; j++) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], run_ops[j], run_opnames[j], -1));
for (int rr=begin_root; rr<=end_root; rr+=step) {
TESTCHECK(TimeTest(args, run_types[i], run_typenames[i], run_ops[j], run_opnames[j], rr));
}
}
}
return testSuccess;
Expand Down