1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/gate.hh>
7 #include <seastar/core/shared_future.hh>
9 #include "crimson/common/gated.h"
10 #include "crimson/common/log.h"
12 #include "SocketConnection.h"
14 namespace crimson::net
{
24 Protocol(Protocol
&&) = delete;
27 virtual bool is_connected() const = 0;
29 #ifdef UNIT_TESTS_BUILT
30 bool is_closed_clean
= false;
31 bool is_closed() const { return closed
; }
35 void close(bool dispatch_reset
, std::optional
<std::function
<void()>> f_accept_new
=std::nullopt
);
36 seastar::future
<> close_clean(bool dispatch_reset
) {
37 close(dispatch_reset
);
38 // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
39 // which will otherwise result in deadlock
40 assert(close_ready
.valid());
41 return close_ready
.get_future();
44 virtual void start_connect(const entity_addr_t
& peer_addr
,
45 const entity_name_t
& peer_name
) = 0;
47 virtual void start_accept(SocketRef
&& socket
,
48 const entity_addr_t
& peer_addr
) = 0;
50 virtual void print(std::ostream
&) const = 0;
52 Protocol(proto_t type
,
53 ChainedDispatchers
& dispatchers
,
54 SocketConnection
& conn
);
56 virtual void trigger_close() = 0;
58 virtual ceph::bufferlist
do_sweep_messages(
59 const std::deque
<MessageURef
>& msgs
,
61 bool require_keepalive
,
62 std::optional
<utime_t
> keepalive_ack
,
63 bool require_ack
) = 0;
65 virtual void notify_write() {};
67 virtual void on_closed() {}
70 ceph::bufferlist
sweep_messages_and_move_to_sent(
72 bool require_keepalive
,
73 std::optional
<utime_t
> keepalive_ack
,
77 const proto_t proto_type
;
81 ChainedDispatchers
& dispatchers
;
82 SocketConnection
&conn
;
84 AuthConnectionMetaRef auth_meta
;
88 // become valid only after closed == true
89 seastar::shared_future
<> close_ready
;
91 // the write state-machine
93 seastar::future
<> send(MessageURef msg
);
94 seastar::future
<> keepalive();
96 // TODO: encapsulate a SessionedSender class
98 // write_state is changed with state atomically, indicating the write
99 // behavior of the according state.
100 enum class write_state_t
: uint8_t {
107 static const char* get_state_name(write_state_t state
) {
108 uint8_t index
= static_cast<uint8_t>(state
);
109 static const char *const state_names
[] = {"none",
113 assert(index
< std::size(state_names
));
114 return state_names
[index
];
117 void set_write_state(const write_state_t
& state
) {
118 if (write_state
== write_state_t::open
&&
119 state
!= write_state_t::open
&&
121 exit_open
= seastar::shared_promise
<>();
124 state_changed
.set_value();
125 state_changed
= seastar::shared_promise
<>();
128 seastar::future
<> wait_write_exit() {
130 return exit_open
->get_shared_future();
132 return seastar::now();
135 void notify_keepalive_ack(utime_t keepalive_ack
);
139 void requeue_up_to(seq_num_t seq
);
145 bool is_queued() const {
146 return (!conn
.out_q
.empty() ||
149 keepalive_ack
.has_value());
152 void ack_writes(seq_num_t seq
);
153 crimson::common::Gated gate
;
156 write_state_t write_state
= write_state_t::none
;
157 // wait until current state changed
158 seastar::shared_promise
<> state_changed
;
160 bool need_keepalive
= false;
161 std::optional
<utime_t
> keepalive_ack
= std::nullopt
;
162 uint64_t ack_left
= 0;
163 bool write_dispatching
= false;
164 // If another continuation is trying to close or replace socket when
165 // write_dispatching is true and write_state is open,
166 // it needs to wait for exit_open until writing is stopped or failed.
167 std::optional
<seastar::shared_promise
<>> exit_open
;
169 seastar::future
<stop_t
> try_exit_sweep();
170 seastar::future
<> do_write_dispatch_sweep();
174 inline std::ostream
& operator<<(std::ostream
& out
, const Protocol
& proto
) {
180 } // namespace crimson::net