-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathLoopbackFlow.cpp
More file actions
121 lines (101 loc) · 3.44 KB
/
LoopbackFlow.cpp
File metadata and controls
121 lines (101 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
#include "Options.hpp"
#include "LoopbackFlow.hpp"
#include "Loopback.hpp"
#include <iostream>
#include <vector>
#include <numa.h>
void LoopbackFlow::thread_body(int numa_node) {
#ifdef DEBUG_LOG
std::cout << "Loopback flow thread starting on NUMA node " << numa_node << std::endl;
#endif
// set NUMA node to that requested
numa_run_on_node(numa_node);
// Initialize RDMA NIC
Endpoint e;
// Initialize loopback connection
Loopback lp(e);
// Allocate memory region. We copy to and from the same buffer,
// since we care only about data movement, not the result.
ibv_mr * mr = e.allocate(Options::options->length);
// reuse same SGE for all reads
ibv_sge send_sge;
send_sge.addr = reinterpret_cast<uintptr_t>(mr->addr);
send_sge.length = Options::options->length;
send_sge.lkey = mr->lkey;
const int concurrent_reads = Options::options->concurrent_reads;
ibv_send_wr send_wr[concurrent_reads];
for (int i = 0; i < concurrent_reads; ++i) {
send_wr[i].wr_id = i;
send_wr[i].next = nullptr;
send_wr[i].sg_list = &send_sge;
send_wr[i].num_sge = 1;
send_wr[i].opcode = IBV_WR_RDMA_READ;
send_wr[i].send_flags = IBV_SEND_SIGNALED;
send_wr[i].wr.rdma.remote_addr = reinterpret_cast<uintptr_t>(mr->addr);
send_wr[i].wr.rdma.rkey = mr->rkey;
}
// post all outstanding requests
int outstanding_reads = 0;
for (int i = 0; i < concurrent_reads; ++i) {
lp.post_send(&send_wr[i]);
++outstanding_reads;
#ifdef DEBUG_LOG
std::cout << "Loopback flow posted read " << i << "." << std::endl;
#endif
}
// keep outstanding requests going
bool done = false;
while (!done) {
// see if any reads have completed
ibv_wc completions[concurrent_reads];
int completed_count = lp.poll_cq(concurrent_reads, &completions[0]);
// decrement outstanding count, whether reads completed successfully or not
outstanding_reads -= completed_count;
// check done flag here to avoid reposting unnecessary reads
done = done_flag.load();
// process completions
for (int i = 0; i < completed_count; ++i) {
if (completions[i].status != IBV_WC_SUCCESS) {
std::cerr << "Got eror completion for " << (void*) completions[i].wr_id
<< " with status " << ibv_wc_status_str(completions[i].status)
<< std::endl;
exit(1);
} else {
#ifdef DEBUG_LOG
std::cout << "Loopback flow read " << completions[i].wr_id << " completed; reposting..." << std::endl;
#endif
// repost read that completed
lp.post_send(&send_wr[completions[i].wr_id]);
++outstanding_reads;
}
}
}
// stop was requested; wait for current requests to complete
while (outstanding_reads > 0) {
ibv_wc completions[concurrent_reads];
int completed_count = lp.poll_cq(concurrent_reads, &completions[0]);
outstanding_reads -= completed_count;
#ifdef DEBUG_LOG
for (int i = 0; i < completed_count; ++i) {
std::cout << "Loopback flow read " << completions[i].wr_id << " completed." << std::endl;
}
#endif
}
#ifdef DEBUG_LOG
std::cout << "Loopback flow thread exiting." << std::endl;
#endif
}
LoopbackFlow::~LoopbackFlow() {
if (thread.joinable()) {
stop();
}
}
void LoopbackFlow::stop() {
#ifdef DEBUG_LOG
std::cout << "Loopback flow stop requested." << std::endl;
#endif
done_flag.store(true);
thread.join();
}