1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RGW_BUCKET_H
5 #define CEPH_RGW_BUCKET_H
10 #include "include/types.h"
11 #include "rgw_common.h"
12 #include "rgw_tools.h"
14 #include "rgw_rados.h"
16 #include "rgw_string.h"
18 #include "common/Formatter.h"
19 #include "common/lru_map.h"
20 #include "common/ceph_time.h"
21 #include "rgw_formats.h"
23 // define as static when RGWBucket implementation compete
24 extern void rgw_get_buckets_obj(const rgw_user
& user_id
, string
& buckets_obj_id
);
26 extern int rgw_bucket_store_info(RGWRados
*store
, const string
& bucket_name
, bufferlist
& bl
, bool exclusive
,
27 map
<string
, bufferlist
> *pattrs
, RGWObjVersionTracker
*objv_tracker
,
29 extern int rgw_bucket_instance_store_info(RGWRados
*store
, string
& oid
, bufferlist
& bl
, bool exclusive
,
30 map
<string
, bufferlist
> *pattrs
, RGWObjVersionTracker
*objv_tracker
,
33 extern int rgw_bucket_parse_bucket_instance(const string
& bucket_instance
, string
*target_bucket_instance
, int *shard_id
);
34 extern int rgw_bucket_parse_bucket_key(CephContext
*cct
, const string
& key
,
35 rgw_bucket
* bucket
, int *shard_id
);
37 extern int rgw_bucket_instance_remove_entry(RGWRados
*store
, string
& entry
, RGWObjVersionTracker
*objv_tracker
);
38 extern void rgw_bucket_instance_key_to_oid(string
& key
);
39 extern void rgw_bucket_instance_oid_to_key(string
& oid
);
41 extern int rgw_bucket_delete_bucket_obj(RGWRados
*store
,
42 const string
& tenant_name
,
43 const string
& bucket_name
,
44 RGWObjVersionTracker
& objv_tracker
);
46 extern int rgw_bucket_sync_user_stats(RGWRados
*store
, const rgw_user
& user_id
, const RGWBucketInfo
& bucket_info
);
47 extern int rgw_bucket_sync_user_stats(RGWRados
*store
, const string
& tenant_name
, const string
& bucket_name
);
49 extern std::string
rgw_make_bucket_entry_name(const std::string
& tenant_name
,
50 const std::string
& bucket_name
);
51 static inline void rgw_make_bucket_entry_name(const string
& tenant_name
,
52 const string
& bucket_name
,
53 std::string
& bucket_entry
) {
54 bucket_entry
= rgw_make_bucket_entry_name(tenant_name
, bucket_name
);
57 extern void rgw_parse_url_bucket(const string
& bucket
,
58 const string
& auth_tenant
,
59 string
&tenant_name
, string
&bucket_name
);
61 struct RGWBucketCompleteInfo
{
63 map
<string
, bufferlist
> attrs
;
65 void dump(Formatter
*f
) const;
66 void decode_json(JSONObj
*obj
);
69 class RGWBucketEntryMetadataObject
: public RGWMetadataObject
{
70 RGWBucketEntryPoint ep
;
72 RGWBucketEntryMetadataObject(RGWBucketEntryPoint
& _ep
, obj_version
& v
, real_time m
) : ep(_ep
) {
77 void dump(Formatter
*f
) const override
{
82 class RGWBucketInstanceMetadataObject
: public RGWMetadataObject
{
83 RGWBucketCompleteInfo info
;
85 RGWBucketInstanceMetadataObject() {}
86 RGWBucketInstanceMetadataObject(RGWBucketCompleteInfo
& i
, obj_version
& v
, real_time m
) : info(i
) {
91 void dump(Formatter
*f
) const override
{
95 void decode_json(JSONObj
*obj
) {
96 info
.decode_json(obj
);
99 RGWBucketInfo
& get_bucket_info() { return info
.info
; }
103 * Store a list of the user's buckets, with associated functinos.
107 std::map
<std::string
, RGWBucketEnt
> buckets
;
110 RGWUserBuckets() = default;
111 RGWUserBuckets(RGWUserBuckets
&&) = default;
113 RGWUserBuckets
& operator=(const RGWUserBuckets
&) = default;
115 void encode(bufferlist
& bl
) const {
116 ::encode(buckets
, bl
);
118 void decode(bufferlist::iterator
& bl
) {
119 ::decode(buckets
, bl
);
122 * Check if the user owns a bucket by the given name.
124 bool owns(string
& name
) {
125 map
<string
, RGWBucketEnt
>::iterator iter
;
126 iter
= buckets
.find(name
);
127 return (iter
!= buckets
.end());
131 * Add a (created) bucket to the user's bucket list.
133 void add(const RGWBucketEnt
& bucket
) {
134 buckets
[bucket
.bucket
.name
] = bucket
;
138 * Remove a bucket from the user's list by name.
140 void remove(string
& name
) {
141 map
<string
, RGWBucketEnt
>::iterator iter
;
142 iter
= buckets
.find(name
);
143 if (iter
!= buckets
.end()) {
149 * Get the user's buckets as a map.
151 map
<string
, RGWBucketEnt
>& get_buckets() { return buckets
; }
154 * Cleanup data structure
156 void clear() { buckets
.clear(); }
158 size_t count() { return buckets
.size(); }
160 WRITE_CLASS_ENCODER(RGWUserBuckets
)
162 class RGWMetadataManager
;
164 extern void rgw_bucket_init(RGWMetadataManager
*mm
);
166 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
167 * Returns: 0 on success, -ERR# on failure.
169 extern int rgw_read_user_buckets(RGWRados
*store
,
170 const rgw_user
& user_id
,
171 RGWUserBuckets
& buckets
,
172 const string
& marker
,
173 const string
& end_marker
,
177 uint64_t default_amount
= 1000);
179 extern int rgw_link_bucket(RGWRados
* store
,
180 const rgw_user
& user_id
,
182 ceph::real_time creation_time
,
183 bool update_entrypoint
= true);
184 extern int rgw_unlink_bucket(RGWRados
*store
, const rgw_user
& user_id
,
185 const string
& tenant_name
, const string
& bucket_name
, bool update_entrypoint
= true);
187 extern int rgw_remove_object(RGWRados
*store
, RGWBucketInfo
& bucket_info
, rgw_bucket
& bucket
, rgw_obj_key
& key
);
188 extern int rgw_remove_bucket(RGWRados
*store
, rgw_bucket
& bucket
, bool delete_children
);
189 extern int rgw_remove_bucket_bypass_gc(RGWRados
*store
, rgw_bucket
& bucket
, int concurrent_max
);
191 extern int rgw_bucket_set_attrs(RGWRados
*store
, RGWBucketInfo
& bucket_info
,
192 map
<string
, bufferlist
>& attrs
,
193 RGWObjVersionTracker
*objv_tracker
);
195 extern void check_bad_user_bucket_mapping(RGWRados
*store
, const rgw_user
& user_id
, bool fix
);
197 struct RGWBucketAdminOpState
{
199 std::string display_name
;
200 std::string bucket_name
;
201 std::string bucket_id
;
202 std::string object_name
;
208 bool delete_child_objects
;
216 void set_fetch_stats(bool value
) { stat_buckets
= value
; }
217 void set_check_objects(bool value
) { check_objects
= value
; }
218 void set_fix_index(bool value
) { fix_index
= value
; }
219 void set_delete_children(bool value
) { delete_child_objects
= value
; }
221 void set_max_aio(int value
) { max_aio
= value
; }
223 void set_user_id(const rgw_user
& user_id
) {
224 if (!user_id
.empty())
227 void set_bucket_name(const std::string
& bucket_str
) {
228 bucket_name
= bucket_str
;
230 void set_object(std::string
& object_str
) {
231 object_name
= object_str
;
233 void set_quota(RGWQuotaInfo
& value
) {
238 rgw_user
& get_user_id() { return uid
; }
239 std::string
& get_user_display_name() { return display_name
; }
240 std::string
& get_bucket_name() { return bucket_name
; }
241 std::string
& get_object_name() { return object_name
; }
243 rgw_bucket
& get_bucket() { return bucket
; }
244 void set_bucket(rgw_bucket
& _bucket
) {
246 bucket_stored
= true;
249 void set_bucket_id(const string
& bi
) {
252 const string
& get_bucket_id() { return bucket_id
; }
254 bool will_fetch_stats() { return stat_buckets
; }
255 bool will_fix_index() { return fix_index
; }
256 bool will_delete_children() { return delete_child_objects
; }
257 bool will_check_objects() { return check_objects
; }
258 bool is_user_op() { return !uid
.empty(); }
259 bool is_system_op() { return uid
.empty(); }
260 bool has_bucket_stored() { return bucket_stored
; }
261 int get_max_aio() { return max_aio
; }
263 RGWBucketAdminOpState() : list_buckets(false), stat_buckets(false), check_objects(false),
264 fix_index(false), delete_child_objects(false),
265 bucket_stored(false) {}
269 * A simple wrapper class for administrative bucket operations
274 RGWUserBuckets buckets
;
276 RGWAccessHandle handle
;
278 RGWUserInfo user_info
;
280 std::string bucket_name
;
284 RGWBucketInfo bucket_info
;
287 RGWBucket() : store(NULL
), handle(NULL
), failure(false) {}
288 int init(RGWRados
*storage
, RGWBucketAdminOpState
& op_state
);
290 int check_bad_index_multipart(RGWBucketAdminOpState
& op_state
,
291 RGWFormatterFlusher
& flusher
, std::string
*err_msg
= NULL
);
293 int check_object_index(RGWBucketAdminOpState
& op_state
,
294 RGWFormatterFlusher
& flusher
,
295 std::string
*err_msg
= NULL
);
297 int check_index(RGWBucketAdminOpState
& op_state
,
298 map
<RGWObjCategory
, RGWStorageStats
>& existing_stats
,
299 map
<RGWObjCategory
, RGWStorageStats
>& calculated_stats
,
300 std::string
*err_msg
= NULL
);
302 int remove(RGWBucketAdminOpState
& op_state
, bool bypass_gc
= false, bool keep_index_consistent
= true, std::string
*err_msg
= NULL
);
303 int link(RGWBucketAdminOpState
& op_state
, std::string
*err_msg
= NULL
);
304 int unlink(RGWBucketAdminOpState
& op_state
, std::string
*err_msg
= NULL
);
305 int set_quota(RGWBucketAdminOpState
& op_state
, std::string
*err_msg
= NULL
);
307 int remove_object(RGWBucketAdminOpState
& op_state
, std::string
*err_msg
= NULL
);
308 int policy_bl_to_stream(bufferlist
& bl
, ostream
& o
);
309 int get_policy(RGWBucketAdminOpState
& op_state
, RGWAccessControlPolicy
& policy
);
311 void clear_failure() { failure
= false; }
314 class RGWBucketAdminOp
317 static int get_policy(RGWRados
*store
, RGWBucketAdminOpState
& op_state
,
318 RGWFormatterFlusher
& flusher
);
319 static int get_policy(RGWRados
*store
, RGWBucketAdminOpState
& op_state
,
320 RGWAccessControlPolicy
& policy
);
321 static int dump_s3_policy(RGWRados
*store
, RGWBucketAdminOpState
& op_state
,
324 static int unlink(RGWRados
*store
, RGWBucketAdminOpState
& op_state
);
325 static int link(RGWRados
*store
, RGWBucketAdminOpState
& op_state
, string
*err_msg
= NULL
);
327 static int check_index(RGWRados
*store
, RGWBucketAdminOpState
& op_state
,
328 RGWFormatterFlusher
& flusher
);
330 static int remove_bucket(RGWRados
*store
, RGWBucketAdminOpState
& op_state
, bool bypass_gc
= false, bool keep_index_consistent
= true);
331 static int remove_object(RGWRados
*store
, RGWBucketAdminOpState
& op_state
);
332 static int info(RGWRados
*store
, RGWBucketAdminOpState
& op_state
, RGWFormatterFlusher
& flusher
);
333 static int limit_check(RGWRados
*store
, RGWBucketAdminOpState
& op_state
,
334 const std::list
<std::string
>& user_ids
,
335 RGWFormatterFlusher
& flusher
,
336 bool warnings_only
= false);
337 static int set_quota(RGWRados
*store
, RGWBucketAdminOpState
& op_state
);
341 enum DataLogEntityType
{
342 ENTITY_TYPE_UNKNOWN
= 0,
343 ENTITY_TYPE_BUCKET
= 1,
346 struct rgw_data_change
{
347 DataLogEntityType entity_type
;
351 void encode(bufferlist
& bl
) const {
352 ENCODE_START(1, 1, bl
);
353 uint8_t t
= (uint8_t)entity_type
;
356 ::encode(timestamp
, bl
);
360 void decode(bufferlist::iterator
& bl
) {
364 entity_type
= (DataLogEntityType
)t
;
366 ::decode(timestamp
, bl
);
370 void dump(Formatter
*f
) const;
371 void decode_json(JSONObj
*obj
);
373 WRITE_CLASS_ENCODER(rgw_data_change
)
375 struct rgw_data_change_log_entry
{
377 real_time log_timestamp
;
378 rgw_data_change entry
;
380 void encode(bufferlist
& bl
) const {
381 ENCODE_START(1, 1, bl
);
382 ::encode(log_id
, bl
);
383 ::encode(log_timestamp
, bl
);
388 void decode(bufferlist::iterator
& bl
) {
390 ::decode(log_id
, bl
);
391 ::decode(log_timestamp
, bl
);
396 void dump(Formatter
*f
) const;
397 void decode_json(JSONObj
*obj
);
399 WRITE_CLASS_ENCODER(rgw_data_change_log_entry
)
401 struct RGWDataChangesLogInfo
{
403 real_time last_update
;
405 void dump(Formatter
*f
) const;
406 void decode_json(JSONObj
*obj
);
410 struct BucketChangeObserver
;
413 class RGWDataChangesLog
{
416 rgw::BucketChangeObserver
*observer
= nullptr;
422 RWLock modified_lock
;
423 map
<int, set
<string
> > modified_shards
;
425 std::atomic
<bool> down_flag
= { false };
427 struct ChangeStatus
{
428 real_time cur_expiration
;
431 RefCountedCond
*cond
;
434 ChangeStatus() : pending(false), cond(NULL
) {
435 lock
= new Mutex("RGWDataChangesLog::ChangeStatus");
443 typedef ceph::shared_ptr
<ChangeStatus
> ChangeStatusPtr
;
445 lru_map
<rgw_bucket_shard
, ChangeStatusPtr
> changes
;
447 map
<rgw_bucket_shard
, bool> cur_cycle
;
449 void _get_change(const rgw_bucket_shard
& bs
, ChangeStatusPtr
& status
);
450 void register_renew(rgw_bucket_shard
& bs
);
451 void update_renewed(rgw_bucket_shard
& bs
, real_time
& expiration
);
453 class ChangesRenewThread
: public Thread
{
455 RGWDataChangesLog
*log
;
460 ChangesRenewThread(CephContext
*_cct
, RGWDataChangesLog
*_log
) : cct(_cct
), log(_log
), lock("ChangesRenewThread::lock") {}
461 void *entry() override
;
465 ChangesRenewThread
*renew_thread
;
469 RGWDataChangesLog(CephContext
*_cct
, RGWRados
*_store
) : cct(_cct
), store(_store
),
470 lock("RGWDataChangesLog::lock"), modified_lock("RGWDataChangesLog::modified_lock"),
471 changes(cct
->_conf
->rgw_data_log_changes_size
) {
472 num_shards
= cct
->_conf
->rgw_data_log_num_shards
;
474 oids
= new string
[num_shards
];
476 string prefix
= cct
->_conf
->rgw_data_log_obj_prefix
;
478 if (prefix
.empty()) {
482 for (int i
= 0; i
< num_shards
; i
++) {
484 snprintf(buf
, sizeof(buf
), "%s.%d", prefix
.c_str(), i
);
488 renew_thread
= new ChangesRenewThread(cct
, this);
489 renew_thread
->create("rgw_dt_lg_renew");
492 ~RGWDataChangesLog();
494 int choose_oid(const rgw_bucket_shard
& bs
);
495 const std::string
& get_oid(int shard_id
) const { return oids
[shard_id
]; }
496 int add_entry(rgw_bucket
& bucket
, int shard_id
);
497 int get_log_shard_id(rgw_bucket
& bucket
, int shard_id
);
499 int list_entries(int shard
, const real_time
& start_time
, const real_time
& end_time
, int max_entries
,
500 list
<rgw_data_change_log_entry
>& entries
,
501 const string
& marker
,
504 int trim_entries(int shard_id
, const real_time
& start_time
, const real_time
& end_time
,
505 const string
& start_marker
, const string
& end_marker
);
506 int trim_entries(const real_time
& start_time
, const real_time
& end_time
,
507 const string
& start_marker
, const string
& end_marker
);
508 int get_info(int shard_id
, RGWDataChangesLogInfo
*info
);
509 int lock_exclusive(int shard_id
, timespan duration
, string
& zone_id
, string
& owner_id
) {
510 return store
->lock_exclusive(store
->get_zone_params().log_pool
, oids
[shard_id
], duration
, zone_id
, owner_id
);
512 int unlock(int shard_id
, string
& zone_id
, string
& owner_id
) {
513 return store
->unlock(store
->get_zone_params().log_pool
, oids
[shard_id
], zone_id
, owner_id
);
519 LogMarker() : shard(0) {}
521 int list_entries(const real_time
& start_time
, const real_time
& end_time
, int max_entries
,
522 list
<rgw_data_change_log_entry
>& entries
, LogMarker
& marker
, bool *ptruncated
);
524 void mark_modified(int shard_id
, const rgw_bucket_shard
& bs
);
525 void read_clear_modified(map
<int, set
<string
> > &modified
);
527 void set_observer(rgw::BucketChangeObserver
*observer
) {
528 this->observer
= observer
;