1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/types.h"
6 #include "objclass/objclass.h"
7 #include "cls/queue/cls_queue_types.h"
8 #include "cls/queue/cls_queue_ops.h"
9 #include "cls/queue/cls_queue_const.h"
10 #include "cls/queue/cls_queue_src.h"
13 using ceph::bufferlist
;
17 int queue_write_head(cls_method_context_t hctx
, cls_queue_head
& head
)
20 uint16_t entry_start
= QUEUE_HEAD_START
;
21 encode(entry_start
, bl
);
24 encode(head
, bl_head
);
26 uint64_t encoded_len
= bl_head
.length();
27 encode(encoded_len
, bl
);
29 bl
.claim_append(bl_head
);
31 if (bl
.length() > head
.max_head_size
) {
32 CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u \n", bl
.length(), head
.bl_urgent_data
.length());
36 int ret
= cls_cxx_write2(hctx
, 0, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
38 CLS_LOG(5, "ERROR: queue_write_head: failed to write head");
44 int queue_read_head(cls_method_context_t hctx
, cls_queue_head
& head
)
46 uint64_t chunk_size
= 1024, start_offset
= 0;
49 const auto ret
= cls_cxx_read(hctx
, start_offset
, chunk_size
, &bl_head
);
51 CLS_LOG(5, "ERROR: queue_read_head: failed to read head");
55 CLS_LOG(20, "INFO: queue_read_head: empty head, not initialized yet");
59 //Process the chunk of data read
60 auto it
= bl_head
.cbegin();
62 uint16_t queue_head_start
;
64 decode(queue_head_start
, it
);
65 } catch (const ceph::buffer::error
& err
) {
66 CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s", err
.what());
69 if (queue_head_start
!= QUEUE_HEAD_START
) {
70 CLS_LOG(0, "ERROR: queue_read_head: invalid queue start");
76 decode(encoded_len
, it
);
77 } catch (const ceph::buffer::error
& err
) {
78 CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s", err
.what());
82 if (encoded_len
> (chunk_size
- QUEUE_ENTRY_OVERHEAD
)) {
83 start_offset
= chunk_size
;
84 chunk_size
= (encoded_len
- (chunk_size
- QUEUE_ENTRY_OVERHEAD
));
85 bufferlist bl_remaining_head
;
86 const auto ret
= cls_cxx_read2(hctx
, start_offset
, chunk_size
, &bl_remaining_head
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
88 CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head");
91 bl_head
.claim_append(bl_remaining_head
);
96 } catch (const ceph::buffer::error
& err
) {
97 CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s", err
.what());
104 int queue_init(cls_method_context_t hctx
, const cls_queue_init_op
& op
)
106 //get head and its size
108 int ret
= queue_read_head(hctx
, head
);
110 //head is already initialized
115 if (ret
< 0 && ret
!= -EINVAL
) {
119 if (op
.bl_urgent_data
.length() > 0) {
120 head
.bl_urgent_data
= op
.bl_urgent_data
;
123 head
.max_head_size
= QUEUE_HEAD_SIZE_1K
+ op
.max_urgent_data_size
;
124 head
.queue_size
= op
.queue_size
+ head
.max_head_size
;
125 head
.max_urgent_data_size
= op
.max_urgent_data_size
;
126 head
.tail
.gen
= head
.front
.gen
= 0;
127 head
.tail
.offset
= head
.front
.offset
= head
.max_head_size
;
129 CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head
.queue_size
);
130 CLS_LOG(20, "INFO: init_queue_op head size %lu", head
.max_head_size
);
131 CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head
.front
.to_str().c_str());
132 CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head
.max_urgent_data_size
);
134 return queue_write_head(hctx
, head
);
137 int queue_get_capacity(cls_method_context_t hctx
, cls_queue_get_capacity_ret
& op_ret
)
141 int ret
= queue_read_head(hctx
, head
);
146 op_ret
.queue_capacity
= head
.queue_size
- head
.max_head_size
;
148 CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu", op_ret
.queue_capacity
);
Since the enqueue of a new bufferlist happens in the free space of the queue, the queue can be in one of the following states:
159 +-------------+--------------------------------------------------------------------+
160 | object head | XXXXXXXXXXXXXXXXXXXXXXXXXXX |
163 +---+------+--+----------------|-------------------------|-------------------------+
165 | +-------------------|-------------------------+
166 +--------------------------+
168 (2) continuous free space
169 +-------------+--------------------------------------------------------------------+
170 | object head |XXXXXXXXXXXXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXX|
173 +---+------+--+----------------|-------------------------|-------------------------+
175 | +-------------------+ |
176 +----------------------------------------------------+
179 int queue_enqueue(cls_method_context_t hctx
, cls_queue_enqueue_op
& op
, cls_queue_head
& head
)
181 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.tail
.gen
== head
.front
.gen
+ 1)) {
182 CLS_LOG(0, "ERROR: No space left in queue");
186 for (auto& bl_data
: op
.bl_data_vec
) {
188 uint16_t entry_start
= QUEUE_ENTRY_START
;
189 encode(entry_start
, bl
);
190 uint64_t data_size
= bl_data
.length();
191 encode(data_size
, bl
);
192 bl
.claim_append(bl_data
);
194 CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu", bl
.length(), data_size
);
196 if (head
.tail
.offset
>= head
.front
.offset
) {
197 // check if data can fit in the remaining space in queue
198 if ((head
.tail
.offset
+ bl
.length()) <= head
.queue_size
) {
199 CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head
.tail
.to_str().c_str(), bl
.length());
200 //write data size and data at tail offset
201 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
205 head
.tail
.offset
+= bl
.length();
207 uint64_t free_space_available
= (head
.queue_size
- head
.tail
.offset
) + (head
.front
.offset
- head
.max_head_size
);
208 //Split data if there is free space available
209 if (bl
.length() <= free_space_available
) {
210 uint64_t size_before_wrap
= head
.queue_size
- head
.tail
.offset
;
211 bufferlist bl_data_before_wrap
;
212 bl
.splice(0, size_before_wrap
, &bl_data_before_wrap
);
213 //write spliced (data size and data) at tail offset
214 CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u", head
.tail
.to_str().c_str(), bl_data_before_wrap
.length());
215 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl_data_before_wrap
.length(), &bl_data_before_wrap
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
219 head
.tail
.offset
= head
.max_head_size
;
221 //write remaining data at tail offset after wrapping around
222 CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u", head
.tail
.to_str().c_str(), bl
.length());
223 ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
227 head
.tail
.offset
+= bl
.length();
229 CLS_LOG(0, "ERROR: No space left in queue\n");
230 // return queue full error
234 } else if (head
.front
.offset
> head
.tail
.offset
) {
235 if ((head
.tail
.offset
+ bl
.length()) <= head
.front
.offset
) {
236 CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head
.tail
.to_str().c_str(), bl
.length());
237 //write data size and data at tail offset
238 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
242 head
.tail
.offset
+= bl
.length();
244 CLS_LOG(0, "ERROR: No space left in queue");
245 // return queue full error
250 if (head
.tail
.offset
== head
.queue_size
) {
251 head
.tail
.offset
= head
.max_head_size
;
254 CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s", head
.tail
.to_str().c_str());
260 int queue_list_entries(cls_method_context_t hctx
, const cls_queue_list_op
& op
, cls_queue_list_ret
& op_ret
, cls_queue_head
& head
)
262 // If queue is empty, return from here
263 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.front
.gen
== head
.tail
.gen
)) {
264 CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s", head
.front
.to_str().c_str());
265 op_ret
.next_marker
= head
.front
.to_str();
266 op_ret
.is_truncated
= false;
270 cls_queue_marker start_marker
;
271 start_marker
.from_str(op
.start_marker
.c_str());
272 cls_queue_marker next_marker
= {0, 0};
274 uint64_t start_offset
= 0, gen
= 0;
275 if (start_marker
.offset
== 0) {
276 start_offset
= head
.front
.offset
;
277 gen
= head
.front
.gen
;
279 start_offset
= start_marker
.offset
;
280 gen
= start_marker
.gen
;
283 op_ret
.is_truncated
= true;
284 uint64_t chunk_size
= 1024;
285 uint64_t contiguous_data_size
= 0, size_to_read
= 0;
286 bool wrap_around
= false;
288 //Calculate length of contiguous data to be read depending on front, tail and start offset
289 if (head
.tail
.offset
> head
.front
.offset
) {
290 contiguous_data_size
= head
.tail
.offset
- start_offset
;
291 } else if (head
.front
.offset
>= head
.tail
.offset
) {
292 if (start_offset
>= head
.front
.offset
) {
293 contiguous_data_size
= head
.queue_size
- start_offset
;
295 } else if (start_offset
<= head
.tail
.offset
) {
296 contiguous_data_size
= head
.tail
.offset
- start_offset
;
300 CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s", head
.front
.to_str().c_str(), head
.tail
.to_str().c_str());
302 bool offset_populated
= false, entry_start_processed
= false;
303 uint64_t data_size
= 0, num_ops
= 0;
304 uint16_t entry_start
= 0;
309 CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset
);
312 //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
313 if (contiguous_data_size
> chunk_size
) {
314 size_to_read
= chunk_size
;
316 size_to_read
= contiguous_data_size
;
318 CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu", size_to_read
);
319 if (size_to_read
== 0) {
320 next_marker
= head
.tail
;
321 op_ret
.is_truncated
= false;
322 CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
326 auto ret
= cls_cxx_read(hctx
, start_offset
, size_to_read
, &bl_chunk
);
331 //If there is leftover data from previous iteration, append new data to leftover data
332 uint64_t entry_start_offset
= start_offset
- bl
.length();
333 CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu", entry_start_offset
);
334 bl
.claim_append(bl_chunk
);
335 bl_chunk
= std::move(bl
);
337 CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u", bl_chunk
.length());
339 //Process the chunk of data read
341 auto it
= bl_chunk
.cbegin();
342 uint64_t size_to_process
= bl_chunk
.length();
344 CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index
, size_to_process
);
345 cls_queue_entry entry
;
346 ceph_assert(it
.get_off() == index
);
347 //Use the last marker saved in previous iteration as the marker for this entry
348 if (offset_populated
) {
349 entry
.marker
= last_marker
;
351 //Populate offset if not done in previous iteration
352 if (! offset_populated
) {
353 cls_queue_marker marker
= {entry_start_offset
+ index
, gen
};
354 CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker
.to_str().c_str());
355 entry
.marker
= marker
.to_str();
357 // Magic number + Data size - process if not done in previous iteration
358 if (! entry_start_processed
) {
359 if (size_to_process
>= QUEUE_ENTRY_OVERHEAD
) {
360 // Decode magic number at start
362 decode(entry_start
, it
);
363 } catch (const ceph::buffer::error
& err
) {
364 CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s", err
.what());
367 if (entry_start
!= QUEUE_ENTRY_START
) {
368 CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u", entry_start
);
371 index
+= sizeof(uint16_t);
372 size_to_process
-= sizeof(uint16_t);
375 decode(data_size
, it
);
376 } catch (const ceph::buffer::error
& err
) {
377 CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s", err
.what());
381 // Copy unprocessed data to bl
382 bl_chunk
.splice(index
, size_to_process
, &bl
);
383 offset_populated
= true;
384 last_marker
= entry
.marker
;
385 CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!");
388 CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu", data_size
);
389 index
+= sizeof(uint64_t);
390 size_to_process
-= sizeof(uint64_t);
393 if (data_size
<= size_to_process
) {
394 it
.copy(data_size
, entry
.data
);
395 index
+= entry
.data
.length();
396 size_to_process
-= entry
.data
.length();
398 it
.copy(size_to_process
, bl
);
399 offset_populated
= true;
400 entry_start_processed
= true;
401 last_marker
= entry
.marker
;
402 CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!");
405 op_ret
.entries
.emplace_back(entry
);
406 // Resetting some values
407 offset_populated
= false;
408 entry_start_processed
= false;
413 if (num_ops
== op
.max
) {
414 CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!");
417 } while(index
< bl_chunk
.length());
419 CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops
, op
.max
);
421 if (num_ops
== op
.max
) {
422 next_marker
= cls_queue_marker
{(entry_start_offset
+ index
), gen
};
423 CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker
.offset
);
427 //Calculate new start_offset and contiguous data size
428 start_offset
+= size_to_read
;
429 contiguous_data_size
-= size_to_read
;
430 if (contiguous_data_size
== 0) {
432 start_offset
= head
.max_head_size
;
433 contiguous_data_size
= head
.tail
.offset
- head
.max_head_size
;
437 CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!");
438 next_marker
= head
.tail
;
439 op_ret
.is_truncated
= false;
444 } while(num_ops
< op
.max
);
446 //Wrap around next offset if it has reached end of queue
447 if (next_marker
.offset
== head
.queue_size
) {
448 next_marker
.offset
= head
.max_head_size
;
449 next_marker
.gen
+= 1;
451 if ((next_marker
.offset
== head
.tail
.offset
) && (next_marker
.gen
== head
.tail
.gen
)) {
452 op_ret
.is_truncated
= false;
455 CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s", next_marker
.to_str().c_str());
456 op_ret
.next_marker
= next_marker
.to_str();
461 int queue_remove_entries(cls_method_context_t hctx
, const cls_queue_remove_op
& op
, cls_queue_head
& head
)
464 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.front
.gen
== head
.tail
.gen
)) {
468 cls_queue_marker end_marker
;
469 end_marker
.from_str(op
.end_marker
.c_str());
471 CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s", end_marker
.to_str().c_str());
473 //Zero out the entries that have been removed, to reclaim storage space
474 if (end_marker
.offset
> head
.front
.offset
&& end_marker
.gen
== head
.front
.gen
) {
475 uint64_t len
= end_marker
.offset
- head
.front
.offset
;
477 auto ret
= cls_cxx_write_zero(hctx
, head
.front
.offset
, len
);
479 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
480 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head
.front
.to_str().c_str());
484 } else if ((head
.front
.offset
>= end_marker
.offset
) && (end_marker
.gen
== head
.front
.gen
+ 1)) { //start offset > end offset
485 uint64_t len
= head
.queue_size
- head
.front
.offset
;
487 auto ret
= cls_cxx_write_zero(hctx
, head
.front
.offset
, len
);
489 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
490 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head
.front
.to_str().c_str());
494 len
= end_marker
.offset
- head
.max_head_size
;
496 auto ret
= cls_cxx_write_zero(hctx
, head
.max_head_size
, len
);
498 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries");
499 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu", head
.max_head_size
);
503 } else if ((head
.front
.offset
== end_marker
.offset
) && (head
.front
.gen
== end_marker
.gen
)) {
506 CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu", end_marker
.to_str().c_str(), end_marker
.gen
);
510 head
.front
= end_marker
;
512 // Check if it is the end, then wrap around
513 if (head
.front
.offset
== head
.queue_size
) {
514 head
.front
.offset
= head
.max_head_size
;
518 CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s", head
.front
.to_str().c_str(), head
.tail
.to_str().c_str());