1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2021 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include <errno.h>
17 #include <stdlib.h>
18 #include <system_error>
19 #include <unistd.h>
20 #include <sstream>
21
22 #include "common/Clock.h"
23 #include "common/errno.h"
24
25 #include "rgw_sal.h"
26 #include "rgw_sal_dbstore.h"
27 #include "rgw_bucket.h"
28
29 #define dout_subsys ceph_subsys_rgw
30
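// This file implements the rgw::sal Store Abstraction Layer (SAL) for the
// database-backed store (dbstore): DBUser, DBBucket, DBObject, the multipart
// and atomic writers, and DBStore itself translate SAL calls into operations
// on the underlying DB handle obtained via store->getDB(). Several entry
// points (stats, usage, sync, roles, OIDC) are stubbed out and simply return
// 0 for this first pass.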
31 namespace rgw::sal {
32
33 int DBUser::list_buckets(const DoutPrefixProvider *dpp, const string& marker,
34 const string& end_marker, uint64_t max, bool need_stats,
35 BucketList &buckets, optional_yield y)
36 {
37 RGWUserBuckets ulist;
38 bool is_truncated = false;
39 int ret;
40
41 buckets.clear();
42 ret = store->getDB()->list_buckets(dpp, info.user_id, marker, end_marker, max,
43 need_stats, &ulist, &is_truncated);
44 if (ret < 0)
45 return ret;
46
47 buckets.set_truncated(is_truncated);
48 for (const auto& ent : ulist.get_buckets()) {
49 buckets.add(std::make_unique<DBBucket>(this->store, ent.second, this));
50 }
51
52 return 0;
53 }
54
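// create_bucket() first tries to load an existing bucket; if found, it marks
// *existed, inherits swift_ver_location and the placement rule from the
// stored info, and skips creation. Otherwise a new DBBucket is created in the
// backing database. Zone selection and forwarding to a master zone are not
// implemented yet (single default zone, STANDARD storage class only).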
55 int DBUser::create_bucket(const DoutPrefixProvider *dpp,
56 const rgw_bucket& b,
57 const string& zonegroup_id,
58 rgw_placement_rule& placement_rule,
59 string& swift_ver_location,
60 const RGWQuotaInfo * pquota_info,
61 const RGWAccessControlPolicy& policy,
62 Attrs& attrs,
63 RGWBucketInfo& info,
64 obj_version& ep_objv,
65 bool exclusive,
66 bool obj_lock_enabled,
67 bool *existed,
68 req_info& req_info,
69 std::unique_ptr<Bucket>* bucket_out,
70 optional_yield y)
71 {
72 int ret;
73 bufferlist in_data;
74 RGWBucketInfo master_info;
75 rgw_bucket *pmaster_bucket = nullptr;
76 uint32_t *pmaster_num_shards = nullptr;
77 real_time creation_time;
78 std::unique_ptr<Bucket> bucket;
79 obj_version objv, *pobjv = NULL;
80
81 /* If it exists, look it up; otherwise create it */
82 ret = store->get_bucket(dpp, this, b, &bucket, y);
83 if (ret < 0 && ret != -ENOENT)
84 return ret;
85
86 if (ret != -ENOENT) {
87 RGWAccessControlPolicy old_policy(store->ctx());
88 *existed = true;
89 if (swift_ver_location.empty()) {
90 swift_ver_location = bucket->get_info().swift_ver_location;
91 }
92 placement_rule.inherit_from(bucket->get_info().placement_rule);
93
94 // don't allow changes to the acl policy
95 /* int r = rgw_op_get_bucket_policy_from_attr(dpp, this, this, bucket->get_attrs(),
96 &old_policy, y);
97 if (r >= 0 && old_policy != policy) {
98 bucket_out->swap(bucket);
99 return -EEXIST;
100 }*/
101 } else {
102 bucket = std::make_unique<DBBucket>(store, b, this);
103 *existed = false;
104 bucket->set_attrs(attrs);
105       // XXX: For now only a single default zone and the STANDARD storage
106       // class are supported.
107 placement_rule.name = "default";
108 placement_rule.storage_class = "STANDARD";
109 }
110
111 /*
112    * XXX: If this is not the master zone, forward the request to the master zone.
113    * For now DBStore has a single zone.
114 */
115 std::string zid = zonegroup_id;
116 /* if (zid.empty()) {
117 zid = svc()->zone->get_zonegroup().get_id();
118 } */
119
120 if (*existed) {
121 rgw_placement_rule selected_placement_rule;
122 /* XXX: Handle this when zone is implemented
123 ret = svc()->zone->select_bucket_placement(this.get_info(),
124 zid, placement_rule,
125 &selected_placement_rule, nullptr, y);
126 if (selected_placement_rule != info.placement_rule) {
127 ret = -EEXIST;
128 bucket_out->swap(bucket);
129 return ret;
130 } */
131 } else {
132
133 /* XXX: We may not need to send all these params. Cleanup the unused ones */
134 ret = store->getDB()->create_bucket(dpp, this->get_info(), bucket->get_key(),
135 zid, placement_rule, swift_ver_location, pquota_info,
136 attrs, info, pobjv, &ep_objv, creation_time,
137 pmaster_bucket, pmaster_num_shards, y, exclusive);
138 if (ret == -EEXIST) {
139 *existed = true;
140 ret = 0;
141 } else if (ret != 0) {
142 return ret;
143 }
144 }
145
146 bucket->set_version(ep_objv);
147 bucket->get_info() = info;
148
149 bucket_out->swap(bucket);
150
151 return ret;
152 }
153
154 int DBUser::read_attrs(const DoutPrefixProvider* dpp, optional_yield y)
155 {
156 int ret;
157 ret = store->getDB()->get_user(dpp, string("user_id"), "", info, &attrs,
158 &objv_tracker);
159 return ret;
160 }
161
162 int DBUser::read_stats(const DoutPrefixProvider *dpp,
163 optional_yield y, RGWStorageStats* stats,
164 ceph::real_time *last_stats_sync,
165 ceph::real_time *last_stats_update)
166 {
167 return 0;
168 }
169
170 /* stats - Not for first pass */
171 int DBUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB *cb)
172 {
173 return 0;
174 }
175
176 int DBUser::complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y)
177 {
178 return 0;
179 }
180
181 int DBUser::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
182 bool *is_truncated, RGWUsageIter& usage_iter,
183 map<rgw_user_bucket, rgw_usage_log_entry>& usage)
184 {
185 return 0;
186 }
187
188 int DBUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
189 {
190 return 0;
191 }
192
193 int DBUser::load_user(const DoutPrefixProvider *dpp, optional_yield y)
194 {
195 int ret = 0;
196
197 ret = store->getDB()->get_user(dpp, string("user_id"), "", info, &attrs,
198 &objv_tracker);
199
200 return ret;
201 }
202 int DBUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y)
203 {
204 for(auto& it : new_attrs) {
205 attrs[it.first] = it.second;
206 }
207 return store_user(dpp, y, false);
208 }
209 int DBUser::store_user(const DoutPrefixProvider* dpp, optional_yield y, bool exclusive, RGWUserInfo* old_info)
210 {
211 int ret = 0;
212
213 ret = store->getDB()->store_user(dpp, info, exclusive, &attrs, &objv_tracker, old_info);
214
215 return ret;
216 }
217
218 int DBUser::remove_user(const DoutPrefixProvider* dpp, optional_yield y)
219 {
220 int ret = 0;
221
222 ret = store->getDB()->remove_user(dpp, info, &objv_tracker);
223
224 return ret;
225 }
226
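// remove_bucket() refuses to delete a non-empty bucket: when delete_children
// is not set, it lists up to two entries (including versions) and returns
// -ENOTEMPTY if anything is found before removing the bucket row from the
// database. Recursive deletion of children is still an XXX.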
227 int DBBucket::remove_bucket(const DoutPrefixProvider *dpp, bool delete_children, bool forward_to_master, req_info* req_info, optional_yield y)
228 {
229 int ret;
230
231 ret = load_bucket(dpp, y);
232 if (ret < 0)
233 return ret;
234
235 /* XXX: handle delete_children */
236
237 if (!delete_children) {
238 /* Check if there are any objects */
239 rgw::sal::Bucket::ListParams params;
240 params.list_versions = true;
241 params.allow_unordered = true;
242
243 rgw::sal::Bucket::ListResults results;
244
245 results.objs.clear();
246
247 ret = list(dpp, params, 2, results, null_yield);
248
249 if (ret < 0) {
250 ldpp_dout(dpp, 20) << __func__ << ": Bucket list objects returned " <<
251 ret << dendl;
252 return ret;
253 }
254
255 if (!results.objs.empty()) {
256 ret = -ENOTEMPTY;
257       ldpp_dout(dpp, -1) << __func__ << ": Bucket not empty; returning " <<
258 ret << dendl;
259 return ret;
260 }
261 }
262
263 ret = store->getDB()->remove_bucket(dpp, info);
264
265 return ret;
266 }
267
268 int DBBucket::remove_bucket_bypass_gc(int concurrent_max, bool
269 keep_index_consistent,
270 optional_yield y, const
271 DoutPrefixProvider *dpp) {
272 return 0;
273 }
274
275 int DBBucket::load_bucket(const DoutPrefixProvider *dpp, optional_yield y, bool get_stats)
276 {
277 int ret = 0;
278
279 ret = store->getDB()->get_bucket_info(dpp, string("name"), "", info, &attrs,
280 &mtime, &bucket_version);
281
282 return ret;
283 }
284
285 /* stats - Not for first pass */
286 int DBBucket::read_stats(const DoutPrefixProvider *dpp, int shard_id,
287 std::string *bucket_ver, std::string *master_ver,
288 std::map<RGWObjCategory, RGWStorageStats>& stats,
289 std::string *max_marker, bool *syncstopped)
290 {
291 return 0;
292 }
293
294 int DBBucket::read_stats_async(const DoutPrefixProvider *dpp, int shard_id, RGWGetBucketStats_CB *ctx)
295 {
296 return 0;
297 }
298
299 int DBBucket::sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y)
300 {
301 return 0;
302 }
303
304 int DBBucket::update_container_stats(const DoutPrefixProvider *dpp)
305 {
306 return 0;
307 }
308
309 int DBBucket::check_bucket_shards(const DoutPrefixProvider *dpp)
310 {
311 return 0;
312 }
313
314 int DBBucket::chown(const DoutPrefixProvider *dpp, User* new_user, User* old_user, optional_yield y, const std::string* marker)
315 {
316 int ret;
317
318 ret = store->getDB()->update_bucket(dpp, "owner", info, false, &(new_user->get_id()), nullptr, nullptr, nullptr);
319
320 /* XXX: Update policies of all the bucket->objects with new user */
321 return ret;
322 }
323
324 int DBBucket::put_info(const DoutPrefixProvider *dpp, bool exclusive, ceph::real_time _mtime)
325 {
326 int ret;
327
328 ret = store->getDB()->update_bucket(dpp, "info", info, exclusive, nullptr, nullptr, &_mtime, &info.objv_tracker);
329
330 return ret;
331
332 }
333
334 /* Call get_bucket_info() first if you need up-to-date bucket info */
335 bool DBBucket::is_owner(User* user)
336 {
337 return (info.owner.compare(user->get_id()) == 0);
338 }
339
340 int DBBucket::check_empty(const DoutPrefixProvider *dpp, optional_yield y)
341 {
342 /* XXX: Check if bucket contains any objects */
343 return 0;
344 }
345
346 int DBBucket::check_quota(const DoutPrefixProvider *dpp, RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size,
347 optional_yield y, bool check_size_only)
348 {
349 /* Not Handled in the first pass as stats are also needed */
350 return 0;
351 }
352
353 int DBBucket::merge_and_store_attrs(const DoutPrefixProvider *dpp, Attrs& new_attrs, optional_yield y)
354 {
355 int ret = 0;
356
357 for(auto& it : new_attrs) {
358 attrs[it.first] = it.second;
359 }
360
361 /* XXX: handle has_instance_obj like in set_bucket_instance_attrs() */
362
363 ret = store->getDB()->update_bucket(dpp, "attrs", info, false, nullptr, &new_attrs, nullptr, &get_info().objv_tracker);
364
365 return ret;
366 }
367
368 int DBBucket::try_refresh_info(const DoutPrefixProvider *dpp, ceph::real_time *pmtime)
369 {
370 int ret = 0;
371
372 ret = store->getDB()->get_bucket_info(dpp, string("name"), "", info, &attrs,
373 pmtime, &bucket_version);
374
375 return ret;
376 }
377
378 /* XXX: usage and stats not supported in the first pass */
379 int DBBucket::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
380 uint32_t max_entries, bool *is_truncated,
381 RGWUsageIter& usage_iter,
382 map<rgw_user_bucket, rgw_usage_log_entry>& usage)
383 {
384 return 0;
385 }
386
387 int DBBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
388 {
389 return 0;
390 }
391
392 int DBBucket::remove_objs_from_index(const DoutPrefixProvider *dpp, std::list<rgw_obj_index_key>& objs_to_unlink)
393 {
394   /* XXX: CHECK: Unlike RadosStore, there is no separate bucket index table.
395    * Delete all the objects in the list from the object table of this
396    * bucket
397 */
398 return 0;
399 }
400
401 int DBBucket::check_index(const DoutPrefixProvider *dpp, std::map<RGWObjCategory, RGWStorageStats>& existing_stats, std::map<RGWObjCategory, RGWStorageStats>& calculated_stats)
402 {
403 /* XXX: stats not supported yet */
404 return 0;
405 }
406
407 int DBBucket::rebuild_index(const DoutPrefixProvider *dpp)
408 {
409 /* there is no index table in dbstore. Not applicable */
410 return 0;
411 }
412
413 int DBBucket::set_tag_timeout(const DoutPrefixProvider *dpp, uint64_t timeout)
414 {
415 /* XXX: CHECK: set tag timeout for all the bucket objects? */
416 return 0;
417 }
418
419 int DBBucket::purge_instance(const DoutPrefixProvider *dpp)
420 {
421 /* XXX: CHECK: for dbstore only single instance supported.
422 * Remove all the objects for that instance? Anything extra needed?
423 */
424 return 0;
425 }
426
427 int DBBucket::set_acl(const DoutPrefixProvider *dpp, RGWAccessControlPolicy &acl, optional_yield y)
428 {
429 int ret = 0;
430 bufferlist aclbl;
431
432 acls = acl;
433 acl.encode(aclbl);
434
435 Attrs attrs = get_attrs();
436 attrs[RGW_ATTR_ACL] = aclbl;
437
438 ret = store->getDB()->update_bucket(dpp, "attrs", info, false, &(acl.get_owner().get_id()), &attrs, nullptr, nullptr);
439
440 return ret;
441 }
442
443 std::unique_ptr<Object> DBBucket::get_object(const rgw_obj_key& k)
444 {
445 return std::make_unique<DBObject>(this->store, k, this);
446 }
447
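// list() copies the SAL ListParams into a DB::Bucket::List operation and
// returns the matching objects, common prefixes and truncation flag. A caller
// would typically page through results with the returned marker, roughly as
// follows (illustrative sketch, not taken from this file):
//
//   rgw::sal::Bucket::ListParams params;
//   rgw::sal::Bucket::ListResults results;
//   do {
//     int r = bucket->list(dpp, params, max_entries, results, y);
//     if (r < 0) break;                      // handle error
//     /* consume results.objs */
//     params.marker = results.next_marker;   // list() also updates this
//   } while (results.is_truncated);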
448 int DBBucket::list(const DoutPrefixProvider *dpp, ListParams& params, int max, ListResults& results, optional_yield y)
449 {
450 int ret = 0;
451
452 results.objs.clear();
453
454 DB::Bucket target(store->getDB(), get_info());
455 DB::Bucket::List list_op(&target);
456
457 list_op.params.prefix = params.prefix;
458 list_op.params.delim = params.delim;
459 list_op.params.marker = params.marker;
460   list_op.params.ns = params.ns;
461   list_op.params.end_marker = params.end_marker;
463   list_op.params.enforce_ns = params.enforce_ns;
464 list_op.params.access_list_filter = params.access_list_filter;
465 list_op.params.force_check_filter = params.force_check_filter;
466 list_op.params.list_versions = params.list_versions;
467 list_op.params.allow_unordered = params.allow_unordered;
468
470 ret = list_op.list_objects(dpp, max, &results.objs, &results.common_prefixes, &results.is_truncated);
471 if (ret >= 0) {
472 results.next_marker = list_op.get_next_marker();
473 params.marker = results.next_marker;
474 }
475
476 return ret;
477 }
478
479 std::unique_ptr<MultipartUpload> DBBucket::get_multipart_upload(
480 const std::string& oid,
481 std::optional<std::string> upload_id,
482 ACLOwner owner, ceph::real_time mtime) {
483 return std::make_unique<DBMultipartUpload>(this->store, this, oid, upload_id,
484 std::move(owner), mtime);
485 }
486
487 int DBBucket::list_multiparts(const DoutPrefixProvider *dpp,
488 const string& prefix,
489 string& marker,
490 const string& delim,
491 const int& max_uploads,
492 vector<std::unique_ptr<MultipartUpload>>& uploads,
493 map<string, bool> *common_prefixes,
494 bool *is_truncated) {
495 return 0;
496 }
497
498 int DBBucket::abort_multiparts(const DoutPrefixProvider* dpp,
499 CephContext* cct) {
500 return 0;
501 }
502
503 void DBStore::finalize(void)
504 {
505 if (dbsm)
506 dbsm->destroyAllHandles();
507 }
508
509 const RGWZoneGroup& DBZone::get_zonegroup()
510 {
511 return *zonegroup;
512 }
513
514 int DBZone::get_zonegroup(const std::string& id, RGWZoneGroup& zg)
515 {
516 /* XXX: for now only one zonegroup supported */
517 zg = *zonegroup;
518 return 0;
519 }
520
521 const RGWZoneParams& DBZone::get_params()
522 {
523 return *zone_params;
524 }
525
526 const rgw_zone_id& DBZone::get_id()
527 {
528 return cur_zone_id;
529 }
530
531 const RGWRealm& DBZone::get_realm()
532 {
533 return *realm;
534 }
535
536 const std::string& DBZone::get_name() const
537 {
538 return zone_params->get_name();
539 }
540
541 bool DBZone::is_writeable()
542 {
543 return true;
544 }
545
546 bool DBZone::get_redirect_endpoint(std::string* endpoint)
547 {
548 return false;
549 }
550
551 bool DBZone::has_zonegroup_api(const std::string& api) const
552 {
553 return false;
554 }
555
556 const std::string& DBZone::get_current_period_id()
557 {
558 return current_period->get_id();
559 }
560
561 std::unique_ptr<LuaScriptManager> DBStore::get_lua_script_manager()
562 {
563 return std::make_unique<DBLuaScriptManager>(this);
564 }
565
566 int DBObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, RGWObjState **state, optional_yield y, bool follow_olh)
567 {
568 *state = &(this->state);
569 DB::Object op_target(store->getDB(), get_bucket()->get_info(), get_obj());
570 return op_target.get_obj_state(dpp, get_bucket()->get_info(), get_obj(), follow_olh, state);
571 }
572
573 int DBObject::read_attrs(const DoutPrefixProvider* dpp, DB::Object::Read &read_op, optional_yield y, rgw_obj* target_obj)
574 {
575 read_op.params.attrs = &attrs;
576 read_op.params.target_obj = target_obj;
577 read_op.params.obj_size = &obj_size;
578 read_op.params.lastmod = &mtime;
579
580 return read_op.prepare(dpp);
581 }
582
583 int DBObject::set_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, Attrs* setattrs, Attrs* delattrs, optional_yield y, rgw_obj* target_obj)
584 {
585 Attrs empty;
586 DB::Object op_target(store->getDB(),
587 get_bucket()->get_info(), target_obj ? *target_obj : get_obj());
588 return op_target.set_attrs(dpp, setattrs ? *setattrs : empty, delattrs);
589 }
590
591 int DBObject::get_obj_attrs(RGWObjectCtx* rctx, optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj)
592 {
593 DB::Object op_target(store->getDB(), get_bucket()->get_info(), get_obj());
594 DB::Object::Read read_op(&op_target);
595
596 return read_attrs(dpp, read_op, y, target_obj);
597 }
598
599 int DBObject::modify_obj_attrs(RGWObjectCtx* rctx, const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp)
600 {
601 rgw_obj target = get_obj();
602 int r = get_obj_attrs(rctx, y, dpp, &target);
603 if (r < 0) {
604 return r;
605 }
606 set_atomic(rctx);
607 attrs[attr_name] = attr_val;
608 return set_obj_attrs(dpp, rctx, &attrs, nullptr, y, &target);
609 }
610
611 int DBObject::delete_obj_attrs(const DoutPrefixProvider* dpp, RGWObjectCtx* rctx, const char* attr_name, optional_yield y)
612 {
613 rgw_obj target = get_obj();
614 Attrs rmattr;
615 bufferlist bl;
616
617 set_atomic(rctx);
618 rmattr[attr_name] = bl;
619 return set_obj_attrs(dpp, rctx, nullptr, &rmattr, y, &target);
620 }
621
622 /* RGWObjectCtx will be moved out of sal */
623 /* XXX: Placeholder. Should not be needed later after Dan's patch */
624 void DBObject::set_atomic(RGWObjectCtx* rctx) const
625 {
626 return;
627 }
628
629 /* RGWObjectCtx will be moved out of sal */
630 /* XXX: Placeholder. Should not be needed later after Dan's patch */
631 void DBObject::set_prefetch_data(RGWObjectCtx* rctx)
632 {
633 return;
634 }
635
636 /* RGWObjectCtx will be moved out of sal */
637 /* XXX: Placeholder. Should not be needed later after Dan's patch */
638 void DBObject::set_compressed(RGWObjectCtx* rctx)
639 {
640 return;
641 }
642
643 bool DBObject::is_expired() {
644 return false;
645 }
646
647 void DBObject::gen_rand_obj_instance_name()
648 {
649 store->getDB()->gen_rand_obj_instance_name(&key);
650 }
651
652
653 int DBObject::omap_get_vals(const DoutPrefixProvider *dpp, const std::string& marker, uint64_t count,
654 std::map<std::string, bufferlist> *m,
655 bool* pmore, optional_yield y)
656 {
657 DB::Object op_target(store->getDB(),
658 get_bucket()->get_info(), get_obj());
659 return op_target.obj_omap_get_vals(dpp, marker, count, m, pmore);
660 }
661
662 int DBObject::omap_get_all(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *m,
663 optional_yield y)
664 {
665 DB::Object op_target(store->getDB(),
666 get_bucket()->get_info(), get_obj());
667 return op_target.obj_omap_get_all(dpp, m);
668 }
669
670 int DBObject::omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid,
671 const std::set<std::string>& keys,
672 Attrs* vals)
673 {
674 DB::Object op_target(store->getDB(),
675 get_bucket()->get_info(), get_obj());
676 return op_target.obj_omap_get_vals_by_keys(dpp, oid, keys, vals);
677 }
678
679 int DBObject::omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val,
680 bool must_exist, optional_yield y)
681 {
682 DB::Object op_target(store->getDB(),
683 get_bucket()->get_info(), get_obj());
684 return op_target.obj_omap_set_val_by_key(dpp, key, val, must_exist);
685 }
686
687 MPSerializer* DBObject::get_serializer(const DoutPrefixProvider *dpp, const std::string& lock_name)
688 {
689 return new MPDBSerializer(dpp, store, this, lock_name);
690 }
691
692 int DBObject::transition(RGWObjectCtx& rctx,
693 Bucket* bucket,
694 const rgw_placement_rule& placement_rule,
695 const real_time& mtime,
696 uint64_t olh_epoch,
697 const DoutPrefixProvider* dpp,
698 optional_yield y)
699 {
700 return 0;
701 }
702
703 bool DBObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2)
704 {
705 /* XXX: support single default zone and zonegroup for now */
706 return true;
707 }
708
709 int DBObject::dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f, RGWObjectCtx* obj_ctx)
710 {
711 return 0;
712 }
713
714 std::unique_ptr<Object::ReadOp> DBObject::get_read_op(RGWObjectCtx* ctx)
715 {
716 return std::make_unique<DBObject::DBReadOp>(this, ctx);
717 }
718
719 DBObject::DBReadOp::DBReadOp(DBObject *_source, RGWObjectCtx *_rctx) :
720 source(_source),
721 rctx(_rctx),
722 op_target(_source->store->getDB(),
723 _source->get_bucket()->get_info(),
724 _source->get_obj()),
725 parent_op(&op_target)
726 { }
727
728 int DBObject::DBReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp)
729 {
730 uint64_t obj_size;
731
732 parent_op.conds.mod_ptr = params.mod_ptr;
733 parent_op.conds.unmod_ptr = params.unmod_ptr;
734 parent_op.conds.high_precision_time = params.high_precision_time;
735 parent_op.conds.mod_zone_id = params.mod_zone_id;
736 parent_op.conds.mod_pg_ver = params.mod_pg_ver;
737 parent_op.conds.if_match = params.if_match;
738 parent_op.conds.if_nomatch = params.if_nomatch;
739 parent_op.params.lastmod = params.lastmod;
740 parent_op.params.target_obj = params.target_obj;
741 parent_op.params.obj_size = &obj_size;
742 parent_op.params.attrs = &source->get_attrs();
743
744 int ret = parent_op.prepare(dpp);
745 if (ret < 0)
746 return ret;
747
748 source->set_key(parent_op.state.obj.key);
749 source->set_obj_size(obj_size);
750
751 return ret;
752 }
753
754 int DBObject::DBReadOp::read(int64_t ofs, int64_t end, bufferlist& bl, optional_yield y, const DoutPrefixProvider* dpp)
755 {
756 return parent_op.read(ofs, end, bl, dpp);
757 }
758
759 int DBObject::DBReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y)
760 {
761 return parent_op.get_attr(dpp, name, dest);
762 }
763
764 std::unique_ptr<Object::DeleteOp> DBObject::get_delete_op(RGWObjectCtx* ctx)
765 {
766 return std::make_unique<DBObject::DBDeleteOp>(this, ctx);
767 }
768
769 DBObject::DBDeleteOp::DBDeleteOp(DBObject *_source, RGWObjectCtx *_rctx) :
770 source(_source),
771 rctx(_rctx),
772 op_target(_source->store->getDB(),
773 _source->get_bucket()->get_info(),
774 _source->get_obj()),
775 parent_op(&op_target)
776 { }
777
778 int DBObject::DBDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y)
779 {
780 parent_op.params.bucket_owner = params.bucket_owner.get_id();
781 parent_op.params.versioning_status = params.versioning_status;
782 parent_op.params.obj_owner = params.obj_owner;
783 parent_op.params.olh_epoch = params.olh_epoch;
784 parent_op.params.marker_version_id = params.marker_version_id;
785 parent_op.params.bilog_flags = params.bilog_flags;
786 parent_op.params.remove_objs = params.remove_objs;
787 parent_op.params.expiration_time = params.expiration_time;
788 parent_op.params.unmod_since = params.unmod_since;
789 parent_op.params.mtime = params.mtime;
790 parent_op.params.high_precision_time = params.high_precision_time;
791 parent_op.params.zones_trace = params.zones_trace;
792 parent_op.params.abortmp = params.abortmp;
793 parent_op.params.parts_accounted_size = params.parts_accounted_size;
794
795 int ret = parent_op.delete_obj(dpp);
796 if (ret < 0)
797 return ret;
798
799 result.delete_marker = parent_op.result.delete_marker;
800 result.version_id = parent_op.result.version_id;
801
802 return ret;
803 }
804
805 int DBObject::delete_object(const DoutPrefixProvider* dpp, RGWObjectCtx* obj_ctx, optional_yield y, bool prevent_versioning)
806 {
807 DB::Object del_target(store->getDB(), bucket->get_info(), *obj_ctx, get_obj());
808 DB::Object::Delete del_op(&del_target);
809
810 del_op.params.bucket_owner = bucket->get_info().owner;
811 del_op.params.versioning_status = bucket->get_info().versioning_status();
812
813 return del_op.delete_obj(dpp);
814 }
815
816 int DBObject::delete_obj_aio(const DoutPrefixProvider* dpp, RGWObjState* astate,
817 Completions* aio, bool keep_index_consistent,
818 optional_yield y)
819 {
820 /* XXX: Make it async */
821 return 0;
822 }
823
824 int DBObject::copy_object(RGWObjectCtx& obj_ctx,
825 User* user,
826 req_info* info,
827 const rgw_zone_id& source_zone,
828 rgw::sal::Object* dest_object,
829 rgw::sal::Bucket* dest_bucket,
830 rgw::sal::Bucket* src_bucket,
831 const rgw_placement_rule& dest_placement,
832 ceph::real_time* src_mtime,
833 ceph::real_time* mtime,
834 const ceph::real_time* mod_ptr,
835 const ceph::real_time* unmod_ptr,
836 bool high_precision_time,
837 const char* if_match,
838 const char* if_nomatch,
839 AttrsMod attrs_mod,
840 bool copy_if_newer,
841 Attrs& attrs,
842 RGWObjCategory category,
843 uint64_t olh_epoch,
844 boost::optional<ceph::real_time> delete_at,
845 std::string* version_id,
846 std::string* tag,
847 std::string* etag,
848 void (*progress_cb)(off_t, void *),
849 void* progress_data,
850 const DoutPrefixProvider* dpp,
851 optional_yield y)
852 {
853 return 0;
854 }
855
856 int DBObject::DBReadOp::iterate(const DoutPrefixProvider* dpp, int64_t ofs, int64_t end, RGWGetDataCB* cb, optional_yield y)
857 {
858 return parent_op.iterate(dpp, ofs, end, cb);
859 }
860
861 int DBObject::swift_versioning_restore(RGWObjectCtx* obj_ctx,
862 bool& restored,
863 const DoutPrefixProvider* dpp)
864 {
865 return 0;
866 }
867
868 int DBObject::swift_versioning_copy(RGWObjectCtx* obj_ctx,
869 const DoutPrefixProvider* dpp,
870 optional_yield y)
871 {
872 return 0;
873 }
874
875 int DBMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct,
876 RGWObjectCtx *obj_ctx)
877 {
878 std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
879 meta_obj->set_in_extra_data(true);
880 meta_obj->set_hash_source(mp_obj.get_key());
881 int ret;
882
883 std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = meta_obj->get_delete_op(obj_ctx);
884 del_op->params.bucket_owner = bucket->get_acl_owner();
885 del_op->params.versioning_status = 0;
886
887   // Since the data objects are associated with the meta object until
888   // MultipartUpload::Complete() is done, removing the metadata object
889   // removes all the parts uploaded so far.
890 ret = del_op->delete_obj(dpp, null_yield);
891 if (ret < 0) {
892 ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
893 ret << dendl;
894 }
895 return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
896 }
897
898 static string mp_ns = RGW_OBJ_NS_MULTIPART;
899
900 std::unique_ptr<rgw::sal::Object> DBMultipartUpload::get_meta_obj()
901 {
902 return bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns));
903 }
904
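// init() generates the upload id (MULTIPART_UPLOAD_ID_PREFIX + random
// suffix), creates the meta object in the multipart namespace, and stores the
// serialized multipart_upload_info (destination placement) as its data via an
// exclusive-create write.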
905 int DBMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, ACLOwner& owner, rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs)
906 {
907 int ret;
908 std::string oid = mp_obj.get_key();
909
910 char buf[33];
911 std::unique_ptr<rgw::sal::Object> obj; // create meta obj
912 gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
913 std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
914 upload_id.append(buf);
915
916 mp_obj.init(oid, upload_id);
917 obj = get_meta_obj();
918
919 DB::Object op_target(store->getDB(), obj->get_bucket()->get_info(),
920 obj->get_obj());
921 DB::Object::Write obj_op(&op_target);
922
923 obj_op.meta.owner = owner.get_id();
924 obj_op.meta.category = RGWObjCategory::MultiMeta;
925 obj_op.meta.flags = PUT_OBJ_CREATE_EXCL;
926 obj_op.meta.mtime = &mtime;
927
928 multipart_upload_info upload_info;
929 upload_info.dest_placement = dest_placement;
930
931 bufferlist bl;
932 encode(upload_info, bl);
933 obj_op.meta.data = &bl;
934 ret = obj_op.prepare(dpp);
935 if (ret < 0)
936 return ret;
937 ret = obj_op.write_meta(dpp, bl.length(), bl.length(), attrs);
938
939 return ret;
940 }
941
942 int DBMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
943 int num_parts, int marker,
944 int *next_marker, bool *truncated,
945 bool assume_unsorted)
946 {
947 std::list<RGWUploadPartInfo> parts_map;
948
949 std::unique_ptr<rgw::sal::Object> obj = get_meta_obj();
950
951 parts.clear();
952 int ret;
953
954 DB::Object op_target(store->getDB(),
955 obj->get_bucket()->get_info(), obj->get_obj());
956 ret = op_target.get_mp_parts_list(dpp, parts_map);
957 if (ret < 0) {
958 return ret;
959 }
960
961 int last_num = 0;
962
963 while (!parts_map.empty()) {
964 std::unique_ptr<DBMultipartPart> part = std::make_unique<DBMultipartPart>();
965 RGWUploadPartInfo &pinfo = parts_map.front();
966 part->set_info(pinfo);
967 if ((int)pinfo.num > marker) {
968 last_num = pinfo.num;
969 parts[pinfo.num] = std::move(part);
970 }
971 parts_map.pop_front();
972 }
973
974 /* rebuild a map with only num_parts entries */
975 std::map<uint32_t, std::unique_ptr<MultipartPart>> new_parts;
976 std::map<uint32_t, std::unique_ptr<MultipartPart>>::iterator piter;
977 int i;
978 for (i = 0, piter = parts.begin();
979 i < num_parts && piter != parts.end();
980 ++i, ++piter) {
981 last_num = piter->first;
982 new_parts[piter->first] = std::move(piter->second);
983 }
984
985 if (truncated) {
986 *truncated = (piter != parts.end());
987 }
988
989 parts.swap(new_parts);
990
991 if (next_marker) {
992 *next_marker = last_num;
993 }
994
995 return 0;
996 }
997
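// complete() walks the uploaded parts in batches of up to 1000, validating
// part numbers, sizes and per-part ETags against the client-supplied list,
// and computes the final ETag as the MD5 of the concatenated part MD5s
// followed by "-<part count>". It then writes the head object metadata and
// points the part entries of the meta object at the target object key.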
998 int DBMultipartUpload::complete(const DoutPrefixProvider *dpp,
999 optional_yield y, CephContext* cct,
1000 map<int, string>& part_etags,
1001 list<rgw_obj_index_key>& remove_objs,
1002 uint64_t& accounted_size, bool& compressed,
1003 RGWCompressionInfo& cs_info, off_t& ofs,
1004 std::string& tag, ACLOwner& owner,
1005 uint64_t olh_epoch,
1006 rgw::sal::Object* target_obj,
1007 RGWObjectCtx* obj_ctx)
1008 {
1009 char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
1010 char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
1011 std::string etag;
1012 bufferlist etag_bl;
1013 MD5 hash;
1014 bool truncated;
1015 int ret;
1016
1017 int total_parts = 0;
1018 int handled_parts = 0;
1019 int max_parts = 1000;
1020 int marker = 0;
1021 uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
1022 auto etags_iter = part_etags.begin();
1023 rgw::sal::Attrs attrs = target_obj->get_attrs();
1024
1025 ofs = 0;
1026 accounted_size = 0;
1027 do {
1028 ret = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
1029 if (ret == -ENOENT) {
1030 ret = -ERR_NO_SUCH_UPLOAD;
1031 }
1032 if (ret < 0)
1033 return ret;
1034
1035 total_parts += parts.size();
1036 if (!truncated && total_parts != (int)part_etags.size()) {
1037 ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts
1038 << " expected: " << part_etags.size() << dendl;
1039 ret = -ERR_INVALID_PART;
1040 return ret;
1041 }
1042
1043 for (auto obj_iter = parts.begin(); etags_iter != part_etags.end() && obj_iter != parts.end(); ++etags_iter, ++obj_iter, ++handled_parts) {
1044 DBMultipartPart* part = dynamic_cast<rgw::sal::DBMultipartPart*>(obj_iter->second.get());
1045 uint64_t part_size = part->get_size();
1046 if (handled_parts < (int)part_etags.size() - 1 &&
1047 part_size < min_part_size) {
1048 ret = -ERR_TOO_SMALL;
1049 return ret;
1050 }
1051
1052 char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
1053 if (etags_iter->first != (int)obj_iter->first) {
1054 ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: "
1055 << etags_iter->first << " next uploaded: "
1056 << obj_iter->first << dendl;
1057 ret = -ERR_INVALID_PART;
1058 return ret;
1059 }
1060 string part_etag = rgw_string_unquote(etags_iter->second);
1061 if (part_etag.compare(part->get_etag()) != 0) {
1062 ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first
1063 << " etag: " << etags_iter->second << dendl;
1064 ret = -ERR_INVALID_PART;
1065 return ret;
1066 }
1067
1068 hex_to_buf(part->get_etag().c_str(), petag,
1069 CEPH_CRYPTO_MD5_DIGESTSIZE);
1070 hash.Update((const unsigned char *)petag, sizeof(petag));
1071
1072 RGWUploadPartInfo& obj_part = part->get_info();
1073
1074 ofs += obj_part.size;
1075 accounted_size += obj_part.accounted_size;
1076 }
1077 } while (truncated);
1078 hash.Final((unsigned char *)final_etag);
1079
1080 buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
1081 snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
1082 sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
1083 "-%lld", (long long)part_etags.size());
1084 etag = final_etag_str;
1085 ldpp_dout(dpp, 10) << "calculated etag: " << etag << dendl;
1086
1087 etag_bl.append(etag);
1088
1089 attrs[RGW_ATTR_ETAG] = etag_bl;
1090
1091 /* XXX: handle compression ? */
1092
1093   /* Rename all the object data entries to the original object name (i.e.
1094    * from 'head_obj.name + "." + upload_id' to head_obj.name) */
1095
1096 /* Original head object */
1097 DB::Object op_target(store->getDB(),
1098 target_obj->get_bucket()->get_info(),
1099 target_obj->get_obj());
1100 DB::Object::Write obj_op(&op_target);
1101 obj_op.prepare(NULL);
1102
1103 /* Meta object */
1104 std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
1105 DB::Object meta_op_target(store->getDB(),
1106 meta_obj->get_bucket()->get_info(),
1107 meta_obj->get_obj());
1108 DB::Object::Write mp_op(&meta_op_target);
1109 mp_op.update_mp_parts(dpp, target_obj->get_obj().key);
1110
1111 obj_op.meta.owner = owner.get_id();
1112 obj_op.meta.flags = PUT_OBJ_CREATE;
1113 obj_op.meta.modify_tail = true;
1114 obj_op.meta.completeMultipart = true;
1115
1116 ret = obj_op.write_meta(dpp, ofs, accounted_size, attrs);
1117 if (ret < 0)
1118 return ret;
1119
1120 /* No need to delete Meta obj here. It is deleted from sal */
1121 return ret;
1122 }
1123
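// get_info() serves the placement rule from the cached value when available;
// otherwise it reads the meta object's head, decodes multipart_upload_info
// from it, and returns the attributes collected by the read prepare step.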
1124 int DBMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
1125 {
1126 if (!rule && !attrs) {
1127 return 0;
1128 }
1129
1130 if (rule) {
1131 if (!placement.empty()) {
1132 *rule = &placement;
1133 if (!attrs) {
1134 /* Don't need attrs, done */
1135 return 0;
1136 }
1137 } else {
1138 *rule = nullptr;
1139 }
1140 }
1141
1142 /* We need either attributes or placement, so we need a read */
1143 std::unique_ptr<rgw::sal::Object> meta_obj;
1144 meta_obj = get_meta_obj();
1145 meta_obj->set_in_extra_data(true);
1146
1147 multipart_upload_info upload_info;
1148 bufferlist headbl;
1149
1150 /* Read the obj head which contains the multipart_upload_info */
1151 std::unique_ptr<rgw::sal::Object::ReadOp> read_op = meta_obj->get_read_op(obj_ctx);
1152 int ret = read_op->prepare(y, dpp);
1153 if (ret < 0) {
1154 if (ret == -ENOENT) {
1155 return -ERR_NO_SUCH_UPLOAD;
1156 }
1157 return ret;
1158 }
1159
1160 if (attrs) {
1161 /* Attrs are filled in by prepare */
1162 *attrs = meta_obj->get_attrs();
1163 if (!rule || *rule != nullptr) {
1164 /* placement was cached; don't actually read */
1165 return 0;
1166 }
1167 }
1168
1169 /* Now read the placement from the head */
1170 ret = read_op->read(0, store->getDB()->get_max_head_size(), headbl, y, dpp);
1171 if (ret < 0) {
1172 if (ret == -ENOENT) {
1173 return -ERR_NO_SUCH_UPLOAD;
1174 }
1175 return ret;
1176 }
1177
1178 if (headbl.length() <= 0) {
1179 return -ERR_NO_SUCH_UPLOAD;
1180 }
1181
1182 /* Decode multipart_upload_info */
1183 auto hiter = headbl.cbegin();
1184 try {
1185 decode(upload_info, hiter);
1186 } catch (buffer::error& err) {
1187 ldpp_dout(dpp, 0) << "ERROR: failed to decode multipart upload info" << dendl;
1188 return -EIO;
1189 }
1190 placement = upload_info.dest_placement;
1191 *rule = &placement;
1192
1193 return 0;
1194 }
1195
1196 std::unique_ptr<Writer> DBMultipartUpload::get_writer(
1197 const DoutPrefixProvider *dpp,
1198 optional_yield y,
1199 std::unique_ptr<rgw::sal::Object> _head_obj,
1200 const rgw_user& owner, RGWObjectCtx& obj_ctx,
1201 const rgw_placement_rule *ptail_placement_rule,
1202 uint64_t part_num,
1203 const std::string& part_num_str)
1204 {
1205 return std::make_unique<DBMultipartWriter>(dpp, y, this,
1206 std::move(_head_obj), store, owner,
1207 obj_ctx, ptail_placement_rule, part_num, part_num_str);
1208 }
1209
1210 DBMultipartWriter::DBMultipartWriter(const DoutPrefixProvider *dpp,
1211 optional_yield y,
1212 MultipartUpload* upload,
1213 std::unique_ptr<rgw::sal::Object> _head_obj,
1214 DBStore* _store,
1215 const rgw_user& _owner, RGWObjectCtx& obj_ctx,
1216 const rgw_placement_rule *_ptail_placement_rule,
1217 uint64_t _part_num, const std::string& _part_num_str):
1218 Writer(dpp, y),
1219 store(_store),
1220 owner(_owner),
1221 ptail_placement_rule(_ptail_placement_rule),
1222 head_obj(std::move(_head_obj)),
1223 upload_id(upload->get_upload_id()),
1224 oid(head_obj->get_name() + "." + upload_id +
1225 "." + std::to_string(part_num)),
1226 meta_obj(((DBMultipartUpload*)upload)->get_meta_obj()),
1227 op_target(_store->getDB(), meta_obj->get_bucket()->get_info(), meta_obj->get_obj()),
1228 parent_op(&op_target), part_num(_part_num),
1229 part_num_str(_part_num_str) { parent_op.prepare(NULL);}
1230
1231 int DBMultipartWriter::prepare(optional_yield y)
1232 {
1233 parent_op.set_mp_part_str(upload_id + "." + std::to_string(part_num));
1234 // XXX: do we need to handle part_num_str??
1235 return 0;
1236 }
1237
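// process() accumulates incoming part data and flushes it to the database in
// max_chunk_size pieces; unlike the atomic writer below, multipart parts
// carry no head data. A zero-length call flushes whatever remains buffered.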
1238 int DBMultipartWriter::process(bufferlist&& data, uint64_t offset)
1239 {
1240 /* XXX: same as AtomicWriter..consolidate code */
1241 total_data_size += data.length();
1242
1243 /* XXX: Optimize all bufferlist copies in this function */
1244
1245   /* For multipart parts there is no head_data to write; all incoming
1246    * data is accumulated into tail parts. */
1247 uint64_t max_chunk_size = store->getDB()->get_max_chunk_size();
1248 int excess_size = 0;
1249
1250 /* Accumulate tail_data till max_chunk_size or flush op */
1251 bufferlist tail_data;
1252
1253 if (data.length() != 0) {
1254 parent_op.meta.data = &head_data; /* Null data ?? */
1255
1256     /* handle tail parts.
1257 * First accumulate and write data into dbstore in its chunk_size
1258 * parts
1259 */
1260 if (!tail_part_size) { /* new tail part */
1261 tail_part_offset = offset;
1262 }
1263 data.begin(0).copy(data.length(), tail_data);
1264 tail_part_size += tail_data.length();
1265 tail_part_data.append(tail_data);
1266
1267 if (tail_part_size < max_chunk_size) {
1268 return 0;
1269 } else {
1270 int write_ofs = 0;
1271 while (tail_part_size >= max_chunk_size) {
1272 excess_size = tail_part_size - max_chunk_size;
1273 bufferlist tmp;
1274 tail_part_data.begin(write_ofs).copy(max_chunk_size, tmp);
1275 /* write tail objects data */
1276 int ret = parent_op.write_data(dpp, tmp, tail_part_offset);
1277
1278 if (ret < 0) {
1279 return ret;
1280 }
1281
1282 tail_part_size -= max_chunk_size;
1283 write_ofs += max_chunk_size;
1284 tail_part_offset += max_chunk_size;
1285 }
1286 /* reset tail parts or update if excess data */
1287 if (excess_size > 0) { /* wrote max_chunk_size data */
1288 tail_part_size = excess_size;
1289 bufferlist tmp;
1290 tail_part_data.begin(write_ofs).copy(excess_size, tmp);
1291 tail_part_data = tmp;
1292 } else {
1293 tail_part_size = 0;
1294 tail_part_data.clear();
1295 tail_part_offset = 0;
1296 }
1297 }
1298 } else {
1299 if (tail_part_size == 0) {
1300 return 0; /* nothing more to write */
1301 }
1302
1303     /* flush whatever tail data is present */
1304 int ret = parent_op.write_data(dpp, tail_part_data, tail_part_offset);
1305 if (ret < 0) {
1306 return ret;
1307 }
1308 tail_part_size = 0;
1309 tail_part_data.clear();
1310 tail_part_offset = 0;
1311 }
1312
1313 return 0;
1314 }
1315
1316 int DBMultipartWriter::complete(size_t accounted_size, const std::string& etag,
1317 ceph::real_time *mtime, ceph::real_time set_mtime,
1318 std::map<std::string, bufferlist>& attrs,
1319 ceph::real_time delete_at,
1320 const char *if_match, const char *if_nomatch,
1321 const std::string *user_data,
1322 rgw_zone_set *zones_trace, bool *canceled,
1323 optional_yield y)
1324 {
1325 int ret = 0;
1326 /* XXX: same as AtomicWriter..consolidate code */
1327 parent_op.meta.mtime = mtime;
1328 parent_op.meta.delete_at = delete_at;
1329 parent_op.meta.if_match = if_match;
1330 parent_op.meta.if_nomatch = if_nomatch;
1331 parent_op.meta.user_data = user_data;
1332 parent_op.meta.zones_trace = zones_trace;
1333
1334 /* XXX: handle accounted size */
1335 accounted_size = total_data_size;
1336
1337 if (ret < 0)
1338 return ret;
1339
1340 RGWUploadPartInfo info;
1341 info.num = part_num;
1342 info.etag = etag;
1343 info.size = total_data_size;
1344 info.accounted_size = accounted_size;
1345 info.modified = real_clock::now();
1346 //info.manifest = manifest;
1347
1348 DB::Object op_target(store->getDB(),
1349 meta_obj->get_bucket()->get_info(), meta_obj->get_obj());
1350 ret = op_target.add_mp_part(dpp, info);
1351 if (ret < 0) {
1352 return ret == -ENOENT ? -ERR_NO_SUCH_UPLOAD : ret;
1353 }
1354
1355 return 0;
1356 }
1357
1358 DBAtomicWriter::DBAtomicWriter(const DoutPrefixProvider *dpp,
1359 optional_yield y,
1360 std::unique_ptr<rgw::sal::Object> _head_obj,
1361 DBStore* _store,
1362 const rgw_user& _owner, RGWObjectCtx& obj_ctx,
1363 const rgw_placement_rule *_ptail_placement_rule,
1364 uint64_t _olh_epoch,
1365 const std::string& _unique_tag) :
1366 Writer(dpp, y),
1367 store(_store),
1368 owner(_owner),
1369 ptail_placement_rule(_ptail_placement_rule),
1370 olh_epoch(_olh_epoch),
1371 unique_tag(_unique_tag),
1372 obj(_store, _head_obj->get_key(), _head_obj->get_bucket()),
1373 op_target(_store->getDB(), obj.get_bucket()->get_info(), obj.get_obj()),
1374 parent_op(&op_target) {}
1375
1376 int DBAtomicWriter::prepare(optional_yield y)
1377 {
1378 return parent_op.prepare(NULL); /* send dpp */
1379 }
1380
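// process() stores the first max_head_size bytes in head_data (written later
// together with the object metadata) and streams the remainder to the
// database in max_chunk_size tail chunks; a zero-length call flushes any
// buffered tail data.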
1381 int DBAtomicWriter::process(bufferlist&& data, uint64_t offset)
1382 {
1383 total_data_size += data.length();
1384
1385 /* XXX: Optimize all bufferlist copies in this function */
1386
1387 /* copy head_data into meta. */
1388 uint64_t head_size = store->getDB()->get_max_head_size();
1389 unsigned head_len = 0;
1390 uint64_t max_chunk_size = store->getDB()->get_max_chunk_size();
1391 int excess_size = 0;
1392
1393 /* Accumulate tail_data till max_chunk_size or flush op */
1394 bufferlist tail_data;
1395
1396 if (data.length() != 0) {
1397 if (offset < head_size) {
1398 /* XXX: handle case (if exists) where offset > 0 & < head_size */
1399 head_len = std::min((uint64_t)data.length(),
1400 head_size - offset);
1401 bufferlist tmp;
1402 data.begin(0).copy(head_len, tmp);
1403 head_data.append(tmp);
1404
1405 parent_op.meta.data = &head_data;
1406 if (head_len == data.length()) {
1407 return 0;
1408 }
1409
1410       /* Move offset past the head data copied above */
1411 offset = head_len;
1412 }
1413
1414 /* handle tail parts.
1415 * First accumulate and write data into dbstore in its chunk_size
1416 * parts
1417 */
1418 if (!tail_part_size) { /* new tail part */
1419 tail_part_offset = offset;
1420 }
1421 data.begin(head_len).copy(data.length() - head_len, tail_data);
1422 tail_part_size += tail_data.length();
1423 tail_part_data.append(tail_data);
1424
1425 if (tail_part_size < max_chunk_size) {
1426 return 0;
1427 } else {
1428 int write_ofs = 0;
1429 while (tail_part_size >= max_chunk_size) {
1430 excess_size = tail_part_size - max_chunk_size;
1431 bufferlist tmp;
1432 tail_part_data.begin(write_ofs).copy(max_chunk_size, tmp);
1433 /* write tail objects data */
1434 int ret = parent_op.write_data(dpp, tmp, tail_part_offset);
1435
1436 if (ret < 0) {
1437 return ret;
1438 }
1439
1440 tail_part_size -= max_chunk_size;
1441 write_ofs += max_chunk_size;
1442 tail_part_offset += max_chunk_size;
1443 }
1444 /* reset tail parts or update if excess data */
1445 if (excess_size > 0) { /* wrote max_chunk_size data */
1446 tail_part_size = excess_size;
1447 bufferlist tmp;
1448 tail_part_data.begin(write_ofs).copy(excess_size, tmp);
1449 tail_part_data = tmp;
1450 } else {
1451 tail_part_size = 0;
1452 tail_part_data.clear();
1453 tail_part_offset = 0;
1454 }
1455 }
1456 } else {
1457 if (tail_part_size == 0) {
1458 return 0; /* nothing more to write */
1459 }
1460
1461     /* flush whatever tail data is present */
1462 int ret = parent_op.write_data(dpp, tail_part_data, tail_part_offset);
1463 if (ret < 0) {
1464 return ret;
1465 }
1466 tail_part_size = 0;
1467 tail_part_data.clear();
1468 tail_part_offset = 0;
1469 }
1470
1471 return 0;
1472 }
1473
1474 int DBAtomicWriter::complete(size_t accounted_size, const std::string& etag,
1475 ceph::real_time *mtime, ceph::real_time set_mtime,
1476 std::map<std::string, bufferlist>& attrs,
1477 ceph::real_time delete_at,
1478 const char *if_match, const char *if_nomatch,
1479 const std::string *user_data,
1480 rgw_zone_set *zones_trace, bool *canceled,
1481 optional_yield y)
1482 {
1483 parent_op.meta.mtime = mtime;
1484 parent_op.meta.delete_at = delete_at;
1485 parent_op.meta.if_match = if_match;
1486 parent_op.meta.if_nomatch = if_nomatch;
1487 parent_op.meta.user_data = user_data;
1488 parent_op.meta.zones_trace = zones_trace;
1489
1490 /* XXX: handle accounted size */
1491 accounted_size = total_data_size;
1492 int ret = parent_op.write_meta(dpp, total_data_size, accounted_size, attrs);
1493 if (canceled) {
1494 *canceled = parent_op.meta.canceled;
1495 }
1496
1497 return ret;
1498
1499 }
1500
1501 std::unique_ptr<RGWRole> DBStore::get_role(std::string name,
1502 std::string tenant,
1503 std::string path,
1504 std::string trust_policy,
1505 std::string max_session_duration_str,
1506 std::multimap<std::string,std::string> tags)
1507 {
1508 RGWRole* p = nullptr;
1509 return std::unique_ptr<RGWRole>(p);
1510 }
1511
1512 std::unique_ptr<RGWRole> DBStore::get_role(std::string id)
1513 {
1514 RGWRole* p = nullptr;
1515 return std::unique_ptr<RGWRole>(p);
1516 }
1517
1518 int DBStore::get_roles(const DoutPrefixProvider *dpp,
1519 optional_yield y,
1520 const std::string& path_prefix,
1521 const std::string& tenant,
1522 vector<std::unique_ptr<RGWRole>>& roles)
1523 {
1524 return 0;
1525 }
1526
1527 std::unique_ptr<RGWOIDCProvider> DBStore::get_oidc_provider()
1528 {
1529 RGWOIDCProvider* p = nullptr;
1530 return std::unique_ptr<RGWOIDCProvider>(p);
1531 }
1532
1533 int DBStore::get_oidc_providers(const DoutPrefixProvider *dpp,
1534 const std::string& tenant,
1535 vector<std::unique_ptr<RGWOIDCProvider>>& providers)
1536 {
1537 return 0;
1538 }
1539
1540 std::unique_ptr<Writer> DBStore::get_append_writer(const DoutPrefixProvider *dpp,
1541 optional_yield y,
1542 std::unique_ptr<rgw::sal::Object> _head_obj,
1543 const rgw_user& owner, RGWObjectCtx& obj_ctx,
1544 const rgw_placement_rule *ptail_placement_rule,
1545 const std::string& unique_tag,
1546 uint64_t position,
1547 uint64_t *cur_accounted_size) {
1548 return nullptr;
1549 }
1550
1551 std::unique_ptr<Writer> DBStore::get_atomic_writer(const DoutPrefixProvider *dpp,
1552 optional_yield y,
1553 std::unique_ptr<rgw::sal::Object> _head_obj,
1554 const rgw_user& owner, RGWObjectCtx& obj_ctx,
1555 const rgw_placement_rule *ptail_placement_rule,
1556 uint64_t olh_epoch,
1557 const std::string& unique_tag) {
1558 return std::make_unique<DBAtomicWriter>(dpp, y,
1559 std::move(_head_obj), this, owner, obj_ctx,
1560 ptail_placement_rule, olh_epoch, unique_tag);
1561 }
1562
1563 std::unique_ptr<User> DBStore::get_user(const rgw_user &u)
1564 {
1565 return std::make_unique<DBUser>(this, u);
1566 }
1567
1568 int DBStore::get_user_by_access_key(const DoutPrefixProvider *dpp, const std::string& key, optional_yield y, std::unique_ptr<User>* user)
1569 {
1570 RGWUserInfo uinfo;
1571 User *u;
1572 int ret = 0;
1573 RGWObjVersionTracker objv_tracker;
1574
1575 ret = getDB()->get_user(dpp, string("access_key"), key, uinfo, nullptr,
1576 &objv_tracker);
1577
1578 if (ret < 0)
1579 return ret;
1580
1581 u = new DBUser(this, uinfo);
1582
1583 if (!u)
1584 return -ENOMEM;
1585
1586 u->get_version_tracker() = objv_tracker;
1587 user->reset(u);
1588
1589 return 0;
1590 }
1591
1592 int DBStore::get_user_by_email(const DoutPrefixProvider *dpp, const std::string& email, optional_yield y, std::unique_ptr<User>* user)
1593 {
1594 RGWUserInfo uinfo;
1595 User *u;
1596 int ret = 0;
1597 RGWObjVersionTracker objv_tracker;
1598
1599 ret = getDB()->get_user(dpp, string("email"), email, uinfo, nullptr,
1600 &objv_tracker);
1601
1602 if (ret < 0)
1603 return ret;
1604
1605 u = new DBUser(this, uinfo);
1606
1607 if (!u)
1608 return -ENOMEM;
1609
1610 u->get_version_tracker() = objv_tracker;
1611 user->reset(u);
1612
1613 return ret;
1614 }
1615
1616 int DBStore::get_user_by_swift(const DoutPrefixProvider *dpp, const std::string& user_str, optional_yield y, std::unique_ptr<User>* user)
1617 {
1618 /* Swift keys and subusers are not supported for now */
1619 return 0;
1620 }
1621
1622 std::string DBStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_yield y)
1623 {
1624 return "PLACEHOLDER"; // for instance unique identifier
1625 }
1626
1627 std::unique_ptr<Object> DBStore::get_object(const rgw_obj_key& k)
1628 {
1629 return std::make_unique<DBObject>(this, k);
1630 }
1631
1632
1633 int DBStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const rgw_bucket& b, std::unique_ptr<Bucket>* bucket, optional_yield y)
1634 {
1635 int ret;
1636 Bucket* bp;
1637
1638 bp = new DBBucket(this, b, u);
1639 ret = bp->load_bucket(dpp, y);
1640 if (ret < 0) {
1641 delete bp;
1642 return ret;
1643 }
1644
1645 bucket->reset(bp);
1646 return 0;
1647 }
1648
1649 int DBStore::get_bucket(User* u, const RGWBucketInfo& i, std::unique_ptr<Bucket>* bucket)
1650 {
1651 Bucket* bp;
1652
1653 bp = new DBBucket(this, i, u);
1654 /* Don't need to fetch the bucket info, use the provided one */
1655
1656 bucket->reset(bp);
1657 return 0;
1658 }
1659
1660 int DBStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const std::string& tenant, const std::string& name, std::unique_ptr<Bucket>* bucket, optional_yield y)
1661 {
1662 rgw_bucket b;
1663
1664 b.tenant = tenant;
1665 b.name = name;
1666
1667 return get_bucket(dpp, u, b, bucket, y);
1668 }
1669
1670 bool DBStore::is_meta_master()
1671 {
1672 return true;
1673 }
1674
1675 int DBStore::forward_request_to_master(const DoutPrefixProvider *dpp, User* user, obj_version *objv,
1676 bufferlist& in_data,
1677 JSONParser *jp, req_info& info,
1678 optional_yield y)
1679 {
1680 return 0;
1681 }
1682
1683 std::string DBStore::zone_unique_id(uint64_t unique_num)
1684 {
1685 return "";
1686 }
1687
1688 std::string DBStore::zone_unique_trans_id(const uint64_t unique_num)
1689 {
1690 return "";
1691 }
1692
1693 int DBStore::cluster_stat(RGWClusterStat& stats)
1694 {
1695 return 0;
1696 }
1697
1698 std::unique_ptr<Lifecycle> DBStore::get_lifecycle(void)
1699 {
1700 return std::make_unique<DBLifecycle>(this);
1701 }
1702
1703 std::unique_ptr<Completions> DBStore::get_completions(void)
1704 {
1705 return 0;
1706 }
1707
1708 int DBLifecycle::get_entry(const std::string& oid, const std::string& marker,
1709 LCEntry& entry)
1710 {
1711 return store->getDB()->get_entry(oid, marker, entry);
1712 }
1713
1714 int DBLifecycle::get_next_entry(const std::string& oid, std::string& marker,
1715 LCEntry& entry)
1716 {
1717 return store->getDB()->get_next_entry(oid, marker, entry);
1718 }
1719
1720 int DBLifecycle::set_entry(const std::string& oid, const LCEntry& entry)
1721 {
1722 return store->getDB()->set_entry(oid, entry);
1723 }
1724
1725 int DBLifecycle::list_entries(const std::string& oid, const std::string& marker,
1726 uint32_t max_entries, vector<LCEntry>& entries)
1727 {
1728 return store->getDB()->list_entries(oid, marker, max_entries, entries);
1729 }
1730
1731 int DBLifecycle::rm_entry(const std::string& oid, const LCEntry& entry)
1732 {
1733 return store->getDB()->rm_entry(oid, entry);
1734 }
1735
1736 int DBLifecycle::get_head(const std::string& oid, LCHead& head)
1737 {
1738 return store->getDB()->get_head(oid, head);
1739 }
1740
1741 int DBLifecycle::put_head(const std::string& oid, const LCHead& head)
1742 {
1743 return store->getDB()->put_head(oid, head);
1744 }
1745
1746 LCSerializer* DBLifecycle::get_serializer(const std::string& lock_name, const std::string& oid, const std::string& cookie)
1747 {
1748 return new LCDBSerializer(store, oid, lock_name, cookie);
1749 }
1750
1751 std::unique_ptr<Notification> DBStore::get_notification(
1752 rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
1753 rgw::notify::EventType event_type, const std::string* object_name)
1754 {
1755 return std::make_unique<DBNotification>(obj, src_obj, event_type);
1756 }
1757
1758 std::unique_ptr<Notification> DBStore::get_notification(
1759 const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
1760 rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
1761 rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
1762 std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
1763 optional_yield y)
1764 {
1765 return std::make_unique<DBNotification>(obj, src_obj, event_type);
1766 }
1767
1768 RGWLC* DBStore::get_rgwlc(void) {
1769 return lc;
1770 }
1771
1772 int DBStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info)
1773 {
1774 return 0;
1775 }
1776
1777 int DBStore::log_op(const DoutPrefixProvider *dpp, string& oid, bufferlist& bl)
1778 {
1779 return 0;
1780 }
1781
1782 int DBStore::register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type,
1783 const map<string, string>& meta)
1784 {
1785 return 0;
1786 }
1787
1788 void DBStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, RGWRateLimitInfo& user_ratelimit, RGWRateLimitInfo& anon_ratelimit)
1789 {
1790 return;
1791 }
1792
1793 void DBStore::get_quota(RGWQuotaInfo& bucket_quota, RGWQuotaInfo& user_quota)
1794 {
1795 // XXX: Not handled for the first pass
1796 return;
1797 }
1798
1799 int DBStore::set_buckets_enabled(const DoutPrefixProvider *dpp, vector<rgw_bucket>& buckets, bool enabled)
1800 {
1801 int ret = 0;
1802
1803 vector<rgw_bucket>::iterator iter;
1804
1805 for (iter = buckets.begin(); iter != buckets.end(); ++iter) {
1806 rgw_bucket& bucket = *iter;
1807 if (enabled) {
1808 ldpp_dout(dpp, 20) << "enabling bucket name=" << bucket.name << dendl;
1809 } else {
1810 ldpp_dout(dpp, 20) << "disabling bucket name=" << bucket.name << dendl;
1811 }
1812
1813 RGWBucketInfo info;
1814 map<string, bufferlist> attrs;
1815 int r = getDB()->get_bucket_info(dpp, string("name"), "", info, &attrs,
1816 nullptr, nullptr);
1817 if (r < 0) {
1818 ldpp_dout(dpp, 0) << "NOTICE: get_bucket_info on bucket=" << bucket.name << " returned err=" << r << ", skipping bucket" << dendl;
1819 ret = r;
1820 continue;
1821 }
1822 if (enabled) {
1823 info.flags &= ~BUCKET_SUSPENDED;
1824 } else {
1825 info.flags |= BUCKET_SUSPENDED;
1826 }
1827
1828 r = getDB()->update_bucket(dpp, "info", info, false, nullptr, &attrs, nullptr, &info.objv_tracker);
1829 if (r < 0) {
1830 ldpp_dout(dpp, 0) << "NOTICE: put_bucket_info on bucket=" << bucket.name << " returned err=" << r << ", skipping bucket" << dendl;
1831 ret = r;
1832 continue;
1833 }
1834 }
1835 return ret;
1836 }
1837
1838 int DBStore::get_sync_policy_handler(const DoutPrefixProvider *dpp,
1839 std::optional<rgw_zone_id> zone,
1840 std::optional<rgw_bucket> bucket,
1841 RGWBucketSyncPolicyHandlerRef *phandler,
1842 optional_yield y)
1843 {
1844 return 0;
1845 }
1846
1847 RGWDataSyncStatusManager* DBStore::get_data_sync_manager(const rgw_zone_id& source_zone)
1848 {
1849 return 0;
1850 }
1851
1852 int DBStore::read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
1853 uint32_t max_entries, bool *is_truncated,
1854 RGWUsageIter& usage_iter,
1855 map<rgw_user_bucket, rgw_usage_log_entry>& usage)
1856 {
1857 return 0;
1858 }
1859
1860 int DBStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
1861 {
1862 return 0;
1863 }
1864
1865 int DBStore::get_config_key_val(string name, bufferlist *bl)
1866 {
1867 return 0;
1868 }
1869
1870 int DBStore::meta_list_keys_init(const DoutPrefixProvider *dpp, const string& section, const string& marker, void** phandle)
1871 {
1872 return 0;
1873 }
1874
1875 int DBStore::meta_list_keys_next(const DoutPrefixProvider *dpp, void* handle, int max, list<string>& keys, bool* truncated)
1876 {
1877 return 0;
1878 }
1879
1880 void DBStore::meta_list_keys_complete(void* handle)
1881 {
1882 return;
1883 }
1884
1885 std::string DBStore::meta_get_marker(void* handle)
1886 {
1887 return "";
1888 }
1889
1890 int DBStore::meta_remove(const DoutPrefixProvider *dpp, string& metadata_key, optional_yield y)
1891 {
1892 return 0;
1893 }
1894
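// initialize() wires up the lifecycle (LC) machinery: it always constructs an
// RGWLC instance, and when use_lc_thread is set it also creates the LC tables
// in the database and starts the LC processor thread.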
1895 int DBStore::initialize(CephContext *_cct, const DoutPrefixProvider *_dpp) {
1896 int ret = 0;
1897 cct = _cct;
1898 dpp = _dpp;
1899
1900 lc = new RGWLC();
1901 lc->initialize(cct, this);
1902
1903 if (use_lc_thread) {
1904 ret = db->createLCTables(dpp);
1905 lc->start_processor();
1906 }
1907
1908 return ret;
1909 }
1910 } // namespace rgw::sal
1911
1912 extern "C" {
1913
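// C entry point used to construct a DBStore together with its DBStoreManager
// and DB handle. The caller is expected to cast the returned void* back to
// rgw::sal::DBStore and complete setup (e.g. by calling initialize()); this
// is how the generic store-creation path is assumed to consume it.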
1914 void *newDBStore(CephContext *cct)
1915 {
1916 rgw::sal::DBStore *store = new rgw::sal::DBStore();
1917 if (store) {
1918 DBStoreManager *dbsm = new DBStoreManager(cct);
1919
1920     if (!dbsm) {
1921       delete store; return nullptr;
1922 }
1923
1924 DB *db = dbsm->getDB();
1925 if (!db) {
1926 delete dbsm;
1927       delete store; return nullptr;
1928 }
1929
1930 store->setDBStoreManager(dbsm);
1931 store->setDB(db);
1932 db->set_store((rgw::sal::Store*)store);
1933 db->set_context(cct);
1934 }
1935
1936 return store;
1937 }
1938
1939 }