]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_bucket.cc
import ceph octopus 15.2.17
[ceph.git] / ceph / src / rgw / rgw_bucket.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5
6 #include <string>
7 #include <map>
8 #include <sstream>
9
10 #include <boost/utility/string_ref.hpp>
11 #include <boost/format.hpp>
12
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
15 #include "include/scope_guard.h"
16
17 #include "rgw_rados.h"
18 #include "rgw_zone.h"
19 #include "rgw_acl.h"
20 #include "rgw_acl_s3.h"
21 #include "rgw_tag_s3.h"
22
23 #include "include/types.h"
24 #include "rgw_bucket.h"
25 #include "rgw_user.h"
26 #include "rgw_string.h"
27 #include "rgw_multi.h"
28 #include "rgw_op.h"
29 #include "rgw_bucket_sync.h"
30
31 #include "services/svc_zone.h"
32 #include "services/svc_sys_obj.h"
33 #include "services/svc_bucket.h"
34 #include "services/svc_bucket_sync.h"
35 #include "services/svc_meta.h"
36 #include "services/svc_meta_be_sobj.h"
37 #include "services/svc_user.h"
38 #include "services/svc_cls.h"
39 #include "services/svc_bilog_rados.h"
40 #include "services/svc_datalog_rados.h"
41
42 #include "include/rados/librados.hpp"
43 // until everything is moved from rgw_common
44 #include "rgw_common.h"
45 #include "rgw_reshard.h"
46 #include "rgw_lc.h"
47
48 // stolen from src/cls/version/cls_version.cc
49 #define VERSION_ATTR "ceph.objclass.version"
50
51 #include "cls/user/cls_user_types.h"
52
53 #include "rgw_sal.h"
54
55 #define dout_context g_ceph_context
56 #define dout_subsys ceph_subsys_rgw
57
// value handed to cls_obj_set_bucket_tag_timeout() while an object index
// check is running (reset to 0 afterwards); presumably seconds -- TODO confirm
58 #define BUCKET_TAG_TIMEOUT 30
59
60 // default number of entries to list with each bucket listing call
61 // (use marker to bridge between calls)
// also used as the batch threshold when unlinking stale multipart index entries
62 static constexpr size_t listing_max_entries = 1000;
63
64
/*
 * Split a bucket specifier into tenant, bucket name and bucket instance.
 *
 * Accepted forms:
 *   [tenant/]bucket[:bucket_instance]
 *   tenant:bucket:bucket_instance      (only considered when no "tenant/"
 *                                       prefix produced a non-empty tenant)
 *
 * The tenant_name is always returned on purpose. May be empty, of course.
 *
 * bucket_instance is optional: it may be nullptr, and it is only written
 * when the input actually carries an instance part. The instance is parsed
 * into a local string first so that the tenant:bucket:instance form is
 * resolved correctly even when the caller did not ask for the instance.
 */
static void parse_bucket(const std::string& bucket,
                         std::string *tenant_name,
                         std::string *bucket_name,
                         std::string *bucket_instance = nullptr /* optional */)
{
  const auto slash = bucket.find('/');
  if (slash != std::string::npos) {
    *tenant_name = bucket.substr(0, slash);
  } else {
    tenant_name->clear();
  }

  // everything after the optional "tenant/" prefix
  std::string rest =
    (slash == std::string::npos) ? bucket : bucket.substr(slash + 1);

  const auto colon = rest.find(':');
  if (colon == std::string::npos) {
    // no instance part at all; *bucket_instance is left untouched
    *bucket_name = std::move(rest);
    return;
  }

  *bucket_name = rest.substr(0, colon);
  std::string instance = rest.substr(colon + 1);

  /*
   * deal with the possible tenant:bucket:bucket_instance case
   */
  if (tenant_name->empty()) {
    const auto second = instance.find(':');
    if (second != std::string::npos) {
      *tenant_name = *bucket_name;
      *bucket_name = instance.substr(0, second);
      instance = instance.substr(second + 1);
    }
  }

  if (bucket_instance) {
    *bucket_instance = std::move(instance);
  }
}
105
/*
 * Build the entry name used for a bucket in RADOS pools. This is NOT the
 * inverse of parse_bucket(): that one handles the metadata syntax, this one
 * the pool representation. '/' is the separator because it is not allowed
 * in bucket names, so tenant-qualified buckets cannot clash with legacy or
 * S3 buckets.
 *
 * Returns "" for an empty bucket name, the bare bucket name when there is no
 * tenant, and "tenant/bucket" otherwise.
 */
std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
                                       const std::string& bucket_name) {
  if (bucket_name.empty()) {
    return std::string();
  }
  if (tenant_name.empty()) {
    return bucket_name;
  }
  return tenant_name + "/" + bucket_name;
}
127
/*
 * Split an S3 URL bucket of the form [tenant:]bucket.
 *
 * Tenants are separated from buckets in URLs by a colon in S3.
 * This function is not to be used on Swift URLs, not even for COPY arguments.
 *
 * Without a colon the tenant falls back to auth_tenant. With a colon the
 * part before the first colon is the tenant, even when it is empty.
 */
void rgw_parse_url_bucket(const std::string &bucket, const std::string& auth_tenant,
                          std::string &tenant_name, std::string &bucket_name) {
  const auto colon = bucket.find(':');
  if (colon == std::string::npos) {
    tenant_name = auth_tenant;
    bucket_name = bucket;
    return;
  }

  /*
   * N.B.: We allow ":bucket" syntax with explicit empty tenant in order
   * to refer to the legacy tenant, in case users in new named tenants
   * want to access old global buckets.
   */
  tenant_name = bucket.substr(0, colon);
  bucket_name = bucket.substr(colon + 1);
}
149
150 /**
151 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
152 * Returns: 0 on success, -ERR# on failure.
153 */
154 int rgw_read_user_buckets(rgw::sal::RGWRadosStore * store,
155 const rgw_user& user_id,
156 rgw::sal::RGWBucketList& buckets,
157 const string& marker,
158 const string& end_marker,
159 uint64_t max,
160 bool need_stats)
161 {
162 rgw::sal::RGWRadosUser user(store, user_id);
163 return user.list_buckets(marker, end_marker, max, need_stats, buckets);
164 }
165
166 int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *bucket_name, string *bucket_id, int *shard_id)
167 {
168 auto pos = bucket_instance.rfind(':');
169 if (pos == string::npos) {
170 return -EINVAL;
171 }
172
173 string first = bucket_instance.substr(0, pos);
174 string second = bucket_instance.substr(pos + 1);
175
176 pos = first.find(':');
177
178 if (pos == string::npos) {
179 *shard_id = -1;
180 *bucket_name = first;
181 *bucket_id = second;
182 return 0;
183 }
184
185 *bucket_name = first.substr(0, pos);
186 *bucket_id = first.substr(pos + 1);
187
188 string err;
189 *shard_id = strict_strtol(second.c_str(), 10, &err);
190 if (!err.empty()) {
191 return -EINVAL;
192 }
193
194 return 0;
195 }
196
197 // parse key in format: [tenant/]name:instance[:shard_id]
198 int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
199 rgw_bucket *bucket, int *shard_id)
200 {
201 boost::string_ref name{key};
202 boost::string_ref instance;
203
204 // split tenant/name
205 auto pos = name.find('/');
206 if (pos != string::npos) {
207 auto tenant = name.substr(0, pos);
208 bucket->tenant.assign(tenant.begin(), tenant.end());
209 name = name.substr(pos + 1);
210 } else {
211 bucket->tenant.clear();
212 }
213
214 // split name:instance
215 pos = name.find(':');
216 if (pos != string::npos) {
217 instance = name.substr(pos + 1);
218 name = name.substr(0, pos);
219 }
220 bucket->name.assign(name.begin(), name.end());
221
222 // split instance:shard
223 pos = instance.find(':');
224 if (pos == string::npos) {
225 bucket->bucket_id.assign(instance.begin(), instance.end());
226 if (shard_id) {
227 *shard_id = -1;
228 }
229 return 0;
230 }
231
232 // parse shard id
233 auto shard = instance.substr(pos + 1);
234 string err;
235 auto id = strict_strtol(shard.data(), 10, &err);
236 if (!err.empty()) {
237 if (cct) {
238 ldout(cct, 0) << "ERROR: failed to parse bucket shard '"
239 << instance.data() << "': " << err << dendl;
240 }
241 return -EINVAL;
242 }
243
244 if (shard_id) {
245 *shard_id = id;
246 }
247 instance = instance.substr(0, pos);
248 bucket->bucket_id.assign(instance.begin(), instance.end());
249 return 0;
250 }
251
252 static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
253 Formatter *f)
254 {
255 for (const auto& o : objs_to_unlink) {
256 f->dump_string("object", o.name);
257 }
258 }
259
260 void check_bad_user_bucket_mapping(rgw::sal::RGWRadosStore *store, const rgw_user& user_id,
261 bool fix)
262 {
263 rgw::sal::RGWBucketList user_buckets;
264 rgw::sal::RGWRadosUser user(store, user_id);
265 string marker;
266
267 CephContext *cct = store->ctx();
268
269 size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
270
271 do {
272 int ret = user.list_buckets(marker, string(), max_entries, false, user_buckets);
273 if (ret < 0) {
274 ldout(store->ctx(), 0) << "failed to read user buckets: "
275 << cpp_strerror(-ret) << dendl;
276 return;
277 }
278
279 map<string, rgw::sal::RGWBucket*>& buckets = user_buckets.get_buckets();
280 for (map<string, rgw::sal::RGWBucket*>::iterator i = buckets.begin();
281 i != buckets.end();
282 ++i) {
283 marker = i->first;
284
285 rgw::sal::RGWBucket* bucket = i->second;
286
287 RGWBucketInfo bucket_info;
288 real_time mtime;
289 int r = store->getRados()->get_bucket_info(store->svc(), user_id.tenant, bucket->get_name(), bucket_info, &mtime, null_yield);
290 if (r < 0) {
291 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
292 continue;
293 }
294
295 rgw_bucket& actual_bucket = bucket_info.bucket;
296
297 if (actual_bucket.name.compare(bucket->get_name()) != 0 ||
298 actual_bucket.tenant.compare(bucket->get_tenant()) != 0 ||
299 actual_bucket.marker.compare(bucket->get_marker()) != 0 ||
300 actual_bucket.bucket_id.compare(bucket->get_bucket_id()) != 0) {
301 cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
302 if (fix) {
303 cout << "fixing" << std::endl;
304 r = store->ctl()->bucket->link_bucket(user_id, actual_bucket,
305 bucket_info.creation_time,
306 null_yield);
307 if (r < 0) {
308 cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
309 }
310 }
311 }
312 }
313 } while (user_buckets.is_truncated());
314 }
315
316 // note: function type conforms to RGWRados::check_filter_t
317 bool rgw_bucket_object_check_filter(const string& oid)
318 {
319 rgw_obj_key key;
320 string ns;
321 return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
322 }
323
324 int rgw_remove_object(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info, const rgw_bucket& bucket, rgw_obj_key& key)
325 {
326 RGWObjectCtx rctx(store);
327
328 if (key.instance.empty()) {
329 key.instance = "null";
330 }
331
332 rgw_obj obj(bucket, key);
333
334 return store->getRados()->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
335 }
336
337 /* xxx dang */
338 static int rgw_remove_bucket(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket, bool delete_children, optional_yield y)
339 {
340 int ret;
341 map<RGWObjCategory, RGWStorageStats> stats;
342 std::vector<rgw_bucket_dir_entry> objs;
343 map<string, bool> common_prefixes;
344 RGWBucketInfo info;
345
346 string bucket_ver, master_ver;
347
348 ret = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, info, NULL, null_yield);
349 if (ret < 0)
350 return ret;
351
352 ret = store->getRados()->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
353 if (ret < 0)
354 return ret;
355
356 RGWRados::Bucket target(store->getRados(), info);
357 RGWRados::Bucket::List list_op(&target);
358 CephContext *cct = store->ctx();
359
360 list_op.params.list_versions = true;
361 list_op.params.allow_unordered = true;
362
363 bool is_truncated = false;
364 do {
365 objs.clear();
366
367 ret = list_op.list_objects(listing_max_entries, &objs, &common_prefixes,
368 &is_truncated, null_yield);
369 if (ret < 0)
370 return ret;
371
372 if (!objs.empty() && !delete_children) {
373 lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl;
374 return -ENOTEMPTY;
375 }
376
377 for (const auto& obj : objs) {
378 rgw_obj_key key(obj.key);
379 ret = rgw_remove_object(store, info, bucket, key);
380 if (ret < 0 && ret != -ENOENT) {
381 return ret;
382 }
383 }
384 } while(is_truncated);
385
386 string prefix, delimiter;
387
388 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
389 if (ret < 0) {
390 return ret;
391 }
392
393 ret = store->ctl()->bucket->sync_user_stats(info.owner, info);
394 if ( ret < 0) {
395 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
396 }
397
398 RGWObjVersionTracker objv_tracker;
399
400 // if we deleted children above we will force delete, as any that
401 // remain is detrius from a prior bug
402 ret = store->getRados()->delete_bucket(info, objv_tracker, null_yield, !delete_children);
403 if (ret < 0) {
404 lderr(store->ctx()) << "ERROR: could not remove bucket " <<
405 bucket.name << dendl;
406 return ret;
407 }
408
409 ret = store->ctl()->bucket->unlink_bucket(info.owner, bucket, null_yield, false);
410 if (ret < 0) {
411 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
412 }
413
414 return ret;
415 }
416
417 static int aio_wait(librados::AioCompletion *handle)
418 {
419 librados::AioCompletion *c = (librados::AioCompletion *)handle;
420 c->wait_for_complete();
421 int ret = c->get_return_value();
422 c->release();
423 return ret;
424 }
425
426 static int drain_handles(list<librados::AioCompletion *>& pending)
427 {
428 int ret = 0;
429 while (!pending.empty()) {
430 librados::AioCompletion *handle = pending.front();
431 pending.pop_front();
432 int r = aio_wait(handle);
433 if (r < 0) {
434 ret = r;
435 }
436 }
437 return ret;
438 }
439
440 int rgw_remove_bucket_bypass_gc(rgw::sal::RGWRadosStore *store, rgw_bucket& bucket,
441 int concurrent_max, bool keep_index_consistent,
442 optional_yield y)
443 {
444 int ret;
445 map<RGWObjCategory, RGWStorageStats> stats;
446 std::vector<rgw_bucket_dir_entry> objs;
447 map<string, bool> common_prefixes;
448 RGWBucketInfo info;
449 RGWObjectCtx obj_ctx(store);
450 CephContext *cct = store->ctx();
451
452 string bucket_ver, master_ver;
453
454 ret = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, info, NULL, null_yield);
455 if (ret < 0)
456 return ret;
457
458 ret = store->getRados()->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
459 if (ret < 0)
460 return ret;
461
462 string prefix, delimiter;
463
464 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
465 if (ret < 0) {
466 return ret;
467 }
468
469 RGWRados::Bucket target(store->getRados(), info);
470 RGWRados::Bucket::List list_op(&target);
471
472 list_op.params.list_versions = true;
473 list_op.params.allow_unordered = true;
474
475 std::list<librados::AioCompletion*> handles;
476
477 int max_aio = concurrent_max;
478 bool is_truncated = true;
479
480 while (is_truncated) {
481 objs.clear();
482 ret = list_op.list_objects(listing_max_entries, &objs, &common_prefixes,
483 &is_truncated, null_yield);
484 if (ret < 0)
485 return ret;
486
487 std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
488 for (; it != objs.end(); ++it) {
489 RGWObjState *astate = NULL;
490 rgw_obj obj(bucket, (*it).key);
491
492 ret = store->getRados()->get_obj_state(&obj_ctx, info, obj, &astate, false, y);
493 if (ret == -ENOENT) {
494 dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
495 continue;
496 }
497 if (ret < 0) {
498 lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl;
499 return ret;
500 }
501
502 if (astate->manifest) {
503 RGWObjManifest& manifest = *astate->manifest;
504 RGWObjManifest::obj_iterator miter = manifest.obj_begin();
505 rgw_obj head_obj = manifest.get_obj();
506 rgw_raw_obj raw_head_obj;
507 store->getRados()->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
508
509
510 for (; miter != manifest.obj_end() && max_aio--; ++miter) {
511 if (!max_aio) {
512 ret = drain_handles(handles);
513 if (ret < 0) {
514 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
515 return ret;
516 }
517 max_aio = concurrent_max;
518 }
519
520 rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store->getRados());
521 if (last_obj == raw_head_obj) {
522 // have the head obj deleted at the end
523 continue;
524 }
525
526 ret = store->getRados()->delete_raw_obj_aio(last_obj, handles);
527 if (ret < 0) {
528 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
529 return ret;
530 }
531 } // for all shadow objs
532
533 ret = store->getRados()->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent, null_yield);
534 if (ret < 0) {
535 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
536 return ret;
537 }
538 }
539
540 if (!max_aio) {
541 ret = drain_handles(handles);
542 if (ret < 0) {
543 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
544 return ret;
545 }
546 max_aio = concurrent_max;
547 }
548 obj_ctx.invalidate(obj);
549 } // for all RGW objects
550 }
551
552 ret = drain_handles(handles);
553 if (ret < 0) {
554 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
555 return ret;
556 }
557
558 ret = store->ctl()->bucket->sync_user_stats(info.owner, info);
559 if (ret < 0) {
560 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
561 }
562
563 RGWObjVersionTracker objv_tracker;
564
565 // this function can only be run if caller wanted children to be
566 // deleted, so we can ignore the check for children as any that
567 // remain are detritus from a prior bug
568 ret = store->getRados()->delete_bucket(info, objv_tracker, y, false);
569 if (ret < 0) {
570 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl;
571 return ret;
572 }
573
574 ret = store->ctl()->bucket->unlink_bucket(info.owner, bucket, null_yield, false);
575 if (ret < 0) {
576 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
577 }
578
579 return ret;
580 }
581
// Store msg in *sink. Does nothing when sink is null or msg is empty, so an
// existing message is never overwritten by an empty one.
static void set_err_msg(std::string *sink, std::string msg)
{
  if (!sink || msg.empty()) {
    return;
  }
  *sink = msg;
}
587
588 int RGWBucket::init(rgw::sal::RGWRadosStore *storage, RGWBucketAdminOpState& op_state,
589 optional_yield y, std::string *err_msg,
590 map<string, bufferlist> *pattrs)
591 {
592 if (!storage) {
593 set_err_msg(err_msg, "no storage!");
594 return -EINVAL;
595 }
596
597 store = storage;
598
599 rgw_user user_id = op_state.get_user_id();
600 bucket.tenant = user_id.tenant;
601 bucket.name = op_state.get_bucket_name();
602
603 if (bucket.name.empty() && user_id.empty())
604 return -EINVAL;
605
606 // split possible tenant/name
607 auto pos = bucket.name.find('/');
608 if (pos != string::npos) {
609 bucket.tenant = bucket.name.substr(0, pos);
610 bucket.name = bucket.name.substr(pos + 1);
611 }
612
613 if (!bucket.name.empty()) {
614 int r = store->ctl()->bucket->read_bucket_info(
615 bucket, &bucket_info, y,
616 RGWBucketCtl::BucketInstance::GetParams().set_attrs(pattrs),
617 &ep_objv);
618 if (r < 0) {
619 set_err_msg(err_msg, "failed to fetch bucket info for bucket=" + bucket.name);
620 return r;
621 }
622
623 op_state.set_bucket(bucket_info.bucket);
624 }
625
626 if (!user_id.empty()) {
627 int r = store->ctl()->user->get_info_by_uid(user_id, &user_info, y);
628 if (r < 0) {
629 set_err_msg(err_msg, "failed to fetch user info");
630 return r;
631 }
632
633 op_state.display_name = user_info.display_name;
634 }
635
636 clear_failure();
637 return 0;
638 }
639
640 bool rgw_find_bucket_by_id(CephContext *cct, RGWMetadataManager *mgr,
641 const string& marker, const string& bucket_id, rgw_bucket* bucket_out)
642 {
643 void *handle = NULL;
644 bool truncated = false;
645 string s;
646
647 int ret = mgr->list_keys_init("bucket.instance", marker, &handle);
648 if (ret < 0) {
649 cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
650 mgr->list_keys_complete(handle);
651 return -ret;
652 }
653 do {
654 list<string> keys;
655 ret = mgr->list_keys_next(handle, 1000, keys, &truncated);
656 if (ret < 0) {
657 cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
658 mgr->list_keys_complete(handle);
659 return -ret;
660 }
661 for (list<string>::iterator iter = keys.begin(); iter != keys.end(); ++iter) {
662 s = *iter;
663 ret = rgw_bucket_parse_bucket_key(cct, s, bucket_out, nullptr);
664 if (ret < 0) {
665 continue;
666 }
667 if (bucket_id == bucket_out->bucket_id) {
668 mgr->list_keys_complete(handle);
669 return true;
670 }
671 }
672 } while (truncated);
673 mgr->list_keys_complete(handle);
674 return false;
675 }
676
677 int RGWBucket::link(RGWBucketAdminOpState& op_state, optional_yield y,
678 map<string, bufferlist>& attrs, std::string *err_msg)
679 {
680 if (!op_state.is_user_op()) {
681 set_err_msg(err_msg, "empty user id");
682 return -EINVAL;
683 }
684
685 string bucket_id = op_state.get_bucket_id();
686
687 std::string display_name = op_state.get_user_display_name();
688 rgw_bucket& bucket = op_state.get_bucket();
689 if (!bucket_id.empty() && bucket_id != bucket.bucket_id) {
690 set_err_msg(err_msg,
691 "specified bucket id does not match " + bucket.bucket_id);
692 return -EINVAL;
693 }
694 rgw_bucket old_bucket = bucket;
695 rgw_user user_id = op_state.get_user_id();
696 bucket.tenant = user_id.tenant;
697 if (!op_state.new_bucket_name.empty()) {
698 auto pos = op_state.new_bucket_name.find('/');
699 if (pos != string::npos) {
700 bucket.tenant = op_state.new_bucket_name.substr(0, pos);
701 bucket.name = op_state.new_bucket_name.substr(pos + 1);
702 } else {
703 bucket.name = op_state.new_bucket_name;
704 }
705 }
706
707 RGWObjVersionTracker objv_tracker;
708 RGWObjVersionTracker old_version = bucket_info.objv_tracker;
709
710 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
711 if (aiter == attrs.end()) {
712 // should never happen; only pre-argonaut buckets lacked this.
713 ldout(store->ctx(), 0) << "WARNING: can't bucket link because no acl on bucket=" << old_bucket.name << dendl;
714 set_err_msg(err_msg,
715 "While crossing the Anavros you have displeased the goddess Hera."
716 " You must sacrifice your ancient bucket " + bucket.bucket_id);
717 return -EINVAL;
718 }
719 bufferlist& aclbl = aiter->second;
720 RGWAccessControlPolicy policy;
721 ACLOwner owner;
722 try {
723 auto iter = aclbl.cbegin();
724 decode(policy, iter);
725 owner = policy.get_owner();
726 } catch (buffer::error& err) {
727 set_err_msg(err_msg, "couldn't decode policy");
728 return -EIO;
729 }
730
731 auto bucket_ctl = store->ctl()->bucket;
732 int r = bucket_ctl->unlink_bucket(owner.get_id(), old_bucket, y, false);
733 if (r < 0) {
734 set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
735 return r;
736 }
737
738 // now update the user for the bucket...
739 if (display_name.empty()) {
740 ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
741 }
742
743 RGWAccessControlPolicy policy_instance;
744 policy_instance.create_default(user_info.user_id, display_name);
745 owner = policy_instance.get_owner();
746
747 aclbl.clear();
748 policy_instance.encode(aclbl);
749
750 auto instance_params = RGWBucketCtl::BucketInstance::PutParams().set_attrs(&attrs);
751
752 bucket_info.owner = user_info.user_id;
753 if (bucket != old_bucket) {
754 bucket_info.bucket = bucket;
755 bucket_info.objv_tracker.version_for_read()->ver = 0;
756 instance_params.set_exclusive(true);
757 }
758
759 r = bucket_ctl->store_bucket_instance_info(bucket, bucket_info, y, instance_params);
760 if (r < 0) {
761 set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
762 return r;
763 }
764
765 RGWBucketEntryPoint ep;
766 ep.bucket = bucket_info.bucket;
767 ep.owner = user_info.user_id;
768 ep.creation_time = bucket_info.creation_time;
769 ep.linked = true;
770 map<string, bufferlist> ep_attrs;
771 rgw_ep_info ep_data{ep, ep_attrs};
772
773 /* link to user */
774 r = store->ctl()->bucket->link_bucket(user_info.user_id,
775 bucket_info.bucket,
776 ep.creation_time,
777 y, true, &ep_data);
778 if (r < 0) {
779 set_err_msg(err_msg, "failed to relink bucket");
780 return r;
781 }
782
783 if (bucket != old_bucket) {
784 // like RGWRados::delete_bucket -- excepting no bucket_index work.
785 r = bucket_ctl->remove_bucket_entrypoint_info(old_bucket, y,
786 RGWBucketCtl::Bucket::RemoveParams()
787 .set_objv_tracker(&ep_data.ep_objv));
788 if (r < 0) {
789 set_err_msg(err_msg, "failed to unlink old bucket endpoint " + old_bucket.tenant + "/" + old_bucket.name);
790 return r;
791 }
792
793 r = bucket_ctl->remove_bucket_instance_info(old_bucket, bucket_info, y,
794 RGWBucketCtl::BucketInstance::RemoveParams()
795 .set_objv_tracker(&old_version));
796 if (r < 0) {
797 set_err_msg(err_msg, "failed to unlink old bucket info");
798 return r;
799 }
800 }
801
802 return 0;
803 }
804
805 int RGWBucket::chown(RGWBucketAdminOpState& op_state, const string& marker,
806 optional_yield y, std::string *err_msg)
807 {
808 int ret = store->ctl()->bucket->chown(store, bucket_info, user_info.user_id,
809 user_info.display_name, marker, y);
810 if (ret < 0) {
811 set_err_msg(err_msg, "Failed to change object ownership: " + cpp_strerror(-ret));
812 }
813
814 return ret;
815 }
816
817 int RGWBucket::unlink(RGWBucketAdminOpState& op_state, optional_yield y, std::string *err_msg)
818 {
819 rgw_bucket bucket = op_state.get_bucket();
820
821 if (!op_state.is_user_op()) {
822 set_err_msg(err_msg, "could not fetch user or user bucket info");
823 return -EINVAL;
824 }
825
826 int r = store->ctl()->bucket->unlink_bucket(user_info.user_id, bucket, y);
827 if (r < 0) {
828 set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
829 }
830
831 return r;
832 }
833
834 int RGWBucket::set_quota(RGWBucketAdminOpState& op_state, std::string *err_msg)
835 {
836 rgw_bucket bucket = op_state.get_bucket();
837 RGWBucketInfo bucket_info;
838 map<string, bufferlist> attrs;
839 int r = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, bucket_info, NULL, null_yield, &attrs);
840 if (r < 0) {
841 set_err_msg(err_msg, "could not get bucket info for bucket=" + bucket.name + ": " + cpp_strerror(-r));
842 return r;
843 }
844
845 bucket_info.quota = op_state.quota;
846 r = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), &attrs);
847 if (r < 0) {
848 set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
849 return r;
850 }
851 return r;
852 }
853
854 int RGWBucket::remove(RGWBucketAdminOpState& op_state, optional_yield y, bool bypass_gc,
855 bool keep_index_consistent, std::string *err_msg)
856 {
857 bool delete_children = op_state.will_delete_children();
858 rgw_bucket bucket = op_state.get_bucket();
859 int ret;
860
861 if (bypass_gc) {
862 if (delete_children) {
863 ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent, y);
864 } else {
865 set_err_msg(err_msg, "purge objects should be set for gc to be bypassed");
866 return -EINVAL;
867 }
868 } else {
869 ret = rgw_remove_bucket(store, bucket, delete_children, y);
870 }
871
872 if (ret < 0) {
873 set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret));
874 return ret;
875 }
876
877 return 0;
878 }
879
880 int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg)
881 {
882 rgw_bucket bucket = op_state.get_bucket();
883 std::string object_name = op_state.get_object_name();
884
885 rgw_obj_key key(object_name);
886
887 int ret = rgw_remove_object(store, bucket_info, bucket, key);
888 if (ret < 0) {
889 set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
890 return ret;
891 }
892
893 return 0;
894 }
895
896 static void dump_bucket_index(const RGWRados::ent_map_t& result, Formatter *f)
897 {
898 for (auto iter = result.begin(); iter != result.end(); ++iter) {
899 f->dump_string("object", iter->first);
900 }
901 }
902
903 static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Formatter *formatter)
904 {
905 map<RGWObjCategory, RGWStorageStats>::iterator iter;
906
907 formatter->open_object_section("usage");
908 for (iter = stats.begin(); iter != stats.end(); ++iter) {
909 RGWStorageStats& s = iter->second;
910 const char *cat_name = rgw_obj_category_name(iter->first);
911 formatter->open_object_section(cat_name);
912 s.dump(formatter);
913 formatter->close_section();
914 }
915 formatter->close_section();
916 }
917
918 static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats,
919 map<RGWObjCategory, RGWStorageStats> calculated_stats,
920 Formatter *formatter)
921 {
922 formatter->open_object_section("check_result");
923 formatter->open_object_section("existing_header");
924 dump_bucket_usage(existing_stats, formatter);
925 formatter->close_section();
926 formatter->open_object_section("calculated_header");
927 dump_bucket_usage(calculated_stats, formatter);
928 formatter->close_section();
929 formatter->close_section();
930 }
931
932 int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
933 RGWFormatterFlusher& flusher ,std::string *err_msg)
934 {
935 bool fix_index = op_state.will_fix_index();
936 rgw_bucket bucket = op_state.get_bucket();
937
938 map<string, bool> common_prefixes;
939
940 bool is_truncated;
941 map<string, bool> meta_objs;
942 map<rgw_obj_index_key, string> all_objs;
943
944 RGWBucketInfo bucket_info;
945 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
946 int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr, null_yield);
947 if (r < 0) {
948 ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
949 return r;
950 }
951
952 RGWRados::Bucket target(store->getRados(), bucket_info);
953 RGWRados::Bucket::List list_op(&target);
954
955 list_op.params.list_versions = true;
956 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
957
958 do {
959 vector<rgw_bucket_dir_entry> result;
960 int r = list_op.list_objects(listing_max_entries, &result,
961 &common_prefixes, &is_truncated, null_yield);
962 if (r < 0) {
963 set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
964 " err=" + cpp_strerror(-r));
965
966 return r;
967 }
968
969 vector<rgw_bucket_dir_entry>::iterator iter;
970 for (iter = result.begin(); iter != result.end(); ++iter) {
971 rgw_obj_index_key key = iter->key;
972 rgw_obj obj(bucket, key);
973 string oid = obj.get_oid();
974
975 int pos = oid.find_last_of('.');
976 if (pos < 0) {
977 /* obj has no suffix */
978 all_objs[key] = oid;
979 } else {
980 /* obj has suffix */
981 string name = oid.substr(0, pos);
982 string suffix = oid.substr(pos + 1);
983
984 if (suffix.compare("meta") == 0) {
985 meta_objs[name] = true;
986 } else {
987 all_objs[key] = name;
988 }
989 }
990 }
991 } while (is_truncated);
992
993 list<rgw_obj_index_key> objs_to_unlink;
994 Formatter *f = flusher.get_formatter();
995
996 f->open_array_section("invalid_multipart_entries");
997
998 for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) {
999 string& name = aiter->second;
1000
1001 if (meta_objs.find(name) == meta_objs.end()) {
1002 objs_to_unlink.push_back(aiter->first);
1003 }
1004
1005 if (objs_to_unlink.size() > listing_max_entries) {
1006 if (fix_index) {
1007 int r = store->getRados()->remove_objs_from_index(bucket_info, objs_to_unlink);
1008 if (r < 0) {
1009 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1010 cpp_strerror(-r));
1011 return r;
1012 }
1013 }
1014
1015 dump_mulipart_index_results(objs_to_unlink, flusher.get_formatter());
1016 flusher.flush();
1017 objs_to_unlink.clear();
1018 }
1019 }
1020
1021 if (fix_index) {
1022 int r = store->getRados()->remove_objs_from_index(bucket_info, objs_to_unlink);
1023 if (r < 0) {
1024 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1025 cpp_strerror(-r));
1026
1027 return r;
1028 }
1029 }
1030
1031 dump_mulipart_index_results(objs_to_unlink, f);
1032 f->close_section();
1033 flusher.flush();
1034
1035 return 0;
1036 }
1037
1038 int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
1039 RGWFormatterFlusher& flusher,
1040 optional_yield y,
1041 std::string *err_msg)
1042 {
1043
1044 bool fix_index = op_state.will_fix_index();
1045
1046 if (!fix_index) {
1047 set_err_msg(err_msg, "check-objects flag requires fix index enabled");
1048 return -EINVAL;
1049 }
1050
1051 store->getRados()->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT);
1052
1053 string prefix;
1054 string empty_delimiter;
1055 rgw_obj_index_key marker;
1056 bool is_truncated = true;
1057 bool cls_filtered = true;
1058
1059 Formatter *formatter = flusher.get_formatter();
1060 formatter->open_object_section("objects");
1061 uint16_t expansion_factor = 1;
1062 while (is_truncated) {
1063 RGWRados::ent_map_t result;
1064 result.reserve(listing_max_entries);
1065
1066 int r = store->getRados()->cls_bucket_list_ordered(
1067 bucket_info, RGW_NO_SHARD, marker, prefix, empty_delimiter,
1068 listing_max_entries, true, expansion_factor,
1069 result, &is_truncated, &cls_filtered, &marker,
1070 y, rgw_bucket_object_check_filter);
1071 if (r == -ENOENT) {
1072 break;
1073 } else if (r < 0 && r != -ENOENT) {
1074 set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
1075 }
1076
1077 if (result.size() < listing_max_entries / 8) {
1078 ++expansion_factor;
1079 } else if (result.size() > listing_max_entries * 7 / 8 &&
1080 expansion_factor > 1) {
1081 --expansion_factor;
1082 }
1083
1084 dump_bucket_index(result, formatter);
1085 flusher.flush();
1086 }
1087
1088 formatter->close_section();
1089
1090 store->getRados()->cls_obj_set_bucket_tag_timeout(bucket_info, 0);
1091
1092 return 0;
1093 }
1094
1095
1096 int RGWBucket::check_index(RGWBucketAdminOpState& op_state,
1097 map<RGWObjCategory, RGWStorageStats>& existing_stats,
1098 map<RGWObjCategory, RGWStorageStats>& calculated_stats,
1099 std::string *err_msg)
1100 {
1101 bool fix_index = op_state.will_fix_index();
1102
1103 int r = store->getRados()->bucket_check_index(bucket_info, &existing_stats, &calculated_stats);
1104 if (r < 0) {
1105 set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
1106 return r;
1107 }
1108
1109 if (fix_index) {
1110 r = store->getRados()->bucket_rebuild_index(bucket_info);
1111 if (r < 0) {
1112 set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
1113 return r;
1114 }
1115 }
1116
1117 return 0;
1118 }
1119
1120 int RGWBucket::sync(RGWBucketAdminOpState& op_state, map<string, bufferlist> *attrs, std::string *err_msg)
1121 {
1122 if (!store->svc()->zone->is_meta_master()) {
1123 set_err_msg(err_msg, "ERROR: failed to update bucket sync: only allowed on meta master zone");
1124 return EINVAL;
1125 }
1126 bool sync = op_state.will_sync_bucket();
1127 if (sync) {
1128 bucket_info.flags &= ~BUCKET_DATASYNC_DISABLED;
1129 } else {
1130 bucket_info.flags |= BUCKET_DATASYNC_DISABLED;
1131 }
1132
1133 int r = store->getRados()->put_bucket_instance_info(bucket_info, false, real_time(), attrs);
1134 if (r < 0) {
1135 set_err_msg(err_msg, "ERROR: failed writing bucket instance info:" + cpp_strerror(-r));
1136 return r;
1137 }
1138
1139 int shards_num = bucket_info.num_shards? bucket_info.num_shards : 1;
1140 int shard_id = bucket_info.num_shards? 0 : -1;
1141
1142 if (!sync) {
1143 r = store->svc()->bilog_rados->log_stop(bucket_info, -1);
1144 if (r < 0) {
1145 set_err_msg(err_msg, "ERROR: failed writing stop bilog:" + cpp_strerror(-r));
1146 return r;
1147 }
1148 } else {
1149 r = store->svc()->bilog_rados->log_start(bucket_info, -1);
1150 if (r < 0) {
1151 set_err_msg(err_msg, "ERROR: failed writing resync bilog:" + cpp_strerror(-r));
1152 return r;
1153 }
1154 }
1155
1156 for (int i = 0; i < shards_num; ++i, ++shard_id) {
1157 r = store->svc()->datalog_rados->add_entry(bucket_info, shard_id);
1158 if (r < 0) {
1159 set_err_msg(err_msg, "ERROR: failed writing data log:" + cpp_strerror(-r));
1160 return r;
1161 }
1162 }
1163
1164 return 0;
1165 }
1166
1167
1168 int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
1169 {
1170 RGWAccessControlPolicy_S3 policy(g_ceph_context);
1171 int ret = decode_bl(bl, policy);
1172 if (ret < 0) {
1173 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
1174 }
1175 policy.to_xml(o);
1176 return 0;
1177 }
1178
1179 int rgw_object_get_attr(rgw::sal::RGWRadosStore* store, const RGWBucketInfo& bucket_info,
1180 const rgw_obj& obj, const char* attr_name,
1181 bufferlist& out_bl, optional_yield y)
1182 {
1183 RGWObjectCtx obj_ctx(store);
1184 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, obj);
1185 RGWRados::Object::Read rop(&op_target);
1186
1187 return rop.get_attr(attr_name, out_bl, y);
1188 }
1189
1190 int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy, optional_yield y)
1191 {
1192 std::string object_name = op_state.get_object_name();
1193 rgw_bucket bucket = op_state.get_bucket();
1194
1195 RGWBucketInfo bucket_info;
1196 map<string, bufferlist> attrs;
1197 int ret = store->getRados()->get_bucket_info(store->svc(), bucket.tenant, bucket.name, bucket_info, NULL, null_yield, &attrs);
1198 if (ret < 0) {
1199 return ret;
1200 }
1201
1202 if (!object_name.empty()) {
1203 bufferlist bl;
1204 rgw_obj obj(bucket, object_name);
1205
1206 ret = rgw_object_get_attr(store, bucket_info, obj, RGW_ATTR_ACL, bl, y);
1207 if (ret < 0){
1208 return ret;
1209 }
1210
1211 ret = decode_bl(bl, policy);
1212 if (ret < 0) {
1213 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
1214 }
1215 return ret;
1216 }
1217
1218 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
1219 if (aiter == attrs.end()) {
1220 return -ENOENT;
1221 }
1222
1223 ret = decode_bl(aiter->second, policy);
1224 if (ret < 0) {
1225 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
1226 }
1227
1228 return ret;
1229 }
1230
1231
1232 int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1233 RGWAccessControlPolicy& policy)
1234 {
1235 RGWBucket bucket;
1236
1237 int ret = bucket.init(store, op_state, null_yield);
1238 if (ret < 0)
1239 return ret;
1240
1241 ret = bucket.get_policy(op_state, policy, null_yield);
1242 if (ret < 0)
1243 return ret;
1244
1245 return 0;
1246 }
1247
1248 /* Wrappers to facilitate RESTful interface */
1249
1250
1251 int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1252 RGWFormatterFlusher& flusher)
1253 {
1254 RGWAccessControlPolicy policy(store->ctx());
1255
1256 int ret = get_policy(store, op_state, policy);
1257 if (ret < 0)
1258 return ret;
1259
1260 Formatter *formatter = flusher.get_formatter();
1261
1262 flusher.start(0);
1263
1264 formatter->open_object_section("policy");
1265 policy.dump(formatter);
1266 formatter->close_section();
1267
1268 flusher.flush();
1269
1270 return 0;
1271 }
1272
1273 int RGWBucketAdminOp::dump_s3_policy(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1274 ostream& os)
1275 {
1276 RGWAccessControlPolicy_S3 policy(store->ctx());
1277
1278 int ret = get_policy(store, op_state, policy);
1279 if (ret < 0)
1280 return ret;
1281
1282 policy.to_xml(os);
1283
1284 return 0;
1285 }
1286
1287 int RGWBucketAdminOp::unlink(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state)
1288 {
1289 RGWBucket bucket;
1290
1291 int ret = bucket.init(store, op_state, null_yield);
1292 if (ret < 0)
1293 return ret;
1294
1295 return bucket.unlink(op_state, null_yield);
1296 }
1297
1298 int RGWBucketAdminOp::link(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, string *err)
1299 {
1300 RGWBucket bucket;
1301 map<string, bufferlist> attrs;
1302
1303 int ret = bucket.init(store, op_state, null_yield, err, &attrs);
1304 if (ret < 0)
1305 return ret;
1306
1307 return bucket.link(op_state, null_yield, attrs, err);
1308
1309 }
1310
1311 int RGWBucketAdminOp::chown(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, const string& marker, string *err)
1312 {
1313 RGWBucket bucket;
1314 map<string, bufferlist> attrs;
1315
1316 int ret = bucket.init(store, op_state, null_yield, err, &attrs);
1317 if (ret < 0)
1318 return ret;
1319
1320 ret = bucket.link(op_state, null_yield, attrs, err);
1321 if (ret < 0)
1322 return ret;
1323
1324 return bucket.chown(op_state, marker, null_yield, err);
1325
1326 }
1327
1328 int RGWBucketAdminOp::check_index(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1329 RGWFormatterFlusher& flusher, optional_yield y)
1330 {
1331 int ret;
1332 map<RGWObjCategory, RGWStorageStats> existing_stats;
1333 map<RGWObjCategory, RGWStorageStats> calculated_stats;
1334
1335
1336 RGWBucket bucket;
1337
1338 ret = bucket.init(store, op_state, null_yield);
1339 if (ret < 0)
1340 return ret;
1341
1342 Formatter *formatter = flusher.get_formatter();
1343 flusher.start(0);
1344
1345 ret = bucket.check_bad_index_multipart(op_state, flusher);
1346 if (ret < 0)
1347 return ret;
1348
1349 ret = bucket.check_object_index(op_state, flusher, y);
1350 if (ret < 0)
1351 return ret;
1352
1353 ret = bucket.check_index(op_state, existing_stats, calculated_stats);
1354 if (ret < 0)
1355 return ret;
1356
1357 dump_index_check(existing_stats, calculated_stats, formatter);
1358 flusher.flush();
1359
1360 return 0;
1361 }
1362
1363 int RGWBucketAdminOp::remove_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1364 optional_yield y, bool bypass_gc, bool keep_index_consistent)
1365 {
1366 RGWBucket bucket;
1367
1368 int ret = bucket.init(store, op_state, y);
1369 if (ret < 0)
1370 return ret;
1371
1372 std::string err_msg;
1373 ret = bucket.remove(op_state, y, bypass_gc, keep_index_consistent, &err_msg);
1374 if (!err_msg.empty()) {
1375 lderr(store->ctx()) << "ERROR: " << err_msg << dendl;
1376 }
1377 return ret;
1378 }
1379
1380 int RGWBucketAdminOp::remove_object(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state)
1381 {
1382 RGWBucket bucket;
1383
1384 int ret = bucket.init(store, op_state, null_yield);
1385 if (ret < 0)
1386 return ret;
1387
1388 return bucket.remove_object(op_state);
1389 }
1390
1391 int RGWBucketAdminOp::sync_bucket(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state, string *err_msg)
1392 {
1393 RGWBucket bucket;
1394 map<string, bufferlist> attrs;
1395 int ret = bucket.init(store, op_state, null_yield, err_msg, &attrs);
1396 if (ret < 0)
1397 {
1398 return ret;
1399 }
1400 return bucket.sync(op_state, &attrs, err_msg);
1401 }
1402
1403 static int bucket_stats(rgw::sal::RGWRadosStore *store,
1404 const std::string& tenant_name,
1405 const std::string& bucket_name,
1406 Formatter *formatter)
1407 {
1408 RGWBucketInfo bucket_info;
1409 map<RGWObjCategory, RGWStorageStats> stats;
1410 map<string, bufferlist> attrs;
1411
1412 real_time mtime;
1413 int r = store->getRados()->get_bucket_info(store->svc(),
1414 tenant_name, bucket_name, bucket_info,
1415 &mtime, null_yield, &attrs);
1416 if (r < 0) {
1417 return r;
1418 }
1419
1420 rgw_bucket& bucket = bucket_info.bucket;
1421
1422 string bucket_ver, master_ver;
1423 string max_marker;
1424 int ret = store->getRados()->get_bucket_stats(bucket_info, RGW_NO_SHARD,
1425 &bucket_ver, &master_ver, stats,
1426 &max_marker);
1427 if (ret < 0) {
1428 cerr << "error getting bucket stats bucket=" << bucket.name << " ret=" << ret << std::endl;
1429 return ret;
1430 }
1431
1432 utime_t ut(mtime);
1433 utime_t ctime_ut(bucket_info.creation_time);
1434
1435 formatter->open_object_section("stats");
1436 formatter->dump_string("bucket", bucket.name);
1437 formatter->dump_int("num_shards", bucket_info.num_shards);
1438 formatter->dump_string("tenant", bucket.tenant);
1439 formatter->dump_string("zonegroup", bucket_info.zonegroup);
1440 formatter->dump_string("placement_rule", bucket_info.placement_rule.to_str());
1441 ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
1442 formatter->dump_string("id", bucket.bucket_id);
1443 formatter->dump_string("marker", bucket.marker);
1444 formatter->dump_stream("index_type") << bucket_info.index_type;
1445 ::encode_json("owner", bucket_info.owner, formatter);
1446 formatter->dump_string("ver", bucket_ver);
1447 formatter->dump_string("master_ver", master_ver);
1448 ut.gmtime(formatter->dump_stream("mtime"));
1449 ctime_ut.gmtime(formatter->dump_stream("creation_time"));
1450 formatter->dump_string("max_marker", max_marker);
1451 dump_bucket_usage(stats, formatter);
1452 encode_json("bucket_quota", bucket_info.quota, formatter);
1453
1454 // bucket tags
1455 auto iter = attrs.find(RGW_ATTR_TAGS);
1456 if (iter != attrs.end()) {
1457 RGWObjTagSet_S3 tagset;
1458 bufferlist::const_iterator piter{&iter->second};
1459 try {
1460 tagset.decode(piter);
1461 tagset.dump(formatter);
1462 } catch (buffer::error& err) {
1463 cerr << "ERROR: caught buffer:error, couldn't decode TagSet" << std::endl;
1464 }
1465 }
1466
1467 // TODO: bucket CORS
1468 // TODO: bucket LC
1469 formatter->close_section();
1470
1471 return 0;
1472 }
1473
1474 int RGWBucketAdminOp::limit_check(rgw::sal::RGWRadosStore *store,
1475 RGWBucketAdminOpState& op_state,
1476 const std::list<std::string>& user_ids,
1477 RGWFormatterFlusher& flusher,
1478 bool warnings_only)
1479 {
1480 int ret = 0;
1481 const size_t max_entries =
1482 store->ctx()->_conf->rgw_list_buckets_max_chunk;
1483
1484 const size_t safe_max_objs_per_shard =
1485 store->ctx()->_conf->rgw_safe_max_objects_per_shard;
1486
1487 uint16_t shard_warn_pct =
1488 store->ctx()->_conf->rgw_shard_warning_threshold;
1489 if (shard_warn_pct > 100)
1490 shard_warn_pct = 90;
1491
1492 Formatter *formatter = flusher.get_formatter();
1493 flusher.start(0);
1494
1495 formatter->open_array_section("users");
1496
1497 for (const auto& user_id : user_ids) {
1498
1499 formatter->open_object_section("user");
1500 formatter->dump_string("user_id", user_id);
1501 formatter->open_array_section("buckets");
1502
1503 string marker;
1504 rgw::sal::RGWBucketList buckets;
1505 do {
1506 rgw::sal::RGWRadosUser user(store, rgw_user(user_id));
1507
1508 ret = user.list_buckets(marker, string(), max_entries, false, buckets);
1509
1510 if (ret < 0)
1511 return ret;
1512
1513 map<string, rgw::sal::RGWBucket*>& m_buckets = buckets.get_buckets();
1514
1515 for (const auto& iter : m_buckets) {
1516 auto bucket = iter.second;
1517 uint32_t num_shards = 1;
1518 uint64_t num_objects = 0;
1519
1520 /* need info for num_shards */
1521 RGWBucketInfo info;
1522
1523 marker = bucket->get_name(); /* Casey's location for marker update,
1524 * as we may now not reach the end of
1525 * the loop body */
1526
1527 ret = store->getRados()->get_bucket_info(store->svc(), bucket->get_tenant(),
1528 bucket->get_name(), info, nullptr,
1529 null_yield);
1530 if (ret < 0)
1531 continue;
1532
1533 /* need stats for num_entries */
1534 string bucket_ver, master_ver;
1535 std::map<RGWObjCategory, RGWStorageStats> stats;
1536 ret = store->getRados()->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver,
1537 &master_ver, stats, nullptr);
1538
1539 if (ret < 0)
1540 continue;
1541
1542 for (const auto& s : stats) {
1543 num_objects += s.second.num_objects;
1544 }
1545
1546 num_shards = info.num_shards;
1547 uint64_t objs_per_shard =
1548 (num_shards) ? num_objects/num_shards : num_objects;
1549 {
1550 bool warn;
1551 stringstream ss;
1552 uint64_t fill_pct = objs_per_shard * 100 / safe_max_objs_per_shard;
1553 if (fill_pct > 100) {
1554 ss << "OVER " << fill_pct << "%";
1555 warn = true;
1556 } else if (fill_pct >= shard_warn_pct) {
1557 ss << "WARN " << fill_pct << "%";
1558 warn = true;
1559 } else {
1560 ss << "OK";
1561 warn = false;
1562 }
1563
1564 if (warn || !warnings_only) {
1565 formatter->open_object_section("bucket");
1566 formatter->dump_string("bucket", bucket->get_name());
1567 formatter->dump_string("tenant", bucket->get_tenant());
1568 formatter->dump_int("num_objects", num_objects);
1569 formatter->dump_int("num_shards", num_shards);
1570 formatter->dump_int("objects_per_shard", objs_per_shard);
1571 formatter->dump_string("fill_status", ss.str());
1572 formatter->close_section();
1573 }
1574 }
1575 }
1576 formatter->flush(cout);
1577 } while (buckets.is_truncated()); /* foreach: bucket */
1578
1579 formatter->close_section();
1580 formatter->close_section();
1581 formatter->flush(cout);
1582
1583 } /* foreach: user_id */
1584
1585 formatter->close_section();
1586 formatter->flush(cout);
1587
1588 return ret;
1589 } /* RGWBucketAdminOp::limit_check */
1590
1591 int RGWBucketAdminOp::info(rgw::sal::RGWRadosStore *store,
1592 RGWBucketAdminOpState& op_state,
1593 RGWFormatterFlusher& flusher)
1594 {
1595 RGWBucket bucket;
1596 int ret = 0;
1597 const std::string& bucket_name = op_state.get_bucket_name();
1598 if (!bucket_name.empty()) {
1599 ret = bucket.init(store, op_state, null_yield);
1600 if (-ENOENT == ret)
1601 return -ERR_NO_SUCH_BUCKET;
1602 else if (ret < 0)
1603 return ret;
1604 }
1605
1606 Formatter *formatter = flusher.get_formatter();
1607 flusher.start(0);
1608
1609 CephContext *cct = store->ctx();
1610
1611 const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
1612
1613 const bool show_stats = op_state.will_fetch_stats();
1614 const rgw_user& user_id = op_state.get_user_id();
1615 if (op_state.is_user_op()) {
1616 formatter->open_array_section("buckets");
1617
1618 rgw::sal::RGWBucketList buckets;
1619 rgw::sal::RGWRadosUser user(store, op_state.get_user_id());
1620 std::string marker;
1621 const std::string empty_end_marker;
1622 constexpr bool no_need_stats = false; // set need_stats to false
1623
1624 do {
1625 buckets.clear();
1626 ret = user.list_buckets(marker, empty_end_marker, max_entries,
1627 no_need_stats, buckets);
1628 if (ret < 0) {
1629 return ret;
1630 }
1631
1632 const std::string* marker_cursor = nullptr;
1633 map<string, rgw::sal::RGWBucket*>& m = buckets.get_buckets();
1634
1635 for (const auto& i : m) {
1636 const std::string& obj_name = i.first;
1637 if (!bucket_name.empty() && bucket_name != obj_name) {
1638 continue;
1639 }
1640
1641 if (show_stats) {
1642 bucket_stats(store, user_id.tenant, obj_name, formatter);
1643 } else {
1644 formatter->dump_string("bucket", obj_name);
1645 }
1646
1647 marker_cursor = &obj_name;
1648 } // for loop
1649 if (marker_cursor) {
1650 marker = *marker_cursor;
1651 }
1652
1653 flusher.flush();
1654 } while (buckets.is_truncated());
1655
1656 formatter->close_section();
1657 } else if (!bucket_name.empty()) {
1658 ret = bucket_stats(store, user_id.tenant, bucket_name, formatter);
1659 if (ret < 0) {
1660 return ret;
1661 }
1662 } else {
1663 void *handle = nullptr;
1664 bool truncated = true;
1665
1666 formatter->open_array_section("buckets");
1667 ret = store->ctl()->meta.mgr->list_keys_init("bucket", &handle);
1668 while (ret == 0 && truncated) {
1669 std::list<std::string> buckets;
1670 constexpr int max_keys = 1000;
1671 ret = store->ctl()->meta.mgr->list_keys_next(handle, max_keys, buckets,
1672 &truncated);
1673 for (auto& bucket_name : buckets) {
1674 if (show_stats) {
1675 bucket_stats(store, user_id.tenant, bucket_name, formatter);
1676 } else {
1677 formatter->dump_string("bucket", bucket_name);
1678 }
1679 }
1680 }
1681 store->ctl()->meta.mgr->list_keys_complete(handle);
1682
1683 formatter->close_section();
1684 }
1685
1686 flusher.flush();
1687
1688 return 0;
1689 }
1690
1691 int RGWBucketAdminOp::set_quota(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state)
1692 {
1693 RGWBucket bucket;
1694
1695 int ret = bucket.init(store, op_state, null_yield);
1696 if (ret < 0)
1697 return ret;
1698 return bucket.set_quota(op_state);
1699 }
1700
1701 static int purge_bucket_instance(rgw::sal::RGWRadosStore *store, const RGWBucketInfo& bucket_info)
1702 {
1703 int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
1704 for (int i = 0; i < max_shards; i++) {
1705 RGWRados::BucketShard bs(store->getRados());
1706 int shard_id = (bucket_info.num_shards > 0 ? i : -1);
1707 int ret = bs.init(bucket_info.bucket, shard_id, nullptr);
1708 if (ret < 0) {
1709 cerr << "ERROR: bs.init(bucket=" << bucket_info.bucket << ", shard=" << shard_id
1710 << "): " << cpp_strerror(-ret) << std::endl;
1711 return ret;
1712 }
1713 ret = store->getRados()->bi_remove(bs);
1714 if (ret < 0) {
1715 cerr << "ERROR: failed to remove bucket index object: "
1716 << cpp_strerror(-ret) << std::endl;
1717 return ret;
1718 }
1719 }
1720 return 0;
1721 }
1722
/*
 * Split "tenant/bucket" at the first '/' into {tenant, bucket}.
 * A name without '/' yields an empty tenant and the unchanged name.
 */
inline auto split_tenant(const std::string& bucket_name){
  const auto slash = bucket_name.find('/');
  if (slash == std::string::npos) {
    return std::make_pair(std::string(), bucket_name);
  }
  return std::make_pair(bucket_name.substr(0, slash),
                        bucket_name.substr(slash + 1));
}
1730
1731 using bucket_instance_ls = std::vector<RGWBucketInfo>;
1732 void get_stale_instances(rgw::sal::RGWRadosStore *store, const std::string& bucket_name,
1733 const vector<std::string>& lst,
1734 bucket_instance_ls& stale_instances)
1735 {
1736
1737 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
1738
1739 bucket_instance_ls other_instances;
1740 // first iterate over the entries, and pick up the done buckets; these
1741 // are guaranteed to be stale
1742 for (const auto& bucket_instance : lst){
1743 RGWBucketInfo binfo;
1744 int r = store->getRados()->get_bucket_instance_info(obj_ctx, bucket_instance,
1745 binfo, nullptr,nullptr, null_yield);
1746 if (r < 0){
1747 // this can only happen if someone deletes us right when we're processing
1748 lderr(store->ctx()) << "Bucket instance is invalid: " << bucket_instance
1749 << cpp_strerror(-r) << dendl;
1750 continue;
1751 }
1752 if (binfo.reshard_status == cls_rgw_reshard_status::DONE)
1753 stale_instances.emplace_back(std::move(binfo));
1754 else {
1755 other_instances.emplace_back(std::move(binfo));
1756 }
1757 }
1758
1759 // Read the cur bucket info, if the bucket doesn't exist we can simply return
1760 // all the instances
1761 auto [tenant, bucket] = split_tenant(bucket_name);
1762 RGWBucketInfo cur_bucket_info;
1763 int r = store->getRados()->get_bucket_info(store->svc(), tenant, bucket, cur_bucket_info, nullptr, null_yield);
1764 if (r < 0) {
1765 if (r == -ENOENT) {
1766 // bucket doesn't exist, everything is stale then
1767 stale_instances.insert(std::end(stale_instances),
1768 std::make_move_iterator(other_instances.begin()),
1769 std::make_move_iterator(other_instances.end()));
1770 } else {
1771 // all bets are off if we can't read the bucket, just return the sureshot stale instances
1772 lderr(store->ctx()) << "error: reading bucket info for bucket: "
1773 << bucket << cpp_strerror(-r) << dendl;
1774 }
1775 return;
1776 }
1777
1778 // Don't process further in this round if bucket is resharding
1779 if (cur_bucket_info.reshard_status == cls_rgw_reshard_status::IN_PROGRESS)
1780 return;
1781
1782 other_instances.erase(std::remove_if(other_instances.begin(), other_instances.end(),
1783 [&cur_bucket_info](const RGWBucketInfo& b){
1784 return (b.bucket.bucket_id == cur_bucket_info.bucket.bucket_id ||
1785 b.bucket.bucket_id == cur_bucket_info.new_bucket_instance_id);
1786 }),
1787 other_instances.end());
1788
1789 // check if there are still instances left
1790 if (other_instances.empty()) {
1791 return;
1792 }
1793
1794 // Now we have a bucket with instances where the reshard status is none, this
1795 // usually happens when the reshard process couldn't complete, lockdown the
1796 // bucket and walk through these instances to make sure no one else interferes
1797 // with these
1798 {
1799 RGWBucketReshardLock reshard_lock(store, cur_bucket_info, true);
1800 r = reshard_lock.lock();
1801 if (r < 0) {
1802 // most likely bucket is under reshard, return the sureshot stale instances
1803 ldout(store->ctx(), 5) << __func__
1804 << "failed to take reshard lock; reshard underway likey" << dendl;
1805 return;
1806 }
1807 auto sg = make_scope_guard([&reshard_lock](){ reshard_lock.unlock();} );
1808 // this should be fast enough that we may not need to renew locks and check
1809 // exit status?, should we read the values of the instances again?
1810 stale_instances.insert(std::end(stale_instances),
1811 std::make_move_iterator(other_instances.begin()),
1812 std::make_move_iterator(other_instances.end()));
1813 }
1814
1815 return;
1816 }
1817
1818 static int process_stale_instances(rgw::sal::RGWRadosStore *store, RGWBucketAdminOpState& op_state,
1819 RGWFormatterFlusher& flusher,
1820 std::function<void(const bucket_instance_ls&,
1821 Formatter *,
1822 rgw::sal::RGWRadosStore*)> process_f)
1823 {
1824 std::string marker;
1825 void *handle;
1826 Formatter *formatter = flusher.get_formatter();
1827 static constexpr auto default_max_keys = 1000;
1828
1829 int ret = store->ctl()->meta.mgr->list_keys_init("bucket.instance", marker, &handle);
1830 if (ret < 0) {
1831 cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
1832 return ret;
1833 }
1834
1835 bool truncated;
1836
1837 formatter->open_array_section("keys");
1838 auto g = make_scope_guard([&store, &handle, &formatter]() {
1839 store->ctl()->meta.mgr->list_keys_complete(handle);
1840 formatter->close_section(); // keys
1841 formatter->flush(cout);
1842 });
1843
1844 do {
1845 list<std::string> keys;
1846
1847 ret = store->ctl()->meta.mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
1848 if (ret < 0 && ret != -ENOENT) {
1849 cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
1850 return ret;
1851 } if (ret != -ENOENT) {
1852 // partition the list of buckets by buckets as the listing is un sorted,
1853 // since it would minimize the reads to bucket_info
1854 std::unordered_map<std::string, std::vector<std::string>> bucket_instance_map;
1855 for (auto &key: keys) {
1856 auto pos = key.find(':');
1857 if(pos != std::string::npos)
1858 bucket_instance_map[key.substr(0,pos)].emplace_back(std::move(key));
1859 }
1860 for (const auto& kv: bucket_instance_map) {
1861 bucket_instance_ls stale_lst;
1862 get_stale_instances(store, kv.first, kv.second, stale_lst);
1863 process_f(stale_lst, formatter, store);
1864 }
1865 }
1866 } while (truncated);
1867
1868 return 0;
1869 }
1870
1871 int RGWBucketAdminOp::list_stale_instances(rgw::sal::RGWRadosStore *store,
1872 RGWBucketAdminOpState& op_state,
1873 RGWFormatterFlusher& flusher)
1874 {
1875 auto process_f = [](const bucket_instance_ls& lst,
1876 Formatter *formatter,
1877 rgw::sal::RGWRadosStore*){
1878 for (const auto& binfo: lst)
1879 formatter->dump_string("key", binfo.bucket.get_key());
1880 };
1881 return process_stale_instances(store, op_state, flusher, process_f);
1882 }
1883
1884
1885 int RGWBucketAdminOp::clear_stale_instances(rgw::sal::RGWRadosStore *store,
1886 RGWBucketAdminOpState& op_state,
1887 RGWFormatterFlusher& flusher)
1888 {
1889 auto process_f = [](const bucket_instance_ls& lst,
1890 Formatter *formatter,
1891 rgw::sal::RGWRadosStore *store){
1892 for (const auto &binfo: lst) {
1893 int ret = purge_bucket_instance(store, binfo);
1894 if (ret == 0){
1895 auto md_key = "bucket.instance:" + binfo.bucket.get_key();
1896 ret = store->ctl()->meta.mgr->remove(md_key, null_yield);
1897 }
1898 formatter->open_object_section("delete_status");
1899 formatter->dump_string("bucket_instance", binfo.bucket.get_key());
1900 formatter->dump_int("status", -ret);
1901 formatter->close_section();
1902 }
1903 };
1904
1905 return process_stale_instances(store, op_state, flusher, process_f);
1906 }
1907
1908 static int fix_single_bucket_lc(rgw::sal::RGWRadosStore *store,
1909 const std::string& tenant_name,
1910 const std::string& bucket_name)
1911 {
1912 RGWBucketInfo bucket_info;
1913 map <std::string, bufferlist> bucket_attrs;
1914 int ret = store->getRados()->get_bucket_info(store->svc(), tenant_name, bucket_name,
1915 bucket_info, nullptr, null_yield, &bucket_attrs);
1916 if (ret < 0) {
1917 // TODO: Should we handle the case where the bucket could've been removed between
1918 // listing and fetching?
1919 return ret;
1920 }
1921
1922 return rgw::lc::fix_lc_shard_entry(store, bucket_info, bucket_attrs);
1923 }
1924
1925 static void format_lc_status(Formatter* formatter,
1926 const std::string& tenant_name,
1927 const std::string& bucket_name,
1928 int status)
1929 {
1930 formatter->open_object_section("bucket_entry");
1931 std::string entry = tenant_name.empty() ? bucket_name : tenant_name + "/" + bucket_name;
1932 formatter->dump_string("bucket", entry);
1933 formatter->dump_int("status", status);
1934 formatter->close_section(); // bucket_entry
1935 }
1936
1937 static void process_single_lc_entry(rgw::sal::RGWRadosStore *store,
1938 Formatter *formatter,
1939 const std::string& tenant_name,
1940 const std::string& bucket_name)
1941 {
1942 int ret = fix_single_bucket_lc(store, tenant_name, bucket_name);
1943 format_lc_status(formatter, tenant_name, bucket_name, -ret);
1944 }
1945
1946 int RGWBucketAdminOp::fix_lc_shards(rgw::sal::RGWRadosStore *store,
1947 RGWBucketAdminOpState& op_state,
1948 RGWFormatterFlusher& flusher)
1949 {
1950 std::string marker;
1951 void *handle;
1952 Formatter *formatter = flusher.get_formatter();
1953 static constexpr auto default_max_keys = 1000;
1954
1955 bool truncated;
1956 if (const std::string& bucket_name = op_state.get_bucket_name();
1957 ! bucket_name.empty()) {
1958 const rgw_user user_id = op_state.get_user_id();
1959 process_single_lc_entry(store, formatter, user_id.tenant, bucket_name);
1960 formatter->flush(cout);
1961 } else {
1962 int ret = store->ctl()->meta.mgr->list_keys_init("bucket", marker, &handle);
1963 if (ret < 0) {
1964 std::cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
1965 return ret;
1966 }
1967
1968 {
1969 formatter->open_array_section("lc_fix_status");
1970 auto sg = make_scope_guard([&store, &handle, &formatter](){
1971 store->ctl()->meta.mgr->list_keys_complete(handle);
1972 formatter->close_section(); // lc_fix_status
1973 formatter->flush(cout);
1974 });
1975 do {
1976 list<std::string> keys;
1977 ret = store->ctl()->meta.mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
1978 if (ret < 0 && ret != -ENOENT) {
1979 std::cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
1980 return ret;
1981 } if (ret != -ENOENT) {
1982 for (const auto &key:keys) {
1983 auto [tenant_name, bucket_name] = split_tenant(key);
1984 process_single_lc_entry(store, formatter, tenant_name, bucket_name);
1985 }
1986 }
1987 formatter->flush(cout); // regularly flush every 1k entries
1988 } while (truncated);
1989 }
1990
1991 }
1992 return 0;
1993
1994 }
1995
1996 static bool has_object_expired(rgw::sal::RGWRadosStore *store,
1997 const RGWBucketInfo& bucket_info,
1998 const rgw_obj_key& key, utime_t& delete_at)
1999 {
2000 rgw_obj obj(bucket_info.bucket, key);
2001 bufferlist delete_at_bl;
2002
2003 int ret = rgw_object_get_attr(store, bucket_info, obj, RGW_ATTR_DELETE_AT, delete_at_bl, null_yield);
2004 if (ret < 0) {
2005 return false; // no delete at attr, proceed
2006 }
2007
2008 ret = decode_bl(delete_at_bl, delete_at);
2009 if (ret < 0) {
2010 return false; // failed to parse
2011 }
2012
2013 if (delete_at <= ceph_clock_now() && !delete_at.is_zero()) {
2014 return true;
2015 }
2016
2017 return false;
2018 }
2019
2020 static int fix_bucket_obj_expiry(rgw::sal::RGWRadosStore *store,
2021 const RGWBucketInfo& bucket_info,
2022 RGWFormatterFlusher& flusher, bool dry_run)
2023 {
2024 if (bucket_info.bucket.bucket_id == bucket_info.bucket.marker) {
2025 lderr(store->ctx()) << "Not a resharded bucket skipping" << dendl;
2026 return 0; // not a resharded bucket, move along
2027 }
2028
2029 Formatter *formatter = flusher.get_formatter();
2030 formatter->open_array_section("expired_deletion_status");
2031 auto sg = make_scope_guard([&formatter] {
2032 formatter->close_section();
2033 formatter->flush(std::cout);
2034 });
2035
2036 RGWRados::Bucket target(store->getRados(), bucket_info);
2037 RGWRados::Bucket::List list_op(&target);
2038
2039 list_op.params.list_versions = bucket_info.versioned();
2040 list_op.params.allow_unordered = true;
2041
2042 bool is_truncated {false};
2043 do {
2044 std::vector<rgw_bucket_dir_entry> objs;
2045
2046 int ret = list_op.list_objects(listing_max_entries, &objs, nullptr,
2047 &is_truncated, null_yield);
2048 if (ret < 0) {
2049 lderr(store->ctx()) << "ERROR failed to list objects in the bucket" << dendl;
2050 return ret;
2051 }
2052 for (const auto& obj : objs) {
2053 rgw_obj_key key(obj.key);
2054 utime_t delete_at;
2055 if (has_object_expired(store, bucket_info, key, delete_at)) {
2056 formatter->open_object_section("object_status");
2057 formatter->dump_string("object", key.name);
2058 formatter->dump_stream("delete_at") << delete_at;
2059
2060 if (!dry_run) {
2061 ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, key);
2062 formatter->dump_int("status", ret);
2063 }
2064
2065 formatter->close_section(); // object_status
2066 }
2067 }
2068 formatter->flush(cout); // regularly flush every 1k entries
2069 } while (is_truncated);
2070
2071 return 0;
2072 }
2073
2074 int RGWBucketAdminOp::fix_obj_expiry(rgw::sal::RGWRadosStore *store,
2075 RGWBucketAdminOpState& op_state,
2076 RGWFormatterFlusher& flusher, bool dry_run)
2077 {
2078 RGWBucket admin_bucket;
2079 int ret = admin_bucket.init(store, op_state, null_yield);
2080 if (ret < 0) {
2081 lderr(store->ctx()) << "failed to initialize bucket" << dendl;
2082 return ret;
2083 }
2084
2085 return fix_bucket_obj_expiry(store, admin_bucket.get_bucket_info(), flusher, dry_run);
2086 }
2087
2088 void rgw_data_change::dump(Formatter *f) const
2089 {
2090 string type;
2091 switch (entity_type) {
2092 case ENTITY_TYPE_BUCKET:
2093 type = "bucket";
2094 break;
2095 default:
2096 type = "unknown";
2097 }
2098 encode_json("entity_type", type, f);
2099 encode_json("key", key, f);
2100 utime_t ut(timestamp);
2101 encode_json("timestamp", ut, f);
2102 }
2103
2104 void rgw_data_change::decode_json(JSONObj *obj) {
2105 string s;
2106 JSONDecoder::decode_json("entity_type", s, obj);
2107 if (s == "bucket") {
2108 entity_type = ENTITY_TYPE_BUCKET;
2109 } else {
2110 entity_type = ENTITY_TYPE_UNKNOWN;
2111 }
2112 JSONDecoder::decode_json("key", key, obj);
2113 utime_t ut;
2114 JSONDecoder::decode_json("timestamp", ut, obj);
2115 timestamp = ut.to_real_time();
2116 }
2117
2118 void rgw_data_change_log_entry::dump(Formatter *f) const
2119 {
2120 encode_json("log_id", log_id, f);
2121 utime_t ut(log_timestamp);
2122 encode_json("log_timestamp", ut, f);
2123 encode_json("entry", entry, f);
2124 }
2125
2126 void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
2127 JSONDecoder::decode_json("log_id", log_id, obj);
2128 utime_t ut;
2129 JSONDecoder::decode_json("log_timestamp", ut, obj);
2130 log_timestamp = ut.to_real_time();
2131 JSONDecoder::decode_json("entry", entry, obj);
2132 }
2133
2134
2135 RGWDataChangesLog::RGWDataChangesLog(RGWSI_Zone *zone_svc, RGWSI_Cls *cls_svc)
2136 : cct(zone_svc->ctx()), changes(cct->_conf->rgw_data_log_changes_size)
2137 {
2138 svc.zone = zone_svc;
2139 svc.cls = cls_svc;
2140
2141 num_shards = cct->_conf->rgw_data_log_num_shards;
2142
2143 oids = new string[num_shards];
2144
2145 string prefix = cct->_conf->rgw_data_log_obj_prefix;
2146
2147 if (prefix.empty()) {
2148 prefix = "data_log";
2149 }
2150
2151 for (int i = 0; i < num_shards; i++) {
2152 char buf[16];
2153 snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i);
2154 oids[i] = buf;
2155 }
2156
2157 renew_thread = new ChangesRenewThread(cct, this);
2158 renew_thread->create("rgw_dt_lg_renew");
2159 }
2160
2161 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
2162 const string& name = bs.bucket.name;
2163 int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
2164 uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
2165
2166 return (int)r;
2167 }
2168
2169 int RGWDataChangesLog::renew_entries()
2170 {
2171 if (!svc.zone->need_to_log_data())
2172 return 0;
2173
2174 /* we can't keep the bucket name as part of the cls_log_entry, and we need
2175 * it later, so we keep two lists under the map */
2176 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
2177
2178 lock.lock();
2179 map<rgw_bucket_shard, bool> entries;
2180 entries.swap(cur_cycle);
2181 lock.unlock();
2182
2183 map<rgw_bucket_shard, bool>::iterator iter;
2184 string section;
2185 real_time ut = real_clock::now();
2186 for (iter = entries.begin(); iter != entries.end(); ++iter) {
2187 const rgw_bucket_shard& bs = iter->first;
2188
2189 int index = choose_oid(bs);
2190
2191 cls_log_entry entry;
2192
2193 rgw_data_change change;
2194 bufferlist bl;
2195 change.entity_type = ENTITY_TYPE_BUCKET;
2196 change.key = bs.get_key();
2197 change.timestamp = ut;
2198 encode(change, bl);
2199
2200 svc.cls->timelog.prepare_entry(entry, ut, section, change.key, bl);
2201
2202 m[index].first.push_back(bs);
2203 m[index].second.emplace_back(std::move(entry));
2204 }
2205
2206 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
2207 for (miter = m.begin(); miter != m.end(); ++miter) {
2208 list<cls_log_entry>& entries = miter->second.second;
2209
2210 real_time now = real_clock::now();
2211
2212 int ret = svc.cls->timelog.add(oids[miter->first], entries, nullptr, true, null_yield);
2213 if (ret < 0) {
2214 /* we don't really need to have a special handling for failed cases here,
2215 * as this is just an optimization. */
2216 lderr(cct) << "ERROR: svc.cls->timelog.add() returned " << ret << dendl;
2217 return ret;
2218 }
2219
2220 real_time expiration = now;
2221 expiration += make_timespan(cct->_conf->rgw_data_log_window);
2222
2223 list<rgw_bucket_shard>& buckets = miter->second.first;
2224 list<rgw_bucket_shard>::iterator liter;
2225 for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
2226 update_renewed(*liter, expiration);
2227 }
2228 }
2229
2230 return 0;
2231 }
2232
2233 void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
2234 {
2235 ceph_assert(ceph_mutex_is_locked(lock));
2236 if (!changes.find(bs, status)) {
2237 status = ChangeStatusPtr(new ChangeStatus);
2238 changes.add(bs, status);
2239 }
2240 }
2241
2242 void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
2243 {
2244 std::lock_guard l{lock};
2245 cur_cycle[bs] = true;
2246 }
2247
2248 void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
2249 {
2250 std::lock_guard l{lock};
2251 ChangeStatusPtr status;
2252 _get_change(bs, status);
2253
2254 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
2255 status->cur_expiration = expiration;
2256 }
2257
2258 int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
2259 rgw_bucket_shard bs(bucket, shard_id);
2260
2261 return choose_oid(bs);
2262 }
2263
2264 bool RGWDataChangesLog::filter_bucket(const rgw_bucket& bucket, optional_yield y) const
2265 {
2266 if (!bucket_filter) {
2267 return true;
2268 }
2269
2270 return bucket_filter->filter(bucket, y);
2271 }
2272
/*
 * Record that a bucket shard changed.
 *
 * Buckets rejected by filter_bucket() are ignored. Otherwise the observer
 * (if any) is notified, the shard is marked modified, and a log entry is
 * written unless the shard's current expiration is still in the future, in
 * which case the shard is only registered for renewal. If another caller is
 * already writing an entry for the same shard (status->pending), this call
 * waits on that caller's condition and returns its result.
 *
 * Returns 0 on success or when nothing had to be sent, otherwise the error
 * from timelog.add() (or from the writer that was waited on).
 */
int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
  auto& bucket = bucket_info.bucket;

  if (!filter_bucket(bucket, null_yield)) {
    return 0;
  }

  if (observer) {
    observer->on_bucket_changed(bucket.get_key());
  }

  rgw_bucket_shard bs(bucket, shard_id);

  int index = choose_oid(bs);
  mark_modified(index, bs);

  // the log-wide lock only protects the status lookup / creation
  lock.lock();

  ChangeStatusPtr status;
  _get_change(bs, status);

  lock.unlock();

  real_time now = real_clock::now();

  // from here on the per-shard status lock guards the status fields
  status->lock.lock();

  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;

  if (now < status->cur_expiration) {
    /* no need to send, recently completed */
    status->lock.unlock();

    register_renew(bs);
    return 0;
  }

  RefCountedCond *cond;

  if (status->pending) {
    // another caller is sending for this shard: wait for its result
    cond = status->cond;

    ceph_assert(cond);

    // hold a reference so the cond stays valid after the lock is dropped
    status->cond->get();
    status->lock.unlock();

    int ret = cond->wait();
    cond->put();
    if (!ret) {
      register_renew(bs);
    }
    return ret;
  }

  // we are the sender: publish a cond for concurrent callers to wait on
  status->cond = new RefCountedCond;
  status->pending = true;

  string& oid = oids[index];
  real_time expiration;

  int ret;

  do {
    status->cur_sent = now;

    expiration = now;
    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);

    // the write itself happens without the status lock held
    status->lock.unlock();

    bufferlist bl;
    rgw_data_change change;
    change.entity_type = ENTITY_TYPE_BUCKET;
    change.key = bs.get_key();
    change.timestamp = now;
    encode(change, bl);
    string section;

    ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;

    ret = svc.cls->timelog.add(oid, now, section, change.key, bl, null_yield);

    now = real_clock::now();

    status->lock.lock();

    // resend if the write succeeded but the window had already elapsed
  } while (!ret && real_clock::now() > expiration);

  cond = status->cond;

  status->pending = false;
  status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
  status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
  status->cond = NULL;
  status->lock.unlock();

  // hand the result to any waiters, then drop our reference
  cond->done(ret);
  cond->put();

  return ret;
}
2375
2376 int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
2377 list<rgw_data_change_log_entry>& entries,
2378 const string& marker,
2379 string *out_marker,
2380 bool *truncated) {
2381 if (shard >= num_shards)
2382 return -EINVAL;
2383
2384 list<cls_log_entry> log_entries;
2385
2386 int ret = svc.cls->timelog.list(oids[shard], start_time, end_time,
2387 max_entries, log_entries, marker,
2388 out_marker, truncated, null_yield);
2389 if (ret < 0)
2390 return ret;
2391
2392 list<cls_log_entry>::iterator iter;
2393 for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
2394 rgw_data_change_log_entry log_entry;
2395 log_entry.log_id = iter->id;
2396 real_time rt = iter->timestamp.to_real_time();
2397 log_entry.log_timestamp = rt;
2398 auto liter = iter->data.cbegin();
2399 try {
2400 decode(log_entry.entry, liter);
2401 } catch (buffer::error& err) {
2402 lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
2403 return -EIO;
2404 }
2405 entries.push_back(log_entry);
2406 }
2407
2408 return 0;
2409 }
2410
2411 int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
2412 list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
2413 bool truncated;
2414 entries.clear();
2415
2416 for (; marker.shard < num_shards && (int)entries.size() < max_entries;
2417 marker.shard++, marker.marker.clear()) {
2418 int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
2419 marker.marker, NULL, &truncated);
2420 if (ret == -ENOENT) {
2421 continue;
2422 }
2423 if (ret < 0) {
2424 return ret;
2425 }
2426 if (truncated) {
2427 *ptruncated = true;
2428 return 0;
2429 }
2430 }
2431
2432 *ptruncated = (marker.shard < num_shards);
2433
2434 return 0;
2435 }
2436
2437 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
2438 {
2439 if (shard_id >= num_shards)
2440 return -EINVAL;
2441
2442 string oid = oids[shard_id];
2443
2444 cls_log_header header;
2445
2446 int ret = svc.cls->timelog.info(oid, &header, null_yield);
2447 if ((ret < 0) && (ret != -ENOENT))
2448 return ret;
2449
2450 info->marker = header.max_marker;
2451 info->last_update = header.max_time.to_real_time();
2452
2453 return 0;
2454 }
2455
2456 int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
2457 const string& start_marker, const string& end_marker)
2458 {
2459 if (shard_id > num_shards)
2460 return -EINVAL;
2461
2462 return svc.cls->timelog.trim(oids[shard_id], start_time, end_time,
2463 start_marker, end_marker, nullptr, null_yield);
2464 }
2465
2466 bool RGWDataChangesLog::going_down()
2467 {
2468 return down_flag;
2469 }
2470
2471 RGWDataChangesLog::~RGWDataChangesLog() {
2472 down_flag = true;
2473 renew_thread->stop();
2474 renew_thread->join();
2475 delete renew_thread;
2476 delete[] oids;
2477 }
2478
2479 void *RGWDataChangesLog::ChangesRenewThread::entry() {
2480 for (;;) {
2481 dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
2482 int r = log->renew_entries();
2483 if (r < 0) {
2484 dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
2485 }
2486
2487 if (log->going_down())
2488 break;
2489
2490 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
2491 std::unique_lock locker{lock};
2492 cond.wait_for(locker, std::chrono::seconds(interval));
2493 }
2494
2495 return NULL;
2496 }
2497
2498 void RGWDataChangesLog::ChangesRenewThread::stop()
2499 {
2500 std::lock_guard l{lock};
2501 cond.notify_all();
2502 }
2503
2504 void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
2505 {
2506 auto key = bs.get_key();
2507 {
2508 std::shared_lock rl{modified_lock}; // read lock to check for existence
2509 auto shard = modified_shards.find(shard_id);
2510 if (shard != modified_shards.end() && shard->second.count(key)) {
2511 return;
2512 }
2513 }
2514
2515 std::unique_lock wl{modified_lock}; // write lock for insertion
2516 modified_shards[shard_id].insert(key);
2517 }
2518
2519 void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
2520 {
2521 std::unique_lock wl{modified_lock};
2522 modified.swap(modified_shards);
2523 modified_shards.clear();
2524 }
2525
2526 void RGWBucketCompleteInfo::dump(Formatter *f) const {
2527 encode_json("bucket_info", info, f);
2528 encode_json("attrs", attrs, f);
2529 }
2530
2531 void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
2532 JSONDecoder::decode_json("bucket_info", info, obj);
2533 JSONDecoder::decode_json("attrs", attrs, obj);
2534 }
2535
2536 class RGWBucketMetadataHandler : public RGWBucketMetadataHandlerBase {
2537 public:
2538 struct Svc {
2539 RGWSI_Bucket *bucket{nullptr};
2540 } svc;
2541
2542 struct Ctl {
2543 RGWBucketCtl *bucket{nullptr};
2544 } ctl;
2545
2546 RGWBucketMetadataHandler() {}
2547
2548 void init(RGWSI_Bucket *bucket_svc,
2549 RGWBucketCtl *bucket_ctl) override {
2550 base_init(bucket_svc->ctx(),
2551 bucket_svc->get_ep_be_handler().get());
2552 svc.bucket = bucket_svc;
2553 ctl.bucket = bucket_ctl;
2554 }
2555
2556 string get_type() override { return "bucket"; }
2557
2558 RGWMetadataObject *get_meta_obj(JSONObj *jo, const obj_version& objv, const ceph::real_time& mtime) override {
2559 RGWBucketEntryPoint be;
2560
2561 try {
2562 decode_json_obj(be, jo);
2563 } catch (JSONDecoder::err& e) {
2564 return nullptr;
2565 }
2566
2567 return new RGWBucketEntryMetadataObject(be, objv, mtime);
2568 }
2569
2570 int do_get(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWMetadataObject **obj, optional_yield y) override {
2571 RGWObjVersionTracker ot;
2572 RGWBucketEntryPoint be;
2573
2574 real_time mtime;
2575 map<string, bufferlist> attrs;
2576
2577 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2578
2579 int ret = svc.bucket->read_bucket_entrypoint_info(ctx, entry, &be, &ot, &mtime, &attrs, y);
2580 if (ret < 0)
2581 return ret;
2582
2583 RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime, std::move(attrs));
2584
2585 *obj = mdo;
2586
2587 return 0;
2588 }
2589
2590 int do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
2591 RGWMetadataObject *obj,
2592 RGWObjVersionTracker& objv_tracker,
2593 optional_yield y,
2594 RGWMDLogSyncType type) override;
2595
2596 int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker,
2597 optional_yield y) override {
2598 RGWBucketEntryPoint be;
2599
2600 real_time orig_mtime;
2601
2602 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2603
2604 int ret = svc.bucket->read_bucket_entrypoint_info(ctx, entry, &be, &objv_tracker, &orig_mtime, nullptr, y);
2605 if (ret < 0)
2606 return ret;
2607
2608 /*
2609 * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
2610 * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
2611 * will incorrectly fail.
2612 */
2613 ret = ctl.bucket->unlink_bucket(be.owner, be.bucket, y, false);
2614 if (ret < 0) {
2615 lderr(svc.bucket->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2616 }
2617
2618 ret = svc.bucket->remove_bucket_entrypoint_info(ctx, entry, &objv_tracker, y);
2619 if (ret < 0) {
2620 lderr(svc.bucket->ctx()) << "could not delete bucket=" << entry << dendl;
2621 }
2622 /* idempotent */
2623 return 0;
2624 }
2625
2626 int call(std::function<int(RGWSI_Bucket_EP_Ctx& ctx)> f) {
2627 return call(nullopt, f);
2628 }
2629
2630 int call(std::optional<RGWSI_MetaBackend_CtxParams> bectx_params,
2631 std::function<int(RGWSI_Bucket_EP_Ctx& ctx)> f) {
2632 return be_handler->call(bectx_params, [&](RGWSI_MetaBackend_Handler::Op *op) {
2633 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2634 return f(ctx);
2635 });
2636 }
2637 };
2638
2639 class RGWMetadataHandlerPut_Bucket : public RGWMetadataHandlerPut_SObj
2640 {
2641 RGWBucketMetadataHandler *bhandler;
2642 RGWBucketEntryMetadataObject *obj;
2643 public:
2644 RGWMetadataHandlerPut_Bucket(RGWBucketMetadataHandler *_handler,
2645 RGWSI_MetaBackend_Handler::Op *op, string& entry,
2646 RGWMetadataObject *_obj, RGWObjVersionTracker& objv_tracker,
2647 optional_yield y,
2648 RGWMDLogSyncType type) : RGWMetadataHandlerPut_SObj(_handler, op, entry, obj, objv_tracker, y, type),
2649 bhandler(_handler) {
2650 obj = static_cast<RGWBucketEntryMetadataObject *>(_obj);
2651 }
2652 ~RGWMetadataHandlerPut_Bucket() {}
2653
2654 void encode_obj(bufferlist *bl) override {
2655 obj->get_ep().encode(*bl);
2656 }
2657
2658 int put_checked() override;
2659 int put_post() override;
2660 };
2661
2662 int RGWBucketMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
2663 RGWMetadataObject *obj,
2664 RGWObjVersionTracker& objv_tracker,
2665 optional_yield y,
2666 RGWMDLogSyncType type)
2667 {
2668 RGWMetadataHandlerPut_Bucket put_op(this, op, entry, obj, objv_tracker, y, type);
2669 return do_put_operate(&put_op);
2670 }
2671
2672 int RGWMetadataHandlerPut_Bucket::put_checked()
2673 {
2674 RGWBucketEntryMetadataObject *orig_obj = static_cast<RGWBucketEntryMetadataObject *>(old_obj);
2675
2676 if (orig_obj) {
2677 obj->set_pattrs(&orig_obj->get_attrs());
2678 }
2679
2680 auto& be = obj->get_ep();
2681 auto mtime = obj->get_mtime();
2682 auto pattrs = obj->get_pattrs();
2683
2684 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2685
2686 return bhandler->svc.bucket->store_bucket_entrypoint_info(ctx, entry,
2687 be,
2688 false,
2689 mtime,
2690 pattrs,
2691 &objv_tracker,
2692 y);
2693 }
2694
2695 int RGWMetadataHandlerPut_Bucket::put_post()
2696 {
2697 auto& be = obj->get_ep();
2698
2699 int ret;
2700
2701 /* link bucket */
2702 if (be.linked) {
2703 ret = bhandler->ctl.bucket->link_bucket(be.owner, be.bucket, be.creation_time, y, false);
2704 } else {
2705 ret = bhandler->ctl.bucket->unlink_bucket(be.owner, be.bucket, y, false);
2706 }
2707
2708 return ret;
2709 }
2710
2711 static void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) {
2712
2713 char md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
2714 unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
2715 bufferlist bl;
2716
2717 Formatter *f = new JSONFormatter(false);
2718 be->dump(f);
2719 f->flush(bl);
2720
2721 MD5 hash;
2722 // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
2723 hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
2724 hash.Update((const unsigned char *)bl.c_str(), bl.length());
2725 hash.Final(m);
2726
2727 buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, md5);
2728
2729 delete f;
2730
2731 md5_digest = md5;
2732 }
2733
2734 #define ARCHIVE_META_ATTR RGW_ATTR_PREFIX "zone.archive.info"
2735
2736 struct archive_meta_info {
2737 rgw_bucket orig_bucket;
2738
2739 bool from_attrs(CephContext *cct, map<string, bufferlist>& attrs) {
2740 auto iter = attrs.find(ARCHIVE_META_ATTR);
2741 if (iter == attrs.end()) {
2742 return false;
2743 }
2744
2745 auto bliter = iter->second.cbegin();
2746 try {
2747 decode(bliter);
2748 } catch (buffer::error& err) {
2749 ldout(cct, 0) << "ERROR: failed to decode archive meta info" << dendl;
2750 return false;
2751 }
2752
2753 return true;
2754 }
2755
2756 void store_in_attrs(map<string, bufferlist>& attrs) const {
2757 encode(attrs[ARCHIVE_META_ATTR]);
2758 }
2759
2760 void encode(bufferlist& bl) const {
2761 ENCODE_START(1, 1, bl);
2762 encode(orig_bucket, bl);
2763 ENCODE_FINISH(bl);
2764 }
2765
2766 void decode(bufferlist::const_iterator& bl) {
2767 DECODE_START(1, bl);
2768 decode(orig_bucket, bl);
2769 DECODE_FINISH(bl);
2770 }
2771 };
2772 WRITE_CLASS_ENCODER(archive_meta_info)
2773
2774 class RGWArchiveBucketMetadataHandler : public RGWBucketMetadataHandler {
2775 public:
2776 RGWArchiveBucketMetadataHandler() {}
2777
2778 int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker,
2779 optional_yield y) override {
2780 auto cct = svc.bucket->ctx();
2781
2782 RGWSI_Bucket_EP_Ctx ctx(op->ctx());
2783
2784 ldout(cct, 5) << "SKIP: bucket removal is not allowed on archive zone: bucket:" << entry << " ... proceeding to rename" << dendl;
2785
2786 string tenant_name, bucket_name;
2787 parse_bucket(entry, &tenant_name, &bucket_name);
2788 rgw_bucket entry_bucket;
2789 entry_bucket.tenant = tenant_name;
2790 entry_bucket.name = bucket_name;
2791
2792 real_time mtime;
2793
2794 /* read original entrypoint */
2795
2796 RGWBucketEntryPoint be;
2797 map<string, bufferlist> attrs;
2798 int ret = svc.bucket->read_bucket_entrypoint_info(ctx, entry, &be, &objv_tracker, &mtime, &attrs, y);
2799 if (ret < 0) {
2800 return ret;
2801 }
2802
2803 string bi_meta_name = RGWSI_Bucket::get_bi_meta_key(be.bucket);
2804
2805 /* read original bucket instance info */
2806
2807 map<string, bufferlist> attrs_m;
2808 ceph::real_time orig_mtime;
2809 RGWBucketInfo old_bi;
2810
2811 ret = ctl.bucket->read_bucket_instance_info(be.bucket, &old_bi, y, RGWBucketCtl::BucketInstance::GetParams()
2812 .set_mtime(&orig_mtime)
2813 .set_attrs(&attrs_m));
2814 if (ret < 0) {
2815 return ret;
2816 }
2817
2818 archive_meta_info ami;
2819
2820 if (!ami.from_attrs(svc.bucket->ctx(), attrs_m)) {
2821 ami.orig_bucket = old_bi.bucket;
2822 ami.store_in_attrs(attrs_m);
2823 }
2824
2825 /* generate a new bucket instance. We could have avoided this if we could just point a new
2826 * bucket entry point to the old bucket instance, however, due to limitation in the way
2827 * we index buckets under the user, bucket entrypoint and bucket instance of the same
2828 * bucket need to have the same name, so we need to copy the old bucket instance into
2829 * to a new entry with the new name
2830 */
2831
2832 string new_bucket_name;
2833
2834 RGWBucketInfo new_bi = old_bi;
2835 RGWBucketEntryPoint new_be = be;
2836
2837 string md5_digest;
2838
2839 get_md5_digest(&new_be, md5_digest);
2840 new_bucket_name = ami.orig_bucket.name + "-deleted-" + md5_digest;
2841
2842 new_bi.bucket.name = new_bucket_name;
2843 new_bi.objv_tracker.clear();
2844
2845 new_be.bucket.name = new_bucket_name;
2846
2847 ret = ctl.bucket->store_bucket_instance_info(be.bucket, new_bi, y, RGWBucketCtl::BucketInstance::PutParams()
2848 .set_exclusive(false)
2849 .set_mtime(orig_mtime)
2850 .set_attrs(&attrs_m)
2851 .set_orig_info(&old_bi));
2852 if (ret < 0) {
2853 ldout(cct, 0) << "ERROR: failed to put new bucket instance info for bucket=" << new_bi.bucket << " ret=" << ret << dendl;
2854 return ret;
2855 }
2856
2857 /* store a new entrypoint */
2858
2859 RGWObjVersionTracker ot;
2860 ot.generate_new_write_ver(cct);
2861
2862 ret = svc.bucket->store_bucket_entrypoint_info(ctx, RGWSI_Bucket::get_entrypoint_meta_key(new_be.bucket),
2863 new_be, true, mtime, &attrs, nullptr, y);
2864 if (ret < 0) {
2865 ldout(cct, 0) << "ERROR: failed to put new bucket entrypoint for bucket=" << new_be.bucket << " ret=" << ret << dendl;
2866 return ret;
2867 }
2868
2869 /* link new bucket */
2870
2871 ret = ctl.bucket->link_bucket(new_be.owner, new_be.bucket, new_be.creation_time, y, false);
2872 if (ret < 0) {
2873 ldout(cct, 0) << "ERROR: failed to link new bucket for bucket=" << new_be.bucket << " ret=" << ret << dendl;
2874 return ret;
2875 }
2876
2877 /* clean up old stuff */
2878
2879 ret = ctl.bucket->unlink_bucket(be.owner, entry_bucket, y, false);
2880 if (ret < 0) {
2881 lderr(cct) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2882 }
2883
2884 // if (ret == -ECANCELED) it means that there was a race here, and someone
2885 // wrote to the bucket entrypoint just before we removed it. The question is
2886 // whether it was a newly created bucket entrypoint ... in which case we
2887 // should ignore the error and move forward, or whether it is a higher version
2888 // of the same bucket instance ... in which we should retry
2889 ret = svc.bucket->remove_bucket_entrypoint_info(ctx,
2890 RGWSI_Bucket::get_entrypoint_meta_key(be.bucket),
2891 &objv_tracker,
2892 y);
2893 if (ret < 0) {
2894 ldout(cct, 0) << "ERROR: failed to put new bucket entrypoint for bucket=" << new_be.bucket << " ret=" << ret << dendl;
2895 return ret;
2896 }
2897
2898 ret = ctl.bucket->remove_bucket_instance_info(be.bucket, old_bi, y);
2899 if (ret < 0) {
2900 lderr(cct) << "could not delete bucket=" << entry << dendl;
2901 }
2902
2903
2904 /* idempotent */
2905
2906 return 0;
2907 }
2908
2909 int do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
2910 RGWMetadataObject *obj,
2911 RGWObjVersionTracker& objv_tracker,
2912 optional_yield y,
2913 RGWMDLogSyncType type) override {
2914 if (entry.find("-deleted-") != string::npos) {
2915 RGWObjVersionTracker ot;
2916 RGWMetadataObject *robj;
2917 int ret = do_get(op, entry, &robj, y);
2918 if (ret != -ENOENT) {
2919 if (ret < 0) {
2920 return ret;
2921 }
2922 ot.read_version = robj->get_version();
2923 delete robj;
2924
2925 ret = do_remove(op, entry, ot, y);
2926 if (ret < 0) {
2927 return ret;
2928 }
2929 }
2930 }
2931
2932 return RGWBucketMetadataHandler::do_put(op, entry, obj,
2933 objv_tracker, y, type);
2934 }
2935
2936 };
2937
2938 class RGWBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandlerBase {
2939 int read_bucket_instance_entry(RGWSI_Bucket_BI_Ctx& ctx,
2940 const string& entry,
2941 RGWBucketCompleteInfo *bi,
2942 ceph::real_time *pmtime,
2943 optional_yield y) {
2944 return svc.bucket->read_bucket_instance_info(ctx,
2945 entry,
2946 &bi->info,
2947 pmtime, &bi->attrs,
2948 y);
2949 }
2950
2951 public:
2952 struct Svc {
2953 RGWSI_Zone *zone{nullptr};
2954 RGWSI_Bucket *bucket{nullptr};
2955 RGWSI_BucketIndex *bi{nullptr};
2956 } svc;
2957
2958 RGWBucketInstanceMetadataHandler() {}
2959
2960 void init(RGWSI_Zone *zone_svc,
2961 RGWSI_Bucket *bucket_svc,
2962 RGWSI_BucketIndex *bi_svc) {
2963 base_init(bucket_svc->ctx(),
2964 bucket_svc->get_bi_be_handler().get());
2965 svc.zone = zone_svc;
2966 svc.bucket = bucket_svc;
2967 svc.bi = bi_svc;
2968 }
2969
2970 string get_type() override { return "bucket.instance"; }
2971
2972 RGWMetadataObject *get_meta_obj(JSONObj *jo, const obj_version& objv, const ceph::real_time& mtime) override {
2973 RGWBucketCompleteInfo bci;
2974
2975 try {
2976 decode_json_obj(bci, jo);
2977 } catch (JSONDecoder::err& e) {
2978 return nullptr;
2979 }
2980
2981 return new RGWBucketInstanceMetadataObject(bci, objv, mtime);
2982 }
2983
2984 int do_get(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWMetadataObject **obj, optional_yield y) override {
2985 RGWBucketCompleteInfo bci;
2986 real_time mtime;
2987
2988 RGWSI_Bucket_BI_Ctx ctx(op->ctx());
2989
2990 int ret = svc.bucket->read_bucket_instance_info(ctx, entry, &bci.info, &mtime, &bci.attrs, y);
2991 if (ret < 0)
2992 return ret;
2993
2994 RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime);
2995
2996 *obj = mdo;
2997
2998 return 0;
2999 }
3000
3001 int do_put(RGWSI_MetaBackend_Handler::Op *op, string& entry,
3002 RGWMetadataObject *_obj, RGWObjVersionTracker& objv_tracker,
3003 optional_yield y,
3004 RGWMDLogSyncType sync_type) override;
3005
3006 int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker,
3007 optional_yield y) override {
3008 RGWBucketCompleteInfo bci;
3009
3010 RGWSI_Bucket_BI_Ctx ctx(op->ctx());
3011
3012 int ret = read_bucket_instance_entry(ctx, entry, &bci, nullptr, y);
3013 if (ret < 0 && ret != -ENOENT)
3014 return ret;
3015
3016 return svc.bucket->remove_bucket_instance_info(ctx, entry, bci.info, &bci.info.objv_tracker, y);
3017 }
3018
3019 int call(std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
3020 return call(nullopt, f);
3021 }
3022
3023 int call(std::optional<RGWSI_MetaBackend_CtxParams> bectx_params,
3024 std::function<int(RGWSI_Bucket_BI_Ctx& ctx)> f) {
3025 return be_handler->call(bectx_params, [&](RGWSI_MetaBackend_Handler::Op *op) {
3026 RGWSI_Bucket_BI_Ctx ctx(op->ctx());
3027 return f(ctx);
3028 });
3029 }
3030 };
3031
3032 class RGWMetadataHandlerPut_BucketInstance : public RGWMetadataHandlerPut_SObj
3033 {
3034 CephContext *cct;
3035 RGWBucketInstanceMetadataHandler *bihandler;
3036 RGWBucketInstanceMetadataObject *obj;
3037 public:
3038 RGWMetadataHandlerPut_BucketInstance(CephContext *cct,
3039 RGWBucketInstanceMetadataHandler *_handler,
3040 RGWSI_MetaBackend_Handler::Op *_op, string& entry,
3041 RGWMetadataObject *_obj, RGWObjVersionTracker& objv_tracker,
3042 optional_yield y,
3043 RGWMDLogSyncType type) : RGWMetadataHandlerPut_SObj(_handler, _op, entry, obj, objv_tracker, y, type),
3044 bihandler(_handler) {
3045 obj = static_cast<RGWBucketInstanceMetadataObject *>(_obj);
3046
3047 auto& bci = obj->get_bci();
3048 obj->set_pattrs(&bci.attrs);
3049 }
3050
3051 void encode_obj(bufferlist *bl) override {
3052 obj->get_bucket_info().encode(*bl);
3053 }
3054
3055 int put_check() override;
3056 int put_checked() override;
3057 int put_post() override;
3058 };
3059
3060 int RGWBucketInstanceMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op *op,
3061 string& entry,
3062 RGWMetadataObject *obj,
3063 RGWObjVersionTracker& objv_tracker,
3064 optional_yield y,
3065 RGWMDLogSyncType type)
3066 {
3067 RGWMetadataHandlerPut_BucketInstance put_op(svc.bucket->ctx(), this, op, entry, obj,
3068 objv_tracker, y, type);
3069 return do_put_operate(&put_op);
3070 }
3071
3072 int RGWMetadataHandlerPut_BucketInstance::put_check()
3073 {
3074 int ret;
3075
3076 RGWBucketCompleteInfo& bci = obj->get_bci();
3077
3078 RGWBucketInstanceMetadataObject *orig_obj = static_cast<RGWBucketInstanceMetadataObject *>(old_obj);
3079
3080 RGWBucketCompleteInfo *old_bci = (orig_obj ? &orig_obj->get_bci() : nullptr);
3081
3082 bool exists = (!!orig_obj);
3083
3084 if (!exists || old_bci->info.bucket.bucket_id != bci.info.bucket.bucket_id) {
3085 /* a new bucket, we need to select a new bucket placement for it */
3086 string tenant_name;
3087 string bucket_name;
3088 string bucket_instance;
3089 parse_bucket(entry, &tenant_name, &bucket_name, &bucket_instance);
3090
3091 RGWZonePlacementInfo rule_info;
3092 bci.info.bucket.name = bucket_name;
3093 bci.info.bucket.bucket_id = bucket_instance;
3094 bci.info.bucket.tenant = tenant_name;
3095 // if the sync module never writes data, don't require the zone to specify all placement targets
3096 if (bihandler->svc.zone->sync_module_supports_writes()) {
3097 ret = bihandler->svc.zone->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info);
3098 if (ret < 0) {
3099 ldout(cct, 0) << "ERROR: select_bucket_placement() returned " << ret << dendl;
3100 return ret;
3101 }
3102 }
3103 bci.info.index_type = rule_info.index_type;
3104 } else {
3105 /* existing bucket, keep its placement */
3106 bci.info.bucket.explicit_placement = old_bci->info.bucket.explicit_placement;
3107 bci.info.placement_rule = old_bci->info.placement_rule;
3108 }
3109
3110 /* record the read version (if any), store the new version */
3111 bci.info.objv_tracker.read_version = objv_tracker.read_version;
3112 bci.info.objv_tracker.write_version = objv_tracker.write_version;
3113
3114 return 0;
3115 }
3116
3117 int RGWMetadataHandlerPut_BucketInstance::put_checked()
3118 {
3119 RGWBucketInstanceMetadataObject *orig_obj = static_cast<RGWBucketInstanceMetadataObject *>(old_obj);
3120
3121 RGWBucketInfo *orig_info = (orig_obj ? &orig_obj->get_bucket_info() : nullptr);
3122
3123 auto& info = obj->get_bucket_info();
3124 auto mtime = obj->get_mtime();
3125 auto pattrs = obj->get_pattrs();
3126
3127 RGWSI_Bucket_BI_Ctx ctx(op->ctx());
3128
3129 return bihandler->svc.bucket->store_bucket_instance_info(ctx,
3130 entry,
3131 info,
3132 orig_info,
3133 false,
3134 mtime,
3135 pattrs,
3136 y);
3137 }
3138
3139 int RGWMetadataHandlerPut_BucketInstance::put_post()
3140 {
3141 RGWBucketCompleteInfo& bci = obj->get_bci();
3142
3143 objv_tracker = bci.info.objv_tracker;
3144
3145 int ret = bihandler->svc.bi->init_index(bci.info);
3146 if (ret < 0) {
3147 return ret;
3148 }
3149
3150 return STATUS_APPLIED;
3151 }
3152
3153 class RGWArchiveBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler {
3154 public:
3155 RGWArchiveBucketInstanceMetadataHandler() {}
3156
3157 int do_remove(RGWSI_MetaBackend_Handler::Op *op, string& entry, RGWObjVersionTracker& objv_tracker, optional_yield y) override {
3158 ldout(cct, 0) << "SKIP: bucket instance removal is not allowed on archive zone: bucket.instance:" << entry << dendl;
3159 return 0;
3160 }
3161 };
3162
3163 bool RGWBucketCtl::DataLogFilter::filter(const rgw_bucket& bucket, optional_yield y) const
3164 {
3165 return bucket_ctl->bucket_exports_data(bucket, null_yield);
3166 }
3167
3168 RGWBucketCtl::RGWBucketCtl(RGWSI_Zone *zone_svc,
3169 RGWSI_Bucket *bucket_svc,
3170 RGWSI_Bucket_Sync *bucket_sync_svc,
3171 RGWSI_BucketIndex *bi_svc) : cct(zone_svc->ctx()),
3172 datalog_filter(this)
3173 {
3174 svc.zone = zone_svc;
3175 svc.bucket = bucket_svc;
3176 svc.bucket_sync = bucket_sync_svc;
3177 svc.bi = bi_svc;
3178 }
3179
3180 void RGWBucketCtl::init(RGWUserCtl *user_ctl,
3181 RGWBucketMetadataHandler *_bm_handler,
3182 RGWBucketInstanceMetadataHandler *_bmi_handler,
3183 RGWDataChangesLog *datalog)
3184 {
3185 ctl.user = user_ctl;
3186
3187 bm_handler = _bm_handler;
3188 bmi_handler = _bmi_handler;
3189
3190 bucket_be_handler = bm_handler->get_be_handler();
3191 bi_be_handler = bmi_handler->get_be_handler();
3192
3193 datalog->set_bucket_filter(&datalog_filter);
3194 }
3195
3196 int RGWBucketCtl::call(std::function<int(RGWSI_Bucket_X_Ctx& ctx)> f) {
3197 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ep_ctx) {
3198 return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& bi_ctx) {
3199 RGWSI_Bucket_X_Ctx ctx{ep_ctx, bi_ctx};
3200 return f(ctx);
3201 });
3202 });
3203 }
3204
3205 int RGWBucketCtl::read_bucket_entrypoint_info(const rgw_bucket& bucket,
3206 RGWBucketEntryPoint *info,
3207 optional_yield y,
3208 const Bucket::GetParams& params)
3209 {
3210 return bm_handler->call(params.bectx_params, [&](RGWSI_Bucket_EP_Ctx& ctx) {
3211 return svc.bucket->read_bucket_entrypoint_info(ctx,
3212 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
3213 info,
3214 params.objv_tracker,
3215 params.mtime,
3216 params.attrs,
3217 y,
3218 params.cache_info,
3219 params.refresh_version);
3220 });
3221 }
3222
3223 int RGWBucketCtl::store_bucket_entrypoint_info(const rgw_bucket& bucket,
3224 RGWBucketEntryPoint& info,
3225 optional_yield y,
3226 const Bucket::PutParams& params)
3227 {
3228 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ctx) {
3229 return svc.bucket->store_bucket_entrypoint_info(ctx,
3230 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
3231 info,
3232 params.exclusive,
3233 params.mtime,
3234 params.attrs,
3235 params.objv_tracker,
3236 y);
3237 });
3238 }
3239
3240 int RGWBucketCtl::remove_bucket_entrypoint_info(const rgw_bucket& bucket,
3241 optional_yield y,
3242 const Bucket::RemoveParams& params)
3243 {
3244 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ctx) {
3245 return svc.bucket->remove_bucket_entrypoint_info(ctx,
3246 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
3247 params.objv_tracker,
3248 y);
3249 });
3250 }
3251
3252 int RGWBucketCtl::read_bucket_instance_info(const rgw_bucket& bucket,
3253 RGWBucketInfo *info,
3254 optional_yield y,
3255 const BucketInstance::GetParams& params)
3256 {
3257 int ret = bmi_handler->call(params.bectx_params, [&](RGWSI_Bucket_BI_Ctx& ctx) {
3258 return svc.bucket->read_bucket_instance_info(ctx,
3259 RGWSI_Bucket::get_bi_meta_key(bucket),
3260 info,
3261 params.mtime,
3262 params.attrs,
3263 y,
3264 params.cache_info,
3265 params.refresh_version);
3266 });
3267
3268 if (ret < 0) {
3269 return ret;
3270 }
3271
3272 if (params.objv_tracker) {
3273 *params.objv_tracker = info->objv_tracker;
3274 }
3275
3276 return 0;
3277 }
3278
3279 int RGWBucketCtl::read_bucket_info(const rgw_bucket& bucket,
3280 RGWBucketInfo *info,
3281 optional_yield y,
3282 const BucketInstance::GetParams& params,
3283 RGWObjVersionTracker *ep_objv_tracker)
3284 {
3285 const rgw_bucket *b = &bucket;
3286
3287 std::optional<RGWBucketEntryPoint> ep;
3288
3289 if (b->bucket_id.empty()) {
3290 ep.emplace();
3291
3292 int r = read_bucket_entrypoint_info(*b, &(*ep), y, RGWBucketCtl::Bucket::GetParams()
3293 .set_bectx_params(params.bectx_params)
3294 .set_objv_tracker(ep_objv_tracker));
3295 if (r < 0) {
3296 return r;
3297 }
3298
3299 b = &ep->bucket;
3300 }
3301
3302 int ret = bmi_handler->call(params.bectx_params, [&](RGWSI_Bucket_BI_Ctx& ctx) {
3303 return svc.bucket->read_bucket_instance_info(ctx,
3304 RGWSI_Bucket::get_bi_meta_key(*b),
3305 info,
3306 params.mtime,
3307 params.attrs,
3308 y,
3309 params.cache_info,
3310 params.refresh_version);
3311 });
3312
3313 if (ret < 0) {
3314 return ret;
3315 }
3316
3317 if (params.objv_tracker) {
3318 *params.objv_tracker = info->objv_tracker;
3319 }
3320
3321 return 0;
3322 }
3323
3324 int RGWBucketCtl::do_store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
3325 const rgw_bucket& bucket,
3326 RGWBucketInfo& info,
3327 optional_yield y,
3328 const BucketInstance::PutParams& params)
3329 {
3330 if (params.objv_tracker) {
3331 info.objv_tracker = *params.objv_tracker;
3332 }
3333
3334 return svc.bucket->store_bucket_instance_info(ctx,
3335 RGWSI_Bucket::get_bi_meta_key(bucket),
3336 info,
3337 params.orig_info,
3338 params.exclusive,
3339 params.mtime,
3340 params.attrs,
3341 y);
3342 }
3343
3344 int RGWBucketCtl::store_bucket_instance_info(const rgw_bucket& bucket,
3345 RGWBucketInfo& info,
3346 optional_yield y,
3347 const BucketInstance::PutParams& params)
3348 {
3349 return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& ctx) {
3350 return do_store_bucket_instance_info(ctx, bucket, info, y, params);
3351 });
3352 }
3353
3354 int RGWBucketCtl::remove_bucket_instance_info(const rgw_bucket& bucket,
3355 RGWBucketInfo& info,
3356 optional_yield y,
3357 const BucketInstance::RemoveParams& params)
3358 {
3359 if (params.objv_tracker) {
3360 info.objv_tracker = *params.objv_tracker;
3361 }
3362
3363 return bmi_handler->call([&](RGWSI_Bucket_BI_Ctx& ctx) {
3364 return svc.bucket->remove_bucket_instance_info(ctx,
3365 RGWSI_Bucket::get_bi_meta_key(bucket),
3366 info,
3367 &info.objv_tracker,
3368 y);
3369 });
3370 }
3371
3372 int RGWBucketCtl::do_store_linked_bucket_info(RGWSI_Bucket_X_Ctx& ctx,
3373 RGWBucketInfo& info,
3374 RGWBucketInfo *orig_info,
3375 bool exclusive, real_time mtime,
3376 obj_version *pep_objv,
3377 map<string, bufferlist> *pattrs,
3378 bool create_entry_point,
3379 optional_yield y)
3380 {
3381 bool create_head = !info.has_instance_obj || create_entry_point;
3382
3383 int ret = svc.bucket->store_bucket_instance_info(ctx.bi,
3384 RGWSI_Bucket::get_bi_meta_key(info.bucket),
3385 info,
3386 orig_info,
3387 exclusive,
3388 mtime, pattrs,
3389 y);
3390 if (ret < 0) {
3391 return ret;
3392 }
3393
3394 if (!create_head)
3395 return 0; /* done! */
3396
3397 RGWBucketEntryPoint entry_point;
3398 entry_point.bucket = info.bucket;
3399 entry_point.owner = info.owner;
3400 entry_point.creation_time = info.creation_time;
3401 entry_point.linked = true;
3402 RGWObjVersionTracker ot;
3403 if (pep_objv && !pep_objv->tag.empty()) {
3404 ot.write_version = *pep_objv;
3405 } else {
3406 ot.generate_new_write_ver(cct);
3407 if (pep_objv) {
3408 *pep_objv = ot.write_version;
3409 }
3410 }
3411 ret = svc.bucket->store_bucket_entrypoint_info(ctx.ep,
3412 RGWSI_Bucket::get_entrypoint_meta_key(info.bucket),
3413 entry_point,
3414 exclusive,
3415 mtime,
3416 pattrs,
3417 &ot,
3418 y);
3419 if (ret < 0)
3420 return ret;
3421
3422 return 0;
3423 }
3424 int RGWBucketCtl::convert_old_bucket_info(RGWSI_Bucket_X_Ctx& ctx,
3425 const rgw_bucket& bucket,
3426 optional_yield y)
3427 {
3428 RGWBucketEntryPoint entry_point;
3429 real_time ep_mtime;
3430 RGWObjVersionTracker ot;
3431 map<string, bufferlist> attrs;
3432 RGWBucketInfo info;
3433 auto cct = svc.bucket->ctx();
3434
3435 ldout(cct, 10) << "RGWRados::convert_old_bucket_info(): bucket=" << bucket << dendl;
3436
3437 int ret = svc.bucket->read_bucket_entrypoint_info(ctx.ep,
3438 RGWSI_Bucket::get_entrypoint_meta_key(bucket),
3439 &entry_point, &ot, &ep_mtime, &attrs, y);
3440 if (ret < 0) {
3441 ldout(cct, 0) << "ERROR: get_bucket_entrypoint_info() returned " << ret << " bucket=" << bucket << dendl;
3442 return ret;
3443 }
3444
3445 if (!entry_point.has_bucket_info) {
3446 /* already converted! */
3447 return 0;
3448 }
3449
3450 info = entry_point.old_bucket_info;
3451
3452 ot.generate_new_write_ver(cct);
3453
3454 ret = do_store_linked_bucket_info(ctx, info, nullptr, false, ep_mtime, &ot.write_version, &attrs, true, y);
3455 if (ret < 0) {
3456 ldout(cct, 0) << "ERROR: failed to put_linked_bucket_info(): " << ret << dendl;
3457 return ret;
3458 }
3459
3460 return 0;
3461 }
3462
3463 int RGWBucketCtl::set_bucket_instance_attrs(RGWBucketInfo& bucket_info,
3464 map<string, bufferlist>& attrs,
3465 RGWObjVersionTracker *objv_tracker,
3466 optional_yield y)
3467 {
3468 return call([&](RGWSI_Bucket_X_Ctx& ctx) {
3469 rgw_bucket& bucket = bucket_info.bucket;
3470
3471 if (!bucket_info.has_instance_obj) {
3472 /* an old bucket object, need to convert it */
3473 int ret = convert_old_bucket_info(ctx, bucket, y);
3474 if (ret < 0) {
3475 ldout(cct, 0) << "ERROR: failed converting old bucket info: " << ret << dendl;
3476 return ret;
3477 }
3478 }
3479
3480 return do_store_bucket_instance_info(ctx.bi,
3481 bucket,
3482 bucket_info,
3483 y,
3484 BucketInstance::PutParams().set_attrs(&attrs)
3485 .set_objv_tracker(objv_tracker)
3486 .set_orig_info(&bucket_info));
3487 });
3488 }
3489
3490
3491 int RGWBucketCtl::link_bucket(const rgw_user& user_id,
3492 const rgw_bucket& bucket,
3493 ceph::real_time creation_time,
3494 optional_yield y,
3495 bool update_entrypoint,
3496 rgw_ep_info *pinfo)
3497 {
3498 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ctx) {
3499 return do_link_bucket(ctx, user_id, bucket, creation_time, y,
3500 update_entrypoint, pinfo);
3501 });
3502 }
3503
3504 int RGWBucketCtl::do_link_bucket(RGWSI_Bucket_EP_Ctx& ctx,
3505 const rgw_user& user_id,
3506 const rgw_bucket& bucket,
3507 ceph::real_time creation_time,
3508 optional_yield y,
3509 bool update_entrypoint,
3510 rgw_ep_info *pinfo)
3511 {
3512 int ret;
3513
3514 RGWBucketEntryPoint ep;
3515 RGWObjVersionTracker ot;
3516 RGWObjVersionTracker& rot = (pinfo) ? pinfo->ep_objv : ot;
3517 map<string, bufferlist> attrs, *pattrs = nullptr;
3518 string meta_key;
3519
3520 if (update_entrypoint) {
3521 meta_key = RGWSI_Bucket::get_entrypoint_meta_key(bucket);
3522 if (pinfo) {
3523 ep = pinfo->ep;
3524 pattrs = &pinfo->attrs;
3525 } else {
3526 ret = svc.bucket->read_bucket_entrypoint_info(ctx,
3527 meta_key,
3528 &ep, &rot,
3529 nullptr, &attrs,
3530 y);
3531 if (ret < 0 && ret != -ENOENT) {
3532 ldout(cct, 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
3533 << cpp_strerror(-ret) << dendl;
3534 }
3535 pattrs = &attrs;
3536 }
3537 }
3538
3539 ret = ctl.user->add_bucket(user_id, bucket, creation_time);
3540 if (ret < 0) {
3541 ldout(cct, 0) << "ERROR: error adding bucket to user directory:"
3542 << " user=" << user_id
3543 << " bucket=" << bucket
3544 << " err=" << cpp_strerror(-ret)
3545 << dendl;
3546 goto done_err;
3547 }
3548
3549 if (!update_entrypoint)
3550 return 0;
3551
3552 ep.linked = true;
3553 ep.owner = user_id;
3554 ep.bucket = bucket;
3555 ret = svc.bucket->store_bucket_entrypoint_info(
3556 ctx, meta_key, ep, false, real_time(), pattrs, &rot, y);
3557 if (ret < 0)
3558 goto done_err;
3559
3560 return 0;
3561
3562 done_err:
3563 int r = do_unlink_bucket(ctx, user_id, bucket, y, true);
3564 if (r < 0) {
3565 ldout(cct, 0) << "ERROR: failed unlinking bucket on error cleanup: "
3566 << cpp_strerror(-r) << dendl;
3567 }
3568 return ret;
3569 }
3570
3571 int RGWBucketCtl::unlink_bucket(const rgw_user& user_id, const rgw_bucket& bucket, optional_yield y, bool update_entrypoint)
3572 {
3573 return bm_handler->call([&](RGWSI_Bucket_EP_Ctx& ctx) {
3574 return do_unlink_bucket(ctx, user_id, bucket, y, update_entrypoint);
3575 });
3576 }
3577
3578 int RGWBucketCtl::do_unlink_bucket(RGWSI_Bucket_EP_Ctx& ctx,
3579 const rgw_user& user_id,
3580 const rgw_bucket& bucket,
3581 optional_yield y,
3582 bool update_entrypoint)
3583 {
3584 int ret = ctl.user->remove_bucket(user_id, bucket);
3585 if (ret < 0) {
3586 ldout(cct, 0) << "ERROR: error removing bucket from directory: "
3587 << cpp_strerror(-ret)<< dendl;
3588 }
3589
3590 if (!update_entrypoint)
3591 return 0;
3592
3593 RGWBucketEntryPoint ep;
3594 RGWObjVersionTracker ot;
3595 map<string, bufferlist> attrs;
3596 string meta_key = RGWSI_Bucket::get_entrypoint_meta_key(bucket);
3597 ret = svc.bucket->read_bucket_entrypoint_info(ctx, meta_key, &ep, &ot, nullptr, &attrs, y);
3598 if (ret == -ENOENT)
3599 return 0;
3600 if (ret < 0)
3601 return ret;
3602
3603 if (!ep.linked)
3604 return 0;
3605
3606 if (ep.owner != user_id) {
3607 ldout(cct, 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl;
3608 return -EINVAL;
3609 }
3610
3611 ep.linked = false;
3612 return svc.bucket->store_bucket_entrypoint_info(ctx, meta_key, ep, false, real_time(), &attrs, &ot, y);
3613 }
3614
3615 int RGWBucketCtl::set_acl(ACLOwner& owner, rgw_bucket& bucket,
3616 RGWBucketInfo& bucket_info, bufferlist& bl,
3617 optional_yield y)
3618 {
3619 // set owner and acl
3620 bucket_info.owner = owner.get_id();
3621 std::map<std::string, bufferlist> attrs{{RGW_ATTR_ACL, bl}};
3622
3623 int r = store_bucket_instance_info(bucket, bucket_info, y,
3624 BucketInstance::PutParams().set_attrs(&attrs));
3625 if (r < 0) {
3626 cerr << "ERROR: failed to set bucket owner: " << cpp_strerror(-r) << std::endl;
3627 return r;
3628 }
3629
3630 return 0;
3631 }
3632
3633 // TODO: remove RGWRados dependency for bucket listing
3634 int RGWBucketCtl::chown(rgw::sal::RGWRadosStore *store, RGWBucketInfo& bucket_info,
3635 const rgw_user& user_id, const std::string& display_name,
3636 const std::string& marker, optional_yield y)
3637 {
3638 std::vector<rgw_bucket_dir_entry> objs;
3639 map<string, bool> common_prefixes;
3640
3641 RGWRados::Bucket target(store->getRados(), bucket_info);
3642 RGWRados::Bucket::List list_op(&target);
3643
3644 list_op.params.list_versions = true;
3645 list_op.params.allow_unordered = true;
3646 list_op.params.marker = marker;
3647
3648 bool is_truncated = false;
3649 int count = 0;
3650 int max_entries = 1000;
3651
3652 //Loop through objects and update object acls to point to bucket owner
3653
3654 do {
3655 RGWObjectCtx obj_ctx(store);
3656 objs.clear();
3657 int ret = list_op.list_objects(max_entries, &objs, &common_prefixes, &is_truncated, y);
3658 if (ret < 0) {
3659 ldout(store->ctx(), 0) << "ERROR: list objects failed: " << cpp_strerror(-ret) << dendl;
3660 return ret;
3661 }
3662
3663 list_op.params.marker = list_op.get_next_marker();
3664 count += objs.size();
3665
3666 for (const auto& obj : objs) {
3667
3668 rgw_obj r_obj(bucket_info.bucket, obj.key);
3669 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, r_obj);
3670 RGWRados::Object::Read read_op(&op_target);
3671
3672 map<string, bufferlist> attrs;
3673 read_op.params.attrs = &attrs;
3674 ret = read_op.prepare(y);
3675 if (ret < 0){
3676 ldout(store->ctx(), 0) << "ERROR: failed to read object " << obj.key.name << cpp_strerror(-ret) << dendl;
3677 continue;
3678 }
3679 const auto& aiter = attrs.find(RGW_ATTR_ACL);
3680 if (aiter == attrs.end()) {
3681 ldout(store->ctx(), 0) << "ERROR: no acls found for object " << obj.key.name << " .Continuing with next object." << dendl;
3682 continue;
3683 } else {
3684 bufferlist& bl = aiter->second;
3685 RGWAccessControlPolicy policy(store->ctx());
3686 ACLOwner owner;
3687 try {
3688 decode(policy, bl);
3689 owner = policy.get_owner();
3690 } catch (buffer::error& err) {
3691 ldout(store->ctx(), 0) << "ERROR: decode policy failed" << err << dendl;
3692 return -EIO;
3693 }
3694
3695 //Get the ACL from the policy
3696 RGWAccessControlList& acl = policy.get_acl();
3697
3698 //Remove grant that is set to old owner
3699 acl.remove_canon_user_grant(owner.get_id());
3700
3701 //Create a grant and add grant
3702 ACLGrant grant;
3703 grant.set_canon(user_id, display_name, RGW_PERM_FULL_CONTROL);
3704 acl.add_grant(&grant);
3705
3706 //Update the ACL owner to the new user
3707 owner.set_id(user_id);
3708 owner.set_name(display_name);
3709 policy.set_owner(owner);
3710
3711 bl.clear();
3712 encode(policy, bl);
3713
3714 obj_ctx.set_atomic(r_obj);
3715 ret = store->getRados()->set_attr(&obj_ctx, bucket_info, r_obj, RGW_ATTR_ACL, bl);
3716 if (ret < 0) {
3717 ldout(store->ctx(), 0) << "ERROR: modify attr failed " << cpp_strerror(-ret) << dendl;
3718 return ret;
3719 }
3720 }
3721 }
3722 cerr << count << " objects processed in " << bucket_info.bucket.name
3723 << ". Next marker " << list_op.params.marker.name << std::endl;
3724 } while(is_truncated);
3725 return 0;
3726 }
3727
3728 int RGWBucketCtl::read_bucket_stats(const rgw_bucket& bucket,
3729 RGWBucketEnt *result,
3730 optional_yield y)
3731 {
3732 return call([&](RGWSI_Bucket_X_Ctx& ctx) {
3733 return svc.bucket->read_bucket_stats(ctx, bucket, result, y);
3734 });
3735 }
3736
3737 int RGWBucketCtl::read_buckets_stats(map<string, RGWBucketEnt>& m,
3738 optional_yield y)
3739 {
3740 return call([&](RGWSI_Bucket_X_Ctx& ctx) {
3741 return svc.bucket->read_buckets_stats(ctx, m, y);
3742 });
3743 }
3744
3745 int RGWBucketCtl::sync_user_stats(const rgw_user& user_id,
3746 const RGWBucketInfo& bucket_info,
3747 RGWBucketEnt* pent)
3748 {
3749 RGWBucketEnt ent;
3750 if (!pent) {
3751 pent = &ent;
3752 }
3753 int r = svc.bi->read_stats(bucket_info, pent, null_yield);
3754 if (r < 0) {
3755 ldout(cct, 20) << __func__ << "(): failed to read bucket stats (r=" << r << ")" << dendl;
3756 return r;
3757 }
3758
3759 return ctl.user->flush_bucket_stats(user_id, *pent);
3760 }
3761
3762 int RGWBucketCtl::get_sync_policy_handler(std::optional<rgw_zone_id> zone,
3763 std::optional<rgw_bucket> bucket,
3764 RGWBucketSyncPolicyHandlerRef *phandler,
3765 optional_yield y)
3766 {
3767 int r = call([&](RGWSI_Bucket_X_Ctx& ctx) {
3768 return svc.bucket_sync->get_policy_handler(ctx, zone, bucket, phandler, y);
3769 });
3770 if (r < 0) {
3771 ldout(cct, 20) << __func__ << "(): failed to get policy handler for bucket=" << bucket << " (r=" << r << ")" << dendl;
3772 return r;
3773 }
3774 return 0;
3775 }
3776
3777 int RGWBucketCtl::bucket_exports_data(const rgw_bucket& bucket,
3778 optional_yield y)
3779 {
3780
3781 RGWBucketSyncPolicyHandlerRef handler;
3782
3783 int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y);
3784 if (r < 0) {
3785 return r;
3786 }
3787
3788 return handler->bucket_exports_data();
3789 }
3790
3791 int RGWBucketCtl::bucket_imports_data(const rgw_bucket& bucket,
3792 optional_yield y)
3793 {
3794
3795 RGWBucketSyncPolicyHandlerRef handler;
3796
3797 int r = get_sync_policy_handler(std::nullopt, bucket, &handler, y);
3798 if (r < 0) {
3799 return r;
3800 }
3801
3802 return handler->bucket_imports_data();
3803 }
3804
3805 RGWBucketMetadataHandlerBase *RGWBucketMetaHandlerAllocator::alloc()
3806 {
3807 return new RGWBucketMetadataHandler();
3808 }
3809
3810 RGWBucketInstanceMetadataHandlerBase *RGWBucketInstanceMetaHandlerAllocator::alloc()
3811 {
3812 return new RGWBucketInstanceMetadataHandler();
3813 }
3814
3815 RGWBucketMetadataHandlerBase *RGWArchiveBucketMetaHandlerAllocator::alloc()
3816 {
3817 return new RGWArchiveBucketMetadataHandler();
3818 }
3819
3820 RGWBucketInstanceMetadataHandlerBase *RGWArchiveBucketInstanceMetaHandlerAllocator::alloc()
3821 {
3822 return new RGWArchiveBucketInstanceMetadataHandler();
3823 }
3824