1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
18 #include <condition_variable>
24 #include <type_traits>
26 #include <boost/thread/shared_mutex.hpp>
28 #include "include/assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/Finisher.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
40 #include "messages/MOSDOp.h"
41 #include "osd/OSDMap.h"
54 class MGetPoolStatsReply
;
61 // -----------------------------------------
63 struct ObjectOperation
{
68 vector
<bufferlist
*> out_bl
;
69 vector
<Context
*> out_handler
;
70 vector
<int*> out_rval
;
72 ObjectOperation() : flags(0), priority(0) {}
74 while (!out_handler
.empty()) {
75 delete out_handler
.back();
76 out_handler
.pop_back();
84 void set_last_op_flags(int flags
) {
86 ops
.rbegin()->op
.flags
= flags
;
91 * Add a callback to run when this operation completes,
92 * after any other callbacks for it.
94 void add_handler(Context
*extra
);
96 OSDOp
& add_op(int op
) {
102 out_handler
.resize(s
+1);
103 out_handler
[s
] = NULL
;
104 out_rval
.resize(s
+1);
// Add a new op of type `op` (via add_op) that carries an extent and a payload.
// The op's extent offset/length are set from `off`/`len`, and `bl` is handed
// to the op's indata with claim_append(); `bl` is taken by non-const reference.
108 void add_data(int op
, uint64_t off
, uint64_t len
, bufferlist
& bl
) {
109 OSDOp
& osd_op
= add_op(op
);
110 osd_op
.op
.extent
.offset
= off
;
111 osd_op
.op
.extent
.length
= len
;
112 osd_op
.indata
.claim_append(bl
);
114 void add_writesame(int op
, uint64_t off
, uint64_t write_len
,
116 OSDOp
& osd_op
= add_op(op
);
117 osd_op
.op
.writesame
.offset
= off
;
118 osd_op
.op
.writesame
.length
= write_len
;
119 osd_op
.op
.writesame
.data_length
= bl
.length();
120 osd_op
.indata
.claim_append(bl
);
122 void add_xattr(int op
, const char *name
, const bufferlist
& data
) {
123 OSDOp
& osd_op
= add_op(op
);
124 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
125 osd_op
.op
.xattr
.value_len
= data
.length();
127 osd_op
.indata
.append(name
);
128 osd_op
.indata
.append(data
);
130 void add_xattr_cmp(int op
, const char *name
, uint8_t cmp_op
,
131 uint8_t cmp_mode
, const bufferlist
& data
) {
132 OSDOp
& osd_op
= add_op(op
);
133 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
134 osd_op
.op
.xattr
.value_len
= data
.length();
135 osd_op
.op
.xattr
.cmp_op
= cmp_op
;
136 osd_op
.op
.xattr
.cmp_mode
= cmp_mode
;
138 osd_op
.indata
.append(name
);
139 osd_op
.indata
.append(data
);
141 void add_call(int op
, const char *cname
, const char *method
,
143 bufferlist
*outbl
, Context
*ctx
, int *prval
) {
144 OSDOp
& osd_op
= add_op(op
);
146 unsigned p
= ops
.size() - 1;
147 out_handler
[p
] = ctx
;
151 osd_op
.op
.cls
.class_len
= strlen(cname
);
152 osd_op
.op
.cls
.method_len
= strlen(method
);
153 osd_op
.op
.cls
.indata_len
= indata
.length();
154 osd_op
.indata
.append(cname
, osd_op
.op
.cls
.class_len
);
155 osd_op
.indata
.append(method
, osd_op
.op
.cls
.method_len
);
156 osd_op
.indata
.append(indata
);
// Add a pg-listing op of type `op` (via add_op). `count` and `start_epoch`
// are stored in the op's pgls fields; `cookie` is encoded into the op's
// indata.
158 void add_pgls(int op
, uint64_t count
, collection_list_handle_t cookie
,
159 epoch_t start_epoch
) {
160 OSDOp
& osd_op
= add_op(op
);
161 osd_op
.op
.pgls
.count
= count
;
162 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
163 ::encode(cookie
, osd_op
.indata
);
165 void add_pgls_filter(int op
, uint64_t count
, const bufferlist
& filter
,
166 collection_list_handle_t cookie
, epoch_t start_epoch
) {
167 OSDOp
& osd_op
= add_op(op
);
168 osd_op
.op
.pgls
.count
= count
;
169 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
171 string mname
= "filter";
172 ::encode(cname
, osd_op
.indata
);
173 ::encode(mname
, osd_op
.indata
);
174 osd_op
.indata
.append(filter
);
175 ::encode(cookie
, osd_op
.indata
);
177 void add_alloc_hint(int op
, uint64_t expected_object_size
,
178 uint64_t expected_write_size
,
180 OSDOp
& osd_op
= add_op(op
);
181 osd_op
.op
.alloc_hint
.expected_object_size
= expected_object_size
;
182 osd_op
.op
.alloc_hint
.expected_write_size
= expected_write_size
;
183 osd_op
.op
.alloc_hint
.flags
= flags
;
189 void pg_ls(uint64_t count
, bufferlist
& filter
,
190 collection_list_handle_t cookie
, epoch_t start_epoch
) {
191 if (filter
.length() == 0)
192 add_pgls(CEPH_OSD_OP_PGLS
, count
, cookie
, start_epoch
);
194 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER
, count
, filter
, cookie
,
196 flags
|= CEPH_OSD_FLAG_PGOP
;
199 void pg_nls(uint64_t count
, const bufferlist
& filter
,
200 collection_list_handle_t cookie
, epoch_t start_epoch
) {
201 if (filter
.length() == 0)
202 add_pgls(CEPH_OSD_OP_PGNLS
, count
, cookie
, start_epoch
);
204 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER
, count
, filter
, cookie
,
206 flags
|= CEPH_OSD_FLAG_PGOP
;
209 void scrub_ls(const librados::object_id_t
& start_after
,
211 std::vector
<librados::inconsistent_obj_t
> *objects
,
214 void scrub_ls(const librados::object_id_t
& start_after
,
216 std::vector
<librados::inconsistent_snapset_t
> *objects
,
// Add a CEPH_OSD_OP_CREATE op. The op's flags are set to
// CEPH_OSD_OP_FLAG_EXCL when `excl` is true and to 0 otherwise.
220 void create(bool excl
) {
221 OSDOp
& o
= add_op(CEPH_OSD_OP_CREATE
);
222 o
.op
.flags
= (excl
? CEPH_OSD_OP_FLAG_EXCL
: 0);
225 struct C_ObjectOperation_stat
: public Context
{
228 ceph::real_time
*pmtime
;
230 struct timespec
*pts
;
232 C_ObjectOperation_stat(uint64_t *ps
, ceph::real_time
*pm
, time_t *pt
, struct timespec
*_pts
,
234 : psize(ps
), pmtime(pm
), ptime(pt
), pts(_pts
), prval(prval
) {}
235 void finish(int r
) override
{
237 bufferlist::iterator p
= bl
.begin();
240 ceph::real_time mtime
;
248 *ptime
= ceph::real_clock::to_time_t(mtime
);
250 *pts
= ceph::real_clock::to_timespec(mtime
);
251 } catch (buffer::error
& e
) {
258 void stat(uint64_t *psize
, ceph::real_time
*pmtime
, int *prval
) {
259 add_op(CEPH_OSD_OP_STAT
);
260 unsigned p
= ops
.size() - 1;
261 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, pmtime
, NULL
, NULL
,
267 void stat(uint64_t *psize
, time_t *ptime
, int *prval
) {
268 add_op(CEPH_OSD_OP_STAT
);
269 unsigned p
= ops
.size() - 1;
270 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, ptime
, NULL
,
276 void stat(uint64_t *psize
, struct timespec
*pts
, int *prval
) {
277 add_op(CEPH_OSD_OP_STAT
);
278 unsigned p
= ops
.size() - 1;
279 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, NULL
, pts
,
286 struct C_ObjectOperation_cmpext
: public Context
{
288 C_ObjectOperation_cmpext(int *prval
)
297 void cmpext(uint64_t off
, bufferlist
& cmp_bl
, int *prval
) {
298 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_bl
.length(), cmp_bl
);
299 unsigned p
= ops
.size() - 1;
300 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
306 void cmpext(uint64_t off
, uint64_t cmp_len
, const char *cmp_buf
, int *prval
) {
308 cmp_bl
.append(cmp_buf
, cmp_len
);
309 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_len
, cmp_bl
);
310 unsigned p
= ops
.size() - 1;
311 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
316 void read(uint64_t off
, uint64_t len
, bufferlist
*pbl
, int *prval
,
319 add_data(CEPH_OSD_OP_READ
, off
, len
, bl
);
320 unsigned p
= ops
.size() - 1;
323 out_handler
[p
] = ctx
;
326 struct C_ObjectOperation_sparse_read
: public Context
{
329 std::map
<uint64_t, uint64_t> *extents
;
331 C_ObjectOperation_sparse_read(bufferlist
*data_bl
,
332 std::map
<uint64_t, uint64_t> *extents
,
334 : data_bl(data_bl
), extents(extents
), prval(prval
) {}
335 void finish(int r
) override
{
336 bufferlist::iterator iter
= bl
.begin();
339 ::decode(*extents
, iter
);
340 ::decode(*data_bl
, iter
);
341 } catch (buffer::error
& e
) {
348 void sparse_read(uint64_t off
, uint64_t len
, std::map
<uint64_t,uint64_t> *m
,
349 bufferlist
*data_bl
, int *prval
) {
351 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
352 unsigned p
= ops
.size() - 1;
353 C_ObjectOperation_sparse_read
*h
=
354 new C_ObjectOperation_sparse_read(data_bl
, m
, prval
);
// Add a CEPH_OSD_OP_WRITE op at offset `off` through add_data(); the length
// passed is bl.length() and `bl` itself is handed to add_data().
// After the op is added, `truncate_size` and `truncate_seq` are stored in the
// extent of the last entry of `ops` (the op that was just added).
359 void write(uint64_t off
, bufferlist
& bl
,
360 uint64_t truncate_size
,
361 uint32_t truncate_seq
) {
362 add_data(CEPH_OSD_OP_WRITE
, off
, bl
.length(), bl
);
363 OSDOp
& o
= *ops
.rbegin();
364 o
.op
.extent
.truncate_size
= truncate_size
;
365 o
.op
.extent
.truncate_seq
= truncate_seq
;
// Convenience overload: forwards to the four-argument write() with both
// truncate_size and truncate_seq passed as 0.
367 void write(uint64_t off
, bufferlist
& bl
) {
368 write(off
, bl
, 0, 0);
// Add a CEPH_OSD_OP_WRITEFULL op through add_data(), with offset 0 and
// length bl.length().
370 void write_full(bufferlist
& bl
) {
371 add_data(CEPH_OSD_OP_WRITEFULL
, 0, bl
.length(), bl
);
// Add a CEPH_OSD_OP_WRITESAME op by forwarding `off`, `write_len` and `bl`
// unchanged to add_writesame().
373 void writesame(uint64_t off
, uint64_t write_len
, bufferlist
& bl
) {
374 add_writesame(CEPH_OSD_OP_WRITESAME
, off
, write_len
, bl
);
// Add a CEPH_OSD_OP_APPEND op through add_data(). The offset argument is
// passed as 0 and the length as bl.length().
376 void append(bufferlist
& bl
) {
377 add_data(CEPH_OSD_OP_APPEND
, 0, bl
.length(), bl
);
379 void zero(uint64_t off
, uint64_t len
) {
381 add_data(CEPH_OSD_OP_ZERO
, off
, len
, bl
);
383 void truncate(uint64_t off
) {
385 add_data(CEPH_OSD_OP_TRUNCATE
, off
, 0, bl
);
389 add_data(CEPH_OSD_OP_DELETE
, 0, 0, bl
);
391 void mapext(uint64_t off
, uint64_t len
) {
393 add_data(CEPH_OSD_OP_MAPEXT
, off
, len
, bl
);
395 void sparse_read(uint64_t off
, uint64_t len
) {
397 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
400 void checksum(uint8_t type
, const bufferlist
&init_value_bl
,
401 uint64_t off
, uint64_t len
, size_t chunk_size
,
402 bufferlist
*pbl
, int *prval
, Context
*ctx
) {
403 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_CHECKSUM
);
404 osd_op
.op
.checksum
.offset
= off
;
405 osd_op
.op
.checksum
.length
= len
;
406 osd_op
.op
.checksum
.type
= type
;
407 osd_op
.op
.checksum
.chunk_size
= chunk_size
;
408 osd_op
.indata
.append(init_value_bl
);
410 unsigned p
= ops
.size() - 1;
413 out_handler
[p
] = ctx
;
417 void getxattr(const char *name
, bufferlist
*pbl
, int *prval
) {
419 add_xattr(CEPH_OSD_OP_GETXATTR
, name
, bl
);
420 unsigned p
= ops
.size() - 1;
424 struct C_ObjectOperation_decodevals
: public Context
{
425 uint64_t max_entries
;
427 std::map
<std::string
,bufferlist
> *pattrs
;
430 C_ObjectOperation_decodevals(uint64_t m
, std::map
<std::string
,bufferlist
> *pa
,
432 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
437 void finish(int r
) override
{
439 bufferlist::iterator p
= bl
.begin();
442 ::decode(*pattrs
, p
);
444 std::map
<std::string
,bufferlist
> ignore
;
450 ::decode(*ptruncated
, p
);
452 // the OSD did not provide this. since old OSDs do not
453 // enforce omap result limits either, we can infer it from
454 // the size of the result
455 *ptruncated
= (pattrs
->size() == max_entries
);
459 catch (buffer::error
& e
) {
466 struct C_ObjectOperation_decodekeys
: public Context
{
467 uint64_t max_entries
;
469 std::set
<std::string
> *pattrs
;
472 C_ObjectOperation_decodekeys(uint64_t m
, std::set
<std::string
> *pa
, bool *pt
,
474 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
479 void finish(int r
) override
{
481 bufferlist::iterator p
= bl
.begin();
484 ::decode(*pattrs
, p
);
486 std::set
<std::string
> ignore
;
492 ::decode(*ptruncated
, p
);
494 // the OSD did not provide this. since old OSDs do not
495 // enforce omap result limits either, we can infer it from
496 // the size of the result
497 *ptruncated
= (pattrs
->size() == max_entries
);
501 catch (buffer::error
& e
) {
508 struct C_ObjectOperation_decodewatchers
: public Context
{
510 list
<obj_watch_t
> *pwatchers
;
512 C_ObjectOperation_decodewatchers(list
<obj_watch_t
> *pw
, int *pr
)
513 : pwatchers(pw
), prval(pr
) {}
514 void finish(int r
) override
{
516 bufferlist::iterator p
= bl
.begin();
518 obj_list_watch_response_t resp
;
521 for (list
<watch_item_t
>::iterator i
= resp
.entries
.begin() ;
522 i
!= resp
.entries
.end() ; ++i
) {
526 strncpy(ow
.addr
, sa
.str().c_str(), 256);
527 ow
.watcher_id
= i
->name
.num();
528 ow
.cookie
= i
->cookie
;
529 ow
.timeout_seconds
= i
->timeout_seconds
;
530 pwatchers
->push_back(ow
);
534 catch (buffer::error
& e
) {
541 struct C_ObjectOperation_decodesnaps
: public Context
{
543 librados::snap_set_t
*psnaps
;
545 C_ObjectOperation_decodesnaps(librados::snap_set_t
*ps
, int *pr
)
546 : psnaps(ps
), prval(pr
) {}
547 void finish(int r
) override
{
549 bufferlist::iterator p
= bl
.begin();
551 obj_list_snap_response_t resp
;
554 psnaps
->clones
.clear();
555 for (vector
<clone_info
>::iterator ci
= resp
.clones
.begin();
556 ci
!= resp
.clones
.end();
558 librados::clone_info_t clone
;
560 clone
.cloneid
= ci
->cloneid
;
561 clone
.snaps
.reserve(ci
->snaps
.size());
562 clone
.snaps
.insert(clone
.snaps
.end(), ci
->snaps
.begin(),
564 clone
.overlap
= ci
->overlap
;
565 clone
.size
= ci
->size
;
567 psnaps
->clones
.push_back(clone
);
569 psnaps
->seq
= resp
.seq
;
571 } catch (buffer::error
& e
) {
578 void getxattrs(std::map
<std::string
,bufferlist
> *pattrs
, int *prval
) {
579 add_op(CEPH_OSD_OP_GETXATTRS
);
580 if (pattrs
|| prval
) {
581 unsigned p
= ops
.size() - 1;
582 C_ObjectOperation_decodevals
*h
583 = new C_ObjectOperation_decodevals(0, pattrs
, nullptr, prval
);
// Add a CEPH_OSD_OP_SETXATTR op for attribute `name` with value `bl`, by
// forwarding both to add_xattr().
589 void setxattr(const char *name
, const bufferlist
& bl
) {
590 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
592 void setxattr(const char *name
, const string
& s
) {
595 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
// Add a CEPH_OSD_OP_CMPXATTR op. `name`, `cmp_op`, `cmp_mode` and `bl` are
// forwarded unchanged to add_xattr_cmp(), which fills in the op.
597 void cmpxattr(const char *name
, uint8_t cmp_op
, uint8_t cmp_mode
,
598 const bufferlist
& bl
) {
599 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR
, name
, cmp_op
, cmp_mode
, bl
);
601 void rmxattr(const char *name
) {
603 add_xattr(CEPH_OSD_OP_RMXATTR
, name
, bl
);
605 void setxattrs(map
<string
, bufferlist
>& attrs
) {
608 add_xattr(CEPH_OSD_OP_RESETXATTRS
, 0, bl
.length());
610 void resetxattrs(const char *prefix
, map
<string
, bufferlist
>& attrs
) {
613 add_xattr(CEPH_OSD_OP_RESETXATTRS
, prefix
, bl
);
// Add a CEPH_OSD_OP_TMAPUP op through add_data(). Both the offset and the
// length arguments are passed as 0; `bl` is handed to add_data() as payload.
617 void tmap_update(bufferlist
& bl
) {
618 add_data(CEPH_OSD_OP_TMAPUP
, 0, 0, bl
);
// Add a CEPH_OSD_OP_TMAPPUT op through add_data(), with offset 0 and length
// bl.length().
620 void tmap_put(bufferlist
& bl
) {
621 add_data(CEPH_OSD_OP_TMAPPUT
, 0, bl
.length(), bl
);
623 void tmap_get(bufferlist
*pbl
, int *prval
) {
624 add_op(CEPH_OSD_OP_TMAPGET
);
625 unsigned p
= ops
.size() - 1;
630 add_op(CEPH_OSD_OP_TMAPGET
);
632 void tmap_to_omap(bool nullok
=false) {
633 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_TMAP2OMAP
);
635 osd_op
.op
.tmap2omap
.flags
= CEPH_OSD_TMAP2OMAP_NULLOK
;
639 void omap_get_keys(const string
&start_after
,
641 std::set
<std::string
> *out_set
,
644 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETKEYS
);
646 ::encode(start_after
, bl
);
647 ::encode(max_to_get
, bl
);
648 op
.op
.extent
.offset
= 0;
649 op
.op
.extent
.length
= bl
.length();
650 op
.indata
.claim_append(bl
);
651 if (prval
|| ptruncated
|| out_set
) {
652 unsigned p
= ops
.size() - 1;
653 C_ObjectOperation_decodekeys
*h
=
654 new C_ObjectOperation_decodekeys(max_to_get
, out_set
, ptruncated
, prval
);
661 void omap_get_vals(const string
&start_after
,
662 const string
&filter_prefix
,
664 std::map
<std::string
, bufferlist
> *out_set
,
667 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALS
);
669 ::encode(start_after
, bl
);
670 ::encode(max_to_get
, bl
);
671 ::encode(filter_prefix
, bl
);
672 op
.op
.extent
.offset
= 0;
673 op
.op
.extent
.length
= bl
.length();
674 op
.indata
.claim_append(bl
);
675 if (prval
|| out_set
|| ptruncated
) {
676 unsigned p
= ops
.size() - 1;
677 C_ObjectOperation_decodevals
*h
=
678 new C_ObjectOperation_decodevals(max_to_get
, out_set
, ptruncated
, prval
);
685 void omap_get_vals_by_keys(const std::set
<std::string
> &to_get
,
686 std::map
<std::string
, bufferlist
> *out_set
,
688 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS
);
690 ::encode(to_get
, bl
);
691 op
.op
.extent
.offset
= 0;
692 op
.op
.extent
.length
= bl
.length();
693 op
.indata
.claim_append(bl
);
694 if (prval
|| out_set
) {
695 unsigned p
= ops
.size() - 1;
696 C_ObjectOperation_decodevals
*h
=
697 new C_ObjectOperation_decodevals(0, out_set
, nullptr, prval
);
704 void omap_cmp(const std::map
<std::string
, pair
<bufferlist
,int> > &assertions
,
706 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAP_CMP
);
708 ::encode(assertions
, bl
);
709 op
.op
.extent
.offset
= 0;
710 op
.op
.extent
.length
= bl
.length();
711 op
.indata
.claim_append(bl
);
713 unsigned p
= ops
.size() - 1;
718 struct C_ObjectOperation_copyget
: public Context
{
720 object_copy_cursor_t
*cursor
;
722 ceph::real_time
*out_mtime
;
723 std::map
<std::string
,bufferlist
> *out_attrs
;
724 bufferlist
*out_data
, *out_omap_header
, *out_omap_data
;
725 vector
<snapid_t
> *out_snaps
;
726 snapid_t
*out_snap_seq
;
728 uint32_t *out_data_digest
;
729 uint32_t *out_omap_digest
;
730 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
;
731 uint64_t *out_truncate_seq
;
732 uint64_t *out_truncate_size
;
734 C_ObjectOperation_copyget(object_copy_cursor_t
*c
,
737 std::map
<std::string
,bufferlist
> *a
,
738 bufferlist
*d
, bufferlist
*oh
,
740 std::vector
<snapid_t
> *osnaps
,
745 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *oreqids
,
750 out_size(s
), out_mtime(m
),
751 out_attrs(a
), out_data(d
), out_omap_header(oh
),
752 out_omap_data(o
), out_snaps(osnaps
), out_snap_seq(osnap_seq
),
753 out_flags(flags
), out_data_digest(dd
), out_omap_digest(od
),
755 out_truncate_seq(otseq
),
756 out_truncate_size(otsize
),
758 void finish(int r
) override
{
759 // reqids are copied on ENOENT
760 if (r
< 0 && r
!= -ENOENT
)
763 bufferlist::iterator p
= bl
.begin();
764 object_copy_data_t copy_reply
;
765 ::decode(copy_reply
, p
);
768 *out_reqids
= copy_reply
.reqids
;
772 *out_size
= copy_reply
.size
;
774 *out_mtime
= ceph::real_clock::from_ceph_timespec(copy_reply
.mtime
);
776 *out_attrs
= copy_reply
.attrs
;
778 out_data
->claim_append(copy_reply
.data
);
780 out_omap_header
->claim_append(copy_reply
.omap_header
);
782 *out_omap_data
= copy_reply
.omap_data
;
784 *out_snaps
= copy_reply
.snaps
;
786 *out_snap_seq
= copy_reply
.snap_seq
;
788 *out_flags
= copy_reply
.flags
;
790 *out_data_digest
= copy_reply
.data_digest
;
792 *out_omap_digest
= copy_reply
.omap_digest
;
794 *out_reqids
= copy_reply
.reqids
;
795 if (out_truncate_seq
)
796 *out_truncate_seq
= copy_reply
.truncate_seq
;
797 if (out_truncate_size
)
798 *out_truncate_size
= copy_reply
.truncate_size
;
799 *cursor
= copy_reply
.cursor
;
800 } catch (buffer::error
& e
) {
807 void copy_get(object_copy_cursor_t
*cursor
,
810 ceph::real_time
*out_mtime
,
811 std::map
<std::string
,bufferlist
> *out_attrs
,
812 bufferlist
*out_data
,
813 bufferlist
*out_omap_header
,
814 bufferlist
*out_omap_data
,
815 vector
<snapid_t
> *out_snaps
,
816 snapid_t
*out_snap_seq
,
818 uint32_t *out_data_digest
,
819 uint32_t *out_omap_digest
,
820 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
,
821 uint64_t *truncate_seq
,
822 uint64_t *truncate_size
,
824 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_GET
);
825 osd_op
.op
.copy_get
.max
= max
;
826 ::encode(*cursor
, osd_op
.indata
);
827 ::encode(max
, osd_op
.indata
);
828 unsigned p
= ops
.size() - 1;
830 C_ObjectOperation_copyget
*h
=
831 new C_ObjectOperation_copyget(cursor
, out_size
, out_mtime
,
832 out_attrs
, out_data
, out_omap_header
,
833 out_omap_data
, out_snaps
, out_snap_seq
,
834 out_flags
, out_data_digest
,
835 out_omap_digest
, out_reqids
, truncate_seq
,
836 truncate_size
, prval
);
842 add_op(CEPH_OSD_OP_UNDIRTY
);
845 struct C_ObjectOperation_isdirty
: public Context
{
849 C_ObjectOperation_isdirty(bool *p
, int *r
)
850 : pisdirty(p
), prval(r
) {}
851 void finish(int r
) override
{
855 bufferlist::iterator p
= bl
.begin();
857 ::decode(isdirty
, p
);
860 } catch (buffer::error
& e
) {
867 void is_dirty(bool *pisdirty
, int *prval
) {
868 add_op(CEPH_OSD_OP_ISDIRTY
);
869 unsigned p
= ops
.size() - 1;
871 C_ObjectOperation_isdirty
*h
=
872 new C_ObjectOperation_isdirty(pisdirty
, prval
);
877 struct C_ObjectOperation_hit_set_ls
: public Context
{
879 std::list
< std::pair
<time_t, time_t> > *ptls
;
880 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > *putls
;
882 C_ObjectOperation_hit_set_ls(std::list
< std::pair
<time_t, time_t> > *t
,
883 std::list
< std::pair
<ceph::real_time
,
884 ceph::real_time
> > *ut
,
886 : ptls(t
), putls(ut
), prval(r
) {}
887 void finish(int r
) override
{
891 bufferlist::iterator p
= bl
.begin();
892 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > ls
;
896 for (auto p
= ls
.begin(); p
!= ls
.end(); ++p
)
897 // round initial timestamp up to the next full second to
898 // keep this a valid interval.
900 make_pair(ceph::real_clock::to_time_t(
902 // Sadly, no time literals until C++14.
903 std::chrono::seconds(1))),
904 ceph::real_clock::to_time_t(p
->second
)));
908 } catch (buffer::error
& e
) {
917 * list available HitSets.
919 * We will get back a list of time intervals. Note that the most
920 * recent range may have an empty end timestamp if it is still
923 * @param pls [out] list of time intervals
924 * @param prval [out] return value
926 void hit_set_ls(std::list
< std::pair
<time_t, time_t> > *pls
, int *prval
) {
927 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
928 unsigned p
= ops
.size() - 1;
930 C_ObjectOperation_hit_set_ls
*h
=
931 new C_ObjectOperation_hit_set_ls(pls
, NULL
, prval
);
935 void hit_set_ls(std::list
<std::pair
<ceph::real_time
, ceph::real_time
> > *pls
,
937 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
938 unsigned p
= ops
.size() - 1;
940 C_ObjectOperation_hit_set_ls
*h
=
941 new C_ObjectOperation_hit_set_ls(NULL
, pls
, prval
);
949 * Return an encoded HitSet that includes the provided time
952 * @param stamp [in] timestamp
953 * @param pbl [out] target buffer for encoded HitSet
954 * @param prval [out] return value
956 void hit_set_get(ceph::real_time stamp
, bufferlist
*pbl
, int *prval
) {
957 OSDOp
& op
= add_op(CEPH_OSD_OP_PG_HITSET_GET
);
958 op
.op
.hit_set_get
.stamp
= ceph::real_clock::to_ceph_timespec(stamp
);
959 unsigned p
= ops
.size() - 1;
964 void omap_get_header(bufferlist
*bl
, int *prval
) {
965 add_op(CEPH_OSD_OP_OMAPGETHEADER
);
966 unsigned p
= ops
.size() - 1;
971 void omap_set(const map
<string
, bufferlist
> &map
) {
974 add_data(CEPH_OSD_OP_OMAPSETVALS
, 0, bl
.length(), bl
);
// Add a CEPH_OSD_OP_OMAPSETHEADER op through add_data(), with offset 0 and
// length bl.length(); `bl` is handed to add_data() as the payload.
977 void omap_set_header(bufferlist
&bl
) {
978 add_data(CEPH_OSD_OP_OMAPSETHEADER
, 0, bl
.length(), bl
);
982 add_op(CEPH_OSD_OP_OMAPCLEAR
);
985 void omap_rm_keys(const std::set
<std::string
> &to_remove
) {
987 ::encode(to_remove
, bl
);
988 add_data(CEPH_OSD_OP_OMAPRMKEYS
, 0, bl
.length(), bl
);
// Add a CEPH_OSD_OP_CALL op for class `cname` / method `method` with input
// `indata`. Forwards to add_call() with NULL for its output buffer, context
// and return-value pointer arguments.
992 void call(const char *cname
, const char *method
, bufferlist
&indata
) {
993 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, NULL
, NULL
, NULL
);
// Add a CEPH_OSD_OP_CALL op for class `cname` / method `method` with input
// `indata`. `outdata`, `ctx` and `prval` are forwarded unchanged to
// add_call() as its output buffer, context and return-value pointer.
996 void call(const char *cname
, const char *method
, bufferlist
&indata
,
997 bufferlist
*outdata
, Context
*ctx
, int *prval
) {
998 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, outdata
, ctx
, prval
);
// Add a CEPH_OSD_OP_WATCH op and store `cookie`, `op` and `timeout` in its
// watch fields. `timeout` defaults to 0 when the caller omits it.
1002 void watch(uint64_t cookie
, __u8 op
, uint32_t timeout
= 0) {
1003 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_WATCH
);
1004 osd_op
.op
.watch
.cookie
= cookie
;
1005 osd_op
.op
.watch
.op
= op
;
1006 osd_op
.op
.watch
.timeout
= timeout
;
// Add a CEPH_OSD_OP_NOTIFY op with the given `cookie`.
// `prot_ver`, `timeout` and `bl` are encoded, in that order, into *inbl, and
// *inbl is then appended to the op's indata.
// `inbl` is dereferenced unconditionally here, so callers must pass a
// non-null pointer.
1009 void notify(uint64_t cookie
, uint32_t prot_ver
, uint32_t timeout
,
1010 bufferlist
&bl
, bufferlist
*inbl
) {
1011 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY
);
1012 osd_op
.op
.notify
.cookie
= cookie
;
1013 ::encode(prot_ver
, *inbl
);
1014 ::encode(timeout
, *inbl
);
1015 ::encode(bl
, *inbl
);
1016 osd_op
.indata
.append(*inbl
);
1019 void notify_ack(uint64_t notify_id
, uint64_t cookie
,
1020 bufferlist
& reply_bl
) {
1021 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY_ACK
);
1023 ::encode(notify_id
, bl
);
1024 ::encode(cookie
, bl
);
1025 ::encode(reply_bl
, bl
);
1026 osd_op
.indata
.append(bl
);
1029 void list_watchers(list
<obj_watch_t
> *out
,
1031 (void)add_op(CEPH_OSD_OP_LIST_WATCHERS
);
1033 unsigned p
= ops
.size() - 1;
1034 C_ObjectOperation_decodewatchers
*h
=
1035 new C_ObjectOperation_decodewatchers(out
, prval
);
1038 out_rval
[p
] = prval
;
1042 void list_snaps(librados::snap_set_t
*out
, int *prval
) {
1043 (void)add_op(CEPH_OSD_OP_LIST_SNAPS
);
1045 unsigned p
= ops
.size() - 1;
1046 C_ObjectOperation_decodesnaps
*h
=
1047 new C_ObjectOperation_decodesnaps(out
, prval
);
1050 out_rval
[p
] = prval
;
// Add a CEPH_OSD_OP_ASSERT_VER op and store `ver` in its assert_ver.ver field.
1054 void assert_version(uint64_t ver
) {
1055 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ASSERT_VER
);
1056 osd_op
.op
.assert_ver
.ver
= ver
;
1059 void cmpxattr(const char *name
, const bufferlist
& val
,
1061 add_xattr(CEPH_OSD_OP_CMPXATTR
, name
, val
);
1062 OSDOp
& o
= *ops
.rbegin();
1063 o
.op
.xattr
.cmp_op
= op
;
1064 o
.op
.xattr
.cmp_mode
= mode
;
// Add a CEPH_OSD_OP_ROLLBACK op and store `snapid` in its snap.snapid field.
1067 void rollback(uint64_t snapid
) {
1068 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ROLLBACK
);
1069 osd_op
.op
.snap
.snapid
= snapid
;
// Add a CEPH_OSD_OP_COPY_FROM op.
// `snapid`, `src_version`, `flags` and `src_fadvise_flags` are stored in the
// op's copy_from fields; `src` and then `src_oloc` are encoded into the op's
// indata.
// Note: inside this function `flags` names the parameter, not the
// ObjectOperation member of the same name.
1072 void copy_from(object_t src
, snapid_t snapid
, object_locator_t src_oloc
,
1073 version_t src_version
, unsigned flags
,
1074 unsigned src_fadvise_flags
) {
1075 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_FROM
);
1076 osd_op
.op
.copy_from
.snapid
= snapid
;
1077 osd_op
.op
.copy_from
.src_version
= src_version
;
1078 osd_op
.op
.copy_from
.flags
= flags
;
1079 osd_op
.op
.copy_from
.src_fadvise_flags
= src_fadvise_flags
;
1080 ::encode(src
, osd_op
.indata
);
1081 ::encode(src_oloc
, osd_op
.indata
);
1085 * writeback content to backing tier
1087 * If object is marked dirty in the cache tier, write back content
1088 * to backing tier. If the object is clean this is a no-op.
1090 * If writeback races with an update, the update will block.
1092 * use with IGNORE_CACHE to avoid triggering promote.
1094 void cache_flush() {
1095 add_op(CEPH_OSD_OP_CACHE_FLUSH
);
1099 * writeback content to backing tier
1101 * If object is marked dirty in the cache tier, write back content
1102 * to backing tier. If the object is clean this is a no-op.
1104 * If writeback races with an update, return EAGAIN. Requires that
1105 * the SKIPRWLOCKS flag be set.
1107 * use with IGNORE_CACHE to avoid triggering promote.
1109 void cache_try_flush() {
1110 add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH
);
1114 * evict object from cache tier
1116 * If object is marked clean, remove the object from the cache tier.
1117 * Otherwise, return EBUSY.
1119 * use with IGNORE_CACHE to avoid triggering promote.
1121 void cache_evict() {
1122 add_op(CEPH_OSD_OP_CACHE_EVICT
);
// Add a CEPH_OSD_OP_SET_REDIRECT op.
// `snapid` and `tgt_version` are stored in the op's copy_from.snapid and
// copy_from.src_version fields; `tgt` and then `tgt_oloc` are encoded into
// the op's indata.
// NOTE(review): the fields written belong to the copy_from member even though
// the opcode is SET_REDIRECT -- presumably the layout is shared on purpose;
// confirm against the OSD-side handler.
1128 void set_redirect(object_t tgt
, snapid_t snapid
, object_locator_t tgt_oloc
,
1129 version_t tgt_version
) {
1130 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_SET_REDIRECT
);
1131 osd_op
.op
.copy_from
.snapid
= snapid
;
1132 osd_op
.op
.copy_from
.src_version
= tgt_version
;
1133 ::encode(tgt
, osd_op
.indata
);
1134 ::encode(tgt_oloc
, osd_op
.indata
);
1137 void set_alloc_hint(uint64_t expected_object_size
,
1138 uint64_t expected_write_size
,
1140 add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT
, expected_object_size
,
1141 expected_write_size
, flags
);
1143 // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1144 // not worth a feature bit. Set FAILOK per-op flag to make
1145 // sure older osds don't trip over an unsupported opcode.
1146 set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1149 void dup(vector
<OSDOp
>& sops
) {
1151 out_bl
.resize(sops
.size());
1152 out_handler
.resize(sops
.size());
1153 out_rval
.resize(sops
.size());
1154 for (uint32_t i
= 0; i
< sops
.size(); i
++) {
1155 out_bl
[i
] = &sops
[i
].outdata
;
1156 out_handler
[i
] = NULL
;
1157 out_rval
[i
] = &sops
[i
].rval
;
1162 * Pin/unpin an object in cache tier
1165 add_op(CEPH_OSD_OP_CACHE_PIN
);
1168 void cache_unpin() {
1169 add_op(CEPH_OSD_OP_CACHE_UNPIN
);
1177 class Objecter
: public md_config_obs_t
, public Dispatcher
{
1179 // config observer bits
1180 const char** get_tracked_conf_keys() const override
;
1181 void handle_conf_change(const struct md_config_t
*conf
,
1182 const std::set
<std::string
> &changed
) override
;
1185 Messenger
*messenger
;
1188 ZTracer::Endpoint trace_endpoint
;
1192 using Dispatcher::cct
;
1193 std::multimap
<string
,string
> crush_location
;
1195 std::atomic
<bool> initialized
{false};
1198 std::atomic
<uint64_t> last_tid
{0};
1199 std::atomic
<unsigned> inflight_ops
{0};
1200 std::atomic
<int> client_inc
{-1};
1201 uint64_t max_linger_id
;
1202 std::atomic
<unsigned> num_in_flight
{0};
1203 std::atomic
<int> global_op_flags
{0}; // flags which are applied to each IO op
1204 bool keep_balanced_budget
;
1205 bool honor_osdmap_full
;
1206 bool osdmap_full_try
;
1208 // If this is true, accumulate a set of blacklisted entities
1209 // to be drained by consume_blacklist_events.
1210 bool blacklist_events_enabled
;
1211 std::set
<entity_addr_t
> blacklist_events
;
1214 void maybe_request_map();
1216 void enable_blacklist_events();
1219 void _maybe_request_map();
1221 version_t last_seen_osdmap_version
;
1222 version_t last_seen_pgmap_version
;
1224 mutable boost::shared_mutex rwlock
;
1225 using lock_guard
= std::unique_lock
<decltype(rwlock
)>;
1226 using unique_lock
= std::unique_lock
<decltype(rwlock
)>;
1227 using shared_lock
= boost::shared_lock
<decltype(rwlock
)>;
1228 using shunique_lock
= ceph::shunique_lock
<decltype(rwlock
)>;
1229 ceph::timer
<ceph::mono_clock
> timer
;
1231 PerfCounters
*logger
;
1233 uint64_t tick_event
;
1237 void update_crush_location();
1239 class RequestStateHook
;
1241 RequestStateHook
*m_request_state_hook
;
1244 /*** track pending operations ***/
1250 struct op_target_t
{
1253 epoch_t epoch
= 0; ///< latest epoch we calculated the mapping
1256 object_locator_t base_oloc
;
1257 object_t target_oid
;
1258 object_locator_t target_oloc
;
1260 ///< true if we are directed at base_pgid, not base_oid
1261 bool precalc_pgid
= false;
1263 ///< true if we have ever mapped to a valid pool
1264 bool pool_ever_existed
= false;
1266 ///< explcit pg target, if any
1269 pg_t pgid
; ///< last (raw) pg we mapped to
1270 spg_t actual_pgid
; ///< last (actual) spg_t we mapped to
1271 unsigned pg_num
= 0; ///< last pg_num we mapped to
1272 unsigned pg_num_mask
= 0; ///< last pg_num_mask we mapped to
1273 vector
<int> up
; ///< set of up osds for last pg we mapped to
1274 vector
<int> acting
; ///< set of acting osds for last pg we mapped to
1275 int up_primary
= -1; ///< last up_primary we mapped to
1276 int acting_primary
= -1; ///< last acting_primary we mapped to
1277 int size
= -1; ///< the size of the pool when were were last mapped
1278 int min_size
= -1; ///< the min size of the pool when were were last mapped
1279 bool sort_bitwise
= false; ///< whether the hobject_t sort order is bitwise
1280 bool recovery_deletes
= false; ///< whether the deletes are performed during recovery instead of peering
1282 bool used_replica
= false;
1283 bool paused
= false;
1285 int osd
= -1; ///< the final target osd, or -1
1287 epoch_t last_force_resend
= 0;
1289 op_target_t(object_t oid
, object_locator_t oloc
, int flags
)
1295 op_target_t(pg_t pgid
)
1296 : base_oloc(pgid
.pool(), pgid
.ps()),
1301 op_target_t() = default;
1303 hobject_t
get_hobj() {
1304 return hobject_t(target_oid
,
1307 target_oloc
.hash
>= 0 ? target_oloc
.hash
: pgid
.ps(),
1309 target_oloc
.nspace
);
// Return true when this target's hobject (from get_hobj()) lies in the
// half-open range [begin, end): it either compares equal to `begin`
// (cmp() == 0), or compares greater than `begin` and is < `end`.
1312 bool contained_by(const hobject_t
& begin
, const hobject_t
& end
) {
1313 hobject_t h
= get_hobj();
1314 int r
= cmp(h
, begin
);
1315 return r
== 0 || (r
> 0 && h
< end
);
1318 void dump(Formatter
*f
) const;
1321 struct Op
: public RefCountedObject
{
1322 OSDSession
*session
;
1327 ConnectionRef con
; // for rx buffer only
1328 uint64_t features
; // explicitly specified op features
1334 ceph::real_time mtime
;
1337 vector
<bufferlist
*> out_bl
;
1338 vector
<Context
*> out_handler
;
1339 vector
<int*> out_rval
;
1349 epoch_t
*reply_epoch
;
1351 ceph::mono_time stamp
;
1353 epoch_t map_dne_bound
;
1357 /// true if we should resend this message on failure
1360 /// true if the throttle budget is acquired and released once for a
1361 /// series of ops rather than per op; when this flag is set, the budget
1362 /// is acquired before sending the very first op of the series and
1363 /// released upon receiving the last op reply.
1368 osd_reqid_t reqid
; // explicitly setting reqid
1369 ZTracer::Trace trace
;
1371 Op(const object_t
& o
, const object_locator_t
& ol
, vector
<OSDOp
>& op
,
1372 int f
, Context
*fin
, version_t
*ov
, int *offset
= NULL
,
1373 ZTracer::Trace
*parent_trace
= nullptr) :
1374 session(NULL
), incarnation(0),
1377 features(CEPH_FEATURES_SUPPORTED_DEFAULT
),
1378 snapid(CEPH_NOSNAP
),
1389 should_resend(true),
1390 ctx_budgeted(false),
1391 data_offset(offset
) {
1394 /* initialize out_* to match op vector */
1395 out_bl
.resize(ops
.size());
1396 out_rval
.resize(ops
.size());
1397 out_handler
.resize(ops
.size());
1398 for (unsigned i
= 0; i
< ops
.size(); i
++) {
1400 out_handler
[i
] = NULL
;
1404 if (target
.base_oloc
.key
== o
)
1405 target
.base_oloc
.key
.clear();
1407 if (parent_trace
&& parent_trace
->valid()) {
1408 trace
.init("op", nullptr, parent_trace
);
1409 trace
.event("start");
1413 bool operator<(const Op
& other
) const {
1414 return tid
< other
.tid
;
1417 bool respects_full() const {
1419 (target
.flags
& (CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_RWORDERED
)) &&
1420 !(target
.flags
& (CEPH_OSD_FLAG_FULL_TRY
| CEPH_OSD_FLAG_FULL_FORCE
));
1425 while (!out_handler
.empty()) {
1426 delete out_handler
.back();
1427 out_handler
.pop_back();
1429 trace
.event("finish");
1433 struct C_Op_Map_Latest
: public Context
{
1437 C_Op_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1439 void finish(int r
) override
;
1442 struct C_Command_Map_Latest
: public Context
{
1446 C_Command_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1448 void finish(int r
) override
;
1451 struct C_Stat
: public Context
{
1454 ceph::real_time
*pmtime
;
1456 C_Stat(uint64_t *ps
, ceph::real_time
*pm
, Context
*c
) :
1457 psize(ps
), pmtime(pm
), fin(c
) {}
1458 void finish(int r
) override
{
1460 bufferlist::iterator p
= bl
.begin();
1474 struct C_GetAttrs
: public Context
{
1476 map
<string
,bufferlist
>& attrset
;
1478 C_GetAttrs(map
<string
, bufferlist
>& set
, Context
*c
) : attrset(set
),
1480 void finish(int r
) override
{
1482 bufferlist::iterator p
= bl
.begin();
1483 ::decode(attrset
, p
);
1490 // Pools and statistics
1491 struct NListContext
{
1492 collection_list_handle_t pos
;
1494 // these are for !sortbitwise compat only
1496 int starting_pg_num
= 0;
1497 bool sort_bitwise
= false;
1499 bool at_end_of_pool
= false; ///< publicly visible end flag
1501 int64_t pool_id
= -1;
1502 int pool_snap_seq
= 0;
1503 uint64_t max_entries
= 0;
1506 bufferlist bl
; // raw data read to here
1507 std::list
<librados::ListObjectImpl
> list
;
1511 bufferlist extra_info
;
1513 // The budget associated with this context. Once it is set (>= 0),
1514 // the budget is no longer acquired/released per op; instead it
1515 // is acquired before sending the first op and released upon receiving
1516 // the last op reply.
1517 int ctx_budget
= -1;
1519 bool at_end() const {
1520 return at_end_of_pool
;
1523 uint32_t get_pg_hash_position() const {
1524 return pos
.get_hash();
1528 struct C_NList
: public Context
{
1529 NListContext
*list_context
;
1530 Context
*final_finish
;
1533 C_NList(NListContext
*lc
, Context
* finish
, Objecter
*ob
) :
1534 list_context(lc
), final_finish(finish
), objecter(ob
), epoch(0) {}
1535 void finish(int r
) override
{
1537 objecter
->_nlist_reply(list_context
, r
, final_finish
, epoch
);
1539 final_finish
->complete(r
);
1548 map
<string
,pool_stat_t
> *pool_stats
;
1552 ceph::mono_time last_submit
;
1557 struct ceph_statfs
*stats
;
1561 ceph::mono_time last_submit
;
1576 ceph::mono_time last_submit
;
1577 PoolOp() : tid(0), pool(0), onfinish(NULL
), ontimeout(0), pool_op(0),
1578 auid(0), crush_rule(0), snapid(0), blp(NULL
) {}
1581 // -- osd commands --
1582 struct CommandOp
: public RefCountedObject
{
1583 OSDSession
*session
= nullptr;
1587 bufferlist
*poutbl
= nullptr;
1588 string
*prs
= nullptr;
1590 // target_osd == -1 means target_pg is valid
1591 const int target_osd
= -1;
1592 const pg_t target_pg
;
1596 epoch_t map_dne_bound
= 0;
1597 int map_check_error
= 0; // error to return if map check fails
1598 const char *map_check_error_str
= nullptr;
1600 Context
*onfinish
= nullptr;
1601 uint64_t ontimeout
= 0;
1602 ceph::mono_time last_submit
;
1606 const vector
<string
> &cmd
,
1615 target_osd(target_osd
),
1616 onfinish(onfinish
) {}
1620 const vector
<string
> &cmd
,
1631 onfinish(onfinish
) {}
1635 void submit_command(CommandOp
*c
, ceph_tid_t
*ptid
);
1636 int _calc_command_target(CommandOp
*c
, shunique_lock
&sul
);
1637 void _assign_command_session(CommandOp
*c
, shunique_lock
&sul
);
1638 void _send_command(CommandOp
*c
);
1639 int command_op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
1640 void _finish_command(CommandOp
*c
, int r
, string rs
);
1641 void handle_command_reply(MCommandReply
*m
);
1644 // -- lingering ops --
1646 struct WatchContext
{
1647 // this simply mirrors librados WatchCtx2
1648 virtual void handle_notify(uint64_t notify_id
,
1650 uint64_t notifier_id
,
1651 bufferlist
& bl
) = 0;
1652 virtual void handle_error(uint64_t cookie
, int err
) = 0;
1653 virtual ~WatchContext() {}
1656 struct LingerOp
: public RefCountedObject
{
1663 ceph::real_time mtime
;
1671 ceph::mono_time watch_valid_thru
; ///< send time for last acked ping
1672 int last_error
; ///< error from last failed ping|reconnect, if any
1673 boost::shared_mutex watch_lock
;
1674 using lock_guard
= std::unique_lock
<decltype(watch_lock
)>;
1675 using unique_lock
= std::unique_lock
<decltype(watch_lock
)>;
1676 using shared_lock
= boost::shared_lock
<decltype(watch_lock
)>;
1677 using shunique_lock
= ceph::shunique_lock
<decltype(watch_lock
)>;
1679 // queue of pending async operations, with the timestamp of
1680 // when they were queued.
1681 list
<ceph::mono_time
> watch_pending_async
;
1683 uint32_t register_gen
;
1686 Context
*on_reg_commit
;
1688 // we trigger these from an async finisher
1689 Context
*on_notify_finish
;
1690 bufferlist
*notify_result_bl
;
1693 WatchContext
*watch_context
;
1695 OSDSession
*session
;
1697 ceph_tid_t register_tid
;
1698 ceph_tid_t ping_tid
;
1699 epoch_t map_dne_bound
;
1701 void _queued_async() {
1702 // watch_lock must be locked unique
1703 watch_pending_async
.push_back(ceph::mono_clock::now());
1705 void finished_async() {
1706 unique_lock
l(watch_lock
);
1707 assert(!watch_pending_async
.empty());
1708 watch_pending_async
.pop_front();
1711 LingerOp() : linger_id(0),
1712 target(object_t(), object_locator_t(), 0),
1713 snap(CEPH_NOSNAP
), poutbl(NULL
), pobjver(NULL
),
1714 is_watch(false), last_error(0),
1718 on_reg_commit(NULL
),
1719 on_notify_finish(NULL
),
1720 notify_result_bl(NULL
),
1722 watch_context(NULL
),
1729 const LingerOp
&operator=(const LingerOp
& r
);
1730 LingerOp(const LingerOp
& o
);
1732 uint64_t get_cookie() {
1733 return reinterpret_cast<uint64_t>(this);
1737 ~LingerOp() override
{
1738 delete watch_context
;
1742 struct C_Linger_Commit
: public Context
{
1745 bufferlist outbl
; // used for notify only
1746 C_Linger_Commit(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1749 ~C_Linger_Commit() override
{
1752 void finish(int r
) override
{
1753 objecter
->_linger_commit(info
, r
, outbl
);
1757 struct C_Linger_Reconnect
: public Context
{
1760 C_Linger_Reconnect(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1763 ~C_Linger_Reconnect() override
{
1766 void finish(int r
) override
{
1767 objecter
->_linger_reconnect(info
, r
);
1771 struct C_Linger_Ping
: public Context
{
1774 ceph::mono_time sent
;
1775 uint32_t register_gen
;
1776 C_Linger_Ping(Objecter
*o
, LingerOp
*l
)
1777 : objecter(o
), info(l
), register_gen(info
->register_gen
) {
1780 ~C_Linger_Ping() override
{
1783 void finish(int r
) override
{
1784 objecter
->_linger_ping(info
, r
, sent
, register_gen
);
1788 struct C_Linger_Map_Latest
: public Context
{
1792 C_Linger_Map_Latest(Objecter
*o
, uint64_t id
) :
1793 objecter(o
), linger_id(id
), latest(0) {}
1794 void finish(int r
) override
;
1797 // -- osd sessions --
1801 hobject_t begin
, end
;
1804 struct OSDSession
: public RefCountedObject
{
1805 boost::shared_mutex lock
;
1806 using lock_guard
= std::lock_guard
<decltype(lock
)>;
1807 using unique_lock
= std::unique_lock
<decltype(lock
)>;
1808 using shared_lock
= boost::shared_lock
<decltype(lock
)>;
1809 using shunique_lock
= ceph::shunique_lock
<decltype(lock
)>;
1812 map
<ceph_tid_t
,Op
*> ops
;
1813 map
<uint64_t, LingerOp
*> linger_ops
;
1814 map
<ceph_tid_t
,CommandOp
*> command_ops
;
1817 map
<spg_t
,map
<hobject_t
,OSDBackoff
>> backoffs
;
1818 map
<uint64_t,OSDBackoff
*> backoffs_by_id
;
1824 std::unique_ptr
<std::mutex
[]> completion_locks
;
1825 using unique_completion_lock
= std::unique_lock
<
1826 decltype(completion_locks
)::element_type
>;
1829 OSDSession(CephContext
*cct
, int o
) :
1830 osd(o
), incarnation(0), con(NULL
),
1831 num_locks(cct
->_conf
->objecter_completion_locks_per_session
),
1832 completion_locks(new std::mutex
[num_locks
]) {}
1834 ~OSDSession() override
;
// True when this session's osd id is -1 (the Objecter's homeless_session
// is constructed with osd == -1).
1836 bool is_homeless() { return (osd
== -1); }
1838 unique_completion_lock
get_lock(object_t
& oid
);
1840 map
<int,OSDSession
*> osd_sessions
;
1842 bool osdmap_full_flag() const;
1843 bool osdmap_pool_full(const int64_t pool_id
) const;
1848 * Test pg_pool_t::FLAG_FULL on a pool
1850 * @return true if the pool exists and has the flag set, or
1851 * the global full flag is set, else false
1853 bool _osdmap_pool_full(const int64_t pool_id
) const;
1854 bool _osdmap_pool_full(const pg_pool_t
&p
) const;
1855 void update_pool_full_map(map
<int64_t, bool>& pool_full_map
);
1857 map
<uint64_t, LingerOp
*> linger_ops
;
1858 // we use this just to confirm a cookie is valid before dereferencing the ptr
1859 set
<LingerOp
*> linger_ops_set
;
1861 map
<ceph_tid_t
,PoolStatOp
*> poolstat_ops
;
1862 map
<ceph_tid_t
,StatfsOp
*> statfs_ops
;
1863 map
<ceph_tid_t
,PoolOp
*> pool_ops
;
1864 std::atomic
<unsigned> num_homeless_ops
{0};
1866 OSDSession
*homeless_session
;
1868 // ops waiting for an osdmap with a new pool or confirmation that
1869 // the pool does not exist (may be expanded to other uses later)
1870 map
<uint64_t, LingerOp
*> check_latest_map_lingers
;
1871 map
<ceph_tid_t
, Op
*> check_latest_map_ops
;
1872 map
<ceph_tid_t
, CommandOp
*> check_latest_map_commands
;
1874 map
<epoch_t
,list
< pair
<Context
*, int> > > waiting_for_map
;
1876 ceph::timespan mon_timeout
;
1877 ceph::timespan osd_timeout
;
1879 MOSDOp
*_prepare_osd_op(Op
*op
);
1880 void _send_op(Op
*op
, MOSDOp
*m
= NULL
);
1881 void _send_op_account(Op
*op
);
1882 void _cancel_linger_op(Op
*op
);
1883 void finish_op(OSDSession
*session
, ceph_tid_t tid
);
1884 void _finish_op(Op
*op
, int r
);
1885 static bool is_pg_changed(
1887 const vector
<int>& oldacting
,
1889 const vector
<int>& newacting
,
1890 bool any_change
=false);
1891 enum recalc_op_target_result
{
1892 RECALC_OP_TARGET_NO_ACTION
= 0,
1893 RECALC_OP_TARGET_NEED_RESEND
,
1894 RECALC_OP_TARGET_POOL_DNE
,
1895 RECALC_OP_TARGET_OSD_DNE
,
1896 RECALC_OP_TARGET_OSD_DOWN
,
1898 bool _osdmap_full_flag() const;
1899 bool _osdmap_has_pool_full() const;
1901 bool target_should_be_paused(op_target_t
*op
);
1902 int _calc_target(op_target_t
*t
, Connection
*con
,
1903 bool any_change
= false);
1904 int _map_session(op_target_t
*op
, OSDSession
**s
,
1907 void _session_op_assign(OSDSession
*s
, Op
*op
);
1908 void _session_op_remove(OSDSession
*s
, Op
*op
);
1909 void _session_linger_op_assign(OSDSession
*to
, LingerOp
*op
);
1910 void _session_linger_op_remove(OSDSession
*from
, LingerOp
*op
);
1911 void _session_command_op_assign(OSDSession
*to
, CommandOp
*op
);
1912 void _session_command_op_remove(OSDSession
*from
, CommandOp
*op
);
1914 int _assign_op_target_session(Op
*op
, shunique_lock
& lc
,
1915 bool src_session_locked
,
1916 bool dst_session_locked
);
1917 int _recalc_linger_op_target(LingerOp
*op
, shunique_lock
& lc
);
1919 void _linger_submit(LingerOp
*info
, shunique_lock
& sul
);
1920 void _send_linger(LingerOp
*info
, shunique_lock
& sul
);
1921 void _linger_commit(LingerOp
*info
, int r
, bufferlist
& outbl
);
1922 void _linger_reconnect(LingerOp
*info
, int r
);
1923 void _send_linger_ping(LingerOp
*info
);
1924 void _linger_ping(LingerOp
*info
, int r
, ceph::mono_time sent
,
1925 uint32_t register_gen
);
1926 int _normalize_watch_error(int r
);
1928 friend class C_DoWatchError
;
1930 void linger_callback_flush(Context
*ctx
) {
1931 finisher
->queue(ctx
);
1935 void _check_op_pool_dne(Op
*op
, unique_lock
*sl
);
1936 void _send_op_map_check(Op
*op
);
1937 void _op_cancel_map_check(Op
*op
);
1938 void _check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
);
1939 void _send_linger_map_check(LingerOp
*op
);
1940 void _linger_cancel_map_check(LingerOp
*op
);
1941 void _check_command_map_dne(CommandOp
*op
);
1942 void _send_command_map_check(CommandOp
*op
);
1943 void _command_cancel_map_check(CommandOp
*op
);
1945 void kick_requests(OSDSession
*session
);
1946 void _kick_requests(OSDSession
*session
, map
<uint64_t, LingerOp
*>& lresend
);
1947 void _linger_ops_resend(map
<uint64_t, LingerOp
*>& lresend
, unique_lock
& ul
);
1949 int _get_session(int osd
, OSDSession
**session
, shunique_lock
& sul
);
1950 void put_session(OSDSession
*s
);
1951 void get_session(OSDSession
*s
);
1952 void _reopen_session(OSDSession
*session
);
1953 void close_session(OSDSession
*session
);
1955 void _nlist_reply(NListContext
*list_context
, int r
, Context
*final_finish
,
1956 epoch_t reply_epoch
);
1958 void resend_mon_ops();
1961 * handle a budget for in-flight ops
1962 * budget is taken whenever an op goes into the ops map
1963 * and returned whenever an op is removed from the map
1964 * If throttle_op needs to throttle it will unlock client_lock.
1966 int calc_op_budget(Op
*op
);
1967 void _throttle_op(Op
*op
, shunique_lock
& sul
, int op_size
= 0);
1968 int _take_op_budget(Op
*op
, shunique_lock
& sul
) {
1969 assert(sul
&& sul
.mutex() == &rwlock
);
1970 int op_budget
= calc_op_budget(op
);
1971 if (keep_balanced_budget
) {
1972 _throttle_op(op
, sul
, op_budget
);
1974 op_throttle_bytes
.take(op_budget
);
1975 op_throttle_ops
.take(1);
1977 op
->budgeted
= true;
1980 void put_op_budget_bytes(int op_budget
) {
1981 assert(op_budget
>= 0);
1982 op_throttle_bytes
.put(op_budget
);
1983 op_throttle_ops
.put(1);
1985 void put_op_budget(Op
*op
) {
1986 assert(op
->budgeted
);
1987 int op_budget
= calc_op_budget(op
);
1988 put_op_budget_bytes(op_budget
);
1990 void put_nlist_context_budget(NListContext
*list_context
);
1991 Throttle op_throttle_bytes
, op_throttle_ops
;
1994 Objecter(CephContext
*cct_
, Messenger
*m
, MonClient
*mc
,
1997 double osd_timeout
) :
1998 Dispatcher(cct_
), messenger(m
), monc(mc
), finisher(fin
),
1999 trace_endpoint("0.0.0.0", 0, "Objecter"),
2002 keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
2003 blacklist_events_enabled(false),
2004 last_seen_osdmap_version(0), last_seen_pgmap_version(0),
2005 logger(NULL
), tick_event(0), m_request_state_hook(NULL
),
2006 homeless_session(new OSDSession(cct
, -1)),
2007 mon_timeout(ceph::make_timespan(mon_timeout
)),
2008 osd_timeout(ceph::make_timespan(osd_timeout
)),
2009 op_throttle_bytes(cct
, "objecter_bytes",
2010 cct
->_conf
->objecter_inflight_op_bytes
),
2011 op_throttle_ops(cct
, "objecter_ops", cct
->_conf
->objecter_inflight_ops
),
2013 retry_writes_after_first_reply(cct
->_conf
->objecter_retry_writes_after_first_reply
)
2015 ~Objecter() override
;
2018 void start(const OSDMap
*o
= nullptr);
2021 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2022 // whatever functionality you want to use the OSDMap in a lambda like:
2024 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2028 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2030 // Do not call into something that will try to lock the OSDMap from
2031 // here or you will have great woe and misery.
2033 template<typename Callback
, typename
...Args
>
2034 auto with_osdmap(Callback
&& cb
, Args
&&... args
) const ->
2035 decltype(cb(*osdmap
, std::forward
<Args
>(args
)...)) {
2036 shared_lock
l(rwlock
);
2037 return std::forward
<Callback
>(cb
)(*osdmap
, std::forward
<Args
>(args
)...);
2042 * Tell the objecter to throttle outgoing ops according to its
2043 * budget (in _conf). If you do this, ops can block, in
2044 * which case it will unlock client_lock and sleep until
2045 * incoming messages reduce the used budget low enough for
2046 * the ops to continue going; then it will lock client_lock again.
2048 void set_balanced_budget() { keep_balanced_budget
= true; }
2049 void unset_balanced_budget() { keep_balanced_budget
= false; }
2051 void set_honor_osdmap_full() { honor_osdmap_full
= true; }
2052 void unset_honor_osdmap_full() { honor_osdmap_full
= false; }
2054 void set_osdmap_full_try() { osdmap_full_try
= true; }
2055 void unset_osdmap_full_try() { osdmap_full_try
= false; }
2057 void _scan_requests(OSDSession
*s
,
2060 map
<int64_t, bool> *pool_full_map
,
2061 map
<ceph_tid_t
, Op
*>& need_resend
,
2062 list
<LingerOp
*>& need_resend_linger
,
2063 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
2064 shunique_lock
& sul
);
2066 int64_t get_object_hash_position(int64_t pool
, const string
& key
,
2068 int64_t get_object_pg_hash_position(int64_t pool
, const string
& key
,
2073 bool ms_dispatch(Message
*m
) override
;
2074 bool ms_can_fast_dispatch_any() const override
{
2077 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2078 switch (m
->get_type()) {
2079 case CEPH_MSG_OSD_OPREPLY
:
2080 case CEPH_MSG_WATCH_NOTIFY
:
2086 void ms_fast_dispatch(Message
*m
) override
{
2087 if (!ms_dispatch(m
)) {
2092 void handle_osd_op_reply(class MOSDOpReply
*m
);
2093 void handle_osd_backoff(class MOSDBackoff
*m
);
2094 void handle_watch_notify(class MWatchNotify
*m
);
2095 void handle_osd_map(class MOSDMap
*m
);
2096 void wait_for_osd_map();
2099 * Get list of entities blacklisted since this was last called,
2100 * and reset the list.
2102 * Uses a std::set because typical use case is to compare some
2103 * other list of clients to see which overlap with the blacklisted
2107 void consume_blacklist_events(std::set
<entity_addr_t
> *events
);
2109 int pool_snap_by_name(int64_t poolid
,
2110 const char *snap_name
,
2111 snapid_t
*snap
) const;
2112 int pool_snap_get_info(int64_t poolid
, snapid_t snap
,
2113 pool_snap_info_t
*info
) const;
2114 int pool_snap_list(int64_t poolid
, vector
<uint64_t> *snaps
);
2117 void emit_blacklist_events(const OSDMap::Incremental
&inc
);
2118 void emit_blacklist_events(const OSDMap
&old_osd_map
,
2119 const OSDMap
&new_osd_map
);
2122 void _op_submit(Op
*op
, shunique_lock
& lc
, ceph_tid_t
*ptid
);
2123 void _op_submit_with_budget(Op
*op
, shunique_lock
& lc
,
2125 int *ctx_budget
= NULL
);
2126 inline void unregister_op(Op
*op
);
2130 void op_submit(Op
*op
, ceph_tid_t
*ptid
= NULL
, int *ctx_budget
= NULL
);
2132 shared_lock
l(rwlock
);
2133 return !((!inflight_ops
) && linger_ops
.empty() &&
2134 poolstat_ops
.empty() && statfs_ops
.empty());
2138 * Output in-flight requests
2140 void _dump_active(OSDSession
*s
);
2141 void _dump_active();
2143 void dump_requests(Formatter
*fmt
);
2144 void _dump_ops(const OSDSession
*s
, Formatter
*fmt
);
2145 void dump_ops(Formatter
*fmt
);
2146 void _dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
);
2147 void dump_linger_ops(Formatter
*fmt
);
2148 void _dump_command_ops(const OSDSession
*s
, Formatter
*fmt
);
2149 void dump_command_ops(Formatter
*fmt
);
2150 void dump_pool_ops(Formatter
*fmt
) const;
2151 void dump_pool_stat_ops(Formatter
*fmt
) const;
2152 void dump_statfs_ops(Formatter
*fmt
) const;
2154 int get_client_incarnation() const { return client_inc
; }
2155 void set_client_incarnation(int inc
) { client_inc
= inc
; }
2157 bool have_map(epoch_t epoch
);
2158 /// wait for epoch; true if we already have it
2159 bool wait_for_map(epoch_t epoch
, Context
*c
, int err
=0);
2160 void _wait_for_new_map(Context
*c
, epoch_t epoch
, int err
=0);
2161 void wait_for_latest_osdmap(Context
*fin
);
2162 void get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2163 void _get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2165 /** Get the current set of global op flags */
2166 int get_global_op_flags() const { return global_op_flags
; }
2167 /** Add a flag to the global op flags, not really atomic operation */
2168 void add_global_op_flags(int flag
) {
2169 global_op_flags
.fetch_or(flag
);
2171 /** Clear the passed flags from the global op flag set */
2172 void clear_global_op_flag(int flags
) {
2173 global_op_flags
.fetch_and(~flags
);
2176 /// cancel an in-progress request with the given return code
2178 int op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
2179 int _op_cancel(ceph_tid_t tid
, int r
);
2181 int op_cancel(ceph_tid_t tid
, int r
);
2184 * Any write op which is in progress at the start of this call shall no
2185 * longer be in progress when this call ends. Operations started after the
2186 * start of this call may still be in progress when this call ends.
2188 * @return the latest possible epoch in which a cancelled op could have
2189 * existed, or -1 if nothing was cancelled.
2191 epoch_t
op_cancel_writes(int r
, int64_t pool
=-1);
2194 void osd_command(int osd
, const std::vector
<string
>& cmd
,
2195 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2196 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2198 CommandOp
*c
= new CommandOp(
2205 submit_command(c
, ptid
);
2207 void pg_command(pg_t pgid
, const vector
<string
>& cmd
,
2208 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2209 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2210 CommandOp
*c
= new CommandOp(
2217 submit_command(c
, ptid
);
2220 // mid-level helpers
2221 Op
*prepare_mutate_op(
2222 const object_t
& oid
, const object_locator_t
& oloc
,
2223 ObjectOperation
& op
, const SnapContext
& snapc
,
2224 ceph::real_time mtime
, int flags
,
2225 Context
*oncommit
, version_t
*objver
= NULL
,
2226 osd_reqid_t reqid
= osd_reqid_t(),
2227 ZTracer::Trace
*parent_trace
= nullptr) {
2228 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2229 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
, nullptr, parent_trace
);
2230 o
->priority
= op
.priority
;
2233 o
->out_rval
.swap(op
.out_rval
);
2238 const object_t
& oid
, const object_locator_t
& oloc
,
2239 ObjectOperation
& op
, const SnapContext
& snapc
,
2240 ceph::real_time mtime
, int flags
,
2241 Context
*oncommit
, version_t
*objver
= NULL
,
2242 osd_reqid_t reqid
= osd_reqid_t()) {
2243 Op
*o
= prepare_mutate_op(oid
, oloc
, op
, snapc
, mtime
, flags
,
2244 oncommit
, objver
, reqid
);
2249 Op
*prepare_read_op(
2250 const object_t
& oid
, const object_locator_t
& oloc
,
2251 ObjectOperation
& op
,
2252 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2253 Context
*onack
, version_t
*objver
= NULL
,
2254 int *data_offset
= NULL
,
2255 uint64_t features
= 0,
2256 ZTracer::Trace
*parent_trace
= nullptr) {
2257 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2258 CEPH_OSD_FLAG_READ
, onack
, objver
, data_offset
, parent_trace
);
2259 o
->priority
= op
.priority
;
2262 if (!o
->outbl
&& op
.size() == 1 && op
.out_bl
[0]->length())
2263 o
->outbl
= op
.out_bl
[0];
2264 o
->out_bl
.swap(op
.out_bl
);
2265 o
->out_handler
.swap(op
.out_handler
);
2266 o
->out_rval
.swap(op
.out_rval
);
2270 const object_t
& oid
, const object_locator_t
& oloc
,
2271 ObjectOperation
& op
,
2272 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2273 Context
*onack
, version_t
*objver
= NULL
,
2274 int *data_offset
= NULL
,
2275 uint64_t features
= 0) {
2276 Op
*o
= prepare_read_op(oid
, oloc
, op
, snapid
, pbl
, flags
, onack
, objver
,
2279 o
->features
= features
;
2284 Op
*prepare_pg_read_op(
2285 uint32_t hash
, object_locator_t oloc
,
2286 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2287 Context
*onack
, epoch_t
*reply_epoch
,
2289 Op
*o
= new Op(object_t(), oloc
,
2291 flags
| global_op_flags
| CEPH_OSD_FLAG_READ
|
2292 CEPH_OSD_FLAG_IGNORE_OVERLAY
,
2294 o
->target
.precalc_pgid
= true;
2295 o
->target
.base_pgid
= pg_t(hash
, oloc
.pool
);
2296 o
->priority
= op
.priority
;
2297 o
->snapid
= CEPH_NOSNAP
;
2299 o
->out_bl
.swap(op
.out_bl
);
2300 o
->out_handler
.swap(op
.out_handler
);
2301 o
->out_rval
.swap(op
.out_rval
);
2302 o
->reply_epoch
= reply_epoch
;
2304 // budget is tracked by listing context
2305 o
->ctx_budgeted
= true;
2310 uint32_t hash
, object_locator_t oloc
,
2311 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2312 Context
*onack
, epoch_t
*reply_epoch
,
2314 Op
*o
= prepare_pg_read_op(hash
, oloc
, op
, pbl
, flags
,
2315 onack
, reply_epoch
, ctx_budget
);
2317 op_submit(o
, &tid
, ctx_budget
);
2321 // caller owns a ref
2322 LingerOp
*linger_register(const object_t
& oid
, const object_locator_t
& oloc
,
2324 ceph_tid_t
linger_watch(LingerOp
*info
,
2325 ObjectOperation
& op
,
2326 const SnapContext
& snapc
, ceph::real_time mtime
,
2330 ceph_tid_t
linger_notify(LingerOp
*info
,
2331 ObjectOperation
& op
,
2332 snapid_t snap
, bufferlist
& inbl
,
2336 int linger_check(LingerOp
*info
);
2337 void linger_cancel(LingerOp
*info
); // releases a reference
2338 void _linger_cancel(LingerOp
*info
);
2340 void _do_watch_notify(LingerOp
*info
, MWatchNotify
*m
);
2343 * set up initial ops in the op vector, and allocate a final op slot.
2345 * The caller is responsible for filling in the final ops_count ops.
2347 * @param ops op vector
2348 * @param ops_count number of final ops the caller will fill in
2349 * @param extra_ops pointer to [array of] initial op[s]
2350 * @return index of final op (for caller to fill in)
2352 int init_ops(vector
<OSDOp
>& ops
, int ops_count
, ObjectOperation
*extra_ops
) {
2357 extra
= extra_ops
->ops
.size();
2359 ops
.resize(ops_count
+ extra
);
2361 for (i
=0; i
<extra
; i
++) {
2362 ops
[i
] = extra_ops
->ops
[i
];
2369 // high-level helpers
2370 Op
*prepare_stat_op(
2371 const object_t
& oid
, const object_locator_t
& oloc
,
2372 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2373 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2374 ObjectOperation
*extra_ops
= NULL
) {
2376 int i
= init_ops(ops
, 1, extra_ops
);
2377 ops
[i
].op
.op
= CEPH_OSD_OP_STAT
;
2378 C_Stat
*fin
= new C_Stat(psize
, pmtime
, onfinish
);
2379 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2380 CEPH_OSD_FLAG_READ
, fin
, objver
);
2382 o
->outbl
= &fin
->bl
;
2386 const object_t
& oid
, const object_locator_t
& oloc
,
2387 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2388 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2389 ObjectOperation
*extra_ops
= NULL
) {
2390 Op
*o
= prepare_stat_op(oid
, oloc
, snap
, psize
, pmtime
, flags
,
2391 onfinish
, objver
, extra_ops
);
2397 Op
*prepare_read_op(
2398 const object_t
& oid
, const object_locator_t
& oloc
,
2399 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2400 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2401 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2402 ZTracer::Trace
*parent_trace
= nullptr) {
2404 int i
= init_ops(ops
, 1, extra_ops
);
2405 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2406 ops
[i
].op
.extent
.offset
= off
;
2407 ops
[i
].op
.extent
.length
= len
;
2408 ops
[i
].op
.extent
.truncate_size
= 0;
2409 ops
[i
].op
.extent
.truncate_seq
= 0;
2410 ops
[i
].op
.flags
= op_flags
;
2411 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2412 CEPH_OSD_FLAG_READ
, onfinish
, objver
, nullptr, parent_trace
);
2418 const object_t
& oid
, const object_locator_t
& oloc
,
2419 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2420 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2421 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2422 Op
*o
= prepare_read_op(oid
, oloc
, off
, len
, snap
, pbl
, flags
,
2423 onfinish
, objver
, extra_ops
, op_flags
);
2429 Op
*prepare_cmpext_op(
2430 const object_t
& oid
, const object_locator_t
& oloc
,
2431 uint64_t off
, bufferlist
&cmp_bl
,
2432 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2433 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2435 int i
= init_ops(ops
, 1, extra_ops
);
2436 ops
[i
].op
.op
= CEPH_OSD_OP_CMPEXT
;
2437 ops
[i
].op
.extent
.offset
= off
;
2438 ops
[i
].op
.extent
.length
= cmp_bl
.length();
2439 ops
[i
].op
.extent
.truncate_size
= 0;
2440 ops
[i
].op
.extent
.truncate_seq
= 0;
2441 ops
[i
].indata
= cmp_bl
;
2442 ops
[i
].op
.flags
= op_flags
;
2443 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2444 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2450 const object_t
& oid
, const object_locator_t
& oloc
,
2451 uint64_t off
, bufferlist
&cmp_bl
,
2452 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2453 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2454 Op
*o
= prepare_cmpext_op(oid
, oloc
, off
, cmp_bl
, snap
,
2455 flags
, onfinish
, objver
, extra_ops
, op_flags
);
2461 ceph_tid_t
read_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2462 uint64_t off
, uint64_t len
, snapid_t snap
,
2463 bufferlist
*pbl
, int flags
, uint64_t trunc_size
,
2464 __u32 trunc_seq
, Context
*onfinish
,
2465 version_t
*objver
= NULL
,
2466 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2468 int i
= init_ops(ops
, 1, extra_ops
);
2469 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2470 ops
[i
].op
.extent
.offset
= off
;
2471 ops
[i
].op
.extent
.length
= len
;
2472 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2473 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2474 ops
[i
].op
.flags
= op_flags
;
2475 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2476 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2483 ceph_tid_t
mapext(const object_t
& oid
, const object_locator_t
& oloc
,
2484 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2485 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2486 ObjectOperation
*extra_ops
= NULL
) {
2488 int i
= init_ops(ops
, 1, extra_ops
);
2489 ops
[i
].op
.op
= CEPH_OSD_OP_MAPEXT
;
2490 ops
[i
].op
.extent
.offset
= off
;
2491 ops
[i
].op
.extent
.length
= len
;
2492 ops
[i
].op
.extent
.truncate_size
= 0;
2493 ops
[i
].op
.extent
.truncate_seq
= 0;
2494 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2495 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2502 ceph_tid_t
getxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2503 const char *name
, snapid_t snap
, bufferlist
*pbl
, int flags
,
2505 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2507 int i
= init_ops(ops
, 1, extra_ops
);
2508 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTR
;
2509 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2510 ops
[i
].op
.xattr
.value_len
= 0;
2512 ops
[i
].indata
.append(name
);
2513 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2514 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2522 ceph_tid_t
getxattrs(const object_t
& oid
, const object_locator_t
& oloc
,
2523 snapid_t snap
, map
<string
,bufferlist
>& attrset
,
2524 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2525 ObjectOperation
*extra_ops
= NULL
) {
2527 int i
= init_ops(ops
, 1, extra_ops
);
2528 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTRS
;
2529 C_GetAttrs
*fin
= new C_GetAttrs(attrset
, onfinish
);
2530 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2531 CEPH_OSD_FLAG_READ
, fin
, objver
);
2533 o
->outbl
= &fin
->bl
;
// read_full: convenience wrapper that forwards to read() with offset 0 and
// length 0, returning whatever read() returns.
// Presumably a zero length asks for the whole object -- confirm in read().
//
// The flags passed down are flags | global_op_flags | CEPH_OSD_FLAG_READ;
// all other arguments are forwarded unchanged.
2539 ceph_tid_t
read_full(const object_t
& oid
, const object_locator_t
& oloc
,
2540 snapid_t snap
, bufferlist
*pbl
, int flags
,
2541 Context
*onfinish
, version_t
*objver
= NULL
,
2542 ObjectOperation
*extra_ops
= NULL
) {
2543 return read(oid
, oloc
, 0, 0, snap
, pbl
, flags
| global_op_flags
|
2544 CEPH_OSD_FLAG_READ
, onfinish
, objver
, extra_ops
);
// _modify: build a write Op from a caller-supplied vector of sub-ops.
//
// Unlike the single-op helpers in this section, no sub-op is created or
// filled in here; `ops` is passed to the Op constructor as given. The Op is
// constructed with flags | global_op_flags | CEPH_OSD_FLAG_WRITE, with
// `oncommit` as its completion and `objver` forwarded unchanged.
//
// NOTE(review): the `oncommit` parameter line and the tail of the function
// (use of `mtime`/`snapc`, return value) are not visible in this excerpt.
2549 ceph_tid_t
_modify(const object_t
& oid
, const object_locator_t
& oloc
,
2550 vector
<OSDOp
>& ops
, ceph::real_time mtime
,
2551 const SnapContext
& snapc
, int flags
,
2553 version_t
*objver
= NULL
) {
2554 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2555 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// prepare_write_op: allocate an Op carrying a single CEPH_OSD_OP_WRITE
// sub-op covering [off, off+len) of object `oid`.
//
// The extent's truncate_size and truncate_seq are both zeroed, and the
// per-sub-op flags are set from `op_flags`. The Op is constructed with
// flags | global_op_flags | CEPH_OSD_FLAG_WRITE, `oncommit` as completion,
// `objver`, a nullptr argument, and `parent_trace`.
//
// NOTE(review): the `ops` declaration, where `bl` is attached to the
// sub-op, and the tail of the function (use of `mtime`/`snapc`, return of
// the Op) are not visible in this excerpt -- confirm against the full file.
2562 Op
*prepare_write_op(
2563 const object_t
& oid
, const object_locator_t
& oloc
,
2564 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2565 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2566 Context
*oncommit
, version_t
*objver
= NULL
,
2567 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2568 ZTracer::Trace
*parent_trace
= nullptr) {
2570 int i
= init_ops(ops
, 1, extra_ops
);
2571 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2572 ops
[i
].op
.extent
.offset
= off
;
2573 ops
[i
].op
.extent
.length
= len
;
// Plain write: no truncation information accompanies the extent.
2574 ops
[i
].op
.extent
.truncate_size
= 0;
2575 ops
[i
].op
.extent
.truncate_seq
= 0;
// op_flags applies to this sub-op only; `flags` applies to the whole Op.
2577 ops
[i
].op
.flags
= op_flags
;
2578 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2579 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
,
2580 nullptr, parent_trace
);
// Wrapper that obtains its Op from prepare_write_op(), forwarding oid,
// oloc, off, len, snapc, bl, mtime, flags, oncommit, objver, extra_ops and
// op_flags unchanged. No trace argument is passed, so prepare_write_op's
// default (nullptr) parent_trace applies.
//
// NOTE(review): the line carrying this function's return type and name is
// not visible in this excerpt (presumably `ceph_tid_t write(`), nor is what
// happens to the Op after it is prepared -- confirm against the full file.
2586 const object_t
& oid
, const object_locator_t
& oloc
,
2587 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2588 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2589 Context
*oncommit
, version_t
*objver
= NULL
,
2590 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2591 Op
*o
= prepare_write_op(oid
, oloc
, off
, len
, snapc
, bl
, mtime
, flags
,
2592 oncommit
, objver
, extra_ops
, op_flags
);
// prepare_append_op: allocate an Op carrying a single CEPH_OSD_OP_APPEND
// sub-op of `len` bytes for object `oid`.
//
// The extent offset is set to 0 (no offset parameter exists for append),
// the length to `len`, and truncate_size/truncate_seq are zeroed. The Op is
// constructed with flags | global_op_flags | CEPH_OSD_FLAG_WRITE, with
// `oncommit` as completion and `objver` forwarded unchanged.
//
// NOTE(review): the `oncommit` parameter line, the `ops` declaration, where
// `bl` is attached, and the tail of the function (use of `mtime`/`snapc`,
// return of the Op) are not visible in this excerpt.
2597 Op
*prepare_append_op(
2598 const object_t
& oid
, const object_locator_t
& oloc
,
2599 uint64_t len
, const SnapContext
& snapc
,
2600 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2602 version_t
*objver
= NULL
,
2603 ObjectOperation
*extra_ops
= NULL
) {
2605 int i
= init_ops(ops
, 1, extra_ops
);
2606 ops
[i
].op
.op
= CEPH_OSD_OP_APPEND
;
2607 ops
[i
].op
.extent
.offset
= 0;
2608 ops
[i
].op
.extent
.length
= len
;
2609 ops
[i
].op
.extent
.truncate_size
= 0;
2610 ops
[i
].op
.extent
.truncate_seq
= 0;
2612 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2613 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// Wrapper that obtains its Op from prepare_append_op(), forwarding oid,
// oloc, len, snapc, bl, mtime, flags, oncommit, objver and extra_ops
// unchanged.
//
// NOTE(review): the line carrying this function's return type and name
// (presumably `ceph_tid_t append(`), the `oncommit` parameter line, and
// what happens to the Op after it is prepared are not visible in this
// excerpt -- confirm against the full file.
2619 const object_t
& oid
, const object_locator_t
& oloc
,
2620 uint64_t len
, const SnapContext
& snapc
,
2621 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2623 version_t
*objver
= NULL
,
2624 ObjectOperation
*extra_ops
= NULL
) {
2625 Op
*o
= prepare_append_op(oid
, oloc
, len
, snapc
, bl
, mtime
, flags
,
2626 oncommit
, objver
, extra_ops
);
// write_trunc: build a write Op carrying a single CEPH_OSD_OP_WRITE sub-op
// for [off, off+len) of object `oid`, with caller-supplied truncation
// information.
//
// Differs from prepare_write_op in that the extent's truncate_size and
// truncate_seq come from `trunc_size` / `trunc_seq` instead of being
// zeroed. Per-sub-op flags come from `op_flags`; the Op is constructed with
// flags | global_op_flags | CEPH_OSD_FLAG_WRITE, `oncommit` and `objver`.
//
// NOTE(review): the `oncommit` parameter line, the `ops` declaration, where
// `bl` is attached, and the tail of the function (use of `mtime`/`snapc`,
// return value) are not visible in this excerpt.
2631 ceph_tid_t
write_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2632 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2633 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2634 uint64_t trunc_size
, __u32 trunc_seq
,
2636 version_t
*objver
= NULL
,
2637 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2639 int i
= init_ops(ops
, 1, extra_ops
);
2640 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2641 ops
[i
].op
.extent
.offset
= off
;
2642 ops
[i
].op
.extent
.length
= len
;
// Truncation state travels with the write.
2643 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2644 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2646 ops
[i
].op
.flags
= op_flags
;
2647 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2648 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// prepare_write_full_op: allocate an Op carrying a single
// CEPH_OSD_OP_WRITEFULL sub-op for object `oid`.
//
// The extent is offset 0 with length bl.length(), i.e. sized from the
// supplied buffer rather than from a separate length parameter. Per-sub-op
// flags come from `op_flags`; the Op is constructed with
// flags | global_op_flags | CEPH_OSD_FLAG_WRITE, `oncommit` and `objver`.
//
// NOTE(review): the `ops` declaration, where `bl` is attached, and the tail
// of the function (use of `mtime`/`snapc`, return of the Op) are not
// visible in this excerpt.
2655 Op
*prepare_write_full_op(
2656 const object_t
& oid
, const object_locator_t
& oloc
,
2657 const SnapContext
& snapc
, const bufferlist
&bl
,
2658 ceph::real_time mtime
, int flags
,
2659 Context
*oncommit
, version_t
*objver
= NULL
,
2660 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2662 int i
= init_ops(ops
, 1, extra_ops
);
2663 ops
[i
].op
.op
= CEPH_OSD_OP_WRITEFULL
;
2664 ops
[i
].op
.extent
.offset
= 0;
2665 ops
[i
].op
.extent
.length
= bl
.length();
2667 ops
[i
].op
.flags
= op_flags
;
2668 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2669 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// write_full: wrapper that obtains its Op from prepare_write_full_op(),
// forwarding oid, oloc, snapc, bl, mtime, flags, oncommit, objver,
// extra_ops and op_flags unchanged.
//
// NOTE(review): what happens to the Op after it is prepared, and the
// returned value, are not visible in this excerpt.
2674 ceph_tid_t
write_full(
2675 const object_t
& oid
, const object_locator_t
& oloc
,
2676 const SnapContext
& snapc
, const bufferlist
&bl
,
2677 ceph::real_time mtime
, int flags
,
2678 Context
*oncommit
, version_t
*objver
= NULL
,
2679 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2680 Op
*o
= prepare_write_full_op(oid
, oloc
, snapc
, bl
, mtime
, flags
,
2681 oncommit
, objver
, extra_ops
, op_flags
);
// prepare_writesame_op: allocate an Op carrying a single
// CEPH_OSD_OP_WRITESAME sub-op for object `oid`.
//
// Note the parameter order: `write_len` comes BEFORE `off`. The sub-op's
// writesame.offset is `off`, writesame.length is `write_len`, and
// writesame.data_length is bl.length(). Per-sub-op flags come from
// `op_flags`; the Op is constructed with
// flags | global_op_flags | CEPH_OSD_FLAG_WRITE, `oncommit` and `objver`.
//
// NOTE(review): the `ops` declaration, where `bl` is attached, and the tail
// of the function (use of `mtime`/`snapc`, return of the Op) are not
// visible in this excerpt.
2686 Op
*prepare_writesame_op(
2687 const object_t
& oid
, const object_locator_t
& oloc
,
2688 uint64_t write_len
, uint64_t off
,
2689 const SnapContext
& snapc
, const bufferlist
&bl
,
2690 ceph::real_time mtime
, int flags
,
2691 Context
*oncommit
, version_t
*objver
= NULL
,
2692 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2695 int i
= init_ops(ops
, 1, extra_ops
);
2696 ops
[i
].op
.op
= CEPH_OSD_OP_WRITESAME
;
2697 ops
[i
].op
.writesame
.offset
= off
;
// Total span to cover, as opposed to the size of the supplied buffer below.
2698 ops
[i
].op
.writesame
.length
= write_len
;
2699 ops
[i
].op
.writesame
.data_length
= bl
.length();
2701 ops
[i
].op
.flags
= op_flags
;
2702 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2703 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// writesame: wrapper that obtains its Op from prepare_writesame_op(),
// forwarding oid, oloc, write_len, off, snapc, bl, mtime, flags, oncommit,
// objver, extra_ops and op_flags unchanged (same write_len-before-off
// ordering as the helper).
//
// NOTE(review): what happens to the Op after it is prepared, and the
// returned value, are not visible in this excerpt.
2708 ceph_tid_t
writesame(
2709 const object_t
& oid
, const object_locator_t
& oloc
,
2710 uint64_t write_len
, uint64_t off
,
2711 const SnapContext
& snapc
, const bufferlist
&bl
,
2712 ceph::real_time mtime
, int flags
,
2713 Context
*oncommit
, version_t
*objver
= NULL
,
2714 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2716 Op
*o
= prepare_writesame_op(oid
, oloc
, write_len
, off
, snapc
, bl
,
2717 mtime
, flags
, oncommit
, objver
,
2718 extra_ops
, op_flags
);
// trunc: build a write Op carrying a single CEPH_OSD_OP_TRUNCATE sub-op for
// object `oid`.
//
// `trunc_size` is stored twice: as the extent offset and as the extent's
// truncate_size; `trunc_seq` is stored as truncate_seq. The Op is
// constructed with flags | global_op_flags | CEPH_OSD_FLAG_WRITE,
// `oncommit` and `objver`.
//
// NOTE(review): the `ops` declaration and the tail of the function (use of
// `mtime`/`snapc`, return value) are not visible in this excerpt.
2724 ceph_tid_t
trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2725 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2726 uint64_t trunc_size
, __u32 trunc_seq
,
2727 Context
*oncommit
, version_t
*objver
= NULL
,
2728 ObjectOperation
*extra_ops
= NULL
) {
2730 int i
= init_ops(ops
, 1, extra_ops
);
2731 ops
[i
].op
.op
= CEPH_OSD_OP_TRUNCATE
;
// The new size is carried in the extent offset field for this opcode.
2732 ops
[i
].op
.extent
.offset
= trunc_size
;
2733 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2734 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2735 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2736 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// zero: build a write Op carrying a single CEPH_OSD_OP_ZERO sub-op covering
// [off, off+len) of object `oid`.
//
// Only the extent offset and length are set on the sub-op in the visible
// lines. The Op is constructed with
// flags | global_op_flags | CEPH_OSD_FLAG_WRITE, `oncommit` and `objver`.
//
// NOTE(review): the `ops` declaration and the tail of the function (use of
// `mtime`/`snapc`, return value) are not visible in this excerpt.
2743 ceph_tid_t
zero(const object_t
& oid
, const object_locator_t
& oloc
,
2744 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2745 ceph::real_time mtime
, int flags
, Context
*oncommit
,
2746 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2748 int i
= init_ops(ops
, 1, extra_ops
);
2749 ops
[i
].op
.op
= CEPH_OSD_OP_ZERO
;
2750 ops
[i
].op
.extent
.offset
= off
;
2751 ops
[i
].op
.extent
.length
= len
;
2752 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2753 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// rollback_object: build a write Op carrying a single CEPH_OSD_OP_ROLLBACK
// sub-op that names snapshot `snapid` for object `oid`.
//
// This function has no `flags` parameter: the Op is constructed with
// CEPH_OSD_FLAG_WRITE alone.
// NOTE(review): every sibling helper in this section ORs in
// `global_op_flags`; this one does not. Confirm whether that omission is
// intentional before relying on global flags applying to rollbacks.
//
// NOTE(review): the `ops` declaration and the tail of the function (use of
// `mtime`/`snapc`, return value) are not visible in this excerpt.
2760 ceph_tid_t
rollback_object(const object_t
& oid
, const object_locator_t
& oloc
,
2761 const SnapContext
& snapc
, snapid_t snapid
,
2762 ceph::real_time mtime
, Context
*oncommit
,
2763 version_t
*objver
= NULL
,
2764 ObjectOperation
*extra_ops
= NULL
) {
2766 int i
= init_ops(ops
, 1, extra_ops
);
2767 ops
[i
].op
.op
= CEPH_OSD_OP_ROLLBACK
;
2768 ops
[i
].op
.snap
.snapid
= snapid
;
2769 Op
*o
= new Op(oid
, oloc
, ops
, CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// create: build a write Op carrying a single CEPH_OSD_OP_CREATE sub-op for
// object `oid`.
//
// Two distinct flag sets are taken: `create_flags` is stored on the sub-op
// itself, while `global_flags` is ORed with global_op_flags and
// CEPH_OSD_FLAG_WRITE to form the flags of the whole Op.
//
// NOTE(review): the `ops` declaration and the tail of the function (use of
// `mtime`/`snapc`, return value) are not visible in this excerpt.
2776 ceph_tid_t
create(const object_t
& oid
, const object_locator_t
& oloc
,
2777 const SnapContext
& snapc
, ceph::real_time mtime
, int global_flags
,
2778 int create_flags
, Context
*oncommit
,
2779 version_t
*objver
= NULL
,
2780 ObjectOperation
*extra_ops
= NULL
) {
2782 int i
= init_ops(ops
, 1, extra_ops
);
2783 ops
[i
].op
.op
= CEPH_OSD_OP_CREATE
;
2784 ops
[i
].op
.flags
= create_flags
;
2785 Op
*o
= new Op(oid
, oloc
, ops
, global_flags
| global_op_flags
|
2786 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// prepare_remove_op: allocate an Op carrying a single CEPH_OSD_OP_DELETE
// sub-op for object `oid`.
//
// Only the opcode is set on the sub-op in the visible lines. The Op is
// constructed with flags | global_op_flags | CEPH_OSD_FLAG_WRITE,
// `oncommit` and `objver`.
//
// NOTE(review): the `oncommit` parameter line, the `ops` declaration and
// the tail of the function (use of `mtime`/`snapc`, return of the Op) are
// not visible in this excerpt.
2793 Op
*prepare_remove_op(
2794 const object_t
& oid
, const object_locator_t
& oloc
,
2795 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2797 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2799 int i
= init_ops(ops
, 1, extra_ops
);
2800 ops
[i
].op
.op
= CEPH_OSD_OP_DELETE
;
2801 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2802 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// Wrapper that obtains its Op from prepare_remove_op(), forwarding oid,
// oloc, snapc, mtime, flags, oncommit, objver and extra_ops unchanged.
//
// NOTE(review): the line carrying this function's return type and name
// (presumably `ceph_tid_t remove(`), the `oncommit` parameter line, and
// what happens to the Op after it is prepared are not visible in this
// excerpt -- confirm against the full file.
2808 const object_t
& oid
, const object_locator_t
& oloc
,
2809 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2811 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2812 Op
*o
= prepare_remove_op(oid
, oloc
, snapc
, mtime
, flags
,
2813 oncommit
, objver
, extra_ops
);
// setxattr: build a write Op carrying a single CEPH_OSD_OP_SETXATTR sub-op
// that sets attribute `name` of object `oid` to the contents of `bl`.
//
// name_len is strlen(name) (0 when name is null) and value_len is
// bl.length(). The sub-op's indata is the name followed immediately by the
// value, in that order. The Op is constructed with
// flags | global_op_flags | CEPH_OSD_FLAG_WRITE, `oncommit` and `objver`.
//
// NOTE(review): the `oncommit` parameter line, the `ops` declaration and
// the tail of the function (use of `mtime`/`snapc`, return value) are not
// visible in this excerpt.
2819 ceph_tid_t
setxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2820 const char *name
, const SnapContext
& snapc
, const bufferlist
&bl
,
2821 ceph::real_time mtime
, int flags
,
2823 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2825 int i
= init_ops(ops
, 1, extra_ops
);
2826 ops
[i
].op
.op
= CEPH_OSD_OP_SETXATTR
;
2827 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2828 ops
[i
].op
.xattr
.value_len
= bl
.length();
// NOTE(review): `name` may be null per the ternary above; presumably the
// line preceding this append guards it with a null check -- confirm.
2830 ops
[i
].indata
.append(name
);
// Value bytes follow the name in the same input buffer.
2831 ops
[i
].indata
.append(bl
);
2832 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2833 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// removexattr: build a write Op carrying a single CEPH_OSD_OP_RMXATTR
// sub-op that names attribute `name` of object `oid`.
//
// name_len is strlen(name) (0 when name is null), value_len is 0, and the
// name is appended to the sub-op's indata. The Op is constructed with
// flags | global_op_flags | CEPH_OSD_FLAG_WRITE, `oncommit` and `objver`.
//
// NOTE(review): the `oncommit` parameter line, the `ops` declaration and
// the tail of the function (use of `mtime`/`snapc`, return value) are not
// visible in this excerpt.
2840 ceph_tid_t
removexattr(const object_t
& oid
, const object_locator_t
& oloc
,
2841 const char *name
, const SnapContext
& snapc
,
2842 ceph::real_time mtime
, int flags
,
2844 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2846 int i
= init_ops(ops
, 1, extra_ops
);
2847 ops
[i
].op
.op
= CEPH_OSD_OP_RMXATTR
;
2848 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2849 ops
[i
].op
.xattr
.value_len
= 0;
// NOTE(review): `name` may be null per the ternary above; presumably the
// line preceding this append guards it with a null check -- confirm.
2851 ops
[i
].indata
.append(name
);
2852 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2853 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
// Object listing driven by an NListContext. Declarations only; the
// definitions are not part of this excerpt.
//
//  - list_nobjects: takes the listing context and a completion.
//  - list_nobjects_seek: two overloads, positioning by a uint32_t `pos` or
//    by an hobject_t cursor; both return uint32_t.
//  - list_nobjects_get_cursor: writes a cursor for the context into `*c`.
2861 void list_nobjects(NListContext
*p
, Context
*onfinish
);
2862 uint32_t list_nobjects_seek(NListContext
*p
, uint32_t pos
);
2863 uint32_t list_nobjects_seek(NListContext
*list_context
, const hobject_t
& c
);
2864 void list_nobjects_get_cursor(NListContext
*list_context
, hobject_t
*c
);
// Cursor-based object enumeration. Declarations only; the definitions are
// not part of this excerpt.
//
//  - enumerate_objects_begin / enumerate_objects_end: return hobject_t
//    values, presumably the bounds of the enumerable range -- confirm.
//  - enumerate_objects: takes (among others) a namespace, a start and end
//    hobject_t, a filter bufferlist, an output list of
//    librados::ListObjectImpl and a completion.
//  - _enumerate_reply: internal counterpart taking an end cursor, pool id,
//    reply epoch, the same output list and completion.
//  - C_EnumerateReply is befriended so it can reach non-public members.
//
// NOTE(review): several parameter lines of enumerate_objects and
// _enumerate_reply are not visible in this excerpt, so the parameter lists
// shown here are incomplete.
2866 hobject_t
enumerate_objects_begin();
2867 hobject_t
enumerate_objects_end();
2868 //hobject_t enumerate_objects_begin(int n, int m);
2869 void enumerate_objects(
2871 const std::string
&ns
,
2872 const hobject_t
&start
,
2873 const hobject_t
&end
,
2875 const bufferlist
&filter_bl
,
2876 std::list
<librados::ListObjectImpl
> *result
,
2878 Context
*on_finish
);
2880 void _enumerate_reply(
2883 const hobject_t
&end
,
2884 const int64_t pool_id
,
2886 epoch_t reply_epoch
,
2887 std::list
<librados::ListObjectImpl
> *result
,
2889 Context
*on_finish
);
2890 friend class C_EnumerateReply
;
2892 // -------------------------
// Pool operations. Declarations only; the definitions are not part of this
// excerpt.
//
//  - pool_op_submit / _pool_op_submit / _finish_pool_op: submission and
//    completion plumbing for a PoolOp (`r` is the result code handed to
//    _finish_pool_op).
//  - create_pool_snap / delete_pool_snap: pool snapshots addressed by name.
//  - allocate_selfmanaged_snap / delete_selfmanaged_snap: self-managed
//    snapshots addressed by snapid_t.
//  - create_pool, delete_pool (by id or by name), change_pool_auid.
//  - handle_pool_op_reply: takes the MPoolOpReply message.
//  - pool_op_cancel: takes a tid and a result code `r`.
//
// NOTE(review): the trailing parameter lines of allocate_selfmanaged_snap
// and create_pool are not visible in this excerpt.
2895 void pool_op_submit(PoolOp
*op
);
2896 void _pool_op_submit(PoolOp
*op
);
2897 void _finish_pool_op(PoolOp
*op
, int r
);
2898 void _do_delete_pool(int64_t pool
, Context
*onfinish
);
2900 int create_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2901 int allocate_selfmanaged_snap(int64_t pool
, snapid_t
*psnapid
,
2903 int delete_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2904 int delete_selfmanaged_snap(int64_t pool
, snapid_t snap
, Context
*onfinish
);
2906 int create_pool(string
& name
, Context
*onfinish
, uint64_t auid
=0,
2908 int delete_pool(int64_t pool
, Context
*onfinish
);
2909 int delete_pool(const string
& name
, Context
*onfinish
);
2910 int change_pool_auid(int64_t pool
, Context
*onfinish
, uint64_t auid
);
2912 void handle_pool_op_reply(MPoolOpReply
*m
);
2913 int pool_op_cancel(ceph_tid_t tid
, int r
);
2915 // --------------------------
// Pool statistics. Declarations only; the definitions are not part of this
// excerpt.
//
//  - _poolstat_submit / _finish_pool_stat_op: submission and completion
//    plumbing for a PoolStatOp.
//  - handle_get_pool_stats_reply: takes the MGetPoolStatsReply message.
//  - get_pool_stats: takes a list of pool names and an output map from
//    name to pool_stat_t.
//  - pool_stat_op_cancel: takes a tid and a result code `r`.
//
// NOTE(review): the trailing parameter line of get_pool_stats is not
// visible in this excerpt.
2918 void _poolstat_submit(PoolStatOp
*op
);
2920 void handle_get_pool_stats_reply(MGetPoolStatsReply
*m
);
2921 void get_pool_stats(list
<string
>& pools
, map
<string
,pool_stat_t
> *result
,
2923 int pool_stat_op_cancel(ceph_tid_t tid
, int r
);
2924 void _finish_pool_stat_op(PoolStatOp
*op
, int r
);
2926 // ---------------------------
// Filesystem statistics (statfs). Declarations only; the definitions are
// not part of this excerpt.
//
//  - _fs_stats_submit / _finish_statfs_op: submission and completion
//    plumbing for a StatfsOp.
//  - handle_fs_stats_reply: takes the MStatfsReply message.
//  - get_fs_stats: takes a ceph_statfs to fill by reference plus a
//    completion.
//  - statfs_op_cancel: takes a tid and a result code `r`.
2929 void _fs_stats_submit(StatfsOp
*op
);
2931 void handle_fs_stats_reply(MStatfsReply
*m
);
2932 void get_fs_stats(struct ceph_statfs
& result
, Context
*onfinish
);
2933 int statfs_op_cancel(ceph_tid_t tid
, int r
);
2934 void _finish_statfs_op(StatfsOp
*op
, int r
);
2936 // ---------------------------
2937 // some scatter/gather hackery
// _sg_read_finish: declaration only; the definition is not part of this
// excerpt. It receives the extents that were read, the per-extent result
// buffers, the caller's destination buffer and the caller's completion.
// It is invoked from C_SGRead::finish().
2939 void _sg_read_finish(vector
<ObjectExtent
>& extents
,
2940 vector
<bufferlist
>& resultbl
,
2941 bufferlist
*bl
, Context
*onfinish
);
// C_SGRead: completion Context used by the multi-extent path of
// sg_read_trunc(). It carries the extent list and the per-extent result
// buffers, and on finish() hands them, together with the caller's buffer
// and completion, to Objecter::_sg_read_finish().
//
// The result code `r` passed to finish() is not forwarded in the visible
// call.
//
// NOTE(review): the declarations of the `objecter`, `bl` and `onfinish`
// members, the constructor's final parameter (`c`), the constructor body
// (how `e` and `r` populate `extents` / `resultbl`) and the closing braces
// are not visible in this excerpt.
2943 struct C_SGRead
: public Context
{
2945 vector
<ObjectExtent
> extents
;
2946 vector
<bufferlist
> resultbl
;
2949 C_SGRead(Objecter
*ob
,
2950 vector
<ObjectExtent
>& e
, vector
<bufferlist
>& r
, bufferlist
*b
,
2952 objecter(ob
), bl(b
), onfinish(c
) {
2956 void finish(int r
) override
{
2957 objecter
->_sg_read_finish(extents
, resultbl
, bl
, onfinish
);
// sg_read_trunc: scatter/gather read over a list of object extents.
//
// Single extent: one read_trunc() is issued straight into the caller's
// `bl`, completing on `onfinish`.
//
// Multiple extents: one read_trunc() per extent is issued into its own
// slot of a local `resultbl` vector, each completing on a sub-context of a
// C_GatherBuilder; the gather's finisher is a C_SGRead built from the
// extents, the result buffers, `bl` and `onfinish`.
//
// NOTE(review): in the visible lines the truncate size passed to
// read_trunc() is each extent's own `truncate_size`; the `trunc_size`
// parameter is not referenced. Confirm that this is intended.
//
// NOTE(review): the else branch, the declaration of the index `i`, the rest
// of the for header, the trailing read_trunc() arguments and the gather
// activation are not visible in this excerpt.
2961 void sg_read_trunc(vector
<ObjectExtent
>& extents
, snapid_t snap
,
2962 bufferlist
*bl
, int flags
, uint64_t trunc_size
,
2963 __u32 trunc_seq
, Context
*onfinish
, int op_flags
= 0) {
// Fast path: no gather machinery needed for a single extent.
2964 if (extents
.size() == 1) {
2965 read_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
2966 extents
[0].length
, snap
, bl
, flags
, extents
[0].truncate_size
,
2967 trunc_seq
, onfinish
, 0, 0, op_flags
);
2969 C_GatherBuilder
gather(cct
);
// One result buffer per extent, filled independently by each sub-read.
2970 vector
<bufferlist
> resultbl(extents
.size());
2972 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
2975 read_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
, snap
, &resultbl
[i
++],
2976 flags
, p
->truncate_size
, trunc_seq
, gather
.new_sub(),
2979 gather
.set_finisher(new C_SGRead(this, extents
, resultbl
, bl
, onfinish
));
// sg_read: scatter/gather read without truncation information. Forwards to
// sg_read_trunc() with trunc_size = 0 and trunc_seq = 0; all other
// arguments are passed through unchanged.
2984 void sg_read(vector
<ObjectExtent
>& extents
, snapid_t snap
, bufferlist
*bl
,
2985 int flags
, Context
*onfinish
, int op_flags
= 0) {
2986 sg_read_trunc(extents
, snap
, bl
, flags
, 0, 0, onfinish
, op_flags
);
// sg_write_trunc: scatter/gather write of `bl` over a list of object
// extents.
//
// Single extent: one write_trunc() is issued with the whole of `bl`,
// completing on `oncommit`.
//
// Multiple extents: for each extent, the pieces of `bl` named by the
// extent's buffer_extents (offset, length pairs) are copied into a
// per-extent buffer `cur`, which is asserted to total the extent's length
// and then written with write_trunc(). Each write completes on a
// sub-context of a C_GatherBuilder built around `oncommit`, or on a null
// completion when `oncommit` is null.
//
// NOTE(review): in the visible lines the truncate size passed to
// write_trunc() is each extent's own `truncate_size`; the `trunc_size`
// parameter is not referenced. Confirm that this is intended.
//
// NOTE(review): the else branch, the rest of both for headers, the
// declaration of `cur`, the trailing write_trunc() arguments and the gather
// activation are not visible in this excerpt.
2989 void sg_write_trunc(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
2990 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
2991 uint64_t trunc_size
, __u32 trunc_seq
,
2992 Context
*oncommit
, int op_flags
= 0) {
// Fast path: a single extent takes the caller's buffer as-is.
2993 if (extents
.size() == 1) {
2994 write_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
2995 extents
[0].length
, snapc
, bl
, mtime
, flags
,
2996 extents
[0].truncate_size
, trunc_seq
, oncommit
,
2999 C_GatherBuilder
gcom(cct
, oncommit
);
3000 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
// Assemble this extent's payload from its slices of the source buffer.
3004 for (vector
<pair
<uint64_t,uint64_t> >::iterator bit
3005 = p
->buffer_extents
.begin();
3006 bit
!= p
->buffer_extents
.end();
3008 bl
.copy(bit
->first
, bit
->second
, cur
);
// The gathered slices must add up to exactly the extent's length.
3009 assert(cur
.length() == p
->length
);
3010 write_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
,
3011 snapc
, cur
, mtime
, flags
, p
->truncate_size
, trunc_seq
,
3012 oncommit
? gcom
.new_sub():0,
// sg_write: scatter/gather write without truncation information. Forwards
// to sg_write_trunc() with trunc_size = 0 and trunc_seq = 0; extents,
// snapc, bl, mtime, flags and oncommit are passed through unchanged.
//
// NOTE(review): the trailing argument(s) of the sg_write_trunc() call
// (presumably `op_flags`) are not visible in this excerpt.
3019 void sg_write(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3020 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
3021 Context
*oncommit
, int op_flags
= 0) {
3022 sg_write_trunc(extents
, snapc
, bl
, mtime
, flags
, 0, 0, oncommit
,
// Messenger callbacks. All are marked `override`, so they implement
// virtuals of a base class that is not visible in this excerpt.
// Declarations only; the definitions are not part of this excerpt.
//
//  - ms_handle_connect / ms_handle_reset / ms_handle_remote_reset /
//    ms_handle_refused: connection lifecycle notifications for `con`.
//  - ms_get_authorizer: takes the destination type, an output pointer for
//    the authorizer and a force_new flag.
3026 void ms_handle_connect(Connection
*con
) override
;
3027 bool ms_handle_reset(Connection
*con
) override
;
3028 void ms_handle_remote_reset(Connection
*con
) override
;
3029 bool ms_handle_refused(Connection
*con
) override
;
3030 bool ms_get_authorizer(int dest_type
,
3031 AuthAuthorizer
**authorizer
,
3032 bool force_new
) override
;
// blacklist_self: declaration only; takes a bool `set`. Presumably adds or
// removes this client from the blacklist depending on `set` -- confirm in
// the definition.
3034 void blacklist_self(bool set
);
// Epoch barrier state and its setter. The semantics of the barrier and of
// retry_writes_after_first_reply are not established by this excerpt; see
// the definitions of set_epoch_barrier() and the users of these members.
3037 epoch_t epoch_barrier
;
3038 bool retry_writes_after_first_reply
;
3040 void set_epoch_barrier(epoch_t epoch
);
3042 PerfCounters
*get_logger() {