]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_bucket.cc
import ceph 14.2.5
[ceph.git] / ceph / src / rgw / rgw_bucket.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <errno.h>
5
6#include <string>
7#include <map>
8#include <sstream>
9
10#include <boost/utility/string_ref.hpp>
11#include <boost/format.hpp>
12
13#include "common/errno.h"
14#include "common/ceph_json.h"
f64942e4 15#include "include/scope_guard.h"
7c673cae 16#include "rgw_rados.h"
11fdf7f2 17#include "rgw_zone.h"
7c673cae
FG
18#include "rgw_acl.h"
19#include "rgw_acl_s3.h"
20
21#include "include/types.h"
22#include "rgw_bucket.h"
23#include "rgw_user.h"
24#include "rgw_string.h"
224ce89b 25#include "rgw_multi.h"
7c673cae 26
11fdf7f2
TL
27#include "services/svc_zone.h"
28#include "services/svc_sys_obj.h"
29
7c673cae
FG
30#include "include/rados/librados.hpp"
31// until everything is moved from rgw_common
32#include "rgw_common.h"
f64942e4 33#include "rgw_reshard.h"
11fdf7f2 34#include "rgw_lc.h"
7c673cae
FG
35#include "cls/user/cls_user_types.h"
36
37#define dout_context g_ceph_context
38#define dout_subsys ceph_subsys_rgw
39
40#define BUCKET_TAG_TIMEOUT 30
41
7c673cae
FG
42
43static RGWMetadataHandler *bucket_meta_handler = NULL;
44static RGWMetadataHandler *bucket_instance_meta_handler = NULL;
45
11fdf7f2 46// define as static when RGWBucket implementation completes
7c673cae
FG
47void rgw_get_buckets_obj(const rgw_user& user_id, string& buckets_obj_id)
48{
49 buckets_obj_id = user_id.to_str();
50 buckets_obj_id += RGW_BUCKETS_OBJ_SUFFIX;
51}
52
/*
 * Build the RADOS-side entry name of a bucket:
 *   - ""                when bucket_name is empty,
 *   - "<bucket>"        when tenant_name is empty,
 *   - "<tenant>/<bucket>" otherwise.
 *
 * This is not the inverse of parse_bucket(): that one handles the syntax used
 * in metadata keys, while this one describes the representation in RADOS
 * pools. '/' is the separator because it is not allowed in bucket names, so
 * tenant-qualified entries cannot collide with legacy or S3 buckets.
 */
std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
                                       const std::string& bucket_name) {
  if (bucket_name.empty()) {
    return std::string();
  }
  if (tenant_name.empty()) {
    return bucket_name;
  }

  std::string entry;
  entry.reserve(tenant_name.size() + 1 + bucket_name.size());
  entry.append(tenant_name).append(1, '/').append(bucket_name);
  return entry;
}
74
/*
 * Split an S3 URL bucket reference into tenant and bucket name.
 *
 * In S3, tenants are separated from buckets by a colon ("tenant:bucket").
 * Without a colon, the authenticated user's tenant (auth_tenant) is used.
 * The ":bucket" form, with an explicitly empty tenant, is allowed so that
 * users in named tenants can refer to buckets in the legacy global tenant.
 * Only the first ':' separates; the rest belongs to the bucket name.
 *
 * This function must not be used on Swift URLs, not even for COPY arguments.
 *
 * Both substrings are computed before any output is written. This keeps the
 * result correct when an output argument aliases `bucket` or `auth_tenant`.
 */
void rgw_parse_url_bucket(const std::string &bucket, const std::string& auth_tenant,
                          std::string &tenant_name, std::string &bucket_name) {
  // Compare against npos instead of narrowing size_t into a signed int.
  const std::string::size_type pos = bucket.find(':');
  if (pos == std::string::npos) {
    std::string name = bucket;  // copy first: bucket may alias tenant_name
    tenant_name = auth_tenant;
    bucket_name = std::move(name);
    return;
  }

  std::string tenant = bucket.substr(0, pos);
  std::string name = bucket.substr(pos + 1);
  tenant_name = std::move(tenant);
  bucket_name = std::move(name);
}
96
97/**
98 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
99 * Returns: 0 on success, -ERR# on failure.
100 */
101int rgw_read_user_buckets(RGWRados * store,
102 const rgw_user& user_id,
103 RGWUserBuckets& buckets,
104 const string& marker,
105 const string& end_marker,
106 uint64_t max,
107 bool need_stats,
108 bool *is_truncated,
109 uint64_t default_amount)
110{
111 int ret;
112 buckets.clear();
3efd9988 113 std::string buckets_obj_id;
7c673cae 114 rgw_get_buckets_obj(user_id, buckets_obj_id);
11fdf7f2 115 rgw_raw_obj obj(store->svc.zone->get_zone_params().user_uid_pool, buckets_obj_id);
7c673cae
FG
116
117 bool truncated = false;
118 string m = marker;
119
120 uint64_t total = 0;
121
122 if (!max) {
123 max = default_amount;
124 }
125
126 do {
3efd9988 127 std::list<cls_user_bucket_entry> entries;
7c673cae 128 ret = store->cls_user_list_buckets(obj, m, end_marker, max - total, entries, &m, &truncated);
3efd9988 129 if (ret == -ENOENT) {
7c673cae 130 ret = 0;
3efd9988 131 }
7c673cae 132
3efd9988 133 if (ret < 0) {
7c673cae 134 return ret;
3efd9988 135 }
7c673cae 136
3efd9988
FG
137 for (auto& entry : entries) {
138 buckets.add(RGWBucketEnt(user_id, std::move(entry)));
7c673cae
FG
139 total++;
140 }
141
142 } while (truncated && total < max);
143
144 if (is_truncated != nullptr) {
145 *is_truncated = truncated;
146 }
147
148 if (need_stats) {
149 map<string, RGWBucketEnt>& m = buckets.get_buckets();
150 ret = store->update_containers_stats(m);
151 if (ret < 0 && ret != -ENOENT) {
152 ldout(store->ctx(), 0) << "ERROR: could not get stats for buckets" << dendl;
153 return ret;
154 }
155 }
156 return 0;
157}
158
159int rgw_bucket_sync_user_stats(RGWRados *store, const rgw_user& user_id, const RGWBucketInfo& bucket_info)
160{
161 string buckets_obj_id;
162 rgw_get_buckets_obj(user_id, buckets_obj_id);
11fdf7f2 163 rgw_raw_obj obj(store->svc.zone->get_zone_params().user_uid_pool, buckets_obj_id);
7c673cae
FG
164
165 return store->cls_user_sync_bucket_stats(obj, bucket_info);
166}
167
168int rgw_bucket_sync_user_stats(RGWRados *store, const string& tenant_name, const string& bucket_name)
169{
170 RGWBucketInfo bucket_info;
11fdf7f2 171 RGWSysObjectCtx obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
172 int ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, NULL);
173 if (ret < 0) {
174 ldout(store->ctx(), 0) << "ERROR: could not fetch bucket info: ret=" << ret << dendl;
175 return ret;
176 }
177
178 ret = rgw_bucket_sync_user_stats(store, bucket_info.owner, bucket_info);
179 if (ret < 0) {
180 ldout(store->ctx(), 0) << "ERROR: could not sync user stats for bucket " << bucket_name << ": ret=" << ret << dendl;
181 return ret;
182 }
183
184 return 0;
185}
186
3efd9988
FG
187int rgw_link_bucket(RGWRados* const store,
188 const rgw_user& user_id,
189 rgw_bucket& bucket,
190 ceph::real_time creation_time,
191 bool update_entrypoint)
7c673cae
FG
192{
193 int ret;
194 string& tenant_name = bucket.tenant;
195 string& bucket_name = bucket.name;
196
197 cls_user_bucket_entry new_bucket;
198
199 RGWBucketEntryPoint ep;
200 RGWObjVersionTracker ot;
201
202 bucket.convert(&new_bucket.bucket);
203 new_bucket.size = 0;
204 if (real_clock::is_zero(creation_time))
205 new_bucket.creation_time = real_clock::now();
206 else
207 new_bucket.creation_time = creation_time;
208
209 map<string, bufferlist> attrs;
11fdf7f2 210 RGWSysObjectCtx obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
211
212 if (update_entrypoint) {
213 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
214 if (ret < 0 && ret != -ENOENT) {
215 ldout(store->ctx(), 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
216 << cpp_strerror(-ret) << dendl;
217 }
218 }
219
220 string buckets_obj_id;
221 rgw_get_buckets_obj(user_id, buckets_obj_id);
222
11fdf7f2 223 rgw_raw_obj obj(store->svc.zone->get_zone_params().user_uid_pool, buckets_obj_id);
7c673cae
FG
224 ret = store->cls_user_add_bucket(obj, new_bucket);
225 if (ret < 0) {
226 ldout(store->ctx(), 0) << "ERROR: error adding bucket to directory: "
227 << cpp_strerror(-ret) << dendl;
228 goto done_err;
229 }
230
231 if (!update_entrypoint)
232 return 0;
233
234 ep.linked = true;
235 ep.owner = user_id;
236 ep.bucket = bucket;
237 ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
238 if (ret < 0)
239 goto done_err;
240
241 return 0;
242done_err:
243 int r = rgw_unlink_bucket(store, user_id, bucket.tenant, bucket.name);
244 if (r < 0) {
245 ldout(store->ctx(), 0) << "ERROR: failed unlinking bucket on error cleanup: "
246 << cpp_strerror(-r) << dendl;
247 }
248 return ret;
249}
250
251int rgw_unlink_bucket(RGWRados *store, const rgw_user& user_id, const string& tenant_name, const string& bucket_name, bool update_entrypoint)
252{
253 int ret;
254
255 string buckets_obj_id;
256 rgw_get_buckets_obj(user_id, buckets_obj_id);
257
258 cls_user_bucket bucket;
259 bucket.name = bucket_name;
11fdf7f2 260 rgw_raw_obj obj(store->svc.zone->get_zone_params().user_uid_pool, buckets_obj_id);
7c673cae
FG
261 ret = store->cls_user_remove_bucket(obj, bucket);
262 if (ret < 0) {
263 ldout(store->ctx(), 0) << "ERROR: error removing bucket from directory: "
264 << cpp_strerror(-ret)<< dendl;
265 }
266
267 if (!update_entrypoint)
268 return 0;
269
270 RGWBucketEntryPoint ep;
271 RGWObjVersionTracker ot;
272 map<string, bufferlist> attrs;
11fdf7f2 273 RGWSysObjectCtx obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
274 ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, ep, &ot, NULL, &attrs);
275 if (ret == -ENOENT)
276 return 0;
277 if (ret < 0)
278 return ret;
279
280 if (!ep.linked)
281 return 0;
282
283 if (ep.owner != user_id) {
284 ldout(store->ctx(), 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep.owner << " != " << user_id << dendl;
285 return -EINVAL;
286 }
287
288 ep.linked = false;
289 return store->put_bucket_entrypoint_info(tenant_name, bucket_name, ep, false, ot, real_time(), &attrs);
290}
291
292int rgw_bucket_store_info(RGWRados *store, const string& bucket_name, bufferlist& bl, bool exclusive,
293 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
294 real_time mtime) {
295 return store->meta_mgr->put_entry(bucket_meta_handler, bucket_name, bl, exclusive, objv_tracker, mtime, pattrs);
296}
297
298int rgw_bucket_instance_store_info(RGWRados *store, string& entry, bufferlist& bl, bool exclusive,
299 map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker,
300 real_time mtime) {
301 return store->meta_mgr->put_entry(bucket_instance_meta_handler, entry, bl, exclusive, objv_tracker, mtime, pattrs);
302}
303
f64942e4
AA
304int rgw_bucket_instance_remove_entry(RGWRados *store, const string& entry,
305 RGWObjVersionTracker *objv_tracker) {
7c673cae
FG
306 return store->meta_mgr->remove_entry(bucket_instance_meta_handler, entry, objv_tracker);
307}
308
// Bucket instance keys used for sync write the tenant as 'tenant/' to avoid
// ambiguity with the instance[:shard] format. Once the shard has been parsed,
// turn the first '/' back into ':' to obtain the [tenant:]instance oid form.
void rgw_bucket_instance_key_to_oid(std::string& key)
{
  const std::string::size_type slash = key.find('/');
  if (slash == std::string::npos) {
    return;
  }
  key[slash] = ':';
}
320
// Convert a bucket instance oid back to the 'tenant/' metadata-key form.
// Oids never carry the optional ':shard' suffix. So when an oid contains two
// or more ':', the first one separates the tenant and becomes '/'. With a
// single ':' it is bucket:instance and is left untouched.
void rgw_bucket_instance_oid_to_key(std::string& oid)
{
  const std::string::size_type first = oid.find(':');
  if (first == std::string::npos) {
    return;
  }
  const bool has_tenant = oid.find(':', first + 1) != std::string::npos;
  if (has_tenant) {
    oid[first] = '/';
  }
}
335
336int rgw_bucket_parse_bucket_instance(const string& bucket_instance, string *target_bucket_instance, int *shard_id)
337{
338 ssize_t pos = bucket_instance.rfind(':');
339 if (pos < 0) {
340 return -EINVAL;
341 }
342
343 string first = bucket_instance.substr(0, pos);
344 string second = bucket_instance.substr(pos + 1);
345
346 if (first.find(':') == string::npos) {
347 *shard_id = -1;
348 *target_bucket_instance = bucket_instance;
349 return 0;
350 }
351
352 *target_bucket_instance = first;
353 string err;
354 *shard_id = strict_strtol(second.c_str(), 10, &err);
355 if (!err.empty()) {
356 return -EINVAL;
357 }
358
359 return 0;
360}
361
362// parse key in format: [tenant/]name:instance[:shard_id]
363int rgw_bucket_parse_bucket_key(CephContext *cct, const string& key,
364 rgw_bucket *bucket, int *shard_id)
365{
366 boost::string_ref name{key};
367 boost::string_ref instance;
368
369 // split tenant/name
370 auto pos = name.find('/');
371 if (pos != boost::string_ref::npos) {
372 auto tenant = name.substr(0, pos);
373 bucket->tenant.assign(tenant.begin(), tenant.end());
374 name = name.substr(pos + 1);
375 }
376
377 // split name:instance
378 pos = name.find(':');
379 if (pos != boost::string_ref::npos) {
380 instance = name.substr(pos + 1);
381 name = name.substr(0, pos);
382 }
383 bucket->name.assign(name.begin(), name.end());
384
385 // split instance:shard
386 pos = instance.find(':');
387 if (pos == boost::string_ref::npos) {
388 bucket->bucket_id.assign(instance.begin(), instance.end());
389 *shard_id = -1;
390 return 0;
391 }
392
393 // parse shard id
394 auto shard = instance.substr(pos + 1);
395 string err;
396 auto id = strict_strtol(shard.data(), 10, &err);
397 if (!err.empty()) {
398 ldout(cct, 0) << "ERROR: failed to parse bucket shard '"
399 << instance.data() << "': " << err << dendl;
400 return -EINVAL;
401 }
402
403 *shard_id = id;
404 instance = instance.substr(0, pos);
405 bucket->bucket_id.assign(instance.begin(), instance.end());
406 return 0;
407}
408
409int rgw_bucket_set_attrs(RGWRados *store, RGWBucketInfo& bucket_info,
410 map<string, bufferlist>& attrs,
411 RGWObjVersionTracker *objv_tracker)
412{
413 rgw_bucket& bucket = bucket_info.bucket;
414
415 if (!bucket_info.has_instance_obj) {
416 /* an old bucket object, need to convert it */
11fdf7f2 417 RGWSysObjectCtx obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
418 int ret = store->convert_old_bucket_info(obj_ctx, bucket.tenant, bucket.name);
419 if (ret < 0) {
420 ldout(store->ctx(), 0) << "ERROR: failed converting old bucket info: " << ret << dendl;
421 return ret;
422 }
423 }
424
425 /* we want the bucket instance name without the oid prefix cruft */
426 string key = bucket.get_key();
427 bufferlist bl;
428
11fdf7f2 429 encode(bucket_info, bl);
7c673cae
FG
430
431 return rgw_bucket_instance_store_info(store, key, bl, false, &attrs, objv_tracker, real_time());
432}
433
434static void dump_mulipart_index_results(list<rgw_obj_index_key>& objs_to_unlink,
435 Formatter *f)
436{
d2e6a577
FG
437 for (const auto& o : objs_to_unlink) {
438 f->dump_string("object", o.name);
7c673cae 439 }
7c673cae
FG
440}
441
442void check_bad_user_bucket_mapping(RGWRados *store, const rgw_user& user_id,
443 bool fix)
444{
445 RGWUserBuckets user_buckets;
446 bool is_truncated = false;
447 string marker;
448
449 CephContext *cct = store->ctx();
450
451 size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
452
453 do {
454 int ret = rgw_read_user_buckets(store, user_id, user_buckets, marker,
455 string(), max_entries, false,
456 &is_truncated);
457 if (ret < 0) {
458 ldout(store->ctx(), 0) << "failed to read user buckets: "
459 << cpp_strerror(-ret) << dendl;
460 return;
461 }
462
463 map<string, RGWBucketEnt>& buckets = user_buckets.get_buckets();
464 for (map<string, RGWBucketEnt>::iterator i = buckets.begin();
465 i != buckets.end();
466 ++i) {
467 marker = i->first;
468
469 RGWBucketEnt& bucket_ent = i->second;
470 rgw_bucket& bucket = bucket_ent.bucket;
471
472 RGWBucketInfo bucket_info;
473 real_time mtime;
11fdf7f2 474 RGWSysObjectCtx obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
475 int r = store->get_bucket_info(obj_ctx, user_id.tenant, bucket.name, bucket_info, &mtime);
476 if (r < 0) {
477 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket << dendl;
478 continue;
479 }
480
481 rgw_bucket& actual_bucket = bucket_info.bucket;
482
483 if (actual_bucket.name.compare(bucket.name) != 0 ||
484 actual_bucket.tenant.compare(bucket.tenant) != 0 ||
485 actual_bucket.marker.compare(bucket.marker) != 0 ||
486 actual_bucket.bucket_id.compare(bucket.bucket_id) != 0) {
487 cout << "bucket info mismatch: expected " << actual_bucket << " got " << bucket << std::endl;
488 if (fix) {
489 cout << "fixing" << std::endl;
3efd9988
FG
490 r = rgw_link_bucket(store, user_id, actual_bucket,
491 bucket_info.creation_time);
7c673cae
FG
492 if (r < 0) {
493 cerr << "failed to fix bucket: " << cpp_strerror(-r) << std::endl;
494 }
495 }
496 }
497 }
498 } while (is_truncated);
499}
500
501static bool bucket_object_check_filter(const string& oid)
502{
503 rgw_obj_key key;
504 string ns;
505 return rgw_obj_key::oid_to_key_in_ns(oid, &key, ns);
506}
507
81eedcae 508int rgw_remove_object(RGWRados *store, const RGWBucketInfo& bucket_info, const rgw_bucket& bucket, rgw_obj_key& key)
7c673cae
FG
509{
510 RGWObjectCtx rctx(store);
511
512 if (key.instance.empty()) {
513 key.instance = "null";
514 }
515
516 rgw_obj obj(bucket, key);
517
518 return store->delete_obj(rctx, bucket_info, obj, bucket_info.versioning_status());
519}
520
521int rgw_remove_bucket(RGWRados *store, rgw_bucket& bucket, bool delete_children)
522{
523 int ret;
524 map<RGWObjCategory, RGWStorageStats> stats;
525 std::vector<rgw_bucket_dir_entry> objs;
526 map<string, bool> common_prefixes;
527 RGWBucketInfo info;
11fdf7f2 528 RGWSysObjectCtx obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
529
530 string bucket_ver, master_ver;
531
532 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL);
533 if (ret < 0)
534 return ret;
535
536 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
537 if (ret < 0)
538 return ret;
539
7c673cae
FG
540 RGWRados::Bucket target(store, info);
541 RGWRados::Bucket::List list_op(&target);
224ce89b
WB
542 CephContext *cct = store->ctx();
543 int max = 1000;
7c673cae
FG
544
545 list_op.params.list_versions = true;
a8e16298 546 list_op.params.allow_unordered = true;
7c673cae 547
a8e16298 548 bool is_truncated = false;
224ce89b
WB
549 do {
550 objs.clear();
7c673cae 551
a8e16298 552 ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated);
224ce89b
WB
553 if (ret < 0)
554 return ret;
7c673cae 555
224ce89b
WB
556 if (!objs.empty() && !delete_children) {
557 lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << bucket.name << dendl;
558 return -ENOTEMPTY;
559 }
560
561 for (const auto& obj : objs) {
562 rgw_obj_key key(obj.key);
563 ret = rgw_remove_object(store, info, bucket, key);
a8e16298 564 if (ret < 0 && ret != -ENOENT) {
7c673cae 565 return ret;
a8e16298 566 }
224ce89b 567 }
a8e16298 568 } while(is_truncated);
7c673cae 569
224ce89b 570 string prefix, delimiter;
7c673cae 571
224ce89b
WB
572 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
573 if (ret < 0) {
574 return ret;
7c673cae
FG
575 }
576
b32b8144 577 ret = rgw_bucket_sync_user_stats(store, info.owner, info);
7c673cae
FG
578 if ( ret < 0) {
579 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
580 }
581
582 RGWObjVersionTracker objv_tracker;
583
a8e16298
TL
584 // if we deleted children above we will force delete, as any that
585 // remain is detrius from a prior bug
586 ret = store->delete_bucket(info, objv_tracker, !delete_children);
7c673cae 587 if (ret < 0) {
a8e16298
TL
588 lderr(store->ctx()) << "ERROR: could not remove bucket " <<
589 bucket.name << dendl;
7c673cae
FG
590 return ret;
591 }
592
593 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
594 if (ret < 0) {
595 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
596 }
597
598 return ret;
599}
600
601static int aio_wait(librados::AioCompletion *handle)
602{
603 librados::AioCompletion *c = (librados::AioCompletion *)handle;
604 c->wait_for_safe();
605 int ret = c->get_return_value();
606 c->release();
607 return ret;
608}
609
610static int drain_handles(list<librados::AioCompletion *>& pending)
611{
612 int ret = 0;
613 while (!pending.empty()) {
614 librados::AioCompletion *handle = pending.front();
615 pending.pop_front();
616 int r = aio_wait(handle);
617 if (r < 0) {
618 ret = r;
619 }
620 }
621 return ret;
622}
623
624int rgw_remove_bucket_bypass_gc(RGWRados *store, rgw_bucket& bucket,
625 int concurrent_max, bool keep_index_consistent)
626{
627 int ret;
628 map<RGWObjCategory, RGWStorageStats> stats;
629 std::vector<rgw_bucket_dir_entry> objs;
630 map<string, bool> common_prefixes;
631 RGWBucketInfo info;
632 RGWObjectCtx obj_ctx(store);
11fdf7f2 633 RGWSysObjectCtx sysobj_ctx = store->svc.sysobj->init_obj_ctx();
224ce89b 634 CephContext *cct = store->ctx();
7c673cae
FG
635
636 string bucket_ver, master_ver;
637
11fdf7f2 638 ret = store->get_bucket_info(sysobj_ctx, bucket.tenant, bucket.name, info, NULL);
7c673cae
FG
639 if (ret < 0)
640 return ret;
641
642 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, NULL);
643 if (ret < 0)
644 return ret;
645
224ce89b
WB
646 string prefix, delimiter;
647
648 ret = abort_bucket_multiparts(store, cct, info, prefix, delimiter);
649 if (ret < 0) {
650 return ret;
651 }
7c673cae
FG
652
653 RGWRados::Bucket target(store, info);
654 RGWRados::Bucket::List list_op(&target);
655
656 list_op.params.list_versions = true;
a8e16298 657 list_op.params.allow_unordered = true;
7c673cae
FG
658
659 std::list<librados::AioCompletion*> handles;
660
661 int max = 1000;
662 int max_aio = concurrent_max;
a8e16298
TL
663 bool is_truncated = true;
664
665 while (is_truncated) {
666 objs.clear();
667 ret = list_op.list_objects(max, &objs, &common_prefixes, &is_truncated);
668 if (ret < 0)
669 return ret;
7c673cae 670
7c673cae
FG
671 std::vector<rgw_bucket_dir_entry>::iterator it = objs.begin();
672 for (; it != objs.end(); ++it) {
673 RGWObjState *astate = NULL;
674 rgw_obj obj(bucket, (*it).key);
675
676 ret = store->get_obj_state(&obj_ctx, info, obj, &astate, false);
677 if (ret == -ENOENT) {
678 dout(1) << "WARNING: cannot find obj state for obj " << obj.get_oid() << dendl;
679 continue;
680 }
681 if (ret < 0) {
682 lderr(store->ctx()) << "ERROR: get obj state returned with error " << ret << dendl;
683 return ret;
684 }
685
686 if (astate->has_manifest) {
687 RGWObjManifest& manifest = astate->manifest;
688 RGWObjManifest::obj_iterator miter = manifest.obj_begin();
689 rgw_obj head_obj = manifest.get_obj();
690 rgw_raw_obj raw_head_obj;
691 store->obj_to_raw(info.placement_rule, head_obj, &raw_head_obj);
692
693
694 for (; miter != manifest.obj_end() && max_aio--; ++miter) {
695 if (!max_aio) {
696 ret = drain_handles(handles);
eafe8130 697 if (ret < 0 && ret != -ENOENT) {
7c673cae
FG
698 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
699 return ret;
700 }
701 max_aio = concurrent_max;
702 }
703
704 rgw_raw_obj last_obj = miter.get_location().get_raw_obj(store);
705 if (last_obj == raw_head_obj) {
706 // have the head obj deleted at the end
707 continue;
708 }
709
710 ret = store->delete_raw_obj_aio(last_obj, handles);
711 if (ret < 0) {
712 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
713 return ret;
714 }
715 } // for all shadow objs
716
717 ret = store->delete_obj_aio(head_obj, info, astate, handles, keep_index_consistent);
718 if (ret < 0) {
719 lderr(store->ctx()) << "ERROR: delete obj aio failed with " << ret << dendl;
720 return ret;
721 }
722 }
723
724 if (!max_aio) {
725 ret = drain_handles(handles);
eafe8130 726 if (ret < 0 && ret != -ENOENT) {
7c673cae
FG
727 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
728 return ret;
729 }
730 max_aio = concurrent_max;
731 }
eafe8130 732 obj_ctx.invalidate(obj);
7c673cae 733 } // for all RGW objects
7c673cae
FG
734 }
735
736 ret = drain_handles(handles);
eafe8130 737 if (ret < 0 && ret != -ENOENT) {
7c673cae
FG
738 lderr(store->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret << dendl;
739 return ret;
740 }
741
b32b8144 742 ret = rgw_bucket_sync_user_stats(store, info.owner, info);
7c673cae
FG
743 if (ret < 0) {
744 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
745 }
746
747 RGWObjVersionTracker objv_tracker;
748
a8e16298
TL
749 // this function can only be run if caller wanted children to be
750 // deleted, so we can ignore the check for children as any that
751 // remain are detritus from a prior bug
752 ret = store->delete_bucket(info, objv_tracker, false);
7c673cae 753 if (ret < 0) {
b32b8144 754 lderr(store->ctx()) << "ERROR: could not remove bucket " << bucket.name << dendl;
7c673cae
FG
755 return ret;
756 }
757
7c673cae
FG
758 ret = rgw_unlink_bucket(store, info.owner, bucket.tenant, bucket.name, false);
759 if (ret < 0) {
760 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
761 }
762
763 return ret;
764}
765
766int rgw_bucket_delete_bucket_obj(RGWRados *store,
767 const string& tenant_name,
768 const string& bucket_name,
769 RGWObjVersionTracker& objv_tracker)
770{
771 string key;
772
773 rgw_make_bucket_entry_name(tenant_name, bucket_name, key);
774 return store->meta_mgr->remove_entry(bucket_meta_handler, key, &objv_tracker);
775}
776
// Copy `msg` into *sink, but only when a sink was supplied and the message is
// non-empty; otherwise leave the caller's string untouched.
static void set_err_msg(std::string *sink, std::string msg)
{
  if (sink == nullptr || msg.empty()) {
    return;
  }
  *sink = msg;
}
782
783int RGWBucket::init(RGWRados *storage, RGWBucketAdminOpState& op_state)
784{
785 if (!storage)
786 return -EINVAL;
787
788 store = storage;
789
790 rgw_user user_id = op_state.get_user_id();
791 tenant = user_id.tenant;
792 bucket_name = op_state.get_bucket_name();
793 RGWUserBuckets user_buckets;
11fdf7f2 794 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
795
796 if (bucket_name.empty() && user_id.empty())
797 return -EINVAL;
798
799 if (!bucket_name.empty()) {
800 int r = store->get_bucket_info(obj_ctx, tenant, bucket_name, bucket_info, NULL);
801 if (r < 0) {
802 ldout(store->ctx(), 0) << "could not get bucket info for bucket=" << bucket_name << dendl;
803 return r;
804 }
805
806 op_state.set_bucket(bucket_info.bucket);
807 }
808
809 if (!user_id.empty()) {
810 int r = rgw_get_user_info_by_uid(store, user_id, user_info);
811 if (r < 0)
812 return r;
813
814 op_state.display_name = user_info.display_name;
815 }
816
817 clear_failure();
818 return 0;
819}
820
821int RGWBucket::link(RGWBucketAdminOpState& op_state, std::string *err_msg)
822{
823 if (!op_state.is_user_op()) {
824 set_err_msg(err_msg, "empty user id");
825 return -EINVAL;
826 }
827
828 string bucket_id = op_state.get_bucket_id();
829 if (bucket_id.empty()) {
830 set_err_msg(err_msg, "empty bucket instance id");
831 return -EINVAL;
832 }
833
834 std::string display_name = op_state.get_user_display_name();
835 rgw_bucket bucket = op_state.get_bucket();
836
11fdf7f2
TL
837 const rgw_pool& root_pool = store->svc.zone->get_zone_params().domain_root;
838 std::string bucket_entry;
839 rgw_make_bucket_entry_name(tenant, bucket_name, bucket_entry);
840 rgw_raw_obj obj(root_pool, bucket_entry);
7c673cae
FG
841 RGWObjVersionTracker objv_tracker;
842
843 map<string, bufferlist> attrs;
844 RGWBucketInfo bucket_info;
845
11fdf7f2
TL
846 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
847 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, &attrs);
7c673cae
FG
848 if (r < 0) {
849 return r;
850 }
851
7c673cae
FG
852 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
853 if (aiter != attrs.end()) {
854 bufferlist aclbl = aiter->second;
855 RGWAccessControlPolicy policy;
856 ACLOwner owner;
857 try {
11fdf7f2
TL
858 auto iter = aclbl.cbegin();
859 decode(policy, iter);
7c673cae
FG
860 owner = policy.get_owner();
861 } catch (buffer::error& err) {
862 set_err_msg(err_msg, "couldn't decode policy");
863 return -EIO;
864 }
865
866 r = rgw_unlink_bucket(store, owner.get_id(), bucket.tenant, bucket.name, false);
867 if (r < 0) {
868 set_err_msg(err_msg, "could not unlink policy from user " + owner.get_id().to_str());
869 return r;
870 }
871
872 // now update the user for the bucket...
873 if (display_name.empty()) {
874 ldout(store->ctx(), 0) << "WARNING: user " << user_info.user_id << " has no display name set" << dendl;
875 }
876 policy.create_default(user_info.user_id, display_name);
877
878 owner = policy.get_owner();
31f18b77 879 r = store->set_bucket_owner(bucket_info.bucket, owner);
7c673cae
FG
880 if (r < 0) {
881 set_err_msg(err_msg, "failed to set bucket owner: " + cpp_strerror(-r));
882 return r;
883 }
884
885 // ...and encode the acl
886 aclbl.clear();
887 policy.encode(aclbl);
888
11fdf7f2
TL
889 auto sysobj = obj_ctx.get_obj(obj);
890 r = sysobj.wop()
891 .set_objv_tracker(&objv_tracker)
892 .write_attr(RGW_ATTR_ACL, aclbl);
224ce89b 893 if (r < 0) {
7c673cae 894 return r;
224ce89b 895 }
7c673cae
FG
896
897 RGWAccessControlPolicy policy_instance;
898 policy_instance.create_default(user_info.user_id, display_name);
899 aclbl.clear();
900 policy_instance.encode(aclbl);
901
11fdf7f2
TL
902 rgw_raw_obj obj_bucket_instance;
903 store->get_bucket_instance_obj(bucket, obj_bucket_instance);
904 auto inst_sysobj = obj_ctx.get_obj(obj_bucket_instance);
905 r = inst_sysobj.wop()
906 .set_objv_tracker(&objv_tracker)
907 .write_attr(RGW_ATTR_ACL, aclbl);
224ce89b
WB
908 if (r < 0) {
909 return r;
910 }
7c673cae 911
3efd9988
FG
912 r = rgw_link_bucket(store, user_info.user_id, bucket_info.bucket,
913 ceph::real_time());
224ce89b 914 if (r < 0) {
7c673cae 915 return r;
224ce89b 916 }
7c673cae
FG
917 }
918
919 return 0;
920}
921
922int RGWBucket::unlink(RGWBucketAdminOpState& op_state, std::string *err_msg)
923{
924 rgw_bucket bucket = op_state.get_bucket();
925
926 if (!op_state.is_user_op()) {
927 set_err_msg(err_msg, "could not fetch user or user bucket info");
928 return -EINVAL;
929 }
930
931 int r = rgw_unlink_bucket(store, user_info.user_id, bucket.tenant, bucket.name);
932 if (r < 0) {
933 set_err_msg(err_msg, "error unlinking bucket" + cpp_strerror(-r));
934 }
935
936 return r;
937}
938
94b18763
FG
939int RGWBucket::set_quota(RGWBucketAdminOpState& op_state, std::string *err_msg)
940{
941 rgw_bucket bucket = op_state.get_bucket();
942 RGWBucketInfo bucket_info;
943 map<string, bufferlist> attrs;
11fdf7f2 944 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
94b18763
FG
945 int r = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs);
946 if (r < 0) {
947 set_err_msg(err_msg, "could not get bucket info for bucket=" + bucket.name + ": " + cpp_strerror(-r));
948 return r;
949 }
950
951 bucket_info.quota = op_state.quota;
952 r = store->put_bucket_instance_info(bucket_info, false, real_time(), &attrs);
953 if (r < 0) {
954 set_err_msg(err_msg, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r));
955 return r;
956 }
957 return r;
958}
959
7c673cae
FG
960int RGWBucket::remove(RGWBucketAdminOpState& op_state, bool bypass_gc,
961 bool keep_index_consistent, std::string *err_msg)
962{
963 bool delete_children = op_state.will_delete_children();
964 rgw_bucket bucket = op_state.get_bucket();
965 int ret;
966
967 if (bypass_gc) {
968 if (delete_children) {
969 ret = rgw_remove_bucket_bypass_gc(store, bucket, op_state.get_max_aio(), keep_index_consistent);
970 } else {
971 set_err_msg(err_msg, "purge objects should be set for gc to be bypassed");
972 return -EINVAL;
973 }
974 } else {
975 ret = rgw_remove_bucket(store, bucket, delete_children);
976 }
977
978 if (ret < 0) {
979 set_err_msg(err_msg, "unable to remove bucket" + cpp_strerror(-ret));
980 return ret;
981 }
982
983 return 0;
984}
985
986int RGWBucket::remove_object(RGWBucketAdminOpState& op_state, std::string *err_msg)
987{
988 rgw_bucket bucket = op_state.get_bucket();
989 std::string object_name = op_state.get_object_name();
990
991 rgw_obj_key key(object_name);
992
993 int ret = rgw_remove_object(store, bucket_info, bucket, key);
994 if (ret < 0) {
995 set_err_msg(err_msg, "unable to remove object" + cpp_strerror(-ret));
996 return ret;
997 }
998
999 return 0;
1000}
1001
1002static void dump_bucket_index(map<string, rgw_bucket_dir_entry> result, Formatter *f)
1003{
1004 map<string, rgw_bucket_dir_entry>::iterator iter;
1005 for (iter = result.begin(); iter != result.end(); ++iter) {
1006 f->dump_string("object", iter->first);
1007 }
1008}
1009
1010static void dump_bucket_usage(map<RGWObjCategory, RGWStorageStats>& stats, Formatter *formatter)
1011{
1012 map<RGWObjCategory, RGWStorageStats>::iterator iter;
1013
1014 formatter->open_object_section("usage");
1015 for (iter = stats.begin(); iter != stats.end(); ++iter) {
1016 RGWStorageStats& s = iter->second;
1017 const char *cat_name = rgw_obj_category_name(iter->first);
1018 formatter->open_object_section(cat_name);
1019 s.dump(formatter);
1020 formatter->close_section();
1021 }
1022 formatter->close_section();
1023}
1024
1025static void dump_index_check(map<RGWObjCategory, RGWStorageStats> existing_stats,
1026 map<RGWObjCategory, RGWStorageStats> calculated_stats,
1027 Formatter *formatter)
1028{
1029 formatter->open_object_section("check_result");
1030 formatter->open_object_section("existing_header");
1031 dump_bucket_usage(existing_stats, formatter);
1032 formatter->close_section();
1033 formatter->open_object_section("calculated_header");
1034 dump_bucket_usage(calculated_stats, formatter);
1035 formatter->close_section();
1036 formatter->close_section();
1037}
1038
1039int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState& op_state,
d2e6a577 1040 RGWFormatterFlusher& flusher ,std::string *err_msg)
7c673cae
FG
1041{
1042 bool fix_index = op_state.will_fix_index();
1043 rgw_bucket bucket = op_state.get_bucket();
1044
d2e6a577 1045 size_t max = 1000;
7c673cae
FG
1046
1047 map<string, bool> common_prefixes;
1048
1049 bool is_truncated;
1050 map<string, bool> meta_objs;
1051 map<rgw_obj_index_key, string> all_objs;
1052
1053 RGWBucketInfo bucket_info;
11fdf7f2 1054 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
1055 int r = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr);
1056 if (r < 0) {
1057 ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): get_bucket_instance_info(bucket=" << bucket << ") returned r=" << r << dendl;
1058 return r;
1059 }
1060
1061 RGWRados::Bucket target(store, bucket_info);
1062 RGWRados::Bucket::List list_op(&target);
1063
1064 list_op.params.list_versions = true;
224ce89b 1065 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
7c673cae
FG
1066
1067 do {
1068 vector<rgw_bucket_dir_entry> result;
1069 int r = list_op.list_objects(max, &result, &common_prefixes, &is_truncated);
1070 if (r < 0) {
1071 set_err_msg(err_msg, "failed to list objects in bucket=" + bucket.name +
1072 " err=" + cpp_strerror(-r));
1073
1074 return r;
1075 }
1076
1077 vector<rgw_bucket_dir_entry>::iterator iter;
1078 for (iter = result.begin(); iter != result.end(); ++iter) {
31f18b77
FG
1079 rgw_obj_index_key key = iter->key;
1080 rgw_obj obj(bucket, key);
1081 string oid = obj.get_oid();
7c673cae
FG
1082
1083 int pos = oid.find_last_of('.');
1084 if (pos < 0) {
1085 /* obj has no suffix */
1086 all_objs[key] = oid;
1087 } else {
1088 /* obj has suffix */
1089 string name = oid.substr(0, pos);
1090 string suffix = oid.substr(pos + 1);
1091
1092 if (suffix.compare("meta") == 0) {
1093 meta_objs[name] = true;
1094 } else {
1095 all_objs[key] = name;
1096 }
1097 }
1098 }
1099
1100 } while (is_truncated);
1101
d2e6a577
FG
1102 list<rgw_obj_index_key> objs_to_unlink;
1103 Formatter *f = flusher.get_formatter();
1104
1105 f->open_array_section("invalid_multipart_entries");
1106
7c673cae
FG
1107 for (auto aiter = all_objs.begin(); aiter != all_objs.end(); ++aiter) {
1108 string& name = aiter->second;
1109
7c673cae
FG
1110 if (meta_objs.find(name) == meta_objs.end()) {
1111 objs_to_unlink.push_back(aiter->first);
1112 }
7c673cae 1113
d2e6a577
FG
1114 if (objs_to_unlink.size() > max) {
1115 if (fix_index) {
1116 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1117 if (r < 0) {
1118 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1119 cpp_strerror(-r));
1120 return r;
1121 }
1122 }
1123
1124 dump_mulipart_index_results(objs_to_unlink, flusher.get_formatter());
1125 flusher.flush();
1126 objs_to_unlink.clear();
1127 }
1128 }
7c673cae
FG
1129
1130 if (fix_index) {
1131 int r = store->remove_objs_from_index(bucket_info, objs_to_unlink);
1132 if (r < 0) {
1133 set_err_msg(err_msg, "ERROR: remove_obj_from_index() returned error: " +
1134 cpp_strerror(-r));
1135
1136 return r;
1137 }
1138 }
1139
d2e6a577
FG
1140 dump_mulipart_index_results(objs_to_unlink, f);
1141 f->close_section();
1142 flusher.flush();
1143
7c673cae
FG
1144 return 0;
1145}
1146
1147int RGWBucket::check_object_index(RGWBucketAdminOpState& op_state,
1148 RGWFormatterFlusher& flusher,
1149 std::string *err_msg)
1150{
1151
1152 bool fix_index = op_state.will_fix_index();
1153
7c673cae
FG
1154 if (!fix_index) {
1155 set_err_msg(err_msg, "check-objects flag requires fix index enabled");
1156 return -EINVAL;
1157 }
1158
1159 store->cls_obj_set_bucket_tag_timeout(bucket_info, BUCKET_TAG_TIMEOUT);
1160
1161 string prefix;
1162 rgw_obj_index_key marker;
1163 bool is_truncated = true;
1164
1165 Formatter *formatter = flusher.get_formatter();
1166 formatter->open_object_section("objects");
1167 while (is_truncated) {
1168 map<string, rgw_bucket_dir_entry> result;
1169
1adf2230
AA
1170 int r = store->cls_bucket_list_ordered(bucket_info, RGW_NO_SHARD,
1171 marker, prefix, 1000, true,
1172 result, &is_truncated, &marker,
1173 bucket_object_check_filter);
7c673cae
FG
1174 if (r == -ENOENT) {
1175 break;
1176 } else if (r < 0 && r != -ENOENT) {
1177 set_err_msg(err_msg, "ERROR: failed operation r=" + cpp_strerror(-r));
1178 }
1179
7c673cae
FG
1180 dump_bucket_index(result, formatter);
1181 flusher.flush();
7c673cae
FG
1182 }
1183
1184 formatter->close_section();
1185
1186 store->cls_obj_set_bucket_tag_timeout(bucket_info, 0);
1187
1188 return 0;
1189}
1190
1191
1192int RGWBucket::check_index(RGWBucketAdminOpState& op_state,
1193 map<RGWObjCategory, RGWStorageStats>& existing_stats,
1194 map<RGWObjCategory, RGWStorageStats>& calculated_stats,
1195 std::string *err_msg)
1196{
7c673cae
FG
1197 bool fix_index = op_state.will_fix_index();
1198
1199 int r = store->bucket_check_index(bucket_info, &existing_stats, &calculated_stats);
1200 if (r < 0) {
1201 set_err_msg(err_msg, "failed to check index error=" + cpp_strerror(-r));
1202 return r;
1203 }
1204
1205 if (fix_index) {
1206 r = store->bucket_rebuild_index(bucket_info);
1207 if (r < 0) {
1208 set_err_msg(err_msg, "failed to rebuild index err=" + cpp_strerror(-r));
1209 return r;
1210 }
1211 }
1212
1213 return 0;
1214}
1215
7c673cae
FG
1216int RGWBucket::policy_bl_to_stream(bufferlist& bl, ostream& o)
1217{
1218 RGWAccessControlPolicy_S3 policy(g_ceph_context);
81eedcae
TL
1219 int ret = decode_bl(bl, policy);
1220 if (ret < 0) {
1221 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
7c673cae
FG
1222 }
1223 policy.to_xml(o);
1224 return 0;
1225}
1226
81eedcae
TL
1227int rgw_object_get_attr(RGWRados* store, const RGWBucketInfo& bucket_info,
1228 const rgw_obj& obj, const char* attr_name,
1229 bufferlist& out_bl)
7c673cae 1230{
81eedcae
TL
1231 RGWObjectCtx obj_ctx(store);
1232 RGWRados::Object op_target(store, bucket_info, obj_ctx, obj);
1233 RGWRados::Object::Read rop(&op_target);
1234
1235 return rop.get_attr(attr_name, out_bl);
7c673cae
FG
1236}
1237
1238int RGWBucket::get_policy(RGWBucketAdminOpState& op_state, RGWAccessControlPolicy& policy)
1239{
1240 std::string object_name = op_state.get_object_name();
1241 rgw_bucket bucket = op_state.get_bucket();
11fdf7f2 1242 auto sysobj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
1243
1244 RGWBucketInfo bucket_info;
1245 map<string, bufferlist> attrs;
11fdf7f2 1246 int ret = store->get_bucket_info(sysobj_ctx, bucket.tenant, bucket.name, bucket_info, NULL, &attrs);
7c673cae
FG
1247 if (ret < 0) {
1248 return ret;
1249 }
1250
1251 if (!object_name.empty()) {
1252 bufferlist bl;
1253 rgw_obj obj(bucket, object_name);
1254
81eedcae
TL
1255 ret = rgw_object_get_attr(store, bucket_info, obj, RGW_ATTR_ACL, bl);
1256 if (ret < 0){
7c673cae 1257 return ret;
81eedcae 1258 }
7c673cae 1259
81eedcae
TL
1260 ret = decode_bl(bl, policy);
1261 if (ret < 0) {
1262 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
1263 }
1264 return ret;
7c673cae
FG
1265 }
1266
1267 map<string, bufferlist>::iterator aiter = attrs.find(RGW_ATTR_ACL);
1268 if (aiter == attrs.end()) {
1269 return -ENOENT;
1270 }
1271
81eedcae
TL
1272 ret = decode_bl(aiter->second, policy);
1273 if (ret < 0) {
1274 ldout(store->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl;
1275 }
1276
1277 return ret;
7c673cae
FG
1278}
1279
1280
1281int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1282 RGWAccessControlPolicy& policy)
1283{
1284 RGWBucket bucket;
1285
1286 int ret = bucket.init(store, op_state);
1287 if (ret < 0)
1288 return ret;
1289
1290 ret = bucket.get_policy(op_state, policy);
1291 if (ret < 0)
1292 return ret;
1293
1294 return 0;
1295}
1296
1297/* Wrappers to facilitate RESTful interface */
1298
1299
1300int RGWBucketAdminOp::get_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1301 RGWFormatterFlusher& flusher)
1302{
1303 RGWAccessControlPolicy policy(store->ctx());
1304
1305 int ret = get_policy(store, op_state, policy);
1306 if (ret < 0)
1307 return ret;
1308
1309 Formatter *formatter = flusher.get_formatter();
1310
1311 flusher.start(0);
1312
1313 formatter->open_object_section("policy");
1314 policy.dump(formatter);
1315 formatter->close_section();
1316
1317 flusher.flush();
1318
1319 return 0;
1320}
1321
1322int RGWBucketAdminOp::dump_s3_policy(RGWRados *store, RGWBucketAdminOpState& op_state,
1323 ostream& os)
1324{
1325 RGWAccessControlPolicy_S3 policy(store->ctx());
1326
1327 int ret = get_policy(store, op_state, policy);
1328 if (ret < 0)
1329 return ret;
1330
1331 policy.to_xml(os);
1332
1333 return 0;
1334}
1335
1336int RGWBucketAdminOp::unlink(RGWRados *store, RGWBucketAdminOpState& op_state)
1337{
1338 RGWBucket bucket;
1339
1340 int ret = bucket.init(store, op_state);
1341 if (ret < 0)
1342 return ret;
1343
1344 return bucket.unlink(op_state);
1345}
1346
1347int RGWBucketAdminOp::link(RGWRados *store, RGWBucketAdminOpState& op_state, string *err)
1348{
1349 RGWBucket bucket;
1350
1351 int ret = bucket.init(store, op_state);
1352 if (ret < 0)
1353 return ret;
1354
1355 return bucket.link(op_state, err);
1356
1357}
1358
1359int RGWBucketAdminOp::check_index(RGWRados *store, RGWBucketAdminOpState& op_state,
1360 RGWFormatterFlusher& flusher)
1361{
1362 int ret;
7c673cae
FG
1363 map<RGWObjCategory, RGWStorageStats> existing_stats;
1364 map<RGWObjCategory, RGWStorageStats> calculated_stats;
d2e6a577 1365
7c673cae
FG
1366
1367 RGWBucket bucket;
1368
1369 ret = bucket.init(store, op_state);
1370 if (ret < 0)
1371 return ret;
1372
1373 Formatter *formatter = flusher.get_formatter();
1374 flusher.start(0);
1375
d2e6a577 1376 ret = bucket.check_bad_index_multipart(op_state, flusher);
7c673cae
FG
1377 if (ret < 0)
1378 return ret;
1379
7c673cae
FG
1380 ret = bucket.check_object_index(op_state, flusher);
1381 if (ret < 0)
1382 return ret;
1383
1384 ret = bucket.check_index(op_state, existing_stats, calculated_stats);
1385 if (ret < 0)
1386 return ret;
1387
1388 dump_index_check(existing_stats, calculated_stats, formatter);
1389 flusher.flush();
1390
1391 return 0;
1392}
1393
1394int RGWBucketAdminOp::remove_bucket(RGWRados *store, RGWBucketAdminOpState& op_state,
1395 bool bypass_gc, bool keep_index_consistent)
1396{
1397 RGWBucket bucket;
1398
1399 int ret = bucket.init(store, op_state);
1400 if (ret < 0)
1401 return ret;
1402
c07f9fc5
FG
1403 std::string err_msg;
1404 ret = bucket.remove(op_state, bypass_gc, keep_index_consistent, &err_msg);
1405 if (!err_msg.empty()) {
1406 lderr(store->ctx()) << "ERROR: " << err_msg << dendl;
1407 }
1408 return ret;
7c673cae
FG
1409}
1410
1411int RGWBucketAdminOp::remove_object(RGWRados *store, RGWBucketAdminOpState& op_state)
1412{
1413 RGWBucket bucket;
1414
1415 int ret = bucket.init(store, op_state);
1416 if (ret < 0)
1417 return ret;
1418
1419 return bucket.remove_object(op_state);
1420}
1421
1422static int bucket_stats(RGWRados *store, const std::string& tenant_name, std::string& bucket_name, Formatter *formatter)
1423{
1424 RGWBucketInfo bucket_info;
1425 map<RGWObjCategory, RGWStorageStats> stats;
1426
1427 real_time mtime;
11fdf7f2 1428 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
1429 int r = store->get_bucket_info(obj_ctx, tenant_name, bucket_name, bucket_info, &mtime);
1430 if (r < 0)
1431 return r;
1432
1433 rgw_bucket& bucket = bucket_info.bucket;
1434
1435 string bucket_ver, master_ver;
1436 string max_marker;
1437 int ret = store->get_bucket_stats(bucket_info, RGW_NO_SHARD, &bucket_ver, &master_ver, stats, &max_marker);
1438 if (ret < 0) {
1439 cerr << "error getting bucket stats ret=" << ret << std::endl;
1440 return ret;
1441 }
1442
1443 utime_t ut(mtime);
1444
1445 formatter->open_object_section("stats");
1446 formatter->dump_string("bucket", bucket.name);
11fdf7f2 1447 formatter->dump_string("tenant", bucket.tenant);
31f18b77 1448 formatter->dump_string("zonegroup", bucket_info.zonegroup);
11fdf7f2 1449 formatter->dump_string("placement_rule", bucket_info.placement_rule.to_str());
31f18b77 1450 ::encode_json("explicit_placement", bucket.explicit_placement, formatter);
7c673cae
FG
1451 formatter->dump_string("id", bucket.bucket_id);
1452 formatter->dump_string("marker", bucket.marker);
1453 formatter->dump_stream("index_type") << bucket_info.index_type;
1454 ::encode_json("owner", bucket_info.owner, formatter);
1455 formatter->dump_string("ver", bucket_ver);
1456 formatter->dump_string("master_ver", master_ver);
81eedcae 1457 ut.gmtime(formatter->dump_stream("mtime"));
7c673cae
FG
1458 formatter->dump_string("max_marker", max_marker);
1459 dump_bucket_usage(stats, formatter);
1460 encode_json("bucket_quota", bucket_info.quota, formatter);
1461 formatter->close_section();
1462
1463 return 0;
1464}
1465
1466int RGWBucketAdminOp::limit_check(RGWRados *store,
1467 RGWBucketAdminOpState& op_state,
1468 const std::list<std::string>& user_ids,
1469 RGWFormatterFlusher& flusher,
1470 bool warnings_only)
1471{
1472 int ret = 0;
1473 const size_t max_entries =
1474 store->ctx()->_conf->rgw_list_buckets_max_chunk;
1475
1476 const size_t safe_max_objs_per_shard =
1477 store->ctx()->_conf->rgw_safe_max_objects_per_shard;
1478
1479 uint16_t shard_warn_pct =
1480 store->ctx()->_conf->rgw_shard_warning_threshold;
1481 if (shard_warn_pct > 100)
1482 shard_warn_pct = 90;
1483
1484 Formatter *formatter = flusher.get_formatter();
1485 flusher.start(0);
1486
1487 formatter->open_array_section("users");
1488
1489 for (const auto& user_id : user_ids) {
a8e16298 1490
7c673cae
FG
1491 formatter->open_object_section("user");
1492 formatter->dump_string("user_id", user_id);
7c673cae 1493 formatter->open_array_section("buckets");
a8e16298
TL
1494
1495 string marker;
1496 bool is_truncated{false};
7c673cae
FG
1497 do {
1498 RGWUserBuckets buckets;
7c673cae
FG
1499
1500 ret = rgw_read_user_buckets(store, user_id, buckets,
1501 marker, string(), max_entries, false,
1502 &is_truncated);
1503 if (ret < 0)
1504 return ret;
1505
1506 map<string, RGWBucketEnt>& m_buckets = buckets.get_buckets();
1507
1508 for (const auto& iter : m_buckets) {
1509 auto& bucket = iter.second.bucket;
1510 uint32_t num_shards = 1;
1511 uint64_t num_objects = 0;
1512
1513 /* need info for num_shards */
1514 RGWBucketInfo info;
11fdf7f2 1515 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
7c673cae
FG
1516
1517 marker = bucket.name; /* Casey's location for marker update,
1518 * as we may now not reach the end of
1519 * the loop body */
1520
1521 ret = store->get_bucket_info(obj_ctx, bucket.tenant, bucket.name,
1522 info, nullptr);
1523 if (ret < 0)
1524 continue;
1525
1526 /* need stats for num_entries */
1527 string bucket_ver, master_ver;
1528 std::map<RGWObjCategory, RGWStorageStats> stats;
1529 ret = store->get_bucket_stats(info, RGW_NO_SHARD, &bucket_ver,
1530 &master_ver, stats, nullptr);
1531
1532 if (ret < 0)
1533 continue;
1534
1535 for (const auto& s : stats) {
1536 num_objects += s.second.num_objects;
1537 }
1538
1539 num_shards = info.num_shards;
31f18b77
FG
1540 uint64_t objs_per_shard =
1541 (num_shards) ? num_objects/num_shards : num_objects;
7c673cae
FG
1542 {
1543 bool warn = false;
1544 stringstream ss;
1545 if (objs_per_shard > safe_max_objs_per_shard) {
1546 double over =
1547 100 - (safe_max_objs_per_shard/objs_per_shard * 100);
1548 ss << boost::format("OVER %4f%%") % over;
1549 warn = true;
1550 } else {
1551 double fill_pct =
1552 objs_per_shard / safe_max_objs_per_shard * 100;
1553 if (fill_pct >= shard_warn_pct) {
1554 ss << boost::format("WARN %4f%%") % fill_pct;
1555 warn = true;
1556 } else {
1557 ss << "OK";
1558 }
1559 }
1560
1561 if (warn || (! warnings_only)) {
1562 formatter->open_object_section("bucket");
1563 formatter->dump_string("bucket", bucket.name);
1564 formatter->dump_string("tenant", bucket.tenant);
1565 formatter->dump_int("num_objects", num_objects);
1566 formatter->dump_int("num_shards", num_shards);
1567 formatter->dump_int("objects_per_shard", objs_per_shard);
1568 formatter->dump_string("fill_status", ss.str());
1569 formatter->close_section();
1570 }
1571 }
1572 }
a8e16298
TL
1573 formatter->flush(cout);
1574 } while (is_truncated); /* foreach: bucket */
7c673cae
FG
1575
1576 formatter->close_section();
1577 formatter->close_section();
1578 formatter->flush(cout);
1579
1580 } /* foreach: user_id */
1581
1582 formatter->close_section();
1583 formatter->flush(cout);
1584
1585 return ret;
1586} /* RGWBucketAdminOp::limit_check */
1587
1588int RGWBucketAdminOp::info(RGWRados *store, RGWBucketAdminOpState& op_state,
1589 RGWFormatterFlusher& flusher)
1590{
11fdf7f2 1591 int ret = 0;
7c673cae 1592 string bucket_name = op_state.get_bucket_name();
7c673cae
FG
1593 Formatter *formatter = flusher.get_formatter();
1594 flusher.start(0);
1595
1596 CephContext *cct = store->ctx();
1597
1598 const size_t max_entries = cct->_conf->rgw_list_buckets_max_chunk;
1599
1600 bool show_stats = op_state.will_fetch_stats();
1601 rgw_user user_id = op_state.get_user_id();
1602 if (op_state.is_user_op()) {
1603 formatter->open_array_section("buckets");
1604
1605 RGWUserBuckets buckets;
1606 string marker;
1607 bool is_truncated = false;
1608
1609 do {
1610 ret = rgw_read_user_buckets(store, op_state.get_user_id(), buckets,
1611 marker, string(), max_entries, false,
1612 &is_truncated);
1613 if (ret < 0)
1614 return ret;
1615
1616 map<string, RGWBucketEnt>& m = buckets.get_buckets();
1617 map<string, RGWBucketEnt>::iterator iter;
1618
1619 for (iter = m.begin(); iter != m.end(); ++iter) {
11fdf7f2
TL
1620 std::string obj_name = iter->first;
1621 if (!bucket_name.empty() && bucket_name != obj_name) {
1622 continue;
1623 }
1624
7c673cae
FG
1625 if (show_stats)
1626 bucket_stats(store, user_id.tenant, obj_name, formatter);
1627 else
1628 formatter->dump_string("bucket", obj_name);
1629
1630 marker = obj_name;
1631 }
1632
1633 flusher.flush();
1634 } while (is_truncated);
1635
1636 formatter->close_section();
1637 } else if (!bucket_name.empty()) {
11fdf7f2
TL
1638 ret = bucket_stats(store, user_id.tenant, bucket_name, formatter);
1639 if (ret < 0) {
1640 return ret;
1641 }
7c673cae 1642 } else {
11fdf7f2
TL
1643 void *handle = nullptr;
1644 bool truncated = true;
7c673cae
FG
1645
1646 formatter->open_array_section("buckets");
11fdf7f2
TL
1647 ret = store->meta_mgr->list_keys_init("bucket", &handle);
1648 while (ret == 0 && truncated) {
1649 std::list<std::string> buckets;
1650 const int max_keys = 1000;
1651 ret = store->meta_mgr->list_keys_next(handle, max_keys, buckets,
1652 &truncated);
1653 for (auto& bucket_name : buckets) {
7c673cae 1654 if (show_stats)
11fdf7f2 1655 bucket_stats(store, user_id.tenant, bucket_name, formatter);
7c673cae 1656 else
11fdf7f2 1657 formatter->dump_string("bucket", bucket_name);
7c673cae
FG
1658 }
1659 }
1660
1661 formatter->close_section();
1662 }
1663
1664 flusher.flush();
1665
1666 return 0;
1667}
1668
94b18763
FG
1669int RGWBucketAdminOp::set_quota(RGWRados *store, RGWBucketAdminOpState& op_state)
1670{
1671 RGWBucket bucket;
1672
1673 int ret = bucket.init(store, op_state);
1674 if (ret < 0)
1675 return ret;
1676 return bucket.set_quota(op_state);
1677}
7c673cae 1678
f64942e4
AA
1679static int purge_bucket_instance(RGWRados *store, const RGWBucketInfo& bucket_info)
1680{
1681 int max_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
1682 for (int i = 0; i < max_shards; i++) {
1683 RGWRados::BucketShard bs(store);
1684 int shard_id = (bucket_info.num_shards > 0 ? i : -1);
1685 int ret = bs.init(bucket_info.bucket, shard_id, nullptr);
1686 if (ret < 0) {
1687 cerr << "ERROR: bs.init(bucket=" << bucket_info.bucket << ", shard=" << shard_id
1688 << "): " << cpp_strerror(-ret) << std::endl;
1689 return ret;
1690 }
1691 ret = store->bi_remove(bs);
1692 if (ret < 0) {
1693 cerr << "ERROR: failed to remove bucket index object: "
1694 << cpp_strerror(-ret) << std::endl;
1695 return ret;
1696 }
1697 }
1698 return 0;
1699}
1700
/*
 * Split a "tenant/bucket" key at the first '/' into {tenant, bucket}.
 * A key without '/' yields an empty tenant and the whole key as bucket.
 */
inline auto split_tenant(const std::string& bucket_name){
  const auto slash = bucket_name.find('/');
  if (slash == std::string::npos) {
    return std::make_pair(std::string(), bucket_name);
  }
  return std::make_pair(bucket_name.substr(0, slash), bucket_name.substr(slash + 1));
}
1708
1709using bucket_instance_ls = std::vector<RGWBucketInfo>;
1710void get_stale_instances(RGWRados *store, const std::string& bucket_name,
1711 const vector<std::string>& lst,
1712 bucket_instance_ls& stale_instances)
1713{
1714
11fdf7f2 1715 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
f64942e4
AA
1716
1717 bucket_instance_ls other_instances;
1718// first iterate over the entries, and pick up the done buckets; these
1719// are guaranteed to be stale
1720 for (const auto& bucket_instance : lst){
1721 RGWBucketInfo binfo;
1722 int r = store->get_bucket_instance_info(obj_ctx, bucket_instance,
1723 binfo, nullptr,nullptr);
1724 if (r < 0){
1725 // this can only happen if someone deletes us right when we're processing
1726 lderr(store->ctx()) << "Bucket instance is invalid: " << bucket_instance
1727 << cpp_strerror(-r) << dendl;
1728 continue;
1729 }
1730 if (binfo.reshard_status == CLS_RGW_RESHARD_DONE)
1731 stale_instances.emplace_back(std::move(binfo));
1732 else {
1733 other_instances.emplace_back(std::move(binfo));
1734 }
1735 }
1736
1737 // Read the cur bucket info, if the bucket doesn't exist we can simply return
1738 // all the instances
11fdf7f2 1739 auto [tenant, bucket] = split_tenant(bucket_name);
f64942e4
AA
1740 RGWBucketInfo cur_bucket_info;
1741 int r = store->get_bucket_info(obj_ctx, tenant, bucket, cur_bucket_info, nullptr);
1742 if (r < 0) {
1743 if (r == -ENOENT) {
1744 // bucket doesn't exist, everything is stale then
1745 stale_instances.insert(std::end(stale_instances),
1746 std::make_move_iterator(other_instances.begin()),
1747 std::make_move_iterator(other_instances.end()));
1748 } else {
1749 // all bets are off if we can't read the bucket, just return the sureshot stale instances
1750 lderr(store->ctx()) << "error: reading bucket info for bucket: "
1751 << bucket << cpp_strerror(-r) << dendl;
1752 }
1753 return;
1754 }
1755
1756 // Don't process further in this round if bucket is resharding
1757 if (cur_bucket_info.reshard_status == CLS_RGW_RESHARD_IN_PROGRESS)
1758 return;
1759
1760 other_instances.erase(std::remove_if(other_instances.begin(), other_instances.end(),
1761 [&cur_bucket_info](const RGWBucketInfo& b){
1762 return (b.bucket.bucket_id == cur_bucket_info.bucket.bucket_id ||
1763 b.bucket.bucket_id == cur_bucket_info.new_bucket_instance_id);
1764 }),
1765 other_instances.end());
1766
1767 // check if there are still instances left
1768 if (other_instances.empty()) {
1769 return;
1770 }
1771
1772 // Now we have a bucket with instances where the reshard status is none, this
1773 // usually happens when the reshard process couldn't complete, lockdown the
1774 // bucket and walk through these instances to make sure no one else interferes
1775 // with these
1776 {
1777 RGWBucketReshardLock reshard_lock(store, cur_bucket_info, true);
1778 r = reshard_lock.lock();
1779 if (r < 0) {
1780 // most likely bucket is under reshard, return the sureshot stale instances
1781 ldout(store->ctx(), 5) << __func__
1782 << "failed to take reshard lock; reshard underway likey" << dendl;
1783 return;
1784 }
1785 auto sg = make_scope_guard([&reshard_lock](){ reshard_lock.unlock();} );
1786 // this should be fast enough that we may not need to renew locks and check
1787 // exit status?, should we read the values of the instances again?
1788 stale_instances.insert(std::end(stale_instances),
1789 std::make_move_iterator(other_instances.begin()),
1790 std::make_move_iterator(other_instances.end()));
1791 }
1792
1793 return;
1794}
1795
1796static int process_stale_instances(RGWRados *store, RGWBucketAdminOpState& op_state,
1797 RGWFormatterFlusher& flusher,
1798 std::function<void(const bucket_instance_ls&,
1799 Formatter *,
1800 RGWRados*)> process_f)
1801{
1802 std::string marker;
1803 void *handle;
1804 Formatter *formatter = flusher.get_formatter();
1805 static constexpr auto default_max_keys = 1000;
1806
1807 int ret = store->meta_mgr->list_keys_init("bucket.instance", marker, &handle);
1808 if (ret < 0) {
1809 cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
1810 return ret;
1811 }
1812
1813 bool truncated;
1814
1815 formatter->open_array_section("keys");
1816
1817 do {
1818 list<std::string> keys;
1819
1820 ret = store->meta_mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
1821 if (ret < 0 && ret != -ENOENT) {
1822 cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
1823 return ret;
1824 } if (ret != -ENOENT) {
1825 // partition the list of buckets by buckets as the listing is un sorted,
1826 // since it would minimize the reads to bucket_info
1827 std::unordered_map<std::string, std::vector<std::string>> bucket_instance_map;
1828 for (auto &key: keys) {
1829 auto pos = key.find(':');
1830 if(pos != std::string::npos)
1831 bucket_instance_map[key.substr(0,pos)].emplace_back(std::move(key));
1832 }
1833 for (const auto& kv: bucket_instance_map) {
1834 bucket_instance_ls stale_lst;
1835 get_stale_instances(store, kv.first, kv.second, stale_lst);
1836 process_f(stale_lst, formatter, store);
1837 }
1838 }
1839 } while (truncated);
1840
1841 formatter->close_section(); // keys
1842 formatter->flush(cout);
1843 return 0;
1844}
1845
1846int RGWBucketAdminOp::list_stale_instances(RGWRados *store,
1847 RGWBucketAdminOpState& op_state,
1848 RGWFormatterFlusher& flusher)
1849{
1850 auto process_f = [](const bucket_instance_ls& lst,
1851 Formatter *formatter,
1852 RGWRados*){
1853 for (const auto& binfo: lst)
1854 formatter->dump_string("key", binfo.bucket.get_key());
1855 };
1856 return process_stale_instances(store, op_state, flusher, process_f);
1857}
1858
1859
1860int RGWBucketAdminOp::clear_stale_instances(RGWRados *store,
1861 RGWBucketAdminOpState& op_state,
1862 RGWFormatterFlusher& flusher)
1863{
1864 auto process_f = [](const bucket_instance_ls& lst,
1865 Formatter *formatter,
1866 RGWRados *store){
1867 for (const auto &binfo: lst) {
1868 int ret = purge_bucket_instance(store, binfo);
1869 if (ret == 0){
1870 auto md_key = "bucket.instance:" + binfo.bucket.get_key();
1871 ret = store->meta_mgr->remove(md_key);
1872 }
1873 formatter->open_object_section("delete_status");
1874 formatter->dump_string("bucket_instance", binfo.bucket.get_key());
1875 formatter->dump_int("status", -ret);
1876 formatter->close_section();
1877 }
1878 };
1879
1880 return process_stale_instances(store, op_state, flusher, process_f);
1881}
1882
11fdf7f2
TL
1883static int fix_single_bucket_lc(RGWRados *store,
1884 const std::string& tenant_name,
1885 const std::string& bucket_name)
1886{
1887 auto obj_ctx = store->svc.sysobj->init_obj_ctx();
1888 RGWBucketInfo bucket_info;
1889 map <std::string, bufferlist> bucket_attrs;
1890 int ret = store->get_bucket_info(obj_ctx, tenant_name, bucket_name,
1891 bucket_info, nullptr, &bucket_attrs);
1892 if (ret < 0) {
1893 // TODO: Should we handle the case where the bucket could've been removed between
1894 // listing and fetching?
1895 return ret;
1896 }
1897
1898 return rgw::lc::fix_lc_shard_entry(store, bucket_info, bucket_attrs);
1899}
1900
1901static void format_lc_status(Formatter* formatter,
1902 const std::string& tenant_name,
1903 const std::string& bucket_name,
1904 int status)
1905{
1906 formatter->open_object_section("bucket_entry");
1907 std::string entry = tenant_name.empty() ? bucket_name : tenant_name + "/" + bucket_name;
1908 formatter->dump_string("bucket", entry);
1909 formatter->dump_int("status", status);
1910 formatter->close_section(); // bucket_entry
1911}
1912
1913static void process_single_lc_entry(RGWRados *store, Formatter *formatter,
1914 const std::string& tenant_name,
1915 const std::string& bucket_name)
1916{
1917 int ret = fix_single_bucket_lc(store, tenant_name, bucket_name);
1918 format_lc_status(formatter, tenant_name, bucket_name, -ret);
1919}
1920
1921int RGWBucketAdminOp::fix_lc_shards(RGWRados *store,
1922 RGWBucketAdminOpState& op_state,
1923 RGWFormatterFlusher& flusher)
1924{
1925 std::string marker;
1926 void *handle;
1927 Formatter *formatter = flusher.get_formatter();
1928 static constexpr auto default_max_keys = 1000;
1929
1930 bool truncated;
1931 if (const std::string& bucket_name = op_state.get_bucket_name();
1932 ! bucket_name.empty()) {
1933 const rgw_user user_id = op_state.get_user_id();
1934 process_single_lc_entry(store, formatter, user_id.tenant, bucket_name);
1935 formatter->flush(cout);
1936 } else {
1937 int ret = store->meta_mgr->list_keys_init("bucket", marker, &handle);
1938 if (ret < 0) {
1939 std::cerr << "ERROR: can't get key: " << cpp_strerror(-ret) << std::endl;
1940 return ret;
1941 }
1942
1943 {
1944 formatter->open_array_section("lc_fix_status");
1945 auto sg = make_scope_guard([&store, &handle, &formatter](){
1946 store->meta_mgr->list_keys_complete(handle);
1947 formatter->close_section(); // lc_fix_status
1948 formatter->flush(cout);
1949 });
1950 do {
1951 list<std::string> keys;
1952 ret = store->meta_mgr->list_keys_next(handle, default_max_keys, keys, &truncated);
1953 if (ret < 0 && ret != -ENOENT) {
1954 std::cerr << "ERROR: lists_keys_next(): " << cpp_strerror(-ret) << std::endl;
1955 return ret;
1956 } if (ret != -ENOENT) {
1957 for (const auto &key:keys) {
1958 auto [tenant_name, bucket_name] = split_tenant(key);
1959 process_single_lc_entry(store, formatter, tenant_name, bucket_name);
1960 }
1961 }
1962 formatter->flush(cout); // regularly flush every 1k entries
1963 } while (truncated);
1964 }
1965
1966 }
1967 return 0;
1968
1969}
1970
81eedcae
TL
1971static bool has_object_expired(RGWRados *store, const RGWBucketInfo& bucket_info,
1972 const rgw_obj_key& key, utime_t& delete_at)
1973{
1974 rgw_obj obj(bucket_info.bucket, key);
1975 bufferlist delete_at_bl;
1976
1977 int ret = rgw_object_get_attr(store, bucket_info, obj, RGW_ATTR_DELETE_AT, delete_at_bl);
1978 if (ret < 0) {
1979 return false; // no delete at attr, proceed
1980 }
1981
1982 ret = decode_bl(delete_at_bl, delete_at);
1983 if (ret < 0) {
1984 return false; // failed to parse
1985 }
1986
1987 if (delete_at <= ceph_clock_now() && !delete_at.is_zero()) {
1988 return true;
1989 }
1990
1991 return false;
1992}
1993
1994static int fix_bucket_obj_expiry(RGWRados *store, const RGWBucketInfo& bucket_info,
1995 RGWFormatterFlusher& flusher, bool dry_run)
1996{
1997 if (bucket_info.bucket.bucket_id == bucket_info.bucket.marker) {
1998 lderr(store->ctx()) << "Not a resharded bucket skipping" << dendl;
1999 return 0; // not a resharded bucket, move along
2000 }
2001
2002 Formatter *formatter = flusher.get_formatter();
2003 formatter->open_array_section("expired_deletion_status");
2004 auto sg = make_scope_guard([&formatter] {
2005 formatter->close_section();
2006 formatter->flush(std::cout);
2007 });
2008
2009 RGWRados::Bucket target(store, bucket_info);
2010 RGWRados::Bucket::List list_op(&target);
2011
2012 list_op.params.list_versions = bucket_info.versioned();
2013 list_op.params.allow_unordered = true;
2014
2015 constexpr auto max_objects = 1000;
2016 bool is_truncated {false};
2017 do {
2018 std::vector<rgw_bucket_dir_entry> objs;
2019
2020 int ret = list_op.list_objects(max_objects, &objs, nullptr, &is_truncated);
2021 if (ret < 0) {
2022 lderr(store->ctx()) << "ERROR failed to list objects in the bucket" << dendl;
2023 return ret;
2024 }
2025 for (const auto& obj : objs) {
2026 rgw_obj_key key(obj.key);
2027 utime_t delete_at;
2028 if (has_object_expired(store, bucket_info, key, delete_at)) {
2029 formatter->open_object_section("object_status");
2030 formatter->dump_string("object", key.name);
2031 formatter->dump_stream("delete_at") << delete_at;
2032
2033 if (!dry_run) {
2034 ret = rgw_remove_object(store, bucket_info, bucket_info.bucket, key);
2035 formatter->dump_int("status", ret);
2036 }
2037
2038 formatter->close_section(); // object_status
2039 }
2040 }
2041 formatter->flush(cout); // regularly flush every 1k entries
2042 } while (is_truncated);
2043
2044 return 0;
2045}
2046
2047int RGWBucketAdminOp::fix_obj_expiry(RGWRados *store, RGWBucketAdminOpState& op_state,
2048 RGWFormatterFlusher& flusher, bool dry_run)
2049{
2050 RGWBucket admin_bucket;
2051 int ret = admin_bucket.init(store, op_state);
2052 if (ret < 0) {
2053 lderr(store->ctx()) << "failed to initialize bucket" << dendl;
2054 return ret;
2055 }
2056
2057 return fix_bucket_obj_expiry(store, admin_bucket.get_bucket_info(), flusher, dry_run);
2058}
2059
7c673cae
FG
2060void rgw_data_change::dump(Formatter *f) const
2061{
2062 string type;
2063 switch (entity_type) {
2064 case ENTITY_TYPE_BUCKET:
2065 type = "bucket";
2066 break;
2067 default:
2068 type = "unknown";
2069 }
2070 encode_json("entity_type", type, f);
2071 encode_json("key", key, f);
2072 utime_t ut(timestamp);
2073 encode_json("timestamp", ut, f);
2074}
2075
2076void rgw_data_change::decode_json(JSONObj *obj) {
2077 string s;
2078 JSONDecoder::decode_json("entity_type", s, obj);
2079 if (s == "bucket") {
2080 entity_type = ENTITY_TYPE_BUCKET;
2081 } else {
2082 entity_type = ENTITY_TYPE_UNKNOWN;
2083 }
2084 JSONDecoder::decode_json("key", key, obj);
2085 utime_t ut;
2086 JSONDecoder::decode_json("timestamp", ut, obj);
2087 timestamp = ut.to_real_time();
2088}
2089
2090void rgw_data_change_log_entry::dump(Formatter *f) const
2091{
2092 encode_json("log_id", log_id, f);
2093 utime_t ut(log_timestamp);
2094 encode_json("log_timestamp", ut, f);
2095 encode_json("entry", entry, f);
2096}
2097
2098void rgw_data_change_log_entry::decode_json(JSONObj *obj) {
2099 JSONDecoder::decode_json("log_id", log_id, obj);
2100 utime_t ut;
2101 JSONDecoder::decode_json("log_timestamp", ut, obj);
2102 log_timestamp = ut.to_real_time();
2103 JSONDecoder::decode_json("entry", entry, obj);
2104}
2105
2106int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
2107 const string& name = bs.bucket.name;
2108 int shard_shift = (bs.shard_id > 0 ? bs.shard_id : 0);
2109 uint32_t r = (ceph_str_hash_linux(name.c_str(), name.size()) + shard_shift) % num_shards;
2110
2111 return (int)r;
2112}
2113
2114int RGWDataChangesLog::renew_entries()
2115{
11fdf7f2 2116 if (!store->svc.zone->need_to_log_data())
7c673cae
FG
2117 return 0;
2118
2119 /* we can't keep the bucket name as part of the cls_log_entry, and we need
2120 * it later, so we keep two lists under the map */
2121 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > > m;
2122
2123 lock.Lock();
2124 map<rgw_bucket_shard, bool> entries;
2125 entries.swap(cur_cycle);
2126 lock.Unlock();
2127
2128 map<rgw_bucket_shard, bool>::iterator iter;
2129 string section;
2130 real_time ut = real_clock::now();
2131 for (iter = entries.begin(); iter != entries.end(); ++iter) {
2132 const rgw_bucket_shard& bs = iter->first;
2133
2134 int index = choose_oid(bs);
2135
2136 cls_log_entry entry;
2137
2138 rgw_data_change change;
2139 bufferlist bl;
2140 change.entity_type = ENTITY_TYPE_BUCKET;
2141 change.key = bs.get_key();
2142 change.timestamp = ut;
11fdf7f2 2143 encode(change, bl);
7c673cae
FG
2144
2145 store->time_log_prepare_entry(entry, ut, section, change.key, bl);
2146
2147 m[index].first.push_back(bs);
2148 m[index].second.emplace_back(std::move(entry));
2149 }
2150
2151 map<int, pair<list<rgw_bucket_shard>, list<cls_log_entry> > >::iterator miter;
2152 for (miter = m.begin(); miter != m.end(); ++miter) {
2153 list<cls_log_entry>& entries = miter->second.second;
2154
2155 real_time now = real_clock::now();
2156
2157 int ret = store->time_log_add(oids[miter->first], entries, NULL);
2158 if (ret < 0) {
2159 /* we don't really need to have a special handling for failed cases here,
2160 * as this is just an optimization. */
2161 lderr(cct) << "ERROR: store->time_log_add() returned " << ret << dendl;
2162 return ret;
2163 }
2164
2165 real_time expiration = now;
2166 expiration += make_timespan(cct->_conf->rgw_data_log_window);
2167
2168 list<rgw_bucket_shard>& buckets = miter->second.first;
2169 list<rgw_bucket_shard>::iterator liter;
2170 for (liter = buckets.begin(); liter != buckets.end(); ++liter) {
2171 update_renewed(*liter, expiration);
2172 }
2173 }
2174
2175 return 0;
2176}
2177
2178void RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, ChangeStatusPtr& status)
2179{
11fdf7f2 2180 ceph_assert(lock.is_locked());
7c673cae
FG
2181 if (!changes.find(bs, status)) {
2182 status = ChangeStatusPtr(new ChangeStatus);
2183 changes.add(bs, status);
2184 }
2185}
2186
2187void RGWDataChangesLog::register_renew(rgw_bucket_shard& bs)
2188{
2189 Mutex::Locker l(lock);
2190 cur_cycle[bs] = true;
2191}
2192
2193void RGWDataChangesLog::update_renewed(rgw_bucket_shard& bs, real_time& expiration)
2194{
2195 Mutex::Locker l(lock);
2196 ChangeStatusPtr status;
2197 _get_change(bs, status);
2198
2199 ldout(cct, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs.bucket.name << " shard_id=" << bs.shard_id << " expiration=" << expiration << dendl;
2200 status->cur_expiration = expiration;
2201}
2202
2203int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
2204 rgw_bucket_shard bs(bucket, shard_id);
2205
2206 return choose_oid(bs);
2207}
2208
/*
 * Record that a bucket shard changed, writing a data-log entry unless one
 * was written recently enough.
 *
 * - If the shard's cur_expiration has not passed yet, no write happens; the
 *   shard is queued via register_renew() for the renew thread.
 * - If another caller is already writing for this shard (status->pending),
 *   this call waits on that caller's RefCountedCond and returns its result.
 * - Otherwise this call performs the write and wakes any waiters.
 *
 * Returns 0 on success or the negative error from time_log_add() / the
 * waited-on writer.
 */
int RGWDataChangesLog::add_entry(rgw_bucket& bucket, int shard_id) {
  if (!store->svc.zone->need_to_log_data())
    return 0;

  if (observer) {
    observer->on_bucket_changed(bucket.get_key());
  }

  rgw_bucket_shard bs(bucket, shard_id);

  int index = choose_oid(bs);
  mark_modified(index, bs);

  // Only the status lookup needs the log-wide lock; per-shard state is
  // guarded by status->lock below.
  lock.Lock();

  ChangeStatusPtr status;
  _get_change(bs, status);

  lock.Unlock();

  real_time now = real_clock::now();

  status->lock->Lock();

  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;

  if (now < status->cur_expiration) {
    /* no need to send, recently completed */
    status->lock->Unlock();

    register_renew(bs);
    return 0;
  }

  RefCountedCond *cond;

  if (status->pending) {
    // Another writer is in flight: take a reference on its cond, drop the
    // shard lock, and wait for its result.
    cond = status->cond;

    ceph_assert(cond);

    status->cond->get();
    status->lock->Unlock();

    int ret = cond->wait();
    cond->put();
    if (!ret) {
      register_renew(bs);
    }
    return ret;
  }

  // Become the writer for this shard; later callers will wait on this cond.
  status->cond = new RefCountedCond;
  status->pending = true;

  string& oid = oids[index];
  real_time expiration;

  int ret;

  // Write the entry with the shard lock released. If the write succeeded
  // but took longer than the window (clock now past `expiration`), write
  // again with a fresh timestamp.
  do {
    status->cur_sent = now;

    expiration = now;
    expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);

    status->lock->Unlock();

    bufferlist bl;
    rgw_data_change change;
    change.entity_type = ENTITY_TYPE_BUCKET;
    change.key = bs.get_key();
    change.timestamp = now;
    encode(change, bl);
    string section;

    ldout(cct, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;

    ret = store->time_log_add(oid, now, section, change.key, bl);

    now = real_clock::now();

    status->lock->Lock();

  } while (!ret && real_clock::now() > expiration);

  cond = status->cond;

  status->pending = false;
  status->cur_expiration = status->cur_sent; /* time of when operation started, not completed */
  status->cur_expiration += make_timespan(cct->_conf->rgw_data_log_window);
  status->cond = NULL;
  status->lock->Unlock();

  // Wake waiters with our result, then drop our reference.
  cond->done(ret);
  cond->put();

  return ret;
}
2308
2309int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
2310 list<rgw_data_change_log_entry>& entries,
2311 const string& marker,
2312 string *out_marker,
2313 bool *truncated) {
31f18b77
FG
2314 if (shard >= num_shards)
2315 return -EINVAL;
7c673cae
FG
2316
2317 list<cls_log_entry> log_entries;
2318
2319 int ret = store->time_log_list(oids[shard], start_time, end_time,
2320 max_entries, log_entries, marker,
2321 out_marker, truncated);
2322 if (ret < 0)
2323 return ret;
2324
2325 list<cls_log_entry>::iterator iter;
2326 for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
2327 rgw_data_change_log_entry log_entry;
2328 log_entry.log_id = iter->id;
2329 real_time rt = iter->timestamp.to_real_time();
2330 log_entry.log_timestamp = rt;
11fdf7f2 2331 auto liter = iter->data.cbegin();
7c673cae 2332 try {
11fdf7f2 2333 decode(log_entry.entry, liter);
7c673cae
FG
2334 } catch (buffer::error& err) {
2335 lderr(cct) << "ERROR: failed to decode data changes log entry" << dendl;
2336 return -EIO;
2337 }
2338 entries.push_back(log_entry);
2339 }
2340
2341 return 0;
2342}
2343
2344int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
2345 list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
2346 bool truncated;
2347 entries.clear();
2348
2349 for (; marker.shard < num_shards && (int)entries.size() < max_entries;
2350 marker.shard++, marker.marker.clear()) {
2351 int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
2352 marker.marker, NULL, &truncated);
2353 if (ret == -ENOENT) {
2354 continue;
2355 }
2356 if (ret < 0) {
2357 return ret;
2358 }
2359 if (truncated) {
2360 *ptruncated = true;
2361 return 0;
2362 }
2363 }
2364
2365 *ptruncated = (marker.shard < num_shards);
2366
2367 return 0;
2368}
2369
2370int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
2371{
2372 if (shard_id >= num_shards)
2373 return -EINVAL;
2374
2375 string oid = oids[shard_id];
2376
2377 cls_log_header header;
2378
2379 int ret = store->time_log_info(oid, &header);
2380 if ((ret < 0) && (ret != -ENOENT))
2381 return ret;
2382
2383 info->marker = header.max_marker;
2384 info->last_update = header.max_time.to_real_time();
2385
2386 return 0;
2387}
2388
2389int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
2390 const string& start_marker, const string& end_marker)
2391{
7c673cae
FG
2392 if (shard_id > num_shards)
2393 return -EINVAL;
2394
eafe8130
TL
2395 return store->time_log_trim(oids[shard_id], start_time, end_time,
2396 start_marker, end_marker, nullptr);
7c673cae
FG
2397}
2398
11fdf7f2
TL
2399int RGWDataChangesLog::lock_exclusive(int shard_id, timespan duration, string& zone_id, string& owner_id) {
2400 return store->lock_exclusive(store->svc.zone->get_zone_params().log_pool, oids[shard_id], duration, zone_id, owner_id);
2401}
2402
2403int RGWDataChangesLog::unlock(int shard_id, string& zone_id, string& owner_id) {
2404 return store->unlock(store->svc.zone->get_zone_params().log_pool, oids[shard_id], zone_id, owner_id);
2405}
2406
7c673cae
FG
2407bool RGWDataChangesLog::going_down()
2408{
2409 return down_flag;
2410}
2411
/*
 * Shut down the renew thread and release owned storage.
 * down_flag is set before stop() so the renew loop (which checks
 * going_down() after waking) exits instead of running another cycle.
 */
RGWDataChangesLog::~RGWDataChangesLog() {
  down_flag = true;
  renew_thread->stop();
  renew_thread->join();
  delete renew_thread;
  delete[] oids;
}
2419
2420void *RGWDataChangesLog::ChangesRenewThread::entry() {
2421 do {
2422 dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl;
2423 int r = log->renew_entries();
2424 if (r < 0) {
2425 dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r << dendl;
2426 }
2427
2428 if (log->going_down())
2429 break;
2430
2431 int interval = cct->_conf->rgw_data_log_window * 3 / 4;
2432 lock.Lock();
2433 cond.WaitInterval(lock, utime_t(interval, 0));
2434 lock.Unlock();
2435 } while (!log->going_down());
2436
2437 return NULL;
2438}
2439
2440void RGWDataChangesLog::ChangesRenewThread::stop()
2441{
2442 Mutex::Locker l(lock);
2443 cond.Signal();
2444}
2445
2446void RGWDataChangesLog::mark_modified(int shard_id, const rgw_bucket_shard& bs)
2447{
2448 auto key = bs.get_key();
2449 modified_lock.get_read();
2450 map<int, set<string> >::iterator iter = modified_shards.find(shard_id);
2451 if (iter != modified_shards.end()) {
2452 set<string>& keys = iter->second;
2453 if (keys.find(key) != keys.end()) {
2454 modified_lock.unlock();
2455 return;
2456 }
2457 }
2458 modified_lock.unlock();
2459
2460 RWLock::WLocker wl(modified_lock);
2461 modified_shards[shard_id].insert(key);
2462}
2463
2464void RGWDataChangesLog::read_clear_modified(map<int, set<string> > &modified)
2465{
2466 RWLock::WLocker wl(modified_lock);
2467 modified.swap(modified_shards);
2468 modified_shards.clear();
2469}
2470
/* Serialize as JSON: the bucket instance info under "bucket_info" plus its xattrs under "attrs". */
void RGWBucketCompleteInfo::dump(Formatter *f) const {
  encode_json("bucket_info", info, f);
  encode_json("attrs", attrs, f);
}
2475
/* Inverse of dump(). */
void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("bucket_info", info, obj);
  JSONDecoder::decode_json("attrs", attrs, obj);
}
2480
/*
 * Metadata handler for the "bucket" section: bucket entrypoint objects stored
 * in the zone's domain_root pool, keyed by "[tenant/]bucket" (split with
 * parse_bucket()).
 */
class RGWBucketMetadataHandler : public RGWMetadataHandler {

public:
  string get_type() override { return "bucket"; }

  /*
   * Read the entrypoint for `entry` and return it (with its read version and
   * mtime) as a newly allocated RGWBucketEntryMetadataObject in *obj.
   */
  int get(RGWRados *store, string& entry, RGWMetadataObject **obj) override {
    RGWObjVersionTracker ot;
    RGWBucketEntryPoint be;

    real_time mtime;
    map<string, bufferlist> attrs;
    auto obj_ctx = store->svc.sysobj->init_obj_ctx();

    string tenant_name, bucket_name;
    parse_bucket(entry, &tenant_name, &bucket_name);
    int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &ot, &mtime, &attrs);
    if (ret < 0)
      return ret;

    RGWBucketEntryMetadataObject *mdo = new RGWBucketEntryMetadataObject(be, ot.read_version, mtime);

    *obj = mdo;

    return 0;
  }

  /*
   * Store an entrypoint decoded from JSON, unless check_versions() rejects it
   * against the existing one (then STATUS_NO_APPLY). Existing xattrs are kept.
   * Afterwards the bucket is linked to / unlinked from its owner according to
   * be.linked. Returns -EINVAL on malformed JSON.
   */
  int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
          real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
    RGWBucketEntryPoint be, old_be;
    try {
      decode_json_obj(be, obj);
    } catch (JSONDecoder::err& e) {
      return -EINVAL;
    }

    real_time orig_mtime;
    map<string, bufferlist> attrs;

    RGWObjVersionTracker old_ot;
    auto obj_ctx = store->svc.sysobj->init_obj_ctx();

    string tenant_name, bucket_name;
    parse_bucket(entry, &tenant_name, &bucket_name);
    int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, old_be, &old_ot, &orig_mtime, &attrs);
    if (ret < 0 && ret != -ENOENT)
      return ret;

    // are we actually going to perform this put, or is it too old?
    if (ret != -ENOENT &&
        !check_versions(old_ot.read_version, orig_mtime,
                        objv_tracker.write_version, mtime, sync_type)) {
      return STATUS_NO_APPLY;
    }

    objv_tracker.read_version = old_ot.read_version; /* maintain the obj version we just read */

    ret = store->put_bucket_entrypoint_info(tenant_name, bucket_name, be, false, objv_tracker, mtime, &attrs);
    if (ret < 0)
      return ret;

    /* link bucket */
    if (be.linked) {
      ret = rgw_link_bucket(store, be.owner, be.bucket, be.creation_time, false);
    } else {
      ret = rgw_unlink_bucket(store, be.owner, be.bucket.tenant,
                              be.bucket.name, false);
    }

    return ret;
  }

  // Listing cursor handed out (as void*) by list_keys_init().
  struct list_keys_info {
    RGWRados *store;
    RGWListRawObjsCtx ctx;
  };

  /*
   * Unlink the bucket from its owner and delete the entrypoint object.
   * Failures of either step are only logged; always returns 0 once the
   * entrypoint was read.
   */
  int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
    RGWBucketEntryPoint be;
    auto obj_ctx = store->svc.sysobj->init_obj_ctx();

    string tenant_name, bucket_name;
    parse_bucket(entry, &tenant_name, &bucket_name);
    int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, NULL, NULL);
    if (ret < 0)
      return ret;

    /*
     * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
     * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
     * will incorrectly fail.
     */
    ret = rgw_unlink_bucket(store, be.owner, tenant_name, bucket_name, false);
    if (ret < 0) {
      lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
    }

    ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker);
    if (ret < 0) {
      lderr(store->ctx()) << "could not delete bucket=" << entry << dendl;
    }
    /* idempotent */
    return 0;
  }

  // Entrypoint objects live in domain_root under the raw key.
  void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
    oid = key;
    pool = store->svc.zone->get_zone_params().domain_root;
  }

  // Start a raw listing of domain_root at `marker`; on success *phandle owns
  // a heap list_keys_info that list_keys_complete() must free.
  int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
    auto info = std::make_unique<list_keys_info>();

    info->store = store;

    int ret = store->list_raw_objects_init(store->svc.zone->get_zone_params().domain_root, marker,
                                           &info->ctx);
    if (ret < 0) {
      return ret;
    }
    *phandle = (void *)info.release();

    return 0;
  }

  // Return the next batch of up to `max` raw names, dropping names that start
  // with '.'. -ENOENT from the listing is reported as an empty, non-truncated batch.
  int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
    list_keys_info *info = static_cast<list_keys_info *>(handle);

    string no_filter;

    keys.clear();

    RGWRados *store = info->store;

    list<string> unfiltered_keys;

    int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
                                           unfiltered_keys, truncated);
    if (ret < 0 && ret != -ENOENT)
      return ret;
    if (ret == -ENOENT) {
      if (truncated)
        *truncated = false;
      return 0;
    }

    // now filter out the system entries
    list<string>::iterator iter;
    for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
      string& k = *iter;

      if (k[0] != '.') {
        keys.push_back(k);
      }
    }

    return 0;
  }

  // Free the cursor allocated by list_keys_init().
  void list_keys_complete(void *handle) override {
    list_keys_info *info = static_cast<list_keys_info *>(handle);
    delete info;
  }

  // Current raw-listing position, usable as `marker` for a later list_keys_init().
  string get_marker(void *handle) override {
    list_keys_info *info = static_cast<list_keys_info *>(handle);
    return info->store->list_raw_objs_get_cursor(info->ctx);
  }
};
2649
11fdf7f2
TL
2650void get_md5_digest(const RGWBucketEntryPoint *be, string& md5_digest) {
2651
2652 char md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
2653 unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
2654 bufferlist bl;
2655
2656 Formatter *f = new JSONFormatter(false);
2657 be->dump(f);
2658 f->flush(bl);
2659
2660 MD5 hash;
2661 hash.Update((const unsigned char *)bl.c_str(), bl.length());
2662 hash.Final(m);
2663
2664 buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, md5);
2665
2666 delete f;
2667
2668 md5_digest = md5;
2669}
2670
// xattr name under which archive_meta_info is stored on a bucket instance.
#define ARCHIVE_META_ATTR RGW_ATTR_PREFIX "zone.archive.info"

/*
 * Archive-zone bookkeeping kept on a bucket instance: the bucket it was
 * originally created as, so renamed ("-deleted-") copies remember their origin.
 */
struct archive_meta_info {
  rgw_bucket orig_bucket;

  // Load from `attrs`. Returns false if the attribute is absent or fails to decode.
  bool from_attrs(CephContext *cct, map<string, bufferlist>& attrs) {
    auto iter = attrs.find(ARCHIVE_META_ATTR);
    if (iter == attrs.end()) {
      return false;
    }

    auto bliter = iter->second.cbegin();
    try {
      decode(bliter);
    } catch (buffer::error& err) {
      ldout(cct, 0) << "ERROR: failed to decode archive meta info" << dendl;
      return false;
    }

    return true;
  }

  // Encode into attrs[ARCHIVE_META_ATTR], creating the entry if needed.
  void store_in_attrs(map<string, bufferlist>& attrs) const {
    encode(attrs[ARCHIVE_META_ATTR]);
  }

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(orig_bucket, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(orig_bucket, bl);
    DECODE_FINISH(bl);
  }
};
WRITE_CLASS_ENCODER(archive_meta_info)
2710
/*
 * Bucket entrypoint handler for archive zones: removal is turned into a
 * rename to "<original name>-deleted-<md5 of entrypoint JSON>" so data is kept.
 */
class RGWArchiveBucketMetadataHandler : public RGWBucketMetadataHandler {
public:
  /*
   * Instead of deleting: copy the bucket instance and entrypoint under a new
   * "-deleted-" name, link the new bucket to the owner, then unlink/delete the
   * old entrypoint and old instance object. Failures in the final cleanup are
   * only logged; returns 0 once the new bucket is linked.
   */
  int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
    ldout(store->ctx(), 5) << "SKIP: bucket removal is not allowed on archive zone: bucket:" << entry << " ... proceeding to rename" << dendl;

    string tenant_name, bucket_name;
    parse_bucket(entry, &tenant_name, &bucket_name);

    real_time mtime;

    /* read original entrypoint */

    RGWBucketEntryPoint be;
    auto obj_ctx = store->svc.sysobj->init_obj_ctx();
    map<string, bufferlist> attrs;
    int ret = store->get_bucket_entrypoint_info(obj_ctx, tenant_name, bucket_name, be, &objv_tracker, &mtime, &attrs);
    if (ret < 0) {
      return ret;
    }

    string meta_name = bucket_name + ":" + be.bucket.bucket_id;

    /* read original bucket instance info */

    map<string, bufferlist> attrs_m;
    ceph::real_time orig_mtime;
    RGWBucketInfo old_bi;

    ret = store->get_bucket_instance_info(obj_ctx, be.bucket, old_bi, &orig_mtime, &attrs_m);
    if (ret < 0) {
      return ret;
    }

    // Record the original bucket once; an already-archived instance keeps its
    // existing orig_bucket, so repeated renames derive from the first name.
    archive_meta_info ami;

    if (!ami.from_attrs(store->ctx(), attrs_m)) {
      ami.orig_bucket = old_bi.bucket;
      ami.store_in_attrs(attrs_m);
    }

    /* generate a new bucket instance. We could have avoided this if we could just point a new
     * bucket entry point to the old bucket instance, however, due to limitation in the way
     * we index buckets under the user, bucket entrypoint and bucket instance of the same
     * bucket need to have the same name, so we need to copy the old bucket instance into
     * to a new entry with the new name
     */

    string new_bucket_name;

    RGWBucketInfo new_bi = old_bi;
    RGWBucketEntryPoint new_be = be;

    string md5_digest;

    get_md5_digest(&new_be, md5_digest);
    new_bucket_name = ami.orig_bucket.name + "-deleted-" + md5_digest;

    new_bi.bucket.name = new_bucket_name;
    new_bi.objv_tracker.clear();

    new_be.bucket.name = new_bucket_name;

    ret = store->put_bucket_instance_info(new_bi, false, orig_mtime, &attrs_m);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to put new bucket instance info for bucket=" << new_bi.bucket << " ret=" << ret << dendl;
      return ret;
    }

    /* store a new entrypoint */

    RGWObjVersionTracker ot;
    ot.generate_new_write_ver(store->ctx());

    ret = store->put_bucket_entrypoint_info(tenant_name, new_bucket_name, new_be, true, ot, mtime, &attrs);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to put new bucket entrypoint for bucket=" << new_be.bucket << " ret=" << ret << dendl;
      return ret;
    }

    /* link new bucket */

    ret = rgw_link_bucket(store, new_be.owner, new_be.bucket, new_be.creation_time, false);
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to link new bucket for bucket=" << new_be.bucket << " ret=" << ret << dendl;
      return ret;
    }

    /* clean up old stuff */

    ret = rgw_unlink_bucket(store, be.owner, tenant_name, bucket_name, false);
    if (ret < 0) {
      lderr(store->ctx()) << "could not unlink bucket=" << entry << " owner=" << be.owner << dendl;
    }

    // if (ret == -ECANCELED) it means that there was a race here, and someone
    // wrote to the bucket entrypoint just before we removed it. The question is
    // whether it was a newly created bucket entrypoint ... in which case we
    // should ignore the error and move forward, or whether it is a higher version
    // of the same bucket instance ... in which we should retry
    ret = rgw_bucket_delete_bucket_obj(store, tenant_name, bucket_name, objv_tracker);
    if (ret < 0) {
      lderr(store->ctx()) << "could not delete bucket=" << entry << dendl;
    }

    // Result intentionally not checked (see "idempotent" below).
    ret = rgw_delete_system_obj(store, store->svc.zone->get_zone_params().domain_root, RGW_BUCKET_INSTANCE_MD_PREFIX + meta_name, NULL);

    /* idempotent */

    return 0;
  }

  /*
   * For entries whose name contains "-deleted-", first run remove() on any
   * existing entrypoint of that name, then store the incoming one through the
   * base handler.
   */
  int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
          real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
    if (entry.find("-deleted-") != string::npos) {
      RGWObjVersionTracker ot;
      RGWMetadataObject *robj;
      int ret = get(store, entry, &robj);
      if (ret != -ENOENT) {
        if (ret < 0) {
          return ret;
        }
        ot.read_version = robj->get_version();
        delete robj;

        ret = remove(store, entry, ot);
        if (ret < 0) {
          return ret;
        }
      }
    }

    return RGWBucketMetadataHandler::put(store, entry, objv_tracker,
                                         mtime, obj, sync_type);
  }

};
2847
7c673cae
FG
/*
 * Metadata handler for the "bucket.instance" section: bucket instance info
 * objects stored in domain_root under RGW_BUCKET_INSTANCE_MD_PREFIX.
 */
class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler {

public:
  string get_type() override { return "bucket.instance"; }

  /*
   * Read the instance info and xattrs for `oid` and return them as a newly
   * allocated RGWBucketInstanceMetadataObject in *obj.
   */
  int get(RGWRados *store, string& oid, RGWMetadataObject **obj) override {
    RGWBucketCompleteInfo bci;

    real_time mtime;
    auto obj_ctx = store->svc.sysobj->init_obj_ctx();

    int ret = store->get_bucket_instance_info(obj_ctx, oid, bci.info, &mtime, &bci.attrs);
    if (ret < 0)
      return ret;

    RGWBucketInstanceMetadataObject *mdo = new RGWBucketInstanceMetadataObject(bci, bci.info.objv_tracker.read_version, mtime);

    *obj = mdo;

    return 0;
  }

  /*
   * Store instance info decoded from JSON.
   *
   * - New instance (missing, or different bucket_id): name/id/tenant come
   *   from the key and index_type from the placement rule.
   * - Existing instance: explicit placement and placement rule are preserved.
   * - If the datasync flag changed, bilog entries are stopped/resynced and a
   *   data-log entry is added for every shard.
   * Returns STATUS_NO_APPLY when check_versions() rejects the update,
   * STATUS_APPLIED after writing and initializing the bucket index.
   */
  int put(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker,
          real_time mtime, JSONObj *obj, sync_type_t sync_type) override {
    RGWBucketCompleteInfo bci, old_bci;
    try {
      decode_json_obj(bci, obj);
    } catch (JSONDecoder::err& e) {
      return -EINVAL;
    }

    real_time orig_mtime;
    auto obj_ctx = store->svc.sysobj->init_obj_ctx();

    int ret = store->get_bucket_instance_info(obj_ctx, entry, old_bci.info,
                                              &orig_mtime, &old_bci.attrs);
    bool exists = (ret != -ENOENT);
    if (ret < 0 && exists)
      return ret;

    if (!exists || old_bci.info.bucket.bucket_id != bci.info.bucket.bucket_id) {
      /* a new bucket, we need to select a new bucket placement for it */
      auto key(entry);
      rgw_bucket_instance_oid_to_key(key);
      string tenant_name;
      string bucket_name;
      string bucket_instance;
      parse_bucket(key, &tenant_name, &bucket_name, &bucket_instance);

      RGWZonePlacementInfo rule_info;
      bci.info.bucket.name = bucket_name;
      bci.info.bucket.bucket_id = bucket_instance;
      bci.info.bucket.tenant = tenant_name;
      ret = store->svc.zone->select_bucket_location_by_rule(bci.info.placement_rule, &rule_info);
      if (ret < 0) {
        ldout(store->ctx(), 0) << "ERROR: select_bucket_placement() returned " << ret << dendl;
        return ret;
      }
      bci.info.index_type = rule_info.index_type;
    } else {
      /* existing bucket, keep its placement */
      bci.info.bucket.explicit_placement = old_bci.info.bucket.explicit_placement;
      bci.info.placement_rule = old_bci.info.placement_rule;
    }

    if (exists && old_bci.info.datasync_flag_enabled() != bci.info.datasync_flag_enabled()) {
      // Unsharded buckets (num_shards == 0) get a single data-log entry with shard id -1.
      int shards_num = bci.info.num_shards? bci.info.num_shards : 1;
      int shard_id = bci.info.num_shards? 0 : -1;

      if (!bci.info.datasync_flag_enabled()) {
        ret = store->stop_bi_log_entries(bci.info, -1);
        if (ret < 0) {
          lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
          return ret;
        }
      } else {
        ret = store->resync_bi_log_entries(bci.info, -1);
        if (ret < 0) {
          lderr(store->ctx()) << "ERROR: failed writing bilog" << dendl;
          return ret;
        }
      }

      for (int i = 0; i < shards_num; ++i, ++shard_id) {
        ret = store->data_log->add_entry(bci.info.bucket, shard_id);
        if (ret < 0) {
          lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
          return ret;
        }
      }
    }

    // are we actually going to perform this put, or is it too old?
    if (exists &&
        !check_versions(old_bci.info.objv_tracker.read_version, orig_mtime,
                        objv_tracker.write_version, mtime, sync_type)) {
      objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
      return STATUS_NO_APPLY;
    }

    /* record the read version (if any), store the new version */
    bci.info.objv_tracker.read_version = old_bci.info.objv_tracker.read_version;
    bci.info.objv_tracker.write_version = objv_tracker.write_version;

    ret = store->put_bucket_instance_info(bci.info, false, mtime, &bci.attrs);
    if (ret < 0)
      return ret;

    objv_tracker = bci.info.objv_tracker;

    ret = store->init_bucket_index(bci.info, bci.info.num_shards);
    if (ret < 0)
      return ret;

    return STATUS_APPLIED;
  }

  // Listing cursor handed out (as void*) by list_keys_init().
  struct list_keys_info {
    RGWRados *store;
    RGWListRawObjsCtx ctx;
  };

  // Remove the instance entry. A missing instance (-ENOENT) still proceeds
  // to rgw_bucket_instance_remove_entry().
  int remove(RGWRados *store, string& entry,
             RGWObjVersionTracker& objv_tracker) override {
    RGWBucketInfo info;
    auto obj_ctx = store->svc.sysobj->init_obj_ctx();

    int ret = store->get_bucket_instance_info(obj_ctx, entry, info, NULL, NULL);
    if (ret < 0 && ret != -ENOENT)
      return ret;

    return rgw_bucket_instance_remove_entry(store, entry,
                                            &info.objv_tracker);
  }

  // Instance objects live in domain_root as PREFIX + key, converted to oid form.
  void get_pool_and_oid(RGWRados *store, const string& key, rgw_pool& pool, string& oid) override {
    oid = RGW_BUCKET_INSTANCE_MD_PREFIX + key;
    rgw_bucket_instance_key_to_oid(oid);
    pool = store->svc.zone->get_zone_params().domain_root;
  }

  // Start a raw listing of domain_root at `marker`; on success *phandle owns
  // a heap list_keys_info that list_keys_complete() must free.
  int list_keys_init(RGWRados *store, const string& marker, void **phandle) override {
    auto info = std::make_unique<list_keys_info>();

    info->store = store;

    int ret = store->list_raw_objects_init(store->svc.zone->get_zone_params().domain_root, marker,
                                           &info->ctx);
    if (ret < 0) {
      return ret;
    }
    *phandle = (void *)info.release();

    return 0;
  }

  // Return the next batch of instance keys: only raw names starting with
  // RGW_BUCKET_INSTANCE_MD_PREFIX, with the prefix stripped and converted
  // back to key form. -ENOENT is reported as an empty, non-truncated batch.
  int list_keys_next(void *handle, int max, list<string>& keys, bool *truncated) override {
    list_keys_info *info = static_cast<list_keys_info *>(handle);

    string no_filter;

    keys.clear();

    RGWRados *store = info->store;

    list<string> unfiltered_keys;

    int ret = store->list_raw_objects_next(no_filter, max, info->ctx,
                                           unfiltered_keys, truncated);
    if (ret < 0 && ret != -ENOENT)
      return ret;
    if (ret == -ENOENT) {
      if (truncated)
        *truncated = false;
      return 0;
    }

    constexpr int prefix_size = sizeof(RGW_BUCKET_INSTANCE_MD_PREFIX) - 1;
    // now filter in the relevant entries
    list<string>::iterator iter;
    for (iter = unfiltered_keys.begin(); iter != unfiltered_keys.end(); ++iter) {
      string& k = *iter;

      if (k.compare(0, prefix_size, RGW_BUCKET_INSTANCE_MD_PREFIX) == 0) {
        auto oid = k.substr(prefix_size);
        rgw_bucket_instance_oid_to_key(oid);
        keys.emplace_back(std::move(oid));
      }
    }

    return 0;
  }

  // Free the cursor allocated by list_keys_init().
  void list_keys_complete(void *handle) override {
    list_keys_info *info = static_cast<list_keys_info *>(handle);
    delete info;
  }

  // Current raw-listing position, usable as `marker` for a later list_keys_init().
  string get_marker(void *handle) override {
    list_keys_info *info = static_cast<list_keys_info *>(handle);
    return info->store->list_raw_objs_get_cursor(info->ctx);
  }

  /*
   * hash entry for mdlog placement. Use the same hash key we'd have for the bucket entry
   * point, so that the log entries end up at the same log shard, so that we process them
   * in order
   */
  void get_hash_key(const string& section, const string& key, string& hash_key) override {
    string k;
    // NOTE(review): string::find returns size_t; storing it in an int and testing
    // `pos < 0` relies on npos converting to a negative int. size_t with a
    // comparison against string::npos would be the portable form.
    int pos = key.find(':');
    if (pos < 0)
      k = key;
    else
      k = key.substr(0, pos);
    hash_key = "bucket:" + k;
  }
};
3066
11fdf7f2
TL
3067class RGWArchiveBucketInstanceMetadataHandler : public RGWBucketInstanceMetadataHandler {
3068public:
3069
3070 int remove(RGWRados *store, string& entry, RGWObjVersionTracker& objv_tracker) override {
3071 ldout(store->ctx(), 0) << "SKIP: bucket instance removal is not allowed on archive zone: bucket.instance:" << entry << dendl;
3072 return 0;
3073 }
3074};
3075
3076RGWMetadataHandler *RGWBucketMetaHandlerAllocator::alloc() {
3077 return new RGWBucketMetadataHandler;
3078}
3079
3080RGWMetadataHandler *RGWBucketInstanceMetaHandlerAllocator::alloc() {
3081 return new RGWBucketInstanceMetadataHandler;
3082}
3083
3084RGWMetadataHandler *RGWArchiveBucketMetaHandlerAllocator::alloc() {
3085 return new RGWArchiveBucketMetadataHandler;
3086}
3087
3088RGWMetadataHandler *RGWArchiveBucketInstanceMetaHandlerAllocator::alloc() {
3089 return new RGWArchiveBucketInstanceMetadataHandler;
3090}
3091
7c673cae
FG
3092void rgw_bucket_init(RGWMetadataManager *mm)
3093{
11fdf7f2
TL
3094 auto sync_module = mm->get_store()->get_sync_module();
3095 if (sync_module) {
3096 bucket_meta_handler = sync_module->alloc_bucket_meta_handler();
3097 bucket_instance_meta_handler = sync_module->alloc_bucket_instance_meta_handler();
3098 } else {
3099 bucket_meta_handler = RGWBucketMetaHandlerAllocator::alloc();
3100 bucket_instance_meta_handler = RGWBucketInstanceMetaHandlerAllocator::alloc();
3101 }
7c673cae 3102 mm->register_handler(bucket_meta_handler);
7c673cae
FG
3103 mm->register_handler(bucket_instance_meta_handler);
3104}