/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 *  (C) 2014 by Argonne National Laboratory.
 *      See COPYRIGHT in top-level directory.
 */

#include "ptl_impl.h"
#include "rptl.h"

/* FIXME: turn this into a CVAR, or fraction of the event limit from
 * rptl_init */
#define PER_TARGET_THRESHOLD 50

/*
 * Prereqs:
 *
 * 1. We create an extra control portal that is only used by rportals.
 *
 * 2. All communication operations are logged at the origin process,
 * and their ACKs and NACKs are kept track of.  If an operation gets
 * an ACK, it is complete and can be deleted from the logs.  If an
 * operation gets a NACK, it will need to be retransmitted once the
 * flow-control protocol described below has completed.
 *
 *
 * Flow control algorithm:
 *
 * 1. When the primary data portal gets disabled, the target sends
 * PAUSE messages to all other processes.
 *
 * 2. Once each process confirms that it has no outstanding packets on
 * the wire (i.e., all packets have either been ACKed or NACKed), it
 * sends a PAUSE-ACK message.
 *
 * 3. When the target receives PAUSE-ACK messages from all processes
 * (thus confirming that the network traffic to itself has been
 * quiesced), it waits till the user has dequeued at least half the
 * messages from the overflow buffer.  This is done by keeping track
 * of the number of messages that are injected into the overflow
 * buffer by portals and the number of messages that are dequeued by
 * the user.
 *
 * 4. Once we know that there is enough free space in the overflow
 * buffers, the target reenables the portal and sends an UNPAUSE
 * message to all processes.
 *
 *
 * Known issues:
 *
 * 1. None of the error codes specified by portals allow us to return
 * an "OTHER" error, when something bad happens internally.  So we
 * arbitrarily return PTL_FAIL when it is an internal error even
 * though that's not a specified error return code for some portals
 * functions.  When portals functions are called internally, if they
 * return an error, we funnel them back upstream.  This is not an
 * "issue" per se, but is still ugly.
 *
 * 2. None of the pt index types specified by portals allow us to
 * return an "INVALID" pt entry, to show that a portal is invalid.  So
 * we arbitrarily use PTL_PT_ANY in such cases.  Again, this is not an
 * "issue" per se, but is ugly.
 */

/* Compare two physical process ids (nid/pid pair).  Arguments are
 * parenthesized so that arbitrary lvalue expressions can be passed. */
#define IDS_ARE_EQUAL(t1, t2) \
    ((t1).phys.nid == (t2).phys.nid && (t1).phys.pid == (t2).phys.pid)

struct rptl_info rptl_info;


/*
 * find_target - look up the bookkeeping entry for process "id" in
 * rptl_info.target_list, creating and appending a fresh entry (state
 * ACTIVE, empty op lists, zero issued ops) when none exists yet.
 *
 * On success the entry is stored in *target and PTL_OK is returned.
 * On the failure path (mpi_errno set by the allocation macro) PTL_FAIL
 * is returned and *target is left untouched.
 */
#undef FUNCNAME
#define FUNCNAME find_target
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int find_target(ptl_process_t id, struct rptl_target **target)
{
    int mpi_errno = MPI_SUCCESS;
    int ret = PTL_OK;
    struct rptl_target *t;
    MPIU_CHKPMEM_DECL(1);
    MPIDI_STATE_DECL(MPID_STATE_FIND_TARGET);

    MPIDI_FUNC_ENTER(MPID_STATE_FIND_TARGET);

    for (t = rptl_info.target_list; t; t = t->next)
        if (IDS_ARE_EQUAL(t->id, id))
            break;

    /* if the target does not already exist, create one */
    if (t == NULL) {
        MPIU_CHKPMEM_MALLOC(t, struct rptl_target *, sizeof(struct rptl_target), mpi_errno,
                            "rptl target");
        MPL_DL_APPEND(rptl_info.target_list, t);

        t->id = id;
        t->state = RPTL_TARGET_STATE_ACTIVE;
        t->rptl = NULL;
        t->op_segment_list = NULL;
        t->op_pool = NULL;

        t->data_op_list = NULL;
        t->control_op_list = NULL;

        t->issued_data_ops = 0;
    }

    *target = t;

  fn_exit:
    MPIU_CHKPMEM_COMMIT();
    MPIDI_FUNC_EXIT(MPID_STATE_FIND_TARGET);
    return ret;

  fn_fail:
    if (mpi_errno)
        ret = PTL_FAIL;
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}

/* forward declaration: poke_progress() queues control messages through
 * rptl_put(), and rptl_put() in turn calls poke_progress() */
static int rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
                    ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
                    ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
                    ptl_hdr_data_t hdr_data, enum rptl_pt_type pt_type);


/*
 * poke_progress - advance the flow-control state machine and issue
 * queued operations.
 *
 * Two passes are made:
 *
 *  - over every local rptl: if it is waiting for PAUSE-ACKs, all of
 *    them (world_size - 1) have arrived and the overflow-buffer count
 *    has dropped to ob_max_count or below, the data PT is reenabled
 *    and an UNPAUSE control message is queued to every other rank.
 *
 *  - over every target: a target in RECEIVED_PAUSE state with no op
 *    still in ISSUED state gets a PAUSE-ACK queued; otherwise queued
 *    control ops are issued, followed by queued data ops unless the
 *    target is DISABLED or PAUSE_ACKED.  Issuing stops for a target
 *    as soon as an op is NACKED (data list only), origin events run
 *    out, or the per-target in-flight threshold is exceeded, so that
 *    ordering to that target is preserved.
 *
 * Returns PTL_OK, or the first non-OK code produced along the way.
 */
#undef FUNCNAME
#define FUNCNAME poke_progress
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int poke_progress(void)
{
    int ret = PTL_OK;
    struct rptl_target *target;
    struct rptl_op *op;
    struct rptl *rptl;
    int i;
    int mpi_errno = MPI_SUCCESS;
    ptl_process_t id;
    ptl_pt_index_t data_pt, control_pt;
    MPIDI_STATE_DECL(MPID_STATE_POKE_PROGRESS);

    MPIDI_FUNC_ENTER(MPID_STATE_POKE_PROGRESS);

    /* make progress on local RPTLs */
    for (rptl = rptl_info.rptl_list; rptl; rptl = rptl->next) {
        /* if the local state is active, there's nothing to do */
        if (rptl->local_state == RPTL_LOCAL_STATE_ACTIVE)
            continue;

        /* if we are in a local AWAITING PAUSE ACKS state, see if we
         * can send out the unpause message */
        if (rptl->local_state == RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS &&
            rptl->pause_ack_counter == rptl_info.world_size - 1) {
            /* if we are over the max count limit, do not send an
             * unpause message yet */
            if (rptl->data.ob_curr_count > rptl->data.ob_max_count)
                continue;

            ret = PtlPTEnable(rptl->ni, rptl->data.pt);
            RPTLU_ERR_POP(ret, "Error returned while reenabling PT\n");

            rptl->local_state = RPTL_LOCAL_STATE_ACTIVE;

            for (i = 0; i < rptl_info.world_size; i++) {
                if (i == MPIDI_Process.my_pg_rank)
                    continue;
                mpi_errno = rptl_info.get_target_info(i, &id, rptl->data.pt, &data_pt,
                                                      &control_pt);
                if (mpi_errno) {
                    ret = PTL_FAIL;
                    RPTLU_ERR_POP(ret, "Error getting target info\n");
                }

                /* make sure the user setup a control portal */
                assert(control_pt != PTL_PT_ANY);

                ret = rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0, 0,
                               NULL, RPTL_CONTROL_MSG_UNPAUSE, RPTL_PT_CONTROL);
                RPTLU_ERR_POP(ret, "Error sending unpause message\n");
            }
        }
    }

    /* make progress on targets */
    for (target = rptl_info.target_list; target; target = target->next) {
        if (target->state == RPTL_TARGET_STATE_RECEIVED_PAUSE) {
            /* the pause ack can only go out once nothing is left on
             * the wire to this target */
            for (op = target->data_op_list; op; op = op->next)
                if (op->state == RPTL_OP_STATE_ISSUED)
                    break;
            if (op)
                continue;

            /* send a pause ack message */
            assert(target->rptl);

            /* start from a known value so that the control-portal
             * assert below also trips when no rank was queried */
            control_pt = PTL_PT_ANY;
            for (i = 0; i < rptl_info.world_size; i++) {
                if (i == MPIDI_Process.my_pg_rank)
                    continue;
                /* find the target that has this target id and get the
                 * control portal information for it */
                mpi_errno = rptl_info.get_target_info(i, &id, target->rptl->data.pt, &data_pt,
                                                      &control_pt);
                if (mpi_errno) {
                    ret = PTL_FAIL;
                    RPTLU_ERR_POP(ret, "Error getting target info\n");
                }
                if (IDS_ARE_EQUAL(id, target->id))
                    break;
            }

            /* the search must have hit a rank; falling off the end
             * would send the pause ack to whichever rank was queried
             * last */
            assert(i < rptl_info.world_size);

            /* make sure the user setup a control portal */
            assert(control_pt != PTL_PT_ANY);

            /* the state is switched before queuing, because
             * rptl_put() re-enters poke_progress() */
            target->state = RPTL_TARGET_STATE_PAUSE_ACKED;

            ret = rptl_put(target->rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0,
                           0, NULL, RPTL_CONTROL_MSG_PAUSE_ACK, RPTL_PT_CONTROL);
            RPTLU_ERR_POP(ret, "Error sending pause ack message\n");

            continue;
        }

        /* issue out all the control messages first */
        for (op = target->control_op_list; op; op = op->next) {
            assert(op->op_type == RPTL_OP_PUT);

            /* skip all the issued ops */
            if (op->state == RPTL_OP_STATE_ISSUED)
                continue;

            /* we should not get any NACKs on the control portal */
            assert(op->state != RPTL_OP_STATE_NACKED);

            if (rptl_info.origin_events_left < 2 ||
                target->issued_data_ops > PER_TARGET_THRESHOLD) {
                /* too few origin events left.  we can't issue this op
                 * or any following op to this target in order to
                 * maintain ordering */
                break;
            }

            /* a PUT consumes two origin events: SEND and ACK */
            rptl_info.origin_events_left -= 2;
            target->issued_data_ops++;

            /* force request for an ACK even if the user didn't ask
             * for it.  replace the user pointer with the OP id. */
            ret = PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
                         PTL_ACK_REQ, op->u.put.target_id, op->u.put.pt_index,
                         op->u.put.match_bits, op->u.put.remote_offset, op,
                         op->u.put.hdr_data);
            RPTLU_ERR_POP(ret, "Error issuing PUT\n");

            op->state = RPTL_OP_STATE_ISSUED;
        }

        if (target->state == RPTL_TARGET_STATE_DISABLED ||
            target->state == RPTL_TARGET_STATE_PAUSE_ACKED)
            continue;

        /* then issue out all the data messages */
        for (op = target->data_op_list; op; op = op->next) {
            if (op->op_type == RPTL_OP_PUT) {
                /* skip all the issued ops */
                if (op->state == RPTL_OP_STATE_ISSUED)
                    continue;

                /* if an op has been nacked, don't issue anything else
                 * to this target */
                if (op->state == RPTL_OP_STATE_NACKED)
                    break;

                if (rptl_info.origin_events_left < 2 ||
                    target->issued_data_ops > PER_TARGET_THRESHOLD) {
                    /* too few origin events left.  we can't issue
                     * this op or any following op to this target in
                     * order to maintain ordering */
                    break;
                }

                /* a PUT consumes two origin events: SEND and ACK */
                rptl_info.origin_events_left -= 2;
                target->issued_data_ops++;

                /* force request for an ACK even if the user didn't
                 * ask for it.  replace the user pointer with the OP
                 * id. */
                ret = PtlPut(op->u.put.md_handle, op->u.put.local_offset, op->u.put.length,
                             PTL_ACK_REQ, op->u.put.target_id, op->u.put.pt_index,
                             op->u.put.match_bits, op->u.put.remote_offset, op,
                             op->u.put.hdr_data);
                RPTLU_ERR_POP(ret, "Error issuing PUT\n");
            }
            else if (op->op_type == RPTL_OP_GET) {
                /* skip all the issued ops */
                if (op->state == RPTL_OP_STATE_ISSUED)
                    continue;

                /* if an op has been nacked, don't issue anything else
                 * to this target */
                if (op->state == RPTL_OP_STATE_NACKED)
                    break;

                if (rptl_info.origin_events_left < 1 ||
                    target->issued_data_ops > PER_TARGET_THRESHOLD) {
                    /* too few origin events left.  we can't issue
                     * this op or any following op to this target in
                     * order to maintain ordering */
                    break;
                }

                /* a GET consumes a single origin event: REPLY */
                rptl_info.origin_events_left--;
                target->issued_data_ops++;

                ret = PtlGet(op->u.get.md_handle, op->u.get.local_offset, op->u.get.length,
                             op->u.get.target_id, op->u.get.pt_index, op->u.get.match_bits,
                             op->u.get.remote_offset, op);
                RPTLU_ERR_POP(ret, "Error issuing GET\n");
            }

            op->state = RPTL_OP_STATE_ISSUED;
        }
    }

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_POKE_PROGRESS);
    return ret;

  fn_fail:
    goto fn_exit;
}


/*
 * rptl_put - record a PUT in the per-target log and try to issue it.
 *
 * The user's arguments are saved in a freshly allocated op, which is
 * appended to the target's data or control list depending on pt_type.
 * The op is not handed to PtlPut here; poke_progress() does that when
 * flow control allows it.
 *
 * Returns PTL_OK or the error produced by find_target(),
 * rptli_op_alloc() or poke_progress().
 */
#undef FUNCNAME
#define FUNCNAME rptl_put
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
                    ptl_ack_req_t ack_req, ptl_process_t target_id, ptl_pt_index_t pt_index,
                    ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr,
                    ptl_hdr_data_t hdr_data, enum rptl_pt_type pt_type)
{
    struct rptl_op *op;
    int ret = PTL_OK;
    struct rptl_target *target;
    MPIDI_STATE_DECL(MPID_STATE_RPTL_PUT);

    MPIDI_FUNC_ENTER(MPID_STATE_RPTL_PUT);

    ret = find_target(target_id, &target);
    RPTLU_ERR_POP(ret, "error finding target structure\n");

    ret = rptli_op_alloc(&op, target);
    RPTLU_ERR_POP(ret, "error allocating op\n");

    op->op_type = RPTL_OP_PUT;
    op->state = RPTL_OP_STATE_QUEUED;

    /* store the user parameters */
    op->u.put.md_handle = md_handle;
    op->u.put.local_offset = local_offset;
    op->u.put.length = length;
    op->u.put.ack_req = ack_req;
    op->u.put.target_id = target_id;
    op->u.put.pt_index = pt_index;
    op->u.put.match_bits = match_bits;
    op->u.put.remote_offset = remote_offset;
    op->u.put.user_ptr = user_ptr;
    op->u.put.hdr_data = hdr_data;

    /* place to store the send and ack events */
    op->u.put.send = NULL;
    op->u.put.ack = NULL;
    op->u.put.pt_type = pt_type;
    op->events_ready = 0;
    op->target = target;

    if (op->u.put.pt_type == RPTL_PT_DATA)
        MPL_DL_APPEND(target->data_op_list, op);
    else
        MPL_DL_APPEND(target->control_op_list, op);

    ret = poke_progress();
    RPTLU_ERR_POP(ret, "Error from poke_progress\n");

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_RPTL_PUT);
    return ret;

  fn_fail:
    goto fn_exit;
}


/*
 * MPID_nem_ptl_rptl_put - public PUT entry point; queues the operation
 * on the data portal by forwarding to rptl_put() with RPTL_PT_DATA.
 */
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_rptl_put
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPID_nem_ptl_rptl_put(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
                          ptl_ack_req_t ack_req, ptl_process_t target_id,
                          ptl_pt_index_t pt_index, ptl_match_bits_t match_bits,
                          ptl_size_t remote_offset, void *user_ptr, ptl_hdr_data_t hdr_data)
{
    return rptl_put(md_handle, local_offset, length, ack_req, target_id, pt_index, match_bits,
                    remote_offset, user_ptr, hdr_data, RPTL_PT_DATA);
}


/*
 * MPID_nem_ptl_rptl_get - public GET entry point.
 *
 * The user's arguments are saved in a freshly allocated op, which is
 * appended to the target's data op list (GETs never use the control
 * list); poke_progress() is then called to issue whatever flow control
 * currently allows.
 *
 * Returns PTL_OK or the error produced by find_target(),
 * rptli_op_alloc() or poke_progress().
 */
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_rptl_get
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPID_nem_ptl_rptl_get(ptl_handle_md_t md_handle, ptl_size_t local_offset, ptl_size_t length,
                          ptl_process_t target_id, ptl_pt_index_t pt_index,
                          ptl_match_bits_t match_bits, ptl_size_t remote_offset, void *user_ptr)
{
    struct rptl_op *op;
    int ret = PTL_OK;
    struct rptl_target *target;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_GET);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_GET);

    ret = find_target(target_id, &target);
    RPTLU_ERR_POP(ret, "error finding target structure\n");

    ret = rptli_op_alloc(&op, target);
    RPTLU_ERR_POP(ret, "error allocating op\n");

    op->op_type = RPTL_OP_GET;
    op->state = RPTL_OP_STATE_QUEUED;

    /* store the user parameters */
    op->u.get.md_handle = md_handle;
    op->u.get.local_offset = local_offset;
    op->u.get.length = length;
    op->u.get.target_id = target_id;
    op->u.get.pt_index = pt_index;
    op->u.get.match_bits = match_bits;
    op->u.get.remote_offset = remote_offset;
    op->u.get.user_ptr = user_ptr;

    op->events_ready = 0;
    op->target = target;

    MPL_DL_APPEND(target->data_op_list, op);

    ret = poke_progress();
    RPTLU_ERR_POP(ret, "Error from poke_progress\n");

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_GET);
    return ret;

  fn_fail:
    goto fn_exit;
}


/*
 * send_pause_messages - queue a PAUSE control message to every rank
 * other than ourselves for the given rptl.
 *
 * Also records the unpause threshold: ob_max_count is set to half of
 * the current overflow-buffer message count.
 *
 * Returns PTL_OK, PTL_FAIL if target info cannot be obtained, or the
 * error returned by rptl_put().
 */
#undef FUNCNAME
#define FUNCNAME send_pause_messages
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int send_pause_messages(struct rptl *rptl)
{
    int i, mpi_errno = MPI_SUCCESS;
    ptl_process_t id;
    ptl_pt_index_t data_pt, control_pt;
    int ret = PTL_OK;
    MPIDI_STATE_DECL(MPID_STATE_SEND_PAUSE_MESSAGES);

    MPIDI_FUNC_ENTER(MPID_STATE_SEND_PAUSE_MESSAGES);

    /* if no control portal is setup for this rptl, we are doomed */
    assert(rptl->control.pt != PTL_PT_ANY);

    /* set the max message count in the overflow buffers we can keep
     * before sending the unpause messages */
    rptl->data.ob_max_count = rptl->data.ob_curr_count / 2;

    for (i = 0; i < rptl_info.world_size; i++) {
        if (i == MPIDI_Process.my_pg_rank)
            continue;
        mpi_errno = rptl_info.get_target_info(i, &id, rptl->data.pt, &data_pt, &control_pt);
        if (mpi_errno) {
            ret = PTL_FAIL;
            RPTLU_ERR_POP(ret, "Error getting target info while sending pause messages\n");
        }

        /* make sure the user setup a control portal */
        assert(control_pt != PTL_PT_ANY);

        ret = rptl_put(rptl->md, 0, 0, PTL_NO_ACK_REQ, id, control_pt, 0, 0,
                       NULL, RPTL_CONTROL_MSG_PAUSE, RPTL_PT_CONTROL);
        RPTLU_ERR_POP(ret, "Error sending pause message\n");
    }

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_SEND_PAUSE_MESSAGES);
    return ret;

  fn_fail:
    goto fn_exit;
}


/*
 * clear_nacks - handle an UNPAUSE from target_id: every NACKED data op
 * addressed to that process goes back to QUEUED, the target becomes
 * ACTIVE again, and poke_progress() is called so the ops get reissued.
 *
 * Returns PTL_OK or the error from find_target() / poke_progress().
 */
#undef FUNCNAME
#define FUNCNAME clear_nacks
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int clear_nacks(ptl_process_t target_id)
{
    struct rptl_target *target;
    struct rptl_op *op;
    int ret = PTL_OK;
    MPIDI_STATE_DECL(MPID_STATE_CLEAR_NACKS);

    MPIDI_FUNC_ENTER(MPID_STATE_CLEAR_NACKS);

    ret = find_target(target_id, &target);
    RPTLU_ERR_POP(ret, "error finding target\n");

    for (op = target->data_op_list; op; op = op->next) {
        if ((op->op_type == RPTL_OP_PUT && IDS_ARE_EQUAL(op->u.put.target_id, target_id)) ||
            (op->op_type == RPTL_OP_GET && IDS_ARE_EQUAL(op->u.get.target_id, target_id))) {
            if (op->state == RPTL_OP_STATE_NACKED)
                op->state = RPTL_OP_STATE_QUEUED;
        }
    }
    target->state = RPTL_TARGET_STATE_ACTIVE;

    ret = poke_progress();
    RPTLU_ERR_POP(ret, "error in poke_progress\n");

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_CLEAR_NACKS);
    return ret;

  fn_fail:
    goto fn_exit;
}


/*
 * get_event_info - classify an event and find what it belongs to.
 *
 * Origin-side events (SEND, ACK, REPLY) carry the op in user_ptr: the
 * op is returned in *ret_op and *ret_rptl is NULL.  One origin event
 * slot is given back, the per-target in-flight count is decremented
 * for ACK/REPLY, and poke_progress() is called since ops may have
 * become issuable.
 *
 * All other events are treated as target-side: the rptl whose data or
 * control PT matches event->pt_index is returned in *ret_rptl and
 * *ret_op is NULL.
 *
 * On a poke_progress() failure the error is returned and the output
 * pointers are not written.
 */
#undef FUNCNAME
#define FUNCNAME get_event_info
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int get_event_info(ptl_event_t * event, struct rptl **ret_rptl, struct rptl_op **ret_op)
{
    struct rptl *rptl;
    struct rptl_op *op;
    int ret = PTL_OK;
    MPIDI_STATE_DECL(MPID_STATE_GET_EVENT_INFO);

    MPIDI_FUNC_ENTER(MPID_STATE_GET_EVENT_INFO);

    if (event->type == PTL_EVENT_SEND || event->type == PTL_EVENT_REPLY ||
        event->type == PTL_EVENT_ACK) {
        op = (struct rptl_op *) event->user_ptr;

        /* every origin op is issued with the op as its user pointer;
         * check it before the first dereference below */
        assert(op);

        rptl_info.origin_events_left++;

        if (event->type != PTL_EVENT_SEND)
            op->target->issued_data_ops--;

        /* see if there are any pending ops to be issued */
        ret = poke_progress();
        RPTLU_ERR_POP(ret, "Error returned from poke_progress\n");

        rptl = NULL;
    }
    else {
        /* for all target-side events, we look up the rptl based on
         * the pt_index */
        for (rptl = rptl_info.rptl_list; rptl; rptl = rptl->next)
            if (rptl->data.pt == event->pt_index || rptl->control.pt == event->pt_index)
                break;

        assert(rptl);
        op = NULL;
    }

    *ret_rptl = rptl;
    *ret_op = op;

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_GET_EVENT_INFO);
    return ret;

  fn_fail:
    goto fn_exit;
}


/*
 * stash_event - keep a heap copy of the first of the two PUT events
 * (SEND or ACK) on the op until its buddy event arrives.
 *
 * The copy is stored in op->u.put.send or op->u.put.ack according to
 * the event type; the caller that consumes it is responsible for
 * freeing it.  Note that the stashed copy still carries the op as its
 * user_ptr.
 *
 * Returns PTL_OK, or PTL_FAIL on the allocation failure path.
 */
#undef FUNCNAME
#define FUNCNAME stash_event
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
static int stash_event(struct rptl_op *op, ptl_event_t event)
{
    int mpi_errno = MPI_SUCCESS;
    int ret = PTL_OK;
    MPIU_CHKPMEM_DECL(1);
    MPIDI_STATE_DECL(MPID_STATE_STASH_EVENT);

    MPIDI_FUNC_ENTER(MPID_STATE_STASH_EVENT);

    /* make sure this is of the event type we know of */
    assert(event.type == PTL_EVENT_SEND || event.type == PTL_EVENT_ACK);

    /* only PUT events are stashed */
    assert(op->op_type == RPTL_OP_PUT);

    /* we should never stash anything when we are in events ready */
    assert(op->events_ready == 0);

    /* only one of send or ack is stashed.  if we are in this
     * function, both the events should be NULL at this point. */
    assert(op->u.put.send == NULL && op->u.put.ack == NULL);

    if (event.type == PTL_EVENT_SEND) {
        MPIU_CHKPMEM_MALLOC(op->u.put.send, ptl_event_t *, sizeof(ptl_event_t), mpi_errno,
                            "ptl event");
        memcpy(op->u.put.send, &event, sizeof(ptl_event_t));
    }
    else {
        MPIU_CHKPMEM_MALLOC(op->u.put.ack, ptl_event_t *, sizeof(ptl_event_t), mpi_errno,
                            "ptl event");
        memcpy(op->u.put.ack, &event, sizeof(ptl_event_t));
    }

  fn_exit:
    MPIU_CHKPMEM_COMMIT();
    MPIDI_FUNC_EXIT(MPID_STATE_STASH_EVENT);
    return ret;

  fn_fail:
    if (mpi_errno)
        ret = PTL_FAIL;
    MPIU_CHKPMEM_REAP();
    goto fn_exit;
}

/* at most one completed event can be held back for delivery on the
 * next MPID_nem_ptl_rptl_eqget() call */
static ptl_event_t pending_event;
static int pending_event_valid = 0;


/*
 * MPID_nem_ptl_rptl_eqget - fetch the next user-visible event.
 *
 * Wraps PtlEQGet and filters everything that belongs to the
 * flow-control protocol:
 *
 *  - PT_DISABLED on the data PT starts the pause protocol and is
 *    hidden (PTL_EQ_EMPTY is returned).
 *  - PUT/GET and *_OVERFLOW events on the data PT maintain the
 *    overflow-buffer message count and are passed through.
 *  - PUTs on the control PT (PAUSE / PAUSE-ACK / UNPAUSE) are consumed
 *    and hidden; the control buffer is reposted.
 *  - origin events with ni_fail_type PTL_NI_PT_DISABLED mark the op as
 *    NACKED and are hidden; the op is reissued after the UNPAUSE.
 *  - for a PUT, the first of SEND/ACK is stashed and hidden.  When the
 *    second arrives the op is complete: control-portal ops are
 *    dropped, data-portal ops return the SEND event and, if the user
 *    asked for an ACK, that one as well (one of the two is delivered
 *    through the pending-event slot on the next call).
 *  - a REPLY completes a GET and is passed through.
 *
 * Events handed to the user have user_ptr restored to the pointer
 * originally supplied with the operation.
 *
 * Returns PTL_OK when *event holds a user-visible event, PTL_EQ_EMPTY
 * when nothing is to be reported, or an error code.
 */
#undef FUNCNAME
#define FUNCNAME MPID_nem_ptl_rptl_eqget
#undef FCNAME
#define FCNAME MPL_QUOTE(FUNCNAME)
int MPID_nem_ptl_rptl_eqget(ptl_handle_eq_t eq_handle, ptl_event_t * event)
{
    struct rptl_op *op = NULL;
    struct rptl *rptl = NULL;
    int ret = PTL_OK, tmp_ret = PTL_OK;
    int mpi_errno = MPI_SUCCESS;
    struct rptl_target *target;
    MPIDI_STATE_DECL(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);

    MPIDI_FUNC_ENTER(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);

    ret = poke_progress();
    RPTLU_ERR_POP(ret, "error poking progress\n");

    /* before we poll the eq, we need to check if there are any
     * completed operations that need to be returned */
    if (pending_event_valid) {
        memcpy(event, &pending_event, sizeof(ptl_event_t));
        pending_event_valid = 0;
        ret = PTL_OK;
        goto fn_exit;
    }

    ret = PtlEQGet(eq_handle, event);
    /* only PTL_OK and PTL_EQ_DROPPED deliver an event.  for anything
     * else (empty queue or a hard error) *event has not been filled
     * in and must not be interpreted; hand the code to the caller. */
    if (ret != PTL_OK && ret != PTL_EQ_DROPPED)
        goto fn_exit;

    /* find the rptl and op associated with this event */
    tmp_ret = get_event_info(event, &rptl, &op);
    if (tmp_ret) {
        ret = tmp_ret;
        RPTLU_ERR_POP(ret, "Error returned from get_event_info\n");
    }

    /* PT_DISABLED events only occur on the target */
    if (event->type == PTL_EVENT_PT_DISABLED) {
        /* we hide PT disabled events from the user */
        ret = PTL_EQ_EMPTY;

        /* we should only receive disable events on the data pt */
        assert(rptl->data.pt == event->pt_index);

        /* if we don't have a control PT, we don't have a way to
         * recover from disable events */
        assert(rptl->control.pt != PTL_PT_ANY);

        if (rptl->local_state == RPTL_LOCAL_STATE_ACTIVE) {
            rptl->local_state = RPTL_LOCAL_STATE_AWAITING_PAUSE_ACKS;
            rptl->pause_ack_counter = 0;

            /* send out pause messages */
            tmp_ret = send_pause_messages(rptl);
            if (tmp_ret) {
                ret = tmp_ret;
                RPTLU_ERR_POP(ret, "Error returned from send_pause_messages\n");
            }
        }
    }

    /* PUT_OVERFLOW events only occur on the target and only for the
     * data portal */
    else if (event->type == PTL_EVENT_PUT_OVERFLOW || event->type == PTL_EVENT_GET_OVERFLOW) {
        /* something is being pulled out of the overflow buffer,
         * decrement counter */
        rptl->data.ob_curr_count--;

        /* we should only receive overflow events on the data pt */
        assert(rptl->data.pt == event->pt_index);
    }

    /* PUT events only occur on the target */
    else if (event->type == PTL_EVENT_PUT || event->type == PTL_EVENT_GET) {
        if (rptl->data.pt == event->pt_index) {
            /* if the event is in the OVERFLOW list, then it means we
             * just got a match in there */
            if (event->ptl_list == PTL_OVERFLOW_LIST)
                rptl->data.ob_curr_count++;
            goto fn_exit;
        }

        /* control PT should never see a GET event */
        assert(event->type == PTL_EVENT_PUT);

        /* else, this message is on the control PT, so hide this event
         * from the user */
        ret = PTL_EQ_EMPTY;

        /* the message came in on the control PT, repost it */
        tmp_ret = rptli_post_control_buffer(rptl->ni, rptl->control.pt,
                                            &rptl->control.me[rptl->control.me_idx]);
        if (tmp_ret) {
            ret = tmp_ret;
            RPTLU_ERR_POP(ret, "Error returned from rptli_post_control_buffer\n");
        }
        /* control buffers are reused round-robin over
         * 2 * world_size slots */
        rptl->control.me_idx++;
        if (rptl->control.me_idx >= 2 * rptl_info.world_size)
            rptl->control.me_idx = 0;

        if (event->hdr_data == RPTL_CONTROL_MSG_PAUSE) {
            tmp_ret = find_target(event->initiator, &target);
            if (tmp_ret) {
                ret = tmp_ret;
                RPTLU_ERR_POP(ret, "Error finding target\n");
            }
            assert(target->state < RPTL_TARGET_STATE_RECEIVED_PAUSE);
            target->state = RPTL_TARGET_STATE_RECEIVED_PAUSE;
            target->rptl = rptl;
        }
        else if (event->hdr_data == RPTL_CONTROL_MSG_PAUSE_ACK) {
            rptl->pause_ack_counter++;
        }
        else {  /* got an UNPAUSE message */
            /* clear NACKs from all operations to this target and poke
             * progress */
            tmp_ret = clear_nacks(event->initiator);
            if (tmp_ret) {
                ret = tmp_ret;
                RPTLU_ERR_POP(ret, "Error returned from clear_nacks\n");
            }
        }
    }

    /* origin side events */
    else if (event->type == PTL_EVENT_SEND || event->type == PTL_EVENT_ACK ||
             event->type == PTL_EVENT_REPLY) {

        /* if this is a failed event, we simply drop this event */
        if (event->ni_fail_type == PTL_NI_PT_DISABLED) {
            /* hide the event from the user */
            ret = PTL_EQ_EMPTY;

            /* we should not get NACKs on the control portal */
            if (event->type == PTL_EVENT_ACK)
                assert(op->u.put.pt_type == RPTL_PT_DATA);

            op->state = RPTL_OP_STATE_NACKED;

            if (op->op_type == RPTL_OP_PUT) {
                assert(!(event->type == PTL_EVENT_SEND && op->u.put.send));
                assert(!(event->type == PTL_EVENT_ACK && op->u.put.ack));

                /* discard pending events, since we will retransmit
                 * this op anyway */
                if (op->u.put.ack) {
                    MPIU_Free(op->u.put.ack);
                    op->u.put.ack = NULL;
                }
                if (op->u.put.send) {
                    MPIU_Free(op->u.put.send);
                    op->u.put.send = NULL;
                }
            }

            if (op->op_type == RPTL_OP_PUT)
                tmp_ret = find_target(op->u.put.target_id, &target);
            else
                tmp_ret = find_target(op->u.get.target_id, &target);
            if (tmp_ret) {
                ret = tmp_ret;
                RPTLU_ERR_POP(ret, "Error finding target\n");
            }

            if (target->state == RPTL_TARGET_STATE_ACTIVE) {
                target->state = RPTL_TARGET_STATE_DISABLED;
                target->rptl = NULL;
            }
        }

        /* if this is a REPLY event, we are done with this op */
        else if (event->type == PTL_EVENT_REPLY) {
            assert(op->op_type == RPTL_OP_GET);

            event->user_ptr = op->u.get.user_ptr;

            /* GET operations only go into the data op list */
            MPL_DL_DELETE(op->target->data_op_list, op);
            rptli_op_free(op);
        }

        else if (event->type == PTL_EVENT_SEND && op->u.put.ack) {
            assert(op->op_type == RPTL_OP_PUT);

            /* we already got the other event we needed earlier.  mark
             * the op events as ready and return this current event to
             * the user. */
            op->events_ready = 1;
            event->user_ptr = op->u.put.user_ptr;

            /* if the message is over the control portal, ignore both
             * events */
            if (op->u.put.pt_type == RPTL_PT_CONTROL) {
                /* drop the ack event */
                MPIU_Free(op->u.put.ack);
                MPL_DL_DELETE(op->target->control_op_list, op);
                rptli_op_free(op);

                /* drop the send event */
                ret = PTL_EQ_EMPTY;
            }
            else {
                /* if the message is over the data portal, we'll
                 * return the send event.  if the user asked for an
                 * ACK, we will enqueue the ack to be returned
                 * next. */
                /* NOTE(review): ack_req is tested as a bitmask here;
                 * confirm that PTL_ACK_REQ is a non-zero flag in the
                 * portals4.h in use, otherwise this needs to be an
                 * equality test */
                if (op->u.put.ack_req & PTL_ACK_REQ) {
                    /* only one event should be pending */
                    assert(pending_event_valid == 0);
                    memcpy(&pending_event, op->u.put.ack, sizeof(ptl_event_t));
                    /* the stashed copy still carries the op as its
                     * user pointer; hand the user's pointer back
                     * instead, the op is released below */
                    pending_event.user_ptr = op->u.put.user_ptr;
                    pending_event_valid = 1;
                }
                MPIU_Free(op->u.put.ack);
                MPL_DL_DELETE(op->target->data_op_list, op);
                rptli_op_free(op);
            }
        }

        else if (event->type == PTL_EVENT_ACK && op->u.put.send) {
            assert(op->op_type == RPTL_OP_PUT);

            /* we already got the other event we needed earlier.  mark
             * the op events as ready and return this current event to
             * the user. */
            op->events_ready = 1;
            event->user_ptr = op->u.put.user_ptr;

            /* if the message is over the control portal, ignore both
             * events */
            if (op->u.put.pt_type == RPTL_PT_CONTROL) {
                /* drop the send event */
                MPIU_Free(op->u.put.send);
                MPL_DL_DELETE(op->target->control_op_list, op);
                rptli_op_free(op);

                /* drop the ack event */
                ret = PTL_EQ_EMPTY;
            }
            else {
                /* if the message is over the data portal, the user
                 * always gets the send event; the ack is only
                 * reported when it was asked for. */
                /* NOTE(review): same bitmask-style test on ack_req as
                 * in the SEND branch above; confirm PTL_ACK_REQ is
                 * non-zero */
                if (op->u.put.ack_req & PTL_ACK_REQ) {
                    /* user asked for an ACK, so return it to the user
                     * and queue up the SEND event for next time */
                    memcpy(&pending_event, op->u.put.send, sizeof(ptl_event_t));
                    /* the stashed copy still carries the op as its
                     * user pointer; hand the user's pointer back
                     * instead, the op is released below */
                    pending_event.user_ptr = op->u.put.user_ptr;
                    MPIU_Free(op->u.put.send);
                    assert(pending_event_valid == 0);
                    pending_event_valid = 1;
                }
                else {
                    /* user didn't ask for an ACK, overwrite the ACK
                     * event with the pending send event */
                    memcpy(event, op->u.put.send, sizeof(ptl_event_t));
                    MPIU_Free(op->u.put.send);

                    /* set the event user pointer again, since we
                     * copied over the original event */
                    event->user_ptr = op->u.put.user_ptr;
                }
                /* we should be in the data op list */
                MPL_DL_DELETE(op->target->data_op_list, op);
                rptli_op_free(op);
            }
        }

        else {
            assert(!(event->type == PTL_EVENT_SEND && op->u.put.send));
            assert(!(event->type == PTL_EVENT_ACK && op->u.put.ack));

            /* stash this event as we need to wait for the buddy event
             * as well before returning to the user */
            ret = stash_event(op, *event);
            RPTLU_ERR_POP(ret, "error stashing event\n");
            ret = PTL_EQ_EMPTY;
        }
    }

  fn_exit:
    MPIDI_FUNC_EXIT(MPID_STATE_MPID_NEM_PTL_RPTL_EQGET);
    return ret;

  fn_fail:
    if (mpi_errno)
        ret = PTL_FAIL;
    goto fn_exit;
}