]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/rgw_gc/cls_rgw_gc.cc
976e49ea52e114d5050ccb048b09a0c2535ea0ad
[ceph.git] / ceph / src / cls / rgw_gc / cls_rgw_gc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/types.h"
5
6 #include <errno.h>
7
8 #include "objclass/objclass.h"
9 #include "cls/rgw/cls_rgw_ops.h"
10 #include "cls/rgw/cls_rgw_types.h"
11 #include "cls/rgw_gc/cls_rgw_gc_types.h"
12 #include "cls/rgw_gc/cls_rgw_gc_ops.h"
13 #include "cls/queue/cls_queue_ops.h"
14 #include "cls/rgw_gc/cls_rgw_gc_const.h"
15 #include "cls/queue/cls_queue_src.h"
16
17 #include "common/ceph_context.h"
18 #include "global/global_context.h"
19
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rgw
22
23 #define GC_LIST_DEFAULT_MAX 128
24
25 CLS_VER(1,0)
26 CLS_NAME(rgw_gc)
27
28 static int cls_rgw_gc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
29 {
30 auto in_iter = in->cbegin();
31
32 cls_rgw_gc_queue_init_op op;
33 try {
34 decode(op, in_iter);
35 } catch (buffer::error& err) {
36 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_init: failed to decode entry\n");
37 return -EINVAL;
38 }
39
40 cls_rgw_gc_urgent_data urgent_data;
41 urgent_data.num_urgent_data_entries = op.num_deferred_entries;
42
43 cls_queue_init_op init_op;
44
45 CLS_LOG(10, "INFO: cls_rgw_gc_queue_init: queue size is %lu\n", op.size);
46
47 init_op.queue_size = op.size;
48 init_op.max_urgent_data_size = g_ceph_context->_conf->rgw_gc_max_deferred_entries_size;
49 encode(urgent_data, init_op.bl_urgent_data);
50
51 return queue_init(hctx, init_op);
52 }
53
54 static int cls_rgw_gc_queue_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
55 {
56 auto in_iter = in->cbegin();
57 cls_rgw_gc_set_entry_op op;
58 try {
59 decode(op, in_iter);
60 } catch (buffer::error& err) {
61 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_enqueue: failed to decode entry\n");
62 return -EINVAL;
63 }
64
65 op.info.time = ceph::real_clock::now();
66 op.info.time += make_timespan(op.expiration_secs);
67
68 //get head
69 cls_queue_head head;
70 int ret = queue_read_head(hctx, head);
71 if (ret < 0) {
72 return ret;
73 }
74
75 cls_queue_enqueue_op enqueue_op;
76 bufferlist bl_data;
77 encode(op.info, bl_data);
78 enqueue_op.bl_data_vec.emplace_back(bl_data);
79
80 CLS_LOG(20, "INFO: cls_rgw_gc_queue_enqueue: Data size is: %u \n", bl_data.length());
81
82 ret = queue_enqueue(hctx, enqueue_op, head);
83 if (ret < 0) {
84 return ret;
85 }
86
87 //Write back head
88 return queue_write_head(hctx, head);
89 }
90
91 static int cls_rgw_gc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
92 {
93 auto in_iter = in->cbegin();
94 cls_rgw_gc_list_op op;
95 try {
96 decode(op, in_iter);
97 } catch (buffer::error& err) {
98 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode input\n");
99 return -EINVAL;
100 }
101
102 cls_queue_head head;
103 auto ret = queue_read_head(hctx, head);
104 if (ret < 0) {
105 return ret;
106 }
107
108 cls_rgw_gc_urgent_data urgent_data;
109 if (head.bl_urgent_data.length() > 0) {
110 auto iter_urgent_data = head.bl_urgent_data.cbegin();
111 try {
112 decode(urgent_data, iter_urgent_data);
113 } catch (buffer::error& err) {
114 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode urgent data\n");
115 return -EINVAL;
116 }
117 }
118
119 cls_queue_list_op list_op;
120 if (! op.max) {
121 op.max = GC_LIST_DEFAULT_MAX;
122 }
123
124 list_op.max = op.max;
125 list_op.start_marker = op.marker;
126
127 cls_rgw_gc_list_ret list_ret;
128 uint32_t num_entries = 0; //Entries excluding the deferred ones
129 bool is_truncated = true;
130 string next_marker;
131 do {
132 cls_queue_list_ret op_ret;
133 int ret = queue_list_entries(hctx, list_op, op_ret, head);
134 if (ret < 0) {
135 CLS_LOG(5, "ERROR: queue_list_entries(): returned error %d\n", ret);
136 return ret;
137 }
138 is_truncated = op_ret.is_truncated;
139 next_marker = op_ret.next_marker;
140
141 if (op_ret.entries.size()) {
142 for (auto it : op_ret.entries) {
143 cls_rgw_gc_obj_info info;
144 try {
145 decode(info, it.data);
146 } catch (buffer::error& err) {
147 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode gc info\n");
148 return -EINVAL;
149 }
150 bool found = false;
151 //Check for info tag in urgent data map
152 auto iter = urgent_data.urgent_data_map.find(info.tag);
153 if (iter != urgent_data.urgent_data_map.end()) {
154 found = true;
155 if (iter->second > info.time) {
156 CLS_LOG(10, "INFO: cls_rgw_gc_queue_list_entries(): tag found in urgent data: %s\n", info.tag.c_str());
157 continue;
158 }
159 }
160 //Search in xattrs
161 if (! found && urgent_data.num_xattr_urgent_entries > 0) {
162 bufferlist bl_xattrs;
163 int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
164 if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
165 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
166 return ret;
167 }
168 if (ret != -ENOENT && ret != -ENODATA) {
169 std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
170 auto iter = bl_xattrs.cbegin();
171 try {
172 decode(xattr_urgent_data_map, iter);
173 } catch (buffer::error& err) {
174 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode xattrs urgent data map\n");
175 return -EINVAL;
176 } //end - catch
177 auto xattr_iter = xattr_urgent_data_map.find(info.tag);
178 if (xattr_iter != xattr_urgent_data_map.end()) {
179 if (xattr_iter->second > info.time) {
180 CLS_LOG(1, "INFO: cls_rgw_gc_queue_list_entries(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
181 continue;
182 }
183 }
184 } // end - ret != ENOENT && ENODATA
185 } // end - if not found
186 if (op.expired_only) {
187 real_time now = ceph::real_clock::now();
188 if (info.time <= now) {
189 list_ret.entries.emplace_back(info);
190 }
191 //Can break out here if info.time > now, since all subsequent entries won't have expired
192 } else {
193 list_ret.entries.emplace_back(info);
194 }
195 num_entries++;
196 }
197 CLS_LOG(10, "INFO: cls_rgw_gc_queue_list_entries(): num_entries: %u and op.max: %u\n", num_entries, op.max);
198 if (num_entries < op.max) {
199 list_op.max = (op.max - num_entries);
200 list_op.start_marker = op_ret.next_marker;
201 out->clear();
202 } else {
203 //We've reached the max number of entries needed
204 break;
205 }
206 } else {
207 //We dont have data to process
208 break;
209 }
210 } while(is_truncated);
211
212 list_ret.truncated = is_truncated;
213 if (list_ret.truncated) {
214 list_ret.next_marker = next_marker;
215 }
216 out->clear();
217 encode(list_ret, *out);
218 return 0;
219 }
220
221 static int cls_rgw_gc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
222 {
223 auto in_iter = in->cbegin();
224
225 cls_rgw_gc_queue_remove_entries_op op;
226 try {
227 decode(op, in_iter);
228 } catch (buffer::error& err) {
229 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode input\n");
230 return -EINVAL;
231 }
232
233 cls_queue_head head;
234 auto ret = queue_read_head(hctx, head);
235 if (ret < 0) {
236 return ret;
237 }
238
239 cls_rgw_gc_urgent_data urgent_data;
240 if (head.bl_urgent_data.length() > 0) {
241 auto iter_urgent_data = head.bl_urgent_data.cbegin();
242 try {
243 decode(urgent_data, iter_urgent_data);
244 } catch (buffer::error& err) {
245 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode urgent data\n");
246 return -EINVAL;
247 }
248 }
249
250 // List entries and calculate total number of entries (including invalid entries)
251 if (! op.num_entries) {
252 op.num_entries = GC_LIST_DEFAULT_MAX;
253 }
254 cls_queue_list_op list_op;
255 list_op.max = op.num_entries + 1; // +1 to get the offset of last + 1 entry
256 bool is_truncated = true;
257 uint32_t total_num_entries = 0, num_entries = 0;
258 string end_marker;
259 do {
260 cls_queue_list_ret op_ret;
261 int ret = queue_list_entries(hctx, list_op, op_ret, head);
262 if (ret < 0) {
263 CLS_LOG(5, "ERROR: queue_list_entries(): returned error %d\n", ret);
264 return ret;
265 }
266
267 is_truncated = op_ret.is_truncated;
268 unsigned int index = 0;
269 // If data is not empty
270 if (op_ret.entries.size()) {
271 for (auto it : op_ret.entries) {
272 cls_rgw_gc_obj_info info;
273 try {
274 decode(info, it.data);
275 } catch (buffer::error& err) {
276 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode gc info\n");
277 return -EINVAL;
278 }
279 CLS_LOG(20, "INFO: cls_rgw_gc_queue_remove_entries(): entry: %s\n", info.tag.c_str());
280 total_num_entries++;
281 index++;
282 bool found = false;
283 //Search for tag in urgent data map
284 auto iter = urgent_data.urgent_data_map.find(info.tag);
285 if (iter != urgent_data.urgent_data_map.end()) {
286 found = true;
287 if (iter->second > info.time) {
288 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): tag found in urgent data: %s\n", info.tag.c_str());
289 continue;
290 } else if (iter->second == info.time) {
291 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): erasing tag from urgent data: %s\n", info.tag.c_str());
292 urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue
293 urgent_data.num_head_urgent_entries -= 1;
294 }
295 }//end-if map end
296 if (! found && urgent_data.num_xattr_urgent_entries > 0) {
297 //Search in xattrs
298 bufferlist bl_xattrs;
299 int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
300 if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
301 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
302 return ret;
303 }
304 if (ret != -ENOENT && ret != -ENODATA) {
305 std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
306 auto iter = bl_xattrs.cbegin();
307 try {
308 decode(xattr_urgent_data_map, iter);
309 } catch (buffer::error& err) {
310 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode xattrs urgent data map\n");
311 return -EINVAL;
312 } //end - catch
313 auto xattr_iter = xattr_urgent_data_map.find(info.tag);
314 if (xattr_iter != xattr_urgent_data_map.end()) {
315 if (xattr_iter->second > info.time) {
316 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
317 continue;
318 } else if (xattr_iter->second == info.time) {
319 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str());
320 xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later
321 urgent_data.num_xattr_urgent_entries -= 1;
322 }
323 }
324 } // end - ret != ENOENT && ENODATA
325 }// search in xattrs
326 num_entries++;
327 }//end-for
328
329 if (num_entries < (op.num_entries + 1)) {
330 if (! op_ret.is_truncated) {
331 end_marker = op_ret.next_marker;
332 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): not truncated and end offset is %s\n", end_marker.c_str());
333 break;
334 } else {
335 list_op.max = ((op.num_entries + 1) - num_entries);
336 list_op.start_marker = op_ret.next_marker;
337 out->clear();
338 }
339 } else {
340 end_marker = op_ret.entries[index - 1].marker;
341 CLS_LOG(1, "INFO: cls_rgw_gc_queue_remove_entries(): index is %u and end_offset is: %s\n", index, end_marker.c_str());
342 break;
343 }
344 } //end-if
345 else {
346 break;
347 }
348 } while(is_truncated);
349
350 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): Total number of entries to remove: %d\n", total_num_entries);
351 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): End offset is %s\n", end_marker.c_str());
352
353 if (! end_marker.empty()) {
354 cls_queue_remove_op rem_op;
355 rem_op.end_marker = end_marker;
356 int ret = queue_remove_entries(hctx, rem_op, head);
357 if (ret < 0) {
358 CLS_LOG(5, "ERROR: queue_remove_entries(): returned error %d\n", ret);
359 return ret;
360 }
361 }
362
363 //Update urgent data map
364 encode(urgent_data, head.bl_urgent_data);
365
366 return queue_write_head(hctx, head);
367 }
368
369 static int cls_rgw_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
370 {
371 int ret = 0;
372 auto in_iter = in->cbegin();
373
374 cls_rgw_gc_queue_defer_entry_op op;
375 try {
376 decode(op, in_iter);
377 } catch (buffer::error& err) {
378 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode input\n");
379 return -EINVAL;
380 }
381
382 op.info.time = ceph::real_clock::now();
383 op.info.time += make_timespan(op.expiration_secs);
384
385 // Read head
386 cls_queue_head head;
387 ret = queue_read_head(hctx, head);
388 if (ret < 0) {
389 return ret;
390 }
391
392 auto bl_iter = head.bl_urgent_data.cbegin();
393 cls_rgw_gc_urgent_data urgent_data;
394 try {
395 decode(urgent_data, bl_iter);
396 } catch (buffer::error& err) {
397 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode urgent data\n");
398 return -EINVAL;
399 }
400
401 //has_urgent_data signifies whether urgent data in queue has changed
402 bool has_urgent_data = false, tag_found = false;
403 //search in unordered map in head
404 auto it = urgent_data.urgent_data_map.find(op.info.tag);
405 if (it != urgent_data.urgent_data_map.end()) {
406 it->second = op.info.time;
407 tag_found = true;
408 has_urgent_data = true;
409 } else { //search in xattrs
410 bufferlist bl_xattrs;
411 int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
412 if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
413 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
414 return ret;
415 }
416 if (ret != -ENOENT && ret != -ENODATA) {
417 std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
418 auto iter = bl_xattrs.cbegin();
419 try {
420 decode(xattr_urgent_data_map, iter);
421 } catch (buffer::error& err) {
422 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode xattrs urgent data map\n");
423 return -EINVAL;
424 } //end - catch
425 auto xattr_iter = xattr_urgent_data_map.find(op.info.tag);
426 if (xattr_iter != xattr_urgent_data_map.end()) {
427 it->second = op.info.time;
428 tag_found = true;
429 //write the updated map back
430 bufferlist bl_map;
431 encode(xattr_urgent_data_map, bl_map);
432 ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
433 CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
434 if (ret < 0) {
435 CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
436 return ret;
437 }
438 }
439 }// end ret != ENOENT ...
440 }
441
442 if (! tag_found) {
443 //try inserting in queue head
444 urgent_data.urgent_data_map.insert({op.info.tag, op.info.time});
445 urgent_data.num_head_urgent_entries += 1;
446 has_urgent_data = true;
447
448 bufferlist bl_urgent_data;
449 encode(urgent_data, bl_urgent_data);
450 //insert as xattrs
451 if (bl_urgent_data.length() > head.max_urgent_data_size) {
452 //remove inserted entry from urgent data
453 urgent_data.urgent_data_map.erase(op.info.tag);
454 urgent_data.num_head_urgent_entries -= 1;
455 has_urgent_data = false;
456
457 bufferlist bl_xattrs;
458 int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
459 if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
460 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
461 return ret;
462 }
463 std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
464 if (ret != -ENOENT && ret != -ENODATA) {
465 auto iter = bl_xattrs.cbegin();
466 try {
467 decode(xattr_urgent_data_map, iter);
468 } catch (buffer::error& err) {
469 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode xattrs urgent data map\n");
470 return -EINVAL;
471 } //end - catch
472 }
473 xattr_urgent_data_map.insert({op.info.tag, op.info.time});
474 urgent_data.num_xattr_urgent_entries += 1;
475 has_urgent_data = true;
476 bufferlist bl_map;
477 encode(xattr_urgent_data_map, bl_map);
478 ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
479 CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
480 if (ret < 0) {
481 CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
482 return ret;
483 }
484 }
485 }
486
487 if ((urgent_data.num_head_urgent_entries + urgent_data.num_xattr_urgent_entries) > urgent_data.num_urgent_data_entries) {
488 CLS_LOG(20, "Total num entries %u", urgent_data.num_urgent_data_entries);
489 CLS_LOG(20, "Num xattr entries %u", urgent_data.num_xattr_urgent_entries);
490 CLS_LOG(20, "Num head entries %u", urgent_data.num_head_urgent_entries);
491 CLS_LOG(0, "ERROR: Number of urgent data entries exceeded that requested by user, returning no space!");
492 return -ENOSPC;
493 }
494
495 cls_queue_enqueue_op enqueue_op;
496 bufferlist bl_data;
497 encode(op.info, bl_data);
498 enqueue_op.bl_data_vec.emplace_back(bl_data);
499 CLS_LOG(10, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length());
500
501 ret = queue_enqueue(hctx, enqueue_op, head);
502 if (ret < 0) {
503 return ret;
504 }
505
506 if (has_urgent_data) {
507 head.bl_urgent_data.clear();
508 encode(urgent_data, head.bl_urgent_data);
509 }
510
511 return queue_write_head(hctx, head);
512 }
513
514 CLS_INIT(rgw_gc)
515 {
516 CLS_LOG(1, "Loaded rgw gc class!");
517
518 cls_handle_t h_class;
519 cls_method_handle_t h_rgw_gc_queue_init;
520 cls_method_handle_t h_rgw_gc_queue_enqueue;
521 cls_method_handle_t h_rgw_gc_queue_list_entries;
522 cls_method_handle_t h_rgw_gc_queue_remove_entries;
523 cls_method_handle_t h_rgw_gc_queue_update_entry;
524
525 cls_register(RGW_GC_CLASS, &h_class);
526
527 /* gc */
528 cls_register_cxx_method(h_class, RGW_GC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_init, &h_rgw_gc_queue_init);
529 cls_register_cxx_method(h_class, RGW_GC_QUEUE_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_enqueue, &h_rgw_gc_queue_enqueue);
530 cls_register_cxx_method(h_class, RGW_GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_rgw_gc_queue_list_entries, &h_rgw_gc_queue_list_entries);
531 cls_register_cxx_method(h_class, RGW_GC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_remove_entries, &h_rgw_gc_queue_remove_entries);
532 cls_register_cxx_method(h_class, RGW_GC_QUEUE_UPDATE_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_update_entry, &h_rgw_gc_queue_update_entry);
533
534 return;
535 }
536