+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
+#include "include/compat.h"
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include "cls/timeindex/cls_timeindex_client.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/user/cls_user_client.h"
+#include "osd/osd_types.h"
#include "rgw_tools.h"
#include "rgw_coroutine.h"
#include "rgw_compression.h"
-#include "rgw_boost_asio_yield.h"
#undef fork // fails to compile RGWPeriod::fork() below
#include "common/Clock.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_realm_watcher.h"
+#include "rgw_reshard.h"
#include "compressor/Compressor.h"
-#include <atomic>
-
#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_rgw
if (!obj.in_extra_data) {
*pool = placement.data_pool;
} else {
- *pool = placement.data_extra_pool;
+ *pool = placement.get_data_extra_pool();
}
}
}
r = rados->ioctx_create(pool.name.c_str(), ioctx);
- }
- if (r < 0) {
+ if (r < 0) {
+ return r;
+ }
+
+ r = ioctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
+ if (r < 0 && r != -EOPNOTSUPP) {
+ return r;
+ }
+ } else if (r < 0) {
return r;
}
if (!pool.ns.empty()) {
for (map<string, RGWZone>::iterator iter = zones.begin(); iter != zones.end(); ++iter) {
RGWZone& zone = iter->second;
zone.log_data = log_data;
- zone.log_meta = (is_master && zone.id == master_zone);
RGWZoneParams zone_params(zone.id, zone.name);
int ret = zone_params.init(cct, store);
return -ENOENT;
}
-bool RGWPeriod::is_single_zonegroup(CephContext *cct, RGWRados *store)
-{
- return (period_map.zonegroups.size() == 1);
-}
-
const string& RGWPeriod::get_latest_epoch_oid()
{
if (cct->_conf->rgw_period_latest_epoch_info_oid.empty()) {
return oss.str();
}
-int RGWPeriod::read_latest_epoch(RGWPeriodLatestEpochInfo& info)
+int RGWPeriod::read_latest_epoch(RGWPeriodLatestEpochInfo& info,
+ RGWObjVersionTracker *objv)
{
string oid = get_period_oid_prefix() + get_latest_epoch_oid();
rgw_pool pool(get_pool(cct));
bufferlist bl;
RGWObjectCtx obj_ctx(store);
- int ret = rgw_get_system_obj(store, obj_ctx, pool, oid, bl, NULL, NULL);
+ int ret = rgw_get_system_obj(store, obj_ctx, pool, oid, bl, objv, nullptr);
if (ret < 0) {
ldout(cct, 1) << "error read_lastest_epoch " << pool << ":" << oid << dendl;
return ret;
return 0;
}
-int RGWPeriod::set_latest_epoch(epoch_t epoch, bool exclusive)
+int RGWPeriod::set_latest_epoch(epoch_t epoch, bool exclusive,
+ RGWObjVersionTracker *objv)
{
string oid = get_period_oid_prefix() + get_latest_epoch_oid();
::encode(info, bl);
return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(),
- exclusive, NULL, real_time(), NULL);
+ exclusive, objv, real_time(), nullptr);
+}
+
+int RGWPeriod::update_latest_epoch(epoch_t epoch)
+{
+ static constexpr int MAX_RETRIES = 20;
+
+ for (int i = 0; i < MAX_RETRIES; i++) {
+ RGWPeriodLatestEpochInfo info;
+ RGWObjVersionTracker objv;
+ bool exclusive = false;
+
+ // read existing epoch
+ int r = read_latest_epoch(info, &objv);
+ if (r == -ENOENT) {
+ // use an exclusive create to set the epoch atomically
+ exclusive = true;
+ ldout(cct, 20) << "creating initial latest_epoch=" << epoch
+ << " for period=" << id << dendl;
+ } else if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to read latest_epoch" << dendl;
+ return r;
+ } else if (epoch <= info.epoch) {
+ r = -EEXIST; // fail with EEXIST if epoch is not newer
+ ldout(cct, 1) << "found existing latest_epoch " << info.epoch
+ << " >= given epoch " << epoch << ", returning r=" << r << dendl;
+ return r;
+ } else {
+ ldout(cct, 20) << "updating latest_epoch from " << info.epoch
+ << " -> " << epoch << " on period=" << id << dendl;
+ }
+
+ r = set_latest_epoch(epoch, exclusive, &objv);
+ if (r == -EEXIST) {
+ continue; // exclusive create raced with another update, retry
+ } else if (r == -ECANCELED) {
+ continue; // write raced with a conflicting version, retry
+ }
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: failed to write latest_epoch" << dendl;
+ return r;
+ }
+ return 0; // return success
+ }
+
+ return -ECANCELED; // fail after max retries
}
int RGWPeriod::delete_obj()
ret = store_info(exclusive);
if (ret < 0) {
ldout(cct, 0) << "ERROR: storing info for " << id << ": " << cpp_strerror(-ret) << dendl;
+ return ret;
}
ret = set_latest_epoch(epoch);
int RGWPeriod::store_info(bool exclusive)
{
- epoch_t latest_epoch = FIRST_EPOCH - 1;
- int ret = get_latest_epoch(latest_epoch);
- if (ret < 0 && ret != -ENOENT) {
- ldout(cct, 0) << "ERROR: RGWPeriod::get_latest_epoch() returned " << cpp_strerror(-ret) << dendl;
- return ret;
- }
-
rgw_pool pool(get_pool(cct));
string oid = get_period_oid();
bufferlist bl;
::encode(*this, bl);
- ret = rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(), exclusive, NULL, real_time(), NULL);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: rgw_put_system_obj(" << pool << ":" << oid << "): " << cpp_strerror(-ret) << dendl;
- return ret;
- }
- if (latest_epoch < epoch) {
- ret = set_latest_epoch(epoch);
- if (ret < 0) {
- ldout(cct, 0) << "ERROR: RGWPeriod::set_latest_epoch() returned " << cpp_strerror(-ret) << dendl;
- return ret;
- }
- }
- return 0;
+
+ return rgw_put_system_obj(store, pool, oid, bl.c_str(), bl.length(),
+ exclusive, NULL, real_time(), NULL);
}
rgw_pool RGWPeriod::get_pool(CephContext *cct)
return rgw_pool(cct->_conf->rgw_period_root_pool);
}
-int RGWPeriod::use_next_epoch()
-{
- epoch_t latest_epoch;
- int ret = get_latest_epoch(latest_epoch);
- if (ret < 0) {
- return ret;
- }
- epoch = latest_epoch + 1;
- ret = read_info();
- if (ret < 0 && ret != -ENOENT) {
- return ret;
- }
- if (ret == -ENOENT) {
- ret = create();
- if (ret < 0) {
- ldout(cct, 0) << "Error creating new epoch " << epoch << dendl;
- return ret;
- }
- }
- return 0;
-}
-
int RGWPeriod::add_zonegroup(const RGWZoneGroup& zonegroup)
{
if (zonegroup.realm_id != realm_id) {
return r;
}
// set as latest epoch
- r = set_latest_epoch(epoch);
+ r = update_latest_epoch(epoch);
+ if (r == -EEXIST) {
+ // already have this epoch (or a more recent one)
+ return 0;
+ }
if (r < 0) {
ldout(cct, 0) << "failed to set latest epoch: " << cpp_strerror(-r) << dendl;
return r;
pool_names.insert(zone.user_swift_pool);
pool_names.insert(zone.user_uid_pool);
pool_names.insert(zone.roles_pool);
+ pool_names.insert(zone.reshard_pool);
for(auto& iter : zone.placement_pools) {
pool_names.insert(iter.second.index_pool);
pool_names.insert(iter.second.data_pool);
user_swift_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:users.swift", user_swift_pool);
user_uid_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:users.uid", user_uid_pool);
roles_pool = fix_zone_pool_dup(pools, name, ".rgw.meta:roles", roles_pool);
+ reshard_pool = fix_zone_pool_dup(pools, name, ".rgw.log:reshard", reshard_pool);
for(auto& iter : placement_pools) {
iter.second.index_pool = fix_zone_pool_dup(pools, name, "." + default_bucket_index_pool_suffix,
iter != zonegroups.end(); ++iter) {
RGWZoneGroup& zonegroup = iter->second;
zonegroups_by_api[zonegroup.api_name] = zonegroup;
- if (zonegroup.is_master) {
+ if (zonegroup.is_master_zonegroup()) {
master_zonegroup = zonegroup.get_id();
}
}
int RGWPeriodMap::update(const RGWZoneGroup& zonegroup, CephContext *cct)
{
- if (zonegroup.is_master && (!master_zonegroup.empty() && zonegroup.get_id() != master_zonegroup)) {
+ if (zonegroup.is_master_zonegroup() && (!master_zonegroup.empty() && zonegroup.get_id() != master_zonegroup)) {
ldout(cct,0) << "Error updating periodmap, multiple master zonegroups configured "<< dendl;
ldout(cct,0) << "master zonegroup: " << master_zonegroup << " and " << zonegroup.get_id() <<dendl;
return -EINVAL;
zonegroups_by_api[zonegroup.api_name] = zonegroup;
}
- if (zonegroup.is_master) {
+ if (zonegroup.is_master_zonegroup()) {
master_zonegroup = zonegroup.get_id();
} else if (master_zonegroup == zonegroup.get_id()) {
master_zonegroup = "";
iter != zonegroups.end(); ++iter) {
RGWZoneGroup& zonegroup = iter->second;
zonegroups_by_api[zonegroup.api_name] = zonegroup;
- if (zonegroup.is_master) {
+ if (zonegroup.is_master_zonegroup()) {
master_zonegroup = zonegroup.get_name();
}
}
int RGWPutObjProcessor::complete(size_t accounted_size, const string& etag,
real_time *mtime, real_time set_mtime,
map<string, bufferlist>& attrs, real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data)
+ const char *if_match, const char *if_nomatch, const string *user_data,
+ rgw_zone_set *zones_trace)
{
- int r = do_complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data);
+ int r = do_complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at, if_match, if_nomatch, user_data, zones_trace);
if (r < 0)
return r;
*pobj = cur_obj;
- if (!bl.length())
+ if (!bl.length()) {
+ *phandle = nullptr;
return 0;
+ }
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
obj_len = (uint64_t)first_chunk.length();
}
while (pending_data_bl.length()) {
- void *handle;
+ void *handle = nullptr;
rgw_raw_obj obj;
uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
if (max_write_size > pending_data_bl.length()) {
map<string, bufferlist>& attrs,
real_time delete_at,
const char *if_match,
- const char *if_nomatch, const string *user_data) {
+ const char *if_nomatch, const string *user_data,
+ rgw_zone_set *zones_trace) {
int r = complete_writing_data();
if (r < 0)
return r;
obj_op.meta.olh_epoch = olh_epoch;
obj_op.meta.delete_at = delete_at;
obj_op.meta.user_data = user_data;
+ obj_op.meta.zones_trace = zones_trace;
r = obj_op.write_meta(obj_len, accounted_size, attrs);
if (r < 0) {
Mutex lock;
Cond cond;
+ void wait() {
+ Mutex::Locker l(lock);
+ cond.Wait(lock);
+ };
+
+ void wait_interval(const utime_t& wait_time) {
+ Mutex::Locker l(lock);
+ cond.WaitInterval(lock, wait_time);
+ }
+
public:
Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p), lock("RGWRadosThread::Worker") {}
void *entry() override;
- void stop() {
+ void signal() {
Mutex::Locker l(lock);
cond.Signal();
}
void start();
void stop();
+
+ void signal() {
+ if (worker) {
+ worker->signal();
+ }
+ }
};
void RGWRadosThread::start()
down_flag = true;
stop_process();
if (worker) {
- worker->stop();
+ worker->signal();
worker->join();
}
delete worker;
utime_t wait_time = interval;
wait_time -= end;
- lock.Lock();
- cond.WaitInterval(lock, wait_time);
- lock.Unlock();
+ wait_interval(wait_time);
} else {
- lock.Lock();
- cond.Wait(lock);
- lock.Unlock();
+ wait();
}
} while (!processor->going_down());
int RGWRados::get_max_chunk_size(const rgw_pool& pool, uint64_t *max_chunk_size)
{
- uint64_t alignment;
+ uint64_t alignment = 0;
int r = get_required_alignment(pool, &alignment);
if (r < 0) {
return r;
return get_max_chunk_size(pool, max_chunk_size);
}
class RGWIndexCompletionManager;

/* State for one asynchronous bucket-index completion.  Heap-allocated;
 * ownership moves between the rados completion callback and, when the op
 * must be retried after a bucket reshard, RGWIndexCompletionThread. */
struct complete_op_data {
  Mutex lock{"complete_op_data"};
  AioCompletion *rados_completion{nullptr};
  int manager_shard_id{-1};              // which manager lock shard tracks us
  RGWIndexCompletionManager *manager{nullptr};
  rgw_obj obj;
  RGWModifyOp op;
  string tag;
  rgw_bucket_entry_ver ver;
  cls_rgw_obj_key key;
  rgw_bucket_dir_entry_meta dir_meta;
  list<cls_rgw_obj_key> remove_objs;
  bool log_op;
  uint16_t bilog_op;
  rgw_zone_set zones_trace;

  bool stopped{false};                   // set once the manager shuts down

  // Mark this completion as cancelled; the rados callback then just
  // frees the entry instead of handing it back to the manager.
  void stop() {
    Mutex::Locker l(lock);
    stopped = true;
  }
};
+
/* Background thread that re-applies bucket-index completion ops which
 * failed with -ERR_BUSY_RESHARDING, once the reshard guard lets them
 * through, and records the change in the data log. */
class RGWIndexCompletionThread : public RGWRadosThread {
  RGWRados *store;

  // no periodic wakeup: we only run when signal()ed with new work
  uint64_t interval_msec() override {
    return 0;
  }

  list<complete_op_data *> completions;  // pending retries; guarded by completions_lock

  Mutex completions_lock;
public:
  RGWIndexCompletionThread(RGWRados *_store)
    : RGWRadosThread(_store, "index-complete"), store(_store), completions_lock("RGWIndexCompletionThread::completions_lock") {}

  int process() override;

  // Queue a completion for retry (takes ownership) and wake the worker.
  void add_completion(complete_op_data *completion) {
    {
      Mutex::Locker l(completions_lock);
      completions.push_back(completion);
    }

    signal();
  }
};
+
/* Drain the queued completions and re-apply each bucket-index op under
 * the reshard guard; errors on individual entries are logged and the
 * entry is dropped.  Always returns 0. */
int RGWIndexCompletionThread::process()
{
  list<complete_op_data *> comps;

  // swap out the pending batch under the lock, then work on it unlocked
  {
    Mutex::Locker l(completions_lock);
    completions.swap(comps);
  }

  for (auto c : comps) {
    std::unique_ptr<complete_op_data> up{c};  // free the entry on every path

    if (going_down()) {
      continue;
    }
    ldout(store->ctx(), 20) << __func__ << "(): handling completion for key=" << c->key << dendl;

    RGWRados::BucketShard bs(store);

    int r = bs.init(c->obj.bucket, c->obj);
    if (r < 0) {
      ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to initialize BucketShard, obj=" << c->obj << " r=" << r << dendl;
      /* not much to do */
      continue;
    }

    // guard_reshard re-resolves the shard and retries the op if the
    // bucket is being resharded underneath us
    r = store->guard_reshard(&bs, c->obj, [&](RGWRados::BucketShard *bs) -> int {
      librados::ObjectWriteOperation o;
      cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
      cls_rgw_bucket_complete_op(o, c->op, c->tag, c->ver, c->key, c->dir_meta, &c->remove_objs,
                                 c->log_op, c->bilog_op, &c->zones_trace);

      return bs->index_ctx.operate(bs->bucket_obj, &o);
    });
    if (r < 0) {
      ldout(cct, 0) << "ERROR: " << __func__ << "(): bucket index completion failed, obj=" << c->obj << " r=" << r << dendl;
      /* ignoring error, can't do anything about it */
      continue;
    }
    r = store->data_log->add_entry(bs.bucket, bs.shard_id);
    if (r < 0) {
      lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
    }
  }

  return 0;
}
+
+class RGWIndexCompletionManager {
+ RGWRados *store{nullptr};
+ vector<Mutex *> locks;
+ vector<set<complete_op_data *> > completions;
+
+ RGWIndexCompletionThread *completion_thread{nullptr};
+
+ int num_shards;
+
+ std::atomic<int> cur_shard {0};
+
+
+public:
+ RGWIndexCompletionManager(RGWRados *_store) : store(_store) {
+ num_shards = store->ctx()->_conf->rgw_thread_pool_size;
+
+ for (int i = 0; i < num_shards; i++) {
+ char buf[64];
+ snprintf(buf, sizeof(buf), "RGWIndexCompletionManager::lock::%d", i);
+ locks.push_back(new Mutex(buf));
+ }
+
+ completions.resize(num_shards);
+ }
+ ~RGWIndexCompletionManager() {
+ stop();
+
+ for (auto l : locks) {
+ delete l;
+ }
+ }
+
+ int next_shard() {
+ int result = cur_shard % num_shards;
+ cur_shard++;
+ return result;
+ }
+
+ void create_completion(const rgw_obj& obj,
+ RGWModifyOp op, string& tag,
+ rgw_bucket_entry_ver& ver,
+ const cls_rgw_obj_key& key,
+ rgw_bucket_dir_entry_meta& dir_meta,
+ list<cls_rgw_obj_key> *remove_objs, bool log_op,
+ uint16_t bilog_op,
+ rgw_zone_set *zones_trace,
+ complete_op_data **result);
+ bool handle_completion(completion_t cb, complete_op_data *arg);
+
+ int start() {
+ completion_thread = new RGWIndexCompletionThread(store);
+ int ret = completion_thread->init();
+ if (ret < 0) {
+ return ret;
+ }
+ completion_thread->start();
+ return 0;
+ }
+ void stop() {
+ if (completion_thread) {
+ completion_thread->stop();
+ delete completion_thread;
+ }
+
+ for (int i = 0; i < num_shards; ++i) {
+ Mutex::Locker l(*locks[i]);
+ for (auto c : completions[i]) {
+ Mutex::Locker cl(c->lock);
+ c->stop();
+ }
+ }
+ completions.clear();
+ }
+};
+
/* librados AIO callback for bucket-index completion ops.  The lock is
 * taken/released manually (no Mutex::Locker) because on some paths we
 * delete the completion object -- which owns the mutex -- and must have
 * released it first. */
static void obj_complete_cb(completion_t cb, void *arg)
{
  complete_op_data *completion = (complete_op_data *)arg;
  completion->lock.Lock();
  if (completion->stopped) {
    completion->lock.Unlock(); /* can drop lock, no one else is referencing us */
    delete completion;
    return;
  }
  // manager decides whether ownership passes to the retry thread
  bool need_delete = completion->manager->handle_completion(cb, completion);
  completion->lock.Unlock();
  if (need_delete) {
    delete completion;
  }
}
+
+
+void RGWIndexCompletionManager::create_completion(const rgw_obj& obj,
+ RGWModifyOp op, string& tag,
+ rgw_bucket_entry_ver& ver,
+ const cls_rgw_obj_key& key,
+ rgw_bucket_dir_entry_meta& dir_meta,
+ list<cls_rgw_obj_key> *remove_objs, bool log_op,
+ uint16_t bilog_op,
+ rgw_zone_set *zones_trace,
+ complete_op_data **result)
+{
+ complete_op_data *entry = new complete_op_data;
+
+ int shard_id = next_shard();
+
+ entry->manager_shard_id = shard_id;
+ entry->manager = this;
+ entry->obj = obj;
+ entry->op = op;
+ entry->tag = tag;
+ entry->ver = ver;
+ entry->key = key;
+ entry->dir_meta = dir_meta;
+ entry->log_op = log_op;
+ entry->bilog_op = bilog_op;
+
+ if (remove_objs) {
+ for (auto iter = remove_objs->begin(); iter != remove_objs->end(); ++iter) {
+ entry->remove_objs.push_back(*iter);
+ }
+ }
+
+ if (zones_trace) {
+ entry->zones_trace = *zones_trace;
+ } else {
+ entry->zones_trace.insert(store->get_zone().id);
+ }
+
+ *result = entry;
+
+ entry->rados_completion = librados::Rados::aio_create_completion(entry, NULL, obj_complete_cb);
+
+ Mutex::Locker l(*locks[shard_id]);
+ completions[shard_id].insert(entry);
+}
+
+bool RGWIndexCompletionManager::handle_completion(completion_t cb, complete_op_data *arg)
+{
+ int shard_id = arg->manager_shard_id;
+ {
+ Mutex::Locker l(*locks[shard_id]);
+
+ auto& comps = completions[shard_id];
+
+ auto iter = comps.find(arg);
+ if (iter == comps.end()) {
+ return true;
+ }
+
+ comps.erase(iter);
+ }
+
+ int r = rados_aio_get_return_value(cb);
+ if (r != -ERR_BUSY_RESHARDING) {
+ return true;
+ }
+ completion_thread->add_completion(arg);
+ return false;
+}
+
void RGWRados::finalize()
{
if (run_sync_thread) {
if (async_rados) {
delete async_rados;
}
- if (use_gc_thread) {
- gc->stop_processor();
- obj_expirer->stop_processor();
- }
+
+ delete lc;
+ lc = NULL;
+
delete gc;
gc = NULL;
- if (use_lc_thread) {
- lc->stop_processor();
- }
- delete lc;
- lc = NULL;
-
delete obj_expirer;
obj_expirer = NULL;
delete binfo_cache;
delete obj_tombstone_cache;
delete sync_modules_manager;
+
+ if (reshard_wait.get()) {
+ reshard_wait->stop();
+ reshard_wait.reset();
+ }
+
+ if (run_reshard_thread) {
+ reshard->stop_processor();
+ }
+ delete reshard;
+ delete index_completion_manager;
}
/**
if (ret < 0) {
return ret;
}
-
ret = r.connect();
if (ret < 0) {
return ret;
return ret;
}
+
+int RGWRados::register_to_service_map(const string& daemon_type, const map<string, string>& meta)
+{
+ map<string,string> metadata = meta;
+ metadata["num_handles"] = stringify(rados.size());
+ metadata["zonegroup_id"] = zonegroup.get_id();
+ metadata["zonegroup_name"] = zonegroup.get_name();
+ metadata["zone_name"] = zone_name();
+ metadata["zone_id"] = zone_id();;
+ string name = cct->_conf->name.get_id();
+ if (name.find("rgw.") == 0) {
+ name = name.substr(4);
+ }
+ int ret = rados[0].service_daemon_register(daemon_type, name, metadata);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: service_daemon_register() returned ret=" << ret << ": " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
/**
* Add new connection to connections map
* @param zonegroup_conn_map map which new connection will be added to
ldout(cct, 0) << __func__ << " failed init region "<< *iter << ": " << cpp_strerror(-ret) << dendl;
return ret;
}
- if (region.is_master) {
+ if (region.is_master_zonegroup()) {
master_region = region.get_id();
master_zone = region.master_zone;
}
}
}
ldout(cct, 20) << "zonegroup " << zonegroup.get_name() << dendl;
- if (zonegroup.is_master) {
+ if (zonegroup.is_master_zonegroup()) {
// use endpoints from the zonegroup's master zone
auto master = zonegroup.zones.find(zonegroup.master_zone);
if (master == zonegroup.zones.end()) {
zone_short_id = current_period.get_map().get_zone_short_id(zone_params.get_id());
- ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, zone_params.tier_config, &sync_module);
- if (ret < 0) {
- lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
- return ret;
+ if (run_sync_thread) {
+ ret = sync_modules_manager->create_instance(cct, zone_public_config.tier_type, zone_params.tier_config, &sync_module);
+ if (ret < 0) {
+ lderr(cct) << "ERROR: failed to init sync module instance, ret=" << ret << dendl;
+ return ret;
+ }
}
writeable_zone = (zone_public_config.tier_type.empty() || zone_public_config.tier_type == "rgw");
zone_id_by_name[z.name] = id;
zone_by_id[id] = z;
}
-
+
if (zone_by_id.find(zone_id()) == zone_by_id.end()) {
ldout(cct, 0) << "WARNING: could not find zone config in zonegroup for local zone (" << zone_id() << "), will use defaults" << dendl;
}
if (ret < 0)
return ret;
+ ret = open_reshard_pool_ctx();
+ if (ret < 0)
+ return ret;
+
pools_initialized = true;
gc = new RGWGC();
lc = new RGWLC();
lc->initialize(cct, this);
-
+
if (use_lc_thread)
lc->start_processor();
-
+
quota_handler = RGWQuotaHandler::generate_handler(this, quota_threads);
bucket_index_max_shards = (cct->_conf->rgw_override_bucket_index_max_shards ? cct->_conf->rgw_override_bucket_index_max_shards :
get_zone().bucket_index_max_shards);
- if (bucket_index_max_shards > MAX_BUCKET_INDEX_SHARDS_PRIME) {
- bucket_index_max_shards = MAX_BUCKET_INDEX_SHARDS_PRIME;
+ if (bucket_index_max_shards > get_max_bucket_shards()) {
+ bucket_index_max_shards = get_max_bucket_shards();
ldout(cct, 1) << __func__ << " bucket index max shards is too large, reset to value: "
- << MAX_BUCKET_INDEX_SHARDS_PRIME << dendl;
+ << get_max_bucket_shards() << dendl;
}
ldout(cct, 20) << __func__ << " bucket index max shards: " << bucket_index_max_shards << dendl;
obj_tombstone_cache = new tombstone_cache_t(cct->_conf->rgw_obj_tombstone_cache_size);
}
+ reshard_wait = std::make_shared<RGWReshardWait>(this);
+
+ reshard = new RGWReshard(this);
+
+ /* only the master zone in the zonegroup reshards buckets */
+ run_reshard_thread = run_reshard_thread && (get_zonegroup().master_zone == zone_public_config.id);
+ if (run_reshard_thread) {
+ reshard->start_processor();
+ }
+
+ index_completion_manager = new RGWIndexCompletionManager(this);
+ ret = index_completion_manager->start();
+
return ret;
}
return rgw_init_ioctx(get_rados_handle(), get_zone_params().log_pool, objexp_pool_ctx, true);
}
+int RGWRados::open_reshard_pool_ctx()
+{
+ return rgw_init_ioctx(get_rados_handle(), get_zone_params().reshard_pool, reshard_pool_ctx, true);
+}
+
int RGWRados::init_watch()
{
int r = rgw_init_ioctx(&rados[0], get_zone_params().control_pool, control_pool_ctx, true);
if (r < 0 && r != -EEXIST)
return r;
- return rgw_init_ioctx(rad, pool, io_ctx);
+ r = rgw_init_ioctx(rad, pool, io_ctx);
+ if (r < 0)
+ return r;
+
+ r = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
+ if (r < 0 && r != -EOPNOTSUPP)
+ return r;
+ return 0;
}
void RGWRados::build_bucket_index_marker(const string& shard_id_str, const string& shard_marker,
uint32_t val = index;
if (!name.empty()) {
- int max_user_shards = max(cct->_conf->rgw_usage_max_user_shards, 1);
+ int max_user_shards = cct->_conf->rgw_usage_max_user_shards;
val %= max_user_shards;
val += ceph_str_hash_linux(name.c_str(), name.size());
}
char buf[17];
- int max_shards = max(cct->_conf->rgw_usage_max_shards, 1);
+ int max_shards = cct->_conf->rgw_usage_max_shards;
snprintf(buf, sizeof(buf), RGW_USAGE_OBJ_PREFIX "%u", (unsigned)(val % max_shards));
hash = buf;
}
return 0;
}
-#define MAX_SHARDS_PRIME 7877
-
int RGWRados::key_to_shard_id(const string& key, int max_shards)
{
- uint32_t val = ceph_str_hash_linux(key.c_str(), key.size()) % MAX_SHARDS_PRIME;
- return val % max_shards;
+ return rgw_shards_hash(key, max_shards);
}
void RGWRados::shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id)
return objname + buf;
}
-#define MAX_OBJEXP_SHARDS_PRIME 7877
-
int RGWRados::objexp_key_shard(const rgw_obj_index_key& key)
{
string obj_key = key.name + key.instance;
int num_shards = cct->_conf->rgw_objexp_hints_num_shards;
uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = sid2 % MAX_OBJEXP_SHARDS_PRIME % num_shards;
- return sid % num_shards;
+ sid = rgw_shards_mod(sid2, num_shards);
+ return sid;
}
static string objexp_hint_get_keyext(const string& tenant_name,
}
+int RGWRados::Bucket::update_bucket_id(const string& new_bucket_id)
+{
+ rgw_bucket bucket = bucket_info.bucket;
+ bucket.update_bucket_id(new_bucket_id);
+
+ RGWObjectCtx obj_ctx(store);
+
+ int ret = store->get_bucket_instance_info(obj_ctx, bucket, bucket_info, nullptr, nullptr);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return 0;
+}
+
/**
* get listing of the objects in a bucket.
*
* common_prefixes: if delim is filled in, any matching prefixes are placed here.
* is_truncated: if number of objects in the bucket is bigger than max, then truncated.
*/
-int RGWRados::Bucket::List::list_objects(int max, vector<rgw_bucket_dir_entry> *result,
+int RGWRados::Bucket::List::list_objects(int64_t max,
+ vector<rgw_bucket_dir_entry> *result,
map<string, bool> *common_prefixes,
bool *is_truncated)
{
bigger_than_delim = buf;
/* if marker points at a common prefix, fast forward it into its upperbound string */
- int delim_pos = cur_marker.name.find(params.delim, params.prefix.size());
+ int delim_pos = cur_marker.name.find(params.delim, cur_prefix.size());
if (delim_pos >= 0) {
string s = cur_marker.name.substr(0, delim_pos);
s.append(bigger_than_delim);
next_marker = prefix_key;
(*common_prefixes)[prefix_key] = true;
- skip_after_delim = obj.name.substr(0, delim_pos);
+ int marker_delim_pos = cur_marker.name.find(params.delim, cur_prefix.size());
+
+ skip_after_delim = cur_marker.name.substr(0, marker_delim_pos);
skip_after_delim.append(bigger_than_delim);
ldout(cct, 20) << "skip_after_delim=" << skip_after_delim << dendl;
if (ret < 0)
return ret;
+ librados::IoCtx io_ctx;
+ ret = rad->ioctx_create(pool.name.c_str(), io_ctx);
+ if (ret < 0)
+ return ret;
+
+ ret = io_ctx.application_enable(pg_pool_t::APPLICATION_NAME_RGW, false);
+ if (ret < 0 && ret != -EOPNOTSUPP)
+ return ret;
return 0;
}
{
librados::IoCtx index_ctx; // context for new bucket
+ string dir_oid = dir_oid_prefix;
int r = open_bucket_index_ctx(bucket_info, index_ctx);
- if (r < 0)
+ if (r < 0) {
return r;
+ }
- string dir_oid = dir_oid_prefix;
dir_oid.append(bucket_info.bucket.bucket_id);
map<int, string> bucket_objs;
*bucket_id = buf;
}
-/**
- * create a bucket with name bucket and the given list of attrs
- * returns 0 on success, -ERR# otherwise.
- */
int RGWRados::create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
const string& zonegroup_id,
const string& placement_rule,
string *pselected_rule_name, RGWZonePlacementInfo *rule_info)
{
- /* first check that rule exists within the specific zonegroup */
+ /* first check that zonegroup exists within current period. */
RGWZoneGroup zonegroup;
int ret = get_zonegroup(zonegroup_id, zonegroup);
if (ret < 0) {
return ret;
}
- /* now check that tag exists within zonegroup */
/* find placement rule. Hierarchy: request rule > user default rule > zonegroup default rule */
- string rule = request_rule;
- if (rule.empty()) {
- rule = user_info.default_placement;
- if (rule.empty())
- rule = zonegroup.default_placement;
- }
-
- if (rule.empty()) {
- ldout(cct, 0) << "misconfiguration, should not have an empty placement rule name" << dendl;
- return -EIO;
- }
-
- map<string, RGWZoneGroupPlacementTarget>::iterator titer = zonegroup.placement_targets.find(rule);
- if (titer == zonegroup.placement_targets.end()) {
- ldout(cct, 0) << "could not find placement rule " << rule << " within zonegroup " << dendl;
- return -EINVAL;
+ std::map<std::string, RGWZoneGroupPlacementTarget>::const_iterator titer;
+
+ if (!request_rule.empty()) {
+ titer = zonegroup.placement_targets.find(request_rule);
+ if (titer == zonegroup.placement_targets.end()) {
+ ldout(cct, 0) << "could not find requested placement id " << request_rule
+ << " within zonegroup " << dendl;
+ return -ERR_INVALID_LOCATION_CONSTRAINT;
+ }
+ } else if (!user_info.default_placement.empty()) {
+ titer = zonegroup.placement_targets.find(user_info.default_placement);
+ if (titer == zonegroup.placement_targets.end()) {
+ ldout(cct, 0) << "could not find user default placement id " << user_info.default_placement
+ << " within zonegroup " << dendl;
+ return -ERR_INVALID_LOCATION_CONSTRAINT;
+ }
+ } else {
+ if (zonegroup.default_placement.empty()) { // zonegroup default rule as fallback, it should not be empty.
+ ldout(cct, 0) << "misconfiguration, zonegroup default placement id should not be empty." << dendl;
+ return -ERR_ZONEGROUP_DEFAULT_PLACEMENT_MISCONFIGURATION;
+ } else {
+ titer = zonegroup.placement_targets.find(zonegroup.default_placement);
+ if (titer == zonegroup.placement_targets.end()) {
+ ldout(cct, 0) << "could not find zonegroup default placement id " << zonegroup.default_placement
+ << " within zonegroup " << dendl;
+ return -ERR_INVALID_LOCATION_CONSTRAINT;
+ }
+ }
}
/* now check tag for the rule, whether user is permitted to use rule */
- RGWZoneGroupPlacementTarget& target_rule = titer->second;
+ const auto& target_rule = titer->second;
if (!target_rule.user_permitted(user_info.placement_tags)) {
- ldout(cct, 0) << "user not permitted to use placement rule" << dendl;
+ ldout(cct, 0) << "user not permitted to use placement rule " << titer->first << dendl;
return -EPERM;
}
if (pselected_rule_name)
- *pselected_rule_name = rule;
+ *pselected_rule_name = titer->first;
- return select_bucket_location_by_rule(rule, rule_info);
+ return select_bucket_location_by_rule(titer->first, rule_info);
}
int RGWRados::select_bucket_location_by_rule(const string& location_rule, RGWZonePlacementInfo *rule_info)
map<string, RGWZonePlacementInfo>::iterator piter = get_zone_params().placement_pools.find(location_rule);
if (piter == get_zone_params().placement_pools.end()) {
/* couldn't find, means we cannot really place data for this bucket in this zone */
- if (get_zonegroup().equals(zonegroup_id)) {
+ if (get_zonegroup().equals(zonegroup.get_id())) {
/* that's a configuration error, zone should have that rule, as we're within the requested
* zonegroup */
return -EINVAL;
vector<int>::iterator riter;
vector<librados::PoolAsyncCompletion *>::iterator citer;
+ bool error = false;
assert(rets.size() == completions.size());
for (riter = rets.begin(), citer = completions.begin(); riter != rets.end(); ++riter, ++citer) {
int r = *riter;
r = c->get_return_value();
if (r < 0) {
ldout(cct, 0) << "WARNING: async pool_create returned " << r << dendl;
+ error = true;
}
}
c->release();
retcodes.push_back(r);
}
+ if (error) {
+ return 0;
+ }
+
+ std::vector<librados::IoCtx> io_ctxs;
+ retcodes.clear();
+ for (auto pool : pools) {
+ io_ctxs.emplace_back();
+ int ret = rad->ioctx_create(pool.name.c_str(), io_ctxs.back());
+ if (ret < 0) {
+ ldout(cct, 0) << "WARNING: ioctx_create returned " << ret << dendl;
+ error = true;
+ }
+ retcodes.push_back(ret);
+ }
+ if (error) {
+ return 0;
+ }
+
+ completions.clear();
+ for (auto &io_ctx : io_ctxs) {
+ librados::PoolAsyncCompletion *c =
+ librados::Rados::pool_async_create_completion();
+ completions.push_back(c);
+ int ret = io_ctx.application_enable_async(pg_pool_t::APPLICATION_NAME_RGW,
+ false, c);
+ assert(ret == 0);
+ }
+
+ retcodes.clear();
+ for (auto c : completions) {
+ c->wait();
+ int ret = c->get_return_value();
+ if (ret == -EOPNOTSUPP) {
+ ret = 0;
+ } else if (ret < 0) {
+ ldout(cct, 0) << "WARNING: async application_enable returned " << ret
+ << dendl;
+ error = true;
+ }
+ c->release();
+ retcodes.push_back(ret);
+ }
return 0;
}
return 0;
}
-int RGWRados::get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref, rgw_pool *pool)
+int RGWRados::get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref)
{
ref->oid = obj.oid;
ref->key = obj.loc;
} else {
ref->pool = obj.pool;
}
- if (pool) {
- *pool = ref->pool;
- }
r = open_pool_ctx(ref->pool, ref->ioctx);
if (r < 0)
return r;
return 0;
}
// System objects resolve exactly like any other raw object; this wrapper
// exists only to keep the distinct call site readable.
int RGWRados::get_system_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref)
{
  return get_raw_obj_ref(obj, ref);
}
/*
NULL, /* string *version_id */
NULL, /* string *ptag */
NULL, /* string *petag */
- NULL, /* struct rgw_err *err */
NULL, /* void (*progress_cb)(off_t, void *) */
NULL); /* void *progress_data */
if (r == -ECANCELED || r == -ENOENT) {
nullptr, /* string *version_id */
nullptr, /* string *ptag */
nullptr, /* string *petag */
- nullptr, /* struct rgw_err *err */
nullptr, /* void (*progress_cb)(off_t, void *) */
nullptr); /* void *progress_data */
if (ret == -ECANCELED || ret == -ENOENT) {
void *_index_op)
{
RGWRados::Bucket::UpdateIndex *index_op = static_cast<RGWRados::Bucket::UpdateIndex *>(_index_op);
- rgw_pool pool;
- rgw_rados_ref ref;
RGWRados *store = target->get_store();
ObjectWriteOperation op;
return -EIO;
}
+ rgw_rados_ref ref;
r = store->get_obj_head_ref(target->get_bucket_info(), obj, &ref);
if (r < 0)
return r;
uint64_t epoch;
int64_t poolid;
-
- bool orig_exists = state->exists;
- uint64_t orig_size = state->accounted_size;
+ bool orig_exists;
+ uint64_t orig_size;
+
+ if (!reset_obj) { //Multipart upload, it has immutable head.
+ orig_exists = false;
+ orig_size = 0;
+ } else {
+ orig_exists = state->exists;
+ orig_size = state->accounted_size;
+ }
bool versioned_target = (meta.olh_epoch > 0 || !obj.key.instance.empty());
state = NULL;
if (versioned_op) {
- r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false);
+ r = store->set_olh(target->get_ctx(), target->get_bucket_info(), obj, false, NULL, meta.olh_epoch, real_time(), false, meta.zones_trace);
if (r < 0) {
return r;
}
RGWRados::Bucket bop(target->get_store(), bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, target->get_obj());
-
+ index_op.set_zones_trace(meta.zones_trace);
+
bool assume_noent = (meta.if_match == NULL && meta.if_nomatch == NULL);
int r;
if (assume_noent) {
RGWObjVersionTracker *objv_tracker,
real_time set_mtime /* 0 for don't set */)
{
- rgw_pool pool;
rgw_rados_ref ref;
- int r = get_system_obj_ref(obj, &ref, &pool);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0)
return r;
RGWObjVersionTracker *objv_tracker)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_system_obj_ref(obj, &ref, &pool);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
}
int complete(const string& etag, real_time *mtime, real_time set_mtime,
- map<string, bufferlist>& attrs, real_time delete_at) {
- return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at);
+ map<string, bufferlist>& attrs, real_time delete_at, rgw_zone_set *zones_trace) {
+ return processor->complete(data_len, etag, mtime, set_mtime, attrs, delete_at, NULL, NULL, NULL, zones_trace);
}
bool is_canceled() {
}
return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, obj, max_chunk_size, NULL, mtime, attrset,
- RGW_OBJ_CATEGORY_MAIN, 0, real_time(), NULL, NULL, NULL, NULL);
+ RGW_OBJ_CATEGORY_MAIN, 0, real_time(), NULL, NULL, NULL);
}
struct obj_time_weight {
string *version_id,
string *ptag,
ceph::buffer::list *petag,
- struct rgw_err *err,
void (*progress_cb)(off_t, void *),
- void *progress_data)
+ void *progress_data,
+ rgw_zone_set *zones_trace)
{
/* source is in a different zonegroup, copy from there */
#define MAX_COMPLETE_RETRY 100
for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
- ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at);
+ ret = cb.complete(etag, mtime, set_mtime, attrs, delete_at, zones_trace);
if (ret < 0) {
goto set_err_state;
}
int ret = rest_master_conn->put_obj_init(user_id, dest_obj, astate->size, src_attrs, &out_stream_req);
if (ret < 0) {
- delete out_stream_req;
return ret;
}
ret = read_op.iterate(0, astate->size - 1, out_stream_req->get_out_cb());
- if (ret < 0)
+ if (ret < 0) {
+ delete out_stream_req;
return ret;
+ }
ret = rest_master_conn->complete_request(out_stream_req, etag, mtime);
if (ret < 0)
string *version_id,
string *ptag,
ceph::buffer::list *petag,
- struct rgw_err *err,
void (*progress_cb)(off_t, void *),
void *progress_data)
{
dest_obj, src_obj, dest_bucket_info, src_bucket_info, src_mtime, mtime, mod_ptr,
unmod_ptr, high_precision_time,
if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
- olh_epoch, delete_at, version_id, ptag, petag, err, progress_cb, progress_data);
+ olh_epoch, delete_at, version_id, ptag, petag, progress_cb, progress_data);
}
map<string, bufferlist> src_attrs;
read_op.params.attrs = &src_attrs;
read_op.params.lastmod = src_mtime;
read_op.params.obj_size = &obj_size;
- read_op.params.perr = err;
ret = read_op.prepare();
if (ret < 0) {
if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
return copy_obj_data(obj_ctx, dest_bucket_info, read_op, obj_size - 1, dest_obj, src_obj,
max_chunk_size, mtime, real_time(), attrs, category, olh_epoch, delete_at,
- version_id, ptag, petag, err);
+ version_id, ptag, petag);
}
RGWObjManifest::obj_iterator miter = astate->manifest.obj_begin();
bool copy_itself = (dest_obj == src_obj);
RGWObjManifest *pmanifest;
- ldout(cct, 0) << "dest_obj=" << dest_obj << " src_obj=" << src_obj << " copy_itself=" << (int)copy_itself << dendl;
+ ldout(cct, 20) << "dest_obj=" << dest_obj << " src_obj=" << src_obj << " copy_itself=" << (int)copy_itself << dendl;
RGWRados::Object dest_op_target(this, dest_bucket_info, obj_ctx, dest_obj);
RGWRados::Object::Write write_op(&dest_op_target);
real_time delete_at,
string *version_id,
string *ptag,
- ceph::buffer::list *petag,
- struct rgw_err *err)
+ ceph::buffer::list *petag)
{
bufferlist first_chunk;
RGWObjManifest manifest;
bool RGWRados::is_meta_master()
{
- if (!get_zonegroup().is_master) {
+ if (!get_zonegroup().is_master_zonegroup()) {
return false;
}
}
/* zonegroup is not master zonegroup */
- if (!get_zonegroup().is_master) {
+ if (!get_zonegroup().is_master_zonegroup()) {
return false;
}
/* single zonegroup and a single zone */
- if (current_period.is_single_zonegroup(cct, this) && get_zonegroup().zones.size() == 1) {
+ if (current_period.is_single_zonegroup() && get_zonegroup().zones.size() == 1) {
return false;
}
RGWBucketInfo info;
map<string, bufferlist> attrs;
RGWObjectCtx obj_ctx(this);
- int r = get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL, &attrs);
+ int r;
+ if (bucket.bucket_id.empty()) {
+ r = get_bucket_info(obj_ctx, bucket.tenant, bucket.name, info, NULL, &attrs);
+ } else {
+ r = get_bucket_instance_info(obj_ctx, bucket, info, nullptr, &attrs);
+ }
if (r < 0) {
ldout(cct, 0) << "NOTICE: get_bucket_info on bucket=" << bucket.name << " returned err=" << r << dendl;
return r;
// value - bucket index check OP returned result with the given bucket index object (shard)
map<int, string> oids;
map<int, struct rgw_cls_check_index_ret> bucket_objs_ret;
+
int ret = open_bucket_index(bucket_info, index_ctx, oids, bucket_objs_ret);
- if (ret < 0)
- return ret;
+ if (ret < 0) {
+ return ret;
+ }
ret = CLSRGWIssueBucketCheck(index_ctx, oids, bucket_objs_ret, cct->_conf->rgw_bucket_index_max_aio)();
- if (ret < 0)
- return ret;
+ if (ret < 0) {
+ return ret;
+ }
// Aggregate results (from different shards if there is any)
map<int, struct rgw_cls_check_index_ret>::iterator iter;
{
librados::IoCtx index_ctx;
map<int, string> bucket_objs;
+
int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
- if (r < 0)
+ if (r < 0) {
return r;
+ }
return CLSRGWIssueBucketRebuild(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
}
+// Write the given resharding-status entry onto every shard object of the
+// bucket's index, so subsequent index ops see the reshard-in-progress guard.
+// Opens the index (all shards) and issues the CLS set-bucket-resharding op to
+// each shard, with at most rgw_bucket_index_max_aio in flight.
+// Returns 0 on success, negative error code otherwise.
+int RGWRados::bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry)
+{
+ librados::IoCtx index_ctx;
+ map<int, string> bucket_objs;
+
+ int r = open_bucket_index(bucket_info, index_ctx, bucket_objs);
+ if (r < 0) {
+ return r;
+ }
+
+ // issue the op to all shards; operator() waits for and aggregates results
+ return CLSRGWIssueSetBucketResharding(index_ctx, bucket_objs, entry, cct->_conf->rgw_bucket_index_max_aio)();
+}
int RGWRados::defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
{
meta.mtime = params.mtime;
}
- int r = store->set_olh(target->get_ctx(), target->get_bucket_info(), marker, true, &meta, params.olh_epoch, params.unmod_since, params.high_precision_time);
+ int r = store->set_olh(target->get_ctx(), target->get_bucket_info(), marker, true, &meta, params.olh_epoch, params.unmod_since, params.high_precision_time, params.zones_trace);
if (r < 0) {
return r;
}
return r;
}
result.delete_marker = dirent.is_delete_marker();
- r = store->unlink_obj_instance(target->get_ctx(), target->get_bucket_info(), obj, params.olh_epoch);
+ r = store->unlink_obj_instance(target->get_ctx(), target->get_bucket_info(), obj, params.olh_epoch, params.zones_trace);
if (r < 0) {
return r;
}
return r;
}
- r = store->data_log->add_entry(bs->bucket, bs->shard_id);
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
- return r;
+ if (target->bucket_info.datasync_flag_enabled()) {
+ r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+ return r;
+ }
}
return 0;
RGWRados::Bucket bop(store, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj);
-
+
+ index_op.set_zones_trace(params.zones_trace);
index_op.set_bilog_flags(params.bilog_flags);
need_invalidate = true;
r = 0;
}
- bool removed = (r >= 0);
int64_t poolid = ref.ioctx.get_id();
if (r >= 0) {
obj_tombstone_cache->add(obj, entry);
}
r = index_op.complete_del(poolid, ref.ioctx.get_last_version(), state->mtime, params.remove_objs);
- } else {
- int ret = index_op.cancel();
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
- }
- }
- if (removed) {
+
int ret = target->complete_atomic_modification();
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: complete_atomic_modification returned ret=" << ret << dendl;
}
/* other than that, no need to propagate error */
+ } else {
+ int ret = index_op.cancel();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
+ }
}
if (need_invalidate) {
const rgw_obj& obj,
int versioning_status,
uint16_t bilog_flags,
- const real_time& expiration_time)
+ const real_time& expiration_time,
+ rgw_zone_set *zones_trace)
{
RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
RGWRados::Object::Delete del_op(&del_target);
del_op.params.versioning_status = versioning_status;
del_op.params.bilog_flags = bilog_flags;
del_op.params.expiration_time = expiration_time;
+ del_op.params.zones_trace = zones_trace;
return del_op.delete_obj();
}
int RGWRados::delete_raw_obj(const rgw_raw_obj& obj)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
return -EINVAL;
}
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
s->obj_tag = s->attrset[RGW_ATTR_ID_TAG];
if (s->obj_tag.length())
- ldout(cct, 20) << "get_system_obj_state: setting s->obj_tag to "
- << string(s->obj_tag.c_str(), s->obj_tag.length()) << dendl;
+ ldout(cct, 20) << "get_system_obj_state: setting s->obj_tag to "
+ << s->obj_tag.c_str() << dendl;
else
ldout(cct, 20) << "get_system_obj_state: s->obj_tag was set empty" << dendl;
s->accounted_size = s->size;
auto iter = s->attrset.find(RGW_ATTR_COMPRESSION);
- if (iter != s->attrset.end()) {
+ const bool compressed = (iter != s->attrset.end());
+ if (compressed) {
// use uncompressed size for accounted_size
try {
RGWCompressionInfo info;
auto p = iter->second.begin();
::decode(info, p);
- s->accounted_size = info.orig_size;
+ s->accounted_size = info.orig_size;
} catch (buffer::error&) {
dout(0) << "ERROR: could not decode compression info for object: " << obj << dendl;
return -EIO;
s->manifest.set_head(bucket_info.placement_rule, obj, s->size); /* patch manifest to reflect the head we just read, some manifests might be
broken due to old bugs */
s->size = s->manifest.get_obj_size();
+ if (!compressed)
+ s->accounted_size = s->size;
} catch (buffer::error& err) {
ldout(cct, 0) << "ERROR: couldn't decode manifest" << dendl;
return -EIO;
}
}
if (s->obj_tag.length())
- ldout(cct, 20) << "get_obj_state: setting s->obj_tag to " << string(s->obj_tag.c_str(), s->obj_tag.length()) << dendl;
+ ldout(cct, 20) << "get_obj_state: setting s->obj_tag to " << s->obj_tag.c_str() << dendl;
else
ldout(cct, 20) << "get_obj_state: s->obj_tag was set empty" << dendl;
}
/**
- * Get the attributes for an object.
- * bucket: name of the bucket holding the object.
- * obj: name of the object
+ * Get an attribute for a system object.
+ * obj: the object to get attr
* name: name of the attr to retrieve
* dest: bufferlist to store the result in
* Returns: 0 on success, -ERR# otherwise.
int RGWRados::system_obj_get_attr(rgw_raw_obj& obj, const char *name, bufferlist& dest)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_system_obj_ref(obj, &ref, &pool);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
RGWObjVersionTracker *objv_tracker)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_system_obj_ref(obj, &ref, &pool);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
return 0;
}
-/**
- * Get data about an object out of RADOS and into memory.
- * bucket: name of the bucket the object is in.
- * obj: name/key of the object to read
- * data: if get_data==true, this pointer will be set
- * to an address containing the object's data/value
- * attrs: if non-NULL, the pointed-to map will contain
- * all the attrs of the object when this function returns
- * mod_ptr: if non-NULL, compares the object's mtime to *mod_ptr,
- * and if mtime is smaller it fails.
- * unmod_ptr: if non-NULL, compares the object's mtime to *unmod_ptr,
- * and if mtime is >= it fails.
- * if_match/nomatch: if non-NULL, compares the object's etag attr
- * to the string and, if it doesn't/does match, fails out.
- * get_data: if true, the object's data/value will be read out, otherwise not
- * err: Many errors will result in this structure being filled
- * with extra informatin on the error.
- * Returns: -ERR# on failure, otherwise
- * (if get_data==true) length of read data,
- * (if get_data==false) length of the object
- */
-// P3 XXX get_data is not seen used anywhere.
int RGWRados::Object::Read::prepare()
{
RGWRados *store = source->get_store();
return 0;
}
+
+// Run a bucket-index operation, transparently riding out a concurrent
+// bucket-index reshard. 'call' is invoked with the current bucket shard; if
+// it fails with -ERR_BUSY_RESHARDING we block until the reshard settles,
+// switch to the new bucket id, invalidate the cached shard, and retry.
+// On success, *pbs (if non-null) receives the shard the call succeeded on.
+// Returns 0 on success, or a negative error (including -ERR_BUSY_RESHARDING
+// if the retry budget is exhausted while resharding is still in progress).
+int RGWRados::Bucket::UpdateIndex::guard_reshard(BucketShard **pbs, std::function<int(BucketShard *)> call)
+{
+ RGWRados *store = target->get_store();
+ BucketShard *bs;
+ int r;
+
+#define NUM_RESHARD_RETRIES 10
+ for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
+ int ret = get_bucket_shard(&bs);
+ if (ret < 0) {
+ ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
+ return ret;
+ }
+ // attempt the actual index op on this shard
+ r = call(bs);
+ if (r != -ERR_BUSY_RESHARDING) {
+ break;
+ }
+ // shard is guarded by an in-progress reshard: wait for it to complete
+ ldout(store->ctx(), 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
+ string new_bucket_id;
+ r = store->block_while_resharding(bs, &new_bucket_id);
+ if (r == -ERR_BUSY_RESHARDING) {
+ // still resharding after the wait; burn one retry and loop
+ continue;
+ }
+ if (r < 0) {
+ return r;
+ }
+ ldout(store->ctx(), 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
+ i = 0; /* resharding is finished, make sure we can retry */
+ // repoint the target at the post-reshard bucket instance
+ r = target->update_bucket_id(new_bucket_id);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: update_bucket_id() new_bucket_id=" << new_bucket_id << " returned r=" << r << dendl;
+ return r;
+ }
+ // drop the cached shard so the next iteration re-resolves it
+ invalidate_bs();
+ }
+
+ // NOTE(review): 'r' is only initialized inside the loop; safe while
+ // NUM_RESHARD_RETRIES > 0, but worth an explicit init if that ever changes.
+ if (r < 0) {
+ return r;
+ }
+
+ if (pbs) {
+ *pbs = bs;
+ }
+
+ return 0;
+}
+
int RGWRados::SystemObject::Read::stat(RGWObjVersionTracker *objv_tracker)
{
RGWRados *store = source->get_store();
return 0;
}
RGWRados *store = target->get_store();
- BucketShard *bs;
- int ret = get_bucket_shard(&bs);
- if (ret < 0) {
- ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
- return ret;
- }
if (write_tag && write_tag->length()) {
optag = string(write_tag->c_str(), write_tag->length());
}
}
- int r = store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags);
+ int r = guard_reshard(nullptr, [&](BucketShard *bs) -> int {
+ return store->cls_obj_prepare_op(*bs, op, optag, obj, bilog_flags, zones_trace);
+ });
+
if (r < 0) {
return r;
}
prepared = true;
+
return 0;
}
}
RGWRados *store = target->get_store();
BucketShard *bs;
+
int ret = get_bucket_shard(&bs);
if (ret < 0) {
ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
ent.meta.owner_display_name = owner.get_display_name();
ent.meta.content_type = content_type;
- ret = store->cls_obj_complete_add(*bs, optag, poolid, epoch, ent, category, remove_objs, bilog_flags);
+ ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
- int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+ if (target->bucket_info.datasync_flag_enabled()) {
+ int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+ }
}
return ret;
}
RGWRados *store = target->get_store();
BucketShard *bs;
+
int ret = get_bucket_shard(&bs);
if (ret < 0) {
ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
return ret;
}
- ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags);
+ ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
- int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+ if (target->bucket_info.datasync_flag_enabled()) {
+ int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+ }
}
return ret;
}
RGWRados *store = target->get_store();
BucketShard *bs;
- int ret = get_bucket_shard(&bs);
- if (ret < 0) {
- ldout(store->ctx(), 5) << "failed to get BucketShard object: ret=" << ret << dendl;
- return ret;
- }
- ret = store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags);
+ int ret = guard_reshard(&bs, [&](BucketShard *bs) -> int {
+ return store->cls_obj_complete_cancel(*bs, optag, obj, bilog_flags, zones_trace);
+ });
/*
* need to update data log anyhow, so that whoever follows needs to update its internal markers
* for following the specific bucket shard log. Otherwise they end up staying behind, and users
* have no way to tell that they're all caught up
*/
- int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
- if (r < 0) {
- lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+ if (target->bucket_info.datasync_flag_enabled()) {
+ int r = store->data_log->add_entry(bs->bucket, bs->shard_id);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed writing data log" << dendl;
+ }
}
return ret;
int RGWRados::SystemObject::Read::GetObjState::get_ref(RGWRados *store, rgw_raw_obj& obj, rgw_rados_ref **pref)
{
if (!has_ref) {
- rgw_pool pool;
- int r = store->get_raw_obj_ref(obj, &ref, &pool);
+ int r = store->get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
return ret;
}
+// Variant of the reshard guard for callers that address an object instance
+// directly (OLH/index ops). Re-inits 'bs' from the (possibly updated) object
+// each attempt; if 'call' hits -ERR_BUSY_RESHARDING, blocks until the
+// reshard completes, then rewrites the object's bucket id to the new
+// instance and retries. Returns 0 on success or a negative error code.
+int RGWRados::guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call)
+{
+ rgw_obj obj;
+ const rgw_obj *pobj = &obj_instance;
+ int r;
+
+ for (int i = 0; i < NUM_RESHARD_RETRIES; ++i) {
+ // (re)bind the shard to whichever bucket instance pobj currently names
+ r = bs->init(pobj->bucket, *pobj);
+ if (r < 0) {
+ ldout(cct, 5) << "bs.init() returned ret=" << r << dendl;
+ return r;
+ }
+ r = call(bs);
+ if (r != -ERR_BUSY_RESHARDING) {
+ break;
+ }
+ ldout(cct, 0) << "NOTICE: resharding operation on bucket index detected, blocking" << dendl;
+ string new_bucket_id;
+ r = block_while_resharding(bs, &new_bucket_id);
+ if (r == -ERR_BUSY_RESHARDING) {
+ continue;
+ }
+ if (r < 0) {
+ return r;
+ }
+ ldout(cct, 20) << "reshard completion identified, new_bucket_id=" << new_bucket_id << dendl;
+ i = 0; /* resharding is finished, make sure we can retry */
+
+ // retry against a local copy of the object that carries the new bucket id
+ obj = *pobj;
+ obj.bucket.update_bucket_id(new_bucket_id);
+ pobj = &obj;
+ }
+
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+}
+
+// Delegate to the shared RGWReshardWait helper to block until the reshard of
+// this bucket shard completes, reporting the new bucket id via out-param.
+// The local shared_ptr copy pins the waiter for the duration of the call —
+// presumably so a concurrent reset of reshard_wait can't destroy it mid-wait
+// (NOTE(review): confirm against RGWRados shutdown ordering).
+int RGWRados::block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id)
+{
+ std::shared_ptr<RGWReshardWait> waiter = reshard_wait;
+
+ return waiter->block_while_resharding(bs, new_bucket_id);
+}
+
int RGWRados::bucket_index_link_olh(const RGWBucketInfo& bucket_info, RGWObjState& olh_state, const rgw_obj& obj_instance,
bool delete_marker,
const string& op_tag,
struct rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch,
- real_time unmod_since, bool high_precision_time)
+ real_time unmod_since, bool high_precision_time, rgw_zone_set *_zones_trace)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
return r;
}
- BucketShard bs(this);
- int ret = bs.init(obj_instance.bucket, obj_instance);
- if (ret < 0) {
- ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
- return ret;
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ } else {
+ zones_trace.insert(get_zone().id);
}
+ BucketShard bs(this);
+
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
- ret = cls_rgw_bucket_link_olh(bs.index_ctx, bs.bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
- unmod_since, high_precision_time,
- get_zone().log_data);
- if (ret < 0) {
- return ret;
+ r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
+ librados::ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_bucket_link_olh(bs->index_ctx, op,
+ bs->bucket_obj, key, olh_state.olh_tag, delete_marker, op_tag, meta, olh_epoch,
+ unmod_since, high_precision_time,
+ get_zone().log_data, zones_trace);
+ });
+ if (r < 0) {
+ ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
+ return r;
}
return 0;
}
int RGWRados::bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, const rgw_obj& obj_instance,
- const string& op_tag, const string& olh_tag, uint64_t olh_epoch)
+ const string& op_tag, const string& olh_tag, uint64_t olh_epoch, rgw_zone_set *_zones_trace)
{
rgw_rados_ref ref;
int r = get_obj_head_ref(bucket_info, obj_instance, &ref);
return r;
}
- BucketShard bs(this);
- int ret = bs.init(obj_instance.bucket, obj_instance);
- if (ret < 0) {
- ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
- return ret;
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
}
+ zones_trace.insert(get_zone().id);
+
+ BucketShard bs(this);
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), obj_instance.key.instance);
- ret = cls_rgw_bucket_unlink_instance(bs.index_ctx, bs.bucket_obj, key, op_tag, olh_tag, olh_epoch, get_zone().log_data);
- if (ret < 0) {
- return ret;
+ r = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
+ librados::ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_bucket_unlink_instance(bs->index_ctx, op, bs->bucket_obj, key, op_tag,
+ olh_tag, olh_epoch, get_zone().log_data, zones_trace);
+ });
+ if (r < 0) {
+ ldout(cct, 20) << "cls_rgw_bucket_link_olh() returned r=" << r << dendl;
+ return r;
}
return 0;
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- ObjectReadOperation op;
-
- ret = cls_rgw_get_olh_log(bs.index_ctx, bs.bucket_obj, op, key, ver_marker, olh_tag, log, is_truncated);
- if (ret < 0)
+ ret = guard_reshard(&bs, obj_instance, [&](BucketShard *bs) -> int {
+ ObjectReadOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_get_olh_log(bs->index_ctx, bs->bucket_obj, op,
+ key, ver_marker, olh_tag, log, is_truncated);
+ });
+ if (ret < 0) {
+ ldout(cct, 20) << "cls_rgw_get_olh_log() returned r=" << r << dendl;
return ret;
+ }
return 0;
}
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- ObjectWriteOperation op;
-
- cls_rgw_trim_olh_log(op, key, ver, olh_tag);
-
- ret = bs.index_ctx.operate(bs.bucket_obj, &op);
- if (ret < 0)
+ ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int {
+ ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ cls_rgw_trim_olh_log(op, key, ver, olh_tag);
+ return pbs->index_ctx.operate(pbs->bucket_obj, &op);
+ });
+ if (ret < 0) {
+ ldout(cct, 20) << "cls_rgw_trim_olh_log() returned r=" << ret << dendl;
return ret;
+ }
return 0;
}
}
BucketShard bs(this);
- int ret = bs.init(obj_instance.bucket, obj_instance);
- if (ret < 0) {
- ldout(cct, 5) << "bs.init() returned ret=" << ret << dendl;
- return ret;
- }
string olh_tag(state.olh_tag.c_str(), state.olh_tag.length());
cls_rgw_obj_key key(obj_instance.key.get_index_key_name(), string());
- ret = cls_rgw_clear_olh(bs.index_ctx, bs.bucket_obj, key, olh_tag);
+ int ret = guard_reshard(&bs, obj_instance, [&](BucketShard *pbs) -> int {
+ ObjectWriteOperation op;
+ cls_rgw_guard_bucket_resharding(op, -ERR_BUSY_RESHARDING);
+ return cls_rgw_clear_olh(pbs->index_ctx, op, pbs->bucket_obj, key, olh_tag);
+ });
if (ret < 0) {
ldout(cct, 5) << "cls_rgw_clear_olh() returned ret=" << ret << dendl;
return ret;
int RGWRados::apply_olh_log(RGWObjectCtx& obj_ctx, RGWObjState& state, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
bufferlist& olh_tag, map<uint64_t, vector<rgw_bucket_olh_log_entry> >& log,
- uint64_t *plast_ver)
+ uint64_t *plast_ver, rgw_zone_set* zones_trace)
{
if (log.empty()) {
return 0;
liter != remove_instances.end(); ++liter) {
cls_rgw_obj_key& key = *liter;
rgw_obj obj_instance(bucket, key);
- int ret = delete_obj(obj_ctx, bucket_info, obj_instance, 0, RGW_BILOG_FLAG_VERSIONED_OP);
+ int ret = delete_obj(obj_ctx, bucket_info, obj_instance, 0, RGW_BILOG_FLAG_VERSIONED_OP, ceph::real_time(), zones_trace);
if (ret < 0 && ret != -ENOENT) {
ldout(cct, 0) << "ERROR: delete_obj() returned " << ret << " obj_instance=" << obj_instance << dendl;
return ret;
/*
* read olh log and apply it
*/
-int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj)
+int RGWRados::update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace)
{
map<uint64_t, vector<rgw_bucket_olh_log_entry> > log;
bool is_truncated;
if (ret < 0) {
return ret;
}
- ret = apply_olh_log(obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker);
+ ret = apply_olh_log(obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker, zones_trace);
if (ret < 0) {
return ret;
}
}
int RGWRados::set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, real_time unmod_since, bool high_precision_time)
+ uint64_t olh_epoch, real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace)
{
string op_tag;
int ret = 0;
int i;
-
+
#define MAX_ECANCELED_RETRY 100
for (i = 0; i < MAX_ECANCELED_RETRY; i++) {
if (ret == -ECANCELED) {
}
return ret;
}
- ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time);
+ ret = bucket_index_link_olh(bucket_info, *state, target_obj, delete_marker, op_tag, meta, olh_epoch, unmod_since, high_precision_time, zones_trace);
if (ret < 0) {
ldout(cct, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
if (ret == -ECANCELED) {
}
int RGWRados::unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
- uint64_t olh_epoch)
+ uint64_t olh_epoch, rgw_zone_set *zones_trace)
{
string op_tag;
string olh_tag(state->olh_tag.c_str(), state->olh_tag.length());
- ret = bucket_index_unlink_instance(bucket_info, target_obj, op_tag, olh_tag, olh_epoch);
+ ret = bucket_index_unlink_instance(bucket_info, target_obj, op_tag, olh_tag, olh_epoch, zones_trace);
if (ret < 0) {
ldout(cct, 20) << "bucket_index_unlink_instance() target_obj=" << target_obj << " returned " << ret << dendl;
if (ret == -ECANCELED) {
return -EIO;
}
- ret = update_olh(obj_ctx, state, bucket_info, olh_obj);
+ ret = update_olh(obj_ctx, state, bucket_info, olh_obj, zones_trace);
if (ret == -ECANCELED) { /* already did what we needed, no need to retry, raced with another user */
return 0;
}
}
int RGWRados::get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id, string *bucket_ver, string *master_ver,
- map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker)
+ map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker, bool *syncstopped)
{
map<string, rgw_bucket_dir_header> headers;
map<int, string> bucket_instance_ids;
} else {
marker_mgr.add(viter->first, iter->second.max_marker);
}
+ if (syncstopped != NULL)
+ *syncstopped = iter->second.syncstopped;
}
ver_mgr.to_string(bucket_ver);
master_ver_mgr.to_string(master_ver);
int RGWRados::get_bucket_stats_async(RGWBucketInfo& bucket_info, int shard_id, RGWGetBucketStats_CB *ctx)
{
int num_aio = 0;
- RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, bucket_info.num_shards);
+ RGWGetBucketStatsContext *get_ctx = new RGWGetBucketStatsContext(ctx, bucket_info.num_shards ? : 1);
assert(get_ctx);
int r = cls_bucket_head_async(bucket_info, shard_id, get_ctx, &num_aio);
- get_ctx->put();
if (r < 0) {
ctx->put();
if (num_aio) {
get_ctx->unset_cb();
}
}
+ get_ctx->put();
return r;
}
return get_bucket_instance_from_oid(obj_ctx, oid, info, pmtime, pattrs);
}
-int RGWRados::get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, string& oid, RGWBucketInfo& info,
+int RGWRados::get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info,
real_time *pmtime, map<string, bufferlist> *pattrs,
rgw_cache_entry_info *cache_info)
{
{
librados::IoCtx index_ctx;
map<int, string> bucket_objs;
+
+ BucketIndexShardsManager start_marker_mgr;
+ BucketIndexShardsManager end_marker_mgr;
+
int r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
- if (r < 0)
+ if (r < 0) {
return r;
+ }
- BucketIndexShardsManager start_marker_mgr;
r = start_marker_mgr.from_string(start_marker, shard_id);
- if (r < 0)
+ if (r < 0) {
return r;
- BucketIndexShardsManager end_marker_mgr;
+ }
+
r = end_marker_mgr.from_string(end_marker, shard_id);
- if (r < 0)
+ if (r < 0) {
return r;
+ }
return CLSRGWIssueBILogTrim(index_ctx, start_marker_mgr, end_marker_mgr, bucket_objs,
- cct->_conf->rgw_bucket_index_max_aio)();
+ cct->_conf->rgw_bucket_index_max_aio)();
+
+ return r;
+}
+
+// Issue the CLS "resync bucket bilog" op to the bucket's index shard(s)
+// (a single shard when shard_id >= 0, per open_bucket_index), restarting
+// bilog generation for data sync. Returns 0 on success, negative on error.
+int RGWRados::resync_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id)
+{
+ librados::IoCtx index_ctx;
+ map<int, string> bucket_objs;
+ int r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
+ if (r < 0)
+ return r;
+
+ return CLSRGWIssueResyncBucketBILog(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+}
+
+// Counterpart to resync_bi_log_entries(): issue the CLS "stop bucket bilog"
+// op to the bucket's index shard(s), halting bilog generation (sync-stopped
+// state). Returns 0 on success, negative on error.
+int RGWRados::stop_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id)
+{
+ librados::IoCtx index_ctx;
+ map<int, string> bucket_objs;
+ int r = open_bucket_index(bucket_info, index_ctx, bucket_objs, shard_id);
+ if (r < 0)
+ return r;
+
+ return CLSRGWIssueBucketBILogStop(index_ctx, bucket_objs, cct->_conf->rgw_bucket_index_max_aio)();
+}
int RGWRados::bi_get_instance(const RGWBucketInfo& bucket_info, rgw_obj& obj, rgw_bucket_dir_entry *dirent)
}
ret = cls_rgw_bi_list(bs.index_ctx, bs.bucket_obj, obj_name, marker, max, entries, is_truncated);
+ if (ret == -ENOENT) {
+ *is_truncated = false;
+ }
if (ret < 0)
return ret;
}
+// Phase 1 of a bucket-index transaction: record a pending 'op' for 'obj'
+// on its index shard under 'tag'. The new _zones_trace parameter carries
+// the set of zones that have already applied this change (multisite loop
+// prevention); when absent, the local zone id seeds the trace.
int RGWRados::cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag,
- rgw_obj& obj, uint16_t bilog_flags)
+ rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
{
+ // Use the caller-supplied trace if given, else start one at this zone.
+ rgw_zone_set zones_trace;
+ if (_zones_trace) {
+ zones_trace = *_zones_trace;
+ }
+ else {
+ zones_trace.insert(get_zone().id);
+ }
+
  ObjectWriteOperation o;
  cls_rgw_obj_key key(obj.key.get_index_key_name(), obj.key.instance);
+ // Guard: make the op fail with ERR_BUSY_RESHARDING if the bucket is
+ // currently being resharded, instead of writing to a stale shard.
+ cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
- cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), get_zone().log_data, bilog_flags);
+ cls_rgw_bucket_prepare_op(o, op, tag, key, obj.key.get_loc(), get_zone().log_data, bilog_flags, zones_trace);
  return bs.index_ctx.operate(bs.bucket_obj, &o);
}
+// Phase 2 of a bucket-index transaction: complete the op prepared under
+// 'tag', updating the shard's dir entry/stats and writing the bilog entry.
+// Now takes the object (for the completion manager) and a zone trace, and
+// submits the index update asynchronously.
-int RGWRados::cls_obj_complete_op(BucketShard& bs, RGWModifyOp op, string& tag,
+int RGWRados::cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag,
  int64_t pool, uint64_t epoch,
  rgw_bucket_dir_entry& ent, RGWObjCategory category,
- list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags)
+ list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *_zones_trace)
{
- list<cls_rgw_obj_key> *pro = NULL;
- list<cls_rgw_obj_key> ro;
-
- if (remove_objs) {
- for (auto iter = remove_objs->begin(); iter != remove_objs->end(); ++iter) {
- ro.push_back(*iter);
- }
- pro = &ro;
- }
-
  ObjectWriteOperation o;
  rgw_bucket_dir_entry_meta dir_meta;
  dir_meta = ent.meta;
  ver.pool = pool;
  ver.epoch = epoch;
  cls_rgw_obj_key key(ent.key.name, ent.key.instance);
- cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, pro,
- get_zone().log_data, bilog_flags);
-
- AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL);
- int ret = bs.index_ctx.aio_operate(bs.bucket_obj, c, &o);
- c->release();
+ // Abort with ERR_BUSY_RESHARDING if the bucket is mid-reshard.
+ cls_rgw_guard_bucket_resharding(o, -ERR_BUSY_RESHARDING);
+ cls_rgw_bucket_complete_op(o, op, tag, ver, key, dir_meta, remove_objs,
+ get_zone().log_data, bilog_flags, _zones_trace);
+ // NOTE(review): create_completion presumably registers this pending op so
+ // it can be re-driven/accounted when the AIO fires — confirm against
+ // index_completion_manager. It also owns 'arg' and frees it on completion.
+ complete_op_data *arg;
+ index_completion_manager->create_completion(obj, op, tag, ver, key, dir_meta, remove_objs,
+ get_zone().log_data, bilog_flags, _zones_trace, &arg);
+ librados::AioCompletion *completion = arg->rados_completion;
+ int ret = bs.index_ctx.aio_operate(bs.bucket_obj, arg->rados_completion, &o);
+ completion->release(); /* can't reference arg here, as it might have already been released */
  return ret;
}
+// Convenience wrapper: complete a prepared index op as an ADD (object
+// created/overwritten), forwarding the new obj and zone-trace arguments.
-int RGWRados::cls_obj_complete_add(BucketShard& bs, string& tag,
+int RGWRados::cls_obj_complete_add(BucketShard& bs, const rgw_obj& obj, string& tag,
  int64_t pool, uint64_t epoch,
  rgw_bucket_dir_entry& ent, RGWObjCategory category,
- list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags)
+ list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace)
{
- return cls_obj_complete_op(bs, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags);
+ return cls_obj_complete_op(bs, obj, CLS_RGW_OP_ADD, tag, pool, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
}
+// Complete a prepared index op as a DELETE: builds a dir entry keyed on
+// 'obj' carrying the removal mtime in its meta, then forwards to
+// cls_obj_complete_op with category NONE and the zone trace.
int RGWRados::cls_obj_complete_del(BucketShard& bs, string& tag,
  rgw_obj& obj,
  real_time& removed_mtime,
  list<rgw_obj_index_key> *remove_objs,
- uint16_t bilog_flags)
+ uint16_t bilog_flags,
+ rgw_zone_set *zones_trace)
{
  rgw_bucket_dir_entry ent;
+ // Record when the object was removed so the bilog entry reflects it.
  ent.meta.mtime = removed_mtime;
  obj.key.get_index_key(&ent.key);
- return cls_obj_complete_op(bs, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags);
+ return cls_obj_complete_op(bs, obj, CLS_RGW_OP_DEL, tag, pool, epoch, ent, RGW_OBJ_CATEGORY_NONE, remove_objs, bilog_flags, zones_trace);
}
+// Cancel a prepared index transaction for 'obj' (e.g. the data write
+// failed): completes with CLS_RGW_OP_CANCEL, a sentinel pool id of -1 and
+// epoch 0, no category and no remove list.
-int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags)
+int RGWRados::cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace)
{
  rgw_bucket_dir_entry ent;
  obj.key.get_index_key(&ent.key);
- return cls_obj_complete_op(bs, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags);
+ return cls_obj_complete_op(bs, obj, CLS_RGW_OP_CANCEL, tag, -1 /* pool id */, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL, bilog_flags, zones_trace);
}
int RGWRados::cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout)
rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().usage_log_pool, oid);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
rgw_raw_obj obj(get_zone_params().user_uid_pool, buckets_obj_id);
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
bucket_info.bucket.convert(&entry.bucket);
- map<string, struct rgw_bucket_dir_header>::iterator hiter = headers.begin();
- for (; hiter != headers.end(); ++hiter) {
- map<uint8_t, struct rgw_bucket_category_stats>::iterator iter = hiter->second.stats.begin();
- for (; iter != hiter->second.stats.end(); ++iter) {
- struct rgw_bucket_category_stats& header_stats = iter->second;
+ for (const auto& hiter : headers) {
+ for (const auto& iter : hiter.second.stats) {
+ const struct rgw_bucket_category_stats& header_stats = iter.second;
entry.size += header_stats.total_size;
entry.size_rounded += header_stats.total_size_rounded;
entry.count += header_stats.num_entries;
return 0;
}
+// Aggregate usage stats (total size, rounded size, object count) for
+// 'bucket' by summing the per-category stats of every bucket index shard
+// header, filling them into 'entry'.
+// NOTE(review): counters are accumulated with += — assumes 'entry' arrives
+// zero-initialized (cls_user_bucket_entry default ctor); confirm callers.
+// Returns 0 on success, a negative error otherwise.
+int RGWRados::cls_user_get_bucket_stats(const rgw_bucket& bucket, cls_user_bucket_entry& entry)
+{
+ map<string, struct rgw_bucket_dir_header> headers;
+ RGWBucketInfo bucket_info;
+ RGWObjectCtx obj_ctx(this);
+ int ret = get_bucket_instance_info(obj_ctx, bucket, bucket_info, NULL, NULL);
+ if (ret < 0) {
+ return ret;
+ }
+
+ ret = cls_bucket_head(bucket_info, RGW_NO_SHARD, headers);
+ if (ret < 0) {
+ // fix: log message previously named a nonexistent cls_bucket_header()
+ ldout(cct, 20) << "cls_bucket_head() returned " << ret << dendl;
+ return ret;
+ }
+
+ bucket.convert(&entry.bucket);
+
+ for (const auto& hiter : headers) {
+ for (const auto& iter : hiter.second.stats) {
+ const struct rgw_bucket_category_stats& header_stats = iter.second;
+ entry.size += header_stats.total_size;
+ entry.size_rounded += header_stats.total_size_rounded;
+ entry.count += header_stats.num_entries;
+ }
+ }
+
+ return 0;
+}
+
int RGWRados::cls_user_list_buckets(rgw_raw_obj& obj,
const string& in_marker,
const string& end_marker,
bool * const truncated)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
int RGWRados::cls_user_update_buckets(rgw_raw_obj& obj, list<cls_user_bucket_entry>& entries, bool add)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
int RGWRados::cls_user_complete_stats_sync(rgw_raw_obj& obj)
{
rgw_rados_ref ref;
- rgw_pool pool;
- int r = get_raw_obj_ref(obj, &ref, &pool);
+ int r = get_raw_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
int RGWRados::cls_user_remove_bucket(rgw_raw_obj& obj, const cls_user_bucket& bucket)
{
- rgw_pool p;
rgw_rados_ref ref;
- int r = get_system_obj_ref(obj, &ref, &p);
+ int r = get_system_obj_ref(obj, &ref);
if (r < 0) {
return r;
}
return 0;
}
+// Dynamic resharding hook: ask the quota handler whether, after adding one
+// more object, the bucket exceeds rgw_max_objs_per_shard per shard; if so,
+// queue the bucket for resharding with the suggested shard count.
+// No-op (returns 0) when rgw_dynamic_resharding is disabled.
+int RGWRados::check_bucket_shards(const RGWBucketInfo& bucket_info, const rgw_bucket& bucket,
+ RGWQuotaInfo& bucket_quota)
+{
+ if (!cct->_conf->rgw_dynamic_resharding) {
+ return 0;
+ }
+
+ bool need_resharding = false;
+ // A bucket with no explicit shards (num_shards == 0) counts as one shard.
+ int num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+ uint32_t suggested_num_shards;
+
+ // The trailing 1 is the number of objects about to be added.
+ int ret = quota_handler->check_bucket_shards((uint64_t)cct->_conf->rgw_max_objs_per_shard,
+ num_source_shards, bucket_info.owner, bucket, bucket_quota,
+ 1, need_resharding, &suggested_num_shards);
+ if (ret < 0) {
+ return ret;
+ }
+
+ if (need_resharding) {
+ ldout(cct, 20) << __func__ << " bucket " << bucket.name << " need resharding " <<
+ " old num shards " << bucket_info.num_shards << " new num shards " << suggested_num_shards <<
+ dendl;
+ return add_bucket_to_reshard(bucket_info, suggested_num_shards);
+ }
+
+ return ret;
+}
+
+// Queue 'bucket_info' for resharding to new_num_shards (clamped to
+// get_max_bucket_shards()). Skips (returns 0) when the clamped target
+// would not actually increase the shard count.
+int RGWRados::add_bucket_to_reshard(const RGWBucketInfo& bucket_info, uint32_t new_num_shards)
+{
+ RGWReshard reshard(this);
+
+ // Unsharded buckets (num_shards == 0) count as a single shard.
+ uint32_t num_source_shards = (bucket_info.num_shards > 0 ? bucket_info.num_shards : 1);
+
+ new_num_shards = min(new_num_shards, get_max_bucket_shards());
+ if (new_num_shards <= num_source_shards) {
+ ldout(cct, 20) << "not resharding bucket name=" << bucket_info.bucket.name << ", orig_num=" << num_source_shards << ", new_num_shards=" << new_num_shards << dendl;
+ return 0;
+ }
+
+ // Build the reshard-queue entry describing the old and new layout.
+ cls_rgw_reshard_entry entry;
+ entry.time = real_clock::now();
+ entry.tenant = bucket_info.owner.tenant;
+ entry.bucket_name = bucket_info.bucket.name;
+ entry.bucket_id = bucket_info.bucket.bucket_id;
+ entry.old_num_shards = num_source_shards;
+ entry.new_num_shards = new_num_shards;
+
+ return reshard.add(entry);
+}
+
int RGWRados::check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size)
{
} else {
uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % bucket_info.num_shards;
+ sid = rgw_shards_mod(sid2, bucket_info.num_shards);
if (shard_id) {
*shard_id = (int)sid;
}
} else {
uint32_t sid = ceph_str_hash_linux(obj_key.c_str(), obj_key.size());
uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
- sid = sid2 % MAX_BUCKET_INDEX_SHARDS_PRIME % num_shards;
+ sid = rgw_shards_mod(sid2, num_shards);
char buf[bucket_oid_base.size() + 32];
snprintf(buf, sizeof(buf), "%s.%d", bucket_oid_base.c_str(), sid);
(*bucket_obj) = buf;
return ++max_bucket_id;
}
-RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread)
+RGWRados *RGWStoreManager::init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread)
{
int use_cache = cct->_conf->rgw_cache_enabled;
RGWRados *store = NULL;
store = new RGWCache<RGWRados>;
}
- if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread) < 0) {
+ if (store->initialize(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread, run_reshard_thread) < 0) {
delete store;
return NULL;
}