]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/rgw_gc/cls_rgw_gc.cc
import 15.2.4
[ceph.git] / ceph / src / cls / rgw_gc / cls_rgw_gc.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/types.h"
5
6 #include <errno.h>
7
8 #include "objclass/objclass.h"
9 #include "cls/rgw/cls_rgw_ops.h"
10 #include "cls/rgw/cls_rgw_types.h"
11 #include "cls/rgw_gc/cls_rgw_gc_types.h"
12 #include "cls/rgw_gc/cls_rgw_gc_ops.h"
13 #include "cls/queue/cls_queue_ops.h"
14 #include "cls/rgw_gc/cls_rgw_gc_const.h"
15 #include "cls/queue/cls_queue_src.h"
16
17 #include "common/ceph_context.h"
18 #include "global/global_context.h"
19
// Names picked up by the dout logging macros (presumably those from the
// common dout header -- TODO confirm).
20 #define dout_context g_ceph_context
21 #define dout_subsys ceph_subsys_rgw
22
// Fallback entry count: used by the list method when the request's `max` is 0
// and by the remove method when the request's `num_entries` is 0.
23 #define GC_LIST_DEFAULT_MAX 128
24
// Object-class version / name declarations (macros presumably provided by
// objclass/objclass.h -- TODO confirm).
25 CLS_VER(1,0)
26 CLS_NAME(rgw_gc)
27
28 static int cls_rgw_gc_queue_init(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
29 {
30 auto in_iter = in->cbegin();
31
32 cls_rgw_gc_queue_init_op op;
33 try {
34 decode(op, in_iter);
35 } catch (buffer::error& err) {
36 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_init: failed to decode entry\n");
37 return -EINVAL;
38 }
39
40 cls_rgw_gc_urgent_data urgent_data;
41 urgent_data.num_urgent_data_entries = op.num_deferred_entries;
42
43 cls_queue_init_op init_op;
44
45 CLS_LOG(10, "INFO: cls_rgw_gc_queue_init: queue size is %lu\n", op.size);
46
47 init_op.queue_size = op.size;
48 init_op.max_urgent_data_size = g_ceph_context->_conf->rgw_gc_max_deferred_entries_size;
49 encode(urgent_data, init_op.bl_urgent_data);
50
51 return queue_init(hctx, init_op);
52 }
53
54 static int cls_rgw_gc_queue_enqueue(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
55 {
56 auto in_iter = in->cbegin();
57 cls_rgw_gc_set_entry_op op;
58 try {
59 decode(op, in_iter);
60 } catch (buffer::error& err) {
61 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_enqueue: failed to decode entry\n");
62 return -EINVAL;
63 }
64
65 op.info.time = ceph::real_clock::now();
66 op.info.time += make_timespan(op.expiration_secs);
67
68 //get head
69 cls_queue_head head;
70 int ret = queue_read_head(hctx, head);
71 if (ret < 0) {
72 return ret;
73 }
74
75 cls_queue_enqueue_op enqueue_op;
76 bufferlist bl_data;
77 encode(op.info, bl_data);
78 enqueue_op.bl_data_vec.emplace_back(bl_data);
79
80 CLS_LOG(20, "INFO: cls_rgw_gc_queue_enqueue: Data size is: %u \n", bl_data.length());
81
82 ret = queue_enqueue(hctx, enqueue_op, head);
83 if (ret < 0) {
84 return ret;
85 }
86
87 //Write back head
88 return queue_write_head(hctx, head);
89 }
90
91 static int cls_rgw_gc_queue_list_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
92 {
93 auto in_iter = in->cbegin();
94 cls_rgw_gc_list_op op;
95 try {
96 decode(op, in_iter);
97 } catch (buffer::error& err) {
98 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode input\n");
99 return -EINVAL;
100 }
101
102 cls_queue_head head;
103 auto ret = queue_read_head(hctx, head);
104 if (ret < 0) {
105 return ret;
106 }
107
108 cls_rgw_gc_urgent_data urgent_data;
109 if (head.bl_urgent_data.length() > 0) {
110 auto iter_urgent_data = head.bl_urgent_data.cbegin();
111 try {
112 decode(urgent_data, iter_urgent_data);
113 } catch (buffer::error& err) {
114 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode urgent data\n");
115 return -EINVAL;
116 }
117 }
118
119 cls_queue_list_op list_op;
120 if (! op.max) {
121 op.max = GC_LIST_DEFAULT_MAX;
122 }
123
124 list_op.max = op.max;
125 list_op.start_marker = op.marker;
126
127 cls_rgw_gc_list_ret list_ret;
128 uint32_t num_entries = 0; //Entries excluding the deferred ones
129 bool is_truncated = true;
130 string next_marker;
131 do {
132 cls_queue_list_ret op_ret;
133 int ret = queue_list_entries(hctx, list_op, op_ret, head);
134 if (ret < 0) {
135 CLS_LOG(5, "ERROR: queue_list_entries(): returned error %d\n", ret);
136 return ret;
137 }
138 is_truncated = op_ret.is_truncated;
139 next_marker = op_ret.next_marker;
140
141 if (op_ret.entries.size()) {
142 for (auto it : op_ret.entries) {
143 cls_rgw_gc_obj_info info;
144 try {
145 decode(info, it.data);
146 } catch (buffer::error& err) {
147 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode gc info\n");
148 return -EINVAL;
149 }
150 bool found = false;
151 //Check for info tag in urgent data map
152 auto iter = urgent_data.urgent_data_map.find(info.tag);
153 if (iter != urgent_data.urgent_data_map.end()) {
154 found = true;
155 if (iter->second > info.time) {
156 CLS_LOG(10, "INFO: cls_rgw_gc_queue_list_entries(): tag found in urgent data: %s\n", info.tag.c_str());
157 continue;
158 }
159 }
160 //Search in xattrs
161 if (! found && urgent_data.num_xattr_urgent_entries > 0) {
162 bufferlist bl_xattrs;
163 int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
164 if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
165 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
166 return ret;
167 }
168 if (ret != -ENOENT && ret != -ENODATA) {
169 std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
170 auto iter = bl_xattrs.cbegin();
171 try {
172 decode(xattr_urgent_data_map, iter);
173 } catch (buffer::error& err) {
174 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_list_entries(): failed to decode xattrs urgent data map\n");
175 return -EINVAL;
176 } //end - catch
177 auto xattr_iter = xattr_urgent_data_map.find(info.tag);
178 if (xattr_iter != xattr_urgent_data_map.end()) {
179 if (xattr_iter->second > info.time) {
180 CLS_LOG(1, "INFO: cls_rgw_gc_queue_list_entries(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
181 continue;
182 }
183 }
184 } // end - ret != ENOENT && ENODATA
185 } // end - if not found
186 if (op.expired_only) {
187 real_time now = ceph::real_clock::now();
188 if (info.time <= now) {
189 list_ret.entries.emplace_back(info);
190 }
191 //Can break out here if info.time > now, since all subsequent entries won't have expired
192 } else {
193 list_ret.entries.emplace_back(info);
194 }
195 num_entries++;
196 }
197 CLS_LOG(10, "INFO: cls_rgw_gc_queue_list_entries(): num_entries: %u and op.max: %u\n", num_entries, op.max);
198 if (num_entries < op.max) {
199 list_op.max = (op.max - num_entries);
200 list_op.start_marker = op_ret.next_marker;
201 out->clear();
202 } else {
203 //We've reached the max number of entries needed
204 break;
205 }
206 } else {
207 //We dont have data to process
208 break;
209 }
210 } while(is_truncated);
211
212 list_ret.truncated = is_truncated;
213 if (list_ret.truncated) {
214 list_ret.next_marker = next_marker;
215 }
216 out->clear();
217 encode(list_ret, *out);
218 return 0;
219 }
220
221 static int cls_rgw_gc_queue_remove_entries(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
222 {
223 auto in_iter = in->cbegin();
224
225 cls_rgw_gc_queue_remove_entries_op op;
226 try {
227 decode(op, in_iter);
228 } catch (buffer::error& err) {
229 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode input\n");
230 return -EINVAL;
231 }
232
233 cls_queue_head head;
234 auto ret = queue_read_head(hctx, head);
235 if (ret < 0) {
236 return ret;
237 }
238
239 cls_rgw_gc_urgent_data urgent_data;
240 if (head.bl_urgent_data.length() > 0) {
241 auto iter_urgent_data = head.bl_urgent_data.cbegin();
242 try {
243 decode(urgent_data, iter_urgent_data);
244 } catch (buffer::error& err) {
245 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode urgent data\n");
246 return -EINVAL;
247 }
248 }
249
250 // List entries and calculate total number of entries (including invalid entries)
251 if (! op.num_entries) {
252 op.num_entries = GC_LIST_DEFAULT_MAX;
253 }
254 cls_queue_list_op list_op;
255 list_op.max = op.num_entries + 1; // +1 to get the offset of last + 1 entry
256 bool is_truncated = true;
257 uint32_t total_num_entries = 0, num_entries = 0;
258 string end_marker;
259 do {
260 cls_queue_list_ret op_ret;
261 int ret = queue_list_entries(hctx, list_op, op_ret, head);
262 if (ret < 0) {
263 CLS_LOG(5, "ERROR: queue_list_entries(): returned error %d\n", ret);
264 return ret;
265 }
266
267 is_truncated = op_ret.is_truncated;
268 unsigned int index = 0;
269 // If data is not empty
270 if (op_ret.entries.size()) {
271 for (auto it : op_ret.entries) {
272 cls_rgw_gc_obj_info info;
273 try {
274 decode(info, it.data);
275 } catch (buffer::error& err) {
276 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode gc info\n");
277 return -EINVAL;
278 }
279 CLS_LOG(20, "INFO: cls_rgw_gc_queue_remove_entries(): entry: %s\n", info.tag.c_str());
280 total_num_entries++;
281 index++;
282 bool found = false;
283 //Search for tag in urgent data map
284 auto iter = urgent_data.urgent_data_map.find(info.tag);
285 if (iter != urgent_data.urgent_data_map.end()) {
286 found = true;
287 if (iter->second > info.time) {
288 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): tag found in urgent data: %s\n", info.tag.c_str());
289 continue;
290 } else if (iter->second == info.time) {
291 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): erasing tag from urgent data: %s\n", info.tag.c_str());
292 urgent_data.urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later from queue
293 urgent_data.num_head_urgent_entries -= 1;
294 }
295 }//end-if map end
296 if (! found && urgent_data.num_xattr_urgent_entries > 0) {
297 //Search in xattrs
298 bufferlist bl_xattrs;
299 int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
300 if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
301 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
302 return ret;
303 }
304 if (ret != -ENOENT && ret != -ENODATA) {
305 std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
306 auto iter = bl_xattrs.cbegin();
307 try {
308 decode(xattr_urgent_data_map, iter);
309 } catch (buffer::error& err) {
310 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode xattrs urgent data map\n");
311 return -EINVAL;
312 } //end - catch
313 auto xattr_iter = xattr_urgent_data_map.find(info.tag);
314 if (xattr_iter != xattr_urgent_data_map.end()) {
315 if (xattr_iter->second > info.time) {
316 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): tag found in xattrs urgent data map: %s\n", info.tag.c_str());
317 continue;
318 } else if (xattr_iter->second == info.time) {
319 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): erasing tag from xattrs urgent data: %s\n", info.tag.c_str());
320 xattr_urgent_data_map.erase(info.tag); //erase entry from map, as it will be removed later
321 urgent_data.num_xattr_urgent_entries -= 1;
322 }
323 }
324 } // end - ret != ENOENT && ENODATA
325 }// search in xattrs
326 num_entries++;
327 }//end-for
328
329 if (num_entries < (op.num_entries + 1)) {
330 if (! op_ret.is_truncated) {
331 end_marker = op_ret.next_marker;
332 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): not truncated and end offset is %s\n", end_marker.c_str());
333 break;
334 } else {
335 list_op.max = ((op.num_entries + 1) - num_entries);
336 list_op.start_marker = op_ret.next_marker;
337 out->clear();
338 }
339 } else {
340 end_marker = op_ret.entries[index - 1].marker;
341 CLS_LOG(1, "INFO: cls_rgw_gc_queue_remove_entries(): index is %u and end_offset is: %s\n", index, end_marker.c_str());
342 break;
343 }
344 } //end-if
345 else {
346 break;
347 }
348 } while(is_truncated);
349
350 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): Total number of entries to remove: %d\n", total_num_entries);
351 CLS_LOG(10, "INFO: cls_rgw_gc_queue_remove_entries(): End offset is %s\n", end_marker.c_str());
352
353 if (! end_marker.empty()) {
354 cls_queue_remove_op rem_op;
355 rem_op.end_marker = end_marker;
356 int ret = queue_remove_entries(hctx, rem_op, head);
357 if (ret < 0) {
358 CLS_LOG(5, "ERROR: queue_remove_entries(): returned error %d\n", ret);
359 return ret;
360 }
361 }
362
363 //Update urgent data map
364 head.bl_urgent_data.clear();
365 encode(urgent_data, head.bl_urgent_data);
366 CLS_LOG(5, "INFO: cls_rgw_gc_queue_remove_entries(): Urgent data size is %u\n", head.bl_urgent_data.length());
367
368 return queue_write_head(hctx, head);
369 }
370
371 static int cls_rgw_gc_queue_update_entry(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
372 {
373 int ret = 0;
374 auto in_iter = in->cbegin();
375
376 cls_rgw_gc_queue_defer_entry_op op;
377 try {
378 decode(op, in_iter);
379 } catch (buffer::error& err) {
380 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode input\n");
381 return -EINVAL;
382 }
383
384 op.info.time = ceph::real_clock::now();
385 op.info.time += make_timespan(op.expiration_secs);
386
387 // Read head
388 cls_queue_head head;
389 ret = queue_read_head(hctx, head);
390 if (ret < 0) {
391 return ret;
392 }
393
394 auto bl_iter = head.bl_urgent_data.cbegin();
395 cls_rgw_gc_urgent_data urgent_data;
396 try {
397 decode(urgent_data, bl_iter);
398 } catch (buffer::error& err) {
399 CLS_LOG(5, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode urgent data\n");
400 return -EINVAL;
401 }
402
403 //has_urgent_data signifies whether urgent data in queue has changed
404 bool has_urgent_data = false, tag_found = false;
405 //search in unordered map in head
406 auto it = urgent_data.urgent_data_map.find(op.info.tag);
407 if (it != urgent_data.urgent_data_map.end()) {
408 it->second = op.info.time;
409 tag_found = true;
410 has_urgent_data = true;
411 } else { //search in xattrs
412 bufferlist bl_xattrs;
413 int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
414 if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
415 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
416 return ret;
417 }
418 if (ret != -ENOENT && ret != -ENODATA) {
419 std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
420 auto iter = bl_xattrs.cbegin();
421 try {
422 decode(xattr_urgent_data_map, iter);
423 } catch (buffer::error& err) {
424 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_update_entry(): failed to decode xattrs urgent data map\n");
425 return -EINVAL;
426 } //end - catch
427 auto xattr_iter = xattr_urgent_data_map.find(op.info.tag);
428 if (xattr_iter != xattr_urgent_data_map.end()) {
429 it->second = op.info.time;
430 tag_found = true;
431 //write the updated map back
432 bufferlist bl_map;
433 encode(xattr_urgent_data_map, bl_map);
434 ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
435 CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
436 if (ret < 0) {
437 CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
438 return ret;
439 }
440 }
441 }// end ret != ENOENT ...
442 }
443
444 if (! tag_found) {
445 //try inserting in queue head
446 urgent_data.urgent_data_map.insert({op.info.tag, op.info.time});
447 urgent_data.num_head_urgent_entries += 1;
448 has_urgent_data = true;
449
450 bufferlist bl_urgent_data;
451 encode(urgent_data, bl_urgent_data);
452 //insert as xattrs
453 if (bl_urgent_data.length() > head.max_urgent_data_size) {
454 //remove inserted entry from urgent data
455 urgent_data.urgent_data_map.erase(op.info.tag);
456 urgent_data.num_head_urgent_entries -= 1;
457 has_urgent_data = false;
458
459 bufferlist bl_xattrs;
460 int ret = cls_cxx_getxattr(hctx, "cls_queue_urgent_data", &bl_xattrs);
461 if (ret < 0 && (ret != -ENOENT && ret != -ENODATA)) {
462 CLS_LOG(0, "ERROR: %s(): cls_cxx_getxattrs() returned %d", __func__, ret);
463 return ret;
464 }
465 std::unordered_map<string,ceph::real_time> xattr_urgent_data_map;
466 if (ret != -ENOENT && ret != -ENODATA) {
467 auto iter = bl_xattrs.cbegin();
468 try {
469 decode(xattr_urgent_data_map, iter);
470 } catch (buffer::error& err) {
471 CLS_LOG(1, "ERROR: cls_rgw_gc_queue_remove_entries(): failed to decode xattrs urgent data map\n");
472 return -EINVAL;
473 } //end - catch
474 }
475 xattr_urgent_data_map.insert({op.info.tag, op.info.time});
476 urgent_data.num_xattr_urgent_entries += 1;
477 has_urgent_data = true;
478 bufferlist bl_map;
479 encode(xattr_urgent_data_map, bl_map);
480 ret = cls_cxx_setxattr(hctx, "cls_queue_urgent_data", &bl_map);
481 CLS_LOG(20, "%s(): setting attr: %s", __func__, "cls_queue_urgent_data");
482 if (ret < 0) {
483 CLS_LOG(0, "ERROR: %s(): cls_cxx_setxattr (attr=%s) returned %d", __func__, "cls_queue_urgent_data", ret);
484 return ret;
485 }
486 }
487 }
488
489 if ((urgent_data.num_head_urgent_entries + urgent_data.num_xattr_urgent_entries) > urgent_data.num_urgent_data_entries) {
490 CLS_LOG(20, "Total num entries %u", urgent_data.num_urgent_data_entries);
491 CLS_LOG(20, "Num xattr entries %u", urgent_data.num_xattr_urgent_entries);
492 CLS_LOG(20, "Num head entries %u", urgent_data.num_head_urgent_entries);
493 CLS_LOG(0, "ERROR: Number of urgent data entries exceeded that requested by user, returning no space!");
494 return -ENOSPC;
495 }
496
497 cls_queue_enqueue_op enqueue_op;
498 bufferlist bl_data;
499 encode(op.info, bl_data);
500 enqueue_op.bl_data_vec.emplace_back(bl_data);
501 CLS_LOG(10, "INFO: cls_gc_update_entry: Data size is: %u \n", bl_data.length());
502
503 ret = queue_enqueue(hctx, enqueue_op, head);
504 if (ret < 0) {
505 return ret;
506 }
507
508 if (has_urgent_data) {
509 head.bl_urgent_data.clear();
510 encode(urgent_data, head.bl_urgent_data);
511 }
512
513 return queue_write_head(hctx, head);
514 }
515
516 CLS_INIT(rgw_gc)
517 {
518 CLS_LOG(1, "Loaded rgw gc class!");
519
520 cls_handle_t h_class;
521 cls_method_handle_t h_rgw_gc_queue_init;
522 cls_method_handle_t h_rgw_gc_queue_enqueue;
523 cls_method_handle_t h_rgw_gc_queue_list_entries;
524 cls_method_handle_t h_rgw_gc_queue_remove_entries;
525 cls_method_handle_t h_rgw_gc_queue_update_entry;
526
527 cls_register(RGW_GC_CLASS, &h_class);
528
529 /* gc */
530 cls_register_cxx_method(h_class, RGW_GC_QUEUE_INIT, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_init, &h_rgw_gc_queue_init);
531 cls_register_cxx_method(h_class, RGW_GC_QUEUE_ENQUEUE, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_enqueue, &h_rgw_gc_queue_enqueue);
532 cls_register_cxx_method(h_class, RGW_GC_QUEUE_LIST_ENTRIES, CLS_METHOD_RD, cls_rgw_gc_queue_list_entries, &h_rgw_gc_queue_list_entries);
533 cls_register_cxx_method(h_class, RGW_GC_QUEUE_REMOVE_ENTRIES, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_remove_entries, &h_rgw_gc_queue_remove_entries);
534 cls_register_cxx_method(h_class, RGW_GC_QUEUE_UPDATE_ENTRY, CLS_METHOD_RD | CLS_METHOD_WR, cls_rgw_gc_queue_update_entry, &h_rgw_gc_queue_update_entry);
535
536 return;
537 }
538