From 8b2486710c3ffbb4f6be694b30995caa58fa5277 Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Fri, 5 Jul 2024 16:44:25 -0500 Subject: [PATCH] fcoll/vulcan: add support for GPU aggregation buffers If the user user input buffers are GPU device memory, use also GPU device memory for the aggregation step. This will allow the data transfer to occur between GPU buffers, and hence take advantage of the much higher GPU-GPU interconnects (e.g. XGMI, NVLINK, etc.). The downside of this approach is that we cannot call directly into the fbtl ipwritev routine, but have to go through the common_ompio_file_iwrite_pregen routine, which performs the necessary segmenting and staging through the host memory. Signed-off-by: Edgar Gabriel --- ompi/mca/common/ompio/common_ompio.h | 2 + .../common/ompio/common_ompio_file_write.c | 69 +++++++++++++++- ompi/mca/fcoll/vulcan/fcoll_vulcan.h | 1 + .../vulcan/fcoll_vulcan_file_write_all.c | 82 ++++++++++++++----- ompi/mca/io/ompio/io_ompio.c | 5 +- ompi/mca/io/ompio/io_ompio.h | 4 +- ompi/mca/io/ompio/io_ompio_component.c | 12 ++- 7 files changed, 149 insertions(+), 26 deletions(-) diff --git a/ompi/mca/common/ompio/common_ompio.h b/ompi/mca/common/ompio/common_ompio.h index f6261dff078..1ddb0b47d7f 100644 --- a/ompi/mca/common/ompio/common_ompio.h +++ b/ompi/mca/common/ompio/common_ompio.h @@ -262,6 +262,8 @@ OMPI_DECLSPEC int mca_common_ompio_file_write_at (ompio_file_t *fh, OMPI_MPI_OFF OMPI_DECLSPEC int mca_common_ompio_file_iwrite (ompio_file_t *fh, const void *buf, size_t count, struct ompi_datatype_t *datatype, ompi_request_t **request); +OMPI_DECLSPEC int mca_common_ompio_file_iwrite_pregen (ompio_file_t *fh, ompi_request_t *request); + OMPI_DECLSPEC int mca_common_ompio_file_iwrite_at (ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE offset, const void *buf, size_t count, struct ompi_datatype_t *datatype, ompi_request_t **request); diff --git a/ompi/mca/common/ompio/common_ompio_file_write.c b/ompi/mca/common/ompio/common_ompio_file_write.c index b970346f7a2..b5d3900e7a3 100644 --- a/ompi/mca/common/ompio/common_ompio_file_write.c +++ b/ompi/mca/common/ompio/common_ompio_file_write.c @@ -12,7 +12,7 @@ * Copyright (c) 2008-2019 University of Houston. All rights reserved. * Copyright (c) 2015-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. - * Copyright (c) 2022-2023 Advanced Micro Devices, Inc. All rights reserved. + * Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. * $COPYRIGHT$ @@ -329,6 +329,7 @@ static void mca_common_ompio_post_next_write_subreq(struct mca_ompio_request_t * decoded_iov.iov_base = req->req_tbuf; decoded_iov.iov_len = req->req_size; opal_convertor_pack (&req->req_convertor, &decoded_iov, &iov_count, &pos); + mca_common_ompio_build_io_array (req->req_fview, index, req->req_num_subreqs, bytes_per_cycle, pos, iov_count, &decoded_iov, @@ -472,6 +473,72 @@ int mca_common_ompio_file_iwrite (ompio_file_t *fh, return ret; } +/* +** This routine is invoked from the fcoll component. +** It is only used if the temporary buffer is a gpu buffer, +** and the fbtl supports the ipwritev operation. +** +** The io-array has already been generated in fcoll/xxx/file_write_all, +** and we use the pre-computed offsets to created a pseudo fview. +** The position of the file pointer is updated in the fcoll +** component, not here. +*/ + +int mca_common_ompio_file_iwrite_pregen (ompio_file_t *fh, + ompi_request_t *request) +{ + uint32_t i; + size_t max_data; + size_t pipeline_buf_size; + mca_ompio_request_t *ompio_req = (mca_ompio_request_t *) request; + + if (NULL == fh->f_fbtl->fbtl_ipwritev) { + return MPI_ERR_INTERN; + } + + max_data = fh->f_io_array[0].length; + pipeline_buf_size = OMPIO_MCA_GET(fh, pipeline_buffer_size); + + mca_common_ompio_register_progress (); + + OMPIO_PREPARE_BUF (fh, fh->f_io_array[0].memory_address, max_data, MPI_BYTE, + ompio_req->req_tbuf, &ompio_req->req_convertor, max_data, + pipeline_buf_size, NULL, i); + + ompio_req->req_num_subreqs = ceil((double)max_data/pipeline_buf_size); + ompio_req->req_size = pipeline_buf_size; + ompio_req->req_max_data = max_data; + ompio_req->req_post_next_subreq = mca_common_ompio_post_next_write_subreq; + ompio_req->req_fh = fh; + ompio_req->req_ompi.req_status.MPI_ERROR = MPI_SUCCESS; + + ompio_req->req_fview = (struct ompio_fview_t *) calloc(1, sizeof(struct ompio_fview_t)); + if (NULL == ompio_req->req_fview) { + opal_output(1, "common_ompio: error allocating memory\n"); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + ompio_req->req_fview->f_decoded_iov = (struct iovec*) malloc ( fh->f_num_of_io_entries * + sizeof(struct iovec)); + if (NULL == ompio_req->req_fview->f_decoded_iov) { + opal_output(1, "common_ompio_file_iwrite_pregen: could not allocate memory\n"); + return OMPI_ERR_OUT_OF_RESOURCE; + } + + ompio_req->req_fview->f_iov_count = fh->f_num_of_io_entries; + for (i=0; i < ompio_req->req_fview->f_iov_count; i++) { + ompio_req->req_fview->f_decoded_iov[i].iov_base = fh->f_io_array[i].offset; + ompio_req->req_fview->f_decoded_iov[i].iov_len = fh->f_io_array[i].length ; + } + + fh->f_num_of_io_entries = 0; + free (fh->f_io_array); + fh->f_io_array = NULL; + + mca_common_ompio_post_next_write_subreq(ompio_req, 0); + return OMPI_SUCCESS; +} + int mca_common_ompio_file_iwrite_at (ompio_file_t *fh, OMPI_MPI_OFFSET_TYPE offset, const void *buf, diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h index ebddf429a5c..a2fd6ca82bc 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h @@ -43,6 +43,7 @@ extern int mca_fcoll_vulcan_priority; extern int mca_fcoll_vulcan_num_groups; extern int mca_fcoll_vulcan_write_chunksize; extern int mca_fcoll_vulcan_async_io; +extern int mca_fcoll_vulcan_use_accelerator_buffers; OMPI_DECLSPEC extern mca_fcoll_base_component_3_0_0_t mca_fcoll_vulcan_component; diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c index 8318b0ef374..b928f795bfe 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c @@ -15,6 +15,7 @@ * Copyright (c) 2023 Jeffrey M. Squyres. All rights reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -30,10 +31,12 @@ #include "ompi/mca/fcoll/fcoll.h" #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h" #include "ompi/mca/common/ompio/common_ompio.h" +#include "ompi/mca/common/ompio/common_ompio_buffer.h" #include "ompi/mca/io/io.h" #include "ompi/mca/common/ompio/common_ompio_request.h" #include "math.h" #include "ompi/mca/pml/pml.h" +#include "opal/mca/accelerator/accelerator.h" #include #define DEBUG_ON 0 @@ -88,13 +91,12 @@ typedef struct mca_io_ompio_aggregator_data { _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \ } - - static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, ompi_request_t **reqs ); static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, - int write_chunksize, int write_synchType, ompi_request_t **request); + int write_chunksize, int write_synchType, ompi_request_t **request, + bool is_accelerator_buffer); int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count, struct iovec *local_iov_array, int local_count, struct iovec ***broken_decoded_iovs, int **broken_iov_counts, @@ -155,6 +157,8 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, ompi_count_array_t fview_count_desc; ompi_disp_array_t displs_desc; + int is_gpu, is_managed; + bool use_accelerator_buffer = false; #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN double write_time = 0.0, start_write_time = 0.0, end_write_time = 0.0; @@ -180,6 +184,11 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, goto exit; } + mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed); + if (is_gpu && !is_managed && + fh->f_get_mca_parameter_value ("use_accelerator_buffers", strlen("use_accelerator_buffers"))) { + use_accelerator_buffer = true; + } /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what the user requested */ bytes_per_cycle =bytes_per_cycle/2; @@ -529,13 +538,31 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, goto exit; } - - aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle); - aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle); - if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){ - opal_output(1, "OUT OF MEMORY"); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; + if (use_accelerator_buffer) { + opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, + "Allocating GPU device buffer for aggregation\n"); + ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->global_buf, + bytes_per_cycle); + if (OPAL_SUCCESS != ret) { + opal_output(1, "Could not allocate accelerator memory"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->prev_global_buf, + bytes_per_cycle); + if (OPAL_SUCCESS != ret) { + opal_output(1, "Could not allocate accelerator memory"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } else { + aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle); + aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle); + if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){ + opal_output(1, "OUT OF MEMORY"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } } aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * @@ -605,7 +632,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, start_write_time = MPI_Wtime(); #endif ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index], - write_chunksize, write_synch_type, &req_iwrite); + write_chunksize, write_synch_type, &req_iwrite, use_accelerator_buffer); if (OMPI_SUCCESS != ret){ goto exit; } @@ -645,7 +672,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, start_write_time = MPI_Wtime(); #endif ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index], - write_chunksize, write_synch_type, &req_iwrite); + write_chunksize, write_synch_type, &req_iwrite, use_accelerator_buffer); if (OMPI_SUCCESS != ret){ goto exit; } @@ -704,8 +731,13 @@ exit : free (aggr_data[i]->disp_index); free (aggr_data[i]->max_disp_index); - free (aggr_data[i]->global_buf); - free (aggr_data[i]->prev_global_buf); + if (use_accelerator_buffer) { + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->global_buf); + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->prev_global_buf); + } else { + free (aggr_data[i]->global_buf); + free (aggr_data[i]->prev_global_buf); + } for(l=0;lprocs_per_group;l++){ free (aggr_data[i]->blocklen_per_process[l]); free (aggr_data[i]->displs_per_process[l]); @@ -749,7 +781,8 @@ static int write_init (ompio_file_t *fh, mca_io_ompio_aggregator_data *aggr_data, int write_chunksize, int write_synchType, - ompi_request_t **request ) + ompi_request_t **request, + bool is_accelerator_buffer) { int ret = OMPI_SUCCESS; ssize_t ret_temp = 0; @@ -770,11 +803,20 @@ static int write_init (ompio_file_t *fh, write_chunksize); if (1 == write_synchType) { - ret = fh->f_fbtl->fbtl_ipwritev(fh, (ompi_request_t *) ompio_req); - if(0 > ret) { - opal_output (1, "vulcan_write_all: fbtl_ipwritev failed\n"); - ompio_req->req_ompi.req_status.MPI_ERROR = ret; - ompio_req->req_ompi.req_status._ucount = 0; + if (is_accelerator_buffer) { + ret = mca_common_ompio_file_iwrite_pregen(fh, (ompi_request_t *) ompio_req); + if(0 > ret) { + opal_output (1, "vulcan_write_all: mca_common_ompio_iwrite_pregen failed\n"); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = 0; + } + } else { + ret = fh->f_fbtl->fbtl_ipwritev(fh, (ompi_request_t *) ompio_req); + if(0 > ret) { + opal_output (1, "vulcan_write_all: fbtl_ipwritev failed\n"); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = 0; + } } } else { diff --git a/ompi/mca/io/ompio/io_ompio.c b/ompi/mca/io/ompio/io_ompio.c index 2cc27e23ddc..506b6897e46 100644 --- a/ompi/mca/io/ompio/io_ompio.c +++ b/ompi/mca/io/ompio/io_ompio.c @@ -15,7 +15,7 @@ * Copyright (c) 2012-2013 Inria. All rights reserved. * Copyright (c) 2015-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. - * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. + * Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -556,6 +556,9 @@ int mca_io_ompio_get_mca_parameter_value ( char *mca_parameter_name, int name_le else if ( !strncmp ( mca_parameter_name, "coll_timing_info", name_length )) { return mca_io_ompio_coll_timing_info; } + else if ( !strncmp (mca_parameter_name, "use_accelerator_buffers", name_length)) { + return mca_io_ompio_use_accelerator_buffers; + } else { opal_output (1, "Error in mca_io_ompio_get_mca_parameter_value: unknown parameter name"); } diff --git a/ompi/mca/io/ompio/io_ompio.h b/ompi/mca/io/ompio/io_ompio.h index 272c4e4a0c6..d38bfb1acb3 100644 --- a/ompi/mca/io/ompio/io_ompio.h +++ b/ompi/mca/io/ompio/io_ompio.h @@ -14,7 +14,7 @@ * Copyright (c) 2015-2018 Research Organization for Information Science * and Technology (RIST). All rights reserved. * Copyright (c) 2016-2017 IBM Corporation. All rights reserved. - * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. + * Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. * $COPYRIGHT$ @@ -57,7 +57,7 @@ extern int mca_io_ompio_max_aggregators_ratio; extern int mca_io_ompio_aggregators_cutoff_threshold; extern int mca_io_ompio_overwrite_amode; extern int mca_io_ompio_verbose_info_parsing; - +extern int mca_io_ompio_use_accelerator_buffers; OMPI_DECLSPEC extern int mca_io_ompio_coll_timing_info; #define QUEUESIZE 2048 diff --git a/ompi/mca/io/ompio/io_ompio_component.c b/ompi/mca/io/ompio/io_ompio_component.c index a8151315f40..38c4e8c3105 100644 --- a/ompi/mca/io/ompio/io_ompio_component.c +++ b/ompi/mca/io/ompio/io_ompio_component.c @@ -17,7 +17,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2016-2017 IBM Corporation. All rights reserved. * Copyright (c) 2018 DataDirect Networks. All rights reserved. - * Copyright (c) 2022-2023 Advanced Micro Devices, Inc. All rights reserved. + * Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. * $COPYRIGHT$ @@ -49,7 +49,7 @@ int mca_io_ompio_max_aggregators_ratio=8; int mca_io_ompio_aggregators_cutoff_threshold=3; int mca_io_ompio_overwrite_amode = 1; int mca_io_ompio_verbose_info_parsing = 0; - +int mca_io_ompio_use_accelerator_buffers = 1; int mca_io_ompio_grouping_option=5; /* @@ -263,6 +263,14 @@ static int register_component(void) MCA_BASE_VAR_SCOPE_READONLY, &mca_io_ompio_verbose_info_parsing); + mca_io_ompio_use_accelerator_buffers = 1; + (void) mca_base_component_var_register(&mca_io_ompio_component.io_version, + "use_accelerator_buffers", "Allow using accelerator buffers" + "for data aggregation in collective I/O if input buffer is device memory", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, &mca_io_ompio_use_accelerator_buffers); + return OMPI_SUCCESS; }