]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | ||
17 | #ifndef CEPH_MESSENGER_H | |
18 | #define CEPH_MESSENGER_H | |
19 | ||
20 | #include <map> | |
21 | using namespace std; | |
22 | ||
23 | #include "Message.h" | |
24 | #include "Dispatcher.h" | |
25 | #include "common/Mutex.h" | |
26 | #include "common/Cond.h" | |
27 | #include "include/Context.h" | |
28 | #include "include/types.h" | |
29 | #include "include/ceph_features.h" | |
30 | #include "auth/Crypto.h" | |
31 | ||
32 | #include <errno.h> | |
33 | #include <sstream> | |
3efd9988 | 34 | #include <signal.h> |
7c673cae FG |
35 | |
36 | #define SOCKET_PRIORITY_MIN_DELAY 6 | |
37 | ||
38 | class Timer; | |
39 | ||
40 | ||
41 | class Messenger { | |
42 | private: | |
43 | list<Dispatcher*> dispatchers; | |
44 | list <Dispatcher*> fast_dispatchers; | |
45 | ZTracer::Endpoint trace_endpoint; | |
46 | ||
47 | void set_endpoint_addr(const entity_addr_t& a, | |
48 | const entity_name_t &name); | |
49 | ||
50 | protected: | |
51 | /// the "name" of the local daemon. eg client.99 | |
52 | entity_inst_t my_inst; | |
53 | int default_send_priority; | |
54 | /// set to true once the Messenger has started, and set to false on shutdown | |
55 | bool started; | |
56 | uint32_t magic; | |
57 | int socket_priority; | |
58 | ||
59 | public: | |
60 | /** | |
61 | * Various Messenger conditional config/type flags to allow | |
62 | * different "transport" Messengers to tune themselves | |
63 | */ | |
64 | static const int HAS_HEAVY_TRAFFIC = 0x0001; | |
65 | static const int HAS_MANY_CONNECTIONS = 0x0002; | |
66 | static const int HEARTBEAT = 0x0004; | |
67 | ||
68 | /** | |
69 | * The CephContext this Messenger uses. Many other components initialize themselves | |
70 | * from this value. | |
71 | */ | |
72 | CephContext *cct; | |
73 | int crcflags; | |
74 | ||
75 | /** | |
76 | * A Policy describes the rules of a Connection. Is there a limit on how | |
77 | * much data this Connection can have locally? When the underlying connection | |
78 | * experiences an error, does the Connection disappear? Can this Messenger | |
79 | * re-establish the underlying connection? | |
80 | */ | |
81 | struct Policy { | |
82 | /// If true, the Connection is tossed out on errors. | |
83 | bool lossy; | |
84 | /// If true, the underlying connection can't be re-established from this end. | |
85 | bool server; | |
86 | /// If true, we will standby when idle | |
87 | bool standby; | |
88 | /// If true, we will try to detect session resets | |
89 | bool resetcheck; | |
90 | /** | |
91 | * The throttler is used to limit how much data is held by Messages from | |
92 | * the associated Connection(s). When reading in a new Message, the Messenger | |
93 | * will call throttler->throttle() for the size of the new Message. | |
94 | */ | |
95 | Throttle *throttler_bytes; | |
96 | Throttle *throttler_messages; | |
97 | ||
98 | /// Specify features supported locally by the endpoint. | |
99 | uint64_t features_supported; | |
100 | /// Specify features any remotes must have to talk to this endpoint. | |
101 | uint64_t features_required; | |
102 | ||
103 | Policy() | |
104 | : lossy(false), server(false), standby(false), resetcheck(true), | |
105 | throttler_bytes(NULL), | |
106 | throttler_messages(NULL), | |
107 | features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), | |
108 | features_required(0) {} | |
109 | private: | |
110 | Policy(bool l, bool s, bool st, bool r, uint64_t req) | |
111 | : lossy(l), server(s), standby(st), resetcheck(r), | |
112 | throttler_bytes(NULL), | |
113 | throttler_messages(NULL), | |
114 | features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), | |
115 | features_required(req) {} | |
116 | ||
117 | public: | |
118 | static Policy stateful_server(uint64_t req) { | |
119 | return Policy(false, true, true, true, req); | |
120 | } | |
121 | static Policy stateless_server(uint64_t req) { | |
122 | return Policy(true, true, false, false, req); | |
123 | } | |
124 | static Policy lossless_peer(uint64_t req) { | |
125 | return Policy(false, false, true, false, req); | |
126 | } | |
127 | static Policy lossless_peer_reuse(uint64_t req) { | |
128 | return Policy(false, false, true, true, req); | |
129 | } | |
130 | static Policy lossy_client(uint64_t req) { | |
131 | return Policy(true, false, false, false, req); | |
132 | } | |
133 | static Policy lossless_client(uint64_t req) { | |
134 | return Policy(false, false, false, true, req); | |
135 | } | |
136 | }; | |
137 | ||
138 | /** | |
139 | * Messenger constructor. Call this from your implementation. | |
140 | * Messenger users should construct full implementations directly, | |
141 | * or use the create() function. | |
142 | */ | |
143 | Messenger(CephContext *cct_, entity_name_t w) | |
144 | : trace_endpoint("0.0.0.0", 0, "Messenger"), | |
145 | my_inst(), | |
146 | default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false), | |
147 | magic(0), | |
148 | socket_priority(-1), | |
149 | cct(cct_), | |
150 | crcflags(get_default_crc_flags(cct->_conf)) | |
151 | { | |
152 | my_inst.name = w; | |
153 | } | |
154 | virtual ~Messenger() {} | |
155 | ||
156 | /** | |
157 | * create a new messenger | |
158 | * | |
159 | * Create a new messenger instance, with whatever implementation is | |
160 | * available or specified via the configuration in cct. | |
161 | * | |
162 | * @param cct context | |
163 | * @param type name of messenger type | |
164 | * @param name entity name to register | |
165 | * @param lname logical name of the messenger in this process (e.g., "client") | |
166 | * @param nonce nonce value to uniquely identify this instance on the current host | |
167 | * @param features bits for the local connection | |
168 | * @param cflags general set of flags to configure transport resources | |
169 | */ | |
170 | static Messenger *create(CephContext *cct, | |
171 | const string &type, | |
172 | entity_name_t name, | |
173 | string lname, | |
174 | uint64_t nonce, | |
175 | uint64_t cflags); | |
176 | ||
177 | /** | |
178 | * create a new messenger | |
179 | * | |
180 | * Create a new messenger instance. | |
181 | * Same as the above, but a slightly simpler interface for clients: | |
182 | * - Generate a random nonce | |
183 | * - use the default feature bits | |
184 | * - get the messenger type from cct | |
185 | * - use the client entity_type | |
186 | * | |
187 | * @param cct context | |
188 | * @param lname logical name of the messenger in this process (e.g., "client") | |
189 | */ | |
190 | static Messenger *create_client_messenger(CephContext *cct, string lname); | |
191 | ||
192 | /** | |
193 | * @defgroup Accessors | |
194 | * @{ | |
195 | */ | |
196 | /** | |
197 | * Retrieve the Messenger's instance. | |
198 | * | |
199 | * @return A const reference to the instance this Messenger | |
200 | * currently believes to be its own. | |
201 | */ | |
202 | const entity_inst_t& get_myinst() { return my_inst; } | |
203 | /** | |
204 | * set messenger's instance | |
205 | */ | |
206 | void set_myinst(entity_inst_t i) { my_inst = i; } | |
207 | ||
208 | uint32_t get_magic() { return magic; } | |
209 | void set_magic(int _magic) { magic = _magic; } | |
210 | ||
211 | /** | |
212 | * Retrieve the Messenger's address. | |
213 | * | |
214 | * @return A const reference to the address this Messenger | |
215 | * currently believes to be its own. | |
216 | */ | |
217 | const entity_addr_t& get_myaddr() { return my_inst.addr; } | |
218 | protected: | |
219 | /** | |
220 | * set messenger's address | |
221 | */ | |
222 | virtual void set_myaddr(const entity_addr_t& a) { | |
223 | my_inst.addr = a; | |
224 | set_endpoint_addr(a, my_inst.name); | |
225 | } | |
226 | public: | |
227 | /** | |
228 | * @return the zipkin trace endpoint | |
229 | */ | |
230 | const ZTracer::Endpoint* get_trace_endpoint() const { | |
231 | return &trace_endpoint; | |
232 | } | |
233 | ||
234 | /** | |
235 | * Retrieve the Messenger's name. | |
236 | * | |
237 | * @return A const reference to the name this Messenger | |
238 | * currently believes to be its own. | |
239 | */ | |
240 | const entity_name_t& get_myname() { return my_inst.name; } | |
241 | /** | |
242 | * Set the name of the local entity. The name is reported to others and | |
243 | * can be changed while the system is running, but doing so at incorrect | |
244 | * times may have bad results. | |
245 | * | |
246 | * @param m The name to set. | |
247 | */ | |
248 | void set_myname(const entity_name_t& m) { my_inst.name = m; } | |
249 | /** | |
250 | * Set the unknown address components for this Messenger. | |
251 | * This is useful if the Messenger doesn't know its full address just by | |
252 | * binding, but another Messenger on the same interface has already learned | |
253 | * its full address. This function does not fill in known address elements, | |
254 | * cause a rebind, or do anything of that sort. | |
255 | * | |
256 | * @param addr The address to use as a template. | |
257 | */ | |
258 | virtual void set_addr_unknowns(const entity_addr_t &addr) = 0; | |
224ce89b WB |
259 | /** |
260 | * Set the address for this Messenger. This is useful if the Messenger | |
261 | * binds to a specific address but advertises a different address on the | |
262 | * network. | |
263 | * | |
264 | * @param addr The address to use. | |
265 | */ | |
266 | virtual void set_addr(const entity_addr_t &addr) = 0; | |
7c673cae FG |
267 | /// Get the default send priority. |
268 | int get_default_send_priority() { return default_send_priority; } | |
269 | /** | |
270 | * Get the number of Messages which the Messenger has received | |
271 | * but not yet dispatched. | |
272 | */ | |
273 | virtual int get_dispatch_queue_len() = 0; | |
274 | ||
275 | /** | |
276 | * Get age of oldest undelivered message | |
277 | * (0 if the queue is empty) | |
278 | */ | |
279 | virtual double get_dispatch_queue_max_age(utime_t now) = 0; | |
280 | /** | |
281 | * Get the default crc flags for a Messenger, as derived from the | |
282 | * given configuration. | |
283 | */ | |
284 | static int get_default_crc_flags(md_config_t *); | |
285 | ||
286 | /** | |
287 | * @} // Accessors | |
288 | */ | |
289 | ||
290 | /** | |
291 | * @defgroup Configuration | |
292 | * @{ | |
293 | */ | |
294 | /** | |
295 | * Set the cluster protocol in use by this daemon. | |
296 | * This is an init-time function and cannot be called after calling | |
297 | * start() or bind(). | |
298 | * | |
299 | * @param p The cluster protocol to use. Defined externally. | |
300 | */ | |
301 | virtual void set_cluster_protocol(int p) = 0; | |
302 | /** | |
303 | * Set a policy which is applied to all peers who do not have a type-specific | |
304 | * Policy. | |
305 | * This is an init-time function and cannot be called after calling | |
306 | * start() or bind(). | |
307 | * | |
308 | * @param p The Policy to apply. | |
309 | */ | |
310 | virtual void set_default_policy(Policy p) = 0; | |
311 | /** | |
312 | * Set a policy which is applied to all peers of the given type. | |
313 | * This is an init-time function and cannot be called after calling | |
314 | * start() or bind(). | |
315 | * | |
316 | * @param type The peer type this policy applies to. | |
317 | * @param p The policy to apply. | |
318 | */ | |
319 | virtual void set_policy(int type, Policy p) = 0; | |
320 | /** | |
321 | * Get the Policy associated with a type of peer. | |
322 | * | |
323 | * This can be called either on initial setup, or after connections | |
324 | * are already established. However, the policies for existing | |
325 | * connections will not be affected; the new policy will only apply | |
326 | * to future connections. | |
327 | * | |
328 | * @param t The peer type to get the default policy for. | |
329 | * @return A copy of the Policy. | |
330 | */ | |
331 | virtual Policy get_policy(int t) = 0; | |
332 | /** | |
333 | * Get the default Policy | |
334 | * | |
335 | * @return A copy of the default Policy. | |
336 | */ | |
337 | virtual Policy get_default_policy() = 0; | |
338 | /** | |
339 | * Set Throttlers applied to all Messages from the given type of peer | |
340 | * | |
341 | * This is an init-time function and cannot be called after calling | |
342 | * start() or bind(). | |
343 | * | |
344 | * @param type The peer type the Throttlers will apply to. | |
345 | * @param bytes The Throttle for the number of bytes carried by the message | |
346 | * @param msgs The Throttle for the number of messages for this @p type | |
347 | * @note The Messenger does not take ownership of the Throttle pointers, but | |
348 | * you must not destroy them before you destroy the Messenger. | |
349 | */ | |
350 | virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0; | |
351 | /** | |
352 | * Set the default send priority | |
353 | * | |
354 | * This is an init-time function and must be called *before* calling | |
355 | * start(). | |
356 | * | |
357 | * @param p The default send priority to use. | |
358 | */ | |
359 | void set_default_send_priority(int p) { | |
360 | assert(!started); | |
361 | default_send_priority = p; | |
362 | } | |
363 | /** | |
364 | * Set the priority(SO_PRIORITY) for all packets to be sent on this socket. | |
365 | * | |
366 | * Linux uses this value to order the networking queues: packets with a higher | |
367 | * priority may be processed first depending on the selected device queueing | |
368 | * discipline. | |
369 | * | |
370 | * @param prio The priority. Setting a priority outside the range 0 to 6 | |
371 | * requires the CAP_NET_ADMIN capability. | |
372 | */ | |
373 | void set_socket_priority(int prio) { | |
374 | socket_priority = prio; | |
375 | } | |
376 | /** | |
377 | * Get the socket priority | |
378 | * | |
379 | * @return the socket priority | |
380 | */ | |
381 | int get_socket_priority() { | |
382 | return socket_priority; | |
383 | } | |
384 | /** | |
385 | * Add a new Dispatcher to the front of the list. If you add | |
386 | * a Dispatcher which is already included, it will get a duplicate | |
387 | * entry. This will reduce efficiency but not break anything. | |
388 | * | |
389 | * @param d The Dispatcher to insert into the list. | |
390 | */ | |
391 | void add_dispatcher_head(Dispatcher *d) { | |
392 | bool first = dispatchers.empty(); | |
393 | dispatchers.push_front(d); | |
394 | if (d->ms_can_fast_dispatch_any()) | |
395 | fast_dispatchers.push_front(d); | |
396 | if (first) | |
397 | ready(); | |
398 | } | |
399 | /** | |
400 | * Add a new Dispatcher to the end of the list. If you add | |
401 | * a Dispatcher which is already included, it will get a duplicate | |
402 | * entry. This will reduce efficiency but not break anything. | |
403 | * | |
404 | * @param d The Dispatcher to insert into the list. | |
405 | */ | |
406 | void add_dispatcher_tail(Dispatcher *d) { | |
407 | bool first = dispatchers.empty(); | |
408 | dispatchers.push_back(d); | |
409 | if (d->ms_can_fast_dispatch_any()) | |
410 | fast_dispatchers.push_back(d); | |
411 | if (first) | |
412 | ready(); | |
413 | } | |
414 | /** | |
415 | * Bind the Messenger to a specific address. If bind_addr | |
416 | * is not completely filled in the system will use the | |
417 | * valid portions and cycle through the unset ones (eg, the port) | |
418 | * in an unspecified order. | |
419 | * | |
420 | * @param bind_addr The address to bind to. | |
421 | * @return 0 on success, or -1 on error, or -errno if | |
422 | * we can be more specific about the failure. | |
423 | */ | |
424 | virtual int bind(const entity_addr_t& bind_addr) = 0; | |
425 | /** | |
426 | * This function performs a full restart of the Messenger component, | |
427 | * whatever that means. Other entities who connect to this | |
428 | * Messenger post-rebind() should perceive it as a new entity which | |
429 | * they have not previously contacted, and it MUST bind to a | |
430 | * different address than it did previously. | |
431 | * | |
432 | * @param avoid_ports Additional port to avoid binding to. | |
433 | */ | |
434 | virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; } | |
435 | /** | |
436 | * Bind the 'client' Messenger to a specific address. The Messenger will bind | |
437 | * to the address before connecting to others when the option | |
438 | * ms_bind_before_connect is true. | |
439 | * @param bind_addr The address to bind to. | |
440 | * @return 0 on success, or -1 on error, or -errno if we can be more specific about the failure. | |
441 | */ | |
442 | virtual int client_bind(const entity_addr_t& bind_addr) = 0; | |
443 | /** | |
444 | * @} // Configuration | |
445 | */ | |
446 | ||
447 | /** | |
448 | * @defgroup Startup/Shutdown | |
449 | * @{ | |
450 | */ | |
451 | /** | |
452 | * Perform any resource allocation, thread startup, etc | |
453 | * that is required before attempting to connect to other | |
454 | * Messengers or transmit messages. | |
455 | * Once this function completes, started shall be set to true. | |
456 | * | |
457 | * @return 0 on success; -errno on failure. | |
458 | */ | |
459 | virtual int start() { started = true; return 0; } | |
460 | ||
461 | // shutdown | |
462 | /** | |
463 | * Block until the Messenger has finished shutting down (according | |
464 | * to the shutdown() function). | |
465 | * It is valid to call this after calling shutdown(), but it must | |
466 | * be called before deleting the Messenger. | |
467 | */ | |
468 | virtual void wait() = 0; | |
469 | /** | |
470 | * Initiate a shutdown of the Messenger. | |
471 | * | |
472 | * @return 0 on success, -errno otherwise. | |
473 | */ | |
474 | virtual int shutdown() { started = false; return 0; } | |
475 | /** | |
476 | * @} // Startup/Shutdown | |
477 | */ | |
478 | ||
479 | /** | |
480 | * @defgroup Messaging | |
481 | * @{ | |
482 | */ | |
483 | /** | |
484 | * Queue the given Message for the given entity. | |
485 | * Success in this function does not guarantee Message delivery, only | |
486 | * success in queueing the Message. Other guarantees may be provided based | |
487 | * on the Connection policy associated with the dest. | |
488 | * | |
489 | * @param m The Message to send. The Messenger consumes a single reference | |
490 | * when you pass it in. | |
491 | * @param dest The entity to send the Message to. | |
492 | * | |
493 | * DEPRECATED: please do not use this interface for any new code; | |
494 | * use the Connection* variant. | |
495 | * | |
496 | * @return 0 on success, or -errno on failure. | |
497 | */ | |
498 | virtual int send_message(Message *m, const entity_inst_t& dest) = 0; | |
499 | ||
500 | /** | |
501 | * @} // Messaging | |
502 | */ | |
503 | /** | |
504 | * @defgroup Connection Management | |
505 | * @{ | |
506 | */ | |
507 | /** | |
508 | * Get the Connection object associated with a given entity. If a | |
509 | * Connection does not exist, create one and establish a logical connection. | |
510 | * The caller owns a reference when this returns. Call ->put() when you're | |
511 | * done! | |
512 | * | |
513 | * @param dest The entity to get a connection for. | |
514 | */ | |
515 | virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0; | |
516 | /** | |
517 | * Get the Connection object associated with ourselves. | |
518 | */ | |
519 | virtual ConnectionRef get_loopback_connection() = 0; | |
520 | /** | |
521 | * Mark down a Connection to a remote. | |
522 | * | |
523 | * This will cause us to discard our outgoing queue for them, and if | |
524 | * reset detection is enabled in the policy and the endpoint tries | |
525 | * to reconnect they will discard their queue when we inform them of | |
526 | * the session reset. | |
527 | * | |
528 | * If there is no Connection to the given dest, it is a no-op. | |
529 | * | |
530 | * This generates a RESET notification to the Dispatcher. | |
531 | * | |
532 | * DEPRECATED: please do not use this interface for any new code; | |
533 | * use the Connection* variant. | |
534 | * | |
535 | * @param a The address to mark down. | |
536 | */ | |
537 | virtual void mark_down(const entity_addr_t& a) = 0; | |
538 | /** | |
539 | * Mark all the existing Connections down. This is equivalent | |
540 | * to iterating over all Connections and calling mark_down() | |
541 | * on each. | |
542 | * | |
543 | * This will generate a RESET event for each closed connection. | |
544 | */ | |
545 | virtual void mark_down_all() = 0; | |
546 | /** | |
547 | * @} // Connection Management | |
548 | */ | |
549 | protected: | |
550 | /** | |
551 | * @defgroup Subclass Interfacing | |
552 | * @{ | |
553 | */ | |
554 | /** | |
555 | * A courtesy function for Messenger implementations which | |
556 | * will be called when we receive our first Dispatcher. | |
557 | */ | |
558 | virtual void ready() { } | |
559 | /** | |
560 | * @} // Subclass Interfacing | |
561 | */ | |
3efd9988 FG |
562 | public: |
563 | #ifdef CEPH_USE_SIGPIPE_BLOCKER | |
564 | /** | |
565 | * We need to disable SIGPIPE on all platforms, and if they | |
566 | * don't give us a better mechanism (read: are on Solaris) that | |
567 | * means blocking the signal whenever we do a send or sendmsg... | |
568 | * That means any implementations must invoke MSGR_SIGPIPE_STOPPER in-scope | |
569 | * whenever doing so. On most systems that's blank, but on systems where | |
570 | * it's needed we construct an RAII object to plug and un-plug the SIGPIPE. | |
571 | * See http://www.microhowto.info/howto/ignore_sigpipe_without_affecting_other_threads_in_a_process.html | |
572 | */ | |
struct sigpipe_stopper {
  /// True if the constructor blocked SIGPIPE and the destructor must undo it.
  bool blocked;
  /// Signal mask in effect before we blocked SIGPIPE (valid only if blocked).
  sigset_t existing_mask;
  /// Mask containing only SIGPIPE.
  sigset_t pipe_mask;

  sigpipe_stopper() {
    sigemptyset(&pipe_mask);
    sigaddset(&pipe_mask, SIGPIPE);
    sigset_t signals;
    sigemptyset(&signals);
    sigpending(&signals);
    if (sigismember(&signals, SIGPIPE)) {
      // A SIGPIPE was already pending before we got here; it is not ours
      // to consume, so leave the mask alone.
      blocked = false;
    } else {
      blocked = true;
      int r = pthread_sigmask(SIG_BLOCK, &pipe_mask, &existing_mask);
      assert(r == 0);
    }
  }

  ~sigpipe_stopper() {
    if (blocked) {
      // Swallow any SIGPIPE raised while we were blocking it, without
      // waiting. sigtimedwait() returns the signal number on success and
      // -1 with errno set on failure (EAGAIN: nothing was pending).
      struct timespec nowait{};
      int r;
      do {
        r = sigtimedwait(&pipe_mask, 0, &nowait);
      } while (r == -1 && errno == EINTR);
      assert(r == SIGPIPE || (r == -1 && errno == EAGAIN));
      r = pthread_sigmask(SIG_SETMASK, &existing_mask, 0);
      assert(r == 0);
    }
  }

  // Copying would restore the saved mask more than once.
  sigpipe_stopper(const sigpipe_stopper&) = delete;
  sigpipe_stopper& operator=(const sigpipe_stopper&) = delete;
};
// No parentheses after the name: `stopper()` would declare a function
// instead of constructing the guard object.
# define MSGR_SIGPIPE_STOPPER Messenger::sigpipe_stopper stopper;
602 | #else | |
603 | # define MSGR_SIGPIPE_STOPPER | |
604 | #endif | |
7c673cae FG |
605 | /** |
606 | * @defgroup Dispatcher Interfacing | |
607 | * @{ | |
608 | */ | |
7c673cae FG |
609 | /** |
610 | * Determine whether a message can be fast-dispatched. We will | |
611 | * query each Dispatcher in sequence to determine if they are | |
612 | * capable of handling a particular message via "fast dispatch". | |
613 | * | |
614 | * @param m The Message we are testing. | |
615 | */ | |
616 | bool ms_can_fast_dispatch(const Message *m) { | |
617 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
618 | p != fast_dispatchers.end(); | |
619 | ++p) { | |
620 | if ((*p)->ms_can_fast_dispatch(m)) | |
621 | return true; | |
622 | } | |
623 | return false; | |
624 | } | |
625 | ||
626 | /** | |
627 | * Deliver a single Message via "fast dispatch". | |
628 | * | |
629 | * @param m The Message we are fast dispatching. We take ownership | |
630 | * of one reference to it. | |
631 | * If none of our Dispatchers can handle it, ceph_abort(). | |
632 | */ | |
633 | void ms_fast_dispatch(Message *m) { | |
634 | m->set_dispatch_stamp(ceph_clock_now()); | |
635 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
636 | p != fast_dispatchers.end(); | |
637 | ++p) { | |
638 | if ((*p)->ms_can_fast_dispatch(m)) { | |
639 | (*p)->ms_fast_dispatch(m); | |
640 | return; | |
641 | } | |
642 | } | |
643 | ceph_abort(); | |
644 | } | |
645 | /** | |
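646 | * Let every fast Dispatcher preprocess a Message: calls ms_fast_preprocess(m) on each, in list order. | |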
647 | */ | |
648 | void ms_fast_preprocess(Message *m) { | |
649 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
650 | p != fast_dispatchers.end(); | |
651 | ++p) { | |
652 | (*p)->ms_fast_preprocess(m); | |
653 | } | |
654 | } | |
655 | /** | |
656 | * Deliver a single Message. Send it to each Dispatcher | |
657 | * in sequence until one of them handles it. | |
658 | * If none of our Dispatchers handles it, the Message is logged and dropped (or we assert if ms_die_on_unhandled_msg is set). | |
659 | * | |
660 | * @param m The Message to deliver. We take ownership of | |
661 | * one reference to it. | |
662 | */ | |
663 | void ms_deliver_dispatch(Message *m) { | |
664 | m->set_dispatch_stamp(ceph_clock_now()); | |
665 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
666 | p != dispatchers.end(); | |
667 | ++p) { | |
668 | if ((*p)->ms_dispatch(m)) | |
669 | return; | |
670 | } | |
671 | lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " | |
672 | << m->get_source_inst() << dendl; | |
673 | assert(!cct->_conf->ms_die_on_unhandled_msg); | |
674 | m->put(); | |
675 | } | |
676 | /** | |
677 | * Notify each Dispatcher of a new Connection. Call | |
678 | * this function whenever a new Connection is initiated or | |
679 | * reconnects. | |
680 | * | |
681 | * @param con Pointer to the new Connection. | |
682 | */ | |
683 | void ms_deliver_handle_connect(Connection *con) { | |
684 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
685 | p != dispatchers.end(); | |
686 | ++p) | |
687 | (*p)->ms_handle_connect(con); | |
688 | } | |
689 | ||
690 | /** | |
691 | * Notify each fast Dispatcher of a new Connection. Call | |
692 | * this function whenever a new Connection is initiated or | |
693 | * reconnects. | |
694 | * | |
695 | * @param con Pointer to the new Connection. | |
696 | */ | |
697 | void ms_deliver_handle_fast_connect(Connection *con) { | |
698 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
699 | p != fast_dispatchers.end(); | |
700 | ++p) | |
701 | (*p)->ms_handle_fast_connect(con); | |
702 | } | |
703 | ||
704 | /** | |
705 | * Notify each Dispatcher of a new incoming Connection. Call | |
706 | * this function whenever a new Connection is accepted. | |
707 | * | |
708 | * @param con Pointer to the new Connection. | |
709 | */ | |
710 | void ms_deliver_handle_accept(Connection *con) { | |
711 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
712 | p != dispatchers.end(); | |
713 | ++p) | |
714 | (*p)->ms_handle_accept(con); | |
715 | } | |
716 | ||
717 | /** | |
718 | * Notify each fast Dispatcher of a new incoming Connection. Call | |
719 | * this function whenever a new Connection is accepted. | |
720 | * | |
721 | * @param con Pointer to the new Connection. | |
722 | */ | |
723 | void ms_deliver_handle_fast_accept(Connection *con) { | |
724 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
725 | p != fast_dispatchers.end(); | |
726 | ++p) | |
727 | (*p)->ms_handle_fast_accept(con); | |
728 | } | |
729 | ||
730 | /** | |
731 | * Notify each Dispatcher of a Connection which may have lost | |
732 | * Messages. Call this function whenever you detect that a lossy Connection | |
733 | * has been disconnected. | |
734 | * | |
735 | * @param con Pointer to the broken Connection. | |
736 | */ | |
737 | void ms_deliver_handle_reset(Connection *con) { | |
738 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
739 | p != dispatchers.end(); | |
740 | ++p) { | |
741 | if ((*p)->ms_handle_reset(con)) | |
742 | return; | |
743 | } | |
744 | } | |
745 | /** | |
746 | * Notify each Dispatcher of a Connection which has been "forgotten" about | |
747 | * by the remote end, implying that messages have probably been lost. | |
748 | * Call this function whenever you detect a reset. | |
749 | * | |
750 | * @param con Pointer to the broken Connection. | |
751 | */ | |
752 | void ms_deliver_handle_remote_reset(Connection *con) { | |
753 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
754 | p != dispatchers.end(); | |
755 | ++p) | |
756 | (*p)->ms_handle_remote_reset(con); | |
757 | } | |
758 | ||
759 | /** | |
760 | * Notify each Dispatcher of a Connection for which reconnection | |
761 | * attempts are being refused. Call this function whenever you | |
762 | * detect that a lossy Connection has been disconnected and it's | |
763 | * impossible to reconnect. | |
764 | * | |
765 | * @param con Pointer to the broken Connection. | |
766 | */ | |
767 | void ms_deliver_handle_refused(Connection *con) { | |
768 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
769 | p != dispatchers.end(); | |
770 | ++p) { | |
771 | if ((*p)->ms_handle_refused(con)) | |
772 | return; | |
773 | } | |
774 | } | |
775 | ||
776 | /** | |
777 | * Get the AuthAuthorizer for a new outgoing Connection. | |
778 | * | |
779 | * @param peer_type The peer type for the new Connection | |
780 | * @param force_new True if we want to wait for new keys, false otherwise. | |
781 | * @return A pointer to the AuthAuthorizer, if we have one; NULL otherwise | |
782 | */ | |
783 | AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) { | |
784 | AuthAuthorizer *a = 0; | |
785 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
786 | p != dispatchers.end(); | |
787 | ++p) { | |
788 | if ((*p)->ms_get_authorizer(peer_type, &a, force_new)) | |
789 | return a; | |
790 | } | |
791 | return NULL; | |
792 | } | |
793 | /** | |
794 | * Verify that the authorizer on a new incoming Connection is correct. | |
795 | * | |
796 | * @param con The new incoming Connection | |
797 | * @param peer_type The type of the endpoint on the new Connection | |
798 | * @param protocol The ID of the protocol in use (at time of writing, cephx or none) | |
799 | * @param authorizer The authorization string supplied by the remote | |
800 | * @param authorizer_reply Output param: The string we should send back to | |
801 | * the remote to authorize ourselves. Only filled in if isvalid | |
802 | * @param isvalid Output param: True if authorizer is valid, false otherwise | |
803 | * | |
804 | * @return True if we were able to prove or disprove correctness of | |
805 | * authorizer, false otherwise. | |
806 | */ | |
807 | bool ms_deliver_verify_authorizer(Connection *con, int peer_type, | |
808 | int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, | |
28e407b8 AA |
809 | bool& isvalid, CryptoKey& session_key, |
810 | std::unique_ptr<AuthAuthorizerChallenge> *challenge) { | |
7c673cae FG |
811 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); |
812 | p != dispatchers.end(); | |
813 | ++p) { | |
28e407b8 AA |
814 | if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, |
815 | isvalid, session_key, challenge)) | |
7c673cae FG |
816 | return true; |
817 | } | |
818 | return false; | |
819 | } | |
820 | ||
821 | /** | |
822 | * @} // Dispatcher Interfacing | |
823 | */ | |
824 | }; | |
825 | ||
826 | ||
827 | ||
828 | #endif |