/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ /* * (C) 2001 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */ /* Copyright (c) 2001-2022, The Ohio State University. All rights * reserved. * * This file is part of the MVAPICH2 software package developed by the * team members of The Ohio State University's Network-Based Computing * Laboratory (NBCL), headed by Professor Dhabaleswar K. (DK) Panda. * * For detailed copyright and licensing information, please refer to the * copyright file COPYRIGHT in the top level MVAPICH2 directory. * */ #if !defined(MPID_RMA_H_INCLUDED) #define MPID_RMA_H_INCLUDED #include "mpid_rma_types.h" #include "mpid_rma_oplist.h" #include "mpid_rma_shm.h" #include "mpid_rma_issue.h" #include "mpid_rma_lockqueue.h" /* TODO: Hari - see if we want to remove these completely */ #include "mpl_utlist.h" #include "mpidi_ch3_impl.h" #include #if defined(CHANNEL_MRAIL) #define MPIDI_CH3I_SHM_win_mutex_lock(win, rank) pthread_mutex_lock(&win->shm_win_mutex[rank]); #define MPIDI_CH3I_SHM_win_mutex_unlock(win, rank) pthread_mutex_unlock(&win->shm_win_mutex[rank]); #endif MPIR_T_PVAR_DOUBLE_TIMER_DECL_EXTERN(RMA, rma_wincreate_allgather); MPIR_T_PVAR_DOUBLE_TIMER_DECL_EXTERN(RMA, rma_winfree_rs); MPIR_T_PVAR_DOUBLE_TIMER_DECL_EXTERN(RMA, rma_winfree_complete); /* Routine use to tune RMA optimizations */ #if defined(CHANNEL_MRAIL) void MPIDI_CH3I_RDMA_win_create(void *base, MPI_Aint size, int comm_size, int rank, MPID_Win ** win_ptr, MPID_Comm * comm_ptr); void MPIDI_CH3I_RDMA_win_free(MPID_Win ** win_ptr); void MPIDI_CH3I_RDMA_start(MPID_Win * win_ptr, int start_grp_size, int *ranks_in_win_grp); void MPIDI_CH3I_RDMA_try_rma(MPID_Win * win_ptr, MPIDI_RMA_Target_t * target); int MPIDI_CH3I_RDMA_try_rma_op_fast( int type, void *origin_addr, int origin_count, MPI_Datatype origin_datatype, int target_rank, MPI_Aint target_disp, int target_count, MPI_Datatype target_datatype, void *compare_addr, void *result_addr, MPID_Win *win_ptr); int MPIDI_CH3I_RDMA_post(MPID_Win * win_ptr, int target_rank); int MPIDI_CH3I_RDMA_finish_rma(MPID_Win * win_ptr); int MPIDI_CH3I_RDMA_finish_rma_target(MPID_Win *win_ptr, int target_rank); int MPIDI_CH3I_RDMA_set_CC(MPID_Win *, int target_rank); int MPIDI_CH3I_barrier_in_rma(MPID_Win **win_ptr, int rank, int node_size, int comm_size); void mv2_init_rank_for_barrier (MPID_Win ** win_ptr); #endif /* defined(CHANNEL_MRAIL) */ #undef FUNCNAME #define FUNCNAME send_lock_msg #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int send_lock_msg(int dest, int lock_type, MPID_Win * win_ptr) { int mpi_errno = MPI_SUCCESS; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_lock_t *lock_pkt = &upkt.lock; MPID_Request *req = NULL; MPIDI_VC_t *vc; #if defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) MPID_Seqnum_t seqnum; #endif /* defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) */ MPIDI_STATE_DECL(MPID_STATE_SEND_LOCK_MSG); MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_LOCK_MSG); MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc); MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK); lock_pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle; lock_pkt->source_win_handle = win_ptr->handle; lock_pkt->request_handle = MPI_REQUEST_NULL; lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE; if (lock_type == MPI_LOCK_SHARED) lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED; else { MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE); lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE; } #if defined (CHANNEL_PSM) lock_pkt->source_rank = win_ptr->comm_ptr->rank; lock_pkt->target_rank = dest; lock_pkt->mapped_srank = win_ptr->rank_mapping[win_ptr->comm_ptr->rank]; lock_pkt->mapped_trank = win_ptr->rank_mapping[dest]; #endif /* CHANNEL_PSM */ #if defined(CHANNEL_MRAIL) MPIDI_VC_FAI_send_seqnum(vc, seqnum); MPIDI_Pkt_set_seqnum(lock_pkt, seqnum); #endif /* defined(CHANNEL_MRAIL) */ MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_pkt, sizeof(*lock_pkt), &req); MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg"); /* release the request returned by iStartMsg */ if (req != NULL) { MPID_Request_release(req); } fn_exit: MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_LOCK_MSG); return mpi_errno; /* --BEGIN ERROR HANDLING-- */ fn_fail: goto fn_exit; /* --END ERROR HANDLING-- */ } #undef FUNCNAME #define FUNCNAME send_unlock_msg #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int send_unlock_msg(int dest, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags) { int mpi_errno = MPI_SUCCESS; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_unlock_t *unlock_pkt = &upkt.unlock; MPID_Request *req = NULL; MPIDI_VC_t *vc; #if defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) MPID_Seqnum_t seqnum; #endif /* defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) */ MPIDI_STATE_DECL(MPID_STATE_SEND_UNLOCK_MSG); MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_UNLOCK_MSG); MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc); /* Send a lock packet over to the target. wait for the lock_granted * reply. Then do all the RMA ops. */ MPIDI_Pkt_init(unlock_pkt, MPIDI_CH3_PKT_UNLOCK); unlock_pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle; unlock_pkt->source_win_handle = win_ptr->handle; unlock_pkt->flags = flags; #if defined (CHANNEL_PSM) unlock_pkt->source_rank = win_ptr->comm_ptr->rank; unlock_pkt->mapped_trank = win_ptr->rank_mapping[dest]; unlock_pkt->mapped_srank = win_ptr->rank_mapping[win_ptr->comm_ptr->rank]; #endif #if defined(CHANNEL_MRAIL) MPIDI_VC_FAI_send_seqnum(vc, seqnum); MPIDI_Pkt_set_seqnum(unlock_pkt, seqnum); #endif /* defined(CHANNEL_MRAIL) */ MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); mpi_errno = MPIDI_CH3_iStartMsg(vc, unlock_pkt, sizeof(*unlock_pkt), &req); MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg"); /* Release the request returned by iStartMsg */ if (req != NULL) { MPID_Request_release(req); } fn_exit: MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_UNLOCK_MSG); return mpi_errno; /* --BEGIN ERROR HANDLING-- */ fn_fail: goto fn_exit; /* --END ERROR HANDLING-- */ } #undef FUNCNAME #define FUNCNAME MPIDI_CH3I_Send_lock_ack_pkt #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int MPIDI_CH3I_Send_lock_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags, MPI_Win source_win_handle, MPI_Request request_handle) { MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_lock_ack_t *lock_ack_pkt = &upkt.lock_ack; MPID_Request *req = NULL; int mpi_errno; #if defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) MPID_Seqnum_t seqnum; #endif /* defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) */ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT); MPIU_Assert(!(source_win_handle != MPI_WIN_NULL && request_handle != MPI_REQUEST_NULL)); /* send lock ack packet */ MPIDI_Pkt_init(lock_ack_pkt, MPIDI_CH3_PKT_LOCK_ACK); lock_ack_pkt->source_win_handle = source_win_handle; lock_ack_pkt->request_handle = request_handle; lock_ack_pkt->target_rank = win_ptr->comm_ptr->rank; lock_ack_pkt->flags = flags; #if defined (CHANNEL_PSM) MPID_Win *winptr; MPID_Comm *commptr; MPID_Win_get_ptr(source_win_handle, winptr); commptr = winptr->comm_ptr; lock_ack_pkt->source_rank = commptr->rank; lock_ack_pkt->mapped_srank = winptr->rank_mapping[commptr->rank]; #endif #if defined(CHANNEL_MRAIL) MPIDI_VC_FAI_send_seqnum(vc, seqnum); MPIDI_Pkt_set_seqnum(lock_ack_pkt, seqnum); #endif /* defined(CHANNEL_MRAIL) */ MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE, (MPIU_DBG_FDEST, "sending lock ack pkt on vc=%p, source_win_handle=%#08x", vc, lock_ack_pkt->source_win_handle)); MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_ack_pkt, sizeof(*lock_ack_pkt), &req); MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); if (mpi_errno) { MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg"); } if (req != NULL) { MPID_Request_release(req); } fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT); return mpi_errno; } #undef FUNCNAME #define FUNCNAME MPIDI_CH3I_Send_lock_op_ack_pkt #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int MPIDI_CH3I_Send_lock_op_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags, MPI_Win source_win_handle, MPI_Request request_handle) { MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_lock_op_ack_t *lock_op_ack_pkt = &upkt.lock_op_ack; MPID_Request *req = NULL; int mpi_errno; #if defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) MPID_Seqnum_t seqnum; #endif /* defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) */ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT); MPIU_Assert(!(source_win_handle != MPI_WIN_NULL && request_handle != MPI_REQUEST_NULL)); /* send lock ack packet */ MPIDI_Pkt_init(lock_op_ack_pkt, MPIDI_CH3_PKT_LOCK_OP_ACK); lock_op_ack_pkt->source_win_handle = source_win_handle; lock_op_ack_pkt->request_handle = request_handle; lock_op_ack_pkt->target_rank = win_ptr->comm_ptr->rank; lock_op_ack_pkt->flags = flags; #if defined (CHANNEL_PSM) lock_op_ack_pkt->trank = vc->pg_rank; /* PSM channel needs to use the global rank */ lock_op_ack_pkt->mapped_trank = vc->pg_rank; #endif #if defined(CHANNEL_MRAIL) MPIDI_VC_FAI_send_seqnum(vc, seqnum); MPIDI_Pkt_set_seqnum(lock_op_ack_pkt, seqnum); #endif /* defined(CHANNEL_MRAIL) */ MPIU_DBG_MSG_FMT(CH3_OTHER, VERBOSE, (MPIU_DBG_FDEST, "sending lock op ack pkt on vc=%p, source_win_handle=%#08x", vc, lock_op_ack_pkt->source_win_handle)); MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); mpi_errno = MPIDI_CH3_iStartMsg(vc, lock_op_ack_pkt, sizeof(*lock_op_ack_pkt), &req); MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); if (mpi_errno) { MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg"); } if (req != NULL) { MPID_Request_release(req); } fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT); return mpi_errno; } #undef FUNCNAME #define FUNCNAME MPIDI_CH3I_Send_ack_pkt #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int MPIDI_CH3I_Send_ack_pkt(MPIDI_VC_t * vc, MPID_Win * win_ptr, MPI_Win source_win_handle) { MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_ack_t *ack_pkt = &upkt.ack; MPID_Request *req; int mpi_errno = MPI_SUCCESS; #if defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) MPID_Seqnum_t seqnum; #endif /* defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) */ MPIDI_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT); MPIDI_FUNC_ENTER(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT); MPIDI_Pkt_init(ack_pkt, MPIDI_CH3_PKT_ACK); ack_pkt->source_win_handle = source_win_handle; ack_pkt->target_rank = win_ptr->comm_ptr->rank; #if defined(CHANNEL_MRAIL) MPIDI_VC_FAI_send_seqnum(vc, seqnum); MPIDI_Pkt_set_seqnum(ack_pkt, seqnum); #endif /* defined(CHANNEL_MRAIL) */ #if defined (CHANNEL_PSM) MPID_Win *winptr; MPID_Comm *commptr; MPID_Win_get_ptr(source_win_handle, winptr); commptr = winptr->comm_ptr; ack_pkt->source_rank = commptr->rank; ack_pkt->mapped_srank = winptr->rank_mapping[commptr->rank]; #endif /* Because this is in a packet handler, it is already within a critical section */ /* MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); */ mpi_errno = MPIDI_CH3_iStartMsg(vc, ack_pkt, sizeof(*ack_pkt), &req); /* MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); */ if (mpi_errno != MPI_SUCCESS) { MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg"); } if (req != NULL) { MPID_Request_release(req); } fn_fail: MPIDI_FUNC_EXIT(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT); return mpi_errno; } #undef FUNCNAME #define FUNCNAME send_decr_at_cnt_msg #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int send_decr_at_cnt_msg(int dst, MPID_Win * win_ptr, MPIDI_CH3_Pkt_flags_t flags) { MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_decr_at_counter_t *decr_at_cnt_pkt = &upkt.decr_at_cnt; MPIDI_VC_t *vc; MPID_Request *request = NULL; int mpi_errno = MPI_SUCCESS; #if defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) MPID_Seqnum_t seqnum; #endif /* defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) */ MPIDI_STATE_DECL(MPID_STATE_SEND_DECR_AT_CNT_MSG); MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_DECR_AT_CNT_MSG); MPIDI_Pkt_init(decr_at_cnt_pkt, MPIDI_CH3_PKT_DECR_AT_COUNTER); decr_at_cnt_pkt->target_win_handle = win_ptr->basic_info_table[dst].win_handle; decr_at_cnt_pkt->source_win_handle = win_ptr->handle; decr_at_cnt_pkt->flags = flags; MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dst, &vc); #if defined (CHANNEL_PSM) decr_at_cnt_pkt->target_rank = dst; MPID_Win *winptr; MPID_Comm *commptr; MPID_Win_get_ptr(decr_at_cnt_pkt->source_win_handle, winptr); commptr = winptr->comm_ptr; decr_at_cnt_pkt->source_rank = commptr->rank; decr_at_cnt_pkt->mapped_srank = winptr->rank_mapping[commptr->rank]; decr_at_cnt_pkt->mapped_trank = winptr->rank_mapping[dst]; #endif #if defined(CHANNEL_MRAIL) MPIDI_VC_FAI_send_seqnum(vc, seqnum); MPIDI_Pkt_set_seqnum(decr_at_cnt_pkt, seqnum); #endif /* defined(CHANNEL_MRAIL) */ MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); mpi_errno = MPIDI_CH3_iStartMsg(vc, decr_at_cnt_pkt, sizeof(*decr_at_cnt_pkt), &request); MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); if (mpi_errno != MPI_SUCCESS) { MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg"); } if (request != NULL) { MPID_Request_release(request); } fn_exit: MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_DECR_AT_CNT_MSG); return mpi_errno; /* --BEGIN ERROR HANDLING-- */ fn_fail: goto fn_exit; /* --END ERROR HANDLING-- */ } #undef FUNCNAME #define FUNCNAME send_flush_msg #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int send_flush_msg(int dest, MPID_Win * win_ptr) { int mpi_errno = MPI_SUCCESS; MPIDI_CH3_Pkt_t upkt; MPIDI_CH3_Pkt_flush_t *flush_pkt = &upkt.flush; MPID_Request *req = NULL; MPIDI_VC_t *vc; MPIDI_STATE_DECL(MPID_STATE_SEND_FLUSH_MSG); MPIDI_RMA_FUNC_ENTER(MPID_STATE_SEND_FLUSH_MSG); #if defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) MPID_Seqnum_t seqnum; #endif /* defined(CHANNEL_MRAIL) && defined(MPID_USE_SEQUENCE_NUMBERS) */ MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &vc); MPIDI_Pkt_init(flush_pkt, MPIDI_CH3_PKT_FLUSH); flush_pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle; flush_pkt->source_win_handle = win_ptr->handle; #if defined(CHANNEL_MRAIL) MPIDI_VC_FAI_send_seqnum(vc, seqnum); MPIDI_Pkt_set_seqnum(flush_pkt, seqnum); #endif /* defined(CHANNEL_MRAIL) */ MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); mpi_errno = MPIDI_CH3_iStartMsg(vc, flush_pkt, sizeof(*flush_pkt), &req); MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg"); /* Release the request returned by iStartMsg */ if (req != NULL) { MPID_Request_release(req); } fn_exit: MPIDI_RMA_FUNC_EXIT(MPID_STATE_SEND_FLUSH_MSG); return mpi_errno; /* --BEGIN ERROR HANDLING-- */ fn_fail: goto fn_exit; /* --END ERROR HANDLING-- */ } /* enqueue an unsatisfied origin in passive target at target side. */ static inline int enqueue_lock_origin(MPID_Win * win_ptr, MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, void * data, MPIDI_msg_sz_t * buflen, MPID_Request ** reqp) { MPIDI_RMA_Target_lock_entry_t *new_ptr = NULL; MPIDI_CH3_Pkt_flags_t flag; MPI_Win source_win_handle; MPI_Request request_handle; int lock_discarded = 0, data_discarded = 0; int mpi_errno = MPI_SUCCESS; (*reqp) = NULL; new_ptr = MPIDI_CH3I_Win_target_lock_entry_alloc(win_ptr, pkt); if (new_ptr != NULL) { MPIDI_RMA_Target_lock_entry_t **head_ptr = (MPIDI_RMA_Target_lock_entry_t **) (&(win_ptr->target_lock_queue_head)); MPL_DL_APPEND((*head_ptr), new_ptr); new_ptr->vc = vc; } else { lock_discarded = 1; } if (MPIDI_CH3I_RMA_PKT_IS_IMMED_OP(*pkt) || pkt->type == MPIDI_CH3_PKT_LOCK || pkt->type == MPIDI_CH3_PKT_GET) { /* return bytes of data processed in this pkt handler */ (*buflen) = sizeof(MPIDI_CH3_Pkt_t); if (new_ptr != NULL) new_ptr->all_data_recved = 1; goto issue_ack; } else { MPI_Aint type_size = 0; MPI_Aint type_extent; MPIDI_msg_sz_t recv_data_sz = 0; MPIDI_msg_sz_t buf_size = 0; MPID_Request *req = NULL; MPI_Datatype target_dtp; int target_count; int complete = 0; MPIDI_msg_sz_t data_len; MPIDI_CH3_Pkt_flags_t flags; /* This is PUT, ACC, GACC, FOP */ MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno); MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno); MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno); MPID_Datatype_get_extent_macro(target_dtp, type_extent); MPID_Datatype_get_size_macro(target_dtp, type_size); if (pkt->type == MPIDI_CH3_PKT_PUT) { recv_data_sz = type_size * target_count; buf_size = type_extent * target_count; } else { MPI_Aint stream_elem_count; MPI_Aint total_len; MPI_Op op; MPIDI_CH3_PKT_RMA_GET_OP((*pkt), op, mpi_errno); if (op != MPI_NO_OP) { stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent; total_len = type_size * target_count; recv_data_sz = MPIR_MIN(total_len, type_size * stream_elem_count); buf_size = type_extent * (recv_data_sz / type_size); } } if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) { MPIU_Assert(pkt->type == MPIDI_CH3_PKT_ACCUMULATE || pkt->type == MPIDI_CH3_PKT_GET_ACCUM); /* Only basic datatype may contain piggyback lock. * Thus we do not check extended header type for derived case.*/ if (pkt->type == MPIDI_CH3_PKT_ACCUMULATE) { recv_data_sz += sizeof(MPIDI_CH3_Ext_pkt_accum_stream_t); buf_size += sizeof(MPIDI_CH3_Ext_pkt_accum_stream_t); } else { recv_data_sz += sizeof(MPIDI_CH3_Ext_pkt_get_accum_stream_t); buf_size += sizeof(MPIDI_CH3_Ext_pkt_get_accum_stream_t); } } if (new_ptr != NULL) { if (win_ptr->current_target_lock_data_bytes + buf_size < MPIR_CVAR_CH3_RMA_TARGET_LOCK_DATA_BYTES) { new_ptr->data = MPIU_Malloc(buf_size); } if (new_ptr->data == NULL) { /* Note that there are two possible reasons to make new_ptr->data to be NULL: * (1) win_ptr->current_target_lock_data_bytes + buf_size >= MPIR_CVAR_CH3_RMA_TARGET_LOCK_DATA_BYTES; * (2) MPIU_Malloc(buf_size) failed. * In such cases, we cannot allocate memory for lock data, so we give up * buffering lock data, however, we still buffer lock request. */ MPIDI_CH3_Pkt_t new_pkt; MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock; MPI_Win target_win_handle; MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno); if (!MPIDI_CH3I_RMA_PKT_IS_READ_OP(*pkt)) { MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno); request_handle = MPI_REQUEST_NULL; } else { source_win_handle = MPI_WIN_NULL; MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno); } MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK); lock_pkt->target_win_handle = target_win_handle; lock_pkt->source_win_handle = source_win_handle; lock_pkt->request_handle = request_handle; lock_pkt->flags = flags; /* replace original pkt with lock pkt */ new_ptr->pkt = new_pkt; new_ptr->all_data_recved = 1; data_discarded = 1; } else { win_ptr->current_target_lock_data_bytes += buf_size; new_ptr->buf_size = buf_size; } } /* create request to receive upcoming requests */ req = MPID_Request_create(); MPIU_Object_set_ref(req, 1); /* fill in area in req that will be used in Receive_data_found() */ if (lock_discarded || data_discarded) { req->dev.drop_data = TRUE; req->dev.user_buf = NULL; req->dev.user_count = target_count; req->dev.datatype = target_dtp; req->dev.recv_data_sz = recv_data_sz; req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete; req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete; req->dev.target_lock_queue_entry = new_ptr; data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t); MPIU_Assert(req->dev.recv_data_sz >= 0); } else { req->dev.user_buf = new_ptr->data; req->dev.user_count = target_count; req->dev.datatype = target_dtp; req->dev.recv_data_sz = recv_data_sz; req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete; req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete; req->dev.target_lock_queue_entry = new_ptr; data_len = *buflen - sizeof(MPIDI_CH3_Pkt_t); MPIU_Assert(req->dev.recv_data_sz >= 0); } mpi_errno = MPIDI_CH3U_Receive_data_found(req, data, &data_len, &complete); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); /* return bytes of data processed in this pkt handler */ (*buflen) = sizeof(MPIDI_CH3_Pkt_t) + data_len; if (complete) { mpi_errno = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(vc, req, &complete); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); if (complete) { goto issue_ack; } } (*reqp) = req; } issue_ack: if (pkt->type == MPIDI_CH3_PKT_LOCK) { if (lock_discarded) flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED; else flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED; MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno); MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno); mpi_errno = MPIDI_CH3I_Send_lock_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); } else { if (lock_discarded) flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED; else if (data_discarded) flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED; else flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED; if (!MPIDI_CH3I_RMA_PKT_IS_READ_OP(*pkt)) { MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno); request_handle = MPI_REQUEST_NULL; } else { source_win_handle = MPI_WIN_NULL; MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno); } mpi_errno = MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); } fn_exit: return mpi_errno; fn_fail: goto fn_exit; } static inline int handle_lock_ack(MPID_Win * win_ptr, int target_rank, MPIDI_CH3_Pkt_flags_t flags) { MPIDI_RMA_Target_t *t = NULL; int mpi_errno = MPI_SUCCESS; MPIU_Assert(win_ptr->states.access_state == MPIDI_RMA_PER_TARGET || win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED || win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED); if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) { MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL; MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc); MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc); if (win_ptr->comm_ptr->rank == target_rank || (win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id)) { if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) { win_ptr->outstanding_locks--; MPIU_Assert(win_ptr->outstanding_locks >= 0); } else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) { /* re-send lock request message. */ mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); } goto fn_exit; } } else if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED) { if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) { win_ptr->outstanding_locks--; MPIU_Assert(win_ptr->outstanding_locks >= 0); if (win_ptr->outstanding_locks == 0) { win_ptr->states.access_state = MPIDI_RMA_LOCK_ALL_GRANTED; if (win_ptr->num_targets_with_pending_net_ops) { mpi_errno = MPIDI_CH3I_Win_set_active(win_ptr); if (mpi_errno != MPI_SUCCESS) { MPIR_ERR_POP(mpi_errno); } } } } else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) { /* re-send lock request message. */ mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); } goto fn_exit; } mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); MPIU_Assert(t != NULL); if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) { t->access_state = MPIDI_RMA_LOCK_GRANTED; if (t->pending_net_ops_list_head) MPIDI_CH3I_Win_set_active(win_ptr); } if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_GRANTED || t->access_state == MPIDI_RMA_LOCK_GRANTED) { if (t->pending_net_ops_list_head == NULL) { int made_progress ATTRIBUTE((unused)) = 0; mpi_errno = MPIDI_CH3I_RMA_Make_progress_target(win_ptr, t->target_rank, &made_progress); if (mpi_errno != MPI_SUCCESS) { MPIR_ERR_POP(mpi_errno); } } } if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) t->access_state = MPIDI_RMA_LOCK_CALLED; fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME check_and_set_req_completion #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int check_and_set_req_completion(MPID_Win * win_ptr, MPIDI_RMA_Target_t * target, MPIDI_RMA_Op_t * rma_op, int *op_completed) { int i, mpi_errno = MPI_SUCCESS; int incomplete_req_cnt = 0; MPID_Request **req = NULL; MPIDI_STATE_DECL(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION); MPIDI_RMA_FUNC_ENTER(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION); (*op_completed) = FALSE; for (i = 0; i < rma_op->reqs_size; i++) { if (rma_op->reqs_size == 1) req = &(rma_op->single_req); else req = &(rma_op->multi_reqs[i]); if ((*req) == NULL) continue; if (MPID_Request_is_complete((*req))) { MPID_Request_release((*req)); (*req) = NULL; } else { (*req)->request_completed_cb = MPIDI_CH3_Req_handler_rma_op_complete; (*req)->dev.source_win_handle = win_ptr->handle; (*req)->dev.rma_target_ptr = target; incomplete_req_cnt++; if (rma_op->ureq != NULL) { MPID_cc_set(&(rma_op->ureq->cc), incomplete_req_cnt); (*req)->dev.request_handle = rma_op->ureq->handle; } MPID_Request_release((*req)); } } if (incomplete_req_cnt == 0) { if (rma_op->ureq != NULL) { mpi_errno = MPID_Request_complete(rma_op->ureq); if (mpi_errno != MPI_SUCCESS) { MPIR_ERR_POP(mpi_errno); } } (*op_completed) = TRUE; } else { MPIDI_CH3I_RMA_Active_req_cnt += incomplete_req_cnt; target->num_pkts_wait_for_local_completion += incomplete_req_cnt; } MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_net_ops_list_head), rma_op); if (target->pending_net_ops_list_head == NULL) { win_ptr->num_targets_with_pending_net_ops--; MPIU_Assert(win_ptr->num_targets_with_pending_net_ops >= 0); if (win_ptr->num_targets_with_pending_net_ops == 0) { MPIDI_CH3I_Win_set_inactive(win_ptr); } } fn_exit: MPIDI_RMA_FUNC_EXIT(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION); return mpi_errno; fn_fail: goto fn_exit; } static inline int handle_lock_ack_with_op(MPID_Win * win_ptr, int target_rank, MPIDI_CH3_Pkt_flags_t flags) { MPIDI_RMA_Target_t *target = NULL; MPIDI_RMA_Op_t *op = NULL; MPIDI_CH3_Pkt_flags_t op_flags = MPIDI_CH3_PKT_FLAG_NONE; int op_completed ATTRIBUTE((unused)) = FALSE; int mpi_errno = MPI_SUCCESS; mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); MPIU_Assert(target != NULL); /* Here the next_op_to_issue pointer should still point to the OP piggybacked * with LOCK */ op = target->next_op_to_issue; MPIU_Assert(op != NULL); MPIDI_CH3_PKT_RMA_GET_FLAGS(op->pkt, op_flags, mpi_errno); MPIU_Assert(op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED || op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE); if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) { if ((op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE || op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM) && op->issued_stream_count != ALL_STREAM_UNITS_ISSUED) { /* Now we successfully issue out the first stream unit, * keep next_op_to_issue still stick to the current op * since we need to issue the following stream units. */ goto fn_exit; } /* We are done with the current operation, make next_op_to_issue points to the * next operation. */ target->next_op_to_issue = op->next; if (target->next_op_to_issue == NULL) { if (((target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) && (op_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)) || ((target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) && (op_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))) { /* We are done with ending sync, unset target's sync_flag. */ target->sync.sync_flag = MPIDI_RMA_SYNC_NONE; } } check_and_set_req_completion(win_ptr, target, op, &op_completed); } else if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED || flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) { /* We need to re-transmit this operation, so we destroy * the internal request and erase all flags in current * operation. */ if (op->reqs_size == 1) { MPIU_Assert(op->single_req != NULL); MPID_Request_release(op->single_req); op->single_req = NULL; op->reqs_size = 0; } else if (op->reqs_size > 1) { MPIU_Assert(op->multi_reqs != NULL && op->multi_reqs[0] != NULL); MPID_Request_release(op->multi_reqs[0]); /* free req array in this op */ MPIU_Free(op->multi_reqs); op->multi_reqs = NULL; op->reqs_size = 0; } MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno); op->issued_stream_count = 0; } fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME acquire_local_lock #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int acquire_local_lock(MPID_Win * win_ptr, int lock_type) { int mpi_errno = MPI_SUCCESS; MPIDI_STATE_DECL(MPID_STATE_ACQUIRE_LOCAL_LOCK); MPIDI_RMA_FUNC_ENTER(MPID_STATE_ACQUIRE_LOCAL_LOCK); MPIR_T_PVAR_TIMER_START(RMA, rma_winlock_getlocallock); if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 1) { mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank, MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED); if (mpi_errno) MPIR_ERR_POP(mpi_errno); } else { /* Queue the lock information. */ MPIDI_CH3_Pkt_t pkt; MPIDI_CH3_Pkt_lock_t *lock_pkt = &pkt.lock; MPIDI_RMA_Target_lock_entry_t *new_ptr = NULL; MPIDI_VC_t *my_vc; MPIDI_RMA_Target_lock_entry_t **head_ptr = (MPIDI_RMA_Target_lock_entry_t **) (&(win_ptr->target_lock_queue_head)); MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK); lock_pkt->flags = MPIDI_CH3_PKT_FLAG_NONE; if (lock_type == MPI_LOCK_SHARED) lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED; else { MPIU_Assert(lock_type == MPI_LOCK_EXCLUSIVE); lock_pkt->flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE; } new_ptr = MPIDI_CH3I_Win_target_lock_entry_alloc(win_ptr, &pkt); if (new_ptr == NULL) { mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank, MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); goto fn_exit; } MPL_DL_APPEND((*head_ptr), new_ptr); MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc); new_ptr->vc = my_vc; new_ptr->all_data_recved = 1; } fn_exit: MPIR_T_PVAR_TIMER_END(RMA, rma_winlock_getlocallock); MPIDI_RMA_FUNC_EXIT(MPID_STATE_ACQUIRE_LOCAL_LOCK); return mpi_errno; /* --BEGIN ERROR HANDLING-- */ fn_fail: goto fn_exit; /* --END ERROR HANDLING-- */ } #undef FUNCNAME #define FUNCNAME MPIDI_CH3I_RMA_Handle_ack #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int MPIDI_CH3I_RMA_Handle_ack(MPID_Win * win_ptr, int target_rank) { int mpi_errno = MPI_SUCCESS; MPIDI_RMA_Target_t *t; mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); t->sync.outstanding_acks--; MPIU_Assert(t->sync.outstanding_acks >= 0); win_ptr->outstanding_acks--; MPIU_Assert(win_ptr->outstanding_acks >= 0); fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME do_accumulate_op #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static inline int do_accumulate_op(void *source_buf, int source_count, MPI_Datatype source_dtp, void *target_buf, int target_count, MPI_Datatype target_dtp, MPI_Aint stream_offset, MPI_Op acc_op) { int mpi_errno = MPI_SUCCESS; MPI_User_function *uop = NULL; MPI_Aint source_dtp_size = 0, source_dtp_extent = 0; int is_empty_source = FALSE; MPIDI_STATE_DECL(MPID_STATE_DO_ACCUMULATE_OP); MPIDI_FUNC_ENTER(MPID_STATE_DO_ACCUMULATE_OP); /* first Judge if source buffer is empty */ if (acc_op == MPI_NO_OP) is_empty_source = TRUE; if (is_empty_source == FALSE) { MPIU_Assert(MPIR_DATATYPE_IS_PREDEFINED(source_dtp)); MPID_Datatype_get_size_macro(source_dtp, source_dtp_size); MPID_Datatype_get_extent_macro(source_dtp, source_dtp_extent); } if (HANDLE_GET_KIND(acc_op) == HANDLE_KIND_BUILTIN) { /* get the function by indexing into the op table */ uop = MPIR_OP_HDL_TO_FN(acc_op); } else { /* --BEGIN ERROR HANDLING-- */ mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OP, "**opnotpredefined", "**opnotpredefined %d", acc_op); return mpi_errno; /* --END ERROR HANDLING-- */ } if (is_empty_source == TRUE || MPIR_DATATYPE_IS_PREDEFINED(target_dtp)) { /* directly apply op if target dtp is predefined dtp OR source buffer is empty */ MPI_Aint real_stream_offset; void *curr_target_buf; if (is_empty_source == FALSE) { MPIU_Assert(source_dtp == target_dtp); real_stream_offset = (stream_offset / source_dtp_size) * source_dtp_extent; curr_target_buf = (void *) ((char *) target_buf + real_stream_offset); } else { curr_target_buf = target_buf; } (*uop) (source_buf, curr_target_buf, &source_count, &source_dtp); } else { /* derived datatype */ MPID_Segment *segp; DLOOP_VECTOR *dloop_vec; MPI_Aint first, last; int vec_len, i, count; MPI_Aint type_extent, type_size; MPI_Datatype type; MPID_Datatype *dtp; MPI_Aint curr_len; void *curr_loc; int accumulated_count; segp = MPID_Segment_alloc(); /* --BEGIN ERROR HANDLING-- */ if (!segp) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP); return mpi_errno; } /* --END ERROR HANDLING-- */ MPID_Segment_init(NULL, target_count, target_dtp, segp, 0); first = stream_offset; last = first + source_count * source_dtp_size; MPID_Datatype_get_ptr(target_dtp, dtp); vec_len = dtp->max_contig_blocks * target_count + 1; /* +1 needed because Rob says so */ dloop_vec = (DLOOP_VECTOR *) MPIU_Malloc(vec_len * sizeof(DLOOP_VECTOR)); /* --BEGIN ERROR HANDLING-- */ if (!dloop_vec) { mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**nomem", 0); MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP); return mpi_errno; } /* --END ERROR HANDLING-- */ MPID_Segment_pack_vector(segp, first, &last, dloop_vec, &vec_len); type = dtp->basic_type; MPIU_Assert(type != MPI_DATATYPE_NULL); MPIU_Assert(type == source_dtp); type_size = source_dtp_size; type_extent = source_dtp_extent; i = 0; curr_loc = dloop_vec[0].DLOOP_VECTOR_BUF; curr_len = dloop_vec[0].DLOOP_VECTOR_LEN; accumulated_count = 0; while (i != vec_len) { if (curr_len < type_size) { MPIU_Assert(i != vec_len); i++; curr_len += dloop_vec[i].DLOOP_VECTOR_LEN; continue; } MPIU_Assign_trunc(count, curr_len / type_size, int); (*uop) ((char *) source_buf + type_extent * accumulated_count, (char *) target_buf + MPIU_PtrToAint(curr_loc), &count, &type); if (curr_len % type_size == 0) { i++; if (i != vec_len) { curr_loc = dloop_vec[i].DLOOP_VECTOR_BUF; curr_len = dloop_vec[i].DLOOP_VECTOR_LEN; } } else { curr_loc = (void *) ((char *) curr_loc + type_extent * count); curr_len -= type_size * count; } accumulated_count += count; } MPID_Segment_free(segp); MPIU_Free(dloop_vec); } MPIDI_FUNC_EXIT(MPID_STATE_DO_ACCUMULATE_OP); return mpi_errno; } static inline int check_piggyback_lock(MPID_Win * win_ptr, MPIDI_VC_t * vc, MPIDI_CH3_Pkt_t * pkt, void * data, MPIDI_msg_sz_t * buflen, int *acquire_lock_fail, MPID_Request ** reqp) { int lock_type; MPIDI_CH3_Pkt_flags_t flags; int mpi_errno = MPI_SUCCESS; (*acquire_lock_fail) = 0; (*reqp) = NULL; MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), flags, mpi_errno); if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED || flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) { if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED) { lock_type = MPI_LOCK_SHARED; } else { MPIU_Assert(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE); lock_type = MPI_LOCK_EXCLUSIVE; } if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) == 0) { /* cannot acquire the lock, queue up this operation. */ mpi_errno = enqueue_lock_origin(win_ptr, vc, pkt, data, buflen, reqp); if (mpi_errno != MPI_SUCCESS) { MPIR_ERR_POP(mpi_errno); (*acquire_lock_fail) = 1; } } } fn_exit: return mpi_errno; fn_fail: goto fn_exit; } static inline int finish_op_on_target(MPID_Win * win_ptr, MPIDI_VC_t * vc, int has_response_data, MPIDI_CH3_Pkt_flags_t flags, MPI_Win source_win_handle) { int mpi_errno = MPI_SUCCESS; if (!has_response_data) { /* This is PUT or ACC */ if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED || flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) { MPIDI_CH3_Pkt_flags_t pkt_flags = MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED; if ((flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) || (flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK)) pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK; MPIU_Assert(source_win_handle != MPI_WIN_NULL); mpi_errno = MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, pkt_flags, source_win_handle, MPI_REQUEST_NULL); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); MPIDI_CH3_Progress_signal_completion(); } if (flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) { if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED || flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)) { /* If op is piggybacked with both LOCK and FLUSH, * we only send LOCK ACK back, do not send FLUSH ACK. */ mpi_errno = MPIDI_CH3I_Send_ack_pkt(vc, win_ptr, source_win_handle); if (mpi_errno) MPIR_ERR_POP(mpi_errno); } MPIDI_CH3_Progress_signal_completion(); } if (flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) { win_ptr->at_completion_counter--; MPIU_Assert(win_ptr->at_completion_counter >= 0); /* Signal the local process when the op counter reaches 0. */ if (win_ptr->at_completion_counter == 0) MPIDI_CH3_Progress_signal_completion(); } if (flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) { if (!(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED || flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)) { /* If op is piggybacked with both LOCK and UNLOCK, * we only send LOCK ACK back, do not send FLUSH (UNLOCK) ACK. */ mpi_errno = MPIDI_CH3I_Send_ack_pkt(vc, win_ptr, source_win_handle); if (mpi_errno) MPIR_ERR_POP(mpi_errno); } mpi_errno = MPIDI_CH3I_Release_lock(win_ptr); if (mpi_errno) MPIR_ERR_POP(mpi_errno); MPIDI_CH3_Progress_signal_completion(); } } else { /* This is GACC / GET / CAS / FOP */ if (flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) { mpi_errno = MPIDI_CH3I_Release_lock(win_ptr); if (mpi_errno) MPIR_ERR_POP(mpi_errno); MPIDI_CH3_Progress_signal_completion(); } if (flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) { win_ptr->at_completion_counter--; MPIU_Assert(win_ptr->at_completion_counter >= 0); /* Signal the local process when the op counter reaches 0. */ if (win_ptr->at_completion_counter == 0) MPIDI_CH3_Progress_signal_completion(); } } fn_exit: return mpi_errno; fn_fail: goto fn_exit; } static inline int fill_ranks_in_win_grp(MPID_Win * win_ptr, MPID_Group * group_ptr, int *ranks_in_win_grp) { int mpi_errno = MPI_SUCCESS; int i, *ranks_in_grp; MPID_Group *win_grp_ptr; MPIU_CHKLMEM_DECL(1); MPIDI_STATE_DECL(MPID_STATE_FILL_RANKS_IN_WIN_GRP); MPIDI_RMA_FUNC_ENTER(MPID_STATE_FILL_RANKS_IN_WIN_GRP); MPIU_CHKLMEM_MALLOC(ranks_in_grp, int *, group_ptr->size * sizeof(int), mpi_errno, "ranks_in_grp"); for (i = 0; i < group_ptr->size; i++) ranks_in_grp[i] = i; mpi_errno = MPIR_Comm_group_impl(win_ptr->comm_ptr, &win_grp_ptr); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); mpi_errno = MPIR_Group_translate_ranks_impl(group_ptr, group_ptr->size, ranks_in_grp, win_grp_ptr, ranks_in_win_grp); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); mpi_errno = MPIR_Group_free_impl(win_grp_ptr); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); fn_exit: MPIU_CHKLMEM_FREEALL(); MPIDI_RMA_FUNC_EXIT(MPID_STATE_FILL_RANKS_IN_WIN_GRP); return mpi_errno; fn_fail: goto fn_exit; } static inline int wait_progress_engine(void) { int mpi_errno = MPI_SUCCESS; MPID_Progress_state progress_state; MPID_Progress_start(&progress_state); mpi_errno = MPID_Progress_wait(&progress_state); /* --BEGIN ERROR HANDLING-- */ if (mpi_errno != MPI_SUCCESS) { MPID_Progress_end(&progress_state); MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winnoprogress"); } /* --END ERROR HANDLING-- */ MPID_Progress_end(&progress_state); fn_exit: return mpi_errno; fn_fail: goto fn_exit; } static inline int poke_progress_engine(void) { int mpi_errno = MPI_SUCCESS; MPID_Progress_state progress_state; MPID_Progress_start(&progress_state); mpi_errno = MPID_Progress_poke(); if (mpi_errno != MPI_SUCCESS) MPIR_ERR_POP(mpi_errno); MPID_Progress_end(&progress_state); fn_exit: return mpi_errno; fn_fail: goto fn_exit; } static inline void MPIDI_CH3_ExtPkt_Accum_get_stream(MPIDI_CH3_Pkt_flags_t flags, int is_derived_dt, void *ext_hdr_ptr, MPI_Aint * stream_offset) { if ((flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) && is_derived_dt) { MPIU_Assert(ext_hdr_ptr != NULL); (*stream_offset) = ((MPIDI_CH3_Ext_pkt_accum_stream_derived_t *) ext_hdr_ptr)->stream_offset; } else if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) { MPIU_Assert(ext_hdr_ptr != NULL); (*stream_offset) = ((MPIDI_CH3_Ext_pkt_accum_stream_t *) ext_hdr_ptr)->stream_offset; } } static inline void MPIDI_CH3_ExtPkt_Gaccum_get_stream(MPIDI_CH3_Pkt_flags_t flags, int is_derived_dt, void *ext_hdr_ptr, MPI_Aint * stream_offset) { /* We do not check packet match here, because error must have already been * reported at header init time (on origin) and at packet receive time (on target). */ MPIDI_CH3_ExtPkt_Accum_get_stream(flags, is_derived_dt, ext_hdr_ptr, stream_offset); } #endif /* MPID_RMA_H_INCLUDED */