]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_MSGR_PIPE_H | |
16 | #define CEPH_MSGR_PIPE_H | |
17 | ||
18 | #include "include/memory.h" | |
19 | #include "auth/AuthSessionHandler.h" | |
20 | ||
21 | #include "msg/msg_types.h" | |
22 | #include "msg/Messenger.h" | |
23 | #include "PipeConnection.h" | |
24 | ||
25 | ||
// Forward declarations; this header refers to these types only through
// pointers and a friend declaration.
class SimpleMessenger;
class DispatchQueue;

// Number of iovec slots in Pipe::msgvec: the full IOV_MAX on platforms where
// IOV_MAX is below 1024, otherwise a quarter of it.
// NOTE(review): presumably the cap keeps the per-Pipe array small when
// IOV_MAX is large -- confirm the original rationale.
static constexpr int SM_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : IOV_MAX / 4;
30 | ||
31 | /** | |
32 | * The Pipe is the most complex SimpleMessenger component. It gets | |
33 | * two threads, one each for reading and writing on a socket it's handed | |
34 | * at creation time, and is responsible for everything that happens on | |
35 | * that socket. Besides message transmission, it's responsible for | |
36 | * propagating socket errors to the SimpleMessenger and then sticking | |
37 | * around in a state where it can provide enough data for the SimpleMessenger | |
38 | * to provide reliable Message delivery when it manages to reconnect. | |
39 | */ | |
40 | class Pipe : public RefCountedObject { | |
41 | /** | |
42 | * The Reader thread handles all reads off the socket -- not just | |
43 | * Messages, but also acks and other protocol bits (excepting startup, | |
44 | * when the Writer does a couple of reads). | |
45 | * All the work is implemented in Pipe itself, of course. | |
46 | */ | |
47 | class Reader : public Thread { | |
48 | Pipe *pipe; | |
49 | public: | |
50 | explicit Reader(Pipe *p) : pipe(p) {} | |
51 | void *entry() override { pipe->reader(); return 0; } | |
52 | } reader_thread; | |
53 | ||
54 | /** | |
55 | * The Writer thread handles all writes to the socket (after startup). | |
56 | * All the work is implemented in Pipe itself, of course. | |
57 | */ | |
58 | class Writer : public Thread { | |
59 | Pipe *pipe; | |
60 | public: | |
61 | explicit Writer(Pipe *p) : pipe(p) {} | |
62 | void *entry() override { pipe->writer(); return 0; } | |
63 | } writer_thread; | |
64 | ||
65 | class DelayedDelivery; | |
66 | DelayedDelivery *delay_thread; | |
67 | public: | |
68 | Pipe(SimpleMessenger *r, int st, PipeConnection *con); | |
69 | ~Pipe() override; | |
70 | ||
71 | SimpleMessenger *msgr; | |
72 | uint64_t conn_id; | |
73 | ostream& _pipe_prefix(std::ostream &out) const; | |
74 | ||
75 | Pipe* get() { | |
76 | return static_cast<Pipe*>(RefCountedObject::get()); | |
77 | } | |
78 | ||
79 | bool is_connected() { | |
80 | Mutex::Locker l(pipe_lock); | |
81 | return state == STATE_OPEN; | |
82 | } | |
83 | ||
84 | char *recv_buf; | |
85 | size_t recv_max_prefetch; | |
86 | size_t recv_ofs; | |
87 | size_t recv_len; | |
88 | ||
89 | enum { | |
90 | STATE_ACCEPTING, | |
91 | STATE_CONNECTING, | |
92 | STATE_OPEN, | |
93 | STATE_STANDBY, | |
94 | STATE_CLOSED, | |
95 | STATE_CLOSING, | |
96 | STATE_WAIT // just wait for racing connection | |
97 | }; | |
98 | ||
99 | static const char *get_state_name(int s) { | |
100 | switch (s) { | |
101 | case STATE_ACCEPTING: return "accepting"; | |
102 | case STATE_CONNECTING: return "connecting"; | |
103 | case STATE_OPEN: return "open"; | |
104 | case STATE_STANDBY: return "standby"; | |
105 | case STATE_CLOSED: return "closed"; | |
106 | case STATE_CLOSING: return "closing"; | |
107 | case STATE_WAIT: return "wait"; | |
108 | default: return "UNKNOWN"; | |
109 | } | |
110 | } | |
111 | const char *get_state_name() { | |
112 | return get_state_name(state); | |
113 | } | |
114 | ||
115 | private: | |
116 | int sd; | |
117 | struct iovec msgvec[SM_IOV_MAX]; | |
7c673cae FG |
118 | |
119 | public: | |
120 | int port; | |
121 | int peer_type; | |
122 | entity_addr_t peer_addr; | |
123 | Messenger::Policy policy; | |
124 | ||
125 | Mutex pipe_lock; | |
126 | int state; | |
31f18b77 | 127 | std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED |
7c673cae FG |
128 | |
129 | // session_security handles any signatures or encryptions required for this pipe's msgs. PLR | |
130 | ||
131 | ceph::shared_ptr<AuthSessionHandler> session_security; | |
132 | ||
133 | protected: | |
134 | friend class SimpleMessenger; | |
135 | PipeConnectionRef connection_state; | |
136 | ||
137 | utime_t backoff; // backoff time | |
138 | ||
139 | bool reader_running, reader_needs_join; | |
140 | bool reader_dispatching; /// reader thread is dispatching without pipe_lock | |
141 | bool notify_on_dispatch_done; /// something wants a signal when dispatch done | |
142 | bool writer_running; | |
143 | ||
144 | map<int, list<Message*> > out_q; // priority queue for outbound msgs | |
145 | DispatchQueue *in_q; | |
146 | list<Message*> sent; | |
147 | Cond cond; | |
148 | bool send_keepalive; | |
149 | bool send_keepalive_ack; | |
150 | utime_t keepalive_ack_stamp; | |
151 | bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it | |
152 | ||
153 | __u32 connect_seq, peer_global_seq; | |
154 | uint64_t out_seq; | |
155 | uint64_t in_seq, in_seq_acked; | |
156 | ||
157 | void set_socket_options(); | |
158 | ||
159 | int accept(); // server handshake | |
160 | int connect(); // client handshake | |
161 | void reader(); | |
162 | void writer(); | |
163 | void unlock_maybe_reap(); | |
164 | ||
165 | int randomize_out_seq(); | |
166 | ||
167 | int read_message(Message **pm, | |
168 | AuthSessionHandler *session_security_copy); | |
169 | int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body); | |
170 | /** | |
171 | * Write the given data (of length len) to the Pipe's socket. This function | |
172 | * will loop until all passed data has been written out. | |
173 | * If more is set, the function will optimize socket writes | |
174 | * for additional data (by passing the MSG_MORE flag, aka TCP_CORK). | |
175 | * | |
176 | * @param msg The msghdr to write out | |
177 | * @param len The length of the data in msg | |
178 | * @param more Should be set true if this is one part of a larger message | |
179 | * @return 0, or -1 on failure (unrecoverable -- close the socket). | |
180 | */ | |
181 | int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false); | |
182 | int write_ack(uint64_t s); | |
183 | int write_keepalive(); | |
184 | int write_keepalive2(char tag, const utime_t &t); | |
185 | ||
7c673cae FG |
186 | void fault(bool reader=false); |
187 | ||
188 | void was_session_reset(); | |
189 | ||
190 | /* Clean up sent list */ | |
191 | void handle_ack(uint64_t seq); | |
192 | ||
193 | public: | |
194 | Pipe(const Pipe& other); | |
195 | const Pipe& operator=(const Pipe& other); | |
196 | ||
197 | void start_reader(); | |
198 | void start_writer(); | |
199 | void maybe_start_delay_thread(); | |
200 | void join_reader(); | |
201 | ||
202 | // public constructors | |
203 | static const Pipe& Server(int s); | |
204 | static const Pipe& Client(const entity_addr_t& pi); | |
205 | ||
206 | uint64_t get_out_seq() { return out_seq; } | |
207 | ||
208 | bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; } | |
209 | ||
210 | entity_addr_t& get_peer_addr() { return peer_addr; } | |
211 | ||
212 | void set_peer_addr(const entity_addr_t& a) { | |
213 | if (&peer_addr != &a) // shut up valgrind | |
214 | peer_addr = a; | |
215 | connection_state->set_peer_addr(a); | |
216 | } | |
217 | void set_peer_type(int t) { | |
218 | peer_type = t; | |
219 | connection_state->set_peer_type(t); | |
220 | } | |
221 | ||
222 | void register_pipe(); | |
223 | void unregister_pipe(); | |
224 | void join(); | |
225 | /// stop a Pipe by closing its socket and setting it to STATE_CLOSED | |
226 | void stop(); | |
227 | /// stop() a Pipe if not already done, and wait for it to finish any | |
228 | /// fast_dispatch in progress. | |
229 | void stop_and_wait(); | |
230 | ||
231 | void _send(Message *m) { | |
232 | assert(pipe_lock.is_locked()); | |
233 | out_q[m->get_priority()].push_back(m); | |
234 | cond.Signal(); | |
235 | } | |
236 | void _send_keepalive() { | |
237 | assert(pipe_lock.is_locked()); | |
238 | send_keepalive = true; | |
239 | cond.Signal(); | |
240 | } | |
241 | Message *_get_next_outgoing() { | |
242 | assert(pipe_lock.is_locked()); | |
243 | Message *m = 0; | |
244 | while (!m && !out_q.empty()) { | |
245 | map<int, list<Message*> >::reverse_iterator p = out_q.rbegin(); | |
246 | if (!p->second.empty()) { | |
247 | m = p->second.front(); | |
248 | p->second.pop_front(); | |
249 | } | |
250 | if (p->second.empty()) | |
251 | out_q.erase(p->first); | |
252 | } | |
253 | return m; | |
254 | } | |
255 | ||
256 | /// move all messages in the sent list back into the queue at the highest priority. | |
257 | void requeue_sent(); | |
258 | /// discard messages requeued by requeued_sent() up to a given seq | |
259 | void discard_requeued_up_to(uint64_t seq); | |
260 | void discard_out_queue(); | |
261 | ||
262 | void shutdown_socket() { | |
263 | recv_reset(); | |
264 | if (sd >= 0) | |
265 | ::shutdown(sd, SHUT_RDWR); | |
266 | } | |
267 | ||
268 | void recv_reset() { | |
269 | recv_len = 0; | |
270 | recv_ofs = 0; | |
271 | } | |
272 | ssize_t do_recv(char *buf, size_t len, int flags); | |
273 | ssize_t buffered_recv(char *buf, size_t len, int flags); | |
274 | bool has_pending_data() { return recv_len > recv_ofs; } | |
275 | ||
276 | /** | |
277 | * do a blocking read of len bytes from socket | |
278 | * | |
279 | * @param buf buffer to read into | |
280 | * @param len exact number of bytes to read | |
281 | * @return 0 for success, or -1 on error | |
282 | */ | |
283 | int tcp_read(char *buf, unsigned len); | |
284 | ||
285 | /** | |
286 | * wait for bytes to become available on the socket | |
287 | * | |
288 | * @return 0 for success, or -1 on error | |
289 | */ | |
290 | int tcp_read_wait(); | |
291 | ||
292 | /** | |
293 | * non-blocking read of available bytes on socket | |
294 | * | |
295 | * This is expected to be used after tcp_read_wait(), and will return | |
296 | * an error if there is no data on the socket to consume. | |
297 | * | |
298 | * @param buf buffer to read into | |
299 | * @param len maximum number of bytes to read | |
300 | * @return bytes read, or -1 on error or when there is no data | |
301 | */ | |
302 | ssize_t tcp_read_nonblocking(char *buf, unsigned len); | |
303 | ||
304 | /** | |
305 | * blocking write of bytes to socket | |
306 | * | |
307 | * @param buf buffer | |
308 | * @param len number of bytes to write | |
309 | * @return 0 for success, or -1 on error | |
310 | */ | |
311 | int tcp_write(const char *buf, unsigned len); | |
312 | ||
313 | }; | |
314 | ||
315 | ||
316 | #endif |