1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
#include "include/types.h"

#include <cerrno>
#include <string>
#include <utility>

#include "objclass/objclass.h"
#include "cls/queue/cls_queue_types.h"
#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
#include "cls/queue/cls_queue_src.h"
14 int queue_write_head(cls_method_context_t hctx
, cls_queue_head
& head
)
17 uint16_t entry_start
= QUEUE_HEAD_START
;
18 encode(entry_start
, bl
);
21 encode(head
, bl_head
);
23 uint64_t encoded_len
= bl_head
.length();
24 encode(encoded_len
, bl
);
26 bl
.claim_append(bl_head
);
28 if (bl
.length() > head
.max_head_size
) {
29 CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u \n", bl
.length(), head
.bl_urgent_data
.length());
33 int ret
= cls_cxx_write2(hctx
, 0, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
35 CLS_LOG(5, "ERROR: queue_write_head: failed to write head\n");
41 int queue_read_head(cls_method_context_t hctx
, cls_queue_head
& head
)
43 uint64_t chunk_size
= 1024, start_offset
= 0;
46 const auto ret
= cls_cxx_read(hctx
, start_offset
, chunk_size
, &bl_head
);
48 CLS_LOG(5, "ERROR: queue_read_head: failed to read head\n");
52 //Process the chunk of data read
53 auto it
= bl_head
.cbegin();
55 uint16_t queue_head_start
;
57 decode(queue_head_start
, it
);
58 } catch (buffer::error
& err
) {
59 CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s \n", err
.what());
62 if (queue_head_start
!= QUEUE_HEAD_START
) {
63 CLS_LOG(0, "ERROR: queue_read_head: invalid queue start\n");
69 decode(encoded_len
, it
);
70 } catch (buffer::error
& err
) {
71 CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s\n", err
.what());
75 constexpr auto decoded_head_size
= sizeof(queue_head_start
) + sizeof(encoded_len
);
76 if (encoded_len
> (chunk_size
- decoded_head_size
)) {
77 start_offset
= chunk_size
;
78 chunk_size
= (encoded_len
- (chunk_size
- decoded_head_size
));
79 bufferlist bl_remaining_head
;
80 const auto ret
= cls_cxx_read2(hctx
, start_offset
, chunk_size
, &bl_remaining_head
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
82 CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head\n");
85 bl_head
.claim_append(bl_remaining_head
);
90 } catch (buffer::error
& err
) {
91 CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s\n", err
.what());
98 int queue_init(cls_method_context_t hctx
, const cls_queue_init_op
& op
)
100 //get head and its size
102 int ret
= queue_read_head(hctx
, head
);
104 //head is already initialized
109 if (ret
< 0 && ret
!= -EINVAL
) {
113 if (op
.bl_urgent_data
.length() > 0) {
114 head
.bl_urgent_data
= op
.bl_urgent_data
;
117 head
.max_head_size
= QUEUE_HEAD_SIZE_1K
+ op
.max_urgent_data_size
;
118 head
.queue_size
= op
.queue_size
+ head
.max_head_size
;
119 head
.max_urgent_data_size
= op
.max_urgent_data_size
;
120 head
.tail
.gen
= head
.front
.gen
= 0;
121 head
.tail
.offset
= head
.front
.offset
= head
.max_head_size
;
123 CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head
.queue_size
);
124 CLS_LOG(20, "INFO: init_queue_op head size %lu", head
.max_head_size
);
125 CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head
.front
.to_str().c_str());
126 CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head
.max_urgent_data_size
);
128 return queue_write_head(hctx
, head
);
131 int queue_get_capacity(cls_method_context_t hctx
, cls_queue_get_capacity_ret
& op_ret
)
135 int ret
= queue_read_head(hctx
, head
);
140 op_ret
.queue_capacity
= head
.queue_size
- head
.max_head_size
;
142 CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu\n", op_ret
.queue_capacity
);
147 int queue_enqueue(cls_method_context_t hctx
, cls_queue_enqueue_op
& op
, cls_queue_head
& head
)
149 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.tail
.gen
== head
.front
.gen
+ 1)) {
150 CLS_LOG(0, "ERROR: No space left in queue\n");
154 for (auto& bl_data
: op
.bl_data_vec
) {
156 uint16_t entry_start
= QUEUE_ENTRY_START
;
157 encode(entry_start
, bl
);
158 uint64_t data_size
= bl_data
.length();
159 encode(data_size
, bl
);
160 bl
.claim_append(bl_data
);
162 CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu\n", bl
.length(), data_size
);
164 if (head
.tail
.offset
>= head
.front
.offset
) {
165 // check if data can fit in the remaining space in queue
166 if ((head
.tail
.offset
+ bl
.length()) <= head
.queue_size
) {
167 CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n", head
.tail
.to_str().c_str(), bl
.length());
168 //write data size and data at tail offset
169 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
173 head
.tail
.offset
+= bl
.length();
175 uint64_t free_space_available
= (head
.queue_size
- head
.tail
.offset
) + (head
.front
.offset
- head
.max_head_size
);
176 //Split data if there is free space available
177 if (bl
.length() <= free_space_available
) {
178 uint64_t size_before_wrap
= head
.queue_size
- head
.tail
.offset
;
179 bufferlist bl_data_before_wrap
;
180 bl
.splice(0, size_before_wrap
, &bl_data_before_wrap
);
181 //write spliced (data size and data) at tail offset
182 CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u\n", head
.tail
.to_str().c_str(), bl_data_before_wrap
.length());
183 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl_data_before_wrap
.length(), &bl_data_before_wrap
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
187 head
.tail
.offset
= head
.max_head_size
;
189 //write remaining data at tail offset after wrapping around
190 CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u\n", head
.tail
.to_str().c_str(), bl
.length());
191 ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
195 head
.tail
.offset
+= bl
.length();
197 CLS_LOG(0, "ERROR: No space left in queue\n");
198 // return queue full error
202 } else if (head
.front
.offset
> head
.tail
.offset
) {
203 if ((head
.tail
.offset
+ bl
.length()) <= head
.front
.offset
) {
204 CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n\n", head
.tail
.to_str().c_str(), bl
.length());
205 //write data size and data at tail offset
206 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
210 head
.tail
.offset
+= bl
.length();
212 CLS_LOG(0, "ERROR: No space left in queue\n");
213 // return queue full error
218 if (head
.tail
.offset
== head
.queue_size
) {
219 head
.tail
.offset
= head
.max_head_size
;
222 CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s \n", head
.tail
.to_str().c_str());
228 int queue_list_entries(cls_method_context_t hctx
, const cls_queue_list_op
& op
, cls_queue_list_ret
& op_ret
, cls_queue_head
& head
)
230 // If queue is empty, return from here
231 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.front
.gen
== head
.tail
.gen
)) {
232 CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s\n", head
.front
.to_str().c_str());
233 op_ret
.next_marker
= head
.front
.to_str();
234 op_ret
.is_truncated
= false;
238 cls_queue_marker start_marker
;
239 start_marker
.from_str(op
.start_marker
.c_str());
240 cls_queue_marker next_marker
= {0, 0};
242 uint64_t start_offset
= 0, gen
= 0;
243 if (start_marker
.offset
== 0) {
244 start_offset
= head
.front
.offset
;
245 gen
= head
.front
.gen
;
247 start_offset
= start_marker
.offset
;
248 gen
= start_marker
.gen
;
251 op_ret
.is_truncated
= true;
252 uint64_t chunk_size
= 1024;
253 uint64_t contiguous_data_size
= 0, size_to_read
= 0;
254 bool wrap_around
= false;
256 //Calculate length of contiguous data to be read depending on front, tail and start offset
257 if (head
.tail
.offset
> head
.front
.offset
) {
258 contiguous_data_size
= head
.tail
.offset
- start_offset
;
259 } else if (head
.front
.offset
>= head
.tail
.offset
) {
260 if (start_offset
>= head
.front
.offset
) {
261 contiguous_data_size
= head
.queue_size
- start_offset
;
263 } else if (start_offset
<= head
.tail
.offset
) {
264 contiguous_data_size
= head
.tail
.offset
- start_offset
;
268 CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s\n", head
.front
.to_str().c_str(), head
.tail
.to_str().c_str());
270 bool offset_populated
= false, entry_start_processed
= false;
271 uint64_t data_size
= 0, num_ops
= 0;
272 uint16_t entry_start
= 0;
277 CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu\n", start_offset
);
280 //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
281 if (contiguous_data_size
> chunk_size
) {
282 size_to_read
= chunk_size
;
284 size_to_read
= contiguous_data_size
;
286 CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu\n", size_to_read
);
287 if (size_to_read
== 0) {
288 next_marker
= head
.tail
;
289 op_ret
.is_truncated
= false;
290 CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
294 auto ret
= cls_cxx_read(hctx
, start_offset
, size_to_read
, &bl_chunk
);
299 //If there is leftover data from previous iteration, append new data to leftover data
300 uint64_t entry_start_offset
= start_offset
- bl
.length();
301 CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu\n", entry_start_offset
);
302 bl
.claim_append(bl_chunk
);
303 bl_chunk
= std::move(bl
);
305 CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u\n", bl_chunk
.length());
307 //Process the chunk of data read
309 auto it
= bl_chunk
.cbegin();
310 uint64_t size_to_process
= bl_chunk
.length();
312 CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index
, size_to_process
);
313 cls_queue_entry entry
;
314 ceph_assert(it
.get_off() == index
);
315 //Use the last marker saved in previous iteration as the marker for this entry
316 if (offset_populated
) {
317 entry
.marker
= last_marker
;
319 //Populate offset if not done in previous iteration
320 if (! offset_populated
) {
321 cls_queue_marker marker
= {entry_start_offset
+ index
, gen
};
322 CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker
.to_str().c_str());
323 entry
.marker
= marker
.to_str();
325 // Magic number + Data size - process if not done in previous iteration
326 if (! entry_start_processed
) {
327 if (size_to_process
>= (sizeof(uint16_t) + sizeof(uint64_t))) {
328 // Decode magic number at start
330 decode(entry_start
, it
);
331 } catch (buffer::error
& err
) {
332 CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s\n", err
.what());
335 if (entry_start
!= QUEUE_ENTRY_START
) {
336 CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u\n", entry_start
);
339 index
+= sizeof(uint16_t);
340 size_to_process
-= sizeof(uint16_t);
343 decode(data_size
, it
);
344 } catch (buffer::error
& err
) {
345 CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s\n", err
.what());
349 // Copy unprocessed data to bl
350 bl_chunk
.splice(index
, size_to_process
, &bl
);
351 offset_populated
= true;
352 last_marker
= entry
.marker
;
353 CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!\n");
356 CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu\n", data_size
);
357 index
+= sizeof(uint64_t);
358 size_to_process
-= sizeof(uint64_t);
361 if (data_size
<= size_to_process
) {
362 it
.copy(data_size
, entry
.data
);
363 index
+= entry
.data
.length();
364 size_to_process
-= entry
.data
.length();
366 it
.copy(size_to_process
, bl
);
367 offset_populated
= true;
368 entry_start_processed
= true;
369 last_marker
= entry
.marker
;
370 CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n");
373 op_ret
.entries
.emplace_back(entry
);
374 // Resetting some values
375 offset_populated
= false;
376 entry_start_processed
= false;
381 if (num_ops
== op
.max
) {
382 CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
385 } while(index
< bl_chunk
.length());
387 CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops
, op
.max
);
389 if (num_ops
== op
.max
) {
390 next_marker
= cls_queue_marker
{(entry_start_offset
+ index
), gen
};
391 CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", next_marker
.offset
);
395 //Calculate new start_offset and contiguous data size
396 start_offset
+= size_to_read
;
397 contiguous_data_size
-= size_to_read
;
398 if (contiguous_data_size
== 0) {
400 start_offset
= head
.max_head_size
;
401 contiguous_data_size
= head
.tail
.offset
- head
.max_head_size
;
405 CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
406 next_marker
= head
.tail
;
407 op_ret
.is_truncated
= false;
412 } while(num_ops
< op
.max
);
414 //Wrap around next offset if it has reached end of queue
415 if (next_marker
.offset
== head
.queue_size
) {
416 next_marker
.offset
= head
.max_head_size
;
417 next_marker
.gen
+= 1;
419 if ((next_marker
.offset
== head
.tail
.offset
) && (next_marker
.gen
== head
.tail
.gen
)) {
420 op_ret
.is_truncated
= false;
423 CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s\n", next_marker
.to_str().c_str());
424 op_ret
.next_marker
= next_marker
.to_str();
429 int queue_remove_entries(cls_method_context_t hctx
, const cls_queue_remove_op
& op
, cls_queue_head
& head
)
432 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.front
.gen
== head
.tail
.gen
)) {
436 cls_queue_marker end_marker
;
437 end_marker
.from_str(op
.end_marker
.c_str());
439 CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s\n", end_marker
.to_str().c_str());
441 //Zero out the entries that have been removed, to reclaim storage space
442 if (end_marker
.offset
> head
.front
.offset
&& end_marker
.gen
== head
.front
.gen
) {
443 uint64_t len
= end_marker
.offset
- head
.front
.offset
;
445 auto ret
= cls_cxx_write_zero(hctx
, head
.front
.offset
, len
);
447 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
448 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head
.front
.to_str().c_str());
452 } else if ((head
.front
.offset
>= end_marker
.offset
) && (end_marker
.gen
== head
.front
.gen
+ 1)) { //start offset > end offset
453 uint64_t len
= head
.queue_size
- head
.front
.offset
;
455 auto ret
= cls_cxx_write_zero(hctx
, head
.front
.offset
, len
);
457 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
458 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head
.front
.to_str().c_str());
462 len
= end_marker
.offset
- head
.max_head_size
;
464 auto ret
= cls_cxx_write_zero(hctx
, head
.max_head_size
, len
);
466 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
467 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu\n", head
.max_head_size
);
471 } else if ((head
.front
.offset
== end_marker
.offset
) && (head
.front
.gen
== end_marker
.gen
)) {
474 CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu\n", end_marker
.to_str().c_str(), end_marker
.gen
);
478 head
.front
= end_marker
;
480 // Check if it is the end, then wrap around
481 if (head
.front
.offset
== head
.queue_size
) {
482 head
.front
.offset
= head
.max_head_size
;
486 CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s\n", head
.front
.to_str().c_str(), head
.tail
.to_str().c_str());