1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2017 Red Hat, Inc
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
20 #include <seastar/core/gate.hh>
21 #include <seastar/core/reactor.hh>
22 #include <seastar/core/sharded.hh>
23 #include <seastar/core/shared_future.hh>
25 #include "crimson/net/chained_dispatchers.h"
26 #include "Messenger.h"
28 #include "SocketConnection.h"
30 namespace crimson::net
{
32 class FixedCPUServerSocket
;
34 class SocketMessenger final
: public Messenger
{
35 const seastar::shard_id master_sid
;
36 seastar::promise
<> shutdown_promise
;
38 FixedCPUServerSocket
* listener
= nullptr;
39 ChainedDispatchers dispatchers
;
40 std::map
<entity_addr_t
, SocketConnectionRef
> connections
;
41 std::set
<SocketConnectionRef
> accepting_conns
;
42 std::vector
<SocketConnectionRef
> closing_conns
;
43 ceph::net::PolicySet
<Throttle
> policy_set
;
44 // Distinguish messengers with meaningful names for debugging
45 const std::string logic_name
;
47 // specifying we haven't learned our addr; set false when we find it.
48 bool need_addr
= true;
49 uint32_t global_seq
= 0;
52 listen_ertr::future
<> do_listen(const entity_addrvec_t
& addr
);
53 /// try to bind to the first unused port of given address
54 bind_ertr::future
<> try_bind(const entity_addrvec_t
& addr
,
55 uint32_t min_port
, uint32_t max_port
);
59 SocketMessenger(const entity_name_t
& myname
,
60 const std::string
& logic_name
,
62 ~SocketMessenger() override
;
64 seastar::future
<> set_myaddrs(const entity_addrvec_t
& addr
) override
;
66 bool set_addr_unknowns(const entity_addrvec_t
&addr
) override
;
67 // Messenger interfaces are assumed to be called from its own shard, but its
68 // behavior should be symmetric when called from any shard.
69 bind_ertr::future
<> bind(const entity_addrvec_t
& addr
) override
;
71 seastar::future
<> start(const dispatchers_t
& dispatchers
) override
;
73 ConnectionRef
connect(const entity_addr_t
& peer_addr
,
74 const entity_name_t
& peer_name
) override
;
76 seastar::future
<> wait() override
{
77 assert(seastar::this_shard_id() == master_sid
);
78 return shutdown_promise
.get_future();
81 void stop() override
{
85 bool is_started() const override
{
86 return !dispatchers
.empty();
89 seastar::future
<> shutdown() override
;
91 void print(std::ostream
& out
) const override
{
94 << ") " << get_myaddr();
97 SocketPolicy
get_policy(entity_type_t peer_type
) const override
;
99 SocketPolicy
get_default_policy() const override
;
101 void set_default_policy(const SocketPolicy
& p
) override
;
103 void set_policy(entity_type_t peer_type
, const SocketPolicy
& p
) override
;
105 void set_policy_throttler(entity_type_t peer_type
, Throttle
* throttle
) override
;
108 seastar::future
<uint32_t> get_global_seq(uint32_t old
=0);
109 seastar::future
<> learned_addr(const entity_addr_t
&peer_addr_for_me
,
110 const SocketConnection
& conn
);
112 SocketConnectionRef
lookup_conn(const entity_addr_t
& addr
);
113 void accept_conn(SocketConnectionRef
);
114 void unaccept_conn(SocketConnectionRef
);
115 void register_conn(SocketConnectionRef
);
116 void unregister_conn(SocketConnectionRef
);
117 void closing_conn(SocketConnectionRef
);
118 void closed_conn(SocketConnectionRef
);
119 seastar::shard_id
shard_id() const {
120 assert(seastar::this_shard_id() == master_sid
);
125 } // namespace crimson::net