]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | ||
17 | #ifndef CEPH_MESSENGER_H | |
18 | #define CEPH_MESSENGER_H | |
19 | ||
20 | #include <map> | |
21 | using namespace std; | |
22 | ||
23 | #include "Message.h" | |
24 | #include "Dispatcher.h" | |
25 | #include "common/Mutex.h" | |
26 | #include "common/Cond.h" | |
27 | #include "include/Context.h" | |
28 | #include "include/types.h" | |
29 | #include "include/ceph_features.h" | |
30 | #include "auth/Crypto.h" | |
31 | ||
32 | #include <errno.h> | |
33 | #include <sstream> | |
34 | ||
35 | #define SOCKET_PRIORITY_MIN_DELAY 6 | |
36 | ||
37 | class Timer; | |
38 | ||
39 | ||
40 | class Messenger { | |
41 | private: | |
42 | list<Dispatcher*> dispatchers; | |
43 | list <Dispatcher*> fast_dispatchers; | |
44 | ZTracer::Endpoint trace_endpoint; | |
45 | ||
46 | void set_endpoint_addr(const entity_addr_t& a, | |
47 | const entity_name_t &name); | |
48 | ||
49 | protected: | |
50 | /// the "name" of the local daemon. eg client.99 | |
51 | entity_inst_t my_inst; | |
52 | int default_send_priority; | |
53 | /// set to true once the Messenger has started, and set to false on shutdown | |
54 | bool started; | |
55 | uint32_t magic; | |
56 | int socket_priority; | |
57 | ||
58 | public: | |
59 | /** | |
60 | * Various Messenger conditional config/type flags to allow | |
61 | * different "transport" Messengers to tune themselves | |
62 | */ | |
63 | static const int HAS_HEAVY_TRAFFIC = 0x0001; | |
64 | static const int HAS_MANY_CONNECTIONS = 0x0002; | |
65 | static const int HEARTBEAT = 0x0004; | |
66 | ||
67 | /** | |
68 | * The CephContext this Messenger uses. Many other components initialize themselves | |
69 | * from this value. | |
70 | */ | |
71 | CephContext *cct; | |
72 | int crcflags; | |
73 | ||
74 | /** | |
75 | * A Policy describes the rules of a Connection. Is there a limit on how | |
76 | * much data this Connection can have locally? When the underlying connection | |
77 | * experiences an error, does the Connection disappear? Can this Messenger | |
78 | * re-establish the underlying connection? | |
79 | */ | |
80 | struct Policy { | |
81 | /// If true, the Connection is tossed out on errors. | |
82 | bool lossy; | |
83 | /// If true, the underlying connection can't be re-established from this end. | |
84 | bool server; | |
85 | /// If true, we will standby when idle | |
86 | bool standby; | |
87 | /// If true, we will try to detect session resets | |
88 | bool resetcheck; | |
89 | /** | |
90 | * The throttler is used to limit how much data is held by Messages from | |
91 | * the associated Connection(s). When reading in a new Message, the Messenger | |
92 | * will call throttler->throttle() for the size of the new Message. | |
93 | */ | |
94 | Throttle *throttler_bytes; | |
95 | Throttle *throttler_messages; | |
96 | ||
97 | /// Specify features supported locally by the endpoint. | |
98 | uint64_t features_supported; | |
99 | /// Specify features any remotes must have to talk to this endpoint. | |
100 | uint64_t features_required; | |
101 | ||
102 | Policy() | |
103 | : lossy(false), server(false), standby(false), resetcheck(true), | |
104 | throttler_bytes(NULL), | |
105 | throttler_messages(NULL), | |
106 | features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), | |
107 | features_required(0) {} | |
108 | private: | |
109 | Policy(bool l, bool s, bool st, bool r, uint64_t req) | |
110 | : lossy(l), server(s), standby(st), resetcheck(r), | |
111 | throttler_bytes(NULL), | |
112 | throttler_messages(NULL), | |
113 | features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), | |
114 | features_required(req) {} | |
115 | ||
116 | public: | |
117 | static Policy stateful_server(uint64_t req) { | |
118 | return Policy(false, true, true, true, req); | |
119 | } | |
120 | static Policy stateless_server(uint64_t req) { | |
121 | return Policy(true, true, false, false, req); | |
122 | } | |
123 | static Policy lossless_peer(uint64_t req) { | |
124 | return Policy(false, false, true, false, req); | |
125 | } | |
126 | static Policy lossless_peer_reuse(uint64_t req) { | |
127 | return Policy(false, false, true, true, req); | |
128 | } | |
129 | static Policy lossy_client(uint64_t req) { | |
130 | return Policy(true, false, false, false, req); | |
131 | } | |
132 | static Policy lossless_client(uint64_t req) { | |
133 | return Policy(false, false, false, true, req); | |
134 | } | |
135 | }; | |
136 | ||
137 | /** | |
138 | * Messenger constructor. Call this from your implementation. | |
139 | * Messenger users should construct full implementations directly, | |
140 | * or use the create() function. | |
141 | */ | |
142 | Messenger(CephContext *cct_, entity_name_t w) | |
143 | : trace_endpoint("0.0.0.0", 0, "Messenger"), | |
144 | my_inst(), | |
145 | default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false), | |
146 | magic(0), | |
147 | socket_priority(-1), | |
148 | cct(cct_), | |
149 | crcflags(get_default_crc_flags(cct->_conf)) | |
150 | { | |
151 | my_inst.name = w; | |
152 | } | |
153 | virtual ~Messenger() {} | |
154 | ||
155 | /** | |
156 | * create a new messenger | |
157 | * | |
158 | * Create a new messenger instance, with whatever implementation is | |
159 | * available or specified via the configuration in cct. | |
160 | * | |
161 | * @param cct context | |
162 | * @param type name of messenger type | |
163 | * @param name entity name to register | |
164 | * @param lname logical name of the messenger in this process (e.g., "client") | |
165 | * @param nonce nonce value to uniquely identify this instance on the current host | |
166 | * @param features bits for the local connection | |
167 | * @param cflags general set of flags to configure transport resources | |
168 | */ | |
169 | static Messenger *create(CephContext *cct, | |
170 | const string &type, | |
171 | entity_name_t name, | |
172 | string lname, | |
173 | uint64_t nonce, | |
174 | uint64_t cflags); | |
175 | ||
176 | /** | |
177 | * create a new messenger | |
178 | * | |
179 | * Create a new messenger instance. | |
180 | * Same as the above, but a slightly simpler interface for clients: | |
181 | * - Generate a random nonce | |
182 | * - use the default feature bits | |
183 | * - get the messenger type from cct | |
184 | * - use the client entity_type | |
185 | * | |
186 | * @param cct context | |
187 | * @param lname logical name of the messenger in this process (e.g., "client") | |
188 | */ | |
189 | static Messenger *create_client_messenger(CephContext *cct, string lname); | |
190 | ||
191 | /** | |
192 | * @defgroup Accessors | |
193 | * @{ | |
194 | */ | |
195 | /** | |
196 | * Retrieve the Messenger's instance. | |
197 | * | |
198 | * @return A const reference to the instance this Messenger | |
199 | * currently believes to be its own. | |
200 | */ | |
201 | const entity_inst_t& get_myinst() { return my_inst; } | |
202 | /** | |
203 | * set messenger's instance | |
204 | */ | |
205 | void set_myinst(entity_inst_t i) { my_inst = i; } | |
206 | ||
207 | uint32_t get_magic() { return magic; } | |
208 | void set_magic(int _magic) { magic = _magic; } | |
209 | ||
210 | /** | |
211 | * Retrieve the Messenger's address. | |
212 | * | |
213 | * @return A const reference to the address this Messenger | |
214 | * currently believes to be its own. | |
215 | */ | |
216 | const entity_addr_t& get_myaddr() { return my_inst.addr; } | |
217 | protected: | |
218 | /** | |
219 | * set messenger's address | |
220 | */ | |
221 | virtual void set_myaddr(const entity_addr_t& a) { | |
222 | my_inst.addr = a; | |
223 | set_endpoint_addr(a, my_inst.name); | |
224 | } | |
225 | public: | |
226 | /** | |
227 | * @return the zipkin trace endpoint | |
228 | */ | |
229 | const ZTracer::Endpoint* get_trace_endpoint() const { | |
230 | return &trace_endpoint; | |
231 | } | |
232 | ||
233 | /** | |
234 | * Retrieve the Messenger's name. | |
235 | * | |
236 | * @return A const reference to the name this Messenger | |
237 | * currently believes to be its own. | |
238 | */ | |
239 | const entity_name_t& get_myname() { return my_inst.name; } | |
240 | /** | |
241 | * Set the name of the local entity. The name is reported to others and | |
242 | * can be changed while the system is running, but doing so at incorrect | |
243 | * times may have bad results. | |
244 | * | |
245 | * @param m The name to set. | |
246 | */ | |
247 | void set_myname(const entity_name_t& m) { my_inst.name = m; } | |
248 | /** | |
249 | * Set the unknown address components for this Messenger. | |
250 | * This is useful if the Messenger doesn't know its full address just by | |
251 | * binding, but another Messenger on the same interface has already learned | |
252 | * its full address. This function does not fill in known address elements, | |
253 | * cause a rebind, or do anything of that sort. | |
254 | * | |
255 | * @param addr The address to use as a template. | |
256 | */ | |
257 | virtual void set_addr_unknowns(const entity_addr_t &addr) = 0; | |
224ce89b WB |
258 | /** |
259 | * Set the address for this Messenger. This is useful if the Messenger | |
260 | * binds to a specific address but advertises a different address on the | |
261 | * the network. | |
262 | * | |
263 | * @param addr The address to use. | |
264 | */ | |
265 | virtual void set_addr(const entity_addr_t &addr) = 0; | |
7c673cae FG |
266 | /// Get the default send priority. |
267 | int get_default_send_priority() { return default_send_priority; } | |
268 | /** | |
269 | * Get the number of Messages which the Messenger has received | |
270 | * but not yet dispatched. | |
271 | */ | |
272 | virtual int get_dispatch_queue_len() = 0; | |
273 | ||
274 | /** | |
275 | * Get age of oldest undelivered message | |
276 | * (0 if the queue is empty) | |
277 | */ | |
278 | virtual double get_dispatch_queue_max_age(utime_t now) = 0; | |
279 | /** | |
280 | * Get the default crc flags for this messenger. | |
281 | * but not yet dispatched. | |
282 | */ | |
283 | static int get_default_crc_flags(md_config_t *); | |
284 | ||
285 | /** | |
286 | * @} // Accessors | |
287 | */ | |
288 | ||
289 | /** | |
290 | * @defgroup Configuration | |
291 | * @{ | |
292 | */ | |
293 | /** | |
294 | * Set the cluster protocol in use by this daemon. | |
295 | * This is an init-time function and cannot be called after calling | |
296 | * start() or bind(). | |
297 | * | |
298 | * @param p The cluster protocol to use. Defined externally. | |
299 | */ | |
300 | virtual void set_cluster_protocol(int p) = 0; | |
301 | /** | |
302 | * Set a policy which is applied to all peers who do not have a type-specific | |
303 | * Policy. | |
304 | * This is an init-time function and cannot be called after calling | |
305 | * start() or bind(). | |
306 | * | |
307 | * @param p The Policy to apply. | |
308 | */ | |
309 | virtual void set_default_policy(Policy p) = 0; | |
310 | /** | |
311 | * Set a policy which is applied to all peers of the given type. | |
312 | * This is an init-time function and cannot be called after calling | |
313 | * start() or bind(). | |
314 | * | |
315 | * @param type The peer type this policy applies to. | |
316 | * @param p The policy to apply. | |
317 | */ | |
318 | virtual void set_policy(int type, Policy p) = 0; | |
319 | /** | |
320 | * Set the Policy associated with a type of peer. | |
321 | * | |
322 | * This can be called either on initial setup, or after connections | |
323 | * are already established. However, the policies for existing | |
324 | * connections will not be affected; the new policy will only apply | |
325 | * to future connections. | |
326 | * | |
327 | * @param t The peer type to get the default policy for. | |
328 | * @return A const Policy reference. | |
329 | */ | |
330 | virtual Policy get_policy(int t) = 0; | |
331 | /** | |
332 | * Get the default Policy | |
333 | * | |
334 | * @return A const Policy reference. | |
335 | */ | |
336 | virtual Policy get_default_policy() = 0; | |
337 | /** | |
338 | * Set Throttlers applied to all Messages from the given type of peer | |
339 | * | |
340 | * This is an init-time function and cannot be called after calling | |
341 | * start() or bind(). | |
342 | * | |
343 | * @param type The peer type the Throttlers will apply to. | |
344 | * @param bytes The Throttle for the number of bytes carried by the message | |
345 | * @param msgs The Throttle for the number of messages for this @p type | |
346 | * @note The Messenger does not take ownership of the Throttle pointers, but | |
347 | * you must not destroy them before you destroy the Messenger. | |
348 | */ | |
349 | virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0; | |
350 | /** | |
351 | * Set the default send priority | |
352 | * | |
353 | * This is an init-time function and must be called *before* calling | |
354 | * start(). | |
355 | * | |
356 | * @param p The cluster protocol to use. Defined externally. | |
357 | */ | |
358 | void set_default_send_priority(int p) { | |
359 | assert(!started); | |
360 | default_send_priority = p; | |
361 | } | |
362 | /** | |
363 | * Set the priority(SO_PRIORITY) for all packets to be sent on this socket. | |
364 | * | |
365 | * Linux uses this value to order the networking queues: packets with a higher | |
366 | * priority may be processed first depending on the selected device queueing | |
367 | * discipline. | |
368 | * | |
369 | * @param prio The priority. Setting a priority outside the range 0 to 6 | |
370 | * requires the CAP_NET_ADMIN capability. | |
371 | */ | |
372 | void set_socket_priority(int prio) { | |
373 | socket_priority = prio; | |
374 | } | |
375 | /** | |
376 | * Get the socket priority | |
377 | * | |
378 | * @return the socket priority | |
379 | */ | |
380 | int get_socket_priority() { | |
381 | return socket_priority; | |
382 | } | |
383 | /** | |
384 | * Add a new Dispatcher to the front of the list. If you add | |
385 | * a Dispatcher which is already included, it will get a duplicate | |
386 | * entry. This will reduce efficiency but not break anything. | |
387 | * | |
388 | * @param d The Dispatcher to insert into the list. | |
389 | */ | |
390 | void add_dispatcher_head(Dispatcher *d) { | |
391 | bool first = dispatchers.empty(); | |
392 | dispatchers.push_front(d); | |
393 | if (d->ms_can_fast_dispatch_any()) | |
394 | fast_dispatchers.push_front(d); | |
395 | if (first) | |
396 | ready(); | |
397 | } | |
398 | /** | |
399 | * Add a new Dispatcher to the end of the list. If you add | |
400 | * a Dispatcher which is already included, it will get a duplicate | |
401 | * entry. This will reduce efficiency but not break anything. | |
402 | * | |
403 | * @param d The Dispatcher to insert into the list. | |
404 | */ | |
405 | void add_dispatcher_tail(Dispatcher *d) { | |
406 | bool first = dispatchers.empty(); | |
407 | dispatchers.push_back(d); | |
408 | if (d->ms_can_fast_dispatch_any()) | |
409 | fast_dispatchers.push_back(d); | |
410 | if (first) | |
411 | ready(); | |
412 | } | |
413 | /** | |
414 | * Bind the Messenger to a specific address. If bind_addr | |
415 | * is not completely filled in the system will use the | |
416 | * valid portions and cycle through the unset ones (eg, the port) | |
417 | * in an unspecified order. | |
418 | * | |
419 | * @param bind_addr The address to bind to. | |
420 | * @return 0 on success, or -1 on error, or -errno if | |
421 | * we can be more specific about the failure. | |
422 | */ | |
423 | virtual int bind(const entity_addr_t& bind_addr) = 0; | |
424 | /** | |
425 | * This function performs a full restart of the Messenger component, | |
426 | * whatever that means. Other entities who connect to this | |
427 | * Messenger post-rebind() should perceive it as a new entity which | |
428 | * they have not previously contacted, and it MUST bind to a | |
429 | * different address than it did previously. | |
430 | * | |
431 | * @param avoid_ports Additional port to avoid binding to. | |
432 | */ | |
433 | virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; } | |
434 | /** | |
435 | * Bind the 'client' Messenger to a specific address.Messenger will bind | |
436 | * the address before connect to others when option ms_bind_before_connect | |
437 | * is true. | |
438 | * @param bind_addr The address to bind to. | |
439 | * @return 0 on success, or -1 on error, or -errno if | |
440 | */ | |
441 | virtual int client_bind(const entity_addr_t& bind_addr) = 0; | |
442 | /** | |
443 | * @} // Configuration | |
444 | */ | |
445 | ||
446 | /** | |
447 | * @defgroup Startup/Shutdown | |
448 | * @{ | |
449 | */ | |
450 | /** | |
451 | * Perform any resource allocation, thread startup, etc | |
452 | * that is required before attempting to connect to other | |
453 | * Messengers or transmit messages. | |
454 | * Once this function completes, started shall be set to true. | |
455 | * | |
456 | * @return 0 on success; -errno on failure. | |
457 | */ | |
458 | virtual int start() { started = true; return 0; } | |
459 | ||
460 | // shutdown | |
461 | /** | |
462 | * Block until the Messenger has finished shutting down (according | |
463 | * to the shutdown() function). | |
464 | * It is valid to call this after calling shutdown(), but it must | |
465 | * be called before deleting the Messenger. | |
466 | */ | |
467 | virtual void wait() = 0; | |
468 | /** | |
469 | * Initiate a shutdown of the Messenger. | |
470 | * | |
471 | * @return 0 on success, -errno otherwise. | |
472 | */ | |
473 | virtual int shutdown() { started = false; return 0; } | |
474 | /** | |
475 | * @} // Startup/Shutdown | |
476 | */ | |
477 | ||
478 | /** | |
479 | * @defgroup Messaging | |
480 | * @{ | |
481 | */ | |
482 | /** | |
483 | * Queue the given Message for the given entity. | |
484 | * Success in this function does not guarantee Message delivery, only | |
485 | * success in queueing the Message. Other guarantees may be provided based | |
486 | * on the Connection policy associated with the dest. | |
487 | * | |
488 | * @param m The Message to send. The Messenger consumes a single reference | |
489 | * when you pass it in. | |
490 | * @param dest The entity to send the Message to. | |
491 | * | |
492 | * DEPRECATED: please do not use this interface for any new code; | |
493 | * use the Connection* variant. | |
494 | * | |
495 | * @return 0 on success, or -errno on failure. | |
496 | */ | |
497 | virtual int send_message(Message *m, const entity_inst_t& dest) = 0; | |
498 | ||
499 | /** | |
500 | * @} // Messaging | |
501 | */ | |
502 | /** | |
503 | * @defgroup Connection Management | |
504 | * @{ | |
505 | */ | |
506 | /** | |
507 | * Get the Connection object associated with a given entity. If a | |
508 | * Connection does not exist, create one and establish a logical connection. | |
509 | * The caller owns a reference when this returns. Call ->put() when you're | |
510 | * done! | |
511 | * | |
512 | * @param dest The entity to get a connection for. | |
513 | */ | |
514 | virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0; | |
515 | /** | |
516 | * Get the Connection object associated with ourselves. | |
517 | */ | |
518 | virtual ConnectionRef get_loopback_connection() = 0; | |
519 | /** | |
520 | * Mark down a Connection to a remote. | |
521 | * | |
522 | * This will cause us to discard our outgoing queue for them, and if | |
523 | * reset detection is enabled in the policy and the endpoint tries | |
524 | * to reconnect they will discard their queue when we inform them of | |
525 | * the session reset. | |
526 | * | |
527 | * If there is no Connection to the given dest, it is a no-op. | |
528 | * | |
529 | * This generates a RESET notification to the Dispatcher. | |
530 | * | |
531 | * DEPRECATED: please do not use this interface for any new code; | |
532 | * use the Connection* variant. | |
533 | * | |
534 | * @param a The address to mark down. | |
535 | */ | |
536 | virtual void mark_down(const entity_addr_t& a) = 0; | |
537 | /** | |
538 | * Mark all the existing Connections down. This is equivalent | |
539 | * to iterating over all Connections and calling mark_down() | |
540 | * on each. | |
541 | * | |
542 | * This will generate a RESET event for each closed connections. | |
543 | */ | |
544 | virtual void mark_down_all() = 0; | |
545 | /** | |
546 | * @} // Connection Management | |
547 | */ | |
548 | protected: | |
549 | /** | |
550 | * @defgroup Subclass Interfacing | |
551 | * @{ | |
552 | */ | |
553 | /** | |
554 | * A courtesy function for Messenger implementations which | |
555 | * will be called when we receive our first Dispatcher. | |
556 | */ | |
557 | virtual void ready() { } | |
558 | /** | |
559 | * @} // Subclass Interfacing | |
560 | */ | |
561 | /** | |
562 | * @defgroup Dispatcher Interfacing | |
563 | * @{ | |
564 | */ | |
565 | public: | |
566 | /** | |
567 | * Determine whether a message can be fast-dispatched. We will | |
568 | * query each Dispatcher in sequence to determine if they are | |
569 | * capable of handling a particular message via "fast dispatch". | |
570 | * | |
571 | * @param m The Message we are testing. | |
572 | */ | |
573 | bool ms_can_fast_dispatch(const Message *m) { | |
574 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
575 | p != fast_dispatchers.end(); | |
576 | ++p) { | |
577 | if ((*p)->ms_can_fast_dispatch(m)) | |
578 | return true; | |
579 | } | |
580 | return false; | |
581 | } | |
582 | ||
583 | /** | |
584 | * Deliver a single Message via "fast dispatch". | |
585 | * | |
586 | * @param m The Message we are fast dispatching. We take ownership | |
587 | * of one reference to it. | |
588 | * If none of our Dispatchers can handle it, ceph_abort(). | |
589 | */ | |
590 | void ms_fast_dispatch(Message *m) { | |
591 | m->set_dispatch_stamp(ceph_clock_now()); | |
592 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
593 | p != fast_dispatchers.end(); | |
594 | ++p) { | |
595 | if ((*p)->ms_can_fast_dispatch(m)) { | |
596 | (*p)->ms_fast_dispatch(m); | |
597 | return; | |
598 | } | |
599 | } | |
600 | ceph_abort(); | |
601 | } | |
602 | /** | |
603 | * | |
604 | */ | |
605 | void ms_fast_preprocess(Message *m) { | |
606 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
607 | p != fast_dispatchers.end(); | |
608 | ++p) { | |
609 | (*p)->ms_fast_preprocess(m); | |
610 | } | |
611 | } | |
612 | /** | |
613 | * Deliver a single Message. Send it to each Dispatcher | |
614 | * in sequence until one of them handles it. | |
615 | * If none of our Dispatchers can handle it, assert(0). | |
616 | * | |
617 | * @param m The Message to deliver. We take ownership of | |
618 | * one reference to it. | |
619 | */ | |
620 | void ms_deliver_dispatch(Message *m) { | |
621 | m->set_dispatch_stamp(ceph_clock_now()); | |
622 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
623 | p != dispatchers.end(); | |
624 | ++p) { | |
625 | if ((*p)->ms_dispatch(m)) | |
626 | return; | |
627 | } | |
628 | lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " | |
629 | << m->get_source_inst() << dendl; | |
630 | assert(!cct->_conf->ms_die_on_unhandled_msg); | |
631 | m->put(); | |
632 | } | |
633 | /** | |
634 | * Notify each Dispatcher of a new Connection. Call | |
635 | * this function whenever a new Connection is initiated or | |
636 | * reconnects. | |
637 | * | |
638 | * @param con Pointer to the new Connection. | |
639 | */ | |
640 | void ms_deliver_handle_connect(Connection *con) { | |
641 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
642 | p != dispatchers.end(); | |
643 | ++p) | |
644 | (*p)->ms_handle_connect(con); | |
645 | } | |
646 | ||
647 | /** | |
648 | * Notify each fast Dispatcher of a new Connection. Call | |
649 | * this function whenever a new Connection is initiated or | |
650 | * reconnects. | |
651 | * | |
652 | * @param con Pointer to the new Connection. | |
653 | */ | |
654 | void ms_deliver_handle_fast_connect(Connection *con) { | |
655 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
656 | p != fast_dispatchers.end(); | |
657 | ++p) | |
658 | (*p)->ms_handle_fast_connect(con); | |
659 | } | |
660 | ||
661 | /** | |
662 | * Notify each Dispatcher of a new incomming Connection. Call | |
663 | * this function whenever a new Connection is accepted. | |
664 | * | |
665 | * @param con Pointer to the new Connection. | |
666 | */ | |
667 | void ms_deliver_handle_accept(Connection *con) { | |
668 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
669 | p != dispatchers.end(); | |
670 | ++p) | |
671 | (*p)->ms_handle_accept(con); | |
672 | } | |
673 | ||
674 | /** | |
675 | * Notify each fast Dispatcher of a new incoming Connection. Call | |
676 | * this function whenever a new Connection is accepted. | |
677 | * | |
678 | * @param con Pointer to the new Connection. | |
679 | */ | |
680 | void ms_deliver_handle_fast_accept(Connection *con) { | |
681 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
682 | p != fast_dispatchers.end(); | |
683 | ++p) | |
684 | (*p)->ms_handle_fast_accept(con); | |
685 | } | |
686 | ||
687 | /** | |
688 | * Notify each Dispatcher of a Connection which may have lost | |
689 | * Messages. Call this function whenever you detect that a lossy Connection | |
690 | * has been disconnected. | |
691 | * | |
692 | * @param con Pointer to the broken Connection. | |
693 | */ | |
694 | void ms_deliver_handle_reset(Connection *con) { | |
695 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
696 | p != dispatchers.end(); | |
697 | ++p) { | |
698 | if ((*p)->ms_handle_reset(con)) | |
699 | return; | |
700 | } | |
701 | } | |
702 | /** | |
703 | * Notify each Dispatcher of a Connection which has been "forgotten" about | |
704 | * by the remote end, implying that messages have probably been lost. | |
705 | * Call this function whenever you detect a reset. | |
706 | * | |
707 | * @param con Pointer to the broken Connection. | |
708 | */ | |
709 | void ms_deliver_handle_remote_reset(Connection *con) { | |
710 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
711 | p != dispatchers.end(); | |
712 | ++p) | |
713 | (*p)->ms_handle_remote_reset(con); | |
714 | } | |
715 | ||
716 | /** | |
717 | * Notify each Dispatcher of a Connection for which reconnection | |
718 | * attempts are being refused. Call this function whenever you | |
719 | * detect that a lossy Connection has been disconnected and it's | |
720 | * impossible to reconnect. | |
721 | * | |
722 | * @param con Pointer to the broken Connection. | |
723 | */ | |
724 | void ms_deliver_handle_refused(Connection *con) { | |
725 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
726 | p != dispatchers.end(); | |
727 | ++p) { | |
728 | if ((*p)->ms_handle_refused(con)) | |
729 | return; | |
730 | } | |
731 | } | |
732 | ||
733 | /** | |
734 | * Get the AuthAuthorizer for a new outgoing Connection. | |
735 | * | |
736 | * @param peer_type The peer type for the new Connection | |
737 | * @param force_new True if we want to wait for new keys, false otherwise. | |
738 | * @return A pointer to the AuthAuthorizer, if we have one; NULL otherwise | |
739 | */ | |
740 | AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) { | |
741 | AuthAuthorizer *a = 0; | |
742 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
743 | p != dispatchers.end(); | |
744 | ++p) { | |
745 | if ((*p)->ms_get_authorizer(peer_type, &a, force_new)) | |
746 | return a; | |
747 | } | |
748 | return NULL; | |
749 | } | |
750 | /** | |
751 | * Verify that the authorizer on a new incoming Connection is correct. | |
752 | * | |
753 | * @param con The new incoming Connection | |
754 | * @param peer_type The type of the endpoint on the new Connection | |
755 | * @param protocol The ID of the protocol in use (at time of writing, cephx or none) | |
756 | * @param authorizer The authorization string supplied by the remote | |
757 | * @param authorizer_reply Output param: The string we should send back to | |
758 | * the remote to authorize ourselves. Only filled in if isvalid | |
759 | * @param isvalid Output param: True if authorizer is valid, false otherwise | |
760 | * | |
761 | * @return True if we were able to prove or disprove correctness of | |
762 | * authorizer, false otherwise. | |
763 | */ | |
764 | bool ms_deliver_verify_authorizer(Connection *con, int peer_type, | |
765 | int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, | |
766 | bool& isvalid, CryptoKey& session_key) { | |
767 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
768 | p != dispatchers.end(); | |
769 | ++p) { | |
770 | if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid, session_key)) | |
771 | return true; | |
772 | } | |
773 | return false; | |
774 | } | |
775 | ||
776 | /** | |
777 | * @} // Dispatcher Interfacing | |
778 | */ | |
779 | }; | |
780 | ||
781 | ||
782 | ||
783 | #endif |