Skip to content

Commit

Permalink
tests: make ring test all devices
Browse files Browse the repository at this point in the history
The ring functional test now exercises all devices instead of just one
randomly chosen device. The test creates multiple rings so that every
device is tried. When multiple processes run on the same host, the rings
are arranged so that each device uses an inter-node connection at least once.

Signed-off-by: Amedeo Sapio <[email protected]>
  • Loading branch information
AmedeoSapio committed Mar 4, 2024
1 parent c96ed49 commit 9d8a467
Showing 1 changed file with 140 additions and 123 deletions.
263 changes: 140 additions & 123 deletions tests/functional/ring.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ int main(int argc, char *argv[])
int buffer_type = NCCL_PTR_HOST;

/* Plugin defines */
int ndev, dev, cuda_dev;
int ndev, cuda_dev;

nccl_net_ofi_send_comm_t *sComm_next = NULL;
nccl_net_ofi_listen_comm_t *lComm = NULL;
Expand Down Expand Up @@ -104,6 +104,11 @@ int main(int argc, char *argv[])
CUcontext context;
CUDACHECK(cuCtxCreate(&context, CU_CTX_SCHED_SPIN|CU_CTX_MAP_HOST, cuda_dev));
CUDACHECK(cuCtxSetCurrent(context));

/* Allocate and populate expected buffer */
OFINCCLCHECKGOTO(allocate_buff((void **)&expected_buf, SEND_SIZE, NCCL_PTR_HOST), res, exit);
OFINCCLCHECKGOTO(initialize_buff((void *)expected_buf, SEND_SIZE, NCCL_PTR_HOST), res, exit);

/*
* Calculate the rank of the next process in the ring. Use the
* modulus operator so that the last process "wraps around" to
Expand Down Expand Up @@ -136,152 +141,164 @@ int main(int argc, char *argv[])
goto exit;
}

/* Get Properties for the device */
for (dev = 0; dev < ndev; dev++) {
test_nccl_properties_t props = {0};
OFINCCLCHECKGOTO(extNet->getProperties(dev, &props), res, exit);
print_dev_props(dev, &props);
/* Get Properties for the device */
for (int dev = 0; dev < ndev; dev++) {
test_nccl_properties_t props = {0};
OFINCCLCHECKGOTO(extNet->getProperties(dev, &props), res, exit);
print_dev_props(dev, &props);

/* Set CUDA support */
/* Set CUDA support */
support_gdr[dev] = is_gdr_supported_nic(props.ptrSupport);
}

/* Choose specific device per rank for communication */
dev = rand() % ndev;
NCCL_OFI_TRACE(NCCL_INIT, "Rank %d uses %d device for communication", rank, dev);

if (support_gdr[dev] == 1) {
NCCL_OFI_INFO(NCCL_INIT | NCCL_NET,
"Network supports communication using CUDA buffers. Dev: %d", dev);
buffer_type = NCCL_PTR_CUDA;
}

/* Listen API */
NCCL_OFI_INFO(NCCL_NET, "Server: Listening on device %d", dev);
OFINCCLCHECKGOTO(extNet->listen(dev, (void *)&handle, (void **)&lComm), res, exit);

/* MPI send: Distribute handle to prev and next ranks */
MPI_Send(&handle, NCCL_NET_HANDLE_MAXSIZE,
MPI_CHAR, prev, 0, MPI_COMM_WORLD);
MPI_Send(&handle, NCCL_NET_HANDLE_MAXSIZE,
MPI_CHAR, next, 0, MPI_COMM_WORLD);

/* MPI recv: Receive handle from prev and next ranks */
MPI_Recv((void *)src_handle_prev, NCCL_NET_HANDLE_MAXSIZE, MPI_CHAR,
prev, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv((void *)src_handle_next, NCCL_NET_HANDLE_MAXSIZE, MPI_CHAR,
next, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
/* Test all devices */
for (int dev_idx = 0; dev_idx < ndev; dev_idx++) {

/* Connect to next rank */
NCCL_OFI_INFO(NCCL_NET, "Send connection request to rank %d", next);
while (sComm_next == NULL)
OFINCCLCHECKGOTO(extNet->connect(dev, (void *)src_handle_next, (void **)&sComm_next, &s_ignore), res, exit);
int dev = (local_rank + dev_idx) % ndev;

/*
* Accept API: accept connection from prev rank as the data flow is
* clockwise
*/
NCCL_OFI_INFO(NCCL_NET, "Server: Start accepting requests");
while (rComm == NULL)
OFINCCLCHECKGOTO(extNet->accept((void *)lComm, (void **)&rComm, &r_ignore), res, exit);
NCCL_OFI_INFO(NCCL_NET, "Successfully accepted connection from rank %d", prev);
NCCL_OFI_TRACE(NCCL_INIT, "Rank %d uses %d device for communication", rank, dev);

/* Send NUM_REQUESTS to next rank */
NCCL_OFI_INFO(NCCL_NET, "Sending %d requests to rank %d", NUM_REQUESTS, next);
for (int idx = 0; idx < NUM_REQUESTS; idx++) {
OFINCCLCHECKGOTO(allocate_buff((void **)&send_buf[idx], SEND_SIZE, buffer_type), res, exit);
OFINCCLCHECKGOTO(initialize_buff((void *)send_buf[idx], SEND_SIZE, buffer_type), res, exit);
if (support_gdr[dev] == 1) {
NCCL_OFI_INFO(NCCL_INIT | NCCL_NET,
"Network supports communication using CUDA buffers. Dev: %d", dev);
buffer_type = NCCL_PTR_CUDA;
}

OFINCCLCHECKGOTO(extNet->regMr((void *)sComm_next, (void *)send_buf[idx], SEND_SIZE,
buffer_type, &send_mhandle[idx]), res, exit);
NCCL_OFI_TRACE(NCCL_NET, "Successfully registered send memory for request %d of rank %d", idx, rank);
/* Listen API */
NCCL_OFI_INFO(NCCL_NET, "Server: Listening on device %d", dev);
OFINCCLCHECKGOTO(extNet->listen(dev, (void *)&handle, (void **)&lComm), res, exit);

/* MPI send: Distribute handle to prev and next ranks */
MPI_Send(&handle, NCCL_NET_HANDLE_MAXSIZE,
MPI_CHAR, prev, 0, MPI_COMM_WORLD);
MPI_Send(&handle, NCCL_NET_HANDLE_MAXSIZE,
MPI_CHAR, next, 0, MPI_COMM_WORLD);

/* MPI recv: Receive handle from prev and next ranks */
MPI_Recv((void *)src_handle_prev, NCCL_NET_HANDLE_MAXSIZE, MPI_CHAR,
prev, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv((void *)src_handle_next, NCCL_NET_HANDLE_MAXSIZE, MPI_CHAR,
next, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

/* Connect to next rank */
NCCL_OFI_INFO(NCCL_NET, "Send connection request to rank %d", next);
while (sComm_next == NULL)
OFINCCLCHECKGOTO(extNet->connect(dev, (void *)src_handle_next, (void **)&sComm_next, &s_ignore), res, exit);

/*
* Accept API: accept connection from prev rank as the data flow is
* clockwise
*/
NCCL_OFI_INFO(NCCL_NET, "Server: Start accepting requests");
while (rComm == NULL)
OFINCCLCHECKGOTO(extNet->accept((void *)lComm, (void **)&rComm, &r_ignore), res, exit);
NCCL_OFI_INFO(NCCL_NET, "Successfully accepted connection from rank %d", prev);

/* Send NUM_REQUESTS to next rank */
NCCL_OFI_INFO(NCCL_NET, "Sending %d requests to rank %d", NUM_REQUESTS, next);
for (int idx = 0; idx < NUM_REQUESTS; idx++) {
OFINCCLCHECKGOTO(allocate_buff((void **)&send_buf[idx], SEND_SIZE, buffer_type), res, exit);
OFINCCLCHECKGOTO(initialize_buff((void *)send_buf[idx], SEND_SIZE, buffer_type), res, exit);

while (send_req[idx] == NULL) {
OFINCCLCHECKGOTO(extNet->isend((void *)sComm_next, (void *)send_buf[idx], SEND_SIZE, tag,
send_mhandle[idx], (void **)&send_req[idx]), res, exit);
}
}
OFINCCLCHECKGOTO(extNet->regMr((void *)sComm_next, (void *)send_buf[idx], SEND_SIZE,
buffer_type, &send_mhandle[idx]), res, exit);
NCCL_OFI_TRACE(NCCL_NET, "Successfully registered send memory for request %d of rank %d", idx, rank);

/* Receive NUM_REQUESTS from prev rank */
NCCL_OFI_INFO(NCCL_NET, "Rank %d posting %d receive buffers", rank, NUM_REQUESTS);
for (int idx = 0; idx < NUM_REQUESTS; idx++) {
OFINCCLCHECKGOTO(allocate_buff((void **)&recv_buf[idx], RECV_SIZE, buffer_type), res, exit);
OFINCCLCHECKGOTO(extNet->regMr((void *)rComm, (void *)recv_buf[idx], RECV_SIZE,
buffer_type, &recv_mhandle[idx]), res, exit);
NCCL_OFI_TRACE(NCCL_NET, "Successfully registered receive memory for request %d of rank %d", idx, rank);

while (recv_req[idx] == NULL) {
OFINCCLCHECKGOTO(extNet->irecv((void *)rComm, nrecv, (void **)&recv_buf[idx],
sizes, tags, &recv_mhandle[idx], (void **)&recv_req[idx]), res, exit);
while (send_req[idx] == NULL) {
OFINCCLCHECKGOTO(extNet->isend((void *)sComm_next, (void *)send_buf[idx], SEND_SIZE, tag,
send_mhandle[idx], (void **)&send_req[idx]), res, exit);
}
}
}

/* Allocate and populate expected buffer */
OFINCCLCHECKGOTO(allocate_buff((void **)&expected_buf, SEND_SIZE, NCCL_PTR_HOST), res, exit);
OFINCCLCHECKGOTO(initialize_buff((void *)expected_buf, SEND_SIZE, NCCL_PTR_HOST), res, exit);

/* Test all completions */
while (inflight_reqs > 0) {
/* Test for send completions */
/* Receive NUM_REQUESTS from prev rank */
NCCL_OFI_INFO(NCCL_NET, "Rank %d posting %d receive buffers", rank, NUM_REQUESTS);
for (int idx = 0; idx < NUM_REQUESTS; idx++) {
if (req_completed_send[idx])
continue;

OFINCCLCHECKGOTO(extNet->test((void *)send_req[idx], &done, &received_size), res, exit);
if (done) {
inflight_reqs--;
req_completed_send[idx] = 1;
/* Deregister memory handle */
OFINCCLCHECKGOTO(extNet->deregMr((void *)sComm_next, send_mhandle[idx]), res, exit);
OFINCCLCHECKGOTO(allocate_buff((void **)&recv_buf[idx], RECV_SIZE, buffer_type), res, exit);
OFINCCLCHECKGOTO(extNet->regMr((void *)rComm, (void *)recv_buf[idx], RECV_SIZE,
buffer_type, &recv_mhandle[idx]), res, exit);
NCCL_OFI_TRACE(NCCL_NET, "Successfully registered receive memory for request %d of rank %d", idx, rank);

while (recv_req[idx] == NULL) {
OFINCCLCHECKGOTO(extNet->irecv((void *)rComm, nrecv, (void **)&recv_buf[idx],
sizes, tags, &recv_mhandle[idx], (void **)&recv_req[idx]), res, exit);
}
}

/* Test for recv completions */
for (int idx = 0; idx < NUM_REQUESTS; idx++) {
if (req_completed_recv[idx])
continue;

OFINCCLCHECKGOTO(extNet->test((void *)recv_req[idx], &done, &received_size), res, exit);
if (done) {
inflight_reqs--;
req_completed_recv[idx] = 1;

/* Invoke flush operations unless user has explicitly disabled it */
if (buffer_type == NCCL_PTR_CUDA) {
NCCL_OFI_TRACE(NCCL_NET,
"Issue flush for data consistency. Request idx: %d",
idx);
nccl_net_ofi_req_t *iflush_req = NULL;
OFINCCLCHECKGOTO(extNet->iflush((void *)rComm, nrecv,
(void **)&recv_buf[idx], sizes,
&recv_mhandle[idx], (void **)&iflush_req), res, exit);
done = 0;
if (iflush_req) {
while (!done) {
OFINCCLCHECKGOTO(extNet->test((void *)iflush_req, &done, NULL), res, exit);
/* Test all completions */
while (inflight_reqs > 0) {
/* Test for send completions */
for (int idx = 0; idx < NUM_REQUESTS; idx++) {
if (req_completed_send[idx])
continue;

OFINCCLCHECKGOTO(extNet->test((void *)send_req[idx], &done, &received_size), res, exit);
if (done) {
inflight_reqs--;
req_completed_send[idx] = 1;
/* Deregister memory handle */
OFINCCLCHECKGOTO(extNet->deregMr((void *)sComm_next, send_mhandle[idx]), res, exit);
}
}

/* Test for recv completions */
for (int idx = 0; idx < NUM_REQUESTS; idx++) {
if (req_completed_recv[idx])
continue;

OFINCCLCHECKGOTO(extNet->test((void *)recv_req[idx], &done, &received_size), res, exit);
if (done) {
inflight_reqs--;
req_completed_recv[idx] = 1;

/* Invoke flush operations unless user has explicitly disabled it */
if (buffer_type == NCCL_PTR_CUDA) {
NCCL_OFI_TRACE(NCCL_NET,
"Issue flush for data consistency. Request idx: %d",
idx);
nccl_net_ofi_req_t *iflush_req = NULL;
OFINCCLCHECKGOTO(extNet->iflush((void *)rComm, nrecv,
(void **)&recv_buf[idx], sizes,
&recv_mhandle[idx], (void **)&iflush_req), res, exit);
done = 0;
if (iflush_req) {
while (!done) {
OFINCCLCHECKGOTO(extNet->test((void *)iflush_req, &done, NULL), res, exit);
}
}
}
}

if ((buffer_type == NCCL_PTR_CUDA) && !ofi_nccl_gdr_flush_disable()) {
/* Data validation may fail if flush operations are disabled */
} else
OFINCCLCHECKGOTO(validate_data(recv_buf[idx], expected_buf, SEND_SIZE, buffer_type), res, exit);
if ((buffer_type == NCCL_PTR_CUDA) && !ofi_nccl_gdr_flush_disable()) {
/* Data validation may fail if flush operations are disabled */
} else
OFINCCLCHECKGOTO(validate_data(recv_buf[idx], expected_buf, SEND_SIZE, buffer_type), res, exit);

/* Deregister memory handle */
OFINCCLCHECKGOTO(extNet->deregMr((void *)rComm, recv_mhandle[idx]), res, exit);
/* Deregister memory handle */
OFINCCLCHECKGOTO(extNet->deregMr((void *)rComm, recv_mhandle[idx]), res, exit);
}
}
}
}

for (int idx = 0; idx < NUM_REQUESTS; idx++) {
if (send_buf[idx]) {
OFINCCLCHECKGOTO(deallocate_buffer(send_buf[idx], buffer_type), res, exit);
send_buf[idx] = NULL;
}
if (recv_buf[idx]) {
OFINCCLCHECKGOTO(deallocate_buffer(recv_buf[idx], buffer_type), res, exit);
recv_buf[idx] = NULL;
}
}

/* Close all Comm objects */
OFINCCLCHECKGOTO(extNet->closeSend((void *)sComm_next), res, exit);
sComm_next = NULL;
OFINCCLCHECKGOTO(extNet->closeRecv((void *)rComm), res, exit);
rComm = NULL;
OFINCCLCHECKGOTO(extNet->closeListen((void *)lComm), res, exit);
lComm = NULL;
/* Close all Comm objects */
OFINCCLCHECKGOTO(extNet->closeSend((void *)sComm_next), res, exit);
sComm_next = NULL;
OFINCCLCHECKGOTO(extNet->closeRecv((void *)rComm), res, exit);
rComm = NULL;
OFINCCLCHECKGOTO(extNet->closeListen((void *)lComm), res, exit);
lComm = NULL;

MPI_Barrier(MPI_COMM_WORLD);
}

MPI_Barrier(MPI_COMM_WORLD);
MPI_Finalize();
Expand Down

0 comments on commit 9d8a467

Please sign in to comment.