]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_bucket.cc
4648d94aee18b67e46c43a1574ebafff0dd7d33f
[ceph.git] / ceph / src / rgw / rgw_bucket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5
6 #include <string>
7 #include <map>
8 #include <sstream>
9
10 #include <boost/utility/string_ref.hpp>
11 #include <boost/format.hpp>
12
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
15 #include "rgw_rados.h"
16 #include "rgw_acl.h"
17 #include "rgw_acl_s3.h"
18
19 #include "include/types.h"
20 #include "rgw_bucket.h"
21 #include "rgw_user.h"
22 #include "rgw_string.h"
23 #include "rgw_multi.h"
24
25 #include "include/rados/librados.hpp"
26 // until everything is moved from rgw_common
27 #include "rgw_common.h"
28
29 #include "cls/user/cls_user_types.h"
30
31 #define dout_context g_ceph_context
32 #define dout_subsys ceph_subsys_rgw
33
34 #define BUCKET_TAG_TIMEOUT 30
35
36 using namespace std;
37
// Metadata handlers handed to store->meta_mgr->put_entry()/remove_entry() by
// the bucket entrypoint and bucket instance helpers in this file. Both start
// out NULL; where they are assigned is not visible in this part of the file.
38 static RGWMetadataHandler *bucket_meta_handler = NULL;
39 static RGWMetadataHandler *bucket_instance_meta_handler = NULL;
40
41 // define as static when RGWBucket implementation compete
42 void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id)
43 {
44 buckets_obj_id = user_id.to_str();
45 buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX;
46 }
47
/*
 * Build the name under which a bucket is represented in RADOS pools.
 *
 * Note that this is not a reversal of parse_bucket(). That one deals
 * with the syntax we need in metadata and such. This one deals with
 * the representation in RADOS pools. We chose '/' because it's not
 * acceptable in bucket names and thus qualified buckets cannot conflict
 * with the legacy or S3 buckets.
 *
 * Returns an empty string when bucket_name is empty, the bare bucket name
 * when tenant_name is empty, and "tenant/bucket" otherwise.
 */
std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
                                       const std::string& bucket_name) {
  // Without a bucket name there is no entry, whatever the tenant is.
  if (bucket_name.empty()) {
    return std::string();
  }

  // Buckets without a tenant keep their unqualified name.
  if (tenant_name.empty()) {
    return bucket_name;
  }

  return tenant_name + "/" + bucket_name;
}
69
/*
 * Split an S3 URL bucket specifier into tenant and bucket name.
 *
 * Tenants are separated from buckets in URLs by a colon in S3.
 * This function is not to be used on Swift URLs, not even for COPY arguments.
 *
 * @param bucket       bucket specifier, either "bucket" or "tenant:bucket"
 * @param auth_tenant  tenant used when the specifier contains no colon
 * @param tenant_name  [out] resulting tenant
 * @param bucket_name  [out] resulting bucket name
 */
void rgw_parse_url_bucket(const std::string &bucket, const std::string& auth_tenant,
                          std::string &tenant_name, std::string &bucket_name) {
  // Compare against npos directly instead of narrowing the result to int.
  const std::string::size_type pos = bucket.find(':');
  if (pos == std::string::npos) {
    tenant_name = auth_tenant;
    bucket_name = bucket;
    return;
  }

  /*
   * N.B.: We allow ":bucket" syntax with explicit empty tenant in order
   * to refer to the legacy tenant, in case users in new named tenants
   * want to access old global buckets.
   */
  tenant_name = bucket.substr(0, pos);
  bucket_name = bucket.substr(pos + 1);
}
91
92 /**
93 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
94 * Returns: 0 on success, -ERR# on failure.
95 */
96 int rgw_read_user_buckets(RGWRados * store,
97 const rgw_user& user_id,
98 RGWUserBuckets& buckets,
99 const string& marker,
100 const string& end_marker,
101 uint64_t max,
102 bool need_stats,
103 bool *is_truncated,
104 uint64_t default_amount)
105 {
106 int ret;
107 buckets.clear();
108 string buckets_obj_id;
109 rgw_get_buckets_obj(user_id, buckets_obj_id);
110 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
111 list<cls_user_bucket_entry> entries;
112
113 bool truncated = false;
114 string m = marker;
115
116 uint64_t total = 0;
117
118 if (!max) {
119 max = default_amount;
120 }
121
122 do {
123 ret = store->cls_user_list_buckets(obj, m, end_marker, max - total, entries, &m, &truncated);
124 if (ret == -ENOENT)
125 ret = 0;
126
127 if (ret < 0)
128 return ret;
129
130 for (const auto& entry : entries) {
131 buckets.add(RGWBucketEnt(user_id, entry));
132 total++;
133 }
134
135 } while (truncated && total < max);
136
137 if (is_truncated != nullptr) {
138 *is_truncated = truncated;
139 }
140
141 if (need_stats) {
142 map<string, RGWBucketEnt>& m = buckets.get_buckets();
143 ret = store->update_containers_stats(m);
144 if (ret < 0 && ret != -ENOENT) {
145 ldout(store->ctx(), 0) << "ERROR: could not get stats for buckets" << dendl;
146 return ret;
147 }
148 }
149 return 0;
150 }
151
152 int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info)
153 {
154 string buckets_obj_id;
155 rgw_get_buckets_obj(user_id, buckets_obj_id);
156 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
157
158 return store->cls_user_sync_bucket_stats(obj, bucket_info);
159 }
160
161 int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name)
162 {
163 RGWBucketInfo bucket_info;
164 RGWObjectCtx obj_ctx(store);
165 int ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL);
166 if (ret < 0) {
167 ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl;
168 return ret;
169 }
170
171 ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info);
172 if (ret < 0) {
173 ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl;
174 return ret;
175 }
176
177 return 0;
178 }
179
180 int rgw_link_bucket(RGWRados *store, const rgw_user& user_id, rgw_bucket& bucket, real_time creation_time, bool update_entrypoint)
181 {
182 int ret;
183 string& tenant_name = bucket.tenant;
184 string& bucket_name = bucket.name;
185
186 cls_user_bucket_entry new_bucket;
187
188 RGWBucketEntryPoint ep;
189 RGWObjVersionTracker ot;
190
191 bucket.convert(&new_bucket.bucket);
192 new_bucket.size = 0;
193 if (real_clock::is_zero(creation_time))
194 new_bucket.creation_time = real_clock::now();
195 else
196 new_bucket.creation_time = creation_time;
197
198 map<string, bufferlist> attrs;
199 RGWObjectCtx obj_ctx(store);
200
201 if (update_entrypoint) {
202 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
203 if (ret < 0 && ret != -ENOENT) {
204 ldout(store->ctx(), 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
205 << cpp_strerror(-ret) << dendl;
206 }
207 }
208
209 string buckets_obj_id;
210 rgw_get_buckets_obj(user_id, buckets_obj_id);
211
212 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
213 ret = store->cls_user_add_bucket(obj, new_bucket);
214 if (ret < 0) {
215 ldout(store->ctx(), 0) << "ERROR: error adding bucket to directory: "
216 << cpp_strerror(-ret) << dendl;
217 goto done_err;
218 }
219
220 if (!update_entrypoint)
221 return 0;
222
223 ep.linked = true;
224 ep.owner = user_id;
225 ep.bucket = bucket;
226 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
227 if (ret < 0)
228 goto done_err;
229
230 return 0;
231 done_err:
232 int r = rgw_unlink_bucket(store, user_id, bucket.tenant, bucket.name);
233 if (r < 0) {
234 ldout(store->ctx(), 0) << "ERROR: failed unlinking bucket on error cleanup: "
235 << cpp_strerror(-r) << dendl;
236 }
237 return ret;
238 }
239
240 int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, const string& tenant_name, const string& bucket_name, bool update_entrypoint)
241 {
242 int ret;
243
244 string buckets_obj_id;
245 rgw_get_buckets_obj(user_id, buckets_obj_id);
246
247 cls_user_bucket bucket;
248 bucket.name = bucket_name;
249 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
250 ret = store->cls_user_remove_bucket(obj, bucket);
251 if (ret < 0) {
252 ldout(store->ctx(), 0) << "ERROR: error removing bucket from directory: "
253 << cpp_strerror(-ret)<< dendl;
254 }
255
256 if (!update_entrypoint)
257 return 0;
258
259 RGWBucketEntryPoint ep;
260 RGWObjVersionTracker ot;
261 map<string, bufferlist> attrs;
262 RGWObjectCtx obj_ctx(store);
263 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
264 if (ret == -ENOENT)
265 return 0;
266 if (ret < 0)
267 return ret;
268
269 if (!ep.linked)
270 return 0;
271
272 if (ep.owner != user_id) {
273 ldout(store->ctx(), 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl;
274 return -EINVAL;
275 }
276
277 ep.linked = false;
278 return store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
279 }
280
281 int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
282 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
283 real_time mtime) {
284 return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs);
285 }
286
287 int rgw_bucket_instance_store_info(RGWRados *store, string& entry, bufferlist& bl, bool exclusive,
288 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
289 real_time mtime) {
290 return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs);
291 }
292
293 int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker) {
294 return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
295 }
296
// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity
// with the existing instance[:shard] format. once we parse the shard, the / is
// replaced with a : to match the [tenant:]instance format
//
// Rewrites `key` in place: only the first '/' (if any) becomes ':'.
void rgw_bucket_instance_key_to_oid(std::string& key)
{
  const std::string::size_type slash = key.find('/');
  if (slash == std::string::npos) {
    return;  // no tenant part, nothing to rewrite
  }
  key[slash] = ':';
}
308
// convert bucket instance oids back to the tenant/ format for metadata keys.
// it's safe to parse 'tenant:' only for oids, because they won't contain the
// optional :shard at the end
//
// Rewrites `oid` in place: the first ':' becomes '/' only when a second ':'
// follows it.
void rgw_bucket_instance_oid_to_key(std::string& oid)
{
  // first ':' could be tenant:bucket or bucket:instance
  const std::string::size_type first = oid.find(':');
  if (first == std::string::npos) {
    return;
  }

  // a second ':' means the first one separated the tenant
  const bool has_tenant = oid.find(':', first + 1) != std::string::npos;
  if (has_tenant) {
    oid[first] = '/';
  }
}
323
324 int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
325 {
326 ssize_t pos = bucket_instance.rfind(':');
327 if (pos < 0) {
328 return -EINVAL;
329 }
330
331 string first = bucket_instance.substr(0, pos);
332 string second = bucket_instance.substr(pos + 1);
333
334 if (first.find(':') == string::npos) {
335 *shard_id = -1;
336 *target_bucket_instance = bucket_instance;
337 return 0;
338 }
339
340 *target_bucket_instance = first;
341 string err;
342 *shard_id = strict_strtol(second.c_str(), 10, &err);
343 if (!err.empty()) {
344 return -EINVAL;
345 }
346
347 return 0;
348 }
349
350 // parse key in format: [tenant/]name:instance[:shard_id]
351 int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
352 rgw_bucket *bucket, int *shard_id)
353 {
354 boost::string_ref name{key};
355 boost::string_ref instance;
356
357 // split tenant/name
358 auto pos = name.find('/');
359 if (pos != boost::string_ref::npos) {
360 auto tenant = name.substr(0, pos);
361 bucket->tenant.assign(tenant.begin(), tenant.end());
362 name = name.substr(pos + 1);
363 }
364
365 // split name:instance
366 pos = name.find(':');
367 if (pos != boost::string_ref::npos) {
368 instance = name.substr(pos + 1);
369 name = name.substr(0, pos);
370 }
371 bucket->name.assign(name.begin(), name.end());
372
373 // split instance:shard
374 pos = instance.find(':');
375 if (pos == boost::string_ref::npos) {
376 bucket->bucket_id.assign(instance.begin(), instance.end());
377 *shard_id = -1;
378 return 0;
379 }
380
381 // parse shard id
382 auto shard = instance.substr(pos + 1);
383 string err;
384 auto id = strict_strtol(shard.data(), 10, &err);
385 if (!err.empty()) {
386 ldout(cct, 0) << "ERROR: failed to parse bucket shard '"
387 << instance.data() << "': " << err << dendl;
388 return -EINVAL;
389 }
390
391 *shard_id = id;
392 instance = instance.substr(0, pos);
393 bucket->bucket_id.assign(instance.begin(), instance.end());
394 return 0;
395 }
396
397 int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
398 map<string, bufferlist>& attrs,
399 RGWObjVersionTracker *objv_tracker)
400 {
401 rgw_bucket& bucket = bucket_info.bucket;
402
403 if (!bucket_info.has_instance_obj) {
404 /* an old bucket object, need to convert it */
405 RGWObjectCtx obj_ctx(store);
406 int ret = store->convert_old_bucket_info(obj_ctx, bucket.tenant, bucket.name);
407 if (ret < 0) {
408 ldout(store->ctx(), 0) << "ERROR: failed converting old bucket info: " << ret << dendl;
409 return ret;
410 }
411 }
412
413 /* we want the bucket instance name without the oid prefix cruft */
414 string key = bucket.get_key();
415 bufferlist bl;
416
417 ::encode(bucket_info, bl);
418
419 return rgw_bucket_instance_store_info(store, key, bl, false, &attrs, objv_tracker, real_time());
420 }
421
422 static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
423 Formatter *f)
424 {
425 // make sure that an appropiately titled header has been opened previously
426 auto oiter = objs_to_unlink.begin();
427
428 f->open_array_section("invalid_multipart_entries");
429
430 for ( ; oiter != objs_to_unlink.end(); ++oiter) {
431 f->dump_string("object", oiter->name);
432 }
433
434 f->close_section();
435 }
436
437 void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id,
438 bool fix)
439 {
440 RGWUserBuckets user_buckets;
441 bool is_truncated = false;
442 string marker;
443
444 CephContext *cct = store->ctx();
445
446 size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
447
448 do {
449 int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker,
450 string(), max_entries, false,
451 &is_truncated);
452 if (ret < 0) {
453 ldout(store->ctx(), 0) << "failed to read user buckets: "
454 << cpp_strerror(-ret) << dendl;
455 return;
456 }
457
458 map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
459 for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
460 i != buckets.end();
461 ++i) {
462 marker = i->first;
463
464 RGWBucketEnt& bucket_ent = i->second;
465 rgw_bucket& bucket = bucket_ent.bucket;
466
467 RGWBucketInfo bucket_info;
468 real_time mtime;
469 RGWObjectCtx obj_ctx(store);
470 int r = store->get_bucket_info(obj_ctx, user_id.tenant, bucket.name, bucket_info, &mtime);
471 if (r < 0) {
472 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
473 continue;
474 }
475
476 rgw_bucket& actual_bucket = bucket_info.bucket;
477
478 if (actual_bucket.name.compare(bucket.name) != 0 ||
479 actual_bucket.tenant.compare(bucket.tenant) != 0 ||
480 actual_bucket.marker.compare(bucket.marker) != 0 ||
481 actual_bucket.bucket_id.compare(bucket.bucket_id) != 0) {
482 cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
483 if (fix) {
484 cout << "fixing" << std::endl;
485 r = rgw_link_bucket(store, user_id, actual_bucket, bucket_info.creation_time);
486 if (r < 0) {
487 cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
488 }
489 }
490 }
491 }
492 } while (is_truncated);
493 }
494
495 static bool bucket_object_check_filter(const string& oid)
496 {
497 rgw_obj_key key;
498 string ns;
499 return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
500 }
501
502 int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key)
503 {
504 RGWObjectCtx rctx(store);
505
506 if (key.instance.empty()) {
507 key.instance = "null";
508 }
509
510 rgw_obj obj(bucket, key);
511
512 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
513 }
514
515 int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children)
516 {
517 int ret;
518 map<RGWObjCategory, RGWStorageStats> stats;
519 std::vector<rgw_bucket_dir_entry> objs;
520 map<string, bool> common_prefixes;
521 RGWBucketInfo info;
522 RGWObjectCtx obj_ctx(store);
523
524 string bucket_ver, master_ver;
525
526 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
527 if (ret < 0)
528 return ret;
529
530 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
531 if (ret < 0)
532 return ret;
533
534 RGWRados::Bucket target(store, info);
535 RGWRados::Bucket::List list_op(&target);
536 CephContext *cct = store->ctx();
537 int max = 1000;
538
539 list_op.params.list_versions = true;
540
541 do {
542 objs.clear();
543
544 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
545 if (ret < 0)
546 return ret;
547
548 if (!objs.empty() && !delete_children) {
549 lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl;
550 return -ENOTEMPTY;
551 }
552
553 for (const auto& obj : objs) {
554 rgw_obj_key key(obj.key);
555 ret = rgw_remove_object(store, info, bucket, key);
556 if (ret < 0)
557 return ret;
558 }
559
560 } while (!objs.empty());
561
562 string prefix, delimiter;
563
564 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
565 if (ret < 0) {
566 return ret;
567 }
568
569 ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info);
570 if ( ret < 0) {
571 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
572 }
573
574 RGWObjVersionTracker objv_tracker;
575
576 ret = store->delete_bucket(info, objv_tracker);
577 if (ret < 0) {
578 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl;
579 return ret;
580 }
581
582 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
583 if (ret < 0) {
584 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
585 }
586
587 return ret;
588 }
589
590 static int aio_wait(librados::AioCompletion *handle)
591 {
592 librados::AioCompletion *c = (librados::AioCompletion *)handle;
593 c->wait_for_safe();
594 int ret = c->get_return_value();
595 c->release();
596 return ret;
597 }
598
599 static int drain_handles(list<librados::AioCompletion *>& pending)
600 {
601 int ret = 0;
602 while (!pending.empty()) {
603 librados::AioCompletion *handle = pending.front();
604 pending.pop_front();
605 int r = aio_wait(handle);
606 if (r < 0) {
607 ret = r;
608 }
609 }
610 return ret;
611 }
612
613 int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket,
614 int concurrent_max, bool keep_index_consistent)
615 {
616 int ret;
617 map<RGWObjCategory, RGWStorageStats> stats;
618 std::vector<rgw_bucket_dir_entry> objs;
619 map<string, bool> common_prefixes;
620 RGWBucketInfo info;
621 RGWObjectCtx obj_ctx(store);
622 CephContext *cct = store->ctx();
623
624 string bucket_ver, master_ver;
625
626 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
627 if (ret < 0)
628 return ret;
629
630 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
631 if (ret < 0)
632 return ret;
633
634 string prefix, delimiter;
635
636 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
637 if (ret < 0) {
638 return ret;
639 }
640
641 RGWRados::Bucket target(store, info);
642 RGWRados::Bucket::List list_op(&target);
643
644 list_op.params.list_versions = true;
645
646 std::list<librados::AioCompletion*> handles;
647
648 int max = 1000;
649 int max_aio = concurrent_max;
650 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
651 if (ret < 0)
652 return ret;
653
654 while (!objs.empty()) {
655 std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
656 for (; it != objs.end(); ++it) {
657 RGWObjState *astate = NULL;
658 rgw_obj obj(bucket, (*it).key);
659
660 ret = store->get_obj_state(&obj_ctx, info, obj, &astate, false);
661 if (ret == -ENOENT) {
662 dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
663 continue;
664 }
665 if (ret < 0) {
666 lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl;
667 return ret;
668 }
669
670 if (astate->has_manifest) {
671 RGWObjManifest& manifest = astate->manifest;
672 RGWObjManifest::obj_iterator miter = manifest.obj_begin();
673 rgw_obj head_obj = manifest.get_obj();
674 rgw_raw_obj raw_head_obj;
675 store->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
676
677
678 for (; miter != manifest.obj_end() && max_aio--; ++miter) {
679 if (!max_aio) {
680 ret = drain_handles(handles);
681 if (ret < 0) {
682 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
683 return ret;
684 }
685 max_aio = concurrent_max;
686 }
687
688 rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store);
689 if (last_obj == raw_head_obj) {
690 // have the head obj deleted at the end
691 continue;
692 }
693
694 ret = store->delete_raw_obj_aio(last_obj, handles);
695 if (ret < 0) {
696 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
697 return ret;
698 }
699 } // for all shadow objs
700
701 ret = store->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent);
702 if (ret < 0) {
703 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
704 return ret;
705 }
706 }
707
708 if (!max_aio) {
709 ret = drain_handles(handles);
710 if (ret < 0) {
711 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
712 return ret;
713 }
714 max_aio = concurrent_max;
715 }
716 } // for all RGW objects
717 objs.clear();
718
719 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
720 if (ret < 0)
721 return ret;
722 }
723
724 ret = drain_handles(handles);
725 if (ret < 0) {
726 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
727 return ret;
728 }
729
730 ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info);
731 if (ret < 0) {
732 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
733 }
734
735 RGWObjVersionTracker objv_tracker;
736
737 ret = rgw_bucket_delete_bucket_obj(store, bucket.tenant, bucket.name, objv_tracker);
738 if (ret < 0) {
739 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << "with ret as " << ret << dendl;
740 return ret;
741 }
742
743 if (!store->is_syncing_bucket_meta(bucket)) {
744 RGWObjVersionTracker objv_tracker;
745 string entry = bucket.get_key();
746 ret = rgw_bucket_instance_remove_entry(store, entry, &objv_tracker);
747 if (ret < 0) {
748 lderr(store->ctx()) << "ERROR: could not remove bucket instance entry" << bucket.name << "with ret as " << ret << dendl;
749 return ret;
750 }
751 }
752
753 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
754 if (ret < 0) {
755 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
756 }
757
758 return ret;
759 }
760
761 int rgw_bucket_delete_bucket_obj(RGWRados *store,
762 const string& tenant_name,
763 const string& bucket_name,
764 RGWObjVersionTracker& objv_tracker)
765 {
766 string key;
767
768 rgw_make_bucket_entry_name(tenant_name, bucket_name, key);
769 return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker);
770 }
771
/**
 * Store `msg` in `*sink`. Nothing happens when `sink` is null or `msg` is
 * empty, so an existing message is never replaced by an empty one.
 */
static void set_err_msg(std::string *sink, std::string msg)
{
  if (sink == nullptr || msg.empty()) {
    return;
  }
  *sink = msg;
}
777
778 int RGWBucket::init(RGWRados *storage, RGWBucketAdminOpState& op_state)
779 {
780 if (!storage)
781 return -EINVAL;
782
783 store = storage;
784
785 rgw_user user_id = op_state.get_user_id();
786 tenant = user_id.tenant;
787 bucket_name = op_state.get_bucket_name();
788 RGWUserBuckets user_buckets;
789 RGWObjectCtx obj_ctx(store);
790
791 if (bucket_name.empty() && user_id.empty())
792 return -EINVAL;
793
794 if (!bucket_name.empty()) {
795 int r = store->get_bucket_info(obj_ctx, tenant, bucket_name, bucket_info, NULL);
796 if (r < 0) {
797 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket_name << dendl;
798 return r;
799 }
800
801 op_state.set_bucket(bucket_info.bucket);
802 }
803
804 if (!user_id.empty()) {
805 int r = rgw_get_user_info_by_uid(store, user_id, user_info);
806 if (r < 0)
807 return r;
808
809 op_state.display_name = user_info.display_name;
810 }
811
812 clear_failure();
813 return 0;
814 }
815
816 int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg)
817 {
818 if (!op_state.is_user_op()) {
819 set_err_msg(err_msg, "empty user id");
820 return -EINVAL;
821 }
822
823 string bucket_id = op_state.get_bucket_id();
824 if (bucket_id.empty()) {
825 set_err_msg(err_msg, "empty bucket instance id");
826 return -EINVAL;
827 }
828
829 std::string display_name = op_state.get_user_display_name();
830 rgw_bucket bucket = op_state.get_bucket();
831
832 const rgw_pool& root_pool = store->get_zone_params().domain_root;
833 rgw_raw_obj obj(root_pool, bucket.name);
834 RGWObjVersionTracker objv_tracker;
835
836 map<string, bufferlist> attrs;
837 RGWBucketInfo bucket_info;
838
839 string key = bucket.name + ":" + bucket_id;
840 RGWObjectCtx obj_ctx(store);
841 int r = store->get_bucket_instance_info(obj_ctx, key, bucket_info, NULL, &attrs);
842 if (r < 0) {
843 return r;
844 }
845
846 rgw_user user_id = op_state.get_user_id();
847
848 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
849 if (aiter != attrs.end()) {
850 bufferlist aclbl = aiter->second;
851 RGWAccessControlPolicy policy;
852 ACLOwner owner;
853 try {
854 bufferlist::iterator iter = aclbl.begin();
855 ::decode(policy, iter);
856 owner = policy.get_owner();
857 } catch (buffer::error& err) {
858 set_err_msg(err_msg, "couldn't decode policy");
859 return -EIO;
860 }
861
862 r = rgw_unlink_bucket(store, owner.get_id(), bucket.tenant, bucket.name, false);
863 if (r < 0) {
864 set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
865 return r;
866 }
867
868 // now update the user for the bucket...
869 if (display_name.empty()) {
870 ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
871 }
872 policy.create_default(user_info.user_id, display_name);
873
874 owner = policy.get_owner();
875 r = store->set_bucket_owner(bucket_info.bucket, owner);
876 if (r < 0) {
877 set_err_msg(err_msg, "failed to set bucket owner: " + cpp_strerror(-r));
878 return r;
879 }
880
881 // ...and encode the acl
882 aclbl.clear();
883 policy.encode(aclbl);
884
885 r = store->system_obj_set_attr(NULL, obj, RGW_ATTR_ACL, aclbl, &objv_tracker);
886 if (r < 0) {
887 return r;
888 }
889
890 RGWAccessControlPolicy policy_instance;
891 policy_instance.create_default(user_info.user_id, display_name);
892 aclbl.clear();
893 policy_instance.encode(aclbl);
894
895 string oid_bucket_instance = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
896 rgw_raw_obj obj_bucket_instance(root_pool, oid_bucket_instance);
897 r = store->system_obj_set_attr(NULL, obj_bucket_instance, RGW_ATTR_ACL, aclbl, &objv_tracker);
898 if (r < 0) {
899 return r;
900 }
901
902 r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket, real_time());
903 if (r < 0) {
904 return r;
905 }
906 }
907
908 return 0;
909 }
910
911 int RGWBucket::unlink(RGWBucketAdminOpState& op_state, std::string *err_msg)
912 {
913 rgw_bucket bucket = op_state.get_bucket();
914
915 if (!op_state.is_user_op()) {
916 set_err_msg(err_msg, "could not fetch user or user bucket info");
917 return -EINVAL;
918 }
919
920 int r = rgw_unlink_bucket(store, user_info.user_id, bucket.tenant, bucket.name);
921 if (r < 0) {
922 set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
923 }
924
925 return r;
926 }
927
928 int RGWBucket::remove(RGWBucketAdminOpState& op_state, bool bypass_gc,
929 bool keep_index_consistent, std::string *err_msg)
930 {
931 bool delete_children = op_state.will_delete_children();
932 rgw_bucket bucket = op_state.get_bucket();
933 int ret;
934
935 if (bypass_gc) {
936 if (delete_children) {
937 ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent);
938 } else {
939 set_err_msg(err_msg, "purge objects should be set for gc to be bypassed");
940 return -EINVAL;
941 }
942 } else {
943 ret = rgw_remove_bucket(store, bucket, delete_children);
944 }
945
946 if (ret < 0) {
947 set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret));
948 return ret;
949 }
950
951 return 0;
952 }
953
954 int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg)
955 {
956 rgw_bucket bucket = op_state.get_bucket();
957 std::string object_name = op_state.get_object_name();
958
959 rgw_obj_key key(object_name);
960
961 int ret = rgw_remove_object(store, bucket_info, bucket, key);
962 if (ret < 0) {
963 set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
964 return ret;
965 }
966
967 return 0;
968 }
969
970 static void dump_bucket_index(map<string, rgw_bucket_dir_entry> result, Formatter *f)
971 {
972 map<string, rgw_bucket_dir_entry>::iterator iter;
973 for (iter = result.begin(); iter != result.end(); ++iter) {
974 f->dump_string("object", iter->first);
975 }
976 }
977
978 static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Formatter *formatter)
979 {
980 map<RGWObjCategory, RGWStorageStats>::iterator iter;
981
982 formatter->open_object_section("usage");
983 for (iter = stats.begin(); iter != stats.end(); ++iter) {
984 RGWStorageStats& s = iter->second;
985 const char *cat_name = rgw_obj_category_name(iter->first);
986 formatter->open_object_section(cat_name);
987 s.dump(formatter);
988 formatter->close_section();
989 }
990 formatter->close_section();
991 }
992
993 static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats,
994 map<RGWObjCategory, RGWStorageStats> calculated_stats,
995 Formatter *formatter)
996 {
997 formatter->open_object_section("check_result");
998 formatter->open_object_section("existing_header");
999 dump_bucket_usage(existing_stats, formatter);
1000 formatter->close_section();
1001 formatter->open_object_section("calculated_header");
1002 dump_bucket_usage(calculated_stats, formatter);
1003 formatter->close_section();
1004 formatter->close_section();
1005 }
1006
1007 int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
1008 list<rgw_obj_index_key>& objs_to_unlink, std::string *err_msg)
1009 {
1010 bool fix_index = op_state.will_fix_index();
1011 rgw_bucket bucket = op_state.get_bucket();
1012
1013 int max = 1000;
1014
1015 map<string, bool> common_prefixes;
1016
1017 bool is_truncated;
1018 map<string, bool> meta_objs;
1019 map<rgw_obj_index_key, string> all_objs;
1020
1021 RGWBucketInfo bucket_info;
1022 RGWObjectCtx obj_ctx(store);
1023 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr);
1024 if (r < 0) {
1025 ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
1026 return r;
1027 }
1028
1029 RGWRados::Bucket target(store, bucket_info);
1030 RGWRados::Bucket::List list_op(&target);
1031
1032 list_op.params.list_versions = true;
1033 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
1034
1035 do {
1036 vector<rgw_bucket_dir_entry> result;
1037 int r = list_op.list_objects(max, &result, &common_prefixes, &is_truncated);
1038 if (r < 0) {
1039 set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
1040 " err=" + cpp_strerror(-r));
1041
1042 return r;
1043 }
1044
1045 vector<rgw_bucket_dir_entry>::iterator iter;
1046 for (iter = result.begin(); iter != result.end(); ++iter) {
1047 rgw_obj_index_key key = iter->key;
1048 rgw_obj obj(bucket, key);
1049 string oid = obj.get_oid();
1050
1051 int pos = oid.find_last_of('.');
1052 if (pos < 0) {
1053 /* obj has no suffix */
1054 all_objs[key] = oid;
1055 } else {
1056 /* obj has suffix */
1057 string name = oid.substr(0, pos);
1058 string suffix = oid.substr(pos + 1);
1059
1060 if (suffix.compare("meta") == 0) {
1061 meta_objs[name] = true;
1062 } else {
1063 all_objs[key] = name;
1064 }
1065 }
1066 }
1067
1068 } while (is_truncated);
1069
1070 for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) {
1071 string& name = aiter->second;
1072
1073 if (meta_objs.find(name) == meta_objs.end()) {
1074 objs_to_unlink.push_back(aiter->first);
1075 }
1076 }
1077
1078 if (objs_to_unlink.empty())
1079 return 0;
1080
1081 if (fix_index) {
1082 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1083 if (r < 0) {
1084 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1085 cpp_strerror(-r));
1086
1087 return r;
1088 }
1089 }
1090
1091 return 0;
1092 }
1093
1094 int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
1095 RGWFormatterFlusher& flusher,
1096 std::string *err_msg)
1097 {
1098
1099 bool fix_index = op_state.will_fix_index();
1100
1101 rgw_bucket bucket = op_state.get_bucket();
1102
1103 if (!fix_index) {
1104 set_err_msg(err_msg, "check-objects flag requires fix index enabled");
1105 return -EINVAL;
1106 }
1107
1108 store->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT);
1109
1110 string prefix;
1111 rgw_obj_index_key marker;
1112 bool is_truncated = true;
1113
1114 Formatter *formatter = flusher.get_formatter();
1115 formatter->open_object_section("objects");
1116 while (is_truncated) {
1117 map<string, rgw_bucket_dir_entry> result;
1118
1119 int r = store->cls_bucket_list(bucket_info, RGW_NO_SHARD, marker, prefix, 1000, true,
1120 result, &is_truncated, &marker,
1121 bucket_object_check_filter);
1122 if (r == -ENOENT) {
1123 break;
1124 } else if (r < 0 && r != -ENOENT) {
1125 set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
1126 }
1127
1128
1129 dump_bucket_index(result, formatter);
1130 flusher.flush();
1131
1132 }
1133
1134 formatter->close_section();
1135
1136 store->cls_obj_set_bucket_tag_timeout(bucket_info, 0);
1137
1138 return 0;
1139 }
1140
1141
1142 int RGWBucket::check_index(RGWBucketAdminOpState& op_state,
1143 map<RGWObjCategory, RGWStorageStats>& existing_stats,
1144 map<RGWObjCategory, RGWStorageStats>& calculated_stats,
1145 std::string *err_msg)
1146 {
1147 rgw_bucket bucket = op_state.get_bucket();
1148 bool fix_index = op_state.will_fix_index();
1149
1150 int r = store->bucket_check_index(bucket_info, &existing_stats, &calculated_stats);
1151 if (r < 0) {
1152 set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
1153 return r;
1154 }
1155
1156 if (fix_index) {
1157 r = store->bucket_rebuild_index(bucket_info);
1158 if (r < 0) {
1159 set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
1160 return r;
1161 }
1162 }
1163
1164 return 0;
1165 }
1166
1167
1168 int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
1169 {
1170 RGWAccessControlPolicy_S3 policy(g_ceph_context);
1171 bufferlist::iterator iter = bl.begin();
1172 try {
1173 policy.decode(iter);
1174 } catch (buffer::error& err) {
1175 dout(0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
1176 return -EIO;
1177 }
1178 policy.to_xml(o);
1179 return 0;
1180 }
1181
1182 static int policy_decode(RGWRados *store, bufferlist& bl, RGWAccessControlPolicy& policy)
1183 {
1184 bufferlist::iterator iter = bl.begin();
1185 try {
1186 policy.decode(iter);
1187 } catch (buffer::error& err) {
1188 ldout(store->ctx(), 0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
1189 return -EIO;
1190 }
1191 return 0;
1192 }
1193
1194 int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy)
1195 {
1196 std::string object_name = op_state.get_object_name();
1197 rgw_bucket bucket = op_state.get_bucket();
1198 RGWObjectCtx obj_ctx(store);
1199
1200 RGWBucketInfo bucket_info;
1201 map<string, bufferlist> attrs;
1202 int ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs);
1203 if (ret < 0) {
1204 return ret;
1205 }
1206
1207 if (!object_name.empty()) {
1208 bufferlist bl;
1209 rgw_obj obj(bucket, object_name);
1210
1211 RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
1212 RGWRados::Object::Read rop(&op_target);
1213
1214 int ret = rop.get_attr(RGW_ATTR_ACL, bl);
1215 if (ret < 0)
1216 return ret;
1217
1218 return policy_decode(store, bl, policy);
1219 }
1220
1221 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
1222 if (aiter == attrs.end()) {
1223 return -ENOENT;
1224 }
1225
1226 return policy_decode(store, aiter->second, policy);
1227 }
1228
1229
1230 int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1231 RGWAccessControlPolicy& policy)
1232 {
1233 RGWBucket bucket;
1234
1235 int ret = bucket.init(store, op_state);
1236 if (ret < 0)
1237 return ret;
1238
1239 ret = bucket.get_policy(op_state, policy);
1240 if (ret < 0)
1241 return ret;
1242
1243 return 0;
1244 }
1245
1246 /* Wrappers to facilitate RESTful interface */
1247
1248
1249 int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1250 RGWFormatterFlusher& flusher)
1251 {
1252 RGWAccessControlPolicy policy(store->ctx());
1253
1254 int ret = get_policy(store, op_state, policy);
1255 if (ret < 0)
1256 return ret;
1257
1258 Formatter *formatter = flusher.get_formatter();
1259
1260 flusher.start(0);
1261
1262 formatter->open_object_section("policy");
1263 policy.dump(formatter);
1264 formatter->close_section();
1265
1266 flusher.flush();
1267
1268 return 0;
1269 }
1270
1271 int RGWBucketAdminOp::dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1272 ostream& os)
1273 {
1274 RGWAccessControlPolicy_S3 policy(store->ctx());
1275
1276 int ret = get_policy(store, op_state, policy);
1277 if (ret < 0)
1278 return ret;
1279
1280 policy.to_xml(os);
1281
1282 return 0;
1283 }
1284
1285 int RGWBucketAdminOp::unlink(RGWRados *store, RGWBucketAdminOpState& op_state)
1286 {
1287 RGWBucket bucket;
1288
1289 int ret = bucket.init(store, op_state);
1290 if (ret < 0)
1291 return ret;
1292
1293 return bucket.unlink(op_state);
1294 }
1295
1296 int RGWBucketAdminOp::link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err)
1297 {
1298 RGWBucket bucket;
1299
1300 int ret = bucket.init(store, op_state);
1301 if (ret < 0)
1302 return ret;
1303
1304 return bucket.link(op_state, err);
1305
1306 }
1307
1308 int RGWBucketAdminOp::check_index(RGWRados *store, RGWBucketAdminOpState& op_state,
1309 RGWFormatterFlusher& flusher)
1310 {
1311 int ret;
1312 map<string, rgw_bucket_dir_entry> result;
1313 map<RGWObjCategory, RGWStorageStats> existing_stats;
1314 map<RGWObjCategory, RGWStorageStats> calculated_stats;
1315 list<rgw_obj_index_key> objs_to_unlink;
1316
1317 RGWBucket bucket;
1318
1319 ret = bucket.init(store, op_state);
1320 if (ret < 0)
1321 return ret;
1322
1323 Formatter *formatter = flusher.get_formatter();
1324 flusher.start(0);
1325
1326 ret = bucket.check_bad_index_multipart(op_state, objs_to_unlink);
1327 if (ret < 0)
1328 return ret;
1329
1330 dump_mulipart_index_results(objs_to_unlink, formatter);
1331 flusher.flush();
1332
1333 ret = bucket.check_object_index(op_state, flusher);
1334 if (ret < 0)
1335 return ret;
1336
1337 ret = bucket.check_index(op_state, existing_stats, calculated_stats);
1338 if (ret < 0)
1339 return ret;
1340
1341 dump_index_check(existing_stats, calculated_stats, formatter);
1342 flusher.flush();
1343
1344 return 0;
1345 }
1346
1347 int RGWBucketAdminOp::remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state,
1348 bool bypass_gc, bool keep_index_consistent)
1349 {
1350 RGWBucket bucket;
1351
1352 int ret = bucket.init(store, op_state);
1353 if (ret < 0)
1354 return ret;
1355
1356 std::string err_msg;
1357 ret = bucket.remove(op_state, bypass_gc, keep_index_consistent, &err_msg);
1358 if (!err_msg.empty()) {
1359 lderr(store->ctx()) << "ERROR: " << err_msg << dendl;
1360 }
1361 return ret;
1362 }
1363
1364 int RGWBucketAdminOp::remove_object(RGWRados *store, RGWBucketAdminOpState& op_state)
1365 {
1366 RGWBucket bucket;
1367
1368 int ret = bucket.init(store, op_state);
1369 if (ret < 0)
1370 return ret;
1371
1372 return bucket.remove_object(op_state);
1373 }
1374
1375 static int bucket_stats(RGWRados *store, const std::string& tenant_name, std::string& bucket_name, Formatter *formatter)
1376 {
1377 RGWBucketInfo bucket_info;
1378 map<RGWObjCategory, RGWStorageStats> stats;
1379
1380 real_time mtime;
1381 RGWObjectCtx obj_ctx(store);
1382 int r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, &mtime);
1383 if (r < 0)
1384 return r;
1385
1386 rgw_bucket& bucket = bucket_info.bucket;
1387
1388 string bucket_ver, master_ver;
1389 string max_marker;
1390 int ret = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
1391 if (ret < 0) {
1392 cerr << "error getting bucket stats ret=" << ret << std::endl;
1393 return ret;
1394 }
1395
1396 utime_t ut(mtime);
1397
1398 formatter->open_object_section("stats");
1399 formatter->dump_string("bucket", bucket.name);
1400 formatter->dump_string("zonegroup", bucket_info.zonegroup);
1401 formatter->dump_string("placement_rule", bucket_info.placement_rule);
1402 ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
1403 formatter->dump_string("id", bucket.bucket_id);
1404 formatter->dump_string("marker", bucket.marker);
1405 formatter->dump_stream("index_type") << bucket_info.index_type;
1406 ::encode_json("owner", bucket_info.owner, formatter);
1407 formatter->dump_string("ver", bucket_ver);
1408 formatter->dump_string("master_ver", master_ver);
1409 formatter->dump_stream("mtime") << ut;
1410 formatter->dump_string("max_marker", max_marker);
1411 dump_bucket_usage(stats, formatter);
1412 encode_json("bucket_quota", bucket_info.quota, formatter);
1413 formatter->close_section();
1414
1415 return 0;
1416 }
1417
1418 int RGWBucketAdminOp::limit_check(RGWRados *store,
1419 RGWBucketAdminOpState& op_state,
1420 const std::list<std::string>& user_ids,
1421 RGWFormatterFlusher& flusher,
1422 bool warnings_only)
1423 {
1424 int ret = 0;
1425 const size_t max_entries =
1426 store->ctx()->_conf->rgw_list_buckets_max_chunk;
1427
1428 const size_t safe_max_objs_per_shard =
1429 store->ctx()->_conf->rgw_safe_max_objects_per_shard;
1430
1431 uint16_t shard_warn_pct =
1432 store->ctx()->_conf->rgw_shard_warning_threshold;
1433 if (shard_warn_pct > 100)
1434 shard_warn_pct = 90;
1435
1436 Formatter *formatter = flusher.get_formatter();
1437 flusher.start(0);
1438
1439 formatter->open_array_section("users");
1440
1441 for (const auto& user_id : user_ids) {
1442 formatter->open_object_section("user");
1443 formatter->dump_string("user_id", user_id);
1444 bool done;
1445 formatter->open_array_section("buckets");
1446 do {
1447 RGWUserBuckets buckets;
1448 string marker;
1449 bool is_truncated;
1450
1451 ret = rgw_read_user_buckets(store, user_id, buckets,
1452 marker, string(), max_entries, false,
1453 &is_truncated);
1454 if (ret < 0)
1455 return ret;
1456
1457 map<string, RGWBucketEnt>& m_buckets = buckets.get_buckets();
1458
1459 for (const auto& iter : m_buckets) {
1460 auto& bucket = iter.second.bucket;
1461 uint32_t num_shards = 1;
1462 uint64_t num_objects = 0;
1463
1464 /* need info for num_shards */
1465 RGWBucketInfo info;
1466 RGWObjectCtx obj_ctx(store);
1467
1468 marker = bucket.name; /* Casey's location for marker update,
1469 * as we may now not reach the end of
1470 * the loop body */
1471
1472 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name,
1473 info, nullptr);
1474 if (ret < 0)
1475 continue;
1476
1477 /* need stats for num_entries */
1478 string bucket_ver, master_ver;
1479 std::map<RGWObjCategory, RGWStorageStats> stats;
1480 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver,
1481 &master_ver, stats, nullptr);
1482
1483 if (ret < 0)
1484 continue;
1485
1486 for (const auto& s : stats) {
1487 num_objects += s.second.num_objects;
1488 }
1489
1490 num_shards = info.num_shards;
1491 uint64_t objs_per_shard =
1492 (num_shards) ? num_objects/num_shards : num_objects;
1493 {
1494 bool warn = false;
1495 stringstream ss;
1496 if (objs_per_shard > safe_max_objs_per_shard) {
1497 double over =
1498 100 - (safe_max_objs_per_shard/objs_per_shard * 100);
1499 ss << boost::format("OVER %4f%%") % over;
1500 warn = true;
1501 } else {
1502 double fill_pct =
1503 objs_per_shard / safe_max_objs_per_shard * 100;
1504 if (fill_pct >= shard_warn_pct) {
1505 ss << boost::format("WARN %4f%%") % fill_pct;
1506 warn = true;
1507 } else {
1508 ss << "OK";
1509 }
1510 }
1511
1512 if (warn || (! warnings_only)) {
1513 formatter->open_object_section("bucket");
1514 formatter->dump_string("bucket", bucket.name);
1515 formatter->dump_string("tenant", bucket.tenant);
1516 formatter->dump_int("num_objects", num_objects);
1517 formatter->dump_int("num_shards", num_shards);
1518 formatter->dump_int("objects_per_shard", objs_per_shard);
1519 formatter->dump_string("fill_status", ss.str());
1520 formatter->close_section();
1521 }
1522 }
1523 }
1524
1525 done = (m_buckets.size() < max_entries);
1526 } while (!done); /* foreach: bucket */
1527
1528 formatter->close_section();
1529 formatter->close_section();
1530 formatter->flush(cout);
1531
1532 } /* foreach: user_id */
1533
1534 formatter->close_section();
1535 formatter->flush(cout);
1536
1537 return ret;
1538 } /* RGWBucketAdminOp::limit_check */
1539
1540 int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
1541 RGWFormatterFlusher& flusher)
1542 {
1543 RGWBucket bucket;
1544 int ret;
1545
1546 string bucket_name = op_state.get_bucket_name();
1547
1548 if (!bucket_name.empty()) {
1549 ret = bucket.init(store, op_state);
1550 if (ret < 0)
1551 return ret;
1552 }
1553
1554 Formatter *formatter = flusher.get_formatter();
1555 flusher.start(0);
1556
1557 CephContext *cct = store->ctx();
1558
1559 const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
1560
1561 bool show_stats = op_state.will_fetch_stats();
1562 rgw_user user_id = op_state.get_user_id();
1563 if (op_state.is_user_op()) {
1564 formatter->open_array_section("buckets");
1565
1566 RGWUserBuckets buckets;
1567 string marker;
1568 bool is_truncated = false;
1569
1570 do {
1571 ret = rgw_read_user_buckets(store, op_state.get_user_id(), buckets,
1572 marker, string(), max_entries, false,
1573 &is_truncated);
1574 if (ret < 0)
1575 return ret;
1576
1577 map<string, RGWBucketEnt>& m = buckets.get_buckets();
1578 map<string, RGWBucketEnt>::iterator iter;
1579
1580 for (iter = m.begin(); iter != m.end(); ++iter) {
1581 std::string obj_name = iter->first;
1582 if (show_stats)
1583 bucket_stats(store, user_id.tenant, obj_name, formatter);
1584 else
1585 formatter->dump_string("bucket", obj_name);
1586
1587 marker = obj_name;
1588 }
1589
1590 flusher.flush();
1591 } while (is_truncated);
1592
1593 formatter->close_section();
1594 } else if (!bucket_name.empty()) {
1595 bucket_stats(store, user_id.tenant, bucket_name, formatter);
1596 } else {
1597 RGWAccessHandle handle;
1598
1599 formatter->open_array_section("buckets");
1600 if (store->list_buckets_init(&handle) >= 0) {
1601 rgw_bucket_dir_entry obj;
1602 while (store->list_buckets_next(obj, &handle) >= 0) {
1603 if (show_stats)
1604 bucket_stats(store, user_id.tenant, obj.key.name, formatter);
1605 else
1606 formatter->dump_string("bucket", obj.key.name);
1607 }
1608 }
1609
1610 formatter->close_section();
1611 }
1612
1613 flusher.flush();
1614
1615 return 0;
1616 }
1617
1618
1619 void rgw_data_change::dump(Formatter *f) const
1620 {
1621 string type;
1622 switch (entity_type) {
1623 case ENTITY_TYPE_BUCKET:
1624 type = "bucket";
1625 break;
1626 default:
1627 type = "unknown";
1628 }
1629 encode_json("entity_type", type, f);
1630 encode_json("key", key, f);
1631 utime_t ut(timestamp);
1632 encode_json("timestamp", ut, f);
1633 }
1634
1635 void rgw_data_change::decode_json(JSONObj *obj) {
1636 string s;
1637 JSONDecoder::decode_json("entity_type", s, obj);
1638 if (s == "bucket") {
1639 entity_type = ENTITY_TYPE_BUCKET;
1640 } else {
1641 entity_type = ENTITY_TYPE_UNKNOWN;
1642 }
1643 JSONDecoder::decode_json("key", key, obj);
1644 utime_t ut;
1645 JSONDecoder::decode_json("timestamp", ut, obj);
1646 timestamp = ut.to_real_time();
1647 }
1648
1649 void rgw_data_change_log_entry::dump(Formatter *f) const
1650 {
1651 encode_json("log_id", log_id, f);
1652 utime_t ut(log_timestamp);
1653 encode_json("log_timestamp", ut, f);
1654 encode_json("entry", entry, f);
1655 }
1656
1657 void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
1658 JSONDecoder::decode_json("log_id", log_id, obj);
1659 utime_t ut;
1660 JSONDecoder::decode_json("log_timestamp", ut, obj);
1661 log_timestamp = ut.to_real_time();
1662 JSONDecoder::decode_json("entry", entry, obj);
1663 }
1664
1665 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
1666 const string& name = bs.bucket.name;
1667 int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
1668 uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
1669
1670 return (int)r;
1671 }
1672
1673 int RGWDataChangesLog::renew_entries()
1674 {
1675 if (!store->need_to_log_data())
1676 return 0;
1677
1678 /* we can't keep the bucket name as part of the cls_log_entry, and we need
1679 * it later, so we keep two lists under the map */
1680 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
1681
1682 lock.Lock();
1683 map<rgw_bucket_shard, bool> entries;
1684 entries.swap(cur_cycle);
1685 lock.Unlock();
1686
1687 map<rgw_bucket_shard, bool>::iterator iter;
1688 string section;
1689 real_time ut = real_clock::now();
1690 for (iter = entries.begin(); iter != entries.end(); ++iter) {
1691 const rgw_bucket_shard& bs = iter->first;
1692
1693 int index = choose_oid(bs);
1694
1695 cls_log_entry entry;
1696
1697 rgw_data_change change;
1698 bufferlist bl;
1699 change.entity_type = ENTITY_TYPE_BUCKET;
1700 change.key = bs.get_key();
1701 change.timestamp = ut;
1702 ::encode(change, bl);
1703
1704 store->time_log_prepare_entry(entry, ut, section, change.key, bl);
1705
1706 m[index].first.push_back(bs);
1707 m[index].second.emplace_back(std::move(entry));
1708 }
1709
1710 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
1711 for (miter = m.begin(); miter != m.end(); ++miter) {
1712 list<cls_log_entry>& entries = miter->second.second;
1713
1714 real_time now = real_clock::now();
1715
1716 int ret = store->time_log_add(oids[miter->first], entries, NULL);
1717 if (ret < 0) {
1718 /* we don't really need to have a special handling for failed cases here,
1719 * as this is just an optimization. */
1720 lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
1721 return ret;
1722 }
1723
1724 real_time expiration = now;
1725 expiration += make_timespan(cct->_conf->rgw_data_log_window);
1726
1727 list<rgw_bucket_shard>& buckets = miter->second.first;
1728 list<rgw_bucket_shard>::iterator liter;
1729 for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
1730 update_renewed(*liter, expiration);
1731 }
1732 }
1733
1734 return 0;
1735 }
1736
1737 void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
1738 {
1739 assert(lock.is_locked());
1740 if (!changes.find(bs, status)) {
1741 status = ChangeStatusPtr(new ChangeStatus);
1742 changes.add(bs, status);
1743 }
1744 }
1745
1746 void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
1747 {
1748 Mutex::Locker l(lock);
1749 cur_cycle[bs] = true;
1750 }
1751
1752 void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
1753 {
1754 Mutex::Locker l(lock);
1755 ChangeStatusPtr status;
1756 _get_change(bs, status);
1757
1758 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
1759 status->cur_expiration = expiration;
1760 }
1761
1762 int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
1763 rgw_bucket_shard bs(bucket, shard_id);
1764
1765 return choose_oid(bs);
1766 }
1767
1768 int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
1769 if (!store->need_to_log_data())
1770 return 0;
1771
1772 rgw_bucket_shard bs(bucket, shard_id);
1773
1774 int index = choose_oid(bs);
1775 mark_modified(index, bs);
1776
1777 lock.Lock();
1778
1779 ChangeStatusPtr status;
1780 _get_change(bs, status);
1781
1782 lock.Unlock();
1783
1784 real_time now = real_clock::now();
1785
1786 status->lock->Lock();
1787
1788 ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
1789
1790 if (now < status->cur_expiration) {
1791 /* no need to send, recently completed */
1792 status->lock->Unlock();
1793
1794 register_renew(bs);
1795 return 0;
1796 }
1797
1798 RefCountedCond *cond;
1799
1800 if (status->pending) {
1801 cond = status->cond;
1802
1803 assert(cond);
1804
1805 status->cond->get();
1806 status->lock->Unlock();
1807
1808 int ret = cond->wait();
1809 cond->put();
1810 if (!ret) {
1811 register_renew(bs);
1812 }
1813 return ret;
1814 }
1815
1816 status->cond = new RefCountedCond;
1817 status->pending = true;
1818
1819 string& oid = oids[index];
1820 real_time expiration;
1821
1822 int ret;
1823
1824 do {
1825 status->cur_sent = now;
1826
1827 expiration = now;
1828 expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
1829
1830 status->lock->Unlock();
1831
1832 bufferlist bl;
1833 rgw_data_change change;
1834 change.entity_type = ENTITY_TYPE_BUCKET;
1835 change.key = bs.get_key();
1836 change.timestamp = now;
1837 ::encode(change, bl);
1838 string section;
1839
1840 ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
1841
1842 ret = store->time_log_add(oid, now, section, change.key, bl);
1843
1844 now = real_clock::now();
1845
1846 status->lock->Lock();
1847
1848 } while (!ret && real_clock::now() > expiration);
1849
1850 cond = status->cond;
1851
1852 status->pending = false;
1853 status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
1854 status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
1855 status->cond = NULL;
1856 status->lock->Unlock();
1857
1858 cond->done(ret);
1859 cond->put();
1860
1861 return ret;
1862 }
1863
1864 int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
1865 list<rgw_data_change_log_entry>& entries,
1866 const string& marker,
1867 string *out_marker,
1868 bool *truncated) {
1869 if (shard >= num_shards)
1870 return -EINVAL;
1871
1872 list<cls_log_entry> log_entries;
1873
1874 int ret = store->time_log_list(oids[shard], start_time, end_time,
1875 max_entries, log_entries, marker,
1876 out_marker, truncated);
1877 if (ret < 0)
1878 return ret;
1879
1880 list<cls_log_entry>::iterator iter;
1881 for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
1882 rgw_data_change_log_entry log_entry;
1883 log_entry.log_id = iter->id;
1884 real_time rt = iter->timestamp.to_real_time();
1885 log_entry.log_timestamp = rt;
1886 bufferlist::iterator liter = iter->data.begin();
1887 try {
1888 ::decode(log_entry.entry, liter);
1889 } catch (buffer::error& err) {
1890 lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
1891 return -EIO;
1892 }
1893 entries.push_back(log_entry);
1894 }
1895
1896 return 0;
1897 }
1898
1899 int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
1900 list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
1901 bool truncated;
1902 entries.clear();
1903
1904 for (; marker.shard < num_shards && (int)entries.size() < max_entries;
1905 marker.shard++, marker.marker.clear()) {
1906 int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
1907 marker.marker, NULL, &truncated);
1908 if (ret == -ENOENT) {
1909 continue;
1910 }
1911 if (ret < 0) {
1912 return ret;
1913 }
1914 if (truncated) {
1915 *ptruncated = true;
1916 return 0;
1917 }
1918 }
1919
1920 *ptruncated = (marker.shard < num_shards);
1921
1922 return 0;
1923 }
1924
1925 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
1926 {
1927 if (shard_id >= num_shards)
1928 return -EINVAL;
1929
1930 string oid = oids[shard_id];
1931
1932 cls_log_header header;
1933
1934 int ret = store->time_log_info(oid, &header);
1935 if ((ret < 0) && (ret != -ENOENT))
1936 return ret;
1937
1938 info->marker = header.max_marker;
1939 info->last_update = header.max_time.to_real_time();
1940
1941 return 0;
1942 }
1943
1944 int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
1945 const string& start_marker, const string& end_marker)
1946 {
1947 int ret;
1948
1949 if (shard_id > num_shards)
1950 return -EINVAL;
1951
1952 ret = store->time_log_trim(oids[shard_id], start_time, end_time, start_marker, end_marker);
1953
1954 if (ret == -ENOENT || ret == -ENODATA)
1955 ret = 0;
1956
1957 return ret;
1958 }
1959
1960 int RGWDataChangesLog::trim_entries(const real_time& start_time, const real_time& end_time,
1961 const string& start_marker, const string& end_marker)
1962 {
1963 for (int shard = 0; shard < num_shards; shard++) {
1964 int ret = store->time_log_trim(oids[shard], start_time, end_time, start_marker, end_marker);
1965 if (ret == -ENOENT || ret == -ENODATA) {
1966 continue;
1967 }
1968 if (ret < 0)
1969 return ret;
1970 }
1971
1972 return 0;
1973 }
1974
1975 bool RGWDataChangesLog::going_down()
1976 {
1977 return down_flag;
1978 }
1979
1980 RGWDataChangesLog::~RGWDataChangesLog() {
1981 down_flag = true;
1982 renew_thread->stop();
1983 renew_thread->join();
1984 delete renew_thread;
1985 delete[] oids;
1986 }
1987
1988 void *RGWDataChangesLog::ChangesRenewThread::entry() {
1989 do {
1990 dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
1991 int r = log->renew_entries();
1992 if (r < 0) {
1993 dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
1994 }
1995
1996 if (log->going_down())
1997 break;
1998
1999 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
2000 lock.Lock();
2001 cond.WaitInterval(lock, utime_t(interval, 0));
2002 lock.Unlock();
2003 } while (!log->going_down());
2004
2005 return NULL;
2006 }
2007
2008 void RGWDataChangesLog::ChangesRenewThread::stop()
2009 {
2010 Mutex::Locker l(lock);
2011 cond.Signal();
2012 }
2013
2014 void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
2015 {
2016 auto key = bs.get_key();
2017 modified_lock.get_read();
2018 map<int, set<string> >::iterator iter = modified_shards.find(shard_id);
2019 if (iter != modified_shards.end()) {
2020 set<string>& keys = iter->second;
2021 if (keys.find(key) != keys.end()) {
2022 modified_lock.unlock();
2023 return;
2024 }
2025 }
2026 modified_lock.unlock();
2027
2028 RWLock::WLocker wl(modified_lock);
2029 modified_shards[shard_id].insert(key);
2030 }
2031
2032 void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
2033 {
2034 RWLock::WLocker wl(modified_lock);
2035 modified.swap(modified_shards);
2036 modified_shards.clear();
2037 }
2038
2039 void RGWBucketCompleteInfo::dump(Formatter *f) const {
2040 encode_json("bucket_info", info, f);
2041 encode_json("attrs", attrs, f);
2042 }
2043
2044 void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
2045 JSONDecoder::decode_json("bucket_info", info, obj);
2046 JSONDecoder::decode_json("attrs", attrs, obj);
2047 }
2048
2049 class RGWBucketMetadataHandler : public RGWMetadataHandler {
2050
2051 public:
2052 string get_type() override { return "bucket"; }
2053
2054 int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
2055 RGWObjVersionTracker ot;
2056 RGWBucketEntryPoint be;
2057
2058 real_time mtime;
2059 map<string, bufferlist> attrs;
2060 RGWObjectCtx obj_ctx(store);
2061
2062 string tenant_name, bucket_name;
2063 parse_bucket(entry, &tenant_name, &bucket_name);
2064 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &ot, &mtime, &attrs);
2065 if (ret < 0)
2066 return ret;
2067
2068 RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime);
2069
2070 *obj = mdo;
2071
2072 return 0;
2073 }
2074
2075 int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2076 real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2077 RGWBucketEntryPoint be, old_be;
2078 try {
2079 decode_json_obj(be, obj);
2080 } catch (JSONDecoder::err& e) {
2081 return -EINVAL;
2082 }
2083
2084 real_time orig_mtime;
2085 map<string, bufferlist> attrs;
2086
2087 RGWObjVersionTracker old_ot;
2088 RGWObjectCtx obj_ctx(store);
2089
2090 string tenant_name, bucket_name;
2091 parse_bucket(entry, &tenant_name, &bucket_name);
2092 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs);
2093 if (ret < 0 && ret != -ENOENT)
2094 return ret;
2095
2096 // are we actually going to perform this put, or is it too old?
2097 if (ret != -ENOENT &&
2098 !check_versions(old_ot.read_version, orig_mtime,
2099 objv_tracker.write_version, mtime, sync_type)) {
2100 return STATUS_NO_APPLY;
2101 }
2102
2103 objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */
2104
2105 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs);
2106 if (ret < 0)
2107 return ret;
2108
2109 /* link bucket */
2110 if (be.linked) {
2111 ret = rgw_link_bucket(store, be.owner, be.bucket, be.creation_time, false);
2112 } else {
2113 ret = rgw_unlink_bucket(store, be.owner, be.bucket.tenant, be.bucket.name, false);
2114 }
2115
2116 return ret;
2117 }
2118
2119 struct list_keys_info {
2120 RGWRados *store;
2121 RGWListRawObjsCtx ctx;
2122 };
2123
2124 int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2125 RGWBucketEntryPoint be;
2126 RGWObjectCtx obj_ctx(store);
2127
2128 string tenant_name, bucket_name;
2129 parse_bucket(entry, &tenant_name, &bucket_name);
2130 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL);
2131 if (ret < 0)
2132 return ret;
2133
2134 /*
2135 * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
2136 * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
2137 * will incorrectly fail.
2138 */
2139 ret = rgw_unlink_bucket(store, be.owner, tenant_name, bucket_name, false);
2140 if (ret < 0) {
2141 lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2142 }
2143
2144 ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker);
2145 if (ret < 0) {
2146 lderr(store->ctx()) << "could not delete bucket=" << entry << dendl;
2147 }
2148 /* idempotent */
2149 return 0;
2150 }
2151
2152 void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2153 oid = key;
2154 pool = store->get_zone_params().domain_root;
2155 }
2156
2157 int list_keys_init(RGWRados *store, void **phandle) override
2158 {
2159 list_keys_info *info = new list_keys_info;
2160
2161 info->store = store;
2162
2163 *phandle = (void *)info;
2164
2165 return 0;
2166 }
2167
2168 int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2169 list_keys_info *info = static_cast<list_keys_info *>(handle);
2170
2171 string no_filter;
2172
2173 keys.clear();
2174
2175 RGWRados *store = info->store;
2176
2177 list<string> unfiltered_keys;
2178
2179 int ret = store->list_raw_objects(store->get_zone_params().domain_root, no_filter,
2180 max, info->ctx, unfiltered_keys, truncated);
2181 if (ret < 0 && ret != -ENOENT)
2182 return ret;
2183 if (ret == -ENOENT) {
2184 if (truncated)
2185 *truncated = false;
2186 return 0;
2187 }
2188
2189 // now filter out the system entries
2190 list<string>::iterator iter;
2191 for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2192 string& k = *iter;
2193
2194 if (k[0] != '.') {
2195 keys.push_back(k);
2196 }
2197 }
2198
2199 return 0;
2200 }
2201
2202 void list_keys_complete(void *handle) override {
2203 list_keys_info *info = static_cast<list_keys_info *>(handle);
2204 delete info;
2205 }
2206 };
2207
2208 class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler {
2209
2210 public:
2211 string get_type() override { return "bucket.instance"; }
2212
2213 int get(RGWRados *store, string& oid, RGWMetadataObject **obj) override {
2214 RGWBucketCompleteInfo bci;
2215
2216 real_time mtime;
2217 RGWObjectCtx obj_ctx(store);
2218
2219 int ret = store->get_bucket_instance_info(obj_ctx, oid, bci.info, &mtime, &bci.attrs);
2220 if (ret < 0)
2221 return ret;
2222
2223 RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime);
2224
2225 *obj = mdo;
2226
2227 return 0;
2228 }
2229
2230 int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2231 real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2232 RGWBucketCompleteInfo bci, old_bci;
2233 try {
2234 decode_json_obj(bci, obj);
2235 } catch (JSONDecoder::err& e) {
2236 return -EINVAL;
2237 }
2238
2239 real_time orig_mtime;
2240 RGWObjectCtx obj_ctx(store);
2241
2242 int ret = store->get_bucket_instance_info(obj_ctx, entry, old_bci.info,
2243 &orig_mtime, &old_bci.attrs);
2244 bool exists = (ret != -ENOENT);
2245 if (ret < 0 && exists)
2246 return ret;
2247
2248 if (!exists || old_bci.info.bucket.bucket_id != bci.info.bucket.bucket_id) {
2249 /* a new bucket, we need to select a new bucket placement for it */
2250 auto key(entry);
2251 rgw_bucket_instance_oid_to_key(key);
2252 string tenant_name;
2253 string bucket_name;
2254 string bucket_instance;
2255 parse_bucket(key, &tenant_name, &bucket_name, &bucket_instance);
2256
2257 RGWZonePlacementInfo rule_info;
2258 bci.info.bucket.name = bucket_name;
2259 bci.info.bucket.bucket_id = bucket_instance;
2260 bci.info.bucket.tenant = tenant_name;
2261 ret = store->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info);
2262 if (ret < 0) {
2263 ldout(store->ctx(), 0) << "ERROR: select_bucket_placement() returned " << ret << dendl;
2264 return ret;
2265 }
2266 bci.info.index_type = rule_info.index_type;
2267 } else {
2268 /* existing bucket, keep its placement */
2269 bci.info.bucket.explicit_placement = old_bci.info.bucket.explicit_placement;
2270 bci.info.placement_rule = old_bci.info.placement_rule;
2271 }
2272
2273 if (exists && old_bci.info.datasync_flag_enabled() != bci.info.datasync_flag_enabled()) {
2274 int shards_num = bci.info.num_shards? bci.info.num_shards : 1;
2275 int shard_id = bci.info.num_shards? 0 : -1;
2276
2277 if (!bci.info.datasync_flag_enabled()) {
2278 ret = store->stop_bi_log_entries(bci.info, -1);
2279 if (ret < 0) {
2280 lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2281 return ret;
2282 }
2283 } else {
2284 ret = store->resync_bi_log_entries(bci.info, -1);
2285 if (ret < 0) {
2286 lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2287 return ret;
2288 }
2289 }
2290
2291 for (int i = 0; i < shards_num; ++i, ++shard_id) {
2292 ret = store->data_log->add_entry(bci.info.bucket, shard_id);
2293 if (ret < 0) {
2294 lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
2295 return ret;
2296 }
2297 }
2298 }
2299
2300 // are we actually going to perform this put, or is it too old?
2301 if (exists &&
2302 !check_versions(old_bci.info.objv_tracker.read_version, orig_mtime,
2303 objv_tracker.write_version, mtime, sync_type)) {
2304 objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2305 return STATUS_NO_APPLY;
2306 }
2307
2308 /* record the read version (if any), store the new version */
2309 bci.info.objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2310 bci.info.objv_tracker.write_version = objv_tracker.write_version;
2311
2312 ret = store->put_bucket_instance_info(bci.info, false, mtime, &bci.attrs);
2313 if (ret < 0)
2314 return ret;
2315
2316 objv_tracker = bci.info.objv_tracker;
2317
2318 ret = store->init_bucket_index(bci.info, bci.info.num_shards);
2319 if (ret < 0)
2320 return ret;
2321
2322 return STATUS_APPLIED;
2323 }
2324
2325 struct list_keys_info {
2326 RGWRados *store;
2327 RGWListRawObjsCtx ctx;
2328 };
2329
2330 int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2331 RGWBucketInfo info;
2332 RGWObjectCtx obj_ctx(store);
2333
2334 int ret = store->get_bucket_instance_info(obj_ctx, entry, info, NULL, NULL);
2335 if (ret < 0 && ret != -ENOENT)
2336 return ret;
2337
2338 return rgw_bucket_instance_remove_entry(store, entry, &info.objv_tracker);
2339 }
2340
2341 void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2342 oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
2343 rgw_bucket_instance_key_to_oid(oid);
2344 pool = store->get_zone_params().domain_root;
2345 }
2346
2347 int list_keys_init(RGWRados *store, void **phandle) override
2348 {
2349 list_keys_info *info = new list_keys_info;
2350
2351 info->store = store;
2352
2353 *phandle = (void *)info;
2354
2355 return 0;
2356 }
2357
2358 int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2359 list_keys_info *info = static_cast<list_keys_info *>(handle);
2360
2361 string no_filter;
2362
2363 keys.clear();
2364
2365 RGWRados *store = info->store;
2366
2367 list<string> unfiltered_keys;
2368
2369 int ret = store->list_raw_objects(store->get_zone_params().domain_root, no_filter,
2370 max, info->ctx, unfiltered_keys, truncated);
2371 if (ret < 0 && ret != -ENOENT)
2372 return ret;
2373 if (ret == -ENOENT) {
2374 if (truncated)
2375 *truncated = false;
2376 return 0;
2377 }
2378
2379 constexpr int prefix_size = sizeof(RGW_BUCKET_INSTANCE_MD_PREFIX) - 1;
2380 // now filter in the relevant entries
2381 list<string>::iterator iter;
2382 for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2383 string& k = *iter;
2384
2385 if (k.compare(0, prefix_size, RGW_BUCKET_INSTANCE_MD_PREFIX) == 0) {
2386 auto oid = k.substr(prefix_size);
2387 rgw_bucket_instance_oid_to_key(oid);
2388 keys.emplace_back(std::move(oid));
2389 }
2390 }
2391
2392 return 0;
2393 }
2394
2395 void list_keys_complete(void *handle) override {
2396 list_keys_info *info = static_cast<list_keys_info *>(handle);
2397 delete info;
2398 }
2399
2400 /*
2401 * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry
2402 * point, so that the log entries end up at the same log shard, so that we process them
2403 * in order
2404 */
2405 void get_hash_key(const string& section, const string& key, string& hash_key) override {
2406 string k;
2407 int pos = key.find(':');
2408 if (pos < 0)
2409 k = key;
2410 else
2411 k = key.substr(0, pos);
2412 hash_key = "bucket:" + k;
2413 }
2414 };
2415
2416 void rgw_bucket_init(RGWMetadataManager *mm)
2417 {
2418 bucket_meta_handler = new RGWBucketMetadataHandler;
2419 mm->register_handler(bucket_meta_handler);
2420 bucket_instance_meta_handler = new RGWBucketInstanceMetadataHandler;
2421 mm->register_handler(bucket_instance_meta_handler);
2422 }