]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_orphan.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rgw / rgw_orphan.cc
1
2
3 #include <string>
4
5 using namespace std;
6
7 #include "common/config.h"
8 #include "common/Formatter.h"
9 #include "common/errno.h"
10
11 #include "rgw_rados.h"
12 #include "rgw_orphan.h"
13
14 #define dout_subsys ceph_subsys_rgw
15
16 #define DEFAULT_NUM_SHARDS 64
17
18 static string obj_fingerprint(const string& oid, const char *force_ns = NULL)
19 {
20 ssize_t pos = oid.find('_');
21 if (pos < 0) {
22 cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
23 }
24
25 string obj_marker = oid.substr(0, pos);
26
27 rgw_obj_key key;
28
29 rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key);
30
31 if (key.ns.empty()) {
32 return oid;
33 }
34
35 string s = oid;
36
37 if (force_ns) {
38 rgw_bucket b;
39 rgw_obj new_obj(b, key);
40 s = obj_marker + "_" + new_obj.get_oid();
41 }
42
43 /* cut out suffix */
44 size_t i = s.size() - 1;
45 for (; i >= s.size() - 10; --i) {
46 char c = s[i];
47 if (!isdigit(c) && c != '.' && c != '_') {
48 break;
49 }
50 }
51
52 return s.substr(0, i + 1);
53 }
54
55 int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state)
56 {
57 set<string> keys;
58 map<string, bufferlist> vals;
59 keys.insert(job_name);
60 int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
61 if (r < 0) {
62 return r;
63 }
64
65 map<string, bufferlist>::iterator iter = vals.find(job_name);
66 if (iter == vals.end()) {
67 return -ENOENT;
68 }
69
70 try {
71 bufferlist& bl = iter->second;
72 ::decode(state, bl);
73 } catch (buffer::error& err) {
74 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
75 return -EIO;
76 }
77
78 return 0;
79 }
80
81 int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
82 {
83 map<string, bufferlist> vals;
84 bufferlist bl;
85 ::encode(state, bl);
86 vals[job_name] = bl;
87 int r = ioctx.omap_set(oid, vals);
88 if (r < 0) {
89 return r;
90 }
91
92 return 0;
93 }
94
95 int RGWOrphanStore::remove_job(const string& job_name)
96 {
97 set<string> keys;
98 keys.insert(job_name);
99
100 int r = ioctx.omap_rm_keys(oid, keys);
101 if (r < 0) {
102 return r;
103 }
104
105 return 0;
106 }
107
108 int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list)
109 {
110 map <string,bufferlist> vals;
111 int MAX_READ=1024;
112 string marker="";
113 int r = 0;
114
115 // loop through all the omap vals from index object, storing them to job_list,
116 // read in batches of 1024, we update the marker every iteration and exit the
117 // loop when we find that total size read out is less than batch size
118 do {
119 r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals);
120 if (r < 0) {
121 return r;
122 }
123 r = vals.size();
124
125 for (const auto &it : vals) {
126 marker=it.first;
127 RGWOrphanSearchState state;
128 try {
129 bufferlist bl = it.second;
130 ::decode(state, bl);
131 } catch (buffer::error& err) {
132 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
133 return -EIO;
134 }
135 job_list[it.first] = state;
136 }
137 } while (r == MAX_READ);
138
139 return 0;
140 }
141
142 int RGWOrphanStore::init()
143 {
144 rgw_pool& log_pool = store->get_zone_params().log_pool;
145 int r = rgw_init_ioctx(store->get_rados_handle(), log_pool, ioctx);
146 if (r < 0) {
147 cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl;
148 return r;
149 }
150
151 return 0;
152 }
153
154 int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
155 {
156 librados::ObjectWriteOperation op;
157 op.omap_set(entries);
158 cout << "storing " << entries.size() << " entries at " << oid << std::endl;
159 ldout(store->ctx(), 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl;
160 for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
161 ldout(store->ctx(), 20) << " > " << iter->first << dendl;
162 }
163 int ret = ioctx.operate(oid, &op);
164 if (ret < 0) {
165 lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl;
166 }
167
168 return 0;
169 }
170
171 int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
172 {
173 #define MAX_OMAP_GET 100
174 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
175 if (ret < 0 && ret != -ENOENT) {
176 cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl;
177 }
178
179 *truncated = (entries->size() == MAX_OMAP_GET);
180
181 return 0;
182 }
183
184 int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) {
185 int r = orphan_store.init();
186 if (r < 0) {
187 return r;
188 }
189
190 RGWOrphanSearchState state;
191 r = orphan_store.read_job(job_name, state);
192 if (r < 0 && r != -ENOENT) {
193 lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl;
194 return r;
195 }
196
197 if (r == 0) {
198 search_info = state.info;
199 search_stage = state.stage;
200 } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */
201 search_info = *info;
202 search_info.job_name = job_name;
203 search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
204 search_info.start_time = ceph_clock_now();
205 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
206
207 r = save_state();
208 if (r < 0) {
209 lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
210 return r;
211 }
212 } else {
213 lderr(store->ctx()) << "ERROR: job not found" << dendl;
214 return r;
215 }
216
217 index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string(".");
218 index_objs_prefix += job_name;
219
220 for (int i = 0; i < search_info.num_shards; i++) {
221 char buf[128];
222
223 snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i);
224 all_objs_index[i] = buf;
225
226 snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i);
227 buckets_instance_index[i] = buf;
228
229 snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i);
230 linked_objs_index[i] = buf;
231 }
232 return 0;
233 }
234
235 int RGWOrphanSearch::log_oids(map<int, string>& log_shards, map<int, list<string> >& oids)
236 {
237 map<int, list<string> >::iterator miter = oids.begin();
238
239 list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
240
241 for (; miter != oids.end(); ++miter) {
242 log_iter_info info;
243 info.oid = log_shards[miter->first];
244 info.cur = miter->second.begin();
245 info.end = miter->second.end();
246 liters.push_back(info);
247 }
248
249 list<log_iter_info>::iterator list_iter;
250 while (!liters.empty()) {
251 list_iter = liters.begin();
252
253 while (list_iter != liters.end()) {
254 log_iter_info& cur_info = *list_iter;
255
256 list<string>::iterator& cur = cur_info.cur;
257 list<string>::iterator& end = cur_info.end;
258
259 map<string, bufferlist> entries;
260 #define MAX_OMAP_SET_ENTRIES 100
261 for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
262 ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl;
263 entries[*cur] = bufferlist();
264 }
265
266 int ret = orphan_store.store_entries(cur_info.oid, entries);
267 if (ret < 0) {
268 return ret;
269 }
270 list<log_iter_info>::iterator tmp = list_iter;
271 ++list_iter;
272 if (cur == end) {
273 liters.erase(tmp);
274 }
275 }
276 }
277 return 0;
278 }
279
280 int RGWOrphanSearch::build_all_oids_index()
281 {
282 librados::IoCtx ioctx;
283
284 int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, ioctx);
285 if (ret < 0) {
286 lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
287 return ret;
288 }
289
290 ioctx.set_namespace(librados::all_nspaces);
291 librados::NObjectIterator i = ioctx.nobjects_begin();
292 librados::NObjectIterator i_end = ioctx.nobjects_end();
293
294 map<int, list<string> > oids;
295
296 int count = 0;
297 uint64_t total = 0;
298
299 cout << "logging all objects in the pool" << std::endl;
300
301 for (; i != i_end; ++i) {
302 string nspace = i->get_nspace();
303 string oid = i->get_oid();
304 string locator = i->get_locator();
305
306 ssize_t pos = oid.find('_');
307 if (pos < 0) {
308 cout << "unidentified oid: " << oid << ", skipping" << std::endl;
309 /* what is this object, oids should be in the format of <bucket marker>_<obj>,
310 * skip this entry
311 */
312 continue;
313 }
314 string stripped_oid = oid.substr(pos + 1);
315 rgw_obj_key key;
316 if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) {
317 cout << "cannot parse oid: " << oid << ", skipping" << std::endl;
318 continue;
319 }
320
321 if (key.ns.empty()) {
322 /* skipping head objects, we don't want to remove these as they are mutable and
323 * cleaning them up is racy (can race with object removal and a later recreation)
324 */
325 cout << "skipping head object: oid=" << oid << std::endl;
326 continue;
327 }
328
329 string oid_fp = obj_fingerprint(oid);
330
331 ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl;
332
333 int shard = orphan_shard(oid_fp);
334 oids[shard].push_back(oid);
335
336 #define COUNT_BEFORE_FLUSH 1000
337 ++total;
338 if (++count >= COUNT_BEFORE_FLUSH) {
339 ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl;
340 ret = log_oids(all_objs_index, oids);
341 if (ret < 0) {
342 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
343 return ret;
344 }
345 count = 0;
346 oids.clear();
347 }
348 }
349 ret = log_oids(all_objs_index, oids);
350 if (ret < 0) {
351 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
352 return ret;
353 }
354
355 return 0;
356 }
357
358 int RGWOrphanSearch::build_buckets_instance_index()
359 {
360 void *handle;
361 int max = 1000;
362 string section = "bucket.instance";
363 int ret = store->meta_mgr->list_keys_init(section, &handle);
364 if (ret < 0) {
365 lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
366 return -ret;
367 }
368
369 map<int, list<string> > instances;
370
371 bool truncated;
372
373 RGWObjectCtx obj_ctx(store);
374
375 int count = 0;
376 uint64_t total = 0;
377
378 do {
379 list<string> keys;
380 ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
381 if (ret < 0) {
382 lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
383 return -ret;
384 }
385
386 for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
387 ++total;
388 ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
389 int shard = orphan_shard(*iter);
390 instances[shard].push_back(*iter);
391
392 if (++count >= COUNT_BEFORE_FLUSH) {
393 ret = log_oids(buckets_instance_index, instances);
394 if (ret < 0) {
395 lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
396 return ret;
397 }
398 count = 0;
399 instances.clear();
400 }
401 }
402
403 } while (truncated);
404
405 ret = log_oids(buckets_instance_index, instances);
406 if (ret < 0) {
407 lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
408 return ret;
409 }
410 store->meta_mgr->list_keys_complete(handle);
411
412 return 0;
413 }
414
415 int RGWOrphanSearch::handle_stat_result(map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result)
416 {
417 set<string> obj_oids;
418 rgw_bucket& bucket = result.obj.bucket;
419 if (!result.has_manifest) { /* a very very old object, or part of a multipart upload during upload */
420 const string loc = bucket.bucket_id + "_" + result.obj.get_oid();
421 obj_oids.insert(obj_fingerprint(loc));
422
423 /*
424 * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the
425 * meta object, just add a "shadow" object to the mix
426 */
427 obj_oids.insert(obj_fingerprint(loc, "shadow"));
428 } else {
429 RGWObjManifest& manifest = result.manifest;
430
431 RGWObjManifest::obj_iterator miter;
432 for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
433 const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store);
434 string s = loc.oid;
435 obj_oids.insert(obj_fingerprint(s));
436 }
437 }
438
439 for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) {
440 ldout(store->ctx(), 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl;
441
442 int shard = orphan_shard(*iter);
443 oids[shard].push_back(*iter);
444 }
445
446 return 0;
447 }
448
449 int RGWOrphanSearch::pop_and_handle_stat_op(map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops)
450 {
451 RGWRados::Object::Stat& front_op = ops.front();
452
453 int ret = front_op.wait();
454 if (ret < 0) {
455 if (ret != -ENOENT) {
456 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
457 }
458 goto done;
459 }
460 ret = handle_stat_result(oids, front_op.result);
461 if (ret < 0) {
462 lderr(store->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl;
463 }
464 done:
465 ops.pop_front();
466 return ret;
467 }
468
469 int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids)
470 {
471 ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
472 RGWBucketInfo bucket_info;
473 RGWObjectCtx obj_ctx(store);
474 int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL);
475 if (ret < 0) {
476 if (ret == -ENOENT) {
477 /* probably raced with bucket removal */
478 return 0;
479 }
480 lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
481 return ret;
482 }
483
484 RGWRados::Bucket target(store, bucket_info);
485 RGWRados::Bucket::List list_op(&target);
486
487 string marker;
488 list_op.params.marker = rgw_obj_key(marker);
489 list_op.params.list_versions = true;
490 list_op.params.enforce_ns = false;
491
492 bool truncated;
493
494 deque<RGWRados::Object::Stat> stat_ops;
495
496 int count = 0;
497
498 do {
499 vector<rgw_bucket_dir_entry> result;
500
501 #define MAX_LIST_OBJS_ENTRIES 100
502 ret = list_op.list_objects(MAX_LIST_OBJS_ENTRIES, &result, NULL, &truncated);
503 if (ret < 0) {
504 cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
505 return -ret;
506 }
507
508 for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) {
509 rgw_bucket_dir_entry& entry = *iter;
510 if (entry.key.instance.empty()) {
511 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
512 } else {
513 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
514 }
515
516 ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl;
517 rgw_obj obj(bucket_info.bucket, entry.key);
518
519 RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
520
521 stat_ops.push_back(RGWRados::Object::Stat(&op_target));
522 RGWRados::Object::Stat& op = stat_ops.back();
523
524
525 ret = op.stat_async();
526 if (ret < 0) {
527 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
528 return ret;
529 }
530 if (stat_ops.size() >= max_concurrent_ios) {
531 ret = pop_and_handle_stat_op(oids, stat_ops);
532 if (ret < 0) {
533 if (ret != -ENOENT) {
534 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
535 }
536 }
537 }
538 if (++count >= COUNT_BEFORE_FLUSH) {
539 ret = log_oids(linked_objs_index, oids);
540 if (ret < 0) {
541 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
542 return ret;
543 }
544 count = 0;
545 oids.clear();
546 }
547 }
548 } while (truncated);
549
550 while (!stat_ops.empty()) {
551 ret = pop_and_handle_stat_op(oids, stat_ops);
552 if (ret < 0) {
553 if (ret != -ENOENT) {
554 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
555 }
556 }
557 }
558
559 return 0;
560 }
561
562 int RGWOrphanSearch::build_linked_oids_index()
563 {
564 map<int, list<string> > oids;
565 map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard);
566 for (; iter != buckets_instance_index.end(); ++iter) {
567 ldout(store->ctx(), 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl;
568 bool truncated;
569
570 string oid = iter->second;
571
572 do {
573 map<string, bufferlist> entries;
574 int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated);
575 if (ret == -ENOENT) {
576 truncated = false;
577 ret = 0;
578 }
579
580 if (ret < 0) {
581 lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
582 return ret;
583 }
584
585 if (entries.empty()) {
586 break;
587 }
588
589 for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
590 ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
591 ret = build_linked_oids_for_bucket(eiter->first, oids);
592 if (ret < 0) {
593 lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first
594 << " returned ret=" << ret << dendl;
595 return ret;
596 }
597 }
598
599 search_stage.shard = iter->first;
600 search_stage.marker = entries.rbegin()->first; /* last entry */
601 } while (truncated);
602
603 search_stage.marker.clear();
604 }
605
606 int ret = log_oids(linked_objs_index, oids);
607 if (ret < 0) {
608 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
609 return ret;
610 }
611
612 ret = save_state();
613 if (ret < 0) {
614 cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl;
615 return ret;
616 }
617
618 return 0;
619 }
620
621 class OMAPReader {
622 librados::IoCtx ioctx;
623 string oid;
624
625 map<string, bufferlist> entries;
626 map<string, bufferlist>::iterator iter;
627 string marker;
628 bool truncated;
629
630 public:
631 OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) {
632 iter = entries.end();
633 }
634
635 int get_next(string *key, bufferlist *pbl, bool *done);
636 };
637
638 int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done)
639 {
640 if (iter != entries.end()) {
641 *key = iter->first;
642 if (pbl) {
643 *pbl = iter->second;
644 }
645 ++iter;
646 *done = false;
647 marker = *key;
648 return 0;
649 }
650
651 if (!truncated) {
652 *done = true;
653 return 0;
654 }
655
656 #define MAX_OMAP_GET_ENTRIES 100
657 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries);
658 if (ret < 0) {
659 if (ret == -ENOENT) {
660 *done = true;
661 return 0;
662 }
663 return ret;
664 }
665
666 truncated = (entries.size() == MAX_OMAP_GET_ENTRIES);
667 iter = entries.begin();
668 return get_next(key, pbl, done);
669 }
670
671 int RGWOrphanSearch::compare_oid_indexes()
672 {
673 assert(linked_objs_index.size() == all_objs_index.size());
674
675 librados::IoCtx& ioctx = orphan_store.get_ioctx();
676
677 librados::IoCtx data_ioctx;
678
679 int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, data_ioctx);
680 if (ret < 0) {
681 lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
682 return ret;
683 }
684
685 uint64_t time_threshold = search_info.start_time.sec() - stale_secs;
686
687 map<int, string>::iterator liter = linked_objs_index.begin();
688 map<int, string>::iterator aiter = all_objs_index.begin();
689
690 for (; liter != linked_objs_index.end(); ++liter, ++aiter) {
691 OMAPReader linked_entries(ioctx, liter->second);
692 OMAPReader all_entries(ioctx, aiter->second);
693
694 bool done;
695
696 string cur_linked;
697 bool linked_done = false;
698
699
700 do {
701 string key;
702 int r = all_entries.get_next(&key, NULL, &done);
703 if (r < 0) {
704 return r;
705 }
706 if (done) {
707 break;
708 }
709
710 string key_fp = obj_fingerprint(key);
711
712 while (cur_linked < key_fp && !linked_done) {
713 r = linked_entries.get_next(&cur_linked, NULL, &linked_done);
714 if (r < 0) {
715 return r;
716 }
717 }
718
719 if (cur_linked == key_fp) {
720 ldout(store->ctx(), 20) << "linked: " << key << dendl;
721 continue;
722 }
723
724 time_t mtime;
725 r = data_ioctx.stat(key, NULL, &mtime);
726 if (r < 0) {
727 if (r != -ENOENT) {
728 lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl;
729 }
730 continue;
731 }
732 if (stale_secs && (uint64_t)mtime >= time_threshold) {
733 ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl;
734 continue;
735 }
736 ldout(store->ctx(), 20) << "leaked: " << key << dendl;
737 cout << "leaked: " << key << std::endl;
738 } while (!done);
739 }
740
741 return 0;
742 }
743
744 int RGWOrphanSearch::run()
745 {
746 int r;
747
748 switch (search_stage.stage) {
749
750 case ORPHAN_SEARCH_STAGE_INIT:
751 ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
752 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL);
753 r = save_state();
754 if (r < 0) {
755 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
756 return r;
757 }
758 // fall through
759 case ORPHAN_SEARCH_STAGE_LSPOOL:
760 ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl;
761 r = build_all_oids_index();
762 if (r < 0) {
763 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
764 return r;
765 }
766
767 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS);
768 r = save_state();
769 if (r < 0) {
770 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
771 return r;
772 }
773 // fall through
774
775 case ORPHAN_SEARCH_STAGE_LSBUCKETS:
776 ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl;
777 r = build_buckets_instance_index();
778 if (r < 0) {
779 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
780 return r;
781 }
782
783 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI);
784 r = save_state();
785 if (r < 0) {
786 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
787 return r;
788 }
789 // fall through
790
791
792 case ORPHAN_SEARCH_STAGE_ITERATE_BI:
793 ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
794 r = build_linked_oids_index();
795 if (r < 0) {
796 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
797 return r;
798 }
799
800 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE);
801 r = save_state();
802 if (r < 0) {
803 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
804 return r;
805 }
806 // fall through
807
808 case ORPHAN_SEARCH_STAGE_COMPARE:
809 r = compare_oid_indexes();
810 if (r < 0) {
811 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
812 return r;
813 }
814
815 break;
816
817 default:
818 ceph_abort();
819 };
820
821 return 0;
822 }
823
824
825 int RGWOrphanSearch::remove_index(map<int, string>& index)
826 {
827 librados::IoCtx& ioctx = orphan_store.get_ioctx();
828
829 for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) {
830 int r = ioctx.remove(iter->second);
831 if (r < 0) {
832 if (r != -ENOENT) {
833 ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl;
834 }
835 }
836 }
837 return 0;
838 }
839
840 int RGWOrphanSearch::finish()
841 {
842 int r = remove_index(all_objs_index);
843 if (r < 0) {
844 ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl;
845 }
846 r = remove_index(buckets_instance_index);
847 if (r < 0) {
848 ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl;
849 }
850 r = remove_index(linked_objs_index);
851 if (r < 0) {
852 ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl;
853 }
854
855 r = orphan_store.remove_job(search_info.job_name);
856 if (r < 0) {
857 ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl;
858 }
859
860 return r;
861 }