From c74aabb228d3409ac1713f760fc5d53bb9b5543b Mon Sep 17 00:00:00 2001 From: Andre Vehreschild Date: Fri, 25 Oct 2024 16:10:40 +0200 Subject: [PATCH] WIP: Add get_by_ct call. --- CMakeLists.txt | 3 + src/application-binary-interface/libcaf.h | 9 ++ src/runtime-libraries/mpi/mpi_caf.c | 137 +++++++++++++++++++--- 3 files changed, 131 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 77b1cdf8..430b1e96 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -233,6 +233,9 @@ endif() if ( gfortran_compiler AND ( NOT CMAKE_Fortran_COMPILER_VERSION VERSION_LESS 8.0.0 ) ) add_definitions(-DGCC_GE_8) # Tell library to build against GFortran 8.x bindings w/ descriptor change endif() + if ( gfortran_compiler AND ( NOT CMAKE_Fortran_COMPILER_VERSION VERSION_LESS 14.0.0 ) ) + add_definitions(-DGCC_GE_15) # Tell library to build against GFortran 15.x bindings + endif() if(gfortran_compiler) set(OLD_REQUIRED_FLAGS ${CMAKE_REQUIRED_FLAGS}) diff --git a/src/application-binary-interface/libcaf.h b/src/application-binary-interface/libcaf.h index d88f1537..946c370c 100644 --- a/src/application-binary-interface/libcaf.h +++ b/src/application-binary-interface/libcaf.h @@ -261,6 +261,15 @@ void PREFIX(caf_sendget)(caf_token_t, size_t, int, gfc_descriptor_t *, gfc_descriptor_t *, caf_vector_t *, int, int, bool, int *); +#ifdef GCC_GE_15 +void PREFIX(get_by_ct)( + caf_token_t token, int image_index, size_t bufsize, + void *set_buf /*void (*set)(void *buffer, void *set_data)*/, void *set_data, + void (*get)(void **buffer, bool *free_buffer, void *base, void *get_data), + void *get_data, size_t get_data_size, int *stat, + caf_team_t *team __attribute__((unused)), + int *team_number __attribute__((unused))); +#endif #ifdef GCC_GE_8 void PREFIX(get_by_ref)(caf_token_t, int, gfc_descriptor_t *dst, caf_reference_t *refs, int dst_kind, int src_kind, diff --git a/src/runtime-libraries/mpi/mpi_caf.c b/src/runtime-libraries/mpi/mpi_caf.c index ea93818c..8400700d 100644 --- a/src/runtime-libraries/mpi/mpi_caf.c +++ b/src/runtime-libraries/mpi/mpi_caf.c @@ -253,7 +253,7 @@ typedef struct int dest_image; int dest_tag; int flags; - void (*access)(void *dst, void *base, void *data); + void (*access)(void **dst, bool *free_dst, void *base, void *data); char data[]; } ct_msg_t; @@ -436,6 +436,9 @@ communication_thread(void *) MPI_Status status; MPI_Message msg_han; MPI_Comm comm; + void *baseptr, *buffer; + int flag; + bool free_buffer; dprint("ct: Started.\n"); @@ -451,37 +454,41 @@ communication_thread(void *) { MPI_Get_count(&status, MPI_BYTE, &cnt); - ct_msg_t *msg; if (cnt >= sizeof(ct_msg_t)) { - msg = alloca(cnt); + ct_msg_t *msg = alloca(cnt); + ierr = MPI_Mrecv(msg, cnt, MPI_BYTE, &msg_han, &status); chk_err(ierr); dprint("ct: Received request of size %ld.\n", cnt); - void *bptr; - int flag; - ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &bptr, &flag); + ierr = MPI_Win_get_attr(msg->win, MPI_WIN_BASE, &baseptr, &flag); chk_err(ierr); - dprint("ct: Local base for win %ld is %p (set: %b).\n", msg->win, bptr, - flag); + dprint("ct: Local base for win %ld is %p (set: %b) Executing getter at " + "%p.\n", + msg->win, baseptr, flag, msg->access); if (!flag) { dprint("ct: Error: Window %p memory is not allocated.\n", msg->win); } + msg->access(&buffer, &free_buffer, baseptr, msg->data); + dprint("ct: getter executed.\n"); comm = (msg->flags & CT_INTER_CT) ? ct_COMM : CAF_COMM_WORLD; dprint("ct: Sending %ld bytes to image %d, tag %d on comm %x (%s).\n", msg->transfer_size, msg->dest_image, msg->dest_tag, comm, comm == CAF_COMM_WORLD ? "CAF_COMM_WORLD" : "ct_COMM"); - ierr = MPI_Send(bptr, msg->transfer_size, MPI_BYTE, msg->dest_image, + ierr = MPI_Send(buffer, msg->transfer_size, MPI_BYTE, msg->dest_image, msg->dest_tag, comm); chk_err(ierr); + if (free_buffer) + free(buffer); } else if (!commthread_running) { /* Pickup empty message. */ dprint("ct: Got termination message. Terminating.\n"); - ierr = MPI_Mrecv(&msg, cnt, MPI_BYTE, &msg_han, &status); + baseptr = NULL; + ierr = MPI_Mrecv(baseptr, cnt, MPI_BYTE, &msg_han, &status); chk_err(ierr); } else @@ -3669,6 +3676,13 @@ PREFIX(send)(caf_token_t token, size_t offset, int image_index, } } +void +get_access(void **dst, bool *dst_is_tmp, void *base, void *) +{ + *dst = base; + *dst_is_tmp = false; +} + /* Get array data from a remote src to a local dest. */ void @@ -3810,18 +3824,19 @@ PREFIX(get)(caf_token_t token, size_t offset, int image_index, { const size_t trans_size = ((dst_size > src_size) ? src_size : dst_size) * size; - ct_msg_t *buf = alloca(sizeof(ct_msg_t)); - buf->win = *p; - buf->transfer_size = trans_size; - buf->dest_image = mpi_this_image; - buf->dest_tag = CAF_CT_TAG + 1; - buf->flags = 0; - ierr = MPI_Send(buf, sizeof(ct_msg_t), MPI_BYTE, remote_image, + ct_msg_t *msg = alloca(sizeof(ct_msg_t)); + msg->win = *p; + msg->transfer_size = trans_size; + msg->dest_image = mpi_this_image; + msg->dest_tag = CAF_CT_TAG + 1; + msg->flags = 0; + msg->access = &get_access; + ierr = MPI_Send(msg, sizeof(ct_msg_t), MPI_BYTE, remote_image, CAF_CT_TAG, ct_COMM); chk_err(ierr); ierr = MPI_Recv(dest->base_addr, trans_size, MPI_BYTE, image_index - 1, - buf->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE); + msg->dest_tag, CAF_COMM_WORLD, MPI_STATUS_IGNORE); chk_err(ierr); // CAF_Win_lock(MPI_LOCK_SHARED, remote_image, *p); @@ -4890,6 +4905,92 @@ get_for_ref(caf_reference_t *ref, size_t *i, size_t dst_index, } } +#ifdef GCC_GE_15 +void +PREFIX(get_by_ct)( + caf_token_t token, int image_index, size_t bufsize, void *set_buf, + /*void (*set)(void *buffer, void *set_data), */ void *set_data, + void (*get)(void **buffer, bool *free_buffer, void *base, void *get_data), + void *get_data, size_t get_data_size, int *stat, + caf_team_t *team __attribute__((unused)), + int *team_number __attribute__((unused))) +{ + MPI_Group current_team_group, win_group; + int ierr, this_image, remote_image; + int trans_ranks[2]; + bool free_t_buff, free_msg; + void *t_buff; + ct_msg_t *msg; + const size_t msg_size = sizeof(ct_msg_t) + get_data_size; + + if (stat) + *stat = 0; + + // Get mapped remote image + ierr = MPI_Comm_group(CAF_COMM_WORLD, ¤t_team_group); + chk_err(ierr); + ierr = MPI_Win_get_group(*TOKEN(token), &win_group); + chk_err(ierr); + ierr = MPI_Group_translate_ranks(current_team_group, 2, + (int[]){image_index - 1, mpi_this_image}, + win_group, trans_ranks); + chk_err(ierr); + remote_image = trans_ranks[0]; + this_image = trans_ranks[1]; + ierr = MPI_Group_free(¤t_team_group); + chk_err(ierr); + ierr = MPI_Group_free(&win_group); + chk_err(ierr); + + check_image_health(remote_image, stat); + + dprint("Entering get_by_ct(), win_rank = %d, this_rank = %d, getter: %p.\n", + remote_image, this_image, get); + + // create get msg + if ((free_msg = (((msg = alloca(msg_size))) == NULL))) + { + msg = malloc(msg_size); + if (msg == NULL) + caf_runtime_error("Unable to allocate memory " + "for internal message in get_by_ct()."); + } + msg->win = *TOKEN(token); + msg->transfer_size = bufsize; + msg->dest_image = mpi_this_image; + msg->dest_tag = CAF_CT_TAG + 1; + msg->flags = 0; + msg->access = get; + memcpy(msg->data, get_data, get_data_size); + + // call get on remote + ierr = MPI_Send(msg, msg_size, MPI_BYTE, remote_image, CAF_CT_TAG, ct_COMM); + chk_err(ierr); + + // allocate local buffer + if ((free_t_buff = (((t_buff = alloca(bufsize))) == NULL))) + { + t_buff = malloc(bufsize); + if (t_buff == NULL) + caf_runtime_error("Unable to allocate memory " + "for internal buffer in get_by_ct()."); + } + ierr = MPI_Recv(t_buff, bufsize, MPI_BYTE, image_index - 1, msg->dest_tag, + CAF_COMM_WORLD, MPI_STATUS_IGNORE); + chk_err(ierr); + + // set (buffer, set_data) + memcpy(set_buf, t_buff, bufsize); + // set(t_buff, set_data); + + // free (buffer) + if (free_msg) + free(msg); + if (free_t_buff) + free(t_buff); +} +#endif + void PREFIX(get_by_ref)(caf_token_t token, int image_index, gfc_descriptor_t *dst, caf_reference_t *refs, int dst_kind, int src_kind,