]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #include "DirectMessenger.h" | |
16 | #include "msg/DispatchStrategy.h" | |
17 | ||
18 | ||
19 | class DirectConnection : public Connection { | |
20 | /// sent messages are dispatched here | |
21 | DispatchStrategy *const dispatchers; | |
22 | ||
23 | /// the connection that will be attached to outgoing messages, so that replies | |
24 | /// can be dispatched back to the sender. the pointer is atomic for | |
25 | /// thread-safety between mark_down() and send_message(). no reference is held | |
26 | /// on this Connection to avoid cyclical refs. we don't need a reference | |
27 | /// because its owning DirectMessenger will mark both connections down (and | |
28 | /// clear this pointer) before dropping its own reference | |
29 | std::atomic<Connection*> reply_connection{nullptr}; | |
30 | ||
9f95a23c TL |
31 | private: |
32 | FRIEND_MAKE_REF(DirectConnection); | |
7c673cae FG |
33 | DirectConnection(CephContext *cct, DirectMessenger *m, |
34 | DispatchStrategy *dispatchers) | |
35 | : Connection(cct, m), | |
36 | dispatchers(dispatchers) | |
37 | {} | |
38 | ||
9f95a23c | 39 | public: |
7c673cae FG |
40 | /// sets the Connection that will receive replies to outgoing messages |
41 | void set_direct_reply_connection(ConnectionRef conn); | |
42 | ||
43 | /// return true if a peer connection exists | |
44 | bool is_connected() override; | |
45 | ||
46 | /// pass the given message directly to our dispatchers | |
47 | int send_message(Message *m) override; | |
48 | ||
49 | /// release our pointer to the peer connection. later calls to is_connected() | |
50 | /// will return false, and send_message() will fail with -ENOTCONN | |
51 | void mark_down() override; | |
52 | ||
53 | /// noop - keepalive messages are not needed within a process | |
54 | void send_keepalive() override {} | |
55 | ||
56 | /// noop - reconnect/recovery semantics are not needed within a process | |
57 | void mark_disposable() override {} | |
58 | }; | |
59 | ||
60 | void DirectConnection::set_direct_reply_connection(ConnectionRef conn) | |
61 | { | |
62 | reply_connection.store(conn.get()); | |
63 | } | |
64 | ||
65 | bool DirectConnection::is_connected() | |
66 | { | |
67 | // true between calls to set_direct_reply_connection() and mark_down() | |
68 | return reply_connection.load() != nullptr; | |
69 | } | |
70 | ||
71 | int DirectConnection::send_message(Message *m) | |
72 | { | |
73 | // read reply_connection atomically and take a reference | |
74 | ConnectionRef conn = reply_connection.load(); | |
75 | if (!conn) { | |
76 | m->put(); | |
77 | return -ENOTCONN; | |
78 | } | |
79 | // attach reply_connection to the Message, so that calls to | |
80 | // m->get_connection()->send_message() can be dispatched back to the sender | |
81 | m->set_connection(conn); | |
82 | ||
83 | dispatchers->ds_dispatch(m); | |
84 | return 0; | |
85 | } | |
86 | ||
87 | void DirectConnection::mark_down() | |
88 | { | |
89 | Connection *conn = reply_connection.load(); | |
90 | if (!conn) { | |
91 | return; // already marked down | |
92 | } | |
93 | if (!reply_connection.compare_exchange_weak(conn, nullptr)) { | |
94 | return; // lost the race to mark down | |
95 | } | |
96 | // called only once to avoid loops | |
97 | conn->mark_down(); | |
98 | } | |
99 | ||
100 | ||
101 | static ConnectionRef create_loopback(DirectMessenger *m, | |
102 | entity_name_t name, | |
103 | DispatchStrategy *dispatchers) | |
104 | { | |
9f95a23c | 105 | auto loopback = ceph::make_ref<DirectConnection>(m->cct, m, dispatchers); |
7c673cae FG |
106 | // loopback replies go to itself |
107 | loopback->set_direct_reply_connection(loopback); | |
108 | loopback->set_peer_type(name.type()); | |
109 | loopback->set_features(CEPH_FEATURES_ALL); | |
110 | return loopback; | |
111 | } | |
112 | ||
113 | DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name, | |
114 | string mname, uint64_t nonce, | |
115 | DispatchStrategy *dispatchers) | |
116 | : SimplePolicyMessenger(cct, name, mname, nonce), | |
117 | dispatchers(dispatchers), | |
118 | loopback_connection(create_loopback(this, name, dispatchers)) | |
119 | { | |
120 | dispatchers->set_messenger(this); | |
121 | } | |
122 | ||
123 | DirectMessenger::~DirectMessenger() | |
124 | { | |
125 | } | |
126 | ||
127 | int DirectMessenger::set_direct_peer(DirectMessenger *peer) | |
128 | { | |
129 | if (get_myinst() == peer->get_myinst()) { | |
130 | return -EADDRINUSE; // must have a different entity instance | |
131 | } | |
132 | peer_inst = peer->get_myinst(); | |
133 | ||
134 | // allocate a Connection that dispatches to the peer messenger | |
9f95a23c | 135 | auto direct_connection = ceph::make_ref<DirectConnection>(cct, peer, peer->dispatchers.get()); |
7c673cae FG |
136 | |
137 | direct_connection->set_peer_addr(peer_inst.addr); | |
138 | direct_connection->set_peer_type(peer_inst.name.type()); | |
139 | direct_connection->set_features(CEPH_FEATURES_ALL); | |
140 | ||
141 | // if set_direct_peer() was already called on the peer messenger, we can | |
142 | // finish by attaching their connections. if not, the later call to | |
143 | // peer->set_direct_peer() will attach their connection to ours | |
144 | auto connection = peer->get_connection(get_myinst()); | |
145 | if (connection) { | |
146 | auto p = static_cast<DirectConnection*>(connection.get()); | |
147 | ||
148 | p->set_direct_reply_connection(direct_connection); | |
149 | direct_connection->set_direct_reply_connection(p); | |
150 | } | |
151 | ||
152 | peer_connection = std::move(direct_connection); | |
153 | return 0; | |
154 | } | |
155 | ||
156 | int DirectMessenger::bind(const entity_addr_t &bind_addr) | |
157 | { | |
158 | if (peer_connection) { | |
159 | return -EINVAL; // can't change address after sharing it with the peer | |
160 | } | |
161 | set_myaddr(bind_addr); | |
162 | loopback_connection->set_peer_addr(bind_addr); | |
163 | return 0; | |
164 | } | |
165 | ||
166 | int DirectMessenger::client_bind(const entity_addr_t &bind_addr) | |
167 | { | |
168 | // same as bind | |
169 | return bind(bind_addr); | |
170 | } | |
171 | ||
172 | int DirectMessenger::start() | |
173 | { | |
174 | if (!peer_connection) { | |
175 | return -EINVAL; // did not connect to a peer | |
176 | } | |
177 | if (started) { | |
178 | return -EINVAL; // already started | |
179 | } | |
180 | ||
181 | dispatchers->start(); | |
182 | return SimplePolicyMessenger::start(); | |
183 | } | |
184 | ||
185 | int DirectMessenger::shutdown() | |
186 | { | |
187 | if (!started) { | |
188 | return -EINVAL; // not started | |
189 | } | |
190 | ||
191 | mark_down_all(); | |
192 | peer_connection.reset(); | |
193 | loopback_connection.reset(); | |
194 | ||
195 | dispatchers->shutdown(); | |
196 | SimplePolicyMessenger::shutdown(); | |
197 | sem.Put(); // signal wait() | |
198 | return 0; | |
199 | } | |
200 | ||
201 | void DirectMessenger::wait() | |
202 | { | |
203 | sem.Get(); // wait on signal from shutdown() | |
204 | dispatchers->wait(); | |
205 | } | |
206 | ||
207 | ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst) | |
208 | { | |
209 | if (dst == peer_inst) { | |
210 | return peer_connection; | |
211 | } | |
212 | if (dst == get_myinst()) { | |
213 | return loopback_connection; | |
214 | } | |
215 | return nullptr; | |
216 | } | |
217 | ||
218 | ConnectionRef DirectMessenger::get_loopback_connection() | |
219 | { | |
220 | return loopback_connection; | |
221 | } | |
222 | ||
223 | int DirectMessenger::send_message(Message *m, const entity_inst_t& dst) | |
224 | { | |
225 | auto conn = get_connection(dst); | |
226 | if (!conn) { | |
227 | m->put(); | |
228 | return -ENOTCONN; | |
229 | } | |
230 | return conn->send_message(m); | |
231 | } | |
232 | ||
233 | void DirectMessenger::mark_down(const entity_addr_t& addr) | |
234 | { | |
235 | ConnectionRef conn; | |
236 | if (addr == peer_inst.addr) { | |
237 | conn = peer_connection; | |
11fdf7f2 | 238 | } else if (addr == get_myaddr_legacy()) { |
7c673cae FG |
239 | conn = loopback_connection; |
240 | } | |
241 | if (conn) { | |
242 | conn->mark_down(); | |
243 | } | |
244 | } | |
245 | ||
246 | void DirectMessenger::mark_down_all() | |
247 | { | |
248 | if (peer_connection) { | |
249 | peer_connection->mark_down(); | |
250 | } | |
251 | loopback_connection->mark_down(); | |
252 | } |