]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_tools.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / rgw / rgw_tools.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
10
11 #include "include/types.h"
12 #include "include/stringify.h"
13
14 #include "librados/AioCompletionImpl.h"
15
16 #include "rgw_common.h"
17 #include "rgw_tools.h"
18 #include "rgw_acl_s3.h"
19 #include "rgw_op.h"
20 #include "rgw_putobj_processor.h"
21 #include "rgw_aio_throttle.h"
22 #include "rgw_compression.h"
23 #include "rgw_zone.h"
24 #include "rgw_sal_rados.h"
25 #include "osd/osd_types.h"
26
27 #include "services/svc_sys_obj.h"
28 #include "services/svc_zone.h"
29 #include "services/svc_zone_utils.h"
30
#define dout_subsys ceph_subsys_rgw
#define dout_context g_ceph_context

// Read chunk length for system objects: 512 KiB.
#define READ_CHUNK_LEN (512 * 1024)

// File extension -> MIME type table. Allocated by rgw_tools_init() and
// released (and reset to null) by rgw_tools_cleanup().
static std::map<std::string, std::string>* ext_mime_map = nullptr;
37
38 int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool,
39 librados::IoCtx& ioctx, bool create,
40 bool mostly_omap)
41 {
42 int r = rados->ioctx_create(pool.name.c_str(), ioctx);
43 if (r == -ENOENT && create) {
44 r = rados->pool_create(pool.name.c_str());
45 if (r == -ERANGE) {
46 dout(0)
47 << __func__
48 << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
49 << " (this can be due to a pool or placement group misconfiguration, e.g."
50 << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
51 << dendl;
52 }
53 if (r < 0 && r != -EEXIST) {
54 return r;
55 }
56
57 r = rados->ioctx_create(pool.name.c_str(), ioctx);
58 if (r < 0) {
59 return r;
60 }
61
62 r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
63 if (r < 0 && r != -EOPNOTSUPP) {
64 return r;
65 }
66
67 if (mostly_omap) {
68 // set pg_autoscale_bias
69 bufferlist inbl;
70 float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
71 int r = rados->mon_command(
72 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
73 pool.name + "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
74 stringify(bias) + "\"}",
75 inbl, NULL, NULL);
76 if (r < 0) {
77 dout(10) << __func__ << " warning: failed to set pg_autoscale_bias on "
78 << pool.name << dendl;
79 }
80 // set pg_num_min
81 int min = g_conf().get_val<uint64_t>("rgw_rados_pool_pg_num_min");
82 r = rados->mon_command(
83 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
84 pool.name + "\", \"var\": \"pg_num_min\", \"val\": \"" +
85 stringify(min) + "\"}",
86 inbl, NULL, NULL);
87 if (r < 0) {
88 dout(10) << __func__ << " warning: failed to set pg_num_min on "
89 << pool.name << dendl;
90 }
91 // set recovery_priority
92 int p = g_conf().get_val<uint64_t>("rgw_rados_pool_recovery_priority");
93 r = rados->mon_command(
94 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
95 pool.name + "\", \"var\": \"recovery_priority\": \"" +
96 stringify(p) + "\"}",
97 inbl, NULL, NULL);
98 if (r < 0) {
99 dout(10) << __func__ << " warning: failed to set recovery_priority on "
100 << pool.name << dendl;
101 }
102 }
103 } else if (r < 0) {
104 return r;
105 }
106 if (!pool.ns.empty()) {
107 ioctx.set_namespace(pool.ns);
108 }
109 return 0;
110 }
111
112 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
113 {
114 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
115 char buf[16];
116 if (shard_id) {
117 *shard_id = val % max_shards;
118 }
119 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
120 name = prefix + buf;
121 }
122
123 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name)
124 {
125 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
126 val ^= ceph_str_hash_linux(section.c_str(), section.size());
127 char buf[16];
128 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
129 name = prefix + buf;
130 }
131
// Builds the name of an explicitly chosen shard: `prefix` followed by the
// decimal representation of shard_id.
void rgw_shard_name(const string& prefix, unsigned shard_id, string& name)
{
  name = prefix + std::to_string(shard_id);
}
138
139 int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
140 const string& str, uint32_t *perm)
141 {
142 list<string> strs;
143 get_str_list(str, strs);
144 list<string>::iterator iter;
145 uint32_t v = 0;
146 for (iter = strs.begin(); iter != strs.end(); ++iter) {
147 string& s = *iter;
148 for (int i = 0; mapping[i].type_name; i++) {
149 if (s.compare(mapping[i].type_name) == 0)
150 v |= mapping[i].flag;
151 }
152 }
153
154 *perm = v;
155 return 0;
156 }
157
158 int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
159 RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
160 {
161 map<string,bufferlist> no_attrs;
162 if (!pattrs) {
163 pattrs = &no_attrs;
164 }
165
166 rgw_raw_obj obj(pool, oid);
167
168 auto sysobj = obj_ctx.get_obj(obj);
169 int ret = sysobj.wop()
170 .set_objv_tracker(objv_tracker)
171 .set_exclusive(exclusive)
172 .set_mtime(set_mtime)
173 .set_attrs(*pattrs)
174 .write(data, y);
175
176 return ret;
177 }
178
179 int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
180 RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, map<string, bufferlist> *pattrs,
181 rgw_cache_entry_info *cache_info,
182 boost::optional<obj_version> refresh_version)
183 {
184 bufferlist::iterator iter;
185 int request_len = READ_CHUNK_LEN;
186 rgw_raw_obj obj(pool, key);
187
188 obj_version original_readv;
189 if (objv_tracker && !objv_tracker->read_version.empty()) {
190 original_readv = objv_tracker->read_version;
191 }
192
193 do {
194 auto sysobj = obj_ctx.get_obj(obj);
195 auto rop = sysobj.rop();
196
197 int ret = rop.set_attrs(pattrs)
198 .set_last_mod(pmtime)
199 .set_objv_tracker(objv_tracker)
200 .stat(y);
201 if (ret < 0)
202 return ret;
203
204 ret = rop.set_cache_info(cache_info)
205 .set_refresh_version(refresh_version)
206 .read(&bl, y);
207 if (ret == -ECANCELED) {
208 /* raced, restart */
209 if (!original_readv.empty()) {
210 /* we were asked to read a specific obj_version, failed */
211 return ret;
212 }
213 if (objv_tracker) {
214 objv_tracker->read_version.clear();
215 }
216 sysobj.invalidate();
217 continue;
218 }
219 if (ret < 0)
220 return ret;
221
222 if (ret < request_len)
223 break;
224 bl.clear();
225 request_len *= 2;
226 } while (true);
227
228 return 0;
229 }
230
231 int rgw_delete_system_obj(RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
232 RGWObjVersionTracker *objv_tracker, optional_yield y)
233 {
234 auto obj_ctx = sysobj_svc->init_obj_ctx();
235 auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
236 rgw_raw_obj obj(pool, oid);
237 return sysobj.wop()
238 .set_objv_tracker(objv_tracker)
239 .remove(y);
240 }
241
// Per-thread marker consulted by the rgw_rados_* helpers in this file: when
// true, a blocking librados call is logged as a warning. NOTE(review):
// presumably set by the asio frontend on its worker threads -- confirm.
thread_local bool is_asio_thread{false};
243
244 int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
245 librados::ObjectReadOperation *op, bufferlist* pbl,
246 optional_yield y, int flags)
247 {
248 // given a yield_context, call async_operate() to yield the coroutine instead
249 // of blocking
250 if (y) {
251 auto& context = y.get_io_context();
252 auto& yield = y.get_yield_context();
253 boost::system::error_code ec;
254 auto bl = librados::async_operate(
255 context, ioctx, oid, op, flags, yield[ec]);
256 if (pbl) {
257 *pbl = std::move(bl);
258 }
259 return -ec.value();
260 }
261 // work on asio threads should be asynchronous, so warn when they block
262 if (is_asio_thread) {
263 dout(20) << "WARNING: blocking librados call" << dendl;
264 }
265 return ioctx.operate(oid, op, nullptr, flags);
266 }
267
268 int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
269 librados::ObjectWriteOperation *op, optional_yield y,
270 int flags)
271 {
272 if (y) {
273 auto& context = y.get_io_context();
274 auto& yield = y.get_yield_context();
275 boost::system::error_code ec;
276 librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);
277 return -ec.value();
278 }
279 if (is_asio_thread) {
280 dout(20) << "WARNING: blocking librados call" << dendl;
281 }
282 return ioctx.operate(oid, op, flags);
283 }
284
285 int rgw_rados_notify(librados::IoCtx& ioctx, const std::string& oid,
286 bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
287 optional_yield y)
288 {
289 if (y) {
290 auto& context = y.get_io_context();
291 auto& yield = y.get_yield_context();
292 boost::system::error_code ec;
293 auto reply = librados::async_notify(context, ioctx, oid,
294 bl, timeout_ms, yield[ec]);
295 if (pbl) {
296 *pbl = std::move(reply);
297 }
298 return -ec.value();
299 }
300 if (is_asio_thread) {
301 dout(20) << "WARNING: blocking librados call" << dendl;
302 }
303 return ioctx.notify2(oid, bl, timeout_ms, pbl);
304 }
305
306 void parse_mime_map_line(const char *start, const char *end)
307 {
308 char line[end - start + 1];
309 strncpy(line, start, end - start);
310 line[end - start] = '\0';
311 char *l = line;
312 #define DELIMS " \t\n\r"
313
314 while (isspace(*l))
315 l++;
316
317 char *mime = strsep(&l, DELIMS);
318 if (!mime)
319 return;
320
321 char *ext;
322 do {
323 ext = strsep(&l, DELIMS);
324 if (ext && *ext) {
325 (*ext_mime_map)[ext] = mime;
326 }
327 } while (ext);
328 }
329
330
331 void parse_mime_map(const char *buf)
332 {
333 const char *start = buf, *end = buf;
334 while (*end) {
335 while (*end && *end != '\n') {
336 end++;
337 }
338 parse_mime_map_line(start, end);
339 end++;
340 start = end;
341 }
342 }
343
344 static int ext_mime_map_init(CephContext *cct, const char *ext_map)
345 {
346 int fd = open(ext_map, O_RDONLY);
347 char *buf = NULL;
348 int ret;
349 if (fd < 0) {
350 ret = -errno;
351 ldout(cct, 0) << __func__ << " failed to open file=" << ext_map
352 << " : " << cpp_strerror(-ret) << dendl;
353 return ret;
354 }
355
356 struct stat st;
357 ret = fstat(fd, &st);
358 if (ret < 0) {
359 ret = -errno;
360 ldout(cct, 0) << __func__ << " failed to stat file=" << ext_map
361 << " : " << cpp_strerror(-ret) << dendl;
362 goto done;
363 }
364
365 buf = (char *)malloc(st.st_size + 1);
366 if (!buf) {
367 ret = -ENOMEM;
368 ldout(cct, 0) << __func__ << " failed to allocate buf" << dendl;
369 goto done;
370 }
371
372 ret = safe_read(fd, buf, st.st_size + 1);
373 if (ret != st.st_size) {
374 // huh? file size has changed?
375 ldout(cct, 0) << __func__ << " raced! will retry.." << dendl;
376 free(buf);
377 close(fd);
378 return ext_mime_map_init(cct, ext_map);
379 }
380 buf[st.st_size] = '\0';
381
382 parse_mime_map(buf);
383 ret = 0;
384 done:
385 free(buf);
386 close(fd);
387 return ret;
388 }
389
390 const char *rgw_find_mime_by_ext(string& ext)
391 {
392 map<string, string>::iterator iter = ext_mime_map->find(ext);
393 if (iter == ext_mime_map->end())
394 return NULL;
395
396 return iter->second.c_str();
397 }
398
399 void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
400 map<string, bufferlist> *attrset)
401 {
402 attrset->clear();
403 map<string, bufferlist>::iterator iter;
404 for (iter = unfiltered_attrset.lower_bound(check_prefix);
405 iter != unfiltered_attrset.end(); ++iter) {
406 if (!boost::algorithm::starts_with(iter->first, check_prefix))
407 break;
408 (*attrset)[iter->first] = iter->second;
409 }
410 }
411
412 RGWDataAccess::RGWDataAccess(rgw::sal::RGWRadosStore *_store) : store(_store)
413 {
414 sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc()->sysobj->init_obj_ctx());
415 }
416
417
418 int RGWDataAccess::Bucket::finish_init()
419 {
420 auto iter = attrs.find(RGW_ATTR_ACL);
421 if (iter == attrs.end()) {
422 return 0;
423 }
424
425 bufferlist::const_iterator bliter = iter->second.begin();
426 try {
427 policy.decode(bliter);
428 } catch (buffer::error& err) {
429 return -EIO;
430 }
431
432 return 0;
433 }
434
435 int RGWDataAccess::Bucket::init(optional_yield y)
436 {
437 int ret = sd->store->getRados()->get_bucket_info(sd->store->svc(),
438 tenant, name,
439 bucket_info,
440 &mtime,
441 y,
442 &attrs);
443 if (ret < 0) {
444 return ret;
445 }
446
447 return finish_init();
448 }
449
450 int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
451 const map<string, bufferlist>& _attrs)
452 {
453 bucket_info = _bucket_info;
454 attrs = _attrs;
455
456 return finish_init();
457 }
458
459 int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
460 ObjectRef *obj) {
461 obj->reset(new Object(sd, shared_from_this(), key));
462 return 0;
463 }
464
465 int RGWDataAccess::Object::put(bufferlist& data,
466 map<string, bufferlist>& attrs,
467 const DoutPrefixProvider *dpp,
468 optional_yield y)
469 {
470 rgw::sal::RGWRadosStore *store = sd->store;
471 CephContext *cct = store->ctx();
472
473 string tag;
474 append_rand_alpha(cct, tag, tag, 32);
475
476 RGWBucketInfo& bucket_info = bucket->bucket_info;
477
478 rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
479
480 RGWObjectCtx obj_ctx(store);
481 std::unique_ptr<rgw::sal::RGWBucket> b;
482 store->get_bucket(NULL, bucket_info, &b);
483 std::unique_ptr<rgw::sal::RGWObject> obj = b->get_object(key);
484
485 auto& owner = bucket->policy.get_owner();
486
487 string req_id = store->svc()->zone_utils->unique_id(store->getRados()->get_new_req_id());
488
489 using namespace rgw::putobj;
490 AtomicObjectProcessor processor(&aio, store, b.get(), nullptr,
491 owner.get_id(), obj_ctx, std::move(obj), olh_epoch,
492 req_id, dpp, y);
493
494 int ret = processor.prepare(y);
495 if (ret < 0)
496 return ret;
497
498 DataProcessor *filter = &processor;
499
500 CompressorRef plugin;
501 boost::optional<RGWPutObj_Compress> compressor;
502
503 const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
504 if (compression_type != "none") {
505 plugin = Compressor::create(store->ctx(), compression_type);
506 if (!plugin) {
507 ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
508 << compression_type << dendl;
509 } else {
510 compressor.emplace(store->ctx(), plugin, filter);
511 filter = &*compressor;
512 }
513 }
514
515 off_t ofs = 0;
516 auto obj_size = data.length();
517
518 RGWMD5Etag etag_calc;
519
520 do {
521 size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
522
523 bufferlist bl;
524
525 data.splice(0, read_len, &bl);
526 etag_calc.update(bl);
527
528 ret = filter->process(std::move(bl), ofs);
529 if (ret < 0)
530 return ret;
531
532 ofs += read_len;
533 } while (data.length() > 0);
534
535 ret = filter->process({}, ofs);
536 if (ret < 0) {
537 return ret;
538 }
539 bool has_etag_attr = false;
540 auto iter = attrs.find(RGW_ATTR_ETAG);
541 if (iter != attrs.end()) {
542 bufferlist& bl = iter->second;
543 etag = bl.to_str();
544 has_etag_attr = true;
545 }
546
547 if (!aclbl) {
548 RGWAccessControlPolicy_S3 policy(cct);
549
550 policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */
551
552 policy.encode(aclbl.emplace());
553 }
554
555 if (etag.empty()) {
556 etag_calc.finish(&etag);
557 }
558
559 if (!has_etag_attr) {
560 bufferlist etagbl;
561 etagbl.append(etag);
562 attrs[RGW_ATTR_ETAG] = etagbl;
563 }
564 attrs[RGW_ATTR_ACL] = *aclbl;
565
566 string *puser_data = nullptr;
567 if (user_data) {
568 puser_data = &(*user_data);
569 }
570
571 return processor.complete(obj_size, etag,
572 &mtime, mtime,
573 attrs, delete_at,
574 nullptr, nullptr,
575 puser_data,
576 nullptr, nullptr, y);
577 }
578
579 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
580 {
581 policy.encode(aclbl.emplace());
582 }
583
584 int rgw_tools_init(CephContext *cct)
585 {
586 ext_mime_map = new std::map<std::string, std::string>;
587 ext_mime_map_init(cct, cct->_conf->rgw_mime_types_file.c_str());
588 // ignore errors; missing mime.types is not fatal
589 return 0;
590 }
591
592 void rgw_tools_cleanup()
593 {
594 delete ext_mime_map;
595 ext_mime_map = nullptr;
596 }
597
598 void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
599 auto pc = c->pc;
600 librados::CB_AioCompleteAndSafe cb(pc);
601 cb(r);
602 }