]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_bucket.cc
bump version to 12.2.12-pve1
[ceph.git] / ceph / src / rgw / rgw_bucket.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <errno.h>
5
6#include <string>
7#include <map>
8#include <sstream>
9
10#include <boost/utility/string_ref.hpp>
11#include <boost/format.hpp>
12
13#include "common/errno.h"
14#include "common/ceph_json.h"
181888fb 15#include "common/backport14.h"
f64942e4 16#include "include/scope_guard.h"
7c673cae
FG
17#include "rgw_rados.h"
18#include "rgw_acl.h"
19#include "rgw_acl_s3.h"
20
21#include "include/types.h"
22#include "rgw_bucket.h"
23#include "rgw_user.h"
24#include "rgw_string.h"
224ce89b 25#include "rgw_multi.h"
7c673cae
FG
26
27#include "include/rados/librados.hpp"
28// until everything is moved from rgw_common
29#include "rgw_common.h"
f64942e4 30#include "rgw_reshard.h"
7c673cae
FG
31#include "cls/user/cls_user_types.h"
32
33#define dout_context g_ceph_context
34#define dout_subsys ceph_subsys_rgw
35
36#define BUCKET_TAG_TIMEOUT 30
37
38using namespace std;
39
40static RGWMetadataHandler *bucket_meta_handler = NULL;
41static RGWMetadataHandler *bucket_instance_meta_handler = NULL;
42
43// define as static when RGWBucket implementation compete
44void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id)
45{
46 buckets_obj_id = user_id.to_str();
47 buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX;
48}
49
50/*
51 * Note that this is not a reversal of parse_bucket(). That one deals
52 * with the syntax we need in metadata and such. This one deals with
53 * the representation in RADOS pools. We chose '/' because it's not
54 * acceptable in bucket names and thus qualified buckets cannot conflict
55 * with the legacy or S3 buckets.
56 */
std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
                                       const std::string& bucket_name) {
  // No bucket name means no entry, regardless of tenant.
  if (bucket_name.empty()) {
    return std::string();
  }

  // Legacy (tenant-less) buckets are stored under their bare name.
  if (tenant_name.empty()) {
    return bucket_name;
  }

  // Qualified form: "<tenant>/<bucket>".
  std::string qualified(tenant_name);
  qualified += "/";
  qualified += bucket_name;
  return qualified;
}
71
72/*
73 * Tenants are separated from buckets in URLs by a colon in S3.
74 * This function is not to be used on Swift URLs, not even for COPY arguments.
75 */
/**
 * Split an S3 URL bucket specifier of the form "[tenant:]bucket".
 *
 * @param bucket       the specifier as it appeared in the URL
 * @param auth_tenant  tenant to fall back to when no ':' is present
 * @param tenant_name  [out] tenant part (may be empty for ":bucket")
 * @param bucket_name  [out] bucket part (everything after the first ':')
 */
void rgw_parse_url_bucket(const std::string &bucket, const std::string& auth_tenant,
                          std::string &tenant_name, std::string &bucket_name) {

  // Keep the position as size_type and compare against npos; storing it in
  // an int would truncate offsets on platforms where size_t is wider.
  const std::string::size_type pos = bucket.find(':');
  if (pos != std::string::npos) {
    /*
     * N.B.: We allow ":bucket" syntax with explicit empty tenant in order
     * to refer to the legacy tenant, in case users in new named tenants
     * want to access old global buckets.
     */
    tenant_name = bucket.substr(0, pos);
    bucket_name = bucket.substr(pos + 1);
  } else {
    tenant_name = auth_tenant;
    bucket_name = bucket;
  }
}
93
94/**
95 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
96 * Returns: 0 on success, -ERR# on failure.
97 */
98int rgw_read_user_buckets(RGWRados * store,
99 const rgw_user& user_id,
100 RGWUserBuckets& buckets,
101 const string& marker,
102 const string& end_marker,
103 uint64_t max,
104 bool need_stats,
105 bool *is_truncated,
106 uint64_t default_amount)
107{
108 int ret;
109 buckets.clear();
3efd9988 110 std::string buckets_obj_id;
7c673cae
FG
111 rgw_get_buckets_obj(user_id, buckets_obj_id);
112 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
7c673cae
FG
113
114 bool truncated = false;
115 string m = marker;
116
117 uint64_t total = 0;
118
119 if (!max) {
120 max = default_amount;
121 }
122
123 do {
3efd9988 124 std::list<cls_user_bucket_entry> entries;
7c673cae 125 ret = store->cls_user_list_buckets(obj, m, end_marker, max - total, entries, &m, &truncated);
3efd9988 126 if (ret == -ENOENT) {
7c673cae 127 ret = 0;
3efd9988 128 }
7c673cae 129
3efd9988 130 if (ret < 0) {
7c673cae 131 return ret;
3efd9988 132 }
7c673cae 133
3efd9988
FG
134 for (auto& entry : entries) {
135 buckets.add(RGWBucketEnt(user_id, std::move(entry)));
7c673cae
FG
136 total++;
137 }
138
139 } while (truncated && total < max);
140
141 if (is_truncated != nullptr) {
142 *is_truncated = truncated;
143 }
144
145 if (need_stats) {
146 map<string, RGWBucketEnt>& m = buckets.get_buckets();
147 ret = store->update_containers_stats(m);
148 if (ret < 0 && ret != -ENOENT) {
149 ldout(store->ctx(), 0) << "ERROR: could not get stats for buckets" << dendl;
150 return ret;
151 }
152 }
153 return 0;
154}
155
156int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info)
157{
158 string buckets_obj_id;
159 rgw_get_buckets_obj(user_id, buckets_obj_id);
160 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
161
162 return store->cls_user_sync_bucket_stats(obj, bucket_info);
163}
164
165int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name)
166{
167 RGWBucketInfo bucket_info;
168 RGWObjectCtx obj_ctx(store);
169 int ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL);
170 if (ret < 0) {
171 ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl;
172 return ret;
173 }
174
175 ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info);
176 if (ret < 0) {
177 ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl;
178 return ret;
179 }
180
181 return 0;
182}
183
3efd9988
FG
184int rgw_link_bucket(RGWRados* const store,
185 const rgw_user& user_id,
186 rgw_bucket& bucket,
187 ceph::real_time creation_time,
188 bool update_entrypoint)
7c673cae
FG
189{
190 int ret;
191 string& tenant_name = bucket.tenant;
192 string& bucket_name = bucket.name;
193
194 cls_user_bucket_entry new_bucket;
195
196 RGWBucketEntryPoint ep;
197 RGWObjVersionTracker ot;
198
199 bucket.convert(&new_bucket.bucket);
200 new_bucket.size = 0;
201 if (real_clock::is_zero(creation_time))
202 new_bucket.creation_time = real_clock::now();
203 else
204 new_bucket.creation_time = creation_time;
205
206 map<string, bufferlist> attrs;
207 RGWObjectCtx obj_ctx(store);
208
209 if (update_entrypoint) {
210 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
211 if (ret < 0 && ret != -ENOENT) {
212 ldout(store->ctx(), 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
213 << cpp_strerror(-ret) << dendl;
214 }
215 }
216
217 string buckets_obj_id;
218 rgw_get_buckets_obj(user_id, buckets_obj_id);
219
220 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
221 ret = store->cls_user_add_bucket(obj, new_bucket);
222 if (ret < 0) {
223 ldout(store->ctx(), 0) << "ERROR: error adding bucket to directory: "
224 << cpp_strerror(-ret) << dendl;
225 goto done_err;
226 }
227
228 if (!update_entrypoint)
229 return 0;
230
231 ep.linked = true;
232 ep.owner = user_id;
233 ep.bucket = bucket;
234 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
235 if (ret < 0)
236 goto done_err;
237
238 return 0;
239done_err:
240 int r = rgw_unlink_bucket(store, user_id, bucket.tenant, bucket.name);
241 if (r < 0) {
242 ldout(store->ctx(), 0) << "ERROR: failed unlinking bucket on error cleanup: "
243 << cpp_strerror(-r) << dendl;
244 }
245 return ret;
246}
247
248int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, const string& tenant_name, const string& bucket_name, bool update_entrypoint)
249{
250 int ret;
251
252 string buckets_obj_id;
253 rgw_get_buckets_obj(user_id, buckets_obj_id);
254
255 cls_user_bucket bucket;
256 bucket.name = bucket_name;
257 rgw_raw_obj obj(store->get_zone_params().user_uid_pool, buckets_obj_id);
258 ret = store->cls_user_remove_bucket(obj, bucket);
259 if (ret < 0) {
260 ldout(store->ctx(), 0) << "ERROR: error removing bucket from directory: "
261 << cpp_strerror(-ret)<< dendl;
262 }
263
264 if (!update_entrypoint)
265 return 0;
266
267 RGWBucketEntryPoint ep;
268 RGWObjVersionTracker ot;
269 map<string, bufferlist> attrs;
270 RGWObjectCtx obj_ctx(store);
271 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
272 if (ret == -ENOENT)
273 return 0;
274 if (ret < 0)
275 return ret;
276
277 if (!ep.linked)
278 return 0;
279
280 if (ep.owner != user_id) {
281 ldout(store->ctx(), 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl;
282 return -EINVAL;
283 }
284
285 ep.linked = false;
286 return store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
287}
288
289int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
290 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
291 real_time mtime) {
292 return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs);
293}
294
295int rgw_bucket_instance_store_info(RGWRados *store, string& entry, bufferlist& bl, bool exclusive,
296 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
297 real_time mtime) {
298 return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs);
299}
300
f64942e4
AA
301int rgw_bucket_instance_remove_entry(RGWRados *store, const string& entry,
302 RGWObjVersionTracker *objv_tracker) {
7c673cae
FG
303 return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
304}
305
306// 'tenant/' is used in bucket instance keys for sync to avoid parsing ambiguity
307// with the existing instance[:shard] format. once we parse the shard, the / is
308// replaced with a : to match the [tenant:]instance format
void rgw_bucket_instance_key_to_oid(std::string& key)
{
  // Only the first '/' (the tenant separator) is rewritten to ':'.
  const std::string::size_type slash = key.find('/');
  if (slash == std::string::npos) {
    return;
  }
  key[slash] = ':';
}
317
318// convert bucket instance oids back to the tenant/ format for metadata keys.
319// it's safe to parse 'tenant:' only for oids, because they won't contain the
320// optional :shard at the end
void rgw_bucket_instance_oid_to_key(std::string& oid)
{
  // The first ':' is either tenant:bucket or bucket:instance.
  const std::string::size_type first = oid.find(':');
  if (first == std::string::npos) {
    return;
  }

  // Only when a second ':' follows was the first one the tenant separator.
  const bool has_second = oid.find(':', first + 1) != std::string::npos;
  if (has_second) {
    oid[first] = '/';
  }
}
332
333int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
334{
335 ssize_t pos = bucket_instance.rfind(':');
336 if (pos < 0) {
337 return -EINVAL;
338 }
339
340 string first = bucket_instance.substr(0, pos);
341 string second = bucket_instance.substr(pos + 1);
342
343 if (first.find(':') == string::npos) {
344 *shard_id = -1;
345 *target_bucket_instance = bucket_instance;
346 return 0;
347 }
348
349 *target_bucket_instance = first;
350 string err;
351 *shard_id = strict_strtol(second.c_str(), 10, &err);
352 if (!err.empty()) {
353 return -EINVAL;
354 }
355
356 return 0;
357}
358
359// parse key in format: [tenant/]name:instance[:shard_id]
360int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
361 rgw_bucket *bucket, int *shard_id)
362{
363 boost::string_ref name{key};
364 boost::string_ref instance;
365
366 // split tenant/name
367 auto pos = name.find('/');
368 if (pos != boost::string_ref::npos) {
369 auto tenant = name.substr(0, pos);
370 bucket->tenant.assign(tenant.begin(), tenant.end());
371 name = name.substr(pos + 1);
372 }
373
374 // split name:instance
375 pos = name.find(':');
376 if (pos != boost::string_ref::npos) {
377 instance = name.substr(pos + 1);
378 name = name.substr(0, pos);
379 }
380 bucket->name.assign(name.begin(), name.end());
381
382 // split instance:shard
383 pos = instance.find(':');
384 if (pos == boost::string_ref::npos) {
385 bucket->bucket_id.assign(instance.begin(), instance.end());
386 *shard_id = -1;
387 return 0;
388 }
389
390 // parse shard id
391 auto shard = instance.substr(pos + 1);
392 string err;
393 auto id = strict_strtol(shard.data(), 10, &err);
394 if (!err.empty()) {
395 ldout(cct, 0) << "ERROR: failed to parse bucket shard '"
396 << instance.data() << "': " << err << dendl;
397 return -EINVAL;
398 }
399
400 *shard_id = id;
401 instance = instance.substr(0, pos);
402 bucket->bucket_id.assign(instance.begin(), instance.end());
403 return 0;
404}
405
406int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
407 map<string, bufferlist>& attrs,
408 RGWObjVersionTracker *objv_tracker)
409{
410 rgw_bucket& bucket = bucket_info.bucket;
411
412 if (!bucket_info.has_instance_obj) {
413 /* an old bucket object, need to convert it */
414 RGWObjectCtx obj_ctx(store);
415 int ret = store->convert_old_bucket_info(obj_ctx, bucket.tenant, bucket.name);
416 if (ret < 0) {
417 ldout(store->ctx(), 0) << "ERROR: failed converting old bucket info: " << ret << dendl;
418 return ret;
419 }
420 }
421
422 /* we want the bucket instance name without the oid prefix cruft */
423 string key = bucket.get_key();
424 bufferlist bl;
425
426 ::encode(bucket_info, bl);
427
428 return rgw_bucket_instance_store_info(store, key, bl, false, &attrs, objv_tracker, real_time());
429}
430
431static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
432 Formatter *f)
433{
d2e6a577
FG
434 for (const auto& o : objs_to_unlink) {
435 f->dump_string("object", o.name);
7c673cae 436 }
7c673cae
FG
437}
438
439void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id,
440 bool fix)
441{
442 RGWUserBuckets user_buckets;
443 bool is_truncated = false;
444 string marker;
445
446 CephContext *cct = store->ctx();
447
448 size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
449
450 do {
451 int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker,
452 string(), max_entries, false,
453 &is_truncated);
454 if (ret < 0) {
455 ldout(store->ctx(), 0) << "failed to read user buckets: "
456 << cpp_strerror(-ret) << dendl;
457 return;
458 }
459
460 map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
461 for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
462 i != buckets.end();
463 ++i) {
464 marker = i->first;
465
466 RGWBucketEnt& bucket_ent = i->second;
467 rgw_bucket& bucket = bucket_ent.bucket;
468
469 RGWBucketInfo bucket_info;
470 real_time mtime;
471 RGWObjectCtx obj_ctx(store);
472 int r = store->get_bucket_info(obj_ctx, user_id.tenant, bucket.name, bucket_info, &mtime);
473 if (r < 0) {
474 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
475 continue;
476 }
477
478 rgw_bucket& actual_bucket = bucket_info.bucket;
479
480 if (actual_bucket.name.compare(bucket.name) != 0 ||
481 actual_bucket.tenant.compare(bucket.tenant) != 0 ||
482 actual_bucket.marker.compare(bucket.marker) != 0 ||
483 actual_bucket.bucket_id.compare(bucket.bucket_id) != 0) {
484 cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
485 if (fix) {
486 cout << "fixing" << std::endl;
3efd9988
FG
487 r = rgw_link_bucket(store, user_id, actual_bucket,
488 bucket_info.creation_time);
7c673cae
FG
489 if (r < 0) {
490 cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
491 }
492 }
493 }
494 }
495 } while (is_truncated);
496}
497
498static bool bucket_object_check_filter(const string& oid)
499{
500 rgw_obj_key key;
501 string ns;
502 return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
503}
504
505int rgw_remove_object(RGWRados *store, RGWBucketInfo& bucket_info, rgw_bucket& bucket, rgw_obj_key& key)
506{
507 RGWObjectCtx rctx(store);
508
509 if (key.instance.empty()) {
510 key.instance = "null";
511 }
512
513 rgw_obj obj(bucket, key);
514
515 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
516}
517
518int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children)
519{
520 int ret;
521 map<RGWObjCategory, RGWStorageStats> stats;
522 std::vector<rgw_bucket_dir_entry> objs;
523 map<string, bool> common_prefixes;
524 RGWBucketInfo info;
525 RGWObjectCtx obj_ctx(store);
526
527 string bucket_ver, master_ver;
528
529 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
530 if (ret < 0)
531 return ret;
532
533 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
534 if (ret < 0)
535 return ret;
536
7c673cae
FG
537 RGWRados::Bucket target(store, info);
538 RGWRados::Bucket::List list_op(&target);
224ce89b
WB
539 CephContext *cct = store->ctx();
540 int max = 1000;
7c673cae
FG
541
542 list_op.params.list_versions = true;
a8e16298 543 list_op.params.allow_unordered = true;
7c673cae 544
a8e16298 545 bool is_truncated = false;
224ce89b
WB
546 do {
547 objs.clear();
7c673cae 548
a8e16298 549 ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated);
224ce89b
WB
550 if (ret < 0)
551 return ret;
7c673cae 552
224ce89b
WB
553 if (!objs.empty() && !delete_children) {
554 lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl;
555 return -ENOTEMPTY;
556 }
557
558 for (const auto& obj : objs) {
559 rgw_obj_key key(obj.key);
560 ret = rgw_remove_object(store, info, bucket, key);
a8e16298 561 if (ret < 0 && ret != -ENOENT) {
7c673cae 562 return ret;
a8e16298 563 }
224ce89b 564 }
a8e16298 565 } while(is_truncated);
7c673cae 566
224ce89b 567 string prefix, delimiter;
7c673cae 568
224ce89b
WB
569 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
570 if (ret < 0) {
571 return ret;
7c673cae
FG
572 }
573
b32b8144 574 ret = rgw_bucket_sync_user_stats(store, info.owner, info);
7c673cae
FG
575 if ( ret < 0) {
576 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
577 }
578
579 RGWObjVersionTracker objv_tracker;
580
a8e16298
TL
581 // if we deleted children above we will force delete, as any that
582 // remain is detrius from a prior bug
583 ret = store->delete_bucket(info, objv_tracker, !delete_children);
7c673cae 584 if (ret < 0) {
a8e16298
TL
585 lderr(store->ctx()) << "ERROR: could not remove bucket " <<
586 bucket.name << dendl;
7c673cae
FG
587 return ret;
588 }
589
590 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
591 if (ret < 0) {
592 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
593 }
594
595 return ret;
596}
597
598static int aio_wait(librados::AioCompletion *handle)
599{
600 librados::AioCompletion *c = (librados::AioCompletion *)handle;
601 c->wait_for_safe();
602 int ret = c->get_return_value();
603 c->release();
604 return ret;
605}
606
607static int drain_handles(list<librados::AioCompletion *>& pending)
608{
609 int ret = 0;
610 while (!pending.empty()) {
611 librados::AioCompletion *handle = pending.front();
612 pending.pop_front();
613 int r = aio_wait(handle);
614 if (r < 0) {
615 ret = r;
616 }
617 }
618 return ret;
619}
620
/**
 * Remove a bucket and its objects by issuing asynchronous deletes directly.
 *
 * Steps, in order: fetch bucket info and stats, abort multipart uploads,
 * list all object versions, issue aio deletes for every manifest part and
 * then the head object, drain outstanding completions, sync user stats
 * (failure is only a warning), delete the bucket and unlink it from its
 * owner.
 *
 * concurrent_max seeds the max_aio counter that decides when pending
 * completions are drained; keep_index_consistent is forwarded to
 * delete_obj_aio().
 *
 * Returns 0 on success or the first negative error encountered.
 *
 * NOTE(review): early error returns inside the listing loop leave `handles`
 * undrained -- confirm whether those completions need releasing.
 * NOTE(review): entries whose state has no manifest are not deleted by the
 * loop below; presumably delete_bucket() is relied on for those -- confirm.
 */
621int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket,
622 int concurrent_max, bool keep_index_consistent)
623{
624 int ret;
625 map<RGWObjCategory, RGWStorageStats> stats;
626 std::vector<rgw_bucket_dir_entry> objs;
627 map<string, bool> common_prefixes;
628 RGWBucketInfo info;
629 RGWObjectCtx obj_ctx(store);
224ce89b 630 CephContext *cct = store->ctx();
7c673cae
FG
631
632 string bucket_ver, master_ver;
633
634 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
635 if (ret < 0)
636 return ret;
637
638 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
639 if (ret < 0)
640 return ret;
641
224ce89b
WB
642 string prefix, delimiter;
643
// multipart uploads are aborted before any object is listed or deleted
644 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
645 if (ret < 0) {
646 return ret;
647 }
7c673cae
FG
648
649 RGWRados::Bucket target(store, info);
650 RGWRados::Bucket::List list_op(&target);
651
652 list_op.params.list_versions = true;
a8e16298 653 list_op.params.allow_unordered = true;
7c673cae
FG
654
// completions of in-flight deletes; emptied by drain_handles()
655 std::list<librados::AioCompletion*> handles;
656
657 int max = 1000;
// countdown until the next drain; reset to concurrent_max after each drain
658 int max_aio = concurrent_max;
a8e16298
TL
659 bool is_truncated = true;
660
661 while (is_truncated) {
662 objs.clear();
663 ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated);
664 if (ret < 0)
665 return ret;
7c673cae 666
7c673cae
FG
667 std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
668 for (; it != objs.end(); ++it) {
669 RGWObjState *astate = NULL;
670 rgw_obj obj(bucket, (*it).key);
671
672 ret = store->get_obj_state(&obj_ctx, info, obj, &astate, false);
673 if (ret == -ENOENT) {
674 dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
675 continue;
676 }
677 if (ret < 0) {
678 lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl;
679 return ret;
680 }
681
682 if (astate->has_manifest) {
683 RGWObjManifest& manifest = astate->manifest;
684 RGWObjManifest::obj_iterator miter = manifest.obj_begin();
685 rgw_obj head_obj = manifest.get_obj();
686 rgw_raw_obj raw_head_obj;
687 store->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
688
689
// The loop condition both tests and decrements max_aio. When it reaches
// zero inside the body, pending deletes are drained and the counter reset.
// NOTE(review): if concurrent_max is 0 the condition is false on entry and
// no manifest part is deleted here -- confirm callers never pass 0.
690 for (; miter != manifest.obj_end() && max_aio--; ++miter) {
691 if (!max_aio) {
692 ret = drain_handles(handles);
693 if (ret < 0) {
694 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
695 return ret;
696 }
697 max_aio = concurrent_max;
698 }
699
700 rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store);
701 if (last_obj == raw_head_obj) {
702 // have the head obj deleted at the end
703 continue;
704 }
705
706 ret = store->delete_raw_obj_aio(last_obj, handles);
707 if (ret < 0) {
708 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
709 return ret;
710 }
711 } // for all shadow objs
712
// head object is deleted after all of its manifest parts were submitted
713 ret = store->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent);
714 if (ret < 0) {
715 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
716 return ret;
717 }
718 }
719
720 if (!max_aio) {
721 ret = drain_handles(handles);
722 if (ret < 0) {
723 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
724 return ret;
725 }
726 max_aio = concurrent_max;
727 }
728 } // for all RGW objects
7c673cae
FG
729 }
730
// wait for whatever is still in flight before touching the bucket itself
731 ret = drain_handles(handles);
732 if (ret < 0) {
733 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
734 return ret;
735 }
736
b32b8144 737 ret = rgw_bucket_sync_user_stats(store, info.owner, info);
7c673cae
FG
738 if (ret < 0) {
739 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
740 }
741
742 RGWObjVersionTracker objv_tracker;
743
a8e16298
TL
744 // this function can only be run if caller wanted children to be
745 // deleted, so we can ignore the check for children as any that
746 // remain are detritus from a prior bug
747 ret = store->delete_bucket(info, objv_tracker, false);
7c673cae 748 if (ret < 0) {
b32b8144 749 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl;
7c673cae
FG
750 return ret;
751 }
752
7c673cae
FG
// the directory entry is removed without touching the entry point (false)
753 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
754 if (ret < 0) {
755 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
756 }
757
758 return ret;
759}
760
761int rgw_bucket_delete_bucket_obj(RGWRados *store,
762 const string& tenant_name,
763 const string& bucket_name,
764 RGWObjVersionTracker& objv_tracker)
765{
766 string key;
767
768 rgw_make_bucket_entry_name(tenant_name, bucket_name, key);
769 return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker);
770}
771
/**
 * Store msg in *sink, unless sink is null or msg is empty (in which case
 * *sink is left untouched).
 */
static void set_err_msg(std::string *sink, std::string msg)
{
  if (sink == nullptr || msg.empty()) {
    return;
  }
  // msg is already our own copy; hand it over instead of copying again
  *sink = std::move(msg);
}
777
778int RGWBucket::init(RGWRados *storage, RGWBucketAdminOpState& op_state)
779{
780 if (!storage)
781 return -EINVAL;
782
783 store = storage;
784
785 rgw_user user_id = op_state.get_user_id();
786 tenant = user_id.tenant;
787 bucket_name = op_state.get_bucket_name();
788 RGWUserBuckets user_buckets;
789 RGWObjectCtx obj_ctx(store);
790
791 if (bucket_name.empty() && user_id.empty())
792 return -EINVAL;
793
794 if (!bucket_name.empty()) {
795 int r = store->get_bucket_info(obj_ctx, tenant, bucket_name, bucket_info, NULL);
796 if (r < 0) {
797 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket_name << dendl;
798 return r;
799 }
800
801 op_state.set_bucket(bucket_info.bucket);
802 }
803
804 if (!user_id.empty()) {
805 int r = rgw_get_user_info_by_uid(store, user_id, user_info);
806 if (r < 0)
807 return r;
808
809 op_state.display_name = user_info.display_name;
810 }
811
812 clear_failure();
813 return 0;
814}
815
/**
 * Link the bucket in op_state to the user loaded in user_info.
 *
 * Requires a user op and a non-empty bucket instance id (-EINVAL otherwise).
 * The bucket instance info and attrs are loaded; if an ACL attr is present,
 * the bucket is unlinked from the owner recorded in that ACL, a default
 * policy for the new user is created, the bucket owner is updated, the new
 * ACL is written to both the entry point object and the bucket instance
 * object, and finally the bucket is linked to the new user.
 *
 * If the instance has no ACL attr, nothing is changed and 0 is returned.
 * On failure a negative error code is returned; err_msg is filled in only
 * on the paths that call set_err_msg().
 *
 * NOTE(review): the instance key and the entry point object name are built
 * from bucket.name only, without bucket.tenant -- confirm this is intended
 * for tenanted buckets.
 */
816int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg)
817{
818 if (!op_state.is_user_op()) {
819 set_err_msg(err_msg, "empty user id");
820 return -EINVAL;
821 }
822
823 string bucket_id = op_state.get_bucket_id();
824 if (bucket_id.empty()) {
825 set_err_msg(err_msg, "empty bucket instance id");
826 return -EINVAL;
827 }
828
829 std::string display_name = op_state.get_user_display_name();
830 rgw_bucket bucket = op_state.get_bucket();
831
832 const rgw_pool& root_pool = store->get_zone_params().domain_root;
// entry point object: named after the bucket, in the domain root pool
833 rgw_raw_obj obj(root_pool, bucket.name);
// the same tracker is passed to both system_obj_set_attr() calls below
834 RGWObjVersionTracker objv_tracker;
835
836 map<string, bufferlist> attrs;
// local bucket_info; shadows any member of the same name
837 RGWBucketInfo bucket_info;
838
839 string key = bucket.name + ":" + bucket_id;
840 RGWObjectCtx obj_ctx(store);
841 int r = store->get_bucket_instance_info(obj_ctx, key, bucket_info, NULL, &attrs);
842 if (r < 0) {
843 return r;
844 }
845
846 rgw_user user_id = op_state.get_user_id();
847
848 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
849 if (aiter != attrs.end()) {
850 bufferlist aclbl = aiter->second;
851 RGWAccessControlPolicy policy;
852 ACLOwner owner;
853 try {
854 bufferlist::iterator iter = aclbl.begin();
855 ::decode(policy, iter);
856 owner = policy.get_owner();
857 } catch (buffer::error& err) {
858 set_err_msg(err_msg, "couldn't decode policy");
859 return -EIO;
860 }
861
// drop the bucket from the previous owner's directory; entry point untouched
862 r = rgw_unlink_bucket(store, owner.get_id(), bucket.tenant, bucket.name, false);
863 if (r < 0) {
864 set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
865 return r;
866 }
867
868 // now update the user for the bucket...
869 if (display_name.empty()) {
870 ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
871 }
872 policy.create_default(user_info.user_id, display_name);
873
874 owner = policy.get_owner();
31f18b77 875 r = store->set_bucket_owner(bucket_info.bucket, owner);
7c673cae
FG
876 if (r < 0) {
877 set_err_msg(err_msg, "failed to set bucket owner: " + cpp_strerror(-r));
878 return r;
879 }
880
881 // ...and encode the acl
882 aclbl.clear();
883 policy.encode(aclbl);
884
885 r = store->system_obj_set_attr(NULL, obj, RGW_ATTR_ACL, aclbl, &objv_tracker);
224ce89b 886 if (r < 0) {
7c673cae 887 return r;
224ce89b 888 }
7c673cae
FG
889
// a second default policy is written to the bucket instance object
890 RGWAccessControlPolicy policy_instance;
891 policy_instance.create_default(user_info.user_id, display_name);
892 aclbl.clear();
893 policy_instance.encode(aclbl);
894
895 string oid_bucket_instance = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
896 rgw_raw_obj obj_bucket_instance(root_pool, oid_bucket_instance);
897 r = store->system_obj_set_attr(NULL, obj_bucket_instance, RGW_ATTR_ACL, aclbl, &objv_tracker);
224ce89b
WB
898 if (r < 0) {
899 return r;
900 }
7c673cae 901
3efd9988
FG
// a zero creation time is passed to rgw_link_bucket() here
902 r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket,
903 ceph::real_time());
224ce89b 904 if (r < 0) {
7c673cae 905 return r;
224ce89b 906 }
7c673cae
FG
907 }
908
909 return 0;
910}
911
912int RGWBucket::unlink(RGWBucketAdminOpState& op_state, std::string *err_msg)
913{
914 rgw_bucket bucket = op_state.get_bucket();
915
916 if (!op_state.is_user_op()) {
917 set_err_msg(err_msg, "could not fetch user or user bucket info");
918 return -EINVAL;
919 }
920
921 int r = rgw_unlink_bucket(store, user_info.user_id, bucket.tenant, bucket.name);
922 if (r < 0) {
923 set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
924 }
925
926 return r;
927}
928
94b18763
FG
929int RGWBucket::set_quota(RGWBucketAdminOpState& op_state, std::string *err_msg)
930{
931 rgw_bucket bucket = op_state.get_bucket();
932 RGWBucketInfo bucket_info;
933 map<string, bufferlist> attrs;
934 RGWObjectCtx obj_ctx(store);
935 int r = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs);
936 if (r < 0) {
937 set_err_msg(err_msg, "could not get bucket info for bucket=" + bucket.name + ": " + cpp_strerror(-r));
938 return r;
939 }
940
941 bucket_info.quota = op_state.quota;
942 r = store->put_bucket_instance_info(bucket_info, false, real_time(), &attrs);
943 if (r < 0) {
944 set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
945 return r;
946 }
947 return r;
948}
949
7c673cae
FG
950int RGWBucket::remove(RGWBucketAdminOpState& op_state, bool bypass_gc,
951 bool keep_index_consistent, std::string *err_msg)
952{
953 bool delete_children = op_state.will_delete_children();
954 rgw_bucket bucket = op_state.get_bucket();
955 int ret;
956
957 if (bypass_gc) {
958 if (delete_children) {
959 ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent);
960 } else {
961 set_err_msg(err_msg, "purge objects should be set for gc to be bypassed");
962 return -EINVAL;
963 }
964 } else {
965 ret = rgw_remove_bucket(store, bucket, delete_children);
966 }
967
968 if (ret < 0) {
969 set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret));
970 return ret;
971 }
972
973 return 0;
974}
975
976int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg)
977{
978 rgw_bucket bucket = op_state.get_bucket();
979 std::string object_name = op_state.get_object_name();
980
981 rgw_obj_key key(object_name);
982
983 int ret = rgw_remove_object(store, bucket_info, bucket, key);
984 if (ret < 0) {
985 set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
986 return ret;
987 }
988
989 return 0;
990}
991
992static void dump_bucket_index(map<string, rgw_bucket_dir_entry> result, Formatter *f)
993{
994 map<string, rgw_bucket_dir_entry>::iterator iter;
995 for (iter = result.begin(); iter != result.end(); ++iter) {
996 f->dump_string("object", iter->first);
997 }
998}
999
1000static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Formatter *formatter)
1001{
1002 map<RGWObjCategory, RGWStorageStats>::iterator iter;
1003
1004 formatter->open_object_section("usage");
1005 for (iter = stats.begin(); iter != stats.end(); ++iter) {
1006 RGWStorageStats& s = iter->second;
1007 const char *cat_name = rgw_obj_category_name(iter->first);
1008 formatter->open_object_section(cat_name);
1009 s.dump(formatter);
1010 formatter->close_section();
1011 }
1012 formatter->close_section();
1013}
1014
1015static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats,
1016 map<RGWObjCategory, RGWStorageStats> calculated_stats,
1017 Formatter *formatter)
1018{
1019 formatter->open_object_section("check_result");
1020 formatter->open_object_section("existing_header");
1021 dump_bucket_usage(existing_stats, formatter);
1022 formatter->close_section();
1023 formatter->open_object_section("calculated_header");
1024 dump_bucket_usage(calculated_stats, formatter);
1025 formatter->close_section();
1026 formatter->close_section();
1027}
1028
1029int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
d2e6a577 1030 RGWFormatterFlusher& flusher ,std::string *err_msg)
7c673cae
FG
1031{
1032 bool fix_index = op_state.will_fix_index();
1033 rgw_bucket bucket = op_state.get_bucket();
1034
d2e6a577 1035 size_t max = 1000;
7c673cae
FG
1036
1037 map<string, bool> common_prefixes;
1038
1039 bool is_truncated;
1040 map<string, bool> meta_objs;
1041 map<rgw_obj_index_key, string> all_objs;
1042
1043 RGWBucketInfo bucket_info;
1044 RGWObjectCtx obj_ctx(store);
1045 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr);
1046 if (r < 0) {
1047 ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
1048 return r;
1049 }
1050
1051 RGWRados::Bucket target(store, bucket_info);
1052 RGWRados::Bucket::List list_op(&target);
1053
1054 list_op.params.list_versions = true;
224ce89b 1055 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
7c673cae
FG
1056
1057 do {
1058 vector<rgw_bucket_dir_entry> result;
1059 int r = list_op.list_objects(max, &result, &common_prefixes, &is_truncated);
1060 if (r < 0) {
1061 set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
1062 " err=" + cpp_strerror(-r));
1063
1064 return r;
1065 }
1066
1067 vector<rgw_bucket_dir_entry>::iterator iter;
1068 for (iter = result.begin(); iter != result.end(); ++iter) {
31f18b77
FG
1069 rgw_obj_index_key key = iter->key;
1070 rgw_obj obj(bucket, key);
1071 string oid = obj.get_oid();
7c673cae
FG
1072
1073 int pos = oid.find_last_of('.');
1074 if (pos < 0) {
1075 /* obj has no suffix */
1076 all_objs[key] = oid;
1077 } else {
1078 /* obj has suffix */
1079 string name = oid.substr(0, pos);
1080 string suffix = oid.substr(pos + 1);
1081
1082 if (suffix.compare("meta") == 0) {
1083 meta_objs[name] = true;
1084 } else {
1085 all_objs[key] = name;
1086 }
1087 }
1088 }
1089
1090 } while (is_truncated);
1091
d2e6a577
FG
1092 list<rgw_obj_index_key> objs_to_unlink;
1093 Formatter *f = flusher.get_formatter();
1094
1095 f->open_array_section("invalid_multipart_entries");
1096
7c673cae
FG
1097 for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) {
1098 string& name = aiter->second;
1099
7c673cae
FG
1100 if (meta_objs.find(name) == meta_objs.end()) {
1101 objs_to_unlink.push_back(aiter->first);
1102 }
7c673cae 1103
d2e6a577
FG
1104 if (objs_to_unlink.size() > max) {
1105 if (fix_index) {
1106 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1107 if (r < 0) {
1108 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1109 cpp_strerror(-r));
1110 return r;
1111 }
1112 }
1113
1114 dump_mulipart_index_results(objs_to_unlink, flusher.get_formatter());
1115 flusher.flush();
1116 objs_to_unlink.clear();
1117 }
1118 }
7c673cae
FG
1119
1120 if (fix_index) {
1121 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1122 if (r < 0) {
1123 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1124 cpp_strerror(-r));
1125
1126 return r;
1127 }
1128 }
1129
d2e6a577
FG
1130 dump_mulipart_index_results(objs_to_unlink, f);
1131 f->close_section();
1132 flusher.flush();
1133
7c673cae
FG
1134 return 0;
1135}
1136
1137int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
1138 RGWFormatterFlusher& flusher,
1139 std::string *err_msg)
1140{
1141
1142 bool fix_index = op_state.will_fix_index();
1143
1144 rgw_bucket bucket = op_state.get_bucket();
1145
1146 if (!fix_index) {
1147 set_err_msg(err_msg, "check-objects flag requires fix index enabled");
1148 return -EINVAL;
1149 }
1150
1151 store->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT);
1152
1153 string prefix;
1154 rgw_obj_index_key marker;
1155 bool is_truncated = true;
1156
1157 Formatter *formatter = flusher.get_formatter();
1158 formatter->open_object_section("objects");
1159 while (is_truncated) {
1160 map<string, rgw_bucket_dir_entry> result;
1161
1adf2230
AA
1162 int r = store->cls_bucket_list_ordered(bucket_info, RGW_NO_SHARD,
1163 marker, prefix, 1000, true,
1164 result, &is_truncated, &marker,
1165 bucket_object_check_filter);
7c673cae
FG
1166 if (r == -ENOENT) {
1167 break;
1168 } else if (r < 0 && r != -ENOENT) {
1169 set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
1170 }
1171
7c673cae
FG
1172 dump_bucket_index(result, formatter);
1173 flusher.flush();
7c673cae
FG
1174 }
1175
1176 formatter->close_section();
1177
1178 store->cls_obj_set_bucket_tag_timeout(bucket_info, 0);
1179
1180 return 0;
1181}
1182
1183
1184int RGWBucket::check_index(RGWBucketAdminOpState& op_state,
1185 map<RGWObjCategory, RGWStorageStats>& existing_stats,
1186 map<RGWObjCategory, RGWStorageStats>& calculated_stats,
1187 std::string *err_msg)
1188{
1189 rgw_bucket bucket = op_state.get_bucket();
1190 bool fix_index = op_state.will_fix_index();
1191
1192 int r = store->bucket_check_index(bucket_info, &existing_stats, &calculated_stats);
1193 if (r < 0) {
1194 set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
1195 return r;
1196 }
1197
1198 if (fix_index) {
1199 r = store->bucket_rebuild_index(bucket_info);
1200 if (r < 0) {
1201 set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
1202 return r;
1203 }
1204 }
1205
1206 return 0;
1207}
1208
1209
1210int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
1211{
1212 RGWAccessControlPolicy_S3 policy(g_ceph_context);
1213 bufferlist::iterator iter = bl.begin();
1214 try {
1215 policy.decode(iter);
1216 } catch (buffer::error& err) {
1217 dout(0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
1218 return -EIO;
1219 }
1220 policy.to_xml(o);
1221 return 0;
1222}
1223
1224static int policy_decode(RGWRados *store, bufferlist& bl, RGWAccessControlPolicy& policy)
1225{
1226 bufferlist::iterator iter = bl.begin();
1227 try {
1228 policy.decode(iter);
1229 } catch (buffer::error& err) {
1230 ldout(store->ctx(), 0) << "ERROR: caught buffer::error, could not decode policy" << dendl;
1231 return -EIO;
1232 }
1233 return 0;
1234}
1235
1236int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy)
1237{
1238 std::string object_name = op_state.get_object_name();
1239 rgw_bucket bucket = op_state.get_bucket();
1240 RGWObjectCtx obj_ctx(store);
1241
1242 RGWBucketInfo bucket_info;
1243 map<string, bufferlist> attrs;
1244 int ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs);
1245 if (ret < 0) {
1246 return ret;
1247 }
1248
1249 if (!object_name.empty()) {
1250 bufferlist bl;
1251 rgw_obj obj(bucket, object_name);
1252
1253 RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
1254 RGWRados::Object::Read rop(&op_target);
1255
1256 int ret = rop.get_attr(RGW_ATTR_ACL, bl);
1257 if (ret < 0)
1258 return ret;
1259
1260 return policy_decode(store, bl, policy);
1261 }
1262
1263 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
1264 if (aiter == attrs.end()) {
1265 return -ENOENT;
1266 }
1267
1268 return policy_decode(store, aiter->second, policy);
1269}
1270
1271
1272int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1273 RGWAccessControlPolicy& policy)
1274{
1275 RGWBucket bucket;
1276
1277 int ret = bucket.init(store, op_state);
1278 if (ret < 0)
1279 return ret;
1280
1281 ret = bucket.get_policy(op_state, policy);
1282 if (ret < 0)
1283 return ret;
1284
1285 return 0;
1286}
1287
1288/* Wrappers to facilitate RESTful interface */
1289
1290
1291int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1292 RGWFormatterFlusher& flusher)
1293{
1294 RGWAccessControlPolicy policy(store->ctx());
1295
1296 int ret = get_policy(store, op_state, policy);
1297 if (ret < 0)
1298 return ret;
1299
1300 Formatter *formatter = flusher.get_formatter();
1301
1302 flusher.start(0);
1303
1304 formatter->open_object_section("policy");
1305 policy.dump(formatter);
1306 formatter->close_section();
1307
1308 flusher.flush();
1309
1310 return 0;
1311}
1312
1313int RGWBucketAdminOp::dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1314 ostream& os)
1315{
1316 RGWAccessControlPolicy_S3 policy(store->ctx());
1317
1318 int ret = get_policy(store, op_state, policy);
1319 if (ret < 0)
1320 return ret;
1321
1322 policy.to_xml(os);
1323
1324 return 0;
1325}
1326
1327int RGWBucketAdminOp::unlink(RGWRados *store, RGWBucketAdminOpState& op_state)
1328{
1329 RGWBucket bucket;
1330
1331 int ret = bucket.init(store, op_state);
1332 if (ret < 0)
1333 return ret;
1334
1335 return bucket.unlink(op_state);
1336}
1337
1338int RGWBucketAdminOp::link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err)
1339{
1340 RGWBucket bucket;
1341
1342 int ret = bucket.init(store, op_state);
1343 if (ret < 0)
1344 return ret;
1345
1346 return bucket.link(op_state, err);
1347
1348}
1349
1350int RGWBucketAdminOp::check_index(RGWRados *store, RGWBucketAdminOpState& op_state,
1351 RGWFormatterFlusher& flusher)
1352{
1353 int ret;
7c673cae
FG
1354 map<RGWObjCategory, RGWStorageStats> existing_stats;
1355 map<RGWObjCategory, RGWStorageStats> calculated_stats;
d2e6a577 1356
7c673cae
FG
1357
1358 RGWBucket bucket;
1359
1360 ret = bucket.init(store, op_state);
1361 if (ret < 0)
1362 return ret;
1363
1364 Formatter *formatter = flusher.get_formatter();
1365 flusher.start(0);
1366
d2e6a577 1367 ret = bucket.check_bad_index_multipart(op_state, flusher);
7c673cae
FG
1368 if (ret < 0)
1369 return ret;
1370
7c673cae
FG
1371 ret = bucket.check_object_index(op_state, flusher);
1372 if (ret < 0)
1373 return ret;
1374
1375 ret = bucket.check_index(op_state, existing_stats, calculated_stats);
1376 if (ret < 0)
1377 return ret;
1378
1379 dump_index_check(existing_stats, calculated_stats, formatter);
1380 flusher.flush();
1381
1382 return 0;
1383}
1384
1385int RGWBucketAdminOp::remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state,
1386 bool bypass_gc, bool keep_index_consistent)
1387{
1388 RGWBucket bucket;
1389
1390 int ret = bucket.init(store, op_state);
1391 if (ret < 0)
1392 return ret;
1393
c07f9fc5
FG
1394 std::string err_msg;
1395 ret = bucket.remove(op_state, bypass_gc, keep_index_consistent, &err_msg);
1396 if (!err_msg.empty()) {
1397 lderr(store->ctx()) << "ERROR: " << err_msg << dendl;
1398 }
1399 return ret;
7c673cae
FG
1400}
1401
1402int RGWBucketAdminOp::remove_object(RGWRados *store, RGWBucketAdminOpState& op_state)
1403{
1404 RGWBucket bucket;
1405
1406 int ret = bucket.init(store, op_state);
1407 if (ret < 0)
1408 return ret;
1409
1410 return bucket.remove_object(op_state);
1411}
1412
1413static int bucket_stats(RGWRados *store, const std::string& tenant_name, std::string& bucket_name, Formatter *formatter)
1414{
1415 RGWBucketInfo bucket_info;
1416 map<RGWObjCategory, RGWStorageStats> stats;
1417
1418 real_time mtime;
1419 RGWObjectCtx obj_ctx(store);
1420 int r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, &mtime);
1421 if (r < 0)
1422 return r;
1423
1424 rgw_bucket& bucket = bucket_info.bucket;
1425
1426 string bucket_ver, master_ver;
1427 string max_marker;
1428 int ret = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
1429 if (ret < 0) {
1430 cerr << "error getting bucket stats ret=" << ret << std::endl;
1431 return ret;
1432 }
1433
1434 utime_t ut(mtime);
1435
1436 formatter->open_object_section("stats");
1437 formatter->dump_string("bucket", bucket.name);
31f18b77
FG
1438 formatter->dump_string("zonegroup", bucket_info.zonegroup);
1439 formatter->dump_string("placement_rule", bucket_info.placement_rule);
1440 ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
7c673cae
FG
1441 formatter->dump_string("id", bucket.bucket_id);
1442 formatter->dump_string("marker", bucket.marker);
1443 formatter->dump_stream("index_type") << bucket_info.index_type;
1444 ::encode_json("owner", bucket_info.owner, formatter);
1445 formatter->dump_string("ver", bucket_ver);
1446 formatter->dump_string("master_ver", master_ver);
1447 formatter->dump_stream("mtime") << ut;
1448 formatter->dump_string("max_marker", max_marker);
1449 dump_bucket_usage(stats, formatter);
1450 encode_json("bucket_quota", bucket_info.quota, formatter);
1451 formatter->close_section();
1452
1453 return 0;
1454}
1455
1456int RGWBucketAdminOp::limit_check(RGWRados *store,
1457 RGWBucketAdminOpState& op_state,
1458 const std::list<std::string>& user_ids,
1459 RGWFormatterFlusher& flusher,
1460 bool warnings_only)
1461{
1462 int ret = 0;
1463 const size_t max_entries =
1464 store->ctx()->_conf->rgw_list_buckets_max_chunk;
1465
1466 const size_t safe_max_objs_per_shard =
1467 store->ctx()->_conf->rgw_safe_max_objects_per_shard;
1468
1469 uint16_t shard_warn_pct =
1470 store->ctx()->_conf->rgw_shard_warning_threshold;
1471 if (shard_warn_pct > 100)
1472 shard_warn_pct = 90;
1473
1474 Formatter *formatter = flusher.get_formatter();
1475 flusher.start(0);
1476
1477 formatter->open_array_section("users");
1478
1479 for (const auto& user_id : user_ids) {
a8e16298 1480
7c673cae
FG
1481 formatter->open_object_section("user");
1482 formatter->dump_string("user_id", user_id);
7c673cae 1483 formatter->open_array_section("buckets");
a8e16298
TL
1484
1485 string marker;
1486 bool is_truncated{false};
7c673cae
FG
1487 do {
1488 RGWUserBuckets buckets;
7c673cae
FG
1489
1490 ret = rgw_read_user_buckets(store, user_id, buckets,
1491 marker, string(), max_entries, false,
1492 &is_truncated);
1493 if (ret < 0)
1494 return ret;
1495
1496 map<string, RGWBucketEnt>& m_buckets = buckets.get_buckets();
1497
1498 for (const auto& iter : m_buckets) {
1499 auto& bucket = iter.second.bucket;
1500 uint32_t num_shards = 1;
1501 uint64_t num_objects = 0;
1502
1503 /* need info for num_shards */
1504 RGWBucketInfo info;
1505 RGWObjectCtx obj_ctx(store);
1506
1507 marker = bucket.name; /* Casey's location for marker update,
1508 * as we may now not reach the end of
1509 * the loop body */
1510
1511 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name,
1512 info, nullptr);
1513 if (ret < 0)
1514 continue;
1515
1516 /* need stats for num_entries */
1517 string bucket_ver, master_ver;
1518 std::map<RGWObjCategory, RGWStorageStats> stats;
1519 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver,
1520 &master_ver, stats, nullptr);
1521
1522 if (ret < 0)
1523 continue;
1524
1525 for (const auto& s : stats) {
1526 num_objects += s.second.num_objects;
1527 }
1528
1529 num_shards = info.num_shards;
31f18b77
FG
1530 uint64_t objs_per_shard =
1531 (num_shards) ? num_objects/num_shards : num_objects;
7c673cae
FG
1532 {
1533 bool warn = false;
1534 stringstream ss;
1535 if (objs_per_shard > safe_max_objs_per_shard) {
1536 double over =
1537 100 - (safe_max_objs_per_shard/objs_per_shard * 100);
1538 ss << boost::format("OVER %4f%%") % over;
1539 warn = true;
1540 } else {
1541 double fill_pct =
1542 objs_per_shard / safe_max_objs_per_shard * 100;
1543 if (fill_pct >= shard_warn_pct) {
1544 ss << boost::format("WARN %4f%%") % fill_pct;
1545 warn = true;
1546 } else {
1547 ss << "OK";
1548 }
1549 }
1550
1551 if (warn || (! warnings_only)) {
1552 formatter->open_object_section("bucket");
1553 formatter->dump_string("bucket", bucket.name);
1554 formatter->dump_string("tenant", bucket.tenant);
1555 formatter->dump_int("num_objects", num_objects);
1556 formatter->dump_int("num_shards", num_shards);
1557 formatter->dump_int("objects_per_shard", objs_per_shard);
1558 formatter->dump_string("fill_status", ss.str());
1559 formatter->close_section();
1560 }
1561 }
1562 }
a8e16298
TL
1563 formatter->flush(cout);
1564 } while (is_truncated); /* foreach: bucket */
7c673cae
FG
1565
1566 formatter->close_section();
1567 formatter->close_section();
1568 formatter->flush(cout);
1569
1570 } /* foreach: user_id */
1571
1572 formatter->close_section();
1573 formatter->flush(cout);
1574
1575 return ret;
1576} /* RGWBucketAdminOp::limit_check */
1577
1578int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
1579 RGWFormatterFlusher& flusher)
1580{
1581 RGWBucket bucket;
1582 int ret;
1583
1584 string bucket_name = op_state.get_bucket_name();
1585
1586 if (!bucket_name.empty()) {
1587 ret = bucket.init(store, op_state);
1588 if (ret < 0)
1589 return ret;
1590 }
1591
1592 Formatter *formatter = flusher.get_formatter();
1593 flusher.start(0);
1594
1595 CephContext *cct = store->ctx();
1596
1597 const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
1598
1599 bool show_stats = op_state.will_fetch_stats();
1600 rgw_user user_id = op_state.get_user_id();
1601 if (op_state.is_user_op()) {
1602 formatter->open_array_section("buckets");
1603
1604 RGWUserBuckets buckets;
1605 string marker;
1606 bool is_truncated = false;
1607
1608 do {
1609 ret = rgw_read_user_buckets(store, op_state.get_user_id(), buckets,
1610 marker, string(), max_entries, false,
1611 &is_truncated);
1612 if (ret < 0)
1613 return ret;
1614
1615 map<string, RGWBucketEnt>& m = buckets.get_buckets();
1616 map<string, RGWBucketEnt>::iterator iter;
1617
1618 for (iter = m.begin(); iter != m.end(); ++iter) {
1619 std::string obj_name = iter->first;
1620 if (show_stats)
1621 bucket_stats(store, user_id.tenant, obj_name, formatter);
1622 else
1623 formatter->dump_string("bucket", obj_name);
1624
1625 marker = obj_name;
1626 }
1627
1628 flusher.flush();
1629 } while (is_truncated);
1630
1631 formatter->close_section();
1632 } else if (!bucket_name.empty()) {
1633 bucket_stats(store, user_id.tenant, bucket_name, formatter);
1634 } else {
1635 RGWAccessHandle handle;
1636
1637 formatter->open_array_section("buckets");
1638 if (store->list_buckets_init(&handle) >= 0) {
1639 rgw_bucket_dir_entry obj;
1640 while (store->list_buckets_next(obj, &handle) >= 0) {
1641 if (show_stats)
1642 bucket_stats(store, user_id.tenant, obj.key.name, formatter);
1643 else
1644 formatter->dump_string("bucket", obj.key.name);
1645 }
1646 }
1647
1648 formatter->close_section();
1649 }
1650
1651 flusher.flush();
1652
1653 return 0;
1654}
1655
94b18763
FG
1656int RGWBucketAdminOp::set_quota(RGWRados *store, RGWBucketAdminOpState& op_state)
1657{
1658 RGWBucket bucket;
1659
1660 int ret = bucket.init(store, op_state);
1661 if (ret < 0)
1662 return ret;
1663 return bucket.set_quota(op_state);
1664}
7c673cae 1665
f64942e4
AA
1666static int purge_bucket_instance(RGWRados *store, const RGWBucketInfo& bucket_info)
1667{
1668 int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
1669 for (int i = 0; i < max_shards; i++) {
1670 RGWRados::BucketShard bs(store);
1671 int shard_id = (bucket_info.num_shards > 0 ? i : -1);
1672 int ret = bs.init(bucket_info.bucket, shard_id, nullptr);
1673 if (ret < 0) {
1674 cerr << "ERROR: bs.init(bucket=" << bucket_info.bucket << ", shard=" << shard_id
1675 << "): " << cpp_strerror(-ret) << std::endl;
1676 return ret;
1677 }
1678 ret = store->bi_remove(bs);
1679 if (ret < 0) {
1680 cerr << "ERROR: failed to remove bucket index object: "
1681 << cpp_strerror(-ret) << std::endl;
1682 return ret;
1683 }
1684 }
1685 return 0;
1686}
1687
/*
 * Split "tenant/bucket" at the first '/' into (tenant, bucket).
 * A name without '/' yields an empty tenant and the name unchanged.
 */
inline auto split_tenant(const std::string& bucket_name) ->
  std::pair<std::string,std::string>
{
  const std::string::size_type slash = bucket_name.find('/');
  if (slash == std::string::npos) {
    return std::make_pair(std::string(), bucket_name);
  }
  return std::make_pair(bucket_name.substr(0, slash),
                        bucket_name.substr(slash + 1));
}
1697
1698using bucket_instance_ls = std::vector<RGWBucketInfo>;
1699void get_stale_instances(RGWRados *store, const std::string& bucket_name,
1700 const vector<std::string>& lst,
1701 bucket_instance_ls& stale_instances)
1702{
1703
1704 RGWObjectCtx obj_ctx(store);
1705
1706 bucket_instance_ls other_instances;
1707// first iterate over the entries, and pick up the done buckets; these
1708// are guaranteed to be stale
1709 for (const auto& bucket_instance : lst){
1710 RGWBucketInfo binfo;
1711 int r = store->get_bucket_instance_info(obj_ctx, bucket_instance,
1712 binfo, nullptr,nullptr);
1713 if (r < 0){
1714 // this can only happen if someone deletes us right when we're processing
1715 lderr(store->ctx()) << "Bucket instance is invalid: " << bucket_instance
1716 << cpp_strerror(-r) << dendl;
1717 continue;
1718 }
1719 if (binfo.reshard_status == CLS_RGW_RESHARD_DONE)
1720 stale_instances.emplace_back(std::move(binfo));
1721 else {
1722 other_instances.emplace_back(std::move(binfo));
1723 }
1724 }
1725
1726 // Read the cur bucket info, if the bucket doesn't exist we can simply return
1727 // all the instances
1728 std::string tenant,bucket;
1729 std::tie(tenant, bucket) = split_tenant(bucket_name);
1730 RGWBucketInfo cur_bucket_info;
1731 int r = store->get_bucket_info(obj_ctx, tenant, bucket, cur_bucket_info, nullptr);
1732 if (r < 0) {
1733 if (r == -ENOENT) {
1734 // bucket doesn't exist, everything is stale then
1735 stale_instances.insert(std::end(stale_instances),
1736 std::make_move_iterator(other_instances.begin()),
1737 std::make_move_iterator(other_instances.end()));
1738 } else {
1739 // all bets are off if we can't read the bucket, just return the sureshot stale instances
1740 lderr(store->ctx()) << "error: reading bucket info for bucket: "
1741 << bucket << cpp_strerror(-r) << dendl;
1742 }
1743 return;
1744 }
1745
1746 // Don't process further in this round if bucket is resharding
1747 if (cur_bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS)
1748 return;
1749
1750 other_instances.erase(std::remove_if(other_instances.begin(), other_instances.end(),
1751 [&cur_bucket_info](const RGWBucketInfo& b){
1752 return (b.bucket.bucket_id == cur_bucket_info.bucket.bucket_id ||
1753 b.bucket.bucket_id == cur_bucket_info.new_bucket_instance_id);
1754 }),
1755 other_instances.end());
1756
1757 // check if there are still instances left
1758 if (other_instances.empty()) {
1759 return;
1760 }
1761
1762 // Now we have a bucket with instances where the reshard status is none, this
1763 // usually happens when the reshard process couldn't complete, lockdown the
1764 // bucket and walk through these instances to make sure no one else interferes
1765 // with these
1766 {
1767 RGWBucketReshardLock reshard_lock(store, cur_bucket_info, true);
1768 r = reshard_lock.lock();
1769 if (r < 0) {
1770 // most likely bucket is under reshard, return the sureshot stale instances
1771 ldout(store->ctx(), 5) << __func__
1772 << "failed to take reshard lock; reshard underway likey" << dendl;
1773 return;
1774 }
1775 auto sg = make_scope_guard([&reshard_lock](){ reshard_lock.unlock();} );
1776 // this should be fast enough that we may not need to renew locks and check
1777 // exit status?, should we read the values of the instances again?
1778 stale_instances.insert(std::end(stale_instances),
1779 std::make_move_iterator(other_instances.begin()),
1780 std::make_move_iterator(other_instances.end()));
1781 }
1782
1783 return;
1784}
1785
1786static int process_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_state,
1787 RGWFormatterFlusher& flusher,
1788 std::function<void(const bucket_instance_ls&,
1789 Formatter *,
1790 RGWRados*)> process_f)
1791{
1792 std::string marker;
1793 void *handle;
1794 Formatter *formatter = flusher.get_formatter();
1795 static constexpr auto default_max_keys = 1000;
1796
1797 int ret = store->meta_mgr->list_keys_init("bucket.instance", marker, &handle);
1798 if (ret < 0) {
1799 cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
1800 return ret;
1801 }
1802
1803 bool truncated;
1804
1805 formatter->open_array_section("keys");
1806
1807 do {
1808 list<std::string> keys;
1809
1810 ret = store->meta_mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
1811 if (ret < 0 && ret != -ENOENT) {
1812 cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
1813 return ret;
1814 } if (ret != -ENOENT) {
1815 // partition the list of buckets by buckets as the listing is un sorted,
1816 // since it would minimize the reads to bucket_info
1817 std::unordered_map<std::string, std::vector<std::string>> bucket_instance_map;
1818 for (auto &key: keys) {
1819 auto pos = key.find(':');
1820 if(pos != std::string::npos)
1821 bucket_instance_map[key.substr(0,pos)].emplace_back(std::move(key));
1822 }
1823 for (const auto& kv: bucket_instance_map) {
1824 bucket_instance_ls stale_lst;
1825 get_stale_instances(store, kv.first, kv.second, stale_lst);
1826 process_f(stale_lst, formatter, store);
1827 }
1828 }
1829 } while (truncated);
1830
1831 formatter->close_section(); // keys
1832 formatter->flush(cout);
1833 return 0;
1834}
1835
1836int RGWBucketAdminOp::list_stale_instances(RGWRados *store,
1837 RGWBucketAdminOpState& op_state,
1838 RGWFormatterFlusher& flusher)
1839{
1840 auto process_f = [](const bucket_instance_ls& lst,
1841 Formatter *formatter,
1842 RGWRados*){
1843 for (const auto& binfo: lst)
1844 formatter->dump_string("key", binfo.bucket.get_key());
1845 };
1846 return process_stale_instances(store, op_state, flusher, process_f);
1847}
1848
1849
1850int RGWBucketAdminOp::clear_stale_instances(RGWRados *store,
1851 RGWBucketAdminOpState& op_state,
1852 RGWFormatterFlusher& flusher)
1853{
1854 auto process_f = [](const bucket_instance_ls& lst,
1855 Formatter *formatter,
1856 RGWRados *store){
1857 for (const auto &binfo: lst) {
1858 int ret = purge_bucket_instance(store, binfo);
1859 if (ret == 0){
1860 auto md_key = "bucket.instance:" + binfo.bucket.get_key();
1861 ret = store->meta_mgr->remove(md_key);
1862 }
1863 formatter->open_object_section("delete_status");
1864 formatter->dump_string("bucket_instance", binfo.bucket.get_key());
1865 formatter->dump_int("status", -ret);
1866 formatter->close_section();
1867 }
1868 };
1869
1870 return process_stale_instances(store, op_state, flusher, process_f);
1871}
1872
7c673cae
FG
1873void rgw_data_change::dump(Formatter *f) const
1874{
1875 string type;
1876 switch (entity_type) {
1877 case ENTITY_TYPE_BUCKET:
1878 type = "bucket";
1879 break;
1880 default:
1881 type = "unknown";
1882 }
1883 encode_json("entity_type", type, f);
1884 encode_json("key", key, f);
1885 utime_t ut(timestamp);
1886 encode_json("timestamp", ut, f);
1887}
1888
1889void rgw_data_change::decode_json(JSONObj *obj) {
1890 string s;
1891 JSONDecoder::decode_json("entity_type", s, obj);
1892 if (s == "bucket") {
1893 entity_type = ENTITY_TYPE_BUCKET;
1894 } else {
1895 entity_type = ENTITY_TYPE_UNKNOWN;
1896 }
1897 JSONDecoder::decode_json("key", key, obj);
1898 utime_t ut;
1899 JSONDecoder::decode_json("timestamp", ut, obj);
1900 timestamp = ut.to_real_time();
1901}
1902
1903void rgw_data_change_log_entry::dump(Formatter *f) const
1904{
1905 encode_json("log_id", log_id, f);
1906 utime_t ut(log_timestamp);
1907 encode_json("log_timestamp", ut, f);
1908 encode_json("entry", entry, f);
1909}
1910
1911void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
1912 JSONDecoder::decode_json("log_id", log_id, obj);
1913 utime_t ut;
1914 JSONDecoder::decode_json("log_timestamp", ut, obj);
1915 log_timestamp = ut.to_real_time();
1916 JSONDecoder::decode_json("entry", entry, obj);
1917}
1918
1919int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
1920 const string& name = bs.bucket.name;
1921 int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
1922 uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
1923
1924 return (int)r;
1925}
1926
1927int RGWDataChangesLog::renew_entries()
1928{
1929 if (!store->need_to_log_data())
1930 return 0;
1931
1932 /* we can't keep the bucket name as part of the cls_log_entry, and we need
1933 * it later, so we keep two lists under the map */
1934 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
1935
1936 lock.Lock();
1937 map<rgw_bucket_shard, bool> entries;
1938 entries.swap(cur_cycle);
1939 lock.Unlock();
1940
1941 map<rgw_bucket_shard, bool>::iterator iter;
1942 string section;
1943 real_time ut = real_clock::now();
1944 for (iter = entries.begin(); iter != entries.end(); ++iter) {
1945 const rgw_bucket_shard& bs = iter->first;
1946
1947 int index = choose_oid(bs);
1948
1949 cls_log_entry entry;
1950
1951 rgw_data_change change;
1952 bufferlist bl;
1953 change.entity_type = ENTITY_TYPE_BUCKET;
1954 change.key = bs.get_key();
1955 change.timestamp = ut;
1956 ::encode(change, bl);
1957
1958 store->time_log_prepare_entry(entry, ut, section, change.key, bl);
1959
1960 m[index].first.push_back(bs);
1961 m[index].second.emplace_back(std::move(entry));
1962 }
1963
1964 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
1965 for (miter = m.begin(); miter != m.end(); ++miter) {
1966 list<cls_log_entry>& entries = miter->second.second;
1967
1968 real_time now = real_clock::now();
1969
1970 int ret = store->time_log_add(oids[miter->first], entries, NULL);
1971 if (ret < 0) {
1972 /* we don't really need to have a special handling for failed cases here,
1973 * as this is just an optimization. */
1974 lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
1975 return ret;
1976 }
1977
1978 real_time expiration = now;
1979 expiration += make_timespan(cct->_conf->rgw_data_log_window);
1980
1981 list<rgw_bucket_shard>& buckets = miter->second.first;
1982 list<rgw_bucket_shard>::iterator liter;
1983 for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
1984 update_renewed(*liter, expiration);
1985 }
1986 }
1987
1988 return 0;
1989}
1990
1991void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
1992{
1993 assert(lock.is_locked());
1994 if (!changes.find(bs, status)) {
1995 status = ChangeStatusPtr(new ChangeStatus);
1996 changes.add(bs, status);
1997 }
1998}
1999
2000void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
2001{
2002 Mutex::Locker l(lock);
2003 cur_cycle[bs] = true;
2004}
2005
2006void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
2007{
2008 Mutex::Locker l(lock);
2009 ChangeStatusPtr status;
2010 _get_change(bs, status);
2011
2012 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
2013 status->cur_expiration = expiration;
2014}
2015
2016int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
2017 rgw_bucket_shard bs(bucket, shard_id);
2018
2019 return choose_oid(bs);
2020}
2021
2022int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
2023 if (!store->need_to_log_data())
2024 return 0;
2025
91327a77
AA
2026 if (observer) {
2027 observer->on_bucket_changed(bucket.get_key());
2028 }
2029
7c673cae
FG
2030 rgw_bucket_shard bs(bucket, shard_id);
2031
2032 int index = choose_oid(bs);
2033 mark_modified(index, bs);
2034
2035 lock.Lock();
2036
2037 ChangeStatusPtr status;
2038 _get_change(bs, status);
2039
2040 lock.Unlock();
2041
2042 real_time now = real_clock::now();
2043
2044 status->lock->Lock();
2045
2046 ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
2047
2048 if (now < status->cur_expiration) {
2049 /* no need to send, recently completed */
2050 status->lock->Unlock();
2051
2052 register_renew(bs);
2053 return 0;
2054 }
2055
2056 RefCountedCond *cond;
2057
2058 if (status->pending) {
2059 cond = status->cond;
2060
2061 assert(cond);
2062
2063 status->cond->get();
2064 status->lock->Unlock();
2065
2066 int ret = cond->wait();
2067 cond->put();
2068 if (!ret) {
2069 register_renew(bs);
2070 }
2071 return ret;
2072 }
2073
2074 status->cond = new RefCountedCond;
2075 status->pending = true;
2076
2077 string& oid = oids[index];
2078 real_time expiration;
2079
2080 int ret;
2081
2082 do {
2083 status->cur_sent = now;
2084
2085 expiration = now;
2086 expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
2087
2088 status->lock->Unlock();
2089
2090 bufferlist bl;
2091 rgw_data_change change;
2092 change.entity_type = ENTITY_TYPE_BUCKET;
2093 change.key = bs.get_key();
2094 change.timestamp = now;
2095 ::encode(change, bl);
2096 string section;
2097
2098 ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
2099
2100 ret = store->time_log_add(oid, now, section, change.key, bl);
2101
2102 now = real_clock::now();
2103
2104 status->lock->Lock();
2105
2106 } while (!ret && real_clock::now() > expiration);
2107
2108 cond = status->cond;
2109
2110 status->pending = false;
2111 status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
2112 status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
2113 status->cond = NULL;
2114 status->lock->Unlock();
2115
2116 cond->done(ret);
2117 cond->put();
2118
2119 return ret;
2120}
2121
2122int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
2123 list<rgw_data_change_log_entry>& entries,
2124 const string& marker,
2125 string *out_marker,
2126 bool *truncated) {
31f18b77
FG
2127 if (shard >= num_shards)
2128 return -EINVAL;
7c673cae
FG
2129
2130 list<cls_log_entry> log_entries;
2131
2132 int ret = store->time_log_list(oids[shard], start_time, end_time,
2133 max_entries, log_entries, marker,
2134 out_marker, truncated);
2135 if (ret < 0)
2136 return ret;
2137
2138 list<cls_log_entry>::iterator iter;
2139 for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
2140 rgw_data_change_log_entry log_entry;
2141 log_entry.log_id = iter->id;
2142 real_time rt = iter->timestamp.to_real_time();
2143 log_entry.log_timestamp = rt;
2144 bufferlist::iterator liter = iter->data.begin();
2145 try {
2146 ::decode(log_entry.entry, liter);
2147 } catch (buffer::error& err) {
2148 lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
2149 return -EIO;
2150 }
2151 entries.push_back(log_entry);
2152 }
2153
2154 return 0;
2155}
2156
2157int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
2158 list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
2159 bool truncated;
2160 entries.clear();
2161
2162 for (; marker.shard < num_shards && (int)entries.size() < max_entries;
2163 marker.shard++, marker.marker.clear()) {
2164 int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
2165 marker.marker, NULL, &truncated);
2166 if (ret == -ENOENT) {
2167 continue;
2168 }
2169 if (ret < 0) {
2170 return ret;
2171 }
2172 if (truncated) {
2173 *ptruncated = true;
2174 return 0;
2175 }
2176 }
2177
2178 *ptruncated = (marker.shard < num_shards);
2179
2180 return 0;
2181}
2182
2183int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
2184{
2185 if (shard_id >= num_shards)
2186 return -EINVAL;
2187
2188 string oid = oids[shard_id];
2189
2190 cls_log_header header;
2191
2192 int ret = store->time_log_info(oid, &header);
2193 if ((ret < 0) && (ret != -ENOENT))
2194 return ret;
2195
2196 info->marker = header.max_marker;
2197 info->last_update = header.max_time.to_real_time();
2198
2199 return 0;
2200}
2201
2202int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
2203 const string& start_marker, const string& end_marker)
2204{
2205 int ret;
2206
2207 if (shard_id > num_shards)
2208 return -EINVAL;
2209
2210 ret = store->time_log_trim(oids[shard_id], start_time, end_time, start_marker, end_marker);
2211
31f18b77 2212 if (ret == -ENOENT || ret == -ENODATA)
7c673cae
FG
2213 ret = 0;
2214
2215 return ret;
2216}
2217
2218int RGWDataChangesLog::trim_entries(const real_time& start_time, const real_time& end_time,
2219 const string& start_marker, const string& end_marker)
2220{
2221 for (int shard = 0; shard < num_shards; shard++) {
2222 int ret = store->time_log_trim(oids[shard], start_time, end_time, start_marker, end_marker);
31f18b77 2223 if (ret == -ENOENT || ret == -ENODATA) {
7c673cae
FG
2224 continue;
2225 }
2226 if (ret < 0)
2227 return ret;
2228 }
2229
2230 return 0;
2231}
2232
2233bool RGWDataChangesLog::going_down()
2234{
2235 return down_flag;
2236}
2237
2238RGWDataChangesLog::~RGWDataChangesLog() {
2239 down_flag = true;
2240 renew_thread->stop();
2241 renew_thread->join();
2242 delete renew_thread;
2243 delete[] oids;
2244}
2245
2246void *RGWDataChangesLog::ChangesRenewThread::entry() {
2247 do {
2248 dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
2249 int r = log->renew_entries();
2250 if (r < 0) {
2251 dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
2252 }
2253
2254 if (log->going_down())
2255 break;
2256
2257 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
2258 lock.Lock();
2259 cond.WaitInterval(lock, utime_t(interval, 0));
2260 lock.Unlock();
2261 } while (!log->going_down());
2262
2263 return NULL;
2264}
2265
2266void RGWDataChangesLog::ChangesRenewThread::stop()
2267{
2268 Mutex::Locker l(lock);
2269 cond.Signal();
2270}
2271
2272void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
2273{
2274 auto key = bs.get_key();
2275 modified_lock.get_read();
2276 map<int, set<string> >::iterator iter = modified_shards.find(shard_id);
2277 if (iter != modified_shards.end()) {
2278 set<string>& keys = iter->second;
2279 if (keys.find(key) != keys.end()) {
2280 modified_lock.unlock();
2281 return;
2282 }
2283 }
2284 modified_lock.unlock();
2285
2286 RWLock::WLocker wl(modified_lock);
2287 modified_shards[shard_id].insert(key);
2288}
2289
2290void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
2291{
2292 RWLock::WLocker wl(modified_lock);
2293 modified.swap(modified_shards);
2294 modified_shards.clear();
2295}
2296
2297void RGWBucketCompleteInfo::dump(Formatter *f) const {
2298 encode_json("bucket_info", info, f);
2299 encode_json("attrs", attrs, f);
2300}
2301
2302void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
2303 JSONDecoder::decode_json("bucket_info", info, obj);
2304 JSONDecoder::decode_json("attrs", attrs, obj);
2305}
2306
2307class RGWBucketMetadataHandler : public RGWMetadataHandler {
2308
2309public:
2310 string get_type() override { return "bucket"; }
2311
2312 int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
2313 RGWObjVersionTracker ot;
2314 RGWBucketEntryPoint be;
2315
2316 real_time mtime;
2317 map<string, bufferlist> attrs;
2318 RGWObjectCtx obj_ctx(store);
2319
2320 string tenant_name, bucket_name;
2321 parse_bucket(entry, &tenant_name, &bucket_name);
2322 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &ot, &mtime, &attrs);
2323 if (ret < 0)
2324 return ret;
2325
2326 RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime);
2327
2328 *obj = mdo;
2329
2330 return 0;
2331 }
2332
2333 int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2334 real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2335 RGWBucketEntryPoint be, old_be;
2336 try {
2337 decode_json_obj(be, obj);
2338 } catch (JSONDecoder::err& e) {
2339 return -EINVAL;
2340 }
2341
2342 real_time orig_mtime;
2343 map<string, bufferlist> attrs;
2344
2345 RGWObjVersionTracker old_ot;
2346 RGWObjectCtx obj_ctx(store);
2347
2348 string tenant_name, bucket_name;
2349 parse_bucket(entry, &tenant_name, &bucket_name);
2350 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs);
2351 if (ret < 0 && ret != -ENOENT)
2352 return ret;
2353
2354 // are we actually going to perform this put, or is it too old?
2355 if (ret != -ENOENT &&
2356 !check_versions(old_ot.read_version, orig_mtime,
2357 objv_tracker.write_version, mtime, sync_type)) {
2358 return STATUS_NO_APPLY;
2359 }
2360
2361 objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */
2362
2363 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs);
2364 if (ret < 0)
2365 return ret;
2366
2367 /* link bucket */
2368 if (be.linked) {
2369 ret = rgw_link_bucket(store, be.owner, be.bucket, be.creation_time, false);
2370 } else {
3efd9988
FG
2371 ret = rgw_unlink_bucket(store, be.owner, be.bucket.tenant,
2372 be.bucket.name, false);
7c673cae
FG
2373 }
2374
2375 return ret;
2376 }
2377
2378 struct list_keys_info {
2379 RGWRados *store;
2380 RGWListRawObjsCtx ctx;
2381 };
2382
2383 int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
2384 RGWBucketEntryPoint be;
2385 RGWObjectCtx obj_ctx(store);
2386
2387 string tenant_name, bucket_name;
2388 parse_bucket(entry, &tenant_name, &bucket_name);
2389 int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL);
2390 if (ret < 0)
2391 return ret;
2392
2393 /*
31f18b77 2394 * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
7c673cae
FG
2395 * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
2396 * will incorrectly fail.
2397 */
2398 ret = rgw_unlink_bucket(store, be.owner, tenant_name, bucket_name, false);
2399 if (ret < 0) {
2400 lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
2401 }
2402
2403 ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker);
2404 if (ret < 0) {
2405 lderr(store->ctx()) << "could not delete bucket=" << entry << dendl;
2406 }
2407 /* idempotent */
2408 return 0;
2409 }
2410
2411 void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2412 oid = key;
2413 pool = store->get_zone_params().domain_root;
2414 }
2415
181888fb
FG
2416 int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
2417 auto info = ceph::make_unique<list_keys_info>();
7c673cae
FG
2418
2419 info->store = store;
2420
181888fb
FG
2421 int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker,
2422 &info->ctx);
2423 if (ret < 0) {
2424 return ret;
2425 }
2426 *phandle = (void *)info.release();
7c673cae
FG
2427
2428 return 0;
2429 }
2430
2431 int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2432 list_keys_info *info = static_cast<list_keys_info *>(handle);
2433
2434 string no_filter;
2435
2436 keys.clear();
2437
2438 RGWRados *store = info->store;
2439
2440 list<string> unfiltered_keys;
2441
181888fb
FG
2442 int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
2443 unfiltered_keys, truncated);
7c673cae
FG
2444 if (ret < 0 && ret != -ENOENT)
2445 return ret;
2446 if (ret == -ENOENT) {
2447 if (truncated)
2448 *truncated = false;
2449 return 0;
2450 }
2451
2452 // now filter out the system entries
2453 list<string>::iterator iter;
2454 for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2455 string& k = *iter;
2456
2457 if (k[0] != '.') {
2458 keys.push_back(k);
2459 }
2460 }
2461
2462 return 0;
2463 }
2464
2465 void list_keys_complete(void *handle) override {
2466 list_keys_info *info = static_cast<list_keys_info *>(handle);
2467 delete info;
2468 }
181888fb
FG
2469
2470 string get_marker(void *handle) {
2471 list_keys_info *info = static_cast<list_keys_info *>(handle);
2472 return info->store->list_raw_objs_get_cursor(info->ctx);
2473 }
7c673cae
FG
2474};
2475
2476class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler {
2477
2478public:
2479 string get_type() override { return "bucket.instance"; }
2480
2481 int get(RGWRados *store, string& oid, RGWMetadataObject **obj) override {
2482 RGWBucketCompleteInfo bci;
2483
2484 real_time mtime;
2485 RGWObjectCtx obj_ctx(store);
2486
2487 int ret = store->get_bucket_instance_info(obj_ctx, oid, bci.info, &mtime, &bci.attrs);
2488 if (ret < 0)
2489 return ret;
2490
2491 RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime);
2492
2493 *obj = mdo;
2494
2495 return 0;
2496 }
2497
2498 int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
2499 real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
2500 RGWBucketCompleteInfo bci, old_bci;
2501 try {
2502 decode_json_obj(bci, obj);
2503 } catch (JSONDecoder::err& e) {
2504 return -EINVAL;
2505 }
2506
2507 real_time orig_mtime;
2508 RGWObjectCtx obj_ctx(store);
2509
2510 int ret = store->get_bucket_instance_info(obj_ctx, entry, old_bci.info,
2511 &orig_mtime, &old_bci.attrs);
2512 bool exists = (ret != -ENOENT);
2513 if (ret < 0 && exists)
2514 return ret;
2515
2516 if (!exists || old_bci.info.bucket.bucket_id != bci.info.bucket.bucket_id) {
2517 /* a new bucket, we need to select a new bucket placement for it */
2518 auto key(entry);
2519 rgw_bucket_instance_oid_to_key(key);
2520 string tenant_name;
2521 string bucket_name;
2522 string bucket_instance;
2523 parse_bucket(key, &tenant_name, &bucket_name, &bucket_instance);
2524
2525 RGWZonePlacementInfo rule_info;
2526 bci.info.bucket.name = bucket_name;
2527 bci.info.bucket.bucket_id = bucket_instance;
2528 bci.info.bucket.tenant = tenant_name;
2529 ret = store->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info);
2530 if (ret < 0) {
2531 ldout(store->ctx(), 0) << "ERROR: select_bucket_placement() returned " << ret << dendl;
2532 return ret;
2533 }
2534 bci.info.index_type = rule_info.index_type;
2535 } else {
2536 /* existing bucket, keep its placement */
2537 bci.info.bucket.explicit_placement = old_bci.info.bucket.explicit_placement;
2538 bci.info.placement_rule = old_bci.info.placement_rule;
2539 }
2540
c07f9fc5
FG
2541 if (exists && old_bci.info.datasync_flag_enabled() != bci.info.datasync_flag_enabled()) {
2542 int shards_num = bci.info.num_shards? bci.info.num_shards : 1;
2543 int shard_id = bci.info.num_shards? 0 : -1;
2544
2545 if (!bci.info.datasync_flag_enabled()) {
2546 ret = store->stop_bi_log_entries(bci.info, -1);
2547 if (ret < 0) {
2548 lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2549 return ret;
2550 }
2551 } else {
2552 ret = store->resync_bi_log_entries(bci.info, -1);
2553 if (ret < 0) {
2554 lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
2555 return ret;
2556 }
2557 }
2558
2559 for (int i = 0; i < shards_num; ++i, ++shard_id) {
2560 ret = store->data_log->add_entry(bci.info.bucket, shard_id);
2561 if (ret < 0) {
2562 lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
2563 return ret;
2564 }
2565 }
2566 }
2567
7c673cae
FG
2568 // are we actually going to perform this put, or is it too old?
2569 if (exists &&
2570 !check_versions(old_bci.info.objv_tracker.read_version, orig_mtime,
2571 objv_tracker.write_version, mtime, sync_type)) {
2572 objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2573 return STATUS_NO_APPLY;
2574 }
2575
2576 /* record the read version (if any), store the new version */
2577 bci.info.objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
2578 bci.info.objv_tracker.write_version = objv_tracker.write_version;
2579
2580 ret = store->put_bucket_instance_info(bci.info, false, mtime, &bci.attrs);
2581 if (ret < 0)
2582 return ret;
2583
2584 objv_tracker = bci.info.objv_tracker;
2585
2586 ret = store->init_bucket_index(bci.info, bci.info.num_shards);
2587 if (ret < 0)
2588 return ret;
2589
2590 return STATUS_APPLIED;
2591 }
2592
2593 struct list_keys_info {
2594 RGWRados *store;
2595 RGWListRawObjsCtx ctx;
2596 };
2597
f64942e4
AA
2598 int remove(RGWRados *store, string& entry,
2599 RGWObjVersionTracker& objv_tracker) override {
7c673cae
FG
2600 RGWBucketInfo info;
2601 RGWObjectCtx obj_ctx(store);
2602
2603 int ret = store->get_bucket_instance_info(obj_ctx, entry, info, NULL, NULL);
2604 if (ret < 0 && ret != -ENOENT)
2605 return ret;
2606
f64942e4
AA
2607 return rgw_bucket_instance_remove_entry(store, entry,
2608 &info.objv_tracker);
7c673cae
FG
2609 }
2610
2611 void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
2612 oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
2613 rgw_bucket_instance_key_to_oid(oid);
2614 pool = store->get_zone_params().domain_root;
2615 }
2616
181888fb
FG
2617 int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
2618 auto info = ceph::make_unique<list_keys_info>();
7c673cae
FG
2619
2620 info->store = store;
2621
181888fb
FG
2622 int ret = store->list_raw_objects_init(store->get_zone_params().domain_root, marker,
2623 &info->ctx);
2624 if (ret < 0) {
2625 return ret;
2626 }
2627 *phandle = (void *)info.release();
7c673cae
FG
2628
2629 return 0;
2630 }
2631
2632 int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
2633 list_keys_info *info = static_cast<list_keys_info *>(handle);
2634
2635 string no_filter;
2636
2637 keys.clear();
2638
2639 RGWRados *store = info->store;
2640
2641 list<string> unfiltered_keys;
2642
181888fb
FG
2643 int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
2644 unfiltered_keys, truncated);
7c673cae
FG
2645 if (ret < 0 && ret != -ENOENT)
2646 return ret;
2647 if (ret == -ENOENT) {
2648 if (truncated)
2649 *truncated = false;
2650 return 0;
2651 }
2652
2653 constexpr int prefix_size = sizeof(RGW_BUCKET_INSTANCE_MD_PREFIX) - 1;
2654 // now filter in the relevant entries
2655 list<string>::iterator iter;
2656 for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
2657 string& k = *iter;
2658
2659 if (k.compare(0, prefix_size, RGW_BUCKET_INSTANCE_MD_PREFIX) == 0) {
2660 auto oid = k.substr(prefix_size);
2661 rgw_bucket_instance_oid_to_key(oid);
2662 keys.emplace_back(std::move(oid));
2663 }
2664 }
2665
2666 return 0;
2667 }
2668
2669 void list_keys_complete(void *handle) override {
2670 list_keys_info *info = static_cast<list_keys_info *>(handle);
2671 delete info;
2672 }
2673
181888fb
FG
2674 string get_marker(void *handle) {
2675 list_keys_info *info = static_cast<list_keys_info *>(handle);
2676 return info->store->list_raw_objs_get_cursor(info->ctx);
2677 }
2678
7c673cae
FG
2679 /*
2680 * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry
2681 * point, so that the log entries end up at the same log shard, so that we process them
2682 * in order
2683 */
2684 void get_hash_key(const string& section, const string& key, string& hash_key) override {
2685 string k;
2686 int pos = key.find(':');
2687 if (pos < 0)
2688 k = key;
2689 else
2690 k = key.substr(0, pos);
2691 hash_key = "bucket:" + k;
2692 }
2693};
2694
2695void rgw_bucket_init(RGWMetadataManager *mm)
2696{
2697 bucket_meta_handler = new RGWBucketMetadataHandler;
2698 mm->register_handler(bucket_meta_handler);
2699 bucket_instance_meta_handler = new RGWBucketInstanceMetadataHandler;
2700 mm->register_handler(bucket_instance_meta_handler);
2701}