]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_tools.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_tools.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
10
11 #include "include/types.h"
12 #include "include/stringify.h"
13
14 #include "librados/AioCompletionImpl.h"
15
16 #include "rgw_common.h"
17 #include "rgw_tools.h"
18 #include "rgw_acl_s3.h"
19 #include "rgw_op.h"
20 #include "rgw_putobj_processor.h"
21 #include "rgw_aio_throttle.h"
22 #include "rgw_compression.h"
23 #include "rgw_zone.h"
24 #include "rgw_sal_rados.h"
25 #include "osd/osd_types.h"
26
27 #include "services/svc_sys_obj.h"
28 #include "services/svc_zone.h"
29 #include "services/svc_zone_utils.h"
30
// Logging configuration picked up by the dout/ldpp_dout macros used in this
// file (presumably: the log subsystem and the default context -- confirm
// against common/dout.h).
#define dout_subsys ceph_subsys_rgw
#define dout_context g_ceph_context

// 512 KiB; the initial value of request_len in rgw_get_system_obj().
#define READ_CHUNK_LEN (512 * 1024)

using namespace std;

// File-extension -> MIME-type table. Allocated by rgw_tools_init(), filled by
// parse_mime_map_line(), queried by rgw_find_mime_by_ext() and released (and
// reset to nullptr) by rgw_tools_cleanup().
static std::map<std::string, std::string>* ext_mime_map;
39
40 int rgw_init_ioctx(const DoutPrefixProvider *dpp,
41 librados::Rados *rados, const rgw_pool& pool,
42 librados::IoCtx& ioctx, bool create,
43 bool mostly_omap)
44 {
45 int r = rados->ioctx_create(pool.name.c_str(), ioctx);
46 if (r == -ENOENT && create) {
47 r = rados->pool_create(pool.name.c_str());
48 if (r == -ERANGE) {
49 ldpp_dout(dpp, 0)
50 << __func__
51 << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
52 << " (this can be due to a pool or placement group misconfiguration, e.g."
53 << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
54 << dendl;
55 }
56 if (r < 0 && r != -EEXIST) {
57 return r;
58 }
59
60 r = rados->ioctx_create(pool.name.c_str(), ioctx);
61 if (r < 0) {
62 return r;
63 }
64
65 r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
66 if (r < 0 && r != -EOPNOTSUPP) {
67 return r;
68 }
69
70 if (mostly_omap) {
71 // set pg_autoscale_bias
72 bufferlist inbl;
73 float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
74 int r = rados->mon_command(
75 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
76 pool.name + "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
77 stringify(bias) + "\"}",
78 inbl, NULL, NULL);
79 if (r < 0) {
80 ldpp_dout(dpp, 10) << __func__ << " warning: failed to set pg_autoscale_bias on "
81 << pool.name << dendl;
82 }
83 // set pg_num_min
84 int min = g_conf().get_val<uint64_t>("rgw_rados_pool_pg_num_min");
85 r = rados->mon_command(
86 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
87 pool.name + "\", \"var\": \"pg_num_min\", \"val\": \"" +
88 stringify(min) + "\"}",
89 inbl, NULL, NULL);
90 if (r < 0) {
91 ldpp_dout(dpp, 10) << __func__ << " warning: failed to set pg_num_min on "
92 << pool.name << dendl;
93 }
94 // set recovery_priority
95 int p = g_conf().get_val<uint64_t>("rgw_rados_pool_recovery_priority");
96 r = rados->mon_command(
97 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
98 pool.name + "\", \"var\": \"recovery_priority\": \"" +
99 stringify(p) + "\"}",
100 inbl, NULL, NULL);
101 if (r < 0) {
102 ldpp_dout(dpp, 10) << __func__ << " warning: failed to set recovery_priority on "
103 << pool.name << dendl;
104 }
105 }
106 } else if (r < 0) {
107 return r;
108 }
109 if (!pool.ns.empty()) {
110 ioctx.set_namespace(pool.ns);
111 }
112 return 0;
113 }
114
115 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
116 {
117 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
118 char buf[16];
119 if (shard_id) {
120 *shard_id = val % max_shards;
121 }
122 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
123 name = prefix + buf;
124 }
125
126 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name)
127 {
128 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
129 val ^= ceph_str_hash_linux(section.c_str(), section.size());
130 char buf[16];
131 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
132 name = prefix + buf;
133 }
134
/**
 * Build a shard object name for an already known shard index:
 * @p name becomes @p prefix followed by @p shard_id in decimal.
 */
void rgw_shard_name(const string& prefix, unsigned shard_id, string& name)
{
  name = prefix + std::to_string(shard_id);
}
141
142 int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
143 const string& str, uint32_t *perm)
144 {
145 list<string> strs;
146 get_str_list(str, strs);
147 list<string>::iterator iter;
148 uint32_t v = 0;
149 for (iter = strs.begin(); iter != strs.end(); ++iter) {
150 string& s = *iter;
151 for (int i = 0; mapping[i].type_name; i++) {
152 if (s.compare(mapping[i].type_name) == 0)
153 v |= mapping[i].flag;
154 }
155 }
156
157 *perm = v;
158 return 0;
159 }
160
161 int rgw_put_system_obj(const DoutPrefixProvider *dpp,
162 RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
163 RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
164 {
165 map<string,bufferlist> no_attrs;
166 if (!pattrs) {
167 pattrs = &no_attrs;
168 }
169
170 rgw_raw_obj obj(pool, oid);
171
172 auto sysobj = obj_ctx.get_obj(obj);
173 int ret = sysobj.wop()
174 .set_objv_tracker(objv_tracker)
175 .set_exclusive(exclusive)
176 .set_mtime(set_mtime)
177 .set_attrs(*pattrs)
178 .write(dpp, data, y);
179
180 return ret;
181 }
182
183 int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
184 RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, const DoutPrefixProvider *dpp, map<string, bufferlist> *pattrs,
185 rgw_cache_entry_info *cache_info,
186 boost::optional<obj_version> refresh_version, bool raw_attrs)
187 {
188 bufferlist::iterator iter;
189 int request_len = READ_CHUNK_LEN;
190 rgw_raw_obj obj(pool, key);
191
192 obj_version original_readv;
193 if (objv_tracker && !objv_tracker->read_version.empty()) {
194 original_readv = objv_tracker->read_version;
195 }
196
197 do {
198 auto sysobj = obj_ctx.get_obj(obj);
199 auto rop = sysobj.rop();
200
201 int ret = rop.set_attrs(pattrs)
202 .set_last_mod(pmtime)
203 .set_objv_tracker(objv_tracker)
204 .set_raw_attrs(raw_attrs)
205 .stat(y, dpp);
206 if (ret < 0)
207 return ret;
208
209 ret = rop.set_cache_info(cache_info)
210 .set_refresh_version(refresh_version)
211 .read(dpp, &bl, y);
212 if (ret == -ECANCELED) {
213 /* raced, restart */
214 if (!original_readv.empty()) {
215 /* we were asked to read a specific obj_version, failed */
216 return ret;
217 }
218 if (objv_tracker) {
219 objv_tracker->read_version.clear();
220 }
221 sysobj.invalidate();
222 continue;
223 }
224 if (ret < 0)
225 return ret;
226
227 if (ret < request_len)
228 break;
229 bl.clear();
230 request_len *= 2;
231 } while (true);
232
233 return 0;
234 }
235
236 int rgw_delete_system_obj(const DoutPrefixProvider *dpp,
237 RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
238 RGWObjVersionTracker *objv_tracker, optional_yield y)
239 {
240 auto obj_ctx = sysobj_svc->init_obj_ctx();
241 auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
242 rgw_raw_obj obj(pool, oid);
243 return sysobj.wop()
244 .set_objv_tracker(objv_tracker)
245 .remove(dpp, y);
246 }
247
// Per-thread flag, false by default. The synchronous fallbacks of the
// rgw_rados_operate()/rgw_rados_notify() helpers in this file check it and
// log a "blocking librados call" warning when it is set. Nothing in this file
// assigns it; presumably asio worker threads set it elsewhere -- TODO confirm.
thread_local bool is_asio_thread = false;
249
250 int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
251 librados::ObjectReadOperation *op, bufferlist* pbl,
252 optional_yield y, int flags)
253 {
254 // given a yield_context, call async_operate() to yield the coroutine instead
255 // of blocking
256 if (y) {
257 auto& context = y.get_io_context();
258 auto& yield = y.get_yield_context();
259 boost::system::error_code ec;
260 auto bl = librados::async_operate(
261 context, ioctx, oid, op, flags, yield[ec]);
262 if (pbl) {
263 *pbl = std::move(bl);
264 }
265 return -ec.value();
266 }
267 // work on asio threads should be asynchronous, so warn when they block
268 if (is_asio_thread) {
269 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
270 }
271 return ioctx.operate(oid, op, nullptr, flags);
272 }
273
274 int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
275 librados::ObjectWriteOperation *op, optional_yield y,
276 int flags)
277 {
278 if (y) {
279 auto& context = y.get_io_context();
280 auto& yield = y.get_yield_context();
281 boost::system::error_code ec;
282 librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);
283 return -ec.value();
284 }
285 if (is_asio_thread) {
286 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
287 }
288 return ioctx.operate(oid, op, flags);
289 }
290
291 int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
292 bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
293 optional_yield y)
294 {
295 if (y) {
296 auto& context = y.get_io_context();
297 auto& yield = y.get_yield_context();
298 boost::system::error_code ec;
299 auto reply = librados::async_notify(context, ioctx, oid,
300 bl, timeout_ms, yield[ec]);
301 if (pbl) {
302 *pbl = std::move(reply);
303 }
304 return -ec.value();
305 }
306 if (is_asio_thread) {
307 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
308 }
309 return ioctx.notify2(oid, bl, timeout_ms, pbl);
310 }
311
312 void parse_mime_map_line(const char *start, const char *end)
313 {
314 char line[end - start + 1];
315 strncpy(line, start, end - start);
316 line[end - start] = '\0';
317 char *l = line;
318 #define DELIMS " \t\n\r"
319
320 while (isspace(*l))
321 l++;
322
323 char *mime = strsep(&l, DELIMS);
324 if (!mime)
325 return;
326
327 char *ext;
328 do {
329 ext = strsep(&l, DELIMS);
330 if (ext && *ext) {
331 (*ext_mime_map)[ext] = mime;
332 }
333 } while (ext);
334 }
335
336
337 void parse_mime_map(const char *buf)
338 {
339 const char *start = buf, *end = buf;
340 while (*end) {
341 while (*end && *end != '\n') {
342 end++;
343 }
344 parse_mime_map_line(start, end);
345 end++;
346 start = end;
347 }
348 }
349
350 static int ext_mime_map_init(const DoutPrefixProvider *dpp, CephContext *cct, const char *ext_map)
351 {
352 int fd = open(ext_map, O_RDONLY);
353 char *buf = NULL;
354 int ret;
355 if (fd < 0) {
356 ret = -errno;
357 ldpp_dout(dpp, 0) << __func__ << " failed to open file=" << ext_map
358 << " : " << cpp_strerror(-ret) << dendl;
359 return ret;
360 }
361
362 struct stat st;
363 ret = fstat(fd, &st);
364 if (ret < 0) {
365 ret = -errno;
366 ldpp_dout(dpp, 0) << __func__ << " failed to stat file=" << ext_map
367 << " : " << cpp_strerror(-ret) << dendl;
368 goto done;
369 }
370
371 buf = (char *)malloc(st.st_size + 1);
372 if (!buf) {
373 ret = -ENOMEM;
374 ldpp_dout(dpp, 0) << __func__ << " failed to allocate buf" << dendl;
375 goto done;
376 }
377
378 ret = safe_read(fd, buf, st.st_size + 1);
379 if (ret != st.st_size) {
380 // huh? file size has changed?
381 ldpp_dout(dpp, 0) << __func__ << " raced! will retry.." << dendl;
382 free(buf);
383 close(fd);
384 return ext_mime_map_init(dpp, cct, ext_map);
385 }
386 buf[st.st_size] = '\0';
387
388 parse_mime_map(buf);
389 ret = 0;
390 done:
391 free(buf);
392 close(fd);
393 return ret;
394 }
395
396 const char *rgw_find_mime_by_ext(string& ext)
397 {
398 map<string, string>::iterator iter = ext_mime_map->find(ext);
399 if (iter == ext_mime_map->end())
400 return NULL;
401
402 return iter->second.c_str();
403 }
404
405 void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
406 map<string, bufferlist> *attrset)
407 {
408 attrset->clear();
409 map<string, bufferlist>::iterator iter;
410 for (iter = unfiltered_attrset.lower_bound(check_prefix);
411 iter != unfiltered_attrset.end(); ++iter) {
412 if (!boost::algorithm::starts_with(iter->first, check_prefix))
413 break;
414 (*attrset)[iter->first] = iter->second;
415 }
416 }
417
418 RGWDataAccess::RGWDataAccess(rgw::sal::Store* _store) : store(_store)
419 {
420 }
421
422
423 int RGWDataAccess::Bucket::finish_init()
424 {
425 auto iter = attrs.find(RGW_ATTR_ACL);
426 if (iter == attrs.end()) {
427 return 0;
428 }
429
430 bufferlist::const_iterator bliter = iter->second.begin();
431 try {
432 policy.decode(bliter);
433 } catch (buffer::error& err) {
434 return -EIO;
435 }
436
437 return 0;
438 }
439
440 int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
441 {
442 std::unique_ptr<rgw::sal::Bucket> bucket;
443 int ret = sd->store->get_bucket(dpp, nullptr, tenant, name, &bucket, y);
444 if (ret < 0) {
445 return ret;
446 }
447
448 bucket_info = bucket->get_info();
449 mtime = bucket->get_modification_time();
450 attrs = bucket->get_attrs();
451
452 return finish_init();
453 }
454
455 int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
456 const map<string, bufferlist>& _attrs)
457 {
458 bucket_info = _bucket_info;
459 attrs = _attrs;
460
461 return finish_init();
462 }
463
464 int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
465 ObjectRef *obj) {
466 obj->reset(new Object(sd, shared_from_this(), key));
467 return 0;
468 }
469
/**
 * Write @p data as the content of this object through the store's atomic
 * writer.
 *
 * Steps, in order: obtain a writer for the object, optionally wrap it in a
 * compression filter (per the zone's compression type for the bucket's
 * placement rule), push the data through the filter in chunks of at most
 * rgw_max_chunk_size while computing an MD5 etag, flush, fill in the ETAG and
 * ACL attributes, and complete the write.
 *
 * @param data   payload; it is consumed (spliced out) chunk by chunk
 * @param attrs  attributes to store; RGW_ATTR_ACL is always (over)written and
 *               RGW_ATTR_ETAG is added when the caller did not supply one
 * @return 0 on success or the first negative error from prepare(), process()
 *         or complete()
 */
int RGWDataAccess::Object::put(bufferlist& data,
                               map<string, bufferlist>& attrs,
                               const DoutPrefixProvider *dpp,
                               optional_yield y)
{
  rgw::sal::Store* store = sd->store;
  CephContext *cct = store->ctx();

  // NOTE(review): tag is generated but not read again in this function.
  string tag;
  append_rand_alpha(cct, tag, tag, 32);

  RGWBucketInfo& bucket_info = bucket->bucket_info;

  rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);

  RGWObjectCtx obj_ctx(store);
  std::unique_ptr<rgw::sal::Bucket> b;
  // NOTE(review): the return value of get_bucket() is ignored and b is
  // dereferenced right after -- confirm this overload cannot fail.
  store->get_bucket(NULL, bucket_info, &b);
  std::unique_ptr<rgw::sal::Object> obj = b->get_object(key);

  auto& owner = bucket->policy.get_owner();

  string req_id = store->zone_unique_id(store->get_new_req_id());

  std::unique_ptr<rgw::sal::Writer> processor;
  processor = store->get_atomic_writer(dpp, y, std::move(obj),
                                       owner.get_id(), obj_ctx,
                                       nullptr, olh_epoch, req_id);

  int ret = processor->prepare(y);
  if (ret < 0)
    return ret;

  // Head of the processing chain; replaced by the compressor below when
  // compression is enabled and its plugin loads.
  rgw::sal::DataProcessor *filter = processor.get();

  CompressorRef plugin;
  boost::optional<RGWPutObj_Compress> compressor;

  const auto& compression_type = store->get_zone()->get_params().get_compression_type(bucket_info.placement_rule);
  if (compression_type != "none") {
    plugin = Compressor::create(store->ctx(), compression_type);
    if (!plugin) {
      // A missing plugin is not fatal: the data is written through the
      // plain writer instead.
      ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
                        << compression_type << dendl;
    } else {
      compressor.emplace(store->ctx(), plugin, filter);
      filter = &*compressor;
    }
  }

  off_t ofs = 0;
  // Captured before the loop because splice() below drains `data`.
  auto obj_size = data.length();

  RGWMD5Etag etag_calc;

  // do/while: the body runs once even when `data` is empty.
  do {
    size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);

    bufferlist bl;

    // Move the first read_len bytes of `data` into bl.
    data.splice(0, read_len, &bl);
    etag_calc.update(bl);

    ret = filter->process(std::move(bl), ofs);
    if (ret < 0)
      return ret;

    ofs += read_len;
  } while (data.length() > 0);

  // An empty buffer at the final offset; presumably the flush signal for the
  // processor chain -- confirm against the DataProcessor contract.
  ret = filter->process({}, ofs);
  if (ret < 0) {
    return ret;
  }
  // A caller-supplied ETAG attribute takes precedence over the computed MD5.
  bool has_etag_attr = false;
  auto iter = attrs.find(RGW_ATTR_ETAG);
  if (iter != attrs.end()) {
    bufferlist& bl = iter->second;
    etag = bl.to_str();
    has_etag_attr = true;
  }

  // No policy set via set_policy(): build a canned one owned by the bucket
  // owner.
  if (!aclbl) {
    RGWAccessControlPolicy_S3 policy(cct);

    policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */

    policy.encode(aclbl.emplace());
  }

  // Fall back to the computed MD5 when no etag is set yet.
  if (etag.empty()) {
    etag_calc.finish(&etag);
  }

  if (!has_etag_attr) {
    bufferlist etagbl;
    etagbl.append(etag);
    attrs[RGW_ATTR_ETAG] = etagbl;
  }
  attrs[RGW_ATTR_ACL] = *aclbl;

  string *puser_data = nullptr;
  if (user_data) {
    puser_data = &(*user_data);
  }

  return processor->complete(obj_size, etag,
                             &mtime, mtime,
                             attrs, delete_at,
                             nullptr, nullptr,
                             puser_data,
                             nullptr, nullptr, y);
}
583
584 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
585 {
586 policy.encode(aclbl.emplace());
587 }
588
589 int rgw_tools_init(const DoutPrefixProvider *dpp, CephContext *cct)
590 {
591 ext_mime_map = new std::map<std::string, std::string>;
592 ext_mime_map_init(dpp, cct, cct->_conf->rgw_mime_types_file.c_str());
593 // ignore errors; missing mime.types is not fatal
594 return 0;
595 }
596
597 void rgw_tools_cleanup()
598 {
599 delete ext_mime_map;
600 ext_mime_map = nullptr;
601 }
602
603 void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
604 auto pc = c->pc;
605 librados::CB_AioCompleteAndSafe cb(pc);
606 cb(r);
607 }