1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
18 #include <condition_variable>
24 #include <type_traits>
26 #include <boost/thread/shared_mutex.hpp>
28 #include "include/assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/Finisher.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
40 #include "messages/MOSDOp.h"
41 #include "osd/OSDMap.h"
54 class MGetPoolStatsReply
;
61 // -----------------------------------------
63 struct ObjectOperation
{
68 vector
<bufferlist
*> out_bl
;
69 vector
<Context
*> out_handler
;
70 vector
<int*> out_rval
;
72 ObjectOperation() : flags(0), priority(0) {}
74 while (!out_handler
.empty()) {
75 delete out_handler
.back();
76 out_handler
.pop_back();
84 void set_last_op_flags(int flags
) {
86 ops
.rbegin()->op
.flags
= flags
;
91 * Add a callback to run when this operation completes,
92 * after any other callbacks for it.
94 void add_handler(Context
*extra
);
96 OSDOp
& add_op(int op
) {
102 out_handler
.resize(s
+1);
103 out_handler
[s
] = NULL
;
104 out_rval
.resize(s
+1);
108 void add_data(int op
, uint64_t off
, uint64_t len
, bufferlist
& bl
) {
109 OSDOp
& osd_op
= add_op(op
);
110 osd_op
.op
.extent
.offset
= off
;
111 osd_op
.op
.extent
.length
= len
;
112 osd_op
.indata
.claim_append(bl
);
114 void add_writesame(int op
, uint64_t off
, uint64_t write_len
,
116 OSDOp
& osd_op
= add_op(op
);
117 osd_op
.op
.writesame
.offset
= off
;
118 osd_op
.op
.writesame
.length
= write_len
;
119 osd_op
.op
.writesame
.data_length
= bl
.length();
120 osd_op
.indata
.claim_append(bl
);
122 void add_xattr(int op
, const char *name
, const bufferlist
& data
) {
123 OSDOp
& osd_op
= add_op(op
);
124 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
125 osd_op
.op
.xattr
.value_len
= data
.length();
127 osd_op
.indata
.append(name
);
128 osd_op
.indata
.append(data
);
130 void add_xattr_cmp(int op
, const char *name
, uint8_t cmp_op
,
131 uint8_t cmp_mode
, const bufferlist
& data
) {
132 OSDOp
& osd_op
= add_op(op
);
133 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
134 osd_op
.op
.xattr
.value_len
= data
.length();
135 osd_op
.op
.xattr
.cmp_op
= cmp_op
;
136 osd_op
.op
.xattr
.cmp_mode
= cmp_mode
;
138 osd_op
.indata
.append(name
);
139 osd_op
.indata
.append(data
);
141 void add_call(int op
, const char *cname
, const char *method
,
143 bufferlist
*outbl
, Context
*ctx
, int *prval
) {
144 OSDOp
& osd_op
= add_op(op
);
146 unsigned p
= ops
.size() - 1;
147 out_handler
[p
] = ctx
;
151 osd_op
.op
.cls
.class_len
= strlen(cname
);
152 osd_op
.op
.cls
.method_len
= strlen(method
);
153 osd_op
.op
.cls
.indata_len
= indata
.length();
154 osd_op
.indata
.append(cname
, osd_op
.op
.cls
.class_len
);
155 osd_op
.indata
.append(method
, osd_op
.op
.cls
.method_len
);
156 osd_op
.indata
.append(indata
);
158 void add_pgls(int op
, uint64_t count
, collection_list_handle_t cookie
,
159 epoch_t start_epoch
) {
160 OSDOp
& osd_op
= add_op(op
);
161 osd_op
.op
.pgls
.count
= count
;
162 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
163 ::encode(cookie
, osd_op
.indata
);
165 void add_pgls_filter(int op
, uint64_t count
, const bufferlist
& filter
,
166 collection_list_handle_t cookie
, epoch_t start_epoch
) {
167 OSDOp
& osd_op
= add_op(op
);
168 osd_op
.op
.pgls
.count
= count
;
169 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
171 string mname
= "filter";
172 ::encode(cname
, osd_op
.indata
);
173 ::encode(mname
, osd_op
.indata
);
174 osd_op
.indata
.append(filter
);
175 ::encode(cookie
, osd_op
.indata
);
177 void add_alloc_hint(int op
, uint64_t expected_object_size
,
178 uint64_t expected_write_size
,
180 OSDOp
& osd_op
= add_op(op
);
181 osd_op
.op
.alloc_hint
.expected_object_size
= expected_object_size
;
182 osd_op
.op
.alloc_hint
.expected_write_size
= expected_write_size
;
183 osd_op
.op
.alloc_hint
.flags
= flags
;
189 void pg_ls(uint64_t count
, bufferlist
& filter
,
190 collection_list_handle_t cookie
, epoch_t start_epoch
) {
191 if (filter
.length() == 0)
192 add_pgls(CEPH_OSD_OP_PGLS
, count
, cookie
, start_epoch
);
194 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER
, count
, filter
, cookie
,
196 flags
|= CEPH_OSD_FLAG_PGOP
;
199 void pg_nls(uint64_t count
, const bufferlist
& filter
,
200 collection_list_handle_t cookie
, epoch_t start_epoch
) {
201 if (filter
.length() == 0)
202 add_pgls(CEPH_OSD_OP_PGNLS
, count
, cookie
, start_epoch
);
204 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER
, count
, filter
, cookie
,
206 flags
|= CEPH_OSD_FLAG_PGOP
;
209 void scrub_ls(const librados::object_id_t
& start_after
,
211 std::vector
<librados::inconsistent_obj_t
> *objects
,
214 void scrub_ls(const librados::object_id_t
& start_after
,
216 std::vector
<librados::inconsistent_snapset_t
> *objects
,
220 void create(bool excl
) {
221 OSDOp
& o
= add_op(CEPH_OSD_OP_CREATE
);
222 o
.op
.flags
= (excl
? CEPH_OSD_OP_FLAG_EXCL
: 0);
225 struct C_ObjectOperation_stat
: public Context
{
228 ceph::real_time
*pmtime
;
230 struct timespec
*pts
;
232 C_ObjectOperation_stat(uint64_t *ps
, ceph::real_time
*pm
, time_t *pt
, struct timespec
*_pts
,
234 : psize(ps
), pmtime(pm
), ptime(pt
), pts(_pts
), prval(prval
) {}
235 void finish(int r
) override
{
237 bufferlist::iterator p
= bl
.begin();
240 ceph::real_time mtime
;
248 *ptime
= ceph::real_clock::to_time_t(mtime
);
250 *pts
= ceph::real_clock::to_timespec(mtime
);
251 } catch (buffer::error
& e
) {
258 void stat(uint64_t *psize
, ceph::real_time
*pmtime
, int *prval
) {
259 add_op(CEPH_OSD_OP_STAT
);
260 unsigned p
= ops
.size() - 1;
261 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, pmtime
, NULL
, NULL
,
267 void stat(uint64_t *psize
, time_t *ptime
, int *prval
) {
268 add_op(CEPH_OSD_OP_STAT
);
269 unsigned p
= ops
.size() - 1;
270 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, ptime
, NULL
,
276 void stat(uint64_t *psize
, struct timespec
*pts
, int *prval
) {
277 add_op(CEPH_OSD_OP_STAT
);
278 unsigned p
= ops
.size() - 1;
279 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, NULL
, pts
,
286 struct C_ObjectOperation_cmpext
: public Context
{
288 C_ObjectOperation_cmpext(int *prval
)
297 void cmpext(uint64_t off
, bufferlist
& cmp_bl
, int *prval
) {
298 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_bl
.length(), cmp_bl
);
299 unsigned p
= ops
.size() - 1;
300 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
306 void cmpext(uint64_t off
, uint64_t cmp_len
, const char *cmp_buf
, int *prval
) {
308 cmp_bl
.append(cmp_buf
, cmp_len
);
309 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_len
, cmp_bl
);
310 unsigned p
= ops
.size() - 1;
311 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
316 void read(uint64_t off
, uint64_t len
, bufferlist
*pbl
, int *prval
,
319 add_data(CEPH_OSD_OP_READ
, off
, len
, bl
);
320 unsigned p
= ops
.size() - 1;
323 out_handler
[p
] = ctx
;
326 struct C_ObjectOperation_sparse_read
: public Context
{
329 std::map
<uint64_t, uint64_t> *extents
;
331 C_ObjectOperation_sparse_read(bufferlist
*data_bl
,
332 std::map
<uint64_t, uint64_t> *extents
,
334 : data_bl(data_bl
), extents(extents
), prval(prval
) {}
335 void finish(int r
) override
{
336 bufferlist::iterator iter
= bl
.begin();
339 ::decode(*extents
, iter
);
340 ::decode(*data_bl
, iter
);
341 } catch (buffer::error
& e
) {
348 void sparse_read(uint64_t off
, uint64_t len
, std::map
<uint64_t,uint64_t> *m
,
349 bufferlist
*data_bl
, int *prval
) {
351 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
352 unsigned p
= ops
.size() - 1;
353 C_ObjectOperation_sparse_read
*h
=
354 new C_ObjectOperation_sparse_read(data_bl
, m
, prval
);
359 void write(uint64_t off
, bufferlist
& bl
,
360 uint64_t truncate_size
,
361 uint32_t truncate_seq
) {
362 add_data(CEPH_OSD_OP_WRITE
, off
, bl
.length(), bl
);
363 OSDOp
& o
= *ops
.rbegin();
364 o
.op
.extent
.truncate_size
= truncate_size
;
365 o
.op
.extent
.truncate_seq
= truncate_seq
;
367 void write(uint64_t off
, bufferlist
& bl
) {
368 write(off
, bl
, 0, 0);
370 void write_full(bufferlist
& bl
) {
371 add_data(CEPH_OSD_OP_WRITEFULL
, 0, bl
.length(), bl
);
373 void writesame(uint64_t off
, uint64_t write_len
, bufferlist
& bl
) {
374 add_writesame(CEPH_OSD_OP_WRITESAME
, off
, write_len
, bl
);
376 void append(bufferlist
& bl
) {
377 add_data(CEPH_OSD_OP_APPEND
, 0, bl
.length(), bl
);
379 void zero(uint64_t off
, uint64_t len
) {
381 add_data(CEPH_OSD_OP_ZERO
, off
, len
, bl
);
383 void truncate(uint64_t off
) {
385 add_data(CEPH_OSD_OP_TRUNCATE
, off
, 0, bl
);
389 add_data(CEPH_OSD_OP_DELETE
, 0, 0, bl
);
391 void mapext(uint64_t off
, uint64_t len
) {
393 add_data(CEPH_OSD_OP_MAPEXT
, off
, len
, bl
);
395 void sparse_read(uint64_t off
, uint64_t len
) {
397 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
400 void checksum(uint8_t type
, const bufferlist
&init_value_bl
,
401 uint64_t off
, uint64_t len
, size_t chunk_size
,
402 bufferlist
*pbl
, int *prval
, Context
*ctx
) {
403 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_CHECKSUM
);
404 osd_op
.op
.checksum
.offset
= off
;
405 osd_op
.op
.checksum
.length
= len
;
406 osd_op
.op
.checksum
.type
= type
;
407 osd_op
.op
.checksum
.chunk_size
= chunk_size
;
408 osd_op
.indata
.append(init_value_bl
);
410 unsigned p
= ops
.size() - 1;
413 out_handler
[p
] = ctx
;
417 void getxattr(const char *name
, bufferlist
*pbl
, int *prval
) {
419 add_xattr(CEPH_OSD_OP_GETXATTR
, name
, bl
);
420 unsigned p
= ops
.size() - 1;
424 struct C_ObjectOperation_decodevals
: public Context
{
425 uint64_t max_entries
;
427 std::map
<std::string
,bufferlist
> *pattrs
;
430 C_ObjectOperation_decodevals(uint64_t m
, std::map
<std::string
,bufferlist
> *pa
,
432 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
437 void finish(int r
) override
{
439 bufferlist::iterator p
= bl
.begin();
442 ::decode(*pattrs
, p
);
444 std::map
<std::string
,bufferlist
> ignore
;
450 ::decode(*ptruncated
, p
);
452 // the OSD did not provide this. since old OSDs do not
453 // enfoce omap result limits either, we can infer it from
454 // the size of the result
455 *ptruncated
= (pattrs
->size() == max_entries
);
459 catch (buffer::error
& e
) {
466 struct C_ObjectOperation_decodekeys
: public Context
{
467 uint64_t max_entries
;
469 std::set
<std::string
> *pattrs
;
472 C_ObjectOperation_decodekeys(uint64_t m
, std::set
<std::string
> *pa
, bool *pt
,
474 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
479 void finish(int r
) override
{
481 bufferlist::iterator p
= bl
.begin();
484 ::decode(*pattrs
, p
);
486 std::set
<std::string
> ignore
;
492 ::decode(*ptruncated
, p
);
494 // the OSD did not provide this. since old OSDs do not
495 // enfoce omap result limits either, we can infer it from
496 // the size of the result
497 *ptruncated
= (pattrs
->size() == max_entries
);
501 catch (buffer::error
& e
) {
508 struct C_ObjectOperation_decodewatchers
: public Context
{
510 list
<obj_watch_t
> *pwatchers
;
512 C_ObjectOperation_decodewatchers(list
<obj_watch_t
> *pw
, int *pr
)
513 : pwatchers(pw
), prval(pr
) {}
514 void finish(int r
) override
{
516 bufferlist::iterator p
= bl
.begin();
518 obj_list_watch_response_t resp
;
521 for (list
<watch_item_t
>::iterator i
= resp
.entries
.begin() ;
522 i
!= resp
.entries
.end() ; ++i
) {
526 strncpy(ow
.addr
, sa
.str().c_str(), 256);
527 ow
.watcher_id
= i
->name
.num();
528 ow
.cookie
= i
->cookie
;
529 ow
.timeout_seconds
= i
->timeout_seconds
;
530 pwatchers
->push_back(ow
);
534 catch (buffer::error
& e
) {
541 struct C_ObjectOperation_decodesnaps
: public Context
{
543 librados::snap_set_t
*psnaps
;
545 C_ObjectOperation_decodesnaps(librados::snap_set_t
*ps
, int *pr
)
546 : psnaps(ps
), prval(pr
) {}
547 void finish(int r
) override
{
549 bufferlist::iterator p
= bl
.begin();
551 obj_list_snap_response_t resp
;
554 psnaps
->clones
.clear();
555 for (vector
<clone_info
>::iterator ci
= resp
.clones
.begin();
556 ci
!= resp
.clones
.end();
558 librados::clone_info_t clone
;
560 clone
.cloneid
= ci
->cloneid
;
561 clone
.snaps
.reserve(ci
->snaps
.size());
562 clone
.snaps
.insert(clone
.snaps
.end(), ci
->snaps
.begin(),
564 clone
.overlap
= ci
->overlap
;
565 clone
.size
= ci
->size
;
567 psnaps
->clones
.push_back(clone
);
569 psnaps
->seq
= resp
.seq
;
571 } catch (buffer::error
& e
) {
578 void getxattrs(std::map
<std::string
,bufferlist
> *pattrs
, int *prval
) {
579 add_op(CEPH_OSD_OP_GETXATTRS
);
580 if (pattrs
|| prval
) {
581 unsigned p
= ops
.size() - 1;
582 C_ObjectOperation_decodevals
*h
583 = new C_ObjectOperation_decodevals(0, pattrs
, nullptr, prval
);
589 void setxattr(const char *name
, const bufferlist
& bl
) {
590 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
592 void setxattr(const char *name
, const string
& s
) {
595 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
597 void cmpxattr(const char *name
, uint8_t cmp_op
, uint8_t cmp_mode
,
598 const bufferlist
& bl
) {
599 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR
, name
, cmp_op
, cmp_mode
, bl
);
601 void rmxattr(const char *name
) {
603 add_xattr(CEPH_OSD_OP_RMXATTR
, name
, bl
);
605 void setxattrs(map
<string
, bufferlist
>& attrs
) {
608 add_xattr(CEPH_OSD_OP_RESETXATTRS
, 0, bl
.length());
610 void resetxattrs(const char *prefix
, map
<string
, bufferlist
>& attrs
) {
613 add_xattr(CEPH_OSD_OP_RESETXATTRS
, prefix
, bl
);
617 void tmap_update(bufferlist
& bl
) {
618 add_data(CEPH_OSD_OP_TMAPUP
, 0, 0, bl
);
620 void tmap_put(bufferlist
& bl
) {
621 add_data(CEPH_OSD_OP_TMAPPUT
, 0, bl
.length(), bl
);
623 void tmap_get(bufferlist
*pbl
, int *prval
) {
624 add_op(CEPH_OSD_OP_TMAPGET
);
625 unsigned p
= ops
.size() - 1;
630 add_op(CEPH_OSD_OP_TMAPGET
);
632 void tmap_to_omap(bool nullok
=false) {
633 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_TMAP2OMAP
);
635 osd_op
.op
.tmap2omap
.flags
= CEPH_OSD_TMAP2OMAP_NULLOK
;
639 void omap_get_keys(const string
&start_after
,
641 std::set
<std::string
> *out_set
,
644 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETKEYS
);
646 ::encode(start_after
, bl
);
647 ::encode(max_to_get
, bl
);
648 op
.op
.extent
.offset
= 0;
649 op
.op
.extent
.length
= bl
.length();
650 op
.indata
.claim_append(bl
);
651 if (prval
|| ptruncated
|| out_set
) {
652 unsigned p
= ops
.size() - 1;
653 C_ObjectOperation_decodekeys
*h
=
654 new C_ObjectOperation_decodekeys(max_to_get
, out_set
, ptruncated
, prval
);
661 void omap_get_vals(const string
&start_after
,
662 const string
&filter_prefix
,
664 std::map
<std::string
, bufferlist
> *out_set
,
667 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALS
);
669 ::encode(start_after
, bl
);
670 ::encode(max_to_get
, bl
);
671 ::encode(filter_prefix
, bl
);
672 op
.op
.extent
.offset
= 0;
673 op
.op
.extent
.length
= bl
.length();
674 op
.indata
.claim_append(bl
);
675 if (prval
|| out_set
|| ptruncated
) {
676 unsigned p
= ops
.size() - 1;
677 C_ObjectOperation_decodevals
*h
=
678 new C_ObjectOperation_decodevals(max_to_get
, out_set
, ptruncated
, prval
);
685 void omap_get_vals_by_keys(const std::set
<std::string
> &to_get
,
686 std::map
<std::string
, bufferlist
> *out_set
,
688 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS
);
690 ::encode(to_get
, bl
);
691 op
.op
.extent
.offset
= 0;
692 op
.op
.extent
.length
= bl
.length();
693 op
.indata
.claim_append(bl
);
694 if (prval
|| out_set
) {
695 unsigned p
= ops
.size() - 1;
696 C_ObjectOperation_decodevals
*h
=
697 new C_ObjectOperation_decodevals(0, out_set
, nullptr, prval
);
704 void omap_cmp(const std::map
<std::string
, pair
<bufferlist
,int> > &assertions
,
706 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAP_CMP
);
708 ::encode(assertions
, bl
);
709 op
.op
.extent
.offset
= 0;
710 op
.op
.extent
.length
= bl
.length();
711 op
.indata
.claim_append(bl
);
713 unsigned p
= ops
.size() - 1;
718 struct C_ObjectOperation_copyget
: public Context
{
720 object_copy_cursor_t
*cursor
;
722 ceph::real_time
*out_mtime
;
723 std::map
<std::string
,bufferlist
> *out_attrs
;
724 bufferlist
*out_data
, *out_omap_header
, *out_omap_data
;
725 vector
<snapid_t
> *out_snaps
;
726 snapid_t
*out_snap_seq
;
728 uint32_t *out_data_digest
;
729 uint32_t *out_omap_digest
;
730 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
;
731 uint64_t *out_truncate_seq
;
732 uint64_t *out_truncate_size
;
734 C_ObjectOperation_copyget(object_copy_cursor_t
*c
,
737 std::map
<std::string
,bufferlist
> *a
,
738 bufferlist
*d
, bufferlist
*oh
,
740 std::vector
<snapid_t
> *osnaps
,
745 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *oreqids
,
750 out_size(s
), out_mtime(m
),
751 out_attrs(a
), out_data(d
), out_omap_header(oh
),
752 out_omap_data(o
), out_snaps(osnaps
), out_snap_seq(osnap_seq
),
753 out_flags(flags
), out_data_digest(dd
), out_omap_digest(od
),
755 out_truncate_seq(otseq
),
756 out_truncate_size(otsize
),
758 void finish(int r
) override
{
759 // reqids are copied on ENOENT
760 if (r
< 0 && r
!= -ENOENT
)
763 bufferlist::iterator p
= bl
.begin();
764 object_copy_data_t copy_reply
;
765 ::decode(copy_reply
, p
);
768 *out_reqids
= copy_reply
.reqids
;
772 *out_size
= copy_reply
.size
;
774 *out_mtime
= ceph::real_clock::from_ceph_timespec(copy_reply
.mtime
);
776 *out_attrs
= copy_reply
.attrs
;
778 out_data
->claim_append(copy_reply
.data
);
780 out_omap_header
->claim_append(copy_reply
.omap_header
);
782 *out_omap_data
= copy_reply
.omap_data
;
784 *out_snaps
= copy_reply
.snaps
;
786 *out_snap_seq
= copy_reply
.snap_seq
;
788 *out_flags
= copy_reply
.flags
;
790 *out_data_digest
= copy_reply
.data_digest
;
792 *out_omap_digest
= copy_reply
.omap_digest
;
794 *out_reqids
= copy_reply
.reqids
;
795 if (out_truncate_seq
)
796 *out_truncate_seq
= copy_reply
.truncate_seq
;
797 if (out_truncate_size
)
798 *out_truncate_size
= copy_reply
.truncate_size
;
799 *cursor
= copy_reply
.cursor
;
800 } catch (buffer::error
& e
) {
807 void copy_get(object_copy_cursor_t
*cursor
,
810 ceph::real_time
*out_mtime
,
811 std::map
<std::string
,bufferlist
> *out_attrs
,
812 bufferlist
*out_data
,
813 bufferlist
*out_omap_header
,
814 bufferlist
*out_omap_data
,
815 vector
<snapid_t
> *out_snaps
,
816 snapid_t
*out_snap_seq
,
818 uint32_t *out_data_digest
,
819 uint32_t *out_omap_digest
,
820 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
,
821 uint64_t *truncate_seq
,
822 uint64_t *truncate_size
,
824 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_GET
);
825 osd_op
.op
.copy_get
.max
= max
;
826 ::encode(*cursor
, osd_op
.indata
);
827 ::encode(max
, osd_op
.indata
);
828 unsigned p
= ops
.size() - 1;
830 C_ObjectOperation_copyget
*h
=
831 new C_ObjectOperation_copyget(cursor
, out_size
, out_mtime
,
832 out_attrs
, out_data
, out_omap_header
,
833 out_omap_data
, out_snaps
, out_snap_seq
,
834 out_flags
, out_data_digest
,
835 out_omap_digest
, out_reqids
, truncate_seq
,
836 truncate_size
, prval
);
842 add_op(CEPH_OSD_OP_UNDIRTY
);
845 struct C_ObjectOperation_isdirty
: public Context
{
849 C_ObjectOperation_isdirty(bool *p
, int *r
)
850 : pisdirty(p
), prval(r
) {}
851 void finish(int r
) override
{
855 bufferlist::iterator p
= bl
.begin();
857 ::decode(isdirty
, p
);
860 } catch (buffer::error
& e
) {
867 void is_dirty(bool *pisdirty
, int *prval
) {
868 add_op(CEPH_OSD_OP_ISDIRTY
);
869 unsigned p
= ops
.size() - 1;
871 C_ObjectOperation_isdirty
*h
=
872 new C_ObjectOperation_isdirty(pisdirty
, prval
);
877 struct C_ObjectOperation_hit_set_ls
: public Context
{
879 std::list
< std::pair
<time_t, time_t> > *ptls
;
880 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > *putls
;
882 C_ObjectOperation_hit_set_ls(std::list
< std::pair
<time_t, time_t> > *t
,
883 std::list
< std::pair
<ceph::real_time
,
884 ceph::real_time
> > *ut
,
886 : ptls(t
), putls(ut
), prval(r
) {}
887 void finish(int r
) override
{
891 bufferlist::iterator p
= bl
.begin();
892 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > ls
;
896 for (auto p
= ls
.begin(); p
!= ls
.end(); ++p
)
897 // round initial timestamp up to the next full second to
898 // keep this a valid interval.
900 make_pair(ceph::real_clock::to_time_t(
902 // Sadly, no time literals until C++14.
903 std::chrono::seconds(1))),
904 ceph::real_clock::to_time_t(p
->second
)));
908 } catch (buffer::error
& e
) {
917 * list available HitSets.
919 * We will get back a list of time intervals. Note that the most
920 * recent range may have an empty end timestamp if it is still
923 * @param pls [out] list of time intervals
924 * @param prval [out] return value
926 void hit_set_ls(std::list
< std::pair
<time_t, time_t> > *pls
, int *prval
) {
927 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
928 unsigned p
= ops
.size() - 1;
930 C_ObjectOperation_hit_set_ls
*h
=
931 new C_ObjectOperation_hit_set_ls(pls
, NULL
, prval
);
935 void hit_set_ls(std::list
<std::pair
<ceph::real_time
, ceph::real_time
> > *pls
,
937 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
938 unsigned p
= ops
.size() - 1;
940 C_ObjectOperation_hit_set_ls
*h
=
941 new C_ObjectOperation_hit_set_ls(NULL
, pls
, prval
);
949 * Return an encoded HitSet that includes the provided time
952 * @param stamp [in] timestamp
953 * @param pbl [out] target buffer for encoded HitSet
954 * @param prval [out] return value
956 void hit_set_get(ceph::real_time stamp
, bufferlist
*pbl
, int *prval
) {
957 OSDOp
& op
= add_op(CEPH_OSD_OP_PG_HITSET_GET
);
958 op
.op
.hit_set_get
.stamp
= ceph::real_clock::to_ceph_timespec(stamp
);
959 unsigned p
= ops
.size() - 1;
964 void omap_get_header(bufferlist
*bl
, int *prval
) {
965 add_op(CEPH_OSD_OP_OMAPGETHEADER
);
966 unsigned p
= ops
.size() - 1;
971 void omap_set(const map
<string
, bufferlist
> &map
) {
974 add_data(CEPH_OSD_OP_OMAPSETVALS
, 0, bl
.length(), bl
);
977 void omap_set_header(bufferlist
&bl
) {
978 add_data(CEPH_OSD_OP_OMAPSETHEADER
, 0, bl
.length(), bl
);
982 add_op(CEPH_OSD_OP_OMAPCLEAR
);
985 void omap_rm_keys(const std::set
<std::string
> &to_remove
) {
987 ::encode(to_remove
, bl
);
988 add_data(CEPH_OSD_OP_OMAPRMKEYS
, 0, bl
.length(), bl
);
992 void call(const char *cname
, const char *method
, bufferlist
&indata
) {
993 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, NULL
, NULL
, NULL
);
996 void call(const char *cname
, const char *method
, bufferlist
&indata
,
997 bufferlist
*outdata
, Context
*ctx
, int *prval
) {
998 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, outdata
, ctx
, prval
);
1002 void watch(uint64_t cookie
, __u8 op
, uint32_t timeout
= 0) {
1003 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_WATCH
);
1004 osd_op
.op
.watch
.cookie
= cookie
;
1005 osd_op
.op
.watch
.op
= op
;
1006 osd_op
.op
.watch
.timeout
= timeout
;
1009 void notify(uint64_t cookie
, uint32_t prot_ver
, uint32_t timeout
,
1010 bufferlist
&bl
, bufferlist
*inbl
) {
1011 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY
);
1012 osd_op
.op
.notify
.cookie
= cookie
;
1013 ::encode(prot_ver
, *inbl
);
1014 ::encode(timeout
, *inbl
);
1015 ::encode(bl
, *inbl
);
1016 osd_op
.indata
.append(*inbl
);
1019 void notify_ack(uint64_t notify_id
, uint64_t cookie
,
1020 bufferlist
& reply_bl
) {
1021 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY_ACK
);
1023 ::encode(notify_id
, bl
);
1024 ::encode(cookie
, bl
);
1025 ::encode(reply_bl
, bl
);
1026 osd_op
.indata
.append(bl
);
1029 void list_watchers(list
<obj_watch_t
> *out
,
1031 (void)add_op(CEPH_OSD_OP_LIST_WATCHERS
);
1033 unsigned p
= ops
.size() - 1;
1034 C_ObjectOperation_decodewatchers
*h
=
1035 new C_ObjectOperation_decodewatchers(out
, prval
);
1038 out_rval
[p
] = prval
;
1042 void list_snaps(librados::snap_set_t
*out
, int *prval
) {
1043 (void)add_op(CEPH_OSD_OP_LIST_SNAPS
);
1045 unsigned p
= ops
.size() - 1;
1046 C_ObjectOperation_decodesnaps
*h
=
1047 new C_ObjectOperation_decodesnaps(out
, prval
);
1050 out_rval
[p
] = prval
;
1054 void assert_version(uint64_t ver
) {
1055 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ASSERT_VER
);
1056 osd_op
.op
.assert_ver
.ver
= ver
;
1059 void cmpxattr(const char *name
, const bufferlist
& val
,
1061 add_xattr(CEPH_OSD_OP_CMPXATTR
, name
, val
);
1062 OSDOp
& o
= *ops
.rbegin();
1063 o
.op
.xattr
.cmp_op
= op
;
1064 o
.op
.xattr
.cmp_mode
= mode
;
1067 void rollback(uint64_t snapid
) {
1068 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ROLLBACK
);
1069 osd_op
.op
.snap
.snapid
= snapid
;
1072 void copy_from(object_t src
, snapid_t snapid
, object_locator_t src_oloc
,
1073 version_t src_version
, unsigned flags
,
1074 unsigned src_fadvise_flags
) {
1075 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_FROM
);
1076 osd_op
.op
.copy_from
.snapid
= snapid
;
1077 osd_op
.op
.copy_from
.src_version
= src_version
;
1078 osd_op
.op
.copy_from
.flags
= flags
;
1079 osd_op
.op
.copy_from
.src_fadvise_flags
= src_fadvise_flags
;
1080 ::encode(src
, osd_op
.indata
);
1081 ::encode(src_oloc
, osd_op
.indata
);
1085 * writeback content to backing tier
1087 * If object is marked dirty in the cache tier, write back content
1088 * to backing tier. If the object is clean this is a no-op.
1090 * If writeback races with an update, the update will block.
1092 * use with IGNORE_CACHE to avoid triggering promote.
1094 void cache_flush() {
1095 add_op(CEPH_OSD_OP_CACHE_FLUSH
);
1099 * writeback content to backing tier
1101 * If object is marked dirty in the cache tier, write back content
1102 * to backing tier. If the object is clean this is a no-op.
1104 * If writeback races with an update, return EAGAIN. Requires that
1105 * the SKIPRWLOCKS flag be set.
1107 * use with IGNORE_CACHE to avoid triggering promote.
1109 void cache_try_flush() {
1110 add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH
);
1114 * evict object from cache tier
1116 * If object is marked clean, remove the object from the cache tier.
1117 * Otherwise, return EBUSY.
1119 * use with IGNORE_CACHE to avoid triggering promote.
1121 void cache_evict() {
1122 add_op(CEPH_OSD_OP_CACHE_EVICT
);
1128 void set_redirect(object_t tgt
, snapid_t snapid
, object_locator_t tgt_oloc
,
1129 version_t tgt_version
) {
1130 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_SET_REDIRECT
);
1131 osd_op
.op
.copy_from
.snapid
= snapid
;
1132 osd_op
.op
.copy_from
.src_version
= tgt_version
;
1133 ::encode(tgt
, osd_op
.indata
);
1134 ::encode(tgt_oloc
, osd_op
.indata
);
1137 void set_alloc_hint(uint64_t expected_object_size
,
1138 uint64_t expected_write_size
,
1140 add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT
, expected_object_size
,
1141 expected_write_size
, flags
);
1143 // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1144 // not worth a feature bit. Set FAILOK per-op flag to make
1145 // sure older osds don't trip over an unsupported opcode.
1146 set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1149 void dup(vector
<OSDOp
>& sops
) {
1151 out_bl
.resize(sops
.size());
1152 out_handler
.resize(sops
.size());
1153 out_rval
.resize(sops
.size());
1154 for (uint32_t i
= 0; i
< sops
.size(); i
++) {
1155 out_bl
[i
] = &sops
[i
].outdata
;
1156 out_handler
[i
] = NULL
;
1157 out_rval
[i
] = &sops
[i
].rval
;
1162 * Pin/unpin an object in cache tier
1165 add_op(CEPH_OSD_OP_CACHE_PIN
);
1168 void cache_unpin() {
1169 add_op(CEPH_OSD_OP_CACHE_UNPIN
);
1177 class Objecter
: public md_config_obs_t
, public Dispatcher
{
1179 // config observer bits
1180 const char** get_tracked_conf_keys() const override
;
1181 void handle_conf_change(const struct md_config_t
*conf
,
1182 const std::set
<std::string
> &changed
) override
;
1185 Messenger
*messenger
;
1188 ZTracer::Endpoint trace_endpoint
;
1192 using Dispatcher::cct
;
1193 std::multimap
<string
,string
> crush_location
;
1195 std::atomic
<bool> initialized
{false};
1198 std::atomic
<uint64_t> last_tid
{0};
1199 std::atomic
<unsigned> inflight_ops
{0};
1200 std::atomic
<int> client_inc
{-1};
1201 uint64_t max_linger_id
;
1202 std::atomic
<unsigned> num_in_flight
{0};
1203 std::atomic
<int> global_op_flags
{0}; // flags which are applied to each IO op
1204 bool keep_balanced_budget
;
1205 bool honor_osdmap_full
;
1206 bool osdmap_full_try
;
1208 // If this is true, accumulate a set of blacklisted entities
1209 // to be drained by consume_blacklist_events.
1210 bool blacklist_events_enabled
;
1211 std::set
<entity_addr_t
> blacklist_events
;
1214 void maybe_request_map();
1216 void enable_blacklist_events();
1219 void _maybe_request_map();
1221 version_t last_seen_osdmap_version
;
1222 version_t last_seen_pgmap_version
;
1224 mutable boost::shared_mutex rwlock
;
1225 using lock_guard
= std::unique_lock
<decltype(rwlock
)>;
1226 using unique_lock
= std::unique_lock
<decltype(rwlock
)>;
1227 using shared_lock
= boost::shared_lock
<decltype(rwlock
)>;
1228 using shunique_lock
= ceph::shunique_lock
<decltype(rwlock
)>;
1229 ceph::timer
<ceph::mono_clock
> timer
;
1231 PerfCounters
*logger
;
1233 uint64_t tick_event
;
1237 void update_crush_location();
1239 class RequestStateHook
;
1241 RequestStateHook
*m_request_state_hook
;
1244 /*** track pending operations ***/
1250 struct op_target_t
{
1253 epoch_t epoch
= 0; ///< latest epoch we calculated the mapping
1256 object_locator_t base_oloc
;
1257 object_t target_oid
;
1258 object_locator_t target_oloc
;
1260 ///< true if we are directed at base_pgid, not base_oid
1261 bool precalc_pgid
= false;
1263 ///< true if we have ever mapped to a valid pool
1264 bool pool_ever_existed
= false;
1266 ///< explcit pg target, if any
1269 pg_t pgid
; ///< last (raw) pg we mapped to
1270 spg_t actual_pgid
; ///< last (actual) spg_t we mapped to
1271 unsigned pg_num
= 0; ///< last pg_num we mapped to
1272 unsigned pg_num_mask
= 0; ///< last pg_num_mask we mapped to
1273 vector
<int> up
; ///< set of up osds for last pg we mapped to
1274 vector
<int> acting
; ///< set of acting osds for last pg we mapped to
1275 int up_primary
= -1; ///< last up_primary we mapped to
1276 int acting_primary
= -1; ///< last acting_primary we mapped to
1277 int size
= -1; ///< the size of the pool when were were last mapped
1278 int min_size
= -1; ///< the min size of the pool when were were last mapped
1279 bool sort_bitwise
= false; ///< whether the hobject_t sort order is bitwise
1281 bool used_replica
= false;
1282 bool paused
= false;
1284 int osd
= -1; ///< the final target osd, or -1
1286 epoch_t last_force_resend
= 0;
1288 op_target_t(object_t oid
, object_locator_t oloc
, int flags
)
1294 op_target_t(pg_t pgid
)
1295 : base_oloc(pgid
.pool(), pgid
.ps()),
1300 op_target_t() = default;
1302 hobject_t
get_hobj() {
1303 return hobject_t(target_oid
,
1306 target_oloc
.hash
>= 0 ? target_oloc
.hash
: pgid
.ps(),
1308 target_oloc
.nspace
);
1311 bool contained_by(const hobject_t
& begin
, const hobject_t
& end
) {
1312 hobject_t h
= get_hobj();
1313 int r
= cmp(h
, begin
);
1314 return r
== 0 || (r
> 0 && h
< end
);
1317 void dump(Formatter
*f
) const;
1320 struct Op
: public RefCountedObject
{
1321 OSDSession
*session
;
1326 ConnectionRef con
; // for rx buffer only
1327 uint64_t features
; // explicitly specified op features
1333 ceph::real_time mtime
;
1336 vector
<bufferlist
*> out_bl
;
1337 vector
<Context
*> out_handler
;
1338 vector
<int*> out_rval
;
1348 epoch_t
*reply_epoch
;
1350 ceph::mono_time stamp
;
1352 epoch_t map_dne_bound
;
1356 /// true if we should resend this message on failure
1359 /// true if the throttle budget is get/put on a series of OPs,
1360 /// instead of per OP basis, when this flag is set, the budget is
1361 /// acquired before sending the very first OP of the series and
1362 /// released upon receiving the last OP reply.
1367 osd_reqid_t reqid
; // explicitly setting reqid
1368 ZTracer::Trace trace
;
1370 Op(const object_t
& o
, const object_locator_t
& ol
, vector
<OSDOp
>& op
,
1371 int f
, Context
*fin
, version_t
*ov
, int *offset
= NULL
,
1372 ZTracer::Trace
*parent_trace
= nullptr) :
1373 session(NULL
), incarnation(0),
1376 features(CEPH_FEATURES_SUPPORTED_DEFAULT
),
1377 snapid(CEPH_NOSNAP
),
1388 should_resend(true),
1389 ctx_budgeted(false),
1390 data_offset(offset
) {
1393 /* initialize out_* to match op vector */
1394 out_bl
.resize(ops
.size());
1395 out_rval
.resize(ops
.size());
1396 out_handler
.resize(ops
.size());
1397 for (unsigned i
= 0; i
< ops
.size(); i
++) {
1399 out_handler
[i
] = NULL
;
1403 if (target
.base_oloc
.key
== o
)
1404 target
.base_oloc
.key
.clear();
1406 if (parent_trace
&& parent_trace
->valid()) {
1407 trace
.init("op", nullptr, parent_trace
);
1408 trace
.event("start");
1412 bool operator<(const Op
& other
) const {
1413 return tid
< other
.tid
;
1416 bool respects_full() const {
1418 (target
.flags
& (CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_RWORDERED
)) &&
1419 !(target
.flags
& (CEPH_OSD_FLAG_FULL_TRY
| CEPH_OSD_FLAG_FULL_FORCE
));
1424 while (!out_handler
.empty()) {
1425 delete out_handler
.back();
1426 out_handler
.pop_back();
1428 trace
.event("finish");
1432 struct C_Op_Map_Latest
: public Context
{
1436 C_Op_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1438 void finish(int r
) override
;
1441 struct C_Command_Map_Latest
: public Context
{
1445 C_Command_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1447 void finish(int r
) override
;
1450 struct C_Stat
: public Context
{
1453 ceph::real_time
*pmtime
;
1455 C_Stat(uint64_t *ps
, ceph::real_time
*pm
, Context
*c
) :
1456 psize(ps
), pmtime(pm
), fin(c
) {}
1457 void finish(int r
) override
{
1459 bufferlist::iterator p
= bl
.begin();
1473 struct C_GetAttrs
: public Context
{
1475 map
<string
,bufferlist
>& attrset
;
1477 C_GetAttrs(map
<string
, bufferlist
>& set
, Context
*c
) : attrset(set
),
1479 void finish(int r
) override
{
1481 bufferlist::iterator p
= bl
.begin();
1482 ::decode(attrset
, p
);
1489 // Pools and statistics
1490 struct NListContext
{
1491 collection_list_handle_t pos
;
1493 // these are for !sortbitwise compat only
1495 int starting_pg_num
= 0;
1496 bool sort_bitwise
= false;
1498 bool at_end_of_pool
= false; ///< publicly visible end flag
1500 int64_t pool_id
= -1;
1501 int pool_snap_seq
= 0;
1502 uint64_t max_entries
= 0;
1505 bufferlist bl
; // raw data read to here
1506 std::list
<librados::ListObjectImpl
> list
;
1510 bufferlist extra_info
;
1512 // The budget associated with this context, once it is set (>= 0),
1513 // the budget is not get/released on OP basis, instead the budget
1514 // is acquired before sending the first OP and released upon receiving
1515 // the last op reply.
1516 int ctx_budget
= -1;
1518 bool at_end() const {
1519 return at_end_of_pool
;
1522 uint32_t get_pg_hash_position() const {
1523 return pos
.get_hash();
1527 struct C_NList
: public Context
{
1528 NListContext
*list_context
;
1529 Context
*final_finish
;
1532 C_NList(NListContext
*lc
, Context
* finish
, Objecter
*ob
) :
1533 list_context(lc
), final_finish(finish
), objecter(ob
), epoch(0) {}
1534 void finish(int r
) override
{
1536 objecter
->_nlist_reply(list_context
, r
, final_finish
, epoch
);
1538 final_finish
->complete(r
);
1547 map
<string
,pool_stat_t
> *pool_stats
;
1551 ceph::mono_time last_submit
;
1556 struct ceph_statfs
*stats
;
1560 ceph::mono_time last_submit
;
1575 ceph::mono_time last_submit
;
1576 PoolOp() : tid(0), pool(0), onfinish(NULL
), ontimeout(0), pool_op(0),
1577 auid(0), crush_rule(0), snapid(0), blp(NULL
) {}
1580 // -- osd commands --
1581 struct CommandOp
: public RefCountedObject
{
1582 OSDSession
*session
= nullptr;
1586 bufferlist
*poutbl
= nullptr;
1587 string
*prs
= nullptr;
1589 // target_osd == -1 means target_pg is valid
1590 const int target_osd
= -1;
1591 const pg_t target_pg
;
1595 epoch_t map_dne_bound
= 0;
1596 int map_check_error
= 0; // error to return if map check fails
1597 const char *map_check_error_str
= nullptr;
1599 Context
*onfinish
= nullptr;
1600 uint64_t ontimeout
= 0;
1601 ceph::mono_time last_submit
;
1605 const vector
<string
> &cmd
,
1614 target_osd(target_osd
),
1615 onfinish(onfinish
) {}
1619 const vector
<string
> &cmd
,
1630 onfinish(onfinish
) {}
1634 void submit_command(CommandOp
*c
, ceph_tid_t
*ptid
);
1635 int _calc_command_target(CommandOp
*c
, shunique_lock
&sul
);
1636 void _assign_command_session(CommandOp
*c
, shunique_lock
&sul
);
1637 void _send_command(CommandOp
*c
);
1638 int command_op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
1639 void _finish_command(CommandOp
*c
, int r
, string rs
);
1640 void handle_command_reply(MCommandReply
*m
);
1643 // -- lingering ops --
1645 struct WatchContext
{
1646 // this simply mirrors librados WatchCtx2
1647 virtual void handle_notify(uint64_t notify_id
,
1649 uint64_t notifier_id
,
1650 bufferlist
& bl
) = 0;
1651 virtual void handle_error(uint64_t cookie
, int err
) = 0;
1652 virtual ~WatchContext() {}
1655 struct LingerOp
: public RefCountedObject
{
1662 ceph::real_time mtime
;
1670 ceph::mono_time watch_valid_thru
; ///< send time for last acked ping
1671 int last_error
; ///< error from last failed ping|reconnect, if any
1672 boost::shared_mutex watch_lock
;
1673 using lock_guard
= std::unique_lock
<decltype(watch_lock
)>;
1674 using unique_lock
= std::unique_lock
<decltype(watch_lock
)>;
1675 using shared_lock
= boost::shared_lock
<decltype(watch_lock
)>;
1676 using shunique_lock
= ceph::shunique_lock
<decltype(watch_lock
)>;
1678 // queue of pending async operations, with the timestamp of
1679 // when they were queued.
1680 list
<ceph::mono_time
> watch_pending_async
;
1682 uint32_t register_gen
;
1685 Context
*on_reg_commit
;
1687 // we trigger these from an async finisher
1688 Context
*on_notify_finish
;
1689 bufferlist
*notify_result_bl
;
1692 WatchContext
*watch_context
;
1694 OSDSession
*session
;
1696 ceph_tid_t register_tid
;
1697 ceph_tid_t ping_tid
;
1698 epoch_t map_dne_bound
;
1700 void _queued_async() {
1701 // watch_lock ust be locked unique
1702 watch_pending_async
.push_back(ceph::mono_clock::now());
1704 void finished_async() {
1705 unique_lock
l(watch_lock
);
1706 assert(!watch_pending_async
.empty());
1707 watch_pending_async
.pop_front();
1710 LingerOp() : linger_id(0),
1711 target(object_t(), object_locator_t(), 0),
1712 snap(CEPH_NOSNAP
), poutbl(NULL
), pobjver(NULL
),
1713 is_watch(false), last_error(0),
1717 on_reg_commit(NULL
),
1718 on_notify_finish(NULL
),
1719 notify_result_bl(NULL
),
1721 watch_context(NULL
),
1728 const LingerOp
&operator=(const LingerOp
& r
);
1729 LingerOp(const LingerOp
& o
);
1731 uint64_t get_cookie() {
1732 return reinterpret_cast<uint64_t>(this);
1736 ~LingerOp() override
{
1737 delete watch_context
;
1741 struct C_Linger_Commit
: public Context
{
1744 bufferlist outbl
; // used for notify only
1745 C_Linger_Commit(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1748 ~C_Linger_Commit() override
{
1751 void finish(int r
) override
{
1752 objecter
->_linger_commit(info
, r
, outbl
);
1756 struct C_Linger_Reconnect
: public Context
{
1759 C_Linger_Reconnect(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1762 ~C_Linger_Reconnect() override
{
1765 void finish(int r
) override
{
1766 objecter
->_linger_reconnect(info
, r
);
1770 struct C_Linger_Ping
: public Context
{
1773 ceph::mono_time sent
;
1774 uint32_t register_gen
;
1775 C_Linger_Ping(Objecter
*o
, LingerOp
*l
)
1776 : objecter(o
), info(l
), register_gen(info
->register_gen
) {
1779 ~C_Linger_Ping() override
{
1782 void finish(int r
) override
{
1783 objecter
->_linger_ping(info
, r
, sent
, register_gen
);
1787 struct C_Linger_Map_Latest
: public Context
{
1791 C_Linger_Map_Latest(Objecter
*o
, uint64_t id
) :
1792 objecter(o
), linger_id(id
), latest(0) {}
1793 void finish(int r
) override
;
1796 // -- osd sessions --
1800 hobject_t begin
, end
;
1803 struct OSDSession
: public RefCountedObject
{
1804 boost::shared_mutex lock
;
1805 using lock_guard
= std::lock_guard
<decltype(lock
)>;
1806 using unique_lock
= std::unique_lock
<decltype(lock
)>;
1807 using shared_lock
= boost::shared_lock
<decltype(lock
)>;
1808 using shunique_lock
= ceph::shunique_lock
<decltype(lock
)>;
1811 map
<ceph_tid_t
,Op
*> ops
;
1812 map
<uint64_t, LingerOp
*> linger_ops
;
1813 map
<ceph_tid_t
,CommandOp
*> command_ops
;
1816 map
<spg_t
,map
<hobject_t
,OSDBackoff
>> backoffs
;
1817 map
<uint64_t,OSDBackoff
*> backoffs_by_id
;
1823 std::unique_ptr
<std::mutex
[]> completion_locks
;
1824 using unique_completion_lock
= std::unique_lock
<
1825 decltype(completion_locks
)::element_type
>;
1828 OSDSession(CephContext
*cct
, int o
) :
1829 osd(o
), incarnation(0), con(NULL
),
1830 num_locks(cct
->_conf
->objecter_completion_locks_per_session
),
1831 completion_locks(new std::mutex
[num_locks
]) {}
1833 ~OSDSession() override
;
1835 bool is_homeless() { return (osd
== -1); }
1837 unique_completion_lock
get_lock(object_t
& oid
);
1839 map
<int,OSDSession
*> osd_sessions
;
1841 bool osdmap_full_flag() const;
1842 bool osdmap_pool_full(const int64_t pool_id
) const;
1847 * Test pg_pool_t::FLAG_FULL on a pool
1849 * @return true if the pool exists and has the flag set, or
1850 * the global full flag is set, else false
1852 bool _osdmap_pool_full(const int64_t pool_id
) const;
1853 bool _osdmap_pool_full(const pg_pool_t
&p
) const;
1854 void update_pool_full_map(map
<int64_t, bool>& pool_full_map
);
1856 map
<uint64_t, LingerOp
*> linger_ops
;
1857 // we use this just to confirm a cookie is valid before dereferencing the ptr
1858 set
<LingerOp
*> linger_ops_set
;
1860 map
<ceph_tid_t
,PoolStatOp
*> poolstat_ops
;
1861 map
<ceph_tid_t
,StatfsOp
*> statfs_ops
;
1862 map
<ceph_tid_t
,PoolOp
*> pool_ops
;
1863 std::atomic
<unsigned> num_homeless_ops
{0};
1865 OSDSession
*homeless_session
;
1867 // ops waiting for an osdmap with a new pool or confirmation that
1868 // the pool does not exist (may be expanded to other uses later)
1869 map
<uint64_t, LingerOp
*> check_latest_map_lingers
;
1870 map
<ceph_tid_t
, Op
*> check_latest_map_ops
;
1871 map
<ceph_tid_t
, CommandOp
*> check_latest_map_commands
;
1873 map
<epoch_t
,list
< pair
<Context
*, int> > > waiting_for_map
;
1875 ceph::timespan mon_timeout
;
1876 ceph::timespan osd_timeout
;
1878 MOSDOp
*_prepare_osd_op(Op
*op
);
1879 void _send_op(Op
*op
, MOSDOp
*m
= NULL
);
1880 void _send_op_account(Op
*op
);
1881 void _cancel_linger_op(Op
*op
);
1882 void finish_op(OSDSession
*session
, ceph_tid_t tid
);
1883 void _finish_op(Op
*op
, int r
);
1884 static bool is_pg_changed(
1886 const vector
<int>& oldacting
,
1888 const vector
<int>& newacting
,
1889 bool any_change
=false);
1890 enum recalc_op_target_result
{
1891 RECALC_OP_TARGET_NO_ACTION
= 0,
1892 RECALC_OP_TARGET_NEED_RESEND
,
1893 RECALC_OP_TARGET_POOL_DNE
,
1894 RECALC_OP_TARGET_OSD_DNE
,
1895 RECALC_OP_TARGET_OSD_DOWN
,
1897 bool _osdmap_full_flag() const;
1898 bool _osdmap_has_pool_full() const;
1900 bool target_should_be_paused(op_target_t
*op
);
1901 int _calc_target(op_target_t
*t
, Connection
*con
,
1902 bool any_change
= false);
1903 int _map_session(op_target_t
*op
, OSDSession
**s
,
1906 void _session_op_assign(OSDSession
*s
, Op
*op
);
1907 void _session_op_remove(OSDSession
*s
, Op
*op
);
1908 void _session_linger_op_assign(OSDSession
*to
, LingerOp
*op
);
1909 void _session_linger_op_remove(OSDSession
*from
, LingerOp
*op
);
1910 void _session_command_op_assign(OSDSession
*to
, CommandOp
*op
);
1911 void _session_command_op_remove(OSDSession
*from
, CommandOp
*op
);
1913 int _assign_op_target_session(Op
*op
, shunique_lock
& lc
,
1914 bool src_session_locked
,
1915 bool dst_session_locked
);
1916 int _recalc_linger_op_target(LingerOp
*op
, shunique_lock
& lc
);
1918 void _linger_submit(LingerOp
*info
, shunique_lock
& sul
);
1919 void _send_linger(LingerOp
*info
, shunique_lock
& sul
);
1920 void _linger_commit(LingerOp
*info
, int r
, bufferlist
& outbl
);
1921 void _linger_reconnect(LingerOp
*info
, int r
);
1922 void _send_linger_ping(LingerOp
*info
);
1923 void _linger_ping(LingerOp
*info
, int r
, ceph::mono_time sent
,
1924 uint32_t register_gen
);
1925 int _normalize_watch_error(int r
);
1927 friend class C_DoWatchError
;
1929 void linger_callback_flush(Context
*ctx
) {
1930 finisher
->queue(ctx
);
1934 void _check_op_pool_dne(Op
*op
, unique_lock
*sl
);
1935 void _send_op_map_check(Op
*op
);
1936 void _op_cancel_map_check(Op
*op
);
1937 void _check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
);
1938 void _send_linger_map_check(LingerOp
*op
);
1939 void _linger_cancel_map_check(LingerOp
*op
);
1940 void _check_command_map_dne(CommandOp
*op
);
1941 void _send_command_map_check(CommandOp
*op
);
1942 void _command_cancel_map_check(CommandOp
*op
);
1944 void kick_requests(OSDSession
*session
);
1945 void _kick_requests(OSDSession
*session
, map
<uint64_t, LingerOp
*>& lresend
);
1946 void _linger_ops_resend(map
<uint64_t, LingerOp
*>& lresend
, unique_lock
& ul
);
1948 int _get_session(int osd
, OSDSession
**session
, shunique_lock
& sul
);
1949 void put_session(OSDSession
*s
);
1950 void get_session(OSDSession
*s
);
1951 void _reopen_session(OSDSession
*session
);
1952 void close_session(OSDSession
*session
);
1954 void _nlist_reply(NListContext
*list_context
, int r
, Context
*final_finish
,
1955 epoch_t reply_epoch
);
1957 void resend_mon_ops();
1960 * handle a budget for in-flight ops
1961 * budget is taken whenever an op goes into the ops map
1962 * and returned whenever an op is removed from the map
1963 * If throttle_op needs to throttle it will unlock client_lock.
1965 int calc_op_budget(Op
*op
);
1966 void _throttle_op(Op
*op
, shunique_lock
& sul
, int op_size
= 0);
1967 int _take_op_budget(Op
*op
, shunique_lock
& sul
) {
1968 assert(sul
&& sul
.mutex() == &rwlock
);
1969 int op_budget
= calc_op_budget(op
);
1970 if (keep_balanced_budget
) {
1971 _throttle_op(op
, sul
, op_budget
);
1973 op_throttle_bytes
.take(op_budget
);
1974 op_throttle_ops
.take(1);
1976 op
->budgeted
= true;
1979 void put_op_budget_bytes(int op_budget
) {
1980 assert(op_budget
>= 0);
1981 op_throttle_bytes
.put(op_budget
);
1982 op_throttle_ops
.put(1);
1984 void put_op_budget(Op
*op
) {
1985 assert(op
->budgeted
);
1986 int op_budget
= calc_op_budget(op
);
1987 put_op_budget_bytes(op_budget
);
1989 void put_nlist_context_budget(NListContext
*list_context
);
1990 Throttle op_throttle_bytes
, op_throttle_ops
;
1993 Objecter(CephContext
*cct_
, Messenger
*m
, MonClient
*mc
,
1996 double osd_timeout
) :
1997 Dispatcher(cct_
), messenger(m
), monc(mc
), finisher(fin
),
1998 trace_endpoint("0.0.0.0", 0, "Objecter"),
2001 keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
2002 blacklist_events_enabled(false),
2003 last_seen_osdmap_version(0), last_seen_pgmap_version(0),
2004 logger(NULL
), tick_event(0), m_request_state_hook(NULL
),
2005 homeless_session(new OSDSession(cct
, -1)),
2006 mon_timeout(ceph::make_timespan(mon_timeout
)),
2007 osd_timeout(ceph::make_timespan(osd_timeout
)),
2008 op_throttle_bytes(cct
, "objecter_bytes",
2009 cct
->_conf
->objecter_inflight_op_bytes
),
2010 op_throttle_ops(cct
, "objecter_ops", cct
->_conf
->objecter_inflight_ops
),
2012 retry_writes_after_first_reply(cct
->_conf
->objecter_retry_writes_after_first_reply
)
2014 ~Objecter() override
;
2017 void start(const OSDMap
*o
= nullptr);
2020 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2021 // whatever functionality you want to use the OSDMap in a lambda like:
2023 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2027 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2029 // Do not call into something that will try to lock the OSDMap from
2030 // here or you will have great woe and misery.
2032 template<typename Callback
, typename
...Args
>
2033 auto with_osdmap(Callback
&& cb
, Args
&&... args
) const ->
2034 decltype(cb(*osdmap
, std::forward
<Args
>(args
)...)) {
2035 shared_lock
l(rwlock
);
2036 return std::forward
<Callback
>(cb
)(*osdmap
, std::forward
<Args
>(args
)...);
2041 * Tell the objecter to throttle outgoing ops according to its
2042 * budget (in _conf). If you do this, ops can block, in
2043 * which case it will unlock client_lock and sleep until
2044 * incoming messages reduce the used budget low enough for
2045 * the ops to continue going; then it will lock client_lock again.
2047 void set_balanced_budget() { keep_balanced_budget
= true; }
2048 void unset_balanced_budget() { keep_balanced_budget
= false; }
2050 void set_honor_osdmap_full() { honor_osdmap_full
= true; }
2051 void unset_honor_osdmap_full() { honor_osdmap_full
= false; }
2053 void set_osdmap_full_try() { osdmap_full_try
= true; }
2054 void unset_osdmap_full_try() { osdmap_full_try
= false; }
2056 void _scan_requests(OSDSession
*s
,
2059 map
<int64_t, bool> *pool_full_map
,
2060 map
<ceph_tid_t
, Op
*>& need_resend
,
2061 list
<LingerOp
*>& need_resend_linger
,
2062 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
2063 shunique_lock
& sul
);
2065 int64_t get_object_hash_position(int64_t pool
, const string
& key
,
2067 int64_t get_object_pg_hash_position(int64_t pool
, const string
& key
,
2072 bool ms_dispatch(Message
*m
) override
;
2073 bool ms_can_fast_dispatch_any() const override
{
2076 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2077 switch (m
->get_type()) {
2078 case CEPH_MSG_OSD_OPREPLY
:
2079 case CEPH_MSG_WATCH_NOTIFY
:
2085 void ms_fast_dispatch(Message
*m
) override
{
2089 void handle_osd_op_reply(class MOSDOpReply
*m
);
2090 void handle_osd_backoff(class MOSDBackoff
*m
);
2091 void handle_watch_notify(class MWatchNotify
*m
);
2092 void handle_osd_map(class MOSDMap
*m
);
2093 void wait_for_osd_map();
2096 * Get list of entities blacklisted since this was last called,
2097 * and reset the list.
2099 * Uses a std::set because typical use case is to compare some
2100 * other list of clients to see which overlap with the blacklisted
2104 void consume_blacklist_events(std::set
<entity_addr_t
> *events
);
2106 int pool_snap_by_name(int64_t poolid
,
2107 const char *snap_name
,
2108 snapid_t
*snap
) const;
2109 int pool_snap_get_info(int64_t poolid
, snapid_t snap
,
2110 pool_snap_info_t
*info
) const;
2111 int pool_snap_list(int64_t poolid
, vector
<uint64_t> *snaps
);
2114 void emit_blacklist_events(const OSDMap::Incremental
&inc
);
2115 void emit_blacklist_events(const OSDMap
&old_osd_map
,
2116 const OSDMap
&new_osd_map
);
2119 void _op_submit(Op
*op
, shunique_lock
& lc
, ceph_tid_t
*ptid
);
2120 void _op_submit_with_budget(Op
*op
, shunique_lock
& lc
,
2122 int *ctx_budget
= NULL
);
2123 inline void unregister_op(Op
*op
);
2127 void op_submit(Op
*op
, ceph_tid_t
*ptid
= NULL
, int *ctx_budget
= NULL
);
2129 shared_lock
l(rwlock
);
2130 return !((!inflight_ops
) && linger_ops
.empty() &&
2131 poolstat_ops
.empty() && statfs_ops
.empty());
2135 * Output in-flight requests
2137 void _dump_active(OSDSession
*s
);
2138 void _dump_active();
2140 void dump_requests(Formatter
*fmt
);
2141 void _dump_ops(const OSDSession
*s
, Formatter
*fmt
);
2142 void dump_ops(Formatter
*fmt
);
2143 void _dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
);
2144 void dump_linger_ops(Formatter
*fmt
);
2145 void _dump_command_ops(const OSDSession
*s
, Formatter
*fmt
);
2146 void dump_command_ops(Formatter
*fmt
);
2147 void dump_pool_ops(Formatter
*fmt
) const;
2148 void dump_pool_stat_ops(Formatter
*fmt
) const;
2149 void dump_statfs_ops(Formatter
*fmt
) const;
2151 int get_client_incarnation() const { return client_inc
; }
2152 void set_client_incarnation(int inc
) { client_inc
= inc
; }
2154 bool have_map(epoch_t epoch
);
2155 /// wait for epoch; true if we already have it
2156 bool wait_for_map(epoch_t epoch
, Context
*c
, int err
=0);
2157 void _wait_for_new_map(Context
*c
, epoch_t epoch
, int err
=0);
2158 void wait_for_latest_osdmap(Context
*fin
);
2159 void get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2160 void _get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2162 /** Get the current set of global op flags */
2163 int get_global_op_flags() const { return global_op_flags
; }
2164 /** Add a flag to the global op flags, not really atomic operation */
2165 void add_global_op_flags(int flag
) {
2166 global_op_flags
.fetch_or(flag
);
2168 /** Clear the passed flags from the global op flag set */
2169 void clear_global_op_flag(int flags
) {
2170 global_op_flags
.fetch_and(~flags
);
2173 /// cancel an in-progress request with the given return code
2175 int op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
2176 int _op_cancel(ceph_tid_t tid
, int r
);
2178 int op_cancel(ceph_tid_t tid
, int r
);
2181 * Any write op which is in progress at the start of this call shall no
2182 * longer be in progress when this call ends. Operations started after the
2183 * start of this call may still be in progress when this call ends.
2185 * @return the latest possible epoch in which a cancelled op could have
2186 * existed, or -1 if nothing was cancelled.
2188 epoch_t
op_cancel_writes(int r
, int64_t pool
=-1);
2191 void osd_command(int osd
, const std::vector
<string
>& cmd
,
2192 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2193 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2195 CommandOp
*c
= new CommandOp(
2202 submit_command(c
, ptid
);
2204 void pg_command(pg_t pgid
, vector
<string
>& cmd
,
2205 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2206 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2207 CommandOp
*c
= new CommandOp(
2214 submit_command(c
, ptid
);
2217 // mid-level helpers
2218 Op
*prepare_mutate_op(
2219 const object_t
& oid
, const object_locator_t
& oloc
,
2220 ObjectOperation
& op
, const SnapContext
& snapc
,
2221 ceph::real_time mtime
, int flags
,
2222 Context
*oncommit
, version_t
*objver
= NULL
,
2223 osd_reqid_t reqid
= osd_reqid_t(),
2224 ZTracer::Trace
*parent_trace
= nullptr) {
2225 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2226 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
, nullptr, parent_trace
);
2227 o
->priority
= op
.priority
;
2230 o
->out_rval
.swap(op
.out_rval
);
2235 const object_t
& oid
, const object_locator_t
& oloc
,
2236 ObjectOperation
& op
, const SnapContext
& snapc
,
2237 ceph::real_time mtime
, int flags
,
2238 Context
*oncommit
, version_t
*objver
= NULL
,
2239 osd_reqid_t reqid
= osd_reqid_t()) {
2240 Op
*o
= prepare_mutate_op(oid
, oloc
, op
, snapc
, mtime
, flags
,
2241 oncommit
, objver
, reqid
);
2246 Op
*prepare_read_op(
2247 const object_t
& oid
, const object_locator_t
& oloc
,
2248 ObjectOperation
& op
,
2249 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2250 Context
*onack
, version_t
*objver
= NULL
,
2251 int *data_offset
= NULL
,
2252 uint64_t features
= 0,
2253 ZTracer::Trace
*parent_trace
= nullptr) {
2254 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2255 CEPH_OSD_FLAG_READ
, onack
, objver
, data_offset
, parent_trace
);
2256 o
->priority
= op
.priority
;
2259 if (!o
->outbl
&& op
.size() == 1 && op
.out_bl
[0]->length())
2260 o
->outbl
= op
.out_bl
[0];
2261 o
->out_bl
.swap(op
.out_bl
);
2262 o
->out_handler
.swap(op
.out_handler
);
2263 o
->out_rval
.swap(op
.out_rval
);
2267 const object_t
& oid
, const object_locator_t
& oloc
,
2268 ObjectOperation
& op
,
2269 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2270 Context
*onack
, version_t
*objver
= NULL
,
2271 int *data_offset
= NULL
,
2272 uint64_t features
= 0) {
2273 Op
*o
= prepare_read_op(oid
, oloc
, op
, snapid
, pbl
, flags
, onack
, objver
,
2276 o
->features
= features
;
2281 Op
*prepare_pg_read_op(
2282 uint32_t hash
, object_locator_t oloc
,
2283 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2284 Context
*onack
, epoch_t
*reply_epoch
,
2286 Op
*o
= new Op(object_t(), oloc
,
2288 flags
| global_op_flags
| CEPH_OSD_FLAG_READ
|
2289 CEPH_OSD_FLAG_IGNORE_OVERLAY
,
2291 o
->target
.precalc_pgid
= true;
2292 o
->target
.base_pgid
= pg_t(hash
, oloc
.pool
);
2293 o
->priority
= op
.priority
;
2294 o
->snapid
= CEPH_NOSNAP
;
2296 o
->out_bl
.swap(op
.out_bl
);
2297 o
->out_handler
.swap(op
.out_handler
);
2298 o
->out_rval
.swap(op
.out_rval
);
2299 o
->reply_epoch
= reply_epoch
;
2301 // budget is tracked by listing context
2302 o
->ctx_budgeted
= true;
2307 uint32_t hash
, object_locator_t oloc
,
2308 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2309 Context
*onack
, epoch_t
*reply_epoch
,
2311 Op
*o
= prepare_pg_read_op(hash
, oloc
, op
, pbl
, flags
,
2312 onack
, reply_epoch
, ctx_budget
);
2314 op_submit(o
, &tid
, ctx_budget
);
2318 // caller owns a ref
2319 LingerOp
*linger_register(const object_t
& oid
, const object_locator_t
& oloc
,
2321 ceph_tid_t
linger_watch(LingerOp
*info
,
2322 ObjectOperation
& op
,
2323 const SnapContext
& snapc
, ceph::real_time mtime
,
2327 ceph_tid_t
linger_notify(LingerOp
*info
,
2328 ObjectOperation
& op
,
2329 snapid_t snap
, bufferlist
& inbl
,
2333 int linger_check(LingerOp
*info
);
2334 void linger_cancel(LingerOp
*info
); // releases a reference
2335 void _linger_cancel(LingerOp
*info
);
2337 void _do_watch_notify(LingerOp
*info
, MWatchNotify
*m
);
2340 * set up initial ops in the op vector, and allocate a final op slot.
2342 * The caller is responsible for filling in the final ops_count ops.
2344 * @param ops op vector
2345 * @param ops_count number of final ops the caller will fill in
2346 * @param extra_ops pointer to [array of] initial op[s]
2347 * @return index of final op (for caller to fill in)
2349 int init_ops(vector
<OSDOp
>& ops
, int ops_count
, ObjectOperation
*extra_ops
) {
2354 extra
= extra_ops
->ops
.size();
2356 ops
.resize(ops_count
+ extra
);
2358 for (i
=0; i
<extra
; i
++) {
2359 ops
[i
] = extra_ops
->ops
[i
];
2366 // high-level helpers
2367 Op
*prepare_stat_op(
2368 const object_t
& oid
, const object_locator_t
& oloc
,
2369 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2370 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2371 ObjectOperation
*extra_ops
= NULL
) {
2373 int i
= init_ops(ops
, 1, extra_ops
);
2374 ops
[i
].op
.op
= CEPH_OSD_OP_STAT
;
2375 C_Stat
*fin
= new C_Stat(psize
, pmtime
, onfinish
);
2376 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2377 CEPH_OSD_FLAG_READ
, fin
, objver
);
2379 o
->outbl
= &fin
->bl
;
2383 const object_t
& oid
, const object_locator_t
& oloc
,
2384 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2385 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2386 ObjectOperation
*extra_ops
= NULL
) {
2387 Op
*o
= prepare_stat_op(oid
, oloc
, snap
, psize
, pmtime
, flags
,
2388 onfinish
, objver
, extra_ops
);
2394 Op
*prepare_read_op(
2395 const object_t
& oid
, const object_locator_t
& oloc
,
2396 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2397 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2398 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2399 ZTracer::Trace
*parent_trace
= nullptr) {
2401 int i
= init_ops(ops
, 1, extra_ops
);
2402 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2403 ops
[i
].op
.extent
.offset
= off
;
2404 ops
[i
].op
.extent
.length
= len
;
2405 ops
[i
].op
.extent
.truncate_size
= 0;
2406 ops
[i
].op
.extent
.truncate_seq
= 0;
2407 ops
[i
].op
.flags
= op_flags
;
2408 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2409 CEPH_OSD_FLAG_READ
, onfinish
, objver
, nullptr, parent_trace
);
2415 const object_t
& oid
, const object_locator_t
& oloc
,
2416 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2417 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2418 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2419 Op
*o
= prepare_read_op(oid
, oloc
, off
, len
, snap
, pbl
, flags
,
2420 onfinish
, objver
, extra_ops
, op_flags
);
2426 Op
*prepare_cmpext_op(
2427 const object_t
& oid
, const object_locator_t
& oloc
,
2428 uint64_t off
, bufferlist
&cmp_bl
,
2429 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2430 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2432 int i
= init_ops(ops
, 1, extra_ops
);
2433 ops
[i
].op
.op
= CEPH_OSD_OP_CMPEXT
;
2434 ops
[i
].op
.extent
.offset
= off
;
2435 ops
[i
].op
.extent
.length
= cmp_bl
.length();
2436 ops
[i
].op
.extent
.truncate_size
= 0;
2437 ops
[i
].op
.extent
.truncate_seq
= 0;
2438 ops
[i
].indata
= cmp_bl
;
2439 ops
[i
].op
.flags
= op_flags
;
2440 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2441 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2447 const object_t
& oid
, const object_locator_t
& oloc
,
2448 uint64_t off
, bufferlist
&cmp_bl
,
2449 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2450 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2451 Op
*o
= prepare_cmpext_op(oid
, oloc
, off
, cmp_bl
, snap
,
2452 flags
, onfinish
, objver
, extra_ops
, op_flags
);
2458 ceph_tid_t
read_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2459 uint64_t off
, uint64_t len
, snapid_t snap
,
2460 bufferlist
*pbl
, int flags
, uint64_t trunc_size
,
2461 __u32 trunc_seq
, Context
*onfinish
,
2462 version_t
*objver
= NULL
,
2463 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2465 int i
= init_ops(ops
, 1, extra_ops
);
2466 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2467 ops
[i
].op
.extent
.offset
= off
;
2468 ops
[i
].op
.extent
.length
= len
;
2469 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2470 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2471 ops
[i
].op
.flags
= op_flags
;
2472 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2473 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2480 ceph_tid_t
mapext(const object_t
& oid
, const object_locator_t
& oloc
,
2481 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2482 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2483 ObjectOperation
*extra_ops
= NULL
) {
2485 int i
= init_ops(ops
, 1, extra_ops
);
2486 ops
[i
].op
.op
= CEPH_OSD_OP_MAPEXT
;
2487 ops
[i
].op
.extent
.offset
= off
;
2488 ops
[i
].op
.extent
.length
= len
;
2489 ops
[i
].op
.extent
.truncate_size
= 0;
2490 ops
[i
].op
.extent
.truncate_seq
= 0;
2491 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2492 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2499 ceph_tid_t
getxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2500 const char *name
, snapid_t snap
, bufferlist
*pbl
, int flags
,
2502 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2504 int i
= init_ops(ops
, 1, extra_ops
);
2505 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTR
;
2506 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2507 ops
[i
].op
.xattr
.value_len
= 0;
2509 ops
[i
].indata
.append(name
);
2510 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2511 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2519 ceph_tid_t
getxattrs(const object_t
& oid
, const object_locator_t
& oloc
,
2520 snapid_t snap
, map
<string
,bufferlist
>& attrset
,
2521 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2522 ObjectOperation
*extra_ops
= NULL
) {
2524 int i
= init_ops(ops
, 1, extra_ops
);
2525 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTRS
;
2526 C_GetAttrs
*fin
= new C_GetAttrs(attrset
, onfinish
);
2527 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2528 CEPH_OSD_FLAG_READ
, fin
, objver
);
2530 o
->outbl
= &fin
->bl
;
2536 ceph_tid_t
read_full(const object_t
& oid
, const object_locator_t
& oloc
,
2537 snapid_t snap
, bufferlist
*pbl
, int flags
,
2538 Context
*onfinish
, version_t
*objver
= NULL
,
2539 ObjectOperation
*extra_ops
= NULL
) {
2540 return read(oid
, oloc
, 0, 0, snap
, pbl
, flags
| global_op_flags
|
2541 CEPH_OSD_FLAG_READ
, onfinish
, objver
, extra_ops
);
2546 ceph_tid_t
_modify(const object_t
& oid
, const object_locator_t
& oloc
,
2547 vector
<OSDOp
>& ops
, ceph::real_time mtime
,
2548 const SnapContext
& snapc
, int flags
,
2550 version_t
*objver
= NULL
) {
2551 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2552 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2559 Op
*prepare_write_op(
2560 const object_t
& oid
, const object_locator_t
& oloc
,
2561 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2562 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2563 Context
*oncommit
, version_t
*objver
= NULL
,
2564 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2565 ZTracer::Trace
*parent_trace
= nullptr) {
2567 int i
= init_ops(ops
, 1, extra_ops
);
2568 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2569 ops
[i
].op
.extent
.offset
= off
;
2570 ops
[i
].op
.extent
.length
= len
;
2571 ops
[i
].op
.extent
.truncate_size
= 0;
2572 ops
[i
].op
.extent
.truncate_seq
= 0;
2574 ops
[i
].op
.flags
= op_flags
;
2575 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2576 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
,
2577 nullptr, parent_trace
);
2583 const object_t
& oid
, const object_locator_t
& oloc
,
2584 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2585 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2586 Context
*oncommit
, version_t
*objver
= NULL
,
2587 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2588 Op
*o
= prepare_write_op(oid
, oloc
, off
, len
, snapc
, bl
, mtime
, flags
,
2589 oncommit
, objver
, extra_ops
, op_flags
);
2594 Op
*prepare_append_op(
2595 const object_t
& oid
, const object_locator_t
& oloc
,
2596 uint64_t len
, const SnapContext
& snapc
,
2597 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2599 version_t
*objver
= NULL
,
2600 ObjectOperation
*extra_ops
= NULL
) {
2602 int i
= init_ops(ops
, 1, extra_ops
);
2603 ops
[i
].op
.op
= CEPH_OSD_OP_APPEND
;
2604 ops
[i
].op
.extent
.offset
= 0;
2605 ops
[i
].op
.extent
.length
= len
;
2606 ops
[i
].op
.extent
.truncate_size
= 0;
2607 ops
[i
].op
.extent
.truncate_seq
= 0;
2609 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2610 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2616 const object_t
& oid
, const object_locator_t
& oloc
,
2617 uint64_t len
, const SnapContext
& snapc
,
2618 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2620 version_t
*objver
= NULL
,
2621 ObjectOperation
*extra_ops
= NULL
) {
2622 Op
*o
= prepare_append_op(oid
, oloc
, len
, snapc
, bl
, mtime
, flags
,
2623 oncommit
, objver
, extra_ops
);
2628 ceph_tid_t
write_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2629 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2630 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2631 uint64_t trunc_size
, __u32 trunc_seq
,
2633 version_t
*objver
= NULL
,
2634 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2636 int i
= init_ops(ops
, 1, extra_ops
);
2637 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2638 ops
[i
].op
.extent
.offset
= off
;
2639 ops
[i
].op
.extent
.length
= len
;
2640 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2641 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2643 ops
[i
].op
.flags
= op_flags
;
2644 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2645 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2652 Op
*prepare_write_full_op(
2653 const object_t
& oid
, const object_locator_t
& oloc
,
2654 const SnapContext
& snapc
, const bufferlist
&bl
,
2655 ceph::real_time mtime
, int flags
,
2656 Context
*oncommit
, version_t
*objver
= NULL
,
2657 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2659 int i
= init_ops(ops
, 1, extra_ops
);
2660 ops
[i
].op
.op
= CEPH_OSD_OP_WRITEFULL
;
2661 ops
[i
].op
.extent
.offset
= 0;
2662 ops
[i
].op
.extent
.length
= bl
.length();
2664 ops
[i
].op
.flags
= op_flags
;
2665 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2666 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2671 ceph_tid_t
write_full(
2672 const object_t
& oid
, const object_locator_t
& oloc
,
2673 const SnapContext
& snapc
, const bufferlist
&bl
,
2674 ceph::real_time mtime
, int flags
,
2675 Context
*oncommit
, version_t
*objver
= NULL
,
2676 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2677 Op
*o
= prepare_write_full_op(oid
, oloc
, snapc
, bl
, mtime
, flags
,
2678 oncommit
, objver
, extra_ops
, op_flags
);
2683 Op
*prepare_writesame_op(
2684 const object_t
& oid
, const object_locator_t
& oloc
,
2685 uint64_t write_len
, uint64_t off
,
2686 const SnapContext
& snapc
, const bufferlist
&bl
,
2687 ceph::real_time mtime
, int flags
,
2688 Context
*oncommit
, version_t
*objver
= NULL
,
2689 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2692 int i
= init_ops(ops
, 1, extra_ops
);
2693 ops
[i
].op
.op
= CEPH_OSD_OP_WRITESAME
;
2694 ops
[i
].op
.writesame
.offset
= off
;
2695 ops
[i
].op
.writesame
.length
= write_len
;
2696 ops
[i
].op
.writesame
.data_length
= bl
.length();
2698 ops
[i
].op
.flags
= op_flags
;
2699 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2700 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2705 ceph_tid_t
writesame(
2706 const object_t
& oid
, const object_locator_t
& oloc
,
2707 uint64_t write_len
, uint64_t off
,
2708 const SnapContext
& snapc
, const bufferlist
&bl
,
2709 ceph::real_time mtime
, int flags
,
2710 Context
*oncommit
, version_t
*objver
= NULL
,
2711 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2713 Op
*o
= prepare_writesame_op(oid
, oloc
, write_len
, off
, snapc
, bl
,
2714 mtime
, flags
, oncommit
, objver
,
2715 extra_ops
, op_flags
);
2721 ceph_tid_t
trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2722 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2723 uint64_t trunc_size
, __u32 trunc_seq
,
2724 Context
*oncommit
, version_t
*objver
= NULL
,
2725 ObjectOperation
*extra_ops
= NULL
) {
2727 int i
= init_ops(ops
, 1, extra_ops
);
2728 ops
[i
].op
.op
= CEPH_OSD_OP_TRUNCATE
;
2729 ops
[i
].op
.extent
.offset
= trunc_size
;
2730 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2731 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2732 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2733 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2740 ceph_tid_t
zero(const object_t
& oid
, const object_locator_t
& oloc
,
2741 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2742 ceph::real_time mtime
, int flags
, Context
*oncommit
,
2743 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2745 int i
= init_ops(ops
, 1, extra_ops
);
2746 ops
[i
].op
.op
= CEPH_OSD_OP_ZERO
;
2747 ops
[i
].op
.extent
.offset
= off
;
2748 ops
[i
].op
.extent
.length
= len
;
2749 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2750 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2757 ceph_tid_t
rollback_object(const object_t
& oid
, const object_locator_t
& oloc
,
2758 const SnapContext
& snapc
, snapid_t snapid
,
2759 ceph::real_time mtime
, Context
*oncommit
,
2760 version_t
*objver
= NULL
,
2761 ObjectOperation
*extra_ops
= NULL
) {
2763 int i
= init_ops(ops
, 1, extra_ops
);
2764 ops
[i
].op
.op
= CEPH_OSD_OP_ROLLBACK
;
2765 ops
[i
].op
.snap
.snapid
= snapid
;
2766 Op
*o
= new Op(oid
, oloc
, ops
, CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2773 ceph_tid_t
create(const object_t
& oid
, const object_locator_t
& oloc
,
2774 const SnapContext
& snapc
, ceph::real_time mtime
, int global_flags
,
2775 int create_flags
, Context
*oncommit
,
2776 version_t
*objver
= NULL
,
2777 ObjectOperation
*extra_ops
= NULL
) {
2779 int i
= init_ops(ops
, 1, extra_ops
);
2780 ops
[i
].op
.op
= CEPH_OSD_OP_CREATE
;
2781 ops
[i
].op
.flags
= create_flags
;
2782 Op
*o
= new Op(oid
, oloc
, ops
, global_flags
| global_op_flags
|
2783 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2790 Op
*prepare_remove_op(
2791 const object_t
& oid
, const object_locator_t
& oloc
,
2792 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2794 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2796 int i
= init_ops(ops
, 1, extra_ops
);
2797 ops
[i
].op
.op
= CEPH_OSD_OP_DELETE
;
2798 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2799 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2805 const object_t
& oid
, const object_locator_t
& oloc
,
2806 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2808 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2809 Op
*o
= prepare_remove_op(oid
, oloc
, snapc
, mtime
, flags
,
2810 oncommit
, objver
, extra_ops
);
2816 ceph_tid_t
setxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2817 const char *name
, const SnapContext
& snapc
, const bufferlist
&bl
,
2818 ceph::real_time mtime
, int flags
,
2820 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2822 int i
= init_ops(ops
, 1, extra_ops
);
2823 ops
[i
].op
.op
= CEPH_OSD_OP_SETXATTR
;
2824 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2825 ops
[i
].op
.xattr
.value_len
= bl
.length();
2827 ops
[i
].indata
.append(name
);
2828 ops
[i
].indata
.append(bl
);
2829 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2830 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2837 ceph_tid_t
removexattr(const object_t
& oid
, const object_locator_t
& oloc
,
2838 const char *name
, const SnapContext
& snapc
,
2839 ceph::real_time mtime
, int flags
,
2841 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2843 int i
= init_ops(ops
, 1, extra_ops
);
2844 ops
[i
].op
.op
= CEPH_OSD_OP_RMXATTR
;
2845 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2846 ops
[i
].op
.xattr
.value_len
= 0;
2848 ops
[i
].indata
.append(name
);
2849 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2850 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2858 void list_nobjects(NListContext
*p
, Context
*onfinish
);
2859 uint32_t list_nobjects_seek(NListContext
*p
, uint32_t pos
);
2860 uint32_t list_nobjects_seek(NListContext
*list_context
, const hobject_t
& c
);
2861 void list_nobjects_get_cursor(NListContext
*list_context
, hobject_t
*c
);
2863 hobject_t
enumerate_objects_begin();
2864 hobject_t
enumerate_objects_end();
2865 //hobject_t enumerate_objects_begin(int n, int m);
2866 void enumerate_objects(
2868 const std::string
&ns
,
2869 const hobject_t
&start
,
2870 const hobject_t
&end
,
2872 const bufferlist
&filter_bl
,
2873 std::list
<librados::ListObjectImpl
> *result
,
2875 Context
*on_finish
);
2877 void _enumerate_reply(
2880 const hobject_t
&end
,
2881 const int64_t pool_id
,
2883 epoch_t reply_epoch
,
2884 std::list
<librados::ListObjectImpl
> *result
,
2886 Context
*on_finish
);
2887 friend class C_EnumerateReply
;
2889 // -------------------------
2892 void pool_op_submit(PoolOp
*op
);
2893 void _pool_op_submit(PoolOp
*op
);
2894 void _finish_pool_op(PoolOp
*op
, int r
);
2895 void _do_delete_pool(int64_t pool
, Context
*onfinish
);
2897 int create_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2898 int allocate_selfmanaged_snap(int64_t pool
, snapid_t
*psnapid
,
2900 int delete_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2901 int delete_selfmanaged_snap(int64_t pool
, snapid_t snap
, Context
*onfinish
);
2903 int create_pool(string
& name
, Context
*onfinish
, uint64_t auid
=0,
2905 int delete_pool(int64_t pool
, Context
*onfinish
);
2906 int delete_pool(const string
& name
, Context
*onfinish
);
2907 int change_pool_auid(int64_t pool
, Context
*onfinish
, uint64_t auid
);
2909 void handle_pool_op_reply(MPoolOpReply
*m
);
2910 int pool_op_cancel(ceph_tid_t tid
, int r
);
2912 // --------------------------
2915 void _poolstat_submit(PoolStatOp
*op
);
2917 void handle_get_pool_stats_reply(MGetPoolStatsReply
*m
);
2918 void get_pool_stats(list
<string
>& pools
, map
<string
,pool_stat_t
> *result
,
2920 int pool_stat_op_cancel(ceph_tid_t tid
, int r
);
2921 void _finish_pool_stat_op(PoolStatOp
*op
, int r
);
2923 // ---------------------------
2926 void _fs_stats_submit(StatfsOp
*op
);
2928 void handle_fs_stats_reply(MStatfsReply
*m
);
2929 void get_fs_stats(struct ceph_statfs
& result
, Context
*onfinish
);
2930 int statfs_op_cancel(ceph_tid_t tid
, int r
);
2931 void _finish_statfs_op(StatfsOp
*op
, int r
);
2933 // ---------------------------
2934 // some scatter/gather hackery
2936 void _sg_read_finish(vector
<ObjectExtent
>& extents
,
2937 vector
<bufferlist
>& resultbl
,
2938 bufferlist
*bl
, Context
*onfinish
);
2940 struct C_SGRead
: public Context
{
2942 vector
<ObjectExtent
> extents
;
2943 vector
<bufferlist
> resultbl
;
2946 C_SGRead(Objecter
*ob
,
2947 vector
<ObjectExtent
>& e
, vector
<bufferlist
>& r
, bufferlist
*b
,
2949 objecter(ob
), bl(b
), onfinish(c
) {
2953 void finish(int r
) override
{
2954 objecter
->_sg_read_finish(extents
, resultbl
, bl
, onfinish
);
2958 void sg_read_trunc(vector
<ObjectExtent
>& extents
, snapid_t snap
,
2959 bufferlist
*bl
, int flags
, uint64_t trunc_size
,
2960 __u32 trunc_seq
, Context
*onfinish
, int op_flags
= 0) {
2961 if (extents
.size() == 1) {
2962 read_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
2963 extents
[0].length
, snap
, bl
, flags
, extents
[0].truncate_size
,
2964 trunc_seq
, onfinish
, 0, 0, op_flags
);
2966 C_GatherBuilder
gather(cct
);
2967 vector
<bufferlist
> resultbl(extents
.size());
2969 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
2972 read_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
, snap
, &resultbl
[i
++],
2973 flags
, p
->truncate_size
, trunc_seq
, gather
.new_sub(),
2976 gather
.set_finisher(new C_SGRead(this, extents
, resultbl
, bl
, onfinish
));
2981 void sg_read(vector
<ObjectExtent
>& extents
, snapid_t snap
, bufferlist
*bl
,
2982 int flags
, Context
*onfinish
, int op_flags
= 0) {
2983 sg_read_trunc(extents
, snap
, bl
, flags
, 0, 0, onfinish
, op_flags
);
2986 void sg_write_trunc(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
2987 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
2988 uint64_t trunc_size
, __u32 trunc_seq
,
2989 Context
*oncommit
, int op_flags
= 0) {
2990 if (extents
.size() == 1) {
2991 write_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
2992 extents
[0].length
, snapc
, bl
, mtime
, flags
,
2993 extents
[0].truncate_size
, trunc_seq
, oncommit
,
2996 C_GatherBuilder
gcom(cct
, oncommit
);
2997 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
3001 for (vector
<pair
<uint64_t,uint64_t> >::iterator bit
3002 = p
->buffer_extents
.begin();
3003 bit
!= p
->buffer_extents
.end();
3005 bl
.copy(bit
->first
, bit
->second
, cur
);
3006 assert(cur
.length() == p
->length
);
3007 write_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
,
3008 snapc
, cur
, mtime
, flags
, p
->truncate_size
, trunc_seq
,
3009 oncommit
? gcom
.new_sub():0,
3016 void sg_write(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3017 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
3018 Context
*oncommit
, int op_flags
= 0) {
3019 sg_write_trunc(extents
, snapc
, bl
, mtime
, flags
, 0, 0, oncommit
,
3023 void ms_handle_connect(Connection
*con
) override
;
3024 bool ms_handle_reset(Connection
*con
) override
;
3025 void ms_handle_remote_reset(Connection
*con
) override
;
3026 bool ms_handle_refused(Connection
*con
) override
;
3027 bool ms_get_authorizer(int dest_type
,
3028 AuthAuthorizer
**authorizer
,
3029 bool force_new
) override
;
3031 void blacklist_self(bool set
);
3034 epoch_t epoch_barrier
;
3035 bool retry_writes_after_first_reply
;
3037 void set_epoch_barrier(epoch_t epoch
);
3039 PerfCounters
*get_logger() {