From de410e4d93145eb455dbaeb043147846bf351804 Mon Sep 17 00:00:00 2001
From: Alexia Ingerson
Date: Mon, 21 Oct 2024 15:40:44 -0700
Subject: [PATCH] prov/rxm: add support for using peer CQs and counters

Support the peer APIs by default through the util peer helper
functions. Instead of going through rxm-specific wrappers to write to
CQs and counters, use the ofi_peer_cq/cntr APIs, which dispatch through
the owner ops. In the default case, where rxm is not being used as a
peer, these fall through to the regular ofi_cq_write functions.

Signed-off-by: Alexia Ingerson
---
 prov/rxm/src/rxm.h        |  77 +--------------
 prov/rxm/src/rxm_cq.c     | 200 +++++++++++++++++++++-----------------
 prov/rxm/src/rxm_ep.c     |  44 +++++----
 prov/rxm/src/rxm_msg.c    |   4 +-
 prov/rxm/src/rxm_tagged.c |  15 +--
 5 files changed, 146 insertions(+), 194 deletions(-)

diff --git a/prov/rxm/src/rxm.h b/prov/rxm/src/rxm.h
index e2759d6d077..5d18f16e157 100644
--- a/prov/rxm/src/rxm.h
+++ b/prov/rxm/src/rxm.h
@@ -759,9 +759,10 @@ ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf);
 
 int rxm_endpoint(struct fid_domain *domain, struct fi_info *info,
 		 struct fid_ep **ep, void *context);
-
-void rxm_cq_write_error(struct util_cq *cq, struct util_cntr *cntr,
-			void *op_context, int err);
+void rxm_cq_write_tx_error(struct rxm_ep *rxm_ep, uint8_t op, void *op_context,
+			   int err);
+void rxm_cq_write_rx_error(struct rxm_ep *rxm_ep, uint8_t op, void *op_context,
+			   int err);
 void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err);
 void rxm_handle_comp_error(struct rxm_ep *rxm_ep);
 ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp);
@@ -880,50 +881,6 @@ int rxm_msg_mr_reg_internal(struct rxm_domain *rxm_domain, const void *buf,
 			    size_t len, uint64_t acs, uint64_t flags,
 			    struct fid_mr **mr);
 
-static inline void rxm_cntr_incerr(struct util_cntr *cntr)
-{
-	if (cntr)
-		cntr->cntr_fid.ops->adderr(&cntr->cntr_fid, 1);
-}
-
-static inline void
-rxm_cq_write(struct util_cq *cq, void *context, uint64_t flags, size_t len,
-	     void *buf, uint64_t data, uint64_t tag)
-{
-	int ret;
-
-	FI_DBG(&rxm_prov, FI_LOG_CQ, "Reporting %s completion\n",
-	       fi_tostr((void *) &flags, FI_TYPE_CQ_EVENT_FLAGS));
-
-	ret = ofi_cq_write(cq, context, flags, len, buf, data, tag);
-	if (ret) {
-		FI_WARN(&rxm_prov, FI_LOG_CQ,
-			"Unable to report completion\n");
-		assert(0);
-	}
-	if (cq->wait)
-		cq->wait->signal(cq->wait);
-}
-
-static inline void
-rxm_cq_write_src(struct util_cq *cq, void *context, uint64_t flags, size_t len,
-		 void *buf, uint64_t data, uint64_t tag, fi_addr_t addr)
-{
-	int ret;
-
-	FI_DBG(&rxm_prov, FI_LOG_CQ, "Reporting %s completion\n",
-	       fi_tostr((void *) &flags, FI_TYPE_CQ_EVENT_FLAGS));
-
-	ret = ofi_cq_write_src(cq, context, flags, len, buf, data, tag, addr);
-	if (ret) {
-		FI_WARN(&rxm_prov, FI_LOG_CQ,
-			"Unable to report completion\n");
-		assert(0);
-	}
-	if (cq->wait)
-		cq->wait->signal(cq->wait);
-}
-
 ssize_t rxm_get_conn(struct rxm_ep *rxm_ep, fi_addr_t addr,
 		     struct rxm_conn **rxm_conn);
@@ -998,32 +955,6 @@ rxm_recv_entry_release(struct rxm_recv_entry *entry)
 	ofi_buf_free(entry);
 }
 
-static inline void
-rxm_cq_write_recv_comp(struct rxm_rx_buf *rx_buf, void *context, uint64_t flags,
-		       size_t len, char *buf)
-{
-	if (rx_buf->ep->util_coll_peer_xfer_ops &&
-	    rx_buf->pkt.hdr.tag & RXM_PEER_XFER_TAG_FLAG) {
-		struct fi_cq_tagged_entry cqe = {
-			.tag = rx_buf->pkt.hdr.tag,
-			.op_context = rx_buf->recv_entry->context,
-		};
-		rx_buf->ep->util_coll_peer_xfer_ops->
-			complete(rx_buf->ep->util_coll_ep, &cqe, 0);
-		return;
-	}
-
-	if (rx_buf->ep->rxm_info->caps & FI_SOURCE)
-		rxm_cq_write_src(rx_buf->ep->util_ep.rx_cq, context,
-				 flags, len, buf, rx_buf->pkt.hdr.data,
-				 rx_buf->pkt.hdr.tag,
-				 rx_buf->conn->peer->fi_addr);
-	else
-		rxm_cq_write(rx_buf->ep->util_ep.rx_cq, context,
-			     flags, len, buf, rx_buf->pkt.hdr.data,
-			     rx_buf->pkt.hdr.tag);
-}
-
 struct rxm_mr *rxm_mr_get_map_entry(struct rxm_domain *domain, uint64_t key);
 
 struct rxm_recv_entry *
diff --git a/prov/rxm/src/rxm_cq.c b/prov/rxm/src/rxm_cq.c
index 27c8cc6f1c0..b04b36444d3 100644
--- a/prov/rxm/src/rxm_cq.c
+++ b/prov/rxm/src/rxm_cq.c
@@ -101,6 +101,35 @@ static void rxm_replace_rx_buf(struct rxm_rx_buf *rx_buf)
 	ofi_buf_free(new_rx_buf);
 }
 
+static void rxm_cq_write_recv_comp(struct rxm_rx_buf *rx_buf, void *context,
+				   uint64_t flags, size_t len, char *buf)
+{
+	int ret;
+
+	if (rx_buf->ep->util_coll_peer_xfer_ops &&
+	    rx_buf->pkt.hdr.tag & RXM_PEER_XFER_TAG_FLAG) {
+		struct fi_cq_tagged_entry cqe = {
+			.tag = rx_buf->pkt.hdr.tag,
+			.op_context = rx_buf->recv_entry->context,
+		};
+		rx_buf->ep->util_coll_peer_xfer_ops->
+			complete(rx_buf->ep->util_coll_ep, &cqe, 0);
+		return;
+	}
+
+	if (rx_buf->ep->rxm_info->caps & FI_SOURCE)
+		ret = ofi_peer_cq_write(rx_buf->ep->util_ep.rx_cq, context,
+					flags, len, buf, rx_buf->pkt.hdr.data,
+					rx_buf->pkt.hdr.tag,
+					rx_buf->conn->peer->fi_addr);
+	else
+		ret = ofi_peer_cq_write(rx_buf->ep->util_ep.rx_cq, context,
+					flags, len, buf, rx_buf->pkt.hdr.data,
+					rx_buf->pkt.hdr.tag, FI_ADDR_NOTAVAIL);
+	if (ret)
+		FI_WARN(&rxm_prov, FI_LOG_CQ,
+			"Unable to write rx completion\n");
+}
+
 static void rxm_finish_buf_recv(struct rxm_rx_buf *rx_buf)
 {
 	uint64_t flags;
@@ -136,19 +165,19 @@ static void rxm_cq_write_error_trunc(struct rxm_rx_buf *rx_buf, size_t done_len)
 	int ret;
 
 	if (rx_buf->ep->util_ep.flags & OFI_CNTR_ENABLED)
-		rxm_cntr_incerr(rx_buf->ep->util_ep.cntrs[CNTR_RX]);
+		ofi_ep_peer_rx_cntr_incerr(&rx_buf->ep->util_ep, ofi_op_msg);
 
 	FI_WARN(&rxm_prov, FI_LOG_CQ, "Message truncated: "
 		"recv buf length: %zu message length: %" PRIu64 "\n",
 		done_len, rx_buf->pkt.hdr.size);
-	ret = ofi_cq_write_error_trunc(rx_buf->ep->util_ep.rx_cq,
-				       rx_buf->recv_entry->context,
-				       rx_buf->recv_entry->comp_flags |
-				       rx_buf->pkt.hdr.flags,
-				       rx_buf->pkt.hdr.size,
-				       rx_buf->recv_entry->rxm_iov.iov[0].iov_base,
-				       rx_buf->pkt.hdr.data, rx_buf->pkt.hdr.tag,
-				       rx_buf->pkt.hdr.size - done_len);
+	ret = ofi_peer_cq_write_error_trunc(
+				rx_buf->ep->util_ep.rx_cq,
+				rx_buf->recv_entry->context,
+				rx_buf->recv_entry->comp_flags |
+				rx_buf->pkt.hdr.flags, rx_buf->pkt.hdr.size,
+				rx_buf->recv_entry->rxm_iov.iov[0].iov_base,
+				rx_buf->pkt.hdr.data, rx_buf->pkt.hdr.tag,
+				rx_buf->pkt.hdr.size - done_len);
 	if (ret) {
 		FI_WARN(&rxm_prov, FI_LOG_CQ, "Unable to write recv error CQ\n");
 		assert(0);
@@ -166,16 +195,16 @@ static void rxm_finish_recv(struct rxm_rx_buf *rx_buf, size_t done_len)
 
 	if (rx_buf->recv_entry->flags & FI_COMPLETION ||
 	    rx_buf->ep->rxm_info->mode & OFI_BUFFERED_RECV) {
-		rxm_cq_write_recv_comp(rx_buf, rx_buf->recv_entry->context,
-				       rx_buf->recv_entry->comp_flags |
-				       rx_buf->pkt.hdr.flags |
-				       (rx_buf->recv_entry->flags & FI_MULTI_RECV),
-				       rx_buf->pkt.hdr.size,
-				       rx_buf->recv_entry->rxm_iov.
-				       iov[0].iov_base);
-	}
-	ofi_ep_cntr_inc(&rx_buf->ep->util_ep, CNTR_RX);
-
+		rxm_cq_write_recv_comp(
+			rx_buf, rx_buf->recv_entry->context,
+			rx_buf->recv_entry->comp_flags |
+			rx_buf->pkt.hdr.flags |
+			(rx_buf->recv_entry->flags & FI_MULTI_RECV),
+			rx_buf->pkt.hdr.size,
+			rx_buf->recv_entry->rxm_iov.
+			iov[0].iov_base);
+	}
+	ofi_ep_peer_rx_cntr_inc(&rx_buf->ep->util_ep, ofi_op_msg);
 release:
 	rxm_recv_entry_release(recv_entry);
 	rxm_free_rx_buf(rx_buf);
@@ -186,8 +215,9 @@ rxm_cq_write_tx_comp(struct rxm_ep *rxm_ep, uint64_t comp_flags,
 		     void *app_context, uint64_t flags)
 {
 	if (flags & FI_COMPLETION) {
-		rxm_cq_write(rxm_ep->util_ep.tx_cq, app_context,
-			     comp_flags, 0, NULL, 0, 0);
+		(void) ofi_peer_cq_write(rxm_ep->util_ep.tx_cq, app_context,
+					 comp_flags, 0, NULL, 0, 0,
+					 FI_ADDR_NOTAVAIL);
 	}
 }
@@ -201,9 +231,9 @@ static void rxm_finish_rma(struct rxm_ep *rxm_ep, struct rxm_tx_buf *rma_buf,
 			   rma_buf->flags);
 
 	if (comp_flags & FI_WRITE)
-		ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_WR);
+		ofi_ep_peer_tx_cntr_inc(&rxm_ep->util_ep, ofi_op_write);
 	else
-		ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_RD);
+		ofi_ep_peer_tx_cntr_inc(&rxm_ep->util_ep, ofi_op_read_req);
 
 	if (!(rma_buf->flags & FI_INJECT) && !rxm_ep->rdm_mr_local &&
 	    rxm_ep->msg_mr_local) {
@@ -219,7 +249,7 @@ void rxm_finish_eager_send(struct rxm_ep *rxm_ep, struct rxm_tx_buf *tx_buf)
 	rxm_cq_write_tx_comp(rxm_ep, ofi_tx_cq_flags(tx_buf->pkt.hdr.op),
 			     tx_buf->app_context, tx_buf->flags);
 
-	ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_TX);
+	ofi_ep_peer_tx_cntr_inc(&rxm_ep->util_ep, ofi_op_msg);
 }
 
 static bool rxm_complete_sar(struct rxm_ep *rxm_ep,
@@ -259,7 +289,7 @@ static void rxm_handle_sar_comp(struct rxm_ep *rxm_ep,
 		return;
 
 	rxm_cq_write_tx_comp(rxm_ep, comp_flags, app_context, tx_flags);
-	ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_TX);
+	ofi_ep_peer_tx_cntr_inc(&rxm_ep->util_ep, ofi_op_msg);
 }
 
 static void rxm_rndv_rx_finish(struct rxm_rx_buf *rx_buf)
@@ -295,7 +325,7 @@ static void rxm_rndv_tx_finish(struct rxm_ep *rxm_ep,
 		ofi_buf_free(tx_buf->write_rndv.done_buf);
 		tx_buf->write_rndv.done_buf = NULL;
 	}
-	ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_TX);
+	ofi_ep_peer_tx_cntr_inc(&rxm_ep->util_ep, ofi_op_msg);
 	rxm_free_tx_buf(rxm_ep, tx_buf);
 }
@@ -518,8 +548,8 @@ ssize_t rxm_rndv_read(struct rxm_rx_buf *rx_buf)
 		       rx_buf->recv_entry->rxm_iov.count, total_len,
 		       rx_buf);
 	if (ret) {
-		rxm_cq_write_error(rx_buf->ep->util_ep.rx_cq,
-				   rx_buf->ep->util_ep.cntrs[CNTR_RX], rx_buf, (int) ret);
+		rxm_cq_write_rx_error(rx_buf->ep, ofi_op_msg, rx_buf,
+				      (int) ret);
 	}
 	return ret;
 }
@@ -561,9 +591,8 @@ static ssize_t rxm_rndv_handle_wr_data(struct rxm_rx_buf *rx_buf)
 		       tx_buf->rma.count, total_len, tx_buf);
 
 	if (ret)
-		rxm_cq_write_error(rx_buf->ep->util_ep.rx_cq,
-				   rx_buf->ep->util_ep.cntrs[CNTR_RX],
-				   tx_buf, (int) ret);
+		rxm_cq_write_rx_error(rx_buf->ep, ofi_op_msg, tx_buf, (int) ret);
+
 	rxm_free_rx_buf(rx_buf);
 	return ret;
 }
@@ -986,9 +1015,9 @@ ssize_t rxm_rndv_send_wr_data(struct rxm_rx_buf *rx_buf)
 static void rxm_handle_remote_write(struct rxm_ep *rxm_ep,
 				    struct fi_cq_data_entry *comp)
 {
-	rxm_cq_write(rxm_ep->util_ep.rx_cq, NULL, comp->flags, comp->len, NULL,
-		     comp->data, 0);
-	ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_REM_WR);
+	ofi_peer_cq_write(rxm_ep->util_ep.rx_cq, NULL, comp->flags, comp->len,
+			  NULL, comp->data, 0, FI_ADDR_NOTAVAIL);
+	ofi_ep_peer_rx_cntr_inc(&rxm_ep->util_ep, ofi_op_write);
 	if (comp->op_context)
 		rxm_free_rx_buf(comp->op_context);
 }
@@ -1222,10 +1251,7 @@ static ssize_t rxm_handle_atomic_req(struct rxm_ep *rxm_ep,
 	}
 
 	result_len = op == ofi_op_atomic ? 0 : offset;
-	if (op == ofi_op_atomic)
-		ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_REM_WR);
-	else
-		ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_REM_RD);
+	ofi_ep_peer_rx_cntr_inc(&rxm_ep->util_ep, op);
 
 	return rxm_atomic_send_resp(rxm_ep, rx_buf, resp_buf,
 				    result_len, FI_SUCCESS);
@@ -1236,7 +1262,6 @@ static ssize_t rxm_handle_atomic_resp(struct rxm_ep *rxm_ep,
 {
 	struct rxm_tx_buf *tx_buf;
 	struct rxm_atomic_resp_hdr *resp_hdr;
-	struct util_cntr *cntr = NULL;
 	uint64_t len;
 	ssize_t copy_len;
 	ssize_t ret = 0;
@@ -1286,33 +1311,15 @@ static ssize_t rxm_handle_atomic_resp(struct rxm_ep *rxm_ep,
 		rxm_cq_write_tx_comp(rxm_ep, ofi_tx_cq_flags(tx_buf->pkt.hdr.op),
 				     tx_buf->app_context, tx_buf->flags);
 
-	if (tx_buf->pkt.hdr.op == ofi_op_atomic) {
-		ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_WR);
-	} else if (tx_buf->pkt.hdr.op == ofi_op_atomic_compare ||
-		   tx_buf->pkt.hdr.op == ofi_op_atomic_fetch) {
-		ofi_ep_cntr_inc(&rxm_ep->util_ep, CNTR_RD);
-	} else {
-		ret = -FI_EOPNOTSUPP;
-		goto write_err;
-	}
+	ofi_ep_peer_tx_cntr_inc(&rxm_ep->util_ep, tx_buf->pkt.hdr.op);
 free:
 	rxm_free_rx_buf(rx_buf);
 	rxm_free_tx_buf(rxm_ep, tx_buf);
 	return ret;
 write_err:
-	if (tx_buf->pkt.hdr.op == ofi_op_atomic) {
-		cntr = rxm_ep->util_ep.cntrs[CNTR_WR];
-	} else if (tx_buf->pkt.hdr.op == ofi_op_atomic_compare ||
-		   tx_buf->pkt.hdr.op == ofi_op_atomic_fetch) {
-		cntr = rxm_ep->util_ep.cntrs[CNTR_RD];
-	} else {
-		FI_WARN(&rxm_prov, FI_LOG_CQ,
-			"unknown atomic request op!\n");
-		assert(0);
-	}
-	rxm_cq_write_error(rxm_ep->util_ep.tx_cq, cntr,
-			   tx_buf->app_context, (int) ret);
+	rxm_cq_write_tx_error(rxm_ep, tx_buf->pkt.hdr.op, tx_buf->app_context,
+			      (int) ret);
 	goto free;
 }
@@ -1480,23 +1487,38 @@ ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp)
 	}
 }
 
-void rxm_cq_write_error(struct util_cq *cq, struct util_cntr *cntr,
-			void *op_context, int err)
+void rxm_cq_write_tx_error(struct rxm_ep *rxm_ep, uint8_t op, void *op_context,
+			   int err)
 {
 	struct fi_cq_err_entry err_entry = {0};
 	err_entry.op_context = op_context;
 	err_entry.prov_errno = err;
 	err_entry.err = -err;
 
-	if (cntr)
-		rxm_cntr_incerr(cntr);
+	ofi_ep_peer_tx_cntr_incerr(&rxm_ep->util_ep, op);
 
-	if (ofi_cq_write_error(cq, &err_entry)) {
-		FI_WARN(&rxm_prov, FI_LOG_CQ, "Unable to ofi_cq_write_error\n");
+	if (ofi_peer_cq_write_error(rxm_ep->util_ep.tx_cq, &err_entry)) {
+		FI_WARN(&rxm_prov, FI_LOG_CQ,
+			"Unable to ofi_peer_cq_write_error\n");
 		assert(0);
 	}
 }
 
+void rxm_cq_write_rx_error(struct rxm_ep *rxm_ep, uint8_t op, void *op_context,
+			   int err)
+{
+	struct fi_cq_err_entry err_entry = {0};
+	err_entry.op_context = op_context;
+	err_entry.prov_errno = err;
+	err_entry.err = -err;
+
+	ofi_ep_peer_rx_cntr_incerr(&rxm_ep->util_ep, op);
+
+	if (ofi_peer_cq_write_error(rxm_ep->util_ep.rx_cq, &err_entry))
+		FI_WARN(&rxm_prov, FI_LOG_CQ,
+			"Unable to ofi_peer_cq_write_error\n");
+}
+
 void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err)
 {
 	struct fi_cq_err_entry err_entry = {0};
@@ -1505,32 +1527,26 @@ void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err)
 	err_entry.prov_errno = err;
 	err_entry.err = -err;
 	if (rxm_ep->util_ep.tx_cq) {
-		ret = ofi_cq_write_error(rxm_ep->util_ep.tx_cq, &err_entry);
+		ret = ofi_peer_cq_write_error(rxm_ep->util_ep.tx_cq, &err_entry);
 		if (ret) {
 			FI_WARN(&rxm_prov, FI_LOG_CQ,
-				"Unable to ofi_cq_write_error\n");
+				"Unable to ofi_peer_cq_write_error\n");
 			assert(0);
 		}
 	}
 	if (rxm_ep->util_ep.rx_cq) {
-		ret = ofi_cq_write_error(rxm_ep->util_ep.rx_cq, &err_entry);
+		ret = ofi_peer_cq_write_error(rxm_ep->util_ep.rx_cq,
+					      &err_entry);
 		if (ret) {
 			FI_WARN(&rxm_prov, FI_LOG_CQ,
-				"Unable to ofi_cq_write_error\n");
+				"Unable to ofi_peer_cq_write_error\n");
 			assert(0);
 		}
 	}
-	if (rxm_ep->util_ep.cntrs[CNTR_TX])
-		rxm_cntr_incerr(rxm_ep->util_ep.cntrs[CNTR_TX]);
-
-	if (rxm_ep->util_ep.cntrs[CNTR_RX])
-		rxm_cntr_incerr(rxm_ep->util_ep.cntrs[CNTR_RX]);
-
-	if (rxm_ep->util_ep.cntrs[CNTR_WR])
-		rxm_cntr_incerr(rxm_ep->util_ep.cntrs[CNTR_WR]);
-
-	if (rxm_ep->util_ep.cntrs[CNTR_RD])
-		rxm_cntr_incerr(rxm_ep->util_ep.cntrs[CNTR_RD]);
+	ofi_ep_peer_tx_cntr_incerr(&rxm_ep->util_ep, ofi_op_msg);
+	ofi_ep_peer_rx_cntr_incerr(&rxm_ep->util_ep, ofi_op_msg);
+	ofi_ep_peer_tx_cntr_incerr(&rxm_ep->util_ep, ofi_op_write);
+	ofi_ep_peer_tx_cntr_incerr(&rxm_ep->util_ep, ofi_op_read_req);
 }
 
 void rxm_handle_comp_error(struct rxm_ep *rxm_ep)
@@ -1583,7 +1599,7 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep)
 	case RXM_INJECT_TX:
 		rxm_free_tx_buf(rxm_ep, err_entry.op_context);
 		if (cntr)
-			rxm_cntr_incerr(cntr);
+			cntr->peer_cntr->owner_ops->incerr(cntr->peer_cntr);
 		return;
 	case RXM_CREDIT_TX:
 	case RXM_ATOMIC_RESP_SENT: /* BUG: should have consumed tx credit */
@@ -1647,12 +1663,13 @@ void rxm_handle_comp_error(struct rxm_ep *rxm_ep)
 	}
 
 	if (cntr)
-		rxm_cntr_incerr(cntr);
+		cntr->peer_cntr->owner_ops->incerr(cntr->peer_cntr);
 
 	assert(cq);
-	ret = ofi_cq_write_error(cq, &err_entry);
+	ret = ofi_peer_cq_write_error(cq, &err_entry);
 	if (ret) {
-		FI_WARN(&rxm_prov, FI_LOG_CQ, "Unable to ofi_cq_write_error\n");
+		FI_WARN(&rxm_prov, FI_LOG_CQ,
+			"Unable to ofi_peer_cq_write_error\n");
 		assert(0);
 	}
 }
@@ -1665,8 +1682,8 @@ ssize_t rxm_thru_comp(struct rxm_ep *ep, struct fi_cq_data_entry *comp)
 	cq = (comp->flags & (FI_RECV | FI_REMOTE_WRITE | FI_REMOTE_READ)) ?
 	     ep->util_ep.rx_cq : ep->util_ep.tx_cq;
 
-	ret = ofi_cq_write(cq, comp->op_context, comp->flags, comp->len,
-			   comp->buf, comp->data, 0);
+	ret = ofi_peer_cq_write(cq, comp->op_context, comp->flags, comp->len,
+				comp->buf, comp->data, 0, FI_ADDR_NOTAVAIL);
 	if (ret) {
 		FI_WARN(&rxm_prov, FI_LOG_CQ, "Unable to report completion\n");
 		assert(0);
@@ -1692,9 +1709,10 @@ void rxm_thru_comp_error(struct rxm_ep *ep)
 	}
 
 	cq = (err_entry.flags & FI_RECV) ? ep->util_ep.rx_cq : ep->util_ep.tx_cq;
-	ret = ofi_cq_write_error(cq, &err_entry);
+	ret = ofi_peer_cq_write_error(cq, &err_entry);
 	if (ret) {
-		FI_WARN(&rxm_prov, FI_LOG_CQ, "Unable to ofi_cq_write_error\n");
+		FI_WARN(&rxm_prov, FI_LOG_CQ,
+			"Unable to ofi_peer_cq_write_error\n");
 		assert(0);
 	}
 }
@@ -1730,8 +1748,8 @@ ssize_t rxm_cq_owner_write(struct fid_peer_cq *peer_cq, void *context,
 	}
 
 	rxm_cq = container_of(peer_cq, struct rxm_cq, peer_cq);
-	return ofi_cq_write(&rxm_cq->util_cq, req->app_context, req->flags, len,
-			    buf, data, tag);
+	return ofi_peer_cq_write(&rxm_cq->util_cq, req->app_context, req->flags,
+				 len, buf, data, tag, FI_ADDR_NOTAVAIL);
 }
 
 ssize_t rxm_cq_owner_writeerr(struct fid_peer_cq *peer_cq,
@@ -1751,7 +1769,7 @@ ssize_t rxm_cq_owner_writeerr(struct fid_peer_cq *peer_cq,
 	}
 
 	rxm_cq = container_of(peer_cq, struct rxm_cq, peer_cq);
-	return ofi_cq_write_error(&rxm_cq->util_cq, &cqe_err);
+	return ofi_peer_cq_write_error(&rxm_cq->util_cq, &cqe_err);
 }
 
 int rxm_post_recv(struct rxm_rx_buf *rx_buf)
diff --git a/prov/rxm/src/rxm_ep.c b/prov/rxm/src/rxm_ep.c
index ba6a949122e..69a88e2caaf 100644
--- a/prov/rxm/src/rxm_ep.c
+++ b/prov/rxm/src/rxm_ep.c
@@ -746,9 +746,8 @@ rxm_ep_sar_handle_segment_failure(struct rxm_deferred_tx_entry *def_tx_entry,
 {
 	rxm_ep_sar_tx_cleanup(def_tx_entry->rxm_ep, def_tx_entry->rxm_conn,
 			      def_tx_entry->sar_seg.cur_seg_tx_buf);
-	rxm_cq_write_error(def_tx_entry->rxm_ep->util_ep.tx_cq,
-			   def_tx_entry->rxm_ep->util_ep.cntrs[CNTR_TX],
-			   def_tx_entry->sar_seg.app_context, (int) ret);
+	rxm_cq_write_tx_error(def_tx_entry->rxm_ep, ofi_op_msg,
+			      def_tx_entry->sar_seg.app_context, (int) ret);
 }
 
 /* Returns FI_SUCCESS if the SAR deferred TX queue is empty,
@@ -843,10 +842,10 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep,
 		if (ret) {
 			if (ret == -FI_EAGAIN)
 				return;
-			rxm_cq_write_error(def_tx_entry->rxm_ep->util_ep.rx_cq,
-					   def_tx_entry->rxm_ep->util_ep.cntrs[CNTR_RX],
-					   def_tx_entry->rndv_ack.rx_buf->
-					   recv_entry->context, (int) ret);
+			rxm_cq_write_rx_error(
+				def_tx_entry->rxm_ep, ofi_op_msg,
+				def_tx_entry->rndv_ack.rx_buf->
+				recv_entry->context, (int) ret);
 		}
 		if (def_tx_entry->rndv_ack.rx_buf->recv_entry->rndv
 			.tx_buf->pkt.ctrl_hdr
@@ -868,9 +867,10 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep,
 		if (ret) {
 			if (ret == -FI_EAGAIN)
 				return;
-			rxm_cq_write_error(def_tx_entry->rxm_ep->util_ep.tx_cq,
-					   def_tx_entry->rxm_ep->util_ep.cntrs[CNTR_TX],
-					   def_tx_entry->rndv_done.tx_buf, (int) ret);
+			rxm_cq_write_tx_error(def_tx_entry->rxm_ep,
+					      ofi_op_msg,
+					      def_tx_entry->rndv_done.tx_buf,
+					      (int) ret);
 		}
 		RXM_UPDATE_STATE(FI_LOG_EP_DATA,
 				 def_tx_entry->rndv_done.tx_buf,
@@ -888,10 +888,10 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep,
 		if (ret) {
 			if (ret == -FI_EAGAIN)
 				return;
-			rxm_cq_write_error(def_tx_entry->rxm_ep->util_ep.rx_cq,
-					   def_tx_entry->rxm_ep->util_ep.cntrs[CNTR_RX],
-					   def_tx_entry->rndv_read.rx_buf->
-					   recv_entry->context, (int) ret);
+			rxm_cq_write_rx_error(
+				def_tx_entry->rxm_ep, ofi_op_msg,
+				def_tx_entry->rndv_read.rx_buf->
+				recv_entry->context, (int) ret);
 		}
 		break;
 	case RXM_DEFERRED_TX_RNDV_WRITE:
@@ -906,9 +906,10 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep,
 		if (ret) {
 			if (ret == -FI_EAGAIN)
 				return;
-			rxm_cq_write_error(def_tx_entry->rxm_ep->util_ep.rx_cq,
-					   def_tx_entry->rxm_ep->util_ep.cntrs[CNTR_RX],
-					   def_tx_entry->rndv_write.tx_buf, (int) ret);
+			rxm_cq_write_rx_error(
+				def_tx_entry->rxm_ep, ofi_op_msg,
+				def_tx_entry->rndv_write.tx_buf,
+				(int) ret);
 		}
 		break;
 	case RXM_DEFERRED_TX_SAR_SEG:
@@ -939,11 +940,12 @@ void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep,
 					  OFI_PRIORITY);
 		if (ret) {
 			if (ret != -FI_EAGAIN) {
-				rxm_cq_write_error(
-					def_tx_entry->rxm_ep->util_ep.rx_cq,
-					def_tx_entry->rxm_ep->util_ep.cntrs[CNTR_RX],
+				rxm_cq_write_rx_error(
+					def_tx_entry->rxm_ep,
+					ofi_op_msg,
 					def_tx_entry->rndv_read.rx_buf->
-					recv_entry->context, (int) ret);
+					recv_entry->context,
+					(int) ret);
 			}
 			return;
 		}
diff --git a/prov/rxm/src/rxm_msg.c b/prov/rxm/src/rxm_msg.c
index 46cd1cfe285..3b9088a2858 100644
--- a/prov/rxm/src/rxm_msg.c
+++ b/prov/rxm/src/rxm_msg.c
@@ -140,8 +140,8 @@ rxm_post_mrecv(struct rxm_ep *ep, const struct iovec *iov,
 
 	if ((cur_iov.iov_len < ep->min_multi_recv_size) ||
 	    (ret && cur_iov.iov_len != iov->iov_len)) {
-		rxm_cq_write(ep->util_ep.rx_cq, context, FI_MULTI_RECV,
-			     0, NULL, 0, 0);
+		ofi_peer_cq_write(ep->util_ep.rx_cq, context, FI_MULTI_RECV,
+				  0, NULL, 0, 0, FI_ADDR_NOTAVAIL);
 	}
 
 	return ret;
diff --git a/prov/rxm/src/rxm_tagged.c b/prov/rxm/src/rxm_tagged.c
index 78e3d3ff0e9..8f18f34b3eb 100644
--- a/prov/rxm/src/rxm_tagged.c
+++ b/prov/rxm/src/rxm_tagged.c
@@ -50,8 +50,9 @@ rxm_discard_recv(struct rxm_ep *rxm_ep, struct rxm_rx_buf *rx_buf,
 	RXM_DBG_ADDR_TAG(FI_LOG_EP_DATA, "Discarding message",
 			 rx_buf->unexp_msg.addr, rx_buf->unexp_msg.tag);
 
-	rxm_cq_write(rxm_ep->util_ep.rx_cq, context, FI_TAGGED | FI_RECV,
-		     0, NULL, rx_buf->pkt.hdr.data, rx_buf->pkt.hdr.tag);
+	ofi_peer_cq_write(rxm_ep->util_ep.rx_cq, context, FI_TAGGED | FI_RECV,
+			  0, NULL, rx_buf->pkt.hdr.data,
+			  rx_buf->pkt.hdr.tag, FI_ADDR_NOTAVAIL);
 	rxm_free_rx_buf(rx_buf);
 }
@@ -73,8 +74,8 @@ rxm_peek_recv(struct rxm_ep *rxm_ep, fi_addr_t addr, uint64_t tag,
 	rx_buf = rxm_get_unexp_msg(recv_queue, addr, tag, ignore);
 	if (!rx_buf) {
 		FI_DBG(&rxm_prov, FI_LOG_EP_DATA, "Message not found\n");
-		ret = ofi_cq_write_error_peek(rxm_ep->util_ep.rx_cq, tag,
-					      context);
+		ret = ofi_peer_cq_write_error_peek(
+				rxm_ep->util_ep.rx_cq, tag, context);
 		if (ret)
 			FI_WARN(&rxm_prov, FI_LOG_CQ, "Error writing to CQ\n");
 		return;
@@ -94,9 +95,9 @@ rxm_peek_recv(struct rxm_ep *rxm_ep, fi_addr_t addr, uint64_t tag,
 		dlist_remove(&rx_buf->unexp_msg.entry);
 	}
 
-	rxm_cq_write(rxm_ep->util_ep.rx_cq, context, FI_TAGGED | FI_RECV,
-		     rx_buf->pkt.hdr.size, NULL,
-		     rx_buf->pkt.hdr.data, rx_buf->pkt.hdr.tag);
+	ofi_peer_cq_write(rxm_ep->util_ep.rx_cq, context, FI_TAGGED | FI_RECV,
+			  rx_buf->pkt.hdr.size, NULL, rx_buf->pkt.hdr.data,
+			  rx_buf->pkt.hdr.tag, FI_ADDR_NOTAVAIL);
 }
 
 static ssize_t
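
Reviewer note (not part of the patch): the commit message says the default,
non-peer case falls through to the regular CQ write path. A minimal sketch of
that dispatch, assuming the util_cq carries a peer_cq pointer (as the rxm_cq
owner-write code above suggests) and using the owner-ops layout from
fi_peer(3); the real ofi_peer_cq_write() helper in the util code also handles
locking, overflow, and wait-object signaling:

/* Sketch only: roughly what ofi_peer_cq_write() resolves to. */
static inline ssize_t
sketch_peer_cq_write(struct util_cq *cq, void *context, uint64_t flags,
		     size_t len, void *buf, uint64_t data, uint64_t tag,
		     fi_addr_t src)
{
	/* Peer case: the CQ was imported from an owner provider, so the
	 * completion is handed back through the owner's ops. */
	if (cq->peer_cq)
		return cq->peer_cq->owner_ops->write(cq->peer_cq, context,
						     flags, len, buf, data,
						     tag, src);

	/* Default case: rxm owns the CQ. FI_ADDR_NOTAVAIL doubles as the
	 * "no source address" sentinel, which is why the old
	 * rxm_cq_write()/rxm_cq_write_src() pair collapses into one call. */
	if (src == FI_ADDR_NOTAVAIL)
		return ofi_cq_write(cq, context, flags, len, buf, data, tag);
	return ofi_cq_write_src(cq, context, flags, len, buf, data, tag, src);
}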
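The counter changes follow the same pattern: the op code passed to
ofi_ep_peer_tx/rx_cntr_inc() selects the counter that the old explicit CNTR_*
indexing picked, and the increment goes through the peer counter's owner ops,
matching the cntr->peer_cntr->owner_ops->incerr() calls in
rxm_handle_comp_error(). A hypothetical sketch of the tx-side mapping (the
CNTR_* indices and ofi_op_* codes appear in the patch; the inc() owner op and
the exact helper body are assumptions):

static inline void
sketch_peer_tx_cntr_inc(struct util_ep *ep, uint8_t op)
{
	struct util_cntr *cntr;

	switch (op) {
	case ofi_op_msg:
	case ofi_op_tagged:
		cntr = ep->cntrs[CNTR_TX];	/* old ofi_ep_cntr_inc(.., CNTR_TX) */
		break;
	case ofi_op_write:
	case ofi_op_atomic:
		cntr = ep->cntrs[CNTR_WR];
		break;
	case ofi_op_read_req:
	case ofi_op_atomic_fetch:
	case ofi_op_atomic_compare:
		cntr = ep->cntrs[CNTR_RD];
		break;
	default:
		return;
	}

	if (cntr)
		cntr->peer_cntr->owner_ops->inc(cntr->peer_cntr);
}

This mapping is also why rxm_handle_atomic_resp() no longer needs its op-type
if/else chain or the unknown-op error branch: the helper centralizes the
selection, so passing tx_buf->pkt.hdr.op is enough.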
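The new rxm_cq_write_tx_error()/rxm_cq_write_rx_error() helpers keep the sign
convention of the old rxm_cq_write_error(): callers pass a negative fabric
errno (the deferred-queue paths filter out -FI_EAGAIN first), and the helper
reports the positive code. Spelled out, with a hypothetical name:

/* err arrives negative, e.g. (int) ret == -FI_EIO from a failed
 * rendezvous read; the CQ error entry reports the positive code. */
static void sketch_fill_err_entry(struct fi_cq_err_entry *e,
				  void *op_context, int err)
{
	assert(err < 0);
	e->op_context = op_context;
	e->prov_errno = err;	/* provider-specific view: raw -FI_EIO */
	e->err = -err;		/* portable view: FI_EIO */
}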
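For background on the peer case itself, this is roughly how an owner provider
would put rxm into the peer role, sketched from fi_peer(3); the example_/my_
names are hypothetical. The owner opens rxm's CQ with FI_PEER and passes its
fid_peer_cq, whose owner_ops are what ofi_peer_cq_write() invokes above:

static int example_import_cq(struct fid_domain *rxm_domain,
			     struct fid_peer_cq *my_peer_cq,
			     struct fi_ops_cq_owner *my_owner_ops,
			     struct fid_cq **rxm_cq)
{
	struct fi_cq_attr attr = {
		.flags = FI_PEER,	/* import my_peer_cq, don't allocate */
	};
	struct fi_peer_cq_context peer_ctx = {
		.size = sizeof(peer_ctx),
		.cq = my_peer_cq,
	};

	my_peer_cq->fid.fclass = FI_CLASS_PEER_CQ;
	my_peer_cq->owner_ops = my_owner_ops;	/* owner's write/writeerr */

	return fi_cq_open(rxm_domain, &attr, rxm_cq, &peer_ctx);
}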