]> git.proxmox.com Git - ceph.git/blob - ceph/src/msg/Message.h
b7220e2de28aa0584958059a7f27369231a50c84
[ceph.git] / ceph / src / msg / Message.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_MESSAGE_H
16 #define CEPH_MESSAGE_H
17
18 #include <stdlib.h>
19 #include <ostream>
20
21 #include <boost/intrusive_ptr.hpp>
22 #include <boost/intrusive/list.hpp>
23 // Because intrusive_ptr clobbers our assert...
24 #include "include/assert.h"
25
26 #include "include/types.h"
27 #include "include/buffer.h"
28 #include "common/Throttle.h"
29 #include "common/zipkin_trace.h"
30 #include "msg_types.h"
31
32 #include "common/RefCountedObj.h"
33 #include "msg/Connection.h"
34
35 #include "common/debug.h"
36 #include "common/config.h"
37
// ---------------------------------------------------------------------------
// Monitor-related message type IDs, listed in ascending numeric order.
// These values are stored in ceph_msg_header::type; never renumber them.
// ---------------------------------------------------------------------------

#define MSG_PAXOS               40

#define MSG_FORWARD             46
#define MSG_ROUTE               47

/* monitor <-> mon admin tool */
#define MSG_MON_COMMAND         50
#define MSG_MON_COMMAND_ACK     51
#define MSG_LOG                 52
#define MSG_LOGACK              53

#define MSG_GETPOOLSTATS        58
#define MSG_GETPOOLSTATSREPLY   59

#define MSG_MON_GLOBAL_ID       60

// monitor internal
#define MSG_MON_SCRUB           64
#define MSG_MON_ELECTION        65
#define MSG_MON_PAXOS           66
#define MSG_MON_PROBE           67
#define MSG_MON_JOIN            68
#define MSG_MON_SYNC            69
61
62
// ---------------------------------------------------------------------------
// OSD message type IDs (stored in ceph_msg_header::type; never renumber).
// ---------------------------------------------------------------------------

// osd internal
#define MSG_OSD_PING                          70
#define MSG_OSD_BOOT                          71
#define MSG_OSD_FAILURE                       72
#define MSG_OSD_ALIVE                         73
#define MSG_OSD_MARK_ME_DOWN                  74
#define MSG_OSD_FULL                          75

#define MSG_OSD_SUBOP                         76
#define MSG_OSD_SUBOPREPLY                    77

#define MSG_OSD_PGTEMP                        78
#define MSG_OSD_BEACON                        79

// PG messages (82 is not defined in this header)
#define MSG_OSD_PG_NOTIFY                     80
#define MSG_OSD_PG_QUERY                      81
#define MSG_OSD_PG_LOG                        83
#define MSG_OSD_PG_REMOVE                     84
#define MSG_OSD_PG_INFO                       85
#define MSG_OSD_PG_TRIM                       86

#define MSG_PGSTATS                           87
#define MSG_PGSTATSACK                        88

#define MSG_OSD_PG_CREATE                     89
#define MSG_REMOVE_SNAPS                      90

#define MSG_OSD_SCRUB                         91
#define MSG_OSD_SCRUB_RESERVE                 92  // previous PG_MISSING
#define MSG_OSD_REP_SCRUB                     93

#define MSG_OSD_PG_SCAN                       94
#define MSG_OSD_PG_BACKFILL                   95
#define MSG_OSD_PG_BACKFILL_REMOVE            96

#define MSG_COMMAND                           97
#define MSG_COMMAND_REPLY                     98

// reservations -- note RECOVERY_RESERVE is out of sequence at 150
#define MSG_OSD_BACKFILL_RESERVE              99
#define MSG_OSD_RECOVERY_RESERVE              150

#define MSG_OSD_PG_PUSH                       105
#define MSG_OSD_PG_PULL                       106
#define MSG_OSD_PG_PUSH_REPLY                 107

#define MSG_OSD_EC_WRITE                      108
#define MSG_OSD_EC_WRITE_REPLY                109
#define MSG_OSD_EC_READ                       110
#define MSG_OSD_EC_READ_REPLY                 111

#define MSG_OSD_REPOP                         112
#define MSG_OSD_REPOPREPLY                    113
#define MSG_OSD_PG_UPDATE_LOG_MISSING         114
#define MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY   115

#define MSG_OSD_PG_CREATED                    116
#define MSG_OSD_REP_SCRUBMAP                  117
121
// ---------------------------------------------------------------------------
// *** MDS *** message type IDs (stored in ceph_msg_header::type; never
// renumber).
// ---------------------------------------------------------------------------

#define MSG_MDS_BEACON                 100    // to monitor
#define MSG_MDS_SLAVE_REQUEST          101
#define MSG_MDS_TABLE_REQUEST          102

// 150 already in use (MSG_OSD_RECOVERY_RESERVE) -- do not reuse it here.

// 0x20b is not defined in this header.
#define MSG_MDS_RESOLVE                0x200
#define MSG_MDS_RESOLVEACK             0x201
#define MSG_MDS_CACHEREJOIN            0x202
#define MSG_MDS_DISCOVER               0x203
#define MSG_MDS_DISCOVERREPLY          0x204
#define MSG_MDS_INODEUPDATE            0x205
#define MSG_MDS_DIRUPDATE              0x206
#define MSG_MDS_CACHEEXPIRE            0x207
#define MSG_MDS_DENTRYUNLINK           0x208
#define MSG_MDS_FRAGMENTNOTIFY         0x209
#define MSG_MDS_OFFLOAD_TARGETS        0x20a
#define MSG_MDS_DENTRYLINK             0x20c
#define MSG_MDS_FINDINO                0x20d
#define MSG_MDS_FINDINOREPLY           0x20e
#define MSG_MDS_OPENINO                0x20f
#define MSG_MDS_OPENINOREPLY           0x210

#define MSG_MDS_LOCK                   0x300
#define MSG_MDS_INODEFILECAPS          0x301

// NOTE: these are hexadecimal, so 0x449 is followed by 0x450 with
// 0x44a..0x44f left unused.  The values are fixed; do not "fix" the gap.
#define MSG_MDS_EXPORTDIRDISCOVER      0x449
#define MSG_MDS_EXPORTDIRDISCOVERACK   0x450
#define MSG_MDS_EXPORTDIRCANCEL        0x451
#define MSG_MDS_EXPORTDIRPREP          0x452
#define MSG_MDS_EXPORTDIRPREPACK       0x453
#define MSG_MDS_EXPORTDIRWARNING       0x454
#define MSG_MDS_EXPORTDIRWARNINGACK    0x455
#define MSG_MDS_EXPORTDIR              0x456
#define MSG_MDS_EXPORTDIRACK           0x457
#define MSG_MDS_EXPORTDIRNOTIFY        0x458
#define MSG_MDS_EXPORTDIRNOTIFYACK     0x459
#define MSG_MDS_EXPORTDIRFINISH        0x460

#define MSG_MDS_EXPORTCAPS             0x470
#define MSG_MDS_EXPORTCAPSACK          0x471
#define MSG_MDS_GATHERCAPS             0x472

#define MSG_MDS_HEARTBEAT              0x500  // for mds load balancer
168
// ---------------------------------------------------------------------------
// Generic, Xio and ceph-mgr message type IDs (0x600..0x706), followed by the
// crcflags bits accepted by Message::encode().
// ---------------------------------------------------------------------------

// *** generic ***
#define MSG_TIMECHECK        0x600
#define MSG_MON_HEALTH       0x601

// Xio Testing
#define MSG_DATA_PING        0x602
// Xio intends to define messages 0x603..0x606

// Special
#define MSG_NOP              0x607

// *** ceph-mgr <-> OSD/MDS daemons ***
#define MSG_MGR_OPEN         0x700
#define MSG_MGR_CONFIGURE    0x701
#define MSG_MGR_REPORT       0x702

// *** ceph-mgr <-> ceph-mon ***
#define MSG_MGR_BEACON       0x703

// *** ceph-mon(MgrMonitor) -> OSD/MDS daemons ***
#define MSG_MGR_MAP          0x704

// *** ceph-mon(MgrMonitor) -> ceph-mgr ***
#define MSG_MGR_DIGEST       0x705

// *** ceph-mgr -> ceph-mon ***
#define MSG_MON_MGR_REPORT   0x706

// *** Message::encode() crcflags bits ***
// These are bit flags, not message type IDs.
#define MSG_CRC_DATA         (1 << 0)
#define MSG_CRC_HEADER       (1 << 1)
#define MSG_CRC_ALL          (MSG_CRC_DATA | MSG_CRC_HEADER)
201
202 // ======================================================
203
204 // abstract Message class
205
206 namespace bi = boost::intrusive;
207
// XioMessenger conditional trace flags.
// Bit values for Message::magic (bit 0x0001 is not defined here).
#define MSG_MAGIC_XIO            0x0002
#define MSG_MAGIC_TRACE_XCON     0x0004
#define MSG_MAGIC_TRACE_DTOR     0x0008
#define MSG_MAGIC_TRACE_HDR      0x0010
#define MSG_MAGIC_TRACE_XIO      0x0020
#define MSG_MAGIC_TRACE_XMSGR    0x0040
#define MSG_MAGIC_TRACE_CTR      0x0080

// XioMessenger diagnostic "ping pong" flag (resend msg when send completes)
#define MSG_MAGIC_REDUPE         0x0100
219
220 class Message : public RefCountedObject {
221 protected:
222 ceph_msg_header header; // headerelope
223 ceph_msg_footer footer;
224 bufferlist payload; // "front" unaligned blob
225 bufferlist middle; // "middle" unaligned blob
226 bufferlist data; // data payload (page-alignment will be preserved where possible)
227
228 /* recv_stamp is set when the Messenger starts reading the
229 * Message off the wire */
230 utime_t recv_stamp;
231 /* dispatch_stamp is set when the Messenger starts calling dispatch() on
232 * its endpoints */
233 utime_t dispatch_stamp;
234 /* throttle_stamp is the point at which we got throttle */
235 utime_t throttle_stamp;
236 /* time at which message was fully read */
237 utime_t recv_complete_stamp;
238
239 ConnectionRef connection;
240
241 uint32_t magic = 0;
242
243 bi::list_member_hook<> dispatch_q;
244
245 public:
246 // zipkin tracing
247 ZTracer::Trace trace;
248 void encode_trace(bufferlist &bl, uint64_t features) const;
249 void decode_trace(bufferlist::iterator &p, bool create = false);
250
251 class CompletionHook : public Context {
252 protected:
253 Message *m;
254 friend class Message;
255 public:
256 explicit CompletionHook(Message *_m) : m(_m) {}
257 virtual void set_message(Message *_m) { m = _m; }
258 };
259
260 typedef bi::list< Message,
261 bi::member_hook< Message,
262 bi::list_member_hook<>,
263 &Message::dispatch_q > > Queue;
264
265 protected:
266 CompletionHook* completion_hook = nullptr; // owned by Messenger
267
268 // release our size in bytes back to this throttler when our payload
269 // is adjusted or when we are destroyed.
270 Throttle *byte_throttler = nullptr;
271
272 // release a count back to this throttler when we are destroyed
273 Throttle *msg_throttler = nullptr;
274
275 // keep track of how big this message was when we reserved space in
276 // the msgr dispatch_throttler, so that we can properly release it
277 // later. this is necessary because messages can enter the dispatch
278 // queue locally (not via read_message()), and those are not
279 // currently throttled.
280 uint64_t dispatch_throttle_size = 0;
281
282 friend class Messenger;
283
284 public:
285 Message() {
286 memset(&header, 0, sizeof(header));
287 memset(&footer, 0, sizeof(footer));
288 }
289 Message(int t, int version=1, int compat_version=0) {
290 memset(&header, 0, sizeof(header));
291 header.type = t;
292 header.version = version;
293 header.compat_version = compat_version;
294 header.priority = 0; // undef
295 header.data_off = 0;
296 memset(&footer, 0, sizeof(footer));
297 }
298
299 Message *get() {
300 return static_cast<Message *>(RefCountedObject::get());
301 }
302
303 protected:
304 ~Message() override {
305 if (byte_throttler)
306 byte_throttler->put(payload.length() + middle.length() + data.length());
307 release_message_throttle();
308 trace.event("message destructed");
309 /* call completion hooks (if any) */
310 if (completion_hook)
311 completion_hook->complete(0);
312 }
313 public:
314 const ConnectionRef& get_connection() const { return connection; }
315 void set_connection(const ConnectionRef& c) {
316 connection = c;
317 }
318 CompletionHook* get_completion_hook() { return completion_hook; }
319 void set_completion_hook(CompletionHook *hook) { completion_hook = hook; }
320 void set_byte_throttler(Throttle *t) { byte_throttler = t; }
321 Throttle *get_byte_throttler() { return byte_throttler; }
322 void set_message_throttler(Throttle *t) { msg_throttler = t; }
323 Throttle *get_message_throttler() { return msg_throttler; }
324
325 void set_dispatch_throttle_size(uint64_t s) { dispatch_throttle_size = s; }
326 uint64_t get_dispatch_throttle_size() const { return dispatch_throttle_size; }
327
328 const ceph_msg_header &get_header() const { return header; }
329 ceph_msg_header &get_header() { return header; }
330 void set_header(const ceph_msg_header &e) { header = e; }
331 void set_footer(const ceph_msg_footer &e) { footer = e; }
332 const ceph_msg_footer &get_footer() const { return footer; }
333 ceph_msg_footer &get_footer() { return footer; }
334 void set_src(const entity_name_t& src) { header.src = src; }
335
336 uint32_t get_magic() const { return magic; }
337 void set_magic(int _magic) { magic = _magic; }
338
339 /*
340 * If you use get_[data, middle, payload] you shouldn't
341 * use it to change those bufferlists unless you KNOW
342 * there is no throttle being used. The other
343 * functions are throttling-aware as appropriate.
344 */
345
346 void clear_payload() {
347 if (byte_throttler) {
348 byte_throttler->put(payload.length() + middle.length());
349 }
350 payload.clear();
351 middle.clear();
352 }
353
354 virtual void clear_buffers() {}
355 void clear_data() {
356 if (byte_throttler)
357 byte_throttler->put(data.length());
358 data.clear();
359 clear_buffers(); // let subclass drop buffers as well
360 }
361 void release_message_throttle() {
362 if (msg_throttler)
363 msg_throttler->put();
364 msg_throttler = nullptr;
365 }
366
367 bool empty_payload() const { return payload.length() == 0; }
368 bufferlist& get_payload() { return payload; }
369 void set_payload(bufferlist& bl) {
370 if (byte_throttler)
371 byte_throttler->put(payload.length());
372 payload.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
373 if (byte_throttler)
374 byte_throttler->take(payload.length());
375 }
376
377 void set_middle(bufferlist& bl) {
378 if (byte_throttler)
379 byte_throttler->put(middle.length());
380 middle.claim(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE);
381 if (byte_throttler)
382 byte_throttler->take(middle.length());
383 }
384 bufferlist& get_middle() { return middle; }
385
386 void set_data(const bufferlist &bl) {
387 if (byte_throttler)
388 byte_throttler->put(data.length());
389 data.share(bl);
390 if (byte_throttler)
391 byte_throttler->take(data.length());
392 }
393
394 const bufferlist& get_data() const { return data; }
395 bufferlist& get_data() { return data; }
396 void claim_data(bufferlist& bl,
397 unsigned int flags = buffer::list::CLAIM_DEFAULT) {
398 if (byte_throttler)
399 byte_throttler->put(data.length());
400 bl.claim(data, flags);
401 }
402 off_t get_data_len() const { return data.length(); }
403
404 void set_recv_stamp(utime_t t) { recv_stamp = t; }
405 const utime_t& get_recv_stamp() const { return recv_stamp; }
406 void set_dispatch_stamp(utime_t t) { dispatch_stamp = t; }
407 const utime_t& get_dispatch_stamp() const { return dispatch_stamp; }
408 void set_throttle_stamp(utime_t t) { throttle_stamp = t; }
409 const utime_t& get_throttle_stamp() const { return throttle_stamp; }
410 void set_recv_complete_stamp(utime_t t) { recv_complete_stamp = t; }
411 const utime_t& get_recv_complete_stamp() const { return recv_complete_stamp; }
412
413 void calc_header_crc() {
414 header.crc = ceph_crc32c(0, (unsigned char*)&header,
415 sizeof(header) - sizeof(header.crc));
416 }
417 void calc_front_crc() {
418 footer.front_crc = payload.crc32c(0);
419 footer.middle_crc = middle.crc32c(0);
420 }
421 void calc_data_crc() {
422 footer.data_crc = data.crc32c(0);
423 }
424
425 virtual int get_cost() const {
426 return data.length();
427 }
428
429 // type
430 int get_type() const { return header.type; }
431 void set_type(int t) { header.type = t; }
432
433 uint64_t get_tid() const { return header.tid; }
434 void set_tid(uint64_t t) { header.tid = t; }
435
436 uint64_t get_seq() const { return header.seq; }
437 void set_seq(uint64_t s) { header.seq = s; }
438
439 unsigned get_priority() const { return header.priority; }
440 void set_priority(__s16 p) { header.priority = p; }
441
442 // source/dest
443 entity_inst_t get_source_inst() const {
444 return entity_inst_t(get_source(), get_source_addr());
445 }
446 entity_name_t get_source() const {
447 return entity_name_t(header.src);
448 }
449 entity_addr_t get_source_addr() const {
450 if (connection)
451 return connection->get_peer_addr();
452 return entity_addr_t();
453 }
454
455 // forwarded?
456 entity_inst_t get_orig_source_inst() const {
457 return get_source_inst();
458 }
459 entity_name_t get_orig_source() const {
460 return get_source();
461 }
462 entity_addr_t get_orig_source_addr() const {
463 return get_source_addr();
464 }
465
466 // virtual bits
467 virtual void decode_payload() = 0;
468 virtual void encode_payload(uint64_t features) = 0;
469 virtual const char *get_type_name() const = 0;
470 virtual void print(ostream& out) const {
471 out << get_type_name() << " magic: " << magic;
472 }
473
474 virtual void dump(Formatter *f) const;
475
476 void encode(uint64_t features, int crcflags);
477 };
478 typedef boost::intrusive_ptr<Message> MessageRef;
479
480 extern Message *decode_message(CephContext *cct, int crcflags,
481 ceph_msg_header &header,
482 ceph_msg_footer& footer, bufferlist& front,
483 bufferlist& middle, bufferlist& data,
484 Connection* conn);
485 inline ostream& operator<<(ostream& out, const Message& m) {
486 m.print(out);
487 if (m.get_header().version)
488 out << " v" << m.get_header().version;
489 return out;
490 }
491
492 extern void encode_message(Message *m, uint64_t features, bufferlist& bl);
493 extern Message *decode_message(CephContext *cct, int crcflags,
494 bufferlist::iterator& bl);
495
496 #endif