1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
18 #include <condition_variable>
25 #include <string_view>
26 #include <type_traits>
29 #include <boost/container/small_vector.hpp>
30 #include <boost/asio.hpp>
32 #include <fmt/format.h>
34 #include "include/buffer.h"
35 #include "include/ceph_assert.h"
36 #include "include/ceph_fs.h"
37 #include "include/common_fwd.h"
38 #include "include/expected.hpp"
39 #include "include/types.h"
40 #include "include/rados/rados_types.hpp"
41 #include "include/function2.hpp"
42 #include "include/neorados/RADOS_Decodable.hpp"
44 #include "common/admin_socket.h"
45 #include "common/async/completion.h"
46 #include "common/ceph_time.h"
47 #include "common/ceph_mutex.h"
48 #include "common/ceph_timer.h"
49 #include "common/config_obs.h"
50 #include "common/shunique_lock.h"
51 #include "common/zipkin_trace.h"
52 #include "common/Throttle.h"
54 #include "mon/MonClient.h"
56 #include "messages/MOSDOp.h"
57 #include "msg/Dispatcher.h"
59 #include "osd/OSDMap.h"
68 class MGetPoolStatsReply
;
73 struct EnumerationContext
;
75 struct CB_EnumerateReply
;
77 inline constexpr std::size_t osdc_opvec_len
= 2;
78 using osdc_opvec
= boost::container::small_vector
<OSDOp
, osdc_opvec_len
>;
80 // -----------------------------------------
82 struct ObjectOperation
{
87 boost::container::small_vector
<ceph::buffer::list
*, osdc_opvec_len
> out_bl
;
88 boost::container::small_vector
<
89 fu2::unique_function
<void(boost::system::error_code
, int,
90 const ceph::buffer::list
& bl
) &&>,
91 osdc_opvec_len
> out_handler
;
92 boost::container::small_vector
<int*, osdc_opvec_len
> out_rval
;
93 boost::container::small_vector
<boost::system::error_code
*,
94 osdc_opvec_len
> out_ec
;
96 ObjectOperation() = default;
97 ObjectOperation(const ObjectOperation
&) = delete;
98 ObjectOperation
& operator =(const ObjectOperation
&) = delete;
99 ObjectOperation(ObjectOperation
&&) = default;
100 ObjectOperation
& operator =(ObjectOperation
&&) = default;
101 ~ObjectOperation() = default;
103 size_t size() const {
117 void set_last_op_flags(int flags
) {
118 ceph_assert(!ops
.empty());
119 ops
.rbegin()->op
.flags
= flags
;
123 void set_handler(fu2::unique_function
<void(boost::system::error_code
, int,
124 const ceph::buffer::list
&) &&> f
) {
126 if (out_handler
.back()) {
127 // This happens seldom enough that we may as well keep folding
128 // functions together when we get another one rather than
129 // using a container.
132 g
= std::move(std::move(out_handler
.back()))]
133 (boost::system::error_code ec
, int r
,
134 const ceph::buffer::list
& bl
) mutable {
135 std::move(g
)(ec
, r
, bl
);
136 std::move(f
)(ec
, r
, bl
);
139 out_handler
.back() = std::move(f
);
142 ceph_assert(ops
.size() == out_handler
.size());
145 void set_handler(Context
*c
) {
147 set_handler([c
= std::unique_ptr
<Context
>(c
)](boost::system::error_code
,
149 const ceph::buffer::list
&) mutable {
150 c
.release()->complete(r
);
155 OSDOp
& add_op(int op
) {
157 ops
.back().op
.op
= op
;
158 out_bl
.push_back(nullptr);
159 ceph_assert(ops
.size() == out_bl
.size());
160 out_handler
.emplace_back();
161 ceph_assert(ops
.size() == out_handler
.size());
162 out_rval
.push_back(nullptr);
163 ceph_assert(ops
.size() == out_rval
.size());
164 out_ec
.push_back(nullptr);
165 ceph_assert(ops
.size() == out_ec
.size());
168 void add_data(int op
, uint64_t off
, uint64_t len
, ceph::buffer::list
& bl
) {
169 OSDOp
& osd_op
= add_op(op
);
170 osd_op
.op
.extent
.offset
= off
;
171 osd_op
.op
.extent
.length
= len
;
172 osd_op
.indata
.claim_append(bl
);
174 void add_writesame(int op
, uint64_t off
, uint64_t write_len
,
175 ceph::buffer::list
& bl
) {
176 OSDOp
& osd_op
= add_op(op
);
177 osd_op
.op
.writesame
.offset
= off
;
178 osd_op
.op
.writesame
.length
= write_len
;
179 osd_op
.op
.writesame
.data_length
= bl
.length();
180 osd_op
.indata
.claim_append(bl
);
182 void add_xattr(int op
, const char *name
, const ceph::buffer::list
& data
) {
183 OSDOp
& osd_op
= add_op(op
);
184 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
185 osd_op
.op
.xattr
.value_len
= data
.length();
187 osd_op
.indata
.append(name
, osd_op
.op
.xattr
.name_len
);
188 osd_op
.indata
.append(data
);
190 void add_xattr_cmp(int op
, const char *name
, uint8_t cmp_op
,
191 uint8_t cmp_mode
, const ceph::buffer::list
& data
) {
192 OSDOp
& osd_op
= add_op(op
);
193 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
194 osd_op
.op
.xattr
.value_len
= data
.length();
195 osd_op
.op
.xattr
.cmp_op
= cmp_op
;
196 osd_op
.op
.xattr
.cmp_mode
= cmp_mode
;
198 osd_op
.indata
.append(name
, osd_op
.op
.xattr
.name_len
);
199 osd_op
.indata
.append(data
);
201 void add_xattr(int op
, std::string_view name
, const ceph::buffer::list
& data
) {
202 OSDOp
& osd_op
= add_op(op
);
203 osd_op
.op
.xattr
.name_len
= name
.size();
204 osd_op
.op
.xattr
.value_len
= data
.length();
205 osd_op
.indata
.append(name
.data(), osd_op
.op
.xattr
.name_len
);
206 osd_op
.indata
.append(data
);
208 void add_xattr_cmp(int op
, std::string_view name
, uint8_t cmp_op
,
209 uint8_t cmp_mode
, const ceph::buffer::list
& data
) {
210 OSDOp
& osd_op
= add_op(op
);
211 osd_op
.op
.xattr
.name_len
= name
.size();
212 osd_op
.op
.xattr
.value_len
= data
.length();
213 osd_op
.op
.xattr
.cmp_op
= cmp_op
;
214 osd_op
.op
.xattr
.cmp_mode
= cmp_mode
;
216 osd_op
.indata
.append(name
.data(), osd_op
.op
.xattr
.name_len
);
217 osd_op
.indata
.append(data
);
219 void add_call(int op
, std::string_view cname
, std::string_view method
,
220 const ceph::buffer::list
&indata
,
221 ceph::buffer::list
*outbl
, Context
*ctx
, int *prval
) {
222 OSDOp
& osd_op
= add_op(op
);
224 unsigned p
= ops
.size() - 1;
229 osd_op
.op
.cls
.class_len
= cname
.size();
230 osd_op
.op
.cls
.method_len
= method
.size();
231 osd_op
.op
.cls
.indata_len
= indata
.length();
232 osd_op
.indata
.append(cname
.data(), osd_op
.op
.cls
.class_len
);
233 osd_op
.indata
.append(method
.data(), osd_op
.op
.cls
.method_len
);
234 osd_op
.indata
.append(indata
);
236 void add_call(int op
, std::string_view cname
, std::string_view method
,
237 const ceph::buffer::list
&indata
,
238 fu2::unique_function
<void(boost::system::error_code
,
239 const ceph::buffer::list
&) &&> f
) {
240 OSDOp
& osd_op
= add_op(op
);
242 set_handler([f
= std::move(f
)](boost::system::error_code ec
,
244 const ceph::buffer::list
& bl
) mutable {
245 std::move(f
)(ec
, bl
);
248 osd_op
.op
.cls
.class_len
= cname
.size();
249 osd_op
.op
.cls
.method_len
= method
.size();
250 osd_op
.op
.cls
.indata_len
= indata
.length();
251 osd_op
.indata
.append(cname
.data(), osd_op
.op
.cls
.class_len
);
252 osd_op
.indata
.append(method
.data(), osd_op
.op
.cls
.method_len
);
253 osd_op
.indata
.append(indata
);
255 void add_call(int op
, std::string_view cname
, std::string_view method
,
256 const ceph::buffer::list
&indata
,
257 fu2::unique_function
<void(boost::system::error_code
, int,
258 const ceph::buffer::list
&) &&> f
) {
259 OSDOp
& osd_op
= add_op(op
);
261 set_handler([f
= std::move(f
)](boost::system::error_code ec
,
263 const ceph::buffer::list
& bl
) mutable {
264 std::move(f
)(ec
, r
, bl
);
267 osd_op
.op
.cls
.class_len
= cname
.size();
268 osd_op
.op
.cls
.method_len
= method
.size();
269 osd_op
.op
.cls
.indata_len
= indata
.length();
270 osd_op
.indata
.append(cname
.data(), osd_op
.op
.cls
.class_len
);
271 osd_op
.indata
.append(method
.data(), osd_op
.op
.cls
.method_len
);
272 osd_op
.indata
.append(indata
);
274 void add_pgls(int op
, uint64_t count
, collection_list_handle_t cookie
,
275 epoch_t start_epoch
) {
277 OSDOp
& osd_op
= add_op(op
);
278 osd_op
.op
.pgls
.count
= count
;
279 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
280 encode(cookie
, osd_op
.indata
);
282 void add_pgls_filter(int op
, uint64_t count
, const ceph::buffer::list
& filter
,
283 collection_list_handle_t cookie
, epoch_t start_epoch
) {
285 OSDOp
& osd_op
= add_op(op
);
286 osd_op
.op
.pgls
.count
= count
;
287 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
288 std::string cname
= "pg";
289 std::string mname
= "filter";
290 encode(cname
, osd_op
.indata
);
291 encode(mname
, osd_op
.indata
);
292 osd_op
.indata
.append(filter
);
293 encode(cookie
, osd_op
.indata
);
295 void add_alloc_hint(int op
, uint64_t expected_object_size
,
296 uint64_t expected_write_size
,
298 OSDOp
& osd_op
= add_op(op
);
299 osd_op
.op
.alloc_hint
.expected_object_size
= expected_object_size
;
300 osd_op
.op
.alloc_hint
.expected_write_size
= expected_write_size
;
301 osd_op
.op
.alloc_hint
.flags
= flags
;
307 void pg_ls(uint64_t count
, ceph::buffer::list
& filter
,
308 collection_list_handle_t cookie
, epoch_t start_epoch
) {
309 if (filter
.length() == 0)
310 add_pgls(CEPH_OSD_OP_PGLS
, count
, cookie
, start_epoch
);
312 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER
, count
, filter
, cookie
,
314 flags
|= CEPH_OSD_FLAG_PGOP
;
317 void pg_nls(uint64_t count
, const ceph::buffer::list
& filter
,
318 collection_list_handle_t cookie
, epoch_t start_epoch
) {
319 if (filter
.length() == 0)
320 add_pgls(CEPH_OSD_OP_PGNLS
, count
, cookie
, start_epoch
);
322 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER
, count
, filter
, cookie
,
324 flags
|= CEPH_OSD_FLAG_PGOP
;
327 void scrub_ls(const librados::object_id_t
& start_after
,
329 std::vector
<librados::inconsistent_obj_t
> *objects
,
332 void scrub_ls(const librados::object_id_t
& start_after
,
334 std::vector
<librados::inconsistent_snapset_t
> *objects
,
338 void create(bool excl
) {
339 OSDOp
& o
= add_op(CEPH_OSD_OP_CREATE
);
340 o
.op
.flags
= (excl
? CEPH_OSD_OP_FLAG_EXCL
: 0);
343 struct CB_ObjectOperation_stat
{
344 ceph::buffer::list bl
;
346 ceph::real_time
*pmtime
;
348 struct timespec
*pts
;
350 boost::system::error_code
* pec
;
351 CB_ObjectOperation_stat(uint64_t *ps
, ceph::real_time
*pm
, time_t *pt
, struct timespec
*_pts
,
352 int *prval
, boost::system::error_code
* pec
)
353 : psize(ps
), pmtime(pm
), ptime(pt
), pts(_pts
), prval(prval
), pec(pec
) {}
354 void operator()(boost::system::error_code ec
, int r
, const ceph::buffer::list
& bl
) {
357 auto p
= bl
.cbegin();
360 ceph::real_time mtime
;
368 *ptime
= ceph::real_clock::to_time_t(mtime
);
370 *pts
= ceph::real_clock::to_timespec(mtime
);
371 } catch (const ceph::buffer::error
& e
) {
380 void stat(uint64_t *psize
, ceph::real_time
*pmtime
, int *prval
) {
381 add_op(CEPH_OSD_OP_STAT
);
382 set_handler(CB_ObjectOperation_stat(psize
, pmtime
, nullptr, nullptr, prval
,
384 out_rval
.back() = prval
;
386 void stat(uint64_t *psize
, ceph::real_time
*pmtime
,
387 boost::system::error_code
* ec
) {
388 add_op(CEPH_OSD_OP_STAT
);
389 set_handler(CB_ObjectOperation_stat(psize
, pmtime
, nullptr, nullptr,
393 void stat(uint64_t *psize
, time_t *ptime
, int *prval
) {
394 add_op(CEPH_OSD_OP_STAT
);
395 set_handler(CB_ObjectOperation_stat(psize
, nullptr, ptime
, nullptr, prval
,
397 out_rval
.back() = prval
;
399 void stat(uint64_t *psize
, struct timespec
*pts
, int *prval
) {
400 add_op(CEPH_OSD_OP_STAT
);
401 set_handler(CB_ObjectOperation_stat(psize
, nullptr, nullptr, pts
, prval
, nullptr));
402 out_rval
.back() = prval
;
404 void stat(uint64_t *psize
, ceph::real_time
*pmtime
, nullptr_t
) {
405 add_op(CEPH_OSD_OP_STAT
);
406 set_handler(CB_ObjectOperation_stat(psize
, pmtime
, nullptr, nullptr, nullptr,
409 void stat(uint64_t *psize
, time_t *ptime
, nullptr_t
) {
410 add_op(CEPH_OSD_OP_STAT
);
411 set_handler(CB_ObjectOperation_stat(psize
, nullptr, ptime
, nullptr, nullptr,
414 void stat(uint64_t *psize
, struct timespec
*pts
, nullptr_t
) {
415 add_op(CEPH_OSD_OP_STAT
);
416 set_handler(CB_ObjectOperation_stat(psize
, nullptr, nullptr, pts
, nullptr,
419 void stat(uint64_t *psize
, nullptr_t
, nullptr_t
) {
420 add_op(CEPH_OSD_OP_STAT
);
421 set_handler(CB_ObjectOperation_stat(psize
, nullptr, nullptr, nullptr,
426 struct CB_ObjectOperation_cmpext
{
427 int* prval
= nullptr;
428 boost::system::error_code
* ec
= nullptr;
429 std::size_t* s
= nullptr;
430 explicit CB_ObjectOperation_cmpext(int *prval
)
432 CB_ObjectOperation_cmpext(boost::system::error_code
* ec
, std::size_t* s
)
435 void operator()(boost::system::error_code ec
, int r
, const ceph::buffer::list
&) {
441 *s
= static_cast<std::size_t>(-(MAX_ERRNO
- r
));
445 void cmpext(uint64_t off
, ceph::buffer::list
& cmp_bl
, int *prval
) {
446 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_bl
.length(), cmp_bl
);
447 set_handler(CB_ObjectOperation_cmpext(prval
));
448 out_rval
.back() = prval
;
451 void cmpext(uint64_t off
, ceph::buffer::list
&& cmp_bl
, boost::system::error_code
* ec
,
453 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_bl
.length(), cmp_bl
);
454 set_handler(CB_ObjectOperation_cmpext(ec
, s
));
459 void cmpext(uint64_t off
, uint64_t cmp_len
, const char *cmp_buf
, int *prval
) {
460 ceph::buffer::list cmp_bl
;
461 cmp_bl
.append(cmp_buf
, cmp_len
);
462 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_len
, cmp_bl
);
463 set_handler(CB_ObjectOperation_cmpext(prval
));
464 out_rval
.back() = prval
;
467 void read(uint64_t off
, uint64_t len
, ceph::buffer::list
*pbl
, int *prval
,
469 ceph::buffer::list bl
;
470 add_data(CEPH_OSD_OP_READ
, off
, len
, bl
);
471 unsigned p
= ops
.size() - 1;
477 void read(uint64_t off
, uint64_t len
, boost::system::error_code
* ec
,
478 ceph::buffer::list
* pbl
) {
479 ceph::buffer::list bl
;
480 add_data(CEPH_OSD_OP_READ
, off
, len
, bl
);
485 template<typename Ex
>
486 struct CB_ObjectOperation_sparse_read
{
487 ceph::buffer::list
* data_bl
;
490 boost::system::error_code
* pec
;
491 CB_ObjectOperation_sparse_read(ceph::buffer::list
* data_bl
,
494 boost::system::error_code
* pec
)
495 : data_bl(data_bl
), extents(extents
), prval(prval
), pec(pec
) {}
496 void operator()(boost::system::error_code ec
, int r
, const ceph::buffer::list
& bl
) {
497 auto iter
= bl
.cbegin();
499 // NOTE: it's possible the sub-op has not been executed but the result
500 // code remains zeroed. Avoid the costly exception handling on a
501 // potential IO path.
502 if (bl
.length() > 0) {
504 decode(*extents
, iter
);
505 decode(*data_bl
, iter
);
506 } catch (const ceph::buffer::error
& e
) {
515 *pec
= buffer::errc::end_of_buffer
;
520 void sparse_read(uint64_t off
, uint64_t len
, std::map
<uint64_t, uint64_t>* m
,
521 ceph::buffer::list
* data_bl
, int* prval
) {
522 ceph::buffer::list bl
;
523 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
524 set_handler(CB_ObjectOperation_sparse_read(data_bl
, m
, prval
, nullptr));
525 out_rval
.back() = prval
;
527 void sparse_read(uint64_t off
, uint64_t len
,
528 boost::system::error_code
* ec
,
529 std::vector
<std::pair
<uint64_t, uint64_t>>* m
,
530 ceph::buffer::list
* data_bl
) {
531 ceph::buffer::list bl
;
532 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
533 set_handler(CB_ObjectOperation_sparse_read(data_bl
, m
, nullptr, ec
));
536 void write(uint64_t off
, ceph::buffer::list
& bl
,
537 uint64_t truncate_size
,
538 uint32_t truncate_seq
) {
539 add_data(CEPH_OSD_OP_WRITE
, off
, bl
.length(), bl
);
540 OSDOp
& o
= *ops
.rbegin();
541 o
.op
.extent
.truncate_size
= truncate_size
;
542 o
.op
.extent
.truncate_seq
= truncate_seq
;
544 void write(uint64_t off
, ceph::buffer::list
& bl
) {
545 write(off
, bl
, 0, 0);
547 void write_full(ceph::buffer::list
& bl
) {
548 add_data(CEPH_OSD_OP_WRITEFULL
, 0, bl
.length(), bl
);
550 void writesame(uint64_t off
, uint64_t write_len
, ceph::buffer::list
& bl
) {
551 add_writesame(CEPH_OSD_OP_WRITESAME
, off
, write_len
, bl
);
553 void append(ceph::buffer::list
& bl
) {
554 add_data(CEPH_OSD_OP_APPEND
, 0, bl
.length(), bl
);
556 void zero(uint64_t off
, uint64_t len
) {
557 ceph::buffer::list bl
;
558 add_data(CEPH_OSD_OP_ZERO
, off
, len
, bl
);
560 void truncate(uint64_t off
) {
561 ceph::buffer::list bl
;
562 add_data(CEPH_OSD_OP_TRUNCATE
, off
, 0, bl
);
565 ceph::buffer::list bl
;
566 add_data(CEPH_OSD_OP_DELETE
, 0, 0, bl
);
568 void mapext(uint64_t off
, uint64_t len
) {
569 ceph::buffer::list bl
;
570 add_data(CEPH_OSD_OP_MAPEXT
, off
, len
, bl
);
572 void sparse_read(uint64_t off
, uint64_t len
) {
573 ceph::buffer::list bl
;
574 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
577 void checksum(uint8_t type
, const ceph::buffer::list
&init_value_bl
,
578 uint64_t off
, uint64_t len
, size_t chunk_size
,
579 ceph::buffer::list
*pbl
, int *prval
, Context
*ctx
) {
580 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_CHECKSUM
);
581 osd_op
.op
.checksum
.offset
= off
;
582 osd_op
.op
.checksum
.length
= len
;
583 osd_op
.op
.checksum
.type
= type
;
584 osd_op
.op
.checksum
.chunk_size
= chunk_size
;
585 osd_op
.indata
.append(init_value_bl
);
587 unsigned p
= ops
.size() - 1;
594 void getxattr(const char *name
, ceph::buffer::list
*pbl
, int *prval
) {
595 ceph::buffer::list bl
;
596 add_xattr(CEPH_OSD_OP_GETXATTR
, name
, bl
);
597 unsigned p
= ops
.size() - 1;
601 void getxattr(std::string_view name
, boost::system::error_code
* ec
,
603 ceph::buffer::list bl
;
604 add_xattr(CEPH_OSD_OP_GETXATTR
, name
, bl
);
609 template<typename Vals
>
610 struct CB_ObjectOperation_decodevals
{
611 uint64_t max_entries
;
615 boost::system::error_code
* pec
;
616 CB_ObjectOperation_decodevals(uint64_t m
, Vals
* pa
,
618 boost::system::error_code
* pec
)
619 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
), pec(pec
) {
624 void operator()(boost::system::error_code ec
, int r
, const ceph::buffer::list
& bl
) {
626 auto p
= bl
.cbegin();
637 decode(*ptruncated
, p
);
639 // The OSD did not provide this. Since old OSDs do not
640 // enfoce omap result limits either, we can infer it from
641 // the size of the result
642 *ptruncated
= (pattrs
->size() == max_entries
);
645 } catch (const ceph::buffer::error
& e
) {
654 template<typename Keys
>
655 struct CB_ObjectOperation_decodekeys
{
656 uint64_t max_entries
;
660 boost::system::error_code
* pec
;
661 CB_ObjectOperation_decodekeys(uint64_t m
, Keys
* pa
, bool *pt
,
662 int *pr
, boost::system::error_code
* pec
)
663 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
), pec(pec
) {
668 void operator()(boost::system::error_code ec
, int r
, const ceph::buffer::list
& bl
) {
671 auto p
= bl
.cbegin();
682 decode(*ptruncated
, p
);
684 // the OSD did not provide this. since old OSDs do not
685 // enforce omap result limits either, we can infer it from
686 // the size of the result
687 *ptruncated
= (pattrs
->size() == max_entries
);
690 } catch (const ceph::buffer::error
& e
) {
699 struct CB_ObjectOperation_decodewatchers
{
700 std::list
<obj_watch_t
>* pwatchers
;
702 boost::system::error_code
* pec
;
703 CB_ObjectOperation_decodewatchers(std::list
<obj_watch_t
>* pw
, int* pr
,
704 boost::system::error_code
* pec
)
705 : pwatchers(pw
), prval(pr
), pec(pec
) {}
706 void operator()(boost::system::error_code ec
, int r
,
707 const ceph::buffer::list
& bl
) {
709 auto p
= bl
.cbegin();
711 obj_list_watch_response_t resp
;
714 for (const auto& watch_item
: resp
.entries
) {
716 std::string sa
= watch_item
.addr
.get_legacy_str();
717 strncpy(ow
.addr
, sa
.c_str(), sizeof(ow
.addr
) - 1);
718 ow
.addr
[sizeof(ow
.addr
) - 1] = '\0';
719 ow
.watcher_id
= watch_item
.name
.num();
720 ow
.cookie
= watch_item
.cookie
;
721 ow
.timeout_seconds
= watch_item
.timeout_seconds
;
722 pwatchers
->push_back(std::move(ow
));
725 } catch (const ceph::buffer::error
& e
) {
735 struct CB_ObjectOperation_decodewatchersneo
{
736 std::vector
<neorados::ObjWatcher
>* pwatchers
;
738 boost::system::error_code
* pec
;
739 CB_ObjectOperation_decodewatchersneo(std::vector
<neorados::ObjWatcher
>* pw
,
741 boost::system::error_code
* pec
)
742 : pwatchers(pw
), prval(pr
), pec(pec
) {}
743 void operator()(boost::system::error_code ec
, int r
,
744 const ceph::buffer::list
& bl
) {
746 auto p
= bl
.cbegin();
748 obj_list_watch_response_t resp
;
751 for (const auto& watch_item
: resp
.entries
) {
752 neorados::ObjWatcher ow
;
753 ow
.addr
= watch_item
.addr
.get_legacy_str();
754 ow
.watcher_id
= watch_item
.name
.num();
755 ow
.cookie
= watch_item
.cookie
;
756 ow
.timeout_seconds
= watch_item
.timeout_seconds
;
757 pwatchers
->push_back(std::move(ow
));
760 } catch (const ceph::buffer::error
& e
) {
771 struct CB_ObjectOperation_decodesnaps
{
772 librados::snap_set_t
*psnaps
;
773 neorados::SnapSet
*neosnaps
;
775 boost::system::error_code
* pec
;
776 CB_ObjectOperation_decodesnaps(librados::snap_set_t
* ps
,
777 neorados::SnapSet
* ns
, int* pr
,
778 boost::system::error_code
* pec
)
779 : psnaps(ps
), neosnaps(ns
), prval(pr
), pec(pec
) {}
780 void operator()(boost::system::error_code ec
, int r
, const ceph::buffer::list
& bl
) {
783 auto p
= bl
.cbegin();
785 obj_list_snap_response_t resp
;
788 psnaps
->clones
.clear();
789 for (auto ci
= resp
.clones
.begin();
790 ci
!= resp
.clones
.end();
792 librados::clone_info_t clone
;
794 clone
.cloneid
= ci
->cloneid
;
795 clone
.snaps
.reserve(ci
->snaps
.size());
796 clone
.snaps
.insert(clone
.snaps
.end(), ci
->snaps
.begin(),
798 clone
.overlap
= ci
->overlap
;
799 clone
.size
= ci
->size
;
801 psnaps
->clones
.push_back(clone
);
803 psnaps
->seq
= resp
.seq
;
807 neosnaps
->clones
.clear();
808 for (auto&& c
: resp
.clones
) {
809 neorados::CloneInfo clone
;
811 clone
.cloneid
= std::move(c
.cloneid
);
812 clone
.snaps
.reserve(c
.snaps
.size());
813 std::move(c
.snaps
.begin(), c
.snaps
.end(),
814 std::back_inserter(clone
.snaps
));
815 clone
.overlap
= c
.overlap
;
817 neosnaps
->clones
.push_back(std::move(clone
));
819 neosnaps
->seq
= resp
.seq
;
821 } catch (const ceph::buffer::error
& e
) {
830 void getxattrs(std::map
<std::string
,ceph::buffer::list
> *pattrs
, int *prval
) {
831 add_op(CEPH_OSD_OP_GETXATTRS
);
832 if (pattrs
|| prval
) {
833 set_handler(CB_ObjectOperation_decodevals(0, pattrs
, nullptr, prval
,
835 out_rval
.back() = prval
;
838 void getxattrs(boost::system::error_code
* ec
,
839 boost::container::flat_map
<std::string
, ceph::buffer::list
> *pattrs
) {
840 add_op(CEPH_OSD_OP_GETXATTRS
);
841 set_handler(CB_ObjectOperation_decodevals(0, pattrs
, nullptr, nullptr, ec
));
844 void setxattr(const char *name
, const ceph::buffer::list
& bl
) {
845 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
847 void setxattr(std::string_view name
, const ceph::buffer::list
& bl
) {
848 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
850 void setxattr(const char *name
, const std::string
& s
) {
851 ceph::buffer::list bl
;
853 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
855 void cmpxattr(const char *name
, uint8_t cmp_op
, uint8_t cmp_mode
,
856 const ceph::buffer::list
& bl
) {
857 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR
, name
, cmp_op
, cmp_mode
, bl
);
859 void cmpxattr(std::string_view name
, uint8_t cmp_op
, uint8_t cmp_mode
,
860 const ceph::buffer::list
& bl
) {
861 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR
, name
, cmp_op
, cmp_mode
, bl
);
863 void rmxattr(const char *name
) {
864 ceph::buffer::list bl
;
865 add_xattr(CEPH_OSD_OP_RMXATTR
, name
, bl
);
867 void rmxattr(std::string_view name
) {
868 ceph::buffer::list bl
;
869 add_xattr(CEPH_OSD_OP_RMXATTR
, name
, bl
);
871 void setxattrs(map
<string
, ceph::buffer::list
>& attrs
) {
873 ceph::buffer::list bl
;
875 add_xattr(CEPH_OSD_OP_RESETXATTRS
, 0, bl
.length());
877 void resetxattrs(const char *prefix
, std::map
<std::string
, ceph::buffer::list
>& attrs
) {
879 ceph::buffer::list bl
;
881 add_xattr(CEPH_OSD_OP_RESETXATTRS
, prefix
, bl
);
885 void tmap_update(ceph::buffer::list
& bl
) {
886 add_data(CEPH_OSD_OP_TMAPUP
, 0, 0, bl
);
890 void omap_get_keys(const std::string
&start_after
,
892 std::set
<std::string
> *out_set
,
896 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETKEYS
);
897 ceph::buffer::list bl
;
898 encode(start_after
, bl
);
899 encode(max_to_get
, bl
);
900 op
.op
.extent
.offset
= 0;
901 op
.op
.extent
.length
= bl
.length();
902 op
.indata
.claim_append(bl
);
903 if (prval
|| ptruncated
|| out_set
) {
904 set_handler(CB_ObjectOperation_decodekeys(max_to_get
, out_set
, ptruncated
, prval
,
906 out_rval
.back() = prval
;
909 void omap_get_keys(std::optional
<std::string_view
> start_after
,
911 boost::system::error_code
* ec
,
912 boost::container::flat_set
<std::string
> *out_set
,
914 OSDOp
& op
= add_op(CEPH_OSD_OP_OMAPGETKEYS
);
915 ceph::buffer::list bl
;
916 encode(start_after
? *start_after
: std::string_view
{}, bl
);
917 encode(max_to_get
, bl
);
918 op
.op
.extent
.offset
= 0;
919 op
.op
.extent
.length
= bl
.length();
920 op
.indata
.claim_append(bl
);
922 CB_ObjectOperation_decodekeys(max_to_get
, out_set
, ptruncated
, nullptr,
927 void omap_get_vals(const std::string
&start_after
,
928 const std::string
&filter_prefix
,
930 std::map
<std::string
, ceph::buffer::list
> *out_set
,
934 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALS
);
935 ceph::buffer::list bl
;
936 encode(start_after
, bl
);
937 encode(max_to_get
, bl
);
938 encode(filter_prefix
, bl
);
939 op
.op
.extent
.offset
= 0;
940 op
.op
.extent
.length
= bl
.length();
941 op
.indata
.claim_append(bl
);
942 if (prval
|| out_set
|| ptruncated
) {
943 set_handler(CB_ObjectOperation_decodevals(max_to_get
, out_set
, ptruncated
,
945 out_rval
.back() = prval
;
949 void omap_get_vals(std::optional
<std::string_view
> start_after
,
950 std::optional
<std::string_view
> filter_prefix
,
952 boost::system::error_code
* ec
,
953 boost::container::flat_map
<std::string
, ceph::buffer::list
> *out_set
,
955 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALS
);
956 ceph::buffer::list bl
;
957 encode(start_after
? *start_after
: std::string_view
{}, bl
);
958 encode(max_to_get
, bl
);
959 encode(filter_prefix
? *start_after
: std::string_view
{}, bl
);
960 op
.op
.extent
.offset
= 0;
961 op
.op
.extent
.length
= bl
.length();
962 op
.indata
.claim_append(bl
);
963 set_handler(CB_ObjectOperation_decodevals(max_to_get
, out_set
, ptruncated
,
968 void omap_get_vals_by_keys(const std::set
<std::string
> &to_get
,
969 std::map
<std::string
, ceph::buffer::list
> *out_set
,
971 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS
);
972 ceph::buffer::list bl
;
974 op
.op
.extent
.offset
= 0;
975 op
.op
.extent
.length
= bl
.length();
976 op
.indata
.claim_append(bl
);
977 if (prval
|| out_set
) {
978 set_handler(CB_ObjectOperation_decodevals(0, out_set
, nullptr, prval
,
980 out_rval
.back() = prval
;
984 void omap_get_vals_by_keys(
985 const boost::container::flat_set
<std::string
>& to_get
,
986 boost::system::error_code
* ec
,
987 boost::container::flat_map
<std::string
, ceph::buffer::list
> *out_set
) {
988 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS
);
989 ceph::buffer::list bl
;
991 op
.op
.extent
.offset
= 0;
992 op
.op
.extent
.length
= bl
.length();
993 op
.indata
.claim_append(bl
);
994 set_handler(CB_ObjectOperation_decodevals(0, out_set
, nullptr, nullptr,
999 void omap_cmp(const std::map
<std::string
, pair
<ceph::buffer::list
,int> > &assertions
,
1002 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAP_CMP
);
1003 ceph::buffer::list bl
;
1004 encode(assertions
, bl
);
1005 op
.op
.extent
.offset
= 0;
1006 op
.op
.extent
.length
= bl
.length();
1007 op
.indata
.claim_append(bl
);
1009 unsigned p
= ops
.size() - 1;
1010 out_rval
[p
] = prval
;
1014 void omap_cmp(const boost::container::flat_map
<
1015 std::string
, pair
<ceph::buffer::list
, int>>& assertions
,
1016 boost::system::error_code
*ec
) {
1017 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAP_CMP
);
1018 ceph::buffer::list bl
;
1019 encode(assertions
, bl
);
1020 op
.op
.extent
.offset
= 0;
1021 op
.op
.extent
.length
= bl
.length();
1022 op
.indata
.claim_append(bl
);
1026 struct C_ObjectOperation_copyget
: public Context
{
1027 ceph::buffer::list bl
;
1028 object_copy_cursor_t
*cursor
;
1030 ceph::real_time
*out_mtime
;
1031 std::map
<std::string
,ceph::buffer::list
> *out_attrs
;
1032 ceph::buffer::list
*out_data
, *out_omap_header
, *out_omap_data
;
1033 std::vector
<snapid_t
> *out_snaps
;
1034 snapid_t
*out_snap_seq
;
1035 uint32_t *out_flags
;
1036 uint32_t *out_data_digest
;
1037 uint32_t *out_omap_digest
;
1038 mempool::osd_pglog::vector
<std::pair
<osd_reqid_t
, version_t
> > *out_reqids
;
1039 mempool::osd_pglog::map
<uint32_t, int> *out_reqid_return_codes
;
1040 uint64_t *out_truncate_seq
;
1041 uint64_t *out_truncate_size
;
1043 C_ObjectOperation_copyget(object_copy_cursor_t
*c
,
1046 std::map
<std::string
,ceph::buffer::list
> *a
,
1047 ceph::buffer::list
*d
, ceph::buffer::list
*oh
,
1048 ceph::buffer::list
*o
,
1049 std::vector
<snapid_t
> *osnaps
,
1050 snapid_t
*osnap_seq
,
1054 mempool::osd_pglog::vector
<std::pair
<osd_reqid_t
, version_t
> > *oreqids
,
1055 mempool::osd_pglog::map
<uint32_t, int> *oreqid_return_codes
,
1060 out_size(s
), out_mtime(m
),
1061 out_attrs(a
), out_data(d
), out_omap_header(oh
),
1062 out_omap_data(o
), out_snaps(osnaps
), out_snap_seq(osnap_seq
),
1063 out_flags(flags
), out_data_digest(dd
), out_omap_digest(od
),
1064 out_reqids(oreqids
),
1065 out_reqid_return_codes(oreqid_return_codes
),
1066 out_truncate_seq(otseq
),
1067 out_truncate_size(otsize
),
1069 void finish(int r
) override
{
1071 // reqids are copied on ENOENT
1072 if (r
< 0 && r
!= -ENOENT
)
1075 auto p
= bl
.cbegin();
1076 object_copy_data_t copy_reply
;
1077 decode(copy_reply
, p
);
1080 *out_reqids
= copy_reply
.reqids
;
1084 *out_size
= copy_reply
.size
;
1086 *out_mtime
= ceph::real_clock::from_ceph_timespec(copy_reply
.mtime
);
1088 *out_attrs
= copy_reply
.attrs
;
1090 out_data
->claim_append(copy_reply
.data
);
1091 if (out_omap_header
)
1092 out_omap_header
->claim_append(copy_reply
.omap_header
);
1094 *out_omap_data
= copy_reply
.omap_data
;
1096 *out_snaps
= copy_reply
.snaps
;
1098 *out_snap_seq
= copy_reply
.snap_seq
;
1100 *out_flags
= copy_reply
.flags
;
1101 if (out_data_digest
)
1102 *out_data_digest
= copy_reply
.data_digest
;
1103 if (out_omap_digest
)
1104 *out_omap_digest
= copy_reply
.omap_digest
;
1106 *out_reqids
= copy_reply
.reqids
;
1107 if (out_reqid_return_codes
)
1108 *out_reqid_return_codes
= copy_reply
.reqid_return_codes
;
1109 if (out_truncate_seq
)
1110 *out_truncate_seq
= copy_reply
.truncate_seq
;
1111 if (out_truncate_size
)
1112 *out_truncate_size
= copy_reply
.truncate_size
;
1113 *cursor
= copy_reply
.cursor
;
1114 } catch (const ceph::buffer::error
& e
) {
1121 void copy_get(object_copy_cursor_t
*cursor
,
1124 ceph::real_time
*out_mtime
,
1125 std::map
<std::string
,ceph::buffer::list
> *out_attrs
,
1126 ceph::buffer::list
*out_data
,
1127 ceph::buffer::list
*out_omap_header
,
1128 ceph::buffer::list
*out_omap_data
,
1129 std::vector
<snapid_t
> *out_snaps
,
1130 snapid_t
*out_snap_seq
,
1131 uint32_t *out_flags
,
1132 uint32_t *out_data_digest
,
1133 uint32_t *out_omap_digest
,
1134 mempool::osd_pglog::vector
<std::pair
<osd_reqid_t
, version_t
> > *out_reqids
,
1135 mempool::osd_pglog::map
<uint32_t, int> *out_reqid_return_codes
,
1136 uint64_t *truncate_seq
,
1137 uint64_t *truncate_size
,
1140 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_GET
);
1141 osd_op
.op
.copy_get
.max
= max
;
1142 encode(*cursor
, osd_op
.indata
);
1143 encode(max
, osd_op
.indata
);
1144 unsigned p
= ops
.size() - 1;
1145 out_rval
[p
] = prval
;
1146 C_ObjectOperation_copyget
*h
=
1147 new C_ObjectOperation_copyget(cursor
, out_size
, out_mtime
,
1148 out_attrs
, out_data
, out_omap_header
,
1149 out_omap_data
, out_snaps
, out_snap_seq
,
1150 out_flags
, out_data_digest
,
1151 out_omap_digest
, out_reqids
,
1152 out_reqid_return_codes
, truncate_seq
,
1153 truncate_size
, prval
);
1159 add_op(CEPH_OSD_OP_UNDIRTY
);
1162 struct C_ObjectOperation_isdirty
: public Context
{
1163 ceph::buffer::list bl
;
1166 C_ObjectOperation_isdirty(bool *p
, int *r
)
1167 : pisdirty(p
), prval(r
) {}
1168 void finish(int r
) override
{
1173 auto p
= bl
.cbegin();
1177 *pisdirty
= isdirty
;
1178 } catch (const ceph::buffer::error
& e
) {
1185 void is_dirty(bool *pisdirty
, int *prval
) {
1186 add_op(CEPH_OSD_OP_ISDIRTY
);
1187 unsigned p
= ops
.size() - 1;
1188 out_rval
[p
] = prval
;
1189 C_ObjectOperation_isdirty
*h
=
1190 new C_ObjectOperation_isdirty(pisdirty
, prval
);
1195 struct C_ObjectOperation_hit_set_ls
: public Context
{
1196 ceph::buffer::list bl
;
1197 std::list
< std::pair
<time_t, time_t> > *ptls
;
1198 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > *putls
;
1200 C_ObjectOperation_hit_set_ls(std::list
< std::pair
<time_t, time_t> > *t
,
1201 std::list
< std::pair
<ceph::real_time
,
1202 ceph::real_time
> > *ut
,
1204 : ptls(t
), putls(ut
), prval(r
) {}
1205 void finish(int r
) override
{
1210 auto p
= bl
.cbegin();
1211 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > ls
;
1215 for (auto p
= ls
.begin(); p
!= ls
.end(); ++p
)
1216 // round initial timestamp up to the next full second to
1217 // keep this a valid interval.
1219 std::make_pair(ceph::real_clock::to_time_t(
1220 ceph::ceil(p
->first
,
1221 // Sadly, no time literals until C++14.
1222 std::chrono::seconds(1))),
1223 ceph::real_clock::to_time_t(p
->second
)));
1227 } catch (const ceph::buffer::error
& e
) {
1236 * std::list available HitSets.
1238 * We will get back a std::list of time intervals. Note that the most
1239 * recent range may have an empty end timestamp if it is still
1242 * @param pls [out] std::list of time intervals
1243 * @param prval [out] return value
1245 void hit_set_ls(std::list
< std::pair
<time_t, time_t> > *pls
, int *prval
) {
1246 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
1247 unsigned p
= ops
.size() - 1;
1248 out_rval
[p
] = prval
;
1249 C_ObjectOperation_hit_set_ls
*h
=
1250 new C_ObjectOperation_hit_set_ls(pls
, NULL
, prval
);
1254 void hit_set_ls(std::list
<std::pair
<ceph::real_time
, ceph::real_time
> > *pls
,
1256 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
1257 unsigned p
= ops
.size() - 1;
1258 out_rval
[p
] = prval
;
1259 C_ObjectOperation_hit_set_ls
*h
=
1260 new C_ObjectOperation_hit_set_ls(NULL
, pls
, prval
);
1268 * Return an encoded HitSet that includes the provided time
1271 * @param stamp [in] timestamp
1272 * @param pbl [out] target buffer for encoded HitSet
1273 * @param prval [out] return value
// Queue a PG_HITSET_GET op: fetch the encoded HitSet covering `stamp`.
// Per the doc comment above: pbl receives the encoded HitSet, prval the
// per-op return value (recorded in out_rval below).
// NOTE(review): this fragment ends after wiring out_rval; the extraction
// dropped the remaining lines of the method (out_bl hookup / closing brace).
1275 void hit_set_get(ceph::real_time stamp
, ceph::buffer::list
*pbl
, int *prval
) {
1276 OSDOp
& op
= add_op(CEPH_OSD_OP_PG_HITSET_GET
);
// Convert the wall-clock stamp into the wire-format ceph_timespec.
1277 op
.op
.hit_set_get
.stamp
= ceph::real_clock::to_ceph_timespec(stamp
);
// Index of the op just appended; register the caller's rval pointer.
1278 unsigned p
= ops
.size() - 1;
1279 out_rval
[p
] = prval
;
1283 void omap_get_header(ceph::buffer::list
*bl
, int *prval
) {
1284 add_op(CEPH_OSD_OP_OMAPGETHEADER
);
1285 unsigned p
= ops
.size() - 1;
1287 out_rval
[p
] = prval
;
1290 void omap_get_header(boost::system::error_code
* ec
, ceph::buffer::list
*bl
) {
1291 add_op(CEPH_OSD_OP_OMAPGETHEADER
);
1296 void omap_set(const map
<string
, ceph::buffer::list
> &map
) {
1297 ceph::buffer::list bl
;
1299 add_data(CEPH_OSD_OP_OMAPSETVALS
, 0, bl
.length(), bl
);
1302 void omap_set(const boost::container::flat_map
<string
, ceph::buffer::list
>& map
) {
1303 ceph::buffer::list bl
;
1305 add_data(CEPH_OSD_OP_OMAPSETVALS
, 0, bl
.length(), bl
);
// Queue an OMAPSETHEADER op: replace the object's omap header with `bl`.
// The buffer is attached as op payload via add_data (offset 0, full length).
1308 void omap_set_header(ceph::buffer::list
&bl
) {
1309 add_data(CEPH_OSD_OP_OMAPSETHEADER
, 0, bl
.length(), bl
);
1313 add_op(CEPH_OSD_OP_OMAPCLEAR
);
// Queue an OMAPRMKEYS op: remove the given omap keys from the object.
// The key set is encoded into a temporary bufferlist that becomes the
// op payload. A flat_set overload with identical behavior follows.
1316 void omap_rm_keys(const std::set
<std::string
> &to_remove
) {
1318 ceph::buffer::list bl
;
1319 encode(to_remove
, bl
);
1320 add_data(CEPH_OSD_OP_OMAPRMKEYS
, 0, bl
.length(), bl
);
1322 void omap_rm_keys(const boost::container::flat_set
<std::string
>& to_remove
) {
1323 ceph::buffer::list bl
;
1324 encode(to_remove
, bl
);
1325 add_data(CEPH_OSD_OP_OMAPRMKEYS
, 0, bl
.length(), bl
);
// Queue an OMAPRMKEYRANGE op: remove all omap keys in [key_begin, key_end).
// NOTE(review): half-open interval is presumed from the OSD-side semantics —
// confirm against the OSD implementation; this header only encodes both
// bounds into the op payload.
1328 void omap_rm_range(std::string_view key_begin
, std::string_view key_end
) {
1329 ceph::buffer::list bl
;
1331 encode(key_begin
, bl
);
1332 encode(key_end
, bl
);
1333 add_data(CEPH_OSD_OP_OMAPRMKEYRANGE
, 0, bl
.length(), bl
);
1337 void call(const char *cname
, const char *method
, ceph::buffer::list
&indata
) {
1338 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, NULL
, NULL
, NULL
);
1341 void call(const char *cname
, const char *method
, ceph::buffer::list
&indata
,
1342 ceph::buffer::list
*outdata
, Context
*ctx
, int *prval
) {
1343 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, outdata
, ctx
, prval
);
1346 void call(std::string_view cname
, std::string_view method
,
1347 const ceph::buffer::list
& indata
, boost::system::error_code
* ec
) {
1348 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, NULL
, NULL
, NULL
);
1352 void call(std::string_view cname
, std::string_view method
, const ceph::buffer::list
& indata
,
1353 boost::system::error_code
* ec
, ceph::buffer::list
*outdata
) {
1354 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, outdata
, nullptr, nullptr);
1357 void call(std::string_view cname
, std::string_view method
,
1358 const ceph::buffer::list
& indata
,
1359 fu2::unique_function
<void (boost::system::error_code
,
1360 const ceph::buffer::list
&) &&> f
) {
1361 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, std::move(f
));
1363 void call(std::string_view cname
, std::string_view method
,
1364 const ceph::buffer::list
& indata
,
1365 fu2::unique_function
<void (boost::system::error_code
, int,
1366 const ceph::buffer::list
&) &&> f
) {
1367 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, std::move(f
));
// Queue a WATCH op. `cookie` identifies the watch on the client side,
// `op` is the watch sub-operation (e.g. watch/unwatch/reconnect variant),
// and `timeout` (0 = server default) bounds watch validity in seconds.
// NOTE(review): timeout units presumed seconds from Ceph watch semantics —
// not provable from this header alone.
1371 void watch(uint64_t cookie
, __u8 op
, uint32_t timeout
= 0) {
1372 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_WATCH
);
1373 osd_op
.op
.watch
.cookie
= cookie
;
1374 osd_op
.op
.watch
.op
= op
;
1375 osd_op
.op
.watch
.timeout
= timeout
;
1378 void notify(uint64_t cookie
, uint32_t prot_ver
, uint32_t timeout
,
1379 ceph::buffer::list
&bl
, ceph::buffer::list
*inbl
) {
1381 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY
);
1382 osd_op
.op
.notify
.cookie
= cookie
;
1383 encode(prot_ver
, *inbl
);
1384 encode(timeout
, *inbl
);
1386 osd_op
.indata
.append(*inbl
);
1389 void notify_ack(uint64_t notify_id
, uint64_t cookie
,
1390 ceph::buffer::list
& reply_bl
) {
1392 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY_ACK
);
1393 ceph::buffer::list bl
;
1394 encode(notify_id
, bl
);
1396 encode(reply_bl
, bl
);
1397 osd_op
.indata
.append(bl
);
1400 void list_watchers(std::list
<obj_watch_t
> *out
,
1402 add_op(CEPH_OSD_OP_LIST_WATCHERS
);
1404 set_handler(CB_ObjectOperation_decodewatchers(out
, prval
, nullptr));
1405 out_rval
.back() = prval
;
1408 void list_watchers(vector
<neorados::ObjWatcher
>* out
,
1409 boost::system::error_code
* ec
) {
1410 add_op(CEPH_OSD_OP_LIST_WATCHERS
);
1411 set_handler(CB_ObjectOperation_decodewatchersneo(out
, nullptr, ec
));
1415 void list_snaps(librados::snap_set_t
*out
, int *prval
,
1416 boost::system::error_code
* ec
= nullptr) {
1417 add_op(CEPH_OSD_OP_LIST_SNAPS
);
1418 if (prval
|| out
|| ec
) {
1419 set_handler(CB_ObjectOperation_decodesnaps(out
, nullptr, prval
, ec
));
1420 out_rval
.back() = prval
;
1425 void list_snaps(neorados::SnapSet
*out
, int *prval
,
1426 boost::system::error_code
* ec
= nullptr) {
1427 add_op(CEPH_OSD_OP_LIST_SNAPS
);
1428 if (prval
|| out
|| ec
) {
1429 set_handler(CB_ObjectOperation_decodesnaps(nullptr, out
, prval
, ec
));
1430 out_rval
.back() = prval
;
// Queue an ASSERT_VER op: the OSD will fail the transaction unless the
// object's user version matches `ver`.
1435 void assert_version(uint64_t ver
) {
1436 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ASSERT_VER
);
1437 osd_op
.op
.assert_ver
.ver
= ver
;
1440 void cmpxattr(const char *name
, const ceph::buffer::list
& val
,
1442 add_xattr(CEPH_OSD_OP_CMPXATTR
, name
, val
);
1443 OSDOp
& o
= *ops
.rbegin();
1444 o
.op
.xattr
.cmp_op
= op
;
1445 o
.op
.xattr
.cmp_mode
= mode
;
// Queue a ROLLBACK op: revert the object to its state at snapshot `snapid`.
1448 void rollback(uint64_t snapid
) {
1449 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ROLLBACK
);
1450 osd_op
.op
.snap
.snapid
= snapid
;
// Queue a COPY_FROM op: copy object `src` (at snapshot `snapid`, expected
// version `src_version`, located via `src_oloc`) into the target object of
// this operation. `flags` are copy-from flags; `src_fadvise_flags` are
// fadvise hints applied when reading the source.
// Source identity (object + locator) travels in the op payload; the scalar
// parameters travel in the fixed op header fields set below.
// NOTE(review): copy_from2 below is the newer variant that also carries
// truncate_seq/truncate_size.
1453 void copy_from(object_t src
, snapid_t snapid
, object_locator_t src_oloc
,
1454 version_t src_version
, unsigned flags
,
1455 unsigned src_fadvise_flags
) {
1457 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_FROM
);
1458 osd_op
.op
.copy_from
.snapid
= snapid
;
1459 osd_op
.op
.copy_from
.src_version
= src_version
;
1460 osd_op
.op
.copy_from
.flags
= flags
;
1461 osd_op
.op
.copy_from
.src_fadvise_flags
= src_fadvise_flags
;
1462 encode(src
, osd_op
.indata
);
1463 encode(src_oloc
, osd_op
.indata
);
1465 void copy_from2(object_t src
, snapid_t snapid
, object_locator_t src_oloc
,
1466 version_t src_version
, unsigned flags
,
1467 uint32_t truncate_seq
, uint64_t truncate_size
,
1468 unsigned src_fadvise_flags
) {
1470 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_FROM2
);
1471 osd_op
.op
.copy_from
.snapid
= snapid
;
1472 osd_op
.op
.copy_from
.src_version
= src_version
;
1473 osd_op
.op
.copy_from
.flags
= flags
;
1474 osd_op
.op
.copy_from
.src_fadvise_flags
= src_fadvise_flags
;
1475 encode(src
, osd_op
.indata
);
1476 encode(src_oloc
, osd_op
.indata
);
1477 encode(truncate_seq
, osd_op
.indata
);
1478 encode(truncate_size
, osd_op
.indata
);
1482 * writeback content to backing tier
1484 * If object is marked dirty in the cache tier, write back content
1485 * to backing tier. If the object is clean this is a no-op.
1487 * If writeback races with an update, the update will block.
1489 * use with IGNORE_CACHE to avoid triggering promote.
1491 void cache_flush() {
1492 add_op(CEPH_OSD_OP_CACHE_FLUSH
);
1496 * writeback content to backing tier
1498 * If object is marked dirty in the cache tier, write back content
1499 * to backing tier. If the object is clean this is a no-op.
1501 * If writeback races with an update, return EAGAIN. Requires that
1502 * the SKIPRWLOCKS flag be set.
1504 * use with IGNORE_CACHE to avoid triggering promote.
1506 void cache_try_flush() {
1507 add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH
);
1511 * evict object from cache tier
1513 * If object is marked clean, remove the object from the cache tier.
1514 * Otherwise, return EBUSY.
1516 * use with IGNORE_CACHE to avoid triggering promote.
1518 void cache_evict() {
1519 add_op(CEPH_OSD_OP_CACHE_EVICT
);
// Queue a SET_REDIRECT op: make this object a redirect pointing at `tgt`
// (located via `tgt_oloc`, expected version `tgt_version`, at `snapid`).
// Note: the wire format reuses the copy_from header fields for snapid and
// version, as the code below shows. `flag` is applied as a per-op flag.
1525 void set_redirect(object_t tgt
, snapid_t snapid
, object_locator_t tgt_oloc
,
1526 version_t tgt_version
, int flag
) {
1528 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_SET_REDIRECT
);
1529 osd_op
.op
.copy_from
.snapid
= snapid
;
1530 osd_op
.op
.copy_from
.src_version
= tgt_version
;
1531 encode(tgt
, osd_op
.indata
);
1532 encode(tgt_oloc
, osd_op
.indata
);
1533 set_last_op_flags(flag
);
1536 void set_chunk(uint64_t src_offset
, uint64_t src_length
, object_locator_t tgt_oloc
,
1537 object_t tgt_oid
, uint64_t tgt_offset
, int flag
) {
1539 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_SET_CHUNK
);
1540 encode(src_offset
, osd_op
.indata
);
1541 encode(src_length
, osd_op
.indata
);
1542 encode(tgt_oloc
, osd_op
.indata
);
1543 encode(tgt_oid
, osd_op
.indata
);
1544 encode(tgt_offset
, osd_op
.indata
);
1545 set_last_op_flags(flag
);
1548 void tier_promote() {
1549 add_op(CEPH_OSD_OP_TIER_PROMOTE
);
1552 void unset_manifest() {
1553 add_op(CEPH_OSD_OP_UNSET_MANIFEST
);
1557 add_op(CEPH_OSD_OP_TIER_FLUSH
);
1561 add_op(CEPH_OSD_OP_TIER_EVICT
);
1564 void set_alloc_hint(uint64_t expected_object_size
,
1565 uint64_t expected_write_size
,
1567 add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT
, expected_object_size
,
1568 expected_write_size
, flags
);
1570 // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1571 // not worth a feature bit. Set FAILOK per-op flag to make
1572 // sure older osds don't trip over an unsupported opcode.
1573 set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1576 template<typename V
>
1579 std::copy(sops
.begin(), sops
.end(),
1580 std::back_inserter(ops
));
1581 out_bl
.resize(sops
.size());
1582 out_handler
.resize(sops
.size());
1583 out_rval
.resize(sops
.size());
1584 out_ec
.resize(sops
.size());
1585 for (uint32_t i
= 0; i
< sops
.size(); i
++) {
1586 out_bl
[i
] = &sops
[i
].outdata
;
1587 out_rval
[i
] = &sops
[i
].rval
;
1588 out_ec
[i
] = nullptr;
1593 * Pin/unpin an object in cache tier
1596 add_op(CEPH_OSD_OP_CACHE_PIN
);
1599 void cache_unpin() {
1600 add_op(CEPH_OSD_OP_CACHE_UNPIN
);
1604 inline std::ostream
& operator <<(std::ostream
& m
, const ObjectOperation
& oo
) {
1605 auto i
= oo
.ops
.cbegin();
1607 while (i
!= oo
.ops
.cend()) {
1608 if (i
!= oo
.ops
.cbegin())
1620 class Objecter
: public md_config_obs_t
, public Dispatcher
{
1621 using MOSDOp
= _mosdop::MOSDOp
<osdc_opvec
>;
1623 using OpSignature
= void(boost::system::error_code
);
1624 using OpCompletion
= ceph::async::Completion
<OpSignature
>;
1626 // config observer bits
1627 const char** get_tracked_conf_keys() const override
;
1628 void handle_conf_change(const ConfigProxy
& conf
,
1629 const std::set
<std::string
> &changed
) override
;
1632 Messenger
*messenger
;
1634 boost::asio::io_context
& service
;
1635 // The guaranteed sequenced, one-at-a-time execution and apparently
1636 // people sometimes depend on this.
1637 boost::asio::io_context::strand finish_strand
{service
};
1638 ZTracer::Endpoint trace_endpoint
{"0.0.0.0", 0, "Objecter"};
1640 std::unique_ptr
<OSDMap
> osdmap
{std::make_unique
<OSDMap
>()};
1642 using Dispatcher::cct
;
1643 std::multimap
<std::string
,std::string
> crush_location
;
1645 std::atomic
<bool> initialized
{false};
1648 std::atomic
<uint64_t> last_tid
{0};
1649 std::atomic
<unsigned> inflight_ops
{0};
1650 std::atomic
<int> client_inc
{-1};
1651 uint64_t max_linger_id
{0};
1652 std::atomic
<unsigned> num_in_flight
{0};
1653 std::atomic
<int> global_op_flags
{0}; // flags which are applied to each IO op
1654 bool keep_balanced_budget
= false;
1655 bool honor_pool_full
= true;
1657 // If this is true, accumulate a set of blocklisted entities
1658 // to be drained by consume_blocklist_events.
1659 bool blocklist_events_enabled
= false;
1660 std::set
<entity_addr_t
> blocklist_events
;
1661 struct pg_mapping_t
{
1663 std::vector
<int> up
;
1664 int up_primary
= -1;
1665 std::vector
<int> acting
;
1666 int acting_primary
= -1;
1669 pg_mapping_t(epoch_t epoch
, std::vector
<int> up
, int up_primary
,
1670 std::vector
<int> acting
, int acting_primary
)
1671 : epoch(epoch
), up(up
), up_primary(up_primary
),
1672 acting(acting
), acting_primary(acting_primary
) {}
1674 ceph::shared_mutex pg_mapping_lock
=
1675 ceph::make_shared_mutex("Objecter::pg_mapping_lock");
1676 // pool -> pg mapping
1677 std::map
<int64_t, std::vector
<pg_mapping_t
>> pg_mappings
;
1679 // convenient accessors
1680 bool lookup_pg_mapping(const pg_t
& pg
, pg_mapping_t
* pg_mapping
) {
1681 std::shared_lock l
{pg_mapping_lock
};
1682 auto it
= pg_mappings
.find(pg
.pool());
1683 if (it
== pg_mappings
.end())
1685 auto& mapping_array
= it
->second
;
1686 if (pg
.ps() >= mapping_array
.size())
1688 if (mapping_array
[pg
.ps()].epoch
!= pg_mapping
->epoch
) // stale
1690 *pg_mapping
= mapping_array
[pg
.ps()];
// Store (move) a freshly computed mapping for `pg` into the per-pool
// mapping cache. Takes pg_mapping_lock exclusively (writer side of the
// shared mutex used by lookup_pg_mapping above).
// Precondition (asserted): the pool's mapping array was already sized to
// cover pg.ps() — prune_pg_mapping resizes arrays to the pool's pg_num.
1693 void update_pg_mapping(const pg_t
& pg
, pg_mapping_t
&& pg_mapping
) {
1694 std::lock_guard l
{pg_mapping_lock
};
1695 auto& mapping_array
= pg_mappings
[pg
.pool()];
1696 ceph_assert(pg
.ps() < mapping_array
.size());
1697 mapping_array
[pg
.ps()] = std::move(pg_mapping
);
1699 void prune_pg_mapping(const mempool::osdmap::map
<int64_t,pg_pool_t
>& pools
) {
1700 std::lock_guard l
{pg_mapping_lock
};
1701 for (auto& pool
: pools
) {
1702 auto& mapping_array
= pg_mappings
[pool
.first
];
1703 size_t pg_num
= pool
.second
.get_pg_num();
1704 if (mapping_array
.size() != pg_num
) {
1705 // catch both pg_num increasing & decreasing
1706 mapping_array
.resize(pg_num
);
1709 for (auto it
= pg_mappings
.begin(); it
!= pg_mappings
.end(); ) {
1710 if (!pools
.count(it
->first
)) {
1712 pg_mappings
.erase(it
++);
1720 void maybe_request_map();
1722 void enable_blocklist_events();
1725 void _maybe_request_map();
1727 version_t last_seen_osdmap_version
= 0;
1728 version_t last_seen_pgmap_version
= 0;
1730 mutable ceph::shared_mutex rwlock
=
1731 ceph::make_shared_mutex("Objecter::rwlock");
1732 ceph::timer
<ceph::coarse_mono_clock
> timer
;
1734 PerfCounters
* logger
= nullptr;
1736 uint64_t tick_event
= 0;
1740 void update_crush_location();
1742 class RequestStateHook
;
1744 RequestStateHook
*m_request_state_hook
= nullptr;
1747 /*** track pending operations ***/
1752 struct op_target_t
{
1755 epoch_t epoch
= 0; ///< latest epoch we calculated the mapping
1758 object_locator_t base_oloc
;
1759 object_t target_oid
;
1760 object_locator_t target_oloc
;
1762 ///< true if we are directed at base_pgid, not base_oid
1763 bool precalc_pgid
= false;
1765 ///< true if we have ever mapped to a valid pool
1766 bool pool_ever_existed
= false;
1768 ///< explcit pg target, if any
1771 pg_t pgid
; ///< last (raw) pg we mapped to
1772 spg_t actual_pgid
; ///< last (actual) spg_t we mapped to
1773 unsigned pg_num
= 0; ///< last pg_num we mapped to
1774 unsigned pg_num_mask
= 0; ///< last pg_num_mask we mapped to
1775 unsigned pg_num_pending
= 0; ///< last pg_num we mapped to
1776 std::vector
<int> up
; ///< set of up osds for last pg we mapped to
1777 std::vector
<int> acting
; ///< set of acting osds for last pg we mapped to
1778 int up_primary
= -1; ///< last up_primary we mapped to
1779 int acting_primary
= -1; ///< last acting_primary we mapped to
1780 int size
= -1; ///< the size of the pool when were were last mapped
1781 int min_size
= -1; ///< the min size of the pool when were were last mapped
1782 bool sort_bitwise
= false; ///< whether the hobject_t sort order is bitwise
1783 bool recovery_deletes
= false; ///< whether the deletes are performed during recovery instead of peering
1784 uint32_t peering_crush_bucket_count
= 0;
1785 uint32_t peering_crush_bucket_target
= 0;
1786 uint32_t peering_crush_bucket_barrier
= 0;
1787 int32_t peering_crush_mandatory_member
= CRUSH_ITEM_NONE
;
1789 bool used_replica
= false;
1790 bool paused
= false;
1792 int osd
= -1; ///< the final target osd, or -1
1794 epoch_t last_force_resend
= 0;
1796 op_target_t(object_t oid
, object_locator_t oloc
, int flags
)
1802 explicit op_target_t(pg_t pgid
)
1803 : base_oloc(pgid
.pool(), pgid
.ps()),
1808 op_target_t() = default;
1810 hobject_t
get_hobj() {
1811 return hobject_t(target_oid
,
1814 target_oloc
.hash
>= 0 ? target_oloc
.hash
: pgid
.ps(),
1816 target_oloc
.nspace
);
1819 bool contained_by(const hobject_t
& begin
, const hobject_t
& end
) {
1820 hobject_t h
= get_hobj();
1821 int r
= cmp(h
, begin
);
1822 return r
== 0 || (r
> 0 && h
< end
);
1825 bool respects_full() const {
1827 (flags
& (CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_RWORDERED
)) &&
1828 !(flags
& (CEPH_OSD_FLAG_FULL_TRY
| CEPH_OSD_FLAG_FULL_FORCE
));
1831 void dump(ceph::Formatter
*f
) const;
1834 std::unique_ptr
<ceph::async::Completion
<void(boost::system::error_code
)>>
1835 OpContextVert(Context
* c
) {
1837 return ceph::async::Completion
<void(boost::system::error_code
)>::create(
1838 service
.get_executor(),
1839 [c
= std::unique_ptr
<Context
>(c
)]
1840 (boost::system::error_code e
) mutable {
1841 c
.release()->complete(e
);
1847 template<typename T
>
1848 std::unique_ptr
<ceph::async::Completion
<void(boost::system::error_code
, T
)>>
1849 OpContextVert(Context
* c
, T
* p
) {
1853 ceph::async::Completion
<void(boost::system::error_code
, T
)>::create(
1854 service
.get_executor(),
1855 [c
= std::unique_ptr
<Context
>(c
), p
]
1856 (boost::system::error_code e
, T r
) mutable {
1860 c
.release()->complete(ceph::from_error_code(e
));
1866 template<typename T
>
1867 std::unique_ptr
<ceph::async::Completion
<void(boost::system::error_code
, T
)>>
1868 OpContextVert(Context
* c
, T
& p
) {
1870 return ceph::async::Completion
<
1871 void(boost::system::error_code
, T
)>::create(
1872 service
.get_executor(),
1873 [c
= std::unique_ptr
<Context
>(c
), &p
]
1874 (boost::system::error_code e
, T r
) mutable {
1877 c
.release()->complete(ceph::from_error_code(e
));
1883 struct Op
: public RefCountedObject
{
1884 OSDSession
*session
= nullptr;
1885 int incarnation
= 0;
1889 ConnectionRef con
= nullptr; // for rx buffer only
1890 uint64_t features
= CEPH_FEATURES_SUPPORTED_DEFAULT
; // explicitly specified op features
1894 snapid_t snapid
= CEPH_NOSNAP
;
1896 ceph::real_time mtime
;
1898 ceph::buffer::list
*outbl
= nullptr;
1899 boost::container::small_vector
<ceph::buffer::list
*, osdc_opvec_len
> out_bl
;
1900 boost::container::small_vector
<
1901 fu2::unique_function
<void(boost::system::error_code
, int,
1902 const ceph::buffer::list
& bl
) &&>,
1903 osdc_opvec_len
> out_handler
;
1904 boost::container::small_vector
<int*, osdc_opvec_len
> out_rval
;
1905 boost::container::small_vector
<boost::system::error_code
*,
1906 osdc_opvec_len
> out_ec
;
1909 using OpSig
= void(boost::system::error_code
);
1910 using OpComp
= ceph::async::Completion
<OpSig
>;
1911 // Due to an irregularity of cmpxattr, we actualy need the 'int'
1912 // value for onfinish for legacy librados users. As such just
1913 // preserve the Context* in this one case. That way we can have
1914 // our callers just pass in a unique_ptr<OpComp> and not deal with
1915 // our signature in Objecter being different than the exposed
1916 // signature in RADOS.
1918 // Add a function for the linger case, where we want better
1919 // semantics than Context, but still need to be under the completion_lock.
1920 std::variant
<std::unique_ptr
<OpComp
>, fu2::unique_function
<OpSig
>,
1922 uint64_t ontimeout
= 0;
1928 epoch_t
*reply_epoch
= nullptr;
1930 ceph::coarse_mono_time stamp
;
1932 epoch_t map_dne_bound
= 0;
1936 /// true if we should resend this message on failure
1937 bool should_resend
= true;
1939 /// true if the throttle budget is get/put on a series of OPs,
1940 /// instead of per OP basis, when this flag is set, the budget is
1941 /// acquired before sending the very first OP of the series and
1942 /// released upon receiving the last OP reply.
1943 bool ctx_budgeted
= false;
1947 osd_reqid_t reqid
; // explicitly setting reqid
1948 ZTracer::Trace trace
;
1950 static bool has_completion(decltype(onfinish
)& f
) {
1951 return std::visit([](auto&& arg
) { return bool(arg
);}, f
);
1953 bool has_completion() {
1954 return has_completion(onfinish
);
1957 static void complete(decltype(onfinish
)&& f
, boost::system::error_code ec
,
1959 std::visit([ec
, r
](auto&& arg
) {
1960 if constexpr (std::is_same_v
<std::decay_t
<decltype(arg
)>,
1963 } else if constexpr (std::is_same_v
<std::decay_t
<decltype(arg
)>,
1964 fu2::unique_function
<OpSig
>>) {
1967 arg
->defer(std::move(arg
), ec
);
1971 void complete(boost::system::error_code ec
, int r
) {
1972 complete(std::move(onfinish
), ec
, r
);
1975 Op(const object_t
& o
, const object_locator_t
& ol
, osdc_opvec
&& _ops
,
1976 int f
, std::unique_ptr
<OpComp
>&& fin
,
1977 version_t
*ov
, int *offset
= nullptr,
1978 ZTracer::Trace
*parent_trace
= nullptr) :
1980 ops(std::move(_ops
)),
1981 out_bl(ops
.size(), nullptr),
1982 out_handler(ops
.size()),
1983 out_rval(ops
.size(), nullptr),
1984 out_ec(ops
.size(), nullptr),
1985 onfinish(std::move(fin
)),
1987 data_offset(offset
) {
1988 if (target
.base_oloc
.key
== o
)
1989 target
.base_oloc
.key
.clear();
1990 if (parent_trace
&& parent_trace
->valid()) {
1991 trace
.init("op", nullptr, parent_trace
);
1992 trace
.event("start");
1996 Op(const object_t
& o
, const object_locator_t
& ol
, osdc_opvec
&& _ops
,
1997 int f
, Context
* fin
, version_t
*ov
, int *offset
= nullptr,
1998 ZTracer::Trace
*parent_trace
= nullptr) :
2000 ops(std::move(_ops
)),
2001 out_bl(ops
.size(), nullptr),
2002 out_handler(ops
.size()),
2003 out_rval(ops
.size(), nullptr),
2004 out_ec(ops
.size(), nullptr),
2007 data_offset(offset
) {
2008 if (target
.base_oloc
.key
== o
)
2009 target
.base_oloc
.key
.clear();
2010 if (parent_trace
&& parent_trace
->valid()) {
2011 trace
.init("op", nullptr, parent_trace
);
2012 trace
.event("start");
2016 Op(const object_t
& o
, const object_locator_t
& ol
, osdc_opvec
&& _ops
,
2017 int f
, fu2::unique_function
<OpSig
>&& fin
, version_t
*ov
, int *offset
= nullptr,
2018 ZTracer::Trace
*parent_trace
= nullptr) :
2020 ops(std::move(_ops
)),
2021 out_bl(ops
.size(), nullptr),
2022 out_handler(ops
.size()),
2023 out_rval(ops
.size(), nullptr),
2024 out_ec(ops
.size(), nullptr),
2025 onfinish(std::move(fin
)),
2027 data_offset(offset
) {
2028 if (target
.base_oloc
.key
== o
)
2029 target
.base_oloc
.key
.clear();
2030 if (parent_trace
&& parent_trace
->valid()) {
2031 trace
.init("op", nullptr, parent_trace
);
2032 trace
.event("start");
2036 bool operator<(const Op
& other
) const {
2037 return tid
< other
.tid
;
2042 trace
.event("finish");
2046 struct CB_Op_Map_Latest
{
2049 CB_Op_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
) {}
2050 void operator()(boost::system::error_code err
, version_t latest
, version_t
);
2053 struct CB_Command_Map_Latest
{
2056 CB_Command_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
) {}
2057 void operator()(boost::system::error_code err
, version_t latest
, version_t
);
2060 struct C_Stat
: public Context
{
2061 ceph::buffer::list bl
;
2063 ceph::real_time
*pmtime
;
2065 C_Stat(uint64_t *ps
, ceph::real_time
*pm
, Context
*c
) :
2066 psize(ps
), pmtime(pm
), fin(c
) {}
2067 void finish(int r
) override
{
2070 auto p
= bl
.cbegin();
2084 struct C_GetAttrs
: public Context
{
2085 ceph::buffer::list bl
;
2086 std::map
<std::string
,ceph::buffer::list
>& attrset
;
2088 C_GetAttrs(std::map
<std::string
, ceph::buffer::list
>& set
, Context
*c
) : attrset(set
),
2090 void finish(int r
) override
{
2093 auto p
= bl
.cbegin();
2101 // Pools and statistics
2102 struct NListContext
{
2103 collection_list_handle_t pos
;
2105 // these are for !sortbitwise compat only
2107 int starting_pg_num
= 0;
2108 bool sort_bitwise
= false;
2110 bool at_end_of_pool
= false; ///< publicly visible end flag
2112 int64_t pool_id
= -1;
2113 int pool_snap_seq
= 0;
2114 uint64_t max_entries
= 0;
2117 ceph::buffer::list bl
; // raw data read to here
2118 std::list
<librados::ListObjectImpl
> list
;
2120 ceph::buffer::list filter
;
2122 // The budget associated with this context, once it is set (>= 0),
2123 // the budget is not get/released on OP basis, instead the budget
2124 // is acquired before sending the first OP and released upon receiving
2125 // the last op reply.
2126 int ctx_budget
= -1;
2128 bool at_end() const {
2129 return at_end_of_pool
;
2132 uint32_t get_pg_hash_position() const {
2133 return pos
.get_hash();
2137 struct C_NList
: public Context
{
2138 NListContext
*list_context
;
2139 Context
*final_finish
;
2142 C_NList(NListContext
*lc
, Context
* finish
, Objecter
*ob
) :
2143 list_context(lc
), final_finish(finish
), objecter(ob
), epoch(0) {}
2144 void finish(int r
) override
{
2146 objecter
->_nlist_reply(list_context
, r
, final_finish
, epoch
);
2148 final_finish
->complete(r
);
2155 std::vector
<std::string
> pools
;
2156 using OpSig
= void(boost::system::error_code
,
2157 boost::container::flat_map
<std::string
, pool_stat_t
>,
2159 using OpComp
= ceph::async::Completion
<OpSig
>;
2160 std::unique_ptr
<OpComp
> onfinish
;
2161 std::uint64_t ontimeout
;
2162 ceph::coarse_mono_time last_submit
;
2167 boost::optional
<int64_t> data_pool
;
2168 using OpSig
= void(boost::system::error_code
,
2169 const struct ceph_statfs
);
2170 using OpComp
= ceph::async::Completion
<OpSig
>;
2172 std::unique_ptr
<OpComp
> onfinish
;
2175 ceph::coarse_mono_time last_submit
;
2182 using OpSig
= void(boost::system::error_code
, ceph::buffer::list
);
2183 using OpComp
= ceph::async::Completion
<OpSig
>;
2184 std::unique_ptr
<OpComp
> onfinish
;
2185 uint64_t ontimeout
= 0;
2187 int16_t crush_rule
= 0;
2188 snapid_t snapid
= 0;
2189 ceph::coarse_mono_time last_submit
;
2194 // -- osd commands --
2195 struct CommandOp
: public RefCountedObject
{
2196 OSDSession
*session
= nullptr;
2198 std::vector
<std::string
> cmd
;
2199 ceph::buffer::list inbl
;
2201 // target_osd == -1 means target_pg is valid
2202 const int target_osd
= -1;
2203 const pg_t target_pg
;
2207 epoch_t map_dne_bound
= 0;
2208 int map_check_error
= 0; // error to return if std::map check fails
2209 const char *map_check_error_str
= nullptr;
2211 using OpSig
= void(boost::system::error_code
, std::string
,
2212 ceph::buffer::list
);
2213 using OpComp
= ceph::async::Completion
<OpSig
>;
2214 std::unique_ptr
<OpComp
> onfinish
;
2216 uint64_t ontimeout
= 0;
2217 ceph::coarse_mono_time last_submit
;
2221 std::vector
<string
>&& cmd
,
2222 ceph::buffer::list
&& inbl
,
2223 decltype(onfinish
)&& onfinish
)
2224 : cmd(std::move(cmd
)),
2225 inbl(std::move(inbl
)),
2226 target_osd(target_osd
),
2227 onfinish(std::move(onfinish
)) {}
2231 std::vector
<string
>&& cmd
,
2232 ceph::buffer::list
&& inbl
,
2233 decltype(onfinish
)&& onfinish
)
2234 : cmd(std::move(cmd
)),
2235 inbl(std::move(inbl
)),
2238 onfinish(std::move(onfinish
)) {}
2241 void submit_command(CommandOp
*c
, ceph_tid_t
*ptid
);
2242 int _calc_command_target(CommandOp
*c
,
2243 ceph::shunique_lock
<ceph::shared_mutex
> &sul
);
2244 void _assign_command_session(CommandOp
*c
,
2245 ceph::shunique_lock
<ceph::shared_mutex
> &sul
);
2246 void _send_command(CommandOp
*c
);
2247 int command_op_cancel(OSDSession
*s
, ceph_tid_t tid
,
2248 boost::system::error_code ec
);
2249 void _finish_command(CommandOp
*c
, boost::system::error_code ec
,
2250 std::string
&& rs
, ceph::buffer::list
&& bl
);
2251 void handle_command_reply(MCommandReply
*m
);
2253 // -- lingering ops --
2255 struct LingerOp
: public RefCountedObject
{
2257 uint64_t linger_id
{0};
2258 op_target_t target
{object_t(), object_locator_t(), 0};
2259 snapid_t snap
{CEPH_NOSNAP
};
2261 ceph::real_time mtime
;
2264 ceph::buffer::list inbl
;
2265 version_t
*pobjver
{nullptr};
2267 bool is_watch
{false};
2268 ceph::coarse_mono_time watch_valid_thru
; ///< send time for last acked ping
2269 boost::system::error_code last_error
; ///< error from last failed ping|reconnect, if any
2270 ceph::shared_mutex watch_lock
;
2272 // queue of pending async operations, with the timestamp of
2273 // when they were queued.
2274 std::list
<ceph::coarse_mono_time
> watch_pending_async
;
2276 uint32_t register_gen
{0};
2277 bool registered
{false};
2278 bool canceled
{false};
2279 using OpSig
= void(boost::system::error_code
, ceph::buffer::list
);
2280 using OpComp
= ceph::async::Completion
<OpSig
>;
2281 std::unique_ptr
<OpComp
> on_reg_commit
;
2282 std::unique_ptr
<OpComp
> on_notify_finish
;
2283 uint64_t notify_id
{0};
2285 fu2::unique_function
<void(boost::system::error_code
,
2288 uint64_t notifier_id
,
2289 ceph::buffer::list
&& bl
)> handle
;
2290 OSDSession
*session
{nullptr};
2293 ceph_tid_t register_tid
{0};
2294 ceph_tid_t ping_tid
{0};
2295 epoch_t map_dne_bound
{0};
2297 void _queued_async() {
2298 // watch_lock ust be locked unique
2299 watch_pending_async
.push_back(ceph::coarse_mono_clock::now());
2301 void finished_async() {
2302 unique_lock
l(watch_lock
);
2303 ceph_assert(!watch_pending_async
.empty());
2304 watch_pending_async
.pop_front();
2307 LingerOp(Objecter
*o
, uint64_t linger_id
);
2308 const LingerOp
& operator=(const LingerOp
& r
) = delete;
2309 LingerOp(const LingerOp
& o
) = delete;
2311 uint64_t get_cookie() {
2312 return reinterpret_cast<uint64_t>(this);
2316 struct CB_Linger_Commit
{
2318 boost::intrusive_ptr
<LingerOp
> info
;
2319 ceph::buffer::list outbl
; // used for notify only
2320 CB_Linger_Commit(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {}
2321 ~CB_Linger_Commit() = default;
2323 void operator()(boost::system::error_code ec
) {
2324 objecter
->_linger_commit(info
.get(), ec
, outbl
);
2328 struct CB_Linger_Reconnect
{
2330 boost::intrusive_ptr
<LingerOp
> info
;
2331 CB_Linger_Reconnect(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {}
2332 ~CB_Linger_Reconnect() = default;
2334 void operator()(boost::system::error_code ec
) {
2335 objecter
->_linger_reconnect(info
.get(), ec
);
2340 struct CB_Linger_Ping
{
2342 boost::intrusive_ptr
<LingerOp
> info
;
2343 ceph::coarse_mono_time sent
;
2344 uint32_t register_gen
;
2345 CB_Linger_Ping(Objecter
*o
, LingerOp
*l
, ceph::coarse_mono_time s
)
2346 : objecter(o
), info(l
), sent(s
), register_gen(info
->register_gen
) {}
2347 void operator()(boost::system::error_code ec
) {
2348 objecter
->_linger_ping(info
.get(), ec
, sent
, register_gen
);
2353 struct CB_Linger_Map_Latest
{
2356 CB_Linger_Map_Latest(Objecter
*o
, uint64_t id
) : objecter(o
), linger_id(id
) {}
2357 void operator()(boost::system::error_code err
, version_t latest
, version_t
);
2360 // -- osd sessions --
2364 hobject_t begin
, end
;
2367 struct OSDSession
: public RefCountedObject
{
2369 std::map
<ceph_tid_t
,Op
*> ops
;
2370 std::map
<uint64_t, LingerOp
*> linger_ops
;
2371 std::map
<ceph_tid_t
,CommandOp
*> command_ops
;
2374 std::map
<spg_t
,std::map
<hobject_t
,OSDBackoff
>> backoffs
;
2375 std::map
<uint64_t,OSDBackoff
*> backoffs_by_id
;
2378 // NB locking two sessions at the same time is only safe because
2379 // it is only done in _recalc_linger_op_target with s and
2380 // linger_op->session, and it holds rwlock for write. We disable
2381 // lockdep (using std::shared_mutex) because lockdep doesn't know
2383 std::shared_mutex lock
;
2388 std::unique_ptr
<std::mutex
[]> completion_locks
;
2390 OSDSession(CephContext
*cct
, int o
) :
2391 osd(o
), incarnation(0), con(NULL
),
2392 num_locks(cct
->_conf
->objecter_completion_locks_per_session
),
2393 completion_locks(new std::mutex
[num_locks
]) {}
2395 ~OSDSession() override
;
2397 bool is_homeless() { return (osd
== -1); }
2399 std::unique_lock
<std::mutex
> get_lock(object_t
& oid
);
2401 std::map
<int,OSDSession
*> osd_sessions
;
2403 bool osdmap_full_flag() const;
2404 bool osdmap_pool_full(const int64_t pool_id
) const;
2410 * Test pg_pool_t::FLAG_FULL on a pool
2412 * @return true if the pool exists and has the flag set, or
2413 * the global full flag is set, else false
2415 bool _osdmap_pool_full(const int64_t pool_id
) const;
2416 bool _osdmap_pool_full(const pg_pool_t
&p
) const {
2417 return p
.has_flag(pg_pool_t::FLAG_FULL
) && honor_pool_full
;
2419 void update_pool_full_map(std::map
<int64_t, bool>& pool_full_map
);
2421 std::map
<uint64_t, LingerOp
*> linger_ops
;
2422 // we use this just to confirm a cookie is valid before dereferencing the ptr
2423 std::set
<LingerOp
*> linger_ops_set
;
2425 std::map
<ceph_tid_t
,PoolStatOp
*> poolstat_ops
;
2426 std::map
<ceph_tid_t
,StatfsOp
*> statfs_ops
;
2427 std::map
<ceph_tid_t
,PoolOp
*> pool_ops
;
2428 std::atomic
<unsigned> num_homeless_ops
{0};
2430 OSDSession
* homeless_session
= new OSDSession(cct
, -1);
2433 // ops waiting for an osdmap with a new pool or confirmation that
2434 // the pool does not exist (may be expanded to other uses later)
2435 std::map
<uint64_t, LingerOp
*> check_latest_map_lingers
;
2436 std::map
<ceph_tid_t
, Op
*> check_latest_map_ops
;
2437 std::map
<ceph_tid_t
, CommandOp
*> check_latest_map_commands
;
2440 std::vector
<std::pair
<std::unique_ptr
<OpCompletion
>,
2441 boost::system::error_code
>>> waiting_for_map
;
2443 ceph::timespan mon_timeout
;
2444 ceph::timespan osd_timeout
;
2446 MOSDOp
*_prepare_osd_op(Op
*op
);
2447 void _send_op(Op
*op
);
2448 void _send_op_account(Op
*op
);
2449 void _cancel_linger_op(Op
*op
);
2450 void _finish_op(Op
*op
, int r
);
2451 static bool is_pg_changed(
2453 const std::vector
<int>& oldacting
,
2455 const std::vector
<int>& newacting
,
2456 bool any_change
=false);
2457 enum recalc_op_target_result
{
2458 RECALC_OP_TARGET_NO_ACTION
= 0,
2459 RECALC_OP_TARGET_NEED_RESEND
,
2460 RECALC_OP_TARGET_POOL_DNE
,
2461 RECALC_OP_TARGET_OSD_DNE
,
2462 RECALC_OP_TARGET_OSD_DOWN
,
2464 bool _osdmap_full_flag() const;
2465 bool _osdmap_has_pool_full() const;
2467 const mempool::osdmap::map
<int64_t, snap_interval_set_t
>& new_removed_snaps
,
2470 bool target_should_be_paused(op_target_t
*op
);
2471 int _calc_target(op_target_t
*t
, Connection
*con
,
2472 bool any_change
= false);
2473 int _map_session(op_target_t
*op
, OSDSession
**s
,
2474 ceph::shunique_lock
<ceph::shared_mutex
>& lc
);
2476 void _session_op_assign(OSDSession
*s
, Op
*op
);
2477 void _session_op_remove(OSDSession
*s
, Op
*op
);
2478 void _session_linger_op_assign(OSDSession
*to
, LingerOp
*op
);
2479 void _session_linger_op_remove(OSDSession
*from
, LingerOp
*op
);
2480 void _session_command_op_assign(OSDSession
*to
, CommandOp
*op
);
2481 void _session_command_op_remove(OSDSession
*from
, CommandOp
*op
);
2483 int _assign_op_target_session(Op
*op
, ceph::shunique_lock
<ceph::shared_mutex
>& lc
,
2484 bool src_session_locked
,
2485 bool dst_session_locked
);
2486 int _recalc_linger_op_target(LingerOp
*op
,
2487 ceph::shunique_lock
<ceph::shared_mutex
>& lc
);
2489 void _linger_submit(LingerOp
*info
,
2490 ceph::shunique_lock
<ceph::shared_mutex
>& sul
);
2491 void _send_linger(LingerOp
*info
,
2492 ceph::shunique_lock
<ceph::shared_mutex
>& sul
);
2493 void _linger_commit(LingerOp
*info
, boost::system::error_code ec
,
2494 ceph::buffer::list
& outbl
);
2495 void _linger_reconnect(LingerOp
*info
, boost::system::error_code ec
);
2496 void _send_linger_ping(LingerOp
*info
);
2497 void _linger_ping(LingerOp
*info
, boost::system::error_code ec
,
2498 ceph::coarse_mono_time sent
, uint32_t register_gen
);
2499 boost::system::error_code
_normalize_watch_error(boost::system::error_code ec
);
2501 friend class CB_Objecter_GetVersion
;
2502 friend class CB_DoWatchError
;
2504 template<typename CT
>
2505 auto linger_callback_flush(CT
&& ct
) {
2506 boost::asio::async_completion
<CT
, void(void)> init(ct
);
2507 boost::asio::defer(finish_strand
, std::move(init
.completion_handler
));
2508 return init
.result
.get();
2512 void _check_op_pool_dne(Op
*op
, std::unique_lock
<std::shared_mutex
> *sl
);
2513 void _send_op_map_check(Op
*op
);
2514 void _op_cancel_map_check(Op
*op
);
2515 void _check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
);
2516 void _send_linger_map_check(LingerOp
*op
);
2517 void _linger_cancel_map_check(LingerOp
*op
);
2518 void _check_command_map_dne(CommandOp
*op
);
2519 void _send_command_map_check(CommandOp
*op
);
2520 void _command_cancel_map_check(CommandOp
*op
);
2522 void _kick_requests(OSDSession
*session
, std::map
<uint64_t, LingerOp
*>& lresend
);
2523 void _linger_ops_resend(std::map
<uint64_t, LingerOp
*>& lresend
,
2524 std::unique_lock
<ceph::shared_mutex
>& ul
);
2526 int _get_session(int osd
, OSDSession
**session
,
2527 ceph::shunique_lock
<ceph::shared_mutex
>& sul
);
2528 void put_session(OSDSession
*s
);
2529 void get_session(OSDSession
*s
);
2530 void _reopen_session(OSDSession
*session
);
2531 void close_session(OSDSession
*session
);
2533 void _nlist_reply(NListContext
*list_context
, int r
, Context
*final_finish
,
2534 epoch_t reply_epoch
);
2536 void resend_mon_ops();
2539 * handle a budget for in-flight ops
2540 * budget is taken whenever an op goes into the ops std::map
2541 * and returned whenever an op is removed from the std::map
2542 * If throttle_op needs to throttle it will unlock client_lock.
2544 int calc_op_budget(const boost::container::small_vector_base
<OSDOp
>& ops
);
2545 void _throttle_op(Op
*op
, ceph::shunique_lock
<ceph::shared_mutex
>& sul
,
2547 int _take_op_budget(Op
*op
, ceph::shunique_lock
<ceph::shared_mutex
>& sul
) {
2548 ceph_assert(sul
&& sul
.mutex() == &rwlock
);
2549 int op_budget
= calc_op_budget(op
->ops
);
2550 if (keep_balanced_budget
) {
2551 _throttle_op(op
, sul
, op_budget
);
2552 } else { // update take_linger_budget to match this!
2553 op_throttle_bytes
.take(op_budget
);
2554 op_throttle_ops
.take(1);
2556 op
->budget
= op_budget
;
2559 int take_linger_budget(LingerOp
*info
);
2560 void put_op_budget_bytes(int op_budget
) {
2561 ceph_assert(op_budget
>= 0);
2562 op_throttle_bytes
.put(op_budget
);
2563 op_throttle_ops
.put(1);
2565 void put_nlist_context_budget(NListContext
*list_context
);
2566 Throttle op_throttle_bytes
{cct
, "objecter_bytes",
2567 static_cast<int64_t>(
2568 cct
->_conf
->objecter_inflight_op_bytes
)};
2569 Throttle op_throttle_ops
{cct
, "objecter_ops",
2570 static_cast<int64_t>(
2571 cct
->_conf
->objecter_inflight_ops
)};
2573 Objecter(CephContext
*cct
, Messenger
*m
, MonClient
*mc
,
2574 boost::asio::io_context
& service
);
2575 ~Objecter() override
;
2578 void start(const OSDMap
*o
= nullptr);
2581 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2582 // whatever functionality you want to use the OSDMap in a lambda like:
2584 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2588 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2590 // Do not call into something that will try to lock the OSDMap from
2591 // here or you will have great woe and misery.
2593 template<typename Callback
, typename
...Args
>
2594 decltype(auto) with_osdmap(Callback
&& cb
, Args
&&... args
) {
2595 shared_lock
l(rwlock
);
2596 return std::forward
<Callback
>(cb
)(*osdmap
, std::forward
<Args
>(args
)...);
2601 * Tell the objecter to throttle outgoing ops according to its
2602 * budget (in _conf). If you do this, ops can block, in
2603 * which case it will unlock client_lock and sleep until
2604 * incoming messages reduce the used budget low enough for
2605 * the ops to continue going; then it will lock client_lock again.
2607 void set_balanced_budget() { keep_balanced_budget
= true; }
2608 void unset_balanced_budget() { keep_balanced_budget
= false; }
2610 void set_honor_pool_full() { honor_pool_full
= true; }
2611 void unset_honor_pool_full() { honor_pool_full
= false; }
2613 void _scan_requests(
2617 std::map
<int64_t, bool> *pool_full_map
,
2618 std::map
<ceph_tid_t
, Op
*>& need_resend
,
2619 std::list
<LingerOp
*>& need_resend_linger
,
2620 std::map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
2621 ceph::shunique_lock
<ceph::shared_mutex
>& sul
);
2623 int64_t get_object_hash_position(int64_t pool
, const std::string
& key
,
2624 const std::string
& ns
);
2625 int64_t get_object_pg_hash_position(int64_t pool
, const std::string
& key
,
2626 const std::string
& ns
);
2630 bool ms_dispatch(Message
*m
) override
;
2631 bool ms_can_fast_dispatch_any() const override
{
2634 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2635 switch (m
->get_type()) {
2636 case CEPH_MSG_OSD_OPREPLY
:
2637 case CEPH_MSG_WATCH_NOTIFY
:
2643 void ms_fast_dispatch(Message
*m
) override
{
2644 if (!ms_dispatch(m
)) {
2649 void handle_osd_op_reply(class MOSDOpReply
*m
);
2650 void handle_osd_backoff(class MOSDBackoff
*m
);
2651 void handle_watch_notify(class MWatchNotify
*m
);
2652 void handle_osd_map(class MOSDMap
*m
);
2653 void wait_for_osd_map(epoch_t e
=0);
2655 template<typename CompletionToken
>
2656 auto wait_for_osd_map(CompletionToken
&& token
) {
2657 boost::asio::async_completion
<CompletionToken
, void()> init(token
);
2658 unique_lock
l(rwlock
);
2659 if (osdmap
->get_epoch()) {
2661 boost::asio::post(std::move(init
.completion_handler
));
2663 waiting_for_map
[0].emplace_back(
2664 OpCompletion::create(
2665 service
.get_executor(),
2666 [c
= std::move(init
.completion_handler
)]
2667 (boost::system::error_code
) mutable {
2669 }), boost::system::error_code
{});
2672 return init
.result
.get();
2677 * Get std::list of entities blocklisted since this was last called,
2678 * and reset the std::list.
2680 * Uses a std::set because typical use case is to compare some
2681 * other std::list of clients to see which overlap with the blocklisted
2685 void consume_blocklist_events(std::set
<entity_addr_t
> *events
);
2687 int pool_snap_by_name(int64_t poolid
,
2688 const char *snap_name
,
2689 snapid_t
*snap
) const;
2690 int pool_snap_get_info(int64_t poolid
, snapid_t snap
,
2691 pool_snap_info_t
*info
) const;
2692 int pool_snap_list(int64_t poolid
, std::vector
<uint64_t> *snaps
);
2695 void emit_blocklist_events(const OSDMap::Incremental
&inc
);
2696 void emit_blocklist_events(const OSDMap
&old_osd_map
,
2697 const OSDMap
&new_osd_map
);
2700 void _op_submit(Op
*op
, ceph::shunique_lock
<ceph::shared_mutex
>& lc
,
2702 void _op_submit_with_budget(Op
*op
,
2703 ceph::shunique_lock
<ceph::shared_mutex
>& lc
,
2705 int *ctx_budget
= NULL
);
2708 void op_submit(Op
*op
, ceph_tid_t
*ptid
= NULL
, int *ctx_budget
= NULL
);
2710 std::shared_lock
l(rwlock
);
2711 return !((!inflight_ops
) && linger_ops
.empty() &&
2712 poolstat_ops
.empty() && statfs_ops
.empty());
2716 * Output in-flight requests
2718 void _dump_active(OSDSession
*s
);
2719 void _dump_active();
2721 void dump_requests(ceph::Formatter
*fmt
);
2722 void _dump_ops(const OSDSession
*s
, ceph::Formatter
*fmt
);
2723 void dump_ops(ceph::Formatter
*fmt
);
2724 void _dump_linger_ops(const OSDSession
*s
, ceph::Formatter
*fmt
);
2725 void dump_linger_ops(ceph::Formatter
*fmt
);
2726 void _dump_command_ops(const OSDSession
*s
, ceph::Formatter
*fmt
);
2727 void dump_command_ops(ceph::Formatter
*fmt
);
2728 void dump_pool_ops(ceph::Formatter
*fmt
) const;
2729 void dump_pool_stat_ops(ceph::Formatter
*fmt
) const;
2730 void dump_statfs_ops(ceph::Formatter
*fmt
) const;
2732 int get_client_incarnation() const { return client_inc
; }
2733 void set_client_incarnation(int inc
) { client_inc
= inc
; }
2735 bool have_map(epoch_t epoch
);
2737 struct CB_Objecter_GetVersion
{
2739 std::unique_ptr
<OpCompletion
> fin
;
2741 CB_Objecter_GetVersion(Objecter
*o
, std::unique_ptr
<OpCompletion
> c
)
2742 : objecter(o
), fin(std::move(c
)) {}
2743 void operator()(boost::system::error_code ec
, version_t newest
,
2745 if (ec
== boost::system::errc::resource_unavailable_try_again
) {
2746 // try again as instructed
2747 objecter
->_wait_for_latest_osdmap(std::move(*this));
2749 ceph::async::post(std::move(fin
), ec
);
2751 auto l
= std::unique_lock(objecter
->rwlock
);
2752 objecter
->_get_latest_version(oldest
, newest
, std::move(fin
),
2758 template<typename CompletionToken
>
2759 auto wait_for_map(epoch_t epoch
, CompletionToken
&& token
) {
2760 boost::asio::async_completion
<CompletionToken
, OpSignature
> init(token
);
2762 if (osdmap
->get_epoch() >= epoch
) {
2763 boost::asio::post(service
,
2764 ceph::async::bind_handler(
2765 std::move(init
.completion_handler
),
2766 boost::system::error_code()));
2768 monc
->get_version("osdmap",
2769 CB_Objecter_GetVersion(
2771 OpCompletion::create(service
.get_executor(),
2772 std::move(init
.completion_handler
))));
2774 return init
.result
.get();
2777 void _wait_for_new_map(std::unique_ptr
<OpCompletion
>, epoch_t epoch
,
2778 boost::system::error_code
= {});
2781 void _wait_for_latest_osdmap(CB_Objecter_GetVersion
&& c
) {
2782 monc
->get_version("osdmap", std::move(c
));
2787 template<typename CompletionToken
>
2788 auto wait_for_latest_osdmap(CompletionToken
&& token
) {
2789 boost::asio::async_completion
<CompletionToken
, OpSignature
> init(token
);
2791 monc
->get_version("osdmap",
2792 CB_Objecter_GetVersion(
2794 OpCompletion::create(service
.get_executor(),
2795 std::move(init
.completion_handler
))));
2796 return init
.result
.get();
2799 void wait_for_latest_osdmap(std::unique_ptr
<OpCompletion
> c
) {
2800 monc
->get_version("osdmap",
2801 CB_Objecter_GetVersion(this, std::move(c
)));
2804 template<typename CompletionToken
>
2805 auto get_latest_version(epoch_t oldest
, epoch_t newest
,
2806 CompletionToken
&& token
) {
2807 boost::asio::async_completion
<CompletionToken
, OpSignature
> init(token
);
2809 std::unique_lock
wl(rwlock
);
2810 _get_latest_version(oldest
, newest
,
2811 OpCompletion::create(
2812 service
.get_executor(),
2813 std::move(init
.completion_handler
)),
2816 return init
.result
.get();
2819 void _get_latest_version(epoch_t oldest
, epoch_t neweset
,
2820 std::unique_ptr
<OpCompletion
> fin
,
2821 std::unique_lock
<ceph::shared_mutex
>&& ul
);
2823 /** Get the current set of global op flags */
2824 int get_global_op_flags() const { return global_op_flags
; }
2825 /** Add a flag to the global op flags, not really atomic operation */
2826 void add_global_op_flags(int flag
) {
2827 global_op_flags
.fetch_or(flag
);
2829 /** Clear the passed flags from the global op flag set */
2830 void clear_global_op_flag(int flags
) {
2831 global_op_flags
.fetch_and(~flags
);
2834 /// cancel an in-progress request with the given return code
2836 int op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
2837 int _op_cancel(ceph_tid_t tid
, int r
);
2839 int op_cancel(ceph_tid_t tid
, int r
);
2840 int op_cancel(const std::vector
<ceph_tid_t
>& tidls
, int r
);
2843 * Any write op which is in progress at the start of this call shall no
2844 * longer be in progress when this call ends. Operations started after the
2845 * start of this call may still be in progress when this call ends.
2847 * @return the latest possible epoch in which a cancelled op could have
2848 * existed, or -1 if nothing was cancelled.
2850 epoch_t
op_cancel_writes(int r
, int64_t pool
=-1);
2853 void osd_command(int osd
, std::vector
<std::string
> cmd
,
2854 ceph::buffer::list inbl
, ceph_tid_t
*ptid
,
2855 decltype(CommandOp::onfinish
)&& onfinish
) {
2856 ceph_assert(osd
>= 0);
2857 auto c
= new CommandOp(
2861 std::move(onfinish
));
2862 submit_command(c
, ptid
);
2864 template<typename CompletionToken
>
2865 auto osd_command(int osd
, std::vector
<std::string
> cmd
,
2866 ceph::buffer::list inbl
, ceph_tid_t
*ptid
,
2867 CompletionToken
&& token
) {
2868 boost::asio::async_completion
<CompletionToken
,
2869 CommandOp::OpSig
> init(token
);
2870 osd_command(osd
, std::move(cmd
), std::move(inbl
), ptid
,
2871 CommandOp::OpComp::create(service
.get_executor(),
2872 std::move(init
.completion_handler
)));
2873 return init
.result
.get();
2876 void pg_command(pg_t pgid
, std::vector
<std::string
> cmd
,
2877 ceph::buffer::list inbl
, ceph_tid_t
*ptid
,
2878 decltype(CommandOp::onfinish
)&& onfinish
) {
2879 auto *c
= new CommandOp(
2883 std::move(onfinish
));
2884 submit_command(c
, ptid
);
2887 template<typename CompletionToken
>
2888 auto pg_command(pg_t pgid
, std::vector
<std::string
> cmd
,
2889 ceph::buffer::list inbl
, ceph_tid_t
*ptid
,
2890 CompletionToken
&& token
) {
2891 boost::asio::async_completion
<CompletionToken
,
2892 CommandOp::OpSig
> init(token
);
2893 pg_command(pgid
, std::move(cmd
), std::move(inbl
), ptid
,
2894 CommandOp::OpComp::create(service
.get_executor(),
2895 std::move(init
.completion_handler
)));
2896 return init
.result
.get();
2899 // mid-level helpers
2900 Op
*prepare_mutate_op(
2901 const object_t
& oid
, const object_locator_t
& oloc
,
2902 ObjectOperation
& op
, const SnapContext
& snapc
,
2903 ceph::real_time mtime
, int flags
,
2904 Context
*oncommit
, version_t
*objver
= NULL
,
2905 osd_reqid_t reqid
= osd_reqid_t(),
2906 ZTracer::Trace
*parent_trace
= nullptr) {
2907 Op
*o
= new Op(oid
, oloc
, std::move(op
.ops
), flags
| global_op_flags
|
2908 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
,
2909 nullptr, parent_trace
);
2910 o
->priority
= op
.priority
;
2913 o
->out_rval
.swap(op
.out_rval
);
2914 o
->out_bl
.swap(op
.out_bl
);
2915 o
->out_handler
.swap(op
.out_handler
);
2916 o
->out_ec
.swap(op
.out_ec
);
2922 const object_t
& oid
, const object_locator_t
& oloc
,
2923 ObjectOperation
& op
, const SnapContext
& snapc
,
2924 ceph::real_time mtime
, int flags
,
2925 Context
*oncommit
, version_t
*objver
= NULL
,
2926 osd_reqid_t reqid
= osd_reqid_t()) {
2927 Op
*o
= prepare_mutate_op(oid
, oloc
, op
, snapc
, mtime
, flags
,
2928 oncommit
, objver
, reqid
);
2934 void mutate(const object_t
& oid
, const object_locator_t
& oloc
,
2935 ObjectOperation
&& op
, const SnapContext
& snapc
,
2936 ceph::real_time mtime
, int flags
,
2937 std::unique_ptr
<Op::OpComp
>&& oncommit
,
2938 version_t
*objver
= NULL
, osd_reqid_t reqid
= osd_reqid_t(),
2939 ZTracer::Trace
*parent_trace
= nullptr) {
2940 Op
*o
= new Op(oid
, oloc
, std::move(op
.ops
), flags
| global_op_flags
|
2941 CEPH_OSD_FLAG_WRITE
, std::move(oncommit
), objver
,
2942 nullptr, parent_trace
);
2943 o
->priority
= op
.priority
;
2946 o
->out_bl
.swap(op
.out_bl
);
2947 o
->out_handler
.swap(op
.out_handler
);
2948 o
->out_rval
.swap(op
.out_rval
);
2949 o
->out_ec
.swap(op
.out_ec
);
2955 Op
*prepare_read_op(
2956 const object_t
& oid
, const object_locator_t
& oloc
,
2957 ObjectOperation
& op
,
2958 snapid_t snapid
, ceph::buffer::list
*pbl
, int flags
,
2959 Context
*onack
, version_t
*objver
= NULL
,
2960 int *data_offset
= NULL
,
2961 uint64_t features
= 0,
2962 ZTracer::Trace
*parent_trace
= nullptr) {
2963 Op
*o
= new Op(oid
, oloc
, std::move(op
.ops
), flags
| global_op_flags
|
2964 CEPH_OSD_FLAG_READ
, onack
, objver
,
2965 data_offset
, parent_trace
);
2966 o
->priority
= op
.priority
;
2969 if (!o
->outbl
&& op
.size() == 1 && op
.out_bl
[0] && op
.out_bl
[0]->length())
2970 o
->outbl
= op
.out_bl
[0];
2971 o
->out_bl
.swap(op
.out_bl
);
2972 o
->out_handler
.swap(op
.out_handler
);
2973 o
->out_rval
.swap(op
.out_rval
);
2974 o
->out_ec
.swap(op
.out_ec
);
2979 const object_t
& oid
, const object_locator_t
& oloc
,
2980 ObjectOperation
& op
,
2981 snapid_t snapid
, ceph::buffer::list
*pbl
, int flags
,
2982 Context
*onack
, version_t
*objver
= NULL
,
2983 int *data_offset
= NULL
,
2984 uint64_t features
= 0) {
2985 Op
*o
= prepare_read_op(oid
, oloc
, op
, snapid
, pbl
, flags
, onack
, objver
,
2988 o
->features
= features
;
2994 void read(const object_t
& oid
, const object_locator_t
& oloc
,
2995 ObjectOperation
&& op
, snapid_t snapid
, ceph::buffer::list
*pbl
,
2996 int flags
, std::unique_ptr
<Op::OpComp
>&& onack
,
2997 version_t
*objver
= nullptr, int *data_offset
= nullptr,
2998 uint64_t features
= 0, ZTracer::Trace
*parent_trace
= nullptr) {
2999 Op
*o
= new Op(oid
, oloc
, std::move(op
.ops
), flags
| global_op_flags
|
3000 CEPH_OSD_FLAG_READ
, std::move(onack
), objver
,
3001 data_offset
, parent_trace
);
3002 o
->priority
= op
.priority
;
3006 if (!o
->outbl
&& op
.size() == 1 && op
.out_bl
[0] && op
.out_bl
[0]->length()) {
3007 o
->outbl
= op
.out_bl
[0];
3009 o
->out_bl
.swap(op
.out_bl
);
3010 o
->out_handler
.swap(op
.out_handler
);
3011 o
->out_rval
.swap(op
.out_rval
);
3012 o
->out_ec
.swap(op
.out_ec
);
3014 o
->features
= features
;
3020 Op
*prepare_pg_read_op(
3021 uint32_t hash
, object_locator_t oloc
,
3022 ObjectOperation
& op
, ceph::buffer::list
*pbl
, int flags
,
3023 Context
*onack
, epoch_t
*reply_epoch
,
3025 Op
*o
= new Op(object_t(), oloc
,
3027 flags
| global_op_flags
| CEPH_OSD_FLAG_READ
|
3028 CEPH_OSD_FLAG_IGNORE_OVERLAY
,
3030 o
->target
.precalc_pgid
= true;
3031 o
->target
.base_pgid
= pg_t(hash
, oloc
.pool
);
3032 o
->priority
= op
.priority
;
3033 o
->snapid
= CEPH_NOSNAP
;
3035 o
->out_bl
.swap(op
.out_bl
);
3036 o
->out_handler
.swap(op
.out_handler
);
3037 o
->out_rval
.swap(op
.out_rval
);
3038 o
->out_ec
.swap(op
.out_ec
);
3039 o
->reply_epoch
= reply_epoch
;
3041 // budget is tracked by listing context
3042 o
->ctx_budgeted
= true;
3048 uint32_t hash
, object_locator_t oloc
,
3049 ObjectOperation
& op
, ceph::buffer::list
*pbl
, int flags
,
3050 Context
*onack
, epoch_t
*reply_epoch
,
3052 Op
*o
= prepare_pg_read_op(hash
, oloc
, op
, pbl
, flags
,
3053 onack
, reply_epoch
, ctx_budget
);
3055 op_submit(o
, &tid
, ctx_budget
);
3060 uint32_t hash
, object_locator_t oloc
,
3061 ObjectOperation
& op
, ceph::buffer::list
*pbl
, int flags
,
3062 std::unique_ptr
<Op::OpComp
>&& onack
, epoch_t
*reply_epoch
, int *ctx_budget
) {
3064 Op
*o
= new Op(object_t(), oloc
,
3066 flags
| global_op_flags
| CEPH_OSD_FLAG_READ
|
3067 CEPH_OSD_FLAG_IGNORE_OVERLAY
,
3068 std::move(onack
), nullptr);
3069 o
->target
.precalc_pgid
= true;
3070 o
->target
.base_pgid
= pg_t(hash
, oloc
.pool
);
3071 o
->priority
= op
.priority
;
3072 o
->snapid
= CEPH_NOSNAP
;
3074 o
->out_bl
.swap(op
.out_bl
);
3075 o
->out_handler
.swap(op
.out_handler
);
3076 o
->out_rval
.swap(op
.out_rval
);
3077 o
->out_ec
.swap(op
.out_ec
);
3078 o
->reply_epoch
= reply_epoch
;
3080 // budget is tracked by listing context
3081 o
->ctx_budgeted
= true;
3083 op_submit(o
, &tid
, ctx_budget
);
3088 // caller owns a ref
3089 LingerOp
*linger_register(const object_t
& oid
, const object_locator_t
& oloc
,
3091 ceph_tid_t
linger_watch(LingerOp
*info
,
3092 ObjectOperation
& op
,
3093 const SnapContext
& snapc
, ceph::real_time mtime
,
3094 ceph::buffer::list
& inbl
,
3095 decltype(info
->on_reg_commit
)&& oncommit
,
3097 ceph_tid_t
linger_watch(LingerOp
*info
,
3098 ObjectOperation
& op
,
3099 const SnapContext
& snapc
, ceph::real_time mtime
,
3100 ceph::buffer::list
& inbl
,
3102 version_t
*objver
) {
3103 return linger_watch(info
, op
, snapc
, mtime
, inbl
,
3104 OpContextVert
<ceph::buffer::list
>(onfinish
, nullptr), objver
);
3106 ceph_tid_t
linger_notify(LingerOp
*info
,
3107 ObjectOperation
& op
,
3108 snapid_t snap
, ceph::buffer::list
& inbl
,
3109 decltype(LingerOp::on_reg_commit
)&& onfinish
,
3111 ceph_tid_t
linger_notify(LingerOp
*info
,
3112 ObjectOperation
& op
,
3113 snapid_t snap
, ceph::buffer::list
& inbl
,
3114 ceph::buffer::list
*poutbl
,
3116 version_t
*objver
) {
3117 return linger_notify(info
, op
, snap
, inbl
,
3118 OpContextVert(onack
, poutbl
),
3121 tl::expected
<ceph::timespan
,
3122 boost::system::error_code
> linger_check(LingerOp
*info
);
3123 void linger_cancel(LingerOp
*info
); // releases a reference
3124 void _linger_cancel(LingerOp
*info
);
3126 void _do_watch_notify(boost::intrusive_ptr
<LingerOp
> info
,
3127 boost::intrusive_ptr
<MWatchNotify
> m
);
3130 * set up initial ops in the op std::vector, and allocate a final op slot.
3132 * The caller is responsible for filling in the final ops_count ops.
3134 * @param ops op std::vector
3135 * @param ops_count number of final ops the caller will fill in
3136 * @param extra_ops pointer to [array of] initial op[s]
3137 * @return index of final op (for caller to fill in)
3139 int init_ops(boost::container::small_vector_base
<OSDOp
>& ops
, int ops_count
,
3140 ObjectOperation
*extra_ops
) {
3145 extra
= extra_ops
->ops
.size();
3147 ops
.resize(ops_count
+ extra
);
3149 for (i
=0; i
<extra
; i
++) {
3150 ops
[i
] = extra_ops
->ops
[i
];
3157 // high-level helpers
3158 Op
*prepare_stat_op(
3159 const object_t
& oid
, const object_locator_t
& oloc
,
3160 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
3161 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
3162 ObjectOperation
*extra_ops
= NULL
) {
3164 int i
= init_ops(ops
, 1, extra_ops
);
3165 ops
[i
].op
.op
= CEPH_OSD_OP_STAT
;
3166 C_Stat
*fin
= new C_Stat(psize
, pmtime
, onfinish
);
3167 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3168 CEPH_OSD_FLAG_READ
, fin
, objver
);
3170 o
->outbl
= &fin
->bl
;
3174 const object_t
& oid
, const object_locator_t
& oloc
,
3175 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
3176 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
3177 ObjectOperation
*extra_ops
= NULL
) {
3178 Op
*o
= prepare_stat_op(oid
, oloc
, snap
, psize
, pmtime
, flags
,
3179 onfinish
, objver
, extra_ops
);
3185 Op
*prepare_read_op(
3186 const object_t
& oid
, const object_locator_t
& oloc
,
3187 uint64_t off
, uint64_t len
, snapid_t snap
, ceph::buffer::list
*pbl
,
3188 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
3189 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
3190 ZTracer::Trace
*parent_trace
= nullptr) {
3192 int i
= init_ops(ops
, 1, extra_ops
);
3193 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
3194 ops
[i
].op
.extent
.offset
= off
;
3195 ops
[i
].op
.extent
.length
= len
;
3196 ops
[i
].op
.extent
.truncate_size
= 0;
3197 ops
[i
].op
.extent
.truncate_seq
= 0;
3198 ops
[i
].op
.flags
= op_flags
;
3199 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3200 CEPH_OSD_FLAG_READ
, onfinish
, objver
,
3201 nullptr, parent_trace
);
3207 const object_t
& oid
, const object_locator_t
& oloc
,
3208 uint64_t off
, uint64_t len
, snapid_t snap
, ceph::buffer::list
*pbl
,
3209 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
3210 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3211 Op
*o
= prepare_read_op(oid
, oloc
, off
, len
, snap
, pbl
, flags
,
3212 onfinish
, objver
, extra_ops
, op_flags
);
3218 Op
*prepare_cmpext_op(
3219 const object_t
& oid
, const object_locator_t
& oloc
,
3220 uint64_t off
, ceph::buffer::list
&cmp_bl
,
3221 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
3222 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3224 int i
= init_ops(ops
, 1, extra_ops
);
3225 ops
[i
].op
.op
= CEPH_OSD_OP_CMPEXT
;
3226 ops
[i
].op
.extent
.offset
= off
;
3227 ops
[i
].op
.extent
.length
= cmp_bl
.length();
3228 ops
[i
].op
.extent
.truncate_size
= 0;
3229 ops
[i
].op
.extent
.truncate_seq
= 0;
3230 ops
[i
].indata
= cmp_bl
;
3231 ops
[i
].op
.flags
= op_flags
;
3232 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3233 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
3239 const object_t
& oid
, const object_locator_t
& oloc
,
3240 uint64_t off
, ceph::buffer::list
&cmp_bl
,
3241 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
3242 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3243 Op
*o
= prepare_cmpext_op(oid
, oloc
, off
, cmp_bl
, snap
,
3244 flags
, onfinish
, objver
, extra_ops
, op_flags
);
3250 ceph_tid_t
read_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
3251 uint64_t off
, uint64_t len
, snapid_t snap
,
3252 ceph::buffer::list
*pbl
, int flags
, uint64_t trunc_size
,
3253 __u32 trunc_seq
, Context
*onfinish
,
3254 version_t
*objver
= NULL
,
3255 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3257 int i
= init_ops(ops
, 1, extra_ops
);
3258 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
3259 ops
[i
].op
.extent
.offset
= off
;
3260 ops
[i
].op
.extent
.length
= len
;
3261 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
3262 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
3263 ops
[i
].op
.flags
= op_flags
;
3264 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3265 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
3272 ceph_tid_t
mapext(const object_t
& oid
, const object_locator_t
& oloc
,
3273 uint64_t off
, uint64_t len
, snapid_t snap
, ceph::buffer::list
*pbl
,
3274 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
3275 ObjectOperation
*extra_ops
= NULL
) {
3277 int i
= init_ops(ops
, 1, extra_ops
);
3278 ops
[i
].op
.op
= CEPH_OSD_OP_MAPEXT
;
3279 ops
[i
].op
.extent
.offset
= off
;
3280 ops
[i
].op
.extent
.length
= len
;
3281 ops
[i
].op
.extent
.truncate_size
= 0;
3282 ops
[i
].op
.extent
.truncate_seq
= 0;
3283 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3284 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
3291 ceph_tid_t
getxattr(const object_t
& oid
, const object_locator_t
& oloc
,
3292 const char *name
, snapid_t snap
, ceph::buffer::list
*pbl
, int flags
,
3294 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
3296 int i
= init_ops(ops
, 1, extra_ops
);
3297 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTR
;
3298 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
3299 ops
[i
].op
.xattr
.value_len
= 0;
3301 ops
[i
].indata
.append(name
, ops
[i
].op
.xattr
.name_len
);
3302 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3303 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
3311 ceph_tid_t
getxattrs(const object_t
& oid
, const object_locator_t
& oloc
,
3312 snapid_t snap
, std::map
<std::string
,ceph::buffer::list
>& attrset
,
3313 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
3314 ObjectOperation
*extra_ops
= NULL
) {
3316 int i
= init_ops(ops
, 1, extra_ops
);
3317 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTRS
;
3318 C_GetAttrs
*fin
= new C_GetAttrs(attrset
, onfinish
);
3319 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3320 CEPH_OSD_FLAG_READ
, fin
, objver
);
3322 o
->outbl
= &fin
->bl
;
3328 ceph_tid_t
read_full(const object_t
& oid
, const object_locator_t
& oloc
,
3329 snapid_t snap
, ceph::buffer::list
*pbl
, int flags
,
3330 Context
*onfinish
, version_t
*objver
= NULL
,
3331 ObjectOperation
*extra_ops
= NULL
) {
3332 return read(oid
, oloc
, 0, 0, snap
, pbl
, flags
| global_op_flags
|
3333 CEPH_OSD_FLAG_READ
, onfinish
, objver
, extra_ops
);
3338 ceph_tid_t
_modify(const object_t
& oid
, const object_locator_t
& oloc
,
3340 ceph::real_time mtime
,
3341 const SnapContext
& snapc
, int flags
,
3343 version_t
*objver
= NULL
) {
3344 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3345 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3352 Op
*prepare_write_op(
3353 const object_t
& oid
, const object_locator_t
& oloc
,
3354 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
3355 const ceph::buffer::list
&bl
, ceph::real_time mtime
, int flags
,
3356 Context
*oncommit
, version_t
*objver
= NULL
,
3357 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
3358 ZTracer::Trace
*parent_trace
= nullptr) {
3360 int i
= init_ops(ops
, 1, extra_ops
);
3361 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
3362 ops
[i
].op
.extent
.offset
= off
;
3363 ops
[i
].op
.extent
.length
= len
;
3364 ops
[i
].op
.extent
.truncate_size
= 0;
3365 ops
[i
].op
.extent
.truncate_seq
= 0;
3367 ops
[i
].op
.flags
= op_flags
;
3368 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3369 CEPH_OSD_FLAG_WRITE
, std::move(oncommit
), objver
,
3370 nullptr, parent_trace
);
3376 const object_t
& oid
, const object_locator_t
& oloc
,
3377 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
3378 const ceph::buffer::list
&bl
, ceph::real_time mtime
, int flags
,
3379 Context
*oncommit
, version_t
*objver
= NULL
,
3380 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3381 Op
*o
= prepare_write_op(oid
, oloc
, off
, len
, snapc
, bl
, mtime
, flags
,
3382 oncommit
, objver
, extra_ops
, op_flags
);
3387 Op
*prepare_append_op(
3388 const object_t
& oid
, const object_locator_t
& oloc
,
3389 uint64_t len
, const SnapContext
& snapc
,
3390 const ceph::buffer::list
&bl
, ceph::real_time mtime
, int flags
,
3392 version_t
*objver
= NULL
,
3393 ObjectOperation
*extra_ops
= NULL
) {
3395 int i
= init_ops(ops
, 1, extra_ops
);
3396 ops
[i
].op
.op
= CEPH_OSD_OP_APPEND
;
3397 ops
[i
].op
.extent
.offset
= 0;
3398 ops
[i
].op
.extent
.length
= len
;
3399 ops
[i
].op
.extent
.truncate_size
= 0;
3400 ops
[i
].op
.extent
.truncate_seq
= 0;
3402 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3403 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3409 const object_t
& oid
, const object_locator_t
& oloc
,
3410 uint64_t len
, const SnapContext
& snapc
,
3411 const ceph::buffer::list
&bl
, ceph::real_time mtime
, int flags
,
3413 version_t
*objver
= NULL
,
3414 ObjectOperation
*extra_ops
= NULL
) {
3415 Op
*o
= prepare_append_op(oid
, oloc
, len
, snapc
, bl
, mtime
, flags
,
3416 oncommit
, objver
, extra_ops
);
3421 ceph_tid_t
write_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
3422 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
3423 const ceph::buffer::list
&bl
, ceph::real_time mtime
, int flags
,
3424 uint64_t trunc_size
, __u32 trunc_seq
,
3426 version_t
*objver
= NULL
,
3427 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3429 int i
= init_ops(ops
, 1, extra_ops
);
3430 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
3431 ops
[i
].op
.extent
.offset
= off
;
3432 ops
[i
].op
.extent
.length
= len
;
3433 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
3434 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
3436 ops
[i
].op
.flags
= op_flags
;
3437 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3438 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3445 Op
*prepare_write_full_op(
3446 const object_t
& oid
, const object_locator_t
& oloc
,
3447 const SnapContext
& snapc
, const ceph::buffer::list
&bl
,
3448 ceph::real_time mtime
, int flags
,
3449 Context
*oncommit
, version_t
*objver
= NULL
,
3450 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3452 int i
= init_ops(ops
, 1, extra_ops
);
3453 ops
[i
].op
.op
= CEPH_OSD_OP_WRITEFULL
;
3454 ops
[i
].op
.extent
.offset
= 0;
3455 ops
[i
].op
.extent
.length
= bl
.length();
3457 ops
[i
].op
.flags
= op_flags
;
3458 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3459 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3464 ceph_tid_t
write_full(
3465 const object_t
& oid
, const object_locator_t
& oloc
,
3466 const SnapContext
& snapc
, const ceph::buffer::list
&bl
,
3467 ceph::real_time mtime
, int flags
,
3468 Context
*oncommit
, version_t
*objver
= NULL
,
3469 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3470 Op
*o
= prepare_write_full_op(oid
, oloc
, snapc
, bl
, mtime
, flags
,
3471 oncommit
, objver
, extra_ops
, op_flags
);
3476 Op
*prepare_writesame_op(
3477 const object_t
& oid
, const object_locator_t
& oloc
,
3478 uint64_t write_len
, uint64_t off
,
3479 const SnapContext
& snapc
, const ceph::buffer::list
&bl
,
3480 ceph::real_time mtime
, int flags
,
3481 Context
*oncommit
, version_t
*objver
= NULL
,
3482 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3485 int i
= init_ops(ops
, 1, extra_ops
);
3486 ops
[i
].op
.op
= CEPH_OSD_OP_WRITESAME
;
3487 ops
[i
].op
.writesame
.offset
= off
;
3488 ops
[i
].op
.writesame
.length
= write_len
;
3489 ops
[i
].op
.writesame
.data_length
= bl
.length();
3491 ops
[i
].op
.flags
= op_flags
;
3492 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3493 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3498 ceph_tid_t
writesame(
3499 const object_t
& oid
, const object_locator_t
& oloc
,
3500 uint64_t write_len
, uint64_t off
,
3501 const SnapContext
& snapc
, const ceph::buffer::list
&bl
,
3502 ceph::real_time mtime
, int flags
,
3503 Context
*oncommit
, version_t
*objver
= NULL
,
3504 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
3506 Op
*o
= prepare_writesame_op(oid
, oloc
, write_len
, off
, snapc
, bl
,
3507 mtime
, flags
, oncommit
, objver
,
3508 extra_ops
, op_flags
);
3514 ceph_tid_t
trunc(const object_t
& oid
, const object_locator_t
& oloc
,
3515 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
3516 uint64_t trunc_size
, __u32 trunc_seq
,
3517 Context
*oncommit
, version_t
*objver
= NULL
,
3518 ObjectOperation
*extra_ops
= NULL
) {
3520 int i
= init_ops(ops
, 1, extra_ops
);
3521 ops
[i
].op
.op
= CEPH_OSD_OP_TRUNCATE
;
3522 ops
[i
].op
.extent
.offset
= trunc_size
;
3523 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
3524 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
3525 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3526 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3533 ceph_tid_t
zero(const object_t
& oid
, const object_locator_t
& oloc
,
3534 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
3535 ceph::real_time mtime
, int flags
, Context
*oncommit
,
3536 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
3538 int i
= init_ops(ops
, 1, extra_ops
);
3539 ops
[i
].op
.op
= CEPH_OSD_OP_ZERO
;
3540 ops
[i
].op
.extent
.offset
= off
;
3541 ops
[i
].op
.extent
.length
= len
;
3542 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3543 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3550 ceph_tid_t
rollback_object(const object_t
& oid
, const object_locator_t
& oloc
,
3551 const SnapContext
& snapc
, snapid_t snapid
,
3552 ceph::real_time mtime
, Context
*oncommit
,
3553 version_t
*objver
= NULL
,
3554 ObjectOperation
*extra_ops
= NULL
) {
3556 int i
= init_ops(ops
, 1, extra_ops
);
3557 ops
[i
].op
.op
= CEPH_OSD_OP_ROLLBACK
;
3558 ops
[i
].op
.snap
.snapid
= snapid
;
3559 Op
*o
= new Op(oid
, oloc
, std::move(ops
), CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3566 ceph_tid_t
create(const object_t
& oid
, const object_locator_t
& oloc
,
3567 const SnapContext
& snapc
, ceph::real_time mtime
, int global_flags
,
3568 int create_flags
, Context
*oncommit
,
3569 version_t
*objver
= NULL
,
3570 ObjectOperation
*extra_ops
= NULL
) {
3572 int i
= init_ops(ops
, 1, extra_ops
);
3573 ops
[i
].op
.op
= CEPH_OSD_OP_CREATE
;
3574 ops
[i
].op
.flags
= create_flags
;
3575 Op
*o
= new Op(oid
, oloc
, std::move(ops
), global_flags
| global_op_flags
|
3576 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3583 Op
*prepare_remove_op(
3584 const object_t
& oid
, const object_locator_t
& oloc
,
3585 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
3587 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
3589 int i
= init_ops(ops
, 1, extra_ops
);
3590 ops
[i
].op
.op
= CEPH_OSD_OP_DELETE
;
3591 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3592 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3598 const object_t
& oid
, const object_locator_t
& oloc
,
3599 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
3601 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
3602 Op
*o
= prepare_remove_op(oid
, oloc
, snapc
, mtime
, flags
,
3603 oncommit
, objver
, extra_ops
);
3609 ceph_tid_t
setxattr(const object_t
& oid
, const object_locator_t
& oloc
,
3610 const char *name
, const SnapContext
& snapc
, const ceph::buffer::list
&bl
,
3611 ceph::real_time mtime
, int flags
,
3613 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
3615 int i
= init_ops(ops
, 1, extra_ops
);
3616 ops
[i
].op
.op
= CEPH_OSD_OP_SETXATTR
;
3617 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
3618 ops
[i
].op
.xattr
.value_len
= bl
.length();
3620 ops
[i
].indata
.append(name
, ops
[i
].op
.xattr
.name_len
);
3621 ops
[i
].indata
.append(bl
);
3622 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3623 CEPH_OSD_FLAG_WRITE
, oncommit
,
3631 ceph_tid_t
removexattr(const object_t
& oid
, const object_locator_t
& oloc
,
3632 const char *name
, const SnapContext
& snapc
,
3633 ceph::real_time mtime
, int flags
,
3635 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
3637 int i
= init_ops(ops
, 1, extra_ops
);
3638 ops
[i
].op
.op
= CEPH_OSD_OP_RMXATTR
;
3639 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
3640 ops
[i
].op
.xattr
.value_len
= 0;
3642 ops
[i
].indata
.append(name
, ops
[i
].op
.xattr
.name_len
);
3643 Op
*o
= new Op(oid
, oloc
, std::move(ops
), flags
| global_op_flags
|
3644 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
3652 void list_nobjects(NListContext
*p
, Context
*onfinish
);
3653 uint32_t list_nobjects_seek(NListContext
*p
, uint32_t pos
);
3654 uint32_t list_nobjects_seek(NListContext
*list_context
, const hobject_t
& c
);
3655 void list_nobjects_get_cursor(NListContext
*list_context
, hobject_t
*c
);
3657 hobject_t
enumerate_objects_begin();
3658 hobject_t
enumerate_objects_end();
3660 template<typename T
>
3661 friend struct EnumerationContext
;
3662 template<typename T
>
3663 friend struct CB_EnumerateReply
;
3664 template<typename T
>
3665 void enumerate_objects(
3667 std::string_view ns
,
3671 const ceph::buffer::list
& filter_bl
,
3672 fu2::unique_function
<void(boost::system::error_code
,
3674 hobject_t
) &&> on_finish
);
3675 template<typename T
>
3676 void _issue_enumerate(hobject_t start
,
3677 std::unique_ptr
<EnumerationContext
<T
>>);
3678 template<typename T
>
3679 void _enumerate_reply(
3680 ceph::buffer::list
&& bl
,
3681 boost::system::error_code ec
,
3682 std::unique_ptr
<EnumerationContext
<T
>>&& ectx
);
3684 // -------------------------
3687 void pool_op_submit(PoolOp
*op
);
3688 void _pool_op_submit(PoolOp
*op
);
3689 void _finish_pool_op(PoolOp
*op
, int r
);
3690 void _do_delete_pool(int64_t pool
,
3691 decltype(PoolOp::onfinish
)&& onfinish
);
3694 void create_pool_snap(int64_t pool
, std::string_view snapName
,
3695 decltype(PoolOp::onfinish
)&& onfinish
);
3696 void create_pool_snap(int64_t pool
, std::string_view snapName
,
3698 create_pool_snap(pool
, snapName
,
3699 OpContextVert
<ceph::buffer::list
>(c
, nullptr));
3701 void allocate_selfmanaged_snap(int64_t pool
,
3702 std::unique_ptr
<ceph::async::Completion
<
3703 void(boost::system::error_code
,
3704 snapid_t
)>> onfinish
);
3705 void allocate_selfmanaged_snap(int64_t pool
, snapid_t
* psnapid
,
3707 allocate_selfmanaged_snap(pool
,
3708 OpContextVert(c
, psnapid
));
3710 void delete_pool_snap(int64_t pool
, std::string_view snapName
,
3711 decltype(PoolOp::onfinish
)&& onfinish
);
3712 void delete_pool_snap(int64_t pool
, std::string_view snapName
,
3714 delete_pool_snap(pool
, snapName
,
3715 OpContextVert
<ceph::buffer::list
>(c
, nullptr));
3718 void delete_selfmanaged_snap(int64_t pool
, snapid_t snap
,
3719 decltype(PoolOp::onfinish
)&& onfinish
);
3720 void delete_selfmanaged_snap(int64_t pool
, snapid_t snap
,
3722 delete_selfmanaged_snap(pool
, snap
,
3723 OpContextVert
<ceph::buffer::list
>(c
, nullptr));
3727 void create_pool(std::string_view name
,
3728 decltype(PoolOp::onfinish
)&& onfinish
,
3730 void create_pool(std::string_view name
, Context
*onfinish
,
3731 int crush_rule
=-1) {
3733 OpContextVert
<ceph::buffer::list
>(onfinish
, nullptr),
3736 void delete_pool(int64_t pool
,
3737 decltype(PoolOp::onfinish
)&& onfinish
);
3738 void delete_pool(int64_t pool
,
3739 Context
* onfinish
) {
3740 delete_pool(pool
, OpContextVert
<ceph::buffer::list
>(onfinish
, nullptr));
3743 void delete_pool(std::string_view name
,
3744 decltype(PoolOp::onfinish
)&& onfinish
);
3746 void delete_pool(std::string_view name
,
3747 Context
* onfinish
) {
3748 delete_pool(name
, OpContextVert
<ceph::buffer::list
>(onfinish
, nullptr));
3751 void handle_pool_op_reply(MPoolOpReply
*m
);
3752 int pool_op_cancel(ceph_tid_t tid
, int r
);
3754 // --------------------------
3757 void _poolstat_submit(PoolStatOp
*op
);
3759 void handle_get_pool_stats_reply(MGetPoolStatsReply
*m
);
3760 void get_pool_stats(const std::vector
<std::string
>& pools
,
3761 decltype(PoolStatOp::onfinish
)&& onfinish
);
3762 template<typename CompletionToken
>
3763 auto get_pool_stats(const std::vector
<std::string
>& pools
,
3764 CompletionToken
&& token
) {
3765 boost::asio::async_completion
<CompletionToken
,
3766 PoolStatOp::OpSig
> init(token
);
3767 get_pool_stats(pools
,
3768 PoolStatOp::OpComp::create(
3769 service
.get_executor(),
3770 std::move(init
.completion_handler
)));
3771 return init
.result
.get();
3773 int pool_stat_op_cancel(ceph_tid_t tid
, int r
);
3774 void _finish_pool_stat_op(PoolStatOp
*op
, int r
);
3776 // ---------------------------
3779 void _fs_stats_submit(StatfsOp
*op
);
3781 void handle_fs_stats_reply(MStatfsReply
*m
);
3782 void get_fs_stats(boost::optional
<int64_t> poolid
,
3783 decltype(StatfsOp::onfinish
)&& onfinish
);
3784 template<typename CompletionToken
>
3785 auto get_fs_stats(boost::optional
<int64_t> poolid
,
3786 CompletionToken
&& token
) {
3787 boost::asio::async_completion
<CompletionToken
, StatfsOp::OpSig
> init(token
);
3788 get_fs_stats(poolid
,
3789 StatfsOp::OpComp::create(service
.get_executor(),
3790 std::move(init
.completion_handler
)));
3791 return init
.result
.get();
3793 void get_fs_stats(struct ceph_statfs
& result
, boost::optional
<int64_t> poolid
,
3794 Context
*onfinish
) {
3795 get_fs_stats(poolid
, OpContextVert(onfinish
, result
));
3797 int statfs_op_cancel(ceph_tid_t tid
, int r
);
3798 void _finish_statfs_op(StatfsOp
*op
, int r
);
3800 // ---------------------------
3801 // some scatter/gather hackery
3803 void _sg_read_finish(std::vector
<ObjectExtent
>& extents
,
3804 std::vector
<ceph::buffer::list
>& resultbl
,
3805 ceph::buffer::list
*bl
, Context
*onfinish
);
3807 struct C_SGRead
: public Context
{
3809 std::vector
<ObjectExtent
> extents
;
3810 std::vector
<ceph::buffer::list
> resultbl
;
3811 ceph::buffer::list
*bl
;
3813 C_SGRead(Objecter
*ob
,
3814 std::vector
<ObjectExtent
>& e
, std::vector
<ceph::buffer::list
>& r
, ceph::buffer::list
*b
,
3816 objecter(ob
), bl(b
), onfinish(c
) {
3820 void finish(int r
) override
{
3821 objecter
->_sg_read_finish(extents
, resultbl
, bl
, onfinish
);
3825 void sg_read_trunc(std::vector
<ObjectExtent
>& extents
, snapid_t snap
,
3826 ceph::buffer::list
*bl
, int flags
, uint64_t trunc_size
,
3827 __u32 trunc_seq
, Context
*onfinish
, int op_flags
= 0) {
3828 if (extents
.size() == 1) {
3829 read_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
3830 extents
[0].length
, snap
, bl
, flags
, extents
[0].truncate_size
,
3831 trunc_seq
, onfinish
, 0, 0, op_flags
);
3833 C_GatherBuilder
gather(cct
);
3834 std::vector
<ceph::buffer::list
> resultbl(extents
.size());
3836 for (auto p
= extents
.begin(); p
!= extents
.end(); ++p
) {
3837 read_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
, snap
, &resultbl
[i
++],
3838 flags
, p
->truncate_size
, trunc_seq
, gather
.new_sub(),
3841 gather
.set_finisher(new C_SGRead(this, extents
, resultbl
, bl
, onfinish
));
3846 void sg_read(std::vector
<ObjectExtent
>& extents
, snapid_t snap
, ceph::buffer::list
*bl
,
3847 int flags
, Context
*onfinish
, int op_flags
= 0) {
3848 sg_read_trunc(extents
, snap
, bl
, flags
, 0, 0, onfinish
, op_flags
);
3851 void sg_write_trunc(std::vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3852 const ceph::buffer::list
& bl
, ceph::real_time mtime
, int flags
,
3853 uint64_t trunc_size
, __u32 trunc_seq
,
3854 Context
*oncommit
, int op_flags
= 0) {
3855 if (extents
.size() == 1) {
3856 write_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
3857 extents
[0].length
, snapc
, bl
, mtime
, flags
,
3858 extents
[0].truncate_size
, trunc_seq
, oncommit
,
3861 C_GatherBuilder
gcom(cct
, oncommit
);
3862 auto it
= bl
.cbegin();
3863 for (auto p
= extents
.begin(); p
!= extents
.end(); ++p
) {
3864 ceph::buffer::list cur
;
3865 for (auto bit
= p
->buffer_extents
.begin();
3866 bit
!= p
->buffer_extents
.end();
3868 if (it
.get_off() != bit
->first
) {
3869 it
.seek(bit
->first
);
3871 it
.copy(bit
->second
, cur
);
3873 ceph_assert(cur
.length() == p
->length
);
3874 write_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
,
3875 snapc
, cur
, mtime
, flags
, p
->truncate_size
, trunc_seq
,
3876 oncommit
? gcom
.new_sub():0,
3883 void sg_write(std::vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3884 const ceph::buffer::list
& bl
, ceph::real_time mtime
, int flags
,
3885 Context
*oncommit
, int op_flags
= 0) {
3886 sg_write_trunc(extents
, snapc
, bl
, mtime
, flags
, 0, 0, oncommit
,
3890 void ms_handle_connect(Connection
*con
) override
;
3891 bool ms_handle_reset(Connection
*con
) override
;
3892 void ms_handle_remote_reset(Connection
*con
) override
;
3893 bool ms_handle_refused(Connection
*con
) override
;
3895 void blocklist_self(bool set
);
3898 epoch_t epoch_barrier
= 0;
3899 bool retry_writes_after_first_reply
=
3900 cct
->_conf
->objecter_retry_writes_after_first_reply
;
3903 void set_epoch_barrier(epoch_t epoch
);
3905 PerfCounters
*get_logger() {