]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/simple/Pipe.h
update sources to 12.2.2
[ceph.git] / ceph / src / msg / simple / Pipe.h
CommitLineData
7c673cae
FG
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14
15#ifndef CEPH_MSGR_PIPE_H
16#define CEPH_MSGR_PIPE_H
17
18#include "include/memory.h"
19#include "auth/AuthSessionHandler.h"
20
21#include "msg/msg_types.h"
22#include "msg/Messenger.h"
23#include "PipeConnection.h"
24
25
// Only pointers to these are used by this header, so forward declarations suffice.
class SimpleMessenger;
class DispatchQueue;

// Number of iovec entries in each Pipe's msgvec scratch array:
// IOV_MAX itself on platforms that allow fewer than 1024 entries,
// otherwise a quarter of IOV_MAX.
static constexpr int SM_IOV_MAX = (IOV_MAX < 1024 ? IOV_MAX : IOV_MAX / 4);
30
31 /**
32 * The Pipe is the most complex SimpleMessenger component. It gets
33 * two threads, one each for reading and writing on a socket it's handed
34 * at creation time, and is responsible for everything that happens on
35 * that socket. Besides message transmission, it's responsible for
36 * propagating socket errors to the SimpleMessenger and then sticking
37 * around in a state where it can provide enough data for the SimpleMessenger
38 * to provide reliable Message delivery when it manages to reconnect.
39 */
40 class Pipe : public RefCountedObject {
41 /**
42 * The Reader thread handles all reads off the socket -- not just
43 * Messages, but also acks and other protocol bits (excepting startup,
44 * when the Writer does a couple of reads).
45 * All the work is implemented in Pipe itself, of course.
46 */
47 class Reader : public Thread {
48 Pipe *pipe;
49 public:
50 explicit Reader(Pipe *p) : pipe(p) {}
51 void *entry() override { pipe->reader(); return 0; }
52 } reader_thread;
53
54 /**
55 * The Writer thread handles all writes to the socket (after startup).
56 * All the work is implemented in Pipe itself, of course.
57 */
58 class Writer : public Thread {
59 Pipe *pipe;
60 public:
61 explicit Writer(Pipe *p) : pipe(p) {}
62 void *entry() override { pipe->writer(); return 0; }
63 } writer_thread;
64
65 class DelayedDelivery;
66 DelayedDelivery *delay_thread;
67 public:
68 Pipe(SimpleMessenger *r, int st, PipeConnection *con);
69 ~Pipe() override;
70
71 SimpleMessenger *msgr;
72 uint64_t conn_id;
73 ostream& _pipe_prefix(std::ostream &out) const;
74
75 Pipe* get() {
76 return static_cast<Pipe*>(RefCountedObject::get());
77 }
78
79 bool is_connected() {
80 Mutex::Locker l(pipe_lock);
81 return state == STATE_OPEN;
82 }
83
84 char *recv_buf;
85 size_t recv_max_prefetch;
86 size_t recv_ofs;
87 size_t recv_len;
88
89 enum {
90 STATE_ACCEPTING,
91 STATE_CONNECTING,
92 STATE_OPEN,
93 STATE_STANDBY,
94 STATE_CLOSED,
95 STATE_CLOSING,
96 STATE_WAIT // just wait for racing connection
97 };
98
99 static const char *get_state_name(int s) {
100 switch (s) {
101 case STATE_ACCEPTING: return "accepting";
102 case STATE_CONNECTING: return "connecting";
103 case STATE_OPEN: return "open";
104 case STATE_STANDBY: return "standby";
105 case STATE_CLOSED: return "closed";
106 case STATE_CLOSING: return "closing";
107 case STATE_WAIT: return "wait";
108 default: return "UNKNOWN";
109 }
110 }
111 const char *get_state_name() {
112 return get_state_name(state);
113 }
114
115 private:
116 int sd;
117 struct iovec msgvec[SM_IOV_MAX];
7c673cae
FG
118
119 public:
120 int port;
121 int peer_type;
122 entity_addr_t peer_addr;
123 Messenger::Policy policy;
124
125 Mutex pipe_lock;
126 int state;
31f18b77 127 std::atomic<bool> state_closed = { false }; // true iff state = STATE_CLOSED
7c673cae
FG
128
129 // session_security handles any signatures or encryptions required for this pipe's msgs. PLR
130
131 ceph::shared_ptr<AuthSessionHandler> session_security;
132
133 protected:
134 friend class SimpleMessenger;
135 PipeConnectionRef connection_state;
136
137 utime_t backoff; // backoff time
138
139 bool reader_running, reader_needs_join;
140 bool reader_dispatching; /// reader thread is dispatching without pipe_lock
141 bool notify_on_dispatch_done; /// something wants a signal when dispatch done
142 bool writer_running;
143
144 map<int, list<Message*> > out_q; // priority queue for outbound msgs
145 DispatchQueue *in_q;
146 list<Message*> sent;
147 Cond cond;
148 bool send_keepalive;
149 bool send_keepalive_ack;
150 utime_t keepalive_ack_stamp;
151 bool halt_delivery; //if a pipe's queue is destroyed, stop adding to it
152
153 __u32 connect_seq, peer_global_seq;
154 uint64_t out_seq;
155 uint64_t in_seq, in_seq_acked;
156
157 void set_socket_options();
158
159 int accept(); // server handshake
160 int connect(); // client handshake
161 void reader();
162 void writer();
163 void unlock_maybe_reap();
164
165 int randomize_out_seq();
166
167 int read_message(Message **pm,
168 AuthSessionHandler *session_security_copy);
169 int write_message(const ceph_msg_header& h, const ceph_msg_footer& f, bufferlist& body);
170 /**
171 * Write the given data (of length len) to the Pipe's socket. This function
172 * will loop until all passed data has been written out.
173 * If more is set, the function will optimize socket writes
174 * for additional data (by passing the MSG_MORE flag, aka TCP_CORK).
175 *
176 * @param msg The msghdr to write out
177 * @param len The length of the data in msg
178 * @param more Should be set true if this is one part of a larger message
179 * @return 0, or -1 on failure (unrecoverable -- close the socket).
180 */
181 int do_sendmsg(struct msghdr *msg, unsigned len, bool more=false);
182 int write_ack(uint64_t s);
183 int write_keepalive();
184 int write_keepalive2(char tag, const utime_t &t);
185
7c673cae
FG
186 void fault(bool reader=false);
187
188 void was_session_reset();
189
190 /* Clean up sent list */
191 void handle_ack(uint64_t seq);
192
193 public:
194 Pipe(const Pipe& other);
195 const Pipe& operator=(const Pipe& other);
196
197 void start_reader();
198 void start_writer();
199 void maybe_start_delay_thread();
200 void join_reader();
201
202 // public constructors
203 static const Pipe& Server(int s);
204 static const Pipe& Client(const entity_addr_t& pi);
205
206 uint64_t get_out_seq() { return out_seq; }
207
208 bool is_queued() { return !out_q.empty() || send_keepalive || send_keepalive_ack; }
209
210 entity_addr_t& get_peer_addr() { return peer_addr; }
211
212 void set_peer_addr(const entity_addr_t& a) {
213 if (&peer_addr != &a) // shut up valgrind
214 peer_addr = a;
215 connection_state->set_peer_addr(a);
216 }
217 void set_peer_type(int t) {
218 peer_type = t;
219 connection_state->set_peer_type(t);
220 }
221
222 void register_pipe();
223 void unregister_pipe();
224 void join();
225 /// stop a Pipe by closing its socket and setting it to STATE_CLOSED
226 void stop();
227 /// stop() a Pipe if not already done, and wait for it to finish any
228 /// fast_dispatch in progress.
229 void stop_and_wait();
230
231 void _send(Message *m) {
232 assert(pipe_lock.is_locked());
233 out_q[m->get_priority()].push_back(m);
234 cond.Signal();
235 }
236 void _send_keepalive() {
237 assert(pipe_lock.is_locked());
238 send_keepalive = true;
239 cond.Signal();
240 }
241 Message *_get_next_outgoing() {
242 assert(pipe_lock.is_locked());
243 Message *m = 0;
244 while (!m && !out_q.empty()) {
245 map<int, list<Message*> >::reverse_iterator p = out_q.rbegin();
246 if (!p->second.empty()) {
247 m = p->second.front();
248 p->second.pop_front();
249 }
250 if (p->second.empty())
251 out_q.erase(p->first);
252 }
253 return m;
254 }
255
256 /// move all messages in the sent list back into the queue at the highest priority.
257 void requeue_sent();
258 /// discard messages requeued by requeued_sent() up to a given seq
259 void discard_requeued_up_to(uint64_t seq);
260 void discard_out_queue();
261
262 void shutdown_socket() {
263 recv_reset();
264 if (sd >= 0)
265 ::shutdown(sd, SHUT_RDWR);
266 }
267
268 void recv_reset() {
269 recv_len = 0;
270 recv_ofs = 0;
271 }
272 ssize_t do_recv(char *buf, size_t len, int flags);
273 ssize_t buffered_recv(char *buf, size_t len, int flags);
274 bool has_pending_data() { return recv_len > recv_ofs; }
275
276 /**
277 * do a blocking read of len bytes from socket
278 *
279 * @param buf buffer to read into
280 * @param len exact number of bytes to read
281 * @return 0 for success, or -1 on error
282 */
283 int tcp_read(char *buf, unsigned len);
284
285 /**
286 * wait for bytes to become available on the socket
287 *
288 * @return 0 for success, or -1 on error
289 */
290 int tcp_read_wait();
291
292 /**
293 * non-blocking read of available bytes on socket
294 *
295 * This is expected to be used after tcp_read_wait(), and will return
296 * an error if there is no data on the socket to consume.
297 *
298 * @param buf buffer to read into
299 * @param len maximum number of bytes to read
300 * @return bytes read, or -1 on error or when there is no data
301 */
302 ssize_t tcp_read_nonblocking(char *buf, unsigned len);
303
304 /**
305 * blocking write of bytes to socket
306 *
307 * @param buf buffer
308 * @param len number of bytes to write
309 * @return 0 for success, or -1 on error
310 */
311 int tcp_write(const char *buf, unsigned len);
312
313 };
314
315
316#endif