]>
Commit | Line | Data |
---|---|---|
31f18b77 FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
7c673cae FG |
3 | |
4 | #include <string> | |
5 | ||
6 | using namespace std; | |
7 | ||
8 | #include "common/config.h" | |
9 | #include "common/Formatter.h" | |
10 | #include "common/errno.h" | |
11 | ||
12 | #include "rgw_rados.h" | |
13 | #include "rgw_orphan.h" | |
14 | ||
15 | #define dout_subsys ceph_subsys_rgw | |
16 | ||
17 | #define DEFAULT_NUM_SHARDS 64 | |
18 | ||
19 | static string obj_fingerprint(const string& oid, const char *force_ns = NULL) | |
20 | { | |
21 | ssize_t pos = oid.find('_'); | |
22 | if (pos < 0) { | |
23 | cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl; | |
24 | } | |
25 | ||
26 | string obj_marker = oid.substr(0, pos); | |
27 | ||
28 | rgw_obj_key key; | |
29 | ||
30 | rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key); | |
31 | ||
32 | if (key.ns.empty()) { | |
33 | return oid; | |
34 | } | |
35 | ||
36 | string s = oid; | |
37 | ||
38 | if (force_ns) { | |
39 | rgw_bucket b; | |
40 | rgw_obj new_obj(b, key); | |
41 | s = obj_marker + "_" + new_obj.get_oid(); | |
42 | } | |
43 | ||
44 | /* cut out suffix */ | |
45 | size_t i = s.size() - 1; | |
46 | for (; i >= s.size() - 10; --i) { | |
47 | char c = s[i]; | |
48 | if (!isdigit(c) && c != '.' && c != '_') { | |
49 | break; | |
50 | } | |
51 | } | |
52 | ||
53 | return s.substr(0, i + 1); | |
54 | } | |
55 | ||
56 | int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state) | |
57 | { | |
58 | set<string> keys; | |
59 | map<string, bufferlist> vals; | |
60 | keys.insert(job_name); | |
61 | int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals); | |
62 | if (r < 0) { | |
63 | return r; | |
64 | } | |
65 | ||
66 | map<string, bufferlist>::iterator iter = vals.find(job_name); | |
67 | if (iter == vals.end()) { | |
68 | return -ENOENT; | |
69 | } | |
70 | ||
71 | try { | |
72 | bufferlist& bl = iter->second; | |
73 | ::decode(state, bl); | |
74 | } catch (buffer::error& err) { | |
75 | lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl; | |
76 | return -EIO; | |
77 | } | |
78 | ||
79 | return 0; | |
80 | } | |
81 | ||
82 | int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state) | |
83 | { | |
84 | map<string, bufferlist> vals; | |
85 | bufferlist bl; | |
86 | ::encode(state, bl); | |
87 | vals[job_name] = bl; | |
88 | int r = ioctx.omap_set(oid, vals); | |
89 | if (r < 0) { | |
90 | return r; | |
91 | } | |
92 | ||
93 | return 0; | |
94 | } | |
95 | ||
96 | int RGWOrphanStore::remove_job(const string& job_name) | |
97 | { | |
98 | set<string> keys; | |
99 | keys.insert(job_name); | |
100 | ||
101 | int r = ioctx.omap_rm_keys(oid, keys); | |
102 | if (r < 0) { | |
103 | return r; | |
104 | } | |
105 | ||
106 | return 0; | |
107 | } | |
108 | ||
109 | int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list) | |
110 | { | |
111 | map <string,bufferlist> vals; | |
112 | int MAX_READ=1024; | |
113 | string marker=""; | |
114 | int r = 0; | |
115 | ||
116 | // loop through all the omap vals from index object, storing them to job_list, | |
117 | // read in batches of 1024, we update the marker every iteration and exit the | |
118 | // loop when we find that total size read out is less than batch size | |
119 | do { | |
120 | r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals); | |
121 | if (r < 0) { | |
122 | return r; | |
123 | } | |
124 | r = vals.size(); | |
125 | ||
126 | for (const auto &it : vals) { | |
127 | marker=it.first; | |
128 | RGWOrphanSearchState state; | |
129 | try { | |
130 | bufferlist bl = it.second; | |
131 | ::decode(state, bl); | |
132 | } catch (buffer::error& err) { | |
133 | lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl; | |
134 | return -EIO; | |
135 | } | |
136 | job_list[it.first] = state; | |
137 | } | |
138 | } while (r == MAX_READ); | |
139 | ||
140 | return 0; | |
141 | } | |
142 | ||
143 | int RGWOrphanStore::init() | |
144 | { | |
145 | rgw_pool& log_pool = store->get_zone_params().log_pool; | |
146 | int r = rgw_init_ioctx(store->get_rados_handle(), log_pool, ioctx); | |
147 | if (r < 0) { | |
148 | cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl; | |
149 | return r; | |
150 | } | |
151 | ||
152 | return 0; | |
153 | } | |
154 | ||
155 | int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries) | |
156 | { | |
157 | librados::ObjectWriteOperation op; | |
158 | op.omap_set(entries); | |
159 | cout << "storing " << entries.size() << " entries at " << oid << std::endl; | |
160 | ldout(store->ctx(), 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl; | |
161 | for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) { | |
162 | ldout(store->ctx(), 20) << " > " << iter->first << dendl; | |
163 | } | |
164 | int ret = ioctx.operate(oid, &op); | |
165 | if (ret < 0) { | |
166 | lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl; | |
167 | } | |
168 | ||
169 | return 0; | |
170 | } | |
171 | ||
172 | int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated) | |
173 | { | |
174 | #define MAX_OMAP_GET 100 | |
175 | int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries); | |
176 | if (ret < 0 && ret != -ENOENT) { | |
177 | cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl; | |
178 | } | |
179 | ||
180 | *truncated = (entries->size() == MAX_OMAP_GET); | |
181 | ||
182 | return 0; | |
183 | } | |
184 | ||
185 | int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info) { | |
186 | int r = orphan_store.init(); | |
187 | if (r < 0) { | |
188 | return r; | |
189 | } | |
190 | ||
191 | RGWOrphanSearchState state; | |
192 | r = orphan_store.read_job(job_name, state); | |
193 | if (r < 0 && r != -ENOENT) { | |
194 | lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl; | |
195 | return r; | |
196 | } | |
197 | ||
198 | if (r == 0) { | |
199 | search_info = state.info; | |
200 | search_stage = state.stage; | |
201 | } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */ | |
202 | search_info = *info; | |
203 | search_info.job_name = job_name; | |
204 | search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS); | |
205 | search_info.start_time = ceph_clock_now(); | |
206 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT); | |
207 | ||
208 | r = save_state(); | |
209 | if (r < 0) { | |
210 | lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl; | |
211 | return r; | |
212 | } | |
213 | } else { | |
214 | lderr(store->ctx()) << "ERROR: job not found" << dendl; | |
215 | return r; | |
216 | } | |
217 | ||
218 | index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string("."); | |
219 | index_objs_prefix += job_name; | |
220 | ||
221 | for (int i = 0; i < search_info.num_shards; i++) { | |
222 | char buf[128]; | |
223 | ||
224 | snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i); | |
225 | all_objs_index[i] = buf; | |
226 | ||
227 | snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i); | |
228 | buckets_instance_index[i] = buf; | |
229 | ||
230 | snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i); | |
231 | linked_objs_index[i] = buf; | |
232 | } | |
233 | return 0; | |
234 | } | |
235 | ||
236 | int RGWOrphanSearch::log_oids(map<int, string>& log_shards, map<int, list<string> >& oids) | |
237 | { | |
238 | map<int, list<string> >::iterator miter = oids.begin(); | |
239 | ||
240 | list<log_iter_info> liters; /* a list of iterator pairs for begin and end */ | |
241 | ||
242 | for (; miter != oids.end(); ++miter) { | |
243 | log_iter_info info; | |
244 | info.oid = log_shards[miter->first]; | |
245 | info.cur = miter->second.begin(); | |
246 | info.end = miter->second.end(); | |
247 | liters.push_back(info); | |
248 | } | |
249 | ||
250 | list<log_iter_info>::iterator list_iter; | |
251 | while (!liters.empty()) { | |
252 | list_iter = liters.begin(); | |
253 | ||
254 | while (list_iter != liters.end()) { | |
255 | log_iter_info& cur_info = *list_iter; | |
256 | ||
257 | list<string>::iterator& cur = cur_info.cur; | |
258 | list<string>::iterator& end = cur_info.end; | |
259 | ||
260 | map<string, bufferlist> entries; | |
261 | #define MAX_OMAP_SET_ENTRIES 100 | |
262 | for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) { | |
263 | ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl; | |
264 | entries[*cur] = bufferlist(); | |
265 | } | |
266 | ||
267 | int ret = orphan_store.store_entries(cur_info.oid, entries); | |
268 | if (ret < 0) { | |
269 | return ret; | |
270 | } | |
271 | list<log_iter_info>::iterator tmp = list_iter; | |
272 | ++list_iter; | |
273 | if (cur == end) { | |
274 | liters.erase(tmp); | |
275 | } | |
276 | } | |
277 | } | |
278 | return 0; | |
279 | } | |
280 | ||
281 | int RGWOrphanSearch::build_all_oids_index() | |
282 | { | |
283 | librados::IoCtx ioctx; | |
284 | ||
285 | int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, ioctx); | |
286 | if (ret < 0) { | |
287 | lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl; | |
288 | return ret; | |
289 | } | |
290 | ||
291 | ioctx.set_namespace(librados::all_nspaces); | |
292 | librados::NObjectIterator i = ioctx.nobjects_begin(); | |
293 | librados::NObjectIterator i_end = ioctx.nobjects_end(); | |
294 | ||
295 | map<int, list<string> > oids; | |
296 | ||
297 | int count = 0; | |
298 | uint64_t total = 0; | |
299 | ||
300 | cout << "logging all objects in the pool" << std::endl; | |
301 | ||
302 | for (; i != i_end; ++i) { | |
303 | string nspace = i->get_nspace(); | |
304 | string oid = i->get_oid(); | |
305 | string locator = i->get_locator(); | |
306 | ||
307 | ssize_t pos = oid.find('_'); | |
308 | if (pos < 0) { | |
309 | cout << "unidentified oid: " << oid << ", skipping" << std::endl; | |
310 | /* what is this object, oids should be in the format of <bucket marker>_<obj>, | |
311 | * skip this entry | |
312 | */ | |
313 | continue; | |
314 | } | |
315 | string stripped_oid = oid.substr(pos + 1); | |
316 | rgw_obj_key key; | |
317 | if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) { | |
318 | cout << "cannot parse oid: " << oid << ", skipping" << std::endl; | |
319 | continue; | |
320 | } | |
321 | ||
322 | if (key.ns.empty()) { | |
323 | /* skipping head objects, we don't want to remove these as they are mutable and | |
324 | * cleaning them up is racy (can race with object removal and a later recreation) | |
325 | */ | |
326 | cout << "skipping head object: oid=" << oid << std::endl; | |
327 | continue; | |
328 | } | |
329 | ||
330 | string oid_fp = obj_fingerprint(oid); | |
331 | ||
332 | ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl; | |
333 | ||
334 | int shard = orphan_shard(oid_fp); | |
335 | oids[shard].push_back(oid); | |
336 | ||
337 | #define COUNT_BEFORE_FLUSH 1000 | |
338 | ++total; | |
339 | if (++count >= COUNT_BEFORE_FLUSH) { | |
340 | ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl; | |
341 | ret = log_oids(all_objs_index, oids); | |
342 | if (ret < 0) { | |
343 | cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl; | |
344 | return ret; | |
345 | } | |
346 | count = 0; | |
347 | oids.clear(); | |
348 | } | |
349 | } | |
350 | ret = log_oids(all_objs_index, oids); | |
351 | if (ret < 0) { | |
352 | cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl; | |
353 | return ret; | |
354 | } | |
355 | ||
356 | return 0; | |
357 | } | |
358 | ||
359 | int RGWOrphanSearch::build_buckets_instance_index() | |
360 | { | |
361 | void *handle; | |
362 | int max = 1000; | |
363 | string section = "bucket.instance"; | |
364 | int ret = store->meta_mgr->list_keys_init(section, &handle); | |
365 | if (ret < 0) { | |
366 | lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl; | |
367 | return -ret; | |
368 | } | |
369 | ||
370 | map<int, list<string> > instances; | |
371 | ||
372 | bool truncated; | |
373 | ||
374 | RGWObjectCtx obj_ctx(store); | |
375 | ||
376 | int count = 0; | |
377 | uint64_t total = 0; | |
378 | ||
379 | do { | |
380 | list<string> keys; | |
381 | ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated); | |
382 | if (ret < 0) { | |
383 | lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl; | |
384 | return -ret; | |
385 | } | |
386 | ||
387 | for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) { | |
388 | ++total; | |
389 | ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl; | |
390 | int shard = orphan_shard(*iter); | |
391 | instances[shard].push_back(*iter); | |
392 | ||
393 | if (++count >= COUNT_BEFORE_FLUSH) { | |
394 | ret = log_oids(buckets_instance_index, instances); | |
395 | if (ret < 0) { | |
396 | lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl; | |
397 | return ret; | |
398 | } | |
399 | count = 0; | |
400 | instances.clear(); | |
401 | } | |
402 | } | |
403 | ||
404 | } while (truncated); | |
405 | ||
406 | ret = log_oids(buckets_instance_index, instances); | |
407 | if (ret < 0) { | |
408 | lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl; | |
409 | return ret; | |
410 | } | |
411 | store->meta_mgr->list_keys_complete(handle); | |
412 | ||
413 | return 0; | |
414 | } | |
415 | ||
416 | int RGWOrphanSearch::handle_stat_result(map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result) | |
417 | { | |
418 | set<string> obj_oids; | |
419 | rgw_bucket& bucket = result.obj.bucket; | |
420 | if (!result.has_manifest) { /* a very very old object, or part of a multipart upload during upload */ | |
421 | const string loc = bucket.bucket_id + "_" + result.obj.get_oid(); | |
422 | obj_oids.insert(obj_fingerprint(loc)); | |
423 | ||
424 | /* | |
425 | * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the | |
426 | * meta object, just add a "shadow" object to the mix | |
427 | */ | |
428 | obj_oids.insert(obj_fingerprint(loc, "shadow")); | |
429 | } else { | |
430 | RGWObjManifest& manifest = result.manifest; | |
431 | ||
432 | RGWObjManifest::obj_iterator miter; | |
433 | for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) { | |
434 | const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store); | |
435 | string s = loc.oid; | |
436 | obj_oids.insert(obj_fingerprint(s)); | |
437 | } | |
438 | } | |
439 | ||
440 | for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) { | |
441 | ldout(store->ctx(), 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl; | |
442 | ||
443 | int shard = orphan_shard(*iter); | |
444 | oids[shard].push_back(*iter); | |
445 | } | |
446 | ||
447 | return 0; | |
448 | } | |
449 | ||
450 | int RGWOrphanSearch::pop_and_handle_stat_op(map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops) | |
451 | { | |
452 | RGWRados::Object::Stat& front_op = ops.front(); | |
453 | ||
454 | int ret = front_op.wait(); | |
455 | if (ret < 0) { | |
456 | if (ret != -ENOENT) { | |
457 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl; | |
458 | } | |
459 | goto done; | |
460 | } | |
461 | ret = handle_stat_result(oids, front_op.result); | |
462 | if (ret < 0) { | |
463 | lderr(store->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl; | |
464 | } | |
465 | done: | |
466 | ops.pop_front(); | |
467 | return ret; | |
468 | } | |
469 | ||
470 | int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids) | |
471 | { | |
472 | ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl; | |
473 | RGWBucketInfo bucket_info; | |
474 | RGWObjectCtx obj_ctx(store); | |
475 | int ret = store->get_bucket_instance_info(obj_ctx, bucket_instance_id, bucket_info, NULL, NULL); | |
476 | if (ret < 0) { | |
477 | if (ret == -ENOENT) { | |
478 | /* probably raced with bucket removal */ | |
479 | return 0; | |
480 | } | |
481 | lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl; | |
482 | return ret; | |
483 | } | |
484 | ||
485 | RGWRados::Bucket target(store, bucket_info); | |
486 | RGWRados::Bucket::List list_op(&target); | |
487 | ||
488 | string marker; | |
489 | list_op.params.marker = rgw_obj_key(marker); | |
490 | list_op.params.list_versions = true; | |
491 | list_op.params.enforce_ns = false; | |
492 | ||
493 | bool truncated; | |
494 | ||
495 | deque<RGWRados::Object::Stat> stat_ops; | |
496 | ||
497 | int count = 0; | |
498 | ||
499 | do { | |
500 | vector<rgw_bucket_dir_entry> result; | |
501 | ||
502 | #define MAX_LIST_OBJS_ENTRIES 100 | |
503 | ret = list_op.list_objects(MAX_LIST_OBJS_ENTRIES, &result, NULL, &truncated); | |
504 | if (ret < 0) { | |
505 | cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl; | |
506 | return -ret; | |
507 | } | |
508 | ||
509 | for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) { | |
510 | rgw_bucket_dir_entry& entry = *iter; | |
511 | if (entry.key.instance.empty()) { | |
512 | ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl; | |
513 | } else { | |
514 | ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl; | |
515 | } | |
516 | ||
517 | ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl; | |
518 | rgw_obj obj(bucket_info.bucket, entry.key); | |
519 | ||
520 | RGWRados::Object op_target(store, bucket_info, obj_ctx, obj); | |
521 | ||
522 | stat_ops.push_back(RGWRados::Object::Stat(&op_target)); | |
523 | RGWRados::Object::Stat& op = stat_ops.back(); | |
524 | ||
525 | ||
526 | ret = op.stat_async(); | |
527 | if (ret < 0) { | |
528 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl; | |
529 | return ret; | |
530 | } | |
531 | if (stat_ops.size() >= max_concurrent_ios) { | |
532 | ret = pop_and_handle_stat_op(oids, stat_ops); | |
533 | if (ret < 0) { | |
534 | if (ret != -ENOENT) { | |
535 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl; | |
536 | } | |
537 | } | |
538 | } | |
539 | if (++count >= COUNT_BEFORE_FLUSH) { | |
540 | ret = log_oids(linked_objs_index, oids); | |
541 | if (ret < 0) { | |
542 | cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl; | |
543 | return ret; | |
544 | } | |
545 | count = 0; | |
546 | oids.clear(); | |
547 | } | |
548 | } | |
549 | } while (truncated); | |
550 | ||
551 | while (!stat_ops.empty()) { | |
552 | ret = pop_and_handle_stat_op(oids, stat_ops); | |
553 | if (ret < 0) { | |
554 | if (ret != -ENOENT) { | |
555 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl; | |
556 | } | |
557 | } | |
558 | } | |
559 | ||
560 | return 0; | |
561 | } | |
562 | ||
563 | int RGWOrphanSearch::build_linked_oids_index() | |
564 | { | |
565 | map<int, list<string> > oids; | |
566 | map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard); | |
567 | for (; iter != buckets_instance_index.end(); ++iter) { | |
568 | ldout(store->ctx(), 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl; | |
569 | bool truncated; | |
570 | ||
571 | string oid = iter->second; | |
572 | ||
573 | do { | |
574 | map<string, bufferlist> entries; | |
575 | int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated); | |
576 | if (ret == -ENOENT) { | |
577 | truncated = false; | |
578 | ret = 0; | |
579 | } | |
580 | ||
581 | if (ret < 0) { | |
582 | lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl; | |
583 | return ret; | |
584 | } | |
585 | ||
586 | if (entries.empty()) { | |
587 | break; | |
588 | } | |
589 | ||
590 | for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) { | |
591 | ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl; | |
592 | ret = build_linked_oids_for_bucket(eiter->first, oids); | |
593 | if (ret < 0) { | |
594 | lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first | |
595 | << " returned ret=" << ret << dendl; | |
596 | return ret; | |
597 | } | |
598 | } | |
599 | ||
600 | search_stage.shard = iter->first; | |
601 | search_stage.marker = entries.rbegin()->first; /* last entry */ | |
602 | } while (truncated); | |
603 | ||
604 | search_stage.marker.clear(); | |
605 | } | |
606 | ||
607 | int ret = log_oids(linked_objs_index, oids); | |
608 | if (ret < 0) { | |
609 | cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl; | |
610 | return ret; | |
611 | } | |
612 | ||
613 | ret = save_state(); | |
614 | if (ret < 0) { | |
615 | cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl; | |
616 | return ret; | |
617 | } | |
618 | ||
619 | return 0; | |
620 | } | |
621 | ||
622 | class OMAPReader { | |
623 | librados::IoCtx ioctx; | |
624 | string oid; | |
625 | ||
626 | map<string, bufferlist> entries; | |
627 | map<string, bufferlist>::iterator iter; | |
628 | string marker; | |
629 | bool truncated; | |
630 | ||
631 | public: | |
632 | OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) { | |
633 | iter = entries.end(); | |
634 | } | |
635 | ||
636 | int get_next(string *key, bufferlist *pbl, bool *done); | |
637 | }; | |
638 | ||
639 | int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done) | |
640 | { | |
641 | if (iter != entries.end()) { | |
642 | *key = iter->first; | |
643 | if (pbl) { | |
644 | *pbl = iter->second; | |
645 | } | |
646 | ++iter; | |
647 | *done = false; | |
648 | marker = *key; | |
649 | return 0; | |
650 | } | |
651 | ||
652 | if (!truncated) { | |
653 | *done = true; | |
654 | return 0; | |
655 | } | |
656 | ||
657 | #define MAX_OMAP_GET_ENTRIES 100 | |
658 | int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries); | |
659 | if (ret < 0) { | |
660 | if (ret == -ENOENT) { | |
661 | *done = true; | |
662 | return 0; | |
663 | } | |
664 | return ret; | |
665 | } | |
666 | ||
667 | truncated = (entries.size() == MAX_OMAP_GET_ENTRIES); | |
668 | iter = entries.begin(); | |
669 | return get_next(key, pbl, done); | |
670 | } | |
671 | ||
672 | int RGWOrphanSearch::compare_oid_indexes() | |
673 | { | |
674 | assert(linked_objs_index.size() == all_objs_index.size()); | |
675 | ||
676 | librados::IoCtx& ioctx = orphan_store.get_ioctx(); | |
677 | ||
678 | librados::IoCtx data_ioctx; | |
679 | ||
680 | int ret = rgw_init_ioctx(store->get_rados_handle(), search_info.pool, data_ioctx); | |
681 | if (ret < 0) { | |
682 | lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl; | |
683 | return ret; | |
684 | } | |
685 | ||
686 | uint64_t time_threshold = search_info.start_time.sec() - stale_secs; | |
687 | ||
688 | map<int, string>::iterator liter = linked_objs_index.begin(); | |
689 | map<int, string>::iterator aiter = all_objs_index.begin(); | |
690 | ||
691 | for (; liter != linked_objs_index.end(); ++liter, ++aiter) { | |
692 | OMAPReader linked_entries(ioctx, liter->second); | |
693 | OMAPReader all_entries(ioctx, aiter->second); | |
694 | ||
695 | bool done; | |
696 | ||
697 | string cur_linked; | |
698 | bool linked_done = false; | |
699 | ||
700 | ||
701 | do { | |
702 | string key; | |
703 | int r = all_entries.get_next(&key, NULL, &done); | |
704 | if (r < 0) { | |
705 | return r; | |
706 | } | |
707 | if (done) { | |
708 | break; | |
709 | } | |
710 | ||
711 | string key_fp = obj_fingerprint(key); | |
712 | ||
713 | while (cur_linked < key_fp && !linked_done) { | |
714 | r = linked_entries.get_next(&cur_linked, NULL, &linked_done); | |
715 | if (r < 0) { | |
716 | return r; | |
717 | } | |
718 | } | |
719 | ||
720 | if (cur_linked == key_fp) { | |
721 | ldout(store->ctx(), 20) << "linked: " << key << dendl; | |
722 | continue; | |
723 | } | |
724 | ||
725 | time_t mtime; | |
726 | r = data_ioctx.stat(key, NULL, &mtime); | |
727 | if (r < 0) { | |
728 | if (r != -ENOENT) { | |
729 | lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl; | |
730 | } | |
731 | continue; | |
732 | } | |
733 | if (stale_secs && (uint64_t)mtime >= time_threshold) { | |
734 | ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl; | |
735 | continue; | |
736 | } | |
737 | ldout(store->ctx(), 20) << "leaked: " << key << dendl; | |
738 | cout << "leaked: " << key << std::endl; | |
739 | } while (!done); | |
740 | } | |
741 | ||
742 | return 0; | |
743 | } | |
744 | ||
745 | int RGWOrphanSearch::run() | |
746 | { | |
747 | int r; | |
748 | ||
749 | switch (search_stage.stage) { | |
750 | ||
751 | case ORPHAN_SEARCH_STAGE_INIT: | |
752 | ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl; | |
753 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL); | |
754 | r = save_state(); | |
755 | if (r < 0) { | |
756 | lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl; | |
757 | return r; | |
758 | } | |
759 | // fall through | |
760 | case ORPHAN_SEARCH_STAGE_LSPOOL: | |
761 | ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl; | |
762 | r = build_all_oids_index(); | |
763 | if (r < 0) { | |
764 | lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl; | |
765 | return r; | |
766 | } | |
767 | ||
768 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS); | |
769 | r = save_state(); | |
770 | if (r < 0) { | |
771 | lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl; | |
772 | return r; | |
773 | } | |
774 | // fall through | |
775 | ||
776 | case ORPHAN_SEARCH_STAGE_LSBUCKETS: | |
777 | ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl; | |
778 | r = build_buckets_instance_index(); | |
779 | if (r < 0) { | |
780 | lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl; | |
781 | return r; | |
782 | } | |
783 | ||
784 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI); | |
785 | r = save_state(); | |
786 | if (r < 0) { | |
787 | lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl; | |
788 | return r; | |
789 | } | |
790 | // fall through | |
791 | ||
792 | ||
793 | case ORPHAN_SEARCH_STAGE_ITERATE_BI: | |
794 | ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl; | |
795 | r = build_linked_oids_index(); | |
796 | if (r < 0) { | |
797 | lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl; | |
798 | return r; | |
799 | } | |
800 | ||
801 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE); | |
802 | r = save_state(); | |
803 | if (r < 0) { | |
804 | lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl; | |
805 | return r; | |
806 | } | |
807 | // fall through | |
808 | ||
809 | case ORPHAN_SEARCH_STAGE_COMPARE: | |
810 | r = compare_oid_indexes(); | |
811 | if (r < 0) { | |
812 | lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl; | |
813 | return r; | |
814 | } | |
815 | ||
816 | break; | |
817 | ||
818 | default: | |
819 | ceph_abort(); | |
820 | }; | |
821 | ||
822 | return 0; | |
823 | } | |
824 | ||
825 | ||
826 | int RGWOrphanSearch::remove_index(map<int, string>& index) | |
827 | { | |
828 | librados::IoCtx& ioctx = orphan_store.get_ioctx(); | |
829 | ||
830 | for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) { | |
831 | int r = ioctx.remove(iter->second); | |
832 | if (r < 0) { | |
833 | if (r != -ENOENT) { | |
834 | ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl; | |
835 | } | |
836 | } | |
837 | } | |
838 | return 0; | |
839 | } | |
840 | ||
841 | int RGWOrphanSearch::finish() | |
842 | { | |
843 | int r = remove_index(all_objs_index); | |
844 | if (r < 0) { | |
845 | ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl; | |
846 | } | |
847 | r = remove_index(buckets_instance_index); | |
848 | if (r < 0) { | |
849 | ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl; | |
850 | } | |
851 | r = remove_index(linked_objs_index); | |
852 | if (r < 0) { | |
853 | ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl; | |
854 | } | |
855 | ||
856 | r = orphan_store.remove_job(search_info.job_name); | |
857 | if (r < 0) { | |
858 | ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl; | |
859 | } | |
860 | ||
861 | return r; | |
862 | } |