]> git.proxmox.com Git - ceph.git/blame - ceph/src/msg/Message.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / msg / Message.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_MESSAGE_H
16#define CEPH_MESSAGE_H
9f95a23c
TL
17
18#include <cstdlib>
7c673cae 19#include <ostream>
11fdf7f2 20#include <string_view>
7c673cae 21
7c673cae 22#include <boost/intrusive/list.hpp>
7c673cae 23
11fdf7f2 24#include "include/Context.h"
7c673cae 25#include "common/RefCountedObj.h"
11fdf7f2 26#include "common/ThrottleInterface.h"
7c673cae 27#include "common/config.h"
9f95a23c 28#include "common/ref.h"
11fdf7f2
TL
29#include "common/debug.h"
30#include "common/zipkin_trace.h"
31#include "include/ceph_assert.h" // Because intrusive_ptr clobbers our assert...
32#include "include/buffer.h"
33#include "include/types.h"
34#include "msg/Connection.h"
35#include "msg/MessageRef.h"
36#include "msg_types.h"
7c673cae 37
9f95a23c
TL
38#ifdef WITH_SEASTAR
39# include "crimson/net/SocketConnection.h"
40#endif // WITH_SEASTAR
41
7c673cae
FG
// Message type ids. Each value is what Message(int t, ...) stores into
// ceph_msg_header::type and what get_type() returns, so existing values
// presumably must stay stable across releases -- do not renumber.

// monitor internal
#define MSG_MON_SCRUB              64
#define MSG_MON_ELECTION           65
#define MSG_MON_PAXOS              66
#define MSG_MON_PROBE              67
#define MSG_MON_JOIN               68
#define MSG_MON_SYNC               69
#define MSG_MON_PING               140

/* monitor <-> mon admin tool */
#define MSG_MON_COMMAND            50
#define MSG_MON_COMMAND_ACK        51
#define MSG_LOG                    52
#define MSG_LOGACK                 53

#define MSG_GETPOOLSTATS           58
#define MSG_GETPOOLSTATSREPLY      59

#define MSG_MON_GLOBAL_ID          60

#define MSG_ROUTE                  47
#define MSG_FORWARD                46

#define MSG_PAXOS                  40

#define MSG_CONFIG                 62
#define MSG_GET_CONFIG             63

#define MSG_KV_DATA                54

// 76/77 reuse the ids of the retired MSG_OSD_SUBOP / MSG_OSD_SUBOPREPLY.
#define MSG_MON_GET_PURGED_SNAPS       76
#define MSG_MON_GET_PURGED_SNAPS_REPLY 77

// osd internal
#define MSG_OSD_PING               70
#define MSG_OSD_BOOT               71
#define MSG_OSD_FAILURE            72
#define MSG_OSD_ALIVE              73
#define MSG_OSD_MARK_ME_DOWN       74
#define MSG_OSD_FULL               75
#define MSG_OSD_MARK_ME_DEAD       123

// removed right after luminous
// (ids 76/77 were later reassigned to MSG_MON_GET_PURGED_SNAPS(_REPLY))
//#define MSG_OSD_SUBOP            76
//#define MSG_OSD_SUBOPREPLY       77

#define MSG_OSD_PGTEMP             78

#define MSG_OSD_BEACON             79

#define MSG_OSD_PG_NOTIFY          80
#define MSG_OSD_PG_NOTIFY2         130
#define MSG_OSD_PG_QUERY           81
#define MSG_OSD_PG_QUERY2          131
#define MSG_OSD_PG_LOG             83
#define MSG_OSD_PG_REMOVE          84
#define MSG_OSD_PG_INFO            85
#define MSG_OSD_PG_INFO2           132
#define MSG_OSD_PG_TRIM            86

#define MSG_PGSTATS                87
#define MSG_PGSTATSACK             88

#define MSG_OSD_PG_CREATE          89
#define MSG_REMOVE_SNAPS           90

#define MSG_OSD_SCRUB              91
#define MSG_OSD_SCRUB_RESERVE      92  // previous PG_MISSING
#define MSG_OSD_REP_SCRUB          93

#define MSG_OSD_PG_SCAN            94
#define MSG_OSD_PG_BACKFILL        95
#define MSG_OSD_PG_BACKFILL_REMOVE 96

#define MSG_COMMAND                97
#define MSG_COMMAND_REPLY          98

#define MSG_OSD_BACKFILL_RESERVE   99
#define MSG_OSD_RECOVERY_RESERVE   150
#define MSG_OSD_FORCE_RECOVERY     151

#define MSG_OSD_PG_PUSH            105
#define MSG_OSD_PG_PULL            106
#define MSG_OSD_PG_PUSH_REPLY      107

#define MSG_OSD_EC_WRITE           108
#define MSG_OSD_EC_WRITE_REPLY     109
#define MSG_OSD_EC_READ            110
#define MSG_OSD_EC_READ_REPLY      111

#define MSG_OSD_REPOP              112
#define MSG_OSD_REPOPREPLY         113
#define MSG_OSD_PG_UPDATE_LOG_MISSING        114
#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY  115

#define MSG_OSD_PG_CREATED         116
#define MSG_OSD_REP_SCRUBMAP       117
#define MSG_OSD_PG_RECOVERY_DELETE       118
#define MSG_OSD_PG_RECOVERY_DELETE_REPLY 119
#define MSG_OSD_PG_CREATE2         120
#define MSG_OSD_SCRUB2             121

#define MSG_OSD_PG_READY_TO_MERGE  122

#define MSG_OSD_PG_LEASE           133
#define MSG_OSD_PG_LEASE_ACK       134

// *** MDS ***

#define MSG_MDS_BEACON             100  // to monitor
#define MSG_MDS_PEER_REQUEST       101
#define MSG_MDS_TABLE_REQUEST      102
#define MSG_MDS_SCRUB              135

                                // 150 already in use (MSG_OSD_RECOVERY_RESERVE)

#define MSG_MDS_RESOLVE            0x200 // 0x2xx are for mdcache of mds
#define MSG_MDS_RESOLVEACK         0x201
#define MSG_MDS_CACHEREJOIN        0x202
#define MSG_MDS_DISCOVER           0x203
#define MSG_MDS_DISCOVERREPLY      0x204
#define MSG_MDS_INODEUPDATE        0x205
#define MSG_MDS_DIRUPDATE          0x206
#define MSG_MDS_CACHEEXPIRE        0x207
#define MSG_MDS_DENTRYUNLINK       0x208
#define MSG_MDS_FRAGMENTNOTIFY     0x209
#define MSG_MDS_OFFLOAD_TARGETS    0x20a
#define MSG_MDS_DENTRYLINK         0x20c
#define MSG_MDS_FINDINO            0x20d
#define MSG_MDS_FINDINOREPLY       0x20e
#define MSG_MDS_OPENINO            0x20f
#define MSG_MDS_OPENINOREPLY       0x210
#define MSG_MDS_SNAPUPDATE         0x211
#define MSG_MDS_FRAGMENTNOTIFYACK  0x212
#define MSG_MDS_LOCK               0x300 // 0x3xx are for locker of mds
#define MSG_MDS_INODEFILECAPS      0x301

// NOTE: the migrator range jumps from 0x449 to 0x450 (0x44a-0x44f are
// unused); keep the values as they are rather than "fixing" the sequence.
#define MSG_MDS_EXPORTDIRDISCOVER     0x449 // 0x4xx are for migrator of mds
#define MSG_MDS_EXPORTDIRDISCOVERACK  0x450
#define MSG_MDS_EXPORTDIRCANCEL       0x451
#define MSG_MDS_EXPORTDIRPREP         0x452
#define MSG_MDS_EXPORTDIRPREPACK      0x453
#define MSG_MDS_EXPORTDIRWARNING      0x454
#define MSG_MDS_EXPORTDIRWARNINGACK   0x455
#define MSG_MDS_EXPORTDIR             0x456
#define MSG_MDS_EXPORTDIRACK          0x457
#define MSG_MDS_EXPORTDIRNOTIFY       0x458
#define MSG_MDS_EXPORTDIRNOTIFYACK    0x459
#define MSG_MDS_EXPORTDIRFINISH       0x460

#define MSG_MDS_EXPORTCAPS            0x470
#define MSG_MDS_EXPORTCAPSACK         0x471
#define MSG_MDS_GATHERCAPS            0x472

#define MSG_MDS_HEARTBEAT             0x500  // for mds load balancer
#define MSG_MDS_METRICS               0x501  // for mds metric aggregator
#define MSG_MDS_PING                  0x502  // for mds pinger
#define MSG_MDS_SCRUB_STATS           0x503  // for mds scrub stack

// *** generic ***
#define MSG_TIMECHECK             0x600
#define MSG_MON_HEALTH            0x601

// *** Message::encode() crcflags bits ***
// Independent bit flags; combine with '|'.
#define MSG_CRC_DATA           (1 << 0)
#define MSG_CRC_HEADER         (1 << 1)
#define MSG_CRC_ALL            (MSG_CRC_DATA | MSG_CRC_HEADER)


// Special
#define MSG_NOP                   0x607

#define MSG_MON_HEALTH_CHECKS     0x608
#define MSG_TIMECHECK2            0x609

// *** ceph-mgr <-> OSD/MDS daemons ***
#define MSG_MGR_OPEN              0x700
#define MSG_MGR_CONFIGURE         0x701
#define MSG_MGR_REPORT            0x702

// *** ceph-mgr <-> ceph-mon ***
#define MSG_MGR_BEACON            0x703

// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
#define MSG_MGR_MAP               0x704

// *** ceph-mon(MgrMonitor) -> ceph-mgr
#define MSG_MGR_DIGEST            0x705
// *** cephmgr -> ceph-mon
#define MSG_MON_MGR_REPORT        0x706
#define MSG_SERVICE_MAP           0x707

#define MSG_MGR_CLOSE             0x708
#define MSG_MGR_COMMAND           0x709
#define MSG_MGR_COMMAND_REPLY     0x70a

238// ======================================================
239
240// abstract Message class
241
7c673cae 242class Message : public RefCountedObject {
9f95a23c
TL
243public:
244#ifdef WITH_SEASTAR
245 using ConnectionRef = crimson::net::ConnectionRef;
246#else
247 using ConnectionRef = ::ConnectionRef;
248#endif // WITH_SEASTAR
249
7c673cae
FG
250protected:
251 ceph_msg_header header; // headerelope
252 ceph_msg_footer footer;
9f95a23c
TL
253 ceph::buffer::list payload; // "front" unaligned blob
254 ceph::buffer::list middle; // "middle" unaligned blob
255 ceph::buffer::list data; // data payload (page-alignment will be preserved where possible)
7c673cae
FG
256
257 /* recv_stamp is set when the Messenger starts reading the
258 * Message off the wire */
259 utime_t recv_stamp;
260 /* dispatch_stamp is set when the Messenger starts calling dispatch() on
261 * its endpoints */
262 utime_t dispatch_stamp;
263 /* throttle_stamp is the point at which we got throttle */
264 utime_t throttle_stamp;
265 /* time at which message was fully read */
266 utime_t recv_complete_stamp;
267
268 ConnectionRef connection;
269
270 uint32_t magic = 0;
271
9f95a23c 272 boost::intrusive::list_member_hook<> dispatch_q;
7c673cae
FG
273
274public:
275 // zipkin tracing
276 ZTracer::Trace trace;
9f95a23c
TL
277 void encode_trace(ceph::buffer::list &bl, uint64_t features) const;
278 void decode_trace(ceph::buffer::list::const_iterator &p, bool create = false);
7c673cae
FG
279
280 class CompletionHook : public Context {
281 protected:
282 Message *m;
283 friend class Message;
284 public:
285 explicit CompletionHook(Message *_m) : m(_m) {}
286 virtual void set_message(Message *_m) { m = _m; }
287 };
288
9f95a23c
TL
289 typedef boost::intrusive::list<Message,
290 boost::intrusive::member_hook<
291 Message,
292 boost::intrusive::list_member_hook<>,
293 &Message::dispatch_q>> Queue;
7c673cae 294
9f95a23c 295 ceph::mono_time queue_start;
7c673cae
FG
296protected:
297 CompletionHook* completion_hook = nullptr; // owned by Messenger
298
299 // release our size in bytes back to this throttler when our payload
300 // is adjusted or when we are destroyed.
11fdf7f2 301 ThrottleInterface *byte_throttler = nullptr;
7c673cae
FG
302
303 // release a count back to this throttler when we are destroyed
11fdf7f2 304 ThrottleInterface *msg_throttler = nullptr;
7c673cae
FG
305
306 // keep track of how big this message was when we reserved space in
307 // the msgr dispatch_throttler, so that we can properly release it
308 // later. this is necessary because messages can enter the dispatch
309 // queue locally (not via read_message()), and those are not
310 // currently throttled.
311 uint64_t dispatch_throttle_size = 0;
312
313 friend class Messenger;
314
315public:
316 Message() {
317 memset(&header, 0, sizeof(header));
318 memset(&footer, 0, sizeof(footer));
319 }
320 Message(int t, int version=1, int compat_version=0) {
321 memset(&header, 0, sizeof(header));
322 header.type = t;
323 header.version = version;
324 header.compat_version = compat_version;
7c673cae
FG
325 memset(&footer, 0, sizeof(footer));
326 }
327
328 Message *get() {
329 return static_cast<Message *>(RefCountedObject::get());
330 }
331
332protected:
333 ~Message() override {
334 if (byte_throttler)
335 byte_throttler->put(payload.length() + middle.length() + data.length());
336 release_message_throttle();
337 trace.event("message destructed");
338 /* call completion hooks (if any) */
339 if (completion_hook)
340 completion_hook->complete(0);
341 }
342public:
343 const ConnectionRef& get_connection() const { return connection; }
9f95a23c
TL
344 void set_connection(ConnectionRef c) {
345 connection = std::move(c);
7c673cae
FG
346 }
347 CompletionHook* get_completion_hook() { return completion_hook; }
348 void set_completion_hook(CompletionHook *hook) { completion_hook = hook; }
11fdf7f2
TL
349 void set_byte_throttler(ThrottleInterface *t) {
350 byte_throttler = t;
351 }
352 void set_message_throttler(ThrottleInterface *t) {
353 msg_throttler = t;
354 }
7c673cae
FG
355
356 void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
357 uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; }
358
359 const ceph_msg_header &get_header() const { return header; }
360 ceph_msg_header &get_header() { return header; }
361 void set_header(const ceph_msg_header &e) { header = e; }
362 void set_footer(const ceph_msg_footer &e) { footer = e; }
363 const ceph_msg_footer &get_footer() const { return footer; }
364 ceph_msg_footer &get_footer() { return footer; }
365 void set_src(const entity_name_t& src) { header.src = src; }
366
367 uint32_t get_magic() const { return magic; }
368 void set_magic(int _magic) { magic = _magic; }
369
370 /*
371 * If you use get_[data, middle, payload] you shouldn't
9f95a23c 372 * use it to change those ceph::buffer::lists unless you KNOW
7c673cae
FG
373 * there is no throttle being used. The other
374 * functions are throttling-aware as appropriate.
375 */
376
377 void clear_payload() {
378 if (byte_throttler) {
379 byte_throttler->put(payload.length() + middle.length());
380 }
381 payload.clear();
382 middle.clear();
383 }
384
385 virtual void clear_buffers() {}
386 void clear_data() {
387 if (byte_throttler)
388 byte_throttler->put(data.length());
389 data.clear();
390 clear_buffers(); // let subclass drop buffers as well
391 }
392 void release_message_throttle() {
393 if (msg_throttler)
394 msg_throttler->put();
395 msg_throttler = nullptr;
396 }
397
398 bool empty_payload() const { return payload.length() == 0; }
9f95a23c
TL
399 ceph::buffer::list& get_payload() { return payload; }
400 const ceph::buffer::list& get_payload() const { return payload; }
401 void set_payload(ceph::buffer::list& bl) {
7c673cae
FG
402 if (byte_throttler)
403 byte_throttler->put(payload.length());
f67539c2 404 payload = std::move(bl);
7c673cae
FG
405 if (byte_throttler)
406 byte_throttler->take(payload.length());
407 }
408
9f95a23c 409 void set_middle(ceph::buffer::list& bl) {
7c673cae
FG
410 if (byte_throttler)
411 byte_throttler->put(middle.length());
f67539c2 412 middle = std::move(bl);
7c673cae
FG
413 if (byte_throttler)
414 byte_throttler->take(middle.length());
415 }
9f95a23c 416 ceph::buffer::list& get_middle() { return middle; }
7c673cae 417
9f95a23c 418 void set_data(const ceph::buffer::list &bl) {
7c673cae
FG
419 if (byte_throttler)
420 byte_throttler->put(data.length());
421 data.share(bl);
422 if (byte_throttler)
423 byte_throttler->take(data.length());
424 }
425
9f95a23c
TL
426 const ceph::buffer::list& get_data() const { return data; }
427 ceph::buffer::list& get_data() { return data; }
428 void claim_data(ceph::buffer::list& bl) {
7c673cae
FG
429 if (byte_throttler)
430 byte_throttler->put(data.length());
f67539c2 431 bl = std::move(data);
7c673cae 432 }
20effc67 433 uint32_t get_data_len() const { return data.length(); }
7c673cae
FG
434
435 void set_recv_stamp(utime_t t) { recv_stamp = t; }
436 const utime_t& get_recv_stamp() const { return recv_stamp; }
437 void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
438 const utime_t& get_dispatch_stamp() const { return dispatch_stamp; }
439 void set_throttle_stamp(utime_t t) { throttle_stamp = t; }
440 const utime_t& get_throttle_stamp() const { return throttle_stamp; }
441 void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; }
442 const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; }
443
444 void calc_header_crc() {
445 header.crc = ceph_crc32c(0, (unsigned char*)&header,
446 sizeof(header) - sizeof(header.crc));
447 }
448 void calc_front_crc() {
449 footer.front_crc = payload.crc32c(0);
450 footer.middle_crc = middle.crc32c(0);
451 }
452 void calc_data_crc() {
453 footer.data_crc = data.crc32c(0);
454 }
455
456 virtual int get_cost() const {
457 return data.length();
458 }
459
460 // type
461 int get_type() const { return header.type; }
462 void set_type(int t) { header.type = t; }
463
464 uint64_t get_tid() const { return header.tid; }
465 void set_tid(uint64_t t) { header.tid = t; }
466
467 uint64_t get_seq() const { return header.seq; }
468 void set_seq(uint64_t s) { header.seq = s; }
469
470 unsigned get_priority() const { return header.priority; }
471 void set_priority(__s16 p) { header.priority = p; }
472
473 // source/dest
474 entity_inst_t get_source_inst() const {
475 return entity_inst_t(get_source(), get_source_addr());
476 }
477 entity_name_t get_source() const {
478 return entity_name_t(header.src);
479 }
480 entity_addr_t get_source_addr() const {
481 if (connection)
482 return connection->get_peer_addr();
483 return entity_addr_t();
484 }
11fdf7f2
TL
485 entity_addrvec_t get_source_addrs() const {
486 if (connection)
487 return connection->get_peer_addrs();
488 return entity_addrvec_t();
489 }
7c673cae
FG
490
491 // forwarded?
492 entity_inst_t get_orig_source_inst() const {
493 return get_source_inst();
494 }
495 entity_name_t get_orig_source() const {
496 return get_source();
497 }
498 entity_addr_t get_orig_source_addr() const {
499 return get_source_addr();
500 }
11fdf7f2
TL
501 entity_addrvec_t get_orig_source_addrs() const {
502 return get_source_addrs();
503 }
7c673cae
FG
504
505 // virtual bits
506 virtual void decode_payload() = 0;
507 virtual void encode_payload(uint64_t features) = 0;
11fdf7f2 508 virtual std::string_view get_type_name() const = 0;
9f95a23c 509 virtual void print(std::ostream& out) const {
7c673cae
FG
510 out << get_type_name() << " magic: " << magic;
511 }
512
9f95a23c 513 virtual void dump(ceph::Formatter *f) const;
7c673cae 514
9f95a23c 515 void encode(uint64_t features, int crcflags, bool skip_header_crc = false);
7c673cae 516};
7c673cae 517
9f95a23c
TL
518extern Message *decode_message(CephContext *cct,
519 int crcflags,
520 ceph_msg_header& header,
521 ceph_msg_footer& footer,
522 ceph::buffer::list& front,
523 ceph::buffer::list& middle,
524 ceph::buffer::list& data,
525 Message::ConnectionRef conn);
526inline std::ostream& operator<<(std::ostream& out, const Message& m) {
7c673cae
FG
527 m.print(out);
528 if (m.get_header().version)
529 out << " v" << m.get_header().version;
530 return out;
531}
532
9f95a23c 533extern void encode_message(Message *m, uint64_t features, ceph::buffer::list& bl);
7c673cae 534extern Message *decode_message(CephContext *cct, int crcflags,
9f95a23c 535 ceph::buffer::list::const_iterator& bl);
11fdf7f2 536
9f95a23c
TL
537/// this is a "safe" version of Message. it does not allow calling get/put
538/// methods on its derived classes. This is intended to prevent some accidental
539/// reference leaks by forcing . Instead, you must either cast the derived class to a
540/// RefCountedObject to do the get/put or detach a temporary reference.
541class SafeMessage : public Message {
11fdf7f2 542public:
9f95a23c
TL
543 using Message::Message;
544private:
545 using RefCountedObject::get;
546 using RefCountedObject::put;
11fdf7f2
TL
547};
548
9f95a23c
TL
549namespace ceph {
550template<class T, typename... Args>
551ceph::ref_t<T> make_message(Args&&... args) {
552 return {new T(std::forward<Args>(args)...), false};
553}
554}
7c673cae 555
20effc67
TL
556namespace crimson {
557template<class T, typename... Args>
558MURef<T> make_message(Args&&... args) {
559 return {new T(std::forward<Args>(args)...), TOPNSPC::common::UniquePtrDeleter{}};
560}
561}
7c673cae 562#endif