1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
19 #include "MOSDFastDispatchOp.h"
20 #include "include/ceph_features.h"
21 #include "common/hobject.h"
28 * op - OSD_OP_DELETE, etc.
34 class MOSDOp
: public MOSDFastDispatchOp
{
36 static const int HEAD_VERSION
= 8;
37 static const int COMPAT_VERSION
= 3;
40 uint32_t client_inc
= 0;
41 __u32 osdmap_epoch
= 0;
44 int32_t retry_attempt
= -1; // 0 is first attempt. -1 if we don't know.
48 bufferlist::iterator p
;
49 // Decoding flags. Decoding is only needed for messages caught by the pipe reader.
50 // Transition from true -> false without locks being held
51 // Can never see final_decode_needed == false and partial_decode_needed == true
52 atomic
<bool> partial_decode_needed
;
53 atomic
<bool> final_decode_needed
;
59 vector
<snapid_t
> snaps
;
63 osd_reqid_t reqid
; // reqid explicitly set by sender
66 friend class MOSDOpReply
;
68 ceph_tid_t
get_client_tid() { return header
.tid
; }
69 void set_snapid(const snapid_t
& s
) {
72 void set_snaps(const vector
<snapid_t
>& i
) {
75 void set_snap_seq(const snapid_t
& s
) { snap_seq
= s
; }
76 void set_reqid(const osd_reqid_t rid
) {
79 void set_spg(spg_t p
) {
83 // Fields decoded in partial decoding
85 assert(!partial_decode_needed
);
88 spg_t
get_spg() const override
{
89 assert(!partial_decode_needed
);
92 pg_t
get_raw_pg() const {
93 assert(!partial_decode_needed
);
94 return pg_t(hobj
.get_hash(), pgid
.pgid
.pool());
96 epoch_t
get_map_epoch() const override
{
97 assert(!partial_decode_needed
);
100 int get_flags() const {
101 assert(!partial_decode_needed
);
104 osd_reqid_t
get_reqid() const {
105 assert(!partial_decode_needed
);
106 if (reqid
.name
!= entity_name_t() || reqid
.tid
!= 0) {
109 if (!final_decode_needed
)
110 assert(reqid
.inc
== (int32_t)client_inc
); // decode() should have done this
111 return osd_reqid_t(get_orig_source(),
117 // Fields decoded in final decoding
118 int get_client_inc() const {
119 assert(!final_decode_needed
);
122 utime_t
get_mtime() const {
123 assert(!final_decode_needed
);
126 object_locator_t
get_object_locator() const {
127 assert(!final_decode_needed
);
128 if (hobj
.oid
.name
.empty())
129 return object_locator_t(hobj
.pool
, hobj
.nspace
, hobj
.get_hash());
131 return object_locator_t(hobj
);
133 const object_t
& get_oid() const {
134 assert(!final_decode_needed
);
137 const hobject_t
&get_hobj() const {
140 snapid_t
get_snapid() const {
141 assert(!final_decode_needed
);
144 const snapid_t
& get_snap_seq() const {
145 assert(!final_decode_needed
);
148 const vector
<snapid_t
> &get_snaps() const {
149 assert(!final_decode_needed
);
156 * 0 is the first attempt.
158 * @return retry attempt, or -1 if we don't know
160 int get_retry_attempt() const {
161 return retry_attempt
;
163 uint64_t get_features() const {
166 return get_connection()->get_features();
170 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP
, HEAD_VERSION
, COMPAT_VERSION
),
171 partial_decode_needed(true),
172 final_decode_needed(true) { }
173 MOSDOp(int inc
, long tid
, const hobject_t
& ho
, spg_t
& _pgid
,
174 epoch_t _osdmap_epoch
,
175 int _flags
, uint64_t feat
)
176 : MOSDFastDispatchOp(CEPH_MSG_OSD_OP
, HEAD_VERSION
, COMPAT_VERSION
),
178 osdmap_epoch(_osdmap_epoch
), flags(_flags
), retry_attempt(-1),
181 partial_decode_needed(false),
182 final_decode_needed(false),
186 // also put the client_inc in reqid.inc, so that get_reqid() can
187 // be used before the full message is decoded.
191 ~MOSDOp() override
{}
194 void set_mtime(utime_t mt
) { mtime
= mt
; }
195 void set_mtime(ceph::real_time mt
) {
196 mtime
= ceph::real_clock::to_timespec(mt
);
200 void add_simple_op(int o
, uint64_t off
, uint64_t len
) {
203 osd_op
.op
.extent
.offset
= off
;
204 osd_op
.op
.extent
.length
= len
;
205 ops
.push_back(osd_op
);
207 void write(uint64_t off
, uint64_t len
, bufferlist
& bl
) {
208 add_simple_op(CEPH_OSD_OP_WRITE
, off
, len
);
210 header
.data_off
= off
;
212 void writefull(bufferlist
& bl
) {
213 add_simple_op(CEPH_OSD_OP_WRITEFULL
, 0, bl
.length());
217 void zero(uint64_t off
, uint64_t len
) {
218 add_simple_op(CEPH_OSD_OP_ZERO
, off
, len
);
220 void truncate(uint64_t off
) {
221 add_simple_op(CEPH_OSD_OP_TRUNCATE
, off
, 0);
224 add_simple_op(CEPH_OSD_OP_DELETE
, 0, 0);
227 void read(uint64_t off
, uint64_t len
) {
228 add_simple_op(CEPH_OSD_OP_READ
, off
, len
);
231 add_simple_op(CEPH_OSD_OP_STAT
, 0, 0);
234 bool has_flag(__u32 flag
) const { return flags
& flag
; };
236 bool is_retry_attempt() const { return flags
& CEPH_OSD_FLAG_RETRY
; }
237 void set_retry_attempt(unsigned a
) {
239 flags
|= CEPH_OSD_FLAG_RETRY
;
241 flags
&= ~CEPH_OSD_FLAG_RETRY
;
246 void encode_payload(uint64_t features
) override
{
248 OSDOp::merge_osd_op_vector_in_data(ops
, data
);
250 if ((features
& CEPH_FEATURE_OBJECTLOCATOR
) == 0) {
251 // here is the old structure we are encoding to: //
253 struct ceph_osd_request_head
{
254 __le32 client_inc
; /* client incarnation */
255 struct ceph_object_layout layout
; /* pgid */
256 __le32 osdmap_epoch
; /* client's osdmap epoch */
260 struct ceph_timespec mtime
; /* for mutations only */
261 struct ceph_eversion reassert_version
; /* if we are replaying op */
263 __le32 object_len
; /* length of object name */
265 __le64 snapid
; /* snapid to read */
266 __le64 snap_seq
; /* writer's snap context */
270 struct ceph_osd_op ops
[]; /* followed by ops[], obj, ticket, snaps */
271 } __attribute__ ((packed
));
275 ::encode(client_inc
, payload
);
278 ::encode(get_raw_pg(), payload
);
279 ::encode(su
, payload
);
281 ::encode(osdmap_epoch
, payload
);
282 ::encode(flags
, payload
);
283 ::encode(mtime
, payload
);
284 ::encode(eversion_t(), payload
); // reassert_version
286 __u32 oid_len
= hobj
.oid
.name
.length();
287 ::encode(oid_len
, payload
);
288 ::encode(hobj
.snap
, payload
);
289 ::encode(snap_seq
, payload
);
290 __u32 num_snaps
= snaps
.size();
291 ::encode(num_snaps
, payload
);
293 //::encode(ops, payload);
294 __u16 num_ops
= ops
.size();
295 ::encode(num_ops
, payload
);
296 for (unsigned i
= 0; i
< ops
.size(); i
++)
297 ::encode(ops
[i
].op
, payload
);
299 ::encode_nohead(hobj
.oid
.name
, payload
);
300 ::encode_nohead(snaps
, payload
);
301 } else if ((features
& CEPH_FEATURE_NEW_OSDOP_ENCODING
) == 0) {
303 ::encode(client_inc
, payload
);
304 ::encode(osdmap_epoch
, payload
);
305 ::encode(flags
, payload
);
306 ::encode(mtime
, payload
);
307 ::encode(eversion_t(), payload
); // reassert_version
308 ::encode(get_object_locator(), payload
);
309 ::encode(get_raw_pg(), payload
);
311 ::encode(hobj
.oid
, payload
);
313 __u16 num_ops
= ops
.size();
314 ::encode(num_ops
, payload
);
315 for (unsigned i
= 0; i
< ops
.size(); i
++)
316 ::encode(ops
[i
].op
, payload
);
318 ::encode(hobj
.snap
, payload
);
319 ::encode(snap_seq
, payload
);
320 ::encode(snaps
, payload
);
322 ::encode(retry_attempt
, payload
);
323 ::encode(features
, payload
);
324 if (reqid
.name
!= entity_name_t() || reqid
.tid
!= 0) {
325 ::encode(reqid
, payload
);
327 // don't include client_inc in the reqid for the legacy v6
328 // encoding or else we'll confuse older peers.
329 ::encode(osd_reqid_t(), payload
);
331 } else if (!HAVE_FEATURE(features
, RESEND_ON_SPLIT
)) {
332 // reordered, v7 message encoding
334 ::encode(get_raw_pg(), payload
);
335 ::encode(osdmap_epoch
, payload
);
336 ::encode(flags
, payload
);
337 ::encode(eversion_t(), payload
); // reassert_version
338 ::encode(reqid
, payload
);
339 ::encode(client_inc
, payload
);
340 ::encode(mtime
, payload
);
341 ::encode(get_object_locator(), payload
);
342 ::encode(hobj
.oid
, payload
);
344 __u16 num_ops
= ops
.size();
345 ::encode(num_ops
, payload
);
346 for (unsigned i
= 0; i
< ops
.size(); i
++)
347 ::encode(ops
[i
].op
, payload
);
349 ::encode(hobj
.snap
, payload
);
350 ::encode(snap_seq
, payload
);
351 ::encode(snaps
, payload
);
353 ::encode(retry_attempt
, payload
);
354 ::encode(features
, payload
);
356 // latest v8 encoding with hobject_t hash separate from pgid, no
358 header
.version
= HEAD_VERSION
;
359 ::encode(pgid
, payload
);
360 ::encode(hobj
.get_hash(), payload
);
361 ::encode(osdmap_epoch
, payload
);
362 ::encode(flags
, payload
);
363 ::encode(reqid
, payload
);
364 encode_trace(payload
, features
);
366 // -- above decoded up front; below decoded post-dispatch thread --
368 ::encode(client_inc
, payload
);
369 ::encode(mtime
, payload
);
370 ::encode(get_object_locator(), payload
);
371 ::encode(hobj
.oid
, payload
);
373 __u16 num_ops
= ops
.size();
374 ::encode(num_ops
, payload
);
375 for (unsigned i
= 0; i
< ops
.size(); i
++)
376 ::encode(ops
[i
].op
, payload
);
378 ::encode(hobj
.snap
, payload
);
379 ::encode(snap_seq
, payload
);
380 ::encode(snaps
, payload
);
382 ::encode(retry_attempt
, payload
);
383 ::encode(features
, payload
);
387 void decode_payload() override
{
388 assert(partial_decode_needed
&& final_decode_needed
);
391 // Always keep here the newest version of decoding order/rule
392 if (header
.version
== HEAD_VERSION
) {
393 ::decode(pgid
, p
); // actual pgid
395 ::decode(hash
, p
); // raw hash value
397 ::decode(osdmap_epoch
, p
);
401 } else if (header
.version
== 7) {
402 ::decode(pgid
.pgid
, p
); // raw pgid
403 hobj
.set_hash(pgid
.pgid
.ps());
404 ::decode(osdmap_epoch
, p
);
406 eversion_t reassert_version
;
407 ::decode(reassert_version
, p
);
409 } else if (header
.version
< 2) {
411 ::decode(client_inc
, p
);
414 ::decode_raw(opgid
, p
);
420 ::decode(osdmap_epoch
, p
);
423 eversion_t reassert_version
;
424 ::decode(reassert_version
, p
);
427 ::decode(oid_len
, p
);
428 ::decode(hobj
.snap
, p
);
429 ::decode(snap_seq
, p
);
431 ::decode(num_snaps
, p
);
435 ::decode(num_ops
, p
);
437 for (unsigned i
= 0; i
< num_ops
; i
++)
438 ::decode(ops
[i
].op
, p
);
440 decode_nohead(oid_len
, hobj
.oid
.name
, p
);
441 decode_nohead(num_snaps
, snaps
, p
);
443 // recalculate pgid hash value
444 pgid
.pgid
.set_ps(ceph_str_hash(CEPH_STR_HASH_RJENKINS
,
445 hobj
.oid
.name
.c_str(),
446 hobj
.oid
.name
.length()));
447 hobj
.pool
= pgid
.pgid
.pool();
448 hobj
.set_hash(pgid
.pgid
.ps());
452 OSDOp::split_osd_op_vector_in_data(ops
, data
);
454 // we did the full decode
455 final_decode_needed
= false;
457 // put client_inc in reqid.inc for get_reqid()'s benefit
458 reqid
= osd_reqid_t();
459 reqid
.inc
= client_inc
;
460 } else if (header
.version
< 7) {
461 ::decode(client_inc
, p
);
462 ::decode(osdmap_epoch
, p
);
465 eversion_t reassert_version
;
466 ::decode(reassert_version
, p
);
468 object_locator_t oloc
;
471 if (header
.version
< 3) {
473 ::decode_raw(opgid
, p
);
476 ::decode(pgid
.pgid
, p
);
479 ::decode(hobj
.oid
, p
);
483 ::decode(num_ops
, p
);
485 for (unsigned i
= 0; i
< num_ops
; i
++)
486 ::decode(ops
[i
].op
, p
);
488 ::decode(hobj
.snap
, p
);
489 ::decode(snap_seq
, p
);
492 if (header
.version
>= 4)
493 ::decode(retry_attempt
, p
);
497 if (header
.version
>= 5)
498 ::decode(features
, p
);
502 if (header
.version
>= 6)
505 reqid
= osd_reqid_t();
507 hobj
.pool
= pgid
.pgid
.pool();
508 hobj
.set_key(oloc
.key
);
509 hobj
.nspace
= oloc
.nspace
;
510 hobj
.set_hash(pgid
.pgid
.ps());
512 OSDOp::split_osd_op_vector_in_data(ops
, data
);
514 // we did the full decode
515 final_decode_needed
= false;
517 // put client_inc in reqid.inc for get_reqid()'s benefit
518 if (reqid
.name
== entity_name_t() && reqid
.tid
== 0)
519 reqid
.inc
= client_inc
;
522 partial_decode_needed
= false;
525 bool finish_decode() {
526 assert(!partial_decode_needed
); // partial decoding required
527 if (!final_decode_needed
)
528 return false; // Message is already final decoded
529 assert(header
.version
>= 7);
531 ::decode(client_inc
, p
);
533 object_locator_t oloc
;
535 ::decode(hobj
.oid
, p
);
538 ::decode(num_ops
, p
);
540 for (unsigned i
= 0; i
< num_ops
; i
++)
541 ::decode(ops
[i
].op
, p
);
543 ::decode(hobj
.snap
, p
);
544 ::decode(snap_seq
, p
);
547 ::decode(retry_attempt
, p
);
549 ::decode(features
, p
);
551 hobj
.pool
= pgid
.pgid
.pool();
552 hobj
.set_key(oloc
.key
);
553 hobj
.nspace
= oloc
.nspace
;
555 OSDOp::split_osd_op_vector_in_data(ops
, data
);
557 final_decode_needed
= false;
561 void clear_buffers() override
{
562 OSDOp::clear_data(ops
);
565 const char *get_type_name() const override
{ return "osd_op"; }
566 void print(ostream
& out
) const override
{
568 if (!partial_decode_needed
) {
569 out
<< get_reqid() << ' ';
571 if (!final_decode_needed
) {
575 << " snapc " << get_snap_seq() << "=" << snaps
;
576 if (is_retry_attempt())
577 out
<< " RETRY=" << get_retry_attempt();
579 out
<< " " << get_raw_pg() << " (undecoded)";
581 out
<< " " << ceph_osd_flag_string(get_flags());
582 out
<< " e" << osdmap_epoch
;