]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_MESSAGE_H | |
16 | #define CEPH_MESSAGE_H | |
17 | ||
18 | #include <stdlib.h> | |
19 | #include <ostream> | |
20 | ||
21 | #include <boost/intrusive_ptr.hpp> | |
22 | #include <boost/intrusive/list.hpp> | |
23 | // Because intrusive_ptr clobbers our assert... | |
24 | #include "include/assert.h" | |
25 | ||
26 | #include "include/types.h" | |
27 | #include "include/buffer.h" | |
28 | #include "common/Throttle.h" | |
29 | #include "common/zipkin_trace.h" | |
30 | #include "msg_types.h" | |
31 | ||
32 | #include "common/RefCountedObj.h" | |
33 | #include "msg/Connection.h" | |
34 | ||
35 | #include "common/debug.h" | |
36 | #include "common/config.h" | |
37 | ||
38 | // monitor internal | |
39 | #define MSG_MON_SCRUB 64 | |
40 | #define MSG_MON_ELECTION 65 | |
41 | #define MSG_MON_PAXOS 66 | |
42 | #define MSG_MON_PROBE 67 | |
43 | #define MSG_MON_JOIN 68 | |
44 | #define MSG_MON_SYNC 69 | |
45 | ||
46 | /* monitor <-> mon admin tool */ | |
47 | #define MSG_MON_COMMAND 50 | |
48 | #define MSG_MON_COMMAND_ACK 51 | |
49 | #define MSG_LOG 52 | |
50 | #define MSG_LOGACK 53 | |
51 | ||
52 | #define MSG_GETPOOLSTATS 58 | |
53 | #define MSG_GETPOOLSTATSREPLY 59 | |
54 | ||
55 | #define MSG_MON_GLOBAL_ID 60 | |
56 | ||
57 | #define MSG_ROUTE 47 | |
58 | #define MSG_FORWARD 46 | |
59 | ||
60 | #define MSG_PAXOS 40 | |
61 | ||
62 | ||
63 | // osd internal | |
64 | #define MSG_OSD_PING 70 | |
65 | #define MSG_OSD_BOOT 71 | |
66 | #define MSG_OSD_FAILURE 72 | |
67 | #define MSG_OSD_ALIVE 73 | |
68 | #define MSG_OSD_MARK_ME_DOWN 74 | |
69 | #define MSG_OSD_FULL 75 | |
70 | ||
71 | #define MSG_OSD_SUBOP 76 | |
72 | #define MSG_OSD_SUBOPREPLY 77 | |
73 | ||
74 | #define MSG_OSD_PGTEMP 78 | |
75 | ||
76 | #define MSG_OSD_BEACON 79 | |
77 | ||
78 | #define MSG_OSD_PG_NOTIFY 80 | |
79 | #define MSG_OSD_PG_QUERY 81 | |
80 | #define MSG_OSD_PG_LOG 83 | |
81 | #define MSG_OSD_PG_REMOVE 84 | |
82 | #define MSG_OSD_PG_INFO 85 | |
83 | #define MSG_OSD_PG_TRIM 86 | |
84 | ||
85 | #define MSG_PGSTATS 87 | |
86 | #define MSG_PGSTATSACK 88 | |
87 | ||
88 | #define MSG_OSD_PG_CREATE 89 | |
89 | #define MSG_REMOVE_SNAPS 90 | |
90 | ||
91 | #define MSG_OSD_SCRUB 91 | |
92 | #define MSG_OSD_SCRUB_RESERVE 92 // previous PG_MISSING | |
93 | #define MSG_OSD_REP_SCRUB 93 | |
94 | ||
95 | #define MSG_OSD_PG_SCAN 94 | |
96 | #define MSG_OSD_PG_BACKFILL 95 | |
97 | #define MSG_OSD_PG_BACKFILL_REMOVE 96 | |
98 | ||
99 | #define MSG_COMMAND 97 | |
100 | #define MSG_COMMAND_REPLY 98 | |
101 | ||
102 | #define MSG_OSD_BACKFILL_RESERVE 99 | |
103 | #define MSG_OSD_RECOVERY_RESERVE 150 | |
c07f9fc5 | 104 | #define MSG_OSD_FORCE_RECOVERY 151 |
7c673cae FG |
105 | |
106 | #define MSG_OSD_PG_PUSH 105 | |
107 | #define MSG_OSD_PG_PULL 106 | |
108 | #define MSG_OSD_PG_PUSH_REPLY 107 | |
109 | ||
110 | #define MSG_OSD_EC_WRITE 108 | |
111 | #define MSG_OSD_EC_WRITE_REPLY 109 | |
112 | #define MSG_OSD_EC_READ 110 | |
113 | #define MSG_OSD_EC_READ_REPLY 111 | |
114 | ||
115 | #define MSG_OSD_REPOP 112 | |
116 | #define MSG_OSD_REPOPREPLY 113 | |
117 | #define MSG_OSD_PG_UPDATE_LOG_MISSING 114 | |
118 | #define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115 | |
119 | ||
120 | #define MSG_OSD_PG_CREATED 116 | |
121 | #define MSG_OSD_REP_SCRUBMAP 117 | |
c07f9fc5 FG |
122 | #define MSG_OSD_PG_RECOVERY_DELETE 118 |
123 | #define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119 | |
7c673cae FG |
124 | |
125 | // *** MDS *** | |
126 | ||
127 | #define MSG_MDS_BEACON 100 // to monitor | |
128 | #define MSG_MDS_SLAVE_REQUEST 101 | |
129 | #define MSG_MDS_TABLE_REQUEST 102 | |
130 | ||
131 | // 150 and 151 already in use (MSG_OSD_RECOVERY_RESERVE, MSG_OSD_FORCE_RECOVERY) | |
132 | ||
133 | #define MSG_MDS_RESOLVE 0x200 | |
134 | #define MSG_MDS_RESOLVEACK 0x201 | |
135 | #define MSG_MDS_CACHEREJOIN 0x202 | |
136 | #define MSG_MDS_DISCOVER 0x203 | |
137 | #define MSG_MDS_DISCOVERREPLY 0x204 | |
138 | #define MSG_MDS_INODEUPDATE 0x205 | |
139 | #define MSG_MDS_DIRUPDATE 0x206 | |
140 | #define MSG_MDS_CACHEEXPIRE 0x207 | |
141 | #define MSG_MDS_DENTRYUNLINK 0x208 | |
142 | #define MSG_MDS_FRAGMENTNOTIFY 0x209 | |
143 | #define MSG_MDS_OFFLOAD_TARGETS 0x20a | |
144 | #define MSG_MDS_DENTRYLINK 0x20c | |
145 | #define MSG_MDS_FINDINO 0x20d | |
146 | #define MSG_MDS_FINDINOREPLY 0x20e | |
147 | #define MSG_MDS_OPENINO 0x20f | |
148 | #define MSG_MDS_OPENINOREPLY 0x210 | |
149 | ||
150 | #define MSG_MDS_LOCK 0x300 | |
151 | #define MSG_MDS_INODEFILECAPS 0x301 | |
152 | ||
153 | #define MSG_MDS_EXPORTDIRDISCOVER 0x449 | |
154 | #define MSG_MDS_EXPORTDIRDISCOVERACK 0x450 | |
155 | #define MSG_MDS_EXPORTDIRCANCEL 0x451 | |
156 | #define MSG_MDS_EXPORTDIRPREP 0x452 | |
157 | #define MSG_MDS_EXPORTDIRPREPACK 0x453 | |
158 | #define MSG_MDS_EXPORTDIRWARNING 0x454 | |
159 | #define MSG_MDS_EXPORTDIRWARNINGACK 0x455 | |
160 | #define MSG_MDS_EXPORTDIR 0x456 | |
161 | #define MSG_MDS_EXPORTDIRACK 0x457 | |
162 | #define MSG_MDS_EXPORTDIRNOTIFY 0x458 | |
163 | #define MSG_MDS_EXPORTDIRNOTIFYACK 0x459 | |
164 | #define MSG_MDS_EXPORTDIRFINISH 0x460 | |
165 | ||
166 | #define MSG_MDS_EXPORTCAPS 0x470 | |
167 | #define MSG_MDS_EXPORTCAPSACK 0x471 | |
168 | #define MSG_MDS_GATHERCAPS 0x472 | |
169 | ||
170 | #define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer | |
171 | ||
172 | // *** generic *** | |
173 | #define MSG_TIMECHECK 0x600 | |
174 | #define MSG_MON_HEALTH 0x601 | |
175 | ||
176 | // *** Message::encode() crcflags bits *** | |
177 | #define MSG_CRC_DATA (1 << 0) | |
178 | #define MSG_CRC_HEADER (1 << 1) | |
179 | #define MSG_CRC_ALL (MSG_CRC_DATA | MSG_CRC_HEADER) | |
180 | ||
181 | // Xio Testing | |
182 | #define MSG_DATA_PING 0x602 | |
183 | ||
184 | // Xio intends to define messages 0x603..0x606 | |
185 | ||
186 | // Special | |
187 | #define MSG_NOP 0x607 | |
188 | ||
224ce89b WB |
189 | #define MSG_MON_HEALTH_CHECKS 0x608 |
190 | ||
7c673cae FG |
191 | // *** ceph-mgr <-> OSD/MDS daemons *** |
192 | #define MSG_MGR_OPEN 0x700 | |
193 | #define MSG_MGR_CONFIGURE 0x701 | |
194 | #define MSG_MGR_REPORT 0x702 | |
195 | ||
196 | // *** ceph-mgr <-> ceph-mon *** | |
197 | #define MSG_MGR_BEACON 0x703 | |
198 | ||
199 | // *** ceph-mon(MgrMonitor) -> OSD/MDS daemons *** | |
200 | #define MSG_MGR_MAP 0x704 | |
201 | ||
202 | // *** ceph-mon(MgrMonitor) -> ceph-mgr *** | |
203 | #define MSG_MGR_DIGEST 0x705 | |
31f18b77 FG |
204 | // *** ceph-mgr -> ceph-mon *** |
205 | #define MSG_MON_MGR_REPORT 0x706 | |
224ce89b | 206 | #define MSG_SERVICE_MAP 0x707 |
7c673cae FG |
207 | |
208 | // ====================================================== | |
209 | ||
210 | // abstract Message class | |
211 | ||
212 | namespace bi = boost::intrusive; | |
213 | ||
214 | // XioMessenger conditional trace flags | |
215 | #define MSG_MAGIC_XIO 0x0002 | |
216 | #define MSG_MAGIC_TRACE_XCON 0x0004 | |
217 | #define MSG_MAGIC_TRACE_DTOR 0x0008 | |
218 | #define MSG_MAGIC_TRACE_HDR 0x0010 | |
219 | #define MSG_MAGIC_TRACE_XIO 0x0020 | |
220 | #define MSG_MAGIC_TRACE_XMSGR 0x0040 | |
221 | #define MSG_MAGIC_TRACE_CTR 0x0080 | |
222 | ||
223 | // XioMessenger diagnostic "ping pong" flag (resend msg when send completes) | |
224 | #define MSG_MAGIC_REDUPE 0x0100 | |
225 | ||
226 | class Message : public RefCountedObject { | |
227 | protected: | |
228 | ceph_msg_header header; // headerelope | |
229 | ceph_msg_footer footer; | |
230 | bufferlist payload; // "front" unaligned blob | |
231 | bufferlist middle; // "middle" unaligned blob | |
232 | bufferlist data; // data payload (page-alignment will be preserved where possible) | |
233 | ||
234 | /* recv_stamp is set when the Messenger starts reading the | |
235 | * Message off the wire */ | |
236 | utime_t recv_stamp; | |
237 | /* dispatch_stamp is set when the Messenger starts calling dispatch() on | |
238 | * its endpoints */ | |
239 | utime_t dispatch_stamp; | |
240 | /* throttle_stamp is the point at which we got throttle */ | |
241 | utime_t throttle_stamp; | |
242 | /* time at which message was fully read */ | |
243 | utime_t recv_complete_stamp; | |
244 | ||
245 | ConnectionRef connection; | |
246 | ||
247 | uint32_t magic = 0; | |
248 | ||
249 | bi::list_member_hook<> dispatch_q; | |
250 | ||
251 | public: | |
252 | // zipkin tracing | |
253 | ZTracer::Trace trace; | |
254 | void encode_trace(bufferlist &bl, uint64_t features) const; | |
255 | void decode_trace(bufferlist::iterator &p, bool create = false); | |
256 | ||
257 | class CompletionHook : public Context { | |
258 | protected: | |
259 | Message *m; | |
260 | friend class Message; | |
261 | public: | |
262 | explicit CompletionHook(Message *_m) : m(_m) {} | |
263 | virtual void set_message(Message *_m) { m = _m; } | |
264 | }; | |
265 | ||
266 | typedef bi::list< Message, | |
267 | bi::member_hook< Message, | |
268 | bi::list_member_hook<>, | |
269 | &Message::dispatch_q > > Queue; | |
270 | ||
271 | protected: | |
272 | CompletionHook* completion_hook = nullptr; // owned by Messenger | |
273 | ||
274 | // release our size in bytes back to this throttler when our payload | |
275 | // is adjusted or when we are destroyed. | |
276 | Throttle *byte_throttler = nullptr; | |
277 | ||
278 | // release a count back to this throttler when we are destroyed | |
279 | Throttle *msg_throttler = nullptr; | |
280 | ||
281 | // keep track of how big this message was when we reserved space in | |
282 | // the msgr dispatch_throttler, so that we can properly release it | |
283 | // later. this is necessary because messages can enter the dispatch | |
284 | // queue locally (not via read_message()), and those are not | |
285 | // currently throttled. | |
286 | uint64_t dispatch_throttle_size = 0; | |
287 | ||
288 | friend class Messenger; | |
289 | ||
290 | public: | |
291 | Message() { | |
292 | memset(&header, 0, sizeof(header)); | |
293 | memset(&footer, 0, sizeof(footer)); | |
294 | } | |
295 | Message(int t, int version=1, int compat_version=0) { | |
296 | memset(&header, 0, sizeof(header)); | |
297 | header.type = t; | |
298 | header.version = version; | |
299 | header.compat_version = compat_version; | |
300 | header.priority = 0; // undef | |
301 | header.data_off = 0; | |
302 | memset(&footer, 0, sizeof(footer)); | |
303 | } | |
304 | ||
305 | Message *get() { | |
306 | return static_cast<Message *>(RefCountedObject::get()); | |
307 | } | |
308 | ||
309 | protected: | |
310 | ~Message() override { | |
311 | if (byte_throttler) | |
312 | byte_throttler->put(payload.length() + middle.length() + data.length()); | |
313 | release_message_throttle(); | |
314 | trace.event("message destructed"); | |
315 | /* call completion hooks (if any) */ | |
316 | if (completion_hook) | |
317 | completion_hook->complete(0); | |
318 | } | |
319 | public: | |
320 | const ConnectionRef& get_connection() const { return connection; } | |
321 | void set_connection(const ConnectionRef& c) { | |
322 | connection = c; | |
323 | } | |
324 | CompletionHook* get_completion_hook() { return completion_hook; } | |
325 | void set_completion_hook(CompletionHook *hook) { completion_hook = hook; } | |
326 | void set_byte_throttler(Throttle *t) { byte_throttler = t; } | |
327 | Throttle *get_byte_throttler() { return byte_throttler; } | |
328 | void set_message_throttler(Throttle *t) { msg_throttler = t; } | |
329 | Throttle *get_message_throttler() { return msg_throttler; } | |
330 | ||
331 | void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; } | |
332 | uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; } | |
333 | ||
334 | const ceph_msg_header &get_header() const { return header; } | |
335 | ceph_msg_header &get_header() { return header; } | |
336 | void set_header(const ceph_msg_header &e) { header = e; } | |
337 | void set_footer(const ceph_msg_footer &e) { footer = e; } | |
338 | const ceph_msg_footer &get_footer() const { return footer; } | |
339 | ceph_msg_footer &get_footer() { return footer; } | |
340 | void set_src(const entity_name_t& src) { header.src = src; } | |
341 | ||
342 | uint32_t get_magic() const { return magic; } | |
343 | void set_magic(int _magic) { magic = _magic; } | |
344 | ||
345 | /* | |
346 | * If you use get_[data, middle, payload] you shouldn't | |
347 | * use it to change those bufferlists unless you KNOW | |
348 | * there is no throttle being used. The other | |
349 | * functions are throttling-aware as appropriate. | |
350 | */ | |
351 | ||
352 | void clear_payload() { | |
353 | if (byte_throttler) { | |
354 | byte_throttler->put(payload.length() + middle.length()); | |
355 | } | |
356 | payload.clear(); | |
357 | middle.clear(); | |
358 | } | |
359 | ||
360 | virtual void clear_buffers() {} | |
361 | void clear_data() { | |
362 | if (byte_throttler) | |
363 | byte_throttler->put(data.length()); | |
364 | data.clear(); | |
365 | clear_buffers(); // let subclass drop buffers as well | |
366 | } | |
367 | void release_message_throttle() { | |
368 | if (msg_throttler) | |
369 | msg_throttler->put(); | |
370 | msg_throttler = nullptr; | |
371 | } | |
372 | ||
373 | bool empty_payload() const { return payload.length() == 0; } | |
374 | bufferlist& get_payload() { return payload; } | |
375 | void set_payload(bufferlist& bl) { | |
376 | if (byte_throttler) | |
377 | byte_throttler->put(payload.length()); | |
378 | payload.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); | |
379 | if (byte_throttler) | |
380 | byte_throttler->take(payload.length()); | |
381 | } | |
382 | ||
383 | void set_middle(bufferlist& bl) { | |
384 | if (byte_throttler) | |
385 | byte_throttler->put(middle.length()); | |
386 | middle.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); | |
387 | if (byte_throttler) | |
388 | byte_throttler->take(middle.length()); | |
389 | } | |
390 | bufferlist& get_middle() { return middle; } | |
391 | ||
392 | void set_data(const bufferlist &bl) { | |
393 | if (byte_throttler) | |
394 | byte_throttler->put(data.length()); | |
395 | data.share(bl); | |
396 | if (byte_throttler) | |
397 | byte_throttler->take(data.length()); | |
398 | } | |
399 | ||
400 | const bufferlist& get_data() const { return data; } | |
401 | bufferlist& get_data() { return data; } | |
402 | void claim_data(bufferlist& bl, | |
403 | unsigned int flags = buffer::list::CLAIM_DEFAULT) { | |
404 | if (byte_throttler) | |
405 | byte_throttler->put(data.length()); | |
406 | bl.claim(data, flags); | |
407 | } | |
408 | off_t get_data_len() const { return data.length(); } | |
409 | ||
410 | void set_recv_stamp(utime_t t) { recv_stamp = t; } | |
411 | const utime_t& get_recv_stamp() const { return recv_stamp; } | |
412 | void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; } | |
413 | const utime_t& get_dispatch_stamp() const { return dispatch_stamp; } | |
414 | void set_throttle_stamp(utime_t t) { throttle_stamp = t; } | |
415 | const utime_t& get_throttle_stamp() const { return throttle_stamp; } | |
416 | void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; } | |
417 | const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; } | |
418 | ||
419 | void calc_header_crc() { | |
420 | header.crc = ceph_crc32c(0, (unsigned char*)&header, | |
421 | sizeof(header) - sizeof(header.crc)); | |
422 | } | |
423 | void calc_front_crc() { | |
424 | footer.front_crc = payload.crc32c(0); | |
425 | footer.middle_crc = middle.crc32c(0); | |
426 | } | |
427 | void calc_data_crc() { | |
428 | footer.data_crc = data.crc32c(0); | |
429 | } | |
430 | ||
431 | virtual int get_cost() const { | |
432 | return data.length(); | |
433 | } | |
434 | ||
435 | // type | |
436 | int get_type() const { return header.type; } | |
437 | void set_type(int t) { header.type = t; } | |
438 | ||
439 | uint64_t get_tid() const { return header.tid; } | |
440 | void set_tid(uint64_t t) { header.tid = t; } | |
441 | ||
442 | uint64_t get_seq() const { return header.seq; } | |
443 | void set_seq(uint64_t s) { header.seq = s; } | |
444 | ||
445 | unsigned get_priority() const { return header.priority; } | |
446 | void set_priority(__s16 p) { header.priority = p; } | |
447 | ||
448 | // source/dest | |
449 | entity_inst_t get_source_inst() const { | |
450 | return entity_inst_t(get_source(), get_source_addr()); | |
451 | } | |
452 | entity_name_t get_source() const { | |
453 | return entity_name_t(header.src); | |
454 | } | |
455 | entity_addr_t get_source_addr() const { | |
456 | if (connection) | |
457 | return connection->get_peer_addr(); | |
458 | return entity_addr_t(); | |
459 | } | |
460 | ||
461 | // forwarded? | |
462 | entity_inst_t get_orig_source_inst() const { | |
463 | return get_source_inst(); | |
464 | } | |
465 | entity_name_t get_orig_source() const { | |
466 | return get_source(); | |
467 | } | |
468 | entity_addr_t get_orig_source_addr() const { | |
469 | return get_source_addr(); | |
470 | } | |
471 | ||
472 | // virtual bits | |
473 | virtual void decode_payload() = 0; | |
474 | virtual void encode_payload(uint64_t features) = 0; | |
475 | virtual const char *get_type_name() const = 0; | |
476 | virtual void print(ostream& out) const { | |
477 | out << get_type_name() << " magic: " << magic; | |
478 | } | |
479 | ||
480 | virtual void dump(Formatter *f) const; | |
481 | ||
482 | void encode(uint64_t features, int crcflags); | |
483 | }; | |
484 | typedef boost::intrusive_ptr<Message> MessageRef; | |
485 | ||
486 | extern Message *decode_message(CephContext *cct, int crcflags, | |
487 | ceph_msg_header &header, | |
488 | ceph_msg_footer& footer, bufferlist& front, | |
489 | bufferlist& middle, bufferlist& data, | |
490 | Connection* conn); | |
491 | inline ostream& operator<<(ostream& out, const Message& m) { | |
492 | m.print(out); | |
493 | if (m.get_header().version) | |
494 | out << " v" << m.get_header().version; | |
495 | return out; | |
496 | } | |
497 | ||
498 | extern void encode_message(Message *m, uint64_t features, bufferlist& bl); | |
499 | extern Message *decode_message(CephContext *cct, int crcflags, | |
500 | bufferlist::iterator& bl); | |
501 | ||
502 | #endif |