]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_MSGR_PIPE_H | |
16 | #define CEPH_MSGR_PIPE_H | |
17 | ||
7c673cae FG |
18 | #include "auth/AuthSessionHandler.h" |
19 | ||
20 | #include "msg/msg_types.h" | |
21 | #include "msg/Messenger.h" | |
22 | #include "PipeConnection.h" | |
23 | ||
24 | ||
// Forward declarations: this header only refers to these types through
// pointers and a friend declaration, so full definitions are not needed.
class SimpleMessenger;
class DispatchQueue;

// Number of iovec slots used for Pipe's scatter/gather array (msgvec):
// IOV_MAX itself when it is below 1024, otherwise one quarter of IOV_MAX.
static const int SM_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : (IOV_MAX / 4);
29 | ||
30 | /** | |
31 | * The Pipe is the most complex SimpleMessenger component. It gets | |
32 | * two threads, one each for reading and writing on a socket it's handed | |
33 | * at creation time, and is responsible for everything that happens on | |
34 | * that socket. Besides message transmission, it's responsible for | |
35 | * propagating socket errors to the SimpleMessenger and then sticking | |
36 | * around in a state where it can provide enough data for the SimpleMessenger | |
37 | * to provide reliable Message delivery when it manages to reconnect. | |
38 | */ | |
39 | class Pipe : public RefCountedObject { | |
40 | /** | |
41 | * The Reader thread handles all reads off the socket -- not just | |
42 | * Messages, but also acks and other protocol bits (excepting startup, | |
43 | * when the Writer does a couple of reads). | |
44 | * All the work is implemented in Pipe itself, of course. | |
45 | */ | |
46 | class Reader : public Thread { | |
47 | Pipe *pipe; | |
48 | public: | |
49 | explicit Reader(Pipe *p) : pipe(p) {} | |
50 | void *entry() override { pipe->reader(); return 0; } | |
51 | } reader_thread; | |
52 | ||
53 | /** | |
54 | * The Writer thread handles all writes to the socket (after startup). | |
55 | * All the work is implemented in Pipe itself, of course. | |
56 | */ | |
57 | class Writer : public Thread { | |
58 | Pipe *pipe; | |
59 | public: | |
60 | explicit Writer(Pipe *p) : pipe(p) {} | |
61 | void *entry() override { pipe->writer(); return 0; } | |
62 | } writer_thread; | |
63 | ||
64 | class DelayedDelivery; | |
65 | DelayedDelivery *delay_thread; | |
66 | public: | |
67 | Pipe(SimpleMessenger *r, int st, PipeConnection *con); | |
68 | ~Pipe() override; | |
69 | ||
70 | SimpleMessenger *msgr; | |
71 | uint64_t conn_id; | |
72 | ostream& _pipe_prefix(std::ostream &out) const; | |
73 | ||
74 | Pipe* get() { | |
75 | return static_cast<Pipe*>(RefCountedObject::get()); | |
76 | } | |
77 | ||
78 | bool is_connected() { | |
79 | Mutex::Locker l(pipe_lock); | |
80 | return state == STATE_OPEN; | |
81 | } | |
82 | ||
83 | char *recv_buf; | |
84 | size_t recv_max_prefetch; | |
85 | size_t recv_ofs; | |
86 | size_t recv_len; | |
87 | ||
88 | enum { | |
89 | STATE_ACCEPTING, | |
90 | STATE_CONNECTING, | |
91 | STATE_OPEN, | |
92 | STATE_STANDBY, | |
93 | STATE_CLOSED, | |
94 | STATE_CLOSING, | |
95 | STATE_WAIT // just wait for racing connection | |
96 | }; | |
97 | ||
98 | static const char *get_state_name(int s) { | |
99 | switch (s) { | |
100 | case STATE_ACCEPTING: return "accepting"; | |
101 | case STATE_CONNECTING: return "connecting"; | |
102 | case STATE_OPEN: return "open"; | |
103 | case STATE_STANDBY: return "standby"; | |
104 | case STATE_CLOSED: return "closed"; | |
105 | case STATE_CLOSING: return "closing"; | |
106 | case STATE_WAIT: return "wait"; | |
107 | default: return "UNKNOWN"; | |
108 | } | |
109 | } | |
110 | const char *get_state_name() { | |
111 | return get_state_name(state); | |
112 | } | |
113 | ||
114 | private: | |
115 | int sd; | |
116 | struct iovec msgvec[SM_IOV_MAX]; | |
7c673cae FG |
117 | |
118 | public: | |
119 | int port; | |
120 | int peer_type; | |
121 | entity_addr_t peer_addr; | |
122 | Messenger::Policy policy; | |
123 | ||
124 | Mutex pipe_lock; | |
125 | int state; | |
31f18b77 | 126 | std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED |
7c673cae FG |
127 | |
128 | // session_security handles any signatures or encryptions required for this pipe's msgs. PLR | |
129 | ||
11fdf7f2 | 130 | std::shared_ptr<AuthSessionHandler> session_security; |
7c673cae FG |
131 | |
132 | protected: | |
133 | friend class SimpleMessenger; | |
134 | PipeConnectionRef connection_state; | |
135 | ||
136 | utime_t backoff; // backoff time | |
137 | ||
138 | bool reader_running, reader_needs_join; | |
139 | bool reader_dispatching; /// reader thread is dispatching without pipe_lock | |
140 | bool notify_on_dispatch_done; /// something wants a signal when dispatch done | |
141 | bool writer_running; | |
142 | ||
143 | map<int, list<Message*> > out_q; // priority queue for outbound msgs | |
144 | DispatchQueue *in_q; | |
145 | list<Message*> sent; | |
146 | Cond cond; | |
147 | bool send_keepalive; | |
148 | bool send_keepalive_ack; | |
149 | utime_t keepalive_ack_stamp; | |
150 | bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it | |
151 | ||
152 | __u32 connect_seq, peer_global_seq; | |
153 | uint64_t out_seq; | |
154 | uint64_t in_seq, in_seq_acked; | |
155 | ||
156 | void set_socket_options(); | |
157 | ||
158 | int accept(); // server handshake | |
159 | int connect(); // client handshake | |
160 | void reader(); | |
161 | void writer(); | |
162 | void unlock_maybe_reap(); | |
163 | ||
11fdf7f2 | 164 | void randomize_out_seq(); |
7c673cae FG |
165 | |
166 | int read_message(Message **pm, | |
167 | AuthSessionHandler *session_security_copy); | |
168 | int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body); | |
169 | /** | |
170 | * Write the given data (of length len) to the Pipe's socket. This function | |
171 | * will loop until all passed data has been written out. | |
172 | * If more is set, the function will optimize socket writes | |
173 | * for additional data (by passing the MSG_MORE flag, aka TCP_CORK). | |
174 | * | |
175 | * @param msg The msghdr to write out | |
176 | * @param len The length of the data in msg | |
177 | * @param more Should be set true if this is one part of a larger message | |
178 | * @return 0, or -1 on failure (unrecoverable -- close the socket). | |
179 | */ | |
180 | int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false); | |
181 | int write_ack(uint64_t s); | |
182 | int write_keepalive(); | |
183 | int write_keepalive2(char tag, const utime_t &t); | |
184 | ||
7c673cae FG |
185 | void fault(bool reader=false); |
186 | ||
187 | void was_session_reset(); | |
188 | ||
189 | /* Clean up sent list */ | |
190 | void handle_ack(uint64_t seq); | |
191 | ||
192 | public: | |
193 | Pipe(const Pipe& other); | |
194 | const Pipe& operator=(const Pipe& other); | |
195 | ||
196 | void start_reader(); | |
197 | void start_writer(); | |
198 | void maybe_start_delay_thread(); | |
199 | void join_reader(); | |
200 | ||
201 | // public constructors | |
202 | static const Pipe& Server(int s); | |
203 | static const Pipe& Client(const entity_addr_t& pi); | |
204 | ||
205 | uint64_t get_out_seq() { return out_seq; } | |
206 | ||
207 | bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; } | |
208 | ||
209 | entity_addr_t& get_peer_addr() { return peer_addr; } | |
210 | ||
211 | void set_peer_addr(const entity_addr_t& a) { | |
212 | if (&peer_addr != &a) // shut up valgrind | |
213 | peer_addr = a; | |
214 | connection_state->set_peer_addr(a); | |
215 | } | |
216 | void set_peer_type(int t) { | |
217 | peer_type = t; | |
218 | connection_state->set_peer_type(t); | |
219 | } | |
220 | ||
221 | void register_pipe(); | |
222 | void unregister_pipe(); | |
223 | void join(); | |
224 | /// stop a Pipe by closing its socket and setting it to STATE_CLOSED | |
225 | void stop(); | |
226 | /// stop() a Pipe if not already done, and wait for it to finish any | |
227 | /// fast_dispatch in progress. | |
228 | void stop_and_wait(); | |
229 | ||
230 | void _send(Message *m) { | |
11fdf7f2 | 231 | ceph_assert(pipe_lock.is_locked()); |
7c673cae FG |
232 | out_q[m->get_priority()].push_back(m); |
233 | cond.Signal(); | |
234 | } | |
235 | void _send_keepalive() { | |
11fdf7f2 | 236 | ceph_assert(pipe_lock.is_locked()); |
7c673cae FG |
237 | send_keepalive = true; |
238 | cond.Signal(); | |
239 | } | |
240 | Message *_get_next_outgoing() { | |
11fdf7f2 | 241 | ceph_assert(pipe_lock.is_locked()); |
7c673cae FG |
242 | Message *m = 0; |
243 | while (!m && !out_q.empty()) { | |
244 | map<int, list<Message*> >::reverse_iterator p = out_q.rbegin(); | |
245 | if (!p->second.empty()) { | |
246 | m = p->second.front(); | |
247 | p->second.pop_front(); | |
248 | } | |
249 | if (p->second.empty()) | |
250 | out_q.erase(p->first); | |
251 | } | |
252 | return m; | |
253 | } | |
254 | ||
255 | /// move all messages in the sent list back into the queue at the highest priority. | |
256 | void requeue_sent(); | |
257 | /// discard messages requeued by requeued_sent() up to a given seq | |
258 | void discard_requeued_up_to(uint64_t seq); | |
259 | void discard_out_queue(); | |
260 | ||
261 | void shutdown_socket() { | |
262 | recv_reset(); | |
263 | if (sd >= 0) | |
264 | ::shutdown(sd, SHUT_RDWR); | |
265 | } | |
266 | ||
267 | void recv_reset() { | |
268 | recv_len = 0; | |
269 | recv_ofs = 0; | |
270 | } | |
271 | ssize_t do_recv(char *buf, size_t len, int flags); | |
272 | ssize_t buffered_recv(char *buf, size_t len, int flags); | |
273 | bool has_pending_data() { return recv_len > recv_ofs; } | |
274 | ||
275 | /** | |
276 | * do a blocking read of len bytes from socket | |
277 | * | |
278 | * @param buf buffer to read into | |
279 | * @param len exact number of bytes to read | |
280 | * @return 0 for success, or -1 on error | |
281 | */ | |
282 | int tcp_read(char *buf, unsigned len); | |
283 | ||
284 | /** | |
285 | * wait for bytes to become available on the socket | |
286 | * | |
287 | * @return 0 for success, or -1 on error | |
288 | */ | |
289 | int tcp_read_wait(); | |
290 | ||
291 | /** | |
292 | * non-blocking read of available bytes on socket | |
293 | * | |
294 | * This is expected to be used after tcp_read_wait(), and will return | |
295 | * an error if there is no data on the socket to consume. | |
296 | * | |
297 | * @param buf buffer to read into | |
298 | * @param len maximum number of bytes to read | |
299 | * @return bytes read, or -1 on error or when there is no data | |
300 | */ | |
301 | ssize_t tcp_read_nonblocking(char *buf, unsigned len); | |
302 | ||
303 | /** | |
304 | * blocking write of bytes to socket | |
305 | * | |
306 | * @param buf buffer | |
307 | * @param len number of bytes to write | |
308 | * @return 0 for success, or -1 on error | |
309 | */ | |
310 | int tcp_write(const char *buf, unsigned len); | |
311 | ||
312 | }; | |
313 | ||
314 | ||
315 | #endif |