]> git.proxmox.com Git - ceph.git/blame - ceph/src/cls/journal/cls_journal.cc
update sources to v12.1.2
[ceph.git] / ceph / src / cls / journal / cls_journal.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/int_types.h"
5#include "include/buffer.h"
6#include "include/encoding.h"
7#include "common/errno.h"
8#include "objclass/objclass.h"
9#include "cls/journal/cls_journal_types.h"
10#include <errno.h>
11#include <map>
12#include <string>
13#include <sstream>
14
// Object-class version and name declarations; both macros come from
// objclass/objclass.h (included above).
15CLS_VER(1, 0)
16CLS_NAME(journal)
17
18namespace {
19
// Maximum number of omap entries requested per cls_cxx_map_get_vals() call
// when paging through registered clients and tags.
static constexpr uint64_t MAX_KEYS_READ = 64;

// Omap keys holding the journal header fields.
static const std::string HEADER_KEY_ORDER{"order"};
static const std::string HEADER_KEY_SPLAY_WIDTH{"splay_width"};
static const std::string HEADER_KEY_POOL_ID{"pool_id"};
static const std::string HEADER_KEY_MINIMUM_SET{"minimum_set"};
static const std::string HEADER_KEY_ACTIVE_SET{"active_set"};
static const std::string HEADER_KEY_NEXT_TAG_TID{"next_tag_tid"};
static const std::string HEADER_KEY_NEXT_TAG_CLASS{"next_tag_class"};

// Prefixes of the per-client and per-tag omap keys.
static const std::string HEADER_KEY_CLIENT_PREFIX{"client_"};
static const std::string HEADER_KEY_TAG_PREFIX{"tag_"};
31
// Render a value as exactly 16 lowercase hex digits, zero padded, so that
// keys built from it sort lexically in the same order as numerically.
std::string to_hex(uint64_t value) {
  std::ostringstream formatter;
  formatter << std::hex << std::setfill('0') << std::setw(16) << value;
  return formatter.str();
}
37
38std::string key_from_client_id(const std::string &client_id) {
39 return HEADER_KEY_CLIENT_PREFIX + client_id;
40}
41
42std::string key_from_tag_tid(uint64_t tag_tid) {
43 return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
44}
45
46uint64_t tag_tid_from_key(const std::string &key) {
47 std::istringstream iss(key);
48 uint64_t id;
49 iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
50 return id;
51}
52
53template <typename T>
54int read_key(cls_method_context_t hctx, const string &key, T *t,
55 bool ignore_enoent = false) {
56 bufferlist bl;
57 int r = cls_cxx_map_get_val(hctx, key, &bl);
58 if (r == -ENOENT && ignore_enoent) {
59 return 0;
60 } else if (r < 0) {
61 CLS_ERR("failed to get omap key: %s", key.c_str());
62 return r;
63 }
64
65 try {
66 bufferlist::iterator iter = bl.begin();
67 ::decode(*t, iter);
68 } catch (const buffer::error &err) {
69 CLS_ERR("failed to decode input parameters: %s", err.what());
70 return -EINVAL;
71 }
72 return 0;
73}
74
75template <typename T>
76int write_key(cls_method_context_t hctx, const string &key, const T &t) {
77 bufferlist bl;
78 ::encode(t, bl);
79
80 int r = cls_cxx_map_set_val(hctx, key, &bl);
81 if (r < 0) {
82 CLS_ERR("failed to set omap key: %s", key.c_str());
83 return r;
84 }
85 return 0;
86}
87
88int remove_key(cls_method_context_t hctx, const string &key) {
89 int r = cls_cxx_map_remove_key(hctx, key);
90 if (r < 0 && r != -ENOENT) {
91 CLS_ERR("failed to remove key: %s", key.c_str());
92 return r;
93 }
94 return 0;
95}
96
97int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
98
99 std::string skip_client_key;
100 if (skip_client_id != nullptr) {
101 skip_client_key = key_from_client_id(*skip_client_id);
102 }
103
7c673cae
FG
104 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
105 std::string last_read = HEADER_KEY_CLIENT_PREFIX;
c07f9fc5 106 bool more;
7c673cae
FG
107 do {
108 std::map<std::string, bufferlist> vals;
c07f9fc5
FG
109 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
110 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
111 if (r < 0 && r != -ENOENT) {
112 CLS_ERR("failed to retrieve registered clients: %s",
113 cpp_strerror(r).c_str());
114 return r;
115 }
116
117 for (auto &val : vals) {
118 // if we are removing a client, skip its commit positions
119 if (val.first == skip_client_key) {
120 continue;
121 }
122
123 cls::journal::Client client;
124 bufferlist::iterator iter = val.second.begin();
125 try {
126 ::decode(client, iter);
127 } catch (const buffer::error &err) {
128 CLS_ERR("error decoding registered client: %s",
129 val.first.c_str());
130 return -EIO;
131 }
132
133 for (auto object_position : client.commit_position.object_positions) {
134 minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
135 }
136 }
137 if (!vals.empty()) {
138 last_read = vals.rbegin()->first;
139 }
c07f9fc5 140 } while (more);
7c673cae
FG
141
142 // cannot expire tags if a client hasn't committed yet
143 if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
144 return 0;
145 }
146
147 // compute the minimum in-use tag for each class
148 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
149 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
150 TAG_PASS_SCRUB,
151 TAG_PASS_DONE } TagPass;
152 int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
153 last_read = HEADER_KEY_TAG_PREFIX;
154 do {
155 std::map<std::string, bufferlist> vals;
c07f9fc5
FG
156 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
157 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
158 if (r < 0 && r != -ENOENT) {
159 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
160 return r;
161 }
162
163 for (auto &val : vals) {
164 cls::journal::Tag tag;
165 bufferlist::iterator iter = val.second.begin();
166 try {
167 ::decode(tag, iter);
168 } catch (const buffer::error &err) {
169 CLS_ERR("error decoding tag: %s", val.first.c_str());
170 return -EIO;
171 }
172
173 if (tag.tid != tag_tid_from_key(val.first)) {
174 CLS_ERR("tag tid mismatched: %s", val.first.c_str());
175 return -EINVAL;
176 }
177
178 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
179 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
180 } else if (tag_pass == TAG_PASS_SCRUB &&
181 tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
182 r = remove_key(hctx, val.first);
183 if (r < 0) {
184 return r;
185 }
186 }
187
188 if (tag.tid >= minimum_tag_tid) {
189 // no need to check for tag classes beyond this point
190 vals.clear();
191 break;
192 }
193 }
194
c07f9fc5 195 if (tag_pass != TAG_PASS_DONE && !more) {
7c673cae
FG
196 last_read = HEADER_KEY_TAG_PREFIX;
197 ++tag_pass;
198 } else if (!vals.empty()) {
199 last_read = vals.rbegin()->first;
200 }
201 } while (tag_pass != TAG_PASS_DONE);
202 return 0;
203}
204
205int get_client_list_range(cls_method_context_t hctx,
206 std::set<cls::journal::Client> *clients,
207 std::string start_after, uint64_t max_return) {
208 std::string last_read;
209 if (!start_after.empty()) {
210 last_read = key_from_client_id(start_after);
211 }
212
213 std::map<std::string, bufferlist> vals;
c07f9fc5 214 bool more;
7c673cae 215 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
c07f9fc5 216 max_return, &vals, &more);
7c673cae
FG
217 if (r < 0) {
218 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
219 return r;
220 }
221
222 for (std::map<std::string, bufferlist>::iterator it = vals.begin();
223 it != vals.end(); ++it) {
224 try {
225 bufferlist::iterator iter = it->second.begin();
226
227 cls::journal::Client client;
228 ::decode(client, iter);
229 clients->insert(client);
230 } catch (const buffer::error &err) {
231 CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
232 err.what());
233 return -EIO;
234 }
235 }
236
237 return 0;
238}
239
240int find_min_commit_position(cls_method_context_t hctx,
241 cls::journal::ObjectSetPosition *minset) {
242 int r;
243 bool valid = false;
244 std::string start_after = "";
245 uint64_t tag_tid = 0, entry_tid = 0;
246
247 while (true) {
248 std::set<cls::journal::Client> batch;
249
250 r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
251 if ((r < 0) || batch.empty()) {
252 break;
253 }
254
255 start_after = batch.rbegin()->id;
256
257 // update the (minimum) commit position from this batch of clients
258 for(std::set<cls::journal::Client>::iterator it = batch.begin();
259 it != batch.end(); ++it) {
260 cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
261 if (object_set_position.object_positions.empty()) {
262 *minset = cls::journal::ObjectSetPosition();
263 break;
264 }
265 cls::journal::ObjectPosition first = object_set_position.object_positions.front();
266
267 // least tag_tid (or least entry_tid for matching tag_tid)
268 if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
269 tag_tid = first.tag_tid;
270 entry_tid = first.entry_tid;
271 *minset = cls::journal::ObjectSetPosition(object_set_position);
272 valid = true;
273 }
274 }
275
276 // got the last batch, we're done
277 if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
278 break;
279 }
280 }
281
282 return r;
283}
284
285} // anonymous namespace
286
287/**
288 * Input:
289 * @param order (uint8_t) - bits to shift to compute the object max size
290 * @param splay width (uint8_t) - number of active journal objects
291 *
292 * Output:
293 * @returns 0 on success, negative error code on failure
294 */
295int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
296 uint8_t order;
297 uint8_t splay_width;
298 int64_t pool_id;
299 try {
300 bufferlist::iterator iter = in->begin();
301 ::decode(order, iter);
302 ::decode(splay_width, iter);
303 ::decode(pool_id, iter);
304 } catch (const buffer::error &err) {
305 CLS_ERR("failed to decode input parameters: %s", err.what());
306 return -EINVAL;
307 }
308
309 bufferlist stored_orderbl;
310 int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
311 if (r >= 0) {
312 CLS_ERR("journal already exists");
313 return -EEXIST;
314 } else if (r != -ENOENT) {
315 return r;
316 }
317
318 r = write_key(hctx, HEADER_KEY_ORDER, order);
319 if (r < 0) {
320 return r;
321 }
322
323 r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
324 if (r < 0) {
325 return r;
326 }
327
328 r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
329 if (r < 0) {
330 return r;
331 }
332
333 uint64_t object_set = 0;
334 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
335 if (r < 0) {
336 return r;
337 }
338
339 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
340 if (r < 0) {
341 return r;
342 }
343
344 uint64_t tag_id = 0;
345 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
346 if (r < 0) {
347 return r;
348 }
349
350 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
351 if (r < 0) {
352 return r;
353 }
354 return 0;
355}
356
357/**
358 * Input:
359 * none
360 *
361 * Output:
362 * order (uint8_t)
363 * @returns 0 on success, negative error code on failure
364 */
365int journal_get_order(cls_method_context_t hctx, bufferlist *in,
366 bufferlist *out) {
367 uint8_t order;
368 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
369 if (r < 0) {
370 return r;
371 }
372
373 ::encode(order, *out);
374 return 0;
375}
376
377/**
378 * Input:
379 * none
380 *
381 * Output:
382 * order (uint8_t)
383 * @returns 0 on success, negative error code on failure
384 */
385int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
386 bufferlist *out) {
387 uint8_t splay_width;
388 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
389 if (r < 0) {
390 return r;
391 }
392
393 ::encode(splay_width, *out);
394 return 0;
395}
396
397/**
398 * Input:
399 * none
400 *
401 * Output:
402 * pool_id (int64_t)
403 * @returns 0 on success, negative error code on failure
404 */
405int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
406 bufferlist *out) {
407 int64_t pool_id;
408 int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
409 if (r < 0) {
410 return r;
411 }
412
413 ::encode(pool_id, *out);
414 return 0;
415}
416
417/**
418 * Input:
419 * none
420 *
421 * Output:
422 * object set (uint64_t)
423 * @returns 0 on success, negative error code on failure
424 */
425int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
426 bufferlist *out) {
427 uint64_t minimum_set;
428 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
429 if (r < 0) {
430 return r;
431 }
432
433 ::encode(minimum_set, *out);
434 return 0;
435}
436
437/**
438 * Input:
439 * @param object set (uint64_t)
440 *
441 * Output:
442 * @returns 0 on success, negative error code on failure
443 */
444int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
445 bufferlist *out) {
446 uint64_t object_set;
447 try {
448 bufferlist::iterator iter = in->begin();
449 ::decode(object_set, iter);
450 } catch (const buffer::error &err) {
451 CLS_ERR("failed to decode input parameters: %s", err.what());
452 return -EINVAL;
453 }
454
455 uint64_t current_active_set;
456 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
457 if (r < 0) {
458 return r;
459 }
460
461 if (current_active_set < object_set) {
462 CLS_ERR("active object set earlier than minimum: %" PRIu64
463 " < %" PRIu64, current_active_set, object_set);
464 return -EINVAL;
465 }
466
467 uint64_t current_minimum_set;
468 r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
469 if (r < 0) {
470 return r;
471 }
472
473 if (object_set == current_minimum_set) {
474 return 0;
475 } else if (object_set < current_minimum_set) {
476 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
477 object_set, current_minimum_set);
478 return -ESTALE;
479 }
480
481 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
482 if (r < 0) {
483 return r;
484 }
485 return 0;
486}
487
488/**
489 * Input:
490 * none
491 *
492 * Output:
493 * object set (uint64_t)
494 * @returns 0 on success, negative error code on failure
495 */
496int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
497 bufferlist *out) {
498 uint64_t active_set;
499 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
500 if (r < 0) {
501 return r;
502 }
503
504 ::encode(active_set, *out);
505 return 0;
506}
507
508/**
509 * Input:
510 * @param object set (uint64_t)
511 *
512 * Output:
513 * @returns 0 on success, negative error code on failure
514 */
515int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
516 bufferlist *out) {
517 uint64_t object_set;
518 try {
519 bufferlist::iterator iter = in->begin();
520 ::decode(object_set, iter);
521 } catch (const buffer::error &err) {
522 CLS_ERR("failed to decode input parameters: %s", err.what());
523 return -EINVAL;
524 }
525
526 uint64_t current_minimum_set;
527 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
528 if (r < 0) {
529 return r;
530 }
531
532 if (current_minimum_set > object_set) {
533 CLS_ERR("minimum object set later than active: %" PRIu64
534 " > %" PRIu64, current_minimum_set, object_set);
535 return -EINVAL;
536 }
537
538 uint64_t current_active_set;
539 r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
540 if (r < 0) {
541 return r;
542 }
543
544 if (object_set == current_active_set) {
545 return 0;
546 } else if (object_set < current_active_set) {
547 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
548 object_set, current_active_set);
549 return -ESTALE;
550 }
551
552 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
553 if (r < 0) {
554 return r;
555 }
556 return 0;
557}
558
559/**
560 * Input:
561 * @param id (string) - unique client id
562 *
563 * Output:
564 * cls::journal::Client
565 * @returns 0 on success, negative error code on failure
566 */
567int journal_get_client(cls_method_context_t hctx, bufferlist *in,
568 bufferlist *out) {
569 std::string id;
570 try {
571 bufferlist::iterator iter = in->begin();
572 ::decode(id, iter);
573 } catch (const buffer::error &err) {
574 CLS_ERR("failed to decode input parameters: %s", err.what());
575 return -EINVAL;
576 }
577
578 std::string key(key_from_client_id(id));
579 cls::journal::Client client;
580 int r = read_key(hctx, key, &client);
581 if (r < 0) {
582 return r;
583 }
584
585 ::encode(client, *out);
586 return 0;
587}
588
589/**
590 * Input:
591 * @param id (string) - unique client id
592 * @param data (bufferlist) - opaque data associated to client
593 *
594 * Output:
595 * @returns 0 on success, negative error code on failure
596 */
597int journal_client_register(cls_method_context_t hctx, bufferlist *in,
598 bufferlist *out) {
599 std::string id;
600 bufferlist data;
601 try {
602 bufferlist::iterator iter = in->begin();
603 ::decode(id, iter);
604 ::decode(data, iter);
605 } catch (const buffer::error &err) {
606 CLS_ERR("failed to decode input parameters: %s", err.what());
607 return -EINVAL;
608 }
609
610 uint8_t order;
611 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
612 if (r < 0) {
613 return r;
614 }
615
616 std::string key(key_from_client_id(id));
617 bufferlist stored_clientbl;
618 r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
619 if (r >= 0) {
620 CLS_ERR("duplicate client id: %s", id.c_str());
621 return -EEXIST;
622 } else if (r != -ENOENT) {
623 return r;
624 }
625
626 cls::journal::ObjectSetPosition minset;
627 r = find_min_commit_position(hctx, &minset);
628 if (r < 0)
629 return r;
630
631 cls::journal::Client client(id, data, minset);
632 r = write_key(hctx, key, client);
633 if (r < 0) {
634 return r;
635 }
636 return 0;
637}
638
639/**
640 * Input:
641 * @param id (string) - unique client id
642 * @param data (bufferlist) - opaque data associated to client
643 *
644 * Output:
645 * @returns 0 on success, negative error code on failure
646 */
647int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
648 bufferlist *out) {
649 std::string id;
650 bufferlist data;
651 try {
652 bufferlist::iterator iter = in->begin();
653 ::decode(id, iter);
654 ::decode(data, iter);
655 } catch (const buffer::error &err) {
656 CLS_ERR("failed to decode input parameters: %s", err.what());
657 return -EINVAL;
658 }
659
660 std::string key(key_from_client_id(id));
661 cls::journal::Client client;
662 int r = read_key(hctx, key, &client);
663 if (r < 0) {
664 return r;
665 }
666
667 client.data = data;
668 r = write_key(hctx, key, client);
669 if (r < 0) {
670 return r;
671 }
672 return 0;
673}
674
675/**
676 * Input:
677 * @param id (string) - unique client id
678 * @param state (uint8_t) - client state
679 *
680 * Output:
681 * @returns 0 on success, negative error code on failure
682 */
683int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
684 bufferlist *out) {
685 std::string id;
686 cls::journal::ClientState state;
687 bufferlist data;
688 try {
689 bufferlist::iterator iter = in->begin();
690 ::decode(id, iter);
691 uint8_t state_raw;
692 ::decode(state_raw, iter);
693 state = static_cast<cls::journal::ClientState>(state_raw);
694 } catch (const buffer::error &err) {
695 CLS_ERR("failed to decode input parameters: %s", err.what());
696 return -EINVAL;
697 }
698
699 std::string key(key_from_client_id(id));
700 cls::journal::Client client;
701 int r = read_key(hctx, key, &client);
702 if (r < 0) {
703 return r;
704 }
705
706 client.state = state;
707 r = write_key(hctx, key, client);
708 if (r < 0) {
709 return r;
710 }
711 return 0;
712}
713
714/**
715 * Input:
716 * @param id (string) - unique client id
717 *
718 * Output:
719 * @returns 0 on success, negative error code on failure
720 */
721int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
722 bufferlist *out) {
723 std::string id;
724 try {
725 bufferlist::iterator iter = in->begin();
726 ::decode(id, iter);
727 } catch (const buffer::error &err) {
728 CLS_ERR("failed to decode input parameters: %s", err.what());
729 return -EINVAL;
730 }
731
732 std::string key(key_from_client_id(id));
733 bufferlist bl;
734 int r = cls_cxx_map_get_val(hctx, key, &bl);
735 if (r < 0) {
736 CLS_ERR("client is not registered: %s", id.c_str());
737 return r;
738 }
739
740 r = cls_cxx_map_remove_key(hctx, key);
741 if (r < 0) {
742 CLS_ERR("failed to remove omap key: %s", key.c_str());
743 return r;
744 }
745
746 // prune expired tags
747 r = expire_tags(hctx, &id);
748 if (r < 0) {
749 return r;
750 }
751 return 0;
752}
753
754/**
755 * Input:
756 * @param client_id (uint64_t) - unique client id
757 * @param commit_position (ObjectSetPosition)
758 *
759 * Output:
760 * @returns 0 on success, negative error code on failure
761 */
762int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
763 bufferlist *out) {
764 std::string id;
765 cls::journal::ObjectSetPosition commit_position;
766 try {
767 bufferlist::iterator iter = in->begin();
768 ::decode(id, iter);
769 ::decode(commit_position, iter);
770 } catch (const buffer::error &err) {
771 CLS_ERR("failed to decode input parameters: %s", err.what());
772 return -EINVAL;
773 }
774
775 uint8_t splay_width;
776 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
777 if (r < 0) {
778 return r;
779 }
780 if (commit_position.object_positions.size() > splay_width) {
781 CLS_ERR("too many object positions");
782 return -EINVAL;
783 }
784
785 std::string key(key_from_client_id(id));
786 cls::journal::Client client;
787 r = read_key(hctx, key, &client);
788 if (r < 0) {
789 return r;
790 }
791
792 if (client.commit_position == commit_position) {
793 return 0;
794 }
795
796 client.commit_position = commit_position;
797 r = write_key(hctx, key, client);
798 if (r < 0) {
799 return r;
800 }
801 return 0;
802}
803
804/**
805 * Input:
806 * @param start_after (string)
807 * @param max_return (uint64_t)
808 *
809 * Output:
810 * clients (set<cls::journal::Client>) - collection of registered clients
811 * @returns 0 on success, negative error code on failure
812 */
813int journal_client_list(cls_method_context_t hctx, bufferlist *in,
814 bufferlist *out) {
815 std::string start_after;
816 uint64_t max_return;
817 try {
818 bufferlist::iterator iter = in->begin();
819 ::decode(start_after, iter);
820 ::decode(max_return, iter);
821 } catch (const buffer::error &err) {
822 CLS_ERR("failed to decode input parameters: %s", err.what());
823 return -EINVAL;
824 }
825
826 std::set<cls::journal::Client> clients;
827 int r = get_client_list_range(hctx, &clients, start_after, max_return);
828 if (r < 0)
829 return r;
830
831 ::encode(clients, *out);
832 return 0;
833}
834
835/**
836 * Input:
837 * none
838 *
839 * Output:
840 * @returns 0 on success, negative error code on failure
841 */
842int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
843 bufferlist *out) {
844 uint64_t tag_tid;
845 int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
846 if (r < 0) {
847 return r;
848 }
849
850 ::encode(tag_tid, *out);
851 return 0;
852}
853
854/**
855 * Input:
856 * @param tag_tid (uint64_t)
857 *
858 * Output:
859 * cls::journal::Tag
860 * @returns 0 on success, negative error code on failure
861 */
862int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
863 bufferlist *out) {
864 uint64_t tag_tid;
865 try {
866 bufferlist::iterator iter = in->begin();
867 ::decode(tag_tid, iter);
868 } catch (const buffer::error &err) {
869 CLS_ERR("failed to decode input parameters: %s", err.what());
870 return -EINVAL;
871 }
872
873 std::string key(key_from_tag_tid(tag_tid));
874 cls::journal::Tag tag;
875 int r = read_key(hctx, key, &tag);
876 if (r < 0) {
877 return r;
878 }
879
880 ::encode(tag, *out);
881 return 0;
882}
883
884/**
885 * Input:
886 * @param tag_tid (uint64_t)
887 * @param tag_class (uint64_t)
888 * @param data (bufferlist)
889 *
890 * Output:
891 * @returns 0 on success, negative error code on failure
892 */
893int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
894 bufferlist *out) {
895 uint64_t tag_tid;
896 uint64_t tag_class;
897 bufferlist data;
898 try {
899 bufferlist::iterator iter = in->begin();
900 ::decode(tag_tid, iter);
901 ::decode(tag_class, iter);
902 ::decode(data, iter);
903 } catch (const buffer::error &err) {
904 CLS_ERR("failed to decode input parameters: %s", err.what());
905 return -EINVAL;
906 }
907
908 std::string key(key_from_tag_tid(tag_tid));
909 bufferlist stored_tag_bl;
910 int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
911 if (r >= 0) {
912 CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
913 return -EEXIST;
914 } else if (r != -ENOENT) {
915 return r;
916 }
917
918 // verify tag tid ordering
919 uint64_t next_tag_tid;
920 r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
921 if (r < 0) {
922 return r;
923 }
924 if (tag_tid != next_tag_tid) {
925 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
926 return -ESTALE;
927 }
928
929 uint64_t next_tag_class;
930 r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
931 if (r < 0) {
932 return r;
933 }
934
935 if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
936 // allocate a new tag class
937 tag_class = next_tag_class;
938 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
939 if (r < 0) {
940 return r;
941 }
942 } else {
943 // verify tag class range
944 if (tag_class >= next_tag_class) {
945 CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
946 return -EINVAL;
947 }
948 }
949
950 // prune expired tags
951 r = expire_tags(hctx, nullptr);
952 if (r < 0) {
953 return r;
954 }
955
956 // update tag tid sequence
957 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
958 if (r < 0) {
959 return r;
960 }
961
962 // write tag structure
963 cls::journal::Tag tag(tag_tid, tag_class, data);
964 key = key_from_tag_tid(tag_tid);
965 r = write_key(hctx, key, tag);
966 if (r < 0) {
967 return r;
968 }
969 return 0;
970}
971
972/**
973 * Input:
974 * @param start_after_tag_tid (uint64_t) - first tag tid
975 * @param max_return (uint64_t) - max tags to return
976 * @param client_id (std::string) - client id filter
977 * @param tag_class (boost::optional<uint64_t> - optional tag class filter
978 *
979 * Output:
980 * std::set<cls::journal::Tag> - collection of tags
981 * @returns 0 on success, negative error code on failure
982 */
983int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
984 bufferlist *out) {
985 uint64_t start_after_tag_tid;
986 uint64_t max_return;
987 std::string client_id;
988 boost::optional<uint64_t> tag_class(0);
989
990 // handle compiler false positive about use-before-init
991 tag_class = boost::none;
992 try {
993 bufferlist::iterator iter = in->begin();
994 ::decode(start_after_tag_tid, iter);
995 ::decode(max_return, iter);
996 ::decode(client_id, iter);
997 ::decode(tag_class, iter);
998 } catch (const buffer::error &err) {
999 CLS_ERR("failed to decode input parameters: %s", err.what());
1000 return -EINVAL;
1001 }
1002
1003 // calculate the minimum tag within client's commit position
1004 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
1005 cls::journal::Client client;
1006 int r = read_key(hctx, key_from_client_id(client_id), &client);
1007 if (r < 0) {
1008 return r;
1009 }
1010
1011 for (auto object_position : client.commit_position.object_positions) {
1012 minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
1013 }
1014
1015 // compute minimum tags in use per-class
1016 std::set<cls::journal::Tag> tags;
1017 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
1018 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
1019 TAG_PASS_LIST,
1020 TAG_PASS_DONE } TagPass;
1021 int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
1022 TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
1023 std::string last_read = HEADER_KEY_TAG_PREFIX;
1024 do {
1025 std::map<std::string, bufferlist> vals;
c07f9fc5 1026 bool more;
7c673cae 1027 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
c07f9fc5 1028 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
1029 if (r < 0 && r != -ENOENT) {
1030 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
1031 return r;
1032 }
1033
1034 for (auto &val : vals) {
1035 cls::journal::Tag tag;
1036 bufferlist::iterator iter = val.second.begin();
1037 try {
1038 ::decode(tag, iter);
1039 } catch (const buffer::error &err) {
1040 CLS_ERR("error decoding tag: %s", val.first.c_str());
1041 return -EIO;
1042 }
1043
1044 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
1045 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
1046
1047 // completed calculation of tag class minimums
1048 if (tag.tid >= minimum_tag_tid) {
1049 vals.clear();
1050 break;
1051 }
1052 } else if (tag_pass == TAG_PASS_LIST) {
1053 if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
1054 continue;
1055 }
1056
1057 if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
1058 (!tag_class || *tag_class == tag.tag_class)) {
1059 tags.insert(tag);
1060 }
1061 if (tags.size() >= max_return) {
1062 tag_pass = TAG_PASS_DONE;
1063 }
1064 }
1065 }
1066
c07f9fc5 1067 if (tag_pass != TAG_PASS_DONE && !more) {
7c673cae
FG
1068 last_read = HEADER_KEY_TAG_PREFIX;
1069 ++tag_pass;
1070 } else if (!vals.empty()) {
1071 last_read = vals.rbegin()->first;
1072 }
1073 } while (tag_pass != TAG_PASS_DONE);
1074
1075 ::encode(tags, *out);
1076 return 0;
1077}
1078
1079/**
1080 * Input:
1081 * @param soft_max_size (uint64_t)
1082 *
1083 * Output:
1084 * @returns 0 if object size less than max, negative error code otherwise
1085 */
1086int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
1087 bufferlist *out) {
1088 uint64_t soft_max_size;
1089 try {
1090 bufferlist::iterator iter = in->begin();
1091 ::decode(soft_max_size, iter);
1092 } catch (const buffer::error &err) {
1093 CLS_ERR("failed to decode input parameters: %s", err.what());
1094 return -EINVAL;
1095 }
1096
1097 uint64_t size;
1098 time_t mtime;
1099 int r = cls_cxx_stat(hctx, &size, &mtime);
1100 if (r == -ENOENT) {
1101 return 0;
1102 } else if (r < 0) {
1103 CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
1104 return r;
1105 }
1106
1107 if (size >= soft_max_size) {
1108 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1109 size, soft_max_size);
1110 return -EOVERFLOW;
1111 }
1112 return 0;
1113}
1114
1115CLS_INIT(journal)
1116{
1117 CLS_LOG(20, "Loaded journal class!");
1118
1119 cls_handle_t h_class;
1120 cls_method_handle_t h_journal_create;
1121 cls_method_handle_t h_journal_get_order;
1122 cls_method_handle_t h_journal_get_splay_width;
1123 cls_method_handle_t h_journal_get_pool_id;
1124 cls_method_handle_t h_journal_get_minimum_set;
1125 cls_method_handle_t h_journal_set_minimum_set;
1126 cls_method_handle_t h_journal_get_active_set;
1127 cls_method_handle_t h_journal_set_active_set;
1128 cls_method_handle_t h_journal_get_client;
1129 cls_method_handle_t h_journal_client_register;
1130 cls_method_handle_t h_journal_client_update_data;
1131 cls_method_handle_t h_journal_client_update_state;
1132 cls_method_handle_t h_journal_client_unregister;
1133 cls_method_handle_t h_journal_client_commit;
1134 cls_method_handle_t h_journal_client_list;
1135 cls_method_handle_t h_journal_get_next_tag_tid;
1136 cls_method_handle_t h_journal_get_tag;
1137 cls_method_handle_t h_journal_tag_create;
1138 cls_method_handle_t h_journal_tag_list;
1139 cls_method_handle_t h_journal_object_guard_append;
1140
1141 cls_register("journal", &h_class);
1142
1143 /// methods for journal.$journal_id objects
1144 cls_register_cxx_method(h_class, "create",
1145 CLS_METHOD_RD | CLS_METHOD_WR,
1146 journal_create, &h_journal_create);
1147 cls_register_cxx_method(h_class, "get_order",
1148 CLS_METHOD_RD,
1149 journal_get_order, &h_journal_get_order);
1150 cls_register_cxx_method(h_class, "get_splay_width",
1151 CLS_METHOD_RD,
1152 journal_get_splay_width, &h_journal_get_splay_width);
1153 cls_register_cxx_method(h_class, "get_pool_id",
1154 CLS_METHOD_RD,
1155 journal_get_pool_id, &h_journal_get_pool_id);
1156 cls_register_cxx_method(h_class, "get_minimum_set",
1157 CLS_METHOD_RD,
1158 journal_get_minimum_set,
1159 &h_journal_get_minimum_set);
1160 cls_register_cxx_method(h_class, "set_minimum_set",
1161 CLS_METHOD_RD | CLS_METHOD_WR,
1162 journal_set_minimum_set,
1163 &h_journal_set_minimum_set);
1164 cls_register_cxx_method(h_class, "get_active_set",
1165 CLS_METHOD_RD,
1166 journal_get_active_set,
1167 &h_journal_get_active_set);
1168 cls_register_cxx_method(h_class, "set_active_set",
1169 CLS_METHOD_RD | CLS_METHOD_WR,
1170 journal_set_active_set,
1171 &h_journal_set_active_set);
1172
1173 cls_register_cxx_method(h_class, "get_client",
1174 CLS_METHOD_RD,
1175 journal_get_client, &h_journal_get_client);
1176 cls_register_cxx_method(h_class, "client_register",
1177 CLS_METHOD_RD | CLS_METHOD_WR,
1178 journal_client_register, &h_journal_client_register);
1179 cls_register_cxx_method(h_class, "client_update_data",
1180 CLS_METHOD_RD | CLS_METHOD_WR,
1181 journal_client_update_data,
1182 &h_journal_client_update_data);
1183 cls_register_cxx_method(h_class, "client_update_state",
1184 CLS_METHOD_RD | CLS_METHOD_WR,
1185 journal_client_update_state,
1186 &h_journal_client_update_state);
1187 cls_register_cxx_method(h_class, "client_unregister",
1188 CLS_METHOD_RD | CLS_METHOD_WR,
1189 journal_client_unregister,
1190 &h_journal_client_unregister);
1191 cls_register_cxx_method(h_class, "client_commit",
1192 CLS_METHOD_RD | CLS_METHOD_WR,
1193 journal_client_commit, &h_journal_client_commit);
1194 cls_register_cxx_method(h_class, "client_list",
1195 CLS_METHOD_RD,
1196 journal_client_list, &h_journal_client_list);
1197
1198 cls_register_cxx_method(h_class, "get_next_tag_tid",
1199 CLS_METHOD_RD,
1200 journal_get_next_tag_tid,
1201 &h_journal_get_next_tag_tid);
1202 cls_register_cxx_method(h_class, "get_tag",
1203 CLS_METHOD_RD,
1204 journal_get_tag, &h_journal_get_tag);
1205 cls_register_cxx_method(h_class, "tag_create",
1206 CLS_METHOD_RD | CLS_METHOD_WR,
1207 journal_tag_create, &h_journal_tag_create);
1208 cls_register_cxx_method(h_class, "tag_list",
1209 CLS_METHOD_RD,
1210 journal_tag_list, &h_journal_tag_list);
1211
1212 /// methods for journal_data.$journal_id.$object_id objects
1213 cls_register_cxx_method(h_class, "guard_append",
1214 CLS_METHOD_RD | CLS_METHOD_WR,
1215 journal_object_guard_append,
1216 &h_journal_object_guard_append);
1217}