]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
7c673cae FG |
2 | // vim: ts=8 sw=2 smarttab |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
11fdf7f2 | 10 | * License version 2.1, as published by the Free Software |
7c673cae | 11 | * Foundation. See file COPYING. |
11fdf7f2 | 12 | * |
7c673cae FG |
13 | */ |
14 | ||
15 | ||
16 | ||
17 | #ifndef CEPH_MESSENGER_H | |
18 | #define CEPH_MESSENGER_H | |
19 | ||
20 | #include <map> | |
11fdf7f2 TL |
21 | #include <deque> |
22 | ||
23 | #include <errno.h> | |
24 | #include <sstream> | |
25 | #include <memory> | |
7c673cae FG |
26 | |
27 | #include "Message.h" | |
28 | #include "Dispatcher.h" | |
11fdf7f2 | 29 | #include "Policy.h" |
11fdf7f2 | 30 | #include "common/Throttle.h" |
7c673cae FG |
31 | #include "include/Context.h" |
32 | #include "include/types.h" | |
33 | #include "include/ceph_features.h" | |
34 | #include "auth/Crypto.h" | |
11fdf7f2 TL |
35 | #include "common/item_history.h" |
36 | #include "auth/AuthRegistry.h" | |
20effc67 | 37 | #include "compressor_registry.h" |
11fdf7f2 | 38 | #include "include/ceph_assert.h" |
7c673cae FG |
39 | |
40 | #include <errno.h> | |
41 | #include <sstream> | |
3efd9988 | 42 | #include <signal.h> |
7c673cae FG |
43 | |
44 | #define SOCKET_PRIORITY_MIN_DELAY 6 | |
45 | ||
46 | class Timer; | |
47 | ||
11fdf7f2 TL |
48 | class AuthClient; |
49 | class AuthServer; | |
50 | ||
51 | #ifdef UNIT_TESTS_BUILT | |
52 | ||
53 | struct Interceptor { | |
54 | std::mutex lock; | |
55 | std::condition_variable cond_var; | |
56 | ||
57 | enum ACTION : uint32_t { | |
58 | CONTINUE = 0, | |
59 | FAIL, | |
60 | STOP | |
61 | }; | |
62 | ||
f67539c2 TL |
63 | enum STEP { |
64 | START_CLIENT_BANNER_EXCHANGE = 1, | |
65 | START_SERVER_BANNER_EXCHANGE, | |
66 | BANNER_EXCHANGE_BANNER_CONNECTING, | |
67 | BANNER_EXCHANGE, | |
68 | HANDLE_PEER_BANNER_BANNER_CONNECTING, | |
69 | HANDLE_PEER_BANNER, | |
70 | HANDLE_PEER_BANNER_PAYLOAD_HELLO_CONNECTING, | |
71 | HANDLE_PEER_BANNER_PAYLOAD, | |
72 | SEND_AUTH_REQUEST, | |
73 | HANDLE_AUTH_REQUEST_ACCEPTING_SIGN, | |
74 | SEND_CLIENT_IDENTITY, | |
75 | SEND_SERVER_IDENTITY, | |
76 | SEND_RECONNECT, | |
77 | SEND_RECONNECT_OK, | |
78 | READY, | |
79 | HANDLE_MESSAGE, | |
80 | READ_MESSAGE_COMPLETE, | |
20effc67 TL |
81 | SESSION_RETRY, |
82 | SEND_COMPRESSION_REQUEST, | |
83 | HANDLE_COMPRESSION_REQUEST | |
f67539c2 TL |
84 | }; |
85 | ||
11fdf7f2 TL |
86 | virtual ~Interceptor() {} |
87 | virtual ACTION intercept(Connection *conn, uint32_t step) = 0; | |
88 | }; | |
89 | ||
90 | #endif | |
7c673cae FG |
91 | |
92 | class Messenger { | |
93 | private: | |
11fdf7f2 TL |
94 | std::deque<Dispatcher*> dispatchers; |
95 | std::deque<Dispatcher*> fast_dispatchers; | |
7c673cae FG |
96 | ZTracer::Endpoint trace_endpoint; |
97 | ||
11fdf7f2 | 98 | protected: |
7c673cae FG |
99 | void set_endpoint_addr(const entity_addr_t& a, |
100 | const entity_name_t &name); | |
101 | ||
102 | protected: | |
103 | /// the "name" of the local daemon. eg client.99 | |
11fdf7f2 TL |
104 | entity_name_t my_name; |
105 | ||
106 | /// my addr | |
107 | safe_item_history<entity_addrvec_t> my_addrs; | |
108 | ||
7c673cae | 109 | int default_send_priority; |
9f95a23c | 110 | /// set to true once the Messenger has started, and set to false on shutdown |
7c673cae FG |
111 | bool started; |
112 | uint32_t magic; | |
113 | int socket_priority; | |
114 | ||
115 | public: | |
11fdf7f2 TL |
116 | AuthClient *auth_client = 0; |
117 | AuthServer *auth_server = 0; | |
118 | ||
119 | #ifdef UNIT_TESTS_BUILT | |
120 | Interceptor *interceptor = nullptr; | |
121 | #endif | |
122 | ||
7c673cae FG |
123 | /** |
124 | * The CephContext this Messenger uses. Many other components initialize themselves | |
125 | * from this value. | |
126 | */ | |
127 | CephContext *cct; | |
128 | int crcflags; | |
129 | ||
11fdf7f2 | 130 | using Policy = ceph::net::Policy<Throttle>; |
7c673cae | 131 | |
9f95a23c TL |
132 | public: |
133 | // allow unauthenticated connections. This is needed for | |
134 | // compatibility with pre-nautilus OSDs, which do not authenticate | |
135 | // the heartbeat sessions. | |
136 | bool require_authorizer = true; | |
137 | ||
11fdf7f2 TL |
138 | protected: |
139 | // for authentication | |
140 | AuthRegistry auth_registry; | |
141 | ||
142 | public: | |
7c673cae FG |
143 | /** |
144 | * Messenger constructor. Call this from your implementation. | |
145 | * Messenger users should construct full implementations directly, | |
146 | * or use the create() function. | |
147 | */ | |
11fdf7f2 | 148 | Messenger(CephContext *cct_, entity_name_t w); |
7c673cae FG |
149 | virtual ~Messenger() {} |
150 | ||
151 | /** | |
152 | * create a new messenger | |
153 | * | |
154 | * Create a new messenger instance, with whatever implementation is | |
155 | * available or specified via the configuration in cct. | |
156 | * | |
157 | * @param cct context | |
158 | * @param type name of messenger type | |
159 | * @param name entity name to register | |
160 | * @param lname logical name of the messenger in this process (e.g., "client") | |
161 | * @param nonce nonce value to uniquely identify this instance on the current host | |
7c673cae FG |
162 | */ |
163 | static Messenger *create(CephContext *cct, | |
9f95a23c | 164 | const std::string &type, |
7c673cae | 165 | entity_name_t name, |
9f95a23c | 166 | std::string lname, |
f67539c2 | 167 | uint64_t nonce); |
7c673cae | 168 | |
9f95a23c TL |
169 | static uint64_t get_random_nonce(); |
170 | static uint64_t get_pid_nonce(); | |
171 | ||
7c673cae FG |
172 | /** |
173 | * create a new messenger | |
174 | * | |
175 | * Create a new messenger instance. | |
176 | * Same as the above, but a slightly simpler interface for clients: | |
177 | * - Generate a random nonce | |
7c673cae FG |
178 | * - get the messenger type from cct |
179 | * - use the client entity_type | |
180 | * | |
181 | * @param cct context | |
182 | * @param lname logical name of the messenger in this process (e.g., "client") | |
183 | */ | |
9f95a23c | 184 | static Messenger *create_client_messenger(CephContext *cct, std::string lname); |
7c673cae FG |
185 | |
186 | /** | |
187 | * @defgroup Accessors | |
188 | * @{ | |
189 | */ | |
11fdf7f2 TL |
190 | int get_mytype() const { return my_name.type(); } |
191 | ||
7c673cae | 192 | /** |
11fdf7f2 | 193 | * Retrieve the Messenger's name |
7c673cae | 194 | * |
11fdf7f2 | 195 | * @return A const reference to the name this Messenger |
7c673cae FG |
196 | * currently believes to be its own. |
197 | */ | |
11fdf7f2 | 198 | const entity_name_t& get_myname() { return my_name; } |
7c673cae FG |
199 | |
200 | /** | |
201 | * Retrieve the Messenger's address. | |
202 | * | |
203 | * @return A const reference to the address this Messenger | |
204 | * currently believes to be its own. | |
205 | */ | |
11fdf7f2 TL |
206 | const entity_addrvec_t& get_myaddrs() { |
207 | return *my_addrs; | |
208 | } | |
209 | ||
210 | /** | |
211 | * get legacy addr for myself, suitable for protocol v1 | |
212 | * | |
213 | * Note that myaddrs might be a proper addrvec with v1 in it, or it might be an | |
214 | * ANY addr (if i am a pure client). | |
215 | */ | |
216 | entity_addr_t get_myaddr_legacy() { | |
217 | return my_addrs->as_legacy_addr(); | |
218 | } | |
219 | ||
220 | ||
221 | /** | |
9f95a23c | 222 | * std::set messenger's instance |
11fdf7f2 TL |
223 | */ |
224 | uint32_t get_magic() { return magic; } | |
225 | void set_magic(int _magic) { magic = _magic; } | |
226 | ||
227 | void set_auth_client(AuthClient *ac) { | |
228 | auth_client = ac; | |
229 | } | |
230 | void set_auth_server(AuthServer *as) { | |
231 | auth_server = as; | |
232 | } | |
233 | ||
20effc67 TL |
234 | // for compression |
235 | CompressorRegistry comp_registry; | |
236 | ||
7c673cae FG |
237 | protected: |
238 | /** | |
9f95a23c | 239 | * std::set messenger's address |
7c673cae | 240 | */ |
11fdf7f2 TL |
241 | virtual void set_myaddrs(const entity_addrvec_t& a) { |
242 | my_addrs = a; | |
243 | set_endpoint_addr(a.front(), my_name); | |
7c673cae FG |
244 | } |
245 | public: | |
246 | /** | |
247 | * @return the zipkin trace endpoint | |
248 | */ | |
249 | const ZTracer::Endpoint* get_trace_endpoint() const { | |
250 | return &trace_endpoint; | |
251 | } | |
252 | ||
7c673cae | 253 | /** |
9f95a23c | 254 | * set the name of the local entity. The name is reported to others and |
7c673cae FG |
255 | * can be changed while the system is running, but doing so at incorrect |
256 | * times may have bad results. | |
257 | * | |
9f95a23c | 258 | * @param m The name to std::set. |
7c673cae | 259 | */ |
11fdf7f2 TL |
260 | void set_myname(const entity_name_t& m) { my_name = m; } |
261 | ||
7c673cae | 262 | /** |
9f95a23c | 263 | * set the unknown address components for this Messenger. |
7c673cae FG |
264 | * This is useful if the Messenger doesn't know its full address just by |
265 | * binding, but another Messenger on the same interface has already learned | |
266 | * its full address. This function does not fill in known address elements, | |
267 | * cause a rebind, or do anything of that sort. | |
268 | * | |
269 | * @param addrs The addresses to use as a template. | |
270 | */ | |
11fdf7f2 | 271 | virtual bool set_addr_unknowns(const entity_addrvec_t &addrs) = 0; |
224ce89b | 272 | /** |
9f95a23c | 273 | * set the address for this Messenger. This is useful if the Messenger |
224ce89b WB |
274 | * binds to a specific address but advertises a different address on the |
275 | * network. | |
276 | * | |
277 | * @param addr The address to use. | |
278 | */ | |
11fdf7f2 | 279 | virtual void set_addrs(const entity_addrvec_t &addr) = 0; |
7c673cae FG |
280 | /// Get the default send priority. |
281 | int get_default_send_priority() { return default_send_priority; } | |
282 | /** | |
283 | * Get the number of Messages which the Messenger has received | |
284 | * but not yet dispatched. | |
285 | */ | |
286 | virtual int get_dispatch_queue_len() = 0; | |
287 | ||
288 | /** | |
289 | * Get age of oldest undelivered message | |
290 | * (0 if the queue is empty) | |
291 | */ | |
292 | virtual double get_dispatch_queue_max_age(utime_t now) = 0; | |
7c673cae FG |
293 | |
294 | /** | |
295 | * @} // Accessors | |
296 | */ | |
297 | ||
298 | /** | |
299 | * @defgroup Configuration | |
300 | * @{ | |
301 | */ | |
302 | /** | |
9f95a23c | 303 | * set the cluster protocol in use by this daemon. |
7c673cae FG |
304 | * This is an init-time function and cannot be called after calling |
305 | * start() or bind(). | |
306 | * | |
307 | * @param p The cluster protocol to use. Defined externally. | |
308 | */ | |
309 | virtual void set_cluster_protocol(int p) = 0; | |
310 | /** | |
9f95a23c | 311 | * set a policy which is applied to all peers who do not have a type-specific |
7c673cae FG |
312 | * Policy. |
313 | * This is an init-time function and cannot be called after calling | |
314 | * start() or bind(). | |
315 | * | |
316 | * @param p The Policy to apply. | |
317 | */ | |
318 | virtual void set_default_policy(Policy p) = 0; | |
319 | /** | |
9f95a23c | 320 | * set a policy which is applied to all peers of the given type. |
7c673cae FG |
321 | * This is an init-time function and cannot be called after calling |
322 | * start() or bind(). | |
323 | * | |
324 | * @param type The peer type this policy applies to. | |
325 | * @param p The policy to apply. | |
326 | */ | |
327 | virtual void set_policy(int type, Policy p) = 0; | |
328 | /** | |
9f95a23c | 329 | * Get the Policy associated with a type of peer. |
7c673cae FG |
330 | * |
331 | * This can be called either on initial setup, or after connections | |
332 | * are already established. However, the policies for existing | |
333 | * connections will not be affected; the new policy will only apply | |
334 | * to future connections. | |
335 | * | |
336 | * @param t The peer type to get the default policy for. | |
337 | * @return A copy of the Policy for peers of type @p t. | |
338 | */ | |
339 | virtual Policy get_policy(int t) = 0; | |
340 | /** | |
341 | * Get the default Policy | |
342 | * | |
343 | * @return A copy of the default Policy. | |
344 | */ | |
345 | virtual Policy get_default_policy() = 0; | |
346 | /** | |
9f95a23c | 347 | * set Throttlers applied to all Messages from the given type of peer |
7c673cae FG |
348 | * |
349 | * This is an init-time function and cannot be called after calling | |
350 | * start() or bind(). | |
351 | * | |
352 | * @param type The peer type the Throttlers will apply to. | |
353 | * @param bytes The Throttle for the number of bytes carried by the message | |
354 | * @param msgs The Throttle for the number of messages for this @p type | |
355 | * @note The Messenger does not take ownership of the Throttle pointers, but | |
356 | * you must not destroy them before you destroy the Messenger. | |
357 | */ | |
358 | virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0; | |
359 | /** | |
9f95a23c | 360 | * set the default send priority |
7c673cae FG |
361 | * |
362 | * This is an init-time function and must be called *before* calling | |
363 | * start(). | |
364 | * | |
365 | * @param p The cluster protocol to use. Defined externally. | |
366 | */ | |
367 | void set_default_send_priority(int p) { | |
11fdf7f2 | 368 | ceph_assert(!started); |
7c673cae FG |
369 | default_send_priority = p; |
370 | } | |
371 | /** | |
9f95a23c | 372 | * set the priority(SO_PRIORITY) for all packets to be sent on this socket. |
7c673cae FG |
373 | * |
374 | * Linux uses this value to order the networking queues: packets with a higher | |
375 | * priority may be processed first depending on the selected device queueing | |
376 | * discipline. | |
377 | * | |
378 | * @param prio The priority. Setting a priority outside the range 0 to 6 | |
379 | * requires the CAP_NET_ADMIN capability. | |
380 | */ | |
381 | void set_socket_priority(int prio) { | |
382 | socket_priority = prio; | |
383 | } | |
384 | /** | |
385 | * Get the socket priority | |
386 | * | |
387 | * @return the socket priority | |
388 | */ | |
389 | int get_socket_priority() { | |
390 | return socket_priority; | |
391 | } | |
392 | /** | |
393 | * Add a new Dispatcher to the front of the list. If you add | |
394 | * a Dispatcher which is already included, it will get a duplicate | |
395 | * entry. This will reduce efficiency but not break anything. | |
396 | * | |
397 | * @param d The Dispatcher to insert into the list. | |
398 | */ | |
11fdf7f2 | 399 | void add_dispatcher_head(Dispatcher *d) { |
7c673cae FG |
400 | bool first = dispatchers.empty(); |
401 | dispatchers.push_front(d); | |
402 | if (d->ms_can_fast_dispatch_any()) | |
403 | fast_dispatchers.push_front(d); | |
404 | if (first) | |
405 | ready(); | |
406 | } | |
407 | /** | |
408 | * Add a new Dispatcher to the end of the list. If you add | |
409 | * a Dispatcher which is already included, it will get a duplicate | |
410 | * entry. This will reduce efficiency but not break anything. | |
411 | * | |
412 | * @param d The Dispatcher to insert into the list. | |
413 | */ | |
11fdf7f2 | 414 | void add_dispatcher_tail(Dispatcher *d) { |
7c673cae FG |
415 | bool first = dispatchers.empty(); |
416 | dispatchers.push_back(d); | |
417 | if (d->ms_can_fast_dispatch_any()) | |
418 | fast_dispatchers.push_back(d); | |
419 | if (first) | |
420 | ready(); | |
421 | } | |
422 | /** | |
423 | * Bind the Messenger to a specific address. If bind_addr | |
424 | * is not completely filled in the system will use the | |
425 | * valid portions and cycle through the unset ones (eg, the port) | |
426 | * in an unspecified order. | |
427 | * | |
428 | * @param bind_addr The address to bind to. | |
429 | * @return 0 on success, or -1 on error, or -errno if | |
430 | * we can be more specific about the failure. | |
431 | */ | |
432 | virtual int bind(const entity_addr_t& bind_addr) = 0; | |
11fdf7f2 | 433 | |
f6b5b4d7 TL |
434 | virtual int bindv(const entity_addrvec_t& addrs); |
435 | ||
7c673cae FG |
436 | /** |
437 | * This function performs a full restart of the Messenger component, | |
438 | * whatever that means. Other entities who connect to this | |
439 | * Messenger post-rebind() should perceive it as a new entity which | |
440 | * they have not previously contacted, and it MUST bind to a | |
441 | * different address than it did previously. | |
442 | * | |
443 | * @param avoid_ports Additional port to avoid binding to. | |
444 | */ | |
9f95a23c | 445 | virtual int rebind(const std::set<int>& avoid_ports) { return -EOPNOTSUPP; } |
7c673cae FG |
446 | /** |
447 | * Bind the 'client' Messenger to a specific address. The Messenger will bind |
448 | * the address before connecting to others when option ms_bind_before_connect |
449 | * is true. | |
450 | * @param bind_addr The address to bind to. | |
451 | * @return 0 on success, or -1 on error, or -errno if | |
f6b5b4d7 | 452 | * we can be more specific about the failure. |
7c673cae FG |
453 | */ |
454 | virtual int client_bind(const entity_addr_t& bind_addr) = 0; | |
11fdf7f2 | 455 | |
f6b5b4d7 TL |
456 | /** |
457 | * reset the 'client' Messenger. Mark all the existing Connections down | |
458 | * and update 'nonce'. | |
459 | */ | |
460 | virtual int client_reset() = 0; | |
11fdf7f2 TL |
461 | |
462 | ||
463 | virtual bool should_use_msgr2() { | |
464 | return false; | |
465 | } | |
466 | ||
7c673cae FG |
467 | /** |
468 | * @} // Configuration | |
469 | */ | |
470 | ||
471 | /** | |
472 | * @defgroup Startup/Shutdown | |
473 | * @{ | |
474 | */ | |
475 | /** | |
476 | * Perform any resource allocation, thread startup, etc | |
477 | * that is required before attempting to connect to other | |
478 | * Messengers or transmit messages. | |
479 | * Once this function completes, started shall be set to true. | |
480 | * | |
481 | * @return 0 on success; -errno on failure. | |
482 | */ | |
483 | virtual int start() { started = true; return 0; } | |
484 | ||
485 | // shutdown | |
486 | /** | |
487 | * Block until the Messenger has finished shutting down (according | |
488 | * to the shutdown() function). | |
489 | * It is valid to call this after calling shutdown(), but it must | |
490 | * be called before deleting the Messenger. | |
491 | */ | |
492 | virtual void wait() = 0; | |
493 | /** | |
494 | * Initiate a shutdown of the Messenger. | |
495 | * | |
496 | * @return 0 on success, -errno otherwise. | |
497 | */ | |
498 | virtual int shutdown() { started = false; return 0; } | |
499 | /** | |
500 | * @} // Startup/Shutdown | |
501 | */ | |
502 | ||
503 | /** | |
504 | * @defgroup Messaging | |
505 | * @{ | |
506 | */ | |
507 | /** | |
508 | * Queue the given Message for the given entity. | |
509 | * Success in this function does not guarantee Message delivery, only | |
510 | * success in queueing the Message. Other guarantees may be provided based | |
511 | * on the Connection policy associated with the dest. | |
512 | * | |
513 | * @param m The Message to send. The Messenger consumes a single reference | |
514 | * when you pass it in. | |
515 | * @param dest The entity to send the Message to. | |
516 | * | |
517 | * DEPRECATED: please do not use this interface for any new code; | |
518 | * use the Connection* variant. | |
519 | * | |
520 | * @return 0 on success, or -errno on failure. | |
521 | */ | |
11fdf7f2 TL |
522 | virtual int send_to( |
523 | Message *m, | |
524 | int type, | |
525 | const entity_addrvec_t& addr) = 0; | |
526 | int send_to_mon( | |
527 | Message *m, const entity_addrvec_t& addrs) { | |
528 | return send_to(m, CEPH_ENTITY_TYPE_MON, addrs); | |
529 | } | |
530 | int send_to_mds( | |
531 | Message *m, const entity_addrvec_t& addrs) { | |
532 | return send_to(m, CEPH_ENTITY_TYPE_MDS, addrs); | |
533 | } | |
7c673cae FG |
534 | |
535 | /** | |
536 | * @} // Messaging | |
537 | */ | |
538 | /** | |
539 | * @defgroup Connection Management | |
540 | * @{ | |
541 | */ | |
542 | /** | |
543 | * Get the Connection object associated with a given entity. If a | |
544 | * Connection does not exist, create one and establish a logical connection. | |
545 | * The caller owns a reference when this returns. Call ->put() when you're | |
546 | * done! | |
547 | * | |
548 | * @param dest The entity to get a connection for. | |
549 | */ | |
11fdf7f2 | 550 | virtual ConnectionRef connect_to( |
9f95a23c TL |
551 | int type, const entity_addrvec_t& dest, |
552 | bool anon=false, bool not_local_dest=false) = 0; | |
553 | ConnectionRef connect_to_mon(const entity_addrvec_t& dest, | |
554 | bool anon=false, bool not_local_dest=false) { | |
555 | return connect_to(CEPH_ENTITY_TYPE_MON, dest, anon, not_local_dest); | |
11fdf7f2 | 556 | } |
9f95a23c TL |
557 | ConnectionRef connect_to_mds(const entity_addrvec_t& dest, |
558 | bool anon=false, bool not_local_dest=false) { | |
559 | return connect_to(CEPH_ENTITY_TYPE_MDS, dest, anon, not_local_dest); | |
11fdf7f2 | 560 | } |
9f95a23c TL |
561 | ConnectionRef connect_to_osd(const entity_addrvec_t& dest, |
562 | bool anon=false, bool not_local_dest=false) { | |
563 | return connect_to(CEPH_ENTITY_TYPE_OSD, dest, anon, not_local_dest); | |
11fdf7f2 | 564 | } |
9f95a23c TL |
565 | ConnectionRef connect_to_mgr(const entity_addrvec_t& dest, |
566 | bool anon=false, bool not_local_dest=false) { | |
567 | return connect_to(CEPH_ENTITY_TYPE_MGR, dest, anon, not_local_dest); | |
11fdf7f2 TL |
568 | } |
569 | ||
7c673cae FG |
570 | /** |
571 | * Get the Connection object associated with ourselves. | |
572 | */ | |
573 | virtual ConnectionRef get_loopback_connection() = 0; | |
574 | /** | |
575 | * Mark down a Connection to a remote. | |
576 | * | |
577 | * This will cause us to discard our outgoing queue for them, and if | |
578 | * reset detection is enabled in the policy and the endpoint tries | |
579 | * to reconnect they will discard their queue when we inform them of | |
580 | * the session reset. | |
581 | * | |
582 | * If there is no Connection to the given dest, it is a no-op. | |
583 | * | |
584 | * This generates a RESET notification to the Dispatcher. | |
585 | * | |
586 | * DEPRECATED: please do not use this interface for any new code; | |
587 | * use the Connection* variant. | |
588 | * | |
589 | * @param a The address to mark down. | |
590 | */ | |
591 | virtual void mark_down(const entity_addr_t& a) = 0; | |
11fdf7f2 TL |
592 | virtual void mark_down_addrs(const entity_addrvec_t& a) { |
593 | mark_down(a.legacy_addr()); | |
594 | } | |
7c673cae FG |
595 | /** |
596 | * Mark all the existing Connections down. This is equivalent | |
597 | * to iterating over all Connections and calling mark_down() | |
598 | * on each. | |
599 | * | |
600 | * This will generate a RESET event for each closed connection. | |
601 | */ | |
602 | virtual void mark_down_all() = 0; | |
603 | /** | |
604 | * @} // Connection Management | |
605 | */ | |
606 | protected: | |
607 | /** | |
608 | * @defgroup Subclass Interfacing | |
609 | * @{ | |
610 | */ | |
611 | /** | |
612 | * A courtesy function for Messenger implementations which | |
613 | * will be called when we receive our first Dispatcher. | |
614 | */ | |
615 | virtual void ready() { } | |
616 | /** | |
617 | * @} // Subclass Interfacing | |
618 | */ | |
3efd9988 FG |
619 | public: |
#ifdef CEPH_USE_SIGPIPE_BLOCKER
/**
 * We need to disable SIGPIPE on all platforms, and if they
 * don't give us a better mechanism (read: are on Solaris) that
 * means blocking the signal whenever we do a send or sendmsg...
 * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope
 * whenever doing so. On most systems that's blank, but on systems where
 * it's needed we construct an RAII object to plug and un-plug the SIGPIPE.
 * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html
 */
struct sigpipe_stopper {
  bool blocked;            ///< true if this object blocked SIGPIPE itself
  sigset_t existing_mask;  ///< signal mask to restore on destruction
  sigset_t pipe_mask;      ///< set containing only SIGPIPE

  /**
   * Block SIGPIPE for the calling thread unless a SIGPIPE is already
   * pending, in which case the mask is left untouched.
   */
  sigpipe_stopper() {
    sigemptyset(&pipe_mask);
    sigaddset(&pipe_mask, SIGPIPE);
    sigset_t signals;
    sigemptyset(&signals);
    sigpending(&signals);
    if (sigismember(&signals, SIGPIPE)) {
      blocked = false;
    } else {
      blocked = true;
      int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
      ceph_assert(r == 0);
    }
  }

  // The destructor restores a saved mask, so a copy would restore it twice.
  sigpipe_stopper(const sigpipe_stopper&) = delete;
  sigpipe_stopper& operator=(const sigpipe_stopper&) = delete;

  /**
   * Consume any SIGPIPE raised while it was blocked, then restore the
   * previous signal mask.
   */
  ~sigpipe_stopper() {
    if (blocked) {
      struct timespec nowait{0};
      int r;
      // Zero timeout: poll for a pending SIGPIPE without waiting.
      do {
        r = sigtimedwait(&pipe_mask, 0, &nowait);
      } while (r == -1 && errno == EINTR);
      // sigtimedwait() returns the signal number on success, or -1 with
      // errno == EAGAIN when nothing in the set was pending.
      ceph_assert(r == SIGPIPE || (r == -1 && errno == EAGAIN));
      r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
      ceph_assert(r == 0);
    }
  }
};
// No parentheses after `stopper`: `stopper()` would declare a function
// instead of constructing the guard object.
# define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper;
#else
# define MSGR_SIGPIPE_STOPPER
#endif
7c673cae FG |
662 | /** |
663 | * @defgroup Dispatcher Interfacing | |
664 | * @{ | |
665 | */ | |
7c673cae FG |
666 | /** |
667 | * Determine whether a message can be fast-dispatched. We will | |
668 | * query each Dispatcher in sequence to determine if they are | |
669 | * capable of handling a particular message via "fast dispatch". | |
670 | * | |
671 | * @param m The Message we are testing. | |
672 | */ | |
f67539c2 | 673 | bool ms_can_fast_dispatch(const ceph::cref_t<Message>& m) { |
11fdf7f2 TL |
674 | for (const auto &dispatcher : fast_dispatchers) { |
675 | if (dispatcher->ms_can_fast_dispatch2(m)) | |
7c673cae FG |
676 | return true; |
677 | } | |
678 | return false; | |
679 | } | |
680 | ||
681 | /** | |
682 | * Deliver a single Message via "fast dispatch". | |
683 | * | |
11fdf7f2 | 684 | * @param m The Message we are fast dispatching. |
7c673cae FG |
685 | * If none of our Dispatchers can handle it, ceph_abort(). |
686 | */ | |
f67539c2 | 687 | void ms_fast_dispatch(const ceph::ref_t<Message> &m) { |
7c673cae | 688 | m->set_dispatch_stamp(ceph_clock_now()); |
11fdf7f2 TL |
689 | for (const auto &dispatcher : fast_dispatchers) { |
690 | if (dispatcher->ms_can_fast_dispatch2(m)) { | |
691 | dispatcher->ms_fast_dispatch2(m); | |
7c673cae FG |
692 | return; |
693 | } | |
694 | } | |
695 | ceph_abort(); | |
696 | } | |
11fdf7f2 | 697 | void ms_fast_dispatch(Message *m) { |
f67539c2 | 698 | return ms_fast_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */ |
11fdf7f2 | 699 | } |
7c673cae FG |
700 | /** |
701 | * | |
702 | */ | |
f67539c2 | 703 | void ms_fast_preprocess(const ceph::ref_t<Message> &m) { |
11fdf7f2 TL |
704 | for (const auto &dispatcher : fast_dispatchers) { |
705 | dispatcher->ms_fast_preprocess2(m); | |
7c673cae FG |
706 | } |
707 | } | |
708 | /** | |
709 | * Deliver a single Message. Send it to each Dispatcher | |
710 | * in sequence until one of them handles it. | |
11fdf7f2 | 711 | * If none of our Dispatchers can handle it, ceph_abort(). |
7c673cae | 712 | * |
11fdf7f2 | 713 | * @param m The Message to deliver. |
7c673cae | 714 | */ |
f67539c2 | 715 | void ms_deliver_dispatch(const ceph::ref_t<Message> &m) { |
7c673cae | 716 | m->set_dispatch_stamp(ceph_clock_now()); |
11fdf7f2 TL |
717 | for (const auto &dispatcher : dispatchers) { |
718 | if (dispatcher->ms_dispatch2(m)) | |
7c673cae FG |
719 | return; |
720 | } | |
721 | lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " | |
722 | << m->get_source_inst() << dendl; | |
11fdf7f2 TL |
723 | ceph_assert(!cct->_conf->ms_die_on_unhandled_msg); |
724 | } | |
725 | void ms_deliver_dispatch(Message *m) { | |
f67539c2 | 726 | return ms_deliver_dispatch(ceph::ref_t<Message>(m, false)); /* consume ref */ |
7c673cae FG |
727 | } |
728 | /** | |
729 | * Notify each Dispatcher of a new Connection. Call | |
730 | * this function whenever a new Connection is initiated or | |
731 | * reconnects. | |
732 | * | |
733 | * @param con Pointer to the new Connection. | |
734 | */ | |
735 | void ms_deliver_handle_connect(Connection *con) { | |
11fdf7f2 TL |
736 | for (const auto& dispatcher : dispatchers) { |
737 | dispatcher->ms_handle_connect(con); | |
738 | } | |
7c673cae FG |
739 | } |
740 | ||
741 | /** | |
742 | * Notify each fast Dispatcher of a new Connection. Call | |
743 | * this function whenever a new Connection is initiated or | |
744 | * reconnects. | |
745 | * | |
746 | * @param con Pointer to the new Connection. | |
747 | */ | |
748 | void ms_deliver_handle_fast_connect(Connection *con) { | |
11fdf7f2 TL |
749 | for (const auto& dispatcher : fast_dispatchers) { |
750 | dispatcher->ms_handle_fast_connect(con); | |
751 | } | |
7c673cae FG |
752 | } |
753 | ||
754 | /** | |
11fdf7f2 | 755 | * Notify each Dispatcher of a new incoming Connection. Call |
7c673cae FG |
756 | * this function whenever a new Connection is accepted. |
757 | * | |
758 | * @param con Pointer to the new Connection. | |
759 | */ | |
760 | void ms_deliver_handle_accept(Connection *con) { | |
11fdf7f2 TL |
761 | for (const auto& dispatcher : dispatchers) { |
762 | dispatcher->ms_handle_accept(con); | |
763 | } | |
7c673cae FG |
764 | } |
765 | ||
766 | /** | |
767 | * Notify each fast Dispatcher of a new incoming Connection. Call | |
768 | * this function whenever a new Connection is accepted. | |
769 | * | |
770 | * @param con Pointer to the new Connection. | |
771 | */ | |
772 | void ms_deliver_handle_fast_accept(Connection *con) { | |
11fdf7f2 TL |
773 | for (const auto& dispatcher : fast_dispatchers) { |
774 | dispatcher->ms_handle_fast_accept(con); | |
775 | } | |
7c673cae FG |
776 | } |
777 | ||
778 | /** | |
779 | * Notify each Dispatcher of a Connection which may have lost | |
780 | * Messages. Call this function whenever you detect that a lossy Connection | |
781 | * has been disconnected. | |
782 | * | |
783 | * @param con Pointer to the broken Connection. | |
784 | */ | |
785 | void ms_deliver_handle_reset(Connection *con) { | |
11fdf7f2 TL |
786 | for (const auto& dispatcher : dispatchers) { |
787 | if (dispatcher->ms_handle_reset(con)) | |
7c673cae FG |
788 | return; |
789 | } | |
790 | } | |
791 | /** | |
792 | * Notify each Dispatcher of a Connection which has been "forgotten" about | |
793 | * by the remote end, implying that messages have probably been lost. | |
794 | * Call this function whenever you detect a reset. | |
795 | * | |
796 | * @param con Pointer to the broken Connection. | |
797 | */ | |
798 | void ms_deliver_handle_remote_reset(Connection *con) { | |
11fdf7f2 TL |
799 | for (const auto& dispatcher : dispatchers) { |
800 | dispatcher->ms_handle_remote_reset(con); | |
801 | } | |
7c673cae FG |
802 | } |
803 | ||
804 | /** | |
805 | * Notify each Dispatcher of a Connection for which reconnection | |
806 | * attempts are being refused. Call this function whenever you | |
807 | * detect that a lossy Connection has been disconnected and it's | |
808 | * impossible to reconnect. | |
809 | * | |
810 | * @param con Pointer to the broken Connection. | |
811 | */ | |
812 | void ms_deliver_handle_refused(Connection *con) { | |
11fdf7f2 TL |
813 | for (const auto& dispatcher : dispatchers) { |
814 | if (dispatcher->ms_handle_refused(con)) | |
7c673cae FG |
815 | return; |
816 | } | |
817 | } | |
818 | ||
9f95a23c TL |
819 | void set_require_authorizer(bool b) { |
820 | require_authorizer = b; | |
7c673cae | 821 | } |
7c673cae FG |
822 | |
823 | /** | |
824 | * @} // Dispatcher Interfacing | |
825 | */ | |
826 | }; | |
827 | ||
828 | ||
829 | ||
830 | #endif |