]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/Message.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / msg / Message.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_MESSAGE_H
16 #define CEPH_MESSAGE_H
17
18 #include <stdlib.h>
19 #include <ostream>
20 #include <string_view>
21
22 #include <boost/intrusive/list.hpp>
23
24 #include "include/Context.h"
25 #include "common/RefCountedObj.h"
26 #include "common/ThrottleInterface.h"
27 #include "common/config.h"
28 #include "common/debug.h"
29 #include "common/zipkin_trace.h"
30 #include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
31 #include "include/buffer.h"
32 #include "include/types.h"
33 #include "msg/Connection.h"
34 #include "msg/MessageRef.h"
35 #include "msg_types.h"
36
// ---- message type ids: monitor, paxos, config --------------------------
// The numeric values below are copied verbatim; only layout and comments
// differ.  Ids in this group are not contiguous.

// monitor internal
#define MSG_MON_SCRUB              64
#define MSG_MON_ELECTION           65
#define MSG_MON_PAXOS              66
#define MSG_MON_PROBE              67
#define MSG_MON_JOIN               68
#define MSG_MON_SYNC               69

/* monitor <-> mon admin tool */
#define MSG_MON_COMMAND            50
#define MSG_MON_COMMAND_ACK        51
#define MSG_LOG                    52
#define MSG_LOGACK                 53

#define MSG_GETPOOLSTATS           58
#define MSG_GETPOOLSTATSREPLY      59

#define MSG_MON_GLOBAL_ID          60

#define MSG_ROUTE                  47
#define MSG_FORWARD                46

#define MSG_PAXOS                  40

#define MSG_CONFIG                 62
#define MSG_GET_CONFIG             63
63
64
// ---- message type ids: OSD ---------------------------------------------
// Values are copied verbatim.  The sequence has holes (e.g. 82 is unused
// here, and 100..102 are taken by MDS ids defined further down).

// osd internal
#define MSG_OSD_PING                          70
#define MSG_OSD_BOOT                          71
#define MSG_OSD_FAILURE                       72
#define MSG_OSD_ALIVE                         73
#define MSG_OSD_MARK_ME_DOWN                  74
#define MSG_OSD_FULL                          75

// 76 and 77 used to be the sub-op pair; removed right after luminous
//#define MSG_OSD_SUBOP                       76
//#define MSG_OSD_SUBOPREPLY                  77

#define MSG_OSD_PGTEMP                        78

#define MSG_OSD_BEACON                        79

#define MSG_OSD_PG_NOTIFY                     80
#define MSG_OSD_PG_QUERY                      81
#define MSG_OSD_PG_LOG                        83
#define MSG_OSD_PG_REMOVE                     84
#define MSG_OSD_PG_INFO                       85
#define MSG_OSD_PG_TRIM                       86

#define MSG_PGSTATS                           87
#define MSG_PGSTATSACK                        88

#define MSG_OSD_PG_CREATE                     89
#define MSG_REMOVE_SNAPS                      90

#define MSG_OSD_SCRUB                         91
#define MSG_OSD_SCRUB_RESERVE                 92  // previous PG_MISSING
#define MSG_OSD_REP_SCRUB                     93

#define MSG_OSD_PG_SCAN                       94
#define MSG_OSD_PG_BACKFILL                   95
#define MSG_OSD_PG_BACKFILL_REMOVE            96

#define MSG_COMMAND                           97
#define MSG_COMMAND_REPLY                     98

// the reservation/recovery ids jump from 99 to 150/151
#define MSG_OSD_BACKFILL_RESERVE              99
#define MSG_OSD_RECOVERY_RESERVE              150
#define MSG_OSD_FORCE_RECOVERY                151

#define MSG_OSD_PG_PUSH                       105
#define MSG_OSD_PG_PULL                       106
#define MSG_OSD_PG_PUSH_REPLY                 107

#define MSG_OSD_EC_WRITE                      108
#define MSG_OSD_EC_WRITE_REPLY                109
#define MSG_OSD_EC_READ                       110
#define MSG_OSD_EC_READ_REPLY                 111

#define MSG_OSD_REPOP                         112
#define MSG_OSD_REPOPREPLY                    113
#define MSG_OSD_PG_UPDATE_LOG_MISSING         114
#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY   115

#define MSG_OSD_PG_CREATED                    116
#define MSG_OSD_REP_SCRUBMAP                  117
#define MSG_OSD_PG_RECOVERY_DELETE            118
#define MSG_OSD_PG_RECOVERY_DELETE_REPLY      119
#define MSG_OSD_PG_CREATE2                    120
#define MSG_OSD_SCRUB2                        121

#define MSG_OSD_PG_READY_TO_MERGE             122
131
// ---- message type ids: MDS ---------------------------------------------
// Values are copied verbatim.  Most of them are written in hexadecimal, so
// runs such as 0x459 -> 0x460 or 0x20a -> 0x20c are NOT consecutive numbers.

// *** MDS ***

#define MSG_MDS_BEACON               100  // to monitor
#define MSG_MDS_SLAVE_REQUEST        101
#define MSG_MDS_TABLE_REQUEST        102

// 150 already in use (MSG_OSD_RECOVERY_RESERVE)

#define MSG_MDS_RESOLVE              0x200
#define MSG_MDS_RESOLVEACK           0x201
#define MSG_MDS_CACHEREJOIN          0x202
#define MSG_MDS_DISCOVER             0x203
#define MSG_MDS_DISCOVERREPLY        0x204
#define MSG_MDS_INODEUPDATE          0x205
#define MSG_MDS_DIRUPDATE            0x206
#define MSG_MDS_CACHEEXPIRE          0x207
#define MSG_MDS_DENTRYUNLINK         0x208
#define MSG_MDS_FRAGMENTNOTIFY       0x209
#define MSG_MDS_OFFLOAD_TARGETS      0x20a
#define MSG_MDS_DENTRYLINK           0x20c
#define MSG_MDS_FINDINO              0x20d
#define MSG_MDS_FINDINOREPLY         0x20e
#define MSG_MDS_OPENINO              0x20f
#define MSG_MDS_OPENINOREPLY         0x210
#define MSG_MDS_SNAPUPDATE           0x211
#define MSG_MDS_FRAGMENTNOTIFYACK    0x212
#define MSG_MDS_LOCK                 0x300
#define MSG_MDS_INODEFILECAPS        0x301

// directory export handshake
#define MSG_MDS_EXPORTDIRDISCOVER    0x449
#define MSG_MDS_EXPORTDIRDISCOVERACK 0x450
#define MSG_MDS_EXPORTDIRCANCEL      0x451
#define MSG_MDS_EXPORTDIRPREP        0x452
#define MSG_MDS_EXPORTDIRPREPACK     0x453
#define MSG_MDS_EXPORTDIRWARNING     0x454
#define MSG_MDS_EXPORTDIRWARNINGACK  0x455
#define MSG_MDS_EXPORTDIR            0x456
#define MSG_MDS_EXPORTDIRACK         0x457
#define MSG_MDS_EXPORTDIRNOTIFY      0x458
#define MSG_MDS_EXPORTDIRNOTIFYACK   0x459
#define MSG_MDS_EXPORTDIRFINISH      0x460

#define MSG_MDS_EXPORTCAPS           0x470
#define MSG_MDS_EXPORTCAPSACK        0x471
#define MSG_MDS_GATHERCAPS           0x472

#define MSG_MDS_HEARTBEAT            0x500  // for mds load balancer
179
// ---- message type ids: generic, xio, mgr; plus encode() crc flags -------
// Values are copied verbatim; only layout and comments differ.

// *** generic ***
#define MSG_TIMECHECK              0x600
#define MSG_MON_HEALTH             0x601

// *** Message::encode() crcflags bits ***
// (these are bit flags for the crcflags argument, not message type ids)
#define MSG_CRC_DATA               (1 << 0)
#define MSG_CRC_HEADER             (1 << 1)
#define MSG_CRC_ALL                (MSG_CRC_DATA | MSG_CRC_HEADER)

// Xio Testing
#define MSG_DATA_PING              0x602

// Xio intends to define messages 0x603..0x606

// Special
#define MSG_NOP                    0x607

#define MSG_MON_HEALTH_CHECKS      0x608
#define MSG_TIMECHECK2             0x609

// *** ceph-mgr <-> OSD/MDS daemons ***
#define MSG_MGR_OPEN               0x700
#define MSG_MGR_CONFIGURE          0x701
#define MSG_MGR_REPORT             0x702

// *** ceph-mgr <-> ceph-mon ***
#define MSG_MGR_BEACON             0x703

// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
#define MSG_MGR_MAP                0x704

// *** ceph-mon(MgrMonitor) -> ceph-mgr ***
#define MSG_MGR_DIGEST             0x705
// *** ceph-mgr -> ceph-mon ***
#define MSG_MON_MGR_REPORT         0x706
#define MSG_SERVICE_MAP            0x707

#define MSG_MGR_CLOSE              0x708
218
219 // ======================================================
220
221 // abstract Message class
222
223 namespace bi = boost::intrusive;
224
// XioMessenger conditional trace flags.
// Each value is a distinct single bit, so they can be OR-ed together.
#define MSG_MAGIC_XIO          0x0002
#define MSG_MAGIC_TRACE_XCON   0x0004
#define MSG_MAGIC_TRACE_DTOR   0x0008
#define MSG_MAGIC_TRACE_HDR    0x0010
#define MSG_MAGIC_TRACE_XIO    0x0020
#define MSG_MAGIC_TRACE_XMSGR  0x0040
#define MSG_MAGIC_TRACE_CTR    0x0080

// XioMessenger diagnostic "ping pong" flag (resend msg when send completes)
#define MSG_MAGIC_REDUPE       0x0100
236
237 class Message : public RefCountedObject {
238 protected:
239 ceph_msg_header header; // headerelope
240 ceph_msg_footer footer;
241 bufferlist payload; // "front" unaligned blob
242 bufferlist middle; // "middle" unaligned blob
243 bufferlist data; // data payload (page-alignment will be preserved where possible)
244
245 /* recv_stamp is set when the Messenger starts reading the
246 * Message off the wire */
247 utime_t recv_stamp;
248 /* dispatch_stamp is set when the Messenger starts calling dispatch() on
249 * its endpoints */
250 utime_t dispatch_stamp;
251 /* throttle_stamp is the point at which we got throttle */
252 utime_t throttle_stamp;
253 /* time at which message was fully read */
254 utime_t recv_complete_stamp;
255
256 ConnectionRef connection;
257
258 uint32_t magic = 0;
259
260 bi::list_member_hook<> dispatch_q;
261
262 public:
263 using ref = MessageRef;
264 using const_ref = MessageConstRef;
265
266 // zipkin tracing
267 ZTracer::Trace trace;
268 void encode_trace(bufferlist &bl, uint64_t features) const;
269 void decode_trace(bufferlist::const_iterator &p, bool create = false);
270
271 class CompletionHook : public Context {
272 protected:
273 Message *m;
274 friend class Message;
275 public:
276 explicit CompletionHook(Message *_m) : m(_m) {}
277 virtual void set_message(Message *_m) { m = _m; }
278 };
279
280 typedef bi::list< Message,
281 bi::member_hook< Message,
282 bi::list_member_hook<>,
283 &Message::dispatch_q > > Queue;
284
285 protected:
286 CompletionHook* completion_hook = nullptr; // owned by Messenger
287
288 // release our size in bytes back to this throttler when our payload
289 // is adjusted or when we are destroyed.
290 ThrottleInterface *byte_throttler = nullptr;
291
292 // release a count back to this throttler when we are destroyed
293 ThrottleInterface *msg_throttler = nullptr;
294
295 // keep track of how big this message was when we reserved space in
296 // the msgr dispatch_throttler, so that we can properly release it
297 // later. this is necessary because messages can enter the dispatch
298 // queue locally (not via read_message()), and those are not
299 // currently throttled.
300 uint64_t dispatch_throttle_size = 0;
301
302 friend class Messenger;
303
304 public:
305 Message() {
306 memset(&header, 0, sizeof(header));
307 memset(&footer, 0, sizeof(footer));
308 }
309 Message(int t, int version=1, int compat_version=0) {
310 memset(&header, 0, sizeof(header));
311 header.type = t;
312 header.version = version;
313 header.compat_version = compat_version;
314 header.priority = 0; // undef
315 header.data_off = 0;
316 memset(&footer, 0, sizeof(footer));
317 }
318
319 Message *get() {
320 return static_cast<Message *>(RefCountedObject::get());
321 }
322
323 protected:
324 ~Message() override {
325 if (byte_throttler)
326 byte_throttler->put(payload.length() + middle.length() + data.length());
327 release_message_throttle();
328 trace.event("message destructed");
329 /* call completion hooks (if any) */
330 if (completion_hook)
331 completion_hook->complete(0);
332 }
333 public:
334 const ConnectionRef& get_connection() const { return connection; }
335 void set_connection(const ConnectionRef& c) {
336 connection = c;
337 }
338 CompletionHook* get_completion_hook() { return completion_hook; }
339 void set_completion_hook(CompletionHook *hook) { completion_hook = hook; }
340 void set_byte_throttler(ThrottleInterface *t) {
341 byte_throttler = t;
342 }
343 void set_message_throttler(ThrottleInterface *t) {
344 msg_throttler = t;
345 }
346
347 void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
348 uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; }
349
350 const ceph_msg_header &get_header() const { return header; }
351 ceph_msg_header &get_header() { return header; }
352 void set_header(const ceph_msg_header &e) { header = e; }
353 void set_footer(const ceph_msg_footer &e) { footer = e; }
354 const ceph_msg_footer &get_footer() const { return footer; }
355 ceph_msg_footer &get_footer() { return footer; }
356 void set_src(const entity_name_t& src) { header.src = src; }
357
358 uint32_t get_magic() const { return magic; }
359 void set_magic(int _magic) { magic = _magic; }
360
361 /*
362 * If you use get_[data, middle, payload] you shouldn't
363 * use it to change those bufferlists unless you KNOW
364 * there is no throttle being used. The other
365 * functions are throttling-aware as appropriate.
366 */
367
368 void clear_payload() {
369 if (byte_throttler) {
370 byte_throttler->put(payload.length() + middle.length());
371 }
372 payload.clear();
373 middle.clear();
374 }
375
376 virtual void clear_buffers() {}
377 void clear_data() {
378 if (byte_throttler)
379 byte_throttler->put(data.length());
380 data.clear();
381 clear_buffers(); // let subclass drop buffers as well
382 }
383 void release_message_throttle() {
384 if (msg_throttler)
385 msg_throttler->put();
386 msg_throttler = nullptr;
387 }
388
389 bool empty_payload() const { return payload.length() == 0; }
390 bufferlist& get_payload() { return payload; }
391 const bufferlist& get_payload() const { return payload; }
392 void set_payload(bufferlist& bl) {
393 if (byte_throttler)
394 byte_throttler->put(payload.length());
395 payload.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
396 if (byte_throttler)
397 byte_throttler->take(payload.length());
398 }
399
400 void set_middle(bufferlist& bl) {
401 if (byte_throttler)
402 byte_throttler->put(middle.length());
403 middle.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
404 if (byte_throttler)
405 byte_throttler->take(middle.length());
406 }
407 bufferlist& get_middle() { return middle; }
408
409 void set_data(const bufferlist &bl) {
410 if (byte_throttler)
411 byte_throttler->put(data.length());
412 data.share(bl);
413 if (byte_throttler)
414 byte_throttler->take(data.length());
415 }
416
417 const bufferlist& get_data() const { return data; }
418 bufferlist& get_data() { return data; }
419 void claim_data(bufferlist& bl,
420 unsigned int flags = buffer::list::CLAIM_DEFAULT) {
421 if (byte_throttler)
422 byte_throttler->put(data.length());
423 bl.claim(data, flags);
424 }
425 off_t get_data_len() const { return data.length(); }
426
427 void set_recv_stamp(utime_t t) { recv_stamp = t; }
428 const utime_t& get_recv_stamp() const { return recv_stamp; }
429 void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
430 const utime_t& get_dispatch_stamp() const { return dispatch_stamp; }
431 void set_throttle_stamp(utime_t t) { throttle_stamp = t; }
432 const utime_t& get_throttle_stamp() const { return throttle_stamp; }
433 void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; }
434 const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; }
435
436 void calc_header_crc() {
437 header.crc = ceph_crc32c(0, (unsigned char*)&header,
438 sizeof(header) - sizeof(header.crc));
439 }
440 void calc_front_crc() {
441 footer.front_crc = payload.crc32c(0);
442 footer.middle_crc = middle.crc32c(0);
443 }
444 void calc_data_crc() {
445 footer.data_crc = data.crc32c(0);
446 }
447
448 virtual int get_cost() const {
449 return data.length();
450 }
451
452 // type
453 int get_type() const { return header.type; }
454 void set_type(int t) { header.type = t; }
455
456 uint64_t get_tid() const { return header.tid; }
457 void set_tid(uint64_t t) { header.tid = t; }
458
459 uint64_t get_seq() const { return header.seq; }
460 void set_seq(uint64_t s) { header.seq = s; }
461
462 unsigned get_priority() const { return header.priority; }
463 void set_priority(__s16 p) { header.priority = p; }
464
465 // source/dest
466 entity_inst_t get_source_inst() const {
467 return entity_inst_t(get_source(), get_source_addr());
468 }
469 entity_name_t get_source() const {
470 return entity_name_t(header.src);
471 }
472 entity_addr_t get_source_addr() const {
473 if (connection)
474 return connection->get_peer_addr();
475 return entity_addr_t();
476 }
477 entity_addrvec_t get_source_addrs() const {
478 if (connection)
479 return connection->get_peer_addrs();
480 return entity_addrvec_t();
481 }
482
483 // forwarded?
484 entity_inst_t get_orig_source_inst() const {
485 return get_source_inst();
486 }
487 entity_name_t get_orig_source() const {
488 return get_source();
489 }
490 entity_addr_t get_orig_source_addr() const {
491 return get_source_addr();
492 }
493 entity_addrvec_t get_orig_source_addrs() const {
494 return get_source_addrs();
495 }
496
497 // virtual bits
498 virtual void decode_payload() = 0;
499 virtual void encode_payload(uint64_t features) = 0;
500 virtual std::string_view get_type_name() const = 0;
501 virtual void print(ostream& out) const {
502 out << get_type_name() << " magic: " << magic;
503 }
504
505 virtual void dump(Formatter *f) const;
506
507 void encode(uint64_t features, int crcflags);
508 };
509
510 extern Message *decode_message(CephContext *cct, int crcflags,
511 ceph_msg_header &header,
512 ceph_msg_footer& footer, bufferlist& front,
513 bufferlist& middle, bufferlist& data,
514 Connection* conn);
515 inline ostream& operator<<(ostream& out, const Message& m) {
516 m.print(out);
517 if (m.get_header().version)
518 out << " v" << m.get_header().version;
519 return out;
520 }
521
522 extern void encode_message(Message *m, uint64_t features, bufferlist& bl);
523 extern Message *decode_message(CephContext *cct, int crcflags,
524 bufferlist::const_iterator& bl);
525
/**
 * Factory for heap-allocated messages of type MessageType.
 *
 * build() forwards its arguments to MessageType's constructor and wraps
 * the new object in MessageType::ref, passing `false` as the second
 * constructor argument of that ref type.
 * NOTE(review): with boost::intrusive_ptr that flag means "do not add a
 * reference", i.e. the ref adopts the count the object was created with --
 * confirm against RefCountedObject.
 */
template <class MessageType>
class MessageFactory {
public:
  template<typename... Args>
  static typename MessageType::ref build(Args&&... args) {
    using ref_type = typename MessageType::ref;
    MessageType *const fresh = new MessageType(std::forward<Args>(args)...);
    return ref_type(fresh, false);
  }
};
534
535 template<class T, class M = Message>
536 class MessageSubType : public M {
537 public:
538 typedef boost::intrusive_ptr<T> ref;
539 typedef boost::intrusive_ptr<T const> const_ref;
540
541 static auto msgref_cast(typename M::ref const& m) {
542 return boost::static_pointer_cast<typename T::const_ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m);
543 }
544 static auto msgref_cast(typename M::const_ref const& m) {
545 return boost::static_pointer_cast<typename T::ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m);
546 }
547
548 protected:
549 template<typename... Args>
550 MessageSubType(Args&&... args) : M(std::forward<Args>(args)...) {}
551 virtual ~MessageSubType() override {}
552 };
553
554
555 template<class T, class M = Message>
556 class MessageInstance : public MessageSubType<T, M> {
557 public:
558 using factory = MessageFactory<T>;
559
560 template<typename... Args>
561 static auto create(Args&&... args) {
562 return MessageFactory<T>::build(std::forward<Args>(args)...);
563 }
564 static auto msgref_cast(typename Message::ref const& m) {
565 return boost::static_pointer_cast<typename T::ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m);
566 }
567 static auto msgref_cast(typename Message::const_ref const& m) {
568 return boost::static_pointer_cast<typename T::const_ref::element_type, typename std::remove_reference<decltype(m)>::type::element_type>(m);
569 }
570
571 protected:
572 template<typename... Args>
573 MessageInstance(Args&&... args) : MessageSubType<T,M>(std::forward<Args>(args)...) {}
574 virtual ~MessageInstance() override {}
575 };
576
577 #endif