#include "include/rados/librados.hpp"
#include "include/Context.h"
+#include "common/admin_socket.h"
#include "common/RefCountedObj.h"
#include "common/RWLock.h"
#include "common/ceph_time.h"
#include "rgw_meta_sync_status.h"
#include "rgw_period_puller.h"
#include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"
class RGWWatcher;
class SafeTimer;
ceph::real_time mtime;
uint64_t epoch;
bufferlist obj_tag;
+ bufferlist tail_tag;
string write_tag;
bool fake_tag;
RGWObjManifest manifest;
if (rhs.obj_tag.length()) {
obj_tag = rhs.obj_tag;
}
+ if (rhs.tail_tag.length()) {
+ tail_tag = rhs.tail_tag;
+ }
write_tag = rhs.write_tag;
fake_tag = rhs.fake_tag;
if (rhs.has_manifest) {
// Returns the current value of the read_only member.
bool is_read_only() { return read_only; }
// Returns true when sync_from_all is set, or when zone_id is found in the
// sync_from container; false otherwise. The change below only adds the
// const qualifier.
- bool syncs_from(const string& zone_id) {
+ bool syncs_from(const string& zone_id) const {
return (sync_from_all || sync_from.find(zone_id) != sync_from.end());
}
};
int get_zonegroup(RGWZoneGroup& zonegroup,
const string& zonegroup_id);
// Returns true when period_map.zonegroups holds exactly one entry.
// The change below only adds the const qualifier.
- bool is_single_zonegroup()
+ bool is_single_zonegroup() const
{
return (period_map.zonegroups.size() == 1);
}
class RGWIndexCompletionManager;
-class RGWRados
+class RGWRados : public AdminSocketHook
{
friend class RGWGC;
friend class RGWMetaNotifier;
friend class BucketIndexLockGuard;
friend class RGWCompleteMultipart;
+ static const char* admin_commands[4][3];
+
/** Open the pool used as root for this gateway */
int open_root_pool_ctx();
int open_gc_pool_ctx();
RGWMetaSyncProcessorThread *meta_sync_processor_thread;
map<string, RGWDataSyncProcessorThread *> data_sync_processor_threads;
+ boost::optional<rgw::BucketTrimManager> bucket_trim;
RGWSyncLogTrimThread *sync_log_trimmer{nullptr};
Mutex meta_sync_thread_lock;
return rgw_shards_max();
}
+
int get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref);
+ int list_raw_objects_init(const rgw_pool& pool, const string& marker, RGWListRawObjsCtx *ctx);
+ int list_raw_objects_next(const string& prefix_filter, int max,
+ RGWListRawObjsCtx& ctx, list<string>& oids,
+ bool *is_truncated);
int list_raw_objects(const rgw_pool& pool, const string& prefix_filter, int max,
RGWListRawObjsCtx& ctx, list<string>& oids,
bool *is_truncated);
+ string list_raw_objs_get_cursor(RGWListRawObjsCtx& ctx);
int list_raw_prefixed_objs(const rgw_pool& pool, const string& prefix, list<string>& result);
int list_zonegroups(list<string>& zonegroups);
explicit Read(RGWRados::SystemObject *_source) : source(_source) {}
int stat(RGWObjVersionTracker *objv_tracker);
- int read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker);
+ int read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker,
+ boost::optional<obj_version> refresh_version = boost::none);
int get_attr(const char *name, bufferlist& dest);
};
};
explicit BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
int init(const rgw_bucket& _bucket, const rgw_obj& obj);
int init(const rgw_bucket& _bucket, int sid);
+ int init(const RGWBucketInfo& bucket_info, int sid);
};
class Object {
void invalidate_state();
int prepare_atomic_modification(librados::ObjectWriteOperation& op, bool reset_obj, const string *ptag,
- const char *ifmatch, const char *ifnomatch, bool removal_op);
+ const char *ifmatch, const char *ifnomatch, bool removal_op, bool modify_tail);
int complete_atomic_modification();
public:
bool canceled;
const string *user_data;
rgw_zone_set *zones_trace;
+ bool modify_tail;
+ bool completeMultipart;
// Default constructor. mtime, rmattrs, data, manifest, ptag, remove_objs,
// if_match, if_nomatch, user_data and zones_trace start out as NULL/nullptr;
// category is RGW_OBJ_CATEGORY_MAIN; flags and olh_epoch are 0; canceled is
// false. The added initializers set modify_tail and completeMultipart to false.
MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
remove_objs(NULL), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
- if_match(NULL), if_nomatch(NULL), olh_epoch(0), canceled(false), user_data(nullptr), zones_trace(nullptr) {}
+ if_match(NULL), if_nomatch(NULL), olh_epoch(0), canceled(false), user_data(nullptr), zones_trace(nullptr),
+ modify_tail(false), completeMultipart(false) {}
} meta;
explicit Write(RGWRados::Object *_target) : target(_target) {}
int _do_write_meta(uint64_t size, uint64_t accounted_size,
map<std::string, bufferlist>& attrs,
- bool assume_noent,
+ bool modify_tail, bool assume_noent,
void *index_op);
int write_meta(uint64_t size, uint64_t accounted_size,
map<std::string, bufferlist>& attrs);
RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
bufferlist& bl, off_t ofs, off_t end,
map<string, bufferlist> *attrs,
- rgw_cache_entry_info *cache_info);
+ rgw_cache_entry_info *cache_info,
+ boost::optional<obj_version> refresh_version =
+ boost::none);
virtual void register_chained_cache(RGWChainedCache *cache) {}
virtual bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry) { return false; }
int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, map<string, bufferlist> *pattrs);
int get_bucket_entrypoint_info(RGWObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name,
RGWBucketEntryPoint& entry_point, RGWObjVersionTracker *objv_tracker,
- ceph::real_time *pmtime, map<string, bufferlist> *pattrs, rgw_cache_entry_info *cache_info = NULL);
+ ceph::real_time *pmtime, map<string, bufferlist> *pattrs, rgw_cache_entry_info *cache_info = NULL,
+ boost::optional<obj_version> refresh_version = boost::none);
int get_bucket_instance_info(RGWObjectCtx& obj_ctx, const string& meta_key, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs);
int get_bucket_instance_info(RGWObjectCtx& obj_ctx, const rgw_bucket& bucket, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs);
int get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs,
- rgw_cache_entry_info *cache_info = NULL);
+ rgw_cache_entry_info *cache_info = NULL,
+ boost::optional<obj_version> refresh_version = boost::none);
int convert_old_bucket_info(RGWObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name);
static void make_bucket_entry_name(const string& tenant_name, const string& bucket_name, string& bucket_entry);
+
+
+private:
+ int _get_bucket_info(RGWObjectCtx& obj_ctx, const string& tenant,
+ const string& bucket_name, RGWBucketInfo& info,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs,
+ boost::optional<obj_version> refresh_version);
+public:
+
+ bool call(std::string command, cmdmap_t& cmdmap, std::string format,
+ bufferlist& out) override final;
+
+ // Dumps "name", "mtime" (via ceph::to_iso_8601) and "size" to the Formatter.
+ // Should really be protected, but some older GCCs don't handle access
+ // control properly with lambdas defined in member functions of child
+ // classes.
+ void cache_list_dump_helper(Formatter* f,
+ const std::string& name,
+ const ceph::real_time mtime,
+ const std::uint64_t size) {
+ f->dump_string("name", name);
+ f->dump_string("mtime", ceph::to_iso_8601(mtime));
+ f->dump_unsigned("size", size);
+ }
+
+protected:
+
+ // `call_list` must iterate over all cache entries and call
+ // `cache_list_dump_helper` with the supplied Formatter on any that
+ // include `filter` as a substring.
+ //
+ virtual void call_list(const boost::optional<std::string>& filter,
+ Formatter* format);
+ // `call_inspect` must look up the requested target and, if found,
+ // dump it to the supplied Formatter and return true. If not found,
+ // it must return false.
+ //
+ virtual bool call_inspect(const std::string& target, Formatter* format);
+
+ // `call_erase` must erase the requested target and return true. If
+ // the requested target does not exist, it should return false.
+ virtual bool call_erase(const std::string& target);
+
+ // `call_zap` must erase the cache.
+ virtual void call_zap();
+public:
+
int get_bucket_info(RGWObjectCtx& obj_ctx,
- const string& tenant_name, const string& bucket_name,
- RGWBucketInfo& info,
- ceph::real_time *pmtime, map<string, bufferlist> *pattrs = NULL);
+ const string& tenant_name, const string& bucket_name,
+ RGWBucketInfo& info,
+ ceph::real_time *pmtime, map<string, bufferlist> *pattrs = NULL);
+
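+ // Described as returning "true" on successful refresh and "false" if there
+ // was an error or the version stored on the OSD is the same as that
+ // presented in the BucketInfo structure.
+ // NOTE(review): the declared return type is int, not bool; confirm which values signal success and failure.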
+ int try_refresh_bucket_info(RGWBucketInfo& info,
+ ceph::real_time *pmtime,
+ map<string, bufferlist> *pattrs = nullptr);
+
int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, obj_version *pep_objv,
- map<string, bufferlist> *pattrs, bool create_entry_point);
+ map<string, bufferlist> *pattrs, bool create_entry_point);
int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
int fix_tail_obj_locator(const RGWBucketInfo& bucket_info, rgw_obj_key& key, bool fix, bool *need_fix);
int cls_user_get_header(const string& user_id, cls_user_header *header);
+ int cls_user_reset_stats(const string& user_id);
int cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx);
int cls_user_sync_bucket_stats(rgw_raw_obj& user_obj, const RGWBucketInfo& bucket_info);
int cls_user_list_buckets(rgw_raw_obj& obj,
(get_zonegroup().zones.size() > 1 || current_period.is_multi_zonegroups_with_zones());
}
+ bool can_reshard() const { // true in either of the two cases below
+ return current_period.get_id().empty() || // the current period has an empty id, or
+ (zonegroup.zones.size() == 1 && current_period.is_single_zonegroup()); // zonegroup has exactly one zone and is_single_zonegroup() holds
+ }
+
librados::Rados* get_rados_handle();
int delete_raw_obj_aio(const rgw_raw_obj& obj, list<librados::AioCompletion *>& handles);
*/
int pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx);
+ /**
+ * Init pool iteration
+ * pool: pool to use
+ * cursor: position to start iteration
+ * ctx: context object to use for the iteration
+ * Returns: 0 on success, -ERR# otherwise.
+ */
+ int pool_iterate_begin(const rgw_pool& pool, const string& cursor, RGWPoolIterCtx& ctx);
+
+ /**
+ * Get pool iteration position
+ * ctx: context object to use for the iteration
+ * Returns: string representation of position
+ */
+ string pool_iterate_get_cursor(RGWPoolIterCtx& ctx);
+
/**
* Iterate over pool return object names, use optional filter
* ctx: iteration context, initialized with pool_iterate_begin()
class RGWStoreManager {
public:
RGWStoreManager() {}
// Thin wrapper: forwards every argument unchanged to init_storage_provider()
// and returns the RGWRados pointer it produces. The added use_cache parameter
// defaults to true, so existing six-argument call sites keep compiling.
// NOTE(review): the same flag is called use_metadata_cache in the
// init_storage_provider declaration; the names differ but the position matches.
- static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread) {
+ static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads,
+ bool run_sync_thread, bool run_reshard_thread, bool use_cache = true) {
RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread,
- run_reshard_thread);
+ run_reshard_thread, use_cache);
return store;
}
// Returns the RGWRados pointer produced by init_raw_storage_provider(cct).
static RGWRados *get_raw_storage(CephContext *cct) {
RGWRados *store = init_raw_storage_provider(cct);
return store;
}
- static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread);
+ static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_metadata_cache);
static RGWRados *init_raw_storage_provider(CephContext *cct);
static void close_storage(RGWRados *store);
template <class T>
class RGWChainedCacheImpl : public RGWChainedCache {
+ ceph::timespan expiry;
RWLock lock;
- map<string, T> entries;
+ map<string, std::pair<T, ceph::coarse_mono_time>> entries;
public:
RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {}
// Registers this cache with the store via register_chained_cache(this).
// The added lines also set expiry from the "rgw_cache_expiry_interval"
// config value, read as a uint64_t and wrapped in std::chrono::seconds.
void init(RGWRados *store) {
store->register_chained_cache(this);
+ expiry = std::chrono::seconds(store->ctx()->_conf->get_val<uint64_t>(
+ "rgw_cache_expiry_interval"));
}
// Looks up key while holding an RWLock::RLocker on lock.
// Returns false when the key is absent. With the added expiry check it also
// returns false when expiry is non-zero and the entry's stored timestamp is
// older than expiry; such an entry is only reported as a miss here, it is not
// erased. Otherwise copies the cached value into *entry and returns true.
bool find(const string& key, T *entry) {
RWLock::RLocker rl(lock);
- typename map<string, T>::iterator iter = entries.find(key);
+ auto iter = entries.find(key);
if (iter == entries.end()) {
return false;
}
+ if (expiry.count() && // a zero expiry skips the age check entirely
+ (ceph::coarse_mono_clock::now() - iter->second.second) > expiry) {
+ return false;
+ }
- *entry = iter->second;
+ *entry = iter->second.first; // .first is the value, .second the timestamp
return true;
}
// Stores a copy of the object pointed to by data under key while holding an
// RWLock::WLocker on lock. With the added lines the value goes into the
// pair's .first, and .second is set to coarse_mono_clock::now() only when
// expiry is positive.
// NOTE(review): data is static_cast to T* without any check; this assumes
// callers always pass a T. Confirm against the code that invokes chain_cb.
// NOTE(review): entries[key] is evaluated a second time on the expiry path;
// binding the pair to one reference would avoid the repeated map lookup.
void chain_cb(const string& key, void *data) override {
T *entry = static_cast<T *>(data);
RWLock::WLocker wl(lock);
- entries[key] = *entry;
+ entries[key].first = *entry;
+ if (expiry.count() > 0) {
+ entries[key].second = ceph::coarse_mono_clock::now();
+ }
}
void invalidate(const string& key) override {