1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_OBJECTER_H
16 #define CEPH_OBJECTER_H
18 #include <condition_variable>
24 #include <type_traits>
26 #include <boost/thread/shared_mutex.hpp>
28 #include "include/assert.h"
29 #include "include/buffer.h"
30 #include "include/types.h"
31 #include "include/rados/rados_types.hpp"
33 #include "common/admin_socket.h"
34 #include "common/ceph_time.h"
35 #include "common/ceph_timer.h"
36 #include "common/Finisher.h"
37 #include "common/shunique_lock.h"
38 #include "common/zipkin_trace.h"
40 #include "messages/MOSDOp.h"
41 #include "osd/OSDMap.h"
54 class MGetPoolStatsReply
;
61 // -----------------------------------------
63 struct ObjectOperation
{
68 vector
<bufferlist
*> out_bl
;
69 vector
<Context
*> out_handler
;
70 vector
<int*> out_rval
;
72 ObjectOperation() : flags(0), priority(0) {}
74 while (!out_handler
.empty()) {
75 delete out_handler
.back();
76 out_handler
.pop_back();
84 void set_last_op_flags(int flags
) {
86 ops
.rbegin()->op
.flags
= flags
;
91 * Add a callback to run when this operation completes,
92 * after any other callbacks for it.
94 void add_handler(Context
*extra
);
96 OSDOp
& add_op(int op
) {
102 out_handler
.resize(s
+1);
103 out_handler
[s
] = NULL
;
104 out_rval
.resize(s
+1);
108 void add_data(int op
, uint64_t off
, uint64_t len
, bufferlist
& bl
) {
109 OSDOp
& osd_op
= add_op(op
);
110 osd_op
.op
.extent
.offset
= off
;
111 osd_op
.op
.extent
.length
= len
;
112 osd_op
.indata
.claim_append(bl
);
114 void add_writesame(int op
, uint64_t off
, uint64_t write_len
,
116 OSDOp
& osd_op
= add_op(op
);
117 osd_op
.op
.writesame
.offset
= off
;
118 osd_op
.op
.writesame
.length
= write_len
;
119 osd_op
.op
.writesame
.data_length
= bl
.length();
120 osd_op
.indata
.claim_append(bl
);
122 void add_xattr(int op
, const char *name
, const bufferlist
& data
) {
123 OSDOp
& osd_op
= add_op(op
);
124 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
125 osd_op
.op
.xattr
.value_len
= data
.length();
127 osd_op
.indata
.append(name
);
128 osd_op
.indata
.append(data
);
130 void add_xattr_cmp(int op
, const char *name
, uint8_t cmp_op
,
131 uint8_t cmp_mode
, const bufferlist
& data
) {
132 OSDOp
& osd_op
= add_op(op
);
133 osd_op
.op
.xattr
.name_len
= (name
? strlen(name
) : 0);
134 osd_op
.op
.xattr
.value_len
= data
.length();
135 osd_op
.op
.xattr
.cmp_op
= cmp_op
;
136 osd_op
.op
.xattr
.cmp_mode
= cmp_mode
;
138 osd_op
.indata
.append(name
);
139 osd_op
.indata
.append(data
);
141 void add_call(int op
, const char *cname
, const char *method
,
143 bufferlist
*outbl
, Context
*ctx
, int *prval
) {
144 OSDOp
& osd_op
= add_op(op
);
146 unsigned p
= ops
.size() - 1;
147 out_handler
[p
] = ctx
;
151 osd_op
.op
.cls
.class_len
= strlen(cname
);
152 osd_op
.op
.cls
.method_len
= strlen(method
);
153 osd_op
.op
.cls
.indata_len
= indata
.length();
154 osd_op
.indata
.append(cname
, osd_op
.op
.cls
.class_len
);
155 osd_op
.indata
.append(method
, osd_op
.op
.cls
.method_len
);
156 osd_op
.indata
.append(indata
);
158 void add_pgls(int op
, uint64_t count
, collection_list_handle_t cookie
,
159 epoch_t start_epoch
) {
160 OSDOp
& osd_op
= add_op(op
);
161 osd_op
.op
.pgls
.count
= count
;
162 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
163 ::encode(cookie
, osd_op
.indata
);
165 void add_pgls_filter(int op
, uint64_t count
, const bufferlist
& filter
,
166 collection_list_handle_t cookie
, epoch_t start_epoch
) {
167 OSDOp
& osd_op
= add_op(op
);
168 osd_op
.op
.pgls
.count
= count
;
169 osd_op
.op
.pgls
.start_epoch
= start_epoch
;
171 string mname
= "filter";
172 ::encode(cname
, osd_op
.indata
);
173 ::encode(mname
, osd_op
.indata
);
174 osd_op
.indata
.append(filter
);
175 ::encode(cookie
, osd_op
.indata
);
177 void add_alloc_hint(int op
, uint64_t expected_object_size
,
178 uint64_t expected_write_size
,
180 OSDOp
& osd_op
= add_op(op
);
181 osd_op
.op
.alloc_hint
.expected_object_size
= expected_object_size
;
182 osd_op
.op
.alloc_hint
.expected_write_size
= expected_write_size
;
183 osd_op
.op
.alloc_hint
.flags
= flags
;
189 void pg_ls(uint64_t count
, bufferlist
& filter
,
190 collection_list_handle_t cookie
, epoch_t start_epoch
) {
191 if (filter
.length() == 0)
192 add_pgls(CEPH_OSD_OP_PGLS
, count
, cookie
, start_epoch
);
194 add_pgls_filter(CEPH_OSD_OP_PGLS_FILTER
, count
, filter
, cookie
,
196 flags
|= CEPH_OSD_FLAG_PGOP
;
199 void pg_nls(uint64_t count
, const bufferlist
& filter
,
200 collection_list_handle_t cookie
, epoch_t start_epoch
) {
201 if (filter
.length() == 0)
202 add_pgls(CEPH_OSD_OP_PGNLS
, count
, cookie
, start_epoch
);
204 add_pgls_filter(CEPH_OSD_OP_PGNLS_FILTER
, count
, filter
, cookie
,
206 flags
|= CEPH_OSD_FLAG_PGOP
;
209 void scrub_ls(const librados::object_id_t
& start_after
,
211 std::vector
<librados::inconsistent_obj_t
> *objects
,
214 void scrub_ls(const librados::object_id_t
& start_after
,
216 std::vector
<librados::inconsistent_snapset_t
> *objects
,
220 void create(bool excl
) {
221 OSDOp
& o
= add_op(CEPH_OSD_OP_CREATE
);
222 o
.op
.flags
= (excl
? CEPH_OSD_OP_FLAG_EXCL
: 0);
225 struct C_ObjectOperation_stat
: public Context
{
228 ceph::real_time
*pmtime
;
230 struct timespec
*pts
;
232 C_ObjectOperation_stat(uint64_t *ps
, ceph::real_time
*pm
, time_t *pt
, struct timespec
*_pts
,
234 : psize(ps
), pmtime(pm
), ptime(pt
), pts(_pts
), prval(prval
) {}
235 void finish(int r
) override
{
237 bufferlist::iterator p
= bl
.begin();
240 ceph::real_time mtime
;
248 *ptime
= ceph::real_clock::to_time_t(mtime
);
250 *pts
= ceph::real_clock::to_timespec(mtime
);
251 } catch (buffer::error
& e
) {
258 void stat(uint64_t *psize
, ceph::real_time
*pmtime
, int *prval
) {
259 add_op(CEPH_OSD_OP_STAT
);
260 unsigned p
= ops
.size() - 1;
261 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, pmtime
, NULL
, NULL
,
267 void stat(uint64_t *psize
, time_t *ptime
, int *prval
) {
268 add_op(CEPH_OSD_OP_STAT
);
269 unsigned p
= ops
.size() - 1;
270 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, ptime
, NULL
,
276 void stat(uint64_t *psize
, struct timespec
*pts
, int *prval
) {
277 add_op(CEPH_OSD_OP_STAT
);
278 unsigned p
= ops
.size() - 1;
279 C_ObjectOperation_stat
*h
= new C_ObjectOperation_stat(psize
, NULL
, NULL
, pts
,
286 struct C_ObjectOperation_cmpext
: public Context
{
288 C_ObjectOperation_cmpext(int *prval
)
297 void cmpext(uint64_t off
, bufferlist
& cmp_bl
, int *prval
) {
298 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_bl
.length(), cmp_bl
);
299 unsigned p
= ops
.size() - 1;
300 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
306 void cmpext(uint64_t off
, uint64_t cmp_len
, const char *cmp_buf
, int *prval
) {
308 cmp_bl
.append(cmp_buf
, cmp_len
);
309 add_data(CEPH_OSD_OP_CMPEXT
, off
, cmp_len
, cmp_bl
);
310 unsigned p
= ops
.size() - 1;
311 C_ObjectOperation_cmpext
*h
= new C_ObjectOperation_cmpext(prval
);
316 void read(uint64_t off
, uint64_t len
, bufferlist
*pbl
, int *prval
,
319 add_data(CEPH_OSD_OP_READ
, off
, len
, bl
);
320 unsigned p
= ops
.size() - 1;
323 out_handler
[p
] = ctx
;
326 struct C_ObjectOperation_sparse_read
: public Context
{
329 std::map
<uint64_t, uint64_t> *extents
;
331 C_ObjectOperation_sparse_read(bufferlist
*data_bl
,
332 std::map
<uint64_t, uint64_t> *extents
,
334 : data_bl(data_bl
), extents(extents
), prval(prval
) {}
335 void finish(int r
) override
{
336 bufferlist::iterator iter
= bl
.begin();
338 // NOTE: it's possible the sub-op has not been executed but the result
339 // code remains zeroed. Avoid the costly exception handling on a
340 // potential IO path.
341 if (bl
.length() > 0) {
343 ::decode(*extents
, iter
);
344 ::decode(*data_bl
, iter
);
345 } catch (buffer::error
& e
) {
355 void sparse_read(uint64_t off
, uint64_t len
, std::map
<uint64_t,uint64_t> *m
,
356 bufferlist
*data_bl
, int *prval
) {
358 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
359 unsigned p
= ops
.size() - 1;
360 C_ObjectOperation_sparse_read
*h
=
361 new C_ObjectOperation_sparse_read(data_bl
, m
, prval
);
366 void write(uint64_t off
, bufferlist
& bl
,
367 uint64_t truncate_size
,
368 uint32_t truncate_seq
) {
369 add_data(CEPH_OSD_OP_WRITE
, off
, bl
.length(), bl
);
370 OSDOp
& o
= *ops
.rbegin();
371 o
.op
.extent
.truncate_size
= truncate_size
;
372 o
.op
.extent
.truncate_seq
= truncate_seq
;
374 void write(uint64_t off
, bufferlist
& bl
) {
375 write(off
, bl
, 0, 0);
377 void write_full(bufferlist
& bl
) {
378 add_data(CEPH_OSD_OP_WRITEFULL
, 0, bl
.length(), bl
);
380 void writesame(uint64_t off
, uint64_t write_len
, bufferlist
& bl
) {
381 add_writesame(CEPH_OSD_OP_WRITESAME
, off
, write_len
, bl
);
383 void append(bufferlist
& bl
) {
384 add_data(CEPH_OSD_OP_APPEND
, 0, bl
.length(), bl
);
386 void zero(uint64_t off
, uint64_t len
) {
388 add_data(CEPH_OSD_OP_ZERO
, off
, len
, bl
);
390 void truncate(uint64_t off
) {
392 add_data(CEPH_OSD_OP_TRUNCATE
, off
, 0, bl
);
396 add_data(CEPH_OSD_OP_DELETE
, 0, 0, bl
);
398 void mapext(uint64_t off
, uint64_t len
) {
400 add_data(CEPH_OSD_OP_MAPEXT
, off
, len
, bl
);
402 void sparse_read(uint64_t off
, uint64_t len
) {
404 add_data(CEPH_OSD_OP_SPARSE_READ
, off
, len
, bl
);
407 void checksum(uint8_t type
, const bufferlist
&init_value_bl
,
408 uint64_t off
, uint64_t len
, size_t chunk_size
,
409 bufferlist
*pbl
, int *prval
, Context
*ctx
) {
410 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_CHECKSUM
);
411 osd_op
.op
.checksum
.offset
= off
;
412 osd_op
.op
.checksum
.length
= len
;
413 osd_op
.op
.checksum
.type
= type
;
414 osd_op
.op
.checksum
.chunk_size
= chunk_size
;
415 osd_op
.indata
.append(init_value_bl
);
417 unsigned p
= ops
.size() - 1;
420 out_handler
[p
] = ctx
;
424 void getxattr(const char *name
, bufferlist
*pbl
, int *prval
) {
426 add_xattr(CEPH_OSD_OP_GETXATTR
, name
, bl
);
427 unsigned p
= ops
.size() - 1;
431 struct C_ObjectOperation_decodevals
: public Context
{
432 uint64_t max_entries
;
434 std::map
<std::string
,bufferlist
> *pattrs
;
437 C_ObjectOperation_decodevals(uint64_t m
, std::map
<std::string
,bufferlist
> *pa
,
439 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
444 void finish(int r
) override
{
446 bufferlist::iterator p
= bl
.begin();
449 ::decode(*pattrs
, p
);
451 std::map
<std::string
,bufferlist
> ignore
;
457 ::decode(*ptruncated
, p
);
459 // the OSD did not provide this. since old OSDs do not
460 // enfoce omap result limits either, we can infer it from
461 // the size of the result
462 *ptruncated
= (pattrs
->size() == max_entries
);
466 catch (buffer::error
& e
) {
473 struct C_ObjectOperation_decodekeys
: public Context
{
474 uint64_t max_entries
;
476 std::set
<std::string
> *pattrs
;
479 C_ObjectOperation_decodekeys(uint64_t m
, std::set
<std::string
> *pa
, bool *pt
,
481 : max_entries(m
), pattrs(pa
), ptruncated(pt
), prval(pr
) {
486 void finish(int r
) override
{
488 bufferlist::iterator p
= bl
.begin();
491 ::decode(*pattrs
, p
);
493 std::set
<std::string
> ignore
;
499 ::decode(*ptruncated
, p
);
501 // the OSD did not provide this. since old OSDs do not
502 // enfoce omap result limits either, we can infer it from
503 // the size of the result
504 *ptruncated
= (pattrs
->size() == max_entries
);
508 catch (buffer::error
& e
) {
515 struct C_ObjectOperation_decodewatchers
: public Context
{
517 list
<obj_watch_t
> *pwatchers
;
519 C_ObjectOperation_decodewatchers(list
<obj_watch_t
> *pw
, int *pr
)
520 : pwatchers(pw
), prval(pr
) {}
521 void finish(int r
) override
{
523 bufferlist::iterator p
= bl
.begin();
525 obj_list_watch_response_t resp
;
528 for (list
<watch_item_t
>::iterator i
= resp
.entries
.begin() ;
529 i
!= resp
.entries
.end() ; ++i
) {
533 strncpy(ow
.addr
, sa
.str().c_str(), 256);
534 ow
.watcher_id
= i
->name
.num();
535 ow
.cookie
= i
->cookie
;
536 ow
.timeout_seconds
= i
->timeout_seconds
;
537 pwatchers
->push_back(ow
);
541 catch (buffer::error
& e
) {
548 struct C_ObjectOperation_decodesnaps
: public Context
{
550 librados::snap_set_t
*psnaps
;
552 C_ObjectOperation_decodesnaps(librados::snap_set_t
*ps
, int *pr
)
553 : psnaps(ps
), prval(pr
) {}
554 void finish(int r
) override
{
556 bufferlist::iterator p
= bl
.begin();
558 obj_list_snap_response_t resp
;
561 psnaps
->clones
.clear();
562 for (vector
<clone_info
>::iterator ci
= resp
.clones
.begin();
563 ci
!= resp
.clones
.end();
565 librados::clone_info_t clone
;
567 clone
.cloneid
= ci
->cloneid
;
568 clone
.snaps
.reserve(ci
->snaps
.size());
569 clone
.snaps
.insert(clone
.snaps
.end(), ci
->snaps
.begin(),
571 clone
.overlap
= ci
->overlap
;
572 clone
.size
= ci
->size
;
574 psnaps
->clones
.push_back(clone
);
576 psnaps
->seq
= resp
.seq
;
578 } catch (buffer::error
& e
) {
585 void getxattrs(std::map
<std::string
,bufferlist
> *pattrs
, int *prval
) {
586 add_op(CEPH_OSD_OP_GETXATTRS
);
587 if (pattrs
|| prval
) {
588 unsigned p
= ops
.size() - 1;
589 C_ObjectOperation_decodevals
*h
590 = new C_ObjectOperation_decodevals(0, pattrs
, nullptr, prval
);
596 void setxattr(const char *name
, const bufferlist
& bl
) {
597 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
599 void setxattr(const char *name
, const string
& s
) {
602 add_xattr(CEPH_OSD_OP_SETXATTR
, name
, bl
);
604 void cmpxattr(const char *name
, uint8_t cmp_op
, uint8_t cmp_mode
,
605 const bufferlist
& bl
) {
606 add_xattr_cmp(CEPH_OSD_OP_CMPXATTR
, name
, cmp_op
, cmp_mode
, bl
);
608 void rmxattr(const char *name
) {
610 add_xattr(CEPH_OSD_OP_RMXATTR
, name
, bl
);
612 void setxattrs(map
<string
, bufferlist
>& attrs
) {
615 add_xattr(CEPH_OSD_OP_RESETXATTRS
, 0, bl
.length());
617 void resetxattrs(const char *prefix
, map
<string
, bufferlist
>& attrs
) {
620 add_xattr(CEPH_OSD_OP_RESETXATTRS
, prefix
, bl
);
624 void tmap_update(bufferlist
& bl
) {
625 add_data(CEPH_OSD_OP_TMAPUP
, 0, 0, bl
);
627 void tmap_put(bufferlist
& bl
) {
628 add_data(CEPH_OSD_OP_TMAPPUT
, 0, bl
.length(), bl
);
630 void tmap_get(bufferlist
*pbl
, int *prval
) {
631 add_op(CEPH_OSD_OP_TMAPGET
);
632 unsigned p
= ops
.size() - 1;
637 add_op(CEPH_OSD_OP_TMAPGET
);
639 void tmap_to_omap(bool nullok
=false) {
640 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_TMAP2OMAP
);
642 osd_op
.op
.tmap2omap
.flags
= CEPH_OSD_TMAP2OMAP_NULLOK
;
646 void omap_get_keys(const string
&start_after
,
648 std::set
<std::string
> *out_set
,
651 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETKEYS
);
653 ::encode(start_after
, bl
);
654 ::encode(max_to_get
, bl
);
655 op
.op
.extent
.offset
= 0;
656 op
.op
.extent
.length
= bl
.length();
657 op
.indata
.claim_append(bl
);
658 if (prval
|| ptruncated
|| out_set
) {
659 unsigned p
= ops
.size() - 1;
660 C_ObjectOperation_decodekeys
*h
=
661 new C_ObjectOperation_decodekeys(max_to_get
, out_set
, ptruncated
, prval
);
668 void omap_get_vals(const string
&start_after
,
669 const string
&filter_prefix
,
671 std::map
<std::string
, bufferlist
> *out_set
,
674 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALS
);
676 ::encode(start_after
, bl
);
677 ::encode(max_to_get
, bl
);
678 ::encode(filter_prefix
, bl
);
679 op
.op
.extent
.offset
= 0;
680 op
.op
.extent
.length
= bl
.length();
681 op
.indata
.claim_append(bl
);
682 if (prval
|| out_set
|| ptruncated
) {
683 unsigned p
= ops
.size() - 1;
684 C_ObjectOperation_decodevals
*h
=
685 new C_ObjectOperation_decodevals(max_to_get
, out_set
, ptruncated
, prval
);
692 void omap_get_vals_by_keys(const std::set
<std::string
> &to_get
,
693 std::map
<std::string
, bufferlist
> *out_set
,
695 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAPGETVALSBYKEYS
);
697 ::encode(to_get
, bl
);
698 op
.op
.extent
.offset
= 0;
699 op
.op
.extent
.length
= bl
.length();
700 op
.indata
.claim_append(bl
);
701 if (prval
|| out_set
) {
702 unsigned p
= ops
.size() - 1;
703 C_ObjectOperation_decodevals
*h
=
704 new C_ObjectOperation_decodevals(0, out_set
, nullptr, prval
);
711 void omap_cmp(const std::map
<std::string
, pair
<bufferlist
,int> > &assertions
,
713 OSDOp
&op
= add_op(CEPH_OSD_OP_OMAP_CMP
);
715 ::encode(assertions
, bl
);
716 op
.op
.extent
.offset
= 0;
717 op
.op
.extent
.length
= bl
.length();
718 op
.indata
.claim_append(bl
);
720 unsigned p
= ops
.size() - 1;
725 struct C_ObjectOperation_copyget
: public Context
{
727 object_copy_cursor_t
*cursor
;
729 ceph::real_time
*out_mtime
;
730 std::map
<std::string
,bufferlist
> *out_attrs
;
731 bufferlist
*out_data
, *out_omap_header
, *out_omap_data
;
732 vector
<snapid_t
> *out_snaps
;
733 snapid_t
*out_snap_seq
;
735 uint32_t *out_data_digest
;
736 uint32_t *out_omap_digest
;
737 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
;
738 uint64_t *out_truncate_seq
;
739 uint64_t *out_truncate_size
;
741 C_ObjectOperation_copyget(object_copy_cursor_t
*c
,
744 std::map
<std::string
,bufferlist
> *a
,
745 bufferlist
*d
, bufferlist
*oh
,
747 std::vector
<snapid_t
> *osnaps
,
752 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *oreqids
,
757 out_size(s
), out_mtime(m
),
758 out_attrs(a
), out_data(d
), out_omap_header(oh
),
759 out_omap_data(o
), out_snaps(osnaps
), out_snap_seq(osnap_seq
),
760 out_flags(flags
), out_data_digest(dd
), out_omap_digest(od
),
762 out_truncate_seq(otseq
),
763 out_truncate_size(otsize
),
765 void finish(int r
) override
{
766 // reqids are copied on ENOENT
767 if (r
< 0 && r
!= -ENOENT
)
770 bufferlist::iterator p
= bl
.begin();
771 object_copy_data_t copy_reply
;
772 ::decode(copy_reply
, p
);
775 *out_reqids
= copy_reply
.reqids
;
779 *out_size
= copy_reply
.size
;
781 *out_mtime
= ceph::real_clock::from_ceph_timespec(copy_reply
.mtime
);
783 *out_attrs
= copy_reply
.attrs
;
785 out_data
->claim_append(copy_reply
.data
);
787 out_omap_header
->claim_append(copy_reply
.omap_header
);
789 *out_omap_data
= copy_reply
.omap_data
;
791 *out_snaps
= copy_reply
.snaps
;
793 *out_snap_seq
= copy_reply
.snap_seq
;
795 *out_flags
= copy_reply
.flags
;
797 *out_data_digest
= copy_reply
.data_digest
;
799 *out_omap_digest
= copy_reply
.omap_digest
;
801 *out_reqids
= copy_reply
.reqids
;
802 if (out_truncate_seq
)
803 *out_truncate_seq
= copy_reply
.truncate_seq
;
804 if (out_truncate_size
)
805 *out_truncate_size
= copy_reply
.truncate_size
;
806 *cursor
= copy_reply
.cursor
;
807 } catch (buffer::error
& e
) {
814 void copy_get(object_copy_cursor_t
*cursor
,
817 ceph::real_time
*out_mtime
,
818 std::map
<std::string
,bufferlist
> *out_attrs
,
819 bufferlist
*out_data
,
820 bufferlist
*out_omap_header
,
821 bufferlist
*out_omap_data
,
822 vector
<snapid_t
> *out_snaps
,
823 snapid_t
*out_snap_seq
,
825 uint32_t *out_data_digest
,
826 uint32_t *out_omap_digest
,
827 mempool::osd_pglog::vector
<pair
<osd_reqid_t
, version_t
> > *out_reqids
,
828 uint64_t *truncate_seq
,
829 uint64_t *truncate_size
,
831 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_GET
);
832 osd_op
.op
.copy_get
.max
= max
;
833 ::encode(*cursor
, osd_op
.indata
);
834 ::encode(max
, osd_op
.indata
);
835 unsigned p
= ops
.size() - 1;
837 C_ObjectOperation_copyget
*h
=
838 new C_ObjectOperation_copyget(cursor
, out_size
, out_mtime
,
839 out_attrs
, out_data
, out_omap_header
,
840 out_omap_data
, out_snaps
, out_snap_seq
,
841 out_flags
, out_data_digest
,
842 out_omap_digest
, out_reqids
, truncate_seq
,
843 truncate_size
, prval
);
849 add_op(CEPH_OSD_OP_UNDIRTY
);
852 struct C_ObjectOperation_isdirty
: public Context
{
856 C_ObjectOperation_isdirty(bool *p
, int *r
)
857 : pisdirty(p
), prval(r
) {}
858 void finish(int r
) override
{
862 bufferlist::iterator p
= bl
.begin();
864 ::decode(isdirty
, p
);
867 } catch (buffer::error
& e
) {
874 void is_dirty(bool *pisdirty
, int *prval
) {
875 add_op(CEPH_OSD_OP_ISDIRTY
);
876 unsigned p
= ops
.size() - 1;
878 C_ObjectOperation_isdirty
*h
=
879 new C_ObjectOperation_isdirty(pisdirty
, prval
);
884 struct C_ObjectOperation_hit_set_ls
: public Context
{
886 std::list
< std::pair
<time_t, time_t> > *ptls
;
887 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > *putls
;
889 C_ObjectOperation_hit_set_ls(std::list
< std::pair
<time_t, time_t> > *t
,
890 std::list
< std::pair
<ceph::real_time
,
891 ceph::real_time
> > *ut
,
893 : ptls(t
), putls(ut
), prval(r
) {}
894 void finish(int r
) override
{
898 bufferlist::iterator p
= bl
.begin();
899 std::list
< std::pair
<ceph::real_time
, ceph::real_time
> > ls
;
903 for (auto p
= ls
.begin(); p
!= ls
.end(); ++p
)
904 // round initial timestamp up to the next full second to
905 // keep this a valid interval.
907 make_pair(ceph::real_clock::to_time_t(
909 // Sadly, no time literals until C++14.
910 std::chrono::seconds(1))),
911 ceph::real_clock::to_time_t(p
->second
)));
915 } catch (buffer::error
& e
) {
924 * list available HitSets.
926 * We will get back a list of time intervals. Note that the most
927 * recent range may have an empty end timestamp if it is still
930 * @param pls [out] list of time intervals
931 * @param prval [out] return value
933 void hit_set_ls(std::list
< std::pair
<time_t, time_t> > *pls
, int *prval
) {
934 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
935 unsigned p
= ops
.size() - 1;
937 C_ObjectOperation_hit_set_ls
*h
=
938 new C_ObjectOperation_hit_set_ls(pls
, NULL
, prval
);
942 void hit_set_ls(std::list
<std::pair
<ceph::real_time
, ceph::real_time
> > *pls
,
944 add_op(CEPH_OSD_OP_PG_HITSET_LS
);
945 unsigned p
= ops
.size() - 1;
947 C_ObjectOperation_hit_set_ls
*h
=
948 new C_ObjectOperation_hit_set_ls(NULL
, pls
, prval
);
956 * Return an encoded HitSet that includes the provided time
959 * @param stamp [in] timestamp
960 * @param pbl [out] target buffer for encoded HitSet
961 * @param prval [out] return value
963 void hit_set_get(ceph::real_time stamp
, bufferlist
*pbl
, int *prval
) {
964 OSDOp
& op
= add_op(CEPH_OSD_OP_PG_HITSET_GET
);
965 op
.op
.hit_set_get
.stamp
= ceph::real_clock::to_ceph_timespec(stamp
);
966 unsigned p
= ops
.size() - 1;
971 void omap_get_header(bufferlist
*bl
, int *prval
) {
972 add_op(CEPH_OSD_OP_OMAPGETHEADER
);
973 unsigned p
= ops
.size() - 1;
978 void omap_set(const map
<string
, bufferlist
> &map
) {
981 add_data(CEPH_OSD_OP_OMAPSETVALS
, 0, bl
.length(), bl
);
984 void omap_set_header(bufferlist
&bl
) {
985 add_data(CEPH_OSD_OP_OMAPSETHEADER
, 0, bl
.length(), bl
);
989 add_op(CEPH_OSD_OP_OMAPCLEAR
);
992 void omap_rm_keys(const std::set
<std::string
> &to_remove
) {
994 ::encode(to_remove
, bl
);
995 add_data(CEPH_OSD_OP_OMAPRMKEYS
, 0, bl
.length(), bl
);
999 void call(const char *cname
, const char *method
, bufferlist
&indata
) {
1000 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, NULL
, NULL
, NULL
);
1003 void call(const char *cname
, const char *method
, bufferlist
&indata
,
1004 bufferlist
*outdata
, Context
*ctx
, int *prval
) {
1005 add_call(CEPH_OSD_OP_CALL
, cname
, method
, indata
, outdata
, ctx
, prval
);
1009 void watch(uint64_t cookie
, __u8 op
, uint32_t timeout
= 0) {
1010 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_WATCH
);
1011 osd_op
.op
.watch
.cookie
= cookie
;
1012 osd_op
.op
.watch
.op
= op
;
1013 osd_op
.op
.watch
.timeout
= timeout
;
1016 void notify(uint64_t cookie
, uint32_t prot_ver
, uint32_t timeout
,
1017 bufferlist
&bl
, bufferlist
*inbl
) {
1018 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY
);
1019 osd_op
.op
.notify
.cookie
= cookie
;
1020 ::encode(prot_ver
, *inbl
);
1021 ::encode(timeout
, *inbl
);
1022 ::encode(bl
, *inbl
);
1023 osd_op
.indata
.append(*inbl
);
1026 void notify_ack(uint64_t notify_id
, uint64_t cookie
,
1027 bufferlist
& reply_bl
) {
1028 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_NOTIFY_ACK
);
1030 ::encode(notify_id
, bl
);
1031 ::encode(cookie
, bl
);
1032 ::encode(reply_bl
, bl
);
1033 osd_op
.indata
.append(bl
);
1036 void list_watchers(list
<obj_watch_t
> *out
,
1038 (void)add_op(CEPH_OSD_OP_LIST_WATCHERS
);
1040 unsigned p
= ops
.size() - 1;
1041 C_ObjectOperation_decodewatchers
*h
=
1042 new C_ObjectOperation_decodewatchers(out
, prval
);
1045 out_rval
[p
] = prval
;
1049 void list_snaps(librados::snap_set_t
*out
, int *prval
) {
1050 (void)add_op(CEPH_OSD_OP_LIST_SNAPS
);
1052 unsigned p
= ops
.size() - 1;
1053 C_ObjectOperation_decodesnaps
*h
=
1054 new C_ObjectOperation_decodesnaps(out
, prval
);
1057 out_rval
[p
] = prval
;
1061 void assert_version(uint64_t ver
) {
1062 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ASSERT_VER
);
1063 osd_op
.op
.assert_ver
.ver
= ver
;
1066 void cmpxattr(const char *name
, const bufferlist
& val
,
1068 add_xattr(CEPH_OSD_OP_CMPXATTR
, name
, val
);
1069 OSDOp
& o
= *ops
.rbegin();
1070 o
.op
.xattr
.cmp_op
= op
;
1071 o
.op
.xattr
.cmp_mode
= mode
;
1074 void rollback(uint64_t snapid
) {
1075 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_ROLLBACK
);
1076 osd_op
.op
.snap
.snapid
= snapid
;
1079 void copy_from(object_t src
, snapid_t snapid
, object_locator_t src_oloc
,
1080 version_t src_version
, unsigned flags
,
1081 unsigned src_fadvise_flags
) {
1082 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_COPY_FROM
);
1083 osd_op
.op
.copy_from
.snapid
= snapid
;
1084 osd_op
.op
.copy_from
.src_version
= src_version
;
1085 osd_op
.op
.copy_from
.flags
= flags
;
1086 osd_op
.op
.copy_from
.src_fadvise_flags
= src_fadvise_flags
;
1087 ::encode(src
, osd_op
.indata
);
1088 ::encode(src_oloc
, osd_op
.indata
);
1092 * writeback content to backing tier
1094 * If object is marked dirty in the cache tier, write back content
1095 * to backing tier. If the object is clean this is a no-op.
1097 * If writeback races with an update, the update will block.
1099 * use with IGNORE_CACHE to avoid triggering promote.
1101 void cache_flush() {
1102 add_op(CEPH_OSD_OP_CACHE_FLUSH
);
1106 * writeback content to backing tier
1108 * If object is marked dirty in the cache tier, write back content
1109 * to backing tier. If the object is clean this is a no-op.
1111 * If writeback races with an update, return EAGAIN. Requires that
1112 * the SKIPRWLOCKS flag be set.
1114 * use with IGNORE_CACHE to avoid triggering promote.
1116 void cache_try_flush() {
1117 add_op(CEPH_OSD_OP_CACHE_TRY_FLUSH
);
1121 * evict object from cache tier
1123 * If object is marked clean, remove the object from the cache tier.
1124 * Otherwise, return EBUSY.
1126 * use with IGNORE_CACHE to avoid triggering promote.
1128 void cache_evict() {
1129 add_op(CEPH_OSD_OP_CACHE_EVICT
);
1135 void set_redirect(object_t tgt
, snapid_t snapid
, object_locator_t tgt_oloc
,
1136 version_t tgt_version
) {
1137 OSDOp
& osd_op
= add_op(CEPH_OSD_OP_SET_REDIRECT
);
1138 osd_op
.op
.copy_from
.snapid
= snapid
;
1139 osd_op
.op
.copy_from
.src_version
= tgt_version
;
1140 ::encode(tgt
, osd_op
.indata
);
1141 ::encode(tgt_oloc
, osd_op
.indata
);
1144 void set_alloc_hint(uint64_t expected_object_size
,
1145 uint64_t expected_write_size
,
1147 add_alloc_hint(CEPH_OSD_OP_SETALLOCHINT
, expected_object_size
,
1148 expected_write_size
, flags
);
1150 // CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
1151 // not worth a feature bit. Set FAILOK per-op flag to make
1152 // sure older osds don't trip over an unsupported opcode.
1153 set_last_op_flags(CEPH_OSD_OP_FLAG_FAILOK
);
1156 void dup(vector
<OSDOp
>& sops
) {
1158 out_bl
.resize(sops
.size());
1159 out_handler
.resize(sops
.size());
1160 out_rval
.resize(sops
.size());
1161 for (uint32_t i
= 0; i
< sops
.size(); i
++) {
1162 out_bl
[i
] = &sops
[i
].outdata
;
1163 out_handler
[i
] = NULL
;
1164 out_rval
[i
] = &sops
[i
].rval
;
1169 * Pin/unpin an object in cache tier
1172 add_op(CEPH_OSD_OP_CACHE_PIN
);
1175 void cache_unpin() {
1176 add_op(CEPH_OSD_OP_CACHE_UNPIN
);
1184 class Objecter
: public md_config_obs_t
, public Dispatcher
{
1186 // config observer bits
1187 const char** get_tracked_conf_keys() const override
;
1188 void handle_conf_change(const struct md_config_t
*conf
,
1189 const std::set
<std::string
> &changed
) override
;
1192 Messenger
*messenger
;
1195 ZTracer::Endpoint trace_endpoint
;
1199 using Dispatcher::cct
;
1200 std::multimap
<string
,string
> crush_location
;
1202 std::atomic
<bool> initialized
{false};
1205 std::atomic
<uint64_t> last_tid
{0};
1206 std::atomic
<unsigned> inflight_ops
{0};
1207 std::atomic
<int> client_inc
{-1};
1208 uint64_t max_linger_id
;
1209 std::atomic
<unsigned> num_in_flight
{0};
1210 std::atomic
<int> global_op_flags
{0}; // flags which are applied to each IO op
1211 bool keep_balanced_budget
;
1212 bool honor_osdmap_full
;
1213 bool osdmap_full_try
;
1215 // If this is true, accumulate a set of blacklisted entities
1216 // to be drained by consume_blacklist_events.
1217 bool blacklist_events_enabled
;
1218 std::set
<entity_addr_t
> blacklist_events
;
1221 void maybe_request_map();
1223 void enable_blacklist_events();
1226 void _maybe_request_map();
1228 version_t last_seen_osdmap_version
;
1229 version_t last_seen_pgmap_version
;
1231 mutable boost::shared_mutex rwlock
;
1232 using lock_guard
= std::unique_lock
<decltype(rwlock
)>;
1233 using unique_lock
= std::unique_lock
<decltype(rwlock
)>;
1234 using shared_lock
= boost::shared_lock
<decltype(rwlock
)>;
1235 using shunique_lock
= ceph::shunique_lock
<decltype(rwlock
)>;
1236 ceph::timer
<ceph::mono_clock
> timer
;
1238 PerfCounters
*logger
;
1240 uint64_t tick_event
;
1244 void update_crush_location();
1246 class RequestStateHook
;
1248 RequestStateHook
*m_request_state_hook
;
1251 /*** track pending operations ***/
1257 struct op_target_t
{
1260 epoch_t epoch
= 0; ///< latest epoch we calculated the mapping
1263 object_locator_t base_oloc
;
1264 object_t target_oid
;
1265 object_locator_t target_oloc
;
1267 ///< true if we are directed at base_pgid, not base_oid
1268 bool precalc_pgid
= false;
1270 ///< true if we have ever mapped to a valid pool
1271 bool pool_ever_existed
= false;
1273 ///< explcit pg target, if any
1276 pg_t pgid
; ///< last (raw) pg we mapped to
1277 spg_t actual_pgid
; ///< last (actual) spg_t we mapped to
1278 unsigned pg_num
= 0; ///< last pg_num we mapped to
1279 unsigned pg_num_mask
= 0; ///< last pg_num_mask we mapped to
1280 vector
<int> up
; ///< set of up osds for last pg we mapped to
1281 vector
<int> acting
; ///< set of acting osds for last pg we mapped to
1282 int up_primary
= -1; ///< last up_primary we mapped to
1283 int acting_primary
= -1; ///< last acting_primary we mapped to
1284 int size
= -1; ///< the size of the pool when were were last mapped
1285 int min_size
= -1; ///< the min size of the pool when were were last mapped
1286 bool sort_bitwise
= false; ///< whether the hobject_t sort order is bitwise
1287 bool recovery_deletes
= false; ///< whether the deletes are performed during recovery instead of peering
1289 bool used_replica
= false;
1290 bool paused
= false;
1292 int osd
= -1; ///< the final target osd, or -1
1294 epoch_t last_force_resend
= 0;
1296 op_target_t(object_t oid
, object_locator_t oloc
, int flags
)
1302 op_target_t(pg_t pgid
)
1303 : base_oloc(pgid
.pool(), pgid
.ps()),
1308 op_target_t() = default;
1310 hobject_t
get_hobj() {
1311 return hobject_t(target_oid
,
1314 target_oloc
.hash
>= 0 ? target_oloc
.hash
: pgid
.ps(),
1316 target_oloc
.nspace
);
1319 bool contained_by(const hobject_t
& begin
, const hobject_t
& end
) {
1320 hobject_t h
= get_hobj();
1321 int r
= cmp(h
, begin
);
1322 return r
== 0 || (r
> 0 && h
< end
);
1325 void dump(Formatter
*f
) const;
1328 struct Op
: public RefCountedObject
{
1329 OSDSession
*session
;
1334 ConnectionRef con
; // for rx buffer only
1335 uint64_t features
; // explicitly specified op features
1341 ceph::real_time mtime
;
1344 vector
<bufferlist
*> out_bl
;
1345 vector
<Context
*> out_handler
;
1346 vector
<int*> out_rval
;
1356 epoch_t
*reply_epoch
;
1358 ceph::mono_time stamp
;
1360 epoch_t map_dne_bound
;
1364 /// true if we should resend this message on failure
1367 /// true if the throttle budget is get/put on a series of OPs,
1368 /// instead of per OP basis, when this flag is set, the budget is
1369 /// acquired before sending the very first OP of the series and
1370 /// released upon receiving the last OP reply.
1375 osd_reqid_t reqid
; // explicitly setting reqid
1376 ZTracer::Trace trace
;
1378 Op(const object_t
& o
, const object_locator_t
& ol
, vector
<OSDOp
>& op
,
1379 int f
, Context
*fin
, version_t
*ov
, int *offset
= NULL
,
1380 ZTracer::Trace
*parent_trace
= nullptr) :
1381 session(NULL
), incarnation(0),
1384 features(CEPH_FEATURES_SUPPORTED_DEFAULT
),
1385 snapid(CEPH_NOSNAP
),
1396 should_resend(true),
1397 ctx_budgeted(false),
1398 data_offset(offset
) {
1401 /* initialize out_* to match op vector */
1402 out_bl
.resize(ops
.size());
1403 out_rval
.resize(ops
.size());
1404 out_handler
.resize(ops
.size());
1405 for (unsigned i
= 0; i
< ops
.size(); i
++) {
1407 out_handler
[i
] = NULL
;
1411 if (target
.base_oloc
.key
== o
)
1412 target
.base_oloc
.key
.clear();
1414 if (parent_trace
&& parent_trace
->valid()) {
1415 trace
.init("op", nullptr, parent_trace
);
1416 trace
.event("start");
1420 bool operator<(const Op
& other
) const {
1421 return tid
< other
.tid
;
1424 bool respects_full() const {
1426 (target
.flags
& (CEPH_OSD_FLAG_WRITE
| CEPH_OSD_FLAG_RWORDERED
)) &&
1427 !(target
.flags
& (CEPH_OSD_FLAG_FULL_TRY
| CEPH_OSD_FLAG_FULL_FORCE
));
1432 while (!out_handler
.empty()) {
1433 delete out_handler
.back();
1434 out_handler
.pop_back();
1436 trace
.event("finish");
1440 struct C_Op_Map_Latest
: public Context
{
1444 C_Op_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1446 void finish(int r
) override
;
1449 struct C_Command_Map_Latest
: public Context
{
1453 C_Command_Map_Latest(Objecter
*o
, ceph_tid_t t
) : objecter(o
), tid(t
),
1455 void finish(int r
) override
;
1458 struct C_Stat
: public Context
{
1461 ceph::real_time
*pmtime
;
1463 C_Stat(uint64_t *ps
, ceph::real_time
*pm
, Context
*c
) :
1464 psize(ps
), pmtime(pm
), fin(c
) {}
1465 void finish(int r
) override
{
1467 bufferlist::iterator p
= bl
.begin();
1481 struct C_GetAttrs
: public Context
{
1483 map
<string
,bufferlist
>& attrset
;
1485 C_GetAttrs(map
<string
, bufferlist
>& set
, Context
*c
) : attrset(set
),
1487 void finish(int r
) override
{
1489 bufferlist::iterator p
= bl
.begin();
1490 ::decode(attrset
, p
);
1497 // Pools and statistics
1498 struct NListContext
{
1499 collection_list_handle_t pos
;
1501 // these are for !sortbitwise compat only
1503 int starting_pg_num
= 0;
1504 bool sort_bitwise
= false;
1506 bool at_end_of_pool
= false; ///< publicly visible end flag
1508 int64_t pool_id
= -1;
1509 int pool_snap_seq
= 0;
1510 uint64_t max_entries
= 0;
1513 bufferlist bl
; // raw data read to here
1514 std::list
<librados::ListObjectImpl
> list
;
1518 bufferlist extra_info
;
1520 // The budget associated with this context, once it is set (>= 0),
1521 // the budget is not get/released on OP basis, instead the budget
1522 // is acquired before sending the first OP and released upon receiving
1523 // the last op reply.
1524 int ctx_budget
= -1;
1526 bool at_end() const {
1527 return at_end_of_pool
;
1530 uint32_t get_pg_hash_position() const {
1531 return pos
.get_hash();
1535 struct C_NList
: public Context
{
1536 NListContext
*list_context
;
1537 Context
*final_finish
;
1540 C_NList(NListContext
*lc
, Context
* finish
, Objecter
*ob
) :
1541 list_context(lc
), final_finish(finish
), objecter(ob
), epoch(0) {}
1542 void finish(int r
) override
{
1544 objecter
->_nlist_reply(list_context
, r
, final_finish
, epoch
);
1546 final_finish
->complete(r
);
1555 map
<string
,pool_stat_t
> *pool_stats
;
1559 ceph::mono_time last_submit
;
1564 struct ceph_statfs
*stats
;
1565 boost::optional
<int64_t> data_pool
;
1569 ceph::mono_time last_submit
;
1584 ceph::mono_time last_submit
;
1585 PoolOp() : tid(0), pool(0), onfinish(NULL
), ontimeout(0), pool_op(0),
1586 auid(0), crush_rule(0), snapid(0), blp(NULL
) {}
1589 // -- osd commands --
1590 struct CommandOp
: public RefCountedObject
{
1591 OSDSession
*session
= nullptr;
1595 bufferlist
*poutbl
= nullptr;
1596 string
*prs
= nullptr;
1598 // target_osd == -1 means target_pg is valid
1599 const int target_osd
= -1;
1600 const pg_t target_pg
;
1604 epoch_t map_dne_bound
= 0;
1605 int map_check_error
= 0; // error to return if map check fails
1606 const char *map_check_error_str
= nullptr;
1608 Context
*onfinish
= nullptr;
1609 uint64_t ontimeout
= 0;
1610 ceph::mono_time last_submit
;
1614 const vector
<string
> &cmd
,
1623 target_osd(target_osd
),
1624 onfinish(onfinish
) {}
1628 const vector
<string
> &cmd
,
1639 onfinish(onfinish
) {}
1643 void submit_command(CommandOp
*c
, ceph_tid_t
*ptid
);
1644 int _calc_command_target(CommandOp
*c
, shunique_lock
&sul
);
1645 void _assign_command_session(CommandOp
*c
, shunique_lock
&sul
);
1646 void _send_command(CommandOp
*c
);
1647 int command_op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
1648 void _finish_command(CommandOp
*c
, int r
, string rs
);
1649 void handle_command_reply(MCommandReply
*m
);
1652 // -- lingering ops --
1654 struct WatchContext
{
1655 // this simply mirrors librados WatchCtx2
1656 virtual void handle_notify(uint64_t notify_id
,
1658 uint64_t notifier_id
,
1659 bufferlist
& bl
) = 0;
1660 virtual void handle_error(uint64_t cookie
, int err
) = 0;
1661 virtual ~WatchContext() {}
1664 struct LingerOp
: public RefCountedObject
{
1671 ceph::real_time mtime
;
1679 ceph::mono_time watch_valid_thru
; ///< send time for last acked ping
1680 int last_error
; ///< error from last failed ping|reconnect, if any
1681 boost::shared_mutex watch_lock
;
1682 using lock_guard
= std::unique_lock
<decltype(watch_lock
)>;
1683 using unique_lock
= std::unique_lock
<decltype(watch_lock
)>;
1684 using shared_lock
= boost::shared_lock
<decltype(watch_lock
)>;
1685 using shunique_lock
= ceph::shunique_lock
<decltype(watch_lock
)>;
1687 // queue of pending async operations, with the timestamp of
1688 // when they were queued.
1689 list
<ceph::mono_time
> watch_pending_async
;
1691 uint32_t register_gen
;
1694 Context
*on_reg_commit
;
1696 // we trigger these from an async finisher
1697 Context
*on_notify_finish
;
1698 bufferlist
*notify_result_bl
;
1701 WatchContext
*watch_context
;
1703 OSDSession
*session
;
1705 ceph_tid_t register_tid
;
1706 ceph_tid_t ping_tid
;
1707 epoch_t map_dne_bound
;
// Record the time at which an async watch/notify event was queued;
// finished_async() pops these entries when the events complete.
1709 void _queued_async() {
1710 // watch_lock must be held in unique (exclusive) mode by the caller
1711 watch_pending_async
.push_back(ceph::mono_clock::now());
// Mark completion of the oldest queued async event: takes watch_lock
// exclusively itself and pops the timestamp queued by _queued_async().
1713 void finished_async() {
1714 unique_lock
l(watch_lock
);
1715 assert(!watch_pending_async
.empty());
1716 watch_pending_async
.pop_front();
1719 LingerOp() : linger_id(0),
1720 target(object_t(), object_locator_t(), 0),
1721 snap(CEPH_NOSNAP
), poutbl(NULL
), pobjver(NULL
),
1722 is_watch(false), last_error(0),
1726 on_reg_commit(NULL
),
1727 on_notify_finish(NULL
),
1728 notify_result_bl(NULL
),
1730 watch_context(NULL
),
1737 const LingerOp
&operator=(const LingerOp
& r
);
1738 LingerOp(const LingerOp
& o
);
1740 uint64_t get_cookie() {
1741 return reinterpret_cast<uint64_t>(this);
1745 ~LingerOp() override
{
1746 delete watch_context
;
1750 struct C_Linger_Commit
: public Context
{
1753 bufferlist outbl
; // used for notify only
1754 C_Linger_Commit(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1757 ~C_Linger_Commit() override
{
1760 void finish(int r
) override
{
1761 objecter
->_linger_commit(info
, r
, outbl
);
1765 struct C_Linger_Reconnect
: public Context
{
1768 C_Linger_Reconnect(Objecter
*o
, LingerOp
*l
) : objecter(o
), info(l
) {
1771 ~C_Linger_Reconnect() override
{
1774 void finish(int r
) override
{
1775 objecter
->_linger_reconnect(info
, r
);
1779 struct C_Linger_Ping
: public Context
{
1782 ceph::mono_time sent
;
1783 uint32_t register_gen
;
1784 C_Linger_Ping(Objecter
*o
, LingerOp
*l
)
1785 : objecter(o
), info(l
), register_gen(info
->register_gen
) {
1788 ~C_Linger_Ping() override
{
1791 void finish(int r
) override
{
1792 objecter
->_linger_ping(info
, r
, sent
, register_gen
);
1796 struct C_Linger_Map_Latest
: public Context
{
1800 C_Linger_Map_Latest(Objecter
*o
, uint64_t id
) :
1801 objecter(o
), linger_id(id
), latest(0) {}
1802 void finish(int r
) override
;
1805 // -- osd sessions --
1809 hobject_t begin
, end
;
1812 struct OSDSession
: public RefCountedObject
{
1813 boost::shared_mutex lock
;
1814 using lock_guard
= std::lock_guard
<decltype(lock
)>;
1815 using unique_lock
= std::unique_lock
<decltype(lock
)>;
1816 using shared_lock
= boost::shared_lock
<decltype(lock
)>;
1817 using shunique_lock
= ceph::shunique_lock
<decltype(lock
)>;
1820 map
<ceph_tid_t
,Op
*> ops
;
1821 map
<uint64_t, LingerOp
*> linger_ops
;
1822 map
<ceph_tid_t
,CommandOp
*> command_ops
;
1825 map
<spg_t
,map
<hobject_t
,OSDBackoff
>> backoffs
;
1826 map
<uint64_t,OSDBackoff
*> backoffs_by_id
;
1832 std::unique_ptr
<std::mutex
[]> completion_locks
;
1833 using unique_completion_lock
= std::unique_lock
<
1834 decltype(completion_locks
)::element_type
>;
1837 OSDSession(CephContext
*cct
, int o
) :
1838 osd(o
), incarnation(0), con(NULL
),
1839 num_locks(cct
->_conf
->objecter_completion_locks_per_session
),
1840 completion_locks(new std::mutex
[num_locks
]) {}
1842 ~OSDSession() override
;
1844 bool is_homeless() { return (osd
== -1); }
1846 unique_completion_lock
get_lock(object_t
& oid
);
1848 map
<int,OSDSession
*> osd_sessions
;
1850 bool osdmap_full_flag() const;
1851 bool osdmap_pool_full(const int64_t pool_id
) const;
1856 * Test pg_pool_t::FLAG_FULL on a pool
1858 * @return true if the pool exists and has the flag set, or
1859 * the global full flag is set, else false
1861 bool _osdmap_pool_full(const int64_t pool_id
) const;
1862 bool _osdmap_pool_full(const pg_pool_t
&p
) const;
1863 void update_pool_full_map(map
<int64_t, bool>& pool_full_map
);
1865 map
<uint64_t, LingerOp
*> linger_ops
;
1866 // we use this just to confirm a cookie is valid before dereferencing the ptr
1867 set
<LingerOp
*> linger_ops_set
;
1869 map
<ceph_tid_t
,PoolStatOp
*> poolstat_ops
;
1870 map
<ceph_tid_t
,StatfsOp
*> statfs_ops
;
1871 map
<ceph_tid_t
,PoolOp
*> pool_ops
;
1872 std::atomic
<unsigned> num_homeless_ops
{0};
1874 OSDSession
*homeless_session
;
1876 // ops waiting for an osdmap with a new pool or confirmation that
1877 // the pool does not exist (may be expanded to other uses later)
1878 map
<uint64_t, LingerOp
*> check_latest_map_lingers
;
1879 map
<ceph_tid_t
, Op
*> check_latest_map_ops
;
1880 map
<ceph_tid_t
, CommandOp
*> check_latest_map_commands
;
1882 map
<epoch_t
,list
< pair
<Context
*, int> > > waiting_for_map
;
1884 ceph::timespan mon_timeout
;
1885 ceph::timespan osd_timeout
;
1887 MOSDOp
*_prepare_osd_op(Op
*op
);
1888 void _send_op(Op
*op
, MOSDOp
*m
= NULL
);
1889 void _send_op_account(Op
*op
);
1890 void _cancel_linger_op(Op
*op
);
1891 void finish_op(OSDSession
*session
, ceph_tid_t tid
);
1892 void _finish_op(Op
*op
, int r
);
1893 static bool is_pg_changed(
1895 const vector
<int>& oldacting
,
1897 const vector
<int>& newacting
,
1898 bool any_change
=false);
1899 enum recalc_op_target_result
{
1900 RECALC_OP_TARGET_NO_ACTION
= 0,
1901 RECALC_OP_TARGET_NEED_RESEND
,
1902 RECALC_OP_TARGET_POOL_DNE
,
1903 RECALC_OP_TARGET_OSD_DNE
,
1904 RECALC_OP_TARGET_OSD_DOWN
,
1906 bool _osdmap_full_flag() const;
1907 bool _osdmap_has_pool_full() const;
1909 bool target_should_be_paused(op_target_t
*op
);
1910 int _calc_target(op_target_t
*t
, Connection
*con
,
1911 bool any_change
= false);
1912 int _map_session(op_target_t
*op
, OSDSession
**s
,
1915 void _session_op_assign(OSDSession
*s
, Op
*op
);
1916 void _session_op_remove(OSDSession
*s
, Op
*op
);
1917 void _session_linger_op_assign(OSDSession
*to
, LingerOp
*op
);
1918 void _session_linger_op_remove(OSDSession
*from
, LingerOp
*op
);
1919 void _session_command_op_assign(OSDSession
*to
, CommandOp
*op
);
1920 void _session_command_op_remove(OSDSession
*from
, CommandOp
*op
);
1922 int _assign_op_target_session(Op
*op
, shunique_lock
& lc
,
1923 bool src_session_locked
,
1924 bool dst_session_locked
);
1925 int _recalc_linger_op_target(LingerOp
*op
, shunique_lock
& lc
);
1927 void _linger_submit(LingerOp
*info
, shunique_lock
& sul
);
1928 void _send_linger(LingerOp
*info
, shunique_lock
& sul
);
1929 void _linger_commit(LingerOp
*info
, int r
, bufferlist
& outbl
);
1930 void _linger_reconnect(LingerOp
*info
, int r
);
1931 void _send_linger_ping(LingerOp
*info
);
1932 void _linger_ping(LingerOp
*info
, int r
, ceph::mono_time sent
,
1933 uint32_t register_gen
);
1934 int _normalize_watch_error(int r
);
1936 friend class C_DoWatchError
;
1938 void linger_callback_flush(Context
*ctx
) {
1939 finisher
->queue(ctx
);
1943 void _check_op_pool_dne(Op
*op
, unique_lock
*sl
);
1944 void _send_op_map_check(Op
*op
);
1945 void _op_cancel_map_check(Op
*op
);
1946 void _check_linger_pool_dne(LingerOp
*op
, bool *need_unregister
);
1947 void _send_linger_map_check(LingerOp
*op
);
1948 void _linger_cancel_map_check(LingerOp
*op
);
1949 void _check_command_map_dne(CommandOp
*op
);
1950 void _send_command_map_check(CommandOp
*op
);
1951 void _command_cancel_map_check(CommandOp
*op
);
1953 void kick_requests(OSDSession
*session
);
1954 void _kick_requests(OSDSession
*session
, map
<uint64_t, LingerOp
*>& lresend
);
1955 void _linger_ops_resend(map
<uint64_t, LingerOp
*>& lresend
, unique_lock
& ul
);
1957 int _get_session(int osd
, OSDSession
**session
, shunique_lock
& sul
);
1958 void put_session(OSDSession
*s
);
1959 void get_session(OSDSession
*s
);
1960 void _reopen_session(OSDSession
*session
);
1961 void close_session(OSDSession
*session
);
1963 void _nlist_reply(NListContext
*list_context
, int r
, Context
*final_finish
,
1964 epoch_t reply_epoch
);
1966 void resend_mon_ops();
1969 * handle a budget for in-flight ops
1970 * budget is taken whenever an op goes into the ops map
1971 * and returned whenever an op is removed from the map
1972 * If throttle_op needs to throttle it will unlock client_lock.
1974 int calc_op_budget(Op
*op
);
1975 void _throttle_op(Op
*op
, shunique_lock
& sul
, int op_size
= 0);
// Reserve throttle budget for an op before it is sent.  In balanced-budget
// mode this goes through _throttle_op() (which may block); otherwise the
// byte and op-count throttles are charged directly.  Marks the op as
// budgeted.  rwlock must be held (asserted via the passed shunique_lock).
1976 int _take_op_budget(Op
*op
, shunique_lock
& sul
) {
1977 assert(sul
&& sul
.mutex() == &rwlock
);
1978 int op_budget
= calc_op_budget(op
);
1979 if (keep_balanced_budget
) {
1980 _throttle_op(op
, sul
, op_budget
);
1982 op_throttle_bytes
.take(op_budget
);
1983 op_throttle_ops
.take(1);
1985 op
->budgeted
= true;
// Return previously reserved budget to the throttles: op_budget bytes
// and one op slot.  op_budget must be non-negative.
1988 void put_op_budget_bytes(int op_budget
) {
1989 assert(op_budget
>= 0);
1990 op_throttle_bytes
.put(op_budget
);
1991 op_throttle_ops
.put(1);
// Release the budget held by a budgeted op: recompute its cost via
// calc_op_budget() and hand it back through put_op_budget_bytes().
1993 void put_op_budget(Op
*op
) {
1994 assert(op
->budgeted
);
1995 int op_budget
= calc_op_budget(op
);
1996 put_op_budget_bytes(op_budget
);
1998 void put_nlist_context_budget(NListContext
*list_context
);
1999 Throttle op_throttle_bytes
, op_throttle_ops
;
2002 Objecter(CephContext
*cct_
, Messenger
*m
, MonClient
*mc
,
2005 double osd_timeout
) :
2006 Dispatcher(cct_
), messenger(m
), monc(mc
), finisher(fin
),
2007 trace_endpoint("0.0.0.0", 0, "Objecter"),
2010 keep_balanced_budget(false), honor_osdmap_full(true), osdmap_full_try(false),
2011 blacklist_events_enabled(false),
2012 last_seen_osdmap_version(0), last_seen_pgmap_version(0),
2013 logger(NULL
), tick_event(0), m_request_state_hook(NULL
),
2014 homeless_session(new OSDSession(cct
, -1)),
2015 mon_timeout(ceph::make_timespan(mon_timeout
)),
2016 osd_timeout(ceph::make_timespan(osd_timeout
)),
2017 op_throttle_bytes(cct
, "objecter_bytes",
2018 cct
->_conf
->objecter_inflight_op_bytes
),
2019 op_throttle_ops(cct
, "objecter_ops", cct
->_conf
->objecter_inflight_ops
),
2021 retry_writes_after_first_reply(cct
->_conf
->objecter_retry_writes_after_first_reply
)
2023 ~Objecter() override
;
2026 void start(const OSDMap
*o
= nullptr);
2029 // These two templates replace osdmap_(get)|(put)_read. Simply wrap
2030 // whatever functionality you want to use the OSDMap in a lambda like:
2032 // with_osdmap([](const OSDMap& o) { o.do_stuff(); });
2036 // auto t = with_osdmap([&](const OSDMap& o) { return o.lookup_stuff(x); });
2038 // Do not call into something that will try to lock the OSDMap from
2039 // here or you will have great woe and misery.
2041 template<typename Callback
, typename
...Args
>
2042 auto with_osdmap(Callback
&& cb
, Args
&&... args
) const ->
2043 decltype(cb(*osdmap
, std::forward
<Args
>(args
)...)) {
2044 shared_lock
l(rwlock
);
2045 return std::forward
<Callback
>(cb
)(*osdmap
, std::forward
<Args
>(args
)...);
2050 * Tell the objecter to throttle outgoing ops according to its
2051 * budget (in _conf). If you do this, ops can block, in
2052 * which case it will unlock client_lock and sleep until
2053 * incoming messages reduce the used budget low enough for
2054 * the ops to continue going; then it will lock client_lock again.
2056 void set_balanced_budget() { keep_balanced_budget
= true; }
2057 void unset_balanced_budget() { keep_balanced_budget
= false; }
2059 void set_honor_osdmap_full() { honor_osdmap_full
= true; }
2060 void unset_honor_osdmap_full() { honor_osdmap_full
= false; }
2062 void set_osdmap_full_try() { osdmap_full_try
= true; }
2063 void unset_osdmap_full_try() { osdmap_full_try
= false; }
2065 void _scan_requests(OSDSession
*s
,
2068 map
<int64_t, bool> *pool_full_map
,
2069 map
<ceph_tid_t
, Op
*>& need_resend
,
2070 list
<LingerOp
*>& need_resend_linger
,
2071 map
<ceph_tid_t
, CommandOp
*>& need_resend_command
,
2072 shunique_lock
& sul
);
2074 int64_t get_object_hash_position(int64_t pool
, const string
& key
,
2076 int64_t get_object_pg_hash_position(int64_t pool
, const string
& key
,
2081 bool ms_dispatch(Message
*m
) override
;
2082 bool ms_can_fast_dispatch_any() const override
{
2085 bool ms_can_fast_dispatch(const Message
*m
) const override
{
2086 switch (m
->get_type()) {
2087 case CEPH_MSG_OSD_OPREPLY
:
2088 case CEPH_MSG_WATCH_NOTIFY
:
2094 void ms_fast_dispatch(Message
*m
) override
{
2095 if (!ms_dispatch(m
)) {
2100 void handle_osd_op_reply(class MOSDOpReply
*m
);
2101 void handle_osd_backoff(class MOSDBackoff
*m
);
2102 void handle_watch_notify(class MWatchNotify
*m
);
2103 void handle_osd_map(class MOSDMap
*m
);
2104 void wait_for_osd_map();
2107 * Get list of entities blacklisted since this was last called,
2108 * and reset the list.
2110 * Uses a std::set because typical use case is to compare some
2111 * other list of clients to see which overlap with the blacklisted
2115 void consume_blacklist_events(std::set
<entity_addr_t
> *events
);
2117 int pool_snap_by_name(int64_t poolid
,
2118 const char *snap_name
,
2119 snapid_t
*snap
) const;
2120 int pool_snap_get_info(int64_t poolid
, snapid_t snap
,
2121 pool_snap_info_t
*info
) const;
2122 int pool_snap_list(int64_t poolid
, vector
<uint64_t> *snaps
);
2125 void emit_blacklist_events(const OSDMap::Incremental
&inc
);
2126 void emit_blacklist_events(const OSDMap
&old_osd_map
,
2127 const OSDMap
&new_osd_map
);
2130 void _op_submit(Op
*op
, shunique_lock
& lc
, ceph_tid_t
*ptid
);
2131 void _op_submit_with_budget(Op
*op
, shunique_lock
& lc
,
2133 int *ctx_budget
= NULL
);
2134 inline void unregister_op(Op
*op
);
2138 void op_submit(Op
*op
, ceph_tid_t
*ptid
= NULL
, int *ctx_budget
= NULL
);
2140 shared_lock
l(rwlock
);
2141 return !((!inflight_ops
) && linger_ops
.empty() &&
2142 poolstat_ops
.empty() && statfs_ops
.empty());
2146 * Output in-flight requests
2148 void _dump_active(OSDSession
*s
);
2149 void _dump_active();
2151 void dump_requests(Formatter
*fmt
);
2152 void _dump_ops(const OSDSession
*s
, Formatter
*fmt
);
2153 void dump_ops(Formatter
*fmt
);
2154 void _dump_linger_ops(const OSDSession
*s
, Formatter
*fmt
);
2155 void dump_linger_ops(Formatter
*fmt
);
2156 void _dump_command_ops(const OSDSession
*s
, Formatter
*fmt
);
2157 void dump_command_ops(Formatter
*fmt
);
2158 void dump_pool_ops(Formatter
*fmt
) const;
2159 void dump_pool_stat_ops(Formatter
*fmt
) const;
2160 void dump_statfs_ops(Formatter
*fmt
) const;
2162 int get_client_incarnation() const { return client_inc
; }
2163 void set_client_incarnation(int inc
) { client_inc
= inc
; }
2165 bool have_map(epoch_t epoch
);
2166 /// wait for epoch; true if we already have it
2167 bool wait_for_map(epoch_t epoch
, Context
*c
, int err
=0);
2168 void _wait_for_new_map(Context
*c
, epoch_t epoch
, int err
=0);
2169 void wait_for_latest_osdmap(Context
*fin
);
2170 void get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2171 void _get_latest_version(epoch_t oldest
, epoch_t neweset
, Context
*fin
);
2173 /** Get the current set of global op flags */
2174 int get_global_op_flags() const { return global_op_flags
; }
2175 /** Add a flag to the global op flags, not really atomic operation */
2176 void add_global_op_flags(int flag
) {
2177 global_op_flags
.fetch_or(flag
);
2179 /** Clear the passed flags from the global op flag set */
2180 void clear_global_op_flag(int flags
) {
2181 global_op_flags
.fetch_and(~flags
);
2184 /// cancel an in-progress request with the given return code
2186 int op_cancel(OSDSession
*s
, ceph_tid_t tid
, int r
);
2187 int _op_cancel(ceph_tid_t tid
, int r
);
2189 int op_cancel(ceph_tid_t tid
, int r
);
2190 int op_cancel(const vector
<ceph_tid_t
>& tidls
, int r
);
2193 * Any write op which is in progress at the start of this call shall no
2194 * longer be in progress when this call ends. Operations started after the
2195 * start of this call may still be in progress when this call ends.
2197 * @return the latest possible epoch in which a cancelled op could have
2198 * existed, or -1 if nothing was cancelled.
2200 epoch_t
op_cancel_writes(int r
, int64_t pool
=-1);
2203 void osd_command(int osd
, const std::vector
<string
>& cmd
,
2204 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2205 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2207 CommandOp
*c
= new CommandOp(
2214 submit_command(c
, ptid
);
2216 void pg_command(pg_t pgid
, const vector
<string
>& cmd
,
2217 const bufferlist
& inbl
, ceph_tid_t
*ptid
,
2218 bufferlist
*poutbl
, string
*prs
, Context
*onfinish
) {
2219 CommandOp
*c
= new CommandOp(
2226 submit_command(c
, ptid
);
2229 // mid-level helpers
2230 Op
*prepare_mutate_op(
2231 const object_t
& oid
, const object_locator_t
& oloc
,
2232 ObjectOperation
& op
, const SnapContext
& snapc
,
2233 ceph::real_time mtime
, int flags
,
2234 Context
*oncommit
, version_t
*objver
= NULL
,
2235 osd_reqid_t reqid
= osd_reqid_t(),
2236 ZTracer::Trace
*parent_trace
= nullptr) {
2237 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2238 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
, nullptr, parent_trace
);
2239 o
->priority
= op
.priority
;
2242 o
->out_rval
.swap(op
.out_rval
);
2247 const object_t
& oid
, const object_locator_t
& oloc
,
2248 ObjectOperation
& op
, const SnapContext
& snapc
,
2249 ceph::real_time mtime
, int flags
,
2250 Context
*oncommit
, version_t
*objver
= NULL
,
2251 osd_reqid_t reqid
= osd_reqid_t()) {
2252 Op
*o
= prepare_mutate_op(oid
, oloc
, op
, snapc
, mtime
, flags
,
2253 oncommit
, objver
, reqid
);
2258 Op
*prepare_read_op(
2259 const object_t
& oid
, const object_locator_t
& oloc
,
2260 ObjectOperation
& op
,
2261 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2262 Context
*onack
, version_t
*objver
= NULL
,
2263 int *data_offset
= NULL
,
2264 uint64_t features
= 0,
2265 ZTracer::Trace
*parent_trace
= nullptr) {
2266 Op
*o
= new Op(oid
, oloc
, op
.ops
, flags
| global_op_flags
|
2267 CEPH_OSD_FLAG_READ
, onack
, objver
, data_offset
, parent_trace
);
2268 o
->priority
= op
.priority
;
2271 if (!o
->outbl
&& op
.size() == 1 && op
.out_bl
[0]->length())
2272 o
->outbl
= op
.out_bl
[0];
2273 o
->out_bl
.swap(op
.out_bl
);
2274 o
->out_handler
.swap(op
.out_handler
);
2275 o
->out_rval
.swap(op
.out_rval
);
2279 const object_t
& oid
, const object_locator_t
& oloc
,
2280 ObjectOperation
& op
,
2281 snapid_t snapid
, bufferlist
*pbl
, int flags
,
2282 Context
*onack
, version_t
*objver
= NULL
,
2283 int *data_offset
= NULL
,
2284 uint64_t features
= 0) {
2285 Op
*o
= prepare_read_op(oid
, oloc
, op
, snapid
, pbl
, flags
, onack
, objver
,
2288 o
->features
= features
;
2293 Op
*prepare_pg_read_op(
2294 uint32_t hash
, object_locator_t oloc
,
2295 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2296 Context
*onack
, epoch_t
*reply_epoch
,
2298 Op
*o
= new Op(object_t(), oloc
,
2300 flags
| global_op_flags
| CEPH_OSD_FLAG_READ
|
2301 CEPH_OSD_FLAG_IGNORE_OVERLAY
,
2303 o
->target
.precalc_pgid
= true;
2304 o
->target
.base_pgid
= pg_t(hash
, oloc
.pool
);
2305 o
->priority
= op
.priority
;
2306 o
->snapid
= CEPH_NOSNAP
;
2308 o
->out_bl
.swap(op
.out_bl
);
2309 o
->out_handler
.swap(op
.out_handler
);
2310 o
->out_rval
.swap(op
.out_rval
);
2311 o
->reply_epoch
= reply_epoch
;
2313 // budget is tracked by listing context
2314 o
->ctx_budgeted
= true;
2319 uint32_t hash
, object_locator_t oloc
,
2320 ObjectOperation
& op
, bufferlist
*pbl
, int flags
,
2321 Context
*onack
, epoch_t
*reply_epoch
,
2323 Op
*o
= prepare_pg_read_op(hash
, oloc
, op
, pbl
, flags
,
2324 onack
, reply_epoch
, ctx_budget
);
2326 op_submit(o
, &tid
, ctx_budget
);
2330 // caller owns a ref
2331 LingerOp
*linger_register(const object_t
& oid
, const object_locator_t
& oloc
,
2333 ceph_tid_t
linger_watch(LingerOp
*info
,
2334 ObjectOperation
& op
,
2335 const SnapContext
& snapc
, ceph::real_time mtime
,
2339 ceph_tid_t
linger_notify(LingerOp
*info
,
2340 ObjectOperation
& op
,
2341 snapid_t snap
, bufferlist
& inbl
,
2345 int linger_check(LingerOp
*info
);
2346 void linger_cancel(LingerOp
*info
); // releases a reference
2347 void _linger_cancel(LingerOp
*info
);
2349 void _do_watch_notify(LingerOp
*info
, MWatchNotify
*m
);
2352 * set up initial ops in the op vector, and allocate a final op slot.
2354 * The caller is responsible for filling in the final ops_count ops.
2356 * @param ops op vector
2357 * @param ops_count number of final ops the caller will fill in
2358 * @param extra_ops pointer to [array of] initial op[s]
2359 * @return index of final op (for caller to fill in)
2361 int init_ops(vector
<OSDOp
>& ops
, int ops_count
, ObjectOperation
*extra_ops
) {
2366 extra
= extra_ops
->ops
.size();
2368 ops
.resize(ops_count
+ extra
);
2370 for (i
=0; i
<extra
; i
++) {
2371 ops
[i
] = extra_ops
->ops
[i
];
2378 // high-level helpers
2379 Op
*prepare_stat_op(
2380 const object_t
& oid
, const object_locator_t
& oloc
,
2381 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2382 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2383 ObjectOperation
*extra_ops
= NULL
) {
2385 int i
= init_ops(ops
, 1, extra_ops
);
2386 ops
[i
].op
.op
= CEPH_OSD_OP_STAT
;
2387 C_Stat
*fin
= new C_Stat(psize
, pmtime
, onfinish
);
2388 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2389 CEPH_OSD_FLAG_READ
, fin
, objver
);
2391 o
->outbl
= &fin
->bl
;
2395 const object_t
& oid
, const object_locator_t
& oloc
,
2396 snapid_t snap
, uint64_t *psize
, ceph::real_time
*pmtime
,
2397 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2398 ObjectOperation
*extra_ops
= NULL
) {
2399 Op
*o
= prepare_stat_op(oid
, oloc
, snap
, psize
, pmtime
, flags
,
2400 onfinish
, objver
, extra_ops
);
2406 Op
*prepare_read_op(
2407 const object_t
& oid
, const object_locator_t
& oloc
,
2408 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2409 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2410 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2411 ZTracer::Trace
*parent_trace
= nullptr) {
2413 int i
= init_ops(ops
, 1, extra_ops
);
2414 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2415 ops
[i
].op
.extent
.offset
= off
;
2416 ops
[i
].op
.extent
.length
= len
;
2417 ops
[i
].op
.extent
.truncate_size
= 0;
2418 ops
[i
].op
.extent
.truncate_seq
= 0;
2419 ops
[i
].op
.flags
= op_flags
;
2420 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2421 CEPH_OSD_FLAG_READ
, onfinish
, objver
, nullptr, parent_trace
);
2427 const object_t
& oid
, const object_locator_t
& oloc
,
2428 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2429 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2430 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2431 Op
*o
= prepare_read_op(oid
, oloc
, off
, len
, snap
, pbl
, flags
,
2432 onfinish
, objver
, extra_ops
, op_flags
);
2438 Op
*prepare_cmpext_op(
2439 const object_t
& oid
, const object_locator_t
& oloc
,
2440 uint64_t off
, bufferlist
&cmp_bl
,
2441 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2442 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2444 int i
= init_ops(ops
, 1, extra_ops
);
2445 ops
[i
].op
.op
= CEPH_OSD_OP_CMPEXT
;
2446 ops
[i
].op
.extent
.offset
= off
;
2447 ops
[i
].op
.extent
.length
= cmp_bl
.length();
2448 ops
[i
].op
.extent
.truncate_size
= 0;
2449 ops
[i
].op
.extent
.truncate_seq
= 0;
2450 ops
[i
].indata
= cmp_bl
;
2451 ops
[i
].op
.flags
= op_flags
;
2452 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2453 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2459 const object_t
& oid
, const object_locator_t
& oloc
,
2460 uint64_t off
, bufferlist
&cmp_bl
,
2461 snapid_t snap
, int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2462 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2463 Op
*o
= prepare_cmpext_op(oid
, oloc
, off
, cmp_bl
, snap
,
2464 flags
, onfinish
, objver
, extra_ops
, op_flags
);
2470 ceph_tid_t
read_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2471 uint64_t off
, uint64_t len
, snapid_t snap
,
2472 bufferlist
*pbl
, int flags
, uint64_t trunc_size
,
2473 __u32 trunc_seq
, Context
*onfinish
,
2474 version_t
*objver
= NULL
,
2475 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2477 int i
= init_ops(ops
, 1, extra_ops
);
2478 ops
[i
].op
.op
= CEPH_OSD_OP_READ
;
2479 ops
[i
].op
.extent
.offset
= off
;
2480 ops
[i
].op
.extent
.length
= len
;
2481 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2482 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2483 ops
[i
].op
.flags
= op_flags
;
2484 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2485 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2492 ceph_tid_t
mapext(const object_t
& oid
, const object_locator_t
& oloc
,
2493 uint64_t off
, uint64_t len
, snapid_t snap
, bufferlist
*pbl
,
2494 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2495 ObjectOperation
*extra_ops
= NULL
) {
2497 int i
= init_ops(ops
, 1, extra_ops
);
2498 ops
[i
].op
.op
= CEPH_OSD_OP_MAPEXT
;
2499 ops
[i
].op
.extent
.offset
= off
;
2500 ops
[i
].op
.extent
.length
= len
;
2501 ops
[i
].op
.extent
.truncate_size
= 0;
2502 ops
[i
].op
.extent
.truncate_seq
= 0;
2503 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2504 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2511 ceph_tid_t
getxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2512 const char *name
, snapid_t snap
, bufferlist
*pbl
, int flags
,
2514 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2516 int i
= init_ops(ops
, 1, extra_ops
);
2517 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTR
;
2518 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2519 ops
[i
].op
.xattr
.value_len
= 0;
2521 ops
[i
].indata
.append(name
);
2522 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2523 CEPH_OSD_FLAG_READ
, onfinish
, objver
);
2531 ceph_tid_t
getxattrs(const object_t
& oid
, const object_locator_t
& oloc
,
2532 snapid_t snap
, map
<string
,bufferlist
>& attrset
,
2533 int flags
, Context
*onfinish
, version_t
*objver
= NULL
,
2534 ObjectOperation
*extra_ops
= NULL
) {
2536 int i
= init_ops(ops
, 1, extra_ops
);
2537 ops
[i
].op
.op
= CEPH_OSD_OP_GETXATTRS
;
2538 C_GetAttrs
*fin
= new C_GetAttrs(attrset
, onfinish
);
2539 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2540 CEPH_OSD_FLAG_READ
, fin
, objver
);
2542 o
->outbl
= &fin
->bl
;
2548 ceph_tid_t
read_full(const object_t
& oid
, const object_locator_t
& oloc
,
2549 snapid_t snap
, bufferlist
*pbl
, int flags
,
2550 Context
*onfinish
, version_t
*objver
= NULL
,
2551 ObjectOperation
*extra_ops
= NULL
) {
2552 return read(oid
, oloc
, 0, 0, snap
, pbl
, flags
| global_op_flags
|
2553 CEPH_OSD_FLAG_READ
, onfinish
, objver
, extra_ops
);
2558 ceph_tid_t
_modify(const object_t
& oid
, const object_locator_t
& oloc
,
2559 vector
<OSDOp
>& ops
, ceph::real_time mtime
,
2560 const SnapContext
& snapc
, int flags
,
2562 version_t
*objver
= NULL
) {
2563 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2564 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2571 Op
*prepare_write_op(
2572 const object_t
& oid
, const object_locator_t
& oloc
,
2573 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2574 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2575 Context
*oncommit
, version_t
*objver
= NULL
,
2576 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0,
2577 ZTracer::Trace
*parent_trace
= nullptr) {
2579 int i
= init_ops(ops
, 1, extra_ops
);
2580 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2581 ops
[i
].op
.extent
.offset
= off
;
2582 ops
[i
].op
.extent
.length
= len
;
2583 ops
[i
].op
.extent
.truncate_size
= 0;
2584 ops
[i
].op
.extent
.truncate_seq
= 0;
2586 ops
[i
].op
.flags
= op_flags
;
2587 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2588 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
,
2589 nullptr, parent_trace
);
2595 const object_t
& oid
, const object_locator_t
& oloc
,
2596 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2597 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2598 Context
*oncommit
, version_t
*objver
= NULL
,
2599 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2600 Op
*o
= prepare_write_op(oid
, oloc
, off
, len
, snapc
, bl
, mtime
, flags
,
2601 oncommit
, objver
, extra_ops
, op_flags
);
2606 Op
*prepare_append_op(
2607 const object_t
& oid
, const object_locator_t
& oloc
,
2608 uint64_t len
, const SnapContext
& snapc
,
2609 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2611 version_t
*objver
= NULL
,
2612 ObjectOperation
*extra_ops
= NULL
) {
2614 int i
= init_ops(ops
, 1, extra_ops
);
2615 ops
[i
].op
.op
= CEPH_OSD_OP_APPEND
;
2616 ops
[i
].op
.extent
.offset
= 0;
2617 ops
[i
].op
.extent
.length
= len
;
2618 ops
[i
].op
.extent
.truncate_size
= 0;
2619 ops
[i
].op
.extent
.truncate_seq
= 0;
2621 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2622 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2628 const object_t
& oid
, const object_locator_t
& oloc
,
2629 uint64_t len
, const SnapContext
& snapc
,
2630 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2632 version_t
*objver
= NULL
,
2633 ObjectOperation
*extra_ops
= NULL
) {
2634 Op
*o
= prepare_append_op(oid
, oloc
, len
, snapc
, bl
, mtime
, flags
,
2635 oncommit
, objver
, extra_ops
);
2640 ceph_tid_t
write_trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2641 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2642 const bufferlist
&bl
, ceph::real_time mtime
, int flags
,
2643 uint64_t trunc_size
, __u32 trunc_seq
,
2645 version_t
*objver
= NULL
,
2646 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2648 int i
= init_ops(ops
, 1, extra_ops
);
2649 ops
[i
].op
.op
= CEPH_OSD_OP_WRITE
;
2650 ops
[i
].op
.extent
.offset
= off
;
2651 ops
[i
].op
.extent
.length
= len
;
2652 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2653 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2655 ops
[i
].op
.flags
= op_flags
;
2656 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2657 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2664 Op
*prepare_write_full_op(
2665 const object_t
& oid
, const object_locator_t
& oloc
,
2666 const SnapContext
& snapc
, const bufferlist
&bl
,
2667 ceph::real_time mtime
, int flags
,
2668 Context
*oncommit
, version_t
*objver
= NULL
,
2669 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2671 int i
= init_ops(ops
, 1, extra_ops
);
2672 ops
[i
].op
.op
= CEPH_OSD_OP_WRITEFULL
;
2673 ops
[i
].op
.extent
.offset
= 0;
2674 ops
[i
].op
.extent
.length
= bl
.length();
2676 ops
[i
].op
.flags
= op_flags
;
2677 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2678 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2683 ceph_tid_t
write_full(
2684 const object_t
& oid
, const object_locator_t
& oloc
,
2685 const SnapContext
& snapc
, const bufferlist
&bl
,
2686 ceph::real_time mtime
, int flags
,
2687 Context
*oncommit
, version_t
*objver
= NULL
,
2688 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2689 Op
*o
= prepare_write_full_op(oid
, oloc
, snapc
, bl
, mtime
, flags
,
2690 oncommit
, objver
, extra_ops
, op_flags
);
2695 Op
*prepare_writesame_op(
2696 const object_t
& oid
, const object_locator_t
& oloc
,
2697 uint64_t write_len
, uint64_t off
,
2698 const SnapContext
& snapc
, const bufferlist
&bl
,
2699 ceph::real_time mtime
, int flags
,
2700 Context
*oncommit
, version_t
*objver
= NULL
,
2701 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2704 int i
= init_ops(ops
, 1, extra_ops
);
2705 ops
[i
].op
.op
= CEPH_OSD_OP_WRITESAME
;
2706 ops
[i
].op
.writesame
.offset
= off
;
2707 ops
[i
].op
.writesame
.length
= write_len
;
2708 ops
[i
].op
.writesame
.data_length
= bl
.length();
2710 ops
[i
].op
.flags
= op_flags
;
2711 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2712 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2717 ceph_tid_t
writesame(
2718 const object_t
& oid
, const object_locator_t
& oloc
,
2719 uint64_t write_len
, uint64_t off
,
2720 const SnapContext
& snapc
, const bufferlist
&bl
,
2721 ceph::real_time mtime
, int flags
,
2722 Context
*oncommit
, version_t
*objver
= NULL
,
2723 ObjectOperation
*extra_ops
= NULL
, int op_flags
= 0) {
2725 Op
*o
= prepare_writesame_op(oid
, oloc
, write_len
, off
, snapc
, bl
,
2726 mtime
, flags
, oncommit
, objver
,
2727 extra_ops
, op_flags
);
2733 ceph_tid_t
trunc(const object_t
& oid
, const object_locator_t
& oloc
,
2734 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2735 uint64_t trunc_size
, __u32 trunc_seq
,
2736 Context
*oncommit
, version_t
*objver
= NULL
,
2737 ObjectOperation
*extra_ops
= NULL
) {
2739 int i
= init_ops(ops
, 1, extra_ops
);
2740 ops
[i
].op
.op
= CEPH_OSD_OP_TRUNCATE
;
2741 ops
[i
].op
.extent
.offset
= trunc_size
;
2742 ops
[i
].op
.extent
.truncate_size
= trunc_size
;
2743 ops
[i
].op
.extent
.truncate_seq
= trunc_seq
;
2744 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2745 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2752 ceph_tid_t
zero(const object_t
& oid
, const object_locator_t
& oloc
,
2753 uint64_t off
, uint64_t len
, const SnapContext
& snapc
,
2754 ceph::real_time mtime
, int flags
, Context
*oncommit
,
2755 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2757 int i
= init_ops(ops
, 1, extra_ops
);
2758 ops
[i
].op
.op
= CEPH_OSD_OP_ZERO
;
2759 ops
[i
].op
.extent
.offset
= off
;
2760 ops
[i
].op
.extent
.length
= len
;
2761 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2762 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2769 ceph_tid_t
rollback_object(const object_t
& oid
, const object_locator_t
& oloc
,
2770 const SnapContext
& snapc
, snapid_t snapid
,
2771 ceph::real_time mtime
, Context
*oncommit
,
2772 version_t
*objver
= NULL
,
2773 ObjectOperation
*extra_ops
= NULL
) {
2775 int i
= init_ops(ops
, 1, extra_ops
);
2776 ops
[i
].op
.op
= CEPH_OSD_OP_ROLLBACK
;
2777 ops
[i
].op
.snap
.snapid
= snapid
;
2778 Op
*o
= new Op(oid
, oloc
, ops
, CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2785 ceph_tid_t
create(const object_t
& oid
, const object_locator_t
& oloc
,
2786 const SnapContext
& snapc
, ceph::real_time mtime
, int global_flags
,
2787 int create_flags
, Context
*oncommit
,
2788 version_t
*objver
= NULL
,
2789 ObjectOperation
*extra_ops
= NULL
) {
2791 int i
= init_ops(ops
, 1, extra_ops
);
2792 ops
[i
].op
.op
= CEPH_OSD_OP_CREATE
;
2793 ops
[i
].op
.flags
= create_flags
;
2794 Op
*o
= new Op(oid
, oloc
, ops
, global_flags
| global_op_flags
|
2795 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2802 Op
*prepare_remove_op(
2803 const object_t
& oid
, const object_locator_t
& oloc
,
2804 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2806 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2808 int i
= init_ops(ops
, 1, extra_ops
);
2809 ops
[i
].op
.op
= CEPH_OSD_OP_DELETE
;
2810 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2811 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2817 const object_t
& oid
, const object_locator_t
& oloc
,
2818 const SnapContext
& snapc
, ceph::real_time mtime
, int flags
,
2820 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2821 Op
*o
= prepare_remove_op(oid
, oloc
, snapc
, mtime
, flags
,
2822 oncommit
, objver
, extra_ops
);
2828 ceph_tid_t
setxattr(const object_t
& oid
, const object_locator_t
& oloc
,
2829 const char *name
, const SnapContext
& snapc
, const bufferlist
&bl
,
2830 ceph::real_time mtime
, int flags
,
2832 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2834 int i
= init_ops(ops
, 1, extra_ops
);
2835 ops
[i
].op
.op
= CEPH_OSD_OP_SETXATTR
;
2836 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2837 ops
[i
].op
.xattr
.value_len
= bl
.length();
2839 ops
[i
].indata
.append(name
);
2840 ops
[i
].indata
.append(bl
);
2841 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2842 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2849 ceph_tid_t
removexattr(const object_t
& oid
, const object_locator_t
& oloc
,
2850 const char *name
, const SnapContext
& snapc
,
2851 ceph::real_time mtime
, int flags
,
2853 version_t
*objver
= NULL
, ObjectOperation
*extra_ops
= NULL
) {
2855 int i
= init_ops(ops
, 1, extra_ops
);
2856 ops
[i
].op
.op
= CEPH_OSD_OP_RMXATTR
;
2857 ops
[i
].op
.xattr
.name_len
= (name
? strlen(name
) : 0);
2858 ops
[i
].op
.xattr
.value_len
= 0;
2860 ops
[i
].indata
.append(name
);
2861 Op
*o
= new Op(oid
, oloc
, ops
, flags
| global_op_flags
|
2862 CEPH_OSD_FLAG_WRITE
, oncommit
, objver
);
2870 void list_nobjects(NListContext
*p
, Context
*onfinish
);
2871 uint32_t list_nobjects_seek(NListContext
*p
, uint32_t pos
);
2872 uint32_t list_nobjects_seek(NListContext
*list_context
, const hobject_t
& c
);
2873 void list_nobjects_get_cursor(NListContext
*list_context
, hobject_t
*c
);
2875 hobject_t
enumerate_objects_begin();
2876 hobject_t
enumerate_objects_end();
2877 //hobject_t enumerate_objects_begin(int n, int m);
2878 void enumerate_objects(
2880 const std::string
&ns
,
2881 const hobject_t
&start
,
2882 const hobject_t
&end
,
2884 const bufferlist
&filter_bl
,
2885 std::list
<librados::ListObjectImpl
> *result
,
2887 Context
*on_finish
);
2889 void _enumerate_reply(
2892 const hobject_t
&end
,
2893 const int64_t pool_id
,
2895 epoch_t reply_epoch
,
2896 std::list
<librados::ListObjectImpl
> *result
,
2898 Context
*on_finish
);
2899 friend class C_EnumerateReply
;
2901 // -------------------------
2904 void pool_op_submit(PoolOp
*op
);
2905 void _pool_op_submit(PoolOp
*op
);
2906 void _finish_pool_op(PoolOp
*op
, int r
);
2907 void _do_delete_pool(int64_t pool
, Context
*onfinish
);
2909 int create_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2910 int allocate_selfmanaged_snap(int64_t pool
, snapid_t
*psnapid
,
2912 int delete_pool_snap(int64_t pool
, string
& snapName
, Context
*onfinish
);
2913 int delete_selfmanaged_snap(int64_t pool
, snapid_t snap
, Context
*onfinish
);
2915 int create_pool(string
& name
, Context
*onfinish
, uint64_t auid
=0,
2917 int delete_pool(int64_t pool
, Context
*onfinish
);
2918 int delete_pool(const string
& name
, Context
*onfinish
);
2919 int change_pool_auid(int64_t pool
, Context
*onfinish
, uint64_t auid
);
2921 void handle_pool_op_reply(MPoolOpReply
*m
);
2922 int pool_op_cancel(ceph_tid_t tid
, int r
);
2924 // --------------------------
2927 void _poolstat_submit(PoolStatOp
*op
);
2929 void handle_get_pool_stats_reply(MGetPoolStatsReply
*m
);
2930 void get_pool_stats(list
<string
>& pools
, map
<string
,pool_stat_t
> *result
,
2932 int pool_stat_op_cancel(ceph_tid_t tid
, int r
);
2933 void _finish_pool_stat_op(PoolStatOp
*op
, int r
);
2935 // ---------------------------
2938 void _fs_stats_submit(StatfsOp
*op
);
2940 void handle_fs_stats_reply(MStatfsReply
*m
);
2941 void get_fs_stats(struct ceph_statfs
& result
, boost::optional
<int64_t> poolid
,
2943 int statfs_op_cancel(ceph_tid_t tid
, int r
);
2944 void _finish_statfs_op(StatfsOp
*op
, int r
);
2946 // ---------------------------
2947 // some scatter/gather hackery
2949 void _sg_read_finish(vector
<ObjectExtent
>& extents
,
2950 vector
<bufferlist
>& resultbl
,
2951 bufferlist
*bl
, Context
*onfinish
);
2953 struct C_SGRead
: public Context
{
2955 vector
<ObjectExtent
> extents
;
2956 vector
<bufferlist
> resultbl
;
2959 C_SGRead(Objecter
*ob
,
2960 vector
<ObjectExtent
>& e
, vector
<bufferlist
>& r
, bufferlist
*b
,
2962 objecter(ob
), bl(b
), onfinish(c
) {
2966 void finish(int r
) override
{
2967 objecter
->_sg_read_finish(extents
, resultbl
, bl
, onfinish
);
2971 void sg_read_trunc(vector
<ObjectExtent
>& extents
, snapid_t snap
,
2972 bufferlist
*bl
, int flags
, uint64_t trunc_size
,
2973 __u32 trunc_seq
, Context
*onfinish
, int op_flags
= 0) {
2974 if (extents
.size() == 1) {
2975 read_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
2976 extents
[0].length
, snap
, bl
, flags
, extents
[0].truncate_size
,
2977 trunc_seq
, onfinish
, 0, 0, op_flags
);
2979 C_GatherBuilder
gather(cct
);
2980 vector
<bufferlist
> resultbl(extents
.size());
2982 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
2985 read_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
, snap
, &resultbl
[i
++],
2986 flags
, p
->truncate_size
, trunc_seq
, gather
.new_sub(),
2989 gather
.set_finisher(new C_SGRead(this, extents
, resultbl
, bl
, onfinish
));
2994 void sg_read(vector
<ObjectExtent
>& extents
, snapid_t snap
, bufferlist
*bl
,
2995 int flags
, Context
*onfinish
, int op_flags
= 0) {
2996 sg_read_trunc(extents
, snap
, bl
, flags
, 0, 0, onfinish
, op_flags
);
2999 void sg_write_trunc(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3000 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
3001 uint64_t trunc_size
, __u32 trunc_seq
,
3002 Context
*oncommit
, int op_flags
= 0) {
3003 if (extents
.size() == 1) {
3004 write_trunc(extents
[0].oid
, extents
[0].oloc
, extents
[0].offset
,
3005 extents
[0].length
, snapc
, bl
, mtime
, flags
,
3006 extents
[0].truncate_size
, trunc_seq
, oncommit
,
3009 C_GatherBuilder
gcom(cct
, oncommit
);
3010 for (vector
<ObjectExtent
>::iterator p
= extents
.begin();
3014 for (vector
<pair
<uint64_t,uint64_t> >::iterator bit
3015 = p
->buffer_extents
.begin();
3016 bit
!= p
->buffer_extents
.end();
3018 bl
.copy(bit
->first
, bit
->second
, cur
);
3019 assert(cur
.length() == p
->length
);
3020 write_trunc(p
->oid
, p
->oloc
, p
->offset
, p
->length
,
3021 snapc
, cur
, mtime
, flags
, p
->truncate_size
, trunc_seq
,
3022 oncommit
? gcom
.new_sub():0,
3029 void sg_write(vector
<ObjectExtent
>& extents
, const SnapContext
& snapc
,
3030 const bufferlist
& bl
, ceph::real_time mtime
, int flags
,
3031 Context
*oncommit
, int op_flags
= 0) {
3032 sg_write_trunc(extents
, snapc
, bl
, mtime
, flags
, 0, 0, oncommit
,
3036 void ms_handle_connect(Connection
*con
) override
;
3037 bool ms_handle_reset(Connection
*con
) override
;
3038 void ms_handle_remote_reset(Connection
*con
) override
;
3039 bool ms_handle_refused(Connection
*con
) override
;
3040 bool ms_get_authorizer(int dest_type
,
3041 AuthAuthorizer
**authorizer
,
3042 bool force_new
) override
;
3044 void blacklist_self(bool set
);
3047 epoch_t epoch_barrier
;
3048 bool retry_writes_after_first_reply
;
3050 void set_epoch_barrier(epoch_t epoch
);
3052 PerfCounters
*get_logger() {