]>
Commit | Line | Data |
---|---|---|
1e59de90 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=2 sw=2 expandtab ft=cpp | |
3 | ||
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * SAL implementation for the CORTX Motr backend | |
8 | * | |
9 | * Copyright (C) 2021 Seagate Technology LLC and/or its Affiliates | |
10 | * | |
11 | * This is free software; you can redistribute it and/or | |
12 | * modify it under the terms of the GNU Lesser General Public | |
13 | * License version 2.1, as published by the Free Software | |
14 | * Foundation. See file COPYING. | |
15 | * | |
16 | */ | |
17 | ||
18 | #include <errno.h> | |
19 | #include <stdlib.h> | |
20 | #include <unistd.h> | |
21 | ||
22 | extern "C" { | |
23 | #pragma clang diagnostic push | |
24 | #pragma clang diagnostic ignored "-Wextern-c-compat" | |
25 | #pragma clang diagnostic ignored "-Wdeprecated-anon-enum-enum-conversion" | |
26 | #include "motr/config.h" | |
27 | #include "lib/types.h" | |
28 | #include "lib/trace.h" // m0_trace_set_mmapped_buffer | |
29 | #include "motr/layout.h" // M0_OBJ_LAYOUT_ID | |
30 | #include "helpers/helpers.h" // m0_ufid_next | |
31 | #pragma clang diagnostic pop | |
32 | } | |
33 | ||
34 | #include "common/Clock.h" | |
35 | #include "common/errno.h" | |
36 | ||
37 | #include "rgw_compression.h" | |
38 | #include "rgw_sal.h" | |
39 | #include "rgw_sal_motr.h" | |
40 | #include "rgw_bucket.h" | |
41 | ||
42 | #define dout_subsys ceph_subsys_rgw | |
43 | ||
44 | using std::string; | |
45 | using std::map; | |
46 | using std::vector; | |
47 | using std::set; | |
48 | using std::list; | |
49 | ||
50 | static string mp_ns = RGW_OBJ_NS_MULTIPART; | |
51 | static struct m0_ufid_generator ufid_gr; | |
52 | ||
53 | namespace rgw::sal { | |
54 | ||
55 | using ::ceph::encode; | |
56 | using ::ceph::decode; | |
57 | ||
58 | static std::string motr_global_indices[] = { | |
59 | RGW_MOTR_USERS_IDX_NAME, | |
60 | RGW_MOTR_BUCKET_INST_IDX_NAME, | |
61 | RGW_MOTR_BUCKET_HD_IDX_NAME, | |
62 | RGW_IAM_MOTR_ACCESS_KEY, | |
63 | RGW_IAM_MOTR_EMAIL_KEY | |
64 | }; | |
65 | ||
66 | void MotrMetaCache::invalid(const DoutPrefixProvider *dpp, | |
67 | const string& name) | |
68 | { | |
69 | cache.invalidate_remove(dpp, name); | |
70 | } | |
71 | ||
72 | int MotrMetaCache::put(const DoutPrefixProvider *dpp, | |
73 | const string& name, | |
74 | const bufferlist& data) | |
75 | { | |
76 | ldpp_dout(dpp, 0) << "Put into cache: name = " << name << dendl; | |
77 | ||
78 | ObjectCacheInfo info; | |
79 | info.status = 0; | |
80 | info.data = data; | |
81 | info.flags = CACHE_FLAG_DATA; | |
82 | info.meta.mtime = ceph::real_clock::now(); | |
83 | info.meta.size = data.length(); | |
84 | cache.put(dpp, name, info, NULL); | |
85 | ||
86 | // Inform other rgw instances. Do nothing if it gets some error? | |
87 | int rc = distribute_cache(dpp, name, info, UPDATE_OBJ); | |
88 | if (rc < 0) | |
89 | ldpp_dout(dpp, 0) << "ERROR: failed to distribute cache for " << name << dendl; | |
90 | ||
91 | return 0; | |
92 | } | |
93 | ||
94 | int MotrMetaCache::get(const DoutPrefixProvider *dpp, | |
95 | const string& name, | |
96 | bufferlist& data) | |
97 | { | |
98 | ObjectCacheInfo info; | |
99 | uint32_t flags = CACHE_FLAG_DATA; | |
100 | int rc = cache.get(dpp, name, info, flags, NULL); | |
101 | if (rc == 0) { | |
102 | if (info.status < 0) | |
103 | return info.status; | |
104 | ||
105 | bufferlist& bl = info.data; | |
106 | bufferlist::iterator it = bl.begin(); | |
107 | data.clear(); | |
108 | ||
109 | it.copy_all(data); | |
110 | ldpp_dout(dpp, 0) << "Cache hit: name = " << name << dendl; | |
111 | return 0; | |
112 | } | |
113 | ldpp_dout(dpp, 0) << "Cache miss: name = " << name << ", rc = "<< rc << dendl; | |
114 | if(rc == -ENODATA) | |
115 | return -ENOENT; | |
116 | ||
117 | return rc; | |
118 | } | |
119 | ||
120 | int MotrMetaCache::remove(const DoutPrefixProvider *dpp, | |
121 | const string& name) | |
122 | ||
123 | { | |
124 | cache.invalidate_remove(dpp, name); | |
125 | ||
126 | ObjectCacheInfo info; | |
127 | int rc = distribute_cache(dpp, name, info, INVALIDATE_OBJ); | |
128 | if (rc < 0) { | |
129 | ldpp_dout(dpp, 0) << "ERROR: " <<__func__<< "(): failed to distribute cache: rc =" << rc << dendl; | |
130 | } | |
131 | ||
132 | ldpp_dout(dpp, 0) << "Remove from cache: name = " << name << dendl; | |
133 | return 0; | |
134 | } | |
135 | ||
136 | int MotrMetaCache::distribute_cache(const DoutPrefixProvider *dpp, | |
137 | const string& normal_name, | |
138 | ObjectCacheInfo& obj_info, int op) | |
139 | { | |
140 | return 0; | |
141 | } | |
142 | ||
143 | int MotrMetaCache::watch_cb(const DoutPrefixProvider *dpp, | |
144 | uint64_t notify_id, | |
145 | uint64_t cookie, | |
146 | uint64_t notifier_id, | |
147 | bufferlist& bl) | |
148 | { | |
149 | return 0; | |
150 | } | |
151 | ||
152 | void MotrMetaCache::set_enabled(bool status) | |
153 | { | |
154 | cache.set_enabled(status); | |
155 | } | |
156 | ||
157 | // TODO: properly handle the number of key/value pairs to get in | |
158 | // one query. Now the POC simply tries to retrieve all `max` number of pairs | |
159 | // with starting key `marker`. | |
160 | int MotrUser::list_buckets(const DoutPrefixProvider *dpp, const string& marker, | |
161 | const string& end_marker, uint64_t max, bool need_stats, | |
162 | BucketList &buckets, optional_yield y) | |
163 | { | |
164 | int rc; | |
165 | vector<string> keys(max); | |
166 | vector<bufferlist> vals(max); | |
167 | bool is_truncated = false; | |
168 | ||
169 | ldpp_dout(dpp, 20) <<__func__<< ": list_user_buckets: marker=" << marker | |
170 | << " end_marker=" << end_marker | |
171 | << " max=" << max << dendl; | |
172 | ||
173 | // Retrieve all `max` number of pairs. | |
174 | buckets.clear(); | |
175 | string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str(); | |
176 | keys[0] = marker; | |
177 | rc = store->next_query_by_name(user_info_iname, keys, vals); | |
178 | if (rc < 0) { | |
179 | ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; | |
180 | return rc; | |
181 | } | |
182 | ||
183 | // Process the returned pairs to add into BucketList. | |
184 | uint64_t bcount = 0; | |
185 | for (const auto& bl: vals) { | |
186 | if (bl.length() == 0) | |
187 | break; | |
188 | ||
189 | RGWBucketEnt ent; | |
190 | auto iter = bl.cbegin(); | |
191 | ent.decode(iter); | |
192 | ||
193 | std::time_t ctime = ceph::real_clock::to_time_t(ent.creation_time); | |
194 | ldpp_dout(dpp, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl; | |
195 | ||
196 | if (!end_marker.empty() && | |
197 | end_marker.compare(ent.bucket.marker) <= 0) | |
198 | break; | |
199 | ||
200 | buckets.add(std::make_unique<MotrBucket>(this->store, ent, this)); | |
201 | bcount++; | |
202 | } | |
203 | if (bcount == max) | |
204 | is_truncated = true; | |
205 | buckets.set_truncated(is_truncated); | |
206 | ||
207 | return 0; | |
208 | } | |
209 | ||
210 | int MotrUser::create_bucket(const DoutPrefixProvider* dpp, | |
211 | const rgw_bucket& b, | |
212 | const std::string& zonegroup_id, | |
213 | rgw_placement_rule& placement_rule, | |
214 | std::string& swift_ver_location, | |
215 | const RGWQuotaInfo* pquota_info, | |
216 | const RGWAccessControlPolicy& policy, | |
217 | Attrs& attrs, | |
218 | RGWBucketInfo& info, | |
219 | obj_version& ep_objv, | |
220 | bool exclusive, | |
221 | bool obj_lock_enabled, | |
222 | bool* existed, | |
223 | req_info& req_info, | |
224 | std::unique_ptr<Bucket>* bucket_out, | |
225 | optional_yield y) | |
226 | { | |
227 | int ret; | |
228 | std::unique_ptr<Bucket> bucket; | |
229 | ||
230 | // Look up the bucket. Create it if it doesn't exist. | |
231 | ret = this->store->get_bucket(dpp, this, b, &bucket, y); | |
232 | if (ret < 0 && ret != -ENOENT) | |
233 | return ret; | |
234 | ||
235 | if (ret != -ENOENT) { | |
236 | *existed = true; | |
237 | // if (swift_ver_location.empty()) { | |
238 | // swift_ver_location = bucket->get_info().swift_ver_location; | |
239 | // } | |
240 | // placement_rule.inherit_from(bucket->get_info().placement_rule); | |
241 | ||
242 | // TODO: ACL policy | |
243 | // // don't allow changes to the acl policy | |
244 | //RGWAccessControlPolicy old_policy(ctx()); | |
245 | //int rc = rgw_op_get_bucket_policy_from_attr( | |
246 | // dpp, this, u, bucket->get_attrs(), &old_policy, y); | |
247 | //if (rc >= 0 && old_policy != policy) { | |
248 | // bucket_out->swap(bucket); | |
249 | // return -EEXIST; | |
250 | //} | |
251 | } else { | |
252 | ||
253 | placement_rule.name = "default"; | |
254 | placement_rule.storage_class = "STANDARD"; | |
255 | bucket = std::make_unique<MotrBucket>(store, b, this); | |
256 | bucket->set_attrs(attrs); | |
257 | *existed = false; | |
258 | } | |
259 | ||
260 | if (!*existed){ | |
261 | // TODO: how to handle zone and multi-site. | |
262 | info.placement_rule = placement_rule; | |
263 | info.bucket = b; | |
264 | info.owner = this->get_info().user_id; | |
265 | info.zonegroup = zonegroup_id; | |
266 | if (obj_lock_enabled) | |
267 | info.flags = BUCKET_VERSIONED | BUCKET_OBJ_LOCK_ENABLED; | |
268 | bucket->set_version(ep_objv); | |
269 | bucket->get_info() = info; | |
270 | ||
271 | // Create a new bucket: (1) Add a key/value pair in the | |
272 | // bucket instance index. (2) Create a new bucket index. | |
273 | MotrBucket* mbucket = static_cast<MotrBucket*>(bucket.get()); | |
274 | ret = mbucket->put_info(dpp, y, ceph::real_time())? : | |
275 | mbucket->create_bucket_index() ? : | |
276 | mbucket->create_multipart_indices(); | |
277 | if (ret < 0) | |
278 | ldpp_dout(dpp, 0) << "ERROR: failed to create bucket indices! " << ret << dendl; | |
279 | ||
280 | // Insert the bucket entry into the user info index. | |
281 | ret = mbucket->link_user(dpp, this, y); | |
282 | if (ret < 0) | |
283 | ldpp_dout(dpp, 0) << "ERROR: failed to add bucket entry! " << ret << dendl; | |
284 | } else { | |
285 | return -EEXIST; | |
286 | // bucket->set_version(ep_objv); | |
287 | // bucket->get_info() = info; | |
288 | } | |
289 | ||
290 | bucket_out->swap(bucket); | |
291 | ||
292 | return ret; | |
293 | } | |
294 | ||
295 | int MotrUser::read_attrs(const DoutPrefixProvider* dpp, optional_yield y) | |
296 | { | |
297 | return 0; | |
298 | } | |
299 | ||
300 | int MotrUser::read_stats(const DoutPrefixProvider *dpp, | |
301 | optional_yield y, RGWStorageStats* stats, | |
302 | ceph::real_time *last_stats_sync, | |
303 | ceph::real_time *last_stats_update) | |
304 | { | |
305 | return 0; | |
306 | } | |
307 | ||
308 | /* stats - Not for first pass */ | |
309 | int MotrUser::read_stats_async(const DoutPrefixProvider *dpp, RGWGetUserStats_CB *cb) | |
310 | { | |
311 | return 0; | |
312 | } | |
313 | ||
314 | int MotrUser::complete_flush_stats(const DoutPrefixProvider *dpp, optional_yield y) | |
315 | { | |
316 | return 0; | |
317 | } | |
318 | ||
319 | int MotrUser::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, | |
320 | bool *is_truncated, RGWUsageIter& usage_iter, | |
321 | map<rgw_user_bucket, rgw_usage_log_entry>& usage) | |
322 | { | |
323 | return 0; | |
324 | } | |
325 | ||
326 | int MotrUser::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) | |
327 | { | |
328 | return 0; | |
329 | } | |
330 | ||
331 | int MotrUser::load_user_from_idx(const DoutPrefixProvider *dpp, | |
332 | MotrStore *store, | |
333 | RGWUserInfo& info, map<string, bufferlist> *attrs, | |
334 | RGWObjVersionTracker *objv_tr) | |
335 | { | |
336 | struct MotrUserInfo muinfo; | |
337 | bufferlist bl; | |
338 | ldpp_dout(dpp, 20) << "info.user_id.id = " << info.user_id.id << dendl; | |
339 | if (store->get_user_cache()->get(dpp, info.user_id.id, bl)) { | |
340 | // Cache misses | |
341 | int rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME, | |
342 | M0_IC_GET, info.user_id.to_str(), bl); | |
343 | ldpp_dout(dpp, 20) << "do_idx_op_by_name() = " << rc << dendl; | |
344 | if (rc < 0) | |
345 | return rc; | |
346 | ||
347 | // Put into cache. | |
348 | store->get_user_cache()->put(dpp, info.user_id.id, bl); | |
349 | } | |
350 | ||
351 | bufferlist& blr = bl; | |
352 | auto iter = blr.cbegin(); | |
353 | muinfo.decode(iter); | |
354 | info = muinfo.info; | |
355 | if (attrs) | |
356 | *attrs = muinfo.attrs; | |
357 | if (objv_tr) | |
358 | { | |
359 | objv_tr->read_version = muinfo.user_version; | |
360 | objv_tracker.read_version = objv_tr->read_version; | |
361 | } | |
362 | ||
363 | if (!info.access_keys.empty()) { | |
364 | for(auto key : info.access_keys) { | |
365 | access_key_tracker.insert(key.first); | |
366 | } | |
367 | } | |
368 | ||
369 | return 0; | |
370 | } | |
371 | ||
372 | int MotrUser::load_user(const DoutPrefixProvider *dpp, | |
373 | optional_yield y) | |
374 | { | |
375 | ldpp_dout(dpp, 20) << "load user: user id = " << info.user_id.to_str() << dendl; | |
376 | return load_user_from_idx(dpp, store, info, &attrs, &objv_tracker); | |
377 | } | |
378 | ||
379 | int MotrUser::create_user_info_idx() | |
380 | { | |
381 | string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str(); | |
382 | return store->create_motr_idx_by_name(user_info_iname); | |
383 | } | |
384 | ||
385 | int MotrUser::merge_and_store_attrs(const DoutPrefixProvider* dpp, Attrs& new_attrs, optional_yield y) | |
386 | { | |
387 | for (auto& it : new_attrs) | |
388 | attrs[it.first] = it.second; | |
389 | ||
390 | return store_user(dpp, y, false); | |
391 | } | |
392 | ||
393 | int MotrUser::store_user(const DoutPrefixProvider* dpp, | |
394 | optional_yield y, bool exclusive, RGWUserInfo* old_info) | |
395 | { | |
396 | bufferlist bl; | |
397 | struct MotrUserInfo muinfo; | |
398 | RGWUserInfo orig_info; | |
399 | RGWObjVersionTracker objv_tr = {}; | |
400 | obj_version& obj_ver = objv_tr.read_version; | |
401 | ||
402 | ldpp_dout(dpp, 20) << "Store_user(): User = " << info.user_id.id << dendl; | |
403 | orig_info.user_id = info.user_id; | |
404 | // XXX: we open and close motr idx 2 times in this method: | |
405 | // 1) on load_user_from_idx() here and 2) on do_idx_op_by_name(PUT) below. | |
406 | // Maybe this can be optimised later somewhow. | |
407 | int rc = load_user_from_idx(dpp, store, orig_info, nullptr, &objv_tr); | |
408 | ldpp_dout(dpp, 10) << "Get user: rc = " << rc << dendl; | |
409 | ||
410 | // Check if the user already exists | |
411 | if (rc == 0 && obj_ver.ver > 0) { | |
412 | if (old_info) | |
413 | *old_info = orig_info; | |
414 | ||
415 | if (obj_ver.ver != objv_tracker.read_version.ver) { | |
416 | rc = -ECANCELED; | |
417 | ldpp_dout(dpp, 0) << "ERROR: User Read version mismatch" << dendl; | |
418 | goto out; | |
419 | } | |
420 | ||
421 | if (exclusive) | |
422 | return rc; | |
423 | ||
424 | obj_ver.ver++; | |
425 | } else { | |
426 | obj_ver.ver = 1; | |
427 | obj_ver.tag = "UserTAG"; | |
428 | } | |
429 | ||
430 | // Insert the user to user info index. | |
431 | muinfo.info = info; | |
432 | muinfo.attrs = attrs; | |
433 | muinfo.user_version = obj_ver; | |
434 | muinfo.encode(bl); | |
435 | rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME, | |
436 | M0_IC_PUT, info.user_id.to_str(), bl); | |
437 | ldpp_dout(dpp, 10) << "Store user to motr index: rc = " << rc << dendl; | |
438 | if (rc == 0) { | |
439 | objv_tracker.read_version = obj_ver; | |
440 | objv_tracker.write_version = obj_ver; | |
441 | } | |
442 | ||
443 | // Store access key in access key index | |
444 | if (!info.access_keys.empty()) { | |
445 | std::string access_key; | |
446 | std::string secret_key; | |
447 | std::map<std::string, RGWAccessKey>::const_iterator iter = info.access_keys.begin(); | |
448 | const RGWAccessKey& k = iter->second; | |
449 | access_key = k.id; | |
450 | secret_key = k.key; | |
451 | MotrAccessKey MGWUserKeys(access_key, secret_key, info.user_id.to_str()); | |
452 | store->store_access_key(dpp, y, MGWUserKeys); | |
453 | access_key_tracker.insert(access_key); | |
454 | } | |
455 | ||
456 | // Check if any key need to be deleted | |
457 | if (access_key_tracker.size() != info.access_keys.size()) { | |
458 | std::string key_for_deletion; | |
459 | for (auto key : access_key_tracker) { | |
460 | if (!info.get_key(key)) { | |
461 | key_for_deletion = key; | |
462 | ldpp_dout(dpp, 0) << "Deleting access key: " << key_for_deletion << dendl; | |
463 | store->delete_access_key(dpp, y, key_for_deletion); | |
464 | if (rc < 0) { | |
465 | ldpp_dout(dpp, 0) << "Unable to delete access key" << rc << dendl; | |
466 | } | |
467 | } | |
468 | } | |
469 | if(rc >= 0){ | |
470 | access_key_tracker.erase(key_for_deletion); | |
471 | } | |
472 | } | |
473 | ||
474 | if (!info.user_email.empty()) { | |
475 | MotrEmailInfo MGWEmailInfo(info.user_id.to_str(), info.user_email); | |
476 | store->store_email_info(dpp, y, MGWEmailInfo); | |
477 | } | |
478 | ||
479 | // Create user info index to store all buckets that are belong | |
480 | // to this bucket. | |
481 | rc = create_user_info_idx(); | |
482 | if (rc < 0 && rc != -EEXIST) { | |
483 | ldpp_dout(dpp, 0) << "Failed to create user info index: rc = " << rc << dendl; | |
484 | goto out; | |
485 | } | |
486 | ||
487 | // Put the user info into cache. | |
488 | rc = store->get_user_cache()->put(dpp, info.user_id.id, bl); | |
489 | ||
490 | out: | |
491 | return rc; | |
492 | } | |
493 | ||
494 | int MotrUser::remove_user(const DoutPrefixProvider* dpp, optional_yield y) | |
495 | { | |
496 | // Remove user info from cache | |
497 | // Delete access keys for user | |
498 | // Delete user info | |
499 | // Delete user from user index | |
500 | // Delete email for user - TODO | |
501 | bufferlist bl; | |
502 | int rc; | |
503 | // Remove the user info from cache. | |
504 | store->get_user_cache()->remove(dpp, info.user_id.id); | |
505 | ||
506 | // Delete all access key of user | |
507 | if (!info.access_keys.empty()) { | |
508 | for(auto acc_key = info.access_keys.begin(); acc_key != info.access_keys.end(); acc_key++) { | |
509 | auto access_key = acc_key->first; | |
510 | rc = store->delete_access_key(dpp, y, access_key); | |
511 | // TODO | |
512 | // Check error code for access_key does not exist | |
513 | // Continue to next step only if delete failed because key doesn't exists | |
514 | if (rc < 0){ | |
515 | ldpp_dout(dpp, 0) << "Unable to delete access key" << rc << dendl; | |
516 | } | |
517 | } | |
518 | } | |
519 | ||
520 | //Delete email id | |
521 | if (!info.user_email.empty()) { | |
522 | rc = store->do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, | |
523 | M0_IC_DEL, info.user_email, bl); | |
524 | if (rc < 0 && rc != -ENOENT) { | |
525 | ldpp_dout(dpp, 0) << "Unable to delete email id " << rc << dendl; | |
526 | } | |
527 | } | |
528 | ||
529 | // Delete user info index | |
530 | string user_info_iname = "motr.rgw.user.info." + info.user_id.to_str(); | |
531 | store->delete_motr_idx_by_name(user_info_iname); | |
532 | ldpp_dout(dpp, 10) << "Deleted user info index - " << user_info_iname << dendl; | |
533 | ||
534 | // Delete user from user index | |
535 | rc = store->do_idx_op_by_name(RGW_MOTR_USERS_IDX_NAME, | |
536 | M0_IC_DEL, info.user_id.to_str(), bl); | |
537 | if (rc < 0){ | |
538 | ldpp_dout(dpp, 0) << "Unable to delete user from user index " << rc << dendl; | |
539 | return rc; | |
540 | } | |
541 | ||
542 | // TODO | |
543 | // Delete email for user | |
544 | // rc = store->do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, | |
545 | // M0_IC_DEL, info.user_email, bl); | |
546 | // if (rc < 0){ | |
547 | // ldpp_dout(dpp, 0) << "Unable to delete email for user" << rc << dendl; | |
548 | // return rc; | |
549 | // } | |
550 | return 0; | |
551 | } | |
552 | ||
553 | int MotrUser::verify_mfa(const std::string& mfa_str, bool* verified, const DoutPrefixProvider *dpp, optional_yield y) | |
554 | { | |
555 | *verified = false; | |
556 | return 0; | |
557 | } | |
558 | ||
559 | int MotrBucket::remove_bucket(const DoutPrefixProvider *dpp, bool delete_children, bool forward_to_master, req_info* req_info, optional_yield y) | |
560 | { | |
561 | int ret; | |
562 | ||
563 | ldpp_dout(dpp, 20) << "remove_bucket Entry=" << info.bucket.name << dendl; | |
564 | ||
565 | // Refresh info | |
566 | ret = load_bucket(dpp, y); | |
567 | if (ret < 0) { | |
568 | ldpp_dout(dpp, 0) << "ERROR: remove_bucket load_bucket failed rc=" << ret << dendl; | |
569 | return ret; | |
570 | } | |
571 | ||
572 | ListParams params; | |
573 | params.list_versions = true; | |
574 | params.allow_unordered = true; | |
575 | ||
576 | ListResults results; | |
577 | ||
578 | // 1. Check if Bucket has objects. | |
579 | // If bucket contains objects and delete_children is true, delete all objects. | |
580 | // Else throw error that bucket is not empty. | |
581 | do { | |
582 | results.objs.clear(); | |
583 | ||
584 | // Check if bucket has objects. | |
585 | ret = list(dpp, params, 1000, results, y); | |
586 | if (ret < 0) { | |
587 | return ret; | |
588 | } | |
589 | ||
590 | // If result contains entries, bucket is not empty. | |
591 | if (!results.objs.empty() && !delete_children) { | |
592 | ldpp_dout(dpp, 0) << "ERROR: could not remove non-empty bucket " << info.bucket.name << dendl; | |
593 | return -ENOTEMPTY; | |
594 | } | |
595 | ||
596 | for (const auto& obj : results.objs) { | |
597 | rgw_obj_key key(obj.key); | |
598 | /* xxx dang */ | |
599 | ret = rgw_remove_object(dpp, store, this, key); | |
600 | if (ret < 0 && ret != -ENOENT) { | |
601 | ldpp_dout(dpp, 0) << "ERROR: remove_bucket rgw_remove_object failed rc=" << ret << dendl; | |
602 | return ret; | |
603 | } | |
604 | } | |
605 | } while(results.is_truncated); | |
606 | ||
607 | // 2. Abort Mp uploads on the bucket. | |
608 | ret = abort_multiparts(dpp, store->ctx()); | |
609 | if (ret < 0) { | |
610 | return ret; | |
611 | } | |
612 | ||
613 | // 3. Remove mp index?? | |
614 | string bucket_multipart_iname = "motr.rgw.bucket." + info.bucket.name + ".multiparts"; | |
615 | ret = store->delete_motr_idx_by_name(bucket_multipart_iname); | |
616 | if (ret < 0) { | |
617 | ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove multipart index rc=" << ret << dendl; | |
618 | return ret; | |
619 | } | |
620 | ||
621 | // 4. Sync user stats. | |
622 | ret = this->sync_user_stats(dpp, y); | |
623 | if (ret < 0) { | |
624 | ldout(store->ctx(), 1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret << dendl; | |
625 | } | |
626 | ||
627 | // 5. Remove the bucket from user info index. (unlink user) | |
628 | ret = this->unlink_user(dpp, owner, y); | |
629 | if (ret < 0) { | |
630 | ldpp_dout(dpp, 0) << "ERROR: remove_bucket unlink_user failed rc=" << ret << dendl; | |
631 | return ret; | |
632 | } | |
633 | ||
634 | // 6. Remove bucket index. | |
635 | string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name; | |
636 | ret = store->delete_motr_idx_by_name(bucket_index_iname); | |
637 | if (ret < 0) { | |
638 | ldpp_dout(dpp, 0) << "ERROR: remove_bucket unlink_user failed rc=" << ret << dendl; | |
639 | return ret; | |
640 | } | |
641 | ||
642 | // 7. Remove bucket instance info. | |
643 | bufferlist bl; | |
644 | ret = store->get_bucket_inst_cache()->remove(dpp, info.bucket.name); | |
645 | if (ret < 0) { | |
646 | ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket instance from cache rc=" | |
647 | << ret << dendl; | |
648 | return ret; | |
649 | } | |
650 | ||
651 | ret = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, | |
652 | M0_IC_DEL, info.bucket.name, bl); | |
653 | if (ret < 0) { | |
654 | ldpp_dout(dpp, 0) << "ERROR: remove_bucket failed to remove bucket instance rc=" | |
655 | << ret << dendl; | |
656 | return ret; | |
657 | } | |
658 | ||
659 | // TODO : | |
660 | // 8. Remove Notifications | |
661 | // if bucket has notification definitions associated with it | |
662 | // they should be removed (note that any pending notifications on the bucket are still going to be sent) | |
663 | ||
664 | // 9. Forward request to master. | |
665 | if (forward_to_master) { | |
666 | bufferlist in_data; | |
667 | ret = store->forward_request_to_master(dpp, owner, &bucket_version, in_data, nullptr, *req_info, y); | |
668 | if (ret < 0) { | |
669 | if (ret == -ENOENT) { | |
670 | /* adjust error, we want to return with NoSuchBucket and not | |
671 | * NoSuchKey */ | |
672 | ret = -ERR_NO_SUCH_BUCKET; | |
673 | } | |
674 | ldpp_dout(dpp, 0) << "ERROR: Forward to master failed. ret=" << ret << dendl; | |
675 | return ret; | |
676 | } | |
677 | } | |
678 | ||
679 | ldpp_dout(dpp, 20) << "remove_bucket Exit=" << info.bucket.name << dendl; | |
680 | ||
681 | return ret; | |
682 | } | |
683 | ||
684 | int MotrBucket::remove_bucket_bypass_gc(int concurrent_max, bool | |
685 | keep_index_consistent, | |
686 | optional_yield y, const | |
687 | DoutPrefixProvider *dpp) { | |
688 | return 0; | |
689 | } | |
690 | ||
691 | int MotrBucket::put_info(const DoutPrefixProvider *dpp, bool exclusive, ceph::real_time _mtime) | |
692 | { | |
693 | bufferlist bl; | |
694 | struct MotrBucketInfo mbinfo; | |
695 | ||
696 | ldpp_dout(dpp, 20) << "put_info(): bucket_id=" << info.bucket.bucket_id << dendl; | |
697 | mbinfo.info = info; | |
698 | mbinfo.bucket_attrs = attrs; | |
699 | mbinfo.mtime = _mtime; | |
700 | mbinfo.bucket_version = bucket_version; | |
701 | mbinfo.encode(bl); | |
702 | ||
703 | // Insert bucket instance using bucket's marker (string). | |
704 | int rc = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, | |
705 | M0_IC_PUT, info.bucket.name, bl, !exclusive); | |
706 | if (rc == 0) | |
707 | store->get_bucket_inst_cache()->put(dpp, info.bucket.name, bl); | |
708 | ||
709 | return rc; | |
710 | } | |
711 | ||
712 | int MotrBucket::load_bucket(const DoutPrefixProvider *dpp, optional_yield y, bool get_stats) | |
713 | { | |
714 | // Get bucket instance using bucket's name (string). or bucket id? | |
715 | bufferlist bl; | |
716 | if (store->get_bucket_inst_cache()->get(dpp, info.bucket.name, bl)) { | |
717 | // Cache misses. | |
718 | ldpp_dout(dpp, 20) << "load_bucket(): name=" << info.bucket.name << dendl; | |
719 | int rc = store->do_idx_op_by_name(RGW_MOTR_BUCKET_INST_IDX_NAME, | |
720 | M0_IC_GET, info.bucket.name, bl); | |
721 | ldpp_dout(dpp, 20) << "load_bucket(): rc=" << rc << dendl; | |
722 | if (rc < 0) | |
723 | return rc; | |
724 | store->get_bucket_inst_cache()->put(dpp, info.bucket.name, bl); | |
725 | } | |
726 | ||
727 | struct MotrBucketInfo mbinfo; | |
728 | bufferlist& blr = bl; | |
729 | auto iter =blr.cbegin(); | |
730 | mbinfo.decode(iter); //Decode into MotrBucketInfo. | |
731 | ||
732 | info = mbinfo.info; | |
733 | ldpp_dout(dpp, 20) << "load_bucket(): bucket_id=" << info.bucket.bucket_id << dendl; | |
734 | rgw_placement_rule placement_rule; | |
735 | placement_rule.name = "default"; | |
736 | placement_rule.storage_class = "STANDARD"; | |
737 | info.placement_rule = placement_rule; | |
738 | ||
739 | attrs = mbinfo.bucket_attrs; | |
740 | mtime = mbinfo.mtime; | |
741 | bucket_version = mbinfo.bucket_version; | |
742 | ||
743 | return 0; | |
744 | } | |
745 | ||
746 | int MotrBucket::link_user(const DoutPrefixProvider* dpp, User* new_user, optional_yield y) | |
747 | { | |
748 | bufferlist bl; | |
749 | RGWBucketEnt new_bucket; | |
750 | ceph::real_time creation_time = get_creation_time(); | |
751 | ||
752 | // RGWBucketEnt or cls_user_bucket_entry is the structure that is stored. | |
753 | new_bucket.bucket = info.bucket; | |
754 | new_bucket.size = 0; | |
755 | if (real_clock::is_zero(creation_time)) | |
756 | creation_time = ceph::real_clock::now(); | |
757 | new_bucket.creation_time = creation_time; | |
758 | new_bucket.encode(bl); | |
759 | std::time_t ctime = ceph::real_clock::to_time_t(new_bucket.creation_time); | |
760 | ldpp_dout(dpp, 20) << "got creation time: << " << std::put_time(std::localtime(&ctime), "%F %T") << dendl; | |
761 | ||
762 | // Insert the user into the user info index. | |
763 | string user_info_idx_name = "motr.rgw.user.info." + new_user->get_info().user_id.to_str(); | |
764 | return store->do_idx_op_by_name(user_info_idx_name, | |
765 | M0_IC_PUT, info.bucket.name, bl); | |
766 | ||
767 | } | |
768 | ||
769 | int MotrBucket::unlink_user(const DoutPrefixProvider* dpp, User* new_user, optional_yield y) | |
770 | { | |
771 | // Remove the user into the user info index. | |
772 | bufferlist bl; | |
773 | string user_info_idx_name = "motr.rgw.user.info." + new_user->get_info().user_id.to_str(); | |
774 | return store->do_idx_op_by_name(user_info_idx_name, | |
775 | M0_IC_DEL, info.bucket.name, bl); | |
776 | } | |
777 | ||
778 | /* stats - Not for first pass */ | |
779 | int MotrBucket::read_stats(const DoutPrefixProvider *dpp, | |
780 | const bucket_index_layout_generation& idx_layout, int shard_id, | |
781 | std::string *bucket_ver, std::string *master_ver, | |
782 | std::map<RGWObjCategory, RGWStorageStats>& stats, | |
783 | std::string *max_marker, bool *syncstopped) | |
784 | { | |
785 | return 0; | |
786 | } | |
787 | ||
788 | int MotrBucket::create_bucket_index() | |
789 | { | |
790 | string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name; | |
791 | return store->create_motr_idx_by_name(bucket_index_iname); | |
792 | } | |
793 | ||
794 | int MotrBucket::create_multipart_indices() | |
795 | { | |
796 | int rc; | |
797 | ||
798 | // Bucket multipart index stores in-progress multipart uploads. | |
799 | // Key is the object name + upload_id, value is a rgw_bucket_dir_entry. | |
800 | // An entry is inserted when a multipart upload is initialised ( | |
801 | // MotrMultipartUpload::init()) and will be removed when the upload | |
802 | // is completed (MotrMultipartUpload::complete()). | |
803 | // MotrBucket::list_multiparts() will scan this index to return all | |
804 | // in-progress multipart uploads in the bucket. | |
805 | string bucket_multipart_iname = "motr.rgw.bucket." + info.bucket.name + ".multiparts"; | |
806 | rc = store->create_motr_idx_by_name(bucket_multipart_iname); | |
807 | if (rc < 0) { | |
808 | ldout(store->cctx, 0) << "Failed to create bucket multipart index " << bucket_multipart_iname << dendl; | |
809 | return rc; | |
810 | } | |
811 | ||
812 | return 0; | |
813 | } | |
814 | ||
815 | ||
816 | int MotrBucket::read_stats_async(const DoutPrefixProvider *dpp, | |
817 | const bucket_index_layout_generation& idx_layout, | |
818 | int shard_id, RGWGetBucketStats_CB *ctx) | |
819 | { | |
820 | return 0; | |
821 | } | |
822 | ||
823 | int MotrBucket::sync_user_stats(const DoutPrefixProvider *dpp, optional_yield y) | |
824 | { | |
825 | return 0; | |
826 | } | |
827 | ||
828 | int MotrBucket::update_container_stats(const DoutPrefixProvider *dpp) | |
829 | { | |
830 | return 0; | |
831 | } | |
832 | ||
833 | int MotrBucket::check_bucket_shards(const DoutPrefixProvider *dpp) | |
834 | { | |
835 | return 0; | |
836 | } | |
837 | ||
838 | int MotrBucket::chown(const DoutPrefixProvider *dpp, User& new_user, optional_yield y) | |
839 | { | |
840 | // TODO: update bucket with new owner | |
841 | return 0; | |
842 | } | |
843 | ||
844 | /* Make sure to call load_bucket() if you need it first */ | |
845 | bool MotrBucket::is_owner(User* user) | |
846 | { | |
847 | return (info.owner.compare(user->get_id()) == 0); | |
848 | } | |
849 | ||
850 | int MotrBucket::check_empty(const DoutPrefixProvider *dpp, optional_yield y) | |
851 | { | |
852 | /* XXX: Check if bucket contains any objects */ | |
853 | return 0; | |
854 | } | |
855 | ||
856 | int MotrBucket::check_quota(const DoutPrefixProvider *dpp, RGWQuota& quota, uint64_t obj_size, | |
857 | optional_yield y, bool check_size_only) | |
858 | { | |
859 | /* Not Handled in the first pass as stats are also needed */ | |
860 | return 0; | |
861 | } | |
862 | ||
863 | int MotrBucket::merge_and_store_attrs(const DoutPrefixProvider *dpp, Attrs& new_attrs, optional_yield y) | |
864 | { | |
865 | for (auto& it : new_attrs) | |
866 | attrs[it.first] = it.second; | |
867 | ||
868 | return put_info(dpp, y, ceph::real_time()); | |
869 | } | |
870 | ||
871 | int MotrBucket::try_refresh_info(const DoutPrefixProvider *dpp, ceph::real_time *pmtime) | |
872 | { | |
873 | return 0; | |
874 | } | |
875 | ||
876 | /* XXX: usage and stats not supported in the first pass */ | |
877 | int MotrBucket::read_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, | |
878 | uint32_t max_entries, bool *is_truncated, | |
879 | RGWUsageIter& usage_iter, | |
880 | map<rgw_user_bucket, rgw_usage_log_entry>& usage) | |
881 | { | |
882 | return 0; | |
883 | } | |
884 | ||
885 | int MotrBucket::trim_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) | |
886 | { | |
887 | return 0; | |
888 | } | |
889 | ||
890 | int MotrBucket::remove_objs_from_index(const DoutPrefixProvider *dpp, std::list<rgw_obj_index_key>& objs_to_unlink) | |
891 | { | |
892 | /* XXX: CHECK: Unlike RadosStore, there is no seperate bucket index table. | |
893 | * Delete all the object in the list from the object table of this | |
894 | * bucket | |
895 | */ | |
896 | return 0; | |
897 | } | |
898 | ||
899 | int MotrBucket::check_index(const DoutPrefixProvider *dpp, std::map<RGWObjCategory, RGWStorageStats>& existing_stats, std::map<RGWObjCategory, RGWStorageStats>& calculated_stats) | |
900 | { | |
901 | /* XXX: stats not supported yet */ | |
902 | return 0; | |
903 | } | |
904 | ||
905 | int MotrBucket::rebuild_index(const DoutPrefixProvider *dpp) | |
906 | { | |
907 | /* there is no index table in dbstore. Not applicable */ | |
908 | return 0; | |
909 | } | |
910 | ||
911 | int MotrBucket::set_tag_timeout(const DoutPrefixProvider *dpp, uint64_t timeout) | |
912 | { | |
913 | /* XXX: CHECK: set tag timeout for all the bucket objects? */ | |
914 | return 0; | |
915 | } | |
916 | ||
917 | int MotrBucket::purge_instance(const DoutPrefixProvider *dpp) | |
918 | { | |
919 | /* XXX: CHECK: for dbstore only single instance supported. | |
920 | * Remove all the objects for that instance? Anything extra needed? | |
921 | */ | |
922 | return 0; | |
923 | } | |
924 | ||
925 | int MotrBucket::set_acl(const DoutPrefixProvider *dpp, RGWAccessControlPolicy &acl, optional_yield y) | |
926 | { | |
927 | int ret = 0; | |
928 | bufferlist aclbl; | |
929 | ||
930 | acls = acl; | |
931 | acl.encode(aclbl); | |
932 | ||
933 | Attrs attrs = get_attrs(); | |
934 | attrs[RGW_ATTR_ACL] = aclbl; | |
935 | ||
936 | // TODO: update bucket entry with the new attrs | |
937 | ||
938 | return ret; | |
939 | } | |
940 | ||
941 | std::unique_ptr<Object> MotrBucket::get_object(const rgw_obj_key& k) | |
942 | { | |
943 | return std::make_unique<MotrObject>(this->store, k, this); | |
944 | } | |
945 | ||
946 | int MotrBucket::list(const DoutPrefixProvider *dpp, ListParams& params, int max, ListResults& results, optional_yield y) | |
947 | { | |
948 | int rc; | |
949 | vector<string> keys(max); | |
950 | vector<bufferlist> vals(max); | |
951 | ||
952 | ldpp_dout(dpp, 20) << "bucket=" << info.bucket.name | |
953 | << " prefix=" << params.prefix | |
954 | << " marker=" << params.marker | |
955 | << " max=" << max << dendl; | |
956 | ||
957 | // Retrieve all `max` number of pairs. | |
958 | string bucket_index_iname = "motr.rgw.bucket.index." + info.bucket.name; | |
959 | keys[0] = params.marker.empty() ? params.prefix : | |
960 | params.marker.get_oid(); | |
961 | rc = store->next_query_by_name(bucket_index_iname, keys, vals, params.prefix, | |
962 | params.delim); | |
963 | if (rc < 0) { | |
964 | ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; | |
965 | return rc; | |
966 | } | |
967 | ||
968 | // Process the returned pairs to add into ListResults. | |
969 | int i = 0; | |
970 | for (; i < rc; ++i) { | |
971 | if (vals[i].length() == 0) { | |
972 | results.common_prefixes[keys[i]] = true; | |
973 | } else { | |
974 | rgw_bucket_dir_entry ent; | |
975 | auto iter = vals[i].cbegin(); | |
976 | ent.decode(iter); | |
977 | if (params.list_versions || ent.is_visible()) | |
978 | results.objs.emplace_back(std::move(ent)); | |
979 | } | |
980 | } | |
981 | ||
982 | if (i == max) { | |
983 | results.is_truncated = true; | |
984 | results.next_marker = keys[max - 1] + " "; | |
985 | } else { | |
986 | results.is_truncated = false; | |
987 | } | |
988 | ||
989 | return 0; | |
990 | } | |
991 | ||
992 | int MotrBucket::list_multiparts(const DoutPrefixProvider *dpp, | |
993 | const string& prefix, | |
994 | string& marker, | |
995 | const string& delim, | |
996 | const int& max_uploads, | |
997 | vector<std::unique_ptr<MultipartUpload>>& uploads, | |
998 | map<string, bool> *common_prefixes, | |
999 | bool *is_truncated) | |
1000 | { | |
1001 | int rc; | |
1002 | vector<string> key_vec(max_uploads); | |
1003 | vector<bufferlist> val_vec(max_uploads); | |
1004 | ||
1005 | string bucket_multipart_iname = | |
1006 | "motr.rgw.bucket." + this->get_name() + ".multiparts"; | |
1007 | key_vec[0].clear(); | |
1008 | key_vec[0].assign(marker.begin(), marker.end()); | |
1009 | rc = store->next_query_by_name(bucket_multipart_iname, key_vec, val_vec); | |
1010 | if (rc < 0) { | |
1011 | ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; | |
1012 | return rc; | |
1013 | } | |
1014 | ||
1015 | // Process the returned pairs to add into ListResults. | |
1016 | // The POC can only support listing all objects or selecting | |
1017 | // with prefix. | |
1018 | int ocount = 0; | |
1019 | rgw_obj_key last_obj_key; | |
1020 | *is_truncated = false; | |
1021 | for (const auto& bl: val_vec) { | |
1022 | if (bl.length() == 0) | |
1023 | break; | |
1024 | ||
1025 | rgw_bucket_dir_entry ent; | |
1026 | auto iter = bl.cbegin(); | |
1027 | ent.decode(iter); | |
1028 | ||
1029 | if (prefix.size() && | |
1030 | (0 != ent.key.name.compare(0, prefix.size(), prefix))) { | |
1031 | ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << | |
1032 | ": skippping \"" << ent.key << | |
1033 | "\" because doesn't match prefix" << dendl; | |
1034 | continue; | |
1035 | } | |
1036 | ||
1037 | rgw_obj_key key(ent.key); | |
1038 | uploads.push_back(this->get_multipart_upload(key.name)); | |
1039 | last_obj_key = key; | |
1040 | ocount++; | |
1041 | if (ocount == max_uploads) { | |
1042 | *is_truncated = true; | |
1043 | break; | |
1044 | } | |
1045 | } | |
1046 | marker = last_obj_key.name; | |
1047 | ||
1048 | // What is common prefix? We don't handle it for now. | |
1049 | ||
1050 | return 0; | |
1051 | ||
1052 | } | |
1053 | ||
1054 | int MotrBucket::abort_multiparts(const DoutPrefixProvider *dpp, CephContext *cct) | |
1055 | { | |
1056 | return 0; | |
1057 | } | |
1058 | ||
1059 | void MotrStore::finalize(void) | |
1060 | { | |
1061 | // close connection with motr | |
1062 | m0_client_fini(this->instance, true); | |
1063 | } | |
1064 | ||
1065 | const std::string& MotrZoneGroup::get_endpoint() const | |
1066 | { | |
1067 | if (!group.endpoints.empty()) { | |
1068 | return group.endpoints.front(); | |
1069 | } else { | |
1070 | // use zonegroup's master zone endpoints | |
1071 | auto z = group.zones.find(group.master_zone); | |
1072 | if (z != group.zones.end() && !z->second.endpoints.empty()) { | |
1073 | return z->second.endpoints.front(); | |
1074 | } | |
1075 | } | |
1076 | return empty; | |
1077 | } | |
1078 | ||
1079 | bool MotrZoneGroup::placement_target_exists(std::string& target) const | |
1080 | { | |
1081 | return !!group.placement_targets.count(target); | |
1082 | } | |
1083 | ||
aee94f69 | 1084 | void MotrZoneGroup::get_placement_target_names(std::set<std::string>& names) const |
1e59de90 TL |
1085 | { |
1086 | for (const auto& target : group.placement_targets) { | |
1087 | names.emplace(target.second.name); | |
1088 | } | |
1e59de90 TL |
1089 | } |
1090 | ||
1091 | int MotrZoneGroup::get_placement_tier(const rgw_placement_rule& rule, | |
1092 | std::unique_ptr<PlacementTier>* tier) | |
1093 | { | |
1094 | std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer; | |
1095 | titer = group.placement_targets.find(rule.name); | |
1096 | if (titer == group.placement_targets.end()) { | |
1097 | return -ENOENT; | |
1098 | } | |
1099 | ||
1100 | const auto& target_rule = titer->second; | |
1101 | std::map<std::string, RGWZoneGroupPlacementTier>::const_iterator ttier; | |
1102 | ttier = target_rule.tier_targets.find(rule.storage_class); | |
1103 | if (ttier == target_rule.tier_targets.end()) { | |
1104 | // not found | |
1105 | return -ENOENT; | |
1106 | } | |
1107 | ||
1108 | PlacementTier* t; | |
1109 | t = new MotrPlacementTier(store, ttier->second); | |
1110 | if (!t) | |
1111 | return -ENOMEM; | |
1112 | ||
1113 | tier->reset(t); | |
1114 | return 0; | |
1115 | } | |
1116 | ||
1117 | ZoneGroup& MotrZone::get_zonegroup() | |
1118 | { | |
1119 | return zonegroup; | |
1120 | } | |
1121 | ||
1122 | const std::string& MotrZone::get_id() | |
1123 | { | |
1124 | return zone_params->get_id(); | |
1125 | } | |
1126 | ||
1127 | const std::string& MotrZone::get_name() const | |
1128 | { | |
1129 | return zone_params->get_name(); | |
1130 | } | |
1131 | ||
1132 | bool MotrZone::is_writeable() | |
1133 | { | |
1134 | return true; | |
1135 | } | |
1136 | ||
1137 | bool MotrZone::get_redirect_endpoint(std::string* endpoint) | |
1138 | { | |
1139 | return false; | |
1140 | } | |
1141 | ||
1142 | bool MotrZone::has_zonegroup_api(const std::string& api) const | |
1143 | { | |
1144 | return (zonegroup->api_name == api); | |
1145 | } | |
1146 | ||
1147 | const std::string& MotrZone::get_current_period_id() | |
1148 | { | |
1149 | return current_period->get_id(); | |
1150 | } | |
1151 | ||
1152 | std::unique_ptr<LuaManager> MotrStore::get_lua_manager() | |
1153 | { | |
1154 | return std::make_unique<MotrLuaManager>(this); | |
1155 | } | |
1156 | ||
1157 | int MotrObject::get_obj_state(const DoutPrefixProvider* dpp, RGWObjState **_state, optional_yield y, bool follow_olh) | |
1158 | { | |
1159 | // Get object's metadata (those stored in rgw_bucket_dir_entry). | |
1160 | bufferlist bl; | |
1161 | if (this->store->get_obj_meta_cache()->get(dpp, this->get_key().get_oid(), bl)) { | |
1162 | // Cache misses. | |
1163 | string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); | |
1164 | int rc = this->store->do_idx_op_by_name(bucket_index_iname, | |
1165 | M0_IC_GET, this->get_key().get_oid(), bl); | |
1166 | if (rc < 0) { | |
1167 | ldpp_dout(dpp, 0) << "Failed to get object's entry from bucket index. " << dendl; | |
1168 | return rc; | |
1169 | } | |
1170 | ||
1171 | // Put into cache. | |
1172 | this->store->get_obj_meta_cache()->put(dpp, this->get_key().get_oid(), bl); | |
1173 | } | |
1174 | ||
1175 | rgw_bucket_dir_entry ent; | |
1176 | bufferlist& blr = bl; | |
1177 | auto iter = blr.cbegin(); | |
1178 | ent.decode(iter); | |
1179 | ||
1180 | // Set object's type. | |
1181 | this->category = ent.meta.category; | |
1182 | ||
1183 | // Set object state. | |
1184 | state.exists = true; | |
1185 | state.size = ent.meta.size; | |
1186 | state.accounted_size = ent.meta.size; | |
1187 | state.mtime = ent.meta.mtime; | |
1188 | ||
1189 | state.has_attrs = true; | |
1190 | bufferlist etag_bl; | |
1191 | string& etag = ent.meta.etag; | |
1192 | ldpp_dout(dpp, 20) <<__func__<< ": object's etag: " << ent.meta.etag << dendl; | |
1193 | etag_bl.append(etag); | |
1194 | state.attrset[RGW_ATTR_ETAG] = etag_bl; | |
1195 | ||
1196 | return 0; | |
1197 | } | |
1198 | ||
1199 | MotrObject::~MotrObject() { | |
1200 | this->close_mobj(); | |
1201 | } | |
1202 | ||
1203 | // int MotrObject::read_attrs(const DoutPrefixProvider* dpp, Motr::Object::Read &read_op, optional_yield y, rgw_obj* target_obj) | |
1204 | // { | |
1205 | // read_op.params.attrs = &attrs; | |
1206 | // read_op.params.target_obj = target_obj; | |
1207 | // read_op.params.obj_size = &obj_size; | |
1208 | // read_op.params.lastmod = &mtime; | |
1209 | // | |
1210 | // return read_op.prepare(dpp); | |
1211 | // } | |
1212 | ||
1213 | int MotrObject::set_obj_attrs(const DoutPrefixProvider* dpp, Attrs* setattrs, Attrs* delattrs, optional_yield y) | |
1214 | { | |
1215 | // TODO: implement | |
1216 | ldpp_dout(dpp, 20) <<__func__<< ": MotrObject::set_obj_attrs()" << dendl; | |
1217 | return 0; | |
1218 | } | |
1219 | ||
1220 | int MotrObject::get_obj_attrs(optional_yield y, const DoutPrefixProvider* dpp, rgw_obj* target_obj) | |
1221 | { | |
1222 | if (this->category == RGWObjCategory::MultiMeta) | |
1223 | return 0; | |
1224 | ||
1225 | string bname, key; | |
1226 | if (target_obj) { | |
1227 | bname = target_obj->bucket.name; | |
1228 | key = target_obj->key.get_oid(); | |
1229 | } else { | |
1230 | bname = this->get_bucket()->get_name(); | |
1231 | key = this->get_key().get_oid(); | |
1232 | } | |
1233 | ldpp_dout(dpp, 20) << "MotrObject::get_obj_attrs(): " | |
1234 | << bname << "/" << key << dendl; | |
1235 | ||
1236 | // Get object's metadata (those stored in rgw_bucket_dir_entry). | |
1237 | bufferlist bl; | |
1238 | if (this->store->get_obj_meta_cache()->get(dpp, key, bl)) { | |
1239 | // Cache misses. | |
1240 | string bucket_index_iname = "motr.rgw.bucket.index." + bname; | |
1241 | int rc = this->store->do_idx_op_by_name(bucket_index_iname, M0_IC_GET, key, bl); | |
1242 | if (rc < 0) { | |
1243 | ldpp_dout(dpp, 0) << "Failed to get object's entry from bucket index. " << dendl; | |
1244 | return rc; | |
1245 | } | |
1246 | ||
1247 | // Put into cache. | |
1248 | this->store->get_obj_meta_cache()->put(dpp, key, bl); | |
1249 | } | |
1250 | ||
1251 | rgw_bucket_dir_entry ent; | |
1252 | bufferlist& blr = bl; | |
1253 | auto iter = blr.cbegin(); | |
1254 | ent.decode(iter); | |
1255 | decode(attrs, iter); | |
1256 | ||
1257 | return 0; | |
1258 | } | |
1259 | ||
1260 | int MotrObject::modify_obj_attrs(const char* attr_name, bufferlist& attr_val, optional_yield y, const DoutPrefixProvider* dpp) | |
1261 | { | |
1262 | rgw_obj target = get_obj(); | |
1263 | int r = get_obj_attrs(y, dpp, &target); | |
1264 | if (r < 0) { | |
1265 | return r; | |
1266 | } | |
1267 | set_atomic(); | |
1268 | attrs[attr_name] = attr_val; | |
1269 | return set_obj_attrs(dpp, &attrs, nullptr, y); | |
1270 | } | |
1271 | ||
1272 | int MotrObject::delete_obj_attrs(const DoutPrefixProvider* dpp, const char* attr_name, optional_yield y) | |
1273 | { | |
1274 | rgw_obj target = get_obj(); | |
1275 | Attrs rmattr; | |
1276 | bufferlist bl; | |
1277 | ||
1278 | set_atomic(); | |
1279 | rmattr[attr_name] = bl; | |
1280 | return set_obj_attrs(dpp, nullptr, &rmattr, y); | |
1281 | } | |
1282 | ||
1283 | bool MotrObject::is_expired() { | |
1284 | return false; | |
1285 | } | |
1286 | ||
1287 | // Taken from rgw_rados.cc | |
1288 | void MotrObject::gen_rand_obj_instance_name() | |
1289 | { | |
1290 | enum {OBJ_INSTANCE_LEN = 32}; | |
1291 | char buf[OBJ_INSTANCE_LEN + 1]; | |
1292 | ||
1293 | gen_rand_alphanumeric_no_underscore(store->ctx(), buf, OBJ_INSTANCE_LEN); | |
1294 | state.obj.key.set_instance(buf); | |
1295 | } | |
1296 | ||
1297 | int MotrObject::omap_get_vals(const DoutPrefixProvider *dpp, const std::string& marker, uint64_t count, | |
1298 | std::map<std::string, bufferlist> *m, | |
1299 | bool* pmore, optional_yield y) | |
1300 | { | |
1301 | return 0; | |
1302 | } | |
1303 | ||
1304 | int MotrObject::omap_get_all(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist> *m, | |
1305 | optional_yield y) | |
1306 | { | |
1307 | return 0; | |
1308 | } | |
1309 | ||
1310 | int MotrObject::omap_get_vals_by_keys(const DoutPrefixProvider *dpp, const std::string& oid, | |
1311 | const std::set<std::string>& keys, | |
1312 | Attrs* vals) | |
1313 | { | |
1314 | return 0; | |
1315 | } | |
1316 | ||
1317 | int MotrObject::omap_set_val_by_key(const DoutPrefixProvider *dpp, const std::string& key, bufferlist& val, | |
1318 | bool must_exist, optional_yield y) | |
1319 | { | |
1320 | return 0; | |
1321 | } | |
1322 | ||
1323 | int MotrObject::chown(User& new_user, const DoutPrefixProvider* dpp, optional_yield y) | |
1324 | { | |
1325 | return 0; | |
1326 | } | |
1327 | ||
1328 | std::unique_ptr<MPSerializer> MotrObject::get_serializer(const DoutPrefixProvider *dpp, | |
1329 | const std::string& lock_name) | |
1330 | { | |
1331 | return std::make_unique<MPMotrSerializer>(dpp, store, this, lock_name); | |
1332 | } | |
1333 | ||
1334 | int MotrObject::transition(Bucket* bucket, | |
1335 | const rgw_placement_rule& placement_rule, | |
1336 | const real_time& mtime, | |
1337 | uint64_t olh_epoch, | |
1338 | const DoutPrefixProvider* dpp, | |
1339 | optional_yield y) | |
1340 | { | |
1341 | return 0; | |
1342 | } | |
1343 | ||
1344 | bool MotrObject::placement_rules_match(rgw_placement_rule& r1, rgw_placement_rule& r2) | |
1345 | { | |
1346 | /* XXX: support single default zone and zonegroup for now */ | |
1347 | return true; | |
1348 | } | |
1349 | ||
1350 | int MotrObject::dump_obj_layout(const DoutPrefixProvider *dpp, optional_yield y, Formatter* f) | |
1351 | { | |
1352 | return 0; | |
1353 | } | |
1354 | ||
1355 | std::unique_ptr<Object::ReadOp> MotrObject::get_read_op() | |
1356 | { | |
1357 | return std::make_unique<MotrObject::MotrReadOp>(this); | |
1358 | } | |
1359 | ||
1360 | MotrObject::MotrReadOp::MotrReadOp(MotrObject *_source) : | |
1361 | source(_source) | |
1362 | { } | |
1363 | ||
1364 | int MotrObject::MotrReadOp::prepare(optional_yield y, const DoutPrefixProvider* dpp) | |
1365 | { | |
1366 | int rc; | |
1367 | ldpp_dout(dpp, 20) <<__func__<< ": bucket=" << source->get_bucket()->get_name() << dendl; | |
1368 | ||
1369 | rgw_bucket_dir_entry ent; | |
1370 | rc = source->get_bucket_dir_ent(dpp, ent); | |
1371 | if (rc < 0) | |
1372 | return rc; | |
1373 | ||
1374 | // Set source object's attrs. The attrs is key/value map and is used | |
1375 | // in send_response_data() to set attributes, including etag. | |
1376 | bufferlist etag_bl; | |
1377 | string& etag = ent.meta.etag; | |
1378 | ldpp_dout(dpp, 20) <<__func__<< ": object's etag: " << ent.meta.etag << dendl; | |
1379 | etag_bl.append(etag.c_str(), etag.size()); | |
1380 | source->get_attrs().emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl)); | |
1381 | ||
1382 | source->set_key(ent.key); | |
1383 | source->set_obj_size(ent.meta.size); | |
1384 | source->category = ent.meta.category; | |
1385 | *params.lastmod = ent.meta.mtime; | |
1386 | ||
1387 | if (params.mod_ptr || params.unmod_ptr) { | |
1388 | // Convert all times go GMT to make them compatible | |
1389 | obj_time_weight src_weight; | |
1390 | src_weight.init(*params.lastmod, params.mod_zone_id, params.mod_pg_ver); | |
1391 | src_weight.high_precision = params.high_precision_time; | |
1392 | ||
1393 | obj_time_weight dest_weight; | |
1394 | dest_weight.high_precision = params.high_precision_time; | |
1395 | ||
1396 | // Check if-modified-since condition | |
1397 | if (params.mod_ptr && !params.if_nomatch) { | |
1398 | dest_weight.init(*params.mod_ptr, params.mod_zone_id, params.mod_pg_ver); | |
1399 | ldpp_dout(dpp, 10) << "If-Modified-Since: " << dest_weight << " & " | |
1400 | << "Last-Modified: " << src_weight << dendl; | |
1401 | if (!(dest_weight < src_weight)) { | |
1402 | return -ERR_NOT_MODIFIED; | |
1403 | } | |
1404 | } | |
1405 | ||
1406 | // Check if-unmodified-since condition | |
1407 | if (params.unmod_ptr && !params.if_match) { | |
1408 | dest_weight.init(*params.unmod_ptr, params.mod_zone_id, params.mod_pg_ver); | |
1409 | ldpp_dout(dpp, 10) << "If-UnModified-Since: " << dest_weight << " & " | |
1410 | << "Last-Modified: " << src_weight << dendl; | |
1411 | if (dest_weight < src_weight) { | |
1412 | return -ERR_PRECONDITION_FAILED; | |
1413 | } | |
1414 | } | |
1415 | } | |
1416 | // Check if-match condition | |
1417 | if (params.if_match) { | |
1418 | string if_match_str = rgw_string_unquote(params.if_match); | |
1419 | ldpp_dout(dpp, 10) << "ETag: " << etag << " & " | |
1420 | << "If-Match: " << if_match_str << dendl; | |
1421 | if (if_match_str.compare(etag) != 0) { | |
1422 | return -ERR_PRECONDITION_FAILED; | |
1423 | } | |
1424 | } | |
1425 | // Check if-none-match condition | |
1426 | if (params.if_nomatch) { | |
1427 | string if_nomatch_str = rgw_string_unquote(params.if_nomatch); | |
1428 | ldpp_dout(dpp, 10) << "ETag: " << etag << " & " | |
1429 | << "If-NoMatch: " << if_nomatch_str << dendl; | |
1430 | if (if_nomatch_str.compare(etag) == 0) { | |
1431 | return -ERR_NOT_MODIFIED; | |
1432 | } | |
1433 | } | |
1434 | ||
1435 | // Skip opening an empty object. | |
1436 | if(source->get_obj_size() == 0) | |
1437 | return 0; | |
1438 | ||
1439 | // Open the object here. | |
1440 | if (source->category == RGWObjCategory::MultiMeta) { | |
1441 | ldpp_dout(dpp, 20) <<__func__<< ": open obj parts..." << dendl; | |
1442 | rc = source->get_part_objs(dpp, this->part_objs)? : | |
1443 | source->open_part_objs(dpp, this->part_objs); | |
1444 | return rc; | |
1445 | } else { | |
1446 | ldpp_dout(dpp, 20) <<__func__<< ": open object..." << dendl; | |
1447 | return source->open_mobj(dpp); | |
1448 | } | |
1449 | } | |
1450 | ||
1451 | int MotrObject::MotrReadOp::read(int64_t off, int64_t end, bufferlist& bl, optional_yield y, const DoutPrefixProvider* dpp) | |
1452 | { | |
1453 | ldpp_dout(dpp, 20) << "MotrReadOp::read(): sync read." << dendl; | |
1454 | return 0; | |
1455 | } | |
1456 | ||
1457 | // RGWGetObj::execute() calls ReadOp::iterate() to read object from 'off' to 'end'. | |
1458 | // The returned data is processed in 'cb' which is a chain of post-processing | |
1459 | // filters such as decompression, de-encryption and sending back data to client | |
1460 | // (RGWGetObj_CB::handle_dta which in turn calls RGWGetObj::get_data_cb() to | |
1461 | // send data back.). | |
1462 | // | |
1463 | // POC implements a simple sync version of iterate() function in which it reads | |
1464 | // a block of data each time and call 'cb' for post-processing. | |
1465 | int MotrObject::MotrReadOp::iterate(const DoutPrefixProvider* dpp, int64_t off, int64_t end, RGWGetDataCB* cb, optional_yield y) | |
1466 | { | |
1467 | int rc; | |
1468 | ||
1469 | if (source->category == RGWObjCategory::MultiMeta) | |
1470 | rc = source->read_multipart_obj(dpp, off, end, cb, part_objs); | |
1471 | else | |
1472 | rc = source->read_mobj(dpp, off, end, cb); | |
1473 | ||
1474 | return rc; | |
1475 | } | |
1476 | ||
1477 | int MotrObject::MotrReadOp::get_attr(const DoutPrefixProvider* dpp, const char* name, bufferlist& dest, optional_yield y) | |
1478 | { | |
1479 | //return 0; | |
1480 | return -ENODATA; | |
1481 | } | |
1482 | ||
1483 | std::unique_ptr<Object::DeleteOp> MotrObject::get_delete_op() | |
1484 | { | |
1485 | return std::make_unique<MotrObject::MotrDeleteOp>(this); | |
1486 | } | |
1487 | ||
1488 | MotrObject::MotrDeleteOp::MotrDeleteOp(MotrObject *_source) : | |
1489 | source(_source) | |
1490 | { } | |
1491 | ||
1492 | // Implementation of DELETE OBJ also requires MotrObject::get_obj_state() | |
1493 | // to retrieve and set object's state from object's metadata. | |
1494 | // | |
1495 | // TODO: | |
1496 | // 1. The POC only remove the object's entry from bucket index and delete | |
1497 | // corresponding Motr objects. It doesn't handle the DeleteOp::params. | |
1498 | // Delete::delete_obj() in rgw_rados.cc shows how rados backend process the | |
1499 | // params. | |
1500 | // 2. Delete an object when its versioning is turned on. | |
1501 | int MotrObject::MotrDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y) | |
1502 | { | |
1503 | ldpp_dout(dpp, 20) << "delete " << source->get_key().get_oid() << " from " << source->get_bucket()->get_name() << dendl; | |
1504 | ||
1505 | rgw_bucket_dir_entry ent; | |
1506 | int rc = source->get_bucket_dir_ent(dpp, ent); | |
1507 | if (rc < 0) { | |
1508 | return rc; | |
1509 | } | |
1510 | ||
1511 | //TODO: When integrating with background GC for object deletion, | |
1512 | // we should consider adding object entry to GC before deleting the metadata. | |
1513 | // Delete from the cache first. | |
1514 | source->store->get_obj_meta_cache()->remove(dpp, source->get_key().get_oid()); | |
1515 | ||
1516 | // Delete the object's entry from the bucket index. | |
1517 | bufferlist bl; | |
1518 | string bucket_index_iname = "motr.rgw.bucket.index." + source->get_bucket()->get_name(); | |
1519 | rc = source->store->do_idx_op_by_name(bucket_index_iname, | |
1520 | M0_IC_DEL, source->get_key().get_oid(), bl); | |
1521 | if (rc < 0) { | |
1522 | ldpp_dout(dpp, 0) << "Failed to del object's entry from bucket index. " << dendl; | |
1523 | return rc; | |
1524 | } | |
1525 | ||
1526 | if (ent.meta.size == 0) { | |
1527 | ldpp_dout(dpp, 0) << __func__ << ": Object size is 0, not deleting motr object." << dendl; | |
1528 | return 0; | |
1529 | } | |
1530 | // Remove the motr objects. | |
1531 | if (source->category == RGWObjCategory::MultiMeta) | |
1532 | rc = source->delete_part_objs(dpp); | |
1533 | else | |
1534 | rc = source->delete_mobj(dpp); | |
1535 | if (rc < 0) { | |
1536 | ldpp_dout(dpp, 0) << "Failed to delete the object from Motr. " << dendl; | |
1537 | return rc; | |
1538 | } | |
1539 | ||
1540 | //result.delete_marker = parent_op.result.delete_marker; | |
1541 | //result.version_id = parent_op.result.version_id; | |
1542 | return 0; | |
1543 | } | |
1544 | ||
1545 | int MotrObject::delete_object(const DoutPrefixProvider* dpp, optional_yield y, bool prevent_versioning) | |
1546 | { | |
1547 | MotrObject::MotrDeleteOp del_op(this); | |
1548 | del_op.params.bucket_owner = bucket->get_info().owner; | |
1549 | del_op.params.versioning_status = bucket->get_info().versioning_status(); | |
1550 | ||
1551 | return del_op.delete_obj(dpp, y); | |
1552 | } | |
1553 | ||
1554 | int MotrObject::delete_obj_aio(const DoutPrefixProvider* dpp, RGWObjState* astate, | |
1555 | Completions* aio, bool keep_index_consistent, | |
1556 | optional_yield y) | |
1557 | { | |
1558 | /* XXX: Make it async */ | |
1559 | return 0; | |
1560 | } | |
1561 | ||
1562 | int MotrObject::copy_object(User* user, | |
1563 | req_info* info, | |
1564 | const rgw_zone_id& source_zone, | |
1565 | rgw::sal::Object* dest_object, | |
1566 | rgw::sal::Bucket* dest_bucket, | |
1567 | rgw::sal::Bucket* src_bucket, | |
1568 | const rgw_placement_rule& dest_placement, | |
1569 | ceph::real_time* src_mtime, | |
1570 | ceph::real_time* mtime, | |
1571 | const ceph::real_time* mod_ptr, | |
1572 | const ceph::real_time* unmod_ptr, | |
1573 | bool high_precision_time, | |
1574 | const char* if_match, | |
1575 | const char* if_nomatch, | |
1576 | AttrsMod attrs_mod, | |
1577 | bool copy_if_newer, | |
1578 | Attrs& attrs, | |
1579 | RGWObjCategory category, | |
1580 | uint64_t olh_epoch, | |
1581 | boost::optional<ceph::real_time> delete_at, | |
1582 | std::string* version_id, | |
1583 | std::string* tag, | |
1584 | std::string* etag, | |
1585 | void (*progress_cb)(off_t, void *), | |
1586 | void* progress_data, | |
1587 | const DoutPrefixProvider* dpp, | |
1588 | optional_yield y) | |
1589 | { | |
1590 | return 0; | |
1591 | } | |
1592 | ||
1593 | int MotrObject::swift_versioning_restore(bool& restored, | |
1594 | const DoutPrefixProvider* dpp) | |
1595 | { | |
1596 | return 0; | |
1597 | } | |
1598 | ||
1599 | int MotrObject::swift_versioning_copy(const DoutPrefixProvider* dpp, | |
1600 | optional_yield y) | |
1601 | { | |
1602 | return 0; | |
1603 | } | |
1604 | ||
1605 | MotrAtomicWriter::MotrAtomicWriter(const DoutPrefixProvider *dpp, | |
1606 | optional_yield y, | |
1607 | rgw::sal::Object* obj, | |
1608 | MotrStore* _store, | |
1609 | const rgw_user& _owner, | |
1610 | const rgw_placement_rule *_ptail_placement_rule, | |
1611 | uint64_t _olh_epoch, | |
1612 | const std::string& _unique_tag) : | |
1613 | StoreWriter(dpp, y), | |
1614 | store(_store), | |
1615 | owner(_owner), | |
1616 | ptail_placement_rule(_ptail_placement_rule), | |
1617 | olh_epoch(_olh_epoch), | |
1618 | unique_tag(_unique_tag), | |
1619 | obj(_store, obj->get_key(), obj->get_bucket()), | |
1620 | old_obj(_store, obj->get_key(), obj->get_bucket()) {} | |
1621 | ||
// Count passed to the buffer/attr/index vector allocations in
// MotrAtomicWriter::prepare().
static const unsigned MAX_BUFVEC_NR = 256;
1623 | ||
1624 | int MotrAtomicWriter::prepare(optional_yield y) | |
1625 | { | |
1626 | total_data_size = 0; | |
1627 | ||
1628 | if (obj.is_opened()) | |
1629 | return 0; | |
1630 | ||
1631 | rgw_bucket_dir_entry ent; | |
1632 | int rc = old_obj.get_bucket_dir_ent(dpp, ent); | |
1633 | if (rc == 0) { | |
1634 | ldpp_dout(dpp, 20) << __func__ << ": object exists." << dendl; | |
1635 | } | |
1636 | ||
1637 | rc = m0_bufvec_empty_alloc(&buf, MAX_BUFVEC_NR) ?: | |
1638 | m0_bufvec_alloc(&attr, MAX_BUFVEC_NR, 1) ?: | |
1639 | m0_indexvec_alloc(&ext, MAX_BUFVEC_NR); | |
1640 | if (rc != 0) | |
1641 | this->cleanup(); | |
1642 | ||
1643 | return rc; | |
1644 | } | |
1645 | ||
1646 | int MotrObject::create_mobj(const DoutPrefixProvider *dpp, uint64_t sz) | |
1647 | { | |
1648 | if (mobj != nullptr) { | |
1649 | ldpp_dout(dpp, 0) <<__func__<< "ERROR: object is already opened" << dendl; | |
1650 | return -EINVAL; | |
1651 | } | |
1652 | ||
1653 | int rc = m0_ufid_next(&ufid_gr, 1, &meta.oid); | |
1654 | if (rc != 0) { | |
1655 | ldpp_dout(dpp, 0) <<__func__<< "ERROR: m0_ufid_next() failed: " << rc << dendl; | |
1656 | return rc; | |
1657 | } | |
1658 | ||
1659 | char fid_str[M0_FID_STR_LEN]; | |
1660 | snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); | |
1661 | ldpp_dout(dpp, 20) <<__func__<< ": sz=" << sz << " oid=" << fid_str << dendl; | |
1662 | ||
1663 | int64_t lid = m0_layout_find_by_objsz(store->instance, nullptr, sz); | |
1664 | M0_ASSERT(lid > 0); | |
1665 | ||
1666 | M0_ASSERT(mobj == nullptr); | |
1667 | mobj = new m0_obj(); | |
1668 | m0_obj_init(mobj, &store->container.co_realm, &meta.oid, lid); | |
1669 | ||
1670 | struct m0_op *op = nullptr; | |
1671 | mobj->ob_entity.en_flags |= M0_ENF_META; | |
1672 | rc = m0_entity_create(nullptr, &mobj->ob_entity, &op); | |
1673 | if (rc != 0) { | |
1674 | this->close_mobj(); | |
1675 | ldpp_dout(dpp, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; | |
1676 | return rc; | |
1677 | } | |
1678 | ldpp_dout(dpp, 20) <<__func__<< ": call m0_op_launch()..." << dendl; | |
1679 | m0_op_launch(&op, 1); | |
1680 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
1681 | m0_rc(op); | |
1682 | m0_op_fini(op); | |
1683 | m0_op_free(op); | |
1684 | ||
1685 | if (rc != 0) { | |
1686 | this->close_mobj(); | |
1687 | ldpp_dout(dpp, 0) << "ERROR: failed to create motr object: " << rc << dendl; | |
1688 | return rc; | |
1689 | } | |
1690 | ||
1691 | meta.layout_id = mobj->ob_attr.oa_layout_id; | |
1692 | meta.pver = mobj->ob_attr.oa_pver; | |
1693 | ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << meta.layout_id | |
1694 | << std::dec << " rc=" << rc << dendl; | |
1695 | ||
1696 | // TODO: add key:user+bucket+key+obj.meta.oid value:timestamp to | |
1697 | // gc.queue.index. See more at github.com/Seagate/cortx-rgw/issues/7. | |
1698 | ||
1699 | return rc; | |
1700 | } | |
1701 | ||
1702 | int MotrObject::open_mobj(const DoutPrefixProvider *dpp) | |
1703 | { | |
1704 | char fid_str[M0_FID_STR_LEN]; | |
1705 | snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); | |
1706 | ldpp_dout(dpp, 20) <<__func__<< ": oid=" << fid_str << dendl; | |
1707 | ||
1708 | int rc; | |
1709 | if (meta.layout_id == 0) { | |
1710 | rgw_bucket_dir_entry ent; | |
1711 | rc = this->get_bucket_dir_ent(dpp, ent); | |
1712 | if (rc < 0) { | |
1713 | ldpp_dout(dpp, 0) << "ERROR: open_mobj() failed: rc=" << rc << dendl; | |
1714 | return rc; | |
1715 | } | |
1716 | } | |
1717 | ||
1718 | if (meta.layout_id == 0) | |
1719 | return -ENOENT; | |
1720 | ||
1721 | M0_ASSERT(mobj == nullptr); | |
1722 | mobj = new m0_obj(); | |
1723 | memset(mobj, 0, sizeof *mobj); | |
1724 | m0_obj_init(mobj, &store->container.co_realm, &meta.oid, store->conf.mc_layout_id); | |
1725 | ||
1726 | struct m0_op *op = nullptr; | |
1727 | mobj->ob_attr.oa_layout_id = meta.layout_id; | |
1728 | mobj->ob_attr.oa_pver = meta.pver; | |
1729 | mobj->ob_entity.en_flags |= M0_ENF_META; | |
1730 | rc = m0_entity_open(&mobj->ob_entity, &op); | |
1731 | if (rc != 0) { | |
1732 | ldpp_dout(dpp, 0) << "ERROR: m0_entity_open() failed: rc=" << rc << dendl; | |
1733 | this->close_mobj(); | |
1734 | return rc; | |
1735 | } | |
1736 | m0_op_launch(&op, 1); | |
1737 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
1738 | m0_rc(op); | |
1739 | m0_op_fini(op); | |
1740 | m0_op_free(op); | |
1741 | ||
1742 | if (rc < 0) { | |
1743 | ldpp_dout(dpp, 10) << "ERROR: failed to open motr object: rc=" << rc << dendl; | |
1744 | this->close_mobj(); | |
1745 | return rc; | |
1746 | } | |
1747 | ||
1748 | ldpp_dout(dpp, 20) <<__func__<< ": rc=" << rc << dendl; | |
1749 | ||
1750 | return 0; | |
1751 | } | |
1752 | ||
1753 | int MotrObject::delete_mobj(const DoutPrefixProvider *dpp) | |
1754 | { | |
1755 | int rc; | |
1756 | char fid_str[M0_FID_STR_LEN]; | |
1757 | snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); | |
1758 | if (!meta.oid.u_hi || !meta.oid.u_lo) { | |
1759 | ldpp_dout(dpp, 20) << __func__ << ": invalid motr object oid=" << fid_str << dendl; | |
1760 | return -EINVAL; | |
1761 | } | |
1762 | ldpp_dout(dpp, 20) << __func__ << ": deleting motr object oid=" << fid_str << dendl; | |
1763 | ||
1764 | // Open the object. | |
1765 | if (mobj == nullptr) { | |
1766 | rc = this->open_mobj(dpp); | |
1767 | if (rc < 0) | |
1768 | return rc; | |
1769 | } | |
1770 | ||
1771 | // Create an DELETE op and execute it (sync version). | |
1772 | struct m0_op *op = nullptr; | |
1773 | mobj->ob_entity.en_flags |= M0_ENF_META; | |
1774 | rc = m0_entity_delete(&mobj->ob_entity, &op); | |
1775 | if (rc != 0) { | |
1776 | ldpp_dout(dpp, 0) << "ERROR: m0_entity_delete() failed: " << rc << dendl; | |
1777 | return rc; | |
1778 | } | |
1779 | m0_op_launch(&op, 1); | |
1780 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
1781 | m0_rc(op); | |
1782 | m0_op_fini(op); | |
1783 | m0_op_free(op); | |
1784 | ||
1785 | if (rc < 0) { | |
1786 | ldpp_dout(dpp, 0) << "ERROR: failed to open motr object: " << rc << dendl; | |
1787 | return rc; | |
1788 | } | |
1789 | ||
1790 | this->close_mobj(); | |
1791 | ||
1792 | return 0; | |
1793 | } | |
1794 | ||
1795 | void MotrObject::close_mobj() | |
1796 | { | |
1797 | if (mobj == nullptr) | |
1798 | return; | |
1799 | m0_obj_fini(mobj); | |
1800 | delete mobj; mobj = nullptr; | |
1801 | } | |
1802 | ||
1803 | int MotrObject::write_mobj(const DoutPrefixProvider *dpp, bufferlist&& data, uint64_t offset) | |
1804 | { | |
1805 | int rc; | |
1806 | unsigned bs, left; | |
1807 | struct m0_op *op; | |
1808 | char *start, *p; | |
1809 | struct m0_bufvec buf; | |
1810 | struct m0_bufvec attr; | |
1811 | struct m0_indexvec ext; | |
1812 | ||
1813 | left = data.length(); | |
1814 | if (left == 0) | |
1815 | return 0; | |
1816 | ||
1817 | rc = m0_bufvec_empty_alloc(&buf, 1) ?: | |
1818 | m0_bufvec_alloc(&attr, 1, 1) ?: | |
1819 | m0_indexvec_alloc(&ext, 1); | |
1820 | if (rc != 0) | |
1821 | goto out; | |
1822 | ||
1823 | bs = this->get_optimal_bs(left); | |
1824 | ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl; | |
1825 | ||
1826 | start = data.c_str(); | |
1827 | ||
1828 | for (p = start; left > 0; left -= bs, p += bs, offset += bs) { | |
1829 | if (left < bs) | |
1830 | bs = this->get_optimal_bs(left); | |
1831 | if (left < bs) { | |
1832 | data.append_zero(bs - left); | |
1833 | left = bs; | |
1834 | p = data.c_str(); | |
1835 | } | |
1836 | buf.ov_buf[0] = p; | |
1837 | buf.ov_vec.v_count[0] = bs; | |
1838 | ext.iv_index[0] = offset; | |
1839 | ext.iv_vec.v_count[0] = bs; | |
1840 | attr.ov_vec.v_count[0] = 0; | |
1841 | ||
1842 | op = nullptr; | |
1843 | rc = m0_obj_op(this->mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, 0, &op); | |
1844 | if (rc != 0) | |
1845 | goto out; | |
1846 | m0_op_launch(&op, 1); | |
1847 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
1848 | m0_rc(op); | |
1849 | m0_op_fini(op); | |
1850 | m0_op_free(op); | |
1851 | if (rc != 0) | |
1852 | goto out; | |
1853 | } | |
1854 | ||
1855 | out: | |
1856 | m0_indexvec_free(&ext); | |
1857 | m0_bufvec_free(&attr); | |
1858 | m0_bufvec_free2(&buf); | |
1859 | return rc; | |
1860 | } | |
1861 | ||
1862 | int MotrObject::read_mobj(const DoutPrefixProvider* dpp, int64_t off, int64_t end, RGWGetDataCB* cb) | |
1863 | { | |
1864 | int rc; | |
1865 | unsigned bs, actual, left; | |
1866 | struct m0_op *op; | |
1867 | struct m0_bufvec buf; | |
1868 | struct m0_bufvec attr; | |
1869 | struct m0_indexvec ext; | |
1870 | ||
1871 | // make end pointer exclusive: | |
1872 | // it's easier to work with it this way | |
1873 | end++; | |
1874 | ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): off=" << off << | |
1875 | " end=" << end << dendl; | |
1876 | // As `off` may not be parity group size aligned, even using optimal | |
1877 | // buffer block size, simply reading data from offset `off` could come | |
1878 | // across parity group boundary. And Motr only allows page-size aligned | |
1879 | // offset. | |
1880 | // | |
1881 | // The optimal size of each IO should also take into account the data | |
1882 | // transfer size to s3 client. For example, 16MB may be nice to read | |
1883 | // data from motr, but it could be too big for network transfer. | |
1884 | // | |
1885 | // TODO: We leave proper handling of offset in the future. | |
1886 | bs = this->get_optimal_bs(end - off); | |
1887 | ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): bs=" << bs << dendl; | |
1888 | ||
1889 | rc = m0_bufvec_empty_alloc(&buf, 1) ? : | |
1890 | m0_bufvec_alloc(&attr, 1, 1) ? : | |
1891 | m0_indexvec_alloc(&ext, 1); | |
1892 | if (rc < 0) | |
1893 | goto out; | |
1894 | ||
1895 | left = end - off; | |
1896 | for (; left > 0; off += actual) { | |
1897 | if (left < bs) | |
1898 | bs = this->get_optimal_bs(left); | |
1899 | actual = bs; | |
1900 | if (left < bs) | |
1901 | actual = left; | |
1902 | ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): off=" << off << | |
1903 | " actual=" << actual << dendl; | |
1904 | bufferlist bl; | |
1905 | buf.ov_buf[0] = bl.append_hole(bs).c_str(); | |
1906 | buf.ov_vec.v_count[0] = bs; | |
1907 | ext.iv_index[0] = off; | |
1908 | ext.iv_vec.v_count[0] = bs; | |
1909 | attr.ov_vec.v_count[0] = 0; | |
1910 | ||
1911 | left -= actual; | |
1912 | // Read from Motr. | |
1913 | op = nullptr; | |
1914 | rc = m0_obj_op(this->mobj, M0_OC_READ, &ext, &buf, &attr, 0, 0, &op); | |
1915 | ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): init read op rc=" << rc << dendl; | |
1916 | if (rc != 0) { | |
1917 | ldpp_dout(dpp, 0) << __func__ << ": read failed during m0_obj_op, rc=" << rc << dendl; | |
1918 | goto out; | |
1919 | } | |
1920 | m0_op_launch(&op, 1); | |
1921 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
1922 | m0_rc(op); | |
1923 | m0_op_fini(op); | |
1924 | m0_op_free(op); | |
1925 | if (rc != 0) { | |
1926 | ldpp_dout(dpp, 0) << __func__ << ": read failed, m0_op_wait rc=" << rc << dendl; | |
1927 | goto out; | |
1928 | } | |
1929 | // Call `cb` to process returned data. | |
1930 | ldpp_dout(dpp, 20) << "MotrObject::read_mobj(): call cb to process data" << dendl; | |
1931 | cb->handle_data(bl, 0, actual); | |
1932 | } | |
1933 | ||
1934 | out: | |
1935 | m0_indexvec_free(&ext); | |
1936 | m0_bufvec_free(&attr); | |
1937 | m0_bufvec_free2(&buf); | |
1938 | this->close_mobj(); | |
1939 | ||
1940 | return rc; | |
1941 | } | |
1942 | ||
1943 | int MotrObject::get_bucket_dir_ent(const DoutPrefixProvider *dpp, rgw_bucket_dir_entry& ent) | |
1944 | { | |
1945 | int rc = 0; | |
1946 | string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); | |
1947 | int max = 1000; | |
1948 | vector<string> keys(max); | |
1949 | vector<bufferlist> vals(max); | |
1950 | bufferlist bl; | |
1951 | bufferlist::const_iterator iter; | |
1952 | ||
1953 | if (this->get_bucket()->get_info().versioning_status() == BUCKET_VERSIONED || | |
1954 | this->get_bucket()->get_info().versioning_status() == BUCKET_SUSPENDED) { | |
1955 | ||
1956 | rgw_bucket_dir_entry ent_to_check; | |
1957 | ||
1958 | if (this->store->get_obj_meta_cache()->get(dpp, this->get_name(), bl) == 0) { | |
1959 | iter = bl.cbegin(); | |
1960 | ent_to_check.decode(iter); | |
1961 | if (ent_to_check.is_current()) { | |
1962 | ent = ent_to_check; | |
1963 | rc = 0; | |
1964 | goto out; | |
1965 | } | |
1966 | } | |
1967 | ||
1968 | ldpp_dout(dpp, 20) <<__func__<< ": versioned bucket!" << dendl; | |
1969 | keys[0] = this->get_name(); | |
1970 | rc = store->next_query_by_name(bucket_index_iname, keys, vals); | |
1971 | if (rc < 0) { | |
1972 | ldpp_dout(dpp, 0) << __func__ << "ERROR: NEXT query failed. " << rc << dendl; | |
1973 | return rc; | |
1974 | } | |
1975 | ||
1976 | rc = -ENOENT; | |
1977 | for (const auto& bl: vals) { | |
1978 | if (bl.length() == 0) | |
1979 | break; | |
1980 | ||
1981 | iter = bl.cbegin(); | |
1982 | ent_to_check.decode(iter); | |
1983 | if (ent_to_check.is_current()) { | |
1984 | ldpp_dout(dpp, 20) <<__func__<< ": found current version!" << dendl; | |
1985 | ent = ent_to_check; | |
1986 | rc = 0; | |
1987 | ||
1988 | this->store->get_obj_meta_cache()->put(dpp, this->get_name(), bl); | |
1989 | ||
1990 | break; | |
1991 | } | |
1992 | } | |
1993 | } else { | |
1994 | if (this->store->get_obj_meta_cache()->get(dpp, this->get_key().get_oid(), bl)) { | |
1995 | ldpp_dout(dpp, 20) <<__func__<< ": non-versioned bucket!" << dendl; | |
1996 | rc = this->store->do_idx_op_by_name(bucket_index_iname, | |
1997 | M0_IC_GET, this->get_key().get_oid(), bl); | |
1998 | if (rc < 0) { | |
1999 | ldpp_dout(dpp, 0) << __func__ << "ERROR: failed to get object's entry from bucket index: rc=" | |
2000 | << rc << dendl; | |
2001 | return rc; | |
2002 | } | |
2003 | this->store->get_obj_meta_cache()->put(dpp, this->get_key().get_oid(), bl); | |
2004 | } | |
2005 | ||
2006 | bufferlist& blr = bl; | |
2007 | iter = blr.cbegin(); | |
2008 | ent.decode(iter); | |
2009 | } | |
2010 | ||
2011 | out: | |
2012 | if (rc == 0) { | |
2013 | sal::Attrs dummy; | |
2014 | decode(dummy, iter); | |
2015 | meta.decode(iter); | |
2016 | ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << meta.layout_id << dendl; | |
2017 | char fid_str[M0_FID_STR_LEN]; | |
2018 | snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&meta.oid)); | |
2019 | ldpp_dout(dpp, 70) << __func__ << ": oid=" << fid_str << dendl; | |
2020 | } else | |
2021 | ldpp_dout(dpp, 0) <<__func__<< ": rc=" << rc << dendl; | |
2022 | ||
2023 | return rc; | |
2024 | } | |
2025 | ||
2026 | int MotrObject::update_version_entries(const DoutPrefixProvider *dpp) | |
2027 | { | |
2028 | int rc; | |
2029 | int max = 10; | |
2030 | vector<string> keys(max); | |
2031 | vector<bufferlist> vals(max); | |
2032 | ||
2033 | string bucket_index_iname = "motr.rgw.bucket.index." + this->get_bucket()->get_name(); | |
2034 | keys[0] = this->get_name(); | |
2035 | rc = store->next_query_by_name(bucket_index_iname, keys, vals); | |
2036 | ldpp_dout(dpp, 20) << "get all versions, name = " << this->get_name() << "rc = " << rc << dendl; | |
2037 | if (rc < 0) { | |
2038 | ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; | |
2039 | return rc; | |
2040 | } | |
2041 | ||
2042 | // no entries returned. | |
2043 | if (rc == 0) | |
2044 | return 0; | |
2045 | ||
2046 | for (const auto& bl: vals) { | |
2047 | if (bl.length() == 0) | |
2048 | break; | |
2049 | ||
2050 | rgw_bucket_dir_entry ent; | |
2051 | auto iter = bl.cbegin(); | |
2052 | ent.decode(iter); | |
2053 | ||
2054 | if (0 != ent.key.name.compare(0, this->get_name().size(), this->get_name())) | |
2055 | continue; | |
2056 | ||
2057 | if (!ent.is_current()) | |
2058 | continue; | |
2059 | ||
2060 | // Remove from the cache. | |
2061 | store->get_obj_meta_cache()->remove(dpp, this->get_name()); | |
2062 | ||
2063 | rgw::sal::Attrs attrs; | |
2064 | decode(attrs, iter); | |
2065 | MotrObject::Meta meta; | |
2066 | meta.decode(iter); | |
2067 | ||
2068 | ent.flags = rgw_bucket_dir_entry::FLAG_VER; | |
2069 | string key; | |
2070 | if (ent.key.instance.empty()) | |
2071 | key = ent.key.name; | |
2072 | else { | |
2073 | char buf[ent.key.name.size() + ent.key.instance.size() + 16]; | |
2074 | snprintf(buf, sizeof(buf), "%s[%s]", ent.key.name.c_str(), ent.key.instance.c_str()); | |
2075 | key = buf; | |
2076 | } | |
2077 | ldpp_dout(dpp, 20) << "update one version, key = " << key << dendl; | |
2078 | bufferlist ent_bl; | |
2079 | ent.encode(ent_bl); | |
2080 | encode(attrs, ent_bl); | |
2081 | meta.encode(ent_bl); | |
2082 | ||
2083 | rc = store->do_idx_op_by_name(bucket_index_iname, | |
2084 | M0_IC_PUT, key, ent_bl); | |
2085 | if (rc < 0) | |
2086 | break; | |
2087 | } | |
2088 | return rc; | |
2089 | } | |
2090 | ||
2091 | // Scan object_nnn_part_index to get all parts then open their motr objects. | |
2092 | // TODO: all parts are opened in the POC. But for a large object, for example | |
2093 | // a 5GB object will have about 300 parts (for default 15MB part). A better | |
2094 | // way of managing opened object may be needed. | |
2095 | int MotrObject::get_part_objs(const DoutPrefixProvider* dpp, | |
2096 | std::map<int, std::unique_ptr<MotrObject>>& part_objs) | |
2097 | { | |
2098 | int rc; | |
2099 | int max_parts = 1000; | |
2100 | int marker = 0; | |
2101 | uint64_t off = 0; | |
2102 | bool truncated = false; | |
2103 | std::unique_ptr<rgw::sal::MultipartUpload> upload; | |
2104 | ||
2105 | upload = this->get_bucket()->get_multipart_upload(this->get_name(), string()); | |
2106 | ||
2107 | do { | |
2108 | rc = upload->list_parts(dpp, store->ctx(), max_parts, marker, &marker, &truncated); | |
2109 | if (rc == -ENOENT) { | |
2110 | rc = -ERR_NO_SUCH_UPLOAD; | |
2111 | } | |
2112 | if (rc < 0) | |
2113 | return rc; | |
2114 | ||
2115 | std::map<uint32_t, std::unique_ptr<MultipartPart>>& parts = upload->get_parts(); | |
2116 | for (auto part_iter = parts.begin(); part_iter != parts.end(); ++part_iter) { | |
2117 | ||
2118 | MultipartPart *mpart = part_iter->second.get(); | |
2119 | MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart); | |
2120 | uint32_t part_num = mmpart->get_num(); | |
2121 | uint64_t part_size = mmpart->get_size(); | |
2122 | ||
2123 | string part_obj_name = this->get_bucket()->get_name() + "." + | |
2124 | this->get_key().get_oid() + | |
2125 | ".part." + std::to_string(part_num); | |
2126 | std::unique_ptr<rgw::sal::Object> obj; | |
2127 | obj = this->bucket->get_object(rgw_obj_key(part_obj_name)); | |
2128 | std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release())); | |
2129 | ||
2130 | ldpp_dout(dpp, 20) << "get_part_objs: off = " << off << ", size = " << part_size << dendl; | |
2131 | mobj->part_off = off; | |
2132 | mobj->part_size = part_size; | |
2133 | mobj->part_num = part_num; | |
2134 | mobj->meta = mmpart->meta; | |
2135 | ||
2136 | part_objs.emplace(part_num, std::move(mobj)); | |
2137 | ||
2138 | off += part_size; | |
2139 | } | |
2140 | } while (truncated); | |
2141 | ||
2142 | return 0; | |
2143 | } | |
2144 | ||
2145 | int MotrObject::open_part_objs(const DoutPrefixProvider* dpp, | |
2146 | std::map<int, std::unique_ptr<MotrObject>>& part_objs) | |
2147 | { | |
2148 | //for (auto& iter: part_objs) { | |
2149 | for (auto iter = part_objs.begin(); iter != part_objs.end(); ++iter) { | |
2150 | MotrObject* obj = static_cast<MotrObject *>(iter->second.get()); | |
2151 | ldpp_dout(dpp, 20) << "open_part_objs: name = " << obj->get_name() << dendl; | |
2152 | int rc = obj->open_mobj(dpp); | |
2153 | if (rc < 0) | |
2154 | return rc; | |
2155 | } | |
2156 | ||
2157 | return 0; | |
2158 | } | |
2159 | ||
2160 | int MotrObject::delete_part_objs(const DoutPrefixProvider* dpp) | |
2161 | { | |
2162 | std::unique_ptr<rgw::sal::MultipartUpload> upload; | |
2163 | upload = this->get_bucket()->get_multipart_upload(this->get_name(), string()); | |
2164 | std::unique_ptr<rgw::sal::MotrMultipartUpload> mupload(static_cast<rgw::sal::MotrMultipartUpload *>(upload.release())); | |
2165 | return mupload->delete_parts(dpp); | |
2166 | } | |
2167 | ||
2168 | int MotrObject::read_multipart_obj(const DoutPrefixProvider* dpp, | |
2169 | int64_t off, int64_t end, RGWGetDataCB* cb, | |
2170 | std::map<int, std::unique_ptr<MotrObject>>& part_objs) | |
2171 | { | |
2172 | int64_t cursor = off; | |
2173 | ||
2174 | ldpp_dout(dpp, 20) << "read_multipart_obj: off=" << off << " end=" << end << dendl; | |
2175 | ||
2176 | // Find the parts which are in the (off, end) range and | |
2177 | // read data from it. Note: `end` argument is inclusive. | |
2178 | for (auto iter = part_objs.begin(); iter != part_objs.end(); ++iter) { | |
2179 | MotrObject* obj = static_cast<MotrObject *>(iter->second.get()); | |
2180 | int64_t part_off = obj->part_off; | |
2181 | int64_t part_size = obj->part_size; | |
2182 | int64_t part_end = obj->part_off + obj->part_size - 1; | |
2183 | ldpp_dout(dpp, 20) << "read_multipart_obj: part_off=" << part_off | |
2184 | << " part_end=" << part_end << dendl; | |
2185 | if (part_end < off) | |
2186 | continue; | |
2187 | ||
2188 | int64_t local_off = cursor - obj->part_off; | |
2189 | int64_t local_end = part_end < end? part_size - 1 : end - part_off; | |
2190 | ldpp_dout(dpp, 20) << "real_multipart_obj: name=" << obj->get_name() | |
2191 | << " local_off=" << local_off | |
2192 | << " local_end=" << local_end << dendl; | |
2193 | int rc = obj->read_mobj(dpp, local_off, local_end, cb); | |
2194 | if (rc < 0) | |
2195 | return rc; | |
2196 | ||
2197 | cursor = part_end + 1; | |
2198 | if (cursor > end) | |
2199 | break; | |
2200 | } | |
2201 | ||
2202 | return 0; | |
2203 | } | |
2204 | ||
// Round `x` up to the nearest multiple of `by`.
//
// `by` must be non-zero. Zero is already a multiple of every `by` and is
// returned unchanged; without that guard `x - 1` would wrap around and give
// a wrong result whenever `by` is not a power of two.
static unsigned roundup(unsigned x, unsigned by)
{
  if (x == 0)
    return 0;
  return ((x - 1) / by + 1) * by;
}
2209 | ||
2210 | unsigned MotrObject::get_optimal_bs(unsigned len) | |
2211 | { | |
2212 | struct m0_pool_version *pver; | |
2213 | ||
2214 | pver = m0_pool_version_find(&store->instance->m0c_pools_common, | |
2215 | &mobj->ob_attr.oa_pver); | |
2216 | M0_ASSERT(pver != nullptr); | |
2217 | struct m0_pdclust_attr *pa = &pver->pv_attr; | |
2218 | uint64_t lid = M0_OBJ_LAYOUT_ID(meta.layout_id); | |
2219 | unsigned unit_sz = m0_obj_layout_id_to_unit_size(lid); | |
2220 | unsigned grp_sz = unit_sz * pa->pa_N; | |
2221 | ||
2222 | // bs should be max 4-times pool-width deep counting by 1MB units, or | |
2223 | // 8-times deep counting by 512K units, 16-times deep by 256K units, | |
2224 | // and so on. Several units to one target will be aggregated to make | |
2225 | // fewer network RPCs, disk i/o operations and BE transactions. | |
2226 | // For unit sizes of 32K or less, the depth is 128, which | |
2227 | // makes it 32K * 128 == 4MB - the maximum amount per target when | |
2228 | // the performance is still good on LNet (which has max 1MB frames). | |
2229 | // TODO: it may be different on libfabric, should be re-measured. | |
2230 | unsigned depth = 128 / ((unit_sz + 0x7fff) / 0x8000); | |
2231 | if (depth == 0) | |
2232 | depth = 1; | |
2233 | // P * N / (N + K + S) - number of data units to span the pool-width | |
2234 | unsigned max_bs = depth * unit_sz * pa->pa_P * pa->pa_N / | |
2235 | (pa->pa_N + pa->pa_K + pa->pa_S); | |
2236 | max_bs = roundup(max_bs, grp_sz); // multiple of group size | |
2237 | if (len >= max_bs) | |
2238 | return max_bs; | |
2239 | else if (len <= grp_sz) | |
2240 | return grp_sz; | |
2241 | else | |
2242 | return roundup(len, grp_sz); | |
2243 | } | |
2244 | ||
2245 | void MotrAtomicWriter::cleanup() | |
2246 | { | |
2247 | m0_indexvec_free(&ext); | |
2248 | m0_bufvec_free(&attr); | |
2249 | m0_bufvec_free2(&buf); | |
2250 | acc_data.clear(); | |
2251 | obj.close_mobj(); | |
2252 | old_obj.close_mobj(); | |
2253 | } | |
2254 | ||
2255 | unsigned MotrAtomicWriter::populate_bvec(unsigned len, bufferlist::iterator &bi) | |
2256 | { | |
2257 | unsigned i, l, done = 0; | |
2258 | const char *data; | |
2259 | ||
2260 | for (i = 0; i < MAX_BUFVEC_NR && len > 0; ++i) { | |
2261 | l = bi.get_ptr_and_advance(len, &data); | |
2262 | buf.ov_buf[i] = (char*)data; | |
2263 | buf.ov_vec.v_count[i] = l; | |
2264 | ext.iv_index[i] = acc_off; | |
2265 | ext.iv_vec.v_count[i] = l; | |
2266 | attr.ov_vec.v_count[i] = 0; | |
2267 | acc_off += l; | |
2268 | len -= l; | |
2269 | done += l; | |
2270 | } | |
2271 | buf.ov_vec.v_nr = i; | |
2272 | ext.iv_vec.v_nr = i; | |
2273 | ||
2274 | return done; | |
2275 | } | |
2276 | ||
2277 | int MotrAtomicWriter::write() | |
2278 | { | |
2279 | int rc; | |
2280 | unsigned bs, left; | |
2281 | struct m0_op *op; | |
2282 | bufferlist::iterator bi; | |
2283 | ||
2284 | left = acc_data.length(); | |
2285 | ||
2286 | if (!obj.is_opened()) { | |
2287 | rc = obj.create_mobj(dpp, left); | |
2288 | if (rc == -EEXIST) | |
2289 | rc = obj.open_mobj(dpp); | |
2290 | if (rc != 0) { | |
2291 | char fid_str[M0_FID_STR_LEN]; | |
2292 | snprintf(fid_str, ARRAY_SIZE(fid_str), U128X_F, U128_P(&obj.meta.oid)); | |
2293 | ldpp_dout(dpp, 0) << "ERROR: failed to create/open motr object " | |
2294 | << fid_str << " (" << obj.get_bucket()->get_name() | |
2295 | << "/" << obj.get_key().get_oid() << "): rc=" << rc | |
2296 | << dendl; | |
2297 | goto err; | |
2298 | } | |
2299 | } | |
2300 | ||
2301 | total_data_size += left; | |
2302 | ||
2303 | bs = obj.get_optimal_bs(left); | |
2304 | ldpp_dout(dpp, 20) <<__func__<< ": left=" << left << " bs=" << bs << dendl; | |
2305 | ||
2306 | bi = acc_data.begin(); | |
2307 | while (left > 0) { | |
2308 | if (left < bs) | |
2309 | bs = obj.get_optimal_bs(left); | |
2310 | if (left < bs) { | |
2311 | acc_data.append_zero(bs - left); | |
2312 | auto off = bi.get_off(); | |
2313 | bufferlist tmp; | |
2314 | acc_data.splice(off, bs, &tmp); | |
2315 | acc_data.clear(); | |
2316 | acc_data.append(tmp.c_str(), bs); // make it a single buf | |
2317 | bi = acc_data.begin(); | |
2318 | left = bs; | |
2319 | } | |
2320 | ||
2321 | left -= this->populate_bvec(bs, bi); | |
2322 | ||
2323 | op = nullptr; | |
2324 | rc = m0_obj_op(obj.mobj, M0_OC_WRITE, &ext, &buf, &attr, 0, 0, &op); | |
2325 | if (rc != 0) | |
2326 | goto err; | |
2327 | m0_op_launch(&op, 1); | |
2328 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
2329 | m0_rc(op); | |
2330 | m0_op_fini(op); | |
2331 | m0_op_free(op); | |
2332 | if (rc != 0) | |
2333 | goto err; | |
2334 | } | |
2335 | acc_data.clear(); | |
2336 | ||
2337 | return 0; | |
2338 | ||
2339 | err: | |
2340 | this->cleanup(); | |
2341 | return rc; | |
2342 | } | |
2343 | ||
2344 | static const unsigned MAX_ACC_SIZE = 32 * 1024 * 1024; | |
2345 | ||
2346 | // Accumulate enough data first to make a reasonable decision about the | |
2347 | // optimal unit size for a new object, or bs for existing object (32M seems | |
2348 | // enough for 4M units in 8+2 parity groups, a common config on wide pools), | |
2349 | // and then launch the write operations. | |
2350 | int MotrAtomicWriter::process(bufferlist&& data, uint64_t offset) | |
2351 | { | |
2352 | if (data.length() == 0) { // last call, flush data | |
2353 | int rc = 0; | |
2354 | if (acc_data.length() != 0) | |
2355 | rc = this->write(); | |
2356 | this->cleanup(); | |
2357 | return rc; | |
2358 | } | |
2359 | ||
2360 | if (acc_data.length() == 0) | |
2361 | acc_off = offset; | |
2362 | ||
2363 | acc_data.append(std::move(data)); | |
2364 | if (acc_data.length() < MAX_ACC_SIZE) | |
2365 | return 0; | |
2366 | ||
2367 | return this->write(); | |
2368 | } | |
2369 | ||
2370 | int MotrAtomicWriter::complete(size_t accounted_size, const std::string& etag, | |
2371 | ceph::real_time *mtime, ceph::real_time set_mtime, | |
2372 | std::map<std::string, bufferlist>& attrs, | |
2373 | ceph::real_time delete_at, | |
2374 | const char *if_match, const char *if_nomatch, | |
2375 | const std::string *user_data, | |
2376 | rgw_zone_set *zones_trace, bool *canceled, | |
2377 | optional_yield y) | |
2378 | { | |
2379 | int rc = 0; | |
2380 | ||
2381 | if (acc_data.length() != 0) { // check again, just in case | |
2382 | rc = this->write(); | |
2383 | this->cleanup(); | |
2384 | if (rc != 0) | |
2385 | return rc; | |
2386 | } | |
2387 | ||
2388 | bufferlist bl; | |
2389 | rgw_bucket_dir_entry ent; | |
2390 | ||
2391 | // Set rgw_bucet_dir_entry. Some of the member of this structure may not | |
2392 | // apply to motr. For example the storage_class. | |
2393 | // | |
2394 | // Checkout AtomicObjectProcessor::complete() in rgw_putobj_processor.cc | |
2395 | // and RGWRados::Object::Write::write_meta() in rgw_rados.cc for what and | |
2396 | // how to set the dir entry. Only set the basic ones for POC, no ACLs and | |
2397 | // other attrs. | |
2398 | obj.get_key().get_index_key(&ent.key); | |
2399 | ent.meta.size = total_data_size; | |
2400 | ent.meta.accounted_size = total_data_size; | |
2401 | ent.meta.mtime = real_clock::is_zero(set_mtime)? ceph::real_clock::now() : set_mtime; | |
2402 | ent.meta.etag = etag; | |
2403 | ent.meta.owner = owner.to_str(); | |
2404 | ent.meta.owner_display_name = obj.get_bucket()->get_owner()->get_display_name(); | |
2405 | bool is_versioned = obj.get_key().have_instance(); | |
2406 | if (is_versioned) | |
2407 | ent.flags = rgw_bucket_dir_entry::FLAG_VER | rgw_bucket_dir_entry::FLAG_CURRENT; | |
2408 | ldpp_dout(dpp, 20) <<__func__<< ": key=" << obj.get_key().get_oid() | |
2409 | << " etag: " << etag << " user_data=" << user_data << dendl; | |
2410 | if (user_data) | |
2411 | ent.meta.user_data = *user_data; | |
2412 | ent.encode(bl); | |
2413 | ||
2414 | RGWBucketInfo &info = obj.get_bucket()->get_info(); | |
2415 | if (info.obj_lock_enabled() && info.obj_lock.has_rule()) { | |
2416 | auto iter = attrs.find(RGW_ATTR_OBJECT_RETENTION); | |
2417 | if (iter == attrs.end()) { | |
2418 | real_time lock_until_date = info.obj_lock.get_lock_until_date(ent.meta.mtime); | |
2419 | string mode = info.obj_lock.get_mode(); | |
2420 | RGWObjectRetention obj_retention(mode, lock_until_date); | |
2421 | bufferlist retention_bl; | |
2422 | obj_retention.encode(retention_bl); | |
2423 | attrs[RGW_ATTR_OBJECT_RETENTION] = retention_bl; | |
2424 | } | |
2425 | } | |
2426 | encode(attrs, bl); | |
2427 | obj.meta.encode(bl); | |
2428 | ldpp_dout(dpp, 20) <<__func__<< ": lid=0x" << std::hex << obj.meta.layout_id | |
2429 | << dendl; | |
2430 | if (is_versioned) { | |
2431 | // get the list of all versioned objects with the same key and | |
2432 | // unset their FLAG_CURRENT later, if do_idx_op_by_name() is successful. | |
2433 | // Note: without distributed lock on the index - it is possible that 2 | |
2434 | // CURRENT entries would appear in the bucket. For example, consider the | |
2435 | // following scenario when two clients are trying to add the new object | |
2436 | // version concurrently: | |
2437 | // client 1: reads all the CURRENT entries | |
2438 | // client 2: updates the index and sets the new CURRENT | |
2439 | // client 1: updates the index and sets the new CURRENT | |
2440 | // At the step (1) client 1 would not see the new current record from step (2), | |
2441 | // so it won't update it. As a result, two CURRENT version entries will appear | |
2442 | // in the bucket. | |
2443 | // TODO: update the current version (unset the flag) and insert the new current | |
2444 | // version can be launched in one motr op. This requires change at do_idx_op() | |
2445 | // and do_idx_op_by_name(). | |
2446 | rc = obj.update_version_entries(dpp); | |
2447 | if (rc < 0) | |
2448 | return rc; | |
2449 | } | |
2450 | // Insert an entry into bucket index. | |
2451 | string bucket_index_iname = "motr.rgw.bucket.index." + obj.get_bucket()->get_name(); | |
2452 | rc = store->do_idx_op_by_name(bucket_index_iname, | |
2453 | M0_IC_PUT, obj.get_key().get_oid(), bl); | |
2454 | if (rc == 0) | |
2455 | store->get_obj_meta_cache()->put(dpp, obj.get_key().get_oid(), bl); | |
2456 | ||
2457 | if (old_obj.get_bucket()->get_info().versioning_status() != BUCKET_VERSIONED) { | |
2458 | // Delete old object data if exists. | |
2459 | old_obj.delete_mobj(dpp); | |
2460 | } | |
2461 | ||
2462 | // TODO: We need to handle the object leak caused by parallel object upload by | |
2463 | // making use of background gc, which is currently not enabled for motr. | |
2464 | return rc; | |
2465 | } | |
2466 | ||
2467 | int MotrMultipartUpload::delete_parts(const DoutPrefixProvider *dpp) | |
2468 | { | |
2469 | int rc; | |
2470 | int max_parts = 1000; | |
2471 | int marker = 0; | |
2472 | bool truncated = false; | |
2473 | ||
2474 | // Scan all parts and delete the corresponding motr objects. | |
2475 | do { | |
2476 | rc = this->list_parts(dpp, store->ctx(), max_parts, marker, &marker, &truncated); | |
2477 | if (rc == -ENOENT) { | |
2478 | truncated = false; | |
2479 | rc = 0; | |
2480 | } | |
2481 | if (rc < 0) | |
2482 | return rc; | |
2483 | ||
2484 | std::map<uint32_t, std::unique_ptr<MultipartPart>>& parts = this->get_parts(); | |
2485 | for (auto part_iter = parts.begin(); part_iter != parts.end(); ++part_iter) { | |
2486 | ||
2487 | MultipartPart *mpart = part_iter->second.get(); | |
2488 | MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart); | |
2489 | uint32_t part_num = mmpart->get_num(); | |
2490 | ||
2491 | // Delete the part object. Note that the part object is not | |
2492 | // inserted into bucket index, only the corresponding motr object | |
2493 | // needs to be delete. That is why we don't call | |
2494 | // MotrObject::delete_object(). | |
2495 | string part_obj_name = bucket->get_name() + "." + | |
2496 | mp_obj.get_key() + | |
2497 | ".part." + std::to_string(part_num); | |
2498 | std::unique_ptr<rgw::sal::Object> obj; | |
2499 | obj = this->bucket->get_object(rgw_obj_key(part_obj_name)); | |
2500 | std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release())); | |
2501 | mobj->meta = mmpart->meta; | |
2502 | rc = mobj->delete_mobj(dpp); | |
2503 | if (rc < 0) { | |
2504 | ldpp_dout(dpp, 0) << __func__ << ": Failed to delete object from Motr. rc=" << rc << dendl; | |
2505 | return rc; | |
2506 | } | |
2507 | } | |
2508 | } while (truncated); | |
2509 | ||
2510 | // Delete object part index. | |
2511 | std::string oid = mp_obj.get_key(); | |
2512 | string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; | |
2513 | return store->delete_motr_idx_by_name(obj_part_iname); | |
2514 | } | |
2515 | ||
2516 | int MotrMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct) | |
2517 | { | |
2518 | int rc; | |
2519 | // Check if multipart upload exists | |
2520 | bufferlist bl; | |
2521 | std::unique_ptr<rgw::sal::Object> meta_obj; | |
2522 | meta_obj = get_meta_obj(); | |
2523 | string bucket_multipart_iname = | |
2524 | "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts"; | |
2525 | rc = store->do_idx_op_by_name(bucket_multipart_iname, | |
2526 | M0_IC_GET, meta_obj->get_key().to_str(), bl); | |
2527 | if (rc < 0) { | |
2528 | ldpp_dout(dpp, 0) << __func__ << ": Failed to get multipart upload. rc=" << rc << dendl; | |
2529 | return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc; | |
2530 | } | |
2531 | ||
2532 | // Scan all parts and delete the corresponding motr objects. | |
2533 | rc = this->delete_parts(dpp); | |
2534 | if (rc < 0) | |
2535 | return rc; | |
2536 | ||
2537 | bl.clear(); | |
2538 | // Remove the upload from bucket multipart index. | |
2539 | rc = store->do_idx_op_by_name(bucket_multipart_iname, | |
2540 | M0_IC_DEL, meta_obj->get_key().get_oid(), bl); | |
2541 | return rc; | |
2542 | } | |
2543 | ||
2544 | std::unique_ptr<rgw::sal::Object> MotrMultipartUpload::get_meta_obj() | |
2545 | { | |
2546 | std::unique_ptr<rgw::sal::Object> obj = bucket->get_object(rgw_obj_key(get_meta(), string(), mp_ns)); | |
2547 | std::unique_ptr<rgw::sal::MotrObject> mobj(static_cast<rgw::sal::MotrObject *>(obj.release())); | |
2548 | mobj->set_category(RGWObjCategory::MultiMeta); | |
2549 | return mobj; | |
2550 | } | |
2551 | ||
// Per-upload information that is serialized into the `user_data` field of
// the upload's entry in the bucket multipart index (written by
// MotrMultipartUpload::init(), read back by MotrMultipartUpload::get_info()).
struct motr_multipart_upload_info
{
  // Placement rule requested for the final object.
  rgw_placement_rule dest_placement;

  // Serialize as struct version 1 (compat version 1).
  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    encode(dest_placement, bl);
    ENCODE_FINISH(bl);
  }

  // Deserialize a buffer produced by encode().
  void decode(bufferlist::const_iterator& bl) {
    DECODE_START(1, bl);
    decode(dest_placement, bl);
    DECODE_FINISH(bl);
  }
};
// Generates the free encode()/decode() overloads for this type.
WRITE_CLASS_ENCODER(motr_multipart_upload_info)
2569 | ||
2570 | int MotrMultipartUpload::init(const DoutPrefixProvider *dpp, optional_yield y, | |
2571 | ACLOwner& _owner, | |
2572 | rgw_placement_rule& dest_placement, rgw::sal::Attrs& attrs) | |
2573 | { | |
2574 | int rc; | |
2575 | std::string oid = mp_obj.get_key(); | |
2576 | ||
2577 | owner = _owner; | |
2578 | ||
2579 | do { | |
2580 | char buf[33]; | |
2581 | string tmp_obj_name; | |
2582 | gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); | |
2583 | std::string upload_id = MULTIPART_UPLOAD_ID_PREFIX; /* v2 upload id */ | |
2584 | upload_id.append(buf); | |
2585 | ||
2586 | mp_obj.init(oid, upload_id); | |
2587 | tmp_obj_name = mp_obj.get_meta(); | |
2588 | ||
2589 | std::unique_ptr<rgw::sal::Object> obj; | |
2590 | obj = bucket->get_object(rgw_obj_key(tmp_obj_name, string(), mp_ns)); | |
2591 | // the meta object will be indexed with 0 size, we c | |
2592 | obj->set_in_extra_data(true); | |
2593 | obj->set_hash_source(oid); | |
2594 | ||
2595 | motr_multipart_upload_info upload_info; | |
2596 | upload_info.dest_placement = dest_placement; | |
2597 | bufferlist mpbl; | |
2598 | encode(upload_info, mpbl); | |
2599 | ||
2600 | // Create an initial entry in the bucket. The entry will be | |
2601 | // updated when multipart upload is completed, for example, | |
2602 | // size, etag etc. | |
2603 | bufferlist bl; | |
2604 | rgw_bucket_dir_entry ent; | |
2605 | obj->get_key().get_index_key(&ent.key); | |
2606 | ent.meta.owner = owner.get_id().to_str(); | |
2607 | ent.meta.category = RGWObjCategory::MultiMeta; | |
2608 | ent.meta.mtime = ceph::real_clock::now(); | |
2609 | ent.meta.user_data.assign(mpbl.c_str(), mpbl.c_str() + mpbl.length()); | |
2610 | ent.encode(bl); | |
2611 | ||
2612 | // Insert an entry into bucket multipart index so it is not shown | |
2613 | // when listing a bucket. | |
2614 | string bucket_multipart_iname = | |
2615 | "motr.rgw.bucket." + obj->get_bucket()->get_name() + ".multiparts"; | |
2616 | rc = store->do_idx_op_by_name(bucket_multipart_iname, | |
2617 | M0_IC_PUT, obj->get_key().get_oid(), bl); | |
2618 | ||
2619 | } while (rc == -EEXIST); | |
2620 | ||
2621 | if (rc < 0) | |
2622 | return rc; | |
2623 | ||
2624 | // Create object part index. | |
2625 | // TODO: add bucket as part of the name. | |
2626 | string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; | |
2627 | ldpp_dout(dpp, 20) << "MotrMultipartUpload::init(): object part index=" << obj_part_iname << dendl; | |
2628 | rc = store->create_motr_idx_by_name(obj_part_iname); | |
2629 | if (rc == -EEXIST) | |
2630 | rc = 0; | |
2631 | if (rc < 0) | |
2632 | // TODO: clean the bucket index entry | |
2633 | ldpp_dout(dpp, 0) << "Failed to create object multipart index " << obj_part_iname << dendl; | |
2634 | ||
2635 | return rc; | |
2636 | } | |
2637 | ||
2638 | int MotrMultipartUpload::list_parts(const DoutPrefixProvider *dpp, CephContext *cct, | |
2639 | int num_parts, int marker, | |
2640 | int *next_marker, bool *truncated, | |
2641 | bool assume_unsorted) | |
2642 | { | |
2643 | int rc; | |
2644 | vector<string> key_vec(num_parts); | |
2645 | vector<bufferlist> val_vec(num_parts); | |
2646 | ||
2647 | std::string oid = mp_obj.get_key(); | |
2648 | string obj_part_iname = "motr.rgw.object." + bucket->get_name() + "." + oid + ".parts"; | |
2649 | ldpp_dout(dpp, 20) << __func__ << ": object part index = " << obj_part_iname << dendl; | |
2650 | key_vec[0].clear(); | |
2651 | key_vec[0] = "part."; | |
2652 | char buf[32]; | |
2653 | snprintf(buf, sizeof(buf), "%08d", marker + 1); | |
2654 | key_vec[0].append(buf); | |
2655 | rc = store->next_query_by_name(obj_part_iname, key_vec, val_vec); | |
2656 | if (rc < 0) { | |
2657 | ldpp_dout(dpp, 0) << "ERROR: NEXT query failed. " << rc << dendl; | |
2658 | return rc; | |
2659 | } | |
2660 | ||
2661 | int last_num = 0; | |
2662 | int part_cnt = 0; | |
2663 | uint32_t expected_next = 0; | |
2664 | ldpp_dout(dpp, 20) << __func__ << ": marker = " << marker << dendl; | |
2665 | for (const auto& bl: val_vec) { | |
2666 | if (bl.length() == 0) | |
2667 | break; | |
2668 | ||
2669 | RGWUploadPartInfo info; | |
2670 | auto iter = bl.cbegin(); | |
2671 | info.decode(iter); | |
2672 | rgw::sal::Attrs attrs_dummy; | |
2673 | decode(attrs_dummy, iter); | |
2674 | MotrObject::Meta meta; | |
2675 | meta.decode(iter); | |
2676 | ||
2677 | ldpp_dout(dpp, 20) << __func__ << ": part_num=" << info.num | |
2678 | << " part_size=" << info.size << dendl; | |
2679 | ldpp_dout(dpp, 20) << __func__ << ": meta:oid=[" << meta.oid.u_hi << "," << meta.oid.u_lo | |
2680 | << "], meta:pvid=[" << meta.pver.f_container << "," << meta.pver.f_key | |
2681 | << "], meta:layout id=" << meta.layout_id << dendl; | |
2682 | ||
2683 | if (!expected_next) | |
2684 | expected_next = info.num + 1; | |
2685 | else if (expected_next && info.num != expected_next) | |
2686 | return -EINVAL; | |
2687 | else expected_next = info.num + 1; | |
2688 | ||
2689 | if ((int)info.num > marker) { | |
2690 | last_num = info.num; | |
2691 | parts.emplace(info.num, std::make_unique<MotrMultipartPart>(info, meta)); | |
2692 | } | |
2693 | ||
2694 | part_cnt++; | |
2695 | } | |
2696 | ||
2697 | // Does it have more parts? | |
2698 | if (truncated) | |
2699 | *truncated = part_cnt < num_parts? false : true; | |
2700 | ldpp_dout(dpp, 20) << __func__ << ": truncated=" << *truncated << dendl; | |
2701 | ||
2702 | if (next_marker) | |
2703 | *next_marker = last_num; | |
2704 | ||
2705 | return 0; | |
2706 | } | |
2707 | ||
// Complete a multipart upload. Largely copied from rgw_sal_rados.cc.
//
// Pages through the uploaded parts with list_parts(), validates them against
// the client supplied `part_etags` (count, part numbers, etags, minimum part
// size for all but the last part, consistent compression), computes the
// final multipart etag, then moves the upload's directory entry from the
// bucket multipart index into the regular bucket index under the target
// object's name.
//
// In/out accumulators (added to, not reset): `off`, `accounted_size`,
// `compressed`, `cs_info`. `remove_objs` receives one index key per part.
// `y`, `tag`, `owner` and `olh_epoch` are not used by this implementation.
//
// Returns 0 on success or a negative error code (-ERR_NO_SUCH_UPLOAD,
// -ERR_INVALID_PART, -ERR_TOO_SMALL, or the error of an index operation).
int MotrMultipartUpload::complete(const DoutPrefixProvider *dpp,
                                  optional_yield y, CephContext* cct,
                                  map<int, string>& part_etags,
                                  list<rgw_obj_index_key>& remove_objs,
                                  uint64_t& accounted_size, bool& compressed,
                                  RGWCompressionInfo& cs_info, off_t& off,
                                  std::string& tag, ACLOwner& owner,
                                  uint64_t olh_epoch,
                                  rgw::sal::Object* target_obj)
{
  char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
  // Hex digest plus room for the "-<part count>" suffix.
  char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
  std::string etag;
  bufferlist etag_bl;
  MD5 hash;
  // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
  hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
  bool truncated;
  int rc;

  ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): enter" << dendl;
  int total_parts = 0;
  int handled_parts = 0;
  int max_parts = 1000;  // page size for list_parts()
  int marker = 0;
  uint64_t min_part_size = cct->_conf->rgw_multipart_min_part_size;
  // Advances across pages; must stay in step with the listed parts.
  auto etags_iter = part_etags.begin();
  rgw::sal::Attrs attrs = target_obj->get_attrs();

  do {
    ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): list_parts()" << dendl;
    // `marker` is both the input and the output marker of the page.
    rc = list_parts(dpp, cct, max_parts, marker, &marker, &truncated);
    if (rc == -ENOENT) {
      rc = -ERR_NO_SUCH_UPLOAD;
    }
    if (rc < 0)
      return rc;

    // NOTE(review): this count and the loop below (which restarts at
    // parts.begin() while etags_iter keeps advancing) assume `parts` holds
    // only the page just listed -- confirm against list_parts().
    total_parts += parts.size();
    if (!truncated && total_parts != (int)part_etags.size()) {
      ldpp_dout(dpp, 0) << "NOTICE: total parts mismatch: have: " << total_parts
                        << " expected: " << part_etags.size() << dendl;
      rc = -ERR_INVALID_PART;
      return rc;
    }
    ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): parts.size()=" << parts.size() << dendl;

    for (auto obj_iter = parts.begin();
         etags_iter != part_etags.end() && obj_iter != parts.end();
         ++etags_iter, ++obj_iter, ++handled_parts) {
      MultipartPart *mpart = obj_iter->second.get();
      MotrMultipartPart *mmpart = static_cast<MotrMultipartPart *>(mpart);
      RGWUploadPartInfo *part = &mmpart->info;

      uint64_t part_size = part->accounted_size;
      ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): part_size=" << part_size << dendl;
      // Every part except the last one must reach the minimum part size.
      if (handled_parts < (int)part_etags.size() - 1 &&
          part_size < min_part_size) {
        rc = -ERR_TOO_SMALL;
        return rc;
      }

      char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
      // The requested and the uploaded part numbers must line up one to one.
      if (etags_iter->first != (int)obj_iter->first) {
        ldpp_dout(dpp, 0) << "NOTICE: parts num mismatch: next requested: "
                          << etags_iter->first << " next uploaded: "
                          << obj_iter->first << dendl;
        rc = -ERR_INVALID_PART;
        return rc;
      }
      string part_etag = rgw_string_unquote(etags_iter->second);
      if (part_etag.compare(part->etag) != 0) {
        ldpp_dout(dpp, 0) << "NOTICE: etag mismatch: part: " << etags_iter->first
                          << " etag: " << etags_iter->second << dendl;
        rc = -ERR_INVALID_PART;
        return rc;
      }

      // The final etag is the MD5 over the binary etags of all parts.
      hex_to_buf(part->etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
      hash.Update((const unsigned char *)petag, sizeof(petag));
      ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): calc etag " << dendl;

      string oid = mp_obj.get_part(part->num);
      rgw_obj src_obj;
      src_obj.init_ns(bucket->get_key(), oid, mp_ns);

#if 0 // does Motr backend need it?
      /* update manifest for part */
      if (part->manifest.empty()) {
        ldpp_dout(dpp, 0) << "ERROR: empty manifest for object part: obj="
                          << src_obj << dendl;
        rc = -ERR_INVALID_PART;
        return rc;
      } else {
        manifest.append(dpp, part->manifest, store->get_zone());
      }
      ldpp_dout(dpp, 0) << "MotrMultipartUpload::complete(): manifest " << dendl;
#endif

      // All parts must agree on whether and how they are compressed.
      bool part_compressed = (part->cs_info.compression_type != "none");
      if ((handled_parts > 0) &&
          ((part_compressed != compressed) ||
           (cs_info.compression_type != part->cs_info.compression_type))) {
        ldpp_dout(dpp, 0) << "ERROR: compression type was changed during multipart upload ("
                          << cs_info.compression_type << ">>" << part->cs_info.compression_type << ")" << dendl;
        rc = -ERR_INVALID_PART;
        return rc;
      }

      ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): part compression" << dendl;
      if (part_compressed) {
        // Append this part's compression blocks, rebased onto the offsets
        // accumulated from the preceding parts.
        int64_t new_ofs; // offset in compression data for new part
        if (cs_info.blocks.size() > 0)
          new_ofs = cs_info.blocks.back().new_ofs + cs_info.blocks.back().len;
        else
          new_ofs = 0;
        for (const auto& block : part->cs_info.blocks) {
          compression_block cb;
          cb.old_ofs = block.old_ofs + cs_info.orig_size;
          cb.new_ofs = new_ofs;
          cb.len = block.len;
          cs_info.blocks.push_back(cb);
          new_ofs = cb.new_ofs + cb.len;
        }
        if (!compressed)
          cs_info.compression_type = part->cs_info.compression_type;
        cs_info.orig_size += part->cs_info.orig_size;
        compressed = true;
      }

      // We may not need to do the following, as remove_objs are the entries
      // that must not show up when listing a bucket. Since in-progress
      // uploads keep their metadata in a separate index, they are not shown
      // when listing a bucket anyway.
      rgw_obj_index_key remove_key;
      src_obj.key.get_index_key(&remove_key);
      remove_objs.push_back(remove_key);

      off += part_size;
      accounted_size += part->accounted_size;
      ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): off=" << off << ", accounted_size = " << accounted_size << dendl;
    }
  } while (truncated);
  hash.Final((unsigned char *)final_etag);

  // Final etag format: "<md5 hex>-<number of parts>".
  buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
  snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
           sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
           "-%lld", (long long)part_etags.size());
  etag = final_etag_str;
  ldpp_dout(dpp, 20) << "calculated etag: " << etag << dendl;
  etag_bl.append(etag);
  attrs[RGW_ATTR_ETAG] = etag_bl;

  if (compressed) {
    // write compression attribute to full object
    bufferlist tmp;
    encode(cs_info, tmp);
    attrs[RGW_ATTR_COMPRESSION] = tmp;
  }

  // Read the upload's entry (which carries the multipart_upload_info) from
  // the bucket multipart index.
  // TODO: all those index name and key constructions should be implemented as
  // member functions.
  bufferlist bl;
  std::unique_ptr<rgw::sal::Object> meta_obj;
  meta_obj = get_meta_obj();
  string bucket_multipart_iname =
      "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts";
  rc = this->store->do_idx_op_by_name(bucket_multipart_iname,
                                      M0_IC_GET, meta_obj->get_key().get_oid(), bl);
  ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): read entry from bucket multipart index rc=" << rc << dendl;
  if (rc < 0)
    return rc;
  rgw_bucket_dir_entry ent;
  bufferlist& blr = bl;
  auto ent_iter = blr.cbegin();
  ent.decode(ent_iter);

  // Update the dir entry and insert it to the bucket index so
  // the object will be seen when listing the bucket.
  bufferlist update_bl;
  target_obj->get_key().get_index_key(&ent.key); // Change to the official name.
  ent.meta.size = off;
  ent.meta.accounted_size = accounted_size;
  ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): obj size=" << ent.meta.size
                     << " obj accounted size=" << ent.meta.accounted_size << dendl;
  ent.meta.mtime = ceph::real_clock::now();
  ent.meta.etag = etag;
  // Index value layout: dir entry, attrs, Motr object meta (a default
  // constructed placeholder here).
  ent.encode(update_bl);
  encode(attrs, update_bl);
  MotrObject::Meta meta_dummy;
  meta_dummy.encode(update_bl);

  string bucket_index_iname = "motr.rgw.bucket.index." + meta_obj->get_bucket()->get_name();
  ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): target_obj name=" << target_obj->get_name()
                     << " target_obj oid=" << target_obj->get_oid() << dendl;
  rc = store->do_idx_op_by_name(bucket_index_iname, M0_IC_PUT,
                                target_obj->get_name(), update_bl);
  if (rc < 0)
    return rc;

  // Put into metadata cache.
  store->get_obj_meta_cache()->put(dpp, target_obj->get_name(), update_bl);

  // Now we can remove it from bucket multipart index.
  ldpp_dout(dpp, 20) << "MotrMultipartUpload::complete(): remove from bucket multipartindex " << dendl;
  return store->do_idx_op_by_name(bucket_multipart_iname,
                                  M0_IC_DEL, meta_obj->get_key().get_oid(), bl);
}
2919 | ||
2920 | int MotrMultipartUpload::get_info(const DoutPrefixProvider *dpp, optional_yield y, rgw_placement_rule** rule, rgw::sal::Attrs* attrs) | |
2921 | { | |
2922 | if (!rule && !attrs) { | |
2923 | return 0; | |
2924 | } | |
2925 | ||
2926 | if (rule) { | |
2927 | if (!placement.empty()) { | |
2928 | *rule = &placement; | |
2929 | if (!attrs) { | |
2930 | /* Don't need attrs, done */ | |
2931 | return 0; | |
2932 | } | |
2933 | } else { | |
2934 | *rule = nullptr; | |
2935 | } | |
2936 | } | |
2937 | ||
2938 | std::unique_ptr<rgw::sal::Object> meta_obj; | |
2939 | meta_obj = get_meta_obj(); | |
2940 | meta_obj->set_in_extra_data(true); | |
2941 | ||
2942 | // Read the object's the multipart_upload_info. | |
2943 | bufferlist bl; | |
2944 | string bucket_multipart_iname = | |
2945 | "motr.rgw.bucket." + meta_obj->get_bucket()->get_name() + ".multiparts"; | |
2946 | int rc = this->store->do_idx_op_by_name(bucket_multipart_iname, | |
2947 | M0_IC_GET, meta_obj->get_key().get_oid(), bl); | |
2948 | if (rc < 0) { | |
2949 | ldpp_dout(dpp, 0) << __func__ << ": Failed to get multipart info. rc=" << rc << dendl; | |
2950 | return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc; | |
2951 | } | |
2952 | ||
2953 | rgw_bucket_dir_entry ent; | |
2954 | bufferlist& blr = bl; | |
2955 | auto ent_iter = blr.cbegin(); | |
2956 | ent.decode(ent_iter); | |
2957 | ||
2958 | if (attrs) { | |
2959 | bufferlist etag_bl; | |
2960 | string& etag = ent.meta.etag; | |
2961 | ldpp_dout(dpp, 20) << "object's etag: " << ent.meta.etag << dendl; | |
2962 | etag_bl.append(etag.c_str(), etag.size()); | |
2963 | attrs->emplace(std::move(RGW_ATTR_ETAG), std::move(etag_bl)); | |
2964 | if (!rule || *rule != nullptr) { | |
2965 | /* placement was cached; don't actually read */ | |
2966 | return 0; | |
2967 | } | |
2968 | } | |
2969 | ||
2970 | /* Decode multipart_upload_info */ | |
2971 | motr_multipart_upload_info upload_info; | |
2972 | bufferlist mpbl; | |
2973 | mpbl.append(ent.meta.user_data.c_str(), ent.meta.user_data.size()); | |
2974 | auto mpbl_iter = mpbl.cbegin(); | |
2975 | upload_info.decode(mpbl_iter); | |
2976 | placement = upload_info.dest_placement; | |
2977 | *rule = &placement; | |
2978 | ||
2979 | return 0; | |
2980 | } | |
2981 | ||
2982 | std::unique_ptr<Writer> MotrMultipartUpload::get_writer( | |
2983 | const DoutPrefixProvider *dpp, | |
2984 | optional_yield y, | |
2985 | rgw::sal::Object* obj, | |
2986 | const rgw_user& owner, | |
2987 | const rgw_placement_rule *ptail_placement_rule, | |
2988 | uint64_t part_num, | |
2989 | const std::string& part_num_str) | |
2990 | { | |
2991 | return std::make_unique<MotrMultipartWriter>(dpp, y, this, | |
2992 | obj, store, owner, | |
2993 | ptail_placement_rule, part_num, part_num_str); | |
2994 | } | |
2995 | ||
2996 | int MotrMultipartWriter::prepare(optional_yield y) | |
2997 | { | |
2998 | string part_obj_name = head_obj->get_bucket()->get_name() + "." + | |
2999 | head_obj->get_key().get_oid() + | |
3000 | ".part." + std::to_string(part_num); | |
3001 | ldpp_dout(dpp, 20) << "bucket=" << head_obj->get_bucket()->get_name() << "part_obj_name=" << part_obj_name << dendl; | |
3002 | part_obj = std::make_unique<MotrObject>(this->store, rgw_obj_key(part_obj_name), head_obj->get_bucket()); | |
3003 | if (part_obj == nullptr) | |
3004 | return -ENOMEM; | |
3005 | ||
3006 | // s3 client may retry uploading part, so the part may have already | |
3007 | // been created. | |
3008 | int rc = part_obj->create_mobj(dpp, store->cctx->_conf->rgw_max_chunk_size); | |
3009 | if (rc == -EEXIST) { | |
3010 | rc = part_obj->open_mobj(dpp); | |
3011 | if (rc < 0) | |
3012 | return rc; | |
3013 | } | |
3014 | return rc; | |
3015 | } | |
3016 | ||
3017 | int MotrMultipartWriter::process(bufferlist&& data, uint64_t offset) | |
3018 | { | |
3019 | int rc = part_obj->write_mobj(dpp, std::move(data), offset); | |
3020 | if (rc == 0) { | |
3021 | actual_part_size += data.length(); | |
3022 | ldpp_dout(dpp, 20) << " write_mobj(): actual_part_size=" << actual_part_size << dendl; | |
3023 | } | |
3024 | return rc; | |
3025 | } | |
3026 | ||
3027 | int MotrMultipartWriter::complete(size_t accounted_size, const std::string& etag, | |
3028 | ceph::real_time *mtime, ceph::real_time set_mtime, | |
3029 | std::map<std::string, bufferlist>& attrs, | |
3030 | ceph::real_time delete_at, | |
3031 | const char *if_match, const char *if_nomatch, | |
3032 | const std::string *user_data, | |
3033 | rgw_zone_set *zones_trace, bool *canceled, | |
3034 | optional_yield y) | |
3035 | { | |
3036 | // Should the dir entry(object metadata) be updated? For example | |
3037 | // mtime. | |
3038 | ||
3039 | ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): enter" << dendl; | |
3040 | // Add an entry into object_nnn_part_index. | |
3041 | bufferlist bl; | |
3042 | RGWUploadPartInfo info; | |
3043 | info.num = part_num; | |
3044 | info.etag = etag; | |
3045 | info.size = actual_part_size; | |
3046 | info.accounted_size = accounted_size; | |
3047 | info.modified = real_clock::now(); | |
3048 | ||
3049 | bool compressed; | |
3050 | int rc = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info); | |
3051 | ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): compression rc=" << rc << dendl; | |
3052 | if (rc < 0) { | |
3053 | ldpp_dout(dpp, 1) << "cannot get compression info" << dendl; | |
3054 | return rc; | |
3055 | } | |
3056 | encode(info, bl); | |
3057 | encode(attrs, bl); | |
3058 | part_obj->meta.encode(bl); | |
3059 | ||
3060 | string p = "part."; | |
3061 | char buf[32]; | |
3062 | snprintf(buf, sizeof(buf), "%08d", (int)part_num); | |
3063 | p.append(buf); | |
3064 | string obj_part_iname = "motr.rgw.object." + head_obj->get_bucket()->get_name() + "." + | |
3065 | head_obj->get_key().get_oid() + ".parts"; | |
3066 | ldpp_dout(dpp, 20) << "MotrMultipartWriter::complete(): object part index = " << obj_part_iname << dendl; | |
3067 | rc = store->do_idx_op_by_name(obj_part_iname, M0_IC_PUT, p, bl); | |
3068 | if (rc < 0) { | |
3069 | return rc == -ENOENT ? -ERR_NO_SUCH_UPLOAD : rc; | |
3070 | } | |
3071 | ||
3072 | return 0; | |
3073 | } | |
3074 | ||
3075 | std::unique_ptr<RGWRole> MotrStore::get_role(std::string name, | |
3076 | std::string tenant, | |
3077 | std::string path, | |
3078 | std::string trust_policy, | |
3079 | std::string max_session_duration_str, | |
3080 | std::multimap<std::string,std::string> tags) | |
3081 | { | |
3082 | RGWRole* p = nullptr; | |
3083 | return std::unique_ptr<RGWRole>(p); | |
3084 | } | |
3085 | ||
3086 | std::unique_ptr<RGWRole> MotrStore::get_role(const RGWRoleInfo& info) | |
3087 | { | |
3088 | RGWRole* p = nullptr; | |
3089 | return std::unique_ptr<RGWRole>(p); | |
3090 | } | |
3091 | ||
3092 | std::unique_ptr<RGWRole> MotrStore::get_role(std::string id) | |
3093 | { | |
3094 | RGWRole* p = nullptr; | |
3095 | return std::unique_ptr<RGWRole>(p); | |
3096 | } | |
3097 | ||
// Stub: reports success (0) and leaves `roles` untouched.
int MotrStore::get_roles(const DoutPrefixProvider *dpp,
                         optional_yield y,
                         const std::string& path_prefix,
                         const std::string& tenant,
                         vector<std::unique_ptr<RGWRole>>& roles)
{
  return 0;
}
3106 | ||
3107 | std::unique_ptr<RGWOIDCProvider> MotrStore::get_oidc_provider() | |
3108 | { | |
3109 | RGWOIDCProvider* p = nullptr; | |
3110 | return std::unique_ptr<RGWOIDCProvider>(p); | |
3111 | } | |
3112 | ||
// Stub: reports success (0) and leaves `providers` untouched.
int MotrStore::get_oidc_providers(const DoutPrefixProvider *dpp,
                                  const std::string& tenant,
                                  vector<std::unique_ptr<RGWOIDCProvider>>& providers)
{
  return 0;
}
3119 | ||
3120 | std::unique_ptr<MultipartUpload> MotrBucket::get_multipart_upload(const std::string& oid, | |
3121 | std::optional<std::string> upload_id, | |
3122 | ACLOwner owner, ceph::real_time mtime) | |
3123 | { | |
3124 | return std::make_unique<MotrMultipartUpload>(store, this, oid, upload_id, owner, mtime); | |
3125 | } | |
3126 | ||
// Stub: no append writer is provided; always returns nullptr.
std::unique_ptr<Writer> MotrStore::get_append_writer(const DoutPrefixProvider *dpp,
                                                     optional_yield y,
                                                     rgw::sal::Object* obj,
                                                     const rgw_user& owner,
                                                     const rgw_placement_rule *ptail_placement_rule,
                                                     const std::string& unique_tag,
                                                     uint64_t position,
                                                     uint64_t *cur_accounted_size) {
  return nullptr;
}
3137 | ||
3138 | std::unique_ptr<Writer> MotrStore::get_atomic_writer(const DoutPrefixProvider *dpp, | |
3139 | optional_yield y, | |
3140 | rgw::sal::Object* obj, | |
3141 | const rgw_user& owner, | |
3142 | const rgw_placement_rule *ptail_placement_rule, | |
3143 | uint64_t olh_epoch, | |
3144 | const std::string& unique_tag) { | |
3145 | return std::make_unique<MotrAtomicWriter>(dpp, y, | |
3146 | obj, this, owner, | |
3147 | ptail_placement_rule, olh_epoch, unique_tag); | |
3148 | } | |
3149 | ||
3150 | const std::string& MotrStore::get_compression_type(const rgw_placement_rule& rule) | |
3151 | { | |
3152 | return zone.zone_params->get_compression_type(rule); | |
3153 | } | |
3154 | ||
3155 | bool MotrStore::valid_placement(const rgw_placement_rule& rule) | |
3156 | { | |
3157 | return zone.zone_params->valid_placement(rule); | |
3158 | } | |
3159 | ||
3160 | std::unique_ptr<User> MotrStore::get_user(const rgw_user &u) | |
3161 | { | |
3162 | ldout(cctx, 20) << "bucket's user: " << u.to_str() << dendl; | |
3163 | return std::make_unique<MotrUser>(this, u); | |
3164 | } | |
3165 | ||
3166 | int MotrStore::get_user_by_access_key(const DoutPrefixProvider *dpp, const std::string &key, optional_yield y, std::unique_ptr<User> *user) | |
3167 | { | |
3168 | int rc; | |
3169 | User *u; | |
3170 | bufferlist bl; | |
3171 | RGWUserInfo uinfo; | |
3172 | MotrAccessKey access_key; | |
3173 | ||
3174 | rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, | |
3175 | M0_IC_GET, key, bl); | |
3176 | if (rc < 0){ | |
3177 | ldout(cctx, 0) << "Access key not found: rc = " << rc << dendl; | |
3178 | return rc; | |
3179 | } | |
3180 | ||
3181 | bufferlist& blr = bl; | |
3182 | auto iter = blr.cbegin(); | |
3183 | access_key.decode(iter); | |
3184 | ||
3185 | uinfo.user_id.from_str(access_key.user_id); | |
3186 | ldout(cctx, 0) << "Loading user: " << uinfo.user_id.id << dendl; | |
3187 | rc = MotrUser().load_user_from_idx(dpp, this, uinfo, nullptr, nullptr); | |
3188 | if (rc < 0){ | |
3189 | ldout(cctx, 0) << "Failed to load user: rc = " << rc << dendl; | |
3190 | return rc; | |
3191 | } | |
3192 | u = new MotrUser(this, uinfo); | |
3193 | if (!u) | |
3194 | return -ENOMEM; | |
3195 | ||
3196 | user->reset(u); | |
3197 | return 0; | |
3198 | } | |
3199 | ||
3200 | int MotrStore::get_user_by_email(const DoutPrefixProvider *dpp, const std::string& email, optional_yield y, std::unique_ptr<User>* user) | |
3201 | { | |
3202 | int rc; | |
3203 | User *u; | |
3204 | bufferlist bl; | |
3205 | RGWUserInfo uinfo; | |
3206 | MotrEmailInfo email_info; | |
3207 | rc = do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, | |
3208 | M0_IC_GET, email, bl); | |
3209 | if (rc < 0){ | |
3210 | ldout(cctx, 0) << "Email Id not found: rc = " << rc << dendl; | |
3211 | return rc; | |
3212 | } | |
3213 | auto iter = bl.cbegin(); | |
3214 | email_info.decode(iter); | |
3215 | ldout(cctx, 0) << "Loading user: " << email_info.user_id << dendl; | |
3216 | uinfo.user_id.from_str(email_info.user_id); | |
3217 | rc = MotrUser().load_user_from_idx(dpp, this, uinfo, nullptr, nullptr); | |
3218 | if (rc < 0){ | |
3219 | ldout(cctx, 0) << "Failed to load user: rc = " << rc << dendl; | |
3220 | return rc; | |
3221 | } | |
3222 | u = new MotrUser(this, uinfo); | |
3223 | if (!u) | |
3224 | return -ENOMEM; | |
3225 | ||
3226 | user->reset(u); | |
3227 | return 0; | |
3228 | } | |
3229 | ||
// Stub: Swift keys and subusers are not supported for now.
// NOTE(review): returns 0 (success) without assigning `*user`; confirm that
// callers do not dereference `*user` after a successful return.
int MotrStore::get_user_by_swift(const DoutPrefixProvider *dpp, const std::string& user_str, optional_yield y, std::unique_ptr<User>* user)
{
  /* Swift keys and subusers are not supported for now */
  return 0;
}
3235 | ||
3236 | int MotrStore::store_access_key(const DoutPrefixProvider *dpp, optional_yield y, MotrAccessKey access_key) | |
3237 | { | |
3238 | int rc; | |
3239 | bufferlist bl; | |
3240 | access_key.encode(bl); | |
3241 | rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, | |
3242 | M0_IC_PUT, access_key.id, bl); | |
3243 | if (rc < 0){ | |
3244 | ldout(cctx, 0) << "Failed to store key: rc = " << rc << dendl; | |
3245 | return rc; | |
3246 | } | |
3247 | return rc; | |
3248 | } | |
3249 | ||
3250 | int MotrStore::delete_access_key(const DoutPrefixProvider *dpp, optional_yield y, std::string access_key) | |
3251 | { | |
3252 | int rc; | |
3253 | bufferlist bl; | |
3254 | rc = do_idx_op_by_name(RGW_IAM_MOTR_ACCESS_KEY, | |
3255 | M0_IC_DEL, access_key, bl); | |
3256 | if (rc < 0){ | |
3257 | ldout(cctx, 0) << "Failed to delete key: rc = " << rc << dendl; | |
3258 | } | |
3259 | return rc; | |
3260 | } | |
3261 | ||
3262 | int MotrStore::store_email_info(const DoutPrefixProvider *dpp, optional_yield y, MotrEmailInfo& email_info ) | |
3263 | { | |
3264 | int rc; | |
3265 | bufferlist bl; | |
3266 | email_info.encode(bl); | |
3267 | rc = do_idx_op_by_name(RGW_IAM_MOTR_EMAIL_KEY, | |
3268 | M0_IC_PUT, email_info.email_id, bl); | |
3269 | if (rc < 0) { | |
3270 | ldout(cctx, 0) << "Failed to store the user by email as key: rc = " << rc << dendl; | |
3271 | } | |
3272 | return rc; | |
3273 | } | |
3274 | ||
3275 | std::unique_ptr<Object> MotrStore::get_object(const rgw_obj_key& k) | |
3276 | { | |
3277 | return std::make_unique<MotrObject>(this, k); | |
3278 | } | |
3279 | ||
3280 | ||
3281 | int MotrStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const rgw_bucket& b, std::unique_ptr<Bucket>* bucket, optional_yield y) | |
3282 | { | |
3283 | int ret; | |
3284 | Bucket* bp; | |
3285 | ||
3286 | bp = new MotrBucket(this, b, u); | |
3287 | ret = bp->load_bucket(dpp, y); | |
3288 | if (ret < 0) { | |
3289 | delete bp; | |
3290 | return ret; | |
3291 | } | |
3292 | ||
3293 | bucket->reset(bp); | |
3294 | return 0; | |
3295 | } | |
3296 | ||
3297 | int MotrStore::get_bucket(User* u, const RGWBucketInfo& i, std::unique_ptr<Bucket>* bucket) | |
3298 | { | |
3299 | Bucket* bp; | |
3300 | ||
3301 | bp = new MotrBucket(this, i, u); | |
3302 | /* Don't need to fetch the bucket info, use the provided one */ | |
3303 | ||
3304 | bucket->reset(bp); | |
3305 | return 0; | |
3306 | } | |
3307 | ||
3308 | int MotrStore::get_bucket(const DoutPrefixProvider *dpp, User* u, const std::string& tenant, const std::string& name, std::unique_ptr<Bucket>* bucket, optional_yield y) | |
3309 | { | |
3310 | rgw_bucket b; | |
3311 | ||
3312 | b.tenant = tenant; | |
3313 | b.name = name; | |
3314 | ||
3315 | return get_bucket(dpp, u, b, bucket, y); | |
3316 | } | |
3317 | ||
// Always reports this store as the metadata master.
bool MotrStore::is_meta_master()
{
  return true;
}
3322 | ||
// Stub: nothing is forwarded; always returns 0 and ignores all arguments.
int MotrStore::forward_request_to_master(const DoutPrefixProvider *dpp, User* user, obj_version *objv,
                                         bufferlist& in_data,
                                         JSONParser *jp, req_info& info,
                                         optional_yield y)
{
  return 0;
}
3330 | ||
// Stub: nothing is forwarded; always returns 0 and ignores all arguments.
int MotrStore::forward_iam_request_to_master(const DoutPrefixProvider *dpp, const RGWAccessKey& key, obj_version* objv,
                                             bufferlist& in_data,
                                             RGWXMLDecoder::XMLParser* parser, req_info& info,
                                             optional_yield y)
{
  return 0;
}
3338 | ||
3339 | std::string MotrStore::zone_unique_id(uint64_t unique_num) | |
3340 | { | |
3341 | return ""; | |
3342 | } | |
3343 | ||
3344 | std::string MotrStore::zone_unique_trans_id(const uint64_t unique_num) | |
3345 | { | |
3346 | return ""; | |
3347 | } | |
3348 | ||
3349 | int MotrStore::get_zonegroup(const std::string& id, std::unique_ptr<ZoneGroup>* group) | |
3350 | { | |
3351 | /* XXX: for now only one zonegroup supported */ | |
3352 | ZoneGroup* zg; | |
3353 | zg = new MotrZoneGroup(this, zone.zonegroup.get_group()); | |
3354 | ||
3355 | group->reset(zg); | |
3356 | return 0; | |
3357 | } | |
3358 | ||
3359 | int MotrStore::list_all_zones(const DoutPrefixProvider* dpp, | |
3360 | std::list<std::string>& zone_ids) | |
3361 | { | |
3362 | zone_ids.push_back(zone.get_id()); | |
3363 | return 0; | |
3364 | } | |
3365 | ||
3366 | int MotrStore::cluster_stat(RGWClusterStat& stats) | |
3367 | { | |
3368 | return 0; | |
3369 | } | |
3370 | ||
3371 | std::unique_ptr<Lifecycle> MotrStore::get_lifecycle(void) | |
3372 | { | |
3373 | return 0; | |
3374 | } | |
3375 | ||
3376 | std::unique_ptr<Completions> MotrStore::get_completions(void) | |
3377 | { | |
3378 | return 0; | |
3379 | } | |
3380 | ||
3381 | std::unique_ptr<Notification> MotrStore::get_notification(Object* obj, Object* src_obj, req_state* s, | |
3382 | rgw::notify::EventType event_type, optional_yield y, const string* object_name) | |
3383 | { | |
3384 | return std::make_unique<MotrNotification>(obj, src_obj, event_type); | |
3385 | } | |
3386 | ||
3387 | std::unique_ptr<Notification> MotrStore::get_notification(const DoutPrefixProvider* dpp, Object* obj, | |
3388 | Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, | |
3389 | std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) | |
3390 | { | |
3391 | return std::make_unique<MotrNotification>(obj, src_obj, event_type); | |
3392 | } | |
3393 | ||
3394 | int MotrStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info) | |
3395 | { | |
3396 | return 0; | |
3397 | } | |
3398 | ||
3399 | int MotrStore::log_op(const DoutPrefixProvider *dpp, string& oid, bufferlist& bl) | |
3400 | { | |
3401 | return 0; | |
3402 | } | |
3403 | ||
3404 | int MotrStore::register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type, | |
3405 | const map<string, string>& meta) | |
3406 | { | |
3407 | return 0; | |
3408 | } | |
3409 | ||
3410 | void MotrStore::get_ratelimit(RGWRateLimitInfo& bucket_ratelimit, | |
3411 | RGWRateLimitInfo& user_ratelimit, | |
3412 | RGWRateLimitInfo& anon_ratelimit) | |
3413 | { | |
3414 | return; | |
3415 | } | |
3416 | ||
3417 | void MotrStore::get_quota(RGWQuota& quota) | |
3418 | { | |
3419 | // XXX: Not handled for the first pass | |
3420 | return; | |
3421 | } | |
3422 | ||
3423 | int MotrStore::set_buckets_enabled(const DoutPrefixProvider *dpp, vector<rgw_bucket>& buckets, bool enabled) | |
3424 | { | |
3425 | return 0; | |
3426 | } | |
3427 | ||
3428 | int MotrStore::get_sync_policy_handler(const DoutPrefixProvider *dpp, | |
3429 | std::optional<rgw_zone_id> zone, | |
3430 | std::optional<rgw_bucket> bucket, | |
3431 | RGWBucketSyncPolicyHandlerRef *phandler, | |
3432 | optional_yield y) | |
3433 | { | |
3434 | return 0; | |
3435 | } | |
3436 | ||
3437 | RGWDataSyncStatusManager* MotrStore::get_data_sync_manager(const rgw_zone_id& source_zone) | |
3438 | { | |
3439 | return 0; | |
3440 | } | |
3441 | ||
3442 | int MotrStore::read_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch, | |
3443 | uint32_t max_entries, bool *is_truncated, | |
3444 | RGWUsageIter& usage_iter, | |
3445 | map<rgw_user_bucket, rgw_usage_log_entry>& usage) | |
3446 | { | |
3447 | return 0; | |
3448 | } | |
3449 | ||
3450 | int MotrStore::trim_all_usage(const DoutPrefixProvider *dpp, uint64_t start_epoch, uint64_t end_epoch) | |
3451 | { | |
3452 | return 0; | |
3453 | } | |
3454 | ||
3455 | int MotrStore::get_config_key_val(string name, bufferlist *bl) | |
3456 | { | |
3457 | return 0; | |
3458 | } | |
3459 | ||
3460 | int MotrStore::meta_list_keys_init(const DoutPrefixProvider *dpp, const string& section, const string& marker, void** phandle) | |
3461 | { | |
3462 | return 0; | |
3463 | } | |
3464 | ||
3465 | int MotrStore::meta_list_keys_next(const DoutPrefixProvider *dpp, void* handle, int max, list<string>& keys, bool* truncated) | |
3466 | { | |
3467 | return 0; | |
3468 | } | |
3469 | ||
3470 | void MotrStore::meta_list_keys_complete(void* handle) | |
3471 | { | |
3472 | return; | |
3473 | } | |
3474 | ||
3475 | std::string MotrStore::meta_get_marker(void* handle) | |
3476 | { | |
3477 | return ""; | |
3478 | } | |
3479 | ||
3480 | int MotrStore::meta_remove(const DoutPrefixProvider *dpp, string& metadata_key, optional_yield y) | |
3481 | { | |
3482 | return 0; | |
3483 | } | |
3484 | ||
3485 | int MotrStore::open_idx(struct m0_uint128 *id, bool create, struct m0_idx *idx) | |
3486 | { | |
3487 | m0_idx_init(idx, &container.co_realm, id); | |
3488 | ||
3489 | if (!create) | |
3490 | return 0; // nothing to do more | |
3491 | ||
3492 | // create index or make sure it's created | |
3493 | struct m0_op *op = nullptr; | |
3494 | int rc = m0_entity_create(nullptr, &idx->in_entity, &op); | |
3495 | if (rc != 0) { | |
3496 | ldout(cctx, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; | |
3497 | goto out; | |
3498 | } | |
3499 | ||
3500 | m0_op_launch(&op, 1); | |
3501 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
3502 | m0_rc(op); | |
3503 | m0_op_fini(op); | |
3504 | m0_op_free(op); | |
3505 | ||
3506 | if (rc != 0 && rc != -EEXIST) | |
3507 | ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; | |
3508 | out: | |
3509 | return rc; | |
3510 | } | |
3511 | ||
3512 | static void set_m0bufvec(struct m0_bufvec *bv, vector<uint8_t>& vec) | |
3513 | { | |
3514 | *bv->ov_buf = reinterpret_cast<char*>(vec.data()); | |
3515 | *bv->ov_vec.v_count = vec.size(); | |
3516 | } | |
3517 | ||
3518 | // idx must be opened with open_idx() beforehand | |
3519 | int MotrStore::do_idx_op(struct m0_idx *idx, enum m0_idx_opcode opcode, | |
3520 | vector<uint8_t>& key, vector<uint8_t>& val, bool update) | |
3521 | { | |
3522 | int rc, rc_i; | |
3523 | struct m0_bufvec k, v, *vp = &v; | |
3524 | uint32_t flags = 0; | |
3525 | struct m0_op *op = nullptr; | |
3526 | ||
3527 | if (m0_bufvec_empty_alloc(&k, 1) != 0) { | |
3528 | ldout(cctx, 0) << "ERROR: failed to allocate key bufvec" << dendl; | |
3529 | return -ENOMEM; | |
3530 | } | |
3531 | ||
3532 | if (opcode == M0_IC_PUT || opcode == M0_IC_GET) { | |
3533 | rc = -ENOMEM; | |
3534 | if (m0_bufvec_empty_alloc(&v, 1) != 0) { | |
3535 | ldout(cctx, 0) << "ERROR: failed to allocate value bufvec" << dendl; | |
3536 | goto out; | |
3537 | } | |
3538 | } | |
3539 | ||
3540 | set_m0bufvec(&k, key); | |
3541 | if (opcode == M0_IC_PUT) | |
3542 | set_m0bufvec(&v, val); | |
3543 | ||
3544 | if (opcode == M0_IC_DEL) | |
3545 | vp = nullptr; | |
3546 | ||
3547 | if (opcode == M0_IC_PUT && update) | |
3548 | flags |= M0_OIF_OVERWRITE; | |
3549 | ||
3550 | rc = m0_idx_op(idx, opcode, &k, vp, &rc_i, flags, &op); | |
3551 | if (rc != 0) { | |
3552 | ldout(cctx, 0) << "ERROR: failed to init index op: " << rc << dendl; | |
3553 | goto out; | |
3554 | } | |
3555 | ||
3556 | m0_op_launch(&op, 1); | |
3557 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
3558 | m0_rc(op); | |
3559 | m0_op_fini(op); | |
3560 | m0_op_free(op); | |
3561 | ||
3562 | if (rc != 0) { | |
3563 | ldout(cctx, 0) << "ERROR: op failed: " << rc << dendl; | |
3564 | goto out; | |
3565 | } | |
3566 | ||
3567 | if (rc_i != 0) { | |
3568 | ldout(cctx, 0) << "ERROR: idx op failed: " << rc_i << dendl; | |
3569 | rc = rc_i; | |
3570 | goto out; | |
3571 | } | |
3572 | ||
3573 | if (opcode == M0_IC_GET) { | |
3574 | val.resize(*v.ov_vec.v_count); | |
3575 | memcpy(reinterpret_cast<char*>(val.data()), *v.ov_buf, *v.ov_vec.v_count); | |
3576 | } | |
3577 | ||
3578 | out: | |
3579 | m0_bufvec_free2(&k); | |
3580 | if (opcode == M0_IC_GET) | |
3581 | m0_bufvec_free(&v); // cleanup buffer after GET | |
3582 | else if (opcode == M0_IC_PUT) | |
3583 | m0_bufvec_free2(&v); | |
3584 | ||
3585 | return rc; | |
3586 | } | |
3587 | ||
3588 | // Retrieve a range of key/value pairs starting from keys[0]. | |
3589 | int MotrStore::do_idx_next_op(struct m0_idx *idx, | |
3590 | vector<vector<uint8_t>>& keys, | |
3591 | vector<vector<uint8_t>>& vals) | |
3592 | { | |
3593 | int rc; | |
3594 | uint32_t i = 0; | |
3595 | int nr_kvp = vals.size(); | |
3596 | int *rcs = new int[nr_kvp]; | |
3597 | struct m0_bufvec k, v; | |
3598 | struct m0_op *op = nullptr; | |
3599 | ||
3600 | rc = m0_bufvec_empty_alloc(&k, nr_kvp)?: | |
3601 | m0_bufvec_empty_alloc(&v, nr_kvp); | |
3602 | if (rc != 0) { | |
3603 | ldout(cctx, 0) << "ERROR: failed to allocate kv bufvecs" << dendl; | |
3604 | return rc; | |
3605 | } | |
3606 | ||
3607 | set_m0bufvec(&k, keys[0]); | |
3608 | ||
3609 | rc = m0_idx_op(idx, M0_IC_NEXT, &k, &v, rcs, 0, &op); | |
3610 | if (rc != 0) { | |
3611 | ldout(cctx, 0) << "ERROR: failed to init index op: " << rc << dendl; | |
3612 | goto out; | |
3613 | } | |
3614 | ||
3615 | m0_op_launch(&op, 1); | |
3616 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
3617 | m0_rc(op); | |
3618 | m0_op_fini(op); | |
3619 | m0_op_free(op); | |
3620 | ||
3621 | if (rc != 0) { | |
3622 | ldout(cctx, 0) << "ERROR: op failed: " << rc << dendl; | |
3623 | goto out; | |
3624 | } | |
3625 | ||
3626 | for (i = 0; i < v.ov_vec.v_nr; ++i) { | |
3627 | if (rcs[i] < 0) | |
3628 | break; | |
3629 | ||
3630 | vector<uint8_t>& key = keys[i]; | |
3631 | vector<uint8_t>& val = vals[i]; | |
3632 | key.resize(k.ov_vec.v_count[i]); | |
3633 | val.resize(v.ov_vec.v_count[i]); | |
3634 | memcpy(reinterpret_cast<char*>(key.data()), k.ov_buf[i], k.ov_vec.v_count[i]); | |
3635 | memcpy(reinterpret_cast<char*>(val.data()), v.ov_buf[i], v.ov_vec.v_count[i]); | |
3636 | } | |
3637 | ||
3638 | out: | |
3639 | k.ov_vec.v_nr = i; | |
3640 | v.ov_vec.v_nr = i; | |
3641 | m0_bufvec_free(&k); | |
3642 | m0_bufvec_free(&v); // cleanup buffer after GET | |
3643 | ||
3644 | delete []rcs; | |
3645 | return rc ?: i; | |
3646 | } | |
3647 | ||
3648 | // Retrieve a number of key/value pairs under the prefix starting | |
3649 | // from the marker at key_out[0]. | |
3650 | int MotrStore::next_query_by_name(string idx_name, | |
3651 | vector<string>& key_out, | |
3652 | vector<bufferlist>& val_out, | |
3653 | string prefix, string delim) | |
3654 | { | |
3655 | unsigned nr_kvp = std::min(val_out.size(), 100UL); | |
3656 | struct m0_idx idx = {}; | |
3657 | vector<vector<uint8_t>> keys(nr_kvp); | |
3658 | vector<vector<uint8_t>> vals(nr_kvp); | |
3659 | struct m0_uint128 idx_id; | |
3660 | int i = 0, j, k = 0; | |
3661 | ||
3662 | index_name_to_motr_fid(idx_name, &idx_id); | |
3663 | int rc = open_motr_idx(&idx_id, &idx); | |
3664 | if (rc != 0) { | |
3665 | ldout(cctx, 0) << "ERROR: next_query_by_name(): failed to open index: rc=" | |
3666 | << rc << dendl; | |
3667 | goto out; | |
3668 | } | |
3669 | ||
3670 | // Only the first element for keys needs to be set for NEXT query. | |
3671 | // The keys will be set will the returned keys from motr index. | |
3672 | ldout(cctx, 20) <<__func__<< ": next_query_by_name(): index=" << idx_name | |
3673 | << " prefix=" << prefix << " delim=" << delim << dendl; | |
3674 | keys[0].assign(key_out[0].begin(), key_out[0].end()); | |
3675 | for (i = 0; i < (int)val_out.size(); i += k, k = 0) { | |
3676 | rc = do_idx_next_op(&idx, keys, vals); | |
3677 | ldout(cctx, 20) << "do_idx_next_op() = " << rc << dendl; | |
3678 | if (rc < 0) { | |
3679 | ldout(cctx, 0) << "ERROR: NEXT query failed. " << rc << dendl; | |
3680 | goto out; | |
3681 | } | |
3682 | ||
3683 | string dir; | |
3684 | for (j = 0, k = 0; j < rc; ++j) { | |
3685 | string key(keys[j].begin(), keys[j].end()); | |
3686 | size_t pos = std::string::npos; | |
3687 | if (!delim.empty()) | |
3688 | pos = key.find(delim, prefix.length()); | |
3689 | if (pos != std::string::npos) { // DIR entry | |
3690 | dir.assign(key, 0, pos + 1); | |
3691 | if (dir.compare(0, prefix.length(), prefix) != 0) | |
3692 | goto out; | |
3693 | if (i + k == 0 || dir != key_out[i + k - 1]) // a new one | |
3694 | key_out[i + k++] = dir; | |
3695 | continue; | |
3696 | } | |
3697 | dir = ""; | |
3698 | if (key.compare(0, prefix.length(), prefix) != 0) | |
3699 | goto out; | |
3700 | key_out[i + k] = key; | |
3701 | bufferlist& vbl = val_out[i + k]; | |
3702 | vbl.append(reinterpret_cast<char*>(vals[j].data()), vals[j].size()); | |
3703 | ++k; | |
3704 | } | |
3705 | ||
3706 | if (rc < (int)nr_kvp) // there are no more keys to fetch | |
3707 | break; | |
3708 | ||
3709 | string next_key; | |
3710 | if (dir != "") | |
3711 | next_key = dir + "\xff"; // skip all dir content in 1 step | |
3712 | else | |
3713 | next_key = key_out[i + k - 1] + " "; | |
3714 | ldout(cctx, 0) << "do_idx_next_op(): next_key=" << next_key << dendl; | |
3715 | keys[0].assign(next_key.begin(), next_key.end()); | |
3716 | } | |
3717 | ||
3718 | out: | |
3719 | m0_idx_fini(&idx); | |
3720 | return rc < 0 ? rc : i + k; | |
3721 | } | |
3722 | ||
3723 | int MotrStore::delete_motr_idx_by_name(string iname) | |
3724 | { | |
3725 | struct m0_idx idx; | |
3726 | struct m0_uint128 idx_id; | |
3727 | struct m0_op *op = nullptr; | |
3728 | ||
3729 | ldout(cctx, 20) << "delete_motr_idx_by_name=" << iname << dendl; | |
3730 | ||
3731 | index_name_to_motr_fid(iname, &idx_id); | |
3732 | m0_idx_init(&idx, &container.co_realm, &idx_id); | |
3733 | m0_entity_open(&idx.in_entity, &op); | |
3734 | int rc = m0_entity_delete(&idx.in_entity, &op); | |
3735 | if (rc < 0) | |
3736 | goto out; | |
3737 | ||
3738 | m0_op_launch(&op, 1); | |
3739 | ||
3740 | ldout(cctx, 70) << "waiting for op completion" << dendl; | |
3741 | ||
3742 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
3743 | m0_rc(op); | |
3744 | m0_op_fini(op); | |
3745 | m0_op_free(op); | |
3746 | ||
3747 | if (rc == -ENOENT) // race deletion?? | |
3748 | rc = 0; | |
3749 | else if (rc < 0) | |
3750 | ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; | |
3751 | ||
3752 | ldout(cctx, 20) << "delete_motr_idx_by_name rc=" << rc << dendl; | |
3753 | ||
3754 | out: | |
3755 | m0_idx_fini(&idx); | |
3756 | return rc; | |
3757 | } | |
3758 | ||
3759 | int MotrStore::open_motr_idx(struct m0_uint128 *id, struct m0_idx *idx) | |
3760 | { | |
3761 | m0_idx_init(idx, &container.co_realm, id); | |
3762 | return 0; | |
3763 | } | |
3764 | ||
// The following macros are from dix/fid_convert.h, which is not exposed.
enum {
  // Bit offset of the device id within a DIX fid container.
  M0_DIX_FID_DEVICE_ID_OFFSET   = 32,
  // Mask selecting the bits below the device id offset.
  M0_DIX_FID_DIX_CONTAINER_MASK = (1ULL << M0_DIX_FID_DEVICE_ID_OFFSET) - 1,
};
3771 | ||
3772 | // md5 is used here, a more robust way to convert index name to fid is | |
3773 | // needed to avoid collision. | |
3774 | void MotrStore::index_name_to_motr_fid(string iname, struct m0_uint128 *id) | |
3775 | { | |
3776 | unsigned char md5[16]; // 128/8 = 16 | |
3777 | MD5 hash; | |
3778 | ||
3779 | // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes | |
3780 | hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); | |
3781 | hash.Update((const unsigned char *)iname.c_str(), iname.length()); | |
3782 | hash.Final(md5); | |
3783 | ||
3784 | memcpy(&id->u_hi, md5, 8); | |
3785 | memcpy(&id->u_lo, md5 + 8, 8); | |
3786 | ldout(cctx, 20) << "id = 0x" << std::hex << id->u_hi << ":0x" << std::hex << id->u_lo << dendl; | |
3787 | ||
3788 | struct m0_fid *fid = (struct m0_fid*)id; | |
3789 | m0_fid_tset(fid, m0_dix_fid_type.ft_id, | |
3790 | fid->f_container & M0_DIX_FID_DIX_CONTAINER_MASK, fid->f_key); | |
3791 | ldout(cctx, 20) << "converted id = 0x" << std::hex << id->u_hi << ":0x" << std::hex << id->u_lo << dendl; | |
3792 | } | |
3793 | ||
3794 | int MotrStore::do_idx_op_by_name(string idx_name, enum m0_idx_opcode opcode, | |
3795 | string key_str, bufferlist &bl, bool update) | |
3796 | { | |
3797 | struct m0_idx idx; | |
3798 | vector<uint8_t> key(key_str.begin(), key_str.end()); | |
3799 | vector<uint8_t> val; | |
3800 | struct m0_uint128 idx_id; | |
3801 | ||
3802 | index_name_to_motr_fid(idx_name, &idx_id); | |
3803 | int rc = open_motr_idx(&idx_id, &idx); | |
3804 | if (rc != 0) { | |
3805 | ldout(cctx, 0) << "ERROR: failed to open index: " << rc << dendl; | |
3806 | goto out; | |
3807 | } | |
3808 | ||
3809 | if (opcode == M0_IC_PUT) | |
3810 | val.assign(bl.c_str(), bl.c_str() + bl.length()); | |
3811 | ||
3812 | ldout(cctx, 20) <<__func__<< ": do_idx_op_by_name(): op=" | |
3813 | << (opcode == M0_IC_PUT ? "PUT" : "GET") | |
3814 | << " idx=" << idx_name << " key=" << key_str << dendl; | |
3815 | rc = do_idx_op(&idx, opcode, key, val, update); | |
3816 | if (rc == 0 && opcode == M0_IC_GET) | |
3817 | // Append the returned value (blob) to the bufferlist. | |
3818 | bl.append(reinterpret_cast<char*>(val.data()), val.size()); | |
3819 | ||
3820 | out: | |
3821 | m0_idx_fini(&idx); | |
3822 | return rc; | |
3823 | } | |
3824 | ||
3825 | int MotrStore::create_motr_idx_by_name(string iname) | |
3826 | { | |
3827 | struct m0_idx idx = {}; | |
3828 | struct m0_uint128 id; | |
3829 | ||
3830 | index_name_to_motr_fid(iname, &id); | |
3831 | m0_idx_init(&idx, &container.co_realm, &id); | |
3832 | ||
3833 | // create index or make sure it's created | |
3834 | struct m0_op *op = nullptr; | |
3835 | int rc = m0_entity_create(nullptr, &idx.in_entity, &op); | |
3836 | if (rc != 0) { | |
3837 | ldout(cctx, 0) << "ERROR: m0_entity_create() failed: " << rc << dendl; | |
3838 | goto out; | |
3839 | } | |
3840 | ||
3841 | m0_op_launch(&op, 1); | |
3842 | rc = m0_op_wait(op, M0_BITS(M0_OS_FAILED, M0_OS_STABLE), M0_TIME_NEVER) ?: | |
3843 | m0_rc(op); | |
3844 | m0_op_fini(op); | |
3845 | m0_op_free(op); | |
3846 | ||
3847 | if (rc != 0 && rc != -EEXIST) | |
3848 | ldout(cctx, 0) << "ERROR: index create failed: " << rc << dendl; | |
3849 | out: | |
3850 | m0_idx_fini(&idx); | |
3851 | return rc; | |
3852 | } | |
3853 | ||
3854 | // If a global index is checked (if it has been create) every time | |
3855 | // before they're queried (put/get), which takes 2 Motr operations to | |
3856 | // complete the query. As the global indices' name and FID are known | |
3857 | // already when MotrStore is created, we move the check and creation | |
3858 | // in newMotrStore(). | |
3859 | // Similar method is used for per bucket/user index. For example, | |
3860 | // bucket instance index is created when creating the bucket. | |
3861 | int MotrStore::check_n_create_global_indices() | |
3862 | { | |
3863 | int rc = 0; | |
3864 | ||
3865 | for (const auto& iname : motr_global_indices) { | |
3866 | rc = create_motr_idx_by_name(iname); | |
3867 | if (rc < 0 && rc != -EEXIST) | |
3868 | break; | |
3869 | rc = 0; | |
3870 | } | |
3871 | ||
3872 | return rc; | |
3873 | } | |
3874 | ||
3875 | std::string MotrStore::get_cluster_id(const DoutPrefixProvider* dpp, optional_yield y) | |
3876 | { | |
3877 | char id[M0_FID_STR_LEN]; | |
3878 | struct m0_confc *confc = m0_reqh2confc(&instance->m0c_reqh); | |
3879 | ||
3880 | m0_fid_print(id, ARRAY_SIZE(id), &confc->cc_root->co_id); | |
3881 | return std::string(id); | |
3882 | } | |
3883 | ||
3884 | int MotrStore::init_metadata_cache(const DoutPrefixProvider *dpp, | |
3885 | CephContext *cct) | |
3886 | { | |
3887 | this->obj_meta_cache = new MotrMetaCache(dpp, cct); | |
3888 | this->get_obj_meta_cache()->set_enabled(true); | |
3889 | ||
3890 | this->user_cache = new MotrMetaCache(dpp, cct); | |
3891 | this->get_user_cache()->set_enabled(true); | |
3892 | ||
3893 | this->bucket_inst_cache = new MotrMetaCache(dpp, cct); | |
3894 | this->get_bucket_inst_cache()->set_enabled(true); | |
3895 | ||
3896 | return 0; | |
3897 | } | |
3898 | ||
3899 | int MotrLuaManager::get_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, std::string& script) | |
3900 | { | |
3901 | return -ENOENT; | |
3902 | } | |
3903 | ||
3904 | int MotrLuaManager::put_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key, const std::string& script) | |
3905 | { | |
3906 | return -ENOENT; | |
3907 | } | |
3908 | ||
3909 | int MotrLuaManager::del_script(const DoutPrefixProvider* dpp, optional_yield y, const std::string& key) | |
3910 | { | |
3911 | return -ENOENT; | |
3912 | } | |
3913 | ||
3914 | int MotrLuaManager::add_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name) | |
3915 | { | |
3916 | return -ENOENT; | |
3917 | } | |
3918 | ||
3919 | int MotrLuaManager::remove_package(const DoutPrefixProvider* dpp, optional_yield y, const std::string& package_name) | |
3920 | { | |
3921 | return -ENOENT; | |
3922 | } | |
3923 | ||
3924 | int MotrLuaManager::list_packages(const DoutPrefixProvider* dpp, optional_yield y, rgw::lua::packages_t& packages) | |
3925 | { | |
3926 | return -ENOENT; | |
3927 | } | |
3928 | } // namespace rgw::sal | |
3929 | ||
3930 | extern "C" { | |
3931 | ||
3932 | void *newMotrStore(CephContext *cct) | |
3933 | { | |
3934 | int rc = -1; | |
3935 | rgw::sal::MotrStore *store = new rgw::sal::MotrStore(cct); | |
3936 | ||
3937 | if (store) { | |
3938 | store->conf.mc_is_oostore = true; | |
3939 | // XXX: these params should be taken from config settings and | |
3940 | // cct somehow? | |
3941 | store->instance = nullptr; | |
3942 | const auto& proc_ep = g_conf().get_val<std::string>("motr_my_endpoint"); | |
3943 | const auto& ha_ep = g_conf().get_val<std::string>("motr_ha_endpoint"); | |
3944 | const auto& proc_fid = g_conf().get_val<std::string>("motr_my_fid"); | |
3945 | const auto& profile = g_conf().get_val<std::string>("motr_profile_fid"); | |
3946 | const auto& admin_proc_ep = g_conf().get_val<std::string>("motr_admin_endpoint"); | |
3947 | const auto& admin_proc_fid = g_conf().get_val<std::string>("motr_admin_fid"); | |
3948 | const int init_flags = cct->get_init_flags(); | |
3949 | ldout(cct, 0) << "INFO: motr my endpoint: " << proc_ep << dendl; | |
3950 | ldout(cct, 0) << "INFO: motr ha endpoint: " << ha_ep << dendl; | |
3951 | ldout(cct, 0) << "INFO: motr my fid: " << proc_fid << dendl; | |
3952 | ldout(cct, 0) << "INFO: motr profile fid: " << profile << dendl; | |
3953 | store->conf.mc_local_addr = proc_ep.c_str(); | |
3954 | store->conf.mc_process_fid = proc_fid.c_str(); | |
3955 | ||
3956 | ldout(cct, 0) << "INFO: init flags: " << init_flags << dendl; | |
3957 | ldout(cct, 0) << "INFO: motr admin endpoint: " << admin_proc_ep << dendl; | |
3958 | ldout(cct, 0) << "INFO: motr admin fid: " << admin_proc_fid << dendl; | |
3959 | ||
3960 | // HACK this is so that radosge-admin uses a different client | |
3961 | if (init_flags == 0) { | |
3962 | store->conf.mc_process_fid = admin_proc_fid.c_str(); | |
3963 | store->conf.mc_local_addr = admin_proc_ep.c_str(); | |
3964 | } else { | |
3965 | store->conf.mc_process_fid = proc_fid.c_str(); | |
3966 | store->conf.mc_local_addr = proc_ep.c_str(); | |
3967 | } | |
3968 | store->conf.mc_ha_addr = ha_ep.c_str(); | |
3969 | store->conf.mc_profile = profile.c_str(); | |
3970 | ||
3971 | ldout(cct, 50) << "INFO: motr profile fid: " << store->conf.mc_profile << dendl; | |
3972 | ldout(cct, 50) << "INFO: ha addr: " << store->conf.mc_ha_addr << dendl; | |
3973 | ldout(cct, 50) << "INFO: process fid: " << store->conf.mc_process_fid << dendl; | |
3974 | ldout(cct, 50) << "INFO: motr endpoint: " << store->conf.mc_local_addr << dendl; | |
3975 | ||
3976 | store->conf.mc_tm_recv_queue_min_len = 64; | |
3977 | store->conf.mc_max_rpc_msg_size = 524288; | |
3978 | store->conf.mc_idx_service_id = M0_IDX_DIX; | |
3979 | store->dix_conf.kc_create_meta = false; | |
3980 | store->conf.mc_idx_service_conf = &store->dix_conf; | |
3981 | ||
3982 | if (!g_conf().get_val<bool>("motr_tracing_enabled")) { | |
3983 | m0_trace_level_allow(M0_WARN); // allow errors and warnings in syslog anyway | |
3984 | m0_trace_set_mmapped_buffer(false); | |
3985 | } | |
3986 | ||
3987 | store->instance = nullptr; | |
3988 | rc = m0_client_init(&store->instance, &store->conf, true); | |
3989 | if (rc != 0) { | |
3990 | ldout(cct, 0) << "ERROR: m0_client_init() failed: " << rc << dendl; | |
3991 | goto out; | |
3992 | } | |
3993 | ||
3994 | m0_container_init(&store->container, nullptr, &M0_UBER_REALM, store->instance); | |
3995 | rc = store->container.co_realm.re_entity.en_sm.sm_rc; | |
3996 | if (rc != 0) { | |
3997 | ldout(cct, 0) << "ERROR: m0_container_init() failed: " << rc << dendl; | |
3998 | goto out; | |
3999 | } | |
4000 | ||
4001 | rc = m0_ufid_init(store->instance, &ufid_gr); | |
4002 | if (rc != 0) { | |
4003 | ldout(cct, 0) << "ERROR: m0_ufid_init() failed: " << rc << dendl; | |
4004 | goto out; | |
4005 | } | |
4006 | ||
4007 | // Create global indices if not yet. | |
4008 | rc = store->check_n_create_global_indices(); | |
4009 | if (rc != 0) { | |
4010 | ldout(cct, 0) << "ERROR: check_n_create_global_indices() failed: " << rc << dendl; | |
4011 | goto out; | |
4012 | } | |
4013 | ||
4014 | } | |
4015 | ||
4016 | out: | |
4017 | if (rc != 0) { | |
4018 | delete store; | |
4019 | return nullptr; | |
4020 | } | |
4021 | return store; | |
4022 | } | |
4023 | ||
4024 | } |