]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_tools.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_tools.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <errno.h>
5
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
10
11 #include "include/types.h"
12
13 #include "rgw_common.h"
14 #include "rgw_rados.h"
15 #include "rgw_tools.h"
16 #include "rgw_acl_s3.h"
17 #include "rgw_op.h"
18 #include "rgw_putobj_processor.h"
19 #include "rgw_aio_throttle.h"
20 #include "rgw_compression.h"
21 #include "rgw_zone.h"
22
23 #include "services/svc_sys_obj.h"
24 #include "services/svc_zone_utils.h"
25
26 #define dout_subsys ceph_subsys_rgw
27 #define dout_context g_ceph_context
28
29 #define READ_CHUNK_LEN (512 * 1024)
30
// File-extension -> MIME-type lookup table. Allocated by rgw_tools_init(),
// populated by parse_mime_map_line(), queried by rgw_find_mime_by_ext() and
// released (and reset to nullptr) by rgw_tools_cleanup().
static std::map<std::string, std::string>* ext_mime_map;
32
33 int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
34 RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
35 {
36 map<string,bufferlist> no_attrs;
37 if (!pattrs) {
38 pattrs = &no_attrs;
39 }
40
41 rgw_raw_obj obj(pool, oid);
42
43 auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
44 auto sysobj = obj_ctx.get_obj(obj);
45 int ret = sysobj.wop()
46 .set_objv_tracker(objv_tracker)
47 .set_exclusive(exclusive)
48 .set_mtime(set_mtime)
49 .set_attrs(*pattrs)
50 .write(data);
51
52 if (ret == -ENOENT) {
53 ret = rgwstore->create_pool(pool);
54 if (ret >= 0) {
55 ret = sysobj.wop()
56 .set_objv_tracker(objv_tracker)
57 .set_exclusive(exclusive)
58 .set_mtime(set_mtime)
59 .set_attrs(*pattrs)
60 .write(data);
61 }
62 }
63
64 return ret;
65 }
66
67 int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
68 RGWObjVersionTracker *objv_tracker, real_time *pmtime, map<string, bufferlist> *pattrs,
69 rgw_cache_entry_info *cache_info, boost::optional<obj_version> refresh_version)
70 {
71 bufferlist::iterator iter;
72 int request_len = READ_CHUNK_LEN;
73 rgw_raw_obj obj(pool, key);
74
75 obj_version original_readv;
76 if (objv_tracker && !objv_tracker->read_version.empty()) {
77 original_readv = objv_tracker->read_version;
78 }
79
80 do {
81 auto sysobj = obj_ctx.get_obj(obj);
82 auto rop = sysobj.rop();
83
84 int ret = rop.set_attrs(pattrs)
85 .set_last_mod(pmtime)
86 .set_objv_tracker(objv_tracker)
87 .stat();
88 if (ret < 0)
89 return ret;
90
91 ret = rop.set_cache_info(cache_info)
92 .set_refresh_version(refresh_version)
93 .read(&bl);
94 if (ret == -ECANCELED) {
95 /* raced, restart */
96 if (!original_readv.empty()) {
97 /* we were asked to read a specific obj_version, failed */
98 return ret;
99 }
100 if (objv_tracker) {
101 objv_tracker->read_version.clear();
102 }
103 sysobj.invalidate();
104 continue;
105 }
106 if (ret < 0)
107 return ret;
108
109 if (ret < request_len)
110 break;
111 bl.clear();
112 request_len *= 2;
113 } while (true);
114
115 return 0;
116 }
117
118 int rgw_delete_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid,
119 RGWObjVersionTracker *objv_tracker)
120 {
121 auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
122 auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
123 rgw_raw_obj obj(pool, oid);
124 return sysobj.wop()
125 .set_objv_tracker(objv_tracker)
126 .remove();
127 }
128
// Per-thread flag, false by default. When true, the blocking (non-yield)
// paths of rgw_rados_operate() log a "blocking librados call" warning.
thread_local bool is_asio_thread = false;
130
131 int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
132 librados::ObjectReadOperation *op, bufferlist* pbl,
133 optional_yield y)
134 {
135 #ifdef HAVE_BOOST_CONTEXT
136 // given a yield_context, call async_operate() to yield the coroutine instead
137 // of blocking
138 if (y) {
139 auto& context = y.get_io_context();
140 auto& yield = y.get_yield_context();
141 boost::system::error_code ec;
142 auto bl = librados::async_operate(context, ioctx, oid, op, 0, yield[ec]);
143 if (pbl) {
144 *pbl = std::move(bl);
145 }
146 return -ec.value();
147 }
148 // work on asio threads should be asynchronous, so warn when they block
149 if (is_asio_thread) {
150 dout(20) << "WARNING: blocking librados call" << dendl;
151 }
152 #endif
153 return ioctx.operate(oid, op, nullptr);
154 }
155
156 int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
157 librados::ObjectWriteOperation *op, optional_yield y)
158 {
159 #ifdef HAVE_BOOST_CONTEXT
160 if (y) {
161 auto& context = y.get_io_context();
162 auto& yield = y.get_yield_context();
163 boost::system::error_code ec;
164 librados::async_operate(context, ioctx, oid, op, 0, yield[ec]);
165 return -ec.value();
166 }
167 if (is_asio_thread) {
168 dout(20) << "WARNING: blocking librados call" << dendl;
169 }
170 #endif
171 return ioctx.operate(oid, op);
172 }
173
174 void parse_mime_map_line(const char *start, const char *end)
175 {
176 char line[end - start + 1];
177 strncpy(line, start, end - start);
178 line[end - start] = '\0';
179 char *l = line;
180 #define DELIMS " \t\n\r"
181
182 while (isspace(*l))
183 l++;
184
185 char *mime = strsep(&l, DELIMS);
186 if (!mime)
187 return;
188
189 char *ext;
190 do {
191 ext = strsep(&l, DELIMS);
192 if (ext && *ext) {
193 (*ext_mime_map)[ext] = mime;
194 }
195 } while (ext);
196 }
197
198
199 void parse_mime_map(const char *buf)
200 {
201 const char *start = buf, *end = buf;
202 while (*end) {
203 while (*end && *end != '\n') {
204 end++;
205 }
206 parse_mime_map_line(start, end);
207 end++;
208 start = end;
209 }
210 }
211
212 static int ext_mime_map_init(CephContext *cct, const char *ext_map)
213 {
214 int fd = open(ext_map, O_RDONLY);
215 char *buf = NULL;
216 int ret;
217 if (fd < 0) {
218 ret = -errno;
219 ldout(cct, 0) << __func__ << " failed to open file=" << ext_map
220 << " : " << cpp_strerror(-ret) << dendl;
221 return ret;
222 }
223
224 struct stat st;
225 ret = fstat(fd, &st);
226 if (ret < 0) {
227 ret = -errno;
228 ldout(cct, 0) << __func__ << " failed to stat file=" << ext_map
229 << " : " << cpp_strerror(-ret) << dendl;
230 goto done;
231 }
232
233 buf = (char *)malloc(st.st_size + 1);
234 if (!buf) {
235 ret = -ENOMEM;
236 ldout(cct, 0) << __func__ << " failed to allocate buf" << dendl;
237 goto done;
238 }
239
240 ret = safe_read(fd, buf, st.st_size + 1);
241 if (ret != st.st_size) {
242 // huh? file size has changed?
243 ldout(cct, 0) << __func__ << " raced! will retry.." << dendl;
244 free(buf);
245 close(fd);
246 return ext_mime_map_init(cct, ext_map);
247 }
248 buf[st.st_size] = '\0';
249
250 parse_mime_map(buf);
251 ret = 0;
252 done:
253 free(buf);
254 close(fd);
255 return ret;
256 }
257
258 const char *rgw_find_mime_by_ext(string& ext)
259 {
260 map<string, string>::iterator iter = ext_mime_map->find(ext);
261 if (iter == ext_mime_map->end())
262 return NULL;
263
264 return iter->second.c_str();
265 }
266
267 void rgw_filter_attrset(map<string, bufferlist>& unfiltered_attrset, const string& check_prefix,
268 map<string, bufferlist> *attrset)
269 {
270 attrset->clear();
271 map<string, bufferlist>::iterator iter;
272 for (iter = unfiltered_attrset.lower_bound(check_prefix);
273 iter != unfiltered_attrset.end(); ++iter) {
274 if (!boost::algorithm::starts_with(iter->first, check_prefix))
275 break;
276 (*attrset)[iter->first] = iter->second;
277 }
278 }
279
280 RGWDataAccess::RGWDataAccess(RGWRados *_store) : store(_store)
281 {
282 sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc.sysobj->init_obj_ctx());
283 }
284
285
286 int RGWDataAccess::Bucket::finish_init()
287 {
288 auto iter = attrs.find(RGW_ATTR_ACL);
289 if (iter == attrs.end()) {
290 return 0;
291 }
292
293 bufferlist::const_iterator bliter = iter->second.begin();
294 try {
295 policy.decode(bliter);
296 } catch (buffer::error& err) {
297 return -EIO;
298 }
299
300 return 0;
301 }
302
303 int RGWDataAccess::Bucket::init()
304 {
305 int ret = sd->store->get_bucket_info(*sd->sysobj_ctx,
306 tenant, name,
307 bucket_info,
308 &mtime,
309 &attrs);
310 if (ret < 0) {
311 return ret;
312 }
313
314 return finish_init();
315 }
316
317 int RGWDataAccess::Bucket::init(const RGWBucketInfo& _bucket_info,
318 const map<string, bufferlist>& _attrs)
319 {
320 bucket_info = _bucket_info;
321 attrs = _attrs;
322
323 return finish_init();
324 }
325
326 int RGWDataAccess::Bucket::get_object(const rgw_obj_key& key,
327 ObjectRef *obj) {
328 obj->reset(new Object(sd, shared_from_this(), key));
329 return 0;
330 }
331
332 int RGWDataAccess::Object::put(bufferlist& data,
333 map<string, bufferlist>& attrs)
334 {
335 RGWRados *store = sd->store;
336 CephContext *cct = store->ctx();
337
338 string tag;
339 append_rand_alpha(cct, tag, tag, 32);
340
341 RGWBucketInfo& bucket_info = bucket->bucket_info;
342
343 using namespace rgw::putobj;
344 rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
345
346 RGWObjectCtx obj_ctx(store);
347 rgw_obj obj(bucket_info.bucket, key);
348
349 auto& owner = bucket->policy.get_owner();
350
351 string req_id = store->svc.zone_utils->unique_id(store->get_new_req_id());
352
353 AtomicObjectProcessor processor(&aio, store, bucket_info,
354 nullptr,
355 owner.get_id(),
356 obj_ctx, obj, olh_epoch, req_id);
357
358 int ret = processor.prepare();
359 if (ret < 0)
360 return ret;
361
362 using namespace rgw::putobj;
363
364 DataProcessor *filter = &processor;
365
366 CompressorRef plugin;
367 boost::optional<RGWPutObj_Compress> compressor;
368
369 const auto& compression_type = store->svc.zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
370 if (compression_type != "none") {
371 plugin = Compressor::create(store->ctx(), compression_type);
372 if (!plugin) {
373 ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
374 << compression_type << dendl;
375 } else {
376 compressor.emplace(store->ctx(), plugin, filter);
377 filter = &*compressor;
378 }
379 }
380
381 off_t ofs = 0;
382 auto obj_size = data.length();
383
384 RGWMD5Etag etag_calc;
385
386 do {
387 size_t read_len = std::min(data.length(), (unsigned int)cct->_conf->rgw_max_chunk_size);
388
389 bufferlist bl;
390
391 data.splice(0, read_len, &bl);
392 etag_calc.update(bl);
393
394 ret = filter->process(std::move(bl), ofs);
395 if (ret < 0)
396 return ret;
397
398 ofs += read_len;
399 } while (data.length() > 0);
400
401 ret = filter->process({}, ofs);
402 if (ret < 0) {
403 return ret;
404 }
405 bool has_etag_attr = false;
406 auto iter = attrs.find(RGW_ATTR_ETAG);
407 if (iter != attrs.end()) {
408 bufferlist& bl = iter->second;
409 etag = bl.to_str();
410 has_etag_attr = true;
411 }
412
413 if (!aclbl) {
414 RGWAccessControlPolicy_S3 policy(cct);
415
416 policy.create_canned(bucket->policy.get_owner(), bucket->policy.get_owner(), string()); /* default private policy */
417
418 policy.encode(aclbl.emplace());
419 }
420
421 if (etag.empty()) {
422 etag_calc.finish(&etag);
423 }
424
425 if (!has_etag_attr) {
426 bufferlist etagbl;
427 etagbl.append(etag);
428 attrs[RGW_ATTR_ETAG] = etagbl;
429 }
430 attrs[RGW_ATTR_ACL] = *aclbl;
431
432 string *puser_data = nullptr;
433 if (user_data) {
434 puser_data = &(*user_data);
435 }
436
437 return processor.complete(obj_size, etag,
438 &mtime, mtime,
439 attrs, delete_at,
440 nullptr, nullptr,
441 puser_data,
442 nullptr, nullptr);
443 }
444
445 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)
446 {
447 policy.encode(aclbl.emplace());
448 }
449
450 int rgw_tools_init(CephContext *cct)
451 {
452 ext_mime_map = new std::map<std::string, std::string>;
453 ext_mime_map_init(cct, cct->_conf->rgw_mime_types_file.c_str());
454 // ignore errors; missing mime.types is not fatal
455 return 0;
456 }
457
458 void rgw_tools_cleanup()
459 {
460 delete ext_mime_map;
461 ext_mime_map = nullptr;
462 }