]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/queue/cls_queue_src.cc
bd973352d425d88609d4e24c6a2ebb67af3f433e
[ceph.git] / ceph / src / cls / queue / cls_queue_src.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/types.h"
5
6 #include "objclass/objclass.h"
7 #include "cls/queue/cls_queue_types.h"
8 #include "cls/queue/cls_queue_ops.h"
9 #include "cls/queue/cls_queue_const.h"
10 #include "cls/queue/cls_queue_src.h"
11
12 using std::string;
13 using ceph::bufferlist;
14 using ceph::decode;
15 using ceph::encode;
16
17 int queue_write_head(cls_method_context_t hctx, cls_queue_head& head)
18 {
19 bufferlist bl;
20 uint16_t entry_start = QUEUE_HEAD_START;
21 encode(entry_start, bl);
22
23 bufferlist bl_head;
24 encode(head, bl_head);
25
26 uint64_t encoded_len = bl_head.length();
27 encode(encoded_len, bl);
28
29 bl.claim_append(bl_head);
30
31 if (bl.length() > head.max_head_size) {
32 CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u \n", bl.length(), head.bl_urgent_data.length());
33 return -EINVAL;
34 }
35
36 int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
37 if (ret < 0) {
38 CLS_LOG(5, "ERROR: queue_write_head: failed to write head");
39 return ret;
40 }
41 return 0;
42 }
43
/**
 * Read and decode the queue head from offset 0 of the object.
 *
 * Layout read back (mirrors queue_write_head): QUEUE_HEAD_START marker
 * (uint16), encoded head length (uint64), encoded cls_queue_head.
 *
 * Returns 0 on success; -EINVAL if the object is empty (uninitialized
 * queue), the start marker is wrong, or decoding fails; otherwise the
 * negative error from the underlying read.
 */
int queue_read_head(cls_method_context_t hctx, cls_queue_head& head)
{
  // Read a fixed 1K first chunk; most heads fit entirely in it, so a
  // second read is only needed for oversized (urgent-data-carrying) heads.
  uint64_t chunk_size = 1024, start_offset = 0;

  bufferlist bl_head;
  const auto ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
  if (ret < 0) {
    CLS_LOG(5, "ERROR: queue_read_head: failed to read head");
    return ret;
  }
  // Zero bytes read means the object exists but was never initialized.
  if (ret == 0) {
    CLS_LOG(20, "INFO: queue_read_head: empty head, not initialized yet");
    return -EINVAL;
  }

  //Process the chunk of data read
  auto it = bl_head.cbegin();
  // Queue head start: magic marker guarding against decoding garbage.
  uint16_t queue_head_start;
  try {
    decode(queue_head_start, it);
  } catch (const ceph::buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s", err.what());
    return -EINVAL;
  }
  if (queue_head_start != QUEUE_HEAD_START) {
    CLS_LOG(0, "ERROR: queue_read_head: invalid queue start");
    return -EINVAL;
  }

  // Length of the encoded cls_queue_head that follows.
  uint64_t encoded_len;
  try {
    decode(encoded_len, it);
  } catch (const ceph::buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s", err.what());
    return -EINVAL;
  }

  // If the encoded head extends past the first chunk (minus the marker +
  // length prefix, QUEUE_ENTRY_OVERHEAD bytes), fetch the remainder.
  if (encoded_len > (chunk_size - QUEUE_ENTRY_OVERHEAD)) {
    start_offset = chunk_size;
    chunk_size = (encoded_len - (chunk_size - QUEUE_ENTRY_OVERHEAD));
    bufferlist bl_remaining_head;
    const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
    if (ret < 0) {
      CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head");
      return ret;
    }
    // NOTE(review): appends to bl_head while `it` is live; this relies on
    // bufferlist iterators tolerating append-while-iterating so the decode
    // below continues into the newly appended bytes — confirm against
    // ceph::buffer::list::iterator semantics.
    bl_head.claim_append(bl_remaining_head);
  }

  // Decode the head itself, continuing from where the prefix left off.
  try {
    decode(head, it);
  } catch (const ceph::buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s", err.what());
    return -EINVAL;
  }

  return 0;
}
103
104 int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op)
105 {
106 //get head and its size
107 cls_queue_head head;
108 int ret = queue_read_head(hctx, head);
109
110 //head is already initialized
111 if (ret == 0) {
112 return -EEXIST;
113 }
114
115 if (ret < 0 && ret != -EINVAL) {
116 return ret;
117 }
118
119 if (op.bl_urgent_data.length() > 0) {
120 head.bl_urgent_data = op.bl_urgent_data;
121 }
122
123 head.max_head_size = QUEUE_HEAD_SIZE_1K + op.max_urgent_data_size;
124 head.queue_size = op.queue_size + head.max_head_size;
125 head.max_urgent_data_size = op.max_urgent_data_size;
126 head.tail.gen = head.front.gen = 0;
127 head.tail.offset = head.front.offset = head.max_head_size;
128
129 CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size);
130 CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size);
131 CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
132 CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);
133
134 return queue_write_head(hctx, head);
135 }
136
137 int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret)
138 {
139 //get head
140 cls_queue_head head;
141 int ret = queue_read_head(hctx, head);
142 if (ret < 0) {
143 return ret;
144 }
145
146 op_ret.queue_capacity = head.queue_size - head.max_head_size;
147
148 CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu", op_ret.queue_capacity);
149
150 return 0;
151 }
152
153
154 /*
155 enqueue of new bufferlist happens in the free spaces of the queue, the queue can be in
156 one of two states:
157
158 (1) split free space
159 +-------------+--------------------------------------------------------------------+
160 | object head | XXXXXXXXXXXXXXXXXXXXXXXXXXX |
161 | | ^ ^ |
162 | front tail | | | |
163 +---+------+--+----------------|-------------------------|-------------------------+
164 | | | |
165 | +-------------------|-------------------------+
166 +--------------------------+
167
168 (2) continuous free space
169 +-------------+--------------------------------------------------------------------+
170 | object head |XXXXXXXXXXXXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXX|
171 | | ^ ^ |
172 | front tail | | | |
173 +---+------+--+----------------|-------------------------|-------------------------+
174 | | | |
175 | +-------------------+ |
176 +----------------------------------------------------+
177 */
178
/**
 * Append each buffer in op.bl_data_vec to the ring buffer as a framed
 * entry: QUEUE_ENTRY_START marker (uint16), data length (uint64), data.
 *
 * The queue region [max_head_size, queue_size) is a circular buffer
 * tracked by head.front / head.tail markers; each marker carries an
 * offset and a generation (gen) that increments on every wrap, which is
 * how "full" (offsets equal, tail one gen ahead) is distinguished from
 * "empty" (offsets and gens equal).
 *
 * Mutates head.tail in memory only — the caller is responsible for
 * persisting the head afterwards. Returns 0 on success, -ENOSPC when an
 * entry does not fit, or a negative write error.
 */
int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
{
  // Full queue: tail has wrapped all the way around onto front.
  if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
    CLS_LOG(0, "ERROR: No space left in queue");
    return -ENOSPC;
  }

  for (auto& bl_data : op.bl_data_vec) {
    // Build the framed entry: marker + size prefix, then the payload
    // (claim_append moves the caller's buffers instead of copying).
    bufferlist bl;
    uint16_t entry_start = QUEUE_ENTRY_START;
    encode(entry_start, bl);
    uint64_t data_size = bl_data.length();
    encode(data_size, bl);
    bl.claim_append(bl_data);

    CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu", bl.length(), data_size);

    if (head.tail.offset >= head.front.offset) {
      // State (1): free space is split — after tail up to queue_size,
      // plus before front down to max_head_size.
      // check if data can fit in the remaining space in queue
      if ((head.tail.offset + bl.length()) <= head.queue_size) {
        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
        //write data size and data at tail offset
        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
        if (ret < 0) {
          return ret;
        }
        head.tail.offset += bl.length();
      } else {
        // Entry does not fit before the end: consider splitting it across
        // the wrap point if the combined free space is large enough.
        uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size);
        //Split data if there is free space available
        if (bl.length() <= free_space_available) {
          uint64_t size_before_wrap = head.queue_size - head.tail.offset;
          bufferlist bl_data_before_wrap;
          // splice() removes the first size_before_wrap bytes from bl,
          // leaving only the post-wrap remainder in bl.
          bl.splice(0, size_before_wrap, &bl_data_before_wrap);
          //write spliced (data size and data) at tail offset
          CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl_data_before_wrap.length());
          auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
          if (ret < 0) {
            return ret;
          }
          // Wrap tail to the start of the data region and bump its gen.
          head.tail.offset = head.max_head_size;
          head.tail.gen += 1;
          //write remaining data at tail offset after wrapping around
          CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl.length());
          ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
          if (ret < 0) {
            return ret;
          }
          head.tail.offset += bl.length();
        } else {
          CLS_LOG(0, "ERROR: No space left in queue\n");
          // return queue full error
          return -ENOSPC;
        }
      }
    } else if (head.front.offset > head.tail.offset) {
      // State (2): tail has already wrapped; free space is the single
      // contiguous gap between tail and front.
      if ((head.tail.offset + bl.length()) <= head.front.offset) {
        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length());
        //write data size and data at tail offset
        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
        if (ret < 0) {
          return ret;
        }
        head.tail.offset += bl.length();
      } else {
        CLS_LOG(0, "ERROR: No space left in queue");
        // return queue full error
        return -ENOSPC;
      }
    }

    // An entry ending exactly at queue_size leaves tail at the boundary;
    // normalize it to the wrapped position with the gen bump.
    if (head.tail.offset == head.queue_size) {
      head.tail.offset = head.max_head_size;
      head.tail.gen += 1;
    }
    CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s", head.tail.to_str().c_str());
  } //end - for

  return 0;
}
259
/**
 * List up to op.max entries starting from op.start_marker (or from
 * head.front when the start marker's offset is 0).
 *
 * The queue data is read in 1K chunks; because an entry may straddle a
 * chunk boundary (or the ring-buffer wrap point), partially decoded bytes
 * are carried over in `bl` between outer-loop iterations, with
 * offset_populated / entry_start_processed recording how far decoding of
 * the straddling entry got.
 *
 * Fills op_ret.entries, op_ret.next_marker (resume point for the next
 * call) and op_ret.is_truncated (false once the tail has been reached).
 * Returns 0 on success, -EINVAL on a corrupt entry, or a negative read
 * error.
 */
int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head)
{
  // If queue is empty, return from here
  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
    CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s", head.front.to_str().c_str());
    op_ret.next_marker = head.front.to_str();
    op_ret.is_truncated = false;
    return 0;
  }

  cls_queue_marker start_marker;
  start_marker.from_str(op.start_marker.c_str());
  cls_queue_marker next_marker = {0, 0};

  // An offset of 0 can never be a real entry position (data starts at
  // max_head_size), so it doubles as "start from the front".
  uint64_t start_offset = 0, gen = 0;
  if (start_marker.offset == 0) {
    start_offset = head.front.offset;
    gen = head.front.gen;
  } else {
    start_offset = start_marker.offset;
    gen = start_marker.gen;
  }

  op_ret.is_truncated = true;
  uint64_t chunk_size = 1024;
  uint64_t contiguous_data_size = 0, size_to_read = 0;
  bool wrap_around = false;

  //Calculate length of contiguous data to be read depending on front, tail and start offset
  if (head.tail.offset > head.front.offset) {
    // Data is one contiguous run ending at tail.
    contiguous_data_size = head.tail.offset - start_offset;
  } else if (head.front.offset >= head.tail.offset) {
    // Data wraps: either we start in the pre-wrap run (read to queue end,
    // then continue from max_head_size) or in the post-wrap run.
    if (start_offset >= head.front.offset) {
      contiguous_data_size = head.queue_size - start_offset;
      wrap_around = true;
    } else if (start_offset <= head.tail.offset) {
      contiguous_data_size = head.tail.offset - start_offset;
    }
  }

  CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());

  // Carry-over state for an entry that straddles chunk reads:
  // offset_populated      - the entry's marker was computed in a prior pass
  // entry_start_processed - its magic/size prefix was already decoded
  // bl                    - leftover undecoded bytes from the prior chunk
  bool offset_populated = false, entry_start_processed = false;
  uint64_t data_size = 0, num_ops = 0;
  uint16_t entry_start = 0;
  bufferlist bl;
  string last_marker;
  do
  {
    CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset);

    bufferlist bl_chunk;
    //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
    if (contiguous_data_size > chunk_size) {
      size_to_read = chunk_size;
    } else {
      size_to_read = contiguous_data_size;
    }
    CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu", size_to_read);
    if (size_to_read == 0) {
      next_marker = head.tail;
      op_ret.is_truncated = false;
      CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
      break;
    }

    auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
    if (ret < 0) {
      return ret;
    }

    //If there is leftover data from previous iteration, append new data to leftover data
    // entry_start_offset is the object offset of byte 0 of the combined
    // buffer, i.e. start_offset shifted back by the leftover length.
    uint64_t entry_start_offset = start_offset - bl.length();
    CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu", entry_start_offset);
    bl.claim_append(bl_chunk);
    bl_chunk = std::move(bl);

    CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u", bl_chunk.length());

    //Process the chunk of data read
    unsigned index = 0;
    auto it = bl_chunk.cbegin();
    uint64_t size_to_process = bl_chunk.length();
    do {
      CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index, size_to_process);
      cls_queue_entry entry;
      ceph_assert(it.get_off() == index);
      //Use the last marker saved in previous iteration as the marker for this entry
      if (offset_populated) {
        entry.marker = last_marker;
      }
      //Populate offset if not done in previous iteration
      if (! offset_populated) {
        cls_queue_marker marker = {entry_start_offset + index, gen};
        CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
        entry.marker = marker.to_str();
      }
      // Magic number + Data size - process if not done in previous iteration
      if (! entry_start_processed ) {
        if (size_to_process >= QUEUE_ENTRY_OVERHEAD) {
          // Decode magic number at start
          try {
            decode(entry_start, it);
          } catch (const ceph::buffer::error& err) {
            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s", err.what());
            return -EINVAL;
          }
          if (entry_start != QUEUE_ENTRY_START) {
            CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u", entry_start);
            return -EINVAL;
          }
          index += sizeof(uint16_t);
          size_to_process -= sizeof(uint16_t);
          // Decode data size
          try {
            decode(data_size, it);
          } catch (const ceph::buffer::error& err) {
            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s", err.what());
            return -EINVAL;
          }
        } else {
          // Not even the fixed-size prefix is available: stash the bytes
          // and the computed marker, then fetch the next chunk.
          // Copy unprocessed data to bl
          bl_chunk.splice(index, size_to_process, &bl);
          offset_populated = true;
          last_marker = entry.marker;
          CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!");
          break;
        }
        CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu", data_size);
        index += sizeof(uint64_t);
        size_to_process -= sizeof(uint64_t);
      }
      // Data
      if (data_size <= size_to_process) {
        it.copy(data_size, entry.data);
        index += entry.data.length();
        size_to_process -= entry.data.length();
      } else {
        // Payload straddles the chunk: save what we have (prefix already
        // consumed, so entry_start_processed too) and read more.
        it.copy(size_to_process, bl);
        offset_populated = true;
        entry_start_processed = true;
        last_marker = entry.marker;
        CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
        break;
      }
      op_ret.entries.emplace_back(entry);
      // Resetting some values
      offset_populated = false;
      entry_start_processed = false;
      data_size = 0;
      entry_start = 0;
      num_ops++;
      last_marker.clear();
      if (num_ops == op.max) {
        CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!");
        break;
      }
    } while(index < bl_chunk.length());

    CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);

    if (num_ops == op.max) {
      // Resume point for the next listing call: first unconsumed byte.
      next_marker = cls_queue_marker{(entry_start_offset + index), gen};
      CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset);
      break;
    }

    //Calculate new start_offset and contiguous data size
    start_offset += size_to_read;
    contiguous_data_size -= size_to_read;
    if (contiguous_data_size == 0) {
      if (wrap_around) {
        // Finished the pre-wrap run; continue from the start of the data
        // region in the next generation.
        start_offset = head.max_head_size;
        contiguous_data_size = head.tail.offset - head.max_head_size;
        gen += 1;
        wrap_around = false;
      } else {
        CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!");
        next_marker = head.tail;
        op_ret.is_truncated = false;
        break;
      }
    }

  } while(num_ops < op.max);

  //Wrap around next offset if it has reached end of queue
  if (next_marker.offset == head.queue_size) {
    next_marker.offset = head.max_head_size;
    next_marker.gen += 1;
  }
  // Landing exactly on the tail means there is nothing more to list.
  if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) {
    op_ret.is_truncated = false;
  }

  CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s", next_marker.to_str().c_str());
  op_ret.next_marker = next_marker.to_str();

  return 0;
}
460
/**
 * Remove (consume) all entries from head.front up to op.end_marker by
 * zeroing their bytes and advancing head.front to the end marker.
 *
 * Three valid marker relationships are handled: end ahead of front in the
 * same generation (one contiguous zero range); end behind/equal to front
 * with generation front.gen + 1 (the removed range wraps, needing two zero
 * ranges); and end equal to front in the same generation (no-op). Any
 * other combination is rejected with -EINVAL.
 *
 * Mutates head.front in memory only — the caller persists the head.
 * Returns 0 on success or a negative write/validation error.
 */
int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head)
{
  //Queue is empty
  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
    return 0;
  }

  cls_queue_marker end_marker;
  end_marker.from_str(op.end_marker.c_str());

  CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s", end_marker.to_str().c_str());

  //Zero out the entries that have been removed, to reclaim storage space
  if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
    // Same generation: a single contiguous range [front, end) is freed.
    uint64_t len = end_marker.offset - head.front.offset;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
        return ret;
      }
    }
  } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset
    // Removed range wraps past the end of the ring: zero [front, queue_size)
    // then [max_head_size, end).
    uint64_t len = head.queue_size - head.front.offset;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str());
        return ret;
      }
    }
    len = end_marker.offset - head.max_head_size;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu", head.max_head_size);
        return ret;
      }
    }
  } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
    //no-op
  } else {
    CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu", end_marker.to_str().c_str(), end_marker.gen);
    return -EINVAL;
  }

  // Consume everything up to the end marker.
  head.front = end_marker;

  // Check if it is the end, then wrap around
  if (head.front.offset == head.queue_size) {
    head.front.offset = head.max_head_size;
    head.front.gen += 1;
  }

  CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s", head.front.to_str().c_str(), head.tail.to_str().c_str());

  return 0;
}