]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/journal/cls_journal.cc
update sources to 12.2.2
[ceph.git] / ceph / src / cls / journal / cls_journal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/int_types.h"
5 #include "include/buffer.h"
6 #include "include/encoding.h"
7 #include "common/errno.h"
8 #include "objclass/objclass.h"
9 #include "cls/journal/cls_journal_types.h"
10 #include <errno.h>
11 #include <map>
12 #include <string>
13 #include <sstream>
14
15 CLS_VER(1, 0)
16 CLS_NAME(journal)
17
18 namespace {
19
// maximum number of omap entries requested per listing call
const uint64_t MAX_KEYS_READ(64);

// omap keys (and key prefixes) used by the journal header object
const std::string HEADER_KEY_ORDER("order");
const std::string HEADER_KEY_SPLAY_WIDTH("splay_width");
const std::string HEADER_KEY_POOL_ID("pool_id");
const std::string HEADER_KEY_MINIMUM_SET("minimum_set");
const std::string HEADER_KEY_ACTIVE_SET("active_set");
const std::string HEADER_KEY_NEXT_TAG_TID("next_tag_tid");
const std::string HEADER_KEY_NEXT_TAG_CLASS("next_tag_class");
const std::string HEADER_KEY_CLIENT_PREFIX("client_");
const std::string HEADER_KEY_TAG_PREFIX("tag_");
31
// Render value as hexadecimal, left-padded with '0' to a width of 16.
std::string to_hex(uint64_t value) {
  std::ostringstream stream;
  stream.fill('0');
  stream.setf(std::ios_base::hex, std::ios_base::basefield);
  stream.width(16);
  stream << value;
  return stream.str();
}
37
38 std::string key_from_client_id(const std::string &client_id) {
39 return HEADER_KEY_CLIENT_PREFIX + client_id;
40 }
41
42 std::string key_from_tag_tid(uint64_t tag_tid) {
43 return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
44 }
45
46 uint64_t tag_tid_from_key(const std::string &key) {
47 std::istringstream iss(key);
48 uint64_t id;
49 iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
50 return id;
51 }
52
53 template <typename T>
54 int read_key(cls_method_context_t hctx, const string &key, T *t,
55 bool ignore_enoent = false) {
56 bufferlist bl;
57 int r = cls_cxx_map_get_val(hctx, key, &bl);
58 if (r == -ENOENT && ignore_enoent) {
59 return 0;
60 } else if (r < 0) {
61 CLS_ERR("failed to get omap key: %s", key.c_str());
62 return r;
63 }
64
65 try {
66 bufferlist::iterator iter = bl.begin();
67 ::decode(*t, iter);
68 } catch (const buffer::error &err) {
69 CLS_ERR("failed to decode input parameters: %s", err.what());
70 return -EINVAL;
71 }
72 return 0;
73 }
74
75 template <typename T>
76 int write_key(cls_method_context_t hctx, const string &key, const T &t) {
77 bufferlist bl;
78 ::encode(t, bl);
79
80 int r = cls_cxx_map_set_val(hctx, key, &bl);
81 if (r < 0) {
82 CLS_ERR("failed to set omap key: %s", key.c_str());
83 return r;
84 }
85 return 0;
86 }
87
88 int remove_key(cls_method_context_t hctx, const string &key) {
89 int r = cls_cxx_map_remove_key(hctx, key);
90 if (r < 0 && r != -ENOENT) {
91 CLS_ERR("failed to remove key: %s", key.c_str());
92 return r;
93 }
94 return 0;
95 }
96
97 int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
98
99 std::string skip_client_key;
100 if (skip_client_id != nullptr) {
101 skip_client_key = key_from_client_id(*skip_client_id);
102 }
103
104 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
105 std::string last_read = HEADER_KEY_CLIENT_PREFIX;
106 bool more;
107 do {
108 std::map<std::string, bufferlist> vals;
109 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
110 MAX_KEYS_READ, &vals, &more);
111 if (r < 0 && r != -ENOENT) {
112 CLS_ERR("failed to retrieve registered clients: %s",
113 cpp_strerror(r).c_str());
114 return r;
115 }
116
117 for (auto &val : vals) {
118 // if we are removing a client, skip its commit positions
119 if (val.first == skip_client_key) {
120 continue;
121 }
122
123 cls::journal::Client client;
124 bufferlist::iterator iter = val.second.begin();
125 try {
126 ::decode(client, iter);
127 } catch (const buffer::error &err) {
128 CLS_ERR("error decoding registered client: %s",
129 val.first.c_str());
130 return -EIO;
131 }
132
133 for (auto object_position : client.commit_position.object_positions) {
134 minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
135 }
136 }
137 if (!vals.empty()) {
138 last_read = vals.rbegin()->first;
139 }
140 } while (more);
141
142 // cannot expire tags if a client hasn't committed yet
143 if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
144 return 0;
145 }
146
147 // compute the minimum in-use tag for each class
148 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
149 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
150 TAG_PASS_SCRUB,
151 TAG_PASS_DONE } TagPass;
152 int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
153 last_read = HEADER_KEY_TAG_PREFIX;
154 do {
155 std::map<std::string, bufferlist> vals;
156 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
157 MAX_KEYS_READ, &vals, &more);
158 if (r < 0 && r != -ENOENT) {
159 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
160 return r;
161 }
162
163 for (auto &val : vals) {
164 cls::journal::Tag tag;
165 bufferlist::iterator iter = val.second.begin();
166 try {
167 ::decode(tag, iter);
168 } catch (const buffer::error &err) {
169 CLS_ERR("error decoding tag: %s", val.first.c_str());
170 return -EIO;
171 }
172
173 if (tag.tid != tag_tid_from_key(val.first)) {
174 CLS_ERR("tag tid mismatched: %s", val.first.c_str());
175 return -EINVAL;
176 }
177
178 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
179 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
180 } else if (tag_pass == TAG_PASS_SCRUB &&
181 tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
182 r = remove_key(hctx, val.first);
183 if (r < 0) {
184 return r;
185 }
186 }
187
188 if (tag.tid >= minimum_tag_tid) {
189 // no need to check for tag classes beyond this point
190 vals.clear();
191 more = false;
192 break;
193 }
194 }
195
196 if (tag_pass != TAG_PASS_DONE && !more) {
197 last_read = HEADER_KEY_TAG_PREFIX;
198 ++tag_pass;
199 } else if (!vals.empty()) {
200 last_read = vals.rbegin()->first;
201 }
202 } while (tag_pass != TAG_PASS_DONE);
203 return 0;
204 }
205
206 int get_client_list_range(cls_method_context_t hctx,
207 std::set<cls::journal::Client> *clients,
208 std::string start_after, uint64_t max_return) {
209 std::string last_read;
210 if (!start_after.empty()) {
211 last_read = key_from_client_id(start_after);
212 }
213
214 std::map<std::string, bufferlist> vals;
215 bool more;
216 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
217 max_return, &vals, &more);
218 if (r < 0) {
219 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
220 return r;
221 }
222
223 for (std::map<std::string, bufferlist>::iterator it = vals.begin();
224 it != vals.end(); ++it) {
225 try {
226 bufferlist::iterator iter = it->second.begin();
227
228 cls::journal::Client client;
229 ::decode(client, iter);
230 clients->insert(client);
231 } catch (const buffer::error &err) {
232 CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
233 err.what());
234 return -EIO;
235 }
236 }
237
238 return 0;
239 }
240
241 int find_min_commit_position(cls_method_context_t hctx,
242 cls::journal::ObjectSetPosition *minset) {
243 int r;
244 bool valid = false;
245 std::string start_after = "";
246 uint64_t tag_tid = 0, entry_tid = 0;
247
248 while (true) {
249 std::set<cls::journal::Client> batch;
250
251 r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
252 if ((r < 0) || batch.empty()) {
253 break;
254 }
255
256 start_after = batch.rbegin()->id;
257
258 // update the (minimum) commit position from this batch of clients
259 for(std::set<cls::journal::Client>::iterator it = batch.begin();
260 it != batch.end(); ++it) {
261 cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
262 if (object_set_position.object_positions.empty()) {
263 *minset = cls::journal::ObjectSetPosition();
264 break;
265 }
266 cls::journal::ObjectPosition first = object_set_position.object_positions.front();
267
268 // least tag_tid (or least entry_tid for matching tag_tid)
269 if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
270 tag_tid = first.tag_tid;
271 entry_tid = first.entry_tid;
272 *minset = cls::journal::ObjectSetPosition(object_set_position);
273 valid = true;
274 }
275 }
276
277 // got the last batch, we're done
278 if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
279 break;
280 }
281 }
282
283 return r;
284 }
285
286 } // anonymous namespace
287
288 /**
289 * Input:
290 * @param order (uint8_t) - bits to shift to compute the object max size
291 * @param splay width (uint8_t) - number of active journal objects
292 *
293 * Output:
294 * @returns 0 on success, negative error code on failure
295 */
296 int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
297 uint8_t order;
298 uint8_t splay_width;
299 int64_t pool_id;
300 try {
301 bufferlist::iterator iter = in->begin();
302 ::decode(order, iter);
303 ::decode(splay_width, iter);
304 ::decode(pool_id, iter);
305 } catch (const buffer::error &err) {
306 CLS_ERR("failed to decode input parameters: %s", err.what());
307 return -EINVAL;
308 }
309
310 bufferlist stored_orderbl;
311 int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
312 if (r >= 0) {
313 CLS_ERR("journal already exists");
314 return -EEXIST;
315 } else if (r != -ENOENT) {
316 return r;
317 }
318
319 r = write_key(hctx, HEADER_KEY_ORDER, order);
320 if (r < 0) {
321 return r;
322 }
323
324 r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
325 if (r < 0) {
326 return r;
327 }
328
329 r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
330 if (r < 0) {
331 return r;
332 }
333
334 uint64_t object_set = 0;
335 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
336 if (r < 0) {
337 return r;
338 }
339
340 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
341 if (r < 0) {
342 return r;
343 }
344
345 uint64_t tag_id = 0;
346 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
347 if (r < 0) {
348 return r;
349 }
350
351 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
352 if (r < 0) {
353 return r;
354 }
355 return 0;
356 }
357
358 /**
359 * Input:
360 * none
361 *
362 * Output:
363 * order (uint8_t)
364 * @returns 0 on success, negative error code on failure
365 */
366 int journal_get_order(cls_method_context_t hctx, bufferlist *in,
367 bufferlist *out) {
368 uint8_t order;
369 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
370 if (r < 0) {
371 return r;
372 }
373
374 ::encode(order, *out);
375 return 0;
376 }
377
378 /**
379 * Input:
380 * none
381 *
382 * Output:
383 * order (uint8_t)
384 * @returns 0 on success, negative error code on failure
385 */
386 int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
387 bufferlist *out) {
388 uint8_t splay_width;
389 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
390 if (r < 0) {
391 return r;
392 }
393
394 ::encode(splay_width, *out);
395 return 0;
396 }
397
398 /**
399 * Input:
400 * none
401 *
402 * Output:
403 * pool_id (int64_t)
404 * @returns 0 on success, negative error code on failure
405 */
406 int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
407 bufferlist *out) {
408 int64_t pool_id;
409 int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
410 if (r < 0) {
411 return r;
412 }
413
414 ::encode(pool_id, *out);
415 return 0;
416 }
417
418 /**
419 * Input:
420 * none
421 *
422 * Output:
423 * object set (uint64_t)
424 * @returns 0 on success, negative error code on failure
425 */
426 int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
427 bufferlist *out) {
428 uint64_t minimum_set;
429 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
430 if (r < 0) {
431 return r;
432 }
433
434 ::encode(minimum_set, *out);
435 return 0;
436 }
437
438 /**
439 * Input:
440 * @param object set (uint64_t)
441 *
442 * Output:
443 * @returns 0 on success, negative error code on failure
444 */
445 int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
446 bufferlist *out) {
447 uint64_t object_set;
448 try {
449 bufferlist::iterator iter = in->begin();
450 ::decode(object_set, iter);
451 } catch (const buffer::error &err) {
452 CLS_ERR("failed to decode input parameters: %s", err.what());
453 return -EINVAL;
454 }
455
456 uint64_t current_active_set;
457 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
458 if (r < 0) {
459 return r;
460 }
461
462 if (current_active_set < object_set) {
463 CLS_ERR("active object set earlier than minimum: %" PRIu64
464 " < %" PRIu64, current_active_set, object_set);
465 return -EINVAL;
466 }
467
468 uint64_t current_minimum_set;
469 r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
470 if (r < 0) {
471 return r;
472 }
473
474 if (object_set == current_minimum_set) {
475 return 0;
476 } else if (object_set < current_minimum_set) {
477 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
478 object_set, current_minimum_set);
479 return -ESTALE;
480 }
481
482 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
483 if (r < 0) {
484 return r;
485 }
486 return 0;
487 }
488
489 /**
490 * Input:
491 * none
492 *
493 * Output:
494 * object set (uint64_t)
495 * @returns 0 on success, negative error code on failure
496 */
497 int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
498 bufferlist *out) {
499 uint64_t active_set;
500 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
501 if (r < 0) {
502 return r;
503 }
504
505 ::encode(active_set, *out);
506 return 0;
507 }
508
509 /**
510 * Input:
511 * @param object set (uint64_t)
512 *
513 * Output:
514 * @returns 0 on success, negative error code on failure
515 */
516 int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
517 bufferlist *out) {
518 uint64_t object_set;
519 try {
520 bufferlist::iterator iter = in->begin();
521 ::decode(object_set, iter);
522 } catch (const buffer::error &err) {
523 CLS_ERR("failed to decode input parameters: %s", err.what());
524 return -EINVAL;
525 }
526
527 uint64_t current_minimum_set;
528 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
529 if (r < 0) {
530 return r;
531 }
532
533 if (current_minimum_set > object_set) {
534 CLS_ERR("minimum object set later than active: %" PRIu64
535 " > %" PRIu64, current_minimum_set, object_set);
536 return -EINVAL;
537 }
538
539 uint64_t current_active_set;
540 r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
541 if (r < 0) {
542 return r;
543 }
544
545 if (object_set == current_active_set) {
546 return 0;
547 } else if (object_set < current_active_set) {
548 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
549 object_set, current_active_set);
550 return -ESTALE;
551 }
552
553 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
554 if (r < 0) {
555 return r;
556 }
557 return 0;
558 }
559
560 /**
561 * Input:
562 * @param id (string) - unique client id
563 *
564 * Output:
565 * cls::journal::Client
566 * @returns 0 on success, negative error code on failure
567 */
568 int journal_get_client(cls_method_context_t hctx, bufferlist *in,
569 bufferlist *out) {
570 std::string id;
571 try {
572 bufferlist::iterator iter = in->begin();
573 ::decode(id, iter);
574 } catch (const buffer::error &err) {
575 CLS_ERR("failed to decode input parameters: %s", err.what());
576 return -EINVAL;
577 }
578
579 std::string key(key_from_client_id(id));
580 cls::journal::Client client;
581 int r = read_key(hctx, key, &client);
582 if (r < 0) {
583 return r;
584 }
585
586 ::encode(client, *out);
587 return 0;
588 }
589
590 /**
591 * Input:
592 * @param id (string) - unique client id
593 * @param data (bufferlist) - opaque data associated to client
594 *
595 * Output:
596 * @returns 0 on success, negative error code on failure
597 */
598 int journal_client_register(cls_method_context_t hctx, bufferlist *in,
599 bufferlist *out) {
600 std::string id;
601 bufferlist data;
602 try {
603 bufferlist::iterator iter = in->begin();
604 ::decode(id, iter);
605 ::decode(data, iter);
606 } catch (const buffer::error &err) {
607 CLS_ERR("failed to decode input parameters: %s", err.what());
608 return -EINVAL;
609 }
610
611 uint8_t order;
612 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
613 if (r < 0) {
614 return r;
615 }
616
617 std::string key(key_from_client_id(id));
618 bufferlist stored_clientbl;
619 r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
620 if (r >= 0) {
621 CLS_ERR("duplicate client id: %s", id.c_str());
622 return -EEXIST;
623 } else if (r != -ENOENT) {
624 return r;
625 }
626
627 cls::journal::ObjectSetPosition minset;
628 r = find_min_commit_position(hctx, &minset);
629 if (r < 0)
630 return r;
631
632 cls::journal::Client client(id, data, minset);
633 r = write_key(hctx, key, client);
634 if (r < 0) {
635 return r;
636 }
637 return 0;
638 }
639
640 /**
641 * Input:
642 * @param id (string) - unique client id
643 * @param data (bufferlist) - opaque data associated to client
644 *
645 * Output:
646 * @returns 0 on success, negative error code on failure
647 */
648 int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
649 bufferlist *out) {
650 std::string id;
651 bufferlist data;
652 try {
653 bufferlist::iterator iter = in->begin();
654 ::decode(id, iter);
655 ::decode(data, iter);
656 } catch (const buffer::error &err) {
657 CLS_ERR("failed to decode input parameters: %s", err.what());
658 return -EINVAL;
659 }
660
661 std::string key(key_from_client_id(id));
662 cls::journal::Client client;
663 int r = read_key(hctx, key, &client);
664 if (r < 0) {
665 return r;
666 }
667
668 client.data = data;
669 r = write_key(hctx, key, client);
670 if (r < 0) {
671 return r;
672 }
673 return 0;
674 }
675
676 /**
677 * Input:
678 * @param id (string) - unique client id
679 * @param state (uint8_t) - client state
680 *
681 * Output:
682 * @returns 0 on success, negative error code on failure
683 */
684 int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
685 bufferlist *out) {
686 std::string id;
687 cls::journal::ClientState state;
688 bufferlist data;
689 try {
690 bufferlist::iterator iter = in->begin();
691 ::decode(id, iter);
692 uint8_t state_raw;
693 ::decode(state_raw, iter);
694 state = static_cast<cls::journal::ClientState>(state_raw);
695 } catch (const buffer::error &err) {
696 CLS_ERR("failed to decode input parameters: %s", err.what());
697 return -EINVAL;
698 }
699
700 std::string key(key_from_client_id(id));
701 cls::journal::Client client;
702 int r = read_key(hctx, key, &client);
703 if (r < 0) {
704 return r;
705 }
706
707 client.state = state;
708 r = write_key(hctx, key, client);
709 if (r < 0) {
710 return r;
711 }
712 return 0;
713 }
714
715 /**
716 * Input:
717 * @param id (string) - unique client id
718 *
719 * Output:
720 * @returns 0 on success, negative error code on failure
721 */
722 int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
723 bufferlist *out) {
724 std::string id;
725 try {
726 bufferlist::iterator iter = in->begin();
727 ::decode(id, iter);
728 } catch (const buffer::error &err) {
729 CLS_ERR("failed to decode input parameters: %s", err.what());
730 return -EINVAL;
731 }
732
733 std::string key(key_from_client_id(id));
734 bufferlist bl;
735 int r = cls_cxx_map_get_val(hctx, key, &bl);
736 if (r < 0) {
737 CLS_ERR("client is not registered: %s", id.c_str());
738 return r;
739 }
740
741 r = cls_cxx_map_remove_key(hctx, key);
742 if (r < 0) {
743 CLS_ERR("failed to remove omap key: %s", key.c_str());
744 return r;
745 }
746
747 // prune expired tags
748 r = expire_tags(hctx, &id);
749 if (r < 0) {
750 return r;
751 }
752 return 0;
753 }
754
755 /**
756 * Input:
757 * @param client_id (uint64_t) - unique client id
758 * @param commit_position (ObjectSetPosition)
759 *
760 * Output:
761 * @returns 0 on success, negative error code on failure
762 */
763 int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
764 bufferlist *out) {
765 std::string id;
766 cls::journal::ObjectSetPosition commit_position;
767 try {
768 bufferlist::iterator iter = in->begin();
769 ::decode(id, iter);
770 ::decode(commit_position, iter);
771 } catch (const buffer::error &err) {
772 CLS_ERR("failed to decode input parameters: %s", err.what());
773 return -EINVAL;
774 }
775
776 uint8_t splay_width;
777 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
778 if (r < 0) {
779 return r;
780 }
781 if (commit_position.object_positions.size() > splay_width) {
782 CLS_ERR("too many object positions");
783 return -EINVAL;
784 }
785
786 std::string key(key_from_client_id(id));
787 cls::journal::Client client;
788 r = read_key(hctx, key, &client);
789 if (r < 0) {
790 return r;
791 }
792
793 if (client.commit_position == commit_position) {
794 return 0;
795 }
796
797 client.commit_position = commit_position;
798 r = write_key(hctx, key, client);
799 if (r < 0) {
800 return r;
801 }
802 return 0;
803 }
804
805 /**
806 * Input:
807 * @param start_after (string)
808 * @param max_return (uint64_t)
809 *
810 * Output:
811 * clients (set<cls::journal::Client>) - collection of registered clients
812 * @returns 0 on success, negative error code on failure
813 */
814 int journal_client_list(cls_method_context_t hctx, bufferlist *in,
815 bufferlist *out) {
816 std::string start_after;
817 uint64_t max_return;
818 try {
819 bufferlist::iterator iter = in->begin();
820 ::decode(start_after, iter);
821 ::decode(max_return, iter);
822 } catch (const buffer::error &err) {
823 CLS_ERR("failed to decode input parameters: %s", err.what());
824 return -EINVAL;
825 }
826
827 std::set<cls::journal::Client> clients;
828 int r = get_client_list_range(hctx, &clients, start_after, max_return);
829 if (r < 0)
830 return r;
831
832 ::encode(clients, *out);
833 return 0;
834 }
835
836 /**
837 * Input:
838 * none
839 *
840 * Output:
841 * @returns 0 on success, negative error code on failure
842 */
843 int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
844 bufferlist *out) {
845 uint64_t tag_tid;
846 int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
847 if (r < 0) {
848 return r;
849 }
850
851 ::encode(tag_tid, *out);
852 return 0;
853 }
854
855 /**
856 * Input:
857 * @param tag_tid (uint64_t)
858 *
859 * Output:
860 * cls::journal::Tag
861 * @returns 0 on success, negative error code on failure
862 */
863 int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
864 bufferlist *out) {
865 uint64_t tag_tid;
866 try {
867 bufferlist::iterator iter = in->begin();
868 ::decode(tag_tid, iter);
869 } catch (const buffer::error &err) {
870 CLS_ERR("failed to decode input parameters: %s", err.what());
871 return -EINVAL;
872 }
873
874 std::string key(key_from_tag_tid(tag_tid));
875 cls::journal::Tag tag;
876 int r = read_key(hctx, key, &tag);
877 if (r < 0) {
878 return r;
879 }
880
881 ::encode(tag, *out);
882 return 0;
883 }
884
885 /**
886 * Input:
887 * @param tag_tid (uint64_t)
888 * @param tag_class (uint64_t)
889 * @param data (bufferlist)
890 *
891 * Output:
892 * @returns 0 on success, negative error code on failure
893 */
894 int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
895 bufferlist *out) {
896 uint64_t tag_tid;
897 uint64_t tag_class;
898 bufferlist data;
899 try {
900 bufferlist::iterator iter = in->begin();
901 ::decode(tag_tid, iter);
902 ::decode(tag_class, iter);
903 ::decode(data, iter);
904 } catch (const buffer::error &err) {
905 CLS_ERR("failed to decode input parameters: %s", err.what());
906 return -EINVAL;
907 }
908
909 std::string key(key_from_tag_tid(tag_tid));
910 bufferlist stored_tag_bl;
911 int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
912 if (r >= 0) {
913 CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
914 return -EEXIST;
915 } else if (r != -ENOENT) {
916 return r;
917 }
918
919 // verify tag tid ordering
920 uint64_t next_tag_tid;
921 r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
922 if (r < 0) {
923 return r;
924 }
925 if (tag_tid != next_tag_tid) {
926 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
927 return -ESTALE;
928 }
929
930 uint64_t next_tag_class;
931 r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
932 if (r < 0) {
933 return r;
934 }
935
936 if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
937 // allocate a new tag class
938 tag_class = next_tag_class;
939 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
940 if (r < 0) {
941 return r;
942 }
943 } else {
944 // verify tag class range
945 if (tag_class >= next_tag_class) {
946 CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
947 return -EINVAL;
948 }
949 }
950
951 // prune expired tags
952 r = expire_tags(hctx, nullptr);
953 if (r < 0) {
954 return r;
955 }
956
957 // update tag tid sequence
958 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
959 if (r < 0) {
960 return r;
961 }
962
963 // write tag structure
964 cls::journal::Tag tag(tag_tid, tag_class, data);
965 key = key_from_tag_tid(tag_tid);
966 r = write_key(hctx, key, tag);
967 if (r < 0) {
968 return r;
969 }
970 return 0;
971 }
972
973 /**
974 * Input:
975 * @param start_after_tag_tid (uint64_t) - first tag tid
976 * @param max_return (uint64_t) - max tags to return
977 * @param client_id (std::string) - client id filter
978 * @param tag_class (boost::optional<uint64_t> - optional tag class filter
979 *
980 * Output:
981 * std::set<cls::journal::Tag> - collection of tags
982 * @returns 0 on success, negative error code on failure
983 */
984 int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
985 bufferlist *out) {
986 uint64_t start_after_tag_tid;
987 uint64_t max_return;
988 std::string client_id;
989 boost::optional<uint64_t> tag_class(0);
990
991 // handle compiler false positive about use-before-init
992 tag_class = boost::none;
993 try {
994 bufferlist::iterator iter = in->begin();
995 ::decode(start_after_tag_tid, iter);
996 ::decode(max_return, iter);
997 ::decode(client_id, iter);
998 ::decode(tag_class, iter);
999 } catch (const buffer::error &err) {
1000 CLS_ERR("failed to decode input parameters: %s", err.what());
1001 return -EINVAL;
1002 }
1003
1004 // calculate the minimum tag within client's commit position
1005 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
1006 cls::journal::Client client;
1007 int r = read_key(hctx, key_from_client_id(client_id), &client);
1008 if (r < 0) {
1009 return r;
1010 }
1011
1012 for (auto object_position : client.commit_position.object_positions) {
1013 minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
1014 }
1015
1016 // compute minimum tags in use per-class
1017 std::set<cls::journal::Tag> tags;
1018 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
1019 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
1020 TAG_PASS_LIST,
1021 TAG_PASS_DONE } TagPass;
1022 int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
1023 TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
1024 std::string last_read = HEADER_KEY_TAG_PREFIX;
1025 do {
1026 std::map<std::string, bufferlist> vals;
1027 bool more;
1028 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
1029 MAX_KEYS_READ, &vals, &more);
1030 if (r < 0 && r != -ENOENT) {
1031 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
1032 return r;
1033 }
1034
1035 for (auto &val : vals) {
1036 cls::journal::Tag tag;
1037 bufferlist::iterator iter = val.second.begin();
1038 try {
1039 ::decode(tag, iter);
1040 } catch (const buffer::error &err) {
1041 CLS_ERR("error decoding tag: %s", val.first.c_str());
1042 return -EIO;
1043 }
1044
1045 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
1046 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
1047
1048 // completed calculation of tag class minimums
1049 if (tag.tid >= minimum_tag_tid) {
1050 vals.clear();
1051 more = false;
1052 break;
1053 }
1054 } else if (tag_pass == TAG_PASS_LIST) {
1055 if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
1056 continue;
1057 }
1058
1059 if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
1060 (!tag_class || *tag_class == tag.tag_class)) {
1061 tags.insert(tag);
1062 }
1063 if (tags.size() >= max_return) {
1064 tag_pass = TAG_PASS_DONE;
1065 }
1066 }
1067 }
1068
1069 if (tag_pass != TAG_PASS_DONE && !more) {
1070 last_read = HEADER_KEY_TAG_PREFIX;
1071 ++tag_pass;
1072 } else if (!vals.empty()) {
1073 last_read = vals.rbegin()->first;
1074 }
1075 } while (tag_pass != TAG_PASS_DONE);
1076
1077 ::encode(tags, *out);
1078 return 0;
1079 }
1080
1081 /**
1082 * Input:
1083 * @param soft_max_size (uint64_t)
1084 *
1085 * Output:
1086 * @returns 0 if object size less than max, negative error code otherwise
1087 */
1088 int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
1089 bufferlist *out) {
1090 uint64_t soft_max_size;
1091 try {
1092 bufferlist::iterator iter = in->begin();
1093 ::decode(soft_max_size, iter);
1094 } catch (const buffer::error &err) {
1095 CLS_ERR("failed to decode input parameters: %s", err.what());
1096 return -EINVAL;
1097 }
1098
1099 uint64_t size;
1100 time_t mtime;
1101 int r = cls_cxx_stat(hctx, &size, &mtime);
1102 if (r == -ENOENT) {
1103 return 0;
1104 } else if (r < 0) {
1105 CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
1106 return r;
1107 }
1108
1109 if (size >= soft_max_size) {
1110 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1111 size, soft_max_size);
1112 return -EOVERFLOW;
1113 }
1114 return 0;
1115 }
1116
1117 CLS_INIT(journal)
1118 {
1119 CLS_LOG(20, "Loaded journal class!");
1120
1121 cls_handle_t h_class;
1122 cls_method_handle_t h_journal_create;
1123 cls_method_handle_t h_journal_get_order;
1124 cls_method_handle_t h_journal_get_splay_width;
1125 cls_method_handle_t h_journal_get_pool_id;
1126 cls_method_handle_t h_journal_get_minimum_set;
1127 cls_method_handle_t h_journal_set_minimum_set;
1128 cls_method_handle_t h_journal_get_active_set;
1129 cls_method_handle_t h_journal_set_active_set;
1130 cls_method_handle_t h_journal_get_client;
1131 cls_method_handle_t h_journal_client_register;
1132 cls_method_handle_t h_journal_client_update_data;
1133 cls_method_handle_t h_journal_client_update_state;
1134 cls_method_handle_t h_journal_client_unregister;
1135 cls_method_handle_t h_journal_client_commit;
1136 cls_method_handle_t h_journal_client_list;
1137 cls_method_handle_t h_journal_get_next_tag_tid;
1138 cls_method_handle_t h_journal_get_tag;
1139 cls_method_handle_t h_journal_tag_create;
1140 cls_method_handle_t h_journal_tag_list;
1141 cls_method_handle_t h_journal_object_guard_append;
1142
1143 cls_register("journal", &h_class);
1144
1145 /// methods for journal.$journal_id objects
1146 cls_register_cxx_method(h_class, "create",
1147 CLS_METHOD_RD | CLS_METHOD_WR,
1148 journal_create, &h_journal_create);
1149 cls_register_cxx_method(h_class, "get_order",
1150 CLS_METHOD_RD,
1151 journal_get_order, &h_journal_get_order);
1152 cls_register_cxx_method(h_class, "get_splay_width",
1153 CLS_METHOD_RD,
1154 journal_get_splay_width, &h_journal_get_splay_width);
1155 cls_register_cxx_method(h_class, "get_pool_id",
1156 CLS_METHOD_RD,
1157 journal_get_pool_id, &h_journal_get_pool_id);
1158 cls_register_cxx_method(h_class, "get_minimum_set",
1159 CLS_METHOD_RD,
1160 journal_get_minimum_set,
1161 &h_journal_get_minimum_set);
1162 cls_register_cxx_method(h_class, "set_minimum_set",
1163 CLS_METHOD_RD | CLS_METHOD_WR,
1164 journal_set_minimum_set,
1165 &h_journal_set_minimum_set);
1166 cls_register_cxx_method(h_class, "get_active_set",
1167 CLS_METHOD_RD,
1168 journal_get_active_set,
1169 &h_journal_get_active_set);
1170 cls_register_cxx_method(h_class, "set_active_set",
1171 CLS_METHOD_RD | CLS_METHOD_WR,
1172 journal_set_active_set,
1173 &h_journal_set_active_set);
1174
1175 cls_register_cxx_method(h_class, "get_client",
1176 CLS_METHOD_RD,
1177 journal_get_client, &h_journal_get_client);
1178 cls_register_cxx_method(h_class, "client_register",
1179 CLS_METHOD_RD | CLS_METHOD_WR,
1180 journal_client_register, &h_journal_client_register);
1181 cls_register_cxx_method(h_class, "client_update_data",
1182 CLS_METHOD_RD | CLS_METHOD_WR,
1183 journal_client_update_data,
1184 &h_journal_client_update_data);
1185 cls_register_cxx_method(h_class, "client_update_state",
1186 CLS_METHOD_RD | CLS_METHOD_WR,
1187 journal_client_update_state,
1188 &h_journal_client_update_state);
1189 cls_register_cxx_method(h_class, "client_unregister",
1190 CLS_METHOD_RD | CLS_METHOD_WR,
1191 journal_client_unregister,
1192 &h_journal_client_unregister);
1193 cls_register_cxx_method(h_class, "client_commit",
1194 CLS_METHOD_RD | CLS_METHOD_WR,
1195 journal_client_commit, &h_journal_client_commit);
1196 cls_register_cxx_method(h_class, "client_list",
1197 CLS_METHOD_RD,
1198 journal_client_list, &h_journal_client_list);
1199
1200 cls_register_cxx_method(h_class, "get_next_tag_tid",
1201 CLS_METHOD_RD,
1202 journal_get_next_tag_tid,
1203 &h_journal_get_next_tag_tid);
1204 cls_register_cxx_method(h_class, "get_tag",
1205 CLS_METHOD_RD,
1206 journal_get_tag, &h_journal_get_tag);
1207 cls_register_cxx_method(h_class, "tag_create",
1208 CLS_METHOD_RD | CLS_METHOD_WR,
1209 journal_tag_create, &h_journal_tag_create);
1210 cls_register_cxx_method(h_class, "tag_list",
1211 CLS_METHOD_RD,
1212 journal_tag_list, &h_journal_tag_list);
1213
1214 /// methods for journal_data.$journal_id.$object_id objects
1215 cls_register_cxx_method(h_class, "guard_append",
1216 CLS_METHOD_RD | CLS_METHOD_WR,
1217 journal_object_guard_append,
1218 &h_journal_object_guard_append);
1219 }