1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_MESSAGE_H
16 #define CEPH_MESSAGE_H
21 #include <boost/intrusive_ptr.hpp>
22 #include <boost/intrusive/list.hpp>
23 // Because intrusive_ptr clobbers our assert...
24 #include "include/assert.h"
26 #include "include/types.h"
27 #include "include/buffer.h"
28 #include "common/Throttle.h"
29 #include "common/zipkin_trace.h"
30 #include "msg_types.h"
32 #include "common/RefCountedObj.h"
33 #include "msg/Connection.h"
35 #include "common/debug.h"
36 #include "common/config.h"
39 #define MSG_MON_SCRUB 64
40 #define MSG_MON_ELECTION 65
41 #define MSG_MON_PAXOS 66
42 #define MSG_MON_PROBE 67
43 #define MSG_MON_JOIN 68
44 #define MSG_MON_SYNC 69
46 /* monitor <-> mon admin tool */
47 #define MSG_MON_COMMAND 50
48 #define MSG_MON_COMMAND_ACK 51
52 #define MSG_GETPOOLSTATS 58
53 #define MSG_GETPOOLSTATSREPLY 59
55 #define MSG_MON_GLOBAL_ID 60
58 #define MSG_FORWARD 46
64 #define MSG_OSD_PING 70
65 #define MSG_OSD_BOOT 71
66 #define MSG_OSD_FAILURE 72
67 #define MSG_OSD_ALIVE 73
68 #define MSG_OSD_MARK_ME_DOWN 74
69 #define MSG_OSD_FULL 75
71 #define MSG_OSD_SUBOP 76
72 #define MSG_OSD_SUBOPREPLY 77
74 #define MSG_OSD_PGTEMP 78
76 #define MSG_OSD_BEACON 79
78 #define MSG_OSD_PG_NOTIFY 80
79 #define MSG_OSD_PG_QUERY 81
80 #define MSG_OSD_PG_LOG 83
81 #define MSG_OSD_PG_REMOVE 84
82 #define MSG_OSD_PG_INFO 85
83 #define MSG_OSD_PG_TRIM 86
85 #define MSG_PGSTATS 87
86 #define MSG_PGSTATSACK 88
88 #define MSG_OSD_PG_CREATE 89
89 #define MSG_REMOVE_SNAPS 90
91 #define MSG_OSD_SCRUB 91
92 #define MSG_OSD_SCRUB_RESERVE 92 // previous PG_MISSING
93 #define MSG_OSD_REP_SCRUB 93
95 #define MSG_OSD_PG_SCAN 94
96 #define MSG_OSD_PG_BACKFILL 95
97 #define MSG_OSD_PG_BACKFILL_REMOVE 96
99 #define MSG_COMMAND 97
100 #define MSG_COMMAND_REPLY 98
102 #define MSG_OSD_BACKFILL_RESERVE 99
103 #define MSG_OSD_RECOVERY_RESERVE 150
104 #define MSG_OSD_FORCE_RECOVERY 151
106 #define MSG_OSD_PG_PUSH 105
107 #define MSG_OSD_PG_PULL 106
108 #define MSG_OSD_PG_PUSH_REPLY 107
110 #define MSG_OSD_EC_WRITE 108
111 #define MSG_OSD_EC_WRITE_REPLY 109
112 #define MSG_OSD_EC_READ 110
113 #define MSG_OSD_EC_READ_REPLY 111
115 #define MSG_OSD_REPOP 112
116 #define MSG_OSD_REPOPREPLY 113
117 #define MSG_OSD_PG_UPDATE_LOG_MISSING 114
118 #define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115
120 #define MSG_OSD_PG_CREATED 116
121 #define MSG_OSD_REP_SCRUBMAP 117
122 #define MSG_OSD_PG_RECOVERY_DELETE 118
123 #define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119
127 #define MSG_MDS_BEACON 100 // to monitor
128 #define MSG_MDS_SLAVE_REQUEST 101
129 #define MSG_MDS_TABLE_REQUEST 102
// 150 and 151 already in use (MSG_OSD_RECOVERY_RESERVE, MSG_OSD_FORCE_RECOVERY)
133 #define MSG_MDS_RESOLVE 0x200
134 #define MSG_MDS_RESOLVEACK 0x201
135 #define MSG_MDS_CACHEREJOIN 0x202
136 #define MSG_MDS_DISCOVER 0x203
137 #define MSG_MDS_DISCOVERREPLY 0x204
138 #define MSG_MDS_INODEUPDATE 0x205
139 #define MSG_MDS_DIRUPDATE 0x206
140 #define MSG_MDS_CACHEEXPIRE 0x207
141 #define MSG_MDS_DENTRYUNLINK 0x208
142 #define MSG_MDS_FRAGMENTNOTIFY 0x209
143 #define MSG_MDS_OFFLOAD_TARGETS 0x20a
144 #define MSG_MDS_DENTRYLINK 0x20c
145 #define MSG_MDS_FINDINO 0x20d
146 #define MSG_MDS_FINDINOREPLY 0x20e
147 #define MSG_MDS_OPENINO 0x20f
148 #define MSG_MDS_OPENINOREPLY 0x210
150 #define MSG_MDS_LOCK 0x300
151 #define MSG_MDS_INODEFILECAPS 0x301
153 #define MSG_MDS_EXPORTDIRDISCOVER 0x449
154 #define MSG_MDS_EXPORTDIRDISCOVERACK 0x450
155 #define MSG_MDS_EXPORTDIRCANCEL 0x451
156 #define MSG_MDS_EXPORTDIRPREP 0x452
157 #define MSG_MDS_EXPORTDIRPREPACK 0x453
158 #define MSG_MDS_EXPORTDIRWARNING 0x454
159 #define MSG_MDS_EXPORTDIRWARNINGACK 0x455
160 #define MSG_MDS_EXPORTDIR 0x456
161 #define MSG_MDS_EXPORTDIRACK 0x457
162 #define MSG_MDS_EXPORTDIRNOTIFY 0x458
163 #define MSG_MDS_EXPORTDIRNOTIFYACK 0x459
164 #define MSG_MDS_EXPORTDIRFINISH 0x460
166 #define MSG_MDS_EXPORTCAPS 0x470
167 #define MSG_MDS_EXPORTCAPSACK 0x471
168 #define MSG_MDS_GATHERCAPS 0x472
170 #define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer
173 #define MSG_TIMECHECK 0x600
174 #define MSG_MON_HEALTH 0x601
176 // *** Message::encode() crcflags bits ***
177 #define MSG_CRC_DATA (1 << 0)
178 #define MSG_CRC_HEADER (1 << 1)
179 #define MSG_CRC_ALL (MSG_CRC_DATA | MSG_CRC_HEADER)
182 #define MSG_DATA_PING 0x602
184 // Xio intends to define messages 0x603..0x606
187 #define MSG_NOP 0x607
189 #define MSG_MON_HEALTH_CHECKS 0x608
191 // *** ceph-mgr <-> OSD/MDS daemons ***
192 #define MSG_MGR_OPEN 0x700
193 #define MSG_MGR_CONFIGURE 0x701
194 #define MSG_MGR_REPORT 0x702
196 // *** ceph-mgr <-> ceph-mon ***
197 #define MSG_MGR_BEACON 0x703
199 // *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
200 #define MSG_MGR_MAP 0x704
// *** ceph-mon(MgrMonitor) -> ceph-mgr ***
203 #define MSG_MGR_DIGEST 0x705
// *** ceph-mgr -> ceph-mon ***
205 #define MSG_MON_MGR_REPORT 0x706
206 #define MSG_SERVICE_MAP 0x707
208 // ======================================================
210 // abstract Message class
212 namespace bi
= boost::intrusive
;
214 // XioMessenger conditional trace flags
215 #define MSG_MAGIC_XIO 0x0002
216 #define MSG_MAGIC_TRACE_XCON 0x0004
217 #define MSG_MAGIC_TRACE_DTOR 0x0008
218 #define MSG_MAGIC_TRACE_HDR 0x0010
219 #define MSG_MAGIC_TRACE_XIO 0x0020
220 #define MSG_MAGIC_TRACE_XMSGR 0x0040
221 #define MSG_MAGIC_TRACE_CTR 0x0080
223 // XioMessenger diagnostic "ping pong" flag (resend msg when send completes)
224 #define MSG_MAGIC_REDUPE 0x0100
226 class Message
: public RefCountedObject
{
228 ceph_msg_header header
; // headerelope
229 ceph_msg_footer footer
;
230 bufferlist payload
; // "front" unaligned blob
231 bufferlist middle
; // "middle" unaligned blob
232 bufferlist data
; // data payload (page-alignment will be preserved where possible)
234 /* recv_stamp is set when the Messenger starts reading the
235 * Message off the wire */
/* dispatch_stamp is set when the Messenger starts calling dispatch() */
239 utime_t dispatch_stamp
;
240 /* throttle_stamp is the point at which we got throttle */
241 utime_t throttle_stamp
;
242 /* time at which message was fully read */
243 utime_t recv_complete_stamp
;
245 ConnectionRef connection
;
249 bi::list_member_hook
<> dispatch_q
;
253 ZTracer::Trace trace
;
254 void encode_trace(bufferlist
&bl
, uint64_t features
) const;
255 void decode_trace(bufferlist::iterator
&p
, bool create
= false);
257 class CompletionHook
: public Context
{
260 friend class Message
;
262 explicit CompletionHook(Message
*_m
) : m(_m
) {}
263 virtual void set_message(Message
*_m
) { m
= _m
; }
266 typedef bi::list
< Message
,
267 bi::member_hook
< Message
,
268 bi::list_member_hook
<>,
269 &Message::dispatch_q
> > Queue
;
272 CompletionHook
* completion_hook
= nullptr; // owned by Messenger
274 // release our size in bytes back to this throttler when our payload
275 // is adjusted or when we are destroyed.
276 Throttle
*byte_throttler
= nullptr;
278 // release a count back to this throttler when we are destroyed
279 Throttle
*msg_throttler
= nullptr;
281 // keep track of how big this message was when we reserved space in
282 // the msgr dispatch_throttler, so that we can properly release it
283 // later. this is necessary because messages can enter the dispatch
284 // queue locally (not via read_message()), and those are not
285 // currently throttled.
286 uint64_t dispatch_throttle_size
= 0;
288 friend class Messenger
;
292 memset(&header
, 0, sizeof(header
));
293 memset(&footer
, 0, sizeof(footer
));
295 Message(int t
, int version
=1, int compat_version
=0) {
296 memset(&header
, 0, sizeof(header
));
298 header
.version
= version
;
299 header
.compat_version
= compat_version
;
300 header
.priority
= 0; // undef
302 memset(&footer
, 0, sizeof(footer
));
306 return static_cast<Message
*>(RefCountedObject::get());
310 ~Message() override
{
312 byte_throttler
->put(payload
.length() + middle
.length() + data
.length());
313 release_message_throttle();
314 trace
.event("message destructed");
315 /* call completion hooks (if any) */
317 completion_hook
->complete(0);
320 const ConnectionRef
& get_connection() const { return connection
; }
321 void set_connection(const ConnectionRef
& c
) {
324 CompletionHook
* get_completion_hook() { return completion_hook
; }
325 void set_completion_hook(CompletionHook
*hook
) { completion_hook
= hook
; }
326 void set_byte_throttler(Throttle
*t
) { byte_throttler
= t
; }
327 Throttle
*get_byte_throttler() { return byte_throttler
; }
328 void set_message_throttler(Throttle
*t
) { msg_throttler
= t
; }
329 Throttle
*get_message_throttler() { return msg_throttler
; }
331 void set_dispatch_throttle_size(uint64_t s
) { dispatch_throttle_size
= s
; }
332 uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size
; }
334 const ceph_msg_header
&get_header() const { return header
; }
335 ceph_msg_header
&get_header() { return header
; }
336 void set_header(const ceph_msg_header
&e
) { header
= e
; }
337 void set_footer(const ceph_msg_footer
&e
) { footer
= e
; }
338 const ceph_msg_footer
&get_footer() const { return footer
; }
339 ceph_msg_footer
&get_footer() { return footer
; }
340 void set_src(const entity_name_t
& src
) { header
.src
= src
; }
342 uint32_t get_magic() const { return magic
; }
343 void set_magic(int _magic
) { magic
= _magic
; }
/*
 * If you use get_[data, middle, payload] you shouldn't
 * use it to change those bufferlists unless you KNOW
 * there is no throttle being used. The other
 * functions are throttling-aware as appropriate.
 */
352 void clear_payload() {
353 if (byte_throttler
) {
354 byte_throttler
->put(payload
.length() + middle
.length());
360 virtual void clear_buffers() {}
363 byte_throttler
->put(data
.length());
365 clear_buffers(); // let subclass drop buffers as well
367 void release_message_throttle() {
369 msg_throttler
->put();
370 msg_throttler
= nullptr;
373 bool empty_payload() const { return payload
.length() == 0; }
374 bufferlist
& get_payload() { return payload
; }
375 void set_payload(bufferlist
& bl
) {
377 byte_throttler
->put(payload
.length());
378 payload
.claim(bl
, buffer::list::CLAIM_ALLOW_NONSHAREABLE
);
380 byte_throttler
->take(payload
.length());
383 void set_middle(bufferlist
& bl
) {
385 byte_throttler
->put(middle
.length());
386 middle
.claim(bl
, buffer::list::CLAIM_ALLOW_NONSHAREABLE
);
388 byte_throttler
->take(middle
.length());
390 bufferlist
& get_middle() { return middle
; }
392 void set_data(const bufferlist
&bl
) {
394 byte_throttler
->put(data
.length());
397 byte_throttler
->take(data
.length());
400 const bufferlist
& get_data() const { return data
; }
401 bufferlist
& get_data() { return data
; }
402 void claim_data(bufferlist
& bl
,
403 unsigned int flags
= buffer::list::CLAIM_DEFAULT
) {
405 byte_throttler
->put(data
.length());
406 bl
.claim(data
, flags
);
408 off_t
get_data_len() const { return data
.length(); }
410 void set_recv_stamp(utime_t t
) { recv_stamp
= t
; }
411 const utime_t
& get_recv_stamp() const { return recv_stamp
; }
412 void set_dispatch_stamp(utime_t t
) { dispatch_stamp
= t
; }
413 const utime_t
& get_dispatch_stamp() const { return dispatch_stamp
; }
414 void set_throttle_stamp(utime_t t
) { throttle_stamp
= t
; }
415 const utime_t
& get_throttle_stamp() const { return throttle_stamp
; }
416 void set_recv_complete_stamp(utime_t t
) { recv_complete_stamp
= t
; }
417 const utime_t
& get_recv_complete_stamp() const { return recv_complete_stamp
; }
419 void calc_header_crc() {
420 header
.crc
= ceph_crc32c(0, (unsigned char*)&header
,
421 sizeof(header
) - sizeof(header
.crc
));
423 void calc_front_crc() {
424 footer
.front_crc
= payload
.crc32c(0);
425 footer
.middle_crc
= middle
.crc32c(0);
427 void calc_data_crc() {
428 footer
.data_crc
= data
.crc32c(0);
431 virtual int get_cost() const {
432 return data
.length();
436 int get_type() const { return header
.type
; }
437 void set_type(int t
) { header
.type
= t
; }
439 uint64_t get_tid() const { return header
.tid
; }
440 void set_tid(uint64_t t
) { header
.tid
= t
; }
442 uint64_t get_seq() const { return header
.seq
; }
443 void set_seq(uint64_t s
) { header
.seq
= s
; }
445 unsigned get_priority() const { return header
.priority
; }
446 void set_priority(__s16 p
) { header
.priority
= p
; }
449 entity_inst_t
get_source_inst() const {
450 return entity_inst_t(get_source(), get_source_addr());
452 entity_name_t
get_source() const {
453 return entity_name_t(header
.src
);
455 entity_addr_t
get_source_addr() const {
457 return connection
->get_peer_addr();
458 return entity_addr_t();
462 entity_inst_t
get_orig_source_inst() const {
463 return get_source_inst();
465 entity_name_t
get_orig_source() const {
468 entity_addr_t
get_orig_source_addr() const {
469 return get_source_addr();
473 virtual void decode_payload() = 0;
474 virtual void encode_payload(uint64_t features
) = 0;
475 virtual const char *get_type_name() const = 0;
476 virtual void print(ostream
& out
) const {
477 out
<< get_type_name() << " magic: " << magic
;
480 virtual void dump(Formatter
*f
) const;
482 void encode(uint64_t features
, int crcflags
);
484 typedef boost::intrusive_ptr
<Message
> MessageRef
;
486 extern Message
*decode_message(CephContext
*cct
, int crcflags
,
487 ceph_msg_header
&header
,
488 ceph_msg_footer
& footer
, bufferlist
& front
,
489 bufferlist
& middle
, bufferlist
& data
,
491 inline ostream
& operator<<(ostream
& out
, const Message
& m
) {
493 if (m
.get_header().version
)
494 out
<< " v" << m
.get_header().version
;
498 extern void encode_message(Message
*m
, uint64_t features
, bufferlist
& bl
);
499 extern Message
*decode_message(CephContext
*cct
, int crcflags
,
500 bufferlist::iterator
& bl
);