Skip to content

Commit

Permalink
Merge branch 'v5.0.x' into topic/5.0.x/handle-masync-assign-ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay-Venkatesh authored Oct 3, 2024
2 parents c09e947 + e25d3e6 commit 4537ce9
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 56 deletions.
2 changes: 1 addition & 1 deletion docs/tuning-apps/networking/rocm.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ using Open MPI and UCX ROCm support is something like this:
.. code-block::
shell$ mpirun -n 2 --mca pml ucx \
./osu_latency -d rocm D D
./osu_latency D D
Note: some additional configure flags are required to compile the OSU
benchmark to support ROCm buffers. Please refer to the `UCX ROCm
Expand Down
5 changes: 5 additions & 0 deletions ompi/mca/coll/cuda/coll_cuda.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ int mca_coll_cuda_reduce(const void *sbuf, void *rbuf, int count,
struct ompi_communicator_t *comm,
mca_coll_base_module_t *module);

int mca_coll_cuda_reduce_local(const void *sbuf, void *rbuf, size_t count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
mca_coll_base_module_t *module);

int mca_coll_cuda_exscan(const void *sbuf, void *rbuf, int count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
Expand Down
3 changes: 3 additions & 0 deletions ompi/mca/coll/cuda/coll_cuda_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ static void mca_coll_cuda_module_destruct(mca_coll_cuda_module_t *module)
{
OBJ_RELEASE(module->c_coll.coll_allreduce_module);
OBJ_RELEASE(module->c_coll.coll_reduce_module);
OBJ_RELEASE(module->c_coll.coll_reduce_local_module);
OBJ_RELEASE(module->c_coll.coll_reduce_scatter_block_module);
OBJ_RELEASE(module->c_coll.coll_scatter_module);
/* If the exscan module is not NULL, then this was an
Expand Down Expand Up @@ -103,6 +104,7 @@ mca_coll_cuda_comm_query(struct ompi_communicator_t *comm,
cuda_module->super.coll_gather = NULL;
cuda_module->super.coll_gatherv = NULL;
cuda_module->super.coll_reduce = mca_coll_cuda_reduce;
cuda_module->super.coll_reduce_local = mca_coll_cuda_reduce_local;
cuda_module->super.coll_reduce_scatter = NULL;
cuda_module->super.coll_reduce_scatter_block = mca_coll_cuda_reduce_scatter_block;
cuda_module->super.coll_scan = mca_coll_cuda_scan;
Expand Down Expand Up @@ -135,6 +137,7 @@ int mca_coll_cuda_module_enable(mca_coll_base_module_t *module,

CHECK_AND_RETAIN(comm, s, allreduce);
CHECK_AND_RETAIN(comm, s, reduce);
CHECK_AND_RETAIN(comm, s, reduce_local);
CHECK_AND_RETAIN(comm, s, reduce_scatter_block);
CHECK_AND_RETAIN(comm, s, scatter);
if (!OMPI_COMM_IS_INTER(comm)) {
Expand Down
57 changes: 57 additions & 0 deletions ompi/mca/coll/cuda/coll_cuda_reduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,60 @@ mca_coll_cuda_reduce(const void *sbuf, void *rbuf, int count,
}
return rc;
}

int
mca_coll_cuda_reduce_local(const void *sbuf, void *rbuf, size_t count,
struct ompi_datatype_t *dtype,
struct ompi_op_t *op,
mca_coll_base_module_t *module)
{
ptrdiff_t gap;
char *rbuf1 = NULL, *sbuf1 = NULL, *rbuf2 = NULL;
size_t bufsize;
int rc;

bufsize = opal_datatype_span(&dtype->super, count, &gap);

rc = mca_coll_cuda_check_buf((void *)sbuf);
if (rc < 0) {
return rc;
}

if ((MPI_IN_PLACE != sbuf) && (rc > 0)) {
sbuf1 = (char*)malloc(bufsize);
if (NULL == sbuf1) {
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_coll_cuda_memcpy(sbuf1, sbuf, bufsize);
sbuf = sbuf1 - gap;
}

rc = mca_coll_cuda_check_buf(rbuf);
if (rc < 0) {
return rc;
}

if (rc > 0) {
rbuf1 = (char*)malloc(bufsize);
if (NULL == rbuf1) {
if (NULL != sbuf1) free(sbuf1);
return OMPI_ERR_OUT_OF_RESOURCE;
}
mca_coll_cuda_memcpy(rbuf1, rbuf, bufsize);
rbuf2 = rbuf; /* save away original buffer */
rbuf = rbuf1 - gap;
}

ompi_op_reduce(op, (void *)sbuf, rbuf, count, dtype);
rc = OMPI_SUCCESS;

if (NULL != sbuf1) {
free(sbuf1);
}
if (NULL != rbuf1) {
rbuf = rbuf2;
mca_coll_cuda_memcpy(rbuf, rbuf1, bufsize);
free(rbuf1);
}
return rc;
}
2 changes: 1 addition & 1 deletion ompi/mca/pml/ob1/pml_ob1_isend.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ static inline int mca_pml_ob1_send_inline (const void *buf, size_t count,
}

if (OPAL_UNLIKELY(OMPI_SUCCESS != rc)) {
return rc;
return rc;
}

return (int) size;
Expand Down
7 changes: 4 additions & 3 deletions ompi/mca/pml/ob1/pml_ob1_recvreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ static int mca_pml_ob1_recv_request_get_frag_failed (mca_pml_ob1_rdma_frag_t *fr
}
}

if (++frag->retries < mca_pml_ob1.rdma_retries_limit &&
if (frag->retries < mca_pml_ob1.rdma_retries_limit &&
OMPI_ERR_OUT_OF_RESOURCE == rc) {
OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
opal_list_append(&mca_pml_ob1.rdma_pending, (opal_list_item_t*)frag);
Expand Down Expand Up @@ -413,6 +413,7 @@ static void mca_pml_ob1_rget_completion (mca_btl_base_module_t* btl, struct mca_
/* check completion status */
if (OPAL_UNLIKELY(OMPI_SUCCESS != status)) {
status = mca_pml_ob1_recv_request_get_frag_failed (frag, status);
/* fragment was returned or queue by the above call */
if (OPAL_UNLIKELY(OMPI_SUCCESS != status)) {
size_t skipped_bytes = recvreq->req_send_offset - recvreq->req_rdma_offset;
opal_output_verbose(mca_pml_ob1_output, 1, "pml:ob1: %s: operation failed with code %d", __func__, status);
Expand All @@ -435,12 +436,12 @@ static void mca_pml_ob1_rget_completion (mca_btl_base_module_t* btl, struct mca_
mca_pml_ob1_send_fin (recvreq->req_recv.req_base.req_proc,
bml_btl, frag->rdma_hdr.hdr_rget.hdr_frag,
frag->rdma_length, 0, 0);

MCA_PML_OB1_RDMA_FRAG_RETURN(frag);
}

recv_request_pml_complete_check(recvreq);

MCA_PML_OB1_RDMA_FRAG_RETURN(frag);

MCA_PML_OB1_PROGRESS_PENDING(bml_btl);
}

Expand Down
44 changes: 26 additions & 18 deletions ompi/mca/pml/ob1/pml_ob1_sendreq.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
* Copyright (c) 2018-2019 Triad National Security, LLC. All rights
* reserved.
* Copyright (c) 2022 IBM Corporation. All rights reserved.
* Copyright (c) 2024 Google, LLC. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
Expand Down Expand Up @@ -1110,6 +1111,12 @@ mca_pml_ob1_send_request_schedule_once(mca_pml_ob1_send_request_t* sendreq)

range = get_send_range(sendreq);

if (NULL != sendreq->rdma_frag) {
/* this request was first attempted with RDMA but is now using send/recv */
MCA_PML_OB1_RDMA_FRAG_RETURN(sendreq->rdma_frag);
sendreq->rdma_frag = NULL;
}

while(range && (false == sendreq->req_throttle_sends ||
sendreq->req_pipeline_depth < mca_pml_ob1.send_pipeline_depth)) {
mca_pml_ob1_frag_hdr_t* hdr;
Expand Down Expand Up @@ -1268,30 +1275,31 @@ static void mca_pml_ob1_send_request_put_frag_failed (mca_pml_ob1_rdma_frag_t *f
mca_pml_ob1_send_request_t* sendreq = (mca_pml_ob1_send_request_t *) frag->rdma_req;
mca_bml_base_btl_t *bml_btl = frag->rdma_bml;

if (++frag->retries < mca_pml_ob1.rdma_retries_limit && OMPI_ERR_OUT_OF_RESOURCE == rc) {
if (frag->retries < mca_pml_ob1.rdma_retries_limit && OMPI_ERR_OUT_OF_RESOURCE == rc) {
/* queue the frag for later if there was a resource error */
OPAL_THREAD_LOCK(&mca_pml_ob1.lock);
opal_list_append(&mca_pml_ob1.rdma_pending, (opal_list_item_t*)frag);
OPAL_THREAD_UNLOCK(&mca_pml_ob1.lock);
} else {
return;
}

#if OPAL_ENABLE_FT
if(!ompi_proc_is_active(sendreq->req_send.req_base.req_proc)) {
return;
}
#endif /* OPAL_ENABLE_FT */
/* tell receiver to deregister memory */
mca_pml_ob1_send_fin (sendreq->req_send.req_base.req_proc, bml_btl,
frag->rdma_hdr.hdr_rdma.hdr_frag, 0, MCA_BTL_NO_ORDER,
OPAL_ERR_TEMP_OUT_OF_RESOURCE);

/* send fragment by copy in/out */
mca_pml_ob1_send_request_copy_in_out(sendreq, frag->rdma_hdr.hdr_rdma.hdr_rdma_offset,
frag->rdma_length);
/* if a pointer to a receive request is not set it means that
* ACK was not yet received. Don't schedule sends before ACK */
if (NULL != sendreq->req_recv.pval)
mca_pml_ob1_send_request_schedule (sendreq);
if(!ompi_proc_is_active(sendreq->req_send.req_base.req_proc)) {
return;
}
#endif /* OPAL_ENABLE_FT */
/* tell receiver to deregister memory */
mca_pml_ob1_send_fin (sendreq->req_send.req_base.req_proc, bml_btl,
frag->rdma_hdr.hdr_rdma.hdr_frag, 0, MCA_BTL_NO_ORDER,
OPAL_ERR_TEMP_OUT_OF_RESOURCE);

/* send fragment by copy in/out */
mca_pml_ob1_send_request_copy_in_out(sendreq, frag->rdma_hdr.hdr_rdma.hdr_rdma_offset,
frag->rdma_length);
/* if a pointer to a receive request is not set it means that
* ACK was not yet received. Don't schedule sends before ACK */
if (NULL != sendreq->req_recv.pval)
mca_pml_ob1_send_request_schedule (sendreq);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion opal/datatype/opal_datatype_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ struct opal_datatype_t;
# define OPAL_DATATYPE_SAFEGUARD_POINTER(ACTPTR, LENGTH, INITPTR, PDATA, COUNT) \
{ \
unsigned char *__lower_bound = (INITPTR), *__upper_bound; \
assert(((LENGTH) != 0) && ((COUNT) != 0)); \
assert( (COUNT) != 0 ); \
__lower_bound += (PDATA)->true_lb; \
__upper_bound = (INITPTR) + (PDATA)->true_ub + \
((PDATA)->ub - (PDATA)->lb) * ((COUNT) -1); \
Expand Down
7 changes: 4 additions & 3 deletions opal/datatype/opal_datatype_position.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ static inline void position_single_block(opal_convertor_t *CONVERTOR, unsigned c
}

/**
* Advance the convertors' position according. Update the pointer and the remaining space
* accordingly.
* Advance the convertors' position according to account for *COUNT elements. Update
* the pointer and the remaining space accordingly.
*/
static inline void position_predefined_data(opal_convertor_t *CONVERTOR, dt_elem_desc_t *ELEM,
size_t *COUNT, unsigned char **POINTER, size_t *SPACE)
Expand All @@ -82,7 +82,8 @@ static inline void position_predefined_data(opal_convertor_t *CONVERTOR, dt_elem

if (cando_count > *(COUNT)) {
cando_count = *(COUNT);
}
} else if( 0 == cando_count )
return;

if (1 == _elem->blocklen) {
DO_DEBUG(opal_output(0,
Expand Down
14 changes: 0 additions & 14 deletions opal/mca/btl/sm/btl_sm_send.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,4 @@ int mca_btl_sm_send(struct mca_btl_base_module_t *btl, struct mca_btl_base_endpo
}

return OPAL_SUCCESS;

#if 0
if (((frag->hdr->flags & MCA_BTL_SM_FLAG_SINGLE_COPY) ||
!(frag->base.des_flags & MCA_BTL_DES_FLAGS_BTL_OWNERSHIP)) &&
frag->base.des_cbfunc) {
frag->base.des_flags |= MCA_BTL_DES_SEND_ALWAYS_CALLBACK;

return OPAL_SUCCESS;
}

/* data is gone (from the pml's perspective). frag callback/release will
happen later */
return 1;
#endif
}
30 changes: 20 additions & 10 deletions opal/mca/btl/uct/btl_uct_am.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mca_btl_base_descriptor_t *mca_btl_uct_alloc(mca_btl_base_module_t *btl,
}

static inline void _mca_btl_uct_send_pack(void *data, void *header, size_t header_size,
opal_convertor_t *convertor, size_t payload_size)
opal_convertor_t *convertor, size_t* payload_size)
{
uint32_t iov_count = 1;
struct iovec iov;
Expand All @@ -64,11 +64,9 @@ static inline void _mca_btl_uct_send_pack(void *data, void *header, size_t heade

/* pack the data into the supplied buffer */
iov.iov_base = (IOVBASE_TYPE *) ((intptr_t) data + header_size);
iov.iov_len = length = payload_size;
iov.iov_len = *payload_size;

(void) opal_convertor_pack(convertor, &iov, &iov_count, &length);

assert(length == payload_size);
(void) opal_convertor_pack(convertor, &iov, &iov_count, payload_size);
}

struct mca_btl_base_descriptor_t *mca_btl_uct_prepare_src(mca_btl_base_module_t *btl,
Expand All @@ -92,7 +90,10 @@ struct mca_btl_base_descriptor_t *mca_btl_uct_prepare_src(mca_btl_base_module_t
}

_mca_btl_uct_send_pack((void *) ((intptr_t) frag->uct_iov.buffer + reserve), NULL, 0,
convertor, *size);
convertor, size);
/* update the length of the fragment according to the convertor packed data */
frag->segments[0].seg_len = reserve + *size;
frag->uct_iov.length = frag->segments[0].seg_len;
} else {
opal_convertor_get_current_pointer(convertor, &data_ptr);
assert(NULL != data_ptr);
Expand Down Expand Up @@ -286,7 +287,7 @@ static size_t mca_btl_uct_sendi_pack(void *data, void *arg)

am_header->value = args->am_header;
_mca_btl_uct_send_pack((void *) ((intptr_t) data + 8), args->header, args->header_size,
args->convertor, args->payload_size);
args->convertor, &args->payload_size);
return args->header_size + args->payload_size + 8;
}

Expand Down Expand Up @@ -329,9 +330,18 @@ int mca_btl_uct_sendi(mca_btl_base_module_t *btl, mca_btl_base_endpoint_t *endpo
} else if (msg_size < (size_t) MCA_BTL_UCT_TL_ATTR(uct_btl->am_tl, context->context_id)
.cap.am.max_short) {
int8_t *data = alloca(total_size);
_mca_btl_uct_send_pack(data, header, header_size, convertor, payload_size);
ucs_status = uct_ep_am_short(ep_handle, MCA_BTL_UCT_FRAG, am_header.value, data,
total_size);
size_t packed_payload_size = payload_size;
_mca_btl_uct_send_pack(data, header, header_size, convertor, &packed_payload_size);
if (packed_payload_size != payload_size) {
/* This should never happen as the packed data should go in a single pack. But
in case it does, fallback onto a descriptor allocation and let the caller
send the data.
*/
ucs_status = UCS_ERR_NO_RESOURCE;
} else {
ucs_status = uct_ep_am_short(ep_handle, MCA_BTL_UCT_FRAG, am_header.value, data,
total_size);
}
} else {
ssize_t size;

Expand Down
3 changes: 2 additions & 1 deletion opal/mca/btl/uct/btl_uct_module.c
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ mca_btl_uct_module_t mca_btl_uct_module_template = {

/* set the default flags for this btl. uct provides us with rdma and both
* fetching and non-fetching atomics (though limited to add and cswap) */
.btl_flags = MCA_BTL_FLAGS_RDMA | MCA_BTL_FLAGS_ATOMIC_FOPS | MCA_BTL_FLAGS_ATOMIC_OPS,
.btl_flags = MCA_BTL_FLAGS_RDMA | MCA_BTL_FLAGS_ATOMIC_FOPS | MCA_BTL_FLAGS_ATOMIC_OPS
| MCA_BTL_FLAGS_RDMA_REMOTE_COMPLETION,
.btl_atomic_flags = MCA_BTL_ATOMIC_SUPPORTS_ADD | MCA_BTL_ATOMIC_SUPPORTS_CSWAP
| MCA_BTL_ATOMIC_SUPPORTS_SWAP | MCA_BTL_ATOMIC_SUPPORTS_32BIT,

Expand Down
7 changes: 3 additions & 4 deletions opal/mca/btl/uct/btl_uct_tl.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,10 @@ static void mca_btl_uct_module_set_atomic_flags(mca_btl_uct_module_t *module, mc
uint64_t atomic_flags32 = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.atomic32.fop_flags;
uint64_t atomic_flags64 = MCA_BTL_UCT_TL_ATTR(tl, 0).cap.atomic64.fop_flags;

/* NTH: don't really have a way to separate 32-bit and 64-bit right now */
uint64_t all_flags = atomic_flags32 & atomic_flags64;

module->super.btl_atomic_flags = 0;
uint64_t all_flags = atomic_flags64 | atomic_flags32;

module->super.btl_atomic_flags = (0 != atomic_flags32) ? MCA_BTL_ATOMIC_SUPPORTS_32BIT : 0;

if (cap_flags & UCT_IFACE_FLAG_ATOMIC_CPU) {
module->super.btl_atomic_flags |= MCA_BTL_ATOMIC_SUPPORTS_GLOB;
}
Expand Down

0 comments on commit 4537ce9

Please sign in to comment.