1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
18 #include <condition_variable>
24 #include <type_traits>
26 #include <boost/thread/shared_mutex.hpp>
28 #include "include/assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/Finisher.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
40 #include "messages/MOSDOp.h"
41 #include "osd/OSDMap.h"
54 class MGetPoolStatsReply
;
61 // -----------------------------------------
63 struct ObjectOperation
{
68 vector
<bufferlist
*> out_bl
;
69 vector
<Context
*> out_handler
;
70 vector
<int*> out_rval
;
72 ObjectOperation() : flags(0), priority(0) {}
74 while (!out_handler
.empty()) {
75 delete out_handler
.back();
76 out_handler
.pop_back();
84 void set_last_op_flags(int flags
) {
86 ops
.rbegin()->op
.flags
= flags
;
91 * Add a callback to run when this operation completes,
92 * after any other callbacks for it.
94 void add_handler(Context
*extra
);
96 OSDOp
& add_op(int op
) {
102 out_handler
.resize(s
+1);
103 out_handler
[s
] = NULL
;
104 out_rval
.resize(s
+1);
108 void add_data(int op
, uint64_t off
, uint64_t len
, bufferlist
& bl
) {
109 OSDOp
& osd_op
= add_op(op
);
110 osd_op
.op
.extent
.offset
= off
;
111 osd_op
.op
.extent
.length
= len
;
112 osd_op
.indata
.claim_append(bl
);
114 void add_writesame(int op
, uint64_t off
, uint64_t write_len
,
116 OSDOp
& osd_op
= add_op(op
);
117 osd_op
.op
.writesame
.offset
= off
;
118 osd_op
.op
.writesame
.length
= write_len
;
119 osd_op
.op
.writesame
.data_length
= bl
.length();
120 osd_op
.indata
.claim_append(bl
);
122 void add_xattr(int op
, const char *name
, const bufferlist
& data
) {
123 OSDOp
& osd_op
= add_op(op
);
124 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
125 osd_op
.op
.xattr
.value_len
= data
.length();
127 osd_op
.indata
.append(name
);
128 osd_op
.indata
.append(data
);
130 void add_xattr_cmp(int op
, const char *name
, uint8_t cmp_op
,
131 uint8_t cmp_mode
, const bufferlist
& data
) {
132 OSDOp
& osd_op
= add_op(op
);
133 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
134 osd_op
.op
.xattr
.value_len
= data
.length();
135 osd_op
.op
.xattr
.cmp_op
= cmp_op
;
136 osd_op
.op
.xattr
.cmp_mode
= cmp_mode
;
138 osd_op
.indata
.append(name
);
139 osd_op
.indata
.append(data
);
141 void add_call(int op
, const char *cname
, const char *method
,
143 bufferlist
*outbl
, Context
*ctx
, int *prval
) {
144 OSDOp
& osd_op
= add_op(op
);
146 unsigned p
= ops
.size() - 1;
147 out_handler
[p
] = ctx
;
151 osd_op
.op
.cls
.class_len
= strlen(cname
);
152 osd_op
.op
.cls
.method_len
= strlen(method
);
153 osd_op
.op
.cls
.indata_len
= indata
.length();
154 osd_op
.indata
.append(cname
, osd_op
.op
.cls
.class_len
);
155 osd_op
.indata
.append(method
, osd_op
.op
.cls
.method_len
);
156 osd_op
.indata
.append(indata
);
158 void add_pgls(int op
, uint64_t count
, collection_list_handle_t cookie
,
159 epoch_t start_epoch
) {
160 OSDOp
& osd_op
= add_op(op
);
161 osd_op
.op
.pgls
.count
= count
;
162 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
163 ::encode(cookie
, osd_op
.indata
);
165 void add_pgls_filter(int op
, uint64_t count
, const bufferlist
& filter
,
166 collection_list_handle_t cookie
, epoch_t start_epoch
) {
167 OSDOp
& osd_op
= add_op(op
);
168 osd_op
.op
.pgls
.count
= count
;
169 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
171 string mname
= "filter";
172 ::encode(cname
, osd_op
.indata
);
173 ::encode(mname
, osd_op
.indata
);
174 osd_op
.indata
.append(filter
);
175 ::encode(cookie
, osd_op
.indata
);
177 void add_alloc_hint(int op
, uint64_t expected_object_size
,
178 uint64_t expected_write_size
,
180 OSDOp
& osd_op
= add_op(op
);
181 osd_op
.op
.alloc_hint
.expected_object_size
= expected_object_size
;
182 osd_op
.op
.alloc_hint
.expected_write_size
= expected_write_size
;
183 osd_op
.op
.alloc_hint
.flags
= flags
;
189 void pg_ls(uint64_t count
, bufferlist
& filter
,
190 collection_list_handle_t cookie
, epoch_t start_epoch
) {
191 if (filter
.length() == 0)
192 add_pgls(CEPH_OSD_OP_PGLS
, count
, cookie
, start_epoch
);
194 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER
, count
, filter
, cookie
,
196 flags
|= CEPH_OSD_FLAG_PGOP
;
199 void pg_nls(uint64_t count
, const bufferlist
& filter
,
200 collection_list_handle_t cookie
, epoch_t start_epoch
) {
201 if (filter
.length() == 0)
202 add_pgls(CEPH_OSD_OP_PGNLS
, count
, cookie
, start_epoch
);
204 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER
, count
, filter
, cookie
,
206 flags
|= CEPH_OSD_FLAG_PGOP
;
209 void scrub_ls(const librados::object_id_t
& start_after
,
211 std::vector
<librados::inconsistent_obj_t
> *objects
,
214 void scrub_ls(const librados::object_id_t
& start_after
,
216 std::vector
<librados::inconsistent_snapset_t
> *objects
,
220 void create(bool excl
) {
221 OSDOp
& o
= add_op(CEPH_OSD_OP_CREATE
);
222 o
.op
.flags
= (excl
? CEPH_OSD_OP_FLAG_EXCL
: 0);
225 struct C_ObjectOperation_stat
: public Context
{
228 ceph::real_time
*pmtime
;
230 struct timespec
*pts
;
232 C_ObjectOperation_stat(uint64_t *ps
, ceph::real_time
*pm
, time_t *pt
, struct timespec
*_pts
,
234 : psize(ps
), pmtime(pm
), ptime(pt
), pts(_pts
), prval(prval
) {}
235 void finish(int r
) override
{
237 bufferlist::iterator p
= bl
.begin();
240 ceph::real_time mtime
;
248 *ptime
= ceph::real_clock::to_time_t(mtime
);
250 *pts
= ceph::real_clock::to_timespec(mtime
);
251 } catch (buffer::error
& e
) {
258 void stat(uint64_t *psize
, ceph::real_time
*pmtime
, int *prval
) {
259 add_op(CEPH_OSD_OP_STAT
);
260 unsigned p
= ops
.size() - 1;
261 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, pmtime
, NULL
, NULL
,
267 void stat(uint64_t *psize
, time_t *ptime
, int *prval
) {
268 add_op(CEPH_OSD_OP_STAT
);
269 unsigned p
= ops
.size() - 1;
270 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, ptime
, NULL
,
276 void stat(uint64_t *psize
, struct timespec
*pts
, int *prval
) {
277 add_op(CEPH_OSD_OP_STAT
);
278 unsigned p
= ops
.size() - 1;
279 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, NULL
, pts
,
286 struct C_ObjectOperation_cmpext
: public Context
{
288 C_ObjectOperation_cmpext(int *prval
)
297 void cmpext(uint64_t off
, bufferlist
& cmp_bl
, int *prval
) {
298 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_bl
.length(), cmp_bl
);
299 unsigned p
= ops
.size() - 1;
300 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
306 void cmpext(uint64_t off
, uint64_t cmp_len
, const char *cmp_buf
, int *prval
) {
308 cmp_bl
.append(cmp_buf
, cmp_len
);
309 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_len
, cmp_bl
);
310 unsigned p
= ops
.size() - 1;
311 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
316 void read(uint64_t off
, uint64_t len
, bufferlist
*pbl
, int *prval
,
319 add_data(CEPH_OSD_OP_READ
, off
, len
, bl
);
320 unsigned p
= ops
.size() - 1;
323 out_handler
[p
] = ctx
;
326 struct C_ObjectOperation_sparse_read
: public Context
{
329 std::map
<uint64_t, uint64_t> *extents
;
331 C_ObjectOperation_sparse_read(bufferlist
*data_bl
,
332 std::map
<uint64_t, uint64_t> *extents
,
334 : data_bl(data_bl
), extents(extents
), prval(prval
) {}
335 void finish(int r
) override
{
336 bufferlist::iterator iter
= bl
.begin();
339 ::decode(*extents
, iter
);
340 ::decode(*data_bl
, iter
);
341 } catch (buffer::error
& e
) {
348 void sparse_read(uint64_t off
, uint64_t len
, std::map
<uint64_t,uint64_t> *m
,
349 bufferlist
*data_bl
, int *prval
) {
351 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
352 unsigned p
= ops
.size() - 1;
353 C_ObjectOperation_sparse_read
*h
=
354 new C_ObjectOperation_sparse_read(data_bl
, m
, prval
);
359 void write(uint64_t off
, bufferlist
& bl
,
360 uint64_t truncate_size
,
361 uint32_t truncate_seq
) {
362 add_data(CEPH_OSD_OP_WRITE
, off
, bl
.length(), bl
);
363 OSDOp
& o
= *ops
.rbegin();
364 o
.op
.extent
.truncate_size
= truncate_size
;
365 o
.op
.extent
.truncate_seq
= truncate_seq
;
367 void write(uint64_t off
, bufferlist
& bl
) {
368 write(off
, bl
, 0, 0);
370 void write_full(bufferlist
& bl
) {
371 add_data(CEPH_OSD_OP_WRITEFULL
, 0, bl
.length(), bl
);
373 void writesame(uint64_t off
, uint64_t write_len
, bufferlist
& bl
) {
374 add_writesame(CEPH_OSD_OP_WRITESAME
, off
, write_len
, bl
);
376 void append(bufferlist
& bl
) {
377 add_data(CEPH_OSD_OP_APPEND
, 0, bl
.length(), bl
);
379 void zero(uint64_t off
, uint64_t len
) {
381 add_data(CEPH_OSD_OP_ZERO
, off
, len
, bl
);
383 void truncate(uint64_t off
) {
385 add_data(CEPH_OSD_OP_TRUNCATE
, off
, 0, bl
);
389 add_data(CEPH_OSD_OP_DELETE
, 0, 0, bl
);
391 void mapext(uint64_t off
, uint64_t len
) {
393 add_data(CEPH_OSD_OP_MAPEXT
, off
, len
, bl
);
395 void sparse_read(uint64_t off
, uint64_t len
) {
397 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
400 void checksum(uint8_t type
, const bufferlist
&init_value_bl
,
401 uint64_t off
, uint64_t len
, size_t chunk_size
,
402 bufferlist
*pbl
, int *prval
, Context
*ctx
) {
403 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_CHECKSUM
);
404 osd_op
.op
.checksum
.offset
= off
;
405 osd_op
.op
.checksum
.length
= len
;
406 osd_op
.op
.checksum
.type
= type
;
407 osd_op
.op
.checksum
.chunk_size
= chunk_size
;
408 osd_op
.indata
.append(init_value_bl
);
410 unsigned p
= ops
.size() - 1;
413 out_handler
[p
] = ctx
;
417 void getxattr(const char *name
, bufferlist
*pbl
, int *prval
) {
419 add_xattr(CEPH_OSD_OP_GETXATTR
, name
, bl
);
420 unsigned p
= ops
.size() - 1;
424 struct C_ObjectOperation_decodevals
: public Context
{
425 uint64_t max_entries
;
427 std::map
<std::string
,bufferlist
> *pattrs
;
430 C_ObjectOperation_decodevals(uint64_t m
, std::map
<std::string
,bufferlist
> *pa
,
432 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
437 void finish(int r
) override
{
439 bufferlist::iterator p
= bl
.begin();
442 ::decode(*pattrs
, p
);
444 std::map
<std::string
,bufferlist
> ignore
;
450 ::decode(*ptruncated
, p
);
452 // the OSD did not provide this. since old OSDs do not
453 // enfoce omap result limits either, we can infer it from
454 // the size of the result
455 *ptruncated
= (pattrs
->size() == max_entries
);
459 catch (buffer::error
& e
) {
466 struct C_ObjectOperation_decodekeys
: public Context
{
467 uint64_t max_entries
;
469 std::set
<std::string
> *pattrs
;
472 C_ObjectOperation_decodekeys(uint64_t m
, std::set
<std::string
> *pa
, bool *pt
,
474 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
479 void finish(int r
) override
{
481 bufferlist::iterator p
= bl
.begin();
484 ::decode(*pattrs
, p
);
486 std::set
<std::string
> ignore
;
492 ::decode(*ptruncated
, p
);
494 // the OSD did not provide this. since old OSDs do not
495 // enfoce omap result limits either, we can infer it from
496 // the size of the result
497 *ptruncated
= (pattrs
->size() == max_entries
);
501 catch (buffer::error
& e
) {
508 struct C_ObjectOperation_decodewatchers
: public Context
{
510 list
<obj_watch_t
> *pwatchers
;
512 C_ObjectOperation_decodewatchers(list
<obj_watch_t
> *pw
, int *pr
)
513 : pwatchers(pw
), prval(pr
) {}
514 void finish(int r
) override
{
516 bufferlist::iterator p
= bl
.begin();
518 obj_list_watch_response_t resp
;
521 for (list
<watch_item_t
>::iterator i
= resp
.entries
.begin() ;
522 i
!= resp
.entries
.end() ; ++i
) {
526 strncpy(ow
.addr
, sa
.str().c_str(), 256);
527 ow
.watcher_id
= i
->name
.num();
528 ow
.cookie
= i
->cookie
;
529 ow
.timeout_seconds
= i
->timeout_seconds
;
530 pwatchers
->push_back(ow
);
534 catch (buffer::error
& e
) {
541 struct C_ObjectOperation_decodesnaps
: public Context
{
543 librados::snap_set_t
*psnaps
;
545 C_ObjectOperation_decodesnaps(librados::snap_set_t
*ps
, int *pr
)
546 : psnaps(ps
), prval(pr
) {}
547 void finish(int r
) override
{
549 bufferlist::iterator p
= bl
.begin();
551 obj_list_snap_response_t resp
;
554 psnaps
->clones
.clear();
555 for (vector
<clone_info
>::iterator ci
= resp
.clones
.begin();
556 ci
!= resp
.clones
.end();
558 librados::clone_info_t clone
;
560 clone
.cloneid
= ci
->cloneid
;
561 clone
.snaps
.reserve(ci
->snaps
.size());
562 clone
.snaps
.insert(clone
.snaps
.end(), ci
->snaps
.begin(),
564 clone
.overlap
= ci
->overlap
;
565 clone
.size
= ci
->size
;
567 psnaps
->clones
.push_back(clone
);
569 psnaps
->seq
= resp
.seq
;
571 } catch (buffer::error
& e
) {
578 void getxattrs(std::map
<std::string
,bufferlist
> *pattrs
, int *prval
) {
579 add_op(CEPH_OSD_OP_GETXATTRS
);
580 if (pattrs
|| prval
) {
581 unsigned p
= ops
.size() - 1;
582 C_ObjectOperation_decodevals
*h
583 = new C_ObjectOperation_decodevals(0, pattrs
, nullptr, prval
);
589 void setxattr(const char *name
, const bufferlist
& bl
) {
590 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
592 void setxattr(const char *name
, const string
& s
) {
595 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
597 void cmpxattr(const char *name
, uint8_t cmp_op
, uint8_t cmp_mode
,
598 const bufferlist
& bl
) {
599 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR
, name
, cmp_op
, cmp_mode
, bl
);
601 void rmxattr(const char *name
) {
603 add_xattr(CEPH_OSD_OP_RMXATTR
, name
, bl
);
605 void setxattrs(map
<string
, bufferlist
>& attrs
) {
608 add_xattr(CEPH_OSD_OP_RESETXATTRS
, 0, bl
.length());
610 void resetxattrs(const char *prefix
, map
<string
, bufferlist
>& attrs
) {
613 add_xattr(CEPH_OSD_OP_RESETXATTRS
, prefix
, bl
);
617 void tmap_update(bufferlist
& bl
) {
618 add_data(CEPH_OSD_OP_TMAPUP
, 0, 0, bl
);
620 void tmap_put(bufferlist
& bl
) {
621 add_data(CEPH_OSD_OP_TMAPPUT
, 0, bl
.length(), bl
);
623 void tmap_get(bufferlist
*pbl
, int *prval
) {
624 add_op(CEPH_OSD_OP_TMAPGET
);
625 unsigned p
= ops
.size() - 1;
630 add_op(CEPH_OSD_OP_TMAPGET
);
632 void tmap_to_omap(bool nullok
=false) {
633 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_TMAP2OMAP
);
635 osd_op
.op
.tmap2omap
.flags
= CEPH_OSD_TMAP2OMAP_NULLOK
;
639 void omap_get_keys(const string
&start_after
,
641 std::set
<std::string
> *out_set
,
644 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETKEYS
);
646 ::encode(start_after
, bl
);
647 ::encode(max_to_get
, bl
);
648 op
.op
.extent
.offset
= 0;
649 op
.op
.extent
.length
= bl
.length();
650 op
.indata
.claim_append(bl
);
651 if (prval
|| ptruncated
|| out_set
) {
652 unsigned p
= ops
.size() - 1;
653 C_ObjectOperation_decodekeys
*h
=
654 new C_ObjectOperation_decodekeys(max_to_get
, out_set
, ptruncated
, prval
);
661 void omap_get_vals(const string
&start_after
,
662 const string
&filter_prefix
,
664 std::map
<std::string
, bufferlist
> *out_set
,
667 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALS
);
669 ::encode(start_after
, bl
);
670 ::encode(max_to_get
, bl
);
671 ::encode(filter_prefix
, bl
);
672 op
.op
.extent
.offset
= 0;
673 op
.op
.extent
.length
= bl
.length();
674 op
.indata
.claim_append(bl
);
675 if (prval
|| out_set
|| ptruncated
) {
676 unsigned p
= ops
.size() - 1;
677 C_ObjectOperation_decodevals
*h
=
678 new C_ObjectOperation_decodevals(max_to_get
, out_set
, ptruncated
, prval
);
685 void omap_get_vals_by_keys(const std::set
<std::string
> &to_get
,
686 std::map
<std::string
, bufferlist
> *out_set
,
688 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS
);
690 ::encode(to_get
, bl
);
691 op
.op
.extent
.offset
= 0;
692 op
.op
.extent
.length
= bl
.length();
693 op
.indata
.claim_append(bl
);
694 if (prval
|| out_set
) {
695 unsigned p
= ops
.size() - 1;
696 C_ObjectOperation_decodevals
*h
=
697 new C_ObjectOperation_decodevals(0, out_set
, nullptr, prval
);
704 void omap_cmp(const std::map
<std::string
, pair
<bufferlist
,int> > &assertions
,
706 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAP_CMP
);
708 ::encode(assertions
, bl
);
709 op
.op
.extent
.offset
= 0;
710 op
.op
.extent
.length
= bl
.length();
711 op
.indata
.claim_append(bl
);
713 unsigned p
= ops
.size() - 1;
718 struct C_ObjectOperation_copyget
: public Context
{
720 object_copy_cursor_t
*cursor
;
722 ceph::real_time
*out_mtime
;
723 std::map
<std::string
,bufferlist
> *out_attrs
;
724 bufferlist
*out_data
, *out_omap_header
, *out_omap_data
;
725 vector
<snapid_t
> *out_snaps
;
726 snapid_t
*out_snap_seq
;
728 uint32_t *out_data_digest
;
729 uint32_t *out_omap_digest
;
730 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
;
731 uint64_t *out_truncate_seq
;
732 uint64_t *out_truncate_size
;
734 C_ObjectOperation_copyget(object_copy_cursor_t
*c
,
737 std::map
<std::string
,bufferlist
> *a
,
738 bufferlist
*d
, bufferlist
*oh
,
740 std::vector
<snapid_t
> *osnaps
,
745 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *oreqids
,
750 out_size(s
), out_mtime(m
),
751 out_attrs(a
), out_data(d
), out_omap_header(oh
),
752 out_omap_data(o
), out_snaps(osnaps
), out_snap_seq(osnap_seq
),
753 out_flags(flags
), out_data_digest(dd
), out_omap_digest(od
),
755 out_truncate_seq(otseq
),
756 out_truncate_size(otsize
),
758 void finish(int r
) override
{
759 // reqids are copied on ENOENT
760 if (r
< 0 && r
!= -ENOENT
)
763 bufferlist::iterator p
= bl
.begin();
764 object_copy_data_t copy_reply
;
765 ::decode(copy_reply
, p
);
768 *out_reqids
= copy_reply
.reqids
;
772 *out_size
= copy_reply
.size
;
774 *out_mtime
= ceph::real_clock::from_ceph_timespec(copy_reply
.mtime
);
776 *out_attrs
= copy_reply
.attrs
;
778 out_data
->claim_append(copy_reply
.data
);
780 out_omap_header
->claim_append(copy_reply
.omap_header
);
782 *out_omap_data
= copy_reply
.omap_data
;
784 *out_snaps
= copy_reply
.snaps
;
786 *out_snap_seq
= copy_reply
.snap_seq
;
788 *out_flags
= copy_reply
.flags
;
790 *out_data_digest
= copy_reply
.data_digest
;
792 *out_omap_digest
= copy_reply
.omap_digest
;
794 *out_reqids
= copy_reply
.reqids
;
795 if (out_truncate_seq
)
796 *out_truncate_seq
= copy_reply
.truncate_seq
;
797 if (out_truncate_size
)
798 *out_truncate_size
= copy_reply
.truncate_size
;
799 *cursor
= copy_reply
.cursor
;
800 } catch (buffer::error
& e
) {
807 void copy_get(object_copy_cursor_t
*cursor
,
810 ceph::real_time
*out_mtime
,
811 std::map
<std::string
,bufferlist
> *out_attrs
,
812 bufferlist
*out_data
,
813 bufferlist
*out_omap_header
,
814 bufferlist
*out_omap_data
,
815 vector
<snapid_t
> *out_snaps
,
816 snapid_t
*out_snap_seq
,
818 uint32_t *out_data_digest
,
819 uint32_t *out_omap_digest
,
820 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
,
821 uint64_t *truncate_seq
,
822 uint64_t *truncate_size
,
824 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_GET
);
825 osd_op
.op
.copy_get
.max
= max
;
826 ::encode(*cursor
, osd_op
.indata
);
827 ::encode(max
, osd_op
.indata
);
828 unsigned p
= ops
.size() - 1;
830 C_ObjectOperation_copyget
*h
=
831 new C_ObjectOperation_copyget(cursor
, out_size
, out_mtime
,
832 out_attrs
, out_data
, out_omap_header
,
833 out_omap_data
, out_snaps
, out_snap_seq
,
834 out_flags
, out_data_digest
,
835 out_omap_digest
, out_reqids
, truncate_seq
,
836 truncate_size
, prval
);
842 add_op(CEPH_OSD_OP_UNDIRTY
);
845 struct C_ObjectOperation_isdirty
: public Context
{
849 C_ObjectOperation_isdirty(bool *p
, int *r
)
850 : pisdirty(p
), prval(r
) {}
851 void finish(int r
) override
{
855 bufferlist::iterator p
= bl
.begin();
857 ::decode(isdirty
, p
);
860 } catch (buffer::error
& e
) {
867 void is_dirty(bool *pisdirty
, int *prval
) {
868 add_op(CEPH_OSD_OP_ISDIRTY
);
869 unsigned p
= ops
.size() - 1;
871 C_ObjectOperation_isdirty
*h
=
872 new C_ObjectOperation_isdirty(pisdirty
, prval
);
877 struct C_ObjectOperation_hit_set_ls
: public Context
{
879 std::list
< std::pair
<time_t, time_t> > *ptls
;
880 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > *putls
;
882 C_ObjectOperation_hit_set_ls(std::list
< std::pair
<time_t, time_t> > *t
,
883 std::list
< std::pair
<ceph::real_time
,
884 ceph::real_time
> > *ut
,
886 : ptls(t
), putls(ut
), prval(r
) {}
887 void finish(int r
) override
{
891 bufferlist::iterator p
= bl
.begin();
892 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > ls
;
896 for (auto p
= ls
.begin(); p
!= ls
.end(); ++p
)
897 // round initial timestamp up to the next full second to
898 // keep this a valid interval.
900 make_pair(ceph::real_clock::to_time_t(
902 // Sadly, no time literals until C++14.
903 std::chrono::seconds(1))),
904 ceph::real_clock::to_time_t(p
->second
)));
908 } catch (buffer::error
& e
) {
917 * list available HitSets.
919 * We will get back a list of time intervals. Note that the most
920 * recent range may have an empty end timestamp if it is still
923 * @param pls [out] list of time intervals
924 * @param prval [out] return value
926 void hit_set_ls(std::list
< std::pair
<time_t, time_t> > *pls
, int *prval
) {
927 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
928 unsigned p
= ops
.size() - 1;
930 C_ObjectOperation_hit_set_ls
*h
=
931 new C_ObjectOperation_hit_set_ls(pls
, NULL
, prval
);
935 void hit_set_ls(std::list
<std::pair
<ceph::real_time
, ceph::real_time
> > *pls
,
937 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
938 unsigned p
= ops
.size() - 1;
940 C_ObjectOperation_hit_set_ls
*h
=
941 new C_ObjectOperation_hit_set_ls(NULL
, pls
, prval
);
949 * Return an encoded HitSet that includes the provided time
952 * @param stamp [in] timestamp
953 * @param pbl [out] target buffer for encoded HitSet
954 * @param prval [out] return value
956 void hit_set_get(ceph::real_time stamp
, bufferlist
*pbl
, int *prval
) {
957 OSDOp
& op
= add_op(CEPH_OSD_OP_PG_HITSET_GET
);
958 op
.op
.hit_set_get
.stamp
= ceph::real_clock::to_ceph_timespec(stamp
);
959 unsigned p
= ops
.size() - 1;
964 void omap_get_header(bufferlist
*bl
, int *prval
) {
965 add_op(CEPH_OSD_OP_OMAPGETHEADER
);
966 unsigned p
= ops
.size() - 1;
971 void omap_set(const map
<string
, bufferlist
> &map
) {
974 add_data(CEPH_OSD_OP_OMAPSETVALS
, 0, bl
.length(), bl
);
977 void omap_set_header(bufferlist
&bl
) {
978 add_data(CEPH_OSD_OP_OMAPSETHEADER
, 0, bl
.length(), bl
);
982 add_op(CEPH_OSD_OP_OMAPCLEAR
);
985 void omap_rm_keys(const std::set
<std::string
> &to_remove
) {
987 ::encode(to_remove
, bl
);
988 add_data(CEPH_OSD_OP_OMAPRMKEYS
, 0, bl
.length(), bl
);
992 void call(const char *cname
, const char *method
, bufferlist
&indata
) {
993 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, NULL
, NULL
, NULL
);
996 void call(const char *cname
, const char *method
, bufferlist
&indata
,
997 bufferlist
*outdata
, Context
*ctx
, int *prval
) {
998 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, outdata
, ctx
, prval
);
1002 void watch(uint64_t cookie
, __u8 op
, uint32_t timeout
= 0) {
1003 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_WATCH
);
1004 osd_op
.op
.watch
.cookie
= cookie
;
1005 osd_op
.op
.watch
.op
= op
;
1006 osd_op
.op
.watch
.timeout
= timeout
;
1009 void notify(uint64_t cookie
, uint32_t prot_ver
, uint32_t timeout
,
1010 bufferlist
&bl
, bufferlist
*inbl
) {
1011 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY
);
1012 osd_op
.op
.notify
.cookie
= cookie
;
1013 ::encode(prot_ver
, *inbl
);
1014 ::encode(timeout
, *inbl
);
1015 ::encode(bl
, *inbl
);
1016 osd_op
.indata
.append(*inbl
);
1019 void notify_ack(uint64_t notify_id
, uint64_t cookie
,
1020 bufferlist
& reply_bl
) {
1021 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY_ACK
);
1023 ::encode(notify_id
, bl
);
1024 ::encode(cookie
, bl
);
1025 ::encode(reply_bl
, bl
);
1026 osd_op
.indata
.append(bl
);
1029 void list_watchers(list
<obj_watch_t
> *out
,
1031 (void)add_op(CEPH_OSD_OP_LIST_WATCHERS
);
1033 unsigned p
= ops
.size() - 1;
1034 C_ObjectOperation_decodewatchers
*h
=
1035 new C_ObjectOperation_decodewatchers(out
, prval
);
1038 out_rval
[p
] = prval
;
1042 void list_snaps(librados::snap_set_t
*out
, int *prval
) {
1043 (void)add_op(CEPH_OSD_OP_LIST_SNAPS
);
1045 unsigned p
= ops
.size() - 1;
1046 C_ObjectOperation_decodesnaps
*h
=
1047 new C_ObjectOperation_decodesnaps(out
, prval
);
1050 out_rval
[p
] = prval
;
1054 void assert_version(uint64_t ver
) {
1055 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ASSERT_VER
);
1056 osd_op
.op
.assert_ver
.ver
= ver
;
1059 void cmpxattr(const char *name
, const bufferlist
& val
,
1061 add_xattr(CEPH_OSD_OP_CMPXATTR
, name
, val
);
1062 OSDOp
& o
= *ops
.rbegin();
1063 o
.op
.xattr
.cmp_op
= op
;
1064 o
.op
.xattr
.cmp_mode
= mode
;
1067 void rollback(uint64_t snapid
) {
1068 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ROLLBACK
);
1069 osd_op
.op
.snap
.snapid
= snapid
;
1072 void copy_from(object_t src
, snapid_t snapid
, object_locator_t src_oloc
,
1073 version_t src_version
, unsigned flags
,
1074 unsigned src_fadvise_flags
) {
1075 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_FROM
);
1076 osd_op
.op
.copy_from
.snapid
= snapid
;
1077 osd_op
.op
.copy_from
.src_version
= src_version
;
1078 osd_op
.op
.copy_from
.flags
= flags
;
1079 osd_op
.op
.copy_from
.src_fadvise_flags
= src_fadvise_flags
;
1080 ::encode(src
, osd_op
.indata
);
1081 ::encode(src_oloc
, osd_op
.indata
);
1085 * writeback content to backing tier
1087 * If object is marked dirty in the cache tier, write back content
1088 * to backing tier. If the object is clean this is a no-op.
1090 * If writeback races with an update, the update will block.
1092 * use with IGNORE_CACHE to avoid triggering promote.
1094 void cache_flush() {
1095 add_op(CEPH_OSD_OP_CACHE_FLUSH
);
1099 * writeback content to backing tier
1101 * If object is marked dirty in the cache tier, write back content
1102 * to backing tier. If the object is clean this is a no-op.
1104 * If writeback races with an update, return EAGAIN. Requires that
1105 * the SKIPRWLOCKS flag be set.
1107 * use with IGNORE_CACHE to avoid triggering promote.
1109 void cache_try_flush() {
1110 add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH
);
1114 * evict object from cache tier
1116 * If object is marked clean, remove the object from the cache tier.
1117 * Otherwise, return EBUSY.
1119 * use with IGNORE_CACHE to avoid triggering promote.
1121 void cache_evict() {
1122 add_op(CEPH_OSD_OP_CACHE_EVICT
);
1128 void set_redirect(object_t tgt
, snapid_t snapid
, object_locator_t tgt_oloc
,
1129 version_t tgt_version
) {
1130 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_SET_REDIRECT
);
1131 osd_op
.op
.copy_from
.snapid
= snapid
;
1132 osd_op
.op
.copy_from
.src_version
= tgt_version
;
1133 ::encode(tgt
, osd_op
.indata
);
1134 ::encode(tgt_oloc
, osd_op
.indata
);
1137 void set_alloc_hint(uint64_t expected_object_size
,
1138 uint64_t expected_write_size
,
1140 add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT
, expected_object_size
,
1141 expected_write_size
, flags
);
1143 // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1144 // not worth a feature bit. Set FAILOK per-op flag to make
1145 // sure older osds don't trip over an unsupported opcode.
1146 set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1149 void dup(vector
<OSDOp
>& sops
) {
1151 out_bl
.resize(sops
.size());
1152 out_handler
.resize(sops
.size());
1153 out_rval
.resize(sops
.size());
1154 for (uint32_t i
= 0; i
< sops
.size(); i
++) {
1155 out_bl
[i
] = &sops
[i
].outdata
;
1156 out_handler
[i
] = NULL
;
1157 out_rval
[i
] = &sops
[i
].rval
;
1162 * Pin/unpin an object in cache tier
1165 add_op(CEPH_OSD_OP_CACHE_PIN
);
1168 void cache_unpin() {
1169 add_op(CEPH_OSD_OP_CACHE_UNPIN
);
1177 class Objecter
: public md_config_obs_t
, public Dispatcher
{
1179 // config observer bits
1180 const char** get_tracked_conf_keys() const override
;
1181 void handle_conf_change(const struct md_config_t
*conf
,
1182 const std::set
<std::string
> &changed
) override
;
1185 Messenger
*messenger
;
1188 ZTracer::Endpoint trace_endpoint
;
1192 using Dispatcher::cct
;
1193 std::multimap
<string
,string
> crush_location
;
1195 std::atomic
<bool> initialized
{false};
1198 std::atomic
<uint64_t> last_tid
{0};
1199 std::atomic
<unsigned> inflight_ops
{0};
1200 std::atomic
<int> client_inc
{-1};
1201 uint64_t max_linger_id
;
1202 std::atomic
<unsigned> num_in_flight
{0};
1203 std::atomic
<int> global_op_flags
{0}; // flags which are applied to each IO op
1204 bool keep_balanced_budget
;
1205 bool honor_osdmap_full
;
1206 bool osdmap_full_try
;
1208 // If this is true, accumulate a set of blacklisted entities
1209 // to be drained by consume_blacklist_events.
1210 bool blacklist_events_enabled
;
1211 std::set
<entity_addr_t
> blacklist_events
;
1214 void maybe_request_map();
1216 void enable_blacklist_events();
1219 void _maybe_request_map();
1221 version_t last_seen_osdmap_version
;
1222 version_t last_seen_pgmap_version
;
1224 mutable boost::shared_mutex rwlock
;
1225 using lock_guard
= std::unique_lock
<decltype(rwlock
)>;
1226 using unique_lock
= std::unique_lock
<decltype(rwlock
)>;
1227 using shared_lock
= boost::shared_lock
<decltype(rwlock
)>;
1228 using shunique_lock
= ceph::shunique_lock
<decltype(rwlock
)>;
1229 ceph::timer
<ceph::mono_clock
> timer
;
1231 PerfCounters
*logger
;
1233 uint64_t tick_event
;
1237 void update_crush_location();
1239 class RequestStateHook
;
1241 RequestStateHook
*m_request_state_hook
;
1244 /*** track pending operations ***/
1250 struct op_target_t
{
1253 epoch_t epoch
= 0; ///< latest epoch we calculated the mapping
1256 object_locator_t base_oloc
;
1257 object_t target_oid
;
1258 object_locator_t target_oloc
;
1260 ///< true if we are directed at base_pgid, not base_oid
1261 bool precalc_pgid
= false;
1263 ///< true if we have ever mapped to a valid pool
1264 bool pool_ever_existed
= false;
1266 ///< explcit pg target, if any
1269 pg_t pgid
; ///< last (raw) pg we mapped to
1270 spg_t actual_pgid
; ///< last (actual) spg_t we mapped to
1271 unsigned pg_num
= 0; ///< last pg_num we mapped to
1272 unsigned pg_num_mask
= 0; ///< last pg_num_mask we mapped to
1273 vector
<int> up
; ///< set of up osds for last pg we mapped to
1274 vector
<int> acting
; ///< set of acting osds for last pg we mapped to
1275 int up_primary
= -1; ///< last up_primary we mapped to
1276 int acting_primary
= -1; ///< last acting_primary we mapped to
1277 int size
= -1; ///< the size of the pool when were were last mapped
1278 int min_size
= -1; ///< the min size of the pool when were were last mapped
1279 bool sort_bitwise
= false; ///< whether the hobject_t sort order is bitwise
1280 bool recovery_deletes
= false; ///< whether the deletes are performed during recovery instead of peering
1282 bool used_replica
= false;
1283 bool paused
= false;
1285 int osd
= -1; ///< the final target osd, or -1
1287 epoch_t last_force_resend
= 0;
1289 op_target_t(object_t oid
, object_locator_t oloc
, int flags
)
1295 op_target_t(pg_t pgid
)
1296 : base_oloc(pgid
.pool(), pgid
.ps()),
1301 op_target_t() = default;
1303 hobject_t
get_hobj() {
1304 return hobject_t(target_oid
,
1307 target_oloc
.hash
>= 0 ? target_oloc
.hash
: pgid
.ps(),
1309 target_oloc
.nspace
);
1312 bool contained_by(const hobject_t
& begin
, const hobject_t
& end
) {
1313 hobject_t h
= get_hobj();
1314 int r
= cmp(h
, begin
);
1315 return r
== 0 || (r
> 0 && h
< end
);
1318 void dump(Formatter
*f
) const;
1321 struct Op
: public RefCountedObject
{
1322 OSDSession
*session
;
1327 ConnectionRef con
; // for rx buffer only
1328 uint64_t features
; // explicitly specified op features
1334 ceph::real_time mtime
;
1337 vector
<bufferlist
*> out_bl
;
1338 vector
<Context
*> out_handler
;
1339 vector
<int*> out_rval
;
1349 epoch_t
*reply_epoch
;
1351 ceph::mono_time stamp
;
1353 epoch_t map_dne_bound
;
1357 /// true if we should resend this message on failure
1360 /// true if the throttle budget is get/put on a series of OPs,
1361 /// instead of per OP basis, when this flag is set, the budget is
1362 /// acquired before sending the very first OP of the series and
1363 /// released upon receiving the last OP reply.
1368 osd_reqid_t reqid
; // explicitly setting reqid
1369 ZTracer::Trace trace
;
1371 Op(const object_t
& o
, const object_locator_t
& ol
, vector
<OSDOp
>& op
,
1372 int f
, Context
*fin
, version_t
*ov
, int *offset
= NULL
,
1373 ZTracer::Trace
*parent_trace
= nullptr) :
1374 session(NULL
), incarnation(0),
1377 features(CEPH_FEATURES_SUPPORTED_DEFAULT
),
1378 snapid(CEPH_NOSNAP
),
1389 should_resend(true),
1390 ctx_budgeted(false),
1391 data_offset(offset
) {
1394 /* initialize out_* to match op vector */
1395 out_bl
.resize(ops
.size());
1396 out_rval
.resize(ops
.size());
1397 out_handler
.resize(ops
.size());
1398 for (unsigned i
= 0; i
< ops
.size(); i
++) {
1400 out_handler
[i
] = NULL
;
1404 if (target
.base_oloc
.key
== o
)
1405 target
.base_oloc
.key
.clear();
1407 if (parent_trace
&& parent_trace
->valid()) {
1408 trace
.init("op", nullptr, parent_trace
);
1409 trace
.event("start");
1413 bool operator<(const Op
& other
) const {
1414 return tid
< other
.tid
;
1417 bool respects_full() const {
1419 (target
.flags
& (CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_RWORDERED
)) &&
1420 !(target
.flags
& (CEPH_OSD_FLAG_FULL_TRY
| CEPH_OSD_FLAG_FULL_FORCE
));
1425 while (!out_handler
.empty()) {
1426 delete out_handler
.back();
1427 out_handler
.pop_back();
1429 trace
.event("finish");
1433 struct C_Op_Map_Latest
: public Context
{
1437 C_Op_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1439 void finish(int r
) override
;
1442 struct C_Command_Map_Latest
: public Context
{
1446 C_Command_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1448 void finish(int r
) override
;
1451 struct C_Stat
: public Context
{
1454 ceph::real_time
*pmtime
;
1456 C_Stat(uint64_t *ps
, ceph::real_time
*pm
, Context
*c
) :
1457 psize(ps
), pmtime(pm
), fin(c
) {}
1458 void finish(int r
) override
{
1460 bufferlist::iterator p
= bl
.begin();
1474 struct C_GetAttrs
: public Context
{
1476 map
<string
,bufferlist
>& attrset
;
1478 C_GetAttrs(map
<string
, bufferlist
>& set
, Context
*c
) : attrset(set
),
1480 void finish(int r
) override
{
1482 bufferlist::iterator p
= bl
.begin();
1483 ::decode(attrset
, p
);
1490 // Pools and statistics
1491 struct NListContext
{
1492 collection_list_handle_t pos
;
1494 // these are for !sortbitwise compat only
1496 int starting_pg_num
= 0;
1497 bool sort_bitwise
= false;
1499 bool at_end_of_pool
= false; ///< publicly visible end flag
1501 int64_t pool_id
= -1;
1502 int pool_snap_seq
= 0;
1503 uint64_t max_entries
= 0;
1506 bufferlist bl
; // raw data read to here
1507 std::list
<librados::ListObjectImpl
> list
;
1511 bufferlist extra_info
;
1513 // The budget associated with this context, once it is set (>= 0),
1514 // the budget is not get/released on OP basis, instead the budget
1515 // is acquired before sending the first OP and released upon receiving
1516 // the last op reply.
1517 int ctx_budget
= -1;
1519 bool at_end() const {
1520 return at_end_of_pool
;
1523 uint32_t get_pg_hash_position() const {
1524 return pos
.get_hash();
1528 struct C_NList
: public Context
{
1529 NListContext
*list_context
;
1530 Context
*final_finish
;
1533 C_NList(NListContext
*lc
, Context
* finish
, Objecter
*ob
) :
1534 list_context(lc
), final_finish(finish
), objecter(ob
), epoch(0) {}
1535 void finish(int r
) override
{
1537 objecter
->_nlist_reply(list_context
, r
, final_finish
, epoch
);
1539 final_finish
->complete(r
);
1548 map
<string
,pool_stat_t
> *pool_stats
;
1552 ceph::mono_time last_submit
;
1557 struct ceph_statfs
*stats
;
1558 boost::optional
<int64_t> data_pool
;
1562 ceph::mono_time last_submit
;
1577 ceph::mono_time last_submit
;
1578 PoolOp() : tid(0), pool(0), onfinish(NULL
), ontimeout(0), pool_op(0),
1579 auid(0), crush_rule(0), snapid(0), blp(NULL
) {}
1582 // -- osd commands --
1583 struct CommandOp
: public RefCountedObject
{
1584 OSDSession
*session
= nullptr;
1588 bufferlist
*poutbl
= nullptr;
1589 string
*prs
= nullptr;
1591 // target_osd == -1 means target_pg is valid
1592 const int target_osd
= -1;
1593 const pg_t target_pg
;
1597 epoch_t map_dne_bound
= 0;
1598 int map_check_error
= 0; // error to return if map check fails
1599 const char *map_check_error_str
= nullptr;
1601 Context
*onfinish
= nullptr;
1602 uint64_t ontimeout
= 0;
1603 ceph::mono_time last_submit
;
1607 const vector
<string
> &cmd
,
1616 target_osd(target_osd
),
1617 onfinish(onfinish
) {}
1621 const vector
<string
> &cmd
,
1632 onfinish(onfinish
) {}
1636 void submit_command(CommandOp
*c
, ceph_tid_t
*ptid
);
1637 int _calc_command_target(CommandOp
*c
, shunique_lock
&sul
);
1638 void _assign_command_session(CommandOp
*c
, shunique_lock
&sul
);
1639 void _send_command(CommandOp
*c
);
1640 int command_op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
1641 void _finish_command(CommandOp
*c
, int r
, string rs
);
1642 void handle_command_reply(MCommandReply
*m
);
1645 // -- lingering ops --
1647 struct WatchContext
{
1648 // this simply mirrors librados WatchCtx2
1649 virtual void handle_notify(uint64_t notify_id
,
1651 uint64_t notifier_id
,
1652 bufferlist
& bl
) = 0;
1653 virtual void handle_error(uint64_t cookie
, int err
) = 0;
1654 virtual ~WatchContext() {}
1657 struct LingerOp
: public RefCountedObject
{
1664 ceph::real_time mtime
;
1672 ceph::mono_time watch_valid_thru
; ///< send time for last acked ping
1673 int last_error
; ///< error from last failed ping|reconnect, if any
1674 boost::shared_mutex watch_lock
;
1675 using lock_guard
= std::unique_lock
<decltype(watch_lock
)>;
1676 using unique_lock
= std::unique_lock
<decltype(watch_lock
)>;
1677 using shared_lock
= boost::shared_lock
<decltype(watch_lock
)>;
1678 using shunique_lock
= ceph::shunique_lock
<decltype(watch_lock
)>;
1680 // queue of pending async operations, with the timestamp of
1681 // when they were queued.
1682 list
<ceph::mono_time
> watch_pending_async
;
1684 uint32_t register_gen
;
1687 Context
*on_reg_commit
;
1689 // we trigger these from an async finisher
1690 Context
*on_notify_finish
;
1691 bufferlist
*notify_result_bl
;
1694 WatchContext
*watch_context
;
1696 OSDSession
*session
;
1698 ceph_tid_t register_tid
;
1699 ceph_tid_t ping_tid
;
1700 epoch_t map_dne_bound
;
1702 void _queued_async() {
1703 // watch_lock ust be locked unique
1704 watch_pending_async
.push_back(ceph::mono_clock::now());
1706 void finished_async() {
1707 unique_lock
l(watch_lock
);
1708 assert(!watch_pending_async
.empty());
1709 watch_pending_async
.pop_front();
1712 LingerOp() : linger_id(0),
1713 target(object_t(), object_locator_t(), 0),
1714 snap(CEPH_NOSNAP
), poutbl(NULL
), pobjver(NULL
),
1715 is_watch(false), last_error(0),
1719 on_reg_commit(NULL
),
1720 on_notify_finish(NULL
),
1721 notify_result_bl(NULL
),
1723 watch_context(NULL
),
1730 const LingerOp
&operator=(const LingerOp
& r
);
1731 LingerOp(const LingerOp
& o
);
1733 uint64_t get_cookie() {
1734 return reinterpret_cast<uint64_t>(this);
1738 ~LingerOp() override
{
1739 delete watch_context
;
1743 struct C_Linger_Commit
: public Context
{
1746 bufferlist outbl
; // used for notify only
1747 C_Linger_Commit(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1750 ~C_Linger_Commit() override
{
1753 void finish(int r
) override
{
1754 objecter
->_linger_commit(info
, r
, outbl
);
1758 struct C_Linger_Reconnect
: public Context
{
1761 C_Linger_Reconnect(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1764 ~C_Linger_Reconnect() override
{
1767 void finish(int r
) override
{
1768 objecter
->_linger_reconnect(info
, r
);
1772 struct C_Linger_Ping
: public Context
{
1775 ceph::mono_time sent
;
1776 uint32_t register_gen
;
1777 C_Linger_Ping(Objecter
*o
, LingerOp
*l
)
1778 : objecter(o
), info(l
), register_gen(info
->register_gen
) {
1781 ~C_Linger_Ping() override
{
1784 void finish(int r
) override
{
1785 objecter
->_linger_ping(info
, r
, sent
, register_gen
);
1789 struct C_Linger_Map_Latest
: public Context
{
1793 C_Linger_Map_Latest(Objecter
*o
, uint64_t id
) :
1794 objecter(o
), linger_id(id
), latest(0) {}
1795 void finish(int r
) override
;
1798 // -- osd sessions --
1802 hobject_t begin
, end
;
1805 struct OSDSession
: public RefCountedObject
{
1806 boost::shared_mutex lock
;
1807 using lock_guard
= std::lock_guard
<decltype(lock
)>;
1808 using unique_lock
= std::unique_lock
<decltype(lock
)>;
1809 using shared_lock
= boost::shared_lock
<decltype(lock
)>;
1810 using shunique_lock
= ceph::shunique_lock
<decltype(lock
)>;
1813 map
<ceph_tid_t
,Op
*> ops
;
1814 map
<uint64_t, LingerOp
*> linger_ops
;
1815 map
<ceph_tid_t
,CommandOp
*> command_ops
;
1818 map
<spg_t
,map
<hobject_t
,OSDBackoff
>> backoffs
;
1819 map
<uint64_t,OSDBackoff
*> backoffs_by_id
;
1825 std::unique_ptr
<std::mutex
[]> completion_locks
;
1826 using unique_completion_lock
= std::unique_lock
<
1827 decltype(completion_locks
)::element_type
>;
1830 OSDSession(CephContext
*cct
, int o
) :
1831 osd(o
), incarnation(0), con(NULL
),
1832 num_locks(cct
->_conf
->objecter_completion_locks_per_session
),
1833 completion_locks(new std::mutex
[num_locks
]) {}
1835 ~OSDSession() override
;
1837 bool is_homeless() { return (osd
== -1); }
1839 unique_completion_lock
get_lock(object_t
& oid
);
1841 map
<int,OSDSession
*> osd_sessions
;
1843 bool osdmap_full_flag() const;
1844 bool osdmap_pool_full(const int64_t pool_id
) const;
1849 * Test pg_pool_t::FLAG_FULL on a pool
1851 * @return true if the pool exists and has the flag set, or
1852 * the global full flag is set, else false
1854 bool _osdmap_pool_full(const int64_t pool_id
) const;
1855 bool _osdmap_pool_full(const pg_pool_t
&p
) const;
1856 void update_pool_full_map(map
<int64_t, bool>& pool_full_map
);
1858 map
<uint64_t, LingerOp
*> linger_ops
;
1859 // we use this just to confirm a cookie is valid before dereferencing the ptr
1860 set
<LingerOp
*> linger_ops_set
;
1862 map
<ceph_tid_t
,PoolStatOp
*> poolstat_ops
;
1863 map
<ceph_tid_t
,StatfsOp
*> statfs_ops
;
1864 map
<ceph_tid_t
,PoolOp
*> pool_ops
;
1865 std::atomic
<unsigned> num_homeless_ops
{0};
1867 OSDSession
*homeless_session
;
1869 // ops waiting for an osdmap with a new pool or confirmation that
1870 // the pool does not exist (may be expanded to other uses later)
1871 map
<uint64_t, LingerOp
*> check_latest_map_lingers
;
1872 map
<ceph_tid_t
, Op
*> check_latest_map_ops
;
1873 map
<ceph_tid_t
, CommandOp
*> check_latest_map_commands
;
1875 map
<epoch_t
,list
< pair
<Context
*, int> > > waiting_for_map
;
1877 ceph::timespan mon_timeout
;
1878 ceph::timespan osd_timeout
;
1880 MOSDOp
*_prepare_osd_op(Op
*op
);
1881 void _send_op(Op
*op
, MOSDOp
*m
= NULL
);
1882 void _send_op_account(Op
*op
);
1883 void _cancel_linger_op(Op
*op
);
1884 void finish_op(OSDSession
*session
, ceph_tid_t tid
);
1885 void _finish_op(Op
*op
, int r
);
1886 static bool is_pg_changed(
1888 const vector
<int>& oldacting
,
1890 const vector
<int>& newacting
,
1891 bool any_change
=false);
1892 enum recalc_op_target_result
{
1893 RECALC_OP_TARGET_NO_ACTION
= 0,
1894 RECALC_OP_TARGET_NEED_RESEND
,
1895 RECALC_OP_TARGET_POOL_DNE
,
1896 RECALC_OP_TARGET_OSD_DNE
,
1897 RECALC_OP_TARGET_OSD_DOWN
,
1899 bool _osdmap_full_flag() const;
1900 bool _osdmap_has_pool_full() const;
1902 bool target_should_be_paused(op_target_t
*op
);
1903 int _calc_target(op_target_t
*t
, Connection
*con
,
1904 bool any_change
= false);
1905 int _map_session(op_target_t
*op
, OSDSession
**s
,
1908 void _session_op_assign(OSDSession
*s
, Op
*op
);
1909 void _session_op_remove(OSDSession
*s
, Op
*op
);
1910 void _session_linger_op_assign(OSDSession
*to
, LingerOp
*op
);
1911 void _session_linger_op_remove(OSDSession
*from
, LingerOp
*op
);
1912 void _session_command_op_assign(OSDSession
*to
, CommandOp
*op
);
1913 void _session_command_op_remove(OSDSession
*from
, CommandOp
*op
);
1915 int _assign_op_target_session(Op
*op
, shunique_lock
& lc
,
1916 bool src_session_locked
,
1917 bool dst_session_locked
);
1918 int _recalc_linger_op_target(LingerOp
*op
, shunique_lock
& lc
);
1920 void _linger_submit(LingerOp
*info
, shunique_lock
& sul
);
1921 void _send_linger(LingerOp
*info
, shunique_lock
& sul
);
1922 void _linger_commit(LingerOp
*info
, int r
, bufferlist
& outbl
);
1923 void _linger_reconnect(LingerOp
*info
, int r
);
1924 void _send_linger_ping(LingerOp
*info
);
1925 void _linger_ping(LingerOp
*info
, int r
, ceph::mono_time sent
,
1926 uint32_t register_gen
);
1927 int _normalize_watch_error(int r
);
1929 friend class C_DoWatchError
;
1931 void linger_callback_flush(Context
*ctx
) {
1932 finisher
->queue(ctx
);
1936 void _check_op_pool_dne(Op
*op
, unique_lock
*sl
);
1937 void _send_op_map_check(Op
*op
);
1938 void _op_cancel_map_check(Op
*op
);
1939 void _check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
);
1940 void _send_linger_map_check(LingerOp
*op
);
1941 void _linger_cancel_map_check(LingerOp
*op
);
1942 void _check_command_map_dne(CommandOp
*op
);
1943 void _send_command_map_check(CommandOp
*op
);
1944 void _command_cancel_map_check(CommandOp
*op
);
1946 void kick_requests(OSDSession
*session
);
1947 void _kick_requests(OSDSession
*session
, map
<uint64_t, LingerOp
*>& lresend
);
1948 void _linger_ops_resend(map
<uint64_t, LingerOp
*>& lresend
, unique_lock
& ul
);
1950 int _get_session(int osd
, OSDSession
**session
, shunique_lock
& sul
);
1951 void put_session(OSDSession
*s
);
1952 void get_session(OSDSession
*s
);
1953 void _reopen_session(OSDSession
*session
);
1954 void close_session(OSDSession
*session
);
1956 void _nlist_reply(NListContext
*list_context
, int r
, Context
*final_finish
,
1957 epoch_t reply_epoch
);
1959 void resend_mon_ops();
1962 * handle a budget for in-flight ops
1963 * budget is taken whenever an op goes into the ops map
1964 * and returned whenever an op is removed from the map
1965 * If throttle_op needs to throttle it will unlock client_lock.
1967 int calc_op_budget(Op
*op
);
1968 void _throttle_op(Op
*op
, shunique_lock
& sul
, int op_size
= 0);
1969 int _take_op_budget(Op
*op
, shunique_lock
& sul
) {
1970 assert(sul
&& sul
.mutex() == &rwlock
);
1971 int op_budget
= calc_op_budget(op
);
1972 if (keep_balanced_budget
) {
1973 _throttle_op(op
, sul
, op_budget
);
1975 op_throttle_bytes
.take(op_budget
);
1976 op_throttle_ops
.take(1);
1978 op
->budgeted
= true;
1981 void put_op_budget_bytes(int op_budget
) {
1982 assert(op_budget
>= 0);
1983 op_throttle_bytes
.put(op_budget
);
1984 op_throttle_ops
.put(1);
1986 void put_op_budget(Op
*op
) {
1987 assert(op
->budgeted
);
1988 int op_budget
= calc_op_budget(op
);
1989 put_op_budget_bytes(op_budget
);
1991 void put_nlist_context_budget(NListContext
*list_context
);
1992 Throttle op_throttle_bytes
, op_throttle_ops
;
1995 Objecter(CephContext
*cct_
, Messenger
*m
, MonClient
*mc
,
1998 double osd_timeout
) :
1999 Dispatcher(cct_
), messenger(m
), monc(mc
), finisher(fin
),
2000 trace_endpoint("0.0.0.0", 0, "Objecter"),
2003 keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
2004 blacklist_events_enabled(false),
2005 last_seen_osdmap_version(0), last_seen_pgmap_version(0),
2006 logger(NULL
), tick_event(0), m_request_state_hook(NULL
),
2007 homeless_session(new OSDSession(cct
, -1)),
2008 mon_timeout(ceph::make_timespan(mon_timeout
)),
2009 osd_timeout(ceph::make_timespan(osd_timeout
)),
2010 op_throttle_bytes(cct
, "objecter_bytes",
2011 cct
->_conf
->objecter_inflight_op_bytes
),
2012 op_throttle_ops(cct
, "objecter_ops", cct
->_conf
->objecter_inflight_ops
),
2014 retry_writes_after_first_reply(cct
->_conf
->objecter_retry_writes_after_first_reply
)
2016 ~Objecter() override
;
2019 void start(const OSDMap
*o
= nullptr);
2022 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2023 // whatever functionality you want to use the OSDMap in a lambda like:
2025 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2029 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2031 // Do not call into something that will try to lock the OSDMap from
2032 // here or you will have great woe and misery.
2034 template<typename Callback
, typename
...Args
>
2035 auto with_osdmap(Callback
&& cb
, Args
&&... args
) const ->
2036 decltype(cb(*osdmap
, std::forward
<Args
>(args
)...)) {
2037 shared_lock
l(rwlock
);
2038 return std::forward
<Callback
>(cb
)(*osdmap
, std::forward
<Args
>(args
)...);
2043 * Tell the objecter to throttle outgoing ops according to its
2044 * budget (in _conf). If you do this, ops can block, in
2045 * which case it will unlock client_lock and sleep until
2046 * incoming messages reduce the used budget low enough for
2047 * the ops to continue going; then it will lock client_lock again.
2049 void set_balanced_budget() { keep_balanced_budget
= true; }
2050 void unset_balanced_budget() { keep_balanced_budget
= false; }
2052 void set_honor_osdmap_full() { honor_osdmap_full
= true; }
2053 void unset_honor_osdmap_full() { honor_osdmap_full
= false; }
2055 void set_osdmap_full_try() { osdmap_full_try
= true; }
2056 void unset_osdmap_full_try() { osdmap_full_try
= false; }
2058 void _scan_requests(OSDSession
*s
,
2061 map
<int64_t, bool> *pool_full_map
,
2062 map
<ceph_tid_t
, Op
*>& need_resend
,
2063 list
<LingerOp
*>& need_resend_linger
,
2064 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
2065 shunique_lock
& sul
);
2067 int64_t get_object_hash_position(int64_t pool
, const string
& key
,
2069 int64_t get_object_pg_hash_position(int64_t pool
, const string
& key
,
2074 bool ms_dispatch(Message
*m
) override
;
2075 bool ms_can_fast_dispatch_any() const override
{
2078 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2079 switch (m
->get_type()) {
2080 case CEPH_MSG_OSD_OPREPLY
:
2081 case CEPH_MSG_WATCH_NOTIFY
:
2087 void ms_fast_dispatch(Message
*m
) override
{
2088 if (!ms_dispatch(m
)) {
2093 void handle_osd_op_reply(class MOSDOpReply
*m
);
2094 void handle_osd_backoff(class MOSDBackoff
*m
);
2095 void handle_watch_notify(class MWatchNotify
*m
);
2096 void handle_osd_map(class MOSDMap
*m
);
2097 void wait_for_osd_map();
2100 * Get list of entities blacklisted since this was last called,
2101 * and reset the list.
2103 * Uses a std::set because typical use case is to compare some
2104 * other list of clients to see which overlap with the blacklisted
2108 void consume_blacklist_events(std::set
<entity_addr_t
> *events
);
2110 int pool_snap_by_name(int64_t poolid
,
2111 const char *snap_name
,
2112 snapid_t
*snap
) const;
2113 int pool_snap_get_info(int64_t poolid
, snapid_t snap
,
2114 pool_snap_info_t
*info
) const;
2115 int pool_snap_list(int64_t poolid
, vector
<uint64_t> *snaps
);
2118 void emit_blacklist_events(const OSDMap::Incremental
&inc
);
2119 void emit_blacklist_events(const OSDMap
&old_osd_map
,
2120 const OSDMap
&new_osd_map
);
2123 void _op_submit(Op
*op
, shunique_lock
& lc
, ceph_tid_t
*ptid
);
2124 void _op_submit_with_budget(Op
*op
, shunique_lock
& lc
,
2126 int *ctx_budget
= NULL
);
2127 inline void unregister_op(Op
*op
);
2131 void op_submit(Op
*op
, ceph_tid_t
*ptid
= NULL
, int *ctx_budget
= NULL
);
2133 shared_lock
l(rwlock
);
2134 return !((!inflight_ops
) && linger_ops
.empty() &&
2135 poolstat_ops
.empty() && statfs_ops
.empty());
2139 * Output in-flight requests
2141 void _dump_active(OSDSession
*s
);
2142 void _dump_active();
2144 void dump_requests(Formatter
*fmt
);
2145 void _dump_ops(const OSDSession
*s
, Formatter
*fmt
);
2146 void dump_ops(Formatter
*fmt
);
2147 void _dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
);
2148 void dump_linger_ops(Formatter
*fmt
);
2149 void _dump_command_ops(const OSDSession
*s
, Formatter
*fmt
);
2150 void dump_command_ops(Formatter
*fmt
);
2151 void dump_pool_ops(Formatter
*fmt
) const;
2152 void dump_pool_stat_ops(Formatter
*fmt
) const;
2153 void dump_statfs_ops(Formatter
*fmt
) const;
2155 int get_client_incarnation() const { return client_inc
; }
2156 void set_client_incarnation(int inc
) { client_inc
= inc
; }
2158 bool have_map(epoch_t epoch
);
2159 /// wait for epoch; true if we already have it
2160 bool wait_for_map(epoch_t epoch
, Context
*c
, int err
=0);
2161 void _wait_for_new_map(Context
*c
, epoch_t epoch
, int err
=0);
2162 void wait_for_latest_osdmap(Context
*fin
);
2163 void get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2164 void _get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2166 /** Get the current set of global op flags */
2167 int get_global_op_flags() const { return global_op_flags
; }
2168 /** Add a flag to the global op flags, not really atomic operation */
2169 void add_global_op_flags(int flag
) {
2170 global_op_flags
.fetch_or(flag
);
2172 /** Clear the passed flags from the global op flag set */
2173 void clear_global_op_flag(int flags
) {
2174 global_op_flags
.fetch_and(~flags
);
2177 /// cancel an in-progress request with the given return code
2179 int op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
2180 int _op_cancel(ceph_tid_t tid
, int r
);
2182 int op_cancel(ceph_tid_t tid
, int r
);
2185 * Any write op which is in progress at the start of this call shall no
2186 * longer be in progress when this call ends. Operations started after the
2187 * start of this call may still be in progress when this call ends.
2189 * @return the latest possible epoch in which a cancelled op could have
2190 * existed, or -1 if nothing was cancelled.
2192 epoch_t
op_cancel_writes(int r
, int64_t pool
=-1);
2195 void osd_command(int osd
, const std::vector
<string
>& cmd
,
2196 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2197 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2199 CommandOp
*c
= new CommandOp(
2206 submit_command(c
, ptid
);
2208 void pg_command(pg_t pgid
, const vector
<string
>& cmd
,
2209 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2210 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2211 CommandOp
*c
= new CommandOp(
2218 submit_command(c
, ptid
);
2221 // mid-level helpers
2222 Op
*prepare_mutate_op(
2223 const object_t
& oid
, const object_locator_t
& oloc
,
2224 ObjectOperation
& op
, const SnapContext
& snapc
,
2225 ceph::real_time mtime
, int flags
,
2226 Context
*oncommit
, version_t
*objver
= NULL
,
2227 osd_reqid_t reqid
= osd_reqid_t(),
2228 ZTracer::Trace
*parent_trace
= nullptr) {
2229 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2230 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
, nullptr, parent_trace
);
2231 o
->priority
= op
.priority
;
2234 o
->out_rval
.swap(op
.out_rval
);
2239 const object_t
& oid
, const object_locator_t
& oloc
,
2240 ObjectOperation
& op
, const SnapContext
& snapc
,
2241 ceph::real_time mtime
, int flags
,
2242 Context
*oncommit
, version_t
*objver
= NULL
,
2243 osd_reqid_t reqid
= osd_reqid_t()) {
2244 Op
*o
= prepare_mutate_op(oid
, oloc
, op
, snapc
, mtime
, flags
,
2245 oncommit
, objver
, reqid
);
2250 Op
*prepare_read_op(
2251 const object_t
& oid
, const object_locator_t
& oloc
,
2252 ObjectOperation
& op
,
2253 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2254 Context
*onack
, version_t
*objver
= NULL
,
2255 int *data_offset
= NULL
,
2256 uint64_t features
= 0,
2257 ZTracer::Trace
*parent_trace
= nullptr) {
2258 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2259 CEPH_OSD_FLAG_READ
, onack
, objver
, data_offset
, parent_trace
);
2260 o
->priority
= op
.priority
;
2263 if (!o
->outbl
&& op
.size() == 1 && op
.out_bl
[0]->length())
2264 o
->outbl
= op
.out_bl
[0];
2265 o
->out_bl
.swap(op
.out_bl
);
2266 o
->out_handler
.swap(op
.out_handler
);
2267 o
->out_rval
.swap(op
.out_rval
);
2271 const object_t
& oid
, const object_locator_t
& oloc
,
2272 ObjectOperation
& op
,
2273 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2274 Context
*onack
, version_t
*objver
= NULL
,
2275 int *data_offset
= NULL
,
2276 uint64_t features
= 0) {
2277 Op
*o
= prepare_read_op(oid
, oloc
, op
, snapid
, pbl
, flags
, onack
, objver
,
2280 o
->features
= features
;
2285 Op
*prepare_pg_read_op(
2286 uint32_t hash
, object_locator_t oloc
,
2287 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2288 Context
*onack
, epoch_t
*reply_epoch
,
2290 Op
*o
= new Op(object_t(), oloc
,
2292 flags
| global_op_flags
| CEPH_OSD_FLAG_READ
|
2293 CEPH_OSD_FLAG_IGNORE_OVERLAY
,
2295 o
->target
.precalc_pgid
= true;
2296 o
->target
.base_pgid
= pg_t(hash
, oloc
.pool
);
2297 o
->priority
= op
.priority
;
2298 o
->snapid
= CEPH_NOSNAP
;
2300 o
->out_bl
.swap(op
.out_bl
);
2301 o
->out_handler
.swap(op
.out_handler
);
2302 o
->out_rval
.swap(op
.out_rval
);
2303 o
->reply_epoch
= reply_epoch
;
2305 // budget is tracked by listing context
2306 o
->ctx_budgeted
= true;
2311 uint32_t hash
, object_locator_t oloc
,
2312 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2313 Context
*onack
, epoch_t
*reply_epoch
,
2315 Op
*o
= prepare_pg_read_op(hash
, oloc
, op
, pbl
, flags
,
2316 onack
, reply_epoch
, ctx_budget
);
2318 op_submit(o
, &tid
, ctx_budget
);
2322 // caller owns a ref
2323 LingerOp
*linger_register(const object_t
& oid
, const object_locator_t
& oloc
,
2325 ceph_tid_t
linger_watch(LingerOp
*info
,
2326 ObjectOperation
& op
,
2327 const SnapContext
& snapc
, ceph::real_time mtime
,
2331 ceph_tid_t
linger_notify(LingerOp
*info
,
2332 ObjectOperation
& op
,
2333 snapid_t snap
, bufferlist
& inbl
,
2337 int linger_check(LingerOp
*info
);
2338 void linger_cancel(LingerOp
*info
); // releases a reference
2339 void _linger_cancel(LingerOp
*info
);
2341 void _do_watch_notify(LingerOp
*info
, MWatchNotify
*m
);
2344 * set up initial ops in the op vector, and allocate a final op slot.
2346 * The caller is responsible for filling in the final ops_count ops.
2348 * @param ops op vector
2349 * @param ops_count number of final ops the caller will fill in
2350 * @param extra_ops pointer to [array of] initial op[s]
2351 * @return index of final op (for caller to fill in)
2353 int init_ops(vector
<OSDOp
>& ops
, int ops_count
, ObjectOperation
*extra_ops
) {
2358 extra
= extra_ops
->ops
.size();
2360 ops
.resize(ops_count
+ extra
);
2362 for (i
=0; i
<extra
; i
++) {
2363 ops
[i
] = extra_ops
->ops
[i
];
2370 // high-level helpers
2371 Op
*prepare_stat_op(
2372 const object_t
& oid
, const object_locator_t
& oloc
,
2373 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2374 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2375 ObjectOperation
*extra_ops
= NULL
) {
2377 int i
= init_ops(ops
, 1, extra_ops
);
2378 ops
[i
].op
.op
= CEPH_OSD_OP_STAT
;
2379 C_Stat
*fin
= new C_Stat(psize
, pmtime
, onfinish
);
2380 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2381 CEPH_OSD_FLAG_READ
, fin
, objver
);
2383 o
->outbl
= &fin
->bl
;
2387 const object_t
& oid
, const object_locator_t
& oloc
,
2388 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2389 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2390 ObjectOperation
*extra_ops
= NULL
) {
2391 Op
*o
= prepare_stat_op(oid
, oloc
, snap
, psize
, pmtime
, flags
,
2392 onfinish
, objver
, extra_ops
);
2398 Op
*prepare_read_op(
2399 const object_t
& oid
, const object_locator_t
& oloc
,
2400 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2401 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2402 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2403 ZTracer::Trace
*parent_trace
= nullptr) {
2405 int i
= init_ops(ops
, 1, extra_ops
);
2406 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2407 ops
[i
].op
.extent
.offset
= off
;
2408 ops
[i
].op
.extent
.length
= len
;
2409 ops
[i
].op
.extent
.truncate_size
= 0;
2410 ops
[i
].op
.extent
.truncate_seq
= 0;
2411 ops
[i
].op
.flags
= op_flags
;
2412 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2413 CEPH_OSD_FLAG_READ
, onfinish
, objver
, nullptr, parent_trace
);
2419 const object_t
& oid
, const object_locator_t
& oloc
,
2420 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2421 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2422 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2423 Op
*o
= prepare_read_op(oid
, oloc
, off
, len
, snap
, pbl
, flags
,
2424 onfinish
, objver
, extra_ops
, op_flags
);
2430 Op
*prepare_cmpext_op(
2431 const object_t
& oid
, const object_locator_t
& oloc
,
2432 uint64_t off
, bufferlist
&cmp_bl
,
2433 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2434 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2436 int i
= init_ops(ops
, 1, extra_ops
);
2437 ops
[i
].op
.op
= CEPH_OSD_OP_CMPEXT
;
2438 ops
[i
].op
.extent
.offset
= off
;
2439 ops
[i
].op
.extent
.length
= cmp_bl
.length();
2440 ops
[i
].op
.extent
.truncate_size
= 0;
2441 ops
[i
].op
.extent
.truncate_seq
= 0;
2442 ops
[i
].indata
= cmp_bl
;
2443 ops
[i
].op
.flags
= op_flags
;
2444 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2445 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2451 const object_t
& oid
, const object_locator_t
& oloc
,
2452 uint64_t off
, bufferlist
&cmp_bl
,
2453 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2454 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2455 Op
*o
= prepare_cmpext_op(oid
, oloc
, off
, cmp_bl
, snap
,
2456 flags
, onfinish
, objver
, extra_ops
, op_flags
);
2462 ceph_tid_t
read_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2463 uint64_t off
, uint64_t len
, snapid_t snap
,
2464 bufferlist
*pbl
, int flags
, uint64_t trunc_size
,
2465 __u32 trunc_seq
, Context
*onfinish
,
2466 version_t
*objver
= NULL
,
2467 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2469 int i
= init_ops(ops
, 1, extra_ops
);
2470 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2471 ops
[i
].op
.extent
.offset
= off
;
2472 ops
[i
].op
.extent
.length
= len
;
2473 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2474 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2475 ops
[i
].op
.flags
= op_flags
;
2476 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2477 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2484 ceph_tid_t
mapext(const object_t
& oid
, const object_locator_t
& oloc
,
2485 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2486 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2487 ObjectOperation
*extra_ops
= NULL
) {
2489 int i
= init_ops(ops
, 1, extra_ops
);
2490 ops
[i
].op
.op
= CEPH_OSD_OP_MAPEXT
;
2491 ops
[i
].op
.extent
.offset
= off
;
2492 ops
[i
].op
.extent
.length
= len
;
2493 ops
[i
].op
.extent
.truncate_size
= 0;
2494 ops
[i
].op
.extent
.truncate_seq
= 0;
2495 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2496 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2503 ceph_tid_t
getxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2504 const char *name
, snapid_t snap
, bufferlist
*pbl
, int flags
,
2506 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2508 int i
= init_ops(ops
, 1, extra_ops
);
2509 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTR
;
2510 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2511 ops
[i
].op
.xattr
.value_len
= 0;
2513 ops
[i
].indata
.append(name
);
2514 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2515 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2523 ceph_tid_t
getxattrs(const object_t
& oid
, const object_locator_t
& oloc
,
2524 snapid_t snap
, map
<string
,bufferlist
>& attrset
,
2525 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2526 ObjectOperation
*extra_ops
= NULL
) {
2528 int i
= init_ops(ops
, 1, extra_ops
);
2529 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTRS
;
2530 C_GetAttrs
*fin
= new C_GetAttrs(attrset
, onfinish
);
2531 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2532 CEPH_OSD_FLAG_READ
, fin
, objver
);
2534 o
->outbl
= &fin
->bl
;
2540 ceph_tid_t
read_full(const object_t
& oid
, const object_locator_t
& oloc
,
2541 snapid_t snap
, bufferlist
*pbl
, int flags
,
2542 Context
*onfinish
, version_t
*objver
= NULL
,
2543 ObjectOperation
*extra_ops
= NULL
) {
2544 return read(oid
, oloc
, 0, 0, snap
, pbl
, flags
| global_op_flags
|
2545 CEPH_OSD_FLAG_READ
, onfinish
, objver
, extra_ops
);
2550 ceph_tid_t
_modify(const object_t
& oid
, const object_locator_t
& oloc
,
2551 vector
<OSDOp
>& ops
, ceph::real_time mtime
,
2552 const SnapContext
& snapc
, int flags
,
2554 version_t
*objver
= NULL
) {
2555 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2556 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2563 Op
*prepare_write_op(
2564 const object_t
& oid
, const object_locator_t
& oloc
,
2565 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2566 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2567 Context
*oncommit
, version_t
*objver
= NULL
,
2568 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2569 ZTracer::Trace
*parent_trace
= nullptr) {
2571 int i
= init_ops(ops
, 1, extra_ops
);
2572 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2573 ops
[i
].op
.extent
.offset
= off
;
2574 ops
[i
].op
.extent
.length
= len
;
2575 ops
[i
].op
.extent
.truncate_size
= 0;
2576 ops
[i
].op
.extent
.truncate_seq
= 0;
2578 ops
[i
].op
.flags
= op_flags
;
2579 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2580 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
,
2581 nullptr, parent_trace
);
2587 const object_t
& oid
, const object_locator_t
& oloc
,
2588 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2589 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2590 Context
*oncommit
, version_t
*objver
= NULL
,
2591 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2592 Op
*o
= prepare_write_op(oid
, oloc
, off
, len
, snapc
, bl
, mtime
, flags
,
2593 oncommit
, objver
, extra_ops
, op_flags
);
2598 Op
*prepare_append_op(
2599 const object_t
& oid
, const object_locator_t
& oloc
,
2600 uint64_t len
, const SnapContext
& snapc
,
2601 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2603 version_t
*objver
= NULL
,
2604 ObjectOperation
*extra_ops
= NULL
) {
2606 int i
= init_ops(ops
, 1, extra_ops
);
2607 ops
[i
].op
.op
= CEPH_OSD_OP_APPEND
;
2608 ops
[i
].op
.extent
.offset
= 0;
2609 ops
[i
].op
.extent
.length
= len
;
2610 ops
[i
].op
.extent
.truncate_size
= 0;
2611 ops
[i
].op
.extent
.truncate_seq
= 0;
2613 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2614 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2620 const object_t
& oid
, const object_locator_t
& oloc
,
2621 uint64_t len
, const SnapContext
& snapc
,
2622 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2624 version_t
*objver
= NULL
,
2625 ObjectOperation
*extra_ops
= NULL
) {
2626 Op
*o
= prepare_append_op(oid
, oloc
, len
, snapc
, bl
, mtime
, flags
,
2627 oncommit
, objver
, extra_ops
);
2632 ceph_tid_t
write_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2633 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2634 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2635 uint64_t trunc_size
, __u32 trunc_seq
,
2637 version_t
*objver
= NULL
,
2638 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2640 int i
= init_ops(ops
, 1, extra_ops
);
2641 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2642 ops
[i
].op
.extent
.offset
= off
;
2643 ops
[i
].op
.extent
.length
= len
;
2644 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2645 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2647 ops
[i
].op
.flags
= op_flags
;
2648 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2649 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2656 Op
*prepare_write_full_op(
2657 const object_t
& oid
, const object_locator_t
& oloc
,
2658 const SnapContext
& snapc
, const bufferlist
&bl
,
2659 ceph::real_time mtime
, int flags
,
2660 Context
*oncommit
, version_t
*objver
= NULL
,
2661 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2663 int i
= init_ops(ops
, 1, extra_ops
);
2664 ops
[i
].op
.op
= CEPH_OSD_OP_WRITEFULL
;
2665 ops
[i
].op
.extent
.offset
= 0;
2666 ops
[i
].op
.extent
.length
= bl
.length();
2668 ops
[i
].op
.flags
= op_flags
;
2669 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2670 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2675 ceph_tid_t
write_full(
2676 const object_t
& oid
, const object_locator_t
& oloc
,
2677 const SnapContext
& snapc
, const bufferlist
&bl
,
2678 ceph::real_time mtime
, int flags
,
2679 Context
*oncommit
, version_t
*objver
= NULL
,
2680 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2681 Op
*o
= prepare_write_full_op(oid
, oloc
, snapc
, bl
, mtime
, flags
,
2682 oncommit
, objver
, extra_ops
, op_flags
);
2687 Op
*prepare_writesame_op(
2688 const object_t
& oid
, const object_locator_t
& oloc
,
2689 uint64_t write_len
, uint64_t off
,
2690 const SnapContext
& snapc
, const bufferlist
&bl
,
2691 ceph::real_time mtime
, int flags
,
2692 Context
*oncommit
, version_t
*objver
= NULL
,
2693 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2696 int i
= init_ops(ops
, 1, extra_ops
);
2697 ops
[i
].op
.op
= CEPH_OSD_OP_WRITESAME
;
2698 ops
[i
].op
.writesame
.offset
= off
;
2699 ops
[i
].op
.writesame
.length
= write_len
;
2700 ops
[i
].op
.writesame
.data_length
= bl
.length();
2702 ops
[i
].op
.flags
= op_flags
;
2703 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2704 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2709 ceph_tid_t
writesame(
2710 const object_t
& oid
, const object_locator_t
& oloc
,
2711 uint64_t write_len
, uint64_t off
,
2712 const SnapContext
& snapc
, const bufferlist
&bl
,
2713 ceph::real_time mtime
, int flags
,
2714 Context
*oncommit
, version_t
*objver
= NULL
,
2715 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2717 Op
*o
= prepare_writesame_op(oid
, oloc
, write_len
, off
, snapc
, bl
,
2718 mtime
, flags
, oncommit
, objver
,
2719 extra_ops
, op_flags
);
2725 ceph_tid_t
trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2726 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2727 uint64_t trunc_size
, __u32 trunc_seq
,
2728 Context
*oncommit
, version_t
*objver
= NULL
,
2729 ObjectOperation
*extra_ops
= NULL
) {
2731 int i
= init_ops(ops
, 1, extra_ops
);
2732 ops
[i
].op
.op
= CEPH_OSD_OP_TRUNCATE
;
2733 ops
[i
].op
.extent
.offset
= trunc_size
;
2734 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2735 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2736 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2737 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2744 ceph_tid_t
zero(const object_t
& oid
, const object_locator_t
& oloc
,
2745 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2746 ceph::real_time mtime
, int flags
, Context
*oncommit
,
2747 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2749 int i
= init_ops(ops
, 1, extra_ops
);
2750 ops
[i
].op
.op
= CEPH_OSD_OP_ZERO
;
2751 ops
[i
].op
.extent
.offset
= off
;
2752 ops
[i
].op
.extent
.length
= len
;
2753 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2754 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2761 ceph_tid_t
rollback_object(const object_t
& oid
, const object_locator_t
& oloc
,
2762 const SnapContext
& snapc
, snapid_t snapid
,
2763 ceph::real_time mtime
, Context
*oncommit
,
2764 version_t
*objver
= NULL
,
2765 ObjectOperation
*extra_ops
= NULL
) {
2767 int i
= init_ops(ops
, 1, extra_ops
);
2768 ops
[i
].op
.op
= CEPH_OSD_OP_ROLLBACK
;
2769 ops
[i
].op
.snap
.snapid
= snapid
;
2770 Op
*o
= new Op(oid
, oloc
, ops
, CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2777 ceph_tid_t
create(const object_t
& oid
, const object_locator_t
& oloc
,
2778 const SnapContext
& snapc
, ceph::real_time mtime
, int global_flags
,
2779 int create_flags
, Context
*oncommit
,
2780 version_t
*objver
= NULL
,
2781 ObjectOperation
*extra_ops
= NULL
) {
2783 int i
= init_ops(ops
, 1, extra_ops
);
2784 ops
[i
].op
.op
= CEPH_OSD_OP_CREATE
;
2785 ops
[i
].op
.flags
= create_flags
;
2786 Op
*o
= new Op(oid
, oloc
, ops
, global_flags
| global_op_flags
|
2787 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2794 Op
*prepare_remove_op(
2795 const object_t
& oid
, const object_locator_t
& oloc
,
2796 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2798 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2800 int i
= init_ops(ops
, 1, extra_ops
);
2801 ops
[i
].op
.op
= CEPH_OSD_OP_DELETE
;
2802 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2803 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2809 const object_t
& oid
, const object_locator_t
& oloc
,
2810 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2812 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2813 Op
*o
= prepare_remove_op(oid
, oloc
, snapc
, mtime
, flags
,
2814 oncommit
, objver
, extra_ops
);
2820 ceph_tid_t
setxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2821 const char *name
, const SnapContext
& snapc
, const bufferlist
&bl
,
2822 ceph::real_time mtime
, int flags
,
2824 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2826 int i
= init_ops(ops
, 1, extra_ops
);
2827 ops
[i
].op
.op
= CEPH_OSD_OP_SETXATTR
;
2828 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2829 ops
[i
].op
.xattr
.value_len
= bl
.length();
2831 ops
[i
].indata
.append(name
);
2832 ops
[i
].indata
.append(bl
);
2833 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2834 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2841 ceph_tid_t
removexattr(const object_t
& oid
, const object_locator_t
& oloc
,
2842 const char *name
, const SnapContext
& snapc
,
2843 ceph::real_time mtime
, int flags
,
2845 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2847 int i
= init_ops(ops
, 1, extra_ops
);
2848 ops
[i
].op
.op
= CEPH_OSD_OP_RMXATTR
;
2849 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2850 ops
[i
].op
.xattr
.value_len
= 0;
2852 ops
[i
].indata
.append(name
);
2853 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2854 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// ---- object listing / enumeration (declarations) ----
// NListContext-driven, namespace-aware object listing: start a listing,
// seek within it (by hash position or hobject_t cursor), and read back the
// current cursor.
2862 void list_nobjects(NListContext
*p
, Context
*onfinish
);
2863 uint32_t list_nobjects_seek(NListContext
*p
, uint32_t pos
);
2864 uint32_t list_nobjects_seek(NListContext
*list_context
, const hobject_t
& c
);
2865 void list_nobjects_get_cursor(NListContext
*list_context
, hobject_t
*c
);
// Hash-order begin/end sentinels used to bound an enumeration pass.
2867 hobject_t
enumerate_objects_begin();
2868 hobject_t
enumerate_objects_end();
2869 //hobject_t enumerate_objects_begin(int n, int m);
2870 void enumerate_objects(
2872 const std::string
&ns
,
2873 const hobject_t
&start
,
2874 const hobject_t
&end
,
2876 const bufferlist
&filter_bl
,
2877 std::list
<librados::ListObjectImpl
> *result
,
2879 Context
*on_finish
);
2881 void _enumerate_reply(
2884 const hobject_t
&end
,
2885 const int64_t pool_id
,
2887 epoch_t reply_epoch
,
2888 std::list
<librados::ListObjectImpl
> *result
,
2890 Context
*on_finish
);
2891 friend class C_EnumerateReply
;
2893 // -------------------------
// ---- pool operation plumbing (declarations) ----
// Submit/resubmit a PoolOp to the monitors; _-prefixed variants appear to be
// the lock-held internal forms (convention elsewhere in this file).
2896 void pool_op_submit(PoolOp
*op
);
2897 void _pool_op_submit(PoolOp
*op
);
// Complete a PoolOp with result r.
2898 void _finish_pool_op(PoolOp
*op
, int r
);
// Issue the actual pool deletion request for pool id `pool`.
2899 void _do_delete_pool(int64_t pool
, Context
*onfinish
);
2901 int create_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2902 int allocate_selfmanaged_snap(int64_t pool
, snapid_t
*psnapid
,
2904 int delete_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2905 int delete_selfmanaged_snap(int64_t pool
, snapid_t snap
, Context
*onfinish
);
2907 int create_pool(string
& name
, Context
*onfinish
, uint64_t auid
=0,
2909 int delete_pool(int64_t pool
, Context
*onfinish
);
2910 int delete_pool(const string
& name
, Context
*onfinish
);
2911 int change_pool_auid(int64_t pool
, Context
*onfinish
, uint64_t auid
);
2913 void handle_pool_op_reply(MPoolOpReply
*m
);
2914 int pool_op_cancel(ceph_tid_t tid
, int r
);
2916 // --------------------------
2919 void _poolstat_submit(PoolStatOp
*op
);
2921 void handle_get_pool_stats_reply(MGetPoolStatsReply
*m
);
2922 void get_pool_stats(list
<string
>& pools
, map
<string
,pool_stat_t
> *result
,
2924 int pool_stat_op_cancel(ceph_tid_t tid
, int r
);
2925 void _finish_pool_stat_op(PoolStatOp
*op
, int r
);
2927 // ---------------------------
2930 void _fs_stats_submit(StatfsOp
*op
);
2932 void handle_fs_stats_reply(MStatfsReply
*m
);
2933 void get_fs_stats(struct ceph_statfs
& result
, boost::optional
<int64_t> poolid
,
2935 int statfs_op_cancel(ceph_tid_t tid
, int r
);
2936 void _finish_statfs_op(StatfsOp
*op
, int r
);
2938 // ---------------------------
2939 // some scatter/gather hackery
2941 void _sg_read_finish(vector
<ObjectExtent
>& extents
,
2942 vector
<bufferlist
>& resultbl
,
2943 bufferlist
*bl
, Context
*onfinish
);
2945 struct C_SGRead
: public Context
{
2947 vector
<ObjectExtent
> extents
;
2948 vector
<bufferlist
> resultbl
;
2951 C_SGRead(Objecter
*ob
,
2952 vector
<ObjectExtent
>& e
, vector
<bufferlist
>& r
, bufferlist
*b
,
2954 objecter(ob
), bl(b
), onfinish(c
) {
2958 void finish(int r
) override
{
2959 objecter
->_sg_read_finish(extents
, resultbl
, bl
, onfinish
);
2963 void sg_read_trunc(vector
<ObjectExtent
>& extents
, snapid_t snap
,
2964 bufferlist
*bl
, int flags
, uint64_t trunc_size
,
2965 __u32 trunc_seq
, Context
*onfinish
, int op_flags
= 0) {
2966 if (extents
.size() == 1) {
2967 read_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
2968 extents
[0].length
, snap
, bl
, flags
, extents
[0].truncate_size
,
2969 trunc_seq
, onfinish
, 0, 0, op_flags
);
2971 C_GatherBuilder
gather(cct
);
2972 vector
<bufferlist
> resultbl(extents
.size());
2974 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
2977 read_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
, snap
, &resultbl
[i
++],
2978 flags
, p
->truncate_size
, trunc_seq
, gather
.new_sub(),
2981 gather
.set_finisher(new C_SGRead(this, extents
, resultbl
, bl
, onfinish
));
2986 void sg_read(vector
<ObjectExtent
>& extents
, snapid_t snap
, bufferlist
*bl
,
2987 int flags
, Context
*onfinish
, int op_flags
= 0) {
2988 sg_read_trunc(extents
, snap
, bl
, flags
, 0, 0, onfinish
, op_flags
);
2991 void sg_write_trunc(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
2992 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
2993 uint64_t trunc_size
, __u32 trunc_seq
,
2994 Context
*oncommit
, int op_flags
= 0) {
2995 if (extents
.size() == 1) {
2996 write_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
2997 extents
[0].length
, snapc
, bl
, mtime
, flags
,
2998 extents
[0].truncate_size
, trunc_seq
, oncommit
,
3001 C_GatherBuilder
gcom(cct
, oncommit
);
3002 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
3006 for (vector
<pair
<uint64_t,uint64_t> >::iterator bit
3007 = p
->buffer_extents
.begin();
3008 bit
!= p
->buffer_extents
.end();
3010 bl
.copy(bit
->first
, bit
->second
, cur
);
3011 assert(cur
.length() == p
->length
);
3012 write_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
,
3013 snapc
, cur
, mtime
, flags
, p
->truncate_size
, trunc_seq
,
3014 oncommit
? gcom
.new_sub():0,
3021 void sg_write(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3022 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
3023 Context
*oncommit
, int op_flags
= 0) {
3024 sg_write_trunc(extents
, snapc
, bl
, mtime
, flags
, 0, 0, oncommit
,
// ---- messenger Dispatcher interface (declarations) ----
// Connection lifecycle callbacks overridden from the Dispatcher base.
3028 void ms_handle_connect(Connection
*con
) override
;
3029 bool ms_handle_reset(Connection
*con
) override
;
3030 void ms_handle_remote_reset(Connection
*con
) override
;
3031 bool ms_handle_refused(Connection
*con
) override
;
// Supply an authorizer for outgoing connections of the given peer type.
3032 bool ms_get_authorizer(int dest_type
,
3033 AuthAuthorizer
**authorizer
,
3034 bool force_new
) override
;
// Add (or clear, per `set`) this client on the OSD blacklist.
3036 void blacklist_self(bool set
);
// Minimum OSDMap epoch ops may be sent against; raised via set_epoch_barrier().
3039 epoch_t epoch_barrier
;
// Whether writes are retried after the first reply — semantics set elsewhere;
// NOTE(review): confirm against the config option that initializes it.
3040 bool retry_writes_after_first_reply
;
3042 void set_epoch_barrier(epoch_t epoch
);
3044 PerfCounters
*get_logger() {