]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
7c673cae FG |
2 | // vim: ts=8 sw=2 smarttab |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
11fdf7f2 | 10 | * License version 2.1, as published by the Free Software |
7c673cae | 11 | * Foundation. See file COPYING. |
11fdf7f2 | 12 | * |
7c673cae FG |
13 | */ |
14 | ||
15 | ||
16 | ||
17 | #ifndef CEPH_MESSENGER_H | |
18 | #define CEPH_MESSENGER_H | |
19 | ||
11fdf7f2 | 20 | #include <deque> |
39ae355f TL |
21 | #include <map> |
22 | #include <optional> | |
11fdf7f2 TL |
23 | |
24 | #include <errno.h> | |
25 | #include <sstream> | |
26 | #include <memory> | |
7c673cae FG |
27 | |
28 | #include "Message.h" | |
29 | #include "Dispatcher.h" | |
11fdf7f2 | 30 | #include "Policy.h" |
11fdf7f2 | 31 | #include "common/Throttle.h" |
7c673cae FG |
32 | #include "include/Context.h" |
33 | #include "include/types.h" | |
34 | #include "include/ceph_features.h" | |
35 | #include "auth/Crypto.h" | |
11fdf7f2 TL |
36 | #include "common/item_history.h" |
37 | #include "auth/AuthRegistry.h" | |
20effc67 | 38 | #include "compressor_registry.h" |
11fdf7f2 | 39 | #include "include/ceph_assert.h" |
7c673cae FG |
40 | |
41 | #include <errno.h> | |
42 | #include <sstream> | |
3efd9988 | 43 | #include <signal.h> |
7c673cae FG |
44 | |
45 | #define SOCKET_PRIORITY_MIN_DELAY 6 | |
46 | ||
47 | class Timer; | |
48 | ||
11fdf7f2 TL |
49 | class AuthClient; |
50 | class AuthServer; | |
51 | ||
52 | #ifdef UNIT_TESTS_BUILT | |
53 | ||
54 | struct Interceptor { | |
55 | std::mutex lock; | |
56 | std::condition_variable cond_var; | |
57 | ||
58 | enum ACTION : uint32_t { | |
59 | CONTINUE = 0, | |
60 | FAIL, | |
61 | STOP | |
62 | }; | |
63 | ||
f67539c2 TL |
64 | enum STEP { |
65 | START_CLIENT_BANNER_EXCHANGE = 1, | |
66 | START_SERVER_BANNER_EXCHANGE, | |
67 | BANNER_EXCHANGE_BANNER_CONNECTING, | |
68 | BANNER_EXCHANGE, | |
69 | HANDLE_PEER_BANNER_BANNER_CONNECTING, | |
70 | HANDLE_PEER_BANNER, | |
71 | HANDLE_PEER_BANNER_PAYLOAD_HELLO_CONNECTING, | |
72 | HANDLE_PEER_BANNER_PAYLOAD, | |
73 | SEND_AUTH_REQUEST, | |
74 | HANDLE_AUTH_REQUEST_ACCEPTING_SIGN, | |
75 | SEND_CLIENT_IDENTITY, | |
76 | SEND_SERVER_IDENTITY, | |
77 | SEND_RECONNECT, | |
78 | SEND_RECONNECT_OK, | |
79 | READY, | |
80 | HANDLE_MESSAGE, | |
81 | READ_MESSAGE_COMPLETE, | |
20effc67 TL |
82 | SESSION_RETRY, |
83 | SEND_COMPRESSION_REQUEST, | |
84 | HANDLE_COMPRESSION_REQUEST | |
f67539c2 TL |
85 | }; |
86 | ||
11fdf7f2 TL |
87 | virtual ~Interceptor() {} |
88 | virtual ACTION intercept(Connection *conn, uint32_t step) = 0; | |
89 | }; | |
90 | ||
91 | #endif | |
7c673cae FG |
92 | |
93 | class Messenger { | |
94 | private: | |
11fdf7f2 TL |
95 | std::deque<Dispatcher*> dispatchers; |
96 | std::deque<Dispatcher*> fast_dispatchers; | |
7c673cae FG |
97 | ZTracer::Endpoint trace_endpoint; |
98 | ||
11fdf7f2 | 99 | protected: |
7c673cae FG |
100 | void set_endpoint_addr(const entity_addr_t& a, |
101 | const entity_name_t &name); | |
102 | ||
103 | protected: | |
104 | /// the "name" of the local daemon. eg client.99 | |
11fdf7f2 TL |
105 | entity_name_t my_name; |
106 | ||
107 | /// my addr | |
108 | safe_item_history<entity_addrvec_t> my_addrs; | |
109 | ||
7c673cae | 110 | int default_send_priority; |
9f95a23c | 111 | /// set to true once the Messenger has started, and set to false on shutdown |
7c673cae FG |
112 | bool started; |
113 | uint32_t magic; | |
114 | int socket_priority; | |
115 | ||
116 | public: | |
11fdf7f2 TL |
117 | AuthClient *auth_client = 0; |
118 | AuthServer *auth_server = 0; | |
119 | ||
120 | #ifdef UNIT_TESTS_BUILT | |
121 | Interceptor *interceptor = nullptr; | |
122 | #endif | |
123 | ||
7c673cae FG |
124 | /** |
125 | * The CephContext this Messenger uses. Many other components initialize themselves | |
126 | * from this value. | |
127 | */ | |
128 | CephContext *cct; | |
129 | int crcflags; | |
130 | ||
11fdf7f2 | 131 | using Policy = ceph::net::Policy<Throttle>; |
7c673cae | 132 | |
9f95a23c TL |
133 | public: |
134 | // allow unauthenticated connections. This is needed for | |
135 | // compatibility with pre-nautilus OSDs, which do not authenticate | |
136 | // the heartbeat sessions. | |
137 | bool require_authorizer = true; | |
138 | ||
11fdf7f2 TL |
139 | protected: |
140 | // for authentication | |
141 | AuthRegistry auth_registry; | |
142 | ||
143 | public: | |
7c673cae FG |
144 | /** |
145 | * Messenger constructor. Call this from your implementation. | |
146 | * Messenger users should construct full implementations directly, | |
147 | * or use the create() function. | |
148 | */ | |
11fdf7f2 | 149 | Messenger(CephContext *cct_, entity_name_t w); |
7c673cae FG |
150 | virtual ~Messenger() {} |
151 | ||
152 | /** | |
153 | * create a new messenger | |
154 | * | |
155 | * Create a new messenger instance, with whatever implementation is | |
156 | * available or specified via the configuration in cct. | |
157 | * | |
158 | * @param cct context | |
159 | * @param type name of messenger type | |
160 | * @param name entity name to register | |
161 | * @param lname logical name of the messenger in this process (e.g., "client") | |
162 | * @param nonce nonce value to uniquely identify this instance on the current host | |
7c673cae FG |
163 | */ |
164 | static Messenger *create(CephContext *cct, | |
9f95a23c | 165 | const std::string &type, |
7c673cae | 166 | entity_name_t name, |
9f95a23c | 167 | std::string lname, |
f67539c2 | 168 | uint64_t nonce); |
7c673cae | 169 | |
9f95a23c TL |
170 | static uint64_t get_random_nonce(); |
171 | static uint64_t get_pid_nonce(); | |
172 | ||
7c673cae FG |
173 | /** |
174 | * create a new messenger | |
175 | * | |
176 | * Create a new messenger instance. | |
177 | * Same as the above, but a slightly simpler interface for clients: | |
178 | * - Generate a random nonce | |
7c673cae FG |
179 | * - get the messenger type from cct |
180 | * - use the client entity_type | |
181 | * | |
182 | * @param cct context | |
183 | * @param lname logical name of the messenger in this process (e.g., "client") | |
184 | */ | |
9f95a23c | 185 | static Messenger *create_client_messenger(CephContext *cct, std::string lname); |
7c673cae FG |
186 | |
187 | /** | |
188 | * @defgroup Accessors | |
189 | * @{ | |
190 | */ | |
11fdf7f2 TL |
191 | int get_mytype() const { return my_name.type(); } |
192 | ||
7c673cae | 193 | /** |
11fdf7f2 | 194 | * Retrieve the Messenger's name |
7c673cae | 195 | * |
11fdf7f2 | 196 | * @return A const reference to the name this Messenger |
7c673cae FG |
197 | * currently believes to be its own. |
198 | */ | |
11fdf7f2 | 199 | const entity_name_t& get_myname() { return my_name; } |
7c673cae FG |
200 | |
201 | /** | |
202 | * Retrieve the Messenger's address. | |
203 | * | |
204 | * @return A const reference to the address this Messenger | |
205 | * currently believes to be its own. | |
206 | */ | |
11fdf7f2 TL |
207 | const entity_addrvec_t& get_myaddrs() { |
208 | return *my_addrs; | |
209 | } | |
210 | ||
211 | /** | |
212 | * get legacy addr for myself, suitable for protocol v1 | |
213 | * | |
214 | * Note that myaddrs might be a proper addrvec with v1 in it, or it might be an | |
215 | * ANY addr (if i am a pure client). | |
216 | */ | |
217 | entity_addr_t get_myaddr_legacy() { | |
218 | return my_addrs->as_legacy_addr(); | |
219 | } | |
220 | ||
221 | ||
222 | /** | |
9f95a23c | 223 | * get or set the messenger's magic value |
11fdf7f2 TL |
224 | */ |
225 | uint32_t get_magic() { return magic; } | |
226 | void set_magic(int _magic) { magic = _magic; } | |
227 | ||
228 | void set_auth_client(AuthClient *ac) { | |
229 | auth_client = ac; | |
230 | } | |
231 | void set_auth_server(AuthServer *as) { | |
232 | auth_server = as; | |
233 | } | |
234 | ||
20effc67 TL |
235 | // for compression |
236 | CompressorRegistry comp_registry; | |
237 | ||
7c673cae FG |
238 | protected: |
239 | /** | |
9f95a23c | 240 | * set the messenger's address |
7c673cae | 241 | */ |
11fdf7f2 TL |
242 | virtual void set_myaddrs(const entity_addrvec_t& a) { |
243 | my_addrs = a; | |
244 | set_endpoint_addr(a.front(), my_name); | |
7c673cae FG |
245 | } |
246 | public: | |
247 | /** | |
248 | * @return the zipkin trace endpoint | |
249 | */ | |
250 | const ZTracer::Endpoint* get_trace_endpoint() const { | |
251 | return &trace_endpoint; | |
252 | } | |
253 | ||
7c673cae | 254 | /** |
9f95a23c | 255 | * Set the name of the local entity. The name is reported to others and |
7c673cae FG |
256 | * can be changed while the system is running, but doing so at incorrect |
257 | * times may have bad results. | |
258 | * | |
9f95a23c | 259 | * @param m The name to set. |
7c673cae | 260 | */ |
11fdf7f2 TL |
261 | void set_myname(const entity_name_t& m) { my_name = m; } |
262 | ||
7c673cae | 263 | /** |
9f95a23c | 264 | * Set the unknown address components for this Messenger. |
7c673cae FG |
265 | * This is useful if the Messenger doesn't know its full address just by |
266 | * binding, but another Messenger on the same interface has already learned | |
267 | * its full address. This function does not fill in known address elements, | |
268 | * cause a rebind, or do anything of that sort. | |
269 | * | |
270 | * @param addrs The addresses to use as a template. | |
271 | */ | |
11fdf7f2 | 272 | virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0; |
39ae355f | 273 | |
7c673cae FG |
274 | /// Get the default send priority. |
275 | int get_default_send_priority() { return default_send_priority; } | |
276 | /** | |
277 | * Get the number of Messages which the Messenger has received | |
278 | * but not yet dispatched. | |
279 | */ | |
280 | virtual int get_dispatch_queue_len() = 0; | |
281 | ||
282 | /** | |
283 | * Get age of oldest undelivered message | |
284 | * (0 if the queue is empty) | |
285 | */ | |
286 | virtual double get_dispatch_queue_max_age(utime_t now) = 0; | |
7c673cae FG |
287 | |
288 | /** | |
289 | * @} // Accessors | |
290 | */ | |
291 | ||
292 | /** | |
293 | * @defgroup Configuration | |
294 | * @{ | |
295 | */ | |
296 | /** | |
9f95a23c | 297 | * Set the cluster protocol in use by this daemon. |
7c673cae FG |
298 | * This is an init-time function and cannot be called after calling |
299 | * start() or bind(). | |
300 | * | |
301 | * @param p The cluster protocol to use. Defined externally. | |
302 | */ | |
303 | virtual void set_cluster_protocol(int p) = 0; | |
304 | /** | |
9f95a23c | 305 | * Set a policy which is applied to all peers who do not have a type-specific |
7c673cae FG |
306 | * Policy. |
307 | * This is an init-time function and cannot be called after calling | |
308 | * start() or bind(). | |
309 | * | |
310 | * @param p The Policy to apply. | |
311 | */ | |
312 | virtual void set_default_policy(Policy p) = 0; | |
313 | /** | |
9f95a23c | 314 | * Set a policy which is applied to all peers of the given type. |
7c673cae FG |
315 | * This is an init-time function and cannot be called after calling |
316 | * start() or bind(). | |
317 | * | |
318 | * @param type The peer type this policy applies to. | |
319 | * @param p The policy to apply. | |
320 | */ | |
321 | virtual void set_policy(int type, Policy p) = 0; | |
322 | /** | |
9f95a23c | 323 | * Get the Policy associated with a type of peer. |
7c673cae FG |
324 | * |
325 | * This can be called either on initial setup, or after connections | |
326 | * are already established. However, the policies for existing | |
327 | * connections will not be affected; the new policy will only apply | |
328 | * to future connections. | |
329 | * | |
330 | * @param t The peer type to get the default policy for. | |
331 | * @return A copy of the Policy for that peer type. | |
332 | */ | |
333 | virtual Policy get_policy(int t) = 0; | |
334 | /** | |
335 | * Get the default Policy | |
336 | * | |
337 | * @return A copy of the default Policy. | |
338 | */ | |
339 | virtual Policy get_default_policy() = 0; | |
340 | /** | |
9f95a23c | 341 | * Set Throttlers applied to all Messages from the given type of peer |
7c673cae FG |
342 | * |
343 | * This is an init-time function and cannot be called after calling | |
344 | * start() or bind(). | |
345 | * | |
346 | * @param type The peer type the Throttlers will apply to. | |
347 | * @param bytes The Throttle for the number of bytes carried by the message | |
348 | * @param msgs The Throttle for the number of messages for this @p type | |
349 | * @note The Messenger does not take ownership of the Throttle pointers, but | |
350 | * you must not destroy them before you destroy the Messenger. | |
351 | */ | |
352 | virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0; | |
353 | /** | |
9f95a23c | 354 | * Set the default send priority |
7c673cae FG |
355 | * |
356 | * This is an init-time function and must be called *before* calling | |
357 | * start(). | |
358 | * | |
359 | * @param p The default send priority to use. | |
360 | */ | |
361 | void set_default_send_priority(int p) { | |
11fdf7f2 | 362 | ceph_assert(!started); |
7c673cae FG |
363 | default_send_priority = p; |
364 | } | |
365 | /** | |
9f95a23c | 366 | * Set the priority (SO_PRIORITY) for all packets to be sent on this socket. |
7c673cae FG |
367 | * |
368 | * Linux uses this value to order the networking queues: packets with a higher | |
369 | * priority may be processed first depending on the selected device queueing | |
370 | * discipline. | |
371 | * | |
372 | * @param prio The priority. Setting a priority outside the range 0 to 6 | |
373 | * requires the CAP_NET_ADMIN capability. | |
374 | */ | |
375 | void set_socket_priority(int prio) { | |
376 | socket_priority = prio; | |
377 | } | |
378 | /** | |
379 | * Get the socket priority | |
380 | * | |
381 | * @return the socket priority | |
382 | */ | |
383 | int get_socket_priority() { | |
384 | return socket_priority; | |
385 | } | |
386 | /** | |
387 | * Add a new Dispatcher to the front of the list. If you add | |
388 | * a Dispatcher which is already included, it will get a duplicate | |
389 | * entry. This will reduce efficiency but not break anything. | |
390 | * | |
391 | * @param d The Dispatcher to insert into the list. | |
392 | */ | |
11fdf7f2 | 393 | void add_dispatcher_head(Dispatcher *d) { |
7c673cae FG |
394 | bool first = dispatchers.empty(); |
395 | dispatchers.push_front(d); | |
396 | if (d->ms_can_fast_dispatch_any()) | |
397 | fast_dispatchers.push_front(d); | |
398 | if (first) | |
399 | ready(); | |
400 | } | |
401 | /** | |
402 | * Add a new Dispatcher to the end of the list. If you add | |
403 | * a Dispatcher which is already included, it will get a duplicate | |
404 | * entry. This will reduce efficiency but not break anything. | |
405 | * | |
406 | * @param d The Dispatcher to insert into the list. | |
407 | */ | |
11fdf7f2 | 408 | void add_dispatcher_tail(Dispatcher *d) { |
7c673cae FG |
409 | bool first = dispatchers.empty(); |
410 | dispatchers.push_back(d); | |
411 | if (d->ms_can_fast_dispatch_any()) | |
412 | fast_dispatchers.push_back(d); | |
413 | if (first) | |
414 | ready(); | |
415 | } | |
416 | /** | |
417 | * Bind the Messenger to a specific address. If bind_addr | |
418 | * is not completely filled in the system will use the | |
419 | * valid portions and cycle through the unset ones (eg, the port) | |
420 | * in an unspecified order. | |
421 | * | |
422 | * @param bind_addr The address to bind to. | |
39ae355f | 423 | * @param public_addrs The addresses to announce over the network |
7c673cae FG |
424 | * @return 0 on success, or -1 on error, or -errno if |
425 | * we can be more specific about the failure. | |
426 | */ | |
39ae355f TL |
427 | virtual int bind(const entity_addr_t& bind_addr, |
428 | std::optional<entity_addrvec_t> public_addrs=std::nullopt) = 0; | |
11fdf7f2 | 429 | |
39ae355f TL |
430 | virtual int bindv(const entity_addrvec_t& bind_addrs, |
431 | std::optional<entity_addrvec_t> public_addrs=std::nullopt); | |
f6b5b4d7 | 432 | |
7c673cae FG |
433 | /** |
434 | * This function performs a full restart of the Messenger component, | |
435 | * whatever that means. Other entities who connect to this | |
436 | * Messenger post-rebind() should perceive it as a new entity which | |
437 | * they have not previously contacted, and it MUST bind to a | |
438 | * different address than it did previously. | |
439 | * | |
440 | * @param avoid_ports Additional ports to avoid binding to. | |
441 | */ | |
9f95a23c | 442 | virtual int rebind(const std::set<int>& avoid_ports) { return -EOPNOTSUPP; } |
7c673cae FG |
443 | /** |
444 | * Bind the 'client' Messenger to a specific address. The Messenger will bind | |
445 | * to that address before connecting to others when option ms_bind_before_connect | |
446 | * is true. | |
447 | * @param bind_addr The address to bind to. | |
448 | * @return 0 on success, or -1 on error, or -errno if | |
f6b5b4d7 | 449 | * we can be more specific about the failure. |
7c673cae FG |
450 | */ |
451 | virtual int client_bind(const entity_addr_t& bind_addr) = 0; | |
11fdf7f2 | 452 | |
f6b5b4d7 TL |
453 | /** |
454 | * reset the 'client' Messenger. Mark all the existing Connections down | |
455 | * and update 'nonce'. | |
456 | */ | |
457 | virtual int client_reset() = 0; | |
11fdf7f2 TL |
458 | |
459 | ||
460 | virtual bool should_use_msgr2() { | |
461 | return false; | |
462 | } | |
463 | ||
7c673cae FG |
464 | /** |
465 | * @} // Configuration | |
466 | */ | |
467 | ||
468 | /** | |
469 | * @defgroup Startup/Shutdown | |
470 | * @{ | |
471 | */ | |
472 | /** | |
473 | * Perform any resource allocation, thread startup, etc | |
474 | * that is required before attempting to connect to other | |
475 | * Messengers or transmit messages. | |
476 | * Once this function completes, started shall be set to true. | |
477 | * | |
478 | * @return 0 on success; -errno on failure. | |
479 | */ | |
480 | virtual int start() { started = true; return 0; } | |
481 | ||
482 | // shutdown | |
483 | /** | |
484 | * Block until the Messenger has finished shutting down (according | |
485 | * to the shutdown() function). | |
486 | * It is valid to call this after calling shutdown(), but it must | |
487 | * be called before deleting the Messenger. | |
488 | */ | |
489 | virtual void wait() = 0; | |
490 | /** | |
491 | * Initiate a shutdown of the Messenger. | |
492 | * | |
493 | * @return 0 on success, -errno otherwise. | |
494 | */ | |
495 | virtual int shutdown() { started = false; return 0; } | |
496 | /** | |
497 | * @} // Startup/Shutdown | |
498 | */ | |
499 | ||
500 | /** | |
501 | * @defgroup Messaging | |
502 | * @{ | |
503 | */ | |
504 | /** | |
505 | * Queue the given Message for the given entity. | |
506 | * Success in this function does not guarantee Message delivery, only | |
507 | * success in queueing the Message. Other guarantees may be provided based | |
508 | * on the Connection policy associated with the dest. | |
509 | * | |
510 | * @param m The Message to send; the Messenger consumes a single reference to it. | |
511 | * @param type The entity type of the destination (e.g. CEPH_ENTITY_TYPE_MON). | |
512 | * @param addr The addresses of the entity to send the Message to. | |
513 | * | |
514 | * DEPRECATED: please do not use this interface for any new code; | |
515 | * use the Connection* variant. | |
516 | * | |
517 | * @return 0 on success, or -errno on failure. | |
518 | */ | |
11fdf7f2 TL |
519 | virtual int send_to( |
520 | Message *m, | |
521 | int type, | |
522 | const entity_addrvec_t& addr) = 0; | |
523 | int send_to_mon( | |
524 | Message *m, const entity_addrvec_t& addrs) { | |
525 | return send_to(m, CEPH_ENTITY_TYPE_MON, addrs); | |
526 | } | |
527 | int send_to_mds( | |
528 | Message *m, const entity_addrvec_t& addrs) { | |
529 | return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs); | |
530 | } | |
7c673cae FG |
531 | |
532 | /** | |
533 | * @} // Messaging | |
534 | */ | |
535 | /** | |
536 | * @defgroup Connection Management | |
537 | * @{ | |
538 | */ | |
539 | /** | |
540 | * Get the Connection object associated with a given entity. If a | |
541 | * Connection does not exist, create one and establish a logical connection. | |
542 | * The caller owns a reference when this returns. Call ->put() when you're | |
543 | * done! | |
544 | * | |
545 | * @param dest The entity to get a connection for. | |
546 | */ | |
11fdf7f2 | 547 | virtual ConnectionRef connect_to( |
9f95a23c TL |
548 | int type, const entity_addrvec_t& dest, |
549 | bool anon=false, bool not_local_dest=false) = 0; | |
550 | ConnectionRef connect_to_mon(const entity_addrvec_t& dest, | |
551 | bool anon=false, bool not_local_dest=false) { | |
552 | return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest); | |
11fdf7f2 | 553 | } |
9f95a23c TL |
554 | ConnectionRef connect_to_mds(const entity_addrvec_t& dest, |
555 | bool anon=false, bool not_local_dest=false) { | |
556 | return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest); | |
11fdf7f2 | 557 | } |
9f95a23c TL |
558 | ConnectionRef connect_to_osd(const entity_addrvec_t& dest, |
559 | bool anon=false, bool not_local_dest=false) { | |
560 | return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest); | |
11fdf7f2 | 561 | } |
9f95a23c TL |
562 | ConnectionRef connect_to_mgr(const entity_addrvec_t& dest, |
563 | bool anon=false, bool not_local_dest=false) { | |
564 | return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest); | |
11fdf7f2 TL |
565 | } |
566 | ||
7c673cae FG |
567 | /** |
568 | * Get the Connection object associated with ourselves. | |
569 | */ | |
570 | virtual ConnectionRef get_loopback_connection() = 0; | |
571 | /** | |
572 | * Mark down a Connection to a remote. | |
573 | * | |
574 | * This will cause us to discard our outgoing queue for them, and if | |
575 | * reset detection is enabled in the policy and the endpoint tries | |
576 | * to reconnect they will discard their queue when we inform them of | |
577 | * the session reset. | |
578 | * | |
579 | * If there is no Connection to the given dest, it is a no-op. | |
580 | * | |
581 | * This generates a RESET notification to the Dispatcher. | |
582 | * | |
583 | * DEPRECATED: please do not use this interface for any new code; | |
584 | * use the Connection* variant. | |
585 | * | |
586 | * @param a The address to mark down. | |
587 | */ | |
588 | virtual void mark_down(const entity_addr_t& a) = 0; | |
11fdf7f2 TL |
589 | virtual void mark_down_addrs(const entity_addrvec_t& a) { |
590 | mark_down(a.legacy_addr()); | |
591 | } | |
7c673cae FG |
592 | /** |
593 | * Mark all the existing Connections down. This is equivalent | |
594 | * to iterating over all Connections and calling mark_down() | |
595 | * on each. | |
596 | * | |
597 | * This will generate a RESET event for each closed connection. | |
598 | */ | |
599 | virtual void mark_down_all() = 0; | |
600 | /** | |
601 | * @} // Connection Management | |
602 | */ | |
603 | protected: | |
604 | /** | |
605 | * @defgroup Subclass Interfacing | |
606 | * @{ | |
607 | */ | |
608 | /** | |
609 | * A courtesy function for Messenger implementations which | |
610 | * will be called when we receive our first Dispatcher. | |
611 | */ | |
612 | virtual void ready() { } | |
613 | /** | |
614 | * @} // Subclass Interfacing | |
615 | */ | |
3efd9988 FG |
616 | public: |
617 | #ifdef CEPH_USE_SIGPIPE_BLOCKER | |
618 | /** | |
619 | * We need to disable SIGPIPE on all platforms, and if they | |
620 | * don't give us a better mechanism (read: are on Solaris) that | |
621 | * means blocking the signal whenever we do a send or sendmsg... | |
622 | * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope | |
623 | * whenever doing so. On most systems that's blank, but on systems where | |
624 | * it's needed we construct an RAII object to plug and un-plug the SIGPIPE. | |
625 | * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html | |
626 | */ | |
627 | struct sigpipe_stopper { // RAII guard: blocks SIGPIPE for the calling thread while in scope | |
628 | bool blocked; // true if this object blocked SIGPIPE and must restore the mask | |
629 | sigset_t existing_mask; | |
630 | sigset_t pipe_mask; | |
631 | sigpipe_stopper() { | |
632 | sigemptyset(&pipe_mask); | |
633 | sigaddset(&pipe_mask, SIGPIPE); | |
634 | sigset_t signals; | |
635 | sigemptyset(&signals); | |
636 | sigpending(&signals); | |
637 | if (sigismember(&signals, SIGPIPE)) { // a SIGPIPE is already pending: leave the mask alone | |
638 | blocked = false; | |
639 | } else { | |
640 | blocked = true; | |
641 | int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask); | |
11fdf7f2 | 642 | ceph_assert(r == 0); |
3efd9988 FG |
643 | } | |
644 | } | |
645 | ~sigpipe_stopper() { | |
646 | if (blocked) { | |
647 | struct timespec nowait{0}; | |
648 | int r = sigtimedwait(&pipe_mask, 0, &nowait); // consume any SIGPIPE raised while blocked | |
11fdf7f2 | 649 | ceph_assert(r == SIGPIPE || (r == -1 && errno == EAGAIN)); // signal number on success; -1 with EAGAIN if none was pending |
3efd9988 | 650 | r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0); |
11fdf7f2 | 651 | ceph_assert(r == 0); |
3efd9988 FG |
652 | } | |
653 | } | |
654 | }; | |
655 | # define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper; /* no "()": that would declare a function, not an object */ | |
656 | #else | |
657 | # define MSGR_SIGPIPE_STOPPER | |
658 | #endif | |
7c673cae FG |
659 | /** |
660 | * @defgroup Dispatcher Interfacing | |
661 | * @{ | |
662 | */ | |
7c673cae FG |
663 | /** |
664 | * Determine whether a message can be fast-dispatched. We will | |
665 | * query each Dispatcher in sequence to determine if they are | |
666 | * capable of handling a particular message via "fast dispatch". | |
667 | * | |
668 | * @param m The Message we are testing. | |
669 | */ | |
f67539c2 | 670 | bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) { |
11fdf7f2 TL |
671 | for (const auto &dispatcher : fast_dispatchers) { |
672 | if (dispatcher->ms_can_fast_dispatch2(m)) | |
7c673cae FG |
673 | return true; |
674 | } | |
675 | return false; | |
676 | } | |
677 | ||
678 | /** | |
679 | * Deliver a single Message via "fast dispatch". | |
680 | * | |
11fdf7f2 | 681 | * @param m The Message we are fast dispatching. |
7c673cae FG |
682 | * If none of our Dispatchers can handle it, ceph_abort(). |
683 | */ | |
f67539c2 | 684 | void ms_fast_dispatch(const ceph::ref_t<Message> &m) { |
7c673cae | 685 | m->set_dispatch_stamp(ceph_clock_now()); |
11fdf7f2 TL |
686 | for (const auto &dispatcher : fast_dispatchers) { |
687 | if (dispatcher->ms_can_fast_dispatch2(m)) { | |
688 | dispatcher->ms_fast_dispatch2(m); | |
7c673cae FG |
689 | return; |
690 | } | |
691 | } | |
692 | ceph_abort(); | |
693 | } | |
11fdf7f2 | 694 | void ms_fast_dispatch(Message *m) { |
f67539c2 | 695 | return ms_fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */ |
11fdf7f2 | 696 | } |
7c673cae FG |
697 | /** |
698 | * Let every fast Dispatcher preprocess the given Message. | |
699 | */ | |
f67539c2 | 700 | void ms_fast_preprocess(const ceph::ref_t<Message> &m) { |
11fdf7f2 TL |
701 | for (const auto &dispatcher : fast_dispatchers) { |
702 | dispatcher->ms_fast_preprocess2(m); | |
7c673cae FG |
703 | } |
704 | } | |
705 | /** | |
706 | * Deliver a single Message. Send it to each Dispatcher | |
707 | * in sequence until one of them handles it. | |
11fdf7f2 | 708 | * If none of them handles it, log it and assert if ms_die_on_unhandled_msg is set. |
7c673cae | 709 | * |
11fdf7f2 | 710 | * @param m The Message to deliver. |
7c673cae | 711 | */ |
f67539c2 | 712 | void ms_deliver_dispatch(const ceph::ref_t<Message> &m) { |
7c673cae | 713 | m->set_dispatch_stamp(ceph_clock_now()); |
11fdf7f2 TL |
714 | for (const auto &dispatcher : dispatchers) { |
715 | if (dispatcher->ms_dispatch2(m)) | |
7c673cae FG |
716 | return; |
717 | } | |
718 | lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " | |
719 | << m->get_source_inst() << dendl; | |
11fdf7f2 TL |
720 | ceph_assert(!cct->_conf->ms_die_on_unhandled_msg); |
721 | } | |
722 | void ms_deliver_dispatch(Message *m) { | |
f67539c2 | 723 | return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */ |
7c673cae FG |
724 | } |
725 | /** | |
726 | * Notify each Dispatcher of a new Connection. Call | |
727 | * this function whenever a new Connection is initiated or | |
728 | * reconnects. | |
729 | * | |
730 | * @param con Pointer to the new Connection. | |
731 | */ | |
732 | void ms_deliver_handle_connect(Connection *con) { | |
11fdf7f2 TL |
733 | for (const auto& dispatcher : dispatchers) { |
734 | dispatcher->ms_handle_connect(con); | |
735 | } | |
7c673cae FG |
736 | } |
737 | ||
738 | /** | |
739 | * Notify each fast Dispatcher of a new Connection. Call | |
740 | * this function whenever a new Connection is initiated or | |
741 | * reconnects. | |
742 | * | |
743 | * @param con Pointer to the new Connection. | |
744 | */ | |
745 | void ms_deliver_handle_fast_connect(Connection *con) { | |
11fdf7f2 TL |
746 | for (const auto& dispatcher : fast_dispatchers) { |
747 | dispatcher->ms_handle_fast_connect(con); | |
748 | } | |
7c673cae FG |
749 | } |
750 | ||
751 | /** | |
11fdf7f2 | 752 | * Notify each Dispatcher of a new incoming Connection. Call |
7c673cae FG |
753 | * this function whenever a new Connection is accepted. |
754 | * | |
755 | * @param con Pointer to the new Connection. | |
756 | */ | |
757 | void ms_deliver_handle_accept(Connection *con) { | |
11fdf7f2 TL |
758 | for (const auto& dispatcher : dispatchers) { |
759 | dispatcher->ms_handle_accept(con); | |
760 | } | |
7c673cae FG |
761 | } |
762 | ||
763 | /** | |
764 | * Notify each fast Dispatcher of a new incoming Connection. Call | |
765 | * this function whenever a new Connection is accepted. | |
766 | * | |
767 | * @param con Pointer to the new Connection. | |
768 | */ | |
769 | void ms_deliver_handle_fast_accept(Connection *con) { | |
11fdf7f2 TL |
770 | for (const auto& dispatcher : fast_dispatchers) { |
771 | dispatcher->ms_handle_fast_accept(con); | |
772 | } | |
7c673cae FG |
773 | } |
774 | ||
775 | /** | |
776 | * Notify each Dispatcher of a Connection which may have lost | |
777 | * Messages. Call this function whenever you detect that a lossy Connection | |
778 | * has been disconnected. | |
779 | * | |
780 | * @param con Pointer to the broken Connection. | |
781 | */ | |
782 | void ms_deliver_handle_reset(Connection *con) { | |
11fdf7f2 TL |
783 | for (const auto& dispatcher : dispatchers) { |
784 | if (dispatcher->ms_handle_reset(con)) | |
7c673cae FG |
785 | return; |
786 | } | |
787 | } | |
788 | /** | |
789 | * Notify each Dispatcher of a Connection which has been "forgotten" about | |
790 | * by the remote end, implying that messages have probably been lost. | |
791 | * Call this function whenever you detect a reset. | |
792 | * | |
793 | * @param con Pointer to the broken Connection. | |
794 | */ | |
795 | void ms_deliver_handle_remote_reset(Connection *con) { | |
11fdf7f2 TL |
796 | for (const auto& dispatcher : dispatchers) { |
797 | dispatcher->ms_handle_remote_reset(con); | |
798 | } | |
7c673cae FG |
799 | } |
800 | ||
801 | /** | |
802 | * Notify each Dispatcher of a Connection for which reconnection | |
803 | * attempts are being refused. Call this function whenever you | |
804 | * detect that a lossy Connection has been disconnected and it's | |
805 | * impossible to reconnect. | |
806 | * | |
807 | * @param con Pointer to the broken Connection. | |
808 | */ | |
809 | void ms_deliver_handle_refused(Connection *con) { | |
11fdf7f2 TL |
810 | for (const auto& dispatcher : dispatchers) { |
811 | if (dispatcher->ms_handle_refused(con)) | |
7c673cae FG |
812 | return; |
813 | } | |
814 | } | |
815 | ||
9f95a23c TL |
816 | void set_require_authorizer(bool b) { |
817 | require_authorizer = b; | |
7c673cae | 818 | } |
7c673cae FG |
819 | |
820 | /** | |
821 | * @} // Dispatcher Interfacing | |
822 | */ | |
823 | }; | |
824 | ||
825 | ||
826 | ||
827 | #endif |