// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
#include <limits>
#include <mutex>

#include "include/compat.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
#ifdef HAVE_RDMA
#include "rdma/RDMAStack.h"
#endif
#ifdef HAVE_DPDK
#include "dpdk/DPDKStack.h"
#endif

#include "common/dout.h"
#include "include/ceph_assert.h"
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "stack "
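
// add_thread() returns the body of a worker's event loop. The closure
// names the thread, claims the worker's EventCenter, and then keeps
// processing events until stop() sets Worker::done.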
std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
{
  Worker *w = workers[worker_id];
  return [this, w]() {
    char tp_name[16];
    // snprintf (rather than sprintf) guards against overflowing the
    // 16-byte thread-name buffer for large worker ids.
    snprintf(tp_name, sizeof(tp_name), "msgr-worker-%u", w->id);
    ceph_pthread_setname(pthread_self(), tp_name);
    const unsigned EventMaxWaitUs = 30000000;  // 30 seconds
    w->center.set_owner();
    ldout(cct, 10) << __func__ << " starting" << dendl;
    w->initialize();
    w->init_done();
    while (!w->done) {
      ldout(cct, 30) << __func__ << " calling event process" << dendl;

      ceph::timespan dur;
      int r = w->center.process_events(EventMaxWaitUs, &dur);
      if (r < 0) {
        ldout(cct, 20) << __func__ << " process events failed: "
                       << cpp_strerror(errno) << dendl;
      }
      w->perf_logger->tinc(l_msgr_running_total_time, dur);
    }
    w->reset();
    w->destroy();
  };
}
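
// Factory for the configured transport. "posix" is always compiled in;
// "rdma" and "dpdk" are only available when the corresponding build-time
// support (HAVE_RDMA / HAVE_DPDK) is present. All workers and their
// event centers are created up front, before the stack is handed out.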
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
                                                   const std::string &t)
{
  std::shared_ptr<NetworkStack> stack = nullptr;

  if (t == "posix")
    stack.reset(new PosixNetworkStack(c));
#ifdef HAVE_RDMA
  else if (t == "rdma")
    stack.reset(new RDMAStack(c));
#endif
#ifdef HAVE_DPDK
  else if (t == "dpdk")
    stack.reset(new DPDKStack(c));
#endif

  if (stack == nullptr) {
    lderr(c) << __func__ << " ms_async_transport_type " << t <<
      " is not supported! " << dendl;
    return nullptr;
  }

  const int InitEventNumber = 5000;
  for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
    Worker *w = stack->create_worker(c, worker_id);
    int ret = w->center.init(InitEventNumber, worker_id, t);
    if (ret)
      throw std::system_error(-ret, std::generic_category());
    stack->workers.push_back(w);
  }

  return stack;
}
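
// One Worker per ms_async_op_threads, capped at EventCenter::MAX_EVENTCENTER.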
NetworkStack::NetworkStack(CephContext *c)
  : cct(c)
{
  ceph_assert(cct->_conf->ms_async_op_threads > 0);

  num_workers = cct->_conf->ms_async_op_threads;
  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
    ldout(cct, 0) << __func__ << " max thread limit is "
                  << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
                  << "Higher thread values are unnecessary and currently unsupported."
                  << dendl;
    num_workers = EventCenter::MAX_EVENTCENTER;
  }
}
void NetworkStack::start()
{
  std::unique_lock<decltype(pool_spin)> lk(pool_spin);

  if (started) {
    return;
  }

  for (unsigned i = 0; i < num_workers; ++i) {
    if (workers[i]->is_init())
      continue;
    std::function<void ()> thread = add_thread(i);
    spawn_worker(i, std::move(thread));
  }
  started = true;
  lk.unlock();

  for (unsigned i = 0; i < num_workers; ++i)
    workers[i]->wait_for_init();
}
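
// Pick the least-loaded worker, where load is the number of outstanding
// references on it. The increment below marks the returned worker as
// busier, so subsequent calls naturally spread across the pool.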
Worker* NetworkStack::get_worker()
{
  ldout(cct, 30) << __func__ << dendl;

  // start with some reasonably large number
  unsigned min_load = std::numeric_limits<int>::max();
  Worker* current_best = nullptr;

  pool_spin.lock();
  // find worker with least references
  // tempting case is returning on references == 0, but in reality
  // this will happen so rarely that there's no need for special case.
  for (unsigned i = 0; i < num_workers; ++i) {
    unsigned worker_load = workers[i]->references.load();
    if (worker_load < min_load) {
      current_best = workers[i];
      min_load = worker_load;
    }
  }
  pool_spin.unlock();

  ceph_assert(current_best);
  ++current_best->references;
  return current_best;
}
void NetworkStack::stop()
{
  std::lock_guard lk(pool_spin);
  for (unsigned i = 0; i < num_workers; ++i) {
    workers[i]->done = true;
    workers[i]->center.wakeup();
    join_worker(i);
  }
  started = false;
}
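
// C_drain is a countdown barrier: each worker decrements the count from
// its own event thread, and wait() blocks until every worker has run the
// callback, i.e. until all previously queued external events are done.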
class C_drain : public EventCallback {
  ceph::mutex drain_lock = ceph::make_mutex("C_drain::drain_lock");
  ceph::condition_variable drain_cond;
  unsigned drain_count;

 public:
  explicit C_drain(size_t c)
      : drain_count(c) {}
  void do_request(uint64_t id) override {
    std::lock_guard l{drain_lock};
    drain_count--;
    if (drain_count == 0) drain_cond.notify_all();
  }
  void wait() {
    std::unique_lock l{drain_lock};
    drain_cond.wait(l, [this] { return drain_count == 0; });
  }
};
void NetworkStack::drain()
{
  ldout(cct, 30) << __func__ << " started." << dendl;
  pthread_t cur = pthread_self();
  pool_spin.lock();
  C_drain drain(num_workers);
  for (unsigned i = 0; i < num_workers; ++i) {
    // drain() must not be called from a worker thread, or the barrier
    // below would deadlock waiting on itself.
    ceph_assert(cur != workers[i]->center.get_owner());
    workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
  }
  pool_spin.unlock();
  drain.wait();
  ldout(cct, 30) << __func__ << " end." << dendl;
}