]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | ||
17 | #ifndef CEPH_MESSENGER_H | |
18 | #define CEPH_MESSENGER_H | |
19 | ||
20 | #include <map> | |
21 | using namespace std; | |
22 | ||
23 | #include "Message.h" | |
24 | #include "Dispatcher.h" | |
25 | #include "common/Mutex.h" | |
26 | #include "common/Cond.h" | |
27 | #include "include/Context.h" | |
28 | #include "include/types.h" | |
29 | #include "include/ceph_features.h" | |
30 | #include "auth/Crypto.h" | |
31 | ||
32 | #include <errno.h> | |
33 | #include <sstream> | |
34 | ||
35 | #define SOCKET_PRIORITY_MIN_DELAY 6 | |
36 | ||
37 | class Timer; | |
38 | ||
39 | ||
40 | class Messenger { | |
41 | private: | |
42 | list<Dispatcher*> dispatchers; | |
43 | list <Dispatcher*> fast_dispatchers; | |
44 | ZTracer::Endpoint trace_endpoint; | |
45 | ||
46 | void set_endpoint_addr(const entity_addr_t& a, | |
47 | const entity_name_t &name); | |
48 | ||
49 | protected: | |
50 | /// the "name" of the local daemon. eg client.99 | |
51 | entity_inst_t my_inst; | |
52 | int default_send_priority; | |
53 | /// set to true once the Messenger has started, and set to false on shutdown | |
54 | bool started; | |
55 | uint32_t magic; | |
56 | int socket_priority; | |
57 | ||
58 | public: | |
59 | /** | |
60 | * Various Messenger conditional config/type flags to allow | |
61 | * different "transport" Messengers to tune themselves | |
62 | */ | |
63 | static const int HAS_HEAVY_TRAFFIC = 0x0001; | |
64 | static const int HAS_MANY_CONNECTIONS = 0x0002; | |
65 | static const int HEARTBEAT = 0x0004; | |
66 | ||
67 | /** | |
68 | * The CephContext this Messenger uses. Many other components initialize themselves | |
69 | * from this value. | |
70 | */ | |
71 | CephContext *cct; | |
72 | int crcflags; | |
73 | ||
74 | /** | |
75 | * A Policy describes the rules of a Connection. Is there a limit on how | |
76 | * much data this Connection can have locally? When the underlying connection | |
77 | * experiences an error, does the Connection disappear? Can this Messenger | |
78 | * re-establish the underlying connection? | |
79 | */ | |
80 | struct Policy { | |
81 | /// If true, the Connection is tossed out on errors. | |
82 | bool lossy; | |
83 | /// If true, the underlying connection can't be re-established from this end. | |
84 | bool server; | |
85 | /// If true, we will standby when idle | |
86 | bool standby; | |
87 | /// If true, we will try to detect session resets | |
88 | bool resetcheck; | |
89 | /** | |
90 | * The throttler is used to limit how much data is held by Messages from | |
91 | * the associated Connection(s). When reading in a new Message, the Messenger | |
92 | * will call throttler->throttle() for the size of the new Message. | |
93 | */ | |
94 | Throttle *throttler_bytes; | |
95 | Throttle *throttler_messages; | |
96 | ||
97 | /// Specify features supported locally by the endpoint. | |
98 | uint64_t features_supported; | |
99 | /// Specify features any remotes must have to talk to this endpoint. | |
100 | uint64_t features_required; | |
101 | ||
102 | Policy() | |
103 | : lossy(false), server(false), standby(false), resetcheck(true), | |
104 | throttler_bytes(NULL), | |
105 | throttler_messages(NULL), | |
106 | features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), | |
107 | features_required(0) {} | |
108 | private: | |
109 | Policy(bool l, bool s, bool st, bool r, uint64_t req) | |
110 | : lossy(l), server(s), standby(st), resetcheck(r), | |
111 | throttler_bytes(NULL), | |
112 | throttler_messages(NULL), | |
113 | features_supported(CEPH_FEATURES_SUPPORTED_DEFAULT), | |
114 | features_required(req) {} | |
115 | ||
116 | public: | |
117 | static Policy stateful_server(uint64_t req) { | |
118 | return Policy(false, true, true, true, req); | |
119 | } | |
120 | static Policy stateless_server(uint64_t req) { | |
121 | return Policy(true, true, false, false, req); | |
122 | } | |
123 | static Policy lossless_peer(uint64_t req) { | |
124 | return Policy(false, false, true, false, req); | |
125 | } | |
126 | static Policy lossless_peer_reuse(uint64_t req) { | |
127 | return Policy(false, false, true, true, req); | |
128 | } | |
129 | static Policy lossy_client(uint64_t req) { | |
130 | return Policy(true, false, false, false, req); | |
131 | } | |
132 | static Policy lossless_client(uint64_t req) { | |
133 | return Policy(false, false, false, true, req); | |
134 | } | |
135 | }; | |
136 | ||
137 | /** | |
138 | * Messenger constructor. Call this from your implementation. | |
139 | * Messenger users should construct full implementations directly, | |
140 | * or use the create() function. | |
141 | */ | |
142 | Messenger(CephContext *cct_, entity_name_t w) | |
143 | : trace_endpoint("0.0.0.0", 0, "Messenger"), | |
144 | my_inst(), | |
145 | default_send_priority(CEPH_MSG_PRIO_DEFAULT), started(false), | |
146 | magic(0), | |
147 | socket_priority(-1), | |
148 | cct(cct_), | |
149 | crcflags(get_default_crc_flags(cct->_conf)) | |
150 | { | |
151 | my_inst.name = w; | |
152 | } | |
153 | virtual ~Messenger() {} | |
154 | ||
155 | /** | |
156 | * create a new messenger | |
157 | * | |
158 | * Create a new messenger instance, with whatever implementation is | |
159 | * available or specified via the configuration in cct. | |
160 | * | |
161 | * @param cct context | |
162 | * @param type name of messenger type | |
163 | * @param name entity name to register | |
164 | * @param lname logical name of the messenger in this process (e.g., "client") | |
165 | * @param nonce nonce value to uniquely identify this instance on the current host | |
166 | * @param features bits for the local connection | |
167 | * @param cflags general set of flags to configure transport resources | |
168 | */ | |
169 | static Messenger *create(CephContext *cct, | |
170 | const string &type, | |
171 | entity_name_t name, | |
172 | string lname, | |
173 | uint64_t nonce, | |
174 | uint64_t cflags); | |
175 | ||
176 | /** | |
177 | * create a new messenger | |
178 | * | |
179 | * Create a new messenger instance. | |
180 | * Same as the above, but a slightly simpler interface for clients: | |
181 | * - Generate a random nonce | |
182 | * - use the default feature bits | |
183 | * - get the messenger type from cct | |
184 | * - use the client entity_type | |
185 | * | |
186 | * @param cct context | |
187 | * @param lname logical name of the messenger in this process (e.g., "client") | |
188 | */ | |
189 | static Messenger *create_client_messenger(CephContext *cct, string lname); | |
190 | ||
191 | /** | |
192 | * @defgroup Accessors | |
193 | * @{ | |
194 | */ | |
195 | /** | |
196 | * Retrieve the Messenger's instance. | |
197 | * | |
198 | * @return A const reference to the instance this Messenger | |
199 | * currently believes to be its own. | |
200 | */ | |
201 | const entity_inst_t& get_myinst() { return my_inst; } | |
202 | /** | |
203 | * set messenger's instance | |
204 | */ | |
205 | void set_myinst(entity_inst_t i) { my_inst = i; } | |
206 | ||
207 | uint32_t get_magic() { return magic; } | |
208 | void set_magic(int _magic) { magic = _magic; } | |
209 | ||
210 | /** | |
211 | * Retrieve the Messenger's address. | |
212 | * | |
213 | * @return A const reference to the address this Messenger | |
214 | * currently believes to be its own. | |
215 | */ | |
216 | const entity_addr_t& get_myaddr() { return my_inst.addr; } | |
217 | protected: | |
218 | /** | |
219 | * set messenger's address | |
220 | */ | |
221 | virtual void set_myaddr(const entity_addr_t& a) { | |
222 | my_inst.addr = a; | |
223 | set_endpoint_addr(a, my_inst.name); | |
224 | } | |
225 | public: | |
226 | /** | |
227 | * @return the zipkin trace endpoint | |
228 | */ | |
229 | const ZTracer::Endpoint* get_trace_endpoint() const { | |
230 | return &trace_endpoint; | |
231 | } | |
232 | ||
233 | /** | |
234 | * Retrieve the Messenger's name. | |
235 | * | |
236 | * @return A const reference to the name this Messenger | |
237 | * currently believes to be its own. | |
238 | */ | |
239 | const entity_name_t& get_myname() { return my_inst.name; } | |
240 | /** | |
241 | * Set the name of the local entity. The name is reported to others and | |
242 | * can be changed while the system is running, but doing so at incorrect | |
243 | * times may have bad results. | |
244 | * | |
245 | * @param m The name to set. | |
246 | */ | |
247 | void set_myname(const entity_name_t& m) { my_inst.name = m; } | |
248 | /** | |
249 | * Set the unknown address components for this Messenger. | |
250 | * This is useful if the Messenger doesn't know its full address just by | |
251 | * binding, but another Messenger on the same interface has already learned | |
252 | * its full address. This function does not fill in known address elements, | |
253 | * cause a rebind, or do anything of that sort. | |
254 | * | |
255 | * @param addr The address to use as a template. | |
256 | */ | |
257 | virtual void set_addr_unknowns(const entity_addr_t &addr) = 0; | |
258 | /// Get the default send priority. | |
259 | int get_default_send_priority() { return default_send_priority; } | |
260 | /** | |
261 | * Get the number of Messages which the Messenger has received | |
262 | * but not yet dispatched. | |
263 | */ | |
264 | virtual int get_dispatch_queue_len() = 0; | |
265 | ||
266 | /** | |
267 | * Get age of oldest undelivered message | |
268 | * (0 if the queue is empty) | |
269 | */ | |
270 | virtual double get_dispatch_queue_max_age(utime_t now) = 0; | |
271 | /** | |
272 | * Get the default crc flags for this messenger. | |
273 | * but not yet dispatched. | |
274 | */ | |
275 | static int get_default_crc_flags(md_config_t *); | |
276 | ||
277 | /** | |
278 | * @} // Accessors | |
279 | */ | |
280 | ||
281 | /** | |
282 | * @defgroup Configuration | |
283 | * @{ | |
284 | */ | |
285 | /** | |
286 | * Set the cluster protocol in use by this daemon. | |
287 | * This is an init-time function and cannot be called after calling | |
288 | * start() or bind(). | |
289 | * | |
290 | * @param p The cluster protocol to use. Defined externally. | |
291 | */ | |
292 | virtual void set_cluster_protocol(int p) = 0; | |
293 | /** | |
294 | * Set a policy which is applied to all peers who do not have a type-specific | |
295 | * Policy. | |
296 | * This is an init-time function and cannot be called after calling | |
297 | * start() or bind(). | |
298 | * | |
299 | * @param p The Policy to apply. | |
300 | */ | |
301 | virtual void set_default_policy(Policy p) = 0; | |
302 | /** | |
303 | * Set a policy which is applied to all peers of the given type. | |
304 | * This is an init-time function and cannot be called after calling | |
305 | * start() or bind(). | |
306 | * | |
307 | * @param type The peer type this policy applies to. | |
308 | * @param p The policy to apply. | |
309 | */ | |
310 | virtual void set_policy(int type, Policy p) = 0; | |
311 | /** | |
312 | * Set the Policy associated with a type of peer. | |
313 | * | |
314 | * This can be called either on initial setup, or after connections | |
315 | * are already established. However, the policies for existing | |
316 | * connections will not be affected; the new policy will only apply | |
317 | * to future connections. | |
318 | * | |
319 | * @param t The peer type to get the default policy for. | |
320 | * @return A const Policy reference. | |
321 | */ | |
322 | virtual Policy get_policy(int t) = 0; | |
323 | /** | |
324 | * Get the default Policy | |
325 | * | |
326 | * @return A const Policy reference. | |
327 | */ | |
328 | virtual Policy get_default_policy() = 0; | |
329 | /** | |
330 | * Set Throttlers applied to all Messages from the given type of peer | |
331 | * | |
332 | * This is an init-time function and cannot be called after calling | |
333 | * start() or bind(). | |
334 | * | |
335 | * @param type The peer type the Throttlers will apply to. | |
336 | * @param bytes The Throttle for the number of bytes carried by the message | |
337 | * @param msgs The Throttle for the number of messages for this @p type | |
338 | * @note The Messenger does not take ownership of the Throttle pointers, but | |
339 | * you must not destroy them before you destroy the Messenger. | |
340 | */ | |
341 | virtual void set_policy_throttlers(int type, Throttle *bytes, Throttle *msgs=NULL) = 0; | |
342 | /** | |
343 | * Set the default send priority | |
344 | * | |
345 | * This is an init-time function and must be called *before* calling | |
346 | * start(). | |
347 | * | |
348 | * @param p The cluster protocol to use. Defined externally. | |
349 | */ | |
350 | void set_default_send_priority(int p) { | |
351 | assert(!started); | |
352 | default_send_priority = p; | |
353 | } | |
354 | /** | |
355 | * Set the priority(SO_PRIORITY) for all packets to be sent on this socket. | |
356 | * | |
357 | * Linux uses this value to order the networking queues: packets with a higher | |
358 | * priority may be processed first depending on the selected device queueing | |
359 | * discipline. | |
360 | * | |
361 | * @param prio The priority. Setting a priority outside the range 0 to 6 | |
362 | * requires the CAP_NET_ADMIN capability. | |
363 | */ | |
364 | void set_socket_priority(int prio) { | |
365 | socket_priority = prio; | |
366 | } | |
367 | /** | |
368 | * Get the socket priority | |
369 | * | |
370 | * @return the socket priority | |
371 | */ | |
372 | int get_socket_priority() { | |
373 | return socket_priority; | |
374 | } | |
375 | /** | |
376 | * Add a new Dispatcher to the front of the list. If you add | |
377 | * a Dispatcher which is already included, it will get a duplicate | |
378 | * entry. This will reduce efficiency but not break anything. | |
379 | * | |
380 | * @param d The Dispatcher to insert into the list. | |
381 | */ | |
382 | void add_dispatcher_head(Dispatcher *d) { | |
383 | bool first = dispatchers.empty(); | |
384 | dispatchers.push_front(d); | |
385 | if (d->ms_can_fast_dispatch_any()) | |
386 | fast_dispatchers.push_front(d); | |
387 | if (first) | |
388 | ready(); | |
389 | } | |
390 | /** | |
391 | * Add a new Dispatcher to the end of the list. If you add | |
392 | * a Dispatcher which is already included, it will get a duplicate | |
393 | * entry. This will reduce efficiency but not break anything. | |
394 | * | |
395 | * @param d The Dispatcher to insert into the list. | |
396 | */ | |
397 | void add_dispatcher_tail(Dispatcher *d) { | |
398 | bool first = dispatchers.empty(); | |
399 | dispatchers.push_back(d); | |
400 | if (d->ms_can_fast_dispatch_any()) | |
401 | fast_dispatchers.push_back(d); | |
402 | if (first) | |
403 | ready(); | |
404 | } | |
405 | /** | |
406 | * Bind the Messenger to a specific address. If bind_addr | |
407 | * is not completely filled in the system will use the | |
408 | * valid portions and cycle through the unset ones (eg, the port) | |
409 | * in an unspecified order. | |
410 | * | |
411 | * @param bind_addr The address to bind to. | |
412 | * @return 0 on success, or -1 on error, or -errno if | |
413 | * we can be more specific about the failure. | |
414 | */ | |
415 | virtual int bind(const entity_addr_t& bind_addr) = 0; | |
416 | /** | |
417 | * This function performs a full restart of the Messenger component, | |
418 | * whatever that means. Other entities who connect to this | |
419 | * Messenger post-rebind() should perceive it as a new entity which | |
420 | * they have not previously contacted, and it MUST bind to a | |
421 | * different address than it did previously. | |
422 | * | |
423 | * @param avoid_ports Additional port to avoid binding to. | |
424 | */ | |
425 | virtual int rebind(const set<int>& avoid_ports) { return -EOPNOTSUPP; } | |
426 | /** | |
427 | * Bind the 'client' Messenger to a specific address.Messenger will bind | |
428 | * the address before connect to others when option ms_bind_before_connect | |
429 | * is true. | |
430 | * @param bind_addr The address to bind to. | |
431 | * @return 0 on success, or -1 on error, or -errno if | |
432 | */ | |
433 | virtual int client_bind(const entity_addr_t& bind_addr) = 0; | |
434 | /** | |
435 | * @} // Configuration | |
436 | */ | |
437 | ||
438 | /** | |
439 | * @defgroup Startup/Shutdown | |
440 | * @{ | |
441 | */ | |
442 | /** | |
443 | * Perform any resource allocation, thread startup, etc | |
444 | * that is required before attempting to connect to other | |
445 | * Messengers or transmit messages. | |
446 | * Once this function completes, started shall be set to true. | |
447 | * | |
448 | * @return 0 on success; -errno on failure. | |
449 | */ | |
450 | virtual int start() { started = true; return 0; } | |
451 | ||
452 | // shutdown | |
453 | /** | |
454 | * Block until the Messenger has finished shutting down (according | |
455 | * to the shutdown() function). | |
456 | * It is valid to call this after calling shutdown(), but it must | |
457 | * be called before deleting the Messenger. | |
458 | */ | |
459 | virtual void wait() = 0; | |
460 | /** | |
461 | * Initiate a shutdown of the Messenger. | |
462 | * | |
463 | * @return 0 on success, -errno otherwise. | |
464 | */ | |
465 | virtual int shutdown() { started = false; return 0; } | |
466 | /** | |
467 | * @} // Startup/Shutdown | |
468 | */ | |
469 | ||
470 | /** | |
471 | * @defgroup Messaging | |
472 | * @{ | |
473 | */ | |
474 | /** | |
475 | * Queue the given Message for the given entity. | |
476 | * Success in this function does not guarantee Message delivery, only | |
477 | * success in queueing the Message. Other guarantees may be provided based | |
478 | * on the Connection policy associated with the dest. | |
479 | * | |
480 | * @param m The Message to send. The Messenger consumes a single reference | |
481 | * when you pass it in. | |
482 | * @param dest The entity to send the Message to. | |
483 | * | |
484 | * DEPRECATED: please do not use this interface for any new code; | |
485 | * use the Connection* variant. | |
486 | * | |
487 | * @return 0 on success, or -errno on failure. | |
488 | */ | |
489 | virtual int send_message(Message *m, const entity_inst_t& dest) = 0; | |
490 | ||
491 | /** | |
492 | * @} // Messaging | |
493 | */ | |
494 | /** | |
495 | * @defgroup Connection Management | |
496 | * @{ | |
497 | */ | |
498 | /** | |
499 | * Get the Connection object associated with a given entity. If a | |
500 | * Connection does not exist, create one and establish a logical connection. | |
501 | * The caller owns a reference when this returns. Call ->put() when you're | |
502 | * done! | |
503 | * | |
504 | * @param dest The entity to get a connection for. | |
505 | */ | |
506 | virtual ConnectionRef get_connection(const entity_inst_t& dest) = 0; | |
507 | /** | |
508 | * Get the Connection object associated with ourselves. | |
509 | */ | |
510 | virtual ConnectionRef get_loopback_connection() = 0; | |
511 | /** | |
512 | * Mark down a Connection to a remote. | |
513 | * | |
514 | * This will cause us to discard our outgoing queue for them, and if | |
515 | * reset detection is enabled in the policy and the endpoint tries | |
516 | * to reconnect they will discard their queue when we inform them of | |
517 | * the session reset. | |
518 | * | |
519 | * If there is no Connection to the given dest, it is a no-op. | |
520 | * | |
521 | * This generates a RESET notification to the Dispatcher. | |
522 | * | |
523 | * DEPRECATED: please do not use this interface for any new code; | |
524 | * use the Connection* variant. | |
525 | * | |
526 | * @param a The address to mark down. | |
527 | */ | |
528 | virtual void mark_down(const entity_addr_t& a) = 0; | |
529 | /** | |
530 | * Mark all the existing Connections down. This is equivalent | |
531 | * to iterating over all Connections and calling mark_down() | |
532 | * on each. | |
533 | * | |
534 | * This will generate a RESET event for each closed connections. | |
535 | */ | |
536 | virtual void mark_down_all() = 0; | |
537 | /** | |
538 | * @} // Connection Management | |
539 | */ | |
540 | protected: | |
541 | /** | |
542 | * @defgroup Subclass Interfacing | |
543 | * @{ | |
544 | */ | |
545 | /** | |
546 | * A courtesy function for Messenger implementations which | |
547 | * will be called when we receive our first Dispatcher. | |
548 | */ | |
549 | virtual void ready() { } | |
550 | /** | |
551 | * @} // Subclass Interfacing | |
552 | */ | |
553 | /** | |
554 | * @defgroup Dispatcher Interfacing | |
555 | * @{ | |
556 | */ | |
557 | public: | |
558 | /** | |
559 | * Determine whether a message can be fast-dispatched. We will | |
560 | * query each Dispatcher in sequence to determine if they are | |
561 | * capable of handling a particular message via "fast dispatch". | |
562 | * | |
563 | * @param m The Message we are testing. | |
564 | */ | |
565 | bool ms_can_fast_dispatch(const Message *m) { | |
566 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
567 | p != fast_dispatchers.end(); | |
568 | ++p) { | |
569 | if ((*p)->ms_can_fast_dispatch(m)) | |
570 | return true; | |
571 | } | |
572 | return false; | |
573 | } | |
574 | ||
575 | /** | |
576 | * Deliver a single Message via "fast dispatch". | |
577 | * | |
578 | * @param m The Message we are fast dispatching. We take ownership | |
579 | * of one reference to it. | |
580 | * If none of our Dispatchers can handle it, ceph_abort(). | |
581 | */ | |
582 | void ms_fast_dispatch(Message *m) { | |
583 | m->set_dispatch_stamp(ceph_clock_now()); | |
584 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
585 | p != fast_dispatchers.end(); | |
586 | ++p) { | |
587 | if ((*p)->ms_can_fast_dispatch(m)) { | |
588 | (*p)->ms_fast_dispatch(m); | |
589 | return; | |
590 | } | |
591 | } | |
592 | ceph_abort(); | |
593 | } | |
594 | /** | |
595 | * | |
596 | */ | |
597 | void ms_fast_preprocess(Message *m) { | |
598 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
599 | p != fast_dispatchers.end(); | |
600 | ++p) { | |
601 | (*p)->ms_fast_preprocess(m); | |
602 | } | |
603 | } | |
604 | /** | |
605 | * Deliver a single Message. Send it to each Dispatcher | |
606 | * in sequence until one of them handles it. | |
607 | * If none of our Dispatchers can handle it, assert(0). | |
608 | * | |
609 | * @param m The Message to deliver. We take ownership of | |
610 | * one reference to it. | |
611 | */ | |
612 | void ms_deliver_dispatch(Message *m) { | |
613 | m->set_dispatch_stamp(ceph_clock_now()); | |
614 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
615 | p != dispatchers.end(); | |
616 | ++p) { | |
617 | if ((*p)->ms_dispatch(m)) | |
618 | return; | |
619 | } | |
620 | lsubdout(cct, ms, 0) << "ms_deliver_dispatch: unhandled message " << m << " " << *m << " from " | |
621 | << m->get_source_inst() << dendl; | |
622 | assert(!cct->_conf->ms_die_on_unhandled_msg); | |
623 | m->put(); | |
624 | } | |
625 | /** | |
626 | * Notify each Dispatcher of a new Connection. Call | |
627 | * this function whenever a new Connection is initiated or | |
628 | * reconnects. | |
629 | * | |
630 | * @param con Pointer to the new Connection. | |
631 | */ | |
632 | void ms_deliver_handle_connect(Connection *con) { | |
633 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
634 | p != dispatchers.end(); | |
635 | ++p) | |
636 | (*p)->ms_handle_connect(con); | |
637 | } | |
638 | ||
639 | /** | |
640 | * Notify each fast Dispatcher of a new Connection. Call | |
641 | * this function whenever a new Connection is initiated or | |
642 | * reconnects. | |
643 | * | |
644 | * @param con Pointer to the new Connection. | |
645 | */ | |
646 | void ms_deliver_handle_fast_connect(Connection *con) { | |
647 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
648 | p != fast_dispatchers.end(); | |
649 | ++p) | |
650 | (*p)->ms_handle_fast_connect(con); | |
651 | } | |
652 | ||
653 | /** | |
654 | * Notify each Dispatcher of a new incomming Connection. Call | |
655 | * this function whenever a new Connection is accepted. | |
656 | * | |
657 | * @param con Pointer to the new Connection. | |
658 | */ | |
659 | void ms_deliver_handle_accept(Connection *con) { | |
660 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
661 | p != dispatchers.end(); | |
662 | ++p) | |
663 | (*p)->ms_handle_accept(con); | |
664 | } | |
665 | ||
666 | /** | |
667 | * Notify each fast Dispatcher of a new incoming Connection. Call | |
668 | * this function whenever a new Connection is accepted. | |
669 | * | |
670 | * @param con Pointer to the new Connection. | |
671 | */ | |
672 | void ms_deliver_handle_fast_accept(Connection *con) { | |
673 | for (list<Dispatcher*>::iterator p = fast_dispatchers.begin(); | |
674 | p != fast_dispatchers.end(); | |
675 | ++p) | |
676 | (*p)->ms_handle_fast_accept(con); | |
677 | } | |
678 | ||
679 | /** | |
680 | * Notify each Dispatcher of a Connection which may have lost | |
681 | * Messages. Call this function whenever you detect that a lossy Connection | |
682 | * has been disconnected. | |
683 | * | |
684 | * @param con Pointer to the broken Connection. | |
685 | */ | |
686 | void ms_deliver_handle_reset(Connection *con) { | |
687 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
688 | p != dispatchers.end(); | |
689 | ++p) { | |
690 | if ((*p)->ms_handle_reset(con)) | |
691 | return; | |
692 | } | |
693 | } | |
694 | /** | |
695 | * Notify each Dispatcher of a Connection which has been "forgotten" about | |
696 | * by the remote end, implying that messages have probably been lost. | |
697 | * Call this function whenever you detect a reset. | |
698 | * | |
699 | * @param con Pointer to the broken Connection. | |
700 | */ | |
701 | void ms_deliver_handle_remote_reset(Connection *con) { | |
702 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
703 | p != dispatchers.end(); | |
704 | ++p) | |
705 | (*p)->ms_handle_remote_reset(con); | |
706 | } | |
707 | ||
708 | /** | |
709 | * Notify each Dispatcher of a Connection for which reconnection | |
710 | * attempts are being refused. Call this function whenever you | |
711 | * detect that a lossy Connection has been disconnected and it's | |
712 | * impossible to reconnect. | |
713 | * | |
714 | * @param con Pointer to the broken Connection. | |
715 | */ | |
716 | void ms_deliver_handle_refused(Connection *con) { | |
717 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
718 | p != dispatchers.end(); | |
719 | ++p) { | |
720 | if ((*p)->ms_handle_refused(con)) | |
721 | return; | |
722 | } | |
723 | } | |
724 | ||
725 | /** | |
726 | * Get the AuthAuthorizer for a new outgoing Connection. | |
727 | * | |
728 | * @param peer_type The peer type for the new Connection | |
729 | * @param force_new True if we want to wait for new keys, false otherwise. | |
730 | * @return A pointer to the AuthAuthorizer, if we have one; NULL otherwise | |
731 | */ | |
732 | AuthAuthorizer *ms_deliver_get_authorizer(int peer_type, bool force_new) { | |
733 | AuthAuthorizer *a = 0; | |
734 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
735 | p != dispatchers.end(); | |
736 | ++p) { | |
737 | if ((*p)->ms_get_authorizer(peer_type, &a, force_new)) | |
738 | return a; | |
739 | } | |
740 | return NULL; | |
741 | } | |
742 | /** | |
743 | * Verify that the authorizer on a new incoming Connection is correct. | |
744 | * | |
745 | * @param con The new incoming Connection | |
746 | * @param peer_type The type of the endpoint on the new Connection | |
747 | * @param protocol The ID of the protocol in use (at time of writing, cephx or none) | |
748 | * @param authorizer The authorization string supplied by the remote | |
749 | * @param authorizer_reply Output param: The string we should send back to | |
750 | * the remote to authorize ourselves. Only filled in if isvalid | |
751 | * @param isvalid Output param: True if authorizer is valid, false otherwise | |
752 | * | |
753 | * @return True if we were able to prove or disprove correctness of | |
754 | * authorizer, false otherwise. | |
755 | */ | |
756 | bool ms_deliver_verify_authorizer(Connection *con, int peer_type, | |
757 | int protocol, bufferlist& authorizer, bufferlist& authorizer_reply, | |
758 | bool& isvalid, CryptoKey& session_key) { | |
759 | for (list<Dispatcher*>::iterator p = dispatchers.begin(); | |
760 | p != dispatchers.end(); | |
761 | ++p) { | |
762 | if ((*p)->ms_verify_authorizer(con, peer_type, protocol, authorizer, authorizer_reply, isvalid, session_key)) | |
763 | return true; | |
764 | } | |
765 | return false; | |
766 | } | |
767 | ||
768 | /** | |
769 | * @} // Dispatcher Interfacing | |
770 | */ | |
771 | }; | |
772 | ||
773 | ||
774 | ||
775 | #endif |