#include "include/rados/librados.hpp"
#include "include/Context.h"
+#include "common/admin_socket.h"
#include "common/RefCountedObj.h"
#include "common/RWLock.h"
#include "common/ceph_time.h"
#include "common/lru_map.h"
+#include "common/ceph_json.h"
#include "rgw_common.h"
#include "cls/rgw/cls_rgw_types.h"
#include "cls/version/cls_version_types.h"
#include "cls/log/cls_log_types.h"
-#include "cls/statelog/cls_statelog_types.h"
#include "cls/timeindex/cls_timeindex_types.h"
+#include "cls/otp/cls_otp_types.h"
#include "rgw_log.h"
#include "rgw_metadata.h"
#include "rgw_meta_sync_status.h"
#include "rgw_period_puller.h"
#include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"
+#include "rgw_service.h"
+
+#include "services/svc_rados.h"
+#include "services/svc_zone.h"
class RGWWatcher;
class SafeTimer;
class RGWMetaSyncProcessorThread;
class RGWDataSyncProcessorThread;
class RGWSyncLogTrimThread;
-class RGWRESTConn;
+class RGWSyncTraceManager;
struct RGWZoneGroup;
struct RGWZoneParams;
class RGWReshard;
class RGWReshardWait;
+class RGWSysObjectCtx;
+
/* flags for put_obj_meta() */
#define PUT_OBJ_CREATE 0x01
#define PUT_OBJ_EXCL 0x02
#define RGW_SHARDS_PRIME_0 7877
#define RGW_SHARDS_PRIME_1 65521
+extern const std::string MP_META_SUFFIX;
+
+// only called by rgw_shard_id and rgw_bucket_shard_index
static inline int rgw_shards_mod(unsigned hval, int max_shards)
{
if (max_shards <= RGW_SHARDS_PRIME_0) {
return hval % RGW_SHARDS_PRIME_1 % max_shards;
}
-static inline int rgw_shards_hash(const string& key, int max_shards)
+// used for logging and tagging: hashes key with ceph_str_hash_linux and reduces the hash to a shard id via rgw_shards_mod
+static inline int rgw_shard_id(const string& key, int max_shards)
{
- return rgw_shards_mod(ceph_str_hash_linux(key.c_str(), key.size()), max_shards);
+ return rgw_shards_mod(ceph_str_hash_linux(key.c_str(), key.size()),
+ max_shards);
+}
+
+// used for bucket indices: XORs the low byte of the key's hash into the hash's top byte, then reduces the result via rgw_shards_mod
+static inline uint32_t rgw_bucket_shard_index(const std::string& key,
+ int num_shards) {
+ uint32_t sid = ceph_str_hash_linux(key.c_str(), key.size());
+ uint32_t sid2 = sid ^ ((sid & 0xFF) << 24);
+ return rgw_shards_mod(sid2, num_shards);
}
static inline int rgw_shards_max()
}
}
-int rgw_init_ioctx(librados::Rados *rados, const rgw_pool& pool, librados::IoCtx& ioctx, bool create = false);
-
int rgw_policy_from_attrset(CephContext *cct, map<string, bufferlist>& attrset, RGWAccessControlPolicy *policy);
static inline bool rgw_raw_obj_to_obj(const rgw_bucket& bucket, const rgw_raw_obj& raw_obj, rgw_obj *obj)
return true;
}
+
/* Holds a placement rule together with a bucket. dump() takes a Formatter; its definition is not in this view. */
struct rgw_bucket_placement {
- string placement_rule;
+ rgw_placement_rule placement_rule;
rgw_bucket bucket;
void dump(Formatter *f) const;
};
class rgw_obj_select {
- string placement_rule;
+ rgw_placement_rule placement_rule;
rgw_obj obj;
rgw_raw_obj raw_obj;
bool is_raw;
public:
rgw_obj_select() : is_raw(false) {}
- rgw_obj_select(const rgw_obj& _obj) : obj(_obj), is_raw(false) {}
- rgw_obj_select(const rgw_raw_obj& _raw_obj) : raw_obj(_raw_obj), is_raw(true) {}
+ explicit rgw_obj_select(const rgw_obj& _obj) : obj(_obj), is_raw(false) {}
+ explicit rgw_obj_select(const rgw_raw_obj& _raw_obj) : raw_obj(_raw_obj), is_raw(true) {}
rgw_obj_select(const rgw_obj_select& rhs) {
+ placement_rule = rhs.placement_rule;
is_raw = rhs.is_raw;
if (is_raw) {
raw_obj = rhs.raw_obj;
return *this;
}
- void set_placement_rule(const string& rule) {
+ void set_placement_rule(const rgw_placement_rule& rule) {
placement_rule = rule;
}
+ void dump(Formatter *f) const;
};
struct compression_block {
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- ::encode(old_ofs, bl);
- ::encode(new_ofs, bl);
- ::encode(len, bl);
+ encode(old_ofs, bl);
+ encode(new_ofs, bl);
+ encode(len, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- ::decode(old_ofs, bl);
- ::decode(new_ofs, bl);
- ::decode(len, bl);
+ decode(old_ofs, bl);
+ decode(new_ofs, bl);
+ decode(len, bl);
DECODE_FINISH(bl);
}
+ void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(compression_block)
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- ::encode(compression_type, bl);
- ::encode(orig_size, bl);
- ::encode(blocks, bl);
+ encode(compression_type, bl);
+ encode(orig_size, bl);
+ encode(blocks, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- ::decode(compression_type, bl);
- ::decode(orig_size, bl);
- ::decode(blocks, bl);
+ decode(compression_type, bl);
+ decode(orig_size, bl);
+ decode(blocks, bl);
DECODE_FINISH(bl);
- }
+ }
+ void dump(Formatter *f) const;
};
WRITE_CLASS_ENCODER(RGWCompressionInfo)
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- ::encode(target, bl);
- ::encode(removed, bl);
+ encode(target, bl);
+ encode(removed, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- ::decode(target, bl);
- ::decode(removed, bl);
+ decode(target, bl);
+ decode(removed, bl);
DECODE_FINISH(bl);
}
static void generate_test_instances(list<RGWOLHInfo*>& o);
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- ::encode(time, bl);
+ encode(time, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- ::decode(time, bl);
+ decode(time, bl);
DECODE_FINISH(bl);
}
};
/* Abstract data callback: subclasses must implement handle_data(bl, bl_ofs, bl_len). */
class RGWGetDataCB {
-protected:
- uint64_t extra_data_len;
public:
virtual int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) = 0;
- RGWGetDataCB() : extra_data_len(0) {}
+ RGWGetDataCB() {}
virtual ~RGWGetDataCB() {}
- virtual void set_extra_data_len(uint64_t len) {
- extra_data_len = len;
- }
- /**
- * Flushes any cached data. Used by RGWGetObjFilter.
- * Return logic same as handle_data.
- */
- virtual int flush() {
- return 0;
- }
- /**
- * Allows to extend fetch range of RGW object. Used by RGWGetObjFilter.
- */
- virtual int fixup_range(off_t& bl_ofs, off_t& bl_end) {
- return 0;
- }
-};
-
-class RGWAccessListFilter {
-public:
- virtual ~RGWAccessListFilter() {}
- virtual bool filter(string& name, string& key) = 0;
};
struct RGWCloneRangeInfo {
void encode(bufferlist& bl) const {
ENCODE_START(2, 2, bl);
- ::encode(loc, bl);
- ::encode(loc_ofs, bl);
- ::encode(size, bl);
+ encode(loc, bl);
+ encode(loc_ofs, bl);
+ encode(size, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START_LEGACY_COMPAT_LEN_32(2, 2, 2, bl);
- ::decode(loc, bl);
- ::decode(loc_ofs, bl);
- ::decode(size, bl);
+ decode(loc, bl);
+ decode(loc_ofs, bl);
+ decode(size, bl);
DECODE_FINISH(bl);
}
void encode(bufferlist& bl) const {
ENCODE_START(2, 1, bl);
- ::encode(start_part_num, bl);
- ::encode(start_ofs, bl);
- ::encode(part_size, bl);
- ::encode(stripe_max_size, bl);
- ::encode(override_prefix, bl);
+ encode(start_part_num, bl);
+ encode(start_ofs, bl);
+ encode(part_size, bl);
+ encode(stripe_max_size, bl);
+ encode(override_prefix, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START(2, bl);
- ::decode(start_part_num, bl);
- ::decode(start_ofs, bl);
- ::decode(part_size, bl);
- ::decode(stripe_max_size, bl);
+ decode(start_part_num, bl);
+ decode(start_ofs, bl);
+ decode(part_size, bl);
+ decode(stripe_max_size, bl);
if (struct_v >= 2)
- ::decode(override_prefix, bl);
+ decode(override_prefix, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
rgw_obj obj;
uint64_t head_size;
- string head_placement_rule;
+ rgw_placement_rule head_placement_rule;
uint64_t max_head_size;
string prefix;
void encode(bufferlist& bl) const {
ENCODE_START(7, 6, bl);
- ::encode(obj_size, bl);
- ::encode(objs, bl);
- ::encode(explicit_objs, bl);
- ::encode(obj, bl);
- ::encode(head_size, bl);
- ::encode(max_head_size, bl);
- ::encode(prefix, bl);
- ::encode(rules, bl);
+ encode(obj_size, bl);
+ encode(objs, bl);
+ encode(explicit_objs, bl);
+ encode(obj, bl);
+ encode(head_size, bl);
+ encode(max_head_size, bl);
+ encode(prefix, bl);
+ encode(rules, bl);
bool encode_tail_bucket = !(tail_placement.bucket == obj.bucket);
- ::encode(encode_tail_bucket, bl);
+ encode(encode_tail_bucket, bl);
if (encode_tail_bucket) {
- ::encode(tail_placement.bucket, bl);
+ encode(tail_placement.bucket, bl);
}
bool encode_tail_instance = (tail_instance != obj.key.instance);
- ::encode(encode_tail_instance, bl);
+ encode(encode_tail_instance, bl);
if (encode_tail_instance) {
- ::encode(tail_instance, bl);
+ encode(tail_instance, bl);
}
- ::encode(head_placement_rule, bl);
- ::encode(tail_placement.placement_rule, bl);
+ encode(head_placement_rule, bl);
+ encode(tail_placement.placement_rule, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START_LEGACY_COMPAT_LEN_32(7, 2, 2, bl);
- ::decode(obj_size, bl);
- ::decode(objs, bl);
+ decode(obj_size, bl);
+ decode(objs, bl);
if (struct_v >= 3) {
- ::decode(explicit_objs, bl);
- ::decode(obj, bl);
- ::decode(head_size, bl);
- ::decode(max_head_size, bl);
- ::decode(prefix, bl);
- ::decode(rules, bl);
+ decode(explicit_objs, bl);
+ decode(obj, bl);
+ decode(head_size, bl);
+ decode(max_head_size, bl);
+ decode(prefix, bl);
+ decode(rules, bl);
} else {
explicit_objs = true;
if (!objs.empty()) {
if (struct_v >= 4) {
if (struct_v < 6) {
- ::decode(tail_placement.bucket, bl);
+ decode(tail_placement.bucket, bl);
} else {
bool need_to_decode;
- ::decode(need_to_decode, bl);
+ decode(need_to_decode, bl);
if (need_to_decode) {
- ::decode(tail_placement.bucket, bl);
+ decode(tail_placement.bucket, bl);
} else {
tail_placement.bucket = obj.bucket;
}
if (struct_v >= 5) {
if (struct_v < 6) {
- ::decode(tail_instance, bl);
+ decode(tail_instance, bl);
} else {
bool need_to_decode;
- ::decode(need_to_decode, bl);
+ decode(need_to_decode, bl);
if (need_to_decode) {
- ::decode(tail_instance, bl);
+ decode(tail_instance, bl);
} else {
tail_instance = obj.key.instance;
}
}
if (struct_v >= 7) {
- ::decode(head_placement_rule, bl);
- ::decode(tail_placement.placement_rule, bl);
+ decode(head_placement_rule, bl);
+ decode(tail_placement.placement_rule, bl);
}
update_iterators();
void dump(Formatter *f) const;
static void generate_test_instances(list<RGWObjManifest*>& o);
- int append(RGWObjManifest& m, RGWZoneGroup& zonegroup, RGWZoneParams& zone_params);
- int append(RGWObjManifest& m, RGWRados *store);
+ int append(RGWObjManifest& m, const RGWZoneGroup& zonegroup,
+ const RGWZoneParams& zone_params);
+ int append(RGWObjManifest& m, RGWSI_Zone *zone_svc);
bool get_rule(uint64_t ofs, RGWObjManifestRule *rule);
return (obj_size > head_size);
}
- void set_head(const string& placement_rule, const rgw_obj& _o, uint64_t _s) {
+ void set_head(const rgw_placement_rule& placement_rule, const rgw_obj& _o, uint64_t _s) {
head_placement_rule = placement_rule;
obj = _o;
head_size = _s;
return obj;
}
- void set_tail_placement(const string& placement_rule, const rgw_bucket& _b) {
+ void set_tail_placement(const rgw_placement_rule& placement_rule, const rgw_bucket& _b) {
tail_placement.placement_rule = placement_rule;
tail_placement.bucket = _b;
}
return tail_placement;
}
- const string& get_head_placement_rule() {
+ const rgw_placement_rule& get_head_placement_rule() {
return head_placement_rule;
}
return head_size;
}
- void set_max_head_size(uint64_t s) {
- max_head_size = s;
- }
-
uint64_t get_max_head_size() {
return max_head_size;
}
void update_location();
friend class RGWObjManifest;
+ void dump(Formatter *f) const;
};
const obj_iterator& obj_begin();
public:
generator() : manifest(NULL), last_ofs(0), cur_part_ofs(0), cur_part_id(0),
cur_stripe(0), cur_stripe_size(0) {}
- int create_begin(CephContext *cct, RGWObjManifest *manifest, const string& placement_rule, rgw_bucket& bucket, rgw_obj& obj);
+ int create_begin(CephContext *cct, RGWObjManifest *manifest,
+ const rgw_placement_rule& head_placement_rule,
+ const rgw_placement_rule *tail_placement_rule,
+ const rgw_bucket& bucket,
+ const rgw_obj& obj);
int create_next(uint64_t ofs);
rgw_raw_obj get_cur_obj(RGWZoneGroup& zonegroup, RGWZoneParams& zone_params) { return cur_obj.get_raw_obj(zonegroup, zone_params); }
- rgw_raw_obj get_cur_obj(RGWRados *store) { return cur_obj.get_raw_obj(store); }
+ rgw_raw_obj get_cur_obj(RGWRados *store) const { return cur_obj.get_raw_obj(store); }
/* total max size of current stripe (including head obj) */
- uint64_t cur_stripe_max_size() {
+ uint64_t cur_stripe_max_size() const {
return cur_stripe_size;
}
};
void encode(bufferlist& bl) const {
ENCODE_START(4, 2, bl);
- ::encode(num, bl);
- ::encode(size, bl);
- ::encode(etag, bl);
- ::encode(modified, bl);
- ::encode(manifest, bl);
- ::encode(cs_info, bl);
- ::encode(accounted_size, bl);
+ encode(num, bl);
+ encode(size, bl);
+ encode(etag, bl);
+ encode(modified, bl);
+ encode(manifest, bl);
+ encode(cs_info, bl);
+ encode(accounted_size, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
- ::decode(num, bl);
- ::decode(size, bl);
- ::decode(etag, bl);
- ::decode(modified, bl);
+ decode(num, bl);
+ decode(size, bl);
+ decode(etag, bl);
+ decode(modified, bl);
if (struct_v >= 3)
- ::decode(manifest, bl);
+ decode(manifest, bl);
if (struct_v >= 4) {
- ::decode(cs_info, bl);
- ::decode(accounted_size, bl);
+ decode(cs_info, bl);
+ decode(accounted_size, bl);
} else {
accounted_size = size;
}
ceph::real_time mtime;
uint64_t epoch;
bufferlist obj_tag;
+ bufferlist tail_tag;
string write_tag;
bool fake_tag;
RGWObjManifest manifest;
if (rhs.obj_tag.length()) {
obj_tag = rhs.obj_tag;
}
+ if (rhs.tail_tag.length()) {
+ tail_tag = rhs.tail_tag;
+ }
write_tag = rhs.write_tag;
fake_tag = rhs.fake_tag;
if (rhs.has_manifest) {
bool exists{false};
uint64_t size{0};
ceph::real_time mtime;
- uint64_t epoch;
+ uint64_t epoch{0};
bufferlist obj_tag;
bool has_data{false};
bufferlist data;
RGWListRawObjsCtx() : initialized(false) {}
};
-struct RGWDefaultSystemMetaObjInfo {
- string default_id;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(default_id, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- ::decode(default_id, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWDefaultSystemMetaObjInfo)
-
-struct RGWNameToId {
- string obj_id;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(obj_id, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- ::decode(obj_id, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWNameToId)
-
-class RGWSystemMetaObj {
-protected:
- string id;
- string name;
-
- CephContext *cct;
- RGWRados *store;
-
- int store_name(bool exclusive);
- int store_info(bool exclusive);
- int read_info(const string& obj_id, bool old_format = false);
- int read_id(const string& obj_name, string& obj_id);
- int read_default(RGWDefaultSystemMetaObjInfo& default_info,
- const string& oid);
- /* read and use default id */
- int use_default(bool old_format = false);
-
-public:
- RGWSystemMetaObj() : cct(NULL), store(NULL) {}
- RGWSystemMetaObj(const string& _name): name(_name), cct(NULL), store(NULL) {}
- RGWSystemMetaObj(const string& _id, const string& _name) : id(_id), name(_name), cct(NULL), store(NULL) {}
- RGWSystemMetaObj(CephContext *_cct, RGWRados *_store): cct(_cct), store(_store){}
- RGWSystemMetaObj(const string& _name, CephContext *_cct, RGWRados *_store): name(_name), cct(_cct), store(_store){}
- const string& get_name() const { return name; }
- const string& get_id() const { return id; }
-
- void set_name(const string& _name) { name = _name;}
- void set_id(const string& _id) { id = _id;}
- void clear_id() { id.clear(); }
-
- virtual ~RGWSystemMetaObj() {}
-
- virtual void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(id, bl);
- ::encode(name, bl);
- ENCODE_FINISH(bl);
- }
-
- virtual void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- ::decode(id, bl);
- ::decode(name, bl);
- DECODE_FINISH(bl);
- }
-
- void reinit_instance(CephContext *_cct, RGWRados *_store) {
- cct = _cct;
- store = _store;
- }
- int init(CephContext *_cct, RGWRados *_store, bool setup_obj = true, bool old_format = false);
- virtual int read_default_id(string& default_id, bool old_format = false);
- virtual int set_as_default(bool exclusive = false);
- int delete_default();
- virtual int create(bool exclusive = true);
- int delete_obj(bool old_format = false);
- int rename(const string& new_name);
- int update() { return store_info(false);}
- int update_name() { return store_name(false);}
- int read();
- int write(bool exclusive);
-
- virtual rgw_pool get_pool(CephContext *cct) = 0;
- virtual const string get_default_oid(bool old_format = false) = 0;
- virtual const string& get_names_oid_prefix() = 0;
- virtual const string& get_info_oid_prefix(bool old_format = false) = 0;
- virtual const string& get_predefined_name(CephContext *cct) = 0;
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWSystemMetaObj)
-
-struct RGWZonePlacementInfo {
- rgw_pool index_pool;
- rgw_pool data_pool;
- rgw_pool data_extra_pool; /* if not set we should use data_pool */
- RGWBucketIndexType index_type;
- std::string compression_type;
-
- RGWZonePlacementInfo() : index_type(RGWBIType_Normal) {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(6, 1, bl);
- ::encode(index_pool.to_str(), bl);
- ::encode(data_pool.to_str(), bl);
- ::encode(data_extra_pool.to_str(), bl);
- ::encode((uint32_t)index_type, bl);
- ::encode(compression_type, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(6, bl);
- string index_pool_str;
- string data_pool_str;
- ::decode(index_pool_str, bl);
- index_pool = rgw_pool(index_pool_str);
- ::decode(data_pool_str, bl);
- data_pool = rgw_pool(data_pool_str);
- if (struct_v >= 4) {
- string data_extra_pool_str;
- ::decode(data_extra_pool_str, bl);
- data_extra_pool = rgw_pool(data_extra_pool_str);
- }
- if (struct_v >= 5) {
- uint32_t it;
- ::decode(it, bl);
- index_type = (RGWBucketIndexType)it;
- }
- if (struct_v >= 6) {
- ::decode(compression_type, bl);
- }
- DECODE_FINISH(bl);
- }
- const rgw_pool& get_data_extra_pool() const {
- if (data_extra_pool.empty()) {
- return data_pool;
- }
- return data_extra_pool;
- }
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWZonePlacementInfo)
-
-struct RGWZoneParams : RGWSystemMetaObj {
- rgw_pool domain_root;
- rgw_pool metadata_heap;
- rgw_pool control_pool;
- rgw_pool gc_pool;
- rgw_pool lc_pool;
- rgw_pool log_pool;
- rgw_pool intent_log_pool;
- rgw_pool usage_log_pool;
-
- rgw_pool user_keys_pool;
- rgw_pool user_email_pool;
- rgw_pool user_swift_pool;
- rgw_pool user_uid_pool;
- rgw_pool roles_pool;
- rgw_pool reshard_pool;
-
- RGWAccessKey system_key;
-
- map<string, RGWZonePlacementInfo> placement_pools;
-
- string realm_id;
-
- map<string, string, ltstr_nocase> tier_config;
-
- RGWZoneParams() : RGWSystemMetaObj() {}
- RGWZoneParams(const string& name) : RGWSystemMetaObj(name){}
- RGWZoneParams(const string& id, const string& name) : RGWSystemMetaObj(id, name) {}
- RGWZoneParams(const string& id, const string& name, const string& _realm_id)
- : RGWSystemMetaObj(id, name), realm_id(_realm_id) {}
-
- rgw_pool get_pool(CephContext *cct);
- const string get_default_oid(bool old_format = false) override;
- const string& get_names_oid_prefix() override;
- const string& get_info_oid_prefix(bool old_format = false) override;
- const string& get_predefined_name(CephContext *cct) override;
-
- int init(CephContext *_cct, RGWRados *_store, bool setup_obj = true,
- bool old_format = false);
- using RGWSystemMetaObj::init;
- int read_default_id(string& default_id, bool old_format = false) override;
- int set_as_default(bool exclusive = false) override;
- int create_default(bool old_format = false);
- int create(bool exclusive = true) override;
- int fix_pool_names();
-
- const string& get_compression_type(const string& placement_rule) const;
-
- void encode(bufferlist& bl) const override {
- ENCODE_START(10, 1, bl);
- ::encode(domain_root, bl);
- ::encode(control_pool, bl);
- ::encode(gc_pool, bl);
- ::encode(log_pool, bl);
- ::encode(intent_log_pool, bl);
- ::encode(usage_log_pool, bl);
- ::encode(user_keys_pool, bl);
- ::encode(user_email_pool, bl);
- ::encode(user_swift_pool, bl);
- ::encode(user_uid_pool, bl);
- RGWSystemMetaObj::encode(bl);
- ::encode(system_key, bl);
- ::encode(placement_pools, bl);
- ::encode(metadata_heap, bl);
- ::encode(realm_id, bl);
- ::encode(lc_pool, bl);
- ::encode(tier_config, bl);
- ::encode(roles_pool, bl);
- ::encode(reshard_pool, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) override {
- DECODE_START(10, bl);
- ::decode(domain_root, bl);
- ::decode(control_pool, bl);
- ::decode(gc_pool, bl);
- ::decode(log_pool, bl);
- ::decode(intent_log_pool, bl);
- ::decode(usage_log_pool, bl);
- ::decode(user_keys_pool, bl);
- ::decode(user_email_pool, bl);
- ::decode(user_swift_pool, bl);
- ::decode(user_uid_pool, bl);
- if (struct_v >= 6) {
- RGWSystemMetaObj::decode(bl);
- } else if (struct_v >= 2) {
- ::decode(name, bl);
- id = name;
- }
- if (struct_v >= 3)
- ::decode(system_key, bl);
- if (struct_v >= 4)
- ::decode(placement_pools, bl);
- if (struct_v >= 5)
- ::decode(metadata_heap, bl);
- if (struct_v >= 6) {
- ::decode(realm_id, bl);
- }
- if (struct_v >= 7) {
- ::decode(lc_pool, bl);
- } else {
- lc_pool.init(name + ".rgw.lc");
- }
- if (struct_v >= 8) {
- ::decode(tier_config, bl);
- }
- if (struct_v >= 9) {
- ::decode(roles_pool, bl);
- } else {
- roles_pool = name + ".rgw.roles";
- }
- if (struct_v >= 10) {
- ::decode(reshard_pool, bl);
- } else {
- reshard_pool = name + ".rgw.reshard";
- }
- DECODE_FINISH(bl);
- }
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
- static void generate_test_instances(list<RGWZoneParams*>& o);
-
- bool find_placement(const rgw_data_placement_target& placement, string *placement_id) {
- for (const auto& pp : placement_pools) {
- const RGWZonePlacementInfo& info = pp.second;
- if (info.index_pool == placement.index_pool.to_str() &&
- info.data_pool == placement.data_pool.to_str() &&
- info.data_extra_pool == placement.data_extra_pool.to_str()) {
- *placement_id = pp.first;
- return true;
- }
- }
- return false;
- }
-
- bool get_placement(const string& placement_id, RGWZonePlacementInfo *placement) const {
- auto iter = placement_pools.find(placement_id);
- if (iter == placement_pools.end()) {
- return false;
- }
- *placement = iter->second;
- return true;
- }
-
- /*
- * return data pool of the head object
- */
- bool get_head_data_pool(const string& placement_id, const rgw_obj& obj, rgw_pool *pool) const {
- const rgw_data_placement_target& explicit_placement = obj.bucket.explicit_placement;
- if (!explicit_placement.data_pool.empty()) {
- if (!obj.in_extra_data) {
- *pool = explicit_placement.data_pool;
- } else {
- *pool = explicit_placement.get_data_extra_pool();
- }
- return true;
- }
- if (placement_id.empty()) {
- return false;
- }
- auto iter = placement_pools.find(placement_id);
- if (iter == placement_pools.end()) {
- return false;
- }
- if (!obj.in_extra_data) {
- *pool = iter->second.data_pool;
- } else {
- *pool = iter->second.get_data_extra_pool();
- }
- return true;
- }
-};
-WRITE_CLASS_ENCODER(RGWZoneParams)
-
-struct RGWZone {
- string id;
- string name;
- list<string> endpoints;
- bool log_meta;
- bool log_data;
- bool read_only;
- string tier_type;
-
-/**
- * Represents the number of shards for the bucket index object, a value of zero
- * indicates there is no sharding. By default (no sharding, the name of the object
- * is '.dir.{marker}', with sharding, the name is '.dir.{marker}.{sharding_id}',
- * sharding_id is zero-based value. It is not recommended to set a too large value
- * (e.g. thousand) as it increases the cost for bucket listing.
- */
- uint32_t bucket_index_max_shards;
-
- bool sync_from_all;
- set<string> sync_from; /* list of zones to sync from */
-
- RGWZone() : log_meta(false), log_data(false), read_only(false), bucket_index_max_shards(0),
- sync_from_all(true) {}
-
- void encode(bufferlist& bl) const {
- ENCODE_START(6, 1, bl);
- ::encode(name, bl);
- ::encode(endpoints, bl);
- ::encode(log_meta, bl);
- ::encode(log_data, bl);
- ::encode(bucket_index_max_shards, bl);
- ::encode(id, bl);
- ::encode(read_only, bl);
- ::encode(tier_type, bl);
- ::encode(sync_from_all, bl);
- ::encode(sync_from, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(6, bl);
- ::decode(name, bl);
- if (struct_v < 4) {
- id = name;
- }
- ::decode(endpoints, bl);
- if (struct_v >= 2) {
- ::decode(log_meta, bl);
- ::decode(log_data, bl);
- }
- if (struct_v >= 3) {
- ::decode(bucket_index_max_shards, bl);
- }
- if (struct_v >= 4) {
- ::decode(id, bl);
- ::decode(read_only, bl);
- }
- if (struct_v >= 5) {
- ::decode(tier_type, bl);
- }
- if (struct_v >= 6) {
- ::decode(sync_from_all, bl);
- ::decode(sync_from, bl);
- }
- DECODE_FINISH(bl);
- }
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
- static void generate_test_instances(list<RGWZone*>& o);
-
- bool is_read_only() { return read_only; }
-
- bool syncs_from(const string& zone_id) {
- return (sync_from_all || sync_from.find(zone_id) != sync_from.end());
- }
-};
-WRITE_CLASS_ENCODER(RGWZone)
-
-struct RGWDefaultZoneGroupInfo {
- string default_zonegroup;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(default_zonegroup, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- ::decode(default_zonegroup, bl);
- DECODE_FINISH(bl);
- }
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
- //todo: implement ceph-dencoder
-};
-WRITE_CLASS_ENCODER(RGWDefaultZoneGroupInfo)
-
-struct RGWZoneGroupPlacementTarget {
- string name;
- set<string> tags;
-
- bool user_permitted(list<string>& user_tags) {
- if (tags.empty()) {
- return true;
- }
- for (auto& rule : user_tags) {
- if (tags.find(rule) != tags.end()) {
- return true;
- }
- }
- return false;
- }
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(name, bl);
- ::encode(tags, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- ::decode(name, bl);
- ::decode(tags, bl);
- DECODE_FINISH(bl);
- }
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWZoneGroupPlacementTarget)
-
-
-struct RGWZoneGroup : public RGWSystemMetaObj {
- string api_name;
- list<string> endpoints;
- bool is_master;
-
- string master_zone;
- map<string, RGWZone> zones;
-
- map<string, RGWZoneGroupPlacementTarget> placement_targets;
- string default_placement;
-
- list<string> hostnames;
- list<string> hostnames_s3website;
- // TODO: Maybe convert hostnames to a map<string,list<string>> for
- // endpoint_type->hostnames
-/*
-20:05 < _robbat21irssi> maybe I do someting like: if (hostname_map.empty()) { populate all map keys from hostnames; };
-20:05 < _robbat21irssi> but that's a later compatability migration planning bit
-20:06 < yehudasa> more like if (!hostnames.empty()) {
-20:06 < yehudasa> for (list<string>::iterator iter = hostnames.begin(); iter != hostnames.end(); ++iter) {
-20:06 < yehudasa> hostname_map["s3"].append(iter->second);
-20:07 < yehudasa> hostname_map["s3website"].append(iter->second);
-20:07 < yehudasa> s/append/push_back/g
-20:08 < _robbat21irssi> inner loop over APIs
-20:08 < yehudasa> yeah, probably
-20:08 < _robbat21irssi> s3, s3website, swift, swith_auth, swift_website
-*/
- map<string, list<string> > api_hostname_map;
- map<string, list<string> > api_endpoints_map;
-
- string realm_id;
-
- RGWZoneGroup(): is_master(false){}
- RGWZoneGroup(const std::string &id, const std::string &name):RGWSystemMetaObj(id, name) {}
- RGWZoneGroup(const std::string &_name):RGWSystemMetaObj(_name) {}
- RGWZoneGroup(const std::string &_name, bool _is_master, CephContext *cct, RGWRados* store,
- const string& _realm_id, const list<string>& _endpoints)
- : RGWSystemMetaObj(_name, cct , store), endpoints(_endpoints), is_master(_is_master),
- realm_id(_realm_id) {}
-
- bool is_master_zonegroup() const { return is_master;}
- void update_master(bool _is_master) {
- is_master = _is_master;
- post_process_params();
- }
- void post_process_params();
-
- void encode(bufferlist& bl) const override {
- ENCODE_START(4, 1, bl);
- ::encode(name, bl);
- ::encode(api_name, bl);
- ::encode(is_master, bl);
- ::encode(endpoints, bl);
- ::encode(master_zone, bl);
- ::encode(zones, bl);
- ::encode(placement_targets, bl);
- ::encode(default_placement, bl);
- ::encode(hostnames, bl);
- ::encode(hostnames_s3website, bl);
- RGWSystemMetaObj::encode(bl);
- ::encode(realm_id, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) override {
- DECODE_START(4, bl);
- ::decode(name, bl);
- ::decode(api_name, bl);
- ::decode(is_master, bl);
- ::decode(endpoints, bl);
- ::decode(master_zone, bl);
- ::decode(zones, bl);
- ::decode(placement_targets, bl);
- ::decode(default_placement, bl);
- if (struct_v >= 2) {
- ::decode(hostnames, bl);
- }
- if (struct_v >= 3) {
- ::decode(hostnames_s3website, bl);
- }
- if (struct_v >= 4) {
- RGWSystemMetaObj::decode(bl);
- ::decode(realm_id, bl);
- } else {
- id = name;
- }
- DECODE_FINISH(bl);
- }
-
- int read_default_id(string& default_id, bool old_format = false) override;
- int set_as_default(bool exclusive = false) override;
- int create_default(bool old_format = false);
- int equals(const string& other_zonegroup) const;
- int add_zone(const RGWZoneParams& zone_params, bool *is_master, bool *read_only,
- const list<string>& endpoints, const string *ptier_type,
- bool *psync_from_all, list<string>& sync_from, list<string>& sync_from_rm);
- int remove_zone(const std::string& zone_id);
- int rename_zone(const RGWZoneParams& zone_params);
- rgw_pool get_pool(CephContext *cct);
- const string get_default_oid(bool old_region_format = false) override;
- const string& get_info_oid_prefix(bool old_region_format = false) override;
- const string& get_names_oid_prefix() override;
- const string& get_predefined_name(CephContext *cct) override;
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
- static void generate_test_instances(list<RGWZoneGroup*>& o);
-};
-WRITE_CLASS_ENCODER(RGWZoneGroup)
-
-struct RGWPeriodMap
-{
- string id;
- map<string, RGWZoneGroup> zonegroups;
- map<string, RGWZoneGroup> zonegroups_by_api;
- map<string, uint32_t> short_zone_ids;
-
- string master_zonegroup;
-
- void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& bl);
-
- int update(const RGWZoneGroup& zonegroup, CephContext *cct);
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-
- void reset() {
- zonegroups.clear();
- zonegroups_by_api.clear();
- master_zonegroup.clear();
- }
-
- uint32_t get_zone_short_id(const string& zone_id) const;
-};
-WRITE_CLASS_ENCODER(RGWPeriodMap)
-
-struct RGWPeriodConfig
-{
- RGWQuotaInfo bucket_quota;
- RGWQuotaInfo user_quota;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(bucket_quota, bl);
- ::encode(user_quota, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- ::decode(bucket_quota, bl);
- ::decode(user_quota, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-
- // the period config must be stored in a local object outside of the period,
- // so that it can be used in a default configuration where no realm/period
- // exists
- int read(RGWRados *store, const std::string& realm_id);
- int write(RGWRados *store, const std::string& realm_id);
-
- static std::string get_oid(const std::string& realm_id);
- static rgw_pool get_pool(CephContext *cct);
-};
-WRITE_CLASS_ENCODER(RGWPeriodConfig)
-
-/* for backward comaptability */
-struct RGWRegionMap {
-
- map<string, RGWZoneGroup> regions;
-
- string master_region;
-
- RGWQuotaInfo bucket_quota;
- RGWQuotaInfo user_quota;
-
- void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& bl);
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWRegionMap)
-
-struct RGWZoneGroupMap {
-
- map<string, RGWZoneGroup> zonegroups;
- map<string, RGWZoneGroup> zonegroups_by_api;
-
- string master_zonegroup;
-
- RGWQuotaInfo bucket_quota;
- RGWQuotaInfo user_quota;
-
- /* constract the map */
- int read(CephContext *cct, RGWRados *store);
-
- void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& bl);
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWZoneGroupMap)
-
-class RGWRealm;
-
struct objexp_hint_entry {
string tenant;
string bucket_name;
void encode(bufferlist& bl) const {
ENCODE_START(2, 1, bl);
- ::encode(bucket_name, bl);
- ::encode(bucket_id, bl);
- ::encode(obj_key, bl);
- ::encode(exp_time, bl);
- ::encode(tenant, bl);
+ encode(bucket_name, bl);
+ encode(bucket_id, bl);
+ encode(obj_key, bl);
+ encode(exp_time, bl);
+ encode(tenant, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
// XXX Do we want DECODE_START_LEGACY_COMPAT_LEN(2, 1, 1, bl); ?
DECODE_START(2, bl);
- ::decode(bucket_name, bl);
- ::decode(bucket_id, bl);
- ::decode(obj_key, bl);
- ::decode(exp_time, bl);
+ decode(bucket_name, bl);
+ decode(bucket_id, bl);
+ decode(obj_key, bl);
+ decode(exp_time, bl);
if (struct_v >= 2) {
- ::decode(tenant, bl);
+ decode(tenant, bl);
} else {
tenant.clear();
}
};
WRITE_CLASS_ENCODER(objexp_hint_entry)
-class RGWPeriod;
-
-class RGWRealm : public RGWSystemMetaObj
-{
- string current_period;
- epoch_t epoch{0}; //< realm epoch, incremented for each new period
-
- int create_control(bool exclusive);
- int delete_control();
-public:
- RGWRealm() {}
- RGWRealm(const string& _id, const string& _name = "") : RGWSystemMetaObj(_id, _name) {}
- RGWRealm(CephContext *_cct, RGWRados *_store): RGWSystemMetaObj(_cct, _store) {}
- RGWRealm(const string& _name, CephContext *_cct, RGWRados *_store): RGWSystemMetaObj(_name, _cct, _store){}
-
- void encode(bufferlist& bl) const override {
- ENCODE_START(1, 1, bl);
- RGWSystemMetaObj::encode(bl);
- ::encode(current_period, bl);
- ::encode(epoch, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) override {
- DECODE_START(1, bl);
- RGWSystemMetaObj::decode(bl);
- ::decode(current_period, bl);
- ::decode(epoch, bl);
- DECODE_FINISH(bl);
- }
-
- int create(bool exclusive = true) override;
- int delete_obj();
- rgw_pool get_pool(CephContext *cct);
- const string get_default_oid(bool old_format = false) override;
- const string& get_names_oid_prefix() override;
- const string& get_info_oid_prefix(bool old_format = false) override;
- const string& get_predefined_name(CephContext *cct) override;
-
- using RGWSystemMetaObj::read_id; // expose as public for radosgw-admin
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-
- const string& get_current_period() const {
- return current_period;
- }
- int set_current_period(RGWPeriod& period);
- void clear_current_period_and_epoch() {
- current_period.clear();
- epoch = 0;
- }
- epoch_t get_epoch() const { return epoch; }
-
- string get_control_oid();
- /// send a notify on the realm control object
- int notify_zone(bufferlist& bl);
- /// notify the zone of a new period
- int notify_new_period(const RGWPeriod& period);
-};
-WRITE_CLASS_ENCODER(RGWRealm)
-
-struct RGWPeriodLatestEpochInfo {
- epoch_t epoch;
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(epoch, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- ::decode(epoch, bl);
- DECODE_FINISH(bl);
- }
-
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-};
-WRITE_CLASS_ENCODER(RGWPeriodLatestEpochInfo)
-
-class RGWPeriod
-{
- string id;
- epoch_t epoch;
- string predecessor_uuid;
- std::vector<std::string> sync_status;
- RGWPeriodMap period_map;
- RGWPeriodConfig period_config;
- string master_zonegroup;
- string master_zone;
-
- string realm_id;
- string realm_name;
- epoch_t realm_epoch{1}; //< realm epoch when period was made current
-
- CephContext *cct;
- RGWRados *store;
-
- int read_info();
- int read_latest_epoch(RGWPeriodLatestEpochInfo& epoch_info,
- RGWObjVersionTracker *objv = nullptr);
- int use_latest_epoch();
- int use_current_period();
-
- const string get_period_oid();
- const string get_period_oid_prefix();
-
- // gather the metadata sync status for each shard; only for use on master zone
- int update_sync_status(const RGWPeriod &current_period,
- std::ostream& error_stream, bool force_if_stale);
-
-public:
- RGWPeriod() : epoch(0), cct(NULL), store(NULL) {}
-
- RGWPeriod(const string& period_id, epoch_t _epoch = 0)
- : id(period_id), epoch(_epoch),
- cct(NULL), store(NULL) {}
-
- const string& get_id() const { return id; }
- epoch_t get_epoch() const { return epoch; }
- epoch_t get_realm_epoch() const { return realm_epoch; }
- const string& get_predecessor() const { return predecessor_uuid; }
- const string& get_master_zone() const { return master_zone; }
- const string& get_master_zonegroup() const { return master_zonegroup; }
- const string& get_realm() const { return realm_id; }
- const RGWPeriodMap& get_map() const { return period_map; }
- RGWPeriodConfig& get_config() { return period_config; }
- const RGWPeriodConfig& get_config() const { return period_config; }
- const std::vector<std::string>& get_sync_status() const { return sync_status; }
- rgw_pool get_pool(CephContext *cct);
- const string& get_latest_epoch_oid();
- const string& get_info_oid_prefix();
-
- void set_user_quota(RGWQuotaInfo& user_quota) {
- period_config.user_quota = user_quota;
- }
-
- void set_bucket_quota(RGWQuotaInfo& bucket_quota) {
- period_config.bucket_quota = bucket_quota;
- }
-
- void set_id(const string& id) {
- this->id = id;
- period_map.id = id;
- }
- void set_epoch(epoch_t epoch) { this->epoch = epoch; }
- void set_realm_epoch(epoch_t epoch) { realm_epoch = epoch; }
-
- void set_predecessor(const string& predecessor)
- {
- predecessor_uuid = predecessor;
- }
-
- void set_realm_id(const string& _realm_id) {
- realm_id = _realm_id;
- }
-
- int reflect();
-
- int get_zonegroup(RGWZoneGroup& zonegroup,
- const string& zonegroup_id);
-
- bool is_single_zonegroup()
- {
- return (period_map.zonegroups.size() == 1);
- }
-
- /*
- returns true if there are several zone groups with a least one zone
- */
- bool is_multi_zonegroups_with_zones()
- {
- int count = 0;
- for (const auto& zg: period_map.zonegroups) {
- if (zg.second.zones.size() > 0) {
- if (count++ > 0) {
- return true;
- }
- }
- }
- return false;
- }
-
- int get_latest_epoch(epoch_t& epoch);
- int set_latest_epoch(epoch_t epoch, bool exclusive = false,
- RGWObjVersionTracker *objv = nullptr);
- // update latest_epoch if the given epoch is higher, else return -EEXIST
- int update_latest_epoch(epoch_t epoch);
-
- int init(CephContext *_cct, RGWRados *_store, const string &period_realm_id, const string &period_realm_name = "",
- bool setup_obj = true);
- int init(CephContext *_cct, RGWRados *_store, bool setup_obj = true);
-
- int create(bool exclusive = true);
- int delete_obj();
- int store_info(bool exclusive);
- int add_zonegroup(const RGWZoneGroup& zonegroup);
-
- void fork();
- int update();
-
- // commit a staging period; only for use on master zone
- int commit(RGWRealm& realm, const RGWPeriod &current_period,
- std::ostream& error_stream, bool force_if_stale = false);
-
- void encode(bufferlist& bl) const {
- ENCODE_START(1, 1, bl);
- ::encode(id, bl);
- ::encode(epoch, bl);
- ::encode(realm_epoch, bl);
- ::encode(predecessor_uuid, bl);
- ::encode(sync_status, bl);
- ::encode(period_map, bl);
- ::encode(master_zone, bl);
- ::encode(master_zonegroup, bl);
- ::encode(period_config, bl);
- ::encode(realm_id, bl);
- ::encode(realm_name, bl);
- ENCODE_FINISH(bl);
- }
-
- void decode(bufferlist::iterator& bl) {
- DECODE_START(1, bl);
- ::decode(id, bl);
- ::decode(epoch, bl);
- ::decode(realm_epoch, bl);
- ::decode(predecessor_uuid, bl);
- ::decode(sync_status, bl);
- ::decode(period_map, bl);
- ::decode(master_zone, bl);
- ::decode(master_zonegroup, bl);
- ::decode(period_config, bl);
- ::decode(realm_id, bl);
- ::decode(realm_name, bl);
- DECODE_FINISH(bl);
- }
- void dump(Formatter *f) const;
- void decode_json(JSONObj *obj);
-
- static string get_staging_id(const string& realm_id) {
- return realm_id + ":staging";
- }
-};
-WRITE_CLASS_ENCODER(RGWPeriod)
-
class RGWDataChangesLog;
class RGWMetaSyncStatusManager;
class RGWDataSyncStatusManager;
-class RGWReplicaLogger;
class RGWCoroutinesManagerRegistry;
-
-class RGWStateLog {
- RGWRados *store;
- int num_shards;
- string module_name;
-
- void oid_str(int shard, string& oid);
- int get_shard_num(const string& object);
- string get_oid(const string& object);
- int open_ioctx(librados::IoCtx& ioctx);
-
- struct list_state {
- int cur_shard;
- int max_shard;
- string marker;
- string client_id;
- string op_id;
- string object;
-
- list_state() : cur_shard(0), max_shard(0) {}
- };
-
-protected:
- virtual bool dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) {
- return false;
- }
-
-public:
- RGWStateLog(RGWRados *_store, int _num_shards, const string& _module_name) :
- store(_store), num_shards(_num_shards), module_name(_module_name) {}
- virtual ~RGWStateLog() {}
-
- int store_entry(const string& client_id, const string& op_id, const string& object,
- uint32_t state, bufferlist *bl, uint32_t *check_state);
-
- int remove_entry(const string& client_id, const string& op_id, const string& object);
-
- void init_list_entries(const string& client_id, const string& op_id, const string& object,
- void **handle);
-
- int list_entries(void *handle, int max_entries, list<cls_statelog_entry>& entries, bool *done);
-
- void finish_list_entries(void *handle);
-
- virtual void dump_entry(const cls_statelog_entry& entry, Formatter *f);
-};
-
-/*
- * state transitions:
- *
- * unknown -> in-progress -> complete
- * -> error
- *
- * user can try setting the 'abort' state, and it can only succeed if state is
- * in-progress.
- *
- * state renewal cannot switch state (stays in the same state)
- *
- * rgw can switch from in-progress to complete
- * rgw can switch from in-progress to error
- *
- * rgw can switch from abort to cancelled
- *
- */
-
-class RGWOpState : public RGWStateLog {
-protected:
- bool dump_entry_internal(const cls_statelog_entry& entry, Formatter *f) override;
-public:
-
- enum OpState {
- OPSTATE_UNKNOWN = 0,
- OPSTATE_IN_PROGRESS = 1,
- OPSTATE_COMPLETE = 2,
- OPSTATE_ERROR = 3,
- OPSTATE_ABORT = 4,
- OPSTATE_CANCELLED = 5,
- };
-
- explicit RGWOpState(RGWRados *_store);
-
- int state_from_str(const string& s, OpState *state);
- int set_state(const string& client_id, const string& op_id, const string& object, OpState state);
- int renew_state(const string& client_id, const string& op_id, const string& object, OpState state);
-};
-
-class RGWOpStateSingleOp
-{
- RGWOpState os;
- string client_id;
- string op_id;
- string object;
-
- CephContext *cct;
-
- RGWOpState::OpState cur_state;
- ceph::real_time last_update;
-
-public:
- RGWOpStateSingleOp(RGWRados *store, const string& cid, const string& oid, const string& obj);
-
- int set_state(RGWOpState::OpState state);
- int renew_state();
-};
class RGWGetBucketStats_CB : public RefCountedObject {
protected:
class RGWGetDirHeader_CB;
class RGWGetUserHeader_CB;
-struct rgw_rados_ref {
- rgw_pool pool;
- string oid;
- string key;
- librados::IoCtx ioctx;
-};
+class RGWObjectCtx {
+ RGWRados *store;
+ RWLock lock{"RGWObjectCtx"};
+ void *s{nullptr};
-class RGWChainedCache {
+ std::map<rgw_obj, RGWObjState> objs_state;
public:
- virtual ~RGWChainedCache() {}
- virtual void chain_cb(const string& key, void *data) = 0;
- virtual void invalidate(const string& key) = 0;
- virtual void invalidate_all() = 0;
+ explicit RGWObjectCtx(RGWRados *_store) : store(_store) {}
+ explicit RGWObjectCtx(RGWRados *_store, void *_s) : store(_store), s(_s) {}
- struct Entry {
- RGWChainedCache *cache;
- const string& key;
- void *data;
-
- Entry(RGWChainedCache *_c, const string& _k, void *_d) : cache(_c), key(_k), data(_d) {}
- };
-};
-
-template <class T, class S>
-class RGWObjectCtxImpl {
- RGWRados *store;
- std::map<T, S> objs_state;
- RWLock lock;
+ // Returns the opaque pointer `s` handed to the two-argument constructor;
+ // it is nullptr when the context was built with only a store.
+ void *get_private() {
+ return s;
+ }
-public:
- RGWObjectCtxImpl(RGWRados *_store) : store(_store), lock("RGWObjectCtxImpl") {}
+ // Returns the RGWRados pointer this context was constructed with.
+ RGWRados *get_store() {
+ return store;
+ }
- S *get_state(const T& obj) {
- S *result;
- typename std::map<T, S>::iterator iter;
+ RGWObjState *get_state(const rgw_obj& obj) {
+ RGWObjState *result;
+ typename std::map<rgw_obj, RGWObjState>::iterator iter;
lock.get_read();
assert (!obj.empty());
iter = objs_state.find(obj);
return result;
}
- void set_atomic(T& obj) {
+ // Flags the state entry for `obj` as atomic, creating the entry if it does
+ // not exist yet. Takes the write lock for the duration of the update.
+ // `obj` is only read (used as a map key), so it is taken by const reference,
+ // matching set_prefetch_data() and invalidate().
+ void set_atomic(const rgw_obj& obj) {
RWLock::WLocker wl(lock);
assert (!obj.empty());
objs_state[obj].is_atomic = true;
}
- void set_prefetch_data(T& obj) {
+ void set_prefetch_data(const rgw_obj& obj) {
RWLock::WLocker wl(lock);
assert (!obj.empty());
objs_state[obj].prefetch_data = true;
}
- void invalidate(T& obj) {
+
+ void invalidate(const rgw_obj& obj) {
RWLock::WLocker wl(lock);
auto iter = objs_state.find(obj);
if (iter == objs_state.end()) {
objs_state.erase(iter);
if (is_atomic || prefetch_data) {
- auto& s = objs_state[obj];
- s.is_atomic = is_atomic;
- s.prefetch_data = prefetch_data;
+ auto& state = objs_state[obj];
+ state.is_atomic = is_atomic;
+ state.prefetch_data = prefetch_data;
}
}
};
-template<>
-void RGWObjectCtxImpl<rgw_obj, RGWObjState>::invalidate(rgw_obj& obj);
-
-template<>
-void RGWObjectCtxImpl<rgw_raw_obj, RGWRawObjState>::invalidate(rgw_raw_obj& obj);
-
-struct RGWObjectCtx {
- RGWRados *store;
- void *user_ctx;
-
- RGWObjectCtxImpl<rgw_obj, RGWObjState> obj;
- RGWObjectCtxImpl<rgw_raw_obj, RGWRawObjState> raw;
-
- explicit RGWObjectCtx(RGWRados *_store) : store(_store), user_ctx(NULL), obj(store), raw(store) { }
- RGWObjectCtx(RGWRados *_store, void *_user_ctx) : store(_store), user_ctx(_user_ctx), obj(store), raw(store) { }
-};
-
-class Finisher;
class RGWAsyncRadosProcessor;
template <class T>
uint64_t pg_ver;
tombstone_entry() = default;
- tombstone_entry(const RGWObjState& state)
+ explicit tombstone_entry(const RGWObjState& state)
: mtime(state.mtime), zone_short_id(state.zone_short_id),
pg_ver(state.pg_ver) {}
};
class RGWIndexCompletionManager;
-class RGWRados
+class RGWRados : public AdminSocketHook
{
friend class RGWGC;
friend class RGWMetaNotifier;
friend class RGWObjectExpirer;
friend class RGWMetaSyncProcessorThread;
friend class RGWDataSyncProcessorThread;
- friend class RGWStateLog;
- friend class RGWReplicaLogger;
friend class RGWReshard;
friend class RGWBucketReshard;
+ friend class RGWBucketReshardLock;
friend class BucketIndexLockGuard;
+ friend class RGWCompleteMultipart;
+
+ // Table of the four "cache ..." commands. Each row holds three strings:
+ //   [0] the command name,
+ //   [1] the command descriptor, i.e. the name followed by its argument spec
+ //       (name=...,type=...,req=...),
+ //   [2] a one-line help text.
+ // NOTE(review): presumably these rows are registered with the admin socket,
+ // since RGWRados derives from AdminSocketHook — confirm in the .cc file.
+ static constexpr const char* admin_commands[4][3] = {
+ { "cache list",
+ "cache list name=filter,type=CephString,req=false",
+ "cache list [filter_str]: list object cache, possibly matching substrings" },
+ { "cache inspect",
+ "cache inspect name=target,type=CephString,req=true",
+ "cache inspect target: print cache element" },
+ { "cache erase",
+ "cache erase name=target,type=CephString,req=true",
+ "cache erase target: erase element from cache" },
+ { "cache zap",
+ "cache zap",
+ "cache zap: erase all elements from cache" }
+ };
/** Open the pool used as root for this gateway */
int open_root_pool_ctx();
int open_objexp_pool_ctx();
int open_reshard_pool_ctx();
- int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx);
+ int open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx,
+ bool mostly_omap);
int open_bucket_index_ctx(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx);
int open_bucket_index(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx, string& bucket_oid);
int open_bucket_index_base(const RGWBucketInfo& bucket_info, librados::IoCtx& index_ctx,
std::atomic<int64_t> max_req_id = { 0 };
Mutex lock;
- Mutex watchers_lock;
SafeTimer *timer;
RGWGC *gc;
RGWMetaNotifier *meta_notifier;
RGWDataNotifier *data_notifier;
RGWMetaSyncProcessorThread *meta_sync_processor_thread;
+ RGWSyncTraceManager *sync_tracer = nullptr;
map<string, RGWDataSyncProcessorThread *> data_sync_processor_threads;
+ boost::optional<rgw::BucketTrimManager> bucket_trim;
RGWSyncLogTrimThread *sync_log_trimmer{nullptr};
Mutex meta_sync_thread_lock;
Mutex data_sync_thread_lock;
- int num_watchers;
- RGWWatcher **watchers;
- std::set<int> watchers_set;
librados::IoCtx root_pool_ctx; // .rgw
- librados::IoCtx control_pool_ctx; // .rgw.control
- bool watch_initialized;
+
+ double inject_notify_timeout_probability = 0;
+ unsigned max_notify_retries = 0;
friend class RGWWatcher;
int get_olh_target_state(RGWObjectCtx& rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
RGWObjState *olh_state, RGWObjState **target_state);
- int get_system_obj_state_impl(RGWObjectCtx *rctx, rgw_raw_obj& obj, RGWRawObjState **state, RGWObjVersionTracker *objv_tracker);
int get_obj_state_impl(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWObjState **state,
bool follow_olh, bool assume_noent = false);
int append_atomic_test(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
librados::ObjectOperation& op, RGWObjState **state);
+ int append_atomic_test(const RGWObjState* astate, librados::ObjectOperation& op);
int update_placement_map();
int store_bucket_info(RGWBucketInfo& info, map<string, bufferlist> *pattrs, RGWObjVersionTracker *objv_tracker, bool exclusive);
protected:
CephContext *cct;
- std::vector<librados::Rados> rados;
- uint32_t next_rados_handle;
- RWLock handle_lock;
- std::map<pthread_t, int> rados_map;
+ librados::Rados rados;
using RGWChainedCacheImpl_bucket_info_entry = RGWChainedCacheImpl<bucket_info_entry>;
RGWChainedCacheImpl_bucket_info_entry *binfo_cache;
bool pools_initialized;
- string trans_id_suffix;
-
RGWQuotaHandler *quota_handler;
- Finisher *finisher;
-
RGWCoroutinesManagerRegistry *cr_registry;
- RGWSyncModulesManager *sync_modules_manager{nullptr};
RGWSyncModuleInstanceRef sync_module;
bool writeable_zone{false};
- RGWZoneGroup zonegroup;
- RGWZone zone_public_config; /* external zone params, e.g., entrypoints, log flags, etc. */
- RGWZoneParams zone_params; /* internal zone params, e.g., rados pools */
- uint32_t zone_short_id;
-
- RGWPeriod current_period;
-
RGWIndexCompletionManager *index_completion_manager{nullptr};
+
+ bool use_cache{false};
public:
- RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
+ RGWRados(): lock("rados_timer_lock"), timer(NULL),
gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL),
data_notifier(NULL), meta_sync_processor_thread(NULL),
meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
- num_watchers(0), watchers(NULL),
- watch_initialized(false),
bucket_id_lock("rados_bucket_id"),
bucket_index_max_shards(0),
max_bucket_id(0), cct(NULL),
- next_rados_handle(0),
- handle_lock("rados_handle_lock"),
binfo_cache(NULL), obj_tombstone_cache(nullptr),
pools_initialized(false),
quota_handler(NULL),
- finisher(NULL),
cr_registry(NULL),
- zone_short_id(0),
- rest_master_conn(NULL),
- meta_mgr(NULL), data_log(NULL), reshard(NULL) {}
-
- uint64_t get_new_req_id() {
- return ++max_req_id;
- }
-
- librados::IoCtx* get_lc_pool_ctx() {
- return &lc_pool_ctx;
- }
- void set_context(CephContext *_cct) {
- cct = _cct;
- }
-
- /**
- * AmazonS3 errors contain a HostId string, but is an opaque base64 blob; we
- * try to be more transparent. This has a wrapper so we can update it when zonegroup/zone are changed.
- */
- void init_host_id() {
- /* uint64_t needs 16, two '-' separators and a trailing null */
- const string& zone_name = get_zone().name;
- const string& zonegroup_name = zonegroup.get_name();
- char charbuf[16 + zone_name.size() + zonegroup_name.size() + 2 + 1];
- snprintf(charbuf, sizeof(charbuf), "%llx-%s-%s", (unsigned long long)instance_id(), zone_name.c_str(), zonegroup_name.c_str());
- string s(charbuf);
- host_id = s;
- }
-
- string host_id;
-
- RGWRealm realm;
-
- RGWRESTConn *rest_master_conn;
- map<string, RGWRESTConn *> zone_conn_map;
- map<string, RGWRESTConn *> zone_data_sync_from_map;
- map<string, RGWRESTConn *> zone_data_notify_to_map;
- map<string, RGWRESTConn *> zonegroup_conn_map;
-
- map<string, string> zone_id_by_name;
- map<string, RGWZone> zone_by_id;
-
- RGWRESTConn *get_zone_conn_by_id(const string& id) {
- auto citer = zone_conn_map.find(id);
- if (citer == zone_conn_map.end()) {
- return NULL;
- }
+ meta_mgr(NULL), data_log(NULL), reshard(NULL) {}
- return citer->second;
+ // Stores `status` in the `use_cache` flag (default false) and returns *this
+ // so that setter calls can be chained.
+ RGWRados& set_use_cache(bool status) {
+ use_cache = status;
+ return *this;
}
- RGWRESTConn *get_zone_conn_by_name(const string& name) {
- auto i = zone_id_by_name.find(name);
- if (i == zone_id_by_name.end()) {
- return NULL;
- }
-
- return get_zone_conn_by_id(i->second);
+ // Returns the raw `lc` pointer held by this store; the constructor
+ // initializes it to NULL, so callers may receive a null pointer.
+ RGWLC *get_lc() {
+ return lc;
}
- bool find_zone_id_by_name(const string& name, string *id) {
- auto i = zone_id_by_name.find(name);
- if (i == zone_id_by_name.end()) {
- return false;
- }
- *id = i->second;
- return true;
+ // Stores the flag in `use_gc_thread` (constructed as false) and returns
+ // *this for chaining.
+ RGWRados& set_run_gc_thread(bool _use_gc_thread) {
+ use_gc_thread = _use_gc_thread;
+ return *this;
}
- int get_zonegroup(const string& id, RGWZoneGroup& zonegroup) {
- int ret = 0;
- if (id == get_zonegroup().get_id()) {
- zonegroup = get_zonegroup();
- } else if (!current_period.get_id().empty()) {
- ret = current_period.get_zonegroup(zonegroup, id);
- }
- return ret;
+ // Stores the flag in `use_lc_thread` (constructed as false) and returns
+ // *this for chaining.
+ RGWRados& set_run_lc_thread(bool _use_lc_thread) {
+ use_lc_thread = _use_lc_thread;
+ return *this;
}
- RGWRealm& get_realm() {
- return realm;
+ // Stores the flag in `quota_threads` (constructed as false) and returns
+ // *this for chaining.
+ RGWRados& set_run_quota_threads(bool _run_quota_threads) {
+ quota_threads = _run_quota_threads;
+ return *this;
}
- RGWZoneParams& get_zone_params() { return zone_params; }
- RGWZoneGroup& get_zonegroup() {
- return zonegroup;
- }
- RGWZone& get_zone() {
- return zone_public_config;
+ // Stores the flag in `run_sync_thread` (constructed as false) and returns
+ // *this for chaining.
+ RGWRados& set_run_sync_thread(bool _run_sync_thread) {
+ run_sync_thread = _run_sync_thread;
+ return *this;
}
- bool zone_is_writeable() {
- return writeable_zone && !get_zone().is_read_only();
+ // Stores the flag in `run_reshard_thread` (constructed as false) and returns
+ // *this for chaining.
+ RGWRados& set_run_reshard_thread(bool _run_reshard_thread) {
+ run_reshard_thread = _run_reshard_thread;
+ return *this;
}
- uint32_t get_zone_short_id() const {
- return zone_short_id;
+ // Pre-increments `max_req_id` (a std::atomic<int64_t> starting at 0) and
+ // returns the new value converted to uint64_t.
+ uint64_t get_new_req_id() {
+ return ++max_req_id;
}
- bool zone_syncs_from(RGWZone& target_zone, RGWZone& source_zone);
-
- const RGWQuotaInfo& get_bucket_quota() {
- return current_period.get_config().bucket_quota;
+ // Returns the address of the `lc_pool_ctx` member; the pointer refers to
+ // storage inside this RGWRados object.
+ librados::IoCtx* get_lc_pool_ctx() {
+ return &lc_pool_ctx;
}
-
- const RGWQuotaInfo& get_user_quota() {
- return current_period.get_config().user_quota;
+ // Stores the given CephContext pointer in `cct` (the value later returned
+ // by ctx()).
+ void set_context(CephContext *_cct) {
+ cct = _cct;
}
- const string& get_current_period_id() {
- return current_period.get_id();
- }
+ RGWServices svc;
- bool has_zonegroup_api(const std::string& api) const {
- if (!current_period.get_id().empty()) {
- const auto& zonegroups_by_api = current_period.get_map().zonegroups_by_api;
- if (zonegroups_by_api.find(api) != zonegroups_by_api.end())
- return true;
- }
- return false;
- }
+ /**
+ * AmazonS3 errors contain a HostId string, but it is an opaque base64 blob; we
+ * try to be more transparent. This has a wrapper so we can update it when zonegroup/zone are changed.
+ */
+ string host_id;
// pulls missing periods for period_history
std::unique_ptr<RGWPeriodPuller> period_puller;
tombstone_cache_t *get_tombstone_cache() {
return obj_tombstone_cache;
}
-
- RGWSyncModulesManager *get_sync_modules_manager() {
- return sync_modules_manager;
- }
const RGWSyncModuleInstanceRef& get_sync_module() {
return sync_module;
}
+ // Returns the raw `sync_tracer` pointer; the member defaults to nullptr,
+ // so callers may receive a null pointer.
+ RGWSyncTraceManager *get_sync_tracer() {
+ return sync_tracer;
+ }
int get_required_alignment(const rgw_pool& pool, uint64_t *alignment);
- int get_max_chunk_size(const rgw_pool& pool, uint64_t *max_chunk_size);
- int get_max_chunk_size(const string& placement_rule, const rgw_obj& obj, uint64_t *max_chunk_size);
+ void get_max_aligned_size(uint64_t size, uint64_t alignment, uint64_t *max_size);
+ int get_max_chunk_size(const rgw_pool& pool, uint64_t *max_chunk_size, uint64_t *palignment = nullptr);
+ int get_max_chunk_size(const rgw_placement_rule& placement_rule, const rgw_obj& obj, uint64_t *max_chunk_size, uint64_t *palignment = nullptr);
uint32_t get_max_bucket_shards() {
return rgw_shards_max();
}
+
int get_raw_obj_ref(const rgw_raw_obj& obj, rgw_rados_ref *ref);
+ int list_raw_objects_init(const rgw_pool& pool, const string& marker, RGWListRawObjsCtx *ctx);
+ int list_raw_objects_next(const string& prefix_filter, int max,
+ RGWListRawObjsCtx& ctx, list<string>& oids,
+ bool *is_truncated);
int list_raw_objects(const rgw_pool& pool, const string& prefix_filter, int max,
RGWListRawObjsCtx& ctx, list<string>& oids,
bool *is_truncated);
-
- int list_raw_prefixed_objs(const rgw_pool& pool, const string& prefix, list<string>& result);
- int list_zonegroups(list<string>& zonegroups);
- int list_regions(list<string>& regions);
- int list_zones(list<string>& zones);
- int list_realms(list<string>& realms);
- int list_periods(list<string>& periods);
- int list_periods(const string& current_period, list<string>& periods);
- void tick();
+ string list_raw_objs_get_cursor(RGWListRawObjsCtx& ctx);
CephContext *ctx() { return cct; }
/** do all necessary setup of the storage device */
- int initialize(CephContext *_cct, bool _use_gc_thread, bool _use_lc_thread, bool _quota_threads, bool _run_sync_thread, bool _run_reshard_thread) {
+ // Stores `_cct` via set_context() and returns the result of the
+ // argument-less initialize(). The thread flags are no longer parameters
+ // here; they are expected to be configured through the chainable
+ // set_run_*() setters before this call.
+ int initialize(CephContext *_cct) {
set_context(_cct);
- use_gc_thread = _use_gc_thread;
- use_lc_thread = _use_lc_thread;
- quota_threads = _quota_threads;
- run_sync_thread = _run_sync_thread;
- run_reshard_thread = _run_reshard_thread;
return initialize();
}
/** Initialize the RADOS instance and prepare to do other ops */
- virtual int init_rados();
- int init_zg_from_period(bool *initialized);
- int init_zg_from_local(bool *creating_defaults);
+ int init_svc(bool raw);
+ int init_rados();
int init_complete();
- int replace_region_with_zonegroup();
- int convert_regionmap();
int initialize();
void finalize();
int register_to_service_map(const string& daemon_type, const map<string, string>& meta);
-
- void schedule_context(Context *c);
-
- /** set up a bucket listing. handle is filled in. */
- int list_buckets_init(RGWAccessHandle *handle);
- /**
- * get the next bucket in the listing. obj is filled in,
- * handle is updated.
- */
- int list_buckets_next(rgw_bucket_dir_entry& obj, RGWAccessHandle *handle);
+ int update_service_map(std::map<std::string, std::string>&& status);
/// list logs
int log_list_init(const string& prefix, RGWAccessHandle *handle);
// log bandwidth info
int log_usage(map<rgw_user_bucket, RGWUsageBatch>& usage_info);
- int read_usage(const rgw_user& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
- bool *is_truncated, RGWUsageIter& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage);
- int trim_usage(rgw_user& user, uint64_t start_epoch, uint64_t end_epoch);
+ int read_usage(const rgw_user& user, const string& bucket_name, uint64_t start_epoch, uint64_t end_epoch,
+ uint32_t max_entries, bool *is_truncated, RGWUsageIter& read_iter, map<rgw_user_bucket,
+ rgw_usage_log_entry>& usage);
+ int trim_usage(const rgw_user& user, const string& bucket_name, uint64_t start_epoch, uint64_t end_epoch);
+ int clear_usage();
int create_pool(const rgw_pool& pool);
- /**
- * create a bucket with name bucket and the given list of attrs
- * returns 0 on success, -ERR# otherwise.
- */
int init_bucket_index(RGWBucketInfo& bucket_info, int num_shards);
- int select_bucket_placement(RGWUserInfo& user_info, const string& zonegroup_id, const string& rule,
- string *pselected_rule_name, RGWZonePlacementInfo *rule_info);
- int select_legacy_bucket_placement(RGWZonePlacementInfo *rule_info);
- int select_new_bucket_location(RGWUserInfo& user_info, const string& zonegroup_id, const string& rule,
- string *pselected_rule_name, RGWZonePlacementInfo *rule_info);
- int select_bucket_location_by_rule(const string& location_rule, RGWZonePlacementInfo *rule_info);
+ int clean_bucket_index(RGWBucketInfo& bucket_info, int num_shards);
void create_bucket_id(string *bucket_id);
- bool get_obj_data_pool(const string& placement_rule, const rgw_obj& obj, rgw_pool *pool);
- bool obj_to_raw(const string& placement_rule, const rgw_obj& obj, rgw_raw_obj *raw_obj);
+ bool get_obj_data_pool(const rgw_placement_rule& placement_rule, const rgw_obj& obj, rgw_pool *pool);
+ bool obj_to_raw(const rgw_placement_rule& placement_rule, const rgw_obj& obj, rgw_raw_obj *raw_obj);
- int create_bucket(RGWUserInfo& owner, rgw_bucket& bucket,
+ int create_bucket(const RGWUserInfo& owner, rgw_bucket& bucket,
const string& zonegroup_id,
- const string& placement_rule,
+ const rgw_placement_rule& placement_rule,
const string& swift_ver_location,
const RGWQuotaInfo * pquota_info,
map<std::string,bufferlist>& attrs,
rgw_bucket *master_bucket,
uint32_t *master_num_shards,
bool exclusive = true);
- int add_bucket_placement(const rgw_pool& new_pool);
- int remove_bucket_placement(const rgw_pool& new_pool);
- int list_placement_set(set<rgw_pool>& names);
- int create_pools(vector<rgw_pool>& pools, vector<int>& retcodes);
RGWCoroutinesManagerRegistry *get_cr_registry() { return cr_registry; }
- class SystemObject {
- RGWRados *store;
- RGWObjectCtx& ctx;
- rgw_raw_obj obj;
-
- RGWObjState *state;
-
- protected:
- int get_state(RGWRawObjState **pstate, RGWObjVersionTracker *objv_tracker);
-
- public:
- SystemObject(RGWRados *_store, RGWObjectCtx& _ctx, rgw_raw_obj& _obj) : store(_store), ctx(_ctx), obj(_obj), state(NULL) {}
-
- void invalidate_state();
-
- RGWRados *get_store() { return store; }
- rgw_raw_obj& get_obj() { return obj; }
- RGWObjectCtx& get_ctx() { return ctx; }
-
- struct Read {
- RGWRados::SystemObject *source;
-
- struct GetObjState {
- rgw_rados_ref ref;
- bool has_ref{false};
- uint64_t last_ver{0};
-
- GetObjState() {}
-
- int get_ref(RGWRados *store, rgw_raw_obj& obj, rgw_rados_ref **pref);
- } state;
-
- struct StatParams {
- ceph::real_time *lastmod;
- uint64_t *obj_size;
- map<string, bufferlist> *attrs;
-
- StatParams() : lastmod(NULL), obj_size(NULL), attrs(NULL) {}
- } stat_params;
-
- struct ReadParams {
- rgw_cache_entry_info *cache_info{nullptr};
- map<string, bufferlist> *attrs;
-
- ReadParams() : attrs(NULL) {}
- } read_params;
-
- explicit Read(RGWRados::SystemObject *_source) : source(_source) {}
-
- int stat(RGWObjVersionTracker *objv_tracker);
- int read(int64_t ofs, int64_t end, bufferlist& bl, RGWObjVersionTracker *objv_tracker);
- int get_attr(const char *name, bufferlist& dest);
- };
- };
-
struct BucketShard {
RGWRados *store;
rgw_bucket bucket;
string bucket_obj;
explicit BucketShard(RGWRados *_store) : store(_store), shard_id(-1) {}
- int init(const rgw_bucket& _bucket, const rgw_obj& obj);
- int init(const rgw_bucket& _bucket, int sid);
+ int init(const rgw_bucket& _bucket, const rgw_obj& obj, RGWBucketInfo* out);
+ int init(const rgw_bucket& _bucket, int sid, RGWBucketInfo* out);
+ int init(const RGWBucketInfo& bucket_info, const rgw_obj& obj);
+ int init(const RGWBucketInfo& bucket_info, int sid);
};
class Object {
void invalidate_state();
int prepare_atomic_modification(librados::ObjectWriteOperation& op, bool reset_obj, const string *ptag,
- const char *ifmatch, const char *ifnomatch, bool removal_op);
+ const char *ifmatch, const char *ifnomatch, bool removal_op, bool modify_tail);
int complete_atomic_modification();
public:
int get_bucket_shard(BucketShard **pbs) {
if (!bs_initialized) {
- int r = bs.init(bucket_info.bucket, obj);
+ int r =
+ bs.init(bucket_info.bucket, obj, nullptr /* no RGWBucketInfo */);
if (r < 0) {
return r;
}
RGWRados::Object *source;
struct GetObjState {
- librados::IoCtx io_ctx;
+ map<rgw_pool, librados::IoCtx> io_ctxs;
+ rgw_pool cur_pool;
+ librados::IoCtx *cur_ioctx{nullptr};
rgw_obj obj;
rgw_raw_obj head_obj;
} state;
int flags;
const char *if_match;
const char *if_nomatch;
- uint64_t olh_epoch;
+ std::optional<uint64_t> olh_epoch;
ceph::real_time delete_at;
bool canceled;
const string *user_data;
rgw_zone_set *zones_trace;
+ bool modify_tail;
+ bool completeMultipart;
+ bool appendable;
MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
- remove_objs(NULL), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
- if_match(NULL), if_nomatch(NULL), olh_epoch(0), canceled(false), user_data(nullptr), zones_trace(nullptr) {}
+ remove_objs(NULL), category(RGWObjCategory::Main), flags(0),
+ if_match(NULL), if_nomatch(NULL), canceled(false), user_data(nullptr), zones_trace(nullptr),
+ modify_tail(false), completeMultipart(false), appendable(false) {}
} meta;
explicit Write(RGWRados::Object *_target) : target(_target) {}
int _do_write_meta(uint64_t size, uint64_t accounted_size,
map<std::string, bufferlist>& attrs,
- bool assume_noent,
+ bool modify_tail, bool assume_noent,
void *index_op);
int write_meta(uint64_t size, uint64_t accounted_size,
map<std::string, bufferlist>& attrs);
int write_data(const char *data, uint64_t ofs, uint64_t len, bool exclusive);
+ /* Returns the private pointer held by the target object's context,
+  * viewed as a read-only request state.
+  * NOTE(review): presumably that private pointer is always a req_state
+  * (or null) -- confirm against the code that sets it. A named cast is
+  * used so the conversion is greppable and cannot drop constness. */
+ const req_state* get_req_state() {
+ return static_cast<const req_state *>(target->get_ctx().get_private());
+ }
};
struct Delete {
rgw_zone_set *zones_trace{nullptr};
int init_bs() {
- int r = bs.init(target->get_bucket(), obj);
+ int r =
+ bs.init(target->get_bucket(), obj, nullptr /* no RGWBucketInfo */);
if (r < 0) {
return r;
}
int complete(int64_t poolid, uint64_t epoch, uint64_t size,
uint64_t accounted_size, ceph::real_time& ut,
const string& etag, const string& content_type,
+ const string& storage_class,
bufferlist *acl_bl, RGWObjCategory category,
- list<rgw_obj_index_key> *remove_objs, const string *user_data = nullptr);
+ list<rgw_obj_index_key> *remove_objs, const string *user_data = nullptr, bool appendable = false);
int complete_del(int64_t poolid, uint64_t epoch,
ceph::real_time& removed_mtime, /* mtime of removed object */
list<rgw_obj_index_key> *remove_objs);
const string *get_optag() { return &optag; }
bool is_prepared() { return prepared; }
- };
+ }; // class UpdateIndex
+
+ class List {
+ protected:
- struct List {
RGWRados::Bucket *target;
rgw_obj_key next_marker;
+ int list_objects_ordered(int64_t max,
+ vector<rgw_bucket_dir_entry> *result,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated);
+ int list_objects_unordered(int64_t max,
+ vector<rgw_bucket_dir_entry> *result,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated);
+
+ public:
+
struct Params {
string prefix;
string delim;
bool enforce_ns;
RGWAccessListFilter *filter;
bool list_versions;
-
- Params() : enforce_ns(true), filter(NULL), list_versions(false) {}
+ bool allow_unordered;
+
+ Params() :
+ enforce_ns(true),
+ filter(NULL),
+ list_versions(false),
+ allow_unordered(false)
+ {}
} params;
- public:
explicit List(RGWRados::Bucket *_target) : target(_target) {}
- int list_objects(int max, vector<rgw_bucket_dir_entry> *result, map<string, bool> *common_prefixes, bool *is_truncated);
+ /* Dispatches to list_objects_unordered() when params.allow_unordered
+  * is set and to list_objects_ordered() otherwise; every argument and
+  * the callee's return value are passed through unchanged. */
+ int list_objects(int64_t max,
+ vector<rgw_bucket_dir_entry> *result,
+ map<string, bool> *common_prefixes,
+ bool *is_truncated) {
+ if (!params.allow_unordered) {
+ return list_objects_ordered(max, result, common_prefixes,
+ is_truncated);
+ }
+ return list_objects_unordered(max, result, common_prefixes,
+ is_truncated);
+ }
rgw_obj_key& get_next_marker() {
return next_marker;
}
- };
- };
-
- /** Write/overwrite an object to the bucket storage. */
- virtual int put_system_obj_impl(rgw_raw_obj& obj, uint64_t size, ceph::real_time *mtime,
- map<std::string, bufferlist>& attrs, int flags,
- bufferlist& data,
- RGWObjVersionTracker *objv_tracker,
- ceph::real_time set_mtime /* 0 for don't set */);
-
- virtual int put_system_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
- off_t ofs, bool exclusive,
- RGWObjVersionTracker *objv_tracker = nullptr);
- int aio_put_obj_data(void *ctx, rgw_raw_obj& obj, bufferlist& bl,
- off_t ofs, bool exclusive, void **handle);
-
- int put_system_obj(void *ctx, rgw_raw_obj& obj, const char *data, size_t len, bool exclusive,
- ceph::real_time *mtime, map<std::string, bufferlist>& attrs, RGWObjVersionTracker *objv_tracker,
- ceph::real_time set_mtime) {
- bufferlist bl;
- bl.append(data, len);
- int flags = PUT_OBJ_CREATE;
- if (exclusive)
- flags |= PUT_OBJ_EXCL;
-
- return put_system_obj_impl(obj, len, mtime, attrs, flags, bl, objv_tracker, set_mtime);
- }
- int aio_wait(void *handle);
- bool aio_completed(void *handle);
+ }; // class List
+ }; // class Bucket
int on_last_entry_in_listing(RGWBucketInfo& bucket_info,
const std::string& obj_prefix,
const rgw_user& user, /* in */
RGWBucketInfo& bucket_info, /* in */
rgw_obj& obj); /* in */
- int swift_versioning_restore(RGWObjectCtx& obj_ctx, /* in/out */
+ int swift_versioning_restore(RGWSysObjectCtx& sysobj_ctx,
+ RGWObjectCtx& obj_ctx, /* in/out */
const rgw_user& user, /* in */
RGWBucketInfo& bucket_info, /* in */
rgw_obj& obj, /* in */
ATTRSMOD_MERGE = 2
};
- int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj);
+ int rewrite_obj(RGWBucketInfo& dest_bucket_info, const rgw_obj& obj);
int stat_remote_obj(RGWObjectCtx& obj_ctx,
const rgw_user& user_id,
- const string& client_id,
req_info *info,
const string& source_zone,
rgw_obj& src_obj,
const char *if_match,
const char *if_nomatch,
map<string, bufferlist> *pattrs,
+ map<string, string> *pheaders,
string *version_id,
string *ptag,
string *petag);
+ /* Declaration only; the definition is outside this header chunk.
+  * The two trailing pointer parameters, zones_trace and
+  * bytes_transferred, are optional and both default to a null pointer. */
 int fetch_remote_obj(RGWObjectCtx& obj_ctx,
 const rgw_user& user_id,
- const string& client_id,
- const string& op_id,
- bool record_op_state,
 req_info *info,
 const string& source_zone,
- rgw_obj& dest_obj,
- rgw_obj& src_obj,
+ const rgw_obj& dest_obj,
+ const rgw_obj& src_obj,
 RGWBucketInfo& dest_bucket_info,
 RGWBucketInfo& src_bucket_info,
+ std::optional<rgw_placement_rule> dest_placement,
 ceph::real_time *src_mtime,
 ceph::real_time *mtime,
 const ceph::real_time *mod_ptr,
 bool copy_if_newer,
 map<string, bufferlist>& attrs,
 RGWObjCategory category,
- uint64_t olh_epoch,
+ std::optional<uint64_t> olh_epoch,
 ceph::real_time delete_at,
- string *version_id,
 string *ptag,
- ceph::buffer::list *petag,
+ string *petag,
 void (*progress_cb)(off_t, void *),
 void *progress_data,
- rgw_zone_set *zones_trace= nullptr);
+ rgw_zone_set *zones_trace= nullptr,
+ std::optional<uint64_t>* bytes_transferred = nullptr);
/**
* Copy an object.
* dest_obj: the object to copy into
*/
int copy_obj(RGWObjectCtx& obj_ctx,
const rgw_user& user_id,
- const string& client_id,
- const string& op_id,
req_info *info,
const string& source_zone,
rgw_obj& dest_obj,
rgw_obj& src_obj,
RGWBucketInfo& dest_bucket_info,
RGWBucketInfo& src_bucket_info,
+ const rgw_placement_rule& dest_placement,
ceph::real_time *src_mtime,
ceph::real_time *mtime,
const ceph::real_time *mod_ptr,
ceph::real_time delete_at,
string *version_id,
string *ptag,
- ceph::buffer::list *petag,
+ string *petag,
void (*progress_cb)(off_t, void *),
void *progress_data);
int copy_obj_data(RGWObjectCtx& obj_ctx,
RGWBucketInfo& dest_bucket_info,
+ const rgw_placement_rule& dest_placement,
RGWRados::Object::Read& read_op, off_t end,
- rgw_obj& dest_obj,
- rgw_obj& src_obj,
- uint64_t max_chunk_size,
+ const rgw_obj& dest_obj,
ceph::real_time *mtime,
ceph::real_time set_mtime,
map<string, bufferlist>& attrs,
- RGWObjCategory category,
uint64_t olh_epoch,
ceph::real_time delete_at,
- string *version_id,
- string *ptag,
- ceph::buffer::list *petag);
+ string *petag);
+ int transition_obj(RGWObjectCtx& obj_ctx,
+ RGWBucketInfo& bucket_info,
+ rgw_obj& obj,
+ const rgw_placement_rule& placement_rule,
+ const real_time& mtime,
+ uint64_t olh_epoch);
+
int check_bucket_empty(RGWBucketInfo& bucket_info);
/**
*/
int delete_bucket(RGWBucketInfo& bucket_info, RGWObjVersionTracker& objv_tracker, bool check_empty = true);
- bool is_meta_master();
-
- /**
- * Check to see if the bucket metadata is synced
- */
- bool is_syncing_bucket_meta(const rgw_bucket& bucket);
void wakeup_meta_sync_shards(set<int>& shard_ids);
void wakeup_data_sync_shards(const string& source_zone, map<int, set<string> >& shard_ids);
const ceph::real_time& expiration_time = ceph::real_time(),
rgw_zone_set *zones_trace = nullptr);
- /** Delete a raw object.*/
int delete_raw_obj(const rgw_raw_obj& obj);
- /* Delete a system object */
- virtual int delete_system_obj(rgw_raw_obj& src_obj, RGWObjVersionTracker *objv_tracker = NULL);
-
/** Remove an object from the bucket index */
- int delete_obj_index(const rgw_obj& obj);
-
- /**
- * Get an attribute for a system object.
- * obj: the object to get attr
- * name: name of the attr to retrieve
- * dest: bufferlist to store the result in
- * Returns: 0 on success, -ERR# otherwise.
- */
- virtual int system_obj_get_attr(rgw_raw_obj& obj, const char *name, bufferlist& dest);
-
- int system_obj_set_attr(void *ctx, rgw_raw_obj& obj, const char *name, bufferlist& bl,
- RGWObjVersionTracker *objv_tracker);
- virtual int system_obj_set_attrs(void *ctx, rgw_raw_obj& obj,
- map<string, bufferlist>& attrs,
- map<string, bufferlist>* rmattrs,
- RGWObjVersionTracker *objv_tracker);
+ int delete_obj_index(const rgw_obj& obj, ceph::real_time mtime);
/**
* Set an attr on an object.
map<string, bufferlist>& attrs,
map<string, bufferlist>* rmattrs);
- int get_system_obj_state(RGWObjectCtx *rctx, rgw_raw_obj& obj, RGWRawObjState **state, RGWObjVersionTracker *objv_tracker);
int get_obj_state(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWObjState **state,
bool follow_olh, bool assume_noent = false);
int get_obj_state(RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWObjState **state) {
return get_obj_state(rctx, bucket_info, obj, state, true);
}
- virtual int stat_system_obj(RGWObjectCtx& obj_ctx,
- RGWRados::SystemObject::Read::GetObjState& state,
- rgw_raw_obj& obj,
- map<string, bufferlist> *attrs,
- ceph::real_time *lastmod,
- uint64_t *obj_size,
- RGWObjVersionTracker *objv_tracker);
-
- virtual int get_system_obj(RGWObjectCtx& obj_ctx, RGWRados::SystemObject::Read::GetObjState& read_state,
- RGWObjVersionTracker *objv_tracker, rgw_raw_obj& obj,
- bufferlist& bl, off_t ofs, off_t end,
- map<string, bufferlist> *attrs,
- rgw_cache_entry_info *cache_info);
-
- virtual void register_chained_cache(RGWChainedCache *cache) {}
- virtual bool chain_cache_entry(list<rgw_cache_entry_info *>& cache_info_entries, RGWChainedCache::Entry *chained_entry) { return false; }
+ using iterate_obj_cb = int (*)(const rgw_raw_obj&, off_t, off_t,
+ off_t, bool, RGWObjState*, void*);
- int iterate_obj(RGWObjectCtx& ctx,
- const RGWBucketInfo& bucket_info, const rgw_obj& obj,
- off_t ofs, off_t end,
- uint64_t max_chunk_size,
- int (*iterate_obj_cb)(const RGWBucketInfo& bucket_info, const rgw_obj& obj, const rgw_raw_obj&, off_t, off_t, off_t, bool, RGWObjState *, void *),
- void *arg);
+ int iterate_obj(RGWObjectCtx& ctx, const RGWBucketInfo& bucket_info,
+ const rgw_obj& obj, off_t ofs, off_t end,
+ uint64_t max_chunk_size, iterate_obj_cb cb, void *arg);
int flush_read_list(struct get_obj_data *d);
- int get_obj_iterate_cb(RGWObjectCtx *ctx, RGWObjState *astate,
- const RGWBucketInfo& bucket_info, const rgw_obj& obj,
- const rgw_raw_obj& read_obj,
- off_t obj_ofs, off_t read_ofs, off_t len,
- bool is_head_obj, void *arg);
+ int get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs,
+ off_t read_ofs, off_t len, bool is_head_obj,
+ RGWObjState *astate, void *arg);
void get_obj_aio_completion_cb(librados::completion_t cb, void *arg);
* a simple object read without keeping state
*/
- virtual int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, ceph::real_time *pmtime, uint64_t *epoch,
- map<string, bufferlist> *attrs, bufferlist *first_chunk,
- RGWObjVersionTracker *objv_tracker);
+ int raw_obj_stat(rgw_raw_obj& obj, uint64_t *psize, ceph::real_time *pmtime, uint64_t *epoch,
+ map<string, bufferlist> *attrs, bufferlist *first_chunk,
+ RGWObjVersionTracker *objv_tracker);
int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectWriteOperation *op);
int obj_operate(const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::ObjectReadOperation *op);
- int guard_reshard(BucketShard *bs, const rgw_obj& obj_instance, std::function<int(BucketShard *)> call);
- int block_while_resharding(RGWRados::BucketShard *bs, string *new_bucket_id);
+ int guard_reshard(BucketShard *bs,
+ const rgw_obj& obj_instance,
+ const RGWBucketInfo& bucket_info,
+ std::function<int(BucketShard *)> call);
+ int block_while_resharding(RGWRados::BucketShard *bs,
+ string *new_bucket_id,
+ const RGWBucketInfo& bucket_info,
+ optional_yield y);
void bucket_index_guard_olh_op(RGWObjState& olh_state, librados::ObjectOperation& op);
int olh_init_modification(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag);
const rgw_obj& obj_instance, bool delete_marker,
const string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch,
- ceph::real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace = nullptr);
+ ceph::real_time unmod_since, bool high_precision_time,
+ rgw_zone_set *zones_trace = nullptr,
+ bool log_data_change = false);
int bucket_index_unlink_instance(const RGWBucketInfo& bucket_info, const rgw_obj& obj_instance, const string& op_tag, const string& olh_tag, uint64_t olh_epoch, rgw_zone_set *zones_trace = nullptr);
int bucket_index_read_olh_log(const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& obj_instance, uint64_t ver_marker,
map<uint64_t, vector<rgw_bucket_olh_log_entry> > *log, bool *is_truncated);
uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr);
int update_olh(RGWObjectCtx& obj_ctx, RGWObjState *state, const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr);
int set_olh(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj, bool delete_marker, rgw_bucket_dir_entry_meta *meta,
- uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, rgw_zone_set *zones_trace = nullptr);
+ uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time,
+ rgw_zone_set *zones_trace = nullptr, bool log_data_change = false);
+ int repair_olh(RGWObjState* state, const RGWBucketInfo& bucket_info,
+ const rgw_obj& obj);
int unlink_obj_instance(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
uint64_t olh_epoch, rgw_zone_set *zones_trace = nullptr);
int follow_olh(const RGWBucketInfo& bucket_info, RGWObjectCtx& ctx, RGWObjState *state, const rgw_obj& olh_obj, rgw_obj *target);
int get_olh(const RGWBucketInfo& bucket_info, const rgw_obj& obj, RGWOLHInfo *olh);
+ void gen_rand_obj_instance_name(rgw_obj_key *target_key);
void gen_rand_obj_instance_name(rgw_obj *target);
- int omap_get_vals(rgw_raw_obj& obj, bufferlist& header, const std::string& marker, uint64_t count, std::map<string, bufferlist>& m);
- int omap_get_all(rgw_raw_obj& obj, bufferlist& header, std::map<string, bufferlist>& m);
- int omap_set(rgw_raw_obj& obj, const std::string& key, bufferlist& bl);
- int omap_set(rgw_raw_obj& obj, map<std::string, bufferlist>& m);
- int omap_del(rgw_raw_obj& obj, const std::string& key);
int update_containers_stats(map<string, RGWBucketEnt>& m);
int append_async(rgw_raw_obj& obj, size_t size, bufferlist& bl);
- int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
- int unwatch(uint64_t watch_handle);
- void add_watcher(int i);
- void remove_watcher(int i);
- virtual bool need_watch_notify() { return false; }
- int init_watch();
- void finalize_watch();
- int distribute(const string& key, bufferlist& bl);
- virtual int watch_cb(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id,
- bufferlist& bl) { return 0; }
- void pick_control_oid(const string& key, string& notify_oid);
-
- virtual void set_cache_enabled(bool state) {}
-
+public:
 // Forwards obj to set_atomic() on the RGWObjectCtx that ctx points to.
 // ctx is static_cast without any check, so it must really be an
 // RGWObjectCtx*.
 void set_atomic(void *ctx, rgw_obj& obj) {
 RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
- rctx->obj.set_atomic(obj);
- }
- void set_prefetch_data(void *ctx, rgw_obj& obj) {
- RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
- rctx->obj.set_prefetch_data(obj);
+ rctx->set_atomic(obj);
 }
- void set_prefetch_data(void *ctx, rgw_raw_obj& obj) {
 // Same pattern as set_atomic(): casts ctx to RGWObjectCtx* and forwards
 // obj to its set_prefetch_data().
+ void set_prefetch_data(void *ctx, const rgw_obj& obj) {
 RGWObjectCtx *rctx = static_cast<RGWObjectCtx *>(ctx);
- rctx->raw.set_prefetch_data(obj);
+ rctx->set_prefetch_data(obj);
 }
-
int decode_policy(bufferlist& bl, ACLOwner *owner);
+ /* Declaration only. syncstopped is an optional trailing pointer that
+  * defaults to null, so existing callers that omit it keep compiling. */
 int get_bucket_stats(RGWBucketInfo& bucket_info, int shard_id, string *bucket_ver, string *master_ver,
- map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker);
+ map<RGWObjCategory, RGWStorageStats>& stats, string *max_marker, bool* syncstopped = nullptr);
int get_bucket_stats_async(RGWBucketInfo& bucket_info, int shard_id, RGWGetBucketStats_CB *cb);
int get_user_stats(const rgw_user& user, RGWStorageStats& stats);
int get_user_stats_async(const rgw_user& user, RGWGetUserStats_CB *cb);
bool exclusive, RGWObjVersionTracker& objv_tracker, ceph::real_time mtime,
map<string, bufferlist> *pattrs);
int put_bucket_instance_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, map<string, bufferlist> *pattrs);
- int get_bucket_entrypoint_info(RGWObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name,
+ int get_bucket_entrypoint_info(RGWSysObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name,
RGWBucketEntryPoint& entry_point, RGWObjVersionTracker *objv_tracker,
- ceph::real_time *pmtime, map<string, bufferlist> *pattrs, rgw_cache_entry_info *cache_info = NULL);
- int get_bucket_instance_info(RGWObjectCtx& obj_ctx, const string& meta_key, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs);
- int get_bucket_instance_info(RGWObjectCtx& obj_ctx, const rgw_bucket& bucket, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs);
- int get_bucket_instance_from_oid(RGWObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs,
- rgw_cache_entry_info *cache_info = NULL);
-
- int convert_old_bucket_info(RGWObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name);
+ ceph::real_time *pmtime, map<string, bufferlist> *pattrs, rgw_cache_entry_info *cache_info = NULL,
+ boost::optional<obj_version> refresh_version = boost::none);
+ int get_bucket_instance_info(RGWSysObjectCtx& obj_ctx, const string& meta_key, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs);
+ int get_bucket_instance_info(RGWSysObjectCtx& obj_ctx, const rgw_bucket& bucket, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs);
+ int get_bucket_instance_from_oid(RGWSysObjectCtx& obj_ctx, const string& oid, RGWBucketInfo& info, ceph::real_time *pmtime, map<string, bufferlist> *pattrs,
+ rgw_cache_entry_info *cache_info = NULL,
+ boost::optional<obj_version> refresh_version = boost::none);
+
+ int convert_old_bucket_info(RGWSysObjectCtx& obj_ctx, const string& tenant_name, const string& bucket_name);
static void make_bucket_entry_name(const string& tenant_name, const string& bucket_name, string& bucket_entry);
- int get_bucket_info(RGWObjectCtx& obj_ctx,
- const string& tenant_name, const string& bucket_name,
- RGWBucketInfo& info,
- ceph::real_time *pmtime, map<string, bufferlist> *pattrs = NULL);
+
+
+private:
+ int _get_bucket_info(RGWSysObjectCtx& obj_ctx, const string& tenant,
+ const string& bucket_name, RGWBucketInfo& info,
+ real_time *pmtime,
+ map<string, bufferlist> *pattrs,
+ boost::optional<obj_version> refresh_version);
+public:
+
+ bool call(std::string_view command, const cmdmap_t& cmdmap,
+ std::string_view format,
+ bufferlist& out) override final;
+
+protected:
+ // `call_list` must iterate over all cache entries and call
+ // `cache_list_dump_helper` with the supplied Formatter on any that
+ // include `filter` as a substring.
+ //
+ void call_list(const std::optional<std::string>& filter,
+ Formatter* format);
+ // `call_inspect` must look up the requested target and, if found,
+ // dump it to the supplied Formatter and return true. If not found,
+ // it must return false.
+ //
+ bool call_inspect(const std::string& target, Formatter* format);
+
+ // `call_erase` must erase the requested target and return true. If
+ // the requested target does not exist, it should return false.
+ bool call_erase(const std::string& target);
+
+ // `call_zap` must erase the cache.
+ void call_zap();
+public:
+
+ /* Declaration only. pattrs is optional and defaults to a null pointer;
+  * pmtime has no default and must always be passed. */
+ int get_bucket_info(RGWSysObjectCtx& obj_ctx,
+ const string& tenant_name, const string& bucket_name,
+ RGWBucketInfo& info,
+ ceph::real_time *pmtime, map<string, bufferlist> *pattrs = nullptr);
+
+ // Returns 0 on successful refresh. Returns error code if there was
+ // an error or the version stored on the OSD is the same as that
+ // presented in the BucketInfo structure.
+ //
+ int try_refresh_bucket_info(RGWBucketInfo& info,
+ ceph::real_time *pmtime,
+ map<string, bufferlist> *pattrs = nullptr);
+
int put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, ceph::real_time mtime, obj_version *pep_objv,
- map<string, bufferlist> *pattrs, bool create_entry_point);
+ map<string, bufferlist> *pattrs, bool create_entry_point);
- int cls_rgw_init_index(librados::IoCtx& io_ctx, librados::ObjectWriteOperation& op, string& oid);
int cls_obj_prepare_op(BucketShard& bs, RGWModifyOp op, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
int cls_obj_complete_op(BucketShard& bs, const rgw_obj& obj, RGWModifyOp op, string& tag, int64_t pool, uint64_t epoch,
rgw_bucket_dir_entry& ent, RGWObjCategory category, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
ceph::real_time& removed_mtime, list<rgw_obj_index_key> *remove_objs, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
int cls_obj_complete_cancel(BucketShard& bs, string& tag, rgw_obj& obj, uint16_t bilog_flags, rgw_zone_set *zones_trace = nullptr);
int cls_obj_set_bucket_tag_timeout(RGWBucketInfo& bucket_info, uint64_t timeout);
- int cls_bucket_list(RGWBucketInfo& bucket_info, int shard_id, rgw_obj_index_key& start, const string& prefix,
- uint32_t num_entries, bool list_versions, map<string, rgw_bucket_dir_entry>& m,
- bool *is_truncated, rgw_obj_index_key *last_entry,
- bool (*force_check_filter)(const string& name) = NULL);
- int cls_bucket_head(const RGWBucketInfo& bucket_info, int shard_id, map<string, struct rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids = NULL);
+ int cls_bucket_list_ordered(RGWBucketInfo& bucket_info, int shard_id,
+ const rgw_obj_index_key& start,
+ const string& prefix,
+ uint32_t num_entries, bool list_versions,
+ map<string, rgw_bucket_dir_entry>& m,
+ bool *is_truncated,
+ rgw_obj_index_key *last_entry,
+ bool (*force_check_filter)(const string& name) = nullptr);
+ int cls_bucket_list_unordered(RGWBucketInfo& bucket_info, int shard_id,
+ const rgw_obj_index_key& start,
+ const string& prefix,
+ uint32_t num_entries, bool list_versions,
+ vector<rgw_bucket_dir_entry>& ent_list,
+ bool *is_truncated, rgw_obj_index_key *last_entry,
+ bool (*force_check_filter)(const string& name) = nullptr);
+ /* Declaration only. headers is a caller-supplied vector taken by
+  * non-const reference; bucket_instance_ids is optional and defaults to
+  * a null pointer. */
+ int cls_bucket_head(const RGWBucketInfo& bucket_info, int shard_id, vector<rgw_bucket_dir_header>& headers, map<int, string> *bucket_instance_ids = nullptr);
int cls_bucket_head_async(const RGWBucketInfo& bucket_info, int shard_id, RGWGetDirHeader_CB *ctx, int *num_aio);
int list_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, string& marker, uint32_t max, std::list<rgw_bi_log_entry>& result, bool *truncated);
int trim_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id, string& marker, string& end_marker);
+ int resync_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id);
+ int stop_bi_log_entries(RGWBucketInfo& bucket_info, int shard_id);
int get_bi_log_status(RGWBucketInfo& bucket_info, int shard_id, map<int, string>& max_marker);
- int bi_get_instance(const RGWBucketInfo& bucket_info, rgw_obj& obj, rgw_bucket_dir_entry *dirent);
- int bi_get(rgw_bucket& bucket, rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry);
+ int bi_get_instance(const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_dir_entry *dirent);
+ int bi_get_olh(const RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_bucket_olh_entry *olh);
+ int bi_get(const RGWBucketInfo& bucket_info, const rgw_obj& obj, BIIndexType index_type, rgw_cls_bi_entry *entry);
void bi_put(librados::ObjectWriteOperation& op, BucketShard& bs, rgw_cls_bi_entry& entry);
int bi_put(BucketShard& bs, rgw_cls_bi_entry& entry);
int bi_put(rgw_bucket& bucket, rgw_obj& obj, rgw_cls_bi_entry& entry);
int bi_remove(BucketShard& bs);
int cls_obj_usage_log_add(const string& oid, rgw_usage_log_info& info);
- int cls_obj_usage_log_read(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries,
- string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage, bool *is_truncated);
- int cls_obj_usage_log_trim(string& oid, string& user, uint64_t start_epoch, uint64_t end_epoch);
+ int cls_obj_usage_log_read(const string& oid, const string& user, const string& bucket, uint64_t start_epoch,
+ uint64_t end_epoch, uint32_t max_entries, string& read_iter, map<rgw_user_bucket,
+ rgw_usage_log_entry>& usage, bool *is_truncated);
+ int cls_obj_usage_log_trim(const string& oid, const string& user, const string& bucket, uint64_t start_epoch,
+ uint64_t end_epoch);
+ int cls_obj_usage_log_clear(string& oid);
int key_to_shard_id(const string& key, int max_shards);
void shard_name(const string& prefix, unsigned max_shards, const string& key, string& name, int *shard_id);
const string& from_marker = std::string(),
const string& to_marker = std::string());
- int lock_exclusive(rgw_pool& pool, const string& oid, ceph::timespan& duration, string& zone_id, string& owner_id);
- int unlock(rgw_pool& pool, const string& oid, string& zone_id, string& owner_id);
+ int lock_exclusive(const rgw_pool& pool, const string& oid, ceph::timespan& duration, string& zone_id, string& owner_id);
+ int unlock(const rgw_pool& pool, const string& oid, string& zone_id, string& owner_id);
void update_gc_chain(rgw_obj& head_obj, RGWObjManifest& manifest, cls_rgw_obj_chain *chain);
int send_chain_to_gc(cls_rgw_obj_chain& chain, const string& tag, bool sync);
int gc_operate(string& oid, librados::ObjectWriteOperation *op);
- int gc_aio_operate(string& oid, librados::ObjectWriteOperation *op);
+ int gc_aio_operate(string& oid, librados::ObjectWriteOperation *op, librados::AioCompletion **pc = nullptr);
int gc_operate(string& oid, librados::ObjectReadOperation *op, bufferlist *pbl);
int list_gc_objs(int *index, string& marker, uint32_t max, bool expired_only, std::list<cls_rgw_gc_obj_info>& result, bool *truncated);
- int process_gc();
- int process_expire_objects();
+ int process_gc(bool expired_only);
+ bool process_expire_objects();
int defer_gc(void *ctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj);
int process_lc();
map<RGWObjCategory, RGWStorageStats> *existing_stats,
map<RGWObjCategory, RGWStorageStats> *calculated_stats);
int bucket_rebuild_index(RGWBucketInfo& bucket_info);
- int bucket_set_reshard(RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
+ int bucket_set_reshard(const RGWBucketInfo& bucket_info, const cls_rgw_bucket_instance_entry& entry);
int remove_objs_from_index(RGWBucketInfo& bucket_info, list<rgw_obj_index_key>& oid_list);
int move_rados_obj(librados::IoCtx& src_ioctx,
const string& src_oid, const string& src_locator,
int fix_tail_obj_locator(const RGWBucketInfo& bucket_info, rgw_obj_key& key, bool fix, bool *need_fix);
int cls_user_get_header(const string& user_id, cls_user_header *header);
+ int cls_user_reset_stats(const string& user_id);
int cls_user_get_header_async(const string& user_id, RGWGetUserHeader_CB *ctx);
int cls_user_sync_bucket_stats(rgw_raw_obj& user_obj, const RGWBucketInfo& bucket_info);
int cls_user_list_buckets(rgw_raw_obj& obj,
int cls_user_update_buckets(rgw_raw_obj& obj, list<cls_user_bucket_entry>& entries, bool add);
int cls_user_complete_stats_sync(rgw_raw_obj& obj);
int complete_sync_user_stats(const rgw_user& user_id);
- int cls_user_add_bucket(rgw_raw_obj& obj, list<cls_user_bucket_entry>& entries);
int cls_user_remove_bucket(rgw_raw_obj& obj, const cls_user_bucket& bucket);
+ int cls_user_get_bucket_stats(const rgw_bucket& bucket, cls_user_bucket_entry& entry);
int check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
- RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size);
+ RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size, bool check_size_only = false);
int check_bucket_shards(const RGWBucketInfo& bucket_info, const rgw_bucket& bucket,
RGWQuotaInfo& bucket_quota);
int add_bucket_to_reshard(const RGWBucketInfo& bucket_info, uint32_t new_num_shards);
uint64_t instance_id();
- const string& zone_name() {
- return get_zone_params().get_name();
- }
- const string& zone_id() {
- return get_zone_params().get_id();
- }
- string unique_id(uint64_t unique_num) {
- char buf[32];
- snprintf(buf, sizeof(buf), ".%llu.%llu", (unsigned long long)instance_id(), (unsigned long long)unique_num);
- string s = get_zone_params().get_id() + buf;
- return s;
- }
-
- void init_unique_trans_id_deps() {
- char buf[16 + 2 + 1]; /* uint64_t needs 16, 2 hyphens add further 2 */
-
- snprintf(buf, sizeof(buf), "-%llx-", (unsigned long long)instance_id());
- url_encode(string(buf) + get_zone_params().get_name(), trans_id_suffix);
- }
-
- /* In order to preserve compability with Swift API, transaction ID
- * should contain at least 32 characters satisfying following spec:
- * - first 21 chars must be in range [0-9a-f]. Swift uses this
- * space for storing fragment of UUID obtained through a call to
- * uuid4() function of Python's uuid module;
- * - char no. 22 must be a hyphen;
- * - at least 10 next characters constitute hex-formatted timestamp
- * padded with zeroes if necessary. All bytes must be in [0-9a-f]
- * range;
- * - last, optional part of transaction ID is any url-encoded string
- * without restriction on length. */
- string unique_trans_id(const uint64_t unique_num) {
- char buf[41]; /* 2 + 21 + 1 + 16 (timestamp can consume up to 16) + 1 */
- time_t timestamp = time(NULL);
-
- snprintf(buf, sizeof(buf), "tx%021llx-%010llx",
- (unsigned long long)unique_num,
- (unsigned long long)timestamp);
-
- return string(buf) + trans_id_suffix;
- }
-
- void get_log_pool(rgw_pool& pool) {
- pool = get_zone_params().log_pool;
- }
-
- bool need_to_log_data() {
- return get_zone().log_data;
- }
-
- bool need_to_log_metadata() {
- return is_meta_master() &&
- (get_zonegroup().zones.size() > 1 || current_period.is_multi_zonegroups_with_zones());
- }
librados::Rados* get_rados_handle();
int delete_raw_obj_aio(const rgw_raw_obj& obj, list<librados::AioCompletion *>& handles);
int delete_obj_aio(const rgw_obj& obj, RGWBucketInfo& info, RGWObjState *astate,
list<librados::AioCompletion *>& handles, bool keep_index_consistent);
+
+ /* mfa/totp stuff */
+ private:
+ void prepare_mfa_write(librados::ObjectWriteOperation *op,
+ RGWObjVersionTracker *objv_tracker,
+ const ceph::real_time& mtime);
+ public:
+ string get_mfa_oid(const rgw_user& user);
+ int get_mfa_ref(const rgw_user& user, rgw_rados_ref *ref);
+ int check_mfa(const rgw_user& user, const string& otp_id, const string& pin);
+ int create_mfa(const rgw_user& user, const rados::cls::otp::otp_info_t& config,
+ RGWObjVersionTracker *objv_tracker, const ceph::real_time& mtime);
+ int remove_mfa(const rgw_user& user, const string& id,
+ RGWObjVersionTracker *objv_tracker, const ceph::real_time& mtime);
+ int get_mfa(const rgw_user& user, const string& id, rados::cls::otp::otp_info_t *result);
+ int list_mfa(const rgw_user& user, list<rados::cls::otp::otp_info_t> *result);
+ int otp_get_current_time(const rgw_user& user, ceph::real_time *result);
+
+ /* mfa interfaces used by metadata engine */
+ int set_mfa(const string& oid, const list<rados::cls::otp::otp_info_t>& entries, bool reset_obj,
+ RGWObjVersionTracker *objv_tracker, const ceph::real_time& mtime);
+ int list_mfa(const string& oid, list<rados::cls::otp::otp_info_t> *result,
+ RGWObjVersionTracker *objv_tracker, ceph::real_time *pmtime);
private:
/**
* This is a helper method, it generates a list of bucket index objects with the given
*/
int pool_iterate_begin(const rgw_pool& pool, RGWPoolIterCtx& ctx);
+ /**
+ * Init pool iteration, starting from a given position
+ * pool: pool to use
+ * cursor: position to start iteration, passed as a string
+ * ctx: context object to use for the iteration
+ * Returns: 0 on success, -ERR# otherwise.
+ */
+ int pool_iterate_begin(const rgw_pool& pool, const string& cursor, RGWPoolIterCtx& ctx);
+
+ /**
+ * Get pool iteration position
+ * ctx: context object to use for the iteration
+ * Returns: string representation of position
+ * NOTE(review): presumably usable as the cursor argument of
+ * pool_iterate_begin(pool, cursor, ctx) -- confirm against the implementation
+ */
+ string pool_iterate_get_cursor(RGWPoolIterCtx& ctx);
+
/**
* Iterate over pool return object names, use optional filter
* ctx: iteration context, initialized with pool_iterate_begin()
class RGWStoreManager {
public:
RGWStoreManager() {}
- static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread) {
+ static RGWRados *get_storage(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads,
+ bool run_sync_thread, bool run_reshard_thread, bool use_cache = true) { // use_cache defaults to true, so existing six-argument calls still compile
RGWRados *store = init_storage_provider(cct, use_gc_thread, use_lc_thread, quota_threads, run_sync_thread,
- run_reshard_thread);
+ run_reshard_thread, use_cache); // forwarded as init_storage_provider()'s use_metadata_cache parameter
return store;
}
static RGWRados *get_raw_storage(CephContext *cct) {
RGWRados *store = init_raw_storage_provider(cct);
return store;
}
- static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread);
+ static RGWRados *init_storage_provider(CephContext *cct, bool use_gc_thread, bool use_lc_thread, bool quota_threads, bool run_sync_thread, bool run_reshard_thread, bool use_metadata_cache); // no default here: use_metadata_cache must be passed explicitly
static RGWRados *init_raw_storage_provider(CephContext *cct);
static void close_storage(RGWRados *store);
};
-template <class T>
-class RGWChainedCacheImpl : public RGWChainedCache {
- RWLock lock;
-
- map<string, T> entries;
-
-public:
- RGWChainedCacheImpl() : lock("RGWChainedCacheImpl::lock") {}
-
- void init(RGWRados *store) {
- store->register_chained_cache(this);
- }
-
- bool find(const string& key, T *entry) {
- RWLock::RLocker rl(lock);
- typename map<string, T>::iterator iter = entries.find(key);
- if (iter == entries.end()) {
- return false;
- }
-
- *entry = iter->second;
- return true;
- }
-
- bool put(RGWRados *store, const string& key, T *entry, list<rgw_cache_entry_info *>& cache_info_entries) {
- Entry chain_entry(this, key, entry);
-
- /* we need the store cache to call us under its lock to maintain lock ordering */
- return store->chain_cache_entry(cache_info_entries, &chain_entry);
- }
-
- void chain_cb(const string& key, void *data) override {
- T *entry = static_cast<T *>(data);
- RWLock::WLocker wl(lock);
- entries[key] = *entry;
- }
-
- void invalidate(const string& key) override {
- RWLock::WLocker wl(lock);
- entries.erase(key);
- }
-
- void invalidate_all() override {
- RWLock::WLocker wl(lock);
- entries.clear();
- }
-}; /* RGWChainedCacheImpl */
-
-/**
- * Base of PUT operation.
- * Allow to create chained data transformers like compresors and encryptors.
- */
-class RGWPutObjDataProcessor
-{
-public:
- RGWPutObjDataProcessor(){}
- virtual ~RGWPutObjDataProcessor(){}
- virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) = 0;
- virtual int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) = 0;
-}; /* RGWPutObjDataProcessor */
-
-
-class RGWPutObjProcessor : public RGWPutObjDataProcessor
-{
-protected:
- RGWRados *store;
- RGWObjectCtx& obj_ctx;
- bool is_complete;
- RGWBucketInfo bucket_info;
- bool canceled;
-
- virtual int do_complete(size_t accounted_size, const string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data,
- rgw_zone_set* zones_trace = nullptr) = 0;
-
-public:
- RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL),
- obj_ctx(_obj_ctx),
- is_complete(false),
- bucket_info(_bi),
- canceled(false) {}
- ~RGWPutObjProcessor() override {}
- virtual int prepare(RGWRados *_store, string *oid_rand) {
- store = _store;
- return 0;
- }
-
- int complete(size_t accounted_size, const string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match = NULL, const char *if_nomatch = NULL, const string *user_data = nullptr,
- rgw_zone_set *zones_trace = nullptr);
-
- CephContext *ctx();
-
- bool is_canceled() { return canceled; }
-}; /* RGWPutObjProcessor */
-
-struct put_obj_aio_info {
- void *handle;
- rgw_raw_obj obj;
- uint64_t size;
-};
-
-#define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024)
-
-class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
-{
- list<struct put_obj_aio_info> pending;
- uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT};
- uint64_t pending_size{0};
-
- struct put_obj_aio_info pop_pending();
- int wait_pending_front();
- bool pending_has_completed();
-
- rgw_raw_obj last_written_obj;
-
-protected:
- uint64_t obj_len{0};
-
- set<rgw_raw_obj> written_objs;
- rgw_obj head_obj;
-
- void add_written_obj(const rgw_raw_obj& obj) {
- written_objs.insert(obj);
- }
-
- int drain_pending();
- int handle_obj_data(rgw_raw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
-
-public:
- int prepare(RGWRados *store, string *oid_rand) override;
- int throttle_data(void *handle, const rgw_raw_obj& obj, uint64_t size, bool need_to_wait) override;
-
- RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {}
- ~RGWPutObjProcessor_Aio() override;
-}; /* RGWPutObjProcessor_Aio */
-
-class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
-{
- bufferlist first_chunk;
- uint64_t part_size;
- off_t cur_part_ofs;
- off_t next_part_ofs;
- int cur_part_id;
- off_t data_ofs;
-
- bufferlist pending_data_bl;
- uint64_t max_chunk_size;
-
- bool versioned_object;
- uint64_t olh_epoch;
- string version_id;
-
-protected:
- rgw_bucket bucket;
- string obj_str;
-
- string unique_tag;
-
- rgw_raw_obj cur_obj;
- RGWObjManifest manifest;
- RGWObjManifest::generator manifest_gen;
-
- int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool exclusive);
- int do_complete(size_t accounted_size, const string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data, rgw_zone_set *zones_trace) override;
-
- int prepare_next_part(off_t ofs);
- int complete_parts();
- int complete_writing_data();
-
- int prepare_init(RGWRados *store, string *oid_rand);
-
-public:
- ~RGWPutObjProcessor_Atomic() override {}
- RGWPutObjProcessor_Atomic(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info,
- rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t, bool versioned) :
- RGWPutObjProcessor_Aio(obj_ctx, bucket_info),
- part_size(_p),
- cur_part_ofs(0),
- next_part_ofs(_p),
- cur_part_id(0),
- data_ofs(0),
- max_chunk_size(0),
- versioned_object(versioned),
- olh_epoch(0),
- bucket(_b),
- obj_str(_o),
- unique_tag(_t) {}
- int prepare(RGWRados *store, string *oid_rand) override;
- virtual bool immutable_head() { return false; }
- int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_raw_obj *pobj, bool *again) override;
-
- void set_olh_epoch(uint64_t epoch) {
- olh_epoch = epoch;
- }
-
- void set_version_id(const string& vid) {
- version_id = vid;
- }
-}; /* RGWPutObjProcessor_Atomic */
-
-#define MP_META_SUFFIX ".meta"
-
class RGWMPObj {
string oid;
string prefix;
meta = prefix + upload_id + MP_META_SUFFIX;
prefix.append(part_unique_str);
}
- string& get_meta() { return meta; }
- string get_part(int num) {
+ const string& get_meta() const { return meta; }
+ string get_part(int num) const {
char buf[16];
snprintf(buf, 16, ".%d", num);
string s = prefix;
s.append(buf);
return s;
}
- string get_part(string& part) {
+ string get_part(const string& part) const {
string s = prefix;
s.append(".");
s.append(part);
return s;
}
- string& get_upload_id() {
+ const string& get_upload_id() const {
return upload_id;
}
- string& get_key() {
+ const string& get_key() const {
return oid;
}
bool from_meta(string& meta) {
meta = "";
upload_id = "";
}
-};
+}; // class RGWMPObj
-class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
-{
- string part_num;
- RGWMPObj mp;
- req_state *s;
- string upload_id;
+
+class RGWRadosThread { // abstract base: process() and interval_msec() are pure virtual; holds a nested Worker (a Thread) through `worker`
+ class Worker : public Thread {
+ CephContext *cct;
+ RGWRadosThread *processor; // back-pointer to the owning RGWRadosThread, set by the constructor
+ Mutex lock; // taken around every use of cond below
+ Cond cond;
+
+ void wait() { // blocks on cond; NOTE(review): no predicate is re-checked here -- confirm the caller tolerates spurious or missed wakeups
+ Mutex::Locker l(lock);
+ cond.Wait(lock);
+ }; // the ';' after this member function body is redundant
+
+ void wait_interval(const utime_t& wait_time) { // same as wait(), but uses WaitInterval with wait_time
+ Mutex::Locker l(lock);
+ cond.WaitInterval(lock, wait_time);
+ }
+
+ public:
+ Worker(CephContext *_cct, RGWRadosThread *_p) : cct(_cct), processor(_p), lock("RGWRadosThread::Worker") {}
+ void *entry() override; // defined out of line; not visible in this header
+ void signal() { // signals the same cond that wait()/wait_interval() block on
+ Mutex::Locker l(lock);
+ cond.Signal();
+ }
+ };
+
+ Worker *worker; // initialised to NULL by the constructor; null-checked in signal()
protected:
- int prepare(RGWRados *store, string *oid_rand);
- int do_complete(size_t accounted_size, const string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- map<string, bufferlist>& attrs, ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch, const string *user_data,
- rgw_zone_set *zones_trace) override;
+ CephContext *cct; // taken from _store->ctx() in the constructor
+ RGWRados *store;
+
+ std::atomic<bool> down_flag = { false }; // read by going_down(); where it is set is not visible here
+
+ string thread_name; // defaults to "radosgw" via the constructor's default argument
+
+ virtual uint64_t interval_msec() = 0; // NOTE(review): presumably the delay between process() calls -- confirm in Worker::entry()
+ virtual void stop_process() {} // overridable hook with an empty default body
public:
- bool immutable_head() { return true; }
- RGWPutObjProcessor_Multipart(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, uint64_t _p, req_state *_s) :
- RGWPutObjProcessor_Atomic(obj_ctx, bucket_info, _s->bucket, _s->object.name, _p, _s->req_id, false), s(_s) {}
- void get_mp(RGWMPObj** _mp);
-}; /* RGWPutObjProcessor_Multipart */
+ RGWRadosThread(RGWRados *_store, const string& thread_name = "radosgw")
+ : worker(NULL), cct(_store->ctx()), store(_store), thread_name(thread_name) {} // dereferences _store, so it must be non-null
+ virtual ~RGWRadosThread() {
+ stop(); // NOTE(review): runs during destruction, so any virtual call made inside stop() will not reach subclass overrides -- confirm stop()'s body
+ }
+
+ virtual int init() { return 0; } // default does nothing and returns 0
+ virtual int process() = 0; // must be implemented by subclasses
+
+ bool going_down() { return down_flag; }
+
+ void start(); // defined out of line
+ void stop(); // defined out of line; also called by the destructor
+
+ void signal() { // does nothing when worker is null
+ if (worker) {
+ worker->signal();
+ }
+ }
+};
+
#endif