]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/net/Protocol.h
0343f026025aa9a7fcb56180e651f1f8513abf59
[ceph.git] / ceph / src / crimson / net / Protocol.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/shared_future.hh>
8
9 #include "crimson/common/gated.h"
10 #include "crimson/common/log.h"
11 #include "Fwd.h"
12 #include "SocketConnection.h"
13
14 namespace crimson::net {
15
16 class Protocol {
17 public:
18 enum class proto_t {
19 none,
20 v1,
21 v2
22 };
23
24 Protocol(Protocol&&) = delete;
25 virtual ~Protocol();
26
27 virtual bool is_connected() const = 0;
28
29 #ifdef UNIT_TESTS_BUILT
30 bool is_closed_clean = false;
31 bool is_closed() const { return closed; }
32 #endif
33
34 // Reentrant closing
35 void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt);
36 seastar::future<> close_clean(bool dispatch_reset) {
37 close(dispatch_reset);
38 // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
39 // which will otherwise result in deadlock
40 assert(close_ready.valid());
41 return close_ready.get_future();
42 }
43
44 virtual void start_connect(const entity_addr_t& peer_addr,
45 const entity_name_t& peer_name) = 0;
46
47 virtual void start_accept(SocketRef&& socket,
48 const entity_addr_t& peer_addr) = 0;
49
50 virtual void print(std::ostream&) const = 0;
51 protected:
52 Protocol(proto_t type,
53 ChainedDispatchers& dispatchers,
54 SocketConnection& conn);
55
56 virtual void trigger_close() = 0;
57
58 virtual ceph::bufferlist do_sweep_messages(
59 const std::deque<MessageURef>& msgs,
60 size_t num_msgs,
61 bool require_keepalive,
62 std::optional<utime_t> keepalive_ack,
63 bool require_ack) = 0;
64
65 virtual void notify_write() {};
66
67 virtual void on_closed() {}
68
69 private:
70 ceph::bufferlist sweep_messages_and_move_to_sent(
71 size_t num_msgs,
72 bool require_keepalive,
73 std::optional<utime_t> keepalive_ack,
74 bool require_ack);
75
76 public:
77 const proto_t proto_type;
78 SocketRef socket;
79
80 protected:
81 ChainedDispatchers& dispatchers;
82 SocketConnection &conn;
83
84 AuthConnectionMetaRef auth_meta;
85
86 private:
87 bool closed = false;
88 // become valid only after closed == true
89 seastar::shared_future<> close_ready;
90
91 // the write state-machine
92 public:
93 seastar::future<> send(MessageURef msg);
94 seastar::future<> keepalive();
95
96 // TODO: encapsulate a SessionedSender class
97 protected:
98 // write_state is changed with state atomically, indicating the write
99 // behavior of the according state.
100 enum class write_state_t : uint8_t {
101 none,
102 delay,
103 open,
104 drop
105 };
106
107 static const char* get_state_name(write_state_t state) {
108 uint8_t index = static_cast<uint8_t>(state);
109 static const char *const state_names[] = {"none",
110 "delay",
111 "open",
112 "drop"};
113 assert(index < std::size(state_names));
114 return state_names[index];
115 }
116
117 void set_write_state(const write_state_t& state) {
118 if (write_state == write_state_t::open &&
119 state != write_state_t::open &&
120 write_dispatching) {
121 exit_open = seastar::shared_promise<>();
122 }
123 write_state = state;
124 state_changed.set_value();
125 state_changed = seastar::shared_promise<>();
126 }
127
128 seastar::future<> wait_write_exit() {
129 if (exit_open) {
130 return exit_open->get_shared_future();
131 }
132 return seastar::now();
133 }
134
135 void notify_keepalive_ack(utime_t keepalive_ack);
136
137 void notify_ack();
138
139 void requeue_up_to(seq_num_t seq);
140
141 void requeue_sent();
142
143 void reset_write();
144
145 bool is_queued() const {
146 return (!conn.out_q.empty() ||
147 ack_left > 0 ||
148 need_keepalive ||
149 keepalive_ack.has_value());
150 }
151
152 void ack_writes(seq_num_t seq);
153 crimson::common::Gated gate;
154
155 private:
156 write_state_t write_state = write_state_t::none;
157 // wait until current state changed
158 seastar::shared_promise<> state_changed;
159
160 bool need_keepalive = false;
161 std::optional<utime_t> keepalive_ack = std::nullopt;
162 uint64_t ack_left = 0;
163 bool write_dispatching = false;
164 // If another continuation is trying to close or replace socket when
165 // write_dispatching is true and write_state is open,
166 // it needs to wait for exit_open until writing is stopped or failed.
167 std::optional<seastar::shared_promise<>> exit_open;
168
169 seastar::future<stop_t> try_exit_sweep();
170 seastar::future<> do_write_dispatch_sweep();
171 void write_event();
172 };
173
174 inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) {
175 proto.print(out);
176 return out;
177 }
178
179
180 } // namespace crimson::net