]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sal_rados.cc
157dc88f65c1043732cc7c67401958f7f6a27146
[ceph.git] / ceph / src / rgw / rgw_sal_rados.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2020 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include <errno.h>
17 #include <stdlib.h>
18 #include <system_error>
19 #include <unistd.h>
20 #include <sstream>
21
22 #include "common/Clock.h"
23 #include "common/errno.h"
24
25 #include "rgw_sal.h"
26 #include "rgw_sal_rados.h"
27 #include "rgw_bucket.h"
28 #include "rgw_multi.h"
29 #include "rgw_acl_s3.h"
30
31 #include "rgw_zone.h"
32 #include "rgw_rest_conn.h"
33 #include "services/svc_sys_obj.h"
34 #include "services/svc_zone.h"
35 #include "services/svc_tier_rados.h"
36 #include "cls/rgw/cls_rgw_client.h"
37
38 #include "rgw_pubsub.h"
39
40 #define dout_subsys ceph_subsys_rgw
41
42 namespace rgw::sal {
43
44 int RGWRadosUser::list_buckets(const string& marker, const string& end_marker,
45 uint64_t max, bool need_stats, RGWBucketList &buckets,
46 optional_yield y)
47 {
48 RGWUserBuckets ulist;
49 bool is_truncated = false;
50 int ret;
51
52 buckets.clear();
53 ret = store->ctl()->user->list_buckets(info.user_id, marker, end_marker, max,
54 need_stats, &ulist, &is_truncated, y);
55 if (ret < 0)
56 return ret;
57
58 buckets.set_truncated(is_truncated);
59 for (const auto& ent : ulist.get_buckets()) {
60 buckets.add(std::unique_ptr<RGWBucket>(new RGWRadosBucket(this->store, ent.second, this)));
61 }
62
63 return 0;
64 }
65
66 RGWBucket* RGWRadosUser::create_bucket(rgw_bucket& bucket,
67 ceph::real_time creation_time)
68 {
69 return NULL;
70 }
71
72 int RGWRadosUser::load_by_id(optional_yield y)
73
74 {
75 return store->ctl()->user->get_info_by_uid(info.user_id, &info, y);
76 }
77
78 std::unique_ptr<RGWObject> RGWRadosStore::get_object(const rgw_obj_key& k)
79 {
80 return std::unique_ptr<RGWObject>(new RGWRadosObject(this, k));
81 }
82
83 /* Placeholder */
84 RGWObject *RGWRadosBucket::create_object(const rgw_obj_key &key)
85 {
86 return nullptr;
87 }
88
89 int RGWRadosBucket::remove_bucket(bool delete_children, std::string prefix, std::string delimiter, bool forward_to_master, req_info* req_info, optional_yield y)
90 {
91 int ret;
92
93 // Refresh info
94 ret = get_bucket_info(y);
95 if (ret < 0)
96 return ret;
97
98 ListParams params;
99 params.list_versions = true;
100 params.allow_unordered = true;
101
102 ListResults results;
103
104 bool is_truncated = false;
105 do {
106 results.objs.clear();
107
108 ret = list(params, 1000, results, y);
109 if (ret < 0)
110 return ret;
111
112 if (!results.objs.empty() && !delete_children) {
113 lderr(store->ctx()) << "ERROR: could not remove non-empty bucket " << info.bucket.name <<
114 dendl;
115 return -ENOTEMPTY;
116 }
117
118 for (const auto& obj : results.objs) {
119 rgw_obj_key key(obj.key);
120 /* xxx dang */
121 ret = rgw_remove_object(store, info, info.bucket, key);
122 if (ret < 0 && ret != -ENOENT) {
123 return ret;
124 }
125 }
126 } while(is_truncated);
127
128 /* If there's a prefix, then we are aborting multiparts as well */
129 if (!prefix.empty()) {
130 ret = abort_bucket_multiparts(store, store->ctx(), info, prefix, delimiter);
131 if (ret < 0) {
132 return ret;
133 }
134 }
135
136 ret = store->ctl()->bucket->sync_user_stats(info.owner, info, y);
137 if (ret < 0) {
138 ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
139 }
140
141 RGWObjVersionTracker ot;
142
143 // if we deleted children above we will force delete, as any that
144 // remain is detrius from a prior bug
145 ret = store->getRados()->delete_bucket(info, ot, y, !delete_children);
146 if (ret < 0) {
147 lderr(store->ctx()) << "ERROR: could not remove bucket " <<
148 info.bucket.name << dendl;
149 return ret;
150 }
151
152 // if bucket has notification definitions associated with it
153 // they should be removed (note that any pending notifications on the bucket are still going to be sent)
154 RGWPubSub ps(store, info.owner.tenant);
155 RGWPubSub::Bucket ps_bucket(&ps, info.bucket);
156 const auto ps_ret = ps_bucket.remove_notifications(y);
157 if (ps_ret < 0 && ps_ret != -ENOENT) {
158 lderr(store->ctx()) << "ERROR: unable to remove notifications from bucket. ret=" << ps_ret << dendl;
159 }
160
161 ret = store->ctl()->bucket->unlink_bucket(info.owner, info.bucket, y, false);
162 if (ret < 0) {
163 lderr(store->ctx()) << "ERROR: unable to remove user bucket information" << dendl;
164 }
165
166 if (forward_to_master) {
167 bufferlist in_data;
168 ret = store->forward_request_to_master(owner, &ot.read_version, in_data, nullptr, *req_info, y);
169 if (ret < 0) {
170 if (ret == -ENOENT) {
171 /* adjust error, we want to return with NoSuchBucket and not
172 * NoSuchKey */
173 ret = -ERR_NO_SUCH_BUCKET;
174 }
175 return ret;
176 }
177 }
178
179 return ret;
180 }
181
182 int RGWRadosBucket::get_bucket_info(optional_yield y)
183 {
184 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
185 RGWSI_MetaBackend_CtxParams bectx_params = RGWSI_MetaBackend_CtxParams_SObj(&obj_ctx);
186 RGWObjVersionTracker ep_ot;
187 int ret = store->ctl()->bucket->read_bucket_info(info.bucket, &info, y,
188 RGWBucketCtl::BucketInstance::GetParams()
189 .set_mtime(&mtime)
190 .set_attrs(&attrs)
191 .set_bectx_params(bectx_params),
192 &ep_ot);
193 if (ret == 0) {
194 bucket_version = ep_ot.read_version;
195 ent.placement_rule = info.placement_rule;
196 }
197 return ret;
198 }
199
200 int RGWRadosBucket::load_by_name(const std::string& tenant, const std::string& bucket_name, const std::string bucket_instance_id, RGWSysObjectCtx *rctx, optional_yield y)
201 {
202 info.bucket.tenant = tenant;
203 info.bucket.name = bucket_name;
204 info.bucket.bucket_id = bucket_instance_id;
205 ent.bucket = info.bucket;
206
207 if (bucket_instance_id.empty()) {
208 return get_bucket_info(y);
209 }
210
211 return store->getRados()->get_bucket_instance_info(*rctx, info.bucket, info, NULL, &attrs, y);
212 }
213
214 int RGWRadosBucket::get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id,
215 std::string *bucket_ver, std::string *master_ver,
216 std::map<RGWObjCategory, RGWStorageStats>& stats,
217 std::string *max_marker, bool *syncstopped)
218 {
219 return store->getRados()->get_bucket_stats(bucket_info, shard_id, bucket_ver, master_ver, stats, max_marker, syncstopped);
220 }
221
222 int RGWRadosBucket::read_bucket_stats(optional_yield y)
223 {
224 int ret = store->ctl()->bucket->read_bucket_stats(info.bucket, &ent, y);
225 info.placement_rule = ent.placement_rule;
226 return ret;
227 }
228
229 int RGWRadosBucket::sync_user_stats(optional_yield y)
230 {
231 return store->ctl()->bucket->sync_user_stats(owner->get_id(), info, y);
232 }
233
234 int RGWRadosBucket::update_container_stats(void)
235 {
236 int ret;
237 map<std::string, RGWBucketEnt> m;
238
239 m[info.bucket.name] = ent;
240 ret = store->getRados()->update_containers_stats(m);
241 if (!ret)
242 return -EEXIST;
243 if (ret < 0)
244 return ret;
245
246 map<string, RGWBucketEnt>::iterator iter = m.find(info.bucket.name);
247 if (iter == m.end())
248 return -EINVAL;
249
250 ent.count = iter->second.count;
251 ent.size = iter->second.size;
252 ent.size_rounded = iter->second.size_rounded;
253 ent.creation_time = iter->second.creation_time;
254 ent.placement_rule = std::move(iter->second.placement_rule);
255
256 info.creation_time = ent.creation_time;
257 info.placement_rule = ent.placement_rule;
258
259 return 0;
260 }
261
262 int RGWRadosBucket::check_bucket_shards(void)
263 {
264 return store->getRados()->check_bucket_shards(info, info.bucket, get_count());
265 }
266
267 int RGWRadosBucket::link(RGWUser* new_user, optional_yield y)
268 {
269 RGWBucketEntryPoint ep;
270 ep.bucket = info.bucket;
271 ep.owner = new_user->get_user();
272 ep.creation_time = get_creation_time();
273 ep.linked = true;
274 RGWAttrs ep_attrs;
275 rgw_ep_info ep_data{ep, ep_attrs};
276
277 return store->ctl()->bucket->link_bucket(new_user->get_user(), info.bucket,
278 ceph::real_time(), y, true, &ep_data);
279 }
280
281 int RGWRadosBucket::unlink(RGWUser* new_user, optional_yield y)
282 {
283 return -1;
284 }
285
286 int RGWRadosBucket::chown(RGWUser* new_user, RGWUser* old_user, optional_yield y)
287 {
288 string obj_marker;
289
290 return store->ctl()->bucket->chown(store, info, new_user->get_user(),
291 old_user->get_display_name(), obj_marker, y);
292 }
293
294 int RGWRadosBucket::put_instance_info(bool exclusive, ceph::real_time _mtime)
295 {
296 mtime = _mtime;
297 return store->getRados()->put_bucket_instance_info(info, exclusive, mtime, &attrs);
298 }
299
300 /* Make sure to call get_bucket_info() if you need it first */
301 bool RGWRadosBucket::is_owner(RGWUser* user)
302 {
303 return (info.owner.compare(user->get_user()) == 0);
304 }
305
306 int RGWRadosBucket::check_empty(optional_yield y)
307 {
308 return store->getRados()->check_bucket_empty(info, y);
309 }
310
311 int RGWRadosBucket::check_quota(RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size,
312 optional_yield y, bool check_size_only)
313 {
314 return store->getRados()->check_quota(owner->get_user(), get_key(),
315 user_quota, bucket_quota, obj_size, y, check_size_only);
316 }
317
318 int RGWRadosBucket::set_instance_attrs(RGWAttrs& attrs, optional_yield y)
319 {
320 return store->ctl()->bucket->set_bucket_instance_attrs(get_info(),
321 attrs, &get_info().objv_tracker, y);
322 }
323
324 int RGWRadosBucket::try_refresh_info(ceph::real_time *pmtime)
325 {
326 return store->getRados()->try_refresh_bucket_info(info, pmtime, &attrs);
327 }
328
329 int RGWRadosBucket::read_usage(uint64_t start_epoch, uint64_t end_epoch,
330 uint32_t max_entries, bool *is_truncated,
331 RGWUsageIter& usage_iter,
332 map<rgw_user_bucket, rgw_usage_log_entry>& usage)
333 {
334 return store->getRados()->read_usage(owner->get_id(), get_name(), start_epoch,
335 end_epoch, max_entries, is_truncated,
336 usage_iter, usage);
337 }
338
339 int RGWRadosBucket::set_acl(RGWAccessControlPolicy &acl, optional_yield y)
340 {
341 bufferlist aclbl;
342
343 acls = acl;
344 acl.encode(aclbl);
345
346 return store->ctl()->bucket->set_acl(acl.get_owner(), info.bucket, info, aclbl, y);
347 }
348
349 std::unique_ptr<RGWObject> RGWRadosBucket::get_object(const rgw_obj_key& k)
350 {
351 return std::unique_ptr<RGWObject>(new RGWRadosObject(this->store, k, this));
352 }
353
354 int RGWRadosBucket::list(ListParams& params, int max, ListResults& results, optional_yield y)
355 {
356 RGWRados::Bucket target(store->getRados(), get_info());
357 if (params.shard_id >= 0) {
358 target.set_shard_id(params.shard_id);
359 }
360 RGWRados::Bucket::List list_op(&target);
361
362 list_op.params.prefix = params.prefix;
363 list_op.params.delim = params.delim;
364 list_op.params.marker = params.marker;
365 list_op.params.ns = params.ns;
366 list_op.params.end_marker = params.end_marker;
367 list_op.params.list_versions = params.list_versions;
368 list_op.params.allow_unordered = params.allow_unordered;
369
370 int ret = list_op.list_objects(max, &results.objs, &results.common_prefixes, &results.is_truncated, y);
371 if (ret >= 0) {
372 results.next_marker = list_op.get_next_marker();
373 }
374
375 return ret;
376 }
377
378 std::unique_ptr<RGWUser> RGWRadosStore::get_user(const rgw_user &u)
379 {
380 return std::unique_ptr<RGWUser>(new RGWRadosUser(this, u));
381 }
382
383 //RGWBucket *RGWRadosStore::create_bucket(RGWUser &u, const rgw_bucket &b)
384 //{
385 //if (!bucket) {
386 //bucket = new RGWRadosBucket(this, u, b);
387 //}
388 //
389 //return bucket;
390 //}
391 //
392 void RGWRadosStore::finalize(void)
393 {
394 if (rados)
395 rados->finalize();
396 }
397
398 int RGWObject::range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end)
399 {
400 if (ofs < 0) {
401 ofs += obj_size;
402 if (ofs < 0)
403 ofs = 0;
404 end = obj_size - 1;
405 } else if (end < 0) {
406 end = obj_size - 1;
407 }
408
409 if (obj_size > 0) {
410 if (ofs >= (off_t)obj_size) {
411 return -ERANGE;
412 }
413 if (end >= (off_t)obj_size) {
414 end = obj_size - 1;
415 }
416 }
417 return 0;
418 }
419
420 int RGWRadosObject::get_obj_state(RGWObjectCtx *rctx, RGWBucket& bucket, RGWObjState **state, optional_yield y, bool follow_olh)
421 {
422 rgw_obj obj(bucket.get_key(), key.name);
423
424 return store->getRados()->get_obj_state(rctx, bucket.get_info(), obj, state, follow_olh, y);
425 }
426
427 int RGWRadosObject::read_attrs(RGWRados::Object::Read &read_op, optional_yield y, rgw_obj *target_obj)
428 {
429 read_op.params.attrs = &attrs;
430 read_op.params.target_obj = target_obj;
431 read_op.params.obj_size = &obj_size;
432 read_op.params.lastmod = &mtime;
433
434 return read_op.prepare(y);
435 }
436
437 int RGWRadosObject::set_obj_attrs(RGWObjectCtx* rctx, RGWAttrs* setattrs, RGWAttrs* delattrs, optional_yield y, rgw_obj* target_obj)
438 {
439 RGWAttrs empty;
440 rgw_obj target = get_obj();
441
442 if (!target_obj)
443 target_obj = &target;
444
445 return store->getRados()->set_attrs(rctx,
446 bucket->get_info(),
447 *target_obj,
448 setattrs ? *setattrs : empty,
449 delattrs ? delattrs : nullptr,
450 y);
451 }
452
453 int RGWRadosObject::get_obj_attrs(RGWObjectCtx *rctx, optional_yield y, rgw_obj* target_obj)
454 {
455 RGWRados::Object op_target(store->getRados(), bucket->get_info(), *rctx, get_obj());
456 RGWRados::Object::Read read_op(&op_target);
457
458 return read_attrs(read_op, y, target_obj);
459 }
460
461 int RGWRadosObject::modify_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, bufferlist& attr_val, optional_yield y)
462 {
463 rgw_obj target = get_obj();
464 int r = get_obj_attrs(rctx, y, &target);
465 if (r < 0) {
466 return r;
467 }
468 set_atomic(rctx);
469 attrs[attr_name] = attr_val;
470 return set_obj_attrs(rctx, &attrs, nullptr, y, &target);
471 }
472
473 int RGWRadosObject::delete_obj_attrs(RGWObjectCtx *rctx, const char *attr_name, optional_yield y)
474 {
475 RGWAttrs rmattr;
476 bufferlist bl;
477
478 set_atomic(rctx);
479 rmattr[attr_name] = bl;
480 return set_obj_attrs(rctx, nullptr, &rmattr, y);
481 }
482
483 int RGWRadosObject::copy_obj_data(RGWObjectCtx& rctx, RGWBucket* dest_bucket,
484 RGWObject* dest_obj,
485 uint16_t olh_epoch,
486 std::string* petag,
487 const DoutPrefixProvider *dpp,
488 optional_yield y)
489 {
490 RGWAttrs attrset;
491 RGWRados::Object op_target(store->getRados(), dest_bucket->get_info(), rctx, get_obj());
492 RGWRados::Object::Read read_op(&op_target);
493
494 int ret = read_attrs(read_op, y);
495 if (ret < 0)
496 return ret;
497
498 attrset = attrs;
499
500 attrset.erase(RGW_ATTR_ID_TAG);
501 attrset.erase(RGW_ATTR_TAIL_TAG);
502
503 return store->getRados()->copy_obj_data(rctx, dest_bucket,
504 dest_bucket->get_info().placement_rule, read_op,
505 obj_size - 1, dest_obj, NULL, mtime, attrset, 0,
506 real_time(), NULL, dpp, y);
507 }
508
509 void RGWRadosObject::set_atomic(RGWObjectCtx *rctx) const
510 {
511 rgw_obj obj = get_obj();
512 store->getRados()->set_atomic(rctx, obj);
513 }
514
515 void RGWRadosObject::set_prefetch_data(RGWObjectCtx *rctx)
516 {
517 rgw_obj obj = get_obj();
518 store->getRados()->set_prefetch_data(rctx, obj);
519 }
520
521 bool RGWRadosObject::is_expired() {
522 auto iter = attrs.find(RGW_ATTR_DELETE_AT);
523 if (iter != attrs.end()) {
524 utime_t delete_at;
525 try {
526 auto bufit = iter->second.cbegin();
527 decode(delete_at, bufit);
528 } catch (buffer::error& err) {
529 ldout(store->ctx(), 0) << "ERROR: " << __func__ << ": failed to decode " RGW_ATTR_DELETE_AT " attr" << dendl;
530 return false;
531 }
532
533 if (delete_at <= ceph_clock_now() && !delete_at.is_zero()) {
534 return true;
535 }
536 }
537
538 return false;
539 }
540
541 void RGWRadosObject::gen_rand_obj_instance_name()
542 {
543 store->getRados()->gen_rand_obj_instance_name(&key);
544 }
545
546 void RGWRadosObject::raw_obj_to_obj(const rgw_raw_obj& raw_obj)
547 {
548 rgw_obj tobj = get_obj();
549 RGWSI_Tier_RADOS::raw_obj_to_obj(get_bucket()->get_key(), raw_obj, &tobj);
550 set_key(tobj.key);
551 }
552
553 void RGWRadosObject::get_raw_obj(rgw_raw_obj* raw_obj)
554 {
555 store->getRados()->obj_to_raw((bucket->get_info()).placement_rule, get_obj(), raw_obj);
556 }
557
558 int RGWRadosObject::omap_get_vals_by_keys(const std::string& oid,
559 const std::set<std::string>& keys,
560 RGWAttrs *vals)
561 {
562 int ret;
563 rgw_raw_obj head_obj;
564 librados::IoCtx cur_ioctx;
565 rgw_obj obj = get_obj();
566
567 store->getRados()->obj_to_raw(bucket->get_placement_rule(), obj, &head_obj);
568 ret = store->get_obj_head_ioctx(bucket->get_info(), obj, &cur_ioctx);
569 if (ret < 0) {
570 return ret;
571 }
572
573 return cur_ioctx.omap_get_vals_by_keys(oid, keys, vals);
574 }
575
576 int RGWRadosObject::omap_set_val_by_key(const std::string& key, bufferlist& val,
577 bool must_exist, optional_yield y)
578 {
579 rgw_raw_obj raw_meta_obj;
580 rgw_obj obj = get_obj();
581
582 store->getRados()->obj_to_raw(bucket->get_placement_rule(), obj, &raw_meta_obj);
583
584 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
585 auto sysobj = obj_ctx.get_obj(raw_meta_obj);
586
587 return sysobj.omap().set_must_exist(must_exist).set(key, val, y);
588 }
589
590 MPSerializer* RGWRadosObject::get_serializer(const std::string& lock_name)
591 {
592 return new MPRadosSerializer(store, this, lock_name);
593 }
594
595 int RGWRadosObject::transition(RGWObjectCtx& rctx,
596 RGWBucket* bucket,
597 const rgw_placement_rule& placement_rule,
598 const real_time& mtime,
599 uint64_t olh_epoch,
600 const DoutPrefixProvider *dpp,
601 optional_yield y)
602 {
603 return store->getRados()->transition_obj(rctx, bucket, *this, placement_rule, mtime, olh_epoch, dpp, y);
604 }
605
606 int RGWRadosObject::get_max_chunk_size(rgw_placement_rule placement_rule, uint64_t *max_chunk_size, uint64_t *alignment)
607 {
608 return store->getRados()->get_max_chunk_size(placement_rule, get_obj(), max_chunk_size, alignment);
609 }
610
611 void RGWRadosObject::get_max_aligned_size(uint64_t size, uint64_t alignment,
612 uint64_t *max_size)
613 {
614 store->getRados()->get_max_aligned_size(size, alignment, max_size);
615 }
616
617 bool RGWRadosObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2)
618 {
619 rgw_obj obj;
620 rgw_pool p1, p2;
621
622 obj = get_obj();
623
624 if (r1 == r2)
625 return true;
626
627 if (!store->getRados()->get_obj_data_pool(r1, obj, &p1)) {
628 return false;
629 }
630 if (!store->getRados()->get_obj_data_pool(r2, obj, &p2)) {
631 return false;
632 }
633
634 return p1 == p2;
635 }
636
637 std::unique_ptr<RGWObject::ReadOp> RGWRadosObject::get_read_op(RGWObjectCtx *ctx)
638 {
639 return std::unique_ptr<RGWObject::ReadOp>(new RGWRadosObject::RadosReadOp(this, ctx));
640 }
641
642 RGWRadosObject::RadosReadOp::RadosReadOp(RGWRadosObject *_source, RGWObjectCtx *_rctx) :
643 source(_source),
644 rctx(_rctx),
645 op_target(_source->store->getRados(),
646 _source->get_bucket()->get_info(),
647 *static_cast<RGWObjectCtx *>(rctx),
648 _source->get_obj()),
649 parent_op(&op_target)
650 { }
651
652 int RGWRadosObject::RadosReadOp::prepare(optional_yield y)
653 {
654 uint64_t obj_size;
655
656 parent_op.conds.mod_ptr = params.mod_ptr;
657 parent_op.conds.unmod_ptr = params.unmod_ptr;
658 parent_op.conds.high_precision_time = params.high_precision_time;
659 parent_op.conds.mod_zone_id = params.mod_zone_id;
660 parent_op.conds.mod_pg_ver = params.mod_pg_ver;
661 parent_op.conds.if_match = params.if_match;
662 parent_op.conds.if_nomatch = params.if_nomatch;
663 parent_op.params.lastmod = params.lastmod;
664 parent_op.params.target_obj = params.target_obj;
665 parent_op.params.obj_size = &obj_size;
666 parent_op.params.attrs = &source->get_attrs();
667
668 int ret = parent_op.prepare(y);
669 if (ret < 0)
670 return ret;
671
672 source->set_key(parent_op.state.obj.key);
673 source->set_obj_size(obj_size);
674 result.head_obj = parent_op.state.head_obj;
675
676 return ret;
677 }
678
679 int RGWRadosObject::RadosReadOp::read(int64_t ofs, int64_t end, bufferlist& bl, optional_yield y)
680 {
681 return parent_op.read(ofs, end, bl, y);
682 }
683
684 int RGWRadosObject::RadosReadOp::get_manifest(RGWObjManifest **pmanifest,
685 optional_yield y)
686 {
687 return op_target.get_manifest(pmanifest, y);
688 }
689
690 int RGWRadosObject::RadosReadOp::get_attr(const char *name, bufferlist& dest, optional_yield y)
691 {
692 return parent_op.get_attr(name, dest, y);
693 }
694
695 int RGWRadosObject::delete_object(RGWObjectCtx* obj_ctx, ACLOwner obj_owner, ACLOwner bucket_owner, ceph::real_time unmod_since, bool high_precision_time, uint64_t epoch, string& version_id, optional_yield y)
696 {
697 int ret = 0;
698 RGWRados::Object del_target(store->getRados(), bucket->get_info(), *obj_ctx, get_obj());
699 RGWRados::Object::Delete del_op(&del_target);
700
701 del_op.params.olh_epoch = epoch;
702 del_op.params.marker_version_id = version_id;
703 del_op.params.bucket_owner = bucket_owner.get_id();
704 del_op.params.versioning_status = bucket->get_info().versioning_status();
705 del_op.params.obj_owner = obj_owner;
706 del_op.params.unmod_since = unmod_since;
707 del_op.params.high_precision_time = high_precision_time;
708
709 ret = del_op.delete_obj(y);
710 if (ret >= 0) {
711 delete_marker = del_op.result.delete_marker;
712 version_id = del_op.result.version_id;
713 }
714
715 return ret;
716 }
717
718 int RGWRadosObject::copy_object(RGWObjectCtx& obj_ctx,
719 RGWUser* user,
720 req_info *info,
721 const rgw_zone_id& source_zone,
722 rgw::sal::RGWObject* dest_object,
723 rgw::sal::RGWBucket* dest_bucket,
724 rgw::sal::RGWBucket* src_bucket,
725 const rgw_placement_rule& dest_placement,
726 ceph::real_time *src_mtime,
727 ceph::real_time *mtime,
728 const ceph::real_time *mod_ptr,
729 const ceph::real_time *unmod_ptr,
730 bool high_precision_time,
731 const char *if_match,
732 const char *if_nomatch,
733 AttrsMod attrs_mod,
734 bool copy_if_newer,
735 RGWAttrs& attrs,
736 RGWObjCategory category,
737 uint64_t olh_epoch,
738 boost::optional<ceph::real_time> delete_at,
739 string *version_id,
740 string *tag,
741 string *etag,
742 void (*progress_cb)(off_t, void *),
743 void *progress_data,
744 const DoutPrefixProvider *dpp,
745 optional_yield y)
746 {
747 return store->getRados()->copy_obj(obj_ctx,
748 user->get_id(),
749 info,
750 source_zone,
751 dest_object,
752 this,
753 dest_bucket,
754 src_bucket,
755 dest_placement,
756 src_mtime,
757 mtime,
758 mod_ptr,
759 unmod_ptr,
760 high_precision_time,
761 if_match,
762 if_nomatch,
763 static_cast<RGWRados::AttrsMod>(attrs_mod),
764 copy_if_newer,
765 attrs,
766 category,
767 olh_epoch,
768 (delete_at ? *delete_at : real_time()),
769 version_id,
770 tag,
771 etag,
772 progress_cb,
773 progress_data,
774 dpp,
775 y);
776 }
777
778 int RGWRadosObject::RadosReadOp::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb, optional_yield y)
779 {
780 return parent_op.iterate(ofs, end, cb, y);
781 }
782
783 std::unique_ptr<RGWObject::WriteOp> RGWRadosObject::get_write_op(RGWObjectCtx* ctx)
784 {
785 return std::unique_ptr<RGWObject::WriteOp>(new RGWRadosObject::RadosWriteOp(this, ctx));
786 }
787
788 RGWRadosObject::RadosWriteOp::RadosWriteOp(RGWRadosObject* _source, RGWObjectCtx* _rctx) :
789 source(_source),
790 rctx(_rctx),
791 op_target(_source->store->getRados(),
792 _source->get_bucket()->get_info(),
793 *static_cast<RGWObjectCtx *>(rctx),
794 _source->get_obj()),
795 parent_op(&op_target)
796 { }
797
798 int RGWRadosObject::RadosWriteOp::prepare(optional_yield y)
799 {
800 op_target.set_versioning_disabled(params.versioning_disabled);
801 parent_op.meta.mtime = params.mtime;
802 parent_op.meta.rmattrs = params.rmattrs;
803 parent_op.meta.data = params.data;
804 parent_op.meta.manifest = params.manifest;
805 parent_op.meta.ptag = params.ptag;
806 parent_op.meta.remove_objs = params.remove_objs;
807 parent_op.meta.set_mtime = params.set_mtime;
808 parent_op.meta.owner = params.owner.get_id();
809 parent_op.meta.category = params.category;
810 parent_op.meta.flags = params.flags;
811 parent_op.meta.if_match = params.if_match;
812 parent_op.meta.if_nomatch = params.if_nomatch;
813 parent_op.meta.olh_epoch = params.olh_epoch;
814 parent_op.meta.delete_at = params.delete_at;
815 parent_op.meta.canceled = params.canceled;
816 parent_op.meta.user_data = params.user_data;
817 parent_op.meta.zones_trace = params.zones_trace;
818 parent_op.meta.modify_tail = params.modify_tail;
819 parent_op.meta.completeMultipart = params.completeMultipart;
820 parent_op.meta.appendable = params.appendable;
821
822 return 0;
823 }
824
825 int RGWRadosObject::RadosWriteOp::write_meta(uint64_t size, uint64_t accounted_size, optional_yield y)
826 {
827 int ret = parent_op.write_meta(size, accounted_size, *params.attrs, y);
828 params.canceled = parent_op.meta.canceled;
829
830 return ret;
831 }
832
833 int RGWRadosObject::swift_versioning_restore(RGWObjectCtx* obj_ctx,
834 bool& restored,
835 const DoutPrefixProvider *dpp)
836 {
837 return store->getRados()->swift_versioning_restore(*obj_ctx,
838 bucket->get_owner()->get_id(),
839 bucket,
840 this,
841 restored,
842 dpp);
843 }
844
845 int RGWRadosObject::swift_versioning_copy(RGWObjectCtx* obj_ctx,
846 const DoutPrefixProvider *dpp,
847 optional_yield y)
848 {
849 return store->getRados()->swift_versioning_copy(*obj_ctx,
850 bucket->get_info().owner,
851 bucket,
852 this,
853 dpp,
854 y);
855 }
856
857 int RGWRadosStore::get_bucket(RGWUser* u, const rgw_bucket& b, std::unique_ptr<RGWBucket>* bucket, optional_yield y)
858 {
859 int ret;
860 RGWBucket* bp;
861
862 bp = new RGWRadosBucket(this, b, u);
863 ret = bp->get_bucket_info(y);
864 if (ret < 0) {
865 delete bp;
866 return ret;
867 }
868
869 bucket->reset(bp);
870 return 0;
871 }
872
873 int RGWRadosStore::get_bucket(RGWUser* u, const RGWBucketInfo& i, std::unique_ptr<RGWBucket>* bucket)
874 {
875 RGWBucket* bp;
876
877 bp = new RGWRadosBucket(this, i, u);
878 /* Don't need to fetch the bucket info, use the provided one */
879
880 bucket->reset(bp);
881 return 0;
882 }
883
884 int RGWRadosStore::get_bucket(RGWUser* u, const std::string& tenant, const std::string&name, std::unique_ptr<RGWBucket>* bucket, optional_yield y)
885 {
886 rgw_bucket b;
887
888 b.tenant = tenant;
889 b.name = name;
890
891 return get_bucket(u, b, bucket, y);
892 }
893
894 static int decode_policy(CephContext *cct,
895 bufferlist& bl,
896 RGWAccessControlPolicy *policy)
897 {
898 auto iter = bl.cbegin();
899 try {
900 policy->decode(iter);
901 } catch (buffer::error& err) {
902 ldout(cct, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl;
903 return -EIO;
904 }
905 if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 15>()) {
906 ldout(cct, 15) << __func__ << " Read AccessControlPolicy";
907 RGWAccessControlPolicy_S3 *s3policy = static_cast<RGWAccessControlPolicy_S3 *>(policy);
908 s3policy->to_xml(*_dout);
909 *_dout << dendl;
910 }
911 return 0;
912 }
913
914 static int rgw_op_get_bucket_policy_from_attr(RGWRadosStore *store,
915 RGWUser& user,
916 RGWAttrs& bucket_attrs,
917 RGWAccessControlPolicy *policy,
918 optional_yield y)
919 {
920 auto aiter = bucket_attrs.find(RGW_ATTR_ACL);
921
922 if (aiter != bucket_attrs.end()) {
923 int ret = decode_policy(store->ctx(), aiter->second, policy);
924 if (ret < 0)
925 return ret;
926 } else {
927 ldout(store->ctx(), 0) << "WARNING: couldn't find acl header for bucket, generating default" << dendl;
928 /* object exists, but policy is broken */
929 int r = user.load_by_id(y);
930 if (r < 0)
931 return r;
932
933 policy->create_default(user.get_user(), user.get_display_name());
934 }
935 return 0;
936 }
937
938 bool RGWRadosStore::is_meta_master()
939 {
940 return svc()->zone->is_meta_master();
941 }
942
943 int RGWRadosStore::forward_request_to_master(RGWUser* user, obj_version *objv,
944 bufferlist& in_data,
945 JSONParser *jp, req_info& info,
946 optional_yield y)
947 {
948 if (is_meta_master()) {
949 /* We're master, don't forward */
950 return 0;
951 }
952
953 if (!svc()->zone->get_master_conn()) {
954 ldout(ctx(), 0) << "rest connection is invalid" << dendl;
955 return -EINVAL;
956 }
957 ldout(ctx(), 0) << "sending request to master zonegroup" << dendl;
958 bufferlist response;
959 string uid_str = user->get_id().to_str();
960 #define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
961 int ret = svc()->zone->get_master_conn()->forward(rgw_user(uid_str), info,
962 objv, MAX_REST_RESPONSE,
963 &in_data, &response, y);
964 if (ret < 0)
965 return ret;
966
967 ldout(ctx(), 20) << "response: " << response.c_str() << dendl;
968 if (jp && !jp->parse(response.c_str(), response.length())) {
969 ldout(ctx(), 0) << "failed parsing response from master zonegroup" << dendl;
970 return -EINVAL;
971 }
972
973 return 0;
974 }
975
976 int RGWRadosStore::defer_gc(RGWObjectCtx *rctx, RGWBucket* bucket, RGWObject* obj, optional_yield y)
977 {
978 return rados->defer_gc(rctx, bucket->get_info(), obj->get_obj(), y);
979 }
980
981 const RGWZoneGroup& RGWRadosStore::get_zonegroup()
982 {
983 return rados->svc.zone->get_zonegroup();
984 }
985
986 int RGWRadosStore::get_zonegroup(const string& id, RGWZoneGroup& zonegroup)
987 {
988 return rados->svc.zone->get_zonegroup(id, zonegroup);
989 }
990
991 int RGWRadosStore::cluster_stat(RGWClusterStat& stats)
992 {
993 rados_cluster_stat_t rados_stats;
994 int ret;
995
996 ret = rados->get_rados_handle()->cluster_stat(rados_stats);
997 if (ret < 0)
998 return ret;
999
1000 stats.kb = rados_stats.kb;
1001 stats.kb_used = rados_stats.kb_used;
1002 stats.kb_avail = rados_stats.kb_avail;
1003 stats.num_objects = rados_stats.num_objects;
1004
1005 return ret;
1006 }
1007
1008 int RGWRadosStore::create_bucket(RGWUser& u, const rgw_bucket& b,
1009 const string& zonegroup_id,
1010 rgw_placement_rule& placement_rule,
1011 string& swift_ver_location,
1012 const RGWQuotaInfo * pquota_info,
1013 const RGWAccessControlPolicy& policy,
1014 RGWAttrs& attrs,
1015 RGWBucketInfo& info,
1016 obj_version& ep_objv,
1017 bool exclusive,
1018 bool obj_lock_enabled,
1019 bool *existed,
1020 req_info& req_info,
1021 std::unique_ptr<RGWBucket>* bucket_out,
1022 optional_yield y)
1023 {
1024 int ret;
1025 bufferlist in_data;
1026 RGWBucketInfo master_info;
1027 rgw_bucket *pmaster_bucket;
1028 uint32_t *pmaster_num_shards;
1029 real_time creation_time;
1030 std::unique_ptr<RGWBucket> bucket;
1031 obj_version objv, *pobjv = NULL;
1032
1033 /* If it exists, look it up; otherwise create it */
1034 ret = get_bucket(&u, b, &bucket, y);
1035 if (ret < 0 && ret != -ENOENT)
1036 return ret;
1037
1038 if (ret != -ENOENT) {
1039 RGWAccessControlPolicy old_policy(ctx());
1040 *existed = true;
1041 if (swift_ver_location.empty()) {
1042 swift_ver_location = bucket->get_info().swift_ver_location;
1043 }
1044 placement_rule.inherit_from(bucket->get_info().placement_rule);
1045
1046 // don't allow changes to the acl policy
1047 int r = rgw_op_get_bucket_policy_from_attr(this, u, bucket->get_attrs(),
1048 &old_policy, y);
1049 if (r >= 0 && old_policy != policy) {
1050 bucket_out->swap(bucket);
1051 return -EEXIST;
1052 }
1053 } else {
1054 bucket = std::unique_ptr<RGWBucket>(new RGWRadosBucket(this, b, &u));
1055 *existed = false;
1056 bucket->set_attrs(attrs);
1057 }
1058
1059 if (!svc()->zone->is_meta_master()) {
1060 JSONParser jp;
1061 ret = forward_request_to_master(&u, NULL, in_data, &jp, req_info, y);
1062 if (ret < 0) {
1063 return ret;
1064 }
1065
1066 JSONDecoder::decode_json("entry_point_object_ver", ep_objv, &jp);
1067 JSONDecoder::decode_json("object_ver", objv, &jp);
1068 JSONDecoder::decode_json("bucket_info", master_info, &jp);
1069 ldpp_dout(this, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver=" << objv.ver << dendl;
1070 std::time_t ctime = ceph::real_clock::to_time_t(master_info.creation_time);
1071 ldpp_dout(this, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl;
1072 pmaster_bucket= &master_info.bucket;
1073 creation_time = master_info.creation_time;
1074 pmaster_num_shards = &master_info.layout.current_index.layout.normal.num_shards;
1075 pobjv = &objv;
1076 if (master_info.obj_lock_enabled()) {
1077 info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED;
1078 }
1079 } else {
1080 pmaster_bucket = NULL;
1081 pmaster_num_shards = NULL;
1082 if (obj_lock_enabled)
1083 info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED;
1084 }
1085
1086 std::string zid = zonegroup_id;
1087 if (zid.empty()) {
1088 zid = svc()->zone->get_zonegroup().get_id();
1089 }
1090
1091 if (*existed) {
1092 rgw_placement_rule selected_placement_rule;
1093 ret = svc()->zone->select_bucket_placement(u.get_info(),
1094 zid, placement_rule,
1095 &selected_placement_rule, nullptr, y);
1096 if (selected_placement_rule != info.placement_rule) {
1097 ret = -EEXIST;
1098 bucket_out->swap(bucket);
1099 return ret;
1100 }
1101 } else {
1102
1103 ret = getRados()->create_bucket(u.get_info(), bucket->get_key(),
1104 zid, placement_rule, swift_ver_location,
1105 pquota_info, attrs,
1106 info, pobjv, &ep_objv, creation_time,
1107 pmaster_bucket, pmaster_num_shards, y, exclusive);
1108 if (ret == -EEXIST) {
1109 *existed = true;
1110 ret = 0;
1111 } else if (ret != 0) {
1112 return ret;
1113 }
1114 }
1115
1116 bucket->set_version(ep_objv);
1117 bucket->get_info() = info;
1118
1119 bucket_out->swap(bucket);
1120
1121 return ret;
1122 }
1123
1124 std::unique_ptr<Lifecycle> RGWRadosStore::get_lifecycle(void)
1125 {
1126 return std::unique_ptr<Lifecycle>(new RadosLifecycle(this));
1127 }
1128
1129 int RGWRadosStore::delete_raw_obj(const rgw_raw_obj& obj)
1130 {
1131 return rados->delete_raw_obj(obj);
1132 }
1133
1134 void RGWRadosStore::get_raw_obj(const rgw_placement_rule& placement_rule, const rgw_obj& obj, rgw_raw_obj* raw_obj)
1135 {
1136 rados->obj_to_raw(placement_rule, obj, raw_obj);
1137 }
1138
1139 int RGWRadosStore::get_raw_chunk_size(const rgw_raw_obj& obj, uint64_t* chunk_size)
1140 {
1141 return rados->get_max_chunk_size(obj.pool, chunk_size);
1142 }
1143
1144 MPRadosSerializer::MPRadosSerializer(RGWRadosStore* store, RGWRadosObject* obj, const std::string& lock_name) :
1145 lock(lock_name)
1146 {
1147 rgw_pool meta_pool;
1148 rgw_raw_obj raw_obj;
1149
1150 obj->get_raw_obj(&raw_obj);
1151 oid = raw_obj.oid;
1152 store->getRados()->get_obj_data_pool(obj->get_bucket()->get_placement_rule(),
1153 obj->get_obj(), &meta_pool);
1154 store->getRados()->open_pool_ctx(meta_pool, ioctx, true);
1155 }
1156
1157 int MPRadosSerializer::try_lock(utime_t dur, optional_yield y)
1158 {
1159 op.assert_exists();
1160 lock.set_duration(dur);
1161 lock.lock_exclusive(&op);
1162 int ret = rgw_rados_operate(ioctx, oid, &op, y);
1163 if (! ret) {
1164 locked = true;
1165 }
1166 return ret;
1167 }
1168
1169 LCRadosSerializer::LCRadosSerializer(RGWRadosStore* store, const std::string& _oid, const std::string& lock_name, const std::string& cookie) :
1170 lock(lock_name), oid(_oid)
1171 {
1172 ioctx = &store->getRados()->lc_pool_ctx;
1173 lock.set_cookie(cookie);
1174 }
1175
1176 int LCRadosSerializer::try_lock(utime_t dur, optional_yield y)
1177 {
1178 lock.set_duration(dur);
1179 return lock.lock_exclusive(ioctx, oid);
1180 }
1181
1182 int RadosLifecycle::get_entry(const string& oid, const std::string& marker,
1183 LCEntry& entry)
1184 {
1185 cls_rgw_lc_entry cls_entry;
1186 int ret = cls_rgw_lc_get_entry(*store->getRados()->get_lc_pool_ctx(), oid, marker, cls_entry);
1187
1188 entry.bucket = cls_entry.bucket;
1189 entry.start_time = cls_entry.start_time;
1190 entry.status = cls_entry.status;
1191
1192 return ret;
1193 }
1194
1195 int RadosLifecycle::get_next_entry(const string& oid, std::string& marker,
1196 LCEntry& entry)
1197 {
1198 cls_rgw_lc_entry cls_entry;
1199 int ret = cls_rgw_lc_get_next_entry(*store->getRados()->get_lc_pool_ctx(), oid, marker,
1200 cls_entry);
1201
1202 entry.bucket = cls_entry.bucket;
1203 entry.start_time = cls_entry.start_time;
1204 entry.status = cls_entry.status;
1205
1206 return ret;
1207 }
1208
1209 int RadosLifecycle::set_entry(const string& oid, const LCEntry& entry)
1210 {
1211 cls_rgw_lc_entry cls_entry;
1212
1213 cls_entry.bucket = entry.bucket;
1214 cls_entry.start_time = entry.start_time;
1215 cls_entry.status = entry.status;
1216
1217 return cls_rgw_lc_set_entry(*store->getRados()->get_lc_pool_ctx(), oid, cls_entry);
1218 }
1219
1220 int RadosLifecycle::list_entries(const string& oid, const string& marker,
1221 uint32_t max_entries, vector<LCEntry>& entries)
1222 {
1223 entries.clear();
1224
1225 vector<cls_rgw_lc_entry> cls_entries;
1226 int ret = cls_rgw_lc_list(*store->getRados()->get_lc_pool_ctx(), oid, marker, max_entries, cls_entries);
1227
1228 if (ret < 0)
1229 return ret;
1230
1231 for (auto& entry : cls_entries) {
1232 entries.push_back(LCEntry(entry.bucket, entry.start_time, entry.status));
1233 }
1234
1235 return ret;
1236 }
1237
1238 int RadosLifecycle::rm_entry(const string& oid, const LCEntry& entry)
1239 {
1240 cls_rgw_lc_entry cls_entry;
1241
1242 cls_entry.bucket = entry.bucket;
1243 cls_entry.start_time = entry.start_time;
1244 cls_entry.status = entry.status;
1245
1246 return cls_rgw_lc_rm_entry(*store->getRados()->get_lc_pool_ctx(), oid, cls_entry);
1247 }
1248
1249 int RadosLifecycle::get_head(const string& oid, LCHead& head)
1250 {
1251 cls_rgw_lc_obj_head cls_head;
1252 int ret = cls_rgw_lc_get_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head);
1253
1254 head.marker = cls_head.marker;
1255 head.start_date = cls_head.start_date;
1256
1257 return ret;
1258 }
1259
1260 int RadosLifecycle::put_head(const string& oid, const LCHead& head)
1261 {
1262 cls_rgw_lc_obj_head cls_head;
1263
1264 cls_head.marker = head.marker;
1265 cls_head.start_date = head.start_date;
1266
1267 return cls_rgw_lc_put_head(*store->getRados()->get_lc_pool_ctx(), oid, cls_head);
1268 }
1269
1270 LCSerializer* RadosLifecycle::get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie)
1271 {
1272 return new LCRadosSerializer(store, oid, lock_name, cookie);
1273 }
1274
1275 } // namespace rgw::sal
1276
1277 rgw::sal::RGWRadosStore *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_cache)
1278 {
1279 RGWRados *rados = new RGWRados;
1280 rgw::sal::RGWRadosStore *store = new rgw::sal::RGWRadosStore();
1281
1282 store->setRados(rados);
1283 rados->set_store(store);
1284
1285 if ((*rados).set_use_cache(use_cache)
1286 .set_run_gc_thread(use_gc_thread)
1287 .set_run_lc_thread(use_lc_thread)
1288 .set_run_quota_threads(quota_threads)
1289 .set_run_sync_thread(run_sync_thread)
1290 .set_run_reshard_thread(run_reshard_thread)
1291 .initialize(cct) < 0) {
1292 delete store;
1293 return NULL;
1294 }
1295
1296 return store;
1297 }
1298
1299 rgw::sal::RGWRadosStore *RGWStoreManager::init_raw_storage_provider(CephContext *cct)
1300 {
1301 RGWRados *rados = new RGWRados;
1302 rgw::sal::RGWRadosStore *store = new rgw::sal::RGWRadosStore();
1303
1304 store->setRados(rados);
1305 rados->set_store(store);
1306
1307 rados->set_context(cct);
1308
1309 int ret = rados->init_svc(true);
1310 if (ret < 0) {
1311 ldout(cct, 0) << "ERROR: failed to init services (ret=" << cpp_strerror(-ret) << ")" << dendl;
1312 delete store;
1313 return nullptr;
1314 }
1315
1316 if (rados->init_rados() < 0) {
1317 delete store;
1318 return nullptr;
1319 }
1320
1321 return store;
1322 }
1323
1324 int rgw::sal::RGWRadosStore::get_obj_head_ioctx(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx *ioctx)
1325 {
1326 return rados->get_obj_head_ioctx(bucket_info, obj, ioctx);
1327 }
1328
1329 void RGWStoreManager::close_storage(rgw::sal::RGWRadosStore *store)
1330 {
1331 if (!store)
1332 return;
1333
1334 store->finalize();
1335
1336 delete store;
1337 }