1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
16 #ifndef CEPH_MOSDOPREPLY_H
17 #define CEPH_MOSDOPREPLY_H
19 #include "msg/Message.h"
22 #include "os/ObjectStore.h"
23 #include "common/errno.h"
29 * op - OSD_OP_DELETE, etc.
33 class MOSDOpReply
: public Message
{
// Newest encoding version of this message. It is handed to the Message base
// constructor, written into header.version by encode_payload(), and
// decode_payload() compares the incoming header.version against it first.
35 static const int HEAD_VERSION
= 8;
// Handed to the Message base constructor together with HEAD_VERSION.
// NOTE(review): presumably the oldest version a receiver must understand --
// confirm against the Message base class.
36 static const int COMPAT_VERSION
= 2;
// Version reported to old clients (see the compatibility note in
// set_reply_versions()); it is what the legacy reply head carries as
// reassert_version.
43 eversion_t bad_replay_version
;
// Version returned by get_replay_version(). decode_payload() reads it from
// the wire for header.version >= 5 and also has a path that copies it from
// bad_replay_version.
44 eversion_t replay_version
;
// Version returned by get_user_version(); encoded and decoded right after
// replay_version.
45 version_t user_version
;
// Retry attempt copied from the request via get_retry_attempt() in the
// request-based constructor; decoded only when header.version >= 3.
47 int32_t retry_attempt
;
// Redirect descriptor stored by set_redirect() and returned by
// get_redirect().
49 request_redirect_t redirect
;
52 const object_t
& get_oid() const { return oid
; }
53 const pg_t
& get_pg() const { return pgid
; }
54 int get_flags() const { return flags
; }
56 bool is_ondisk() const { return get_flags() & CEPH_OSD_FLAG_ONDISK
; }
57 bool is_onnvram() const { return get_flags() & CEPH_OSD_FLAG_ONNVRAM
; }
59 int get_result() const { return result
; }
60 const eversion_t
& get_replay_version() const { return replay_version
; }
61 const version_t
& get_user_version() const { return user_version
; }
63 void set_result(int r
) { result
= r
; }
65 void set_reply_versions(eversion_t v
, version_t uv
) {
68 /* We go through some shenanigans here for backwards compatibility
69 * with old clients, who do not look at our replay_version and
70 * user_version but instead see what we now call the
71 * bad_replay_version. On pools without caching
72 * the user_version infrastructure is a slightly-laggy copy of
73 * the regular pg version/at_version infrastructure; the difference
74 * being it is not updated on watch ops like that is -- but on updates
75 * it is set equal to at_version. This means that for non-watch write ops
76 * on classic pools, all three of replay_version, user_version, and
77 * bad_replay_version are identical. But for watch ops the replay_version
78 * has been updated, while the user_at_version has not, and the semantics
79 * we promised old clients are that the version they see is not an update.
80 * So set the bad_replay_version to be the same as the user_at_version. */
81 bad_replay_version
= v
;
83 bad_replay_version
.version
= uv
;
87 /* Don't fill in replay_version for non-write ops */
88 void set_enoent_reply_versions(const eversion_t
& v
, const version_t
& uv
) {
90 bad_replay_version
= v
;
93 void set_redirect(const request_redirect_t
& redir
) { redirect
= redir
; }
94 const request_redirect_t
& get_redirect() const { return redirect
; }
95 bool is_redirect_reply() const { return do_redirect
; }
97 void add_flags(int f
) { flags
|= f
; }
99 void claim_op_out_data(vector
<OSDOp
>& o
) {
100 assert(ops
.size() == o
.size());
101 for (unsigned i
= 0; i
< o
.size(); i
++) {
102 ops
[i
].outdata
.claim(o
[i
].outdata
);
105 void claim_ops(vector
<OSDOp
>& o
) {
112 * If we don't know the attempt (because the server is old), return -1.
114 int get_retry_attempt() const {
115 return retry_attempt
;
119 epoch_t
get_map_epoch() const { return osdmap_epoch
; }
121 /*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(),
128 : Message(CEPH_MSG_OSD_OPREPLY
, HEAD_VERSION
, COMPAT_VERSION
) {
131 MOSDOpReply(const MOSDOp
*req
, int r
, epoch_t e
, int acktype
,
132 bool ignore_out_data
)
133 : Message(CEPH_MSG_OSD_OPREPLY
, HEAD_VERSION
, COMPAT_VERSION
),
134 oid(req
->hobj
.oid
), pgid(req
->pgid
.pgid
), ops(req
->ops
) {
136 set_tid(req
->get_tid());
139 (req
->flags
& ~(CEPH_OSD_FLAG_ONDISK
|CEPH_OSD_FLAG_ONNVRAM
|CEPH_OSD_FLAG_ACK
)) | acktype
;
142 retry_attempt
= req
->get_retry_attempt();
145 // zero out ops payload_len and possibly out data
146 for (unsigned i
= 0; i
< ops
.size(); i
++) {
147 ops
[i
].op
.payload_len
= 0;
149 ops
[i
].outdata
.clear();
153 ~MOSDOpReply() override
{}
156 void encode_payload(uint64_t features
) override
{
158 OSDOp::merge_osd_op_vector_out_data(ops
, data
);
160 if ((features
& CEPH_FEATURE_PGID64
) == 0) {
162 ceph_osd_reply_head head
;
163 memset(&head
, 0, sizeof(head
));
164 head
.layout
.ol_pgid
= pgid
.get_old_pg().v
;
166 head
.osdmap_epoch
= osdmap_epoch
;
167 head
.reassert_version
= bad_replay_version
;
168 head
.result
= result
;
169 head
.num_ops
= ops
.size();
170 head
.object_len
= oid
.name
.length();
171 ::encode(head
, payload
);
172 for (unsigned i
= 0; i
< head
.num_ops
; i
++) {
173 ::encode(ops
[i
].op
, payload
);
175 ::encode_nohead(oid
.name
, payload
);
177 header
.version
= HEAD_VERSION
;
178 ::encode(oid
, payload
);
179 ::encode(pgid
, payload
);
180 ::encode(flags
, payload
);
181 ::encode(result
, payload
);
182 ::encode(bad_replay_version
, payload
);
183 ::encode(osdmap_epoch
, payload
);
185 __u32 num_ops
= ops
.size();
186 ::encode(num_ops
, payload
);
187 for (unsigned i
= 0; i
< num_ops
; i
++)
188 ::encode(ops
[i
].op
, payload
);
190 ::encode(retry_attempt
, payload
);
192 for (unsigned i
= 0; i
< num_ops
; i
++)
193 ::encode(ops
[i
].rval
, payload
);
195 ::encode(replay_version
, payload
);
196 ::encode(user_version
, payload
);
197 if ((features
& CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING
) == 0) {
199 ::encode(redirect
, payload
);
201 do_redirect
= !redirect
.empty();
202 ::encode(do_redirect
, payload
);
204 ::encode(redirect
, payload
);
207 encode_trace(payload
, features
);
210 void decode_payload() override
{
211 bufferlist::iterator p
= payload
.begin();
213 // Always keep here the newest version of decoding order/rule
214 if (header
.version
== HEAD_VERSION
) {
219 ::decode(bad_replay_version
, p
);
220 ::decode(osdmap_epoch
, p
);
222 __u32 num_ops
= ops
.size();
223 ::decode(num_ops
, p
);
225 for (unsigned i
= 0; i
< num_ops
; i
++)
226 ::decode(ops
[i
].op
, p
);
227 ::decode(retry_attempt
, p
);
229 for (unsigned i
= 0; i
< num_ops
; ++i
)
230 ::decode(ops
[i
].rval
, p
);
232 OSDOp::split_osd_op_vector_out_data(ops
, data
);
234 ::decode(replay_version
, p
);
235 ::decode(user_version
, p
);
236 ::decode(do_redirect
, p
);
238 ::decode(redirect
, p
);
239 } else if (header
.version
< 2) {
240 ceph_osd_reply_head head
;
242 ops
.resize(head
.num_ops
);
243 for (unsigned i
= 0; i
< head
.num_ops
; i
++) {
244 ::decode(ops
[i
].op
, p
);
246 ::decode_nohead(head
.object_len
, oid
.name
, p
);
247 pgid
= pg_t(head
.layout
.ol_pgid
);
248 result
= (int32_t)head
.result
;
250 replay_version
= head
.reassert_version
;
251 user_version
= replay_version
.version
;
252 osdmap_epoch
= head
.osdmap_epoch
;
259 ::decode(bad_replay_version
, p
);
260 ::decode(osdmap_epoch
, p
);
262 __u32 num_ops
= ops
.size();
263 ::decode(num_ops
, p
);
265 for (unsigned i
= 0; i
< num_ops
; i
++)
266 ::decode(ops
[i
].op
, p
);
268 if (header
.version
>= 3)
269 ::decode(retry_attempt
, p
);
273 if (header
.version
>= 4) {
274 for (unsigned i
= 0; i
< num_ops
; ++i
)
275 ::decode(ops
[i
].rval
, p
);
277 OSDOp::split_osd_op_vector_out_data(ops
, data
);
280 if (header
.version
>= 5) {
281 ::decode(replay_version
, p
);
282 ::decode(user_version
, p
);
284 replay_version
= bad_replay_version
;
285 user_version
= replay_version
.version
;
288 if (header
.version
== 6) {
289 ::decode(redirect
, p
);
290 do_redirect
= !redirect
.empty();
292 if (header
.version
>= 7) {
293 ::decode(do_redirect
, p
);
295 ::decode(redirect
, p
);
298 if (header
.version
>= 8) {
304 const char *get_type_name() const override
{ return "osd_op_reply"; }
306 void print(ostream
& out
) const override
{
307 out
<< "osd_op_reply(" << get_tid()
308 << " " << oid
<< " " << ops
309 << " v" << get_replay_version()
310 << " uv" << get_user_version();
313 else if (is_onnvram())
317 out
<< " = " << get_result();
318 if (get_result() < 0) {
319 out
<< " (" << cpp_strerror(get_result()) << ")";
321 if (is_redirect_reply()) {
322 out
<< " redirect: { " << redirect
<< " }";