]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sal_motr.cc
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rgw / rgw_sal_motr.cc
CommitLineData
1e59de90
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=2 sw=2 expandtab ft=cpp
3
4/*
5 * Ceph - scalable distributed file system
6 *
7 * SAL implementation for the CORTX Motr backend
8 *
9 * Copyright (C) 2021 Seagate Technology LLC and/or its Affiliates
10 *
11 * This is free software; you can redistribute it and/or
12 * modify it under the terms of the GNU Lesser General Public
13 * License version 2.1, as published by the Free Software
14 * Foundation. See file COPYING.
15 *
16 */
17
18#include <errno.h>
19#include <stdlib.h>
20#include <unistd.h>
21
22extern "C" {
23#pragma clang diagnostic push
24#pragma clang diagnostic ignored "-Wextern-c-compat"
25#pragma clang diagnostic ignored "-Wdeprecated-anon-enum-enum-conversion"
26#include "motr/config.h"
27#include "lib/types.h"
28#include "lib/trace.h" // m0_trace_set_mmapped_buffer
29#include "motr/layout.h" // M0_OBJ_LAYOUT_ID
30#include "helpers/helpers.h" // m0_ufid_next
31#pragma clang diagnostic pop
32}
33
34#include "common/Clock.h"
35#include "common/errno.h"
36
37#include "rgw_compression.h"
38#include "rgw_sal.h"
39#include "rgw_sal_motr.h"
40#include "rgw_bucket.h"
41
42#define dout_subsys ceph_subsys_rgw
43
44using std::string;
45using std::map;
46using std::vector;
47using std::set;
48using std::list;
49
50static string mp_ns = RGW_OBJ_NS_MULTIPART;
51static struct m0_ufid_generator ufid_gr;
52
53namespace rgw::sal {
54
55using ::ceph::encode;
56using ::ceph::decode;
57
58static std::string motr_global_indices[] = {
59 RGW_MOTR_USERS_IDX_NAME,
60 RGW_MOTR_BUCKET_INST_IDX_NAME,
61 RGW_MOTR_BUCKET_HD_IDX_NAME,
62 RGW_IAM_MOTR_ACCESS_KEY,
63 RGW_IAM_MOTR_EMAIL_KEY
64};
65
66void MotrMetaCache::invalid(const DoutPrefixProvider *dpp,
67 const string& name)
68{
69 cache.invalidate_remove(dpp, name);
70}
71
72int MotrMetaCache::put(const DoutPrefixProvider *dpp,
73 const string& name,
74 const bufferlist& data)
75{
76 ldpp_dout(dpp, 0) << "Put into cache: name = " << name << dendl;
77
78 ObjectCacheInfo info;
79 info.status = 0;
80 info.data = data;
81 info.flags = CACHE_FLAG_DATA;
82 info.meta.mtime = ceph::real_clock::now();
83 info.meta.size = data.length();
84 cache.put(dpp, name, info, NULL);
85
86 // Inform other rgw instances. Do nothing if it gets some error?
87 int rc = distribute_cache(dpp, name, info, UPDATE_OBJ);
88 if (rc < 0)
89 ldpp_dout(dpp, 0) << "ERROR: failed to distribute cache for " << name << dendl;
90
91 return 0;
92}
93
94int MotrMetaCache::get(const DoutPrefixProvider *dpp,
95 const string& name,
96 bufferlist& data)
97{
98 ObjectCacheInfo info;
99 uint32_t flags = CACHE_FLAG_DATA;
100 int rc = cache.get(dpp, name, info, flags, NULL);
101 if (rc == 0) {
102 if (info.status < 0)
103 return info.status;
104
105 bufferlist& bl = info.data;
106 bufferlist::iterator it = bl.begin();
107 data.clear();
108
109 it.copy_all(data);
110 ldpp_dout(dpp, 0) << "Cache hit: name = " << name << dendl;
111 return 0;
112 }
113 ldpp_dout(dpp, 0) << "Cache miss: name = " << name << ", rc = "<< rc << dendl;
114 if(rc == -ENODATA)
115 return -ENOENT;
116
117 return rc;
118}
119
120int MotrMetaCache::remove(const DoutPrefixProvider *dpp,
121 const string& name)
122
123{
124 cache.invalidate_remove(dpp, name);
125
126 ObjectCacheInfo info;
127 int rc = distribute_cache(dpp, name, info, INVALIDATE_OBJ);
128 if (rc < 0) {
129 ldpp_dout(dpp, 0) << "ERROR: " <<__func__<< "(): failed to distribute cache: rc =" << rc << dendl;
130 }
131
132 ldpp_dout(dpp, 0) << "Remove from cache: name = " << name << dendl;
133 return 0;
134}
135
136int MotrMetaCache::distribute_cache(const DoutPrefixProvider *dpp,
137 const string& normal_name,
138 ObjectCacheInfo& obj_info, int op)
139{
140 return 0;
141}
142
143int MotrMetaCache::watch_cb(const DoutPrefixProvider *dpp,
144 uint64_t notify_id,
145 uint64_t cookie,
146 uint64_t notifier_id,
147 bufferlist& bl)
148{
149 return 0;
150}
151
152void MotrMetaCache::set_enabled(bool status)
153{
154 cache.set_enabled(status);
155}
156
157// TODO: properly handle the number of key/value pairs to get in
158// one query. Now the POC simply tries to retrieve all `max` number of pairs
159// with starting key `marker`.
160int MotrUser::list_buckets(const DoutPrefixProvider *dpp, const string& marker,
161 const string& end_marker, uint64_t max, bool need_stats,
162 BucketList &buckets, optional_yield y)
163{
164 int rc;
165 vector<string> keys(max);
166 vector<bufferlist> vals(max);
167 bool is_truncated = false;
168
169 ldpp_dout(dpp, 20) <<__func__<< ": list_user_buckets: marker=" << marker
170 << " end_marker=" << end_marker
171 << " max=" << max << dendl;
172
173 // Retrieve all `max` number of pairs.
174 buckets.clear();
175 string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str();
176 keys[0] = marker;
177 rc = store->next_query_by_name(user_info_iname, keys, vals);
178 if (rc < 0) {
179 ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl;
180 return rc;
181 }
182
183 // Process the returned pairs to add into BucketList.
184 uint64_t bcount = 0;
185 for (const auto& bl: vals) {
186 if (bl.length() == 0)
187 break;
188
189 RGWBucketEnt ent;
190 auto iter = bl.cbegin();
191 ent.decode(iter);
192
193 std::time_t ctime = ceph::real_clock::to_time_t(ent.creation_time);
194 ldpp_dout(dpp, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl;
195
196 if (!end_marker.empty() &&
197 end_marker.compare(ent.bucket.marker) <= 0)
198 break;
199
200 buckets.add(std::make_unique<MotrBucket>(this->store, ent, this));
201 bcount++;
202 }
203 if (bcount == max)
204 is_truncated = true;
205 buckets.set_truncated(is_truncated);
206
207 return 0;
208}
209
210int MotrUser::create_bucket(const DoutPrefixProvider* dpp,
211 const rgw_bucket& b,
212 const std::string& zonegroup_id,
213 rgw_placement_rule& placement_rule,
214 std::string& swift_ver_location,
215 const RGWQuotaInfo* pquota_info,
216 const RGWAccessControlPolicy& policy,
217 Attrs& attrs,
218 RGWBucketInfo& info,
219 obj_version& ep_objv,
220 bool exclusive,
221 bool obj_lock_enabled,
222 bool* existed,
223 req_info& req_info,
224 std::unique_ptr<Bucket>* bucket_out,
225 optional_yield y)
226{
227 int ret;
228 std::unique_ptr<Bucket> bucket;
229
230 // Look up the bucket. Create it if it doesn't exist.
231 ret = this->store->get_bucket(dpp, this, b, &bucket, y);
232 if (ret < 0 && ret != -ENOENT)
233 return ret;
234
235 if (ret != -ENOENT) {
236 *existed = true;
237 // if (swift_ver_location.empty()) {
238 // swift_ver_location = bucket->get_info().swift_ver_location;
239 // }
240 // placement_rule.inherit_from(bucket->get_info().placement_rule);
241
242 // TODO: ACL policy
243 // // don't allow changes to the acl policy
244 //RGWAccessControlPolicy old_policy(ctx());
245 //int rc = rgw_op_get_bucket_policy_from_attr(
246 // dpp, this, u, bucket->get_attrs(), &old_policy, y);
247 //if (rc >= 0 && old_policy != policy) {
248 // bucket_out->swap(bucket);
249 // return -EEXIST;
250 //}
251 } else {
252
253 placement_rule.name = "default";
254 placement_rule.storage_class = "STANDARD";
255 bucket = std::make_unique<MotrBucket>(store, b, this);
256 bucket->set_attrs(attrs);
257 *existed = false;
258 }
259
260 if (!*existed){
261 // TODO: how to handle zone and multi-site.
262 info.placement_rule = placement_rule;
263 info.bucket = b;
264 info.owner = this->get_info().user_id;
265 info.zonegroup = zonegroup_id;
266 if (obj_lock_enabled)
267 info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED;
268 bucket->set_version(ep_objv);
269 bucket->get_info() = info;
270
271 // Create a new bucket: (1) Add a key/value pair in the
272 // bucket instance index. (2) Create a new bucket index.
273 MotrBucket* mbucket = static_cast<MotrBucket*>(bucket.get());
274 ret = mbucket->put_info(dpp, y, ceph::real_time())? :
275 mbucket->create_bucket_index() ? :
276 mbucket->create_multipart_indices();
277 if (ret < 0)
278 ldpp_dout(dpp, 0) << "ERROR: failed to create bucket indices! " << ret << dendl;
279
280 // Insert the bucket entry into the user info index.
281 ret = mbucket->link_user(dpp, this, y);
282 if (ret < 0)
283 ldpp_dout(dpp, 0) << "ERROR: failed to add bucket entry! " << ret << dendl;
284 } else {
285 return -EEXIST;
286 // bucket->set_version(ep_objv);
287 // bucket->get_info() = info;
288 }
289
290 bucket_out->swap(bucket);
291
292 return ret;
293}
294
295int MotrUser::read_attrs(const DoutPrefixProvider* dpp, optional_yield y)
296{
297 return 0;
298}
299
300int MotrUser::read_stats(const DoutPrefixProvider *dpp,
301 optional_yield y, RGWStorageStats* stats,
302 ceph::real_time *last_stats_sync,
303 ceph::real_time *last_stats_update)
304{
305 return 0;
306}
307
308/* stats - Not for first pass */
309int MotrUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB *cb)
310{
311 return 0;
312}
313
314int MotrUser::complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y)
315{
316 return 0;
317}
318
319int MotrUser::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
320 bool *is_truncated, RGWUsageIter& usage_iter,
321 map<rgw_user_bucket, rgw_usage_log_entry>& usage)
322{
323 return 0;
324}
325
326int MotrUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
327{
328 return 0;
329}
330
331int MotrUser::load_user_from_idx(const DoutPrefixProvider *dpp,
332 MotrStore *store,
333 RGWUserInfo& info, map<string, bufferlist> *attrs,
334 RGWObjVersionTracker *objv_tr)
335{
336 struct MotrUserInfo muinfo;
337 bufferlist bl;
338 ldpp_dout(dpp, 20) << "info.user_id.id = " << info.user_id.id << dendl;
339 if (store->get_user_cache()->get(dpp, info.user_id.id, bl)) {
340 // Cache misses
341 int rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME,
342 M0_IC_GET, info.user_id.to_str(), bl);
343 ldpp_dout(dpp, 20) << "do_idx_op_by_name() = " << rc << dendl;
344 if (rc < 0)
345 return rc;
346
347 // Put into cache.
348 store->get_user_cache()->put(dpp, info.user_id.id, bl);
349 }
350
351 bufferlist& blr = bl;
352 auto iter = blr.cbegin();
353 muinfo.decode(iter);
354 info = muinfo.info;
355 if (attrs)
356 *attrs = muinfo.attrs;
357 if (objv_tr)
358 {
359 objv_tr->read_version = muinfo.user_version;
360 objv_tracker.read_version = objv_tr->read_version;
361 }
362
363 if (!info.access_keys.empty()) {
364 for(auto key : info.access_keys) {
365 access_key_tracker.insert(key.first);
366 }
367 }
368
369 return 0;
370}
371
372int MotrUser::load_user(const DoutPrefixProvider *dpp,
373 optional_yield y)
374{
375 ldpp_dout(dpp, 20) << "load user: user id = " << info.user_id.to_str() << dendl;
376 return load_user_from_idx(dpp, store, info, &attrs, &objv_tracker);
377}
378
379int MotrUser::create_user_info_idx()
380{
381 string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str();
382 return store->create_motr_idx_by_name(user_info_iname);
383}
384
385int MotrUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y)
386{
387 for (auto& it : new_attrs)
388 attrs[it.first] = it.second;
389
390 return store_user(dpp, y, false);
391}
392
393int MotrUser::store_user(const DoutPrefixProvider* dpp,
394 optional_yield y, bool exclusive, RGWUserInfo* old_info)
395{
396 bufferlist bl;
397 struct MotrUserInfo muinfo;
398 RGWUserInfo orig_info;
399 RGWObjVersionTracker objv_tr = {};
400 obj_version& obj_ver = objv_tr.read_version;
401
402 ldpp_dout(dpp, 20) << "Store_user(): User = " << info.user_id.id << dendl;
403 orig_info.user_id = info.user_id;
404 // XXX: we open and close motr idx 2 times in this method:
405 // 1) on load_user_from_idx() here and 2) on do_idx_op_by_name(PUT) below.
406 // Maybe this can be optimised later somewhow.
407 int rc = load_user_from_idx(dpp, store, orig_info, nullptr, &objv_tr);
408 ldpp_dout(dpp, 10) << "Get user: rc = " << rc << dendl;
409
410 // Check if the user already exists
411 if (rc == 0 && obj_ver.ver > 0) {
412 if (old_info)
413 *old_info = orig_info;
414
415 if (obj_ver.ver != objv_tracker.read_version.ver) {
416 rc = -ECANCELED;
417 ldpp_dout(dpp, 0) << "ERROR: User Read version mismatch" << dendl;
418 goto out;
419 }
420
421 if (exclusive)
422 return rc;
423
424 obj_ver.ver++;
425 } else {
426 obj_ver.ver = 1;
427 obj_ver.tag = "UserTAG";
428 }
429
430 // Insert the user to user info index.
431 muinfo.info = info;
432 muinfo.attrs = attrs;
433 muinfo.user_version = obj_ver;
434 muinfo.encode(bl);
435 rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME,
436 M0_IC_PUT, info.user_id.to_str(), bl);
437 ldpp_dout(dpp, 10) << "Store user to motr index: rc = " << rc << dendl;
438 if (rc == 0) {
439 objv_tracker.read_version = obj_ver;
440 objv_tracker.write_version = obj_ver;
441 }
442
443 // Store access key in access key index
444 if (!info.access_keys.empty()) {
445 std::string access_key;
446 std::string secret_key;
447 std::map<std::string, RGWAccessKey>::const_iterator iter = info.access_keys.begin();
448 const RGWAccessKey& k = iter->second;
449 access_key = k.id;
450 secret_key = k.key;
451 MotrAccessKey MGWUserKeys(access_key, secret_key, info.user_id.to_str());
452 store->store_access_key(dpp, y, MGWUserKeys);
453 access_key_tracker.insert(access_key);
454 }
455
456 // Check if any key need to be deleted
457 if (access_key_tracker.size() != info.access_keys.size()) {
458 std::string key_for_deletion;
459 for (auto key : access_key_tracker) {
460 if (!info.get_key(key)) {
461 key_for_deletion = key;
462 ldpp_dout(dpp, 0) << "Deleting access key: " << key_for_deletion << dendl;
463 store->delete_access_key(dpp, y, key_for_deletion);
464 if (rc < 0) {
465 ldpp_dout(dpp, 0) << "Unable to delete access key" << rc << dendl;
466 }
467 }
468 }
469 if(rc >= 0){
470 access_key_tracker.erase(key_for_deletion);
471 }
472 }
473
474 if (!info.user_email.empty()) {
475 MotrEmailInfo MGWEmailInfo(info.user_id.to_str(), info.user_email);
476 store->store_email_info(dpp, y, MGWEmailInfo);
477 }
478
479 // Create user info index to store all buckets that are belong
480 // to this bucket.
481 rc = create_user_info_idx();
482 if (rc < 0 && rc != -EEXIST) {
483 ldpp_dout(dpp, 0) << "Failed to create user info index: rc = " << rc << dendl;
484 goto out;
485 }
486
487 // Put the user info into cache.
488 rc = store->get_user_cache()->put(dpp, info.user_id.id, bl);
489
490out:
491 return rc;
492}
493
494int MotrUser::remove_user(const DoutPrefixProvider* dpp, optional_yield y)
495{
496 // Remove user info from cache
497 // Delete access keys for user
498 // Delete user info
499 // Delete user from user index
500 // Delete email for user - TODO
501 bufferlist bl;
502 int rc;
503 // Remove the user info from cache.
504 store->get_user_cache()->remove(dpp, info.user_id.id);
505
506 // Delete all access key of user
507 if (!info.access_keys.empty()) {
508 for(auto acc_key = info.access_keys.begin(); acc_key != info.access_keys.end(); acc_key++) {
509 auto access_key = acc_key->first;
510 rc = store->delete_access_key(dpp, y, access_key);
511 // TODO
512 // Check error code for access_key does not exist
513 // Continue to next step only if delete failed because key doesn't exists
514 if (rc < 0){
515 ldpp_dout(dpp, 0) << "Unable to delete access key" << rc << dendl;
516 }
517 }
518 }
519
520 //Delete email id
521 if (!info.user_email.empty()) {
522 rc = store->do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY,
523 M0_IC_DEL, info.user_email, bl);
524 if (rc < 0 && rc != -ENOENT) {
525 ldpp_dout(dpp, 0) << "Unable to delete email id " << rc << dendl;
526 }
527 }
528
529 // Delete user info index
530 string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str();
531 store->delete_motr_idx_by_name(user_info_iname);
532 ldpp_dout(dpp, 10) << "Deleted user info index - " << user_info_iname << dendl;
533
534 // Delete user from user index
535 rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME,
536 M0_IC_DEL, info.user_id.to_str(), bl);
537 if (rc < 0){
538 ldpp_dout(dpp, 0) << "Unable to delete user from user index " << rc << dendl;
539 return rc;
540 }
541
542 // TODO
543 // Delete email for user
544 // rc = store->do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY,
545 // M0_IC_DEL, info.user_email, bl);
546 // if (rc < 0){
547 // ldpp_dout(dpp, 0) << "Unable to delete email for user" << rc << dendl;
548 // return rc;
549 // }
550 return 0;
551}
552
553int MotrUser::verify_mfa(const std::string& mfa_str, bool* verified, const DoutPrefixProvider *dpp, optional_yield y)
554{
555 *verified = false;
556 return 0;
557}
558
559int MotrBucket::remove_bucket(const DoutPrefixProvider *dpp, bool delete_children, bool forward_to_master, req_info* req_info, optional_yield y)
560{
561 int ret;
562
563 ldpp_dout(dpp, 20) << "remove_bucket Entry=" << info.bucket.name << dendl;
564
565 // Refresh info
566 ret = load_bucket(dpp, y);
567 if (ret < 0) {
568 ldpp_dout(dpp, 0) << "ERROR: remove_bucket load_bucket failed rc=" << ret << dendl;
569 return ret;
570 }
571
572 ListParams params;
573 params.list_versions = true;
574 params.allow_unordered = true;
575
576 ListResults results;
577
578 // 1. Check if Bucket has objects.
579 // If bucket contains objects and delete_children is true, delete all objects.
580 // Else throw error that bucket is not empty.
581 do {
582 results.objs.clear();
583
584 // Check if bucket has objects.
585 ret = list(dpp, params, 1000, results, y);
586 if (ret < 0) {
587 return ret;
588 }
589
590 // If result contains entries, bucket is not empty.
591 if (!results.objs.empty() && !delete_children) {
592 ldpp_dout(dpp, 0) << "ERROR: could not remove non-empty bucket " << info.bucket.name << dendl;
593 return -ENOTEMPTY;
594 }
595
596 for (const auto& obj : results.objs) {
597 rgw_obj_key key(obj.key);
598 /* xxx dang */
599 ret = rgw_remove_object(dpp, store, this, key);
600 if (ret < 0 && ret != -ENOENT) {
601 ldpp_dout(dpp, 0) << "ERROR: remove_bucket rgw_remove_object failed rc=" << ret << dendl;
602 return ret;
603 }
604 }
605 } while(results.is_truncated);
606
607 // 2. Abort Mp uploads on the bucket.
608 ret = abort_multiparts(dpp, store->ctx());
609 if (ret < 0) {
610 return ret;
611 }
612
613 // 3. Remove mp index??
614 string bucket_multipart_iname = "motr.rgw.bucket." + info.bucket.name + ".multiparts";
615 ret = store->delete_motr_idx_by_name(bucket_multipart_iname);
616 if (ret < 0) {
617 ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove multipart index rc=" << ret << dendl;
618 return ret;
619 }
620
621 // 4. Sync user stats.
622 ret = this->sync_user_stats(dpp, y);
623 if (ret < 0) {
624 ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl;
625 }
626
627 // 5. Remove the bucket from user info index. (unlink user)
628 ret = this->unlink_user(dpp, owner, y);
629 if (ret < 0) {
630 ldpp_dout(dpp, 0) << "ERROR: remove_bucket unlink_user failed rc=" << ret << dendl;
631 return ret;
632 }
633
634 // 6. Remove bucket index.
635 string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name;
636 ret = store->delete_motr_idx_by_name(bucket_index_iname);
637 if (ret < 0) {
638 ldpp_dout(dpp, 0) << "ERROR: remove_bucket unlink_user failed rc=" << ret << dendl;
639 return ret;
640 }
641
642 // 7. Remove bucket instance info.
643 bufferlist bl;
644 ret = store->get_bucket_inst_cache()->remove(dpp, info.bucket.name);
645 if (ret < 0) {
646 ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket instance from cache rc="
647 << ret << dendl;
648 return ret;
649 }
650
651 ret = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME,
652 M0_IC_DEL, info.bucket.name, bl);
653 if (ret < 0) {
654 ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket instance rc="
655 << ret << dendl;
656 return ret;
657 }
658
659 // TODO :
660 // 8. Remove Notifications
661 // if bucket has notification definitions associated with it
662 // they should be removed (note that any pending notifications on the bucket are still going to be sent)
663
664 // 9. Forward request to master.
665 if (forward_to_master) {
666 bufferlist in_data;
667 ret = store->forward_request_to_master(dpp, owner, &bucket_version, in_data, nullptr, *req_info, y);
668 if (ret < 0) {
669 if (ret == -ENOENT) {
670 /* adjust error, we want to return with NoSuchBucket and not
671 * NoSuchKey */
672 ret = -ERR_NO_SUCH_BUCKET;
673 }
674 ldpp_dout(dpp, 0) << "ERROR: Forward to master failed. ret=" << ret << dendl;
675 return ret;
676 }
677 }
678
679 ldpp_dout(dpp, 20) << "remove_bucket Exit=" << info.bucket.name << dendl;
680
681 return ret;
682}
683
684int MotrBucket::remove_bucket_bypass_gc(int concurrent_max, bool
685 keep_index_consistent,
686 optional_yield y, const
687 DoutPrefixProvider *dpp) {
688 return 0;
689}
690
691int MotrBucket::put_info(const DoutPrefixProvider *dpp, bool exclusive, ceph::real_time _mtime)
692{
693 bufferlist bl;
694 struct MotrBucketInfo mbinfo;
695
696 ldpp_dout(dpp, 20) << "put_info(): bucket_id=" << info.bucket.bucket_id << dendl;
697 mbinfo.info = info;
698 mbinfo.bucket_attrs = attrs;
699 mbinfo.mtime = _mtime;
700 mbinfo.bucket_version = bucket_version;
701 mbinfo.encode(bl);
702
703 // Insert bucket instance using bucket's marker (string).
704 int rc = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME,
705 M0_IC_PUT, info.bucket.name, bl, !exclusive);
706 if (rc == 0)
707 store->get_bucket_inst_cache()->put(dpp, info.bucket.name, bl);
708
709 return rc;
710}
711
712int MotrBucket::load_bucket(const DoutPrefixProvider *dpp, optional_yield y, bool get_stats)
713{
714 // Get bucket instance using bucket's name (string). or bucket id?
715 bufferlist bl;
716 if (store->get_bucket_inst_cache()->get(dpp, info.bucket.name, bl)) {
717 // Cache misses.
718 ldpp_dout(dpp, 20) << "load_bucket(): name=" << info.bucket.name << dendl;
719 int rc = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME,
720 M0_IC_GET, info.bucket.name, bl);
721 ldpp_dout(dpp, 20) << "load_bucket(): rc=" << rc << dendl;
722 if (rc < 0)
723 return rc;
724 store->get_bucket_inst_cache()->put(dpp, info.bucket.name, bl);
725 }
726
727 struct MotrBucketInfo mbinfo;
728 bufferlist& blr = bl;
729 auto iter =blr.cbegin();
730 mbinfo.decode(iter); //Decode into MotrBucketInfo.
731
732 info = mbinfo.info;
733 ldpp_dout(dpp, 20) << "load_bucket(): bucket_id=" << info.bucket.bucket_id << dendl;
734 rgw_placement_rule placement_rule;
735 placement_rule.name = "default";
736 placement_rule.storage_class = "STANDARD";
737 info.placement_rule = placement_rule;
738
739 attrs = mbinfo.bucket_attrs;
740 mtime = mbinfo.mtime;
741 bucket_version = mbinfo.bucket_version;
742
743 return 0;
744}
745
746int MotrBucket::link_user(const DoutPrefixProvider* dpp, User* new_user, optional_yield y)
747{
748 bufferlist bl;
749 RGWBucketEnt new_bucket;
750 ceph::real_time creation_time = get_creation_time();
751
752 // RGWBucketEnt or cls_user_bucket_entry is the structure that is stored.
753 new_bucket.bucket = info.bucket;
754 new_bucket.size = 0;
755 if (real_clock::is_zero(creation_time))
756 creation_time = ceph::real_clock::now();
757 new_bucket.creation_time = creation_time;
758 new_bucket.encode(bl);
759 std::time_t ctime = ceph::real_clock::to_time_t(new_bucket.creation_time);
760 ldpp_dout(dpp, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl;
761
762 // Insert the user into the user info index.
763 string user_info_idx_name = "motr.rgw.user.info." + new_user->get_info().user_id.to_str();
764 return store->do_idx_op_by_name(user_info_idx_name,
765 M0_IC_PUT, info.bucket.name, bl);
766
767}
768
769int MotrBucket::unlink_user(const DoutPrefixProvider* dpp, User* new_user, optional_yield y)
770{
771 // Remove the user into the user info index.
772 bufferlist bl;
773 string user_info_idx_name = "motr.rgw.user.info." + new_user->get_info().user_id.to_str();
774 return store->do_idx_op_by_name(user_info_idx_name,
775 M0_IC_DEL, info.bucket.name, bl);
776}
777
778/* stats - Not for first pass */
779int MotrBucket::read_stats(const DoutPrefixProvider *dpp,
780 const bucket_index_layout_generation& idx_layout, int shard_id,
781 std::string *bucket_ver, std::string *master_ver,
782 std::map<RGWObjCategory, RGWStorageStats>& stats,
783 std::string *max_marker, bool *syncstopped)
784{
785 return 0;
786}
787
788int MotrBucket::create_bucket_index()
789{
790 string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name;
791 return store->create_motr_idx_by_name(bucket_index_iname);
792}
793
794int MotrBucket::create_multipart_indices()
795{
796 int rc;
797
798 // Bucket multipart index stores in-progress multipart uploads.
799 // Key is the object name + upload_id, value is a rgw_bucket_dir_entry.
800 // An entry is inserted when a multipart upload is initialised (
801 // MotrMultipartUpload::init()) and will be removed when the upload
802 // is completed (MotrMultipartUpload::complete()).
803 // MotrBucket::list_multiparts() will scan this index to return all
804 // in-progress multipart uploads in the bucket.
805 string bucket_multipart_iname = "motr.rgw.bucket." + info.bucket.name + ".multiparts";
806 rc = store->create_motr_idx_by_name(bucket_multipart_iname);
807 if (rc < 0) {
808 ldout(store->cctx, 0) << "Failed to create bucket multipart index " << bucket_multipart_iname << dendl;
809 return rc;
810 }
811
812 return 0;
813}
814
815
816int MotrBucket::read_stats_async(const DoutPrefixProvider *dpp,
817 const bucket_index_layout_generation& idx_layout,
818 int shard_id, RGWGetBucketStats_CB *ctx)
819{
820 return 0;
821}
822
823int MotrBucket::sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y)
824{
825 return 0;
826}
827
828int MotrBucket::update_container_stats(const DoutPrefixProvider *dpp)
829{
830 return 0;
831}
832
833int MotrBucket::check_bucket_shards(const DoutPrefixProvider *dpp)
834{
835 return 0;
836}
837
838int MotrBucket::chown(const DoutPrefixProvider *dpp, User& new_user, optional_yield y)
839{
840 // TODO: update bucket with new owner
841 return 0;
842}
843
844/* Make sure to call load_bucket() if you need it first */
845bool MotrBucket::is_owner(User* user)
846{
847 return (info.owner.compare(user->get_id()) == 0);
848}
849
850int MotrBucket::check_empty(const DoutPrefixProvider *dpp, optional_yield y)
851{
852 /* XXX: Check if bucket contains any objects */
853 return 0;
854}
855
856int MotrBucket::check_quota(const DoutPrefixProvider *dpp, RGWQuota& quota, uint64_t obj_size,
857 optional_yield y, bool check_size_only)
858{
859 /* Not Handled in the first pass as stats are also needed */
860 return 0;
861}
862
863int MotrBucket::merge_and_store_attrs(const DoutPrefixProvider *dpp, Attrs& new_attrs, optional_yield y)
864{
865 for (auto& it : new_attrs)
866 attrs[it.first] = it.second;
867
868 return put_info(dpp, y, ceph::real_time());
869}
870
871int MotrBucket::try_refresh_info(const DoutPrefixProvider *dpp, ceph::real_time *pmtime)
872{
873 return 0;
874}
875
876/* XXX: usage and stats not supported in the first pass */
877int MotrBucket::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
878 uint32_t max_entries, bool *is_truncated,
879 RGWUsageIter& usage_iter,
880 map<rgw_user_bucket, rgw_usage_log_entry>& usage)
881{
882 return 0;
883}
884
885int MotrBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
886{
887 return 0;
888}
889
890int MotrBucket::remove_objs_from_index(const DoutPrefixProvider *dpp, std::list<rgw_obj_index_key>& objs_to_unlink)
891{
892 /* XXX: CHECK: Unlike RadosStore, there is no seperate bucket index table.
893 * Delete all the object in the list from the object table of this
894 * bucket
895 */
896 return 0;
897}
898
899int MotrBucket::check_index(const DoutPrefixProvider *dpp, std::map<RGWObjCategory, RGWStorageStats>& existing_stats, std::map<RGWObjCategory, RGWStorageStats>& calculated_stats)
900{
901 /* XXX: stats not supported yet */
902 return 0;
903}
904
905int MotrBucket::rebuild_index(const DoutPrefixProvider *dpp)
906{
907 /* there is no index table in dbstore. Not applicable */
908 return 0;
909}
910
911int MotrBucket::set_tag_timeout(const DoutPrefixProvider *dpp, uint64_t timeout)
912{
913 /* XXX: CHECK: set tag timeout for all the bucket objects? */
914 return 0;
915}
916
917int MotrBucket::purge_instance(const DoutPrefixProvider *dpp)
918{
919 /* XXX: CHECK: for dbstore only single instance supported.
920 * Remove all the objects for that instance? Anything extra needed?
921 */
922 return 0;
923}
924
925int MotrBucket::set_acl(const DoutPrefixProvider *dpp, RGWAccessControlPolicy &acl, optional_yield y)
926{
927 int ret = 0;
928 bufferlist aclbl;
929
930 acls = acl;
931 acl.encode(aclbl);
932
933 Attrs attrs = get_attrs();
934 attrs[RGW_ATTR_ACL] = aclbl;
935
936 // TODO: update bucket entry with the new attrs
937
938 return ret;
939}
940
941std::unique_ptr<Object> MotrBucket::get_object(const rgw_obj_key& k)
942{
943 return std::make_unique<MotrObject>(this->store, k, this);
944}
945
946int MotrBucket::list(const DoutPrefixProvider *dpp, ListParams& params, int max, ListResults& results, optional_yield y)
947{
948 int rc;
949 vector<string> keys(max);
950 vector<bufferlist> vals(max);
951
952 ldpp_dout(dpp, 20) << "bucket=" << info.bucket.name
953 << " prefix=" << params.prefix
954 << " marker=" << params.marker
955 << " max=" << max << dendl;
956
957 // Retrieve all `max` number of pairs.
958 string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name;
959 keys[0] = params.marker.empty() ? params.prefix :
960 params.marker.get_oid();
961 rc = store->next_query_by_name(bucket_index_iname, keys, vals, params.prefix,
962 params.delim);
963 if (rc < 0) {
964 ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl;
965 return rc;
966 }
967
968 // Process the returned pairs to add into ListResults.
969 int i = 0;
970 for (; i < rc; ++i) {
971 if (vals[i].length() == 0) {
972 results.common_prefixes[keys[i]] = true;
973 } else {
974 rgw_bucket_dir_entry ent;
975 auto iter = vals[i].cbegin();
976 ent.decode(iter);
977 if (params.list_versions || ent.is_visible())
978 results.objs.emplace_back(std::move(ent));
979 }
980 }
981
982 if (i == max) {
983 results.is_truncated = true;
984 results.next_marker = keys[max - 1] + " ";
985 } else {
986 results.is_truncated = false;
987 }
988
989 return 0;
990}
991
992int MotrBucket::list_multiparts(const DoutPrefixProvider *dpp,
993 const string& prefix,
994 string& marker,
995 const string& delim,
996 const int& max_uploads,
997 vector<std::unique_ptr<MultipartUpload>>& uploads,
998 map<string, bool> *common_prefixes,
999 bool *is_truncated)
1000{
1001 int rc;
1002 vector<string> key_vec(max_uploads);
1003 vector<bufferlist> val_vec(max_uploads);
1004
1005 string bucket_multipart_iname =
1006 "motr.rgw.bucket." + this->get_name() + ".multiparts";
1007 key_vec[0].clear();
1008 key_vec[0].assign(marker.begin(), marker.end());
1009 rc = store->next_query_by_name(bucket_multipart_iname, key_vec, val_vec);
1010 if (rc < 0) {
1011 ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl;
1012 return rc;
1013 }
1014
1015 // Process the returned pairs to add into ListResults.
1016 // The POC can only support listing all objects or selecting
1017 // with prefix.
1018 int ocount = 0;
1019 rgw_obj_key last_obj_key;
1020 *is_truncated = false;
1021 for (const auto& bl: val_vec) {
1022 if (bl.length() == 0)
1023 break;
1024
1025 rgw_bucket_dir_entry ent;
1026 auto iter = bl.cbegin();
1027 ent.decode(iter);
1028
1029 if (prefix.size() &&
1030 (0 != ent.key.name.compare(0, prefix.size(), prefix))) {
1031 ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ <<
1032 ": skippping \"" << ent.key <<
1033 "\" because doesn't match prefix" << dendl;
1034 continue;
1035 }
1036
1037 rgw_obj_key key(ent.key);
1038 uploads.push_back(this->get_multipart_upload(key.name));
1039 last_obj_key = key;
1040 ocount++;
1041 if (ocount == max_uploads) {
1042 *is_truncated = true;
1043 break;
1044 }
1045 }
1046 marker = last_obj_key.name;
1047
1048 // What is common prefix? We don't handle it for now.
1049
1050 return 0;
1051
1052}
1053
1054int MotrBucket::abort_multiparts(const DoutPrefixProvider *dpp, CephContext *cct)
1055{
1056 return 0;
1057}
1058
1059void MotrStore::finalize(void)
1060{
1061 // close connection with motr
1062 m0_client_fini(this->instance, true);
1063}
1064
1065const std::string& MotrZoneGroup::get_endpoint() const
1066{
1067 if (!group.endpoints.empty()) {
1068 return group.endpoints.front();
1069 } else {
1070 // use zonegroup's master zone endpoints
1071 auto z = group.zones.find(group.master_zone);
1072 if (z != group.zones.end() && !z->second.endpoints.empty()) {
1073 return z->second.endpoints.front();
1074 }
1075 }
1076 return empty;
1077}
1078
1079bool MotrZoneGroup::placement_target_exists(std::string& target) const
1080{
1081 return !!group.placement_targets.count(target);
1082}
1083
aee94f69 1084void MotrZoneGroup::get_placement_target_names(std::set<std::string>& names) const
1e59de90
TL
1085{
1086 for (const auto& target : group.placement_targets) {
1087 names.emplace(target.second.name);
1088 }
1e59de90
TL
1089}
1090
1091int MotrZoneGroup::get_placement_tier(const rgw_placement_rule& rule,
1092 std::unique_ptr<PlacementTier>* tier)
1093{
1094 std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer;
1095 titer = group.placement_targets.find(rule.name);
1096 if (titer == group.placement_targets.end()) {
1097 return -ENOENT;
1098 }
1099
1100 const auto& target_rule = titer->second;
1101 std::map<std::string, RGWZoneGroupPlacementTier>::const_iterator ttier;
1102 ttier = target_rule.tier_targets.find(rule.storage_class);
1103 if (ttier == target_rule.tier_targets.end()) {
1104 // not found
1105 return -ENOENT;
1106 }
1107
1108 PlacementTier* t;
1109 t = new MotrPlacementTier(store, ttier->second);
1110 if (!t)
1111 return -ENOMEM;
1112
1113 tier->reset(t);
1114 return 0;
1115}
1116
1117ZoneGroup& MotrZone::get_zonegroup()
1118{
1119 return zonegroup;
1120}
1121
1122const std::string& MotrZone::get_id()
1123{
1124 return zone_params->get_id();
1125}
1126
1127const std::string& MotrZone::get_name() const
1128{
1129 return zone_params->get_name();
1130}
1131
1132bool MotrZone::is_writeable()
1133{
1134 return true;
1135}
1136
1137bool MotrZone::get_redirect_endpoint(std::string* endpoint)
1138{
1139 return false;
1140}
1141
1142bool MotrZone::has_zonegroup_api(const std::string& api) const
1143{
1144 return (zonegroup->api_name == api);
1145}
1146
1147const std::string& MotrZone::get_current_period_id()
1148{
1149 return current_period->get_id();
1150}
1151
1152std::unique_ptr<LuaManager> MotrStore::get_lua_manager()
1153{
1154 return std::make_unique<MotrLuaManager>(this);
1155}
1156
1157int MotrObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **_state, optional_yield y, bool follow_olh)
1158{
1159 // Get object's metadata (those stored in rgw_bucket_dir_entry).
1160 bufferlist bl;
1161 if (this->store->get_obj_meta_cache()->get(dpp, this->get_key().get_oid(), bl)) {
1162 // Cache misses.
1163 string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name();
1164 int rc = this->store->do_idx_op_by_name(bucket_index_iname,
1165 M0_IC_GET, this->get_key().get_oid(), bl);
1166 if (rc < 0) {
1167 ldpp_dout(dpp, 0) << "Failed to get object's entry from bucket index. " << dendl;
1168 return rc;
1169 }
1170
1171 // Put into cache.
1172 this->store->get_obj_meta_cache()->put(dpp, this->get_key().get_oid(), bl);
1173 }
1174
1175 rgw_bucket_dir_entry ent;
1176 bufferlist& blr = bl;
1177 auto iter = blr.cbegin();
1178 ent.decode(iter);
1179
1180 // Set object's type.
1181 this->category = ent.meta.category;
1182
1183 // Set object state.
1184 state.exists = true;
1185 state.size = ent.meta.size;
1186 state.accounted_size = ent.meta.size;
1187 state.mtime = ent.meta.mtime;
1188
1189 state.has_attrs = true;
1190 bufferlist etag_bl;
1191 string& etag = ent.meta.etag;
1192 ldpp_dout(dpp, 20) <<__func__<< ": object's etag: " << ent.meta.etag << dendl;
1193 etag_bl.append(etag);
1194 state.attrset[RGW_ATTR_ETAG] = etag_bl;
1195
1196 return 0;
1197}
1198
1199MotrObject::~MotrObject() {
1200 this->close_mobj();
1201}
1202
1203// int MotrObject::read_attrs(const DoutPrefixProvider* dpp, Motr::Object::Read &read_op, optional_yield y, rgw_obj* target_obj)
1204// {
1205// read_op.params.attrs = &attrs;
1206// read_op.params.target_obj = target_obj;
1207// read_op.params.obj_size = &obj_size;
1208// read_op.params.lastmod = &mtime;
1209//
1210// return read_op.prepare(dpp);
1211// }
1212
1213int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y)
1214{
1215 // TODO: implement
1216 ldpp_dout(dpp, 20) <<__func__<< ": MotrObject::set_obj_attrs()" << dendl;
1217 return 0;
1218}
1219
1220int MotrObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj)
1221{
1222 if (this->category == RGWObjCategory::MultiMeta)
1223 return 0;
1224
1225 string bname, key;
1226 if (target_obj) {
1227 bname = target_obj->bucket.name;
1228 key = target_obj->key.get_oid();
1229 } else {
1230 bname = this->get_bucket()->get_name();
1231 key = this->get_key().get_oid();
1232 }
1233 ldpp_dout(dpp, 20) << "MotrObject::get_obj_attrs(): "
1234 << bname << "/" << key << dendl;
1235
1236 // Get object's metadata (those stored in rgw_bucket_dir_entry).
1237 bufferlist bl;
1238 if (this->store->get_obj_meta_cache()->get(dpp, key, bl)) {
1239 // Cache misses.
1240 string bucket_index_iname = "motr.rgw.bucket.index." + bname;
1241 int rc = this->store->do_idx_op_by_name(bucket_index_iname, M0_IC_GET, key, bl);
1242 if (rc < 0) {
1243 ldpp_dout(dpp, 0) << "Failed to get object's entry from bucket index. " << dendl;
1244 return rc;
1245 }
1246
1247 // Put into cache.
1248 this->store->get_obj_meta_cache()->put(dpp, key, bl);
1249 }
1250
1251 rgw_bucket_dir_entry ent;
1252 bufferlist& blr = bl;
1253 auto iter = blr.cbegin();
1254 ent.decode(iter);
1255 decode(attrs, iter);
1256
1257 return 0;
1258}
1259
1260int MotrObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp)
1261{
1262 rgw_obj target = get_obj();
1263 int r = get_obj_attrs(y, dpp, &target);
1264 if (r < 0) {
1265 return r;
1266 }
1267 set_atomic();
1268 attrs[attr_name] = attr_val;
1269 return set_obj_attrs(dpp, &attrs, nullptr, y);
1270}
1271
1272int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y)
1273{
1274 rgw_obj target = get_obj();
1275 Attrs rmattr;
1276 bufferlist bl;
1277
1278 set_atomic();
1279 rmattr[attr_name] = bl;
1280 return set_obj_attrs(dpp, nullptr, &rmattr, y);
1281}
1282
1283bool MotrObject::is_expired() {
1284 return false;
1285}
1286
1287// Taken from rgw_rados.cc
1288void MotrObject::gen_rand_obj_instance_name()
1289{
1290 enum {OBJ_INSTANCE_LEN = 32};
1291 char buf[OBJ_INSTANCE_LEN + 1];
1292
1293 gen_rand_alphanumeric_no_underscore(store->ctx(), buf, OBJ_INSTANCE_LEN);
1294 state.obj.key.set_instance(buf);
1295}
1296
1297int MotrObject::omap_get_vals(const DoutPrefixProvider *dpp, const std::string& marker, uint64_t count,
1298 std::map<std::string, bufferlist> *m,
1299 bool* pmore, optional_yield y)
1300{
1301 return 0;
1302}
1303
1304int MotrObject::omap_get_all(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *m,
1305 optional_yield y)
1306{
1307 return 0;
1308}
1309
1310int MotrObject::omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid,
1311 const std::set<std::string>& keys,
1312 Attrs* vals)
1313{
1314 return 0;
1315}
1316
1317int MotrObject::omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val,
1318 bool must_exist, optional_yield y)
1319{
1320 return 0;
1321}
1322
1323int MotrObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_yield y)
1324{
1325 return 0;
1326}
1327
1328std::unique_ptr<MPSerializer> MotrObject::get_serializer(const DoutPrefixProvider *dpp,
1329 const std::string& lock_name)
1330{
1331 return std::make_unique<MPMotrSerializer>(dpp, store, this, lock_name);
1332}
1333
1334int MotrObject::transition(Bucket* bucket,
1335 const rgw_placement_rule& placement_rule,
1336 const real_time& mtime,
1337 uint64_t olh_epoch,
1338 const DoutPrefixProvider* dpp,
1339 optional_yield y)
1340{
1341 return 0;
1342}
1343
1344bool MotrObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2)
1345{
1346 /* XXX: support single default zone and zonegroup for now */
1347 return true;
1348}
1349
1350int MotrObject::dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f)
1351{
1352 return 0;
1353}
1354
1355std::unique_ptr<Object::ReadOp> MotrObject::get_read_op()
1356{
1357 return std::make_unique<MotrObject::MotrReadOp>(this);
1358}
1359
1360MotrObject::MotrReadOp::MotrReadOp(MotrObject *_source) :
1361 source(_source)
1362{ }
1363
1364int MotrObject::MotrReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp)
1365{
1366 int rc;
1367 ldpp_dout(dpp, 20) <<__func__<< ": bucket=" << source->get_bucket()->get_name() << dendl;
1368
1369 rgw_bucket_dir_entry ent;
1370 rc = source->get_bucket_dir_ent(dpp, ent);
1371 if (rc < 0)
1372 return rc;
1373
1374 // Set source object's attrs. The attrs is key/value map and is used
1375 // in send_response_data() to set attributes, including etag.
1376 bufferlist etag_bl;
1377 string& etag = ent.meta.etag;
1378 ldpp_dout(dpp, 20) <<__func__<< ": object's etag: " << ent.meta.etag << dendl;
1379 etag_bl.append(etag.c_str(), etag.size());
1380 source->get_attrs().emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl));
1381
1382 source->set_key(ent.key);
1383 source->set_obj_size(ent.meta.size);
1384 source->category = ent.meta.category;
1385 *params.lastmod = ent.meta.mtime;
1386
1387 if (params.mod_ptr || params.unmod_ptr) {
1388 // Convert all times go GMT to make them compatible
1389 obj_time_weight src_weight;
1390 src_weight.init(*params.lastmod, params.mod_zone_id, params.mod_pg_ver);
1391 src_weight.high_precision = params.high_precision_time;
1392
1393 obj_time_weight dest_weight;
1394 dest_weight.high_precision = params.high_precision_time;
1395
1396 // Check if-modified-since condition
1397 if (params.mod_ptr && !params.if_nomatch) {
1398 dest_weight.init(*params.mod_ptr, params.mod_zone_id, params.mod_pg_ver);
1399 ldpp_dout(dpp, 10) << "If-Modified-Since: " << dest_weight << " & "
1400 << "Last-Modified: " << src_weight << dendl;
1401 if (!(dest_weight < src_weight)) {
1402 return -ERR_NOT_MODIFIED;
1403 }
1404 }
1405
1406 // Check if-unmodified-since condition
1407 if (params.unmod_ptr && !params.if_match) {
1408 dest_weight.init(*params.unmod_ptr, params.mod_zone_id, params.mod_pg_ver);
1409 ldpp_dout(dpp, 10) << "If-UnModified-Since: " << dest_weight << " & "
1410 << "Last-Modified: " << src_weight << dendl;
1411 if (dest_weight < src_weight) {
1412 return -ERR_PRECONDITION_FAILED;
1413 }
1414 }
1415 }
1416 // Check if-match condition
1417 if (params.if_match) {
1418 string if_match_str = rgw_string_unquote(params.if_match);
1419 ldpp_dout(dpp, 10) << "ETag: " << etag << " & "
1420 << "If-Match: " << if_match_str << dendl;
1421 if (if_match_str.compare(etag) != 0) {
1422 return -ERR_PRECONDITION_FAILED;
1423 }
1424 }
1425 // Check if-none-match condition
1426 if (params.if_nomatch) {
1427 string if_nomatch_str = rgw_string_unquote(params.if_nomatch);
1428 ldpp_dout(dpp, 10) << "ETag: " << etag << " & "
1429 << "If-NoMatch: " << if_nomatch_str << dendl;
1430 if (if_nomatch_str.compare(etag) == 0) {
1431 return -ERR_NOT_MODIFIED;
1432 }
1433 }
1434
1435 // Skip opening an empty object.
1436 if(source->get_obj_size() == 0)
1437 return 0;
1438
1439 // Open the object here.
1440 if (source->category == RGWObjCategory::MultiMeta) {
1441 ldpp_dout(dpp, 20) <<__func__<< ": open obj parts..." << dendl;
1442 rc = source->get_part_objs(dpp, this->part_objs)? :
1443 source->open_part_objs(dpp, this->part_objs);
1444 return rc;
1445 } else {
1446 ldpp_dout(dpp, 20) <<__func__<< ": open object..." << dendl;
1447 return source->open_mobj(dpp);
1448 }
1449}
1450
1451int MotrObject::MotrReadOp::read(int64_t off, int64_t end, bufferlist& bl, optional_yield y, const DoutPrefixProvider* dpp)
1452{
1453 ldpp_dout(dpp, 20) << "MotrReadOp::read(): sync read." << dendl;
1454 return 0;
1455}
1456
// RGWGetObj::execute() calls ReadOp::iterate() to read the object from 'off' to 'end'.
// The returned data is processed by 'cb', which is a chain of post-processing
// filters such as decompression, decryption and sending data back to the client
// (RGWGetObj_CB::handle_data(), which in turn calls RGWGetObj::get_data_cb() to
// send the data back).
//
// The POC implements a simple synchronous version of iterate() that reads one
// block of data at a time and calls 'cb' to post-process it.
1465int MotrObject::MotrReadOp::iterate(const DoutPrefixProvider* dpp, int64_t off, int64_t end, RGWGetDataCB* cb, optional_yield y)
1466{
1467 int rc;
1468
1469 if (source->category == RGWObjCategory::MultiMeta)
1470 rc = source->read_multipart_obj(dpp, off, end, cb, part_objs);
1471 else
1472 rc = source->read_mobj(dpp, off, end, cb);
1473
1474 return rc;
1475}
1476
1477int MotrObject::MotrReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y)
1478{
1479 //return 0;
1480 return -ENODATA;
1481}
1482
1483std::unique_ptr<Object::DeleteOp> MotrObject::get_delete_op()
1484{
1485 return std::make_unique<MotrObject::MotrDeleteOp>(this);
1486}
1487
1488MotrObject::MotrDeleteOp::MotrDeleteOp(MotrObject *_source) :
1489 source(_source)
1490{ }
1491
// Implementation of DELETE OBJ also requires MotrObject::get_obj_state()
// to retrieve and set the object's state from the object's metadata.
//
// TODO:
// 1. The POC only removes the object's entry from the bucket index and deletes
//    the corresponding Motr objects. It doesn't handle DeleteOp::params.
//    Delete::delete_obj() in rgw_rados.cc shows how the rados backend
//    processes the params.
// 2. Delete an object when versioning is turned on.
1501int MotrObject::MotrDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y)
1502{
1503 ldpp_dout(dpp, 20) << "delete " << source->get_key().get_oid() << " from " << source->get_bucket()->get_name() << dendl;
1504
1505 rgw_bucket_dir_entry ent;
1506 int rc = source->get_bucket_dir_ent(dpp, ent);
1507 if (rc < 0) {
1508 return rc;
1509 }
1510
1511 //TODO: When integrating with background GC for object deletion,
1512 // we should consider adding object entry to GC before deleting the metadata.
1513 // Delete from the cache first.
1514 source->store->get_obj_meta_cache()->remove(dpp, source->get_key().get_oid());
1515
1516 // Delete the object's entry from the bucket index.
1517 bufferlist bl;
1518 string bucket_index_iname = "motr.rgw.bucket.index." + source->get_bucket()->get_name();
1519 rc = source->store->do_idx_op_by_name(bucket_index_iname,
1520 M0_IC_DEL, source->get_key().get_oid(), bl);
1521 if (rc < 0) {
1522 ldpp_dout(dpp, 0) << "Failed to del object's entry from bucket index. " << dendl;
1523 return rc;
1524 }
1525
1526 if (ent.meta.size == 0) {
1527 ldpp_dout(dpp, 0) << __func__ << ": Object size is 0, not deleting motr object." << dendl;
1528 return 0;
1529 }
1530 // Remove the motr objects.
1531 if (source->category == RGWObjCategory::MultiMeta)
1532 rc = source->delete_part_objs(dpp);
1533 else
1534 rc = source->delete_mobj(dpp);
1535 if (rc < 0) {
1536 ldpp_dout(dpp, 0) << "Failed to delete the object from Motr. " << dendl;
1537 return rc;
1538 }
1539
1540 //result.delete_marker = parent_op.result.delete_marker;
1541 //result.version_id = parent_op.result.version_id;
1542 return 0;
1543}
1544
1545int MotrObject::delete_object(const DoutPrefixProvider* dpp, optional_yield y, bool prevent_versioning)
1546{
1547 MotrObject::MotrDeleteOp del_op(this);
1548 del_op.params.bucket_owner = bucket->get_info().owner;
1549 del_op.params.versioning_status = bucket->get_info().versioning_status();
1550
1551 return del_op.delete_obj(dpp, y);
1552}
1553
1554int MotrObject::delete_obj_aio(const DoutPrefixProvider* dpp, RGWObjState* astate,
1555 Completions* aio, bool keep_index_consistent,
1556 optional_yield y)
1557{
1558 /* XXX: Make it async */
1559 return 0;
1560}
1561
1562int MotrObject::copy_object(User* user,
1563 req_info* info,
1564 const rgw_zone_id& source_zone,
1565 rgw::sal::Object* dest_object,
1566 rgw::sal::Bucket* dest_bucket,
1567 rgw::sal::Bucket* src_bucket,
1568 const rgw_placement_rule& dest_placement,
1569 ceph::real_time* src_mtime,
1570 ceph::real_time* mtime,
1571 const ceph::real_time* mod_ptr,
1572 const ceph::real_time* unmod_ptr,
1573 bool high_precision_time,
1574 const char* if_match,
1575 const char* if_nomatch,
1576 AttrsMod attrs_mod,
1577 bool copy_if_newer,
1578 Attrs& attrs,
1579 RGWObjCategory category,
1580 uint64_t olh_epoch,
1581 boost::optional<ceph::real_time> delete_at,
1582 std::string* version_id,
1583 std::string* tag,
1584 std::string* etag,
1585 void (*progress_cb)(off_t, void *),
1586 void* progress_data,
1587 const DoutPrefixProvider* dpp,
1588 optional_yield y)
1589{
1590 return 0;
1591}
1592
1593int MotrObject::swift_versioning_restore(bool& restored,
1594 const DoutPrefixProvider* dpp)
1595{
1596 return 0;
1597}
1598
1599int MotrObject::swift_versioning_copy(const DoutPrefixProvider* dpp,
1600 optional_yield y)
1601{
1602 return 0;
1603}
1604
1605MotrAtomicWriter::MotrAtomicWriter(const DoutPrefixProvider *dpp,
1606 optional_yield y,
1607 rgw::sal::Object* obj,
1608 MotrStore* _store,
1609 const rgw_user& _owner,
1610 const rgw_placement_rule *_ptail_placement_rule,
1611 uint64_t _olh_epoch,
1612 const std::string& _unique_tag) :
1613 StoreWriter(dpp, y),
1614 store(_store),
1615 owner(_owner),
1616 ptail_placement_rule(_ptail_placement_rule),
1617 olh_epoch(_olh_epoch),
1618 unique_tag(_unique_tag),
1619 obj(_store, obj->get_key(), obj->get_bucket()),
1620 old_obj(_store, obj->get_key(), obj->get_bucket()) {}
1621
// Number of segments allocated for the writer's buf/attr/ext vectors.
static constexpr unsigned MAX_BUFVEC_NR = 256;
1623
1624int MotrAtomicWriter::prepare(optional_yield y)
1625{
1626 total_data_size = 0;
1627
1628 if (obj.is_opened())
1629 return 0;
1630
1631 rgw_bucket_dir_entry ent;
1632 int rc = old_obj.get_bucket_dir_ent(dpp, ent);
1633 if (rc == 0) {
1634 ldpp_dout(dpp, 20) << __func__ << ": object exists." << dendl;
1635 }
1636
1637 rc = m0_bufvec_empty_alloc(&buf, MAX_BUFVEC_NR) ?:
1638 m0_bufvec_alloc(&attr, MAX_BUFVEC_NR, 1) ?:
1639 m0_indexvec_alloc(&ext, MAX_BUFVEC_NR);
1640 if (rc != 0)
1641 this->cleanup();
1642
1643 return rc;
1644}
1645
1646int MotrObject::create_mobj(const DoutPrefixProvider *dpp, uint64_t sz)
1647{
1648 if (mobj != nullptr) {
1649 ldpp_dout(dpp, 0) <<__func__<< "ERROR: object is already opened" << dendl;
1650 return -EINVAL;
1651 }
1652
1653 int rc = m0_ufid_next(&ufid_gr, 1, &meta.oid);
1654 if (rc != 0) {
1655 ldpp_dout(dpp, 0) <<__func__<< "ERROR: m0_ufid_next() failed: " << rc << dendl;
1656 return rc;
1657 }
1658
1659 char fid_str[M0_FID_STR_LEN];
1660 snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid));
1661 ldpp_dout(dpp, 20) <<__func__<< ": sz=" << sz << " oid=" << fid_str << dendl;
1662
1663 int64_t lid = m0_layout_find_by_objsz(store->instance, nullptr, sz);
1664 M0_ASSERT(lid > 0);
1665
1666 M0_ASSERT(mobj == nullptr);
1667 mobj = new m0_obj();
1668 m0_obj_init(mobj, &store->container.co_realm, &meta.oid, lid);
1669
1670 struct m0_op *op = nullptr;
1671 mobj->ob_entity.en_flags |= M0_ENF_META;
1672 rc = m0_entity_create(nullptr, &mobj->ob_entity, &op);
1673 if (rc != 0) {
1674 this->close_mobj();
1675 ldpp_dout(dpp, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl;
1676 return rc;
1677 }
1678 ldpp_dout(dpp, 20) <<__func__<< ": call m0_op_launch()..." << dendl;
1679 m0_op_launch(&op, 1);
1680 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
1681 m0_rc(op);
1682 m0_op_fini(op);
1683 m0_op_free(op);
1684
1685 if (rc != 0) {
1686 this->close_mobj();
1687 ldpp_dout(dpp, 0) << "ERROR: failed to create motr object: " << rc << dendl;
1688 return rc;
1689 }
1690
1691 meta.layout_id = mobj->ob_attr.oa_layout_id;
1692 meta.pver = mobj->ob_attr.oa_pver;
1693 ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << meta.layout_id
1694 << std::dec << " rc=" << rc << dendl;
1695
1696 // TODO: add key:user+bucket+key+obj.meta.oid value:timestamp to
1697 // gc.queue.index. See more at github.com/Seagate/cortx-rgw/issues/7.
1698
1699 return rc;
1700}
1701
1702int MotrObject::open_mobj(const DoutPrefixProvider *dpp)
1703{
1704 char fid_str[M0_FID_STR_LEN];
1705 snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid));
1706 ldpp_dout(dpp, 20) <<__func__<< ": oid=" << fid_str << dendl;
1707
1708 int rc;
1709 if (meta.layout_id == 0) {
1710 rgw_bucket_dir_entry ent;
1711 rc = this->get_bucket_dir_ent(dpp, ent);
1712 if (rc < 0) {
1713 ldpp_dout(dpp, 0) << "ERROR: open_mobj() failed: rc=" << rc << dendl;
1714 return rc;
1715 }
1716 }
1717
1718 if (meta.layout_id == 0)
1719 return -ENOENT;
1720
1721 M0_ASSERT(mobj == nullptr);
1722 mobj = new m0_obj();
1723 memset(mobj, 0, sizeof *mobj);
1724 m0_obj_init(mobj, &store->container.co_realm, &meta.oid, store->conf.mc_layout_id);
1725
1726 struct m0_op *op = nullptr;
1727 mobj->ob_attr.oa_layout_id = meta.layout_id;
1728 mobj->ob_attr.oa_pver = meta.pver;
1729 mobj->ob_entity.en_flags |= M0_ENF_META;
1730 rc = m0_entity_open(&mobj->ob_entity, &op);
1731 if (rc != 0) {
1732 ldpp_dout(dpp, 0) << "ERROR: m0_entity_open() failed: rc=" << rc << dendl;
1733 this->close_mobj();
1734 return rc;
1735 }
1736 m0_op_launch(&op, 1);
1737 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
1738 m0_rc(op);
1739 m0_op_fini(op);
1740 m0_op_free(op);
1741
1742 if (rc < 0) {
1743 ldpp_dout(dpp, 10) << "ERROR: failed to open motr object: rc=" << rc << dendl;
1744 this->close_mobj();
1745 return rc;
1746 }
1747
1748 ldpp_dout(dpp, 20) <<__func__<< ": rc=" << rc << dendl;
1749
1750 return 0;
1751}
1752
1753int MotrObject::delete_mobj(const DoutPrefixProvider *dpp)
1754{
1755 int rc;
1756 char fid_str[M0_FID_STR_LEN];
1757 snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid));
1758 if (!meta.oid.u_hi || !meta.oid.u_lo) {
1759 ldpp_dout(dpp, 20) << __func__ << ": invalid motr object oid=" << fid_str << dendl;
1760 return -EINVAL;
1761 }
1762 ldpp_dout(dpp, 20) << __func__ << ": deleting motr object oid=" << fid_str << dendl;
1763
1764 // Open the object.
1765 if (mobj == nullptr) {
1766 rc = this->open_mobj(dpp);
1767 if (rc < 0)
1768 return rc;
1769 }
1770
1771 // Create an DELETE op and execute it (sync version).
1772 struct m0_op *op = nullptr;
1773 mobj->ob_entity.en_flags |= M0_ENF_META;
1774 rc = m0_entity_delete(&mobj->ob_entity, &op);
1775 if (rc != 0) {
1776 ldpp_dout(dpp, 0) << "ERROR: m0_entity_delete() failed: " << rc << dendl;
1777 return rc;
1778 }
1779 m0_op_launch(&op, 1);
1780 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
1781 m0_rc(op);
1782 m0_op_fini(op);
1783 m0_op_free(op);
1784
1785 if (rc < 0) {
1786 ldpp_dout(dpp, 0) << "ERROR: failed to open motr object: " << rc << dendl;
1787 return rc;
1788 }
1789
1790 this->close_mobj();
1791
1792 return 0;
1793}
1794
1795void MotrObject::close_mobj()
1796{
1797 if (mobj == nullptr)
1798 return;
1799 m0_obj_fini(mobj);
1800 delete mobj; mobj = nullptr;
1801}
1802
1803int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& data, uint64_t offset)
1804{
1805 int rc;
1806 unsigned bs, left;
1807 struct m0_op *op;
1808 char *start, *p;
1809 struct m0_bufvec buf;
1810 struct m0_bufvec attr;
1811 struct m0_indexvec ext;
1812
1813 left = data.length();
1814 if (left == 0)
1815 return 0;
1816
1817 rc = m0_bufvec_empty_alloc(&buf, 1) ?:
1818 m0_bufvec_alloc(&attr, 1, 1) ?:
1819 m0_indexvec_alloc(&ext, 1);
1820 if (rc != 0)
1821 goto out;
1822
1823 bs = this->get_optimal_bs(left);
1824 ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl;
1825
1826 start = data.c_str();
1827
1828 for (p = start; left > 0; left -= bs, p += bs, offset += bs) {
1829 if (left < bs)
1830 bs = this->get_optimal_bs(left);
1831 if (left < bs) {
1832 data.append_zero(bs - left);
1833 left = bs;
1834 p = data.c_str();
1835 }
1836 buf.ov_buf[0] = p;
1837 buf.ov_vec.v_count[0] = bs;
1838 ext.iv_index[0] = offset;
1839 ext.iv_vec.v_count[0] = bs;
1840 attr.ov_vec.v_count[0] = 0;
1841
1842 op = nullptr;
1843 rc = m0_obj_op(this->mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, 0, &op);
1844 if (rc != 0)
1845 goto out;
1846 m0_op_launch(&op, 1);
1847 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
1848 m0_rc(op);
1849 m0_op_fini(op);
1850 m0_op_free(op);
1851 if (rc != 0)
1852 goto out;
1853 }
1854
1855out:
1856 m0_indexvec_free(&ext);
1857 m0_bufvec_free(&attr);
1858 m0_bufvec_free2(&buf);
1859 return rc;
1860}
1861
1862int MotrObject::read_mobj(const DoutPrefixProvider* dpp, int64_t off, int64_t end, RGWGetDataCB* cb)
1863{
1864 int rc;
1865 unsigned bs, actual, left;
1866 struct m0_op *op;
1867 struct m0_bufvec buf;
1868 struct m0_bufvec attr;
1869 struct m0_indexvec ext;
1870
1871 // make end pointer exclusive:
1872 // it's easier to work with it this way
1873 end++;
1874 ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): off=" << off <<
1875 " end=" << end << dendl;
1876 // As `off` may not be parity group size aligned, even using optimal
1877 // buffer block size, simply reading data from offset `off` could come
1878 // across parity group boundary. And Motr only allows page-size aligned
1879 // offset.
1880 //
1881 // The optimal size of each IO should also take into account the data
1882 // transfer size to s3 client. For example, 16MB may be nice to read
1883 // data from motr, but it could be too big for network transfer.
1884 //
1885 // TODO: We leave proper handling of offset in the future.
1886 bs = this->get_optimal_bs(end - off);
1887 ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): bs=" << bs << dendl;
1888
1889 rc = m0_bufvec_empty_alloc(&buf, 1) ? :
1890 m0_bufvec_alloc(&attr, 1, 1) ? :
1891 m0_indexvec_alloc(&ext, 1);
1892 if (rc < 0)
1893 goto out;
1894
1895 left = end - off;
1896 for (; left > 0; off += actual) {
1897 if (left < bs)
1898 bs = this->get_optimal_bs(left);
1899 actual = bs;
1900 if (left < bs)
1901 actual = left;
1902 ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): off=" << off <<
1903 " actual=" << actual << dendl;
1904 bufferlist bl;
1905 buf.ov_buf[0] = bl.append_hole(bs).c_str();
1906 buf.ov_vec.v_count[0] = bs;
1907 ext.iv_index[0] = off;
1908 ext.iv_vec.v_count[0] = bs;
1909 attr.ov_vec.v_count[0] = 0;
1910
1911 left -= actual;
1912 // Read from Motr.
1913 op = nullptr;
1914 rc = m0_obj_op(this->mobj, M0_OC_READ, &ext, &buf, &attr, 0, 0, &op);
1915 ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): init read op rc=" << rc << dendl;
1916 if (rc != 0) {
1917 ldpp_dout(dpp, 0) << __func__ << ": read failed during m0_obj_op, rc=" << rc << dendl;
1918 goto out;
1919 }
1920 m0_op_launch(&op, 1);
1921 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
1922 m0_rc(op);
1923 m0_op_fini(op);
1924 m0_op_free(op);
1925 if (rc != 0) {
1926 ldpp_dout(dpp, 0) << __func__ << ": read failed, m0_op_wait rc=" << rc << dendl;
1927 goto out;
1928 }
1929 // Call `cb` to process returned data.
1930 ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): call cb to process data" << dendl;
1931 cb->handle_data(bl, 0, actual);
1932 }
1933
1934out:
1935 m0_indexvec_free(&ext);
1936 m0_bufvec_free(&attr);
1937 m0_bufvec_free2(&buf);
1938 this->close_mobj();
1939
1940 return rc;
1941}
1942
1943int MotrObject::get_bucket_dir_ent(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry& ent)
1944{
1945 int rc = 0;
1946 string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name();
1947 int max = 1000;
1948 vector<string> keys(max);
1949 vector<bufferlist> vals(max);
1950 bufferlist bl;
1951 bufferlist::const_iterator iter;
1952
1953 if (this->get_bucket()->get_info().versioning_status() == BUCKET_VERSIONED ||
1954 this->get_bucket()->get_info().versioning_status() == BUCKET_SUSPENDED) {
1955
1956 rgw_bucket_dir_entry ent_to_check;
1957
1958 if (this->store->get_obj_meta_cache()->get(dpp, this->get_name(), bl) == 0) {
1959 iter = bl.cbegin();
1960 ent_to_check.decode(iter);
1961 if (ent_to_check.is_current()) {
1962 ent = ent_to_check;
1963 rc = 0;
1964 goto out;
1965 }
1966 }
1967
1968 ldpp_dout(dpp, 20) <<__func__<< ": versioned bucket!" << dendl;
1969 keys[0] = this->get_name();
1970 rc = store->next_query_by_name(bucket_index_iname, keys, vals);
1971 if (rc < 0) {
1972 ldpp_dout(dpp, 0) << __func__ << "ERROR: NEXT query failed. " << rc << dendl;
1973 return rc;
1974 }
1975
1976 rc = -ENOENT;
1977 for (const auto& bl: vals) {
1978 if (bl.length() == 0)
1979 break;
1980
1981 iter = bl.cbegin();
1982 ent_to_check.decode(iter);
1983 if (ent_to_check.is_current()) {
1984 ldpp_dout(dpp, 20) <<__func__<< ": found current version!" << dendl;
1985 ent = ent_to_check;
1986 rc = 0;
1987
1988 this->store->get_obj_meta_cache()->put(dpp, this->get_name(), bl);
1989
1990 break;
1991 }
1992 }
1993 } else {
1994 if (this->store->get_obj_meta_cache()->get(dpp, this->get_key().get_oid(), bl)) {
1995 ldpp_dout(dpp, 20) <<__func__<< ": non-versioned bucket!" << dendl;
1996 rc = this->store->do_idx_op_by_name(bucket_index_iname,
1997 M0_IC_GET, this->get_key().get_oid(), bl);
1998 if (rc < 0) {
1999 ldpp_dout(dpp, 0) << __func__ << "ERROR: failed to get object's entry from bucket index: rc="
2000 << rc << dendl;
2001 return rc;
2002 }
2003 this->store->get_obj_meta_cache()->put(dpp, this->get_key().get_oid(), bl);
2004 }
2005
2006 bufferlist& blr = bl;
2007 iter = blr.cbegin();
2008 ent.decode(iter);
2009 }
2010
2011out:
2012 if (rc == 0) {
2013 sal::Attrs dummy;
2014 decode(dummy, iter);
2015 meta.decode(iter);
2016 ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << meta.layout_id << dendl;
2017 char fid_str[M0_FID_STR_LEN];
2018 snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid));
2019 ldpp_dout(dpp, 70) << __func__ << ": oid=" << fid_str << dendl;
2020 } else
2021 ldpp_dout(dpp, 0) <<__func__<< ": rc=" << rc << dendl;
2022
2023 return rc;
2024}
2025
2026int MotrObject::update_version_entries(const DoutPrefixProvider *dpp)
2027{
2028 int rc;
2029 int max = 10;
2030 vector<string> keys(max);
2031 vector<bufferlist> vals(max);
2032
2033 string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name();
2034 keys[0] = this->get_name();
2035 rc = store->next_query_by_name(bucket_index_iname, keys, vals);
2036 ldpp_dout(dpp, 20) << "get all versions, name = " << this->get_name() << "rc = " << rc << dendl;
2037 if (rc < 0) {
2038 ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl;
2039 return rc;
2040 }
2041
2042 // no entries returned.
2043 if (rc == 0)
2044 return 0;
2045
2046 for (const auto& bl: vals) {
2047 if (bl.length() == 0)
2048 break;
2049
2050 rgw_bucket_dir_entry ent;
2051 auto iter = bl.cbegin();
2052 ent.decode(iter);
2053
2054 if (0 != ent.key.name.compare(0, this->get_name().size(), this->get_name()))
2055 continue;
2056
2057 if (!ent.is_current())
2058 continue;
2059
2060 // Remove from the cache.
2061 store->get_obj_meta_cache()->remove(dpp, this->get_name());
2062
2063 rgw::sal::Attrs attrs;
2064 decode(attrs, iter);
2065 MotrObject::Meta meta;
2066 meta.decode(iter);
2067
2068 ent.flags = rgw_bucket_dir_entry::FLAG_VER;
2069 string key;
2070 if (ent.key.instance.empty())
2071 key = ent.key.name;
2072 else {
2073 char buf[ent.key.name.size() + ent.key.instance.size() + 16];
2074 snprintf(buf, sizeof(buf), "%s[%s]", ent.key.name.c_str(), ent.key.instance.c_str());
2075 key = buf;
2076 }
2077 ldpp_dout(dpp, 20) << "update one version, key = " << key << dendl;
2078 bufferlist ent_bl;
2079 ent.encode(ent_bl);
2080 encode(attrs, ent_bl);
2081 meta.encode(ent_bl);
2082
2083 rc = store->do_idx_op_by_name(bucket_index_iname,
2084 M0_IC_PUT, key, ent_bl);
2085 if (rc < 0)
2086 break;
2087 }
2088 return rc;
2089}
2090
2091// Scan object_nnn_part_index to get all parts then open their motr objects.
2092// TODO: all parts are opened in the POC. But for a large object, for example
2093// a 5GB object will have about 300 parts (for default 15MB part). A better
2094// way of managing opened object may be needed.
2095int MotrObject::get_part_objs(const DoutPrefixProvider* dpp,
2096 std::map<int, std::unique_ptr<MotrObject>>& part_objs)
2097{
2098 int rc;
2099 int max_parts = 1000;
2100 int marker = 0;
2101 uint64_t off = 0;
2102 bool truncated = false;
2103 std::unique_ptr<rgw::sal::MultipartUpload> upload;
2104
2105 upload = this->get_bucket()->get_multipart_upload(this->get_name(), string());
2106
2107 do {
2108 rc = upload->list_parts(dpp, store->ctx(), max_parts, marker, &marker, &truncated);
2109 if (rc == -ENOENT) {
2110 rc = -ERR_NO_SUCH_UPLOAD;
2111 }
2112 if (rc < 0)
2113 return rc;
2114
2115 std::map<uint32_t, std::unique_ptr<MultipartPart>>& parts = upload->get_parts();
2116 for (auto part_iter = parts.begin(); part_iter != parts.end(); ++part_iter) {
2117
2118 MultipartPart *mpart = part_iter->second.get();
2119 MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart);
2120 uint32_t part_num = mmpart->get_num();
2121 uint64_t part_size = mmpart->get_size();
2122
2123 string part_obj_name = this->get_bucket()->get_name() + "." +
2124 this->get_key().get_oid() +
2125 ".part." + std::to_string(part_num);
2126 std::unique_ptr<rgw::sal::Object> obj;
2127 obj = this->bucket->get_object(rgw_obj_key(part_obj_name));
2128 std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release()));
2129
2130 ldpp_dout(dpp, 20) << "get_part_objs: off = " << off << ", size = " << part_size << dendl;
2131 mobj->part_off = off;
2132 mobj->part_size = part_size;
2133 mobj->part_num = part_num;
2134 mobj->meta = mmpart->meta;
2135
2136 part_objs.emplace(part_num, std::move(mobj));
2137
2138 off += part_size;
2139 }
2140 } while (truncated);
2141
2142 return 0;
2143}
2144
2145int MotrObject::open_part_objs(const DoutPrefixProvider* dpp,
2146 std::map<int, std::unique_ptr<MotrObject>>& part_objs)
2147{
2148 //for (auto& iter: part_objs) {
2149 for (auto iter = part_objs.begin(); iter != part_objs.end(); ++iter) {
2150 MotrObject* obj = static_cast<MotrObject *>(iter->second.get());
2151 ldpp_dout(dpp, 20) << "open_part_objs: name = " << obj->get_name() << dendl;
2152 int rc = obj->open_mobj(dpp);
2153 if (rc < 0)
2154 return rc;
2155 }
2156
2157 return 0;
2158}
2159
2160int MotrObject::delete_part_objs(const DoutPrefixProvider* dpp)
2161{
2162 std::unique_ptr<rgw::sal::MultipartUpload> upload;
2163 upload = this->get_bucket()->get_multipart_upload(this->get_name(), string());
2164 std::unique_ptr<rgw::sal::MotrMultipartUpload> mupload(static_cast<rgw::sal::MotrMultipartUpload *>(upload.release()));
2165 return mupload->delete_parts(dpp);
2166}
2167
2168int MotrObject::read_multipart_obj(const DoutPrefixProvider* dpp,
2169 int64_t off, int64_t end, RGWGetDataCB* cb,
2170 std::map<int, std::unique_ptr<MotrObject>>& part_objs)
2171{
2172 int64_t cursor = off;
2173
2174 ldpp_dout(dpp, 20) << "read_multipart_obj: off=" << off << " end=" << end << dendl;
2175
2176 // Find the parts which are in the (off, end) range and
2177 // read data from it. Note: `end` argument is inclusive.
2178 for (auto iter = part_objs.begin(); iter != part_objs.end(); ++iter) {
2179 MotrObject* obj = static_cast<MotrObject *>(iter->second.get());
2180 int64_t part_off = obj->part_off;
2181 int64_t part_size = obj->part_size;
2182 int64_t part_end = obj->part_off + obj->part_size - 1;
2183 ldpp_dout(dpp, 20) << "read_multipart_obj: part_off=" << part_off
2184 << " part_end=" << part_end << dendl;
2185 if (part_end < off)
2186 continue;
2187
2188 int64_t local_off = cursor - obj->part_off;
2189 int64_t local_end = part_end < end? part_size - 1 : end - part_off;
2190 ldpp_dout(dpp, 20) << "real_multipart_obj: name=" << obj->get_name()
2191 << " local_off=" << local_off
2192 << " local_end=" << local_end << dendl;
2193 int rc = obj->read_mobj(dpp, local_off, local_end, cb);
2194 if (rc < 0)
2195 return rc;
2196
2197 cursor = part_end + 1;
2198 if (cursor > end)
2199 break;
2200 }
2201
2202 return 0;
2203}
2204
/*
 * Round `x` up to the nearest multiple of `by` (`by` must be non-zero).
 *
 * roundup(0, by) is 0. The bare `(x - 1) / by + 1` form underflows for
 * x == 0. It then returns a non-zero value that is not a multiple of `by`
 * whenever `by` is not a power of two (e.g. 2 for roundup(0, 3)).
 */
static unsigned roundup(unsigned x, unsigned by)
{
  if (x == 0)
    return 0;
  return ((x - 1) / by + 1) * by;
}
2209
2210unsigned MotrObject::get_optimal_bs(unsigned len)
2211{
2212 struct m0_pool_version *pver;
2213
2214 pver = m0_pool_version_find(&store->instance->m0c_pools_common,
2215 &mobj->ob_attr.oa_pver);
2216 M0_ASSERT(pver != nullptr);
2217 struct m0_pdclust_attr *pa = &pver->pv_attr;
2218 uint64_t lid = M0_OBJ_LAYOUT_ID(meta.layout_id);
2219 unsigned unit_sz = m0_obj_layout_id_to_unit_size(lid);
2220 unsigned grp_sz = unit_sz * pa->pa_N;
2221
2222 // bs should be max 4-times pool-width deep counting by 1MB units, or
2223 // 8-times deep counting by 512K units, 16-times deep by 256K units,
2224 // and so on. Several units to one target will be aggregated to make
2225 // fewer network RPCs, disk i/o operations and BE transactions.
2226 // For unit sizes of 32K or less, the depth is 128, which
2227 // makes it 32K * 128 == 4MB - the maximum amount per target when
2228 // the performance is still good on LNet (which has max 1MB frames).
2229 // TODO: it may be different on libfabric, should be re-measured.
2230 unsigned depth = 128 / ((unit_sz + 0x7fff) / 0x8000);
2231 if (depth == 0)
2232 depth = 1;
2233 // P * N / (N + K + S) - number of data units to span the pool-width
2234 unsigned max_bs = depth * unit_sz * pa->pa_P * pa->pa_N /
2235 (pa->pa_N + pa->pa_K + pa->pa_S);
2236 max_bs = roundup(max_bs, grp_sz); // multiple of group size
2237 if (len >= max_bs)
2238 return max_bs;
2239 else if (len <= grp_sz)
2240 return grp_sz;
2241 else
2242 return roundup(len, grp_sz);
2243}
2244
2245void MotrAtomicWriter::cleanup()
2246{
2247 m0_indexvec_free(&ext);
2248 m0_bufvec_free(&attr);
2249 m0_bufvec_free2(&buf);
2250 acc_data.clear();
2251 obj.close_mobj();
2252 old_obj.close_mobj();
2253}
2254
2255unsigned MotrAtomicWriter::populate_bvec(unsigned len, bufferlist::iterator &bi)
2256{
2257 unsigned i, l, done = 0;
2258 const char *data;
2259
2260 for (i = 0; i < MAX_BUFVEC_NR && len > 0; ++i) {
2261 l = bi.get_ptr_and_advance(len, &data);
2262 buf.ov_buf[i] = (char*)data;
2263 buf.ov_vec.v_count[i] = l;
2264 ext.iv_index[i] = acc_off;
2265 ext.iv_vec.v_count[i] = l;
2266 attr.ov_vec.v_count[i] = 0;
2267 acc_off += l;
2268 len -= l;
2269 done += l;
2270 }
2271 buf.ov_vec.v_nr = i;
2272 ext.iv_vec.v_nr = i;
2273
2274 return done;
2275}
2276
2277int MotrAtomicWriter::write()
2278{
2279 int rc;
2280 unsigned bs, left;
2281 struct m0_op *op;
2282 bufferlist::iterator bi;
2283
2284 left = acc_data.length();
2285
2286 if (!obj.is_opened()) {
2287 rc = obj.create_mobj(dpp, left);
2288 if (rc == -EEXIST)
2289 rc = obj.open_mobj(dpp);
2290 if (rc != 0) {
2291 char fid_str[M0_FID_STR_LEN];
2292 snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&obj.meta.oid));
2293 ldpp_dout(dpp, 0) << "ERROR: failed to create/open motr object "
2294 << fid_str << " (" << obj.get_bucket()->get_name()
2295 << "/" << obj.get_key().get_oid() << "): rc=" << rc
2296 << dendl;
2297 goto err;
2298 }
2299 }
2300
2301 total_data_size += left;
2302
2303 bs = obj.get_optimal_bs(left);
2304 ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl;
2305
2306 bi = acc_data.begin();
2307 while (left > 0) {
2308 if (left < bs)
2309 bs = obj.get_optimal_bs(left);
2310 if (left < bs) {
2311 acc_data.append_zero(bs - left);
2312 auto off = bi.get_off();
2313 bufferlist tmp;
2314 acc_data.splice(off, bs, &tmp);
2315 acc_data.clear();
2316 acc_data.append(tmp.c_str(), bs); // make it a single buf
2317 bi = acc_data.begin();
2318 left = bs;
2319 }
2320
2321 left -= this->populate_bvec(bs, bi);
2322
2323 op = nullptr;
2324 rc = m0_obj_op(obj.mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, 0, &op);
2325 if (rc != 0)
2326 goto err;
2327 m0_op_launch(&op, 1);
2328 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
2329 m0_rc(op);
2330 m0_op_fini(op);
2331 m0_op_free(op);
2332 if (rc != 0)
2333 goto err;
2334 }
2335 acc_data.clear();
2336
2337 return 0;
2338
2339err:
2340 this->cleanup();
2341 return rc;
2342}
2343
// Amount of data MotrAtomicWriter::process() accumulates before writing: 32 MiB.
static constexpr unsigned MAX_ACC_SIZE = 32u * 1024u * 1024u;
2345
2346// Accumulate enough data first to make a reasonable decision about the
2347// optimal unit size for a new object, or bs for existing object (32M seems
2348// enough for 4M units in 8+2 parity groups, a common config on wide pools),
2349// and then launch the write operations.
2350int MotrAtomicWriter::process(bufferlist&& data, uint64_t offset)
2351{
2352 if (data.length() == 0) { // last call, flush data
2353 int rc = 0;
2354 if (acc_data.length() != 0)
2355 rc = this->write();
2356 this->cleanup();
2357 return rc;
2358 }
2359
2360 if (acc_data.length() == 0)
2361 acc_off = offset;
2362
2363 acc_data.append(std::move(data));
2364 if (acc_data.length() < MAX_ACC_SIZE)
2365 return 0;
2366
2367 return this->write();
2368}
2369
2370int MotrAtomicWriter::complete(size_t accounted_size, const std::string& etag,
2371 ceph::real_time *mtime, ceph::real_time set_mtime,
2372 std::map<std::string, bufferlist>& attrs,
2373 ceph::real_time delete_at,
2374 const char *if_match, const char *if_nomatch,
2375 const std::string *user_data,
2376 rgw_zone_set *zones_trace, bool *canceled,
2377 optional_yield y)
2378{
2379 int rc = 0;
2380
2381 if (acc_data.length() != 0) { // check again, just in case
2382 rc = this->write();
2383 this->cleanup();
2384 if (rc != 0)
2385 return rc;
2386 }
2387
2388 bufferlist bl;
2389 rgw_bucket_dir_entry ent;
2390
2391 // Set rgw_bucet_dir_entry. Some of the member of this structure may not
2392 // apply to motr. For example the storage_class.
2393 //
2394 // Checkout AtomicObjectProcessor::complete() in rgw_putobj_processor.cc
2395 // and RGWRados::Object::Write::write_meta() in rgw_rados.cc for what and
2396 // how to set the dir entry. Only set the basic ones for POC, no ACLs and
2397 // other attrs.
2398 obj.get_key().get_index_key(&ent.key);
2399 ent.meta.size = total_data_size;
2400 ent.meta.accounted_size = total_data_size;
2401 ent.meta.mtime = real_clock::is_zero(set_mtime)? ceph::real_clock::now() : set_mtime;
2402 ent.meta.etag = etag;
2403 ent.meta.owner = owner.to_str();
2404 ent.meta.owner_display_name = obj.get_bucket()->get_owner()->get_display_name();
2405 bool is_versioned = obj.get_key().have_instance();
2406 if (is_versioned)
2407 ent.flags = rgw_bucket_dir_entry::FLAG_VER | rgw_bucket_dir_entry::FLAG_CURRENT;
2408 ldpp_dout(dpp, 20) <<__func__<< ": key=" << obj.get_key().get_oid()
2409 << " etag: " << etag << " user_data=" << user_data << dendl;
2410 if (user_data)
2411 ent.meta.user_data = *user_data;
2412 ent.encode(bl);
2413
2414 RGWBucketInfo &info = obj.get_bucket()->get_info();
2415 if (info.obj_lock_enabled() && info.obj_lock.has_rule()) {
2416 auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION);
2417 if (iter == attrs.end()) {
2418 real_time lock_until_date = info.obj_lock.get_lock_until_date(ent.meta.mtime);
2419 string mode = info.obj_lock.get_mode();
2420 RGWObjectRetention obj_retention(mode, lock_until_date);
2421 bufferlist retention_bl;
2422 obj_retention.encode(retention_bl);
2423 attrs[RGW_ATTR_OBJECT_RETENTION] = retention_bl;
2424 }
2425 }
2426 encode(attrs, bl);
2427 obj.meta.encode(bl);
2428 ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << obj.meta.layout_id
2429 << dendl;
2430 if (is_versioned) {
2431 // get the list of all versioned objects with the same key and
2432 // unset their FLAG_CURRENT later, if do_idx_op_by_name() is successful.
2433 // Note: without distributed lock on the index - it is possible that 2
2434 // CURRENT entries would appear in the bucket. For example, consider the
2435 // following scenario when two clients are trying to add the new object
2436 // version concurrently:
2437 // client 1: reads all the CURRENT entries
2438 // client 2: updates the index and sets the new CURRENT
2439 // client 1: updates the index and sets the new CURRENT
2440 // At the step (1) client 1 would not see the new current record from step (2),
2441 // so it won't update it. As a result, two CURRENT version entries will appear
2442 // in the bucket.
2443 // TODO: update the current version (unset the flag) and insert the new current
2444 // version can be launched in one motr op. This requires change at do_idx_op()
2445 // and do_idx_op_by_name().
2446 rc = obj.update_version_entries(dpp);
2447 if (rc < 0)
2448 return rc;
2449 }
2450 // Insert an entry into bucket index.
2451 string bucket_index_iname = "motr.rgw.bucket.index." + obj.get_bucket()->get_name();
2452 rc = store->do_idx_op_by_name(bucket_index_iname,
2453 M0_IC_PUT, obj.get_key().get_oid(), bl);
2454 if (rc == 0)
2455 store->get_obj_meta_cache()->put(dpp, obj.get_key().get_oid(), bl);
2456
2457 if (old_obj.get_bucket()->get_info().versioning_status() != BUCKET_VERSIONED) {
2458 // Delete old object data if exists.
2459 old_obj.delete_mobj(dpp);
2460 }
2461
2462 // TODO: We need to handle the object leak caused by parallel object upload by
2463 // making use of background gc, which is currently not enabled for motr.
2464 return rc;
2465}
2466
2467int MotrMultipartUpload::delete_parts(const DoutPrefixProvider *dpp)
2468{
2469 int rc;
2470 int max_parts = 1000;
2471 int marker = 0;
2472 bool truncated = false;
2473
2474 // Scan all parts and delete the corresponding motr objects.
2475 do {
2476 rc = this->list_parts(dpp, store->ctx(), max_parts, marker, &marker, &truncated);
2477 if (rc == -ENOENT) {
2478 truncated = false;
2479 rc = 0;
2480 }
2481 if (rc < 0)
2482 return rc;
2483
2484 std::map<uint32_t, std::unique_ptr<MultipartPart>>& parts = this->get_parts();
2485 for (auto part_iter = parts.begin(); part_iter != parts.end(); ++part_iter) {
2486
2487 MultipartPart *mpart = part_iter->second.get();
2488 MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart);
2489 uint32_t part_num = mmpart->get_num();
2490
2491 // Delete the part object. Note that the part object is not
2492 // inserted into bucket index, only the corresponding motr object
2493 // needs to be delete. That is why we don't call
2494 // MotrObject::delete_object().
2495 string part_obj_name = bucket->get_name() + "." +
2496 mp_obj.get_key() +
2497 ".part." + std::to_string(part_num);
2498 std::unique_ptr<rgw::sal::Object> obj;
2499 obj = this->bucket->get_object(rgw_obj_key(part_obj_name));
2500 std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release()));
2501 mobj->meta = mmpart->meta;
2502 rc = mobj->delete_mobj(dpp);
2503 if (rc < 0) {
2504 ldpp_dout(dpp, 0) << __func__ << ": Failed to delete object from Motr. rc=" << rc << dendl;
2505 return rc;
2506 }
2507 }
2508 } while (truncated);
2509
2510 // Delete object part index.
2511 std::string oid = mp_obj.get_key();
2512 string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts";
2513 return store->delete_motr_idx_by_name(obj_part_iname);
2514}
2515
2516int MotrMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct)
2517{
2518 int rc;
2519 // Check if multipart upload exists
2520 bufferlist bl;
2521 std::unique_ptr<rgw::sal::Object> meta_obj;
2522 meta_obj = get_meta_obj();
2523 string bucket_multipart_iname =
2524 "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts";
2525 rc = store->do_idx_op_by_name(bucket_multipart_iname,
2526 M0_IC_GET, meta_obj->get_key().to_str(), bl);
2527 if (rc < 0) {
2528 ldpp_dout(dpp, 0) << __func__ << ": Failed to get multipart upload. rc=" << rc << dendl;
2529 return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc;
2530 }
2531
2532 // Scan all parts and delete the corresponding motr objects.
2533 rc = this->delete_parts(dpp);
2534 if (rc < 0)
2535 return rc;
2536
2537 bl.clear();
2538 // Remove the upload from bucket multipart index.
2539 rc = store->do_idx_op_by_name(bucket_multipart_iname,
2540 M0_IC_DEL, meta_obj->get_key().get_oid(), bl);
2541 return rc;
2542}
2543
2544std::unique_ptr<rgw::sal::Object> MotrMultipartUpload::get_meta_obj()
2545{
2546 std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns));
2547 std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release()));
2548 mobj->set_category(RGWObjCategory::MultiMeta);
2549 return mobj;
2550}
2551
2552struct motr_multipart_upload_info
2553{
2554 rgw_placement_rule dest_placement;
2555
2556 void encode(bufferlist& bl) const {
2557 ENCODE_START(1, 1, bl);
2558 encode(dest_placement, bl);
2559 ENCODE_FINISH(bl);
2560 }
2561
2562 void decode(bufferlist::const_iterator& bl) {
2563 DECODE_START(1, bl);
2564 decode(dest_placement, bl);
2565 DECODE_FINISH(bl);
2566 }
2567};
2568WRITE_CLASS_ENCODER(motr_multipart_upload_info)
2569
2570int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y,
2571 ACLOwner& _owner,
2572 rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs)
2573{
2574 int rc;
2575 std::string oid = mp_obj.get_key();
2576
2577 owner = _owner;
2578
2579 do {
2580 char buf[33];
2581 string tmp_obj_name;
2582 gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
2583 std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */
2584 upload_id.append(buf);
2585
2586 mp_obj.init(oid, upload_id);
2587 tmp_obj_name = mp_obj.get_meta();
2588
2589 std::unique_ptr<rgw::sal::Object> obj;
2590 obj = bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns));
2591 // the meta object will be indexed with 0 size, we c
2592 obj->set_in_extra_data(true);
2593 obj->set_hash_source(oid);
2594
2595 motr_multipart_upload_info upload_info;
2596 upload_info.dest_placement = dest_placement;
2597 bufferlist mpbl;
2598 encode(upload_info, mpbl);
2599
2600 // Create an initial entry in the bucket. The entry will be
2601 // updated when multipart upload is completed, for example,
2602 // size, etag etc.
2603 bufferlist bl;
2604 rgw_bucket_dir_entry ent;
2605 obj->get_key().get_index_key(&ent.key);
2606 ent.meta.owner = owner.get_id().to_str();
2607 ent.meta.category = RGWObjCategory::MultiMeta;
2608 ent.meta.mtime = ceph::real_clock::now();
2609 ent.meta.user_data.assign(mpbl.c_str(), mpbl.c_str() + mpbl.length());
2610 ent.encode(bl);
2611
2612 // Insert an entry into bucket multipart index so it is not shown
2613 // when listing a bucket.
2614 string bucket_multipart_iname =
2615 "motr.rgw.bucket." + obj->get_bucket()->get_name() + ".multiparts";
2616 rc = store->do_idx_op_by_name(bucket_multipart_iname,
2617 M0_IC_PUT, obj->get_key().get_oid(), bl);
2618
2619 } while (rc == -EEXIST);
2620
2621 if (rc < 0)
2622 return rc;
2623
2624 // Create object part index.
2625 // TODO: add bucket as part of the name.
2626 string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts";
2627 ldpp_dout(dpp, 20) << "MotrMultipartUpload::init(): object part index=" << obj_part_iname << dendl;
2628 rc = store->create_motr_idx_by_name(obj_part_iname);
2629 if (rc == -EEXIST)
2630 rc = 0;
2631 if (rc < 0)
2632 // TODO: clean the bucket index entry
2633 ldpp_dout(dpp, 0) << "Failed to create object multipart index " << obj_part_iname << dendl;
2634
2635 return rc;
2636}
2637
2638int MotrMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct,
2639 int num_parts, int marker,
2640 int *next_marker, bool *truncated,
2641 bool assume_unsorted)
2642{
2643 int rc;
2644 vector<string> key_vec(num_parts);
2645 vector<bufferlist> val_vec(num_parts);
2646
2647 std::string oid = mp_obj.get_key();
2648 string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts";
2649 ldpp_dout(dpp, 20) << __func__ << ": object part index = " << obj_part_iname << dendl;
2650 key_vec[0].clear();
2651 key_vec[0] = "part.";
2652 char buf[32];
2653 snprintf(buf, sizeof(buf), "%08d", marker + 1);
2654 key_vec[0].append(buf);
2655 rc = store->next_query_by_name(obj_part_iname, key_vec, val_vec);
2656 if (rc < 0) {
2657 ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl;
2658 return rc;
2659 }
2660
2661 int last_num = 0;
2662 int part_cnt = 0;
2663 uint32_t expected_next = 0;
2664 ldpp_dout(dpp, 20) << __func__ << ": marker = " << marker << dendl;
2665 for (const auto& bl: val_vec) {
2666 if (bl.length() == 0)
2667 break;
2668
2669 RGWUploadPartInfo info;
2670 auto iter = bl.cbegin();
2671 info.decode(iter);
2672 rgw::sal::Attrs attrs_dummy;
2673 decode(attrs_dummy, iter);
2674 MotrObject::Meta meta;
2675 meta.decode(iter);
2676
2677 ldpp_dout(dpp, 20) << __func__ << ": part_num=" << info.num
2678 << " part_size=" << info.size << dendl;
2679 ldpp_dout(dpp, 20) << __func__ << ": meta:oid=[" << meta.oid.u_hi << "," << meta.oid.u_lo
2680 << "], meta:pvid=[" << meta.pver.f_container << "," << meta.pver.f_key
2681 << "], meta:layout id=" << meta.layout_id << dendl;
2682
2683 if (!expected_next)
2684 expected_next = info.num + 1;
2685 else if (expected_next && info.num != expected_next)
2686 return -EINVAL;
2687 else expected_next = info.num + 1;
2688
2689 if ((int)info.num > marker) {
2690 last_num = info.num;
2691 parts.emplace(info.num, std::make_unique<MotrMultipartPart>(info, meta));
2692 }
2693
2694 part_cnt++;
2695 }
2696
2697 // Does it have more parts?
2698 if (truncated)
2699 *truncated = part_cnt < num_parts? false : true;
2700 ldpp_dout(dpp, 20) << __func__ << ": truncated=" << *truncated << dendl;
2701
2702 if (next_marker)
2703 *next_marker = last_num;
2704
2705 return 0;
2706}
2707
2708// Heavily copy from rgw_sal_rados.cc
2709int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
2710 optional_yield y, CephContext* cct,
2711 map<int, string>& part_etags,
2712 list<rgw_obj_index_key>& remove_objs,
2713 uint64_t& accounted_size, bool& compressed,
2714 RGWCompressionInfo& cs_info, off_t& off,
2715 std::string& tag, ACLOwner& owner,
2716 uint64_t olh_epoch,
2717 rgw::sal::Object* target_obj)
2718{
2719 char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
2720 char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
2721 std::string etag;
2722 bufferlist etag_bl;
2723 MD5 hash;
2724 // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
2725 hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
2726 bool truncated;
2727 int rc;
2728
2729 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): enter" << dendl;
2730 int total_parts = 0;
2731 int handled_parts = 0;
2732 int max_parts = 1000;
2733 int marker = 0;
2734 uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
2735 auto etags_iter = part_etags.begin();
2736 rgw::sal::Attrs attrs = target_obj->get_attrs();
2737
2738 do {
2739 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl;
2740 rc = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
2741 if (rc == -ENOENT) {
2742 rc = -ERR_NO_SUCH_UPLOAD;
2743 }
2744 if (rc < 0)
2745 return rc;
2746
2747 total_parts += parts.size();
2748 if (!truncated && total_parts != (int)part_etags.size()) {
2749 ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts
2750 << " expected: " << part_etags.size() << dendl;
2751 rc = -ERR_INVALID_PART;
2752 return rc;
2753 }
2754 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): parts.size()=" << parts.size() << dendl;
2755
2756 for (auto obj_iter = parts.begin();
2757 etags_iter != part_etags.end() && obj_iter != parts.end();
2758 ++etags_iter, ++obj_iter, ++handled_parts) {
2759 MultipartPart *mpart = obj_iter->second.get();
2760 MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart);
2761 RGWUploadPartInfo *part = &mmpart->info;
2762
2763 uint64_t part_size = part->accounted_size;
2764 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): part_size=" << part_size << dendl;
2765 if (handled_parts < (int)part_etags.size() - 1 &&
2766 part_size < min_part_size) {
2767 rc = -ERR_TOO_SMALL;
2768 return rc;
2769 }
2770
2771 char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
2772 if (etags_iter->first != (int)obj_iter->first) {
2773 ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: "
2774 << etags_iter->first << " next uploaded: "
2775 << obj_iter->first << dendl;
2776 rc = -ERR_INVALID_PART;
2777 return rc;
2778 }
2779 string part_etag = rgw_string_unquote(etags_iter->second);
2780 if (part_etag.compare(part->etag) != 0) {
2781 ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first
2782 << " etag: " << etags_iter->second << dendl;
2783 rc = -ERR_INVALID_PART;
2784 return rc;
2785 }
2786
2787 hex_to_buf(part->etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
2788 hash.Update((const unsigned char *)petag, sizeof(petag));
2789 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): calc etag " << dendl;
2790
2791 string oid = mp_obj.get_part(part->num);
2792 rgw_obj src_obj;
2793 src_obj.init_ns(bucket->get_key(), oid, mp_ns);
2794
2795#if 0 // does Motr backend need it?
2796 /* update manifest for part */
2797 if (part->manifest.empty()) {
2798 ldpp_dout(dpp, 0) << "ERROR: empty manifest for object part: obj="
2799 << src_obj << dendl;
2800 rc = -ERR_INVALID_PART;
2801 return rc;
2802 } else {
2803 manifest.append(dpp, part->manifest, store->get_zone());
2804 }
2805 ldpp_dout(dpp, 0) << "MotrMultipartUpload::complete(): manifest " << dendl;
2806#endif
2807
2808 bool part_compressed = (part->cs_info.compression_type != "none");
2809 if ((handled_parts > 0) &&
2810 ((part_compressed != compressed) ||
2811 (cs_info.compression_type != part->cs_info.compression_type))) {
2812 ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload ("
2813 << cs_info.compression_type << ">>" << part->cs_info.compression_type << ")" << dendl;
2814 rc = -ERR_INVALID_PART;
2815 return rc;
2816 }
2817
2818 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): part compression" << dendl;
2819 if (part_compressed) {
2820 int64_t new_ofs; // offset in compression data for new part
2821 if (cs_info.blocks.size() > 0)
2822 new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
2823 else
2824 new_ofs = 0;
2825 for (const auto& block : part->cs_info.blocks) {
2826 compression_block cb;
2827 cb.old_ofs = block.old_ofs + cs_info.orig_size;
2828 cb.new_ofs = new_ofs;
2829 cb.len = block.len;
2830 cs_info.blocks.push_back(cb);
2831 new_ofs = cb.new_ofs + cb.len;
2832 }
2833 if (!compressed)
2834 cs_info.compression_type = part->cs_info.compression_type;
2835 cs_info.orig_size += part->cs_info.orig_size;
2836 compressed = true;
2837 }
2838
2839 // We may not need to do the following as remove_objs are those
2840 // don't show when listing a bucket. As we store in-progress uploaded
2841 // object's metadata in a separate index, they are not shown when
2842 // listing a bucket.
2843 rgw_obj_index_key remove_key;
2844 src_obj.key.get_index_key(&remove_key);
2845 remove_objs.push_back(remove_key);
2846
2847 off += part_size;
2848 accounted_size += part->accounted_size;
2849 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): off=" << off << ", accounted_size = " << accounted_size << dendl;
2850 }
2851 } while (truncated);
2852 hash.Final((unsigned char *)final_etag);
2853
2854 buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
2855 snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
2856 sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
2857 "-%lld", (long long)part_etags.size());
2858 etag = final_etag_str;
2859 ldpp_dout(dpp, 20) << "calculated etag: " << etag << dendl;
2860 etag_bl.append(etag);
2861 attrs[RGW_ATTR_ETAG] = etag_bl;
2862
2863 if (compressed) {
2864 // write compression attribute to full object
2865 bufferlist tmp;
2866 encode(cs_info, tmp);
2867 attrs[RGW_ATTR_COMPRESSION] = tmp;
2868 }
2869
2870 // Read the object's the multipart_upload_info.
2871 // TODO: all those index name and key constructions should be implemented as
2872 // member functions.
2873 bufferlist bl;
2874 std::unique_ptr<rgw::sal::Object> meta_obj;
2875 meta_obj = get_meta_obj();
2876 string bucket_multipart_iname =
2877 "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts";
2878 rc = this->store->do_idx_op_by_name(bucket_multipart_iname,
2879 M0_IC_GET, meta_obj->get_key().get_oid(), bl);
2880 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): read entry from bucket multipart index rc=" << rc << dendl;
2881 if (rc < 0)
2882 return rc;
2883 rgw_bucket_dir_entry ent;
2884 bufferlist& blr = bl;
2885 auto ent_iter = blr.cbegin();
2886 ent.decode(ent_iter);
2887
2888 // Update the dir entry and insert it to the bucket index so
2889 // the object will be seen when listing the bucket.
2890 bufferlist update_bl;
2891 target_obj->get_key().get_index_key(&ent.key); // Change to offical name :)
2892 ent.meta.size = off;
2893 ent.meta.accounted_size = accounted_size;
2894 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): obj size=" << ent.meta.size
2895 << " obj accounted size=" << ent.meta.accounted_size << dendl;
2896 ent.meta.mtime = ceph::real_clock::now();
2897 ent.meta.etag = etag;
2898 ent.encode(update_bl);
2899 encode(attrs, update_bl);
2900 MotrObject::Meta meta_dummy;
2901 meta_dummy.encode(update_bl);
2902
2903 string bucket_index_iname = "motr.rgw.bucket.index." + meta_obj->get_bucket()->get_name();
2904 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): target_obj name=" << target_obj->get_name()
2905 << " target_obj oid=" << target_obj->get_oid() << dendl;
2906 rc = store->do_idx_op_by_name(bucket_index_iname, M0_IC_PUT,
2907 target_obj->get_name(), update_bl);
2908 if (rc < 0)
2909 return rc;
2910
2911 // Put into metadata cache.
2912 store->get_obj_meta_cache()->put(dpp, target_obj->get_name(), update_bl);
2913
2914 // Now we can remove it from bucket multipart index.
2915 ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): remove from bucket multipartindex " << dendl;
2916 return store->do_idx_op_by_name(bucket_multipart_iname,
2917 M0_IC_DEL, meta_obj->get_key().get_oid(), bl);
2918}
2919
2920int MotrMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs)
2921{
2922 if (!rule && !attrs) {
2923 return 0;
2924 }
2925
2926 if (rule) {
2927 if (!placement.empty()) {
2928 *rule = &placement;
2929 if (!attrs) {
2930 /* Don't need attrs, done */
2931 return 0;
2932 }
2933 } else {
2934 *rule = nullptr;
2935 }
2936 }
2937
2938 std::unique_ptr<rgw::sal::Object> meta_obj;
2939 meta_obj = get_meta_obj();
2940 meta_obj->set_in_extra_data(true);
2941
2942 // Read the object's the multipart_upload_info.
2943 bufferlist bl;
2944 string bucket_multipart_iname =
2945 "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts";
2946 int rc = this->store->do_idx_op_by_name(bucket_multipart_iname,
2947 M0_IC_GET, meta_obj->get_key().get_oid(), bl);
2948 if (rc < 0) {
2949 ldpp_dout(dpp, 0) << __func__ << ": Failed to get multipart info. rc=" << rc << dendl;
2950 return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc;
2951 }
2952
2953 rgw_bucket_dir_entry ent;
2954 bufferlist& blr = bl;
2955 auto ent_iter = blr.cbegin();
2956 ent.decode(ent_iter);
2957
2958 if (attrs) {
2959 bufferlist etag_bl;
2960 string& etag = ent.meta.etag;
2961 ldpp_dout(dpp, 20) << "object's etag: " << ent.meta.etag << dendl;
2962 etag_bl.append(etag.c_str(), etag.size());
2963 attrs->emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl));
2964 if (!rule || *rule != nullptr) {
2965 /* placement was cached; don't actually read */
2966 return 0;
2967 }
2968 }
2969
2970 /* Decode multipart_upload_info */
2971 motr_multipart_upload_info upload_info;
2972 bufferlist mpbl;
2973 mpbl.append(ent.meta.user_data.c_str(), ent.meta.user_data.size());
2974 auto mpbl_iter = mpbl.cbegin();
2975 upload_info.decode(mpbl_iter);
2976 placement = upload_info.dest_placement;
2977 *rule = &placement;
2978
2979 return 0;
2980}
2981
2982std::unique_ptr<Writer> MotrMultipartUpload::get_writer(
2983 const DoutPrefixProvider *dpp,
2984 optional_yield y,
2985 rgw::sal::Object* obj,
2986 const rgw_user& owner,
2987 const rgw_placement_rule *ptail_placement_rule,
2988 uint64_t part_num,
2989 const std::string& part_num_str)
2990{
2991 return std::make_unique<MotrMultipartWriter>(dpp, y, this,
2992 obj, store, owner,
2993 ptail_placement_rule, part_num, part_num_str);
2994}
2995
2996int MotrMultipartWriter::prepare(optional_yield y)
2997{
2998 string part_obj_name = head_obj->get_bucket()->get_name() + "." +
2999 head_obj->get_key().get_oid() +
3000 ".part." + std::to_string(part_num);
3001 ldpp_dout(dpp, 20) << "bucket=" << head_obj->get_bucket()->get_name() << "part_obj_name=" << part_obj_name << dendl;
3002 part_obj = std::make_unique<MotrObject>(this->store, rgw_obj_key(part_obj_name), head_obj->get_bucket());
3003 if (part_obj == nullptr)
3004 return -ENOMEM;
3005
3006 // s3 client may retry uploading part, so the part may have already
3007 // been created.
3008 int rc = part_obj->create_mobj(dpp, store->cctx->_conf->rgw_max_chunk_size);
3009 if (rc == -EEXIST) {
3010 rc = part_obj->open_mobj(dpp);
3011 if (rc < 0)
3012 return rc;
3013 }
3014 return rc;
3015}
3016
3017int MotrMultipartWriter::process(bufferlist&& data, uint64_t offset)
3018{
3019 int rc = part_obj->write_mobj(dpp, std::move(data), offset);
3020 if (rc == 0) {
3021 actual_part_size += data.length();
3022 ldpp_dout(dpp, 20) << " write_mobj(): actual_part_size=" << actual_part_size << dendl;
3023 }
3024 return rc;
3025}
3026
3027int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag,
3028 ceph::real_time *mtime, ceph::real_time set_mtime,
3029 std::map<std::string, bufferlist>& attrs,
3030 ceph::real_time delete_at,
3031 const char *if_match, const char *if_nomatch,
3032 const std::string *user_data,
3033 rgw_zone_set *zones_trace, bool *canceled,
3034 optional_yield y)
3035{
3036 // Should the dir entry(object metadata) be updated? For example
3037 // mtime.
3038
3039 ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): enter" << dendl;
3040 // Add an entry into object_nnn_part_index.
3041 bufferlist bl;
3042 RGWUploadPartInfo info;
3043 info.num = part_num;
3044 info.etag = etag;
3045 info.size = actual_part_size;
3046 info.accounted_size = accounted_size;
3047 info.modified = real_clock::now();
3048
3049 bool compressed;
3050 int rc = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
3051 ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): compression rc=" << rc << dendl;
3052 if (rc < 0) {
3053 ldpp_dout(dpp, 1) << "cannot get compression info" << dendl;
3054 return rc;
3055 }
3056 encode(info, bl);
3057 encode(attrs, bl);
3058 part_obj->meta.encode(bl);
3059
3060 string p = "part.";
3061 char buf[32];
3062 snprintf(buf, sizeof(buf), "%08d", (int)part_num);
3063 p.append(buf);
3064 string obj_part_iname = "motr.rgw.object." + head_obj->get_bucket()->get_name() + "." +
3065 head_obj->get_key().get_oid() + ".parts";
3066 ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): object part index = " << obj_part_iname << dendl;
3067 rc = store->do_idx_op_by_name(obj_part_iname, M0_IC_PUT, p, bl);
3068 if (rc < 0) {
3069 return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc;
3070 }
3071
3072 return 0;
3073}
3074
3075std::unique_ptr<RGWRole> MotrStore::get_role(std::string name,
3076 std::string tenant,
3077 std::string path,
3078 std::string trust_policy,
3079 std::string max_session_duration_str,
3080 std::multimap<std::string,std::string> tags)
3081{
3082 RGWRole* p = nullptr;
3083 return std::unique_ptr<RGWRole>(p);
3084}
3085
3086std::unique_ptr<RGWRole> MotrStore::get_role(const RGWRoleInfo& info)
3087{
3088 RGWRole* p = nullptr;
3089 return std::unique_ptr<RGWRole>(p);
3090}
3091
3092std::unique_ptr<RGWRole> MotrStore::get_role(std::string id)
3093{
3094 RGWRole* p = nullptr;
3095 return std::unique_ptr<RGWRole>(p);
3096}
3097
3098int MotrStore::get_roles(const DoutPrefixProvider *dpp,
3099 optional_yield y,
3100 const std::string& path_prefix,
3101 const std::string& tenant,
3102 vector<std::unique_ptr<RGWRole>>& roles)
3103{
3104 return 0;
3105}
3106
3107std::unique_ptr<RGWOIDCProvider> MotrStore::get_oidc_provider()
3108{
3109 RGWOIDCProvider* p = nullptr;
3110 return std::unique_ptr<RGWOIDCProvider>(p);
3111}
3112
3113int MotrStore::get_oidc_providers(const DoutPrefixProvider *dpp,
3114 const std::string& tenant,
3115 vector<std::unique_ptr<RGWOIDCProvider>>& providers)
3116{
3117 return 0;
3118}
3119
3120std::unique_ptr<MultipartUpload> MotrBucket::get_multipart_upload(const std::string& oid,
3121 std::optional<std::string> upload_id,
3122 ACLOwner owner, ceph::real_time mtime)
3123{
3124 return std::make_unique<MotrMultipartUpload>(store, this, oid, upload_id, owner, mtime);
3125}
3126
3127std::unique_ptr<Writer> MotrStore::get_append_writer(const DoutPrefixProvider *dpp,
3128 optional_yield y,
3129 rgw::sal::Object* obj,
3130 const rgw_user& owner,
3131 const rgw_placement_rule *ptail_placement_rule,
3132 const std::string& unique_tag,
3133 uint64_t position,
3134 uint64_t *cur_accounted_size) {
3135 return nullptr;
3136}
3137
3138std::unique_ptr<Writer> MotrStore::get_atomic_writer(const DoutPrefixProvider *dpp,
3139 optional_yield y,
3140 rgw::sal::Object* obj,
3141 const rgw_user& owner,
3142 const rgw_placement_rule *ptail_placement_rule,
3143 uint64_t olh_epoch,
3144 const std::string& unique_tag) {
3145 return std::make_unique<MotrAtomicWriter>(dpp, y,
3146 obj, this, owner,
3147 ptail_placement_rule, olh_epoch, unique_tag);
3148}
3149
3150const std::string& MotrStore::get_compression_type(const rgw_placement_rule& rule)
3151{
3152 return zone.zone_params->get_compression_type(rule);
3153}
3154
3155bool MotrStore::valid_placement(const rgw_placement_rule& rule)
3156{
3157 return zone.zone_params->valid_placement(rule);
3158}
3159
3160std::unique_ptr<User> MotrStore::get_user(const rgw_user &u)
3161{
3162 ldout(cctx, 20) << "bucket's user: " << u.to_str() << dendl;
3163 return std::make_unique<MotrUser>(this, u);
3164}
3165
3166int MotrStore::get_user_by_access_key(const DoutPrefixProvider *dpp, const std::string &key, optional_yield y, std::unique_ptr<User> *user)
3167{
3168 int rc;
3169 User *u;
3170 bufferlist bl;
3171 RGWUserInfo uinfo;
3172 MotrAccessKey access_key;
3173
3174 rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY,
3175 M0_IC_GET, key, bl);
3176 if (rc < 0){
3177 ldout(cctx, 0) << "Access key not found: rc = " << rc << dendl;
3178 return rc;
3179 }
3180
3181 bufferlist& blr = bl;
3182 auto iter = blr.cbegin();
3183 access_key.decode(iter);
3184
3185 uinfo.user_id.from_str(access_key.user_id);
3186 ldout(cctx, 0) << "Loading user: " << uinfo.user_id.id << dendl;
3187 rc = MotrUser().load_user_from_idx(dpp, this, uinfo, nullptr, nullptr);
3188 if (rc < 0){
3189 ldout(cctx, 0) << "Failed to load user: rc = " << rc << dendl;
3190 return rc;
3191 }
3192 u = new MotrUser(this, uinfo);
3193 if (!u)
3194 return -ENOMEM;
3195
3196 user->reset(u);
3197 return 0;
3198}
3199
3200int MotrStore::get_user_by_email(const DoutPrefixProvider *dpp, const std::string& email, optional_yield y, std::unique_ptr<User>* user)
3201{
3202 int rc;
3203 User *u;
3204 bufferlist bl;
3205 RGWUserInfo uinfo;
3206 MotrEmailInfo email_info;
3207 rc = do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY,
3208 M0_IC_GET, email, bl);
3209 if (rc < 0){
3210 ldout(cctx, 0) << "Email Id not found: rc = " << rc << dendl;
3211 return rc;
3212 }
3213 auto iter = bl.cbegin();
3214 email_info.decode(iter);
3215 ldout(cctx, 0) << "Loading user: " << email_info.user_id << dendl;
3216 uinfo.user_id.from_str(email_info.user_id);
3217 rc = MotrUser().load_user_from_idx(dpp, this, uinfo, nullptr, nullptr);
3218 if (rc < 0){
3219 ldout(cctx, 0) << "Failed to load user: rc = " << rc << dendl;
3220 return rc;
3221 }
3222 u = new MotrUser(this, uinfo);
3223 if (!u)
3224 return -ENOMEM;
3225
3226 user->reset(u);
3227 return 0;
3228}
3229
3230int MotrStore::get_user_by_swift(const DoutPrefixProvider *dpp, const std::string& user_str, optional_yield y, std::unique_ptr<User>* user)
3231{
3232 /* Swift keys and subusers are not supported for now */
3233 return 0;
3234}
3235
3236int MotrStore::store_access_key(const DoutPrefixProvider *dpp, optional_yield y, MotrAccessKey access_key)
3237{
3238 int rc;
3239 bufferlist bl;
3240 access_key.encode(bl);
3241 rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY,
3242 M0_IC_PUT, access_key.id, bl);
3243 if (rc < 0){
3244 ldout(cctx, 0) << "Failed to store key: rc = " << rc << dendl;
3245 return rc;
3246 }
3247 return rc;
3248}
3249
3250int MotrStore::delete_access_key(const DoutPrefixProvider *dpp, optional_yield y, std::string access_key)
3251{
3252 int rc;
3253 bufferlist bl;
3254 rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY,
3255 M0_IC_DEL, access_key, bl);
3256 if (rc < 0){
3257 ldout(cctx, 0) << "Failed to delete key: rc = " << rc << dendl;
3258 }
3259 return rc;
3260}
3261
3262int MotrStore::store_email_info(const DoutPrefixProvider *dpp, optional_yield y, MotrEmailInfo& email_info )
3263{
3264 int rc;
3265 bufferlist bl;
3266 email_info.encode(bl);
3267 rc = do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY,
3268 M0_IC_PUT, email_info.email_id, bl);
3269 if (rc < 0) {
3270 ldout(cctx, 0) << "Failed to store the user by email as key: rc = " << rc << dendl;
3271 }
3272 return rc;
3273}
3274
3275std::unique_ptr<Object> MotrStore::get_object(const rgw_obj_key& k)
3276{
3277 return std::make_unique<MotrObject>(this, k);
3278}
3279
3280
3281int MotrStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const rgw_bucket& b, std::unique_ptr<Bucket>* bucket, optional_yield y)
3282{
3283 int ret;
3284 Bucket* bp;
3285
3286 bp = new MotrBucket(this, b, u);
3287 ret = bp->load_bucket(dpp, y);
3288 if (ret < 0) {
3289 delete bp;
3290 return ret;
3291 }
3292
3293 bucket->reset(bp);
3294 return 0;
3295}
3296
3297int MotrStore::get_bucket(User* u, const RGWBucketInfo& i, std::unique_ptr<Bucket>* bucket)
3298{
3299 Bucket* bp;
3300
3301 bp = new MotrBucket(this, i, u);
3302 /* Don't need to fetch the bucket info, use the provided one */
3303
3304 bucket->reset(bp);
3305 return 0;
3306}
3307
3308int MotrStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const std::string& tenant, const std::string& name, std::unique_ptr<Bucket>* bucket, optional_yield y)
3309{
3310 rgw_bucket b;
3311
3312 b.tenant = tenant;
3313 b.name = name;
3314
3315 return get_bucket(dpp, u, b, bucket, y);
3316}
3317
3318bool MotrStore::is_meta_master()
3319{
3320 return true;
3321}
3322
3323int MotrStore::forward_request_to_master(const DoutPrefixProvider *dpp, User* user, obj_version *objv,
3324 bufferlist& in_data,
3325 JSONParser *jp, req_info& info,
3326 optional_yield y)
3327{
3328 return 0;
3329}
3330
3331int MotrStore::forward_iam_request_to_master(const DoutPrefixProvider *dpp, const RGWAccessKey& key, obj_version* objv,
3332 bufferlist& in_data,
3333 RGWXMLDecoder::XMLParser* parser, req_info& info,
3334 optional_yield y)
3335{
3336 return 0;
3337}
3338
3339std::string MotrStore::zone_unique_id(uint64_t unique_num)
3340{
3341 return "";
3342}
3343
3344std::string MotrStore::zone_unique_trans_id(const uint64_t unique_num)
3345{
3346 return "";
3347}
3348
3349int MotrStore::get_zonegroup(const std::string& id, std::unique_ptr<ZoneGroup>* group)
3350{
3351 /* XXX: for now only one zonegroup supported */
3352 ZoneGroup* zg;
3353 zg = new MotrZoneGroup(this, zone.zonegroup.get_group());
3354
3355 group->reset(zg);
3356 return 0;
3357}
3358
3359int MotrStore::list_all_zones(const DoutPrefixProvider* dpp,
3360 std::list<std::string>& zone_ids)
3361{
3362 zone_ids.push_back(zone.get_id());
3363 return 0;
3364}
3365
3366int MotrStore::cluster_stat(RGWClusterStat& stats)
3367{
3368 return 0;
3369}
3370
3371std::unique_ptr<Lifecycle> MotrStore::get_lifecycle(void)
3372{
3373 return 0;
3374}
3375
3376std::unique_ptr<Completions> MotrStore::get_completions(void)
3377{
3378 return 0;
3379}
3380
3381std::unique_ptr<Notification> MotrStore::get_notification(Object* obj, Object* src_obj, req_state* s,
3382 rgw::notify::EventType event_type, optional_yield y, const string* object_name)
3383{
3384 return std::make_unique<MotrNotification>(obj, src_obj, event_type);
3385}
3386
3387std::unique_ptr<Notification> MotrStore::get_notification(const DoutPrefixProvider* dpp, Object* obj,
3388 Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
3389 std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y)
3390{
3391 return std::make_unique<MotrNotification>(obj, src_obj, event_type);
3392}
3393
3394int MotrStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info)
3395{
3396 return 0;
3397}
3398
3399int MotrStore::log_op(const DoutPrefixProvider *dpp, string& oid, bufferlist& bl)
3400{
3401 return 0;
3402}
3403
3404int MotrStore::register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type,
3405 const map<string, string>& meta)
3406{
3407 return 0;
3408}
3409
3410void MotrStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit,
3411 RGWRateLimitInfo& user_ratelimit,
3412 RGWRateLimitInfo& anon_ratelimit)
3413{
3414 return;
3415}
3416
3417void MotrStore::get_quota(RGWQuota& quota)
3418{
3419 // XXX: Not handled for the first pass
3420 return;
3421}
3422
3423int MotrStore::set_buckets_enabled(const DoutPrefixProvider *dpp, vector<rgw_bucket>& buckets, bool enabled)
3424{
3425 return 0;
3426}
3427
3428int MotrStore::get_sync_policy_handler(const DoutPrefixProvider *dpp,
3429 std::optional<rgw_zone_id> zone,
3430 std::optional<rgw_bucket> bucket,
3431 RGWBucketSyncPolicyHandlerRef *phandler,
3432 optional_yield y)
3433{
3434 return 0;
3435}
3436
3437RGWDataSyncStatusManager* MotrStore::get_data_sync_manager(const rgw_zone_id& source_zone)
3438{
3439 return 0;
3440}
3441
3442int MotrStore::read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch,
3443 uint32_t max_entries, bool *is_truncated,
3444 RGWUsageIter& usage_iter,
3445 map<rgw_user_bucket, rgw_usage_log_entry>& usage)
3446{
3447 return 0;
3448}
3449
3450int MotrStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch)
3451{
3452 return 0;
3453}
3454
3455int MotrStore::get_config_key_val(string name, bufferlist *bl)
3456{
3457 return 0;
3458}
3459
3460int MotrStore::meta_list_keys_init(const DoutPrefixProvider *dpp, const string& section, const string& marker, void** phandle)
3461{
3462 return 0;
3463}
3464
3465int MotrStore::meta_list_keys_next(const DoutPrefixProvider *dpp, void* handle, int max, list<string>& keys, bool* truncated)
3466{
3467 return 0;
3468}
3469
3470void MotrStore::meta_list_keys_complete(void* handle)
3471{
3472 return;
3473}
3474
3475std::string MotrStore::meta_get_marker(void* handle)
3476{
3477 return "";
3478}
3479
3480int MotrStore::meta_remove(const DoutPrefixProvider *dpp, string& metadata_key, optional_yield y)
3481{
3482 return 0;
3483}
3484
3485int MotrStore::open_idx(struct m0_uint128 *id, bool create, struct m0_idx *idx)
3486{
3487 m0_idx_init(idx, &container.co_realm, id);
3488
3489 if (!create)
3490 return 0; // nothing to do more
3491
3492 // create index or make sure it's created
3493 struct m0_op *op = nullptr;
3494 int rc = m0_entity_create(nullptr, &idx->in_entity, &op);
3495 if (rc != 0) {
3496 ldout(cctx, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl;
3497 goto out;
3498 }
3499
3500 m0_op_launch(&op, 1);
3501 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
3502 m0_rc(op);
3503 m0_op_fini(op);
3504 m0_op_free(op);
3505
3506 if (rc != 0 && rc != -EEXIST)
3507 ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl;
3508out:
3509 return rc;
3510}
3511
3512static void set_m0bufvec(struct m0_bufvec *bv, vector<uint8_t>& vec)
3513{
3514 *bv->ov_buf = reinterpret_cast<char*>(vec.data());
3515 *bv->ov_vec.v_count = vec.size();
3516}
3517
// idx must be opened with open_idx() beforehand
//
// Perform one single-key operation on `idx`:
//   M0_IC_PUT - store `val` under `key` (M0_OIF_OVERWRITE is set when `update`),
//   M0_IC_GET - read the value of `key` into `val` (resized to fit),
//   M0_IC_DEL - remove `key` (`val` is not used).
// Returns 0 on success, -ENOMEM if a bufvec cannot be allocated, otherwise
// the failing op-init/wait status or the per-key status (rc_i) reported by
// the index op.
int MotrStore::do_idx_op(struct m0_idx *idx, enum m0_idx_opcode opcode,
                         vector<uint8_t>& key, vector<uint8_t>& val, bool update)
{
  int rc, rc_i;
  struct m0_bufvec k, v, *vp = &v;
  uint32_t flags = 0;
  struct m0_op *op = nullptr;

  // One-segment key bufvec; set_m0bufvec() below points it at `key`.
  if (m0_bufvec_empty_alloc(&k, 1) != 0) {
    ldout(cctx, 0) << "ERROR: failed to allocate key bufvec" << dendl;
    return -ENOMEM;
  }

  // PUT and GET also need a one-segment value bufvec; DEL does not.
  if (opcode == M0_IC_PUT || opcode == M0_IC_GET) {
    rc = -ENOMEM;
    if (m0_bufvec_empty_alloc(&v, 1) != 0) {
      ldout(cctx, 0) << "ERROR: failed to allocate value bufvec" << dendl;
      goto out;
    }
  }

  set_m0bufvec(&k, key);
  if (opcode == M0_IC_PUT)
    set_m0bufvec(&v, val);

  // DEL is issued without a value vector.
  if (opcode == M0_IC_DEL)
    vp = nullptr;

  if (opcode == M0_IC_PUT && update)
    flags |= M0_OIF_OVERWRITE;

  rc = m0_idx_op(idx, opcode, &k, vp, &rc_i, flags, &op);
  if (rc != 0) {
    ldout(cctx, 0) << "ERROR: failed to init index op: " << rc << dendl;
    goto out;
  }

  m0_op_launch(&op, 1);
  // A wait error takes precedence; otherwise the op's own status is used.
  rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
       m0_rc(op);
  m0_op_fini(op);
  m0_op_free(op);

  if (rc != 0) {
    ldout(cctx, 0) << "ERROR: op failed: " << rc << dendl;
    goto out;
  }

  // rc_i is the status of the single key processed by this op.
  if (rc_i != 0) {
    ldout(cctx, 0) << "ERROR: idx op failed: " << rc_i << dendl;
    rc = rc_i;
    goto out;
  }

  // Copy the returned value out of the first value segment.
  if (opcode == M0_IC_GET) {
    val.resize(*v.ov_vec.v_count);
    memcpy(reinterpret_cast<char*>(val.data()), *v.ov_buf, *v.ov_vec.v_count);
  }

out:
  // The key segment points into `key`, so it is released with
  // m0_bufvec_free2 (presumably frees only the bufvec arrays, not segment
  // data). For GET the value segment was filled in by the op (presumably
  // Motr-allocated), so m0_bufvec_free is used; for PUT the segment points
  // into `val`, so again only m0_bufvec_free2. DEL never allocated `v`.
  m0_bufvec_free2(&k);
  if (opcode == M0_IC_GET)
    m0_bufvec_free(&v); // cleanup buffer after GET
  else if (opcode == M0_IC_PUT)
    m0_bufvec_free2(&v);

  return rc;
}
3587
3588// Retrieve a range of key/value pairs starting from keys[0].
3589int MotrStore::do_idx_next_op(struct m0_idx *idx,
3590 vector<vector<uint8_t>>& keys,
3591 vector<vector<uint8_t>>& vals)
3592{
3593 int rc;
3594 uint32_t i = 0;
3595 int nr_kvp = vals.size();
3596 int *rcs = new int[nr_kvp];
3597 struct m0_bufvec k, v;
3598 struct m0_op *op = nullptr;
3599
3600 rc = m0_bufvec_empty_alloc(&k, nr_kvp)?:
3601 m0_bufvec_empty_alloc(&v, nr_kvp);
3602 if (rc != 0) {
3603 ldout(cctx, 0) << "ERROR: failed to allocate kv bufvecs" << dendl;
3604 return rc;
3605 }
3606
3607 set_m0bufvec(&k, keys[0]);
3608
3609 rc = m0_idx_op(idx, M0_IC_NEXT, &k, &v, rcs, 0, &op);
3610 if (rc != 0) {
3611 ldout(cctx, 0) << "ERROR: failed to init index op: " << rc << dendl;
3612 goto out;
3613 }
3614
3615 m0_op_launch(&op, 1);
3616 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
3617 m0_rc(op);
3618 m0_op_fini(op);
3619 m0_op_free(op);
3620
3621 if (rc != 0) {
3622 ldout(cctx, 0) << "ERROR: op failed: " << rc << dendl;
3623 goto out;
3624 }
3625
3626 for (i = 0; i < v.ov_vec.v_nr; ++i) {
3627 if (rcs[i] < 0)
3628 break;
3629
3630 vector<uint8_t>& key = keys[i];
3631 vector<uint8_t>& val = vals[i];
3632 key.resize(k.ov_vec.v_count[i]);
3633 val.resize(v.ov_vec.v_count[i]);
3634 memcpy(reinterpret_cast<char*>(key.data()), k.ov_buf[i], k.ov_vec.v_count[i]);
3635 memcpy(reinterpret_cast<char*>(val.data()), v.ov_buf[i], v.ov_vec.v_count[i]);
3636 }
3637
3638out:
3639 k.ov_vec.v_nr = i;
3640 v.ov_vec.v_nr = i;
3641 m0_bufvec_free(&k);
3642 m0_bufvec_free(&v); // cleanup buffer after GET
3643
3644 delete []rcs;
3645 return rc ?: i;
3646}
3647
3648// Retrieve a number of key/value pairs under the prefix starting
3649// from the marker at key_out[0].
3650int MotrStore::next_query_by_name(string idx_name,
3651 vector<string>& key_out,
3652 vector<bufferlist>& val_out,
3653 string prefix, string delim)
3654{
3655 unsigned nr_kvp = std::min(val_out.size(), 100UL);
3656 struct m0_idx idx = {};
3657 vector<vector<uint8_t>> keys(nr_kvp);
3658 vector<vector<uint8_t>> vals(nr_kvp);
3659 struct m0_uint128 idx_id;
3660 int i = 0, j, k = 0;
3661
3662 index_name_to_motr_fid(idx_name, &idx_id);
3663 int rc = open_motr_idx(&idx_id, &idx);
3664 if (rc != 0) {
3665 ldout(cctx, 0) << "ERROR: next_query_by_name(): failed to open index: rc="
3666 << rc << dendl;
3667 goto out;
3668 }
3669
3670 // Only the first element for keys needs to be set for NEXT query.
3671 // The keys will be set will the returned keys from motr index.
3672 ldout(cctx, 20) <<__func__<< ": next_query_by_name(): index=" << idx_name
3673 << " prefix=" << prefix << " delim=" << delim << dendl;
3674 keys[0].assign(key_out[0].begin(), key_out[0].end());
3675 for (i = 0; i < (int)val_out.size(); i += k, k = 0) {
3676 rc = do_idx_next_op(&idx, keys, vals);
3677 ldout(cctx, 20) << "do_idx_next_op() = " << rc << dendl;
3678 if (rc < 0) {
3679 ldout(cctx, 0) << "ERROR: NEXT query failed. " << rc << dendl;
3680 goto out;
3681 }
3682
3683 string dir;
3684 for (j = 0, k = 0; j < rc; ++j) {
3685 string key(keys[j].begin(), keys[j].end());
3686 size_t pos = std::string::npos;
3687 if (!delim.empty())
3688 pos = key.find(delim, prefix.length());
3689 if (pos != std::string::npos) { // DIR entry
3690 dir.assign(key, 0, pos + 1);
3691 if (dir.compare(0, prefix.length(), prefix) != 0)
3692 goto out;
3693 if (i + k == 0 || dir != key_out[i + k - 1]) // a new one
3694 key_out[i + k++] = dir;
3695 continue;
3696 }
3697 dir = "";
3698 if (key.compare(0, prefix.length(), prefix) != 0)
3699 goto out;
3700 key_out[i + k] = key;
3701 bufferlist& vbl = val_out[i + k];
3702 vbl.append(reinterpret_cast<char*>(vals[j].data()), vals[j].size());
3703 ++k;
3704 }
3705
3706 if (rc < (int)nr_kvp) // there are no more keys to fetch
3707 break;
3708
3709 string next_key;
3710 if (dir != "")
3711 next_key = dir + "\xff"; // skip all dir content in 1 step
3712 else
3713 next_key = key_out[i + k - 1] + " ";
3714 ldout(cctx, 0) << "do_idx_next_op(): next_key=" << next_key << dendl;
3715 keys[0].assign(next_key.begin(), next_key.end());
3716 }
3717
3718out:
3719 m0_idx_fini(&idx);
3720 return rc < 0 ? rc : i + k;
3721}
3722
3723int MotrStore::delete_motr_idx_by_name(string iname)
3724{
3725 struct m0_idx idx;
3726 struct m0_uint128 idx_id;
3727 struct m0_op *op = nullptr;
3728
3729 ldout(cctx, 20) << "delete_motr_idx_by_name=" << iname << dendl;
3730
3731 index_name_to_motr_fid(iname, &idx_id);
3732 m0_idx_init(&idx, &container.co_realm, &idx_id);
3733 m0_entity_open(&idx.in_entity, &op);
3734 int rc = m0_entity_delete(&idx.in_entity, &op);
3735 if (rc < 0)
3736 goto out;
3737
3738 m0_op_launch(&op, 1);
3739
3740 ldout(cctx, 70) << "waiting for op completion" << dendl;
3741
3742 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
3743 m0_rc(op);
3744 m0_op_fini(op);
3745 m0_op_free(op);
3746
3747 if (rc == -ENOENT) // race deletion??
3748 rc = 0;
3749 else if (rc < 0)
3750 ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl;
3751
3752 ldout(cctx, 20) << "delete_motr_idx_by_name rc=" << rc << dendl;
3753
3754out:
3755 m0_idx_fini(&idx);
3756 return rc;
3757}
3758
3759int MotrStore::open_motr_idx(struct m0_uint128 *id, struct m0_idx *idx)
3760{
3761 m0_idx_init(idx, &container.co_realm, id);
3762 return 0;
3763}
3764
// The following macros are from dix/fid_convert.h, which is not exposed
// through the Motr client headers, so they are duplicated here.
// M0_DIX_FID_DIX_CONTAINER_MASK selects the low M0_DIX_FID_DEVICE_ID_OFFSET
// (32) bits of a fid's container field.
enum {
  M0_DIX_FID_DEVICE_ID_OFFSET   = 32,
  M0_DIX_FID_DIX_CONTAINER_MASK = (1ULL << M0_DIX_FID_DEVICE_ID_OFFSET)
                                  - 1,
};
3771
3772// md5 is used here, a more robust way to convert index name to fid is
3773// needed to avoid collision.
3774void MotrStore::index_name_to_motr_fid(string iname, struct m0_uint128 *id)
3775{
3776 unsigned char md5[16]; // 128/8 = 16
3777 MD5 hash;
3778
3779 // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
3780 hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
3781 hash.Update((const unsigned char *)iname.c_str(), iname.length());
3782 hash.Final(md5);
3783
3784 memcpy(&id->u_hi, md5, 8);
3785 memcpy(&id->u_lo, md5 + 8, 8);
3786 ldout(cctx, 20) << "id = 0x" << std::hex << id->u_hi << ":0x" << std::hex << id->u_lo << dendl;
3787
3788 struct m0_fid *fid = (struct m0_fid*)id;
3789 m0_fid_tset(fid, m0_dix_fid_type.ft_id,
3790 fid->f_container & M0_DIX_FID_DIX_CONTAINER_MASK, fid->f_key);
3791 ldout(cctx, 20) << "converted id = 0x" << std::hex << id->u_hi << ":0x" << std::hex << id->u_lo << dendl;
3792}
3793
3794int MotrStore::do_idx_op_by_name(string idx_name, enum m0_idx_opcode opcode,
3795 string key_str, bufferlist &bl, bool update)
3796{
3797 struct m0_idx idx;
3798 vector<uint8_t> key(key_str.begin(), key_str.end());
3799 vector<uint8_t> val;
3800 struct m0_uint128 idx_id;
3801
3802 index_name_to_motr_fid(idx_name, &idx_id);
3803 int rc = open_motr_idx(&idx_id, &idx);
3804 if (rc != 0) {
3805 ldout(cctx, 0) << "ERROR: failed to open index: " << rc << dendl;
3806 goto out;
3807 }
3808
3809 if (opcode == M0_IC_PUT)
3810 val.assign(bl.c_str(), bl.c_str() + bl.length());
3811
3812 ldout(cctx, 20) <<__func__<< ": do_idx_op_by_name(): op="
3813 << (opcode == M0_IC_PUT ? "PUT" : "GET")
3814 << " idx=" << idx_name << " key=" << key_str << dendl;
3815 rc = do_idx_op(&idx, opcode, key, val, update);
3816 if (rc == 0 && opcode == M0_IC_GET)
3817 // Append the returned value (blob) to the bufferlist.
3818 bl.append(reinterpret_cast<char*>(val.data()), val.size());
3819
3820out:
3821 m0_idx_fini(&idx);
3822 return rc;
3823}
3824
3825int MotrStore::create_motr_idx_by_name(string iname)
3826{
3827 struct m0_idx idx = {};
3828 struct m0_uint128 id;
3829
3830 index_name_to_motr_fid(iname, &id);
3831 m0_idx_init(&idx, &container.co_realm, &id);
3832
3833 // create index or make sure it's created
3834 struct m0_op *op = nullptr;
3835 int rc = m0_entity_create(nullptr, &idx.in_entity, &op);
3836 if (rc != 0) {
3837 ldout(cctx, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl;
3838 goto out;
3839 }
3840
3841 m0_op_launch(&op, 1);
3842 rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?:
3843 m0_rc(op);
3844 m0_op_fini(op);
3845 m0_op_free(op);
3846
3847 if (rc != 0 && rc != -EEXIST)
3848 ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl;
3849out:
3850 m0_idx_fini(&idx);
3851 return rc;
3852}
3853
3854// If a global index is checked (if it has been create) every time
3855// before they're queried (put/get), which takes 2 Motr operations to
3856// complete the query. As the global indices' name and FID are known
3857// already when MotrStore is created, we move the check and creation
3858// in newMotrStore().
3859// Similar method is used for per bucket/user index. For example,
3860// bucket instance index is created when creating the bucket.
3861int MotrStore::check_n_create_global_indices()
3862{
3863 int rc = 0;
3864
3865 for (const auto& iname : motr_global_indices) {
3866 rc = create_motr_idx_by_name(iname);
3867 if (rc < 0 && rc != -EEXIST)
3868 break;
3869 rc = 0;
3870 }
3871
3872 return rc;
3873}
3874
3875std::string MotrStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_yield y)
3876{
3877 char id[M0_FID_STR_LEN];
3878 struct m0_confc *confc = m0_reqh2confc(&instance->m0c_reqh);
3879
3880 m0_fid_print(id, ARRAY_SIZE(id), &confc->cc_root->co_id);
3881 return std::string(id);
3882}
3883
3884int MotrStore::init_metadata_cache(const DoutPrefixProvider *dpp,
3885 CephContext *cct)
3886{
3887 this->obj_meta_cache = new MotrMetaCache(dpp, cct);
3888 this->get_obj_meta_cache()->set_enabled(true);
3889
3890 this->user_cache = new MotrMetaCache(dpp, cct);
3891 this->get_user_cache()->set_enabled(true);
3892
3893 this->bucket_inst_cache = new MotrMetaCache(dpp, cct);
3894 this->get_bucket_inst_cache()->set_enabled(true);
3895
3896 return 0;
3897}
3898
3899 int MotrLuaManager::get_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, std::string& script)
3900 {
3901 return -ENOENT;
3902 }
3903
3904 int MotrLuaManager::put_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, const std::string& script)
3905 {
3906 return -ENOENT;
3907 }
3908
3909 int MotrLuaManager::del_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key)
3910 {
3911 return -ENOENT;
3912 }
3913
3914 int MotrLuaManager::add_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name)
3915 {
3916 return -ENOENT;
3917 }
3918
3919 int MotrLuaManager::remove_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name)
3920 {
3921 return -ENOENT;
3922 }
3923
3924 int MotrLuaManager::list_packages(const DoutPrefixProvider* dpp, optional_yield y, rgw::lua::packages_t& packages)
3925 {
3926 return -ENOENT;
3927 }
3928} // namespace rgw::sal
3929
extern "C" {

// Factory entry point with C linkage. Builds a MotrStore from the motr_*
// config options, then initialises the Motr client, the container, the ufid
// generator, and the global indices.
// Returns the new store, or nullptr (after deleting the store) if any
// initialisation step fails.
void *newMotrStore(CephContext *cct)
{
  int rc = -1;
  rgw::sal::MotrStore *store = new rgw::sal::MotrStore(cct);

  // NOTE(review): plain `new` throws on allocation failure rather than
  // returning nullptr, so this condition is always true.
  if (store) {
    store->conf.mc_is_oostore = true;
    // XXX: these params should be taken from config settings and
    // cct somehow?
    store->instance = nullptr;
    // NOTE(review): if get_val<std::string>() returns by value, these
    // references only extend temporaries to the end of this `if` block, yet
    // their c_str() pointers are stored in store->conf below and outlive it.
    // Confirm that Motr does not read them after this function returns.
    const auto& proc_ep = g_conf().get_val<std::string>("motr_my_endpoint");
    const auto& ha_ep = g_conf().get_val<std::string>("motr_ha_endpoint");
    const auto& proc_fid = g_conf().get_val<std::string>("motr_my_fid");
    const auto& profile = g_conf().get_val<std::string>("motr_profile_fid");
    const auto& admin_proc_ep = g_conf().get_val<std::string>("motr_admin_endpoint");
    const auto& admin_proc_fid = g_conf().get_val<std::string>("motr_admin_fid");
    const int init_flags = cct->get_init_flags();
    ldout(cct, 0) << "INFO: motr my endpoint: " << proc_ep << dendl;
    ldout(cct, 0) << "INFO: motr ha endpoint: " << ha_ep << dendl;
    ldout(cct, 0) << "INFO: motr my fid: " << proc_fid << dendl;
    ldout(cct, 0) << "INFO: motr profile fid: " << profile << dendl;
    // Overwritten by the init_flags branch below.
    store->conf.mc_local_addr = proc_ep.c_str();
    store->conf.mc_process_fid = proc_fid.c_str();

    ldout(cct, 0) << "INFO: init flags: " << init_flags << dendl;
    ldout(cct, 0) << "INFO: motr admin endpoint: " << admin_proc_ep << dendl;
    ldout(cct, 0) << "INFO: motr admin fid: " << admin_proc_fid << dendl;

    // HACK: this is so that radosgw-admin uses a different client
    // (init_flags == 0 selects the motr_admin_* endpoint and fid).
    if (init_flags == 0) {
      store->conf.mc_process_fid = admin_proc_fid.c_str();
      store->conf.mc_local_addr = admin_proc_ep.c_str();
    } else {
      store->conf.mc_process_fid = proc_fid.c_str();
      store->conf.mc_local_addr = proc_ep.c_str();
    }
    store->conf.mc_ha_addr = ha_ep.c_str();
    store->conf.mc_profile = profile.c_str();

    ldout(cct, 50) << "INFO: motr profile fid: " << store->conf.mc_profile << dendl;
    ldout(cct, 50) << "INFO: ha addr: " << store->conf.mc_ha_addr << dendl;
    ldout(cct, 50) << "INFO: process fid: " << store->conf.mc_process_fid << dendl;
    ldout(cct, 50) << "INFO: motr endpoint: " << store->conf.mc_local_addr << dendl;

    // Fixed client tuning; indices are served by DIX, without creating
    // DIX meta (kc_create_meta = false).
    store->conf.mc_tm_recv_queue_min_len = 64;
    store->conf.mc_max_rpc_msg_size = 524288;
    store->conf.mc_idx_service_id = M0_IDX_DIX;
    store->dix_conf.kc_create_meta = false;
    store->conf.mc_idx_service_conf = &store->dix_conf;

    if (!g_conf().get_val<bool>("motr_tracing_enabled")) {
      m0_trace_level_allow(M0_WARN); // allow errors and warnings in syslog anyway
      m0_trace_set_mmapped_buffer(false);
    }

    store->instance = nullptr;
    rc = m0_client_init(&store->instance, &store->conf, true);
    if (rc != 0) {
      ldout(cct, 0) << "ERROR: m0_client_init() failed: " << rc << dendl;
      goto out;
    }

    // m0_container_init() reports failure through the realm entity's state
    // machine rc rather than a return value.
    m0_container_init(&store->container, nullptr, &M0_UBER_REALM, store->instance);
    rc = store->container.co_realm.re_entity.en_sm.sm_rc;
    if (rc != 0) {
      ldout(cct, 0) << "ERROR: m0_container_init() failed: " << rc << dendl;
      goto out;
    }

    // Initialise the file-scope ufid generator (ufid_gr).
    rc = m0_ufid_init(store->instance, &ufid_gr);
    if (rc != 0) {
      ldout(cct, 0) << "ERROR: m0_ufid_init() failed: " << rc << dendl;
      goto out;
    }

    // Create global indices if not yet.
    rc = store->check_n_create_global_indices();
    if (rc != 0) {
      ldout(cct, 0) << "ERROR: check_n_create_global_indices() failed: " << rc << dendl;
      goto out;
    }

  }

out:
  // NOTE(review): whether ~MotrStore() finalises an already initialised
  // Motr client on these error paths is not visible here; verify.
  if (rc != 0) {
    delete store;
    return nullptr;
  }
  return store;
}

}