]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "include/types.h" | |
5 | ||
9f95a23c TL |
6 | #include "objclass/objclass.h" |
7 | #include "cls/queue/cls_queue_types.h" | |
8 | #include "cls/queue/cls_queue_ops.h" | |
9 | #include "cls/queue/cls_queue_const.h" | |
10 | #include "cls/queue/cls_queue_src.h" | |
11 | ||
20effc67 | 12 | using std::string; |
f67539c2 TL |
13 | using ceph::bufferlist; |
14 | using ceph::decode; | |
15 | using ceph::encode; | |
16 | ||
9f95a23c TL |
17 | int queue_write_head(cls_method_context_t hctx, cls_queue_head& head) |
18 | { | |
19 | bufferlist bl; | |
20 | uint16_t entry_start = QUEUE_HEAD_START; | |
21 | encode(entry_start, bl); | |
22 | ||
23 | bufferlist bl_head; | |
24 | encode(head, bl_head); | |
25 | ||
26 | uint64_t encoded_len = bl_head.length(); | |
27 | encode(encoded_len, bl); | |
28 | ||
29 | bl.claim_append(bl_head); | |
30 | ||
e306af50 TL |
31 | if (bl.length() > head.max_head_size) { |
32 | CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u \n", bl.length(), head.bl_urgent_data.length()); | |
33 | return -EINVAL; | |
34 | } | |
35 | ||
9f95a23c TL |
36 | int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED); |
37 | if (ret < 0) { | |
f67539c2 | 38 | CLS_LOG(5, "ERROR: queue_write_head: failed to write head"); |
9f95a23c TL |
39 | return ret; |
40 | } | |
41 | return 0; | |
42 | } | |
43 | ||
44 | int queue_read_head(cls_method_context_t hctx, cls_queue_head& head) | |
45 | { | |
46 | uint64_t chunk_size = 1024, start_offset = 0; | |
47 | ||
48 | bufferlist bl_head; | |
49 | const auto ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head); | |
50 | if (ret < 0) { | |
f67539c2 | 51 | CLS_LOG(5, "ERROR: queue_read_head: failed to read head"); |
9f95a23c TL |
52 | return ret; |
53 | } | |
f67539c2 TL |
54 | if (ret == 0) { |
55 | CLS_LOG(20, "INFO: queue_read_head: empty head, not initialized yet"); | |
56 | return -EINVAL; | |
57 | } | |
9f95a23c TL |
58 | |
59 | //Process the chunk of data read | |
60 | auto it = bl_head.cbegin(); | |
61 | // Queue head start | |
62 | uint16_t queue_head_start; | |
63 | try { | |
64 | decode(queue_head_start, it); | |
f67539c2 TL |
65 | } catch (const ceph::buffer::error& err) { |
66 | CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s", err.what()); | |
9f95a23c TL |
67 | return -EINVAL; |
68 | } | |
69 | if (queue_head_start != QUEUE_HEAD_START) { | |
f67539c2 | 70 | CLS_LOG(0, "ERROR: queue_read_head: invalid queue start"); |
9f95a23c TL |
71 | return -EINVAL; |
72 | } | |
73 | ||
74 | uint64_t encoded_len; | |
75 | try { | |
76 | decode(encoded_len, it); | |
f67539c2 TL |
77 | } catch (const ceph::buffer::error& err) { |
78 | CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s", err.what()); | |
9f95a23c TL |
79 | return -EINVAL; |
80 | } | |
81 | ||
f67539c2 | 82 | if (encoded_len > (chunk_size - QUEUE_ENTRY_OVERHEAD)) { |
9f95a23c | 83 | start_offset = chunk_size; |
f67539c2 | 84 | chunk_size = (encoded_len - (chunk_size - QUEUE_ENTRY_OVERHEAD)); |
9f95a23c TL |
85 | bufferlist bl_remaining_head; |
86 | const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); | |
87 | if (ret < 0) { | |
f67539c2 | 88 | CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head"); |
9f95a23c TL |
89 | return ret; |
90 | } | |
91 | bl_head.claim_append(bl_remaining_head); | |
92 | } | |
93 | ||
94 | try { | |
95 | decode(head, it); | |
f67539c2 TL |
96 | } catch (const ceph::buffer::error& err) { |
97 | CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s", err.what()); | |
9f95a23c TL |
98 | return -EINVAL; |
99 | } | |
100 | ||
101 | return 0; | |
102 | } | |
103 | ||
104 | int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op) | |
105 | { | |
106 | //get head and its size | |
107 | cls_queue_head head; | |
108 | int ret = queue_read_head(hctx, head); | |
109 | ||
110 | //head is already initialized | |
111 | if (ret == 0) { | |
112 | return -EEXIST; | |
113 | } | |
114 | ||
115 | if (ret < 0 && ret != -EINVAL) { | |
116 | return ret; | |
117 | } | |
118 | ||
119 | if (op.bl_urgent_data.length() > 0) { | |
120 | head.bl_urgent_data = op.bl_urgent_data; | |
121 | } | |
122 | ||
123 | head.max_head_size = QUEUE_HEAD_SIZE_1K + op.max_urgent_data_size; | |
124 | head.queue_size = op.queue_size + head.max_head_size; | |
125 | head.max_urgent_data_size = op.max_urgent_data_size; | |
126 | head.tail.gen = head.front.gen = 0; | |
127 | head.tail.offset = head.front.offset = head.max_head_size; | |
128 | ||
129 | CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size); | |
130 | CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size); | |
131 | CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str()); | |
132 | CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size); | |
133 | ||
134 | return queue_write_head(hctx, head); | |
135 | } | |
136 | ||
137 | int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret) | |
138 | { | |
139 | //get head | |
140 | cls_queue_head head; | |
141 | int ret = queue_read_head(hctx, head); | |
142 | if (ret < 0) { | |
143 | return ret; | |
144 | } | |
145 | ||
146 | op_ret.queue_capacity = head.queue_size - head.max_head_size; | |
147 | ||
f67539c2 | 148 | CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu", op_ret.queue_capacity); |
9f95a23c TL |
149 | |
150 | return 0; | |
151 | } | |
152 | ||
f67539c2 TL |
153 | |
154 | /* | |
155 | enqueue of new bufferlist happens in the free spaces of the queue, the queue can be in | |
156 | one of two states: | |
157 | ||
158 | (1) split free space | |
159 | +-------------+--------------------------------------------------------------------+ | |
160 | | object head | XXXXXXXXXXXXXXXXXXXXXXXXXXX | | |
161 | | | ^ ^ | | |
162 | | front tail | | | | | |
163 | +---+------+--+----------------|-------------------------|-------------------------+ | |
164 | | | | | | |
165 | | +-------------------|-------------------------+ | |
166 | +--------------------------+ | |
167 | ||
168 | (2) continuous free space | |
169 | +-------------+--------------------------------------------------------------------+ | |
170 | | object head |XXXXXXXXXXXXXXXXX XXXXXXXXXXXXXXXXXXXXXXXXXX| | |
171 | | | ^ ^ | | |
172 | | front tail | | | | | |
173 | +---+------+--+----------------|-------------------------|-------------------------+ | |
174 | | | | | | |
175 | | +-------------------+ | | |
176 | +----------------------------------------------------+ | |
177 | */ | |
178 | ||
9f95a23c TL |
179 | int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head) |
180 | { | |
181 | if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) { | |
f67539c2 | 182 | CLS_LOG(0, "ERROR: No space left in queue"); |
9f95a23c TL |
183 | return -ENOSPC; |
184 | } | |
185 | ||
186 | for (auto& bl_data : op.bl_data_vec) { | |
187 | bufferlist bl; | |
188 | uint16_t entry_start = QUEUE_ENTRY_START; | |
189 | encode(entry_start, bl); | |
190 | uint64_t data_size = bl_data.length(); | |
191 | encode(data_size, bl); | |
192 | bl.claim_append(bl_data); | |
193 | ||
f67539c2 | 194 | CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu", bl.length(), data_size); |
9f95a23c TL |
195 | |
196 | if (head.tail.offset >= head.front.offset) { | |
197 | // check if data can fit in the remaining space in queue | |
198 | if ((head.tail.offset + bl.length()) <= head.queue_size) { | |
f67539c2 | 199 | CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length()); |
9f95a23c TL |
200 | //write data size and data at tail offset |
201 | auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); | |
202 | if (ret < 0) { | |
203 | return ret; | |
204 | } | |
205 | head.tail.offset += bl.length(); | |
206 | } else { | |
207 | uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size); | |
208 | //Split data if there is free space available | |
209 | if (bl.length() <= free_space_available) { | |
210 | uint64_t size_before_wrap = head.queue_size - head.tail.offset; | |
211 | bufferlist bl_data_before_wrap; | |
212 | bl.splice(0, size_before_wrap, &bl_data_before_wrap); | |
213 | //write spliced (data size and data) at tail offset | |
f67539c2 | 214 | CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl_data_before_wrap.length()); |
9f95a23c TL |
215 | auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); |
216 | if (ret < 0) { | |
217 | return ret; | |
218 | } | |
219 | head.tail.offset = head.max_head_size; | |
220 | head.tail.gen += 1; | |
221 | //write remaining data at tail offset after wrapping around | |
f67539c2 | 222 | CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u", head.tail.to_str().c_str(), bl.length()); |
9f95a23c TL |
223 | ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); |
224 | if (ret < 0) { | |
225 | return ret; | |
226 | } | |
227 | head.tail.offset += bl.length(); | |
228 | } else { | |
229 | CLS_LOG(0, "ERROR: No space left in queue\n"); | |
230 | // return queue full error | |
231 | return -ENOSPC; | |
232 | } | |
233 | } | |
234 | } else if (head.front.offset > head.tail.offset) { | |
235 | if ((head.tail.offset + bl.length()) <= head.front.offset) { | |
f67539c2 | 236 | CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u", head.tail.to_str().c_str(), bl.length()); |
9f95a23c TL |
237 | //write data size and data at tail offset |
238 | auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL); | |
239 | if (ret < 0) { | |
240 | return ret; | |
241 | } | |
242 | head.tail.offset += bl.length(); | |
243 | } else { | |
f67539c2 | 244 | CLS_LOG(0, "ERROR: No space left in queue"); |
9f95a23c TL |
245 | // return queue full error |
246 | return -ENOSPC; | |
247 | } | |
248 | } | |
249 | ||
250 | if (head.tail.offset == head.queue_size) { | |
251 | head.tail.offset = head.max_head_size; | |
252 | head.tail.gen += 1; | |
253 | } | |
f67539c2 | 254 | CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s", head.tail.to_str().c_str()); |
9f95a23c TL |
255 | } //end - for |
256 | ||
257 | return 0; | |
258 | } | |
259 | ||
260 | int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head) | |
261 | { | |
262 | // If queue is empty, return from here | |
263 | if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) { | |
f67539c2 | 264 | CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s", head.front.to_str().c_str()); |
9f95a23c TL |
265 | op_ret.next_marker = head.front.to_str(); |
266 | op_ret.is_truncated = false; | |
267 | return 0; | |
268 | } | |
269 | ||
270 | cls_queue_marker start_marker; | |
271 | start_marker.from_str(op.start_marker.c_str()); | |
272 | cls_queue_marker next_marker = {0, 0}; | |
273 | ||
274 | uint64_t start_offset = 0, gen = 0; | |
275 | if (start_marker.offset == 0) { | |
276 | start_offset = head.front.offset; | |
277 | gen = head.front.gen; | |
278 | } else { | |
279 | start_offset = start_marker.offset; | |
280 | gen = start_marker.gen; | |
281 | } | |
282 | ||
283 | op_ret.is_truncated = true; | |
284 | uint64_t chunk_size = 1024; | |
285 | uint64_t contiguous_data_size = 0, size_to_read = 0; | |
286 | bool wrap_around = false; | |
287 | ||
288 | //Calculate length of contiguous data to be read depending on front, tail and start offset | |
289 | if (head.tail.offset > head.front.offset) { | |
290 | contiguous_data_size = head.tail.offset - start_offset; | |
291 | } else if (head.front.offset >= head.tail.offset) { | |
292 | if (start_offset >= head.front.offset) { | |
293 | contiguous_data_size = head.queue_size - start_offset; | |
294 | wrap_around = true; | |
295 | } else if (start_offset <= head.tail.offset) { | |
296 | contiguous_data_size = head.tail.offset - start_offset; | |
297 | } | |
298 | } | |
299 | ||
f67539c2 | 300 | CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s", head.front.to_str().c_str(), head.tail.to_str().c_str()); |
9f95a23c TL |
301 | |
302 | bool offset_populated = false, entry_start_processed = false; | |
303 | uint64_t data_size = 0, num_ops = 0; | |
304 | uint16_t entry_start = 0; | |
305 | bufferlist bl; | |
e306af50 | 306 | string last_marker; |
9f95a23c TL |
307 | do |
308 | { | |
f67539c2 | 309 | CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu", start_offset); |
9f95a23c TL |
310 | |
311 | bufferlist bl_chunk; | |
312 | //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size | |
313 | if (contiguous_data_size > chunk_size) { | |
314 | size_to_read = chunk_size; | |
315 | } else { | |
316 | size_to_read = contiguous_data_size; | |
317 | } | |
f67539c2 | 318 | CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu", size_to_read); |
9f95a23c TL |
319 | if (size_to_read == 0) { |
320 | next_marker = head.tail; | |
321 | op_ret.is_truncated = false; | |
322 | CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n"); | |
323 | break; | |
324 | } | |
325 | ||
326 | auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk); | |
327 | if (ret < 0) { | |
328 | return ret; | |
329 | } | |
330 | ||
331 | //If there is leftover data from previous iteration, append new data to leftover data | |
332 | uint64_t entry_start_offset = start_offset - bl.length(); | |
f67539c2 | 333 | CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu", entry_start_offset); |
9f95a23c TL |
334 | bl.claim_append(bl_chunk); |
335 | bl_chunk = std::move(bl); | |
336 | ||
f67539c2 | 337 | CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u", bl_chunk.length()); |
9f95a23c TL |
338 | |
339 | //Process the chunk of data read | |
340 | unsigned index = 0; | |
341 | auto it = bl_chunk.cbegin(); | |
342 | uint64_t size_to_process = bl_chunk.length(); | |
343 | do { | |
f67539c2 | 344 | CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu", index, size_to_process); |
9f95a23c TL |
345 | cls_queue_entry entry; |
346 | ceph_assert(it.get_off() == index); | |
e306af50 TL |
347 | //Use the last marker saved in previous iteration as the marker for this entry |
348 | if (offset_populated) { | |
349 | entry.marker = last_marker; | |
350 | } | |
9f95a23c TL |
351 | //Populate offset if not done in previous iteration |
352 | if (! offset_populated) { | |
353 | cls_queue_marker marker = {entry_start_offset + index, gen}; | |
354 | CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str()); | |
355 | entry.marker = marker.to_str(); | |
356 | } | |
357 | // Magic number + Data size - process if not done in previous iteration | |
358 | if (! entry_start_processed ) { | |
f67539c2 | 359 | if (size_to_process >= QUEUE_ENTRY_OVERHEAD) { |
9f95a23c TL |
360 | // Decode magic number at start |
361 | try { | |
362 | decode(entry_start, it); | |
f67539c2 TL |
363 | } catch (const ceph::buffer::error& err) { |
364 | CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s", err.what()); | |
9f95a23c TL |
365 | return -EINVAL; |
366 | } | |
367 | if (entry_start != QUEUE_ENTRY_START) { | |
f67539c2 | 368 | CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u", entry_start); |
9f95a23c TL |
369 | return -EINVAL; |
370 | } | |
371 | index += sizeof(uint16_t); | |
372 | size_to_process -= sizeof(uint16_t); | |
373 | // Decode data size | |
374 | try { | |
375 | decode(data_size, it); | |
f67539c2 TL |
376 | } catch (const ceph::buffer::error& err) { |
377 | CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s", err.what()); | |
9f95a23c TL |
378 | return -EINVAL; |
379 | } | |
380 | } else { | |
381 | // Copy unprocessed data to bl | |
382 | bl_chunk.splice(index, size_to_process, &bl); | |
383 | offset_populated = true; | |
e306af50 | 384 | last_marker = entry.marker; |
f67539c2 | 385 | CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!"); |
9f95a23c TL |
386 | break; |
387 | } | |
f67539c2 | 388 | CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu", data_size); |
9f95a23c TL |
389 | index += sizeof(uint64_t); |
390 | size_to_process -= sizeof(uint64_t); | |
391 | } | |
392 | // Data | |
393 | if (data_size <= size_to_process) { | |
394 | it.copy(data_size, entry.data); | |
395 | index += entry.data.length(); | |
396 | size_to_process -= entry.data.length(); | |
397 | } else { | |
398 | it.copy(size_to_process, bl); | |
399 | offset_populated = true; | |
400 | entry_start_processed = true; | |
e306af50 | 401 | last_marker = entry.marker; |
f67539c2 | 402 | CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!"); |
9f95a23c TL |
403 | break; |
404 | } | |
405 | op_ret.entries.emplace_back(entry); | |
406 | // Resetting some values | |
407 | offset_populated = false; | |
408 | entry_start_processed = false; | |
409 | data_size = 0; | |
410 | entry_start = 0; | |
411 | num_ops++; | |
e306af50 | 412 | last_marker.clear(); |
9f95a23c | 413 | if (num_ops == op.max) { |
f67539c2 | 414 | CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!"); |
9f95a23c TL |
415 | break; |
416 | } | |
417 | } while(index < bl_chunk.length()); | |
418 | ||
419 | CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max); | |
420 | ||
421 | if (num_ops == op.max) { | |
422 | next_marker = cls_queue_marker{(entry_start_offset + index), gen}; | |
f67539c2 | 423 | CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu", next_marker.offset); |
9f95a23c TL |
424 | break; |
425 | } | |
426 | ||
427 | //Calculate new start_offset and contiguous data size | |
428 | start_offset += size_to_read; | |
429 | contiguous_data_size -= size_to_read; | |
430 | if (contiguous_data_size == 0) { | |
431 | if (wrap_around) { | |
432 | start_offset = head.max_head_size; | |
433 | contiguous_data_size = head.tail.offset - head.max_head_size; | |
434 | gen += 1; | |
435 | wrap_around = false; | |
436 | } else { | |
f67539c2 | 437 | CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!"); |
9f95a23c TL |
438 | next_marker = head.tail; |
439 | op_ret.is_truncated = false; | |
440 | break; | |
441 | } | |
442 | } | |
443 | ||
444 | } while(num_ops < op.max); | |
445 | ||
446 | //Wrap around next offset if it has reached end of queue | |
447 | if (next_marker.offset == head.queue_size) { | |
448 | next_marker.offset = head.max_head_size; | |
449 | next_marker.gen += 1; | |
450 | } | |
451 | if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) { | |
452 | op_ret.is_truncated = false; | |
453 | } | |
454 | ||
f67539c2 | 455 | CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s", next_marker.to_str().c_str()); |
9f95a23c TL |
456 | op_ret.next_marker = next_marker.to_str(); |
457 | ||
458 | return 0; | |
459 | } | |
460 | ||
461 | int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head) | |
462 | { | |
463 | //Queue is empty | |
464 | if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) { | |
465 | return 0; | |
466 | } | |
467 | ||
468 | cls_queue_marker end_marker; | |
469 | end_marker.from_str(op.end_marker.c_str()); | |
470 | ||
f67539c2 | 471 | CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s", end_marker.to_str().c_str()); |
9f95a23c TL |
472 | |
473 | //Zero out the entries that have been removed, to reclaim storage space | |
474 | if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) { | |
475 | uint64_t len = end_marker.offset - head.front.offset; | |
476 | if (len > 0) { | |
477 | auto ret = cls_cxx_write_zero(hctx, head.front.offset, len); | |
478 | if (ret < 0) { | |
f67539c2 TL |
479 | CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries"); |
480 | CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str()); | |
9f95a23c TL |
481 | return ret; |
482 | } | |
483 | } | |
484 | } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset | |
485 | uint64_t len = head.queue_size - head.front.offset; | |
486 | if (len > 0) { | |
487 | auto ret = cls_cxx_write_zero(hctx, head.front.offset, len); | |
488 | if (ret < 0) { | |
f67539c2 TL |
489 | CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries"); |
490 | CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s", head.front.to_str().c_str()); | |
9f95a23c TL |
491 | return ret; |
492 | } | |
493 | } | |
494 | len = end_marker.offset - head.max_head_size; | |
495 | if (len > 0) { | |
496 | auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len); | |
497 | if (ret < 0) { | |
f67539c2 TL |
498 | CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries"); |
499 | CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu", head.max_head_size); | |
9f95a23c TL |
500 | return ret; |
501 | } | |
502 | } | |
503 | } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) { | |
504 | //no-op | |
505 | } else { | |
f67539c2 | 506 | CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu", end_marker.to_str().c_str(), end_marker.gen); |
9f95a23c TL |
507 | return -EINVAL; |
508 | } | |
509 | ||
510 | head.front = end_marker; | |
511 | ||
512 | // Check if it is the end, then wrap around | |
513 | if (head.front.offset == head.queue_size) { | |
514 | head.front.offset = head.max_head_size; | |
515 | head.front.gen += 1; | |
516 | } | |
517 | ||
f67539c2 | 518 | CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s", head.front.to_str().c_str(), head.tail.to_str().c_str()); |
9f95a23c TL |
519 | |
520 | return 0; | |
521 | } |