1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
#include "include/compat.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
#ifdef HAVE_RDMA
#include "rdma/RDMAStack.h"
#endif
#ifdef HAVE_DPDK
#include "dpdk/DPDKStack.h"
#endif

#include "common/dout.h"
#include "include/ceph_assert.h"
33 #define dout_subsys ceph_subsys_ms
35 #define dout_prefix *_dout << "stack "
37 std::function
<void ()> NetworkStack::add_thread(unsigned worker_id
)
39 Worker
*w
= workers
[worker_id
];
42 sprintf(tp_name
, "msgr-worker-%u", w
->id
);
43 ceph_pthread_setname(pthread_self(), tp_name
);
44 const unsigned EventMaxWaitUs
= 30000000;
45 w
->center
.set_owner();
46 ldout(cct
, 10) << __func__
<< " starting" << dendl
;
50 ldout(cct
, 30) << __func__
<< " calling event process" << dendl
;
53 int r
= w
->center
.process_events(EventMaxWaitUs
, &dur
);
55 ldout(cct
, 20) << __func__
<< " process events failed: "
56 << cpp_strerror(errno
) << dendl
;
59 w
->perf_logger
->tinc(l_msgr_running_total_time
, dur
);
66 std::shared_ptr
<NetworkStack
> NetworkStack::create(CephContext
*c
, const string
&t
)
69 return std::make_shared
<PosixNetworkStack
>(c
, t
);
72 return std::make_shared
<RDMAStack
>(c
, t
);
76 return std::make_shared
<DPDKStack
>(c
, t
);
79 lderr(c
) << __func__
<< " ms_async_transport_type " << t
<<
80 " is not supported! " << dendl
;
85 Worker
* NetworkStack::create_worker(CephContext
*c
, const string
&type
, unsigned worker_id
)
88 return new PosixWorker(c
, worker_id
);
90 else if (type
== "rdma")
91 return new RDMAWorker(c
, worker_id
);
94 else if (type
== "dpdk")
95 return new DPDKWorker(c
, worker_id
);
98 lderr(c
) << __func__
<< " ms_async_transport_type " << type
<<
99 " is not supported! " << dendl
;
104 NetworkStack::NetworkStack(CephContext
*c
, const string
&t
): type(t
), started(false), cct(c
)
106 ceph_assert(cct
->_conf
->ms_async_op_threads
> 0);
108 const int InitEventNumber
= 5000;
109 num_workers
= cct
->_conf
->ms_async_op_threads
;
110 if (num_workers
>= EventCenter::MAX_EVENTCENTER
) {
111 ldout(cct
, 0) << __func__
<< " max thread limit is "
112 << EventCenter::MAX_EVENTCENTER
<< ", switching to this now. "
113 << "Higher thread values are unnecessary and currently unsupported."
115 num_workers
= EventCenter::MAX_EVENTCENTER
;
118 for (unsigned worker_id
= 0; worker_id
< num_workers
; ++worker_id
) {
119 Worker
*w
= create_worker(cct
, type
, worker_id
);
120 w
->center
.init(InitEventNumber
, worker_id
, type
);
121 workers
.push_back(w
);
125 void NetworkStack::start()
127 std::unique_lock
<decltype(pool_spin
)> lk(pool_spin
);
133 for (unsigned i
= 0; i
< num_workers
; ++i
) {
134 if (workers
[i
]->is_init())
136 std::function
<void ()> thread
= add_thread(i
);
137 spawn_worker(i
, std::move(thread
));
142 for (unsigned i
= 0; i
< num_workers
; ++i
)
143 workers
[i
]->wait_for_init();
146 Worker
* NetworkStack::get_worker()
148 ldout(cct
, 30) << __func__
<< dendl
;
150 // start with some reasonably large number
151 unsigned min_load
= std::numeric_limits
<int>::max();
152 Worker
* current_best
= nullptr;
155 // find worker with least references
156 // tempting case is returning on references == 0, but in reality
157 // this will happen so rarely that there's no need for special case.
158 for (unsigned i
= 0; i
< num_workers
; ++i
) {
159 unsigned worker_load
= workers
[i
]->references
.load();
160 if (worker_load
< min_load
) {
161 current_best
= workers
[i
];
162 min_load
= worker_load
;
167 ceph_assert(current_best
);
168 ++current_best
->references
;
172 void NetworkStack::stop()
174 std::lock_guard
lk(pool_spin
);
175 for (unsigned i
= 0; i
< num_workers
; ++i
) {
176 workers
[i
]->done
= true;
177 workers
[i
]->center
.wakeup();
183 class C_drain
: public EventCallback
{
184 ceph::mutex drain_lock
= ceph::make_mutex("C_drain::drain_lock");
185 ceph::condition_variable drain_cond
;
186 unsigned drain_count
;
189 explicit C_drain(size_t c
)
191 void do_request(uint64_t id
) override
{
192 std::lock_guard l
{drain_lock
};
194 if (drain_count
== 0) drain_cond
.notify_all();
197 std::unique_lock l
{drain_lock
};
198 drain_cond
.wait(l
, [this] { return drain_count
== 0; });
202 void NetworkStack::drain()
204 ldout(cct
, 30) << __func__
<< " started." << dendl
;
205 pthread_t cur
= pthread_self();
207 C_drain
drain(num_workers
);
208 for (unsigned i
= 0; i
< num_workers
; ++i
) {
209 ceph_assert(cur
!= workers
[i
]->center
.get_owner());
210 workers
[i
]->center
.dispatch_event_external(EventCallbackRef(&drain
));
214 ldout(cct
, 30) << __func__
<< " end." << dendl
;