// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

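// Shared implementation of the generic queue object class: a fixed-capacity
// circular log stored in a single RADOS object. A head structure at offset 0
// tracks front and tail markers (offset plus wrap-around generation); entries
// are appended at the tail, listed and removed from the front, and wrap past
// the end of the object back to the first byte after the head.
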
#include "include/types.h"

#include <errno.h>

#include "objclass/objclass.h"
#include "cls/queue/cls_queue_types.h"
#include "cls/queue/cls_queue_ops.h"
#include "cls/queue/cls_queue_const.h"
#include "cls/queue/cls_queue_src.h"

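// Serialize and persist the queue head at offset 0 of the object.
// On-disk layout: QUEUE_HEAD_START magic (u16) | encoded head length (u64) |
// encoded cls_queue_head. Fails with -EINVAL if the encoded head (including
// any urgent data) would exceed the reserved head area.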
int queue_write_head(cls_method_context_t hctx, cls_queue_head& head)
{
  bufferlist bl;
  uint16_t entry_start = QUEUE_HEAD_START;
  encode(entry_start, bl);

  bufferlist bl_head;
  encode(head, bl_head);

  uint64_t encoded_len = bl_head.length();
  encode(encoded_len, bl);

  bl.claim_append(bl_head);

  if (bl.length() > head.max_head_size) {
    CLS_LOG(0, "ERROR: queue_write_head: invalid head size = %u and urgent data size = %u\n", bl.length(), head.bl_urgent_data.length());
    return -EINVAL;
  }

  int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
  if (ret < 0) {
    CLS_LOG(5, "ERROR: queue_write_head: failed to write head\n");
    return ret;
  }
  return 0;
}

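// Read and decode the queue head. A first fixed-size chunk (1 KiB) is read
// optimistically; if the encoded head is larger than that, the remainder is
// fetched with a second read before decoding completes.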
int queue_read_head(cls_method_context_t hctx, cls_queue_head& head)
{
  uint64_t chunk_size = 1024, start_offset = 0;

  bufferlist bl_head;
  const auto ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
  if (ret < 0) {
    CLS_LOG(5, "ERROR: queue_read_head: failed to read head\n");
    return ret;
  }

  //Process the chunk of data read
  auto it = bl_head.cbegin();
  // Queue head start
  uint16_t queue_head_start;
  try {
    decode(queue_head_start, it);
  } catch (buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s\n", err.what());
    return -EINVAL;
  }
  if (queue_head_start != QUEUE_HEAD_START) {
    CLS_LOG(0, "ERROR: queue_read_head: invalid queue start\n");
    return -EINVAL;
  }

  uint64_t encoded_len;
  try {
    decode(encoded_len, it);
  } catch (buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s\n", err.what());
    return -EINVAL;
  }

  constexpr auto decoded_head_size = sizeof(queue_head_start) + sizeof(encoded_len);
  if (encoded_len > (chunk_size - decoded_head_size)) {
    start_offset = chunk_size;
    chunk_size = (encoded_len - (chunk_size - decoded_head_size));
    bufferlist bl_remaining_head;
    const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
    if (ret < 0) {
      CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head\n");
      return ret;
    }
    bl_head.claim_append(bl_remaining_head);
  }

  try {
    decode(head, it);
  } catch (buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s\n", err.what());
    return -EINVAL;
  }

  return 0;
}

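// One-time initialization of the queue object. Fails with -EEXIST if a valid
// head is already present. The reserved head area is sized as
// QUEUE_HEAD_SIZE_1K plus the caller's maximum urgent data size, and both
// front and tail markers start immediately after it.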
int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op)
{
  //get head and its size
  cls_queue_head head;
  int ret = queue_read_head(hctx, head);

  //head is already initialized
  if (ret == 0) {
    return -EEXIST;
  }

  if (ret < 0 && ret != -EINVAL) {
    return ret;
  }

  if (op.bl_urgent_data.length() > 0) {
    head.bl_urgent_data = op.bl_urgent_data;
  }

  head.max_head_size = QUEUE_HEAD_SIZE_1K + op.max_urgent_data_size;
  head.queue_size = op.queue_size + head.max_head_size;
  head.max_urgent_data_size = op.max_urgent_data_size;
  head.tail.gen = head.front.gen = 0;
  head.tail.offset = head.front.offset = head.max_head_size;

  CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size);
  CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size);
  CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
  CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);

  return queue_write_head(hctx, head);
}

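// Report the usable capacity of the queue: the total queue size minus the
// space reserved for the head.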
int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret)
{
  //get head
  cls_queue_head head;
  int ret = queue_read_head(hctx, head);
  if (ret < 0) {
    return ret;
  }

  op_ret.queue_capacity = head.queue_size - head.max_head_size;

  CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu\n", op_ret.queue_capacity);

  return 0;
}

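// Append each buffer in op.bl_data_vec as one entry at the tail.
// Per-entry on-disk layout: QUEUE_ENTRY_START magic (u16) | data size (u64) |
// payload. An entry that does not fit before the end of the object is split
// across the wrap point when enough total free space remains; otherwise the
// call fails with -ENOSPC. The updated head is only modified in memory here
// and is left for the caller to persist.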
int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
{
  if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
    CLS_LOG(0, "ERROR: No space left in queue\n");
    return -ENOSPC;
  }

  for (auto& bl_data : op.bl_data_vec) {
    bufferlist bl;
    uint16_t entry_start = QUEUE_ENTRY_START;
    encode(entry_start, bl);
    uint64_t data_size = bl_data.length();
    encode(data_size, bl);
    bl.claim_append(bl_data);

    CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu\n", bl.length(), data_size);

    if (head.tail.offset >= head.front.offset) {
      // check if data can fit in the remaining space in queue
      if ((head.tail.offset + bl.length()) <= head.queue_size) {
        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length());
        //write data size and data at tail offset
        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
        if (ret < 0) {
          return ret;
        }
        head.tail.offset += bl.length();
      } else {
        uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size);
        //Split the entry across the wrap point if there is enough total free space
        if (bl.length() <= free_space_available) {
          uint64_t size_before_wrap = head.queue_size - head.tail.offset;
          bufferlist bl_data_before_wrap;
          bl.splice(0, size_before_wrap, &bl_data_before_wrap);
          //write spliced (data size and data) at tail offset
          CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl_data_before_wrap.length());
          auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
          if (ret < 0) {
            return ret;
          }
          head.tail.offset = head.max_head_size;
          head.tail.gen += 1;
          //write remaining data at tail offset after wrapping around
          CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl.length());
          ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
          if (ret < 0) {
            return ret;
          }
          head.tail.offset += bl.length();
        } else {
          CLS_LOG(0, "ERROR: No space left in queue\n");
          // return queue full error
          return -ENOSPC;
        }
      }
    } else if (head.front.offset > head.tail.offset) {
      if ((head.tail.offset + bl.length()) <= head.front.offset) {
        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length());
        //write data size and data at tail offset
        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
        if (ret < 0) {
          return ret;
        }
        head.tail.offset += bl.length();
      } else {
        CLS_LOG(0, "ERROR: No space left in queue\n");
        // return queue full error
        return -ENOSPC;
      }
    }

    if (head.tail.offset == head.queue_size) {
      head.tail.offset = head.max_head_size;
      head.tail.gen += 1;
    }
    CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s\n", head.tail.to_str().c_str());
  } //end - for

  return 0;
}

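// List up to op.max entries starting from op.start_marker (or from the front
// if no marker is given). Data is read in fixed-size chunks; an entry that
// straddles a chunk boundary is carried over as leftover bytes and completed
// in the next iteration. Sets op_ret.next_marker to the resume point and
// is_truncated to false once the tail has been reached.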
int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head)
{
  // If queue is empty, return from here
  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
    CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s\n", head.front.to_str().c_str());
    op_ret.next_marker = head.front.to_str();
    op_ret.is_truncated = false;
    return 0;
  }

  cls_queue_marker start_marker;
  start_marker.from_str(op.start_marker.c_str());
  cls_queue_marker next_marker = {0, 0};

  uint64_t start_offset = 0, gen = 0;
  if (start_marker.offset == 0) {
    start_offset = head.front.offset;
    gen = head.front.gen;
  } else {
    start_offset = start_marker.offset;
    gen = start_marker.gen;
  }

  op_ret.is_truncated = true;
  uint64_t chunk_size = 1024;
  uint64_t contiguous_data_size = 0, size_to_read = 0;
  bool wrap_around = false;

  //Calculate length of contiguous data to be read depending on front, tail and start offset
  if (head.tail.offset > head.front.offset) {
    contiguous_data_size = head.tail.offset - start_offset;
  } else if (head.front.offset >= head.tail.offset) {
    if (start_offset >= head.front.offset) {
      contiguous_data_size = head.queue_size - start_offset;
      wrap_around = true;
    } else if (start_offset <= head.tail.offset) {
      contiguous_data_size = head.tail.offset - start_offset;
    }
  }

  CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());

  bool offset_populated = false, entry_start_processed = false;
  uint64_t data_size = 0, num_ops = 0;
  uint16_t entry_start = 0;
  bufferlist bl;
  string last_marker;
  do {
    CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu\n", start_offset);

    bufferlist bl_chunk;
    //Read one chunk at a time; if less than a full chunk of contiguous data remains, read just the remainder
    if (contiguous_data_size > chunk_size) {
      size_to_read = chunk_size;
    } else {
      size_to_read = contiguous_data_size;
    }
    CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu\n", size_to_read);
    if (size_to_read == 0) {
      next_marker = head.tail;
      op_ret.is_truncated = false;
      CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
      break;
    }

    auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
    if (ret < 0) {
      return ret;
    }

    //If there is leftover data from previous iteration, append new data to leftover data
    uint64_t entry_start_offset = start_offset - bl.length();
    CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu\n", entry_start_offset);
    bl.claim_append(bl_chunk);
    bl_chunk = std::move(bl);

    CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u\n", bl_chunk.length());

    //Process the chunk of data read
    unsigned index = 0;
    auto it = bl_chunk.cbegin();
    uint64_t size_to_process = bl_chunk.length();
    do {
      CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process);
      cls_queue_entry entry;
      ceph_assert(it.get_off() == index);
      //Use the last marker saved in previous iteration as the marker for this entry
      if (offset_populated) {
        entry.marker = last_marker;
      }
      //Populate offset if not done in previous iteration
      if (! offset_populated) {
        cls_queue_marker marker = {entry_start_offset + index, gen};
        CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
        entry.marker = marker.to_str();
      }
      // Magic number + Data size - process if not done in previous iteration
      if (! entry_start_processed) {
        if (size_to_process >= (sizeof(uint16_t) + sizeof(uint64_t))) {
          // Decode magic number at start
          try {
            decode(entry_start, it);
          } catch (buffer::error& err) {
            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s\n", err.what());
            return -EINVAL;
          }
          if (entry_start != QUEUE_ENTRY_START) {
            CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u\n", entry_start);
            return -EINVAL;
          }
          index += sizeof(uint16_t);
          size_to_process -= sizeof(uint16_t);
          // Decode data size
          try {
            decode(data_size, it);
          } catch (buffer::error& err) {
            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s\n", err.what());
            return -EINVAL;
          }
        } else {
          // Copy unprocessed data to bl
          bl_chunk.splice(index, size_to_process, &bl);
          offset_populated = true;
          last_marker = entry.marker;
          CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!\n");
          break;
        }
        CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu\n", data_size);
        index += sizeof(uint64_t);
        size_to_process -= sizeof(uint64_t);
      }
      // Data
      if (data_size <= size_to_process) {
        it.copy(data_size, entry.data);
        index += entry.data.length();
        size_to_process -= entry.data.length();
      } else {
        it.copy(size_to_process, bl);
        offset_populated = true;
        entry_start_processed = true;
        last_marker = entry.marker;
        CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n");
        break;
      }
      op_ret.entries.emplace_back(entry);
      // Resetting some values
      offset_populated = false;
      entry_start_processed = false;
      data_size = 0;
      entry_start = 0;
      num_ops++;
      last_marker.clear();
      if (num_ops == op.max) {
        CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
        break;
      }
    } while(index < bl_chunk.length());

    CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);

    if (num_ops == op.max) {
      next_marker = cls_queue_marker{(entry_start_offset + index), gen};
      CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", next_marker.offset);
      break;
    }

    //Calculate new start_offset and contiguous data size
    start_offset += size_to_read;
    contiguous_data_size -= size_to_read;
    if (contiguous_data_size == 0) {
      if (wrap_around) {
        start_offset = head.max_head_size;
        contiguous_data_size = head.tail.offset - head.max_head_size;
        gen += 1;
        wrap_around = false;
      } else {
        CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
        next_marker = head.tail;
        op_ret.is_truncated = false;
        break;
      }
    }

  } while(num_ops < op.max);

  //Wrap around next offset if it has reached end of queue
  if (next_marker.offset == head.queue_size) {
    next_marker.offset = head.max_head_size;
    next_marker.gen += 1;
  }
  if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) {
    op_ret.is_truncated = false;
  }

  CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s\n", next_marker.to_str().c_str());
  op_ret.next_marker = next_marker.to_str();

  return 0;
}

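// Advance the front marker to op.end_marker, zeroing the freed byte range so
// the space can be reclaimed. Handles both the simple case (end marker ahead
// of the front in the same generation) and the wrapped case (end marker in
// the next generation). The updated head is only modified in memory here and
// is left for the caller to persist.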
int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head)
{
  //Queue is empty
  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
    return 0;
  }

  cls_queue_marker end_marker;
  end_marker.from_str(op.end_marker.c_str());

  CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s\n", end_marker.to_str().c_str());

  //Zero out the entries that have been removed, to reclaim storage space
  if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
    uint64_t len = end_marker.offset - head.front.offset;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
      if (ret < 0) {
        CLS_LOG(5, "ERROR: queue_remove_entries: Failed to zero out entries\n");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
        return ret;
      }
    }
  } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //end marker is in the next generation, removal range wraps around
    uint64_t len = head.queue_size - head.front.offset;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
      if (ret < 0) {
        CLS_LOG(5, "ERROR: queue_remove_entries: Failed to zero out entries\n");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
        return ret;
      }
    }
    len = end_marker.offset - head.max_head_size;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
      if (ret < 0) {
        CLS_LOG(5, "ERROR: queue_remove_entries: Failed to zero out entries\n");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu\n", head.max_head_size);
        return ret;
      }
    }
  } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
    //no-op
  } else {
    CLS_LOG(0, "ERROR: queue_remove_entries: Invalid end marker: marker = %s, gen = %lu\n", end_marker.to_str().c_str(), end_marker.gen);
    return -EINVAL;
  }

  head.front = end_marker;

  // Check if it is the end, then wrap around
  if (head.front.offset == head.queue_size) {
    head.front.offset = head.max_head_size;
    head.front.gen += 1;
  }

  CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());

  return 0;
}