// ceph/src/cls/queue/cls_queue_src.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/types.h"
5
6 #include <errno.h>
7
8 #include "objclass/objclass.h"
9 #include "cls/queue/cls_queue_types.h"
10 #include "cls/queue/cls_queue_ops.h"
11 #include "cls/queue/cls_queue_const.h"
12 #include "cls/queue/cls_queue_src.h"
13
14 int queue_write_head(cls_method_context_t hctx, cls_queue_head& head)
15 {
16 bufferlist bl;
17 uint16_t entry_start = QUEUE_HEAD_START;
18 encode(entry_start, bl);
19
20 bufferlist bl_head;
21 encode(head, bl_head);
22
23 uint64_t encoded_len = bl_head.length();
24 encode(encoded_len, bl);
25
26 bl.claim_append(bl_head);
27
28 int ret = cls_cxx_write2(hctx, 0, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_WILLNEED);
29 if (ret < 0) {
30 CLS_LOG(5, "ERROR: queue_write_head: failed to write head\n");
31 return ret;
32 }
33 return 0;
34 }
35
int queue_read_head(cls_method_context_t hctx, cls_queue_head& head)
{
  // Read and decode the queue head stored at offset 0 of the object.
  // On-disk layout (written by queue_write_head): uint16_t magic
  // (QUEUE_HEAD_START), uint64_t length of the encoded head, then the
  // encoded cls_queue_head itself.
  // Returns 0 on success, -EINVAL on bad magic or decode failure, or the
  // negative error from the underlying read.
  uint64_t chunk_size = 1024, start_offset = 0;

  // First read: optimistically fetch 1K, which usually covers the whole head.
  bufferlist bl_head;
  const auto ret = cls_cxx_read(hctx, start_offset, chunk_size, &bl_head);
  if (ret < 0) {
    CLS_LOG(5, "ERROR: queue_read_head: failed to read head\n");
    return ret;
  }

  //Process the chunk of data read
  auto it = bl_head.cbegin();
  // Queue head start
  uint16_t queue_head_start;
  try {
    decode(queue_head_start, it);
  } catch (buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode queue start: %s \n", err.what());
    return -EINVAL;
  }
  if (queue_head_start != QUEUE_HEAD_START) {
    CLS_LOG(0, "ERROR: queue_read_head: invalid queue start\n");
    return -EINVAL;
  }

  // Length of the encoded head that follows on disk.
  uint64_t encoded_len;
  try {
    decode(encoded_len, it);
  } catch (buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode encoded head size: %s\n", err.what());
    return -EINVAL;
  }

  // Bytes of the first chunk consumed by the magic + length fields.
  constexpr auto decoded_head_size = sizeof(queue_head_start) + sizeof(encoded_len);
  if (encoded_len > (chunk_size - decoded_head_size)) {
    // Head is larger than the first chunk: read exactly the missing tail
    // starting where the first read ended.
    start_offset = chunk_size;
    chunk_size = (encoded_len - (chunk_size - decoded_head_size));
    bufferlist bl_remaining_head;
    const auto ret = cls_cxx_read2(hctx, start_offset, chunk_size, &bl_remaining_head, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
    if (ret < 0) {
      CLS_LOG(5, "ERROR: queue_read_head: failed to read remaining part of head\n");
      return ret;
    }
    // NOTE(review): `it` stays positioned on bl_head and continues into the
    // appended buffers below — relies on bufferlist iterators tolerating
    // claim_append on the underlying list; confirm against buffer.h.
    bl_head.claim_append(bl_remaining_head);
  }

  try {
    decode(head, it);
  } catch (buffer::error& err) {
    CLS_LOG(0, "ERROR: queue_read_head: failed to decode head: %s\n", err.what());
    return -EINVAL;
  }

  return 0;
}
92
93 int queue_init(cls_method_context_t hctx, const cls_queue_init_op& op)
94 {
95 //get head and its size
96 cls_queue_head head;
97 int ret = queue_read_head(hctx, head);
98
99 //head is already initialized
100 if (ret == 0) {
101 return -EEXIST;
102 }
103
104 if (ret < 0 && ret != -EINVAL) {
105 return ret;
106 }
107
108 if (op.bl_urgent_data.length() > 0) {
109 head.bl_urgent_data = op.bl_urgent_data;
110 }
111
112 head.max_head_size = QUEUE_HEAD_SIZE_1K + op.max_urgent_data_size;
113 head.queue_size = op.queue_size + head.max_head_size;
114 head.max_urgent_data_size = op.max_urgent_data_size;
115 head.tail.gen = head.front.gen = 0;
116 head.tail.offset = head.front.offset = head.max_head_size;
117
118 CLS_LOG(20, "INFO: init_queue_op queue actual size %lu", head.queue_size);
119 CLS_LOG(20, "INFO: init_queue_op head size %lu", head.max_head_size);
120 CLS_LOG(20, "INFO: init_queue_op queue front offset %s", head.front.to_str().c_str());
121 CLS_LOG(20, "INFO: init_queue_op queue max urgent data size %lu", head.max_urgent_data_size);
122
123 return queue_write_head(hctx, head);
124 }
125
126 int queue_get_capacity(cls_method_context_t hctx, cls_queue_get_capacity_ret& op_ret)
127 {
128 //get head
129 cls_queue_head head;
130 int ret = queue_read_head(hctx, head);
131 if (ret < 0) {
132 return ret;
133 }
134
135 op_ret.queue_capacity = head.queue_size - head.max_head_size;
136
137 CLS_LOG(20, "INFO: queue_get_capacity: size of queue is %lu\n", op_ret.queue_capacity);
138
139 return 0;
140 }
141
int queue_enqueue(cls_method_context_t hctx, cls_queue_enqueue_op& op, cls_queue_head& head)
{
  // Append each payload in op.bl_data_vec to the ring buffer at head.tail,
  // wrapping past queue_size back to max_head_size when needed. Each entry is
  // written as: uint16_t magic (QUEUE_ENTRY_START), uint64_t payload size,
  // payload bytes. Updates head.tail in memory only — the caller is
  // responsible for persisting the head afterwards.
  // Returns 0 on success, -ENOSPC when an entry cannot fit, or a negative
  // write error.

  // Queue-full invariant: tail has lapped around and caught up with front
  // (same offset, tail one generation ahead).
  if ((head.front.offset == head.tail.offset) && (head.tail.gen == head.front.gen + 1)) {
    CLS_LOG(0, "ERROR: No space left in queue\n");
    return -ENOSPC;
  }

  for (auto& bl_data : op.bl_data_vec) {
    // Build the framed entry: magic + size header followed by the payload.
    bufferlist bl;
    uint16_t entry_start = QUEUE_ENTRY_START;
    encode(entry_start, bl);
    uint64_t data_size = bl_data.length();
    encode(data_size, bl);
    bl.claim_append(bl_data);

    CLS_LOG(10, "INFO: queue_enqueue(): Total size to be written is %u and data size is %lu\n", bl.length(), data_size);

    if (head.tail.offset >= head.front.offset) {
      // Tail is at or past front (same generation): free space is the gap to
      // the end of the object, plus the region before front after a wrap.
      // check if data can fit in the remaining space in queue
      if ((head.tail.offset + bl.length()) <= head.queue_size) {
        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n", head.tail.to_str().c_str(), bl.length());
        //write data size and data at tail offset
        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
        if (ret < 0) {
          return ret;
        }
        head.tail.offset += bl.length();
      } else {
        uint64_t free_space_available = (head.queue_size - head.tail.offset) + (head.front.offset - head.max_head_size);
        //Split data if there is free space available
        if (bl.length() <= free_space_available) {
          // Entry straddles the wrap point: write the leading slice up to
          // queue_size, then the remainder at the start of the data region.
          uint64_t size_before_wrap = head.queue_size - head.tail.offset;
          bufferlist bl_data_before_wrap;
          bl.splice(0, size_before_wrap, &bl_data_before_wrap);
          //write spliced (data size and data) at tail offset
          CLS_LOG(5, "INFO: queue_enqueue: Writing spliced data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl_data_before_wrap.length());
          auto ret = cls_cxx_write2(hctx, head.tail.offset, bl_data_before_wrap.length(), &bl_data_before_wrap, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
          if (ret < 0) {
            return ret;
          }
          // Wrap: tail restarts just after the head region, one generation up.
          head.tail.offset = head.max_head_size;
          head.tail.gen += 1;
          //write remaining data at tail offset after wrapping around
          CLS_LOG(5, "INFO: queue_enqueue: Writing remaining data at offset: %s and data size: %u\n", head.tail.to_str().c_str(), bl.length());
          ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
          if (ret < 0) {
            return ret;
          }
          head.tail.offset += bl.length();
        } else {
          CLS_LOG(0, "ERROR: No space left in queue\n");
          // return queue full error
          return -ENOSPC;
        }
      }
    } else if (head.front.offset > head.tail.offset) {
      // Tail already wrapped (one generation ahead of front): the only free
      // space is the contiguous gap between tail and front.
      if ((head.tail.offset + bl.length()) <= head.front.offset) {
        CLS_LOG(5, "INFO: queue_enqueue: Writing data size and data: offset: %s, size: %u\n\n", head.tail.to_str().c_str(), bl.length());
        //write data size and data at tail offset
        auto ret = cls_cxx_write2(hctx, head.tail.offset, bl.length(), &bl, CEPH_OSD_OP_FLAG_FADVISE_SEQUENTIAL);
        if (ret < 0) {
          return ret;
        }
        head.tail.offset += bl.length();
      } else {
        CLS_LOG(0, "ERROR: No space left in queue\n");
        // return queue full error
        return -ENOSPC;
      }
    }

    // If a write landed exactly at the end of the object, normalize tail to
    // the wrapped position so the stored marker is canonical.
    if (head.tail.offset == head.queue_size) {
      head.tail.offset = head.max_head_size;
      head.tail.gen += 1;
    }
    CLS_LOG(20, "INFO: queue_enqueue: New tail offset: %s \n", head.tail.to_str().c_str());
  } //end - for

  return 0;
}
222
int queue_list_entries(cls_method_context_t hctx, const cls_queue_list_op& op, cls_queue_list_ret& op_ret, cls_queue_head& head)
{
  // List up to op.max entries starting at op.start_marker (or the queue
  // front when the marker offset is 0). Data is read in chunks and decoded
  // as framed entries (uint16_t magic + uint64_t size + payload); decoded
  // entries are appended to op_ret.entries. op_ret.next_marker is the
  // resume point for the next call and op_ret.is_truncated reports whether
  // more entries remain. Entries may straddle chunk boundaries, so partial
  // state (offset_populated / entry_start_processed / leftover bytes in bl)
  // is carried across outer-loop iterations.
  // If queue is empty, return from here
  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
    CLS_LOG(20, "INFO: queue_list_entries(): Next offset is %s\n", head.front.to_str().c_str());
    op_ret.next_marker = head.front.to_str();
    op_ret.is_truncated = false;
    return 0;
  }

  cls_queue_marker start_marker;
  start_marker.from_str(op.start_marker.c_str());
  cls_queue_marker next_marker = {0, 0};

  // Offset 0 is never a valid data offset (the head region starts there),
  // so it is used to mean "start from the front".
  uint64_t start_offset = 0, gen = 0;
  if (start_marker.offset == 0) {
    start_offset = head.front.offset;
    gen = head.front.gen;
  } else {
    start_offset = start_marker.offset;
    gen = start_marker.gen;
  }

  op_ret.is_truncated = true;
  uint64_t chunk_size = 1024;
  uint64_t contiguous_data_size = 0, size_to_read = 0;
  bool wrap_around = false;

  //Calculate length of contiguous data to be read depending on front, tail and start offset
  if (head.tail.offset > head.front.offset) {
    contiguous_data_size = head.tail.offset - start_offset;
  } else if (head.front.offset >= head.tail.offset) {
    if (start_offset >= head.front.offset) {
      // Data wraps: first consume up to the end of the object, then resume
      // at max_head_size (handled when contiguous_data_size hits 0 below).
      contiguous_data_size = head.queue_size - start_offset;
      wrap_around = true;
    } else if (start_offset <= head.tail.offset) {
      contiguous_data_size = head.tail.offset - start_offset;
    }
  }

  CLS_LOG(10, "INFO: queue_list_entries(): front is: %s, tail is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());

  bool offset_populated = false, entry_start_processed = false;
  uint64_t data_size = 0, num_ops = 0;
  uint16_t entry_start = 0;
  bufferlist bl;
  // Outer loop: read one chunk per iteration until op.max entries are
  // decoded or the queue data is exhausted.
  do
  {
    CLS_LOG(10, "INFO: queue_list_entries(): start_offset is %lu\n", start_offset);

    bufferlist bl_chunk;
    //Read chunk size at a time, if it is less than contiguous data size, else read contiguous data size
    if (contiguous_data_size > chunk_size) {
      size_to_read = chunk_size;
    } else {
      size_to_read = contiguous_data_size;
    }
    CLS_LOG(10, "INFO: queue_list_entries(): size_to_read is %lu\n", size_to_read);
    if (size_to_read == 0) {
      next_marker = head.tail;
      op_ret.is_truncated = false;
      CLS_LOG(20, "INFO: queue_list_entries(): size_to_read is 0, hence breaking out!\n");
      break;
    }

    auto ret = cls_cxx_read(hctx, start_offset, size_to_read, &bl_chunk);
    if (ret < 0) {
      return ret;
    }

    //If there is leftover data from previous iteration, append new data to leftover data
    // entry_start_offset is the object offset of bl_chunk's first byte once
    // the leftover is prepended, so marker arithmetic stays correct.
    uint64_t entry_start_offset = start_offset - bl.length();
    CLS_LOG(20, "INFO: queue_list_entries(): Entry start offset accounting for leftover data is %lu\n", entry_start_offset);
    bl.claim_append(bl_chunk);
    bl_chunk = std::move(bl);

    CLS_LOG(20, "INFO: queue_list_entries(): size of chunk %u\n", bl_chunk.length());

    //Process the chunk of data read
    // Inner loop: decode as many complete entries as the chunk contains.
    // index tracks bytes consumed and must stay in lockstep with `it`.
    unsigned index = 0;
    auto it = bl_chunk.cbegin();
    uint64_t size_to_process = bl_chunk.length();
    do {
      CLS_LOG(10, "INFO: queue_list_entries(): index: %u, size_to_process: %lu\n", index, size_to_process);
      cls_queue_entry entry;
      ceph_assert(it.get_off() == index);
      //Populate offset if not done in previous iteration
      if (! offset_populated) {
        cls_queue_marker marker = {entry_start_offset + index, gen};
        CLS_LOG(5, "INFO: queue_list_entries(): offset: %s\n", marker.to_str().c_str());
        entry.marker = marker.to_str();
      }
      // Magic number + Data size - process if not done in previous iteration
      if (! entry_start_processed ) {
        if (size_to_process >= (sizeof(uint16_t) + sizeof(uint64_t))) {
          // Decode magic number at start
          try {
            decode(entry_start, it);
          } catch (buffer::error& err) {
            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode entry start: %s\n", err.what());
            return -EINVAL;
          }
          if (entry_start != QUEUE_ENTRY_START) {
            CLS_LOG(5, "ERROR: queue_list_entries: invalid entry start %u\n", entry_start);
            return -EINVAL;
          }
          index += sizeof(uint16_t);
          size_to_process -= sizeof(uint16_t);
          // Decode data size
          try {
            decode(data_size, it);
          } catch (buffer::error& err) {
            CLS_LOG(10, "ERROR: queue_list_entries: failed to decode data size: %s\n", err.what());
            return -EINVAL;
          }
        } else {
          // Copy unprocessed data to bl
          // Header straddles the chunk boundary: stash the tail bytes in bl
          // so the next outer iteration prepends them to the next read.
          bl_chunk.splice(index, size_to_process, &bl);
          offset_populated = true;
          CLS_LOG(10, "INFO: queue_list_entries: not enough data to read entry start and data size, breaking out!\n");
          break;
        }
        CLS_LOG(20, "INFO: queue_list_entries(): data size: %lu\n", data_size);
        index += sizeof(uint64_t);
        size_to_process -= sizeof(uint64_t);
      }
      // Data
      if (data_size <= size_to_process) {
        it.copy(data_size, entry.data);
        index += entry.data.length();
        size_to_process -= entry.data.length();
      } else {
        // Payload straddles the chunk boundary: keep the partial bytes in bl
        // and remember that the header/marker were already consumed.
        it.copy(size_to_process, bl);
        offset_populated = true;
        entry_start_processed = true;
        CLS_LOG(10, "INFO: queue_list_entries(): not enough data to read data, breaking out!\n");
        break;
      }
      op_ret.entries.emplace_back(entry);
      // Resetting some values
      offset_populated = false;
      entry_start_processed = false;
      data_size = 0;
      entry_start = 0;
      num_ops++;
      if (num_ops == op.max) {
        CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from inner loop!\n");
        break;
      }
    } while(index < bl_chunk.length());

    CLS_LOG(10, "INFO: num_ops: %lu and op.max is %lu\n", num_ops, op.max);

    if (num_ops == op.max) {
      // Caller's page is full: resume marker is the first unconsumed byte.
      next_marker = cls_queue_marker{(entry_start_offset + index), gen};
      CLS_LOG(10, "INFO: queue_list_entries(): num_ops is same as op.max, hence breaking out from outer loop with next offset: %lu\n", next_marker.offset);
      break;
    }

    //Calculate new start_offset and contiguous data size
    start_offset += size_to_read;
    contiguous_data_size -= size_to_read;
    if (contiguous_data_size == 0) {
      if (wrap_around) {
        // Reached the end of the object: continue from the start of the data
        // region in the next generation.
        start_offset = head.max_head_size;
        contiguous_data_size = head.tail.offset - head.max_head_size;
        gen += 1;
        wrap_around = false;
      } else {
        CLS_LOG(10, "INFO: queue_list_entries(): end of queue data is reached, hence breaking out from outer loop!\n");
        next_marker = head.tail;
        op_ret.is_truncated = false;
        break;
      }
    }

  } while(num_ops < op.max);

  //Wrap around next offset if it has reached end of queue
  if (next_marker.offset == head.queue_size) {
    next_marker.offset = head.max_head_size;
    next_marker.gen += 1;
  }
  if ((next_marker.offset == head.tail.offset) && (next_marker.gen == head.tail.gen)) {
    op_ret.is_truncated = false;
  }

  CLS_LOG(5, "INFO: queue_list_entries(): next offset: %s\n", next_marker.to_str().c_str());
  op_ret.next_marker = next_marker.to_str();

  return 0;
}
415
int queue_remove_entries(cls_method_context_t hctx, const cls_queue_remove_op& op, cls_queue_head& head)
{
  // Remove (dequeue) all entries from the current front up to op.end_marker
  // by zeroing their storage, then advance head.front to the end marker.
  // head is updated in memory only — the caller persists it afterwards.
  // Returns 0 on success, -EINVAL for a marker that is inconsistent with the
  // current front/gen, or a negative error from the zeroing op.
  //Queue is empty
  if ((head.front.offset == head.tail.offset) && (head.front.gen == head.tail.gen)) {
    return 0;
  }

  cls_queue_marker end_marker;
  end_marker.from_str(op.end_marker.c_str());

  CLS_LOG(5, "INFO: queue_remove_entries: op.end_marker = %s\n", end_marker.to_str().c_str());

  //Zero out the entries that have been removed, to reclaim storage space
  if (end_marker.offset > head.front.offset && end_marker.gen == head.front.gen) {
    // Same generation, marker ahead of front: one contiguous range to zero.
    uint64_t len = end_marker.offset - head.front.offset;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
        return ret;
      }
    }
  } else if ((head.front.offset >= end_marker.offset) && (end_marker.gen == head.front.gen + 1)) { //start offset > end offset
    // Range wraps past the end of the object (marker is one generation
    // ahead): zero front..queue_size, then max_head_size..end_marker.
    uint64_t len = head.queue_size - head.front.offset;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.front.offset, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %s\n", head.front.to_str().c_str());
        return ret;
      }
    }
    len = end_marker.offset - head.max_head_size;
    if (len > 0) {
      auto ret = cls_cxx_write_zero(hctx, head.max_head_size, len);
      if (ret < 0) {
        CLS_LOG(5, "INFO: queue_remove_entries: Failed to zero out entries\n");
        CLS_LOG(10, "INFO: queue_remove_entries: Start offset = %lu\n", head.max_head_size);
        return ret;
      }
    }
  } else if ((head.front.offset == end_marker.offset) && (head.front.gen == end_marker.gen)) {
    //no-op
    // Marker equals current front: nothing to remove, but front is still
    // (re)assigned below, which is a no-op.
  } else {
    CLS_LOG(0, "INFO: queue_remove_entries: Invalid end marker: offset = %s, gen = %lu\n", end_marker.to_str().c_str(), end_marker.gen);
    return -EINVAL;
  }

  head.front = end_marker;

  // Check if it is the end, then wrap around
  if (head.front.offset == head.queue_size) {
    head.front.offset = head.max_head_size;
    head.front.gen += 1;
  }

  CLS_LOG(20, "INFO: queue_remove_entries: front offset is: %s and tail offset is %s\n", head.front.to_str().c_str(), head.tail.to_str().c_str());

  return 0;
}