1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
18 #include <condition_variable>
24 #include <type_traits>
26 #include <boost/thread/shared_mutex.hpp>
28 #include "include/ceph_assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/config_obs.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
39 #include "common/Finisher.h"
40 #include "common/Throttle.h"
42 #include "messages/MOSDOp.h"
43 #include "msg/Dispatcher.h"
44 #include "osd/OSDMap.h"
56 class MGetPoolStatsReply
;
63 // -----------------------------------------
65 struct ObjectOperation
{
70 vector
<bufferlist
*> out_bl
;
71 vector
<Context
*> out_handler
;
72 vector
<int*> out_rval
;
74 ObjectOperation() : flags(0), priority(0) {}
76 while (!out_handler
.empty()) {
77 delete out_handler
.back();
78 out_handler
.pop_back();
86 void set_last_op_flags(int flags
) {
87 ceph_assert(!ops
.empty());
88 ops
.rbegin()->op
.flags
= flags
;
93 * Add a callback to run when this operation completes,
94 * after any other callbacks for it.
96 void add_handler(Context
*extra
);
98 OSDOp
& add_op(int op
) {
104 out_handler
.resize(s
+1);
105 out_handler
[s
] = NULL
;
106 out_rval
.resize(s
+1);
110 void add_data(int op
, uint64_t off
, uint64_t len
, bufferlist
& bl
) {
111 OSDOp
& osd_op
= add_op(op
);
112 osd_op
.op
.extent
.offset
= off
;
113 osd_op
.op
.extent
.length
= len
;
114 osd_op
.indata
.claim_append(bl
);
116 void add_writesame(int op
, uint64_t off
, uint64_t write_len
,
118 OSDOp
& osd_op
= add_op(op
);
119 osd_op
.op
.writesame
.offset
= off
;
120 osd_op
.op
.writesame
.length
= write_len
;
121 osd_op
.op
.writesame
.data_length
= bl
.length();
122 osd_op
.indata
.claim_append(bl
);
124 void add_xattr(int op
, const char *name
, const bufferlist
& data
) {
125 OSDOp
& osd_op
= add_op(op
);
126 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
127 osd_op
.op
.xattr
.value_len
= data
.length();
129 osd_op
.indata
.append(name
, osd_op
.op
.xattr
.name_len
);
130 osd_op
.indata
.append(data
);
132 void add_xattr_cmp(int op
, const char *name
, uint8_t cmp_op
,
133 uint8_t cmp_mode
, const bufferlist
& data
) {
134 OSDOp
& osd_op
= add_op(op
);
135 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
136 osd_op
.op
.xattr
.value_len
= data
.length();
137 osd_op
.op
.xattr
.cmp_op
= cmp_op
;
138 osd_op
.op
.xattr
.cmp_mode
= cmp_mode
;
140 osd_op
.indata
.append(name
, osd_op
.op
.xattr
.name_len
);
141 osd_op
.indata
.append(data
);
143 void add_call(int op
, const char *cname
, const char *method
,
145 bufferlist
*outbl
, Context
*ctx
, int *prval
) {
146 OSDOp
& osd_op
= add_op(op
);
148 unsigned p
= ops
.size() - 1;
149 out_handler
[p
] = ctx
;
153 osd_op
.op
.cls
.class_len
= strlen(cname
);
154 osd_op
.op
.cls
.method_len
= strlen(method
);
155 osd_op
.op
.cls
.indata_len
= indata
.length();
156 osd_op
.indata
.append(cname
, osd_op
.op
.cls
.class_len
);
157 osd_op
.indata
.append(method
, osd_op
.op
.cls
.method_len
);
158 osd_op
.indata
.append(indata
);
160 void add_pgls(int op
, uint64_t count
, collection_list_handle_t cookie
,
161 epoch_t start_epoch
) {
162 OSDOp
& osd_op
= add_op(op
);
163 osd_op
.op
.pgls
.count
= count
;
164 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
165 encode(cookie
, osd_op
.indata
);
167 void add_pgls_filter(int op
, uint64_t count
, const bufferlist
& filter
,
168 collection_list_handle_t cookie
, epoch_t start_epoch
) {
169 OSDOp
& osd_op
= add_op(op
);
170 osd_op
.op
.pgls
.count
= count
;
171 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
173 string mname
= "filter";
174 encode(cname
, osd_op
.indata
);
175 encode(mname
, osd_op
.indata
);
176 osd_op
.indata
.append(filter
);
177 encode(cookie
, osd_op
.indata
);
179 void add_alloc_hint(int op
, uint64_t expected_object_size
,
180 uint64_t expected_write_size
,
182 OSDOp
& osd_op
= add_op(op
);
183 osd_op
.op
.alloc_hint
.expected_object_size
= expected_object_size
;
184 osd_op
.op
.alloc_hint
.expected_write_size
= expected_write_size
;
185 osd_op
.op
.alloc_hint
.flags
= flags
;
191 void pg_ls(uint64_t count
, bufferlist
& filter
,
192 collection_list_handle_t cookie
, epoch_t start_epoch
) {
193 if (filter
.length() == 0)
194 add_pgls(CEPH_OSD_OP_PGLS
, count
, cookie
, start_epoch
);
196 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER
, count
, filter
, cookie
,
198 flags
|= CEPH_OSD_FLAG_PGOP
;
201 void pg_nls(uint64_t count
, const bufferlist
& filter
,
202 collection_list_handle_t cookie
, epoch_t start_epoch
) {
203 if (filter
.length() == 0)
204 add_pgls(CEPH_OSD_OP_PGNLS
, count
, cookie
, start_epoch
);
206 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER
, count
, filter
, cookie
,
208 flags
|= CEPH_OSD_FLAG_PGOP
;
211 void scrub_ls(const librados::object_id_t
& start_after
,
213 std::vector
<librados::inconsistent_obj_t
> *objects
,
216 void scrub_ls(const librados::object_id_t
& start_after
,
218 std::vector
<librados::inconsistent_snapset_t
> *objects
,
222 void create(bool excl
) {
223 OSDOp
& o
= add_op(CEPH_OSD_OP_CREATE
);
224 o
.op
.flags
= (excl
? CEPH_OSD_OP_FLAG_EXCL
: 0);
227 struct C_ObjectOperation_stat
: public Context
{
230 ceph::real_time
*pmtime
;
232 struct timespec
*pts
;
234 C_ObjectOperation_stat(uint64_t *ps
, ceph::real_time
*pm
, time_t *pt
, struct timespec
*_pts
,
236 : psize(ps
), pmtime(pm
), ptime(pt
), pts(_pts
), prval(prval
) {}
237 void finish(int r
) override
{
239 auto p
= bl
.cbegin();
242 ceph::real_time mtime
;
250 *ptime
= ceph::real_clock::to_time_t(mtime
);
252 *pts
= ceph::real_clock::to_timespec(mtime
);
253 } catch (buffer::error
& e
) {
260 void stat(uint64_t *psize
, ceph::real_time
*pmtime
, int *prval
) {
261 add_op(CEPH_OSD_OP_STAT
);
262 unsigned p
= ops
.size() - 1;
263 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, pmtime
, NULL
, NULL
,
269 void stat(uint64_t *psize
, time_t *ptime
, int *prval
) {
270 add_op(CEPH_OSD_OP_STAT
);
271 unsigned p
= ops
.size() - 1;
272 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, ptime
, NULL
,
278 void stat(uint64_t *psize
, struct timespec
*pts
, int *prval
) {
279 add_op(CEPH_OSD_OP_STAT
);
280 unsigned p
= ops
.size() - 1;
281 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, NULL
, pts
,
288 struct C_ObjectOperation_cmpext
: public Context
{
290 explicit C_ObjectOperation_cmpext(int *prval
)
299 void cmpext(uint64_t off
, bufferlist
& cmp_bl
, int *prval
) {
300 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_bl
.length(), cmp_bl
);
301 unsigned p
= ops
.size() - 1;
302 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
308 void cmpext(uint64_t off
, uint64_t cmp_len
, const char *cmp_buf
, int *prval
) {
310 cmp_bl
.append(cmp_buf
, cmp_len
);
311 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_len
, cmp_bl
);
312 unsigned p
= ops
.size() - 1;
313 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
318 void read(uint64_t off
, uint64_t len
, bufferlist
*pbl
, int *prval
,
321 add_data(CEPH_OSD_OP_READ
, off
, len
, bl
);
322 unsigned p
= ops
.size() - 1;
325 out_handler
[p
] = ctx
;
328 struct C_ObjectOperation_sparse_read
: public Context
{
331 std::map
<uint64_t, uint64_t> *extents
;
333 C_ObjectOperation_sparse_read(bufferlist
*data_bl
,
334 std::map
<uint64_t, uint64_t> *extents
,
336 : data_bl(data_bl
), extents(extents
), prval(prval
) {}
337 void finish(int r
) override
{
338 auto iter
= bl
.cbegin();
340 // NOTE: it's possible the sub-op has not been executed but the result
341 // code remains zeroed. Avoid the costly exception handling on a
342 // potential IO path.
343 if (bl
.length() > 0) {
345 decode(*extents
, iter
);
346 decode(*data_bl
, iter
);
347 } catch (buffer::error
& e
) {
357 void sparse_read(uint64_t off
, uint64_t len
, std::map
<uint64_t,uint64_t> *m
,
358 bufferlist
*data_bl
, int *prval
) {
360 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
361 unsigned p
= ops
.size() - 1;
362 C_ObjectOperation_sparse_read
*h
=
363 new C_ObjectOperation_sparse_read(data_bl
, m
, prval
);
368 void write(uint64_t off
, bufferlist
& bl
,
369 uint64_t truncate_size
,
370 uint32_t truncate_seq
) {
371 add_data(CEPH_OSD_OP_WRITE
, off
, bl
.length(), bl
);
372 OSDOp
& o
= *ops
.rbegin();
373 o
.op
.extent
.truncate_size
= truncate_size
;
374 o
.op
.extent
.truncate_seq
= truncate_seq
;
376 void write(uint64_t off
, bufferlist
& bl
) {
377 write(off
, bl
, 0, 0);
379 void write_full(bufferlist
& bl
) {
380 add_data(CEPH_OSD_OP_WRITEFULL
, 0, bl
.length(), bl
);
382 void writesame(uint64_t off
, uint64_t write_len
, bufferlist
& bl
) {
383 add_writesame(CEPH_OSD_OP_WRITESAME
, off
, write_len
, bl
);
385 void append(bufferlist
& bl
) {
386 add_data(CEPH_OSD_OP_APPEND
, 0, bl
.length(), bl
);
388 void zero(uint64_t off
, uint64_t len
) {
390 add_data(CEPH_OSD_OP_ZERO
, off
, len
, bl
);
392 void truncate(uint64_t off
) {
394 add_data(CEPH_OSD_OP_TRUNCATE
, off
, 0, bl
);
398 add_data(CEPH_OSD_OP_DELETE
, 0, 0, bl
);
400 void mapext(uint64_t off
, uint64_t len
) {
402 add_data(CEPH_OSD_OP_MAPEXT
, off
, len
, bl
);
404 void sparse_read(uint64_t off
, uint64_t len
) {
406 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
409 void checksum(uint8_t type
, const bufferlist
&init_value_bl
,
410 uint64_t off
, uint64_t len
, size_t chunk_size
,
411 bufferlist
*pbl
, int *prval
, Context
*ctx
) {
412 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_CHECKSUM
);
413 osd_op
.op
.checksum
.offset
= off
;
414 osd_op
.op
.checksum
.length
= len
;
415 osd_op
.op
.checksum
.type
= type
;
416 osd_op
.op
.checksum
.chunk_size
= chunk_size
;
417 osd_op
.indata
.append(init_value_bl
);
419 unsigned p
= ops
.size() - 1;
422 out_handler
[p
] = ctx
;
426 void getxattr(const char *name
, bufferlist
*pbl
, int *prval
) {
428 add_xattr(CEPH_OSD_OP_GETXATTR
, name
, bl
);
429 unsigned p
= ops
.size() - 1;
433 struct C_ObjectOperation_decodevals
: public Context
{
434 uint64_t max_entries
;
436 std::map
<std::string
,bufferlist
> *pattrs
;
439 C_ObjectOperation_decodevals(uint64_t m
, std::map
<std::string
,bufferlist
> *pa
,
441 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
446 void finish(int r
) override
{
448 auto p
= bl
.cbegin();
453 std::map
<std::string
,bufferlist
> ignore
;
459 decode(*ptruncated
, p
);
461 // the OSD did not provide this. since old OSDs do not
462 // enforce omap result limits either, we can infer it from
463 // the size of the result
464 *ptruncated
= (pattrs
->size() == max_entries
);
468 catch (buffer::error
& e
) {
475 struct C_ObjectOperation_decodekeys
: public Context
{
476 uint64_t max_entries
;
478 std::set
<std::string
> *pattrs
;
481 C_ObjectOperation_decodekeys(uint64_t m
, std::set
<std::string
> *pa
, bool *pt
,
483 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
488 void finish(int r
) override
{
490 auto p
= bl
.cbegin();
495 std::set
<std::string
> ignore
;
501 decode(*ptruncated
, p
);
503 // the OSD did not provide this. since old OSDs do not
504 // enforce omap result limits either, we can infer it from
505 // the size of the result
506 *ptruncated
= (pattrs
->size() == max_entries
);
510 catch (buffer::error
& e
) {
517 struct C_ObjectOperation_decodewatchers
: public Context
{
519 list
<obj_watch_t
> *pwatchers
;
521 C_ObjectOperation_decodewatchers(list
<obj_watch_t
> *pw
, int *pr
)
522 : pwatchers(pw
), prval(pr
) {}
523 void finish(int r
) override
{
525 auto p
= bl
.cbegin();
527 obj_list_watch_response_t resp
;
530 for (list
<watch_item_t
>::iterator i
= resp
.entries
.begin() ;
531 i
!= resp
.entries
.end() ; ++i
) {
533 string sa
= i
->addr
.get_legacy_str();
534 strncpy(ow
.addr
, sa
.c_str(), 256);
535 ow
.watcher_id
= i
->name
.num();
536 ow
.cookie
= i
->cookie
;
537 ow
.timeout_seconds
= i
->timeout_seconds
;
538 pwatchers
->push_back(ow
);
542 catch (buffer::error
& e
) {
549 struct C_ObjectOperation_decodesnaps
: public Context
{
551 librados::snap_set_t
*psnaps
;
553 C_ObjectOperation_decodesnaps(librados::snap_set_t
*ps
, int *pr
)
554 : psnaps(ps
), prval(pr
) {}
555 void finish(int r
) override
{
557 auto p
= bl
.cbegin();
559 obj_list_snap_response_t resp
;
562 psnaps
->clones
.clear();
563 for (vector
<clone_info
>::iterator ci
= resp
.clones
.begin();
564 ci
!= resp
.clones
.end();
566 librados::clone_info_t clone
;
568 clone
.cloneid
= ci
->cloneid
;
569 clone
.snaps
.reserve(ci
->snaps
.size());
570 clone
.snaps
.insert(clone
.snaps
.end(), ci
->snaps
.begin(),
572 clone
.overlap
= ci
->overlap
;
573 clone
.size
= ci
->size
;
575 psnaps
->clones
.push_back(clone
);
577 psnaps
->seq
= resp
.seq
;
579 } catch (buffer::error
& e
) {
586 void getxattrs(std::map
<std::string
,bufferlist
> *pattrs
, int *prval
) {
587 add_op(CEPH_OSD_OP_GETXATTRS
);
588 if (pattrs
|| prval
) {
589 unsigned p
= ops
.size() - 1;
590 C_ObjectOperation_decodevals
*h
591 = new C_ObjectOperation_decodevals(0, pattrs
, nullptr, prval
);
597 void setxattr(const char *name
, const bufferlist
& bl
) {
598 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
600 void setxattr(const char *name
, const string
& s
) {
603 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
605 void cmpxattr(const char *name
, uint8_t cmp_op
, uint8_t cmp_mode
,
606 const bufferlist
& bl
) {
607 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR
, name
, cmp_op
, cmp_mode
, bl
);
609 void rmxattr(const char *name
) {
611 add_xattr(CEPH_OSD_OP_RMXATTR
, name
, bl
);
613 void setxattrs(map
<string
, bufferlist
>& attrs
) {
616 add_xattr(CEPH_OSD_OP_RESETXATTRS
, 0, bl
.length());
618 void resetxattrs(const char *prefix
, map
<string
, bufferlist
>& attrs
) {
621 add_xattr(CEPH_OSD_OP_RESETXATTRS
, prefix
, bl
);
625 void tmap_update(bufferlist
& bl
) {
626 add_data(CEPH_OSD_OP_TMAPUP
, 0, 0, bl
);
630 void omap_get_keys(const string
&start_after
,
632 std::set
<std::string
> *out_set
,
635 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETKEYS
);
637 encode(start_after
, bl
);
638 encode(max_to_get
, bl
);
639 op
.op
.extent
.offset
= 0;
640 op
.op
.extent
.length
= bl
.length();
641 op
.indata
.claim_append(bl
);
642 if (prval
|| ptruncated
|| out_set
) {
643 unsigned p
= ops
.size() - 1;
644 C_ObjectOperation_decodekeys
*h
=
645 new C_ObjectOperation_decodekeys(max_to_get
, out_set
, ptruncated
, prval
);
652 void omap_get_vals(const string
&start_after
,
653 const string
&filter_prefix
,
655 std::map
<std::string
, bufferlist
> *out_set
,
658 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALS
);
660 encode(start_after
, bl
);
661 encode(max_to_get
, bl
);
662 encode(filter_prefix
, bl
);
663 op
.op
.extent
.offset
= 0;
664 op
.op
.extent
.length
= bl
.length();
665 op
.indata
.claim_append(bl
);
666 if (prval
|| out_set
|| ptruncated
) {
667 unsigned p
= ops
.size() - 1;
668 C_ObjectOperation_decodevals
*h
=
669 new C_ObjectOperation_decodevals(max_to_get
, out_set
, ptruncated
, prval
);
676 void omap_get_vals_by_keys(const std::set
<std::string
> &to_get
,
677 std::map
<std::string
, bufferlist
> *out_set
,
679 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS
);
682 op
.op
.extent
.offset
= 0;
683 op
.op
.extent
.length
= bl
.length();
684 op
.indata
.claim_append(bl
);
685 if (prval
|| out_set
) {
686 unsigned p
= ops
.size() - 1;
687 C_ObjectOperation_decodevals
*h
=
688 new C_ObjectOperation_decodevals(0, out_set
, nullptr, prval
);
695 void omap_cmp(const std::map
<std::string
, pair
<bufferlist
,int> > &assertions
,
697 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAP_CMP
);
699 encode(assertions
, bl
);
700 op
.op
.extent
.offset
= 0;
701 op
.op
.extent
.length
= bl
.length();
702 op
.indata
.claim_append(bl
);
704 unsigned p
= ops
.size() - 1;
709 struct C_ObjectOperation_copyget
: public Context
{
711 object_copy_cursor_t
*cursor
;
713 ceph::real_time
*out_mtime
;
714 std::map
<std::string
,bufferlist
> *out_attrs
;
715 bufferlist
*out_data
, *out_omap_header
, *out_omap_data
;
716 vector
<snapid_t
> *out_snaps
;
717 snapid_t
*out_snap_seq
;
719 uint32_t *out_data_digest
;
720 uint32_t *out_omap_digest
;
721 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
;
722 mempool::osd_pglog::map
<uint32_t, int> *out_reqid_return_codes
;
723 uint64_t *out_truncate_seq
;
724 uint64_t *out_truncate_size
;
726 C_ObjectOperation_copyget(object_copy_cursor_t
*c
,
729 std::map
<std::string
,bufferlist
> *a
,
730 bufferlist
*d
, bufferlist
*oh
,
732 std::vector
<snapid_t
> *osnaps
,
737 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *oreqids
,
738 mempool::osd_pglog::map
<uint32_t, int> *oreqid_return_codes
,
743 out_size(s
), out_mtime(m
),
744 out_attrs(a
), out_data(d
), out_omap_header(oh
),
745 out_omap_data(o
), out_snaps(osnaps
), out_snap_seq(osnap_seq
),
746 out_flags(flags
), out_data_digest(dd
), out_omap_digest(od
),
748 out_reqid_return_codes(oreqid_return_codes
),
749 out_truncate_seq(otseq
),
750 out_truncate_size(otsize
),
752 void finish(int r
) override
{
753 // reqids are copied on ENOENT
754 if (r
< 0 && r
!= -ENOENT
)
757 auto p
= bl
.cbegin();
758 object_copy_data_t copy_reply
;
759 decode(copy_reply
, p
);
762 *out_reqids
= copy_reply
.reqids
;
766 *out_size
= copy_reply
.size
;
768 *out_mtime
= ceph::real_clock::from_ceph_timespec(copy_reply
.mtime
);
770 *out_attrs
= copy_reply
.attrs
;
772 out_data
->claim_append(copy_reply
.data
);
774 out_omap_header
->claim_append(copy_reply
.omap_header
);
776 *out_omap_data
= copy_reply
.omap_data
;
778 *out_snaps
= copy_reply
.snaps
;
780 *out_snap_seq
= copy_reply
.snap_seq
;
782 *out_flags
= copy_reply
.flags
;
784 *out_data_digest
= copy_reply
.data_digest
;
786 *out_omap_digest
= copy_reply
.omap_digest
;
788 *out_reqids
= copy_reply
.reqids
;
789 if (out_reqid_return_codes
)
790 *out_reqid_return_codes
= copy_reply
.reqid_return_codes
;
791 if (out_truncate_seq
)
792 *out_truncate_seq
= copy_reply
.truncate_seq
;
793 if (out_truncate_size
)
794 *out_truncate_size
= copy_reply
.truncate_size
;
795 *cursor
= copy_reply
.cursor
;
796 } catch (buffer::error
& e
) {
803 void copy_get(object_copy_cursor_t
*cursor
,
806 ceph::real_time
*out_mtime
,
807 std::map
<std::string
,bufferlist
> *out_attrs
,
808 bufferlist
*out_data
,
809 bufferlist
*out_omap_header
,
810 bufferlist
*out_omap_data
,
811 vector
<snapid_t
> *out_snaps
,
812 snapid_t
*out_snap_seq
,
814 uint32_t *out_data_digest
,
815 uint32_t *out_omap_digest
,
816 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
,
817 mempool::osd_pglog::map
<uint32_t, int> *out_reqid_return_codes
,
818 uint64_t *truncate_seq
,
819 uint64_t *truncate_size
,
821 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_GET
);
822 osd_op
.op
.copy_get
.max
= max
;
823 encode(*cursor
, osd_op
.indata
);
824 encode(max
, osd_op
.indata
);
825 unsigned p
= ops
.size() - 1;
827 C_ObjectOperation_copyget
*h
=
828 new C_ObjectOperation_copyget(cursor
, out_size
, out_mtime
,
829 out_attrs
, out_data
, out_omap_header
,
830 out_omap_data
, out_snaps
, out_snap_seq
,
831 out_flags
, out_data_digest
,
832 out_omap_digest
, out_reqids
,
833 out_reqid_return_codes
, truncate_seq
,
834 truncate_size
, prval
);
840 add_op(CEPH_OSD_OP_UNDIRTY
);
843 struct C_ObjectOperation_isdirty
: public Context
{
847 C_ObjectOperation_isdirty(bool *p
, int *r
)
848 : pisdirty(p
), prval(r
) {}
849 void finish(int r
) override
{
853 auto p
= bl
.cbegin();
858 } catch (buffer::error
& e
) {
865 void is_dirty(bool *pisdirty
, int *prval
) {
866 add_op(CEPH_OSD_OP_ISDIRTY
);
867 unsigned p
= ops
.size() - 1;
869 C_ObjectOperation_isdirty
*h
=
870 new C_ObjectOperation_isdirty(pisdirty
, prval
);
875 struct C_ObjectOperation_hit_set_ls
: public Context
{
877 std::list
< std::pair
<time_t, time_t> > *ptls
;
878 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > *putls
;
880 C_ObjectOperation_hit_set_ls(std::list
< std::pair
<time_t, time_t> > *t
,
881 std::list
< std::pair
<ceph::real_time
,
882 ceph::real_time
> > *ut
,
884 : ptls(t
), putls(ut
), prval(r
) {}
885 void finish(int r
) override
{
889 auto p
= bl
.cbegin();
890 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > ls
;
894 for (auto p
= ls
.begin(); p
!= ls
.end(); ++p
)
895 // round initial timestamp up to the next full second to
896 // keep this a valid interval.
898 make_pair(ceph::real_clock::to_time_t(
900 // Sadly, no time literals until C++14.
901 std::chrono::seconds(1))),
902 ceph::real_clock::to_time_t(p
->second
)));
906 } catch (buffer::error
& e
) {
915 * list available HitSets.
917 * We will get back a list of time intervals. Note that the most
918 * recent range may have an empty end timestamp if it is still
921 * @param pls [out] list of time intervals
922 * @param prval [out] return value
924 void hit_set_ls(std::list
< std::pair
<time_t, time_t> > *pls
, int *prval
) {
925 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
926 unsigned p
= ops
.size() - 1;
928 C_ObjectOperation_hit_set_ls
*h
=
929 new C_ObjectOperation_hit_set_ls(pls
, NULL
, prval
);
933 void hit_set_ls(std::list
<std::pair
<ceph::real_time
, ceph::real_time
> > *pls
,
935 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
936 unsigned p
= ops
.size() - 1;
938 C_ObjectOperation_hit_set_ls
*h
=
939 new C_ObjectOperation_hit_set_ls(NULL
, pls
, prval
);
947 * Return an encoded HitSet that includes the provided time
950 * @param stamp [in] timestamp
951 * @param pbl [out] target buffer for encoded HitSet
952 * @param prval [out] return value
954 void hit_set_get(ceph::real_time stamp
, bufferlist
*pbl
, int *prval
) {
955 OSDOp
& op
= add_op(CEPH_OSD_OP_PG_HITSET_GET
);
956 op
.op
.hit_set_get
.stamp
= ceph::real_clock::to_ceph_timespec(stamp
);
957 unsigned p
= ops
.size() - 1;
962 void omap_get_header(bufferlist
*bl
, int *prval
) {
963 add_op(CEPH_OSD_OP_OMAPGETHEADER
);
964 unsigned p
= ops
.size() - 1;
969 void omap_set(const map
<string
, bufferlist
> &map
) {
972 add_data(CEPH_OSD_OP_OMAPSETVALS
, 0, bl
.length(), bl
);
975 void omap_set_header(bufferlist
&bl
) {
976 add_data(CEPH_OSD_OP_OMAPSETHEADER
, 0, bl
.length(), bl
);
980 add_op(CEPH_OSD_OP_OMAPCLEAR
);
983 void omap_rm_keys(const std::set
<std::string
> &to_remove
) {
985 encode(to_remove
, bl
);
986 add_data(CEPH_OSD_OP_OMAPRMKEYS
, 0, bl
.length(), bl
);
990 void call(const char *cname
, const char *method
, bufferlist
&indata
) {
991 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, NULL
, NULL
, NULL
);
994 void call(const char *cname
, const char *method
, bufferlist
&indata
,
995 bufferlist
*outdata
, Context
*ctx
, int *prval
) {
996 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, outdata
, ctx
, prval
);
1000 void watch(uint64_t cookie
, __u8 op
, uint32_t timeout
= 0) {
1001 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_WATCH
);
1002 osd_op
.op
.watch
.cookie
= cookie
;
1003 osd_op
.op
.watch
.op
= op
;
1004 osd_op
.op
.watch
.timeout
= timeout
;
1007 void notify(uint64_t cookie
, uint32_t prot_ver
, uint32_t timeout
,
1008 bufferlist
&bl
, bufferlist
*inbl
) {
1009 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY
);
1010 osd_op
.op
.notify
.cookie
= cookie
;
1011 encode(prot_ver
, *inbl
);
1012 encode(timeout
, *inbl
);
1014 osd_op
.indata
.append(*inbl
);
1017 void notify_ack(uint64_t notify_id
, uint64_t cookie
,
1018 bufferlist
& reply_bl
) {
1019 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY_ACK
);
1021 encode(notify_id
, bl
);
1023 encode(reply_bl
, bl
);
1024 osd_op
.indata
.append(bl
);
1027 void list_watchers(list
<obj_watch_t
> *out
,
1029 (void)add_op(CEPH_OSD_OP_LIST_WATCHERS
);
1031 unsigned p
= ops
.size() - 1;
1032 C_ObjectOperation_decodewatchers
*h
=
1033 new C_ObjectOperation_decodewatchers(out
, prval
);
1036 out_rval
[p
] = prval
;
1040 void list_snaps(librados::snap_set_t
*out
, int *prval
) {
1041 (void)add_op(CEPH_OSD_OP_LIST_SNAPS
);
1043 unsigned p
= ops
.size() - 1;
1044 C_ObjectOperation_decodesnaps
*h
=
1045 new C_ObjectOperation_decodesnaps(out
, prval
);
1048 out_rval
[p
] = prval
;
1052 void assert_version(uint64_t ver
) {
1053 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ASSERT_VER
);
1054 osd_op
.op
.assert_ver
.ver
= ver
;
1057 void cmpxattr(const char *name
, const bufferlist
& val
,
1059 add_xattr(CEPH_OSD_OP_CMPXATTR
, name
, val
);
1060 OSDOp
& o
= *ops
.rbegin();
1061 o
.op
.xattr
.cmp_op
= op
;
1062 o
.op
.xattr
.cmp_mode
= mode
;
1065 void rollback(uint64_t snapid
) {
1066 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ROLLBACK
);
1067 osd_op
.op
.snap
.snapid
= snapid
;
1070 void copy_from(object_t src
, snapid_t snapid
, object_locator_t src_oloc
,
1071 version_t src_version
, unsigned flags
,
1072 unsigned src_fadvise_flags
) {
1073 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_FROM
);
1074 osd_op
.op
.copy_from
.snapid
= snapid
;
1075 osd_op
.op
.copy_from
.src_version
= src_version
;
1076 osd_op
.op
.copy_from
.flags
= flags
;
1077 osd_op
.op
.copy_from
.src_fadvise_flags
= src_fadvise_flags
;
1078 encode(src
, osd_op
.indata
);
1079 encode(src_oloc
, osd_op
.indata
);
1083 * writeback content to backing tier
1085 * If object is marked dirty in the cache tier, write back content
1086 * to backing tier. If the object is clean this is a no-op.
1088 * If writeback races with an update, the update will block.
1090 * use with IGNORE_CACHE to avoid triggering promote.
1092 void cache_flush() {
1093 add_op(CEPH_OSD_OP_CACHE_FLUSH
);
1097 * writeback content to backing tier
1099 * If object is marked dirty in the cache tier, write back content
1100 * to backing tier. If the object is clean this is a no-op.
1102 * If writeback races with an update, return EAGAIN. Requires that
1103 * the SKIPRWLOCKS flag be set.
1105 * use with IGNORE_CACHE to avoid triggering promote.
1107 void cache_try_flush() {
1108 add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH
);
1112 * evict object from cache tier
1114 * If object is marked clean, remove the object from the cache tier.
1115 * Otherwise, return EBUSY.
1117 * use with IGNORE_CACHE to avoid triggering promote.
1119 void cache_evict() {
1120 add_op(CEPH_OSD_OP_CACHE_EVICT
);
1126 void set_redirect(object_t tgt
, snapid_t snapid
, object_locator_t tgt_oloc
,
1127 version_t tgt_version
, int flag
) {
1128 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_SET_REDIRECT
);
1129 osd_op
.op
.copy_from
.snapid
= snapid
;
1130 osd_op
.op
.copy_from
.src_version
= tgt_version
;
1131 encode(tgt
, osd_op
.indata
);
1132 encode(tgt_oloc
, osd_op
.indata
);
1133 set_last_op_flags(flag
);
1136 void set_chunk(uint64_t src_offset
, uint64_t src_length
, object_locator_t tgt_oloc
,
1137 object_t tgt_oid
, uint64_t tgt_offset
, int flag
) {
1138 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_SET_CHUNK
);
1139 encode(src_offset
, osd_op
.indata
);
1140 encode(src_length
, osd_op
.indata
);
1141 encode(tgt_oloc
, osd_op
.indata
);
1142 encode(tgt_oid
, osd_op
.indata
);
1143 encode(tgt_offset
, osd_op
.indata
);
1144 set_last_op_flags(flag
);
1147 void tier_promote() {
1148 add_op(CEPH_OSD_OP_TIER_PROMOTE
);
1151 void unset_manifest() {
1152 add_op(CEPH_OSD_OP_UNSET_MANIFEST
);
1155 void set_alloc_hint(uint64_t expected_object_size
,
1156 uint64_t expected_write_size
,
1158 add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT
, expected_object_size
,
1159 expected_write_size
, flags
);
1161 // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1162 // not worth a feature bit. Set FAILOK per-op flag to make
1163 // sure older osds don't trip over an unsupported opcode.
1164 set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1167 void dup(vector
<OSDOp
>& sops
) {
1169 out_bl
.resize(sops
.size());
1170 out_handler
.resize(sops
.size());
1171 out_rval
.resize(sops
.size());
1172 for (uint32_t i
= 0; i
< sops
.size(); i
++) {
1173 out_bl
[i
] = &sops
[i
].outdata
;
1174 out_handler
[i
] = NULL
;
1175 out_rval
[i
] = &sops
[i
].rval
;
1180 * Pin/unpin an object in cache tier
1183 add_op(CEPH_OSD_OP_CACHE_PIN
);
1186 void cache_unpin() {
1187 add_op(CEPH_OSD_OP_CACHE_UNPIN
);
1195 class Objecter
: public md_config_obs_t
, public Dispatcher
{
1197 // config observer bits
1198 const char** get_tracked_conf_keys() const override
;
1199 void handle_conf_change(const ConfigProxy
& conf
,
1200 const std::set
<std::string
> &changed
) override
;
1203 Messenger
*messenger
;
1206 ZTracer::Endpoint trace_endpoint
;
1210 using Dispatcher::cct
;
1211 std::multimap
<string
,string
> crush_location
;
1213 std::atomic
<bool> initialized
{false};
1216 std::atomic
<uint64_t> last_tid
{0};
1217 std::atomic
<unsigned> inflight_ops
{0};
1218 std::atomic
<int> client_inc
{-1};
1219 uint64_t max_linger_id
;
1220 std::atomic
<unsigned> num_in_flight
{0};
1221 std::atomic
<int> global_op_flags
{0}; // flags which are applied to each IO op
1222 bool keep_balanced_budget
;
1223 bool honor_osdmap_full
;
1224 bool osdmap_full_try
;
1226 // If this is true, accumulate a set of blacklisted entities
1227 // to be drained by consume_blacklist_events.
1228 bool blacklist_events_enabled
;
1229 std::set
<entity_addr_t
> blacklist_events
;
1232 void maybe_request_map();
1234 void enable_blacklist_events();
1237 void _maybe_request_map();
1239 version_t last_seen_osdmap_version
;
1240 version_t last_seen_pgmap_version
;
1242 mutable std::shared_mutex rwlock
;
1243 using lock_guard
= std::lock_guard
<decltype(rwlock
)>;
1244 using unique_lock
= std::unique_lock
<decltype(rwlock
)>;
1245 using shared_lock
= boost::shared_lock
<decltype(rwlock
)>;
1246 using shunique_lock
= ceph::shunique_lock
<decltype(rwlock
)>;
1247 ceph::timer
<ceph::coarse_mono_clock
> timer
;
1249 PerfCounters
*logger
;
1251 uint64_t tick_event
;
1255 void update_crush_location();
1257 class RequestStateHook
;
1259 RequestStateHook
*m_request_state_hook
;
1262 /*** track pending operations ***/
1267 struct op_target_t
{
1270 epoch_t epoch
= 0; ///< latest epoch we calculated the mapping
1273 object_locator_t base_oloc
;
1274 object_t target_oid
;
1275 object_locator_t target_oloc
;
1277 ///< true if we are directed at base_pgid, not base_oid
1278 bool precalc_pgid
= false;
1280 ///< true if we have ever mapped to a valid pool
1281 bool pool_ever_existed
= false;
1283 ///< explcit pg target, if any
1286 pg_t pgid
; ///< last (raw) pg we mapped to
1287 spg_t actual_pgid
; ///< last (actual) spg_t we mapped to
1288 unsigned pg_num
= 0; ///< last pg_num we mapped to
1289 unsigned pg_num_mask
= 0; ///< last pg_num_mask we mapped to
1290 unsigned pg_num_pending
= 0; ///< last pg_num we mapped to
1291 vector
<int> up
; ///< set of up osds for last pg we mapped to
1292 vector
<int> acting
; ///< set of acting osds for last pg we mapped to
1293 int up_primary
= -1; ///< last up_primary we mapped to
1294 int acting_primary
= -1; ///< last acting_primary we mapped to
1295 int size
= -1; ///< the size of the pool when were were last mapped
1296 int min_size
= -1; ///< the min size of the pool when were were last mapped
1297 bool sort_bitwise
= false; ///< whether the hobject_t sort order is bitwise
1298 bool recovery_deletes
= false; ///< whether the deletes are performed during recovery instead of peering
1300 bool used_replica
= false;
1301 bool paused
= false;
1303 int osd
= -1; ///< the final target osd, or -1
1305 epoch_t last_force_resend
= 0;
1307 op_target_t(object_t oid
, object_locator_t oloc
, int flags
)
1313 explicit op_target_t(pg_t pgid
)
1314 : base_oloc(pgid
.pool(), pgid
.ps()),
1319 op_target_t() = default;
1321 hobject_t
get_hobj() {
1322 return hobject_t(target_oid
,
1325 target_oloc
.hash
>= 0 ? target_oloc
.hash
: pgid
.ps(),
1327 target_oloc
.nspace
);
1330 bool contained_by(const hobject_t
& begin
, const hobject_t
& end
) {
1331 hobject_t h
= get_hobj();
1332 int r
= cmp(h
, begin
);
1333 return r
== 0 || (r
> 0 && h
< end
);
1336 void dump(Formatter
*f
) const;
1339 struct Op
: public RefCountedObject
{
1340 OSDSession
*session
;
1345 ConnectionRef con
; // for rx buffer only
1346 uint64_t features
; // explicitly specified op features
1352 ceph::real_time mtime
;
1355 vector
<bufferlist
*> out_bl
;
1356 vector
<Context
*> out_handler
;
1357 vector
<int*> out_rval
;
1367 epoch_t
*reply_epoch
;
1369 ceph::coarse_mono_time stamp
;
1371 epoch_t map_dne_bound
;
1375 /// true if we should resend this message on failure
1378 /// true if the throttle budget is get/put on a series of OPs,
1379 /// instead of per OP basis, when this flag is set, the budget is
1380 /// acquired before sending the very first OP of the series and
1381 /// released upon receiving the last OP reply.
1386 osd_reqid_t reqid
; // explicitly setting reqid
1387 ZTracer::Trace trace
;
1389 Op(const object_t
& o
, const object_locator_t
& ol
, vector
<OSDOp
>& op
,
1390 int f
, Context
*fin
, version_t
*ov
, int *offset
= NULL
,
1391 ZTracer::Trace
*parent_trace
= nullptr) :
1392 session(NULL
), incarnation(0),
1395 features(CEPH_FEATURES_SUPPORTED_DEFAULT
),
1396 snapid(CEPH_NOSNAP
),
1407 should_resend(true),
1408 ctx_budgeted(false),
1409 data_offset(offset
) {
1412 /* initialize out_* to match op vector */
1413 out_bl
.resize(ops
.size());
1414 out_rval
.resize(ops
.size());
1415 out_handler
.resize(ops
.size());
1416 for (unsigned i
= 0; i
< ops
.size(); i
++) {
1418 out_handler
[i
] = NULL
;
1422 if (target
.base_oloc
.key
== o
)
1423 target
.base_oloc
.key
.clear();
1425 if (parent_trace
&& parent_trace
->valid()) {
1426 trace
.init("op", nullptr, parent_trace
);
1427 trace
.event("start");
1431 bool operator<(const Op
& other
) const {
1432 return tid
< other
.tid
;
1435 bool respects_full() const {
1437 (target
.flags
& (CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_RWORDERED
)) &&
1438 !(target
.flags
& (CEPH_OSD_FLAG_FULL_TRY
| CEPH_OSD_FLAG_FULL_FORCE
));
1443 while (!out_handler
.empty()) {
1444 delete out_handler
.back();
1445 out_handler
.pop_back();
1447 trace
.event("finish");
1451 struct C_Op_Map_Latest
: public Context
{
1455 C_Op_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1457 void finish(int r
) override
;
1460 struct C_Command_Map_Latest
: public Context
{
1464 C_Command_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1466 void finish(int r
) override
;
1469 struct C_Stat
: public Context
{
1472 ceph::real_time
*pmtime
;
1474 C_Stat(uint64_t *ps
, ceph::real_time
*pm
, Context
*c
) :
1475 psize(ps
), pmtime(pm
), fin(c
) {}
1476 void finish(int r
) override
{
1478 auto p
= bl
.cbegin();
1492 struct C_GetAttrs
: public Context
{
1494 map
<string
,bufferlist
>& attrset
;
1496 C_GetAttrs(map
<string
, bufferlist
>& set
, Context
*c
) : attrset(set
),
1498 void finish(int r
) override
{
1500 auto p
= bl
.cbegin();
1508 // Pools and statistics
1509 struct NListContext
{
1510 collection_list_handle_t pos
;
1512 // these are for !sortbitwise compat only
1514 int starting_pg_num
= 0;
1515 bool sort_bitwise
= false;
1517 bool at_end_of_pool
= false; ///< publicly visible end flag
1519 int64_t pool_id
= -1;
1520 int pool_snap_seq
= 0;
1521 uint64_t max_entries
= 0;
1524 bufferlist bl
; // raw data read to here
1525 std::list
<librados::ListObjectImpl
> list
;
1529 bufferlist extra_info
;
1531 // The budget associated with this context, once it is set (>= 0),
1532 // the budget is not get/released on OP basis, instead the budget
1533 // is acquired before sending the first OP and released upon receiving
1534 // the last op reply.
1535 int ctx_budget
= -1;
1537 bool at_end() const {
1538 return at_end_of_pool
;
1541 uint32_t get_pg_hash_position() const {
1542 return pos
.get_hash();
1546 struct C_NList
: public Context
{
1547 NListContext
*list_context
;
1548 Context
*final_finish
;
1551 C_NList(NListContext
*lc
, Context
* finish
, Objecter
*ob
) :
1552 list_context(lc
), final_finish(finish
), objecter(ob
), epoch(0) {}
1553 void finish(int r
) override
{
1555 objecter
->_nlist_reply(list_context
, r
, final_finish
, epoch
);
1557 final_finish
->complete(r
);
1566 map
<string
,pool_stat_t
> *pool_stats
;
1571 ceph::coarse_mono_time last_submit
;
1576 struct ceph_statfs
*stats
;
1577 boost::optional
<int64_t> data_pool
;
1581 ceph::coarse_mono_time last_submit
;
1595 ceph::coarse_mono_time last_submit
;
1596 PoolOp() : tid(0), pool(0), onfinish(NULL
), ontimeout(0), pool_op(0),
1597 crush_rule(0), snapid(0), blp(NULL
) {}
1600 // -- osd commands --
1601 struct CommandOp
: public RefCountedObject
{
1602 OSDSession
*session
= nullptr;
1606 bufferlist
*poutbl
= nullptr;
1607 string
*prs
= nullptr;
1609 // target_osd == -1 means target_pg is valid
1610 const int target_osd
= -1;
1611 const pg_t target_pg
;
1615 epoch_t map_dne_bound
= 0;
1616 int map_check_error
= 0; // error to return if map check fails
1617 const char *map_check_error_str
= nullptr;
1619 Context
*onfinish
= nullptr;
1620 uint64_t ontimeout
= 0;
1621 ceph::coarse_mono_time last_submit
;
1625 const vector
<string
> &cmd
,
1634 target_osd(target_osd
),
1635 onfinish(onfinish
) {}
1639 const vector
<string
> &cmd
,
1650 onfinish(onfinish
) {}
1654 void submit_command(CommandOp
*c
, ceph_tid_t
*ptid
);
1655 int _calc_command_target(CommandOp
*c
, shunique_lock
&sul
);
1656 void _assign_command_session(CommandOp
*c
, shunique_lock
&sul
);
1657 void _send_command(CommandOp
*c
);
1658 int command_op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
1659 void _finish_command(CommandOp
*c
, int r
, string rs
);
1660 void handle_command_reply(MCommandReply
*m
);
1663 // -- lingering ops --
1665 struct WatchContext
{
1666 // this simply mirrors librados WatchCtx2
1667 virtual void handle_notify(uint64_t notify_id
,
1669 uint64_t notifier_id
,
1670 bufferlist
& bl
) = 0;
1671 virtual void handle_error(uint64_t cookie
, int err
) = 0;
1672 virtual ~WatchContext() {}
1675 struct LingerOp
: public RefCountedObject
{
1682 ceph::real_time mtime
;
1690 ceph::coarse_mono_time watch_valid_thru
; ///< send time for last acked ping
1691 int last_error
; ///< error from last failed ping|reconnect, if any
1692 std::shared_mutex watch_lock
;
1693 using lock_guard
= std::unique_lock
<decltype(watch_lock
)>;
1694 using unique_lock
= std::unique_lock
<decltype(watch_lock
)>;
1695 using shared_lock
= boost::shared_lock
<decltype(watch_lock
)>;
1696 using shunique_lock
= ceph::shunique_lock
<decltype(watch_lock
)>;
1698 // queue of pending async operations, with the timestamp of
1699 // when they were queued.
1700 list
<ceph::coarse_mono_time
> watch_pending_async
;
1702 uint32_t register_gen
;
1705 Context
*on_reg_commit
;
1707 // we trigger these from an async finisher
1708 Context
*on_notify_finish
;
1709 bufferlist
*notify_result_bl
;
1712 WatchContext
*watch_context
;
1714 OSDSession
*session
;
1718 ceph_tid_t register_tid
;
1719 ceph_tid_t ping_tid
;
1720 epoch_t map_dne_bound
;
1722 void _queued_async() {
1723 // watch_lock ust be locked unique
1724 watch_pending_async
.push_back(ceph::coarse_mono_clock::now());
1726 void finished_async() {
1727 unique_lock
l(watch_lock
);
1728 ceph_assert(!watch_pending_async
.empty());
1729 watch_pending_async
.pop_front();
1732 explicit LingerOp(Objecter
*o
) : linger_id(0),
1733 target(object_t(), object_locator_t(), 0),
1734 snap(CEPH_NOSNAP
), poutbl(NULL
), pobjver(NULL
),
1735 is_watch(false), last_error(0),
1739 on_reg_commit(NULL
),
1740 on_notify_finish(NULL
),
1741 notify_result_bl(NULL
),
1743 watch_context(NULL
),
1751 const LingerOp
&operator=(const LingerOp
& r
) = delete;
1752 LingerOp(const LingerOp
& o
) = delete;
1754 uint64_t get_cookie() {
1755 return reinterpret_cast<uint64_t>(this);
1759 ~LingerOp() override
{
1760 delete watch_context
;
1764 struct C_Linger_Commit
: public Context
{
1767 bufferlist outbl
; // used for notify only
1768 C_Linger_Commit(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1771 ~C_Linger_Commit() override
{
1774 void finish(int r
) override
{
1775 objecter
->_linger_commit(info
, r
, outbl
);
1779 struct C_Linger_Reconnect
: public Context
{
1782 C_Linger_Reconnect(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1785 ~C_Linger_Reconnect() override
{
1788 void finish(int r
) override
{
1789 objecter
->_linger_reconnect(info
, r
);
1793 struct C_Linger_Ping
: public Context
{
1796 ceph::coarse_mono_time sent
;
1797 uint32_t register_gen
;
1798 C_Linger_Ping(Objecter
*o
, LingerOp
*l
)
1799 : objecter(o
), info(l
), register_gen(info
->register_gen
) {
1802 ~C_Linger_Ping() override
{
1805 void finish(int r
) override
{
1806 objecter
->_linger_ping(info
, r
, sent
, register_gen
);
1810 struct C_Linger_Map_Latest
: public Context
{
1814 C_Linger_Map_Latest(Objecter
*o
, uint64_t id
) :
1815 objecter(o
), linger_id(id
), latest(0) {}
1816 void finish(int r
) override
;
1819 // -- osd sessions --
1823 hobject_t begin
, end
;
1826 struct OSDSession
: public RefCountedObject
{
1827 std::shared_mutex lock
;
1828 using lock_guard
= std::lock_guard
<decltype(lock
)>;
1829 using unique_lock
= std::unique_lock
<decltype(lock
)>;
1830 using shared_lock
= boost::shared_lock
<decltype(lock
)>;
1831 using shunique_lock
= ceph::shunique_lock
<decltype(lock
)>;
1834 map
<ceph_tid_t
,Op
*> ops
;
1835 map
<uint64_t, LingerOp
*> linger_ops
;
1836 map
<ceph_tid_t
,CommandOp
*> command_ops
;
1839 map
<spg_t
,map
<hobject_t
,OSDBackoff
>> backoffs
;
1840 map
<uint64_t,OSDBackoff
*> backoffs_by_id
;
1846 std::unique_ptr
<std::mutex
[]> completion_locks
;
1847 using unique_completion_lock
= std::unique_lock
<
1848 decltype(completion_locks
)::element_type
>;
1851 OSDSession(CephContext
*cct
, int o
) :
1852 osd(o
), incarnation(0), con(NULL
),
1853 num_locks(cct
->_conf
->objecter_completion_locks_per_session
),
1854 completion_locks(new std::mutex
[num_locks
]) {}
1856 ~OSDSession() override
;
1858 bool is_homeless() { return (osd
== -1); }
1860 unique_completion_lock
get_lock(object_t
& oid
);
1862 map
<int,OSDSession
*> osd_sessions
;
1864 bool osdmap_full_flag() const;
1865 bool osdmap_pool_full(const int64_t pool_id
) const;
1870 * Test pg_pool_t::FLAG_FULL on a pool
1872 * @return true if the pool exists and has the flag set, or
1873 * the global full flag is set, else false
1875 bool _osdmap_pool_full(const int64_t pool_id
) const;
1876 bool _osdmap_pool_full(const pg_pool_t
&p
) const;
1877 void update_pool_full_map(map
<int64_t, bool>& pool_full_map
);
1879 map
<uint64_t, LingerOp
*> linger_ops
;
1880 // we use this just to confirm a cookie is valid before dereferencing the ptr
1881 set
<LingerOp
*> linger_ops_set
;
1883 map
<ceph_tid_t
,PoolStatOp
*> poolstat_ops
;
1884 map
<ceph_tid_t
,StatfsOp
*> statfs_ops
;
1885 map
<ceph_tid_t
,PoolOp
*> pool_ops
;
1886 std::atomic
<unsigned> num_homeless_ops
{0};
1888 OSDSession
*homeless_session
;
1890 // ops waiting for an osdmap with a new pool or confirmation that
1891 // the pool does not exist (may be expanded to other uses later)
1892 map
<uint64_t, LingerOp
*> check_latest_map_lingers
;
1893 map
<ceph_tid_t
, Op
*> check_latest_map_ops
;
1894 map
<ceph_tid_t
, CommandOp
*> check_latest_map_commands
;
1896 map
<epoch_t
,list
< pair
<Context
*, int> > > waiting_for_map
;
1898 ceph::timespan mon_timeout
;
1899 ceph::timespan osd_timeout
;
1901 MOSDOp
*_prepare_osd_op(Op
*op
);
1902 void _send_op(Op
*op
);
1903 void _send_op_account(Op
*op
);
1904 void _cancel_linger_op(Op
*op
);
1905 void _finish_op(Op
*op
, int r
);
1906 static bool is_pg_changed(
1908 const vector
<int>& oldacting
,
1910 const vector
<int>& newacting
,
1911 bool any_change
=false);
1912 enum recalc_op_target_result
{
1913 RECALC_OP_TARGET_NO_ACTION
= 0,
1914 RECALC_OP_TARGET_NEED_RESEND
,
1915 RECALC_OP_TARGET_POOL_DNE
,
1916 RECALC_OP_TARGET_OSD_DNE
,
1917 RECALC_OP_TARGET_OSD_DOWN
,
1919 bool _osdmap_full_flag() const;
1920 bool _osdmap_has_pool_full() const;
1922 const mempool::osdmap::map
<int64_t, OSDMap::snap_interval_set_t
>& new_removed_snaps
,
1925 bool target_should_be_paused(op_target_t
*op
);
1926 int _calc_target(op_target_t
*t
, Connection
*con
,
1927 bool any_change
= false);
1928 int _map_session(op_target_t
*op
, OSDSession
**s
,
1931 void _session_op_assign(OSDSession
*s
, Op
*op
);
1932 void _session_op_remove(OSDSession
*s
, Op
*op
);
1933 void _session_linger_op_assign(OSDSession
*to
, LingerOp
*op
);
1934 void _session_linger_op_remove(OSDSession
*from
, LingerOp
*op
);
1935 void _session_command_op_assign(OSDSession
*to
, CommandOp
*op
);
1936 void _session_command_op_remove(OSDSession
*from
, CommandOp
*op
);
1938 int _assign_op_target_session(Op
*op
, shunique_lock
& lc
,
1939 bool src_session_locked
,
1940 bool dst_session_locked
);
1941 int _recalc_linger_op_target(LingerOp
*op
, shunique_lock
& lc
);
1943 void _linger_submit(LingerOp
*info
, shunique_lock
& sul
);
1944 void _send_linger(LingerOp
*info
, shunique_lock
& sul
);
1945 void _linger_commit(LingerOp
*info
, int r
, bufferlist
& outbl
);
1946 void _linger_reconnect(LingerOp
*info
, int r
);
1947 void _send_linger_ping(LingerOp
*info
);
1948 void _linger_ping(LingerOp
*info
, int r
, ceph::coarse_mono_time sent
,
1949 uint32_t register_gen
);
1950 int _normalize_watch_error(int r
);
1952 friend class C_DoWatchError
;
1954 void linger_callback_flush(Context
*ctx
) {
1955 finisher
->queue(ctx
);
1959 void _check_op_pool_dne(Op
*op
, unique_lock
*sl
);
1960 void _send_op_map_check(Op
*op
);
1961 void _op_cancel_map_check(Op
*op
);
1962 void _check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
);
1963 void _send_linger_map_check(LingerOp
*op
);
1964 void _linger_cancel_map_check(LingerOp
*op
);
1965 void _check_command_map_dne(CommandOp
*op
);
1966 void _send_command_map_check(CommandOp
*op
);
1967 void _command_cancel_map_check(CommandOp
*op
);
1969 void _kick_requests(OSDSession
*session
, map
<uint64_t, LingerOp
*>& lresend
);
1970 void _linger_ops_resend(map
<uint64_t, LingerOp
*>& lresend
, unique_lock
& ul
);
1972 int _get_session(int osd
, OSDSession
**session
, shunique_lock
& sul
);
1973 void put_session(OSDSession
*s
);
1974 void get_session(OSDSession
*s
);
1975 void _reopen_session(OSDSession
*session
);
1976 void close_session(OSDSession
*session
);
1978 void _nlist_reply(NListContext
*list_context
, int r
, Context
*final_finish
,
1979 epoch_t reply_epoch
);
1981 void resend_mon_ops();
1984 * handle a budget for in-flight ops
1985 * budget is taken whenever an op goes into the ops map
1986 * and returned whenever an op is removed from the map
1987 * If throttle_op needs to throttle it will unlock client_lock.
1989 int calc_op_budget(const vector
<OSDOp
>& ops
);
1990 void _throttle_op(Op
*op
, shunique_lock
& sul
, int op_size
= 0);
1991 int _take_op_budget(Op
*op
, shunique_lock
& sul
) {
1992 ceph_assert(sul
&& sul
.mutex() == &rwlock
);
1993 int op_budget
= calc_op_budget(op
->ops
);
1994 if (keep_balanced_budget
) {
1995 _throttle_op(op
, sul
, op_budget
);
1996 } else { // update take_linger_budget to match this!
1997 op_throttle_bytes
.take(op_budget
);
1998 op_throttle_ops
.take(1);
2000 op
->budget
= op_budget
;
2003 int take_linger_budget(LingerOp
*info
);
2004 friend class WatchContext
; // to invoke put_up_budget_bytes
2005 void put_op_budget_bytes(int op_budget
) {
2006 ceph_assert(op_budget
>= 0);
2007 op_throttle_bytes
.put(op_budget
);
2008 op_throttle_ops
.put(1);
2010 void put_nlist_context_budget(NListContext
*list_context
);
2011 Throttle op_throttle_bytes
, op_throttle_ops
;
2014 Objecter(CephContext
*cct_
, Messenger
*m
, MonClient
*mc
,
2017 double osd_timeout
) :
2018 Dispatcher(cct_
), messenger(m
), monc(mc
), finisher(fin
),
2019 trace_endpoint("0.0.0.0", 0, "Objecter"),
2022 keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
2023 blacklist_events_enabled(false),
2024 last_seen_osdmap_version(0), last_seen_pgmap_version(0),
2025 logger(NULL
), tick_event(0), m_request_state_hook(NULL
),
2026 homeless_session(new OSDSession(cct
, -1)),
2027 mon_timeout(ceph::make_timespan(mon_timeout
)),
2028 osd_timeout(ceph::make_timespan(osd_timeout
)),
2029 op_throttle_bytes(cct
, "objecter_bytes",
2030 cct
->_conf
->objecter_inflight_op_bytes
),
2031 op_throttle_ops(cct
, "objecter_ops", cct
->_conf
->objecter_inflight_ops
),
2033 retry_writes_after_first_reply(cct
->_conf
->objecter_retry_writes_after_first_reply
)
2035 ~Objecter() override
;
2038 void start(const OSDMap
*o
= nullptr);
2041 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2042 // whatever functionality you want to use the OSDMap in a lambda like:
2044 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2048 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2050 // Do not call into something that will try to lock the OSDMap from
2051 // here or you will have great woe and misery.
2053 template<typename Callback
, typename
...Args
>
2054 auto with_osdmap(Callback
&& cb
, Args
&&... args
) const ->
2055 decltype(cb(*osdmap
, std::forward
<Args
>(args
)...)) {
2056 shared_lock
l(rwlock
);
2057 return std::forward
<Callback
>(cb
)(*osdmap
, std::forward
<Args
>(args
)...);
2062 * Tell the objecter to throttle outgoing ops according to its
2063 * budget (in _conf). If you do this, ops can block, in
2064 * which case it will unlock client_lock and sleep until
2065 * incoming messages reduce the used budget low enough for
2066 * the ops to continue going; then it will lock client_lock again.
2068 void set_balanced_budget() { keep_balanced_budget
= true; }
2069 void unset_balanced_budget() { keep_balanced_budget
= false; }
2071 void set_honor_osdmap_full() { honor_osdmap_full
= true; }
2072 void unset_honor_osdmap_full() { honor_osdmap_full
= false; }
2074 void set_osdmap_full_try() { osdmap_full_try
= true; }
2075 void unset_osdmap_full_try() { osdmap_full_try
= false; }
2077 void _scan_requests(
2081 map
<int64_t, bool> *pool_full_map
,
2082 map
<ceph_tid_t
, Op
*>& need_resend
,
2083 list
<LingerOp
*>& need_resend_linger
,
2084 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
2086 const mempool::osdmap::map
<int64_t,OSDMap::snap_interval_set_t
> *gap_removed_snaps
);
2088 int64_t get_object_hash_position(int64_t pool
, const string
& key
,
2090 int64_t get_object_pg_hash_position(int64_t pool
, const string
& key
,
2095 bool ms_dispatch(Message
*m
) override
;
2096 bool ms_can_fast_dispatch_any() const override
{
2099 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2100 switch (m
->get_type()) {
2101 case CEPH_MSG_OSD_OPREPLY
:
2102 case CEPH_MSG_WATCH_NOTIFY
:
2108 void ms_fast_dispatch(Message
*m
) override
{
2109 if (!ms_dispatch(m
)) {
2114 void handle_osd_op_reply(class MOSDOpReply
*m
);
2115 void handle_osd_backoff(class MOSDBackoff
*m
);
2116 void handle_watch_notify(class MWatchNotify
*m
);
2117 void handle_osd_map(class MOSDMap
*m
);
2118 void wait_for_osd_map();
2121 * Get list of entities blacklisted since this was last called,
2122 * and reset the list.
2124 * Uses a std::set because typical use case is to compare some
2125 * other list of clients to see which overlap with the blacklisted
2129 void consume_blacklist_events(std::set
<entity_addr_t
> *events
);
2131 int pool_snap_by_name(int64_t poolid
,
2132 const char *snap_name
,
2133 snapid_t
*snap
) const;
2134 int pool_snap_get_info(int64_t poolid
, snapid_t snap
,
2135 pool_snap_info_t
*info
) const;
2136 int pool_snap_list(int64_t poolid
, vector
<uint64_t> *snaps
);
2139 void emit_blacklist_events(const OSDMap::Incremental
&inc
);
2140 void emit_blacklist_events(const OSDMap
&old_osd_map
,
2141 const OSDMap
&new_osd_map
);
2144 void _op_submit(Op
*op
, shunique_lock
& lc
, ceph_tid_t
*ptid
);
2145 void _op_submit_with_budget(Op
*op
, shunique_lock
& lc
,
2147 int *ctx_budget
= NULL
);
2150 void op_submit(Op
*op
, ceph_tid_t
*ptid
= NULL
, int *ctx_budget
= NULL
);
2152 shared_lock
l(rwlock
);
2153 return !((!inflight_ops
) && linger_ops
.empty() &&
2154 poolstat_ops
.empty() && statfs_ops
.empty());
2158 * Output in-flight requests
2160 void _dump_active(OSDSession
*s
);
2161 void _dump_active();
2163 void dump_requests(Formatter
*fmt
);
2164 void _dump_ops(const OSDSession
*s
, Formatter
*fmt
);
2165 void dump_ops(Formatter
*fmt
);
2166 void _dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
);
2167 void dump_linger_ops(Formatter
*fmt
);
2168 void _dump_command_ops(const OSDSession
*s
, Formatter
*fmt
);
2169 void dump_command_ops(Formatter
*fmt
);
2170 void dump_pool_ops(Formatter
*fmt
) const;
2171 void dump_pool_stat_ops(Formatter
*fmt
) const;
2172 void dump_statfs_ops(Formatter
*fmt
) const;
2174 int get_client_incarnation() const { return client_inc
; }
2175 void set_client_incarnation(int inc
) { client_inc
= inc
; }
2177 bool have_map(epoch_t epoch
);
2178 /// wait for epoch; true if we already have it
2179 bool wait_for_map(epoch_t epoch
, Context
*c
, int err
=0);
2180 void _wait_for_new_map(Context
*c
, epoch_t epoch
, int err
=0);
2181 void wait_for_latest_osdmap(Context
*fin
);
2182 void get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2184 /** Get the current set of global op flags */
2185 int get_global_op_flags() const { return global_op_flags
; }
2186 /** Add a flag to the global op flags, not really atomic operation */
2187 void add_global_op_flags(int flag
) {
2188 global_op_flags
.fetch_or(flag
);
2190 /** Clear the passed flags from the global op flag set */
2191 void clear_global_op_flag(int flags
) {
2192 global_op_flags
.fetch_and(~flags
);
2195 /// cancel an in-progress request with the given return code
2197 int op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
2198 int _op_cancel(ceph_tid_t tid
, int r
);
2200 int op_cancel(ceph_tid_t tid
, int r
);
2201 int op_cancel(const vector
<ceph_tid_t
>& tidls
, int r
);
2204 * Any write op which is in progress at the start of this call shall no
2205 * longer be in progress when this call ends. Operations started after the
2206 * start of this call may still be in progress when this call ends.
2208 * @return the latest possible epoch in which a cancelled op could have
2209 * existed, or -1 if nothing was cancelled.
2211 epoch_t
op_cancel_writes(int r
, int64_t pool
=-1);
2214 void osd_command(int osd
, const std::vector
<string
>& cmd
,
2215 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2216 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2217 ceph_assert(osd
>= 0);
2218 CommandOp
*c
= new CommandOp(
2225 submit_command(c
, ptid
);
2227 void pg_command(pg_t pgid
, const vector
<string
>& cmd
,
2228 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2229 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2230 CommandOp
*c
= new CommandOp(
2237 submit_command(c
, ptid
);
2240 // mid-level helpers
2241 Op
*prepare_mutate_op(
2242 const object_t
& oid
, const object_locator_t
& oloc
,
2243 ObjectOperation
& op
, const SnapContext
& snapc
,
2244 ceph::real_time mtime
, int flags
,
2245 Context
*oncommit
, version_t
*objver
= NULL
,
2246 osd_reqid_t reqid
= osd_reqid_t(),
2247 ZTracer::Trace
*parent_trace
= nullptr) {
2248 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2249 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
, nullptr, parent_trace
);
2250 o
->priority
= op
.priority
;
2253 o
->out_rval
.swap(op
.out_rval
);
2258 const object_t
& oid
, const object_locator_t
& oloc
,
2259 ObjectOperation
& op
, const SnapContext
& snapc
,
2260 ceph::real_time mtime
, int flags
,
2261 Context
*oncommit
, version_t
*objver
= NULL
,
2262 osd_reqid_t reqid
= osd_reqid_t()) {
2263 Op
*o
= prepare_mutate_op(oid
, oloc
, op
, snapc
, mtime
, flags
,
2264 oncommit
, objver
, reqid
);
2269 Op
*prepare_read_op(
2270 const object_t
& oid
, const object_locator_t
& oloc
,
2271 ObjectOperation
& op
,
2272 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2273 Context
*onack
, version_t
*objver
= NULL
,
2274 int *data_offset
= NULL
,
2275 uint64_t features
= 0,
2276 ZTracer::Trace
*parent_trace
= nullptr) {
2277 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2278 CEPH_OSD_FLAG_READ
, onack
, objver
, data_offset
, parent_trace
);
2279 o
->priority
= op
.priority
;
2282 if (!o
->outbl
&& op
.size() == 1 && op
.out_bl
[0]->length())
2283 o
->outbl
= op
.out_bl
[0];
2284 o
->out_bl
.swap(op
.out_bl
);
2285 o
->out_handler
.swap(op
.out_handler
);
2286 o
->out_rval
.swap(op
.out_rval
);
2290 const object_t
& oid
, const object_locator_t
& oloc
,
2291 ObjectOperation
& op
,
2292 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2293 Context
*onack
, version_t
*objver
= NULL
,
2294 int *data_offset
= NULL
,
2295 uint64_t features
= 0) {
2296 Op
*o
= prepare_read_op(oid
, oloc
, op
, snapid
, pbl
, flags
, onack
, objver
,
2299 o
->features
= features
;
2304 Op
*prepare_pg_read_op(
2305 uint32_t hash
, object_locator_t oloc
,
2306 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2307 Context
*onack
, epoch_t
*reply_epoch
,
2309 Op
*o
= new Op(object_t(), oloc
,
2311 flags
| global_op_flags
| CEPH_OSD_FLAG_READ
|
2312 CEPH_OSD_FLAG_IGNORE_OVERLAY
,
2314 o
->target
.precalc_pgid
= true;
2315 o
->target
.base_pgid
= pg_t(hash
, oloc
.pool
);
2316 o
->priority
= op
.priority
;
2317 o
->snapid
= CEPH_NOSNAP
;
2319 o
->out_bl
.swap(op
.out_bl
);
2320 o
->out_handler
.swap(op
.out_handler
);
2321 o
->out_rval
.swap(op
.out_rval
);
2322 o
->reply_epoch
= reply_epoch
;
2324 // budget is tracked by listing context
2325 o
->ctx_budgeted
= true;
2330 uint32_t hash
, object_locator_t oloc
,
2331 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2332 Context
*onack
, epoch_t
*reply_epoch
,
2334 Op
*o
= prepare_pg_read_op(hash
, oloc
, op
, pbl
, flags
,
2335 onack
, reply_epoch
, ctx_budget
);
2337 op_submit(o
, &tid
, ctx_budget
);
2341 // caller owns a ref
2342 LingerOp
*linger_register(const object_t
& oid
, const object_locator_t
& oloc
,
2344 ceph_tid_t
linger_watch(LingerOp
*info
,
2345 ObjectOperation
& op
,
2346 const SnapContext
& snapc
, ceph::real_time mtime
,
2350 ceph_tid_t
linger_notify(LingerOp
*info
,
2351 ObjectOperation
& op
,
2352 snapid_t snap
, bufferlist
& inbl
,
2356 int linger_check(LingerOp
*info
);
2357 void linger_cancel(LingerOp
*info
); // releases a reference
2358 void _linger_cancel(LingerOp
*info
);
2360 void _do_watch_notify(LingerOp
*info
, MWatchNotify
*m
);
2363 * set up initial ops in the op vector, and allocate a final op slot.
2365 * The caller is responsible for filling in the final ops_count ops.
2367 * @param ops op vector
2368 * @param ops_count number of final ops the caller will fill in
2369 * @param extra_ops pointer to [array of] initial op[s]
2370 * @return index of final op (for caller to fill in)
2372 int init_ops(vector
<OSDOp
>& ops
, int ops_count
, ObjectOperation
*extra_ops
) {
2377 extra
= extra_ops
->ops
.size();
2379 ops
.resize(ops_count
+ extra
);
2381 for (i
=0; i
<extra
; i
++) {
2382 ops
[i
] = extra_ops
->ops
[i
];
2389 // high-level helpers
2390 Op
*prepare_stat_op(
2391 const object_t
& oid
, const object_locator_t
& oloc
,
2392 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2393 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2394 ObjectOperation
*extra_ops
= NULL
) {
2396 int i
= init_ops(ops
, 1, extra_ops
);
2397 ops
[i
].op
.op
= CEPH_OSD_OP_STAT
;
2398 C_Stat
*fin
= new C_Stat(psize
, pmtime
, onfinish
);
2399 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2400 CEPH_OSD_FLAG_READ
, fin
, objver
);
2402 o
->outbl
= &fin
->bl
;
2406 const object_t
& oid
, const object_locator_t
& oloc
,
2407 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2408 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2409 ObjectOperation
*extra_ops
= NULL
) {
2410 Op
*o
= prepare_stat_op(oid
, oloc
, snap
, psize
, pmtime
, flags
,
2411 onfinish
, objver
, extra_ops
);
2417 Op
*prepare_read_op(
2418 const object_t
& oid
, const object_locator_t
& oloc
,
2419 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2420 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2421 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2422 ZTracer::Trace
*parent_trace
= nullptr) {
2424 int i
= init_ops(ops
, 1, extra_ops
);
2425 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2426 ops
[i
].op
.extent
.offset
= off
;
2427 ops
[i
].op
.extent
.length
= len
;
2428 ops
[i
].op
.extent
.truncate_size
= 0;
2429 ops
[i
].op
.extent
.truncate_seq
= 0;
2430 ops
[i
].op
.flags
= op_flags
;
2431 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2432 CEPH_OSD_FLAG_READ
, onfinish
, objver
, nullptr, parent_trace
);
2438 const object_t
& oid
, const object_locator_t
& oloc
,
2439 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2440 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2441 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2442 Op
*o
= prepare_read_op(oid
, oloc
, off
, len
, snap
, pbl
, flags
,
2443 onfinish
, objver
, extra_ops
, op_flags
);
2449 Op
*prepare_cmpext_op(
2450 const object_t
& oid
, const object_locator_t
& oloc
,
2451 uint64_t off
, bufferlist
&cmp_bl
,
2452 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2453 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2455 int i
= init_ops(ops
, 1, extra_ops
);
2456 ops
[i
].op
.op
= CEPH_OSD_OP_CMPEXT
;
2457 ops
[i
].op
.extent
.offset
= off
;
2458 ops
[i
].op
.extent
.length
= cmp_bl
.length();
2459 ops
[i
].op
.extent
.truncate_size
= 0;
2460 ops
[i
].op
.extent
.truncate_seq
= 0;
2461 ops
[i
].indata
= cmp_bl
;
2462 ops
[i
].op
.flags
= op_flags
;
2463 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2464 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2470 const object_t
& oid
, const object_locator_t
& oloc
,
2471 uint64_t off
, bufferlist
&cmp_bl
,
2472 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2473 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2474 Op
*o
= prepare_cmpext_op(oid
, oloc
, off
, cmp_bl
, snap
,
2475 flags
, onfinish
, objver
, extra_ops
, op_flags
);
2481 ceph_tid_t
read_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2482 uint64_t off
, uint64_t len
, snapid_t snap
,
2483 bufferlist
*pbl
, int flags
, uint64_t trunc_size
,
2484 __u32 trunc_seq
, Context
*onfinish
,
2485 version_t
*objver
= NULL
,
2486 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2488 int i
= init_ops(ops
, 1, extra_ops
);
2489 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2490 ops
[i
].op
.extent
.offset
= off
;
2491 ops
[i
].op
.extent
.length
= len
;
2492 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2493 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2494 ops
[i
].op
.flags
= op_flags
;
2495 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2496 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2503 ceph_tid_t
mapext(const object_t
& oid
, const object_locator_t
& oloc
,
2504 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2505 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2506 ObjectOperation
*extra_ops
= NULL
) {
2508 int i
= init_ops(ops
, 1, extra_ops
);
2509 ops
[i
].op
.op
= CEPH_OSD_OP_MAPEXT
;
2510 ops
[i
].op
.extent
.offset
= off
;
2511 ops
[i
].op
.extent
.length
= len
;
2512 ops
[i
].op
.extent
.truncate_size
= 0;
2513 ops
[i
].op
.extent
.truncate_seq
= 0;
2514 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2515 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2522 ceph_tid_t
getxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2523 const char *name
, snapid_t snap
, bufferlist
*pbl
, int flags
,
2525 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2527 int i
= init_ops(ops
, 1, extra_ops
);
2528 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTR
;
2529 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2530 ops
[i
].op
.xattr
.value_len
= 0;
2532 ops
[i
].indata
.append(name
, ops
[i
].op
.xattr
.name_len
);
2533 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2534 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2542 ceph_tid_t
getxattrs(const object_t
& oid
, const object_locator_t
& oloc
,
2543 snapid_t snap
, map
<string
,bufferlist
>& attrset
,
2544 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2545 ObjectOperation
*extra_ops
= NULL
) {
2547 int i
= init_ops(ops
, 1, extra_ops
);
2548 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTRS
;
2549 C_GetAttrs
*fin
= new C_GetAttrs(attrset
, onfinish
);
2550 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2551 CEPH_OSD_FLAG_READ
, fin
, objver
);
2553 o
->outbl
= &fin
->bl
;
2559 ceph_tid_t
read_full(const object_t
& oid
, const object_locator_t
& oloc
,
2560 snapid_t snap
, bufferlist
*pbl
, int flags
,
2561 Context
*onfinish
, version_t
*objver
= NULL
,
2562 ObjectOperation
*extra_ops
= NULL
) {
2563 return read(oid
, oloc
, 0, 0, snap
, pbl
, flags
| global_op_flags
|
2564 CEPH_OSD_FLAG_READ
, onfinish
, objver
, extra_ops
);
2569 ceph_tid_t
_modify(const object_t
& oid
, const object_locator_t
& oloc
,
2570 vector
<OSDOp
>& ops
, ceph::real_time mtime
,
2571 const SnapContext
& snapc
, int flags
,
2573 version_t
*objver
= NULL
) {
2574 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2575 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2582 Op
*prepare_write_op(
2583 const object_t
& oid
, const object_locator_t
& oloc
,
2584 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2585 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2586 Context
*oncommit
, version_t
*objver
= NULL
,
2587 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2588 ZTracer::Trace
*parent_trace
= nullptr) {
2590 int i
= init_ops(ops
, 1, extra_ops
);
2591 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2592 ops
[i
].op
.extent
.offset
= off
;
2593 ops
[i
].op
.extent
.length
= len
;
2594 ops
[i
].op
.extent
.truncate_size
= 0;
2595 ops
[i
].op
.extent
.truncate_seq
= 0;
2597 ops
[i
].op
.flags
= op_flags
;
2598 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2599 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
,
2600 nullptr, parent_trace
);
2606 const object_t
& oid
, const object_locator_t
& oloc
,
2607 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2608 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2609 Context
*oncommit
, version_t
*objver
= NULL
,
2610 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2611 Op
*o
= prepare_write_op(oid
, oloc
, off
, len
, snapc
, bl
, mtime
, flags
,
2612 oncommit
, objver
, extra_ops
, op_flags
);
2617 Op
*prepare_append_op(
2618 const object_t
& oid
, const object_locator_t
& oloc
,
2619 uint64_t len
, const SnapContext
& snapc
,
2620 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2622 version_t
*objver
= NULL
,
2623 ObjectOperation
*extra_ops
= NULL
) {
2625 int i
= init_ops(ops
, 1, extra_ops
);
2626 ops
[i
].op
.op
= CEPH_OSD_OP_APPEND
;
2627 ops
[i
].op
.extent
.offset
= 0;
2628 ops
[i
].op
.extent
.length
= len
;
2629 ops
[i
].op
.extent
.truncate_size
= 0;
2630 ops
[i
].op
.extent
.truncate_seq
= 0;
2632 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2633 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2639 const object_t
& oid
, const object_locator_t
& oloc
,
2640 uint64_t len
, const SnapContext
& snapc
,
2641 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2643 version_t
*objver
= NULL
,
2644 ObjectOperation
*extra_ops
= NULL
) {
2645 Op
*o
= prepare_append_op(oid
, oloc
, len
, snapc
, bl
, mtime
, flags
,
2646 oncommit
, objver
, extra_ops
);
2651 ceph_tid_t
write_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2652 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2653 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2654 uint64_t trunc_size
, __u32 trunc_seq
,
2656 version_t
*objver
= NULL
,
2657 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2659 int i
= init_ops(ops
, 1, extra_ops
);
2660 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2661 ops
[i
].op
.extent
.offset
= off
;
2662 ops
[i
].op
.extent
.length
= len
;
2663 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2664 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2666 ops
[i
].op
.flags
= op_flags
;
2667 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2668 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2675 Op
*prepare_write_full_op(
2676 const object_t
& oid
, const object_locator_t
& oloc
,
2677 const SnapContext
& snapc
, const bufferlist
&bl
,
2678 ceph::real_time mtime
, int flags
,
2679 Context
*oncommit
, version_t
*objver
= NULL
,
2680 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2682 int i
= init_ops(ops
, 1, extra_ops
);
2683 ops
[i
].op
.op
= CEPH_OSD_OP_WRITEFULL
;
2684 ops
[i
].op
.extent
.offset
= 0;
2685 ops
[i
].op
.extent
.length
= bl
.length();
2687 ops
[i
].op
.flags
= op_flags
;
2688 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2689 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2694 ceph_tid_t
write_full(
2695 const object_t
& oid
, const object_locator_t
& oloc
,
2696 const SnapContext
& snapc
, const bufferlist
&bl
,
2697 ceph::real_time mtime
, int flags
,
2698 Context
*oncommit
, version_t
*objver
= NULL
,
2699 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2700 Op
*o
= prepare_write_full_op(oid
, oloc
, snapc
, bl
, mtime
, flags
,
2701 oncommit
, objver
, extra_ops
, op_flags
);
2706 Op
*prepare_writesame_op(
2707 const object_t
& oid
, const object_locator_t
& oloc
,
2708 uint64_t write_len
, uint64_t off
,
2709 const SnapContext
& snapc
, const bufferlist
&bl
,
2710 ceph::real_time mtime
, int flags
,
2711 Context
*oncommit
, version_t
*objver
= NULL
,
2712 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2715 int i
= init_ops(ops
, 1, extra_ops
);
2716 ops
[i
].op
.op
= CEPH_OSD_OP_WRITESAME
;
2717 ops
[i
].op
.writesame
.offset
= off
;
2718 ops
[i
].op
.writesame
.length
= write_len
;
2719 ops
[i
].op
.writesame
.data_length
= bl
.length();
2721 ops
[i
].op
.flags
= op_flags
;
2722 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2723 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2728 ceph_tid_t
writesame(
2729 const object_t
& oid
, const object_locator_t
& oloc
,
2730 uint64_t write_len
, uint64_t off
,
2731 const SnapContext
& snapc
, const bufferlist
&bl
,
2732 ceph::real_time mtime
, int flags
,
2733 Context
*oncommit
, version_t
*objver
= NULL
,
2734 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2736 Op
*o
= prepare_writesame_op(oid
, oloc
, write_len
, off
, snapc
, bl
,
2737 mtime
, flags
, oncommit
, objver
,
2738 extra_ops
, op_flags
);
2744 ceph_tid_t
trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2745 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2746 uint64_t trunc_size
, __u32 trunc_seq
,
2747 Context
*oncommit
, version_t
*objver
= NULL
,
2748 ObjectOperation
*extra_ops
= NULL
) {
2750 int i
= init_ops(ops
, 1, extra_ops
);
2751 ops
[i
].op
.op
= CEPH_OSD_OP_TRUNCATE
;
2752 ops
[i
].op
.extent
.offset
= trunc_size
;
2753 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2754 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2755 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2756 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2763 ceph_tid_t
zero(const object_t
& oid
, const object_locator_t
& oloc
,
2764 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2765 ceph::real_time mtime
, int flags
, Context
*oncommit
,
2766 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2768 int i
= init_ops(ops
, 1, extra_ops
);
2769 ops
[i
].op
.op
= CEPH_OSD_OP_ZERO
;
2770 ops
[i
].op
.extent
.offset
= off
;
2771 ops
[i
].op
.extent
.length
= len
;
2772 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2773 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2780 ceph_tid_t
rollback_object(const object_t
& oid
, const object_locator_t
& oloc
,
2781 const SnapContext
& snapc
, snapid_t snapid
,
2782 ceph::real_time mtime
, Context
*oncommit
,
2783 version_t
*objver
= NULL
,
2784 ObjectOperation
*extra_ops
= NULL
) {
2786 int i
= init_ops(ops
, 1, extra_ops
);
2787 ops
[i
].op
.op
= CEPH_OSD_OP_ROLLBACK
;
2788 ops
[i
].op
.snap
.snapid
= snapid
;
2789 Op
*o
= new Op(oid
, oloc
, ops
, CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2796 ceph_tid_t
create(const object_t
& oid
, const object_locator_t
& oloc
,
2797 const SnapContext
& snapc
, ceph::real_time mtime
, int global_flags
,
2798 int create_flags
, Context
*oncommit
,
2799 version_t
*objver
= NULL
,
2800 ObjectOperation
*extra_ops
= NULL
) {
2802 int i
= init_ops(ops
, 1, extra_ops
);
2803 ops
[i
].op
.op
= CEPH_OSD_OP_CREATE
;
2804 ops
[i
].op
.flags
= create_flags
;
2805 Op
*o
= new Op(oid
, oloc
, ops
, global_flags
| global_op_flags
|
2806 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2813 Op
*prepare_remove_op(
2814 const object_t
& oid
, const object_locator_t
& oloc
,
2815 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2817 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2819 int i
= init_ops(ops
, 1, extra_ops
);
2820 ops
[i
].op
.op
= CEPH_OSD_OP_DELETE
;
2821 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2822 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2828 const object_t
& oid
, const object_locator_t
& oloc
,
2829 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2831 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2832 Op
*o
= prepare_remove_op(oid
, oloc
, snapc
, mtime
, flags
,
2833 oncommit
, objver
, extra_ops
);
2839 ceph_tid_t
setxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2840 const char *name
, const SnapContext
& snapc
, const bufferlist
&bl
,
2841 ceph::real_time mtime
, int flags
,
2843 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2845 int i
= init_ops(ops
, 1, extra_ops
);
2846 ops
[i
].op
.op
= CEPH_OSD_OP_SETXATTR
;
2847 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2848 ops
[i
].op
.xattr
.value_len
= bl
.length();
2850 ops
[i
].indata
.append(name
, ops
[i
].op
.xattr
.name_len
);
2851 ops
[i
].indata
.append(bl
);
2852 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2853 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2860 ceph_tid_t
removexattr(const object_t
& oid
, const object_locator_t
& oloc
,
2861 const char *name
, const SnapContext
& snapc
,
2862 ceph::real_time mtime
, int flags
,
2864 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2866 int i
= init_ops(ops
, 1, extra_ops
);
2867 ops
[i
].op
.op
= CEPH_OSD_OP_RMXATTR
;
2868 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2869 ops
[i
].op
.xattr
.value_len
= 0;
2871 ops
[i
].indata
.append(name
, ops
[i
].op
.xattr
.name_len
);
2872 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2873 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2881 void list_nobjects(NListContext
*p
, Context
*onfinish
);
2882 uint32_t list_nobjects_seek(NListContext
*p
, uint32_t pos
);
2883 uint32_t list_nobjects_seek(NListContext
*list_context
, const hobject_t
& c
);
2884 void list_nobjects_get_cursor(NListContext
*list_context
, hobject_t
*c
);
2886 hobject_t
enumerate_objects_begin();
2887 hobject_t
enumerate_objects_end();
2888 //hobject_t enumerate_objects_begin(int n, int m);
2889 void enumerate_objects(
2891 const std::string
&ns
,
2892 const hobject_t
&start
,
2893 const hobject_t
&end
,
2895 const bufferlist
&filter_bl
,
2896 std::list
<librados::ListObjectImpl
> *result
,
2898 Context
*on_finish
);
2900 void _enumerate_reply(
2903 const hobject_t
&end
,
2904 const int64_t pool_id
,
2906 epoch_t reply_epoch
,
2907 std::list
<librados::ListObjectImpl
> *result
,
2909 Context
*on_finish
);
2910 friend class C_EnumerateReply
;
2912 // -------------------------
2915 void pool_op_submit(PoolOp
*op
);
2916 void _pool_op_submit(PoolOp
*op
);
2917 void _finish_pool_op(PoolOp
*op
, int r
);
2918 void _do_delete_pool(int64_t pool
, Context
*onfinish
);
2920 int create_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2921 int allocate_selfmanaged_snap(int64_t pool
, snapid_t
*psnapid
,
2923 int delete_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2924 int delete_selfmanaged_snap(int64_t pool
, snapid_t snap
, Context
*onfinish
);
2926 int create_pool(string
& name
, Context
*onfinish
,
2928 int delete_pool(int64_t pool
, Context
*onfinish
);
2929 int delete_pool(const string
& name
, Context
*onfinish
);
2931 void handle_pool_op_reply(MPoolOpReply
*m
);
2932 int pool_op_cancel(ceph_tid_t tid
, int r
);
2934 // --------------------------
2937 void _poolstat_submit(PoolStatOp
*op
);
2939 void handle_get_pool_stats_reply(MGetPoolStatsReply
*m
);
2940 void get_pool_stats(list
<string
>& pools
, map
<string
,pool_stat_t
> *result
,
2943 int pool_stat_op_cancel(ceph_tid_t tid
, int r
);
2944 void _finish_pool_stat_op(PoolStatOp
*op
, int r
);
2946 // ---------------------------
2949 void _fs_stats_submit(StatfsOp
*op
);
2951 void handle_fs_stats_reply(MStatfsReply
*m
);
2952 void get_fs_stats(struct ceph_statfs
& result
, boost::optional
<int64_t> poolid
,
2954 int statfs_op_cancel(ceph_tid_t tid
, int r
);
2955 void _finish_statfs_op(StatfsOp
*op
, int r
);
2957 // ---------------------------
2958 // some scatter/gather hackery
2960 void _sg_read_finish(vector
<ObjectExtent
>& extents
,
2961 vector
<bufferlist
>& resultbl
,
2962 bufferlist
*bl
, Context
*onfinish
);
2964 struct C_SGRead
: public Context
{
2966 vector
<ObjectExtent
> extents
;
2967 vector
<bufferlist
> resultbl
;
2970 C_SGRead(Objecter
*ob
,
2971 vector
<ObjectExtent
>& e
, vector
<bufferlist
>& r
, bufferlist
*b
,
2973 objecter(ob
), bl(b
), onfinish(c
) {
2977 void finish(int r
) override
{
2978 objecter
->_sg_read_finish(extents
, resultbl
, bl
, onfinish
);
2982 void sg_read_trunc(vector
<ObjectExtent
>& extents
, snapid_t snap
,
2983 bufferlist
*bl
, int flags
, uint64_t trunc_size
,
2984 __u32 trunc_seq
, Context
*onfinish
, int op_flags
= 0) {
2985 if (extents
.size() == 1) {
2986 read_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
2987 extents
[0].length
, snap
, bl
, flags
, extents
[0].truncate_size
,
2988 trunc_seq
, onfinish
, 0, 0, op_flags
);
2990 C_GatherBuilder
gather(cct
);
2991 vector
<bufferlist
> resultbl(extents
.size());
2993 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
2996 read_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
, snap
, &resultbl
[i
++],
2997 flags
, p
->truncate_size
, trunc_seq
, gather
.new_sub(),
3000 gather
.set_finisher(new C_SGRead(this, extents
, resultbl
, bl
, onfinish
));
3005 void sg_read(vector
<ObjectExtent
>& extents
, snapid_t snap
, bufferlist
*bl
,
3006 int flags
, Context
*onfinish
, int op_flags
= 0) {
3007 sg_read_trunc(extents
, snap
, bl
, flags
, 0, 0, onfinish
, op_flags
);
3010 void sg_write_trunc(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3011 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
3012 uint64_t trunc_size
, __u32 trunc_seq
,
3013 Context
*oncommit
, int op_flags
= 0) {
3014 if (extents
.size() == 1) {
3015 write_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
3016 extents
[0].length
, snapc
, bl
, mtime
, flags
,
3017 extents
[0].truncate_size
, trunc_seq
, oncommit
,
3020 C_GatherBuilder
gcom(cct
, oncommit
);
3021 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
3025 for (vector
<pair
<uint64_t,uint64_t> >::iterator bit
3026 = p
->buffer_extents
.begin();
3027 bit
!= p
->buffer_extents
.end();
3029 bl
.copy(bit
->first
, bit
->second
, cur
);
3030 ceph_assert(cur
.length() == p
->length
);
3031 write_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
,
3032 snapc
, cur
, mtime
, flags
, p
->truncate_size
, trunc_seq
,
3033 oncommit
? gcom
.new_sub():0,
3040 void sg_write(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3041 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
3042 Context
*oncommit
, int op_flags
= 0) {
3043 sg_write_trunc(extents
, snapc
, bl
, mtime
, flags
, 0, 0, oncommit
,
3047 void ms_handle_connect(Connection
*con
) override
;
3048 bool ms_handle_reset(Connection
*con
) override
;
3049 void ms_handle_remote_reset(Connection
*con
) override
;
3050 bool ms_handle_refused(Connection
*con
) override
;
3051 bool ms_get_authorizer(int dest_type
,
3052 AuthAuthorizer
**authorizer
) override
;
3054 void blacklist_self(bool set
);
3057 epoch_t epoch_barrier
;
3058 bool retry_writes_after_first_reply
;
3060 void set_epoch_barrier(epoch_t epoch
);
3062 PerfCounters
*get_logger() {