]> git.proxmox.com Git - ceph.git/blob - ceph/src/messages/MOSDOpReply.h
import 15.2.0 Octopus source
[ceph.git] / ceph / src / messages / MOSDOpReply.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #ifndef CEPH_MOSDOPREPLY_H
17 #define CEPH_MOSDOPREPLY_H
18
19 #include "msg/Message.h"
20
21 #include "MOSDOp.h"
22 #include "common/errno.h"
23
24 /*
25 * OSD op reply
26 *
27 * oid - object id
28 * op - OSD_OP_DELETE, etc.
29 *
30 */
31
32 class MOSDOpReply : public Message {
33 private:
34 static constexpr int HEAD_VERSION = 8;
35 static constexpr int COMPAT_VERSION = 2;
36
37 object_t oid;
38 pg_t pgid;
39 std::vector<OSDOp> ops;
40 bool bdata_encode;
41 int64_t flags = 0;
42 errorcode32_t result;
43 eversion_t bad_replay_version;
44 eversion_t replay_version;
45 version_t user_version = 0;
46 epoch_t osdmap_epoch = 0;
47 int32_t retry_attempt = -1;
48 bool do_redirect;
49 request_redirect_t redirect;
50
51 public:
52 const object_t& get_oid() const { return oid; }
53 const pg_t& get_pg() const { return pgid; }
54 int get_flags() const { return flags; }
55
56 bool is_ondisk() const { return get_flags() & CEPH_OSD_FLAG_ONDISK; }
57 bool is_onnvram() const { return get_flags() & CEPH_OSD_FLAG_ONNVRAM; }
58
59 int get_result() const { return result; }
60 const eversion_t& get_replay_version() const { return replay_version; }
61 const version_t& get_user_version() const { return user_version; }
62
63 void set_result(int r) { result = r; }
64
65 void set_reply_versions(eversion_t v, version_t uv) {
66 replay_version = v;
67 user_version = uv;
68 /* We go through some shenanigans here for backwards compatibility
69 * with old clients, who do not look at our replay_version and
70 * user_version but instead see what we now call the
71 * bad_replay_version. On pools without caching
72 * the user_version infrastructure is a slightly-laggy copy of
73 * the regular pg version/at_version infrastructure; the difference
74 * being it is not updated on watch ops like that is -- but on updates
75 * it is set equal to at_version. This means that for non-watch write ops
76 * on classic pools, all three of replay_version, user_version, and
77 * bad_replay_version are identical. But for watch ops the replay_version
78 * has been updated, while the user_at_version has not, and the semantics
79 * we promised old clients are that the version they see is not an update.
80 * So set the bad_replay_version to be the same as the user_at_version. */
81 bad_replay_version = v;
82 if (uv) {
83 bad_replay_version.version = uv;
84 }
85 }
86
87 /* Don't fill in replay_version for non-write ops */
88 void set_enoent_reply_versions(const eversion_t& v, const version_t& uv) {
89 user_version = uv;
90 bad_replay_version = v;
91 }
92
93 void set_redirect(const request_redirect_t& redir) { redirect = redir; }
94 const request_redirect_t& get_redirect() const { return redirect; }
95 bool is_redirect_reply() const { return do_redirect; }
96
97 void add_flags(int f) { flags |= f; }
98
99 void claim_op_out_data(std::vector<OSDOp>& o) {
100 ceph_assert(ops.size() == o.size());
101 for (unsigned i = 0; i < o.size(); i++) {
102 ops[i].outdata.claim(o[i].outdata);
103 }
104 }
105 void claim_ops(std::vector<OSDOp>& o) {
106 o.swap(ops);
107 bdata_encode = false;
108 }
109 void set_op_returns(const vector<pg_log_op_return_item_t>& op_returns) {
110 if (op_returns.size()) {
111 ceph_assert(ops.empty() || ops.size() == op_returns.size());
112 ops.resize(op_returns.size());
113 for (unsigned i = 0; i < op_returns.size(); ++i) {
114 ops[i].rval = op_returns[i].rval;
115 ops[i].outdata = op_returns[i].bl;
116 }
117 }
118 }
119
120 /**
121 * get retry attempt
122 *
123 * If we don't know the attempt (because the server is old), return -1.
124 */
125 int get_retry_attempt() const {
126 return retry_attempt;
127 }
128
129 // osdmap
130 epoch_t get_map_epoch() const { return osdmap_epoch; }
131
132 /*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(),
133 head.client_inc,
134 head.tid); }
135 */
136
137 public:
138 MOSDOpReply()
139 : Message{CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION},
140 bdata_encode(false) {
141 do_redirect = false;
142 }
143 MOSDOpReply(const MOSDOp *req, int r, epoch_t e, int acktype,
144 bool ignore_out_data)
145 : Message{CEPH_MSG_OSD_OPREPLY, HEAD_VERSION, COMPAT_VERSION},
146 oid(req->hobj.oid), pgid(req->pgid.pgid), ops(req->ops),
147 bdata_encode(false) {
148
149 set_tid(req->get_tid());
150 result = r;
151 flags =
152 (req->flags & ~(CEPH_OSD_FLAG_ONDISK|CEPH_OSD_FLAG_ONNVRAM|CEPH_OSD_FLAG_ACK)) | acktype;
153 osdmap_epoch = e;
154 user_version = 0;
155 retry_attempt = req->get_retry_attempt();
156 do_redirect = false;
157
158 for (unsigned i = 0; i < ops.size(); i++) {
159 // zero out input data
160 ops[i].indata.clear();
161 if (ignore_out_data) {
162 // original request didn't set the RETURNVEC flag
163 ops[i].outdata.clear();
164 }
165 }
166 }
167 private:
168 ~MOSDOpReply() override {}
169
170 public:
171 void encode_payload(uint64_t features) override {
172 using ceph::encode;
173 if(false == bdata_encode) {
174 OSDOp::merge_osd_op_vector_out_data(ops, data);
175 bdata_encode = true;
176 }
177
178 if ((features & CEPH_FEATURE_PGID64) == 0) {
179 header.version = 1;
180 ceph_osd_reply_head head;
181 memset(&head, 0, sizeof(head));
182 head.layout.ol_pgid = pgid.get_old_pg().v;
183 head.flags = flags;
184 head.osdmap_epoch = osdmap_epoch;
185 head.reassert_version = bad_replay_version;
186 head.result = result;
187 head.num_ops = ops.size();
188 head.object_len = oid.name.length();
189 encode(head, payload);
190 for (unsigned i = 0; i < head.num_ops; i++) {
191 encode(ops[i].op, payload);
192 }
193 ceph::encode_nohead(oid.name, payload);
194 } else {
195 header.version = HEAD_VERSION;
196 encode(oid, payload);
197 encode(pgid, payload);
198 encode(flags, payload);
199 encode(result, payload);
200 encode(bad_replay_version, payload);
201 encode(osdmap_epoch, payload);
202
203 __u32 num_ops = ops.size();
204 encode(num_ops, payload);
205 for (unsigned i = 0; i < num_ops; i++)
206 encode(ops[i].op, payload);
207
208 encode(retry_attempt, payload);
209
210 for (unsigned i = 0; i < num_ops; i++)
211 encode(ops[i].rval, payload);
212
213 encode(replay_version, payload);
214 encode(user_version, payload);
215 if ((features & CEPH_FEATURE_NEW_OSDOPREPLY_ENCODING) == 0) {
216 header.version = 6;
217 encode(redirect, payload);
218 } else {
219 do_redirect = !redirect.empty();
220 encode(do_redirect, payload);
221 if (do_redirect) {
222 encode(redirect, payload);
223 }
224 }
225 encode_trace(payload, features);
226 }
227 }
228 void decode_payload() override {
229 using ceph::decode;
230 auto p = payload.cbegin();
231
232 // Always keep here the newest version of decoding order/rule
233 if (header.version == HEAD_VERSION) {
234 decode(oid, p);
235 decode(pgid, p);
236 decode(flags, p);
237 decode(result, p);
238 decode(bad_replay_version, p);
239 decode(osdmap_epoch, p);
240
241 __u32 num_ops = ops.size();
242 decode(num_ops, p);
243 ops.resize(num_ops);
244 for (unsigned i = 0; i < num_ops; i++)
245 decode(ops[i].op, p);
246 decode(retry_attempt, p);
247
248 for (unsigned i = 0; i < num_ops; ++i)
249 decode(ops[i].rval, p);
250
251 OSDOp::split_osd_op_vector_out_data(ops, data);
252
253 decode(replay_version, p);
254 decode(user_version, p);
255 decode(do_redirect, p);
256 if (do_redirect)
257 decode(redirect, p);
258 decode_trace(p);
259 } else if (header.version < 2) {
260 ceph_osd_reply_head head;
261 decode(head, p);
262 ops.resize(head.num_ops);
263 for (unsigned i = 0; i < head.num_ops; i++) {
264 decode(ops[i].op, p);
265 }
266 ceph::decode_nohead(head.object_len, oid.name, p);
267 pgid = pg_t(head.layout.ol_pgid);
268 result = (int32_t)head.result;
269 flags = head.flags;
270 replay_version = head.reassert_version;
271 user_version = replay_version.version;
272 osdmap_epoch = head.osdmap_epoch;
273 retry_attempt = -1;
274 } else {
275 decode(oid, p);
276 decode(pgid, p);
277 decode(flags, p);
278 decode(result, p);
279 decode(bad_replay_version, p);
280 decode(osdmap_epoch, p);
281
282 __u32 num_ops = ops.size();
283 decode(num_ops, p);
284 ops.resize(num_ops);
285 for (unsigned i = 0; i < num_ops; i++)
286 decode(ops[i].op, p);
287
288 if (header.version >= 3)
289 decode(retry_attempt, p);
290 else
291 retry_attempt = -1;
292
293 if (header.version >= 4) {
294 for (unsigned i = 0; i < num_ops; ++i)
295 decode(ops[i].rval, p);
296
297 OSDOp::split_osd_op_vector_out_data(ops, data);
298 }
299
300 if (header.version >= 5) {
301 decode(replay_version, p);
302 decode(user_version, p);
303 } else {
304 replay_version = bad_replay_version;
305 user_version = replay_version.version;
306 }
307
308 if (header.version == 6) {
309 decode(redirect, p);
310 do_redirect = !redirect.empty();
311 }
312 if (header.version >= 7) {
313 decode(do_redirect, p);
314 if (do_redirect) {
315 decode(redirect, p);
316 }
317 }
318 if (header.version >= 8) {
319 decode_trace(p);
320 }
321 }
322 }
323
324 std::string_view get_type_name() const override { return "osd_op_reply"; }
325
326 void print(std::ostream& out) const override {
327 out << "osd_op_reply(" << get_tid()
328 << " " << oid << " " << ops
329 << " v" << get_replay_version()
330 << " uv" << get_user_version();
331 if (is_ondisk())
332 out << " ondisk";
333 else if (is_onnvram())
334 out << " onnvram";
335 else
336 out << " ack";
337 out << " = " << get_result();
338 if (get_result() < 0) {
339 out << " (" << cpp_strerror(get_result()) << ")";
340 }
341 if (is_redirect_reply()) {
342 out << " redirect: { " << redirect << " }";
343 }
344 out << ")";
345 }
346
347 private:
348 template<class T, typename... Args>
349 friend boost::intrusive_ptr<T> ceph::make_message(Args&&... args);
350 };
351
352
353 #endif