]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_orphan.cc
7a1174039cbdf3aac678f645cd85535bf4466ee2
[ceph.git] / ceph / src / rgw / rgw_orphan.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <string>
5
6
7 #include "common/config.h"
8 #include "common/Formatter.h"
9 #include "common/errno.h"
10
11 #include "rgw_rados.h"
12 #include "rgw_orphan.h"
13 #include "rgw_zone.h"
14 #include "rgw_bucket.h"
15
16 #include "services/svc_zone.h"
17 #include "services/svc_sys_obj.h"
18
19 #define dout_subsys ceph_subsys_rgw
20
21 #define DEFAULT_NUM_SHARDS 64
22
23 static string obj_fingerprint(const string& oid, const char *force_ns = NULL)
24 {
25 ssize_t pos = oid.find('_');
26 if (pos < 0) {
27 cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl;
28 }
29
30 string obj_marker = oid.substr(0, pos);
31
32 rgw_obj_key key;
33
34 rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key);
35
36 if (key.ns.empty()) {
37 return oid;
38 }
39
40 string s = oid;
41
42 if (force_ns) {
43 rgw_bucket b;
44 rgw_obj new_obj(b, key);
45 s = obj_marker + "_" + new_obj.get_oid();
46 }
47
48 /* cut out suffix */
49 size_t i = s.size() - 1;
50 for (; i >= s.size() - 10; --i) {
51 char c = s[i];
52 if (!isdigit(c) && c != '.' && c != '_') {
53 break;
54 }
55 }
56
57 return s.substr(0, i + 1);
58 }
59
60 int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state)
61 {
62 set<string> keys;
63 map<string, bufferlist> vals;
64 keys.insert(job_name);
65 int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals);
66 if (r < 0) {
67 return r;
68 }
69
70 map<string, bufferlist>::iterator iter = vals.find(job_name);
71 if (iter == vals.end()) {
72 return -ENOENT;
73 }
74
75 try {
76 bufferlist& bl = iter->second;
77 decode(state, bl);
78 } catch (buffer::error& err) {
79 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
80 return -EIO;
81 }
82
83 return 0;
84 }
85
86 int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state)
87 {
88 map<string, bufferlist> vals;
89 bufferlist bl;
90 encode(state, bl);
91 vals[job_name] = bl;
92 int r = ioctx.omap_set(oid, vals);
93 if (r < 0) {
94 return r;
95 }
96
97 return 0;
98 }
99
100 int RGWOrphanStore::remove_job(const string& job_name)
101 {
102 set<string> keys;
103 keys.insert(job_name);
104
105 int r = ioctx.omap_rm_keys(oid, keys);
106 if (r < 0) {
107 return r;
108 }
109
110 return 0;
111 }
112
113 int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list)
114 {
115 map <string,bufferlist> vals;
116 int MAX_READ=1024;
117 string marker="";
118 int r = 0;
119
120 // loop through all the omap vals from index object, storing them to job_list,
121 // read in batches of 1024, we update the marker every iteration and exit the
122 // loop when we find that total size read out is less than batch size
123 do {
124 r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals);
125 if (r < 0) {
126 return r;
127 }
128 r = vals.size();
129
130 for (const auto &it : vals) {
131 marker=it.first;
132 RGWOrphanSearchState state;
133 try {
134 bufferlist bl = it.second;
135 decode(state, bl);
136 } catch (buffer::error& err) {
137 lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl;
138 return -EIO;
139 }
140 job_list[it.first] = state;
141 }
142 } while (r == MAX_READ);
143
144 return 0;
145 }
146
147 int RGWOrphanStore::init()
148 {
149 const rgw_pool& log_pool = store->svc()->zone->get_zone_params().log_pool;
150 int r = rgw_init_ioctx(store->getRados()->get_rados_handle(), log_pool, ioctx);
151 if (r < 0) {
152 cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl;
153 return r;
154 }
155
156 return 0;
157 }
158
159 int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries)
160 {
161 librados::ObjectWriteOperation op;
162 op.omap_set(entries);
163 cout << "storing " << entries.size() << " entries at " << oid << std::endl;
164 ldout(store->ctx(), 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl;
165 for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) {
166 ldout(store->ctx(), 20) << " > " << iter->first << dendl;
167 }
168 int ret = rgw_rados_operate(ioctx, oid, &op, null_yield);
169 if (ret < 0) {
170 lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl;
171 }
172
173 return 0;
174 }
175
176 int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated)
177 {
178 #define MAX_OMAP_GET 100
179 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries);
180 if (ret < 0 && ret != -ENOENT) {
181 cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl;
182 }
183
184 *truncated = (entries->size() == MAX_OMAP_GET);
185
186 return 0;
187 }
188
189 int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info, bool _detailed_mode)
190 {
191 int r = orphan_store.init();
192 if (r < 0) {
193 return r;
194 }
195
196 constexpr int64_t MAX_LIST_OBJS_ENTRIES=100;
197
198 max_list_bucket_entries = std::max(store->ctx()->_conf->rgw_list_bucket_min_readahead,
199 MAX_LIST_OBJS_ENTRIES);
200
201 detailed_mode = _detailed_mode;
202 RGWOrphanSearchState state;
203 r = orphan_store.read_job(job_name, state);
204 if (r < 0 && r != -ENOENT) {
205 lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl;
206 return r;
207 }
208
209 if (r == 0) {
210 search_info = state.info;
211 search_stage = state.stage;
212 } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */
213 search_info = *info;
214 search_info.job_name = job_name;
215 search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS);
216 search_info.start_time = ceph_clock_now();
217 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT);
218
219 r = save_state();
220 if (r < 0) {
221 lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl;
222 return r;
223 }
224 } else {
225 lderr(store->ctx()) << "ERROR: job not found" << dendl;
226 return r;
227 }
228
229 index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string(".");
230 index_objs_prefix += job_name;
231
232 for (int i = 0; i < search_info.num_shards; i++) {
233 char buf[128];
234
235 snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i);
236 all_objs_index[i] = buf;
237
238 snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i);
239 buckets_instance_index[i] = buf;
240
241 snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i);
242 linked_objs_index[i] = buf;
243 }
244 return 0;
245 }
246
247 int RGWOrphanSearch::log_oids(map<int, string>& log_shards, map<int, list<string> >& oids)
248 {
249 map<int, list<string> >::iterator miter = oids.begin();
250
251 list<log_iter_info> liters; /* a list of iterator pairs for begin and end */
252
253 for (; miter != oids.end(); ++miter) {
254 log_iter_info info;
255 info.oid = log_shards[miter->first];
256 info.cur = miter->second.begin();
257 info.end = miter->second.end();
258 liters.push_back(info);
259 }
260
261 list<log_iter_info>::iterator list_iter;
262 while (!liters.empty()) {
263 list_iter = liters.begin();
264
265 while (list_iter != liters.end()) {
266 log_iter_info& cur_info = *list_iter;
267
268 list<string>::iterator& cur = cur_info.cur;
269 list<string>::iterator& end = cur_info.end;
270
271 map<string, bufferlist> entries;
272 #define MAX_OMAP_SET_ENTRIES 100
273 for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) {
274 ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl;
275 entries[*cur] = bufferlist();
276 }
277
278 int ret = orphan_store.store_entries(cur_info.oid, entries);
279 if (ret < 0) {
280 return ret;
281 }
282 list<log_iter_info>::iterator tmp = list_iter;
283 ++list_iter;
284 if (cur == end) {
285 liters.erase(tmp);
286 }
287 }
288 }
289 return 0;
290 }
291
292 int RGWOrphanSearch::build_all_oids_index()
293 {
294 librados::IoCtx ioctx;
295
296 int ret = rgw_init_ioctx(store->getRados()->get_rados_handle(), search_info.pool, ioctx);
297 if (ret < 0) {
298 lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
299 return ret;
300 }
301
302 ioctx.set_namespace(librados::all_nspaces);
303 librados::NObjectIterator i = ioctx.nobjects_begin();
304 librados::NObjectIterator i_end = ioctx.nobjects_end();
305
306 map<int, list<string> > oids;
307
308 int count = 0;
309 uint64_t total = 0;
310
311 cout << "logging all objects in the pool" << std::endl;
312
313 for (; i != i_end; ++i) {
314 string nspace = i->get_nspace();
315 string oid = i->get_oid();
316 string locator = i->get_locator();
317
318 ssize_t pos = oid.find('_');
319 if (pos < 0) {
320 cout << "unidentified oid: " << oid << ", skipping" << std::endl;
321 /* what is this object, oids should be in the format of <bucket marker>_<obj>,
322 * skip this entry
323 */
324 continue;
325 }
326 string stripped_oid = oid.substr(pos + 1);
327 rgw_obj_key key;
328 if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) {
329 cout << "cannot parse oid: " << oid << ", skipping" << std::endl;
330 continue;
331 }
332
333 if (key.ns.empty()) {
334 /* skipping head objects, we don't want to remove these as they are mutable and
335 * cleaning them up is racy (can race with object removal and a later recreation)
336 */
337 cout << "skipping head object: oid=" << oid << std::endl;
338 continue;
339 }
340
341 string oid_fp = obj_fingerprint(oid);
342
343 ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl;
344
345 int shard = orphan_shard(oid_fp);
346 oids[shard].push_back(oid);
347
348 #define COUNT_BEFORE_FLUSH 1000
349 ++total;
350 if (++count >= COUNT_BEFORE_FLUSH) {
351 ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl;
352 ret = log_oids(all_objs_index, oids);
353 if (ret < 0) {
354 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
355 return ret;
356 }
357 count = 0;
358 oids.clear();
359 }
360 }
361 ret = log_oids(all_objs_index, oids);
362 if (ret < 0) {
363 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
364 return ret;
365 }
366
367 return 0;
368 }
369
370 int RGWOrphanSearch::build_buckets_instance_index()
371 {
372 void *handle;
373 int max = 1000;
374 string section = "bucket.instance";
375 int ret = store->ctl()->meta.mgr->list_keys_init(section, &handle);
376 if (ret < 0) {
377 lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
378 return ret;
379 }
380
381 map<int, list<string> > instances;
382
383 bool truncated;
384
385 RGWObjectCtx obj_ctx(store);
386
387 int count = 0;
388 uint64_t total = 0;
389
390 do {
391 list<string> keys;
392 ret = store->ctl()->meta.mgr->list_keys_next(handle, max, keys, &truncated);
393 if (ret < 0) {
394 lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
395 return ret;
396 }
397
398 for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
399 ++total;
400 ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl;
401 int shard = orphan_shard(*iter);
402 instances[shard].push_back(*iter);
403
404 if (++count >= COUNT_BEFORE_FLUSH) {
405 ret = log_oids(buckets_instance_index, instances);
406 if (ret < 0) {
407 lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
408 return ret;
409 }
410 count = 0;
411 instances.clear();
412 }
413 }
414
415 } while (truncated);
416
417 ret = log_oids(buckets_instance_index, instances);
418 if (ret < 0) {
419 lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
420 return ret;
421 }
422 store->ctl()->meta.mgr->list_keys_complete(handle);
423
424 return 0;
425 }
426
427 int RGWOrphanSearch::handle_stat_result(map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result)
428 {
429 set<string> obj_oids;
430 rgw_bucket& bucket = result.obj.bucket;
431 if (!result.manifest) { /* a very very old object, or part of a multipart upload during upload */
432 const string loc = bucket.bucket_id + "_" + result.obj.get_oid();
433 obj_oids.insert(obj_fingerprint(loc));
434
435 /*
436 * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the
437 * meta object, just add a "shadow" object to the mix
438 */
439 obj_oids.insert(obj_fingerprint(loc, "shadow"));
440 } else {
441 RGWObjManifest& manifest = *result.manifest;
442
443 if (!detailed_mode &&
444 manifest.get_obj_size() <= manifest.get_head_size()) {
445 ldout(store->ctx(), 5) << "skipping object as it fits in a head" << dendl;
446 return 0;
447 }
448
449 RGWObjManifest::obj_iterator miter;
450 for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) {
451 const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store->getRados());
452 string s = loc.oid;
453 obj_oids.insert(obj_fingerprint(s));
454 }
455 }
456
457 for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) {
458 ldout(store->ctx(), 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl;
459
460 int shard = orphan_shard(*iter);
461 oids[shard].push_back(*iter);
462 }
463
464 return 0;
465 }
466
467 int RGWOrphanSearch::pop_and_handle_stat_op(map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops)
468 {
469 RGWRados::Object::Stat& front_op = ops.front();
470
471 int ret = front_op.wait();
472 if (ret < 0) {
473 if (ret != -ENOENT) {
474 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
475 }
476 goto done;
477 }
478 ret = handle_stat_result(oids, front_op.result);
479 if (ret < 0) {
480 lderr(store->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl;
481 }
482 done:
483 ops.pop_front();
484 return ret;
485 }
486
487 int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids)
488 {
489 RGWObjectCtx obj_ctx(store);
490 auto sysobj_ctx = store->svc()->sysobj->init_obj_ctx();
491
492 rgw_bucket orphan_bucket;
493 int shard_id;
494 int ret = rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance_id,
495 &orphan_bucket, &shard_id);
496 if (ret < 0) {
497 ldout(store->ctx(),0) << __func__ << " failed to parse bucket instance: "
498 << bucket_instance_id << " skipping" << dendl;
499 return ret;
500 }
501
502 RGWBucketInfo cur_bucket_info;
503 ret = store->getRados()->get_bucket_info(store->svc(), orphan_bucket.tenant,
504 orphan_bucket.name, cur_bucket_info, nullptr, null_yield);
505 if (ret < 0) {
506 if (ret == -ENOENT) {
507 /* probably raced with bucket removal */
508 return 0;
509 }
510 lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
511 return ret;
512 }
513
514 if (cur_bucket_info.bucket.bucket_id != orphan_bucket.bucket_id) {
515 ldout(store->ctx(), 0) << __func__ << ": Skipping stale bucket instance: "
516 << orphan_bucket.name << ": "
517 << orphan_bucket.bucket_id << dendl;
518 return 0;
519 }
520
521 if (cur_bucket_info.reshard_status == cls_rgw_reshard_status::IN_PROGRESS) {
522 ldout(store->ctx(), 0) << __func__ << ": reshard in progress. Skipping "
523 << orphan_bucket.name << ": "
524 << orphan_bucket.bucket_id << dendl;
525 return 0;
526 }
527
528 RGWBucketInfo bucket_info;
529 ret = store->getRados()->get_bucket_instance_info(sysobj_ctx, bucket_instance_id, bucket_info, nullptr, nullptr, null_yield);
530 if (ret < 0) {
531 if (ret == -ENOENT) {
532 /* probably raced with bucket removal */
533 return 0;
534 }
535 lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl;
536 return ret;
537 }
538
539 ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl;
540 RGWRados::Bucket target(store->getRados(), bucket_info);
541 RGWRados::Bucket::List list_op(&target);
542
543 string marker;
544 list_op.params.marker = rgw_obj_key(marker);
545 list_op.params.list_versions = true;
546 list_op.params.enforce_ns = false;
547
548 bool truncated;
549
550 deque<RGWRados::Object::Stat> stat_ops;
551
552 do {
553 vector<rgw_bucket_dir_entry> result;
554
555 ret = list_op.list_objects(max_list_bucket_entries,
556 &result, nullptr, &truncated, null_yield);
557 if (ret < 0) {
558 cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl;
559 return ret;
560 }
561
562 for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) {
563 rgw_bucket_dir_entry& entry = *iter;
564 if (entry.key.instance.empty()) {
565 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl;
566 } else {
567 ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl;
568 }
569
570 ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl;
571
572 if (!detailed_mode &&
573 entry.meta.accounted_size <= (uint64_t)store->ctx()->_conf->rgw_max_chunk_size) {
574 ldout(store->ctx(),5) << __func__ << "skipping stat as the object " << entry.key.name
575 << "fits in a head" << dendl;
576 continue;
577 }
578
579 rgw_obj obj(bucket_info.bucket, entry.key);
580
581 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, obj);
582
583 stat_ops.push_back(RGWRados::Object::Stat(&op_target));
584 RGWRados::Object::Stat& op = stat_ops.back();
585
586
587 ret = op.stat_async();
588 if (ret < 0) {
589 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
590 return ret;
591 }
592 if (stat_ops.size() >= max_concurrent_ios) {
593 ret = pop_and_handle_stat_op(oids, stat_ops);
594 if (ret < 0) {
595 if (ret != -ENOENT) {
596 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
597 }
598 }
599 }
600 if (oids.size() >= COUNT_BEFORE_FLUSH) {
601 ret = log_oids(linked_objs_index, oids);
602 if (ret < 0) {
603 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
604 return ret;
605 }
606 oids.clear();
607 }
608 }
609 } while (truncated);
610
611 while (!stat_ops.empty()) {
612 ret = pop_and_handle_stat_op(oids, stat_ops);
613 if (ret < 0) {
614 if (ret != -ENOENT) {
615 lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl;
616 }
617 }
618 }
619
620 return 0;
621 }
622
623 int RGWOrphanSearch::build_linked_oids_index()
624 {
625 map<int, list<string> > oids;
626 map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard);
627 for (; iter != buckets_instance_index.end(); ++iter) {
628 ldout(store->ctx(), 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl;
629 bool truncated;
630
631 string oid = iter->second;
632
633 do {
634 map<string, bufferlist> entries;
635 int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated);
636 if (ret == -ENOENT) {
637 truncated = false;
638 ret = 0;
639 }
640
641 if (ret < 0) {
642 lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl;
643 return ret;
644 }
645
646 if (entries.empty()) {
647 break;
648 }
649
650 for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) {
651 ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl;
652 ret = build_linked_oids_for_bucket(eiter->first, oids);
653 if (ret < 0) {
654 lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first
655 << " returned ret=" << ret << dendl;
656 return ret;
657 }
658 }
659
660 search_stage.shard = iter->first;
661 search_stage.marker = entries.rbegin()->first; /* last entry */
662 } while (truncated);
663
664 search_stage.marker.clear();
665 }
666
667 int ret = log_oids(linked_objs_index, oids);
668 if (ret < 0) {
669 cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl;
670 return ret;
671 }
672
673 ret = save_state();
674 if (ret < 0) {
675 cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl;
676 return ret;
677 }
678
679 return 0;
680 }
681
682 class OMAPReader {
683 librados::IoCtx ioctx;
684 string oid;
685
686 map<string, bufferlist> entries;
687 map<string, bufferlist>::iterator iter;
688 string marker;
689 bool truncated;
690
691 public:
692 OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) {
693 iter = entries.end();
694 }
695
696 int get_next(string *key, bufferlist *pbl, bool *done);
697 };
698
699 int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done)
700 {
701 if (iter != entries.end()) {
702 *key = iter->first;
703 if (pbl) {
704 *pbl = iter->second;
705 }
706 ++iter;
707 *done = false;
708 marker = *key;
709 return 0;
710 }
711
712 if (!truncated) {
713 *done = true;
714 return 0;
715 }
716
717 #define MAX_OMAP_GET_ENTRIES 100
718 int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries);
719 if (ret < 0) {
720 if (ret == -ENOENT) {
721 *done = true;
722 return 0;
723 }
724 return ret;
725 }
726
727 truncated = (entries.size() == MAX_OMAP_GET_ENTRIES);
728 iter = entries.begin();
729 return get_next(key, pbl, done);
730 }
731
732 int RGWOrphanSearch::compare_oid_indexes()
733 {
734 ceph_assert(linked_objs_index.size() == all_objs_index.size());
735
736 librados::IoCtx& ioctx = orphan_store.get_ioctx();
737
738 librados::IoCtx data_ioctx;
739
740 int ret = rgw_init_ioctx(store->getRados()->get_rados_handle(), search_info.pool, data_ioctx);
741 if (ret < 0) {
742 lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl;
743 return ret;
744 }
745
746 uint64_t time_threshold = search_info.start_time.sec() - stale_secs;
747
748 map<int, string>::iterator liter = linked_objs_index.begin();
749 map<int, string>::iterator aiter = all_objs_index.begin();
750
751 for (; liter != linked_objs_index.end(); ++liter, ++aiter) {
752 OMAPReader linked_entries(ioctx, liter->second);
753 OMAPReader all_entries(ioctx, aiter->second);
754
755 bool done;
756
757 string cur_linked;
758 bool linked_done = false;
759
760
761 do {
762 string key;
763 int r = all_entries.get_next(&key, NULL, &done);
764 if (r < 0) {
765 return r;
766 }
767 if (done) {
768 break;
769 }
770
771 string key_fp = obj_fingerprint(key);
772
773 while (cur_linked < key_fp && !linked_done) {
774 r = linked_entries.get_next(&cur_linked, NULL, &linked_done);
775 if (r < 0) {
776 return r;
777 }
778 }
779
780 if (cur_linked == key_fp) {
781 ldout(store->ctx(), 20) << "linked: " << key << dendl;
782 continue;
783 }
784
785 time_t mtime;
786 r = data_ioctx.stat(key, NULL, &mtime);
787 if (r < 0) {
788 if (r != -ENOENT) {
789 lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl;
790 }
791 continue;
792 }
793 if (stale_secs && (uint64_t)mtime >= time_threshold) {
794 ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl;
795 continue;
796 }
797 ldout(store->ctx(), 20) << "leaked: " << key << dendl;
798 cout << "leaked: " << key << std::endl;
799 } while (!done);
800 }
801
802 return 0;
803 }
804
805 int RGWOrphanSearch::run()
806 {
807 int r;
808
809 switch (search_stage.stage) {
810
811 case ORPHAN_SEARCH_STAGE_INIT:
812 ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl;
813 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL);
814 r = save_state();
815 if (r < 0) {
816 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
817 return r;
818 }
819 // fall through
820 case ORPHAN_SEARCH_STAGE_LSPOOL:
821 ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl;
822 r = build_all_oids_index();
823 if (r < 0) {
824 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
825 return r;
826 }
827
828 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS);
829 r = save_state();
830 if (r < 0) {
831 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
832 return r;
833 }
834 // fall through
835
836 case ORPHAN_SEARCH_STAGE_LSBUCKETS:
837 ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl;
838 r = build_buckets_instance_index();
839 if (r < 0) {
840 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
841 return r;
842 }
843
844 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI);
845 r = save_state();
846 if (r < 0) {
847 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
848 return r;
849 }
850 // fall through
851
852
853 case ORPHAN_SEARCH_STAGE_ITERATE_BI:
854 ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl;
855 r = build_linked_oids_index();
856 if (r < 0) {
857 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
858 return r;
859 }
860
861 search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE);
862 r = save_state();
863 if (r < 0) {
864 lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl;
865 return r;
866 }
867 // fall through
868
869 case ORPHAN_SEARCH_STAGE_COMPARE:
870 r = compare_oid_indexes();
871 if (r < 0) {
872 lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl;
873 return r;
874 }
875
876 break;
877
878 default:
879 ceph_abort();
880 };
881
882 return 0;
883 }
884
885
886 int RGWOrphanSearch::remove_index(map<int, string>& index)
887 {
888 librados::IoCtx& ioctx = orphan_store.get_ioctx();
889
890 for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) {
891 int r = ioctx.remove(iter->second);
892 if (r < 0) {
893 if (r != -ENOENT) {
894 ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl;
895 }
896 }
897 }
898 return 0;
899 }
900
901 int RGWOrphanSearch::finish()
902 {
903 int r = remove_index(all_objs_index);
904 if (r < 0) {
905 ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl;
906 }
907 r = remove_index(buckets_instance_index);
908 if (r < 0) {
909 ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl;
910 }
911 r = remove_index(linked_objs_index);
912 if (r < 0) {
913 ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl;
914 }
915
916 r = orphan_store.remove_job(search_info.job_name);
917 if (r < 0) {
918 ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl;
919 }
920
921 return r;
922 }