1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSky <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
19 #include "include/compat.h"
20 #include "common/Cond.h"
21 #include "common/errno.h"
22 #include "PosixStack.h"
24 #include "rdma/RDMAStack.h"
27 #include "dpdk/DPDKStack.h"
30 #include "common/dout.h"
31 #include "include/ceph_assert.h"
33 #define dout_subsys ceph_subsys_ms
35 #define dout_prefix *_dout << "stack "
// NetworkStack::add_thread(Worker* w) -- declared to return a
// std::function<void ()>.
//
// Statements visible in this listing, in order:
//   * define EventMaxWaitUs = 30000000 (presumably microseconds, i.e. 30 s --
//     TODO confirm against EventCenter::process_events);
//   * call w->center.set_owner();
//   * log " starting" through ldout(cct, 10);
//   * log " calling event process" through ldout(cct, 30);
//   * call w->center.process_events(EventMaxWaitUs, &dur), keeping the result
//     in r;
//   * log " process events failed: " followed by cpp_strerror(errno) through
//     ldout(cct, 20);
//   * pass dur to w->perf_logger->tinc() for l_msgr_running_total_time.
//
// NOTE(review): the embedded original line numbers jump (37 -> 41, 43 -> 47,
// 47 -> 50, 50 -> 52, 53 -> 56) and nothing after 56 is present. The opening
// brace, the construction of the returned callable, the declaration of `dur`,
// whatever condition guards the failure log, any loop around process_events
// and the closing of the function are therefore not shown here. Restore them
// from the original file before building or modifying this block.
37 std::function
<void ()> NetworkStack::add_thread(Worker
* w
)
41 const unsigned EventMaxWaitUs
= 30000000;
42 w
->center
.set_owner();
43 ldout(cct
, 10) << __func__
<< " starting" << dendl
;
47 ldout(cct
, 30) << __func__
<< " calling event process" << dendl
;
// `dur` is handed to process_events by address and then fed to the perf
// counter below; its declaration is on a line missing from this listing.
50 int r
= w
->center
.process_events(EventMaxWaitUs
, &dur
);
// NOTE(review): the message is built from errno, not from r. The test that
// decides when this log line runs is not visible (original line 51 missing).
52 ldout(cct
, 20) << __func__
<< " process events failed: "
53 << cpp_strerror(errno
) << dendl
;
56 w
->perf_logger
->tinc(l_msgr_running_total_time
, dur
);
// NetworkStack::create(CephContext *c, ...) -- declared to return a
// std::shared_ptr<NetworkStack>.
//
// What the visible statements do:
//   * start from a null `stack` and reset it to a PosixNetworkStack, RDMAStack
//     or DPDKStack built from c;
//   * if `stack` is still null, log through lderr that the
//     ms_async_transport_type value `t` is not supported;
//   * read the worker count from c->_conf->ms_async_op_threads, assert it is
//     > 0, and when it is >= EventCenter::MAX_EVENTCENTER log a notice and
//     set it to EventCenter::MAX_EVENTCENTER;
//   * for each worker_id in [0, num_workers): create a Worker with
//     stack->create_worker(c, worker_id), call
//     w->center.init(InitEventNumber, worker_id, t), and append w to
//     stack->workers.
//
// Raises: std::system_error(-ret, std::generic_category()) is thrown from the
// worker loop; the condition that triggers it is not visible.
//
// NOTE(review): the embedded original line numbers show gaps (64-65, 67-68,
// 70-71, 73-75, 77-78, 82-85, 92, 94, 99, 102-106). The declaration of `t`
// (the trailing comma after `*c` shows a further parameter follows), the
// conditions that choose between the three stack types, anything after the
// "not supported" log inside that branch, the test on `ret`, and the return
// statement are all absent from this listing. Restore them from the original
// file before relying on this block.
63 std::shared_ptr
<NetworkStack
> NetworkStack::create(CephContext
*c
,
66 std::shared_ptr
<NetworkStack
> stack
= nullptr;
// The three reset() calls below are consecutive in this listing; the
// selection logic between them is on missing lines.
69 stack
.reset(new PosixNetworkStack(c
));
72 stack
.reset(new RDMAStack(c
));
76 stack
.reset(new DPDKStack(c
));
// No stack was selected: report the unsupported transport type.
79 if (stack
== nullptr) {
80 lderr(c
) << __func__
<< " ms_async_transport_type " << t
<<
81 " is not supported! " << dendl
;
// Worker count comes from configuration and is capped at MAX_EVENTCENTER.
86 unsigned num_workers
= c
->_conf
->ms_async_op_threads
;
87 ceph_assert(num_workers
> 0);
// The comparison is >=, so the notice is also logged when the configured
// value already equals the limit.
88 if (num_workers
>= EventCenter::MAX_EVENTCENTER
) {
89 ldout(c
, 0) << __func__
<< " max thread limit is "
90 << EventCenter::MAX_EVENTCENTER
<< ", switching to this now. "
91 << "Higher thread values are unnecessary and currently unsupported."
93 num_workers
= EventCenter::MAX_EVENTCENTER
;
// InitEventNumber is the first argument to each worker's center.init().
95 const int InitEventNumber
= 5000;
96 for (unsigned worker_id
= 0; worker_id
< num_workers
; ++worker_id
) {
97 Worker
*w
= stack
->create_worker(c
, worker_id
);
98 int ret
= w
->center
.init(InitEventNumber
, worker_id
, t
);
// ret is negated to form the error code; presumably init() returns a
// negative errno on failure -- TODO confirm against EventCenter::init.
100 throw std::system_error(-ret
, std::generic_category());
101 stack
->workers
.push_back(w
);
// NetworkStack constructor taking a CephContext pointer `c`.
// NOTE(review): only the signature (original line 107) is present in this
// listing; the member-initializer list and body (original lines 108-110) are
// missing, so what the constructor does with `c` cannot be stated from here.
107 NetworkStack::NetworkStack(CephContext
*c
)
// NetworkStack::start() -- visible behavior:
//   * construct a std::unique_lock `lk` on pool_spin;
//   * first pass over `workers`: test worker->is_init() and call
//     spawn_worker(add_thread(worker));
//   * second pass over `workers`: call worker->wait_for_init() on each.
//
// NOTE(review): the embedded original line numbers skip 112, 114-118, 121,
// 123-126 and 129 onward. The statement governed by `if (worker->is_init())`
// is on missing line 121, so this listing does not show whether
// spawn_worker() runs for already-initialised workers or is skipped for them.
// Whether `lk` is released before the second loop is likewise not visible.
// Restore the missing lines from the original file before editing.
111 void NetworkStack::start()
113 std::unique_lock
<decltype(pool_spin
)> lk(pool_spin
);
119 for (Worker
* worker
: workers
) {
120 if (worker
->is_init())
122 spawn_worker(add_thread(worker
));
127 for (Worker
* worker
: workers
) {
128 worker
->wait_for_init();
// NetworkStack::get_worker() -- declared to return a Worker*.
//
// Visible behavior: scan `workers` for the entry with the smallest
// references.load() value, assert that one was found, and increment that
// worker's `references` counter.
//
// NOTE(review): the embedded original line numbers skip 133, 135, 139-140,
// 149-152 and 155 onward. The return statement and any locking around the
// scan are not present in this listing; presumably the function returns
// current_best -- confirm against the original file.
132 Worker
* NetworkStack::get_worker()
134 ldout(cct
, 30) << __func__
<< dendl
;
136 // start with some reasonably large number
// min_load is unsigned but seeded from numeric_limits<int>::max().
137 unsigned min_load
= std::numeric_limits
<int>::max();
138 Worker
* current_best
= nullptr;
141 // find worker with least references
142 // tempting case is returning on references == 0, but in reality
143 // this will happen so rarely that there's no need for special case.
144 for (Worker
* worker
: workers
) {
145 unsigned worker_load
= worker
->references
.load();
// Strict '<': on equal load the worker seen earlier in `workers` is kept.
146 if (worker_load
< min_load
) {
147 current_best
= worker
;
148 min_load
= worker_load
;
// current_best is still null here if `workers` is empty or if no load was
// below the initial min_load; the assert catches that.
153 ceph_assert(current_best
);
154 ++current_best
->references
;
// NetworkStack::stop() -- visible behavior: construct a std::lock_guard on
// pool_spin, then call worker->center.wakeup() for every entry in `workers`.
//
// NOTE(review): the embedded original line numbers skip 159, 161, 163 and
// 165 onward, so other statements in and after the loop body are missing
// from this listing. How workers are told to finish, and whether their
// threads are joined, cannot be determined from what is shown here. Restore
// the missing lines from the original file before editing.
158 void NetworkStack::stop()
160 std::lock_guard
lk(pool_spin
);
162 for (Worker
* worker
: workers
) {
164 worker
->center
.wakeup();
// C_drain -- an EventCallback subclass built around a counter (drain_count)
// guarded by drain_lock, with drain_cond used to signal when the counter
// reads zero.
//
// Visible members:
//   * explicit C_drain(size_t c): constructor; its initializer/body is on a
//     missing line, so how `c` is used is not shown (presumably it seeds
//     drain_count -- TODO confirm).
//   * do_request(uint64_t id) override: takes drain_lock and calls
//     drain_cond.notify_all() when drain_count == 0. `id` is unused in the
//     visible lines.
//   * a waiting routine (its header is on a missing line) that takes
//     drain_lock with a unique_lock and blocks on drain_cond until
//     drain_count == 0.
//
// NOTE(review): the embedded original line numbers skip 174-175, 177, 180,
// 182-183 and 186-188. The access specifier, the statement that changes
// drain_count inside do_request, the name of the waiting member function and
// the closing of the class are not present in this listing.
170 class C_drain
: public EventCallback
{
171 ceph::mutex drain_lock
= ceph::make_mutex("C_drain::drain_lock");
172 ceph::condition_variable drain_cond
;
173 unsigned drain_count
;
176 explicit C_drain(size_t c
)
178 void do_request(uint64_t id
) override
{
179 std::lock_guard l
{drain_lock
};
// Wake every waiter once the counter reads zero.
181 if (drain_count
== 0) drain_cond
.notify_all();
// Predicate form of wait(): the condition is re-checked on every wake-up.
184 std::unique_lock l
{drain_lock
};
185 drain_cond
.wait(l
, [this] { return drain_count
== 0; });
// NetworkStack::drain() -- visible behavior:
//   * log " started." through ldout(cct, 30);
//   * record the calling thread with pthread_self();
//   * construct a local C_drain named `drain` from get_num_worker();
//   * for every worker: assert that the calling thread is not the owner of
//     worker->center, then hand a reference to `drain` to
//     worker->center.dispatch_event_external();
//   * log " end." through ldout(cct, 30).
//
// NOTE(review): the embedded original line numbers skip 190, 193 and
// 198-200. `drain` is a stack object whose address is passed to every
// worker's event center, so the function presumably waits for the callbacks
// to complete before returning. That wait, and any locking of the worker
// list, would be on the missing lines -- confirm against the original file.
189 void NetworkStack::drain()
191 ldout(cct
, 30) << __func__
<< " started." << dendl
;
192 pthread_t cur
= pthread_self();
194 C_drain
drain(get_num_worker());
195 for (Worker
* worker
: workers
) {
// The caller must not be one of the worker threads.
196 ceph_assert(cur
!= worker
->center
.get_owner());
197 worker
->center
.dispatch_event_external(EventCallbackRef(&drain
));
201 ldout(cct
, 30) << __func__
<< " end." << dendl
;