]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_MESSAGE_H | |
16 | #define CEPH_MESSAGE_H | |
17 | ||
18 | #include <stdlib.h> | |
19 | #include <ostream> | |
20 | ||
21 | #include <boost/intrusive_ptr.hpp> | |
22 | #include <boost/intrusive/list.hpp> | |
23 | // Because intrusive_ptr clobbers our assert... | |
24 | #include "include/assert.h" | |
25 | ||
26 | #include "include/types.h" | |
27 | #include "include/buffer.h" | |
28 | #include "common/Throttle.h" | |
29 | #include "common/zipkin_trace.h" | |
30 | #include "msg_types.h" | |
31 | ||
32 | #include "common/RefCountedObj.h" | |
33 | #include "msg/Connection.h" | |
34 | ||
35 | #include "common/debug.h" | |
36 | #include "common/config.h" | |
37 | ||
// Numeric message type codes (MSG_*).
// NOTE(review): presumably these are the values carried in ceph_msg_header::type
// (see Message::get_type()/set_type()) -- confirm against the Messenger code.
38 | // monitor internal message types (64..69) | |
39 | #define MSG_MON_SCRUB 64 | |
40 | #define MSG_MON_ELECTION 65 | |
41 | #define MSG_MON_PAXOS 66 | |
42 | #define MSG_MON_PROBE 67 | |
43 | #define MSG_MON_JOIN 68 | |
44 | #define MSG_MON_SYNC 69 | |
45 | ||
46 | /* monitor <-> mon admin tool (50..53) */ | |
47 | #define MSG_MON_COMMAND 50 | |
48 | #define MSG_MON_COMMAND_ACK 51 | |
49 | #define MSG_LOG 52 | |
50 | #define MSG_LOGACK 53 | |
51 | ||
52 | #define MSG_GETPOOLSTATS 58 | |
53 | #define MSG_GETPOOLSTATSREPLY 59 | |
54 | ||
55 | #define MSG_MON_GLOBAL_ID 60 | |
56 | ||
// Note: MSG_ROUTE (47) is numerically above MSG_FORWARD (46); the values are
// listed out of order here.
57 | #define MSG_ROUTE 47 | |
58 | #define MSG_FORWARD 46 | |
59 | ||
60 | #define MSG_PAXOS 40 | |
61 | ||
62 | ||
63 | // osd internal message types | |
64 | #define MSG_OSD_PING 70 | |
65 | #define MSG_OSD_BOOT 71 | |
66 | #define MSG_OSD_FAILURE 72 | |
67 | #define MSG_OSD_ALIVE 73 | |
68 | #define MSG_OSD_MARK_ME_DOWN 74 | |
69 | #define MSG_OSD_FULL 75 | |
70 | ||
71 | #define MSG_OSD_SUBOP 76 | |
72 | #define MSG_OSD_SUBOPREPLY 77 | |
73 | ||
74 | #define MSG_OSD_PGTEMP 78 | |
75 | ||
76 | #define MSG_OSD_BEACON 79 | |
77 | ||
// Note: value 82 is not assigned in this list (80, 81, then 83).
78 | #define MSG_OSD_PG_NOTIFY 80 | |
79 | #define MSG_OSD_PG_QUERY 81 | |
80 | #define MSG_OSD_PG_LOG 83 | |
81 | #define MSG_OSD_PG_REMOVE 84 | |
82 | #define MSG_OSD_PG_INFO 85 | |
83 | #define MSG_OSD_PG_TRIM 86 | |
84 | ||
85 | #define MSG_PGSTATS 87 | |
86 | #define MSG_PGSTATSACK 88 | |
87 | ||
88 | #define MSG_OSD_PG_CREATE 89 | |
89 | #define MSG_REMOVE_SNAPS 90 | |
90 | ||
91 | #define MSG_OSD_SCRUB 91 | |
92 | #define MSG_OSD_SCRUB_RESERVE 92 // was previously PG_MISSING | |
93 | #define MSG_OSD_REP_SCRUB 93 | |
94 | ||
95 | #define MSG_OSD_PG_SCAN 94 | |
96 | #define MSG_OSD_PG_BACKFILL 95 | |
97 | #define MSG_OSD_PG_BACKFILL_REMOVE 96 | |
98 | ||
99 | #define MSG_COMMAND 97 | |
100 | #define MSG_COMMAND_REPLY 98 | |
101 | ||
// Note: MSG_OSD_RECOVERY_RESERVE is 150, far outside the surrounding range;
// 100..102 are taken by the MSG_MDS_* defines further down in this file.
102 | #define MSG_OSD_BACKFILL_RESERVE 99 | |
103 | #define MSG_OSD_RECOVERY_RESERVE 150 | |
104 | ||
105 | #define MSG_OSD_PG_PUSH 105 | |
106 | #define MSG_OSD_PG_PULL 106 | |
107 | #define MSG_OSD_PG_PUSH_REPLY 107 | |
108 | ||
109 | #define MSG_OSD_EC_WRITE 108 | |
110 | #define MSG_OSD_EC_WRITE_REPLY 109 | |
111 | #define MSG_OSD_EC_READ 110 | |
112 | #define MSG_OSD_EC_READ_REPLY 111 | |
113 | ||
114 | #define MSG_OSD_REPOP 112 | |
115 | #define MSG_OSD_REPOPREPLY 113 | |
116 | #define MSG_OSD_PG_UPDATE_LOG_MISSING 114 | |
117 | #define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115 | |
118 | ||
119 | #define MSG_OSD_PG_CREATED 116 | |
120 | #define MSG_OSD_REP_SCRUBMAP 117 | |
121 | ||
122 | // *** MDS message types *** | |
123 | ||
124 | #define MSG_MDS_BEACON 100 // to monitor | |
125 | #define MSG_MDS_SLAVE_REQUEST 101 | |
126 | #define MSG_MDS_TABLE_REQUEST 102 | |
127 | ||
128 | // 150 is already in use (MSG_OSD_RECOVERY_RESERVE); do not reuse it here | |
129 | ||
// Note: 0x20b is not assigned in this list (0x20a is followed by 0x20c).
130 | #define MSG_MDS_RESOLVE 0x200 | |
131 | #define MSG_MDS_RESOLVEACK 0x201 | |
132 | #define MSG_MDS_CACHEREJOIN 0x202 | |
133 | #define MSG_MDS_DISCOVER 0x203 | |
134 | #define MSG_MDS_DISCOVERREPLY 0x204 | |
135 | #define MSG_MDS_INODEUPDATE 0x205 | |
136 | #define MSG_MDS_DIRUPDATE 0x206 | |
137 | #define MSG_MDS_CACHEEXPIRE 0x207 | |
138 | #define MSG_MDS_DENTRYUNLINK 0x208 | |
139 | #define MSG_MDS_FRAGMENTNOTIFY 0x209 | |
140 | #define MSG_MDS_OFFLOAD_TARGETS 0x20a | |
141 | #define MSG_MDS_DENTRYLINK 0x20c | |
142 | #define MSG_MDS_FINDINO 0x20d | |
143 | #define MSG_MDS_FINDINOREPLY 0x20e | |
144 | #define MSG_MDS_OPENINO 0x20f | |
145 | #define MSG_MDS_OPENINOREPLY 0x210 | |
146 | ||
147 | #define MSG_MDS_LOCK 0x300 | |
148 | #define MSG_MDS_INODEFILECAPS 0x301 | |
149 | ||
// Note: these are hex literals that only use decimal digits, so the sequence
// jumps from 0x449 to 0x450 and from 0x459 to 0x460 (0x44a..0x44f and
// 0x45a..0x45f are not assigned in this list).
150 | #define MSG_MDS_EXPORTDIRDISCOVER 0x449 | |
151 | #define MSG_MDS_EXPORTDIRDISCOVERACK 0x450 | |
152 | #define MSG_MDS_EXPORTDIRCANCEL 0x451 | |
153 | #define MSG_MDS_EXPORTDIRPREP 0x452 | |
154 | #define MSG_MDS_EXPORTDIRPREPACK 0x453 | |
155 | #define MSG_MDS_EXPORTDIRWARNING 0x454 | |
156 | #define MSG_MDS_EXPORTDIRWARNINGACK 0x455 | |
157 | #define MSG_MDS_EXPORTDIR 0x456 | |
158 | #define MSG_MDS_EXPORTDIRACK 0x457 | |
159 | #define MSG_MDS_EXPORTDIRNOTIFY 0x458 | |
160 | #define MSG_MDS_EXPORTDIRNOTIFYACK 0x459 | |
161 | #define MSG_MDS_EXPORTDIRFINISH 0x460 | |
162 | ||
163 | #define MSG_MDS_EXPORTCAPS 0x470 | |
164 | #define MSG_MDS_EXPORTCAPSACK 0x471 | |
165 | #define MSG_MDS_GATHERCAPS 0x472 | |
166 | ||
167 | #define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer | |
168 | ||
169 | // *** generic message types *** | |
170 | #define MSG_TIMECHECK 0x600 | |
171 | #define MSG_MON_HEALTH 0x601 | |
172 | ||
173 | // *** Message::encode() crcflags bits (a bitmask, not message types) *** | |
174 | #define MSG_CRC_DATA (1 << 0) | |
175 | #define MSG_CRC_HEADER (1 << 1) | |
176 | #define MSG_CRC_ALL (MSG_CRC_DATA | MSG_CRC_HEADER) | |
177 | ||
178 | // Xio Testing | |
179 | #define MSG_DATA_PING 0x602 | |
180 | ||
181 | // Xio intends to define messages 0x603..0x606, so those values are left unassigned here | |
182 | ||
183 | // Special | |
184 | #define MSG_NOP 0x607 | |
185 | ||
224ce89b WB |
186 | #define MSG_MON_HEALTH_CHECKS 0x608 |
187 | ||
7c673cae FG |
// ceph-mgr related message types (0x700..0x707), grouped by sender/receiver.
188 | // *** ceph-mgr <-> OSD/MDS daemons *** |
189 | #define MSG_MGR_OPEN 0x700 | |
190 | #define MSG_MGR_CONFIGURE 0x701 | |
191 | #define MSG_MGR_REPORT 0x702 | |
192 | ||
193 | // *** ceph-mgr <-> ceph-mon *** | |
194 | #define MSG_MGR_BEACON 0x703 | |
195 | ||
196 | // *** ceph-mon(MgrMonitor) -> OSD/MDS daemons *** | |
197 | #define MSG_MGR_MAP 0x704 | |
198 | ||
199 | // *** ceph-mon(MgrMonitor) -> ceph-mgr *** | |
200 | #define MSG_MGR_DIGEST 0x705 | |
31f18b77 FG |
201 | // *** ceph-mgr -> ceph-mon *** |
202 | #define MSG_MON_MGR_REPORT 0x706 | |
224ce89b | 203 | #define MSG_SERVICE_MAP 0x707 |
7c673cae FG |
204 | |
205 | // ====================================================== | |
206 | ||
207 | // abstract Message class | |
208 | ||
// Short alias for boost::intrusive; used below for Message's list hook and Queue typedef.
209 | namespace bi = boost::intrusive; | |
210 | ||
211 | // XioMessenger conditional trace flags (distinct single bits, so they can be OR-ed together) | |
// NOTE(review): presumably these are the bits stored via Message::set_magic() -- confirm.
212 | #define MSG_MAGIC_XIO 0x0002 | |
213 | #define MSG_MAGIC_TRACE_XCON 0x0004 | |
214 | #define MSG_MAGIC_TRACE_DTOR 0x0008 | |
215 | #define MSG_MAGIC_TRACE_HDR 0x0010 | |
216 | #define MSG_MAGIC_TRACE_XIO 0x0020 | |
217 | #define MSG_MAGIC_TRACE_XMSGR 0x0040 | |
218 | #define MSG_MAGIC_TRACE_CTR 0x0080 | |
219 | ||
220 | // XioMessenger diagnostic "ping pong" flag (resend msg when send completes) | |
221 | #define MSG_MAGIC_REDUPE 0x0100 | |
222 | ||
// Abstract base class for messages. It holds the header and footer, three
// bufferlists (payload, middle, data), receive/dispatch timestamps, the
// Connection the message is associated with, and optional throttlers.
// Concrete subclasses must implement decode_payload(), encode_payload() and
// get_type_name().
223 | class Message : public RefCountedObject { | |
224 | protected: | |
225 | ceph_msg_header header; // message envelope (type, version, tid, seq, priority, src, crc, ...) | |
// footer holds front_crc, middle_crc and data_crc (filled in by calc_*_crc()).
226 | ceph_msg_footer footer; | |
227 | bufferlist payload; // "front" unaligned blob | |
228 | bufferlist middle; // "middle" unaligned blob | |
229 | bufferlist data; // data payload (page-alignment will be preserved where possible) | |
230 | ||
231 | /* recv_stamp is set when the Messenger starts reading the | |
232 | * Message off the wire */ | |
233 | utime_t recv_stamp; | |
234 | /* dispatch_stamp is set when the Messenger starts calling dispatch() on | |
235 | * its endpoints */ | |
236 | utime_t dispatch_stamp; | |
237 | /* throttle_stamp is the point in time at which we got the throttle */ | |
238 | utime_t throttle_stamp; | |
239 | /* time at which message was fully read */ | |
240 | utime_t recv_complete_stamp; | |
241 | ||
// Connection associated with this message; may be unset (get_source_addr()
// checks for that).
242 | ConnectionRef connection; | |
243 | ||
// Opaque flag word, printed by print().
// NOTE(review): presumably a mask of the MSG_MAGIC_* bits -- confirm.
244 | uint32_t magic = 0; | |
245 | ||
// Intrusive list hook; it is the member hook used by the Queue typedef below.
246 | bi::list_member_hook<> dispatch_q; | |
247 | ||
248 | public: | |
249 | // zipkin tracing | |
250 | ZTracer::Trace trace; | |
251 | void encode_trace(bufferlist &bl, uint64_t features) const; | |
252 | void decode_trace(bufferlist::iterator &p, bool create = false); | |
253 | ||
// A Context that carries a pointer to a Message. The hook installed with
// set_completion_hook() has complete(0) called on it from ~Message().
254 | class CompletionHook : public Context { | |
255 | protected: | |
256 | Message *m; | |
257 | friend class Message; | |
258 | public: | |
259 | explicit CompletionHook(Message *_m) : m(_m) {} | |
260 | virtual void set_message(Message *_m) { m = _m; } | |
261 | }; | |
262 | ||
// Intrusive list of Messages, linked through the dispatch_q member hook.
263 | typedef bi::list< Message, | |
264 | bi::member_hook< Message, | |
265 | bi::list_member_hook<>, | |
266 | &Message::dispatch_q > > Queue; | |
267 | ||
268 | protected: | |
269 | CompletionHook* completion_hook = nullptr; // owned by Messenger | |
270 | ||
271 | // release our size in bytes back to this throttler when our payload | |
272 | // is adjusted or when we are destroyed. | |
273 | Throttle *byte_throttler = nullptr; | |
274 | ||
275 | // release a count back to this throttler when we are destroyed | |
276 | Throttle *msg_throttler = nullptr; | |
277 | ||
278 | // keep track of how big this message was when we reserved space in | |
279 | // the msgr dispatch_throttler, so that we can properly release it | |
280 | // later. this is necessary because messages can enter the dispatch | |
281 | // queue locally (not via read_message()), and those are not | |
282 | // currently throttled. | |
283 | uint64_t dispatch_throttle_size = 0; | |
284 | ||
285 | friend class Messenger; | |
286 | ||
287 | public: | |
// Default constructor: zero-fills header and footer.
288 | Message() { | |
289 | memset(&header, 0, sizeof(header)); | |
290 | memset(&footer, 0, sizeof(footer)); | |
291 | } | |
// Construct a message of type t. Header and footer are zero-filled, then
// type, version and compat_version are stored in the header.
292 | Message(int t, int version=1, int compat_version=0) { | |
293 | memset(&header, 0, sizeof(header)); | |
294 | header.type = t; | |
295 | header.version = version; | |
296 | header.compat_version = compat_version; | |
297 | header.priority = 0; // undef | |
298 | header.data_off = 0; | |
299 | memset(&footer, 0, sizeof(footer)); | |
300 | } | |
301 | ||
// Calls RefCountedObject::get() and returns the result typed as Message*.
// NOTE(review): presumably this takes an additional reference -- confirm in
// RefCountedObj.h.
302 | Message *get() { | |
303 | return static_cast<Message *>(RefCountedObject::get()); | |
304 | } | |
305 | ||
306 | protected: | |
// Protected destructor. Returns the byte count of all three bufferlists to
// byte_throttler (if set), releases the message throttle, records a trace
// event and finally runs the completion hook (if any).
307 | ~Message() override { | |
308 | if (byte_throttler) | |
309 | byte_throttler->put(payload.length() + middle.length() + data.length()); | |
310 | release_message_throttle(); | |
311 | trace.event("message destructed"); | |
312 | /* call completion hooks (if any) */ | |
313 | if (completion_hook) | |
314 | completion_hook->complete(0); | |
315 | } | |
316 | public: | |
317 | const ConnectionRef& get_connection() const { return connection; } | |
318 | void set_connection(const ConnectionRef& c) { | |
319 | connection = c; | |
320 | } | |
321 | CompletionHook* get_completion_hook() { return completion_hook; } | |
322 | void set_completion_hook(CompletionHook *hook) { completion_hook = hook; } | |
323 | void set_byte_throttler(Throttle *t) { byte_throttler = t; } | |
324 | Throttle *get_byte_throttler() { return byte_throttler; } | |
325 | void set_message_throttler(Throttle *t) { msg_throttler = t; } | |
326 | Throttle *get_message_throttler() { return msg_throttler; } | |
327 | ||
328 | void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; } | |
329 | uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; } | |
330 | ||
331 | const ceph_msg_header &get_header() const { return header; } | |
332 | ceph_msg_header &get_header() { return header; } | |
333 | void set_header(const ceph_msg_header &e) { header = e; } | |
334 | void set_footer(const ceph_msg_footer &e) { footer = e; } | |
335 | const ceph_msg_footer &get_footer() const { return footer; } | |
336 | ceph_msg_footer &get_footer() { return footer; } | |
337 | void set_src(const entity_name_t& src) { header.src = src; } | |
338 | ||
// Note: set_magic() takes an int although the stored field (and the getter's
// return type) is uint32_t.
339 | uint32_t get_magic() const { return magic; } | |
340 | void set_magic(int _magic) { magic = _magic; } | |
341 | ||
342 | /* | |
343 | * If you use get_[data, middle, payload] you shouldn't | |
344 | * use it to change those bufferlists unless you KNOW | |
345 | * there is no throttle being used. The other | |
346 | * functions are throttling-aware as appropriate. | |
347 | */ | |
348 | ||
// Drop payload and middle, first returning their combined length to
// byte_throttler (if set). The data bufferlist is not touched.
349 | void clear_payload() { | |
350 | if (byte_throttler) { | |
351 | byte_throttler->put(payload.length() + middle.length()); | |
352 | } | |
353 | payload.clear(); | |
354 | middle.clear(); | |
355 | } | |
356 | ||
// Hook called at the end of clear_data(); the base implementation does nothing.
357 | virtual void clear_buffers() {} | |
// Drop data, first returning its length to byte_throttler (if set), then
// call clear_buffers().
358 | void clear_data() { | |
359 | if (byte_throttler) | |
360 | byte_throttler->put(data.length()); | |
361 | data.clear(); | |
362 | clear_buffers(); // let subclass drop buffers as well | |
363 | } | |
// Return one count to msg_throttler (if set) and forget the throttler, so a
// second call (e.g. from the destructor) does not release again.
364 | void release_message_throttle() { | |
365 | if (msg_throttler) | |
366 | msg_throttler->put(); | |
367 | msg_throttler = nullptr; | |
368 | } | |
369 | ||
370 | bool empty_payload() const { return payload.length() == 0; } | |
371 | bufferlist& get_payload() { return payload; } | |
// Replace payload with the contents claimed from bl, keeping byte_throttler
// (if set) in sync: the old length is returned, the new length is taken.
372 | void set_payload(bufferlist& bl) { | |
373 | if (byte_throttler) | |
374 | byte_throttler->put(payload.length()); | |
375 | payload.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); | |
376 | if (byte_throttler) | |
377 | byte_throttler->take(payload.length()); | |
378 | } | |
379 | ||
// Same as set_payload(), but for the middle bufferlist.
380 | void set_middle(bufferlist& bl) { | |
381 | if (byte_throttler) | |
382 | byte_throttler->put(middle.length()); | |
383 | middle.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); | |
384 | if (byte_throttler) | |
385 | byte_throttler->take(middle.length()); | |
386 | } | |
387 | bufferlist& get_middle() { return middle; } | |
388 | ||
// Replace data with bl via share() (bl is const here, unlike set_payload()
// and set_middle(), which claim() from a non-const list). Throttle
// accounting is adjusted the same way.
389 | void set_data(const bufferlist &bl) { | |
390 | if (byte_throttler) | |
391 | byte_throttler->put(data.length()); | |
392 | data.share(bl); | |
393 | if (byte_throttler) | |
394 | byte_throttler->take(data.length()); | |
395 | } | |
396 | ||
397 | const bufferlist& get_data() const { return data; } | |
398 | bufferlist& get_data() { return data; } | |
// Hand the data bufferlist over to bl (bl.claim(data, flags)); the data
// length is returned to byte_throttler (if set) before the hand-over.
399 | void claim_data(bufferlist& bl, | |
400 | unsigned int flags = buffer::list::CLAIM_DEFAULT) { | |
401 | if (byte_throttler) | |
402 | byte_throttler->put(data.length()); | |
403 | bl.claim(data, flags); | |
404 | } | |
405 | off_t get_data_len() const { return data.length(); } | |
406 | ||
407 | void set_recv_stamp(utime_t t) { recv_stamp = t; } | |
408 | const utime_t& get_recv_stamp() const { return recv_stamp; } | |
409 | void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; } | |
410 | const utime_t& get_dispatch_stamp() const { return dispatch_stamp; } | |
411 | void set_throttle_stamp(utime_t t) { throttle_stamp = t; } | |
412 | const utime_t& get_throttle_stamp() const { return throttle_stamp; } | |
413 | void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; } | |
414 | const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; } | |
415 | ||
// CRC32C over the header bytes, excluding the last sizeof(header.crc) bytes.
// NOTE(review): this assumes crc is the final field of ceph_msg_header -- confirm.
416 | void calc_header_crc() { | |
417 | header.crc = ceph_crc32c(0, (unsigned char*)&header, | |
418 | sizeof(header) - sizeof(header.crc)); | |
419 | } | |
// Despite the name, this fills in both footer.front_crc (from payload) and
// footer.middle_crc (from middle).
420 | void calc_front_crc() { | |
421 | footer.front_crc = payload.crc32c(0); | |
422 | footer.middle_crc = middle.crc32c(0); | |
423 | } | |
424 | void calc_data_crc() { | |
425 | footer.data_crc = data.crc32c(0); | |
426 | } | |
427 | ||
// Cost of this message; the default is the length of the data bufferlist.
// Subclasses may override.
428 | virtual int get_cost() const { | |
429 | return data.length(); | |
430 | } | |
431 | ||
432 | // type | |
433 | int get_type() const { return header.type; } | |
434 | void set_type(int t) { header.type = t; } | |
435 | ||
436 | uint64_t get_tid() const { return header.tid; } | |
437 | void set_tid(uint64_t t) { header.tid = t; } | |
438 | ||
439 | uint64_t get_seq() const { return header.seq; } | |
440 | void set_seq(uint64_t s) { header.seq = s; } | |
441 | ||
// Note: the getter returns unsigned while the setter takes a signed __s16.
442 | unsigned get_priority() const { return header.priority; } | |
443 | void set_priority(__s16 p) { header.priority = p; } | |
444 | ||
445 | // source/dest | |
446 | entity_inst_t get_source_inst() const { | |
447 | return entity_inst_t(get_source(), get_source_addr()); | |
448 | } | |
449 | entity_name_t get_source() const { | |
450 | return entity_name_t(header.src); | |
451 | } | |
// Peer address of the connection, or a default-constructed entity_addr_t
// when no connection is set.
452 | entity_addr_t get_source_addr() const { | |
453 | if (connection) | |
454 | return connection->get_peer_addr(); | |
455 | return entity_addr_t(); | |
456 | } | |
457 | ||
458 | // forwarded? (in this class the get_orig_* accessors simply return the get_source* values) | |
459 | entity_inst_t get_orig_source_inst() const { | |
460 | return get_source_inst(); | |
461 | } | |
462 | entity_name_t get_orig_source() const { | |
463 | return get_source(); | |
464 | } | |
465 | entity_addr_t get_orig_source_addr() const { | |
466 | return get_source_addr(); | |
467 | } | |
468 | ||
469 | // virtual bits: the three pure virtuals below must be provided by every concrete message type | |
470 | virtual void decode_payload() = 0; | |
471 | virtual void encode_payload(uint64_t features) = 0; | |
472 | virtual const char *get_type_name() const = 0; | |
// Default printer: writes the type name followed by " magic: " and the magic value.
473 | virtual void print(ostream& out) const { | |
474 | out << get_type_name() << " magic: " << magic; | |
475 | } | |
476 | ||
477 | virtual void dump(Formatter *f) const; | |
478 | ||
// Defined out of line. crcflags is presumably a mask of the MSG_CRC_* bits
// (see the "Message::encode() crcflags bits" defines) -- confirm in the
// implementation.
479 | void encode(uint64_t features, int crcflags); | |
480 | }; | |
// Smart-pointer alias for Message, based on boost::intrusive_ptr.
481 | typedef boost::intrusive_ptr<Message> MessageRef; | |
482 | ||
// Declaration only (defined elsewhere): builds a Message from an already
// separated header, footer and front/middle/data bufferlists.
// NOTE(review): the return value on decode failure (presumably null) and the
// ownership of the returned pointer are not visible here -- confirm in the
// implementation.
483 | extern Message *decode_message(CephContext *cct, int crcflags, | |
484 | ceph_msg_header &header, | |
485 | ceph_msg_footer& footer, bufferlist& front, | |
486 | bufferlist& middle, bufferlist& data, | |
487 | Connection* conn); | |
// Stream a Message: delegates to the virtual print(), then appends " v<version>"
// when the header's version field is non-zero. Returns the stream for chaining.
488 | inline ostream& operator<<(ostream& out, const Message& m) { | |
489 | const ceph_msg_header& hdr = m.get_header(); | |
490 | m.print(out); | |
491 | if (hdr.version) { out << " v" << hdr.version; } | |
492 | return out; | |
493 | } | |
494 | ||
// Declarations only (defined elsewhere): encode_message() takes a Message and
// an output bufferlist; this decode_message() overload reads from a
// bufferlist iterator instead of separate header/footer/bufferlist arguments.
// NOTE(review): presumably these two are inverses of each other -- confirm in
// the implementation.
495 | extern void encode_message(Message *m, uint64_t features, bufferlist& bl); | |
496 | extern Message *decode_message(CephContext *cct, int crcflags, | |
497 | bufferlist::iterator& bl); | |
498 | ||
499 | #endif |