]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/store/dbstore/common/dbstore.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / rgw / store / dbstore / common / dbstore.cc
CommitLineData
20effc67
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "dbstore.h"
5
6using namespace std;
7
8namespace rgw { namespace store {
9
10map<string, class ObjectOp*> DB::objectmap = {};
11
12map<string, class ObjectOp*> DB::getObjectMap() {
13 return DB::objectmap;
14}
15
16int DB::Initialize(string logfile, int loglevel)
17{
18 int ret = -1;
19 const DoutPrefixProvider *dpp = get_def_dpp();
20
21 if (!cct) {
22 cout << "Failed to Initialize. No ceph Context \n";
23 return -1;
24 }
25
26 if (loglevel > 0) {
27 cct->_conf->subsys.set_log_level(ceph_subsys_rgw, loglevel);
28 }
29 if (!logfile.empty()) {
30 cct->_log->set_log_file(logfile);
31 cct->_log->reopen_log_file();
32 }
33
34
35 db = openDB(dpp);
36
37 if (!db) {
38 ldpp_dout(dpp, 0) <<"Failed to open database " << dendl;
39 return ret;
40 }
41
42 ret = InitializeDBOps(dpp);
43
44 if (ret) {
45 ldpp_dout(dpp, 0) <<"InitializeDBOps failed " << dendl;
46 closeDB(dpp);
47 db = NULL;
48 return ret;
49 }
50
51 ldpp_dout(dpp, 0) << "DB successfully initialized - name:" \
52 << db_name << "" << dendl;
53
54 return ret;
55}
56
57int DB::Destroy(const DoutPrefixProvider *dpp)
58{
59 if (!db)
60 return 0;
61
62 closeDB(dpp);
63
64
65 FreeDBOps(dpp);
66
67 ldpp_dout(dpp, 20)<<"DB successfully destroyed - name:" \
68 <<db_name << dendl;
69
70 return 0;
71}
72
73
74DBOp *DB::getDBOp(const DoutPrefixProvider *dpp, string Op, struct DBOpParams *params)
75{
76 if (!Op.compare("InsertUser"))
77 return dbops.InsertUser;
78 if (!Op.compare("RemoveUser"))
79 return dbops.RemoveUser;
80 if (!Op.compare("GetUser"))
81 return dbops.GetUser;
82 if (!Op.compare("InsertBucket"))
83 return dbops.InsertBucket;
84 if (!Op.compare("UpdateBucket"))
85 return dbops.UpdateBucket;
86 if (!Op.compare("RemoveBucket"))
87 return dbops.RemoveBucket;
88 if (!Op.compare("GetBucket"))
89 return dbops.GetBucket;
90 if (!Op.compare("ListUserBuckets"))
91 return dbops.ListUserBuckets;
92 if (!Op.compare("InsertLCEntry"))
93 return dbops.InsertLCEntry;
94 if (!Op.compare("RemoveLCEntry"))
95 return dbops.RemoveLCEntry;
96 if (!Op.compare("GetLCEntry"))
97 return dbops.GetLCEntry;
98 if (!Op.compare("ListLCEntries"))
99 return dbops.ListLCEntries;
100 if (!Op.compare("InsertLCHead"))
101 return dbops.InsertLCHead;
102 if (!Op.compare("RemoveLCHead"))
103 return dbops.RemoveLCHead;
104 if (!Op.compare("GetLCHead"))
105 return dbops.GetLCHead;
106
107 /* Object Operations */
108 map<string, class ObjectOp*>::iterator iter;
109 class ObjectOp* Ob;
110
111 {
112 const std::lock_guard<std::mutex> lk(mtx);
113 iter = DB::objectmap.find(params->op.bucket.info.bucket.name);
114 }
115
116 if (iter == DB::objectmap.end()) {
117 ldpp_dout(dpp, 30)<<"No objectmap found for bucket: " \
118 <<params->op.bucket.info.bucket.name << dendl;
119 /* not found */
120 return NULL;
121 }
122
123 Ob = iter->second;
124
125 if (!Op.compare("PutObject"))
126 return Ob->PutObject;
127 if (!Op.compare("DeleteObject"))
128 return Ob->DeleteObject;
129 if (!Op.compare("GetObject"))
130 return Ob->GetObject;
131 if (!Op.compare("UpdateObject"))
132 return Ob->UpdateObject;
133 if (!Op.compare("ListBucketObjects"))
134 return Ob->ListBucketObjects;
135 if (!Op.compare("PutObjectData"))
136 return Ob->PutObjectData;
137 if (!Op.compare("UpdateObjectData"))
138 return Ob->UpdateObjectData;
139 if (!Op.compare("GetObjectData"))
140 return Ob->GetObjectData;
141 if (!Op.compare("DeleteObjectData"))
142 return Ob->DeleteObjectData;
143
144 return NULL;
145}
146
147int DB::objectmapInsert(const DoutPrefixProvider *dpp, string bucket, class ObjectOp* ptr)
148{
149 map<string, class ObjectOp*>::iterator iter;
150 class ObjectOp *Ob;
151
152 const std::lock_guard<std::mutex> lk(mtx);
153 iter = DB::objectmap.find(bucket);
154
155 if (iter != DB::objectmap.end()) {
156 // entry already exists
157 // return success or replace it or
158 // return error ?
159 //
160 // return success for now & delete the newly allocated ptr
161 ldpp_dout(dpp, 20)<<"Objectmap entry already exists for bucket("\
162 <<bucket<<"). Not inserted " << dendl;
163 delete ptr;
164 return 0;
165 }
166
167 Ob = (class ObjectOp*) ptr;
168 Ob->InitializeObjectOps(getDBname(), dpp);
169
170 DB::objectmap.insert(pair<string, class ObjectOp*>(bucket, Ob));
171
172 return 0;
173}
174
175int DB::objectmapDelete(const DoutPrefixProvider *dpp, string bucket)
176{
177 map<string, class ObjectOp*>::iterator iter;
178 class ObjectOp *Ob;
179
180 const std::lock_guard<std::mutex> lk(mtx);
181 iter = DB::objectmap.find(bucket);
182
183 if (iter == DB::objectmap.end()) {
184 // entry doesn't exist
185 // return success or return error ?
186 // return success for now
187 ldpp_dout(dpp, 20)<<"Objectmap entry for bucket("<<bucket<<") "
188 <<"doesnt exist to delete " << dendl;
189 return 0;
190 }
191
192 Ob = (class ObjectOp*) (iter->second);
193 Ob->FreeObjectOps(dpp);
194
195 DB::objectmap.erase(iter);
196
197 return 0;
198}
199
200int DB::InitializeParams(const DoutPrefixProvider *dpp, string Op, DBOpParams *params)
201{
202 int ret = -1;
203
204 if (!params)
205 goto out;
206
207 params->cct = cct;
208
209 //reset params here
210 params->user_table = user_table;
211 params->bucket_table = bucket_table;
212 params->lc_entry_table = lc_entry_table;
213 params->lc_head_table = lc_head_table;
214
215 ret = 0;
216out:
217 return ret;
218}
219
220int DB::ProcessOp(const DoutPrefixProvider *dpp, string Op, struct DBOpParams *params) {
221 int ret = -1;
222 class DBOp *db_op;
223
224 db_op = getDBOp(dpp, Op, params);
225
226 if (!db_op) {
227 ldpp_dout(dpp, 0)<<"No db_op found for Op("<<Op<<")" << dendl;
228 return ret;
229 }
230 ret = db_op->Execute(dpp, params);
231
232 if (ret) {
233 ldpp_dout(dpp, 0)<<"In Process op Execute failed for fop(" \
234 <<Op.c_str()<<") " << dendl;
235 } else {
236 ldpp_dout(dpp, 20)<<"Successfully processed fop(" \
237 <<Op.c_str()<<") " << dendl;
238 }
239
240 return ret;
241}
242
243int DB::get_user(const DoutPrefixProvider *dpp,
244 const std::string& query_str, const std::string& query_str_val,
245 RGWUserInfo& uinfo, map<string, bufferlist> *pattrs,
246 RGWObjVersionTracker *pobjv_tracker) {
247 int ret = 0;
248
249 if (query_str.empty()) {
250 // not checking for query_str_val as the query can be to fetch
251 // entries with null values
252 return -1;
253 }
254
255 DBOpParams params = {};
256 InitializeParams(dpp, "GetUser", &params);
257
258 params.op.query_str = query_str;
259
260 // validate query_str with UserTable entries names
261 if (query_str == "username") {
262 params.op.user.uinfo.display_name = query_str_val;
263 } else if (query_str == "email") {
264 params.op.user.uinfo.user_email = query_str_val;
265 } else if (query_str == "access_key") {
266 RGWAccessKey k(query_str_val, "");
267 map<string, RGWAccessKey> keys;
268 keys[query_str_val] = k;
269 params.op.user.uinfo.access_keys = keys;
270 } else if (query_str == "user_id") {
271 params.op.user.uinfo.user_id = uinfo.user_id;
272 } else {
273 ldpp_dout(dpp, 0)<<"In GetUser Invalid query string :" <<query_str.c_str()<<") " << dendl;
274 return -1;
275 }
276
277 ret = ProcessOp(dpp, "GetUser", &params);
278
279 if (ret)
280 goto out;
281
282 /* Verify if its a valid user */
283 if (params.op.user.uinfo.access_keys.empty()) {
284 ldpp_dout(dpp, 0)<<"In GetUser - No user with query(" <<query_str.c_str()<<"), user_id(" << uinfo.user_id <<") found" << dendl;
285 return -ENOENT;
286 }
287
288 uinfo = params.op.user.uinfo;
289
290 if (pattrs) {
291 *pattrs = params.op.user.user_attrs;
292 }
293
294 if (pobjv_tracker) {
295 pobjv_tracker->read_version = params.op.user.user_version;
296 }
297
298out:
299 return ret;
300}
301
302int DB::store_user(const DoutPrefixProvider *dpp,
303 RGWUserInfo& uinfo, bool exclusive, map<string, bufferlist> *pattrs,
304 RGWObjVersionTracker *pobjv, RGWUserInfo* pold_info)
305{
306 DBOpParams params = {};
307 InitializeParams(dpp, "CreateUser", &params);
308 int ret = 0;
309
310 /* Check if the user already exists and return the old info, caller will have a use for it */
311 RGWUserInfo orig_info;
312 RGWObjVersionTracker objv_tracker = {};
313 obj_version& obj_ver = objv_tracker.read_version;
314
315 orig_info.user_id = uinfo.user_id;
316 ret = get_user(dpp, string("user_id"), "", orig_info, nullptr, &objv_tracker);
317
318 if (!ret && obj_ver.ver) {
319 /* already exists. */
320
321 if (pold_info) {
322 *pold_info = orig_info;
323 }
324
325 if (pobjv && (pobjv->read_version.ver != obj_ver.ver)) {
326 /* Object version mismatch.. return ECANCELED */
327 ret = -ECANCELED;
328 ldpp_dout(dpp, 0)<<"User Read version mismatch err:(" <<ret<<") " << dendl;
329 return ret;
330 }
331
332 if (exclusive) {
333 // return
334 return ret;
335 }
336 obj_ver.ver++;
337 } else {
338 obj_ver.ver = 1;
339 obj_ver.tag = "UserTAG";
340 }
341
342 params.op.user.user_version = obj_ver;
343 params.op.user.uinfo = uinfo;
344
345 if (pattrs) {
346 params.op.user.user_attrs = *pattrs;
347 }
348
349 ret = ProcessOp(dpp, "InsertUser", &params);
350
351 if (ret) {
352 ldpp_dout(dpp, 0)<<"store_user failed with err:(" <<ret<<") " << dendl;
353 goto out;
354 }
355
356 if (pobjv) {
357 pobjv->read_version = obj_ver;
358 pobjv->write_version = obj_ver;
359 }
360
361out:
362 return ret;
363}
364
365int DB::remove_user(const DoutPrefixProvider *dpp,
366 RGWUserInfo& uinfo, RGWObjVersionTracker *pobjv)
367{
368 DBOpParams params = {};
369 InitializeParams(dpp, "CreateUser", &params);
370 int ret = 0;
371
372 RGWUserInfo orig_info;
373 RGWObjVersionTracker objv_tracker = {};
374
375 orig_info.user_id = uinfo.user_id;
376 ret = get_user(dpp, string("user_id"), "", orig_info, nullptr, &objv_tracker);
377
378 if (!ret && objv_tracker.read_version.ver) {
379 /* already exists. */
380
381 if (pobjv && (pobjv->read_version.ver != objv_tracker.read_version.ver)) {
382 /* Object version mismatch.. return ECANCELED */
383 ret = -ECANCELED;
384 ldpp_dout(dpp, 0)<<"User Read version mismatch err:(" <<ret<<") " << dendl;
385 return ret;
386 }
387 }
388
389 params.op.user.uinfo.user_id = uinfo.user_id;
390
391 ret = ProcessOp(dpp, "RemoveUser", &params);
392
393 if (ret) {
394 ldpp_dout(dpp, 0)<<"remove_user failed with err:(" <<ret<<") " << dendl;
395 goto out;
396 }
397
398out:
399 return ret;
400}
401
402int DB::get_bucket_info(const DoutPrefixProvider *dpp, const std::string& query_str,
403 const std::string& query_str_val,
404 RGWBucketInfo& info,
405 rgw::sal::Attrs* pattrs, ceph::real_time* pmtime,
406 obj_version* pbucket_version) {
407 int ret = 0;
408
409 if (query_str.empty()) {
410 // not checking for query_str_val as the query can be to fetch
411 // entries with null values
412 return -1;
413 }
414
415 DBOpParams params = {};
416 DBOpParams params2 = {};
417 InitializeParams(dpp, "GetBucket", &params);
418
419 if (query_str == "name") {
420 params.op.bucket.info.bucket.name = info.bucket.name;
421 } else {
422 ldpp_dout(dpp, 0)<<"In GetBucket Invalid query string :" <<query_str.c_str()<<") " << dendl;
423 return -1;
424 }
425
426 ret = ProcessOp(dpp, "GetBucket", &params);
427
428 if (ret) {
429 ldpp_dout(dpp, 0)<<"In GetBucket failed err:(" <<ret<<") " << dendl;
430 goto out;
431 }
432
433 if (!ret && params.op.bucket.info.bucket.marker.empty()) {
434 return -ENOENT;
435 }
436 info = params.op.bucket.info;
437
438 if (pattrs) {
439 *pattrs = params.op.bucket.bucket_attrs;
440 }
441
442 if (pmtime) {
443 *pmtime = params.op.bucket.mtime;
444 }
445 if (pbucket_version) {
446 *pbucket_version = params.op.bucket.bucket_version;
447 }
448
449out:
450 return ret;
451}
452
453int DB::create_bucket(const DoutPrefixProvider *dpp,
454 const RGWUserInfo& owner, rgw_bucket& bucket,
455 const string& zonegroup_id,
456 const rgw_placement_rule& placement_rule,
457 const string& swift_ver_location,
458 const RGWQuotaInfo * pquota_info,
459 map<std::string, bufferlist>& attrs,
460 RGWBucketInfo& info,
461 obj_version *pobjv,
462 obj_version *pep_objv,
463 real_time creation_time,
464 rgw_bucket *pmaster_bucket,
465 uint32_t *pmaster_num_shards,
466 optional_yield y,
467 bool exclusive)
468{
469 /*
470 * XXX: Simple creation for now.
471 *
472 * Referring to RGWRados::create_bucket(),
473 * Check if bucket already exists, select_bucket_placement,
474 * is explicit put/remove instance info needed? - should not be ideally
475 */
476
477 DBOpParams params = {};
478 InitializeParams(dpp, "CreateBucket", &params);
479 int ret = 0;
480
481 /* Check if the bucket already exists and return the old info, caller will have a use for it */
482 RGWBucketInfo orig_info;
483 orig_info.bucket.name = bucket.name;
484 ret = get_bucket_info(dpp, string("name"), "", orig_info, nullptr, nullptr, nullptr);
485
486 if (!ret && !orig_info.owner.id.empty() && exclusive) {
487 /* already exists. Return the old info */
488
489 info = std::move(orig_info);
490 return ret;
491 }
492
493 RGWObjVersionTracker& objv_tracker = info.objv_tracker;
494
495 objv_tracker.read_version.clear();
496
497 if (pobjv) {
498 objv_tracker.write_version = *pobjv;
499 } else {
500 objv_tracker.generate_new_write_ver(cct);
501 }
502 params.op.bucket.bucket_version = objv_tracker.write_version;
503 objv_tracker.read_version = params.op.bucket.bucket_version;
504
505 uint64_t bid = next_bucket_id();
506 string s = getDBname() + "." + std::to_string(bid);
507 bucket.marker = bucket.bucket_id = s;
508
509 info.bucket = bucket;
510 info.owner = owner.user_id;
511 info.zonegroup = zonegroup_id;
512 info.placement_rule = placement_rule;
513 info.swift_ver_location = swift_ver_location;
514 info.swift_versioning = (!swift_ver_location.empty());
515
516 info.requester_pays = false;
517 if (real_clock::is_zero(creation_time)) {
518 info.creation_time = ceph::real_clock::now();
519 } else {
520 info.creation_time = creation_time;
521 }
522 if (pquota_info) {
523 info.quota = *pquota_info;
524 }
525
526 params.op.bucket.info = info;
527 params.op.bucket.bucket_attrs = attrs;
528 params.op.bucket.mtime = ceph::real_time();
529 params.op.user.uinfo.user_id.id = owner.user_id.id;
530
531 ret = ProcessOp(dpp, "InsertBucket", &params);
532
533 if (ret) {
534 ldpp_dout(dpp, 0)<<"create_bucket failed with err:(" <<ret<<") " << dendl;
535 goto out;
536 }
537
538out:
539 return ret;
540}
541
542int DB::remove_bucket(const DoutPrefixProvider *dpp, const RGWBucketInfo info) {
543 int ret = 0;
544
545 DBOpParams params = {};
546 InitializeParams(dpp, "RemoveBucket", &params);
547
548 params.op.bucket.info.bucket.name = info.bucket.name;
549
550 ret = ProcessOp(dpp, "RemoveBucket", &params);
551
552 if (ret) {
553 ldpp_dout(dpp, 0)<<"In RemoveBucket failed err:(" <<ret<<") " << dendl;
554 goto out;
555 }
556
557out:
558 return ret;
559}
560
561int DB::list_buckets(const DoutPrefixProvider *dpp, const rgw_user& user,
562 const string& marker,
563 const string& end_marker,
564 uint64_t max,
565 bool need_stats,
566 RGWUserBuckets *buckets,
567 bool *is_truncated)
568{
569 int ret = 0;
570
571 DBOpParams params = {};
572 InitializeParams(dpp, "ListUserBuckets", &params);
573
574 params.op.user.uinfo.user_id = user;
575 params.op.bucket.min_marker = marker;
576 params.op.bucket.max_marker = end_marker;
577 params.op.list_max_count = max;
578
579 ret = ProcessOp(dpp, "ListUserBuckets", &params);
580
581 if (ret) {
582 ldpp_dout(dpp, 0)<<"In ListUserBuckets failed err:(" <<ret<<") " << dendl;
583 goto out;
584 }
585
586 /* need_stats: stats are already part of entries... In case they are maintained in
587 * separate table , maybe use "Inner Join" with stats table for the query.
588 */
589 if (params.op.bucket.list_entries.size() == max)
590 *is_truncated = true;
591
592 for (auto& entry : params.op.bucket.list_entries) {
593 if (!end_marker.empty() &&
594 end_marker.compare(entry.bucket.marker) <= 0) {
595 *is_truncated = false;
596 break;
597 }
598 buckets->add(std::move(entry));
599 }
600out:
601 return ret;
602}
603
604int DB::update_bucket(const DoutPrefixProvider *dpp, const std::string& query_str,
605 RGWBucketInfo& info,
606 bool exclusive,
607 const rgw_user* powner_id,
608 map<std::string, bufferlist>* pattrs,
609 ceph::real_time* pmtime,
610 RGWObjVersionTracker* pobjv)
611{
612 int ret = 0;
613 DBOpParams params = {};
614 obj_version bucket_version;
615 RGWBucketInfo orig_info;
616
617 /* Check if the bucket already exists and return the old info, caller will have a use for it */
618 orig_info.bucket.name = info.bucket.name;
619 params.op.bucket.info.bucket.name = info.bucket.name;
620 ret = get_bucket_info(dpp, string("name"), "", orig_info, nullptr, nullptr,
621 &bucket_version);
622
623 if (ret) {
624 ldpp_dout(dpp, 0)<<"Failed to read bucket info err:(" <<ret<<") " << dendl;
625 goto out;
626 }
627
628 if (!orig_info.owner.id.empty() && exclusive) {
629 /* already exists. Return the old info */
630
631 info = std::move(orig_info);
632 return ret;
633 }
634
635 /* Verify if the objv read_ver matches current bucket version */
636 if (pobjv) {
637 if (pobjv->read_version.ver != bucket_version.ver) {
638 ldpp_dout(dpp, 0)<<"Read version mismatch err:(" <<ret<<") " << dendl;
639 ret = -ECANCELED;
640 goto out;
641 }
642 } else {
643 pobjv = &info.objv_tracker;
644 }
645
646 InitializeParams(dpp, "UpdateBucket", &params);
647
648 params.op.bucket.info.bucket.name = info.bucket.name;
649
650 if (powner_id) {
651 params.op.user.uinfo.user_id.id = powner_id->id;
652 } else {
653 params.op.user.uinfo.user_id.id = orig_info.owner.id;
654 }
655
656 /* Update version & mtime */
657 params.op.bucket.bucket_version.ver = ++(bucket_version.ver);
658
659 if (pmtime) {
660 params.op.bucket.mtime = *pmtime;;
661 } else {
662 params.op.bucket.mtime = ceph::real_time();
663 }
664
665 if (query_str == "attrs") {
666 params.op.query_str = "attrs";
667 params.op.bucket.bucket_attrs = *pattrs;
668 } else if (query_str == "owner") {
669 /* Update only owner i.e, chown.
670 * Update creation_time too */
671 params.op.query_str = "owner";
672 params.op.bucket.info.creation_time = params.op.bucket.mtime;
673 } else if (query_str == "info") {
674 params.op.query_str = "info";
675 params.op.bucket.info = info;
676 } else {
677 ret = -1;
678 ldpp_dout(dpp, 0)<<"In UpdateBucket Invalid query_str : " << query_str << dendl;
679 goto out;
680 }
681
682 ret = ProcessOp(dpp, "UpdateBucket", &params);
683
684 if (ret) {
685 ldpp_dout(dpp, 0)<<"In UpdateBucket failed err:(" <<ret<<") " << dendl;
686 goto out;
687 }
688
689 if (pobjv) {
690 pobjv->read_version = params.op.bucket.bucket_version;
691 pobjv->write_version = params.op.bucket.bucket_version;
692 }
693
694out:
695 return ret;
696}
697
698int DB::Bucket::List::list_objects(const DoutPrefixProvider *dpp, int64_t max,
699 vector<rgw_bucket_dir_entry> *result,
700 map<string, bool> *common_prefixes, bool *is_truncated)
701{
702 int ret = 0;
703 DB *store = target->get_store();
704
705 DBOpParams db_params = {};
706 store->InitializeParams(dpp, "ListBucketObjects", &db_params);
707
708 db_params.op.bucket.info = target->get_bucket_info();
709 /* XXX: Handle whole marker? key -> name, instance, ns? */
710 db_params.op.obj.min_marker = params.marker.name;
711 db_params.op.obj.max_marker = params.end_marker.name;
712 db_params.op.list_max_count = max + 1; /* +1 for next_marker */
713
714 ret = store->ProcessOp(dpp, "ListBucketObjects", &db_params);
715
716 if (ret) {
717 ldpp_dout(dpp, 0)<<"In ListBucketObjects failed err:(" <<ret<<") " << dendl;
718 goto out;
719 }
720
721 if (db_params.op.obj.list_entries.size() >= (uint64_t)max) {
722 *is_truncated = true;
723 next_marker.name = db_params.op.obj.list_entries.back().key.name;
724 next_marker.instance = db_params.op.obj.list_entries.back().key.instance;
725 db_params.op.obj.list_entries.pop_back();
726 }
727
728 for (auto& entry : db_params.op.obj.list_entries) {
729 if (!params.end_marker.name.empty() &&
730 params.end_marker.name.compare(entry.key.name) <= 0) {
731 *is_truncated = false;
732 break;
733 }
734 result->push_back(std::move(entry));
735 }
736out:
737 return ret;
738}
739
740int DB::raw_obj::InitializeParamsfromRawObj(const DoutPrefixProvider *dpp,
741 DBOpParams* params) {
742 int ret = 0;
743
744 if (!params)
745 return -1;
746
747 params->object_table = obj_table;
748 params->objectdata_table = obj_data_table;
749 params->op.bucket.info.bucket.name = bucket_name;
750 params->op.obj.state.obj.key.name = obj_name;
751 params->op.obj.state.obj.key.instance = obj_instance;
752 params->op.obj.state.obj.key.ns = obj_ns;
753
754 if (multipart_part_str != "0.0") {
755 params->op.obj.is_multipart = true;
756 } else {
757 params->op.obj.is_multipart = false;
758 }
759
760 params->op.obj_data.multipart_part_str = multipart_part_str;
761 params->op.obj_data.part_num = part_num;
762
763 return ret;
764}
765
766int DB::Object::InitializeParamsfromObject(const DoutPrefixProvider *dpp,
767 DBOpParams* params) {
768 int ret = 0;
769 string bucket = bucket_info.bucket.name;
770
771 if (!params)
772 return -1;
773
774 params->object_table = store->getObjectTable(bucket);
775 params->objectdata_table = store->getObjectDataTable(bucket);
776 params->op.bucket.info.bucket.name = bucket;
777 params->op.obj.state.obj = obj;
778
779 return ret;
780}
781
782int DB::Object::obj_omap_set_val_by_key(const DoutPrefixProvider *dpp,
783 const std::string& key, bufferlist& val,
784 bool must_exist) {
785 int ret = 0;
786
787 DBOpParams params = {};
788
789 store->InitializeParams(dpp, "GetObject", &params);
790 InitializeParamsfromObject(dpp, &params);
791
792 ret = store->ProcessOp(dpp, "GetObject", &params);
793
794 if (ret) {
795 ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
796 goto out;
797 }
798
799 /* pick one field check if object exists */
800 if (!params.op.obj.state.exists) {
801 ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
802 return -1;
803 }
804
805 params.op.obj.omap[key] = val;
806 params.op.query_str = "omap";
807 params.op.obj.state.mtime = real_clock::now();
808
809 ret = store->ProcessOp(dpp, "UpdateObject", &params);
810
811 if (ret) {
812 ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <<ret<<") " << dendl;
813 goto out;
814 }
815
816out:
817 return ret;
818}
819
820int DB::Object::obj_omap_get_vals_by_keys(const DoutPrefixProvider *dpp,
821 const std::string& oid,
822 const std::set<std::string>& keys,
823 std::map<std::string, bufferlist>* vals)
824{
825 int ret = 0;
826 DBOpParams params = {};
827 std::map<std::string, bufferlist> omap;
828
829 if (!vals)
830 return -1;
831
832 store->InitializeParams(dpp, "GetObject", &params);
833 InitializeParamsfromObject(dpp, &params);
834
835 ret = store->ProcessOp(dpp, "GetObject", &params);
836
837 if (ret) {
838 ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<") " << dendl;
839 goto out;
840 }
841
842 /* pick one field check if object exists */
843 if (!params.op.obj.state.exists) {
844 ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
845 return -1;
846 }
847
848 omap = params.op.obj.omap;
849
850 for (const auto& k : keys) {
851 (*vals)[k] = omap[k];
852 }
853
854out:
855 return ret;
856}
857
858int DB::Object::add_mp_part(const DoutPrefixProvider *dpp,
859 RGWUploadPartInfo info) {
860 int ret = 0;
861
862 DBOpParams params = {};
863
864 store->InitializeParams(dpp, "GetObject", &params);
865 InitializeParamsfromObject(dpp, &params);
866
867 ret = store->ProcessOp(dpp, "GetObject", &params);
868
869 if (ret) {
870 ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
871 goto out;
872 }
873
874 /* pick one field check if object exists */
875 if (!params.op.obj.state.exists) {
876 ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
877 return -1;
878 }
879
880 params.op.obj.mp_parts.push_back(info);
881 params.op.query_str = "mp";
882 params.op.obj.state.mtime = real_clock::now();
883
884 ret = store->ProcessOp(dpp, "UpdateObject", &params);
885
886 if (ret) {
887 ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <<ret<<") " << dendl;
888 goto out;
889 }
890
891out:
892 return ret;
893}
894
895int DB::Object::get_mp_parts_list(const DoutPrefixProvider *dpp,
896 std::list<RGWUploadPartInfo>& info)
897{
898 int ret = 0;
899 DBOpParams params = {};
900 std::map<std::string, bufferlist> omap;
901
902 store->InitializeParams(dpp, "GetObject", &params);
903 InitializeParamsfromObject(dpp, &params);
904
905 ret = store->ProcessOp(dpp, "GetObject", &params);
906
907 if (ret) {
908 ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<") " << dendl;
909 goto out;
910 }
911
912 /* pick one field check if object exists */
913 if (!params.op.obj.state.exists) {
914 ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
915 return -1;
916 }
917
918 info = params.op.obj.mp_parts;
919
920out:
921 return ret;
922}
923
924/* Taken from rgw_rados.cc */
925void DB::gen_rand_obj_instance_name(rgw_obj_key *target_key)
926{
927#define OBJ_INSTANCE_LEN 32
928 char buf[OBJ_INSTANCE_LEN + 1];
929
930 gen_rand_alphanumeric_no_underscore(cct, buf, OBJ_INSTANCE_LEN); /* don't want it to get url escaped,
931 no underscore for instance name due to the way we encode the raw keys */
932
933 target_key->set_instance(buf);
934}
935
936int DB::Object::obj_omap_get_all(const DoutPrefixProvider *dpp,
937 std::map<std::string, bufferlist> *m)
938{
939 int ret = 0;
940 DBOpParams params = {};
941 std::map<std::string, bufferlist> omap;
942
943 if (!m)
944 return -1;
945
946 store->InitializeParams(dpp, "GetObject", &params);
947 InitializeParamsfromObject(dpp, &params);
948
949 ret = store->ProcessOp(dpp, "GetObject", &params);
950
951 if (ret) {
952 ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
953 goto out;
954 }
955
956 /* pick one field check if object exists */
957 if (!params.op.obj.state.exists) {
958 ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
959 return -1;
960 }
961
962 (*m) = params.op.obj.omap;
963
964out:
965 return ret;
966}
967
968int DB::Object::obj_omap_get_vals(const DoutPrefixProvider *dpp,
969 const std::string& marker,
970 uint64_t max_count,
971 std::map<std::string, bufferlist> *m, bool* pmore)
972{
973 int ret = 0;
974 DBOpParams params = {};
975 std::map<std::string, bufferlist> omap;
976 map<string, bufferlist>::iterator iter;
977 uint64_t count = 0;
978
979 if (!m)
980 return -1;
981
982 store->InitializeParams(dpp, "GetObject", &params);
983 InitializeParamsfromObject(dpp, &params);
984
985 ret = store->ProcessOp(dpp, "GetObject", &params);
986
987 if (ret) {
988 ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
989 goto out;
990 }
991
992 /* pick one field check if object exists */
993 if (!params.op.obj.state.exists) {
994 ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
995 return -1;
996 }
997
998 omap = params.op.obj.omap;
999
1000 for (iter = omap.begin(); iter != omap.end(); ++iter) {
1001
1002 if (iter->first < marker)
1003 continue;
1004
1005 if ((++count) > max_count) {
1006 *pmore = true;
1007 break;
1008 }
1009
1010 (*m)[iter->first] = iter->second;
1011 }
1012
1013out:
1014 return ret;
1015}
1016
1017int DB::Object::set_attrs(const DoutPrefixProvider *dpp,
1018 map<string, bufferlist>& setattrs,
1019 map<string, bufferlist>* rmattrs)
1020{
1021 int ret = 0;
1022
1023 DBOpParams params = {};
1024 rgw::sal::Attrs *attrs;
1025 map<string, bufferlist>::iterator iter;
1026
1027 store->InitializeParams(dpp, "GetObject", &params);
1028 InitializeParamsfromObject(dpp, &params);
1029
1030 ret = store->ProcessOp(dpp, "GetObject", &params);
1031
1032 if (ret) {
1033 ldpp_dout(dpp, 0) <<"In GetObject failed err:(" <<ret<<")" << dendl;
1034 goto out;
1035 }
1036
1037 /* pick one field check if object exists */
1038 if (!params.op.obj.state.exists) {
1039 ldpp_dout(dpp, 0)<<"Object(bucket:" << bucket_info.bucket.name << ", Object:"<< obj.key.name << ") doesn't exist" << dendl;
1040 return -1;
1041 }
1042
1043 /* For now lets keep it simple..rmattrs & setattrs ..
1044 * XXX: Check rgw_rados::set_attrs
1045 */
1046 attrs = &params.op.obj.state.attrset;
1047 if (rmattrs) {
1048 for (iter = rmattrs->begin(); iter != rmattrs->end(); ++iter) {
1049 (*attrs).erase(iter->first);
1050 }
1051 }
1052 for (iter = setattrs.begin(); iter != setattrs.end(); ++iter) {
1053 (*attrs)[iter->first] = iter->second;
1054 }
1055
1056 params.op.query_str = "attrs";
1057 params.op.obj.state.mtime = real_clock::now();
1058
1059 ret = store->ProcessOp(dpp, "UpdateObject", &params);
1060
1061 if (ret) {
1062 ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <<ret<<") " << dendl;
1063 goto out;
1064 }
1065
1066out:
1067 return ret;
1068}
1069
1070int DB::raw_obj::read(const DoutPrefixProvider *dpp, int64_t ofs,
1071 uint64_t len, bufferlist& bl)
1072{
1073 int ret = 0;
1074 DBOpParams params = {};
1075
1076 db->InitializeParams(dpp, "GetObjectData", &params);
1077 InitializeParamsfromRawObj(dpp, &params);
1078
1079 ret = db->ProcessOp(dpp, "GetObjectData", &params);
1080
1081 if (ret) {
1082 ldpp_dout(dpp, 0)<<"In GetObjectData failed err:(" <<ret<<")" << dendl;
1083 return ret;
1084 }
1085
1086 bufferlist& read_bl = params.op.obj_data.data;
1087
1088 unsigned copy_len;
1089 copy_len = std::min((uint64_t)read_bl.length() - ofs, len);
1090 read_bl.begin(ofs).copy(copy_len, bl);
1091 return bl.length();
1092}
1093
1094int DB::raw_obj::write(const DoutPrefixProvider *dpp, int64_t ofs, int64_t write_ofs,
1095 uint64_t len, bufferlist& bl)
1096{
1097 int ret = 0;
1098 DBOpParams params = {};
1099
1100 db->InitializeParams(dpp, "PutObjectData", &params);
1101 InitializeParamsfromRawObj(dpp, &params);
1102
1103 /* XXX: Check for chunk_size ?? */
1104 params.op.obj_data.offset = ofs;
1105 unsigned write_len = std::min((uint64_t)bl.length() - write_ofs, len);
1106 bl.begin(write_ofs).copy(write_len, params.op.obj_data.data);
1107 params.op.obj_data.size = params.op.obj_data.data.length();
1108
1109 ret = db->ProcessOp(dpp, "PutObjectData", &params);
1110
1111 if (ret) {
1112 ldpp_dout(dpp, 0)<<"In PutObjectData failed err:(" <<ret<<")" << dendl;
1113 return ret;
1114 }
1115
1116 return write_len;
1117}
1118
1119int DB::Object::follow_olh(const DoutPrefixProvider *dpp,
1120 const RGWBucketInfo& bucket_info, RGWObjState *state,
1121 const rgw_obj& olh_obj, rgw_obj *target)
1122{
1123 auto iter = state->attrset.find(RGW_ATTR_OLH_INFO);
1124 if (iter == state->attrset.end()) {
1125 return -EINVAL;
1126 }
1127
1128 DBOLHInfo olh;
1129 string s;
1130 const bufferlist& bl = iter->second;
1131 try {
1132 auto biter = bl.cbegin();
1133 decode(olh, biter);
1134 } catch (buffer::error& err) {
1135 return -EIO;
1136 }
1137
1138 if (olh.removed) {
1139 return -ENOENT;
1140 }
1141
1142 *target = olh.target;
1143
1144 return 0;
1145}
1146
1147int DB::Object::get_olh_target_state(const DoutPrefixProvider *dpp,
1148 const RGWBucketInfo& bucket_info, const rgw_obj& obj,
1149 RGWObjState* olh_state, RGWObjState** target)
1150{
1151 int ret = 0;
1152 rgw_obj target_obj;
1153
1154 if (!olh_state->is_olh) {
1155 return EINVAL;
1156 }
1157
1158 ret = follow_olh(dpp, bucket_info, olh_state, obj, &target_obj); /* might return -EAGAIN */
1159 if (ret < 0) {
1160 ldpp_dout(dpp, 0)<<"In get_olh_target_state follow_olh() failed err:(" <<ret<<")" << dendl;
1161 return ret;
1162 }
1163
1164 ret = get_obj_state(dpp, bucket_info, target_obj, false, target);
1165
1166 return ret;
1167}
1168
1169int DB::Object::get_obj_state(const DoutPrefixProvider *dpp,
1170 const RGWBucketInfo& bucket_info, const rgw_obj& obj,
1171 bool follow_olh, RGWObjState **state)
1172{
1173 int ret = 0;
1174
1175 DBOpParams params = {};
1176 RGWObjState* s;
1177 store->InitializeParams(dpp, "GetObject", &params);
1178 InitializeParamsfromObject(dpp, &params);
1179
1180 ret = store->ProcessOp(dpp, "GetObject", &params);
1181
1182 if (ret) {
1183 ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
1184 goto out;
1185 }
1186
1187 if (!params.op.obj.state.exists) {
1188 return -ENOENT;
1189 }
1190
1191 s = &params.op.obj.state;
1192 **state = *s;
1193
1194 if (follow_olh && params.op.obj.state.obj.key.instance.empty()) {
1195 /* fetch current version obj details */
1196 ret = get_olh_target_state(dpp, bucket_info, obj, s, state);
1197
1198 if (ret < 0) {
1199 ldpp_dout(dpp, 0)<<"get_olh_target_state failed err:(" <<ret<<")" << dendl;
1200 }
1201 }
1202
1203out:
1204 return ret;
1205
1206}
1207
1208int DB::Object::get_state(const DoutPrefixProvider *dpp, RGWObjState **pstate, bool follow_olh)
1209{
1210 return get_obj_state(dpp, bucket_info, obj, follow_olh, pstate);
1211}
1212
1213int DB::Object::get_manifest(const DoutPrefixProvider *dpp, RGWObjManifest **pmanifest)
1214{
1215 RGWObjState base_state;
1216 RGWObjState *astate = &base_state;
1217 int r = get_state(dpp, &astate, true);
1218 if (r < 0) {
1219 return r;
1220 }
1221
1222 *pmanifest = &(*astate->manifest);
1223
1224 return 0;
1225}
1226
1227int DB::Object::Read::get_attr(const DoutPrefixProvider *dpp, const char *name, bufferlist& dest)
1228{
1229 RGWObjState base_state;
1230 RGWObjState *state = &base_state;
1231 int r = source->get_state(dpp, &state, true);
1232 if (r < 0)
1233 return r;
1234 if (!state->exists)
1235 return -ENOENT;
1236 if (!state->get_attr(name, dest))
1237 return -ENODATA;
1238
1239 return 0;
1240}
1241
1242int DB::Object::Read::prepare(const DoutPrefixProvider *dpp)
1243{
1244 DB *store = source->get_store();
1245 CephContext *cct = store->ctx();
1246
1247 bufferlist etag;
1248
1249 map<string, bufferlist>::iterator iter;
1250
1251 RGWObjState base_state;
1252 RGWObjState *astate = &base_state;
1253 int r = source->get_state(dpp, &astate, true);
1254 if (r < 0)
1255 return r;
1256
1257 if (!astate->exists) {
1258 return -ENOENT;
1259 }
1260
1261 state.obj = astate->obj;
1262
1263 if (params.target_obj) {
1264 *params.target_obj = state.obj;
1265 }
1266 if (params.attrs) {
1267 *params.attrs = astate->attrset;
1268 if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>()) {
1269 for (iter = params.attrs->begin(); iter != params.attrs->end(); ++iter) {
1270 ldpp_dout(dpp, 20) << "Read xattr rgw_rados: " << iter->first << dendl;
1271 }
1272 }
1273 }
1274
1275 if (conds.if_match || conds.if_nomatch) {
1276 r = get_attr(dpp, RGW_ATTR_ETAG, etag);
1277 if (r < 0)
1278 return r;
1279
1280 if (conds.if_match) {
1281 string if_match_str = rgw_string_unquote(conds.if_match);
1282 ldpp_dout(dpp, 10) << "ETag: " << string(etag.c_str(), etag.length()) << " " << " If-Match: " << if_match_str << dendl;
1283 if (if_match_str.compare(0, etag.length(), etag.c_str(), etag.length()) != 0) {
1284 return -ERR_PRECONDITION_FAILED;
1285 }
1286 }
1287
1288 if (conds.if_nomatch) {
1289 string if_nomatch_str = rgw_string_unquote(conds.if_nomatch);
1290 ldpp_dout(dpp, 10) << "ETag: " << string(etag.c_str(), etag.length()) << " " << " If-NoMatch: " << if_nomatch_str << dendl;
1291 if (if_nomatch_str.compare(0, etag.length(), etag.c_str(), etag.length()) == 0) {
1292 return -ERR_NOT_MODIFIED;
1293 }
1294 }
1295 }
1296
1297 if (params.obj_size)
1298 *params.obj_size = astate->size;
1299 if (params.lastmod)
1300 *params.lastmod = astate->mtime;
1301
1302 return 0;
1303}
1304
1305int DB::Object::Read::range_to_ofs(uint64_t obj_size, int64_t &ofs, int64_t &end)
1306{
1307 if (ofs < 0) {
1308 ofs += obj_size;
1309 if (ofs < 0)
1310 ofs = 0;
1311 end = obj_size - 1;
1312 } else if (end < 0) {
1313 end = obj_size - 1;
1314 }
1315
1316 if (obj_size > 0) {
1317 if (ofs >= (off_t)obj_size) {
1318 return -ERANGE;
1319 }
1320 if (end >= (off_t)obj_size) {
1321 end = obj_size - 1;
1322 }
1323 }
1324 return 0;
1325}
1326
1327int DB::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl, const DoutPrefixProvider *dpp)
1328{
1329 DB *store = source->get_store();
1330
1331 uint64_t read_ofs = ofs;
1332 uint64_t len, read_len;
1333
1334 bufferlist read_bl;
1335 uint64_t max_chunk_size = store->get_max_chunk_size();
1336
1337 RGWObjState base_state;
1338 RGWObjState *astate = &base_state;
1339 int r = source->get_state(dpp, &astate, true);
1340 if (r < 0)
1341 return r;
1342
1343 if (!astate->exists) {
1344 return -ENOENT;
1345 }
1346
1347 if (astate->size == 0) {
1348 end = 0;
1349 } else if (end >= (int64_t)astate->size) {
1350 end = astate->size - 1;
1351 }
1352
1353 if (end < 0)
1354 len = 0;
1355 else
1356 len = end - ofs + 1;
1357
1358
1359 if (len > max_chunk_size) {
1360 len = max_chunk_size;
1361 }
1362
1363 int head_data_size = astate->data.length();
1364 bool reading_from_head = (ofs < head_data_size);
1365
1366 if (reading_from_head) {
1367 if (astate) { // && astate->prefetch_data)?
1368 if (!ofs && astate->data.length() >= len) {
1369 bl = astate->data;
1370 return bl.length();
1371 }
1372
1373 if (ofs < astate->data.length()) {
1374 unsigned copy_len = std::min((uint64_t)head_data_size - ofs, len);
1375 astate->data.begin(ofs).copy(copy_len, bl);
1376 return bl.length();
1377 }
1378 }
1379 }
1380
1381 /* tail object */
1382 int part_num = (ofs / max_chunk_size);
1383 /* XXX: Handle multipart_str */
1384 raw_obj read_obj(store, source->get_bucket_info().bucket.name, astate->obj.key.name,
1385 astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num);
1386
1387 read_len = len;
1388
1389 ldpp_dout(dpp, 20) << "dbstore->read obj-ofs=" << ofs << " read_ofs=" << read_ofs << " read_len=" << read_len << dendl;
1390
1391 // read from non head object
1392 r = read_obj.read(dpp, read_ofs, read_len, bl);
1393
1394 if (r < 0) {
1395 return r;
1396 }
1397
1398 return bl.length();
1399}
1400
1401static int _get_obj_iterate_cb(const DoutPrefixProvider *dpp,
1402 const DB::raw_obj& read_obj, off_t obj_ofs,
1403 off_t len, bool is_head_obj,
1404 RGWObjState *astate, void *arg)
1405{
1406 struct db_get_obj_data* d = static_cast<struct db_get_obj_data*>(arg);
1407 return d->store->get_obj_iterate_cb(dpp, read_obj, obj_ofs, len,
1408 is_head_obj, astate, arg);
1409}
1410
1411int DB::get_obj_iterate_cb(const DoutPrefixProvider *dpp,
1412 const raw_obj& read_obj, off_t obj_ofs,
1413 off_t len, bool is_head_obj,
1414 RGWObjState *astate, void *arg)
1415{
1416 struct db_get_obj_data* d = static_cast<struct db_get_obj_data*>(arg);
1417 bufferlist bl;
1418 int r = 0;
1419
1420 if (is_head_obj) {
1421 bl = astate->data;
1422 } else {
1423 // read from non head object
1424 raw_obj robj = read_obj;
1425 /* read entire data. So pass offset as '0' & len as '-1' */
1426 r = robj.read(dpp, 0, -1, bl);
1427
1428 if (r < 0) {
1429 return r;
1430 }
1431 }
1432
1433 unsigned read_ofs = 0, read_len = 0;
1434 while (read_ofs < bl.length()) {
1435 unsigned chunk_len = std::min((uint64_t)bl.length() - read_ofs, (uint64_t)len);
1436 r = d->client_cb->handle_data(bl, read_ofs, chunk_len);
1437 if (r < 0)
1438 return r;
1439 read_ofs += chunk_len;
1440 read_len += chunk_len;
1441 ldpp_dout(dpp, 20) << "dbstore->get_obj_iterate_cb obj-ofs=" << obj_ofs << " len=" << len << " chunk_len = " << chunk_len << " read_len = " << read_len << dendl;
1442 }
1443
1444
1445 d->offset += read_len;
1446
1447 return read_len;
1448}
1449
1450int DB::Object::Read::iterate(const DoutPrefixProvider *dpp, int64_t ofs, int64_t end, RGWGetDataCB *cb)
1451{
1452 DB *store = source->get_store();
1453 const uint64_t chunk_size = store->get_max_chunk_size();
1454
1455 db_get_obj_data data(store, cb, ofs);
1456
1457 int r = source->iterate_obj(dpp, source->get_bucket_info(), state.obj,
1458 ofs, end, chunk_size, _get_obj_iterate_cb, &data);
1459 if (r < 0) {
1460 ldpp_dout(dpp, 0) << "iterate_obj() failed with " << r << dendl;
1461 return r;
1462 }
1463
1464 return 0;
1465}
1466
1467int DB::Object::iterate_obj(const DoutPrefixProvider *dpp,
1468 const RGWBucketInfo& bucket_info, const rgw_obj& obj,
1469 off_t ofs, off_t end, uint64_t max_chunk_size,
1470 iterate_obj_cb cb, void *arg)
1471{
1472 DB *store = get_store();
1473 uint64_t len;
1474 RGWObjState base_state;
1475 RGWObjState *astate = &base_state;
1476
1477 int r = get_state(dpp, &astate, true);
1478 if (r < 0) {
1479 return r;
1480 }
1481
1482 if (!astate->exists) {
1483 return -ENOENT;
1484 }
1485
1486 if (end < 0)
1487 len = 0;
1488 else
1489 len = end - ofs + 1;
1490
1491 /* XXX: Will it really help to store all parts info in astate like manifest in Rados? */
1492 int part_num = 0;
1493 int head_data_size = astate->data.length();
1494
1495 while (ofs <= end && (uint64_t)ofs < astate->size) {
1496 part_num = (ofs / max_chunk_size);
1497 uint64_t read_len = std::min(len, max_chunk_size);
1498
1499 /* XXX: Handle multipart_str */
1500 raw_obj read_obj(store, get_bucket_info().bucket.name, astate->obj.key.name,
1501 astate->obj.key.instance, astate->obj.key.ns, "0.0", part_num);
1502 bool reading_from_head = (ofs < head_data_size);
1503
1504 r = cb(dpp, read_obj, ofs, read_len, reading_from_head, astate, arg);
1505 if (r <= 0) {
1506 return r;
1507 }
1508 /* r refers to chunk_len (no. of bytes) handled in cb */
1509 len -= r;
1510 ofs += r;
1511 }
1512
1513 return 0;
1514}
1515
1516int DB::Object::Write::prepare(const DoutPrefixProvider* dpp)
1517{
1518 DB *store = target->get_store();
1519
1520 DBOpParams params = {};
1521 int ret = -1;
1522
1523 /* XXX: handle assume_noent */
1524 store->InitializeParams(dpp, "GetObject", &params);
1525 target->InitializeParamsfromObject(dpp, &params);
1526
1527 ret = store->ProcessOp(dpp, "GetObject", &params);
1528
1529 if (ret) {
1530 ldpp_dout(dpp, 0)<<"In GetObject failed err:(" <<ret<<")" << dendl;
1531 goto out;
1532 }
1533
1534 /* pick one field check if object exists */
1535 if (params.op.obj.state.exists) {
1536 ldpp_dout(dpp, 0)<<"Object(bucket:" << target->bucket_info.bucket.name << ", Object:"<< target->obj.key.name << ") exists" << dendl;
1537
1538 } else { /* create object entry in the object table */
1539 params.op.obj.storage_class = "STANDARD"; /* XXX: handle storage class */
1540 ret = store->ProcessOp(dpp, "PutObject", &params);
1541
1542 if (ret) {
1543 ldpp_dout(dpp, 0)<<"In PutObject failed err:(" <<ret<<")" << dendl;
1544 goto out;
1545 }
1546 }
1547
1548 obj_state = params.op.obj.state;
1549 ret = 0;
1550
1551out:
1552 return ret;
1553}
1554
1555/* writes tail objects */
1556int DB::Object::Write::write_data(const DoutPrefixProvider* dpp,
1557 bufferlist& data, uint64_t ofs) {
1558 DB *store = target->get_store();
1559 /* tail objects */
1560 /* XXX: Split into parts each of max_chunk_size. But later make tail
1561 * object chunk size limit to sqlite blob limit */
1562 int part_num = 0;
1563
1564 uint64_t max_chunk_size = store->get_max_chunk_size();
1565
1566 /* tail_obj ofs should be greater than max_head_size */
1567 if (mp_part_str == "0.0") { // ensure not multipart meta object
1568 if (ofs < store->get_max_head_size()) {
1569 return -1;
1570 }
1571 }
1572
1573 uint64_t end = data.length();
1574 uint64_t write_ofs = 0;
1575 /* as we are writing max_chunk_size at a time in sal_dbstore DBAtomicWriter::process(),
1576 * maybe this while loop is not needed
1577 */
1578 while (write_ofs < end) {
1579 part_num = (ofs / max_chunk_size);
1580 uint64_t len = std::min(end, max_chunk_size);
1581
1582 /* XXX: Handle multipart_str */
1583 raw_obj write_obj(store, target->get_bucket_info().bucket.name, obj_state.obj.key.name,
1584 obj_state.obj.key.instance, obj_state.obj.key.ns, mp_part_str, part_num);
1585
1586
1587 ldpp_dout(dpp, 20) << "dbstore->write obj-ofs=" << ofs << " write_len=" << len << dendl;
1588
1589 // write into non head object
1590 int r = write_obj.write(dpp, ofs, write_ofs, len, data);
1591 if (r < 0) {
1592 return r;
1593 }
1594 /* r refers to chunk_len (no. of bytes) handled in raw_obj::write */
1595 len -= r;
1596 ofs += r;
1597 write_ofs += r;
1598 }
1599
1600 return 0;
1601}
1602
1603/* Write metadata & head object data */
1604int DB::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
1605 uint64_t size, uint64_t accounted_size,
1606 map<string, bufferlist>& attrs,
1607 bool assume_noent, bool modify_tail)
1608{
1609 DB *store = target->get_store();
1610
1611 RGWObjState *state = &obj_state;
1612 map<string, bufferlist> *attrset;
1613 DBOpParams params = {};
1614 int ret = 0;
1615 string etag;
1616 string content_type;
1617 bufferlist acl_bl;
1618 string storage_class;
1619
1620 map<string, bufferlist>::iterator iter;
1621
1622 store->InitializeParams(dpp, "PutObject", &params);
1623 target->InitializeParamsfromObject(dpp, &params);
1624
1625 obj_state = params.op.obj.state;
1626
1627 if (real_clock::is_zero(meta.set_mtime)) {
1628 meta.set_mtime = real_clock::now();
1629 }
1630
1631 attrset = &state->attrset;
1632 if (target->bucket_info.obj_lock_enabled() && target->bucket_info.obj_lock.has_rule()) {
1633 // && meta.flags == PUT_OBJ_CREATE) {
1634 auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
1635 if (iter == attrs.end()) {
1636 real_time lock_until_date = target->bucket_info.obj_lock.get_lock_until_date(meta.set_mtime);
1637 string mode = target->bucket_info.obj_lock.get_mode();
1638 RGWObjectRetention obj_retention(mode, lock_until_date);
1639 bufferlist bl;
1640 obj_retention.encode(bl);
1641 (*attrset)[RGW_ATTR_OBJECT_RETENTION] = bl;
1642 }
1643 }
1644
1645 if (state->is_olh) {
1646 (*attrset)[RGW_ATTR_OLH_ID_TAG] = state->olh_tag;
1647 }
1648
1649 state->mtime = meta.set_mtime;
1650
1651 if (meta.data) {
1652 /* if we want to overwrite the data, we also want to overwrite the
1653 xattrs, so just remove the object */
1654 params.op.obj.head_data = *meta.data;
1655 }
1656
1657 if (meta.rmattrs) {
1658 for (iter = meta.rmattrs->begin(); iter != meta.rmattrs->end(); ++iter) {
1659 const string& name = iter->first;
1660 (*attrset).erase(name.c_str());
1661 }
1662 }
1663
1664 if (meta.manifest) {
1665 storage_class = meta.manifest->get_tail_placement().placement_rule.storage_class;
1666
1667 /* remove existing manifest attr */
1668 iter = attrs.find(RGW_ATTR_MANIFEST);
1669 if (iter != attrs.end())
1670 attrs.erase(iter);
1671
1672 bufferlist bl;
1673 encode(*meta.manifest, bl);
1674 (*attrset)[RGW_ATTR_MANIFEST] = bl;
1675 }
1676
1677 for (iter = attrs.begin(); iter != attrs.end(); ++iter) {
1678 const string& name = iter->first;
1679 bufferlist& bl = iter->second;
1680
1681 if (!bl.length())
1682 continue;
1683
1684 (*attrset)[name.c_str()] = bl;
1685
1686 if (name.compare(RGW_ATTR_ETAG) == 0) {
1687 etag = rgw_bl_str(bl);
1688 params.op.obj.etag = etag;
1689 } else if (name.compare(RGW_ATTR_CONTENT_TYPE) == 0) {
1690 content_type = rgw_bl_str(bl);
1691 } else if (name.compare(RGW_ATTR_ACL) == 0) {
1692 acl_bl = bl;
1693 }
1694 }
1695
1696 if (!storage_class.empty()) {
1697 bufferlist bl;
1698 bl.append(storage_class);
1699 (*attrset)[RGW_ATTR_STORAGE_CLASS] = bl;
1700 }
1701
1702 params.op.obj.state = *state ;
1703 params.op.obj.state.exists = true;
1704 params.op.obj.state.size = size;
1705 params.op.obj.state.accounted_size = accounted_size;
1706 params.op.obj.owner = target->get_bucket_info().owner.id;
1707
1708 /* XXX: handle versioning */
1709 if (meta.mtime) {
1710 *meta.mtime = meta.set_mtime;
1711 }
1712
1713 /* XXX: handle multipart */
1714 params.op.query_str = "meta";
1715 ret = store->ProcessOp(dpp, "UpdateObject", &params);
1716
1717 if (ret) {
1718 ldpp_dout(dpp, 0)<<"In UpdateObject failed err:(" <<ret<<")" << dendl;
1719 goto out;
1720 }
1721
1722 /* pick one field check if object exists */
1723 return 0;
1724
1725out:
1726 if (ret < 0) {
1727 ldpp_dout(dpp, 0) << "ERROR: do_write_meta returned ret=" << ret << dendl;
1728 }
1729
1730 meta.canceled = true;
1731
1732 return ret;
1733}
1734
1735int DB::Object::Write::write_meta(const DoutPrefixProvider *dpp, uint64_t size, uint64_t accounted_size,
1736 map<string, bufferlist>& attrs)
1737{
1738 bool assume_noent = false;
1739 /* handle assume_noent */
1740 int r = _do_write_meta(dpp, size, accounted_size, attrs, assume_noent, meta.modify_tail);
1741 return r;
1742}
1743
1744int DB::Object::Write::update_mp_parts(const DoutPrefixProvider *dpp, rgw_obj_key new_obj_key)
1745{
1746 int ret = 0;
1747 DBOpParams params = {};
1748 DB *store = target->get_store();
1749
1750 store->InitializeParams(dpp, "UpdateObjectData", &params);
1751 target->InitializeParamsfromObject(dpp, &params);
1752
1753 params.op.obj.new_obj_key = new_obj_key;
1754
1755 ret = store->ProcessOp(dpp, "UpdateObjectData", &params);
1756
1757 if (ret) {
1758 ldpp_dout(dpp, 0)<<"In UpdateObjectData failed err:(" <<ret<<")" << dendl;
1759 return ret;
1760 }
1761
1762 return 0;
1763}
1764
1765int DB::Object::Delete::delete_obj(const DoutPrefixProvider *dpp) {
1766 int ret = 0;
1767 DB *store = target->get_store();
1768 RGWObjState base_state;
1769 RGWObjState *astate = &base_state;
1770
1771 int r = target->get_state(dpp, &astate, true);
1772 if (r < 0)
1773 return r;
1774
1775 if (!astate->exists) {
1776 return -ENOENT;
1777 }
1778
1779 /* XXX: handle versioned objects. Create delete marker */
1780
1781 /* XXX: check params conditions */
1782 DBOpParams del_params = {};
1783
1784 store->InitializeParams(dpp, "DeleteObject", &del_params);
1785 target->InitializeParamsfromObject(dpp, &del_params);
1786
1787 /* As it is cascade delete, it will delete the objectdata table entries also */
1788 ret = store->ProcessOp(dpp, "DeleteObject", &del_params);
1789 if (ret) {
1790 ldpp_dout(dpp, 0) << "In DeleteObject failed err:(" <<ret<<")" << dendl;
1791 goto out;
1792 }
1793
1794out:
1795 return ret;
1796}
1797
1798int DB::get_entry(const std::string& oid, const std::string& marker,
1799 rgw::sal::Lifecycle::LCEntry& entry)
1800{
1801 int ret = 0;
1802 const DoutPrefixProvider *dpp = get_def_dpp();
1803
1804 DBOpParams params = {};
1805 InitializeParams(dpp, "GetLCEntry", &params);
1806
1807 params.op.lc_entry.index = oid;
1808 params.op.lc_entry.entry.bucket = marker;
1809
1810 params.op.query_str = "get_entry";
1811 ret = ProcessOp(dpp, "GetLCEntry", &params);
1812
1813 if (ret) {
1814 ldpp_dout(dpp, 0)<<"In GetLCEntry failed err:(" <<ret<<") " << dendl;
1815 goto out;
1816 }
1817
1818 if (!params.op.lc_entry.entry.start_time == 0) { //ensure entry found
1819 entry = params.op.lc_entry.entry;
1820 }
1821
1822out:
1823 return ret;
1824}
1825
1826int DB::get_next_entry(const std::string& oid, std::string& marker,
1827 rgw::sal::Lifecycle::LCEntry& entry)
1828{
1829 int ret = 0;
1830 const DoutPrefixProvider *dpp = get_def_dpp();
1831
1832 DBOpParams params = {};
1833 InitializeParams(dpp, "GetLCEntry", &params);
1834
1835 params.op.lc_entry.index = oid;
1836 params.op.lc_entry.entry.bucket = marker;
1837
1838 params.op.query_str = "get_next_entry";
1839 ret = ProcessOp(dpp, "GetLCEntry", &params);
1840
1841 if (ret) {
1842 ldpp_dout(dpp, 0)<<"In GetLCEntry failed err:(" <<ret<<") " << dendl;
1843 goto out;
1844 }
1845
1846 if (!params.op.lc_entry.entry.start_time == 0) { //ensure entry found
1847 entry = params.op.lc_entry.entry;
1848 }
1849
1850out:
1851 return ret;
1852}
1853
1854int DB::set_entry(const std::string& oid, const rgw::sal::Lifecycle::LCEntry& entry)
1855{
1856 int ret = 0;
1857 const DoutPrefixProvider *dpp = get_def_dpp();
1858
1859 DBOpParams params = {};
1860 InitializeParams(dpp, "InsertLCEntry", &params);
1861
1862 params.op.lc_entry.index = oid;
1863 params.op.lc_entry.entry = entry;
1864
1865 ret = ProcessOp(dpp, "InsertLCEntry", &params);
1866
1867 if (ret) {
1868 ldpp_dout(dpp, 0)<<"In InsertLCEntry failed err:(" <<ret<<") " << dendl;
1869 goto out;
1870 }
1871
1872out:
1873 return ret;
1874}
1875
1876int DB::list_entries(const std::string& oid, const std::string& marker,
1877 uint32_t max_entries, vector<rgw::sal::Lifecycle::LCEntry>& entries)
1878{
1879 int ret = 0;
1880 const DoutPrefixProvider *dpp = get_def_dpp();
1881
1882 entries.clear();
1883
1884 DBOpParams params = {};
1885 InitializeParams(dpp, "ListLCEntries", &params);
1886
1887 params.op.lc_entry.index = oid;
1888 params.op.lc_entry.min_marker = marker;
1889 params.op.list_max_count = max_entries;
1890
1891 ret = ProcessOp(dpp, "ListLCEntries", &params);
1892
1893 if (ret) {
1894 ldpp_dout(dpp, 0)<<"In ListLCEntries failed err:(" <<ret<<") " << dendl;
1895 goto out;
1896 }
1897
1898 for (auto& entry : params.op.lc_entry.list_entries) {
1899 entries.push_back(std::move(entry));
1900 }
1901
1902out:
1903 return ret;
1904}
1905
1906int DB::rm_entry(const std::string& oid, const rgw::sal::Lifecycle::LCEntry& entry)
1907{
1908 int ret = 0;
1909 const DoutPrefixProvider *dpp = get_def_dpp();
1910
1911 DBOpParams params = {};
1912 InitializeParams(dpp, "RemoveLCEntry", &params);
1913
1914 params.op.lc_entry.index = oid;
1915 params.op.lc_entry.entry = entry;
1916
1917 ret = ProcessOp(dpp, "RemoveLCEntry", &params);
1918
1919 if (ret) {
1920 ldpp_dout(dpp, 0)<<"In RemoveLCEntry failed err:(" <<ret<<") " << dendl;
1921 goto out;
1922 }
1923
1924out:
1925 return ret;
1926}
1927
1928int DB::get_head(const std::string& oid, rgw::sal::Lifecycle::LCHead& head)
1929{
1930 int ret = 0;
1931 const DoutPrefixProvider *dpp = get_def_dpp();
1932
1933 DBOpParams params = {};
1934 InitializeParams(dpp, "GetLCHead", &params);
1935
1936 params.op.lc_head.index = oid;
1937
1938 ret = ProcessOp(dpp, "GetLCHead", &params);
1939
1940 if (ret) {
1941 ldpp_dout(dpp, 0)<<"In GetLCHead failed err:(" <<ret<<") " << dendl;
1942 goto out;
1943 }
1944
1945 head = params.op.lc_head.head;
1946
1947out:
1948 return ret;
1949}
1950
1951int DB::put_head(const std::string& oid, const rgw::sal::Lifecycle::LCHead& head)
1952{
1953 int ret = 0;
1954 const DoutPrefixProvider *dpp = get_def_dpp();
1955
1956 DBOpParams params = {};
1957 InitializeParams(dpp, "InsertLCHead", &params);
1958
1959 params.op.lc_head.index = oid;
1960 params.op.lc_head.head = head;
1961
1962 ret = ProcessOp(dpp, "InsertLCHead", &params);
1963
1964 if (ret) {
1965 ldpp_dout(dpp, 0)<<"In InsertLCHead failed err:(" <<ret<<") " << dendl;
1966 goto out;
1967 }
1968
1969out:
1970 return ret;
1971}
1972
1973} } // namespace rgw::store
1974