int register_qp(QueuePair *qp, RDMAConnectedSocketImpl* csi);
void make_pending_worker(RDMAWorker* w) {
Mutex::Locker l(w_lock);
- if (pending_workers.back() != w) {
- pending_workers.push_back(w);
- ++num_pending_workers;
- }
+ auto it = std::find(pending_workers.begin(), pending_workers.end(), w);
+ if (it != pending_workers.end())
+ return;
+ pending_workers.push_back(w);
+ ++num_pending_workers;
}
RDMAStack* get_stack() { return stack; }
RDMAConnectedSocketImpl* get_conn_lockless(uint32_t qp);
l_msgr_rdma_tx_bytes,
l_msgr_rdma_rx_chunks,
l_msgr_rdma_rx_bytes,
+ l_msgr_rdma_pending_sent_conns,
l_msgr_rdma_last,
};
EventCallbackRef con_handler;
int tcp_fd = -1;
bool active;// qp is active ?
+ bool pending;
void notify();
ssize_t read_buffers(char* buf, size_t len);
void cleanup();
void set_accept_fd(int sd);
int try_connect(const entity_addr_t&, const SocketOptions &opt);
-
+ bool is_pending() {return pending;}
+ void set_pending(bool val) {pending = val;}
class C_handle_connection : public EventCallback {
RDMAConnectedSocketImpl *csi;
bool active;
class RDMAStack : public NetworkStack {
vector<std::thread> threads;
RDMADispatcher *dispatcher;
- PerfCounters *perf_counter;
std::atomic<bool> fork_finished = {false};