]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/Protocol.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / net / Protocol.h
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#pragma once
5
6#include <seastar/core/gate.hh>
7#include <seastar/core/shared_future.hh>
8
f67539c2
TL
9#include "crimson/common/gated.h"
10#include "crimson/common/log.h"
9f95a23c
TL
11#include "Fwd.h"
12#include "SocketConnection.h"
13
14namespace crimson::net {
15
// Abstract base class for the crimson::net wire protocols (see proto_t).
// It declares the connection lifecycle entry points (start_connect,
// start_accept, close/close_clean), the outgoing interface (send, keepalive)
// and the write-state bookkeeping shared with derived protocols.
// Only the small inline members are defined in this header; every other
// member is declared here and defined elsewhere.
16class Protocol {
17 public:
  // Tag naming the protocol variant.
  // NOTE(review): presumably this is what the constructor stores in
  // proto_type -- the constructor body is not visible here.
18 enum class proto_t {
19 none,
20 v1,
21 v2
22 };
23
  // Not movable. Declaring the move constructor (even as deleted) also
  // causes the implicit copy constructor to be defined as deleted.
24 Protocol(Protocol&&) = delete;
  // Virtual so that a derived protocol can be destroyed through a Protocol*.
25 virtual ~Protocol();
26
  // Pure virtual: the concrete protocol reports whether it is connected.
f67539c2 27 virtual bool is_connected() const = 0;
9f95a23c 28
f67539c2
TL
  // Members compiled in only when UNIT_TESTS_BUILT is defined.
  // is_closed() exposes the private `closed` flag read-only.
29#ifdef UNIT_TESTS_BUILT
30 bool is_closed_clean = false;
9f95a23c 31 bool is_closed() const { return closed; }
f67539c2 32#endif
9f95a23c
TL
33
34 // Reentrant closing
  // f_accept_new is an optional callback (defaults to std::nullopt).
  // NOTE(review): how dispatch_reset and f_accept_new are used is decided
  // by the out-of-line definition of close(); confirm there.
f67539c2
TL
35 void close(bool dispatch_reset, std::optional<std::function<void()>> f_accept_new=std::nullopt);
  // Calls close(dispatch_reset) and then returns a future obtained from
  // close_ready, so the caller can wait on it. Asserts that close_ready is
  // valid after close() has returned.
36 seastar::future<> close_clean(bool dispatch_reset) {
37 close(dispatch_reset);
38 // it can happen if close_clean() is called inside Dispatcher::ms_handle_reset()
39 // which will otherwise result in deadlock
40 assert(close_ready.valid());
41 return close_ready.get_future();
42 }
9f95a23c
TL
43
  // Pure virtual: start the connecting side towards peer_addr/peer_name.
44 virtual void start_connect(const entity_addr_t& peer_addr,
f67539c2 45 const entity_name_t& peer_name) = 0;
9f95a23c
TL
46
  // Pure virtual: start the accepting side; takes ownership of `socket`
  // (passed as an rvalue reference).
47 virtual void start_accept(SocketRef&& socket,
48 const entity_addr_t& peer_addr) = 0;
49
  // Pure virtual: write a textual description to the stream.
  // Used by operator<<(std::ostream&, const Protocol&).
f67539c2 50 virtual void print(std::ostream&) const = 0;
9f95a23c
TL
51 protected:
  // Protected: only derived protocols can construct a Protocol.
52 Protocol(proto_t type,
f67539c2 53 ChainedDispatchers& dispatchers,
9f95a23c
TL
54 SocketConnection& conn);
55
  // Pure virtual hook implemented by the concrete protocol.
  // NOTE(review): presumably invoked from close(); confirm in the .cc file.
56 virtual void trigger_close() = 0;
57
  // Pure virtual hook that produces the buffer to write for the given
  // messages and keepalive/ack flags. The private
  // sweep_messages_and_move_to_sent() takes the same trailing parameters.
58 virtual ceph::bufferlist do_sweep_messages(
20effc67 59 const std::deque<MessageURef>& msgs,
9f95a23c
TL
60 size_t num_msgs,
61 bool require_keepalive,
62 std::optional<utime_t> keepalive_ack,
63 bool require_ack) = 0;
64
  // Optional hook; the default does nothing.
  // NOTE(review): the ';' after the body is a redundant empty declaration.
65 virtual void notify_write() {};
66
  // Optional hook; the default does nothing.
f67539c2 67 virtual void on_closed() {}
20effc67
TL
68
69 private:
  // Declared only; defined out of line.
70 ceph::bufferlist sweep_messages_and_move_to_sent(
71 size_t num_msgs,
72 bool require_keepalive,
73 std::optional<utime_t> keepalive_ack,
74 bool require_ack);
f67539c2 75
9f95a23c
TL
76 public:
  // Protocol variant of this instance; immutable after construction.
77 const proto_t proto_type;
  // Public socket handle.
f67539c2 78 SocketRef socket;
9f95a23c
TL
79
80 protected:
  // References held by this object (not owned): the dispatcher chain and
  // the connection this protocol belongs to.
f67539c2 81 ChainedDispatchers& dispatchers;
9f95a23c
TL
82 SocketConnection &conn;
83
9f95a23c
TL
84 AuthConnectionMetaRef auth_meta;
85
86 private:
  // Set once the protocol has been closed; read by is_closed() in
  // unit-test builds.
87 bool closed = false;
88 // become valid only after closed == true
89 seastar::shared_future<> close_ready;
90
91// the write state-machine
92 public:
  // Asynchronous send of one message; takes ownership of `msg` (by value).
20effc67 93 seastar::future<> send(MessageURef msg);
9f95a23c
TL
  // Asynchronous keepalive request; defined out of line.
94 seastar::future<> keepalive();
95
96// TODO: encapsulate a SessionedSender class
97 protected:
98 // write_state is changed with state atomically, indicating the write
99 // behavior of the according state.
100 enum class write_state_t : uint8_t {
101 none,
102 delay,
103 open,
104 drop
105 };
106
  // Returns the printable name of a write state.
  // The enumerator value is used directly as the array index, so
  // state_names must list the names in the same order as the write_state_t
  // enumerators. The assert guards against an out-of-range value.
107 static const char* get_state_name(write_state_t state) {
108 uint8_t index = static_cast<uint8_t>(state);
109 static const char *const state_names[] = {"none",
110 "delay",
111 "open",
112 "drop"};
113 assert(index < std::size(state_names));
114 return state_names[index];
115 }
116
  // Switches write_state to `state`.
  // When leaving `open` while write_dispatching is true, a fresh exit_open
  // promise is created first; wait_write_exit() hands out its future.
  // After the state is stored, state_changed is fulfilled and then replaced
  // with a new promise, so later waiters wait for the next change.
117 void set_write_state(const write_state_t& state) {
118 if (write_state == write_state_t::open &&
119 state != write_state_t::open &&
120 write_dispatching) {
121 exit_open = seastar::shared_promise<>();
122 }
123 write_state = state;
124 state_changed.set_value();
125 state_changed = seastar::shared_promise<>();
126 }
127
  // Returns a future tied to exit_open when one is pending, otherwise an
  // already-ready future.
128 seastar::future<> wait_write_exit() {
129 if (exit_open) {
130 return exit_open->get_shared_future();
131 }
132 return seastar::now();
133 }
134
  // The following helpers are declared here and defined out of line.
135 void notify_keepalive_ack(utime_t keepalive_ack);
136
137 void notify_ack();
138
139 void requeue_up_to(seq_num_t seq);
140
141 void requeue_sent();
142
143 void reset_write();
144
  // True when there is anything pending on the write side: a non-empty
  // conn.out_q, outstanding acks (ack_left > 0), a requested keepalive,
  // or a keepalive ack value waiting to be sent.
145 bool is_queued() const {
146 return (!conn.out_q.empty() ||
147 ack_left > 0 ||
148 need_keepalive ||
149 keepalive_ack.has_value());
150 }
151
152 void ack_writes(seq_num_t seq);
  // Gate object available to derived protocols.
  // NOTE(review): presumably used to track background work so that close
  // can wait for it -- confirm against crimson/common/gated.h.
f67539c2 153 crimson::common::Gated gate;
9f95a23c
TL
154
155 private:
  // Current write state; modified through set_write_state().
156 write_state_t write_state = write_state_t::none;
157 // wait until current state changed
158 seastar::shared_promise<> state_changed;
159
  // Pending-work flags and counters consulted by is_queued().
160 bool need_keepalive = false;
161 std::optional<utime_t> keepalive_ack = std::nullopt;
162 uint64_t ack_left = 0;
  // Checked by set_write_state() to decide whether exit_open is needed.
163 bool write_dispatching = false;
164 // If another continuation is trying to close or replace socket when
165 // write_dispatching is true and write_state is open,
166 // it needs to wait for exit_open until writing is stopped or failed.
167 std::optional<seastar::shared_promise<>> exit_open;
168
  // Internals of the write dispatch loop; defined out of line.
169 seastar::future<stop_t> try_exit_sweep();
170 seastar::future<> do_write_dispatch_sweep();
171 void write_event();
172};
173
f67539c2
TL
// Stream insertion for Protocol: forwards to the virtual print(), so the
// concrete protocol decides the text, and returns the stream for chaining.
174inline std::ostream& operator<<(std::ostream& out, const Protocol& proto) {
175 proto.print(out);
176 return out;
177}
178
179
9f95a23c 180} // namespace crimson::net