// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
#include <errno.h>
#include "osd/osd_types.h"
#include "services/svc_sys_obj.h"
+#include "services/svc_zone.h"
#include "services/svc_zone_utils.h"
#define dout_subsys ceph_subsys_rgw
float bias = g_conf().get_val<double>("rgw_rados_pool_autoscale_bias");
int r = rados->mon_command(
"{\"prefix\": \"osd pool set\", \"pool\": \"" +
- pool.name + "\", \"var\": \"pg_autoscale_bias\": \"" +
+ pool.name + "\", \"var\": \"pg_autoscale_bias\", \"val\": \"" +
stringify(bias) + "\"}",
inbl, NULL, NULL);
if (r < 0) {
int min = g_conf().get_val<uint64_t>("rgw_rados_pool_pg_num_min");
r = rados->mon_command(
"{\"prefix\": \"osd pool set\", \"pool\": \"" +
- pool.name + "\", \"var\": \"pg_num_min\": \"" +
+ pool.name + "\", \"var\": \"pg_num_min\", \"val\": \"" +
stringify(min) + "\"}",
inbl, NULL, NULL);
- if (r < 0) {
- dout(10) << __func__ << " warning: failed to set pg_num_min on "
- << pool.name << dendl;
+ if (r < 0) {
+ dout(10) << __func__ << " warning: failed to set pg_num_min on "
+ << pool.name << dendl;
+ }
+ // set recovery_priority
+ int p = g_conf().get_val<uint64_t>("rgw_rados_pool_recovery_priority");
+ r = rados->mon_command(
+ "{\"prefix\": \"osd pool set\", \"pool\": \"" +
+ pool.name + "\", \"var\": \"recovery_priority\": \"" +
+ stringify(p) + "\"}",
+ inbl, NULL, NULL);
+ if (r < 0) {
+ dout(10) << __func__ << " warning: failed to set recovery_priority on "
+ << pool.name << dendl;
}
}
} else if (r < 0) {
return 0;
}
-int rgw_put_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
- RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
+void rgw_shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
+{
+ uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
+ char buf[16];
+ if (shard_id) {
+ *shard_id = val % max_shards;
+ }
+ snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
+ name = prefix + buf;
+}
+
+void rgw_shard_name(const string& prefix, unsigned max_shards, const string& section, const string& key, string& name)
+{
+ uint32_t val = ceph_str_hash_linux(key.c_str(), key.size());
+ val ^= ceph_str_hash_linux(section.c_str(), section.size());
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%u", (unsigned)(val % max_shards));
+ name = prefix + buf;
+}
+
+void rgw_shard_name(const string& prefix, unsigned shard_id, string& name)
+{
+ char buf[16];
+ snprintf(buf, sizeof(buf), "%u", shard_id);
+ name = prefix + buf;
+}
+
+int rgw_parse_list_of_flags(struct rgw_name_to_flag *mapping,
+ const string& str, uint32_t *perm)
+{
+ list<string> strs;
+ get_str_list(str, strs);
+ list<string>::iterator iter;
+ uint32_t v = 0;
+ for (iter = strs.begin(); iter != strs.end(); ++iter) {
+ string& s = *iter;
+ for (int i = 0; mapping[i].type_name; i++) {
+ if (s.compare(mapping[i].type_name) == 0)
+ v |= mapping[i].flag;
+ }
+ }
+
+ *perm = v;
+ return 0;
+}
+
+int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+ RGWObjVersionTracker *objv_tracker, real_time set_mtime, optional_yield y, map<string, bufferlist> *pattrs)
{
map<string,bufferlist> no_attrs;
if (!pattrs) {
rgw_raw_obj obj(pool, oid);
- auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
auto sysobj = obj_ctx.get_obj(obj);
int ret = sysobj.wop()
.set_objv_tracker(objv_tracker)
.set_exclusive(exclusive)
.set_mtime(set_mtime)
.set_attrs(*pattrs)
- .write(data);
-
- if (ret == -ENOENT) {
- ret = rgwstore->create_pool(pool);
- if (ret >= 0) {
- ret = sysobj.wop()
- .set_objv_tracker(objv_tracker)
- .set_exclusive(exclusive)
- .set_mtime(set_mtime)
- .set_attrs(*pattrs)
- .write(data);
- }
- }
+ .write(data, y);
return ret;
}
-int rgw_get_system_obj(RGWRados *rgwstore, RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
- RGWObjVersionTracker *objv_tracker, real_time *pmtime, map<string, bufferlist> *pattrs,
- rgw_cache_entry_info *cache_info, boost::optional<obj_version> refresh_version)
+int rgw_put_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& oid, bufferlist& data, bool exclusive,
+ RGWObjVersionTracker *objv_tracker, real_time set_mtime, map<string, bufferlist> *pattrs)
+{
+ return rgw_put_system_obj(obj_ctx, pool, oid, data, exclusive,
+ objv_tracker, set_mtime, null_yield, pattrs);
+}
+
+int rgw_get_system_obj(RGWSysObjectCtx& obj_ctx, const rgw_pool& pool, const string& key, bufferlist& bl,
+ RGWObjVersionTracker *objv_tracker, real_time *pmtime, optional_yield y, map<string, bufferlist> *pattrs,
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version)
{
bufferlist::iterator iter;
int request_len = READ_CHUNK_LEN;
int ret = rop.set_attrs(pattrs)
.set_last_mod(pmtime)
.set_objv_tracker(objv_tracker)
- .stat();
+ .stat(y);
if (ret < 0)
return ret;
ret = rop.set_cache_info(cache_info)
.set_refresh_version(refresh_version)
- .read(&bl);
+ .read(&bl, y);
if (ret == -ECANCELED) {
/* raced, restart */
if (!original_readv.empty()) {
return 0;
}
-int rgw_delete_system_obj(RGWRados *rgwstore, const rgw_pool& pool, const string& oid,
+int rgw_delete_system_obj(RGWSI_SysObj *sysobj_svc, const rgw_pool& pool, const string& oid,
RGWObjVersionTracker *objv_tracker)
{
- auto obj_ctx = rgwstore->svc.sysobj->init_obj_ctx();
+ auto obj_ctx = sysobj_svc->init_obj_ctx();
auto sysobj = obj_ctx.get_obj(rgw_raw_obj{pool, oid});
rgw_raw_obj obj(pool, oid);
return sysobj.wop()
.set_objv_tracker(objv_tracker)
- .remove();
+ .remove(null_yield);
}
thread_local bool is_asio_thread = false;
return ioctx.operate(oid, op);
}
+int rgw_rados_notify(librados::IoCtx& ioctx, const std::string& oid,
+ bufferlist& bl, uint64_t timeout_ms, bufferlist* pbl,
+ optional_yield y)
+{
+#ifdef HAVE_BOOST_CONTEXT
+ if (y) {
+ auto& context = y.get_io_context();
+ auto& yield = y.get_yield_context();
+ boost::system::error_code ec;
+ auto reply = librados::async_notify(context, ioctx, oid,
+ bl, timeout_ms, yield[ec]);
+ if (pbl) {
+ *pbl = std::move(reply);
+ }
+ return -ec.value();
+ }
+ if (is_asio_thread) {
+ dout(20) << "WARNING: blocking librados call" << dendl;
+ }
+#endif
+ return ioctx.notify2(oid, bl, timeout_ms, pbl);
+}
+
void parse_mime_map_line(const char *start, const char *end)
{
char line[end - start + 1];
}
}
-RGWDataAccess::RGWDataAccess(RGWRados *_store) : store(_store)
+RGWDataAccess::RGWDataAccess(rgw::sal::RGWRadosStore *_store) : store(_store)
{
- sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc.sysobj->init_obj_ctx());
+ sysobj_ctx = std::make_unique<RGWSysObjectCtx>(store->svc()->sysobj->init_obj_ctx());
}
int RGWDataAccess::Bucket::init()
{
- int ret = sd->store->get_bucket_info(*sd->sysobj_ctx,
+ int ret = sd->store->getRados()->get_bucket_info(sd->store->svc(),
tenant, name,
bucket_info,
&mtime,
+ null_yield,
&attrs);
if (ret < 0) {
return ret;
}
int RGWDataAccess::Object::put(bufferlist& data,
- map<string, bufferlist>& attrs)
+ map<string, bufferlist>& attrs,
+ const DoutPrefixProvider *dpp,
+ optional_yield y)
{
- RGWRados *store = sd->store;
+ rgw::sal::RGWRadosStore *store = sd->store;
CephContext *cct = store->ctx();
string tag;
RGWBucketInfo& bucket_info = bucket->bucket_info;
- using namespace rgw::putobj;
- rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
+ rgw::BlockingAioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
RGWObjectCtx obj_ctx(store);
rgw_obj obj(bucket_info.bucket, key);
auto& owner = bucket->policy.get_owner();
- string req_id = store->svc.zone_utils->unique_id(store->get_new_req_id());
+ string req_id = store->svc()->zone_utils->unique_id(store->getRados()->get_new_req_id());
- AtomicObjectProcessor processor(&aio, store, bucket_info,
- nullptr,
- owner.get_id(),
- obj_ctx, obj, olh_epoch, req_id);
+ using namespace rgw::putobj;
+ AtomicObjectProcessor processor(&aio, store, bucket_info, nullptr,
+ owner.get_id(), obj_ctx, obj, olh_epoch,
+ req_id, dpp, y);
- int ret = processor.prepare();
+ int ret = processor.prepare(y);
if (ret < 0)
return ret;
- using namespace rgw::putobj;
-
DataProcessor *filter = &processor;
CompressorRef plugin;
boost::optional<RGWPutObj_Compress> compressor;
- const auto& compression_type = store->svc.zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
+ const auto& compression_type = store->svc()->zone->get_zone_params().get_compression_type(bucket_info.placement_rule);
if (compression_type != "none") {
plugin = Compressor::create(store->ctx(), compression_type);
if (!plugin) {
attrs, delete_at,
nullptr, nullptr,
puser_data,
- nullptr, nullptr);
+ nullptr, nullptr, y);
}
void RGWDataAccess::Object::set_policy(const RGWAccessControlPolicy& policy)