1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
21 #include "MOSDFastDispatchOp.h"
22 #include "include/ceph_features.h"
23 #include "common/hobject.h"
29 * op - OSD_OP_DELETE, etc.
37 class MOSDOp final
: public MOSDFastDispatchOp
{
// Newest wire-format version. The v8 encode path stores it in
// header.version and decode_payload() matches against it first.
39 static constexpr int HEAD_VERSION
= 8;
// Compat version handed to the MOSDFastDispatchOp base constructor.
40 static constexpr int COMPAT_VERSION
= 3;
// Client incarnation; encoded in every wire layout and mirrored into
// reqid.inc by the legacy decode paths.
43 uint32_t client_inc
= 0;
// OSD map epoch carried by the message (print() shows it as "e<epoch>").
44 __u32 osdmap_epoch
= 0;
47 int32_t retry_attempt
= -1; // 0 is first attempt. -1 if we don't know.
// Read cursor into payload: positioned by decode_payload() and reused by
// finish_decode() to continue where the partial decode stopped.
51 ceph::buffer::list::const_iterator p
;
52 // Decoding flags. Decoding is only needed for messages caught by pipe reader.
53 // Transition from true -> false without locks being held
54 // Can never see final_decode_needed == false and partial_decode_needed == true
55 std::atomic
<bool> partial_decode_needed
;
56 std::atomic
<bool> final_decode_needed
;
// Snapshot ids; encoded right after snap_seq in every wire layout.
62 std::vector
<snapid_t
> snaps
;
66 osd_reqid_t reqid
; // reqid explicitly set by sender
71 ceph_tid_t
get_client_tid() { return header
.tid
; }
72 void set_snapid(const snapid_t
& s
) {
75 void set_snaps(const std::vector
<snapid_t
>& i
) {
78 void set_snap_seq(const snapid_t
& s
) { snap_seq
= s
; }
79 void set_reqid(const osd_reqid_t rid
) {
82 void set_spg(spg_t p
) {
86 // Fields decoded in partial decoding
88 ceph_assert(!partial_decode_needed
);
// Precondition (asserted): the partial decode stage has already completed.
91 spg_t
get_spg() const override
{
92 ceph_assert(!partial_decode_needed
);
// Builds a pg_t from the object's hash (hobj.get_hash()) and the pool of
// pgid.pgid. Precondition (asserted): partial decode has completed.
95 pg_t
get_raw_pg() const {
96 ceph_assert(!partial_decode_needed
);
97 return pg_t(hobj
.get_hash(), pgid
.pgid
.pool());
// Precondition (asserted): the partial decode stage has already completed.
99 epoch_t
get_map_epoch() const override
{
100 ceph_assert(!partial_decode_needed
);
// Precondition (asserted): the partial decode stage has already completed.
103 int get_flags() const {
104 ceph_assert(!partial_decode_needed
);
// Request id accessor. Precondition (asserted): partial decode has completed.
// A reqid whose name is not the default entity_name_t() or whose tid is
// non-zero is tested for first. Once the final decode has also run,
// reqid.inc is asserted to equal client_inc, and a reqid is constructed
// starting from get_orig_source().
// NOTE(review): the lines joining these pieces are not visible here;
// confirm the exact branch structure against the full file.
107 osd_reqid_t
get_reqid() const {
108 ceph_assert(!partial_decode_needed
);
109 if (reqid
.name
!= entity_name_t() || reqid
.tid
!= 0) {
112 if (!final_decode_needed
)
113 ceph_assert(reqid
.inc
== (int32_t)client_inc
); // decode() should have done this
114 return osd_reqid_t(get_orig_source(),
120 // Fields decoded in final decoding
// Precondition (asserted): the final decode stage has already completed.
121 int get_client_inc() const {
122 ceph_assert(!final_decode_needed
);
// Precondition (asserted): the final decode stage has already completed.
125 utime_t
get_mtime() const {
126 ceph_assert(!final_decode_needed
);
// Derives an object_locator_t from hobj.
// Precondition (asserted): the final decode stage has completed.
// When the object name is empty the locator is built explicitly from
// hobj's pool, namespace and hash; when that branch is not taken it is
// built from hobj itself.
129 object_locator_t
get_object_locator() const {
130 ceph_assert(!final_decode_needed
);
131 if (hobj
.oid
.name
.empty())
132 return object_locator_t(hobj
.pool
, hobj
.nspace
, hobj
.get_hash());
134 return object_locator_t(hobj
);
// Precondition (asserted): the final decode stage has already completed.
136 const object_t
& get_oid() const {
137 ceph_assert(!final_decode_needed
);
140 const hobject_t
&get_hobj() const {
// Precondition (asserted): the final decode stage has already completed.
143 snapid_t
get_snapid() const {
144 ceph_assert(!final_decode_needed
);
// Precondition (asserted): the final decode stage has already completed.
147 const snapid_t
& get_snap_seq() const {
148 ceph_assert(!final_decode_needed
);
// Precondition (asserted): the final decode stage has already completed.
151 const std::vector
<snapid_t
> &get_snaps() const {
152 ceph_assert(!final_decode_needed
);
159 * 0 is the first attempt.
161 * @return retry attempt, or -1 if we don't know
// Plain read of retry_attempt; unlike the decode-gated accessors above,
// no decode-state assertion is made here.
163 int get_retry_attempt() const {
164 return retry_attempt
;
// On the visible path, returns the feature bits of the message's
// connection (get_connection()->get_features()).
// NOTE(review): earlier lines of this function are not visible here;
// confirm when a locally stored value is returned instead.
166 uint64_t get_features() const {
170 // In crimson, conn is independently maintained outside Message.
173 return get_connection()->get_features();
177 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP
, HEAD_VERSION
, COMPAT_VERSION
),
178 partial_decode_needed(true),
179 final_decode_needed(true),
180 bdata_encode(false) { }
// Constructor taking the op's identity and routing fields explicitly.
// Both decode flags start out false, so the decode-gated accessors are
// usable immediately; retry_attempt starts at -1 (unknown).
// NOTE(review): _pgid is taken by non-const reference; whether it is
// modified is not visible here.
181 MOSDOp(int inc
, ceph_tid_t tid
, const hobject_t
& ho
, spg_t
& _pgid
,
182 epoch_t _osdmap_epoch
,
183 int _flags
, uint64_t feat
)
184 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP
, HEAD_VERSION
, COMPAT_VERSION
),
186 osdmap_epoch(_osdmap_epoch
), flags(_flags
), retry_attempt(-1),
189 partial_decode_needed(false),
190 final_decode_needed(false),
192 bdata_encode(false) {
195 // also put the client_inc in reqid.inc, so that get_reqid() can
196 // be used before the full message is decoded.
203 void set_mtime(utime_t mt
) { mtime
= mt
; }
// Overload for ceph::real_time: the value is converted with
// ceph::real_clock::to_timespec() before being stored in mtime.
204 void set_mtime(ceph::real_time mt
) {
205 mtime
= ceph::real_clock::to_timespec(mt
);
// Appends one op to ops whose extent has offset `off` and length `len`.
// NOTE(review): the declaration of osd_op and the use of `o` are not
// visible here; presumably `o` is the opcode stored in the new op.
209 void add_simple_op(int o
, uint64_t off
, uint64_t len
) {
212 osd_op
.op
.extent
.offset
= off
;
213 osd_op
.op
.extent
.length
= len
;
214 ops
.push_back(osd_op
);
// Queues a CEPH_OSD_OP_WRITE with the given offset and length, moves the
// contents of `bl` into the message's data (so `bl` is left moved-from),
// and records the offset in header.data_off.
216 void write(uint64_t off
, uint64_t len
, ceph::buffer::list
& bl
) {
217 add_simple_op(CEPH_OSD_OP_WRITE
, off
, len
);
218 data
= std::move(bl
);
219 header
.data_off
= off
;
// Queues a CEPH_OSD_OP_WRITEFULL at offset 0 covering bl.length() bytes,
// then moves `bl` into the message's data. The length is read before the
// move, while `bl` still holds its contents.
221 void writefull(ceph::buffer::list
& bl
) {
222 add_simple_op(CEPH_OSD_OP_WRITEFULL
, 0, bl
.length());
223 data
= std::move(bl
);
// Queues a CEPH_OSD_OP_ZERO with the given offset and length.
226 void zero(uint64_t off
, uint64_t len
) {
227 add_simple_op(CEPH_OSD_OP_ZERO
, off
, len
);
// Queues a CEPH_OSD_OP_TRUNCATE at offset `off`; the length field is 0.
229 void truncate(uint64_t off
) {
230 add_simple_op(CEPH_OSD_OP_TRUNCATE
, off
, 0);
233 add_simple_op(CEPH_OSD_OP_DELETE
, 0, 0);
// Queues a CEPH_OSD_OP_READ with the given offset and length.
236 void read(uint64_t off
, uint64_t len
) {
237 add_simple_op(CEPH_OSD_OP_READ
, off
, len
);
240 add_simple_op(CEPH_OSD_OP_STAT
, 0, 0);
243 bool has_flag(__u32 flag
) const { return flags
& flag
; };
245 bool is_retry_attempt() const { return flags
& CEPH_OSD_FLAG_RETRY
; }
// Sets or clears CEPH_OSD_FLAG_RETRY in flags.
// NOTE(review): the condition choosing between the two statements is not
// visible here; presumably a non-zero `a` sets the flag. Confirm.
246 void set_retry_attempt(unsigned a
) {
248 flags
|= CEPH_OSD_FLAG_RETRY
;
250 flags
&= ~CEPH_OSD_FLAG_RETRY
;
// Serializes this op into payload, picking the wire layout from the
// feature bits passed in `features`:
//   - CEPH_FEATURE_OBJECTLOCATOR missing      -> oldest layout, following
//     the ceph_osd_request_head structure reproduced below
//   - CEPH_FEATURE_NEW_OSDOP_ENCODING missing -> legacy layout with
//     retry_attempt, features and a reqid appended at the end
//   - RESEND_ON_SPLIT missing                 -> reordered v7 layout
//   - otherwise                               -> v8 layout (HEAD_VERSION),
//     where pgid and the object hash are encoded as separate fields
// Before any of that, unless bdata_encode is already set, the per-op data
// is merged into `data` via OSDOp::merge_osd_op_vector_in_data().
255 void encode_payload(uint64_t features
) override
{
257 if( false == bdata_encode
) {
258 OSDOp::merge_osd_op_vector_in_data(ops
, data
);
262 if ((features
& CEPH_FEATURE_OBJECTLOCATOR
) == 0) {
263 // here is the old structure we are encoding to: //
265 struct ceph_osd_request_head
{
266 ceph_le32 client_inc
; /* client incarnation */
267 struct ceph_object_layout layout
; /* pgid */
268 ceph_le32 osdmap_epoch
; /* client's osdmap epoch */
272 struct ceph_timespec mtime
; /* for mutations only */
273 struct ceph_eversion reassert_version
; /* if we are replaying op */
275 ceph_le32 object_len
; /* length of object name */
277 ceph_le64 snapid
; /* snapid to read */
278 ceph_le64 snap_seq
; /* writer's snap context */
282 struct ceph_osd_op ops
[]; /* followed by ops[], obj, ticket, snaps */
283 } __attribute__ ((packed
));
287 encode(client_inc
, payload
);
290 encode(get_raw_pg(), payload
);
293 encode(osdmap_epoch
, payload
);
294 encode(flags
, payload
);
295 encode(mtime
, payload
);
296 encode(eversion_t(), payload
); // reassert_version
298 __u32 oid_len
= hobj
.oid
.name
.length();
299 encode(oid_len
, payload
);
300 encode(hobj
.snap
, payload
);
301 encode(snap_seq
, payload
);
302 __u32 num_snaps
= snaps
.size();
303 encode(num_snaps
, payload
);
305 //::encode(ops, payload);
306 __u16 num_ops
= ops
.size();
307 encode(num_ops
, payload
);
308 for (unsigned i
= 0; i
< ops
.size(); i
++)
309 encode(ops
[i
].op
, payload
);
311 ceph::encode_nohead(hobj
.oid
.name
, payload
);
312 ceph::encode_nohead(snaps
, payload
);
// Legacy layout: object locator and raw pg are encoded explicitly; the
// trailing fields are retry_attempt, features and a reqid.
313 } else if ((features
& CEPH_FEATURE_NEW_OSDOP_ENCODING
) == 0) {
315 encode(client_inc
, payload
);
316 encode(osdmap_epoch
, payload
);
317 encode(flags
, payload
);
318 encode(mtime
, payload
);
319 encode(eversion_t(), payload
); // reassert_version
320 encode(get_object_locator(), payload
);
321 encode(get_raw_pg(), payload
);
323 encode(hobj
.oid
, payload
);
325 __u16 num_ops
= ops
.size();
326 encode(num_ops
, payload
);
327 for (unsigned i
= 0; i
< ops
.size(); i
++)
328 encode(ops
[i
].op
, payload
);
330 encode(hobj
.snap
, payload
);
331 encode(snap_seq
, payload
);
332 encode(snaps
, payload
);
334 encode(retry_attempt
, payload
);
335 encode(features
, payload
);
336 if (reqid
.name
!= entity_name_t() || reqid
.tid
!= 0) {
337 encode(reqid
, payload
);
339 // don't include client_inc in the reqid for the legacy v6
340 // encoding or else we'll confuse older peers.
341 encode(osd_reqid_t(), payload
);
343 } else if (!HAVE_FEATURE(features
, RESEND_ON_SPLIT
)) {
344 // reordered, v7 message encoding
346 encode(get_raw_pg(), payload
);
347 encode(osdmap_epoch
, payload
);
348 encode(flags
, payload
);
349 encode(eversion_t(), payload
); // reassert_version
350 encode(reqid
, payload
);
351 encode(client_inc
, payload
);
352 encode(mtime
, payload
);
353 encode(get_object_locator(), payload
);
354 encode(hobj
.oid
, payload
);
356 __u16 num_ops
= ops
.size();
357 encode(num_ops
, payload
);
358 for (unsigned i
= 0; i
< ops
.size(); i
++)
359 encode(ops
[i
].op
, payload
);
361 encode(hobj
.snap
, payload
);
362 encode(snap_seq
, payload
);
363 encode(snaps
, payload
);
365 encode(retry_attempt
, payload
);
366 encode(features
, payload
);
368 // latest v8 encoding with hobject_t hash separate from pgid, no
370 header
.version
= HEAD_VERSION
;
372 encode(pgid
, payload
);
373 encode(hobj
.get_hash(), payload
);
374 encode(osdmap_epoch
, payload
);
375 encode(flags
, payload
);
376 encode(reqid
, payload
);
377 encode_trace(payload
, features
);
379 // -- above decoded up front; below decoded post-dispatch thread --
381 encode(client_inc
, payload
);
382 encode(mtime
, payload
);
383 encode(get_object_locator(), payload
);
384 encode(hobj
.oid
, payload
);
386 __u16 num_ops
= ops
.size();
387 encode(num_ops
, payload
);
388 for (unsigned i
= 0; i
< ops
.size(); i
++)
389 encode(ops
[i
].op
, payload
);
391 encode(hobj
.snap
, payload
);
392 encode(snap_seq
, payload
);
393 encode(snaps
, payload
);
395 encode(retry_attempt
, payload
);
396 encode(features
, payload
);
// Decodes the message payload according to header.version. Must be called
// while both decode stages are still pending (asserted). The cursor `p` is
// positioned at the start of payload and kept for finish_decode().
//   - version == HEAD_VERSION: reads pgid, the raw hash and the osdmap
//     epoch among the up-front fields
//   - version == 7: reads the raw pgid and seeds hobj's hash from its ps()
//   - version <  2: decodes the oldest layout in full and recomputes the
//     pg seed from an rjenkins hash of the object name
//   - version <  7: decodes the legacy layout in full
// The two legacy branches also clear final_decode_needed ("we did the full
// decode") and place client_inc into reqid.inc. partial_decode_needed is
// cleared at the end.
400 void decode_payload() override
{
402 ceph_assert(partial_decode_needed
&& final_decode_needed
);
403 p
= std::cbegin(payload
);
405 // Always keep here the newest version of decoding order/rule
406 if (header
.version
== HEAD_VERSION
) {
407 decode(pgid
, p
); // actual pgid
409 decode(hash
, p
); // raw hash value
411 decode(osdmap_epoch
, p
);
415 } else if (header
.version
== 7) {
416 decode(pgid
.pgid
, p
); // raw pgid
417 hobj
.set_hash(pgid
.pgid
.ps());
418 decode(osdmap_epoch
, p
);
420 eversion_t reassert_version
;
421 decode(reassert_version
, p
);
423 } else if (header
.version
< 2) {
425 decode(client_inc
, p
);
428 ceph::decode_raw(opgid
, p
);
434 decode(osdmap_epoch
, p
);
437 eversion_t reassert_version
;
438 decode(reassert_version
, p
);
442 decode(hobj
.snap
, p
);
445 decode(num_snaps
, p
);
451 for (unsigned i
= 0; i
< num_ops
; i
++)
452 decode(ops
[i
].op
, p
);
454 ceph::decode_nohead(oid_len
, hobj
.oid
.name
, p
);
455 ceph::decode_nohead(num_snaps
, snaps
, p
);
457 // recalculate pgid hash value
458 pgid
.pgid
.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS
,
459 hobj
.oid
.name
.c_str(),
460 hobj
.oid
.name
.length()));
461 hobj
.pool
= pgid
.pgid
.pool();
462 hobj
.set_hash(pgid
.pgid
.ps());
466 OSDOp::split_osd_op_vector_in_data(ops
, data
);
468 // we did the full decode
469 final_decode_needed
= false;
471 // put client_inc in reqid.inc for get_reqid()'s benefit
472 reqid
= osd_reqid_t();
473 reqid
.inc
= client_inc
;
474 } else if (header
.version
< 7) {
475 decode(client_inc
, p
);
476 decode(osdmap_epoch
, p
);
479 eversion_t reassert_version
;
480 decode(reassert_version
, p
);
482 object_locator_t oloc
;
485 if (header
.version
< 3) {
487 ceph::decode_raw(opgid
, p
);
490 decode(pgid
.pgid
, p
);
499 for (unsigned i
= 0; i
< num_ops
; i
++)
500 decode(ops
[i
].op
, p
);
502 decode(hobj
.snap
, p
);
506 if (header
.version
>= 4)
507 decode(retry_attempt
, p
);
511 if (header
.version
>= 5)
516 if (header
.version
>= 6)
519 reqid
= osd_reqid_t();
521 hobj
.pool
= pgid
.pgid
.pool();
522 hobj
.set_key(oloc
.key
);
523 hobj
.nspace
= oloc
.nspace
;
524 hobj
.set_hash(pgid
.pgid
.ps());
526 OSDOp::split_osd_op_vector_in_data(ops
, data
);
528 // we did the full decode
529 final_decode_needed
= false;
531 // put client_inc in reqid.inc for get_reqid()'s benefit
532 if (reqid
.name
== entity_name_t() && reqid
.tid
== 0)
533 reqid
.inc
= client_inc
;
536 partial_decode_needed
= false;
// Completes the decoding that decode_payload() deferred.
// Precondition (asserted): the partial decode has already run.
// Returns false straight away when the message is already fully decoded.
// Otherwise header.version must be >= 7 (asserted) and decoding continues
// from the saved cursor `p`: client_inc, the ops, hobj.snap and
// retry_attempt are read, hobj's pool/key/namespace are filled in from
// pgid and the decoded object locator, the op data is split back out of
// `data`, and final_decode_needed is cleared.
539 bool finish_decode() {
541 ceph_assert(!partial_decode_needed
); // partial decoding required
542 if (!final_decode_needed
)
543 return false; // Message is already final decoded
544 ceph_assert(header
.version
>= 7);
546 decode(client_inc
, p
);
548 object_locator_t oloc
;
555 for (unsigned i
= 0; i
< num_ops
; i
++)
556 decode(ops
[i
].op
, p
);
558 decode(hobj
.snap
, p
);
562 decode(retry_attempt
, p
);
566 hobj
.pool
= pgid
.pgid
.pool();
567 hobj
.set_key(oloc
.key
);
568 hobj
.nspace
= oloc
.nspace
;
570 OSDOp::split_osd_op_vector_in_data(ops
, data
);
572 final_decode_needed
= false;
// Calls OSDOp::clear_data() on the ops and resets bdata_encode to false,
// so a later encode_payload() merges the op data again.
576 void clear_buffers() override
{
577 OSDOp::clear_data(ops
);
578 bdata_encode
= false;
581 std::string_view
get_type_name() const override
{ return "osd_op"; }
// Writes a one-line description of the op to `out`, limited to what has
// been decoded so far: the reqid needs the partial decode, the snap
// context and the " RETRY=<n>" suffix need the final decode.
// The raw pg is printed with an " (undecoded)" marker, presumably on the
// branch where the final decode is still pending. Flags and the osdmap
// epoch ("e<epoch>") follow.
// NOTE(review): the else/closing lines are not visible here; confirm the
// exact nesting against the full file.
582 void print(std::ostream
& out
) const override
{
584 if (!partial_decode_needed
) {
585 out
<< get_reqid() << ' ';
587 if (!final_decode_needed
) {
591 << " snapc " << get_snap_seq() << "=" << snaps
;
592 if (is_retry_attempt())
593 out
<< " RETRY=" << get_retry_attempt();
595 out
<< " " << get_raw_pg() << " (undecoded)";
597 out
<< " " << ceph_osd_flag_string(get_flags());
598 out
<< " e" << osdmap_epoch
;
// Befriends the ceph::make_message factory template.
// NOTE(review): presumably so the factory can reach non-public
// constructors; their access level is not visible here.
604 template<class T
, typename
... Args
>
605 friend boost::intrusive_ptr
<T
> ceph::make_message(Args
&&... args
);
// Public name of the message type: the _mosdop::MOSDOp template
// instantiated with std::vector<OSDOp>.
609 using MOSDOp
= _mosdop::MOSDOp
<std::vector
<OSDOp
>>;