1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/types.h"
8 #include "objclass/objclass.h"
9 #include "cls/rgw/cls_rgw_ops.h"
10 #include "cls/rgw/cls_rgw_types.h"
11 #include "cls/rgw_gc/cls_rgw_gc_types.h"
12 #include "cls/rgw_gc/cls_rgw_gc_ops.h"
13 #include "cls/queue/cls_queue_ops.h"
14 #include "cls/rgw_gc/cls_rgw_gc_const.h"
15 #include "cls/queue/cls_queue_src.h"
17 #include "common/ceph_context.h"
18 #include "global/global_context.h"
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rgw
23 #define GC_LIST_DEFAULT_MAX 128
28 static int cls_rgw_gc_queue_init(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
30 auto in_iter
= in
->cbegin();
32 cls_rgw_gc_queue_init_op op
;
35 } catch (buffer::error
& err
) {
36 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_init: failed to decode entry\n");
40 cls_rgw_gc_urgent_data urgent_data
;
41 urgent_data
.num_urgent_data_entries
= op
.num_deferred_entries
;
43 cls_queue_init_op init_op
;
45 CLS_LOG(10, "INFO: cls_rgw_gc_queue_init: queue size is %lu\n", op
.size
);
47 init_op
.queue_size
= op
.size
;
48 init_op
.max_urgent_data_size
= g_ceph_context
->_conf
->rgw_gc_max_deferred_entries_size
;
49 encode(urgent_data
, init_op
.bl_urgent_data
);
51 return queue_init(hctx
, init_op
);
54 static int cls_rgw_gc_queue_enqueue(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
56 auto in_iter
= in
->cbegin();
57 cls_rgw_gc_set_entry_op op
;
60 } catch (buffer::error
& err
) {
61 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_enqueue: failed to decode entry\n");
65 op
.info
.time
= ceph::real_clock::now();
66 op
.info
.time
+= make_timespan(op
.expiration_secs
);
70 int ret
= queue_read_head(hctx
, head
);
75 cls_queue_enqueue_op enqueue_op
;
77 encode(op
.info
, bl_data
);
78 enqueue_op
.bl_data_vec
.emplace_back(bl_data
);
80 CLS_LOG(20, "INFO: cls_rgw_gc_queue_enqueue: Data size is: %u \n", bl_data
.length());
82 ret
= queue_enqueue(hctx
, enqueue_op
, head
);
88 return queue_write_head(hctx
, head
);
91 static int cls_rgw_gc_queue_list_entries(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
93 auto in_iter
= in
->cbegin();
94 cls_rgw_gc_list_op op
;
97 } catch (buffer::error
& err
) {
98 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode input\n");
103 auto ret
= queue_read_head(hctx
, head
);
108 cls_rgw_gc_urgent_data urgent_data
;
109 if (head
.bl_urgent_data
.length() > 0) {
110 auto iter_urgent_data
= head
.bl_urgent_data
.cbegin();
112 decode(urgent_data
, iter_urgent_data
);
113 } catch (buffer::error
& err
) {
114 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode urgent data\n");
119 cls_queue_list_op list_op
;
121 op
.max
= GC_LIST_DEFAULT_MAX
;
124 list_op
.max
= op
.max
;
125 list_op
.start_marker
= op
.marker
;
127 cls_rgw_gc_list_ret list_ret
;
128 uint32_t num_entries
= 0; //Entries excluding the deferred ones
129 bool is_truncated
= true;
132 cls_queue_list_ret op_ret
;
133 int ret
= queue_list_entries(hctx
, list_op
, op_ret
, head
);
135 CLS_LOG(5, "ERROR: queue_list_entries(): returned error %d\n", ret
);
138 is_truncated
= op_ret
.is_truncated
;
139 next_marker
= op_ret
.next_marker
;
141 if (op_ret
.entries
.size()) {
142 for (auto it
: op_ret
.entries
) {
143 cls_rgw_gc_obj_info info
;
145 decode(info
, it
.data
);
146 } catch (buffer::error
& err
) {
147 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode gc info\n");
151 //Check for info tag in urgent data map
152 auto iter
= urgent_data
.urgent_data_map
.find(info
.tag
);
153 if (iter
!= urgent_data
.urgent_data_map
.end()) {
155 if (iter
->second
> info
.time
) {
156 CLS_LOG(10, "INFO: cls_rgw_gc_queue_list_entries(): tag found in urgent data: %s\n", info
.tag
.c_str());
161 if (! found
&& urgent_data
.num_xattr_urgent_entries
> 0) {
162 bufferlist bl_xattrs
;
163 int ret
= cls_cxx_getxattr(hctx
, "cls_queue_urgent_data", &bl_xattrs
);
164 if (ret
< 0 && (ret
!= -ENOENT
&& ret
!= -ENODATA
)) {
165 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__
, ret
);
168 if (ret
!= -ENOENT
&& ret
!= -ENODATA
) {
169 std::unordered_map
<string
,ceph::real_time
> xattr_urgent_data_map
;
170 auto iter
= bl_xattrs
.cbegin();
172 decode(xattr_urgent_data_map
, iter
);
173 } catch (buffer::error
& err
) {
174 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode xattrs urgent data map\n");
177 auto xattr_iter
= xattr_urgent_data_map
.find(info
.tag
);
178 if (xattr_iter
!= xattr_urgent_data_map
.end()) {
179 if (xattr_iter
->second
> info
.time
) {
180 CLS_LOG(1, "INFO: cls_rgw_gc_queue_list_entries(): tag found in xattrs urgent data map: %s\n", info
.tag
.c_str());
184 } // end - ret != ENOENT && ENODATA
185 } // end - if not found
186 if (op
.expired_only
) {
187 real_time now
= ceph::real_clock::now();
188 if (info
.time
<= now
) {
189 list_ret
.entries
.emplace_back(info
);
191 //Can break out here if info.time > now, since all subsequent entries won't have expired
193 list_ret
.entries
.emplace_back(info
);
197 CLS_LOG(10, "INFO: cls_rgw_gc_queue_list_entries(): num_entries: %u and op.max: %u\n", num_entries
, op
.max
);
198 if (num_entries
< op
.max
) {
199 list_op
.max
= (op
.max
- num_entries
);
200 list_op
.start_marker
= op_ret
.next_marker
;
203 //We've reached the max number of entries needed
207 //We dont have data to process
210 } while(is_truncated
);
212 list_ret
.truncated
= is_truncated
;
213 if (list_ret
.truncated
) {
214 list_ret
.next_marker
= next_marker
;
217 encode(list_ret
, *out
);
221 static int cls_rgw_gc_queue_remove_entries(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
223 auto in_iter
= in
->cbegin();
225 cls_rgw_gc_queue_remove_entries_op op
;
228 } catch (buffer::error
& err
) {
229 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode input\n");
234 auto ret
= queue_read_head(hctx
, head
);
239 cls_rgw_gc_urgent_data urgent_data
;
240 if (head
.bl_urgent_data
.length() > 0) {
241 auto iter_urgent_data
= head
.bl_urgent_data
.cbegin();
243 decode(urgent_data
, iter_urgent_data
);
244 } catch (buffer::error
& err
) {
245 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode urgent data\n");
250 // List entries and calculate total number of entries (including invalid entries)
251 if (! op
.num_entries
) {
252 op
.num_entries
= GC_LIST_DEFAULT_MAX
;
254 cls_queue_list_op list_op
;
255 list_op
.max
= op
.num_entries
+ 1; // +1 to get the offset of last + 1 entry
256 bool is_truncated
= true;
257 uint32_t total_num_entries
= 0, num_entries
= 0;
260 cls_queue_list_ret op_ret
;
261 int ret
= queue_list_entries(hctx
, list_op
, op_ret
, head
);
263 CLS_LOG(5, "ERROR: queue_list_entries(): returned error %d\n", ret
);
267 is_truncated
= op_ret
.is_truncated
;
268 unsigned int index
= 0;
269 // If data is not empty
270 if (op_ret
.entries
.size()) {
271 for (auto it
: op_ret
.entries
) {
272 cls_rgw_gc_obj_info info
;
274 decode(info
, it
.data
);
275 } catch (buffer::error
& err
) {
276 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode gc info\n");
279 CLS_LOG(20, "INFO: cls_rgw_gc_queue_remove_entries(): entry: %s\n", info
.tag
.c_str());
283 //Search for tag in urgent data map
284 auto iter
= urgent_data
.urgent_data_map
.find(info
.tag
);
285 if (iter
!= urgent_data
.urgent_data_map
.end()) {
287 if (iter
->second
> info
.time
) {
288 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): tag found in urgent data: %s\n", info
.tag
.c_str());
290 } else if (iter
->second
== info
.time
) {
291 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): erasing tag from urgent data: %s\n", info
.tag
.c_str());
292 urgent_data
.urgent_data_map
.erase(info
.tag
); //erase entry from map, as it will be removed later from queue
293 urgent_data
.num_head_urgent_entries
-= 1;
296 if (! found
&& urgent_data
.num_xattr_urgent_entries
> 0) {
298 bufferlist bl_xattrs
;
299 int ret
= cls_cxx_getxattr(hctx
, "cls_queue_urgent_data", &bl_xattrs
);
300 if (ret
< 0 && (ret
!= -ENOENT
&& ret
!= -ENODATA
)) {
301 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__
, ret
);
304 if (ret
!= -ENOENT
&& ret
!= -ENODATA
) {
305 std::unordered_map
<string
,ceph::real_time
> xattr_urgent_data_map
;
306 auto iter
= bl_xattrs
.cbegin();
308 decode(xattr_urgent_data_map
, iter
);
309 } catch (buffer::error
& err
) {
310 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode xattrs urgent data map\n");
313 auto xattr_iter
= xattr_urgent_data_map
.find(info
.tag
);
314 if (xattr_iter
!= xattr_urgent_data_map
.end()) {
315 if (xattr_iter
->second
> info
.time
) {
316 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): tag found in xattrs urgent data map: %s\n", info
.tag
.c_str());
318 } else if (xattr_iter
->second
== info
.time
) {
319 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): erasing tag from xattrs urgent data: %s\n", info
.tag
.c_str());
320 xattr_urgent_data_map
.erase(info
.tag
); //erase entry from map, as it will be removed later
321 urgent_data
.num_xattr_urgent_entries
-= 1;
324 } // end - ret != ENOENT && ENODATA
329 if (num_entries
< (op
.num_entries
+ 1)) {
330 if (! op_ret
.is_truncated
) {
331 end_marker
= op_ret
.next_marker
;
332 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): not truncated and end offset is %s\n", end_marker
.c_str());
335 list_op
.max
= ((op
.num_entries
+ 1) - num_entries
);
336 list_op
.start_marker
= op_ret
.next_marker
;
340 end_marker
= op_ret
.entries
[index
- 1].marker
;
341 CLS_LOG(1, "INFO: cls_rgw_gc_queue_remove_entries(): index is %u and end_offset is: %s\n", index
, end_marker
.c_str());
348 } while(is_truncated
);
350 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): Total number of entries to remove: %d\n", total_num_entries
);
351 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): End offset is %s\n", end_marker
.c_str());
353 if (! end_marker
.empty()) {
354 cls_queue_remove_op rem_op
;
355 rem_op
.end_marker
= end_marker
;
356 int ret
= queue_remove_entries(hctx
, rem_op
, head
);
358 CLS_LOG(5, "ERROR: queue_remove_entries(): returned error %d\n", ret
);
363 //Update urgent data map
364 head
.bl_urgent_data
.clear();
365 encode(urgent_data
, head
.bl_urgent_data
);
366 CLS_LOG(5, "INFO: cls_rgw_gc_queue_remove_entries(): Urgent data size is %u\n", head
.bl_urgent_data
.length());
368 return queue_write_head(hctx
, head
);
371 static int cls_rgw_gc_queue_update_entry(cls_method_context_t hctx
, bufferlist
*in
, bufferlist
*out
)
374 auto in_iter
= in
->cbegin();
376 cls_rgw_gc_queue_defer_entry_op op
;
379 } catch (buffer::error
& err
) {
380 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode input\n");
384 op
.info
.time
= ceph::real_clock::now();
385 op
.info
.time
+= make_timespan(op
.expiration_secs
);
389 ret
= queue_read_head(hctx
, head
);
394 auto bl_iter
= head
.bl_urgent_data
.cbegin();
395 cls_rgw_gc_urgent_data urgent_data
;
397 decode(urgent_data
, bl_iter
);
398 } catch (buffer::error
& err
) {
399 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode urgent data\n");
403 //has_urgent_data signifies whether urgent data in queue has changed
404 bool has_urgent_data
= false, tag_found
= false;
405 //search in unordered map in head
406 auto it
= urgent_data
.urgent_data_map
.find(op
.info
.tag
);
407 if (it
!= urgent_data
.urgent_data_map
.end()) {
408 it
->second
= op
.info
.time
;
410 has_urgent_data
= true;
411 } else { //search in xattrs
412 bufferlist bl_xattrs
;
413 int ret
= cls_cxx_getxattr(hctx
, "cls_queue_urgent_data", &bl_xattrs
);
414 if (ret
< 0 && (ret
!= -ENOENT
&& ret
!= -ENODATA
)) {
415 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__
, ret
);
418 if (ret
!= -ENOENT
&& ret
!= -ENODATA
) {
419 std::unordered_map
<string
,ceph::real_time
> xattr_urgent_data_map
;
420 auto iter
= bl_xattrs
.cbegin();
422 decode(xattr_urgent_data_map
, iter
);
423 } catch (buffer::error
& err
) {
424 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode xattrs urgent data map\n");
427 auto xattr_iter
= xattr_urgent_data_map
.find(op
.info
.tag
);
428 if (xattr_iter
!= xattr_urgent_data_map
.end()) {
429 it
->second
= op
.info
.time
;
431 //write the updated map back
433 encode(xattr_urgent_data_map
, bl_map
);
434 ret
= cls_cxx_setxattr(hctx
, "cls_queue_urgent_data", &bl_map
);
435 CLS_LOG(20, "%s(): setting attr: %s", __func__
, "cls_queue_urgent_data");
437 CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__
, "cls_queue_urgent_data", ret
);
441 }// end ret != ENOENT ...
445 //try inserting in queue head
446 urgent_data
.urgent_data_map
.insert({op
.info
.tag
, op
.info
.time
});
447 urgent_data
.num_head_urgent_entries
+= 1;
448 has_urgent_data
= true;
450 bufferlist bl_urgent_data
;
451 encode(urgent_data
, bl_urgent_data
);
453 if (bl_urgent_data
.length() > head
.max_urgent_data_size
) {
454 //remove inserted entry from urgent data
455 urgent_data
.urgent_data_map
.erase(op
.info
.tag
);
456 urgent_data
.num_head_urgent_entries
-= 1;
457 has_urgent_data
= false;
459 bufferlist bl_xattrs
;
460 int ret
= cls_cxx_getxattr(hctx
, "cls_queue_urgent_data", &bl_xattrs
);
461 if (ret
< 0 && (ret
!= -ENOENT
&& ret
!= -ENODATA
)) {
462 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__
, ret
);
465 std::unordered_map
<string
,ceph::real_time
> xattr_urgent_data_map
;
466 if (ret
!= -ENOENT
&& ret
!= -ENODATA
) {
467 auto iter
= bl_xattrs
.cbegin();
469 decode(xattr_urgent_data_map
, iter
);
470 } catch (buffer::error
& err
) {
471 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode xattrs urgent data map\n");
475 xattr_urgent_data_map
.insert({op
.info
.tag
, op
.info
.time
});
476 urgent_data
.num_xattr_urgent_entries
+= 1;
477 has_urgent_data
= true;
479 encode(xattr_urgent_data_map
, bl_map
);
480 ret
= cls_cxx_setxattr(hctx
, "cls_queue_urgent_data", &bl_map
);
481 CLS_LOG(20, "%s(): setting attr: %s", __func__
, "cls_queue_urgent_data");
483 CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__
, "cls_queue_urgent_data", ret
);
489 if ((urgent_data
.num_head_urgent_entries
+ urgent_data
.num_xattr_urgent_entries
) > urgent_data
.num_urgent_data_entries
) {
490 CLS_LOG(20, "Total num entries %u", urgent_data
.num_urgent_data_entries
);
491 CLS_LOG(20, "Num xattr entries %u", urgent_data
.num_xattr_urgent_entries
);
492 CLS_LOG(20, "Num head entries %u", urgent_data
.num_head_urgent_entries
);
493 CLS_LOG(0, "ERROR: Number of urgent data entries exceeded that requested by user, returning no space!");
497 cls_queue_enqueue_op enqueue_op
;
499 encode(op
.info
, bl_data
);
500 enqueue_op
.bl_data_vec
.emplace_back(bl_data
);
501 CLS_LOG(10, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data
.length());
503 ret
= queue_enqueue(hctx
, enqueue_op
, head
);
508 if (has_urgent_data
) {
509 head
.bl_urgent_data
.clear();
510 encode(urgent_data
, head
.bl_urgent_data
);
513 return queue_write_head(hctx
, head
);
// Class-registration statements. The opening line of the enclosing definition
// is not part of this view. The sequence logs that the class was loaded,
// registers the class under RGW_GC_CLASS and then registers one C++ handler
// per GC queue operation.
518 CLS_LOG(1, "Loaded rgw gc class!");
// Handle for the class itself, filled in by cls_register() below.
520 cls_handle_t h_class
;
// One handle per method; each is passed as the output argument of the
// matching cls_register_cxx_method() call.
521 cls_method_handle_t h_rgw_gc_queue_init
;
522 cls_method_handle_t h_rgw_gc_queue_enqueue
;
523 cls_method_handle_t h_rgw_gc_queue_list_entries
;
524 cls_method_handle_t h_rgw_gc_queue_remove_entries
;
525 cls_method_handle_t h_rgw_gc_queue_update_entry
;
527 cls_register(RGW_GC_CLASS
, &h_class
);
// Method registrations. Every method is registered with
// CLS_METHOD_RD | CLS_METHOD_WR except the list method, which is registered
// with CLS_METHOD_RD only.
530 cls_register_cxx_method(h_class
, RGW_GC_QUEUE_INIT
, CLS_METHOD_RD
| CLS_METHOD_WR
, cls_rgw_gc_queue_init
, &h_rgw_gc_queue_init
);
531 cls_register_cxx_method(h_class
, RGW_GC_QUEUE_ENQUEUE
, CLS_METHOD_RD
| CLS_METHOD_WR
, cls_rgw_gc_queue_enqueue
, &h_rgw_gc_queue_enqueue
);
// Listing is the only method registered without CLS_METHOD_WR.
532 cls_register_cxx_method(h_class
, RGW_GC_QUEUE_LIST_ENTRIES
, CLS_METHOD_RD
, cls_rgw_gc_queue_list_entries
, &h_rgw_gc_queue_list_entries
);
533 cls_register_cxx_method(h_class
, RGW_GC_QUEUE_REMOVE_ENTRIES
, CLS_METHOD_RD
| CLS_METHOD_WR
, cls_rgw_gc_queue_remove_entries
, &h_rgw_gc_queue_remove_entries
);
534 cls_register_cxx_method(h_class
, RGW_GC_QUEUE_UPDATE_ENTRY
, CLS_METHOD_RD
| CLS_METHOD_WR
, cls_rgw_gc_queue_update_entry
, &h_rgw_gc_queue_update_entry
);