]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/SocketMessenger.cc
bump version to 15.2.11-pve1
[ceph.git] / ceph / src / crimson / net / SocketMessenger.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2017 Red Hat, Inc
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "SocketMessenger.h"
16
17#include <tuple>
18#include <boost/functional/hash.hpp>
19
20#include "auth/Auth.h"
21#include "Errors.h"
22#include "Dispatcher.h"
23#include "Socket.h"
24
11fdf7f2
TL
25namespace {
26 seastar::logger& logger() {
9f95a23c 27 return crimson::get_logger(ceph_subsys_ms);
11fdf7f2
TL
28 }
29}
30
9f95a23c
TL
31namespace crimson::net {
32
11fdf7f2
TL
33SocketMessenger::SocketMessenger(const entity_name_t& myname,
34 const std::string& logic_name,
9f95a23c 35 uint32_t nonce)
11fdf7f2 36 : Messenger{myname},
9f95a23c 37 master_sid{seastar::engine().cpu_id()},
11fdf7f2
TL
38 logic_name{logic_name},
39 nonce{nonce}
40{}
41
42seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
43{
9f95a23c 44 assert(seastar::engine().cpu_id() == master_sid);
11fdf7f2
TL
45 auto my_addrs = addrs;
46 for (auto& addr : my_addrs.v) {
47 addr.nonce = nonce;
48 }
9f95a23c
TL
49 return Messenger::set_myaddrs(my_addrs);
50}
51
52seastar::future<> SocketMessenger::do_bind(const entity_addrvec_t& addrs)
53{
54 assert(seastar::engine().cpu_id() == master_sid);
55 ceph_assert(addrs.front().get_family() == AF_INET);
56 return set_myaddrs(addrs).then([this] {
57 if (!listener) {
58 return FixedCPUServerSocket::create().then([this] (auto _listener) {
59 listener = _listener;
60 });
61 } else {
62 return seastar::now();
63 }
64 }).then([this] {
65 auto listen_addr = get_myaddr();
66 logger().debug("{} do_bind: try listen {}...", *this, listen_addr.in4_addr());
67 if (!listener) {
68 logger().warn("{} do_bind: listener doesn't exist", *this);
69 return seastar::now();
70 }
71 return listener->listen(listen_addr);
72 });
11fdf7f2
TL
73}
74
75seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
76{
9f95a23c
TL
77 return do_bind(addrs).then([this] {
78 logger().info("{} bind: done", *this);
79 });
11fdf7f2
TL
80}
81
82seastar::future<>
83SocketMessenger::try_bind(const entity_addrvec_t& addrs,
84 uint32_t min_port, uint32_t max_port)
85{
9f95a23c 86 auto addr = addrs.front();
11fdf7f2 87 if (addr.get_port() != 0) {
9f95a23c
TL
88 return do_bind(addrs).then([this] {
89 logger().info("{} try_bind: done", *this);
90 });
11fdf7f2
TL
91 }
92 ceph_assert(min_port <= max_port);
93 return seastar::do_with(uint32_t(min_port),
9f95a23c
TL
94 [this, max_port, addr] (auto& port) {
95 return seastar::repeat([this, max_port, addr, &port] {
96 auto to_bind = addr;
97 to_bind.set_port(port);
98 return do_bind(entity_addrvec_t{to_bind}).then([this] {
99 logger().info("{} try_bind: done", *this);
100 return stop_t::yes;
101 }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
102 assert(e.code() == std::errc::address_in_use);
103 logger().trace("{} try_bind: {} already used", *this, port);
104 if (port == max_port) {
105 throw;
106 }
107 ++port;
108 return stop_t::no;
109 });
11fdf7f2 110 });
9f95a23c 111 });
11fdf7f2
TL
112}
113
114seastar::future<> SocketMessenger::start(Dispatcher *disp) {
9f95a23c 115 assert(seastar::engine().cpu_id() == master_sid);
11fdf7f2 116
11fdf7f2 117 dispatcher = disp;
11fdf7f2 118 if (listener) {
9f95a23c
TL
119 // make sure we have already bound to a valid address
120 ceph_assert(get_myaddr().is_legacy() || get_myaddr().is_msgr2());
121 ceph_assert(get_myaddr().get_port() > 0);
11fdf7f2 122
9f95a23c
TL
123 return listener->accept([this] (SocketRef socket, entity_addr_t peer_addr) {
124 assert(seastar::engine().cpu_id() == master_sid);
125 SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
126 *this, *dispatcher, get_myaddr().is_msgr2());
127 conn->start_accept(std::move(socket), peer_addr);
128 return seastar::now();
129 });
130 }
11fdf7f2
TL
131 return seastar::now();
132}
133
9f95a23c
TL
134crimson::net::ConnectionRef
135SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
11fdf7f2 136{
9f95a23c
TL
137 assert(seastar::engine().cpu_id() == master_sid);
138
139 // make sure we connect to a valid peer_addr
140 ceph_assert(peer_addr.is_legacy() || peer_addr.is_msgr2());
141 ceph_assert(peer_addr.get_port() > 0);
142
11fdf7f2 143 if (auto found = lookup_conn(peer_addr); found) {
9f95a23c 144 return found->shared_from_this();
11fdf7f2 145 }
9f95a23c
TL
146 SocketConnectionRef conn = seastar::make_shared<SocketConnection>(
147 *this, *dispatcher, peer_addr.is_msgr2());
11fdf7f2 148 conn->start_connect(peer_addr, peer_type);
9f95a23c 149 return conn->shared_from_this();
11fdf7f2
TL
150}
151
9f95a23c 152seastar::future<> SocketMessenger::shutdown()
11fdf7f2 153{
9f95a23c
TL
154 assert(seastar::engine().cpu_id() == master_sid);
155 return seastar::futurize_apply([this] {
156 if (listener) {
157 auto d_listener = listener;
158 listener = nullptr;
159 return d_listener->destroy();
160 } else {
161 return seastar::now();
162 }
11fdf7f2 163 // close all connections
9f95a23c
TL
164 }).then([this] {
165 return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
11fdf7f2 166 return conn->close();
11fdf7f2 167 });
9f95a23c
TL
168 }).then([this] {
169 ceph_assert(accepting_conns.empty());
170 return seastar::parallel_for_each(connections, [] (auto conn) {
171 return conn.second->close();
172 });
173 }).then([this] {
174 ceph_assert(connections.empty());
175 shutdown_promise.set_value();
176 });
11fdf7f2
TL
177}
178
9f95a23c 179seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
11fdf7f2 180{
9f95a23c
TL
181 assert(seastar::engine().cpu_id() == master_sid);
182 if (!need_addr) {
183 if ((!get_myaddr().is_any() &&
184 get_myaddr().get_type() != peer_addr_for_me.get_type()) ||
185 get_myaddr().get_family() != peer_addr_for_me.get_family() ||
186 !get_myaddr().is_same_host(peer_addr_for_me)) {
187 logger().warn("{} peer_addr_for_me {} type/family/IP doesn't match myaddr {}",
188 conn, peer_addr_for_me, get_myaddr());
189 throw std::system_error(
190 make_error_code(crimson::net::error::bad_peer_address));
191 }
11fdf7f2
TL
192 return seastar::now();
193 }
9f95a23c
TL
194 need_addr = false;
195
196 if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
197 // Not bound
198 entity_addr_t addr = peer_addr_for_me;
199 addr.set_type(entity_addr_t::TYPE_ANY);
200 addr.set_port(0);
201 return set_myaddrs(entity_addrvec_t{addr}
202 ).then([this, &conn, peer_addr_for_me] {
203 logger().info("{} learned myaddr={} (unbound) from {}",
204 conn, get_myaddr(), peer_addr_for_me);
205 });
206 } else {
207 // Already bound
208 if (!get_myaddr().is_any() &&
209 get_myaddr().get_type() != peer_addr_for_me.get_type()) {
210 logger().warn("{} peer_addr_for_me {} type doesn't match myaddr {}",
211 conn, peer_addr_for_me, get_myaddr());
212 throw std::system_error(
213 make_error_code(crimson::net::error::bad_peer_address));
214 }
215 if (get_myaddr().get_family() != peer_addr_for_me.get_family()) {
216 logger().warn("{} peer_addr_for_me {} family doesn't match myaddr {}",
217 conn, peer_addr_for_me, get_myaddr());
218 throw std::system_error(
219 make_error_code(crimson::net::error::bad_peer_address));
220 }
221 if (get_myaddr().is_blank_ip()) {
222 entity_addr_t addr = peer_addr_for_me;
223 addr.set_type(get_myaddr().get_type());
224 addr.set_port(get_myaddr().get_port());
225 return set_myaddrs(entity_addrvec_t{addr}
226 ).then([this, &conn, peer_addr_for_me] {
227 logger().info("{} learned myaddr={} (blank IP) from {}",
228 conn, get_myaddr(), peer_addr_for_me);
229 });
230 } else if (!get_myaddr().is_same_host(peer_addr_for_me)) {
231 logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
232 conn, peer_addr_for_me, get_myaddr());
233 throw std::system_error(
234 make_error_code(crimson::net::error::bad_peer_address));
235 } else {
236 return seastar::now();
237 }
238 }
239}
240
241SocketPolicy SocketMessenger::get_policy(entity_type_t peer_type) const
242{
243 return policy_set.get(peer_type);
244}
11fdf7f2 245
9f95a23c
TL
246SocketPolicy SocketMessenger::get_default_policy() const
247{
248 return policy_set.get_default();
11fdf7f2
TL
249}
250
251void SocketMessenger::set_default_policy(const SocketPolicy& p)
252{
253 policy_set.set_default(p);
254}
255
256void SocketMessenger::set_policy(entity_type_t peer_type,
257 const SocketPolicy& p)
258{
259 policy_set.set(peer_type, p);
260}
261
262void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
263 Throttle* throttle)
264{
265 // only byte throttler is used in OSD
266 policy_set.set_throttlers(peer_type, throttle, nullptr);
267}
268
9f95a23c 269crimson::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
11fdf7f2
TL
270{
271 if (auto found = connections.find(addr);
272 found != connections.end()) {
273 return found->second;
274 } else {
275 return nullptr;
276 }
277}
278
279void SocketMessenger::accept_conn(SocketConnectionRef conn)
280{
281 accepting_conns.insert(conn);
282}
283
284void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
285{
286 accepting_conns.erase(conn);
287}
288
289void SocketMessenger::register_conn(SocketConnectionRef conn)
290{
11fdf7f2
TL
291 auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
292 std::ignore = i;
293 ceph_assert(added);
294}
295
296void SocketMessenger::unregister_conn(SocketConnectionRef conn)
297{
298 ceph_assert(conn);
299 auto found = connections.find(conn->get_peer_addr());
300 ceph_assert(found != connections.end());
301 ceph_assert(found->second == conn);
302 connections.erase(found);
303}
9f95a23c
TL
304
305seastar::future<uint32_t>
306SocketMessenger::get_global_seq(uint32_t old)
307{
308 if (old > global_seq) {
309 global_seq = old;
310 }
311 return seastar::make_ready_future<uint32_t>(++global_seq);
312}
313
314} // namespace crimson::net