]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/direct_messenger/DirectMessenger.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / direct_messenger / DirectMessenger.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "DirectMessenger.h"
16#include "msg/DispatchStrategy.h"
17
18
19class DirectConnection : public Connection {
20 /// sent messages are dispatched here
21 DispatchStrategy *const dispatchers;
22
23 /// the connection that will be attached to outgoing messages, so that replies
24 /// can be dispatched back to the sender. the pointer is atomic for
25 /// thread-safety between mark_down() and send_message(). no reference is held
26 /// on this Connection to avoid cyclical refs. we don't need a reference
27 /// because its owning DirectMessenger will mark both connections down (and
28 /// clear this pointer) before dropping its own reference
29 std::atomic<Connection*> reply_connection{nullptr};
30
9f95a23c
TL
31 private:
32 FRIEND_MAKE_REF(DirectConnection);
7c673cae
FG
33 DirectConnection(CephContext *cct, DirectMessenger *m,
34 DispatchStrategy *dispatchers)
35 : Connection(cct, m),
36 dispatchers(dispatchers)
37 {}
38
9f95a23c 39 public:
7c673cae
FG
40 /// sets the Connection that will receive replies to outgoing messages
41 void set_direct_reply_connection(ConnectionRef conn);
42
43 /// return true if a peer connection exists
44 bool is_connected() override;
45
46 /// pass the given message directly to our dispatchers
47 int send_message(Message *m) override;
48
49 /// release our pointer to the peer connection. later calls to is_connected()
50 /// will return false, and send_message() will fail with -ENOTCONN
51 void mark_down() override;
52
53 /// noop - keepalive messages are not needed within a process
54 void send_keepalive() override {}
55
56 /// noop - reconnect/recovery semantics are not needed within a process
57 void mark_disposable() override {}
58};
59
60void DirectConnection::set_direct_reply_connection(ConnectionRef conn)
61{
62 reply_connection.store(conn.get());
63}
64
65bool DirectConnection::is_connected()
66{
67 // true between calls to set_direct_reply_connection() and mark_down()
68 return reply_connection.load() != nullptr;
69}
70
71int DirectConnection::send_message(Message *m)
72{
73 // read reply_connection atomically and take a reference
74 ConnectionRef conn = reply_connection.load();
75 if (!conn) {
76 m->put();
77 return -ENOTCONN;
78 }
79 // attach reply_connection to the Message, so that calls to
80 // m->get_connection()->send_message() can be dispatched back to the sender
81 m->set_connection(conn);
82
83 dispatchers->ds_dispatch(m);
84 return 0;
85}
86
87void DirectConnection::mark_down()
88{
89 Connection *conn = reply_connection.load();
90 if (!conn) {
91 return; // already marked down
92 }
93 if (!reply_connection.compare_exchange_weak(conn, nullptr)) {
94 return; // lost the race to mark down
95 }
96 // called only once to avoid loops
97 conn->mark_down();
98}
99
100
101static ConnectionRef create_loopback(DirectMessenger *m,
102 entity_name_t name,
103 DispatchStrategy *dispatchers)
104{
9f95a23c 105 auto loopback = ceph::make_ref<DirectConnection>(m->cct, m, dispatchers);
7c673cae
FG
106 // loopback replies go to itself
107 loopback->set_direct_reply_connection(loopback);
108 loopback->set_peer_type(name.type());
109 loopback->set_features(CEPH_FEATURES_ALL);
110 return loopback;
111}
112
113DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name,
114 string mname, uint64_t nonce,
115 DispatchStrategy *dispatchers)
116 : SimplePolicyMessenger(cct, name, mname, nonce),
117 dispatchers(dispatchers),
118 loopback_connection(create_loopback(this, name, dispatchers))
119{
120 dispatchers->set_messenger(this);
121}
122
123DirectMessenger::~DirectMessenger()
124{
125}
126
127int DirectMessenger::set_direct_peer(DirectMessenger *peer)
128{
129 if (get_myinst() == peer->get_myinst()) {
130 return -EADDRINUSE; // must have a different entity instance
131 }
132 peer_inst = peer->get_myinst();
133
134 // allocate a Connection that dispatches to the peer messenger
9f95a23c 135 auto direct_connection = ceph::make_ref<DirectConnection>(cct, peer, peer->dispatchers.get());
7c673cae
FG
136
137 direct_connection->set_peer_addr(peer_inst.addr);
138 direct_connection->set_peer_type(peer_inst.name.type());
139 direct_connection->set_features(CEPH_FEATURES_ALL);
140
141 // if set_direct_peer() was already called on the peer messenger, we can
142 // finish by attaching their connections. if not, the later call to
143 // peer->set_direct_peer() will attach their connection to ours
144 auto connection = peer->get_connection(get_myinst());
145 if (connection) {
146 auto p = static_cast<DirectConnection*>(connection.get());
147
148 p->set_direct_reply_connection(direct_connection);
149 direct_connection->set_direct_reply_connection(p);
150 }
151
152 peer_connection = std::move(direct_connection);
153 return 0;
154}
155
156int DirectMessenger::bind(const entity_addr_t &bind_addr)
157{
158 if (peer_connection) {
159 return -EINVAL; // can't change address after sharing it with the peer
160 }
161 set_myaddr(bind_addr);
162 loopback_connection->set_peer_addr(bind_addr);
163 return 0;
164}
165
166int DirectMessenger::client_bind(const entity_addr_t &bind_addr)
167{
168 // same as bind
169 return bind(bind_addr);
170}
171
172int DirectMessenger::start()
173{
174 if (!peer_connection) {
175 return -EINVAL; // did not connect to a peer
176 }
177 if (started) {
178 return -EINVAL; // already started
179 }
180
181 dispatchers->start();
182 return SimplePolicyMessenger::start();
183}
184
185int DirectMessenger::shutdown()
186{
187 if (!started) {
188 return -EINVAL; // not started
189 }
190
191 mark_down_all();
192 peer_connection.reset();
193 loopback_connection.reset();
194
195 dispatchers->shutdown();
196 SimplePolicyMessenger::shutdown();
197 sem.Put(); // signal wait()
198 return 0;
199}
200
201void DirectMessenger::wait()
202{
203 sem.Get(); // wait on signal from shutdown()
204 dispatchers->wait();
205}
206
207ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst)
208{
209 if (dst == peer_inst) {
210 return peer_connection;
211 }
212 if (dst == get_myinst()) {
213 return loopback_connection;
214 }
215 return nullptr;
216}
217
218ConnectionRef DirectMessenger::get_loopback_connection()
219{
220 return loopback_connection;
221}
222
223int DirectMessenger::send_message(Message *m, const entity_inst_t& dst)
224{
225 auto conn = get_connection(dst);
226 if (!conn) {
227 m->put();
228 return -ENOTCONN;
229 }
230 return conn->send_message(m);
231}
232
233void DirectMessenger::mark_down(const entity_addr_t& addr)
234{
235 ConnectionRef conn;
236 if (addr == peer_inst.addr) {
237 conn = peer_connection;
11fdf7f2 238 } else if (addr == get_myaddr_legacy()) {
7c673cae
FG
239 conn = loopback_connection;
240 }
241 if (conn) {
242 conn->mark_down();
243 }
244}
245
246void DirectMessenger::mark_down_all()
247{
248 if (peer_connection) {
249 peer_connection->mark_down();
250 }
251 loopback_connection->mark_down();
252}