1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2016 XSKY <haomai@xsky.com>
8 * Author: Haomai Wang <haomaiwang@gmail.com>
10 * This is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU Lesser General Public
12 * License version 2.1, as published by the Free Software
13 * Foundation. See file COPYING.
17 #ifndef CEPH_MSG_ASYNC_STACK_H
18 #define CEPH_MSG_ASYNC_STACK_H
20 #include "common/perf_counters.h"
21 #include "common/perf_counters_key.h"
22 #include "include/spinlock.h"
23 #include "msg/async/Event.h"
24 #include "msg/msg_types.h"
28 class ConnectedSocketImpl
{
30 virtual ~ConnectedSocketImpl() {}
31 virtual int is_connected() = 0;
32 virtual ssize_t
read(char*, size_t) = 0;
33 virtual ssize_t
send(ceph::buffer::list
&bl
, bool more
) = 0;
34 virtual void shutdown() = 0;
35 virtual void close() = 0;
36 virtual int fd() const = 0;
37 virtual void set_priority(int sd
, int prio
, int domain
) = 0;
40 class ConnectedSocket
;
41 struct SocketOptions
{
46 entity_addr_t connect_bind_addr
;
50 class ServerSocketImpl
{
52 unsigned addr_type
; ///< entity_addr_t::TYPE_*
53 unsigned addr_slot
; ///< position of our addr in myaddrs().v
54 ServerSocketImpl(unsigned type
, unsigned slot
)
55 : addr_type(type
), addr_slot(slot
) {}
56 virtual ~ServerSocketImpl() {}
57 virtual int accept(ConnectedSocket
*sock
, const SocketOptions
&opt
, entity_addr_t
*out
, Worker
*w
) = 0;
58 virtual void abort_accept() = 0;
59 /// Get file descriptor
60 virtual int fd() const = 0;
64 /// \addtogroup networking-module
67 /// A TCP (or other stream-based protocol) connection.
69 /// A \c ConnectedSocket represents a full-duplex stream between
70 /// two endpoints, a local endpoint and a remote endpoint.
71 class ConnectedSocket
{
72 std::unique_ptr
<ConnectedSocketImpl
> _csi
;
75 /// Constructs a \c ConnectedSocket not corresponding to a connection
78 explicit ConnectedSocket(std::unique_ptr
<ConnectedSocketImpl
> csi
)
79 : _csi(std::move(csi
)) {}
85 /// Moves a \c ConnectedSocket object.
86 ConnectedSocket(ConnectedSocket
&& cs
) = default;
87 /// Move-assigns a \c ConnectedSocket object.
88 ConnectedSocket
& operator=(ConnectedSocket
&& cs
) = default;
91 return _csi
->is_connected();
93 /// Read the input stream with copy.
95 /// Copy an object returning data sent from the remote endpoint.
96 ssize_t
read(char* buf
, size_t len
) {
97 return _csi
->read(buf
, len
);
99 /// Gets the output stream.
101 /// Gets an object that sends data to the remote endpoint.
102 ssize_t
send(ceph::buffer::list
&bl
, bool more
) {
103 return _csi
->send(bl
, more
);
105 /// Disables output to the socket.
107 /// Current or future writes that have not been successfully flushed
108 /// will immediately fail with an error. This is useful to abort
109 /// operations on a socket that is not making progress due to a
112 return _csi
->shutdown();
114 /// Disables input from the socket.
116 /// Current or future reads will immediately fail with an error.
117 /// This is useful to abort operations on a socket that is not making
118 /// progress due to a peer failure.
124 /// Get file descriptor
129 void set_priority(int sd
, int prio
, int domain
) {
130 _csi
->set_priority(sd
, prio
, domain
);
133 explicit operator bool() const {
139 /// \addtogroup networking-module
142 /// A listening socket, waiting to accept incoming network connections.
144 std::unique_ptr
<ServerSocketImpl
> _ssi
;
146 /// Constructs a \c ServerSocket not corresponding to a connection
149 explicit ServerSocket(std::unique_ptr
<ServerSocketImpl
> ssi
)
150 : _ssi(std::move(ssi
)) {}
153 _ssi
->abort_accept();
156 /// Moves a \c ServerSocket object.
157 ServerSocket(ServerSocket
&& ss
) = default;
158 /// Move-assigns a \c ServerSocket object.
159 ServerSocket
& operator=(ServerSocket
&& cs
) = default;
161 /// Accepts the next connection to successfully connect to this socket.
163 /// \Accepts a \ref ConnectedSocket representing the connection, and
164 /// a \ref entity_addr_t describing the remote endpoint.
165 int accept(ConnectedSocket
*sock
, const SocketOptions
&opt
, entity_addr_t
*out
, Worker
*w
) {
166 return _ssi
->accept(sock
, opt
, out
, w
);
169 /// Stops any \ref accept() in progress.
171 /// Current and future \ref accept() calls will terminate immediately
173 void abort_accept() {
174 _ssi
->abort_accept();
178 /// Get file descriptor
183 /// get listen/bind addr
184 unsigned get_addr_slot() {
185 return _ssi
->addr_slot
;
188 explicit operator bool() const {
197 l_msgr_first
= 94000,
198 l_msgr_recv_messages
,
199 l_msgr_send_messages
,
202 l_msgr_created_connections
,
203 l_msgr_active_connections
,
205 l_msgr_running_total_time
,
206 l_msgr_running_send_time
,
207 l_msgr_running_recv_time
,
208 l_msgr_running_fast_dispatch_time
,
210 l_msgr_send_messages_queue_lat
,
211 l_msgr_handle_ack_lat
,
213 l_msgr_recv_encrypted_bytes
,
214 l_msgr_send_encrypted_bytes
,
220 l_msgr_labeled_first
= l_msgr_last
+ 1,
222 l_msgr_connection_ready_timeouts
,
223 l_msgr_connection_idle_timeouts
,
229 std::mutex init_lock
;
230 std::condition_variable init_cond
;
237 PerfCounters
*perf_logger
;
238 PerfCounters
*perf_labeled_logger
;
241 std::atomic_uint references
;
244 Worker(const Worker
&) = delete;
245 Worker
& operator=(const Worker
&) = delete;
247 Worker(CephContext
*c
, unsigned worker_id
)
248 : cct(c
), id(worker_id
), references(0), center(c
) {
250 char name_prefix
[] = "AsyncMessenger::Worker";
251 sprintf(name
, "%s-%u", name_prefix
, id
);
253 // initialize perf_logger
254 PerfCountersBuilder
plb(cct
, name
, l_msgr_first
, l_msgr_last
);
256 plb
.add_u64_counter(l_msgr_recv_messages
, "msgr_recv_messages", "Network received messages");
257 plb
.add_u64_counter(l_msgr_send_messages
, "msgr_send_messages", "Network sent messages");
258 plb
.add_u64_counter(l_msgr_recv_bytes
, "msgr_recv_bytes", "Network received bytes", NULL
, 0, unit_t(UNIT_BYTES
));
259 plb
.add_u64_counter(l_msgr_send_bytes
, "msgr_send_bytes", "Network sent bytes", NULL
, 0, unit_t(UNIT_BYTES
));
260 plb
.add_u64_counter(l_msgr_active_connections
, "msgr_active_connections", "Active connection number");
261 plb
.add_u64_counter(l_msgr_created_connections
, "msgr_created_connections", "Created connection number");
263 plb
.add_time(l_msgr_running_total_time
, "msgr_running_total_time", "The total time of thread running");
264 plb
.add_time(l_msgr_running_send_time
, "msgr_running_send_time", "The total time of message sending");
265 plb
.add_time(l_msgr_running_recv_time
, "msgr_running_recv_time", "The total time of message receiving");
266 plb
.add_time(l_msgr_running_fast_dispatch_time
, "msgr_running_fast_dispatch_time", "The total time of fast dispatch");
268 plb
.add_time_avg(l_msgr_send_messages_queue_lat
, "msgr_send_messages_queue_lat", "Network sent messages lat");
269 plb
.add_time_avg(l_msgr_handle_ack_lat
, "msgr_handle_ack_lat", "Connection handle ack lat");
271 plb
.add_u64_counter(l_msgr_recv_encrypted_bytes
, "msgr_recv_encrypted_bytes", "Network received encrypted bytes", NULL
, 0, unit_t(UNIT_BYTES
));
272 plb
.add_u64_counter(l_msgr_send_encrypted_bytes
, "msgr_send_encrypted_bytes", "Network sent encrypted bytes", NULL
, 0, unit_t(UNIT_BYTES
));
274 perf_logger
= plb
.create_perf_counters();
275 cct
->get_perfcounters_collection()->add(perf_logger
);
277 // Add labeled perfcounters
278 std::string labels
= ceph::perf_counters::key_create(
279 name_prefix
, {{"id", std::to_string(id
)}});
280 PerfCountersBuilder
plb_labeled(
281 cct
, labels
, l_msgr_labeled_first
,
282 l_msgr_labeled_last
);
284 plb_labeled
.add_u64_counter(
285 l_msgr_connection_ready_timeouts
, "msgr_connection_ready_timeouts",
286 "Number of not yet ready connections declared as dead", NULL
,
287 PerfCountersBuilder::PRIO_USEFUL
);
288 plb_labeled
.add_u64_counter(
289 l_msgr_connection_idle_timeouts
, "msgr_connection_idle_timeouts",
290 "Number of connections closed due to idleness", NULL
,
291 PerfCountersBuilder::PRIO_USEFUL
);
293 perf_labeled_logger
= plb_labeled
.create_perf_counters();
294 cct
->get_perfcounters_collection()->add(perf_labeled_logger
);
298 cct
->get_perfcounters_collection()->remove(perf_logger
);
301 if (perf_labeled_logger
) {
302 cct
->get_perfcounters_collection()->remove(perf_labeled_logger
);
303 delete perf_labeled_logger
;
307 virtual int listen(entity_addr_t
&addr
, unsigned addr_slot
,
308 const SocketOptions
&opts
, ServerSocket
*) = 0;
309 virtual int connect(const entity_addr_t
&addr
,
310 const SocketOptions
&opts
, ConnectedSocket
*socket
) = 0;
311 virtual void destroy() {}
313 virtual void initialize() {}
314 PerfCounters
*get_perf_counter() { return perf_logger
; }
315 PerfCounters
*get_labeled_perf_counter() { return perf_labeled_logger
; }
316 void release_worker() {
317 int oldref
= references
.fetch_sub(1);
318 ceph_assert(oldref
> 0);
323 init_cond
.notify_all();
327 std::lock_guard
<std::mutex
> l(init_lock
);
330 void wait_for_init() {
331 std::unique_lock
<std::mutex
> l(init_lock
);
338 init_cond
.notify_all();
345 ceph::spinlock pool_spin
;
346 bool started
= false;
348 std::function
<void ()> add_thread(Worker
* w
);
350 virtual Worker
* create_worker(CephContext
*c
, unsigned i
) = 0;
351 virtual void rename_thread(unsigned id
) {
352 static constexpr int TASK_COMM_LEN
= 16;
353 char tp_name
[TASK_COMM_LEN
];
354 sprintf(tp_name
, "msgr-worker-%u", id
);
355 ceph_pthread_setname(pthread_self(), tp_name
);
360 std::vector
<Worker
*> workers
;
362 explicit NetworkStack(CephContext
*c
);
364 NetworkStack(const NetworkStack
&) = delete;
365 NetworkStack
& operator=(const NetworkStack
&) = delete;
366 virtual ~NetworkStack() {
367 for (auto &&w
: workers
)
371 static std::shared_ptr
<NetworkStack
> create(
372 CephContext
*c
, const std::string
&type
);
374 // backend need to override this method if backend doesn't support shared
376 // For example, posix backend has in kernel global listen table. If one
377 // thread bind a port, other threads also aware this.
378 // But for dpdk backend, we maintain listen table in each thread. So we
379 // need to let each thread do binding port.
380 virtual bool support_local_listen_table() const { return false; }
381 virtual bool nonblock_connect_need_writable_event() const { return true; }
385 virtual Worker
*get_worker();
386 Worker
*get_worker(unsigned worker_id
) {
387 return workers
[worker_id
];
390 unsigned get_num_worker() const {
391 return workers
.size();
394 // direct is used in tests only
395 virtual void spawn_worker(std::function
<void ()> &&) = 0;
396 virtual void join_worker(unsigned i
) = 0;
398 virtual bool is_ready() { return true; };
399 virtual void ready() { };
402 #endif //CEPH_MSG_ASYNC_STACK_H