From 622a773151faacf1126d3792fbd8500689aeae64 Mon Sep 17 00:00:00 2001 From: Alexia Ingerson Date: Thu, 31 Oct 2024 08:48:51 -0700 Subject: [PATCH] prov/rxm: replace rxm managed srx with util srx, support FI_PEER Remove rxm implementation of receive queues and leverage the util srx implementation which supports the peer srx API. This allows rxm to use the peer API calls to match receives. To do this, move the rxm protocol information from the receive entry into the rx_buf and allocate it dynamically as needed to track protocol information. This allows rxm to use the default peer_rx_entry instead of its own custom receive entry. With this last piece of the peer API implemented, rxm can also now advertise full support of the FI_PEER capability. Just like the FI_AV_USER_ID capability, rxm removes the bit from the core provider info as it is only a requirement from the application side and not from the message provider Signed-off-by: Alexia Ingerson --- include/ofi_util.h | 5 +- prov/rxm/src/rxm.h | 119 +++----- prov/rxm/src/rxm_attr.c | 3 +- prov/rxm/src/rxm_conn.c | 13 +- prov/rxm/src/rxm_cq.c | 390 ++++++++++++++------------ prov/rxm/src/rxm_domain.c | 24 +- prov/rxm/src/rxm_ep.c | 566 +++++++++++++------------------------- prov/rxm/src/rxm_init.c | 4 +- prov/rxm/src/rxm_msg.c | 235 ++-------------- prov/rxm/src/rxm_tagged.c | 210 ++------------ prov/tcp/src/xnet_av.c | 2 +- prov/util/src/rxm_av.c | 15 +- 12 files changed, 548 insertions(+), 1038 deletions(-) diff --git a/include/ofi_util.h b/include/ofi_util.h index dda5c903e6e..bc590bb4d1a 100644 --- a/include/ofi_util.h +++ b/include/ofi_util.h @@ -955,12 +955,15 @@ struct rxm_av { struct fid_peer_av peer_av; struct fid_av *util_coll_av; struct fid_av *offload_coll_av; + void (*foreach_ep)(struct util_av *av, struct util_ep *util_ep); }; int rxm_util_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, struct fid_av **fid_av, void *context, size_t conn_size, void (*remove_handler)(struct util_ep *util_ep, - struct util_peer_addr *peer)); + struct util_peer_addr *peer), + void (*foreach_ep)(struct util_av *av, + struct util_ep *ep)); size_t rxm_av_max_peers(struct rxm_av *av); void rxm_ref_peer(struct util_peer_addr *peer); void *rxm_av_alloc_conn(struct rxm_av *av); diff --git a/prov/rxm/src/rxm.h b/prov/rxm/src/rxm.h index 5d18f16e157..93e08624fc1 100644 --- a/prov/rxm/src/rxm.h +++ b/prov/rxm/src/rxm.h @@ -183,9 +183,9 @@ do { \ extern struct fi_provider rxm_prov; extern struct util_prov rxm_util_prov; -extern struct fi_ops_msg rxm_msg_ops; +extern struct fi_ops_msg rxm_msg_ops, rxm_no_recv_msg_ops; extern struct fi_ops_msg rxm_msg_thru_ops; -extern struct fi_ops_tagged rxm_tagged_ops; +extern struct fi_ops_tagged rxm_tagged_ops, rxm_no_recv_tagged_ops; extern struct fi_ops_tagged rxm_tagged_thru_ops; extern struct fi_ops_rma rxm_rma_ops; extern struct fi_ops_rma rxm_rma_thru_ops; @@ -265,6 +265,8 @@ struct rxm_fabric { struct rxm_domain { struct util_domain util_domain; struct fid_domain *msg_domain; + struct fid_ep rx_ep; + struct fid_peer_srx *srx; size_t max_atomic_size; size_t rx_post_size; uint64_t mr_key; @@ -443,24 +445,29 @@ rxm_sar_set_seg_type(struct ofi_ctrl_hdr *ctrl_hdr, enum rxm_sar_seg_type seg_ty ((union rxm_sar_ctrl_data *)&(ctrl_hdr->ctrl_data))->seg_type = seg_type; } -struct rxm_recv_match_attr { - fi_addr_t addr; - uint64_t tag; - uint64_t ignore; -}; - -struct rxm_unexp_msg { - struct dlist_entry entry; - fi_addr_t addr; - uint64_t tag; -}; - struct rxm_iov { struct iovec iov[RXM_IOV_LIMIT]; void *desc[RXM_IOV_LIMIT]; uint8_t count; }; +struct rxm_proto_info { + /* Used for SAR protocol */ + struct { + struct dlist_entry entry; + struct dlist_entry pkt_list; + struct fi_peer_rx_entry *rx_entry; + size_t total_recv_len; + struct rxm_conn *conn; + uint64_t msg_id; + } sar; + /* Used for Rendezvous protocol */ + struct { + /* This is used to send RNDV ACK */ + struct rxm_tx_buf *tx_buf; + } rndv; +}; + struct rxm_buf { /* Must stay at top */ struct fi_context fi_context; @@ -478,9 +485,10 @@ struct rxm_rx_buf { /* MSG EP / shared context to which bufs would be posted to */ struct fid_ep *rx_ep; struct dlist_entry repost_entry; + struct dlist_entry unexp_entry; struct rxm_conn *conn; /* msg ep data was received on */ - struct rxm_recv_entry *recv_entry; - struct rxm_unexp_msg unexp_msg; + struct fi_peer_rx_entry *peer_entry; + struct rxm_proto_info *proto_info; uint64_t comp_flags; struct fi_recv_context recv_context; bool repost; @@ -608,49 +616,6 @@ struct rxm_deferred_tx_entry { }; }; -struct rxm_recv_entry { - struct dlist_entry entry; - struct rxm_iov rxm_iov; - fi_addr_t addr; - void *context; - uint64_t flags; - uint64_t tag; - uint64_t ignore; - uint64_t comp_flags; - size_t total_len; - struct rxm_recv_queue *recv_queue; - - /* Used for SAR protocol */ - struct { - struct dlist_entry entry; - size_t total_recv_len; - struct rxm_conn *conn; - uint64_t msg_id; - } sar; - /* Used for Rendezvous protocol */ - struct { - /* This is used to send RNDV ACK */ - struct rxm_tx_buf *tx_buf; - } rndv; -}; -OFI_DECLARE_FREESTACK(struct rxm_recv_entry, rxm_recv_fs); - -enum rxm_recv_queue_type { - RXM_RECV_QUEUE_UNSPEC, - RXM_RECV_QUEUE_MSG, - RXM_RECV_QUEUE_TAGGED, -}; - -struct rxm_recv_queue { - struct rxm_ep *rxm_ep; - enum rxm_recv_queue_type type; - struct rxm_recv_fs *fs; - struct dlist_entry recv_list; - struct dlist_entry unexp_msg_list; - dlist_func_t *match_recv; - dlist_func_t *match_unexp; -}; - struct rxm_eager_ops { void (*comp_tx)(struct rxm_ep *rxm_ep, struct rxm_tx_buf *tx_eager_buf); @@ -690,6 +655,8 @@ struct rxm_ep { struct fi_ops_transfer_peer *offload_coll_peer_xfer_ops; uint64_t offload_coll_mask; + struct fid_peer_srx *srx; + struct fid_cq *msg_cq; uint64_t msg_cq_last_poll; size_t comp_per_progress; @@ -703,7 +670,6 @@ struct rxm_ep { bool do_progress; bool enable_direct_send; - size_t min_multi_recv_size; size_t buffered_min; size_t buffered_limit; size_t inject_limit; @@ -715,15 +681,13 @@ struct rxm_ep { struct ofi_bufpool *rx_pool; struct ofi_bufpool *tx_pool; struct ofi_bufpool *coll_pool; + struct ofi_bufpool *proto_info_pool; + struct rxm_pkt *inject_pkt; struct dlist_entry deferred_queue; struct dlist_entry rndv_wait_list; - struct rxm_recv_queue recv_queue; - struct rxm_recv_queue trecv_queue; - struct ofi_bufpool *multi_recv_pool; - struct rxm_eager_ops *eager_ops; struct rxm_rndv_ops *rndv_ops; }; @@ -757,6 +721,9 @@ int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, struct fid_cq **cq_fid, void *context); ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf); +int rxm_srx_context(struct fid_domain *domain, struct fi_rx_attr *attr, + struct fid_ep **rx_ep, void *context); + int rxm_endpoint(struct fid_domain *domain, struct fi_info *info, struct fid_ep **ep, void *context); void rxm_cq_write_tx_error(struct rxm_ep *rxm_ep, uint8_t op, void *op_context, @@ -915,17 +882,10 @@ ssize_t rxm_inject_send(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, const void *buf, size_t len); -struct rxm_recv_entry * -rxm_recv_entry_get(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - uint64_t tag, uint64_t ignore, void *context, - uint64_t flags, struct rxm_recv_queue *recv_queue); -struct rxm_rx_buf * -rxm_get_unexp_msg(struct rxm_recv_queue *recv_queue, fi_addr_t addr, - uint64_t tag, uint64_t ignore); -ssize_t rxm_handle_unexp_sar(struct rxm_recv_queue *recv_queue, - struct rxm_recv_entry *recv_entry, - struct rxm_rx_buf *rx_buf); +ssize_t rxm_handle_unexp_sar(struct fi_peer_rx_entry *peer_entry); +int rxm_srx_context(struct fid_domain *domain, struct fi_rx_attr *attr, + struct fid_ep **rx_ep, void *context); + int rxm_post_recv(struct rxm_rx_buf *rx_buf); void rxm_av_remove_handler(struct util_ep *util_ep, struct util_peer_addr *peer); @@ -946,15 +906,6 @@ rxm_free_rx_buf(struct rxm_rx_buf *rx_buf) } } -static inline void -rxm_recv_entry_release(struct rxm_recv_entry *entry) -{ - if (entry->recv_queue) - ofi_freestack_push(entry->recv_queue->fs, entry); - else - ofi_buf_free(entry); -} - struct rxm_mr *rxm_mr_get_map_entry(struct rxm_domain *domain, uint64_t key); struct rxm_recv_entry * diff --git a/prov/rxm/src/rxm_attr.c b/prov/rxm/src/rxm_attr.c index 632543585e4..6dc1241329e 100644 --- a/prov/rxm/src/rxm_attr.c +++ b/prov/rxm/src/rxm_attr.c @@ -40,7 +40,8 @@ OFI_RX_RMA_CAPS | FI_ATOMICS | FI_DIRECTED_RECV | \ FI_MULTI_RECV) -#define RXM_DOMAIN_CAPS (FI_LOCAL_COMM | FI_REMOTE_COMM | FI_AV_USER_ID) +#define RXM_DOMAIN_CAPS (FI_LOCAL_COMM | FI_REMOTE_COMM | FI_AV_USER_ID | \ + FI_PEER) /* Since we are a layering provider, the attributes for which we rely on the diff --git a/prov/rxm/src/rxm_conn.c b/prov/rxm/src/rxm_conn.c index afe603234ec..73b26f2a9f3 100644 --- a/prov/rxm/src/rxm_conn.c +++ b/prov/rxm/src/rxm_conn.c @@ -58,7 +58,7 @@ struct rxm_eq_cm_entry { static void rxm_close_conn(struct rxm_conn *conn) { struct rxm_deferred_tx_entry *tx_entry; - struct rxm_recv_entry *rx_entry; + struct fi_peer_rx_entry *rx_entry; struct rxm_rx_buf *buf; FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "closing conn %p\n", conn); @@ -74,16 +74,13 @@ static void rxm_close_conn(struct rxm_conn *conn) while (!dlist_empty(&conn->deferred_sar_segments)) { buf = container_of(conn->deferred_sar_segments.next, - struct rxm_rx_buf, unexp_msg.entry); - dlist_remove(&buf->unexp_msg.entry); - rxm_free_rx_buf(buf); + struct rxm_rx_buf, unexp_entry); + dlist_remove(&buf->unexp_entry); } while (!dlist_empty(&conn->deferred_sar_msgs)) { - rx_entry = container_of(conn->deferred_sar_msgs.next, - struct rxm_recv_entry, sar.entry); - dlist_remove(&rx_entry->entry); - rxm_recv_entry_release(rx_entry); + rx_entry = (struct fi_peer_rx_entry*)conn->deferred_sar_msgs.next; + rx_entry->srx->owner_ops->free_entry(rx_entry); } fi_close(&conn->msg_ep->fid); rxm_flush_msg_cq(conn->ep); diff --git a/prov/rxm/src/rxm_cq.c b/prov/rxm/src/rxm_cq.c index b04b36444d3..51206ddde04 100644 --- a/prov/rxm/src/rxm_cq.c +++ b/prov/rxm/src/rxm_cq.c @@ -106,11 +106,12 @@ static void rxm_cq_write_recv_comp(struct rxm_rx_buf *rx_buf, void *context, { int ret; + flags &= ~FI_COMPLETION; if (rx_buf->ep->util_coll_peer_xfer_ops && rx_buf->pkt.hdr.tag & RXM_PEER_XFER_TAG_FLAG) { struct fi_cq_tagged_entry cqe = { .tag = rx_buf->pkt.hdr.tag, - .op_context = rx_buf->recv_entry->context, + .op_context = rx_buf->peer_entry->context, }; rx_buf->ep->util_coll_peer_xfer_ops-> complete(rx_buf->ep->util_coll_ep, &cqe, 0); @@ -137,7 +138,7 @@ static void rxm_finish_buf_recv(struct rxm_rx_buf *rx_buf) if ((rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) && rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) != RXM_SAR_SEG_FIRST) { - dlist_insert_tail(&rx_buf->unexp_msg.entry, + dlist_insert_tail(&rx_buf->unexp_entry, &rx_buf->conn->deferred_sar_segments); rxm_replace_rx_buf(rx_buf); } @@ -172,10 +173,11 @@ static void rxm_cq_write_error_trunc(struct rxm_rx_buf *rx_buf, size_t done_len) done_len, rx_buf->pkt.hdr.size); ret = ofi_peer_cq_write_error_trunc( rx_buf->ep->util_ep.rx_cq, - rx_buf->recv_entry->context, - rx_buf->recv_entry->comp_flags | - rx_buf->pkt.hdr.flags, rx_buf->pkt.hdr.size, - rx_buf->recv_entry->rxm_iov.iov[0].iov_base, + rx_buf->peer_entry->context, + rx_buf->peer_entry->flags | + rx_buf->pkt.hdr.flags, + rx_buf->pkt.hdr.size, + rx_buf->peer_entry->iov[0].iov_base, rx_buf->pkt.hdr.data, rx_buf->pkt.hdr.tag, rx_buf->pkt.hdr.size - done_len); if (ret) { @@ -186,27 +188,22 @@ static void rxm_cq_write_error_trunc(struct rxm_rx_buf *rx_buf, size_t done_len) static void rxm_finish_recv(struct rxm_rx_buf *rx_buf, size_t done_len) { - struct rxm_recv_entry *recv_entry = rx_buf->recv_entry; - if (done_len < rx_buf->pkt.hdr.size) { rxm_cq_write_error_trunc(rx_buf, done_len); goto release; } - if (rx_buf->recv_entry->flags & FI_COMPLETION || + if (rx_buf->peer_entry->flags & FI_COMPLETION || rx_buf->ep->rxm_info->mode & OFI_BUFFERED_RECV) { - rxm_cq_write_recv_comp( - rx_buf, rx_buf->recv_entry->context, - rx_buf->recv_entry->comp_flags | - rx_buf->pkt.hdr.flags | - (rx_buf->recv_entry->flags & FI_MULTI_RECV), - rx_buf->pkt.hdr.size, - rx_buf->recv_entry->rxm_iov. - iov[0].iov_base); + rxm_cq_write_recv_comp(rx_buf, rx_buf->peer_entry->context, + rx_buf->peer_entry->flags | + rx_buf->pkt.hdr.flags, + rx_buf->pkt.hdr.size, + rx_buf->peer_entry->iov[0].iov_base); } ofi_ep_peer_rx_cntr_inc(&rx_buf->ep->util_ep, ofi_op_msg); release: - rxm_recv_entry_release(recv_entry); + rx_buf->ep->srx->owner_ops->free_entry(rx_buf->peer_entry); rxm_free_rx_buf(rx_buf); } @@ -294,18 +291,20 @@ static void rxm_handle_sar_comp(struct rxm_ep *rxm_ep, static void rxm_rndv_rx_finish(struct rxm_rx_buf *rx_buf) { + struct rxm_proto_info *proto_info; + RXM_UPDATE_STATE(FI_LOG_CQ, rx_buf, RXM_RNDV_FINISH); - if (rx_buf->recv_entry->rndv.tx_buf) { - ofi_buf_free(rx_buf->recv_entry->rndv.tx_buf); - rx_buf->recv_entry->rndv.tx_buf = NULL; + proto_info = rx_buf->proto_info; + if (proto_info->rndv.tx_buf) { + ofi_buf_free(proto_info); + ofi_buf_free(proto_info->rndv.tx_buf); } if (!rx_buf->ep->rdm_mr_local) - rxm_msg_mr_closev(rx_buf->mr, - rx_buf->recv_entry->rxm_iov.count); + rxm_msg_mr_closev(rx_buf->mr, rx_buf->peer_entry->count); - rxm_finish_recv(rx_buf, rx_buf->recv_entry->total_len); + rxm_finish_recv(rx_buf, rx_buf->peer_entry->msg_size); } static void rxm_rndv_tx_finish(struct rxm_ep *rxm_ep, @@ -398,96 +397,135 @@ static int rxm_rx_buf_match_msg_id(struct dlist_entry *item, const void *arg) uint64_t msg_id = *((uint64_t *) arg); struct rxm_rx_buf *rx_buf; - rx_buf = container_of(item, struct rxm_rx_buf, unexp_msg.entry); + rx_buf = container_of(item, struct rxm_rx_buf, unexp_entry); return (msg_id == rx_buf->pkt.ctrl_hdr.msg_id); } -static void rxm_process_seg_data(struct rxm_rx_buf *rx_buf, int *done) +static void rxm_init_sar_proto(struct rxm_rx_buf *rx_buf) +{ + struct rxm_proto_info *proto_info; + + proto_info = ofi_buf_alloc(rx_buf->ep->proto_info_pool); + if (!proto_info) { + FI_WARN(&rxm_prov, FI_LOG_CQ, + "Failed to allocate proto info buffer\n"); + return; + } + if (!rx_buf->conn) { + rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, + (int) rx_buf->pkt.ctrl_hdr.conn_id); + } + + proto_info->sar.conn = rx_buf->conn; + proto_info->sar.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; + proto_info->sar.total_recv_len = 0; + proto_info->sar.rx_entry = rx_buf->peer_entry; + + dlist_insert_tail(&proto_info->sar.entry, + &rx_buf->conn->deferred_sar_msgs); + + dlist_init(&proto_info->sar.pkt_list); + if (rx_buf->peer_entry->peer_context) + dlist_insert_tail(&rx_buf->unexp_entry, + &proto_info->sar.pkt_list); + + + rx_buf->proto_info = proto_info; +} + +int rxm_process_seg_data(struct rxm_rx_buf *rx_buf) { enum fi_hmem_iface iface; + struct rxm_proto_info *proto_info; uint64_t device; ssize_t done_len; + int done = 0; - iface = rxm_iov_desc_to_hmem_iface_dev(rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.desc, - rx_buf->recv_entry->rxm_iov.count, + proto_info = rx_buf->proto_info; + iface = rxm_iov_desc_to_hmem_iface_dev(rx_buf->peer_entry->iov, + rx_buf->peer_entry->desc, + rx_buf->peer_entry->count, &device); done_len = ofi_copy_to_hmem_iov(iface, device, - rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, - rx_buf->recv_entry->sar.total_recv_len, + rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, + proto_info->sar.total_recv_len, rx_buf->pkt.data, rx_buf->pkt.ctrl_hdr.seg_size); assert(done_len == rx_buf->pkt.ctrl_hdr.seg_size); - rx_buf->recv_entry->sar.total_recv_len += done_len; + proto_info->sar.total_recv_len += done_len; if ((rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == RXM_SAR_SEG_LAST) || (done_len != rx_buf->pkt.ctrl_hdr.seg_size)) { - - dlist_remove(&rx_buf->recv_entry->sar.entry); - - /* Mark rxm_recv_entry::msg_id as unknown for futher re-use */ - rx_buf->recv_entry->sar.msg_id = RXM_SAR_RX_INIT; - - done_len = rx_buf->recv_entry->sar.total_recv_len; - rx_buf->recv_entry->sar.total_recv_len = 0; - - *done = 1; + if (!rx_buf->peer_entry->peer_context) + dlist_remove(&proto_info->sar.entry); + done_len = proto_info->sar.total_recv_len; + done = 1; + ofi_buf_free(rx_buf->proto_info); rxm_finish_recv(rx_buf, done_len); } else { - if (rx_buf->recv_entry->sar.msg_id == RXM_SAR_RX_INIT) { - if (!rx_buf->conn) { - rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, - (int) rx_buf->pkt.ctrl_hdr.conn_id); - } - - rx_buf->recv_entry->sar.conn = rx_buf->conn; - rx_buf->recv_entry->sar.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; - - dlist_insert_tail(&rx_buf->recv_entry->sar.entry, - &rx_buf->conn->deferred_sar_msgs); - } - /* The RX buffer can be reposted for further re-use */ - rx_buf->recv_entry = NULL; + rx_buf->peer_entry = NULL; rxm_free_rx_buf(rx_buf); - - *done = 0; } + return done; } static void rxm_handle_seg_data(struct rxm_rx_buf *rx_buf) { - struct rxm_recv_entry *recv_entry; + struct rxm_proto_info *proto_info; + struct fi_peer_rx_entry *rx_entry; struct rxm_conn *conn; uint64_t msg_id; struct dlist_entry *entry; - int done; - rxm_process_seg_data(rx_buf, &done); - if (done || !(rx_buf->ep->rxm_info->mode & OFI_BUFFERED_RECV)) + if (dlist_empty(&rx_buf->proto_info->sar.pkt_list)) { + rxm_process_seg_data(rx_buf); return; + } - recv_entry = rx_buf->recv_entry; + proto_info = rx_buf->proto_info; + dlist_insert_tail(&rx_buf->unexp_entry, &proto_info->sar.pkt_list); + + if ((rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == RXM_SAR_SEG_LAST)) + dlist_remove(&proto_info->sar.entry); + + rx_entry = rx_buf->peer_entry; conn = rx_buf->conn; msg_id = rx_buf->pkt.ctrl_hdr.msg_id; dlist_foreach_container_safe(&conn->deferred_sar_segments, struct rxm_rx_buf, rx_buf, - unexp_msg.entry, entry) { - if (!rxm_rx_buf_match_msg_id(&rx_buf->unexp_msg.entry, &msg_id)) + unexp_entry, entry) { + if (!rxm_rx_buf_match_msg_id(&rx_buf->unexp_entry, &msg_id)) continue; - dlist_remove(&rx_buf->unexp_msg.entry); - rx_buf->recv_entry = recv_entry; - rxm_process_seg_data(rx_buf, &done); - if (done) + dlist_remove(&rx_buf->unexp_entry); + rx_buf->peer_entry = rx_entry; + if (rxm_process_seg_data(rx_buf)) break; } } +ssize_t rxm_handle_unexp_sar(struct fi_peer_rx_entry *peer_entry) +{ + struct rxm_proto_info *proto_info; + struct rxm_rx_buf *rx_buf; + + rx_buf = (struct rxm_rx_buf *) peer_entry->peer_context; + proto_info = rx_buf->proto_info; + + while (!dlist_empty(&proto_info->sar.pkt_list)) { + dlist_pop_front(&proto_info->sar.pkt_list, + struct rxm_rx_buf, rx_buf, unexp_entry); + rxm_process_seg_data(rx_buf); + } + peer_entry->peer_context = NULL; + return FI_SUCCESS; +} + static ssize_t rxm_rndv_xfer(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep, struct rxm_rndv_hdr *remote_hdr, struct iovec *local_iov, void **local_desc, size_t local_count, size_t total_len, @@ -538,14 +576,15 @@ ssize_t rxm_rndv_read(struct rxm_rx_buf *rx_buf) ssize_t ret; size_t total_len; - total_len = MIN(rx_buf->recv_entry->total_len, rx_buf->pkt.hdr.size); + total_len = MIN(rx_buf->peer_entry->msg_size, rx_buf->pkt.hdr.size); + rx_buf->peer_entry->msg_size = total_len; RXM_UPDATE_STATE(FI_LOG_CQ, rx_buf, RXM_RNDV_READ); ret = rxm_rndv_xfer(rx_buf->ep, rx_buf->conn->msg_ep, rx_buf->remote_rndv_hdr, - rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.desc, - rx_buf->recv_entry->rxm_iov.count, total_len, + rx_buf->peer_entry->iov, + rx_buf->peer_entry->desc, + rx_buf->peer_entry->count, total_len, rx_buf); if (ret) { rxm_cq_write_rx_error(rx_buf->ep, ofi_op_msg, rx_buf, @@ -621,28 +660,26 @@ static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf) rx_buf->rndv_rma_index = 0; if (!rx_buf->ep->rdm_mr_local) { - total_recv_len = MIN(rx_buf->recv_entry->total_len, + total_recv_len = MIN(rx_buf->peer_entry->msg_size, rx_buf->pkt.hdr.size); - ret = rxm_msg_mr_regv(rx_buf->ep, rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, + ret = rxm_msg_mr_regv(rx_buf->ep, rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, total_recv_len, rx_buf->ep->rndv_ops->rx_mr_access, rx_buf->mr); if (ret) return ret; - for (i = 0; (i < rx_buf->recv_entry->rxm_iov.count && + for (i = 0; (i < rx_buf->peer_entry->count && rx_buf->mr[i]); i++) { - rx_buf->recv_entry->rxm_iov.desc[i] = - fi_mr_desc(rx_buf->mr[i]); + rx_buf->peer_entry->desc[i] = fi_mr_desc(rx_buf->mr[i]); } } else { struct rxm_mr *mr; - for (i = 0; i < rx_buf->recv_entry->rxm_iov.count; i++) { - mr = rx_buf->recv_entry->rxm_iov.desc[i]; - rx_buf->recv_entry->rxm_iov.desc[i] = - fi_mr_desc(mr->msg_mr); + for (i = 0; i < rx_buf->peer_entry->count; i++) { + mr = rx_buf->peer_entry->desc[i]; + rx_buf->peer_entry->desc[i] = fi_mr_desc(mr->msg_mr); rx_buf->mr[i] = mr->msg_mr; } } @@ -656,9 +693,9 @@ static ssize_t rxm_handle_rndv(struct rxm_rx_buf *rx_buf) void rxm_handle_eager(struct rxm_rx_buf *rx_buf) { ssize_t done_len = rxm_copy_to_hmem_iov( - rx_buf->recv_entry->rxm_iov.desc, rx_buf->data, - rx_buf->pkt.hdr.size, rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, 0); + rx_buf->peer_entry->desc, rx_buf->data, + rx_buf->pkt.hdr.size, rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, 0); assert((size_t) done_len == rx_buf->pkt.hdr.size); @@ -671,14 +708,14 @@ void rxm_handle_coll_eager(struct rxm_rx_buf *rx_buf) uint64_t device; ssize_t done_len; - iface = rxm_iov_desc_to_hmem_iface_dev(rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.desc, - rx_buf->recv_entry->rxm_iov.count, + iface = rxm_iov_desc_to_hmem_iface_dev(rx_buf->peer_entry->iov, + rx_buf->peer_entry->desc, + rx_buf->peer_entry->count, &device); done_len = ofi_copy_to_hmem_iov(iface, device, - rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, 0, + rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, 0, rx_buf->data, rx_buf->pkt.hdr.size); assert((size_t) done_len == rx_buf->pkt.hdr.size); @@ -686,11 +723,11 @@ void rxm_handle_coll_eager(struct rxm_rx_buf *rx_buf) rx_buf->pkt.hdr.tag & RXM_PEER_XFER_TAG_FLAG) { struct fi_cq_tagged_entry cqe = { .tag = rx_buf->pkt.hdr.tag, - .op_context = rx_buf->recv_entry->context, + .op_context = rx_buf->peer_entry->context, }; rx_buf->ep->util_coll_peer_xfer_ops-> complete(rx_buf->ep->util_coll_ep, &cqe, 0); - rxm_recv_entry_release(rx_buf->recv_entry); + rx_buf->ep->srx->owner_ops->free_entry(rx_buf->peer_entry); rxm_free_rx_buf(rx_buf); } else { rxm_finish_recv(rx_buf, done_len); @@ -715,73 +752,26 @@ ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf) } } -static void rxm_adjust_multi_recv(struct rxm_rx_buf *rx_buf) +static inline void rxm_entry_prep_for_queue(struct fi_peer_rx_entry *rx_entry, + struct rxm_rx_buf *rx_buf) { - struct rxm_recv_entry *recv_entry; - struct iovec new_iov; - size_t recv_size; - - recv_size = rx_buf->pkt.hdr.size; - - if (rx_buf->recv_entry->rxm_iov.iov[0].iov_len < recv_size || - rx_buf->recv_entry->rxm_iov.iov[0].iov_len - recv_size < - rx_buf->ep->min_multi_recv_size) - return; - - new_iov.iov_base = (uint8_t *) - rx_buf->recv_entry->rxm_iov.iov[0].iov_base + recv_size; - new_iov.iov_len = rx_buf->recv_entry->rxm_iov.iov[0].iov_len - recv_size;; - - rx_buf->recv_entry->rxm_iov.iov[0].iov_len = recv_size; - - recv_entry = rxm_multi_recv_entry_get(rx_buf->ep, &new_iov, - rx_buf->recv_entry->rxm_iov.desc, 1, - rx_buf->recv_entry->addr, - rx_buf->recv_entry->tag, - rx_buf->recv_entry->ignore, - rx_buf->recv_entry->context, - rx_buf->recv_entry->flags); - - rx_buf->recv_entry->flags &= ~FI_MULTI_RECV; - - dlist_insert_head(&recv_entry->entry, &rx_buf->ep->recv_queue.recv_list); -} - -static ssize_t -rxm_match_rx_buf(struct rxm_rx_buf *rx_buf, - struct rxm_recv_queue *recv_queue, - struct rxm_recv_match_attr *match_attr) -{ - struct dlist_entry *entry; - - entry = dlist_remove_first_match(&recv_queue->recv_list, - recv_queue->match_recv, match_attr); - if (entry) { - rx_buf->recv_entry = container_of(entry, struct rxm_recv_entry, entry); - - if (rx_buf->recv_entry->flags & FI_MULTI_RECV) - rxm_adjust_multi_recv(rx_buf); - - return rxm_handle_rx_buf(rx_buf); + rx_entry->peer_context = rx_buf; + rx_buf->peer_entry = rx_entry; + if (rx_buf->pkt.hdr.flags & FI_REMOTE_CQ_DATA) { + rx_entry->flags |= FI_REMOTE_CQ_DATA; + rx_entry->cq_data = rx_buf->pkt.hdr.data; } - - RXM_DBG_ADDR_TAG(FI_LOG_CQ, "No matching recv found for incoming msg", - match_attr->addr, match_attr->tag); - FI_DBG(&rxm_prov, FI_LOG_CQ, "Enqueueing msg to unexpected msg queue\n"); - rx_buf->unexp_msg.addr = match_attr->addr; - rx_buf->unexp_msg.tag = match_attr->tag; - - dlist_insert_tail(&rx_buf->unexp_msg.entry, - &recv_queue->unexp_msg_list); + if (rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) + rxm_init_sar_proto(rx_buf); rxm_replace_rx_buf(rx_buf); - return 0; } static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf) { - struct rxm_recv_match_attr match_attr = { - .addr = FI_ADDR_UNSPEC, - }; + struct fid_peer_srx *srx = rx_buf->ep->srx; + struct fi_peer_rx_entry *rx_entry; + struct fi_peer_match_attr match = {0}; + int ret; if (rx_buf->ep->rxm_info->caps & (FI_SOURCE | FI_DIRECTED_RECV)) { if (rx_buf->ep->msg_srx) @@ -789,7 +779,9 @@ static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf) (int) rx_buf->pkt.ctrl_hdr.conn_id); if (!rx_buf->conn) return -FI_EOTHER; - match_attr.addr = rx_buf->conn->peer->fi_addr; + match.addr = rx_buf->conn->peer->fi_addr; + } else { + match.addr = FI_ADDR_UNSPEC; } if (rx_buf->ep->rxm_info->mode & OFI_BUFFERED_RECV) { @@ -799,33 +791,52 @@ static ssize_t rxm_handle_recv_comp(struct rxm_rx_buf *rx_buf) switch(rx_buf->pkt.hdr.op) { case ofi_op_msg: + match.msg_size = rx_buf->pkt.hdr.size; FI_DBG(&rxm_prov, FI_LOG_CQ, "Got MSG op\n"); - return rxm_match_rx_buf(rx_buf, &rx_buf->ep->recv_queue, - &match_attr); + ret = srx->owner_ops->get_msg(srx, &match, &rx_entry); + if (ret == -FI_ENOENT) { + rxm_entry_prep_for_queue(rx_entry, rx_buf); + return srx->owner_ops->queue_msg(rx_entry); + } + rx_entry->peer_context = NULL; + break; case ofi_op_tagged: + match.tag = rx_buf->pkt.hdr.tag; + match.msg_size = rx_buf->pkt.hdr.size; FI_DBG(&rxm_prov, FI_LOG_CQ, "Got TAGGED op\n"); - match_attr.tag = rx_buf->pkt.hdr.tag; - return rxm_match_rx_buf(rx_buf, &rx_buf->ep->trecv_queue, - &match_attr); + ret = srx->owner_ops->get_tag(srx, &match, &rx_entry); + if (ret == -FI_ENOENT) { + rxm_entry_prep_for_queue(rx_entry, rx_buf); + return srx->owner_ops->queue_tag(rx_entry); + } + rx_entry->peer_context = NULL; + break; default: FI_WARN(&rxm_prov, FI_LOG_CQ, "Unknown op!\n"); assert(0); return -FI_EINVAL; } + rx_buf->peer_entry = rx_entry; + + if (rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) + rxm_init_sar_proto(rx_buf); + + return rxm_handle_rx_buf(rx_buf); } static int rxm_sar_match_msg_id(struct dlist_entry *item, const void *arg) { uint64_t msg_id = *((uint64_t *) arg); - struct rxm_recv_entry *recv_entry; + struct rxm_proto_info *proto_info; - recv_entry = container_of(item, struct rxm_recv_entry, sar.entry); - return (msg_id == recv_entry->sar.msg_id); + proto_info = container_of(item, struct rxm_proto_info, sar.entry); + return (msg_id == proto_info->sar.msg_id); } static ssize_t rxm_sar_handle_segment(struct rxm_rx_buf *rx_buf) { struct dlist_entry *sar_entry; + struct rxm_proto_info *proto_info; rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, (int) rx_buf->pkt.ctrl_hdr.conn_id); @@ -841,8 +852,9 @@ static ssize_t rxm_sar_handle_segment(struct rxm_rx_buf *rx_buf) if (!sar_entry) return rxm_handle_recv_comp(rx_buf); - rx_buf->recv_entry = container_of(sar_entry, struct rxm_recv_entry, - sar.entry); + proto_info = container_of(sar_entry, struct rxm_proto_info, sar.entry); + rx_buf->peer_entry = proto_info->sar.rx_entry; + rx_buf->proto_info = proto_info; rxm_handle_seg_data(rx_buf); return 0; } @@ -860,8 +872,15 @@ static void rxm_rndv_send_rd_done(struct rxm_rx_buf *rx_buf) ret = -FI_ENOMEM; goto err; } + rx_buf->proto_info = ofi_buf_alloc(rx_buf->ep->proto_info_pool); + if (!rx_buf->proto_info) { + FI_WARN(&rxm_prov, FI_LOG_CQ, + "Failed to allocated proto info buf\n"); + assert(0); + return; + } - rx_buf->recv_entry->rndv.tx_buf = buf; + rx_buf->proto_info->rndv.tx_buf = buf; buf->pkt.ctrl_hdr.type = rxm_ctrl_rndv_rd_done; buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->remote_index; @@ -888,8 +907,9 @@ static void rxm_rndv_send_rd_done(struct rxm_rx_buf *rx_buf) return; free: + rx_buf->proto_info->rndv.tx_buf = NULL; + ofi_buf_free(rx_buf->proto_info); ofi_buf_free(buf); - rx_buf->recv_entry->rndv.tx_buf = NULL; err: FI_WARN(&rxm_prov, FI_LOG_CQ, "unable to allocate/send rd rndv ack: %s\n", @@ -968,14 +988,22 @@ ssize_t rxm_rndv_send_wr_data(struct rxm_rx_buf *rx_buf) goto err; } - rx_buf->recv_entry->rndv.tx_buf = buf; + rx_buf->proto_info = ofi_buf_alloc(rx_buf->ep->proto_info_pool); + if (!rx_buf->proto_info) { + FI_WARN(&rxm_prov, FI_LOG_CQ, + "Failed to allocated proto info buf\n"); + return -FI_ENOMEM; + } + + rx_buf->proto_info->rndv.tx_buf = buf; + buf->pkt.ctrl_hdr.type = rxm_ctrl_rndv_wr_data; buf->pkt.ctrl_hdr.conn_id = rx_buf->conn->remote_index; buf->pkt.ctrl_hdr.msg_id = rx_buf->pkt.ctrl_hdr.msg_id; rxm_rndv_hdr_init(rx_buf->ep, buf->pkt.data, - rx_buf->recv_entry->rxm_iov.iov, - rx_buf->recv_entry->rxm_iov.count, rx_buf->mr); + rx_buf->peer_entry->iov, + rx_buf->peer_entry->count, rx_buf->mr); ret = fi_send(rx_buf->conn->msg_ep, &buf->pkt, sizeof(buf->pkt) + sizeof(struct rxm_rndv_hdr), buf->hdr.desc, 0, rx_buf); @@ -999,8 +1027,9 @@ ssize_t rxm_rndv_send_wr_data(struct rxm_rx_buf *rx_buf) return 0; free: + rx_buf->proto_info->rndv.tx_buf = NULL; + ofi_buf_free(rx_buf->proto_info); ofi_buf_free(buf); - rx_buf->recv_entry->rndv.tx_buf = NULL; err: FI_WARN(&rxm_prov, FI_LOG_CQ, "unable to allocate/send wr rndv ready: %s\n", @@ -1638,7 +1667,7 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep) * the event yet. */ rx_buf = (struct rxm_rx_buf *) err_entry.op_context; - if (!rx_buf->recv_entry) { + if (!rx_buf->peer_entry) { ofi_buf_free((struct rxm_rx_buf *)err_entry.op_context); return; } @@ -1647,9 +1676,9 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep) case RXM_RNDV_WRITE_DATA_SENT: /* BUG: should fail initial send */ case RXM_RNDV_READ: rx_buf = (struct rxm_rx_buf *) err_entry.op_context; - assert(rx_buf->recv_entry); - err_entry.op_context = rx_buf->recv_entry->context; - err_entry.flags = rx_buf->recv_entry->comp_flags; + assert(rx_buf->peer_entry); + err_entry.op_context = rx_buf->peer_entry->context; + err_entry.flags = rx_buf->peer_entry->flags; cq = rx_buf->ep->util_ep.rx_cq; cntr = rx_buf->ep->util_ep.cntrs[CNTR_RX]; @@ -1780,7 +1809,8 @@ int rxm_post_recv(struct rxm_rx_buf *rx_buf) if (rx_buf->ep->msg_srx) rx_buf->conn = NULL; rx_buf->hdr.state = RXM_RX; - rx_buf->recv_entry = NULL; + rx_buf->peer_entry = NULL; + rx_buf->proto_info = NULL; domain = container_of(rx_buf->ep->util_ep.domain, struct rxm_domain, util_domain); @@ -1858,7 +1888,7 @@ void rxm_ep_do_progress(struct util_ep *util_ep) rxm_conn_progress(rxm_ep); } } else { - rxm_conn_progress(rxm_ep); + rxm_conn_progress(rxm_ep); } } } while ((ret > 0) && (comp_read < rxm_ep->comp_per_progress)); @@ -1975,6 +2005,9 @@ int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, if (ret) goto err1; + if (attr->flags & FI_PEER) + goto out; + rxm_domain = container_of(domain, struct rxm_domain, util_domain.domain_fid); @@ -1996,11 +2029,12 @@ int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr, if (ret) goto err2; } + rxm_cq->util_cq.cq_fid.ops = &rxm_cq_ops; +out: *cq_fid = &rxm_cq->util_cq.cq_fid; /* Override util_cq_fi_ops */ (*cq_fid)->fid.ops = &rxm_cq_fi_ops; - (*cq_fid)->ops = &rxm_cq_ops; return 0; err2: diff --git a/prov/rxm/src/rxm_domain.c b/prov/rxm/src/rxm_domain.c index 055fca16bea..9fcadf56763 100644 --- a/prov/rxm/src/rxm_domain.c +++ b/prov/rxm/src/rxm_domain.c @@ -221,6 +221,25 @@ static struct fi_ops_av_owner rxm_av_owner_ops = { .ep_addr = rxm_peer_av_ep_addr, }; +static fi_addr_t rxm_get_addr(struct fi_peer_rx_entry *rx_entry) +{ + struct rxm_rx_buf *rx_buf = rx_entry->peer_context; + + return rx_buf->conn->peer->fi_addr; +} + +static void rxm_foreach_ep(struct util_av *av, struct util_ep *ep) +{ + struct rxm_ep *rxm_ep; + struct fid_peer_srx *peer_srx; + + rxm_ep = container_of(ep, struct rxm_ep, util_ep); + peer_srx = container_of(rxm_ep->srx, struct fid_peer_srx, ep_fid); + if (peer_srx) + peer_srx->owner_ops->foreach_unspec_addr(peer_srx, &rxm_get_addr); +} + + static int rxm_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, struct fid_av **fid_av, void *context) @@ -236,7 +255,8 @@ rxm_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, ret = rxm_util_av_open(domain_fid, attr, &fid_av_new, context, sizeof(struct rxm_conn), - ofi_av_remove_cleanup ? rxm_av_remove_handler : NULL); + ofi_av_remove_cleanup ? rxm_av_remove_handler : NULL, + &rxm_foreach_ep); if (ret) return ret; @@ -346,7 +366,7 @@ static struct fi_ops_domain rxm_domain_ops = { .cntr_open = rxm_cntr_open, .poll_open = fi_poll_create, .stx_ctx = fi_no_stx_context, - .srx_ctx = fi_no_srx_context, + .srx_ctx = rxm_srx_context, .query_atomic = rxm_ep_query_atomic, .query_collective = rxm_query_collective, }; diff --git a/prov/rxm/src/rxm_ep.c b/prov/rxm/src/rxm_ep.c index 69a88e2caaf..b967643c0c5 100644 --- a/prov/rxm/src/rxm_ep.c +++ b/prov/rxm/src/rxm_ep.c @@ -42,79 +42,6 @@ #include "rxm.h" -static int rxm_match_noop(struct dlist_entry *item, const void *arg) -{ - OFI_UNUSED(item); - OFI_UNUSED(arg); - return 1; -} - -static int rxm_match_recv_entry(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_recv_entry *recv_entry = - container_of(item, struct rxm_recv_entry, entry); - return ofi_match_addr(recv_entry->addr, attr->addr); -} - -static int rxm_match_recv_entry_tag(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_recv_entry *recv_entry = - container_of(item, struct rxm_recv_entry, entry); - return ofi_match_tag(recv_entry->tag, recv_entry->ignore, attr->tag); -} - -static int rxm_match_recv_entry_tag_addr(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_recv_entry *recv_entry = - container_of(item, struct rxm_recv_entry, entry); - return ofi_match_addr(recv_entry->addr, attr->addr) && - ofi_match_tag(recv_entry->tag, recv_entry->ignore, attr->tag); -} - -static int rxm_match_recv_entry_context(struct dlist_entry *item, const void *context) -{ - struct rxm_recv_entry *recv_entry = - container_of(item, struct rxm_recv_entry, entry); - return recv_entry->context == context; -} - -static fi_addr_t rxm_get_unexp_addr(struct rxm_unexp_msg *unexp_msg) -{ - struct rxm_rx_buf *rx_buf; - - rx_buf = container_of(unexp_msg, struct rxm_rx_buf, unexp_msg); - return (unexp_msg->addr != FI_ADDR_UNSPEC) ? - unexp_msg->addr : rx_buf->conn->peer->fi_addr; -} - -static int rxm_match_unexp_msg(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *)arg; - struct rxm_unexp_msg *unexp_msg = - container_of(item, struct rxm_unexp_msg, entry); - return ofi_match_addr(attr->addr, rxm_get_unexp_addr(unexp_msg)); -} - -static int rxm_match_unexp_msg_tag(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_unexp_msg *unexp_msg = - container_of(item, struct rxm_unexp_msg, entry); - return ofi_match_tag(attr->tag, attr->ignore, unexp_msg->tag); -} - -static int rxm_match_unexp_msg_tag_addr(struct dlist_entry *item, const void *arg) -{ - struct rxm_recv_match_attr *attr = (struct rxm_recv_match_attr *) arg; - struct rxm_unexp_msg *unexp_msg = - container_of(item, struct rxm_unexp_msg, entry); - return ofi_match_addr(attr->addr, rxm_get_unexp_addr(unexp_msg)) && - ofi_match_tag(attr->tag, attr->ignore, unexp_msg->tag); -} - static int rxm_buf_reg(struct ofi_bufpool_region *region) { struct rxm_ep *rxm_ep = region->pool->attr.context; @@ -158,6 +85,7 @@ static void rxm_init_rx_buf(struct ofi_bufpool_region *region, void *buf) fi_mr_desc((struct fid_mr *) region->context) : NULL; rx_buf->ep = ep; rx_buf->data = &rx_buf->pkt.data; + dlist_init(&rx_buf->unexp_entry); } static void rxm_init_tx_buf(struct ofi_bufpool_region *region, void *buf) @@ -186,69 +114,6 @@ static void rxm_buf_close(struct ofi_bufpool_region *region) } } -static void rxm_recv_entry_init(struct rxm_recv_entry *entry, void *arg) -{ - struct rxm_recv_queue *recv_queue = arg; - - assert(recv_queue->type != RXM_RECV_QUEUE_UNSPEC); - - entry->recv_queue = recv_queue; - entry->sar.msg_id = RXM_SAR_RX_INIT; - entry->sar.total_recv_len = 0; - /* set it to NULL to differentiate between regular ACKs and those - * sent with FI_INJECT */ - entry->rndv.tx_buf = NULL; - entry->comp_flags = FI_RECV; - - if (recv_queue->type == RXM_RECV_QUEUE_MSG) - entry->comp_flags |= FI_MSG; - else - entry->comp_flags |= FI_TAGGED; -} - -static int rxm_recv_queue_init(struct rxm_ep *rxm_ep, struct rxm_recv_queue *recv_queue, - size_t size, enum rxm_recv_queue_type type) -{ - recv_queue->rxm_ep = rxm_ep; - recv_queue->type = type; - recv_queue->fs = rxm_recv_fs_create(size, rxm_recv_entry_init, - recv_queue); - if (!recv_queue->fs) - return -FI_ENOMEM; - - dlist_init(&recv_queue->recv_list); - dlist_init(&recv_queue->unexp_msg_list); - if (type == RXM_RECV_QUEUE_MSG) { - if (rxm_ep->rxm_info->caps & FI_DIRECTED_RECV) { - recv_queue->match_recv = rxm_match_recv_entry; - recv_queue->match_unexp = rxm_match_unexp_msg; - } else { - recv_queue->match_recv = rxm_match_noop; - recv_queue->match_unexp = rxm_match_noop; - } - } else { - if (rxm_ep->rxm_info->caps & FI_DIRECTED_RECV) { - recv_queue->match_recv = rxm_match_recv_entry_tag_addr; - recv_queue->match_unexp = rxm_match_unexp_msg_tag_addr; - } else { - recv_queue->match_recv = rxm_match_recv_entry_tag; - recv_queue->match_unexp = rxm_match_unexp_msg_tag; - } - } - - return 0; -} - -static void rxm_recv_queue_close(struct rxm_recv_queue *recv_queue) -{ - /* It indicates that the recv_queue were allocated */ - if (recv_queue->fs) { - rxm_recv_fs_free(recv_queue->fs); - recv_queue->fs = NULL; - } - // TODO cleanup recv_list and unexp msg list -} - static int rxm_ep_create_pools(struct rxm_ep *rxm_ep) { struct ofi_bufpool_attr attr = {0}; @@ -287,8 +152,18 @@ static int rxm_ep_create_pools(struct rxm_ep *rxm_ep) "Unable to create peer xfer context pool\n"); goto free_tx_pool; } - return 0; + attr.size = sizeof(struct rxm_proto_info); + attr.alloc_fn = NULL; + attr.free_fn = NULL; + attr.init_fn = NULL; + ret = ofi_bufpool_create_attr(&attr, &rxm_ep->proto_info_pool); + if (ret) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "Unable to create proto info pool\n"); + goto free_tx_pool; + } + return 0; free_tx_pool: ofi_bufpool_destroy(rxm_ep->tx_pool); @@ -298,62 +173,13 @@ static int rxm_ep_create_pools(struct rxm_ep *rxm_ep) return ret; } -static int rxm_multi_recv_pool_init(struct rxm_ep *rxm_ep) -{ - struct ofi_bufpool_attr attr = { - .size = sizeof(struct rxm_recv_entry), - .alignment = 16, - .max_cnt = 0, - .chunk_cnt = 16, - .alloc_fn = NULL, - .init_fn = NULL, - .context = rxm_ep, - .flags = OFI_BUFPOOL_NO_TRACK, - }; - - return ofi_bufpool_create_attr(&attr, &rxm_ep->multi_recv_pool); -} - -static int rxm_ep_rx_queue_init(struct rxm_ep *rxm_ep) -{ - int ret; - - ret = rxm_recv_queue_init(rxm_ep, &rxm_ep->recv_queue, - rxm_ep->rxm_info->rx_attr->size, - RXM_RECV_QUEUE_MSG); - if (ret) - return ret; - - ret = rxm_recv_queue_init(rxm_ep, &rxm_ep->trecv_queue, - rxm_ep->rxm_info->rx_attr->size, - RXM_RECV_QUEUE_TAGGED); - if (ret) - goto err_recv_tag; - - ret = rxm_multi_recv_pool_init(rxm_ep); - if (ret) - goto err_multi; - - return FI_SUCCESS; - -err_multi: - rxm_recv_queue_close(&rxm_ep->trecv_queue); -err_recv_tag: - rxm_recv_queue_close(&rxm_ep->recv_queue); - return ret; -} - /* It is safe to call this function, even if `rxm_ep_txrx_res_open` * has not yet been called */ static void rxm_ep_txrx_res_close(struct rxm_ep *ep) { - rxm_recv_queue_close(&ep->trecv_queue); - rxm_recv_queue_close(&ep->recv_queue); + if (ep->srx && ep->util_ep.ep_fid.msg != &rxm_no_recv_msg_ops) + (void) util_srx_close(&ep->srx->ep_fid.fid); - if (ep->multi_recv_pool) { - ofi_bufpool_destroy(ep->multi_recv_pool); - ep->multi_recv_pool = NULL; - } if (ep->rx_pool) { ofi_bufpool_destroy(ep->rx_pool); ep->rx_pool = NULL; @@ -362,6 +188,10 @@ static void rxm_ep_txrx_res_close(struct rxm_ep *ep) ofi_bufpool_destroy(ep->tx_pool); ep->tx_pool = NULL; } + if (ep->proto_info_pool) { + ofi_bufpool_destroy(ep->proto_info_pool); + ep->proto_info_pool = NULL; + } if (ep->coll_pool) { ofi_bufpool_destroy(ep->coll_pool); ep->coll_pool = NULL; @@ -420,53 +250,13 @@ static struct rxm_eager_ops coll_eager_ops = { .handle_rx = rxm_handle_coll_eager, }; -static bool rxm_ep_cancel_recv(struct rxm_ep *rxm_ep, - struct rxm_recv_queue *recv_queue, void *context) -{ - struct fi_cq_err_entry err_entry; - struct rxm_recv_entry *recv_entry; - struct dlist_entry *entry; - int ret; - - ofi_genlock_lock(&rxm_ep->util_ep.lock); - entry = dlist_remove_first_match(&recv_queue->recv_list, - rxm_match_recv_entry_context, - context); - if (!entry) - goto unlock; - - recv_entry = container_of(entry, struct rxm_recv_entry, entry); - memset(&err_entry, 0, sizeof(err_entry)); - err_entry.op_context = recv_entry->context; - err_entry.flags |= recv_entry->comp_flags; - err_entry.tag = recv_entry->tag; - err_entry.err = FI_ECANCELED; - err_entry.prov_errno = -FI_ECANCELED; - rxm_recv_entry_release(recv_entry); - ret = ofi_cq_write_error(rxm_ep->util_ep.rx_cq, &err_entry); - if (ret) { - FI_WARN(&rxm_prov, FI_LOG_CQ, "Error writing to CQ\n"); - assert(0); - } - -unlock: - ofi_genlock_unlock(&rxm_ep->util_ep.lock); - return entry != NULL; -} - static ssize_t rxm_ep_cancel(fid_t fid_ep, void *context) { struct rxm_ep *ep; ep = container_of(fid_ep, struct rxm_ep, util_ep.ep_fid); - if (rxm_passthru_info(ep->rxm_info)) - return fi_cancel(&ep->msg_srx->fid, context); - - if (!rxm_ep_cancel_recv(ep, &ep->trecv_queue, context)) - rxm_ep_cancel_recv(ep, &ep->recv_queue, context); - - return 0; + return ep->srx->ep_fid.ops->cancel(&ep->srx->ep_fid.fid, context); } static int rxm_ep_getopt(fid_t fid, int level, int optname, void *optval, @@ -480,10 +270,8 @@ static int rxm_ep_getopt(fid_t fid, int level, int optname, void *optval, switch (optname) { case FI_OPT_MIN_MULTI_RECV: - assert(sizeof(rxm_ep->min_multi_recv_size) == sizeof(size_t)); - *(size_t *)optval = rxm_ep->min_multi_recv_size; - *optlen = sizeof(size_t); - break; + return rxm_ep->srx->ep_fid.ops->getopt(&rxm_ep->srx->ep_fid.fid, + level, optname, optval, optlen); case FI_OPT_BUFFERED_MIN: assert(sizeof(rxm_ep->buffered_min) == sizeof(size_t)); *(size_t *)optval = rxm_ep->buffered_min; @@ -507,11 +295,8 @@ static int rxm_ep_setopt(fid_t fid, int level, int optname, switch (optname) { case FI_OPT_MIN_MULTI_RECV: - rxm_ep->min_multi_recv_size = *(size_t *)optval; - FI_INFO(&rxm_prov, FI_LOG_CORE, - "FI_OPT_MIN_MULTI_RECV set to %zu\n", - rxm_ep->min_multi_recv_size); - break; + return rxm_ep->srx->ep_fid.ops->setopt(&rxm_ep->srx->ep_fid.fid, + level, optname, optval, optlen); case FI_OPT_BUFFERED_MIN: if (rxm_ep->rx_pool) { FI_WARN(&rxm_prov, FI_LOG_EP_DATA, @@ -564,99 +349,6 @@ static struct fi_ops_ep rxm_ops_ep = { .tx_size_left = fi_no_tx_size_left, }; - -/* Caller must hold recv_queue->lock -- TODO which lock? */ -struct rxm_rx_buf * -rxm_get_unexp_msg(struct rxm_recv_queue *recv_queue, fi_addr_t addr, - uint64_t tag, uint64_t ignore) -{ - struct rxm_recv_match_attr match_attr; - struct dlist_entry *entry; - - if (dlist_empty(&recv_queue->unexp_msg_list)) - return NULL; - - match_attr.addr = addr; - match_attr.tag = tag; - match_attr.ignore = ignore; - - entry = dlist_find_first_match(&recv_queue->unexp_msg_list, - recv_queue->match_unexp, &match_attr); - if (!entry) - return NULL; - - RXM_DBG_ADDR_TAG(FI_LOG_EP_DATA, "Match for posted recv found in unexp" - " msg list\n", match_attr.addr, match_attr.tag); - - return container_of(entry, struct rxm_rx_buf, unexp_msg.entry); -} - -static void rxm_recv_entry_init_common(struct rxm_recv_entry *recv_entry, - const struct iovec *iov, void **desc, size_t count, - fi_addr_t src_addr, uint64_t tag, uint64_t ignore, - void *context, uint64_t flags, - struct rxm_recv_queue *recv_queue) -{ - size_t i; - - assert(!recv_entry->rndv.tx_buf); - recv_entry->rxm_iov.count = (uint8_t) count; - recv_entry->addr = src_addr; - recv_entry->context = context; - recv_entry->flags = flags; - recv_entry->ignore = ignore; - recv_entry->tag = tag; - - recv_entry->sar.msg_id = RXM_SAR_RX_INIT; - recv_entry->sar.total_recv_len = 0; - recv_entry->total_len = 0; - - for (i = 0; i < count; i++) { - recv_entry->rxm_iov.iov[i] = iov[i]; - recv_entry->total_len += iov[i].iov_len; - if (desc && desc[i]) - recv_entry->rxm_iov.desc[i] = desc[i]; - else - recv_entry->rxm_iov.desc[i] = NULL; - } -} - -struct rxm_recv_entry * -rxm_recv_entry_get(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - uint64_t tag, uint64_t ignore, void *context, - uint64_t flags, struct rxm_recv_queue *recv_queue) -{ - struct rxm_recv_entry *recv_entry; - - if (ofi_freestack_isempty(recv_queue->fs)) - return NULL; - - recv_entry = ofi_freestack_pop(recv_queue->fs); - - rxm_recv_entry_init_common(recv_entry, iov, desc, count, src_addr, tag, - ignore, context, flags, recv_queue); - - return recv_entry; -} - -struct rxm_recv_entry * -rxm_multi_recv_entry_get(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - uint64_t tag, uint64_t ignore, void *context, - uint64_t flags) -{ - struct rxm_recv_entry *recv_entry; - - recv_entry = ofi_buf_alloc(rxm_ep->multi_recv_pool); - - rxm_recv_entry_init_common(recv_entry, iov, desc, count, src_addr, tag, - ignore, context, flags, NULL); - - recv_entry->comp_flags = FI_MSG | FI_RECV; - return recv_entry; -} - struct rxm_tx_buf *rxm_get_tx_buf(struct rxm_ep *ep) { struct rxm_tx_buf *buf; @@ -820,6 +512,7 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn) { struct rxm_deferred_tx_entry *def_tx_entry; + struct rxm_proto_info *proto_info; struct iovec iov; struct fi_msg msg; ssize_t ret = 0; @@ -832,12 +525,11 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, struct rxm_deferred_tx_entry, entry); switch (def_tx_entry->type) { case RXM_DEFERRED_TX_RNDV_ACK: + proto_info = def_tx_entry->rndv_ack.rx_buf->proto_info; ret = fi_send(def_tx_entry->rxm_conn->msg_ep, - &def_tx_entry->rndv_ack.rx_buf-> - recv_entry->rndv.tx_buf->pkt, + &proto_info->rndv.tx_buf->pkt, def_tx_entry->rndv_ack.pkt_size, - def_tx_entry->rndv_ack.rx_buf->recv_entry-> - rndv.tx_buf->hdr.desc, + proto_info->rndv.tx_buf->hdr.desc, 0, def_tx_entry->rndv_ack.rx_buf); if (ret) { if (ret == -FI_EAGAIN) @@ -845,11 +537,10 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, rxm_cq_write_rx_error( def_tx_entry->rxm_ep, ofi_op_msg, def_tx_entry->rndv_ack.rx_buf-> - recv_entry->context, (int) ret); + peer_entry->context, (int) ret); } - if (def_tx_entry->rndv_ack.rx_buf->recv_entry->rndv - .tx_buf->pkt.ctrl_hdr - .type == rxm_ctrl_rndv_rd_done) + if (proto_info->rndv.tx_buf->pkt.ctrl_hdr.type == + rxm_ctrl_rndv_rd_done) RXM_UPDATE_STATE(FI_LOG_EP_DATA, def_tx_entry->rndv_ack.rx_buf, RXM_RNDV_READ_DONE_SENT); @@ -891,7 +582,7 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, rxm_cq_write_rx_error( def_tx_entry->rxm_ep, ofi_op_msg, def_tx_entry->rndv_read.rx_buf-> - recv_entry->context, (int) ret); + peer_entry->context, (int) ret); } break; case RXM_DEFERRED_TX_RNDV_WRITE: @@ -944,7 +635,7 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep, def_tx_entry->rxm_ep, ofi_op_msg, def_tx_entry->rndv_read.rx_buf-> - recv_entry->context, + peer_entry->context, (int) ret); } return; @@ -1451,9 +1142,6 @@ static void rxm_ep_settings_init(struct rxm_ep *rxm_ep) sizeof(struct rxm_rndv_hdr))), rxm_buffer_size); - assert(!rxm_ep->min_multi_recv_size); - rxm_ep->min_multi_recv_size = rxm_buffer_size; - assert(!rxm_ep->buffered_limit); rxm_ep->buffered_limit = rxm_buffer_size; @@ -1465,13 +1153,11 @@ static void rxm_ep_settings_init(struct rxm_ep *rxm_ep) "\t\t MR local: MSG - %d, RxM - %d\n" "\t\t Completions per progress: MSG - %zu\n" "\t\t Buffered min: %zu\n" - "\t\t Min multi recv size: %zu\n" "\t\t inject size: %zu\n" "\t\t Protocol limits: Eager: %zu, SAR: %zu\n", rxm_ep->msg_mr_local, rxm_ep->rdm_mr_local, rxm_ep->comp_per_progress, rxm_ep->buffered_min, - rxm_ep->min_multi_recv_size, rxm_ep->inject_limit, - rxm_ep->eager_limit, rxm_ep->sar_limit); + rxm_ep->inject_limit, rxm_ep->eager_limit, rxm_ep->sar_limit); } static int rxm_ep_txrx_res_open(struct rxm_ep *rxm_ep) @@ -1484,19 +1170,7 @@ static int rxm_ep_txrx_res_open(struct rxm_ep *rxm_ep) dlist_init(&rxm_ep->deferred_queue); - ret = rxm_ep_rx_queue_init(rxm_ep); - if (ret) - goto err; - return FI_SUCCESS; -err: - ofi_bufpool_destroy(rxm_ep->coll_pool); - ofi_bufpool_destroy(rxm_ep->rx_pool); - ofi_bufpool_destroy(rxm_ep->tx_pool); - rxm_ep->coll_pool = NULL; - rxm_ep->rx_pool = NULL; - rxm_ep->tx_pool = NULL; - return ret; } static int rxm_ep_enable_check(struct rxm_ep *rxm_ep) @@ -1526,9 +1200,129 @@ static int rxm_ep_enable_check(struct rxm_ep *rxm_ep) return 0; } +static int rxm_unexp_start(struct fi_peer_rx_entry *rx_entry) +{ + struct rxm_rx_buf *rx_buf = rx_entry->peer_context; + + return rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg ? + rxm_handle_unexp_sar(rx_entry): + rxm_handle_rx_buf(rx_buf); +} + +static int rxm_discard(struct fi_peer_rx_entry *rx_entry) +{ + struct rxm_rx_buf *rx_buf, *seg_rx; + + rx_buf = rx_entry->peer_context; + + if (rx_buf->pkt.ctrl_hdr.type == rxm_ctrl_seg) { + while (!dlist_empty(&rx_buf->proto_info->sar.pkt_list)) { + dlist_pop_front(&rx_buf->proto_info->sar.pkt_list, + struct rxm_rx_buf, seg_rx, unexp_entry); + rxm_free_rx_buf(seg_rx); + } + ofi_buf_free(rx_buf->proto_info); + } + + rxm_free_rx_buf(rx_buf); + return FI_SUCCESS; +} + +struct fi_ops_srx_peer rxm_srx_peer_ops = { + .size = sizeof(struct fi_ops_srx_peer), + .start_msg = rxm_unexp_start, + .start_tag = rxm_unexp_start, + .discard_msg = rxm_discard, + .discard_tag = rxm_discard, +}; + +static int rxm_srx_close(struct fid *fid) +{ + struct rxm_domain *domain = container_of(fid, struct rxm_domain, + rx_ep.fid); + + ofi_atomic_dec32(&domain->util_domain.ref); + + return FI_SUCCESS; +} + +static struct fi_ops rxm_srx_fi_ops = { + .size = sizeof(struct fi_ops), + .close = rxm_srx_close, + .bind = fi_no_bind, + .control = fi_no_control, + .ops_open = fi_no_ops_open, +}; + +static struct fi_ops_msg rxm_srx_msg_ops = { + .size = sizeof(struct fi_ops_msg), + .recv = fi_no_msg_recv, + .recvv = fi_no_msg_recvv, + .recvmsg = fi_no_msg_recvmsg, + .send = fi_no_msg_send, + .sendv = fi_no_msg_sendv, + .sendmsg = fi_no_msg_sendmsg, + .inject = fi_no_msg_inject, + .senddata = fi_no_msg_senddata, + .injectdata = fi_no_msg_injectdata, +}; + +static struct fi_ops_tagged rxm_srx_tagged_ops = { + .size = sizeof(struct fi_ops_msg), + .recv = fi_no_tagged_recv, + .recvv = fi_no_tagged_recvv, + .recvmsg = fi_no_tagged_recvmsg, + .send = fi_no_tagged_send, + .sendv = fi_no_tagged_sendv, + .sendmsg = fi_no_tagged_sendmsg, + .inject = fi_no_tagged_inject, + .senddata = fi_no_tagged_senddata, + .injectdata = fi_no_tagged_injectdata, +}; + +int rxm_srx_context(struct fid_domain *domain, struct fi_rx_attr *attr, + struct fid_ep **rx_ep, void *context) +{ + struct rxm_domain *rxm_domain; + + if (!(attr->op_flags & FI_PEER)) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "shared srx only supported with FI_PEER flag\n"); + return -FI_EINVAL; + } + + rxm_domain = container_of(domain, struct rxm_domain, + util_domain.domain_fid); + + if (rxm_domain->srx) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "Peer SRX context already imported\n"); + return -FI_EINVAL; + } + + rxm_domain->srx = ((struct fi_peer_srx_context *) + (context))->srx; + rxm_domain->srx->peer_ops = &rxm_srx_peer_ops; + rxm_domain->rx_ep.msg = &rxm_srx_msg_ops; + rxm_domain->rx_ep.tagged = &rxm_srx_tagged_ops; + rxm_domain->rx_ep.fid.ops = &rxm_srx_fi_ops; + rxm_domain->rx_ep.fid.fclass = FI_CLASS_SRX_CTX; + *rx_ep = &rxm_domain->rx_ep; + ofi_atomic_inc32(&rxm_domain->util_domain.ref); + + return FI_SUCCESS; +} + +static void rxm_update(struct util_srx_ctx *srx, struct util_rx_entry *rx_entry) +{ + //no update needed +} + static int rxm_ep_ctrl(struct fid *fid, int command, void *arg) { struct rxm_ep *ep; + struct rxm_domain *domain; + struct fid_ep *srx; int ret; ep = container_of(fid, struct rxm_ep, util_ep.ep_fid.fid); @@ -1564,6 +1358,32 @@ static int rxm_ep_ctrl(struct fid *fid, int command, void *arg) if (ret) return ret; + if (!ep->srx) { + domain = container_of(ep->util_ep.domain, + struct rxm_domain, + util_domain.domain_fid); + ret = util_ep_srx_context(&domain->util_domain, + ep->rxm_info->rx_attr->size, + RXM_IOV_LIMIT, rxm_buffer_size, + &rxm_update, &ep->util_ep.lock, + &srx); + if (ret) + return ret; + + ep->srx = container_of(srx, struct fid_peer_srx, + ep_fid.fid); + ep->srx->peer_ops = &rxm_srx_peer_ops; + + ret = util_srx_bind(&ep->srx->ep_fid.fid, + &ep->util_ep.rx_cq->cq_fid.fid, + FI_RECV); + if (ret) + return ret; + } else { + ep->util_ep.ep_fid.msg = &rxm_no_recv_msg_ops; + ep->util_ep.ep_fid.tagged = &rxm_no_recv_tagged_ops; + } + if (ep->msg_srx && !rxm_passthru_info(ep->rxm_info)) { ret = rxm_prepost_recv(ep, ep->msg_srx); if (ret) @@ -1592,10 +1412,21 @@ static int rxm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) struct rxm_av *rxm_av; struct rxm_cq *rxm_cq; struct rxm_eq *rxm_eq; - int ret, retv = 0; + int ret; rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid); + if (bfid->fclass == FI_CLASS_SRX_CTX) { + if (rxm_ep->srx) { + FI_WARN(&rxm_prov, FI_LOG_EP_CTRL, + "SRX context already bound to EP\n"); + return -FI_EINVAL; + } + rxm_ep->srx = + (container_of(bfid, struct rxm_domain, rx_ep.fid))->srx; + return FI_SUCCESS; + } + ret = ofi_ep_bind(&rxm_ep->util_ep, bfid, flags); if (ret) return ret; @@ -1608,14 +1439,14 @@ static int rxm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) &rxm_av->util_coll_av->fid, flags); if (ret) - retv = ret; + return ret; } if (rxm_ep->offload_coll_ep && rxm_av->offload_coll_av) { ret = ofi_ep_fid_bind(&rxm_ep->offload_coll_ep->fid, &rxm_av->offload_coll_av->fid, flags); if (ret) - retv = ret; + return ret; } break; @@ -1626,14 +1457,14 @@ static int rxm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) &rxm_cq->util_coll_cq->fid, flags); if (ret) - retv = ret; + return ret; } if (rxm_ep->offload_coll_ep && rxm_cq->offload_coll_cq) { ret = ofi_ep_fid_bind(&rxm_ep->offload_coll_ep->fid, &rxm_cq->offload_coll_cq->fid, flags); if (ret) - retv = ret; + return ret; } break; @@ -1644,19 +1475,18 @@ static int rxm_ep_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags) &rxm_eq->util_coll_eq->fid, flags); if (ret) - retv = ret; + return ret; } if (rxm_ep->offload_coll_ep && rxm_eq->offload_coll_eq) { ret = ofi_ep_fid_bind(&rxm_ep->offload_coll_ep->fid, &rxm_eq->offload_coll_eq->fid, flags); if (ret) - retv = ret; + return ret; } - break; } - return retv; + return FI_SUCCESS; } static struct fi_ops rxm_ep_fi_ops = { diff --git a/prov/rxm/src/rxm_init.c b/prov/rxm/src/rxm_init.c index 1a76796d4e0..10a7ae535d7 100644 --- a/prov/rxm/src/rxm_init.c +++ b/prov/rxm/src/rxm_init.c @@ -262,8 +262,8 @@ int rxm_info_to_core(uint32_t version, const struct fi_info *hints, core_info->rx_attr->op_flags &= ~FI_MULTI_RECV; - core_info->domain_attr->caps &= ~(FI_AV_USER_ID); - core_info->caps &= ~(FI_AV_USER_ID); + core_info->domain_attr->caps &= ~(FI_AV_USER_ID | FI_PEER); + core_info->caps &= ~(FI_AV_USER_ID | FI_PEER); return 0; } diff --git a/prov/rxm/src/rxm_msg.c b/prov/rxm/src/rxm_msg.c index 3b9088a2858..5d48e88e53a 100644 --- a/prov/rxm/src/rxm_msg.c +++ b/prov/rxm/src/rxm_msg.c @@ -40,214 +40,16 @@ #include "rxm.h" - -ssize_t rxm_handle_unexp_sar(struct rxm_recv_queue *recv_queue, - struct rxm_recv_entry *recv_entry, - struct rxm_rx_buf *rx_buf) -{ - struct rxm_recv_match_attr match_attr; - struct dlist_entry *entry; - bool last; - ssize_t ret; - - ret = rxm_handle_rx_buf(rx_buf); - last = rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == RXM_SAR_SEG_LAST; - if (ret || last) - return ret; - - match_attr.addr = recv_entry->addr; - match_attr.tag = recv_entry->tag; - match_attr.ignore = recv_entry->ignore; - - dlist_foreach_container_safe(&recv_queue->unexp_msg_list, - struct rxm_rx_buf, rx_buf, - unexp_msg.entry, entry) { - if (!recv_queue->match_unexp(&rx_buf->unexp_msg.entry, - &match_attr)) - continue; - /* Handle unordered completions from MSG provider */ - if ((rx_buf->pkt.ctrl_hdr.msg_id != recv_entry->sar.msg_id) || - ((rx_buf->pkt.ctrl_hdr.type != rxm_ctrl_seg))) - continue; - - if (!rx_buf->conn) { - rx_buf->conn = ofi_idm_at(&rx_buf->ep->conn_idx_map, - (int) rx_buf->pkt.ctrl_hdr.conn_id); - } - if (recv_entry->sar.conn != rx_buf->conn) - continue; - rx_buf->recv_entry = recv_entry; - dlist_remove(&rx_buf->unexp_msg.entry); - last = rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == - RXM_SAR_SEG_LAST; - ret = rxm_handle_rx_buf(rx_buf); - if (ret || last) - break; - } - return ret; -} - -/* - * We don't expect to have unexpected messages when the app is using - * multi-recv buffers. Optimize for that case. - * - * If there are unexpected messages waiting when we post a mult-recv buffer, - * we trim off the start of the buffer, treat it as a normal buffer, and pair - * it with an unexpected message. We continue doing this until either no - * unexpected messages are left or the multi-recv buffer has been consumed. - */ -static ssize_t -rxm_post_mrecv(struct rxm_ep *ep, const struct iovec *iov, - void **desc, void *context, uint64_t op_flags) -{ - struct rxm_recv_entry *recv_entry; - struct rxm_rx_buf *rx_buf; - struct iovec cur_iov = *iov; - ssize_t ret; - - do { - recv_entry = rxm_recv_entry_get(ep, &cur_iov, desc, 1, - FI_ADDR_UNSPEC, 0, 0, context, - op_flags, &ep->recv_queue); - if (!recv_entry) { - ret = -FI_ENOMEM; - break; - } - - rx_buf = rxm_get_unexp_msg(&ep->recv_queue, recv_entry->addr, 0, 0); - if (!rx_buf) { - dlist_insert_tail(&recv_entry->entry, - &ep->recv_queue.recv_list); - return 0; - } - - dlist_remove(&rx_buf->unexp_msg.entry); - rx_buf->recv_entry = recv_entry; - recv_entry->flags &= ~FI_MULTI_RECV; - recv_entry->total_len = MIN(cur_iov.iov_len, rx_buf->pkt.hdr.size); - recv_entry->rxm_iov.iov[0].iov_len = recv_entry->total_len; - - cur_iov.iov_base = (uint8_t *) cur_iov.iov_base + recv_entry->total_len; - cur_iov.iov_len -= recv_entry->total_len; - - if (rx_buf->pkt.ctrl_hdr.type != rxm_ctrl_seg) - ret = rxm_handle_rx_buf(rx_buf); - else - ret = rxm_handle_unexp_sar(&ep->recv_queue, recv_entry, - rx_buf); - - } while (!ret && cur_iov.iov_len >= ep->min_multi_recv_size); - - if ((cur_iov.iov_len < ep->min_multi_recv_size) || - (ret && cur_iov.iov_len != iov->iov_len)) { - ofi_peer_cq_write(ep->util_ep.rx_cq, context, FI_MULTI_RECV, - 0, NULL, 0, 0, FI_ADDR_NOTAVAIL); - } - - return ret; -} - -static ssize_t -rxm_recv_common(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - void *context, uint64_t op_flags) -{ - struct rxm_recv_entry *recv_entry; - struct rxm_rx_buf *rx_buf; - ssize_t ret; - - assert(rxm_ep->util_ep.rx_cq); - assert(count <= rxm_ep->rxm_info->rx_attr->iov_limit); - - ofi_genlock_lock(&rxm_ep->util_ep.lock); - if (op_flags & FI_MULTI_RECV) { - ret = rxm_post_mrecv(rxm_ep, iov, desc, context, op_flags); - goto release; - } - - recv_entry = rxm_recv_entry_get(rxm_ep, iov, desc, count, src_addr, - 0, 0, context, op_flags, - &rxm_ep->recv_queue); - if (!recv_entry) { - ret = -FI_EAGAIN; - goto release; - } - - rx_buf = rxm_get_unexp_msg(&rxm_ep->recv_queue, recv_entry->addr, 0, 0); - if (!rx_buf) { - dlist_insert_tail(&recv_entry->entry, - &rxm_ep->recv_queue.recv_list); - ret = FI_SUCCESS; - goto release; - } - - dlist_remove(&rx_buf->unexp_msg.entry); - rx_buf->recv_entry = recv_entry; - - ret = (rx_buf->pkt.ctrl_hdr.type != rxm_ctrl_seg) ? - rxm_handle_rx_buf(rx_buf) : - rxm_handle_unexp_sar(&rxm_ep->recv_queue, recv_entry, rx_buf); - -release: - ofi_genlock_unlock(&rxm_ep->util_ep.lock); - return ret; -} - -static ssize_t -rxm_buf_recv(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - void *context, uint64_t flags) -{ - struct rxm_recv_entry *recv_entry; - struct fi_recv_context *recv_ctx = context; - struct rxm_rx_buf *rx_buf; - ssize_t ret = 0; - - context = recv_ctx->context; - rx_buf = container_of(recv_ctx, struct rxm_rx_buf, recv_context); - - ofi_genlock_lock(&rxm_ep->util_ep.lock); - if (flags & FI_CLAIM) { - FI_DBG(&rxm_prov, FI_LOG_EP_DATA, - "Claiming buffered receive\n"); - - recv_entry = rxm_recv_entry_get(rxm_ep, iov, desc, count, - src_addr, 0, 0, context, - flags, &rxm_ep->recv_queue); - if (!recv_entry) { - ret = -FI_EAGAIN; - goto unlock; - } - - recv_entry->comp_flags |= FI_CLAIM; - - rx_buf->recv_entry = recv_entry; - ret = rxm_handle_rx_buf(rx_buf); - } else { - assert(flags & FI_DISCARD); - FI_DBG(&rxm_prov, FI_LOG_EP_DATA, - "Discarding buffered receive\n"); - rxm_free_rx_buf(rx_buf); - } -unlock: - ofi_genlock_unlock(&rxm_ep->util_ep.lock); - return ret; -} - static ssize_t rxm_recvmsg(struct fid_ep *ep_fid, const struct fi_msg *msg, uint64_t flags) { struct rxm_ep *rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid); - if (rxm_ep->rxm_info->mode & OFI_BUFFERED_RECV) - return rxm_buf_recv(rxm_ep, msg->msg_iov, msg->desc, - msg->iov_count, msg->addr, msg->context, - flags | rxm_ep->util_ep.rx_msg_flags); - - return rxm_recv_common(rxm_ep, msg->msg_iov, msg->desc, - msg->iov_count, msg->addr, msg->context, - flags | rxm_ep->util_ep.rx_msg_flags); + return util_srx_generic_recv(&rxm_ep->srx->ep_fid, msg->msg_iov, + msg->desc, msg->iov_count, msg->addr, + msg->context, + flags | rxm_ep->util_ep.rx_msg_flags); } @@ -262,8 +64,9 @@ rxm_recv(struct fid_ep *ep_fid, void *buf, size_t len, .iov_len = len, }; - return rxm_recv_common(rxm_ep, &iov, &desc, 1, src_addr, - context, rxm_ep->util_ep.rx_op_flags); + return util_srx_generic_recv(&rxm_ep->srx->ep_fid, &iov, &desc, 1, + src_addr, context, + rxm_ep->util_ep.rx_op_flags); } static ssize_t @@ -273,8 +76,9 @@ rxm_recvv(struct fid_ep *ep_fid, const struct iovec *iov, struct rxm_ep *rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid); - return rxm_recv_common(rxm_ep, iov, desc, count, src_addr, - context, rxm_ep->util_ep.rx_op_flags); + return util_srx_generic_recv(&rxm_ep->srx->ep_fid, iov, desc, count, + src_addr, context, + rxm_ep->util_ep.rx_op_flags); } static ssize_t @@ -661,15 +465,13 @@ rxm_send_eager(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn, eager_buf->app_context = context; eager_buf->flags = flags; + rxm_ep_format_tx_buf_pkt(rxm_conn, data_len, op, data, tag, + flags, &eager_buf->pkt); if (rxm_use_direct_send(rxm_ep, count, flags)) { - rxm_ep_format_tx_buf_pkt(rxm_conn, data_len, op, data, tag, - flags, &eager_buf->pkt); ret = rxm_direct_send(rxm_ep, rxm_conn, eager_buf, iov, desc, count); } else { - rxm_ep_format_tx_buf_pkt(rxm_conn, data_len, op, data, tag, - flags, &eager_buf->pkt); ret = rxm_copy_from_hmem_iov(desc, eager_buf->pkt.data, eager_buf->pkt.hdr.size, iov, count, 0); @@ -891,6 +693,19 @@ struct fi_ops_msg rxm_msg_ops = { .injectdata = rxm_injectdata, }; +struct fi_ops_msg rxm_no_recv_msg_ops = { + .size = sizeof(struct fi_ops_msg), + .recv = fi_no_msg_recv, + .recvv = fi_no_msg_recvv, + .recvmsg = fi_no_msg_recvmsg, + .send = rxm_send, + .sendv = rxm_sendv, + .sendmsg = rxm_sendmsg, + .inject = rxm_inject, + .senddata = rxm_senddata, + .injectdata = rxm_injectdata, +}; + static ssize_t rxm_recv_thru(struct fid_ep *ep_fid, void *buf, size_t len, void *desc, fi_addr_t src_addr, void *context) diff --git a/prov/rxm/src/rxm_tagged.c b/prov/rxm/src/rxm_tagged.c index 8f18f34b3eb..1276bac0ba3 100644 --- a/prov/rxm/src/rxm_tagged.c +++ b/prov/rxm/src/rxm_tagged.c @@ -43,189 +43,21 @@ #include "rxm.h" -static void -rxm_discard_recv(struct rxm_ep *rxm_ep, struct rxm_rx_buf *rx_buf, - void *context) -{ - RXM_DBG_ADDR_TAG(FI_LOG_EP_DATA, "Discarding message", - rx_buf->unexp_msg.addr, rx_buf->unexp_msg.tag); - - ofi_peer_cq_write(rxm_ep->util_ep.rx_cq, context, FI_TAGGED | FI_RECV, - 0, NULL, rx_buf->pkt.hdr.data, - rx_buf->pkt.hdr.tag, FI_ADDR_NOTAVAIL); - rxm_free_rx_buf(rx_buf); -} - -static void -rxm_peek_recv(struct rxm_ep *rxm_ep, fi_addr_t addr, uint64_t tag, - uint64_t ignore, void *context, uint64_t flags, - struct rxm_recv_queue *recv_queue) -{ - struct rxm_rx_buf *rx_buf; - int ret; - - RXM_DBG_ADDR_TAG(FI_LOG_EP_DATA, "Peeking message", addr, tag); - - /* peek doesn't support peer transfer at this moment */ - assert(!(flags & FI_PEER_TRANSFER)); - - rxm_ep_do_progress(&rxm_ep->util_ep); - - rx_buf = rxm_get_unexp_msg(recv_queue, addr, tag, ignore); - if (!rx_buf) { - FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Message not found\n"); - ret = ofi_peer_cq_write_error_peek( - rxm_ep->util_ep.rx_cq, tag, context); - if (ret) - FI_WARN(&rxm_prov, FI_LOG_CQ, "Error writing to CQ\n"); - return; - } - - FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Message found\n"); - - if (flags & FI_DISCARD) { - dlist_remove(&rx_buf->unexp_msg.entry); - rxm_discard_recv(rxm_ep, rx_buf, context); - return; - } - - if (flags & FI_CLAIM) { - FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Marking message for Claim\n"); - ((struct fi_context *)context)->internal[0] = rx_buf; - dlist_remove(&rx_buf->unexp_msg.entry); - } - - ofi_peer_cq_write(rxm_ep->util_ep.rx_cq, context, FI_TAGGED | FI_RECV, - rx_buf->pkt.hdr.size, NULL, rx_buf->pkt.hdr.data, - rx_buf->pkt.hdr.tag, FI_ADDR_NOTAVAIL); -} - -static ssize_t -rxm_post_trecv(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - uint64_t tag, uint64_t ignore, void *context, uint64_t op_flags) -{ - struct rxm_recv_entry *recv_entry; - struct rxm_rx_buf *rx_buf; - - assert(count <= rxm_ep->rxm_info->rx_attr->iov_limit); - - recv_entry = rxm_recv_entry_get(rxm_ep, iov, desc, count, src_addr, - tag, ignore, context, op_flags, - &rxm_ep->trecv_queue); - if (!recv_entry) - return -FI_EAGAIN; - - rx_buf = rxm_get_unexp_msg(&rxm_ep->trecv_queue, recv_entry->addr, - recv_entry->tag, recv_entry->ignore); - if (!rx_buf) { - dlist_insert_tail(&recv_entry->entry, - &rxm_ep->trecv_queue.recv_list); - return FI_SUCCESS; - } - - dlist_remove(&rx_buf->unexp_msg.entry); - rx_buf->recv_entry = recv_entry; - - if (rx_buf->pkt.ctrl_hdr.type != rxm_ctrl_seg) - return rxm_handle_rx_buf(rx_buf); - else - return rxm_handle_unexp_sar(&rxm_ep->trecv_queue, recv_entry, - rx_buf); -} - -static ssize_t -rxm_trecv_common(struct rxm_ep *rxm_ep, const struct iovec *iov, - void **desc, size_t count, fi_addr_t src_addr, - uint64_t tag, uint64_t ignore, void *context, - uint64_t op_flags) -{ - ssize_t ret; - - if (op_flags & FI_PEER_TRANSFER) - tag |= RXM_PEER_XFER_TAG_FLAG; - - ofi_genlock_lock(&rxm_ep->util_ep.lock); - ret = rxm_post_trecv(rxm_ep, iov, desc, count, src_addr, - tag, ignore, context, op_flags); - ofi_genlock_unlock(&rxm_ep->util_ep.lock); - return ret; -} - static ssize_t rxm_trecvmsg(struct fid_ep *ep_fid, const struct fi_msg_tagged *msg, uint64_t flags) { - struct rxm_ep *rxm_ep; - struct rxm_recv_entry *recv_entry; - struct fi_recv_context *recv_ctx; - struct rxm_rx_buf *rx_buf; - void *context = msg->context; - ssize_t ret = 0; - - rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid); - flags |= rxm_ep->util_ep.rx_msg_flags; - - if (!(flags & (FI_CLAIM | FI_PEEK)) && - !(rxm_ep->rxm_info->mode & OFI_BUFFERED_RECV)) { - return rxm_trecv_common(rxm_ep, msg->msg_iov, msg->desc, - msg->iov_count, msg->addr, - msg->tag, msg->ignore, context, flags); - } - - ofi_genlock_lock(&rxm_ep->util_ep.lock); - if (rxm_ep->rxm_info->mode & OFI_BUFFERED_RECV) { - recv_ctx = msg->context; - context = recv_ctx->context; - rx_buf = container_of(recv_ctx, struct rxm_rx_buf, recv_context); - - if (flags & FI_CLAIM) { - FI_DBG(&rxm_prov, FI_LOG_EP_DATA, - "Claiming buffered receive\n"); - goto claim; - } - - assert(flags & FI_DISCARD); - FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Discarding buffered receive\n"); - rxm_free_rx_buf(rx_buf); - goto unlock; - } - - if (flags & FI_PEEK) { - rxm_peek_recv(rxm_ep, msg->addr, msg->tag, msg->ignore, - context, flags, &rxm_ep->trecv_queue); - goto unlock; - } - - rx_buf = ((struct fi_context *) context)->internal[0]; - assert(rx_buf); - FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Claim message\n"); - - if (flags & FI_DISCARD) { - rxm_discard_recv(rxm_ep, rx_buf, context); - goto unlock; - } - -claim: - assert (flags & FI_CLAIM); - recv_entry = rxm_recv_entry_get(rxm_ep, msg->msg_iov, msg->desc, - msg->iov_count, msg->addr, - msg->tag, msg->ignore, context, flags, - &rxm_ep->trecv_queue); - if (!recv_entry) { - ret = -FI_EAGAIN; - goto unlock; - } - - if (rxm_ep->rxm_info->mode & OFI_BUFFERED_RECV) - recv_entry->comp_flags |= FI_CLAIM; + uint64_t tag = msg->tag; + struct rxm_ep *rxm_ep = container_of(ep_fid, struct rxm_ep, + util_ep.ep_fid.fid); - rx_buf->recv_entry = recv_entry; - ret = rxm_handle_rx_buf(rx_buf); + if (flags & FI_PEER_TRANSFER) + tag |= RXM_PEER_XFER_TAG_FLAG; -unlock: - ofi_genlock_unlock(&rxm_ep->util_ep.lock); - return ret; + return util_srx_generic_trecv(&rxm_ep->srx->ep_fid, msg->msg_iov, + msg->desc, msg->iov_count, msg->addr, + msg->context, tag, msg->ignore, + flags | rxm_ep->util_ep.rx_msg_flags); } static ssize_t @@ -240,8 +72,9 @@ rxm_trecv(struct fid_ep *ep_fid, void *buf, size_t len, }; rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid); - return rxm_trecv_common(rxm_ep, &iov, &desc, 1, src_addr, tag, ignore, - context, rxm_ep->util_ep.rx_op_flags); + return util_srx_generic_trecv(&rxm_ep->srx->ep_fid, &iov, &desc, 1, + src_addr, context, tag, ignore, + rxm_ep->util_ep.rx_op_flags); } static ssize_t @@ -252,8 +85,9 @@ rxm_trecvv(struct fid_ep *ep_fid, const struct iovec *iov, struct rxm_ep *rxm_ep; rxm_ep = container_of(ep_fid, struct rxm_ep, util_ep.ep_fid.fid); - return rxm_trecv_common(rxm_ep, iov, desc, count, src_addr, tag, - ignore, context, rxm_ep->util_ep.rx_op_flags); + return util_srx_generic_trecv(&rxm_ep->srx->ep_fid, iov, desc, count, + src_addr, context, tag, ignore, + rxm_ep->util_ep.rx_op_flags); } static ssize_t @@ -372,7 +206,7 @@ rxm_tsenddata(struct fid_ep *ep_fid, const void *buf, size_t len, ret = rxm_send_common(rxm_ep, rxm_conn, &iov, &desc, 1, context, data, rxm_ep->util_ep.tx_op_flags | FI_REMOTE_CQ_DATA, - tag, ofi_op_tagged); + tag, ofi_op_tagged); unlock: ofi_genlock_unlock(&rxm_ep->util_ep.lock); return ret; @@ -416,6 +250,18 @@ struct fi_ops_tagged rxm_tagged_ops = { .injectdata = rxm_tinjectdata, }; +struct fi_ops_tagged rxm_no_recv_tagged_ops = { + .size = sizeof(struct fi_ops_tagged), + .recv = fi_no_tagged_recv, + .recvv = fi_no_tagged_recvv, + .recvmsg = fi_no_tagged_recvmsg, + .send = rxm_tsend, + .sendv = rxm_tsendv, + .sendmsg = rxm_tsendmsg, + .inject = rxm_tinject, + .senddata = rxm_tsenddata, + .injectdata = rxm_tinjectdata, +}; static ssize_t rxm_trecv_thru(struct fid_ep *ep_fid, void *buf, size_t len, diff --git a/prov/tcp/src/xnet_av.c b/prov/tcp/src/xnet_av.c index 14b82ccdafd..80b18f2a568 100644 --- a/prov/tcp/src/xnet_av.c +++ b/prov/tcp/src/xnet_av.c @@ -38,7 +38,7 @@ int xnet_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, struct fid_av **fid_av, void *context) { return rxm_util_av_open(domain_fid, attr, fid_av, context, - sizeof(struct xnet_conn), NULL); + sizeof(struct xnet_conn), NULL, NULL); } static int xnet_mplex_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr, diff --git a/prov/util/src/rxm_av.c b/prov/util/src/rxm_av.c index a5e30c95026..beb11d0620c 100644 --- a/prov/util/src/rxm_av.c +++ b/prov/util/src/rxm_av.c @@ -281,6 +281,8 @@ static int rxm_av_insert(struct fid_av *av_fid, const void *addr, size_t count, { struct rxm_av *av; fi_addr_t *user_ids = NULL; + struct dlist_entry *av_entry; + struct util_ep *util_ep; int ret; if (flags & FI_AV_USER_ID) { @@ -303,6 +305,14 @@ static int rxm_av_insert(struct fid_av *av_fid, const void *addr, size_t count, goto out; } + if (!av->foreach_ep) + goto out; + + dlist_foreach(&av->util_av.ep_list, av_entry) { + util_ep = container_of(av_entry, struct util_ep, av_entry); + av->foreach_ep(&av->util_av, util_ep); + } + out: free(user_ids); if (ret) @@ -420,7 +430,9 @@ static struct fi_ops_av rxm_av_ops = { int rxm_util_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, struct fid_av **fid_av, void *context, size_t conn_size, void (*remove_handler)(struct util_ep *util_ep, - struct util_peer_addr *peer)) + struct util_peer_addr *peer), + void (*foreach_ep)(struct util_av *av, struct util_ep *ep)) + { struct util_domain *domain; struct util_av_attr util_attr; @@ -457,6 +469,7 @@ int rxm_util_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr, av->util_av.av_fid.fid.ops = &rxm_av_fi_ops; av->util_av.av_fid.ops = &rxm_av_ops; av->util_av.remove_handler = remove_handler; + av->foreach_ep = foreach_ep; *fid_av = &av->util_av.av_fid; return 0;