]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_orphan.cc
bump version to 18.2.4-pve3
[ceph.git] / ceph / src / rgw / rgw_orphan.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <string>
5
6
7 #include "common/config.h"
8 #include "common/Formatter.h"
9 #include "common/errno.h"
10
11 #include "rgw_op.h"
12 #include "rgw_multi.h"
13 #include "rgw_orphan.h"
14 #include "rgw_zone.h"
15 #include "rgw_bucket.h"
16 #include "rgw_sal_rados.h"
17
18 #include "services/svc_zone.h"
19
20 #define dout_subsys ceph_subsys_rgw
21
22 #define DEFAULT_NUM_SHARDS 64
23
24 using namespace std;
25
26 static string obj_fingerprint(const string& oid, const char *force_ns = NULL)
27 {
28 ssize_t pos = oid.find('_');
29 if (pos < 0) {
30 cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
31 }
32
33 string obj_marker = oid.substr(0, pos);
34
35 rgw_obj_key key;
36
37 rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key);
38
39 if (key.ns.empty()) {
40 return oid;
41 }
42
43 string s = oid;
44
45 if (force_ns) {
46 rgw_bucket b;
47 rgw_obj new_obj(b, key);
48 s = obj_marker + "_" + new_obj.get_oid();
49 }
50
51 /* cut out suffix */
52 size_t i = s.size() - 1;
53 for (; i >= s.size() - 10; --i) {
54 char c = s[i];
55 if (!isdigit(c) && c != '.' && c != '_') {
56 break;
57 }
58 }
59
60 return s.substr(0, i + 1);
61 }
62
63 int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state)
64 {
65 set<string> keys;
66 map<string, bufferlist> vals;
67 keys.insert(job_name);
68 int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
69 if (r < 0) {
70 return r;
71 }
72
73 map<string, bufferlist>::iterator iter = vals.find(job_name);
74 if (iter == vals.end()) {
75 return -ENOENT;
76 }
77
78 try {
79 bufferlist& bl = iter->second;
80 decode(state, bl);
81 } catch (buffer::error& err) {
82 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
83 return -EIO;
84 }
85
86 return 0;
87 }
88
89 int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
90 {
91 map<string, bufferlist> vals;
92 bufferlist bl;
93 encode(state, bl);
94 vals[job_name] = bl;
95 int r = ioctx.omap_set(oid, vals);
96 if (r < 0) {
97 return r;
98 }
99
100 return 0;
101 }
102
103 int RGWOrphanStore::remove_job(const string& job_name)
104 {
105 set<string> keys;
106 keys.insert(job_name);
107
108 int r = ioctx.omap_rm_keys(oid, keys);
109 if (r < 0) {
110 return r;
111 }
112
113 return 0;
114 }
115
116 int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list)
117 {
118 map <string,bufferlist> vals;
119 int MAX_READ=1024;
120 string marker="";
121 int r = 0;
122
123 // loop through all the omap vals from index object, storing them to job_list,
124 // read in batches of 1024, we update the marker every iteration and exit the
125 // loop when we find that total size read out is less than batch size
126 do {
127 r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals);
128 if (r < 0) {
129 return r;
130 }
131 r = vals.size();
132
133 for (const auto &it : vals) {
134 marker=it.first;
135 RGWOrphanSearchState state;
136 try {
137 bufferlist bl = it.second;
138 decode(state, bl);
139 } catch (buffer::error& err) {
140 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
141 return -EIO;
142 }
143 job_list[it.first] = state;
144 }
145 } while (r == MAX_READ);
146
147 return 0;
148 }
149
150 int RGWOrphanStore::init(const DoutPrefixProvider *dpp)
151 {
152 const rgw_pool& log_pool = static_cast<rgw::sal::RadosStore*>(store)->svc()->zone->get_zone_params().log_pool;
153 int r = rgw_init_ioctx(dpp, static_cast<rgw::sal::RadosStore*>(store)->getRados()->get_rados_handle(), log_pool, ioctx);
154 if (r < 0) {
155 cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl;
156 return r;
157 }
158
159 return 0;
160 }
161
162 int RGWOrphanStore::store_entries(const DoutPrefixProvider *dpp, const string& oid, const map<string, bufferlist>& entries)
163 {
164 librados::ObjectWriteOperation op;
165 op.omap_set(entries);
166 cout << "storing " << entries.size() << " entries at " << oid << std::endl;
167 ldpp_dout(dpp, 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl;
168 for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
169 ldpp_dout(dpp, 20) << " > " << iter->first << dendl;
170 }
171 int ret = rgw_rados_operate(dpp, ioctx, oid, &op, null_yield);
172 if (ret < 0) {
173 ldpp_dout(dpp, -1) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl;
174 }
175
176 return 0;
177 }
178
179 int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
180 {
181 #define MAX_OMAP_GET 100
182 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
183 if (ret < 0 && ret != -ENOENT) {
184 cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl;
185 }
186
187 *truncated = (entries->size() == MAX_OMAP_GET);
188
189 return 0;
190 }
191
192 int RGWOrphanSearch::init(const DoutPrefixProvider *dpp, const string& job_name, RGWOrphanSearchInfo *info, bool _detailed_mode)
193 {
194 int r = orphan_store.init(dpp);
195 if (r < 0) {
196 return r;
197 }
198
199 constexpr int64_t MAX_LIST_OBJS_ENTRIES=100;
200
201 max_list_bucket_entries = std::max(store->ctx()->_conf->rgw_list_bucket_min_readahead,
202 MAX_LIST_OBJS_ENTRIES);
203
204 detailed_mode = _detailed_mode;
205 RGWOrphanSearchState state;
206 r = orphan_store.read_job(job_name, state);
207 if (r < 0 && r != -ENOENT) {
208 ldpp_dout(dpp, -1) << "ERROR: failed to read state ret=" << r << dendl;
209 return r;
210 }
211
212 if (r == 0) {
213 search_info = state.info;
214 search_stage = state.stage;
215 } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */
216 search_info = *info;
217 search_info.job_name = job_name;
218 search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
219 search_info.start_time = ceph_clock_now();
220 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
221
222 r = save_state();
223 if (r < 0) {
224 ldpp_dout(dpp, -1) << "ERROR: failed to write state ret=" << r << dendl;
225 return r;
226 }
227 } else {
228 ldpp_dout(dpp, -1) << "ERROR: job not found" << dendl;
229 return r;
230 }
231
232 index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string(".");
233 index_objs_prefix += job_name;
234
235 for (int i = 0; i < search_info.num_shards; i++) {
236 char buf[128];
237
238 snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i);
239 all_objs_index[i] = buf;
240
241 snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i);
242 buckets_instance_index[i] = buf;
243
244 snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i);
245 linked_objs_index[i] = buf;
246 }
247 return 0;
248 }
249
250 int RGWOrphanSearch::log_oids(const DoutPrefixProvider *dpp, map<int, string>& log_shards, map<int, list<string> >& oids)
251 {
252 map<int, list<string> >::iterator miter = oids.begin();
253
254 list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
255
256 for (; miter != oids.end(); ++miter) {
257 log_iter_info info;
258 info.oid = log_shards[miter->first];
259 info.cur = miter->second.begin();
260 info.end = miter->second.end();
261 liters.push_back(info);
262 }
263
264 list<log_iter_info>::iterator list_iter;
265 while (!liters.empty()) {
266 list_iter = liters.begin();
267
268 while (list_iter != liters.end()) {
269 log_iter_info& cur_info = *list_iter;
270
271 list<string>::iterator& cur = cur_info.cur;
272 list<string>::iterator& end = cur_info.end;
273
274 map<string, bufferlist> entries;
275 #define MAX_OMAP_SET_ENTRIES 100
276 for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
277 ldpp_dout(dpp, 20) << "adding obj: " << *cur << dendl;
278 entries[*cur] = bufferlist();
279 }
280
281 int ret = orphan_store.store_entries(dpp, cur_info.oid, entries);
282 if (ret < 0) {
283 return ret;
284 }
285 list<log_iter_info>::iterator tmp = list_iter;
286 ++list_iter;
287 if (cur == end) {
288 liters.erase(tmp);
289 }
290 }
291 }
292 return 0;
293 }
294
295 int RGWOrphanSearch::build_all_oids_index(const DoutPrefixProvider *dpp)
296 {
297 librados::IoCtx ioctx;
298
299 int ret = rgw_init_ioctx(dpp, static_cast<rgw::sal::RadosStore*>(store)->getRados()->get_rados_handle(), search_info.pool, ioctx);
300 if (ret < 0) {
301 ldpp_dout(dpp, -1) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
302 return ret;
303 }
304
305 ioctx.set_namespace(librados::all_nspaces);
306 librados::NObjectIterator i = ioctx.nobjects_begin();
307 librados::NObjectIterator i_end = ioctx.nobjects_end();
308
309 map<int, list<string> > oids;
310
311 int count = 0;
312 uint64_t total = 0;
313
314 cout << "logging all objects in the pool" << std::endl;
315
316 for (; i != i_end; ++i) {
317 string nspace = i->get_nspace();
318 string oid = i->get_oid();
319 string locator = i->get_locator();
320
321 ssize_t pos = oid.find('_');
322 if (pos < 0) {
323 cout << "unidentified oid: " << oid << ", skipping" << std::endl;
324 /* what is this object, oids should be in the format of <bucket marker>_<obj>,
325 * skip this entry
326 */
327 continue;
328 }
329 string stripped_oid = oid.substr(pos + 1);
330 rgw_obj_key key;
331 if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) {
332 cout << "cannot parse oid: " << oid << ", skipping" << std::endl;
333 continue;
334 }
335
336 if (key.ns.empty()) {
337 /* skipping head objects, we don't want to remove these as they are mutable and
338 * cleaning them up is racy (can race with object removal and a later recreation)
339 */
340 cout << "skipping head object: oid=" << oid << std::endl;
341 continue;
342 }
343
344 string oid_fp = obj_fingerprint(oid);
345
346 ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl;
347
348 int shard = orphan_shard(oid_fp);
349 oids[shard].push_back(oid);
350
351 #define COUNT_BEFORE_FLUSH 1000
352 ++total;
353 if (++count >= COUNT_BEFORE_FLUSH) {
354 ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl;
355 ret = log_oids(dpp, all_objs_index, oids);
356 if (ret < 0) {
357 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
358 return ret;
359 }
360 count = 0;
361 oids.clear();
362 }
363 }
364 ret = log_oids(dpp, all_objs_index, oids);
365 if (ret < 0) {
366 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
367 return ret;
368 }
369
370 return 0;
371 }
372
373 int RGWOrphanSearch::build_buckets_instance_index(const DoutPrefixProvider *dpp)
374 {
375 void *handle;
376 int max = 1000;
377 string section = "bucket.instance";
378 int ret = store->meta_list_keys_init(dpp, section, string(), &handle);
379 if (ret < 0) {
380 ldpp_dout(dpp, -1) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
381 return ret;
382 }
383
384 map<int, list<string> > instances;
385
386 bool truncated;
387
388 RGWObjectCtx obj_ctx(store);
389
390 int count = 0;
391 uint64_t total = 0;
392
393 do {
394 list<string> keys;
395 ret = store->meta_list_keys_next(dpp, handle, max, keys, &truncated);
396 if (ret < 0) {
397 ldpp_dout(dpp, -1) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
398 return ret;
399 }
400
401 for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
402 ++total;
403 ldpp_dout(dpp, 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
404 int shard = orphan_shard(*iter);
405 instances[shard].push_back(*iter);
406
407 if (++count >= COUNT_BEFORE_FLUSH) {
408 ret = log_oids(dpp, buckets_instance_index, instances);
409 if (ret < 0) {
410 ldpp_dout(dpp, -1) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
411 return ret;
412 }
413 count = 0;
414 instances.clear();
415 }
416 }
417
418 } while (truncated);
419
420 store->meta_list_keys_complete(handle);
421
422 ret = log_oids(dpp, buckets_instance_index, instances);
423 if (ret < 0) {
424 ldpp_dout(dpp, -1) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
425 return ret;
426 }
427
428 return 0;
429 }
430
431 int RGWOrphanSearch::handle_stat_result(const DoutPrefixProvider *dpp, map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result)
432 {
433 set<string> obj_oids;
434 rgw_bucket& bucket = result.obj.bucket;
435 if (!result.manifest) { /* a very very old object, or part of a multipart upload during upload */
436 const string loc = bucket.bucket_id + "_" + result.obj.get_oid();
437 obj_oids.insert(obj_fingerprint(loc));
438
439 /*
440 * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the
441 * meta object, just add a "shadow" object to the mix
442 */
443 obj_oids.insert(obj_fingerprint(loc, "shadow"));
444 } else {
445 RGWObjManifest& manifest = *result.manifest;
446
447 if (!detailed_mode &&
448 manifest.get_obj_size() <= manifest.get_head_size()) {
449 ldpp_dout(dpp, 5) << "skipping object as it fits in a head" << dendl;
450 return 0;
451 }
452
453 RGWObjManifest::obj_iterator miter;
454 for (miter = manifest.obj_begin(dpp); miter != manifest.obj_end(dpp); ++miter) {
455 const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store->getRados());
456 string s = loc.oid;
457 obj_oids.insert(obj_fingerprint(s));
458 }
459 }
460
461 for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) {
462 ldpp_dout(dpp, 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl;
463
464 int shard = orphan_shard(*iter);
465 oids[shard].push_back(*iter);
466 }
467
468 return 0;
469 }
470
471 int RGWOrphanSearch::pop_and_handle_stat_op(const DoutPrefixProvider *dpp, map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops)
472 {
473 RGWRados::Object::Stat& front_op = ops.front();
474
475 int ret = front_op.wait(dpp);
476 if (ret < 0) {
477 if (ret != -ENOENT) {
478 ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
479 }
480 goto done;
481 }
482 ret = handle_stat_result(dpp, oids, front_op.result);
483 if (ret < 0) {
484 ldpp_dout(dpp, -1) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl;
485 }
486 done:
487 ops.pop_front();
488 return ret;
489 }
490
491 int RGWOrphanSearch::build_linked_oids_for_bucket(const DoutPrefixProvider *dpp, const string& bucket_instance_id, map<int, list<string> >& oids)
492 {
493 RGWObjectCtx obj_ctx(store);
494 rgw_bucket orphan_bucket;
495 int shard_id;
496 int ret = rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance_id,
497 &orphan_bucket, &shard_id);
498 if (ret < 0) {
499 ldpp_dout(dpp, 0) << __func__ << " failed to parse bucket instance: "
500 << bucket_instance_id << " skipping" << dendl;
501 return ret;
502 }
503
504 std::unique_ptr<rgw::sal::Bucket> cur_bucket;
505 ret = store->get_bucket(dpp, nullptr, orphan_bucket, &cur_bucket, null_yield);
506 if (ret < 0) {
507 if (ret == -ENOENT) {
508 /* probably raced with bucket removal */
509 return 0;
510 }
511 ldpp_dout(dpp, -1) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
512 return ret;
513 }
514
515 if (cur_bucket->get_bucket_id() != orphan_bucket.bucket_id) {
516 ldpp_dout(dpp, 0) << __func__ << ": Skipping stale bucket instance: "
517 << orphan_bucket.name << ": "
518 << orphan_bucket.bucket_id << dendl;
519 return 0;
520 }
521
522 if (cur_bucket->get_info().layout.resharding != rgw::BucketReshardState::None) {
523 ldpp_dout(dpp, 0) << __func__ << ": reshard in progress. Skipping "
524 << orphan_bucket.name << ": "
525 << orphan_bucket.bucket_id << dendl;
526 return 0;
527 }
528
529 rgw_bucket b;
530 rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance_id, &b, nullptr);
531 std::unique_ptr<rgw::sal::Bucket> bucket;
532 ret = store->get_bucket(dpp, nullptr, b, &bucket, null_yield);
533 if (ret < 0) {
534 if (ret == -ENOENT) {
535 /* probably raced with bucket removal */
536 return 0;
537 }
538 ldpp_dout(dpp, -1) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
539 return ret;
540 }
541
542 ldpp_dout(dpp, 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
543 RGWRados::Bucket target(store->getRados(), cur_bucket->get_info());
544 RGWRados::Bucket::List list_op(&target);
545
546 string marker;
547 list_op.params.marker = rgw_obj_key(marker);
548 list_op.params.list_versions = true;
549 list_op.params.enforce_ns = false;
550
551 bool truncated;
552
553 deque<RGWRados::Object::Stat> stat_ops;
554
555 do {
556 vector<rgw_bucket_dir_entry> result;
557
558 ret = list_op.list_objects(dpp, max_list_bucket_entries,
559 &result, nullptr, &truncated, null_yield);
560 if (ret < 0) {
561 cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
562 return ret;
563 }
564
565 for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) {
566 rgw_bucket_dir_entry& entry = *iter;
567 if (entry.key.instance.empty()) {
568 ldpp_dout(dpp, 20) << "obj entry: " << entry.key.name << dendl;
569 } else {
570 ldpp_dout(dpp, 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
571 }
572
573 ldpp_dout(dpp, 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl;
574
575 if (!detailed_mode &&
576 entry.meta.accounted_size <= (uint64_t)store->ctx()->_conf->rgw_max_chunk_size) {
577 ldpp_dout(dpp, 5) << __func__ << "skipping stat as the object " << entry.key.name
578 << "fits in a head" << dendl;
579 continue;
580 }
581
582 rgw_obj obj(cur_bucket->get_key(), entry.key);
583
584 RGWRados::Object op_target(store->getRados(), cur_bucket->get_info(), obj_ctx, obj);
585
586 stat_ops.push_back(RGWRados::Object::Stat(&op_target));
587 RGWRados::Object::Stat& op = stat_ops.back();
588
589 ret = op.stat_async(dpp);
590 if (ret < 0) {
591 ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
592 return ret;
593 }
594 if (stat_ops.size() >= max_concurrent_ios) {
595 ret = pop_and_handle_stat_op(dpp, oids, stat_ops);
596 if (ret < 0) {
597 if (ret != -ENOENT) {
598 ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
599 }
600 }
601 }
602 if (oids.size() >= COUNT_BEFORE_FLUSH) {
603 ret = log_oids(dpp, linked_objs_index, oids);
604 if (ret < 0) {
605 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
606 return ret;
607 }
608 oids.clear();
609 }
610 }
611 } while (truncated);
612
613 while (!stat_ops.empty()) {
614 ret = pop_and_handle_stat_op(dpp, oids, stat_ops);
615 if (ret < 0) {
616 if (ret != -ENOENT) {
617 ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
618 }
619 }
620 }
621
622 return 0;
623 }
624
625 int RGWOrphanSearch::build_linked_oids_index(const DoutPrefixProvider *dpp)
626 {
627 map<int, list<string> > oids;
628 map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard);
629 for (; iter != buckets_instance_index.end(); ++iter) {
630 ldpp_dout(dpp, 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl;
631 bool truncated;
632
633 string oid = iter->second;
634
635 do {
636 map<string, bufferlist> entries;
637 int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated);
638 if (ret == -ENOENT) {
639 truncated = false;
640 ret = 0;
641 }
642
643 if (ret < 0) {
644 ldpp_dout(dpp, -1) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
645 return ret;
646 }
647
648 if (entries.empty()) {
649 break;
650 }
651
652 for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
653 ldpp_dout(dpp, 20) << " indexed entry: " << eiter->first << dendl;
654 ret = build_linked_oids_for_bucket(dpp, eiter->first, oids);
655 if (ret < 0) {
656 ldpp_dout(dpp, -1) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first
657 << " returned ret=" << ret << dendl;
658 return ret;
659 }
660 }
661
662 search_stage.shard = iter->first;
663 search_stage.marker = entries.rbegin()->first; /* last entry */
664 } while (truncated);
665
666 search_stage.marker.clear();
667 }
668
669 int ret = log_oids(dpp, linked_objs_index, oids);
670 if (ret < 0) {
671 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
672 return ret;
673 }
674
675 ret = save_state();
676 if (ret < 0) {
677 cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl;
678 return ret;
679 }
680
681 return 0;
682 }
683
684 class OMAPReader {
685 librados::IoCtx ioctx;
686 string oid;
687
688 map<string, bufferlist> entries;
689 map<string, bufferlist>::iterator iter;
690 string marker;
691 bool truncated;
692
693 public:
694 OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) {
695 iter = entries.end();
696 }
697
698 int get_next(string *key, bufferlist *pbl, bool *done);
699 };
700
701 int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done)
702 {
703 if (iter != entries.end()) {
704 *key = iter->first;
705 if (pbl) {
706 *pbl = iter->second;
707 }
708 ++iter;
709 *done = false;
710 marker = *key;
711 return 0;
712 }
713
714 if (!truncated) {
715 *done = true;
716 return 0;
717 }
718
719 #define MAX_OMAP_GET_ENTRIES 100
720 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries);
721 if (ret < 0) {
722 if (ret == -ENOENT) {
723 *done = true;
724 return 0;
725 }
726 return ret;
727 }
728
729 truncated = (entries.size() == MAX_OMAP_GET_ENTRIES);
730 iter = entries.begin();
731 return get_next(key, pbl, done);
732 }
733
734 int RGWOrphanSearch::compare_oid_indexes(const DoutPrefixProvider *dpp)
735 {
736 ceph_assert(linked_objs_index.size() == all_objs_index.size());
737
738 librados::IoCtx& ioctx = orphan_store.get_ioctx();
739
740 librados::IoCtx data_ioctx;
741
742 int ret = rgw_init_ioctx(dpp, static_cast<rgw::sal::RadosStore*>(store)->getRados()->get_rados_handle(), search_info.pool, data_ioctx);
743 if (ret < 0) {
744 ldpp_dout(dpp, -1) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
745 return ret;
746 }
747
748 uint64_t time_threshold = search_info.start_time.sec() - stale_secs;
749
750 map<int, string>::iterator liter = linked_objs_index.begin();
751 map<int, string>::iterator aiter = all_objs_index.begin();
752
753 for (; liter != linked_objs_index.end(); ++liter, ++aiter) {
754 OMAPReader linked_entries(ioctx, liter->second);
755 OMAPReader all_entries(ioctx, aiter->second);
756
757 bool done;
758
759 string cur_linked;
760 bool linked_done = false;
761
762
763 do {
764 string key;
765 int r = all_entries.get_next(&key, NULL, &done);
766 if (r < 0) {
767 return r;
768 }
769 if (done) {
770 break;
771 }
772
773 string key_fp = obj_fingerprint(key);
774
775 while (cur_linked < key_fp && !linked_done) {
776 r = linked_entries.get_next(&cur_linked, NULL, &linked_done);
777 if (r < 0) {
778 return r;
779 }
780 }
781
782 if (cur_linked == key_fp) {
783 ldpp_dout(dpp, 20) << "linked: " << key << dendl;
784 continue;
785 }
786
787 time_t mtime;
788 r = data_ioctx.stat(key, NULL, &mtime);
789 if (r < 0) {
790 if (r != -ENOENT) {
791 ldpp_dout(dpp, -1) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl;
792 }
793 continue;
794 }
795 if (stale_secs && (uint64_t)mtime >= time_threshold) {
796 ldpp_dout(dpp, 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl;
797 continue;
798 }
799 ldpp_dout(dpp, 20) << "leaked: " << key << dendl;
800 cout << "leaked: " << key << std::endl;
801 } while (!done);
802 }
803
804 return 0;
805 }
806
807 int RGWOrphanSearch::run(const DoutPrefixProvider *dpp)
808 {
809 int r;
810
811 switch (search_stage.stage) {
812
813 case ORPHAN_SEARCH_STAGE_INIT:
814 ldpp_dout(dpp, 0) << __func__ << "(): initializing state" << dendl;
815 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL);
816 r = save_state();
817 if (r < 0) {
818 ldpp_dout(dpp, -1) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
819 return r;
820 }
821 // fall through
822 case ORPHAN_SEARCH_STAGE_LSPOOL:
823 ldpp_dout(dpp, 0) << __func__ << "(): building index of all objects in pool" << dendl;
824 r = build_all_oids_index(dpp);
825 if (r < 0) {
826 ldpp_dout(dpp, -1) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
827 return r;
828 }
829
830 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS);
831 r = save_state();
832 if (r < 0) {
833 ldpp_dout(dpp, -1) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
834 return r;
835 }
836 // fall through
837
838 case ORPHAN_SEARCH_STAGE_LSBUCKETS:
839 ldpp_dout(dpp, 0) << __func__ << "(): building index of all bucket indexes" << dendl;
840 r = build_buckets_instance_index(dpp);
841 if (r < 0) {
842 ldpp_dout(dpp, -1) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
843 return r;
844 }
845
846 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI);
847 r = save_state();
848 if (r < 0) {
849 ldpp_dout(dpp, -1) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
850 return r;
851 }
852 // fall through
853
854
855 case ORPHAN_SEARCH_STAGE_ITERATE_BI:
856 ldpp_dout(dpp, 0) << __func__ << "(): building index of all linked objects" << dendl;
857 r = build_linked_oids_index(dpp);
858 if (r < 0) {
859 ldpp_dout(dpp, -1) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
860 return r;
861 }
862
863 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE);
864 r = save_state();
865 if (r < 0) {
866 ldpp_dout(dpp, -1) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
867 return r;
868 }
869 // fall through
870
871 case ORPHAN_SEARCH_STAGE_COMPARE:
872 r = compare_oid_indexes(dpp);
873 if (r < 0) {
874 ldpp_dout(dpp, -1) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
875 return r;
876 }
877
878 break;
879
880 default:
881 ceph_abort();
882 };
883
884 return 0;
885 }
886
887
888 int RGWOrphanSearch::remove_index(map<int, string>& index)
889 {
890 librados::IoCtx& ioctx = orphan_store.get_ioctx();
891
892 for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) {
893 int r = ioctx.remove(iter->second);
894 if (r < 0) {
895 if (r != -ENOENT) {
896 ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl;
897 }
898 }
899 }
900 return 0;
901 }
902
903 int RGWOrphanSearch::finish()
904 {
905 int r = remove_index(all_objs_index);
906 if (r < 0) {
907 ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl;
908 }
909 r = remove_index(buckets_instance_index);
910 if (r < 0) {
911 ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl;
912 }
913 r = remove_index(linked_objs_index);
914 if (r < 0) {
915 ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl;
916 }
917
918 r = orphan_store.remove_job(search_info.job_name);
919 if (r < 0) {
920 ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl;
921 }
922
923 return r;
924 }
925
926
927 int RGWRadosList::handle_stat_result(const DoutPrefixProvider *dpp,
928 RGWRados::Object::Stat::Result& result,
929 std::string& bucket_name,
930 rgw_obj_key& obj_key,
931 std::set<string>& obj_oids)
932 {
933 obj_oids.clear();
934
935 rgw_bucket& bucket = result.obj.bucket;
936
937 ldpp_dout(dpp, 20) << "RGWRadosList::" << __func__ <<
938 " bucket=" << bucket <<
939 ", has_manifest=" << result.manifest.has_value() <<
940 dendl;
941
942 // iterator to store result of dlo/slo attribute find
943 decltype(result.attrs)::iterator attr_it = result.attrs.end();
944 const std::string oid = bucket.marker + "_" + result.obj.get_oid();
945 ldpp_dout(dpp, 20) << "radoslist processing object=\"" <<
946 oid << "\"" << dendl;
947 if (visited_oids.find(oid) != visited_oids.end()) {
948 // apparently we hit a loop; don't continue with this oid
949 ldpp_dout(dpp, 15) <<
950 "radoslist stopped loop at already visited object=\"" <<
951 oid << "\"" << dendl;
952 return 0;
953 }
954
955 bucket_name = bucket.name;
956 obj_key = result.obj.key;
957
958 if (!result.manifest) {
959 /* a very very old object, or part of a multipart upload during upload */
960 obj_oids.insert(oid);
961
962 /*
963 * multipart parts don't have manifest on them, it's in the meta
964 * object; we'll process them in
965 * RGWRadosList::do_incomplete_multipart
966 */
967 } else if ((attr_it = result.attrs.find(RGW_ATTR_USER_MANIFEST)) !=
968 result.attrs.end()) {
969 // *** handle DLO object ***
970
971 obj_oids.insert(oid);
972 visited_oids.insert(oid); // prevent dlo loops
973 ldpp_dout(dpp, 15) << "radoslist added to visited list DLO=\"" <<
974 oid << "\"" << dendl;
975
976 char* prefix_path_c = attr_it->second.c_str();
977 const std::string& prefix_path = prefix_path_c;
978
979 const size_t sep_pos = prefix_path.find('/');
980 if (string::npos == sep_pos) {
981 return -EINVAL;
982 }
983
984 const std::string bucket_name = prefix_path.substr(0, sep_pos);
985 const std::string prefix = prefix_path.substr(sep_pos + 1);
986
987 add_bucket_prefix(bucket_name, prefix);
988 ldpp_dout(dpp, 25) << "radoslist DLO oid=\"" << oid <<
989 "\" added bucket=\"" << bucket_name << "\" prefix=\"" <<
990 prefix << "\" to process list" << dendl;
991 } else if ((attr_it = result.attrs.find(RGW_ATTR_USER_MANIFEST)) !=
992 result.attrs.end()) {
993 // *** handle SLO object ***
994
995 obj_oids.insert(oid);
996 visited_oids.insert(oid); // prevent slo loops
997 ldpp_dout(dpp, 15) << "radoslist added to visited list SLO=\"" <<
998 oid << "\"" << dendl;
999
1000 RGWSLOInfo slo_info;
1001 bufferlist::const_iterator bliter = attr_it->second.begin();
1002 try {
1003 ::decode(slo_info, bliter);
1004 } catch (buffer::error& err) {
1005 ldpp_dout(dpp, 0) <<
1006 "ERROR: failed to decode slo manifest for " << oid << dendl;
1007 return -EIO;
1008 }
1009
1010 for (const auto& iter : slo_info.entries) {
1011 const string& path_str = iter.path;
1012
1013 const size_t sep_pos = path_str.find('/', 1 /* skip initial slash */);
1014 if (string::npos == sep_pos) {
1015 return -EINVAL;
1016 }
1017
1018 std::string bucket_name;
1019 std::string obj_name;
1020
1021 bucket_name = url_decode(path_str.substr(1, sep_pos - 1));
1022 obj_name = url_decode(path_str.substr(sep_pos + 1));
1023
1024 const rgw_obj_key obj_key(obj_name);
1025 add_bucket_filter(bucket_name, obj_key);
1026 ldpp_dout(dpp, 25) << "radoslist SLO oid=\"" << oid <<
1027 "\" added bucket=\"" << bucket_name << "\" obj_key=\"" <<
1028 obj_key << "\" to process list" << dendl;
1029 }
1030 } else {
1031 RGWObjManifest& manifest = *result.manifest;
1032
1033 // in multipart, the head object contains no data and just has the
1034 // manifest AND empty objects have no manifest, but they're
1035 // realized as empty rados objects
1036 if (0 == manifest.get_max_head_size() ||
1037 manifest.obj_begin(dpp) == manifest.obj_end(dpp)) {
1038 obj_oids.insert(oid);
1039 // first_insert = true;
1040 }
1041
1042 RGWObjManifest::obj_iterator miter;
1043 for (miter = manifest.obj_begin(dpp); miter != manifest.obj_end(dpp); ++miter) {
1044 const rgw_raw_obj& loc =
1045 miter.get_location().get_raw_obj(store->getRados());
1046 string s = loc.oid;
1047 obj_oids.insert(s);
1048 }
1049 }
1050
1051 return 0;
1052 } // RGWRadosList::handle_stat_result
1053
1054 int RGWRadosList::pop_and_handle_stat_op(
1055 const DoutPrefixProvider *dpp,
1056 RGWObjectCtx& obj_ctx,
1057 std::deque<RGWRados::Object::Stat>& ops)
1058 {
1059 std::string bucket_name;
1060 rgw_obj_key obj_key;
1061 std::set<std::string> obj_oids;
1062 RGWRados::Object::Stat& front_op = ops.front();
1063
1064 int ret = front_op.wait(dpp);
1065 if (ret < 0) {
1066 if (ret != -ENOENT) {
1067 ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " <<
1068 cpp_strerror(-ret) << dendl;
1069 }
1070 goto done;
1071 }
1072
1073 ret = handle_stat_result(dpp, front_op.result, bucket_name, obj_key, obj_oids);
1074 if (ret < 0) {
1075 ldpp_dout(dpp, -1) << "ERROR: handle_stat_result() returned error: " <<
1076 cpp_strerror(-ret) << dendl;
1077 }
1078
1079 // output results
1080 for (const auto& o : obj_oids) {
1081 if (include_rgw_obj_name) {
1082 std::cout << o <<
1083 field_separator << bucket_name <<
1084 field_separator << obj_key <<
1085 std::endl;
1086 } else {
1087 std::cout << o << std::endl;
1088 }
1089 }
1090
1091 done:
1092
1093 // invalidate object context for this object to avoid memory leak
1094 // (see pr https://github.com/ceph/ceph/pull/30174)
1095 obj_ctx.invalidate(front_op.result.obj);
1096
1097 ops.pop_front();
1098 return ret;
1099 }
1100
1101
#if 0 // code that may be the basis for expansion
// Disabled sketch: enumerates every key of the "bucket.instance"
// metadata section, groups the keys by orphan_shard() and hands them
// to log_oids() in batches of COUNT_BEFORE_FLUSH, with a final flush
// for the remainder.
int RGWRadosList::build_buckets_instance_index()
{
  string section = "bucket.instance";
  int max = 1000;
  void *handle;

  int ret = store->meta_mgr->list_keys_init(section, &handle);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
    return ret;
  }

  RGWObjectCtx obj_ctx(store);

  // keys accumulated since the last flush, indexed by shard
  map<int, list<string> > instances;
  int count = 0;
  uint64_t total = 0;
  bool truncated;

  do {
    list<string> keys;
    ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
    if (ret < 0) {
      lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
      return ret;
    }

    for (string& key : keys) {
      ++total;
      ldout(store->ctx(), 10) << "bucket_instance=" << key << " total=" << total << dendl;
      int shard = orphan_shard(key);
      instances[shard].push_back(key);

      if (++count < COUNT_BEFORE_FLUSH) {
	continue;
      }

      // batch is full: write it out and start over
      ret = log_oids(buckets_instance_index, instances);
      if (ret < 0) {
	lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
	return ret;
      }
      count = 0;
      instances.clear();
    }
  } while (truncated);

  // flush whatever is left over
  ret = log_oids(buckets_instance_index, instances);
  if (ret < 0) {
    lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
    return ret;
  }
  store->meta_mgr->list_keys_complete(handle);

  return 0;
}
#endif
1159
1160
1161 int RGWRadosList::process_bucket(
1162 const DoutPrefixProvider *dpp,
1163 const std::string& bucket_instance_id,
1164 const std::string& prefix,
1165 const std::set<rgw_obj_key>& entries_filter)
1166 {
1167 ldpp_dout(dpp, 10) << "RGWRadosList::" << __func__ <<
1168 " bucket_instance_id=" << bucket_instance_id <<
1169 ", prefix=" << prefix <<
1170 ", entries_filter.size=" << entries_filter.size() << dendl;
1171
1172 RGWBucketInfo bucket_info;
1173 int ret = store->getRados()->get_bucket_instance_info(bucket_instance_id,
1174 bucket_info,
1175 nullptr,
1176 nullptr,
1177 null_yield,
1178 dpp);
1179 if (ret < 0) {
1180 if (ret == -ENOENT) {
1181 // probably raced with bucket removal
1182 return 0;
1183 }
1184 ldpp_dout(dpp, -1) << __func__ <<
1185 ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" <<
1186 ret << dendl;
1187 return ret;
1188 }
1189
1190 RGWRados::Bucket target(store->getRados(), bucket_info);
1191 RGWRados::Bucket::List list_op(&target);
1192
1193 std::string marker;
1194 list_op.params.marker = rgw_obj_key(marker);
1195 list_op.params.list_versions = true;
1196 list_op.params.enforce_ns = false;
1197 list_op.params.allow_unordered = false;
1198 list_op.params.prefix = prefix;
1199
1200 bool truncated;
1201
1202 std::deque<RGWRados::Object::Stat> stat_ops;
1203 std::string prev_versioned_key_name = "";
1204
1205 RGWObjectCtx obj_ctx(store);
1206
1207 do {
1208 std::vector<rgw_bucket_dir_entry> result;
1209 constexpr int64_t LIST_OBJS_MAX_ENTRIES = 100;
1210 ret = list_op.list_objects(dpp, LIST_OBJS_MAX_ENTRIES, &result,
1211 NULL, &truncated, null_yield);
1212 if (ret == -ENOENT) {
1213 // race with bucket delete?
1214 ret = 0;
1215 break;
1216 } else if (ret < 0) {
1217 std::cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) <<
1218 std::endl;
1219 return ret;
1220 }
1221
1222 for (std::vector<rgw_bucket_dir_entry>::iterator iter = result.begin();
1223 iter != result.end();
1224 ++iter) {
1225 rgw_bucket_dir_entry& entry = *iter;
1226
1227 if (entry.key.instance.empty()) {
1228 ldpp_dout(dpp, 20) << "obj entry: " << entry.key.name << dendl;
1229 } else {
1230 ldpp_dout(dpp, 20) << "obj entry: " << entry.key.name <<
1231 " [" << entry.key.instance << "]" << dendl;
1232 }
1233
1234 ldpp_dout(dpp, 20) << __func__ << ": entry.key.name=" <<
1235 entry.key.name << " entry.key.instance=" << entry.key.instance <<
1236 dendl;
1237
1238 // ignore entries that are not in the filter if there is a filter
1239 if (!entries_filter.empty() &&
1240 entries_filter.find(entry.key) == entries_filter.cend()) {
1241 continue;
1242 }
1243
1244 std::unique_ptr<rgw::sal::Bucket> bucket;
1245 store->get_bucket(nullptr, bucket_info, &bucket);
1246 // we need to do this in two cases below, so use a lambda
1247 auto do_stat_key =
1248 [&](const rgw_obj_key& key) -> int {
1249 int ret;
1250
1251 rgw_obj obj(bucket_info.bucket, key);
1252 RGWRados::Object op_target(store->getRados(), bucket_info,
1253 obj_ctx, obj);
1254
1255 stat_ops.push_back(RGWRados::Object::Stat(&op_target));
1256 RGWRados::Object::Stat& op = stat_ops.back();
1257
1258 ret = op.stat_async(dpp);
1259 if (ret < 0) {
1260 ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " <<
1261 cpp_strerror(-ret) << dendl;
1262 return ret;
1263 }
1264
1265 if (stat_ops.size() >= max_concurrent_ios) {
1266 ret = pop_and_handle_stat_op(dpp, obj_ctx, stat_ops);
1267 if (ret < 0) {
1268 if (ret != -ENOENT) {
1269 ldpp_dout(dpp, -1) <<
1270 "ERROR: pop_and_handle_stat_op() returned error: " <<
1271 cpp_strerror(-ret) << dendl;
1272 }
1273
1274 // clear error, so we'll continue processing directory
1275 ret = 0;
1276 }
1277 }
1278
1279 return ret;
1280 }; // do_stat_key lambda
1281
1282 // for versioned objects, make sure the head object is handled
1283 // as well by ignoring the instance identifier
1284 if (!entry.key.instance.empty() &&
1285 entry.key.name != prev_versioned_key_name) {
1286 // don't do the same key twice; even though out bucket index
1287 // listing allows unordered, since all versions of an object
1288 // use the same bucket index key, they'll all end up together
1289 // and sorted
1290 prev_versioned_key_name = entry.key.name;
1291
1292 rgw_obj_key uninstanced(entry.key.name);
1293
1294 ret = do_stat_key(uninstanced);
1295 if (ret < 0) {
1296 return ret;
1297 }
1298 }
1299
1300 ret = do_stat_key(entry.key);
1301 if (ret < 0) {
1302 return ret;
1303 }
1304 } // for iter loop
1305 } while (truncated);
1306
1307 while (!stat_ops.empty()) {
1308 ret = pop_and_handle_stat_op(dpp, obj_ctx, stat_ops);
1309 if (ret < 0) {
1310 if (ret != -ENOENT) {
1311 ldpp_dout(dpp, -1) << "ERROR: stat_async() returned error: " <<
1312 cpp_strerror(-ret) << dendl;
1313 }
1314 }
1315 }
1316
1317 return 0;
1318 }
1319
1320
1321 int RGWRadosList::run(const DoutPrefixProvider *dpp,
1322 const bool yes_i_really_mean_it)
1323 {
1324 int ret;
1325 void* handle = nullptr;
1326
1327 ret = store->meta_list_keys_init(dpp, "bucket", string(), &handle);
1328 if (ret < 0) {
1329 ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ <<
1330 " ERROR: list_keys_init returned " <<
1331 cpp_strerror(-ret) << dendl;
1332 return ret;
1333 }
1334
1335 constexpr int max_keys = 1000;
1336 bool truncated = true;
1337 bool warned_indexless = false;
1338
1339 do {
1340 std::list<std::string> buckets;
1341 ret = store->meta_list_keys_next(dpp, handle, max_keys, buckets, &truncated);
1342
1343 for (std::string& bucket_id : buckets) {
1344 ret = run(dpp, bucket_id, true);
1345 if (ret == -ENOENT) {
1346 continue;
1347 } else if (ret == -EINVAL) {
1348 if (! warned_indexless) {
1349 if (yes_i_really_mean_it) {
1350 std::cerr <<
1351 "WARNING: because there is at least one indexless bucket (" <<
1352 bucket_id <<
1353 ") the results of radoslist are *incomplete*; continuing due to --yes-i-really-mean-it" <<
1354 std::endl;
1355 warned_indexless = true;
1356 } else {
1357 std::cerr << "ERROR: because there is at least one indexless bucket (" <<
1358 bucket_id <<
1359 ") the results of radoslist are *incomplete*; use --yes-i-really-mean-it to bypass error" <<
1360 std::endl;
1361 return ret;
1362 }
1363 }
1364 continue;
1365 } else if (ret < 0) {
1366 return ret;
1367 }
1368 }
1369 } while (truncated);
1370
1371 return 0;
1372 } // RGWRadosList::run(DoutPrefixProvider, bool)
1373
1374
1375 int RGWRadosList::run(const DoutPrefixProvider *dpp,
1376 const std::string& start_bucket_name,
1377 const bool silent_indexless)
1378 {
1379 int ret;
1380
1381 add_bucket_entire(start_bucket_name);
1382
1383 while (! bucket_process_map.empty()) {
1384 // pop item from map and capture its key data
1385 auto front = bucket_process_map.begin();
1386 std::string bucket_name = front->first;
1387 process_t process;
1388 std::swap(process, front->second);
1389 bucket_process_map.erase(front);
1390
1391 std::unique_ptr<rgw::sal::Bucket> bucket;
1392 ret = store->get_bucket(dpp, nullptr, tenant_name, bucket_name, &bucket, null_yield);
1393 if (ret == -ENOENT) {
1394 std::cerr << "WARNING: bucket " << bucket_name <<
1395 " does not exist; could it have been deleted very recently?" <<
1396 std::endl;
1397 continue;
1398 } else if (ret < 0) {
1399 std::cerr << "ERROR: could not get info for bucket " << bucket_name <<
1400 " -- " << cpp_strerror(-ret) << std::endl;
1401 return ret;
1402 } else if (bucket->get_info().is_indexless()) {
1403 if (! silent_indexless) {
1404 std::cerr << "ERROR: unable to run radoslist on indexless bucket " <<
1405 bucket_name << std::endl;
1406 }
1407 return -EINVAL;
1408 }
1409
1410 const std::string bucket_id = bucket->get_key().get_key();
1411
1412 static const std::set<rgw_obj_key> empty_filter;
1413 static const std::string empty_prefix;
1414
1415 auto do_process_bucket =
1416 [dpp, &bucket_id, this]
1417 (const std::string& prefix,
1418 const std::set<rgw_obj_key>& entries_filter) -> int {
1419 int ret = process_bucket(dpp, bucket_id, prefix, entries_filter);
1420 if (ret == -ENOENT) {
1421 // bucket deletion race?
1422 return 0;
1423 } if (ret < 0) {
1424 ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ <<
1425 ": ERROR: process_bucket(); bucket_id=" <<
1426 bucket_id << " returned ret=" << ret << dendl;
1427 }
1428
1429 return ret;
1430 };
1431
1432 // either process the whole bucket *or* process the filters and/or
1433 // the prefixes
1434 if (process.entire_container) {
1435 ret = do_process_bucket(empty_prefix, empty_filter);
1436 if (ret < 0) {
1437 return ret;
1438 }
1439 } else {
1440 if (! process.filter_keys.empty()) {
1441 ret = do_process_bucket(empty_prefix, process.filter_keys);
1442 if (ret < 0) {
1443 return ret;
1444 }
1445 }
1446 for (const auto& p : process.prefixes) {
1447 ret = do_process_bucket(p, empty_filter);
1448 if (ret < 0) {
1449 return ret;
1450 }
1451 }
1452 }
1453 } // while (! bucket_process_map.empty())
1454
1455 if (include_rgw_obj_name) {
1456 return 0;
1457 }
1458
1459 // now handle incomplete multipart uploads by going back to the
1460 // initial bucket
1461
1462 std::unique_ptr<rgw::sal::Bucket> bucket;
1463 ret = store->get_bucket(dpp, nullptr, tenant_name, start_bucket_name, &bucket, null_yield);
1464 if (ret == -ENOENT) {
1465 // bucket deletion race?
1466 return 0;
1467 } else if (ret < 0) {
1468 ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ <<
1469 ": ERROR: get_bucket_info returned ret=" << ret << dendl;
1470 return ret;
1471 }
1472
1473 ret = do_incomplete_multipart(dpp, bucket.get());
1474 if (ret < 0) {
1475 ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ <<
1476 ": ERROR: do_incomplete_multipart returned ret=" << ret << dendl;
1477 return ret;
1478 }
1479
1480 return 0;
1481 } // RGWRadosList::run(DoutPrefixProvider, string, bool)
1482
1483
1484 int RGWRadosList::do_incomplete_multipart(const DoutPrefixProvider *dpp,
1485 rgw::sal::Bucket* bucket)
1486 {
1487 constexpr int max_uploads = 1000;
1488 constexpr int max_parts = 1000;
1489 std::string marker;
1490 vector<std::unique_ptr<rgw::sal::MultipartUpload>> uploads;
1491 bool is_truncated;
1492 int ret;
1493
1494 // use empty strings for params.{prefix,delim}
1495
1496 do {
1497 ret = bucket->list_multiparts(dpp, string(), marker, string(), max_uploads, uploads, nullptr, &is_truncated);
1498 if (ret == -ENOENT) {
1499 // could bucket have been removed while this is running?
1500 ldpp_dout(dpp, 5) << "RGWRadosList::" << __func__ <<
1501 ": WARNING: call to list_objects of multipart namespace got ENOENT; "
1502 "assuming bucket removal race" << dendl;
1503 break;
1504 } else if (ret < 0) {
1505 ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ <<
1506 ": ERROR: list_objects op returned ret=" << ret << dendl;
1507 return ret;
1508 }
1509
1510 if (!uploads.empty()) {
1511 // now process the uploads vector
1512 for (const auto& upload : uploads) {
1513 int parts_marker = 0;
1514 bool is_parts_truncated = false;
1515
1516 do { // while (is_parts_truncated);
1517 ret = upload->list_parts(dpp, store->ctx(), max_parts, parts_marker,
1518 &parts_marker, &is_parts_truncated);
1519 if (ret == -ENOENT) {
1520 ldpp_dout(dpp, 5) << "RGWRadosList::" << __func__ <<
1521 ": WARNING: list_multipart_parts returned ret=-ENOENT "
1522 "for " << upload->get_upload_id() << ", moving on" << dendl;
1523 break;
1524 } else if (ret < 0) {
1525 ldpp_dout(dpp, -1) << "RGWRadosList::" << __func__ <<
1526 ": ERROR: list_multipart_parts returned ret=" << ret <<
1527 dendl;
1528 return ret;
1529 }
1530
1531 for (auto& p : upload->get_parts()) {
1532 rgw::sal::RadosMultipartPart* part =
1533 dynamic_cast<rgw::sal::RadosMultipartPart*>(p.second.get());
1534 RGWObjManifest& manifest = part->get_manifest();
1535 for (auto obj_it = manifest.obj_begin(dpp);
1536 obj_it != manifest.obj_end(dpp);
1537 ++obj_it) {
1538 const rgw_raw_obj& loc =
1539 obj_it.get_location().get_raw_obj(store->getRados());
1540 std::cout << loc.oid << std::endl;
1541 } // for (auto obj_it
1542 } // for (auto& p
1543 } while (is_parts_truncated);
1544 } // for (const auto& upload
1545 } // if objs not empty
1546 } while (is_truncated);
1547
1548 return 0;
1549 } // RGWRadosList::do_incomplete_multipart
1550
1551 void RGWOrphanSearchStage::dump(Formatter *f) const
1552 {
1553 f->open_object_section("orphan_search_stage");
1554 string s;
1555 switch(stage){
1556 case ORPHAN_SEARCH_STAGE_INIT:
1557 s = "init";
1558 break;
1559 case ORPHAN_SEARCH_STAGE_LSPOOL:
1560 s = "lspool";
1561 break;
1562 case ORPHAN_SEARCH_STAGE_LSBUCKETS:
1563 s = "lsbuckets";
1564 break;
1565 case ORPHAN_SEARCH_STAGE_ITERATE_BI:
1566 s = "iterate_bucket_index";
1567 break;
1568 case ORPHAN_SEARCH_STAGE_COMPARE:
1569 s = "comparing";
1570 break;
1571 default:
1572 s = "unknown";
1573 }
1574 f->dump_string("search_stage", s);
1575 f->dump_int("shard",shard);
1576 f->dump_string("marker",marker);
1577 f->close_section();
1578 }
1579
1580 void RGWOrphanSearchInfo::dump(Formatter *f) const
1581 {
1582 f->open_object_section("orphan_search_info");
1583 f->dump_string("job_name", job_name);
1584 encode_json("pool", pool, f);
1585 f->dump_int("num_shards", num_shards);
1586 encode_json("start_time", start_time, f);
1587 f->close_section();
1588 }
1589
1590 void RGWOrphanSearchState::dump(Formatter *f) const
1591 {
1592 f->open_object_section("orphan_search_state");
1593 encode_json("info", info, f);
1594 encode_json("stage", stage, f);
1595 f->close_section();
1596 }
1597
1598