]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_bucket.cc
update sources to v12.2.1
[ceph.git] / ceph / src / rgw / rgw_bucket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5
6 #include <string>
7 #include <map>
8 #include <sstream>
9
10 #include <boost/utility/string_ref.hpp>
11 #include <boost/format.hpp>
12
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
15 #include "common/backport14.h"
16 #include "rgw_rados.h"
17 #include "rgw_acl.h"
18 #include "rgw_acl_s3.h"
19
20 #include "include/types.h"
21 #include "rgw_bucket.h"
22 #include "rgw_user.h"
23 #include "rgw_string.h"
24 #include "rgw_multi.h"
25
26 #include "include/rados/librados.hpp"
27 // until everything is moved from rgw_common
28 #include "rgw_common.h"
29
30 #include "cls/user/cls_user_types.h"
31
32 #define dout_context g_ceph_context
33 #define dout_subsys ceph_subsys_rgw
34
35 #define BUCKET_TAG_TIMEOUT 30
36
37 using namespace std;
38
39 static RGWMetadataHandler *bucket_meta_handler = NULL;
40 static RGWMetadataHandler *bucket_instance_meta_handler = NULL;
41
42 // define as static when RGWBucket implementation compete
43 void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id)
44 {
45 buckets_obj_id = user_id.to_str();
46 buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX;
47 }
48
/*
 * Builds the name under which a bucket is represented in RADOS pools.
 *
 * Note that this is not a reversal of parse_bucket(). That one deals
 * with the syntax we need in metadata and such. This one deals with
 * the representation in RADOS pools. We chose '/' because it's not
 * acceptable in bucket names and thus qualified buckets cannot conflict
 * with the legacy or S3 buckets.
 *
 * Returns an empty string for an empty bucket name, the bare bucket name
 * when there is no tenant, and "tenant/bucket" otherwise.
 */
std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
                                       const std::string& bucket_name) {
  if (bucket_name.empty()) {
    return std::string();
  }

  if (tenant_name.empty()) {
    return bucket_name;
  }

  std::string qualified(tenant_name);
  qualified += "/";
  qualified += bucket_name;
  return qualified;
}
70
/*
 * Splits an S3 URL bucket specifier into tenant and bucket name.
 *
 * Tenants are separated from buckets in URLs by a colon in S3.
 * This function is not to be used on Swift URLs, not even for COPY arguments.
 *
 * bucket       - specifier of the form "[tenant:]bucket"
 * auth_tenant  - tenant used when the specifier carries no colon
 * tenant_name  - out: tenant part (may be empty, see below)
 * bucket_name  - out: everything after the first colon, or the whole
 *                specifier when there is no colon
 */
void rgw_parse_url_bucket(const string &bucket, const string& auth_tenant,
                          string &tenant_name, string &bucket_name) {
  // Keep the position as size_type: storing it in an int would narrow the
  // value and relies on npos wrapping to a negative number.
  const string::size_type pos = bucket.find(':');
  if (pos == string::npos) {
    tenant_name = auth_tenant;
    bucket_name = bucket;
    return;
  }

  /*
   * N.B.: We allow ":bucket" syntax with explicit empty tenant in order
   * to refer to the legacy tenant, in case users in new named tenants
   * want to access old global buckets.
   */
  tenant_name = bucket.substr(0, pos);
  bucket_name = bucket.substr(pos + 1);
}
92
93 /**
94 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
95 * Returns: 0 on success, -ERR# on failure.
96 */
97 int rgw_read_user_buckets(RGWRados * store,
98 const rgw_user& user_id,
99 RGWUserBuckets& buckets,
100 const string& marker,
101 const string& end_marker,
102 uint64_t max,
103 bool need_stats,
104 bool *is_truncated,
105 uint64_t default_amount)
106 {
107 int ret;
108 buckets.clear();
109 string buckets_obj_id;
110 rgw_get_buckets_obj(user_id, buckets_obj_id);
111 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
112 list<cls_user_bucket_entry> entries;
113
114 bool truncated = false;
115 string m = marker;
116
117 uint64_t total = 0;
118
119 if (!max) {
120 max = default_amount;
121 }
122
123 do {
124 ret = store->cls_user_list_buckets(obj, m, end_marker, max - total, entries, &m, &truncated);
125 if (ret == -ENOENT)
126 ret = 0;
127
128 if (ret < 0)
129 return ret;
130
131 for (const auto& entry : entries) {
132 buckets.add(RGWBucketEnt(user_id, entry));
133 total++;
134 }
135
136 } while (truncated && total < max);
137
138 if (is_truncated != nullptr) {
139 *is_truncated = truncated;
140 }
141
142 if (need_stats) {
143 map<string, RGWBucketEnt>& m = buckets.get_buckets();
144 ret = store->update_containers_stats(m);
145 if (ret < 0 && ret != -ENOENT) {
146 ldout(store->ctx(), 0) << "ERROR: could not get stats for buckets" << dendl;
147 return ret;
148 }
149 }
150 return 0;
151 }
152
153 int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info)
154 {
155 string buckets_obj_id;
156 rgw_get_buckets_obj(user_id, buckets_obj_id);
157 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
158
159 return store->cls_user_sync_bucket_stats(obj, bucket_info);
160 }
161
162 int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name)
163 {
164 RGWBucketInfo bucket_info;
165 RGWObjectCtx obj_ctx(store);
166 int ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL);
167 if (ret < 0) {
168 ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl;
169 return ret;
170 }
171
172 ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info);
173 if (ret < 0) {
174 ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl;
175 return ret;
176 }
177
178 return 0;
179 }
180
181 int rgw_link_bucket(RGWRados *store, const rgw_user& user_id, rgw_bucket& bucket, real_time creation_time, bool update_entrypoint)
182 {
183 int ret;
184 string& tenant_name = bucket.tenant;
185 string& bucket_name = bucket.name;
186
187 cls_user_bucket_entry new_bucket;
188
189 RGWBucketEntryPoint ep;
190 RGWObjVersionTracker ot;
191
192 bucket.convert(&new_bucket.bucket);
193 new_bucket.size = 0;
194 if (real_clock::is_zero(creation_time))
195 new_bucket.creation_time = real_clock::now();
196 else
197 new_bucket.creation_time = creation_time;
198
199 map<string, bufferlist> attrs;
200 RGWObjectCtx obj_ctx(store);
201
202 if (update_entrypoint) {
203 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
204 if (ret < 0 && ret != -ENOENT) {
205 ldout(store->ctx(), 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
206 << cpp_strerror(-ret) << dendl;
207 }
208 }
209
210 string buckets_obj_id;
211 rgw_get_buckets_obj(user_id, buckets_obj_id);
212
213 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
214 ret = store->cls_user_add_bucket(obj, new_bucket);
215 if (ret < 0) {
216 ldout(store->ctx(), 0) << "ERROR: error adding bucket to directory: "
217 << cpp_strerror(-ret) << dendl;
218 goto done_err;
219 }
220
221 if (!update_entrypoint)
222 return 0;
223
224 ep.linked = true;
225 ep.owner = user_id;
226 ep.bucket = bucket;
227 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
228 if (ret < 0)
229 goto done_err;
230
231 return 0;
232 done_err:
233 int r = rgw_unlink_bucket(store, user_id, bucket.tenant, bucket.name);
234 if (r < 0) {
235 ldout(store->ctx(), 0) << "ERROR: failed unlinking bucket on error cleanup: "
236 << cpp_strerror(-r) << dendl;
237 }
238 return ret;
239 }
240
241 int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, const string& tenant_name, const string& bucket_name, bool update_entrypoint)
242 {
243 int ret;
244
245 string buckets_obj_id;
246 rgw_get_buckets_obj(user_id, buckets_obj_id);
247
248 cls_user_bucket bucket;
249 bucket.name = bucket_name;
250 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
251 ret = store->cls_user_remove_bucket(obj, bucket);
252 if (ret < 0) {
253 ldout(store->ctx(), 0) << "ERROR: error removing bucket from directory: "
254 << cpp_strerror(-ret)<< dendl;
255 }
256
257 if (!update_entrypoint)
258 return 0;
259
260 RGWBucketEntryPoint ep;
261 RGWObjVersionTracker ot;
262 map<string, bufferlist> attrs;
263 RGWObjectCtx obj_ctx(store);
264 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
265 if (ret == -ENOENT)
266 return 0;
267 if (ret < 0)
268 return ret;
269
270 if (!ep.linked)
271 return 0;
272
273 if (ep.owner != user_id) {
274 ldout(store->ctx(), 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl;
275 return -EINVAL;
276 }
277
278 ep.linked = false;
279 return store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
280 }
281
282 int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
283 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
284 real_time mtime) {
285 return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs);
286 }
287
288 int rgw_bucket_instance_store_info(RGWRados *store, string& entry, bufferlist& bl, bool exclusive,
289 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
290 real_time mtime) {
291 return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs);
292 }
293
294 int rgw_bucket_instance_remove_entry(RGWRados *store, string& entry, RGWObjVersionTracker *objv_tracker) {
295 return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
296 }
297
// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity
// with the existing instance[:shard] format. once we parse the shard, the / is
// replaced with a : to match the [tenant:]instance format
//
// Rewrites `key` in place: only the first '/' (if any) becomes ':'.
void rgw_bucket_instance_key_to_oid(string& key)
{
  const auto slash = key.find('/');
  if (slash == string::npos) {
    return;  // no tenant prefix, nothing to rewrite
  }
  key[slash] = ':';
}
309
// convert bucket instance oids back to the tenant/ format for metadata keys.
// it's safe to parse 'tenant:' only for oids, because they won't contain the
// optional :shard at the end
//
// Rewrites `oid` in place: the first ':' becomes '/' only when a second ':'
// follows it (i.e. the oid looks like tenant:bucket:instance).
void rgw_bucket_instance_oid_to_key(string& oid)
{
  // first ':' could separate tenant:bucket or bucket:instance
  const auto first_colon = oid.find(':');
  if (first_colon == string::npos) {
    return;
  }

  // a second ':' means the first one belonged to the tenant
  const bool has_tenant = oid.find(':', first_colon + 1) != string::npos;
  if (has_tenant) {
    oid[first_colon] = '/';
  }
}
324
325 int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
326 {
327 ssize_t pos = bucket_instance.rfind(':');
328 if (pos < 0) {
329 return -EINVAL;
330 }
331
332 string first = bucket_instance.substr(0, pos);
333 string second = bucket_instance.substr(pos + 1);
334
335 if (first.find(':') == string::npos) {
336 *shard_id = -1;
337 *target_bucket_instance = bucket_instance;
338 return 0;
339 }
340
341 *target_bucket_instance = first;
342 string err;
343 *shard_id = strict_strtol(second.c_str(), 10, &err);
344 if (!err.empty()) {
345 return -EINVAL;
346 }
347
348 return 0;
349 }
350
351 // parse key in format: [tenant/]name:instance[:shard_id]
352 int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
353 rgw_bucket *bucket, int *shard_id)
354 {
355 boost::string_ref name{key};
356 boost::string_ref instance;
357
358 // split tenant/name
359 auto pos = name.find('/');
360 if (pos != boost::string_ref::npos) {
361 auto tenant = name.substr(0, pos);
362 bucket->tenant.assign(tenant.begin(), tenant.end());
363 name = name.substr(pos + 1);
364 }
365
366 // split name:instance
367 pos = name.find(':');
368 if (pos != boost::string_ref::npos) {
369 instance = name.substr(pos + 1);
370 name = name.substr(0, pos);
371 }
372 bucket->name.assign(name.begin(), name.end());
373
374 // split instance:shard
375 pos = instance.find(':');
376 if (pos == boost::string_ref::npos) {
377 bucket->bucket_id.assign(instance.begin(), instance.end());
378 *shard_id = -1;
379 return 0;
380 }
381
382 // parse shard id
383 auto shard = instance.substr(pos + 1);
384 string err;
385 auto id = strict_strtol(shard.data(), 10, &err);
386 if (!err.empty()) {
387 ldout(cct, 0) << "ERROR: failed to parse bucket shard '"
388 << instance.data() << "': " << err << dendl;
389 return -EINVAL;
390 }
391
392 *shard_id = id;
393 instance = instance.substr(0, pos);
394 bucket->bucket_id.assign(instance.begin(), instance.end());
395 return 0;
396 }
397
398 int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
399 map<string, bufferlist>& attrs,
400 RGWObjVersionTracker *objv_tracker)
401 {
402 rgw_bucket& bucket = bucket_info.bucket;
403
404 if (!bucket_info.has_instance_obj) {
405 /* an old bucket object, need to convert it */
406 RGWObjectCtx obj_ctx(store);
407 int ret = store->convert_old_bucket_info(obj_ctx, bucket.tenant, bucket.name);
408 if (ret < 0) {
409 ldout(store->ctx(), 0) << "ERROR: failed converting old bucket info: " << ret << dendl;
410 return ret;
411 }
412 }
413
414 /* we want the bucket instance name without the oid prefix cruft */
415 string key = bucket.get_key();
416 bufferlist bl;
417
418 ::encode(bucket_info, bl);
419
420 return rgw_bucket_instance_store_info(store, key, bl, false, &attrs, objv_tracker, real_time());
421 }
422
423 static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
424 Formatter *f)
425 {
426 for (const auto& o : objs_to_unlink) {
427 f->dump_string("object", o.name);
428 }
429 }
430
431 void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id,
432 bool fix)
433 {
434 RGWUserBuckets user_buckets;
435 bool is_truncated = false;
436 string marker;
437
438 CephContext *cct = store->ctx();
439
440 size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
441
442 do {
443 int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker,
444 string(), max_entries, false,
445 &is_truncated);
446 if (ret < 0) {
447 ldout(store->ctx(), 0) << "failed to read user buckets: "
448 << cpp_strerror(-ret) << dendl;
449 return;
450 }
451
452 map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
453 for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
454 i != buckets.end();
455 ++i) {
456 marker = i->first;
457
458 RGWBucketEnt& bucket_ent = i->second;
459 rgw_bucket& bucket = bucket_ent.bucket;
460
461 RGWBucketInfo bucket_info;
462 real_time mtime;
463 RGWObjectCtx obj_ctx(store);
464 int r = store->get_bucket_info(obj_ctx, user_id.tenant, bucket.name, bucket_info, &mtime);
465 if (r < 0) {
466 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
467 continue;
468 }
469
470 rgw_bucket& actual_bucket = bucket_info.bucket;
471
472 if (actual_bucket.name.compare(bucket.name) != 0 ||
473 actual_bucket.tenant.compare(bucket.tenant) != 0 ||
474 actual_bucket.marker.compare(bucket.marker) != 0 ||
475 actual_bucket.bucket_id.compare(bucket.bucket_id) != 0) {
476 cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
477 if (fix) {
478 cout << "fixing" << std::endl;
479 r = rgw_link_bucket(store, user_id, actual_bucket, bucket_info.creation_time);
480 if (r < 0) {
481 cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
482 }
483 }
484 }
485 }
486 } while (is_truncated);
487 }
488
489 static bool bucket_object_check_filter(const string& oid)
490 {
491 rgw_obj_key key;
492 string ns;
493 return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
494 }
495
496 int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key)
497 {
498 RGWObjectCtx rctx(store);
499
500 if (key.instance.empty()) {
501 key.instance = "null";
502 }
503
504 rgw_obj obj(bucket, key);
505
506 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
507 }
508
509 int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children)
510 {
511 int ret;
512 map<RGWObjCategory, RGWStorageStats> stats;
513 std::vector<rgw_bucket_dir_entry> objs;
514 map<string, bool> common_prefixes;
515 RGWBucketInfo info;
516 RGWObjectCtx obj_ctx(store);
517
518 string bucket_ver, master_ver;
519
520 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
521 if (ret < 0)
522 return ret;
523
524 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
525 if (ret < 0)
526 return ret;
527
528 RGWRados::Bucket target(store, info);
529 RGWRados::Bucket::List list_op(&target);
530 CephContext *cct = store->ctx();
531 int max = 1000;
532
533 list_op.params.list_versions = true;
534
535 do {
536 objs.clear();
537
538 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
539 if (ret < 0)
540 return ret;
541
542 if (!objs.empty() && !delete_children) {
543 lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl;
544 return -ENOTEMPTY;
545 }
546
547 for (const auto& obj : objs) {
548 rgw_obj_key key(obj.key);
549 ret = rgw_remove_object(store, info, bucket, key);
550 if (ret < 0)
551 return ret;
552 }
553
554 } while (!objs.empty());
555
556 string prefix, delimiter;
557
558 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
559 if (ret < 0) {
560 return ret;
561 }
562
563 ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info);
564 if ( ret < 0) {
565 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
566 }
567
568 RGWObjVersionTracker objv_tracker;
569
570 ret = store->delete_bucket(info, objv_tracker);
571 if (ret < 0) {
572 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl;
573 return ret;
574 }
575
576 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
577 if (ret < 0) {
578 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
579 }
580
581 return ret;
582 }
583
584 static int aio_wait(librados::AioCompletion *handle)
585 {
586 librados::AioCompletion *c = (librados::AioCompletion *)handle;
587 c->wait_for_safe();
588 int ret = c->get_return_value();
589 c->release();
590 return ret;
591 }
592
593 static int drain_handles(list<librados::AioCompletion *>& pending)
594 {
595 int ret = 0;
596 while (!pending.empty()) {
597 librados::AioCompletion *handle = pending.front();
598 pending.pop_front();
599 int r = aio_wait(handle);
600 if (r < 0) {
601 ret = r;
602 }
603 }
604 return ret;
605 }
606
607 int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket,
608 int concurrent_max, bool keep_index_consistent)
609 {
610 int ret;
611 map<RGWObjCategory, RGWStorageStats> stats;
612 std::vector<rgw_bucket_dir_entry> objs;
613 map<string, bool> common_prefixes;
614 RGWBucketInfo info;
615 RGWObjectCtx obj_ctx(store);
616 CephContext *cct = store->ctx();
617
618 string bucket_ver, master_ver;
619
620 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
621 if (ret < 0)
622 return ret;
623
624 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
625 if (ret < 0)
626 return ret;
627
628 string prefix, delimiter;
629
630 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
631 if (ret < 0) {
632 return ret;
633 }
634
635 RGWRados::Bucket target(store, info);
636 RGWRados::Bucket::List list_op(&target);
637
638 list_op.params.list_versions = true;
639
640 std::list<librados::AioCompletion*> handles;
641
642 int max = 1000;
643 int max_aio = concurrent_max;
644 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
645 if (ret < 0)
646 return ret;
647
648 while (!objs.empty()) {
649 std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
650 for (; it != objs.end(); ++it) {
651 RGWObjState *astate = NULL;
652 rgw_obj obj(bucket, (*it).key);
653
654 ret = store->get_obj_state(&obj_ctx, info, obj, &astate, false);
655 if (ret == -ENOENT) {
656 dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
657 continue;
658 }
659 if (ret < 0) {
660 lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl;
661 return ret;
662 }
663
664 if (astate->has_manifest) {
665 RGWObjManifest& manifest = astate->manifest;
666 RGWObjManifest::obj_iterator miter = manifest.obj_begin();
667 rgw_obj head_obj = manifest.get_obj();
668 rgw_raw_obj raw_head_obj;
669 store->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
670
671
672 for (; miter != manifest.obj_end() && max_aio--; ++miter) {
673 if (!max_aio) {
674 ret = drain_handles(handles);
675 if (ret < 0) {
676 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
677 return ret;
678 }
679 max_aio = concurrent_max;
680 }
681
682 rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store);
683 if (last_obj == raw_head_obj) {
684 // have the head obj deleted at the end
685 continue;
686 }
687
688 ret = store->delete_raw_obj_aio(last_obj, handles);
689 if (ret < 0) {
690 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
691 return ret;
692 }
693 } // for all shadow objs
694
695 ret = store->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent);
696 if (ret < 0) {
697 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
698 return ret;
699 }
700 }
701
702 if (!max_aio) {
703 ret = drain_handles(handles);
704 if (ret < 0) {
705 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
706 return ret;
707 }
708 max_aio = concurrent_max;
709 }
710 } // for all RGW objects
711 objs.clear();
712
713 ret = list_op.list_objects(max, &objs, &common_prefixes, NULL);
714 if (ret < 0)
715 return ret;
716 }
717
718 ret = drain_handles(handles);
719 if (ret < 0) {
720 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
721 return ret;
722 }
723
724 ret = rgw_bucket_sync_user_stats(store, bucket.tenant, info);
725 if (ret < 0) {
726 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
727 }
728
729 RGWObjVersionTracker objv_tracker;
730
731 ret = rgw_bucket_delete_bucket_obj(store, bucket.tenant, bucket.name, objv_tracker);
732 if (ret < 0) {
733 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << "with ret as " << ret << dendl;
734 return ret;
735 }
736
737 if (!store->is_syncing_bucket_meta(bucket)) {
738 RGWObjVersionTracker objv_tracker;
739 string entry = bucket.get_key();
740 ret = rgw_bucket_instance_remove_entry(store, entry, &objv_tracker);
741 if (ret < 0) {
742 lderr(store->ctx()) << "ERROR: could not remove bucket instance entry" << bucket.name << "with ret as " << ret << dendl;
743 return ret;
744 }
745 }
746
747 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
748 if (ret < 0) {
749 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
750 }
751
752 return ret;
753 }
754
755 int rgw_bucket_delete_bucket_obj(RGWRados *store,
756 const string& tenant_name,
757 const string& bucket_name,
758 RGWObjVersionTracker& objv_tracker)
759 {
760 string key;
761
762 rgw_make_bucket_entry_name(tenant_name, bucket_name, key);
763 return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker);
764 }
765
// Copies `msg` into `*sink` when a sink was supplied and the message is not
// empty; otherwise leaves the sink untouched.
static void set_err_msg(std::string *sink, std::string msg)
{
  if (sink == nullptr || msg.empty()) {
    return;
  }
  *sink = msg;
}
771
772 int RGWBucket::init(RGWRados *storage, RGWBucketAdminOpState& op_state)
773 {
774 if (!storage)
775 return -EINVAL;
776
777 store = storage;
778
779 rgw_user user_id = op_state.get_user_id();
780 tenant = user_id.tenant;
781 bucket_name = op_state.get_bucket_name();
782 RGWUserBuckets user_buckets;
783 RGWObjectCtx obj_ctx(store);
784
785 if (bucket_name.empty() && user_id.empty())
786 return -EINVAL;
787
788 if (!bucket_name.empty()) {
789 int r = store->get_bucket_info(obj_ctx, tenant, bucket_name, bucket_info, NULL);
790 if (r < 0) {
791 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket_name << dendl;
792 return r;
793 }
794
795 op_state.set_bucket(bucket_info.bucket);
796 }
797
798 if (!user_id.empty()) {
799 int r = rgw_get_user_info_by_uid(store, user_id, user_info);
800 if (r < 0)
801 return r;
802
803 op_state.display_name = user_info.display_name;
804 }
805
806 clear_failure();
807 return 0;
808 }
809
810 int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg)
811 {
812 if (!op_state.is_user_op()) {
813 set_err_msg(err_msg, "empty user id");
814 return -EINVAL;
815 }
816
817 string bucket_id = op_state.get_bucket_id();
818 if (bucket_id.empty()) {
819 set_err_msg(err_msg, "empty bucket instance id");
820 return -EINVAL;
821 }
822
823 std::string display_name = op_state.get_user_display_name();
824 rgw_bucket bucket = op_state.get_bucket();
825
826 const rgw_pool& root_pool = store->get_zone_params().domain_root;
827 rgw_raw_obj obj(root_pool, bucket.name);
828 RGWObjVersionTracker objv_tracker;
829
830 map<string, bufferlist> attrs;
831 RGWBucketInfo bucket_info;
832
833 string key = bucket.name + ":" + bucket_id;
834 RGWObjectCtx obj_ctx(store);
835 int r = store->get_bucket_instance_info(obj_ctx, key, bucket_info, NULL, &attrs);
836 if (r < 0) {
837 return r;
838 }
839
840 rgw_user user_id = op_state.get_user_id();
841
842 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
843 if (aiter != attrs.end()) {
844 bufferlist aclbl = aiter->second;
845 RGWAccessControlPolicy policy;
846 ACLOwner owner;
847 try {
848 bufferlist::iterator iter = aclbl.begin();
849 ::decode(policy, iter);
850 owner = policy.get_owner();
851 } catch (buffer::error& err) {
852 set_err_msg(err_msg, "couldn't decode policy");
853 return -EIO;
854 }
855
856 r = rgw_unlink_bucket(store, owner.get_id(), bucket.tenant, bucket.name, false);
857 if (r < 0) {
858 set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
859 return r;
860 }
861
862 // now update the user for the bucket...
863 if (display_name.empty()) {
864 ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
865 }
866 policy.create_default(user_info.user_id, display_name);
867
868 owner = policy.get_owner();
869 r = store->set_bucket_owner(bucket_info.bucket, owner);
870 if (r < 0) {
871 set_err_msg(err_msg, "failed to set bucket owner: " + cpp_strerror(-r));
872 return r;
873 }
874
875 // ...and encode the acl
876 aclbl.clear();
877 policy.encode(aclbl);
878
879 r = store->system_obj_set_attr(NULL, obj, RGW_ATTR_ACL, aclbl, &objv_tracker);
880 if (r < 0) {
881 return r;
882 }
883
884 RGWAccessControlPolicy policy_instance;
885 policy_instance.create_default(user_info.user_id, display_name);
886 aclbl.clear();
887 policy_instance.encode(aclbl);
888
889 string oid_bucket_instance = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
890 rgw_raw_obj obj_bucket_instance(root_pool, oid_bucket_instance);
891 r = store->system_obj_set_attr(NULL, obj_bucket_instance, RGW_ATTR_ACL, aclbl, &objv_tracker);
892 if (r < 0) {
893 return r;
894 }
895
896 r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket, real_time());
897 if (r < 0) {
898 return r;
899 }
900 }
901
902 return 0;
903 }
904
905 int RGWBucket::unlink(RGWBucketAdminOpState& op_state, std::string *err_msg)
906 {
907 rgw_bucket bucket = op_state.get_bucket();
908
909 if (!op_state.is_user_op()) {
910 set_err_msg(err_msg, "could not fetch user or user bucket info");
911 return -EINVAL;
912 }
913
914 int r = rgw_unlink_bucket(store, user_info.user_id, bucket.tenant, bucket.name);
915 if (r < 0) {
916 set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
917 }
918
919 return r;
920 }
921
922 int RGWBucket::remove(RGWBucketAdminOpState& op_state, bool bypass_gc,
923 bool keep_index_consistent, std::string *err_msg)
924 {
925 bool delete_children = op_state.will_delete_children();
926 rgw_bucket bucket = op_state.get_bucket();
927 int ret;
928
929 if (bypass_gc) {
930 if (delete_children) {
931 ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent);
932 } else {
933 set_err_msg(err_msg, "purge objects should be set for gc to be bypassed");
934 return -EINVAL;
935 }
936 } else {
937 ret = rgw_remove_bucket(store, bucket, delete_children);
938 }
939
940 if (ret < 0) {
941 set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret));
942 return ret;
943 }
944
945 return 0;
946 }
947
948 int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg)
949 {
950 rgw_bucket bucket = op_state.get_bucket();
951 std::string object_name = op_state.get_object_name();
952
953 rgw_obj_key key(object_name);
954
955 int ret = rgw_remove_object(store, bucket_info, bucket, key);
956 if (ret < 0) {
957 set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
958 return ret;
959 }
960
961 return 0;
962 }
963
964 static void dump_bucket_index(map<string, rgw_bucket_dir_entry> result, Formatter *f)
965 {
966 map<string, rgw_bucket_dir_entry>::iterator iter;
967 for (iter = result.begin(); iter != result.end(); ++iter) {
968 f->dump_string("object", iter->first);
969 }
970 }
971
972 static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Formatter *formatter)
973 {
974 map<RGWObjCategory, RGWStorageStats>::iterator iter;
975
976 formatter->open_object_section("usage");
977 for (iter = stats.begin(); iter != stats.end(); ++iter) {
978 RGWStorageStats& s = iter->second;
979 const char *cat_name = rgw_obj_category_name(iter->first);
980 formatter->open_object_section(cat_name);
981 s.dump(formatter);
982 formatter->close_section();
983 }
984 formatter->close_section();
985 }
986
987 static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats,
988 map<RGWObjCategory, RGWStorageStats> calculated_stats,
989 Formatter *formatter)
990 {
991 formatter->open_object_section("check_result");
992 formatter->open_object_section("existing_header");
993 dump_bucket_usage(existing_stats, formatter);
994 formatter->close_section();
995 formatter->open_object_section("calculated_header");
996 dump_bucket_usage(calculated_stats, formatter);
997 formatter->close_section();
998 formatter->close_section();
999 }
1000
1001 int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
1002 RGWFormatterFlusher& flusher ,std::string *err_msg)
1003 {
1004 bool fix_index = op_state.will_fix_index();
1005 rgw_bucket bucket = op_state.get_bucket();
1006
1007 size_t max = 1000;
1008
1009 map<string, bool> common_prefixes;
1010
1011 bool is_truncated;
1012 map<string, bool> meta_objs;
1013 map<rgw_obj_index_key, string> all_objs;
1014
1015 RGWBucketInfo bucket_info;
1016 RGWObjectCtx obj_ctx(store);
1017 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr);
1018 if (r < 0) {
1019 ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
1020 return r;
1021 }
1022
1023 RGWRados::Bucket target(store, bucket_info);
1024 RGWRados::Bucket::List list_op(&target);
1025
1026 list_op.params.list_versions = true;
1027 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
1028
1029 do {
1030 vector<rgw_bucket_dir_entry> result;
1031 int r = list_op.list_objects(max, &result, &common_prefixes, &is_truncated);
1032 if (r < 0) {
1033 set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
1034 " err=" + cpp_strerror(-r));
1035
1036 return r;
1037 }
1038
1039 vector<rgw_bucket_dir_entry>::iterator iter;
1040 for (iter = result.begin(); iter != result.end(); ++iter) {
1041 rgw_obj_index_key key = iter->key;
1042 rgw_obj obj(bucket, key);
1043 string oid = obj.get_oid();
1044
1045 int pos = oid.find_last_of('.');
1046 if (pos < 0) {
1047 /* obj has no suffix */
1048 all_objs[key] = oid;
1049 } else {
1050 /* obj has suffix */
1051 string name = oid.substr(0, pos);
1052 string suffix = oid.substr(pos + 1);
1053
1054 if (suffix.compare("meta") == 0) {
1055 meta_objs[name] = true;
1056 } else {
1057 all_objs[key] = name;
1058 }
1059 }
1060 }
1061
1062 } while (is_truncated);
1063
1064 list<rgw_obj_index_key> objs_to_unlink;
1065 Formatter *f = flusher.get_formatter();
1066
1067 f->open_array_section("invalid_multipart_entries");
1068
1069 for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) {
1070 string& name = aiter->second;
1071
1072 if (meta_objs.find(name) == meta_objs.end()) {
1073 objs_to_unlink.push_back(aiter->first);
1074 }
1075
1076 if (objs_to_unlink.size() > max) {
1077 if (fix_index) {
1078 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1079 if (r < 0) {
1080 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1081 cpp_strerror(-r));
1082 return r;
1083 }
1084 }
1085
1086 dump_mulipart_index_results(objs_to_unlink, flusher.get_formatter());
1087 flusher.flush();
1088 objs_to_unlink.clear();
1089 }
1090 }
1091
1092 if (fix_index) {
1093 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1094 if (r < 0) {
1095 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1096 cpp_strerror(-r));
1097
1098 return r;
1099 }
1100 }
1101
1102 dump_mulipart_index_results(objs_to_unlink, f);
1103 f->close_section();
1104 flusher.flush();
1105
1106 return 0;
1107 }
1108
1109 int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
1110 RGWFormatterFlusher& flusher,
1111 std::string *err_msg)
1112 {
1113
1114 bool fix_index = op_state.will_fix_index();
1115
1116 rgw_bucket bucket = op_state.get_bucket();
1117
1118 if (!fix_index) {
1119 set_err_msg(err_msg, "check-objects flag requires fix index enabled");
1120 return -EINVAL;
1121 }
1122
1123 store->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT);
1124
1125 string prefix;
1126 rgw_obj_index_key marker;
1127 bool is_truncated = true;
1128
1129 Formatter *formatter = flusher.get_formatter();
1130 formatter->open_object_section("objects");
1131 while (is_truncated) {
1132 map<string, rgw_bucket_dir_entry> result;
1133
1134 int r = store->cls_bucket_list(bucket_info, RGW_NO_SHARD, marker, prefix, 1000, true,
1135 result, &is_truncated, &marker,
1136 bucket_object_check_filter);
1137 if (r == -ENOENT) {
1138 break;
1139 } else if (r < 0 && r != -ENOENT) {
1140 set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
1141 }
1142
1143
1144 dump_bucket_index(result, formatter);
1145 flusher.flush();
1146
1147 }
1148
1149 formatter->close_section();
1150
1151 store->cls_obj_set_bucket_tag_timeout(bucket_info, 0);
1152
1153 return 0;
1154 }
1155
1156
1157 int RGWBucket::check_index(RGWBucketAdminOpState& op_state,
1158 map<RGWObjCategory, RGWStorageStats>& existing_stats,
1159 map<RGWObjCategory, RGWStorageStats>& calculated_stats,
1160 std::string *err_msg)
1161 {
1162 rgw_bucket bucket = op_state.get_bucket();
1163 bool fix_index = op_state.will_fix_index();
1164
1165 int r = store->bucket_check_index(bucket_info, &existing_stats, &calculated_stats);
1166 if (r < 0) {
1167 set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
1168 return r;
1169 }
1170
1171 if (fix_index) {
1172 r = store->bucket_rebuild_index(bucket_info);
1173 if (r < 0) {
1174 set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
1175 return r;
1176 }
1177 }
1178
1179 return 0;
1180 }
1181
1182
1183 int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
1184 {
1185 RGWAccessControlPolicy_S3 policy(g_ceph_context);
1186 bufferlist::iterator iter = bl.begin();
1187 try {
1188 policy.decode(iter);
1189 } catch (buffer::error& err) {
1190 dout(0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
1191 return -EIO;
1192 }
1193 policy.to_xml(o);
1194 return 0;
1195 }
1196
1197 static int policy_decode(RGWRados *store, bufferlist& bl, RGWAccessControlPolicy& policy)
1198 {
1199 bufferlist::iterator iter = bl.begin();
1200 try {
1201 policy.decode(iter);
1202 } catch (buffer::error& err) {
1203 ldout(store->ctx(), 0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
1204 return -EIO;
1205 }
1206 return 0;
1207 }
1208
1209 int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy)
1210 {
1211 std::string object_name = op_state.get_object_name();
1212 rgw_bucket bucket = op_state.get_bucket();
1213 RGWObjectCtx obj_ctx(store);
1214
1215 RGWBucketInfo bucket_info;
1216 map<string, bufferlist> attrs;
1217 int ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs);
1218 if (ret < 0) {
1219 return ret;
1220 }
1221
1222 if (!object_name.empty()) {
1223 bufferlist bl;
1224 rgw_obj obj(bucket, object_name);
1225
1226 RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
1227 RGWRados::Object::Read rop(&op_target);
1228
1229 int ret = rop.get_attr(RGW_ATTR_ACL, bl);
1230 if (ret < 0)
1231 return ret;
1232
1233 return policy_decode(store, bl, policy);
1234 }
1235
1236 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
1237 if (aiter == attrs.end()) {
1238 return -ENOENT;
1239 }
1240
1241 return policy_decode(store, aiter->second, policy);
1242 }
1243
1244
1245 int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1246 RGWAccessControlPolicy& policy)
1247 {
1248 RGWBucket bucket;
1249
1250 int ret = bucket.init(store, op_state);
1251 if (ret < 0)
1252 return ret;
1253
1254 ret = bucket.get_policy(op_state, policy);
1255 if (ret < 0)
1256 return ret;
1257
1258 return 0;
1259 }
1260
1261 /* Wrappers to facilitate RESTful interface */
1262
1263
1264 int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1265 RGWFormatterFlusher& flusher)
1266 {
1267 RGWAccessControlPolicy policy(store->ctx());
1268
1269 int ret = get_policy(store, op_state, policy);
1270 if (ret < 0)
1271 return ret;
1272
1273 Formatter *formatter = flusher.get_formatter();
1274
1275 flusher.start(0);
1276
1277 formatter->open_object_section("policy");
1278 policy.dump(formatter);
1279 formatter->close_section();
1280
1281 flusher.flush();
1282
1283 return 0;
1284 }
1285
1286 int RGWBucketAdminOp::dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1287 ostream& os)
1288 {
1289 RGWAccessControlPolicy_S3 policy(store->ctx());
1290
1291 int ret = get_policy(store, op_state, policy);
1292 if (ret < 0)
1293 return ret;
1294
1295 policy.to_xml(os);
1296
1297 return 0;
1298 }
1299
1300 int RGWBucketAdminOp::unlink(RGWRados *store, RGWBucketAdminOpState& op_state)
1301 {
1302 RGWBucket bucket;
1303
1304 int ret = bucket.init(store, op_state);
1305 if (ret < 0)
1306 return ret;
1307
1308 return bucket.unlink(op_state);
1309 }
1310
1311 int RGWBucketAdminOp::link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err)
1312 {
1313 RGWBucket bucket;
1314
1315 int ret = bucket.init(store, op_state);
1316 if (ret < 0)
1317 return ret;
1318
1319 return bucket.link(op_state, err);
1320
1321 }
1322
1323 int RGWBucketAdminOp::check_index(RGWRados *store, RGWBucketAdminOpState& op_state,
1324 RGWFormatterFlusher& flusher)
1325 {
1326 int ret;
1327 map<RGWObjCategory, RGWStorageStats> existing_stats;
1328 map<RGWObjCategory, RGWStorageStats> calculated_stats;
1329
1330
1331 RGWBucket bucket;
1332
1333 ret = bucket.init(store, op_state);
1334 if (ret < 0)
1335 return ret;
1336
1337 Formatter *formatter = flusher.get_formatter();
1338 flusher.start(0);
1339
1340 ret = bucket.check_bad_index_multipart(op_state, flusher);
1341 if (ret < 0)
1342 return ret;
1343
1344 ret = bucket.check_object_index(op_state, flusher);
1345 if (ret < 0)
1346 return ret;
1347
1348 ret = bucket.check_index(op_state, existing_stats, calculated_stats);
1349 if (ret < 0)
1350 return ret;
1351
1352 dump_index_check(existing_stats, calculated_stats, formatter);
1353 flusher.flush();
1354
1355 return 0;
1356 }
1357
1358 int RGWBucketAdminOp::remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state,
1359 bool bypass_gc, bool keep_index_consistent)
1360 {
1361 RGWBucket bucket;
1362
1363 int ret = bucket.init(store, op_state);
1364 if (ret < 0)
1365 return ret;
1366
1367 std::string err_msg;
1368 ret = bucket.remove(op_state, bypass_gc, keep_index_consistent, &err_msg);
1369 if (!err_msg.empty()) {
1370 lderr(store->ctx()) << "ERROR: " << err_msg << dendl;
1371 }
1372 return ret;
1373 }
1374
1375 int RGWBucketAdminOp::remove_object(RGWRados *store, RGWBucketAdminOpState& op_state)
1376 {
1377 RGWBucket bucket;
1378
1379 int ret = bucket.init(store, op_state);
1380 if (ret < 0)
1381 return ret;
1382
1383 return bucket.remove_object(op_state);
1384 }
1385
1386 static int bucket_stats(RGWRados *store, const std::string& tenant_name, std::string& bucket_name, Formatter *formatter)
1387 {
1388 RGWBucketInfo bucket_info;
1389 map<RGWObjCategory, RGWStorageStats> stats;
1390
1391 real_time mtime;
1392 RGWObjectCtx obj_ctx(store);
1393 int r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, &mtime);
1394 if (r < 0)
1395 return r;
1396
1397 rgw_bucket& bucket = bucket_info.bucket;
1398
1399 string bucket_ver, master_ver;
1400 string max_marker;
1401 int ret = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
1402 if (ret < 0) {
1403 cerr << "error getting bucket stats ret=" << ret << std::endl;
1404 return ret;
1405 }
1406
1407 utime_t ut(mtime);
1408
1409 formatter->open_object_section("stats");
1410 formatter->dump_string("bucket", bucket.name);
1411 formatter->dump_string("zonegroup", bucket_info.zonegroup);
1412 formatter->dump_string("placement_rule", bucket_info.placement_rule);
1413 ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
1414 formatter->dump_string("id", bucket.bucket_id);
1415 formatter->dump_string("marker", bucket.marker);
1416 formatter->dump_stream("index_type") << bucket_info.index_type;
1417 ::encode_json("owner", bucket_info.owner, formatter);
1418 formatter->dump_string("ver", bucket_ver);
1419 formatter->dump_string("master_ver", master_ver);
1420 formatter->dump_stream("mtime") << ut;
1421 formatter->dump_string("max_marker", max_marker);
1422 dump_bucket_usage(stats, formatter);
1423 encode_json("bucket_quota", bucket_info.quota, formatter);
1424 formatter->close_section();
1425
1426 return 0;
1427 }
1428
1429 int RGWBucketAdminOp::limit_check(RGWRados *store,
1430 RGWBucketAdminOpState& op_state,
1431 const std::list<std::string>& user_ids,
1432 RGWFormatterFlusher& flusher,
1433 bool warnings_only)
1434 {
1435 int ret = 0;
1436 const size_t max_entries =
1437 store->ctx()->_conf->rgw_list_buckets_max_chunk;
1438
1439 const size_t safe_max_objs_per_shard =
1440 store->ctx()->_conf->rgw_safe_max_objects_per_shard;
1441
1442 uint16_t shard_warn_pct =
1443 store->ctx()->_conf->rgw_shard_warning_threshold;
1444 if (shard_warn_pct > 100)
1445 shard_warn_pct = 90;
1446
1447 Formatter *formatter = flusher.get_formatter();
1448 flusher.start(0);
1449
1450 formatter->open_array_section("users");
1451
1452 for (const auto& user_id : user_ids) {
1453 formatter->open_object_section("user");
1454 formatter->dump_string("user_id", user_id);
1455 bool done;
1456 formatter->open_array_section("buckets");
1457 do {
1458 RGWUserBuckets buckets;
1459 string marker;
1460 bool is_truncated;
1461
1462 ret = rgw_read_user_buckets(store, user_id, buckets,
1463 marker, string(), max_entries, false,
1464 &is_truncated);
1465 if (ret < 0)
1466 return ret;
1467
1468 map<string, RGWBucketEnt>& m_buckets = buckets.get_buckets();
1469
1470 for (const auto& iter : m_buckets) {
1471 auto& bucket = iter.second.bucket;
1472 uint32_t num_shards = 1;
1473 uint64_t num_objects = 0;
1474
1475 /* need info for num_shards */
1476 RGWBucketInfo info;
1477 RGWObjectCtx obj_ctx(store);
1478
1479 marker = bucket.name; /* Casey's location for marker update,
1480 * as we may now not reach the end of
1481 * the loop body */
1482
1483 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name,
1484 info, nullptr);
1485 if (ret < 0)
1486 continue;
1487
1488 /* need stats for num_entries */
1489 string bucket_ver, master_ver;
1490 std::map<RGWObjCategory, RGWStorageStats> stats;
1491 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver,
1492 &master_ver, stats, nullptr);
1493
1494 if (ret < 0)
1495 continue;
1496
1497 for (const auto& s : stats) {
1498 num_objects += s.second.num_objects;
1499 }
1500
1501 num_shards = info.num_shards;
1502 uint64_t objs_per_shard =
1503 (num_shards) ? num_objects/num_shards : num_objects;
1504 {
1505 bool warn = false;
1506 stringstream ss;
1507 if (objs_per_shard > safe_max_objs_per_shard) {
1508 double over =
1509 100 - (safe_max_objs_per_shard/objs_per_shard * 100);
1510 ss << boost::format("OVER %4f%%") % over;
1511 warn = true;
1512 } else {
1513 double fill_pct =
1514 objs_per_shard / safe_max_objs_per_shard * 100;
1515 if (fill_pct >= shard_warn_pct) {
1516 ss << boost::format("WARN %4f%%") % fill_pct;
1517 warn = true;
1518 } else {
1519 ss << "OK";
1520 }
1521 }
1522
1523 if (warn || (! warnings_only)) {
1524 formatter->open_object_section("bucket");
1525 formatter->dump_string("bucket", bucket.name);
1526 formatter->dump_string("tenant", bucket.tenant);
1527 formatter->dump_int("num_objects", num_objects);
1528 formatter->dump_int("num_shards", num_shards);
1529 formatter->dump_int("objects_per_shard", objs_per_shard);
1530 formatter->dump_string("fill_status", ss.str());
1531 formatter->close_section();
1532 }
1533 }
1534 }
1535
1536 done = (m_buckets.size() < max_entries);
1537 } while (!done); /* foreach: bucket */
1538
1539 formatter->close_section();
1540 formatter->close_section();
1541 formatter->flush(cout);
1542
1543 } /* foreach: user_id */
1544
1545 formatter->close_section();
1546 formatter->flush(cout);
1547
1548 return ret;
1549 } /* RGWBucketAdminOp::limit_check */
1550
1551 int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
1552 RGWFormatterFlusher& flusher)
1553 {
1554 RGWBucket bucket;
1555 int ret;
1556
1557 string bucket_name = op_state.get_bucket_name();
1558
1559 if (!bucket_name.empty()) {
1560 ret = bucket.init(store, op_state);
1561 if (ret < 0)
1562 return ret;
1563 }
1564
1565 Formatter *formatter = flusher.get_formatter();
1566 flusher.start(0);
1567
1568 CephContext *cct = store->ctx();
1569
1570 const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
1571
1572 bool show_stats = op_state.will_fetch_stats();
1573 rgw_user user_id = op_state.get_user_id();
1574 if (op_state.is_user_op()) {
1575 formatter->open_array_section("buckets");
1576
1577 RGWUserBuckets buckets;
1578 string marker;
1579 bool is_truncated = false;
1580
1581 do {
1582 ret = rgw_read_user_buckets(store, op_state.get_user_id(), buckets,
1583 marker, string(), max_entries, false,
1584 &is_truncated);
1585 if (ret < 0)
1586 return ret;
1587
1588 map<string, RGWBucketEnt>& m = buckets.get_buckets();
1589 map<string, RGWBucketEnt>::iterator iter;
1590
1591 for (iter = m.begin(); iter != m.end(); ++iter) {
1592 std::string obj_name = iter->first;
1593 if (show_stats)
1594 bucket_stats(store, user_id.tenant, obj_name, formatter);
1595 else
1596 formatter->dump_string("bucket", obj_name);
1597
1598 marker = obj_name;
1599 }
1600
1601 flusher.flush();
1602 } while (is_truncated);
1603
1604 formatter->close_section();
1605 } else if (!bucket_name.empty()) {
1606 bucket_stats(store, user_id.tenant, bucket_name, formatter);
1607 } else {
1608 RGWAccessHandle handle;
1609
1610 formatter->open_array_section("buckets");
1611 if (store->list_buckets_init(&handle) >= 0) {
1612 rgw_bucket_dir_entry obj;
1613 while (store->list_buckets_next(obj, &handle) >= 0) {
1614 if (show_stats)
1615 bucket_stats(store, user_id.tenant, obj.key.name, formatter);
1616 else
1617 formatter->dump_string("bucket", obj.key.name);
1618 }
1619 }
1620
1621 formatter->close_section();
1622 }
1623
1624 flusher.flush();
1625
1626 return 0;
1627 }
1628
1629
1630 void rgw_data_change::dump(Formatter *f) const
1631 {
1632 string type;
1633 switch (entity_type) {
1634 case ENTITY_TYPE_BUCKET:
1635 type = "bucket";
1636 break;
1637 default:
1638 type = "unknown";
1639 }
1640 encode_json("entity_type", type, f);
1641 encode_json("key", key, f);
1642 utime_t ut(timestamp);
1643 encode_json("timestamp", ut, f);
1644 }
1645
1646 void rgw_data_change::decode_json(JSONObj *obj) {
1647 string s;
1648 JSONDecoder::decode_json("entity_type", s, obj);
1649 if (s == "bucket") {
1650 entity_type = ENTITY_TYPE_BUCKET;
1651 } else {
1652 entity_type = ENTITY_TYPE_UNKNOWN;
1653 }
1654 JSONDecoder::decode_json("key", key, obj);
1655 utime_t ut;
1656 JSONDecoder::decode_json("timestamp", ut, obj);
1657 timestamp = ut.to_real_time();
1658 }
1659
1660 void rgw_data_change_log_entry::dump(Formatter *f) const
1661 {
1662 encode_json("log_id", log_id, f);
1663 utime_t ut(log_timestamp);
1664 encode_json("log_timestamp", ut, f);
1665 encode_json("entry", entry, f);
1666 }
1667
1668 void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
1669 JSONDecoder::decode_json("log_id", log_id, obj);
1670 utime_t ut;
1671 JSONDecoder::decode_json("log_timestamp", ut, obj);
1672 log_timestamp = ut.to_real_time();
1673 JSONDecoder::decode_json("entry", entry, obj);
1674 }
1675
1676 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
1677 const string& name = bs.bucket.name;
1678 int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
1679 uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
1680
1681 return (int)r;
1682 }
1683
1684 int RGWDataChangesLog::renew_entries()
1685 {
1686 if (!store->need_to_log_data())
1687 return 0;
1688
1689 /* we can't keep the bucket name as part of the cls_log_entry, and we need
1690 * it later, so we keep two lists under the map */
1691 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
1692
1693 lock.Lock();
1694 map<rgw_bucket_shard, bool> entries;
1695 entries.swap(cur_cycle);
1696 lock.Unlock();
1697
1698 map<rgw_bucket_shard, bool>::iterator iter;
1699 string section;
1700 real_time ut = real_clock::now();
1701 for (iter = entries.begin(); iter != entries.end(); ++iter) {
1702 const rgw_bucket_shard& bs = iter->first;
1703
1704 int index = choose_oid(bs);
1705
1706 cls_log_entry entry;
1707
1708 rgw_data_change change;
1709 bufferlist bl;
1710 change.entity_type = ENTITY_TYPE_BUCKET;
1711 change.key = bs.get_key();
1712 change.timestamp = ut;
1713 ::encode(change, bl);
1714
1715 store->time_log_prepare_entry(entry, ut, section, change.key, bl);
1716
1717 m[index].first.push_back(bs);
1718 m[index].second.emplace_back(std::move(entry));
1719 }
1720
1721 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
1722 for (miter = m.begin(); miter != m.end(); ++miter) {
1723 list<cls_log_entry>& entries = miter->second.second;
1724
1725 real_time now = real_clock::now();
1726
1727 int ret = store->time_log_add(oids[miter->first], entries, NULL);
1728 if (ret < 0) {
1729 /* we don't really need to have a special handling for failed cases here,
1730 * as this is just an optimization. */
1731 lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
1732 return ret;
1733 }
1734
1735 real_time expiration = now;
1736 expiration += make_timespan(cct->_conf->rgw_data_log_window);
1737
1738 list<rgw_bucket_shard>& buckets = miter->second.first;
1739 list<rgw_bucket_shard>::iterator liter;
1740 for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
1741 update_renewed(*liter, expiration);
1742 }
1743 }
1744
1745 return 0;
1746 }
1747
1748 void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
1749 {
1750 assert(lock.is_locked());
1751 if (!changes.find(bs, status)) {
1752 status = ChangeStatusPtr(new ChangeStatus);
1753 changes.add(bs, status);
1754 }
1755 }
1756
1757 void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
1758 {
1759 Mutex::Locker l(lock);
1760 cur_cycle[bs] = true;
1761 }
1762
1763 void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
1764 {
1765 Mutex::Locker l(lock);
1766 ChangeStatusPtr status;
1767 _get_change(bs, status);
1768
1769 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
1770 status->cur_expiration = expiration;
1771 }
1772
1773 int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
1774 rgw_bucket_shard bs(bucket, shard_id);
1775
1776 return choose_oid(bs);
1777 }
1778
1779 int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
1780 if (!store->need_to_log_data())
1781 return 0;
1782
1783 rgw_bucket_shard bs(bucket, shard_id);
1784
1785 int index = choose_oid(bs);
1786 mark_modified(index, bs);
1787
1788 lock.Lock();
1789
1790 ChangeStatusPtr status;
1791 _get_change(bs, status);
1792
1793 lock.Unlock();
1794
1795 real_time now = real_clock::now();
1796
1797 status->lock->Lock();
1798
1799 ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
1800
1801 if (now < status->cur_expiration) {
1802 /* no need to send, recently completed */
1803 status->lock->Unlock();
1804
1805 register_renew(bs);
1806 return 0;
1807 }
1808
1809 RefCountedCond *cond;
1810
1811 if (status->pending) {
1812 cond = status->cond;
1813
1814 assert(cond);
1815
1816 status->cond->get();
1817 status->lock->Unlock();
1818
1819 int ret = cond->wait();
1820 cond->put();
1821 if (!ret) {
1822 register_renew(bs);
1823 }
1824 return ret;
1825 }
1826
1827 status->cond = new RefCountedCond;
1828 status->pending = true;
1829
1830 string& oid = oids[index];
1831 real_time expiration;
1832
1833 int ret;
1834
1835 do {
1836 status->cur_sent = now;
1837
1838 expiration = now;
1839 expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
1840
1841 status->lock->Unlock();
1842
1843 bufferlist bl;
1844 rgw_data_change change;
1845 change.entity_type = ENTITY_TYPE_BUCKET;
1846 change.key = bs.get_key();
1847 change.timestamp = now;
1848 ::encode(change, bl);
1849 string section;
1850
1851 ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
1852
1853 ret = store->time_log_add(oid, now, section, change.key, bl);
1854
1855 now = real_clock::now();
1856
1857 status->lock->Lock();
1858
1859 } while (!ret && real_clock::now() > expiration);
1860
1861 cond = status->cond;
1862
1863 status->pending = false;
1864 status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
1865 status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
1866 status->cond = NULL;
1867 status->lock->Unlock();
1868
1869 cond->done(ret);
1870 cond->put();
1871
1872 return ret;
1873 }
1874
1875 int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
1876 list<rgw_data_change_log_entry>& entries,
1877 const string& marker,
1878 string *out_marker,
1879 bool *truncated) {
1880 if (shard >= num_shards)
1881 return -EINVAL;
1882
1883 list<cls_log_entry> log_entries;
1884
1885 int ret = store->time_log_list(oids[shard], start_time, end_time,
1886 max_entries, log_entries, marker,
1887 out_marker, truncated);
1888 if (ret < 0)
1889 return ret;
1890
1891 list<cls_log_entry>::iterator iter;
1892 for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
1893 rgw_data_change_log_entry log_entry;
1894 log_entry.log_id = iter->id;
1895 real_time rt = iter->timestamp.to_real_time();
1896 log_entry.log_timestamp = rt;
1897 bufferlist::iterator liter = iter->data.begin();
1898 try {
1899 ::decode(log_entry.entry, liter);
1900 } catch (buffer::error& err) {
1901 lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
1902 return -EIO;
1903 }
1904 entries.push_back(log_entry);
1905 }
1906
1907 return 0;
1908 }
1909
1910 int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
1911 list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
1912 bool truncated;
1913 entries.clear();
1914
1915 for (; marker.shard < num_shards && (int)entries.size() < max_entries;
1916 marker.shard++, marker.marker.clear()) {
1917 int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
1918 marker.marker, NULL, &truncated);
1919 if (ret == -ENOENT) {
1920 continue;
1921 }
1922 if (ret < 0) {
1923 return ret;
1924 }
1925 if (truncated) {
1926 *ptruncated = true;
1927 return 0;
1928 }
1929 }
1930
1931 *ptruncated = (marker.shard < num_shards);
1932
1933 return 0;
1934 }
1935
1936 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
1937 {
1938 if (shard_id >= num_shards)
1939 return -EINVAL;
1940
1941 string oid = oids[shard_id];
1942
1943 cls_log_header header;
1944
1945 int ret = store->time_log_info(oid, &header);
1946 if ((ret < 0) && (ret != -ENOENT))
1947 return ret;
1948
1949 info->marker = header.max_marker;
1950 info->last_update = header.max_time.to_real_time();
1951
1952 return 0;
1953 }
1954
1955 int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
1956 const string& start_marker, const string& end_marker)
1957 {
1958 int ret;
1959
1960 if (shard_id > num_shards)
1961 return -EINVAL;
1962
1963 ret = store->time_log_trim(oids[shard_id], start_time, end_time, start_marker, end_marker);
1964
1965 if (ret == -ENOENT || ret == -ENODATA)
1966 ret = 0;
1967
1968 return ret;
1969 }
1970
1971 int RGWDataChangesLog::trim_entries(const real_time& start_time, const real_time& end_time,
1972 const string& start_marker, const string& end_marker)
1973 {
1974 for (int shard = 0; shard < num_shards; shard++) {
1975 int ret = store->time_log_trim(oids[shard], start_time, end_time, start_marker, end_marker);
1976 if (ret == -ENOENT || ret == -ENODATA) {
1977 continue;
1978 }
1979 if (ret < 0)
1980 return ret;
1981 }
1982
1983 return 0;
1984 }
1985
1986 bool RGWDataChangesLog::going_down()
1987 {
1988 return down_flag;
1989 }
1990
1991 RGWDataChangesLog::~RGWDataChangesLog() {
1992 down_flag = true;
1993 renew_thread->stop();
1994 renew_thread->join();
1995 delete renew_thread;
1996 delete[] oids;
1997 }
1998
1999 void *RGWDataChangesLog::ChangesRenewThread::entry() {
2000 do {
2001 dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
2002 int r = log->renew_entries();
2003 if (r < 0) {
2004 dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
2005 }
2006
2007 if (log->going_down())
2008 break;
2009
2010 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
2011 lock.Lock();
2012 cond.WaitInterval(lock, utime_t(interval, 0));
2013 lock.Unlock();
2014 } while (!log->going_down());
2015
2016 return NULL;
2017 }
2018
2019 void RGWDataChangesLog::ChangesRenewThread::stop()
2020 {
2021 Mutex::Locker l(lock);
2022 cond.Signal();
2023 }
2024
2025 void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
2026 {
2027 auto key = bs.get_key();
2028 modified_lock.get_read();
2029 map<int, set<string> >::iterator iter = modified_shards.find(shard_id);
2030 if (iter != modified_shards.end()) {
2031 set<string>& keys = iter->second;
2032 if (keys.find(key) != keys.end()) {
2033 modified_lock.unlock();
2034 return;
2035 }
2036 }
2037 modified_lock.unlock();
2038
2039 RWLock::WLocker wl(modified_lock);
2040 modified_shards[shard_id].insert(key);
2041 }
2042
2043 void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
2044 {
2045 RWLock::WLocker wl(modified_lock);
2046 modified.swap(modified_shards);
2047 modified_shards.clear();
2048 }
2049
2050 void RGWBucketCompleteInfo::dump(Formatter *f) const {
2051 encode_json("bucket_info", info, f);
2052 encode_json("attrs", attrs, f);
2053 }
2054
2055 void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
2056 JSONDecoder::decode_json("bucket_info", info, obj);
2057 JSONDecoder::decode_json("attrs", attrs, obj);
2058 }
2059
2060 class RGWBucketMetadataHandler : public RGWMetadataHandler {
2061
2062 public:
2063 string get_type() override { return "bucket"; }
2064
2065 int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
2066 RGWObjVersionTracker ot;
2067 RGWBucketEntryPoint be;
2068
2069 real_time mtime;
2070 map<string, bufferlist> attrs;
2071 RGWObjectCtx obj_ctx(store);
2072
2073 string tenant_name, bucket_name;
2074 parse_bucket(entry, &tenant_name, &bucket_name);
2075 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &ot, &mtime, &attrs);
2076 if (ret < 0)
2077 return ret;
2078
2079 RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime);
2080
2081 *obj = mdo;
2082
2083 return 0;
2084 }
2085
2086 int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2087 real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2088 RGWBucketEntryPoint be, old_be;
2089 try {
2090 decode_json_obj(be, obj);
2091 } catch (JSONDecoder::err& e) {
2092 return -EINVAL;
2093 }
2094
2095 real_time orig_mtime;
2096 map<string, bufferlist> attrs;
2097
2098 RGWObjVersionTracker old_ot;
2099 RGWObjectCtx obj_ctx(store);
2100
2101 string tenant_name, bucket_name;
2102 parse_bucket(entry, &tenant_name, &bucket_name);
2103 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs);
2104 if (ret < 0 && ret != -ENOENT)
2105 return ret;
2106
2107 // are we actually going to perform this put, or is it too old?
2108 if (ret != -ENOENT &&
2109 !check_versions(old_ot.read_version, orig_mtime,
2110 objv_tracker.write_version, mtime, sync_type)) {
2111 return STATUS_NO_APPLY;
2112 }
2113
2114 objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */
2115
2116 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs);
2117 if (ret < 0)
2118 return ret;
2119
2120 /* link bucket */
2121 if (be.linked) {
2122 ret = rgw_link_bucket(store, be.owner, be.bucket, be.creation_time, false);
2123 } else {
2124 ret = rgw_unlink_bucket(store, be.owner, be.bucket.tenant, be.bucket.name, false);
2125 }
2126
2127 return ret;
2128 }
2129
2130 struct list_keys_info {
2131 RGWRados *store;
2132 RGWListRawObjsCtx ctx;
2133 };
2134
2135 int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2136 RGWBucketEntryPoint be;
2137 RGWObjectCtx obj_ctx(store);
2138
2139 string tenant_name, bucket_name;
2140 parse_bucket(entry, &tenant_name, &bucket_name);
2141 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL);
2142 if (ret < 0)
2143 return ret;
2144
2145 /*
2146 * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
2147 * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
2148 * will incorrectly fail.
2149 */
2150 ret = rgw_unlink_bucket(store, be.owner, tenant_name, bucket_name, false);
2151 if (ret < 0) {
2152 lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2153 }
2154
2155 ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker);
2156 if (ret < 0) {
2157 lderr(store->ctx()) << "could not delete bucket=" << entry << dendl;
2158 }
2159 /* idempotent */
2160 return 0;
2161 }
2162
2163 void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2164 oid = key;
2165 pool = store->get_zone_params().domain_root;
2166 }
2167
2168 int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
2169 auto info = ceph::make_unique<list_keys_info>();
2170
2171 info->store = store;
2172
2173 int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker,
2174 &info->ctx);
2175 if (ret < 0) {
2176 return ret;
2177 }
2178 *phandle = (void *)info.release();
2179
2180 return 0;
2181 }
2182
2183 int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2184 list_keys_info *info = static_cast<list_keys_info *>(handle);
2185
2186 string no_filter;
2187
2188 keys.clear();
2189
2190 RGWRados *store = info->store;
2191
2192 list<string> unfiltered_keys;
2193
2194 int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
2195 unfiltered_keys, truncated);
2196 if (ret < 0 && ret != -ENOENT)
2197 return ret;
2198 if (ret == -ENOENT) {
2199 if (truncated)
2200 *truncated = false;
2201 return 0;
2202 }
2203
2204 // now filter out the system entries
2205 list<string>::iterator iter;
2206 for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2207 string& k = *iter;
2208
2209 if (k[0] != '.') {
2210 keys.push_back(k);
2211 }
2212 }
2213
2214 return 0;
2215 }
2216
2217 void list_keys_complete(void *handle) override {
2218 list_keys_info *info = static_cast<list_keys_info *>(handle);
2219 delete info;
2220 }
2221
2222 string get_marker(void *handle) {
2223 list_keys_info *info = static_cast<list_keys_info *>(handle);
2224 return info->store->list_raw_objs_get_cursor(info->ctx);
2225 }
2226 };
2227
2228 class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler {
2229
2230 public:
2231 string get_type() override { return "bucket.instance"; }
2232
2233 int get(RGWRados *store, string& oid, RGWMetadataObject **obj) override {
2234 RGWBucketCompleteInfo bci;
2235
2236 real_time mtime;
2237 RGWObjectCtx obj_ctx(store);
2238
2239 int ret = store->get_bucket_instance_info(obj_ctx, oid, bci.info, &mtime, &bci.attrs);
2240 if (ret < 0)
2241 return ret;
2242
2243 RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime);
2244
2245 *obj = mdo;
2246
2247 return 0;
2248 }
2249
2250 int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2251 real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2252 RGWBucketCompleteInfo bci, old_bci;
2253 try {
2254 decode_json_obj(bci, obj);
2255 } catch (JSONDecoder::err& e) {
2256 return -EINVAL;
2257 }
2258
2259 real_time orig_mtime;
2260 RGWObjectCtx obj_ctx(store);
2261
2262 int ret = store->get_bucket_instance_info(obj_ctx, entry, old_bci.info,
2263 &orig_mtime, &old_bci.attrs);
2264 bool exists = (ret != -ENOENT);
2265 if (ret < 0 && exists)
2266 return ret;
2267
2268 if (!exists || old_bci.info.bucket.bucket_id != bci.info.bucket.bucket_id) {
2269 /* a new bucket, we need to select a new bucket placement for it */
2270 auto key(entry);
2271 rgw_bucket_instance_oid_to_key(key);
2272 string tenant_name;
2273 string bucket_name;
2274 string bucket_instance;
2275 parse_bucket(key, &tenant_name, &bucket_name, &bucket_instance);
2276
2277 RGWZonePlacementInfo rule_info;
2278 bci.info.bucket.name = bucket_name;
2279 bci.info.bucket.bucket_id = bucket_instance;
2280 bci.info.bucket.tenant = tenant_name;
2281 ret = store->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info);
2282 if (ret < 0) {
2283 ldout(store->ctx(), 0) << "ERROR: select_bucket_placement() returned " << ret << dendl;
2284 return ret;
2285 }
2286 bci.info.index_type = rule_info.index_type;
2287 } else {
2288 /* existing bucket, keep its placement */
2289 bci.info.bucket.explicit_placement = old_bci.info.bucket.explicit_placement;
2290 bci.info.placement_rule = old_bci.info.placement_rule;
2291 }
2292
2293 if (exists && old_bci.info.datasync_flag_enabled() != bci.info.datasync_flag_enabled()) {
2294 int shards_num = bci.info.num_shards? bci.info.num_shards : 1;
2295 int shard_id = bci.info.num_shards? 0 : -1;
2296
2297 if (!bci.info.datasync_flag_enabled()) {
2298 ret = store->stop_bi_log_entries(bci.info, -1);
2299 if (ret < 0) {
2300 lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2301 return ret;
2302 }
2303 } else {
2304 ret = store->resync_bi_log_entries(bci.info, -1);
2305 if (ret < 0) {
2306 lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2307 return ret;
2308 }
2309 }
2310
2311 for (int i = 0; i < shards_num; ++i, ++shard_id) {
2312 ret = store->data_log->add_entry(bci.info.bucket, shard_id);
2313 if (ret < 0) {
2314 lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
2315 return ret;
2316 }
2317 }
2318 }
2319
2320 // are we actually going to perform this put, or is it too old?
2321 if (exists &&
2322 !check_versions(old_bci.info.objv_tracker.read_version, orig_mtime,
2323 objv_tracker.write_version, mtime, sync_type)) {
2324 objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2325 return STATUS_NO_APPLY;
2326 }
2327
2328 /* record the read version (if any), store the new version */
2329 bci.info.objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2330 bci.info.objv_tracker.write_version = objv_tracker.write_version;
2331
2332 ret = store->put_bucket_instance_info(bci.info, false, mtime, &bci.attrs);
2333 if (ret < 0)
2334 return ret;
2335
2336 objv_tracker = bci.info.objv_tracker;
2337
2338 ret = store->init_bucket_index(bci.info, bci.info.num_shards);
2339 if (ret < 0)
2340 return ret;
2341
2342 return STATUS_APPLIED;
2343 }
2344
2345 struct list_keys_info {
2346 RGWRados *store;
2347 RGWListRawObjsCtx ctx;
2348 };
2349
2350 int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2351 RGWBucketInfo info;
2352 RGWObjectCtx obj_ctx(store);
2353
2354 int ret = store->get_bucket_instance_info(obj_ctx, entry, info, NULL, NULL);
2355 if (ret < 0 && ret != -ENOENT)
2356 return ret;
2357
2358 return rgw_bucket_instance_remove_entry(store, entry, &info.objv_tracker);
2359 }
2360
2361 void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2362 oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
2363 rgw_bucket_instance_key_to_oid(oid);
2364 pool = store->get_zone_params().domain_root;
2365 }
2366
2367 int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
2368 auto info = ceph::make_unique<list_keys_info>();
2369
2370 info->store = store;
2371
2372 int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker,
2373 &info->ctx);
2374 if (ret < 0) {
2375 return ret;
2376 }
2377 *phandle = (void *)info.release();
2378
2379 return 0;
2380 }
2381
2382 int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2383 list_keys_info *info = static_cast<list_keys_info *>(handle);
2384
2385 string no_filter;
2386
2387 keys.clear();
2388
2389 RGWRados *store = info->store;
2390
2391 list<string> unfiltered_keys;
2392
2393 int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
2394 unfiltered_keys, truncated);
2395 if (ret < 0 && ret != -ENOENT)
2396 return ret;
2397 if (ret == -ENOENT) {
2398 if (truncated)
2399 *truncated = false;
2400 return 0;
2401 }
2402
2403 constexpr int prefix_size = sizeof(RGW_BUCKET_INSTANCE_MD_PREFIX) - 1;
2404 // now filter in the relevant entries
2405 list<string>::iterator iter;
2406 for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2407 string& k = *iter;
2408
2409 if (k.compare(0, prefix_size, RGW_BUCKET_INSTANCE_MD_PREFIX) == 0) {
2410 auto oid = k.substr(prefix_size);
2411 rgw_bucket_instance_oid_to_key(oid);
2412 keys.emplace_back(std::move(oid));
2413 }
2414 }
2415
2416 return 0;
2417 }
2418
2419 void list_keys_complete(void *handle) override {
2420 list_keys_info *info = static_cast<list_keys_info *>(handle);
2421 delete info;
2422 }
2423
2424 string get_marker(void *handle) {
2425 list_keys_info *info = static_cast<list_keys_info *>(handle);
2426 return info->store->list_raw_objs_get_cursor(info->ctx);
2427 }
2428
2429 /*
2430 * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry
2431 * point, so that the log entries end up at the same log shard, so that we process them
2432 * in order
2433 */
2434 void get_hash_key(const string& section, const string& key, string& hash_key) override {
2435 string k;
2436 int pos = key.find(':');
2437 if (pos < 0)
2438 k = key;
2439 else
2440 k = key.substr(0, pos);
2441 hash_key = "bucket:" + k;
2442 }
2443 };
2444
2445 void rgw_bucket_init(RGWMetadataManager *mm)
2446 {
2447 bucket_meta_handler = new RGWBucketMetadataHandler;
2448 mm->register_handler(bucket_meta_handler);
2449 bucket_instance_meta_handler = new RGWBucketInstanceMetadataHandler;
2450 mm->register_handler(bucket_instance_meta_handler);
2451 }