]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_tools.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_tools.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
10
11 #include "include/types.h"
12 #include "include/stringify.h"
13
14 #include "librados/AioCompletionImpl.h"
15
16 #include "rgw_common.h"
17 #include "rgw_tools.h"
18 #include "rgw_acl_s3.h"
19 #include "rgw_op.h"
20 #include "rgw_putobj_processor.h"
21 #include "rgw_aio_throttle.h"
22 #include "rgw_compression.h"
23 #include "rgw_zone.h"
24 #include "rgw_sal_rados.h"
25 #include "osd/osd_types.h"
26
27 #include "services/svc_sys_obj.h"
28 #include "services/svc_zone.h"
29 #include "services/svc_zone_utils.h"
30
31 #define dout_subsys ceph_subsys_rgw
32 #define dout_context g_ceph_context
33
34 #define READ_CHUNK_LEN (512 * 1024)
35
36 static std::map<std::string, std::string>* ext_mime_map;
37
38 int rgw_init_ioctx(const DoutPrefixProvider *dpp,
39 librados::Rados *rados, const rgw_pool& pool,
40 librados::IoCtx& ioctx, bool create,
41 bool mostly_omap)
42 {
43 int r = rados->ioctx_create(pool.name.c_str(), ioctx);
44 if (r == -ENOENT && create) {
45 r = rados->pool_create(pool.name.c_str());
46 if (r == -ERANGE) {
47 ldpp_dout(dpp, 0)
48 << __func__
49 << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
50 << " (this can be due to a pool or placement group misconfiguration, e.g."
51 << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
52 << dendl;
53 }
54 if (r < 0 && r != -EEXIST) {
55 return r;
56 }
57
58 r = rados->ioctx_create(pool.name.c_str(), ioctx);
59 if (r < 0) {
60 return r;
61 }
62
63 r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
64 if (r < 0 && r != -EOPNOTSUPP) {
65 return r;
66 }
67
68 if (mostly_omap) {
69 // set pg_autoscale_bias
70 bufferlist inbl;
71 float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
72 int r = rados->mon_command(
73 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
74 pool.name + "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
75 stringify(bias) + "\"}",
76 inbl, NULL, NULL);
77 if (r < 0) {
78 ldpp_dout(dpp, 10) << __func__ << " warning: failed to set pg_autoscale_bias on "
79 << pool.name << dendl;
80 }
81 // set pg_num_min
82 int min = g_conf().get_val<uint64_t>("rgw_rados_pool_pg_num_min");
83 r = rados->mon_command(
84 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
85 pool.name + "\", \"var\": \"pg_num_min\", \"val\": \"" +
86 stringify(min) + "\"}",
87 inbl, NULL, NULL);
88 if (r < 0) {
89 ldpp_dout(dpp, 10) << __func__ << " warning: failed to set pg_num_min on "
90 << pool.name << dendl;
91 }
92 // set recovery_priority
93 int p = g_conf().get_val<uint64_t>("rgw_rados_pool_recovery_priority");
94 r = rados->mon_command(
95 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
96 pool.name + "\", \"var\": \"recovery_priority\": \"" +
97 stringify(p) + "\"}",
98 inbl, NULL, NULL);
99 if (r < 0) {
100 ldpp_dout(dpp, 10) << __func__ << " warning: failed to set recovery_priority on "
101 << pool.name << dendl;
102 }
103 }
104 } else if (r < 0) {
105 return r;
106 }
107 if (!pool.ns.empty()) {
108 ioctx.set_namespace(pool.ns);
109 }
110 return 0;
111 }
112
113 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
114 {
115 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
116 char buf[16];
117 if (shard_id) {
118 *shard_id = val % max_shards;
119 }
120 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
121 name = prefix + buf;
122 }
123
124 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name)
125 {
126 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
127 val ^= ceph_str_hash_linux(section.c_str(), section.size());
128 char buf[16];
129 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
130 name = prefix + buf;
131 }
132
/**
 * Build a shard object name for an already known shard index: @p name becomes
 * @p prefix followed by @p shard_id in decimal.
 */
void rgw_shard_name(const string& prefix, unsigned shard_id, string& name)
{
  name = prefix + std::to_string(shard_id);
}
139
140 int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
141 const string& str, uint32_t *perm)
142 {
143 list<string> strs;
144 get_str_list(str, strs);
145 list<string>::iterator iter;
146 uint32_t v = 0;
147 for (iter = strs.begin(); iter != strs.end(); ++iter) {
148 string& s = *iter;
149 for (int i = 0; mapping[i].type_name; i++) {
150 if (s.compare(mapping[i].type_name) == 0)
151 v |= mapping[i].flag;
152 }
153 }
154
155 *perm = v;
156 return 0;
157 }
158
159 int rgw_put_system_obj(const DoutPrefixProvider *dpp,
160 RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
161 RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
162 {
163 map<string,bufferlist> no_attrs;
164 if (!pattrs) {
165 pattrs = &no_attrs;
166 }
167
168 rgw_raw_obj obj(pool, oid);
169
170 auto sysobj = obj_ctx.get_obj(obj);
171 int ret = sysobj.wop()
172 .set_objv_tracker(objv_tracker)
173 .set_exclusive(exclusive)
174 .set_mtime(set_mtime)
175 .set_attrs(*pattrs)
176 .write(dpp, data, y);
177
178 return ret;
179 }
180
181 int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
182 RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, const DoutPrefixProvider *dpp, map<string, bufferlist> *pattrs,
183 rgw_cache_entry_info *cache_info,
184 boost::optional<obj_version> refresh_version)
185 {
186 bufferlist::iterator iter;
187 int request_len = READ_CHUNK_LEN;
188 rgw_raw_obj obj(pool, key);
189
190 obj_version original_readv;
191 if (objv_tracker && !objv_tracker->read_version.empty()) {
192 original_readv = objv_tracker->read_version;
193 }
194
195 do {
196 auto sysobj = obj_ctx.get_obj(obj);
197 auto rop = sysobj.rop();
198
199 int ret = rop.set_attrs(pattrs)
200 .set_last_mod(pmtime)
201 .set_objv_tracker(objv_tracker)
202 .stat(y, dpp);
203 if (ret < 0)
204 return ret;
205
206 ret = rop.set_cache_info(cache_info)
207 .set_refresh_version(refresh_version)
208 .read(dpp, &bl, y);
209 if (ret == -ECANCELED) {
210 /* raced, restart */
211 if (!original_readv.empty()) {
212 /* we were asked to read a specific obj_version, failed */
213 return ret;
214 }
215 if (objv_tracker) {
216 objv_tracker->read_version.clear();
217 }
218 sysobj.invalidate();
219 continue;
220 }
221 if (ret < 0)
222 return ret;
223
224 if (ret < request_len)
225 break;
226 bl.clear();
227 request_len *= 2;
228 } while (true);
229
230 return 0;
231 }
232
233 int rgw_delete_system_obj(const DoutPrefixProvider *dpp,
234 RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
235 RGWObjVersionTracker *objv_tracker, optional_yield y)
236 {
237 auto obj_ctx = sysobj_svc->init_obj_ctx();
238 auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
239 rgw_raw_obj obj(pool, oid);
240 return sysobj.wop()
241 .set_objv_tracker(objv_tracker)
242 .remove(dpp, y);
243 }
244
245 thread_local bool is_asio_thread = false;
246
247 int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
248 librados::ObjectReadOperation *op, bufferlist* pbl,
249 optional_yield y, int flags)
250 {
251 // given a yield_context, call async_operate() to yield the coroutine instead
252 // of blocking
253 if (y) {
254 auto& context = y.get_io_context();
255 auto& yield = y.get_yield_context();
256 boost::system::error_code ec;
257 auto bl = librados::async_operate(
258 context, ioctx, oid, op, flags, yield[ec]);
259 if (pbl) {
260 *pbl = std::move(bl);
261 }
262 return -ec.value();
263 }
264 // work on asio threads should be asynchronous, so warn when they block
265 if (is_asio_thread) {
266 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
267 }
268 return ioctx.operate(oid, op, nullptr, flags);
269 }
270
271 int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
272 librados::ObjectWriteOperation *op, optional_yield y,
273 int flags)
274 {
275 if (y) {
276 auto& context = y.get_io_context();
277 auto& yield = y.get_yield_context();
278 boost::system::error_code ec;
279 librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);
280 return -ec.value();
281 }
282 if (is_asio_thread) {
283 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
284 }
285 return ioctx.operate(oid, op, flags);
286 }
287
288 int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
289 bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
290 optional_yield y)
291 {
292 if (y) {
293 auto& context = y.get_io_context();
294 auto& yield = y.get_yield_context();
295 boost::system::error_code ec;
296 auto reply = librados::async_notify(context, ioctx, oid,
297 bl, timeout_ms, yield[ec]);
298 if (pbl) {
299 *pbl = std::move(reply);
300 }
301 return -ec.value();
302 }
303 if (is_asio_thread) {
304 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
305 }
306 return ioctx.notify2(oid, bl, timeout_ms, pbl);
307 }
308
309 void parse_mime_map_line(const char *start, const char *end)
310 {
311 char line[end - start + 1];
312 strncpy(line, start, end - start);
313 line[end - start] = '\0';
314 char *l = line;
315 #define DELIMS " \t\n\r"
316
317 while (isspace(*l))
318 l++;
319
320 char *mime = strsep(&l, DELIMS);
321 if (!mime)
322 return;
323
324 char *ext;
325 do {
326 ext = strsep(&l, DELIMS);
327 if (ext && *ext) {
328 (*ext_mime_map)[ext] = mime;
329 }
330 } while (ext);
331 }
332
333
334 void parse_mime_map(const char *buf)
335 {
336 const char *start = buf, *end = buf;
337 while (*end) {
338 while (*end && *end != '\n') {
339 end++;
340 }
341 parse_mime_map_line(start, end);
342 end++;
343 start = end;
344 }
345 }
346
347 static int ext_mime_map_init(CephContext *cct, const char *ext_map)
348 {
349 int fd = open(ext_map, O_RDONLY);
350 char *buf = NULL;
351 int ret;
352 if (fd < 0) {
353 ret = -errno;
354 ldout(cct, 0) << __func__ << " failed to open file=" << ext_map
355 << " : " << cpp_strerror(-ret) << dendl;
356 return ret;
357 }
358
359 struct stat st;
360 ret = fstat(fd, &st);
361 if (ret < 0) {
362 ret = -errno;
363 ldout(cct, 0) << __func__ << " failed to stat file=" << ext_map
364 << " : " << cpp_strerror(-ret) << dendl;
365 goto done;
366 }
367
368 buf = (char *)malloc(st.st_size + 1);
369 if (!buf) {
370 ret = -ENOMEM;
371 ldout(cct, 0) << __func__ << " failed to allocate buf" << dendl;
372 goto done;
373 }
374
375 ret = safe_read(fd, buf, st.st_size + 1);
376 if (ret != st.st_size) {
377 // huh? file size has changed?
378 ldout(cct, 0) << __func__ << " raced! will retry.." << dendl;
379 free(buf);
380 close(fd);
381 return ext_mime_map_init(cct, ext_map);
382 }
383 buf[st.st_size] = '\0';
384
385 parse_mime_map(buf);
386 ret = 0;
387 done:
388 free(buf);
389 close(fd);
390 return ret;
391 }
392
393 const char *rgw_find_mime_by_ext(string& ext)
394 {
395 map<string, string>::iterator iter = ext_mime_map->find(ext);
396 if (iter == ext_mime_map->end())
397 return NULL;
398
399 return iter->second.c_str();
400 }
401
402 void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
403 map<string, bufferlist> *attrset)
404 {
405 attrset->clear();
406 map<string, bufferlist>::iterator iter;
407 for (iter = unfiltered_attrset.lower_bound(check_prefix);
408 iter != unfiltered_attrset.end(); ++iter) {
409 if (!boost::algorithm::starts_with(iter->first, check_prefix))
410 break;
411 (*attrset)[iter->first] = iter->second;
412 }
413 }
414
415 RGWDataAccess::RGWDataAccess(rgw::sal::RGWRadosStore *_store) : store(_store)
416 {
417 sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc()->sysobj->init_obj_ctx());
418 }
419
420
421 int RGWDataAccess::Bucket::finish_init()
422 {
423 auto iter = attrs.find(RGW_ATTR_ACL);
424 if (iter == attrs.end()) {
425 return 0;
426 }
427
428 bufferlist::const_iterator bliter = iter->second.begin();
429 try {
430 policy.decode(bliter);
431 } catch (buffer::error& err) {
432 return -EIO;
433 }
434
435 return 0;
436 }
437
438 int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
439 {
440 int ret = sd->store->getRados()->get_bucket_info(sd->store->svc(),
441 tenant, name,
442 bucket_info,
443 &mtime,
444 y,
445 dpp,
446 &attrs);
447 if (ret < 0) {
448 return ret;
449 }
450
451 return finish_init();
452 }
453
454 int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
455 const map<string, bufferlist>& _attrs)
456 {
457 bucket_info = _bucket_info;
458 attrs = _attrs;
459
460 return finish_init();
461 }
462
463 int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
464 ObjectRef *obj) {
465 obj->reset(new Object(sd, shared_from_this(), key));
466 return 0;
467 }
468
469 int RGWDataAccess::Object::put(bufferlist& data,
470 map<string, bufferlist>& attrs,
471 const DoutPrefixProvider *dpp,
472 optional_yield y)
473 {
474 rgw::sal::RGWRadosStore *store = sd->store;
475 CephContext *cct = store->ctx();
476
477 string tag;
478 append_rand_alpha(cct, tag, tag, 32);
479
480 RGWBucketInfo& bucket_info = bucket->bucket_info;
481
482 rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
483
484 RGWObjectCtx obj_ctx(store);
485 std::unique_ptr<rgw::sal::RGWBucket> b;
486 store->get_bucket(NULL, bucket_info, &b);
487 std::unique_ptr<rgw::sal::RGWObject> obj = b->get_object(key);
488
489 auto& owner = bucket->policy.get_owner();
490
491 string req_id = store->svc()->zone_utils->unique_id(store->getRados()->get_new_req_id());
492
493 using namespace rgw::putobj;
494 AtomicObjectProcessor processor(&aio, store, b.get(), nullptr,
495 owner.get_id(), obj_ctx, std::move(obj), olh_epoch,
496 req_id, dpp, y);
497
498 int ret = processor.prepare(y);
499 if (ret < 0)
500 return ret;
501
502 DataProcessor *filter = &processor;
503
504 CompressorRef plugin;
505 boost::optional<RGWPutObj_Compress> compressor;
506
507 const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
508 if (compression_type != "none") {
509 plugin = Compressor::create(store->ctx(), compression_type);
510 if (!plugin) {
511 ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
512 << compression_type << dendl;
513 } else {
514 compressor.emplace(store->ctx(), plugin, filter);
515 filter = &*compressor;
516 }
517 }
518
519 off_t ofs = 0;
520 auto obj_size = data.length();
521
522 RGWMD5Etag etag_calc;
523
524 do {
525 size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
526
527 bufferlist bl;
528
529 data.splice(0, read_len, &bl);
530 etag_calc.update(bl);
531
532 ret = filter->process(std::move(bl), ofs);
533 if (ret < 0)
534 return ret;
535
536 ofs += read_len;
537 } while (data.length() > 0);
538
539 ret = filter->process({}, ofs);
540 if (ret < 0) {
541 return ret;
542 }
543 bool has_etag_attr = false;
544 auto iter = attrs.find(RGW_ATTR_ETAG);
545 if (iter != attrs.end()) {
546 bufferlist& bl = iter->second;
547 etag = bl.to_str();
548 has_etag_attr = true;
549 }
550
551 if (!aclbl) {
552 RGWAccessControlPolicy_S3 policy(cct);
553
554 policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */
555
556 policy.encode(aclbl.emplace());
557 }
558
559 if (etag.empty()) {
560 etag_calc.finish(&etag);
561 }
562
563 if (!has_etag_attr) {
564 bufferlist etagbl;
565 etagbl.append(etag);
566 attrs[RGW_ATTR_ETAG] = etagbl;
567 }
568 attrs[RGW_ATTR_ACL] = *aclbl;
569
570 string *puser_data = nullptr;
571 if (user_data) {
572 puser_data = &(*user_data);
573 }
574
575 return processor.complete(obj_size, etag,
576 &mtime, mtime,
577 attrs, delete_at,
578 nullptr, nullptr,
579 puser_data,
580 nullptr, nullptr, y);
581 }
582
583 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
584 {
585 policy.encode(aclbl.emplace());
586 }
587
588 int rgw_tools_init(CephContext *cct)
589 {
590 ext_mime_map = new std::map<std::string, std::string>;
591 ext_mime_map_init(cct, cct->_conf->rgw_mime_types_file.c_str());
592 // ignore errors; missing mime.types is not fatal
593 return 0;
594 }
595
596 void rgw_tools_cleanup()
597 {
598 delete ext_mime_map;
599 ext_mime_map = nullptr;
600 }
601
602 void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
603 auto pc = c->pc;
604 librados::CB_AioCompleteAndSafe cb(pc);
605 cb(r);
606 }