]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_tools.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / rgw / rgw_tools.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
10
11 #include "include/types.h"
12 #include "include/stringify.h"
13
14 #include "rgw_common.h"
15 #include "rgw_rados.h"
16 #include "rgw_tools.h"
17 #include "rgw_acl_s3.h"
18 #include "rgw_op.h"
19 #include "rgw_putobj_processor.h"
20 #include "rgw_aio_throttle.h"
21 #include "rgw_compression.h"
22 #include "rgw_zone.h"
23 #include "osd/osd_types.h"
24
25 #include "services/svc_sys_obj.h"
26 #include "services/svc_zone_utils.h"
27
28 #define dout_subsys ceph_subsys_rgw
29 #define dout_context g_ceph_context
30
31 #define READ_CHUNK_LEN (512 * 1024)
32
33 static std::map<std::string, std::string>* ext_mime_map;
34
35 int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool,
36 librados::IoCtx& ioctx, bool create,
37 bool mostly_omap)
38 {
39 int r = rados->ioctx_create(pool.name.c_str(), ioctx);
40 if (r == -ENOENT && create) {
41 r = rados->pool_create(pool.name.c_str());
42 if (r == -ERANGE) {
43 dout(0)
44 << __func__
45 << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
46 << " (this can be due to a pool or placement group misconfiguration, e.g."
47 << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
48 << dendl;
49 }
50 if (r < 0 && r != -EEXIST) {
51 return r;
52 }
53
54 r = rados->ioctx_create(pool.name.c_str(), ioctx);
55 if (r < 0) {
56 return r;
57 }
58
59 r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
60 if (r < 0 && r != -EOPNOTSUPP) {
61 return r;
62 }
63
64 if (mostly_omap) {
65 // set pg_autoscale_bias
66 bufferlist inbl;
67 float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
68 int r = rados->mon_command(
69 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
70 pool.name + "\", \"var\": \"pg_autoscale_bias\": \"" +
71 stringify(bias) + "\"}",
72 inbl, NULL, NULL);
73 if (r < 0) {
74 dout(10) << __func__ << " warning: failed to set pg_autoscale_bias on "
75 << pool.name << dendl;
76 }
77 // set pg_num_min
78 int min = g_conf().get_val<uint64_t>("rgw_rados_pool_pg_num_min");
79 r = rados->mon_command(
80 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
81 pool.name + "\", \"var\": \"pg_num_min\": \"" +
82 stringify(min) + "\"}",
83 inbl, NULL, NULL);
84 if (r < 0) {
85 dout(10) << __func__ << " warning: failed to set pg_num_min on "
86 << pool.name << dendl;
87 }
88 }
89 } else if (r < 0) {
90 return r;
91 }
92 if (!pool.ns.empty()) {
93 ioctx.set_namespace(pool.ns);
94 }
95 return 0;
96 }
97
98 int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
99 RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
100 {
101 map<string,bufferlist> no_attrs;
102 if (!pattrs) {
103 pattrs = &no_attrs;
104 }
105
106 rgw_raw_obj obj(pool, oid);
107
108 auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
109 auto sysobj = obj_ctx.get_obj(obj);
110 int ret = sysobj.wop()
111 .set_objv_tracker(objv_tracker)
112 .set_exclusive(exclusive)
113 .set_mtime(set_mtime)
114 .set_attrs(*pattrs)
115 .write(data);
116
117 if (ret == -ENOENT) {
118 ret = rgwstore->create_pool(pool);
119 if (ret >= 0) {
120 ret = sysobj.wop()
121 .set_objv_tracker(objv_tracker)
122 .set_exclusive(exclusive)
123 .set_mtime(set_mtime)
124 .set_attrs(*pattrs)
125 .write(data);
126 }
127 }
128
129 return ret;
130 }
131
132 int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
133 RGWObjVersionTracker *objv_tracker, real_time *pmtime, map<string, bufferlist> *pattrs,
134 rgw_cache_entry_info *cache_info, boost::optional<obj_version> refresh_version)
135 {
136 bufferlist::iterator iter;
137 int request_len = READ_CHUNK_LEN;
138 rgw_raw_obj obj(pool, key);
139
140 obj_version original_readv;
141 if (objv_tracker && !objv_tracker->read_version.empty()) {
142 original_readv = objv_tracker->read_version;
143 }
144
145 do {
146 auto sysobj = obj_ctx.get_obj(obj);
147 auto rop = sysobj.rop();
148
149 int ret = rop.set_attrs(pattrs)
150 .set_last_mod(pmtime)
151 .set_objv_tracker(objv_tracker)
152 .stat();
153 if (ret < 0)
154 return ret;
155
156 ret = rop.set_cache_info(cache_info)
157 .set_refresh_version(refresh_version)
158 .read(&bl);
159 if (ret == -ECANCELED) {
160 /* raced, restart */
161 if (!original_readv.empty()) {
162 /* we were asked to read a specific obj_version, failed */
163 return ret;
164 }
165 if (objv_tracker) {
166 objv_tracker->read_version.clear();
167 }
168 sysobj.invalidate();
169 continue;
170 }
171 if (ret < 0)
172 return ret;
173
174 if (ret < request_len)
175 break;
176 bl.clear();
177 request_len *= 2;
178 } while (true);
179
180 return 0;
181 }
182
183 int rgw_delete_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid,
184 RGWObjVersionTracker *objv_tracker)
185 {
186 auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
187 auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
188 rgw_raw_obj obj(pool, oid);
189 return sysobj.wop()
190 .set_objv_tracker(objv_tracker)
191 .remove();
192 }
193
194 thread_local bool is_asio_thread = false;
195
196 int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
197 librados::ObjectReadOperation *op, bufferlist* pbl,
198 optional_yield y)
199 {
200 #ifdef HAVE_BOOST_CONTEXT
201 // given a yield_context, call async_operate() to yield the coroutine instead
202 // of blocking
203 if (y) {
204 auto& context = y.get_io_context();
205 auto& yield = y.get_yield_context();
206 boost::system::error_code ec;
207 auto bl = librados::async_operate(context, ioctx, oid, op, 0, yield[ec]);
208 if (pbl) {
209 *pbl = std::move(bl);
210 }
211 return -ec.value();
212 }
213 // work on asio threads should be asynchronous, so warn when they block
214 if (is_asio_thread) {
215 dout(20) << "WARNING: blocking librados call" << dendl;
216 }
217 #endif
218 return ioctx.operate(oid, op, nullptr);
219 }
220
221 int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
222 librados::ObjectWriteOperation *op, optional_yield y)
223 {
224 #ifdef HAVE_BOOST_CONTEXT
225 if (y) {
226 auto& context = y.get_io_context();
227 auto& yield = y.get_yield_context();
228 boost::system::error_code ec;
229 librados::async_operate(context, ioctx, oid, op, 0, yield[ec]);
230 return -ec.value();
231 }
232 if (is_asio_thread) {
233 dout(20) << "WARNING: blocking librados call" << dendl;
234 }
235 #endif
236 return ioctx.operate(oid, op);
237 }
238
239 void parse_mime_map_line(const char *start, const char *end)
240 {
241 char line[end - start + 1];
242 strncpy(line, start, end - start);
243 line[end - start] = '\0';
244 char *l = line;
245 #define DELIMS " \t\n\r"
246
247 while (isspace(*l))
248 l++;
249
250 char *mime = strsep(&l, DELIMS);
251 if (!mime)
252 return;
253
254 char *ext;
255 do {
256 ext = strsep(&l, DELIMS);
257 if (ext && *ext) {
258 (*ext_mime_map)[ext] = mime;
259 }
260 } while (ext);
261 }
262
263
264 void parse_mime_map(const char *buf)
265 {
266 const char *start = buf, *end = buf;
267 while (*end) {
268 while (*end && *end != '\n') {
269 end++;
270 }
271 parse_mime_map_line(start, end);
272 end++;
273 start = end;
274 }
275 }
276
277 static int ext_mime_map_init(CephContext *cct, const char *ext_map)
278 {
279 int fd = open(ext_map, O_RDONLY);
280 char *buf = NULL;
281 int ret;
282 if (fd < 0) {
283 ret = -errno;
284 ldout(cct, 0) << __func__ << " failed to open file=" << ext_map
285 << " : " << cpp_strerror(-ret) << dendl;
286 return ret;
287 }
288
289 struct stat st;
290 ret = fstat(fd, &st);
291 if (ret < 0) {
292 ret = -errno;
293 ldout(cct, 0) << __func__ << " failed to stat file=" << ext_map
294 << " : " << cpp_strerror(-ret) << dendl;
295 goto done;
296 }
297
298 buf = (char *)malloc(st.st_size + 1);
299 if (!buf) {
300 ret = -ENOMEM;
301 ldout(cct, 0) << __func__ << " failed to allocate buf" << dendl;
302 goto done;
303 }
304
305 ret = safe_read(fd, buf, st.st_size + 1);
306 if (ret != st.st_size) {
307 // huh? file size has changed?
308 ldout(cct, 0) << __func__ << " raced! will retry.." << dendl;
309 free(buf);
310 close(fd);
311 return ext_mime_map_init(cct, ext_map);
312 }
313 buf[st.st_size] = '\0';
314
315 parse_mime_map(buf);
316 ret = 0;
317 done:
318 free(buf);
319 close(fd);
320 return ret;
321 }
322
323 const char *rgw_find_mime_by_ext(string& ext)
324 {
325 map<string, string>::iterator iter = ext_mime_map->find(ext);
326 if (iter == ext_mime_map->end())
327 return NULL;
328
329 return iter->second.c_str();
330 }
331
332 void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
333 map<string, bufferlist> *attrset)
334 {
335 attrset->clear();
336 map<string, bufferlist>::iterator iter;
337 for (iter = unfiltered_attrset.lower_bound(check_prefix);
338 iter != unfiltered_attrset.end(); ++iter) {
339 if (!boost::algorithm::starts_with(iter->first, check_prefix))
340 break;
341 (*attrset)[iter->first] = iter->second;
342 }
343 }
344
345 RGWDataAccess::RGWDataAccess(RGWRados *_store) : store(_store)
346 {
347 sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc.sysobj->init_obj_ctx());
348 }
349
350
351 int RGWDataAccess::Bucket::finish_init()
352 {
353 auto iter = attrs.find(RGW_ATTR_ACL);
354 if (iter == attrs.end()) {
355 return 0;
356 }
357
358 bufferlist::const_iterator bliter = iter->second.begin();
359 try {
360 policy.decode(bliter);
361 } catch (buffer::error& err) {
362 return -EIO;
363 }
364
365 return 0;
366 }
367
368 int RGWDataAccess::Bucket::init()
369 {
370 int ret = sd->store->get_bucket_info(*sd->sysobj_ctx,
371 tenant, name,
372 bucket_info,
373 &mtime,
374 &attrs);
375 if (ret < 0) {
376 return ret;
377 }
378
379 return finish_init();
380 }
381
382 int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
383 const map<string, bufferlist>& _attrs)
384 {
385 bucket_info = _bucket_info;
386 attrs = _attrs;
387
388 return finish_init();
389 }
390
391 int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
392 ObjectRef *obj) {
393 obj->reset(new Object(sd, shared_from_this(), key));
394 return 0;
395 }
396
397 int RGWDataAccess::Object::put(bufferlist& data,
398 map<string, bufferlist>& attrs)
399 {
400 RGWRados *store = sd->store;
401 CephContext *cct = store->ctx();
402
403 string tag;
404 append_rand_alpha(cct, tag, tag, 32);
405
406 RGWBucketInfo& bucket_info = bucket->bucket_info;
407
408 using namespace rgw::putobj;
409 rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
410
411 RGWObjectCtx obj_ctx(store);
412 rgw_obj obj(bucket_info.bucket, key);
413
414 auto& owner = bucket->policy.get_owner();
415
416 string req_id = store->svc.zone_utils->unique_id(store->get_new_req_id());
417
418 AtomicObjectProcessor processor(&aio, store, bucket_info,
419 nullptr,
420 owner.get_id(),
421 obj_ctx, obj, olh_epoch, req_id);
422
423 int ret = processor.prepare();
424 if (ret < 0)
425 return ret;
426
427 using namespace rgw::putobj;
428
429 DataProcessor *filter = &processor;
430
431 CompressorRef plugin;
432 boost::optional<RGWPutObj_Compress> compressor;
433
434 const auto& compression_type = store->svc.zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
435 if (compression_type != "none") {
436 plugin = Compressor::create(store->ctx(), compression_type);
437 if (!plugin) {
438 ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
439 << compression_type << dendl;
440 } else {
441 compressor.emplace(store->ctx(), plugin, filter);
442 filter = &*compressor;
443 }
444 }
445
446 off_t ofs = 0;
447 auto obj_size = data.length();
448
449 RGWMD5Etag etag_calc;
450
451 do {
452 size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
453
454 bufferlist bl;
455
456 data.splice(0, read_len, &bl);
457 etag_calc.update(bl);
458
459 ret = filter->process(std::move(bl), ofs);
460 if (ret < 0)
461 return ret;
462
463 ofs += read_len;
464 } while (data.length() > 0);
465
466 ret = filter->process({}, ofs);
467 if (ret < 0) {
468 return ret;
469 }
470 bool has_etag_attr = false;
471 auto iter = attrs.find(RGW_ATTR_ETAG);
472 if (iter != attrs.end()) {
473 bufferlist& bl = iter->second;
474 etag = bl.to_str();
475 has_etag_attr = true;
476 }
477
478 if (!aclbl) {
479 RGWAccessControlPolicy_S3 policy(cct);
480
481 policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */
482
483 policy.encode(aclbl.emplace());
484 }
485
486 if (etag.empty()) {
487 etag_calc.finish(&etag);
488 }
489
490 if (!has_etag_attr) {
491 bufferlist etagbl;
492 etagbl.append(etag);
493 attrs[RGW_ATTR_ETAG] = etagbl;
494 }
495 attrs[RGW_ATTR_ACL] = *aclbl;
496
497 string *puser_data = nullptr;
498 if (user_data) {
499 puser_data = &(*user_data);
500 }
501
502 return processor.complete(obj_size, etag,
503 &mtime, mtime,
504 attrs, delete_at,
505 nullptr, nullptr,
506 puser_data,
507 nullptr, nullptr);
508 }
509
510 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
511 {
512 policy.encode(aclbl.emplace());
513 }
514
515 int rgw_tools_init(CephContext *cct)
516 {
517 ext_mime_map = new std::map<std::string, std::string>;
518 ext_mime_map_init(cct, cct->_conf->rgw_mime_types_file.c_str());
519 // ignore errors; missing mime.types is not fatal
520 return 0;
521 }
522
523 void rgw_tools_cleanup()
524 {
525 delete ext_mime_map;
526 ext_mime_map = nullptr;
527 }