]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_tools.cc
import 15.2.5
[ceph.git] / ceph / src / rgw / rgw_tools.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <errno.h>
5
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
10
11 #include "include/types.h"
12 #include "include/stringify.h"
13
14 #include "rgw_common.h"
15 #include "rgw_rados.h"
16 #include "rgw_tools.h"
17 #include "rgw_acl_s3.h"
18 #include "rgw_op.h"
19 #include "rgw_putobj_processor.h"
20 #include "rgw_aio_throttle.h"
21 #include "rgw_compression.h"
22 #include "rgw_zone.h"
23 #include "osd/osd_types.h"
24
25 #include "services/svc_sys_obj.h"
26 #include "services/svc_zone.h"
27 #include "services/svc_zone_utils.h"
28
29 #define dout_subsys ceph_subsys_rgw
30 #define dout_context g_ceph_context
31
32 #define READ_CHUNK_LEN (512 * 1024)
33
34 static std::map<std::string, std::string>* ext_mime_map;
35
36 int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool,
37 librados::IoCtx& ioctx, bool create,
38 bool mostly_omap)
39 {
40 int r = rados->ioctx_create(pool.name.c_str(), ioctx);
41 if (r == -ENOENT && create) {
42 r = rados->pool_create(pool.name.c_str());
43 if (r == -ERANGE) {
44 dout(0)
45 << __func__
46 << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
47 << " (this can be due to a pool or placement group misconfiguration, e.g."
48 << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
49 << dendl;
50 }
51 if (r < 0 && r != -EEXIST) {
52 return r;
53 }
54
55 r = rados->ioctx_create(pool.name.c_str(), ioctx);
56 if (r < 0) {
57 return r;
58 }
59
60 r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
61 if (r < 0 && r != -EOPNOTSUPP) {
62 return r;
63 }
64
65 if (mostly_omap) {
66 // set pg_autoscale_bias
67 bufferlist inbl;
68 float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
69 int r = rados->mon_command(
70 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
71 pool.name + "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
72 stringify(bias) + "\"}",
73 inbl, NULL, NULL);
74 if (r < 0) {
75 dout(10) << __func__ << " warning: failed to set pg_autoscale_bias on "
76 << pool.name << dendl;
77 }
78 // set pg_num_min
79 int min = g_conf().get_val<uint64_t>("rgw_rados_pool_pg_num_min");
80 r = rados->mon_command(
81 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
82 pool.name + "\", \"var\": \"pg_num_min\", \"val\": \"" +
83 stringify(min) + "\"}",
84 inbl, NULL, NULL);
85 if (r < 0) {
86 dout(10) << __func__ << " warning: failed to set pg_num_min on "
87 << pool.name << dendl;
88 }
89 // set recovery_priority
90 int p = g_conf().get_val<uint64_t>("rgw_rados_pool_recovery_priority");
91 r = rados->mon_command(
92 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
93 pool.name + "\", \"var\": \"recovery_priority\": \"" +
94 stringify(p) + "\"}",
95 inbl, NULL, NULL);
96 if (r < 0) {
97 dout(10) << __func__ << " warning: failed to set recovery_priority on "
98 << pool.name << dendl;
99 }
100 }
101 } else if (r < 0) {
102 return r;
103 }
104 if (!pool.ns.empty()) {
105 ioctx.set_namespace(pool.ns);
106 }
107 return 0;
108 }
109
110 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
111 {
112 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
113 char buf[16];
114 if (shard_id) {
115 *shard_id = val % max_shards;
116 }
117 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
118 name = prefix + buf;
119 }
120
121 void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name)
122 {
123 uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
124 val ^= ceph_str_hash_linux(section.c_str(), section.size());
125 char buf[16];
126 snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
127 name = prefix + buf;
128 }
129
/**
 * Build a shard object name for an already-known shard index: @p name is set
 * to @p prefix followed by @p shard_id in decimal.
 */
void rgw_shard_name(const std::string& prefix, unsigned shard_id, std::string& name)
{
  name = prefix + std::to_string(shard_id);
}
136
137 int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
138 const string& str, uint32_t *perm)
139 {
140 list<string> strs;
141 get_str_list(str, strs);
142 list<string>::iterator iter;
143 uint32_t v = 0;
144 for (iter = strs.begin(); iter != strs.end(); ++iter) {
145 string& s = *iter;
146 for (int i = 0; mapping[i].type_name; i++) {
147 if (s.compare(mapping[i].type_name) == 0)
148 v |= mapping[i].flag;
149 }
150 }
151
152 *perm = v;
153 return 0;
154 }
155
156 int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
157 RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
158 {
159 map<string,bufferlist> no_attrs;
160 if (!pattrs) {
161 pattrs = &no_attrs;
162 }
163
164 rgw_raw_obj obj(pool, oid);
165
166 auto sysobj = obj_ctx.get_obj(obj);
167 int ret = sysobj.wop()
168 .set_objv_tracker(objv_tracker)
169 .set_exclusive(exclusive)
170 .set_mtime(set_mtime)
171 .set_attrs(*pattrs)
172 .write(data, y);
173
174 return ret;
175 }
176
177 int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
178 RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
179 {
180 return rgw_put_system_obj(obj_ctx, pool, oid, data, exclusive,
181 objv_tracker, set_mtime, null_yield, pattrs);
182 }
183
184 int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
185 RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, map<string, bufferlist> *pattrs,
186 rgw_cache_entry_info *cache_info,
187 boost::optional<obj_version> refresh_version)
188 {
189 bufferlist::iterator iter;
190 int request_len = READ_CHUNK_LEN;
191 rgw_raw_obj obj(pool, key);
192
193 obj_version original_readv;
194 if (objv_tracker && !objv_tracker->read_version.empty()) {
195 original_readv = objv_tracker->read_version;
196 }
197
198 do {
199 auto sysobj = obj_ctx.get_obj(obj);
200 auto rop = sysobj.rop();
201
202 int ret = rop.set_attrs(pattrs)
203 .set_last_mod(pmtime)
204 .set_objv_tracker(objv_tracker)
205 .stat(y);
206 if (ret < 0)
207 return ret;
208
209 ret = rop.set_cache_info(cache_info)
210 .set_refresh_version(refresh_version)
211 .read(&bl, y);
212 if (ret == -ECANCELED) {
213 /* raced, restart */
214 if (!original_readv.empty()) {
215 /* we were asked to read a specific obj_version, failed */
216 return ret;
217 }
218 if (objv_tracker) {
219 objv_tracker->read_version.clear();
220 }
221 sysobj.invalidate();
222 continue;
223 }
224 if (ret < 0)
225 return ret;
226
227 if (ret < request_len)
228 break;
229 bl.clear();
230 request_len *= 2;
231 } while (true);
232
233 return 0;
234 }
235
236 int rgw_delete_system_obj(RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
237 RGWObjVersionTracker *objv_tracker)
238 {
239 auto obj_ctx = sysobj_svc->init_obj_ctx();
240 auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
241 rgw_raw_obj obj(pool, oid);
242 return sysobj.wop()
243 .set_objv_tracker(objv_tracker)
244 .remove(null_yield);
245 }
246
247 thread_local bool is_asio_thread = false;
248
249 int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
250 librados::ObjectReadOperation *op, bufferlist* pbl,
251 optional_yield y, int flags)
252 {
253 #ifdef HAVE_BOOST_CONTEXT
254 // given a yield_context, call async_operate() to yield the coroutine instead
255 // of blocking
256 if (y) {
257 auto& context = y.get_io_context();
258 auto& yield = y.get_yield_context();
259 boost::system::error_code ec;
260 auto bl = librados::async_operate(
261 context, ioctx, oid, op, flags, yield[ec]);
262 if (pbl) {
263 *pbl = std::move(bl);
264 }
265 return -ec.value();
266 }
267 // work on asio threads should be asynchronous, so warn when they block
268 if (is_asio_thread) {
269 dout(20) << "WARNING: blocking librados call" << dendl;
270 }
271 #endif
272 return ioctx.operate(oid, op, nullptr, flags);
273 }
274
275 int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
276 librados::ObjectWriteOperation *op, optional_yield y,
277 int flags)
278 {
279 #ifdef HAVE_BOOST_CONTEXT
280 if (y) {
281 auto& context = y.get_io_context();
282 auto& yield = y.get_yield_context();
283 boost::system::error_code ec;
284 librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);
285 return -ec.value();
286 }
287 if (is_asio_thread) {
288 dout(20) << "WARNING: blocking librados call" << dendl;
289 }
290 #endif
291 return ioctx.operate(oid, op, flags);
292 }
293
294 int rgw_rados_notify(librados::IoCtx& ioctx, const std::string& oid,
295 bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
296 optional_yield y)
297 {
298 #ifdef HAVE_BOOST_CONTEXT
299 if (y) {
300 auto& context = y.get_io_context();
301 auto& yield = y.get_yield_context();
302 boost::system::error_code ec;
303 auto reply = librados::async_notify(context, ioctx, oid,
304 bl, timeout_ms, yield[ec]);
305 if (pbl) {
306 *pbl = std::move(reply);
307 }
308 return -ec.value();
309 }
310 if (is_asio_thread) {
311 dout(20) << "WARNING: blocking librados call" << dendl;
312 }
313 #endif
314 return ioctx.notify2(oid, bl, timeout_ms, pbl);
315 }
316
317 void parse_mime_map_line(const char *start, const char *end)
318 {
319 char line[end - start + 1];
320 strncpy(line, start, end - start);
321 line[end - start] = '\0';
322 char *l = line;
323 #define DELIMS " \t\n\r"
324
325 while (isspace(*l))
326 l++;
327
328 char *mime = strsep(&l, DELIMS);
329 if (!mime)
330 return;
331
332 char *ext;
333 do {
334 ext = strsep(&l, DELIMS);
335 if (ext && *ext) {
336 (*ext_mime_map)[ext] = mime;
337 }
338 } while (ext);
339 }
340
341
342 void parse_mime_map(const char *buf)
343 {
344 const char *start = buf, *end = buf;
345 while (*end) {
346 while (*end && *end != '\n') {
347 end++;
348 }
349 parse_mime_map_line(start, end);
350 end++;
351 start = end;
352 }
353 }
354
355 static int ext_mime_map_init(CephContext *cct, const char *ext_map)
356 {
357 int fd = open(ext_map, O_RDONLY);
358 char *buf = NULL;
359 int ret;
360 if (fd < 0) {
361 ret = -errno;
362 ldout(cct, 0) << __func__ << " failed to open file=" << ext_map
363 << " : " << cpp_strerror(-ret) << dendl;
364 return ret;
365 }
366
367 struct stat st;
368 ret = fstat(fd, &st);
369 if (ret < 0) {
370 ret = -errno;
371 ldout(cct, 0) << __func__ << " failed to stat file=" << ext_map
372 << " : " << cpp_strerror(-ret) << dendl;
373 goto done;
374 }
375
376 buf = (char *)malloc(st.st_size + 1);
377 if (!buf) {
378 ret = -ENOMEM;
379 ldout(cct, 0) << __func__ << " failed to allocate buf" << dendl;
380 goto done;
381 }
382
383 ret = safe_read(fd, buf, st.st_size + 1);
384 if (ret != st.st_size) {
385 // huh? file size has changed?
386 ldout(cct, 0) << __func__ << " raced! will retry.." << dendl;
387 free(buf);
388 close(fd);
389 return ext_mime_map_init(cct, ext_map);
390 }
391 buf[st.st_size] = '\0';
392
393 parse_mime_map(buf);
394 ret = 0;
395 done:
396 free(buf);
397 close(fd);
398 return ret;
399 }
400
401 const char *rgw_find_mime_by_ext(string& ext)
402 {
403 map<string, string>::iterator iter = ext_mime_map->find(ext);
404 if (iter == ext_mime_map->end())
405 return NULL;
406
407 return iter->second.c_str();
408 }
409
410 void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
411 map<string, bufferlist> *attrset)
412 {
413 attrset->clear();
414 map<string, bufferlist>::iterator iter;
415 for (iter = unfiltered_attrset.lower_bound(check_prefix);
416 iter != unfiltered_attrset.end(); ++iter) {
417 if (!boost::algorithm::starts_with(iter->first, check_prefix))
418 break;
419 (*attrset)[iter->first] = iter->second;
420 }
421 }
422
423 RGWDataAccess::RGWDataAccess(rgw::sal::RGWRadosStore *_store) : store(_store)
424 {
425 sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc()->sysobj->init_obj_ctx());
426 }
427
428
429 int RGWDataAccess::Bucket::finish_init()
430 {
431 auto iter = attrs.find(RGW_ATTR_ACL);
432 if (iter == attrs.end()) {
433 return 0;
434 }
435
436 bufferlist::const_iterator bliter = iter->second.begin();
437 try {
438 policy.decode(bliter);
439 } catch (buffer::error& err) {
440 return -EIO;
441 }
442
443 return 0;
444 }
445
446 int RGWDataAccess::Bucket::init()
447 {
448 int ret = sd->store->getRados()->get_bucket_info(sd->store->svc(),
449 tenant, name,
450 bucket_info,
451 &mtime,
452 null_yield,
453 &attrs);
454 if (ret < 0) {
455 return ret;
456 }
457
458 return finish_init();
459 }
460
461 int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
462 const map<string, bufferlist>& _attrs)
463 {
464 bucket_info = _bucket_info;
465 attrs = _attrs;
466
467 return finish_init();
468 }
469
470 int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
471 ObjectRef *obj) {
472 obj->reset(new Object(sd, shared_from_this(), key));
473 return 0;
474 }
475
476 int RGWDataAccess::Object::put(bufferlist& data,
477 map<string, bufferlist>& attrs,
478 const DoutPrefixProvider *dpp,
479 optional_yield y)
480 {
481 rgw::sal::RGWRadosStore *store = sd->store;
482 CephContext *cct = store->ctx();
483
484 string tag;
485 append_rand_alpha(cct, tag, tag, 32);
486
487 RGWBucketInfo& bucket_info = bucket->bucket_info;
488
489 rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
490
491 RGWObjectCtx obj_ctx(store);
492 rgw_obj obj(bucket_info.bucket, key);
493
494 auto& owner = bucket->policy.get_owner();
495
496 string req_id = store->svc()->zone_utils->unique_id(store->getRados()->get_new_req_id());
497
498 using namespace rgw::putobj;
499 AtomicObjectProcessor processor(&aio, store, bucket_info, nullptr,
500 owner.get_id(), obj_ctx, obj, olh_epoch,
501 req_id, dpp, y);
502
503 int ret = processor.prepare(y);
504 if (ret < 0)
505 return ret;
506
507 DataProcessor *filter = &processor;
508
509 CompressorRef plugin;
510 boost::optional<RGWPutObj_Compress> compressor;
511
512 const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
513 if (compression_type != "none") {
514 plugin = Compressor::create(store->ctx(), compression_type);
515 if (!plugin) {
516 ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
517 << compression_type << dendl;
518 } else {
519 compressor.emplace(store->ctx(), plugin, filter);
520 filter = &*compressor;
521 }
522 }
523
524 off_t ofs = 0;
525 auto obj_size = data.length();
526
527 RGWMD5Etag etag_calc;
528
529 do {
530 size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
531
532 bufferlist bl;
533
534 data.splice(0, read_len, &bl);
535 etag_calc.update(bl);
536
537 ret = filter->process(std::move(bl), ofs);
538 if (ret < 0)
539 return ret;
540
541 ofs += read_len;
542 } while (data.length() > 0);
543
544 ret = filter->process({}, ofs);
545 if (ret < 0) {
546 return ret;
547 }
548 bool has_etag_attr = false;
549 auto iter = attrs.find(RGW_ATTR_ETAG);
550 if (iter != attrs.end()) {
551 bufferlist& bl = iter->second;
552 etag = bl.to_str();
553 has_etag_attr = true;
554 }
555
556 if (!aclbl) {
557 RGWAccessControlPolicy_S3 policy(cct);
558
559 policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */
560
561 policy.encode(aclbl.emplace());
562 }
563
564 if (etag.empty()) {
565 etag_calc.finish(&etag);
566 }
567
568 if (!has_etag_attr) {
569 bufferlist etagbl;
570 etagbl.append(etag);
571 attrs[RGW_ATTR_ETAG] = etagbl;
572 }
573 attrs[RGW_ATTR_ACL] = *aclbl;
574
575 string *puser_data = nullptr;
576 if (user_data) {
577 puser_data = &(*user_data);
578 }
579
580 return processor.complete(obj_size, etag,
581 &mtime, mtime,
582 attrs, delete_at,
583 nullptr, nullptr,
584 puser_data,
585 nullptr, nullptr, y);
586 }
587
588 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
589 {
590 policy.encode(aclbl.emplace());
591 }
592
593 int rgw_tools_init(CephContext *cct)
594 {
595 ext_mime_map = new std::map<std::string, std::string>;
596 ext_mime_map_init(cct, cct->_conf->rgw_mime_types_file.c_str());
597 // ignore errors; missing mime.types is not fatal
598 return 0;
599 }
600
601 void rgw_tools_cleanup()
602 {
603 delete ext_mime_map;
604 ext_mime_map = nullptr;
605 }