1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
7 #include "common/config.h"
8 #include "common/Formatter.h"
9 #include "common/errno.h"
11 #include "rgw_rados.h"
13 #include "rgw_multi.h"
14 #include "rgw_orphan.h"
16 #include "rgw_bucket.h"
18 #include "services/svc_zone.h"
19 #include "services/svc_sys_obj.h"
21 #define dout_subsys ceph_subsys_rgw
23 #define DEFAULT_NUM_SHARDS 64
25 static string
obj_fingerprint(const string
& oid
, const char *force_ns
= NULL
)
27 ssize_t pos
= oid
.find('_');
29 cerr
<< "ERROR: object does not have a bucket marker: " << oid
<< std::endl
;
32 string obj_marker
= oid
.substr(0, pos
);
36 rgw_obj_key::parse_raw_oid(oid
.substr(pos
+ 1), &key
);
46 rgw_obj
new_obj(b
, key
);
47 s
= obj_marker
+ "_" + new_obj
.get_oid();
51 size_t i
= s
.size() - 1;
52 for (; i
>= s
.size() - 10; --i
) {
54 if (!isdigit(c
) && c
!= '.' && c
!= '_') {
59 return s
.substr(0, i
+ 1);
62 int RGWOrphanStore::read_job(const string
& job_name
, RGWOrphanSearchState
& state
)
65 map
<string
, bufferlist
> vals
;
66 keys
.insert(job_name
);
67 int r
= ioctx
.omap_get_vals_by_keys(oid
, keys
, &vals
);
72 map
<string
, bufferlist
>::iterator iter
= vals
.find(job_name
);
73 if (iter
== vals
.end()) {
78 bufferlist
& bl
= iter
->second
;
80 } catch (buffer::error
& err
) {
81 lderr(store
->ctx()) << "ERROR: could not decode buffer" << dendl
;
88 int RGWOrphanStore::write_job(const string
& job_name
, const RGWOrphanSearchState
& state
)
90 map
<string
, bufferlist
> vals
;
94 int r
= ioctx
.omap_set(oid
, vals
);
102 int RGWOrphanStore::remove_job(const string
& job_name
)
105 keys
.insert(job_name
);
107 int r
= ioctx
.omap_rm_keys(oid
, keys
);
115 int RGWOrphanStore::list_jobs(map
<string
,RGWOrphanSearchState
>& job_list
)
117 map
<string
,bufferlist
> vals
;
122 // loop through all the omap vals from index object, storing them to job_list,
123 // read in batches of 1024, we update the marker every iteration and exit the
124 // loop when we find that total size read out is less than batch size
126 r
= ioctx
.omap_get_vals(oid
, marker
, MAX_READ
, &vals
);
132 for (const auto &it
: vals
) {
134 RGWOrphanSearchState state
;
136 bufferlist bl
= it
.second
;
138 } catch (buffer::error
& err
) {
139 lderr(store
->ctx()) << "ERROR: could not decode buffer" << dendl
;
142 job_list
[it
.first
] = state
;
144 } while (r
== MAX_READ
);
149 int RGWOrphanStore::init()
151 const rgw_pool
& log_pool
= store
->svc()->zone
->get_zone_params().log_pool
;
152 int r
= rgw_init_ioctx(store
->getRados()->get_rados_handle(), log_pool
, ioctx
);
154 cerr
<< "ERROR: failed to open log pool (" << log_pool
<< " ret=" << r
<< std::endl
;
161 int RGWOrphanStore::store_entries(const string
& oid
, const map
<string
, bufferlist
>& entries
)
163 librados::ObjectWriteOperation op
;
164 op
.omap_set(entries
);
165 cout
<< "storing " << entries
.size() << " entries at " << oid
<< std::endl
;
166 ldout(store
->ctx(), 20) << "storing " << entries
.size() << " entries at " << oid
<< ": " << dendl
;
167 for (map
<string
, bufferlist
>::const_iterator iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
168 ldout(store
->ctx(), 20) << " > " << iter
->first
<< dendl
;
170 int ret
= rgw_rados_operate(ioctx
, oid
, &op
, null_yield
);
172 lderr(store
->ctx()) << "ERROR: " << __func__
<< "(" << oid
<< ") returned ret=" << ret
<< dendl
;
178 int RGWOrphanStore::read_entries(const string
& oid
, const string
& marker
, map
<string
, bufferlist
> *entries
, bool *truncated
)
180 #define MAX_OMAP_GET 100
181 int ret
= ioctx
.omap_get_vals(oid
, marker
, MAX_OMAP_GET
, entries
);
182 if (ret
< 0 && ret
!= -ENOENT
) {
183 cerr
<< "ERROR: " << __func__
<< "(" << oid
<< ") returned ret=" << cpp_strerror(-ret
) << std::endl
;
186 *truncated
= (entries
->size() == MAX_OMAP_GET
);
191 int RGWOrphanSearch::init(const string
& job_name
, RGWOrphanSearchInfo
*info
, bool _detailed_mode
)
193 int r
= orphan_store
.init();
198 constexpr int64_t MAX_LIST_OBJS_ENTRIES
=100;
200 max_list_bucket_entries
= std::max(store
->ctx()->_conf
->rgw_list_bucket_min_readahead
,
201 MAX_LIST_OBJS_ENTRIES
);
203 detailed_mode
= _detailed_mode
;
204 RGWOrphanSearchState state
;
205 r
= orphan_store
.read_job(job_name
, state
);
206 if (r
< 0 && r
!= -ENOENT
) {
207 lderr(store
->ctx()) << "ERROR: failed to read state ret=" << r
<< dendl
;
212 search_info
= state
.info
;
213 search_stage
= state
.stage
;
214 } else if (info
) { /* r == -ENOENT, initiate a new job if info was provided */
216 search_info
.job_name
= job_name
;
217 search_info
.num_shards
= (info
->num_shards
? info
->num_shards
: DEFAULT_NUM_SHARDS
);
218 search_info
.start_time
= ceph_clock_now();
219 search_stage
= RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT
);
223 lderr(store
->ctx()) << "ERROR: failed to write state ret=" << r
<< dendl
;
227 lderr(store
->ctx()) << "ERROR: job not found" << dendl
;
231 index_objs_prefix
= RGW_ORPHAN_INDEX_PREFIX
+ string(".");
232 index_objs_prefix
+= job_name
;
234 for (int i
= 0; i
< search_info
.num_shards
; i
++) {
237 snprintf(buf
, sizeof(buf
), "%s.rados.%d", index_objs_prefix
.c_str(), i
);
238 all_objs_index
[i
] = buf
;
240 snprintf(buf
, sizeof(buf
), "%s.buckets.%d", index_objs_prefix
.c_str(), i
);
241 buckets_instance_index
[i
] = buf
;
243 snprintf(buf
, sizeof(buf
), "%s.linked.%d", index_objs_prefix
.c_str(), i
);
244 linked_objs_index
[i
] = buf
;
249 int RGWOrphanSearch::log_oids(map
<int, string
>& log_shards
, map
<int, list
<string
> >& oids
)
251 map
<int, list
<string
> >::iterator miter
= oids
.begin();
253 list
<log_iter_info
> liters
; /* a list of iterator pairs for begin and end */
255 for (; miter
!= oids
.end(); ++miter
) {
257 info
.oid
= log_shards
[miter
->first
];
258 info
.cur
= miter
->second
.begin();
259 info
.end
= miter
->second
.end();
260 liters
.push_back(info
);
263 list
<log_iter_info
>::iterator list_iter
;
264 while (!liters
.empty()) {
265 list_iter
= liters
.begin();
267 while (list_iter
!= liters
.end()) {
268 log_iter_info
& cur_info
= *list_iter
;
270 list
<string
>::iterator
& cur
= cur_info
.cur
;
271 list
<string
>::iterator
& end
= cur_info
.end
;
273 map
<string
, bufferlist
> entries
;
274 #define MAX_OMAP_SET_ENTRIES 100
275 for (int j
= 0; cur
!= end
&& j
!= MAX_OMAP_SET_ENTRIES
; ++cur
, ++j
) {
276 ldout(store
->ctx(), 20) << "adding obj: " << *cur
<< dendl
;
277 entries
[*cur
] = bufferlist();
280 int ret
= orphan_store
.store_entries(cur_info
.oid
, entries
);
284 list
<log_iter_info
>::iterator tmp
= list_iter
;
294 int RGWOrphanSearch::build_all_oids_index()
296 librados::IoCtx ioctx
;
298 int ret
= rgw_init_ioctx(store
->getRados()->get_rados_handle(), search_info
.pool
, ioctx
);
300 lderr(store
->ctx()) << __func__
<< ": rgw_init_ioctx() returned ret=" << ret
<< dendl
;
304 ioctx
.set_namespace(librados::all_nspaces
);
305 librados::NObjectIterator i
= ioctx
.nobjects_begin();
306 librados::NObjectIterator i_end
= ioctx
.nobjects_end();
308 map
<int, list
<string
> > oids
;
313 cout
<< "logging all objects in the pool" << std::endl
;
315 for (; i
!= i_end
; ++i
) {
316 string nspace
= i
->get_nspace();
317 string oid
= i
->get_oid();
318 string locator
= i
->get_locator();
320 ssize_t pos
= oid
.find('_');
322 cout
<< "unidentified oid: " << oid
<< ", skipping" << std::endl
;
323 /* what is this object, oids should be in the format of <bucket marker>_<obj>,
328 string stripped_oid
= oid
.substr(pos
+ 1);
330 if (!rgw_obj_key::parse_raw_oid(stripped_oid
, &key
)) {
331 cout
<< "cannot parse oid: " << oid
<< ", skipping" << std::endl
;
335 if (key
.ns
.empty()) {
336 /* skipping head objects, we don't want to remove these as they are mutable and
337 * cleaning them up is racy (can race with object removal and a later recreation)
339 cout
<< "skipping head object: oid=" << oid
<< std::endl
;
343 string oid_fp
= obj_fingerprint(oid
);
345 ldout(store
->ctx(), 20) << "oid_fp=" << oid_fp
<< dendl
;
347 int shard
= orphan_shard(oid_fp
);
348 oids
[shard
].push_back(oid
);
350 #define COUNT_BEFORE_FLUSH 1000
352 if (++count
>= COUNT_BEFORE_FLUSH
) {
353 ldout(store
->ctx(), 1) << "iterated through " << total
<< " objects" << dendl
;
354 ret
= log_oids(all_objs_index
, oids
);
356 cerr
<< __func__
<< ": ERROR: log_oids() returned ret=" << ret
<< std::endl
;
363 ret
= log_oids(all_objs_index
, oids
);
365 cerr
<< __func__
<< ": ERROR: log_oids() returned ret=" << ret
<< std::endl
;
372 int RGWOrphanSearch::build_buckets_instance_index()
376 string section
= "bucket.instance";
377 int ret
= store
->ctl()->meta
.mgr
->list_keys_init(section
, &handle
);
379 lderr(store
->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret
) << dendl
;
383 map
<int, list
<string
> > instances
;
387 RGWObjectCtx
obj_ctx(store
);
394 ret
= store
->ctl()->meta
.mgr
->list_keys_next(handle
, max
, keys
, &truncated
);
396 lderr(store
->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret
) << dendl
;
400 for (list
<string
>::iterator iter
= keys
.begin(); iter
!= keys
.end(); ++iter
) {
402 ldout(store
->ctx(), 10) << "bucket_instance=" << *iter
<< " total=" << total
<< dendl
;
403 int shard
= orphan_shard(*iter
);
404 instances
[shard
].push_back(*iter
);
406 if (++count
>= COUNT_BEFORE_FLUSH
) {
407 ret
= log_oids(buckets_instance_index
, instances
);
409 lderr(store
->ctx()) << __func__
<< ": ERROR: log_oids() returned ret=" << ret
<< dendl
;
419 ret
= log_oids(buckets_instance_index
, instances
);
421 lderr(store
->ctx()) << __func__
<< ": ERROR: log_oids() returned ret=" << ret
<< dendl
;
424 store
->ctl()->meta
.mgr
->list_keys_complete(handle
);
429 int RGWOrphanSearch::handle_stat_result(map
<int, list
<string
> >& oids
, RGWRados::Object::Stat::Result
& result
)
431 set
<string
> obj_oids
;
432 rgw_bucket
& bucket
= result
.obj
.bucket
;
433 if (!result
.manifest
) { /* a very very old object, or part of a multipart upload during upload */
434 const string loc
= bucket
.bucket_id
+ "_" + result
.obj
.get_oid();
435 obj_oids
.insert(obj_fingerprint(loc
));
438 * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the
439 * meta object, just add a "shadow" object to the mix
441 obj_oids
.insert(obj_fingerprint(loc
, "shadow"));
443 RGWObjManifest
& manifest
= *result
.manifest
;
445 if (!detailed_mode
&&
446 manifest
.get_obj_size() <= manifest
.get_head_size()) {
447 ldout(store
->ctx(), 5) << "skipping object as it fits in a head" << dendl
;
451 RGWObjManifest::obj_iterator miter
;
452 for (miter
= manifest
.obj_begin(); miter
!= manifest
.obj_end(); ++miter
) {
453 const rgw_raw_obj
& loc
= miter
.get_location().get_raw_obj(store
->getRados());
455 obj_oids
.insert(obj_fingerprint(s
));
459 for (set
<string
>::iterator iter
= obj_oids
.begin(); iter
!= obj_oids
.end(); ++iter
) {
460 ldout(store
->ctx(), 20) << __func__
<< ": oid for obj=" << result
.obj
<< ": " << *iter
<< dendl
;
462 int shard
= orphan_shard(*iter
);
463 oids
[shard
].push_back(*iter
);
469 int RGWOrphanSearch::pop_and_handle_stat_op(map
<int, list
<string
> >& oids
, std::deque
<RGWRados::Object::Stat
>& ops
)
471 RGWRados::Object::Stat
& front_op
= ops
.front();
473 int ret
= front_op
.wait();
475 if (ret
!= -ENOENT
) {
476 lderr(store
->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret
) << dendl
;
480 ret
= handle_stat_result(oids
, front_op
.result
);
482 lderr(store
->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret
) << dendl
;
489 int RGWOrphanSearch::build_linked_oids_for_bucket(const string
& bucket_instance_id
, map
<int, list
<string
> >& oids
)
491 RGWObjectCtx
obj_ctx(store
);
492 auto sysobj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
494 rgw_bucket orphan_bucket
;
496 int ret
= rgw_bucket_parse_bucket_key(store
->ctx(), bucket_instance_id
,
497 &orphan_bucket
, &shard_id
);
499 ldout(store
->ctx(),0) << __func__
<< " failed to parse bucket instance: "
500 << bucket_instance_id
<< " skipping" << dendl
;
504 RGWBucketInfo cur_bucket_info
;
505 ret
= store
->getRados()->get_bucket_info(store
->svc(), orphan_bucket
.tenant
,
506 orphan_bucket
.name
, cur_bucket_info
, nullptr, null_yield
);
508 if (ret
== -ENOENT
) {
509 /* probably raced with bucket removal */
512 lderr(store
->ctx()) << __func__
<< ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret
<< dendl
;
516 if (cur_bucket_info
.bucket
.bucket_id
!= orphan_bucket
.bucket_id
) {
517 ldout(store
->ctx(), 0) << __func__
<< ": Skipping stale bucket instance: "
518 << orphan_bucket
.name
<< ": "
519 << orphan_bucket
.bucket_id
<< dendl
;
523 if (cur_bucket_info
.reshard_status
== cls_rgw_reshard_status::IN_PROGRESS
) {
524 ldout(store
->ctx(), 0) << __func__
<< ": reshard in progress. Skipping "
525 << orphan_bucket
.name
<< ": "
526 << orphan_bucket
.bucket_id
<< dendl
;
530 RGWBucketInfo bucket_info
;
531 ret
= store
->getRados()->get_bucket_instance_info(sysobj_ctx
, bucket_instance_id
, bucket_info
, nullptr, nullptr, null_yield
);
533 if (ret
== -ENOENT
) {
534 /* probably raced with bucket removal */
537 lderr(store
->ctx()) << __func__
<< ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret
<< dendl
;
541 ldout(store
->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id
<< dendl
;
542 RGWRados::Bucket
target(store
->getRados(), bucket_info
);
543 RGWRados::Bucket::List
list_op(&target
);
546 list_op
.params
.marker
= rgw_obj_key(marker
);
547 list_op
.params
.list_versions
= true;
548 list_op
.params
.enforce_ns
= false;
552 deque
<RGWRados::Object::Stat
> stat_ops
;
555 vector
<rgw_bucket_dir_entry
> result
;
557 ret
= list_op
.list_objects(max_list_bucket_entries
,
558 &result
, nullptr, &truncated
, null_yield
);
560 cerr
<< "ERROR: store->list_objects(): " << cpp_strerror(-ret
) << std::endl
;
564 for (vector
<rgw_bucket_dir_entry
>::iterator iter
= result
.begin(); iter
!= result
.end(); ++iter
) {
565 rgw_bucket_dir_entry
& entry
= *iter
;
566 if (entry
.key
.instance
.empty()) {
567 ldout(store
->ctx(), 20) << "obj entry: " << entry
.key
.name
<< dendl
;
569 ldout(store
->ctx(), 20) << "obj entry: " << entry
.key
.name
<< " [" << entry
.key
.instance
<< "]" << dendl
;
572 ldout(store
->ctx(), 20) << __func__
<< ": entry.key.name=" << entry
.key
.name
<< " entry.key.instance=" << entry
.key
.instance
<< dendl
;
574 if (!detailed_mode
&&
575 entry
.meta
.accounted_size
<= (uint64_t)store
->ctx()->_conf
->rgw_max_chunk_size
) {
576 ldout(store
->ctx(),5) << __func__
<< "skipping stat as the object " << entry
.key
.name
577 << "fits in a head" << dendl
;
581 rgw_obj
obj(bucket_info
.bucket
, entry
.key
);
583 RGWRados::Object
op_target(store
->getRados(), bucket_info
, obj_ctx
, obj
);
585 stat_ops
.push_back(RGWRados::Object::Stat(&op_target
));
586 RGWRados::Object::Stat
& op
= stat_ops
.back();
589 ret
= op
.stat_async();
591 lderr(store
->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret
) << dendl
;
594 if (stat_ops
.size() >= max_concurrent_ios
) {
595 ret
= pop_and_handle_stat_op(oids
, stat_ops
);
597 if (ret
!= -ENOENT
) {
598 lderr(store
->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret
) << dendl
;
602 if (oids
.size() >= COUNT_BEFORE_FLUSH
) {
603 ret
= log_oids(linked_objs_index
, oids
);
605 cerr
<< __func__
<< ": ERROR: log_oids() returned ret=" << ret
<< std::endl
;
613 while (!stat_ops
.empty()) {
614 ret
= pop_and_handle_stat_op(oids
, stat_ops
);
616 if (ret
!= -ENOENT
) {
617 lderr(store
->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret
) << dendl
;
625 int RGWOrphanSearch::build_linked_oids_index()
627 map
<int, list
<string
> > oids
;
628 map
<int, string
>::iterator iter
= buckets_instance_index
.find(search_stage
.shard
);
629 for (; iter
!= buckets_instance_index
.end(); ++iter
) {
630 ldout(store
->ctx(), 0) << "building linked oids index: " << iter
->first
<< "/" << buckets_instance_index
.size() << dendl
;
633 string oid
= iter
->second
;
636 map
<string
, bufferlist
> entries
;
637 int ret
= orphan_store
.read_entries(oid
, search_stage
.marker
, &entries
, &truncated
);
638 if (ret
== -ENOENT
) {
644 lderr(store
->ctx()) << __func__
<< ": ERROR: read_entries() oid=" << oid
<< " returned ret=" << ret
<< dendl
;
648 if (entries
.empty()) {
652 for (map
<string
, bufferlist
>::iterator eiter
= entries
.begin(); eiter
!= entries
.end(); ++eiter
) {
653 ldout(store
->ctx(), 20) << " indexed entry: " << eiter
->first
<< dendl
;
654 ret
= build_linked_oids_for_bucket(eiter
->first
, oids
);
656 lderr(store
->ctx()) << __func__
<< ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter
->first
657 << " returned ret=" << ret
<< dendl
;
662 search_stage
.shard
= iter
->first
;
663 search_stage
.marker
= entries
.rbegin()->first
; /* last entry */
666 search_stage
.marker
.clear();
669 int ret
= log_oids(linked_objs_index
, oids
);
671 cerr
<< __func__
<< ": ERROR: log_oids() returned ret=" << ret
<< std::endl
;
677 cerr
<< __func__
<< ": ERROR: failed to write state ret=" << ret
<< std::endl
;
685 librados::IoCtx ioctx
;
688 map
<string
, bufferlist
> entries
;
689 map
<string
, bufferlist
>::iterator iter
;
694 OMAPReader(librados::IoCtx
& _ioctx
, const string
& _oid
) : ioctx(_ioctx
), oid(_oid
), truncated(true) {
695 iter
= entries
.end();
698 int get_next(string
*key
, bufferlist
*pbl
, bool *done
);
701 int OMAPReader::get_next(string
*key
, bufferlist
*pbl
, bool *done
)
703 if (iter
!= entries
.end()) {
719 #define MAX_OMAP_GET_ENTRIES 100
720 int ret
= ioctx
.omap_get_vals(oid
, marker
, MAX_OMAP_GET_ENTRIES
, &entries
);
722 if (ret
== -ENOENT
) {
729 truncated
= (entries
.size() == MAX_OMAP_GET_ENTRIES
);
730 iter
= entries
.begin();
731 return get_next(key
, pbl
, done
);
734 int RGWOrphanSearch::compare_oid_indexes()
736 ceph_assert(linked_objs_index
.size() == all_objs_index
.size());
738 librados::IoCtx
& ioctx
= orphan_store
.get_ioctx();
740 librados::IoCtx data_ioctx
;
742 int ret
= rgw_init_ioctx(store
->getRados()->get_rados_handle(), search_info
.pool
, data_ioctx
);
744 lderr(store
->ctx()) << __func__
<< ": rgw_init_ioctx() returned ret=" << ret
<< dendl
;
748 uint64_t time_threshold
= search_info
.start_time
.sec() - stale_secs
;
750 map
<int, string
>::iterator liter
= linked_objs_index
.begin();
751 map
<int, string
>::iterator aiter
= all_objs_index
.begin();
753 for (; liter
!= linked_objs_index
.end(); ++liter
, ++aiter
) {
754 OMAPReader
linked_entries(ioctx
, liter
->second
);
755 OMAPReader
all_entries(ioctx
, aiter
->second
);
760 bool linked_done
= false;
765 int r
= all_entries
.get_next(&key
, NULL
, &done
);
773 string key_fp
= obj_fingerprint(key
);
775 while (cur_linked
< key_fp
&& !linked_done
) {
776 r
= linked_entries
.get_next(&cur_linked
, NULL
, &linked_done
);
782 if (cur_linked
== key_fp
) {
783 ldout(store
->ctx(), 20) << "linked: " << key
<< dendl
;
788 r
= data_ioctx
.stat(key
, NULL
, &mtime
);
791 lderr(store
->ctx()) << "ERROR: ioctx.stat(" << key
<< ") returned ret=" << r
<< dendl
;
795 if (stale_secs
&& (uint64_t)mtime
>= time_threshold
) {
796 ldout(store
->ctx(), 20) << "skipping: " << key
<< " (mtime=" << mtime
<< " threshold=" << time_threshold
<< ")" << dendl
;
799 ldout(store
->ctx(), 20) << "leaked: " << key
<< dendl
;
800 cout
<< "leaked: " << key
<< std::endl
;
807 int RGWOrphanSearch::run()
811 switch (search_stage
.stage
) {
813 case ORPHAN_SEARCH_STAGE_INIT
:
814 ldout(store
->ctx(), 0) << __func__
<< "(): initializing state" << dendl
;
815 search_stage
= RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL
);
818 lderr(store
->ctx()) << __func__
<< ": ERROR: failed to save state, ret=" << r
<< dendl
;
822 case ORPHAN_SEARCH_STAGE_LSPOOL
:
823 ldout(store
->ctx(), 0) << __func__
<< "(): building index of all objects in pool" << dendl
;
824 r
= build_all_oids_index();
826 lderr(store
->ctx()) << __func__
<< ": ERROR: build_all_objs_index returned ret=" << r
<< dendl
;
830 search_stage
= RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS
);
833 lderr(store
->ctx()) << __func__
<< ": ERROR: failed to save state, ret=" << r
<< dendl
;
838 case ORPHAN_SEARCH_STAGE_LSBUCKETS
:
839 ldout(store
->ctx(), 0) << __func__
<< "(): building index of all bucket indexes" << dendl
;
840 r
= build_buckets_instance_index();
842 lderr(store
->ctx()) << __func__
<< ": ERROR: build_all_objs_index returned ret=" << r
<< dendl
;
846 search_stage
= RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI
);
849 lderr(store
->ctx()) << __func__
<< ": ERROR: failed to save state, ret=" << r
<< dendl
;
855 case ORPHAN_SEARCH_STAGE_ITERATE_BI
:
856 ldout(store
->ctx(), 0) << __func__
<< "(): building index of all linked objects" << dendl
;
857 r
= build_linked_oids_index();
859 lderr(store
->ctx()) << __func__
<< ": ERROR: build_all_objs_index returned ret=" << r
<< dendl
;
863 search_stage
= RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE
);
866 lderr(store
->ctx()) << __func__
<< ": ERROR: failed to save state, ret=" << r
<< dendl
;
871 case ORPHAN_SEARCH_STAGE_COMPARE
:
872 r
= compare_oid_indexes();
874 lderr(store
->ctx()) << __func__
<< ": ERROR: build_all_objs_index returned ret=" << r
<< dendl
;
888 int RGWOrphanSearch::remove_index(map
<int, string
>& index
)
890 librados::IoCtx
& ioctx
= orphan_store
.get_ioctx();
892 for (map
<int, string
>::iterator iter
= index
.begin(); iter
!= index
.end(); ++iter
) {
893 int r
= ioctx
.remove(iter
->second
);
896 ldout(store
->ctx(), 0) << "ERROR: couldn't remove " << iter
->second
<< ": ret=" << r
<< dendl
;
903 int RGWOrphanSearch::finish()
905 int r
= remove_index(all_objs_index
);
907 ldout(store
->ctx(), 0) << "ERROR: remove_index(" << all_objs_index
<< ") returned ret=" << r
<< dendl
;
909 r
= remove_index(buckets_instance_index
);
911 ldout(store
->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index
<< ") returned ret=" << r
<< dendl
;
913 r
= remove_index(linked_objs_index
);
915 ldout(store
->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index
<< ") returned ret=" << r
<< dendl
;
918 r
= orphan_store
.remove_job(search_info
.job_name
);
920 ldout(store
->ctx(), 0) << "ERROR: could not remove job name (" << search_info
.job_name
<< ") ret=" << r
<< dendl
;
927 int RGWRadosList::handle_stat_result(RGWRados::Object::Stat::Result
& result
,
928 std::set
<string
>& obj_oids
)
932 rgw_bucket
& bucket
= result
.obj
.bucket
;
934 ldout(store
->ctx(), 20) << "RGWRadosList::" << __func__
<<
935 " bucket=" << bucket
<<
936 ", has_manifest=" << result
.manifest
.has_value() <<
939 // iterator to store result of dlo/slo attribute find
940 decltype(result
.attrs
)::iterator attr_it
= result
.attrs
.end();
941 const std::string oid
= bucket
.marker
+ "_" + result
.obj
.get_oid();
942 ldout(store
->ctx(), 20) << "radoslist processing object=\"" <<
943 oid
<< "\"" << dendl
;
944 if (visited_oids
.find(oid
) != visited_oids
.end()) {
945 // apparently we hit a loop; don't continue with this oid
946 ldout(store
->ctx(), 15) <<
947 "radoslist stopped loop at already visited object=\"" <<
948 oid
<< "\"" << dendl
;
952 if (!result
.manifest
) {
953 /* a very very old object, or part of a multipart upload during upload */
954 obj_oids
.insert(oid
);
957 * multipart parts don't have manifest on them, it's in the meta
958 * object; we'll process them in
959 * RGWRadosList::do_incomplete_multipart
961 } else if ((attr_it
= result
.attrs
.find(RGW_ATTR_USER_MANIFEST
)) !=
962 result
.attrs
.end()) {
963 // *** handle DLO object ***
965 obj_oids
.insert(oid
);
966 visited_oids
.insert(oid
); // prevent dlo loops
967 ldout(store
->ctx(), 15) << "radoslist added to visited list DLO=\"" <<
968 oid
<< "\"" << dendl
;
970 char* prefix_path_c
= attr_it
->second
.c_str();
971 const std::string
& prefix_path
= prefix_path_c
;
973 const size_t sep_pos
= prefix_path
.find('/');
974 if (string::npos
== sep_pos
) {
978 const std::string bucket_name
= prefix_path
.substr(0, sep_pos
);
979 const std::string prefix
= prefix_path
.substr(sep_pos
+ 1);
981 add_bucket_prefix(bucket_name
, prefix
);
982 ldout(store
->ctx(), 25) << "radoslist DLO oid=\"" << oid
<<
983 "\" added bucket=\"" << bucket_name
<< "\" prefix=\"" <<
984 prefix
<< "\" to process list" << dendl
;
985 } else if ((attr_it
= result
.attrs
.find(RGW_ATTR_SLO_MANIFEST
)) !=
986 result
.attrs
.end()) {
987 // *** handle SLO object ***
989 obj_oids
.insert(oid
);
990 visited_oids
.insert(oid
); // prevent slo loops
991 ldout(store
->ctx(), 15) << "radoslist added to visited list SLO=\"" <<
992 oid
<< "\"" << dendl
;
995 bufferlist::const_iterator bliter
= attr_it
->second
.begin();
997 ::decode(slo_info
, bliter
);
998 } catch (buffer::error
& err
) {
999 ldout(store
->ctx(), 0) <<
1000 "ERROR: failed to decode slo manifest for " << oid
<< dendl
;
1004 for (const auto& iter
: slo_info
.entries
) {
1005 const string
& path_str
= iter
.path
;
1007 const size_t sep_pos
= path_str
.find('/', 1 /* skip initial slash */);
1008 if (string::npos
== sep_pos
) {
1012 std::string bucket_name
;
1013 std::string obj_name
;
1015 bucket_name
= url_decode(path_str
.substr(1, sep_pos
- 1));
1016 obj_name
= url_decode(path_str
.substr(sep_pos
+ 1));
1018 const rgw_obj_key
obj_key(obj_name
);
1019 add_bucket_filter(bucket_name
, obj_key
);
1020 ldout(store
->ctx(), 25) << "radoslist SLO oid=\"" << oid
<<
1021 "\" added bucket=\"" << bucket_name
<< "\" obj_key=\"" <<
1022 obj_key
<< "\" to process list" << dendl
;
1025 RGWObjManifest
& manifest
= *result
.manifest
;
1027 // in multipart, the head object contains no data and just has the
1028 // manifest AND empty objects have no manifest, but they're
1029 // realized as empty rados objects
1030 if (0 == manifest
.get_max_head_size() ||
1031 manifest
.obj_begin() == manifest
.obj_end()) {
1032 obj_oids
.insert(oid
);
1033 // first_insert = true;
1036 RGWObjManifest::obj_iterator miter
;
1037 for (miter
= manifest
.obj_begin(); miter
!= manifest
.obj_end(); ++miter
) {
1038 const rgw_raw_obj
& loc
=
1039 miter
.get_location().get_raw_obj(store
->getRados());
1046 } // RGWRadosList::handle_stat_result
1048 int RGWRadosList::pop_and_handle_stat_op(
1049 RGWObjectCtx
& obj_ctx
,
1050 std::deque
<RGWRados::Object::Stat
>& ops
)
1052 std::set
<string
> obj_oids
;
1053 RGWRados::Object::Stat
& front_op
= ops
.front();
1055 int ret
= front_op
.wait();
1057 if (ret
!= -ENOENT
) {
1058 lderr(store
->ctx()) << "ERROR: stat_async() returned error: " <<
1059 cpp_strerror(-ret
) << dendl
;
1064 ret
= handle_stat_result(front_op
.result
, obj_oids
);
1066 lderr(store
->ctx()) << "ERROR: handle_stat_result() returned error: " <<
1067 cpp_strerror(-ret
) << dendl
;
1071 for (const auto& o
: obj_oids
) {
1072 std::cout
<< o
<< std::endl
;
1077 // invalidate object context for this object to avoid memory leak
1078 // (see pr https://github.com/ceph/ceph/pull/30174)
1079 obj_ctx
.invalidate(front_op
.result
.obj
);
1086 #if 0 // code that may be the basis for expansion
1087 int RGWRadosList::build_buckets_instance_index()
1091 string section
= "bucket.instance";
1092 int ret
= store
->meta_mgr
->list_keys_init(section
, &handle
);
1094 lderr(store
->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret
) << dendl
;
1098 map
<int, list
<string
> > instances
;
1102 RGWObjectCtx
obj_ctx(store
);
1109 ret
= store
->meta_mgr
->list_keys_next(handle
, max
, keys
, &truncated
);
1111 lderr(store
->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret
) << dendl
;
1115 for (list
<string
>::iterator iter
= keys
.begin(); iter
!= keys
.end(); ++iter
) {
1117 ldout(store
->ctx(), 10) << "bucket_instance=" << *iter
<< " total=" << total
<< dendl
;
1118 int shard
= orphan_shard(*iter
);
1119 instances
[shard
].push_back(*iter
);
1121 if (++count
>= COUNT_BEFORE_FLUSH
) {
1122 ret
= log_oids(buckets_instance_index
, instances
);
1124 lderr(store
->ctx()) << __func__
<< ": ERROR: log_oids() returned ret=" << ret
<< dendl
;
1131 } while (truncated
);
1133 ret
= log_oids(buckets_instance_index
, instances
);
1135 lderr(store
->ctx()) << __func__
<< ": ERROR: log_oids() returned ret=" << ret
<< dendl
;
1138 store
->meta_mgr
->list_keys_complete(handle
);
1145 int RGWRadosList::process_bucket(
1146 const std::string
& bucket_instance_id
,
1147 const std::string
& prefix
,
1148 const std::set
<rgw_obj_key
>& entries_filter
)
1150 ldout(store
->ctx(), 10) << "RGWRadosList::" << __func__
<<
1151 " bucket_instance_id=" << bucket_instance_id
<<
1152 ", prefix=" << prefix
<<
1153 ", entries_filter.size=" << entries_filter
.size() << dendl
;
1155 RGWBucketInfo bucket_info
;
1156 RGWSysObjectCtx sys_obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
1157 int ret
= store
->getRados()->get_bucket_instance_info(sys_obj_ctx
,
1164 if (ret
== -ENOENT
) {
1165 // probably raced with bucket removal
1168 lderr(store
->ctx()) << __func__
<<
1169 ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" <<
1174 RGWRados::Bucket
target(store
->getRados(), bucket_info
);
1175 RGWRados::Bucket::List
list_op(&target
);
1178 list_op
.params
.marker
= rgw_obj_key(marker
);
1179 list_op
.params
.list_versions
= true;
1180 list_op
.params
.enforce_ns
= false;
1181 list_op
.params
.allow_unordered
= false;
1182 list_op
.params
.prefix
= prefix
;
1186 std::deque
<RGWRados::Object::Stat
> stat_ops
;
1187 std::string prev_versioned_key_name
= "";
1189 RGWObjectCtx
obj_ctx(store
);
1192 std::vector
<rgw_bucket_dir_entry
> result
;
1194 constexpr int64_t LIST_OBJS_MAX_ENTRIES
= 100;
1195 ret
= list_op
.list_objects(LIST_OBJS_MAX_ENTRIES
, &result
,
1196 NULL
, &truncated
, null_yield
);
1197 if (ret
== -ENOENT
) {
1198 // race with bucket delete?
1201 } else if (ret
< 0) {
1202 std::cerr
<< "ERROR: store->list_objects(): " << cpp_strerror(-ret
) <<
1207 for (std::vector
<rgw_bucket_dir_entry
>::iterator iter
= result
.begin();
1208 iter
!= result
.end();
1210 rgw_bucket_dir_entry
& entry
= *iter
;
1212 if (entry
.key
.instance
.empty()) {
1213 ldout(store
->ctx(), 20) << "obj entry: " << entry
.key
.name
<< dendl
;
1215 ldout(store
->ctx(), 20) << "obj entry: " << entry
.key
.name
<<
1216 " [" << entry
.key
.instance
<< "]" << dendl
;
1219 ldout(store
->ctx(), 20) << __func__
<< ": entry.key.name=" <<
1220 entry
.key
.name
<< " entry.key.instance=" << entry
.key
.instance
<<
1223 // ignore entries that are not in the filter if there is a filter
1224 if (!entries_filter
.empty() &&
1225 entries_filter
.find(entry
.key
) == entries_filter
.cend()) {
1229 // we need to do this in two cases below, so use a lambda
1231 [&](const rgw_obj_key
& key
) -> int {
1234 rgw_obj
obj(bucket_info
.bucket
, key
);
1236 RGWRados::Object
op_target(store
->getRados(), bucket_info
,
1239 stat_ops
.push_back(RGWRados::Object::Stat(&op_target
));
1240 RGWRados::Object::Stat
& op
= stat_ops
.back();
1242 ret
= op
.stat_async();
1244 lderr(store
->ctx()) << "ERROR: stat_async() returned error: " <<
1245 cpp_strerror(-ret
) << dendl
;
1249 if (stat_ops
.size() >= max_concurrent_ios
) {
1250 ret
= pop_and_handle_stat_op(obj_ctx
, stat_ops
);
1252 if (ret
!= -ENOENT
) {
1253 lderr(store
->ctx()) <<
1254 "ERROR: pop_and_handle_stat_op() returned error: " <<
1255 cpp_strerror(-ret
) << dendl
;
1258 // clear error, so we'll continue processing directory
1264 }; // do_stat_key lambda
1266 // for versioned objects, make sure the head object is handled
1267 // as well by ignoring the instance identifier
1268 if (!entry
.key
.instance
.empty() &&
1269 entry
.key
.name
!= prev_versioned_key_name
) {
1270 // don't do the same key twice; even though our bucket index
1271 // listing allows unordered, since all versions of an object
1272 // use the same bucket index key, they'll all end up together
1274 prev_versioned_key_name
= entry
.key
.name
;
1276 rgw_obj_key
uninstanced(entry
.key
.name
);
1278 ret
= do_stat_key(uninstanced
);
1284 ret
= do_stat_key(entry
.key
);
1289 } while (truncated
);
1291 while (!stat_ops
.empty()) {
1292 ret
= pop_and_handle_stat_op(obj_ctx
, stat_ops
);
1294 if (ret
!= -ENOENT
) {
1295 lderr(store
->ctx()) << "ERROR: stat_async() returned error: " <<
1296 cpp_strerror(-ret
) << dendl
;
// Walks the "bucket" metadata section in batches of at most max_keys keys
// and calls run(bucket_id) for every key that comes back.
//
// NOTE(review): this view of the function has gaps (the declaration of ret,
// the loop header, braces and return statements are not shown); the notes
// below only describe what the visible lines establish.
1305 int RGWRadosList::run()
1308 void* handle
= nullptr;
// begin a key listing over the "bucket" section; handle is passed by address
// here and handed to list_keys_next() below
1310 ret
= store
->ctl()->meta
.mgr
->list_keys_init("bucket", &handle
);
// error log for list_keys_init; presumably guarded by a ret < 0 check that is
// not shown here -- confirm
1312 lderr(store
->ctx()) << "RGWRadosList::" << __func__
<<
1313 " ERROR: list_keys_init returned " <<
1314 cpp_strerror(-ret
) << dendl
;
// batch size for each list_keys_next() call; truncated starts out true and is
// rewritten by list_keys_next() through the pointer passed below
1318 const int max_keys
= 1000;
1319 bool truncated
= true;
// buckets receives the keys of the current batch
1322 std::list
<std::string
> buckets
;
// NOTE(review): in the visible lines the value assigned to ret here is not
// examined before ret is reassigned by run(bucket_id) below -- confirm
// whether a failed list_keys_next() needs handling.
1323 ret
= store
->ctl()->meta
.mgr
->list_keys_next(handle
, max_keys
,
1324 buckets
, &truncated
);
// process each key of the batch through the single-bucket overload
1326 for (std::string
& bucket_id
: buckets
) {
1327 ret
= run(bucket_id
);
// -ENOENT is separated from other negative results; the bodies of both
// branches are not shown in this view
1328 if (ret
== -ENOENT
) {
1330 } else if (ret
< 0) {
// keep fetching batches for as long as list_keys_next() reports truncation
// NOTE(review): no call that releases handle is visible in this function --
// confirm whether the listing needs to be completed/closed.
1334 } while (truncated
);
1337 } // RGWRadosList::run()
// Processes a single starting bucket: registers it via add_bucket_entire(),
// drains bucket_process_map calling process_bucket() for each queued bucket,
// then looks up bucket info again and calls do_incomplete_multipart().
//
// start_bucket_name - name passed to add_bucket_entire() before the loop
//
// NOTE(review): this view of the function has gaps (the declarations of ret
// and process, most get_bucket_info() arguments, braces and return statements
// are not shown); the notes below only describe what the visible lines
// establish.
1340 int RGWRadosList::run(const std::string
& start_bucket_name
)
// NOTE(review): neither sys_obj_ctx nor obj_ctx is referenced again in the
// lines visible here -- confirm they are still needed
1342 RGWSysObjectCtx sys_obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
1343 RGWObjectCtx
obj_ctx(store
);
// presumably queues start_bucket_name in bucket_process_map, which the loop
// below drains -- confirm against add_bucket_entire()
1346 add_bucket_entire(start_bucket_name
);
1348 while (! bucket_process_map
.empty()) {
1349 // pop item from map and capture its key data
1350 auto front
= bucket_process_map
.begin();
1351 std::string bucket_name
= front
->first
;
// swap the mapped value out into process before the entry is erased, so it
// stays usable after the erase
1353 std::swap(process
, front
->second
);
1354 bucket_process_map
.erase(front
);
// look up bucket info (remaining arguments are not shown in this view)
1356 RGWBucketInfo bucket_info
;
1357 ret
= store
->getRados()->get_bucket_info(store
->svc(),
// -ENOENT produces a warning on std::cerr; any other negative result
// produces an error on std::cerr that includes cpp_strerror(-ret)
1363 if (ret
== -ENOENT
) {
1364 std::cerr
<< "WARNING: bucket " << bucket_name
<<
1365 " does not exist; could it have been deleted very recently?" <<
1368 } else if (ret
< 0) {
1369 std::cerr
<< "ERROR: could not get info for bucket " << bucket_name
<<
1370 " -- " << cpp_strerror(-ret
) << std::endl
;
// key string of the bucket, passed to process_bucket() by the lambda below
1374 const std::string bucket_id
= bucket_info
.bucket
.get_key();
// function-local empty values used below as the "no filter" and "no prefix"
// arguments
1376 static const std::set
<rgw_obj_key
> empty_filter
;
1377 static const std::string empty_prefix
;
// helper wrapping process_bucket() for this bucket_id (the capture list is
// not shown in this view)
1379 auto do_process_bucket
=
1381 (const std::string
& prefix
,
1382 const std::set
<rgw_obj_key
>& entries_filter
) -> int {
1383 int ret
= process_bucket(bucket_id
, prefix
, entries_filter
);
1384 if (ret
== -ENOENT
) {
1385 // bucket deletion race?
// NOTE(review): inside a lambda body __func__ names the closure's call
// operator, not run -- confirm the "RGWRadosList::" log prefix reads as
// intended.
1388 lderr(store
->ctx()) << "RGWRadosList::" << __func__
<<
1389 ": ERROR: process_bucket(); bucket_id=" <<
1390 bucket_id
<< " returned ret=" << ret
<< dendl
;
1396 // either process the whole bucket *or* process the filters and/or
// whole bucket: a single call with the empty prefix and the empty filter
1398 if (process
.entire_container
) {
1399 ret
= do_process_bucket(empty_prefix
, empty_filter
);
// filter keys, when there are any, are handled in one call with the empty
// prefix
1404 if (! process
.filter_keys
.empty()) {
1405 ret
= do_process_bucket(empty_prefix
, process
.filter_keys
);
// each prefix gets its own call with the empty filter
1410 for (const auto& p
: process
.prefixes
) {
1411 ret
= do_process_bucket(p
, empty_filter
);
1417 } // while (! bucket_process_map.empty())
1419 // now handle incomplete multipart uploads by going back to the
// a second bucket-info lookup (remaining arguments are not shown in this
// view); -ENOENT is treated as a possible deletion race, other negative
// results are logged
1422 RGWBucketInfo bucket_info
;
1423 ret
= store
->getRados()->get_bucket_info(store
->svc(),
1429 if (ret
== -ENOENT
) {
1430 // bucket deletion race?
1432 } else if (ret
< 0) {
1433 lderr(store
->ctx()) << "RGWRadosList::" << __func__
<<
1434 ": ERROR: get_bucket_info returned ret=" << ret
<< dendl
;
// hand the looked-up bucket to the multipart pass; the log line that follows
// is presumably guarded by a ret < 0 check not shown here -- confirm
1438 ret
= do_incomplete_multipart(store
, bucket_info
);
1440 lderr(store
->ctx()) << "RGWRadosList::" << __func__
<<
1441 ": ERROR: do_incomplete_multipart returned ret=" << ret
<< dendl
;
1446 } // RGWRadosList::run(string)
// Lists the bucket's multipart namespace, keeps the entries whose key name
// parses as a multipart meta object, lists the parts of each such upload and
// writes the oid of every raw object in each part's manifest to std::cout,
// one per line.
//
// store       - supplies the RGWRados handle and the context used for logging;
//               also passed to list_multipart_parts()
// bucket_info - bucket whose multipart namespace is listed
//
// NOTE(review): this view of the function has gaps (the declaration of ret,
// both loop headers, braces, continue/break/return statements and at least
// one call argument are not shown); the notes below only describe what the
// visible lines establish.
1449 int RGWRadosList::do_incomplete_multipart(
1450 rgw::sal::RGWRadosStore
* store
,
1451 RGWBucketInfo
& bucket_info
)
// max_uploads bounds each list_objects() call; max_parts is not referenced in
// the lines visible here -- presumably an argument of list_multipart_parts(),
// confirm
1453 constexpr int max_uploads
= 1000;
1454 constexpr int max_parts
= 1000;
1455 static const std::string mp_ns
= RGW_OBJ_NS_MULTIPART
;
1456 static MultipartMetaFilter mp_filter
;
// bucket listing restricted to the mp_ns namespace and filtered by mp_filter
1460 RGWRados::Bucket
target(store
->getRados(), bucket_info
);
1461 RGWRados::Bucket::List
list_op(&target
);
1462 list_op
.params
.ns
= mp_ns
;
1463 list_op
.params
.filter
= &mp_filter
;
1464 // use empty string for initial list_op.params.marker
1465 // use empty strings for list_op.params.{prefix,delim}
// written by list_objects() through the pointer passed below; drives the
// outer loop condition at the end of the function
1467 bool is_listing_truncated
;
// result containers for one list_objects() batch
1470 std::vector
<rgw_bucket_dir_entry
> objs
;
1471 std::map
<string
, bool> common_prefixes
;
1472 ret
= list_op
.list_objects(max_uploads
, &objs
, &common_prefixes
,
1473 &is_listing_truncated
, null_yield
);
// -ENOENT is logged at debug level 20 as a likely bucket-removal race; any
// other negative result is logged as an error
1474 if (ret
== -ENOENT
) {
1475 // could bucket have been removed while this is running?
1476 ldout(store
->ctx(), 20) << "RGWRadosList::" << __func__
<<
1477 ": WARNING: call to list_objects of multipart namespace got ENOENT; "
1478 "assuming bucket removal race" << dendl
;
1480 } else if (ret
< 0) {
1481 lderr(store
->ctx()) << "RGWRadosList::" << __func__
<<
1482 ": ERROR: list_objects op returned ret=" << ret
<< dendl
;
// collect the batch's upload entries; a single entry object is reused and
// re-parsed with from_meta() for every listed key
1486 if (!objs
.empty()) {
1487 std::vector
<RGWMultipartUploadEntry
> uploads
;
1488 RGWMultipartUploadEntry entry
;
1489 for (const rgw_bucket_dir_entry
& obj
: objs
) {
1490 const rgw_obj_key
& key
= obj
.key
;
1491 if (!entry
.mp
.from_meta(key
.name
)) {
1492 // we only want the meta objects, so skip all the components
// a copy of entry is appended to uploads
1496 uploads
.push_back(entry
);
1497 ldout(store
->ctx(), 20) << "RGWRadosList::" << __func__
<<
1498 " processing incomplete multipart entry " <<
1502 // now process the uploads vector
// NOTE(review): parts_marker and is_parts_truncated are declared once, ahead
// of the loop over uploads, so every upload shares them. No visible line
// assigns parts_marker after this initialisation, and the argument following
// parts in the list_multipart_parts() call is NULL; if that position is the
// next-marker output, a truncated part listing would repeat the same request
// -- confirm against list_multipart_parts().
1503 int parts_marker
= 0;
1504 bool is_parts_truncated
= false;
1506 map
<uint32_t, RGWUploadPartInfo
> parts
;
// list the parts of each collected upload, identified by its upload id and
// meta name
1508 for (const auto& upload
: uploads
) {
1509 const RGWMPObj
& mp
= upload
.mp
;
1510 ret
= list_multipart_parts(store
, bucket_info
, store
->ctx(),
1511 mp
.get_upload_id(), mp
.get_meta(),
1513 parts_marker
, parts
, NULL
, &is_parts_truncated
);
// -ENOENT is separated from other negative results, which are logged as
// errors; the bodies of the branches are not fully shown in this view
1514 if (ret
== -ENOENT
) {
1516 } else if (ret
< 0) {
1517 lderr(store
->ctx()) << "RGWRadosList::" << __func__
<<
1518 ": ERROR: list_multipart_parts returned ret=" << ret
<< dendl
;
// for every listed part, walk its manifest and print the oid of each raw
// object location
1522 for (auto& p
: parts
) {
1523 RGWObjManifest
& manifest
= p
.second
.manifest
;
1524 for (auto obj_it
= manifest
.obj_begin();
1525 obj_it
!= manifest
.obj_end();
1527 const rgw_raw_obj
& loc
=
1528 obj_it
.get_location().get_raw_obj(store
->getRados());
1529 std::cout
<< loc
.oid
<< std::endl
;
1533 } while (is_parts_truncated
);
1534 } // if objs not empty
1535 } while (is_listing_truncated
);
1538 } // RGWRadosList::do_incomplete_multipart