1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "include/int_types.h"
8 #include "include/types.h"
9 #include "include/uuid.h"
10 #include "common/ceph_context.h"
11 #include "common/dout.h"
12 #include "common/errno.h"
13 #include "common/Throttle.h"
14 #include "common/event_socket.h"
15 #include "cls/lock/cls_lock_client.h"
16 #include "include/stringify.h"
18 #include "cls/rbd/cls_rbd.h"
19 #include "cls/rbd/cls_rbd_types.h"
20 #include "cls/rbd/cls_rbd_client.h"
21 #include "cls/journal/cls_journal_types.h"
22 #include "cls/journal/cls_journal_client.h"
24 #include "librbd/ExclusiveLock.h"
25 #include "librbd/ImageCtx.h"
26 #include "librbd/ImageState.h"
27 #include "librbd/internal.h"
28 #include "librbd/Journal.h"
29 #include "librbd/ObjectMap.h"
30 #include "librbd/Operations.h"
31 #include "librbd/Types.h"
32 #include "librbd/Utils.h"
33 #include "librbd/api/Image.h"
34 #include "librbd/exclusive_lock/AutomaticPolicy.h"
35 #include "librbd/exclusive_lock/StandardPolicy.h"
36 #include "librbd/image/CloneRequest.h"
37 #include "librbd/image/CreateRequest.h"
38 #include "librbd/image/RemoveRequest.h"
39 #include "librbd/io/AioCompletion.h"
40 #include "librbd/io/ImageRequest.h"
41 #include "librbd/io/ImageRequestWQ.h"
42 #include "librbd/io/ObjectRequest.h"
43 #include "librbd/io/ReadResult.h"
44 #include "librbd/journal/Types.h"
45 #include "librbd/managed_lock/Types.h"
46 #include "librbd/mirror/EnableRequest.h"
47 #include "librbd/operation/TrimRequest.h"
49 #include "journal/Journaler.h"
51 #include <boost/scope_exit.hpp>
52 #include <boost/variant.hpp>
53 #include "include/assert.h"
55 #define dout_subsys ceph_subsys_rbd
57 #define dout_prefix *_dout << "librbd: "
59 #define rbd_howmany(x, y) (((x) + (y) - 1) / (y))
66 // list binds to list() here, so std::list is explicitly used below
68 using ceph::bufferlist
;
69 using librados::snap_t
;
70 using librados::IoCtx
;
71 using librados::Rados
;
77 int validate_pool(IoCtx
&io_ctx
, CephContext
*cct
) {
78 if (!cct
->_conf
->get_val
<bool>("rbd_validate_pool")) {
82 int r
= io_ctx
.stat(RBD_DIRECTORY
, NULL
, NULL
);
85 } else if (r
< 0 && r
!= -ENOENT
) {
86 lderr(cct
) << "failed to stat RBD directory: " << cpp_strerror(r
) << dendl
;
90 // allocate a self-managed snapshot id if this is a new pool to force
91 // self-managed snapshot mode
93 r
= io_ctx
.selfmanaged_snap_create(&snap_id
);
95 lderr(cct
) << "pool not configured for self-managed RBD snapshot support"
99 lderr(cct
) << "failed to allocate self-managed snapshot: "
100 << cpp_strerror(r
) << dendl
;
104 r
= io_ctx
.selfmanaged_snap_remove(snap_id
);
106 lderr(cct
) << "failed to release self-managed snapshot " << snap_id
107 << ": " << cpp_strerror(r
) << dendl
;
113 } // anonymous namespace
115 int detect_format(IoCtx
&io_ctx
, const string
&name
,
116 bool *old_format
, uint64_t *size
)
118 CephContext
*cct
= (CephContext
*)io_ctx
.cct();
121 int r
= io_ctx
.stat(util::old_header_name(name
), size
, NULL
);
125 r
= io_ctx
.stat(util::id_obj_name(name
), size
, NULL
);
132 ldout(cct
, 20) << "detect format of " << name
<< " : "
133 << (old_format
? (*old_format
? "old" : "new") :
134 "don't care") << dendl
;
138 bool has_parent(int64_t parent_pool_id
, uint64_t off
, uint64_t overlap
)
140 return (parent_pool_id
!= -1 && off
<= overlap
);
143 void init_rbd_header(struct rbd_obj_header_ondisk
& ondisk
,
144 uint64_t size
, int order
, uint64_t bid
)
146 uint32_t hi
= bid
>> 32;
147 uint32_t lo
= bid
& 0xFFFFFFFF;
148 uint32_t extra
= rand() % 0xFFFFFFFF;
149 memset(&ondisk
, 0, sizeof(ondisk
));
151 memcpy(&ondisk
.text
, RBD_HEADER_TEXT
, sizeof(RBD_HEADER_TEXT
));
152 memcpy(&ondisk
.signature
, RBD_HEADER_SIGNATURE
,
153 sizeof(RBD_HEADER_SIGNATURE
));
154 memcpy(&ondisk
.version
, RBD_HEADER_VERSION
, sizeof(RBD_HEADER_VERSION
));
156 snprintf(ondisk
.block_name
, sizeof(ondisk
.block_name
), "rb.%x.%x.%x",
159 ondisk
.image_size
= size
;
160 ondisk
.options
.order
= order
;
161 ondisk
.options
.crypt_type
= RBD_CRYPT_NONE
;
162 ondisk
.options
.comp_type
= RBD_COMP_NONE
;
164 ondisk
.snap_count
= 0;
166 ondisk
.snap_names_len
= 0;
169 void image_info(ImageCtx
*ictx
, image_info_t
& info
, size_t infosize
)
171 int obj_order
= ictx
->order
;
172 ictx
->snap_lock
.get_read();
173 info
.size
= ictx
->get_image_size(ictx
->snap_id
);
174 ictx
->snap_lock
.put_read();
175 info
.obj_size
= 1ULL << obj_order
;
176 info
.num_objs
= Striper::get_num_objects(ictx
->layout
, info
.size
);
177 info
.order
= obj_order
;
178 strncpy(info
.block_name_prefix
, ictx
->object_prefix
.c_str(),
179 RBD_MAX_BLOCK_NAME_SIZE
);
180 info
.block_name_prefix
[RBD_MAX_BLOCK_NAME_SIZE
- 1] = '\0';
182 // clear deprecated fields
183 info
.parent_pool
= -1L;
184 info
.parent_name
[0] = '\0';
187 uint64_t oid_to_object_no(const string
& oid
, const string
& object_prefix
)
189 istringstream
iss(oid
);
190 // skip object prefix and separator
191 iss
.ignore(object_prefix
.length() + 1);
193 iss
>> std::hex
>> num
;
197 void trim_image(ImageCtx
*ictx
, uint64_t newsize
, ProgressContext
& prog_ctx
)
199 assert(ictx
->owner_lock
.is_locked());
200 assert(ictx
->exclusive_lock
== nullptr ||
201 ictx
->exclusive_lock
->is_lock_owner());
204 ictx
->snap_lock
.get_read();
205 operation::TrimRequest
<> *req
= operation::TrimRequest
<>::create(
206 *ictx
, &ctx
, ictx
->size
, newsize
, prog_ctx
);
207 ictx
->snap_lock
.put_read();
212 lderr(ictx
->cct
) << "warning: failed to remove some object(s): "
213 << cpp_strerror(r
) << dendl
;
217 int read_header_bl(IoCtx
& io_ctx
, const string
& header_oid
,
218 bufferlist
& header
, uint64_t *ver
)
222 #define READ_SIZE 4096
225 r
= io_ctx
.read(header_oid
, bl
, READ_SIZE
, off
);
228 header
.claim_append(bl
);
230 } while (r
== READ_SIZE
);
232 if (header
.length() < sizeof(RBD_HEADER_TEXT
) ||
233 memcmp(RBD_HEADER_TEXT
, header
.c_str(), sizeof(RBD_HEADER_TEXT
))) {
234 CephContext
*cct
= (CephContext
*)io_ctx
.cct();
235 lderr(cct
) << "unrecognized header format" << dendl
;
240 *ver
= io_ctx
.get_last_version();
245 int read_header(IoCtx
& io_ctx
, const string
& header_oid
,
246 struct rbd_obj_header_ondisk
*header
, uint64_t *ver
)
248 bufferlist header_bl
;
249 int r
= read_header_bl(io_ctx
, header_oid
, header_bl
, ver
);
252 if (header_bl
.length() < (int)sizeof(*header
))
254 memcpy(header
, header_bl
.c_str(), sizeof(*header
));
259 int tmap_set(IoCtx
& io_ctx
, const string
& imgname
)
261 bufferlist cmdbl
, emptybl
;
262 __u8 c
= CEPH_OSD_TMAP_SET
;
264 ::encode(imgname
, cmdbl
);
265 ::encode(emptybl
, cmdbl
);
266 return io_ctx
.tmap_update(RBD_DIRECTORY
, cmdbl
);
269 int tmap_rm(IoCtx
& io_ctx
, const string
& imgname
)
272 __u8 c
= CEPH_OSD_TMAP_RM
;
274 ::encode(imgname
, cmdbl
);
275 return io_ctx
.tmap_update(RBD_DIRECTORY
, cmdbl
);
278 typedef boost::variant
<std::string
,uint64_t> image_option_value_t
;
279 typedef std::map
<int,image_option_value_t
> image_options_t
;
280 typedef std::shared_ptr
<image_options_t
> image_options_ref
;
282 enum image_option_type_t
{
287 const std::map
<int, image_option_type_t
> IMAGE_OPTIONS_TYPE_MAPPING
= {
288 {RBD_IMAGE_OPTION_FORMAT
, UINT64
},
289 {RBD_IMAGE_OPTION_FEATURES
, UINT64
},
290 {RBD_IMAGE_OPTION_ORDER
, UINT64
},
291 {RBD_IMAGE_OPTION_STRIPE_UNIT
, UINT64
},
292 {RBD_IMAGE_OPTION_STRIPE_COUNT
, UINT64
},
293 {RBD_IMAGE_OPTION_JOURNAL_ORDER
, UINT64
},
294 {RBD_IMAGE_OPTION_JOURNAL_SPLAY_WIDTH
, UINT64
},
295 {RBD_IMAGE_OPTION_JOURNAL_POOL
, STR
},
296 {RBD_IMAGE_OPTION_FEATURES_SET
, UINT64
},
297 {RBD_IMAGE_OPTION_FEATURES_CLEAR
, UINT64
},
298 {RBD_IMAGE_OPTION_DATA_POOL
, STR
},
301 std::string
image_option_name(int optname
) {
303 case RBD_IMAGE_OPTION_FORMAT
:
305 case RBD_IMAGE_OPTION_FEATURES
:
307 case RBD_IMAGE_OPTION_ORDER
:
309 case RBD_IMAGE_OPTION_STRIPE_UNIT
:
310 return "stripe_unit";
311 case RBD_IMAGE_OPTION_STRIPE_COUNT
:
312 return "stripe_count";
313 case RBD_IMAGE_OPTION_JOURNAL_ORDER
:
314 return "journal_order";
315 case RBD_IMAGE_OPTION_JOURNAL_SPLAY_WIDTH
:
316 return "journal_splay_width";
317 case RBD_IMAGE_OPTION_JOURNAL_POOL
:
318 return "journal_pool";
319 case RBD_IMAGE_OPTION_FEATURES_SET
:
320 return "features_set";
321 case RBD_IMAGE_OPTION_FEATURES_CLEAR
:
322 return "features_clear";
323 case RBD_IMAGE_OPTION_DATA_POOL
:
326 return "unknown (" + stringify(optname
) + ")";
330 std::ostream
&operator<<(std::ostream
&os
, const ImageOptions
&opts
) {
333 const char *delimiter
= "";
334 for (auto &i
: IMAGE_OPTIONS_TYPE_MAPPING
) {
335 if (i
.second
== STR
) {
337 if (opts
.get(i
.first
, &val
) == 0) {
338 os
<< delimiter
<< image_option_name(i
.first
) << "=" << val
;
341 } else if (i
.second
== UINT64
) {
343 if (opts
.get(i
.first
, &val
) == 0) {
344 os
<< delimiter
<< image_option_name(i
.first
) << "=" << val
;
355 void image_options_create(rbd_image_options_t
* opts
)
357 image_options_ref
* opts_
= new image_options_ref(new image_options_t());
359 *opts
= static_cast<rbd_image_options_t
>(opts_
);
362 void image_options_create_ref(rbd_image_options_t
* opts
,
363 rbd_image_options_t orig
)
365 image_options_ref
* orig_
= static_cast<image_options_ref
*>(orig
);
366 image_options_ref
* opts_
= new image_options_ref(*orig_
);
368 *opts
= static_cast<rbd_image_options_t
>(opts_
);
371 void image_options_copy(rbd_image_options_t
* opts
,
372 const ImageOptions
&orig
)
374 image_options_ref
* opts_
= new image_options_ref(new image_options_t());
376 *opts
= static_cast<rbd_image_options_t
>(opts_
);
380 for (auto &i
: IMAGE_OPTIONS_TYPE_MAPPING
) {
383 if (orig
.get(i
.first
, &str_val
) == 0) {
384 image_options_set(*opts
, i
.first
, str_val
);
388 if (orig
.get(i
.first
, &uint64_val
) == 0) {
389 image_options_set(*opts
, i
.first
, uint64_val
);
396 void image_options_destroy(rbd_image_options_t opts
)
398 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
403 int image_options_set(rbd_image_options_t opts
, int optname
,
404 const std::string
& optval
)
406 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
408 std::map
<int, image_option_type_t
>::const_iterator i
=
409 IMAGE_OPTIONS_TYPE_MAPPING
.find(optname
);
411 if (i
== IMAGE_OPTIONS_TYPE_MAPPING
.end() || i
->second
!= STR
) {
415 (*opts_
->get())[optname
] = optval
;
419 int image_options_set(rbd_image_options_t opts
, int optname
, uint64_t optval
)
421 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
423 std::map
<int, image_option_type_t
>::const_iterator i
=
424 IMAGE_OPTIONS_TYPE_MAPPING
.find(optname
);
426 if (i
== IMAGE_OPTIONS_TYPE_MAPPING
.end() || i
->second
!= UINT64
) {
430 (*opts_
->get())[optname
] = optval
;
434 int image_options_get(rbd_image_options_t opts
, int optname
,
437 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
439 std::map
<int, image_option_type_t
>::const_iterator i
=
440 IMAGE_OPTIONS_TYPE_MAPPING
.find(optname
);
442 if (i
== IMAGE_OPTIONS_TYPE_MAPPING
.end() || i
->second
!= STR
) {
446 image_options_t::const_iterator j
= (*opts_
)->find(optname
);
448 if (j
== (*opts_
)->end()) {
452 *optval
= boost::get
<std::string
>(j
->second
);
456 int image_options_get(rbd_image_options_t opts
, int optname
, uint64_t* optval
)
458 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
460 std::map
<int, image_option_type_t
>::const_iterator i
=
461 IMAGE_OPTIONS_TYPE_MAPPING
.find(optname
);
463 if (i
== IMAGE_OPTIONS_TYPE_MAPPING
.end() || i
->second
!= UINT64
) {
467 image_options_t::const_iterator j
= (*opts_
)->find(optname
);
469 if (j
== (*opts_
)->end()) {
473 *optval
= boost::get
<uint64_t>(j
->second
);
477 int image_options_is_set(rbd_image_options_t opts
, int optname
,
480 if (IMAGE_OPTIONS_TYPE_MAPPING
.find(optname
) ==
481 IMAGE_OPTIONS_TYPE_MAPPING
.end()) {
485 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
486 *is_set
= ((*opts_
)->find(optname
) != (*opts_
)->end());
490 int image_options_unset(rbd_image_options_t opts
, int optname
)
492 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
494 std::map
<int, image_option_type_t
>::const_iterator i
=
495 IMAGE_OPTIONS_TYPE_MAPPING
.find(optname
);
497 if (i
== IMAGE_OPTIONS_TYPE_MAPPING
.end()) {
498 assert((*opts_
)->find(optname
) == (*opts_
)->end());
502 image_options_t::const_iterator j
= (*opts_
)->find(optname
);
504 if (j
== (*opts_
)->end()) {
512 void image_options_clear(rbd_image_options_t opts
)
514 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
519 bool image_options_is_empty(rbd_image_options_t opts
)
521 image_options_ref
* opts_
= static_cast<image_options_ref
*>(opts
);
523 return (*opts_
)->empty();
526 int list(IoCtx
& io_ctx
, vector
<string
>& names
)
528 CephContext
*cct
= (CephContext
*)io_ctx
.cct();
529 ldout(cct
, 20) << "list " << &io_ctx
<< dendl
;
532 int r
= io_ctx
.read(RBD_DIRECTORY
, bl
, 0, 0);
540 // old format images are in a tmap
542 bufferlist::iterator p
= bl
.begin();
544 map
<string
,bufferlist
> m
;
547 for (map
<string
,bufferlist
>::iterator q
= m
.begin(); q
!= m
.end(); ++q
) {
548 names
.push_back(q
->first
);
552 map
<string
, string
> images
;
553 r
= api::Image
<>::list_images(io_ctx
, &images
);
555 lderr(cct
) << "error listing v2 images: " << cpp_strerror(r
) << dendl
;
558 for (const auto& img_pair
: images
) {
559 names
.push_back(img_pair
.first
);
565 int flatten_children(ImageCtx
*ictx
, const char* snap_name
,
566 ProgressContext
& pctx
)
568 CephContext
*cct
= ictx
->cct
;
569 ldout(cct
, 20) << "children flatten " << ictx
->name
<< dendl
;
571 int r
= ictx
->state
->refresh_if_required();
576 RWLock::RLocker
l(ictx
->snap_lock
);
577 snap_t snap_id
= ictx
->get_snap_id(cls::rbd::UserSnapshotNamespace(), snap_name
);
578 ParentSpec
parent_spec(ictx
->md_ctx
.get_id(), ictx
->id
, snap_id
);
579 map
< pair
<int64_t, string
>, set
<string
> > image_info
;
581 r
= api::Image
<>::list_children(ictx
, parent_spec
, &image_info
);
586 size_t size
= image_info
.size();
591 Rados
rados(ictx
->md_ctx
);
592 for ( auto &info
: image_info
){
593 string pool
= info
.first
.second
;
595 r
= rados
.ioctx_create2(info
.first
.first
, ioctx
);
597 lderr(cct
) << "Error accessing child image pool " << pool
602 for (auto &id_it
: info
.second
) {
603 ImageCtx
*imctx
= new ImageCtx("", id_it
, NULL
, ioctx
, false);
604 int r
= imctx
->state
->open(false);
606 lderr(cct
) << "error opening image: "
607 << cpp_strerror(r
) << dendl
;
611 if ((imctx
->features
& RBD_FEATURE_DEEP_FLATTEN
) == 0 &&
612 !imctx
->snaps
.empty()) {
613 lderr(cct
) << "snapshot in-use by " << pool
<< "/" << imctx
->name
615 imctx
->state
->close();
619 librbd::NoOpProgressContext prog_ctx
;
620 r
= imctx
->operations
->flatten(prog_ctx
);
622 lderr(cct
) << "error flattening image: " << pool
<< "/" << id_it
623 << cpp_strerror(r
) << dendl
;
624 imctx
->state
->close();
628 r
= imctx
->state
->close();
630 lderr(cct
) << "failed to close image: " << cpp_strerror(r
) << dendl
;
634 pctx
.update_progress(++i
, size
);
641 int list_children(ImageCtx
*ictx
, set
<pair
<string
, string
> >& names
)
643 CephContext
*cct
= ictx
->cct
;
644 ldout(cct
, 20) << "children list " << ictx
->name
<< dendl
;
646 int r
= ictx
->state
->refresh_if_required();
651 RWLock::RLocker
l(ictx
->snap_lock
);
652 ParentSpec
parent_spec(ictx
->md_ctx
.get_id(), ictx
->id
, ictx
->snap_id
);
653 map
< pair
<int64_t, string
>, set
<string
> > image_info
;
655 r
= api::Image
<>::list_children(ictx
, parent_spec
, &image_info
);
660 Rados
rados(ictx
->md_ctx
);
661 for ( auto &info
: image_info
){
663 r
= rados
.ioctx_create2(info
.first
.first
, ioctx
);
665 lderr(cct
) << "Error accessing child image pool " << info
.first
.second
670 for (auto &id_it
: info
.second
) {
672 r
= cls_client::dir_get_name(&ioctx
, RBD_DIRECTORY
, id_it
, &name
);
674 lderr(cct
) << "Error looking up name for image id " << id_it
675 << " in pool " << info
.first
.second
<< dendl
;
678 names
.insert(make_pair(info
.first
.second
, name
));
685 int get_snap_namespace(ImageCtx
*ictx
,
686 const char *snap_name
,
687 cls::rbd::SnapshotNamespace
*snap_namespace
) {
688 ldout(ictx
->cct
, 20) << "get_snap_namespace " << ictx
<< " " << snap_name
691 int r
= ictx
->state
->refresh_if_required();
694 RWLock::RLocker
l(ictx
->snap_lock
);
695 snap_t snap_id
= ictx
->get_snap_id(*snap_namespace
, snap_name
);
696 if (snap_id
== CEPH_NOSNAP
)
698 r
= ictx
->get_snap_namespace(snap_id
, snap_namespace
);
702 int snap_is_protected(ImageCtx
*ictx
, const char *snap_name
, bool *is_protected
)
704 ldout(ictx
->cct
, 20) << "snap_is_protected " << ictx
<< " " << snap_name
707 int r
= ictx
->state
->refresh_if_required();
711 RWLock::RLocker
l(ictx
->snap_lock
);
712 snap_t snap_id
= ictx
->get_snap_id(cls::rbd::UserSnapshotNamespace(), snap_name
);
713 if (snap_id
== CEPH_NOSNAP
)
716 r
= ictx
->is_snap_unprotected(snap_id
, &is_unprotected
);
717 // consider both PROTECTED and UNPROTECTING to be 'protected',
718 // since in either state they can't be deleted
719 *is_protected
= !is_unprotected
;
723 int create_v1(IoCtx
& io_ctx
, const char *imgname
, uint64_t size
, int order
)
725 CephContext
*cct
= (CephContext
*)io_ctx
.cct();
727 ldout(cct
, 20) << __func__
<< " " << &io_ctx
<< " name = " << imgname
728 << " size = " << size
<< " order = " << order
<< dendl
;
729 int r
= validate_pool(io_ctx
, cct
);
734 ldout(cct
, 2) << "adding rbd image to directory..." << dendl
;
735 r
= tmap_set(io_ctx
, imgname
);
737 lderr(cct
) << "error adding image to directory: " << cpp_strerror(r
)
743 uint64_t bid
= rados
.get_instance_id();
745 ldout(cct
, 2) << "creating rbd image..." << dendl
;
746 struct rbd_obj_header_ondisk header
;
747 init_rbd_header(header
, size
, order
, bid
);
750 bl
.append((const char *)&header
, sizeof(header
));
752 string header_oid
= util::old_header_name(imgname
);
753 r
= io_ctx
.write(header_oid
, bl
, bl
.length(), 0);
755 lderr(cct
) << "Error writing image header: " << cpp_strerror(r
)
757 int remove_r
= tmap_rm(io_ctx
, imgname
);
759 lderr(cct
) << "Could not remove image from directory after "
760 << "header creation failed: "
761 << cpp_strerror(remove_r
) << dendl
;
766 ldout(cct
, 2) << "done." << dendl
;
770 int create(librados::IoCtx
& io_ctx
, const char *imgname
, uint64_t size
,
773 uint64_t order_
= *order
;
776 int r
= opts
.set(RBD_IMAGE_OPTION_ORDER
, order_
);
779 r
= create(io_ctx
, imgname
, "", size
, opts
, "", "", false);
781 int r1
= opts
.get(RBD_IMAGE_OPTION_ORDER
, &order_
);
788 int create(IoCtx
& io_ctx
, const char *imgname
, uint64_t size
,
789 bool old_format
, uint64_t features
, int *order
,
790 uint64_t stripe_unit
, uint64_t stripe_count
)
795 uint64_t order_
= *order
;
796 uint64_t format
= old_format
? 1 : 2;
800 r
= opts
.set(RBD_IMAGE_OPTION_FORMAT
, format
);
802 r
= opts
.set(RBD_IMAGE_OPTION_FEATURES
, features
);
804 r
= opts
.set(RBD_IMAGE_OPTION_ORDER
, order_
);
806 r
= opts
.set(RBD_IMAGE_OPTION_STRIPE_UNIT
, stripe_unit
);
808 r
= opts
.set(RBD_IMAGE_OPTION_STRIPE_COUNT
, stripe_count
);
811 r
= create(io_ctx
, imgname
, "", size
, opts
, "", "", false);
813 int r1
= opts
.get(RBD_IMAGE_OPTION_ORDER
, &order_
);
820 int create(IoCtx
& io_ctx
, const std::string
&image_name
,
821 const std::string
&image_id
, uint64_t size
,
823 const std::string
&non_primary_global_image_id
,
824 const std::string
&primary_mirror_uuid
,
825 bool skip_mirror_enable
)
827 std::string
id(image_id
);
829 id
= util::generate_image_id(io_ctx
);
832 CephContext
*cct
= (CephContext
*)io_ctx
.cct();
833 ldout(cct
, 10) << __func__
<< " name=" << image_name
<< ", "
834 << "id= " << id
<< ", "
835 << "size=" << size
<< ", opts=" << opts
<< dendl
;
838 if (opts
.get(RBD_IMAGE_OPTION_FORMAT
, &format
) != 0)
839 format
= cct
->_conf
->get_val
<int64_t>("rbd_default_format");
840 bool old_format
= format
== 1;
842 // make sure it doesn't already exist, in either format
843 int r
= detect_format(io_ctx
, image_name
, NULL
, NULL
);
846 lderr(cct
) << "Could not tell if " << image_name
<< " already exists"
850 lderr(cct
) << "rbd image " << image_name
<< " already exists" << dendl
;
855 if (opts
.get(RBD_IMAGE_OPTION_ORDER
, &order
) != 0 || order
== 0) {
856 order
= cct
->_conf
->get_val
<int64_t>("rbd_default_order");
858 r
= image::CreateRequest
<>::validate_order(cct
, order
);
864 r
= create_v1(io_ctx
, image_name
.c_str(), size
, order
);
866 ThreadPool
*thread_pool
;
867 ContextWQ
*op_work_queue
;
868 ImageCtx::get_thread_pool_instance(cct
, &thread_pool
, &op_work_queue
);
871 image::CreateRequest
<> *req
= image::CreateRequest
<>::create(
872 io_ctx
, image_name
, id
, size
, opts
, non_primary_global_image_id
,
873 primary_mirror_uuid
, skip_mirror_enable
, op_work_queue
, &cond
);
879 int r1
= opts
.set(RBD_IMAGE_OPTION_ORDER
, order
);
886 * Parent may be in a different pool, hence a different IoCtx
888 int clone(IoCtx
& p_ioctx
, const char *p_name
, const char *p_snap_name
,
889 IoCtx
& c_ioctx
, const char *c_name
,
890 uint64_t features
, int *c_order
,
891 uint64_t stripe_unit
, int stripe_count
)
893 uint64_t order
= *c_order
;
896 opts
.set(RBD_IMAGE_OPTION_FEATURES
, features
);
897 opts
.set(RBD_IMAGE_OPTION_ORDER
, order
);
898 opts
.set(RBD_IMAGE_OPTION_STRIPE_UNIT
, stripe_unit
);
899 opts
.set(RBD_IMAGE_OPTION_STRIPE_COUNT
, stripe_count
);
901 int r
= clone(p_ioctx
, p_name
, p_snap_name
, c_ioctx
, c_name
, opts
);
902 opts
.get(RBD_IMAGE_OPTION_ORDER
, &order
);
907 int clone(IoCtx
& p_ioctx
, const char *p_name
, const char *p_snap_name
,
908 IoCtx
& c_ioctx
, const char *c_name
, ImageOptions
& c_opts
)
910 CephContext
*cct
= (CephContext
*)p_ioctx
.cct();
911 if (p_snap_name
== NULL
) {
912 lderr(cct
) << "image to be cloned must be a snapshot" << dendl
;
916 // make sure parent snapshot exists
917 ImageCtx
*p_imctx
= new ImageCtx(p_name
, "", p_snap_name
, p_ioctx
, true);
918 int r
= p_imctx
->state
->open(false);
920 lderr(cct
) << "error opening parent image: "
921 << cpp_strerror(r
) << dendl
;
925 r
= clone(p_imctx
, c_ioctx
, c_name
, "", c_opts
, "", "");
927 int close_r
= p_imctx
->state
->close();
928 if (r
== 0 && close_r
< 0) {
938 int clone(ImageCtx
*p_imctx
, IoCtx
& c_ioctx
, const std::string
&c_name
,
939 const std::string
&c_id
, ImageOptions
& c_opts
,
940 const std::string
&non_primary_global_image_id
,
941 const std::string
&primary_mirror_uuid
)
943 std::string
id(c_id
);
945 id
= util::generate_image_id(c_ioctx
);
948 CephContext
*cct
= (CephContext
*)c_ioctx
.cct();
949 ldout(cct
, 10) << __func__
<< " "
950 << "c_name=" << c_name
<< ", "
951 << "c_id= " << c_id
<< ", "
952 << "c_opts=" << c_opts
<< dendl
;
954 ThreadPool
*thread_pool
;
955 ContextWQ
*op_work_queue
;
956 ImageCtx::get_thread_pool_instance(cct
, &thread_pool
, &op_work_queue
);
959 auto *req
= image::CloneRequest
<>::create(
960 p_imctx
, c_ioctx
, c_name
, id
, c_opts
,
961 non_primary_global_image_id
, primary_mirror_uuid
, op_work_queue
, &cond
);
967 int rename(IoCtx
& io_ctx
, const char *srcname
, const char *dstname
)
969 CephContext
*cct
= (CephContext
*)io_ctx
.cct();
970 ldout(cct
, 20) << "rename " << &io_ctx
<< " " << srcname
<< " -> "
973 ImageCtx
*ictx
= new ImageCtx(srcname
, "", "", io_ctx
, false);
974 int r
= ictx
->state
->open(false);
976 lderr(cct
) << "error opening source image: " << cpp_strerror(r
) << dendl
;
979 BOOST_SCOPE_EXIT((ictx
)) {
980 ictx
->state
->close();
981 } BOOST_SCOPE_EXIT_END
983 return ictx
->operations
->rename(dstname
);
986 int info(ImageCtx
*ictx
, image_info_t
& info
, size_t infosize
)
988 ldout(ictx
->cct
, 20) << "info " << ictx
<< dendl
;
990 int r
= ictx
->state
->refresh_if_required();
994 image_info(ictx
, info
, infosize
);
998 int get_old_format(ImageCtx
*ictx
, uint8_t *old
)
1000 int r
= ictx
->state
->refresh_if_required();
1003 *old
= ictx
->old_format
;
1007 int get_size(ImageCtx
*ictx
, uint64_t *size
)
1009 int r
= ictx
->state
->refresh_if_required();
1012 RWLock::RLocker
l2(ictx
->snap_lock
);
1013 *size
= ictx
->get_image_size(ictx
->snap_id
);
1017 int get_features(ImageCtx
*ictx
, uint64_t *features
)
1019 int r
= ictx
->state
->refresh_if_required();
1022 RWLock::RLocker
l(ictx
->snap_lock
);
1023 *features
= ictx
->features
;
1027 int get_overlap(ImageCtx
*ictx
, uint64_t *overlap
)
1029 int r
= ictx
->state
->refresh_if_required();
1032 RWLock::RLocker
l(ictx
->snap_lock
);
1033 RWLock::RLocker
l2(ictx
->parent_lock
);
1034 return ictx
->get_parent_overlap(ictx
->snap_id
, overlap
);
1037 int get_parent_info(ImageCtx
*ictx
, string
*parent_pool_name
,
1038 string
*parent_name
, string
*parent_id
,
1039 string
*parent_snap_name
)
1041 int r
= ictx
->state
->refresh_if_required();
1045 RWLock::RLocker
l(ictx
->snap_lock
);
1046 RWLock::RLocker
l2(ictx
->parent_lock
);
1047 if (ictx
->parent
== NULL
) {
1051 ParentSpec parent_spec
;
1053 if (ictx
->snap_id
== CEPH_NOSNAP
) {
1054 parent_spec
= ictx
->parent_md
.spec
;
1056 r
= ictx
->get_parent_spec(ictx
->snap_id
, &parent_spec
);
1058 lderr(ictx
->cct
) << "Can't find snapshot id = " << ictx
->snap_id
1062 if (parent_spec
.pool_id
== -1)
1065 if (parent_pool_name
) {
1066 Rados
rados(ictx
->md_ctx
);
1067 r
= rados
.pool_reverse_lookup(parent_spec
.pool_id
,
1070 lderr(ictx
->cct
) << "error looking up pool name: " << cpp_strerror(r
)
1076 if (parent_snap_name
) {
1077 RWLock::RLocker
l(ictx
->parent
->snap_lock
);
1078 r
= ictx
->parent
->get_snap_name(parent_spec
.snap_id
,
1081 lderr(ictx
->cct
) << "error finding parent snap name: "
1082 << cpp_strerror(r
) << dendl
;
1088 RWLock::RLocker
snap_locker(ictx
->parent
->snap_lock
);
1089 *parent_name
= ictx
->parent
->name
;
1092 *parent_id
= ictx
->parent
->id
;
1098 int get_flags(ImageCtx
*ictx
, uint64_t *flags
)
1100 int r
= ictx
->state
->refresh_if_required();
1105 RWLock::RLocker
l2(ictx
->snap_lock
);
1106 return ictx
->get_flags(ictx
->snap_id
, flags
);
1109 int set_image_notification(ImageCtx
*ictx
, int fd
, int type
)
1111 CephContext
*cct
= ictx
->cct
;
1112 ldout(cct
, 20) << __func__
<< " " << ictx
<< " fd " << fd
<< " type" << type
<< dendl
;
1114 int r
= ictx
->state
->refresh_if_required();
1119 if (ictx
->event_socket
.is_valid())
1121 return ictx
->event_socket
.init(fd
, type
);
1124 int is_exclusive_lock_owner(ImageCtx
*ictx
, bool *is_owner
)
1126 CephContext
*cct
= ictx
->cct
;
1127 ldout(cct
, 20) << __func__
<< ": ictx=" << ictx
<< dendl
;
1130 RWLock::RLocker
owner_locker(ictx
->owner_lock
);
1131 if (ictx
->exclusive_lock
== nullptr) {
1135 // might have been blacklisted by peer -- ensure we still own
1136 // the lock by pinging the OSD
1137 int r
= ictx
->exclusive_lock
->assert_header_locked();
1138 if (r
== -EBUSY
|| r
== -ENOENT
) {
1148 int lock_acquire(ImageCtx
*ictx
, rbd_lock_mode_t lock_mode
)
1150 CephContext
*cct
= ictx
->cct
;
1151 ldout(cct
, 20) << __func__
<< ": ictx=" << ictx
<< ", "
1152 << "lock_mode=" << lock_mode
<< dendl
;
1154 if (lock_mode
!= RBD_LOCK_MODE_EXCLUSIVE
) {
1158 C_SaferCond lock_ctx
;
1160 RWLock::WLocker
l(ictx
->owner_lock
);
1162 if (ictx
->exclusive_lock
== nullptr) {
1163 lderr(cct
) << "exclusive-lock feature is not enabled" << dendl
;
1167 if (ictx
->get_exclusive_lock_policy()->may_auto_request_lock()) {
1168 ictx
->set_exclusive_lock_policy(
1169 new exclusive_lock::StandardPolicy(ictx
));
1172 if (ictx
->exclusive_lock
->is_lock_owner()) {
1176 ictx
->exclusive_lock
->acquire_lock(&lock_ctx
);
1179 int r
= lock_ctx
.wait();
1181 lderr(cct
) << "failed to request exclusive lock: " << cpp_strerror(r
)
1186 RWLock::RLocker
l(ictx
->owner_lock
);
1187 if (ictx
->exclusive_lock
== nullptr) {
1189 } else if (!ictx
->exclusive_lock
->is_lock_owner()) {
1190 lderr(cct
) << "failed to acquire exclusive lock" << dendl
;
1191 return ictx
->exclusive_lock
->get_unlocked_op_error();
1197 int lock_release(ImageCtx
*ictx
)
1199 CephContext
*cct
= ictx
->cct
;
1200 ldout(cct
, 20) << __func__
<< ": ictx=" << ictx
<< dendl
;
1202 C_SaferCond lock_ctx
;
1204 RWLock::WLocker
l(ictx
->owner_lock
);
1206 if (ictx
->exclusive_lock
== nullptr ||
1207 !ictx
->exclusive_lock
->is_lock_owner()) {
1208 lderr(cct
) << "not exclusive lock owner" << dendl
;
1212 ictx
->exclusive_lock
->release_lock(&lock_ctx
);
1215 int r
= lock_ctx
.wait();
1217 lderr(cct
) << "failed to release exclusive lock: " << cpp_strerror(r
)
1224 int lock_get_owners(ImageCtx
*ictx
, rbd_lock_mode_t
*lock_mode
,
1225 std::list
<std::string
> *lock_owners
)
1227 CephContext
*cct
= ictx
->cct
;
1228 ldout(cct
, 20) << __func__
<< ": ictx=" << ictx
<< dendl
;
1230 if (!ictx
->test_features(RBD_FEATURE_EXCLUSIVE_LOCK
)) {
1231 lderr(cct
) << "exclusive-lock feature is not enabled" << dendl
;
1235 managed_lock::Locker locker
;
1236 C_SaferCond get_owner_ctx
;
1237 ExclusiveLock
<>(*ictx
).get_locker(&locker
, &get_owner_ctx
);
1238 int r
= get_owner_ctx
.wait();
1242 lderr(cct
) << "failed to determine current lock owner: "
1243 << cpp_strerror(r
) << dendl
;
1247 *lock_mode
= RBD_LOCK_MODE_EXCLUSIVE
;
1248 lock_owners
->clear();
1249 lock_owners
->emplace_back(locker
.address
);
1253 int lock_break(ImageCtx
*ictx
, rbd_lock_mode_t lock_mode
,
1254 const std::string
&lock_owner
)
1256 CephContext
*cct
= ictx
->cct
;
1257 ldout(cct
, 20) << __func__
<< ": ictx=" << ictx
<< ", "
1258 << "lock_mode=" << lock_mode
<< ", "
1259 << "lock_owner=" << lock_owner
<< dendl
;
1261 if (lock_mode
!= RBD_LOCK_MODE_EXCLUSIVE
) {
1265 if (ictx
->read_only
) {
1269 managed_lock::Locker locker
;
1270 C_SaferCond get_owner_ctx
;
1272 RWLock::RLocker
l(ictx
->owner_lock
);
1274 if (ictx
->exclusive_lock
== nullptr) {
1275 lderr(cct
) << "exclusive-lock feature is not enabled" << dendl
;
1279 ictx
->exclusive_lock
->get_locker(&locker
, &get_owner_ctx
);
1281 int r
= get_owner_ctx
.wait();
1285 lderr(cct
) << "failed to determine current lock owner: "
1286 << cpp_strerror(r
) << dendl
;
1290 if (locker
.address
!= lock_owner
) {
1294 C_SaferCond break_ctx
;
1296 RWLock::RLocker
l(ictx
->owner_lock
);
1298 if (ictx
->exclusive_lock
== nullptr) {
1299 lderr(cct
) << "exclusive-lock feature is not enabled" << dendl
;
1303 ictx
->exclusive_lock
->break_lock(locker
, true, &break_ctx
);
1305 r
= break_ctx
.wait();
1309 lderr(cct
) << "failed to break lock: " << cpp_strerror(r
) << dendl
;
1315 int remove(IoCtx
& io_ctx
, const std::string
&image_name
,
1316 const std::string
&image_id
, ProgressContext
& prog_ctx
,
1317 bool force
, bool from_trash_remove
)
1319 CephContext
*cct((CephContext
*)io_ctx
.cct());
1320 ldout(cct
, 20) << "remove " << &io_ctx
<< " "
1321 << (image_id
.empty() ? image_name
: image_id
) << dendl
;
1323 ThreadPool
*thread_pool
;
1324 ContextWQ
*op_work_queue
;
1325 ImageCtx::get_thread_pool_instance(cct
, &thread_pool
, &op_work_queue
);
1328 auto req
= librbd::image::RemoveRequest
<>::create(
1329 io_ctx
, image_name
, image_id
, force
, from_trash_remove
, prog_ctx
,
1330 op_work_queue
, &cond
);
1336 int trash_move(librados::IoCtx
&io_ctx
, rbd_trash_image_source_t source
,
1337 const std::string
&image_name
, uint64_t delay
) {
1338 CephContext
*cct((CephContext
*)io_ctx
.cct());
1339 ldout(cct
, 20) << "trash_move " << &io_ctx
<< " " << image_name
1342 std::string image_id
;
1343 ImageCtx
*ictx
= new ImageCtx(image_name
, "", nullptr, io_ctx
, false);
1344 int r
= ictx
->state
->open(true);
1349 ldout(cct
, 2) << "error opening image: " << cpp_strerror(-r
) << dendl
;
1353 // try to get image id from the directory
1354 r
= cls_client::dir_get_id(&io_ctx
, RBD_DIRECTORY
, image_name
, &image_id
);
1357 ldout(cct
, 2) << "error reading image id from dirctory: "
1358 << cpp_strerror(-r
) << dendl
;
1363 if (ictx
->old_format
) {
1364 ictx
->state
->close();
1368 image_id
= ictx
->id
;
1369 ictx
->owner_lock
.get_read();
1370 if (ictx
->exclusive_lock
!= nullptr) {
1371 r
= ictx
->operations
->prepare_image_update(false);
1373 lderr(cct
) << "cannot obtain exclusive lock - not removing" << dendl
;
1374 ictx
->owner_lock
.put_read();
1375 ictx
->state
->close();
1381 BOOST_SCOPE_EXIT_ALL(ictx
, cct
) {
1382 if (ictx
== nullptr)
1385 bool is_locked
= ictx
->exclusive_lock
!= nullptr &&
1386 ictx
->exclusive_lock
->is_lock_owner();
1389 auto exclusive_lock
= ictx
->exclusive_lock
;
1390 exclusive_lock
->shut_down(&ctx
);
1391 ictx
->owner_lock
.put_read();
1394 lderr(cct
) << "error shutting down exclusive lock" << dendl
;
1396 delete exclusive_lock
;
1398 ictx
->owner_lock
.put_read();
1400 ictx
->state
->close();
1403 ldout(cct
, 2) << "adding image entry to rbd_trash" << dendl
;
1404 utime_t ts
= ceph_clock_now();
1405 utime_t deferment_end_time
= ts
;
1406 deferment_end_time
+= (double)delay
;
1407 cls::rbd::TrashImageSource trash_source
=
1408 static_cast<cls::rbd::TrashImageSource
>(source
);
1409 cls::rbd::TrashImageSpec
trash_spec(trash_source
, image_name
, ts
,
1410 deferment_end_time
);
1411 r
= cls_client::trash_add(&io_ctx
, image_id
, trash_spec
);
1412 if (r
< 0 && r
!= -EEXIST
) {
1413 lderr(cct
) << "error adding image " << image_name
<< " to rbd_trash"
1416 } else if (r
== -EEXIST
) {
1417 ldout(cct
, 10) << "found previous unfinished deferred remove for image:"
1418 << image_id
<< dendl
;
1419 // continue with removing image from directory
1422 ldout(cct
, 2) << "removing id object..." << dendl
;
1423 r
= io_ctx
.remove(util::id_obj_name(image_name
));
1424 if (r
< 0 && r
!= -ENOENT
) {
1425 lderr(cct
) << "error removing id object: " << cpp_strerror(r
)
1430 ldout(cct
, 2) << "removing rbd image from v2 directory..." << dendl
;
1431 r
= cls_client::dir_remove_image(&io_ctx
, RBD_DIRECTORY
, image_name
,
1435 lderr(cct
) << "error removing image from v2 directory: "
1436 << cpp_strerror(-r
) << dendl
;
1444 int trash_get(IoCtx
&io_ctx
, const std::string
&id
,
1445 trash_image_info_t
*info
) {
1446 CephContext
*cct((CephContext
*)io_ctx
.cct());
1447 ldout(cct
, 20) << __func__
<< " " << &io_ctx
<< dendl
;
1449 cls::rbd::TrashImageSpec spec
;
1450 int r
= cls_client::trash_get(&io_ctx
, id
, &spec
);
1454 lderr(cct
) << "error retrieving trash entry: " << cpp_strerror(r
)
1459 rbd_trash_image_source_t source
= static_cast<rbd_trash_image_source_t
>(
1461 *info
= trash_image_info_t
{id
, spec
.name
, source
, spec
.deletion_time
.sec(),
1462 spec
.deferment_end_time
.sec()};
1466 int trash_list(IoCtx
&io_ctx
, vector
<trash_image_info_t
> &entries
) {
1467 CephContext
*cct((CephContext
*)io_ctx
.cct());
1468 ldout(cct
, 20) << "trash_list " << &io_ctx
<< dendl
;
1471 uint32_t max_read
= 1024;
1472 std::string last_read
= "";
1474 map
<string
, cls::rbd::TrashImageSpec
> trash_entries
;
1475 int r
= cls_client::trash_list(&io_ctx
, last_read
, max_read
,
1477 if (r
< 0 && r
!= -ENOENT
) {
1478 lderr(cct
) << "error listing rbd trash entries: " << cpp_strerror(r
)
1481 } else if (r
== -ENOENT
) {
1485 if (trash_entries
.empty()) {
1489 for (const auto &entry
: trash_entries
) {
1490 rbd_trash_image_source_t source
=
1491 static_cast<rbd_trash_image_source_t
>(entry
.second
.source
);
1492 entries
.push_back({entry
.first
, entry
.second
.name
, source
,
1493 entry
.second
.deletion_time
.sec(),
1494 entry
.second
.deferment_end_time
.sec()});
1496 last_read
= trash_entries
.rbegin()->first
;
1497 more_entries
= (trash_entries
.size() >= max_read
);
1498 } while (more_entries
);
1503 int trash_remove(IoCtx
&io_ctx
, const std::string
&image_id
, bool force
,
1504 ProgressContext
& prog_ctx
) {
1505 CephContext
*cct((CephContext
*)io_ctx
.cct());
1506 ldout(cct
, 20) << "trash_remove " << &io_ctx
<< " " << image_id
1507 << " " << force
<< dendl
;
1509 cls::rbd::TrashImageSpec trash_spec
;
1510 int r
= cls_client::trash_get(&io_ctx
, image_id
, &trash_spec
);
1512 lderr(cct
) << "error getting image id " << image_id
1513 << " info from trash: " << cpp_strerror(r
) << dendl
;
1517 utime_t now
= ceph_clock_now();
1518 if (now
< trash_spec
.deferment_end_time
&& !force
) {
1519 lderr(cct
) << "error: deferment time has not expired." << dendl
;
1523 r
= remove(io_ctx
, "", image_id
, prog_ctx
, false, true);
1525 lderr(cct
) << "error removing image " << image_id
1526 << ", which is pending deletion" << dendl
;
1529 r
= cls_client::trash_remove(&io_ctx
, image_id
);
1530 if (r
< 0 && r
!= -ENOENT
) {
1531 lderr(cct
) << "error removing image " << image_id
1532 << " from rbd_trash object" << dendl
;
1538 int trash_restore(librados::IoCtx
&io_ctx
, const std::string
&image_id
,
1539 const std::string
&image_new_name
) {
1540 CephContext
*cct((CephContext
*)io_ctx
.cct());
1541 ldout(cct
, 20) << "trash_restore " << &io_ctx
<< " " << image_id
<< " "
1542 << image_new_name
<< dendl
;
1544 cls::rbd::TrashImageSpec trash_spec
;
1545 int r
= cls_client::trash_get(&io_ctx
, image_id
, &trash_spec
);
1547 lderr(cct
) << "error getting image id " << image_id
1548 << " info from trash: " << cpp_strerror(r
) << dendl
;
1552 std::string image_name
= image_new_name
;
1553 if (image_name
.empty()) {
1554 // if user didn't specify a new name, let's try using the old name
1555 image_name
= trash_spec
.name
;
1556 ldout(cct
, 20) << "restoring image id " << image_id
<< " with name "
1557 << image_name
<< dendl
;
1560 // check if no image exists with the same name
1561 bool create_id_obj
= true;
1562 std::string existing_id
;
1563 r
= cls_client::get_id(&io_ctx
, util::id_obj_name(image_name
), &existing_id
);
1564 if (r
< 0 && r
!= -ENOENT
) {
1565 lderr(cct
) << "error checking if image " << image_name
<< " exists: "
1566 << cpp_strerror(r
) << dendl
;
1568 } else if (r
!= -ENOENT
){
1569 // checking if we are recovering from an incomplete restore
1570 if (existing_id
!= image_id
) {
1571 ldout(cct
, 2) << "an image with the same name already exists" << dendl
;
1574 create_id_obj
= false;
1577 if (create_id_obj
) {
1578 ldout(cct
, 2) << "adding id object" << dendl
;
1579 librados::ObjectWriteOperation op
;
1581 cls_client::set_id(&op
, image_id
);
1582 r
= io_ctx
.operate(util::id_obj_name(image_name
), &op
);
1584 lderr(cct
) << "error adding id object for image " << image_name
1585 << ": " << cpp_strerror(r
) << dendl
;
1590 ldout(cct
, 2) << "adding rbd image from v2 directory..." << dendl
;
1591 r
= cls_client::dir_add_image(&io_ctx
, RBD_DIRECTORY
, image_name
,
1593 if (r
< 0 && r
!= -EEXIST
) {
1594 lderr(cct
) << "error adding image to v2 directory: "
1595 << cpp_strerror(r
) << dendl
;
1599 ldout(cct
, 2) << "removing image from trash..." << dendl
;
1600 r
= cls_client::trash_remove(&io_ctx
, image_id
);
1601 if (r
< 0 && r
!= -ENOENT
) {
1602 lderr(cct
) << "error removing image id " << image_id
<< " from trash: "
1603 << cpp_strerror(r
) << dendl
;
1610 int snap_list(ImageCtx
*ictx
, vector
<snap_info_t
>& snaps
)
1612 ldout(ictx
->cct
, 20) << "snap_list " << ictx
<< dendl
;
1614 int r
= ictx
->state
->refresh_if_required();
1618 RWLock::RLocker
l(ictx
->snap_lock
);
1619 for (map
<snap_t
, SnapInfo
>::iterator it
= ictx
->snap_info
.begin();
1620 it
!= ictx
->snap_info
.end(); ++it
) {
1622 info
.name
= it
->second
.name
;
1623 info
.id
= it
->first
;
1624 info
.size
= it
->second
.size
;
1625 snaps
.push_back(info
);
1631 int snap_exists(ImageCtx
*ictx
, const cls::rbd::SnapshotNamespace
& snap_namespace
,
1632 const char *snap_name
, bool *exists
)
1634 ldout(ictx
->cct
, 20) << "snap_exists " << ictx
<< " " << snap_name
<< dendl
;
1636 int r
= ictx
->state
->refresh_if_required();
1640 RWLock::RLocker
l(ictx
->snap_lock
);
1641 *exists
= ictx
->get_snap_id(snap_namespace
, snap_name
) != CEPH_NOSNAP
;
1645 int snap_remove(ImageCtx
*ictx
, const char *snap_name
, uint32_t flags
,
1646 ProgressContext
& pctx
)
1648 ldout(ictx
->cct
, 20) << "snap_remove " << ictx
<< " " << snap_name
<< " flags: " << flags
<< dendl
;
1652 r
= ictx
->state
->refresh_if_required();
1656 if (flags
& RBD_SNAP_REMOVE_FLATTEN
) {
1657 r
= flatten_children(ictx
, snap_name
, pctx
);
1664 r
= snap_is_protected(ictx
, snap_name
, &is_protected
);
1669 if (is_protected
&& flags
& RBD_SNAP_REMOVE_UNPROTECT
) {
1670 r
= ictx
->operations
->snap_unprotect(cls::rbd::UserSnapshotNamespace(), snap_name
);
1672 lderr(ictx
->cct
) << "failed to unprotect snapshot: " << snap_name
<< dendl
;
1676 r
= snap_is_protected(ictx
, snap_name
, &is_protected
);
1681 lderr(ictx
->cct
) << "snapshot is still protected after unprotection" << dendl
;
1687 ictx
->operations
->snap_remove(cls::rbd::UserSnapshotNamespace(), snap_name
, &ctx
);
1693 int snap_get_timestamp(ImageCtx
*ictx
, uint64_t snap_id
, struct timespec
*timestamp
)
1695 std::map
<librados::snap_t
, SnapInfo
>::iterator snap_it
= ictx
->snap_info
.find(snap_id
);
1696 assert(snap_it
!= ictx
->snap_info
.end());
1697 utime_t time
= snap_it
->second
.timestamp
;
1698 time
.to_timespec(timestamp
);
1702 int snap_get_limit(ImageCtx
*ictx
, uint64_t *limit
)
1704 int r
= cls_client::snapshot_get_limit(&ictx
->md_ctx
, ictx
->header_oid
,
1706 if (r
== -EOPNOTSUPP
) {
1707 *limit
= UINT64_MAX
;
1713 int snap_set_limit(ImageCtx
*ictx
, uint64_t limit
)
1715 return ictx
->operations
->snap_set_limit(limit
);
1718 struct CopyProgressCtx
{
1719 explicit CopyProgressCtx(ProgressContext
&p
)
1720 : destictx(NULL
), src_size(0), prog_ctx(p
)
1725 ProgressContext
&prog_ctx
;
1728 int copy(ImageCtx
*src
, IoCtx
& dest_md_ctx
, const char *destname
,
1729 ImageOptions
& opts
, ProgressContext
&prog_ctx
, size_t sparse_size
)
1731 CephContext
*cct
= (CephContext
*)dest_md_ctx
.cct();
1732 ldout(cct
, 20) << "copy " << src
->name
1733 << (src
->snap_name
.length() ? "@" + src
->snap_name
: "")
1734 << " -> " << destname
<< " opts = " << opts
<< dendl
;
1736 src
->snap_lock
.get_read();
1737 uint64_t features
= src
->features
;
1738 uint64_t src_size
= src
->get_image_size(src
->snap_id
);
1739 src
->snap_lock
.put_read();
1740 uint64_t format
= src
->old_format
? 1 : 2;
1741 if (opts
.get(RBD_IMAGE_OPTION_FORMAT
, &format
) != 0) {
1742 opts
.set(RBD_IMAGE_OPTION_FORMAT
, format
);
1744 uint64_t stripe_unit
= src
->stripe_unit
;
1745 if (opts
.get(RBD_IMAGE_OPTION_STRIPE_UNIT
, &stripe_unit
) != 0) {
1746 opts
.set(RBD_IMAGE_OPTION_STRIPE_UNIT
, stripe_unit
);
1748 uint64_t stripe_count
= src
->stripe_count
;
1749 if (opts
.get(RBD_IMAGE_OPTION_STRIPE_COUNT
, &stripe_count
) != 0) {
1750 opts
.set(RBD_IMAGE_OPTION_STRIPE_COUNT
, stripe_count
);
1752 uint64_t order
= src
->order
;
1753 if (opts
.get(RBD_IMAGE_OPTION_ORDER
, &order
) != 0) {
1754 opts
.set(RBD_IMAGE_OPTION_ORDER
, order
);
1756 if (opts
.get(RBD_IMAGE_OPTION_FEATURES
, &features
) != 0) {
1757 opts
.set(RBD_IMAGE_OPTION_FEATURES
, features
);
1759 if (features
& ~RBD_FEATURES_ALL
) {
1760 lderr(cct
) << "librbd does not support requested features" << dendl
;
1764 int r
= create(dest_md_ctx
, destname
, "", src_size
, opts
, "", "", false);
1766 lderr(cct
) << "header creation failed" << dendl
;
1769 opts
.set(RBD_IMAGE_OPTION_ORDER
, static_cast<uint64_t>(order
));
1771 ImageCtx
*dest
= new librbd::ImageCtx(destname
, "", NULL
,
1772 dest_md_ctx
, false);
1773 r
= dest
->state
->open(false);
1775 lderr(cct
) << "failed to read newly created header" << dendl
;
1779 r
= copy(src
, dest
, prog_ctx
, sparse_size
);
1781 int close_r
= dest
->state
->close();
1782 if (r
== 0 && close_r
< 0) {
1788 class C_CopyWrite
: public Context
{
1790 C_CopyWrite(bufferlist
*bl
, Context
* ctx
)
1791 : m_bl(bl
), m_ctx(ctx
) {}
1792 void finish(int r
) override
{
1801 class C_CopyRead
: public Context
{
1803 C_CopyRead(SimpleThrottle
*throttle
, ImageCtx
*dest
, uint64_t offset
,
1804 bufferlist
*bl
, size_t sparse_size
)
1805 : m_throttle(throttle
), m_dest(dest
), m_offset(offset
), m_bl(bl
),
1806 m_sparse_size(sparse_size
) {
1807 m_throttle
->start_op();
1809 void finish(int r
) override
{
1811 lderr(m_dest
->cct
) << "error reading from source image at offset "
1812 << m_offset
<< ": " << cpp_strerror(r
) << dendl
;
1814 m_throttle
->end_op(r
);
1817 assert(m_bl
->length() == (size_t)r
);
1819 if (m_bl
->is_zero()) {
1821 m_throttle
->end_op(r
);
1825 if (!m_sparse_size
) {
1826 m_sparse_size
= (1 << m_dest
->order
);
1829 auto *throttle
= m_throttle
;
1830 auto *end_op_ctx
= new FunctionContext([throttle
](int r
) {
1831 throttle
->end_op(r
);
1833 auto gather_ctx
= new C_Gather(m_dest
->cct
, end_op_ctx
);
1835 bufferptr
m_ptr(m_bl
->length());
1836 m_bl
->rebuild(m_ptr
);
1837 size_t write_offset
= 0;
1838 size_t write_length
= 0;
1840 size_t length
= m_bl
->length();
1841 while (offset
< length
) {
1842 if (util::calc_sparse_extent(m_ptr
,
1848 bufferptr
write_ptr(m_ptr
, write_offset
, write_length
);
1849 bufferlist
*write_bl
= new bufferlist();
1850 write_bl
->push_back(write_ptr
);
1851 Context
*ctx
= new C_CopyWrite(write_bl
, gather_ctx
->new_sub());
1852 auto comp
= io::AioCompletion::create(ctx
);
1854 // coordinate through AIO WQ to ensure lock is acquired if needed
1855 m_dest
->io_work_queue
->aio_write(comp
, m_offset
+ write_offset
,
1857 std::move(*write_bl
),
1858 LIBRADOS_OP_FLAG_FADVISE_DONTNEED
,
1859 std::move(read_trace
));
1860 write_offset
= offset
;
1865 assert(gather_ctx
->get_sub_created_count() > 0);
1866 gather_ctx
->activate();
1869 ZTracer::Trace read_trace
;
1872 SimpleThrottle
*m_throttle
;
1876 size_t m_sparse_size
;
1879 int copy(ImageCtx
*src
, ImageCtx
*dest
, ProgressContext
&prog_ctx
, size_t sparse_size
)
1881 src
->snap_lock
.get_read();
1882 uint64_t src_size
= src
->get_image_size(src
->snap_id
);
1883 src
->snap_lock
.put_read();
1885 dest
->snap_lock
.get_read();
1886 uint64_t dest_size
= dest
->get_image_size(dest
->snap_id
);
1887 dest
->snap_lock
.put_read();
1889 CephContext
*cct
= src
->cct
;
1890 if (dest_size
< src_size
) {
1891 lderr(cct
) << " src size " << src_size
<< " > dest size "
1892 << dest_size
<< dendl
;
1896 const uint32_t MAX_KEYS
= 64;
1897 map
<string
, bufferlist
> pairs
;
1898 std::string last_key
= "";
1899 bool more_results
= true;
1901 while (more_results
) {
1902 r
= cls_client::metadata_list(&src
->md_ctx
, src
->header_oid
, last_key
, 0, &pairs
);
1903 if (r
< 0 && r
!= -EOPNOTSUPP
&& r
!= -EIO
) {
1904 lderr(cct
) << "couldn't list metadata: " << cpp_strerror(r
) << dendl
;
1906 } else if (r
== 0 && !pairs
.empty()) {
1907 r
= cls_client::metadata_set(&dest
->md_ctx
, dest
->header_oid
, pairs
);
1909 lderr(cct
) << "couldn't set metadata: " << cpp_strerror(r
) << dendl
;
1913 last_key
= pairs
.rbegin()->first
;
1916 more_results
= (pairs
.size() == MAX_KEYS
);
1920 ZTracer::Trace trace
;
1921 if (src
->blkin_trace_all
) {
1922 trace
.init("copy", &src
->trace_endpoint
);
1925 RWLock::RLocker
owner_lock(src
->owner_lock
);
1926 SimpleThrottle
throttle(src
->concurrent_management_ops
, false);
1927 uint64_t period
= src
->get_stripe_period();
1928 unsigned fadvise_flags
= LIBRADOS_OP_FLAG_FADVISE_SEQUENTIAL
|
1929 LIBRADOS_OP_FLAG_FADVISE_NOCACHE
;
1930 for (uint64_t offset
= 0; offset
< src_size
; offset
+= period
) {
1931 if (throttle
.pending_error()) {
1932 return throttle
.wait_for_ret();
1935 uint64_t len
= min(period
, src_size
- offset
);
1936 bufferlist
*bl
= new bufferlist();
1937 auto ctx
= new C_CopyRead(&throttle
, dest
, offset
, bl
, sparse_size
);
1938 auto comp
= io::AioCompletion::create_and_start
<Context
>(
1939 ctx
, src
, io::AIO_TYPE_READ
);
1941 io::ImageReadRequest
<> req(*src
, comp
, {{offset
, len
}},
1942 io::ReadResult
{bl
}, fadvise_flags
,
1944 ctx
->read_trace
= req
.get_trace();
1947 prog_ctx
.update_progress(offset
, src_size
);
1950 r
= throttle
.wait_for_ret();
1952 prog_ctx
.update_progress(src_size
, src_size
);
1956 int snap_set(ImageCtx
*ictx
, const cls::rbd::SnapshotNamespace
&snap_namespace
,
1957 const char *snap_name
)
1959 ldout(ictx
->cct
, 20) << "snap_set " << ictx
<< " snap = "
1960 << (snap_name
? snap_name
: "NULL") << dendl
;
1962 // ignore return value, since we may be set to a non-existent
1963 // snapshot and the user is trying to fix that
1964 ictx
->state
->refresh_if_required();
1967 std::string
name(snap_name
== nullptr ? "" : snap_name
);
1968 ictx
->state
->snap_set(snap_namespace
, name
, &ctx
);
1973 lderr(ictx
->cct
) << "failed to " << (name
.empty() ? "un" : "") << "set "
1974 << "snapshot: " << cpp_strerror(r
) << dendl
;
1982 int list_lockers(ImageCtx
*ictx
,
1983 std::list
<locker_t
> *lockers
,
1987 ldout(ictx
->cct
, 20) << "list_locks on image " << ictx
<< dendl
;
1989 int r
= ictx
->state
->refresh_if_required();
1993 RWLock::RLocker
locker(ictx
->md_lock
);
1995 *exclusive
= ictx
->exclusive_locked
;
1997 *tag
= ictx
->lock_tag
;
2000 map
<rados::cls::lock::locker_id_t
,
2001 rados::cls::lock::locker_info_t
>::const_iterator it
;
2002 for (it
= ictx
->lockers
.begin(); it
!= ictx
->lockers
.end(); ++it
) {
2004 locker
.client
= stringify(it
->first
.locker
);
2005 locker
.cookie
= it
->first
.cookie
;
2006 locker
.address
= stringify(it
->second
.addr
);
2007 lockers
->push_back(locker
);
2014 int lock(ImageCtx
*ictx
, bool exclusive
, const string
& cookie
,
2017 ldout(ictx
->cct
, 20) << "lock image " << ictx
<< " exclusive=" << exclusive
2018 << " cookie='" << cookie
<< "' tag='" << tag
<< "'"
2021 int r
= ictx
->state
->refresh_if_required();
2026 * If we wanted we could do something more intelligent, like local
2027 * checks that we think we will succeed. But for now, let's not
2028 * duplicate that code.
2031 RWLock::RLocker
locker(ictx
->md_lock
);
2032 r
= rados::cls::lock::lock(&ictx
->md_ctx
, ictx
->header_oid
, RBD_LOCK_NAME
,
2033 exclusive
? LOCK_EXCLUSIVE
: LOCK_SHARED
,
2034 cookie
, tag
, "", utime_t(), 0);
2040 ictx
->notify_update();
2044 int unlock(ImageCtx
*ictx
, const string
& cookie
)
2046 ldout(ictx
->cct
, 20) << "unlock image " << ictx
2047 << " cookie='" << cookie
<< "'" << dendl
;
2049 int r
= ictx
->state
->refresh_if_required();
2054 RWLock::RLocker
locker(ictx
->md_lock
);
2055 r
= rados::cls::lock::unlock(&ictx
->md_ctx
, ictx
->header_oid
,
2056 RBD_LOCK_NAME
, cookie
);
2062 ictx
->notify_update();
2066 int break_lock(ImageCtx
*ictx
, const string
& client
,
2067 const string
& cookie
)
2069 ldout(ictx
->cct
, 20) << "break_lock image " << ictx
<< " client='" << client
2070 << "' cookie='" << cookie
<< "'" << dendl
;
2072 int r
= ictx
->state
->refresh_if_required();
2076 entity_name_t lock_client
;
2077 if (!lock_client
.parse(client
)) {
2078 lderr(ictx
->cct
) << "Unable to parse client '" << client
2083 if (ictx
->blacklist_on_break_lock
) {
2084 typedef std::map
<rados::cls::lock::locker_id_t
,
2085 rados::cls::lock::locker_info_t
> Lockers
;
2087 ClsLockType lock_type
;
2088 std::string lock_tag
;
2089 r
= rados::cls::lock::get_lock_info(&ictx
->md_ctx
, ictx
->header_oid
,
2090 RBD_LOCK_NAME
, &lockers
, &lock_type
,
2093 lderr(ictx
->cct
) << "unable to retrieve lock info: " << cpp_strerror(r
)
2098 std::string client_address
;
2099 for (Lockers::iterator it
= lockers
.begin();
2100 it
!= lockers
.end(); ++it
) {
2101 if (it
->first
.locker
== lock_client
) {
2102 client_address
= stringify(it
->second
.addr
);
2106 if (client_address
.empty()) {
2110 RWLock::RLocker
locker(ictx
->md_lock
);
2111 librados::Rados
rados(ictx
->md_ctx
);
2112 r
= rados
.blacklist_add(client_address
,
2113 ictx
->blacklist_expire_seconds
);
2115 lderr(ictx
->cct
) << "unable to blacklist client: " << cpp_strerror(r
)
2121 r
= rados::cls::lock::break_lock(&ictx
->md_ctx
, ictx
->header_oid
,
2122 RBD_LOCK_NAME
, cookie
, lock_client
);
2125 ictx
->notify_update();
2129 void rbd_ctx_cb(completion_t cb
, void *arg
)
2131 Context
*ctx
= reinterpret_cast<Context
*>(arg
);
2132 auto comp
= reinterpret_cast<io::AioCompletion
*>(cb
);
2133 ctx
->complete(comp
->get_return_value());
2137 int64_t read_iterate(ImageCtx
*ictx
, uint64_t off
, uint64_t len
,
2138 int (*cb
)(uint64_t, size_t, const char *, void *),
2141 utime_t start_time
, elapsed
;
2143 ldout(ictx
->cct
, 20) << "read_iterate " << ictx
<< " off = " << off
2144 << " len = " << len
<< dendl
;
2146 int r
= ictx
->state
->refresh_if_required();
2150 uint64_t mylen
= len
;
2151 ictx
->snap_lock
.get_read();
2152 r
= clip_io(ictx
, off
, &mylen
);
2153 ictx
->snap_lock
.put_read();
2157 int64_t total_read
= 0;
2158 uint64_t period
= ictx
->get_stripe_period();
2159 uint64_t left
= mylen
;
2161 ZTracer::Trace trace
;
2162 if (ictx
->blkin_trace_all
) {
2163 trace
.init("read_iterate", &ictx
->trace_endpoint
);
2166 RWLock::RLocker
owner_locker(ictx
->owner_lock
);
2167 start_time
= ceph_clock_now();
2169 uint64_t period_off
= off
- (off
% period
);
2170 uint64_t read_len
= min(period_off
+ period
- off
, left
);
2175 auto c
= io::AioCompletion::create_and_start(&ctx
, ictx
,
2177 io::ImageRequest
<>::aio_read(ictx
, c
, {{off
, read_len
}},
2178 io::ReadResult
{&bl
}, 0, std::move(trace
));
2180 int ret
= ctx
.wait();
2185 r
= cb(total_read
, ret
, bl
.c_str(), arg
);
2195 elapsed
= ceph_clock_now() - start_time
;
2196 ictx
->perfcounter
->tinc(l_librbd_rd_latency
, elapsed
);
2197 ictx
->perfcounter
->inc(l_librbd_rd
);
2198 ictx
->perfcounter
->inc(l_librbd_rd_bytes
, mylen
);
2202 // validate extent against image size; clip to image size if necessary
2203 int clip_io(ImageCtx
*ictx
, uint64_t off
, uint64_t *len
)
2205 assert(ictx
->snap_lock
.is_locked());
2206 uint64_t image_size
= ictx
->get_image_size(ictx
->snap_id
);
2207 bool snap_exists
= ictx
->snap_exists
;
2212 // special-case "len == 0" requests: always valid
2216 // can't start past end
2217 if (off
>= image_size
)
2220 // clip requests that extend past end to just end
2221 if ((off
+ *len
) > image_size
)
2222 *len
= (size_t)(image_size
- off
);
2227 int flush(ImageCtx
*ictx
)
2229 CephContext
*cct
= ictx
->cct
;
2230 ldout(cct
, 20) << "flush " << ictx
<< dendl
;
2232 int r
= ictx
->state
->refresh_if_required();
2237 ictx
->user_flushed();
2240 RWLock::RLocker
owner_locker(ictx
->owner_lock
);
2245 ictx
->perfcounter
->inc(l_librbd_flush
);
2249 int invalidate_cache(ImageCtx
*ictx
)
2251 CephContext
*cct
= ictx
->cct
;
2252 ldout(cct
, 20) << "invalidate_cache " << ictx
<< dendl
;
2254 int r
= ictx
->state
->refresh_if_required();
2259 RWLock::RLocker
owner_locker(ictx
->owner_lock
);
2260 r
= ictx
->invalidate_cache(false);
2261 ictx
->perfcounter
->inc(l_librbd_invalidate_cache
);
2265 int poll_io_events(ImageCtx
*ictx
, io::AioCompletion
**comps
, int numcomp
)
2269 CephContext
*cct
= ictx
->cct
;
2270 ldout(cct
, 20) << __func__
<< " " << ictx
<< " numcomp = " << numcomp
2273 Mutex::Locker
l(ictx
->completed_reqs_lock
);
2274 while (i
< numcomp
) {
2275 if (ictx
->completed_reqs
.empty())
2277 comps
[i
++] = ictx
->completed_reqs
.front();
2278 ictx
->completed_reqs
.pop_front();
2283 int metadata_get(ImageCtx
*ictx
, const string
&key
, string
*value
)
2285 CephContext
*cct
= ictx
->cct
;
2286 ldout(cct
, 20) << "metadata_get " << ictx
<< " key=" << key
<< dendl
;
2288 int r
= ictx
->state
->refresh_if_required();
2293 return cls_client::metadata_get(&ictx
->md_ctx
, ictx
->header_oid
, key
, value
);
2296 int metadata_list(ImageCtx
*ictx
, const string
&start
, uint64_t max
, map
<string
, bufferlist
> *pairs
)
2298 CephContext
*cct
= ictx
->cct
;
2299 ldout(cct
, 20) << "metadata_list " << ictx
<< dendl
;
2301 int r
= ictx
->state
->refresh_if_required();
2306 return cls_client::metadata_list(&ictx
->md_ctx
, ictx
->header_oid
, start
, max
, pairs
);
2309 struct C_RBD_Readahead
: public Context
{
2314 C_RBD_Readahead(ImageCtx
*ictx
, object_t oid
, uint64_t offset
, uint64_t length
)
2315 : ictx(ictx
), oid(oid
), offset(offset
), length(length
) { }
2316 void finish(int r
) override
{
2317 ldout(ictx
->cct
, 20) << "C_RBD_Readahead on " << oid
<< ": " << offset
<< "+" << length
<< dendl
;
2318 ictx
->readahead
.dec_pending();
2322 void readahead(ImageCtx
*ictx
,
2323 const vector
<pair
<uint64_t,uint64_t> >& image_extents
)
2325 uint64_t total_bytes
= 0;
2326 for (vector
<pair
<uint64_t,uint64_t> >::const_iterator p
= image_extents
.begin();
2327 p
!= image_extents
.end();
2329 total_bytes
+= p
->second
;
2332 ictx
->md_lock
.get_write();
2333 bool abort
= ictx
->readahead_disable_after_bytes
!= 0 &&
2334 ictx
->total_bytes_read
> ictx
->readahead_disable_after_bytes
;
2336 ictx
->md_lock
.put_write();
2339 ictx
->total_bytes_read
+= total_bytes
;
2340 ictx
->snap_lock
.get_read();
2341 uint64_t image_size
= ictx
->get_image_size(ictx
->snap_id
);
2342 ictx
->snap_lock
.put_read();
2343 ictx
->md_lock
.put_write();
2345 pair
<uint64_t, uint64_t> readahead_extent
= ictx
->readahead
.update(image_extents
, image_size
);
2346 uint64_t readahead_offset
= readahead_extent
.first
;
2347 uint64_t readahead_length
= readahead_extent
.second
;
2349 if (readahead_length
> 0) {
2350 ldout(ictx
->cct
, 20) << "(readahead logical) " << readahead_offset
<< "~" << readahead_length
<< dendl
;
2351 map
<object_t
,vector
<ObjectExtent
> > readahead_object_extents
;
2352 Striper::file_to_extents(ictx
->cct
, ictx
->format_string
, &ictx
->layout
,
2353 readahead_offset
, readahead_length
, 0, readahead_object_extents
);
2354 for (map
<object_t
,vector
<ObjectExtent
> >::iterator p
= readahead_object_extents
.begin(); p
!= readahead_object_extents
.end(); ++p
) {
2355 for (vector
<ObjectExtent
>::iterator q
= p
->second
.begin(); q
!= p
->second
.end(); ++q
) {
2356 ldout(ictx
->cct
, 20) << "(readahead) oid " << q
->oid
<< " " << q
->offset
<< "~" << q
->length
<< dendl
;
2358 Context
*req_comp
= new C_RBD_Readahead(ictx
, q
->oid
, q
->offset
, q
->length
);
2359 ictx
->readahead
.inc_pending();
2360 ictx
->aio_read_from_cache(q
->oid
, q
->objectno
, NULL
,
2361 q
->length
, q
->offset
,
2362 req_comp
, 0, nullptr);
2365 ictx
->perfcounter
->inc(l_librbd_readahead
);
2366 ictx
->perfcounter
->inc(l_librbd_readahead_bytes
, readahead_length
);