// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2016 XSky <haomai@xsky.com>
 *
 * Author: Haomai Wang <haomaiwang@gmail.com>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include <mutex>

#include "include/compat.h"
#include "common/Cond.h"
#include "common/errno.h"
#include "PosixStack.h"
#ifdef HAVE_RDMA
#include "rdma/RDMAStack.h"
#endif
#ifdef HAVE_DPDK
#include "dpdk/DPDKStack.h"
#endif

#include "common/dout.h"
#include "include/ceph_assert.h"

#define dout_subsys ceph_subsys_ms
#undef dout_prefix
#define dout_prefix *_dout << "stack "

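// add_thread() returns the body that a worker thread will run: it names
// the thread (tp_name[16] matches the kernel's 16-byte, NUL-included
// thread-name limit), binds the worker's EventCenter to this thread, and
// loops on process_events() until Worker::done is set.  EventMaxWaitUs
// (30 s) only bounds how long a single process_events() call may block.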
std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
{
  Worker *w = workers[worker_id];
  return [this, w]() {
    char tp_name[16];
    sprintf(tp_name, "msgr-worker-%u", w->id);
    ceph_pthread_setname(pthread_self(), tp_name);
    const unsigned EventMaxWaitUs = 30000000;
    w->center.set_owner();
    ldout(cct, 10) << __func__ << " starting" << dendl;
    w->initialize();
    w->init_done();
    while (!w->done) {
      ldout(cct, 30) << __func__ << " calling event process" << dendl;

      ceph::timespan dur;
      int r = w->center.process_events(EventMaxWaitUs, &dur);
      if (r < 0) {
        ldout(cct, 20) << __func__ << " process events failed: "
                       << cpp_strerror(errno) << dendl;
        // TODO do something?
      }
      w->perf_logger->tinc(l_msgr_running_total_time, dur);
    }
    w->reset();
    w->destroy();
  };
}

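// Factory for the configured transport type (the value of the
// ms_async_transport_type option): "posix" is always available, while
// "rdma" and "dpdk" exist only when compiled with HAVE_RDMA / HAVE_DPDK.
// An unsupported type aborts.  Each worker's EventCenter is pre-sized
// for InitEventNumber (5000) events before the stack is returned.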
std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c,
                                                   const std::string &t)
{
  std::shared_ptr<NetworkStack> stack = nullptr;

  if (t == "posix")
    stack.reset(new PosixNetworkStack(c));
#ifdef HAVE_RDMA
  else if (t == "rdma")
    stack.reset(new RDMAStack(c));
#endif
#ifdef HAVE_DPDK
  else if (t == "dpdk")
    stack.reset(new DPDKStack(c));
#endif

  if (stack == nullptr) {
    lderr(c) << __func__ << " ms_async_transport_type " << t <<
      " is not supported! " << dendl;
    ceph_abort();
    return nullptr;
  }

  const int InitEventNumber = 5000;
  for (unsigned worker_id = 0; worker_id < stack->num_workers; ++worker_id) {
    Worker *w = stack->create_worker(c, worker_id);
    int ret = w->center.init(InitEventNumber, worker_id, t);
    if (ret)
      throw std::system_error(-ret, std::generic_category());
    stack->workers.push_back(w);
  }

  return stack;
}

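// The worker count comes from ms_async_op_threads; it is clamped to
// EventCenter::MAX_EVENTCENTER because each worker owns one EventCenter.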
NetworkStack::NetworkStack(CephContext *c)
  : cct(c)
{
  ceph_assert(cct->_conf->ms_async_op_threads > 0);

  num_workers = cct->_conf->ms_async_op_threads;
  if (num_workers >= EventCenter::MAX_EVENTCENTER) {
    ldout(cct, 0) << __func__ << " max thread limit is "
                  << EventCenter::MAX_EVENTCENTER << ", switching to this now. "
                  << "Higher thread values are unnecessary and currently unsupported."
                  << dendl;
    num_workers = EventCenter::MAX_EVENTCENTER;
  }
}

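// Spawn a thread for every worker that is not already initialized, then
// release pool_spin before blocking in wait_for_init(), since the new
// threads need to make progress for init to complete.  A second call is
// a no-op while 'started' is set.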
void NetworkStack::start()
{
  std::unique_lock<decltype(pool_spin)> lk(pool_spin);

  if (started) {
    return;
  }

  for (unsigned i = 0; i < num_workers; ++i) {
    if (workers[i]->is_init())
      continue;
    std::function<void ()> thread = add_thread(i);
    spawn_worker(i, std::move(thread));
  }
  started = true;
  lk.unlock();

  for (unsigned i = 0; i < num_workers; ++i)
    workers[i]->wait_for_init();
}

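// Pick the worker with the fewest references under pool_spin and bump
// its count.  The matching decrement happens at the call sites (outside
// this file), so the returned pointer effectively carries one reference.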
Worker* NetworkStack::get_worker()
{
  ldout(cct, 30) << __func__ << dendl;

  // start with some reasonably large number
  unsigned min_load = std::numeric_limits<int>::max();
  Worker* current_best = nullptr;

  pool_spin.lock();
  // find the worker with the fewest references
  // it is tempting to return early on references == 0, but in reality
  // that happens so rarely that there's no need for a special case.
  for (unsigned i = 0; i < num_workers; ++i) {
    unsigned worker_load = workers[i]->references.load();
    if (worker_load < min_load) {
      current_best = workers[i];
      min_load = worker_load;
    }
  }

  pool_spin.unlock();
  ceph_assert(current_best);
  ++current_best->references;
  return current_best;
}

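// Request shutdown: set each worker's done flag, wake its event loop so
// the flag is observed promptly, and join the thread.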
void NetworkStack::stop()
{
  std::lock_guard lk(pool_spin);
  for (unsigned i = 0; i < num_workers; ++i) {
    workers[i]->done = true;
    workers[i]->center.wakeup();
    join_worker(i);
  }
  started = false;
}

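// C_drain is a countdown barrier posted to the workers as an event
// callback: every worker that runs do_request() decrements drain_count,
// and wait() blocks until the count reaches zero.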
class C_drain : public EventCallback {
  ceph::mutex drain_lock = ceph::make_mutex("C_drain::drain_lock");
  ceph::condition_variable drain_cond;
  unsigned drain_count;

 public:
  explicit C_drain(size_t c)
    : drain_count(c) {}
  void do_request(uint64_t id) override {
    std::lock_guard l{drain_lock};
    drain_count--;
    if (drain_count == 0) drain_cond.notify_all();
  }
  void wait() {
    std::unique_lock l{drain_lock};
    drain_cond.wait(l, [this] { return drain_count == 0; });
  }
};

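// Post a C_drain event to every worker and wait until each has processed
// it, which implies that external events queued on a worker before this
// call have run.  It must not be called from a worker thread itself,
// hence the owner assertion below.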
void NetworkStack::drain()
{
  ldout(cct, 30) << __func__ << " started." << dendl;
  pthread_t cur = pthread_self();
  pool_spin.lock();
  C_drain drain(num_workers);
  for (unsigned i = 0; i < num_workers; ++i) {
    ceph_assert(cur != workers[i]->center.get_owner());
    workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
  }
  pool_spin.unlock();
  drain.wait();
  ldout(cct, 30) << __func__ << " end." << dendl;
}
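
// A minimal usage sketch (illustrative only; the real call sites live in
// the AsyncMessenger code, and the exact shutdown ordering there may
// differ):
//
//   auto stack = NetworkStack::create(cct, "posix");
//   stack->start();                   // spawn worker event loops
//   Worker *w = stack->get_worker();  // least-loaded worker
//   // ... attach connections to w ...
//   stack->drain();                   // wait for in-flight events
//   stack->stop();                    // stop and join the workers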