1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "common/errno.h"
7 #include "common/safe_io.h"
8 #include "librados/librados_asio.h"
9 #include "common/async/yield_context.h"
11 #include "include/types.h"
13 #include "rgw_common.h"
14 #include "rgw_rados.h"
15 #include "rgw_tools.h"
16 #include "rgw_acl_s3.h"
18 #include "rgw_putobj_processor.h"
19 #include "rgw_aio_throttle.h"
20 #include "rgw_compression.h"
23 #include "services/svc_sys_obj.h"
24 #include "services/svc_zone_utils.h"
26 #define dout_subsys ceph_subsys_rgw
27 #define dout_context g_ceph_context
29 #define READ_CHUNK_LEN (512 * 1024)
31 static std::map
<std::string
, std::string
>* ext_mime_map
;
33 int rgw_put_system_obj(RGWRados
*rgwstore
, const rgw_pool
& pool
, const string
& oid
, bufferlist
& data
, bool exclusive
,
34 RGWObjVersionTracker
*objv_tracker
, real_time set_mtime
, map
<string
, bufferlist
> *pattrs
)
36 map
<string
,bufferlist
> no_attrs
;
41 rgw_raw_obj
obj(pool
, oid
);
43 auto obj_ctx
= rgwstore
->svc
.sysobj
->init_obj_ctx();
44 auto sysobj
= obj_ctx
.get_obj(obj
);
45 int ret
= sysobj
.wop()
46 .set_objv_tracker(objv_tracker
)
47 .set_exclusive(exclusive
)
53 ret
= rgwstore
->create_pool(pool
);
56 .set_objv_tracker(objv_tracker
)
57 .set_exclusive(exclusive
)
67 int rgw_get_system_obj(RGWRados
*rgwstore
, RGWSysObjectCtx
& obj_ctx
, const rgw_pool
& pool
, const string
& key
, bufferlist
& bl
,
68 RGWObjVersionTracker
*objv_tracker
, real_time
*pmtime
, map
<string
, bufferlist
> *pattrs
,
69 rgw_cache_entry_info
*cache_info
, boost::optional
<obj_version
> refresh_version
)
71 bufferlist::iterator iter
;
72 int request_len
= READ_CHUNK_LEN
;
73 rgw_raw_obj
obj(pool
, key
);
75 obj_version original_readv
;
76 if (objv_tracker
&& !objv_tracker
->read_version
.empty()) {
77 original_readv
= objv_tracker
->read_version
;
81 auto sysobj
= obj_ctx
.get_obj(obj
);
82 auto rop
= sysobj
.rop();
84 int ret
= rop
.set_attrs(pattrs
)
86 .set_objv_tracker(objv_tracker
)
91 ret
= rop
.set_cache_info(cache_info
)
92 .set_refresh_version(refresh_version
)
94 if (ret
== -ECANCELED
) {
96 if (!original_readv
.empty()) {
97 /* we were asked to read a specific obj_version, failed */
101 objv_tracker
->read_version
.clear();
109 if (ret
< request_len
)
118 int rgw_delete_system_obj(RGWRados
*rgwstore
, const rgw_pool
& pool
, const string
& oid
,
119 RGWObjVersionTracker
*objv_tracker
)
121 auto obj_ctx
= rgwstore
->svc
.sysobj
->init_obj_ctx();
122 auto sysobj
= obj_ctx
.get_obj(rgw_raw_obj
{pool
, oid
});
123 rgw_raw_obj
obj(pool
, oid
);
125 .set_objv_tracker(objv_tracker
)
129 thread_local
bool is_asio_thread
= false;
131 int rgw_rados_operate(librados::IoCtx
& ioctx
, const std::string
& oid
,
132 librados::ObjectReadOperation
*op
, bufferlist
* pbl
,
135 #ifdef HAVE_BOOST_CONTEXT
136 // given a yield_context, call async_operate() to yield the coroutine instead
139 auto& context
= y
.get_io_context();
140 auto& yield
= y
.get_yield_context();
141 boost::system::error_code ec
;
142 auto bl
= librados::async_operate(context
, ioctx
, oid
, op
, 0, yield
[ec
]);
144 *pbl
= std::move(bl
);
148 // work on asio threads should be asynchronous, so warn when they block
149 if (is_asio_thread
) {
150 dout(20) << "WARNING: blocking librados call" << dendl
;
153 return ioctx
.operate(oid
, op
, nullptr);
156 int rgw_rados_operate(librados::IoCtx
& ioctx
, const std::string
& oid
,
157 librados::ObjectWriteOperation
*op
, optional_yield y
)
159 #ifdef HAVE_BOOST_CONTEXT
161 auto& context
= y
.get_io_context();
162 auto& yield
= y
.get_yield_context();
163 boost::system::error_code ec
;
164 librados::async_operate(context
, ioctx
, oid
, op
, 0, yield
[ec
]);
167 if (is_asio_thread
) {
168 dout(20) << "WARNING: blocking librados call" << dendl
;
171 return ioctx
.operate(oid
, op
);
174 void parse_mime_map_line(const char *start
, const char *end
)
176 char line
[end
- start
+ 1];
177 strncpy(line
, start
, end
- start
);
178 line
[end
- start
] = '\0';
180 #define DELIMS " \t\n\r"
185 char *mime
= strsep(&l
, DELIMS
);
191 ext
= strsep(&l
, DELIMS
);
193 (*ext_mime_map
)[ext
] = mime
;
199 void parse_mime_map(const char *buf
)
201 const char *start
= buf
, *end
= buf
;
203 while (*end
&& *end
!= '\n') {
206 parse_mime_map_line(start
, end
);
212 static int ext_mime_map_init(CephContext
*cct
, const char *ext_map
)
214 int fd
= open(ext_map
, O_RDONLY
);
219 ldout(cct
, 0) << __func__
<< " failed to open file=" << ext_map
220 << " : " << cpp_strerror(-ret
) << dendl
;
225 ret
= fstat(fd
, &st
);
228 ldout(cct
, 0) << __func__
<< " failed to stat file=" << ext_map
229 << " : " << cpp_strerror(-ret
) << dendl
;
233 buf
= (char *)malloc(st
.st_size
+ 1);
236 ldout(cct
, 0) << __func__
<< " failed to allocate buf" << dendl
;
240 ret
= safe_read(fd
, buf
, st
.st_size
+ 1);
241 if (ret
!= st
.st_size
) {
242 // huh? file size has changed?
243 ldout(cct
, 0) << __func__
<< " raced! will retry.." << dendl
;
246 return ext_mime_map_init(cct
, ext_map
);
248 buf
[st
.st_size
] = '\0';
258 const char *rgw_find_mime_by_ext(string
& ext
)
260 map
<string
, string
>::iterator iter
= ext_mime_map
->find(ext
);
261 if (iter
== ext_mime_map
->end())
264 return iter
->second
.c_str();
267 void rgw_filter_attrset(map
<string
, bufferlist
>& unfiltered_attrset
, const string
& check_prefix
,
268 map
<string
, bufferlist
> *attrset
)
271 map
<string
, bufferlist
>::iterator iter
;
272 for (iter
= unfiltered_attrset
.lower_bound(check_prefix
);
273 iter
!= unfiltered_attrset
.end(); ++iter
) {
274 if (!boost::algorithm::starts_with(iter
->first
, check_prefix
))
276 (*attrset
)[iter
->first
] = iter
->second
;
280 RGWDataAccess::RGWDataAccess(RGWRados
*_store
) : store(_store
)
282 sysobj_ctx
= std::make_unique
<RGWSysObjectCtx
>(store
->svc
.sysobj
->init_obj_ctx());
286 int RGWDataAccess::Bucket::finish_init()
288 auto iter
= attrs
.find(RGW_ATTR_ACL
);
289 if (iter
== attrs
.end()) {
293 bufferlist::const_iterator bliter
= iter
->second
.begin();
295 policy
.decode(bliter
);
296 } catch (buffer::error
& err
) {
303 int RGWDataAccess::Bucket::init()
305 int ret
= sd
->store
->get_bucket_info(*sd
->sysobj_ctx
,
314 return finish_init();
317 int RGWDataAccess::Bucket::init(const RGWBucketInfo
& _bucket_info
,
318 const map
<string
, bufferlist
>& _attrs
)
320 bucket_info
= _bucket_info
;
323 return finish_init();
326 int RGWDataAccess::Bucket::get_object(const rgw_obj_key
& key
,
328 obj
->reset(new Object(sd
, shared_from_this(), key
));
332 int RGWDataAccess::Object::put(bufferlist
& data
,
333 map
<string
, bufferlist
>& attrs
)
335 RGWRados
*store
= sd
->store
;
336 CephContext
*cct
= store
->ctx();
339 append_rand_alpha(cct
, tag
, tag
, 32);
341 RGWBucketInfo
& bucket_info
= bucket
->bucket_info
;
343 using namespace rgw::putobj
;
344 rgw::AioThrottle
aio(store
->ctx()->_conf
->rgw_put_obj_min_window_size
);
346 RGWObjectCtx
obj_ctx(store
);
347 rgw_obj
obj(bucket_info
.bucket
, key
);
349 auto& owner
= bucket
->policy
.get_owner();
351 string req_id
= store
->svc
.zone_utils
->unique_id(store
->get_new_req_id());
353 AtomicObjectProcessor
processor(&aio
, store
, bucket_info
,
356 obj_ctx
, obj
, olh_epoch
, req_id
);
358 int ret
= processor
.prepare();
362 using namespace rgw::putobj
;
364 DataProcessor
*filter
= &processor
;
366 CompressorRef plugin
;
367 boost::optional
<RGWPutObj_Compress
> compressor
;
369 const auto& compression_type
= store
->svc
.zone
->get_zone_params().get_compression_type(bucket_info
.placement_rule
);
370 if (compression_type
!= "none") {
371 plugin
= Compressor::create(store
->ctx(), compression_type
);
373 ldout(store
->ctx(), 1) << "Cannot load plugin for compression type "
374 << compression_type
<< dendl
;
376 compressor
.emplace(store
->ctx(), plugin
, filter
);
377 filter
= &*compressor
;
382 auto obj_size
= data
.length();
384 RGWMD5Etag etag_calc
;
387 size_t read_len
= std::min(data
.length(), (unsigned int)cct
->_conf
->rgw_max_chunk_size
);
391 data
.splice(0, read_len
, &bl
);
392 etag_calc
.update(bl
);
394 ret
= filter
->process(std::move(bl
), ofs
);
399 } while (data
.length() > 0);
401 ret
= filter
->process({}, ofs
);
405 bool has_etag_attr
= false;
406 auto iter
= attrs
.find(RGW_ATTR_ETAG
);
407 if (iter
!= attrs
.end()) {
408 bufferlist
& bl
= iter
->second
;
410 has_etag_attr
= true;
414 RGWAccessControlPolicy_S3
policy(cct
);
416 policy
.create_canned(bucket
->policy
.get_owner(), bucket
->policy
.get_owner(), string()); /* default private policy */
418 policy
.encode(aclbl
.emplace());
422 etag_calc
.finish(&etag
);
425 if (!has_etag_attr
) {
428 attrs
[RGW_ATTR_ETAG
] = etagbl
;
430 attrs
[RGW_ATTR_ACL
] = *aclbl
;
432 string
*puser_data
= nullptr;
434 puser_data
= &(*user_data
);
437 return processor
.complete(obj_size
, etag
,
445 void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy
& policy
)
447 policy
.encode(aclbl
.emplace());
450 int rgw_tools_init(CephContext
*cct
)
452 ext_mime_map
= new std::map
<std::string
, std::string
>;
453 ext_mime_map_init(cct
, cct
->_conf
->rgw_mime_types_file
.c_str());
454 // ignore errors; missing mime.types is not fatal
458 void rgw_tools_cleanup()
461 ext_mime_map
= nullptr;