]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/simple/Pipe.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / simple / Pipe.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_MSGR_PIPE_H
16#define CEPH_MSGR_PIPE_H
17
7c673cae
FG
18#include "auth/AuthSessionHandler.h"
19
20#include "msg/msg_types.h"
21#include "msg/Messenger.h"
22#include "PipeConnection.h"
23
24
25class SimpleMessenger;
26class DispatchQueue;
27
// Upper bound on the number of iovec entries a Pipe keeps in its msgvec
// array: all of IOV_MAX when the platform limit is small (< 1024),
// otherwise a quarter of it.
static const int SM_IOV_MAX = (IOV_MAX < 1024) ? IOV_MAX : (IOV_MAX / 4);
29
30 /**
31 * The Pipe is the most complex SimpleMessenger component. It gets
32 * two threads, one each for reading and writing on a socket it's handed
33 * at creation time, and is responsible for everything that happens on
34 * that socket. Besides message transmission, it's responsible for
35 * propagating socket errors to the SimpleMessenger and then sticking
36 * around in a state where it can provide enough data for the SimpleMessenger
37 * to provide reliable Message delivery when it manages to reconnect.
38 */
39 class Pipe : public RefCountedObject {
40 /**
41 * The Reader thread handles all reads off the socket -- not just
42 * Messages, but also acks and other protocol bits (excepting startup,
43 * when the Writer does a couple of reads).
44 * All the work is implemented in Pipe itself, of course.
45 */
46 class Reader : public Thread {
47 Pipe *pipe;
48 public:
49 explicit Reader(Pipe *p) : pipe(p) {}
50 void *entry() override { pipe->reader(); return 0; }
51 } reader_thread;
52
53 /**
54 * The Writer thread handles all writes to the socket (after startup).
55 * All the work is implemented in Pipe itself, of course.
56 */
57 class Writer : public Thread {
58 Pipe *pipe;
59 public:
60 explicit Writer(Pipe *p) : pipe(p) {}
61 void *entry() override { pipe->writer(); return 0; }
62 } writer_thread;
63
64 class DelayedDelivery;
65 DelayedDelivery *delay_thread;
66 public:
67 Pipe(SimpleMessenger *r, int st, PipeConnection *con);
68 ~Pipe() override;
69
70 SimpleMessenger *msgr;
71 uint64_t conn_id;
72 ostream& _pipe_prefix(std::ostream &out) const;
73
74 Pipe* get() {
75 return static_cast<Pipe*>(RefCountedObject::get());
76 }
77
78 bool is_connected() {
79 Mutex::Locker l(pipe_lock);
80 return state == STATE_OPEN;
81 }
82
83 char *recv_buf;
84 size_t recv_max_prefetch;
85 size_t recv_ofs;
86 size_t recv_len;
87
88 enum {
89 STATE_ACCEPTING,
90 STATE_CONNECTING,
91 STATE_OPEN,
92 STATE_STANDBY,
93 STATE_CLOSED,
94 STATE_CLOSING,
95 STATE_WAIT // just wait for racing connection
96 };
97
98 static const char *get_state_name(int s) {
99 switch (s) {
100 case STATE_ACCEPTING: return "accepting";
101 case STATE_CONNECTING: return "connecting";
102 case STATE_OPEN: return "open";
103 case STATE_STANDBY: return "standby";
104 case STATE_CLOSED: return "closed";
105 case STATE_CLOSING: return "closing";
106 case STATE_WAIT: return "wait";
107 default: return "UNKNOWN";
108 }
109 }
110 const char *get_state_name() {
111 return get_state_name(state);
112 }
113
114 private:
115 int sd;
116 struct iovec msgvec[SM_IOV_MAX];
7c673cae
FG
117
118 public:
119 int port;
120 int peer_type;
121 entity_addr_t peer_addr;
122 Messenger::Policy policy;
123
124 Mutex pipe_lock;
125 int state;
31f18b77 126 std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED
7c673cae
FG
127
128 // session_security handles any signatures or encryptions required for this pipe's msgs. PLR
129
11fdf7f2 130 std::shared_ptr<AuthSessionHandler> session_security;
7c673cae
FG
131
132 protected:
133 friend class SimpleMessenger;
134 PipeConnectionRef connection_state;
135
136 utime_t backoff; // backoff time
137
138 bool reader_running, reader_needs_join;
139 bool reader_dispatching; /// reader thread is dispatching without pipe_lock
140 bool notify_on_dispatch_done; /// something wants a signal when dispatch done
141 bool writer_running;
142
143 map<int, list<Message*> > out_q; // priority queue for outbound msgs
144 DispatchQueue *in_q;
145 list<Message*> sent;
146 Cond cond;
147 bool send_keepalive;
148 bool send_keepalive_ack;
149 utime_t keepalive_ack_stamp;
150 bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
151
152 __u32 connect_seq, peer_global_seq;
153 uint64_t out_seq;
154 uint64_t in_seq, in_seq_acked;
155
156 void set_socket_options();
157
158 int accept(); // server handshake
159 int connect(); // client handshake
160 void reader();
161 void writer();
162 void unlock_maybe_reap();
163
11fdf7f2 164 void randomize_out_seq();
7c673cae
FG
165
166 int read_message(Message **pm,
167 AuthSessionHandler *session_security_copy);
168 int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
169 /**
170 * Write the given data (of length len) to the Pipe's socket. This function
171 * will loop until all passed data has been written out.
172 * If more is set, the function will optimize socket writes
173 * for additional data (by passing the MSG_MORE flag, aka TCP_CORK).
174 *
175 * @param msg The msghdr to write out
176 * @param len The length of the data in msg
177 * @param more Should be set true if this is one part of a larger message
178 * @return 0, or -1 on failure (unrecoverable -- close the socket).
179 */
180 int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false);
181 int write_ack(uint64_t s);
182 int write_keepalive();
183 int write_keepalive2(char tag, const utime_t &t);
184
7c673cae
FG
185 void fault(bool reader=false);
186
187 void was_session_reset();
188
189 /* Clean up sent list */
190 void handle_ack(uint64_t seq);
191
192 public:
193 Pipe(const Pipe& other);
194 const Pipe& operator=(const Pipe& other);
195
196 void start_reader();
197 void start_writer();
198 void maybe_start_delay_thread();
199 void join_reader();
200
201 // public constructors
202 static const Pipe& Server(int s);
203 static const Pipe& Client(const entity_addr_t& pi);
204
205 uint64_t get_out_seq() { return out_seq; }
206
207 bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
208
209 entity_addr_t& get_peer_addr() { return peer_addr; }
210
211 void set_peer_addr(const entity_addr_t& a) {
212 if (&peer_addr != &a) // shut up valgrind
213 peer_addr = a;
214 connection_state->set_peer_addr(a);
215 }
216 void set_peer_type(int t) {
217 peer_type = t;
218 connection_state->set_peer_type(t);
219 }
220
221 void register_pipe();
222 void unregister_pipe();
223 void join();
224 /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
225 void stop();
226 /// stop() a Pipe if not already done, and wait for it to finish any
227 /// fast_dispatch in progress.
228 void stop_and_wait();
229
230 void _send(Message *m) {
11fdf7f2 231 ceph_assert(pipe_lock.is_locked());
7c673cae
FG
232 out_q[m->get_priority()].push_back(m);
233 cond.Signal();
234 }
235 void _send_keepalive() {
11fdf7f2 236 ceph_assert(pipe_lock.is_locked());
7c673cae
FG
237 send_keepalive = true;
238 cond.Signal();
239 }
240 Message *_get_next_outgoing() {
11fdf7f2 241 ceph_assert(pipe_lock.is_locked());
7c673cae
FG
242 Message *m = 0;
243 while (!m && !out_q.empty()) {
244 map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
245 if (!p->second.empty()) {
246 m = p->second.front();
247 p->second.pop_front();
248 }
249 if (p->second.empty())
250 out_q.erase(p->first);
251 }
252 return m;
253 }
254
255 /// move all messages in the sent list back into the queue at the highest priority.
256 void requeue_sent();
257 /// discard messages requeued by requeued_sent() up to a given seq
258 void discard_requeued_up_to(uint64_t seq);
259 void discard_out_queue();
260
261 void shutdown_socket() {
262 recv_reset();
263 if (sd >= 0)
264 ::shutdown(sd, SHUT_RDWR);
265 }
266
267 void recv_reset() {
268 recv_len = 0;
269 recv_ofs = 0;
270 }
271 ssize_t do_recv(char *buf, size_t len, int flags);
272 ssize_t buffered_recv(char *buf, size_t len, int flags);
273 bool has_pending_data() { return recv_len > recv_ofs; }
274
275 /**
276 * do a blocking read of len bytes from socket
277 *
278 * @param buf buffer to read into
279 * @param len exact number of bytes to read
280 * @return 0 for success, or -1 on error
281 */
282 int tcp_read(char *buf, unsigned len);
283
284 /**
285 * wait for bytes to become available on the socket
286 *
287 * @return 0 for success, or -1 on error
288 */
289 int tcp_read_wait();
290
291 /**
292 * non-blocking read of available bytes on socket
293 *
294 * This is expected to be used after tcp_read_wait(), and will return
295 * an error if there is no data on the socket to consume.
296 *
297 * @param buf buffer to read into
298 * @param len maximum number of bytes to read
299 * @return bytes read, or -1 on error or when there is no data
300 */
301 ssize_t tcp_read_nonblocking(char *buf, unsigned len);
302
303 /**
304 * blocking write of bytes to socket
305 *
306 * @param buf buffer
307 * @param len number of bytes to write
308 * @return 0 for success, or -1 on error
309 */
310 int tcp_write(const char *buf, unsigned len);
311
312 };
313
314
315#endif