]> git.proxmox.com Git - ceph.git/blob - ceph/src/messages/MOSDOp.h
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / messages / MOSDOp.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #ifndef CEPH_MOSDOP_H
17 #define CEPH_MOSDOP_H
18
19 #include <atomic>
20
21 #include "MOSDFastDispatchOp.h"
22 #include "include/ceph_features.h"
23 #include "common/hobject.h"
24
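/*
 * OSD op
 *
 * A request sent to an OSD (message type CEPH_MSG_OSD_OP):
 *  hobj - target object (oid, snap, pool, namespace, locator key, hash)
 *  pgid - placement group the target maps to
 *  ops  - sub-operations to apply, e.g. CEPH_OSD_OP_READ,
 *         CEPH_OSD_OP_WRITE, CEPH_OSD_OP_DELETE
 */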
32
33 class MOSDOpReply;
34
35 namespace _mosdop {
36 template<typename V>
37 class MOSDOp final : public MOSDFastDispatchOp {
38 private:
39 static constexpr int HEAD_VERSION = 8;
40 static constexpr int COMPAT_VERSION = 3;
41
42 private:
43 uint32_t client_inc = 0;
44 __u32 osdmap_epoch = 0;
45 __u32 flags = 0;
46 utime_t mtime;
47 int32_t retry_attempt = -1; // 0 is first attempt. -1 if we don't know.
48
49 hobject_t hobj;
50 spg_t pgid;
51 ceph::buffer::list::const_iterator p;
52 // Decoding flags. Decoding is only needed for messages caught by pipe reader.
53 // Transition from true -> false without locks being held
54 // Can never see final_decode_needed == false and partial_decode_needed == true
55 std::atomic<bool> partial_decode_needed;
56 std::atomic<bool> final_decode_needed;
57 //
58 public:
59 V ops;
60 private:
61 snapid_t snap_seq;
62 std::vector<snapid_t> snaps;
63
64 uint64_t features;
65 bool bdata_encode;
66 osd_reqid_t reqid; // reqid explicitly set by sender
67
68 public:
69 friend MOSDOpReply;
70
71 ceph_tid_t get_client_tid() { return header.tid; }
72 void set_snapid(const snapid_t& s) {
73 hobj.snap = s;
74 }
75 void set_snaps(const std::vector<snapid_t>& i) {
76 snaps = i;
77 }
78 void set_snap_seq(const snapid_t& s) { snap_seq = s; }
79 void set_reqid(const osd_reqid_t rid) {
80 reqid = rid;
81 }
82 void set_spg(spg_t p) {
83 pgid = p;
84 }
85
86 // Fields decoded in partial decoding
87 pg_t get_pg() const {
88 ceph_assert(!partial_decode_needed);
89 return pgid.pgid;
90 }
91 spg_t get_spg() const override {
92 ceph_assert(!partial_decode_needed);
93 return pgid;
94 }
95 pg_t get_raw_pg() const {
96 ceph_assert(!partial_decode_needed);
97 return pg_t(hobj.get_hash(), pgid.pgid.pool());
98 }
99 epoch_t get_map_epoch() const override {
100 ceph_assert(!partial_decode_needed);
101 return osdmap_epoch;
102 }
103 int get_flags() const {
104 ceph_assert(!partial_decode_needed);
105 return flags;
106 }
107 osd_reqid_t get_reqid() const {
108 ceph_assert(!partial_decode_needed);
109 if (reqid.name != entity_name_t() || reqid.tid != 0) {
110 return reqid;
111 } else {
112 if (!final_decode_needed)
113 ceph_assert(reqid.inc == (int32_t)client_inc); // decode() should have done this
114 return osd_reqid_t(get_orig_source(),
115 reqid.inc,
116 header.tid);
117 }
118 }
119
120 // Fields decoded in final decoding
121 int get_client_inc() const {
122 ceph_assert(!final_decode_needed);
123 return client_inc;
124 }
125 utime_t get_mtime() const {
126 ceph_assert(!final_decode_needed);
127 return mtime;
128 }
129 object_locator_t get_object_locator() const {
130 ceph_assert(!final_decode_needed);
131 if (hobj.oid.name.empty())
132 return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash());
133 else
134 return object_locator_t(hobj);
135 }
136 const object_t& get_oid() const {
137 ceph_assert(!final_decode_needed);
138 return hobj.oid;
139 }
140 const hobject_t &get_hobj() const {
141 return hobj;
142 }
143 snapid_t get_snapid() const {
144 ceph_assert(!final_decode_needed);
145 return hobj.snap;
146 }
147 const snapid_t& get_snap_seq() const {
148 ceph_assert(!final_decode_needed);
149 return snap_seq;
150 }
151 const std::vector<snapid_t> &get_snaps() const {
152 ceph_assert(!final_decode_needed);
153 return snaps;
154 }
155
156 /**
157 * get retry attempt
158 *
159 * 0 is the first attempt.
160 *
161 * @return retry attempt, or -1 if we don't know
162 */
163 int get_retry_attempt() const {
164 return retry_attempt;
165 }
166 uint64_t get_features() const {
167 if (features)
168 return features;
169 #ifdef WITH_SEASTAR
170 // In crimson, conn is independently maintained outside Message.
171 ceph_abort();
172 #endif
173 return get_connection()->get_features();
174 }
175
176 MOSDOp()
177 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
178 partial_decode_needed(true),
179 final_decode_needed(true),
180 bdata_encode(false) { }
181 MOSDOp(int inc, ceph_tid_t tid, const hobject_t& ho, spg_t& _pgid,
182 epoch_t _osdmap_epoch,
183 int _flags, uint64_t feat)
184 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
185 client_inc(inc),
186 osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
187 hobj(ho),
188 pgid(_pgid),
189 partial_decode_needed(false),
190 final_decode_needed(false),
191 features(feat),
192 bdata_encode(false) {
193 set_tid(tid);
194
195 // also put the client_inc in reqid.inc, so that get_reqid() can
196 // be used before the full message is decoded.
197 reqid.inc = inc;
198 }
199 private:
200 ~MOSDOp() final {}
201
202 public:
203 void set_mtime(utime_t mt) { mtime = mt; }
204 void set_mtime(ceph::real_time mt) {
205 mtime = ceph::real_clock::to_timespec(mt);
206 }
207
208 // ops
209 void add_simple_op(int o, uint64_t off, uint64_t len) {
210 OSDOp osd_op;
211 osd_op.op.op = o;
212 osd_op.op.extent.offset = off;
213 osd_op.op.extent.length = len;
214 ops.push_back(osd_op);
215 }
216 void write(uint64_t off, uint64_t len, ceph::buffer::list& bl) {
217 add_simple_op(CEPH_OSD_OP_WRITE, off, len);
218 data = std::move(bl);
219 header.data_off = off;
220 }
221 void writefull(ceph::buffer::list& bl) {
222 add_simple_op(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
223 data = std::move(bl);
224 header.data_off = 0;
225 }
226 void zero(uint64_t off, uint64_t len) {
227 add_simple_op(CEPH_OSD_OP_ZERO, off, len);
228 }
229 void truncate(uint64_t off) {
230 add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
231 }
232 void remove() {
233 add_simple_op(CEPH_OSD_OP_DELETE, 0, 0);
234 }
235
236 void read(uint64_t off, uint64_t len) {
237 add_simple_op(CEPH_OSD_OP_READ, off, len);
238 }
239 void stat() {
240 add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
241 }
242
243 bool has_flag(__u32 flag) const { return flags & flag; };
244
245 bool is_retry_attempt() const { return flags & CEPH_OSD_FLAG_RETRY; }
246 void set_retry_attempt(unsigned a) {
247 if (a)
248 flags |= CEPH_OSD_FLAG_RETRY;
249 else
250 flags &= ~CEPH_OSD_FLAG_RETRY;
251 retry_attempt = a;
252 }
253
254 // marshalling
255 void encode_payload(uint64_t features) override {
256 using ceph::encode;
257 if( false == bdata_encode ) {
258 OSDOp::merge_osd_op_vector_in_data(ops, data);
259 bdata_encode = true;
260 }
261
262 if ((features & CEPH_FEATURE_OBJECTLOCATOR) == 0) {
263 // here is the old structure we are encoding to: //
264 #if 0
265 struct ceph_osd_request_head {
266 ceph_le32 client_inc; /* client incarnation */
267 struct ceph_object_layout layout; /* pgid */
268 ceph_le32 osdmap_epoch; /* client's osdmap epoch */
269
270 ceph_le32 flags;
271
272 struct ceph_timespec mtime; /* for mutations only */
273 struct ceph_eversion reassert_version; /* if we are replaying op */
274
275 ceph_le32 object_len; /* length of object name */
276
277 ceph_le64 snapid; /* snapid to read */
278 ceph_le64 snap_seq; /* writer's snap context */
279 ceph_le32 num_snaps;
280
281 ceph_le16 num_ops;
282 struct ceph_osd_op ops[]; /* followed by ops[], obj, ticket, snaps */
283 } __attribute__ ((packed));
284 #endif
285 header.version = 1;
286
287 encode(client_inc, payload);
288
289 __u32 su = 0;
290 encode(get_raw_pg(), payload);
291 encode(su, payload);
292
293 encode(osdmap_epoch, payload);
294 encode(flags, payload);
295 encode(mtime, payload);
296 encode(eversion_t(), payload); // reassert_version
297
298 __u32 oid_len = hobj.oid.name.length();
299 encode(oid_len, payload);
300 encode(hobj.snap, payload);
301 encode(snap_seq, payload);
302 __u32 num_snaps = snaps.size();
303 encode(num_snaps, payload);
304
305 //::encode(ops, payload);
306 __u16 num_ops = ops.size();
307 encode(num_ops, payload);
308 for (unsigned i = 0; i < ops.size(); i++)
309 encode(ops[i].op, payload);
310
311 ceph::encode_nohead(hobj.oid.name, payload);
312 ceph::encode_nohead(snaps, payload);
313 } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
314 header.version = 6;
315 encode(client_inc, payload);
316 encode(osdmap_epoch, payload);
317 encode(flags, payload);
318 encode(mtime, payload);
319 encode(eversion_t(), payload); // reassert_version
320 encode(get_object_locator(), payload);
321 encode(get_raw_pg(), payload);
322
323 encode(hobj.oid, payload);
324
325 __u16 num_ops = ops.size();
326 encode(num_ops, payload);
327 for (unsigned i = 0; i < ops.size(); i++)
328 encode(ops[i].op, payload);
329
330 encode(hobj.snap, payload);
331 encode(snap_seq, payload);
332 encode(snaps, payload);
333
334 encode(retry_attempt, payload);
335 encode(features, payload);
336 if (reqid.name != entity_name_t() || reqid.tid != 0) {
337 encode(reqid, payload);
338 } else {
339 // don't include client_inc in the reqid for the legacy v6
340 // encoding or else we'll confuse older peers.
341 encode(osd_reqid_t(), payload);
342 }
343 } else if (!HAVE_FEATURE(features, RESEND_ON_SPLIT)) {
344 // reordered, v7 message encoding
345 header.version = 7;
346 encode(get_raw_pg(), payload);
347 encode(osdmap_epoch, payload);
348 encode(flags, payload);
349 encode(eversion_t(), payload); // reassert_version
350 encode(reqid, payload);
351 encode(client_inc, payload);
352 encode(mtime, payload);
353 encode(get_object_locator(), payload);
354 encode(hobj.oid, payload);
355
356 __u16 num_ops = ops.size();
357 encode(num_ops, payload);
358 for (unsigned i = 0; i < ops.size(); i++)
359 encode(ops[i].op, payload);
360
361 encode(hobj.snap, payload);
362 encode(snap_seq, payload);
363 encode(snaps, payload);
364
365 encode(retry_attempt, payload);
366 encode(features, payload);
367 } else {
368 // latest v8 encoding with hobject_t hash separate from pgid, no
369 // reassert version
370 header.version = HEAD_VERSION;
371
372 encode(pgid, payload);
373 encode(hobj.get_hash(), payload);
374 encode(osdmap_epoch, payload);
375 encode(flags, payload);
376 encode(reqid, payload);
377 encode_trace(payload, features);
378
379 // -- above decoded up front; below decoded post-dispatch thread --
380
381 encode(client_inc, payload);
382 encode(mtime, payload);
383 encode(get_object_locator(), payload);
384 encode(hobj.oid, payload);
385
386 __u16 num_ops = ops.size();
387 encode(num_ops, payload);
388 for (unsigned i = 0; i < ops.size(); i++)
389 encode(ops[i].op, payload);
390
391 encode(hobj.snap, payload);
392 encode(snap_seq, payload);
393 encode(snaps, payload);
394
395 encode(retry_attempt, payload);
396 encode(features, payload);
397 }
398 }
399
400 void decode_payload() override {
401 using ceph::decode;
402 ceph_assert(partial_decode_needed && final_decode_needed);
403 p = std::cbegin(payload);
404
405 // Always keep here the newest version of decoding order/rule
406 if (header.version == HEAD_VERSION) {
407 decode(pgid, p); // actual pgid
408 uint32_t hash;
409 decode(hash, p); // raw hash value
410 hobj.set_hash(hash);
411 decode(osdmap_epoch, p);
412 decode(flags, p);
413 decode(reqid, p);
414 decode_trace(p);
415 } else if (header.version == 7) {
416 decode(pgid.pgid, p); // raw pgid
417 hobj.set_hash(pgid.pgid.ps());
418 decode(osdmap_epoch, p);
419 decode(flags, p);
420 eversion_t reassert_version;
421 decode(reassert_version, p);
422 decode(reqid, p);
423 } else if (header.version < 2) {
424 // old decode
425 decode(client_inc, p);
426
427 old_pg_t opgid;
428 ceph::decode_raw(opgid, p);
429 pgid.pgid = opgid;
430
431 __u32 su;
432 decode(su, p);
433
434 decode(osdmap_epoch, p);
435 decode(flags, p);
436 decode(mtime, p);
437 eversion_t reassert_version;
438 decode(reassert_version, p);
439
440 __u32 oid_len;
441 decode(oid_len, p);
442 decode(hobj.snap, p);
443 decode(snap_seq, p);
444 __u32 num_snaps;
445 decode(num_snaps, p);
446
447 //::decode(ops, p);
448 __u16 num_ops;
449 decode(num_ops, p);
450 ops.resize(num_ops);
451 for (unsigned i = 0; i < num_ops; i++)
452 decode(ops[i].op, p);
453
454 ceph::decode_nohead(oid_len, hobj.oid.name, p);
455 ceph::decode_nohead(num_snaps, snaps, p);
456
457 // recalculate pgid hash value
458 pgid.pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
459 hobj.oid.name.c_str(),
460 hobj.oid.name.length()));
461 hobj.pool = pgid.pgid.pool();
462 hobj.set_hash(pgid.pgid.ps());
463
464 retry_attempt = -1;
465 features = 0;
466 OSDOp::split_osd_op_vector_in_data(ops, data);
467
468 // we did the full decode
469 final_decode_needed = false;
470
471 // put client_inc in reqid.inc for get_reqid()'s benefit
472 reqid = osd_reqid_t();
473 reqid.inc = client_inc;
474 } else if (header.version < 7) {
475 decode(client_inc, p);
476 decode(osdmap_epoch, p);
477 decode(flags, p);
478 decode(mtime, p);
479 eversion_t reassert_version;
480 decode(reassert_version, p);
481
482 object_locator_t oloc;
483 decode(oloc, p);
484
485 if (header.version < 3) {
486 old_pg_t opgid;
487 ceph::decode_raw(opgid, p);
488 pgid.pgid = opgid;
489 } else {
490 decode(pgid.pgid, p);
491 }
492
493 decode(hobj.oid, p);
494
495 //::decode(ops, p);
496 __u16 num_ops;
497 decode(num_ops, p);
498 ops.resize(num_ops);
499 for (unsigned i = 0; i < num_ops; i++)
500 decode(ops[i].op, p);
501
502 decode(hobj.snap, p);
503 decode(snap_seq, p);
504 decode(snaps, p);
505
506 if (header.version >= 4)
507 decode(retry_attempt, p);
508 else
509 retry_attempt = -1;
510
511 if (header.version >= 5)
512 decode(features, p);
513 else
514 features = 0;
515
516 if (header.version >= 6)
517 decode(reqid, p);
518 else
519 reqid = osd_reqid_t();
520
521 hobj.pool = pgid.pgid.pool();
522 hobj.set_key(oloc.key);
523 hobj.nspace = oloc.nspace;
524 hobj.set_hash(pgid.pgid.ps());
525
526 OSDOp::split_osd_op_vector_in_data(ops, data);
527
528 // we did the full decode
529 final_decode_needed = false;
530
531 // put client_inc in reqid.inc for get_reqid()'s benefit
532 if (reqid.name == entity_name_t() && reqid.tid == 0)
533 reqid.inc = client_inc;
534 }
535
536 partial_decode_needed = false;
537 }
538
539 bool finish_decode() {
540 using ceph::decode;
541 ceph_assert(!partial_decode_needed); // partial decoding required
542 if (!final_decode_needed)
543 return false; // Message is already final decoded
544 ceph_assert(header.version >= 7);
545
546 decode(client_inc, p);
547 decode(mtime, p);
548 object_locator_t oloc;
549 decode(oloc, p);
550 decode(hobj.oid, p);
551
552 __u16 num_ops;
553 decode(num_ops, p);
554 ops.resize(num_ops);
555 for (unsigned i = 0; i < num_ops; i++)
556 decode(ops[i].op, p);
557
558 decode(hobj.snap, p);
559 decode(snap_seq, p);
560 decode(snaps, p);
561
562 decode(retry_attempt, p);
563
564 decode(features, p);
565
566 hobj.pool = pgid.pgid.pool();
567 hobj.set_key(oloc.key);
568 hobj.nspace = oloc.nspace;
569
570 OSDOp::split_osd_op_vector_in_data(ops, data);
571
572 final_decode_needed = false;
573 return true;
574 }
575
576 void clear_buffers() override {
577 OSDOp::clear_data(ops);
578 bdata_encode = false;
579 }
580
581 std::string_view get_type_name() const override { return "osd_op"; }
582 void print(std::ostream& out) const override {
583 out << "osd_op(";
584 if (!partial_decode_needed) {
585 out << get_reqid() << ' ';
586 out << pgid;
587 if (!final_decode_needed) {
588 out << ' ';
589 out << hobj
590 << " " << ops
591 << " snapc " << get_snap_seq() << "=" << snaps;
592 if (is_retry_attempt())
593 out << " RETRY=" << get_retry_attempt();
594 } else {
595 out << " " << get_raw_pg() << " (undecoded)";
596 }
597 out << " " << ceph_osd_flag_string(get_flags());
598 out << " e" << osdmap_epoch;
599 }
600 out << ")";
601 }
602
603 private:
604 template<class T, typename... Args>
605 friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
606 };
607 }
608
609 using MOSDOp = _mosdop::MOSDOp<std::vector<OSDOp>>;
610
611
612 #endif