]> git.proxmox.com Git - ceph.git/blob - ceph/src/osdc/Objecter.h
import ceph 16.2.6
[ceph.git] / ceph / src / osdc / Objecter.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
17
18 #include <condition_variable>
19 #include <list>
20 #include <map>
21 #include <mutex>
22 #include <memory>
23 #include <sstream>
24 #include <string>
25 #include <string_view>
26 #include <type_traits>
27 #include <variant>
28
29 #include <boost/container/small_vector.hpp>
30 #include <boost/asio.hpp>
31
32 #include <fmt/format.h>
33
34 #include "include/buffer.h"
35 #include "include/ceph_assert.h"
36 #include "include/ceph_fs.h"
37 #include "include/common_fwd.h"
38 #include "include/expected.hpp"
39 #include "include/types.h"
40 #include "include/rados/rados_types.hpp"
41 #include "include/function2.hpp"
42 #include "include/neorados/RADOS_Decodable.hpp"
43
44 #include "common/admin_socket.h"
45 #include "common/async/completion.h"
46 #include "common/ceph_time.h"
47 #include "common/ceph_mutex.h"
48 #include "common/ceph_timer.h"
49 #include "common/config_obs.h"
50 #include "common/shunique_lock.h"
51 #include "common/zipkin_trace.h"
52 #include "common/Throttle.h"
53
54 #include "mon/MonClient.h"
55
56 #include "messages/MOSDOp.h"
57 #include "msg/Dispatcher.h"
58
59 #include "osd/OSDMap.h"
60
// Forward declarations: these types are only used by pointer/reference in
// this header; the full definitions live elsewhere.
class Context;
class Messenger;
class MonClient;
class Message;

class MPoolOpReply;

class MGetPoolStatsReply;
class MStatfsReply;
class MCommandReply;
class MWatchNotify;
template<typename T>
struct EnumerationContext;
template<typename t>
struct CB_EnumerateReply;

// Most compound operations carry at most two OSDOps; small_vector keeps
// that common case free of heap allocation.
inline constexpr std::size_t osdc_opvec_len = 2;
using osdc_opvec = boost::container::small_vector<OSDOp, osdc_opvec_len>;
79
80 // -----------------------------------------
81
// A compound OSD operation under construction: a vector of OSDOps plus,
// for each op, optional per-op output destinations (raw bufferlist,
// completion handler, int return code, error_code). Invariant: the four
// out_* vectors always have exactly the same length as `ops`; index i of
// each corresponds to ops[i], and entries may be null/empty.
struct ObjectOperation {
  osdc_opvec ops;     // ops in submission order
  int flags = 0;      // CEPH_OSD_FLAG_* applied to the whole operation
  int priority = 0;

  // Per-op output slots (see invariant above).
  boost::container::small_vector<ceph::buffer::list*, osdc_opvec_len> out_bl;
  boost::container::small_vector<
    fu2::unique_function<void(boost::system::error_code, int,
			      const ceph::buffer::list& bl) &&>,
    osdc_opvec_len> out_handler;
  boost::container::small_vector<int*, osdc_opvec_len> out_rval;
  boost::container::small_vector<boost::system::error_code*,
				 osdc_opvec_len> out_ec;

  // Move-only: out_handler holds move-only fu2::unique_functions, so the
  // copy operations are deleted explicitly.
  ObjectOperation() = default;
  ObjectOperation(const ObjectOperation&) = delete;
  ObjectOperation& operator =(const ObjectOperation&) = delete;
  ObjectOperation(ObjectOperation&&) = default;
  ObjectOperation& operator =(ObjectOperation&&) = default;
  ~ObjectOperation() = default;
102
  // Number of ops accumulated so far.
  size_t size() const {
    return ops.size();
  }

  // Reset to a freshly-constructed state: drop all ops, output slots,
  // flags and priority.
  void clear() {
    ops.clear();
    flags = 0;
    priority = 0;
    out_bl.clear();
    out_handler.clear();
    out_rval.clear();
    out_ec.clear();
  }

  // Set per-op CEPH_OSD_OP_FLAG_* on the most recently added op (not on
  // the whole operation). Requires at least one op.
  void set_last_op_flags(int flags) {
    ceph_assert(!ops.empty());
    ops.rbegin()->op.flags = flags;
  }
121
122
  // Attach a completion handler to the most recently added op. If that op
  // already has a handler, fold the two into one closure that invokes the
  // existing handler first and the new one second, preserving attachment
  // order.
  void set_handler(fu2::unique_function<void(boost::system::error_code, int,
					     const ceph::buffer::list&) &&> f) {
    if (f) {
      if (out_handler.back()) {
	// This happens seldom enough that we may as well keep folding
	// functions together when we get another one rather than
	// using a container.
	out_handler.back() =
	  [f = std::move(f),
	   g = std::move(std::move(out_handler.back()))]
	  (boost::system::error_code ec, int r,
	   const ceph::buffer::list& bl) mutable {
	    std::move(g)(ec, r, bl);
	    std::move(f)(ec, r, bl);
	  };
      } else {
	out_handler.back() = std::move(f);
      }
    }
    // Handler slots must stay in lockstep with ops (see class invariant).
    ceph_assert(ops.size() == out_handler.size());
  }
144
  // Context adapter: wrap a one-shot Context* so it is completed with the
  // op's return code. The unique_ptr guards against leaking c if the
  // closure is destroyed without running; release() hands ownership to
  // complete(), which deletes the Context itself.
  void set_handler(Context *c) {
    if (c)
      set_handler([c = std::unique_ptr<Context>(c)](boost::system::error_code,
						    int r,
						    const ceph::buffer::list&) mutable {
		    c.release()->complete(r);
		  });

  }
154
  // Append a new op with opcode `op` and grow every per-op output vector
  // by one empty slot, re-establishing the class invariant that all five
  // vectors share the same length. Returns the new OSDOp for the caller
  // to fill in.
  OSDOp& add_op(int op) {
    ops.emplace_back();
    ops.back().op.op = op;
    out_bl.push_back(nullptr);
    ceph_assert(ops.size() == out_bl.size());
    out_handler.emplace_back();
    ceph_assert(ops.size() == out_handler.size());
    out_rval.push_back(nullptr);
    ceph_assert(ops.size() == out_rval.size());
    out_ec.push_back(nullptr);
    ceph_assert(ops.size() == out_ec.size());
    return ops.back();
  }
168 void add_data(int op, uint64_t off, uint64_t len, ceph::buffer::list& bl) {
169 OSDOp& osd_op = add_op(op);
170 osd_op.op.extent.offset = off;
171 osd_op.op.extent.length = len;
172 osd_op.indata.claim_append(bl);
173 }
174 void add_writesame(int op, uint64_t off, uint64_t write_len,
175 ceph::buffer::list& bl) {
176 OSDOp& osd_op = add_op(op);
177 osd_op.op.writesame.offset = off;
178 osd_op.op.writesame.length = write_len;
179 osd_op.op.writesame.data_length = bl.length();
180 osd_op.indata.claim_append(bl);
181 }
  // Append an xattr op keyed by C-string `name` (may be null, meaning an
  // empty name) with value `data`. Name and value are concatenated into
  // the op's input data; the lengths in the op header delimit them.
  void add_xattr(int op, const char *name, const ceph::buffer::list& data) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
    osd_op.op.xattr.value_len = data.length();
    if (name)
      osd_op.indata.append(name, osd_op.op.xattr.name_len);
    osd_op.indata.append(data);
  }
  // As add_xattr above, plus a comparison operator/mode for CMPXATTR-style
  // ops.
  void add_xattr_cmp(int op, const char *name, uint8_t cmp_op,
		     uint8_t cmp_mode, const ceph::buffer::list& data) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.xattr.name_len = (name ? strlen(name) : 0);
    osd_op.op.xattr.value_len = data.length();
    osd_op.op.xattr.cmp_op = cmp_op;
    osd_op.op.xattr.cmp_mode = cmp_mode;
    if (name)
      osd_op.indata.append(name, osd_op.op.xattr.name_len);
    osd_op.indata.append(data);
  }
201 void add_xattr(int op, std::string_view name, const ceph::buffer::list& data) {
202 OSDOp& osd_op = add_op(op);
203 osd_op.op.xattr.name_len = name.size();
204 osd_op.op.xattr.value_len = data.length();
205 osd_op.indata.append(name.data(), osd_op.op.xattr.name_len);
206 osd_op.indata.append(data);
207 }
  // As add_xattr(string_view) plus a comparison operator/mode for
  // CMPXATTR-style ops. The empty() guard avoids calling append with a
  // possibly-null data() pointer from an empty string_view.
  void add_xattr_cmp(int op, std::string_view name, uint8_t cmp_op,
		     uint8_t cmp_mode, const ceph::buffer::list& data) {
    OSDOp& osd_op = add_op(op);
    osd_op.op.xattr.name_len = name.size();
    osd_op.op.xattr.value_len = data.length();
    osd_op.op.xattr.cmp_op = cmp_op;
    osd_op.op.xattr.cmp_mode = cmp_mode;
    if (!name.empty())
      osd_op.indata.append(name.data(), osd_op.op.xattr.name_len);
    osd_op.indata.append(data);
  }
  // Append an object-class (cls) call: class name, method name and input
  // payload are concatenated into the op's input data, delimited by the
  // lengths in the op header. This overload wires the reply into a raw
  // bufferlist/Context/int-return trio.
  void add_call(int op, std::string_view cname, std::string_view method,
		const ceph::buffer::list &indata,
		ceph::buffer::list *outbl, Context *ctx, int *prval) {
    OSDOp& osd_op = add_op(op);

    // add_op just appended, so this op's slots are at index size()-1.
    unsigned p = ops.size() - 1;
    set_handler(ctx);
    out_bl[p] = outbl;
    out_rval[p] = prval;

    osd_op.op.cls.class_len = cname.size();
    osd_op.op.cls.method_len = method.size();
    osd_op.op.cls.indata_len = indata.length();
    osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
    osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
    osd_op.indata.append(indata);
  }
  // cls call whose completion takes (error_code, reply bl); the int
  // return code is dropped by the adapter lambda.
  void add_call(int op, std::string_view cname, std::string_view method,
		const ceph::buffer::list &indata,
		fu2::unique_function<void(boost::system::error_code,
					  const ceph::buffer::list&) &&> f) {
    OSDOp& osd_op = add_op(op);

    set_handler([f = std::move(f)](boost::system::error_code ec,
				   int,
				   const ceph::buffer::list& bl) mutable {
		  std::move(f)(ec, bl);
		});

    osd_op.op.cls.class_len = cname.size();
    osd_op.op.cls.method_len = method.size();
    osd_op.op.cls.indata_len = indata.length();
    osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
    osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
    osd_op.indata.append(indata);
  }
  // cls call whose completion receives the full (error_code, int r,
  // reply bl) triple.
  void add_call(int op, std::string_view cname, std::string_view method,
		const ceph::buffer::list &indata,
		fu2::unique_function<void(boost::system::error_code, int,
					  const ceph::buffer::list&) &&> f) {
    OSDOp& osd_op = add_op(op);

    set_handler([f = std::move(f)](boost::system::error_code ec,
				   int r,
				   const ceph::buffer::list& bl) mutable {
		  std::move(f)(ec, r, bl);
		});

    osd_op.op.cls.class_len = cname.size();
    osd_op.op.cls.method_len = method.size();
    osd_op.op.cls.indata_len = indata.length();
    osd_op.indata.append(cname.data(), osd_op.op.cls.class_len);
    osd_op.indata.append(method.data(), osd_op.op.cls.method_len);
    osd_op.indata.append(indata);
  }
  // Append a PG listing op; the listing cursor (cookie) is encoded into
  // the op's input data.
  void add_pgls(int op, uint64_t count, collection_list_handle_t cookie,
		epoch_t start_epoch) {
    using ceph::encode;
    OSDOp& osd_op = add_op(op);
    osd_op.op.pgls.count = count;
    osd_op.op.pgls.start_epoch = start_epoch;
    encode(cookie, osd_op.indata);
  }
  // As add_pgls, but with a server-side filter: the "pg"/"filter"
  // class/method names and the filter payload precede the cookie in the
  // op's input data.
  void add_pgls_filter(int op, uint64_t count, const ceph::buffer::list& filter,
		       collection_list_handle_t cookie, epoch_t start_epoch) {
    using ceph::encode;
    OSDOp& osd_op = add_op(op);
    osd_op.op.pgls.count = count;
    osd_op.op.pgls.start_epoch = start_epoch;
    std::string cname = "pg";
    std::string mname = "filter";
    encode(cname, osd_op.indata);
    encode(mname, osd_op.indata);
    osd_op.indata.append(filter);
    encode(cookie, osd_op.indata);
  }
295 void add_alloc_hint(int op, uint64_t expected_object_size,
296 uint64_t expected_write_size,
297 uint32_t flags) {
298 OSDOp& osd_op = add_op(op);
299 osd_op.op.alloc_hint.expected_object_size = expected_object_size;
300 osd_op.op.alloc_hint.expected_write_size = expected_write_size;
301 osd_op.op.alloc_hint.flags = flags;
302 }
303
304 // ------
305
306 // pg
  // Legacy PG object listing; dispatches to the filtered variant when a
  // filter payload is supplied. Marks the whole operation as a PG op.
  void pg_ls(uint64_t count, ceph::buffer::list& filter,
	     collection_list_handle_t cookie, epoch_t start_epoch) {
    if (filter.length() == 0)
      add_pgls(CEPH_OSD_OP_PGLS, count, cookie, start_epoch);
    else
      add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER, count, filter, cookie,
		      start_epoch);
    flags |= CEPH_OSD_FLAG_PGOP;
  }

  // Namespace-aware (NLS) PG object listing; otherwise as pg_ls.
  void pg_nls(uint64_t count, const ceph::buffer::list& filter,
	      collection_list_handle_t cookie, epoch_t start_epoch) {
    if (filter.length() == 0)
      add_pgls(CEPH_OSD_OP_PGNLS, count, cookie, start_epoch);
    else
      add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER, count, filter, cookie,
		      start_epoch);
    flags |= CEPH_OSD_FLAG_PGOP;
  }
326
  // Scrub listing declarations (defined out of line): fetch inconsistent
  // objects / snapsets found by scrub, starting after the given object id.
  void scrub_ls(const librados::object_id_t& start_after,
		uint64_t max_to_get,
		std::vector<librados::inconsistent_obj_t> *objects,
		uint32_t *interval,
		int *rval);
  void scrub_ls(const librados::object_id_t& start_after,
		uint64_t max_to_get,
		std::vector<librados::inconsistent_snapset_t> *objects,
		uint32_t *interval,
		int *rval);
337
338 void create(bool excl) {
339 OSDOp& o = add_op(CEPH_OSD_OP_CREATE);
340 o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0);
341 }
342
  // Completion callback for STAT: decodes (size, mtime) from the reply
  // and fans mtime out into whichever representations the caller asked
  // for. A decode failure reports -EIO/the buffer error code through the
  // optional prval/pec outputs.
  struct CB_ObjectOperation_stat {
    // NOTE(review): this member appears unused — operator()'s `bl`
    // parameter shadows it; candidate for removal.
    ceph::buffer::list bl;
    uint64_t *psize;
    ceph::real_time *pmtime;
    time_t *ptime;
    struct timespec *pts;
    int *prval;
    boost::system::error_code* pec;
    CB_ObjectOperation_stat(uint64_t *ps, ceph::real_time *pm, time_t *pt, struct timespec *_pts,
			    int *prval, boost::system::error_code* pec)
      : psize(ps), pmtime(pm), ptime(pt), pts(_pts), prval(prval), pec(pec) {}
    void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
      using ceph::decode;
      // Only decode on success; on error the reply payload is not a stat.
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  uint64_t size;
	  ceph::real_time mtime;
	  decode(size, p);
	  decode(mtime, p);
	  if (psize)
	    *psize = size;
	  if (pmtime)
	    *pmtime = mtime;
	  if (ptime)
	    *ptime = ceph::real_clock::to_time_t(mtime);
	  if (pts)
	    *pts = ceph::real_clock::to_timespec(mtime);
	} catch (const ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	  if (pec)
	    *pec = e.code();
	}
      }
    }
  };
  // STAT overloads: each appends a STAT op and installs a
  // CB_ObjectOperation_stat targeting the combination of outputs the
  // caller wants (size plus mtime as real_time / time_t / timespec),
  // reporting errors via int* or error_code* or not at all (nullptr_t).
  void stat(uint64_t *psize, ceph::real_time *pmtime, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr, prval,
					nullptr));
    out_rval.back() = prval;
  }
  void stat(uint64_t *psize, ceph::real_time *pmtime,
	    boost::system::error_code* ec) {
    add_op(CEPH_OSD_OP_STAT);
    set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr,
					nullptr, ec));
    out_ec.back() = ec;
  }
  void stat(uint64_t *psize, time_t *ptime, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    set_handler(CB_ObjectOperation_stat(psize, nullptr, ptime, nullptr, prval,
					nullptr));
    out_rval.back() = prval;
  }
  void stat(uint64_t *psize, struct timespec *pts, int *prval) {
    add_op(CEPH_OSD_OP_STAT);
    set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, pts, prval, nullptr));
    out_rval.back() = prval;
  }
  // The nullptr_t overloads request no error reporting at all.
  void stat(uint64_t *psize, ceph::real_time *pmtime, nullptr_t) {
    add_op(CEPH_OSD_OP_STAT);
    set_handler(CB_ObjectOperation_stat(psize, pmtime, nullptr, nullptr, nullptr,
					nullptr));
  }
  void stat(uint64_t *psize, time_t *ptime, nullptr_t) {
    add_op(CEPH_OSD_OP_STAT);
    set_handler(CB_ObjectOperation_stat(psize, nullptr, ptime, nullptr, nullptr,
					nullptr));
  }
  void stat(uint64_t *psize, struct timespec *pts, nullptr_t) {
    add_op(CEPH_OSD_OP_STAT);
    set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, pts, nullptr,
					nullptr));
  }
  void stat(uint64_t *psize, nullptr_t, nullptr_t) {
    add_op(CEPH_OSD_OP_STAT);
    set_handler(CB_ObjectOperation_stat(psize, nullptr, nullptr, nullptr,
					nullptr, nullptr));
  }
424
425 // object cmpext
  // Completion callback for CMPEXT: publishes the raw return code via
  // prval and/or the error code via ec. When `s` is set, r is translated
  // into a size_t via -(MAX_ERRNO - r) — presumably recovering the
  // mismatch offset encoded relative to -MAX_ERRNO by the OSD; confirm
  // against the OSD-side cmpext encoding.
  struct CB_ObjectOperation_cmpext {
    int* prval = nullptr;
    boost::system::error_code* ec = nullptr;
    std::size_t* s = nullptr;
    explicit CB_ObjectOperation_cmpext(int *prval)
      : prval(prval) {}
    CB_ObjectOperation_cmpext(boost::system::error_code* ec, std::size_t* s)
      : ec(ec), s(s) {}

    void operator()(boost::system::error_code ec, int r, const ceph::buffer::list&) {
      if (prval)
	*prval = r;
      if (this->ec)
	*this->ec = ec;
      if (s)
	*s = static_cast<std::size_t>(-(MAX_ERRNO - r));
    }
  };
444
  // Compare cmp_bl against the object's contents at off; results are
  // delivered through CB_ObjectOperation_cmpext (see above).
  void cmpext(uint64_t off, ceph::buffer::list& cmp_bl, int *prval) {
    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
    set_handler(CB_ObjectOperation_cmpext(prval));
    out_rval.back() = prval;
  }

  // error_code flavor; `s` receives the translated mismatch value.
  void cmpext(uint64_t off, ceph::buffer::list&& cmp_bl, boost::system::error_code* ec,
	      std::size_t* s) {
    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_bl.length(), cmp_bl);
    set_handler(CB_ObjectOperation_cmpext(ec, s));
    out_ec.back() = ec;
  }

  // Used by C API
  void cmpext(uint64_t off, uint64_t cmp_len, const char *cmp_buf, int *prval) {
    ceph::buffer::list cmp_bl;
    cmp_bl.append(cmp_buf, cmp_len);
    add_data(CEPH_OSD_OP_CMPEXT, off, cmp_len, cmp_bl);
    set_handler(CB_ObjectOperation_cmpext(prval));
    out_rval.back() = prval;
  }
466
  // Append a READ op for [off, off+len); the reply payload lands in *pbl,
  // the return code in *prval, and ctx (if any) is completed afterwards.
  void read(uint64_t off, uint64_t len, ceph::buffer::list *pbl, int *prval,
	    Context* ctx) {
    ceph::buffer::list bl;  // READ carries no input payload
    add_data(CEPH_OSD_OP_READ, off, len, bl);
    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
    set_handler(ctx);
  }

  // error_code flavor of read.
  void read(uint64_t off, uint64_t len, boost::system::error_code* ec,
	    ceph::buffer::list* pbl) {
    ceph::buffer::list bl;
    add_data(CEPH_OSD_OP_READ, off, len, bl);
    out_ec.back() = ec;
    out_bl.back() = pbl;
  }
484
  // Completion callback for SPARSE_READ: decodes the extent map (into the
  // caller's container type Ex) followed by the data. An empty reply with
  // r >= 0 is treated as an error (-EIO / end_of_buffer) rather than
  // risking an exception on an IO path.
  template<typename Ex>
  struct CB_ObjectOperation_sparse_read {
    ceph::buffer::list* data_bl;
    Ex* extents;
    int* prval;
    boost::system::error_code* pec;
    CB_ObjectOperation_sparse_read(ceph::buffer::list* data_bl,
				   Ex* extents,
				   int* prval,
				   boost::system::error_code* pec)
      : data_bl(data_bl), extents(extents), prval(prval), pec(pec) {}
    void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
      auto iter = bl.cbegin();
      if (r >= 0) {
	// NOTE: it's possible the sub-op has not been executed but the result
	// code remains zeroed. Avoid the costly exception handling on a
	// potential IO path.
	if (bl.length() > 0) {
	  try {
	    decode(*extents, iter);
	    decode(*data_bl, iter);
	  } catch (const ceph::buffer::error& e) {
	    if (prval)
	      *prval = -EIO;
	    if (pec)
	      *pec = e.code();
	  }
	} else if (prval) {
	  *prval = -EIO;
	  if (pec)
	    *pec = buffer::errc::end_of_buffer;
	}
      }
    }
  };
  // Sparse read with extents delivered as a map<offset,length> and errors
  // via int*.
  void sparse_read(uint64_t off, uint64_t len, std::map<uint64_t, uint64_t>* m,
		   ceph::buffer::list* data_bl, int* prval) {
    ceph::buffer::list bl;  // no input payload
    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
    set_handler(CB_ObjectOperation_sparse_read(data_bl, m, prval, nullptr));
    out_rval.back() = prval;
  }
  // error_code flavor; extents delivered as a vector of (offset, length).
  void sparse_read(uint64_t off, uint64_t len,
		   boost::system::error_code* ec,
		   std::vector<std::pair<uint64_t, uint64_t>>* m,
		   ceph::buffer::list* data_bl) {
    ceph::buffer::list bl;
    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
    set_handler(CB_ObjectOperation_sparse_read(data_bl, m, nullptr, ec));
    out_ec.back() = ec;
  }
536 void write(uint64_t off, ceph::buffer::list& bl,
537 uint64_t truncate_size,
538 uint32_t truncate_seq) {
539 add_data(CEPH_OSD_OP_WRITE, off, bl.length(), bl);
540 OSDOp& o = *ops.rbegin();
541 o.op.extent.truncate_size = truncate_size;
542 o.op.extent.truncate_seq = truncate_seq;
543 }
544 void write(uint64_t off, ceph::buffer::list& bl) {
545 write(off, bl, 0, 0);
546 }
  // Replace the whole object with bl's contents.
  void write_full(ceph::buffer::list& bl) {
    add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl);
  }
  // Replicate bl across write_len bytes starting at off.
  void writesame(uint64_t off, uint64_t write_len, ceph::buffer::list& bl) {
    add_writesame(CEPH_OSD_OP_WRITESAME, off, write_len, bl);
  }
  // Append bl to the end of the object.
  void append(ceph::buffer::list& bl) {
    add_data(CEPH_OSD_OP_APPEND, 0, bl.length(), bl);
  }
  // Zero len bytes starting at off; no payload.
  void zero(uint64_t off, uint64_t len) {
    ceph::buffer::list bl;
    add_data(CEPH_OSD_OP_ZERO, off, len, bl);
  }
  // Truncate the object to off bytes.
  void truncate(uint64_t off) {
    ceph::buffer::list bl;
    add_data(CEPH_OSD_OP_TRUNCATE, off, 0, bl);
  }
  // Delete the object.
  void remove() {
    ceph::buffer::list bl;
    add_data(CEPH_OSD_OP_DELETE, 0, 0, bl);
  }
  // Map the allocated extents in [off, off+len).
  void mapext(uint64_t off, uint64_t len) {
    ceph::buffer::list bl;
    add_data(CEPH_OSD_OP_MAPEXT, off, len, bl);
  }
  // Sparse read with no output wiring; caller inspects the raw reply.
  void sparse_read(uint64_t off, uint64_t len) {
    ceph::buffer::list bl;
    add_data(CEPH_OSD_OP_SPARSE_READ, off, len, bl);
  }
576
  // Append a CHECKSUM op over [off, off+len) in chunk_size pieces using
  // algorithm `type`; init_value_bl seeds the checksum. The raw reply
  // goes to *pbl, the return code to *prval, and ctx (if any) runs after.
  void checksum(uint8_t type, const ceph::buffer::list &init_value_bl,
		uint64_t off, uint64_t len, size_t chunk_size,
		ceph::buffer::list *pbl, int *prval, Context *ctx) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_CHECKSUM);
    osd_op.op.checksum.offset = off;
    osd_op.op.checksum.length = len;
    osd_op.op.checksum.type = type;
    osd_op.op.checksum.chunk_size = chunk_size;
    osd_op.indata.append(init_value_bl);

    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
    set_handler(ctx);
  }
592
593 // object attrs
  // Fetch one xattr value into *pbl; return code into *prval.
  void getxattr(const char *name, ceph::buffer::list *pbl, int *prval) {
    ceph::buffer::list bl;  // GETXATTR carries no value payload
    add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
    unsigned p = ops.size() - 1;
    out_bl[p] = pbl;
    out_rval[p] = prval;
  }
  // error_code flavor of getxattr.
  void getxattr(std::string_view name, boost::system::error_code* ec,
		buffer::list *pbl) {
    ceph::buffer::list bl;
    add_xattr(CEPH_OSD_OP_GETXATTR, name, bl);
    out_bl.back() = pbl;
    out_ec.back() = ec;
  }
608
  // Completion callback decoding a key->value map reply (omap values,
  // xattrs) into *pattrs, plus an optional truncation flag. Newer OSDs
  // append the truncated bool; for older replies it is inferred from
  // whether the result is exactly max_entries long.
  template<typename Vals>
  struct CB_ObjectOperation_decodevals {
    uint64_t max_entries;
    Vals* pattrs;
    bool* ptruncated;
    int* prval;
    boost::system::error_code* pec;
    CB_ObjectOperation_decodevals(uint64_t m, Vals* pa,
				  bool *pt, int *pr,
				  boost::system::error_code* pec)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr), pec(pec) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  if (pattrs)
	    decode(*pattrs, p);
	  if (ptruncated) {
	    Vals ignore;
	    if (!pattrs) {
	      // Must still consume the map to reach the truncated flag;
	      // point pattrs at the scratch copy for the size check below.
	      decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      decode(*ptruncated, p);
	    } else {
	      // The OSD did not provide this. Since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	} catch (const ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	  if (pec)
	    *pec = e.code();
	}
      }
    }
  };
  // Completion callback decoding a key-set reply (omap keys) into
  // *pattrs, plus an optional truncation flag; mirrors
  // CB_ObjectOperation_decodevals above, including the inference for old
  // OSDs that omit the truncated bool.
  template<typename Keys>
  struct CB_ObjectOperation_decodekeys {
    uint64_t max_entries;
    Keys* pattrs;
    bool *ptruncated;
    int *prval;
    boost::system::error_code* pec;
    CB_ObjectOperation_decodekeys(uint64_t m, Keys* pa, bool *pt,
				  int *pr, boost::system::error_code* pec)
      : max_entries(m), pattrs(pa), ptruncated(pt), prval(pr), pec(pec) {
      if (ptruncated) {
	*ptruncated = false;
      }
    }
    void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
      if (r >= 0) {
	using ceph::decode;
	auto p = bl.cbegin();
	try {
	  if (pattrs)
	    decode(*pattrs, p);
	  if (ptruncated) {
	    Keys ignore;
	    if (!pattrs) {
	      // Consume the key set to reach the flag; use the scratch
	      // copy for the size-based inference below.
	      decode(ignore, p);
	      pattrs = &ignore;
	    }
	    if (!p.end()) {
	      decode(*ptruncated, p);
	    } else {
	      // the OSD did not provide this. since old OSDs do not
	      // enforce omap result limits either, we can infer it from
	      // the size of the result
	      *ptruncated = (pattrs->size() == max_entries);
	    }
	  }
	} catch (const ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	  if (pec)
	    *pec = e.code();
	}
      }
    }
  };
  // Completion callback for LIST_WATCHERS: converts the decoded
  // obj_list_watch_response_t entries into librados-style obj_watch_t
  // records (fixed-size addr buffer, hence the bounded strncpy plus
  // explicit NUL termination).
  struct CB_ObjectOperation_decodewatchers {
    std::list<obj_watch_t>* pwatchers;
    int* prval;
    boost::system::error_code* pec;
    CB_ObjectOperation_decodewatchers(std::list<obj_watch_t>* pw, int* pr,
				      boost::system::error_code* pec)
      : pwatchers(pw), prval(pr), pec(pec) {}
    void operator()(boost::system::error_code ec, int r,
		    const ceph::buffer::list& bl) {
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  obj_list_watch_response_t resp;
	  decode(resp, p);
	  if (pwatchers) {
	    for (const auto& watch_item : resp.entries) {
	      obj_watch_t ow;
	      std::string sa = watch_item.addr.get_legacy_str();
	      strncpy(ow.addr, sa.c_str(), sizeof(ow.addr) - 1);
	      ow.addr[sizeof(ow.addr) - 1] = '\0';
	      ow.watcher_id = watch_item.name.num();
	      ow.cookie = watch_item.cookie;
	      ow.timeout_seconds = watch_item.timeout_seconds;
	      pwatchers->push_back(std::move(ow));
	    }
	  }
	} catch (const ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	  if (pec)
	    *pec = e.code();
	}
      }
    }
  };
734
  // neorados flavor of the watcher-list decoder: same reply format, but
  // fills neorados::ObjWatcher (std::string addr, so no fixed-size copy
  // is needed).
  struct CB_ObjectOperation_decodewatchersneo {
    std::vector<neorados::ObjWatcher>* pwatchers;
    int* prval;
    boost::system::error_code* pec;
    CB_ObjectOperation_decodewatchersneo(std::vector<neorados::ObjWatcher>* pw,
					 int* pr,
					 boost::system::error_code* pec)
      : pwatchers(pw), prval(pr), pec(pec) {}
    void operator()(boost::system::error_code ec, int r,
		    const ceph::buffer::list& bl) {
      if (r >= 0) {
	auto p = bl.cbegin();
	try {
	  obj_list_watch_response_t resp;
	  decode(resp, p);
	  if (pwatchers) {
	    for (const auto& watch_item : resp.entries) {
	      neorados::ObjWatcher ow;
	      ow.addr = watch_item.addr.get_legacy_str();
	      ow.watcher_id = watch_item.name.num();
	      ow.cookie = watch_item.cookie;
	      ow.timeout_seconds = watch_item.timeout_seconds;
	      pwatchers->push_back(std::move(ow));
	    }
	  }
	} catch (const ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	  if (pec)
	    *pec = e.code();
	}
      }
    }
  };
769
770
  // Completion callback for LIST_SNAPS: decodes one
  // obj_list_snap_response_t and fans it out into the librados snap_set_t
  // and/or the neorados SnapSet, whichever pointers are non-null.
  struct CB_ObjectOperation_decodesnaps {
    librados::snap_set_t *psnaps;
    neorados::SnapSet *neosnaps;
    int *prval;
    boost::system::error_code* pec;
    CB_ObjectOperation_decodesnaps(librados::snap_set_t* ps,
				   neorados::SnapSet* ns, int* pr,
				   boost::system::error_code* pec)
      : psnaps(ps), neosnaps(ns), prval(pr), pec(pec) {}
    void operator()(boost::system::error_code ec, int r, const ceph::buffer::list& bl) {
      if (r >= 0) {
	using ceph::decode;
	auto p = bl.cbegin();
	try {
	  obj_list_snap_response_t resp;
	  decode(resp, p);
	  if (psnaps) {
	    psnaps->clones.clear();
	    for (auto ci = resp.clones.begin();
		 ci != resp.clones.end();
		 ++ci) {
	      librados::clone_info_t clone;

	      clone.cloneid = ci->cloneid;
	      clone.snaps.reserve(ci->snaps.size());
	      clone.snaps.insert(clone.snaps.end(), ci->snaps.begin(),
				 ci->snaps.end());
	      clone.overlap = ci->overlap;
	      clone.size = ci->size;

	      psnaps->clones.push_back(clone);
	    }
	    psnaps->seq = resp.seq;
	  }

	  if (neosnaps) {
	    neosnaps->clones.clear();
	    // Move out of resp here: the librados branch above (which
	    // copies) has already run if it was requested.
	    for (auto&& c : resp.clones) {
	      neorados::CloneInfo clone;

	      clone.cloneid = std::move(c.cloneid);
	      clone.snaps.reserve(c.snaps.size());
	      std::move(c.snaps.begin(), c.snaps.end(),
			std::back_inserter(clone.snaps));
	      clone.overlap = c.overlap;
	      clone.size = c.size;
	      neosnaps->clones.push_back(std::move(clone));
	    }
	    neosnaps->seq = resp.seq;
	  }
	} catch (const ceph::buffer::error& e) {
	  if (prval)
	    *prval = -EIO;
	  if (pec)
	    *pec = e.code();
	}
      }
    }
  };
  // Fetch all xattrs into *pattrs (no entry limit, hence max_entries 0).
  void getxattrs(std::map<std::string,ceph::buffer::list> *pattrs, int *prval) {
    add_op(CEPH_OSD_OP_GETXATTRS);
    if (pattrs || prval) {
      set_handler(CB_ObjectOperation_decodevals(0, pattrs, nullptr, prval,
						nullptr));
      out_rval.back() = prval;
    }
  }
  // error_code flavor delivering into a flat_map.
  void getxattrs(boost::system::error_code* ec,
		 boost::container::flat_map<std::string, ceph::buffer::list> *pattrs) {
    add_op(CEPH_OSD_OP_GETXATTRS);
    set_handler(CB_ObjectOperation_decodevals(0, pattrs, nullptr, nullptr, ec));
    out_ec.back() = ec;
  }
  // Set one xattr to the given value.
  void setxattr(const char *name, const ceph::buffer::list& bl) {
    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
  }
  void setxattr(std::string_view name, const ceph::buffer::list& bl) {
    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
  }
  // Convenience: set an xattr from a std::string value.
  void setxattr(const char *name, const std::string& s) {
    ceph::buffer::list bl;
    bl.append(s);
    add_xattr(CEPH_OSD_OP_SETXATTR, name, bl);
  }
  // Assert a comparison against an xattr's current value (CMPXATTR).
  void cmpxattr(const char *name, uint8_t cmp_op, uint8_t cmp_mode,
		const ceph::buffer::list& bl) {
    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
  }
  void cmpxattr(std::string_view name, uint8_t cmp_op, uint8_t cmp_mode,
		const ceph::buffer::list& bl) {
    add_xattr_cmp(CEPH_OSD_OP_CMPXATTR, name, cmp_op, cmp_mode, bl);
  }
  // Remove one xattr (value payload is empty).
  void rmxattr(const char *name) {
    ceph::buffer::list bl;
    add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
  }
  void rmxattr(std::string_view name) {
    ceph::buffer::list bl;
    add_xattr(CEPH_OSD_OP_RMXATTR, name, bl);
  }
871 void setxattrs(map<string, ceph::buffer::list>& attrs) {
872 using ceph::encode;
873 ceph::buffer::list bl;
874 encode(attrs, bl);
875 add_xattr(CEPH_OSD_OP_RESETXATTRS, 0, bl.length());
876 }
877 void resetxattrs(const char *prefix, std::map<std::string, ceph::buffer::list>& attrs) {
878 using ceph::encode;
879 ceph::buffer::list bl;
880 encode(attrs, bl);
881 add_xattr(CEPH_OSD_OP_RESETXATTRS, prefix, bl);
882 }
883
  // trivialmap
  // Apply a TMAP update payload; bl's contents are moved into the op.
  void tmap_update(ceph::buffer::list& bl) {
    add_data(CEPH_OSD_OP_TMAPUP, 0, 0, bl);
  }
888
889 // objectmap
  // Fetch up to max_to_get omap keys after start_after into *out_set;
  // *ptruncated reports whether more remain. The request parameters are
  // encoded into the op's input data.
  void omap_get_keys(const std::string &start_after,
		     uint64_t max_to_get,
		     std::set<std::string> *out_set,
		     bool *ptruncated,
		     int *prval) {
    using ceph::encode;
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
    ceph::buffer::list bl;
    encode(start_after, bl);
    encode(max_to_get, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || ptruncated || out_set) {
      set_handler(CB_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, prval,
						nullptr));
      out_rval.back() = prval;
    }
  }
  // error_code flavor; an unengaged start_after means "from the start".
  void omap_get_keys(std::optional<std::string_view> start_after,
		     uint64_t max_to_get,
		     boost::system::error_code* ec,
		     boost::container::flat_set<std::string> *out_set,
		     bool *ptruncated) {
    OSDOp& op = add_op(CEPH_OSD_OP_OMAPGETKEYS);
    ceph::buffer::list bl;
    encode(start_after ? *start_after : std::string_view{}, bl);
    encode(max_to_get, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    set_handler(
      CB_ObjectOperation_decodekeys(max_to_get, out_set, ptruncated, nullptr,
				    ec));
    out_ec.back() = ec;
  }
926
  // Fetch up to max_to_get omap key/value pairs after start_after, keys
  // restricted to filter_prefix, into *out_set; *ptruncated reports
  // whether more remain. Encoding order is start_after, max, prefix.
  void omap_get_vals(const std::string &start_after,
		     const std::string &filter_prefix,
		     uint64_t max_to_get,
		     std::map<std::string, ceph::buffer::list> *out_set,
		     bool *ptruncated,
		     int *prval) {
    using ceph::encode;
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
    ceph::buffer::list bl;
    encode(start_after, bl);
    encode(max_to_get, bl);
    encode(filter_prefix, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || out_set || ptruncated) {
      set_handler(CB_ObjectOperation_decodevals(max_to_get, out_set, ptruncated,
						prval, nullptr));
      out_rval.back() = prval;
    }
  }
948
949 void omap_get_vals(std::optional<std::string_view> start_after,
950 std::optional<std::string_view> filter_prefix,
951 uint64_t max_to_get,
952 boost::system::error_code* ec,
953 boost::container::flat_map<std::string, ceph::buffer::list> *out_set,
954 bool *ptruncated) {
955 OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALS);
956 ceph::buffer::list bl;
957 encode(start_after ? *start_after : std::string_view{}, bl);
958 encode(max_to_get, bl);
959 encode(filter_prefix ? *start_after : std::string_view{}, bl);
960 op.op.extent.offset = 0;
961 op.op.extent.length = bl.length();
962 op.indata.claim_append(bl);
963 set_handler(CB_ObjectOperation_decodevals(max_to_get, out_set, ptruncated,
964 nullptr, ec));
965 out_ec.back() = ec;
966 }
967
  // Fetch the omap values for exactly the keys in to_get into *out_set.
  void omap_get_vals_by_keys(const std::set<std::string> &to_get,
			     std::map<std::string, ceph::buffer::list> *out_set,
			     int *prval) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
    ceph::buffer::list bl;
    encode(to_get, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval || out_set) {
      set_handler(CB_ObjectOperation_decodevals(0, out_set, nullptr, prval,
						nullptr));
      out_rval.back() = prval;
    }
  }

  // error_code flavor operating on flat containers.
  void omap_get_vals_by_keys(
    const boost::container::flat_set<std::string>& to_get,
    boost::system::error_code* ec,
    boost::container::flat_map<std::string, ceph::buffer::list> *out_set) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS);
    ceph::buffer::list bl;
    encode(to_get, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    set_handler(CB_ObjectOperation_decodevals(0, out_set, nullptr, nullptr,
					      ec));
    out_ec.back() = ec;
  }
998
  // Assert per-key omap comparisons; `assertions` maps key -> (expected
  // value, comparison op). No reply payload to decode, so only the return
  // code / error code is wired up.
  void omap_cmp(const std::map<std::string, pair<ceph::buffer::list,int> > &assertions,
		int *prval) {
    using ceph::encode;
    OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
    ceph::buffer::list bl;
    encode(assertions, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    if (prval) {
      unsigned p = ops.size() - 1;
      out_rval[p] = prval;
    }
  }

  // error_code flavor of omap_cmp.
  void omap_cmp(const boost::container::flat_map<
		  std::string, pair<ceph::buffer::list, int>>& assertions,
		boost::system::error_code *ec) {
    OSDOp &op = add_op(CEPH_OSD_OP_OMAP_CMP);
    ceph::buffer::list bl;
    encode(assertions, bl);
    op.op.extent.offset = 0;
    op.op.extent.length = bl.length();
    op.indata.claim_append(bl);
    out_ec.back() = ec;
  }
1025
  // Completion helper for copy_get(): decodes the object_copy_data_t
  // reply in `bl` and scatters its fields into the caller-provided
  // output pointers (each of which may be null).
  struct C_ObjectOperation_copyget : public Context {
    ceph::buffer::list bl;        // raw reply payload, filled in by the op
    object_copy_cursor_t *cursor; // resume cursor, updated on every success
    uint64_t *out_size;
    ceph::real_time *out_mtime;
    std::map<std::string,ceph::buffer::list> *out_attrs;
    ceph::buffer::list *out_data, *out_omap_header, *out_omap_data;
    std::vector<snapid_t> *out_snaps;
    snapid_t *out_snap_seq;
    uint32_t *out_flags;
    uint32_t *out_data_digest;
    uint32_t *out_omap_digest;
    mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *out_reqids;
    mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes;
    uint64_t *out_truncate_seq;
    uint64_t *out_truncate_size;
    int *prval;
    C_ObjectOperation_copyget(object_copy_cursor_t *c,
                              uint64_t *s,
                              ceph::real_time *m,
                              std::map<std::string,ceph::buffer::list> *a,
                              ceph::buffer::list *d, ceph::buffer::list *oh,
                              ceph::buffer::list *o,
                              std::vector<snapid_t> *osnaps,
                              snapid_t *osnap_seq,
                              uint32_t *flags,
                              uint32_t *dd,
                              uint32_t *od,
                              mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *oreqids,
                              mempool::osd_pglog::map<uint32_t, int> *oreqid_return_codes,
                              uint64_t *otseq,
                              uint64_t *otsize,
                              int *r)
      : cursor(c),
        out_size(s), out_mtime(m),
        out_attrs(a), out_data(d), out_omap_header(oh),
        out_omap_data(o), out_snaps(osnaps), out_snap_seq(osnap_seq),
        out_flags(flags), out_data_digest(dd), out_omap_digest(od),
        out_reqids(oreqids),
        out_reqid_return_codes(oreqid_return_codes),
        out_truncate_seq(otseq),
        out_truncate_size(otsize),
        prval(r) {}
    void finish(int r) override {
      using ceph::decode;
      // reqids are copied on ENOENT; any other error carries no payload.
      if (r < 0 && r != -ENOENT)
        return;
      try {
        auto p = bl.cbegin();
        object_copy_data_t copy_reply;
        decode(copy_reply, p);
        if (r == -ENOENT) {
          // The object is gone but its reqid history is still meaningful.
          if (out_reqids)
            *out_reqids = copy_reply.reqids;
          return;
        }
        if (out_size)
          *out_size = copy_reply.size;
        if (out_mtime)
          *out_mtime = ceph::real_clock::from_ceph_timespec(copy_reply.mtime);
        if (out_attrs)
          *out_attrs = copy_reply.attrs;
        if (out_data)
          out_data->claim_append(copy_reply.data);
        if (out_omap_header)
          out_omap_header->claim_append(copy_reply.omap_header);
        if (out_omap_data)
          *out_omap_data = copy_reply.omap_data;
        if (out_snaps)
          *out_snaps = copy_reply.snaps;
        if (out_snap_seq)
          *out_snap_seq = copy_reply.snap_seq;
        if (out_flags)
          *out_flags = copy_reply.flags;
        if (out_data_digest)
          *out_data_digest = copy_reply.data_digest;
        if (out_omap_digest)
          *out_omap_digest = copy_reply.omap_digest;
        if (out_reqids)
          *out_reqids = copy_reply.reqids;
        if (out_reqid_return_codes)
          *out_reqid_return_codes = copy_reply.reqid_return_codes;
        if (out_truncate_seq)
          *out_truncate_seq = copy_reply.truncate_seq;
        if (out_truncate_size)
          *out_truncate_size = copy_reply.truncate_size;
        // Always advance the cursor so the caller can continue the copy.
        *cursor = copy_reply.cursor;
      } catch (const ceph::buffer::error& e) {
        // Malformed reply: surface EIO via the per-op return code.
        if (prval)
          *prval = -EIO;
      }
    }
  };
1120
  // Read one chunk (up to `max` bytes) of an object's content — data,
  // attrs, omap, snaps, reqid history — starting at *cursor. On success
  // the handler advances *cursor so the next copy_get continues where
  // this one stopped. All out-pointers may be null.
  void copy_get(object_copy_cursor_t *cursor,
                uint64_t max,
                uint64_t *out_size,
                ceph::real_time *out_mtime,
                std::map<std::string,ceph::buffer::list> *out_attrs,
                ceph::buffer::list *out_data,
                ceph::buffer::list *out_omap_header,
                ceph::buffer::list *out_omap_data,
                std::vector<snapid_t> *out_snaps,
                snapid_t *out_snap_seq,
                uint32_t *out_flags,
                uint32_t *out_data_digest,
                uint32_t *out_omap_digest,
                mempool::osd_pglog::vector<std::pair<osd_reqid_t, version_t> > *out_reqids,
                mempool::osd_pglog::map<uint32_t, int> *out_reqid_return_codes,
                uint64_t *truncate_seq,
                uint64_t *truncate_size,
                int *prval) {
    using ceph::encode;
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_GET);
    osd_op.op.copy_get.max = max;
    encode(*cursor, osd_op.indata);
    encode(max, osd_op.indata);
    unsigned p = ops.size() - 1;
    out_rval[p] = prval;
    C_ObjectOperation_copyget *h =
      new C_ObjectOperation_copyget(cursor, out_size, out_mtime,
                                    out_attrs, out_data, out_omap_header,
                                    out_omap_data, out_snaps, out_snap_seq,
                                    out_flags, out_data_digest,
                                    out_omap_digest, out_reqids,
                                    out_reqid_return_codes, truncate_seq,
                                    truncate_size, prval);
    // Route the raw reply into the handler's buffer for decoding.
    out_bl[p] = &h->bl;
    set_handler(h);
  }
1157
  // Clear the object's dirty flag (cache-tier bookkeeping).
  void undirty() {
    add_op(CEPH_OSD_OP_UNDIRTY);
  }
1161
  // Completion helper for is_dirty(): decodes the boolean reply.
  struct C_ObjectOperation_isdirty : public Context {
    ceph::buffer::list bl; // raw reply payload
    bool *pisdirty;        // [out] decoded dirty flag (may be null)
    int *prval;            // [out] EIO on decode failure (may be null)
    C_ObjectOperation_isdirty(bool *p, int *r)
      : pisdirty(p), prval(r) {}
    void finish(int r) override {
      using ceph::decode;
      if (r < 0)
        return;
      try {
        auto p = bl.cbegin();
        bool isdirty;
        decode(isdirty, p);
        if (pisdirty)
          *pisdirty = isdirty;
      } catch (const ceph::buffer::error& e) {
        if (prval)
          *prval = -EIO;
      }
    }
  };
1184
1185 void is_dirty(bool *pisdirty, int *prval) {
1186 add_op(CEPH_OSD_OP_ISDIRTY);
1187 unsigned p = ops.size() - 1;
1188 out_rval[p] = prval;
1189 C_ObjectOperation_isdirty *h =
1190 new C_ObjectOperation_isdirty(pisdirty, prval);
1191 out_bl[p] = &h->bl;
1192 set_handler(h);
1193 }
1194
  // Completion helper for hit_set_ls(): decodes the interval list and
  // reports it either as real_time pairs (putls) or, converted, as
  // time_t pairs (ptls). Either output may be null.
  struct C_ObjectOperation_hit_set_ls : public Context {
    ceph::buffer::list bl; // raw reply payload
    std::list< std::pair<time_t, time_t> > *ptls;
    std::list< std::pair<ceph::real_time, ceph::real_time> > *putls;
    int *prval;
    C_ObjectOperation_hit_set_ls(std::list< std::pair<time_t, time_t> > *t,
                                 std::list< std::pair<ceph::real_time,
                                                      ceph::real_time> > *ut,
                                 int *r)
      : ptls(t), putls(ut), prval(r) {}
    void finish(int r) override {
      using ceph::decode;
      if (r < 0)
        return;
      try {
        auto p = bl.cbegin();
        std::list< std::pair<ceph::real_time, ceph::real_time> > ls;
        decode(ls, p);
        if (ptls) {
          ptls->clear();
          for (auto p = ls.begin(); p != ls.end(); ++p)
            // round initial timestamp up to the next full second to
            // keep this a valid interval after the truncation to time_t.
            ptls->push_back(
              std::make_pair(ceph::real_clock::to_time_t(
                               ceph::ceil(p->first,
                                          // Sadly, no time literals until C++14.
                                          std::chrono::seconds(1))),
                             ceph::real_clock::to_time_t(p->second)));
        }
        if (putls)
          putls->swap(ls);
      } catch (const ceph::buffer::error& e) {
        r = -EIO;
      }
      // Unlike the other decoders, this one always publishes r (not
      // just on decode failure).
      if (prval)
        *prval = r;
    }
  };
1234
1235 /**
1236 * std::list available HitSets.
1237 *
1238 * We will get back a std::list of time intervals. Note that the most
1239 * recent range may have an empty end timestamp if it is still
1240 * accumulating.
1241 *
1242 * @param pls [out] std::list of time intervals
1243 * @param prval [out] return value
1244 */
1245 void hit_set_ls(std::list< std::pair<time_t, time_t> > *pls, int *prval) {
1246 add_op(CEPH_OSD_OP_PG_HITSET_LS);
1247 unsigned p = ops.size() - 1;
1248 out_rval[p] = prval;
1249 C_ObjectOperation_hit_set_ls *h =
1250 new C_ObjectOperation_hit_set_ls(pls, NULL, prval);
1251 out_bl[p] = &h->bl;
1252 set_handler(h);
1253 }
1254 void hit_set_ls(std::list<std::pair<ceph::real_time, ceph::real_time> > *pls,
1255 int *prval) {
1256 add_op(CEPH_OSD_OP_PG_HITSET_LS);
1257 unsigned p = ops.size() - 1;
1258 out_rval[p] = prval;
1259 C_ObjectOperation_hit_set_ls *h =
1260 new C_ObjectOperation_hit_set_ls(NULL, pls, prval);
1261 out_bl[p] = &h->bl;
1262 set_handler(h);
1263 }
1264
1265 /**
1266 * get HitSet
1267 *
1268 * Return an encoded HitSet that includes the provided time
1269 * interval.
1270 *
1271 * @param stamp [in] timestamp
1272 * @param pbl [out] target buffer for encoded HitSet
1273 * @param prval [out] return value
1274 */
1275 void hit_set_get(ceph::real_time stamp, ceph::buffer::list *pbl, int *prval) {
1276 OSDOp& op = add_op(CEPH_OSD_OP_PG_HITSET_GET);
1277 op.op.hit_set_get.stamp = ceph::real_clock::to_ceph_timespec(stamp);
1278 unsigned p = ops.size() - 1;
1279 out_rval[p] = prval;
1280 out_bl[p] = pbl;
1281 }
1282
1283 void omap_get_header(ceph::buffer::list *bl, int *prval) {
1284 add_op(CEPH_OSD_OP_OMAPGETHEADER);
1285 unsigned p = ops.size() - 1;
1286 out_bl[p] = bl;
1287 out_rval[p] = prval;
1288 }
1289
1290 void omap_get_header(boost::system::error_code* ec, ceph::buffer::list *bl) {
1291 add_op(CEPH_OSD_OP_OMAPGETHEADER);
1292 out_bl.back() = bl;
1293 out_ec.back() = ec;
1294 }
1295
  // Insert/overwrite the given omap key/value pairs.
  void omap_set(const map<string, ceph::buffer::list> &map) {
    ceph::buffer::list bl;
    encode(map, bl);
    add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
  }

  // flat_map flavor of omap_set.
  void omap_set(const boost::container::flat_map<string, ceph::buffer::list>& map) {
    ceph::buffer::list bl;
    encode(map, bl);
    add_data(CEPH_OSD_OP_OMAPSETVALS, 0, bl.length(), bl);
  }

  // Replace the omap header blob.
  void omap_set_header(ceph::buffer::list &bl) {
    add_data(CEPH_OSD_OP_OMAPSETHEADER, 0, bl.length(), bl);
  }

  // Remove all omap key/value pairs.
  void omap_clear() {
    add_op(CEPH_OSD_OP_OMAPCLEAR);
  }

  // Remove the listed omap keys.
  void omap_rm_keys(const std::set<std::string> &to_remove) {
    using ceph::encode;
    ceph::buffer::list bl;
    encode(to_remove, bl);
    add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
  }
  // flat_set flavor of omap_rm_keys.
  void omap_rm_keys(const boost::container::flat_set<std::string>& to_remove) {
    ceph::buffer::list bl;
    encode(to_remove, bl);
    add_data(CEPH_OSD_OP_OMAPRMKEYS, 0, bl.length(), bl);
  }

  // Remove omap keys between key_begin and key_end.
  // NOTE(review): presumably a half-open range [key_begin, key_end) —
  // confirm against the OSD's OMAPRMKEYRANGE implementation.
  void omap_rm_range(std::string_view key_begin, std::string_view key_end) {
    ceph::buffer::list bl;
    using ceph::encode;
    encode(key_begin, bl);
    encode(key_end, bl);
    add_data(CEPH_OSD_OP_OMAPRMKEYRANGE, 0, bl.length(), bl);
  }
1335
  // object classes
  // Invoke a registered object-class method (cls `cname`::`method`)
  // with `indata` as its input payload; fire-and-forget flavor.
  void call(const char *cname, const char *method, ceph::buffer::list &indata) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
  }

  // As above, also collecting the method's output, a completion
  // context, and the per-op return code.
  void call(const char *cname, const char *method, ceph::buffer::list &indata,
            ceph::buffer::list *outdata, Context *ctx, int *prval) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, ctx, prval);
  }

  // Error-code flavor, discarding output data.
  void call(std::string_view cname, std::string_view method,
            const ceph::buffer::list& indata, boost::system::error_code* ec) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, NULL, NULL, NULL);
    out_ec.back() = ec;
  }

  // Error-code flavor, collecting output data.
  void call(std::string_view cname, std::string_view method, const ceph::buffer::list& indata,
            boost::system::error_code* ec, ceph::buffer::list *outdata) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, outdata, nullptr, nullptr);
    out_ec.back() = ec;
  }
  // Callback flavor: f receives the error code and the output payload.
  void call(std::string_view cname, std::string_view method,
            const ceph::buffer::list& indata,
            fu2::unique_function<void (boost::system::error_code,
                                       const ceph::buffer::list&) &&> f) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, std::move(f));
  }
  // Callback flavor that additionally receives the raw int return value.
  void call(std::string_view cname, std::string_view method,
            const ceph::buffer::list& indata,
            fu2::unique_function<void (boost::system::error_code, int,
                                       const ceph::buffer::list&) &&> f) {
    add_call(CEPH_OSD_OP_CALL, cname, method, indata, std::move(f));
  }
1369
  // watch/notify
  // Issue a watch sub-operation on this object; `op` is a
  // CEPH_OSD_WATCH_OP_* code and `timeout` is forwarded as given.
  void watch(uint64_t cookie, __u8 op, uint32_t timeout = 0) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_WATCH);
    osd_op.op.watch.cookie = cookie;
    osd_op.op.watch.op = op;
    osd_op.op.watch.timeout = timeout;
  }

  // Send a notify to this object's watchers. The wire payload
  // (prot_ver, timeout, bl) is built into the caller-owned *inbl and
  // then appended to the op's indata.
  void notify(uint64_t cookie, uint32_t prot_ver, uint32_t timeout,
              ceph::buffer::list &bl, ceph::buffer::list *inbl) {
    using ceph::encode;
    OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY);
    osd_op.op.notify.cookie = cookie;
    encode(prot_ver, *inbl);
    encode(timeout, *inbl);
    encode(bl, *inbl);
    osd_op.indata.append(*inbl);
  }
1388
1389 void notify_ack(uint64_t notify_id, uint64_t cookie,
1390 ceph::buffer::list& reply_bl) {
1391 using ceph::encode;
1392 OSDOp& osd_op = add_op(CEPH_OSD_OP_NOTIFY_ACK);
1393 ceph::buffer::list bl;
1394 encode(notify_id, bl);
1395 encode(cookie, bl);
1396 encode(reply_bl, bl);
1397 osd_op.indata.append(bl);
1398 }
1399
  // List the clients currently watching this object.
  void list_watchers(std::list<obj_watch_t> *out,
                     int *prval) {
    add_op(CEPH_OSD_OP_LIST_WATCHERS);
    if (prval || out) {
      set_handler(CB_ObjectOperation_decodewatchers(out, prval, nullptr));
      out_rval.back() = prval;
    }
  }
  // neorados flavor, reporting through an error_code.
  void list_watchers(vector<neorados::ObjWatcher>* out,
                     boost::system::error_code* ec) {
    add_op(CEPH_OSD_OP_LIST_WATCHERS);
    set_handler(CB_ObjectOperation_decodewatchersneo(out, nullptr, ec));
    out_ec.back() = ec;
  }
1414
  // List the object's snapshots/clones into a librados snap_set_t.
  void list_snaps(librados::snap_set_t *out, int *prval,
                  boost::system::error_code* ec = nullptr) {
    add_op(CEPH_OSD_OP_LIST_SNAPS);
    if (prval || out || ec) {
      set_handler(CB_ObjectOperation_decodesnaps(out, nullptr, prval, ec));
      out_rval.back() = prval;
      out_ec.back() = ec;
    }
  }

  // neorados flavor of list_snaps.
  void list_snaps(neorados::SnapSet *out, int *prval,
                  boost::system::error_code* ec = nullptr) {
    add_op(CEPH_OSD_OP_LIST_SNAPS);
    if (prval || out || ec) {
      set_handler(CB_ObjectOperation_decodesnaps(nullptr, out, prval, ec));
      out_rval.back() = prval;
      out_ec.back() = ec;
    }
  }
1434
  // Fail the op unless the object's version equals `ver`.
  void assert_version(uint64_t ver) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ASSERT_VER);
    osd_op.op.assert_ver.ver = ver;
  }
1439
1440 void cmpxattr(const char *name, const ceph::buffer::list& val,
1441 int op, int mode) {
1442 add_xattr(CEPH_OSD_OP_CMPXATTR, name, val);
1443 OSDOp& o = *ops.rbegin();
1444 o.op.xattr.cmp_op = op;
1445 o.op.xattr.cmp_mode = mode;
1446 }
1447
  // Roll the object back to the given snapshot.
  void rollback(uint64_t snapid) {
    OSDOp& osd_op = add_op(CEPH_OSD_OP_ROLLBACK);
    osd_op.op.snap.snapid = snapid;
  }
1452
  // Copy this object's content from `src` (at snapid/src_version in
  // pool/locator src_oloc).
  void copy_from(object_t src, snapid_t snapid, object_locator_t src_oloc,
                 version_t src_version, unsigned flags,
                 unsigned src_fadvise_flags) {
    using ceph::encode;
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = src_version;
    osd_op.op.copy_from.flags = flags;
    osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
    encode(src, osd_op.indata);
    encode(src_oloc, osd_op.indata);
  }
  // COPY_FROM2 wire format: like copy_from, but additionally carries
  // truncate_seq/truncate_size in the payload.
  void copy_from2(object_t src, snapid_t snapid, object_locator_t src_oloc,
                  version_t src_version, unsigned flags,
                  uint32_t truncate_seq, uint64_t truncate_size,
                  unsigned src_fadvise_flags) {
    using ceph::encode;
    OSDOp& osd_op = add_op(CEPH_OSD_OP_COPY_FROM2);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = src_version;
    osd_op.op.copy_from.flags = flags;
    osd_op.op.copy_from.src_fadvise_flags = src_fadvise_flags;
    encode(src, osd_op.indata);
    encode(src_oloc, osd_op.indata);
    encode(truncate_seq, osd_op.indata);
    encode(truncate_size, osd_op.indata);
  }
1480
  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, the update will block.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_flush() {
    add_op(CEPH_OSD_OP_CACHE_FLUSH);
  }

  /**
   * writeback content to backing tier
   *
   * If object is marked dirty in the cache tier, write back content
   * to backing tier. If the object is clean this is a no-op.
   *
   * If writeback races with an update, return EAGAIN. Requires that
   * the SKIPRWLOCKS flag be set.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_try_flush() {
    add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH);
  }

  /**
   * evict object from cache tier
   *
   * If object is marked clean, remove the object from the cache tier.
   * Otherwise, return EBUSY.
   *
   * use with IGNORE_CACHE to avoid triggering promote.
   */
  void cache_evict() {
    add_op(CEPH_OSD_OP_CACHE_EVICT);
  }
1521
  /*
   * Extensible tier
   */
  // Redirect this object to `tgt`. Note the copy_from union fields are
  // reused on the wire to carry the snapid and target version.
  void set_redirect(object_t tgt, snapid_t snapid, object_locator_t tgt_oloc,
                    version_t tgt_version, int flag) {
    using ceph::encode;
    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_REDIRECT);
    osd_op.op.copy_from.snapid = snapid;
    osd_op.op.copy_from.src_version = tgt_version;
    encode(tgt, osd_op.indata);
    encode(tgt_oloc, osd_op.indata);
    set_last_op_flags(flag);
  }

  // Map [src_offset, src_offset+src_length) of this object onto a chunk
  // at tgt_offset of tgt_oid (manifest/dedup tiering).
  void set_chunk(uint64_t src_offset, uint64_t src_length, object_locator_t tgt_oloc,
                 object_t tgt_oid, uint64_t tgt_offset, int flag) {
    using ceph::encode;
    OSDOp& osd_op = add_op(CEPH_OSD_OP_SET_CHUNK);
    encode(src_offset, osd_op.indata);
    encode(src_length, osd_op.indata);
    encode(tgt_oloc, osd_op.indata);
    encode(tgt_oid, osd_op.indata);
    encode(tgt_offset, osd_op.indata);
    set_last_op_flags(flag);
  }
1547
  // Promote the object from the backing tier into this tier.
  void tier_promote() {
    add_op(CEPH_OSD_OP_TIER_PROMOTE);
  }

  // Drop the object's manifest (redirect/chunk) metadata.
  void unset_manifest() {
    add_op(CEPH_OSD_OP_UNSET_MANIFEST);
  }

  // Flush a (manifest) tiered object to the backing tier.
  void tier_flush() {
    add_op(CEPH_OSD_OP_TIER_FLUSH);
  }

  // Evict a (manifest) tiered object's local data.
  void tier_evict() {
    add_op(CEPH_OSD_OP_TIER_EVICT);
  }
1563
  // Hint the OSD about the expected final object size / typical write
  // size so it can pre-allocate appropriately; flags are
  // CEPH_OSD_ALLOC_HINT_FLAG_* bits.
  void set_alloc_hint(uint64_t expected_object_size,
                      uint64_t expected_write_size,
                      uint32_t flags) {
    add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT, expected_object_size,
                   expected_write_size, flags);

    // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
    // not worth a feature bit. Set FAILOK per-op flag to make
    // sure older osds don't trip over an unsupported opcode.
    set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK);
  }
1575
  // Rebuild this operation's op vector (and the parallel out_* arrays)
  // as a copy of `sops`, wiring each slot's out_bl/out_rval directly at
  // the corresponding source OSDOp's own outdata/rval fields.
  // NOTE(review): out_handler is resized but surviving entries are not
  // reset here — confirm callers rely on retaining those handlers.
  template<typename V>
  void dup(V& sops) {
    ops.clear();
    std::copy(sops.begin(), sops.end(),
              std::back_inserter(ops));
    out_bl.resize(sops.size());
    out_handler.resize(sops.size());
    out_rval.resize(sops.size());
    out_ec.resize(sops.size());
    for (uint32_t i = 0; i < sops.size(); i++) {
      out_bl[i] = &sops[i].outdata;
      out_rval[i] = &sops[i].rval;
      out_ec[i] = nullptr;
    }
  }
1591
  /**
   * Pin/unpin an object in cache tier
   */
  void cache_pin() {
    add_op(CEPH_OSD_OP_CACHE_PIN);
  }

  void cache_unpin() {
    add_op(CEPH_OSD_OP_CACHE_UNPIN);
  }
1602 };
1603
1604 inline std::ostream& operator <<(std::ostream& m, const ObjectOperation& oo) {
1605 auto i = oo.ops.cbegin();
1606 m << '[';
1607 while (i != oo.ops.cend()) {
1608 if (i != oo.ops.cbegin())
1609 m << ' ';
1610 m << *i;
1611 ++i;
1612 }
1613 m << ']';
1614 return m;
1615 }
1616
1617
1618 // ----------------
1619
1620 class Objecter : public md_config_obs_t, public Dispatcher {
1621 using MOSDOp = _mosdop::MOSDOp<osdc_opvec>;
1622 public:
1623 using OpSignature = void(boost::system::error_code);
1624 using OpCompletion = ceph::async::Completion<OpSignature>;
1625
1626 // config observer bits
1627 const char** get_tracked_conf_keys() const override;
1628 void handle_conf_change(const ConfigProxy& conf,
1629 const std::set <std::string> &changed) override;
1630
1631 public:
1632 Messenger *messenger;
1633 MonClient *monc;
1634 boost::asio::io_context& service;
1635 // The guaranteed sequenced, one-at-a-time execution and apparently
1636 // people sometimes depend on this.
1637 boost::asio::io_context::strand finish_strand{service};
1638 ZTracer::Endpoint trace_endpoint{"0.0.0.0", 0, "Objecter"};
1639 private:
1640 std::unique_ptr<OSDMap> osdmap{std::make_unique<OSDMap>()};
1641 public:
1642 using Dispatcher::cct;
1643 std::multimap<std::string,std::string> crush_location;
1644
1645 std::atomic<bool> initialized{false};
1646
1647 private:
1648 std::atomic<uint64_t> last_tid{0};
1649 std::atomic<unsigned> inflight_ops{0};
1650 std::atomic<int> client_inc{-1};
1651 uint64_t max_linger_id{0};
1652 std::atomic<unsigned> num_in_flight{0};
1653 std::atomic<int> global_op_flags{0}; // flags which are applied to each IO op
1654 bool keep_balanced_budget = false;
1655 bool honor_pool_full = true;
1656
1657 // If this is true, accumulate a set of blocklisted entities
1658 // to be drained by consume_blocklist_events.
1659 bool blocklist_events_enabled = false;
1660 std::set<entity_addr_t> blocklist_events;
  // Cached up/acting mapping for one PG, stamped with the osdmap epoch
  // it was computed from (see lookup_pg_mapping for staleness checks).
  struct pg_mapping_t {
    epoch_t epoch = 0;            // osdmap epoch this mapping belongs to
    std::vector<int> up;          // up set
    int up_primary = -1;
    std::vector<int> acting;      // acting set
    int acting_primary = -1;

    pg_mapping_t() {}
    pg_mapping_t(epoch_t epoch, std::vector<int> up, int up_primary,
                 std::vector<int> acting, int acting_primary)
      : epoch(epoch), up(up), up_primary(up_primary),
        acting(acting), acting_primary(acting_primary) {}
  };
1674 ceph::shared_mutex pg_mapping_lock =
1675 ceph::make_shared_mutex("Objecter::pg_mapping_lock");
1676 // pool -> pg mapping
1677 std::map<int64_t, std::vector<pg_mapping_t>> pg_mappings;
1678
1679 // convenient accessors
  // Look up the cached mapping for `pg`. The caller pre-sets
  // pg_mapping->epoch to the epoch it expects; a cached entry from a
  // different epoch is treated as stale. Returns true and fills
  // *pg_mapping on a fresh hit, false otherwise.
  bool lookup_pg_mapping(const pg_t& pg, pg_mapping_t* pg_mapping) {
    std::shared_lock l{pg_mapping_lock};
    auto it = pg_mappings.find(pg.pool());
    if (it == pg_mappings.end())
      return false;
    auto& mapping_array = it->second;
    if (pg.ps() >= mapping_array.size())
      return false;
    if (mapping_array[pg.ps()].epoch != pg_mapping->epoch) // stale
      return false;
    *pg_mapping = mapping_array[pg.ps()];
    return true;
  }
1693 void update_pg_mapping(const pg_t& pg, pg_mapping_t&& pg_mapping) {
1694 std::lock_guard l{pg_mapping_lock};
1695 auto& mapping_array = pg_mappings[pg.pool()];
1696 ceph_assert(pg.ps() < mapping_array.size());
1697 mapping_array[pg.ps()] = std::move(pg_mapping);
1698 }
1699 void prune_pg_mapping(const mempool::osdmap::map<int64_t,pg_pool_t>& pools) {
1700 std::lock_guard l{pg_mapping_lock};
1701 for (auto& pool : pools) {
1702 auto& mapping_array = pg_mappings[pool.first];
1703 size_t pg_num = pool.second.get_pg_num();
1704 if (mapping_array.size() != pg_num) {
1705 // catch both pg_num increasing & decreasing
1706 mapping_array.resize(pg_num);
1707 }
1708 }
1709 for (auto it = pg_mappings.begin(); it != pg_mappings.end(); ) {
1710 if (!pools.count(it->first)) {
1711 // pool is gone
1712 pg_mappings.erase(it++);
1713 continue;
1714 }
1715 it++;
1716 }
1717 }
1718
1719 public:
1720 void maybe_request_map();
1721
1722 void enable_blocklist_events();
1723 private:
1724
1725 void _maybe_request_map();
1726
1727 version_t last_seen_osdmap_version = 0;
1728 version_t last_seen_pgmap_version = 0;
1729
1730 mutable ceph::shared_mutex rwlock =
1731 ceph::make_shared_mutex("Objecter::rwlock");
1732 ceph::timer<ceph::coarse_mono_clock> timer;
1733
1734 PerfCounters* logger = nullptr;
1735
1736 uint64_t tick_event = 0;
1737
1738 void start_tick();
1739 void tick();
1740 void update_crush_location();
1741
1742 class RequestStateHook;
1743
1744 RequestStateHook *m_request_state_hook = nullptr;
1745
1746 public:
1747 /*** track pending operations ***/
1748 // read
1749
1750 struct OSDSession;
1751
  // Addressing/placement state for a single request: what we are
  // targeting (object or explicit pg) and the last mapping we computed
  // for it, so resends can detect when the mapping changed.
  struct op_target_t {
    int flags = 0;

    epoch_t epoch = 0;  ///< latest epoch we calculated the mapping

    object_t base_oid;
    object_locator_t base_oloc;
    object_t target_oid;
    object_locator_t target_oloc;

    ///< true if we are directed at base_pgid, not base_oid
    bool precalc_pgid = false;

    ///< true if we have ever mapped to a valid pool
    bool pool_ever_existed = false;

    ///< explicit pg target, if any
    pg_t base_pgid;

    pg_t pgid; ///< last (raw) pg we mapped to
    spg_t actual_pgid; ///< last (actual) spg_t we mapped to
    unsigned pg_num = 0; ///< last pg_num we mapped to
    unsigned pg_num_mask = 0; ///< last pg_num_mask we mapped to
    unsigned pg_num_pending = 0; ///< last pg_num we mapped to
    std::vector<int> up; ///< set of up osds for last pg we mapped to
    std::vector<int> acting; ///< set of acting osds for last pg we mapped to
    int up_primary = -1; ///< last up_primary we mapped to
    int acting_primary = -1;  ///< last acting_primary we mapped to
    int size = -1; ///< the size of the pool when we were last mapped
    int min_size = -1; ///< the min size of the pool when we were last mapped
    bool sort_bitwise = false; ///< whether the hobject_t sort order is bitwise
    bool recovery_deletes = false; ///< whether the deletes are performed during recovery instead of peering
    uint32_t peering_crush_bucket_count = 0;
    uint32_t peering_crush_bucket_target = 0;
    uint32_t peering_crush_bucket_barrier = 0;
    int32_t peering_crush_mandatory_member = CRUSH_ITEM_NONE;

    bool used_replica = false;
    bool paused = false;

    int osd = -1;      ///< the final target osd, or -1

    epoch_t last_force_resend = 0;

    op_target_t(object_t oid, object_locator_t oloc, int flags)
      : flags(flags),
        base_oid(oid),
        base_oloc(oloc)
      {}

    explicit op_target_t(pg_t pgid)
      : base_oloc(pgid.pool(), pgid.ps()),
        precalc_pgid(true),
        base_pgid(pgid)
      {}

    op_target_t() = default;

    // Build the hobject_t for the current (post-resolution) target.
    hobject_t get_hobj() {
      return hobject_t(target_oid,
                       target_oloc.key,
                       CEPH_NOSNAP,
                       target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
                       target_oloc.pool,
                       target_oloc.nspace);
    }

    // True if the target lies within [begin, end).
    bool contained_by(const hobject_t& begin, const hobject_t& end) {
      hobject_t h = get_hobj();
      int r = cmp(h, begin);
      return r == 0 || (r > 0 && h < end);
    }

    // Writes honor pool-full unless a FULL_TRY/FULL_FORCE override is set.
    bool respects_full() const {
      return
        (flags & (CEPH_OSD_FLAG_WRITE | CEPH_OSD_FLAG_RWORDERED)) &&
        !(flags & (CEPH_OSD_FLAG_FULL_TRY | CEPH_OSD_FLAG_FULL_FORCE));
    }

    void dump(ceph::Formatter *f) const;
  };
1833
  // Adapt a legacy Context* into an asio-style Completion. Returns
  // nullptr when there is nothing to complete.
  // NOTE(review): this overload hands the error_code `e` straight to
  // Context::complete(), while the T overloads below translate with
  // ceph::from_error_code() — confirm this asymmetry is intentional.
  std::unique_ptr<ceph::async::Completion<void(boost::system::error_code)>>
  OpContextVert(Context* c) {
    if (c)
      return ceph::async::Completion<void(boost::system::error_code)>::create(
        service.get_executor(),
        [c = std::unique_ptr<Context>(c)]
        (boost::system::error_code e) mutable {
          c.release()->complete(e);
        });
    else
      return nullptr;
  }

  // As above, but the completion also delivers a value of type T,
  // stored through *p (if p is non-null) before the Context fires.
  template<typename T>
  std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
  OpContextVert(Context* c, T* p) {

    if (c || p)
      return
        ceph::async::Completion<void(boost::system::error_code, T)>::create(
          service.get_executor(),
          [c = std::unique_ptr<Context>(c), p]
          (boost::system::error_code e, T r) mutable {
            if (p)
              *p = std::move(r);
            if (c)
              c.release()->complete(ceph::from_error_code(e));
          });
    else
      return nullptr;
  }

  // Reference flavor: the delivered value is always stored into p.
  template<typename T>
  std::unique_ptr<ceph::async::Completion<void(boost::system::error_code, T)>>
  OpContextVert(Context* c, T& p) {
    if (c)
      return ceph::async::Completion<
        void(boost::system::error_code, T)>::create(
          service.get_executor(),
          [c = std::unique_ptr<Context>(c), &p]
          (boost::system::error_code e, T r) mutable {
            p = std::move(r);
            if (c)
              c.release()->complete(ceph::from_error_code(e));
          });
    else
      return nullptr;
  }
1882
1883 struct Op : public RefCountedObject {
1884 OSDSession *session = nullptr;
1885 int incarnation = 0;
1886
1887 op_target_t target;
1888
1889 ConnectionRef con = nullptr; // for rx buffer only
1890 uint64_t features = CEPH_FEATURES_SUPPORTED_DEFAULT; // explicitly specified op features
1891
1892 osdc_opvec ops;
1893
1894 snapid_t snapid = CEPH_NOSNAP;
1895 SnapContext snapc;
1896 ceph::real_time mtime;
1897
1898 ceph::buffer::list *outbl = nullptr;
1899 boost::container::small_vector<ceph::buffer::list*, osdc_opvec_len> out_bl;
1900 boost::container::small_vector<
1901 fu2::unique_function<void(boost::system::error_code, int,
1902 const ceph::buffer::list& bl) &&>,
1903 osdc_opvec_len> out_handler;
1904 boost::container::small_vector<int*, osdc_opvec_len> out_rval;
1905 boost::container::small_vector<boost::system::error_code*,
1906 osdc_opvec_len> out_ec;
1907
1908 int priority = 0;
1909 using OpSig = void(boost::system::error_code);
1910 using OpComp = ceph::async::Completion<OpSig>;
1911 // Due to an irregularity of cmpxattr, we actualy need the 'int'
1912 // value for onfinish for legacy librados users. As such just
1913 // preserve the Context* in this one case. That way we can have
1914 // our callers just pass in a unique_ptr<OpComp> and not deal with
1915 // our signature in Objecter being different than the exposed
1916 // signature in RADOS.
1917 //
1918 // Add a function for the linger case, where we want better
1919 // semantics than Context, but still need to be under the completion_lock.
1920 std::variant<std::unique_ptr<OpComp>, fu2::unique_function<OpSig>,
1921 Context*> onfinish;
1922 uint64_t ontimeout = 0;
1923
1924 ceph_tid_t tid = 0;
1925 int attempts = 0;
1926
1927 version_t *objver;
1928 epoch_t *reply_epoch = nullptr;
1929
1930 ceph::coarse_mono_time stamp;
1931
1932 epoch_t map_dne_bound = 0;
1933
1934 int budget = -1;
1935
1936 /// true if we should resend this message on failure
1937 bool should_resend = true;
1938
1939 /// true if the throttle budget is get/put on a series of OPs,
1940 /// instead of per OP basis, when this flag is set, the budget is
1941 /// acquired before sending the very first OP of the series and
1942 /// released upon receiving the last OP reply.
1943 bool ctx_budgeted = false;
1944
1945 int *data_offset;
1946
1947 osd_reqid_t reqid; // explicitly setting reqid
1948 ZTracer::Trace trace;
1949
1950 static bool has_completion(decltype(onfinish)& f) {
1951 return std::visit([](auto&& arg) { return bool(arg);}, f);
1952 }
1953 bool has_completion() {
1954 return has_completion(onfinish);
1955 }
1956
1957 static void complete(decltype(onfinish)&& f, boost::system::error_code ec,
1958 int r) {
1959 std::visit([ec, r](auto&& arg) {
1960 if constexpr (std::is_same_v<std::decay_t<decltype(arg)>,
1961 Context*>) {
1962 arg->complete(r);
1963 } else if constexpr (std::is_same_v<std::decay_t<decltype(arg)>,
1964 fu2::unique_function<OpSig>>) {
1965 std::move(arg)(ec);
1966 } else {
1967 arg->defer(std::move(arg), ec);
1968 }
1969 }, std::move(f));
1970 }
1971 void complete(boost::system::error_code ec, int r) {
1972 complete(std::move(onfinish), ec, r);
1973 }
1974
1975 Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
1976 int f, std::unique_ptr<OpComp>&& fin,
1977 version_t *ov, int *offset = nullptr,
1978 ZTracer::Trace *parent_trace = nullptr) :
1979 target(o, ol, f),
1980 ops(std::move(_ops)),
1981 out_bl(ops.size(), nullptr),
1982 out_handler(ops.size()),
1983 out_rval(ops.size(), nullptr),
1984 out_ec(ops.size(), nullptr),
1985 onfinish(std::move(fin)),
1986 objver(ov),
1987 data_offset(offset) {
1988 if (target.base_oloc.key == o)
1989 target.base_oloc.key.clear();
1990 if (parent_trace && parent_trace->valid()) {
1991 trace.init("op", nullptr, parent_trace);
1992 trace.event("start");
1993 }
1994 }
1995
/// Construct an Op that completes via a legacy Context* callback.
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
   int f, Context* fin, version_t *ov, int *offset = nullptr,
   ZTracer::Trace *parent_trace = nullptr) :
  target(o, ol, f),
  ops(std::move(_ops)),
  out_bl(ops.size(), nullptr),
  out_handler(ops.size()),
  out_rval(ops.size(), nullptr),
  out_ec(ops.size(), nullptr),
  onfinish(fin),
  objver(ov),
  data_offset(offset) {
  // A locator key identical to the object name is redundant; clear it.
  if (target.base_oloc.key == o)
    target.base_oloc.key.clear();
  if (parent_trace && parent_trace->valid()) {
    trace.init("op", nullptr, parent_trace);
    trace.event("start");
  }
}
2015
/// Construct an Op that completes via a fu2 unique_function.
Op(const object_t& o, const object_locator_t& ol, osdc_opvec&& _ops,
   int f, fu2::unique_function<OpSig>&& fin, version_t *ov, int *offset = nullptr,
   ZTracer::Trace *parent_trace = nullptr) :
  target(o, ol, f),
  ops(std::move(_ops)),
  out_bl(ops.size(), nullptr),
  out_handler(ops.size()),
  out_rval(ops.size(), nullptr),
  out_ec(ops.size(), nullptr),
  onfinish(std::move(fin)),
  objver(ov),
  data_offset(offset) {
  // A locator key identical to the object name is redundant; clear it.
  if (target.base_oloc.key == o)
    target.base_oloc.key.clear();
  if (parent_trace && parent_trace->valid()) {
    trace.init("op", nullptr, parent_trace);
    trace.event("start");
  }
}
2035
/// Order ops by transaction id (i.e. submission order).
bool operator<(const Op& other) const {
  return tid < other.tid;
}
2039
private:
  // RefCountedObject: destroyed via put(); close the trace span here.
  ~Op() override {
    trace.event("finish");
  }
};
2045
/// Callback for a MonClient::get_version() query issued on behalf of
/// an Op waiting for a newer osdmap (pool-existence check).
struct CB_Op_Map_Latest {
  Objecter *objecter;
  ceph_tid_t tid;   // tid of the op being checked
  CB_Op_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
  void operator()(boost::system::error_code err, version_t latest, version_t);
};
2052
2053 struct CB_Command_Map_Latest {
2054 Objecter *objecter;
2055 uint64_t tid;
2056 CB_Command_Map_Latest(Objecter *o, ceph_tid_t t) : objecter(o), tid(t) {}
2057 void operator()(boost::system::error_code err, version_t latest, version_t);
2058 };
2059
2060 struct C_Stat : public Context {
2061 ceph::buffer::list bl;
2062 uint64_t *psize;
2063 ceph::real_time *pmtime;
2064 Context *fin;
2065 C_Stat(uint64_t *ps, ceph::real_time *pm, Context *c) :
2066 psize(ps), pmtime(pm), fin(c) {}
2067 void finish(int r) override {
2068 using ceph::decode;
2069 if (r >= 0) {
2070 auto p = bl.cbegin();
2071 uint64_t s;
2072 ceph::real_time m;
2073 decode(s, p);
2074 decode(m, p);
2075 if (psize)
2076 *psize = s;
2077 if (pmtime)
2078 *pmtime = m;
2079 }
2080 fin->complete(r);
2081 }
2082 };
2083
2084 struct C_GetAttrs : public Context {
2085 ceph::buffer::list bl;
2086 std::map<std::string,ceph::buffer::list>& attrset;
2087 Context *fin;
2088 C_GetAttrs(std::map<std::string, ceph::buffer::list>& set, Context *c) : attrset(set),
2089 fin(c) {}
2090 void finish(int r) override {
2091 using ceph::decode;
2092 if (r >= 0) {
2093 auto p = bl.cbegin();
2094 decode(attrset, p);
2095 }
2096 fin->complete(r);
2097 }
2098 };
2099
2100
2101 // Pools and statistics
// Cursor/state for enumerating the objects of a pool (nlist).
struct NListContext {
  collection_list_handle_t pos;   // current enumeration position

  // these are for !sortbitwise compat only
  int current_pg = 0;
  int starting_pg_num = 0;
  bool sort_bitwise = false;

  bool at_end_of_pool = false; ///< publicly visible end flag

  int64_t pool_id = -1;
  int pool_snap_seq = 0;
  uint64_t max_entries = 0;       // cap on entries per listing op
  std::string nspace;             // namespace to enumerate

  ceph::buffer::list bl; // raw data read to here
  std::list<librados::ListObjectImpl> list;   // decoded entries

  ceph::buffer::list filter;      // optional server-side filter payload

  // The budget associated with this context, once it is set (>= 0),
  // the budget is not get/released on OP basis, instead the budget
  // is acquired before sending the first OP and released upon receiving
  // the last op reply.
  int ctx_budget = -1;

  /// true once the whole pool has been enumerated
  bool at_end() const {
    return at_end_of_pool;
  }

  /// hash of the current position (for progress reporting)
  uint32_t get_pg_hash_position() const {
    return pos.get_hash();
  }
};
2136
2137 struct C_NList : public Context {
2138 NListContext *list_context;
2139 Context *final_finish;
2140 Objecter *objecter;
2141 epoch_t epoch;
2142 C_NList(NListContext *lc, Context * finish, Objecter *ob) :
2143 list_context(lc), final_finish(finish), objecter(ob), epoch(0) {}
2144 void finish(int r) override {
2145 if (r >= 0) {
2146 objecter->_nlist_reply(list_context, r, final_finish, epoch);
2147 } else {
2148 final_finish->complete(r);
2149 }
2150 }
2151 };
2152
// In-flight request for per-pool statistics from the monitors.
struct PoolStatOp {
  ceph_tid_t tid;                     // mon request tid
  std::vector<std::string> pools;     // pools to query
  using OpSig = void(boost::system::error_code,
                     boost::container::flat_map<std::string, pool_stat_t>,
                     bool);
  using OpComp = ceph::async::Completion<OpSig>;
  std::unique_ptr<OpComp> onfinish;   // (ec, per-pool stats, flag) completion
  std::uint64_t ontimeout;            // timeout event handle -- presumably a timer id; confirm
  ceph::coarse_mono_time last_submit; // time of last (re)send
};
2164
// In-flight cluster statfs request to the monitors.
struct StatfsOp {
  ceph_tid_t tid;                      // mon request tid
  boost::optional<int64_t> data_pool;  // optional pool to scope the stats to
  using OpSig = void(boost::system::error_code,
                     const struct ceph_statfs);
  using OpComp = ceph::async::Completion<OpSig>;

  std::unique_ptr<OpComp> onfinish;    // completion with (ec, statfs)
  uint64_t ontimeout;                  // timeout event handle -- presumably a timer id; confirm

  ceph::coarse_mono_time last_submit;  // time of last (re)send
};
2177
2178 struct PoolOp {
2179 ceph_tid_t tid = 0;
2180 int64_t pool = 0;
2181 std::string name;
2182 using OpSig = void(boost::system::error_code, ceph::buffer::list);
2183 using OpComp = ceph::async::Completion<OpSig>;
2184 std::unique_ptr<OpComp> onfinish;
2185 uint64_t ontimeout = 0;
2186 int pool_op = 0;
2187 int16_t crush_rule = 0;
2188 snapid_t snapid = 0;
2189 ceph::coarse_mono_time last_submit;
2190
2191 PoolOp() {}
2192 };
2193
2194 // -- osd commands --
2195 struct CommandOp : public RefCountedObject {
2196 OSDSession *session = nullptr;
2197 ceph_tid_t tid = 0;
2198 std::vector<std::string> cmd;
2199 ceph::buffer::list inbl;
2200
2201 // target_osd == -1 means target_pg is valid
2202 const int target_osd = -1;
2203 const pg_t target_pg;
2204
2205 op_target_t target;
2206
2207 epoch_t map_dne_bound = 0;
2208 int map_check_error = 0; // error to return if std::map check fails
2209 const char *map_check_error_str = nullptr;
2210
2211 using OpSig = void(boost::system::error_code, std::string,
2212 ceph::buffer::list);
2213 using OpComp = ceph::async::Completion<OpSig>;
2214 std::unique_ptr<OpComp> onfinish;
2215
2216 uint64_t ontimeout = 0;
2217 ceph::coarse_mono_time last_submit;
2218
2219 CommandOp(
2220 int target_osd,
2221 std::vector<string>&& cmd,
2222 ceph::buffer::list&& inbl,
2223 decltype(onfinish)&& onfinish)
2224 : cmd(std::move(cmd)),
2225 inbl(std::move(inbl)),
2226 target_osd(target_osd),
2227 onfinish(std::move(onfinish)) {}
2228
2229 CommandOp(
2230 pg_t pgid,
2231 std::vector<string>&& cmd,
2232 ceph::buffer::list&& inbl,
2233 decltype(onfinish)&& onfinish)
2234 : cmd(std::move(cmd)),
2235 inbl(std::move(inbl)),
2236 target_pg(pgid),
2237 target(pgid),
2238 onfinish(std::move(onfinish)) {}
2239 };
2240
2241 void submit_command(CommandOp *c, ceph_tid_t *ptid);
2242 int _calc_command_target(CommandOp *c,
2243 ceph::shunique_lock<ceph::shared_mutex> &sul);
2244 void _assign_command_session(CommandOp *c,
2245 ceph::shunique_lock<ceph::shared_mutex> &sul);
2246 void _send_command(CommandOp *c);
2247 int command_op_cancel(OSDSession *s, ceph_tid_t tid,
2248 boost::system::error_code ec);
2249 void _finish_command(CommandOp *c, boost::system::error_code ec,
2250 std::string&& rs, ceph::buffer::list&& bl);
2251 void handle_command_reply(MCommandReply *m);
2252
2253 // -- lingering ops --
2254
// State for a long-lived ("linger") operation -- a watch or an
// in-flight notify -- which is re-registered across osdmap changes
// until canceled.
struct LingerOp : public RefCountedObject {
  Objecter *objecter;
  uint64_t linger_id{0};
  op_target_t target{object_t(), object_locator_t(), 0};
  snapid_t snap{CEPH_NOSNAP};
  SnapContext snapc;
  ceph::real_time mtime;

  osdc_opvec ops;
  ceph::buffer::list inbl;
  version_t *pobjver{nullptr};

  bool is_watch{false};   // watch if true, otherwise a notify
  ceph::coarse_mono_time watch_valid_thru; ///< send time for last acked ping
  boost::system::error_code last_error;  ///< error from last failed ping|reconnect, if any
  ceph::shared_mutex watch_lock;

  // queue of pending async operations, with the timestamp of
  // when they were queued.
  std::list<ceph::coarse_mono_time> watch_pending_async;

  // generation of the current registration; passed along with pings
  // so stale acks can be distinguished (see CB_Linger_Ping)
  uint32_t register_gen{0};
  bool registered{false};
  bool canceled{false};
  using OpSig = void(boost::system::error_code, ceph::buffer::list);
  using OpComp = ceph::async::Completion<OpSig>;
  std::unique_ptr<OpComp> on_reg_commit;
  std::unique_ptr<OpComp> on_notify_finish;
  uint64_t notify_id{0};

  // user callback invoked for watch errors / incoming notifies
  fu2::unique_function<void(boost::system::error_code,
                            uint64_t notify_id,
                            uint64_t cookie,
                            uint64_t notifier_id,
                            ceph::buffer::list&& bl)> handle;
  OSDSession *session{nullptr};

  int ctx_budget{-1};        // context-wide throttle budget, -1 if unset
  ceph_tid_t register_tid{0};
  ceph_tid_t ping_tid{0};
  epoch_t map_dne_bound{0};

  /// Record that an async callback was queued.
  void _queued_async() {
    // watch_lock must be locked unique
    watch_pending_async.push_back(ceph::coarse_mono_clock::now());
  }
  /// Record that the oldest queued async callback has completed.
  void finished_async() {
    unique_lock l(watch_lock);
    ceph_assert(!watch_pending_async.empty());
    watch_pending_async.pop_front();
  }

  LingerOp(Objecter *o, uint64_t linger_id);
  const LingerOp& operator=(const LingerOp& r) = delete;
  LingerOp(const LingerOp& o) = delete;

  /// Cookie handed to the OSD: the LingerOp's own address.
  uint64_t get_cookie() {
    return reinterpret_cast<uint64_t>(this);
  }
};
2315
// Completion for the initial registration op of a linger (watch or
// notify); forwards the result and reply payload to _linger_commit().
struct CB_Linger_Commit {
  Objecter *objecter;
  boost::intrusive_ptr<LingerOp> info;  // keeps the LingerOp alive
  ceph::buffer::list outbl; // used for notify only
  CB_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {}
  ~CB_Linger_Commit() = default;

  void operator()(boost::system::error_code ec) {
    objecter->_linger_commit(info.get(), ec, outbl);
  }
};
2327
// Completion for a watch re-registration after a map change or
// connection loss; forwards the result to _linger_reconnect().
struct CB_Linger_Reconnect {
  Objecter *objecter;
  boost::intrusive_ptr<LingerOp> info;  // keeps the LingerOp alive
  CB_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {}
  ~CB_Linger_Reconnect() = default;

  void operator()(boost::system::error_code ec) {
    objecter->_linger_reconnect(info.get(), ec);
    info.reset();   // drop our ref as soon as the callback is done
  }
};
2339
// Completion for a watch ping; carries the send time and the
// registration generation at send time so _linger_ping() can tell
// whether the ack still applies to the current registration.
struct CB_Linger_Ping {
  Objecter *objecter;
  boost::intrusive_ptr<LingerOp> info;  // keeps the LingerOp alive
  ceph::coarse_mono_time sent;          // when the ping was sent
  uint32_t register_gen;                // registration gen at send time
  CB_Linger_Ping(Objecter *o, LingerOp *l, ceph::coarse_mono_time s)
    : objecter(o), info(l), sent(s), register_gen(info->register_gen) {}
  void operator()(boost::system::error_code ec) {
    objecter->_linger_ping(info.get(), ec, sent, register_gen);
    info.reset();   // drop our ref as soon as the callback is done
  }
};
2352
// Callback for a MonClient::get_version() query issued on behalf of
// a LingerOp waiting for a newer osdmap (pool-existence check).
struct CB_Linger_Map_Latest {
  Objecter *objecter;
  uint64_t linger_id;   // id of the linger op being checked
  CB_Linger_Map_Latest(Objecter *o, uint64_t id) : objecter(o), linger_id(id) {}
  void operator()(boost::system::error_code err, version_t latest, version_t);
};
2359
2360 // -- osd sessions --
// A backoff interval an OSD has asked us to observe for part of a PG.
struct OSDBackoff {
  spg_t pgid;             // PG the backoff applies to
  uint64_t id;            // backoff id assigned by the OSD
  hobject_t begin, end;   // object range covered by the backoff
};
2366
2367 struct OSDSession : public RefCountedObject {
2368 // pending ops
2369 std::map<ceph_tid_t,Op*> ops;
2370 std::map<uint64_t, LingerOp*> linger_ops;
2371 std::map<ceph_tid_t,CommandOp*> command_ops;
2372
2373 // backoffs
2374 std::map<spg_t,std::map<hobject_t,OSDBackoff>> backoffs;
2375 std::map<uint64_t,OSDBackoff*> backoffs_by_id;
2376
2377 int osd;
2378 // NB locking two sessions at the same time is only safe because
2379 // it is only done in _recalc_linger_op_target with s and
2380 // linger_op->session, and it holds rwlock for write. We disable
2381 // lockdep (using std::sharedMutex) because lockdep doesn't know
2382 // that.
2383 std::shared_mutex lock;
2384
2385 int incarnation;
2386 ConnectionRef con;
2387 int num_locks;
2388 std::unique_ptr<std::mutex[]> completion_locks;
2389
2390 OSDSession(CephContext *cct, int o) :
2391 osd(o), incarnation(0), con(NULL),
2392 num_locks(cct->_conf->objecter_completion_locks_per_session),
2393 completion_locks(new std::mutex[num_locks]) {}
2394
2395 ~OSDSession() override;
2396
2397 bool is_homeless() { return (osd == -1); }
2398
2399 std::unique_lock<std::mutex> get_lock(object_t& oid);
2400 };
2401 std::map<int,OSDSession*> osd_sessions;
2402
2403 bool osdmap_full_flag() const;
2404 bool osdmap_pool_full(const int64_t pool_id) const;
2405
2406
2407 private:
2408
2409 /**
2410 * Test pg_pool_t::FLAG_FULL on a pool
2411 *
2412 * @return true if the pool exists and has the flag set, or
2413 * the global full flag is set, else false
2414 */
2415 bool _osdmap_pool_full(const int64_t pool_id) const;
2416 bool _osdmap_pool_full(const pg_pool_t &p) const {
2417 return p.has_flag(pg_pool_t::FLAG_FULL) && honor_pool_full;
2418 }
2419 void update_pool_full_map(std::map<int64_t, bool>& pool_full_map);
2420
2421 std::map<uint64_t, LingerOp*> linger_ops;
2422 // we use this just to confirm a cookie is valid before dereferencing the ptr
2423 std::set<LingerOp*> linger_ops_set;
2424
2425 std::map<ceph_tid_t,PoolStatOp*> poolstat_ops;
2426 std::map<ceph_tid_t,StatfsOp*> statfs_ops;
2427 std::map<ceph_tid_t,PoolOp*> pool_ops;
2428 std::atomic<unsigned> num_homeless_ops{0};
2429
2430 OSDSession* homeless_session = new OSDSession(cct, -1);
2431
2432
2433 // ops waiting for an osdmap with a new pool or confirmation that
2434 // the pool does not exist (may be expanded to other uses later)
2435 std::map<uint64_t, LingerOp*> check_latest_map_lingers;
2436 std::map<ceph_tid_t, Op*> check_latest_map_ops;
2437 std::map<ceph_tid_t, CommandOp*> check_latest_map_commands;
2438
2439 std::map<epoch_t,
2440 std::vector<std::pair<std::unique_ptr<OpCompletion>,
2441 boost::system::error_code>>> waiting_for_map;
2442
2443 ceph::timespan mon_timeout;
2444 ceph::timespan osd_timeout;
2445
2446 MOSDOp *_prepare_osd_op(Op *op);
2447 void _send_op(Op *op);
2448 void _send_op_account(Op *op);
2449 void _cancel_linger_op(Op *op);
2450 void _finish_op(Op *op, int r);
2451 static bool is_pg_changed(
2452 int oldprimary,
2453 const std::vector<int>& oldacting,
2454 int newprimary,
2455 const std::vector<int>& newacting,
2456 bool any_change=false);
2457 enum recalc_op_target_result {
2458 RECALC_OP_TARGET_NO_ACTION = 0,
2459 RECALC_OP_TARGET_NEED_RESEND,
2460 RECALC_OP_TARGET_POOL_DNE,
2461 RECALC_OP_TARGET_OSD_DNE,
2462 RECALC_OP_TARGET_OSD_DOWN,
2463 };
2464 bool _osdmap_full_flag() const;
2465 bool _osdmap_has_pool_full() const;
2466 void _prune_snapc(
2467 const mempool::osdmap::map<int64_t, snap_interval_set_t>& new_removed_snaps,
2468 Op *op);
2469
2470 bool target_should_be_paused(op_target_t *op);
2471 int _calc_target(op_target_t *t, Connection *con,
2472 bool any_change = false);
2473 int _map_session(op_target_t *op, OSDSession **s,
2474 ceph::shunique_lock<ceph::shared_mutex>& lc);
2475
2476 void _session_op_assign(OSDSession *s, Op *op);
2477 void _session_op_remove(OSDSession *s, Op *op);
2478 void _session_linger_op_assign(OSDSession *to, LingerOp *op);
2479 void _session_linger_op_remove(OSDSession *from, LingerOp *op);
2480 void _session_command_op_assign(OSDSession *to, CommandOp *op);
2481 void _session_command_op_remove(OSDSession *from, CommandOp *op);
2482
2483 int _assign_op_target_session(Op *op, ceph::shunique_lock<ceph::shared_mutex>& lc,
2484 bool src_session_locked,
2485 bool dst_session_locked);
2486 int _recalc_linger_op_target(LingerOp *op,
2487 ceph::shunique_lock<ceph::shared_mutex>& lc);
2488
2489 void _linger_submit(LingerOp *info,
2490 ceph::shunique_lock<ceph::shared_mutex>& sul);
2491 void _send_linger(LingerOp *info,
2492 ceph::shunique_lock<ceph::shared_mutex>& sul);
2493 void _linger_commit(LingerOp *info, boost::system::error_code ec,
2494 ceph::buffer::list& outbl);
2495 void _linger_reconnect(LingerOp *info, boost::system::error_code ec);
2496 void _send_linger_ping(LingerOp *info);
2497 void _linger_ping(LingerOp *info, boost::system::error_code ec,
2498 ceph::coarse_mono_time sent, uint32_t register_gen);
2499 boost::system::error_code _normalize_watch_error(boost::system::error_code ec);
2500
2501 friend class CB_Objecter_GetVersion;
2502 friend class CB_DoWatchError;
2503 public:
/// Flush the strand used for linger (watch/notify) callbacks: the
/// supplied completion runs after anything already deferred to
/// finish_strand has executed.
template<typename CT>
auto linger_callback_flush(CT&& ct) {
  boost::asio::async_completion<CT, void(void)> init(ct);
  boost::asio::defer(finish_strand, std::move(init.completion_handler));
  return init.result.get();
}
2510
2511 private:
2512 void _check_op_pool_dne(Op *op, std::unique_lock<std::shared_mutex> *sl);
2513 void _send_op_map_check(Op *op);
2514 void _op_cancel_map_check(Op *op);
2515 void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
2516 void _send_linger_map_check(LingerOp *op);
2517 void _linger_cancel_map_check(LingerOp *op);
2518 void _check_command_map_dne(CommandOp *op);
2519 void _send_command_map_check(CommandOp *op);
2520 void _command_cancel_map_check(CommandOp *op);
2521
2522 void _kick_requests(OSDSession *session, std::map<uint64_t, LingerOp *>& lresend);
2523 void _linger_ops_resend(std::map<uint64_t, LingerOp *>& lresend,
2524 std::unique_lock<ceph::shared_mutex>& ul);
2525
2526 int _get_session(int osd, OSDSession **session,
2527 ceph::shunique_lock<ceph::shared_mutex>& sul);
2528 void put_session(OSDSession *s);
2529 void get_session(OSDSession *s);
2530 void _reopen_session(OSDSession *session);
2531 void close_session(OSDSession *session);
2532
2533 void _nlist_reply(NListContext *list_context, int r, Context *final_finish,
2534 epoch_t reply_epoch);
2535
2536 void resend_mon_ops();
2537
2538 /**
2539 * handle a budget for in-flight ops
2540 * budget is taken whenever an op goes into the ops std::map
2541 * and returned whenever an op is removed from the std::map
2542 * If throttle_op needs to throttle it will unlock client_lock.
2543 */
2544 int calc_op_budget(const boost::container::small_vector_base<OSDOp>& ops);
2545 void _throttle_op(Op *op, ceph::shunique_lock<ceph::shared_mutex>& sul,
2546 int op_size = 0);
/// Charge throttle budget for @p op while holding rwlock via @p sul.
/// Records the amount in op->budget and returns it.  In balanced
/// mode this may block (dropping the lock) until budget is free.
int _take_op_budget(Op *op, ceph::shunique_lock<ceph::shared_mutex>& sul) {
  ceph_assert(sul && sul.mutex() == &rwlock);
  int op_budget = calc_op_budget(op->ops);
  if (keep_balanced_budget) {
    _throttle_op(op, sul, op_budget);
  } else { // update take_linger_budget to match this!
    op_throttle_bytes.take(op_budget);
    op_throttle_ops.take(1);
  }
  op->budget = op_budget;
  return op_budget;
}
2559 int take_linger_budget(LingerOp *info);
/// Return @p op_budget bytes (and one op slot) to the throttles.
void put_op_budget_bytes(int op_budget) {
  ceph_assert(op_budget >= 0);
  op_throttle_bytes.put(op_budget);
  op_throttle_ops.put(1);
}
2565 void put_nlist_context_budget(NListContext *list_context);
2566 Throttle op_throttle_bytes{cct, "objecter_bytes",
2567 static_cast<int64_t>(
2568 cct->_conf->objecter_inflight_op_bytes)};
2569 Throttle op_throttle_ops{cct, "objecter_ops",
2570 static_cast<int64_t>(
2571 cct->_conf->objecter_inflight_ops)};
2572 public:
2573 Objecter(CephContext *cct, Messenger *m, MonClient *mc,
2574 boost::asio::io_context& service);
2575 ~Objecter() override;
2576
2577 void init();
2578 void start(const OSDMap *o = nullptr);
2579 void shutdown();
2580
2581 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2582 // whatever functionality you want to use the OSDMap in a lambda like:
2583 //
2584 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2585 //
2586 // or
2587 //
2588 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2589 //
2590 // Do not call into something that will try to lock the OSDMap from
2591 // here or you will have great woe and misery.
2592
/// Invoke @p cb with the current OSDMap while holding rwlock shared.
template<typename Callback, typename...Args>
decltype(auto) with_osdmap(Callback&& cb, Args&&... args) {
  shared_lock l(rwlock);
  return std::forward<Callback>(cb)(*osdmap, std::forward<Args>(args)...);
}
2598
2599
2600 /**
2601 * Tell the objecter to throttle outgoing ops according to its
2602 * budget (in _conf). If you do this, ops can block, in
2603 * which case it will unlock client_lock and sleep until
2604 * incoming messages reduce the used budget low enough for
2605 * the ops to continue going; then it will lock client_lock again.
2606 */
/// Enable/disable blocking budget throttling (see comment above).
void set_balanced_budget() { keep_balanced_budget = true; }
void unset_balanced_budget() { keep_balanced_budget = false; }

/// Enable/disable honoring pool/cluster FULL flags.
void set_honor_pool_full() { honor_pool_full = true; }
void unset_honor_pool_full() { honor_pool_full = false; }
2612
2613 void _scan_requests(
2614 OSDSession *s,
2615 bool skipped_map,
2616 bool cluster_full,
2617 std::map<int64_t, bool> *pool_full_map,
2618 std::map<ceph_tid_t, Op*>& need_resend,
2619 std::list<LingerOp*>& need_resend_linger,
2620 std::map<ceph_tid_t, CommandOp*>& need_resend_command,
2621 ceph::shunique_lock<ceph::shared_mutex>& sul);
2622
2623 int64_t get_object_hash_position(int64_t pool, const std::string& key,
2624 const std::string& ns);
2625 int64_t get_object_pg_hash_position(int64_t pool, const std::string& key,
2626 const std::string& ns);
2627
2628 // messages
2629 public:
2630 bool ms_dispatch(Message *m) override;
/// We participate in fast dispatch for some message types.
bool ms_can_fast_dispatch_any() const override {
  return true;
}
2634 bool ms_can_fast_dispatch(const Message *m) const override {
2635 switch (m->get_type()) {
2636 case CEPH_MSG_OSD_OPREPLY:
2637 case CEPH_MSG_WATCH_NOTIFY:
2638 return true;
2639 default:
2640 return false;
2641 }
2642 }
/// Fast-dispatch entry: same handling as ms_dispatch; drop the
/// message ref if it was not consumed.
void ms_fast_dispatch(Message *m) override {
  if (!ms_dispatch(m)) {
    m->put();
  }
}
2648
2649 void handle_osd_op_reply(class MOSDOpReply *m);
2650 void handle_osd_backoff(class MOSDBackoff *m);
2651 void handle_watch_notify(class MWatchNotify *m);
2652 void handle_osd_map(class MOSDMap *m);
2653 void wait_for_osd_map(epoch_t e=0);
2654
/// Asynchronously wait until we have any osdmap at all.  Completes
/// immediately (via post) if an epoch is already known; otherwise the
/// handler is queued on waiting_for_map[0].
template<typename CompletionToken>
auto wait_for_osd_map(CompletionToken&& token) {
  boost::asio::async_completion<CompletionToken, void()> init(token);
  unique_lock l(rwlock);
  if (osdmap->get_epoch()) {
    l.unlock();
    boost::asio::post(std::move(init.completion_handler));
  } else {
    // Wrap the void() handler in an OpCompletion that discards the
    // error code waiting_for_map entries are invoked with.
    waiting_for_map[0].emplace_back(
      OpCompletion::create(
        service.get_executor(),
        [c = std::move(init.completion_handler)]
        (boost::system::error_code) mutable {
          std::move(c)();
        }), boost::system::error_code{});
    l.unlock();
  }
  return init.result.get();
}
2674
2675
2676 /**
2677 * Get std::list of entities blocklisted since this was last called,
2678 * and reset the std::list.
2679 *
2680 * Uses a std::set because typical use case is to compare some
2681 * other std::list of clients to see which overlap with the blocklisted
2682 * addrs.
2683 *
2684 */
2685 void consume_blocklist_events(std::set<entity_addr_t> *events);
2686
2687 int pool_snap_by_name(int64_t poolid,
2688 const char *snap_name,
2689 snapid_t *snap) const;
2690 int pool_snap_get_info(int64_t poolid, snapid_t snap,
2691 pool_snap_info_t *info) const;
2692 int pool_snap_list(int64_t poolid, std::vector<uint64_t> *snaps);
2693 private:
2694
2695 void emit_blocklist_events(const OSDMap::Incremental &inc);
2696 void emit_blocklist_events(const OSDMap &old_osd_map,
2697 const OSDMap &new_osd_map);
2698
2699 // low-level
2700 void _op_submit(Op *op, ceph::shunique_lock<ceph::shared_mutex>& lc,
2701 ceph_tid_t *ptid);
2702 void _op_submit_with_budget(Op *op,
2703 ceph::shunique_lock<ceph::shared_mutex>& lc,
2704 ceph_tid_t *ptid,
2705 int *ctx_budget = NULL);
2706 // public interface
2707 public:
2708 void op_submit(Op *op, ceph_tid_t *ptid = NULL, int *ctx_budget = NULL);
2709 bool is_active() {
2710 std::shared_lock l(rwlock);
2711 return !((!inflight_ops) && linger_ops.empty() &&
2712 poolstat_ops.empty() && statfs_ops.empty());
2713 }
2714
2715 /**
2716 * Output in-flight requests
2717 */
2718 void _dump_active(OSDSession *s);
2719 void _dump_active();
2720 void dump_active();
2721 void dump_requests(ceph::Formatter *fmt);
2722 void _dump_ops(const OSDSession *s, ceph::Formatter *fmt);
2723 void dump_ops(ceph::Formatter *fmt);
2724 void _dump_linger_ops(const OSDSession *s, ceph::Formatter *fmt);
2725 void dump_linger_ops(ceph::Formatter *fmt);
2726 void _dump_command_ops(const OSDSession *s, ceph::Formatter *fmt);
2727 void dump_command_ops(ceph::Formatter *fmt);
2728 void dump_pool_ops(ceph::Formatter *fmt) const;
2729 void dump_pool_stat_ops(ceph::Formatter *fmt) const;
2730 void dump_statfs_ops(ceph::Formatter *fmt) const;
2731
/// Accessors for the client incarnation number (client_inc).
int get_client_incarnation() const { return client_inc; }
void set_client_incarnation(int inc) { client_inc = inc; }
2734
2735 bool have_map(epoch_t epoch);
2736
/// Completion for MonClient::get_version("osdmap"): retries on
/// EAGAIN, propagates any other error, otherwise hands the epoch
/// range to _get_latest_version() under the write lock.
struct CB_Objecter_GetVersion {
  Objecter *objecter;
  std::unique_ptr<OpCompletion> fin;   // user completion to fire

  CB_Objecter_GetVersion(Objecter *o, std::unique_ptr<OpCompletion> c)
    : objecter(o), fin(std::move(c)) {}
  void operator()(boost::system::error_code ec, version_t newest,
                  version_t oldest) {
    if (ec == boost::system::errc::resource_unavailable_try_again) {
      // try again as instructed
      objecter->_wait_for_latest_osdmap(std::move(*this));
    } else if (ec) {
      ceph::async::post(std::move(fin), ec);
    } else {
      // _get_latest_version takes ownership of the lock.
      auto l = std::unique_lock(objecter->rwlock);
      objecter->_get_latest_version(oldest, newest, std::move(fin),
                                    std::move(l));
    }
  }
};
2757
/// Wait until the local osdmap epoch is at least @p epoch, then fire
/// the completion.  Completes immediately when the map is already new
/// enough.
/// NOTE(review): osdmap->get_epoch() is read without rwlock here --
/// presumably a benign race with map updates; confirm.
template<typename CompletionToken>
auto wait_for_map(epoch_t epoch, CompletionToken&& token) {
  boost::asio::async_completion<CompletionToken, OpSignature> init(token);

  if (osdmap->get_epoch() >= epoch) {
    boost::asio::post(service,
                      ceph::async::bind_handler(
                        std::move(init.completion_handler),
                        boost::system::error_code()));
  } else {
    monc->get_version("osdmap",
                      CB_Objecter_GetVersion(
                        this,
                        OpCompletion::create(service.get_executor(),
                                             std::move(init.completion_handler))));
  }
  return init.result.get();
}
2776
2777 void _wait_for_new_map(std::unique_ptr<OpCompletion>, epoch_t epoch,
2778 boost::system::error_code = {});
2779
2780 private:
/// Kick off (or retry) a mon "osdmap" version query feeding @p c.
void _wait_for_latest_osdmap(CB_Objecter_GetVersion&& c) {
  monc->get_version("osdmap", std::move(c));
}
2784
2785 public:
2786
/// Ask the monitor for the newest osdmap epoch and complete once we
/// have it; asio completion-token flavor.
template<typename CompletionToken>
auto wait_for_latest_osdmap(CompletionToken&& token) {
  boost::asio::async_completion<CompletionToken, OpSignature> init(token);

  monc->get_version("osdmap",
                    CB_Objecter_GetVersion(
                      this,
                      OpCompletion::create(service.get_executor(),
                                           std::move(init.completion_handler))));
  return init.result.get();
}

/// Same, taking a pre-built OpCompletion.
void wait_for_latest_osdmap(std::unique_ptr<OpCompletion> c) {
  monc->get_version("osdmap",
                    CB_Objecter_GetVersion(this, std::move(c)));
}
2803
/// Complete once our osdmap reflects the mon-reported epoch range
/// [oldest, newest]; see _get_latest_version for the details.
template<typename CompletionToken>
auto get_latest_version(epoch_t oldest, epoch_t newest,
                        CompletionToken&& token) {
  boost::asio::async_completion<CompletionToken, OpSignature> init(token);
  {
    // _get_latest_version takes ownership of the write lock.
    std::unique_lock wl(rwlock);
    _get_latest_version(oldest, newest,
                        OpCompletion::create(
                          service.get_executor(),
                          std::move(init.completion_handler)),
                        std::move(wl));
  }
  return init.result.get();
}
2818
2819 void _get_latest_version(epoch_t oldest, epoch_t neweset,
2820 std::unique_ptr<OpCompletion> fin,
2821 std::unique_lock<ceph::shared_mutex>&& ul);
2822
/** Get the current set of global op flags */
int get_global_op_flags() const { return global_op_flags; }
/** Add a flag to the global op flags; the single update is atomic
 *  (fetch_or) but the set as a whole is not transactionally managed */
void add_global_op_flags(int flag) {
  global_op_flags.fetch_or(flag);
}
/** Clear the passed flags from the global op flag set */
void clear_global_op_flag(int flags) {
  global_op_flags.fetch_and(~flags);
}
2833
2834 /// cancel an in-progress request with the given return code
2835 private:
2836 int op_cancel(OSDSession *s, ceph_tid_t tid, int r);
2837 int _op_cancel(ceph_tid_t tid, int r);
2838 public:
2839 int op_cancel(ceph_tid_t tid, int r);
2840 int op_cancel(const std::vector<ceph_tid_t>& tidls, int r);
2841
2842 /**
2843 * Any write op which is in progress at the start of this call shall no
2844 * longer be in progress when this call ends. Operations started after the
2845 * start of this call may still be in progress when this call ends.
2846 *
2847 * @return the latest possible epoch in which a cancelled op could have
2848 * existed, or -1 if nothing was cancelled.
2849 */
2850 epoch_t op_cancel_writes(int r, int64_t pool=-1);
2851
2852 // commands
/// Send a command to a specific OSD (osd must be >= 0).
void osd_command(int osd, std::vector<std::string> cmd,
                 ceph::buffer::list inbl, ceph_tid_t *ptid,
                 decltype(CommandOp::onfinish)&& onfinish) {
  ceph_assert(osd >= 0);
  auto c = new CommandOp(
    osd,
    std::move(cmd),
    std::move(inbl),
    std::move(onfinish));
  submit_command(c, ptid);
}
/// Completion-token flavor of osd_command().
template<typename CompletionToken>
auto osd_command(int osd, std::vector<std::string> cmd,
                 ceph::buffer::list inbl, ceph_tid_t *ptid,
                 CompletionToken&& token) {
  boost::asio::async_completion<CompletionToken,
                                CommandOp::OpSig> init(token);
  osd_command(osd, std::move(cmd), std::move(inbl), ptid,
              CommandOp::OpComp::create(service.get_executor(),
                                        std::move(init.completion_handler)));
  return init.result.get();
}
2875
/// Send a command to whichever OSD currently serves @p pgid.
void pg_command(pg_t pgid, std::vector<std::string> cmd,
                ceph::buffer::list inbl, ceph_tid_t *ptid,
                decltype(CommandOp::onfinish)&& onfinish) {
  auto *c = new CommandOp(
    pgid,
    std::move(cmd),
    std::move(inbl),
    std::move(onfinish));
  submit_command(c, ptid);
}

/// Completion-token flavor of pg_command().
template<typename CompletionToken>
auto pg_command(pg_t pgid, std::vector<std::string> cmd,
                ceph::buffer::list inbl, ceph_tid_t *ptid,
                CompletionToken&& token) {
  boost::asio::async_completion<CompletionToken,
                                CommandOp::OpSig> init(token);
  pg_command(pgid, std::move(cmd), std::move(inbl), ptid,
             CommandOp::OpComp::create(service.get_executor(),
                                       std::move(init.completion_handler)));
  return init.result.get();
}
2898
2899 // mid-level helpers
2900 Op *prepare_mutate_op(
2901 const object_t& oid, const object_locator_t& oloc,
2902 ObjectOperation& op, const SnapContext& snapc,
2903 ceph::real_time mtime, int flags,
2904 Context *oncommit, version_t *objver = NULL,
2905 osd_reqid_t reqid = osd_reqid_t(),
2906 ZTracer::Trace *parent_trace = nullptr) {
2907 Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
2908 CEPH_OSD_FLAG_WRITE, oncommit, objver,
2909 nullptr, parent_trace);
2910 o->priority = op.priority;
2911 o->mtime = mtime;
2912 o->snapc = snapc;
2913 o->out_rval.swap(op.out_rval);
2914 o->out_bl.swap(op.out_bl);
2915 o->out_handler.swap(op.out_handler);
2916 o->out_ec.swap(op.out_ec);
2917 o->reqid = reqid;
2918 op.clear();
2919 return o;
2920 }
/// Prepare and submit a write op; returns the assigned tid.
ceph_tid_t mutate(
  const object_t& oid, const object_locator_t& oloc,
  ObjectOperation& op, const SnapContext& snapc,
  ceph::real_time mtime, int flags,
  Context *oncommit, version_t *objver = NULL,
  osd_reqid_t reqid = osd_reqid_t()) {
  Op *o = prepare_mutate_op(oid, oloc, op, snapc, mtime, flags,
                            oncommit, objver, reqid);
  ceph_tid_t tid;
  op_submit(o, &tid);
  return tid;
}
2933
/// Submit a write op completing via an OpComp; mirrors
/// prepare_mutate_op() but submits immediately without reporting a
/// tid.  @p op is consumed and cleared.
void mutate(const object_t& oid, const object_locator_t& oloc,
            ObjectOperation&& op, const SnapContext& snapc,
            ceph::real_time mtime, int flags,
            std::unique_ptr<Op::OpComp>&& oncommit,
            version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t(),
            ZTracer::Trace *parent_trace = nullptr) {
  Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
                 CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
                 nullptr, parent_trace);
  o->priority = op.priority;
  o->mtime = mtime;
  o->snapc = snapc;
  o->out_bl.swap(op.out_bl);
  o->out_handler.swap(op.out_handler);
  o->out_rval.swap(op.out_rval);
  o->out_ec.swap(op.out_ec);
  o->reqid = reqid;
  op.clear();
  op_submit(o);
}
2954
2955 Op *prepare_read_op(
2956 const object_t& oid, const object_locator_t& oloc,
2957 ObjectOperation& op,
2958 snapid_t snapid, ceph::buffer::list *pbl, int flags,
2959 Context *onack, version_t *objver = NULL,
2960 int *data_offset = NULL,
2961 uint64_t features = 0,
2962 ZTracer::Trace *parent_trace = nullptr) {
2963 Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
2964 CEPH_OSD_FLAG_READ, onack, objver,
2965 data_offset, parent_trace);
2966 o->priority = op.priority;
2967 o->snapid = snapid;
2968 o->outbl = pbl;
2969 if (!o->outbl && op.size() == 1 && op.out_bl[0] && op.out_bl[0]->length())
2970 o->outbl = op.out_bl[0];
2971 o->out_bl.swap(op.out_bl);
2972 o->out_handler.swap(op.out_handler);
2973 o->out_rval.swap(op.out_rval);
2974 o->out_ec.swap(op.out_ec);
2975 op.clear();
2976 return o;
2977 }
2978 ceph_tid_t read(
2979 const object_t& oid, const object_locator_t& oloc,
2980 ObjectOperation& op,
2981 snapid_t snapid, ceph::buffer::list *pbl, int flags,
2982 Context *onack, version_t *objver = NULL,
2983 int *data_offset = NULL,
2984 uint64_t features = 0) {
2985 Op *o = prepare_read_op(oid, oloc, op, snapid, pbl, flags, onack, objver,
2986 data_offset);
2987 if (features)
2988 o->features = features;
2989 ceph_tid_t tid;
2990 op_submit(o, &tid);
2991 return tid;
2992 }
2993
  // Completion-based variant of read(): builds and submits a READ Op,
  // signalling the supplied OpComp on reply. As with prepare_read_op(),
  // a single sub-op with its own non-empty output buffer is promoted to
  // the whole-op output buffer when pbl is null. 'op' is consumed.
  void read(const object_t& oid, const object_locator_t& oloc,
            ObjectOperation&& op, snapid_t snapid, ceph::buffer::list *pbl,
            int flags, std::unique_ptr<Op::OpComp>&& onack,
            version_t *objver = nullptr, int *data_offset = nullptr,
            uint64_t features = 0, ZTracer::Trace *parent_trace = nullptr) {
    Op *o = new Op(oid, oloc, std::move(op.ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, std::move(onack), objver,
                   data_offset, parent_trace);
    o->priority = op.priority;
    o->snapid = snapid;
    o->outbl = pbl;
    // XXX
    if (!o->outbl && op.size() == 1 && op.out_bl[0] && op.out_bl[0]->length()) {
      o->outbl = op.out_bl[0];
    }
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    o->out_ec.swap(op.out_ec);
    if (features)
      o->features = features;
    op.clear();
    op_submit(o);
  }
3018
3019
  // Build (but do not submit) a READ Op addressed to a whole PG rather
  // than an object: the target pgid is precalculated from 'hash' and the
  // pool, and overlay redirection is bypassed. Used for object listing /
  // enumeration. When ctx_budget is non-null, throttle budget accounting
  // is left to the listing context instead of per-op budgeting.
  Op *prepare_pg_read_op(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, ceph::buffer::list *pbl, int flags,
    Context *onack, epoch_t *reply_epoch,
    int *ctx_budget) {
    Op *o = new Op(object_t(), oloc,
                   std::move(op.ops),
                   flags | global_op_flags | CEPH_OSD_FLAG_READ |
                   CEPH_OSD_FLAG_IGNORE_OVERLAY,
                   onack, NULL);
    o->target.precalc_pgid = true;
    o->target.base_pgid = pg_t(hash, oloc.pool);
    o->priority = op.priority;
    o->snapid = CEPH_NOSNAP;
    o->outbl = pbl;
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    o->out_ec.swap(op.out_ec);
    o->reply_epoch = reply_epoch;
    if (ctx_budget) {
      // budget is tracked by listing context
      o->ctx_budgeted = true;
    }
    op.clear();
    return o;
  }
3047 ceph_tid_t pg_read(
3048 uint32_t hash, object_locator_t oloc,
3049 ObjectOperation& op, ceph::buffer::list *pbl, int flags,
3050 Context *onack, epoch_t *reply_epoch,
3051 int *ctx_budget) {
3052 Op *o = prepare_pg_read_op(hash, oloc, op, pbl, flags,
3053 onack, reply_epoch, ctx_budget);
3054 ceph_tid_t tid;
3055 op_submit(o, &tid, ctx_budget);
3056 return tid;
3057 }
3058
  // Completion-based variant of pg_read(): same PG-addressed READ as
  // prepare_pg_read_op(), but signals an OpComp instead of a Context.
  // Returns the tid of the submitted op.
  ceph_tid_t pg_read(
    uint32_t hash, object_locator_t oloc,
    ObjectOperation& op, ceph::buffer::list *pbl, int flags,
    std::unique_ptr<Op::OpComp>&& onack, epoch_t *reply_epoch, int *ctx_budget) {
    ceph_tid_t tid;
    Op *o = new Op(object_t(), oloc,
                   std::move(op.ops),
                   flags | global_op_flags | CEPH_OSD_FLAG_READ |
                   CEPH_OSD_FLAG_IGNORE_OVERLAY,
                   std::move(onack), nullptr);
    o->target.precalc_pgid = true;
    o->target.base_pgid = pg_t(hash, oloc.pool);
    o->priority = op.priority;
    o->snapid = CEPH_NOSNAP;
    o->outbl = pbl;
    o->out_bl.swap(op.out_bl);
    o->out_handler.swap(op.out_handler);
    o->out_rval.swap(op.out_rval);
    o->out_ec.swap(op.out_ec);
    o->reply_epoch = reply_epoch;
    if (ctx_budget) {
      // budget is tracked by listing context
      o->ctx_budgeted = true;
    }
    op_submit(o, &tid, ctx_budget);
    op.clear();
    return tid;
  }
3087
3088 // caller owns a ref
3089 LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
3090 int flags);
3091 ceph_tid_t linger_watch(LingerOp *info,
3092 ObjectOperation& op,
3093 const SnapContext& snapc, ceph::real_time mtime,
3094 ceph::buffer::list& inbl,
3095 decltype(info->on_reg_commit)&& oncommit,
3096 version_t *objver);
  // Context-based convenience overload: adapts 'onfinish' into the
  // completion type expected by the primary linger_watch() and forwards.
  ceph_tid_t linger_watch(LingerOp *info,
                          ObjectOperation& op,
                          const SnapContext& snapc, ceph::real_time mtime,
                          ceph::buffer::list& inbl,
                          Context* onfinish,
                          version_t *objver) {
    return linger_watch(info, op, snapc, mtime, inbl,
                        OpContextVert<ceph::buffer::list>(onfinish, nullptr), objver);
  }
3106 ceph_tid_t linger_notify(LingerOp *info,
3107 ObjectOperation& op,
3108 snapid_t snap, ceph::buffer::list& inbl,
3109 decltype(LingerOp::on_reg_commit)&& onfinish,
3110 version_t *objver);
  // Context-based convenience overload: adapts 'onack' (with optional
  // output buffer 'poutbl') into the completion type expected by the
  // primary linger_notify() and forwards.
  ceph_tid_t linger_notify(LingerOp *info,
                           ObjectOperation& op,
                           snapid_t snap, ceph::buffer::list& inbl,
                           ceph::buffer::list *poutbl,
                           Context* onack,
                           version_t *objver) {
    return linger_notify(info, op, snap, inbl,
                         OpContextVert(onack, poutbl),
                         objver);
  }
3121 tl::expected<ceph::timespan,
3122 boost::system::error_code> linger_check(LingerOp *info);
3123 void linger_cancel(LingerOp *info); // releases a reference
3124 void _linger_cancel(LingerOp *info);
3125
3126 void _do_watch_notify(boost::intrusive_ptr<LingerOp> info,
3127 boost::intrusive_ptr<MWatchNotify> m);
3128
3129 /**
3130 * set up initial ops in the op std::vector, and allocate a final op slot.
3131 *
3132 * The caller is responsible for filling in the final ops_count ops.
3133 *
3134 * @param ops op std::vector
3135 * @param ops_count number of final ops the caller will fill in
3136 * @param extra_ops pointer to [array of] initial op[s]
3137 * @return index of final op (for caller to fill in)
3138 */
3139 int init_ops(boost::container::small_vector_base<OSDOp>& ops, int ops_count,
3140 ObjectOperation *extra_ops) {
3141 int i;
3142 int extra = 0;
3143
3144 if (extra_ops)
3145 extra = extra_ops->ops.size();
3146
3147 ops.resize(ops_count + extra);
3148
3149 for (i=0; i<extra; i++) {
3150 ops[i] = extra_ops->ops[i];
3151 }
3152
3153 return i;
3154 }
3155
3156
3157 // high-level helpers
  // Build (but do not submit) a STAT read. The reply is decoded by a
  // C_Stat wrapper, which fills *psize / *pmtime before chaining to
  // 'onfinish'; the op's output buffer is C_Stat's own bl.
  Op *prepare_stat_op(
    const object_t& oid, const object_locator_t& oloc,
    snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_STAT;
    C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;
    return o;
  }
3173 ceph_tid_t stat(
3174 const object_t& oid, const object_locator_t& oloc,
3175 snapid_t snap, uint64_t *psize, ceph::real_time *pmtime,
3176 int flags, Context *onfinish, version_t *objver = NULL,
3177 ObjectOperation *extra_ops = NULL) {
3178 Op *o = prepare_stat_op(oid, oloc, snap, psize, pmtime, flags,
3179 onfinish, objver, extra_ops);
3180 ceph_tid_t tid;
3181 op_submit(o, &tid);
3182 return tid;
3183 }
3184
  // Build (but do not submit) an extent READ of [off, off+len) with no
  // truncation hints; data is delivered into *pbl.
  Op *prepare_read_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
    int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver,
                   nullptr, parent_trace);
    o->snapid = snap;
    o->outbl = pbl;
    return o;
  }
3206 ceph_tid_t read(
3207 const object_t& oid, const object_locator_t& oloc,
3208 uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
3209 int flags, Context *onfinish, version_t *objver = NULL,
3210 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
3211 Op *o = prepare_read_op(oid, oloc, off, len, snap, pbl, flags,
3212 onfinish, objver, extra_ops, op_flags);
3213 ceph_tid_t tid;
3214 op_submit(o, &tid);
3215 return tid;
3216 }
3217
  // Build (but do not submit) a CMPEXT op: compare the on-OSD extent at
  // 'off' (length = cmp_bl.length()) against cmp_bl's contents.
  Op *prepare_cmpext_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, ceph::buffer::list &cmp_bl,
    snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CMPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = cmp_bl.length();
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = cmp_bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    return o;
  }
3237
3238 ceph_tid_t cmpext(
3239 const object_t& oid, const object_locator_t& oloc,
3240 uint64_t off, ceph::buffer::list &cmp_bl,
3241 snapid_t snap, int flags, Context *onfinish, version_t *objver = NULL,
3242 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
3243 Op *o = prepare_cmpext_op(oid, oloc, off, cmp_bl, snap,
3244 flags, onfinish, objver, extra_ops, op_flags);
3245 ceph_tid_t tid;
3246 op_submit(o, &tid);
3247 return tid;
3248 }
3249
  // Extent READ carrying explicit truncate_size/truncate_seq hints
  // (used by the file layer so in-flight reads interact correctly with
  // concurrent truncates). Builds, submits, and returns the tid.
  ceph_tid_t read_trunc(const object_t& oid, const object_locator_t& oloc,
                        uint64_t off, uint64_t len, snapid_t snap,
                        ceph::buffer::list *pbl, int flags, uint64_t trunc_size,
                        __u32 trunc_seq, Context *onfinish,
                        version_t *objver = NULL,
                        ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_READ;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // MAPEXT read: ask the OSD for the allocated-extent map of [off,
  // off+len) rather than the data itself. Builds, submits, returns tid.
  ceph_tid_t mapext(const object_t& oid, const object_locator_t& oloc,
                    uint64_t off, uint64_t len, snapid_t snap, ceph::buffer::list *pbl,
                    int flags, Context *onfinish, version_t *objver = NULL,
                    ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_MAPEXT;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // GETXATTR read: fetch the value of a single xattr 'name' into *pbl.
  // A null 'name' is tolerated (encoded as a zero-length name).
  ceph_tid_t getxattr(const object_t& oid, const object_locator_t& oloc,
                      const char *name, snapid_t snap, ceph::buffer::list *pbl, int flags,
                      Context *onfinish,
                      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, onfinish, objver);
    o->snapid = snap;
    o->outbl = pbl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
3310
  // GETXATTRS read: fetch all xattrs; the raw reply is decoded into
  // 'attrset' by a C_GetAttrs wrapper before chaining to 'onfinish'.
  ceph_tid_t getxattrs(const object_t& oid, const object_locator_t& oloc,
                       snapid_t snap, std::map<std::string,ceph::buffer::list>& attrset,
                       int flags, Context *onfinish, version_t *objver = NULL,
                       ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
    C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_READ, fin, objver);
    o->snapid = snap;
    o->outbl = &fin->bl;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
3327
  // Read the entire object (offset 0, length 0 == "whole object").
  // NOTE(review): global_op_flags | CEPH_OSD_FLAG_READ are ORed in here
  // AND again inside read(); the double-OR is redundant but harmless.
  ceph_tid_t read_full(const object_t& oid, const object_locator_t& oloc,
                       snapid_t snap, ceph::buffer::list *pbl, int flags,
                       Context *onfinish, version_t *objver = NULL,
                       ObjectOperation *extra_ops = NULL) {
    return read(oid, oloc, 0, 0, snap, pbl, flags | global_op_flags |
                CEPH_OSD_FLAG_READ, onfinish, objver, extra_ops);
  }
3335
3336
3337 // writes
  // Low-level write: submit a pre-built op vector as a WRITE with the
  // given mtime/snap context. 'ops' is moved from. Returns the tid.
  ceph_tid_t _modify(const object_t& oid, const object_locator_t& oloc,
                     osdc_opvec& ops,
                     ceph::real_time mtime,
                     const SnapContext& snapc, int flags,
                     Context *oncommit,
                     version_t *objver = NULL) {
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) an extent WRITE of 'bl' at [off, off+len)
  // with no truncation hints.
  Op *prepare_write_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t off, uint64_t len, const SnapContext& snapc,
    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0,
    ZTracer::Trace *parent_trace = nullptr) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, std::move(oncommit), objver,
                   nullptr, parent_trace);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
3375 ceph_tid_t write(
3376 const object_t& oid, const object_locator_t& oloc,
3377 uint64_t off, uint64_t len, const SnapContext& snapc,
3378 const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
3379 Context *oncommit, version_t *objver = NULL,
3380 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
3381 Op *o = prepare_write_op(oid, oloc, off, len, snapc, bl, mtime, flags,
3382 oncommit, objver, extra_ops, op_flags);
3383 ceph_tid_t tid;
3384 op_submit(o, &tid);
3385 return tid;
3386 }
  // Build (but do not submit) an APPEND of 'bl' (length 'len'); the OSD
  // chooses the offset, so extent.offset is fixed at 0.
  Op *prepare_append_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t len, const SnapContext& snapc,
    const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_APPEND;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = 0;
    ops[i].op.extent.truncate_seq = 0;
    ops[i].indata = bl;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
3408 ceph_tid_t append(
3409 const object_t& oid, const object_locator_t& oloc,
3410 uint64_t len, const SnapContext& snapc,
3411 const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
3412 Context *oncommit,
3413 version_t *objver = NULL,
3414 ObjectOperation *extra_ops = NULL) {
3415 Op *o = prepare_append_op(oid, oloc, len, snapc, bl, mtime, flags,
3416 oncommit, objver, extra_ops);
3417 ceph_tid_t tid;
3418 op_submit(o, &tid);
3419 return tid;
3420 }
  // Extent WRITE carrying explicit truncate_size/truncate_seq hints
  // (file-layer counterpart of read_trunc). Builds, submits, returns tid.
  ceph_tid_t write_trunc(const object_t& oid, const object_locator_t& oloc,
                         uint64_t off, uint64_t len, const SnapContext& snapc,
                         const ceph::buffer::list &bl, ceph::real_time mtime, int flags,
                         uint64_t trunc_size, __u32 trunc_seq,
                         Context *oncommit,
                         version_t *objver = NULL,
                         ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITE;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) a WRITEFULL: atomically replace the whole
  // object's contents with 'bl'.
  Op *prepare_write_full_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, const ceph::buffer::list &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITEFULL;
    ops[i].op.extent.offset = 0;
    ops[i].op.extent.length = bl.length();
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
3464 ceph_tid_t write_full(
3465 const object_t& oid, const object_locator_t& oloc,
3466 const SnapContext& snapc, const ceph::buffer::list &bl,
3467 ceph::real_time mtime, int flags,
3468 Context *oncommit, version_t *objver = NULL,
3469 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
3470 Op *o = prepare_write_full_op(oid, oloc, snapc, bl, mtime, flags,
3471 oncommit, objver, extra_ops, op_flags);
3472 ceph_tid_t tid;
3473 op_submit(o, &tid);
3474 return tid;
3475 }
  // Build (but do not submit) a WRITESAME: replicate the pattern in 'bl'
  // (data_length = bl.length()) across [off, off+write_len).
  Op *prepare_writesame_op(
    const object_t& oid, const object_locator_t& oloc,
    uint64_t write_len, uint64_t off,
    const SnapContext& snapc, const ceph::buffer::list &bl,
    ceph::real_time mtime, int flags,
    Context *oncommit, version_t *objver = NULL,
    ObjectOperation *extra_ops = NULL, int op_flags = 0) {

    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_WRITESAME;
    ops[i].op.writesame.offset = off;
    ops[i].op.writesame.length = write_len;
    ops[i].op.writesame.data_length = bl.length();
    ops[i].indata = bl;
    ops[i].op.flags = op_flags;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
3498 ceph_tid_t writesame(
3499 const object_t& oid, const object_locator_t& oloc,
3500 uint64_t write_len, uint64_t off,
3501 const SnapContext& snapc, const ceph::buffer::list &bl,
3502 ceph::real_time mtime, int flags,
3503 Context *oncommit, version_t *objver = NULL,
3504 ObjectOperation *extra_ops = NULL, int op_flags = 0) {
3505
3506 Op *o = prepare_writesame_op(oid, oloc, write_len, off, snapc, bl,
3507 mtime, flags, oncommit, objver,
3508 extra_ops, op_flags);
3509
3510 ceph_tid_t tid;
3511 op_submit(o, &tid);
3512 return tid;
3513 }
  // TRUNCATE write: the new object size is carried in extent.offset and
  // the truncate hints mirror it (trunc_size/trunc_seq). Builds, submits,
  // and returns the tid.
  ceph_tid_t trunc(const object_t& oid, const object_locator_t& oloc,
                   const SnapContext& snapc, ceph::real_time mtime, int flags,
                   uint64_t trunc_size, __u32 trunc_seq,
                   Context *oncommit, version_t *objver = NULL,
                   ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_TRUNCATE;
    ops[i].op.extent.offset = trunc_size;
    ops[i].op.extent.truncate_size = trunc_size;
    ops[i].op.extent.truncate_seq = trunc_seq;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // ZERO write: zero-fill (hole-punch) the extent [off, off+len).
  // Builds, submits, and returns the tid.
  ceph_tid_t zero(const object_t& oid, const object_locator_t& oloc,
                  uint64_t off, uint64_t len, const SnapContext& snapc,
                  ceph::real_time mtime, int flags, Context *oncommit,
                  version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ZERO;
    ops[i].op.extent.offset = off;
    ops[i].op.extent.length = len;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // ROLLBACK write: revert the object to its state at snapshot 'snapid'.
  // NOTE(review): unlike every other write helper here, the Op flags do
  // not include global_op_flags — confirm whether that is intentional.
  ceph_tid_t rollback_object(const object_t& oid, const object_locator_t& oloc,
                             const SnapContext& snapc, snapid_t snapid,
                             ceph::real_time mtime, Context *oncommit,
                             version_t *objver = NULL,
                             ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
    ops[i].op.snap.snapid = snapid;
    Op *o = new Op(oid, oloc, std::move(ops), CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // CREATE write: create the object; 'create_flags' (e.g. exclusive) go
  // into the per-op flags, 'global_flags' into the Op flags. Builds,
  // submits, and returns the tid.
  ceph_tid_t create(const object_t& oid, const object_locator_t& oloc,
                    const SnapContext& snapc, ceph::real_time mtime, int global_flags,
                    int create_flags, Context *oncommit,
                    version_t *objver = NULL,
                    ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_CREATE;
    ops[i].op.flags = create_flags;
    Op *o = new Op(oid, oloc, std::move(ops), global_flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // Build (but do not submit) a DELETE of the object.
  Op *prepare_remove_op(
    const object_t& oid, const object_locator_t& oloc,
    const SnapContext& snapc, ceph::real_time mtime, int flags,
    Context *oncommit,
    version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_DELETE;
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    return o;
  }
3597 ceph_tid_t remove(
3598 const object_t& oid, const object_locator_t& oloc,
3599 const SnapContext& snapc, ceph::real_time mtime, int flags,
3600 Context *oncommit,
3601 version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
3602 Op *o = prepare_remove_op(oid, oloc, snapc, mtime, flags,
3603 oncommit, objver, extra_ops);
3604 ceph_tid_t tid;
3605 op_submit(o, &tid);
3606 return tid;
3607 }
3608
  // SETXATTR write: set xattr 'name' to the contents of 'bl'. The wire
  // payload is the name bytes followed by the value. A null 'name' is
  // tolerated (zero-length name). Builds, submits, returns the tid.
  ceph_tid_t setxattr(const object_t& oid, const object_locator_t& oloc,
                      const char *name, const SnapContext& snapc, const ceph::buffer::list &bl,
                      ceph::real_time mtime, int flags,
                      Context *oncommit,
                      version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_SETXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = bl.length();
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    ops[i].indata.append(bl);
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit,
                   objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
  // RMXATTR write: remove xattr 'name'. A null 'name' is tolerated
  // (zero-length name). Builds, submits, and returns the tid.
  ceph_tid_t removexattr(const object_t& oid, const object_locator_t& oloc,
                         const char *name, const SnapContext& snapc,
                         ceph::real_time mtime, int flags,
                         Context *oncommit,
                         version_t *objver = NULL, ObjectOperation *extra_ops = NULL) {
    osdc_opvec ops;
    int i = init_ops(ops, 1, extra_ops);
    ops[i].op.op = CEPH_OSD_OP_RMXATTR;
    ops[i].op.xattr.name_len = (name ? strlen(name) : 0);
    ops[i].op.xattr.value_len = 0;
    if (name)
      ops[i].indata.append(name, ops[i].op.xattr.name_len);
    Op *o = new Op(oid, oloc, std::move(ops), flags | global_op_flags |
                   CEPH_OSD_FLAG_WRITE, oncommit, objver);
    o->mtime = mtime;
    o->snapc = snapc;
    ceph_tid_t tid;
    op_submit(o, &tid);
    return tid;
  }
3651
3652 void list_nobjects(NListContext *p, Context *onfinish);
3653 uint32_t list_nobjects_seek(NListContext *p, uint32_t pos);
3654 uint32_t list_nobjects_seek(NListContext *list_context, const hobject_t& c);
3655 void list_nobjects_get_cursor(NListContext *list_context, hobject_t *c);
3656
3657 hobject_t enumerate_objects_begin();
3658 hobject_t enumerate_objects_end();
3659
3660 template<typename T>
3661 friend struct EnumerationContext;
3662 template<typename T>
3663 friend struct CB_EnumerateReply;
3664 template<typename T>
3665 void enumerate_objects(
3666 int64_t pool_id,
3667 std::string_view ns,
3668 hobject_t start,
3669 hobject_t end,
3670 const uint32_t max,
3671 const ceph::buffer::list& filter_bl,
3672 fu2::unique_function<void(boost::system::error_code,
3673 std::vector<T>,
3674 hobject_t) &&> on_finish);
3675 template<typename T>
3676 void _issue_enumerate(hobject_t start,
3677 std::unique_ptr<EnumerationContext<T>>);
3678 template<typename T>
3679 void _enumerate_reply(
3680 ceph::buffer::list&& bl,
3681 boost::system::error_code ec,
3682 std::unique_ptr<EnumerationContext<T>>&& ectx);
3683
3684 // -------------------------
3685 // pool ops
3686 private:
3687 void pool_op_submit(PoolOp *op);
3688 void _pool_op_submit(PoolOp *op);
3689 void _finish_pool_op(PoolOp *op, int r);
3690 void _do_delete_pool(int64_t pool,
3691 decltype(PoolOp::onfinish)&& onfinish);
3692
3693 public:
3694 void create_pool_snap(int64_t pool, std::string_view snapName,
3695 decltype(PoolOp::onfinish)&& onfinish);
  // Context-based convenience overload: adapts 'c' into the pool-op
  // completion type and forwards to the primary create_pool_snap().
  void create_pool_snap(int64_t pool, std::string_view snapName,
                        Context* c) {
    create_pool_snap(pool, snapName,
                     OpContextVert<ceph::buffer::list>(c, nullptr));
  }
3701 void allocate_selfmanaged_snap(int64_t pool,
3702 std::unique_ptr<ceph::async::Completion<
3703 void(boost::system::error_code,
3704 snapid_t)>> onfinish);
  // Context-based convenience overload: the allocated snap id is written
  // to *psnapid before 'c' completes.
  void allocate_selfmanaged_snap(int64_t pool, snapid_t* psnapid,
                                 Context* c) {
    allocate_selfmanaged_snap(pool,
                              OpContextVert(c, psnapid));
  }
3710 void delete_pool_snap(int64_t pool, std::string_view snapName,
3711 decltype(PoolOp::onfinish)&& onfinish);
  // Context-based convenience overload: adapts 'c' into the pool-op
  // completion type and forwards to the primary delete_pool_snap().
  void delete_pool_snap(int64_t pool, std::string_view snapName,
                        Context* c) {
    delete_pool_snap(pool, snapName,
                     OpContextVert<ceph::buffer::list>(c, nullptr));
  }
3717
3718 void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
3719 decltype(PoolOp::onfinish)&& onfinish);
  // Context-based convenience overload: adapts 'c' into the pool-op
  // completion type and forwards to the primary delete_selfmanaged_snap().
  void delete_selfmanaged_snap(int64_t pool, snapid_t snap,
                               Context* c) {
    delete_selfmanaged_snap(pool, snap,
                            OpContextVert<ceph::buffer::list>(c, nullptr));
  }
3725
3726
3727 void create_pool(std::string_view name,
3728 decltype(PoolOp::onfinish)&& onfinish,
3729 int crush_rule=-1);
  // Context-based convenience overload: adapts 'onfinish' into the
  // pool-op completion type and forwards to the primary create_pool().
  void create_pool(std::string_view name, Context *onfinish,
                   int crush_rule=-1) {
    create_pool(name,
                OpContextVert<ceph::buffer::list>(onfinish, nullptr),
                crush_rule);
  }
3736 void delete_pool(int64_t pool,
3737 decltype(PoolOp::onfinish)&& onfinish);
  // Context-based convenience overload (delete pool by id).
  void delete_pool(int64_t pool,
                   Context* onfinish) {
    delete_pool(pool, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
  }
3742
3743 void delete_pool(std::string_view name,
3744 decltype(PoolOp::onfinish)&& onfinish);
3745
  // Context-based convenience overload (delete pool by name).
  void delete_pool(std::string_view name,
                   Context* onfinish) {
    delete_pool(name, OpContextVert<ceph::buffer::list>(onfinish, nullptr));
  }
3750
3751 void handle_pool_op_reply(MPoolOpReply *m);
3752 int pool_op_cancel(ceph_tid_t tid, int r);
3753
3754 // --------------------------
3755 // pool stats
3756 private:
3757 void _poolstat_submit(PoolStatOp *op);
3758 public:
3759 void handle_get_pool_stats_reply(MGetPoolStatsReply *m);
3760 void get_pool_stats(const std::vector<std::string>& pools,
3761 decltype(PoolStatOp::onfinish)&& onfinish);
  // Asio completion-token adaptor: wraps 'token' in an async_completion
  // for the pool-stat op signature, issues the request, and returns the
  // token's associated result (future, awaitable, etc.).
  template<typename CompletionToken>
  auto get_pool_stats(const std::vector<std::string>& pools,
                      CompletionToken&& token) {
    boost::asio::async_completion<CompletionToken,
                                  PoolStatOp::OpSig> init(token);
    get_pool_stats(pools,
                   PoolStatOp::OpComp::create(
                     service.get_executor(),
                     std::move(init.completion_handler)));
    return init.result.get();
  }
3773 int pool_stat_op_cancel(ceph_tid_t tid, int r);
3774 void _finish_pool_stat_op(PoolStatOp *op, int r);
3775
3776 // ---------------------------
3777 // df stats
3778 private:
3779 void _fs_stats_submit(StatfsOp *op);
3780 public:
3781 void handle_fs_stats_reply(MStatfsReply *m);
3782 void get_fs_stats(boost::optional<int64_t> poolid,
3783 decltype(StatfsOp::onfinish)&& onfinish);
  // Asio completion-token adaptor for filesystem/pool statfs: wraps
  // 'token' for the statfs op signature, issues the request, and returns
  // the token's associated result.
  template<typename CompletionToken>
  auto get_fs_stats(boost::optional<int64_t> poolid,
                    CompletionToken&& token) {
    boost::asio::async_completion<CompletionToken, StatfsOp::OpSig> init(token);
    get_fs_stats(poolid,
                 StatfsOp::OpComp::create(service.get_executor(),
                                          std::move(init.completion_handler)));
    return init.result.get();
  }
  // Context-based convenience overload: statfs results are written into
  // 'result' before 'onfinish' completes.
  void get_fs_stats(struct ceph_statfs& result, boost::optional<int64_t> poolid,
                    Context *onfinish) {
    get_fs_stats(poolid, OpContextVert(onfinish, result));
  }
3797 int statfs_op_cancel(ceph_tid_t tid, int r);
3798 void _finish_statfs_op(StatfsOp *op, int r);
3799
3800 // ---------------------------
3801 // some scatter/gather hackery
3802
3803 void _sg_read_finish(std::vector<ObjectExtent>& extents,
3804 std::vector<ceph::buffer::list>& resultbl,
3805 ceph::buffer::list *bl, Context *onfinish);
3806
  // Gather-completion for scatter/gather reads: once all per-extent
  // reads finish, reassembles the per-extent buffers into *bl via
  // _sg_read_finish() and chains to 'onfinish'. Takes ownership of the
  // extents/resultbl vectors by swapping them in.
  struct C_SGRead : public Context {
    Objecter *objecter;
    std::vector<ObjectExtent> extents;
    std::vector<ceph::buffer::list> resultbl;
    ceph::buffer::list *bl;
    Context *onfinish;
    C_SGRead(Objecter *ob,
             std::vector<ObjectExtent>& e, std::vector<ceph::buffer::list>& r, ceph::buffer::list *b,
             Context *c) :
      objecter(ob), bl(b), onfinish(c) {
      extents.swap(e);
      resultbl.swap(r);
    }
    void finish(int r) override {
      objecter->_sg_read_finish(extents, resultbl, bl, onfinish);
    }
  };
3824
  // Scatter/gather read with truncation hints. Single-extent requests
  // go straight to read_trunc(); otherwise one read per extent is issued
  // under a C_Gather whose finisher (C_SGRead) stitches the per-extent
  // buffers back into *bl in extent order.
  void sg_read_trunc(std::vector<ObjectExtent>& extents, snapid_t snap,
                     ceph::buffer::list *bl, int flags, uint64_t trunc_size,
                     __u32 trunc_seq, Context *onfinish, int op_flags = 0) {
    if (extents.size() == 1) {
      read_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
                 extents[0].length, snap, bl, flags, extents[0].truncate_size,
                 trunc_seq, onfinish, 0, 0, op_flags);
    } else {
      C_GatherBuilder gather(cct);
      std::vector<ceph::buffer::list> resultbl(extents.size());
      int i=0;
      for (auto p = extents.begin(); p != extents.end(); ++p) {
        read_trunc(p->oid, p->oloc, p->offset, p->length, snap, &resultbl[i++],
                   flags, p->truncate_size, trunc_seq, gather.new_sub(),
                   0, 0, op_flags);
      }
      gather.set_finisher(new C_SGRead(this, extents, resultbl, bl, onfinish));
      gather.activate();
    }
  }
3845
3846 void sg_read(std::vector<ObjectExtent>& extents, snapid_t snap, ceph::buffer::list *bl,
3847 int flags, Context *onfinish, int op_flags = 0) {
3848 sg_read_trunc(extents, snap, bl, flags, 0, 0, onfinish, op_flags);
3849 }
3850
3851 void sg_write_trunc(std::vector<ObjectExtent>& extents, const SnapContext& snapc,
3852 const ceph::buffer::list& bl, ceph::real_time mtime, int flags,
3853 uint64_t trunc_size, __u32 trunc_seq,
3854 Context *oncommit, int op_flags = 0) {
3855 if (extents.size() == 1) {
3856 write_trunc(extents[0].oid, extents[0].oloc, extents[0].offset,
3857 extents[0].length, snapc, bl, mtime, flags,
3858 extents[0].truncate_size, trunc_seq, oncommit,
3859 0, 0, op_flags);
3860 } else {
3861 C_GatherBuilder gcom(cct, oncommit);
3862 auto it = bl.cbegin();
3863 for (auto p = extents.begin(); p != extents.end(); ++p) {
3864 ceph::buffer::list cur;
3865 for (auto bit = p->buffer_extents.begin();
3866 bit != p->buffer_extents.end();
3867 ++bit) {
3868 if (it.get_off() != bit->first) {
3869 it.seek(bit->first);
3870 }
3871 it.copy(bit->second, cur);
3872 }
3873 ceph_assert(cur.length() == p->length);
3874 write_trunc(p->oid, p->oloc, p->offset, p->length,
3875 snapc, cur, mtime, flags, p->truncate_size, trunc_seq,
3876 oncommit ? gcom.new_sub():0,
3877 0, 0, op_flags);
3878 }
3879 gcom.activate();
3880 }
3881 }
3882
3883 void sg_write(std::vector<ObjectExtent>& extents, const SnapContext& snapc,
3884 const ceph::buffer::list& bl, ceph::real_time mtime, int flags,
3885 Context *oncommit, int op_flags = 0) {
3886 sg_write_trunc(extents, snapc, bl, mtime, flags, 0, 0, oncommit,
3887 op_flags);
3888 }
3889
  // Dispatcher interface: connection lifecycle notifications from the
  // Messenger (implemented in Objecter.cc).
  void ms_handle_connect(Connection *con) override;
  bool ms_handle_reset(Connection *con) override;
  void ms_handle_remote_reset(Connection *con) override;
  bool ms_handle_refused(Connection *con) override;

  // Set or clear this client's self-blocklisted state (see Objecter.cc).
  void blocklist_self(bool set);
3896
 private:
  // Barrier epoch set via set_epoch_barrier(); 0 means no barrier.
  epoch_t epoch_barrier = 0;
  // Snapshot of the objecter_retry_writes_after_first_reply config option,
  // captured at construction time.
  bool retry_writes_after_first_reply =
    cct->_conf->objecter_retry_writes_after_first_reply;
3901
 public:
  // Raise the epoch barrier (implemented in Objecter.cc).
  void set_epoch_barrier(epoch_t epoch);

  // Accessor for this Objecter's perf counters.
  PerfCounters *get_logger() {
    return logger;
  }
3908 };
3909
3910 #endif