]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_orphan.cc
import ceph 15.2.14
[ceph.git] / ceph / src / rgw / rgw_orphan.cc
CommitLineData
31f18b77 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae
FG
3
4#include <string>
5
7c673cae
FG
6
7#include "common/config.h"
8#include "common/Formatter.h"
9#include "common/errno.h"
10
11#include "rgw_rados.h"
e306af50
TL
12#include "rgw_op.h"
13#include "rgw_multi.h"
7c673cae 14#include "rgw_orphan.h"
11fdf7f2
TL
15#include "rgw_zone.h"
16#include "rgw_bucket.h"
17
18#include "services/svc_zone.h"
19#include "services/svc_sys_obj.h"
7c673cae
FG
20
21#define dout_subsys ceph_subsys_rgw
22
23#define DEFAULT_NUM_SHARDS 64
24
25static string obj_fingerprint(const string& oid, const char *force_ns = NULL)
26{
27 ssize_t pos = oid.find('_');
28 if (pos < 0) {
29 cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
30 }
31
32 string obj_marker = oid.substr(0, pos);
33
34 rgw_obj_key key;
35
36 rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key);
37
38 if (key.ns.empty()) {
39 return oid;
40 }
41
42 string s = oid;
43
44 if (force_ns) {
45 rgw_bucket b;
46 rgw_obj new_obj(b, key);
47 s = obj_marker + "_" + new_obj.get_oid();
48 }
49
50 /* cut out suffix */
51 size_t i = s.size() - 1;
52 for (; i >= s.size() - 10; --i) {
53 char c = s[i];
54 if (!isdigit(c) && c != '.' && c != '_') {
55 break;
56 }
57 }
58
59 return s.substr(0, i + 1);
60}
61
62int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state)
63{
64 set<string> keys;
65 map<string, bufferlist> vals;
66 keys.insert(job_name);
67 int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
68 if (r < 0) {
69 return r;
70 }
71
72 map<string, bufferlist>::iterator iter = vals.find(job_name);
73 if (iter == vals.end()) {
74 return -ENOENT;
75 }
76
77 try {
78 bufferlist& bl = iter->second;
11fdf7f2 79 decode(state, bl);
7c673cae
FG
80 } catch (buffer::error& err) {
81 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
82 return -EIO;
83 }
84
85 return 0;
86}
87
88int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
89{
90 map<string, bufferlist> vals;
91 bufferlist bl;
11fdf7f2 92 encode(state, bl);
7c673cae
FG
93 vals[job_name] = bl;
94 int r = ioctx.omap_set(oid, vals);
95 if (r < 0) {
96 return r;
97 }
98
99 return 0;
100}
101
102int RGWOrphanStore::remove_job(const string& job_name)
103{
104 set<string> keys;
105 keys.insert(job_name);
106
107 int r = ioctx.omap_rm_keys(oid, keys);
108 if (r < 0) {
109 return r;
110 }
111
112 return 0;
113}
114
115int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list)
116{
117 map <string,bufferlist> vals;
118 int MAX_READ=1024;
119 string marker="";
120 int r = 0;
121
122 // loop through all the omap vals from index object, storing them to job_list,
123 // read in batches of 1024, we update the marker every iteration and exit the
124 // loop when we find that total size read out is less than batch size
125 do {
126 r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals);
127 if (r < 0) {
128 return r;
129 }
130 r = vals.size();
131
132 for (const auto &it : vals) {
133 marker=it.first;
134 RGWOrphanSearchState state;
135 try {
136 bufferlist bl = it.second;
11fdf7f2 137 decode(state, bl);
7c673cae
FG
138 } catch (buffer::error& err) {
139 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
140 return -EIO;
141 }
142 job_list[it.first] = state;
143 }
144 } while (r == MAX_READ);
145
146 return 0;
147}
148
149int RGWOrphanStore::init()
150{
9f95a23c
TL
151 const rgw_pool& log_pool = store->svc()->zone->get_zone_params().log_pool;
152 int r = rgw_init_ioctx(store->getRados()->get_rados_handle(), log_pool, ioctx);
7c673cae
FG
153 if (r < 0) {
154 cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl;
155 return r;
156 }
157
158 return 0;
159}
160
161int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
162{
163 librados::ObjectWriteOperation op;
164 op.omap_set(entries);
165 cout << "storing " << entries.size() << " entries at " << oid << std::endl;
166 ldout(store->ctx(), 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl;
167 for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
168 ldout(store->ctx(), 20) << " > " << iter->first << dendl;
169 }
9f95a23c 170 int ret = rgw_rados_operate(ioctx, oid, &op, null_yield);
7c673cae
FG
171 if (ret < 0) {
172 lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl;
173 }
174
175 return 0;
176}
177
178int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
179{
180#define MAX_OMAP_GET 100
181 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
182 if (ret < 0 && ret != -ENOENT) {
183 cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl;
184 }
185
186 *truncated = (entries->size() == MAX_OMAP_GET);
187
188 return 0;
189}
190
11fdf7f2
TL
191int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info, bool _detailed_mode)
192{
7c673cae
FG
193 int r = orphan_store.init();
194 if (r < 0) {
195 return r;
196 }
197
11fdf7f2
TL
198 constexpr int64_t MAX_LIST_OBJS_ENTRIES=100;
199
200 max_list_bucket_entries = std::max(store->ctx()->_conf->rgw_list_bucket_min_readahead,
201 MAX_LIST_OBJS_ENTRIES);
202
203 detailed_mode = _detailed_mode;
7c673cae
FG
204 RGWOrphanSearchState state;
205 r = orphan_store.read_job(job_name, state);
206 if (r < 0 && r != -ENOENT) {
207 lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl;
208 return r;
209 }
210
211 if (r == 0) {
212 search_info = state.info;
213 search_stage = state.stage;
214 } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */
215 search_info = *info;
216 search_info.job_name = job_name;
217 search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
218 search_info.start_time = ceph_clock_now();
219 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
220
221 r = save_state();
222 if (r < 0) {
223 lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
224 return r;
225 }
226 } else {
227 lderr(store->ctx()) << "ERROR: job not found" << dendl;
228 return r;
229 }
230
231 index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string(".");
232 index_objs_prefix += job_name;
233
234 for (int i = 0; i < search_info.num_shards; i++) {
235 char buf[128];
236
237 snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i);
238 all_objs_index[i] = buf;
239
240 snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i);
241 buckets_instance_index[i] = buf;
242
243 snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i);
244 linked_objs_index[i] = buf;
245 }
246 return 0;
247}
248
249int RGWOrphanSearch::log_oids(map<int, string>& log_shards, map<int, list<string> >& oids)
250{
251 map<int, list<string> >::iterator miter = oids.begin();
252
253 list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
254
255 for (; miter != oids.end(); ++miter) {
256 log_iter_info info;
257 info.oid = log_shards[miter->first];
258 info.cur = miter->second.begin();
259 info.end = miter->second.end();
260 liters.push_back(info);
261 }
262
263 list<log_iter_info>::iterator list_iter;
264 while (!liters.empty()) {
265 list_iter = liters.begin();
266
267 while (list_iter != liters.end()) {
268 log_iter_info& cur_info = *list_iter;
269
270 list<string>::iterator& cur = cur_info.cur;
271 list<string>::iterator& end = cur_info.end;
272
273 map<string, bufferlist> entries;
274#define MAX_OMAP_SET_ENTRIES 100
275 for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
276 ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl;
277 entries[*cur] = bufferlist();
278 }
279
280 int ret = orphan_store.store_entries(cur_info.oid, entries);
281 if (ret < 0) {
282 return ret;
283 }
284 list<log_iter_info>::iterator tmp = list_iter;
285 ++list_iter;
286 if (cur == end) {
287 liters.erase(tmp);
288 }
289 }
290 }
291 return 0;
292}
293
294int RGWOrphanSearch::build_all_oids_index()
295{
296 librados::IoCtx ioctx;
297
9f95a23c 298 int ret = rgw_init_ioctx(store->getRados()->get_rados_handle(), search_info.pool, ioctx);
7c673cae
FG
299 if (ret < 0) {
300 lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
301 return ret;
302 }
303
304 ioctx.set_namespace(librados::all_nspaces);
305 librados::NObjectIterator i = ioctx.nobjects_begin();
306 librados::NObjectIterator i_end = ioctx.nobjects_end();
307
308 map<int, list<string> > oids;
309
310 int count = 0;
311 uint64_t total = 0;
312
313 cout << "logging all objects in the pool" << std::endl;
314
315 for (; i != i_end; ++i) {
316 string nspace = i->get_nspace();
317 string oid = i->get_oid();
318 string locator = i->get_locator();
319
320 ssize_t pos = oid.find('_');
321 if (pos < 0) {
322 cout << "unidentified oid: " << oid << ", skipping" << std::endl;
323 /* what is this object, oids should be in the format of <bucket marker>_<obj>,
324 * skip this entry
325 */
326 continue;
327 }
328 string stripped_oid = oid.substr(pos + 1);
329 rgw_obj_key key;
330 if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) {
331 cout << "cannot parse oid: " << oid << ", skipping" << std::endl;
332 continue;
333 }
334
335 if (key.ns.empty()) {
336 /* skipping head objects, we don't want to remove these as they are mutable and
337 * cleaning them up is racy (can race with object removal and a later recreation)
338 */
339 cout << "skipping head object: oid=" << oid << std::endl;
340 continue;
341 }
342
343 string oid_fp = obj_fingerprint(oid);
344
345 ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl;
346
347 int shard = orphan_shard(oid_fp);
348 oids[shard].push_back(oid);
349
350#define COUNT_BEFORE_FLUSH 1000
351 ++total;
352 if (++count >= COUNT_BEFORE_FLUSH) {
353 ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl;
354 ret = log_oids(all_objs_index, oids);
355 if (ret < 0) {
356 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
357 return ret;
358 }
359 count = 0;
360 oids.clear();
361 }
362 }
363 ret = log_oids(all_objs_index, oids);
364 if (ret < 0) {
365 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
366 return ret;
367 }
368
369 return 0;
370}
371
372int RGWOrphanSearch::build_buckets_instance_index()
373{
374 void *handle;
375 int max = 1000;
376 string section = "bucket.instance";
9f95a23c 377 int ret = store->ctl()->meta.mgr->list_keys_init(section, &handle);
7c673cae
FG
378 if (ret < 0) {
379 lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
92f5a8d4 380 return ret;
7c673cae
FG
381 }
382
383 map<int, list<string> > instances;
384
385 bool truncated;
386
387 RGWObjectCtx obj_ctx(store);
388
389 int count = 0;
390 uint64_t total = 0;
391
392 do {
393 list<string> keys;
9f95a23c 394 ret = store->ctl()->meta.mgr->list_keys_next(handle, max, keys, &truncated);
7c673cae
FG
395 if (ret < 0) {
396 lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
92f5a8d4 397 return ret;
7c673cae
FG
398 }
399
400 for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
401 ++total;
402 ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
403 int shard = orphan_shard(*iter);
404 instances[shard].push_back(*iter);
405
406 if (++count >= COUNT_BEFORE_FLUSH) {
407 ret = log_oids(buckets_instance_index, instances);
408 if (ret < 0) {
409 lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
410 return ret;
411 }
412 count = 0;
413 instances.clear();
414 }
415 }
416
417 } while (truncated);
418
419 ret = log_oids(buckets_instance_index, instances);
420 if (ret < 0) {
421 lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
422 return ret;
423 }
9f95a23c 424 store->ctl()->meta.mgr->list_keys_complete(handle);
7c673cae
FG
425
426 return 0;
427}
428
429int RGWOrphanSearch::handle_stat_result(map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result)
430{
431 set<string> obj_oids;
432 rgw_bucket& bucket = result.obj.bucket;
9f95a23c 433 if (!result.manifest) { /* a very very old object, or part of a multipart upload during upload */
7c673cae
FG
434 const string loc = bucket.bucket_id + "_" + result.obj.get_oid();
435 obj_oids.insert(obj_fingerprint(loc));
436
437 /*
438 * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the
439 * meta object, just add a "shadow" object to the mix
440 */
441 obj_oids.insert(obj_fingerprint(loc, "shadow"));
442 } else {
9f95a23c 443 RGWObjManifest& manifest = *result.manifest;
7c673cae 444
11fdf7f2
TL
445 if (!detailed_mode &&
446 manifest.get_obj_size() <= manifest.get_head_size()) {
447 ldout(store->ctx(), 5) << "skipping object as it fits in a head" << dendl;
448 return 0;
449 }
450
7c673cae
FG
451 RGWObjManifest::obj_iterator miter;
452 for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
9f95a23c 453 const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store->getRados());
7c673cae
FG
454 string s = loc.oid;
455 obj_oids.insert(obj_fingerprint(s));
456 }
457 }
458
459 for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) {
460 ldout(store->ctx(), 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl;
461
462 int shard = orphan_shard(*iter);
463 oids[shard].push_back(*iter);
464 }
465
466 return 0;
467}
468
469int RGWOrphanSearch::pop_and_handle_stat_op(map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops)
470{
471 RGWRados::Object::Stat& front_op = ops.front();
472
473 int ret = front_op.wait();
474 if (ret < 0) {
475 if (ret != -ENOENT) {
476 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
477 }
478 goto done;
479 }
480 ret = handle_stat_result(oids, front_op.result);
481 if (ret < 0) {
482 lderr(store->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl;
483 }
484done:
485 ops.pop_front();
486 return ret;
487}
488
489int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids)
490{
7c673cae 491 RGWObjectCtx obj_ctx(store);
9f95a23c 492 auto sysobj_ctx = store->svc()->sysobj->init_obj_ctx();
11fdf7f2
TL
493
494 rgw_bucket orphan_bucket;
495 int shard_id;
496 int ret = rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance_id,
497 &orphan_bucket, &shard_id);
498 if (ret < 0) {
499 ldout(store->ctx(),0) << __func__ << " failed to parse bucket instance: "
500 << bucket_instance_id << " skipping" << dendl;
501 return ret;
502 }
503
504 RGWBucketInfo cur_bucket_info;
9f95a23c
TL
505 ret = store->getRados()->get_bucket_info(store->svc(), orphan_bucket.tenant,
506 orphan_bucket.name, cur_bucket_info, nullptr, null_yield);
11fdf7f2
TL
507 if (ret < 0) {
508 if (ret == -ENOENT) {
509 /* probably raced with bucket removal */
510 return 0;
511 }
512 lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
513 return ret;
514 }
515
516 if (cur_bucket_info.bucket.bucket_id != orphan_bucket.bucket_id) {
517 ldout(store->ctx(), 0) << __func__ << ": Skipping stale bucket instance: "
518 << orphan_bucket.name << ": "
519 << orphan_bucket.bucket_id << dendl;
520 return 0;
521 }
522
9f95a23c 523 if (cur_bucket_info.reshard_status == cls_rgw_reshard_status::IN_PROGRESS) {
11fdf7f2
TL
524 ldout(store->ctx(), 0) << __func__ << ": reshard in progress. Skipping "
525 << orphan_bucket.name << ": "
526 << orphan_bucket.bucket_id << dendl;
527 return 0;
528 }
529
530 RGWBucketInfo bucket_info;
9f95a23c 531 ret = store->getRados()->get_bucket_instance_info(sysobj_ctx, bucket_instance_id, bucket_info, nullptr, nullptr, null_yield);
7c673cae
FG
532 if (ret < 0) {
533 if (ret == -ENOENT) {
534 /* probably raced with bucket removal */
535 return 0;
536 }
537 lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
538 return ret;
539 }
540
11fdf7f2 541 ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
9f95a23c 542 RGWRados::Bucket target(store->getRados(), bucket_info);
7c673cae
FG
543 RGWRados::Bucket::List list_op(&target);
544
545 string marker;
546 list_op.params.marker = rgw_obj_key(marker);
547 list_op.params.list_versions = true;
548 list_op.params.enforce_ns = false;
549
550 bool truncated;
551
552 deque<RGWRados::Object::Stat> stat_ops;
553
7c673cae
FG
554 do {
555 vector<rgw_bucket_dir_entry> result;
556
11fdf7f2 557 ret = list_op.list_objects(max_list_bucket_entries,
9f95a23c 558 &result, nullptr, &truncated, null_yield);
7c673cae
FG
559 if (ret < 0) {
560 cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
92f5a8d4 561 return ret;
7c673cae
FG
562 }
563
564 for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) {
565 rgw_bucket_dir_entry& entry = *iter;
566 if (entry.key.instance.empty()) {
567 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
568 } else {
569 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
570 }
571
572 ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl;
11fdf7f2
TL
573
574 if (!detailed_mode &&
575 entry.meta.accounted_size <= (uint64_t)store->ctx()->_conf->rgw_max_chunk_size) {
576 ldout(store->ctx(),5) << __func__ << "skipping stat as the object " << entry.key.name
577 << "fits in a head" << dendl;
578 continue;
579 }
580
7c673cae
FG
581 rgw_obj obj(bucket_info.bucket, entry.key);
582
9f95a23c 583 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, obj);
7c673cae
FG
584
585 stat_ops.push_back(RGWRados::Object::Stat(&op_target));
586 RGWRados::Object::Stat& op = stat_ops.back();
587
588
589 ret = op.stat_async();
590 if (ret < 0) {
591 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
592 return ret;
593 }
594 if (stat_ops.size() >= max_concurrent_ios) {
595 ret = pop_and_handle_stat_op(oids, stat_ops);
596 if (ret < 0) {
597 if (ret != -ENOENT) {
598 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
599 }
600 }
601 }
11fdf7f2 602 if (oids.size() >= COUNT_BEFORE_FLUSH) {
7c673cae
FG
603 ret = log_oids(linked_objs_index, oids);
604 if (ret < 0) {
605 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
606 return ret;
607 }
7c673cae
FG
608 oids.clear();
609 }
610 }
611 } while (truncated);
612
613 while (!stat_ops.empty()) {
614 ret = pop_and_handle_stat_op(oids, stat_ops);
615 if (ret < 0) {
616 if (ret != -ENOENT) {
617 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
618 }
619 }
620 }
621
622 return 0;
623}
624
625int RGWOrphanSearch::build_linked_oids_index()
626{
627 map<int, list<string> > oids;
628 map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard);
629 for (; iter != buckets_instance_index.end(); ++iter) {
630 ldout(store->ctx(), 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl;
631 bool truncated;
632
633 string oid = iter->second;
634
635 do {
636 map<string, bufferlist> entries;
637 int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated);
638 if (ret == -ENOENT) {
639 truncated = false;
640 ret = 0;
641 }
642
643 if (ret < 0) {
644 lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
645 return ret;
646 }
647
648 if (entries.empty()) {
649 break;
650 }
651
652 for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
653 ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
654 ret = build_linked_oids_for_bucket(eiter->first, oids);
655 if (ret < 0) {
656 lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first
657 << " returned ret=" << ret << dendl;
658 return ret;
659 }
660 }
661
662 search_stage.shard = iter->first;
663 search_stage.marker = entries.rbegin()->first; /* last entry */
664 } while (truncated);
665
666 search_stage.marker.clear();
667 }
668
669 int ret = log_oids(linked_objs_index, oids);
670 if (ret < 0) {
671 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
672 return ret;
673 }
674
675 ret = save_state();
676 if (ret < 0) {
677 cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl;
678 return ret;
679 }
680
681 return 0;
682}
683
684class OMAPReader {
685 librados::IoCtx ioctx;
686 string oid;
687
688 map<string, bufferlist> entries;
689 map<string, bufferlist>::iterator iter;
690 string marker;
691 bool truncated;
692
693public:
694 OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) {
695 iter = entries.end();
696 }
697
698 int get_next(string *key, bufferlist *pbl, bool *done);
699};
700
701int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done)
702{
703 if (iter != entries.end()) {
704 *key = iter->first;
705 if (pbl) {
706 *pbl = iter->second;
707 }
708 ++iter;
709 *done = false;
710 marker = *key;
711 return 0;
712 }
713
714 if (!truncated) {
715 *done = true;
716 return 0;
717 }
718
719#define MAX_OMAP_GET_ENTRIES 100
720 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries);
721 if (ret < 0) {
722 if (ret == -ENOENT) {
723 *done = true;
724 return 0;
725 }
726 return ret;
727 }
728
729 truncated = (entries.size() == MAX_OMAP_GET_ENTRIES);
730 iter = entries.begin();
731 return get_next(key, pbl, done);
732}
733
734int RGWOrphanSearch::compare_oid_indexes()
735{
11fdf7f2 736 ceph_assert(linked_objs_index.size() == all_objs_index.size());
7c673cae
FG
737
738 librados::IoCtx& ioctx = orphan_store.get_ioctx();
739
740 librados::IoCtx data_ioctx;
741
9f95a23c 742 int ret = rgw_init_ioctx(store->getRados()->get_rados_handle(), search_info.pool, data_ioctx);
7c673cae
FG
743 if (ret < 0) {
744 lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
745 return ret;
746 }
747
748 uint64_t time_threshold = search_info.start_time.sec() - stale_secs;
749
750 map<int, string>::iterator liter = linked_objs_index.begin();
751 map<int, string>::iterator aiter = all_objs_index.begin();
752
753 for (; liter != linked_objs_index.end(); ++liter, ++aiter) {
754 OMAPReader linked_entries(ioctx, liter->second);
755 OMAPReader all_entries(ioctx, aiter->second);
756
757 bool done;
758
759 string cur_linked;
760 bool linked_done = false;
761
762
763 do {
764 string key;
765 int r = all_entries.get_next(&key, NULL, &done);
766 if (r < 0) {
767 return r;
768 }
769 if (done) {
770 break;
771 }
772
773 string key_fp = obj_fingerprint(key);
774
775 while (cur_linked < key_fp && !linked_done) {
776 r = linked_entries.get_next(&cur_linked, NULL, &linked_done);
777 if (r < 0) {
778 return r;
779 }
780 }
781
782 if (cur_linked == key_fp) {
783 ldout(store->ctx(), 20) << "linked: " << key << dendl;
784 continue;
785 }
786
787 time_t mtime;
788 r = data_ioctx.stat(key, NULL, &mtime);
789 if (r < 0) {
790 if (r != -ENOENT) {
791 lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl;
792 }
793 continue;
794 }
795 if (stale_secs && (uint64_t)mtime >= time_threshold) {
796 ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl;
797 continue;
798 }
799 ldout(store->ctx(), 20) << "leaked: " << key << dendl;
800 cout << "leaked: " << key << std::endl;
801 } while (!done);
802 }
803
804 return 0;
805}
806
807int RGWOrphanSearch::run()
808{
809 int r;
810
811 switch (search_stage.stage) {
812
813 case ORPHAN_SEARCH_STAGE_INIT:
814 ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
815 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL);
816 r = save_state();
817 if (r < 0) {
818 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
819 return r;
820 }
821 // fall through
822 case ORPHAN_SEARCH_STAGE_LSPOOL:
823 ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl;
824 r = build_all_oids_index();
825 if (r < 0) {
826 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
827 return r;
828 }
829
830 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS);
831 r = save_state();
832 if (r < 0) {
833 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
834 return r;
835 }
836 // fall through
837
838 case ORPHAN_SEARCH_STAGE_LSBUCKETS:
839 ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl;
840 r = build_buckets_instance_index();
841 if (r < 0) {
842 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
843 return r;
844 }
845
846 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI);
847 r = save_state();
848 if (r < 0) {
849 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
850 return r;
851 }
852 // fall through
853
854
855 case ORPHAN_SEARCH_STAGE_ITERATE_BI:
856 ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
857 r = build_linked_oids_index();
858 if (r < 0) {
859 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
860 return r;
861 }
862
863 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE);
864 r = save_state();
865 if (r < 0) {
866 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
867 return r;
868 }
869 // fall through
870
871 case ORPHAN_SEARCH_STAGE_COMPARE:
872 r = compare_oid_indexes();
873 if (r < 0) {
874 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
875 return r;
876 }
877
878 break;
879
880 default:
881 ceph_abort();
882 };
883
884 return 0;
885}
886
887
888int RGWOrphanSearch::remove_index(map<int, string>& index)
889{
890 librados::IoCtx& ioctx = orphan_store.get_ioctx();
891
892 for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) {
893 int r = ioctx.remove(iter->second);
894 if (r < 0) {
895 if (r != -ENOENT) {
896 ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl;
897 }
898 }
899 }
900 return 0;
901}
902
903int RGWOrphanSearch::finish()
904{
905 int r = remove_index(all_objs_index);
906 if (r < 0) {
907 ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl;
908 }
909 r = remove_index(buckets_instance_index);
910 if (r < 0) {
911 ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl;
912 }
913 r = remove_index(linked_objs_index);
914 if (r < 0) {
915 ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl;
916 }
917
918 r = orphan_store.remove_job(search_info.job_name);
919 if (r < 0) {
920 ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl;
921 }
922
923 return r;
924}
e306af50
TL
925
926
927int RGWRadosList::handle_stat_result(RGWRados::Object::Stat::Result& result,
7f7e6c64
TL
928 std::string& bucket_name,
929 rgw_obj_key& obj_key,
e306af50
TL
930 std::set<string>& obj_oids)
931{
932 obj_oids.clear();
933
934 rgw_bucket& bucket = result.obj.bucket;
935
936 ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ <<
937 " bucket=" << bucket <<
938 ", has_manifest=" << result.manifest.has_value() <<
939 dendl;
940
941 // iterator to store result of dlo/slo attribute find
942 decltype(result.attrs)::iterator attr_it = result.attrs.end();
943 const std::string oid = bucket.marker + "_" + result.obj.get_oid();
944 ldout(store->ctx(), 20) << "radoslist processing object=\"" <<
945 oid << "\"" << dendl;
946 if (visited_oids.find(oid) != visited_oids.end()) {
947 // apparently we hit a loop; don't continue with this oid
948 ldout(store->ctx(), 15) <<
949 "radoslist stopped loop at already visited object=\"" <<
950 oid << "\"" << dendl;
951 return 0;
952 }
953
7f7e6c64
TL
954 bucket_name = bucket.name;
955 obj_key = result.obj.key;
956
e306af50
TL
957 if (!result.manifest) {
958 /* a very very old object, or part of a multipart upload during upload */
959 obj_oids.insert(oid);
960
961 /*
962 * multipart parts don't have manifest on them, it's in the meta
963 * object; we'll process them in
964 * RGWRadosList::do_incomplete_multipart
965 */
966 } else if ((attr_it = result.attrs.find(RGW_ATTR_USER_MANIFEST)) !=
967 result.attrs.end()) {
968 // *** handle DLO object ***
969
970 obj_oids.insert(oid);
971 visited_oids.insert(oid); // prevent dlo loops
972 ldout(store->ctx(), 15) << "radoslist added to visited list DLO=\"" <<
973 oid << "\"" << dendl;
974
975 char* prefix_path_c = attr_it->second.c_str();
976 const std::string& prefix_path = prefix_path_c;
977
978 const size_t sep_pos = prefix_path.find('/');
979 if (string::npos == sep_pos) {
980 return -EINVAL;
981 }
982
983 const std::string bucket_name = prefix_path.substr(0, sep_pos);
984 const std::string prefix = prefix_path.substr(sep_pos + 1);
985
986 add_bucket_prefix(bucket_name, prefix);
987 ldout(store->ctx(), 25) << "radoslist DLO oid=\"" << oid <<
988 "\" added bucket=\"" << bucket_name << "\" prefix=\"" <<
989 prefix << "\" to process list" << dendl;
990 } else if ((attr_it = result.attrs.find(RGW_ATTR_SLO_MANIFEST)) !=
991 result.attrs.end()) {
992 // *** handle SLO object ***
993
994 obj_oids.insert(oid);
995 visited_oids.insert(oid); // prevent slo loops
996 ldout(store->ctx(), 15) << "radoslist added to visited list SLO=\"" <<
997 oid << "\"" << dendl;
998
999 RGWSLOInfo slo_info;
1000 bufferlist::const_iterator bliter = attr_it->second.begin();
1001 try {
1002 ::decode(slo_info, bliter);
1003 } catch (buffer::error& err) {
1004 ldout(store->ctx(), 0) <<
1005 "ERROR: failed to decode slo manifest for " << oid << dendl;
1006 return -EIO;
1007 }
1008
1009 for (const auto& iter : slo_info.entries) {
1010 const string& path_str = iter.path;
1011
1012 const size_t sep_pos = path_str.find('/', 1 /* skip initial slash */);
1013 if (string::npos == sep_pos) {
1014 return -EINVAL;
1015 }
1016
1017 std::string bucket_name;
1018 std::string obj_name;
1019
1020 bucket_name = url_decode(path_str.substr(1, sep_pos - 1));
1021 obj_name = url_decode(path_str.substr(sep_pos + 1));
1022
1023 const rgw_obj_key obj_key(obj_name);
1024 add_bucket_filter(bucket_name, obj_key);
1025 ldout(store->ctx(), 25) << "radoslist SLO oid=\"" << oid <<
1026 "\" added bucket=\"" << bucket_name << "\" obj_key=\"" <<
1027 obj_key << "\" to process list" << dendl;
1028 }
1029 } else {
1030 RGWObjManifest& manifest = *result.manifest;
1031
1032 // in multipart, the head object contains no data and just has the
1033 // manifest AND empty objects have no manifest, but they're
1034 // realized as empty rados objects
1035 if (0 == manifest.get_max_head_size() ||
1036 manifest.obj_begin() == manifest.obj_end()) {
1037 obj_oids.insert(oid);
1038 // first_insert = true;
1039 }
1040
1041 RGWObjManifest::obj_iterator miter;
1042 for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
1043 const rgw_raw_obj& loc =
1044 miter.get_location().get_raw_obj(store->getRados());
1045 string s = loc.oid;
1046 obj_oids.insert(s);
1047 }
1048 }
1049
1050 return 0;
1051} // RGWRadosList::handle_stat_result
1052
1053int RGWRadosList::pop_and_handle_stat_op(
1054 RGWObjectCtx& obj_ctx,
1055 std::deque<RGWRados::Object::Stat>& ops)
1056{
7f7e6c64
TL
1057 std::string bucket_name;
1058 rgw_obj_key obj_key;
1059 std::set<std::string> obj_oids;
e306af50
TL
1060 RGWRados::Object::Stat& front_op = ops.front();
1061
1062 int ret = front_op.wait();
1063 if (ret < 0) {
1064 if (ret != -ENOENT) {
1065 lderr(store->ctx()) << "ERROR: stat_async() returned error: " <<
1066 cpp_strerror(-ret) << dendl;
1067 }
1068 goto done;
1069 }
1070
7f7e6c64 1071 ret = handle_stat_result(front_op.result, bucket_name, obj_key, obj_oids);
e306af50
TL
1072 if (ret < 0) {
1073 lderr(store->ctx()) << "ERROR: handle_stat_result() returned error: " <<
1074 cpp_strerror(-ret) << dendl;
1075 }
1076
1077 // output results
1078 for (const auto& o : obj_oids) {
7f7e6c64
TL
1079 if (include_rgw_obj_name) {
1080 std::cout << o <<
1081 field_separator << bucket_name <<
1082 field_separator << obj_key <<
1083 std::endl;
1084 } else {
1085 std::cout << o << std::endl;
1086 }
e306af50
TL
1087 }
1088
1089done:
1090
1091 // invalidate object context for this object to avoid memory leak
1092 // (see pr https://github.com/ceph/ceph/pull/30174)
1093 obj_ctx.invalidate(front_op.result.obj);
1094
1095 ops.pop_front();
1096 return ret;
1097}
1098
1099
#if 0 // code that may be the basis for expansion
// Disabled sketch: pages through the "bucket.instance" metadata keys,
// groups them by orphan_shard() and hands each batch to log_oids().
int RGWRadosList::build_buckets_instance_index()
{
  void *handle;
  int max = 1000;
  string section = "bucket.instance";
  int ret = store->meta_mgr->list_keys_init(section, &handle);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
    return ret;
  }

  // shard number -> bucket instance keys collected since the last flush
  map<int, list<string> > pending;
  bool truncated;
  RGWObjectCtx obj_ctx(store);
  int unflushed = 0;
  uint64_t total = 0;

  do {
    list<string> keys;
    ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
    if (ret < 0) {
      lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
      return ret;
    }

    for (const string& key : keys) {
      ++total;
      ldout(store->ctx(), 10) << "bucket_instance=" << key << " total=" << total << dendl;
      pending[orphan_shard(key)].push_back(key);

      ++unflushed;
      if (unflushed < COUNT_BEFORE_FLUSH) {
        continue;
      }

      // batch is full: write it out and start a new one
      ret = log_oids(buckets_instance_index, pending);
      if (ret < 0) {
        lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
        return ret;
      }
      unflushed = 0;
      pending.clear();
    }
  } while (truncated);

  // write out the final, partially filled batch
  ret = log_oids(buckets_instance_index, pending);
  if (ret < 0) {
    lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
    return ret;
  }
  store->meta_mgr->list_keys_complete(handle);

  return 0;
}
#endif
1157
1158
1159int RGWRadosList::process_bucket(
1160 const std::string& bucket_instance_id,
1161 const std::string& prefix,
1162 const std::set<rgw_obj_key>& entries_filter)
1163{
1164 ldout(store->ctx(), 10) << "RGWRadosList::" << __func__ <<
1165 " bucket_instance_id=" << bucket_instance_id <<
1166 ", prefix=" << prefix <<
1167 ", entries_filter.size=" << entries_filter.size() << dendl;
1168
1169 RGWBucketInfo bucket_info;
1170 RGWSysObjectCtx sys_obj_ctx = store->svc()->sysobj->init_obj_ctx();
1171 int ret = store->getRados()->get_bucket_instance_info(sys_obj_ctx,
1172 bucket_instance_id,
1173 bucket_info,
1174 nullptr,
1175 nullptr,
1176 null_yield);
1177 if (ret < 0) {
1178 if (ret == -ENOENT) {
1179 // probably raced with bucket removal
1180 return 0;
1181 }
1182 lderr(store->ctx()) << __func__ <<
1183 ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" <<
1184 ret << dendl;
1185 return ret;
1186 }
1187
1188 RGWRados::Bucket target(store->getRados(), bucket_info);
1189 RGWRados::Bucket::List list_op(&target);
1190
1191 std::string marker;
1192 list_op.params.marker = rgw_obj_key(marker);
1193 list_op.params.list_versions = true;
1194 list_op.params.enforce_ns = false;
1195 list_op.params.allow_unordered = false;
1196 list_op.params.prefix = prefix;
1197
1198 bool truncated;
1199
1200 std::deque<RGWRados::Object::Stat> stat_ops;
1201 std::string prev_versioned_key_name = "";
1202
1203 RGWObjectCtx obj_ctx(store);
1204
1205 do {
1206 std::vector<rgw_bucket_dir_entry> result;
1207
1208 constexpr int64_t LIST_OBJS_MAX_ENTRIES = 100;
1209 ret = list_op.list_objects(LIST_OBJS_MAX_ENTRIES, &result,
1210 NULL, &truncated, null_yield);
1211 if (ret == -ENOENT) {
1212 // race with bucket delete?
1213 ret = 0;
1214 break;
1215 } else if (ret < 0) {
1216 std::cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) <<
1217 std::endl;
1218 return ret;
1219 }
1220
1221 for (std::vector<rgw_bucket_dir_entry>::iterator iter = result.begin();
1222 iter != result.end();
1223 ++iter) {
1224 rgw_bucket_dir_entry& entry = *iter;
1225
1226 if (entry.key.instance.empty()) {
1227 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
1228 } else {
1229 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name <<
1230 " [" << entry.key.instance << "]" << dendl;
1231 }
1232
1233 ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" <<
1234 entry.key.name << " entry.key.instance=" << entry.key.instance <<
1235 dendl;
1236
1237 // ignore entries that are not in the filter if there is a filter
1238 if (!entries_filter.empty() &&
1239 entries_filter.find(entry.key) == entries_filter.cend()) {
1240 continue;
1241 }
1242
1243 // we need to do this in two cases below, so use a lambda
1244 auto do_stat_key =
1245 [&](const rgw_obj_key& key) -> int {
1246 int ret;
1247
1248 rgw_obj obj(bucket_info.bucket, key);
1249
1250 RGWRados::Object op_target(store->getRados(), bucket_info,
1251 obj_ctx, obj);
1252
1253 stat_ops.push_back(RGWRados::Object::Stat(&op_target));
1254 RGWRados::Object::Stat& op = stat_ops.back();
1255
1256 ret = op.stat_async();
1257 if (ret < 0) {
1258 lderr(store->ctx()) << "ERROR: stat_async() returned error: " <<
1259 cpp_strerror(-ret) << dendl;
1260 return ret;
1261 }
1262
1263 if (stat_ops.size() >= max_concurrent_ios) {
1264 ret = pop_and_handle_stat_op(obj_ctx, stat_ops);
1265 if (ret < 0) {
1266 if (ret != -ENOENT) {
1267 lderr(store->ctx()) <<
1268 "ERROR: pop_and_handle_stat_op() returned error: " <<
1269 cpp_strerror(-ret) << dendl;
1270 }
1271
1272 // clear error, so we'll continue processing directory
1273 ret = 0;
1274 }
1275 }
1276
1277 return ret;
1278 }; // do_stat_key lambda
1279
1280 // for versioned objects, make sure the head object is handled
1281 // as well by ignoring the instance identifier
1282 if (!entry.key.instance.empty() &&
1283 entry.key.name != prev_versioned_key_name) {
1284 // don't do the same key twice; even though out bucket index
1285 // listing allows unordered, since all versions of an object
1286 // use the same bucket index key, they'll all end up together
1287 // and sorted
1288 prev_versioned_key_name = entry.key.name;
1289
1290 rgw_obj_key uninstanced(entry.key.name);
1291
1292 ret = do_stat_key(uninstanced);
1293 if (ret < 0) {
1294 return ret;
1295 }
1296 }
1297
1298 ret = do_stat_key(entry.key);
1299 if (ret < 0) {
1300 return ret;
1301 }
1302 } // for iter loop
1303 } while (truncated);
1304
1305 while (!stat_ops.empty()) {
1306 ret = pop_and_handle_stat_op(obj_ctx, stat_ops);
1307 if (ret < 0) {
1308 if (ret != -ENOENT) {
1309 lderr(store->ctx()) << "ERROR: stat_async() returned error: " <<
1310 cpp_strerror(-ret) << dendl;
1311 }
1312 }
1313 }
1314
1315 return 0;
1316}
1317
1318
1319int RGWRadosList::run()
1320{
1321 int ret;
1322 void* handle = nullptr;
1323
1324 ret = store->ctl()->meta.mgr->list_keys_init("bucket", &handle);
1325 if (ret < 0) {
1326 lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
1327 " ERROR: list_keys_init returned " <<
1328 cpp_strerror(-ret) << dendl;
1329 return ret;
1330 }
1331
1332 const int max_keys = 1000;
1333 bool truncated = true;
1334
1335 do {
1336 std::list<std::string> buckets;
1337 ret = store->ctl()->meta.mgr->list_keys_next(handle, max_keys,
1338 buckets, &truncated);
1339
1340 for (std::string& bucket_id : buckets) {
1341 ret = run(bucket_id);
1342 if (ret == -ENOENT) {
1343 continue;
1344 } else if (ret < 0) {
1345 return ret;
1346 }
1347 }
1348 } while (truncated);
1349
1350 return 0;
1351} // RGWRadosList::run()
1352
1353
1354int RGWRadosList::run(const std::string& start_bucket_name)
1355{
1356 RGWSysObjectCtx sys_obj_ctx = store->svc()->sysobj->init_obj_ctx();
1357 RGWObjectCtx obj_ctx(store);
7f7e6c64 1358 RGWBucketInfo bucket_info;
e306af50
TL
1359 int ret;
1360
1361 add_bucket_entire(start_bucket_name);
1362
1363 while (! bucket_process_map.empty()) {
1364 // pop item from map and capture its key data
1365 auto front = bucket_process_map.begin();
1366 std::string bucket_name = front->first;
1367 process_t process;
1368 std::swap(process, front->second);
1369 bucket_process_map.erase(front);
1370
1371 RGWBucketInfo bucket_info;
1372 ret = store->getRados()->get_bucket_info(store->svc(),
1373 tenant_name,
1374 bucket_name,
1375 bucket_info,
1376 nullptr,
1377 null_yield);
1378 if (ret == -ENOENT) {
1379 std::cerr << "WARNING: bucket " << bucket_name <<
1380 " does not exist; could it have been deleted very recently?" <<
1381 std::endl;
1382 continue;
1383 } else if (ret < 0) {
1384 std::cerr << "ERROR: could not get info for bucket " << bucket_name <<
1385 " -- " << cpp_strerror(-ret) << std::endl;
1386 return ret;
1387 }
1388
1389 const std::string bucket_id = bucket_info.bucket.get_key();
1390
1391 static const std::set<rgw_obj_key> empty_filter;
1392 static const std::string empty_prefix;
1393
1394 auto do_process_bucket =
1395 [&bucket_id, this]
1396 (const std::string& prefix,
1397 const std::set<rgw_obj_key>& entries_filter) -> int {
1398 int ret = process_bucket(bucket_id, prefix, entries_filter);
1399 if (ret == -ENOENT) {
1400 // bucket deletion race?
1401 return 0;
1402 } if (ret < 0) {
1403 lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
1404 ": ERROR: process_bucket(); bucket_id=" <<
1405 bucket_id << " returned ret=" << ret << dendl;
1406 }
1407
1408 return ret;
1409 };
1410
1411 // either process the whole bucket *or* process the filters and/or
1412 // the prefixes
1413 if (process.entire_container) {
1414 ret = do_process_bucket(empty_prefix, empty_filter);
1415 if (ret < 0) {
1416 return ret;
1417 }
1418 } else {
1419 if (! process.filter_keys.empty()) {
1420 ret = do_process_bucket(empty_prefix, process.filter_keys);
1421 if (ret < 0) {
1422 return ret;
1423 }
1424 }
1425 for (const auto& p : process.prefixes) {
1426 ret = do_process_bucket(p, empty_filter);
1427 if (ret < 0) {
1428 return ret;
1429 }
1430 }
1431 }
1432 } // while (! bucket_process_map.empty())
1433
7f7e6c64
TL
1434 if (include_rgw_obj_name) {
1435 goto done;
1436 }
1437
e306af50
TL
1438 // now handle incomplete multipart uploads by going back to the
1439 // initial bucket
1440
e306af50
TL
1441 ret = store->getRados()->get_bucket_info(store->svc(),
1442 tenant_name,
1443 start_bucket_name,
1444 bucket_info,
1445 nullptr,
1446 null_yield);
1447 if (ret == -ENOENT) {
1448 // bucket deletion race?
1449 return 0;
1450 } else if (ret < 0) {
1451 lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
1452 ": ERROR: get_bucket_info returned ret=" << ret << dendl;
1453 return ret;
1454 }
1455
1456 ret = do_incomplete_multipart(store, bucket_info);
1457 if (ret < 0) {
1458 lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
1459 ": ERROR: do_incomplete_multipart returned ret=" << ret << dendl;
1460 return ret;
1461 }
1462
7f7e6c64
TL
1463done:
1464
e306af50
TL
1465 return 0;
1466} // RGWRadosList::run(string)
1467
1468
1469int RGWRadosList::do_incomplete_multipart(
1470 rgw::sal::RGWRadosStore* store,
1471 RGWBucketInfo& bucket_info)
1472{
1473 constexpr int max_uploads = 1000;
1474 constexpr int max_parts = 1000;
1475 static const std::string mp_ns = RGW_OBJ_NS_MULTIPART;
1476 static MultipartMetaFilter mp_filter;
1477
1478 int ret;
1479
1480 RGWRados::Bucket target(store->getRados(), bucket_info);
1481 RGWRados::Bucket::List list_op(&target);
f6b5b4d7
TL
1482 list_op.params.ns = mp_ns;
1483 list_op.params.filter = &mp_filter;
1484 // use empty string for initial list_op.params.marker
1485 // use empty strings for list_op.params.{prefix,delim}
e306af50
TL
1486
1487 bool is_listing_truncated;
e306af50
TL
1488
1489 do {
e306af50
TL
1490 std::vector<rgw_bucket_dir_entry> objs;
1491 std::map<string, bool> common_prefixes;
1492 ret = list_op.list_objects(max_uploads, &objs, &common_prefixes,
1493 &is_listing_truncated, null_yield);
1494 if (ret == -ENOENT) {
1495 // could bucket have been removed while this is running?
ec96510d 1496 ldout(store->ctx(), 5) << "RGWRadosList::" << __func__ <<
e306af50
TL
1497 ": WARNING: call to list_objects of multipart namespace got ENOENT; "
1498 "assuming bucket removal race" << dendl;
1499 break;
1500 } else if (ret < 0) {
1501 lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
1502 ": ERROR: list_objects op returned ret=" << ret << dendl;
1503 return ret;
1504 }
1505
1506 if (!objs.empty()) {
1507 std::vector<RGWMultipartUploadEntry> uploads;
1508 RGWMultipartUploadEntry entry;
1509 for (const rgw_bucket_dir_entry& obj : objs) {
1510 const rgw_obj_key& key = obj.key;
1511 if (!entry.mp.from_meta(key.name)) {
1512 // we only want the meta objects, so skip all the components
1513 continue;
1514 }
1515 entry.obj = obj;
1516 uploads.push_back(entry);
1517 ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ <<
1518 " processing incomplete multipart entry " <<
1519 entry << dendl;
1520 }
e306af50
TL
1521
1522 // now process the uploads vector
ec96510d
FG
1523 for (const auto& upload : uploads) {
1524 const RGWMPObj& mp = upload.mp;
1525 int parts_marker = 0;
1526 bool is_parts_truncated = false;
e306af50 1527
ec96510d
FG
1528 do { // while (is_parts_truncated);
1529 std::map<uint32_t, RGWUploadPartInfo> parts;
e306af50
TL
1530 ret = list_multipart_parts(store, bucket_info, store->ctx(),
1531 mp.get_upload_id(), mp.get_meta(),
ec96510d
FG
1532 max_parts, parts_marker,
1533 parts, &parts_marker,
1534 &is_parts_truncated);
e306af50 1535 if (ret == -ENOENT) {
ec96510d
FG
1536 ldout(store->ctx(), 5) << "RGWRadosList::" << __func__ <<
1537 ": WARNING: list_multipart_parts returned ret=-ENOENT "
1538 "for " << mp.get_upload_id() << ", moving on" << dendl;
1539 break;
e306af50
TL
1540 } else if (ret < 0) {
1541 lderr(store->ctx()) << "RGWRadosList::" << __func__ <<
ec96510d
FG
1542 ": ERROR: list_multipart_parts returned ret=" << ret <<
1543 dendl;
e306af50
TL
1544 return ret;
1545 }
1546
1547 for (auto& p : parts) {
1548 RGWObjManifest& manifest = p.second.manifest;
1549 for (auto obj_it = manifest.obj_begin();
1550 obj_it != manifest.obj_end();
1551 ++obj_it) {
1552 const rgw_raw_obj& loc =
1553 obj_it.get_location().get_raw_obj(store->getRados());
1554 std::cout << loc.oid << std::endl;
ec96510d
FG
1555 } // for (auto obj_it
1556 } // for (auto& p
1557 } while (is_parts_truncated);
1558 } // for (const auto& upload
e306af50 1559 } // if objs not empty
f6b5b4d7 1560 } while (is_listing_truncated);
e306af50
TL
1561
1562 return 0;
1563} // RGWRadosList::do_incomplete_multipart