1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_MESSAGE_H
16 #define CEPH_MESSAGE_H
20 #include <string_view>
22 #include <boost/intrusive/list.hpp>
24 #include "include/Context.h"
25 #include "common/RefCountedObj.h"
26 #include "common/ThrottleInterface.h"
27 #include "common/config.h"
28 #include "common/ref.h"
29 #include "common/debug.h"
30 #include "common/zipkin_trace.h"
31 #include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
32 #include "include/buffer.h"
33 #include "include/types.h"
34 #include "msg/Connection.h"
35 #include "msg/MessageRef.h"
36 #include "msg_types.h"
39 # include "crimson/net/SocketConnection.h"
40 #endif // WITH_SEASTAR
// ---- Message type codes: monitor group ----------------------------------
// Apart from the MSG_CRC_* flag bits, the MSG_* values in this header are
// message type codes: Message::get_type()/set_type() read and write them in
// ceph_msg_header::type. All type codes share one numbering space (the MDS
// group explicitly avoids 150 because MSG_OSD_RECOVERY_RESERVE uses it), so a
// new type must pick a code that no other define uses.
// NOTE(review): these codes presumably travel on the wire, so existing values
// should never be renumbered — confirm against the messenger protocol docs.
//
// Monitor messages (scrub, election, Paxos, probe, join, sync, ping).
// MSG_MON_PING (140) sits well outside the 64-69 run of its siblings.
43 #define MSG_MON_SCRUB 64
44 #define MSG_MON_ELECTION 65
45 #define MSG_MON_PAXOS 66
46 #define MSG_MON_PROBE 67
47 #define MSG_MON_JOIN 68
48 #define MSG_MON_SYNC 69
49 #define MSG_MON_PING 140
51 /* monitor <-> mon admin tool */
52 #define MSG_MON_COMMAND 50
53 #define MSG_MON_COMMAND_ACK 51
// Pool statistics, global-id, forwarding, config, key/value data and
// purged-snaps request/reply types.
57 #define MSG_GETPOOLSTATS 58
58 #define MSG_GETPOOLSTATSREPLY 59
60 #define MSG_MON_GLOBAL_ID 60
63 #define MSG_FORWARD 46
68 #define MSG_GET_CONFIG 63
70 #define MSG_KV_DATA 54
// 76/77 previously belonged to MSG_OSD_SUBOP/MSG_OSD_SUBOPREPLY, which are
// commented out in the OSD group as "removed right after luminous".
72 #define MSG_MON_GET_PURGED_SNAPS 76
73 #define MSG_MON_GET_PURGED_SNAPS_REPLY 77
// ---- Message type codes: OSD group --------------------------------------
// OSD messages, plus MSG_PGSTATS*, MSG_REMOVE_SNAPS and MSG_COMMAND*, which
// are numbered in the same range. Later additions were given codes outside
// the original sequence (e.g. 123, 130-134, 150-151); existing values must
// stay unchanged.
76 #define MSG_OSD_PING 70
77 #define MSG_OSD_BOOT 71
78 #define MSG_OSD_FAILURE 72
79 #define MSG_OSD_ALIVE 73
80 #define MSG_OSD_MARK_ME_DOWN 74
81 #define MSG_OSD_FULL 75
82 #define MSG_OSD_MARK_ME_DEAD 123
84 // removed right after luminous
85 //#define MSG_OSD_SUBOP 76
86 //#define MSG_OSD_SUBOPREPLY 77
// (Codes 76/77 have since been reused by MSG_MON_GET_PURGED_SNAPS*.)
88 #define MSG_OSD_PGTEMP 78
90 #define MSG_OSD_BEACON 79
// The "...2" variants (NOTIFY2, QUERY2, INFO2) use their own codes 130-132
// alongside the original types rather than replacing them.
92 #define MSG_OSD_PG_NOTIFY 80
93 #define MSG_OSD_PG_NOTIFY2 130
94 #define MSG_OSD_PG_QUERY 81
95 #define MSG_OSD_PG_QUERY2 131
96 #define MSG_OSD_PG_LOG 83
97 #define MSG_OSD_PG_REMOVE 84
98 #define MSG_OSD_PG_INFO 85
99 #define MSG_OSD_PG_INFO2 132
100 #define MSG_OSD_PG_TRIM 86
102 #define MSG_PGSTATS 87
103 #define MSG_PGSTATSACK 88
105 #define MSG_OSD_PG_CREATE 89
106 #define MSG_REMOVE_SNAPS 90
108 #define MSG_OSD_SCRUB 91
109 #define MSG_OSD_SCRUB_RESERVE 92 // previous PG_MISSING
110 #define MSG_OSD_REP_SCRUB 93
112 #define MSG_OSD_PG_SCAN 94
113 #define MSG_OSD_PG_BACKFILL 95
114 #define MSG_OSD_PG_BACKFILL_REMOVE 96
116 #define MSG_COMMAND 97
117 #define MSG_COMMAND_REPLY 98
// Reservation / recovery control. 150 and 151 are out of sequence; the MDS
// group notes that 150 is taken here.
119 #define MSG_OSD_BACKFILL_RESERVE 99
120 #define MSG_OSD_RECOVERY_RESERVE 150
121 #define MSG_OSD_FORCE_RECOVERY 151
123 #define MSG_OSD_PG_PUSH 105
124 #define MSG_OSD_PG_PULL 106
125 #define MSG_OSD_PG_PUSH_REPLY 107
127 #define MSG_OSD_EC_WRITE 108
128 #define MSG_OSD_EC_WRITE_REPLY 109
129 #define MSG_OSD_EC_READ 110
130 #define MSG_OSD_EC_READ_REPLY 111
132 #define MSG_OSD_REPOP 112
133 #define MSG_OSD_REPOPREPLY 113
134 #define MSG_OSD_PG_UPDATE_LOG_MISSING 114
135 #define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY 115
137 #define MSG_OSD_PG_CREATED 116
138 #define MSG_OSD_REP_SCRUBMAP 117
139 #define MSG_OSD_PG_RECOVERY_DELETE 118
140 #define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119
141 #define MSG_OSD_PG_CREATE2 120
142 #define MSG_OSD_SCRUB2 121
144 #define MSG_OSD_PG_READY_TO_MERGE 122
146 #define MSG_OSD_PG_LEASE 133
147 #define MSG_OSD_PG_LEASE_ACK 134
// ---- Message type codes: MDS group --------------------------------------
// MDS messages. A few decimal codes come first. After that, each MDS
// subsystem gets its own hex range, as annotated inline: 0x2xx mdcache,
// 0x3xx locker, 0x4xx migrator, 0x5xx balancer/metrics/pinger/scrub stats.
151 #define MSG_MDS_BEACON 100 // to monitor
152 #define MSG_MDS_PEER_REQUEST 101
153 #define MSG_MDS_TABLE_REQUEST 102
154 #define MSG_MDS_SCRUB 135
156 // 150 already in use (MSG_OSD_RECOVERY_RESERVE)
158 #define MSG_MDS_RESOLVE 0x200 // 0x2xx are for mdcache of mds
159 #define MSG_MDS_RESOLVEACK 0x201
160 #define MSG_MDS_CACHEREJOIN 0x202
161 #define MSG_MDS_DISCOVER 0x203
162 #define MSG_MDS_DISCOVERREPLY 0x204
163 #define MSG_MDS_INODEUPDATE 0x205
164 #define MSG_MDS_DIRUPDATE 0x206
165 #define MSG_MDS_CACHEEXPIRE 0x207
166 #define MSG_MDS_DENTRYUNLINK 0x208
167 #define MSG_MDS_FRAGMENTNOTIFY 0x209
168 #define MSG_MDS_OFFLOAD_TARGETS 0x20a
// (0x20b is not assigned in this list.)
169 #define MSG_MDS_DENTRYLINK 0x20c
170 #define MSG_MDS_FINDINO 0x20d
171 #define MSG_MDS_FINDINOREPLY 0x20e
172 #define MSG_MDS_OPENINO 0x20f
173 #define MSG_MDS_OPENINOREPLY 0x210
174 #define MSG_MDS_SNAPUPDATE 0x211
175 #define MSG_MDS_FRAGMENTNOTIFYACK 0x212
176 #define MSG_MDS_LOCK 0x300 // 0x3xx are for locker of mds
177 #define MSG_MDS_INODEFILECAPS 0x301
// The migrator codes jump from 0x449 to 0x450, leaving 0x44a-0x44f unused.
// They appear to have been counted as if decimal.
// NOTE(review): do not "fix" this; peers presumably depend on these exact
// values.
179 #define MSG_MDS_EXPORTDIRDISCOVER 0x449 // 0x4xx are for migrator of mds
180 #define MSG_MDS_EXPORTDIRDISCOVERACK 0x450
181 #define MSG_MDS_EXPORTDIRCANCEL 0x451
182 #define MSG_MDS_EXPORTDIRPREP 0x452
183 #define MSG_MDS_EXPORTDIRPREPACK 0x453
184 #define MSG_MDS_EXPORTDIRWARNING 0x454
185 #define MSG_MDS_EXPORTDIRWARNINGACK 0x455
186 #define MSG_MDS_EXPORTDIR 0x456
187 #define MSG_MDS_EXPORTDIRACK 0x457
188 #define MSG_MDS_EXPORTDIRNOTIFY 0x458
189 #define MSG_MDS_EXPORTDIRNOTIFYACK 0x459
190 #define MSG_MDS_EXPORTDIRFINISH 0x460
192 #define MSG_MDS_EXPORTCAPS 0x470
193 #define MSG_MDS_EXPORTCAPSACK 0x471
194 #define MSG_MDS_GATHERCAPS 0x472
196 #define MSG_MDS_HEARTBEAT 0x500 // for mds load balancer
197 #define MSG_MDS_METRICS 0x501 // for mds metric aggregator
198 #define MSG_MDS_PING 0x502 // for mds pinger
199 #define MSG_MDS_SCRUB_STATS 0x503 // for mds scrub stack
// ---- Message type codes: time check / health (0x6xx) --------------------
202 #define MSG_TIMECHECK 0x600
203 #define MSG_MON_HEALTH 0x601
205 // *** Message::encode() crcflags bits ***
// These are bit flags for the `int crcflags` argument of Message::encode().
// They are NOT message type codes, despite the MSG_ prefix.
// MSG_CRC_ALL is simply both bits OR-ed together.
// NOTE(review): presumably MSG_CRC_DATA selects the data CRC and
// MSG_CRC_HEADER the header CRC (cf. calc_data_crc()/calc_header_crc());
// confirm in the definition of Message::encode().
206 #define MSG_CRC_DATA (1 << 0)
207 #define MSG_CRC_HEADER (1 << 1)
208 #define MSG_CRC_ALL (MSG_CRC_DATA | MSG_CRC_HEADER)
// Back to message type codes in the 0x6xx range.
212 #define MSG_NOP 0x607
214 #define MSG_MON_HEALTH_CHECKS 0x608
215 #define MSG_TIMECHECK2 0x609
// ---- Message type codes: ceph-mgr (0x7xx) --------------------------------
217 // *** ceph-mgr <-> OSD/MDS daemons ***
218 #define MSG_MGR_OPEN 0x700
219 #define MSG_MGR_CONFIGURE 0x701
220 #define MSG_MGR_REPORT 0x702
222 // *** ceph-mgr <-> ceph-mon ***
223 #define MSG_MGR_BEACON 0x703
225 // *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
226 #define MSG_MGR_MAP 0x704
228 // *** ceph-mon(MgrMonitor) -> ceph-mgr ***
229 #define MSG_MGR_DIGEST 0x705
230 // *** ceph-mgr -> ceph-mon ***
231 #define MSG_MON_MGR_REPORT 0x706
232 #define MSG_SERVICE_MAP 0x707
// Further ceph-mgr types: close, command and command reply.
234 #define MSG_MGR_CLOSE 0x708
235 #define MSG_MGR_COMMAND 0x709
236 #define MSG_MGR_COMMAND_REPLY 0x70a
238 // *** ceph-mgr <-> MON daemons ***
239 #define MSG_MGR_UPDATE 0x70b
241 // ======================================================
243 // abstract Message class
245 class Message
: public RefCountedObject
{
248 using ConnectionRef
= crimson::net::ConnectionRef
;
250 using ConnectionRef
= ::ConnectionRef
;
251 #endif // WITH_SEASTAR
254 ceph_msg_header header
; // headerelope
255 ceph_msg_footer footer
;
256 ceph::buffer::list payload
; // "front" unaligned blob
257 ceph::buffer::list middle
; // "middle" unaligned blob
258 ceph::buffer::list data
; // data payload (page-alignment will be preserved where possible)
260 /* recv_stamp is set when the Messenger starts reading the
261 * Message off the wire */
263 /* dispatch_stamp is set when the Messenger starts calling dispatch() on
265 utime_t dispatch_stamp
;
266 /* throttle_stamp is the point at which we got throttle */
267 utime_t throttle_stamp
;
268 /* time at which message was fully read */
269 utime_t recv_complete_stamp
;
271 ConnectionRef connection
;
275 boost::intrusive::list_member_hook
<> dispatch_q
;
279 ZTracer::Trace trace
;
280 void encode_trace(ceph::buffer::list
&bl
, uint64_t features
) const;
281 void decode_trace(ceph::buffer::list::const_iterator
&p
, bool create
= false);
283 class CompletionHook
: public Context
{
286 friend class Message
;
288 explicit CompletionHook(Message
*_m
) : m(_m
) {}
289 virtual void set_message(Message
*_m
) { m
= _m
; }
292 typedef boost::intrusive::list
<Message
,
293 boost::intrusive::member_hook
<
295 boost::intrusive::list_member_hook
<>,
296 &Message::dispatch_q
>> Queue
;
298 ceph::mono_time queue_start
;
300 CompletionHook
* completion_hook
= nullptr; // owned by Messenger
302 // release our size in bytes back to this throttler when our payload
303 // is adjusted or when we are destroyed.
304 ThrottleInterface
*byte_throttler
= nullptr;
306 // release a count back to this throttler when we are destroyed
307 ThrottleInterface
*msg_throttler
= nullptr;
309 // keep track of how big this message was when we reserved space in
310 // the msgr dispatch_throttler, so that we can properly release it
311 // later. this is necessary because messages can enter the dispatch
312 // queue locally (not via read_message()), and those are not
313 // currently throttled.
314 uint64_t dispatch_throttle_size
= 0;
316 friend class Messenger
;
320 memset(&header
, 0, sizeof(header
));
321 memset(&footer
, 0, sizeof(footer
));
323 Message(int t
, int version
=1, int compat_version
=0) {
324 memset(&header
, 0, sizeof(header
));
326 header
.version
= version
;
327 header
.compat_version
= compat_version
;
328 memset(&footer
, 0, sizeof(footer
));
332 return static_cast<Message
*>(RefCountedObject::get());
336 ~Message() override
{
338 byte_throttler
->put(payload
.length() + middle
.length() + data
.length());
339 release_message_throttle();
340 trace
.event("message destructed");
341 /* call completion hooks (if any) */
343 completion_hook
->complete(0);
346 const ConnectionRef
& get_connection() const { return connection
; }
347 void set_connection(ConnectionRef c
) {
348 connection
= std::move(c
);
350 CompletionHook
* get_completion_hook() { return completion_hook
; }
351 void set_completion_hook(CompletionHook
*hook
) { completion_hook
= hook
; }
352 void set_byte_throttler(ThrottleInterface
*t
) {
355 void set_message_throttler(ThrottleInterface
*t
) {
359 void set_dispatch_throttle_size(uint64_t s
) { dispatch_throttle_size
= s
; }
360 uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size
; }
362 const ceph_msg_header
&get_header() const { return header
; }
363 ceph_msg_header
&get_header() { return header
; }
364 void set_header(const ceph_msg_header
&e
) { header
= e
; }
365 void set_footer(const ceph_msg_footer
&e
) { footer
= e
; }
366 const ceph_msg_footer
&get_footer() const { return footer
; }
367 ceph_msg_footer
&get_footer() { return footer
; }
368 void set_src(const entity_name_t
& src
) { header
.src
= src
; }
370 uint32_t get_magic() const { return magic
; }
371 void set_magic(int _magic
) { magic
= _magic
; }
374 * If you use get_[data, middle, payload] you shouldn't
375 * use it to change those ceph::buffer::lists unless you KNOW
376 * there is no throttle being used. The other
377 * functions are throttling-aware as appropriate.
380 void clear_payload() {
381 if (byte_throttler
) {
382 byte_throttler
->put(payload
.length() + middle
.length());
388 virtual void clear_buffers() {}
391 byte_throttler
->put(data
.length());
393 clear_buffers(); // let subclass drop buffers as well
395 void release_message_throttle() {
397 msg_throttler
->put();
398 msg_throttler
= nullptr;
401 bool empty_payload() const { return payload
.length() == 0; }
402 ceph::buffer::list
& get_payload() { return payload
; }
403 const ceph::buffer::list
& get_payload() const { return payload
; }
404 void set_payload(ceph::buffer::list
& bl
) {
406 byte_throttler
->put(payload
.length());
407 payload
= std::move(bl
);
409 byte_throttler
->take(payload
.length());
412 void set_middle(ceph::buffer::list
& bl
) {
414 byte_throttler
->put(middle
.length());
415 middle
= std::move(bl
);
417 byte_throttler
->take(middle
.length());
419 ceph::buffer::list
& get_middle() { return middle
; }
421 void set_data(const ceph::buffer::list
&bl
) {
423 byte_throttler
->put(data
.length());
426 byte_throttler
->take(data
.length());
429 const ceph::buffer::list
& get_data() const { return data
; }
430 ceph::buffer::list
& get_data() { return data
; }
431 void claim_data(ceph::buffer::list
& bl
) {
433 byte_throttler
->put(data
.length());
434 bl
= std::move(data
);
436 uint32_t get_data_len() const { return data
.length(); }
438 void set_recv_stamp(utime_t t
) { recv_stamp
= t
; }
439 const utime_t
& get_recv_stamp() const { return recv_stamp
; }
440 void set_dispatch_stamp(utime_t t
) { dispatch_stamp
= t
; }
441 const utime_t
& get_dispatch_stamp() const { return dispatch_stamp
; }
442 void set_throttle_stamp(utime_t t
) { throttle_stamp
= t
; }
443 const utime_t
& get_throttle_stamp() const { return throttle_stamp
; }
444 void set_recv_complete_stamp(utime_t t
) { recv_complete_stamp
= t
; }
445 const utime_t
& get_recv_complete_stamp() const { return recv_complete_stamp
; }
447 void calc_header_crc() {
448 header
.crc
= ceph_crc32c(0, (unsigned char*)&header
,
449 sizeof(header
) - sizeof(header
.crc
));
451 void calc_front_crc() {
452 footer
.front_crc
= payload
.crc32c(0);
453 footer
.middle_crc
= middle
.crc32c(0);
455 void calc_data_crc() {
456 footer
.data_crc
= data
.crc32c(0);
459 virtual int get_cost() const {
460 return data
.length();
464 int get_type() const { return header
.type
; }
465 void set_type(int t
) { header
.type
= t
; }
467 uint64_t get_tid() const { return header
.tid
; }
468 void set_tid(uint64_t t
) { header
.tid
= t
; }
470 uint64_t get_seq() const { return header
.seq
; }
471 void set_seq(uint64_t s
) { header
.seq
= s
; }
473 unsigned get_priority() const { return header
.priority
; }
474 void set_priority(__s16 p
) { header
.priority
= p
; }
477 entity_inst_t
get_source_inst() const {
478 return entity_inst_t(get_source(), get_source_addr());
480 entity_name_t
get_source() const {
481 return entity_name_t(header
.src
);
483 entity_addr_t
get_source_addr() const {
485 return connection
->get_peer_addr();
486 return entity_addr_t();
488 entity_addrvec_t
get_source_addrs() const {
490 return connection
->get_peer_addrs();
491 return entity_addrvec_t();
495 entity_inst_t
get_orig_source_inst() const {
496 return get_source_inst();
498 entity_name_t
get_orig_source() const {
501 entity_addr_t
get_orig_source_addr() const {
502 return get_source_addr();
504 entity_addrvec_t
get_orig_source_addrs() const {
505 return get_source_addrs();
509 virtual void decode_payload() = 0;
510 virtual void encode_payload(uint64_t features
) = 0;
511 virtual std::string_view
get_type_name() const = 0;
512 virtual void print(std::ostream
& out
) const {
513 out
<< get_type_name() << " magic: " << magic
;
516 virtual void dump(ceph::Formatter
*f
) const;
518 void encode(uint64_t features
, int crcflags
, bool skip_header_crc
= false);
521 extern Message
*decode_message(CephContext
*cct
,
523 ceph_msg_header
& header
,
524 ceph_msg_footer
& footer
,
525 ceph::buffer::list
& front
,
526 ceph::buffer::list
& middle
,
527 ceph::buffer::list
& data
,
528 Message::ConnectionRef conn
);
529 inline std::ostream
& operator<<(std::ostream
& out
, const Message
& m
) {
531 if (m
.get_header().version
)
532 out
<< " v" << m
.get_header().version
;
// Free-function serializer for a Message. It is only declared here; the
// definition is not visible in this header.
// NOTE(review): presumably encodes *m for the given feature bits and appends
// the bytes to bl. Confirm against the definition, including whether m must
// be non-null.
536 extern void encode_message(Message
*m
, uint64_t features
, ceph::buffer::list
& bl
);
// Overload of decode_message() that takes its input as a
// ceph::buffer::list::const_iterator instead of separate header/footer/
// front/middle/data pieces. It is only declared here; the definition is not
// visible in this header.
// NOTE(review): crcflags presumably takes the MSG_CRC_* bits defined above.
// Confirm in the definition whether the returned raw Message* transfers
// ownership to the caller and whether nullptr signals a decode failure.
537 extern Message
*decode_message(CephContext
*cct
, int crcflags
,
538 ceph::buffer::list::const_iterator
& bl
);
540 /// This is a "safe" version of Message: it does not allow calling get/put
541 /// methods on its derived classes. This is intended to prevent some accidental
542 /// reference leaks by making reference counting explicit: you must either cast
543 /// the derived class to a RefCountedObject to do the get/put, or detach a temporary reference.
544 class SafeMessage
: public Message
{
546 using Message::Message
;
548 using RefCountedObject::get
;
549 using RefCountedObject::put
;
553 template<class T
, typename
... Args
>
554 ceph::ref_t
<T
> make_message(Args
&&... args
) {
555 return {new T(std::forward
<Args
>(args
)...), false};
560 template<class T
, typename
... Args
>
561 MURef
<T
> make_message(Args
&&... args
) {
562 return {new T(std::forward
<Args
>(args
)...), TOPNSPC::common::UniquePtrDeleter
{}};