]> git.proxmox.com Git - ceph.git/blob - ceph/src/cls/journal/cls_journal.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / cls / journal / cls_journal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/int_types.h"
5 #include "include/buffer.h"
6 #include "include/encoding.h"
7 #include "common/errno.h"
8 #include "objclass/objclass.h"
9 #include "cls/journal/cls_journal_types.h"
10 #include <errno.h>
11 #include <map>
12 #include <string>
13 #include <sstream>
14
// Object-class identity for this plugin: CLS_VER is given (1, 0) and CLS_NAME
// is given the identifier `journal`.  Both macros are defined outside this
// file (presumably in objclass/objclass.h -- confirm).
CLS_VER(1, 0)
CLS_NAME(journal)
17
18 namespace {
19
// Number of omap entries requested per cls_cxx_map_get_vals() call when the
// helpers below page through client or tag records.
const uint64_t MAX_KEYS_READ{64};

// Omap keys under which the individual journal header fields are stored.
const std::string HEADER_KEY_ORDER{"order"};
const std::string HEADER_KEY_SPLAY_WIDTH{"splay_width"};
const std::string HEADER_KEY_POOL_ID{"pool_id"};
const std::string HEADER_KEY_MINIMUM_SET{"minimum_set"};
const std::string HEADER_KEY_ACTIVE_SET{"active_set"};
const std::string HEADER_KEY_NEXT_TAG_TID{"next_tag_tid"};
const std::string HEADER_KEY_NEXT_TAG_CLASS{"next_tag_class"};

// Omap key prefixes for per-client and per-tag records; the suffix is the
// client id or the zero-padded hexadecimal tag tid respectively.
const std::string HEADER_KEY_CLIENT_PREFIX{"client_"};
const std::string HEADER_KEY_TAG_PREFIX{"tag_"};
31
// Render `value` as exactly 16 lowercase hexadecimal digits, left-padded with
// zeros, so that lexicographic key order matches numeric order.
std::string to_hex(uint64_t value) {
  std::ostringstream stream;
  stream.fill('0');
  stream.setf(std::ios_base::hex, std::ios_base::basefield);
  stream.width(16);
  stream << value;
  return stream.str();
}
37
38 std::string key_from_client_id(const std::string &client_id) {
39 return HEADER_KEY_CLIENT_PREFIX + client_id;
40 }
41
42 std::string key_from_tag_tid(uint64_t tag_tid) {
43 return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
44 }
45
46 uint64_t tag_tid_from_key(const std::string &key) {
47 std::istringstream iss(key);
48 uint64_t id;
49 iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
50 return id;
51 }
52
53 template <typename T>
54 int read_key(cls_method_context_t hctx, const string &key, T *t,
55 bool ignore_enoent = false) {
56 bufferlist bl;
57 int r = cls_cxx_map_get_val(hctx, key, &bl);
58 if (r == -ENOENT) {
59 if (ignore_enoent) {
60 r = 0;
61 }
62 return r;
63 } else if (r < 0) {
64 CLS_ERR("failed to get omap key: %s", key.c_str());
65 return r;
66 }
67
68 try {
69 auto iter = bl.cbegin();
70 decode(*t, iter);
71 } catch (const buffer::error &err) {
72 CLS_ERR("failed to decode input parameters: %s", err.what());
73 return -EINVAL;
74 }
75 return 0;
76 }
77
78 template <typename T>
79 int write_key(cls_method_context_t hctx, const string &key, const T &t) {
80 bufferlist bl;
81 encode(t, bl);
82
83 int r = cls_cxx_map_set_val(hctx, key, &bl);
84 if (r < 0) {
85 CLS_ERR("failed to set omap key: %s", key.c_str());
86 return r;
87 }
88 return 0;
89 }
90
91 int remove_key(cls_method_context_t hctx, const string &key) {
92 int r = cls_cxx_map_remove_key(hctx, key);
93 if (r < 0 && r != -ENOENT) {
94 CLS_ERR("failed to remove key: %s", key.c_str());
95 return r;
96 }
97 return 0;
98 }
99
100 int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
101
102 std::string skip_client_key;
103 if (skip_client_id != nullptr) {
104 skip_client_key = key_from_client_id(*skip_client_id);
105 }
106
107 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
108 std::string last_read = "";
109 bool more;
110 do {
111 std::map<std::string, bufferlist> vals;
112 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
113 MAX_KEYS_READ, &vals, &more);
114 if (r < 0 && r != -ENOENT) {
115 CLS_ERR("failed to retrieve registered clients: %s",
116 cpp_strerror(r).c_str());
117 return r;
118 }
119
120 for (auto &val : vals) {
121 // if we are removing a client, skip its commit positions
122 if (val.first == skip_client_key) {
123 continue;
124 }
125
126 cls::journal::Client client;
127 auto iter = val.second.cbegin();
128 try {
129 decode(client, iter);
130 } catch (const buffer::error &err) {
131 CLS_ERR("error decoding registered client: %s",
132 val.first.c_str());
133 return -EIO;
134 }
135
136 if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
137 // don't allow a disconnected client to prevent pruning
138 continue;
139 } else if (client.commit_position.object_positions.empty()) {
140 // cannot prune if one or more clients has an empty commit history
141 return 0;
142 }
143
144 for (auto object_position : client.commit_position.object_positions) {
145 minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid);
146 }
147 }
148 if (!vals.empty()) {
149 last_read = vals.rbegin()->first;
150 }
151 } while (more);
152
153 // cannot expire tags if a client hasn't committed yet
154 if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
155 return 0;
156 }
157
158 // compute the minimum in-use tag for each class
159 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
160 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
161 TAG_PASS_SCRUB,
162 TAG_PASS_DONE } TagPass;
163 int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
164 last_read = HEADER_KEY_TAG_PREFIX;
165 do {
166 std::map<std::string, bufferlist> vals;
167 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
168 MAX_KEYS_READ, &vals, &more);
169 if (r < 0 && r != -ENOENT) {
170 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
171 return r;
172 }
173
174 for (auto &val : vals) {
175 cls::journal::Tag tag;
176 auto iter = val.second.cbegin();
177 try {
178 decode(tag, iter);
179 } catch (const buffer::error &err) {
180 CLS_ERR("error decoding tag: %s", val.first.c_str());
181 return -EIO;
182 }
183
184 if (tag.tid != tag_tid_from_key(val.first)) {
185 CLS_ERR("tag tid mismatched: %s", val.first.c_str());
186 return -EINVAL;
187 }
188
189 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
190 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
191 } else if (tag_pass == TAG_PASS_SCRUB &&
192 tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
193 r = remove_key(hctx, val.first);
194 if (r < 0) {
195 return r;
196 }
197 }
198
199 if (tag.tid >= minimum_tag_tid) {
200 // no need to check for tag classes beyond this point
201 vals.clear();
202 more = false;
203 break;
204 }
205 }
206
207 if (tag_pass != TAG_PASS_DONE && !more) {
208 last_read = HEADER_KEY_TAG_PREFIX;
209 ++tag_pass;
210 } else if (!vals.empty()) {
211 last_read = vals.rbegin()->first;
212 }
213 } while (tag_pass != TAG_PASS_DONE);
214 return 0;
215 }
216
217 int get_client_list_range(cls_method_context_t hctx,
218 std::set<cls::journal::Client> *clients,
219 std::string start_after, uint64_t max_return) {
220 std::string last_read;
221 if (!start_after.empty()) {
222 last_read = key_from_client_id(start_after);
223 }
224
225 std::map<std::string, bufferlist> vals;
226 bool more;
227 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
228 max_return, &vals, &more);
229 if (r < 0) {
230 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
231 return r;
232 }
233
234 for (std::map<std::string, bufferlist>::iterator it = vals.begin();
235 it != vals.end(); ++it) {
236 try {
237 auto iter = it->second.cbegin();
238
239 cls::journal::Client client;
240 decode(client, iter);
241 clients->insert(client);
242 } catch (const buffer::error &err) {
243 CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
244 err.what());
245 return -EIO;
246 }
247 }
248
249 return 0;
250 }
251
252 int find_min_commit_position(cls_method_context_t hctx,
253 cls::journal::ObjectSetPosition *minset) {
254 int r;
255 bool valid = false;
256 std::string start_after = "";
257 uint64_t tag_tid = 0, entry_tid = 0;
258
259 while (true) {
260 std::set<cls::journal::Client> batch;
261
262 r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
263 if ((r < 0) || batch.empty()) {
264 break;
265 }
266
267 start_after = batch.rbegin()->id;
268
269 // update the (minimum) commit position from this batch of clients
270 for(std::set<cls::journal::Client>::iterator it = batch.begin();
271 it != batch.end(); ++it) {
272 cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
273 if (object_set_position.object_positions.empty()) {
274 *minset = cls::journal::ObjectSetPosition();
275 break;
276 }
277 cls::journal::ObjectPosition first = object_set_position.object_positions.front();
278
279 // least tag_tid (or least entry_tid for matching tag_tid)
280 if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
281 tag_tid = first.tag_tid;
282 entry_tid = first.entry_tid;
283 *minset = cls::journal::ObjectSetPosition(object_set_position);
284 valid = true;
285 }
286 }
287
288 // got the last batch, we're done
289 if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
290 break;
291 }
292 }
293
294 return r;
295 }
296
297 } // anonymous namespace
298
299 /**
300 * Input:
301 * @param order (uint8_t) - bits to shift to compute the object max size
302 * @param splay width (uint8_t) - number of active journal objects
303 *
304 * Output:
305 * @returns 0 on success, negative error code on failure
306 */
307 int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
308 uint8_t order;
309 uint8_t splay_width;
310 int64_t pool_id;
311 try {
312 auto iter = in->cbegin();
313 decode(order, iter);
314 decode(splay_width, iter);
315 decode(pool_id, iter);
316 } catch (const buffer::error &err) {
317 CLS_ERR("failed to decode input parameters: %s", err.what());
318 return -EINVAL;
319 }
320
321 bufferlist stored_orderbl;
322 int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
323 if (r >= 0) {
324 CLS_ERR("journal already exists");
325 return -EEXIST;
326 } else if (r != -ENOENT) {
327 return r;
328 }
329
330 r = write_key(hctx, HEADER_KEY_ORDER, order);
331 if (r < 0) {
332 return r;
333 }
334
335 r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
336 if (r < 0) {
337 return r;
338 }
339
340 r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
341 if (r < 0) {
342 return r;
343 }
344
345 uint64_t object_set = 0;
346 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
347 if (r < 0) {
348 return r;
349 }
350
351 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
352 if (r < 0) {
353 return r;
354 }
355
356 uint64_t tag_id = 0;
357 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
358 if (r < 0) {
359 return r;
360 }
361
362 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
363 if (r < 0) {
364 return r;
365 }
366 return 0;
367 }
368
369 /**
370 * Input:
371 * none
372 *
373 * Output:
374 * order (uint8_t)
375 * @returns 0 on success, negative error code on failure
376 */
377 int journal_get_order(cls_method_context_t hctx, bufferlist *in,
378 bufferlist *out) {
379 uint8_t order;
380 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
381 if (r < 0) {
382 return r;
383 }
384
385 encode(order, *out);
386 return 0;
387 }
388
389 /**
390 * Input:
391 * none
392 *
393 * Output:
394 * splay_width (uint8_t)
395 * @returns 0 on success, negative error code on failure
396 */
397 int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
398 bufferlist *out) {
399 uint8_t splay_width;
400 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
401 if (r < 0) {
402 return r;
403 }
404
405 encode(splay_width, *out);
406 return 0;
407 }
408
409 /**
410 * Input:
411 * none
412 *
413 * Output:
414 * pool_id (int64_t)
415 * @returns 0 on success, negative error code on failure
416 */
417 int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
418 bufferlist *out) {
419 int64_t pool_id = 0;
420 int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
421 if (r < 0) {
422 return r;
423 }
424
425 encode(pool_id, *out);
426 return 0;
427 }
428
429 /**
430 * Input:
431 * none
432 *
433 * Output:
434 * object set (uint64_t)
435 * @returns 0 on success, negative error code on failure
436 */
437 int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
438 bufferlist *out) {
439 uint64_t minimum_set;
440 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
441 if (r < 0) {
442 return r;
443 }
444
445 encode(minimum_set, *out);
446 return 0;
447 }
448
449 /**
450 * Input:
451 * @param object set (uint64_t)
452 *
453 * Output:
454 * @returns 0 on success, negative error code on failure
455 */
456 int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
457 bufferlist *out) {
458 uint64_t object_set;
459 try {
460 auto iter = in->cbegin();
461 decode(object_set, iter);
462 } catch (const buffer::error &err) {
463 CLS_ERR("failed to decode input parameters: %s", err.what());
464 return -EINVAL;
465 }
466
467 uint64_t current_active_set;
468 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
469 if (r < 0) {
470 return r;
471 }
472
473 if (current_active_set < object_set) {
474 CLS_LOG(10, "active object set earlier than minimum: %" PRIu64
475 " < %" PRIu64, current_active_set, object_set);
476 return -EINVAL;
477 }
478
479 uint64_t current_minimum_set;
480 r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
481 if (r < 0) {
482 return r;
483 }
484
485 if (object_set == current_minimum_set) {
486 return 0;
487 } else if (object_set < current_minimum_set) {
488 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
489 object_set, current_minimum_set);
490 return -ESTALE;
491 }
492
493 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
494 if (r < 0) {
495 return r;
496 }
497 return 0;
498 }
499
500 /**
501 * Input:
502 * none
503 *
504 * Output:
505 * object set (uint64_t)
506 * @returns 0 on success, negative error code on failure
507 */
508 int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
509 bufferlist *out) {
510 uint64_t active_set;
511 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
512 if (r < 0) {
513 return r;
514 }
515
516 encode(active_set, *out);
517 return 0;
518 }
519
520 /**
521 * Input:
522 * @param object set (uint64_t)
523 *
524 * Output:
525 * @returns 0 on success, negative error code on failure
526 */
527 int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
528 bufferlist *out) {
529 uint64_t object_set;
530 try {
531 auto iter = in->cbegin();
532 decode(object_set, iter);
533 } catch (const buffer::error &err) {
534 CLS_ERR("failed to decode input parameters: %s", err.what());
535 return -EINVAL;
536 }
537
538 uint64_t current_minimum_set;
539 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
540 if (r < 0) {
541 return r;
542 }
543
544 if (current_minimum_set > object_set) {
545 CLS_ERR("minimum object set later than active: %" PRIu64
546 " > %" PRIu64, current_minimum_set, object_set);
547 return -EINVAL;
548 }
549
550 uint64_t current_active_set;
551 r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
552 if (r < 0) {
553 return r;
554 }
555
556 if (object_set == current_active_set) {
557 return 0;
558 } else if (object_set < current_active_set) {
559 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
560 object_set, current_active_set);
561 return -ESTALE;
562 }
563
564 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
565 if (r < 0) {
566 return r;
567 }
568 return 0;
569 }
570
571 /**
572 * Input:
573 * @param id (string) - unique client id
574 *
575 * Output:
576 * cls::journal::Client
577 * @returns 0 on success, negative error code on failure
578 */
579 int journal_get_client(cls_method_context_t hctx, bufferlist *in,
580 bufferlist *out) {
581 std::string id;
582 try {
583 auto iter = in->cbegin();
584 decode(id, iter);
585 } catch (const buffer::error &err) {
586 CLS_ERR("failed to decode input parameters: %s", err.what());
587 return -EINVAL;
588 }
589
590 std::string key(key_from_client_id(id));
591 cls::journal::Client client;
592 int r = read_key(hctx, key, &client);
593 if (r < 0) {
594 return r;
595 }
596
597 encode(client, *out);
598 return 0;
599 }
600
601 /**
602 * Input:
603 * @param id (string) - unique client id
604 * @param data (bufferlist) - opaque data associated to client
605 *
606 * Output:
607 * @returns 0 on success, negative error code on failure
608 */
609 int journal_client_register(cls_method_context_t hctx, bufferlist *in,
610 bufferlist *out) {
611 std::string id;
612 bufferlist data;
613 try {
614 auto iter = in->cbegin();
615 decode(id, iter);
616 decode(data, iter);
617 } catch (const buffer::error &err) {
618 CLS_ERR("failed to decode input parameters: %s", err.what());
619 return -EINVAL;
620 }
621
622 uint8_t order;
623 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
624 if (r < 0) {
625 return r;
626 }
627
628 std::string key(key_from_client_id(id));
629 bufferlist stored_clientbl;
630 r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
631 if (r >= 0) {
632 CLS_ERR("duplicate client id: %s", id.c_str());
633 return -EEXIST;
634 } else if (r != -ENOENT) {
635 return r;
636 }
637
638 cls::journal::ObjectSetPosition minset;
639 r = find_min_commit_position(hctx, &minset);
640 if (r < 0)
641 return r;
642
643 cls::journal::Client client(id, data, minset);
644 r = write_key(hctx, key, client);
645 if (r < 0) {
646 return r;
647 }
648 return 0;
649 }
650
651 /**
652 * Input:
653 * @param id (string) - unique client id
654 * @param data (bufferlist) - opaque data associated to client
655 *
656 * Output:
657 * @returns 0 on success, negative error code on failure
658 */
659 int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
660 bufferlist *out) {
661 std::string id;
662 bufferlist data;
663 try {
664 auto iter = in->cbegin();
665 decode(id, iter);
666 decode(data, iter);
667 } catch (const buffer::error &err) {
668 CLS_ERR("failed to decode input parameters: %s", err.what());
669 return -EINVAL;
670 }
671
672 std::string key(key_from_client_id(id));
673 cls::journal::Client client;
674 int r = read_key(hctx, key, &client);
675 if (r < 0) {
676 return r;
677 }
678
679 client.data = data;
680 r = write_key(hctx, key, client);
681 if (r < 0) {
682 return r;
683 }
684 return 0;
685 }
686
687 /**
688 * Input:
689 * @param id (string) - unique client id
690 * @param state (uint8_t) - client state
691 *
692 * Output:
693 * @returns 0 on success, negative error code on failure
694 */
695 int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
696 bufferlist *out) {
697 std::string id;
698 cls::journal::ClientState state;
699 bufferlist data;
700 try {
701 auto iter = in->cbegin();
702 decode(id, iter);
703 uint8_t state_raw;
704 decode(state_raw, iter);
705 state = static_cast<cls::journal::ClientState>(state_raw);
706 } catch (const buffer::error &err) {
707 CLS_ERR("failed to decode input parameters: %s", err.what());
708 return -EINVAL;
709 }
710
711 std::string key(key_from_client_id(id));
712 cls::journal::Client client;
713 int r = read_key(hctx, key, &client);
714 if (r < 0) {
715 return r;
716 }
717
718 client.state = state;
719 r = write_key(hctx, key, client);
720 if (r < 0) {
721 return r;
722 }
723 return 0;
724 }
725
726 /**
727 * Input:
728 * @param id (string) - unique client id
729 *
730 * Output:
731 * @returns 0 on success, negative error code on failure
732 */
733 int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
734 bufferlist *out) {
735 std::string id;
736 try {
737 auto iter = in->cbegin();
738 decode(id, iter);
739 } catch (const buffer::error &err) {
740 CLS_ERR("failed to decode input parameters: %s", err.what());
741 return -EINVAL;
742 }
743
744 std::string key(key_from_client_id(id));
745 bufferlist bl;
746 int r = cls_cxx_map_get_val(hctx, key, &bl);
747 if (r < 0) {
748 CLS_ERR("client is not registered: %s", id.c_str());
749 return r;
750 }
751
752 r = cls_cxx_map_remove_key(hctx, key);
753 if (r < 0) {
754 CLS_ERR("failed to remove omap key: %s", key.c_str());
755 return r;
756 }
757
758 // prune expired tags
759 r = expire_tags(hctx, &id);
760 if (r < 0) {
761 return r;
762 }
763 return 0;
764 }
765
766 /**
767 * Input:
768 * @param client_id (uint64_t) - unique client id
769 * @param commit_position (ObjectSetPosition)
770 *
771 * Output:
772 * @returns 0 on success, negative error code on failure
773 */
774 int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
775 bufferlist *out) {
776 std::string id;
777 cls::journal::ObjectSetPosition commit_position;
778 try {
779 auto iter = in->cbegin();
780 decode(id, iter);
781 decode(commit_position, iter);
782 } catch (const buffer::error &err) {
783 CLS_ERR("failed to decode input parameters: %s", err.what());
784 return -EINVAL;
785 }
786
787 uint8_t splay_width;
788 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
789 if (r < 0) {
790 return r;
791 }
792 if (commit_position.object_positions.size() > splay_width) {
793 CLS_ERR("too many object positions");
794 return -EINVAL;
795 }
796
797 std::string key(key_from_client_id(id));
798 cls::journal::Client client;
799 r = read_key(hctx, key, &client);
800 if (r < 0) {
801 return r;
802 }
803
804 if (client.commit_position == commit_position) {
805 return 0;
806 }
807
808 client.commit_position = commit_position;
809 r = write_key(hctx, key, client);
810 if (r < 0) {
811 return r;
812 }
813 return 0;
814 }
815
816 /**
817 * Input:
818 * @param start_after (string)
819 * @param max_return (uint64_t)
820 *
821 * Output:
822 * clients (set<cls::journal::Client>) - collection of registered clients
823 * @returns 0 on success, negative error code on failure
824 */
825 int journal_client_list(cls_method_context_t hctx, bufferlist *in,
826 bufferlist *out) {
827 std::string start_after;
828 uint64_t max_return;
829 try {
830 auto iter = in->cbegin();
831 decode(start_after, iter);
832 decode(max_return, iter);
833 } catch (const buffer::error &err) {
834 CLS_ERR("failed to decode input parameters: %s", err.what());
835 return -EINVAL;
836 }
837
838 std::set<cls::journal::Client> clients;
839 int r = get_client_list_range(hctx, &clients, start_after, max_return);
840 if (r < 0)
841 return r;
842
843 encode(clients, *out);
844 return 0;
845 }
846
847 /**
848 * Input:
849 * none
850 *
851 * Output:
852 * @returns 0 on success, negative error code on failure
853 */
854 int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
855 bufferlist *out) {
856 uint64_t tag_tid;
857 int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
858 if (r < 0) {
859 return r;
860 }
861
862 encode(tag_tid, *out);
863 return 0;
864 }
865
866 /**
867 * Input:
868 * @param tag_tid (uint64_t)
869 *
870 * Output:
871 * cls::journal::Tag
872 * @returns 0 on success, negative error code on failure
873 */
874 int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
875 bufferlist *out) {
876 uint64_t tag_tid;
877 try {
878 auto iter = in->cbegin();
879 decode(tag_tid, iter);
880 } catch (const buffer::error &err) {
881 CLS_ERR("failed to decode input parameters: %s", err.what());
882 return -EINVAL;
883 }
884
885 std::string key(key_from_tag_tid(tag_tid));
886 cls::journal::Tag tag;
887 int r = read_key(hctx, key, &tag);
888 if (r < 0) {
889 return r;
890 }
891
892 encode(tag, *out);
893 return 0;
894 }
895
896 /**
897 * Input:
898 * @param tag_tid (uint64_t)
899 * @param tag_class (uint64_t)
900 * @param data (bufferlist)
901 *
902 * Output:
903 * @returns 0 on success, negative error code on failure
904 */
905 int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
906 bufferlist *out) {
907 uint64_t tag_tid;
908 uint64_t tag_class;
909 bufferlist data;
910 try {
911 auto iter = in->cbegin();
912 decode(tag_tid, iter);
913 decode(tag_class, iter);
914 decode(data, iter);
915 } catch (const buffer::error &err) {
916 CLS_ERR("failed to decode input parameters: %s", err.what());
917 return -EINVAL;
918 }
919
920 std::string key(key_from_tag_tid(tag_tid));
921 bufferlist stored_tag_bl;
922 int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
923 if (r >= 0) {
924 CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
925 return -EEXIST;
926 } else if (r != -ENOENT) {
927 return r;
928 }
929
930 // verify tag tid ordering
931 uint64_t next_tag_tid;
932 r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
933 if (r < 0) {
934 return r;
935 }
936 if (tag_tid != next_tag_tid) {
937 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
938 return -ESTALE;
939 }
940
941 uint64_t next_tag_class;
942 r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
943 if (r < 0) {
944 return r;
945 }
946
947 if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
948 // allocate a new tag class
949 tag_class = next_tag_class;
950 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
951 if (r < 0) {
952 return r;
953 }
954 } else {
955 // verify tag class range
956 if (tag_class >= next_tag_class) {
957 CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
958 return -EINVAL;
959 }
960 }
961
962 // prune expired tags
963 r = expire_tags(hctx, nullptr);
964 if (r < 0) {
965 return r;
966 }
967
968 // update tag tid sequence
969 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
970 if (r < 0) {
971 return r;
972 }
973
974 // write tag structure
975 cls::journal::Tag tag(tag_tid, tag_class, data);
976 key = key_from_tag_tid(tag_tid);
977 r = write_key(hctx, key, tag);
978 if (r < 0) {
979 return r;
980 }
981 return 0;
982 }
983
984 /**
985 * Input:
986 * @param start_after_tag_tid (uint64_t) - first tag tid
987 * @param max_return (uint64_t) - max tags to return
988 * @param client_id (std::string) - client id filter
989 * @param tag_class (boost::optional<uint64_t> - optional tag class filter
990 *
991 * Output:
992 * std::set<cls::journal::Tag> - collection of tags
993 * @returns 0 on success, negative error code on failure
994 */
995 int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
996 bufferlist *out) {
997 uint64_t start_after_tag_tid;
998 uint64_t max_return;
999 std::string client_id;
1000 boost::optional<uint64_t> tag_class(0);
1001
1002 // handle compiler false positive about use-before-init
1003 tag_class = boost::none;
1004 try {
1005 auto iter = in->cbegin();
1006 decode(start_after_tag_tid, iter);
1007 decode(max_return, iter);
1008 decode(client_id, iter);
1009 decode(tag_class, iter);
1010 } catch (const buffer::error &err) {
1011 CLS_ERR("failed to decode input parameters: %s", err.what());
1012 return -EINVAL;
1013 }
1014
1015 // calculate the minimum tag within client's commit position
1016 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
1017 cls::journal::Client client;
1018 int r = read_key(hctx, key_from_client_id(client_id), &client);
1019 if (r < 0) {
1020 return r;
1021 }
1022
1023 for (auto object_position : client.commit_position.object_positions) {
1024 minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid);
1025 }
1026
1027 // compute minimum tags in use per-class
1028 std::set<cls::journal::Tag> tags;
1029 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
1030 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
1031 TAG_PASS_LIST,
1032 TAG_PASS_DONE } TagPass;
1033 int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
1034 TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
1035 std::string last_read = HEADER_KEY_TAG_PREFIX;
1036 do {
1037 std::map<std::string, bufferlist> vals;
1038 bool more;
1039 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
1040 MAX_KEYS_READ, &vals, &more);
1041 if (r < 0 && r != -ENOENT) {
1042 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
1043 return r;
1044 }
1045
1046 for (auto &val : vals) {
1047 cls::journal::Tag tag;
1048 auto iter = val.second.cbegin();
1049 try {
1050 decode(tag, iter);
1051 } catch (const buffer::error &err) {
1052 CLS_ERR("error decoding tag: %s", val.first.c_str());
1053 return -EIO;
1054 }
1055
1056 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
1057 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
1058
1059 // completed calculation of tag class minimums
1060 if (tag.tid >= minimum_tag_tid) {
1061 vals.clear();
1062 more = false;
1063 break;
1064 }
1065 } else if (tag_pass == TAG_PASS_LIST) {
1066 if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
1067 continue;
1068 }
1069
1070 if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
1071 (!tag_class || *tag_class == tag.tag_class)) {
1072 tags.insert(tag);
1073 }
1074 if (tags.size() >= max_return) {
1075 tag_pass = TAG_PASS_DONE;
1076 }
1077 }
1078 }
1079
1080 if (tag_pass != TAG_PASS_DONE && !more) {
1081 last_read = HEADER_KEY_TAG_PREFIX;
1082 ++tag_pass;
1083 } else if (!vals.empty()) {
1084 last_read = vals.rbegin()->first;
1085 }
1086 } while (tag_pass != TAG_PASS_DONE);
1087
1088 encode(tags, *out);
1089 return 0;
1090 }
1091
1092 /**
1093 * Input:
1094 * @param soft_max_size (uint64_t)
1095 *
1096 * Output:
1097 * @returns 0 if object size less than max, negative error code otherwise
1098 */
1099 int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
1100 bufferlist *out) {
1101 uint64_t soft_max_size;
1102 try {
1103 auto iter = in->cbegin();
1104 decode(soft_max_size, iter);
1105 } catch (const buffer::error &err) {
1106 CLS_ERR("failed to decode input parameters: %s", err.what());
1107 return -EINVAL;
1108 }
1109
1110 uint64_t size;
1111 time_t mtime;
1112 int r = cls_cxx_stat(hctx, &size, &mtime);
1113 if (r == -ENOENT) {
1114 return 0;
1115 } else if (r < 0) {
1116 CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
1117 return r;
1118 }
1119
1120 if (size >= soft_max_size) {
1121 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1122 size, soft_max_size);
1123 return -EOVERFLOW;
1124 }
1125 return 0;
1126 }
1127
1128 CLS_INIT(journal)
1129 {
1130 CLS_LOG(20, "Loaded journal class!");
1131
1132 cls_handle_t h_class;
1133 cls_method_handle_t h_journal_create;
1134 cls_method_handle_t h_journal_get_order;
1135 cls_method_handle_t h_journal_get_splay_width;
1136 cls_method_handle_t h_journal_get_pool_id;
1137 cls_method_handle_t h_journal_get_minimum_set;
1138 cls_method_handle_t h_journal_set_minimum_set;
1139 cls_method_handle_t h_journal_get_active_set;
1140 cls_method_handle_t h_journal_set_active_set;
1141 cls_method_handle_t h_journal_get_client;
1142 cls_method_handle_t h_journal_client_register;
1143 cls_method_handle_t h_journal_client_update_data;
1144 cls_method_handle_t h_journal_client_update_state;
1145 cls_method_handle_t h_journal_client_unregister;
1146 cls_method_handle_t h_journal_client_commit;
1147 cls_method_handle_t h_journal_client_list;
1148 cls_method_handle_t h_journal_get_next_tag_tid;
1149 cls_method_handle_t h_journal_get_tag;
1150 cls_method_handle_t h_journal_tag_create;
1151 cls_method_handle_t h_journal_tag_list;
1152 cls_method_handle_t h_journal_object_guard_append;
1153
1154 cls_register("journal", &h_class);
1155
1156 /// methods for journal.$journal_id objects
1157 cls_register_cxx_method(h_class, "create",
1158 CLS_METHOD_RD | CLS_METHOD_WR,
1159 journal_create, &h_journal_create);
1160 cls_register_cxx_method(h_class, "get_order",
1161 CLS_METHOD_RD,
1162 journal_get_order, &h_journal_get_order);
1163 cls_register_cxx_method(h_class, "get_splay_width",
1164 CLS_METHOD_RD,
1165 journal_get_splay_width, &h_journal_get_splay_width);
1166 cls_register_cxx_method(h_class, "get_pool_id",
1167 CLS_METHOD_RD,
1168 journal_get_pool_id, &h_journal_get_pool_id);
1169 cls_register_cxx_method(h_class, "get_minimum_set",
1170 CLS_METHOD_RD,
1171 journal_get_minimum_set,
1172 &h_journal_get_minimum_set);
1173 cls_register_cxx_method(h_class, "set_minimum_set",
1174 CLS_METHOD_RD | CLS_METHOD_WR,
1175 journal_set_minimum_set,
1176 &h_journal_set_minimum_set);
1177 cls_register_cxx_method(h_class, "get_active_set",
1178 CLS_METHOD_RD,
1179 journal_get_active_set,
1180 &h_journal_get_active_set);
1181 cls_register_cxx_method(h_class, "set_active_set",
1182 CLS_METHOD_RD | CLS_METHOD_WR,
1183 journal_set_active_set,
1184 &h_journal_set_active_set);
1185
1186 cls_register_cxx_method(h_class, "get_client",
1187 CLS_METHOD_RD,
1188 journal_get_client, &h_journal_get_client);
1189 cls_register_cxx_method(h_class, "client_register",
1190 CLS_METHOD_RD | CLS_METHOD_WR,
1191 journal_client_register, &h_journal_client_register);
1192 cls_register_cxx_method(h_class, "client_update_data",
1193 CLS_METHOD_RD | CLS_METHOD_WR,
1194 journal_client_update_data,
1195 &h_journal_client_update_data);
1196 cls_register_cxx_method(h_class, "client_update_state",
1197 CLS_METHOD_RD | CLS_METHOD_WR,
1198 journal_client_update_state,
1199 &h_journal_client_update_state);
1200 cls_register_cxx_method(h_class, "client_unregister",
1201 CLS_METHOD_RD | CLS_METHOD_WR,
1202 journal_client_unregister,
1203 &h_journal_client_unregister);
1204 cls_register_cxx_method(h_class, "client_commit",
1205 CLS_METHOD_RD | CLS_METHOD_WR,
1206 journal_client_commit, &h_journal_client_commit);
1207 cls_register_cxx_method(h_class, "client_list",
1208 CLS_METHOD_RD,
1209 journal_client_list, &h_journal_client_list);
1210
1211 cls_register_cxx_method(h_class, "get_next_tag_tid",
1212 CLS_METHOD_RD,
1213 journal_get_next_tag_tid,
1214 &h_journal_get_next_tag_tid);
1215 cls_register_cxx_method(h_class, "get_tag",
1216 CLS_METHOD_RD,
1217 journal_get_tag, &h_journal_get_tag);
1218 cls_register_cxx_method(h_class, "tag_create",
1219 CLS_METHOD_RD | CLS_METHOD_WR,
1220 journal_tag_create, &h_journal_tag_create);
1221 cls_register_cxx_method(h_class, "tag_list",
1222 CLS_METHOD_RD,
1223 journal_tag_list, &h_journal_tag_list);
1224
1225 /// methods for journal_data.$journal_id.$object_id objects
1226 cls_register_cxx_method(h_class, "guard_append",
1227 CLS_METHOD_RD | CLS_METHOD_WR,
1228 journal_object_guard_append,
1229 &h_journal_object_guard_append);
1230 }