]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_tools.cc
ed0f872108ef1ca5db6566e98ef29c36b3396b06
[ceph.git] / ceph / src / rgw / rgw_tools.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
10
11 #include "include/types.h"
12 #include "include/stringify.h"
13
14 #include "librados/AioCompletionImpl.h"
15
16 #include "rgw_common.h"
17 #include "rgw_tools.h"
18 #include "rgw_acl_s3.h"
19 #include "rgw_op.h"
20 #include "rgw_putobj_processor.h"
21 #include "rgw_aio_throttle.h"
22 #include "rgw_compression.h"
23 #include "rgw_zone.h"
24 #include "rgw_sal_rados.h"
25 #include "osd/osd_types.h"
26
27 #include "services/svc_sys_obj.h"
28 #include "services/svc_zone.h"
29 #include "services/svc_zone_utils.h"
30
// Logging subsystem/context names; presumably consumed by the dout/ldpp_dout
// logging macros used throughout this file -- TODO confirm.
#define dout_subsys ceph_subsys_rgw
#define dout_context g_ceph_context

// Initial value of the request length tracked by rgw_get_system_obj().
#define READ_CHUNK_LEN (512 * 1024)

using namespace std;

// File extension -> MIME type lookup table. Allocated by rgw_tools_init(),
// filled by parse_mime_map_line(), queried by rgw_find_mime_by_ext() and
// released by rgw_tools_cleanup().
static std::map<std::string, std::string>* ext_mime_map;
39
40 int rgw_init_ioctx(const DoutPrefixProvider *dpp,
41 librados::Rados *rados, const rgw_pool& pool,
42 librados::IoCtx& ioctx, bool create,
43 bool mostly_omap)
44 {
45 int r = rados->ioctx_create(pool.name.c_str(), ioctx);
46 if (r == -ENOENT && create) {
47 r = rados->pool_create(pool.name.c_str());
48 if (r == -ERANGE) {
49 ldpp_dout(dpp, 0)
50 << __func__
51 << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
52 << " (this can be due to a pool or placement group misconfiguration, e.g."
53 << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
54 << dendl;
55 }
56 if (r < 0 && r != -EEXIST) {
57 return r;
58 }
59
60 r = rados->ioctx_create(pool.name.c_str(), ioctx);
61 if (r < 0) {
62 return r;
63 }
64
65 r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
66 if (r < 0 && r != -EOPNOTSUPP) {
67 return r;
68 }
69
70 if (mostly_omap) {
71 // set pg_autoscale_bias
72 bufferlist inbl;
73 float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
74 int r = rados->mon_command(
75 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
76 pool.name + "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
77 stringify(bias) + "\"}",
78 inbl, NULL, NULL);
79 if (r < 0) {
80 ldpp_dout(dpp, 10) << __func__ << " warning: failed to set pg_autoscale_bias on "
81 << pool.name << dendl;
82 }
83 // set recovery_priority
84 int p = g_conf().get_val<uint64_t>("rgw_rados_pool_recovery_priority");
85 r = rados->mon_command(
86 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
87 pool.name + "\", \"var\": \"recovery_priority\": \"" +
88 stringify(p) + "\"}",
89 inbl, NULL, NULL);
90 if (r < 0) {
91 ldpp_dout(dpp, 10) << __func__ << " warning: failed to set recovery_priority on "
92 << pool.name << dendl;
93 }
94 }
95 } else if (r < 0) {
96 return r;
97 }
98 if (!pool.ns.empty()) {
99 ioctx.set_namespace(pool.ns);
100 }
101 return 0;
102 }
103
104 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
105 {
106 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
107 char buf[16];
108 if (shard_id) {
109 *shard_id = val % max_shards;
110 }
111 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
112 name = prefix + buf;
113 }
114
115 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name)
116 {
117 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
118 val ^= ceph_str_hash_linux(section.c_str(), section.size());
119 char buf[16];
120 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
121 name = prefix + buf;
122 }
123
// Build the shard object name for an already known shard number:
// `name` becomes `prefix` followed by `shard_id` in decimal.
void rgw_shard_name(const string& prefix, unsigned shard_id, string& name)
{
  name = prefix + std::to_string(shard_id);
}
130
131 int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
132 const string& str, uint32_t *perm)
133 {
134 list<string> strs;
135 get_str_list(str, strs);
136 list<string>::iterator iter;
137 uint32_t v = 0;
138 for (iter = strs.begin(); iter != strs.end(); ++iter) {
139 string& s = *iter;
140 for (int i = 0; mapping[i].type_name; i++) {
141 if (s.compare(mapping[i].type_name) == 0)
142 v |= mapping[i].flag;
143 }
144 }
145
146 *perm = v;
147 return 0;
148 }
149
150 int rgw_put_system_obj(const DoutPrefixProvider *dpp,
151 RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
152 RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
153 {
154 map<string,bufferlist> no_attrs;
155 if (!pattrs) {
156 pattrs = &no_attrs;
157 }
158
159 rgw_raw_obj obj(pool, oid);
160
161 auto sysobj = obj_ctx.get_obj(obj);
162 int ret = sysobj.wop()
163 .set_objv_tracker(objv_tracker)
164 .set_exclusive(exclusive)
165 .set_mtime(set_mtime)
166 .set_attrs(*pattrs)
167 .write(dpp, data, y);
168
169 return ret;
170 }
171
172 int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
173 RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, const DoutPrefixProvider *dpp, map<string, bufferlist> *pattrs,
174 rgw_cache_entry_info *cache_info,
175 boost::optional<obj_version> refresh_version, bool raw_attrs)
176 {
177 bufferlist::iterator iter;
178 int request_len = READ_CHUNK_LEN;
179 rgw_raw_obj obj(pool, key);
180
181 obj_version original_readv;
182 if (objv_tracker && !objv_tracker->read_version.empty()) {
183 original_readv = objv_tracker->read_version;
184 }
185
186 do {
187 auto sysobj = obj_ctx.get_obj(obj);
188 auto rop = sysobj.rop();
189
190 int ret = rop.set_attrs(pattrs)
191 .set_last_mod(pmtime)
192 .set_objv_tracker(objv_tracker)
193 .set_raw_attrs(raw_attrs)
194 .stat(y, dpp);
195 if (ret < 0)
196 return ret;
197
198 ret = rop.set_cache_info(cache_info)
199 .set_refresh_version(refresh_version)
200 .read(dpp, &bl, y);
201 if (ret == -ECANCELED) {
202 /* raced, restart */
203 if (!original_readv.empty()) {
204 /* we were asked to read a specific obj_version, failed */
205 return ret;
206 }
207 if (objv_tracker) {
208 objv_tracker->read_version.clear();
209 }
210 sysobj.invalidate();
211 continue;
212 }
213 if (ret < 0)
214 return ret;
215
216 if (ret < request_len)
217 break;
218 bl.clear();
219 request_len *= 2;
220 } while (true);
221
222 return 0;
223 }
224
225 int rgw_delete_system_obj(const DoutPrefixProvider *dpp,
226 RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
227 RGWObjVersionTracker *objv_tracker, optional_yield y)
228 {
229 auto obj_ctx = sysobj_svc->init_obj_ctx();
230 auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
231 rgw_raw_obj obj(pool, oid);
232 return sysobj.wop()
233 .set_objv_tracker(objv_tracker)
234 .remove(dpp, y);
235 }
236
// Per-thread flag, false by default. When it is true, the blocking paths of
// rgw_rados_operate() and rgw_rados_notify() log a "blocking librados call"
// warning. It is not set anywhere in this file; presumably the asio frontend
// threads set it -- TODO confirm.
thread_local bool is_asio_thread = false;
238
239 int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
240 librados::ObjectReadOperation *op, bufferlist* pbl,
241 optional_yield y, int flags)
242 {
243 // given a yield_context, call async_operate() to yield the coroutine instead
244 // of blocking
245 if (y) {
246 auto& context = y.get_io_context();
247 auto& yield = y.get_yield_context();
248 boost::system::error_code ec;
249 auto bl = librados::async_operate(
250 context, ioctx, oid, op, flags, yield[ec]);
251 if (pbl) {
252 *pbl = std::move(bl);
253 }
254 return -ec.value();
255 }
256 // work on asio threads should be asynchronous, so warn when they block
257 if (is_asio_thread) {
258 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
259 }
260 return ioctx.operate(oid, op, nullptr, flags);
261 }
262
263 int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
264 librados::ObjectWriteOperation *op, optional_yield y,
265 int flags)
266 {
267 if (y) {
268 auto& context = y.get_io_context();
269 auto& yield = y.get_yield_context();
270 boost::system::error_code ec;
271 librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);
272 return -ec.value();
273 }
274 if (is_asio_thread) {
275 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
276 }
277 return ioctx.operate(oid, op, flags);
278 }
279
280 int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
281 bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
282 optional_yield y)
283 {
284 if (y) {
285 auto& context = y.get_io_context();
286 auto& yield = y.get_yield_context();
287 boost::system::error_code ec;
288 auto reply = librados::async_notify(context, ioctx, oid,
289 bl, timeout_ms, yield[ec]);
290 if (pbl) {
291 *pbl = std::move(reply);
292 }
293 return -ec.value();
294 }
295 if (is_asio_thread) {
296 ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
297 }
298 return ioctx.notify2(oid, bl, timeout_ms, pbl);
299 }
300
301 void parse_mime_map_line(const char *start, const char *end)
302 {
303 char line[end - start + 1];
304 strncpy(line, start, end - start);
305 line[end - start] = '\0';
306 char *l = line;
307 #define DELIMS " \t\n\r"
308
309 while (isspace(*l))
310 l++;
311
312 char *mime = strsep(&l, DELIMS);
313 if (!mime)
314 return;
315
316 char *ext;
317 do {
318 ext = strsep(&l, DELIMS);
319 if (ext && *ext) {
320 (*ext_mime_map)[ext] = mime;
321 }
322 } while (ext);
323 }
324
325
326 void parse_mime_map(const char *buf)
327 {
328 const char *start = buf, *end = buf;
329 while (*end) {
330 while (*end && *end != '\n') {
331 end++;
332 }
333 parse_mime_map_line(start, end);
334 end++;
335 start = end;
336 }
337 }
338
339 static int ext_mime_map_init(const DoutPrefixProvider *dpp, CephContext *cct, const char *ext_map)
340 {
341 int fd = open(ext_map, O_RDONLY);
342 char *buf = NULL;
343 int ret;
344 if (fd < 0) {
345 ret = -errno;
346 ldpp_dout(dpp, 0) << __func__ << " failed to open file=" << ext_map
347 << " : " << cpp_strerror(-ret) << dendl;
348 return ret;
349 }
350
351 struct stat st;
352 ret = fstat(fd, &st);
353 if (ret < 0) {
354 ret = -errno;
355 ldpp_dout(dpp, 0) << __func__ << " failed to stat file=" << ext_map
356 << " : " << cpp_strerror(-ret) << dendl;
357 goto done;
358 }
359
360 buf = (char *)malloc(st.st_size + 1);
361 if (!buf) {
362 ret = -ENOMEM;
363 ldpp_dout(dpp, 0) << __func__ << " failed to allocate buf" << dendl;
364 goto done;
365 }
366
367 ret = safe_read(fd, buf, st.st_size + 1);
368 if (ret != st.st_size) {
369 // huh? file size has changed?
370 ldpp_dout(dpp, 0) << __func__ << " raced! will retry.." << dendl;
371 free(buf);
372 close(fd);
373 return ext_mime_map_init(dpp, cct, ext_map);
374 }
375 buf[st.st_size] = '\0';
376
377 parse_mime_map(buf);
378 ret = 0;
379 done:
380 free(buf);
381 close(fd);
382 return ret;
383 }
384
385 const char *rgw_find_mime_by_ext(string& ext)
386 {
387 map<string, string>::iterator iter = ext_mime_map->find(ext);
388 if (iter == ext_mime_map->end())
389 return NULL;
390
391 return iter->second.c_str();
392 }
393
394 void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
395 map<string, bufferlist> *attrset)
396 {
397 attrset->clear();
398 map<string, bufferlist>::iterator iter;
399 for (iter = unfiltered_attrset.lower_bound(check_prefix);
400 iter != unfiltered_attrset.end(); ++iter) {
401 if (!boost::algorithm::starts_with(iter->first, check_prefix))
402 break;
403 (*attrset)[iter->first] = iter->second;
404 }
405 }
406
// Construct a data-access helper on top of `_store`; the pointer is stored
// as given.
RGWDataAccess::RGWDataAccess(rgw::sal::Store* _store) : store(_store)
{
}
410
411
412 int RGWDataAccess::Bucket::finish_init()
413 {
414 auto iter = attrs.find(RGW_ATTR_ACL);
415 if (iter == attrs.end()) {
416 return 0;
417 }
418
419 bufferlist::const_iterator bliter = iter->second.begin();
420 try {
421 policy.decode(bliter);
422 } catch (buffer::error& err) {
423 return -EIO;
424 }
425
426 return 0;
427 }
428
429 int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
430 {
431 std::unique_ptr<rgw::sal::Bucket> bucket;
432 int ret = sd->store->get_bucket(dpp, nullptr, tenant, name, &bucket, y);
433 if (ret < 0) {
434 return ret;
435 }
436
437 bucket_info = bucket->get_info();
438 mtime = bucket->get_modification_time();
439 attrs = bucket->get_attrs();
440
441 return finish_init();
442 }
443
// Initialise this Bucket from bucket info and attributes the caller already
// has (no store lookup), then decode the ACL through finish_init().
// Returns the result of finish_init().
int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
                                const map<string, bufferlist>& _attrs)
{
  bucket_info = _bucket_info;
  attrs = _attrs;

  return finish_init();
}
452
// Create an Object handle for `key` inside this bucket and store it in *obj.
// The new Object is given shared_from_this(), i.e. a shared reference to
// this Bucket. Always returns 0.
int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
                                      ObjectRef *obj) {
  obj->reset(new Object(sd, shared_from_this(), key));
  return 0;
}
458
459 int RGWDataAccess::Object::put(bufferlist& data,
460 map<string, bufferlist>& attrs,
461 const DoutPrefixProvider *dpp,
462 optional_yield y)
463 {
464 rgw::sal::Store* store = sd->store;
465 CephContext *cct = store->ctx();
466
467 string tag;
468 append_rand_alpha(cct, tag, tag, 32);
469
470 RGWBucketInfo& bucket_info = bucket->bucket_info;
471
472 rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
473
474 RGWObjectCtx obj_ctx(store);
475 std::unique_ptr<rgw::sal::Bucket> b;
476 store->get_bucket(NULL, bucket_info, &b);
477 std::unique_ptr<rgw::sal::Object> obj = b->get_object(key);
478
479 auto& owner = bucket->policy.get_owner();
480
481 string req_id = store->zone_unique_id(store->get_new_req_id());
482
483 std::unique_ptr<rgw::sal::Writer> processor;
484 processor = store->get_atomic_writer(dpp, y, std::move(obj),
485 owner.get_id(), obj_ctx,
486 nullptr, olh_epoch, req_id);
487
488 int ret = processor->prepare(y);
489 if (ret < 0)
490 return ret;
491
492 rgw::sal::DataProcessor *filter = processor.get();
493
494 CompressorRef plugin;
495 boost::optional<RGWPutObj_Compress> compressor;
496
497 const auto& compression_type = store->get_zone()->get_params().get_compression_type(bucket_info.placement_rule);
498 if (compression_type != "none") {
499 plugin = Compressor::create(store->ctx(), compression_type);
500 if (!plugin) {
501 ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
502 << compression_type << dendl;
503 } else {
504 compressor.emplace(store->ctx(), plugin, filter);
505 filter = &*compressor;
506 }
507 }
508
509 off_t ofs = 0;
510 auto obj_size = data.length();
511
512 RGWMD5Etag etag_calc;
513
514 do {
515 size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
516
517 bufferlist bl;
518
519 data.splice(0, read_len, &bl);
520 etag_calc.update(bl);
521
522 ret = filter->process(std::move(bl), ofs);
523 if (ret < 0)
524 return ret;
525
526 ofs += read_len;
527 } while (data.length() > 0);
528
529 ret = filter->process({}, ofs);
530 if (ret < 0) {
531 return ret;
532 }
533 bool has_etag_attr = false;
534 auto iter = attrs.find(RGW_ATTR_ETAG);
535 if (iter != attrs.end()) {
536 bufferlist& bl = iter->second;
537 etag = bl.to_str();
538 has_etag_attr = true;
539 }
540
541 if (!aclbl) {
542 RGWAccessControlPolicy_S3 policy(cct);
543
544 policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */
545
546 policy.encode(aclbl.emplace());
547 }
548
549 if (etag.empty()) {
550 etag_calc.finish(&etag);
551 }
552
553 if (!has_etag_attr) {
554 bufferlist etagbl;
555 etagbl.append(etag);
556 attrs[RGW_ATTR_ETAG] = etagbl;
557 }
558 attrs[RGW_ATTR_ACL] = *aclbl;
559
560 string *puser_data = nullptr;
561 if (user_data) {
562 puser_data = &(*user_data);
563 }
564
565 return processor->complete(obj_size, etag,
566 &mtime, mtime,
567 attrs, delete_at,
568 nullptr, nullptr,
569 puser_data,
570 nullptr, nullptr, y);
571 }
572
// Encode `policy` into this object's ACL buffer (aclbl), replacing any
// previously stored value. put() writes that buffer as RGW_ATTR_ACL.
void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
{
  policy.encode(aclbl.emplace());
}
577
// Allocate the global extension -> MIME type table and try to fill it from
// the file named by the rgw_mime_types_file config option. Always returns 0.
int rgw_tools_init(const DoutPrefixProvider *dpp, CephContext *cct)
{
  ext_mime_map = new std::map<std::string, std::string>;
  ext_mime_map_init(dpp, cct, cct->_conf->rgw_mime_types_file.c_str());
  // ignore errors; missing mime.types is not fatal
  return 0;
}
585
// Release the global extension -> MIME type table allocated by
// rgw_tools_init() and reset the pointer to null.
void rgw_tools_cleanup()
{
  delete ext_mime_map;
  ext_mime_map = nullptr;
}
591
592 void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
593 auto pc = c->pc;
594 librados::CB_AioCompleteAndSafe cb(pc);
595 cb(r);
596 }