#include <tuple>
#include <boost/functional/hash.hpp>
+#include <fmt/os.h>
#include "auth/Auth.h"
#include "Errors.h"
SocketMessenger::SocketMessenger(const entity_name_t& myname,
const std::string& logic_name,
uint32_t nonce)
- : Messenger{myname},
- master_sid{seastar::this_shard_id()},
+ : master_sid{seastar::this_shard_id()},
logic_name{logic_name},
- nonce{nonce}
+ nonce{nonce},
+ my_name{myname}
{}
SocketMessenger::~SocketMessenger()
{
+ logger().debug("~SocketMessenger: {}", logic_name);
ceph_assert(!listener);
}
return ret;
}
-seastar::future<> SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
+void SocketMessenger::set_myaddrs(const entity_addrvec_t& addrs)
{
assert(seastar::this_shard_id() == master_sid);
- auto my_addrs = addrs;
+ my_addrs = addrs;
for (auto& addr : my_addrs.v) {
addr.nonce = nonce;
}
- return Messenger::set_myaddrs(my_addrs);
}
crimson::net::listen_ertr::future<>
{
assert(seastar::this_shard_id() == master_sid);
ceph_assert(addrs.front().get_family() == AF_INET);
- return set_myaddrs(addrs).then([this] {
+ set_myaddrs(addrs);
+ return seastar::futurize_invoke([this] {
if (!listener) {
return FixedCPUServerSocket::create().then([this] (auto _listener) {
listener = _listener;
if (auto found = lookup_conn(peer_addr); found) {
logger().debug("{} connect to existing", *found);
- return found->shared_from_this();
+ return found->get_local_shared_foreign_from_this();
}
SocketConnectionRef conn =
seastar::make_shared<SocketConnection>(*this, dispatchers);
conn->start_connect(peer_addr, peer_name);
- return conn->shared_from_this();
+ return conn->get_local_shared_foreign_from_this();
}
seastar::future<> SocketMessenger::shutdown()
// close all connections
}).then([this] {
return seastar::parallel_for_each(accepting_conns, [] (auto conn) {
- return conn->close_clean(false);
+ return conn->close_clean_yielded();
});
}).then([this] {
ceph_assert(accepting_conns.empty());
return seastar::parallel_for_each(connections, [] (auto conn) {
- return conn.second->close_clean(false);
+ return conn.second->close_clean_yielded();
});
}).then([this] {
return seastar::parallel_for_each(closing_conns, [] (auto conn) {
- return conn->close_clean(false);
+ return conn->close_clean_yielded();
});
}).then([this] {
ceph_assert(connections.empty());
}
}
-seastar::future<> SocketMessenger::learned_addr(const entity_addr_t &peer_addr_for_me, const SocketConnection& conn)
+void SocketMessenger::learned_addr(
+ const entity_addr_t &peer_addr_for_me,
+ const SocketConnection& conn)
{
assert(seastar::this_shard_id() == master_sid);
if (!need_addr) {
throw std::system_error(
make_error_code(crimson::net::error::bad_peer_address));
}
- return seastar::now();
+ return;
}
if (get_myaddr().get_type() == entity_addr_t::TYPE_NONE) {
addr.set_type(entity_addr_t::TYPE_ANY);
addr.set_port(0);
need_addr = false;
- return set_myaddrs(entity_addrvec_t{addr}
- ).then([this, &conn] {
- logger().info("{} learned myaddr={} (unbound)", conn, get_myaddr());
- });
+ set_myaddrs(entity_addrvec_t{addr});
+ logger().info("{} learned myaddr={} (unbound)", conn, get_myaddr());
} else {
// Already bound
if (!get_myaddr().is_any() &&
addr.set_type(get_myaddr().get_type());
addr.set_port(get_myaddr().get_port());
need_addr = false;
- return set_myaddrs(entity_addrvec_t{addr}
- ).then([this, &conn] {
- logger().info("{} learned myaddr={} (blank IP)", conn, get_myaddr());
- });
+ set_myaddrs(entity_addrvec_t{addr});
+ logger().info("{} learned myaddr={} (blank IP)", conn, get_myaddr());
} else if (!get_myaddr().is_same_host(peer_addr_for_me)) {
logger().warn("{} peer_addr_for_me {} IP doesn't match myaddr {}",
conn, peer_addr_for_me, get_myaddr());
make_error_code(crimson::net::error::bad_peer_address));
} else {
need_addr = false;
- return seastar::now();
}
}
}
}
}
-seastar::future<uint32_t>
-SocketMessenger::get_global_seq(uint32_t old)
+uint32_t SocketMessenger::get_global_seq(uint32_t old)
{
if (old > global_seq) {
global_seq = old;
}
- return seastar::make_ready_future<uint32_t>(++global_seq);
+ return ++global_seq;
}
} // namespace crimson::net