Skip to content
This repository has been archived by the owner on Sep 5, 2023. It is now read-only.

rpma: ack at least 100000 cq events at a time #1725

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion src/cq.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
#include "cmocka_alloc.h"
#endif

#define RPMA_MAX_UNACK_CQE (int)100000

struct rpma_cq {
struct ibv_comp_channel *channel; /* completion channel */
bool shared_comp_channel; /* completion channel is shared */
struct ibv_cq *cq; /* completion queue */
unsigned unack_cqe; /* number of unack completion events */
};

/* internal librpma API */
Expand Down Expand Up @@ -94,6 +97,8 @@ rpma_cq_new(struct ibv_context *ibv_ctx, int cqe,
(*cq_ptr)->channel = channel;
(*cq_ptr)->shared_comp_channel = (shared_channel != NULL);
(*cq_ptr)->cq = cq;
(*cq_ptr)->unack_cqe = 0;


return 0;

Expand Down Expand Up @@ -124,6 +129,10 @@ rpma_cq_delete(struct rpma_cq **cq_ptr)
if (cq == NULL)
return ret;

if (cq->unack_cqe)
(void) ibv_ack_cq_events(cq->cq, cq->unack_cqe);
cq->unack_cqe = 0;

errno = ibv_destroy_cq(cq->cq);
if (errno) {
RPMA_LOG_ERROR_WITH_ERRNO(errno, "ibv_destroy_cq()");
Expand Down Expand Up @@ -175,19 +184,29 @@ rpma_cq_wait(struct rpma_cq *cq)
if (cq->shared_comp_channel)
return RPMA_E_SHARED_CHANNEL;

/*
* ACK the collected CQ event.
*/
if (cq->unack_cqe >= RPMA_MAX_UNACK_CQE) {
ibv_ack_cq_events(cq->cq, cq->unack_cqe);
cq->unack_cqe = 0;
}

/* wait for the completion event */
struct ibv_cq *ev_cq; /* unused */
void *ev_ctx; /* unused */
if (ibv_get_cq_event(cq->channel, &ev_cq, &ev_ctx))
return RPMA_E_NO_COMPLETION;

++cq->unack_cqe;

/*
* ACK the collected CQ event.
*
* XXX for performance reasons, it may be beneficial to ACK more than
* one CQ event at the same time.
*/
ibv_ack_cq_events(cq->cq, 1 /* # of CQ events */);
// ibv_ack_cq_events(cq->cq, 1 /* # of CQ events */);

/* request for the next event on the CQ channel */
errno = ibv_req_notify_cq(cq->cq, 0 /* all completions */);
Expand Down
5 changes: 4 additions & 1 deletion tests/unit/common/mocks-ibverbs.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ struct ibv_cq Ibv_cq_unknown;
struct ibv_qp Ibv_qp;
struct ibv_mr Ibv_mr;

static unsigned unack_cqe = 0;
/*
* ibv_query_device -- ibv_query_device() mock
*/
Expand Down Expand Up @@ -205,6 +206,7 @@ ibv_get_cq_event(struct ibv_comp_channel *channel, struct ibv_cq **cq,
if (!errno) {
*cq = mock_type(struct ibv_cq *);
*cq_context = NULL;
unack_cqe++;
return 0;
}

Expand All @@ -218,7 +220,8 @@ void
ibv_ack_cq_events(struct ibv_cq *cq, unsigned nevents)
{
check_expected_ptr(cq);
assert_int_equal(nevents, 1);
// assert_int_equal(nevents, 1);
check_expected(nevents);
}

/*
Expand Down
3 changes: 3 additions & 0 deletions tests/unit/conn/conn-wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ wait__req_notify_cq_ERRNO(void **cstate_ptr)
expect_value(rpma_cq_get_ibv_cq, cq, MOCK_RPMA_CQ);
will_return(rpma_cq_get_ibv_cq, MOCK_IBV_CQ);
expect_value(ibv_ack_cq_events, cq, MOCK_IBV_CQ);
expect_value(ibv_ack_cq_events, nevents, 1);
expect_value(ibv_req_notify_cq_mock, cq, MOCK_IBV_CQ);
will_return(ibv_req_notify_cq_mock, MOCK_ERRNO);

Expand Down Expand Up @@ -156,6 +157,7 @@ wait__success_is_rcq_NULL(void **cstate_ptr)
expect_value(rpma_cq_get_ibv_cq, cq, MOCK_RPMA_RCQ);
will_return(rpma_cq_get_ibv_cq, MOCK_IBV_RCQ);
expect_value(ibv_ack_cq_events, cq, MOCK_IBV_RCQ);
expect_value(ibv_ack_cq_events, nevents, 1);
expect_value(ibv_req_notify_cq_mock, cq, MOCK_IBV_RCQ);
will_return(ibv_req_notify_cq_mock, MOCK_OK);

Expand Down Expand Up @@ -184,6 +186,7 @@ wait__success(void **cstate_ptr)
expect_value(rpma_cq_get_ibv_cq, cq, MOCK_RPMA_RCQ);
will_return(rpma_cq_get_ibv_cq, MOCK_IBV_RCQ);
expect_value(ibv_ack_cq_events, cq, MOCK_IBV_RCQ);
expect_value(ibv_ack_cq_events, nevents, 1);
expect_value(ibv_req_notify_cq_mock, cq, MOCK_IBV_RCQ);
will_return(ibv_req_notify_cq_mock, MOCK_OK);

Expand Down
5 changes: 5 additions & 0 deletions tests/unit/cq/cq-common.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ setup__cq_new(void **cq_ptr)
assert_int_equal(ret, MOCK_OK);

cstate->cq = cq;
cstate->unack_cqe = 0;
*cq_ptr = cstate;

return 0;
Expand All @@ -64,6 +65,10 @@ teardown__cq_delete(void **cq_ptr)
struct rpma_cq *cq = cstate->cq;

/* configure mocks */
if (cstate->unack_cqe) {
expect_value(ibv_ack_cq_events, cq, MOCK_IBV_CQ);
expect_value(ibv_ack_cq_events, nevents, cstate->unack_cqe);
}
will_return(ibv_destroy_cq, MOCK_OK);
if (!cstate->shared_channel)
will_return(ibv_destroy_comp_channel, MOCK_OK);
Expand Down
1 change: 1 addition & 0 deletions tests/unit/cq/cq-common.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
struct cq_test_state {
struct ibv_comp_channel *shared_channel;
struct rpma_cq *cq;
unsigned unack_cqe;
};

extern struct cq_test_state CQ_without_channel;
Expand Down
37 changes: 35 additions & 2 deletions tests/unit/cq/cq-wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ wait__req_notify_cq_ERRNO(void **cq_ptr)
expect_value(ibv_get_cq_event, channel, MOCK_COMP_CHANNEL);
will_return(ibv_get_cq_event, MOCK_OK);
will_return(ibv_get_cq_event, MOCK_IBV_CQ);
expect_value(ibv_ack_cq_events, cq, MOCK_IBV_CQ);
// expect_value(ibv_ack_cq_events, cq, MOCK_IBV_CQ);
++cstate->unack_cqe;
expect_value(ibv_req_notify_cq_mock, cq, MOCK_IBV_CQ);
will_return(ibv_req_notify_cq_mock, MOCK_ERRNO);

Expand All @@ -100,7 +101,8 @@ wait__success(void **cq_ptr)
expect_value(ibv_get_cq_event, channel, MOCK_COMP_CHANNEL);
will_return(ibv_get_cq_event, MOCK_OK);
will_return(ibv_get_cq_event, MOCK_IBV_CQ);
expect_value(ibv_ack_cq_events, cq, MOCK_IBV_CQ);
// expect_value(ibv_ack_cq_events, cq, MOCK_IBV_CQ);
++cstate->unack_cqe;
expect_value(ibv_req_notify_cq_mock, cq, MOCK_IBV_CQ);
will_return(ibv_req_notify_cq_mock, MOCK_OK);

Expand All @@ -111,6 +113,34 @@ wait__success(void **cq_ptr)
assert_int_equal(ret, MOCK_OK);
}

static void
wait__success_150000(void **cq_ptr)
{
struct cq_test_state *cstate = *cq_ptr;
struct rpma_cq *cq = cstate->cq;

for (int i = 0; i < 150000; i++) {
/* configure mocks */
expect_value(ibv_get_cq_event, channel, MOCK_COMP_CHANNEL);
will_return(ibv_get_cq_event, MOCK_OK);
will_return(ibv_get_cq_event, MOCK_IBV_CQ);
// expect_value(ibv_ack_cq_events, cq, MOCK_IBV_CQ);
++cstate->unack_cqe;
if (cstate->unack_cqe == 100000) {
expect_value(ibv_ack_cq_events, cq, MOCK_IBV_CQ);
expect_value(ibv_ack_cq_events, nevents, 100000);
cstate->unack_cqe = 0;
}
expect_value(ibv_req_notify_cq_mock, cq, MOCK_IBV_CQ);
will_return(ibv_req_notify_cq_mock, MOCK_OK);

/* run test */
int ret = rpma_cq_wait(cq);
assert_int_equal(ret, MOCK_OK);
}
/* verify the result */
}

static const struct CMUnitTest tests_wait[] = {
/* rpma_cq_wait() unit tests */
cmocka_unit_test(wait__cq_NULL),
Expand All @@ -126,6 +156,9 @@ static const struct CMUnitTest tests_wait[] = {
cmocka_unit_test_setup_teardown(
wait__success,
setup__cq_new, teardown__cq_delete),
cmocka_unit_test_setup_teardown(
wait__success_150000,
setup__cq_new, teardown__cq_delete),
cmocka_unit_test(NULL)
};

Expand Down