diff --git a/include/ofi_util.h b/include/ofi_util.h index dda5c903e6e..bc590bb4d1a 100644 --- a/include/ofi_util.h +++ b/include/ofi_util.h @@ -955,12 +955,15 @@ struct rxm_av { struct fid_peer_av peer_av; struct fid_av *util_coll_av; struct fid_av *offload_coll_av; + void (*foreach_ep)(struct util_av *av, struct util_ep *util_ep); }; int rxm_util_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, struct fid_av **fid_av, void *context, size_t conn_size, void (*remove_handler)(struct util_ep *util_ep, - struct util_peer_addr *peer)); + struct util_peer_addr *peer), + void (*foreach_ep)(struct util_av *av, + struct util_ep *ep)); size_t rxm_av_max_peers(struct rxm_av *av); void rxm_ref_peer(struct util_peer_addr *peer); void *rxm_av_alloc_conn(struct rxm_av *av); diff --git a/prov/rxm/src/rxm.h b/prov/rxm/src/rxm.h index 5d18f16e157..93e08624fc1 100644 --- a/prov/rxm/src/rxm.h +++ b/prov/rxm/src/rxm.h @@ -183,9 +183,9 @@ do { \ extern struct fi_provider rxm_prov; extern struct util_prov rxm_util_prov; -extern struct fi_ops_msg rxm_msg_ops; +extern struct fi_ops_msg rxm_msg_ops, rxm_no_recv_msg_ops; extern struct fi_ops_msg rxm_msg_thru_ops; -extern struct fi_ops_tagged rxm_tagged_ops; +extern struct fi_ops_tagged rxm_tagged_ops, rxm_no_recv_tagged_ops; extern struct fi_ops_tagged rxm_tagged_thru_ops; extern struct fi_ops_rma rxm_rma_ops; extern struct fi_ops_rma rxm_rma_thru_ops; @@ -265,6 +265,8 @@ struct rxm_fabric { struct rxm_domain { struct util_domain util_domain; struct fid_domain *msg_domain; + struct fid_ep rx_ep; + struct fid_peer_srx *srx; size_t max_atomic_size; size_t rx_post_size; uint64_t mr_key; @@ -443,24 +445,29 @@ rxm_sar_set_seg_type(struct ofi_ctrl_hdr *ctrl_hdr, enum rxm_sar_seg_type seg_ty ((union rxm_sar_ctrl_data *)&(ctrl_hdr->ctrl_data))->seg_type = seg_type; } -struct rxm_recv_match_attr { - fi_addr_t addr; - uint64_t tag; - uint64_t ignore; -}; - -struct rxm_unexp_msg { - struct dlist_entry entry; - fi_addr_t addr; - uint64_t tag; -}; - struct rxm_iov { struct iovec iov[RXM_IOV_LIMIT]; void *desc[RXM_IOV_LIMIT]; uint8_t count; }; +struct rxm_proto_info { + /* Used for SAR protocol */ + struct { + struct dlist_entry entry; + struct dlist_entry pkt_list; + struct fi_peer_rx_entry *rx_entry; + size_t total_recv_len; + struct rxm_conn *conn; + uint64_t msg_id; + } sar; + /* Used for Rendezvous protocol */ + struct { + /* This is used to send RNDV ACK */ + struct rxm_tx_buf *tx_buf; + } rndv; +}; + struct rxm_buf { /* Must stay at top */ struct fi_context fi_context; @@ -478,9 +485,10 @@ struct rxm_rx_buf { /* MSG EP / shared context to which bufs would be posted to */ struct fid_ep *rx_ep; struct dlist_entry repost_entry; + struct dlist_entry unexp_entry; struct rxm_conn *conn; /* msg ep data was received on */ - struct rxm_recv_entry *recv_entry; - struct rxm_unexp_msg unexp_msg; + struct fi_peer_rx_entry *peer_entry; + struct rxm_proto_info *proto_info; uint64_t comp_flags; struct fi_recv_context recv_context; bool repost; @@ -608,49 +616,6 @@ struct rxm_deferred_tx_entry { }; }; -struct rxm_recv_entry { - struct dlist_entry entry; - struct rxm_iov rxm_iov; - fi_addr_t addr; - void *context; - uint64_t flags; - uint64_t tag; - uint64_t ignore; - uint64_t comp_flags; - size_t total_len; - struct rxm_recv_queue *recv_queue; - - /* Used for SAR protocol */ - struct { - struct dlist_entry entry; - size_t total_recv_len; - struct rxm_conn *conn; - uint64_t msg_id; - } sar; - /* Used for Rendezvous protocol */ - struct { - /* This is used to send RNDV ACK */ 
- struct rxm_tx_buf *tx_buf; - } rndv; -}; -OFI_DECLARE_FREESTACK(struct rxm_recv_entry, rxm_recv_fs); - -enum rxm_recv_queue_type { - RXM_RECV_QUEUE_UNSPEC, - RXM_RECV_QUEUE_MSG, - RXM_RECV_QUEUE_TAGGED, -}; - -struct rxm_recv_queue { - struct rxm_ep *rxm_ep; - enum rxm_recv_queue_type type; - struct rxm_recv_fs *fs; - struct dlist_entry recv_list; - struct dlist_entry unexp_msg_list; - dlist_func_t *match_recv; - dlist_func_t *match_unexp; -}; - struct rxm_eager_ops { void (*comp_tx)(struct rxm_ep *rxm_ep, struct rxm_tx_buf *tx_eager_buf); @@ -690,6 +655,8 @@ struct rxm_ep { struct fi_ops_transfer_peer *offload_coll_peer_xfer_ops; uint64_t offload_coll_mask; + struct fid_peer_srx *srx; + struct fid_cq *msg_cq; uint64_t msg_cq_last_poll; size_t comp_per_progress; @@ -703,7 +670,6 @@ struct rxm_ep { bool do_progress; bool enable_direct_send; - size_t min_multi_recv_size; size_t buffered_min; size_t buffered_limit; size_t inject_limit; @@ -715,15 +681,13 @@ struct rxm_ep { struct ofi_bufpool *rx_pool; struct ofi_bufpool *tx_pool; struct ofi_bufpool *coll_pool; + struct ofi_bufpool *proto_info_pool; + struct rxm_pkt *inject_pkt; struct dlist_entry deferred_queue; struct dlist_entry rndv_wait_list; - struct rxm_recv_queue recv_queue; - struct rxm_recv_queue trecv_queue; - struct ofi_bufpool *multi_recv_pool; - struct rxm_eager_ops *eager_ops; struct rxm_rndv_ops *rndv_ops; }; @@ -757,6 +721,9 @@ int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, struct fid_cq **cq_fid, void *context); ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf); +int rxm_srx_context(struct fid_domain *domain, struct fi_rx_attr *attr, + struct fid_ep **rx_ep, void *context); + int rxm_endpoint(struct fid_domain *domain, struct fi_info *info, struct fid_ep **ep, void *context); void rxm_cq_write_tx_error(struct rxm_ep *rxm_ep, uint8_t op, void *op_context, @@ -915,17 +882,10 @@ ssize_t rxm_inject_send(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, const void *buf, size_t len); -struct rxm_recv_entry * -rxm_recv_entry_get(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - uint64_t tag, uint64_t ignore, void *context, - uint64_t flags, struct rxm_recv_queue *recv_queue); -struct rxm_rx_buf * -rxm_get_unexp_msg(struct rxm_recv_queue *recv_queue, fi_addr_t addr, - uint64_t tag, uint64_t ignore); -ssize_t rxm_handle_unexp_sar(struct rxm_recv_queue *recv_queue, - struct rxm_recv_entry *recv_entry, - struct rxm_rx_buf *rx_buf); +ssize_t rxm_handle_unexp_sar(struct fi_peer_rx_entry *peer_entry); +int rxm_srx_context(struct fid_domain *domain, struct fi_rx_attr *attr, + struct fid_ep **rx_ep, void *context); + int rxm_post_recv(struct rxm_rx_buf *rx_buf); void rxm_av_remove_handler(struct util_ep *util_ep, struct util_peer_addr *peer); @@ -946,15 +906,6 @@ rxm_free_rx_buf(struct rxm_rx_buf *rx_buf) } } -static inline void -rxm_recv_entry_release(struct rxm_recv_entry *entry) -{ - if (entry->recv_queue) - ofi_freestack_push(entry->recv_queue->fs, entry); - else - ofi_buf_free(entry); -} - struct rxm_mr *rxm_mr_get_map_entry(struct rxm_domain *domain, uint64_t key); struct rxm_recv_entry * diff --git a/prov/rxm/src/rxm_attr.c b/prov/rxm/src/rxm_attr.c index 632543585e4..6dc1241329e 100644 --- a/prov/rxm/src/rxm_attr.c +++ b/prov/rxm/src/rxm_attr.c @@ -40,7 +40,8 @@ OFI_RX_RMA_CAPS | FI_ATOMICS | FI_DIRECTED_RECV | \ FI_MULTI_RECV) -#define RXM_DOMAIN_CAPS (FI_LOCAL_COMM | FI_REMOTE_COMM | FI_AV_USER_ID) +#define RXM_DOMAIN_CAPS (FI_LOCAL_COMM | FI_REMOTE_COMM | 
FI_AV_USER_ID | \ + FI_PEER) /* Since we are a layering provider, the attributes for which we rely on the diff --git a/prov/rxm/src/rxm_conn.c b/prov/rxm/src/rxm_conn.c index afe603234ec..73b26f2a9f3 100644 --- a/prov/rxm/src/rxm_conn.c +++ b/prov/rxm/src/rxm_conn.c @@ -58,7 +58,7 @@ struct rxm_eq_cm_entry { static void rxm_close_conn(struct rxm_conn *conn) { struct rxm_deferred_tx_entry *tx_entry; - struct rxm_recv_entry *rx_entry; + struct fi_peer_rx_entry *rx_entry; struct rxm_rx_buf *buf; FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "closing conn %p\n", conn); @@ -74,16 +74,15 @@ static void rxm_close_conn(struct rxm_conn *conn) while (!dlist_empty(&conn->deferred_sar_segments)) { buf = container_of(conn->deferred_sar_segments.next, - struct rxm_rx_buf, unexp_msg.entry); - dlist_remove(&buf->unexp_msg.entry); - rxm_free_rx_buf(buf); + struct rxm_rx_buf, unexp_entry); + dlist_remove(&buf->unexp_entry); } while (!dlist_empty(&conn->deferred_sar_msgs)) { - rx_entry = container_of(conn->deferred_sar_msgs.next, - struct rxm_recv_entry, sar.entry); - dlist_remove(&rx_entry->entry); - rxm_recv_entry_release(rx_entry); + rx_entry = container_of(conn->deferred_sar_msgs.next, + struct rxm_proto_info, sar.entry)->sar.rx_entry; + dlist_remove(conn->deferred_sar_msgs.next); + rx_entry->srx->owner_ops->free_entry(rx_entry); } fi_close(&conn->msg_ep->fid); rxm_flush_msg_cq(conn->ep); diff --git a/prov/rxm/src/rxm_cq.c b/prov/rxm/src/rxm_cq.c index b04b36444d3..51206ddde04 100644 --- a/prov/rxm/src/rxm_cq.c +++ b/prov/rxm/src/rxm_cq.c @@ -106,11 +106,12 @@ static void rxm_cq_write_recv_comp(struct rxm_rx_buf *rx_buf, void *context, { int ret; + flags &= ~FI_COMPLETION; if (rx_buf->ep->util_coll_peer_xfer_ops && rx_buf->pkt.hdr.tag & RXM_PEER_XFER_TAG_FLAG) { struct fi_cq_tagged_entry cqe = { .tag = rx_buf->pkt.hdr.tag, - .op_context = rx_buf->recv_entry->context, + .op_context = rx_buf->peer_entry->context, }; rx_buf->ep->util_coll_peer_xfer_ops-> complete(rx_buf->ep->util_coll_ep, &cqe, 0); @@ -137,7 +138,7 @@ static void rxm_finish_buf_recv(struct rxm_rx_buf *rx_buf) if ((rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) && rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) != RXM_SAR_SEG_FIRST) { - dlist_insert_tail(&rx_buf->unexp_msg.entry, + dlist_insert_tail(&rx_buf->unexp_entry, &rx_buf->conn->deferred_sar_segments); rxm_replace_rx_buf(rx_buf); } @@ -172,10 +173,11 @@ static void rxm_cq_write_error_trunc(struct rxm_rx_buf *rx_buf, size_t done_len) done_len, rx_buf->pkt.hdr.size); ret = ofi_peer_cq_write_error_trunc( rx_buf->ep->util_ep.rx_cq, - rx_buf->recv_entry->context, - rx_buf->recv_entry->comp_flags | - rx_buf->pkt.hdr.flags, rx_buf->pkt.hdr.size, - rx_buf->recv_entry->rxm_iov.iov[0].iov_base, + rx_buf->peer_entry->context, + rx_buf->peer_entry->flags | + rx_buf->pkt.hdr.flags, + rx_buf->pkt.hdr.size, + rx_buf->peer_entry->iov[0].iov_base, rx_buf->pkt.hdr.data, rx_buf->pkt.hdr.tag, rx_buf->pkt.hdr.size - done_len); if (ret) { @@ -186,27 +188,22 @@ static void rxm_cq_write_error_trunc(struct rxm_rx_buf *rx_buf, size_t done_len) static void rxm_finish_recv(struct rxm_rx_buf *rx_buf, size_t done_len) { - struct rxm_recv_entry *recv_entry = rx_buf->recv_entry; - if (done_len < rx_buf->pkt.hdr.size) { rxm_cq_write_error_trunc(rx_buf, done_len); goto release; } - if (rx_buf->recv_entry->flags & FI_COMPLETION || + if (rx_buf->peer_entry->flags & FI_COMPLETION || rx_buf->ep->rxm_info->mode & OFI_BUFFERED_RECV) { - rxm_cq_write_recv_comp( - rx_buf, rx_buf->recv_entry->context, - rx_buf->recv_entry->comp_flags | - rx_buf->pkt.hdr.flags | - (rx_buf->recv_entry->flags & FI_MULTI_RECV), -
rx_buf->pkt.hdr.size, - rx_buf->recv_entry->rxm_iov. - iov[0].iov_base); + rxm_cq_write_recv_comp(rx_buf, rx_buf->peer_entry->context, + rx_buf->peer_entry->flags | + rx_buf->pkt.hdr.flags, + rx_buf->pkt.hdr.size, + rx_buf->peer_entry->iov[0].iov_base); } ofi_ep_peer_rx_cntr_inc(&rx_buf->ep->util_ep, ofi_op_msg); release: - rxm_recv_entry_release(recv_entry); + rx_buf->ep->srx->owner_ops->free_entry(rx_buf->peer_entry); rxm_free_rx_buf(rx_buf); } @@ -294,18 +291,20 @@ static void rxm_handle_sar_comp(struct rxm_ep *rxm_ep, static void rxm_rndv_rx_finish(struct rxm_rx_buf *rx_buf) { + struct rxm_proto_info *proto_info; + RXM_UPDATE_STATE(FI_LOG_CQ, rx_buf, RXM_RNDV_FINISH); - if (rx_buf->recv_entry->rndv.tx_buf) { - ofi_buf_free(rx_buf->recv_entry->rndv.tx_buf); - rx_buf->recv_entry->rndv.tx_buf = NULL; + proto_info = rx_buf->proto_info; + if (proto_info->rndv.tx_buf) { + ofi_buf_free(proto_info->rndv.tx_buf); + ofi_buf_free(proto_info); } if (!rx_buf->ep->rdm_mr_local) - rxm_msg_mr_closev(rx_buf->mr, - rx_buf->recv_entry->rxm_iov.count); + rxm_msg_mr_closev(rx_buf->mr, rx_buf->peer_entry->count); - rxm_finish_recv(rx_buf, rx_buf->recv_entry->total_len); + rxm_finish_recv(rx_buf, rx_buf->peer_entry->msg_size); } static void rxm_rndv_tx_finish(struct rxm_ep *rxm_ep, @@ -398,96 +397,135 @@ static int rxm_rx_buf_match_msg_id(struct dlist_entry *item, const void *arg) uint64_t msg_id = *((uint64_t *) arg); struct rxm_rx_buf *rx_buf; - rx_buf = container_of(item, struct rxm_rx_buf, unexp_msg.entry); + rx_buf = container_of(item, struct rxm_rx_buf, unexp_entry); return (msg_id == rx_buf->pkt.ctrl_hdr.msg_id); } -static void rxm_process_seg_data(struct rxm_rx_buf *rx_buf, int *done) +static void rxm_init_sar_proto(struct rxm_rx_buf *rx_buf) +{ + struct rxm_proto_info *proto_info; + + proto_info = ofi_buf_alloc(rx_buf->ep->proto_info_pool); + if (!proto_info) { + FI_WARN(&rxm_prov, FI_LOG_CQ, + "Failed to allocate proto info buffer\n"); + return; + } + if (!rx_buf->conn) { + rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, + (int) rx_buf->pkt.ctrl_hdr.conn_id); + } + + proto_info->sar.conn = rx_buf->conn; + proto_info->sar.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; + proto_info->sar.total_recv_len = 0; + proto_info->sar.rx_entry = rx_buf->peer_entry; + + dlist_insert_tail(&proto_info->sar.entry, + &rx_buf->conn->deferred_sar_msgs); + + dlist_init(&proto_info->sar.pkt_list); + if (rx_buf->peer_entry->peer_context) + dlist_insert_tail(&rx_buf->unexp_entry, + &proto_info->sar.pkt_list); + + + rx_buf->proto_info = proto_info; +} + +int rxm_process_seg_data(struct rxm_rx_buf *rx_buf) { enum fi_hmem_iface iface; + struct rxm_proto_info *proto_info; uint64_t device; ssize_t done_len; + int done = 0; - iface = rxm_iov_desc_to_hmem_iface_dev(rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.desc, - rx_buf->recv_entry->rxm_iov.count, + proto_info = rx_buf->proto_info; + iface = rxm_iov_desc_to_hmem_iface_dev(rx_buf->peer_entry->iov, + rx_buf->peer_entry->desc, + rx_buf->peer_entry->count, &device); done_len = ofi_copy_to_hmem_iov(iface, device, - rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, - rx_buf->recv_entry->sar.total_recv_len, + rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, + proto_info->sar.total_recv_len, rx_buf->pkt.data, rx_buf->pkt.ctrl_hdr.seg_size); assert(done_len == rx_buf->pkt.ctrl_hdr.seg_size); - rx_buf->recv_entry->sar.total_recv_len += done_len; + proto_info->sar.total_recv_len += done_len; if ((rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == 
RXM_SAR_SEG_LAST) || (done_len != rx_buf->pkt.ctrl_hdr.seg_size)) { - - dlist_remove(&rx_buf->recv_entry->sar.entry); - - /* Mark rxm_recv_entry::msg_id as unknown for futher re-use */ - rx_buf->recv_entry->sar.msg_id = RXM_SAR_RX_INIT; - - done_len = rx_buf->recv_entry->sar.total_recv_len; - rx_buf->recv_entry->sar.total_recv_len = 0; - - *done = 1; + if (!rx_buf->peer_entry->peer_context) + dlist_remove(&proto_info->sar.entry); + done_len = proto_info->sar.total_recv_len; + done = 1; + ofi_buf_free(rx_buf->proto_info); rxm_finish_recv(rx_buf, done_len); } else { - if (rx_buf->recv_entry->sar.msg_id == RXM_SAR_RX_INIT) { - if (!rx_buf->conn) { - rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, - (int) rx_buf->pkt.ctrl_hdr.conn_id); - } - - rx_buf->recv_entry->sar.conn = rx_buf->conn; - rx_buf->recv_entry->sar.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; - - dlist_insert_tail(&rx_buf->recv_entry->sar.entry, - &rx_buf->conn->deferred_sar_msgs); - } - /* The RX buffer can be reposted for further re-use */ - rx_buf->recv_entry = NULL; + rx_buf->peer_entry = NULL; rxm_free_rx_buf(rx_buf); - - *done = 0; } + return done; } static void rxm_handle_seg_data(struct rxm_rx_buf *rx_buf) { - struct rxm_recv_entry *recv_entry; + struct rxm_proto_info *proto_info; + struct fi_peer_rx_entry *rx_entry; struct rxm_conn *conn; uint64_t msg_id; struct dlist_entry *entry; - int done; - rxm_process_seg_data(rx_buf, &done); - if (done || !(rx_buf->ep->rxm_info->mode & OFI_BUFFERED_RECV)) + if (dlist_empty(&rx_buf->proto_info->sar.pkt_list)) { + rxm_process_seg_data(rx_buf); return; + } - recv_entry = rx_buf->recv_entry; + proto_info = rx_buf->proto_info; + dlist_insert_tail(&rx_buf->unexp_entry, &proto_info->sar.pkt_list); + + if ((rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == RXM_SAR_SEG_LAST)) + dlist_remove(&proto_info->sar.entry); + + rx_entry = rx_buf->peer_entry; conn = rx_buf->conn; msg_id = rx_buf->pkt.ctrl_hdr.msg_id; dlist_foreach_container_safe(&conn->deferred_sar_segments, struct rxm_rx_buf, rx_buf, - unexp_msg.entry, entry) { - if (!rxm_rx_buf_match_msg_id(&rx_buf->unexp_msg.entry, &msg_id)) + unexp_entry, entry) { + if (!rxm_rx_buf_match_msg_id(&rx_buf->unexp_entry, &msg_id)) continue; - dlist_remove(&rx_buf->unexp_msg.entry); - rx_buf->recv_entry = recv_entry; - rxm_process_seg_data(rx_buf, &done); - if (done) + dlist_remove(&rx_buf->unexp_entry); + rx_buf->peer_entry = rx_entry; + if (rxm_process_seg_data(rx_buf)) break; } } +ssize_t rxm_handle_unexp_sar(struct fi_peer_rx_entry *peer_entry) +{ + struct rxm_proto_info *proto_info; + struct rxm_rx_buf *rx_buf; + + rx_buf = (struct rxm_rx_buf *) peer_entry->peer_context; + proto_info = rx_buf->proto_info; + + while (!dlist_empty(&proto_info->sar.pkt_list)) { + dlist_pop_front(&proto_info->sar.pkt_list, + struct rxm_rx_buf, rx_buf, unexp_entry); + rxm_process_seg_data(rx_buf); + } + peer_entry->peer_context = NULL; + return FI_SUCCESS; +} + static ssize_t rxm_rndv_xfer(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep, struct rxm_rndv_hdr *remote_hdr, struct iovec *local_iov, void **local_desc, size_t local_count, size_t total_len, @@ -538,14 +576,15 @@ ssize_t rxm_rndv_read(struct rxm_rx_buf *rx_buf) ssize_t ret; size_t total_len; - total_len = MIN(rx_buf->recv_entry->total_len, rx_buf->pkt.hdr.size); + total_len = MIN(rx_buf->peer_entry->msg_size, rx_buf->pkt.hdr.size); + rx_buf->peer_entry->msg_size = total_len; RXM_UPDATE_STATE(FI_LOG_CQ, rx_buf, RXM_RNDV_READ); ret = rxm_rndv_xfer(rx_buf->ep, rx_buf->conn->msg_ep, rx_buf->remote_rndv_hdr, - 
rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.desc, - rx_buf->recv_entry->rxm_iov.count, total_len, + rx_buf->peer_entry->iov, + rx_buf->peer_entry->desc, + rx_buf->peer_entry->count, total_len, rx_buf); if (ret) { rxm_cq_write_rx_error(rx_buf->ep, ofi_op_msg, rx_buf, @@ -621,28 +660,26 @@ static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf) rx_buf->rndv_rma_index = 0; if (!rx_buf->ep->rdm_mr_local) { - total_recv_len = MIN(rx_buf->recv_entry->total_len, + total_recv_len = MIN(rx_buf->peer_entry->msg_size, rx_buf->pkt.hdr.size); - ret = rxm_msg_mr_regv(rx_buf->ep, rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, + ret = rxm_msg_mr_regv(rx_buf->ep, rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, total_recv_len, rx_buf->ep->rndv_ops->rx_mr_access, rx_buf->mr); if (ret) return ret; - for (i = 0; (i < rx_buf->recv_entry->rxm_iov.count && + for (i = 0; (i < rx_buf->peer_entry->count && rx_buf->mr[i]); i++) { - rx_buf->recv_entry->rxm_iov.desc[i] = - fi_mr_desc(rx_buf->mr[i]); + rx_buf->peer_entry->desc[i] = fi_mr_desc(rx_buf->mr[i]); } } else { struct rxm_mr *mr; - for (i = 0; i < rx_buf->recv_entry->rxm_iov.count; i++) { - mr = rx_buf->recv_entry->rxm_iov.desc[i]; - rx_buf->recv_entry->rxm_iov.desc[i] = - fi_mr_desc(mr->msg_mr); + for (i = 0; i < rx_buf->peer_entry->count; i++) { + mr = rx_buf->peer_entry->desc[i]; + rx_buf->peer_entry->desc[i] = fi_mr_desc(mr->msg_mr); rx_buf->mr[i] = mr->msg_mr; } } @@ -656,9 +693,9 @@ static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf) void rxm_handle_eager(struct rxm_rx_buf *rx_buf) { ssize_t done_len = rxm_copy_to_hmem_iov( - rx_buf->recv_entry->rxm_iov.desc, rx_buf->data, - rx_buf->pkt.hdr.size, rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, 0); + rx_buf->peer_entry->desc, rx_buf->data, + rx_buf->pkt.hdr.size, rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, 0); assert((size_t) done_len == rx_buf->pkt.hdr.size); @@ -671,14 +708,14 @@ void rxm_handle_coll_eager(struct rxm_rx_buf *rx_buf) uint64_t device; ssize_t done_len; - iface = rxm_iov_desc_to_hmem_iface_dev(rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.desc, - rx_buf->recv_entry->rxm_iov.count, + iface = rxm_iov_desc_to_hmem_iface_dev(rx_buf->peer_entry->iov, + rx_buf->peer_entry->desc, + rx_buf->peer_entry->count, &device); done_len = ofi_copy_to_hmem_iov(iface, device, - rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, 0, + rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, 0, rx_buf->data, rx_buf->pkt.hdr.size); assert((size_t) done_len == rx_buf->pkt.hdr.size); @@ -686,11 +723,11 @@ void rxm_handle_coll_eager(struct rxm_rx_buf *rx_buf) rx_buf->pkt.hdr.tag & RXM_PEER_XFER_TAG_FLAG) { struct fi_cq_tagged_entry cqe = { .tag = rx_buf->pkt.hdr.tag, - .op_context = rx_buf->recv_entry->context, + .op_context = rx_buf->peer_entry->context, }; rx_buf->ep->util_coll_peer_xfer_ops-> complete(rx_buf->ep->util_coll_ep, &cqe, 0); - rxm_recv_entry_release(rx_buf->recv_entry); + rx_buf->ep->srx->owner_ops->free_entry(rx_buf->peer_entry); rxm_free_rx_buf(rx_buf); } else { rxm_finish_recv(rx_buf, done_len); @@ -715,73 +752,26 @@ ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf) } } -static void rxm_adjust_multi_recv(struct rxm_rx_buf *rx_buf) +static inline void rxm_entry_prep_for_queue(struct fi_peer_rx_entry *rx_entry, + struct rxm_rx_buf *rx_buf) { - struct rxm_recv_entry *recv_entry; - struct iovec new_iov; - size_t recv_size; - - recv_size = rx_buf->pkt.hdr.size; - - if 
(rx_buf->recv_entry->rxm_iov.iov[0].iov_len < recv_size || - rx_buf->recv_entry->rxm_iov.iov[0].iov_len - recv_size < - rx_buf->ep->min_multi_recv_size) - return; - - new_iov.iov_base = (uint8_t *) - rx_buf->recv_entry->rxm_iov.iov[0].iov_base + recv_size; - new_iov.iov_len = rx_buf->recv_entry->rxm_iov.iov[0].iov_len - recv_size;; - - rx_buf->recv_entry->rxm_iov.iov[0].iov_len = recv_size; - - recv_entry = rxm_multi_recv_entry_get(rx_buf->ep, &new_iov, - rx_buf->recv_entry->rxm_iov.desc, 1, - rx_buf->recv_entry->addr, - rx_buf->recv_entry->tag, - rx_buf->recv_entry->ignore, - rx_buf->recv_entry->context, - rx_buf->recv_entry->flags); - - rx_buf->recv_entry->flags &= ~FI_MULTI_RECV; - - dlist_insert_head(&recv_entry->entry, &rx_buf->ep->recv_queue.recv_list); -} - -static ssize_t -rxm_match_rx_buf(struct rxm_rx_buf *rx_buf, - struct rxm_recv_queue *recv_queue, - struct rxm_recv_match_attr *match_attr) -{ - struct dlist_entry *entry; - - entry = dlist_remove_first_match(&recv_queue->recv_list, - recv_queue->match_recv, match_attr); - if (entry) { - rx_buf->recv_entry = container_of(entry, struct rxm_recv_entry, entry); - - if (rx_buf->recv_entry->flags & FI_MULTI_RECV) - rxm_adjust_multi_recv(rx_buf); - - return rxm_handle_rx_buf(rx_buf); + rx_entry->peer_context = rx_buf; + rx_buf->peer_entry = rx_entry; + if (rx_buf->pkt.hdr.flags & FI_REMOTE_CQ_DATA) { + rx_entry->flags |= FI_REMOTE_CQ_DATA; + rx_entry->cq_data = rx_buf->pkt.hdr.data; } - - RXM_DBG_ADDR_TAG(FI_LOG_CQ, "No matching recv found for incoming msg", - match_attr->addr, match_attr->tag); - FI_DBG(&rxm_prov, FI_LOG_CQ, "Enqueueing msg to unexpected msg queue\n"); - rx_buf->unexp_msg.addr = match_attr->addr; - rx_buf->unexp_msg.tag = match_attr->tag; - - dlist_insert_tail(&rx_buf->unexp_msg.entry, - &recv_queue->unexp_msg_list); + if (rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) + rxm_init_sar_proto(rx_buf); rxm_replace_rx_buf(rx_buf); - return 0; } static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf) { - struct rxm_recv_match_attr match_attr = { - .addr = FI_ADDR_UNSPEC, - }; + struct fid_peer_srx *srx = rx_buf->ep->srx; + struct fi_peer_rx_entry *rx_entry; + struct fi_peer_match_attr match = {0}; + int ret; if (rx_buf->ep->rxm_info->caps & (FI_SOURCE | FI_DIRECTED_RECV)) { if (rx_buf->ep->msg_srx) @@ -789,7 +779,9 @@ static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf) (int) rx_buf->pkt.ctrl_hdr.conn_id); if (!rx_buf->conn) return -FI_EOTHER; - match_attr.addr = rx_buf->conn->peer->fi_addr; + match.addr = rx_buf->conn->peer->fi_addr; + } else { + match.addr = FI_ADDR_UNSPEC; } if (rx_buf->ep->rxm_info->mode & OFI_BUFFERED_RECV) { @@ -799,33 +791,52 @@ static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf) switch(rx_buf->pkt.hdr.op) { case ofi_op_msg: + match.msg_size = rx_buf->pkt.hdr.size; FI_DBG(&rxm_prov, FI_LOG_CQ, "Got MSG op\n"); - return rxm_match_rx_buf(rx_buf, &rx_buf->ep->recv_queue, - &match_attr); + ret = srx->owner_ops->get_msg(srx, &match, &rx_entry); + if (ret == -FI_ENOENT) { + rxm_entry_prep_for_queue(rx_entry, rx_buf); + return srx->owner_ops->queue_msg(rx_entry); + } + rx_entry->peer_context = NULL; + break; case ofi_op_tagged: + match.tag = rx_buf->pkt.hdr.tag; + match.msg_size = rx_buf->pkt.hdr.size; FI_DBG(&rxm_prov, FI_LOG_CQ, "Got TAGGED op\n"); - match_attr.tag = rx_buf->pkt.hdr.tag; - return rxm_match_rx_buf(rx_buf, &rx_buf->ep->trecv_queue, - &match_attr); + ret = srx->owner_ops->get_tag(srx, &match, &rx_entry); + if (ret == -FI_ENOENT) { + rxm_entry_prep_for_queue(rx_entry, 
rx_buf); + return srx->owner_ops->queue_tag(rx_entry); + } + rx_entry->peer_context = NULL; + break; default: FI_WARN(&rxm_prov, FI_LOG_CQ, "Unknown op!\n"); assert(0); return -FI_EINVAL; } + rx_buf->peer_entry = rx_entry; + + if (rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) + rxm_init_sar_proto(rx_buf); + + return rxm_handle_rx_buf(rx_buf); } static int rxm_sar_match_msg_id(struct dlist_entry *item, const void *arg) { uint64_t msg_id = *((uint64_t *) arg); - struct rxm_recv_entry *recv_entry; + struct rxm_proto_info *proto_info; - recv_entry = container_of(item, struct rxm_recv_entry, sar.entry); - return (msg_id == recv_entry->sar.msg_id); + proto_info = container_of(item, struct rxm_proto_info, sar.entry); + return (msg_id == proto_info->sar.msg_id); } static ssize_t rxm_sar_handle_segment(struct rxm_rx_buf *rx_buf) { struct dlist_entry *sar_entry; + struct rxm_proto_info *proto_info; rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, (int) rx_buf->pkt.ctrl_hdr.conn_id); @@ -841,8 +852,9 @@ static ssize_t rxm_sar_handle_segment(struct rxm_rx_buf *rx_buf) if (!sar_entry) return rxm_handle_recv_comp(rx_buf); - rx_buf->recv_entry = container_of(sar_entry, struct rxm_recv_entry, - sar.entry); + proto_info = container_of(sar_entry, struct rxm_proto_info, sar.entry); + rx_buf->peer_entry = proto_info->sar.rx_entry; + rx_buf->proto_info = proto_info; rxm_handle_seg_data(rx_buf); return 0; } @@ -860,8 +872,15 @@ static void rxm_rndv_send_rd_done(struct rxm_rx_buf *rx_buf) ret = -FI_ENOMEM; goto err; } + rx_buf->proto_info = ofi_buf_alloc(rx_buf->ep->proto_info_pool); + if (!rx_buf->proto_info) { + FI_WARN(&rxm_prov, FI_LOG_CQ, + "Failed to allocate proto info buf\n"); + assert(0); + return; + } - rx_buf->recv_entry->rndv.tx_buf = buf; + rx_buf->proto_info->rndv.tx_buf = buf; buf->pkt.ctrl_hdr.type = rxm_ctrl_rndv_rd_done; buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->remote_index; @@ -888,8 +907,9 @@ static void rxm_rndv_send_rd_done(struct rxm_rx_buf *rx_buf) return; free: + rx_buf->proto_info->rndv.tx_buf = NULL; + ofi_buf_free(rx_buf->proto_info); ofi_buf_free(buf); - rx_buf->recv_entry->rndv.tx_buf = NULL; err: FI_WARN(&rxm_prov, FI_LOG_CQ, "unable to allocate/send rd rndv ack: %s\n", @@ -968,14 +988,22 @@ ssize_t rxm_rndv_send_wr_data(struct rxm_rx_buf *rx_buf) goto err; } - rx_buf->recv_entry->rndv.tx_buf = buf; + rx_buf->proto_info = ofi_buf_alloc(rx_buf->ep->proto_info_pool); + if (!rx_buf->proto_info) { + FI_WARN(&rxm_prov, FI_LOG_CQ, + "Failed to allocate proto info buf\n"); + return -FI_ENOMEM; + } + + rx_buf->proto_info->rndv.tx_buf = buf; + buf->pkt.ctrl_hdr.type = rxm_ctrl_rndv_wr_data; buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->remote_index; buf->pkt.ctrl_hdr.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; rxm_rndv_hdr_init(rx_buf->ep, buf->pkt.data, - rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, rx_buf->mr); + rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, rx_buf->mr); ret = fi_send(rx_buf->conn->msg_ep, &buf->pkt, sizeof(buf->pkt) + sizeof(struct rxm_rndv_hdr), buf->hdr.desc, 0, rx_buf); @@ -999,8 +1027,9 @@ ssize_t rxm_rndv_send_wr_data(struct rxm_rx_buf *rx_buf) return 0; free: + rx_buf->proto_info->rndv.tx_buf = NULL; + ofi_buf_free(rx_buf->proto_info); ofi_buf_free(buf); - rx_buf->recv_entry->rndv.tx_buf = NULL; err: FI_WARN(&rxm_prov, FI_LOG_CQ, "unable to allocate/send wr rndv ready: %s\n", @@ -1638,7 +1667,7 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep) * the event yet. 
*/ rx_buf = (struct rxm_rx_buf *) err_entry.op_context; - if (!rx_buf->recv_entry) { + if (!rx_buf->peer_entry) { ofi_buf_free((struct rxm_rx_buf *)err_entry.op_context); return; } @@ -1647,9 +1676,9 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep) case RXM_RNDV_WRITE_DATA_SENT: /* BUG: should fail initial send */ case RXM_RNDV_READ: rx_buf = (struct rxm_rx_buf *) err_entry.op_context; - assert(rx_buf->recv_entry); - err_entry.op_context = rx_buf->recv_entry->context; - err_entry.flags = rx_buf->recv_entry->comp_flags; + assert(rx_buf->peer_entry); + err_entry.op_context = rx_buf->peer_entry->context; + err_entry.flags = rx_buf->peer_entry->flags; cq = rx_buf->ep->util_ep.rx_cq; cntr = rx_buf->ep->util_ep.cntrs[CNTR_RX]; @@ -1780,7 +1809,8 @@ int rxm_post_recv(struct rxm_rx_buf *rx_buf) if (rx_buf->ep->msg_srx) rx_buf->conn = NULL; rx_buf->hdr.state = RXM_RX; - rx_buf->recv_entry = NULL; + rx_buf->peer_entry = NULL; + rx_buf->proto_info = NULL; domain = container_of(rx_buf->ep->util_ep.domain, struct rxm_domain, util_domain); @@ -1858,7 +1888,7 @@ void rxm_ep_do_progress(struct util_ep *util_ep) rxm_conn_progress(rxm_ep); } } else { - rxm_conn_progress(rxm_ep); + rxm_conn_progress(rxm_ep); } } } while ((ret > 0) && (comp_read < rxm_ep->comp_per_progress)); @@ -1975,6 +2005,9 @@ int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, if (ret) goto err1; + if (attr->flags & FI_PEER) + goto out; + rxm_domain = container_of(domain, struct rxm_domain, util_domain.domain_fid); @@ -1996,11 +2029,12 @@ int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, if (ret) goto err2; } + rxm_cq->util_cq.cq_fid.ops = &rxm_cq_ops; +out: *cq_fid = &rxm_cq->util_cq.cq_fid; /* Override util_cq_fi_ops */ (*cq_fid)->fid.ops = &rxm_cq_fi_ops; - (*cq_fid)->ops = &rxm_cq_ops; return 0; err2: diff --git a/prov/rxm/src/rxm_domain.c b/prov/rxm/src/rxm_domain.c index 055fca16bea..9fcadf56763 100644 --- a/prov/rxm/src/rxm_domain.c +++ b/prov/rxm/src/rxm_domain.c @@ -221,6 +221,25 @@ static struct fi_ops_av_owner rxm_av_owner_ops = { .ep_addr = rxm_peer_av_ep_addr, }; +static fi_addr_t rxm_get_addr(struct fi_peer_rx_entry *rx_entry) +{ + struct rxm_rx_buf *rx_buf = rx_entry->peer_context; + + return rx_buf->conn->peer->fi_addr; +} + +static void rxm_foreach_ep(struct util_av *av, struct util_ep *ep) +{ + struct rxm_ep *rxm_ep; + struct fid_peer_srx *peer_srx; + + rxm_ep = container_of(ep, struct rxm_ep, util_ep); + peer_srx = container_of(rxm_ep->srx, struct fid_peer_srx, ep_fid); + if (peer_srx) + peer_srx->owner_ops->foreach_unspec_addr(peer_srx, &rxm_get_addr); +} + + static int rxm_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, struct fid_av **fid_av, void *context) @@ -236,7 +255,8 @@ rxm_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, ret = rxm_util_av_open(domain_fid, attr, &fid_av_new, context, sizeof(struct rxm_conn), - ofi_av_remove_cleanup ? rxm_av_remove_handler : NULL); + ofi_av_remove_cleanup ? 
rxm_av_remove_handler : NULL, + &rxm_foreach_ep); if (ret) return ret; @@ -346,7 +366,7 @@ static struct fi_ops_domain rxm_domain_ops = { .cntr_open = rxm_cntr_open, .poll_open = fi_poll_create, .stx_ctx = fi_no_stx_context, - .srx_ctx = fi_no_srx_context, + .srx_ctx = rxm_srx_context, .query_atomic = rxm_ep_query_atomic, .query_collective = rxm_query_collective, }; diff --git a/prov/rxm/src/rxm_ep.c b/prov/rxm/src/rxm_ep.c index 69a88e2caaf..b967643c0c5 100644 --- a/prov/rxm/src/rxm_ep.c +++ b/prov/rxm/src/rxm_ep.c @@ -42,79 +42,6 @@ #include "rxm.h" -static int rxm_match_noop(struct dlist_entry *item, const void *arg) -{ - OFI_UNUSED(item); - OFI_UNUSED(arg); - return 1; -} - -static int rxm_match_recv_entry(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_recv_entry *recv_entry = - container_of(item, struct rxm_recv_entry, entry); - return ofi_match_addr(recv_entry->addr, attr->addr); -} - -static int rxm_match_recv_entry_tag(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_recv_entry *recv_entry = - container_of(item, struct rxm_recv_entry, entry); - return ofi_match_tag(recv_entry->tag, recv_entry->ignore, attr->tag); -} - -static int rxm_match_recv_entry_tag_addr(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_recv_entry *recv_entry = - container_of(item, struct rxm_recv_entry, entry); - return ofi_match_addr(recv_entry->addr, attr->addr) && - ofi_match_tag(recv_entry->tag, recv_entry->ignore, attr->tag); -} - -static int rxm_match_recv_entry_context(struct dlist_entry *item, const void *context) -{ - struct rxm_recv_entry *recv_entry = - container_of(item, struct rxm_recv_entry, entry); - return recv_entry->context == context; -} - -static fi_addr_t rxm_get_unexp_addr(struct rxm_unexp_msg *unexp_msg) -{ - struct rxm_rx_buf *rx_buf; - - rx_buf = container_of(unexp_msg, struct rxm_rx_buf, unexp_msg); - return (unexp_msg->addr != FI_ADDR_UNSPEC) ? 
- unexp_msg->addr : rx_buf->conn->peer->fi_addr; -} - -static int rxm_match_unexp_msg(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *)arg; - struct rxm_unexp_msg *unexp_msg = - container_of(item, struct rxm_unexp_msg, entry); - return ofi_match_addr(attr->addr, rxm_get_unexp_addr(unexp_msg)); -} - -static int rxm_match_unexp_msg_tag(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_unexp_msg *unexp_msg = - container_of(item, struct rxm_unexp_msg, entry); - return ofi_match_tag(attr->tag, attr->ignore, unexp_msg->tag); -} - -static int rxm_match_unexp_msg_tag_addr(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_unexp_msg *unexp_msg = - container_of(item, struct rxm_unexp_msg, entry); - return ofi_match_addr(attr->addr, rxm_get_unexp_addr(unexp_msg)) && - ofi_match_tag(attr->tag, attr->ignore, unexp_msg->tag); -} - static int rxm_buf_reg(struct ofi_bufpool_region *region) { struct rxm_ep *rxm_ep = region->pool->attr.context; @@ -158,6 +85,7 @@ static void rxm_init_rx_buf(struct ofi_bufpool_region *region, void *buf) fi_mr_desc((struct fid_mr *) region->context) : NULL; rx_buf->ep = ep; rx_buf->data = &rx_buf->pkt.data; + dlist_init(&rx_buf->unexp_entry); } static void rxm_init_tx_buf(struct ofi_bufpool_region *region, void *buf) @@ -186,69 +114,6 @@ static void rxm_buf_close(struct ofi_bufpool_region *region) } } -static void rxm_recv_entry_init(struct rxm_recv_entry *entry, void *arg) -{ - struct rxm_recv_queue *recv_queue = arg; - - assert(recv_queue->type != RXM_RECV_QUEUE_UNSPEC); - - entry->recv_queue = recv_queue; - entry->sar.msg_id = RXM_SAR_RX_INIT; - entry->sar.total_recv_len = 0; - /* set it to NULL to differentiate between regular ACKs and those - * sent with FI_INJECT */ - entry->rndv.tx_buf = NULL; - entry->comp_flags = FI_RECV; - - if (recv_queue->type == RXM_RECV_QUEUE_MSG) - entry->comp_flags |= FI_MSG; - else - entry->comp_flags |= FI_TAGGED; -} - -static int rxm_recv_queue_init(struct rxm_ep *rxm_ep, struct rxm_recv_queue *recv_queue, - size_t size, enum rxm_recv_queue_type type) -{ - recv_queue->rxm_ep = rxm_ep; - recv_queue->type = type; - recv_queue->fs = rxm_recv_fs_create(size, rxm_recv_entry_init, - recv_queue); - if (!recv_queue->fs) - return -FI_ENOMEM; - - dlist_init(&recv_queue->recv_list); - dlist_init(&recv_queue->unexp_msg_list); - if (type == RXM_RECV_QUEUE_MSG) { - if (rxm_ep->rxm_info->caps & FI_DIRECTED_RECV) { - recv_queue->match_recv = rxm_match_recv_entry; - recv_queue->match_unexp = rxm_match_unexp_msg; - } else { - recv_queue->match_recv = rxm_match_noop; - recv_queue->match_unexp = rxm_match_noop; - } - } else { - if (rxm_ep->rxm_info->caps & FI_DIRECTED_RECV) { - recv_queue->match_recv = rxm_match_recv_entry_tag_addr; - recv_queue->match_unexp = rxm_match_unexp_msg_tag_addr; - } else { - recv_queue->match_recv = rxm_match_recv_entry_tag; - recv_queue->match_unexp = rxm_match_unexp_msg_tag; - } - } - - return 0; -} - -static void rxm_recv_queue_close(struct rxm_recv_queue *recv_queue) -{ - /* It indicates that the recv_queue were allocated */ - if (recv_queue->fs) { - rxm_recv_fs_free(recv_queue->fs); - recv_queue->fs = NULL; - } - // TODO cleanup recv_list and unexp msg list -} - static int rxm_ep_create_pools(struct rxm_ep *rxm_ep) { struct ofi_bufpool_attr attr = {0}; @@ -287,8 +152,18 @@ static int 
rxm_ep_create_pools(struct rxm_ep *rxm_ep) "Unable to create peer xfer context pool\n"); goto free_tx_pool; } - return 0; + attr.size = sizeof(struct rxm_proto_info); + attr.alloc_fn = NULL; + attr.free_fn = NULL; + attr.init_fn = NULL; + ret = ofi_bufpool_create_attr(&attr, &rxm_ep->proto_info_pool); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "Unable to create proto info pool\n"); + goto free_tx_pool; + } + return 0; free_tx_pool: ofi_bufpool_destroy(rxm_ep->tx_pool); @@ -298,62 +173,13 @@ static int rxm_ep_create_pools(struct rxm_ep *rxm_ep) return ret; } -static int rxm_multi_recv_pool_init(struct rxm_ep *rxm_ep) -{ - struct ofi_bufpool_attr attr = { - .size = sizeof(struct rxm_recv_entry), - .alignment = 16, - .max_cnt = 0, - .chunk_cnt = 16, - .alloc_fn = NULL, - .init_fn = NULL, - .context = rxm_ep, - .flags = OFI_BUFPOOL_NO_TRACK, - }; - - return ofi_bufpool_create_attr(&attr, &rxm_ep->multi_recv_pool); -} - -static int rxm_ep_rx_queue_init(struct rxm_ep *rxm_ep) -{ - int ret; - - ret = rxm_recv_queue_init(rxm_ep, &rxm_ep->recv_queue, - rxm_ep->rxm_info->rx_attr->size, - RXM_RECV_QUEUE_MSG); - if (ret) - return ret; - - ret = rxm_recv_queue_init(rxm_ep, &rxm_ep->trecv_queue, - rxm_ep->rxm_info->rx_attr->size, - RXM_RECV_QUEUE_TAGGED); - if (ret) - goto err_recv_tag; - - ret = rxm_multi_recv_pool_init(rxm_ep); - if (ret) - goto err_multi; - - return FI_SUCCESS; - -err_multi: - rxm_recv_queue_close(&rxm_ep->trecv_queue); -err_recv_tag: - rxm_recv_queue_close(&rxm_ep->recv_queue); - return ret; -} - /* It is safe to call this function, even if `rxm_ep_txrx_res_open` * has not yet been called */ static void rxm_ep_txrx_res_close(struct rxm_ep *ep) { - rxm_recv_queue_close(&ep->trecv_queue); - rxm_recv_queue_close(&ep->recv_queue); + if (ep->srx && ep->util_ep.ep_fid.msg != &rxm_no_recv_msg_ops) + (void) util_srx_close(&ep->srx->ep_fid.fid); - if (ep->multi_recv_pool) { - ofi_bufpool_destroy(ep->multi_recv_pool); - ep->multi_recv_pool = NULL; - } if (ep->rx_pool) { ofi_bufpool_destroy(ep->rx_pool); ep->rx_pool = NULL; @@ -362,6 +188,10 @@ static void rxm_ep_txrx_res_close(struct rxm_ep *ep) ofi_bufpool_destroy(ep->tx_pool); ep->tx_pool = NULL; } + if (ep->proto_info_pool) { + ofi_bufpool_destroy(ep->proto_info_pool); + ep->proto_info_pool = NULL; + } if (ep->coll_pool) { ofi_bufpool_destroy(ep->coll_pool); ep->coll_pool = NULL; @@ -420,53 +250,13 @@ static struct rxm_eager_ops coll_eager_ops = { .handle_rx = rxm_handle_coll_eager, }; -static bool rxm_ep_cancel_recv(struct rxm_ep *rxm_ep, - struct rxm_recv_queue *recv_queue, void *context) -{ - struct fi_cq_err_entry err_entry; - struct rxm_recv_entry *recv_entry; - struct dlist_entry *entry; - int ret; - - ofi_genlock_lock(&rxm_ep->util_ep.lock); - entry = dlist_remove_first_match(&recv_queue->recv_list, - rxm_match_recv_entry_context, - context); - if (!entry) - goto unlock; - - recv_entry = container_of(entry, struct rxm_recv_entry, entry); - memset(&err_entry, 0, sizeof(err_entry)); - err_entry.op_context = recv_entry->context; - err_entry.flags |= recv_entry->comp_flags; - err_entry.tag = recv_entry->tag; - err_entry.err = FI_ECANCELED; - err_entry.prov_errno = -FI_ECANCELED; - rxm_recv_entry_release(recv_entry); - ret = ofi_cq_write_error(rxm_ep->util_ep.rx_cq, &err_entry); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_CQ, "Error writing to CQ\n"); - assert(0); - } - -unlock: - ofi_genlock_unlock(&rxm_ep->util_ep.lock); - return entry != NULL; -} - static ssize_t rxm_ep_cancel(fid_t fid_ep, void *context) { struct rxm_ep *ep; ep 
= container_of(fid_ep, struct rxm_ep, util_ep.ep_fid); - if (rxm_passthru_info(ep->rxm_info)) - return fi_cancel(&ep->msg_srx->fid, context); - - if (!rxm_ep_cancel_recv(ep, &ep->trecv_queue, context)) - rxm_ep_cancel_recv(ep, &ep->recv_queue, context); - - return 0; + return ep->srx->ep_fid.ops->cancel(&ep->srx->ep_fid.fid, context); } static int rxm_ep_getopt(fid_t fid, int level, int optname, void *optval, @@ -480,10 +270,8 @@ static int rxm_ep_getopt(fid_t fid, int level, int optname, void *optval, switch (optname) { case FI_OPT_MIN_MULTI_RECV: - assert(sizeof(rxm_ep->min_multi_recv_size) == sizeof(size_t)); - *(size_t *)optval = rxm_ep->min_multi_recv_size; - *optlen = sizeof(size_t); - break; + return rxm_ep->srx->ep_fid.ops->getopt(&rxm_ep->srx->ep_fid.fid, + level, optname, optval, optlen); case FI_OPT_BUFFERED_MIN: assert(sizeof(rxm_ep->buffered_min) == sizeof(size_t)); *(size_t *)optval = rxm_ep->buffered_min; @@ -507,11 +295,8 @@ static int rxm_ep_setopt(fid_t fid, int level, int optname, switch (optname) { case FI_OPT_MIN_MULTI_RECV: - rxm_ep->min_multi_recv_size = *(size_t *)optval; - FI_INFO(&rxm_prov, FI_LOG_CORE, - "FI_OPT_MIN_MULTI_RECV set to %zu\n", - rxm_ep->min_multi_recv_size); - break; + return rxm_ep->srx->ep_fid.ops->setopt(&rxm_ep->srx->ep_fid.fid, + level, optname, optval, optlen); case FI_OPT_BUFFERED_MIN: if (rxm_ep->rx_pool) { FI_WARN(&rxm_prov, FI_LOG_EP_DATA, @@ -564,99 +349,6 @@ static struct fi_ops_ep rxm_ops_ep = { .tx_size_left = fi_no_tx_size_left, }; - -/* Caller must hold recv_queue->lock */ -struct rxm_rx_buf * -rxm_get_unexp_msg(struct rxm_recv_queue *recv_queue, fi_addr_t addr, - uint64_t tag, uint64_t ignore) -{ - struct rxm_recv_match_attr match_attr; - struct dlist_entry *entry; - - if (dlist_empty(&recv_queue->unexp_msg_list)) - return NULL; - - match_attr.addr = addr; - match_attr.tag = tag; - match_attr.ignore = ignore; - - entry = dlist_find_first_match(&recv_queue->unexp_msg_list, - recv_queue->match_unexp, &match_attr); - if (!entry) - return NULL; - - RXM_DBG_ADDR_TAG(FI_LOG_EP_DATA, "Match for posted recv found in unexp" - " msg list\n", match_attr.addr, match_attr.tag); - - return container_of(entry, struct rxm_rx_buf, unexp_msg.entry); -} - -static void rxm_recv_entry_init_common(struct rxm_recv_entry *recv_entry, - const struct iovec *iov, void **desc, size_t count, - fi_addr_t src_addr, uint64_t tag, uint64_t ignore, - void *context, uint64_t flags, - struct rxm_recv_queue *recv_queue) -{ - size_t i; - - assert(!recv_entry->rndv.tx_buf); - recv_entry->rxm_iov.count = (uint8_t) count; - recv_entry->addr = src_addr; - recv_entry->context = context; - recv_entry->flags = flags; - recv_entry->ignore = ignore; - recv_entry->tag = tag; - - recv_entry->sar.msg_id = RXM_SAR_RX_INIT; - recv_entry->sar.total_recv_len = 0; - recv_entry->total_len = 0; - - for (i = 0; i < count; i++) { - recv_entry->rxm_iov.iov[i] = iov[i]; - recv_entry->total_len += iov[i].iov_len; - if (desc && desc[i]) - recv_entry->rxm_iov.desc[i] = desc[i]; - else - recv_entry->rxm_iov.desc[i] = NULL; - } -} - -struct rxm_recv_entry * -rxm_recv_entry_get(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - uint64_t tag, uint64_t ignore, void *context, - uint64_t flags, struct rxm_recv_queue *recv_queue) -{ - struct rxm_recv_entry *recv_entry; - - if (ofi_freestack_isempty(recv_queue->fs)) - return NULL; - - recv_entry = ofi_freestack_pop(recv_queue->fs); - - rxm_recv_entry_init_common(recv_entry, iov, desc, 
count, src_addr, tag, - ignore, context, flags, recv_queue); - - return recv_entry; -} - -struct rxm_recv_entry * -rxm_multi_recv_entry_get(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - uint64_t tag, uint64_t ignore, void *context, - uint64_t flags) -{ - struct rxm_recv_entry *recv_entry; - - recv_entry = ofi_buf_alloc(rxm_ep->multi_recv_pool); - - rxm_recv_entry_init_common(recv_entry, iov, desc, count, src_addr, tag, - ignore, context, flags, NULL); - - recv_entry->comp_flags = FI_MSG | FI_RECV; - return recv_entry; -} - struct rxm_tx_buf *rxm_get_tx_buf(struct rxm_ep *ep) { struct rxm_tx_buf *buf; @@ -820,6 +512,7 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn) { struct rxm_deferred_tx_entry *def_tx_entry; + struct rxm_proto_info *proto_info; struct iovec iov; struct fi_msg msg; ssize_t ret = 0; @@ -832,12 +525,11 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, struct rxm_deferred_tx_entry, entry); switch (def_tx_entry->type) { case RXM_DEFERRED_TX_RNDV_ACK: + proto_info = def_tx_entry->rndv_ack.rx_buf->proto_info; ret = fi_send(def_tx_entry->rxm_conn->msg_ep, - &def_tx_entry->rndv_ack.rx_buf-> - recv_entry->rndv.tx_buf->pkt, + &proto_info->rndv.tx_buf->pkt, def_tx_entry->rndv_ack.pkt_size, - def_tx_entry->rndv_ack.rx_buf->recv_entry-> - rndv.tx_buf->hdr.desc, + proto_info->rndv.tx_buf->hdr.desc, 0, def_tx_entry->rndv_ack.rx_buf); if (ret) { if (ret == -FI_EAGAIN) @@ -845,11 +537,10 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, rxm_cq_write_rx_error( def_tx_entry->rxm_ep, ofi_op_msg, def_tx_entry->rndv_ack.rx_buf-> - recv_entry->context, (int) ret); + peer_entry->context, (int) ret); } - if (def_tx_entry->rndv_ack.rx_buf->recv_entry->rndv - .tx_buf->pkt.ctrl_hdr - .type == rxm_ctrl_rndv_rd_done) + if (proto_info->rndv.tx_buf->pkt.ctrl_hdr.type == + rxm_ctrl_rndv_rd_done) RXM_UPDATE_STATE(FI_LOG_EP_DATA, def_tx_entry->rndv_ack.rx_buf, RXM_RNDV_READ_DONE_SENT); @@ -891,7 +582,7 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, rxm_cq_write_rx_error( def_tx_entry->rxm_ep, ofi_op_msg, def_tx_entry->rndv_read.rx_buf-> - recv_entry->context, (int) ret); + peer_entry->context, (int) ret); } break; case RXM_DEFERRED_TX_RNDV_WRITE: @@ -944,7 +635,7 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, def_tx_entry->rxm_ep, ofi_op_msg, def_tx_entry->rndv_read.rx_buf-> - recv_entry->context, + peer_entry->context, (int) ret); } return; @@ -1451,9 +1142,6 @@ static void rxm_ep_settings_init(struct rxm_ep *rxm_ep) sizeof(struct rxm_rndv_hdr))), rxm_buffer_size); - assert(!rxm_ep->min_multi_recv_size); - rxm_ep->min_multi_recv_size = rxm_buffer_size; - assert(!rxm_ep->buffered_limit); rxm_ep->buffered_limit = rxm_buffer_size; @@ -1465,13 +1153,11 @@ static void rxm_ep_settings_init(struct rxm_ep *rxm_ep) "\t\t MR local: MSG - %d, RxM - %d\n" "\t\t Completions per progress: MSG - %zu\n" "\t\t Buffered min: %zu\n" - "\t\t Min multi recv size: %zu\n" "\t\t inject size: %zu\n" "\t\t Protocol limits: Eager: %zu, SAR: %zu\n", rxm_ep->msg_mr_local, rxm_ep->rdm_mr_local, rxm_ep->comp_per_progress, rxm_ep->buffered_min, - rxm_ep->min_multi_recv_size, rxm_ep->inject_limit, - rxm_ep->eager_limit, rxm_ep->sar_limit); + rxm_ep->inject_limit, rxm_ep->eager_limit, rxm_ep->sar_limit); } static int rxm_ep_txrx_res_open(struct rxm_ep *rxm_ep) @@ -1484,19 +1170,7 @@ static int rxm_ep_txrx_res_open(struct rxm_ep *rxm_ep) dlist_init(&rxm_ep->deferred_queue); - ret = 
rxm_ep_rx_queue_init(rxm_ep); - if (ret) - goto err; - return FI_SUCCESS; -err: - ofi_bufpool_destroy(rxm_ep->coll_pool); - ofi_bufpool_destroy(rxm_ep->rx_pool); - ofi_bufpool_destroy(rxm_ep->tx_pool); - rxm_ep->coll_pool = NULL; - rxm_ep->rx_pool = NULL; - rxm_ep->tx_pool = NULL; - return ret; } static int rxm_ep_enable_check(struct rxm_ep *rxm_ep) @@ -1526,9 +1200,129 @@ static int rxm_ep_enable_check(struct rxm_ep *rxm_ep) return 0; } +static int rxm_unexp_start(struct fi_peer_rx_entry *rx_entry) +{ + struct rxm_rx_buf *rx_buf = rx_entry->peer_context; + + return rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg ? + rxm_handle_unexp_sar(rx_entry) : + rxm_handle_rx_buf(rx_buf); +} + +static int rxm_discard(struct fi_peer_rx_entry *rx_entry) +{ + struct rxm_rx_buf *rx_buf, *seg_rx; + + rx_buf = rx_entry->peer_context; + + if (rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) { + while (!dlist_empty(&rx_buf->proto_info->sar.pkt_list)) { + dlist_pop_front(&rx_buf->proto_info->sar.pkt_list, + struct rxm_rx_buf, seg_rx, unexp_entry); + rxm_free_rx_buf(seg_rx); + } + ofi_buf_free(rx_buf->proto_info); + } + + rxm_free_rx_buf(rx_buf); + return FI_SUCCESS; +} + +struct fi_ops_srx_peer rxm_srx_peer_ops = { + .size = sizeof(struct fi_ops_srx_peer), + .start_msg = rxm_unexp_start, + .start_tag = rxm_unexp_start, + .discard_msg = rxm_discard, + .discard_tag = rxm_discard, +}; + +static int rxm_srx_close(struct fid *fid) +{ + struct rxm_domain *domain = container_of(fid, struct rxm_domain, + rx_ep.fid); + + ofi_atomic_dec32(&domain->util_domain.ref); + + return FI_SUCCESS; +} + +static struct fi_ops rxm_srx_fi_ops = { + .size = sizeof(struct fi_ops), + .close = rxm_srx_close, + .bind = fi_no_bind, + .control = fi_no_control, + .ops_open = fi_no_ops_open, +}; + +static struct fi_ops_msg rxm_srx_msg_ops = { + .size = sizeof(struct fi_ops_msg), + .recv = fi_no_msg_recv, + .recvv = fi_no_msg_recvv, + .recvmsg = fi_no_msg_recvmsg, + .send = fi_no_msg_send, + .sendv = fi_no_msg_sendv, + .sendmsg = fi_no_msg_sendmsg, + .inject = fi_no_msg_inject, + .senddata = fi_no_msg_senddata, + .injectdata = fi_no_msg_injectdata, +}; + +static struct fi_ops_tagged rxm_srx_tagged_ops = { + .size = sizeof(struct fi_ops_tagged), + .recv = fi_no_tagged_recv, + .recvv = fi_no_tagged_recvv, + .recvmsg = fi_no_tagged_recvmsg, + .send = fi_no_tagged_send, + .sendv = fi_no_tagged_sendv, + .sendmsg = fi_no_tagged_sendmsg, + .inject = fi_no_tagged_inject, + .senddata = fi_no_tagged_senddata, + .injectdata = fi_no_tagged_injectdata, +}; + +int rxm_srx_context(struct fid_domain *domain, struct fi_rx_attr *attr, + struct fid_ep **rx_ep, void *context) +{ + struct rxm_domain *rxm_domain; + + if (!(attr->op_flags & FI_PEER)) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "shared srx only supported with FI_PEER flag\n"); + return -FI_EINVAL; + } + + rxm_domain = container_of(domain, struct rxm_domain, + util_domain.domain_fid); + + if (rxm_domain->srx) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "Peer SRX context already imported\n"); + return -FI_EINVAL; + } + + rxm_domain->srx = ((struct fi_peer_srx_context *) + (context))->srx; + rxm_domain->srx->peer_ops = &rxm_srx_peer_ops; + rxm_domain->rx_ep.msg = &rxm_srx_msg_ops; + rxm_domain->rx_ep.tagged = &rxm_srx_tagged_ops; + rxm_domain->rx_ep.fid.ops = &rxm_srx_fi_ops; + rxm_domain->rx_ep.fid.fclass = FI_CLASS_SRX_CTX; + *rx_ep = &rxm_domain->rx_ep; + ofi_atomic_inc32(&rxm_domain->util_domain.ref); + + return FI_SUCCESS; +} + +static void rxm_update(struct util_srx_ctx *srx, struct util_rx_entry *rx_entry) +{ + //no 
update needed +} + static int rxm_ep_ctrl(struct fid *fid, int command, void *arg) { struct rxm_ep *ep; + struct rxm_domain *domain; + struct fid_ep *srx; int ret; ep = container_of(fid, struct rxm_ep, util_ep.ep_fid.fid); @@ -1564,6 +1358,32 @@ static int rxm_ep_ctrl(struct fid *fid, int command, void *arg) if (ret) return ret; + if (!ep->srx) { + domain = container_of(ep->util_ep.domain, + struct rxm_domain, + util_domain.domain_fid); + ret = util_ep_srx_context(&domain->util_domain, + ep->rxm_info->rx_attr->size, + RXM_IOV_LIMIT, rxm_buffer_size, + &rxm_update, &ep->util_ep.lock, + &srx); + if (ret) + return ret; + + ep->srx = container_of(srx, struct fid_peer_srx, + ep_fid.fid); + ep->srx->peer_ops = &rxm_srx_peer_ops; + + ret = util_srx_bind(&ep->srx->ep_fid.fid, + &ep->util_ep.rx_cq->cq_fid.fid, + FI_RECV); + if (ret) + return ret; + } else { + ep->util_ep.ep_fid.msg = &rxm_no_recv_msg_ops; + ep->util_ep.ep_fid.tagged = &rxm_no_recv_tagged_ops; + } + if (ep->msg_srx && !rxm_passthru_info(ep->rxm_info)) { ret = rxm_prepost_recv(ep, ep->msg_srx); if (ret) @@ -1592,10 +1412,21 @@ static int rxm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) struct rxm_av *rxm_av; struct rxm_cq *rxm_cq; struct rxm_eq *rxm_eq; - int ret, retv = 0; + int ret; rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid); + if (bfid->fclass == FI_CLASS_SRX_CTX) { + if (rxm_ep->srx) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "SRX context already bound to EP\n"); + return -FI_EINVAL; + } + rxm_ep->srx = + (container_of(bfid, struct rxm_domain, rx_ep.fid))->srx; + return FI_SUCCESS; + } + ret = ofi_ep_bind(&rxm_ep->util_ep, bfid, flags); if (ret) return ret; @@ -1608,14 +1439,14 @@ static int rxm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) &rxm_av->util_coll_av->fid, flags); if (ret) - retv = ret; + return ret; } if (rxm_ep->offload_coll_ep && rxm_av->offload_coll_av) { ret = ofi_ep_fid_bind(&rxm_ep->offload_coll_ep->fid, &rxm_av->offload_coll_av->fid, flags); if (ret) - retv = ret; + return ret; } break; @@ -1626,14 +1457,14 @@ static int rxm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) &rxm_cq->util_coll_cq->fid, flags); if (ret) - retv = ret; + return ret; } if (rxm_ep->offload_coll_ep && rxm_cq->offload_coll_cq) { ret = ofi_ep_fid_bind(&rxm_ep->offload_coll_ep->fid, &rxm_cq->offload_coll_cq->fid, flags); if (ret) - retv = ret; + return ret; } break; @@ -1644,19 +1475,18 @@ static int rxm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) &rxm_eq->util_coll_eq->fid, flags); if (ret) - retv = ret; + return ret; } if (rxm_ep->offload_coll_ep && rxm_eq->offload_coll_eq) { ret = ofi_ep_fid_bind(&rxm_ep->offload_coll_ep->fid, &rxm_eq->offload_coll_eq->fid, flags); if (ret) - retv = ret; + return ret; } - break; } - return retv; + return FI_SUCCESS; } static struct fi_ops rxm_ep_fi_ops = { diff --git a/prov/rxm/src/rxm_init.c b/prov/rxm/src/rxm_init.c index 1a76796d4e0..10a7ae535d7 100644 --- a/prov/rxm/src/rxm_init.c +++ b/prov/rxm/src/rxm_init.c @@ -262,8 +262,8 @@ int rxm_info_to_core(uint32_t version, const struct fi_info *hints, core_info->rx_attr->op_flags &= ~FI_MULTI_RECV; - core_info->domain_attr->caps &= ~(FI_AV_USER_ID); - core_info->caps &= ~(FI_AV_USER_ID); + core_info->domain_attr->caps &= ~(FI_AV_USER_ID | FI_PEER); + core_info->caps &= ~(FI_AV_USER_ID | FI_PEER); return 0; } diff --git a/prov/rxm/src/rxm_msg.c b/prov/rxm/src/rxm_msg.c index 3b9088a2858..5d48e88e53a 100644 --- a/prov/rxm/src/rxm_msg.c +++ b/prov/rxm/src/rxm_msg.c @@ 
-40,214 +40,16 @@ #include "rxm.h" - -ssize_t rxm_handle_unexp_sar(struct rxm_recv_queue *recv_queue, - struct rxm_recv_entry *recv_entry, - struct rxm_rx_buf *rx_buf) -{ - struct rxm_recv_match_attr match_attr; - struct dlist_entry *entry; - bool last; - ssize_t ret; - - ret = rxm_handle_rx_buf(rx_buf); - last = rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == RXM_SAR_SEG_LAST; - if (ret || last) - return ret; - - match_attr.addr = recv_entry->addr; - match_attr.tag = recv_entry->tag; - match_attr.ignore = recv_entry->ignore; - - dlist_foreach_container_safe(&recv_queue->unexp_msg_list, - struct rxm_rx_buf, rx_buf, - unexp_msg.entry, entry) { - if (!recv_queue->match_unexp(&rx_buf->unexp_msg.entry, - &match_attr)) - continue; - /* Handle unordered completions from MSG provider */ - if ((rx_buf->pkt.ctrl_hdr.msg_id != recv_entry->sar.msg_id) || - ((rx_buf->pkt.ctrl_hdr.type != rxm_ctrl_seg))) - continue; - - if (!rx_buf->conn) { - rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, - (int) rx_buf->pkt.ctrl_hdr.conn_id); - } - if (recv_entry->sar.conn != rx_buf->conn) - continue; - rx_buf->recv_entry = recv_entry; - dlist_remove(&rx_buf->unexp_msg.entry); - last = rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == - RXM_SAR_SEG_LAST; - ret = rxm_handle_rx_buf(rx_buf); - if (ret || last) - break; - } - return ret; -} - -/* - * We don't expect to have unexpected messages when the app is using - * multi-recv buffers. Optimize for that case. - * - * If there are unexpected messages waiting when we post a mult-recv buffer, - * we trim off the start of the buffer, treat it as a normal buffer, and pair - * it with an unexpected message. We continue doing this until either no - * unexpected messages are left or the multi-recv buffer has been consumed. - */ -static ssize_t -rxm_post_mrecv(struct rxm_ep *ep, const struct iovec *iov, - void **desc, void *context, uint64_t op_flags) -{ - struct rxm_recv_entry *recv_entry; - struct rxm_rx_buf *rx_buf; - struct iovec cur_iov = *iov; - ssize_t ret; - - do { - recv_entry = rxm_recv_entry_get(ep, &cur_iov, desc, 1, - FI_ADDR_UNSPEC, 0, 0, context, - op_flags, &ep->recv_queue); - if (!recv_entry) { - ret = -FI_ENOMEM; - break; - } - - rx_buf = rxm_get_unexp_msg(&ep->recv_queue, recv_entry->addr, 0, 0); - if (!rx_buf) { - dlist_insert_tail(&recv_entry->entry, - &ep->recv_queue.recv_list); - return 0; - } - - dlist_remove(&rx_buf->unexp_msg.entry); - rx_buf->recv_entry = recv_entry; - recv_entry->flags &= ~FI_MULTI_RECV; - recv_entry->total_len = MIN(cur_iov.iov_len, rx_buf->pkt.hdr.size); - recv_entry->rxm_iov.iov[0].iov_len = recv_entry->total_len; - - cur_iov.iov_base = (uint8_t *) cur_iov.iov_base + recv_entry->total_len; - cur_iov.iov_len -= recv_entry->total_len; - - if (rx_buf->pkt.ctrl_hdr.type != rxm_ctrl_seg) - ret = rxm_handle_rx_buf(rx_buf); - else - ret = rxm_handle_unexp_sar(&ep->recv_queue, recv_entry, - rx_buf); - - } while (!ret && cur_iov.iov_len >= ep->min_multi_recv_size); - - if ((cur_iov.iov_len < ep->min_multi_recv_size) || - (ret && cur_iov.iov_len != iov->iov_len)) { - ofi_peer_cq_write(ep->util_ep.rx_cq, context, FI_MULTI_RECV, - 0, NULL, 0, 0, FI_ADDR_NOTAVAIL); - } - - return ret; -} - -static ssize_t -rxm_recv_common(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - void *context, uint64_t op_flags) -{ - struct rxm_recv_entry *recv_entry; - struct rxm_rx_buf *rx_buf; - ssize_t ret; - - assert(rxm_ep->util_ep.rx_cq); - assert(count <= rxm_ep->rxm_info->rx_attr->iov_limit); - - 
@@ -262,8 +64,9 @@ rxm_recv(struct fid_ep *ep_fid, void *buf, size_t len,
 		.iov_len = len,
 	};
 
-	return rxm_recv_common(rxm_ep, &iov, &desc, 1, src_addr,
-			       context, rxm_ep->util_ep.rx_op_flags);
+	return util_srx_generic_recv(&rxm_ep->srx->ep_fid, &iov, &desc, 1,
+				     src_addr, context,
+				     rxm_ep->util_ep.rx_op_flags);
 }
 
 static ssize_t
@@ -273,8 +76,9 @@ rxm_recvv(struct fid_ep *ep_fid, const struct iovec *iov,
 	struct rxm_ep *rxm_ep =
 		container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid);
 
-	return rxm_recv_common(rxm_ep, iov, desc, count, src_addr,
-			       context, rxm_ep->util_ep.rx_op_flags);
+	return util_srx_generic_recv(&rxm_ep->srx->ep_fid, iov, desc, count,
+				     src_addr, context,
+				     rxm_ep->util_ep.rx_op_flags);
 }
 
 static ssize_t
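With rxm_post_mrecv() gone, FI_MULTI_RECV handling now lives in the shared util SRX, so the application-visible behavior should be unchanged. A minimal sketch of that usage, assuming ep, big_buf, and big_len are already set up:

    struct fi_context ctx;
    struct iovec iov = {
            .iov_base = big_buf,
            .iov_len = big_len,
    };
    struct fi_msg msg = {
            .msg_iov = &iov,
            .iov_count = 1,
            .addr = FI_ADDR_UNSPEC,
            .context = &ctx,
    };
    ssize_t ret;

    /* The SRX carves per-message completions out of this buffer and
     * sets FI_MULTI_RECV on the completion that releases it. */
    ret = fi_recvmsg(ep, &msg, FI_MULTI_RECV);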
@@ -661,15 +465,13 @@ rxm_send_eager(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn,
 	eager_buf->app_context = context;
 	eager_buf->flags = flags;
 
+	rxm_ep_format_tx_buf_pkt(rxm_conn, data_len, op, data, tag,
+				 flags, &eager_buf->pkt);
 	if (rxm_use_direct_send(rxm_ep, count, flags)) {
-		rxm_ep_format_tx_buf_pkt(rxm_conn, data_len, op, data, tag,
-					 flags, &eager_buf->pkt);
 		ret = rxm_direct_send(rxm_ep, rxm_conn, eager_buf,
 				      iov, desc, count);
 	} else {
-		rxm_ep_format_tx_buf_pkt(rxm_conn, data_len, op, data, tag,
-					 flags, &eager_buf->pkt);
 		ret = rxm_copy_from_hmem_iov(desc, eager_buf->pkt.data,
 					     eager_buf->pkt.hdr.size,
 					     iov, count, 0);
@@ -891,6 +693,19 @@ struct fi_ops_msg rxm_msg_ops = {
 	.injectdata = rxm_injectdata,
 };
 
+struct fi_ops_msg rxm_no_recv_msg_ops = {
+	.size = sizeof(struct fi_ops_msg),
+	.recv = fi_no_msg_recv,
+	.recvv = fi_no_msg_recvv,
+	.recvmsg = fi_no_msg_recvmsg,
+	.send = rxm_send,
+	.sendv = rxm_sendv,
+	.sendmsg = rxm_sendmsg,
+	.inject = rxm_inject,
+	.senddata = rxm_senddata,
+	.injectdata = rxm_injectdata,
+};
+
 static ssize_t
 rxm_recv_thru(struct fid_ep *ep_fid, void *buf, size_t len,
 	      void *desc, fi_addr_t src_addr, void *context)
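The rxm_no_recv_msg_ops table that rxm_ep_ctrl() installs in the external-SRX case stubs out every receive entry point, so posting a receive directly on the EP fails cleanly and receives must go through the owner's SRX. Observable behavior, as a sketch:

    /* With an external SRX bound, the EP's own recv path is disabled: */
    ssize_t rc = fi_recv(ep, buf, len, NULL, FI_ADDR_UNSPEC, &ctx);
    /* rc == -FI_ENOSYS: fi_no_msg_recv() rejects the call; the owner's
     * SRX endpoint is the only valid place to post receives. */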
diff --git a/prov/rxm/src/rxm_tagged.c b/prov/rxm/src/rxm_tagged.c
index 8f18f34b3eb..1276bac0ba3 100644
--- a/prov/rxm/src/rxm_tagged.c
+++ b/prov/rxm/src/rxm_tagged.c
@@ -43,189 +43,21 @@
 #include "rxm.h"
 
-static void
-rxm_discard_recv(struct rxm_ep *rxm_ep, struct rxm_rx_buf *rx_buf,
-		 void *context)
-{
-	RXM_DBG_ADDR_TAG(FI_LOG_EP_DATA, "Discarding message",
-			 rx_buf->unexp_msg.addr, rx_buf->unexp_msg.tag);
-
-	ofi_peer_cq_write(rxm_ep->util_ep.rx_cq, context, FI_TAGGED | FI_RECV,
-			  0, NULL, rx_buf->pkt.hdr.data,
-			  rx_buf->pkt.hdr.tag, FI_ADDR_NOTAVAIL);
-	rxm_free_rx_buf(rx_buf);
-}
-
-static void
-rxm_peek_recv(struct rxm_ep *rxm_ep, fi_addr_t addr, uint64_t tag,
-	      uint64_t ignore, void *context, uint64_t flags,
-	      struct rxm_recv_queue *recv_queue)
-{
-	struct rxm_rx_buf *rx_buf;
-	int ret;
-
-	RXM_DBG_ADDR_TAG(FI_LOG_EP_DATA, "Peeking message", addr, tag);
-
-	/* peek doesn't support peer transfer at this moment */
-	assert(!(flags & FI_PEER_TRANSFER));
-
-	rxm_ep_do_progress(&rxm_ep->util_ep);
-
-	rx_buf = rxm_get_unexp_msg(recv_queue, addr, tag, ignore);
-	if (!rx_buf) {
-		FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Message not found\n");
-		ret = ofi_peer_cq_write_error_peek(
-					rxm_ep->util_ep.rx_cq, tag, context);
-		if (ret)
-			FI_WARN(&rxm_prov, FI_LOG_CQ, "Error writing to CQ\n");
-		return;
-	}
-
-	FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Message found\n");
-
-	if (flags & FI_DISCARD) {
-		dlist_remove(&rx_buf->unexp_msg.entry);
-		rxm_discard_recv(rxm_ep, rx_buf, context);
-		return;
-	}
-
-	if (flags & FI_CLAIM) {
-		FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Marking message for Claim\n");
-		((struct fi_context *)context)->internal[0] = rx_buf;
-		dlist_remove(&rx_buf->unexp_msg.entry);
-	}
-
-	ofi_peer_cq_write(rxm_ep->util_ep.rx_cq, context, FI_TAGGED | FI_RECV,
-			  rx_buf->pkt.hdr.size, NULL, rx_buf->pkt.hdr.data,
-			  rx_buf->pkt.hdr.tag, FI_ADDR_NOTAVAIL);
-}
-
-static ssize_t
-rxm_post_trecv(struct rxm_ep *rxm_ep, const struct iovec *iov,
-	       void **desc, size_t count, fi_addr_t src_addr,
-	       uint64_t tag, uint64_t ignore, void *context, uint64_t op_flags)
-{
-	struct rxm_recv_entry *recv_entry;
-	struct rxm_rx_buf *rx_buf;
-
-	assert(count <= rxm_ep->rxm_info->rx_attr->iov_limit);
-
-	recv_entry = rxm_recv_entry_get(rxm_ep, iov, desc, count, src_addr,
-					tag, ignore, context, op_flags,
-					&rxm_ep->trecv_queue);
-	if (!recv_entry)
-		return -FI_EAGAIN;
-
-	rx_buf = rxm_get_unexp_msg(&rxm_ep->trecv_queue, recv_entry->addr,
-				   recv_entry->tag, recv_entry->ignore);
-	if (!rx_buf) {
-		dlist_insert_tail(&recv_entry->entry,
-				  &rxm_ep->trecv_queue.recv_list);
-		return FI_SUCCESS;
-	}
-
-	dlist_remove(&rx_buf->unexp_msg.entry);
-	rx_buf->recv_entry = recv_entry;
-
-	if (rx_buf->pkt.ctrl_hdr.type != rxm_ctrl_seg)
-		return rxm_handle_rx_buf(rx_buf);
-	else
-		return rxm_handle_unexp_sar(&rxm_ep->trecv_queue, recv_entry,
-					    rx_buf);
-}
-
-static ssize_t
-rxm_trecv_common(struct rxm_ep *rxm_ep, const struct iovec *iov,
-		 void **desc, size_t count, fi_addr_t src_addr,
-		 uint64_t tag, uint64_t ignore, void *context,
-		 uint64_t op_flags)
-{
-	ssize_t ret;
-
-	if (op_flags & FI_PEER_TRANSFER)
-		tag |= RXM_PEER_XFER_TAG_FLAG;
-
-	ofi_genlock_lock(&rxm_ep->util_ep.lock);
-	ret = rxm_post_trecv(rxm_ep, iov, desc, count, src_addr,
-			     tag, ignore, context, op_flags);
-	ofi_genlock_unlock(&rxm_ep->util_ep.lock);
-	return ret;
-}
-
 static ssize_t
 rxm_trecvmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *msg,
 	     uint64_t flags)
 {
-	struct rxm_ep *rxm_ep;
-	struct rxm_recv_entry *recv_entry;
-	struct fi_recv_context *recv_ctx;
-	struct rxm_rx_buf *rx_buf;
-	void *context = msg->context;
-	ssize_t ret = 0;
-
-	rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid);
-	flags |= rxm_ep->util_ep.rx_msg_flags;
-
-	if (!(flags & (FI_CLAIM | FI_PEEK)) &&
-	    !(rxm_ep->rxm_info->mode & OFI_BUFFERED_RECV)) {
-		return rxm_trecv_common(rxm_ep, msg->msg_iov, msg->desc,
-					msg->iov_count, msg->addr,
-					msg->tag, msg->ignore, context, flags);
-	}
-
-	ofi_genlock_lock(&rxm_ep->util_ep.lock);
-	if (rxm_ep->rxm_info->mode & OFI_BUFFERED_RECV) {
-		recv_ctx = msg->context;
-		context = recv_ctx->context;
-		rx_buf = container_of(recv_ctx, struct rxm_rx_buf, recv_context);
-
-		if (flags & FI_CLAIM) {
-			FI_DBG(&rxm_prov, FI_LOG_EP_DATA,
-			       "Claiming buffered receive\n");
-			goto claim;
-		}
-
-		assert(flags & FI_DISCARD);
-		FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Discarding buffered receive\n");
-		rxm_free_rx_buf(rx_buf);
-		goto unlock;
-	}
-
-	if (flags & FI_PEEK) {
-		rxm_peek_recv(rxm_ep, msg->addr, msg->tag, msg->ignore,
-			      context, flags, &rxm_ep->trecv_queue);
-		goto unlock;
-	}
-
-	rx_buf = ((struct fi_context *) context)->internal[0];
-	assert(rx_buf);
-	FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Claim message\n");
-
-	if (flags & FI_DISCARD) {
-		rxm_discard_recv(rxm_ep, rx_buf, context);
-		goto unlock;
-	}
-
-claim:
-	assert (flags & FI_CLAIM);
-	recv_entry = rxm_recv_entry_get(rxm_ep, msg->msg_iov, msg->desc,
-					msg->iov_count, msg->addr,
-					msg->tag, msg->ignore, context, flags,
-					&rxm_ep->trecv_queue);
-	if (!recv_entry) {
-		ret = -FI_EAGAIN;
-		goto unlock;
-	}
-
-	if (rxm_ep->rxm_info->mode & OFI_BUFFERED_RECV)
-		recv_entry->comp_flags |= FI_CLAIM;
+	uint64_t tag = msg->tag;
+	struct rxm_ep *rxm_ep = container_of(ep_fid, struct rxm_ep,
					     util_ep.ep_fid.fid);
 
-	rx_buf->recv_entry = recv_entry;
-	ret = rxm_handle_rx_buf(rx_buf);
+	if (flags & FI_PEER_TRANSFER)
+		tag |= RXM_PEER_XFER_TAG_FLAG;
 
-unlock:
-	ofi_genlock_unlock(&rxm_ep->util_ep.lock);
-	return ret;
+	return util_srx_generic_trecv(&rxm_ep->srx->ep_fid, msg->msg_iov,
+				      msg->desc, msg->iov_count, msg->addr,
+				      msg->context, tag, msg->ignore,
+				      flags | rxm_ep->util_ep.rx_msg_flags);
 }
 
 static ssize_t
@@ -240,8 +72,9 @@ rxm_trecv(struct fid_ep *ep_fid, void *buf, size_t len,
 	};
 
 	rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid);
-	return rxm_trecv_common(rxm_ep, &iov, &desc, 1, src_addr, tag, ignore,
-				context, rxm_ep->util_ep.rx_op_flags);
+	return util_srx_generic_trecv(&rxm_ep->srx->ep_fid, &iov, &desc, 1,
+				      src_addr, context, tag, ignore,
+				      rxm_ep->util_ep.rx_op_flags);
 }
 
 static ssize_t
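The FI_PEER_TRANSFER handling survives the rewrite: the reserved RXM_PEER_XFER_TAG_FLAG bit is folded into the tag before the receive is handed to the util SRX, keeping peer-transfer traffic in a disjoint tag space. Distilled from rxm_trecvmsg() above:

    uint64_t srx_tag = msg->tag;

    if (flags & FI_PEER_TRANSFER)
            srx_tag |= RXM_PEER_XFER_TAG_FLAG; /* reserved tag bit */

    /* The send side sets the same bit for peer transfers, so the two
     * tag spaces never cross-match; normal tagged traffic leaves the
     * bit clear. */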
@@ -252,8 +85,9 @@ rxm_trecvv(struct fid_ep *ep_fid, const struct iovec *iov,
 	struct rxm_ep *rxm_ep;
 
 	rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid);
-	return rxm_trecv_common(rxm_ep, iov, desc, count, src_addr, tag,
-				ignore, context, rxm_ep->util_ep.rx_op_flags);
+	return util_srx_generic_trecv(&rxm_ep->srx->ep_fid, iov, desc, count,
+				      src_addr, context, tag, ignore,
+				      rxm_ep->util_ep.rx_op_flags);
 }
 
 static ssize_t
@@ -372,7 +206,7 @@ rxm_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len,
 	ret = rxm_send_common(rxm_ep, rxm_conn, &iov, &desc, 1, context, data,
 			      rxm_ep->util_ep.tx_op_flags | FI_REMOTE_CQ_DATA,
-			       tag, ofi_op_tagged);
+			      tag, ofi_op_tagged);
 unlock:
 	ofi_genlock_unlock(&rxm_ep->util_ep.lock);
 	return ret;
@@ -416,6 +250,18 @@ struct fi_ops_tagged rxm_tagged_ops = {
 	.injectdata = rxm_tinjectdata,
 };
 
+struct fi_ops_tagged rxm_no_recv_tagged_ops = {
+	.size = sizeof(struct fi_ops_tagged),
+	.recv = fi_no_tagged_recv,
+	.recvv = fi_no_tagged_recvv,
+	.recvmsg = fi_no_tagged_recvmsg,
+	.send = rxm_tsend,
+	.sendv = rxm_tsendv,
+	.sendmsg = rxm_tsendmsg,
+	.inject = rxm_tinject,
+	.senddata = rxm_tsenddata,
+	.injectdata = rxm_tinjectdata,
+};
 
 static ssize_t
 rxm_trecv_thru(struct fid_ep *ep_fid, void *buf, size_t len,
diff --git a/prov/tcp/src/xnet_av.c b/prov/tcp/src/xnet_av.c
index 14b82ccdafd..80b18f2a568 100644
--- a/prov/tcp/src/xnet_av.c
+++ b/prov/tcp/src/xnet_av.c
@@ -38,7 +38,7 @@ int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
 		 struct fid_av **fid_av, void *context)
 {
 	return rxm_util_av_open(domain_fid, attr, fid_av, context,
-				sizeof(struct xnet_conn), NULL);
+				sizeof(struct xnet_conn), NULL, NULL);
 }
 
 static int xnet_mplex_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
diff --git a/prov/util/src/rxm_av.c b/prov/util/src/rxm_av.c
index a5e30c95026..beb11d0620c 100644
--- a/prov/util/src/rxm_av.c
+++ b/prov/util/src/rxm_av.c
@@ -281,6 +281,8 @@ static int rxm_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
 {
 	struct rxm_av *av;
 	fi_addr_t *user_ids = NULL;
+	struct dlist_entry *av_entry;
+	struct util_ep *util_ep;
 	int ret;
 
 	if (flags & FI_AV_USER_ID) {
@@ -303,6 +305,14 @@ static int rxm_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
 		goto out;
 	}
 
+	if (!av->foreach_ep)
+		goto out;
+
+	dlist_foreach(&av->util_av.ep_list, av_entry) {
+		util_ep = container_of(av_entry, struct util_ep, av_entry);
+		av->foreach_ep(&av->util_av, util_ep);
+	}
+
 out:
 	free(user_ids);
 	if (ret)
@@ -420,7 +430,9 @@ static struct fi_ops_av rxm_av_ops = {
 int rxm_util_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
 		     struct fid_av **fid_av, void *context, size_t conn_size,
 		     void (*remove_handler)(struct util_ep *util_ep,
-					    struct util_peer_addr *peer))
+					    struct util_peer_addr *peer),
+		     void (*foreach_ep)(struct util_av *av, struct util_ep *ep))
+
 {
 	struct util_domain *domain;
 	struct util_av_attr util_attr;
@@ -457,6 +469,7 @@ int rxm_util_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
 	av->util_av.av_fid.fid.ops = &rxm_av_fi_ops;
 	av->util_av.av_fid.ops = &rxm_av_ops;
 	av->util_av.remove_handler = remove_handler;
+	av->foreach_ep = foreach_ep;
 
 	*fid_av = &av->util_av.av_fid;
 	return 0;
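The new foreach_ep hook gives rxm_av_insert() a way to notify every endpoint bound to the AV after new addresses are inserted; xnet opts out by passing NULL. A hypothetical registration, sketched to show the intent (example_foreach_ep and example_conn are illustrative names, not part of the patch):

    /* Invoked once per EP on the AV's ep_list after each insert, e.g.
     * to let the EP revisit queued work involving newly resolvable
     * peers. Runs under the util AV's EP iteration in rxm_av_insert(). */
    static void example_foreach_ep(struct util_av *av, struct util_ep *ep)
    {
            /* provider-specific per-EP work goes here */
    }

    ret = rxm_util_av_open(domain_fid, attr, fid_av, context,
                           sizeof(struct example_conn), /* conn size */
                           NULL,                        /* remove_handler */
                           example_foreach_ep);         /* new callback */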