]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_bucket.cc
2e1e8b39ac2adfb03f6205a9cbdf20eaf9fd79cc
[ceph.git] / ceph / src / rgw / rgw_bucket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5
6 #include <string>
7 #include <map>
8 #include <sstream>
9
10 #include <boost/utility/string_ref.hpp>
11 #include <boost/format.hpp>
12
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
15 #include "include/scope_guard.h"
16
17 #include "rgw_rados.h"
18 #include "rgw_zone.h"
19 #include "rgw_acl.h"
20 #include "rgw_acl_s3.h"
21 #include "rgw_tag_s3.h"
22
23 #include "include/types.h"
24 #include "rgw_bucket.h"
25 #include "rgw_user.h"
26 #include "rgw_string.h"
27 #include "rgw_multi.h"
28 #include "rgw_op.h"
29 #include "rgw_bucket_sync.h"
30
31 #include "services/svc_zone.h"
32 #include "services/svc_sys_obj.h"
33 #include "services/svc_bucket.h"
34 #include "services/svc_bucket_sync.h"
35 #include "services/svc_meta.h"
36 #include "services/svc_meta_be_sobj.h"
37 #include "services/svc_user.h"
38 #include "services/svc_cls.h"
39 #include "services/svc_bilog_rados.h"
40 #include "services/svc_datalog_rados.h"
41
42 #include "include/rados/librados.hpp"
43 // until everything is moved from rgw_common
44 #include "rgw_common.h"
45 #include "rgw_reshard.h"
46 #include "rgw_lc.h"
47
48 // stolen from src/cls/version/cls_version.cc
49 #define VERSION_ATTR "ceph.objclass.version"
50
51 #include "cls/user/cls_user_types.h"
52
53 #include "rgw_sal.h"
54
55 #define dout_context g_ceph_context
56 #define dout_subsys ceph_subsys_rgw
57
58 #define BUCKET_TAG_TIMEOUT 30
59
60 // default number of entries to list with each bucket listing call
61 // (use marker to bridge between calls)
62 static constexpr size_t listing_max_entries = 1000;
63
64
/*
 * Split a bucket specifier into tenant, bucket name and bucket instance.
 *
 * Accepted forms:
 *   [tenant/]bucket[:bucket_instance]
 *   tenant:bucket:bucket_instance   (only considered when no "tenant/" prefix
 *                                    produced a non-empty tenant)
 *
 * The tenant_name is always returned on purpose. May be empty, of course.
 * bucket_name is always written. bucket_instance is optional: it may be
 * nullptr, and when non-null it is only written if the specifier contains
 * a ':' after the optional "tenant/" prefix.
 */
static void parse_bucket(const std::string& bucket,
                         std::string *tenant_name,
                         std::string *bucket_name,
                         std::string *bucket_instance = nullptr /* optional */)
{
  const std::string::size_type slash = bucket.find('/');
  if (slash != std::string::npos) {
    *tenant_name = bucket.substr(0, slash);
  } else {
    tenant_name->clear();
  }

  // Everything after the optional "tenant/" prefix.
  std::string bn =
    (slash == std::string::npos) ? bucket : bucket.substr(slash + 1);

  const std::string::size_type colon = bn.find(':');
  if (colon == std::string::npos) {
    *bucket_name = std::move(bn);
    return;
  }
  *bucket_name = bn.substr(0, colon);

  // Parse into a local so the tenant:bucket:bucket_instance case below works
  // even when the caller did not ask for the instance (bucket_instance ==
  // nullptr); dereferencing the optional pointer here used to crash.
  std::string instance = bn.substr(colon + 1);

  /*
   * deal with the possible tenant:bucket:bucket_instance case
   */
  if (tenant_name->empty()) {
    const std::string::size_type inner = instance.find(':');
    if (inner != std::string::npos) {
      *tenant_name = *bucket_name;
      *bucket_name = instance.substr(0, inner);
      instance.erase(0, inner + 1);
    }
  }

  if (bucket_instance) {
    *bucket_instance = std::move(instance);
  }
}
105
/*
 * Build the name under which a bucket entry is stored in RADOS pools.
 *
 * Note that this is not a reversal of parse_bucket(). That one deals
 * with the syntax we need in metadata and such. This one deals with
 * the representation in RADOS pools. We chose '/' because it's not
 * acceptable in bucket names and thus qualified buckets cannot conflict
 * with the legacy or S3 buckets.
 *
 * Returns an empty string for an empty bucket_name, the bare bucket_name
 * for an empty tenant_name, and "tenant/bucket" otherwise.
 */
std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
                                       const std::string& bucket_name) {
  if (bucket_name.empty()) {
    return std::string();
  }
  if (tenant_name.empty()) {
    return bucket_name;
  }
  return tenant_name + "/" + bucket_name;
}
127
/*
 * Split a bucket reference taken from an S3 URL into tenant and bucket name.
 *
 * Tenants are separated from buckets in URLs by a colon in S3.
 * This function is not to be used on Swift URLs, not even for COPY arguments.
 *
 * Without a colon the tenant defaults to auth_tenant and the whole input is
 * the bucket name.
 */
void rgw_parse_url_bucket(const std::string &bucket, const std::string& auth_tenant,
                          std::string &tenant_name, std::string &bucket_name) {
  const std::string::size_type colon = bucket.find(':');
  if (colon == std::string::npos) {
    tenant_name = auth_tenant;
    bucket_name = bucket;
    return;
  }

  /*
   * N.B.: We allow ":bucket" syntax with explicit empty tenant in order
   * to refer to the legacy tenant, in case users in new named tenants
   * want to access old global buckets.
   */
  tenant_name = bucket.substr(0, colon);
  bucket_name = bucket.substr(colon + 1);
}
149
150 /**
151 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
152 * Returns: 0 on success, -ERR# on failure.
153 */
154 int rgw_read_user_buckets(rgw::sal::RGWRadosStore * store,
155 const rgw_user& user_id,
156 rgw::sal::RGWBucketList& buckets,
157 const string& marker,
158 const string& end_marker,
159 uint64_t max,
160 bool need_stats)
161 {
162 rgw::sal::RGWRadosUser user(store, user_id);
163 return user.list_buckets(marker, end_marker, max, need_stats, buckets);
164 }
165
166 int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *bucket_name, string *bucket_id, int *shard_id)
167 {
168 auto pos = bucket_instance.rfind(':');
169 if (pos == string::npos) {
170 return -EINVAL;
171 }
172
173 string first = bucket_instance.substr(0, pos);
174 string second = bucket_instance.substr(pos + 1);
175
176 pos = first.find(':');
177
178 if (pos == string::npos) {
179 *shard_id = -1;
180 *bucket_name = first;
181 *bucket_id = second;
182 return 0;
183 }
184
185 *bucket_name = first.substr(0, pos);
186 *bucket_id = first.substr(pos + 1);
187
188 string err;
189 *shard_id = strict_strtol(second.c_str(), 10, &err);
190 if (!err.empty()) {
191 return -EINVAL;
192 }
193
194 return 0;
195 }
196
197 // parse key in format: [tenant/]name:instance[:shard_id]
198 int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
199 rgw_bucket *bucket, int *shard_id)
200 {
201 boost::string_ref name{key};
202 boost::string_ref instance;
203
204 // split tenant/name
205 auto pos = name.find('/');
206 if (pos != string::npos) {
207 auto tenant = name.substr(0, pos);
208 bucket->tenant.assign(tenant.begin(), tenant.end());
209 name = name.substr(pos + 1);
210 } else {
211 bucket->tenant.clear();
212 }
213
214 // split name:instance
215 pos = name.find(':');
216 if (pos != string::npos) {
217 instance = name.substr(pos + 1);
218 name = name.substr(0, pos);
219 }
220 bucket->name.assign(name.begin(), name.end());
221
222 // split instance:shard
223 pos = instance.find(':');
224 if (pos == string::npos) {
225 bucket->bucket_id.assign(instance.begin(), instance.end());
226 if (shard_id) {
227 *shard_id = -1;
228 }
229 return 0;
230 }
231
232 // parse shard id
233 auto shard = instance.substr(pos + 1);
234 string err;
235 auto id = strict_strtol(shard.data(), 10, &err);
236 if (!err.empty()) {
237 if (cct) {
238 ldout(cct, 0) << "ERROR: failed to parse bucket shard '"
239 << instance.data() << "': " << err << dendl;
240 }
241 return -EINVAL;
242 }
243
244 if (shard_id) {
245 *shard_id = id;
246 }
247 instance = instance.substr(0, pos);
248 bucket->bucket_id.assign(instance.begin(), instance.end());
249 return 0;
250 }
251
252 static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
253 Formatter *f)
254 {
255 for (const auto& o : objs_to_unlink) {
256 f->dump_string("object", o.name);
257 }
258 }
259
260 void check_bad_user_bucket_mapping(rgw::sal::RGWRadosStore *store, const rgw_user& user_id,
261 bool fix)
262 {
263 rgw::sal::RGWBucketList user_buckets;
264 rgw::sal::RGWRadosUser user(store, user_id);
265 string marker;
266
267 CephContext *cct = store->ctx();
268
269 size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
270
271 do {
272 int ret = user.list_buckets(marker, string(), max_entries, false, user_buckets);
273 if (ret < 0) {
274 ldout(store->ctx(), 0) << "failed to read user buckets: "
275 << cpp_strerror(-ret) << dendl;
276 return;
277 }
278
279 map<string, rgw::sal::RGWBucket*>& buckets = user_buckets.get_buckets();
280 for (map<string, rgw::sal::RGWBucket*>::iterator i = buckets.begin();
281 i != buckets.end();
282 ++i) {
283 marker = i->first;
284
285 rgw::sal::RGWBucket* bucket = i->second;
286
287 RGWBucketInfo bucket_info;
288 real_time mtime;
289 int r = store->getRados()->get_bucket_info(store->svc(), user_id.tenant, bucket->get_name(), bucket_info, &mtime, null_yield);
290 if (r < 0) {
291 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
292 continue;
293 }
294
295 rgw_bucket& actual_bucket = bucket_info.bucket;
296
297 if (actual_bucket.name.compare(bucket->get_name()) != 0 ||
298 actual_bucket.tenant.compare(bucket->get_tenant()) != 0 ||
299 actual_bucket.marker.compare(bucket->get_marker()) != 0 ||
300 actual_bucket.bucket_id.compare(bucket->get_bucket_id()) != 0) {
301 cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
302 if (fix) {
303 cout << "fixing" << std::endl;
304 r = store->ctl()->bucket->link_bucket(user_id, actual_bucket,
305 bucket_info.creation_time,
306 null_yield);
307 if (r < 0) {
308 cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
309 }
310 }
311 }
312 }
313 } while (user_buckets.is_truncated());
314 }
315
316 // note: function type conforms to RGWRados::check_filter_t
317 bool rgw_bucket_object_check_filter(const string& oid)
318 {
319 rgw_obj_key key;
320 string ns;
321 return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
322 }
323
324 int rgw_remove_object(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info, const rgw_bucket& bucket, rgw_obj_key& key)
325 {
326 RGWObjectCtx rctx(store);
327
328 if (key.instance.empty()) {
329 key.instance = "null";
330 }
331
332 rgw_obj obj(bucket, key);
333
334 return store->getRados()->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
335 }
336
337 /* xxx dang */
338 static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket, bool delete_children, optional_yield y)
339 {
340 int ret;
341 map<RGWObjCategory, RGWStorageStats> stats;
342 std::vector<rgw_bucket_dir_entry> objs;
343 map<string, bool> common_prefixes;
344 RGWBucketInfo info;
345
346 string bucket_ver, master_ver;
347
348 ret = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, info, NULL, null_yield);
349 if (ret < 0)
350 return ret;
351
352 ret = store->getRados()->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
353 if (ret < 0)
354 return ret;
355
356 RGWRados::Bucket target(store->getRados(), info);
357 RGWRados::Bucket::List list_op(&target);
358 CephContext *cct = store->ctx();
359
360 list_op.params.list_versions = true;
361 list_op.params.allow_unordered = true;
362
363 bool is_truncated = false;
364 do {
365 objs.clear();
366
367 ret = list_op.list_objects(listing_max_entries, &objs, &common_prefixes,
368 &is_truncated, null_yield);
369 if (ret < 0)
370 return ret;
371
372 if (!objs.empty() && !delete_children) {
373 lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl;
374 return -ENOTEMPTY;
375 }
376
377 for (const auto& obj : objs) {
378 rgw_obj_key key(obj.key);
379 ret = rgw_remove_object(store, info, bucket, key);
380 if (ret < 0 && ret != -ENOENT) {
381 return ret;
382 }
383 }
384 } while(is_truncated);
385
386 string prefix, delimiter;
387
388 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
389 if (ret < 0) {
390 return ret;
391 }
392
393 ret = store->ctl()->bucket->sync_user_stats(info.owner, info);
394 if ( ret < 0) {
395 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
396 }
397
398 RGWObjVersionTracker objv_tracker;
399
400 // if we deleted children above we will force delete, as any that
401 // remain is detrius from a prior bug
402 ret = store->getRados()->delete_bucket(info, objv_tracker, null_yield, !delete_children);
403 if (ret < 0) {
404 lderr(store->ctx()) << "ERROR: could not remove bucket " <<
405 bucket.name << dendl;
406 return ret;
407 }
408
409 ret = store->ctl()->bucket->unlink_bucket(info.owner, bucket, null_yield, false);
410 if (ret < 0) {
411 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
412 }
413
414 return ret;
415 }
416
417 static int aio_wait(librados::AioCompletion *handle)
418 {
419 librados::AioCompletion *c = (librados::AioCompletion *)handle;
420 c->wait_for_complete();
421 int ret = c->get_return_value();
422 c->release();
423 return ret;
424 }
425
426 static int drain_handles(list<librados::AioCompletion *>& pending)
427 {
428 int ret = 0;
429 while (!pending.empty()) {
430 librados::AioCompletion *handle = pending.front();
431 pending.pop_front();
432 int r = aio_wait(handle);
433 if (r < 0) {
434 ret = r;
435 }
436 }
437 return ret;
438 }
439
440 int rgw_remove_bucket_bypass_gc(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket,
441 int concurrent_max, bool keep_index_consistent,
442 optional_yield y)
443 {
444 int ret;
445 map<RGWObjCategory, RGWStorageStats> stats;
446 std::vector<rgw_bucket_dir_entry> objs;
447 map<string, bool> common_prefixes;
448 RGWBucketInfo info;
449 RGWObjectCtx obj_ctx(store);
450 CephContext *cct = store->ctx();
451
452 string bucket_ver, master_ver;
453
454 ret = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, info, NULL, null_yield);
455 if (ret < 0)
456 return ret;
457
458 ret = store->getRados()->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
459 if (ret < 0)
460 return ret;
461
462 string prefix, delimiter;
463
464 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
465 if (ret < 0) {
466 return ret;
467 }
468
469 RGWRados::Bucket target(store->getRados(), info);
470 RGWRados::Bucket::List list_op(&target);
471
472 list_op.params.list_versions = true;
473 list_op.params.allow_unordered = true;
474
475 std::list<librados::AioCompletion*> handles;
476
477 int max_aio = concurrent_max;
478 bool is_truncated = true;
479
480 while (is_truncated) {
481 objs.clear();
482 ret = list_op.list_objects(listing_max_entries, &objs, &common_prefixes,
483 &is_truncated, null_yield);
484 if (ret < 0)
485 return ret;
486
487 std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
488 for (; it != objs.end(); ++it) {
489 RGWObjState *astate = NULL;
490 rgw_obj obj(bucket, (*it).key);
491
492 ret = store->getRados()->get_obj_state(&obj_ctx, info, obj, &astate, false, y);
493 if (ret == -ENOENT) {
494 dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
495 continue;
496 }
497 if (ret < 0) {
498 lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl;
499 return ret;
500 }
501
502 if (astate->manifest) {
503 RGWObjManifest& manifest = *astate->manifest;
504 RGWObjManifest::obj_iterator miter = manifest.obj_begin();
505 rgw_obj head_obj = manifest.get_obj();
506 rgw_raw_obj raw_head_obj;
507 store->getRados()->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
508
509
510 for (; miter != manifest.obj_end() && max_aio--; ++miter) {
511 if (!max_aio) {
512 ret = drain_handles(handles);
513 if (ret < 0) {
514 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
515 return ret;
516 }
517 max_aio = concurrent_max;
518 }
519
520 rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store->getRados());
521 if (last_obj == raw_head_obj) {
522 // have the head obj deleted at the end
523 continue;
524 }
525
526 ret = store->getRados()->delete_raw_obj_aio(last_obj, handles);
527 if (ret < 0) {
528 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
529 return ret;
530 }
531 } // for all shadow objs
532
533 ret = store->getRados()->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent, null_yield);
534 if (ret < 0) {
535 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
536 return ret;
537 }
538 }
539
540 if (!max_aio) {
541 ret = drain_handles(handles);
542 if (ret < 0) {
543 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
544 return ret;
545 }
546 max_aio = concurrent_max;
547 }
548 obj_ctx.invalidate(obj);
549 } // for all RGW objects
550 }
551
552 ret = drain_handles(handles);
553 if (ret < 0) {
554 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
555 return ret;
556 }
557
558 ret = store->ctl()->bucket->sync_user_stats(info.owner, info);
559 if (ret < 0) {
560 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
561 }
562
563 RGWObjVersionTracker objv_tracker;
564
565 // this function can only be run if caller wanted children to be
566 // deleted, so we can ignore the check for children as any that
567 // remain are detritus from a prior bug
568 ret = store->getRados()->delete_bucket(info, objv_tracker, y, false);
569 if (ret < 0) {
570 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl;
571 return ret;
572 }
573
574 ret = store->ctl()->bucket->unlink_bucket(info.owner, bucket, null_yield, false);
575 if (ret < 0) {
576 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
577 }
578
579 return ret;
580 }
581
// Store msg in *sink, but only when a sink was supplied and msg is
// non-empty; otherwise *sink is left untouched.
static void set_err_msg(std::string *sink, std::string msg)
{
  if (!sink || msg.empty()) {
    return;
  }
  *sink = msg;
}
587
588 int RGWBucket::init(rgw::sal::RGWRadosStore *storage, RGWBucketAdminOpState& op_state,
589 optional_yield y, std::string *err_msg,
590 map<string, bufferlist> *pattrs)
591 {
592 if (!storage) {
593 set_err_msg(err_msg, "no storage!");
594 return -EINVAL;
595 }
596
597 store = storage;
598
599 rgw_user user_id = op_state.get_user_id();
600 bucket.tenant = user_id.tenant;
601 bucket.name = op_state.get_bucket_name();
602
603 if (bucket.name.empty() && user_id.empty())
604 return -EINVAL;
605
606 // split possible tenant/name
607 auto pos = bucket.name.find('/');
608 if (pos != string::npos) {
609 bucket.tenant = bucket.name.substr(0, pos);
610 bucket.name = bucket.name.substr(pos + 1);
611 }
612
613 if (!bucket.name.empty()) {
614 int r = store->ctl()->bucket->read_bucket_info(
615 bucket, &bucket_info, y,
616 RGWBucketCtl::BucketInstance::GetParams().set_attrs(pattrs),
617 &ep_objv);
618 if (r < 0) {
619 set_err_msg(err_msg, "failed to fetch bucket info for bucket=" + bucket.name);
620 return r;
621 }
622
623 op_state.set_bucket(bucket_info.bucket);
624 }
625
626 if (!user_id.empty()) {
627 int r = store->ctl()->user->get_info_by_uid(user_id, &user_info, y);
628 if (r < 0) {
629 set_err_msg(err_msg, "failed to fetch user info");
630 return r;
631 }
632
633 op_state.display_name = user_info.display_name;
634 }
635
636 clear_failure();
637 return 0;
638 }
639
640 bool rgw_find_bucket_by_id(CephContext *cct, RGWMetadataManager *mgr,
641 const string& marker, const string& bucket_id, rgw_bucket* bucket_out)
642 {
643 void *handle = NULL;
644 bool truncated = false;
645 string s;
646
647 int ret = mgr->list_keys_init("bucket.instance", marker, &handle);
648 if (ret < 0) {
649 cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
650 mgr->list_keys_complete(handle);
651 return -ret;
652 }
653 do {
654 list<string> keys;
655 ret = mgr->list_keys_next(handle, 1000, keys, &truncated);
656 if (ret < 0) {
657 cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
658 mgr->list_keys_complete(handle);
659 return -ret;
660 }
661 for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
662 s = *iter;
663 ret = rgw_bucket_parse_bucket_key(cct, s, bucket_out, nullptr);
664 if (ret < 0) {
665 continue;
666 }
667 if (bucket_id == bucket_out->bucket_id) {
668 mgr->list_keys_complete(handle);
669 return true;
670 }
671 }
672 } while (truncated);
673 mgr->list_keys_complete(handle);
674 return false;
675 }
676
677 int RGWBucket::link(RGWBucketAdminOpState& op_state, optional_yield y,
678 map<string, bufferlist>& attrs, std::string *err_msg)
679 {
680 if (!op_state.is_user_op()) {
681 set_err_msg(err_msg, "empty user id");
682 return -EINVAL;
683 }
684
685 string bucket_id = op_state.get_bucket_id();
686
687 std::string display_name = op_state.get_user_display_name();
688 rgw_bucket& bucket = op_state.get_bucket();
689 if (!bucket_id.empty() && bucket_id != bucket.bucket_id) {
690 set_err_msg(err_msg,
691 "specified bucket id does not match " + bucket.bucket_id);
692 return -EINVAL;
693 }
694 rgw_bucket old_bucket = bucket;
695 rgw_user user_id = op_state.get_user_id();
696 bucket.tenant = user_id.tenant;
697 if (!op_state.new_bucket_name.empty()) {
698 auto pos = op_state.new_bucket_name.find('/');
699 if (pos != string::npos) {
700 bucket.tenant = op_state.new_bucket_name.substr(0, pos);
701 bucket.name = op_state.new_bucket_name.substr(pos + 1);
702 } else {
703 bucket.name = op_state.new_bucket_name;
704 }
705 }
706
707 RGWObjVersionTracker objv_tracker;
708 RGWObjVersionTracker old_version = bucket_info.objv_tracker;
709
710 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
711 if (aiter == attrs.end()) {
712 // should never happen; only pre-argonaut buckets lacked this.
713 ldout(store->ctx(), 0) << "WARNING: can't bucket link because no acl on bucket=" << old_bucket.name << dendl;
714 set_err_msg(err_msg,
715 "While crossing the Anavros you have displeased the goddess Hera."
716 " You must sacrifice your ancient bucket " + bucket.bucket_id);
717 return -EINVAL;
718 }
719 bufferlist& aclbl = aiter->second;
720 RGWAccessControlPolicy policy;
721 ACLOwner owner;
722 try {
723 auto iter = aclbl.cbegin();
724 decode(policy, iter);
725 owner = policy.get_owner();
726 } catch (buffer::error& err) {
727 set_err_msg(err_msg, "couldn't decode policy");
728 return -EIO;
729 }
730
731 auto bucket_ctl = store->ctl()->bucket;
732 int r = bucket_ctl->unlink_bucket(owner.get_id(), old_bucket, y, false);
733 if (r < 0) {
734 set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
735 return r;
736 }
737
738 // now update the user for the bucket...
739 if (display_name.empty()) {
740 ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
741 }
742
743 RGWAccessControlPolicy policy_instance;
744 policy_instance.create_default(user_info.user_id, display_name);
745 owner = policy_instance.get_owner();
746
747 aclbl.clear();
748 policy_instance.encode(aclbl);
749
750 auto instance_params = RGWBucketCtl::BucketInstance::PutParams().set_attrs(&attrs);
751
752 bucket_info.owner = user_info.user_id;
753 if (bucket != old_bucket) {
754 bucket_info.bucket = bucket;
755 bucket_info.objv_tracker.version_for_read()->ver = 0;
756 instance_params.set_exclusive(true);
757 }
758
759 r = bucket_ctl->store_bucket_instance_info(bucket, bucket_info, y, instance_params);
760 if (r < 0) {
761 set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
762 return r;
763 }
764
765 RGWBucketEntryPoint ep;
766 ep.bucket = bucket_info.bucket;
767 ep.owner = user_info.user_id;
768 ep.creation_time = bucket_info.creation_time;
769 ep.linked = true;
770 map<string, bufferlist> ep_attrs;
771 rgw_ep_info ep_data{ep, ep_attrs};
772
773 /* link to user */
774 r = store->ctl()->bucket->link_bucket(user_info.user_id,
775 bucket_info.bucket,
776 ep.creation_time,
777 y, true, &ep_data);
778 if (r < 0) {
779 set_err_msg(err_msg, "failed to relink bucket");
780 return r;
781 }
782
783 if (bucket != old_bucket) {
784 // like RGWRados::delete_bucket -- excepting no bucket_index work.
785 r = bucket_ctl->remove_bucket_entrypoint_info(old_bucket, y,
786 RGWBucketCtl::Bucket::RemoveParams()
787 .set_objv_tracker(&ep_data.ep_objv));
788 if (r < 0) {
789 set_err_msg(err_msg, "failed to unlink old bucket endpoint " + old_bucket.tenant + "/" + old_bucket.name);
790 return r;
791 }
792
793 r = bucket_ctl->remove_bucket_instance_info(old_bucket, bucket_info, y,
794 RGWBucketCtl::BucketInstance::RemoveParams()
795 .set_objv_tracker(&old_version));
796 if (r < 0) {
797 set_err_msg(err_msg, "failed to unlink old bucket info");
798 return r;
799 }
800 }
801
802 return 0;
803 }
804
805 int RGWBucket::chown(RGWBucketAdminOpState& op_state, const string& marker,
806 optional_yield y, std::string *err_msg)
807 {
808 int ret = store->ctl()->bucket->chown(store, bucket_info, user_info.user_id,
809 user_info.display_name, marker, y);
810 if (ret < 0) {
811 set_err_msg(err_msg, "Failed to change object ownership: " + cpp_strerror(-ret));
812 }
813
814 return ret;
815 }
816
817 int RGWBucket::unlink(RGWBucketAdminOpState& op_state, optional_yield y, std::string *err_msg)
818 {
819 rgw_bucket bucket = op_state.get_bucket();
820
821 if (!op_state.is_user_op()) {
822 set_err_msg(err_msg, "could not fetch user or user bucket info");
823 return -EINVAL;
824 }
825
826 int r = store->ctl()->bucket->unlink_bucket(user_info.user_id, bucket, y);
827 if (r < 0) {
828 set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
829 }
830
831 return r;
832 }
833
834 int RGWBucket::set_quota(RGWBucketAdminOpState& op_state, std::string *err_msg)
835 {
836 rgw_bucket bucket = op_state.get_bucket();
837 RGWBucketInfo bucket_info;
838 map<string, bufferlist> attrs;
839 int r = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, bucket_info, NULL, null_yield, &attrs);
840 if (r < 0) {
841 set_err_msg(err_msg, "could not get bucket info for bucket=" + bucket.name + ": " + cpp_strerror(-r));
842 return r;
843 }
844
845 bucket_info.quota = op_state.quota;
846 r = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &attrs);
847 if (r < 0) {
848 set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
849 return r;
850 }
851 return r;
852 }
853
854 int RGWBucket::remove(RGWBucketAdminOpState& op_state, optional_yield y, bool bypass_gc,
855 bool keep_index_consistent, std::string *err_msg)
856 {
857 bool delete_children = op_state.will_delete_children();
858 rgw_bucket bucket = op_state.get_bucket();
859 int ret;
860
861 if (bypass_gc) {
862 if (delete_children) {
863 ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent, y);
864 } else {
865 set_err_msg(err_msg, "purge objects should be set for gc to be bypassed");
866 return -EINVAL;
867 }
868 } else {
869 ret = rgw_remove_bucket(store, bucket, delete_children, y);
870 }
871
872 if (ret < 0) {
873 set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret));
874 return ret;
875 }
876
877 return 0;
878 }
879
880 int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg)
881 {
882 rgw_bucket bucket = op_state.get_bucket();
883 std::string object_name = op_state.get_object_name();
884
885 rgw_obj_key key(object_name);
886
887 int ret = rgw_remove_object(store, bucket_info, bucket, key);
888 if (ret < 0) {
889 set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
890 return ret;
891 }
892
893 return 0;
894 }
895
896 static void dump_bucket_index(const RGWRados::ent_map_t& result, Formatter *f)
897 {
898 for (auto iter = result.begin(); iter != result.end(); ++iter) {
899 f->dump_string("object", iter->first);
900 }
901 }
902
903 static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Formatter *formatter)
904 {
905 map<RGWObjCategory, RGWStorageStats>::iterator iter;
906
907 formatter->open_object_section("usage");
908 for (iter = stats.begin(); iter != stats.end(); ++iter) {
909 RGWStorageStats& s = iter->second;
910 const char *cat_name = rgw_obj_category_name(iter->first);
911 formatter->open_object_section(cat_name);
912 s.dump(formatter);
913 formatter->close_section();
914 }
915 formatter->close_section();
916 }
917
918 static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats,
919 map<RGWObjCategory, RGWStorageStats> calculated_stats,
920 Formatter *formatter)
921 {
922 formatter->open_object_section("check_result");
923 formatter->open_object_section("existing_header");
924 dump_bucket_usage(existing_stats, formatter);
925 formatter->close_section();
926 formatter->open_object_section("calculated_header");
927 dump_bucket_usage(calculated_stats, formatter);
928 formatter->close_section();
929 formatter->close_section();
930 }
931
932 int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
933 RGWFormatterFlusher& flusher ,std::string *err_msg)
934 {
935 bool fix_index = op_state.will_fix_index();
936 rgw_bucket bucket = op_state.get_bucket();
937
938 map<string, bool> common_prefixes;
939
940 bool is_truncated;
941 map<string, bool> meta_objs;
942 map<rgw_obj_index_key, string> all_objs;
943
944 RGWBucketInfo bucket_info;
945 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
946 int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr, null_yield);
947 if (r < 0) {
948 ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
949 return r;
950 }
951
952 RGWRados::Bucket target(store->getRados(), bucket_info);
953 RGWRados::Bucket::List list_op(&target);
954
955 list_op.params.list_versions = true;
956 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
957
958 do {
959 vector<rgw_bucket_dir_entry> result;
960 int r = list_op.list_objects(listing_max_entries, &result,
961 &common_prefixes, &is_truncated, null_yield);
962 if (r < 0) {
963 set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
964 " err=" + cpp_strerror(-r));
965
966 return r;
967 }
968
969 vector<rgw_bucket_dir_entry>::iterator iter;
970 for (iter = result.begin(); iter != result.end(); ++iter) {
971 rgw_obj_index_key key = iter->key;
972 rgw_obj obj(bucket, key);
973 string oid = obj.get_oid();
974
975 int pos = oid.find_last_of('.');
976 if (pos < 0) {
977 /* obj has no suffix */
978 all_objs[key] = oid;
979 } else {
980 /* obj has suffix */
981 string name = oid.substr(0, pos);
982 string suffix = oid.substr(pos + 1);
983
984 if (suffix.compare("meta") == 0) {
985 meta_objs[name] = true;
986 } else {
987 all_objs[key] = name;
988 }
989 }
990 }
991 } while (is_truncated);
992
993 list<rgw_obj_index_key> objs_to_unlink;
994 Formatter *f = flusher.get_formatter();
995
996 f->open_array_section("invalid_multipart_entries");
997
998 for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) {
999 string& name = aiter->second;
1000
1001 if (meta_objs.find(name) == meta_objs.end()) {
1002 objs_to_unlink.push_back(aiter->first);
1003 }
1004
1005 if (objs_to_unlink.size() > listing_max_entries) {
1006 if (fix_index) {
1007 int r = store->getRados()->remove_objs_from_index(bucket_info, objs_to_unlink);
1008 if (r < 0) {
1009 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1010 cpp_strerror(-r));
1011 return r;
1012 }
1013 }
1014
1015 dump_mulipart_index_results(objs_to_unlink, flusher.get_formatter());
1016 flusher.flush();
1017 objs_to_unlink.clear();
1018 }
1019 }
1020
1021 if (fix_index) {
1022 int r = store->getRados()->remove_objs_from_index(bucket_info, objs_to_unlink);
1023 if (r < 0) {
1024 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1025 cpp_strerror(-r));
1026
1027 return r;
1028 }
1029 }
1030
1031 dump_mulipart_index_results(objs_to_unlink, f);
1032 f->close_section();
1033 flusher.flush();
1034
1035 return 0;
1036 }
1037
1038 int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
1039 RGWFormatterFlusher& flusher,
1040 optional_yield y,
1041 std::string *err_msg)
1042 {
1043
1044 bool fix_index = op_state.will_fix_index();
1045
1046 if (!fix_index) {
1047 set_err_msg(err_msg, "check-objects flag requires fix index enabled");
1048 return -EINVAL;
1049 }
1050
1051 store->getRados()->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT);
1052
1053 string prefix;
1054 string empty_delimiter;
1055 rgw_obj_index_key marker;
1056 bool is_truncated = true;
1057 bool cls_filtered = true;
1058
1059 Formatter *formatter = flusher.get_formatter();
1060 formatter->open_object_section("objects");
1061 uint16_t expansion_factor = 1;
1062 while (is_truncated) {
1063 RGWRados::ent_map_t result;
1064 result.reserve(listing_max_entries);
1065
1066 int r = store->getRados()->cls_bucket_list_ordered(
1067 bucket_info, RGW_NO_SHARD, marker, prefix, empty_delimiter,
1068 listing_max_entries, true, expansion_factor,
1069 result, &is_truncated, &cls_filtered, &marker,
1070 y, rgw_bucket_object_check_filter);
1071 if (r == -ENOENT) {
1072 break;
1073 } else if (r < 0 && r != -ENOENT) {
1074 set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
1075 }
1076
1077 if (result.size() < listing_max_entries / 8) {
1078 ++expansion_factor;
1079 } else if (result.size() > listing_max_entries * 7 / 8 &&
1080 expansion_factor > 1) {
1081 --expansion_factor;
1082 }
1083
1084 dump_bucket_index(result, formatter);
1085 flusher.flush();
1086 }
1087
1088 formatter->close_section();
1089
1090 store->getRados()->cls_obj_set_bucket_tag_timeout(bucket_info, 0);
1091
1092 return 0;
1093 }
1094
1095
1096 int RGWBucket::check_index(RGWBucketAdminOpState& op_state,
1097 map<RGWObjCategory, RGWStorageStats>& existing_stats,
1098 map<RGWObjCategory, RGWStorageStats>& calculated_stats,
1099 std::string *err_msg)
1100 {
1101 bool fix_index = op_state.will_fix_index();
1102
1103 int r = store->getRados()->bucket_check_index(bucket_info, &existing_stats, &calculated_stats);
1104 if (r < 0) {
1105 set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
1106 return r;
1107 }
1108
1109 if (fix_index) {
1110 r = store->getRados()->bucket_rebuild_index(bucket_info);
1111 if (r < 0) {
1112 set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
1113 return r;
1114 }
1115 }
1116
1117 return 0;
1118 }
1119
1120 int RGWBucket::sync(RGWBucketAdminOpState& op_state, map<string, bufferlist> *attrs, std::string *err_msg)
1121 {
1122 if (!store->svc()->zone->is_meta_master()) {
1123 set_err_msg(err_msg, "ERROR: failed to update bucket sync: only allowed on meta master zone");
1124 return EINVAL;
1125 }
1126 bool sync = op_state.will_sync_bucket();
1127 if (sync) {
1128 bucket_info.flags &= ~BUCKET_DATASYNC_DISABLED;
1129 } else {
1130 bucket_info.flags |= BUCKET_DATASYNC_DISABLED;
1131 }
1132
1133 int r = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), attrs);
1134 if (r < 0) {
1135 set_err_msg(err_msg, "ERROR: failed writing bucket instance info:" + cpp_strerror(-r));
1136 return r;
1137 }
1138
1139 int shards_num = bucket_info.num_shards? bucket_info.num_shards : 1;
1140 int shard_id = bucket_info.num_shards? 0 : -1;
1141
1142 if (!sync) {
1143 r = store->svc()->bilog_rados->log_stop(bucket_info, -1);
1144 if (r < 0) {
1145 set_err_msg(err_msg, "ERROR: failed writing stop bilog:" + cpp_strerror(-r));
1146 return r;
1147 }
1148 } else {
1149 r = store->svc()->bilog_rados->log_start(bucket_info, -1);
1150 if (r < 0) {
1151 set_err_msg(err_msg, "ERROR: failed writing resync bilog:" + cpp_strerror(-r));
1152 return r;
1153 }
1154 }
1155
1156 for (int i = 0; i < shards_num; ++i, ++shard_id) {
1157 r = store->svc()->datalog_rados->add_entry(bucket_info, shard_id);
1158 if (r < 0) {
1159 set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
1160 return r;
1161 }
1162 }
1163
1164 return 0;
1165 }
1166
1167
1168 int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
1169 {
1170 RGWAccessControlPolicy_S3 policy(g_ceph_context);
1171 int ret = decode_bl(bl, policy);
1172 if (ret < 0) {
1173 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
1174 }
1175 policy.to_xml(o);
1176 return 0;
1177 }
1178
1179 int rgw_object_get_attr(rgw::sal::RGWRadosStore* store, const RGWBucketInfo& bucket_info,
1180 const rgw_obj& obj, const char* attr_name,
1181 bufferlist& out_bl, optional_yield y)
1182 {
1183 RGWObjectCtx obj_ctx(store);
1184 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, obj);
1185 RGWRados::Object::Read rop(&op_target);
1186
1187 return rop.get_attr(attr_name, out_bl, y);
1188 }
1189
1190 int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy, optional_yield y)
1191 {
1192 std::string object_name = op_state.get_object_name();
1193 rgw_bucket bucket = op_state.get_bucket();
1194
1195 RGWBucketInfo bucket_info;
1196 map<string, bufferlist> attrs;
1197 int ret = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, bucket_info, NULL, null_yield, &attrs);
1198 if (ret < 0) {
1199 return ret;
1200 }
1201
1202 if (!object_name.empty()) {
1203 bufferlist bl;
1204 rgw_obj obj(bucket, object_name);
1205
1206 ret = rgw_object_get_attr(store, bucket_info, obj, RGW_ATTR_ACL, bl, y);
1207 if (ret < 0){
1208 return ret;
1209 }
1210
1211 ret = decode_bl(bl, policy);
1212 if (ret < 0) {
1213 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
1214 }
1215 return ret;
1216 }
1217
1218 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
1219 if (aiter == attrs.end()) {
1220 return -ENOENT;
1221 }
1222
1223 ret = decode_bl(aiter->second, policy);
1224 if (ret < 0) {
1225 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
1226 }
1227
1228 return ret;
1229 }
1230
1231
1232 int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1233 RGWAccessControlPolicy& policy)
1234 {
1235 RGWBucket bucket;
1236
1237 int ret = bucket.init(store, op_state, null_yield);
1238 if (ret < 0)
1239 return ret;
1240
1241 ret = bucket.get_policy(op_state, policy, null_yield);
1242 if (ret < 0)
1243 return ret;
1244
1245 return 0;
1246 }
1247
1248 /* Wrappers to facilitate RESTful interface */
1249
1250
1251 int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1252 RGWFormatterFlusher& flusher)
1253 {
1254 RGWAccessControlPolicy policy(store->ctx());
1255
1256 int ret = get_policy(store, op_state, policy);
1257 if (ret < 0)
1258 return ret;
1259
1260 Formatter *formatter = flusher.get_formatter();
1261
1262 flusher.start(0);
1263
1264 formatter->open_object_section("policy");
1265 policy.dump(formatter);
1266 formatter->close_section();
1267
1268 flusher.flush();
1269
1270 return 0;
1271 }
1272
1273 int RGWBucketAdminOp::dump_s3_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1274 ostream& os)
1275 {
1276 RGWAccessControlPolicy_S3 policy(store->ctx());
1277
1278 int ret = get_policy(store, op_state, policy);
1279 if (ret < 0)
1280 return ret;
1281
1282 policy.to_xml(os);
1283
1284 return 0;
1285 }
1286
1287 int RGWBucketAdminOp::unlink(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state)
1288 {
1289 RGWBucket bucket;
1290
1291 int ret = bucket.init(store, op_state, null_yield);
1292 if (ret < 0)
1293 return ret;
1294
1295 return bucket.unlink(op_state, null_yield);
1296 }
1297
1298 int RGWBucketAdminOp::link(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, string *err)
1299 {
1300 RGWBucket bucket;
1301 map<string, bufferlist> attrs;
1302
1303 int ret = bucket.init(store, op_state, null_yield, err, &attrs);
1304 if (ret < 0)
1305 return ret;
1306
1307 return bucket.link(op_state, null_yield, attrs, err);
1308
1309 }
1310
1311 int RGWBucketAdminOp::chown(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, const string& marker, string *err)
1312 {
1313 RGWBucket bucket;
1314 map<string, bufferlist> attrs;
1315
1316 int ret = bucket.init(store, op_state, null_yield, err, &attrs);
1317 if (ret < 0)
1318 return ret;
1319
1320 ret = bucket.link(op_state, null_yield, attrs, err);
1321 if (ret < 0)
1322 return ret;
1323
1324 return bucket.chown(op_state, marker, null_yield, err);
1325
1326 }
1327
1328 int RGWBucketAdminOp::check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1329 RGWFormatterFlusher& flusher, optional_yield y)
1330 {
1331 int ret;
1332 map<RGWObjCategory, RGWStorageStats> existing_stats;
1333 map<RGWObjCategory, RGWStorageStats> calculated_stats;
1334
1335
1336 RGWBucket bucket;
1337
1338 ret = bucket.init(store, op_state, null_yield);
1339 if (ret < 0)
1340 return ret;
1341
1342 Formatter *formatter = flusher.get_formatter();
1343 flusher.start(0);
1344
1345 ret = bucket.check_bad_index_multipart(op_state, flusher);
1346 if (ret < 0)
1347 return ret;
1348
1349 ret = bucket.check_object_index(op_state, flusher, y);
1350 if (ret < 0)
1351 return ret;
1352
1353 ret = bucket.check_index(op_state, existing_stats, calculated_stats);
1354 if (ret < 0)
1355 return ret;
1356
1357 dump_index_check(existing_stats, calculated_stats, formatter);
1358 flusher.flush();
1359
1360 return 0;
1361 }
1362
1363 int RGWBucketAdminOp::remove_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1364 optional_yield y, bool bypass_gc, bool keep_index_consistent)
1365 {
1366 RGWBucket bucket;
1367
1368 int ret = bucket.init(store, op_state, y);
1369 if (ret < 0)
1370 return ret;
1371
1372 std::string err_msg;
1373 ret = bucket.remove(op_state, y, bypass_gc, keep_index_consistent, &err_msg);
1374 if (!err_msg.empty()) {
1375 lderr(store->ctx()) << "ERROR: " << err_msg << dendl;
1376 }
1377 return ret;
1378 }
1379
1380 int RGWBucketAdminOp::remove_object(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state)
1381 {
1382 RGWBucket bucket;
1383
1384 int ret = bucket.init(store, op_state, null_yield);
1385 if (ret < 0)
1386 return ret;
1387
1388 return bucket.remove_object(op_state);
1389 }
1390
1391 int RGWBucketAdminOp::sync_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, string *err_msg)
1392 {
1393 RGWBucket bucket;
1394 map<string, bufferlist> attrs;
1395 int ret = bucket.init(store, op_state, null_yield, err_msg, &attrs);
1396 if (ret < 0)
1397 {
1398 return ret;
1399 }
1400 return bucket.sync(op_state, &attrs, err_msg);
1401 }
1402
1403 static int bucket_stats(rgw::sal::RGWRadosStore *store, const std::string& tenant_name, std::string& bucket_name, Formatter *formatter)
1404 {
1405 RGWBucketInfo bucket_info;
1406 map<RGWObjCategory, RGWStorageStats> stats;
1407 map<string, bufferlist> attrs;
1408
1409 real_time mtime;
1410 int r = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name, bucket_info, &mtime, null_yield, &attrs);
1411 if (r < 0)
1412 return r;
1413
1414 rgw_bucket& bucket = bucket_info.bucket;
1415
1416 string bucket_ver, master_ver;
1417 string max_marker;
1418 int ret = store->getRados()->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
1419 if (ret < 0) {
1420 cerr << "error getting bucket stats ret=" << ret << std::endl;
1421 return ret;
1422 }
1423
1424 utime_t ut(mtime);
1425 utime_t ctime_ut(bucket_info.creation_time);
1426
1427 formatter->open_object_section("stats");
1428 formatter->dump_string("bucket", bucket.name);
1429 formatter->dump_int("num_shards", bucket_info.num_shards);
1430 formatter->dump_string("tenant", bucket.tenant);
1431 formatter->dump_string("zonegroup", bucket_info.zonegroup);
1432 formatter->dump_string("placement_rule", bucket_info.placement_rule.to_str());
1433 ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
1434 formatter->dump_string("id", bucket.bucket_id);
1435 formatter->dump_string("marker", bucket.marker);
1436 formatter->dump_stream("index_type") << bucket_info.index_type;
1437 ::encode_json("owner", bucket_info.owner, formatter);
1438 formatter->dump_string("ver", bucket_ver);
1439 formatter->dump_string("master_ver", master_ver);
1440 ut.gmtime(formatter->dump_stream("mtime"));
1441 ctime_ut.gmtime(formatter->dump_stream("creation_time"));
1442 formatter->dump_string("max_marker", max_marker);
1443 dump_bucket_usage(stats, formatter);
1444 encode_json("bucket_quota", bucket_info.quota, formatter);
1445
1446 // bucket tags
1447 auto iter = attrs.find(RGW_ATTR_TAGS);
1448 if (iter != attrs.end()) {
1449 RGWObjTagSet_S3 tagset;
1450 bufferlist::const_iterator piter{&iter->second};
1451 try {
1452 tagset.decode(piter);
1453 tagset.dump(formatter);
1454 } catch (buffer::error& err) {
1455 cerr << "ERROR: caught buffer:error, couldn't decode TagSet" << std::endl;
1456 }
1457 }
1458
1459 // TODO: bucket CORS
1460 // TODO: bucket LC
1461 formatter->close_section();
1462
1463 return 0;
1464 }
1465
1466 int RGWBucketAdminOp::limit_check(rgw::sal::RGWRadosStore *store,
1467 RGWBucketAdminOpState& op_state,
1468 const std::list<std::string>& user_ids,
1469 RGWFormatterFlusher& flusher,
1470 bool warnings_only)
1471 {
1472 int ret = 0;
1473 const size_t max_entries =
1474 store->ctx()->_conf->rgw_list_buckets_max_chunk;
1475
1476 const size_t safe_max_objs_per_shard =
1477 store->ctx()->_conf->rgw_safe_max_objects_per_shard;
1478
1479 uint16_t shard_warn_pct =
1480 store->ctx()->_conf->rgw_shard_warning_threshold;
1481 if (shard_warn_pct > 100)
1482 shard_warn_pct = 90;
1483
1484 Formatter *formatter = flusher.get_formatter();
1485 flusher.start(0);
1486
1487 formatter->open_array_section("users");
1488
1489 for (const auto& user_id : user_ids) {
1490
1491 formatter->open_object_section("user");
1492 formatter->dump_string("user_id", user_id);
1493 formatter->open_array_section("buckets");
1494
1495 string marker;
1496 rgw::sal::RGWBucketList buckets;
1497 do {
1498 rgw::sal::RGWRadosUser user(store, rgw_user(user_id));
1499
1500 ret = user.list_buckets(marker, string(), max_entries, false, buckets);
1501
1502 if (ret < 0)
1503 return ret;
1504
1505 map<string, rgw::sal::RGWBucket*>& m_buckets = buckets.get_buckets();
1506
1507 for (const auto& iter : m_buckets) {
1508 auto bucket = iter.second;
1509 uint32_t num_shards = 1;
1510 uint64_t num_objects = 0;
1511
1512 /* need info for num_shards */
1513 RGWBucketInfo info;
1514
1515 marker = bucket->get_name(); /* Casey's location for marker update,
1516 * as we may now not reach the end of
1517 * the loop body */
1518
1519 ret = store->getRados()->get_bucket_info(store->svc(), bucket->get_tenant(),
1520 bucket->get_name(), info, nullptr,
1521 null_yield);
1522 if (ret < 0)
1523 continue;
1524
1525 /* need stats for num_entries */
1526 string bucket_ver, master_ver;
1527 std::map<RGWObjCategory, RGWStorageStats> stats;
1528 ret = store->getRados()->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver,
1529 &master_ver, stats, nullptr);
1530
1531 if (ret < 0)
1532 continue;
1533
1534 for (const auto& s : stats) {
1535 num_objects += s.second.num_objects;
1536 }
1537
1538 num_shards = info.num_shards;
1539 uint64_t objs_per_shard =
1540 (num_shards) ? num_objects/num_shards : num_objects;
1541 {
1542 bool warn = false;
1543 stringstream ss;
1544 if (objs_per_shard > safe_max_objs_per_shard) {
1545 double over =
1546 100 - (safe_max_objs_per_shard/objs_per_shard * 100);
1547 ss << boost::format("OVER %4f%%") % over;
1548 warn = true;
1549 } else {
1550 double fill_pct =
1551 objs_per_shard / safe_max_objs_per_shard * 100;
1552 if (fill_pct >= shard_warn_pct) {
1553 ss << boost::format("WARN %4f%%") % fill_pct;
1554 warn = true;
1555 } else {
1556 ss << "OK";
1557 }
1558 }
1559
1560 if (warn || (! warnings_only)) {
1561 formatter->open_object_section("bucket");
1562 formatter->dump_string("bucket", bucket->get_name());
1563 formatter->dump_string("tenant", bucket->get_tenant());
1564 formatter->dump_int("num_objects", num_objects);
1565 formatter->dump_int("num_shards", num_shards);
1566 formatter->dump_int("objects_per_shard", objs_per_shard);
1567 formatter->dump_string("fill_status", ss.str());
1568 formatter->close_section();
1569 }
1570 }
1571 }
1572 formatter->flush(cout);
1573 } while (buckets.is_truncated()); /* foreach: bucket */
1574
1575 formatter->close_section();
1576 formatter->close_section();
1577 formatter->flush(cout);
1578
1579 } /* foreach: user_id */
1580
1581 formatter->close_section();
1582 formatter->flush(cout);
1583
1584 return ret;
1585 } /* RGWBucketAdminOp::limit_check */
1586
1587 int RGWBucketAdminOp::info(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1588 RGWFormatterFlusher& flusher)
1589 {
1590 int ret = 0;
1591 string bucket_name = op_state.get_bucket_name();
1592 Formatter *formatter = flusher.get_formatter();
1593 flusher.start(0);
1594
1595 CephContext *cct = store->ctx();
1596
1597 const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
1598
1599 bool show_stats = op_state.will_fetch_stats();
1600 rgw_user user_id = op_state.get_user_id();
1601 if (op_state.is_user_op()) {
1602 formatter->open_array_section("buckets");
1603
1604 rgw::sal::RGWBucketList buckets;
1605 rgw::sal::RGWRadosUser user(store, op_state.get_user_id());
1606 string marker;
1607 bool is_truncated = false;
1608
1609 do {
1610 ret = user.list_buckets(marker, string(), max_entries, false, buckets);
1611 if (ret < 0)
1612 return ret;
1613
1614 map<string, rgw::sal::RGWBucket*>& m = buckets.get_buckets();
1615 map<string, rgw::sal::RGWBucket*>::iterator iter;
1616
1617 for (iter = m.begin(); iter != m.end(); ++iter) {
1618 std::string obj_name = iter->first;
1619 if (!bucket_name.empty() && bucket_name != obj_name) {
1620 continue;
1621 }
1622
1623 if (show_stats)
1624 bucket_stats(store, user_id.tenant, obj_name, formatter);
1625 else
1626 formatter->dump_string("bucket", obj_name);
1627
1628 marker = obj_name;
1629 }
1630
1631 flusher.flush();
1632 } while (is_truncated);
1633
1634 formatter->close_section();
1635 } else if (!bucket_name.empty()) {
1636 ret = bucket_stats(store, user_id.tenant, bucket_name, formatter);
1637 if (ret < 0) {
1638 return ret;
1639 }
1640 } else {
1641 void *handle = nullptr;
1642 bool truncated = true;
1643
1644 formatter->open_array_section("buckets");
1645 ret = store->ctl()->meta.mgr->list_keys_init("bucket", &handle);
1646 while (ret == 0 && truncated) {
1647 std::list<std::string> buckets;
1648 const int max_keys = 1000;
1649 ret = store->ctl()->meta.mgr->list_keys_next(handle, max_keys, buckets,
1650 &truncated);
1651 for (auto& bucket_name : buckets) {
1652 if (show_stats)
1653 bucket_stats(store, user_id.tenant, bucket_name, formatter);
1654 else
1655 formatter->dump_string("bucket", bucket_name);
1656 }
1657 }
1658
1659 formatter->close_section();
1660 }
1661
1662 flusher.flush();
1663
1664 return 0;
1665 }
1666
1667 int RGWBucketAdminOp::set_quota(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state)
1668 {
1669 RGWBucket bucket;
1670
1671 int ret = bucket.init(store, op_state, null_yield);
1672 if (ret < 0)
1673 return ret;
1674 return bucket.set_quota(op_state);
1675 }
1676
1677 static int purge_bucket_instance(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info)
1678 {
1679 int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
1680 for (int i = 0; i < max_shards; i++) {
1681 RGWRados::BucketShard bs(store->getRados());
1682 int shard_id = (bucket_info.num_shards > 0 ? i : -1);
1683 int ret = bs.init(bucket_info.bucket, shard_id, nullptr);
1684 if (ret < 0) {
1685 cerr << "ERROR: bs.init(bucket=" << bucket_info.bucket << ", shard=" << shard_id
1686 << "): " << cpp_strerror(-ret) << std::endl;
1687 return ret;
1688 }
1689 ret = store->getRados()->bi_remove(bs);
1690 if (ret < 0) {
1691 cerr << "ERROR: failed to remove bucket index object: "
1692 << cpp_strerror(-ret) << std::endl;
1693 return ret;
1694 }
1695 }
1696 return 0;
1697 }
1698
/*
 * Split "tenant/bucket" at the first '/' into a (tenant, bucket) pair.
 * A name without '/' yields an empty tenant and the name unchanged.
 */
inline auto split_tenant(const std::string& bucket_name){
  const auto slash = bucket_name.find('/');
  if (slash == std::string::npos) {
    return std::make_pair(std::string(), bucket_name);
  }
  return std::make_pair(bucket_name.substr(0, slash), bucket_name.substr(slash + 1));
}
1706
1707 using bucket_instance_ls = std::vector<RGWBucketInfo>;
1708 void get_stale_instances(rgw::sal::RGWRadosStore *store, const std::string& bucket_name,
1709 const vector<std::string>& lst,
1710 bucket_instance_ls& stale_instances)
1711 {
1712
1713 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
1714
1715 bucket_instance_ls other_instances;
1716 // first iterate over the entries, and pick up the done buckets; these
1717 // are guaranteed to be stale
1718 for (const auto& bucket_instance : lst){
1719 RGWBucketInfo binfo;
1720 int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket_instance,
1721 binfo, nullptr,nullptr, null_yield);
1722 if (r < 0){
1723 // this can only happen if someone deletes us right when we're processing
1724 lderr(store->ctx()) << "Bucket instance is invalid: " << bucket_instance
1725 << cpp_strerror(-r) << dendl;
1726 continue;
1727 }
1728 if (binfo.reshard_status == cls_rgw_reshard_status::DONE)
1729 stale_instances.emplace_back(std::move(binfo));
1730 else {
1731 other_instances.emplace_back(std::move(binfo));
1732 }
1733 }
1734
1735 // Read the cur bucket info, if the bucket doesn't exist we can simply return
1736 // all the instances
1737 auto [tenant, bucket] = split_tenant(bucket_name);
1738 RGWBucketInfo cur_bucket_info;
1739 int r = store->getRados()->get_bucket_info(store->svc(), tenant, bucket, cur_bucket_info, nullptr, null_yield);
1740 if (r < 0) {
1741 if (r == -ENOENT) {
1742 // bucket doesn't exist, everything is stale then
1743 stale_instances.insert(std::end(stale_instances),
1744 std::make_move_iterator(other_instances.begin()),
1745 std::make_move_iterator(other_instances.end()));
1746 } else {
1747 // all bets are off if we can't read the bucket, just return the sureshot stale instances
1748 lderr(store->ctx()) << "error: reading bucket info for bucket: "
1749 << bucket << cpp_strerror(-r) << dendl;
1750 }
1751 return;
1752 }
1753
1754 // Don't process further in this round if bucket is resharding
1755 if (cur_bucket_info.reshard_status == cls_rgw_reshard_status::IN_PROGRESS)
1756 return;
1757
1758 other_instances.erase(std::remove_if(other_instances.begin(), other_instances.end(),
1759 [&cur_bucket_info](const RGWBucketInfo& b){
1760 return (b.bucket.bucket_id == cur_bucket_info.bucket.bucket_id ||
1761 b.bucket.bucket_id == cur_bucket_info.new_bucket_instance_id);
1762 }),
1763 other_instances.end());
1764
1765 // check if there are still instances left
1766 if (other_instances.empty()) {
1767 return;
1768 }
1769
1770 // Now we have a bucket with instances where the reshard status is none, this
1771 // usually happens when the reshard process couldn't complete, lockdown the
1772 // bucket and walk through these instances to make sure no one else interferes
1773 // with these
1774 {
1775 RGWBucketReshardLock reshard_lock(store, cur_bucket_info, true);
1776 r = reshard_lock.lock();
1777 if (r < 0) {
1778 // most likely bucket is under reshard, return the sureshot stale instances
1779 ldout(store->ctx(), 5) << __func__
1780 << "failed to take reshard lock; reshard underway likey" << dendl;
1781 return;
1782 }
1783 auto sg = make_scope_guard([&reshard_lock](){ reshard_lock.unlock();} );
1784 // this should be fast enough that we may not need to renew locks and check
1785 // exit status?, should we read the values of the instances again?
1786 stale_instances.insert(std::end(stale_instances),
1787 std::make_move_iterator(other_instances.begin()),
1788 std::make_move_iterator(other_instances.end()));
1789 }
1790
1791 return;
1792 }
1793
1794 static int process_stale_instances(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1795 RGWFormatterFlusher& flusher,
1796 std::function<void(const bucket_instance_ls&,
1797 Formatter *,
1798 rgw::sal::RGWRadosStore*)> process_f)
1799 {
1800 std::string marker;
1801 void *handle;
1802 Formatter *formatter = flusher.get_formatter();
1803 static constexpr auto default_max_keys = 1000;
1804
1805 int ret = store->ctl()->meta.mgr->list_keys_init("bucket.instance", marker, &handle);
1806 if (ret < 0) {
1807 cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
1808 return ret;
1809 }
1810
1811 bool truncated;
1812
1813 formatter->open_array_section("keys");
1814
1815 do {
1816 list<std::string> keys;
1817
1818 ret = store->ctl()->meta.mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
1819 if (ret < 0 && ret != -ENOENT) {
1820 cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
1821 return ret;
1822 } if (ret != -ENOENT) {
1823 // partition the list of buckets by buckets as the listing is un sorted,
1824 // since it would minimize the reads to bucket_info
1825 std::unordered_map<std::string, std::vector<std::string>> bucket_instance_map;
1826 for (auto &key: keys) {
1827 auto pos = key.find(':');
1828 if(pos != std::string::npos)
1829 bucket_instance_map[key.substr(0,pos)].emplace_back(std::move(key));
1830 }
1831 for (const auto& kv: bucket_instance_map) {
1832 bucket_instance_ls stale_lst;
1833 get_stale_instances(store, kv.first, kv.second, stale_lst);
1834 process_f(stale_lst, formatter, store);
1835 }
1836 }
1837 } while (truncated);
1838
1839 formatter->close_section(); // keys
1840 formatter->flush(cout);
1841 return 0;
1842 }
1843
1844 int RGWBucketAdminOp::list_stale_instances(rgw::sal::RGWRadosStore *store,
1845 RGWBucketAdminOpState& op_state,
1846 RGWFormatterFlusher& flusher)
1847 {
1848 auto process_f = [](const bucket_instance_ls& lst,
1849 Formatter *formatter,
1850 rgw::sal::RGWRadosStore*){
1851 for (const auto& binfo: lst)
1852 formatter->dump_string("key", binfo.bucket.get_key());
1853 };
1854 return process_stale_instances(store, op_state, flusher, process_f);
1855 }
1856
1857
1858 int RGWBucketAdminOp::clear_stale_instances(rgw::sal::RGWRadosStore *store,
1859 RGWBucketAdminOpState& op_state,
1860 RGWFormatterFlusher& flusher)
1861 {
1862 auto process_f = [](const bucket_instance_ls& lst,
1863 Formatter *formatter,
1864 rgw::sal::RGWRadosStore *store){
1865 for (const auto &binfo: lst) {
1866 int ret = purge_bucket_instance(store, binfo);
1867 if (ret == 0){
1868 auto md_key = "bucket.instance:" + binfo.bucket.get_key();
1869 ret = store->ctl()->meta.mgr->remove(md_key, null_yield);
1870 }
1871 formatter->open_object_section("delete_status");
1872 formatter->dump_string("bucket_instance", binfo.bucket.get_key());
1873 formatter->dump_int("status", -ret);
1874 formatter->close_section();
1875 }
1876 };
1877
1878 return process_stale_instances(store, op_state, flusher, process_f);
1879 }
1880
1881 static int fix_single_bucket_lc(rgw::sal::RGWRadosStore *store,
1882 const std::string& tenant_name,
1883 const std::string& bucket_name)
1884 {
1885 RGWBucketInfo bucket_info;
1886 map <std::string, bufferlist> bucket_attrs;
1887 int ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name,
1888 bucket_info, nullptr, null_yield, &bucket_attrs);
1889 if (ret < 0) {
1890 // TODO: Should we handle the case where the bucket could've been removed between
1891 // listing and fetching?
1892 return ret;
1893 }
1894
1895 return rgw::lc::fix_lc_shard_entry(store, bucket_info, bucket_attrs);
1896 }
1897
1898 static void format_lc_status(Formatter* formatter,
1899 const std::string& tenant_name,
1900 const std::string& bucket_name,
1901 int status)
1902 {
1903 formatter->open_object_section("bucket_entry");
1904 std::string entry = tenant_name.empty() ? bucket_name : tenant_name + "/" + bucket_name;
1905 formatter->dump_string("bucket", entry);
1906 formatter->dump_int("status", status);
1907 formatter->close_section(); // bucket_entry
1908 }
1909
1910 static void process_single_lc_entry(rgw::sal::RGWRadosStore *store,
1911 Formatter *formatter,
1912 const std::string& tenant_name,
1913 const std::string& bucket_name)
1914 {
1915 int ret = fix_single_bucket_lc(store, tenant_name, bucket_name);
1916 format_lc_status(formatter, tenant_name, bucket_name, -ret);
1917 }
1918
1919 int RGWBucketAdminOp::fix_lc_shards(rgw::sal::RGWRadosStore *store,
1920 RGWBucketAdminOpState& op_state,
1921 RGWFormatterFlusher& flusher)
1922 {
1923 std::string marker;
1924 void *handle;
1925 Formatter *formatter = flusher.get_formatter();
1926 static constexpr auto default_max_keys = 1000;
1927
1928 bool truncated;
1929 if (const std::string& bucket_name = op_state.get_bucket_name();
1930 ! bucket_name.empty()) {
1931 const rgw_user user_id = op_state.get_user_id();
1932 process_single_lc_entry(store, formatter, user_id.tenant, bucket_name);
1933 formatter->flush(cout);
1934 } else {
1935 int ret = store->ctl()->meta.mgr->list_keys_init("bucket", marker, &handle);
1936 if (ret < 0) {
1937 std::cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
1938 return ret;
1939 }
1940
1941 {
1942 formatter->open_array_section("lc_fix_status");
1943 auto sg = make_scope_guard([&store, &handle, &formatter](){
1944 store->ctl()->meta.mgr->list_keys_complete(handle);
1945 formatter->close_section(); // lc_fix_status
1946 formatter->flush(cout);
1947 });
1948 do {
1949 list<std::string> keys;
1950 ret = store->ctl()->meta.mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
1951 if (ret < 0 && ret != -ENOENT) {
1952 std::cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
1953 return ret;
1954 } if (ret != -ENOENT) {
1955 for (const auto &key:keys) {
1956 auto [tenant_name, bucket_name] = split_tenant(key);
1957 process_single_lc_entry(store, formatter, tenant_name, bucket_name);
1958 }
1959 }
1960 formatter->flush(cout); // regularly flush every 1k entries
1961 } while (truncated);
1962 }
1963
1964 }
1965 return 0;
1966
1967 }
1968
1969 static bool has_object_expired(rgw::sal::RGWRadosStore *store,
1970 const RGWBucketInfo& bucket_info,
1971 const rgw_obj_key& key, utime_t& delete_at)
1972 {
1973 rgw_obj obj(bucket_info.bucket, key);
1974 bufferlist delete_at_bl;
1975
1976 int ret = rgw_object_get_attr(store, bucket_info, obj, RGW_ATTR_DELETE_AT, delete_at_bl, null_yield);
1977 if (ret < 0) {
1978 return false; // no delete at attr, proceed
1979 }
1980
1981 ret = decode_bl(delete_at_bl, delete_at);
1982 if (ret < 0) {
1983 return false; // failed to parse
1984 }
1985
1986 if (delete_at <= ceph_clock_now() && !delete_at.is_zero()) {
1987 return true;
1988 }
1989
1990 return false;
1991 }
1992
1993 static int fix_bucket_obj_expiry(rgw::sal::RGWRadosStore *store,
1994 const RGWBucketInfo& bucket_info,
1995 RGWFormatterFlusher& flusher, bool dry_run)
1996 {
1997 if (bucket_info.bucket.bucket_id == bucket_info.bucket.marker) {
1998 lderr(store->ctx()) << "Not a resharded bucket skipping" << dendl;
1999 return 0; // not a resharded bucket, move along
2000 }
2001
2002 Formatter *formatter = flusher.get_formatter();
2003 formatter->open_array_section("expired_deletion_status");
2004 auto sg = make_scope_guard([&formatter] {
2005 formatter->close_section();
2006 formatter->flush(std::cout);
2007 });
2008
2009 RGWRados::Bucket target(store->getRados(), bucket_info);
2010 RGWRados::Bucket::List list_op(&target);
2011
2012 list_op.params.list_versions = bucket_info.versioned();
2013 list_op.params.allow_unordered = true;
2014
2015 bool is_truncated {false};
2016 do {
2017 std::vector<rgw_bucket_dir_entry> objs;
2018
2019 int ret = list_op.list_objects(listing_max_entries, &objs, nullptr,
2020 &is_truncated, null_yield);
2021 if (ret < 0) {
2022 lderr(store->ctx()) << "ERROR failed to list objects in the bucket" << dendl;
2023 return ret;
2024 }
2025 for (const auto& obj : objs) {
2026 rgw_obj_key key(obj.key);
2027 utime_t delete_at;
2028 if (has_object_expired(store, bucket_info, key, delete_at)) {
2029 formatter->open_object_section("object_status");
2030 formatter->dump_string("object", key.name);
2031 formatter->dump_stream("delete_at") << delete_at;
2032
2033 if (!dry_run) {
2034 ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, key);
2035 formatter->dump_int("status", ret);
2036 }
2037
2038 formatter->close_section(); // object_status
2039 }
2040 }
2041 formatter->flush(cout); // regularly flush every 1k entries
2042 } while (is_truncated);
2043
2044 return 0;
2045 }
2046
2047 int RGWBucketAdminOp::fix_obj_expiry(rgw::sal::RGWRadosStore *store,
2048 RGWBucketAdminOpState& op_state,
2049 RGWFormatterFlusher& flusher, bool dry_run)
2050 {
2051 RGWBucket admin_bucket;
2052 int ret = admin_bucket.init(store, op_state, null_yield);
2053 if (ret < 0) {
2054 lderr(store->ctx()) << "failed to initialize bucket" << dendl;
2055 return ret;
2056 }
2057
2058 return fix_bucket_obj_expiry(store, admin_bucket.get_bucket_info(), flusher, dry_run);
2059 }
2060
2061 void rgw_data_change::dump(Formatter *f) const
2062 {
2063 string type;
2064 switch (entity_type) {
2065 case ENTITY_TYPE_BUCKET:
2066 type = "bucket";
2067 break;
2068 default:
2069 type = "unknown";
2070 }
2071 encode_json("entity_type", type, f);
2072 encode_json("key", key, f);
2073 utime_t ut(timestamp);
2074 encode_json("timestamp", ut, f);
2075 }
2076
2077 void rgw_data_change::decode_json(JSONObj *obj) {
2078 string s;
2079 JSONDecoder::decode_json("entity_type", s, obj);
2080 if (s == "bucket") {
2081 entity_type = ENTITY_TYPE_BUCKET;
2082 } else {
2083 entity_type = ENTITY_TYPE_UNKNOWN;
2084 }
2085 JSONDecoder::decode_json("key", key, obj);
2086 utime_t ut;
2087 JSONDecoder::decode_json("timestamp", ut, obj);
2088 timestamp = ut.to_real_time();
2089 }
2090
2091 void rgw_data_change_log_entry::dump(Formatter *f) const
2092 {
2093 encode_json("log_id", log_id, f);
2094 utime_t ut(log_timestamp);
2095 encode_json("log_timestamp", ut, f);
2096 encode_json("entry", entry, f);
2097 }
2098
2099 void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
2100 JSONDecoder::decode_json("log_id", log_id, obj);
2101 utime_t ut;
2102 JSONDecoder::decode_json("log_timestamp", ut, obj);
2103 log_timestamp = ut.to_real_time();
2104 JSONDecoder::decode_json("entry", entry, obj);
2105 }
2106
2107
2108 RGWDataChangesLog::RGWDataChangesLog(RGWSI_Zone *zone_svc, RGWSI_Cls *cls_svc)
2109 : cct(zone_svc->ctx()), changes(cct->_conf->rgw_data_log_changes_size)
2110 {
2111 svc.zone = zone_svc;
2112 svc.cls = cls_svc;
2113
2114 num_shards = cct->_conf->rgw_data_log_num_shards;
2115
2116 oids = new string[num_shards];
2117
2118 string prefix = cct->_conf->rgw_data_log_obj_prefix;
2119
2120 if (prefix.empty()) {
2121 prefix = "data_log";
2122 }
2123
2124 for (int i = 0; i < num_shards; i++) {
2125 char buf[16];
2126 snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i);
2127 oids[i] = buf;
2128 }
2129
2130 renew_thread = new ChangesRenewThread(cct, this);
2131 renew_thread->create("rgw_dt_lg_renew");
2132 }
2133
2134 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
2135 const string& name = bs.bucket.name;
2136 int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
2137 uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
2138
2139 return (int)r;
2140 }
2141
2142 int RGWDataChangesLog::renew_entries()
2143 {
2144 if (!svc.zone->need_to_log_data())
2145 return 0;
2146
2147 /* we can't keep the bucket name as part of the cls_log_entry, and we need
2148 * it later, so we keep two lists under the map */
2149 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
2150
2151 lock.lock();
2152 map<rgw_bucket_shard, bool> entries;
2153 entries.swap(cur_cycle);
2154 lock.unlock();
2155
2156 map<rgw_bucket_shard, bool>::iterator iter;
2157 string section;
2158 real_time ut = real_clock::now();
2159 for (iter = entries.begin(); iter != entries.end(); ++iter) {
2160 const rgw_bucket_shard& bs = iter->first;
2161
2162 int index = choose_oid(bs);
2163
2164 cls_log_entry entry;
2165
2166 rgw_data_change change;
2167 bufferlist bl;
2168 change.entity_type = ENTITY_TYPE_BUCKET;
2169 change.key = bs.get_key();
2170 change.timestamp = ut;
2171 encode(change, bl);
2172
2173 svc.cls->timelog.prepare_entry(entry, ut, section, change.key, bl);
2174
2175 m[index].first.push_back(bs);
2176 m[index].second.emplace_back(std::move(entry));
2177 }
2178
2179 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
2180 for (miter = m.begin(); miter != m.end(); ++miter) {
2181 list<cls_log_entry>& entries = miter->second.second;
2182
2183 real_time now = real_clock::now();
2184
2185 int ret = svc.cls->timelog.add(oids[miter->first], entries, nullptr, true, null_yield);
2186 if (ret < 0) {
2187 /* we don't really need to have a special handling for failed cases here,
2188 * as this is just an optimization. */
2189 lderr(cct) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
2190 return ret;
2191 }
2192
2193 real_time expiration = now;
2194 expiration += make_timespan(cct->_conf->rgw_data_log_window);
2195
2196 list<rgw_bucket_shard>& buckets = miter->second.first;
2197 list<rgw_bucket_shard>::iterator liter;
2198 for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
2199 update_renewed(*liter, expiration);
2200 }
2201 }
2202
2203 return 0;
2204 }
2205
2206 void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
2207 {
2208 ceph_assert(ceph_mutex_is_locked(lock));
2209 if (!changes.find(bs, status)) {
2210 status = ChangeStatusPtr(new ChangeStatus);
2211 changes.add(bs, status);
2212 }
2213 }
2214
2215 void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
2216 {
2217 std::lock_guard l{lock};
2218 cur_cycle[bs] = true;
2219 }
2220
2221 void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
2222 {
2223 std::lock_guard l{lock};
2224 ChangeStatusPtr status;
2225 _get_change(bs, status);
2226
2227 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
2228 status->cur_expiration = expiration;
2229 }
2230
2231 int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
2232 rgw_bucket_shard bs(bucket, shard_id);
2233
2234 return choose_oid(bs);
2235 }
2236
2237 bool RGWDataChangesLog::filter_bucket(const rgw_bucket& bucket, optional_yield y) const
2238 {
2239 if (!bucket_filter) {
2240 return true;
2241 }
2242
2243 return bucket_filter->filter(bucket, y);
2244 }
2245
2246 int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
2247 auto& bucket = bucket_info.bucket;
2248
2249 if (!filter_bucket(bucket, null_yield)) {
2250 return 0;
2251 }
2252
2253 if (observer) {
2254 observer->on_bucket_changed(bucket.get_key());
2255 }
2256
2257 rgw_bucket_shard bs(bucket, shard_id);
2258
2259 int index = choose_oid(bs);
2260 mark_modified(index, bs);
2261
2262 lock.lock();
2263
2264 ChangeStatusPtr status;
2265 _get_change(bs, status);
2266
2267 lock.unlock();
2268
2269 real_time now = real_clock::now();
2270
2271 status->lock.lock();
2272
2273 ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
2274
2275 if (now < status->cur_expiration) {
2276 /* no need to send, recently completed */
2277 status->lock.unlock();
2278
2279 register_renew(bs);
2280 return 0;
2281 }
2282
2283 RefCountedCond *cond;
2284
2285 if (status->pending) {
2286 cond = status->cond;
2287
2288 ceph_assert(cond);
2289
2290 status->cond->get();
2291 status->lock.unlock();
2292
2293 int ret = cond->wait();
2294 cond->put();
2295 if (!ret) {
2296 register_renew(bs);
2297 }
2298 return ret;
2299 }
2300
2301 status->cond = new RefCountedCond;
2302 status->pending = true;
2303
2304 string& oid = oids[index];
2305 real_time expiration;
2306
2307 int ret;
2308
2309 do {
2310 status->cur_sent = now;
2311
2312 expiration = now;
2313 expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
2314
2315 status->lock.unlock();
2316
2317 bufferlist bl;
2318 rgw_data_change change;
2319 change.entity_type = ENTITY_TYPE_BUCKET;
2320 change.key = bs.get_key();
2321 change.timestamp = now;
2322 encode(change, bl);
2323 string section;
2324
2325 ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
2326
2327 ret = svc.cls->timelog.add(oid, now, section, change.key, bl, null_yield);
2328
2329 now = real_clock::now();
2330
2331 status->lock.lock();
2332
2333 } while (!ret && real_clock::now() > expiration);
2334
2335 cond = status->cond;
2336
2337 status->pending = false;
2338 status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
2339 status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
2340 status->cond = NULL;
2341 status->lock.unlock();
2342
2343 cond->done(ret);
2344 cond->put();
2345
2346 return ret;
2347 }
2348
2349 int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
2350 list<rgw_data_change_log_entry>& entries,
2351 const string& marker,
2352 string *out_marker,
2353 bool *truncated) {
2354 if (shard >= num_shards)
2355 return -EINVAL;
2356
2357 list<cls_log_entry> log_entries;
2358
2359 int ret = svc.cls->timelog.list(oids[shard], start_time, end_time,
2360 max_entries, log_entries, marker,
2361 out_marker, truncated, null_yield);
2362 if (ret < 0)
2363 return ret;
2364
2365 list<cls_log_entry>::iterator iter;
2366 for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
2367 rgw_data_change_log_entry log_entry;
2368 log_entry.log_id = iter->id;
2369 real_time rt = iter->timestamp.to_real_time();
2370 log_entry.log_timestamp = rt;
2371 auto liter = iter->data.cbegin();
2372 try {
2373 decode(log_entry.entry, liter);
2374 } catch (buffer::error& err) {
2375 lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
2376 return -EIO;
2377 }
2378 entries.push_back(log_entry);
2379 }
2380
2381 return 0;
2382 }
2383
2384 int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
2385 list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
2386 bool truncated;
2387 entries.clear();
2388
2389 for (; marker.shard < num_shards && (int)entries.size() < max_entries;
2390 marker.shard++, marker.marker.clear()) {
2391 int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
2392 marker.marker, NULL, &truncated);
2393 if (ret == -ENOENT) {
2394 continue;
2395 }
2396 if (ret < 0) {
2397 return ret;
2398 }
2399 if (truncated) {
2400 *ptruncated = true;
2401 return 0;
2402 }
2403 }
2404
2405 *ptruncated = (marker.shard < num_shards);
2406
2407 return 0;
2408 }
2409
2410 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
2411 {
2412 if (shard_id >= num_shards)
2413 return -EINVAL;
2414
2415 string oid = oids[shard_id];
2416
2417 cls_log_header header;
2418
2419 int ret = svc.cls->timelog.info(oid, &header, null_yield);
2420 if ((ret < 0) && (ret != -ENOENT))
2421 return ret;
2422
2423 info->marker = header.max_marker;
2424 info->last_update = header.max_time.to_real_time();
2425
2426 return 0;
2427 }
2428
2429 int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
2430 const string& start_marker, const string& end_marker)
2431 {
2432 if (shard_id > num_shards)
2433 return -EINVAL;
2434
2435 return svc.cls->timelog.trim(oids[shard_id], start_time, end_time,
2436 start_marker, end_marker, nullptr, null_yield);
2437 }
2438
2439 bool RGWDataChangesLog::going_down()
2440 {
2441 return down_flag;
2442 }
2443
2444 RGWDataChangesLog::~RGWDataChangesLog() {
2445 down_flag = true;
2446 renew_thread->stop();
2447 renew_thread->join();
2448 delete renew_thread;
2449 delete[] oids;
2450 }
2451
2452 void *RGWDataChangesLog::ChangesRenewThread::entry() {
2453 for (;;) {
2454 dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
2455 int r = log->renew_entries();
2456 if (r < 0) {
2457 dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
2458 }
2459
2460 if (log->going_down())
2461 break;
2462
2463 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
2464 std::unique_lock locker{lock};
2465 cond.wait_for(locker, std::chrono::seconds(interval));
2466 }
2467
2468 return NULL;
2469 }
2470
2471 void RGWDataChangesLog::ChangesRenewThread::stop()
2472 {
2473 std::lock_guard l{lock};
2474 cond.notify_all();
2475 }
2476
2477 void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
2478 {
2479 auto key = bs.get_key();
2480 {
2481 std::shared_lock rl{modified_lock}; // read lock to check for existence
2482 auto shard = modified_shards.find(shard_id);
2483 if (shard != modified_shards.end() && shard->second.count(key)) {
2484 return;
2485 }
2486 }
2487
2488 std::unique_lock wl{modified_lock}; // write lock for insertion
2489 modified_shards[shard_id].insert(key);
2490 }
2491
2492 void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
2493 {
2494 std::unique_lock wl{modified_lock};
2495 modified.swap(modified_shards);
2496 modified_shards.clear();
2497 }
2498
2499 void RGWBucketCompleteInfo::dump(Formatter *f) const {
2500 encode_json("bucket_info", info, f);
2501 encode_json("attrs", attrs, f);
2502 }
2503
2504 void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
2505 JSONDecoder::decode_json("bucket_info", info, obj);
2506 JSONDecoder::decode_json("attrs", attrs, obj);
2507 }
2508
2509 class RGWBucketMetadataHandler : public RGWBucketMetadataHandlerBase {
2510 public:
2511 struct Svc {
2512 RGWSI_Bucket *bucket{nullptr};
2513 } svc;
2514
2515 struct Ctl {
2516 RGWBucketCtl *bucket{nullptr};
2517 } ctl;
2518
2519 RGWBucketMetadataHandler() {}
2520
2521 void init(RGWSI_Bucket *bucket_svc,
2522 RGWBucketCtl *bucket_ctl) override {
2523 base_init(bucket_svc->ctx(),
2524 bucket_svc->get_ep_be_handler().get());
2525 svc.bucket = bucket_svc;
2526 ctl.bucket = bucket_ctl;
2527 }
2528
2529 string get_type() override { return "bucket"; }
2530
2531 RGWMetadataObject *get_meta_obj(JSONObj *jo, const obj_version& objv, const ceph::real_time& mtime) override {
2532 RGWBucketEntryPoint be;
2533
2534 try {
2535 decode_json_obj(be, jo);
2536 } catch (JSONDecoder::err& e) {
2537 return nullptr;
2538 }
2539
2540 return new RGWBucketEntryMetadataObject(be, objv, mtime);
2541 }
2542
2543 int do_get(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWMetadataObject **obj, optional_yield y) override {
2544 RGWObjVersionTracker ot;
2545 RGWBucketEntryPoint be;
2546
2547 real_time mtime;
2548 map<string, bufferlist> attrs;
2549
2550 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2551
2552 int ret = svc.bucket->read_bucket_entrypoint_info(ctx, entry, &be, &ot, &mtime, &attrs, y);
2553 if (ret < 0)
2554 return ret;
2555
2556 RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime, std::move(attrs));
2557
2558 *obj = mdo;
2559
2560 return 0;
2561 }
2562
2563 int do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
2564 RGWMetadataObject *obj,
2565 RGWObjVersionTracker& objv_tracker,
2566 optional_yield y,
2567 RGWMDLogSyncType type) override;
2568
2569 int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker,
2570 optional_yield y) override {
2571 RGWBucketEntryPoint be;
2572
2573 real_time orig_mtime;
2574
2575 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2576
2577 int ret = svc.bucket->read_bucket_entrypoint_info(ctx, entry, &be, &objv_tracker, &orig_mtime, nullptr, y);
2578 if (ret < 0)
2579 return ret;
2580
2581 /*
2582 * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
2583 * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
2584 * will incorrectly fail.
2585 */
2586 ret = ctl.bucket->unlink_bucket(be.owner, be.bucket, y, false);
2587 if (ret < 0) {
2588 lderr(svc.bucket->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2589 }
2590
2591 ret = svc.bucket->remove_bucket_entrypoint_info(ctx, entry, &objv_tracker, y);
2592 if (ret < 0) {
2593 lderr(svc.bucket->ctx()) << "could not delete bucket=" << entry << dendl;
2594 }
2595 /* idempotent */
2596 return 0;
2597 }
2598
2599 int call(std::function<int(RGWSI_Bucket_EP_Ctx& ctx)> f) {
2600 return call(nullopt, f);
2601 }
2602
2603 int call(std::optional<RGWSI_MetaBackend_CtxParams> bectx_params,
2604 std::function<int(RGWSI_Bucket_EP_Ctx& ctx)> f) {
2605 return be_handler->call(bectx_params, [&](RGWSI_MetaBackend_Handler::Op *op) {
2606 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2607 return f(ctx);
2608 });
2609 }
2610 };
2611
2612 class RGWMetadataHandlerPut_Bucket : public RGWMetadataHandlerPut_SObj
2613 {
2614 RGWBucketMetadataHandler *bhandler;
2615 RGWBucketEntryMetadataObject *obj;
2616 public:
2617 RGWMetadataHandlerPut_Bucket(RGWBucketMetadataHandler *_handler,
2618 RGWSI_MetaBackend_Handler::Op *op, string& entry,
2619 RGWMetadataObject *_obj, RGWObjVersionTracker& objv_tracker,
2620 optional_yield y,
2621 RGWMDLogSyncType type) : RGWMetadataHandlerPut_SObj(_handler, op, entry, obj, objv_tracker, y, type),
2622 bhandler(_handler) {
2623 obj = static_cast<RGWBucketEntryMetadataObject *>(_obj);
2624 }
2625 ~RGWMetadataHandlerPut_Bucket() {}
2626
2627 void encode_obj(bufferlist *bl) override {
2628 obj->get_ep().encode(*bl);
2629 }
2630
2631 int put_checked() override;
2632 int put_post() override;
2633 };
2634
2635 int RGWBucketMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
2636 RGWMetadataObject *obj,
2637 RGWObjVersionTracker& objv_tracker,
2638 optional_yield y,
2639 RGWMDLogSyncType type)
2640 {
2641 RGWMetadataHandlerPut_Bucket put_op(this, op, entry, obj, objv_tracker, y, type);
2642 return do_put_operate(&put_op);
2643 }
2644
2645 int RGWMetadataHandlerPut_Bucket::put_checked()
2646 {
2647 RGWBucketEntryMetadataObject *orig_obj = static_cast<RGWBucketEntryMetadataObject *>(old_obj);
2648
2649 if (orig_obj) {
2650 obj->set_pattrs(&orig_obj->get_attrs());
2651 }
2652
2653 auto& be = obj->get_ep();
2654 auto mtime = obj->get_mtime();
2655 auto pattrs = obj->get_pattrs();
2656
2657 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2658
2659 return bhandler->svc.bucket->store_bucket_entrypoint_info(ctx, entry,
2660 be,
2661 false,
2662 mtime,
2663 pattrs,
2664 &objv_tracker,
2665 y);
2666 }
2667
2668 int RGWMetadataHandlerPut_Bucket::put_post()
2669 {
2670 auto& be = obj->get_ep();
2671
2672 int ret;
2673
2674 /* link bucket */
2675 if (be.linked) {
2676 ret = bhandler->ctl.bucket->link_bucket(be.owner, be.bucket, be.creation_time, y, false);
2677 } else {
2678 ret = bhandler->ctl.bucket->unlink_bucket(be.owner, be.bucket, y, false);
2679 }
2680
2681 return ret;
2682 }
2683
2684 static void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) {
2685
2686 char md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
2687 unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
2688 bufferlist bl;
2689
2690 Formatter *f = new JSONFormatter(false);
2691 be->dump(f);
2692 f->flush(bl);
2693
2694 MD5 hash;
2695 hash.Update((const unsigned char *)bl.c_str(), bl.length());
2696 hash.Final(m);
2697
2698 buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, md5);
2699
2700 delete f;
2701
2702 md5_digest = md5;
2703 }
2704
2705 #define ARCHIVE_META_ATTR RGW_ATTR_PREFIX "zone.archive.info"
2706
2707 struct archive_meta_info {
2708 rgw_bucket orig_bucket;
2709
2710 bool from_attrs(CephContext *cct, map<string, bufferlist>& attrs) {
2711 auto iter = attrs.find(ARCHIVE_META_ATTR);
2712 if (iter == attrs.end()) {
2713 return false;
2714 }
2715
2716 auto bliter = iter->second.cbegin();
2717 try {
2718 decode(bliter);
2719 } catch (buffer::error& err) {
2720 ldout(cct, 0) << "ERROR: failed to decode archive meta info" << dendl;
2721 return false;
2722 }
2723
2724 return true;
2725 }
2726
2727 void store_in_attrs(map<string, bufferlist>& attrs) const {
2728 encode(attrs[ARCHIVE_META_ATTR]);
2729 }
2730
2731 void encode(bufferlist& bl) const {
2732 ENCODE_START(1, 1, bl);
2733 encode(orig_bucket, bl);
2734 ENCODE_FINISH(bl);
2735 }
2736
2737 void decode(bufferlist::const_iterator& bl) {
2738 DECODE_START(1, bl);
2739 decode(orig_bucket, bl);
2740 DECODE_FINISH(bl);
2741 }
2742 };
2743 WRITE_CLASS_ENCODER(archive_meta_info)
2744
2745 class RGWArchiveBucketMetadataHandler : public RGWBucketMetadataHandler {
2746 public:
2747 RGWArchiveBucketMetadataHandler() {}
2748
2749 int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker,
2750 optional_yield y) override {
2751 auto cct = svc.bucket->ctx();
2752
2753 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2754
2755 ldout(cct, 5) << "SKIP: bucket removal is not allowed on archive zone: bucket:" << entry << " ... proceeding to rename" << dendl;
2756
2757 string tenant_name, bucket_name;
2758 parse_bucket(entry, &tenant_name, &bucket_name);
2759 rgw_bucket entry_bucket;
2760 entry_bucket.tenant = tenant_name;
2761 entry_bucket.name = bucket_name;
2762
2763 real_time mtime;
2764
2765 /* read original entrypoint */
2766
2767 RGWBucketEntryPoint be;
2768 map<string, bufferlist> attrs;
2769 int ret = svc.bucket->read_bucket_entrypoint_info(ctx, entry, &be, &objv_tracker, &mtime, &attrs, y);
2770 if (ret < 0) {
2771 return ret;
2772 }
2773
2774 string bi_meta_name = RGWSI_Bucket::get_bi_meta_key(be.bucket);
2775
2776 /* read original bucket instance info */
2777
2778 map<string, bufferlist> attrs_m;
2779 ceph::real_time orig_mtime;
2780 RGWBucketInfo old_bi;
2781
2782 ret = ctl.bucket->read_bucket_instance_info(be.bucket, &old_bi, y, RGWBucketCtl::BucketInstance::GetParams()
2783 .set_mtime(&orig_mtime)
2784 .set_attrs(&attrs_m));
2785 if (ret < 0) {
2786 return ret;
2787 }
2788
2789 archive_meta_info ami;
2790
2791 if (!ami.from_attrs(svc.bucket->ctx(), attrs_m)) {
2792 ami.orig_bucket = old_bi.bucket;
2793 ami.store_in_attrs(attrs_m);
2794 }
2795
2796 /* generate a new bucket instance. We could have avoided this if we could just point a new
2797 * bucket entry point to the old bucket instance, however, due to limitation in the way
2798 * we index buckets under the user, bucket entrypoint and bucket instance of the same
2799 * bucket need to have the same name, so we need to copy the old bucket instance into
2800 * to a new entry with the new name
2801 */
2802
2803 string new_bucket_name;
2804
2805 RGWBucketInfo new_bi = old_bi;
2806 RGWBucketEntryPoint new_be = be;
2807
2808 string md5_digest;
2809
2810 get_md5_digest(&new_be, md5_digest);
2811 new_bucket_name = ami.orig_bucket.name + "-deleted-" + md5_digest;
2812
2813 new_bi.bucket.name = new_bucket_name;
2814 new_bi.objv_tracker.clear();
2815
2816 new_be.bucket.name = new_bucket_name;
2817
2818 ret = ctl.bucket->store_bucket_instance_info(be.bucket, new_bi, y, RGWBucketCtl::BucketInstance::PutParams()
2819 .set_exclusive(false)
2820 .set_mtime(orig_mtime)
2821 .set_attrs(&attrs_m)
2822 .set_orig_info(&old_bi));
2823 if (ret < 0) {
2824 ldout(cct, 0) << "ERROR: failed to put new bucket instance info for bucket=" << new_bi.bucket << " ret=" << ret << dendl;
2825 return ret;
2826 }
2827
2828 /* store a new entrypoint */
2829
2830 RGWObjVersionTracker ot;
2831 ot.generate_new_write_ver(cct);
2832
2833 ret = svc.bucket->store_bucket_entrypoint_info(ctx, RGWSI_Bucket::get_entrypoint_meta_key(new_be.bucket),
2834 new_be, true, mtime, &attrs, nullptr, y);
2835 if (ret < 0) {
2836 ldout(cct, 0) << "ERROR: failed to put new bucket entrypoint for bucket=" << new_be.bucket << " ret=" << ret << dendl;
2837 return ret;
2838 }
2839
2840 /* link new bucket */
2841
2842 ret = ctl.bucket->link_bucket(new_be.owner, new_be.bucket, new_be.creation_time, y, false);
2843 if (ret < 0) {
2844 ldout(cct, 0) << "ERROR: failed to link new bucket for bucket=" << new_be.bucket << " ret=" << ret << dendl;
2845 return ret;
2846 }
2847
2848 /* clean up old stuff */
2849
2850 ret = ctl.bucket->unlink_bucket(be.owner, entry_bucket, y, false);
2851 if (ret < 0) {
2852 lderr(cct) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2853 }
2854
2855 // if (ret == -ECANCELED) it means that there was a race here, and someone
2856 // wrote to the bucket entrypoint just before we removed it. The question is
2857 // whether it was a newly created bucket entrypoint ... in which case we
2858 // should ignore the error and move forward, or whether it is a higher version
2859 // of the same bucket instance ... in which we should retry
2860 ret = svc.bucket->remove_bucket_entrypoint_info(ctx,
2861 RGWSI_Bucket::get_entrypoint_meta_key(be.bucket),
2862 &objv_tracker,
2863 y);
2864 if (ret < 0) {
2865 ldout(cct, 0) << "ERROR: failed to put new bucket entrypoint for bucket=" << new_be.bucket << " ret=" << ret << dendl;
2866 return ret;
2867 }
2868
2869 ret = ctl.bucket->remove_bucket_instance_info(be.bucket, old_bi, y);
2870 if (ret < 0) {
2871 lderr(cct) << "could not delete bucket=" << entry << dendl;
2872 }
2873
2874
2875 /* idempotent */
2876
2877 return 0;
2878 }
2879
2880 int do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
2881 RGWMetadataObject *obj,
2882 RGWObjVersionTracker& objv_tracker,
2883 optional_yield y,
2884 RGWMDLogSyncType type) override {
2885 if (entry.find("-deleted-") != string::npos) {
2886 RGWObjVersionTracker ot;
2887 RGWMetadataObject *robj;
2888 int ret = do_get(op, entry, &robj, y);
2889 if (ret != -ENOENT) {
2890 if (ret < 0) {
2891 return ret;
2892 }
2893 ot.read_version = robj->get_version();
2894 delete robj;
2895
2896 ret = do_remove(op, entry, ot, y);
2897 if (ret < 0) {
2898 return ret;
2899 }
2900 }
2901 }
2902
2903 return RGWBucketMetadataHandler::do_put(op, entry, obj,
2904 objv_tracker, y, type);
2905 }
2906
2907 };
2908
2909 class RGWBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandlerBase {
2910 int read_bucket_instance_entry(RGWSI_Bucket_BI_Ctx& ctx,
2911 const string& entry,
2912 RGWBucketCompleteInfo *bi,
2913 ceph::real_time *pmtime,
2914 optional_yield y) {
2915 return svc.bucket->read_bucket_instance_info(ctx,
2916 entry,
2917 &bi->info,
2918 pmtime, &bi->attrs,
2919 y);
2920 }
2921
2922 public:
2923 struct Svc {
2924 RGWSI_Zone *zone{nullptr};
2925 RGWSI_Bucket *bucket{nullptr};
2926 RGWSI_BucketIndex *bi{nullptr};
2927 } svc;
2928
2929 RGWBucketInstanceMetadataHandler() {}
2930
2931 void init(RGWSI_Zone *zone_svc,
2932 RGWSI_Bucket *bucket_svc,
2933 RGWSI_BucketIndex *bi_svc) {
2934 base_init(bucket_svc->ctx(),
2935 bucket_svc->get_bi_be_handler().get());
2936 svc.zone = zone_svc;
2937 svc.bucket = bucket_svc;
2938 svc.bi = bi_svc;
2939 }
2940
2941 string get_type() override { return "bucket.instance"; }
2942
2943 RGWMetadataObject *get_meta_obj(JSONObj *jo, const obj_version& objv, const ceph::real_time& mtime) override {
2944 RGWBucketCompleteInfo bci;
2945
2946 try {
2947 decode_json_obj(bci, jo);
2948 } catch (JSONDecoder::err& e) {
2949 return nullptr;
2950 }
2951
2952 return new RGWBucketInstanceMetadataObject(bci, objv, mtime);
2953 }
2954
2955 int do_get(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWMetadataObject **obj, optional_yield y) override {
2956 RGWBucketCompleteInfo bci;
2957 real_time mtime;
2958
2959 RGWSI_Bucket_BI_Ctx ctx(op->ctx());
2960
2961 int ret = svc.bucket->read_bucket_instance_info(ctx, entry, &bci.info, &mtime, &bci.attrs, y);
2962 if (ret < 0)
2963 return ret;
2964
2965 RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime);
2966
2967 *obj = mdo;
2968
2969 return 0;
2970 }
2971
2972 int do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
2973 RGWMetadataObject *_obj, RGWObjVersionTracker& objv_tracker,
2974 optional_yield y,
2975 RGWMDLogSyncType sync_type) override;
2976
2977 int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker,
2978 optional_yield y) override {
2979 RGWBucketCompleteInfo bci;
2980
2981 RGWSI_Bucket_BI_Ctx ctx(op->ctx());
2982
2983 int ret = read_bucket_instance_entry(ctx, entry, &bci, nullptr, y);
2984 if (ret < 0 && ret != -ENOENT)
2985 return ret;
2986
2987 return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y);
2988 }
2989
2990 int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
2991 return call(nullopt, f);
2992 }
2993
2994 int call(std::optional<RGWSI_MetaBackend_CtxParams> bectx_params,
2995 std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
2996 return be_handler->call(bectx_params, [&](RGWSI_MetaBackend_Handler::Op *op) {
2997 RGWSI_Bucket_BI_Ctx ctx(op->ctx());
2998 return f(ctx);
2999 });
3000 }
3001 };
3002
3003 class RGWMetadataHandlerPut_BucketInstance : public RGWMetadataHandlerPut_SObj
3004 {
3005 CephContext *cct;
3006 RGWBucketInstanceMetadataHandler *bihandler;
3007 RGWBucketInstanceMetadataObject *obj;
3008 public:
3009 RGWMetadataHandlerPut_BucketInstance(CephContext *cct,
3010 RGWBucketInstanceMetadataHandler *_handler,
3011 RGWSI_MetaBackend_Handler::Op *_op, string& entry,
3012 RGWMetadataObject *_obj, RGWObjVersionTracker& objv_tracker,
3013 optional_yield y,
3014 RGWMDLogSyncType type) : RGWMetadataHandlerPut_SObj(_handler, _op, entry, obj, objv_tracker, y, type),
3015 bihandler(_handler) {
3016 obj = static_cast<RGWBucketInstanceMetadataObject *>(_obj);
3017
3018 auto& bci = obj->get_bci();
3019 obj->set_pattrs(&bci.attrs);
3020 }
3021
3022 void encode_obj(bufferlist *bl) override {
3023 obj->get_bucket_info().encode(*bl);
3024 }
3025
3026 int put_check() override;
3027 int put_checked() override;
3028 int put_post() override;
3029 };
3030
3031 int RGWBucketInstanceMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op *op,
3032 string& entry,
3033 RGWMetadataObject *obj,
3034 RGWObjVersionTracker& objv_tracker,
3035 optional_yield y,
3036 RGWMDLogSyncType type)
3037 {
3038 RGWMetadataHandlerPut_BucketInstance put_op(svc.bucket->ctx(), this, op, entry, obj,
3039 objv_tracker, y, type);
3040 return do_put_operate(&put_op);
3041 }
3042
3043 int RGWMetadataHandlerPut_BucketInstance::put_check()
3044 {
3045 int ret;
3046
3047 RGWBucketCompleteInfo& bci = obj->get_bci();
3048
3049 RGWBucketInstanceMetadataObject *orig_obj = static_cast<RGWBucketInstanceMetadataObject *>(old_obj);
3050
3051 RGWBucketCompleteInfo *old_bci = (orig_obj ? &orig_obj->get_bci() : nullptr);
3052
3053 bool exists = (!!orig_obj);
3054
3055 if (!exists || old_bci->info.bucket.bucket_id != bci.info.bucket.bucket_id) {
3056 /* a new bucket, we need to select a new bucket placement for it */
3057 string tenant_name;
3058 string bucket_name;
3059 string bucket_instance;
3060 parse_bucket(entry, &tenant_name, &bucket_name, &bucket_instance);
3061
3062 RGWZonePlacementInfo rule_info;
3063 bci.info.bucket.name = bucket_name;
3064 bci.info.bucket.bucket_id = bucket_instance;
3065 bci.info.bucket.tenant = tenant_name;
3066 // if the sync module never writes data, don't require the zone to specify all placement targets
3067 if (bihandler->svc.zone->sync_module_supports_writes()) {
3068 ret = bihandler->svc.zone->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info);
3069 if (ret < 0) {
3070 ldout(cct, 0) << "ERROR: select_bucket_placement() returned " << ret << dendl;
3071 return ret;
3072 }
3073 }
3074 bci.info.index_type = rule_info.index_type;
3075 } else {
3076 /* existing bucket, keep its placement */
3077 bci.info.bucket.explicit_placement = old_bci->info.bucket.explicit_placement;
3078 bci.info.placement_rule = old_bci->info.placement_rule;
3079 }
3080
3081 /* record the read version (if any), store the new version */
3082 bci.info.objv_tracker.read_version = objv_tracker.read_version;
3083 bci.info.objv_tracker.write_version = objv_tracker.write_version;
3084
3085 return 0;
3086 }
3087
3088 int RGWMetadataHandlerPut_BucketInstance::put_checked()
3089 {
3090 RGWBucketInstanceMetadataObject *orig_obj = static_cast<RGWBucketInstanceMetadataObject *>(old_obj);
3091
3092 RGWBucketInfo *orig_info = (orig_obj ? &orig_obj->get_bucket_info() : nullptr);
3093
3094 auto& info = obj->get_bucket_info();
3095 auto mtime = obj->get_mtime();
3096 auto pattrs = obj->get_pattrs();
3097
3098 RGWSI_Bucket_BI_Ctx ctx(op->ctx());
3099
3100 return bihandler->svc.bucket->store_bucket_instance_info(ctx,
3101 entry,
3102 info,
3103 orig_info,
3104 false,
3105 mtime,
3106 pattrs,
3107 y);
3108 }
3109
3110 int RGWMetadataHandlerPut_BucketInstance::put_post()
3111 {
3112 RGWBucketCompleteInfo& bci = obj->get_bci();
3113
3114 objv_tracker = bci.info.objv_tracker;
3115
3116 int ret = bihandler->svc.bi->init_index(bci.info);
3117 if (ret < 0) {
3118 return ret;
3119 }
3120
3121 return STATUS_APPLIED;
3122 }
3123
3124 class RGWArchiveBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler {
3125 public:
3126 RGWArchiveBucketInstanceMetadataHandler() {}
3127
3128 int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker, optional_yield y) override {
3129 ldout(cct, 0) << "SKIP: bucket instance removal is not allowed on archive zone: bucket.instance:" << entry << dendl;
3130 return 0;
3131 }
3132 };
3133
3134 bool RGWBucketCtl::DataLogFilter::filter(const rgw_bucket& bucket, optional_yield y) const
3135 {
3136 return bucket_ctl->bucket_exports_data(bucket, null_yield);
3137 }
3138
3139 RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc,
3140 RGWSI_Bucket *bucket_svc,
3141 RGWSI_Bucket_Sync *bucket_sync_svc,
3142 RGWSI_BucketIndex *bi_svc) : cct(zone_svc->ctx()),
3143 datalog_filter(this)
3144 {
3145 svc.zone = zone_svc;
3146 svc.bucket = bucket_svc;
3147 svc.bucket_sync = bucket_sync_svc;
3148 svc.bi = bi_svc;
3149 }
3150
3151 void RGWBucketCtl::init(RGWUserCtl *user_ctl,
3152 RGWBucketMetadataHandler *_bm_handler,
3153 RGWBucketInstanceMetadataHandler *_bmi_handler,
3154 RGWDataChangesLog *datalog)
3155 {
3156 ctl.user = user_ctl;
3157
3158 bm_handler = _bm_handler;
3159 bmi_handler = _bmi_handler;
3160
3161 bucket_be_handler = bm_handler->get_be_handler();
3162 bi_be_handler = bmi_handler->get_be_handler();
3163
3164 datalog->set_bucket_filter(&datalog_filter);
3165 }
3166
3167 int RGWBucketCtl::call(std::function<int(RGWSI_Bucket_X_Ctx& ctx)> f) {
3168 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ep_ctx) {
3169 return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& bi_ctx) {
3170 RGWSI_Bucket_X_Ctx ctx{ep_ctx, bi_ctx};
3171 return f(ctx);
3172 });
3173 });
3174 }
3175
3176 int RGWBucketCtl::read_bucket_entrypoint_info(const rgw_bucket& bucket,
3177 RGWBucketEntryPoint *info,
3178 optional_yield y,
3179 const Bucket::GetParams& params)
3180 {
3181 return bm_handler->call(params.bectx_params, [&](RGWSI_Bucket_EP_Ctx& ctx) {
3182 return svc.bucket->read_bucket_entrypoint_info(ctx,
3183 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
3184 info,
3185 params.objv_tracker,
3186 params.mtime,
3187 params.attrs,
3188 y,
3189 params.cache_info,
3190 params.refresh_version);
3191 });
3192 }
3193
3194 int RGWBucketCtl::store_bucket_entrypoint_info(const rgw_bucket& bucket,
3195 RGWBucketEntryPoint& info,
3196 optional_yield y,
3197 const Bucket::PutParams& params)
3198 {
3199 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ctx) {
3200 return svc.bucket->store_bucket_entrypoint_info(ctx,
3201 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
3202 info,
3203 params.exclusive,
3204 params.mtime,
3205 params.attrs,
3206 params.objv_tracker,
3207 y);
3208 });
3209 }
3210
3211 int RGWBucketCtl::remove_bucket_entrypoint_info(const rgw_bucket& bucket,
3212 optional_yield y,
3213 const Bucket::RemoveParams& params)
3214 {
3215 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ctx) {
3216 return svc.bucket->remove_bucket_entrypoint_info(ctx,
3217 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
3218 params.objv_tracker,
3219 y);
3220 });
3221 }
3222
3223 int RGWBucketCtl::read_bucket_instance_info(const rgw_bucket& bucket,
3224 RGWBucketInfo *info,
3225 optional_yield y,
3226 const BucketInstance::GetParams& params)
3227 {
3228 int ret = bmi_handler->call(params.bectx_params, [&](RGWSI_Bucket_BI_Ctx& ctx) {
3229 return svc.bucket->read_bucket_instance_info(ctx,
3230 RGWSI_Bucket::get_bi_meta_key(bucket),
3231 info,
3232 params.mtime,
3233 params.attrs,
3234 y,
3235 params.cache_info,
3236 params.refresh_version);
3237 });
3238
3239 if (ret < 0) {
3240 return ret;
3241 }
3242
3243 if (params.objv_tracker) {
3244 *params.objv_tracker = info->objv_tracker;
3245 }
3246
3247 return 0;
3248 }
3249
3250 int RGWBucketCtl::read_bucket_info(const rgw_bucket& bucket,
3251 RGWBucketInfo *info,
3252 optional_yield y,
3253 const BucketInstance::GetParams& params,
3254 RGWObjVersionTracker *ep_objv_tracker)
3255 {
3256 const rgw_bucket *b = &bucket;
3257
3258 std::optional<RGWBucketEntryPoint> ep;
3259
3260 if (b->bucket_id.empty()) {
3261 ep.emplace();
3262
3263 int r = read_bucket_entrypoint_info(*b, &(*ep), y, RGWBucketCtl::Bucket::GetParams()
3264 .set_bectx_params(params.bectx_params)
3265 .set_objv_tracker(ep_objv_tracker));
3266 if (r < 0) {
3267 return r;
3268 }
3269
3270 b = &ep->bucket;
3271 }
3272
3273 int ret = bmi_handler->call(params.bectx_params, [&](RGWSI_Bucket_BI_Ctx& ctx) {
3274 return svc.bucket->read_bucket_instance_info(ctx,
3275 RGWSI_Bucket::get_bi_meta_key(*b),
3276 info,
3277 params.mtime,
3278 params.attrs,
3279 y,
3280 params.cache_info,
3281 params.refresh_version);
3282 });
3283
3284 if (ret < 0) {
3285 return ret;
3286 }
3287
3288 if (params.objv_tracker) {
3289 *params.objv_tracker = info->objv_tracker;
3290 }
3291
3292 return 0;
3293 }
3294
3295 int RGWBucketCtl::do_store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
3296 const rgw_bucket& bucket,
3297 RGWBucketInfo& info,
3298 optional_yield y,
3299 const BucketInstance::PutParams& params)
3300 {
3301 if (params.objv_tracker) {
3302 info.objv_tracker = *params.objv_tracker;
3303 }
3304
3305 return svc.bucket->store_bucket_instance_info(ctx,
3306 RGWSI_Bucket::get_bi_meta_key(bucket),
3307 info,
3308 params.orig_info,
3309 params.exclusive,
3310 params.mtime,
3311 params.attrs,
3312 y);
3313 }
3314
3315 int RGWBucketCtl::store_bucket_instance_info(const rgw_bucket& bucket,
3316 RGWBucketInfo& info,
3317 optional_yield y,
3318 const BucketInstance::PutParams& params)
3319 {
3320 return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& ctx) {
3321 return do_store_bucket_instance_info(ctx, bucket, info, y, params);
3322 });
3323 }
3324
3325 int RGWBucketCtl::remove_bucket_instance_info(const rgw_bucket& bucket,
3326 RGWBucketInfo& info,
3327 optional_yield y,
3328 const BucketInstance::RemoveParams& params)
3329 {
3330 if (params.objv_tracker) {
3331 info.objv_tracker = *params.objv_tracker;
3332 }
3333
3334 return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& ctx) {
3335 return svc.bucket->remove_bucket_instance_info(ctx,
3336 RGWSI_Bucket::get_bi_meta_key(bucket),
3337 info,
3338 &info.objv_tracker,
3339 y);
3340 });
3341 }
3342
3343 int RGWBucketCtl::do_store_linked_bucket_info(RGWSI_Bucket_X_Ctx& ctx,
3344 RGWBucketInfo& info,
3345 RGWBucketInfo *orig_info,
3346 bool exclusive, real_time mtime,
3347 obj_version *pep_objv,
3348 map<string, bufferlist> *pattrs,
3349 bool create_entry_point,
3350 optional_yield y)
3351 {
3352 bool create_head = !info.has_instance_obj || create_entry_point;
3353
3354 int ret = svc.bucket->store_bucket_instance_info(ctx.bi,
3355 RGWSI_Bucket::get_bi_meta_key(info.bucket),
3356 info,
3357 orig_info,
3358 exclusive,
3359 mtime, pattrs,
3360 y);
3361 if (ret < 0) {
3362 return ret;
3363 }
3364
3365 if (!create_head)
3366 return 0; /* done! */
3367
3368 RGWBucketEntryPoint entry_point;
3369 entry_point.bucket = info.bucket;
3370 entry_point.owner = info.owner;
3371 entry_point.creation_time = info.creation_time;
3372 entry_point.linked = true;
3373 RGWObjVersionTracker ot;
3374 if (pep_objv && !pep_objv->tag.empty()) {
3375 ot.write_version = *pep_objv;
3376 } else {
3377 ot.generate_new_write_ver(cct);
3378 if (pep_objv) {
3379 *pep_objv = ot.write_version;
3380 }
3381 }
3382 ret = svc.bucket->store_bucket_entrypoint_info(ctx.ep,
3383 RGWSI_Bucket::get_entrypoint_meta_key(info.bucket),
3384 entry_point,
3385 exclusive,
3386 mtime,
3387 pattrs,
3388 &ot,
3389 y);
3390 if (ret < 0)
3391 return ret;
3392
3393 return 0;
3394 }
3395 int RGWBucketCtl::convert_old_bucket_info(RGWSI_Bucket_X_Ctx& ctx,
3396 const rgw_bucket& bucket,
3397 optional_yield y)
3398 {
3399 RGWBucketEntryPoint entry_point;
3400 real_time ep_mtime;
3401 RGWObjVersionTracker ot;
3402 map<string, bufferlist> attrs;
3403 RGWBucketInfo info;
3404 auto cct = svc.bucket->ctx();
3405
3406 ldout(cct, 10) << "RGWRados::convert_old_bucket_info(): bucket=" << bucket << dendl;
3407
3408 int ret = svc.bucket->read_bucket_entrypoint_info(ctx.ep,
3409 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
3410 &entry_point, &ot, &ep_mtime, &attrs, y);
3411 if (ret < 0) {
3412 ldout(cct, 0) << "ERROR: get_bucket_entrypoint_info() returned " << ret << " bucket=" << bucket << dendl;
3413 return ret;
3414 }
3415
3416 if (!entry_point.has_bucket_info) {
3417 /* already converted! */
3418 return 0;
3419 }
3420
3421 info = entry_point.old_bucket_info;
3422
3423 ot.generate_new_write_ver(cct);
3424
3425 ret = do_store_linked_bucket_info(ctx, info, nullptr, false, ep_mtime, &ot.write_version, &attrs, true, y);
3426 if (ret < 0) {
3427 ldout(cct, 0) << "ERROR: failed to put_linked_bucket_info(): " << ret << dendl;
3428 return ret;
3429 }
3430
3431 return 0;
3432 }
3433
3434 int RGWBucketCtl::set_bucket_instance_attrs(RGWBucketInfo& bucket_info,
3435 map<string, bufferlist>& attrs,
3436 RGWObjVersionTracker *objv_tracker,
3437 optional_yield y)
3438 {
3439 return call([&](RGWSI_Bucket_X_Ctx& ctx) {
3440 rgw_bucket& bucket = bucket_info.bucket;
3441
3442 if (!bucket_info.has_instance_obj) {
3443 /* an old bucket object, need to convert it */
3444 int ret = convert_old_bucket_info(ctx, bucket, y);
3445 if (ret < 0) {
3446 ldout(cct, 0) << "ERROR: failed converting old bucket info: " << ret << dendl;
3447 return ret;
3448 }
3449 }
3450
3451 return do_store_bucket_instance_info(ctx.bi,
3452 bucket,
3453 bucket_info,
3454 y,
3455 BucketInstance::PutParams().set_attrs(&attrs)
3456 .set_objv_tracker(objv_tracker)
3457 .set_orig_info(&bucket_info));
3458 });
3459 }
3460
3461
3462 int RGWBucketCtl::link_bucket(const rgw_user& user_id,
3463 const rgw_bucket& bucket,
3464 ceph::real_time creation_time,
3465 optional_yield y,
3466 bool update_entrypoint,
3467 rgw_ep_info *pinfo)
3468 {
3469 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ctx) {
3470 return do_link_bucket(ctx, user_id, bucket, creation_time, y,
3471 update_entrypoint, pinfo);
3472 });
3473 }
3474
3475 int RGWBucketCtl::do_link_bucket(RGWSI_Bucket_EP_Ctx& ctx,
3476 const rgw_user& user_id,
3477 const rgw_bucket& bucket,
3478 ceph::real_time creation_time,
3479 optional_yield y,
3480 bool update_entrypoint,
3481 rgw_ep_info *pinfo)
3482 {
3483 int ret;
3484
3485 RGWBucketEntryPoint ep;
3486 RGWObjVersionTracker ot;
3487 RGWObjVersionTracker& rot = (pinfo) ? pinfo->ep_objv : ot;
3488 map<string, bufferlist> attrs, *pattrs = nullptr;
3489 string meta_key;
3490
3491 if (update_entrypoint) {
3492 meta_key = RGWSI_Bucket::get_entrypoint_meta_key(bucket);
3493 if (pinfo) {
3494 ep = pinfo->ep;
3495 pattrs = &pinfo->attrs;
3496 } else {
3497 ret = svc.bucket->read_bucket_entrypoint_info(ctx,
3498 meta_key,
3499 &ep, &rot,
3500 nullptr, &attrs,
3501 y);
3502 if (ret < 0 && ret != -ENOENT) {
3503 ldout(cct, 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
3504 << cpp_strerror(-ret) << dendl;
3505 }
3506 pattrs = &attrs;
3507 }
3508 }
3509
3510 ret = ctl.user->add_bucket(user_id, bucket, creation_time);
3511 if (ret < 0) {
3512 ldout(cct, 0) << "ERROR: error adding bucket to user directory:"
3513 << " user=" << user_id
3514 << " bucket=" << bucket
3515 << " err=" << cpp_strerror(-ret)
3516 << dendl;
3517 goto done_err;
3518 }
3519
3520 if (!update_entrypoint)
3521 return 0;
3522
3523 ep.linked = true;
3524 ep.owner = user_id;
3525 ep.bucket = bucket;
3526 ret = svc.bucket->store_bucket_entrypoint_info(
3527 ctx, meta_key, ep, false, real_time(), pattrs, &rot, y);
3528 if (ret < 0)
3529 goto done_err;
3530
3531 return 0;
3532
3533 done_err:
3534 int r = do_unlink_bucket(ctx, user_id, bucket, y, true);
3535 if (r < 0) {
3536 ldout(cct, 0) << "ERROR: failed unlinking bucket on error cleanup: "
3537 << cpp_strerror(-r) << dendl;
3538 }
3539 return ret;
3540 }
3541
3542 int RGWBucketCtl::unlink_bucket(const rgw_user& user_id, const rgw_bucket& bucket, optional_yield y, bool update_entrypoint)
3543 {
3544 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ctx) {
3545 return do_unlink_bucket(ctx, user_id, bucket, y, update_entrypoint);
3546 });
3547 }
3548
3549 int RGWBucketCtl::do_unlink_bucket(RGWSI_Bucket_EP_Ctx& ctx,
3550 const rgw_user& user_id,
3551 const rgw_bucket& bucket,
3552 optional_yield y,
3553 bool update_entrypoint)
3554 {
3555 int ret = ctl.user->remove_bucket(user_id, bucket);
3556 if (ret < 0) {
3557 ldout(cct, 0) << "ERROR: error removing bucket from directory: "
3558 << cpp_strerror(-ret)<< dendl;
3559 }
3560
3561 if (!update_entrypoint)
3562 return 0;
3563
3564 RGWBucketEntryPoint ep;
3565 RGWObjVersionTracker ot;
3566 map<string, bufferlist> attrs;
3567 string meta_key = RGWSI_Bucket::get_entrypoint_meta_key(bucket);
3568 ret = svc.bucket->read_bucket_entrypoint_info(ctx, meta_key, &ep, &ot, nullptr, &attrs, y);
3569 if (ret == -ENOENT)
3570 return 0;
3571 if (ret < 0)
3572 return ret;
3573
3574 if (!ep.linked)
3575 return 0;
3576
3577 if (ep.owner != user_id) {
3578 ldout(cct, 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl;
3579 return -EINVAL;
3580 }
3581
3582 ep.linked = false;
3583 return svc.bucket->store_bucket_entrypoint_info(ctx, meta_key, ep, false, real_time(), &attrs, &ot, y);
3584 }
3585
3586 int RGWBucketCtl::set_acl(ACLOwner& owner, rgw_bucket& bucket,
3587 RGWBucketInfo& bucket_info, bufferlist& bl,
3588 optional_yield y)
3589 {
3590 // set owner and acl
3591 bucket_info.owner = owner.get_id();
3592 std::map<std::string, bufferlist> attrs{{RGW_ATTR_ACL, bl}};
3593
3594 int r = store_bucket_instance_info(bucket, bucket_info, y,
3595 BucketInstance::PutParams().set_attrs(&attrs));
3596 if (r < 0) {
3597 cerr << "ERROR: failed to set bucket owner: " << cpp_strerror(-r) << std::endl;
3598 return r;
3599 }
3600
3601 return 0;
3602 }
3603
3604 // TODO: remove RGWRados dependency for bucket listing
3605 int RGWBucketCtl::chown(rgw::sal::RGWRadosStore *store, RGWBucketInfo& bucket_info,
3606 const rgw_user& user_id, const std::string& display_name,
3607 const std::string& marker, optional_yield y)
3608 {
3609 RGWObjectCtx obj_ctx(store);
3610 std::vector<rgw_bucket_dir_entry> objs;
3611 map<string, bool> common_prefixes;
3612
3613 RGWRados::Bucket target(store->getRados(), bucket_info);
3614 RGWRados::Bucket::List list_op(&target);
3615
3616 list_op.params.list_versions = true;
3617 list_op.params.allow_unordered = true;
3618 list_op.params.marker = marker;
3619
3620 bool is_truncated = false;
3621 int count = 0;
3622 int max_entries = 1000;
3623
3624 //Loop through objects and update object acls to point to bucket owner
3625
3626 do {
3627 objs.clear();
3628 int ret = list_op.list_objects(max_entries, &objs, &common_prefixes, &is_truncated, y);
3629 if (ret < 0) {
3630 ldout(store->ctx(), 0) << "ERROR: list objects failed: " << cpp_strerror(-ret) << dendl;
3631 return ret;
3632 }
3633
3634 list_op.params.marker = list_op.get_next_marker();
3635 count += objs.size();
3636
3637 for (const auto& obj : objs) {
3638
3639 rgw_obj r_obj(bucket_info.bucket, obj.key);
3640 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, r_obj);
3641 RGWRados::Object::Read read_op(&op_target);
3642
3643 map<string, bufferlist> attrs;
3644 read_op.params.attrs = &attrs;
3645 ret = read_op.prepare(y);
3646 if (ret < 0){
3647 ldout(store->ctx(), 0) << "ERROR: failed to read object " << obj.key.name << cpp_strerror(-ret) << dendl;
3648 continue;
3649 }
3650 const auto& aiter = attrs.find(RGW_ATTR_ACL);
3651 if (aiter == attrs.end()) {
3652 ldout(store->ctx(), 0) << "ERROR: no acls found for object " << obj.key.name << " .Continuing with next object." << dendl;
3653 continue;
3654 } else {
3655 bufferlist& bl = aiter->second;
3656 RGWAccessControlPolicy policy(store->ctx());
3657 ACLOwner owner;
3658 try {
3659 decode(policy, bl);
3660 owner = policy.get_owner();
3661 } catch (buffer::error& err) {
3662 ldout(store->ctx(), 0) << "ERROR: decode policy failed" << err << dendl;
3663 return -EIO;
3664 }
3665
3666 //Get the ACL from the policy
3667 RGWAccessControlList& acl = policy.get_acl();
3668
3669 //Remove grant that is set to old owner
3670 acl.remove_canon_user_grant(owner.get_id());
3671
3672 //Create a grant and add grant
3673 ACLGrant grant;
3674 grant.set_canon(user_id, display_name, RGW_PERM_FULL_CONTROL);
3675 acl.add_grant(&grant);
3676
3677 //Update the ACL owner to the new user
3678 owner.set_id(user_id);
3679 owner.set_name(display_name);
3680 policy.set_owner(owner);
3681
3682 bl.clear();
3683 encode(policy, bl);
3684
3685 obj_ctx.set_atomic(r_obj);
3686 ret = store->getRados()->set_attr(&obj_ctx, bucket_info, r_obj, RGW_ATTR_ACL, bl);
3687 if (ret < 0) {
3688 ldout(store->ctx(), 0) << "ERROR: modify attr failed " << cpp_strerror(-ret) << dendl;
3689 return ret;
3690 }
3691 }
3692 }
3693 cerr << count << " objects processed in " << bucket_info.bucket.name
3694 << ". Next marker " << list_op.params.marker.name << std::endl;
3695 } while(is_truncated);
3696 return 0;
3697 }
3698
3699 int RGWBucketCtl::read_bucket_stats(const rgw_bucket& bucket,
3700 RGWBucketEnt *result,
3701 optional_yield y)
3702 {
3703 return call([&](RGWSI_Bucket_X_Ctx& ctx) {
3704 return svc.bucket->read_bucket_stats(ctx, bucket, result, y);
3705 });
3706 }
3707
3708 int RGWBucketCtl::read_buckets_stats(map<string, RGWBucketEnt>& m,
3709 optional_yield y)
3710 {
3711 return call([&](RGWSI_Bucket_X_Ctx& ctx) {
3712 return svc.bucket->read_buckets_stats(ctx, m, y);
3713 });
3714 }
3715
3716 int RGWBucketCtl::sync_user_stats(const rgw_user& user_id,
3717 const RGWBucketInfo& bucket_info,
3718 RGWBucketEnt* pent)
3719 {
3720 RGWBucketEnt ent;
3721 if (!pent) {
3722 pent = &ent;
3723 }
3724 int r = svc.bi->read_stats(bucket_info, pent, null_yield);
3725 if (r < 0) {
3726 ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl;
3727 return r;
3728 }
3729
3730 return ctl.user->flush_bucket_stats(user_id, *pent);
3731 }
3732
3733 int RGWBucketCtl::get_sync_policy_handler(std::optional<rgw_zone_id> zone,
3734 std::optional<rgw_bucket> bucket,
3735 RGWBucketSyncPolicyHandlerRef *phandler,
3736 optional_yield y)
3737 {
3738 int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
3739 return svc.bucket_sync->get_policy_handler(ctx, zone, bucket, phandler, y);
3740 });
3741 if (r < 0) {
3742 ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl;
3743 return r;
3744 }
3745 return 0;
3746 }
3747
3748 int RGWBucketCtl::bucket_exports_data(const rgw_bucket& bucket,
3749 optional_yield y)
3750 {
3751
3752 RGWBucketSyncPolicyHandlerRef handler;
3753
3754 int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y);
3755 if (r < 0) {
3756 return r;
3757 }
3758
3759 return handler->bucket_exports_data();
3760 }
3761
3762 int RGWBucketCtl::bucket_imports_data(const rgw_bucket& bucket,
3763 optional_yield y)
3764 {
3765
3766 RGWBucketSyncPolicyHandlerRef handler;
3767
3768 int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y);
3769 if (r < 0) {
3770 return r;
3771 }
3772
3773 return handler->bucket_imports_data();
3774 }
3775
3776 RGWBucketMetadataHandlerBase *RGWBucketMetaHandlerAllocator::alloc()
3777 {
3778 return new RGWBucketMetadataHandler();
3779 }
3780
3781 RGWBucketInstanceMetadataHandlerBase *RGWBucketInstanceMetaHandlerAllocator::alloc()
3782 {
3783 return new RGWBucketInstanceMetadataHandler();
3784 }
3785
3786 RGWBucketMetadataHandlerBase *RGWArchiveBucketMetaHandlerAllocator::alloc()
3787 {
3788 return new RGWArchiveBucketMetadataHandler();
3789 }
3790
3791 RGWBucketInstanceMetadataHandlerBase *RGWArchiveBucketInstanceMetaHandlerAllocator::alloc()
3792 {
3793 return new RGWArchiveBucketInstanceMetadataHandler();
3794 }
3795