]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #pragma once | |
5 | ||
6 | #include <seastar/core/gate.hh> | |
7 | #include <seastar/core/shared_future.hh> | |
8 | ||
f67539c2 TL |
9 | #include "crimson/common/gated.h" |
10 | #include "crimson/common/log.h" | |
9f95a23c TL |
11 | #include "Fwd.h" |
12 | #include "SocketConnection.h" | |
13 | ||
14 | namespace crimson::net { | |
15 | ||
16 | class Protocol { | |
17 | public: | |
18 | enum class proto_t { | |
19 | none, | |
20 | v1, | |
21 | v2 | |
22 | }; | |
23 | ||
24 | Protocol(Protocol&&) = delete; | |
25 | virtual ~Protocol(); | |
26 | ||
f67539c2 | 27 | virtual bool is_connected() const = 0; |
9f95a23c | 28 | |
f67539c2 TL |
29 | #ifdef UNIT_TESTS_BUILT |
30 | bool is_closed_clean = false; | |
9f95a23c | 31 | bool is_closed() const { return closed; } |
f67539c2 | 32 | #endif |
9f95a23c TL |
33 | |
34 | // Reentrant closing | |
f67539c2 TL |
35 | void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt); |
36 | seastar::future<> close_clean(bool dispatch_reset) { | |
37 | close(dispatch_reset); | |
38 | // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset() | |
39 | // which will otherwise result in deadlock | |
40 | assert(close_ready.valid()); | |
41 | return close_ready.get_future(); | |
42 | } | |
9f95a23c TL |
43 | |
44 | virtual void start_connect(const entity_addr_t& peer_addr, | |
f67539c2 | 45 | const entity_name_t& peer_name) = 0; |
9f95a23c TL |
46 | |
47 | virtual void start_accept(SocketRef&& socket, | |
48 | const entity_addr_t& peer_addr) = 0; | |
49 | ||
f67539c2 | 50 | virtual void print(std::ostream&) const = 0; |
9f95a23c TL |
51 | protected: |
52 | Protocol(proto_t type, | |
f67539c2 | 53 | ChainedDispatchers& dispatchers, |
9f95a23c TL |
54 | SocketConnection& conn); |
55 | ||
56 | virtual void trigger_close() = 0; | |
57 | ||
58 | virtual ceph::bufferlist do_sweep_messages( | |
20effc67 | 59 | const std::deque<MessageURef>& msgs, |
9f95a23c TL |
60 | size_t num_msgs, |
61 | bool require_keepalive, | |
62 | std::optional<utime_t> keepalive_ack, | |
63 | bool require_ack) = 0; | |
64 | ||
65 | virtual void notify_write() {}; | |
66 | ||
f67539c2 | 67 | virtual void on_closed() {} |
20effc67 TL |
68 | |
69 | private: | |
70 | ceph::bufferlist sweep_messages_and_move_to_sent( | |
71 | size_t num_msgs, | |
72 | bool require_keepalive, | |
73 | std::optional<utime_t> keepalive_ack, | |
74 | bool require_ack); | |
f67539c2 | 75 | |
9f95a23c TL |
76 | public: |
77 | const proto_t proto_type; | |
f67539c2 | 78 | SocketRef socket; |
9f95a23c TL |
79 | |
80 | protected: | |
f67539c2 | 81 | ChainedDispatchers& dispatchers; |
9f95a23c TL |
82 | SocketConnection &conn; |
83 | ||
9f95a23c TL |
84 | AuthConnectionMetaRef auth_meta; |
85 | ||
86 | private: | |
87 | bool closed = false; | |
88 | // become valid only after closed == true | |
89 | seastar::shared_future<> close_ready; | |
90 | ||
91 | // the write state-machine | |
92 | public: | |
20effc67 | 93 | seastar::future<> send(MessageURef msg); |
9f95a23c TL |
94 | seastar::future<> keepalive(); |
95 | ||
96 | // TODO: encapsulate a SessionedSender class | |
97 | protected: | |
98 | // write_state is changed with state atomically, indicating the write | |
99 | // behavior of the according state. | |
100 | enum class write_state_t : uint8_t { | |
101 | none, | |
102 | delay, | |
103 | open, | |
104 | drop | |
105 | }; | |
106 | ||
107 | static const char* get_state_name(write_state_t state) { | |
108 | uint8_t index = static_cast<uint8_t>(state); | |
109 | static const char *const state_names[] = {"none", | |
110 | "delay", | |
111 | "open", | |
112 | "drop"}; | |
113 | assert(index < std::size(state_names)); | |
114 | return state_names[index]; | |
115 | } | |
116 | ||
117 | void set_write_state(const write_state_t& state) { | |
118 | if (write_state == write_state_t::open && | |
119 | state != write_state_t::open && | |
120 | write_dispatching) { | |
121 | exit_open = seastar::shared_promise<>(); | |
122 | } | |
123 | write_state = state; | |
124 | state_changed.set_value(); | |
125 | state_changed = seastar::shared_promise<>(); | |
126 | } | |
127 | ||
128 | seastar::future<> wait_write_exit() { | |
129 | if (exit_open) { | |
130 | return exit_open->get_shared_future(); | |
131 | } | |
132 | return seastar::now(); | |
133 | } | |
134 | ||
135 | void notify_keepalive_ack(utime_t keepalive_ack); | |
136 | ||
137 | void notify_ack(); | |
138 | ||
139 | void requeue_up_to(seq_num_t seq); | |
140 | ||
141 | void requeue_sent(); | |
142 | ||
143 | void reset_write(); | |
144 | ||
145 | bool is_queued() const { | |
146 | return (!conn.out_q.empty() || | |
147 | ack_left > 0 || | |
148 | need_keepalive || | |
149 | keepalive_ack.has_value()); | |
150 | } | |
151 | ||
152 | void ack_writes(seq_num_t seq); | |
f67539c2 | 153 | crimson::common::Gated gate; |
9f95a23c TL |
154 | |
155 | private: | |
156 | write_state_t write_state = write_state_t::none; | |
157 | // wait until current state changed | |
158 | seastar::shared_promise<> state_changed; | |
159 | ||
160 | bool need_keepalive = false; | |
161 | std::optional<utime_t> keepalive_ack = std::nullopt; | |
162 | uint64_t ack_left = 0; | |
163 | bool write_dispatching = false; | |
164 | // If another continuation is trying to close or replace socket when | |
165 | // write_dispatching is true and write_state is open, | |
166 | // it needs to wait for exit_open until writing is stopped or failed. | |
167 | std::optional<seastar::shared_promise<>> exit_open; | |
168 | ||
169 | seastar::future<stop_t> try_exit_sweep(); | |
170 | seastar::future<> do_write_dispatch_sweep(); | |
171 | void write_event(); | |
172 | }; | |
173 | ||
f67539c2 TL |
174 | inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) { |
175 | proto.print(out); | |
176 | return out; | |
177 | } | |
178 | ||
179 | ||
9f95a23c | 180 | } // namespace crimson::net |