]>
git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/Stack.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSky <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #include "include/compat.h"
18 #include "common/Cond.h"
19 #include "common/errno.h"
20 #include "PosixStack.h"
22 #include "rdma/RDMAStack.h"
25 #include "dpdk/DPDKStack.h"
28 #include "common/dout.h"
29 #include "include/assert.h"
31 #define dout_subsys ceph_subsys_ms
33 #define dout_prefix *_dout << "stack "
// NetworkStack::add_thread(i): produces the std::function<void ()> that belongs
// to the worker stored at workers[i].
// NOTE(review): this listing has lost lines during extraction (the embedded
// original line numbers jump, e.g. 37 -> 40, 44 -> 48, 50 -> 52). The braces,
// the declaration of tp_name, any loop header and the return statement are not
// visible here -- confirm against the upstream file before relying on these notes.
35 std::function
<void ()> NetworkStack::add_thread(unsigned i
)
37 Worker
*w
= workers
[i
];
// Format "msgr-worker-<id>" from the worker's id and apply it as the name of
// the calling thread (pthread_self()).
// NOTE(review): sprintf is unbounded and the size of tp_name is not visible
// in this listing -- verify the buffer can hold the formatted id.
40 sprintf(tp_name
, "msgr-worker-%d", w
->id
);
41 ceph_pthread_setname(pthread_self(), tp_name
);
// Passed to process_events() below; going by the name this is presumably a
// maximum wait expressed in microseconds -- TODO confirm.
42 const uint64_t EventMaxWaitUs
= 30000000;
// presumably records the calling thread as the owner of this worker's
// EventCenter (drain() compares center.get_owner() with pthread_self()) -- confirm.
43 w
->center
.set_owner();
44 ldout(cct
, 10) << __func__
<< " starting" << dendl
;
48 ldout(cct
, 30) << __func__
<< " calling event process" << dendl
;
// r receives the result of processing events on this worker's EventCenter.
50 int r
= w
->center
.process_events(EventMaxWaitUs
);
// Failure message includes the text for the current errno. The condition that
// guards this message is not visible in this listing.
52 ldout(cct
, 20) << __func__
<< " process events failed: "
53 << cpp_strerror(errno
) << dendl
;
// NetworkStack::create(c, t): factory returning a std::shared_ptr<NetworkStack>.
// The visible return statements build a PosixNetworkStack, an RDMAStack or a
// DPDKStack, each constructed from (c, t).
// NOTE(review): the conditions that select between the three returns are not
// visible in this listing (original lines 64, 67 and 71 are missing);
// presumably t is compared against transport names the same way
// create_worker() compares its type argument -- confirm.
62 std::shared_ptr
<NetworkStack
> NetworkStack::create(CephContext
*c
, const string
&t
)
65 return std::make_shared
<PosixNetworkStack
>(c
, t
);
68 return std::make_shared
<RDMAStack
>(c
, t
);
72 return std::make_shared
<DPDKStack
>(c
, t
);
// Reached when none of the returns above was taken: log the unsupported
// transport type. Whatever follows the log statement is not visible here.
75 lderr(c
) << __func__
<< " ms_async_transport_type " << t
<<
76 " is not supported! " << dendl
;
// NetworkStack::create_worker(c, type, i): allocates with `new` and returns a
// Worker for index i: PosixWorker, RDMAWorker (type == "rdma") or DPDKWorker
// (type == "dpdk"). The result is a raw pointer; who deletes it is not visible
// in this block.
// NOTE(review): the condition guarding the PosixWorker branch is not visible
// in this listing (original line 83 is missing).
81 Worker
* NetworkStack::create_worker(CephContext
*c
, const string
&type
, unsigned i
)
84 return new PosixWorker(c
, i
);
86 else if (type
== "rdma")
87 return new RDMAWorker(c
, i
);
90 else if (type
== "dpdk")
91 return new DPDKWorker(c
, i
);
// No branch matched: log the unsupported transport type. Whatever follows the
// log statement is not visible here.
94 lderr(c
) << __func__
<< " ms_async_transport_type " << type
<<
95 " is not supported! " << dendl
;
// Constructor: stores the transport type t and the context c, marks the stack
// as not started, reads the worker count from the ms_async_op_threads config
// value, caps it at EventCenter::MAX_EVENTCENTER, creates and initializes one
// Worker per slot, and finally registers this object as a fork watcher on cct.
// NOTE(review): some original lines (braces, the end of the log statement at
// original line 108) are missing from this listing.
100 NetworkStack::NetworkStack(CephContext
*c
, const string
&t
): type(t
), started(false), cct(c
)
// Passed as the first argument to each worker's center.init() below.
102 const uint64_t InitEventNumber
= 5000;
103 num_workers
= cct
->_conf
->ms_async_op_threads
;
// The comparison is >=, so the message is also logged when the configured
// value is exactly the limit (the assignment below then changes nothing).
104 if (num_workers
>= EventCenter::MAX_EVENTCENTER
) {
105 ldout(cct
, 0) << __func__
<< " max thread limit is "
106 << EventCenter::MAX_EVENTCENTER
<< ", switching to this now. "
107 << "Higher thread values are unnecessary and currently unsupported."
109 num_workers
= EventCenter::MAX_EVENTCENTER
;
// Create worker i for this transport type, initialize its EventCenter with
// (InitEventNumber, i, type) and append it to workers.
112 for (unsigned i
= 0; i
< num_workers
; ++i
) {
113 Worker
*w
= create_worker(cct
, type
, i
);
114 w
->center
.init(InitEventNumber
, i
, type
);
115 workers
.push_back(w
);
117 cct
->register_fork_watcher(this);
// NetworkStack::start(): for every worker, builds its thread function with
// add_thread(i) and hands it to spawn_worker(); afterwards calls
// wait_for_init() on every worker.
// NOTE(review): original lines 121-127 and several others are missing from
// this listing, so any locking or "already started" check is not visible here.
120 void NetworkStack::start()
128 for (unsigned i
= 0; i
< num_workers
; ++i
) {
// The statement guarded by is_init() is not visible; presumably workers that
// are already initialized are skipped -- TODO confirm.
129 if (workers
[i
]->is_init())
131 std::function
<void ()> thread
= add_thread(i
);
132 spawn_worker(i
, std::move(thread
));
// Second pass over all workers; presumably blocks until each worker has
// finished its initialization -- confirm against Worker::wait_for_init().
137 for (unsigned i
= 0; i
< num_workers
; ++i
)
138 workers
[i
]->wait_for_init();
// NetworkStack::get_worker(): scans all workers, picks the one whose
// `references` value is smallest (the first such worker wins, because the
// comparison is strict), asserts that one was found and increments its
// `references`.
// NOTE(review): the return statement (original line 164) is not visible in
// this listing; presumably current_best is returned -- confirm.
141 Worker
* NetworkStack::get_worker()
143 ldout(cct
, 30) << __func__
<< dendl
;
// min_load is unsigned but seeded with numeric_limits<int>::max(); a worker
// whose count is >= that seed can never satisfy the strict `<` test below.
145 // start with some reasonably large number
146 unsigned min_load
= std::numeric_limits
<int>::max();
147 Worker
* current_best
= nullptr;
150 // find worker with least references
151 // tempting case is returning on references == 0, but in reality
152 // this will happen so rarely that there's no need for special case.
153 for (unsigned i
= 0; i
< num_workers
; ++i
) {
154 unsigned worker_load
= workers
[i
]->references
.load();
155 if (worker_load
< min_load
) {
156 current_best
= workers
[i
];
157 min_load
= worker_load
;
// current_best stays nullptr when num_workers is 0 or no worker beat the seed.
162 assert(current_best
);
// Count the caller as one more reference on the chosen worker.
163 ++current_best
->references
;
// NetworkStack::stop(): while holding pool_spin, sets done = true on every
// worker and calls wakeup() on its EventCenter.
// NOTE(review): lines after the wakeup() call (original lines 173 onwards) are
// missing from this listing, so any joining of worker threads or state reset
// is not visible here.
167 void NetworkStack::stop()
169 Spinlock::Locker
l(pool_spin
);
170 for (unsigned i
= 0; i
< num_workers
; ++i
) {
// Set the flag first, then wake the event center; presumably so a worker
// blocked in event processing notices `done` -- confirm.
171 workers
[i
]->done
= true;
172 workers
[i
]->center
.wakeup();
// C_drain: an EventCallback that carries a counter (drain_count), a Mutex
// (drain_lock) and a condition (drain_cond). do_request() signals drain_cond
// when the counter is zero; a second method waits on drain_cond.
// NOTE(review): several original lines are missing from this listing: the
// declarations of drain_lock and drain_cond, the rest of the constructor's
// initializer list, the counter update inside do_request() (original line
// 189), and the name and loop condition of the waiting method (original lines
// 192 and 194). Confirm the details against the upstream file.
178 class C_drain
: public EventCallback
{
181 unsigned drain_count
;
// Takes a size_t count c; how c initializes drain_count is not visible here.
184 explicit C_drain(size_t c
)
185 : drain_lock("C_drain::drain_lock"),
// Callback entry point: takes drain_lock for the scope of the call.
187 void do_request(int id
) override
{
188 Mutex::Locker
l(drain_lock
);
// Signal the waiter once the counter has reached zero.
190 if (drain_count
== 0) drain_cond
.Signal();
// Body of the waiting method (its signature is not visible): takes drain_lock
// and waits on drain_cond with that lock.
193 Mutex::Locker
l(drain_lock
);
195 drain_cond
.Wait(drain_lock
);
// NetworkStack::drain(): creates one C_drain sized by num_workers and
// dispatches it as an external event to every worker's EventCenter, logging at
// start and end.
// NOTE(review): original lines between the dispatch loop and the final log
// (208-210) are missing from this listing; presumably the function waits on
// the C_drain object there -- confirm.
199 void NetworkStack::drain()
201 ldout(cct
, 30) << __func__
<< " started." << dendl
;
202 pthread_t cur
= pthread_self();
// The callback object lives on this function's stack and is handed out by
// address below, so it must outlive the dispatched events.
204 C_drain
drain(num_workers
);
205 for (unsigned i
= 0; i
< num_workers
; ++i
) {
// The calling thread must not be the owner of any worker's EventCenter.
206 assert(cur
!= workers
[i
]->center
.get_owner());
207 workers
[i
]->center
.dispatch_event_external(EventCallbackRef(&drain
));
211 ldout(cct
, 30) << __func__
<< " end." << dendl
;