]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/net/Interceptor.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / net / Interceptor.h
CommitLineData
9f95a23c
TL
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
3
4#pragma once
5
#include <cassert>
#include <chrono>
#include <cstdint>
#include <iterator>
#include <optional>
#include <ostream>
#include <stdexcept>
#include <tuple>
#include <variant>

#include <seastar/core/sharded.hh>
#include <seastar/core/sleep.hh>

#include "Fwd.h"
#include "msg/async/frames_v2.h"
12
13namespace crimson::net {
14
/// Breakpoints that are identified by name alone rather than by a msgr v2
/// frame tag (the tag-based alternative is tag_bp_t).
///
/// The enumerators are used as indices into the name table in
/// get_bp_name(), so their order and values must stay in sync with it.
enum class custom_bp_t : uint8_t {
  BANNER_WRITE = 0,
  BANNER_READ,
  BANNER_PAYLOAD_READ,
  SOCKET_CONNECTING,
  SOCKET_ACCEPTED
};
22inline const char* get_bp_name(custom_bp_t bp) {
23 uint8_t index = static_cast<uint8_t>(bp);
24 static const char *const bp_names[] = {"BANNER_WRITE",
25 "BANNER_READ",
26 "BANNER_PAYLOAD_READ",
27 "SOCKET_CONNECTING",
28 "SOCKET_ACCEPTED"};
29 assert(index < std::size(bp_names));
30 return bp_names[index];
31}
32
/// Qualifies a tag breakpoint (tag_bp_t) as belonging to the read side or
/// the write side; printed as the "_READ" / "_WRITE" suffix of the tag name.
enum class bp_type_t {
  READ = 0,
  WRITE
};
37
/// Verdict returned by Interceptor::intercept() for a breakpoint.
///
/// The enumerators are used as indices into the name table of the
/// bp_action_t stream operator, so their order must stay in sync with it.
/// NOTE(review): what each action makes the messenger do is implemented by
/// the callers of intercept(), which are not part of this header -- confirm
/// there before relying on the names alone.
enum class bp_action_t {
  CONTINUE = 0,
  FAULT,
  BLOCK,
  STALL
};
44
45inline std::ostream& operator<<(std::ostream& out, const bp_action_t& action) {
46 static const char *const action_names[] = {"CONTINUE",
47 "FAULT",
48 "BLOCK",
49 "STALL"};
50 assert(static_cast<size_t>(action) < std::size(action_names));
51 return out << action_names[static_cast<size_t>(action)];
52}
53
54class socket_blocker {
55 std::optional<seastar::abort_source> p_blocked;
56 std::optional<seastar::abort_source> p_unblocked;
57
58 public:
59 seastar::future<> wait_blocked() {
60 ceph_assert(!p_blocked);
61 if (p_unblocked) {
f67539c2 62 return seastar::make_ready_future<>();
9f95a23c
TL
63 } else {
64 p_blocked = seastar::abort_source();
20effc67
TL
65 return seastar::sleep_abortable(std::chrono::seconds(10),
66 *p_blocked).then([] {
9f95a23c
TL
67 throw std::runtime_error(
68 "Timeout (10s) in socket_blocker::wait_blocked()");
69 }).handle_exception_type([] (const seastar::sleep_aborted& e) {
70 // wait done!
71 });
72 }
73 }
74
75 seastar::future<> block() {
76 if (p_blocked) {
77 p_blocked->request_abort();
78 p_blocked = std::nullopt;
79 }
80 ceph_assert(!p_unblocked);
81 p_unblocked = seastar::abort_source();
20effc67
TL
82 return seastar::sleep_abortable(std::chrono::seconds(10),
83 *p_unblocked).then([] {
9f95a23c
TL
84 ceph_abort("Timeout (10s) in socket_blocker::block()");
85 }).handle_exception_type([] (const seastar::sleep_aborted& e) {
86 // wait done!
87 });
88 }
89
90 void unblock() {
91 ceph_assert(!p_blocked);
92 ceph_assert(p_unblocked);
93 p_unblocked->request_abort();
94 p_unblocked = std::nullopt;
95 }
96};
97
98struct tag_bp_t {
99 ceph::msgr::v2::Tag tag;
100 bp_type_t type;
101 bool operator==(const tag_bp_t& x) const {
102 return tag == x.tag && type == x.type;
103 }
104 bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
105 bool operator<(const tag_bp_t& x) const {
106 return std::tie(tag, type) < std::tie(x.tag, x.type);
107 }
108};
109
110struct Breakpoint {
111 using var_t = std::variant<custom_bp_t, tag_bp_t>;
112 var_t bp;
113 Breakpoint(custom_bp_t bp) : bp(bp) { }
114 Breakpoint(ceph::msgr::v2::Tag tag, bp_type_t type)
115 : bp(tag_bp_t{tag, type}) { }
116 bool operator==(const Breakpoint& x) const { return bp == x.bp; }
117 bool operator!=(const Breakpoint& x) const { return !operator==(x); }
118 bool operator==(const custom_bp_t& x) const { return bp == var_t(x); }
119 bool operator!=(const custom_bp_t& x) const { return !operator==(x); }
120 bool operator==(const tag_bp_t& x) const { return bp == var_t(x); }
121 bool operator!=(const tag_bp_t& x) const { return !operator==(x); }
122 bool operator<(const Breakpoint& x) const { return bp < x.bp; }
123};
124
125inline std::ostream& operator<<(std::ostream& out, const Breakpoint& bp) {
126 if (auto custom_bp = std::get_if<custom_bp_t>(&bp.bp)) {
127 return out << get_bp_name(*custom_bp);
128 } else {
129 auto tag_bp = std::get<tag_bp_t>(bp.bp);
130 static const char *const tag_names[] = {"NONE",
131 "HELLO",
132 "AUTH_REQUEST",
133 "AUTH_BAD_METHOD",
134 "AUTH_REPLY_MORE",
135 "AUTH_REQUEST_MORE",
136 "AUTH_DONE",
137 "AUTH_SIGNATURE",
138 "CLIENT_IDENT",
139 "SERVER_IDENT",
140 "IDENT_MISSING_FEATURES",
141 "SESSION_RECONNECT",
142 "SESSION_RESET",
143 "SESSION_RETRY",
144 "SESSION_RETRY_GLOBAL",
145 "SESSION_RECONNECT_OK",
146 "WAIT",
147 "MESSAGE",
148 "KEEPALIVE2",
149 "KEEPALIVE2_ACK",
150 "ACK"};
151 assert(static_cast<size_t>(tag_bp.tag) < std::size(tag_names));
152 return out << tag_names[static_cast<size_t>(tag_bp.tag)]
153 << (tag_bp.type == bp_type_t::WRITE ? "_WRITE" : "_READ");
154 }
155}
156
157struct Interceptor {
158 socket_blocker blocker;
159 virtual ~Interceptor() {}
160 virtual void register_conn(Connection& conn) = 0;
161 virtual void register_conn_ready(Connection& conn) = 0;
162 virtual void register_conn_closed(Connection& conn) = 0;
163 virtual void register_conn_replaced(Connection& conn) = 0;
164 virtual bp_action_t intercept(Connection& conn, Breakpoint bp) = 0;
165};
166
167} // namespace crimson::net