1 // -*- mode:C; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "include/utime.h"
7 #include "objclass/objclass.h"
9 #include "cls_user_ops.h"
// Stores one bucket entry in the object's key/value map under `key`.
// Only part of the body is visible in this view: the call below passes `key`
// and a buffer `bl` to cls_cxx_map_set_val(). Presumably `bl` holds the
// encoded form of `entry`; the encode step is not shown here -- TODO confirm.
14 static int write_entry(cls_method_context_t hctx
, const string
& key
, const cls_user_bucket_entry
& entry
)
// `ret` receives the result of the map write.
19 int ret
= cls_cxx_map_set_val(hctx
, key
, &bl
);
// Removes the map entry stored under `key` by calling
// cls_cxx_map_remove_key(). How `ret` is handled afterwards is not visible in
// this view.
26 static int remove_entry(cls_method_context_t hctx
, const string
& key
)
28 int ret
= cls_cxx_map_remove_key(hctx
, key
);
// Produces the map key that corresponds to `bucket_name`, writing it through
// the `key` out-parameter. The body is not visible in this view, so the exact
// mapping is unknown here. NOTE(review): cls_user_set_buckets_info passes an
// already-derived key into get_existing_bucket_entry, which applies this
// function again, so the mapping is presumably idempotent -- confirm.
35 static void get_key_by_bucket_name(const string
& bucket_name
, string
*key
)
// Looks up the stored entry for `bucket_name` and decodes it into `entry`.
// Returns an int status; the return statements are not visible in this view.
// Callers in this file treat -ENOENT as "no such entry" and a non-negative
// value as success.
40 static int get_existing_bucket_entry(cls_method_context_t hctx
, const string
& bucket_name
,
41 cls_user_bucket_entry
& entry
)
// An empty bucket name is special-cased (the branch body is not shown here).
43 if (bucket_name
.empty()) {
// Translate the bucket name into the key used in the map.
48 get_key_by_bucket_name(bucket_name
, &key
);
// Fetch the raw value stored under that key into `bl`.
51 int rc
= cls_cxx_map_get_val(hctx
, key
, &bl
);
// Logged at level 10; presumably on a failed read (condition not visible).
53 CLS_LOG(10, "could not read entry %s", key
.c_str());
// Decode the stored bytes into `entry`; a buffer::error raised while decoding
// is caught and logged at level 0.
57 bufferlist::iterator iter
= bl
.begin();
58 ::decode(entry
, iter
);
59 } catch (buffer::error
& err
) {
60 CLS_LOG(0, "ERROR: failed to decode entry %s", key
.c_str());
// Reads the map header of the object and decodes it into `*header`.
// Returns an int status; the return statements are not visible in this view.
67 static int read_header(cls_method_context_t hctx
, cls_user_header
*header
)
// Pull the raw header bytes into `bl`.
71 int ret
= cls_cxx_map_read_header(hctx
, &bl
);
// A zero-length header means nothing has been stored yet: hand back a
// default-constructed cls_user_header instead of trying to decode.
75 if (bl
.length() == 0) {
76 *header
= cls_user_header();
// Otherwise decode the stored bytes; a buffer::error is caught and logged.
81 ::decode(*header
, bl
);
82 } catch (buffer::error
& err
) {
83 CLS_LOG(0, "ERROR: failed to decode user header");
// Adds the contribution of one bucket entry to the aggregate stats:
// entry.count -> total_entries, entry.size -> total_bytes,
// entry.size_rounded -> total_bytes_rounded.
// `entry` is taken by non-const reference, but the visible statements only
// read from it.
90 static void add_header_stats(cls_user_stats
*stats
, cls_user_bucket_entry
& entry
)
92 stats
->total_entries
+= entry
.count
;
93 stats
->total_bytes
+= entry
.size
;
94 stats
->total_bytes_rounded
+= entry
.size_rounded
;
// Inverse of add_header_stats: subtracts one bucket entry's size,
// size_rounded and count from the aggregate stats.
// NOTE(review): no clamping is visible here; whether the totals can underflow
// depends on the field types, which are declared elsewhere -- confirm.
97 static void dec_header_stats(cls_user_stats
*stats
, cls_user_bucket_entry
& entry
)
99 stats
->total_bytes
-= entry
.size
;
100 stats
->total_bytes_rounded
-= entry
.size_rounded
;
101 stats
->total_entries
-= entry
.count
;
// Overwrites the stats fields (size, size_rounded, count) of `*target_entry`
// with those of `src_entry`. No other field of the target is touched by the
// visible statements.
104 static void apply_entry_stats(const cls_user_bucket_entry
& src_entry
, cls_user_bucket_entry
*target_entry
)
106 target_entry
->size
= src_entry
.size
;
107 target_entry
->size_rounded
= src_entry
.size_rounded
;
108 target_entry
->count
= src_entry
.count
;
// Class method: applies a batch of bucket entries to the per-bucket map
// entries and keeps the aggregate stats in the header in step.
// `in` carries an encoded cls_user_set_buckets_op; `out` is not referenced in
// the lines visible here. Several lines of the body (braces, returns, some
// conditions) are not visible in this view.
111 static int cls_user_set_buckets_info(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
// Decode the request from the input buffer.
113 bufferlist::iterator in_iter
= in
->begin();
115 cls_user_set_buckets_op op
;
117 ::decode(op
, in_iter
);
118 } catch (buffer::error
& err
) {
// NOTE(review): the message names cls_user_add_op(), not this function.
119 CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op");
// Load the current header so its stats can be adjusted below.
123 cls_user_header header
;
124 int ret
= read_header(hctx
, &header
);
126 CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret
);
// Process every entry supplied in the request.
130 for (list
<cls_user_bucket_entry
>::iterator iter
= op
.entries
.begin();
131 iter
!= op
.entries
.end(); ++iter
) {
132 cls_user_bucket_entry
& update_entry
= *iter
;
// Derive the map key for this bucket and fetch whatever is stored there.
136 get_key_by_bucket_name(update_entry
.bucket
.name
, &key
);
138 cls_user_bucket_entry entry
;
139 ret
= get_existing_bucket_entry(hctx
, key
, entry
);
// No stored entry for this key. The two outcomes below (skip the update, or
// start from update_entry) are presumably selected by a condition on lines not
// visible here -- TODO confirm.
141 if (ret
== -ENOENT
) {
143 continue; /* racing bucket removal */
145 entry
= update_entry
;
149 // bucket id may have changed (ie reshard)
150 entry
.bucket
.bucket_id
= update_entry
.bucket
.bucket_id
;
154 CLS_LOG(0, "ERROR: get_existing_bucket_entry() key=%s returned %d", key
.c_str(), ret
);
// The stored entry was already counted in the header (user_stats_sync set):
// take its old contribution out here; the new one is added back further down.
156 } else if (ret
>= 0 && entry
.user_stats_sync
) {
157 dec_header_stats(&header
.stats
, entry
);
160 CLS_LOG(20, "storing entry for key=%s size=%lld count=%lld",
161 key
.c_str(), (long long)update_entry
.size
, (long long)update_entry
.count
);
163 // sync entry stats when not an op.add, as when the case is op.add if its a
164 // new entry we already have copied update_entry earlier, OTOH, for an existing entry
165 // we end up clobbering the existing stats for the bucket
167 apply_entry_stats(update_entry
, &entry
);
// Flag the entry as reflected in header.stats; this is the flag tested above
// before dec_header_stats is applied.
169 entry
.user_stats_sync
= true;
// Persist the updated entry.
171 ret
= write_entry(hctx
, key
, entry
);
// Add the entry's (new) contribution to the aggregate stats.
175 add_header_stats(&header
.stats
, entry
);
180 CLS_LOG(20, "header: total bytes=%lld entries=%lld", (long long)header
.stats
.total_bytes
, (long long)header
.stats
.total_entries
);
// last_stats_update only ever moves forward.
182 if (header
.last_stats_update
< op
.time
)
183 header
.last_stats_update
= op
.time
;
// Encode the adjusted header and write it back.
185 ::encode(header
, bl
);
187 ret
= cls_cxx_map_write_header(hctx
, &bl
);
// Class method: records the completion time of a stats sync in the header.
// `in` carries an encoded cls_user_complete_stats_sync_op; `out` is not
// referenced in the lines visible here. Parts of the body are not visible in
// this view.
194 static int cls_user_complete_stats_sync(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
// Decode the request from the input buffer.
196 bufferlist::iterator in_iter
= in
->begin();
198 cls_user_complete_stats_sync_op op
;
200 ::decode(op
, in_iter
);
201 } catch (buffer::error
& err
) {
// NOTE(review): the message names cls_user_add_op(); it looks copied from
// another method and does not identify this function.
202 CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op");
// Load the current header.
206 cls_user_header header
;
207 int ret
= read_header(hctx
, &header
);
209 CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret
);
// last_stats_sync only ever moves forward.
213 if (header
.last_stats_sync
< op
.time
)
214 header
.last_stats_sync
= op
.time
;
// Encode the header and write it back.
218 ::encode(header
, bl
);
220 ret
= cls_cxx_map_write_header(hctx
, &bl
);
// Class method: removes the map entry of one bucket and takes its stats out of
// the header. `in` carries an encoded cls_user_remove_bucket_op; `out` is not
// referenced in the lines visible here. Parts of the body are not visible in
// this view.
227 static int cls_user_remove_bucket(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
// Decode the request from the input buffer.
229 bufferlist::iterator in_iter
= in
->begin();
231 cls_user_remove_bucket_op op
;
233 ::decode(op
, in_iter
);
234 } catch (buffer::error
& err
) {
// NOTE(review): the message names cls_user_add_op(); it looks copied from
// another method and does not identify this function.
235 CLS_LOG(1, "ERROR: cls_user_add_op(): failed to decode op");
// Load the current header so its stats can be adjusted below.
239 cls_user_header header
;
240 int ret
= read_header(hctx
, &header
);
242 CLS_LOG(0, "ERROR: failed to read user info header ret=%d", ret
);
// Derive the map key for the bucket and fetch the stored entry.
248 get_key_by_bucket_name(op
.bucket
.name
, &key
);
250 cls_user_bucket_entry entry
;
251 ret
= get_existing_bucket_entry(hctx
, key
, entry
);
// A missing entry is reported as success, so repeating a removal is harmless.
252 if (ret
== -ENOENT
) {
253 return 0; /* idempotent removal */
256 CLS_LOG(0, "ERROR: get existing bucket entry, key=%s ret=%d", key
.c_str(), ret
);
260 CLS_LOG(20, "removing entry at %s", key
.c_str());
// Drop the bucket's entry from the map.
262 ret
= remove_entry(hctx
, key
);
// The body of this branch is not visible. Presumably an entry that was never
// counted in the header skips the header adjustment below -- TODO confirm.
266 if (!entry
.user_stats_sync
) {
// Take the removed entry's contribution out of the aggregate stats.
270 dec_header_stats(&header
.stats
, entry
);
272 CLS_LOG(20, "header: total bytes=%lld entries=%lld", (long long)header
.stats
.total_bytes
, (long long)header
.stats
.total_entries
);
// The result of the header write is the method's return value.
276 return cls_cxx_map_write_header(hctx
, &bl
);
// Class method: lists bucket entries stored in the map, starting from
// op.marker and optionally stopping at op.end_marker.
// `in` carries an encoded cls_user_list_buckets_op. The lines that fill `out`
// are not visible in this view; presumably the cls_user_list_buckets_ret built
// below is encoded into it -- TODO confirm.
279 static int cls_user_list_buckets(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
// Decode the request from the input buffer.
281 bufferlist::iterator in_iter
= in
->begin();
283 cls_user_list_buckets_op op
;
285 ::decode(op
, in_iter
);
286 } catch (buffer::error
& err
) {
287 CLS_LOG(1, "ERROR: cls_user_list_op(): failed to decode op");
// Raw key -> value pairs returned by the map listing call.
291 map
<string
, bufferlist
> keys
;
// An empty end_marker means there is no upper bound on the listing.
293 const string
& from_index
= op
.marker
;
294 const string
& to_index
= op
.end_marker
;
295 const bool to_index_valid
= !to_index
.empty();
// Upper limit on entries returned per call; the client's max_entries is
// clamped to it. The same macro is used by cls_user_reset_stats in this file.
297 #define MAX_ENTRIES 1000
298 size_t max_entries
= op
.max_entries
;
299 if (max_entries
> MAX_ENTRIES
)
300 max_entries
= MAX_ENTRIES
;
303 cls_user_list_buckets_ret ret
;
// Fetch up to max_entries pairs starting at from_index; ret.truncated is set
// by the call. `match_prefix` is declared on a line not visible here.
305 int rc
= cls_cxx_map_get_vals(hctx
, from_index
, match_prefix
, max_entries
, &keys
, &ret
.truncated
);
309 CLS_LOG(20, "from_index=%s to_index=%s match_prefix=%s",
312 match_prefix
.c_str());
// Decoded entries are appended straight into the reply structure.
314 list
<cls_user_bucket_entry
>& entries
= ret
.entries
;
315 map
<string
, bufferlist
>::iterator iter
= keys
.begin();
319 for (; iter
!= keys
.end(); ++iter
) {
320 const string
& index
= iter
->first
;
// Reached a key at or beyond end_marker: the requested range is finished, so
// the result is reported as not truncated.
323 if (to_index_valid
&& to_index
.compare(index
) <= 0) {
324 ret
.truncated
= false;
// Decode the stored value and add it to the result (the decode call itself is
// on a line not visible here); a buffer::error is caught and logged.
328 bufferlist
& bl
= iter
->second
;
329 bufferlist::iterator biter
= bl
.begin();
331 cls_user_bucket_entry e
;
333 entries
.push_back(e
);
334 } catch (buffer::error
& err
) {
335 CLS_LOG(0, "ERROR: cls_user_list: could not decode entry, index=%s", index
.c_str());
// Class method: returns the header to the caller.
// `in` carries an encoded cls_user_get_header_op; the reply
// (cls_user_get_header_ret) is encoded into `*out`. Parts of the body are not
// visible in this view.
348 static int cls_user_get_header(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
// Decode the request from the input buffer.
350 bufferlist::iterator in_iter
= in
->begin();
352 cls_user_get_header_op op
;
354 ::decode(op
, in_iter
);
355 } catch (buffer::error
& err
) {
356 CLS_LOG(1, "ERROR: cls_user_get_header_op(): failed to decode op");
// Read the header directly into the reply structure.
360 cls_user_get_header_ret op_ret
;
362 int ret
= read_header(hctx
, &op_ret
.header
);
// Serialize the reply for the caller.
366 ::encode(op_ret
, *out
);
// Summary of the visible code: starts from a default-constructed header, adds
// the stats of every decoded map entry to it, stamps last_stats_update with
// op.time and writes the header back. Parts of the body are not visible in
// this view.
371 /// A method to reset the user.buckets header stats in accordance to the values
372 /// seen in the user.buckets omap keys. This will not be equivalent to --sync-stats
373 /// which requires comparing the values with actual bucket meta stats supplied
375 static int cls_user_reset_stats(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
/*ignore*/)
// Decode the request from the input buffer.
377 cls_user_reset_stats_op op
;
380 auto bliter
= in
->begin();
381 ::decode(op
, bliter
);
382 } catch (buffer::error
& err
) {
383 CLS_LOG(0, "ERROR: cls_user_reset_op(): failed to decode op");
// Fresh header: the stats are rebuilt from the map entries, not adjusted.
386 cls_user_header header
;
387 bool truncated
= false;
// Both start empty, so the listing begins at the first key with no prefix
// filter.
388 string from_index
, prefix
;
// NOTE(review): no visible line updates from_index after this call. If the
// call sits inside a loop that repeats while `truncated` is set (loop lines are
// not visible here), confirm that from_index is advanced to the last key seen;
// otherwise the same first MAX_ENTRIES keys would be read again.
390 map
<string
, bufferlist
> keys
;
391 int rc
= cls_cxx_map_get_vals(hctx
, from_index
, prefix
, MAX_ENTRIES
, &keys
, &truncated
);
// Decode each stored entry and add it to the rebuilt stats; a buffer::error
// during decoding is caught and logged.
396 for (const auto&kv
: keys
){
397 cls_user_bucket_entry e
;
400 auto bliter
= bl
.begin();
402 } catch (buffer::error
& err
) {
403 CLS_LOG(0, "ERROR: failed to decode bucket entry for %s", kv
.first
.c_str());
406 add_header_stats(&header
.stats
, e
);
// Unlike cls_user_set_buckets_info, the timestamp is assigned unconditionally.
411 header
.last_stats_update
= op
.time
;
// Encode the rebuilt header; the result of the write is the return value.
412 ::encode(header
, bl
);
414 return cls_cxx_map_write_header(hctx
, &bl
);
// Registration of the "user" class and its methods. The enclosing function
// header is not visible in this view; presumably this is the class's init
// entry point -- TODO confirm.
419 CLS_LOG(1, "Loaded user class!");
// One handle for the class and one per registered method.
421 cls_handle_t h_class
;
422 cls_method_handle_t h_user_set_buckets_info
;
423 cls_method_handle_t h_user_complete_stats_sync
;
424 cls_method_handle_t h_user_remove_bucket
;
425 cls_method_handle_t h_user_list_buckets
;
426 cls_method_handle_t h_user_get_header
;
427 cls_method_handle_t h_user_reset_stats
;
429 cls_register("user", &h_class
);
// Methods that modify the map or the header are registered with
// CLS_METHOD_RD | CLS_METHOD_WR; the two query methods (list_buckets,
// get_header) are registered with CLS_METHOD_RD only.
432 cls_register_cxx_method(h_class
, "set_buckets_info", CLS_METHOD_RD
| CLS_METHOD_WR
,
433 cls_user_set_buckets_info
, &h_user_set_buckets_info
);
434 cls_register_cxx_method(h_class
, "complete_stats_sync", CLS_METHOD_RD
| CLS_METHOD_WR
,
435 cls_user_complete_stats_sync
, &h_user_complete_stats_sync
);
436 cls_register_cxx_method(h_class
, "remove_bucket", CLS_METHOD_RD
| CLS_METHOD_WR
, cls_user_remove_bucket
, &h_user_remove_bucket
);
437 cls_register_cxx_method(h_class
, "list_buckets", CLS_METHOD_RD
, cls_user_list_buckets
, &h_user_list_buckets
);
438 cls_register_cxx_method(h_class
, "get_header", CLS_METHOD_RD
, cls_user_get_header
, &h_user_get_header
);
// Note: the registered name is "reset_user_stats" while the handler is
// cls_user_reset_stats.
439 cls_register_cxx_method(h_class
, "reset_user_stats", CLS_METHOD_RD
| CLS_METHOD_WR
, cls_user_reset_stats
, &h_user_reset_stats
);