]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_orphan.cc
update sources to v12.1.0
[ceph.git] / ceph / src / rgw / rgw_orphan.cc
CommitLineData
31f18b77
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
7c673cae
FG
3
4#include <string>
5
6using namespace std;
7
8#include "common/config.h"
9#include "common/Formatter.h"
10#include "common/errno.h"
11
12#include "rgw_rados.h"
13#include "rgw_orphan.h"
14
15#define dout_subsys ceph_subsys_rgw
16
17#define DEFAULT_NUM_SHARDS 64
18
19static string obj_fingerprint(const string& oid, const char *force_ns = NULL)
20{
21 ssize_t pos = oid.find('_');
22 if (pos < 0) {
23 cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
24 }
25
26 string obj_marker = oid.substr(0, pos);
27
28 rgw_obj_key key;
29
30 rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key);
31
32 if (key.ns.empty()) {
33 return oid;
34 }
35
36 string s = oid;
37
38 if (force_ns) {
39 rgw_bucket b;
40 rgw_obj new_obj(b, key);
41 s = obj_marker + "_" + new_obj.get_oid();
42 }
43
44 /* cut out suffix */
45 size_t i = s.size() - 1;
46 for (; i >= s.size() - 10; --i) {
47 char c = s[i];
48 if (!isdigit(c) && c != '.' && c != '_') {
49 break;
50 }
51 }
52
53 return s.substr(0, i + 1);
54}
55
56int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state)
57{
58 set<string> keys;
59 map<string, bufferlist> vals;
60 keys.insert(job_name);
61 int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
62 if (r < 0) {
63 return r;
64 }
65
66 map<string, bufferlist>::iterator iter = vals.find(job_name);
67 if (iter == vals.end()) {
68 return -ENOENT;
69 }
70
71 try {
72 bufferlist& bl = iter->second;
73 ::decode(state, bl);
74 } catch (buffer::error& err) {
75 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
76 return -EIO;
77 }
78
79 return 0;
80}
81
82int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
83{
84 map<string, bufferlist> vals;
85 bufferlist bl;
86 ::encode(state, bl);
87 vals[job_name] = bl;
88 int r = ioctx.omap_set(oid, vals);
89 if (r < 0) {
90 return r;
91 }
92
93 return 0;
94}
95
96int RGWOrphanStore::remove_job(const string& job_name)
97{
98 set<string> keys;
99 keys.insert(job_name);
100
101 int r = ioctx.omap_rm_keys(oid, keys);
102 if (r < 0) {
103 return r;
104 }
105
106 return 0;
107}
108
109int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list)
110{
111 map <string,bufferlist> vals;
112 int MAX_READ=1024;
113 string marker="";
114 int r = 0;
115
116 // loop through all the omap vals from index object, storing them to job_list,
117 // read in batches of 1024, we update the marker every iteration and exit the
118 // loop when we find that total size read out is less than batch size
119 do {
120 r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals);
121 if (r < 0) {
122 return r;
123 }
124 r = vals.size();
125
126 for (const auto &it : vals) {
127 marker=it.first;
128 RGWOrphanSearchState state;
129 try {
130 bufferlist bl = it.second;
131 ::decode(state, bl);
132 } catch (buffer::error& err) {
133 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
134 return -EIO;
135 }
136 job_list[it.first] = state;
137 }
138 } while (r == MAX_READ);
139
140 return 0;
141}
142
143int RGWOrphanStore::init()
144{
145 rgw_pool& log_pool = store->get_zone_params().log_pool;
146 int r = rgw_init_ioctx(store->get_rados_handle(), log_pool, ioctx);
147 if (r < 0) {
148 cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl;
149 return r;
150 }
151
152 return 0;
153}
154
155int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
156{
157 librados::ObjectWriteOperation op;
158 op.omap_set(entries);
159 cout << "storing " << entries.size() << " entries at " << oid << std::endl;
160 ldout(store->ctx(), 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl;
161 for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
162 ldout(store->ctx(), 20) << " > " << iter->first << dendl;
163 }
164 int ret = ioctx.operate(oid, &op);
165 if (ret < 0) {
166 lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl;
167 }
168
169 return 0;
170}
171
172int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
173{
174#define MAX_OMAP_GET 100
175 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
176 if (ret < 0 && ret != -ENOENT) {
177 cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl;
178 }
179
180 *truncated = (entries->size() == MAX_OMAP_GET);
181
182 return 0;
183}
184
185int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
186 int r = orphan_store.init();
187 if (r < 0) {
188 return r;
189 }
190
191 RGWOrphanSearchState state;
192 r = orphan_store.read_job(job_name, state);
193 if (r < 0 && r != -ENOENT) {
194 lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl;
195 return r;
196 }
197
198 if (r == 0) {
199 search_info = state.info;
200 search_stage = state.stage;
201 } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */
202 search_info = *info;
203 search_info.job_name = job_name;
204 search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
205 search_info.start_time = ceph_clock_now();
206 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
207
208 r = save_state();
209 if (r < 0) {
210 lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
211 return r;
212 }
213 } else {
214 lderr(store->ctx()) << "ERROR: job not found" << dendl;
215 return r;
216 }
217
218 index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string(".");
219 index_objs_prefix += job_name;
220
221 for (int i = 0; i < search_info.num_shards; i++) {
222 char buf[128];
223
224 snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i);
225 all_objs_index[i] = buf;
226
227 snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i);
228 buckets_instance_index[i] = buf;
229
230 snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i);
231 linked_objs_index[i] = buf;
232 }
233 return 0;
234}
235
236int RGWOrphanSearch::log_oids(map<int, string>& log_shards, map<int, list<string> >& oids)
237{
238 map<int, list<string> >::iterator miter = oids.begin();
239
240 list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
241
242 for (; miter != oids.end(); ++miter) {
243 log_iter_info info;
244 info.oid = log_shards[miter->first];
245 info.cur = miter->second.begin();
246 info.end = miter->second.end();
247 liters.push_back(info);
248 }
249
250 list<log_iter_info>::iterator list_iter;
251 while (!liters.empty()) {
252 list_iter = liters.begin();
253
254 while (list_iter != liters.end()) {
255 log_iter_info& cur_info = *list_iter;
256
257 list<string>::iterator& cur = cur_info.cur;
258 list<string>::iterator& end = cur_info.end;
259
260 map<string, bufferlist> entries;
261#define MAX_OMAP_SET_ENTRIES 100
262 for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
263 ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl;
264 entries[*cur] = bufferlist();
265 }
266
267 int ret = orphan_store.store_entries(cur_info.oid, entries);
268 if (ret < 0) {
269 return ret;
270 }
271 list<log_iter_info>::iterator tmp = list_iter;
272 ++list_iter;
273 if (cur == end) {
274 liters.erase(tmp);
275 }
276 }
277 }
278 return 0;
279}
280
281int RGWOrphanSearch::build_all_oids_index()
282{
283 librados::IoCtx ioctx;
284
285 int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, ioctx);
286 if (ret < 0) {
287 lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
288 return ret;
289 }
290
291 ioctx.set_namespace(librados::all_nspaces);
292 librados::NObjectIterator i = ioctx.nobjects_begin();
293 librados::NObjectIterator i_end = ioctx.nobjects_end();
294
295 map<int, list<string> > oids;
296
297 int count = 0;
298 uint64_t total = 0;
299
300 cout << "logging all objects in the pool" << std::endl;
301
302 for (; i != i_end; ++i) {
303 string nspace = i->get_nspace();
304 string oid = i->get_oid();
305 string locator = i->get_locator();
306
307 ssize_t pos = oid.find('_');
308 if (pos < 0) {
309 cout << "unidentified oid: " << oid << ", skipping" << std::endl;
310 /* what is this object, oids should be in the format of <bucket marker>_<obj>,
311 * skip this entry
312 */
313 continue;
314 }
315 string stripped_oid = oid.substr(pos + 1);
316 rgw_obj_key key;
317 if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) {
318 cout << "cannot parse oid: " << oid << ", skipping" << std::endl;
319 continue;
320 }
321
322 if (key.ns.empty()) {
323 /* skipping head objects, we don't want to remove these as they are mutable and
324 * cleaning them up is racy (can race with object removal and a later recreation)
325 */
326 cout << "skipping head object: oid=" << oid << std::endl;
327 continue;
328 }
329
330 string oid_fp = obj_fingerprint(oid);
331
332 ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl;
333
334 int shard = orphan_shard(oid_fp);
335 oids[shard].push_back(oid);
336
337#define COUNT_BEFORE_FLUSH 1000
338 ++total;
339 if (++count >= COUNT_BEFORE_FLUSH) {
340 ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl;
341 ret = log_oids(all_objs_index, oids);
342 if (ret < 0) {
343 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
344 return ret;
345 }
346 count = 0;
347 oids.clear();
348 }
349 }
350 ret = log_oids(all_objs_index, oids);
351 if (ret < 0) {
352 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
353 return ret;
354 }
355
356 return 0;
357}
358
359int RGWOrphanSearch::build_buckets_instance_index()
360{
361 void *handle;
362 int max = 1000;
363 string section = "bucket.instance";
364 int ret = store->meta_mgr->list_keys_init(section, &handle);
365 if (ret < 0) {
366 lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
367 return -ret;
368 }
369
370 map<int, list<string> > instances;
371
372 bool truncated;
373
374 RGWObjectCtx obj_ctx(store);
375
376 int count = 0;
377 uint64_t total = 0;
378
379 do {
380 list<string> keys;
381 ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
382 if (ret < 0) {
383 lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
384 return -ret;
385 }
386
387 for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
388 ++total;
389 ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
390 int shard = orphan_shard(*iter);
391 instances[shard].push_back(*iter);
392
393 if (++count >= COUNT_BEFORE_FLUSH) {
394 ret = log_oids(buckets_instance_index, instances);
395 if (ret < 0) {
396 lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
397 return ret;
398 }
399 count = 0;
400 instances.clear();
401 }
402 }
403
404 } while (truncated);
405
406 ret = log_oids(buckets_instance_index, instances);
407 if (ret < 0) {
408 lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
409 return ret;
410 }
411 store->meta_mgr->list_keys_complete(handle);
412
413 return 0;
414}
415
416int RGWOrphanSearch::handle_stat_result(map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result)
417{
418 set<string> obj_oids;
419 rgw_bucket& bucket = result.obj.bucket;
420 if (!result.has_manifest) { /* a very very old object, or part of a multipart upload during upload */
421 const string loc = bucket.bucket_id + "_" + result.obj.get_oid();
422 obj_oids.insert(obj_fingerprint(loc));
423
424 /*
425 * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the
426 * meta object, just add a "shadow" object to the mix
427 */
428 obj_oids.insert(obj_fingerprint(loc, "shadow"));
429 } else {
430 RGWObjManifest& manifest = result.manifest;
431
432 RGWObjManifest::obj_iterator miter;
433 for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
434 const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store);
435 string s = loc.oid;
436 obj_oids.insert(obj_fingerprint(s));
437 }
438 }
439
440 for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) {
441 ldout(store->ctx(), 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl;
442
443 int shard = orphan_shard(*iter);
444 oids[shard].push_back(*iter);
445 }
446
447 return 0;
448}
449
450int RGWOrphanSearch::pop_and_handle_stat_op(map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops)
451{
452 RGWRados::Object::Stat& front_op = ops.front();
453
454 int ret = front_op.wait();
455 if (ret < 0) {
456 if (ret != -ENOENT) {
457 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
458 }
459 goto done;
460 }
461 ret = handle_stat_result(oids, front_op.result);
462 if (ret < 0) {
463 lderr(store->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl;
464 }
465done:
466 ops.pop_front();
467 return ret;
468}
469
470int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids)
471{
472 ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
473 RGWBucketInfo bucket_info;
474 RGWObjectCtx obj_ctx(store);
475 int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
476 if (ret < 0) {
477 if (ret == -ENOENT) {
478 /* probably raced with bucket removal */
479 return 0;
480 }
481 lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
482 return ret;
483 }
484
485 RGWRados::Bucket target(store, bucket_info);
486 RGWRados::Bucket::List list_op(&target);
487
488 string marker;
489 list_op.params.marker = rgw_obj_key(marker);
490 list_op.params.list_versions = true;
491 list_op.params.enforce_ns = false;
492
493 bool truncated;
494
495 deque<RGWRados::Object::Stat> stat_ops;
496
497 int count = 0;
498
499 do {
500 vector<rgw_bucket_dir_entry> result;
501
502#define MAX_LIST_OBJS_ENTRIES 100
503 ret = list_op.list_objects(MAX_LIST_OBJS_ENTRIES, &result, NULL, &truncated);
504 if (ret < 0) {
505 cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
506 return -ret;
507 }
508
509 for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) {
510 rgw_bucket_dir_entry& entry = *iter;
511 if (entry.key.instance.empty()) {
512 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
513 } else {
514 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
515 }
516
517 ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl;
518 rgw_obj obj(bucket_info.bucket, entry.key);
519
520 RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
521
522 stat_ops.push_back(RGWRados::Object::Stat(&op_target));
523 RGWRados::Object::Stat& op = stat_ops.back();
524
525
526 ret = op.stat_async();
527 if (ret < 0) {
528 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
529 return ret;
530 }
531 if (stat_ops.size() >= max_concurrent_ios) {
532 ret = pop_and_handle_stat_op(oids, stat_ops);
533 if (ret < 0) {
534 if (ret != -ENOENT) {
535 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
536 }
537 }
538 }
539 if (++count >= COUNT_BEFORE_FLUSH) {
540 ret = log_oids(linked_objs_index, oids);
541 if (ret < 0) {
542 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
543 return ret;
544 }
545 count = 0;
546 oids.clear();
547 }
548 }
549 } while (truncated);
550
551 while (!stat_ops.empty()) {
552 ret = pop_and_handle_stat_op(oids, stat_ops);
553 if (ret < 0) {
554 if (ret != -ENOENT) {
555 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
556 }
557 }
558 }
559
560 return 0;
561}
562
563int RGWOrphanSearch::build_linked_oids_index()
564{
565 map<int, list<string> > oids;
566 map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard);
567 for (; iter != buckets_instance_index.end(); ++iter) {
568 ldout(store->ctx(), 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl;
569 bool truncated;
570
571 string oid = iter->second;
572
573 do {
574 map<string, bufferlist> entries;
575 int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated);
576 if (ret == -ENOENT) {
577 truncated = false;
578 ret = 0;
579 }
580
581 if (ret < 0) {
582 lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
583 return ret;
584 }
585
586 if (entries.empty()) {
587 break;
588 }
589
590 for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
591 ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
592 ret = build_linked_oids_for_bucket(eiter->first, oids);
593 if (ret < 0) {
594 lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first
595 << " returned ret=" << ret << dendl;
596 return ret;
597 }
598 }
599
600 search_stage.shard = iter->first;
601 search_stage.marker = entries.rbegin()->first; /* last entry */
602 } while (truncated);
603
604 search_stage.marker.clear();
605 }
606
607 int ret = log_oids(linked_objs_index, oids);
608 if (ret < 0) {
609 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
610 return ret;
611 }
612
613 ret = save_state();
614 if (ret < 0) {
615 cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl;
616 return ret;
617 }
618
619 return 0;
620}
621
622class OMAPReader {
623 librados::IoCtx ioctx;
624 string oid;
625
626 map<string, bufferlist> entries;
627 map<string, bufferlist>::iterator iter;
628 string marker;
629 bool truncated;
630
631public:
632 OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) {
633 iter = entries.end();
634 }
635
636 int get_next(string *key, bufferlist *pbl, bool *done);
637};
638
639int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done)
640{
641 if (iter != entries.end()) {
642 *key = iter->first;
643 if (pbl) {
644 *pbl = iter->second;
645 }
646 ++iter;
647 *done = false;
648 marker = *key;
649 return 0;
650 }
651
652 if (!truncated) {
653 *done = true;
654 return 0;
655 }
656
657#define MAX_OMAP_GET_ENTRIES 100
658 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries);
659 if (ret < 0) {
660 if (ret == -ENOENT) {
661 *done = true;
662 return 0;
663 }
664 return ret;
665 }
666
667 truncated = (entries.size() == MAX_OMAP_GET_ENTRIES);
668 iter = entries.begin();
669 return get_next(key, pbl, done);
670}
671
672int RGWOrphanSearch::compare_oid_indexes()
673{
674 assert(linked_objs_index.size() == all_objs_index.size());
675
676 librados::IoCtx& ioctx = orphan_store.get_ioctx();
677
678 librados::IoCtx data_ioctx;
679
680 int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, data_ioctx);
681 if (ret < 0) {
682 lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
683 return ret;
684 }
685
686 uint64_t time_threshold = search_info.start_time.sec() - stale_secs;
687
688 map<int, string>::iterator liter = linked_objs_index.begin();
689 map<int, string>::iterator aiter = all_objs_index.begin();
690
691 for (; liter != linked_objs_index.end(); ++liter, ++aiter) {
692 OMAPReader linked_entries(ioctx, liter->second);
693 OMAPReader all_entries(ioctx, aiter->second);
694
695 bool done;
696
697 string cur_linked;
698 bool linked_done = false;
699
700
701 do {
702 string key;
703 int r = all_entries.get_next(&key, NULL, &done);
704 if (r < 0) {
705 return r;
706 }
707 if (done) {
708 break;
709 }
710
711 string key_fp = obj_fingerprint(key);
712
713 while (cur_linked < key_fp && !linked_done) {
714 r = linked_entries.get_next(&cur_linked, NULL, &linked_done);
715 if (r < 0) {
716 return r;
717 }
718 }
719
720 if (cur_linked == key_fp) {
721 ldout(store->ctx(), 20) << "linked: " << key << dendl;
722 continue;
723 }
724
725 time_t mtime;
726 r = data_ioctx.stat(key, NULL, &mtime);
727 if (r < 0) {
728 if (r != -ENOENT) {
729 lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl;
730 }
731 continue;
732 }
733 if (stale_secs && (uint64_t)mtime >= time_threshold) {
734 ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl;
735 continue;
736 }
737 ldout(store->ctx(), 20) << "leaked: " << key << dendl;
738 cout << "leaked: " << key << std::endl;
739 } while (!done);
740 }
741
742 return 0;
743}
744
745int RGWOrphanSearch::run()
746{
747 int r;
748
749 switch (search_stage.stage) {
750
751 case ORPHAN_SEARCH_STAGE_INIT:
752 ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
753 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL);
754 r = save_state();
755 if (r < 0) {
756 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
757 return r;
758 }
759 // fall through
760 case ORPHAN_SEARCH_STAGE_LSPOOL:
761 ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl;
762 r = build_all_oids_index();
763 if (r < 0) {
764 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
765 return r;
766 }
767
768 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS);
769 r = save_state();
770 if (r < 0) {
771 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
772 return r;
773 }
774 // fall through
775
776 case ORPHAN_SEARCH_STAGE_LSBUCKETS:
777 ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl;
778 r = build_buckets_instance_index();
779 if (r < 0) {
780 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
781 return r;
782 }
783
784 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI);
785 r = save_state();
786 if (r < 0) {
787 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
788 return r;
789 }
790 // fall through
791
792
793 case ORPHAN_SEARCH_STAGE_ITERATE_BI:
794 ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
795 r = build_linked_oids_index();
796 if (r < 0) {
797 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
798 return r;
799 }
800
801 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE);
802 r = save_state();
803 if (r < 0) {
804 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
805 return r;
806 }
807 // fall through
808
809 case ORPHAN_SEARCH_STAGE_COMPARE:
810 r = compare_oid_indexes();
811 if (r < 0) {
812 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
813 return r;
814 }
815
816 break;
817
818 default:
819 ceph_abort();
820 };
821
822 return 0;
823}
824
825
826int RGWOrphanSearch::remove_index(map<int, string>& index)
827{
828 librados::IoCtx& ioctx = orphan_store.get_ioctx();
829
830 for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) {
831 int r = ioctx.remove(iter->second);
832 if (r < 0) {
833 if (r != -ENOENT) {
834 ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl;
835 }
836 }
837 }
838 return 0;
839}
840
841int RGWOrphanSearch::finish()
842{
843 int r = remove_index(all_objs_index);
844 if (r < 0) {
845 ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl;
846 }
847 r = remove_index(buckets_instance_index);
848 if (r < 0) {
849 ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl;
850 }
851 r = remove_index(linked_objs_index);
852 if (r < 0) {
853 ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl;
854 }
855
856 r = orphan_store.remove_job(search_info.job_name);
857 if (r < 0) {
858 ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl;
859 }
860
861 return r;
862}