]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/Message.h
update ceph source to reef 18.2.1
[ceph.git] / ceph / src / msg / Message.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_MESSAGE_H
16 #define CEPH_MESSAGE_H
17
18 #include <concepts>
19 #include <cstdlib>
20 #include <ostream>
21 #include <string_view>
22
23 #include <boost/intrusive/list.hpp>
24 #if FMT_VERSION >= 90000
25 #include <fmt/ostream.h>
26 #endif
27
28 #include "include/Context.h"
29 #include "common/RefCountedObj.h"
30 #include "common/ThrottleInterface.h"
31 #include "common/config.h"
32 #include "common/ref.h"
33 #include "common/debug.h"
34 #include "common/zipkin_trace.h"
35 #include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
36 #include "include/buffer.h"
37 #include "include/types.h"
38 #include "msg/Connection.h"
39 #include "msg/MessageRef.h"
40 #include "msg_types.h"
41
42 #ifdef WITH_SEASTAR
43 # include "crimson/net/SocketConnection.h"
44 #endif // WITH_SEASTAR
45
// ---- monitor internal ----
#define MSG_MON_SCRUB                   64
#define MSG_MON_ELECTION                65
#define MSG_MON_PAXOS                   66
#define MSG_MON_PROBE                   67
#define MSG_MON_JOIN                    68
#define MSG_MON_SYNC                    69
#define MSG_MON_PING                    140

// ---- monitor <-> mon admin tool ----
#define MSG_MON_COMMAND                 50
#define MSG_MON_COMMAND_ACK             51
#define MSG_LOG                         52
#define MSG_LOGACK                      53

#define MSG_GETPOOLSTATS                58
#define MSG_GETPOOLSTATSREPLY           59

#define MSG_MON_GLOBAL_ID               60
#define MSG_MON_USED_PENDING_KEYS       141

#define MSG_ROUTE                       47
#define MSG_FORWARD                     46

#define MSG_PAXOS                       40

#define MSG_CONFIG                      62
#define MSG_GET_CONFIG                  63

#define MSG_KV_DATA                     54

#define MSG_MON_GET_PURGED_SNAPS        76
#define MSG_MON_GET_PURGED_SNAPS_REPLY  77
79
// ---- osd internal ----
#define MSG_OSD_PING                         70
#define MSG_OSD_BOOT                         71
#define MSG_OSD_FAILURE                      72
#define MSG_OSD_ALIVE                        73
#define MSG_OSD_MARK_ME_DOWN                 74
#define MSG_OSD_FULL                         75
#define MSG_OSD_MARK_ME_DEAD                 123

// The two types below were removed right after luminous:
//#define MSG_OSD_SUBOP       76
//#define MSG_OSD_SUBOPREPLY  77

#define MSG_OSD_PGTEMP                       78

#define MSG_OSD_BEACON                       79

#define MSG_OSD_PG_NOTIFY                    80
#define MSG_OSD_PG_NOTIFY2                   130
#define MSG_OSD_PG_QUERY                     81
#define MSG_OSD_PG_QUERY2                    131
#define MSG_OSD_PG_LOG                       83
#define MSG_OSD_PG_REMOVE                    84
#define MSG_OSD_PG_INFO                      85
#define MSG_OSD_PG_INFO2                     132
#define MSG_OSD_PG_TRIM                      86

#define MSG_PGSTATS                          87
#define MSG_PGSTATSACK                       88

#define MSG_OSD_PG_CREATE                    89
#define MSG_REMOVE_SNAPS                     90

#define MSG_OSD_SCRUB                        91
#define MSG_OSD_SCRUB_RESERVE                92  // value previously used by PG_MISSING
#define MSG_OSD_REP_SCRUB                    93

#define MSG_OSD_PG_SCAN                      94
#define MSG_OSD_PG_BACKFILL                  95
#define MSG_OSD_PG_BACKFILL_REMOVE           96

#define MSG_COMMAND                          97
#define MSG_COMMAND_REPLY                    98

#define MSG_OSD_BACKFILL_RESERVE             99
#define MSG_OSD_RECOVERY_RESERVE             150
#define MSG_OSD_FORCE_RECOVERY               151

#define MSG_OSD_PG_PUSH                      105
#define MSG_OSD_PG_PULL                      106
#define MSG_OSD_PG_PUSH_REPLY                107

#define MSG_OSD_EC_WRITE                     108
#define MSG_OSD_EC_WRITE_REPLY               109
#define MSG_OSD_EC_READ                      110
#define MSG_OSD_EC_READ_REPLY                111

#define MSG_OSD_REPOP                        112
#define MSG_OSD_REPOPREPLY                   113
#define MSG_OSD_PG_UPDATE_LOG_MISSING        114
#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY  115

#define MSG_OSD_PG_CREATED                   116
#define MSG_OSD_REP_SCRUBMAP                 117
#define MSG_OSD_PG_RECOVERY_DELETE           118
#define MSG_OSD_PG_RECOVERY_DELETE_REPLY     119
#define MSG_OSD_PG_CREATE2                   120
#define MSG_OSD_SCRUB2                       121

#define MSG_OSD_PG_READY_TO_MERGE            122

#define MSG_OSD_PG_LEASE                     133
#define MSG_OSD_PG_LEASE_ACK                 134
153
// *** MDS ***

#define MSG_MDS_BEACON                100   // to monitor
#define MSG_MDS_PEER_REQUEST          101
#define MSG_MDS_TABLE_REQUEST         102
#define MSG_MDS_SCRUB                 135

// note: 150 is already taken by MSG_OSD_RECOVERY_RESERVE

// 0x2xx: mdcache of mds
#define MSG_MDS_RESOLVE               0x200
#define MSG_MDS_RESOLVEACK            0x201
#define MSG_MDS_CACHEREJOIN           0x202
#define MSG_MDS_DISCOVER              0x203
#define MSG_MDS_DISCOVERREPLY         0x204
#define MSG_MDS_INODEUPDATE           0x205
#define MSG_MDS_DIRUPDATE             0x206
#define MSG_MDS_CACHEEXPIRE           0x207
#define MSG_MDS_DENTRYUNLINK          0x208
#define MSG_MDS_FRAGMENTNOTIFY        0x209
#define MSG_MDS_OFFLOAD_TARGETS       0x20a
#define MSG_MDS_DENTRYLINK            0x20c
#define MSG_MDS_FINDINO               0x20d
#define MSG_MDS_FINDINOREPLY          0x20e
#define MSG_MDS_OPENINO               0x20f
#define MSG_MDS_OPENINOREPLY          0x210
#define MSG_MDS_SNAPUPDATE            0x211
#define MSG_MDS_FRAGMENTNOTIFYACK     0x212

// 0x3xx: locker of mds
#define MSG_MDS_LOCK                  0x300
#define MSG_MDS_INODEFILECAPS         0x301

// 0x4xx: migrator of mds
#define MSG_MDS_EXPORTDIRDISCOVER     0x449
#define MSG_MDS_EXPORTDIRDISCOVERACK  0x450
#define MSG_MDS_EXPORTDIRCANCEL       0x451
#define MSG_MDS_EXPORTDIRPREP         0x452
#define MSG_MDS_EXPORTDIRPREPACK      0x453
#define MSG_MDS_EXPORTDIRWARNING      0x454
#define MSG_MDS_EXPORTDIRWARNINGACK   0x455
#define MSG_MDS_EXPORTDIR             0x456
#define MSG_MDS_EXPORTDIRACK          0x457
#define MSG_MDS_EXPORTDIRNOTIFY       0x458
#define MSG_MDS_EXPORTDIRNOTIFYACK    0x459
#define MSG_MDS_EXPORTDIRFINISH       0x460

#define MSG_MDS_EXPORTCAPS            0x470
#define MSG_MDS_EXPORTCAPSACK         0x471
#define MSG_MDS_GATHERCAPS            0x472

#define MSG_MDS_HEARTBEAT             0x500 // for mds load balancer
#define MSG_MDS_METRICS               0x501 // for mds metric aggregator
#define MSG_MDS_PING                  0x502 // for mds pinger
#define MSG_MDS_SCRUB_STATS           0x503 // for mds scrub stack
205
// *** generic ***
#define MSG_TIMECHECK          0x600
#define MSG_MON_HEALTH         0x601

// *** crcflags bits accepted by Message::encode() ***
#define MSG_CRC_DATA           (1 << 0)
#define MSG_CRC_HEADER         (1 << 1)
#define MSG_CRC_ALL            (MSG_CRC_DATA | MSG_CRC_HEADER)


// Special
#define MSG_NOP                0x607

#define MSG_MON_HEALTH_CHECKS  0x608
#define MSG_TIMECHECK2         0x609

// *** ceph-mgr <-> OSD/MDS daemons ***
#define MSG_MGR_OPEN           0x700
#define MSG_MGR_CONFIGURE      0x701
#define MSG_MGR_REPORT         0x702

// *** ceph-mgr <-> ceph-mon ***
#define MSG_MGR_BEACON         0x703

// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
#define MSG_MGR_MAP            0x704

// *** ceph-mon(MgrMonitor) -> ceph-mgr ***
#define MSG_MGR_DIGEST         0x705
// *** ceph-mgr -> ceph-mon ***
#define MSG_MON_MGR_REPORT     0x706
#define MSG_SERVICE_MAP        0x707

#define MSG_MGR_CLOSE          0x708
#define MSG_MGR_COMMAND        0x709
#define MSG_MGR_COMMAND_REPLY  0x70a

// *** ceph-mgr <-> MON daemons ***
#define MSG_MGR_UPDATE         0x70b
245
246 // ======================================================
247
248 // abstract Message class
249
250 class Message : public RefCountedObject {
251 public:
252 #ifdef WITH_SEASTAR
253 using ConnectionRef = crimson::net::ConnectionRef;
254 #else
255 using ConnectionRef = ::ConnectionRef;
256 #endif // WITH_SEASTAR
257
258 protected:
259 ceph_msg_header header; // headerelope
260 ceph_msg_footer footer;
261 ceph::buffer::list payload; // "front" unaligned blob
262 ceph::buffer::list middle; // "middle" unaligned blob
263 ceph::buffer::list data; // data payload (page-alignment will be preserved where possible)
264
265 /* recv_stamp is set when the Messenger starts reading the
266 * Message off the wire */
267 utime_t recv_stamp;
268 /* dispatch_stamp is set when the Messenger starts calling dispatch() on
269 * its endpoints */
270 utime_t dispatch_stamp;
271 /* throttle_stamp is the point at which we got throttle */
272 utime_t throttle_stamp;
273 /* time at which message was fully read */
274 utime_t recv_complete_stamp;
275
276 ConnectionRef connection;
277
278 uint32_t magic = 0;
279
280 boost::intrusive::list_member_hook<> dispatch_q;
281
282 public:
283 // zipkin tracing
284 ZTracer::Trace trace;
285 void encode_trace(ceph::buffer::list &bl, uint64_t features) const;
286 void decode_trace(ceph::buffer::list::const_iterator &p, bool create = false);
287
288 class CompletionHook : public Context {
289 protected:
290 Message *m;
291 friend class Message;
292 public:
293 explicit CompletionHook(Message *_m) : m(_m) {}
294 virtual void set_message(Message *_m) { m = _m; }
295 };
296
297 typedef boost::intrusive::list<Message,
298 boost::intrusive::member_hook<
299 Message,
300 boost::intrusive::list_member_hook<>,
301 &Message::dispatch_q>> Queue;
302
303 ceph::mono_time queue_start;
304 protected:
305 CompletionHook* completion_hook = nullptr; // owned by Messenger
306
307 // release our size in bytes back to this throttler when our payload
308 // is adjusted or when we are destroyed.
309 ThrottleInterface *byte_throttler = nullptr;
310
311 // release a count back to this throttler when we are destroyed
312 ThrottleInterface *msg_throttler = nullptr;
313
314 // keep track of how big this message was when we reserved space in
315 // the msgr dispatch_throttler, so that we can properly release it
316 // later. this is necessary because messages can enter the dispatch
317 // queue locally (not via read_message()), and those are not
318 // currently throttled.
319 uint64_t dispatch_throttle_size = 0;
320
321 friend class Messenger;
322
323 public:
324 Message() {
325 memset(&header, 0, sizeof(header));
326 memset(&footer, 0, sizeof(footer));
327 }
328 Message(int t, int version=1, int compat_version=0) {
329 memset(&header, 0, sizeof(header));
330 header.type = t;
331 header.version = version;
332 header.compat_version = compat_version;
333 memset(&footer, 0, sizeof(footer));
334 }
335
336 Message *get() {
337 return static_cast<Message *>(RefCountedObject::get());
338 }
339
340 protected:
341 ~Message() override {
342 if (byte_throttler)
343 byte_throttler->put(payload.length() + middle.length() + data.length());
344 release_message_throttle();
345 trace.event("message destructed");
346 /* call completion hooks (if any) */
347 if (completion_hook)
348 completion_hook->complete(0);
349 }
350 public:
351 const ConnectionRef& get_connection() const {
352 #ifdef WITH_SEASTAR
353 // In crimson, conn is independently maintained outside Message.
354 ceph_abort();
355 #endif
356 return connection;
357 }
358 void set_connection(ConnectionRef c) {
359 #ifdef WITH_SEASTAR
360 // In crimson, conn is independently maintained outside Message.
361 ceph_assert(c == nullptr);
362 #endif
363 connection = std::move(c);
364 }
365 CompletionHook* get_completion_hook() { return completion_hook; }
366 void set_completion_hook(CompletionHook *hook) { completion_hook = hook; }
367 void set_byte_throttler(ThrottleInterface *t) {
368 byte_throttler = t;
369 }
370 void set_message_throttler(ThrottleInterface *t) {
371 msg_throttler = t;
372 }
373
374 void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
375 uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; }
376
377 const ceph_msg_header &get_header() const { return header; }
378 ceph_msg_header &get_header() { return header; }
379 void set_header(const ceph_msg_header &e) { header = e; }
380 void set_footer(const ceph_msg_footer &e) { footer = e; }
381 const ceph_msg_footer &get_footer() const { return footer; }
382 ceph_msg_footer &get_footer() { return footer; }
383 void set_src(const entity_name_t& src) { header.src = src; }
384
385 uint32_t get_magic() const { return magic; }
386 void set_magic(int _magic) { magic = _magic; }
387
388 /*
389 * If you use get_[data, middle, payload] you shouldn't
390 * use it to change those ceph::buffer::lists unless you KNOW
391 * there is no throttle being used. The other
392 * functions are throttling-aware as appropriate.
393 */
394
395 void clear_payload() {
396 if (byte_throttler) {
397 byte_throttler->put(payload.length() + middle.length());
398 }
399 payload.clear();
400 middle.clear();
401 }
402
403 virtual void clear_buffers() {}
404 void clear_data() {
405 if (byte_throttler)
406 byte_throttler->put(data.length());
407 data.clear();
408 clear_buffers(); // let subclass drop buffers as well
409 }
410 void release_message_throttle() {
411 if (msg_throttler)
412 msg_throttler->put();
413 msg_throttler = nullptr;
414 }
415
416 bool empty_payload() const { return payload.length() == 0; }
417 ceph::buffer::list& get_payload() { return payload; }
418 const ceph::buffer::list& get_payload() const { return payload; }
419 void set_payload(ceph::buffer::list& bl) {
420 if (byte_throttler)
421 byte_throttler->put(payload.length());
422 payload = std::move(bl);
423 if (byte_throttler)
424 byte_throttler->take(payload.length());
425 }
426
427 void set_middle(ceph::buffer::list& bl) {
428 if (byte_throttler)
429 byte_throttler->put(middle.length());
430 middle = std::move(bl);
431 if (byte_throttler)
432 byte_throttler->take(middle.length());
433 }
434 ceph::buffer::list& get_middle() { return middle; }
435
436 void set_data(const ceph::buffer::list &bl) {
437 if (byte_throttler)
438 byte_throttler->put(data.length());
439 data.share(bl);
440 if (byte_throttler)
441 byte_throttler->take(data.length());
442 }
443
444 const ceph::buffer::list& get_data() const { return data; }
445 ceph::buffer::list& get_data() { return data; }
446 void claim_data(ceph::buffer::list& bl) {
447 if (byte_throttler)
448 byte_throttler->put(data.length());
449 bl = std::move(data);
450 }
451 uint32_t get_data_len() const { return data.length(); }
452
453 void set_recv_stamp(utime_t t) { recv_stamp = t; }
454 const utime_t& get_recv_stamp() const { return recv_stamp; }
455 void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
456 const utime_t& get_dispatch_stamp() const { return dispatch_stamp; }
457 void set_throttle_stamp(utime_t t) { throttle_stamp = t; }
458 const utime_t& get_throttle_stamp() const { return throttle_stamp; }
459 void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; }
460 const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; }
461
462 void calc_header_crc() {
463 header.crc = ceph_crc32c(0, (unsigned char*)&header,
464 sizeof(header) - sizeof(header.crc));
465 }
466 void calc_front_crc() {
467 footer.front_crc = payload.crc32c(0);
468 footer.middle_crc = middle.crc32c(0);
469 }
470 void calc_data_crc() {
471 footer.data_crc = data.crc32c(0);
472 }
473
474 virtual int get_cost() const {
475 return data.length();
476 }
477
478 // type
479 int get_type() const { return header.type; }
480 void set_type(int t) { header.type = t; }
481
482 uint64_t get_tid() const { return header.tid; }
483 void set_tid(uint64_t t) { header.tid = t; }
484
485 uint64_t get_seq() const { return header.seq; }
486 void set_seq(uint64_t s) { header.seq = s; }
487
488 unsigned get_priority() const { return header.priority; }
489 void set_priority(__s16 p) { header.priority = p; }
490
491 // source/dest
492 entity_inst_t get_source_inst() const {
493 return entity_inst_t(get_source(), get_source_addr());
494 }
495 entity_name_t get_source() const {
496 return entity_name_t(header.src);
497 }
498 entity_addr_t get_source_addr() const {
499 if (connection)
500 return connection->get_peer_addr();
501 return entity_addr_t();
502 }
503 entity_addrvec_t get_source_addrs() const {
504 if (connection)
505 return connection->get_peer_addrs();
506 return entity_addrvec_t();
507 }
508
509 // forwarded?
510 entity_inst_t get_orig_source_inst() const {
511 return get_source_inst();
512 }
513 entity_name_t get_orig_source() const {
514 return get_source();
515 }
516 entity_addr_t get_orig_source_addr() const {
517 return get_source_addr();
518 }
519 entity_addrvec_t get_orig_source_addrs() const {
520 return get_source_addrs();
521 }
522
523 // virtual bits
524 virtual void decode_payload() = 0;
525 virtual void encode_payload(uint64_t features) = 0;
526 virtual std::string_view get_type_name() const = 0;
527 virtual void print(std::ostream& out) const {
528 out << get_type_name() << " magic: " << magic;
529 }
530
531 virtual void dump(ceph::Formatter *f) const;
532
533 void encode(uint64_t features, int crcflags, bool skip_header_crc = false);
534 };
535
536 extern Message *decode_message(CephContext *cct,
537 int crcflags,
538 ceph_msg_header& header,
539 ceph_msg_footer& footer,
540 ceph::buffer::list& front,
541 ceph::buffer::list& middle,
542 ceph::buffer::list& data,
543 Message::ConnectionRef conn);
544 inline std::ostream& operator<<(std::ostream& out, const Message& m) {
545 m.print(out);
546 if (m.get_header().version)
547 out << " v" << m.get_header().version;
548 return out;
549 }
550
551 extern void encode_message(Message *m, uint64_t features, ceph::buffer::list& bl);
552 extern Message *decode_message(CephContext *cct, int crcflags,
553 ceph::buffer::list::const_iterator& bl);
554
555 /// this is a "safe" version of Message. it does not allow calling get/put
556 /// methods on its derived classes. This is intended to prevent some accidental
557 /// reference leaks by forcing . Instead, you must either cast the derived class to a
558 /// RefCountedObject to do the get/put or detach a temporary reference.
559 class SafeMessage : public Message {
560 public:
561 using Message::Message;
562 bool is_a_client() const {
563 return get_connection()->get_peer_type() == CEPH_ENTITY_TYPE_CLIENT;
564 }
565
566 private:
567 using RefCountedObject::get;
568 using RefCountedObject::put;
569 };
570
571 namespace ceph {
572 template<class T, typename... Args>
573 ceph::ref_t<T> make_message(Args&&... args) {
574 return {new T(std::forward<Args>(args)...), false};
575 }
576 }
577
578 namespace crimson {
579 template<class T, typename... Args>
580 MURef<T> make_message(Args&&... args) {
581 return {new T(std::forward<Args>(args)...), TOPNSPC::common::UniquePtrDeleter{}};
582 }
583 }
584
#if FMT_VERSION >= 90000
// Format Message, and every type derived from it, through
// fmt::ostream_formatter.
template <typename M>
  requires std::derived_from<M, Message>
struct fmt::formatter<M> : fmt::ostream_formatter {};
#endif
588
589 #endif