1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
15 #ifndef CEPH_MSG_DIRECTMESSENGER_H
16 #define CEPH_MSG_DIRECTMESSENGER_H
18 #include "msg/SimplePolicyMessenger.h"
19 #include "common/Semaphore.h"
22 class DispatchStrategy
;
/**
 * DirectMessenger provides a direct path between two messengers
 * within a process. A pair of DirectMessengers share their
 * DispatchStrategy with each other, and calls to send_message()
 * forward the message directly to the other.
 *
 * This is for testing and i/o injection only, and cannot be used
 * for normal messengers with ms_type.
 */
33 class DirectMessenger
: public SimplePolicyMessenger
{
35 /// strategy for local dispatch
36 std::unique_ptr
<DispatchStrategy
> dispatchers
;
37 /// peer instance for comparison in get_connection()
38 entity_inst_t peer_inst
;
39 /// connection that sends to the peer's dispatchers
40 ConnectionRef peer_connection
;
41 /// connection that sends to my own dispatchers
42 ConnectionRef loopback_connection
;
43 /// semaphore for signalling wait() from shutdown()
47 DirectMessenger(CephContext
*cct
, entity_name_t name
,
48 string mname
, uint64_t nonce
,
49 DispatchStrategy
*dispatchers
);
52 /// attach to a peer messenger. must be called before start()
53 int set_direct_peer(DirectMessenger
*peer
);
56 // Messenger interface
58 /// sets the addr. must not be called after set_direct_peer() or start()
59 int bind(const entity_addr_t
& bind_addr
) override
;
61 /// sets the addr. must not be called after set_direct_peer() or start()
62 int client_bind(const entity_addr_t
& bind_addr
) override
;
64 /// starts dispatchers
67 /// breaks connections, stops dispatchers, and unblocks callers of wait()
68 int shutdown() override
;
70 /// blocks until shutdown() completes
73 /// returns a connection to the peer instance, a loopback connection to our
74 /// own instance, or null if not connected
75 ConnectionRef
get_connection(const entity_inst_t
& dst
) override
;
77 /// returns a loopback connection that dispatches to this messenger
78 ConnectionRef
get_loopback_connection() override
;
80 /// dispatches a message to the peer instance if connected
81 int send_message(Message
*m
, const entity_inst_t
& dst
) override
;
83 /// mark down the connection for the given address
84 void mark_down(const entity_addr_t
& a
) override
;
86 /// mark down all connections
87 void mark_down_all() override
;
90 // unimplemented Messenger interface
91 void set_addr_unknowns(const entity_addr_t
&addr
) override
{}
92 int get_dispatch_queue_len() override
{ return 0; }
93 double get_dispatch_queue_max_age(utime_t now
) override
{ return 0; }
94 void set_cluster_protocol(int p
) override
{}