1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "include/types.h"
8 #include "objclass/objclass.h"
9 #include "cls/queue/cls_queue_types.h"
10 #include "cls/queue/cls_queue_ops.h"
11 #include "cls/queue/cls_queue_const.h"
12 #include "cls/queue/cls_queue_src.h"
// Encode `head` and write it to the object through cls_cxx_write2() at offset 0.
// The buffer written is built in this order: a uint16_t QUEUE_HEAD_START
// marker, a uint64_t holding the length of the encoded head, then the encoded
// head itself.
// NOTE(review): the declarations of `bl` and `bl_head`, the check on `ret` and
// the return statements are not visible in this listing -- confirm against the
// full file.
14 int queue_write_head(cls_method_context_t hctx
, cls_queue_head
& head
)
// Marker goes first in the output buffer.
17 uint16_t entry_start
= QUEUE_HEAD_START
;
18 encode(entry_start
, bl
);
// The head is encoded into its own buffer so that its length can be measured
// and emitted before the head bytes.
21 encode(head
, bl_head
);
23 uint64_t encoded_len
= bl_head
.length();
24 encode(encoded_len
, bl
);
// Append the encoded head after the marker and the length prefix.
26 bl
.claim_append(bl_head
);
28 int ret
= cls_cxx_write2(hctx
, 0, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED
);
30 CLS_LOG(5, "ERROR: queue_write_head: failed to write head\n");
// Read the queue head from the start of the object into `head`.
// Steps visible here: read a first chunk of up to 1024 bytes at offset 0,
// decode the uint16_t start marker and compare it with QUEUE_HEAD_START,
// decode the uint64_t encoded head length, and, when that length does not fit
// in what is left of the first chunk, read the remainder and append it.
// NOTE(review): the `try` openings, the checks on `ret`, the return statements
// and the final decode of `head` are not visible in this listing -- confirm
// against the full file.
36 int queue_read_head(cls_method_context_t hctx
, cls_queue_head
& head
)
38 uint64_t chunk_size
= 1024, start_offset
= 0;
41 const auto ret
= cls_cxx_read(hctx
, start_offset
, chunk_size
, &bl_head
);
43 CLS_LOG(5, "ERROR: queue_read_head: failed to read head\n");
47 //Process the chunk of data read
48 auto it
= bl_head
.cbegin();
50 uint16_t queue_head_start
;
52 decode(queue_head_start
, it
);
53 } catch (buffer::error
& err
) {
54 CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s \n", err
.what());
// A marker other than QUEUE_HEAD_START is logged as an invalid queue start.
57 if (queue_head_start
!= QUEUE_HEAD_START
) {
58 CLS_LOG(0, "ERROR: queue_read_head: invalid queue start\n");
64 decode(encoded_len
, it
);
65 } catch (buffer::error
& err
) {
66 CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s\n", err
.what());
// Combined size of the two prefix fields (start marker + encoded length).
70 constexpr auto decoded_head_size
= sizeof(queue_head_start
) + sizeof(encoded_len
);
// The encoded head is longer than what the first chunk can hold after the
// prefix: read the missing bytes starting right after the first chunk.
71 if (encoded_len
> (chunk_size
- decoded_head_size
)) {
72 start_offset
= chunk_size
;
// Number of head bytes that did not fit into the first chunk.
73 chunk_size
= (encoded_len
- (chunk_size
- decoded_head_size
));
74 bufferlist bl_remaining_head
;
75 const auto ret
= cls_cxx_read2(hctx
, start_offset
, chunk_size
, &bl_remaining_head
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
77 CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head\n");
// Extend the buffer already being iterated with the remaining head bytes.
80 bl_head
.claim_append(bl_remaining_head
);
85 } catch (buffer::error
& err
) {
86 CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s\n", err
.what());
// Build a fresh queue head from the init request `op` and persist it with
// queue_write_head(), whose result is returned.
// The existing head is read first; what happens on each outcome of that read
// is only partly visible here.
// NOTE(review): the declaration of `head` and the bodies of the checks on
// `ret` are not visible in this listing -- confirm against the full file.
93 int queue_init(cls_method_context_t hctx
, const cls_queue_init_op
& op
)
95 //get head and its size
97 int ret
= queue_read_head(hctx
, head
);
99 //head is already initialized
// -EINVAL is deliberately excluded from this error check.
// NOTE(review): presumably -EINVAL means "no valid head yet" -- confirm.
104 if (ret
< 0 && ret
!= -EINVAL
) {
// Urgent data is copied into the head only when the request carries some.
108 if (op
.bl_urgent_data
.length() > 0) {
109 head
.bl_urgent_data
= op
.bl_urgent_data
;
// The head region is QUEUE_HEAD_SIZE_1K plus the requested urgent-data size.
112 head
.max_head_size
= QUEUE_HEAD_SIZE_1K
+ op
.max_urgent_data_size
;
// queue_size includes the head region in addition to the requested size.
113 head
.queue_size
= op
.queue_size
+ head
.max_head_size
;
114 head
.max_urgent_data_size
= op
.max_urgent_data_size
;
// Front and tail start out equal: generation 0, offset right after the head
// region.
115 head
.tail
.gen
= head
.front
.gen
= 0;
116 head
.tail
.offset
= head
.front
.offset
= head
.max_head_size
;
118 CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head
.queue_size
);
119 CLS_LOG(20, "INFO: init_queue_op head size %lu", head
.max_head_size
);
120 CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head
.front
.to_str().c_str());
121 CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head
.max_urgent_data_size
);
123 return queue_write_head(hctx
, head
);
// Read the queue head and report the queue capacity in
// `op_ret.queue_capacity`.
// NOTE(review): the declaration of `head`, the handling of `ret` and the
// return statement are not visible in this listing -- confirm against the
// full file.
126 int queue_get_capacity(cls_method_context_t hctx
, cls_queue_get_capacity_ret
& op_ret
)
130 int ret
= queue_read_head(hctx
, head
);
// Capacity excludes the head region: total size minus max_head_size.
135 op_ret
.queue_capacity
= head
.queue_size
- head
.max_head_size
;
137 CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu\n", op_ret
.queue_capacity
);
// Append every buffer in `op.bl_data_vec` to the queue at `head.tail`,
// advancing `head.tail.offset` in the caller-supplied `head` as data is
// written. Each entry is written as a uint16_t QUEUE_ENTRY_START marker, a
// uint64_t payload size and the payload. When the tail is at or after the
// front and the entry does not fit before the end of the queue, it is split:
// the first part fills the queue up to `queue_size` and the rest is written
// starting at `max_head_size`.
// NOTE(review): the declaration of `bl`, the checks on `ret`, the `else`
// branches, the return statements and any update of `head.tail.gen` are not
// visible in this listing -- confirm against the full file.
142 int queue_enqueue(cls_method_context_t hctx
, cls_queue_enqueue_op
& op
, cls_queue_head
& head
)
// Queue-full check: front and tail share an offset while the tail generation
// is one ahead of the front generation.
144 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.tail
.gen
== head
.front
.gen
+ 1)) {
145 CLS_LOG(0, "ERROR: No space left in queue\n");
149 for (auto& bl_data
: op
.bl_data_vec
) {
// Frame the entry: marker, payload size, then the payload itself.
151 uint16_t entry_start
= QUEUE_ENTRY_START
;
152 encode(entry_start
, bl
);
153 uint64_t data_size
= bl_data
.length();
154 encode(data_size
, bl
);
155 bl
.claim_append(bl_data
);
157 CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu\n", bl
.length(), data_size
);
// Case 1: the tail is at or after the front.
159 if (head
.tail
.offset
>= head
.front
.offset
) {
160 // check if data can fit in the remaining space in queue
161 if ((head
.tail
.offset
+ bl
.length()) <= head
.queue_size
) {
162 CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n", head
.tail
.to_str().c_str(), bl
.length());
163 //write data size and data at tail offset
164 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
168 head
.tail
.offset
+= bl
.length();
// Free space when wrapping: the bytes left before the end of the queue plus
// the bytes between the end of the head region and the front.
170 uint64_t free_space_available
= (head
.queue_size
- head
.tail
.offset
) + (head
.front
.offset
- head
.max_head_size
);
171 //Split data if there is free space available
172 if (bl
.length() <= free_space_available
) {
// The first piece is exactly what still fits before the end of the queue;
// splice() moves it out of `bl`, which is then written as the second piece.
173 uint64_t size_before_wrap
= head
.queue_size
- head
.tail
.offset
;
174 bufferlist bl_data_before_wrap
;
175 bl
.splice(0, size_before_wrap
, &bl_data_before_wrap
);
176 //write spliced (data size and data) at tail offset
177 CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u\n", head
.tail
.to_str().c_str(), bl_data_before_wrap
.length());
178 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl_data_before_wrap
.length(), &bl_data_before_wrap
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
// Wrap the tail to the first offset after the head region.
182 head
.tail
.offset
= head
.max_head_size
;
184 //write remaining data at tail offset after wrapping around
185 CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u\n", head
.tail
.to_str().c_str(), bl
.length());
186 ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
190 head
.tail
.offset
+= bl
.length();
192 CLS_LOG(0, "ERROR: No space left in queue\n");
193 // return queue full error
// Case 2: the front is ahead of the tail, so the entry must fit in the gap
// between the tail and the front.
197 } else if (head
.front
.offset
> head
.tail
.offset
) {
198 if ((head
.tail
.offset
+ bl
.length()) <= head
.front
.offset
) {
199 CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n\n", head
.tail
.to_str().c_str(), bl
.length());
200 //write data size and data at tail offset
201 auto ret
= cls_cxx_write2(hctx
, head
.tail
.offset
, bl
.length(), &bl
, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL
);
205 head
.tail
.offset
+= bl
.length();
207 CLS_LOG(0, "ERROR: No space left in queue\n");
208 // return queue full error
// A tail that landed exactly on the end of the queue is wrapped to the first
// offset after the head region.
213 if (head
.tail
.offset
== head
.queue_size
) {
214 head
.tail
.offset
= head
.max_head_size
;
217 CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s \n", head
.tail
.to_str().c_str());
// List queue entries into `op_ret.entries`, starting at `op.start_marker` (or
// at the queue front when the marker offset is 0), and stopping once `op.max`
// entries have been collected or the queue data has been exhausted.
// `op_ret.next_marker` receives the position to continue from and
// `op_ret.is_truncated` tells whether more entries remain.
// Data is read in chunks of at most 1024 bytes; bytes of an entry that is
// split across chunks are carried over in `bl` to the next iteration.
// NOTE(review): the declarations of `bl`, `bl_chunk` and `index`, the loop
// openings, `try` openings, `else` branches, `break`/`return` statements and
// the increment of `num_ops` are not visible in this listing -- confirm
// against the full file.
223 int queue_list_entries(cls_method_context_t hctx
, const cls_queue_list_op
& op
, cls_queue_list_ret
& op_ret
, cls_queue_head
& head
)
225 // If queue is empty, return from here
// Empty means front and tail agree on both offset and generation.
226 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.front
.gen
== head
.tail
.gen
)) {
227 CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s\n", head
.front
.to_str().c_str());
228 op_ret
.next_marker
= head
.front
.to_str();
229 op_ret
.is_truncated
= false;
233 cls_queue_marker start_marker
;
234 start_marker
.from_str(op
.start_marker
.c_str());
235 cls_queue_marker next_marker
= {0, 0};
// A start marker with offset 0 selects the queue front as starting point; the
// assignments that follow take offset and gen from the marker itself.
237 uint64_t start_offset
= 0, gen
= 0;
238 if (start_marker
.offset
== 0) {
239 start_offset
= head
.front
.offset
;
240 gen
= head
.front
.gen
;
242 start_offset
= start_marker
.offset
;
243 gen
= start_marker
.gen
;
246 op_ret
.is_truncated
= true;
247 uint64_t chunk_size
= 1024;
248 uint64_t contiguous_data_size
= 0, size_to_read
= 0;
249 bool wrap_around
= false;
251 //Calculate length of contiguous data to be read depending on front, tail and start offset
// Tail ahead of front: the data runs from the start offset up to the tail.
252 if (head
.tail
.offset
> head
.front
.offset
) {
253 contiguous_data_size
= head
.tail
.offset
- start_offset
;
// Front at or ahead of tail: data from the front runs to the end of the
// queue, while a start offset at or before the tail runs up to the tail.
254 } else if (head
.front
.offset
>= head
.tail
.offset
) {
255 if (start_offset
>= head
.front
.offset
) {
256 contiguous_data_size
= head
.queue_size
- start_offset
;
258 } else if (start_offset
<= head
.tail
.offset
) {
259 contiguous_data_size
= head
.tail
.offset
- start_offset
;
263 CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s\n", head
.front
.to_str().c_str(), head
.tail
.to_str().c_str());
// State carried across chunks for an entry that spans more than one chunk:
// whether its marker was already recorded and whether its header (marker +
// size) was already decoded.
265 bool offset_populated
= false, entry_start_processed
= false;
266 uint64_t data_size
= 0, num_ops
= 0;
267 uint16_t entry_start
= 0;
271 CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu\n", start_offset
);
274 //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
275 if (contiguous_data_size
> chunk_size
) {
276 size_to_read
= chunk_size
;
278 size_to_read
= contiguous_data_size
;
280 CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu\n", size_to_read
);
// Nothing left to read: the next marker is the tail and the listing is
// complete.
281 if (size_to_read
== 0) {
282 next_marker
= head
.tail
;
283 op_ret
.is_truncated
= false;
284 CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
288 auto ret
= cls_cxx_read(hctx
, start_offset
, size_to_read
, &bl_chunk
);
293 //If there is leftover data from previous iteration, append new data to leftover data
// The combined buffer starts `bl.length()` bytes before this read's offset.
294 uint64_t entry_start_offset
= start_offset
- bl
.length();
295 CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu\n", entry_start_offset
);
// Put the new chunk after the leftover bytes, then move the combined data
// back into bl_chunk.
296 bl
.claim_append(bl_chunk
);
297 bl_chunk
= std::move(bl
);
299 CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u\n", bl_chunk
.length());
301 //Process the chunk of data read
303 auto it
= bl_chunk
.cbegin();
304 uint64_t size_to_process
= bl_chunk
.length();
306 CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index
, size_to_process
);
307 cls_queue_entry entry
;
// `index` is expected to track the iterator's offset within bl_chunk.
308 ceph_assert(it
.get_off() == index
);
309 //Populate offset if not done in previous iteration
310 if (! offset_populated
) {
311 cls_queue_marker marker
= {entry_start_offset
+ index
, gen
};
312 CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker
.to_str().c_str());
313 entry
.marker
= marker
.to_str();
315 // Magic number + Data size - process if not done in previous iteration
316 if (! entry_start_processed
) {
// The entry header (uint16_t marker + uint64_t size) is decoded only when all
// of its bytes are available.
317 if (size_to_process
>= (sizeof(uint16_t) + sizeof(uint64_t))) {
318 // Decode magic number at start
320 decode(entry_start
, it
);
321 } catch (buffer::error
& err
) {
322 CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s\n", err
.what());
325 if (entry_start
!= QUEUE_ENTRY_START
) {
326 CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u\n", entry_start
);
329 index
+= sizeof(uint16_t);
330 size_to_process
-= sizeof(uint16_t);
333 decode(data_size
, it
);
334 } catch (buffer::error
& err
) {
335 CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s\n", err
.what());
339 // Copy unprocessed data to bl
340 bl_chunk
.splice(index
, size_to_process
, &bl
);
341 offset_populated
= true;
342 CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!\n");
345 CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu\n", data_size
);
346 index
+= sizeof(uint64_t);
347 size_to_process
-= sizeof(uint64_t);
// Whole payload available: copy it into the entry and advance past it.
350 if (data_size
<= size_to_process
) {
351 it
.copy(data_size
, entry
.data
);
352 index
+= entry
.data
.length();
353 size_to_process
-= entry
.data
.length();
// Partial payload: the available bytes are copied into `bl` and both flags
// are set so the header is not processed again.
355 it
.copy(size_to_process
, bl
);
356 offset_populated
= true;
357 entry_start_processed
= true;
358 CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n");
361 op_ret
.entries
.emplace_back(entry
);
362 // Resetting some values
363 offset_populated
= false;
364 entry_start_processed
= false;
368 if (num_ops
== op
.max
) {
369 CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
372 } while(index
< bl_chunk
.length());
374 CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops
, op
.max
);
// Requested number of entries reached: continue from the byte right after the
// last processed entry.
376 if (num_ops
== op
.max
) {
377 next_marker
= cls_queue_marker
{(entry_start_offset
+ index
), gen
};
378 CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", next_marker
.offset
);
382 //Calculate new start_offset and contiguous data size
383 start_offset
+= size_to_read
;
384 contiguous_data_size
-= size_to_read
;
// Contiguous region exhausted. The visible assignments restart reading right
// after the head region, up to the tail, or finish the listing at the tail.
// NOTE(review): the condition choosing between the two is not visible here.
385 if (contiguous_data_size
== 0) {
387 start_offset
= head
.max_head_size
;
388 contiguous_data_size
= head
.tail
.offset
- head
.max_head_size
;
392 CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
393 next_marker
= head
.tail
;
394 op_ret
.is_truncated
= false;
399 } while(num_ops
< op
.max
);
401 //Wrap around next offset if it has reached end of queue
402 if (next_marker
.offset
== head
.queue_size
) {
403 next_marker
.offset
= head
.max_head_size
;
404 next_marker
.gen
+= 1;
// A next marker equal to the tail means nothing is left to list.
406 if ((next_marker
.offset
== head
.tail
.offset
) && (next_marker
.gen
== head
.tail
.gen
)) {
407 op_ret
.is_truncated
= false;
410 CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s\n", next_marker
.to_str().c_str());
411 op_ret
.next_marker
= next_marker
.to_str();
// Remove entries from the front of the queue up to `op.end_marker`: the
// removed byte range is zeroed with cls_cxx_write_zero() and `head.front` is
// then set to the end marker (wrapping to `max_head_size` when it equals
// `queue_size`). Only the caller-supplied `head` is updated here.
// NOTE(review): the bodies of the empty-queue and "nothing to remove"
// branches, the checks on `ret`, the return statements and any update of
// `head.front.gen` are not visible in this listing -- confirm against the
// full file.
416 int queue_remove_entries(cls_method_context_t hctx
, const cls_queue_remove_op
& op
, cls_queue_head
& head
)
// Empty queue: front and tail agree on both offset and generation.
419 if ((head
.front
.offset
== head
.tail
.offset
) && (head
.front
.gen
== head
.tail
.gen
)) {
423 cls_queue_marker end_marker
;
424 end_marker
.from_str(op
.end_marker
.c_str());
426 CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s\n", end_marker
.to_str().c_str());
428 //Zero out the entries that have been removed, to reclaim storage space
// Same generation, end marker ahead of the front: zero [front, end marker).
429 if (end_marker
.offset
> head
.front
.offset
&& end_marker
.gen
== head
.front
.gen
) {
430 uint64_t len
= end_marker
.offset
- head
.front
.offset
;
432 auto ret
= cls_cxx_write_zero(hctx
, head
.front
.offset
, len
);
434 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
435 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head
.front
.to_str().c_str());
// End marker is one generation ahead of the front: zero from the front to the
// end of the queue, then from the end of the head region to the end marker.
439 } else if ((head
.front
.offset
>= end_marker
.offset
) && (end_marker
.gen
== head
.front
.gen
+ 1)) { //start offset > end offset
440 uint64_t len
= head
.queue_size
- head
.front
.offset
;
442 auto ret
= cls_cxx_write_zero(hctx
, head
.front
.offset
, len
);
444 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
445 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head
.front
.to_str().c_str());
// Second range: the wrapped part, starting right after the head region.
449 len
= end_marker
.offset
- head
.max_head_size
;
451 auto ret
= cls_cxx_write_zero(hctx
, head
.max_head_size
, len
);
453 CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
454 CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu\n", head
.max_head_size
);
// End marker identical to the front (same offset and generation).
458 } else if ((head
.front
.offset
== end_marker
.offset
) && (head
.front
.gen
== end_marker
.gen
)) {
461 CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu\n", end_marker
.to_str().c_str(), end_marker
.gen
);
// The front now points at the end marker.
465 head
.front
= end_marker
;
467 // Check if it is the end, then wrap around
468 if (head
.front
.offset
== head
.queue_size
) {
469 head
.front
.offset
= head
.max_head_size
;
473 CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s\n", head
.front
.to_str().c_str(), head
.tail
.to_str().c_str());