// Retrieved from git.proxmox.com (ceph.git): ceph/src/messages/MOSDOp.h
// Commit message: "update sources to v12.1.1"
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
14
15
16 #ifndef CEPH_MOSDOP_H
17 #define CEPH_MOSDOP_H
18
19 #include "MOSDFastDispatchOp.h"
20 #include "include/ceph_features.h"
21 #include "common/hobject.h"
22 #include <atomic>
23
/*
 * OSD op
 *
 * oid - object id
 * op  - OSD operation code (CEPH_OSD_OP_DELETE, etc.)
 *
 */
31
32 class OSD;
33
34 class MOSDOp : public MOSDFastDispatchOp {
35
36 static const int HEAD_VERSION = 8;
37 static const int COMPAT_VERSION = 3;
38
39 private:
40 uint32_t client_inc;
41 __u32 osdmap_epoch;
42 __u32 flags;
43 utime_t mtime;
44 int32_t retry_attempt; // 0 is first attempt. -1 if we don't know.
45
46 hobject_t hobj;
47 spg_t pgid;
48 bufferlist::iterator p;
49 // Decoding flags. Decoding is only needed for messages catched by pipe reader.
50 // Transition from true -> false without locks being held
51 // Can never see final_decode_needed == false and partial_decode_needed == true
52 atomic<bool> partial_decode_needed;
53 atomic<bool> final_decode_needed;
54 //
55 public:
56 vector<OSDOp> ops;
57 private:
58 snapid_t snap_seq;
59 vector<snapid_t> snaps;
60
61 uint64_t features;
62
63 osd_reqid_t reqid; // reqid explicitly set by sender
64
65 public:
66 friend class MOSDOpReply;
67
68 ceph_tid_t get_client_tid() { return header.tid; }
69 void set_snapid(const snapid_t& s) {
70 hobj.snap = s;
71 }
72 void set_snaps(const vector<snapid_t>& i) {
73 snaps = i;
74 }
75 void set_snap_seq(const snapid_t& s) { snap_seq = s; }
76 void set_reqid(const osd_reqid_t rid) {
77 reqid = rid;
78 }
79 void set_spg(spg_t p) {
80 pgid = p;
81 }
82
83 // Fields decoded in partial decoding
84 pg_t get_pg() const {
85 assert(!partial_decode_needed);
86 return pgid.pgid;
87 }
88 spg_t get_spg() const override {
89 assert(!partial_decode_needed);
90 return pgid;
91 }
92 pg_t get_raw_pg() const {
93 assert(!partial_decode_needed);
94 return pg_t(hobj.get_hash(), pgid.pgid.pool());
95 }
96 epoch_t get_map_epoch() const override {
97 assert(!partial_decode_needed);
98 return osdmap_epoch;
99 }
100 int get_flags() const {
101 assert(!partial_decode_needed);
102 return flags;
103 }
104 osd_reqid_t get_reqid() const {
105 assert(!partial_decode_needed);
106 if (reqid.name != entity_name_t() || reqid.tid != 0) {
107 return reqid;
108 } else {
109 if (!final_decode_needed)
110 assert(reqid.inc == (int32_t)client_inc); // decode() should have done this
111 return osd_reqid_t(get_orig_source(),
112 reqid.inc,
113 header.tid);
114 }
115 }
116
117 // Fields decoded in final decoding
118 int get_client_inc() const {
119 assert(!final_decode_needed);
120 return client_inc;
121 }
122 utime_t get_mtime() const {
123 assert(!final_decode_needed);
124 return mtime;
125 }
126 object_locator_t get_object_locator() const {
127 assert(!final_decode_needed);
128 if (hobj.oid.name.empty())
129 return object_locator_t(hobj.pool, hobj.nspace, hobj.get_hash());
130 else
131 return object_locator_t(hobj);
132 }
133 const object_t& get_oid() const {
134 assert(!final_decode_needed);
135 return hobj.oid;
136 }
137 const hobject_t &get_hobj() const {
138 return hobj;
139 }
140 snapid_t get_snapid() const {
141 assert(!final_decode_needed);
142 return hobj.snap;
143 }
144 const snapid_t& get_snap_seq() const {
145 assert(!final_decode_needed);
146 return snap_seq;
147 }
148 const vector<snapid_t> &get_snaps() const {
149 assert(!final_decode_needed);
150 return snaps;
151 }
152
153 /**
154 * get retry attempt
155 *
156 * 0 is the first attempt.
157 *
158 * @return retry attempt, or -1 if we don't know
159 */
160 int get_retry_attempt() const {
161 return retry_attempt;
162 }
163 uint64_t get_features() const {
164 if (features)
165 return features;
166 return get_connection()->get_features();
167 }
168
169 MOSDOp()
170 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
171 partial_decode_needed(true),
172 final_decode_needed(true) { }
173 MOSDOp(int inc, long tid, const hobject_t& ho, spg_t& _pgid,
174 epoch_t _osdmap_epoch,
175 int _flags, uint64_t feat)
176 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP, HEAD_VERSION, COMPAT_VERSION),
177 client_inc(inc),
178 osdmap_epoch(_osdmap_epoch), flags(_flags), retry_attempt(-1),
179 hobj(ho),
180 pgid(_pgid),
181 partial_decode_needed(false),
182 final_decode_needed(false),
183 features(feat) {
184 set_tid(tid);
185
186 // also put the client_inc in reqid.inc, so that get_reqid() can
187 // be used before the full message is decoded.
188 reqid.inc = inc;
189 }
190 private:
191 ~MOSDOp() override {}
192
193 public:
194 void set_mtime(utime_t mt) { mtime = mt; }
195 void set_mtime(ceph::real_time mt) {
196 mtime = ceph::real_clock::to_timespec(mt);
197 }
198
199 // ops
200 void add_simple_op(int o, uint64_t off, uint64_t len) {
201 OSDOp osd_op;
202 osd_op.op.op = o;
203 osd_op.op.extent.offset = off;
204 osd_op.op.extent.length = len;
205 ops.push_back(osd_op);
206 }
207 void write(uint64_t off, uint64_t len, bufferlist& bl) {
208 add_simple_op(CEPH_OSD_OP_WRITE, off, len);
209 data.claim(bl);
210 header.data_off = off;
211 }
212 void writefull(bufferlist& bl) {
213 add_simple_op(CEPH_OSD_OP_WRITEFULL, 0, bl.length());
214 data.claim(bl);
215 header.data_off = 0;
216 }
217 void zero(uint64_t off, uint64_t len) {
218 add_simple_op(CEPH_OSD_OP_ZERO, off, len);
219 }
220 void truncate(uint64_t off) {
221 add_simple_op(CEPH_OSD_OP_TRUNCATE, off, 0);
222 }
223 void remove() {
224 add_simple_op(CEPH_OSD_OP_DELETE, 0, 0);
225 }
226
227 void read(uint64_t off, uint64_t len) {
228 add_simple_op(CEPH_OSD_OP_READ, off, len);
229 }
230 void stat() {
231 add_simple_op(CEPH_OSD_OP_STAT, 0, 0);
232 }
233
234 bool has_flag(__u32 flag) const { return flags & flag; };
235
236 bool is_retry_attempt() const { return flags & CEPH_OSD_FLAG_RETRY; }
237 void set_retry_attempt(unsigned a) {
238 if (a)
239 flags |= CEPH_OSD_FLAG_RETRY;
240 else
241 flags &= ~CEPH_OSD_FLAG_RETRY;
242 retry_attempt = a;
243 }
244
245 // marshalling
246 void encode_payload(uint64_t features) override {
247
248 OSDOp::merge_osd_op_vector_in_data(ops, data);
249
250 if ((features & CEPH_FEATURE_OBJECTLOCATOR) == 0) {
251 // here is the old structure we are encoding to: //
252 #if 0
253 struct ceph_osd_request_head {
254 __le32 client_inc; /* client incarnation */
255 struct ceph_object_layout layout; /* pgid */
256 __le32 osdmap_epoch; /* client's osdmap epoch */
257
258 __le32 flags;
259
260 struct ceph_timespec mtime; /* for mutations only */
261 struct ceph_eversion reassert_version; /* if we are replaying op */
262
263 __le32 object_len; /* length of object name */
264
265 __le64 snapid; /* snapid to read */
266 __le64 snap_seq; /* writer's snap context */
267 __le32 num_snaps;
268
269 __le16 num_ops;
270 struct ceph_osd_op ops[]; /* followed by ops[], obj, ticket, snaps */
271 } __attribute__ ((packed));
272 #endif
273 header.version = 1;
274
275 ::encode(client_inc, payload);
276
277 __u32 su = 0;
278 ::encode(get_raw_pg(), payload);
279 ::encode(su, payload);
280
281 ::encode(osdmap_epoch, payload);
282 ::encode(flags, payload);
283 ::encode(mtime, payload);
284 ::encode(eversion_t(), payload); // reassert_version
285
286 __u32 oid_len = hobj.oid.name.length();
287 ::encode(oid_len, payload);
288 ::encode(hobj.snap, payload);
289 ::encode(snap_seq, payload);
290 __u32 num_snaps = snaps.size();
291 ::encode(num_snaps, payload);
292
293 //::encode(ops, payload);
294 __u16 num_ops = ops.size();
295 ::encode(num_ops, payload);
296 for (unsigned i = 0; i < ops.size(); i++)
297 ::encode(ops[i].op, payload);
298
299 ::encode_nohead(hobj.oid.name, payload);
300 ::encode_nohead(snaps, payload);
301 } else if ((features & CEPH_FEATURE_NEW_OSDOP_ENCODING) == 0) {
302 header.version = 6;
303 ::encode(client_inc, payload);
304 ::encode(osdmap_epoch, payload);
305 ::encode(flags, payload);
306 ::encode(mtime, payload);
307 ::encode(eversion_t(), payload); // reassert_version
308 ::encode(get_object_locator(), payload);
309 ::encode(get_raw_pg(), payload);
310
311 ::encode(hobj.oid, payload);
312
313 __u16 num_ops = ops.size();
314 ::encode(num_ops, payload);
315 for (unsigned i = 0; i < ops.size(); i++)
316 ::encode(ops[i].op, payload);
317
318 ::encode(hobj.snap, payload);
319 ::encode(snap_seq, payload);
320 ::encode(snaps, payload);
321
322 ::encode(retry_attempt, payload);
323 ::encode(features, payload);
324 if (reqid.name != entity_name_t() || reqid.tid != 0) {
325 ::encode(reqid, payload);
326 } else {
327 // don't include client_inc in the reqid for the legacy v6
328 // encoding or else we'll confuse older peers.
329 ::encode(osd_reqid_t(), payload);
330 }
331 } else if (!HAVE_FEATURE(features, RESEND_ON_SPLIT)) {
332 // reordered, v7 message encoding
333 header.version = 7;
334 ::encode(get_raw_pg(), payload);
335 ::encode(osdmap_epoch, payload);
336 ::encode(flags, payload);
337 ::encode(eversion_t(), payload); // reassert_version
338 ::encode(reqid, payload);
339 ::encode(client_inc, payload);
340 ::encode(mtime, payload);
341 ::encode(get_object_locator(), payload);
342 ::encode(hobj.oid, payload);
343
344 __u16 num_ops = ops.size();
345 ::encode(num_ops, payload);
346 for (unsigned i = 0; i < ops.size(); i++)
347 ::encode(ops[i].op, payload);
348
349 ::encode(hobj.snap, payload);
350 ::encode(snap_seq, payload);
351 ::encode(snaps, payload);
352
353 ::encode(retry_attempt, payload);
354 ::encode(features, payload);
355 } else {
356 // latest v8 encoding with hobject_t hash separate from pgid, no
357 // reassert version
358 header.version = HEAD_VERSION;
359 ::encode(pgid, payload);
360 ::encode(hobj.get_hash(), payload);
361 ::encode(osdmap_epoch, payload);
362 ::encode(flags, payload);
363 ::encode(reqid, payload);
364 encode_trace(payload, features);
365
366 // -- above decoded up front; below decoded post-dispatch thread --
367
368 ::encode(client_inc, payload);
369 ::encode(mtime, payload);
370 ::encode(get_object_locator(), payload);
371 ::encode(hobj.oid, payload);
372
373 __u16 num_ops = ops.size();
374 ::encode(num_ops, payload);
375 for (unsigned i = 0; i < ops.size(); i++)
376 ::encode(ops[i].op, payload);
377
378 ::encode(hobj.snap, payload);
379 ::encode(snap_seq, payload);
380 ::encode(snaps, payload);
381
382 ::encode(retry_attempt, payload);
383 ::encode(features, payload);
384 }
385 }
386
387 void decode_payload() override {
388 assert(partial_decode_needed && final_decode_needed);
389 p = payload.begin();
390
391 // Always keep here the newest version of decoding order/rule
392 if (header.version == HEAD_VERSION) {
393 ::decode(pgid, p); // actual pgid
394 uint32_t hash;
395 ::decode(hash, p); // raw hash value
396 hobj.set_hash(hash);
397 ::decode(osdmap_epoch, p);
398 ::decode(flags, p);
399 ::decode(reqid, p);
400 decode_trace(p);
401 } else if (header.version == 7) {
402 ::decode(pgid.pgid, p); // raw pgid
403 hobj.set_hash(pgid.pgid.ps());
404 ::decode(osdmap_epoch, p);
405 ::decode(flags, p);
406 eversion_t reassert_version;
407 ::decode(reassert_version, p);
408 ::decode(reqid, p);
409 } else if (header.version < 2) {
410 // old decode
411 ::decode(client_inc, p);
412
413 old_pg_t opgid;
414 ::decode_raw(opgid, p);
415 pgid.pgid = opgid;
416
417 __u32 su;
418 ::decode(su, p);
419
420 ::decode(osdmap_epoch, p);
421 ::decode(flags, p);
422 ::decode(mtime, p);
423 eversion_t reassert_version;
424 ::decode(reassert_version, p);
425
426 __u32 oid_len;
427 ::decode(oid_len, p);
428 ::decode(hobj.snap, p);
429 ::decode(snap_seq, p);
430 __u32 num_snaps;
431 ::decode(num_snaps, p);
432
433 //::decode(ops, p);
434 __u16 num_ops;
435 ::decode(num_ops, p);
436 ops.resize(num_ops);
437 for (unsigned i = 0; i < num_ops; i++)
438 ::decode(ops[i].op, p);
439
440 decode_nohead(oid_len, hobj.oid.name, p);
441 decode_nohead(num_snaps, snaps, p);
442
443 // recalculate pgid hash value
444 pgid.pgid.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS,
445 hobj.oid.name.c_str(),
446 hobj.oid.name.length()));
447 hobj.pool = pgid.pgid.pool();
448 hobj.set_hash(pgid.pgid.ps());
449
450 retry_attempt = -1;
451 features = 0;
452 OSDOp::split_osd_op_vector_in_data(ops, data);
453
454 // we did the full decode
455 final_decode_needed = false;
456
457 // put client_inc in reqid.inc for get_reqid()'s benefit
458 reqid = osd_reqid_t();
459 reqid.inc = client_inc;
460 } else if (header.version < 7) {
461 ::decode(client_inc, p);
462 ::decode(osdmap_epoch, p);
463 ::decode(flags, p);
464 ::decode(mtime, p);
465 eversion_t reassert_version;
466 ::decode(reassert_version, p);
467
468 object_locator_t oloc;
469 ::decode(oloc, p);
470
471 if (header.version < 3) {
472 old_pg_t opgid;
473 ::decode_raw(opgid, p);
474 pgid.pgid = opgid;
475 } else {
476 ::decode(pgid.pgid, p);
477 }
478
479 ::decode(hobj.oid, p);
480
481 //::decode(ops, p);
482 __u16 num_ops;
483 ::decode(num_ops, p);
484 ops.resize(num_ops);
485 for (unsigned i = 0; i < num_ops; i++)
486 ::decode(ops[i].op, p);
487
488 ::decode(hobj.snap, p);
489 ::decode(snap_seq, p);
490 ::decode(snaps, p);
491
492 if (header.version >= 4)
493 ::decode(retry_attempt, p);
494 else
495 retry_attempt = -1;
496
497 if (header.version >= 5)
498 ::decode(features, p);
499 else
500 features = 0;
501
502 if (header.version >= 6)
503 ::decode(reqid, p);
504 else
505 reqid = osd_reqid_t();
506
507 hobj.pool = pgid.pgid.pool();
508 hobj.set_key(oloc.key);
509 hobj.nspace = oloc.nspace;
510 hobj.set_hash(pgid.pgid.ps());
511
512 OSDOp::split_osd_op_vector_in_data(ops, data);
513
514 // we did the full decode
515 final_decode_needed = false;
516
517 // put client_inc in reqid.inc for get_reqid()'s benefit
518 if (reqid.name == entity_name_t() && reqid.tid == 0)
519 reqid.inc = client_inc;
520 }
521
522 partial_decode_needed = false;
523 }
524
525 bool finish_decode() {
526 assert(!partial_decode_needed); // partial decoding required
527 if (!final_decode_needed)
528 return false; // Message is already final decoded
529 assert(header.version >= 7);
530
531 ::decode(client_inc, p);
532 ::decode(mtime, p);
533 object_locator_t oloc;
534 ::decode(oloc, p);
535 ::decode(hobj.oid, p);
536
537 __u16 num_ops;
538 ::decode(num_ops, p);
539 ops.resize(num_ops);
540 for (unsigned i = 0; i < num_ops; i++)
541 ::decode(ops[i].op, p);
542
543 ::decode(hobj.snap, p);
544 ::decode(snap_seq, p);
545 ::decode(snaps, p);
546
547 ::decode(retry_attempt, p);
548
549 ::decode(features, p);
550
551 hobj.pool = pgid.pgid.pool();
552 hobj.set_key(oloc.key);
553 hobj.nspace = oloc.nspace;
554
555 OSDOp::split_osd_op_vector_in_data(ops, data);
556
557 final_decode_needed = false;
558 return true;
559 }
560
561 void clear_buffers() override {
562 OSDOp::clear_data(ops);
563 }
564
565 const char *get_type_name() const override { return "osd_op"; }
566 void print(ostream& out) const override {
567 out << "osd_op(";
568 if (!partial_decode_needed) {
569 out << get_reqid() << ' ';
570 out << pgid;
571 if (!final_decode_needed) {
572 out << ' ';
573 out << hobj
574 << " " << ops
575 << " snapc " << get_snap_seq() << "=" << snaps;
576 if (is_retry_attempt())
577 out << " RETRY=" << get_retry_attempt();
578 } else {
579 out << " " << get_raw_pg() << " (undecoded)";
580 }
581 out << " " << ceph_osd_flag_string(get_flags());
582 out << " e" << osdmap_epoch;
583 }
584 out << ")";
585 }
586 };
587
588
589 #endif