#include "include/types.h"
#include "include/stringify.h"
+#include "librados/AioCompletionImpl.h"
+
#include "rgw_common.h"
-#include "rgw_rados.h"
#include "rgw_tools.h"
#include "rgw_acl_s3.h"
#include "rgw_op.h"
#include "rgw_aio_throttle.h"
#include "rgw_compression.h"
#include "rgw_zone.h"
+#include "rgw_sal_rados.h"
#include "osd/osd_types.h"
#include "services/svc_sys_obj.h"
#define READ_CHUNK_LEN (512 * 1024)
+using namespace std;
+
static std::map<std::string, std::string>* ext_mime_map;
-int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool,
+int rgw_init_ioctx(const DoutPrefixProvider *dpp,
+ librados::Rados *rados, const rgw_pool& pool,
librados::IoCtx& ioctx, bool create,
bool mostly_omap)
{
if (r == -ENOENT && create) {
r = rados->pool_create(pool.name.c_str());
if (r == -ERANGE) {
- dout(0)
+ ldpp_dout(dpp, 0)
<< __func__
<< " ERROR: librados::Rados::pool_create returned " << cpp_strerror(-r)
<< " (this can be due to a pool or placement group misconfiguration, e.g."
stringify(bias) + "\"}",
inbl, NULL, NULL);
if (r < 0) {
- dout(10) << __func__ << " warning: failed to set pg_autoscale_bias on "
- << pool.name << dendl;
- }
- // set pg_num_min
- int min = g_conf().get_val<uint64_t>("rgw_rados_pool_pg_num_min");
- r = rados->mon_command(
- "{\"prefix\": \"osd pool set\", \"pool\": \"" +
- pool.name + "\", \"var\": \"pg_num_min\", \"val\": \"" +
- stringify(min) + "\"}",
- inbl, NULL, NULL);
- if (r < 0) {
- dout(10) << __func__ << " warning: failed to set pg_num_min on "
+ ldpp_dout(dpp, 10) << __func__ << " warning: failed to set pg_autoscale_bias on "
<< pool.name << dendl;
}
// set recovery_priority
stringify(p) + "\"}",
inbl, NULL, NULL);
if (r < 0) {
- dout(10) << __func__ << " warning: failed to set recovery_priority on "
+ ldpp_dout(dpp, 10) << __func__ << " warning: failed to set recovery_priority on "
<< pool.name << dendl;
}
}
return 0;
}
-int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+int rgw_put_system_obj(const DoutPrefixProvider *dpp,
+ RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
{
map<string,bufferlist> no_attrs;
.set_exclusive(exclusive)
.set_mtime(set_mtime)
.set_attrs(*pattrs)
- .write(data, y);
+ .write(dpp, data, y);
return ret;
}
-int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
- RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
-{
- return rgw_put_system_obj(obj_ctx, pool, oid, data, exclusive,
- objv_tracker, set_mtime, null_yield, pattrs);
-}
-
int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
- RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, map<string, bufferlist> *pattrs,
+ RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, const DoutPrefixProvider *dpp, map<string, bufferlist> *pattrs,
rgw_cache_entry_info *cache_info,
- boost::optional<obj_version> refresh_version)
+ boost::optional<obj_version> refresh_version, bool raw_attrs)
{
bufferlist::iterator iter;
int request_len = READ_CHUNK_LEN;
int ret = rop.set_attrs(pattrs)
.set_last_mod(pmtime)
.set_objv_tracker(objv_tracker)
- .stat(y);
+ .set_raw_attrs(raw_attrs)
+ .stat(y, dpp);
if (ret < 0)
return ret;
ret = rop.set_cache_info(cache_info)
.set_refresh_version(refresh_version)
- .read(&bl, y);
+ .read(dpp, &bl, y);
if (ret == -ECANCELED) {
/* raced, restart */
if (!original_readv.empty()) {
return 0;
}
-int rgw_delete_system_obj(RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
- RGWObjVersionTracker *objv_tracker)
+int rgw_delete_system_obj(const DoutPrefixProvider *dpp,
+ RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
+ RGWObjVersionTracker *objv_tracker, optional_yield y)
{
auto obj_ctx = sysobj_svc->init_obj_ctx();
auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
rgw_raw_obj obj(pool, oid);
return sysobj.wop()
.set_objv_tracker(objv_tracker)
- .remove(null_yield);
+ .remove(dpp, y);
}
thread_local bool is_asio_thread = false;
-int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
+int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
librados::ObjectReadOperation *op, bufferlist* pbl,
- optional_yield y)
+ optional_yield y, int flags)
{
-#ifdef HAVE_BOOST_CONTEXT
// given a yield_context, call async_operate() to yield the coroutine instead
// of blocking
if (y) {
auto& context = y.get_io_context();
auto& yield = y.get_yield_context();
boost::system::error_code ec;
- auto bl = librados::async_operate(context, ioctx, oid, op, 0, yield[ec]);
+ auto bl = librados::async_operate(
+ context, ioctx, oid, op, flags, yield[ec]);
if (pbl) {
*pbl = std::move(bl);
}
}
// work on asio threads should be asynchronous, so warn when they block
if (is_asio_thread) {
- dout(20) << "WARNING: blocking librados call" << dendl;
+ ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
}
-#endif
- return ioctx.operate(oid, op, nullptr);
+ return ioctx.operate(oid, op, nullptr, flags);
}
-int rgw_rados_operate(librados::IoCtx& ioctx, const std::string& oid,
- librados::ObjectWriteOperation *op, optional_yield y)
+int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
+ librados::ObjectWriteOperation *op, optional_yield y,
+ int flags)
{
-#ifdef HAVE_BOOST_CONTEXT
if (y) {
auto& context = y.get_io_context();
auto& yield = y.get_yield_context();
boost::system::error_code ec;
- librados::async_operate(context, ioctx, oid, op, 0, yield[ec]);
+ librados::async_operate(context, ioctx, oid, op, flags, yield[ec]);
return -ec.value();
}
if (is_asio_thread) {
- dout(20) << "WARNING: blocking librados call" << dendl;
+ ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
}
-#endif
- return ioctx.operate(oid, op);
+ return ioctx.operate(oid, op, flags);
}
-int rgw_rados_notify(librados::IoCtx& ioctx, const std::string& oid,
+int rgw_rados_notify(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid,
bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
optional_yield y)
{
-#ifdef HAVE_BOOST_CONTEXT
if (y) {
auto& context = y.get_io_context();
auto& yield = y.get_yield_context();
return -ec.value();
}
if (is_asio_thread) {
- dout(20) << "WARNING: blocking librados call" << dendl;
+ ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl;
}
-#endif
return ioctx.notify2(oid, bl, timeout_ms, pbl);
}
}
}
-static int ext_mime_map_init(CephContext *cct, const char *ext_map)
+static int ext_mime_map_init(const DoutPrefixProvider *dpp, CephContext *cct, const char *ext_map)
{
int fd = open(ext_map, O_RDONLY);
char *buf = NULL;
int ret;
if (fd < 0) {
ret = -errno;
- ldout(cct, 0) << __func__ << " failed to open file=" << ext_map
+ ldpp_dout(dpp, 0) << __func__ << " failed to open file=" << ext_map
<< " : " << cpp_strerror(-ret) << dendl;
return ret;
}
ret = fstat(fd, &st);
if (ret < 0) {
ret = -errno;
- ldout(cct, 0) << __func__ << " failed to stat file=" << ext_map
+ ldpp_dout(dpp, 0) << __func__ << " failed to stat file=" << ext_map
<< " : " << cpp_strerror(-ret) << dendl;
goto done;
}
buf = (char *)malloc(st.st_size + 1);
if (!buf) {
ret = -ENOMEM;
- ldout(cct, 0) << __func__ << " failed to allocate buf" << dendl;
+ ldpp_dout(dpp, 0) << __func__ << " failed to allocate buf" << dendl;
goto done;
}
ret = safe_read(fd, buf, st.st_size + 1);
if (ret != st.st_size) {
// huh? file size has changed?
- ldout(cct, 0) << __func__ << " raced! will retry.." << dendl;
+ ldpp_dout(dpp, 0) << __func__ << " raced! will retry.." << dendl;
free(buf);
close(fd);
- return ext_mime_map_init(cct, ext_map);
+ return ext_mime_map_init(dpp, cct, ext_map);
}
buf[st.st_size] = '\0';
}
}
-RGWDataAccess::RGWDataAccess(rgw::sal::RGWRadosStore *_store) : store(_store)
+RGWDataAccess::RGWDataAccess(rgw::sal::Store* _store) : store(_store)
{
- sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc()->sysobj->init_obj_ctx());
}
return 0;
}
-int RGWDataAccess::Bucket::init()
+int RGWDataAccess::Bucket::init(const DoutPrefixProvider *dpp, optional_yield y)
{
- int ret = sd->store->getRados()->get_bucket_info(sd->store->svc(),
- tenant, name,
- bucket_info,
- &mtime,
- null_yield,
- &attrs);
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ int ret = sd->store->get_bucket(dpp, nullptr, tenant, name, &bucket, y);
if (ret < 0) {
return ret;
}
+ bucket_info = bucket->get_info();
+ mtime = bucket->get_modification_time();
+ attrs = bucket->get_attrs();
+
return finish_init();
}
const DoutPrefixProvider *dpp,
optional_yield y)
{
- rgw::sal::RGWRadosStore *store = sd->store;
+ rgw::sal::Store* store = sd->store;
CephContext *cct = store->ctx();
string tag;
rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
RGWObjectCtx obj_ctx(store);
- rgw_obj obj(bucket_info.bucket, key);
+ std::unique_ptr<rgw::sal::Bucket> b;
+ store->get_bucket(NULL, bucket_info, &b);
+ std::unique_ptr<rgw::sal::Object> obj = b->get_object(key);
auto& owner = bucket->policy.get_owner();
- string req_id = store->svc()->zone_utils->unique_id(store->getRados()->get_new_req_id());
+ string req_id = store->zone_unique_id(store->get_new_req_id());
- using namespace rgw::putobj;
- AtomicObjectProcessor processor(&aio, store, bucket_info, nullptr,
- owner.get_id(), obj_ctx, obj, olh_epoch,
- req_id, dpp, y);
+ std::unique_ptr<rgw::sal::Writer> processor;
+ processor = store->get_atomic_writer(dpp, y, std::move(obj),
+ owner.get_id(), obj_ctx,
+ nullptr, olh_epoch, req_id);
- int ret = processor.prepare(y);
+ int ret = processor->prepare(y);
if (ret < 0)
return ret;
- DataProcessor *filter = &processor;
+ rgw::sal::DataProcessor *filter = processor.get();
CompressorRef plugin;
boost::optional<RGWPutObj_Compress> compressor;
- const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
+ const auto& compression_type = store->get_zone()->get_params().get_compression_type(bucket_info.placement_rule);
if (compression_type != "none") {
plugin = Compressor::create(store->ctx(), compression_type);
if (!plugin) {
- ldout(store->ctx(), 1) << "Cannot load plugin for compression type "
+ ldpp_dout(dpp, 1) << "Cannot load plugin for compression type "
<< compression_type << dendl;
} else {
compressor.emplace(store->ctx(), plugin, filter);
puser_data = &(*user_data);
}
- return processor.complete(obj_size, etag,
+ return processor->complete(obj_size, etag,
&mtime, mtime,
attrs, delete_at,
nullptr, nullptr,
policy.encode(aclbl.emplace());
}
-int rgw_tools_init(CephContext *cct)
+int rgw_tools_init(const DoutPrefixProvider *dpp, CephContext *cct)
{
ext_mime_map = new std::map<std::string, std::string>;
- ext_mime_map_init(cct, cct->_conf->rgw_mime_types_file.c_str());
+ ext_mime_map_init(dpp, cct, cct->_conf->rgw_mime_types_file.c_str());
// ignore errors; missing mime.types is not fatal
return 0;
}
delete ext_mime_map;
ext_mime_map = nullptr;
}
+
+void rgw_complete_aio_completion(librados::AioCompletion* c, int r) {
+ auto pc = c->pc;
+ librados::CB_AioCompleteAndSafe cb(pc);
+ cb(r);
+}