]>
Commit | Line | Data |
---|---|---|
31f18b77 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae FG |
3 | |
4 | #include <string> | |
5 | ||
7c673cae FG |
6 | |
7 | #include "common/config.h" | |
8 | #include "common/Formatter.h" | |
9 | #include "common/errno.h" | |
10 | ||
11 | #include "rgw_rados.h" | |
e306af50 TL |
12 | #include "rgw_op.h" |
13 | #include "rgw_multi.h" | |
7c673cae | 14 | #include "rgw_orphan.h" |
11fdf7f2 TL |
15 | #include "rgw_zone.h" |
16 | #include "rgw_bucket.h" | |
17 | ||
18 | #include "services/svc_zone.h" | |
19 | #include "services/svc_sys_obj.h" | |
7c673cae FG |
20 | |
21 | #define dout_subsys ceph_subsys_rgw | |
22 | ||
23 | #define DEFAULT_NUM_SHARDS 64 | |
24 | ||
25 | static string obj_fingerprint(const string& oid, const char *force_ns = NULL) | |
26 | { | |
27 | ssize_t pos = oid.find('_'); | |
28 | if (pos < 0) { | |
29 | cerr << "ERROR: object does not have a bucket marker: " << oid << std::endl; | |
30 | } | |
31 | ||
32 | string obj_marker = oid.substr(0, pos); | |
33 | ||
34 | rgw_obj_key key; | |
35 | ||
36 | rgw_obj_key::parse_raw_oid(oid.substr(pos + 1), &key); | |
37 | ||
38 | if (key.ns.empty()) { | |
39 | return oid; | |
40 | } | |
41 | ||
42 | string s = oid; | |
43 | ||
44 | if (force_ns) { | |
45 | rgw_bucket b; | |
46 | rgw_obj new_obj(b, key); | |
47 | s = obj_marker + "_" + new_obj.get_oid(); | |
48 | } | |
49 | ||
50 | /* cut out suffix */ | |
51 | size_t i = s.size() - 1; | |
52 | for (; i >= s.size() - 10; --i) { | |
53 | char c = s[i]; | |
54 | if (!isdigit(c) && c != '.' && c != '_') { | |
55 | break; | |
56 | } | |
57 | } | |
58 | ||
59 | return s.substr(0, i + 1); | |
60 | } | |
61 | ||
62 | int RGWOrphanStore::read_job(const string& job_name, RGWOrphanSearchState & state) | |
63 | { | |
64 | set<string> keys; | |
65 | map<string, bufferlist> vals; | |
66 | keys.insert(job_name); | |
67 | int r = ioctx.omap_get_vals_by_keys(oid, keys, &vals); | |
68 | if (r < 0) { | |
69 | return r; | |
70 | } | |
71 | ||
72 | map<string, bufferlist>::iterator iter = vals.find(job_name); | |
73 | if (iter == vals.end()) { | |
74 | return -ENOENT; | |
75 | } | |
76 | ||
77 | try { | |
78 | bufferlist& bl = iter->second; | |
11fdf7f2 | 79 | decode(state, bl); |
7c673cae FG |
80 | } catch (buffer::error& err) { |
81 | lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl; | |
82 | return -EIO; | |
83 | } | |
84 | ||
85 | return 0; | |
86 | } | |
87 | ||
88 | int RGWOrphanStore::write_job(const string& job_name, const RGWOrphanSearchState& state) | |
89 | { | |
90 | map<string, bufferlist> vals; | |
91 | bufferlist bl; | |
11fdf7f2 | 92 | encode(state, bl); |
7c673cae FG |
93 | vals[job_name] = bl; |
94 | int r = ioctx.omap_set(oid, vals); | |
95 | if (r < 0) { | |
96 | return r; | |
97 | } | |
98 | ||
99 | return 0; | |
100 | } | |
101 | ||
102 | int RGWOrphanStore::remove_job(const string& job_name) | |
103 | { | |
104 | set<string> keys; | |
105 | keys.insert(job_name); | |
106 | ||
107 | int r = ioctx.omap_rm_keys(oid, keys); | |
108 | if (r < 0) { | |
109 | return r; | |
110 | } | |
111 | ||
112 | return 0; | |
113 | } | |
114 | ||
115 | int RGWOrphanStore::list_jobs(map <string,RGWOrphanSearchState>& job_list) | |
116 | { | |
117 | map <string,bufferlist> vals; | |
118 | int MAX_READ=1024; | |
119 | string marker=""; | |
120 | int r = 0; | |
121 | ||
122 | // loop through all the omap vals from index object, storing them to job_list, | |
123 | // read in batches of 1024, we update the marker every iteration and exit the | |
124 | // loop when we find that total size read out is less than batch size | |
125 | do { | |
126 | r = ioctx.omap_get_vals(oid, marker, MAX_READ, &vals); | |
127 | if (r < 0) { | |
128 | return r; | |
129 | } | |
130 | r = vals.size(); | |
131 | ||
132 | for (const auto &it : vals) { | |
133 | marker=it.first; | |
134 | RGWOrphanSearchState state; | |
135 | try { | |
136 | bufferlist bl = it.second; | |
11fdf7f2 | 137 | decode(state, bl); |
7c673cae FG |
138 | } catch (buffer::error& err) { |
139 | lderr(store->ctx()) << "ERROR: could not decode buffer" << dendl; | |
140 | return -EIO; | |
141 | } | |
142 | job_list[it.first] = state; | |
143 | } | |
144 | } while (r == MAX_READ); | |
145 | ||
146 | return 0; | |
147 | } | |
148 | ||
149 | int RGWOrphanStore::init() | |
150 | { | |
9f95a23c TL |
151 | const rgw_pool& log_pool = store->svc()->zone->get_zone_params().log_pool; |
152 | int r = rgw_init_ioctx(store->getRados()->get_rados_handle(), log_pool, ioctx); | |
7c673cae FG |
153 | if (r < 0) { |
154 | cerr << "ERROR: failed to open log pool (" << log_pool << " ret=" << r << std::endl; | |
155 | return r; | |
156 | } | |
157 | ||
158 | return 0; | |
159 | } | |
160 | ||
161 | int RGWOrphanStore::store_entries(const string& oid, const map<string, bufferlist>& entries) | |
162 | { | |
163 | librados::ObjectWriteOperation op; | |
164 | op.omap_set(entries); | |
165 | cout << "storing " << entries.size() << " entries at " << oid << std::endl; | |
166 | ldout(store->ctx(), 20) << "storing " << entries.size() << " entries at " << oid << ": " << dendl; | |
167 | for (map<string, bufferlist>::const_iterator iter = entries.begin(); iter != entries.end(); ++iter) { | |
168 | ldout(store->ctx(), 20) << " > " << iter->first << dendl; | |
169 | } | |
9f95a23c | 170 | int ret = rgw_rados_operate(ioctx, oid, &op, null_yield); |
7c673cae FG |
171 | if (ret < 0) { |
172 | lderr(store->ctx()) << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << ret << dendl; | |
173 | } | |
174 | ||
175 | return 0; | |
176 | } | |
177 | ||
178 | int RGWOrphanStore::read_entries(const string& oid, const string& marker, map<string, bufferlist> *entries, bool *truncated) | |
179 | { | |
180 | #define MAX_OMAP_GET 100 | |
181 | int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET, entries); | |
182 | if (ret < 0 && ret != -ENOENT) { | |
183 | cerr << "ERROR: " << __func__ << "(" << oid << ") returned ret=" << cpp_strerror(-ret) << std::endl; | |
184 | } | |
185 | ||
186 | *truncated = (entries->size() == MAX_OMAP_GET); | |
187 | ||
188 | return 0; | |
189 | } | |
190 | ||
11fdf7f2 TL |
191 | int RGWOrphanSearch::init(const string& job_name, RGWOrphanSearchInfo *info, bool _detailed_mode) |
192 | { | |
7c673cae FG |
193 | int r = orphan_store.init(); |
194 | if (r < 0) { | |
195 | return r; | |
196 | } | |
197 | ||
11fdf7f2 TL |
198 | constexpr int64_t MAX_LIST_OBJS_ENTRIES=100; |
199 | ||
200 | max_list_bucket_entries = std::max(store->ctx()->_conf->rgw_list_bucket_min_readahead, | |
201 | MAX_LIST_OBJS_ENTRIES); | |
202 | ||
203 | detailed_mode = _detailed_mode; | |
7c673cae FG |
204 | RGWOrphanSearchState state; |
205 | r = orphan_store.read_job(job_name, state); | |
206 | if (r < 0 && r != -ENOENT) { | |
207 | lderr(store->ctx()) << "ERROR: failed to read state ret=" << r << dendl; | |
208 | return r; | |
209 | } | |
210 | ||
211 | if (r == 0) { | |
212 | search_info = state.info; | |
213 | search_stage = state.stage; | |
214 | } else if (info) { /* r == -ENOENT, initiate a new job if info was provided */ | |
215 | search_info = *info; | |
216 | search_info.job_name = job_name; | |
217 | search_info.num_shards = (info->num_shards ? info->num_shards : DEFAULT_NUM_SHARDS); | |
218 | search_info.start_time = ceph_clock_now(); | |
219 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_INIT); | |
220 | ||
221 | r = save_state(); | |
222 | if (r < 0) { | |
223 | lderr(store->ctx()) << "ERROR: failed to write state ret=" << r << dendl; | |
224 | return r; | |
225 | } | |
226 | } else { | |
227 | lderr(store->ctx()) << "ERROR: job not found" << dendl; | |
228 | return r; | |
229 | } | |
230 | ||
231 | index_objs_prefix = RGW_ORPHAN_INDEX_PREFIX + string("."); | |
232 | index_objs_prefix += job_name; | |
233 | ||
234 | for (int i = 0; i < search_info.num_shards; i++) { | |
235 | char buf[128]; | |
236 | ||
237 | snprintf(buf, sizeof(buf), "%s.rados.%d", index_objs_prefix.c_str(), i); | |
238 | all_objs_index[i] = buf; | |
239 | ||
240 | snprintf(buf, sizeof(buf), "%s.buckets.%d", index_objs_prefix.c_str(), i); | |
241 | buckets_instance_index[i] = buf; | |
242 | ||
243 | snprintf(buf, sizeof(buf), "%s.linked.%d", index_objs_prefix.c_str(), i); | |
244 | linked_objs_index[i] = buf; | |
245 | } | |
246 | return 0; | |
247 | } | |
248 | ||
249 | int RGWOrphanSearch::log_oids(map<int, string>& log_shards, map<int, list<string> >& oids) | |
250 | { | |
251 | map<int, list<string> >::iterator miter = oids.begin(); | |
252 | ||
253 | list<log_iter_info> liters; /* a list of iterator pairs for begin and end */ | |
254 | ||
255 | for (; miter != oids.end(); ++miter) { | |
256 | log_iter_info info; | |
257 | info.oid = log_shards[miter->first]; | |
258 | info.cur = miter->second.begin(); | |
259 | info.end = miter->second.end(); | |
260 | liters.push_back(info); | |
261 | } | |
262 | ||
263 | list<log_iter_info>::iterator list_iter; | |
264 | while (!liters.empty()) { | |
265 | list_iter = liters.begin(); | |
266 | ||
267 | while (list_iter != liters.end()) { | |
268 | log_iter_info& cur_info = *list_iter; | |
269 | ||
270 | list<string>::iterator& cur = cur_info.cur; | |
271 | list<string>::iterator& end = cur_info.end; | |
272 | ||
273 | map<string, bufferlist> entries; | |
274 | #define MAX_OMAP_SET_ENTRIES 100 | |
275 | for (int j = 0; cur != end && j != MAX_OMAP_SET_ENTRIES; ++cur, ++j) { | |
276 | ldout(store->ctx(), 20) << "adding obj: " << *cur << dendl; | |
277 | entries[*cur] = bufferlist(); | |
278 | } | |
279 | ||
280 | int ret = orphan_store.store_entries(cur_info.oid, entries); | |
281 | if (ret < 0) { | |
282 | return ret; | |
283 | } | |
284 | list<log_iter_info>::iterator tmp = list_iter; | |
285 | ++list_iter; | |
286 | if (cur == end) { | |
287 | liters.erase(tmp); | |
288 | } | |
289 | } | |
290 | } | |
291 | return 0; | |
292 | } | |
293 | ||
294 | int RGWOrphanSearch::build_all_oids_index() | |
295 | { | |
296 | librados::IoCtx ioctx; | |
297 | ||
9f95a23c | 298 | int ret = rgw_init_ioctx(store->getRados()->get_rados_handle(), search_info.pool, ioctx); |
7c673cae FG |
299 | if (ret < 0) { |
300 | lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl; | |
301 | return ret; | |
302 | } | |
303 | ||
304 | ioctx.set_namespace(librados::all_nspaces); | |
305 | librados::NObjectIterator i = ioctx.nobjects_begin(); | |
306 | librados::NObjectIterator i_end = ioctx.nobjects_end(); | |
307 | ||
308 | map<int, list<string> > oids; | |
309 | ||
310 | int count = 0; | |
311 | uint64_t total = 0; | |
312 | ||
313 | cout << "logging all objects in the pool" << std::endl; | |
314 | ||
315 | for (; i != i_end; ++i) { | |
316 | string nspace = i->get_nspace(); | |
317 | string oid = i->get_oid(); | |
318 | string locator = i->get_locator(); | |
319 | ||
320 | ssize_t pos = oid.find('_'); | |
321 | if (pos < 0) { | |
322 | cout << "unidentified oid: " << oid << ", skipping" << std::endl; | |
323 | /* what is this object, oids should be in the format of <bucket marker>_<obj>, | |
324 | * skip this entry | |
325 | */ | |
326 | continue; | |
327 | } | |
328 | string stripped_oid = oid.substr(pos + 1); | |
329 | rgw_obj_key key; | |
330 | if (!rgw_obj_key::parse_raw_oid(stripped_oid, &key)) { | |
331 | cout << "cannot parse oid: " << oid << ", skipping" << std::endl; | |
332 | continue; | |
333 | } | |
334 | ||
335 | if (key.ns.empty()) { | |
336 | /* skipping head objects, we don't want to remove these as they are mutable and | |
337 | * cleaning them up is racy (can race with object removal and a later recreation) | |
338 | */ | |
339 | cout << "skipping head object: oid=" << oid << std::endl; | |
340 | continue; | |
341 | } | |
342 | ||
343 | string oid_fp = obj_fingerprint(oid); | |
344 | ||
345 | ldout(store->ctx(), 20) << "oid_fp=" << oid_fp << dendl; | |
346 | ||
347 | int shard = orphan_shard(oid_fp); | |
348 | oids[shard].push_back(oid); | |
349 | ||
350 | #define COUNT_BEFORE_FLUSH 1000 | |
351 | ++total; | |
352 | if (++count >= COUNT_BEFORE_FLUSH) { | |
353 | ldout(store->ctx(), 1) << "iterated through " << total << " objects" << dendl; | |
354 | ret = log_oids(all_objs_index, oids); | |
355 | if (ret < 0) { | |
356 | cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl; | |
357 | return ret; | |
358 | } | |
359 | count = 0; | |
360 | oids.clear(); | |
361 | } | |
362 | } | |
363 | ret = log_oids(all_objs_index, oids); | |
364 | if (ret < 0) { | |
365 | cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl; | |
366 | return ret; | |
367 | } | |
368 | ||
369 | return 0; | |
370 | } | |
371 | ||
372 | int RGWOrphanSearch::build_buckets_instance_index() | |
373 | { | |
374 | void *handle; | |
375 | int max = 1000; | |
376 | string section = "bucket.instance"; | |
9f95a23c | 377 | int ret = store->ctl()->meta.mgr->list_keys_init(section, &handle); |
7c673cae FG |
378 | if (ret < 0) { |
379 | lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl; | |
92f5a8d4 | 380 | return ret; |
7c673cae FG |
381 | } |
382 | ||
383 | map<int, list<string> > instances; | |
384 | ||
385 | bool truncated; | |
386 | ||
387 | RGWObjectCtx obj_ctx(store); | |
388 | ||
389 | int count = 0; | |
390 | uint64_t total = 0; | |
391 | ||
392 | do { | |
393 | list<string> keys; | |
9f95a23c | 394 | ret = store->ctl()->meta.mgr->list_keys_next(handle, max, keys, &truncated); |
7c673cae FG |
395 | if (ret < 0) { |
396 | lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl; | |
92f5a8d4 | 397 | return ret; |
7c673cae FG |
398 | } |
399 | ||
400 | for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) { | |
401 | ++total; | |
402 | ldout(store->ctx(), 10) << "bucket_instance=" << *iter << " total=" << total << dendl; | |
403 | int shard = orphan_shard(*iter); | |
404 | instances[shard].push_back(*iter); | |
405 | ||
406 | if (++count >= COUNT_BEFORE_FLUSH) { | |
407 | ret = log_oids(buckets_instance_index, instances); | |
408 | if (ret < 0) { | |
409 | lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl; | |
410 | return ret; | |
411 | } | |
412 | count = 0; | |
413 | instances.clear(); | |
414 | } | |
415 | } | |
416 | ||
417 | } while (truncated); | |
418 | ||
419 | ret = log_oids(buckets_instance_index, instances); | |
420 | if (ret < 0) { | |
421 | lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl; | |
422 | return ret; | |
423 | } | |
9f95a23c | 424 | store->ctl()->meta.mgr->list_keys_complete(handle); |
7c673cae FG |
425 | |
426 | return 0; | |
427 | } | |
428 | ||
429 | int RGWOrphanSearch::handle_stat_result(map<int, list<string> >& oids, RGWRados::Object::Stat::Result& result) | |
430 | { | |
431 | set<string> obj_oids; | |
432 | rgw_bucket& bucket = result.obj.bucket; | |
9f95a23c | 433 | if (!result.manifest) { /* a very very old object, or part of a multipart upload during upload */ |
7c673cae FG |
434 | const string loc = bucket.bucket_id + "_" + result.obj.get_oid(); |
435 | obj_oids.insert(obj_fingerprint(loc)); | |
436 | ||
437 | /* | |
438 | * multipart parts don't have manifest on them, it's in the meta object. Instead of reading the | |
439 | * meta object, just add a "shadow" object to the mix | |
440 | */ | |
441 | obj_oids.insert(obj_fingerprint(loc, "shadow")); | |
442 | } else { | |
9f95a23c | 443 | RGWObjManifest& manifest = *result.manifest; |
7c673cae | 444 | |
11fdf7f2 TL |
445 | if (!detailed_mode && |
446 | manifest.get_obj_size() <= manifest.get_head_size()) { | |
447 | ldout(store->ctx(), 5) << "skipping object as it fits in a head" << dendl; | |
448 | return 0; | |
449 | } | |
450 | ||
7c673cae FG |
451 | RGWObjManifest::obj_iterator miter; |
452 | for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) { | |
9f95a23c | 453 | const rgw_raw_obj& loc = miter.get_location().get_raw_obj(store->getRados()); |
7c673cae FG |
454 | string s = loc.oid; |
455 | obj_oids.insert(obj_fingerprint(s)); | |
456 | } | |
457 | } | |
458 | ||
459 | for (set<string>::iterator iter = obj_oids.begin(); iter != obj_oids.end(); ++iter) { | |
460 | ldout(store->ctx(), 20) << __func__ << ": oid for obj=" << result.obj << ": " << *iter << dendl; | |
461 | ||
462 | int shard = orphan_shard(*iter); | |
463 | oids[shard].push_back(*iter); | |
464 | } | |
465 | ||
466 | return 0; | |
467 | } | |
468 | ||
469 | int RGWOrphanSearch::pop_and_handle_stat_op(map<int, list<string> >& oids, std::deque<RGWRados::Object::Stat>& ops) | |
470 | { | |
471 | RGWRados::Object::Stat& front_op = ops.front(); | |
472 | ||
473 | int ret = front_op.wait(); | |
474 | if (ret < 0) { | |
475 | if (ret != -ENOENT) { | |
476 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl; | |
477 | } | |
478 | goto done; | |
479 | } | |
480 | ret = handle_stat_result(oids, front_op.result); | |
481 | if (ret < 0) { | |
482 | lderr(store->ctx()) << "ERROR: handle_stat_response() returned error: " << cpp_strerror(-ret) << dendl; | |
483 | } | |
484 | done: | |
485 | ops.pop_front(); | |
486 | return ret; | |
487 | } | |
488 | ||
489 | int RGWOrphanSearch::build_linked_oids_for_bucket(const string& bucket_instance_id, map<int, list<string> >& oids) | |
490 | { | |
7c673cae | 491 | RGWObjectCtx obj_ctx(store); |
9f95a23c | 492 | auto sysobj_ctx = store->svc()->sysobj->init_obj_ctx(); |
11fdf7f2 TL |
493 | |
494 | rgw_bucket orphan_bucket; | |
495 | int shard_id; | |
496 | int ret = rgw_bucket_parse_bucket_key(store->ctx(), bucket_instance_id, | |
497 | &orphan_bucket, &shard_id); | |
498 | if (ret < 0) { | |
499 | ldout(store->ctx(),0) << __func__ << " failed to parse bucket instance: " | |
500 | << bucket_instance_id << " skipping" << dendl; | |
501 | return ret; | |
502 | } | |
503 | ||
504 | RGWBucketInfo cur_bucket_info; | |
9f95a23c TL |
505 | ret = store->getRados()->get_bucket_info(store->svc(), orphan_bucket.tenant, |
506 | orphan_bucket.name, cur_bucket_info, nullptr, null_yield); | |
11fdf7f2 TL |
507 | if (ret < 0) { |
508 | if (ret == -ENOENT) { | |
509 | /* probably raced with bucket removal */ | |
510 | return 0; | |
511 | } | |
512 | lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl; | |
513 | return ret; | |
514 | } | |
515 | ||
516 | if (cur_bucket_info.bucket.bucket_id != orphan_bucket.bucket_id) { | |
517 | ldout(store->ctx(), 0) << __func__ << ": Skipping stale bucket instance: " | |
518 | << orphan_bucket.name << ": " | |
519 | << orphan_bucket.bucket_id << dendl; | |
520 | return 0; | |
521 | } | |
522 | ||
9f95a23c | 523 | if (cur_bucket_info.reshard_status == cls_rgw_reshard_status::IN_PROGRESS) { |
11fdf7f2 TL |
524 | ldout(store->ctx(), 0) << __func__ << ": reshard in progress. Skipping " |
525 | << orphan_bucket.name << ": " | |
526 | << orphan_bucket.bucket_id << dendl; | |
527 | return 0; | |
528 | } | |
529 | ||
530 | RGWBucketInfo bucket_info; | |
9f95a23c | 531 | ret = store->getRados()->get_bucket_instance_info(sysobj_ctx, bucket_instance_id, bucket_info, nullptr, nullptr, null_yield); |
7c673cae FG |
532 | if (ret < 0) { |
533 | if (ret == -ENOENT) { | |
534 | /* probably raced with bucket removal */ | |
535 | return 0; | |
536 | } | |
537 | lderr(store->ctx()) << __func__ << ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << ret << dendl; | |
538 | return ret; | |
539 | } | |
540 | ||
11fdf7f2 | 541 | ldout(store->ctx(), 10) << "building linked oids for bucket instance: " << bucket_instance_id << dendl; |
9f95a23c | 542 | RGWRados::Bucket target(store->getRados(), bucket_info); |
7c673cae FG |
543 | RGWRados::Bucket::List list_op(&target); |
544 | ||
545 | string marker; | |
546 | list_op.params.marker = rgw_obj_key(marker); | |
547 | list_op.params.list_versions = true; | |
548 | list_op.params.enforce_ns = false; | |
549 | ||
550 | bool truncated; | |
551 | ||
552 | deque<RGWRados::Object::Stat> stat_ops; | |
553 | ||
7c673cae FG |
554 | do { |
555 | vector<rgw_bucket_dir_entry> result; | |
556 | ||
11fdf7f2 | 557 | ret = list_op.list_objects(max_list_bucket_entries, |
9f95a23c | 558 | &result, nullptr, &truncated, null_yield); |
7c673cae FG |
559 | if (ret < 0) { |
560 | cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << std::endl; | |
92f5a8d4 | 561 | return ret; |
7c673cae FG |
562 | } |
563 | ||
564 | for (vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); iter != result.end(); ++iter) { | |
565 | rgw_bucket_dir_entry& entry = *iter; | |
566 | if (entry.key.instance.empty()) { | |
567 | ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl; | |
568 | } else { | |
569 | ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << " [" << entry.key.instance << "]" << dendl; | |
570 | } | |
571 | ||
572 | ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << entry.key.name << " entry.key.instance=" << entry.key.instance << dendl; | |
11fdf7f2 TL |
573 | |
574 | if (!detailed_mode && | |
575 | entry.meta.accounted_size <= (uint64_t)store->ctx()->_conf->rgw_max_chunk_size) { | |
576 | ldout(store->ctx(),5) << __func__ << "skipping stat as the object " << entry.key.name | |
577 | << "fits in a head" << dendl; | |
578 | continue; | |
579 | } | |
580 | ||
7c673cae FG |
581 | rgw_obj obj(bucket_info.bucket, entry.key); |
582 | ||
9f95a23c | 583 | RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, obj); |
7c673cae FG |
584 | |
585 | stat_ops.push_back(RGWRados::Object::Stat(&op_target)); | |
586 | RGWRados::Object::Stat& op = stat_ops.back(); | |
587 | ||
588 | ||
589 | ret = op.stat_async(); | |
590 | if (ret < 0) { | |
591 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl; | |
592 | return ret; | |
593 | } | |
594 | if (stat_ops.size() >= max_concurrent_ios) { | |
595 | ret = pop_and_handle_stat_op(oids, stat_ops); | |
596 | if (ret < 0) { | |
597 | if (ret != -ENOENT) { | |
598 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl; | |
599 | } | |
600 | } | |
601 | } | |
11fdf7f2 | 602 | if (oids.size() >= COUNT_BEFORE_FLUSH) { |
7c673cae FG |
603 | ret = log_oids(linked_objs_index, oids); |
604 | if (ret < 0) { | |
605 | cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl; | |
606 | return ret; | |
607 | } | |
7c673cae FG |
608 | oids.clear(); |
609 | } | |
610 | } | |
611 | } while (truncated); | |
612 | ||
613 | while (!stat_ops.empty()) { | |
614 | ret = pop_and_handle_stat_op(oids, stat_ops); | |
615 | if (ret < 0) { | |
616 | if (ret != -ENOENT) { | |
617 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << cpp_strerror(-ret) << dendl; | |
618 | } | |
619 | } | |
620 | } | |
621 | ||
622 | return 0; | |
623 | } | |
624 | ||
625 | int RGWOrphanSearch::build_linked_oids_index() | |
626 | { | |
627 | map<int, list<string> > oids; | |
628 | map<int, string>::iterator iter = buckets_instance_index.find(search_stage.shard); | |
629 | for (; iter != buckets_instance_index.end(); ++iter) { | |
630 | ldout(store->ctx(), 0) << "building linked oids index: " << iter->first << "/" << buckets_instance_index.size() << dendl; | |
631 | bool truncated; | |
632 | ||
633 | string oid = iter->second; | |
634 | ||
635 | do { | |
636 | map<string, bufferlist> entries; | |
637 | int ret = orphan_store.read_entries(oid, search_stage.marker, &entries, &truncated); | |
638 | if (ret == -ENOENT) { | |
639 | truncated = false; | |
640 | ret = 0; | |
641 | } | |
642 | ||
643 | if (ret < 0) { | |
644 | lderr(store->ctx()) << __func__ << ": ERROR: read_entries() oid=" << oid << " returned ret=" << ret << dendl; | |
645 | return ret; | |
646 | } | |
647 | ||
648 | if (entries.empty()) { | |
649 | break; | |
650 | } | |
651 | ||
652 | for (map<string, bufferlist>::iterator eiter = entries.begin(); eiter != entries.end(); ++eiter) { | |
653 | ldout(store->ctx(), 20) << " indexed entry: " << eiter->first << dendl; | |
654 | ret = build_linked_oids_for_bucket(eiter->first, oids); | |
655 | if (ret < 0) { | |
656 | lderr(store->ctx()) << __func__ << ": ERROR: build_linked_oids_for_bucket() indexed entry=" << eiter->first | |
657 | << " returned ret=" << ret << dendl; | |
658 | return ret; | |
659 | } | |
660 | } | |
661 | ||
662 | search_stage.shard = iter->first; | |
663 | search_stage.marker = entries.rbegin()->first; /* last entry */ | |
664 | } while (truncated); | |
665 | ||
666 | search_stage.marker.clear(); | |
667 | } | |
668 | ||
669 | int ret = log_oids(linked_objs_index, oids); | |
670 | if (ret < 0) { | |
671 | cerr << __func__ << ": ERROR: log_oids() returned ret=" << ret << std::endl; | |
672 | return ret; | |
673 | } | |
674 | ||
675 | ret = save_state(); | |
676 | if (ret < 0) { | |
677 | cerr << __func__ << ": ERROR: failed to write state ret=" << ret << std::endl; | |
678 | return ret; | |
679 | } | |
680 | ||
681 | return 0; | |
682 | } | |
683 | ||
684 | class OMAPReader { | |
685 | librados::IoCtx ioctx; | |
686 | string oid; | |
687 | ||
688 | map<string, bufferlist> entries; | |
689 | map<string, bufferlist>::iterator iter; | |
690 | string marker; | |
691 | bool truncated; | |
692 | ||
693 | public: | |
694 | OMAPReader(librados::IoCtx& _ioctx, const string& _oid) : ioctx(_ioctx), oid(_oid), truncated(true) { | |
695 | iter = entries.end(); | |
696 | } | |
697 | ||
698 | int get_next(string *key, bufferlist *pbl, bool *done); | |
699 | }; | |
700 | ||
701 | int OMAPReader::get_next(string *key, bufferlist *pbl, bool *done) | |
702 | { | |
703 | if (iter != entries.end()) { | |
704 | *key = iter->first; | |
705 | if (pbl) { | |
706 | *pbl = iter->second; | |
707 | } | |
708 | ++iter; | |
709 | *done = false; | |
710 | marker = *key; | |
711 | return 0; | |
712 | } | |
713 | ||
714 | if (!truncated) { | |
715 | *done = true; | |
716 | return 0; | |
717 | } | |
718 | ||
719 | #define MAX_OMAP_GET_ENTRIES 100 | |
720 | int ret = ioctx.omap_get_vals(oid, marker, MAX_OMAP_GET_ENTRIES, &entries); | |
721 | if (ret < 0) { | |
722 | if (ret == -ENOENT) { | |
723 | *done = true; | |
724 | return 0; | |
725 | } | |
726 | return ret; | |
727 | } | |
728 | ||
729 | truncated = (entries.size() == MAX_OMAP_GET_ENTRIES); | |
730 | iter = entries.begin(); | |
731 | return get_next(key, pbl, done); | |
732 | } | |
733 | ||
734 | int RGWOrphanSearch::compare_oid_indexes() | |
735 | { | |
11fdf7f2 | 736 | ceph_assert(linked_objs_index.size() == all_objs_index.size()); |
7c673cae FG |
737 | |
738 | librados::IoCtx& ioctx = orphan_store.get_ioctx(); | |
739 | ||
740 | librados::IoCtx data_ioctx; | |
741 | ||
9f95a23c | 742 | int ret = rgw_init_ioctx(store->getRados()->get_rados_handle(), search_info.pool, data_ioctx); |
7c673cae FG |
743 | if (ret < 0) { |
744 | lderr(store->ctx()) << __func__ << ": rgw_init_ioctx() returned ret=" << ret << dendl; | |
745 | return ret; | |
746 | } | |
747 | ||
748 | uint64_t time_threshold = search_info.start_time.sec() - stale_secs; | |
749 | ||
750 | map<int, string>::iterator liter = linked_objs_index.begin(); | |
751 | map<int, string>::iterator aiter = all_objs_index.begin(); | |
752 | ||
753 | for (; liter != linked_objs_index.end(); ++liter, ++aiter) { | |
754 | OMAPReader linked_entries(ioctx, liter->second); | |
755 | OMAPReader all_entries(ioctx, aiter->second); | |
756 | ||
757 | bool done; | |
758 | ||
759 | string cur_linked; | |
760 | bool linked_done = false; | |
761 | ||
762 | ||
763 | do { | |
764 | string key; | |
765 | int r = all_entries.get_next(&key, NULL, &done); | |
766 | if (r < 0) { | |
767 | return r; | |
768 | } | |
769 | if (done) { | |
770 | break; | |
771 | } | |
772 | ||
773 | string key_fp = obj_fingerprint(key); | |
774 | ||
775 | while (cur_linked < key_fp && !linked_done) { | |
776 | r = linked_entries.get_next(&cur_linked, NULL, &linked_done); | |
777 | if (r < 0) { | |
778 | return r; | |
779 | } | |
780 | } | |
781 | ||
782 | if (cur_linked == key_fp) { | |
783 | ldout(store->ctx(), 20) << "linked: " << key << dendl; | |
784 | continue; | |
785 | } | |
786 | ||
787 | time_t mtime; | |
788 | r = data_ioctx.stat(key, NULL, &mtime); | |
789 | if (r < 0) { | |
790 | if (r != -ENOENT) { | |
791 | lderr(store->ctx()) << "ERROR: ioctx.stat(" << key << ") returned ret=" << r << dendl; | |
792 | } | |
793 | continue; | |
794 | } | |
795 | if (stale_secs && (uint64_t)mtime >= time_threshold) { | |
796 | ldout(store->ctx(), 20) << "skipping: " << key << " (mtime=" << mtime << " threshold=" << time_threshold << ")" << dendl; | |
797 | continue; | |
798 | } | |
799 | ldout(store->ctx(), 20) << "leaked: " << key << dendl; | |
800 | cout << "leaked: " << key << std::endl; | |
801 | } while (!done); | |
802 | } | |
803 | ||
804 | return 0; | |
805 | } | |
806 | ||
807 | int RGWOrphanSearch::run() | |
808 | { | |
809 | int r; | |
810 | ||
811 | switch (search_stage.stage) { | |
812 | ||
813 | case ORPHAN_SEARCH_STAGE_INIT: | |
814 | ldout(store->ctx(), 0) << __func__ << "(): initializing state" << dendl; | |
815 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSPOOL); | |
816 | r = save_state(); | |
817 | if (r < 0) { | |
818 | lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl; | |
819 | return r; | |
820 | } | |
821 | // fall through | |
822 | case ORPHAN_SEARCH_STAGE_LSPOOL: | |
823 | ldout(store->ctx(), 0) << __func__ << "(): building index of all objects in pool" << dendl; | |
824 | r = build_all_oids_index(); | |
825 | if (r < 0) { | |
826 | lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl; | |
827 | return r; | |
828 | } | |
829 | ||
830 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_LSBUCKETS); | |
831 | r = save_state(); | |
832 | if (r < 0) { | |
833 | lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl; | |
834 | return r; | |
835 | } | |
836 | // fall through | |
837 | ||
838 | case ORPHAN_SEARCH_STAGE_LSBUCKETS: | |
839 | ldout(store->ctx(), 0) << __func__ << "(): building index of all bucket indexes" << dendl; | |
840 | r = build_buckets_instance_index(); | |
841 | if (r < 0) { | |
842 | lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl; | |
843 | return r; | |
844 | } | |
845 | ||
846 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_ITERATE_BI); | |
847 | r = save_state(); | |
848 | if (r < 0) { | |
849 | lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl; | |
850 | return r; | |
851 | } | |
852 | // fall through | |
853 | ||
854 | ||
855 | case ORPHAN_SEARCH_STAGE_ITERATE_BI: | |
856 | ldout(store->ctx(), 0) << __func__ << "(): building index of all linked objects" << dendl; | |
857 | r = build_linked_oids_index(); | |
858 | if (r < 0) { | |
859 | lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl; | |
860 | return r; | |
861 | } | |
862 | ||
863 | search_stage = RGWOrphanSearchStage(ORPHAN_SEARCH_STAGE_COMPARE); | |
864 | r = save_state(); | |
865 | if (r < 0) { | |
866 | lderr(store->ctx()) << __func__ << ": ERROR: failed to save state, ret=" << r << dendl; | |
867 | return r; | |
868 | } | |
869 | // fall through | |
870 | ||
871 | case ORPHAN_SEARCH_STAGE_COMPARE: | |
872 | r = compare_oid_indexes(); | |
873 | if (r < 0) { | |
874 | lderr(store->ctx()) << __func__ << ": ERROR: build_all_objs_index returned ret=" << r << dendl; | |
875 | return r; | |
876 | } | |
877 | ||
878 | break; | |
879 | ||
880 | default: | |
881 | ceph_abort(); | |
882 | }; | |
883 | ||
884 | return 0; | |
885 | } | |
886 | ||
887 | ||
888 | int RGWOrphanSearch::remove_index(map<int, string>& index) | |
889 | { | |
890 | librados::IoCtx& ioctx = orphan_store.get_ioctx(); | |
891 | ||
892 | for (map<int, string>::iterator iter = index.begin(); iter != index.end(); ++iter) { | |
893 | int r = ioctx.remove(iter->second); | |
894 | if (r < 0) { | |
895 | if (r != -ENOENT) { | |
896 | ldout(store->ctx(), 0) << "ERROR: couldn't remove " << iter->second << ": ret=" << r << dendl; | |
897 | } | |
898 | } | |
899 | } | |
900 | return 0; | |
901 | } | |
902 | ||
903 | int RGWOrphanSearch::finish() | |
904 | { | |
905 | int r = remove_index(all_objs_index); | |
906 | if (r < 0) { | |
907 | ldout(store->ctx(), 0) << "ERROR: remove_index(" << all_objs_index << ") returned ret=" << r << dendl; | |
908 | } | |
909 | r = remove_index(buckets_instance_index); | |
910 | if (r < 0) { | |
911 | ldout(store->ctx(), 0) << "ERROR: remove_index(" << buckets_instance_index << ") returned ret=" << r << dendl; | |
912 | } | |
913 | r = remove_index(linked_objs_index); | |
914 | if (r < 0) { | |
915 | ldout(store->ctx(), 0) << "ERROR: remove_index(" << linked_objs_index << ") returned ret=" << r << dendl; | |
916 | } | |
917 | ||
918 | r = orphan_store.remove_job(search_info.job_name); | |
919 | if (r < 0) { | |
920 | ldout(store->ctx(), 0) << "ERROR: could not remove job name (" << search_info.job_name << ") ret=" << r << dendl; | |
921 | } | |
922 | ||
923 | return r; | |
924 | } | |
e306af50 TL |
925 | |
926 | ||
927 | int RGWRadosList::handle_stat_result(RGWRados::Object::Stat::Result& result, | |
7f7e6c64 TL |
928 | std::string& bucket_name, |
929 | rgw_obj_key& obj_key, | |
e306af50 TL |
930 | std::set<string>& obj_oids) |
931 | { | |
932 | obj_oids.clear(); | |
933 | ||
934 | rgw_bucket& bucket = result.obj.bucket; | |
935 | ||
936 | ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ << | |
937 | " bucket=" << bucket << | |
938 | ", has_manifest=" << result.manifest.has_value() << | |
939 | dendl; | |
940 | ||
941 | // iterator to store result of dlo/slo attribute find | |
942 | decltype(result.attrs)::iterator attr_it = result.attrs.end(); | |
943 | const std::string oid = bucket.marker + "_" + result.obj.get_oid(); | |
944 | ldout(store->ctx(), 20) << "radoslist processing object=\"" << | |
945 | oid << "\"" << dendl; | |
946 | if (visited_oids.find(oid) != visited_oids.end()) { | |
947 | // apparently we hit a loop; don't continue with this oid | |
948 | ldout(store->ctx(), 15) << | |
949 | "radoslist stopped loop at already visited object=\"" << | |
950 | oid << "\"" << dendl; | |
951 | return 0; | |
952 | } | |
953 | ||
7f7e6c64 TL |
954 | bucket_name = bucket.name; |
955 | obj_key = result.obj.key; | |
956 | ||
e306af50 TL |
957 | if (!result.manifest) { |
958 | /* a very very old object, or part of a multipart upload during upload */ | |
959 | obj_oids.insert(oid); | |
960 | ||
961 | /* | |
962 | * multipart parts don't have manifest on them, it's in the meta | |
963 | * object; we'll process them in | |
964 | * RGWRadosList::do_incomplete_multipart | |
965 | */ | |
966 | } else if ((attr_it = result.attrs.find(RGW_ATTR_USER_MANIFEST)) != | |
967 | result.attrs.end()) { | |
968 | // *** handle DLO object *** | |
969 | ||
970 | obj_oids.insert(oid); | |
971 | visited_oids.insert(oid); // prevent dlo loops | |
972 | ldout(store->ctx(), 15) << "radoslist added to visited list DLO=\"" << | |
973 | oid << "\"" << dendl; | |
974 | ||
975 | char* prefix_path_c = attr_it->second.c_str(); | |
976 | const std::string& prefix_path = prefix_path_c; | |
977 | ||
978 | const size_t sep_pos = prefix_path.find('/'); | |
979 | if (string::npos == sep_pos) { | |
980 | return -EINVAL; | |
981 | } | |
982 | ||
983 | const std::string bucket_name = prefix_path.substr(0, sep_pos); | |
984 | const std::string prefix = prefix_path.substr(sep_pos + 1); | |
985 | ||
986 | add_bucket_prefix(bucket_name, prefix); | |
987 | ldout(store->ctx(), 25) << "radoslist DLO oid=\"" << oid << | |
988 | "\" added bucket=\"" << bucket_name << "\" prefix=\"" << | |
989 | prefix << "\" to process list" << dendl; | |
990 | } else if ((attr_it = result.attrs.find(RGW_ATTR_SLO_MANIFEST)) != | |
991 | result.attrs.end()) { | |
992 | // *** handle SLO object *** | |
993 | ||
994 | obj_oids.insert(oid); | |
995 | visited_oids.insert(oid); // prevent slo loops | |
996 | ldout(store->ctx(), 15) << "radoslist added to visited list SLO=\"" << | |
997 | oid << "\"" << dendl; | |
998 | ||
999 | RGWSLOInfo slo_info; | |
1000 | bufferlist::const_iterator bliter = attr_it->second.begin(); | |
1001 | try { | |
1002 | ::decode(slo_info, bliter); | |
1003 | } catch (buffer::error& err) { | |
1004 | ldout(store->ctx(), 0) << | |
1005 | "ERROR: failed to decode slo manifest for " << oid << dendl; | |
1006 | return -EIO; | |
1007 | } | |
1008 | ||
1009 | for (const auto& iter : slo_info.entries) { | |
1010 | const string& path_str = iter.path; | |
1011 | ||
1012 | const size_t sep_pos = path_str.find('/', 1 /* skip initial slash */); | |
1013 | if (string::npos == sep_pos) { | |
1014 | return -EINVAL; | |
1015 | } | |
1016 | ||
1017 | std::string bucket_name; | |
1018 | std::string obj_name; | |
1019 | ||
1020 | bucket_name = url_decode(path_str.substr(1, sep_pos - 1)); | |
1021 | obj_name = url_decode(path_str.substr(sep_pos + 1)); | |
1022 | ||
1023 | const rgw_obj_key obj_key(obj_name); | |
1024 | add_bucket_filter(bucket_name, obj_key); | |
1025 | ldout(store->ctx(), 25) << "radoslist SLO oid=\"" << oid << | |
1026 | "\" added bucket=\"" << bucket_name << "\" obj_key=\"" << | |
1027 | obj_key << "\" to process list" << dendl; | |
1028 | } | |
1029 | } else { | |
1030 | RGWObjManifest& manifest = *result.manifest; | |
1031 | ||
1032 | // in multipart, the head object contains no data and just has the | |
1033 | // manifest AND empty objects have no manifest, but they're | |
1034 | // realized as empty rados objects | |
1035 | if (0 == manifest.get_max_head_size() || | |
1036 | manifest.obj_begin() == manifest.obj_end()) { | |
1037 | obj_oids.insert(oid); | |
1038 | // first_insert = true; | |
1039 | } | |
1040 | ||
1041 | RGWObjManifest::obj_iterator miter; | |
1042 | for (miter = manifest.obj_begin(); miter != manifest.obj_end(); ++miter) { | |
1043 | const rgw_raw_obj& loc = | |
1044 | miter.get_location().get_raw_obj(store->getRados()); | |
1045 | string s = loc.oid; | |
1046 | obj_oids.insert(s); | |
1047 | } | |
1048 | } | |
1049 | ||
1050 | return 0; | |
1051 | } // RGWRadosList::handle_stat_result | |
1052 | ||
1053 | int RGWRadosList::pop_and_handle_stat_op( | |
1054 | RGWObjectCtx& obj_ctx, | |
1055 | std::deque<RGWRados::Object::Stat>& ops) | |
1056 | { | |
7f7e6c64 TL |
1057 | std::string bucket_name; |
1058 | rgw_obj_key obj_key; | |
1059 | std::set<std::string> obj_oids; | |
e306af50 TL |
1060 | RGWRados::Object::Stat& front_op = ops.front(); |
1061 | ||
1062 | int ret = front_op.wait(); | |
1063 | if (ret < 0) { | |
1064 | if (ret != -ENOENT) { | |
1065 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << | |
1066 | cpp_strerror(-ret) << dendl; | |
1067 | } | |
1068 | goto done; | |
1069 | } | |
1070 | ||
7f7e6c64 | 1071 | ret = handle_stat_result(front_op.result, bucket_name, obj_key, obj_oids); |
e306af50 TL |
1072 | if (ret < 0) { |
1073 | lderr(store->ctx()) << "ERROR: handle_stat_result() returned error: " << | |
1074 | cpp_strerror(-ret) << dendl; | |
1075 | } | |
1076 | ||
1077 | // output results | |
1078 | for (const auto& o : obj_oids) { | |
7f7e6c64 TL |
1079 | if (include_rgw_obj_name) { |
1080 | std::cout << o << | |
1081 | field_separator << bucket_name << | |
1082 | field_separator << obj_key << | |
1083 | std::endl; | |
1084 | } else { | |
1085 | std::cout << o << std::endl; | |
1086 | } | |
e306af50 TL |
1087 | } |
1088 | ||
1089 | done: | |
1090 | ||
1091 | // invalidate object context for this object to avoid memory leak | |
1092 | // (see pr https://github.com/ceph/ceph/pull/30174) | |
1093 | obj_ctx.invalidate(front_op.result.obj); | |
1094 | ||
1095 | ops.pop_front(); | |
1096 | return ret; | |
1097 | } | |
1098 | ||
1099 | ||
#if 0 // code that may be the basis for expansion
// Lists every "bucket.instance" metadata key, groups the keys by
// orphan_shard() and writes them to buckets_instance_index through
// log_oids(), flushing after every COUNT_BEFORE_FLUSH keys.
//
// The metadata listing handle is released on every path once
// list_keys_init() has succeeded.
//
// Returns 0 on success or the first negative error encountered.
int RGWRadosList::build_buckets_instance_index()
{
  void *handle;
  int max = 1000;
  string section = "bucket.instance";
  int ret = store->meta_mgr->list_keys_init(section, &handle);
  if (ret < 0) {
    lderr(store->ctx()) << "ERROR: can't get key: " << cpp_strerror(-ret) << dendl;
    return ret;
  }

  map<int, list<string> > instances;

  bool truncated = false;

  RGWObjectCtx obj_ctx(store);

  int count = 0;
  uint64_t total = 0;

  do {
    list<string> keys;
    ret = store->meta_mgr->list_keys_next(handle, max, keys, &truncated);
    if (ret < 0) {
      lderr(store->ctx()) << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << dendl;
      break;
    }

    for (const string& key : keys) {
      ++total;
      ldout(store->ctx(), 10) << "bucket_instance=" << key << " total=" << total << dendl;
      int shard = orphan_shard(key);
      instances[shard].push_back(key);

      if (++count >= COUNT_BEFORE_FLUSH) {
        ret = log_oids(buckets_instance_index, instances);
        if (ret < 0) {
          lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
          break;
        }
        count = 0;
        instances.clear();
      }
    }
  } while (ret >= 0 && truncated);

  if (ret >= 0) {
    // flush whatever is left over from the last partial batch
    ret = log_oids(buckets_instance_index, instances);
    if (ret < 0) {
      lderr(store->ctx()) << __func__ << ": ERROR: log_oids() returned ret=" << ret << dendl;
    }
  }

  // always release the listing handle, also after an error
  store->meta_mgr->list_keys_complete(handle);

  return ret < 0 ? ret : 0;
}
#endif
1157 | ||
1158 | ||
1159 | int RGWRadosList::process_bucket( | |
1160 | const std::string& bucket_instance_id, | |
1161 | const std::string& prefix, | |
1162 | const std::set<rgw_obj_key>& entries_filter) | |
1163 | { | |
1164 | ldout(store->ctx(), 10) << "RGWRadosList::" << __func__ << | |
1165 | " bucket_instance_id=" << bucket_instance_id << | |
1166 | ", prefix=" << prefix << | |
1167 | ", entries_filter.size=" << entries_filter.size() << dendl; | |
1168 | ||
1169 | RGWBucketInfo bucket_info; | |
1170 | RGWSysObjectCtx sys_obj_ctx = store->svc()->sysobj->init_obj_ctx(); | |
1171 | int ret = store->getRados()->get_bucket_instance_info(sys_obj_ctx, | |
1172 | bucket_instance_id, | |
1173 | bucket_info, | |
1174 | nullptr, | |
1175 | nullptr, | |
1176 | null_yield); | |
1177 | if (ret < 0) { | |
1178 | if (ret == -ENOENT) { | |
1179 | // probably raced with bucket removal | |
1180 | return 0; | |
1181 | } | |
1182 | lderr(store->ctx()) << __func__ << | |
1183 | ": ERROR: RGWRados::get_bucket_instance_info() returned ret=" << | |
1184 | ret << dendl; | |
1185 | return ret; | |
1186 | } | |
1187 | ||
1188 | RGWRados::Bucket target(store->getRados(), bucket_info); | |
1189 | RGWRados::Bucket::List list_op(&target); | |
1190 | ||
1191 | std::string marker; | |
1192 | list_op.params.marker = rgw_obj_key(marker); | |
1193 | list_op.params.list_versions = true; | |
1194 | list_op.params.enforce_ns = false; | |
1195 | list_op.params.allow_unordered = false; | |
1196 | list_op.params.prefix = prefix; | |
1197 | ||
1198 | bool truncated; | |
1199 | ||
1200 | std::deque<RGWRados::Object::Stat> stat_ops; | |
1201 | std::string prev_versioned_key_name = ""; | |
1202 | ||
1203 | RGWObjectCtx obj_ctx(store); | |
1204 | ||
1205 | do { | |
1206 | std::vector<rgw_bucket_dir_entry> result; | |
1207 | ||
1208 | constexpr int64_t LIST_OBJS_MAX_ENTRIES = 100; | |
1209 | ret = list_op.list_objects(LIST_OBJS_MAX_ENTRIES, &result, | |
1210 | NULL, &truncated, null_yield); | |
1211 | if (ret == -ENOENT) { | |
1212 | // race with bucket delete? | |
1213 | ret = 0; | |
1214 | break; | |
1215 | } else if (ret < 0) { | |
1216 | std::cerr << "ERROR: store->list_objects(): " << cpp_strerror(-ret) << | |
1217 | std::endl; | |
1218 | return ret; | |
1219 | } | |
1220 | ||
1221 | for (std::vector<rgw_bucket_dir_entry>::iterator iter = result.begin(); | |
1222 | iter != result.end(); | |
1223 | ++iter) { | |
1224 | rgw_bucket_dir_entry& entry = *iter; | |
1225 | ||
1226 | if (entry.key.instance.empty()) { | |
1227 | ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << dendl; | |
1228 | } else { | |
1229 | ldout(store->ctx(), 20) << "obj entry: " << entry.key.name << | |
1230 | " [" << entry.key.instance << "]" << dendl; | |
1231 | } | |
1232 | ||
1233 | ldout(store->ctx(), 20) << __func__ << ": entry.key.name=" << | |
1234 | entry.key.name << " entry.key.instance=" << entry.key.instance << | |
1235 | dendl; | |
1236 | ||
1237 | // ignore entries that are not in the filter if there is a filter | |
1238 | if (!entries_filter.empty() && | |
1239 | entries_filter.find(entry.key) == entries_filter.cend()) { | |
1240 | continue; | |
1241 | } | |
1242 | ||
1243 | // we need to do this in two cases below, so use a lambda | |
1244 | auto do_stat_key = | |
1245 | [&](const rgw_obj_key& key) -> int { | |
1246 | int ret; | |
1247 | ||
1248 | rgw_obj obj(bucket_info.bucket, key); | |
1249 | ||
1250 | RGWRados::Object op_target(store->getRados(), bucket_info, | |
1251 | obj_ctx, obj); | |
1252 | ||
1253 | stat_ops.push_back(RGWRados::Object::Stat(&op_target)); | |
1254 | RGWRados::Object::Stat& op = stat_ops.back(); | |
1255 | ||
1256 | ret = op.stat_async(); | |
1257 | if (ret < 0) { | |
1258 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << | |
1259 | cpp_strerror(-ret) << dendl; | |
1260 | return ret; | |
1261 | } | |
1262 | ||
1263 | if (stat_ops.size() >= max_concurrent_ios) { | |
1264 | ret = pop_and_handle_stat_op(obj_ctx, stat_ops); | |
1265 | if (ret < 0) { | |
1266 | if (ret != -ENOENT) { | |
1267 | lderr(store->ctx()) << | |
1268 | "ERROR: pop_and_handle_stat_op() returned error: " << | |
1269 | cpp_strerror(-ret) << dendl; | |
1270 | } | |
1271 | ||
1272 | // clear error, so we'll continue processing directory | |
1273 | ret = 0; | |
1274 | } | |
1275 | } | |
1276 | ||
1277 | return ret; | |
1278 | }; // do_stat_key lambda | |
1279 | ||
1280 | // for versioned objects, make sure the head object is handled | |
1281 | // as well by ignoring the instance identifier | |
1282 | if (!entry.key.instance.empty() && | |
1283 | entry.key.name != prev_versioned_key_name) { | |
1284 | // don't do the same key twice; even though out bucket index | |
1285 | // listing allows unordered, since all versions of an object | |
1286 | // use the same bucket index key, they'll all end up together | |
1287 | // and sorted | |
1288 | prev_versioned_key_name = entry.key.name; | |
1289 | ||
1290 | rgw_obj_key uninstanced(entry.key.name); | |
1291 | ||
1292 | ret = do_stat_key(uninstanced); | |
1293 | if (ret < 0) { | |
1294 | return ret; | |
1295 | } | |
1296 | } | |
1297 | ||
1298 | ret = do_stat_key(entry.key); | |
1299 | if (ret < 0) { | |
1300 | return ret; | |
1301 | } | |
1302 | } // for iter loop | |
1303 | } while (truncated); | |
1304 | ||
1305 | while (!stat_ops.empty()) { | |
1306 | ret = pop_and_handle_stat_op(obj_ctx, stat_ops); | |
1307 | if (ret < 0) { | |
1308 | if (ret != -ENOENT) { | |
1309 | lderr(store->ctx()) << "ERROR: stat_async() returned error: " << | |
1310 | cpp_strerror(-ret) << dendl; | |
1311 | } | |
1312 | } | |
1313 | } | |
1314 | ||
1315 | return 0; | |
1316 | } | |
1317 | ||
1318 | ||
1319 | int RGWRadosList::run() | |
1320 | { | |
1321 | int ret; | |
1322 | void* handle = nullptr; | |
1323 | ||
1324 | ret = store->ctl()->meta.mgr->list_keys_init("bucket", &handle); | |
1325 | if (ret < 0) { | |
1326 | lderr(store->ctx()) << "RGWRadosList::" << __func__ << | |
1327 | " ERROR: list_keys_init returned " << | |
1328 | cpp_strerror(-ret) << dendl; | |
1329 | return ret; | |
1330 | } | |
1331 | ||
1332 | const int max_keys = 1000; | |
1333 | bool truncated = true; | |
1334 | ||
1335 | do { | |
1336 | std::list<std::string> buckets; | |
1337 | ret = store->ctl()->meta.mgr->list_keys_next(handle, max_keys, | |
1338 | buckets, &truncated); | |
1339 | ||
1340 | for (std::string& bucket_id : buckets) { | |
1341 | ret = run(bucket_id); | |
1342 | if (ret == -ENOENT) { | |
1343 | continue; | |
1344 | } else if (ret < 0) { | |
1345 | return ret; | |
1346 | } | |
1347 | } | |
1348 | } while (truncated); | |
1349 | ||
1350 | return 0; | |
1351 | } // RGWRadosList::run() | |
1352 | ||
1353 | ||
1354 | int RGWRadosList::run(const std::string& start_bucket_name) | |
1355 | { | |
1356 | RGWSysObjectCtx sys_obj_ctx = store->svc()->sysobj->init_obj_ctx(); | |
1357 | RGWObjectCtx obj_ctx(store); | |
7f7e6c64 | 1358 | RGWBucketInfo bucket_info; |
e306af50 TL |
1359 | int ret; |
1360 | ||
1361 | add_bucket_entire(start_bucket_name); | |
1362 | ||
1363 | while (! bucket_process_map.empty()) { | |
1364 | // pop item from map and capture its key data | |
1365 | auto front = bucket_process_map.begin(); | |
1366 | std::string bucket_name = front->first; | |
1367 | process_t process; | |
1368 | std::swap(process, front->second); | |
1369 | bucket_process_map.erase(front); | |
1370 | ||
1371 | RGWBucketInfo bucket_info; | |
1372 | ret = store->getRados()->get_bucket_info(store->svc(), | |
1373 | tenant_name, | |
1374 | bucket_name, | |
1375 | bucket_info, | |
1376 | nullptr, | |
1377 | null_yield); | |
1378 | if (ret == -ENOENT) { | |
1379 | std::cerr << "WARNING: bucket " << bucket_name << | |
1380 | " does not exist; could it have been deleted very recently?" << | |
1381 | std::endl; | |
1382 | continue; | |
1383 | } else if (ret < 0) { | |
1384 | std::cerr << "ERROR: could not get info for bucket " << bucket_name << | |
1385 | " -- " << cpp_strerror(-ret) << std::endl; | |
1386 | return ret; | |
1387 | } | |
1388 | ||
1389 | const std::string bucket_id = bucket_info.bucket.get_key(); | |
1390 | ||
1391 | static const std::set<rgw_obj_key> empty_filter; | |
1392 | static const std::string empty_prefix; | |
1393 | ||
1394 | auto do_process_bucket = | |
1395 | [&bucket_id, this] | |
1396 | (const std::string& prefix, | |
1397 | const std::set<rgw_obj_key>& entries_filter) -> int { | |
1398 | int ret = process_bucket(bucket_id, prefix, entries_filter); | |
1399 | if (ret == -ENOENT) { | |
1400 | // bucket deletion race? | |
1401 | return 0; | |
1402 | } if (ret < 0) { | |
1403 | lderr(store->ctx()) << "RGWRadosList::" << __func__ << | |
1404 | ": ERROR: process_bucket(); bucket_id=" << | |
1405 | bucket_id << " returned ret=" << ret << dendl; | |
1406 | } | |
1407 | ||
1408 | return ret; | |
1409 | }; | |
1410 | ||
1411 | // either process the whole bucket *or* process the filters and/or | |
1412 | // the prefixes | |
1413 | if (process.entire_container) { | |
1414 | ret = do_process_bucket(empty_prefix, empty_filter); | |
1415 | if (ret < 0) { | |
1416 | return ret; | |
1417 | } | |
1418 | } else { | |
1419 | if (! process.filter_keys.empty()) { | |
1420 | ret = do_process_bucket(empty_prefix, process.filter_keys); | |
1421 | if (ret < 0) { | |
1422 | return ret; | |
1423 | } | |
1424 | } | |
1425 | for (const auto& p : process.prefixes) { | |
1426 | ret = do_process_bucket(p, empty_filter); | |
1427 | if (ret < 0) { | |
1428 | return ret; | |
1429 | } | |
1430 | } | |
1431 | } | |
1432 | } // while (! bucket_process_map.empty()) | |
1433 | ||
7f7e6c64 TL |
1434 | if (include_rgw_obj_name) { |
1435 | goto done; | |
1436 | } | |
1437 | ||
e306af50 TL |
1438 | // now handle incomplete multipart uploads by going back to the |
1439 | // initial bucket | |
1440 | ||
e306af50 TL |
1441 | ret = store->getRados()->get_bucket_info(store->svc(), |
1442 | tenant_name, | |
1443 | start_bucket_name, | |
1444 | bucket_info, | |
1445 | nullptr, | |
1446 | null_yield); | |
1447 | if (ret == -ENOENT) { | |
1448 | // bucket deletion race? | |
1449 | return 0; | |
1450 | } else if (ret < 0) { | |
1451 | lderr(store->ctx()) << "RGWRadosList::" << __func__ << | |
1452 | ": ERROR: get_bucket_info returned ret=" << ret << dendl; | |
1453 | return ret; | |
1454 | } | |
1455 | ||
1456 | ret = do_incomplete_multipart(store, bucket_info); | |
1457 | if (ret < 0) { | |
1458 | lderr(store->ctx()) << "RGWRadosList::" << __func__ << | |
1459 | ": ERROR: do_incomplete_multipart returned ret=" << ret << dendl; | |
1460 | return ret; | |
1461 | } | |
1462 | ||
7f7e6c64 TL |
1463 | done: |
1464 | ||
e306af50 TL |
1465 | return 0; |
1466 | } // RGWRadosList::run(string) | |
1467 | ||
1468 | ||
1469 | int RGWRadosList::do_incomplete_multipart( | |
1470 | rgw::sal::RGWRadosStore* store, | |
1471 | RGWBucketInfo& bucket_info) | |
1472 | { | |
1473 | constexpr int max_uploads = 1000; | |
1474 | constexpr int max_parts = 1000; | |
1475 | static const std::string mp_ns = RGW_OBJ_NS_MULTIPART; | |
1476 | static MultipartMetaFilter mp_filter; | |
1477 | ||
1478 | int ret; | |
1479 | ||
1480 | RGWRados::Bucket target(store->getRados(), bucket_info); | |
1481 | RGWRados::Bucket::List list_op(&target); | |
f6b5b4d7 TL |
1482 | list_op.params.ns = mp_ns; |
1483 | list_op.params.filter = &mp_filter; | |
1484 | // use empty string for initial list_op.params.marker | |
1485 | // use empty strings for list_op.params.{prefix,delim} | |
e306af50 TL |
1486 | |
1487 | bool is_listing_truncated; | |
e306af50 TL |
1488 | |
1489 | do { | |
e306af50 TL |
1490 | std::vector<rgw_bucket_dir_entry> objs; |
1491 | std::map<string, bool> common_prefixes; | |
1492 | ret = list_op.list_objects(max_uploads, &objs, &common_prefixes, | |
1493 | &is_listing_truncated, null_yield); | |
1494 | if (ret == -ENOENT) { | |
1495 | // could bucket have been removed while this is running? | |
ec96510d | 1496 | ldout(store->ctx(), 5) << "RGWRadosList::" << __func__ << |
e306af50 TL |
1497 | ": WARNING: call to list_objects of multipart namespace got ENOENT; " |
1498 | "assuming bucket removal race" << dendl; | |
1499 | break; | |
1500 | } else if (ret < 0) { | |
1501 | lderr(store->ctx()) << "RGWRadosList::" << __func__ << | |
1502 | ": ERROR: list_objects op returned ret=" << ret << dendl; | |
1503 | return ret; | |
1504 | } | |
1505 | ||
1506 | if (!objs.empty()) { | |
1507 | std::vector<RGWMultipartUploadEntry> uploads; | |
1508 | RGWMultipartUploadEntry entry; | |
1509 | for (const rgw_bucket_dir_entry& obj : objs) { | |
1510 | const rgw_obj_key& key = obj.key; | |
1511 | if (!entry.mp.from_meta(key.name)) { | |
1512 | // we only want the meta objects, so skip all the components | |
1513 | continue; | |
1514 | } | |
1515 | entry.obj = obj; | |
1516 | uploads.push_back(entry); | |
1517 | ldout(store->ctx(), 20) << "RGWRadosList::" << __func__ << | |
1518 | " processing incomplete multipart entry " << | |
1519 | entry << dendl; | |
1520 | } | |
e306af50 TL |
1521 | |
1522 | // now process the uploads vector | |
ec96510d FG |
1523 | for (const auto& upload : uploads) { |
1524 | const RGWMPObj& mp = upload.mp; | |
1525 | int parts_marker = 0; | |
1526 | bool is_parts_truncated = false; | |
e306af50 | 1527 | |
ec96510d FG |
1528 | do { // while (is_parts_truncated); |
1529 | std::map<uint32_t, RGWUploadPartInfo> parts; | |
e306af50 TL |
1530 | ret = list_multipart_parts(store, bucket_info, store->ctx(), |
1531 | mp.get_upload_id(), mp.get_meta(), | |
ec96510d FG |
1532 | max_parts, parts_marker, |
1533 | parts, &parts_marker, | |
1534 | &is_parts_truncated); | |
e306af50 | 1535 | if (ret == -ENOENT) { |
ec96510d FG |
1536 | ldout(store->ctx(), 5) << "RGWRadosList::" << __func__ << |
1537 | ": WARNING: list_multipart_parts returned ret=-ENOENT " | |
1538 | "for " << mp.get_upload_id() << ", moving on" << dendl; | |
1539 | break; | |
e306af50 TL |
1540 | } else if (ret < 0) { |
1541 | lderr(store->ctx()) << "RGWRadosList::" << __func__ << | |
ec96510d FG |
1542 | ": ERROR: list_multipart_parts returned ret=" << ret << |
1543 | dendl; | |
e306af50 TL |
1544 | return ret; |
1545 | } | |
1546 | ||
1547 | for (auto& p : parts) { | |
1548 | RGWObjManifest& manifest = p.second.manifest; | |
1549 | for (auto obj_it = manifest.obj_begin(); | |
1550 | obj_it != manifest.obj_end(); | |
1551 | ++obj_it) { | |
1552 | const rgw_raw_obj& loc = | |
1553 | obj_it.get_location().get_raw_obj(store->getRados()); | |
1554 | std::cout << loc.oid << std::endl; | |
ec96510d FG |
1555 | } // for (auto obj_it |
1556 | } // for (auto& p | |
1557 | } while (is_parts_truncated); | |
1558 | } // for (const auto& upload | |
e306af50 | 1559 | } // if objs not empty |
f6b5b4d7 | 1560 | } while (is_listing_truncated); |
e306af50 TL |
1561 | |
1562 | return 0; | |
1563 | } // RGWRadosList::do_incomplete_multipart |