1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
11 #include "include/types.h"
12 #include "include/stringify.h"
14 #include "librados/AioCompletionImpl.h"
16 #include "rgw_common.h"
17 #include "rgw_tools.h"
18 #include "rgw_acl_s3.h"
20 #include "rgw_putobj_processor.h"
21 #include "rgw_aio_throttle.h"
22 #include "rgw_compression.h"
24 #include "rgw_sal_rados.h"
25 #include "osd/osd_types.h"
27 #include "services/svc_sys_obj.h"
28 #include "services/svc_zone.h"
29 #include "services/svc_zone_utils.h"
31 #define dout_subsys ceph_subsys_rgw
32 #define dout_context g_ceph_context
34 #define READ_CHUNK_LEN (512 * 1024)
38 static std::map
<std::string
, std::string
>* ext_mime_map
;
40 int rgw_init_ioctx(const DoutPrefixProvider
*dpp
,
41 librados::Rados
*rados
, const rgw_pool
& pool
,
42 librados::IoCtx
& ioctx
, bool create
,
45 int r
= rados
->ioctx_create(pool
.name
.c_str(), ioctx
);
46 if (r
== -ENOENT
&& create
) {
47 r
= rados
->pool_create(pool
.name
.c_str());
51 << " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r
)
52 << " (this can be due to a pool or placement group misconfiguration, e.g."
53 << " pg_num < pgp_num or mon_max_pg_per_osd exceeded)"
56 if (r
< 0 && r
!= -EEXIST
) {
60 r
= rados
->ioctx_create(pool
.name
.c_str(), ioctx
);
65 r
= ioctx
.application_enable(pg_pool_t::APPLICATION_NAME_RGW
, false);
66 if (r
< 0 && r
!= -EOPNOTSUPP
) {
71 // set pg_autoscale_bias
73 float bias
= g_conf().get_val
<double>("rgw_rados_pool_autoscale_bias");
74 int r
= rados
->mon_command(
75 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
76 pool
.name
+ "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
77 stringify(bias
) + "\"}",
80 ldpp_dout(dpp
, 10) << __func__
<< " warning: failed to set pg_autoscale_bias on "
81 << pool
.name
<< dendl
;
84 int min
= g_conf().get_val
<uint64_t>("rgw_rados_pool_pg_num_min");
85 r
= rados
->mon_command(
86 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
87 pool
.name
+ "\", \"var\": \"pg_num_min\", \"val\": \"" +
88 stringify(min
) + "\"}",
91 ldpp_dout(dpp
, 10) << __func__
<< " warning: failed to set pg_num_min on "
92 << pool
.name
<< dendl
;
94 // set recovery_priority
95 int p
= g_conf().get_val
<uint64_t>("rgw_rados_pool_recovery_priority");
96 r
= rados
->mon_command(
97 "{\"prefix\": \"osd pool set\", \"pool\": \"" +
98 pool
.name
+ "\", \"var\": \"recovery_priority\": \"" +
102 ldpp_dout(dpp
, 10) << __func__
<< " warning: failed to set recovery_priority on "
103 << pool
.name
<< dendl
;
109 if (!pool
.ns
.empty()) {
110 ioctx
.set_namespace(pool
.ns
);
115 void rgw_shard_name(const string
& prefix
, unsigned max_shards
, const string
& key
, string
& name
, int *shard_id
)
117 uint32_t val
= ceph_str_hash_linux(key
.c_str(), key
.size());
120 *shard_id
= val
% max_shards
;
122 snprintf(buf
, sizeof(buf
), "%u", (unsigned)(val
% max_shards
));
126 void rgw_shard_name(const string
& prefix
, unsigned max_shards
, const string
& section
, const string
& key
, string
& name
)
128 uint32_t val
= ceph_str_hash_linux(key
.c_str(), key
.size());
129 val
^= ceph_str_hash_linux(section
.c_str(), section
.size());
131 snprintf(buf
, sizeof(buf
), "%u", (unsigned)(val
% max_shards
));
135 void rgw_shard_name(const string
& prefix
, unsigned shard_id
, string
& name
)
138 snprintf(buf
, sizeof(buf
), "%u", shard_id
);
142 int rgw_parse_list_of_flags(struct rgw_name_to_flag
*mapping
,
143 const string
& str
, uint32_t *perm
)
146 get_str_list(str
, strs
);
147 list
<string
>::iterator iter
;
149 for (iter
= strs
.begin(); iter
!= strs
.end(); ++iter
) {
151 for (int i
= 0; mapping
[i
].type_name
; i
++) {
152 if (s
.compare(mapping
[i
].type_name
) == 0)
153 v
|= mapping
[i
].flag
;
161 int rgw_put_system_obj(const DoutPrefixProvider
*dpp
,
162 RGWSysObjectCtx
& obj_ctx
, const rgw_pool
& pool
, const string
& oid
, bufferlist
& data
, bool exclusive
,
163 RGWObjVersionTracker
*objv_tracker
, real_time set_mtime
, optional_yield y
, map
<string
, bufferlist
> *pattrs
)
165 map
<string
,bufferlist
> no_attrs
;
170 rgw_raw_obj
obj(pool
, oid
);
172 auto sysobj
= obj_ctx
.get_obj(obj
);
173 int ret
= sysobj
.wop()
174 .set_objv_tracker(objv_tracker
)
175 .set_exclusive(exclusive
)
176 .set_mtime(set_mtime
)
178 .write(dpp
, data
, y
);
183 int rgw_get_system_obj(RGWSysObjectCtx
& obj_ctx
, const rgw_pool
& pool
, const string
& key
, bufferlist
& bl
,
184 RGWObjVersionTracker
*objv_tracker
, real_time
*pmtime
, optional_yield y
, const DoutPrefixProvider
*dpp
, map
<string
, bufferlist
> *pattrs
,
185 rgw_cache_entry_info
*cache_info
,
186 boost::optional
<obj_version
> refresh_version
, bool raw_attrs
)
188 bufferlist::iterator iter
;
189 int request_len
= READ_CHUNK_LEN
;
190 rgw_raw_obj
obj(pool
, key
);
192 obj_version original_readv
;
193 if (objv_tracker
&& !objv_tracker
->read_version
.empty()) {
194 original_readv
= objv_tracker
->read_version
;
198 auto sysobj
= obj_ctx
.get_obj(obj
);
199 auto rop
= sysobj
.rop();
201 int ret
= rop
.set_attrs(pattrs
)
202 .set_last_mod(pmtime
)
203 .set_objv_tracker(objv_tracker
)
204 .set_raw_attrs(raw_attrs
)
209 ret
= rop
.set_cache_info(cache_info
)
210 .set_refresh_version(refresh_version
)
212 if (ret
== -ECANCELED
) {
214 if (!original_readv
.empty()) {
215 /* we were asked to read a specific obj_version, failed */
219 objv_tracker
->read_version
.clear();
227 if (ret
< request_len
)
236 int rgw_delete_system_obj(const DoutPrefixProvider
*dpp
,
237 RGWSI_SysObj
*sysobj_svc
, const rgw_pool
& pool
, const string
& oid
,
238 RGWObjVersionTracker
*objv_tracker
, optional_yield y
)
240 auto obj_ctx
= sysobj_svc
->init_obj_ctx();
241 auto sysobj
= obj_ctx
.get_obj(rgw_raw_obj
{pool
, oid
});
242 rgw_raw_obj
obj(pool
, oid
);
244 .set_objv_tracker(objv_tracker
)
248 thread_local
bool is_asio_thread
= false;
250 int rgw_rados_operate(const DoutPrefixProvider
*dpp
, librados::IoCtx
& ioctx
, const std::string
& oid
,
251 librados::ObjectReadOperation
*op
, bufferlist
* pbl
,
252 optional_yield y
, int flags
)
254 // given a yield_context, call async_operate() to yield the coroutine instead
257 auto& context
= y
.get_io_context();
258 auto& yield
= y
.get_yield_context();
259 boost::system::error_code ec
;
260 auto bl
= librados::async_operate(
261 context
, ioctx
, oid
, op
, flags
, yield
[ec
]);
263 *pbl
= std::move(bl
);
267 // work on asio threads should be asynchronous, so warn when they block
268 if (is_asio_thread
) {
269 ldpp_dout(dpp
, 20) << "WARNING: blocking librados call" << dendl
;
271 return ioctx
.operate(oid
, op
, nullptr, flags
);
274 int rgw_rados_operate(const DoutPrefixProvider
*dpp
, librados::IoCtx
& ioctx
, const std::string
& oid
,
275 librados::ObjectWriteOperation
*op
, optional_yield y
,
279 auto& context
= y
.get_io_context();
280 auto& yield
= y
.get_yield_context();
281 boost::system::error_code ec
;
282 librados::async_operate(context
, ioctx
, oid
, op
, flags
, yield
[ec
]);
285 if (is_asio_thread
) {
286 ldpp_dout(dpp
, 20) << "WARNING: blocking librados call" << dendl
;
288 return ioctx
.operate(oid
, op
, flags
);
291 int rgw_rados_notify(const DoutPrefixProvider
*dpp
, librados::IoCtx
& ioctx
, const std::string
& oid
,
292 bufferlist
& bl
, uint64_t timeout_ms
, bufferlist
* pbl
,
296 auto& context
= y
.get_io_context();
297 auto& yield
= y
.get_yield_context();
298 boost::system::error_code ec
;
299 auto reply
= librados::async_notify(context
, ioctx
, oid
,
300 bl
, timeout_ms
, yield
[ec
]);
302 *pbl
= std::move(reply
);
306 if (is_asio_thread
) {
307 ldpp_dout(dpp
, 20) << "WARNING: blocking librados call" << dendl
;
309 return ioctx
.notify2(oid
, bl
, timeout_ms
, pbl
);
312 void parse_mime_map_line(const char *start
, const char *end
)
314 char line
[end
- start
+ 1];
315 strncpy(line
, start
, end
- start
);
316 line
[end
- start
] = '\0';
318 #define DELIMS " \t\n\r"
323 char *mime
= strsep(&l
, DELIMS
);
329 ext
= strsep(&l
, DELIMS
);
331 (*ext_mime_map
)[ext
] = mime
;
337 void parse_mime_map(const char *buf
)
339 const char *start
= buf
, *end
= buf
;
341 while (*end
&& *end
!= '\n') {
344 parse_mime_map_line(start
, end
);
350 static int ext_mime_map_init(const DoutPrefixProvider
*dpp
, CephContext
*cct
, const char *ext_map
)
352 int fd
= open(ext_map
, O_RDONLY
);
357 ldpp_dout(dpp
, 0) << __func__
<< " failed to open file=" << ext_map
358 << " : " << cpp_strerror(-ret
) << dendl
;
363 ret
= fstat(fd
, &st
);
366 ldpp_dout(dpp
, 0) << __func__
<< " failed to stat file=" << ext_map
367 << " : " << cpp_strerror(-ret
) << dendl
;
371 buf
= (char *)malloc(st
.st_size
+ 1);
374 ldpp_dout(dpp
, 0) << __func__
<< " failed to allocate buf" << dendl
;
378 ret
= safe_read(fd
, buf
, st
.st_size
+ 1);
379 if (ret
!= st
.st_size
) {
380 // huh? file size has changed?
381 ldpp_dout(dpp
, 0) << __func__
<< " raced! will retry.." << dendl
;
384 return ext_mime_map_init(dpp
, cct
, ext_map
);
386 buf
[st
.st_size
] = '\0';
396 const char *rgw_find_mime_by_ext(string
& ext
)
398 map
<string
, string
>::iterator iter
= ext_mime_map
->find(ext
);
399 if (iter
== ext_mime_map
->end())
402 return iter
->second
.c_str();
405 void rgw_filter_attrset(map
<string
, bufferlist
>& unfiltered_attrset
, const string
& check_prefix
,
406 map
<string
, bufferlist
> *attrset
)
409 map
<string
, bufferlist
>::iterator iter
;
410 for (iter
= unfiltered_attrset
.lower_bound(check_prefix
);
411 iter
!= unfiltered_attrset
.end(); ++iter
) {
412 if (!boost::algorithm::starts_with(iter
->first
, check_prefix
))
414 (*attrset
)[iter
->first
] = iter
->second
;
418 RGWDataAccess::RGWDataAccess(rgw::sal::Store
* _store
) : store(_store
)
423 int RGWDataAccess::Bucket::finish_init()
425 auto iter
= attrs
.find(RGW_ATTR_ACL
);
426 if (iter
== attrs
.end()) {
430 bufferlist::const_iterator bliter
= iter
->second
.begin();
432 policy
.decode(bliter
);
433 } catch (buffer::error
& err
) {
440 int RGWDataAccess::Bucket::init(const DoutPrefixProvider
*dpp
, optional_yield y
)
442 std::unique_ptr
<rgw::sal::Bucket
> bucket
;
443 int ret
= sd
->store
->get_bucket(dpp
, nullptr, tenant
, name
, &bucket
, y
);
448 bucket_info
= bucket
->get_info();
449 mtime
= bucket
->get_modification_time();
450 attrs
= bucket
->get_attrs();
452 return finish_init();
455 int RGWDataAccess::Bucket::init(const RGWBucketInfo
& _bucket_info
,
456 const map
<string
, bufferlist
>& _attrs
)
458 bucket_info
= _bucket_info
;
461 return finish_init();
464 int RGWDataAccess::Bucket::get_object(const rgw_obj_key
& key
,
466 obj
->reset(new Object(sd
, shared_from_this(), key
));
470 int RGWDataAccess::Object::put(bufferlist
& data
,
471 map
<string
, bufferlist
>& attrs
,
472 const DoutPrefixProvider
*dpp
,
475 rgw::sal::Store
* store
= sd
->store
;
476 CephContext
*cct
= store
->ctx();
479 append_rand_alpha(cct
, tag
, tag
, 32);
481 RGWBucketInfo
& bucket_info
= bucket
->bucket_info
;
483 rgw::BlockingAioThrottle
aio(store
->ctx()->_conf
->rgw_put_obj_min_window_size
);
485 RGWObjectCtx
obj_ctx(store
);
486 std::unique_ptr
<rgw::sal::Bucket
> b
;
487 store
->get_bucket(NULL
, bucket_info
, &b
);
488 std::unique_ptr
<rgw::sal::Object
> obj
= b
->get_object(key
);
490 auto& owner
= bucket
->policy
.get_owner();
492 string req_id
= store
->zone_unique_id(store
->get_new_req_id());
494 std::unique_ptr
<rgw::sal::Writer
> processor
;
495 processor
= store
->get_atomic_writer(dpp
, y
, std::move(obj
),
496 owner
.get_id(), obj_ctx
,
497 nullptr, olh_epoch
, req_id
);
499 int ret
= processor
->prepare(y
);
503 rgw::sal::DataProcessor
*filter
= processor
.get();
505 CompressorRef plugin
;
506 boost::optional
<RGWPutObj_Compress
> compressor
;
508 const auto& compression_type
= store
->get_zone()->get_params().get_compression_type(bucket_info
.placement_rule
);
509 if (compression_type
!= "none") {
510 plugin
= Compressor::create(store
->ctx(), compression_type
);
512 ldpp_dout(dpp
, 1) << "Cannot load plugin for compression type "
513 << compression_type
<< dendl
;
515 compressor
.emplace(store
->ctx(), plugin
, filter
);
516 filter
= &*compressor
;
521 auto obj_size
= data
.length();
523 RGWMD5Etag etag_calc
;
526 size_t read_len
= std::min(data
.length(), (unsigned int)cct
->_conf
->rgw_max_chunk_size
);
530 data
.splice(0, read_len
, &bl
);
531 etag_calc
.update(bl
);
533 ret
= filter
->process(std::move(bl
), ofs
);
538 } while (data
.length() > 0);
540 ret
= filter
->process({}, ofs
);
544 bool has_etag_attr
= false;
545 auto iter
= attrs
.find(RGW_ATTR_ETAG
);
546 if (iter
!= attrs
.end()) {
547 bufferlist
& bl
= iter
->second
;
549 has_etag_attr
= true;
553 RGWAccessControlPolicy_S3
policy(cct
);
555 policy
.create_canned(bucket
->policy
.get_owner(), bucket
->policy
.get_owner(), string()); /* default private policy */
557 policy
.encode(aclbl
.emplace());
561 etag_calc
.finish(&etag
);
564 if (!has_etag_attr
) {
567 attrs
[RGW_ATTR_ETAG
] = etagbl
;
569 attrs
[RGW_ATTR_ACL
] = *aclbl
;
571 string
*puser_data
= nullptr;
573 puser_data
= &(*user_data
);
576 return processor
->complete(obj_size
, etag
,
581 nullptr, nullptr, y
);
584 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy
& policy
)
586 policy
.encode(aclbl
.emplace());
589 int rgw_tools_init(const DoutPrefixProvider
*dpp
, CephContext
*cct
)
591 ext_mime_map
= new std::map
<std::string
, std::string
>;
592 ext_mime_map_init(dpp
, cct
, cct
->_conf
->rgw_mime_types_file
.c_str());
593 // ignore errors; missing mime.types is not fatal
597 void rgw_tools_cleanup()
600 ext_mime_map
= nullptr;
603 void rgw_complete_aio_completion(librados::AioCompletion
* c
, int r
) {
605 librados::CB_AioCompleteAndSafe
cb(pc
);