]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/async/Protocol.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / msg / async / Protocol.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef _MSG_ASYNC_PROTOCOL_
5 #define _MSG_ASYNC_PROTOCOL_
6
7 #include <list>
8 #include <map>
9
10 #include "AsyncConnection.h"
11 #include "include/buffer.h"
12 #include "include/msgr.h"
13
14 /*
15 * Continuation Helper Classes
16 */
17
18 #include <memory>
19 #include <tuple>
20
21 template <class C>
22 class Ct {
23 public:
24 virtual ~Ct() {}
25 virtual Ct<C> *call(C *foo) const = 0;
26 };
27
28 template <class C, typename... Args>
29 class CtFun : public Ct<C> {
30 private:
31 using fn_t = Ct<C> *(C::*)(Args...);
32 fn_t _f;
33 std::tuple<Args...> _params;
34
35 template <std::size_t... Is>
36 inline Ct<C> *_call(C *foo, std::index_sequence<Is...>) const {
37 return (foo->*_f)(std::get<Is>(_params)...);
38 }
39
40 public:
41 CtFun(fn_t f) : _f(f) {}
42
43 inline void setParams(Args... args) { _params = std::make_tuple(args...); }
44 inline Ct<C> *call(C *foo) const override {
45 return _call(foo, std::index_sequence_for<Args...>());
46 }
47 };
48
49 using rx_buffer_t =
50 std::unique_ptr<ceph::buffer::ptr_node, ceph::buffer::ptr_node::disposer>;
51
52 template <class C>
53 class CtRxNode : public Ct<C> {
54 using fn_t = Ct<C> *(C::*)(rx_buffer_t&&, int r);
55 fn_t _f;
56
57 public:
58 mutable rx_buffer_t node;
59 int r;
60
61 CtRxNode(fn_t f) : _f(f) {}
62 void setParams(rx_buffer_t &&node, int r) {
63 this->node = std::move(node);
64 this->r = r;
65 }
66 inline Ct<C> *call(C *foo) const override {
67 return (foo->*_f)(std::move(node), r);
68 }
69 };
70
71 template <class C> using CONTINUATION_TYPE = CtFun<C>;
72 template <class C> using CONTINUATION_TX_TYPE = CtFun<C, int>;
73 template <class C> using CONTINUATION_RX_TYPE = CtFun<C, char*, int>;
74 template <class C> using CONTINUATION_RXBPTR_TYPE = CtRxNode<C>;
75
76 #define CONTINUATION_DECL(C, F, ...) \
77 CtFun<C, ##__VA_ARGS__> F##_cont { (&C::F) };
78
79 #define CONTINUATION(F) F##_cont
80 #define CONTINUE(F, ...) (F##_cont.setParams(__VA_ARGS__), &F##_cont)
81
82 #define CONTINUATION_RUN(CT) \
83 { \
84 Ct<std::remove_reference<decltype(*this)>::type> *_cont = &CT;\
85 do { \
86 _cont = _cont->call(this); \
87 } while (_cont); \
88 }
89
90 #define READ_HANDLER_CONTINUATION_DECL(C, F) \
91 CONTINUATION_DECL(C, F, char *, int)
92
93 #define READ_BPTR_HANDLER_CONTINUATION_DECL(C, F) \
94 CtRxNode<C> F##_cont { (&C::F) };
95
96 #define WRITE_HANDLER_CONTINUATION_DECL(C, F) CONTINUATION_DECL(C, F, int)
97
98 //////////////////////////////////////////////////////////////////////
99
100 class AsyncMessenger;
101
102 class Protocol {
103 public:
104 const int proto_type;
105 protected:
106 AsyncConnection *connection;
107 AsyncMessenger *messenger;
108 CephContext *cct;
109 public:
110 std::shared_ptr<AuthConnectionMeta> auth_meta;
111
112 public:
113 Protocol(int type, AsyncConnection *connection);
114 virtual ~Protocol();
115
116 // prepare protocol for connecting to peer
117 virtual void connect() = 0;
118 // prepare protocol for accepting peer connections
119 virtual void accept() = 0;
120 // true -> protocol is ready for sending messages
121 virtual bool is_connected() = 0;
122 // stop connection
123 virtual void stop() = 0;
124 // signal and handle connection failure
125 virtual void fault() = 0;
126 // send message
127 virtual void send_message(Message *m) = 0;
128 // send keepalive
129 virtual void send_keepalive() = 0;
130
131 virtual void read_event() = 0;
132 virtual void write_event() = 0;
133 virtual bool is_queued() = 0;
134
135 int get_con_mode() const {
136 return auth_meta->con_mode;
137 }
138 };
139
140 #endif /* _MSG_ASYNC_PROTOCOL_ */