1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef _MSG_ASYNC_PROTOCOL_V2_
5 #define _MSG_ASYNC_PROTOCOL_V2_
8 #include "crypto_onwire.h"
11 class ProtocolV2
: public Protocol
{
32 THROTTLE_DISPATCH_QUEUE
,
34 READ_MESSAGE_COMPLETE
,
40 static const char *get_state_name(int state
) {
41 const char *const statenames
[] = {"NONE",
46 "AUTH_CONNECTING_SIGN",
48 "SESSION_RECONNECTING",
53 "AUTH_ACCEPTING_MORE",
54 "AUTH_ACCEPTING_SIGN",
59 "THROTTLE_DISPATCH_QUEUE",
61 "READ_MESSAGE_COMPLETE",
65 return statenames
[state
];
68 // TODO: move into auth_meta?
69 ceph::crypto::onwire::rxtx_t session_stream_handlers
;
71 entity_name_t peer_name
;
73 uint64_t peer_supported_features
; // CEPH_MSGR2_FEATURE_*
75 uint64_t client_cookie
;
76 uint64_t server_cookie
;
79 uint64_t peer_global_seq
;
84 struct out_queue_entry_t
{
85 bool is_prepared
{false};
88 std::map
<int, std::list
<out_queue_entry_t
>> out_queue
;
89 std::list
<Message
*> sent
;
90 std::atomic
<uint64_t> out_seq
{0};
91 std::atomic
<uint64_t> in_seq
{0};
92 std::atomic
<uint64_t> ack_left
{0};
94 using ProtFuncPtr
= void (ProtocolV2::*)();
95 Ct
<ProtocolV2
> *bannerExchangeCallback
;
97 ceph::msgr::v2::FrameAssembler tx_frame_asm
;
98 ceph::msgr::v2::FrameAssembler rx_frame_asm
;
100 ceph::bufferlist rx_preamble
;
101 ceph::bufferlist rx_epilogue
;
102 ceph::msgr::v2::segment_bls_t rx_segments_data
;
103 ceph::msgr::v2::Tag next_tag
;
104 utime_t backoff
; // backoff time
106 utime_t throttle_stamp
;
109 ceph::bufferlist rxbuf
;
110 ceph::bufferlist txbuf
;
115 bool write_in_progress
= false;
117 ostream
&_conn_prefix(std::ostream
*_dout
);
118 void run_continuation(Ct
<ProtocolV2
> *pcontinuation
);
119 void run_continuation(Ct
<ProtocolV2
> &continuation
);
121 Ct
<ProtocolV2
> *read(CONTINUATION_RXBPTR_TYPE
<ProtocolV2
> &next
,
122 rx_buffer_t
&& buffer
);
124 Ct
<ProtocolV2
> *write(const std::string
&desc
,
125 CONTINUATION_TYPE
<ProtocolV2
> &next
,
127 Ct
<ProtocolV2
> *write(const std::string
&desc
,
128 CONTINUATION_TYPE
<ProtocolV2
> &next
,
132 bool append_frame(F
& frame
);
135 uint64_t discard_requeued_up_to(uint64_t out_seq
, uint64_t seq
);
136 void reset_recv_state();
137 void reset_security();
138 void reset_throttle();
139 Ct
<ProtocolV2
> *_fault();
140 void discard_out_queue();
141 void reset_session();
142 void prepare_send_message(uint64_t features
, Message
*m
);
143 out_queue_entry_t
_get_next_outgoing();
144 ssize_t
write_message(Message
*m
, bool more
);
145 void handle_message_ack(uint64_t seq
);
147 CONTINUATION_DECL(ProtocolV2
, _wait_for_peer_banner
);
148 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2
, _handle_peer_banner
);
149 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2
, _handle_peer_banner_payload
);
151 Ct
<ProtocolV2
> *_banner_exchange(Ct
<ProtocolV2
> &callback
);
152 Ct
<ProtocolV2
> *_wait_for_peer_banner();
153 Ct
<ProtocolV2
> *_handle_peer_banner(rx_buffer_t
&&buffer
, int r
);
154 Ct
<ProtocolV2
> *_handle_peer_banner_payload(rx_buffer_t
&&buffer
, int r
);
155 Ct
<ProtocolV2
> *handle_hello(ceph::bufferlist
&payload
);
157 CONTINUATION_DECL(ProtocolV2
, read_frame
);
158 CONTINUATION_DECL(ProtocolV2
, finish_auth
);
159 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2
, handle_read_frame_preamble_main
);
160 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2
, handle_read_frame_segment
);
161 READ_BPTR_HANDLER_CONTINUATION_DECL(ProtocolV2
, handle_read_frame_epilogue_main
);
162 CONTINUATION_DECL(ProtocolV2
, throttle_message
);
163 CONTINUATION_DECL(ProtocolV2
, throttle_bytes
);
164 CONTINUATION_DECL(ProtocolV2
, throttle_dispatch_queue
);
166 Ct
<ProtocolV2
> *read_frame();
167 Ct
<ProtocolV2
> *finish_auth();
168 Ct
<ProtocolV2
> *finish_client_auth();
169 Ct
<ProtocolV2
> *handle_read_frame_preamble_main(rx_buffer_t
&&buffer
, int r
);
170 Ct
<ProtocolV2
> *read_frame_segment();
171 Ct
<ProtocolV2
> *handle_read_frame_segment(rx_buffer_t
&&rx_buffer
, int r
);
172 Ct
<ProtocolV2
> *_handle_read_frame_segment();
173 Ct
<ProtocolV2
> *handle_read_frame_epilogue_main(rx_buffer_t
&&buffer
, int r
);
174 Ct
<ProtocolV2
> *_handle_read_frame_epilogue_main();
175 Ct
<ProtocolV2
> *handle_read_frame_dispatch();
176 Ct
<ProtocolV2
> *handle_frame_payload();
178 Ct
<ProtocolV2
> *ready();
180 Ct
<ProtocolV2
> *handle_message();
181 Ct
<ProtocolV2
> *throttle_message();
182 Ct
<ProtocolV2
> *throttle_bytes();
183 Ct
<ProtocolV2
> *throttle_dispatch_queue();
184 Ct
<ProtocolV2
> *read_message_data_prepare();
186 Ct
<ProtocolV2
> *handle_keepalive2(ceph::bufferlist
&payload
);
187 Ct
<ProtocolV2
> *handle_keepalive2_ack(ceph::bufferlist
&payload
);
189 Ct
<ProtocolV2
> *handle_message_ack(ceph::bufferlist
&payload
);
192 uint64_t connection_features
;
194 ProtocolV2(AsyncConnection
*connection
);
195 virtual ~ProtocolV2();
197 virtual void connect() override
;
198 virtual void accept() override
;
199 virtual bool is_connected() override
;
200 virtual void stop() override
;
201 virtual void fault() override
;
202 virtual void send_message(Message
*m
) override
;
203 virtual void send_keepalive() override
;
205 virtual void read_event() override
;
206 virtual void write_event() override
;
207 virtual bool is_queued() override
;
211 CONTINUATION_DECL(ProtocolV2
, start_client_banner_exchange
);
212 CONTINUATION_DECL(ProtocolV2
, post_client_banner_exchange
);
214 Ct
<ProtocolV2
> *start_client_banner_exchange();
215 Ct
<ProtocolV2
> *post_client_banner_exchange();
216 inline Ct
<ProtocolV2
> *send_auth_request() {
217 std::vector
<uint32_t> empty
;
218 return send_auth_request(empty
);
220 Ct
<ProtocolV2
> *send_auth_request(std::vector
<uint32_t> &allowed_methods
);
221 Ct
<ProtocolV2
> *handle_auth_bad_method(ceph::bufferlist
&payload
);
222 Ct
<ProtocolV2
> *handle_auth_reply_more(ceph::bufferlist
&payload
);
223 Ct
<ProtocolV2
> *handle_auth_done(ceph::bufferlist
&payload
);
224 Ct
<ProtocolV2
> *handle_auth_signature(ceph::bufferlist
&payload
);
225 Ct
<ProtocolV2
> *send_client_ident();
226 Ct
<ProtocolV2
> *send_reconnect();
227 Ct
<ProtocolV2
> *handle_ident_missing_features(ceph::bufferlist
&payload
);
228 Ct
<ProtocolV2
> *handle_session_reset(ceph::bufferlist
&payload
);
229 Ct
<ProtocolV2
> *handle_session_retry(ceph::bufferlist
&payload
);
230 Ct
<ProtocolV2
> *handle_session_retry_global(ceph::bufferlist
&payload
);
231 Ct
<ProtocolV2
> *handle_wait(ceph::bufferlist
&payload
);
232 Ct
<ProtocolV2
> *handle_reconnect_ok(ceph::bufferlist
&payload
);
233 Ct
<ProtocolV2
> *handle_server_ident(ceph::bufferlist
&payload
);
236 CONTINUATION_DECL(ProtocolV2
, start_server_banner_exchange
);
237 CONTINUATION_DECL(ProtocolV2
, post_server_banner_exchange
);
238 CONTINUATION_DECL(ProtocolV2
, server_ready
);
240 Ct
<ProtocolV2
> *start_server_banner_exchange();
241 Ct
<ProtocolV2
> *post_server_banner_exchange();
242 Ct
<ProtocolV2
> *handle_auth_request(ceph::bufferlist
&payload
);
243 Ct
<ProtocolV2
> *handle_auth_request_more(ceph::bufferlist
&payload
);
244 Ct
<ProtocolV2
> *_handle_auth_request(bufferlist
& auth_payload
, bool more
);
245 Ct
<ProtocolV2
> *_auth_bad_method(int r
);
246 Ct
<ProtocolV2
> *handle_client_ident(ceph::bufferlist
&payload
);
247 Ct
<ProtocolV2
> *handle_ident_missing_features_write(int r
);
248 Ct
<ProtocolV2
> *handle_reconnect(ceph::bufferlist
&payload
);
249 Ct
<ProtocolV2
> *handle_existing_connection(const AsyncConnectionRef
& existing
);
250 Ct
<ProtocolV2
> *reuse_connection(const AsyncConnectionRef
& existing
,
251 ProtocolV2
*exproto
);
252 Ct
<ProtocolV2
> *send_server_ident();
253 Ct
<ProtocolV2
> *send_reconnect_ok();
254 Ct
<ProtocolV2
> *server_ready();
256 size_t get_current_msg_size() const;
259 #endif /* _MSG_ASYNC_PROTOCOL_V2_ */