Skip to content

Commit

Permalink
prov/rxm: add rxm support for using a peer CQs and counters
Browse files Browse the repository at this point in the history
Support using the peer APIs by default using the util peer helper
functions. Instead of going through the rxm-specific functions to write
to CQs and counters, use the ofi_peer_cq/cntr APIs which use the
owner ops. In the default case where rxm is not being used as a peer
these will go to the regular ofi_cq_write functions.

Signed-off-by: Alexia Ingerson <[email protected]>
  • Loading branch information
aingerson committed Dec 6, 2024
1 parent 975428c commit de410e4
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 194 deletions.
77 changes: 4 additions & 73 deletions prov/rxm/src/rxm.h
Original file line number Diff line number Diff line change
Expand Up @@ -759,9 +759,10 @@ ssize_t rxm_handle_rx_buf(struct rxm_rx_buf *rx_buf);

int rxm_endpoint(struct fid_domain *domain, struct fi_info *info,
struct fid_ep **ep, void *context);

void rxm_cq_write_error(struct util_cq *cq, struct util_cntr *cntr,
void *op_context, int err);
void rxm_cq_write_tx_error(struct rxm_ep *rxm_ep, uint8_t op, void *op_context,
int err);
void rxm_cq_write_rx_error(struct rxm_ep *rxm_ep, uint8_t op, void *op_context,
int err);
void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err);
void rxm_handle_comp_error(struct rxm_ep *rxm_ep);
ssize_t rxm_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp);
Expand Down Expand Up @@ -880,50 +881,6 @@ int rxm_msg_mr_reg_internal(struct rxm_domain *rxm_domain, const void *buf,
size_t len, uint64_t acs, uint64_t flags,
struct fid_mr **mr);

static inline void rxm_cntr_incerr(struct util_cntr *cntr)
{
if (cntr)
cntr->cntr_fid.ops->adderr(&cntr->cntr_fid, 1);
}

static inline void
rxm_cq_write(struct util_cq *cq, void *context, uint64_t flags, size_t len,
void *buf, uint64_t data, uint64_t tag)
{
int ret;

FI_DBG(&rxm_prov, FI_LOG_CQ, "Reporting %s completion\n",
fi_tostr((void *) &flags, FI_TYPE_CQ_EVENT_FLAGS));

ret = ofi_cq_write(cq, context, flags, len, buf, data, tag);
if (ret) {
FI_WARN(&rxm_prov, FI_LOG_CQ,
"Unable to report completion\n");
assert(0);
}
if (cq->wait)
cq->wait->signal(cq->wait);
}

static inline void
rxm_cq_write_src(struct util_cq *cq, void *context, uint64_t flags, size_t len,
void *buf, uint64_t data, uint64_t tag, fi_addr_t addr)
{
int ret;

FI_DBG(&rxm_prov, FI_LOG_CQ, "Reporting %s completion\n",
fi_tostr((void *) &flags, FI_TYPE_CQ_EVENT_FLAGS));

ret = ofi_cq_write_src(cq, context, flags, len, buf, data, tag, addr);
if (ret) {
FI_WARN(&rxm_prov, FI_LOG_CQ,
"Unable to report completion\n");
assert(0);
}
if (cq->wait)
cq->wait->signal(cq->wait);
}

ssize_t rxm_get_conn(struct rxm_ep *rxm_ep, fi_addr_t addr,
struct rxm_conn **rxm_conn);

Expand Down Expand Up @@ -998,32 +955,6 @@ rxm_recv_entry_release(struct rxm_recv_entry *entry)
ofi_buf_free(entry);
}

static inline void
rxm_cq_write_recv_comp(struct rxm_rx_buf *rx_buf, void *context, uint64_t flags,
size_t len, char *buf)
{
if (rx_buf->ep->util_coll_peer_xfer_ops &&
rx_buf->pkt.hdr.tag & RXM_PEER_XFER_TAG_FLAG) {
struct fi_cq_tagged_entry cqe = {
.tag = rx_buf->pkt.hdr.tag,
.op_context = rx_buf->recv_entry->context,
};
rx_buf->ep->util_coll_peer_xfer_ops->
complete(rx_buf->ep->util_coll_ep, &cqe, 0);
return;
}

if (rx_buf->ep->rxm_info->caps & FI_SOURCE)
rxm_cq_write_src(rx_buf->ep->util_ep.rx_cq, context,
flags, len, buf, rx_buf->pkt.hdr.data,
rx_buf->pkt.hdr.tag,
rx_buf->conn->peer->fi_addr);
else
rxm_cq_write(rx_buf->ep->util_ep.rx_cq, context,
flags, len, buf, rx_buf->pkt.hdr.data,
rx_buf->pkt.hdr.tag);
}

struct rxm_mr *rxm_mr_get_map_entry(struct rxm_domain *domain, uint64_t key);

struct rxm_recv_entry *
Expand Down
Loading

0 comments on commit de410e4

Please sign in to comment.