/* -*- Mode: c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */ /* * (C) 2011 by Argonne National Laboratory. * See COPYRIGHT in top-level directory. */ /* Copyright (c) 2001-2022, The Ohio State University. All rights * reserved. * * This file is part of the MVAPICH2 software package developed by the * team members of The Ohio State University's Network-Based Computing * Laboratory (NBCL), headed by Professor Dhabaleswar K. (DK) Panda. * * For detailed copyright and licensing information, please refer to the * copyright file COPYRIGHT in the top level MVAPICH2 directory. * */ #include "mpidimpl.h" #include "mpl_utlist.h" #include "helper_fns.h" /* A random guess at an appropriate value, we can tune it later. It could also * be a real tunable parameter. */ #define MPIDU_SCHED_INITIAL_ENTRIES (16) #if 0 #define dprintf fprintf #else /* FIXME this requires VA_ARGS macros */ #define dprintf(...) do{}while(0) #endif /* helper macros to improve code readability */ /* we pessimistically assume that MPI_DATATYPE_NULL may be passed as a "valid" type * for send/recv when MPI_PROC_NULL is the destination/src */ #define dtype_add_ref_if_not_builtin(datatype_) \ do { \ if ((datatype_) != MPI_DATATYPE_NULL && \ HANDLE_GET_KIND((datatype_)) != HANDLE_KIND_BUILTIN) \ { \ MPID_Datatype *dtp_ = NULL; \ MPID_Datatype_get_ptr((datatype_), dtp_); \ MPID_Datatype_add_ref(dtp_); \ } \ } while (0) #define dtype_release_if_not_builtin(datatype_) \ do { \ if ((datatype_) != MPI_DATATYPE_NULL && \ HANDLE_GET_KIND((datatype_)) != HANDLE_KIND_BUILTIN) \ { \ MPID_Datatype *dtp_ = NULL; \ MPID_Datatype_get_ptr((datatype_), dtp_); \ MPID_Datatype_release(dtp_); \ } \ } while (0) /* TODO move to a header somewhere? */ void MPIDU_Sched_dump(struct MPIDU_Sched *s); void MPIDU_Sched_dump_fh(struct MPIDU_Sched *s, FILE *fh); struct MPIDU_Sched_state { struct MPIDU_Sched *head; /* no need for a tail with utlist */ }; /* holds on to all incomplete schedules on which progress should be made */ struct MPIDU_Sched_state all_schedules = {NULL}; #if defined(CHANNEL_MRAIL) static int sched_in_continue = 0; #endif static int nbc_progress_hook_id = -1; /* returns TRUE if any schedules are currently pending completion by the * progress engine, FALSE otherwise */ #undef FUNCNAME #define FUNCNAME MPIDU_Sched_are_pending #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) int MPIDU_Sched_are_pending(void) { return (all_schedules.head != NULL); } #undef FUNCNAME #define FUNCNAME MPID_Sched_next_tag #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) int MPID_Sched_next_tag(MPID_Comm *comm_ptr, int *tag) { int mpi_errno = MPI_SUCCESS; /* TODO there should be an internal accessor/utility macro for getting the * TAG_UB value that doesn't require using the attribute interface */ int tag_ub = MPIR_Process.attrs.tag_ub; #if defined(HAVE_ERROR_CHECKING) int start = MPI_UNDEFINED; int end = MPI_UNDEFINED; struct MPIDU_Sched *elt = NULL; #endif *tag = comm_ptr->next_sched_tag; ++comm_ptr->next_sched_tag; #if defined(HAVE_ERROR_CHECKING) /* Upon entry into the second half of the tag space, ensure there are no * outstanding schedules still using the second half of the space. Check * the first half similarly on wraparound. */ if (comm_ptr->next_sched_tag == (tag_ub / 2)) { start = tag_ub / 2; end = tag_ub; } else if (comm_ptr->next_sched_tag == (tag_ub)) { start = MPIR_FIRST_NBC_TAG; end = tag_ub / 2; } if (start != MPI_UNDEFINED) { MPL_DL_FOREACH(all_schedules.head, elt) { if (elt->tag >= start && elt->tag < end) { MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**toomanynbc"); } } } #endif /* wrap the tag values around to the start, but don't allow it to conflict * with the tags used by the blocking collectives */ if (comm_ptr->next_sched_tag == tag_ub) { comm_ptr->next_sched_tag = MPIR_FIRST_NBC_TAG; } fn_fail: return mpi_errno; } #undef FUNCNAME #define FUNCNAME MPIDU_Sched_start_entry #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* initiates the schedule entry "e" in the NBC described by "s", where * "e" is at "idx" in "s". This means posting nonblocking sends/recvs, * performing reductions, calling callbacks, etc. */ static int MPIDU_Sched_start_entry(struct MPIDU_Sched *s, size_t idx, struct MPIDU_Sched_entry *e) { int mpi_errno = MPI_SUCCESS, ret_errno = MPI_SUCCESS; MPID_Request *r = s->req; MPID_Comm *comm; MPIU_Assert(e->status == MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED); switch (e->type) { case MPIDU_SCHED_ENTRY_SEND: comm = e->u.send.comm; MPIU_DBG_MSG_D(COMM, VERBOSE, "starting SEND entry %d\n", (int) idx); if (e->u.send.count_p) { /* deferred send */ /* originally there was no branch and send.count_p was set to * &send.count, but this requires patching up the pointers * during realloc of entries, so this is easier */ ret_errno = MPIC_Isend(e->u.send.buf, *e->u.send.count_p, e->u.send.datatype, e->u.send.dest, s->tag, comm, &e->u.send.sreq, &r->errflag); } else { if (e->u.send.is_sync) { ret_errno = MPIC_Issend(e->u.send.buf, e->u.send.count, e->u.send.datatype, e->u.send.dest, s->tag, comm, &e->u.send.sreq, &r->errflag); } else { ret_errno = MPIC_Isend(e->u.send.buf, e->u.send.count, e->u.send.datatype, e->u.send.dest, s->tag, comm, &e->u.send.sreq, &r->errflag); } } /* Check if the error is actually fatal to the NBC or we can continue. */ if (unlikely(ret_errno)) { if (MPIR_ERR_NONE == r->errflag) { if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) { r->errflag = MPIR_ERR_PROC_FAILED; } else { r->errflag = MPIR_ERR_OTHER; } } e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED; MPIU_DBG_MSG_D(COMM, VERBOSE, "Sched SEND failed. Errflag: %d\n", (int) r->errflag); } else { e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED; } break; case MPIDU_SCHED_ENTRY_RECV: MPIU_DBG_MSG_D(COMM, VERBOSE, "starting RECV entry %d\n", (int) idx); comm = e->u.recv.comm; ret_errno = MPIC_Irecv(e->u.recv.buf, e->u.recv.count, e->u.recv.datatype, e->u.recv.src, s->tag, comm, &e->u.recv.rreq); /* Check if the error is actually fatal to the NBC or we can continue. */ if (unlikely(ret_errno)) { if (MPIR_ERR_NONE == r->errflag) { if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) { r->errflag = MPIR_ERR_PROC_FAILED; } else { r->errflag = MPIR_ERR_OTHER; } } /* We should set the status to failed here - since the request is not freed. this * will be handled later in MPIDU_Sched_progress_state, so set to started here */ e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED; MPIU_DBG_MSG_D(COMM, VERBOSE, "Sched RECV failed. Errflag: %d\n", (int) r->errflag); } else { e->status = MPIDU_SCHED_ENTRY_STATUS_STARTED; } break; case MPIDU_SCHED_ENTRY_REDUCE: MPIU_DBG_MSG_D(COMM, VERBOSE, "starting REDUCE entry %d\n", (int) idx); mpi_errno = MPIR_Reduce_local_impl(e->u.reduce.inbuf, e->u.reduce.inoutbuf, e->u.reduce.count, e->u.reduce.datatype, e->u.reduce.op); if (mpi_errno) MPIR_ERR_POP(mpi_errno); if (HANDLE_GET_KIND(e->u.reduce.op) != HANDLE_KIND_BUILTIN) { MPID_Op *op_ptr = NULL; MPID_Op_get_ptr(e->u.reduce.op, op_ptr); MPIR_Op_release(op_ptr); } dtype_release_if_not_builtin(e->u.reduce.datatype); e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; break; case MPIDU_SCHED_ENTRY_COPY: MPIU_DBG_MSG_D(COMM, VERBOSE, "starting COPY entry %d\n", (int) idx); mpi_errno = MPIR_Localcopy(e->u.copy.inbuf, e->u.copy.incount, e->u.copy.intype, e->u.copy.outbuf, e->u.copy.outcount, e->u.copy.outtype); if (mpi_errno) MPIR_ERR_POP(mpi_errno); dtype_release_if_not_builtin(e->u.copy.intype); dtype_release_if_not_builtin(e->u.copy.outtype); e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; break; case MPIDU_SCHED_ENTRY_NOP: MPIU_DBG_MSG_D(COMM, VERBOSE, "starting NOOP entry %d\n", (int) idx); /* nothing to be done */ break; case MPIDU_SCHED_ENTRY_CB: MPIU_DBG_MSG_D(COMM, VERBOSE, "starting CB entry %d\n", (int) idx); if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_1) { ret_errno = e->u.cb.u.cb_p(r->comm, s->tag, e->u.cb.cb_state); /* Sched entries list can be reallocated inside callback */ e = &s->entries[idx]; if (unlikely(ret_errno)) { if (MPIR_ERR_NONE == r->errflag) { if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) { r->errflag = MPIR_ERR_PROC_FAILED; } else { r->errflag = MPIR_ERR_OTHER; } } e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED; } else { e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; } } else if (e->u.cb.cb_type == MPIDU_SCHED_CB_TYPE_2) { ret_errno = e->u.cb.u.cb2_p(r->comm, s->tag, e->u.cb.cb_state, e->u.cb.cb_state2); /* Sched entries list can be reallocated inside callback */ e = &s->entries[idx]; if (unlikely(ret_errno)) { if (MPIR_ERR_NONE == r->errflag) { if (MPIX_ERR_PROC_FAILED == MPIR_ERR_GET_CLASS(ret_errno)) { r->errflag = MPIR_ERR_PROC_FAILED; } else { r->errflag = MPIR_ERR_OTHER; } } e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED; } else { e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; } } else { MPIU_DBG_MSG_D(COMM, TYPICAL, "unknown callback type, e->u.cb.cb_type=%d", e->u.cb.cb_type); e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; } break; default: MPIU_DBG_MSG_D(COMM, TYPICAL, "unknown entry type, e->type=%d", e->type); break; } fn_exit: return mpi_errno; fn_fail: e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED; if (r) r->status.MPI_ERROR = mpi_errno; goto fn_exit; } /* Posts or performs any NOT_STARTED operations in the given schedule that are * permitted to be started. That is, this routine will respect schedule * barriers appropriately. */ #undef FUNCNAME #define FUNCNAME MPIDU_Sched_continue #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static int MPIDU_Sched_continue(struct MPIDU_Sched *s) { int mpi_errno = MPI_SUCCESS; size_t i; for (i = s->idx; i < s->num_entries; ++i) { struct MPIDU_Sched_entry *e = &s->entries[i]; if (e->status == MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED) { mpi_errno = MPIDU_Sched_start_entry(s, i, e); /* Sched entries list can be reallocated inside callback */ e = &s->entries[i]; if (mpi_errno) MPIR_ERR_POP(mpi_errno); } /* _start_entry may have completed the operation, but won't update s->idx */ if (i == s->idx && e->status >= MPIDU_SCHED_ENTRY_STATUS_COMPLETE) { ++s->idx; /* this is valid even for barrier entries */ } /* watch the indexing, s->idx might have been incremented above, so * ||-short-circuit matters here */ if (e->is_barrier && (e->status < MPIDU_SCHED_ENTRY_STATUS_COMPLETE || (s->idx != i+1))) { /* we've hit a barrier but outstanding operations before this * barrier remain, so we cannot proceed past the barrier */ break; } } fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_create #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* creates a new opaque schedule object and returns a handle to it in (*sp) */ int MPID_Sched_create(MPID_Sched_t *sp) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched *s; MPIU_CHKPMEM_DECL(2); *sp = NULL; /* this mem will be freed by the progress engine when the request is completed */ MPIU_CHKPMEM_MALLOC(s, struct MPIDU_Sched *, sizeof(struct MPIDU_Sched), mpi_errno, "schedule object"); s->size = MPIDU_SCHED_INITIAL_ENTRIES; s->idx = 0; s->num_entries = 0; s->tag = -1; s->req = NULL; s->entries = NULL; s->next = NULL; /* only needed for sanity checks */ s->prev = NULL; /* only needed for sanity checks */ /* this mem will be freed by the progress engine when the request is completed */ MPIU_CHKPMEM_MALLOC(s->entries, struct MPIDU_Sched_entry *, MPIDU_SCHED_INITIAL_ENTRIES*sizeof(struct MPIDU_Sched_entry), mpi_errno, "schedule entries vector"); /* TODO in a debug build, defensively mark all entries as status=INVALID */ MPIU_CHKPMEM_COMMIT(); *sp = s; fn_exit: return mpi_errno; fn_fail: MPIU_CHKPMEM_REAP(); goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_clone #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* clones orig and returns a handle to the new schedule in (*cloned) */ int MPID_Sched_clone(MPID_Sched_t orig, MPID_Sched_t *cloned) { int mpi_errno = MPI_SUCCESS; /* TODO implement this function for real */ MPIU_Assert_fmt_msg(FALSE, ("clone not yet implemented")); MPIU_Assertp(FALSE); return mpi_errno; } #undef FUNCNAME #define FUNCNAME MPID_Sched_start #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* sets (*sp) to MPID_SCHED_NULL and gives you back a request pointer in (*req). * The caller is giving up ownership of the opaque schedule object. */ int MPID_Sched_start(MPID_Sched_t *sp, MPID_Comm *comm, int tag, MPID_Request **req) { int mpi_errno = MPI_SUCCESS; MPID_Request *r; struct MPIDU_Sched *s = *sp; *req = NULL; *sp = MPID_SCHED_NULL; /* sanity check the schedule */ MPIU_Assert(s->num_entries <= s->size); MPIU_Assert(s->num_entries == 0 || s->idx < s->num_entries); MPIU_Assert(s->req == NULL); MPIU_Assert(s->next == NULL); MPIU_Assert(s->prev == NULL); MPIU_Assert(s->entries != NULL); /* now create and populate the request */ r = MPID_Request_create(); if (!r) MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem"); r->kind = MPID_COLL_REQUEST; /* FIXME is this right when comm/datatype GC is used? */ MPIR_Comm_add_ref(comm); r->comm = comm; /* req refcount is currently 1, for the user's request. Increment for the * schedule's reference */ MPIR_Request_add_ref(r); s->req = r; *req = r; /* cc is 1, which is fine b/c we only use it as a signal, rather than * incr/decr on every constituent operation */ s->tag = tag; /* Now kick off any initial operations. Do this before we tell the progress * engine about this req+sched, otherwise we have more MT issues to worry * about. Skipping this step will increase latency. */ mpi_errno = MPIDU_Sched_continue(s); if (mpi_errno) MPIR_ERR_POP(mpi_errno); /* finally, enqueue in the list of all pending schedules so that the * progress engine can make progress on it */ if (nbc_progress_hook_id < 0) { mpi_errno = MPID_Progress_register_hook( MPIDU_Sched_progress, &nbc_progress_hook_id); if (mpi_errno) MPIR_ERR_POP(mpi_errno); MPID_Progress_activate_hook(nbc_progress_hook_id); } MPL_DL_APPEND(all_schedules.head, s); MPIU_DBG_MSG_P(COMM, TYPICAL, "started schedule s=%p\n", s); MPIDU_Sched_dump(s); fn_exit: return mpi_errno; fn_fail: if (*req) *req = NULL; if (r) { MPID_Request_release(r); /* the schedule's ref */ MPID_Request_release(r); /* the user's ref */ } goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPIDU_Sched_add_entry #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* idx and e are permitted to be NULL */ static int MPIDU_Sched_add_entry(struct MPIDU_Sched *s, int *idx, struct MPIDU_Sched_entry **e) { int mpi_errno = MPI_SUCCESS; int i; struct MPIDU_Sched_entry *ei; MPIU_Assert(s->entries != NULL); MPIU_Assert(s->size > 0); if (s->num_entries == s->size) { /* need to grow the entries array */ struct MPIDU_Sched_entry *entries = (struct MPIDU_Sched_entry *) MPIU_Malloc(2*s->size*sizeof(struct MPIDU_Sched_entry)); if (entries == NULL) MPIR_ERR_SETANDJUMP(mpi_errno,MPI_ERR_OTHER,"**nomem"); /* Stage data to new memory location */ memmove(entries, s->entries, (s->size*sizeof(struct MPIDU_Sched_entry))); /* Free existing memory */ MPIU_Free(s->entries); /* Point to new memory */ s->entries = entries; /* Reflect increased size */ s->size *= 2; } i = s->num_entries++; ei = &s->entries[i]; if (idx != NULL) *idx = i; if (e != NULL) *e = ei; fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_send #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* do these ops need an entry handle returned? */ int MPID_Sched_send(const void *buf, MPI_Aint count, MPI_Datatype datatype, int dest, MPID_Comm *comm, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_SEND; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; e->u.send.buf = buf; e->u.send.count = count; e->u.send.count_p = NULL; e->u.send.datatype = datatype; e->u.send.dest = dest; e->u.send.sreq = NULL; /* will be populated by _start_entry */ e->u.send.comm = comm; e->u.send.is_sync = FALSE; /* the user may free the comm & type after initiating but before the * underlying send is actually posted, so we must add a reference here and * release it at entry completion time */ MPIR_Comm_add_ref(comm); dtype_add_ref_if_not_builtin(datatype); fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_ssend #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) int MPID_Sched_ssend(const void *buf, MPI_Aint count, MPI_Datatype datatype, int dest, MPID_Comm *comm, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_SEND; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; e->u.send.buf = buf; e->u.send.count = count; e->u.send.count_p = NULL; e->u.send.datatype = datatype; e->u.send.dest = dest; e->u.send.sreq = NULL; /* will be populated by _start_entry */ e->u.send.comm = comm; e->u.send.is_sync = TRUE; /* the user may free the comm & type after initiating but before the * underlying send is actually posted, so we must add a reference here and * release it at entry completion time */ MPIR_Comm_add_ref(comm); dtype_add_ref_if_not_builtin(datatype); fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_send_defer #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) int MPID_Sched_send_defer(const void *buf, const MPI_Aint *count, MPI_Datatype datatype, int dest, MPID_Comm *comm, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_SEND; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; e->u.send.buf = buf; e->u.send.count = MPI_UNDEFINED; e->u.send.count_p = count; e->u.send.datatype = datatype; e->u.send.dest = dest; e->u.send.sreq = NULL; /* will be populated by _start_entry */ e->u.send.comm = comm; e->u.send.is_sync = FALSE; /* the user may free the comm & type after initiating but before the * underlying send is actually posted, so we must add a reference here and * release it at entry completion time */ MPIR_Comm_add_ref(comm); dtype_add_ref_if_not_builtin(datatype); fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_recv_status #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) int MPID_Sched_recv_status(void *buf, MPI_Aint count, MPI_Datatype datatype, int src, MPID_Comm *comm, MPI_Status *status, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_RECV; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; e->u.recv.buf = buf; e->u.recv.count = count; e->u.recv.datatype = datatype; e->u.recv.src = src; e->u.recv.rreq = NULL; /* will be populated by _start_entry */ e->u.recv.comm = comm; e->u.recv.status = status; status->MPI_ERROR = MPI_SUCCESS; MPIR_Comm_add_ref(comm); dtype_add_ref_if_not_builtin(datatype); fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_recv #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) int MPID_Sched_recv(void *buf, MPI_Aint count, MPI_Datatype datatype, int src, MPID_Comm *comm, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_RECV; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; e->u.recv.buf = buf; e->u.recv.count = count; e->u.recv.datatype = datatype; e->u.recv.src = src; e->u.recv.rreq = NULL; /* will be populated by _start_entry */ e->u.recv.comm = comm; e->u.recv.status = MPI_STATUS_IGNORE; MPIR_Comm_add_ref(comm); dtype_add_ref_if_not_builtin(datatype); fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_reduce #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) int MPID_Sched_reduce(const void *inbuf, void *inoutbuf, MPI_Aint count, MPI_Datatype datatype, MPI_Op op, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; struct MPIDU_Sched_reduce *reduce = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_REDUCE; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; reduce = &e->u.reduce; reduce->inbuf = inbuf; reduce->inoutbuf = inoutbuf; reduce->count = count; reduce->datatype = datatype; reduce->op = op; dtype_add_ref_if_not_builtin(datatype); if (HANDLE_GET_KIND(op) != HANDLE_KIND_BUILTIN) { MPID_Op *op_ptr = NULL; MPID_Op_get_ptr(op, op_ptr); MPIR_Op_add_ref(op_ptr); } fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_copy #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* Schedules a copy of "incount" copies of "intype" from "inbuf" to "outbuf" as * specified by "outcount" and "outtype". It is erroneous to attempt to copy * more data than will fit into the (outbuf,outcount,outtype)-triple. This * corresponds naturally with the buffer sizing rules for send-recv. * * Packing/unpacking can be accomplished by passing MPI_PACKED as either intype * or outtype. */ int MPID_Sched_copy(const void *inbuf, MPI_Aint incount, MPI_Datatype intype, void *outbuf, MPI_Aint outcount, MPI_Datatype outtype, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; struct MPIDU_Sched_copy *copy = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_COPY; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; copy = &e->u.copy; copy->inbuf = inbuf; copy->incount = incount; copy->intype = intype; copy->outbuf = outbuf; copy->outcount = outcount; copy->outtype = outtype; dtype_add_ref_if_not_builtin(intype); dtype_add_ref_if_not_builtin(outtype); /* some sanity checking up front */ #if defined(HAVE_ERROR_CHECKING) && !defined(NDEBUG) { MPI_Aint intype_size, outtype_size; MPID_Datatype_get_size_macro(intype, intype_size); MPID_Datatype_get_size_macro(outtype, outtype_size); if (incount * intype_size > outcount * outtype_size) { MPL_error_printf("truncation: intype=%#x, intype_size=" MPI_AINT_FMT_DEC_SPEC ", incount=" MPI_AINT_FMT_DEC_SPEC ", outtype=%#x, outtype_size=" MPI_AINT_FMT_DEC_SPEC " outcount=" MPI_AINT_FMT_DEC_SPEC "\n", intype, intype_size, incount, outtype, outtype_size, outcount); } } #endif fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_barrier #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* require that all previously added ops are complete before subsequent ops * may begin to execute */ int MPID_Sched_barrier(MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; /* mark the previous entry as a barrier unless we're at the beginning, which * would be a pointless barrier */ if (s->num_entries > 0) { s->entries[s->num_entries-1].is_barrier = TRUE; } return mpi_errno; } #undef FUNCNAME #define FUNCNAME MPID_Sched_cb #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* buffer management, fancy reductions, etc */ int MPID_Sched_cb(MPID_Sched_cb_t *cb_p, void *cb_state, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; struct MPIDU_Sched_cb *cb = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_CB; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; cb = &e->u.cb; cb->cb_type = MPIDU_SCHED_CB_TYPE_1; cb->u.cb_p = cb_p; cb->cb_state = cb_state; cb->cb_state2 = NULL; fn_exit: return mpi_errno; fn_fail: goto fn_exit; } #undef FUNCNAME #define FUNCNAME MPID_Sched_cb2 #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) /* buffer management, fancy reductions, etc */ int MPID_Sched_cb2(MPID_Sched_cb2_t *cb_p, void *cb_state, void *cb_state2, MPID_Sched_t s) { int mpi_errno = MPI_SUCCESS; struct MPIDU_Sched_entry *e = NULL; struct MPIDU_Sched_cb *cb = NULL; mpi_errno = MPIDU_Sched_add_entry(s, NULL, &e); if (mpi_errno) MPIR_ERR_POP(mpi_errno); e->type = MPIDU_SCHED_ENTRY_CB; e->status = MPIDU_SCHED_ENTRY_STATUS_NOT_STARTED; e->is_barrier = FALSE; cb = &e->u.cb; cb->cb_type = MPIDU_SCHED_CB_TYPE_2; cb->u.cb2_p = cb_p; cb->cb_state = cb_state; cb->cb_state2 = cb_state2; fn_exit: return mpi_errno; fn_fail: goto fn_exit; } /* returns TRUE in (*made_progress) if any of the outstanding schedules in state completed */ #undef FUNCNAME #define FUNCNAME MPIDU_Sched_progress_state #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) static int MPIDU_Sched_progress_state(struct MPIDU_Sched_state *state, int *made_progress) { int mpi_errno = MPI_SUCCESS; size_t i; struct MPIDU_Sched *s; struct MPIDU_Sched *tmp; if (made_progress) *made_progress = FALSE; MPL_DL_FOREACH_SAFE(state->head, s, tmp) { /*MPIDU_Sched_dump(s);*/ for (i = s->idx; i < s->num_entries; ++i) { struct MPIDU_Sched_entry *e = &s->entries[i]; switch (e->type) { case MPIDU_SCHED_ENTRY_SEND: if (e->u.send.sreq != NULL && MPID_Request_is_complete(e->u.send.sreq)) { MPIU_DBG_MSG_FMT(COMM, VERBOSE, (MPIU_DBG_FDEST, "completed SEND entry %d, sreq=%p\n", (int) i, e->u.send.sreq)); if (s->req->errflag != MPIR_ERR_NONE) e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED; else e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; MPID_Request_release(e->u.send.sreq); e->u.send.sreq = NULL; MPIR_Comm_release(e->u.send.comm); dtype_release_if_not_builtin(e->u.send.datatype); } break; case MPIDU_SCHED_ENTRY_RECV: if (e->u.recv.rreq != NULL && MPID_Request_is_complete(e->u.recv.rreq)) { MPIU_DBG_MSG_FMT(COMM, VERBOSE, (MPIU_DBG_FDEST, "completed RECV entry %d, rreq=%p\n", (int) i, e->u.recv.rreq)); MPIR_Process_status(&e->u.recv.rreq->status, &s->req->errflag); if (e->u.recv.status != MPI_STATUS_IGNORE) { int recvd; e->u.recv.status->MPI_ERROR = e->u.recv.rreq->status.MPI_ERROR; MPIR_Get_count_impl(&e->u.recv.rreq->status, MPI_BYTE, &recvd); MPIR_STATUS_SET_COUNT(*(e->u.recv.status), recvd); } if (s->req->errflag != MPIR_ERR_NONE) e->status = MPIDU_SCHED_ENTRY_STATUS_FAILED; else e->status = MPIDU_SCHED_ENTRY_STATUS_COMPLETE; MPID_Request_release(e->u.recv.rreq); e->u.recv.rreq = NULL; MPIR_Comm_release(e->u.recv.comm); dtype_release_if_not_builtin(e->u.recv.datatype); } break; default: /* all other entry types don't have any sub-requests that * need to be checked */ break; } if (i == s->idx && e->status >= MPIDU_SCHED_ENTRY_STATUS_COMPLETE) { ++s->idx; MPIU_DBG_MSG_D(COMM, VERBOSE, "completed OTHER entry %d\n", (int) i); if (e->is_barrier) { /* post/perform the next round of operations */ #if defined(CHANNEL_MRAIL) sched_in_continue = 1; #endif mpi_errno = MPIDU_Sched_continue(s); if (mpi_errno) MPIR_ERR_POP(mpi_errno); #if defined(CHANNEL_MRAIL) sched_in_continue = 0; #endif } } else if (e->is_barrier && e->status < MPIDU_SCHED_ENTRY_STATUS_COMPLETE) { /* don't process anything after this barrier entry */ break; } } if (s->idx == s->num_entries) { MPIU_DBG_MSG_FMT(COMM, VERBOSE, (MPIU_DBG_FDEST, "completing and dequeuing s=%p r=%p\n", s, s->req)); /* dequeue this schedule from the state, it's complete */ MPL_DL_DELETE(state->head, s); /* TODO refactor into a sched_complete routine? */ switch (s->req->errflag) { case MPIR_ERR_PROC_FAILED: MPIR_ERR_SET(s->req->status.MPI_ERROR, MPIX_ERR_PROC_FAILED, "**comm"); break; case MPIR_ERR_OTHER: MPIR_ERR_SET(s->req->status.MPI_ERROR, MPI_ERR_OTHER, "**comm"); break; case MPIR_ERR_NONE: default: break; } mpi_errno = MPID_Request_complete(s->req); if (mpi_errno != MPI_SUCCESS) { MPIR_ERR_POP(mpi_errno); } s->req = NULL; MPIU_Free(s->entries); MPIU_Free(s); if (made_progress) *made_progress = TRUE; } } fn_exit: return mpi_errno; fn_fail: goto fn_exit; } /* returns TRUE in (*made_progress) if any of the outstanding schedules completed */ #undef FUNCNAME #define FUNCNAME MPIDU_Sched_progress #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) int MPIDU_Sched_progress(int *made_progress) { #if defined(CHANNEL_MRAIL) /* check if we are already in Sched_progress. ** MPID_Isend from MPIDU_Sched_progress can trigger MPIDU_Sched_progress again ** (MPIDU_Sched_progress -> MPID_Isend -> MPID_Progress_test -> MPIDU_Sched_progress) */ if (sched_in_continue) { return MPI_SUCCESS; } #endif return MPIDU_Sched_progress_state(&all_schedules, made_progress); } static const char *entry_to_str(enum MPIDU_Sched_entry_type type) ATTRIBUTE((unused,used)); static const char *entry_to_str(enum MPIDU_Sched_entry_type type) { switch(type) { case MPIDU_SCHED_ENTRY_SEND: return "SEND"; case MPIDU_SCHED_ENTRY_RECV: return "RECV"; case MPIDU_SCHED_ENTRY_REDUCE: return "REDUCE"; case MPIDU_SCHED_ENTRY_COPY: return "COPY"; case MPIDU_SCHED_ENTRY_NOP: return "NOP"; case MPIDU_SCHED_ENTRY_CB: return "CB"; default: return "(out of range)"; } } /* utility function for debugging, dumps the given schedule object to fh */ #undef FUNCNAME #define FUNCNAME MPIDU_Sched_dump_fh #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) void MPIDU_Sched_dump_fh(struct MPIDU_Sched *s, FILE *fh) { int i; dprintf(fh, "--------------------------------\n"); dprintf(fh, "s=%p\n", s); if (s) { dprintf(fh, "s->size=%zd\n", s->size); dprintf(fh, "s->idx=%zd\n", s->idx); dprintf(fh, "s->num_entries=%d\n", s->num_entries); dprintf(fh, "s->tag=%d\n", s->tag); dprintf(fh, "s->req=%p\n", s->req); dprintf(fh, "s->entries=%p\n", s->entries); for (i = 0; i < s->num_entries; ++i) { dprintf(fh, "&s->entries[%d]=%p\n", i, &s->entries[i]); dprintf(fh, "s->entries[%d].type=%s\n", i, entry_to_str(s->entries[i].type)); dprintf(fh, "s->entries[%d].status=%d\n", i, s->entries[i].status); dprintf(fh, "s->entries[%d].is_barrier=%s\n", i, (s->entries[i].is_barrier ? "TRUE" : "FALSE")); } } dprintf(fh, "--------------------------------\n"); /* dprintf(fh, "s->next=%p\n", s->next); dprintf(fh, "s->prev=%p\n", s->prev); */ } /* utility function for debugging, dumps the given schedule object to stderr */ #undef FUNCNAME #define FUNCNAME MPIDU_Sched_dump #undef FCNAME #define FCNAME MPL_QUOTE(FUNCNAME) void MPIDU_Sched_dump(struct MPIDU_Sched *s) { MPIDU_Sched_dump_fh(s, stderr); }