]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/SocketMessenger.cc
update download target update for octopus release
[ceph.git] / ceph / src / crimson / net / SocketMessenger.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2017 Red Hat, Inc
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "SocketMessenger.h"
16
17#include <tuple>
18#include <boost/functional/hash.hpp>
19
20#include "auth/Auth.h"
21#include "Errors.h"
22#include "Dispatcher.h"
23#include "Socket.h"
24
25using namespace ceph::net;
26
27namespace {
28 seastar::logger& logger() {
29 return ceph::get_logger(ceph_subsys_ms);
30 }
31}
32
33SocketMessenger::SocketMessenger(const entity_name_t& myname,
34 const std::string& logic_name,
35 uint32_t nonce,
36 int master_sid)
37 : Messenger{myname},
38 master_sid{master_sid},
39 sid{seastar::engine().cpu_id()},
40 logic_name{logic_name},
41 nonce{nonce}
42{}
43
44seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
45{
46 auto my_addrs = addrs;
47 for (auto& addr : my_addrs.v) {
48 addr.nonce = nonce;
49 }
50 return container().invoke_on_all([my_addrs](auto& msgr) {
51 return msgr.Messenger::set_myaddrs(my_addrs);
52 });
53}
54
55seastar::future<> SocketMessenger::bind(const entity_addrvec_t& addrs)
56{
57 ceph_assert(addrs.legacy_addr().get_family() == AF_INET);
58 auto my_addrs = addrs;
59 for (auto& addr : my_addrs.v) {
60 addr.nonce = nonce;
61 }
62 logger().info("listening on {}", my_addrs.legacy_addr().in4_addr());
63 return container().invoke_on_all([my_addrs](auto& msgr) {
64 msgr.do_bind(my_addrs);
65 });
66}
67
68seastar::future<>
69SocketMessenger::try_bind(const entity_addrvec_t& addrs,
70 uint32_t min_port, uint32_t max_port)
71{
72 auto addr = addrs.legacy_or_front_addr();
73 if (addr.get_port() != 0) {
74 return bind(addrs);
75 }
76 ceph_assert(min_port <= max_port);
77 return seastar::do_with(uint32_t(min_port),
78 [this, max_port, addr] (auto& port) {
79 return seastar::repeat([this, max_port, addr, &port] {
80 auto to_bind = addr;
81 to_bind.set_port(port);
82 return bind(entity_addrvec_t{to_bind})
83 .then([this] {
84 logger().info("{}: try_bind: done", *this);
85 return stop_t::yes;
86 }).handle_exception_type([this, max_port, &port] (const std::system_error& e) {
87 logger().debug("{}: try_bind: {} already used", *this, port);
88 if (port == max_port) {
89 throw e;
90 }
91 ++port;
92 return stop_t::no;
93 });
94 });
95 });
96}
97
98seastar::future<> SocketMessenger::start(Dispatcher *disp) {
99 return container().invoke_on_all([disp](auto& msgr) {
100 return msgr.do_start(disp->get_local_shard());
101 });
102}
103
104seastar::future<ceph::net::ConnectionXRef>
105SocketMessenger::connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
106{
107 auto shard = locate_shard(peer_addr);
108 return container().invoke_on(shard, [peer_addr, peer_type](auto& msgr) {
109 return msgr.do_connect(peer_addr, peer_type);
110 }).then([](seastar::foreign_ptr<ConnectionRef>&& conn) {
111 return seastar::make_lw_shared<seastar::foreign_ptr<ConnectionRef>>(std::move(conn));
112 });
113}
114
115seastar::future<> SocketMessenger::shutdown()
116{
117 return container().invoke_on_all([](auto& msgr) {
118 return msgr.do_shutdown();
119 }).finally([this] {
120 return container().invoke_on_all([](auto& msgr) {
121 msgr.shutdown_promise.set_value();
122 });
123 });
124}
125
126void SocketMessenger::do_bind(const entity_addrvec_t& addrs)
127{
128 Messenger::set_myaddrs(addrs);
129
130 // TODO: v2: listen on multiple addresses
131 seastar::socket_address address(addrs.legacy_addr().in4_addr());
132 seastar::listen_options lo;
133 lo.reuse_address = true;
134 listener = seastar::listen(address, lo);
135}
136
137seastar::future<> SocketMessenger::do_start(Dispatcher *disp)
138{
139 dispatcher = disp;
140
141 // start listening if bind() was called
142 if (listener) {
143 seastar::keep_doing([this] {
144 return listener->accept()
145 .then([this] (seastar::connected_socket socket,
146 seastar::socket_address paddr) {
147 // allocate the connection
148 entity_addr_t peer_addr;
149 peer_addr.set_sockaddr(&paddr.as_posix_sockaddr());
150 auto shard = locate_shard(peer_addr);
151#warning fixme
152 // we currently do dangerous i/o from a Connection core, different from the Socket core.
153 auto sock = seastar::make_foreign(std::make_unique<Socket>(std::move(socket)));
154 // don't wait before accepting another
155 container().invoke_on(shard, [sock = std::move(sock), peer_addr, this](auto& msgr) mutable {
156 SocketConnectionRef conn = seastar::make_shared<SocketConnection>(msgr, *msgr.dispatcher);
157 conn->start_accept(std::move(sock), peer_addr);
158 });
159 });
160 }).handle_exception_type([this] (const std::system_error& e) {
161 // stop gracefully on connection_aborted
162 if (e.code() != error::connection_aborted) {
163 logger().error("{} unexpected error during accept: {}", *this, e);
164 }
165 });
166 }
167
168 return seastar::now();
169}
170
171seastar::foreign_ptr<ceph::net::ConnectionRef>
172SocketMessenger::do_connect(const entity_addr_t& peer_addr, const entity_type_t& peer_type)
173{
174 if (auto found = lookup_conn(peer_addr); found) {
175 return seastar::make_foreign(found->shared_from_this());
176 }
177 SocketConnectionRef conn = seastar::make_shared<SocketConnection>(*this, *dispatcher);
178 conn->start_connect(peer_addr, peer_type);
179 return seastar::make_foreign(conn->shared_from_this());
180}
181
182seastar::future<> SocketMessenger::do_shutdown()
183{
184 if (listener) {
185 listener->abort_accept();
186 }
187 // close all connections
188 return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
189 return conn->close();
190 }).then([this] {
191 ceph_assert(accepting_conns.empty());
192 return seastar::parallel_for_each(connections, [] (auto conn) {
193 return conn.second->close();
194 });
195 }).finally([this] {
196 ceph_assert(connections.empty());
197 });
198}
199
200seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me)
201{
202 if (!get_myaddr().is_blank_ip()) {
203 // already learned or binded
204 return seastar::now();
205 }
206
207 // Only learn IP address if blank.
208 entity_addr_t addr = get_myaddr();
209 addr.u = peer_addr_for_me.u;
210 addr.set_type(peer_addr_for_me.get_type());
211 addr.set_port(get_myaddr().get_port());
212 return set_myaddrs(entity_addrvec_t{addr});
213}
214
215void SocketMessenger::set_default_policy(const SocketPolicy& p)
216{
217 policy_set.set_default(p);
218}
219
220void SocketMessenger::set_policy(entity_type_t peer_type,
221 const SocketPolicy& p)
222{
223 policy_set.set(peer_type, p);
224}
225
226void SocketMessenger::set_policy_throttler(entity_type_t peer_type,
227 Throttle* throttle)
228{
229 // only byte throttler is used in OSD
230 policy_set.set_throttlers(peer_type, throttle, nullptr);
231}
232
233seastar::shard_id SocketMessenger::locate_shard(const entity_addr_t& addr)
234{
235 ceph_assert(addr.get_family() == AF_INET);
236 if (master_sid >= 0) {
237 return master_sid;
238 }
239 std::size_t seed = 0;
240 boost::hash_combine(seed, addr.u.sin.sin_addr.s_addr);
241 //boost::hash_combine(seed, addr.u.sin.sin_port);
242 //boost::hash_combine(seed, addr.nonce);
243 return seed % seastar::smp::count;
244}
245
246ceph::net::SocketConnectionRef SocketMessenger::lookup_conn(const entity_addr_t& addr)
247{
248 if (auto found = connections.find(addr);
249 found != connections.end()) {
250 return found->second;
251 } else {
252 return nullptr;
253 }
254}
255
256void SocketMessenger::accept_conn(SocketConnectionRef conn)
257{
258 accepting_conns.insert(conn);
259}
260
261void SocketMessenger::unaccept_conn(SocketConnectionRef conn)
262{
263 accepting_conns.erase(conn);
264}
265
266void SocketMessenger::register_conn(SocketConnectionRef conn)
267{
268 if (master_sid >= 0) {
269 ceph_assert(static_cast<int>(sid) == master_sid);
270 }
271 auto [i, added] = connections.emplace(conn->get_peer_addr(), conn);
272 std::ignore = i;
273 ceph_assert(added);
274}
275
276void SocketMessenger::unregister_conn(SocketConnectionRef conn)
277{
278 ceph_assert(conn);
279 auto found = connections.find(conn->get_peer_addr());
280 ceph_assert(found != connections.end());
281 ceph_assert(found->second == conn);
282 connections.erase(found);
283}