]> git.proxmox.com Git - ceph.git/blame - ceph/src/cls/journal/cls_journal.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / cls / journal / cls_journal.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/int_types.h"
5#include "include/buffer.h"
6#include "include/encoding.h"
7#include "common/errno.h"
8#include "objclass/objclass.h"
9#include "cls/journal/cls_journal_types.h"
10#include <errno.h>
11#include <map>
12#include <string>
13#include <sstream>
14
15CLS_VER(1, 0)
16CLS_NAME(journal)
17
f67539c2
TL
18using std::string;
19
20using ceph::bufferlist;
21using ceph::decode;
22using ceph::encode;
23
7c673cae
FG
24namespace {
25
// upper bound on the number of omap entries requested per read call
constexpr uint64_t MAX_KEYS_READ = 64;

// omap keys of the fixed journal header fields
const std::string HEADER_KEY_ORDER("order");
const std::string HEADER_KEY_SPLAY_WIDTH("splay_width");
const std::string HEADER_KEY_POOL_ID("pool_id");
const std::string HEADER_KEY_MINIMUM_SET("minimum_set");
const std::string HEADER_KEY_ACTIVE_SET("active_set");
const std::string HEADER_KEY_NEXT_TAG_TID("next_tag_tid");
const std::string HEADER_KEY_NEXT_TAG_CLASS("next_tag_class");

// omap key prefixes for per-client and per-tag records
const std::string HEADER_KEY_CLIENT_PREFIX("client_");
const std::string HEADER_KEY_TAG_PREFIX("tag_");
37
// Render value as a 16-character, zero-padded, lowercase hexadecimal string.
std::string to_hex(uint64_t value) {
  std::ostringstream formatter;
  formatter.fill('0');
  formatter.setf(std::ios_base::hex, std::ios_base::basefield);
  formatter.width(16);
  formatter << value;
  return formatter.str();
}
43
44std::string key_from_client_id(const std::string &client_id) {
45 return HEADER_KEY_CLIENT_PREFIX + client_id;
46}
47
48std::string key_from_tag_tid(uint64_t tag_tid) {
49 return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
50}
51
52uint64_t tag_tid_from_key(const std::string &key) {
53 std::istringstream iss(key);
54 uint64_t id;
55 iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
56 return id;
57}
58
59template <typename T>
60int read_key(cls_method_context_t hctx, const string &key, T *t,
61 bool ignore_enoent = false) {
62 bufferlist bl;
63 int r = cls_cxx_map_get_val(hctx, key, &bl);
494da23a
TL
64 if (r == -ENOENT) {
65 if (ignore_enoent) {
66 r = 0;
67 }
68 return r;
7c673cae
FG
69 } else if (r < 0) {
70 CLS_ERR("failed to get omap key: %s", key.c_str());
71 return r;
72 }
73
74 try {
11fdf7f2
TL
75 auto iter = bl.cbegin();
76 decode(*t, iter);
f67539c2 77 } catch (const ceph::buffer::error &err) {
7c673cae
FG
78 CLS_ERR("failed to decode input parameters: %s", err.what());
79 return -EINVAL;
80 }
81 return 0;
82}
83
84template <typename T>
85int write_key(cls_method_context_t hctx, const string &key, const T &t) {
86 bufferlist bl;
11fdf7f2 87 encode(t, bl);
7c673cae
FG
88
89 int r = cls_cxx_map_set_val(hctx, key, &bl);
90 if (r < 0) {
91 CLS_ERR("failed to set omap key: %s", key.c_str());
92 return r;
93 }
94 return 0;
95}
96
97int remove_key(cls_method_context_t hctx, const string &key) {
98 int r = cls_cxx_map_remove_key(hctx, key);
99 if (r < 0 && r != -ENOENT) {
100 CLS_ERR("failed to remove key: %s", key.c_str());
101 return r;
102 }
103 return 0;
104}
105
106int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
107
108 std::string skip_client_key;
109 if (skip_client_id != nullptr) {
110 skip_client_key = key_from_client_id(*skip_client_id);
111 }
112
7c673cae 113 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
b32b8144 114 std::string last_read = "";
c07f9fc5 115 bool more;
7c673cae
FG
116 do {
117 std::map<std::string, bufferlist> vals;
c07f9fc5
FG
118 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
119 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
120 if (r < 0 && r != -ENOENT) {
121 CLS_ERR("failed to retrieve registered clients: %s",
122 cpp_strerror(r).c_str());
123 return r;
124 }
125
126 for (auto &val : vals) {
127 // if we are removing a client, skip its commit positions
128 if (val.first == skip_client_key) {
129 continue;
130 }
131
132 cls::journal::Client client;
11fdf7f2 133 auto iter = val.second.cbegin();
7c673cae 134 try {
11fdf7f2 135 decode(client, iter);
f67539c2 136 } catch (const ceph::buffer::error &err) {
7c673cae
FG
137 CLS_ERR("error decoding registered client: %s",
138 val.first.c_str());
139 return -EIO;
140 }
141
b32b8144
FG
142 if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
143 // don't allow a disconnected client to prevent pruning
144 continue;
145 } else if (client.commit_position.object_positions.empty()) {
146 // cannot prune if one or more clients has an empty commit history
147 return 0;
148 }
149
7c673cae 150 for (auto object_position : client.commit_position.object_positions) {
11fdf7f2 151 minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid);
7c673cae
FG
152 }
153 }
154 if (!vals.empty()) {
155 last_read = vals.rbegin()->first;
156 }
c07f9fc5 157 } while (more);
7c673cae
FG
158
159 // cannot expire tags if a client hasn't committed yet
160 if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
161 return 0;
162 }
163
164 // compute the minimum in-use tag for each class
165 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
166 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
167 TAG_PASS_SCRUB,
168 TAG_PASS_DONE } TagPass;
169 int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
170 last_read = HEADER_KEY_TAG_PREFIX;
171 do {
172 std::map<std::string, bufferlist> vals;
c07f9fc5
FG
173 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
174 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
175 if (r < 0 && r != -ENOENT) {
176 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
177 return r;
178 }
179
180 for (auto &val : vals) {
181 cls::journal::Tag tag;
11fdf7f2 182 auto iter = val.second.cbegin();
7c673cae 183 try {
11fdf7f2 184 decode(tag, iter);
f67539c2 185 } catch (const ceph::buffer::error &err) {
7c673cae
FG
186 CLS_ERR("error decoding tag: %s", val.first.c_str());
187 return -EIO;
188 }
189
190 if (tag.tid != tag_tid_from_key(val.first)) {
191 CLS_ERR("tag tid mismatched: %s", val.first.c_str());
192 return -EINVAL;
193 }
194
195 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
196 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
197 } else if (tag_pass == TAG_PASS_SCRUB &&
198 tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
199 r = remove_key(hctx, val.first);
200 if (r < 0) {
201 return r;
202 }
203 }
204
205 if (tag.tid >= minimum_tag_tid) {
206 // no need to check for tag classes beyond this point
207 vals.clear();
3efd9988 208 more = false;
7c673cae
FG
209 break;
210 }
211 }
212
c07f9fc5 213 if (tag_pass != TAG_PASS_DONE && !more) {
7c673cae
FG
214 last_read = HEADER_KEY_TAG_PREFIX;
215 ++tag_pass;
216 } else if (!vals.empty()) {
217 last_read = vals.rbegin()->first;
218 }
219 } while (tag_pass != TAG_PASS_DONE);
220 return 0;
221}
222
223int get_client_list_range(cls_method_context_t hctx,
224 std::set<cls::journal::Client> *clients,
225 std::string start_after, uint64_t max_return) {
226 std::string last_read;
227 if (!start_after.empty()) {
228 last_read = key_from_client_id(start_after);
229 }
230
231 std::map<std::string, bufferlist> vals;
c07f9fc5 232 bool more;
7c673cae 233 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
c07f9fc5 234 max_return, &vals, &more);
7c673cae
FG
235 if (r < 0) {
236 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
237 return r;
238 }
239
240 for (std::map<std::string, bufferlist>::iterator it = vals.begin();
241 it != vals.end(); ++it) {
242 try {
11fdf7f2 243 auto iter = it->second.cbegin();
7c673cae
FG
244
245 cls::journal::Client client;
11fdf7f2 246 decode(client, iter);
7c673cae 247 clients->insert(client);
f67539c2 248 } catch (const ceph::buffer::error &err) {
7c673cae
FG
249 CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
250 err.what());
251 return -EIO;
252 }
253 }
254
255 return 0;
256}
257
258int find_min_commit_position(cls_method_context_t hctx,
259 cls::journal::ObjectSetPosition *minset) {
260 int r;
261 bool valid = false;
262 std::string start_after = "";
263 uint64_t tag_tid = 0, entry_tid = 0;
264
265 while (true) {
266 std::set<cls::journal::Client> batch;
267
268 r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
269 if ((r < 0) || batch.empty()) {
270 break;
271 }
272
273 start_after = batch.rbegin()->id;
274
275 // update the (minimum) commit position from this batch of clients
276 for(std::set<cls::journal::Client>::iterator it = batch.begin();
277 it != batch.end(); ++it) {
278 cls::journal::ObjectSetPosition object_set_position = (*it).commit_position;
279 if (object_set_position.object_positions.empty()) {
280 *minset = cls::journal::ObjectSetPosition();
281 break;
282 }
283 cls::journal::ObjectPosition first = object_set_position.object_positions.front();
284
285 // least tag_tid (or least entry_tid for matching tag_tid)
286 if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
287 tag_tid = first.tag_tid;
288 entry_tid = first.entry_tid;
289 *minset = cls::journal::ObjectSetPosition(object_set_position);
290 valid = true;
291 }
292 }
293
294 // got the last batch, we're done
295 if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
296 break;
297 }
298 }
299
300 return r;
301}
302
303} // anonymous namespace
304
305/**
306 * Input:
307 * @param order (uint8_t) - bits to shift to compute the object max size
308 * @param splay width (uint8_t) - number of active journal objects
309 *
310 * Output:
311 * @returns 0 on success, negative error code on failure
312 */
313int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
314 uint8_t order;
315 uint8_t splay_width;
316 int64_t pool_id;
317 try {
11fdf7f2
TL
318 auto iter = in->cbegin();
319 decode(order, iter);
320 decode(splay_width, iter);
321 decode(pool_id, iter);
f67539c2 322 } catch (const ceph::buffer::error &err) {
7c673cae
FG
323 CLS_ERR("failed to decode input parameters: %s", err.what());
324 return -EINVAL;
325 }
326
327 bufferlist stored_orderbl;
328 int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
329 if (r >= 0) {
330 CLS_ERR("journal already exists");
331 return -EEXIST;
332 } else if (r != -ENOENT) {
333 return r;
334 }
335
336 r = write_key(hctx, HEADER_KEY_ORDER, order);
337 if (r < 0) {
338 return r;
339 }
340
341 r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
342 if (r < 0) {
343 return r;
344 }
345
346 r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
347 if (r < 0) {
348 return r;
349 }
350
351 uint64_t object_set = 0;
352 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
353 if (r < 0) {
354 return r;
355 }
356
357 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
358 if (r < 0) {
359 return r;
360 }
361
362 uint64_t tag_id = 0;
363 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
364 if (r < 0) {
365 return r;
366 }
367
368 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
369 if (r < 0) {
370 return r;
371 }
372 return 0;
373}
374
375/**
376 * Input:
377 * none
378 *
379 * Output:
380 * order (uint8_t)
381 * @returns 0 on success, negative error code on failure
382 */
383int journal_get_order(cls_method_context_t hctx, bufferlist *in,
384 bufferlist *out) {
385 uint8_t order;
386 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
387 if (r < 0) {
388 return r;
389 }
390
11fdf7f2 391 encode(order, *out);
7c673cae
FG
392 return 0;
393}
394
395/**
396 * Input:
397 * none
398 *
399 * Output:
11fdf7f2 400 * splay_width (uint8_t)
7c673cae
FG
401 * @returns 0 on success, negative error code on failure
402 */
403int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
404 bufferlist *out) {
405 uint8_t splay_width;
406 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
407 if (r < 0) {
408 return r;
409 }
410
11fdf7f2 411 encode(splay_width, *out);
7c673cae
FG
412 return 0;
413}
414
415/**
416 * Input:
417 * none
418 *
419 * Output:
420 * pool_id (int64_t)
421 * @returns 0 on success, negative error code on failure
422 */
423int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
424 bufferlist *out) {
11fdf7f2 425 int64_t pool_id = 0;
7c673cae
FG
426 int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
427 if (r < 0) {
428 return r;
429 }
430
11fdf7f2 431 encode(pool_id, *out);
7c673cae
FG
432 return 0;
433}
434
435/**
436 * Input:
437 * none
438 *
439 * Output:
440 * object set (uint64_t)
441 * @returns 0 on success, negative error code on failure
442 */
443int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
444 bufferlist *out) {
445 uint64_t minimum_set;
446 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
447 if (r < 0) {
448 return r;
449 }
450
11fdf7f2 451 encode(minimum_set, *out);
7c673cae
FG
452 return 0;
453}
454
455/**
456 * Input:
457 * @param object set (uint64_t)
458 *
459 * Output:
460 * @returns 0 on success, negative error code on failure
461 */
462int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
463 bufferlist *out) {
464 uint64_t object_set;
465 try {
11fdf7f2
TL
466 auto iter = in->cbegin();
467 decode(object_set, iter);
f67539c2 468 } catch (const ceph::buffer::error &err) {
7c673cae
FG
469 CLS_ERR("failed to decode input parameters: %s", err.what());
470 return -EINVAL;
471 }
472
473 uint64_t current_active_set;
474 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
475 if (r < 0) {
476 return r;
477 }
478
479 if (current_active_set < object_set) {
494da23a 480 CLS_LOG(10, "active object set earlier than minimum: %" PRIu64
7c673cae
FG
481 " < %" PRIu64, current_active_set, object_set);
482 return -EINVAL;
483 }
484
485 uint64_t current_minimum_set;
486 r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
487 if (r < 0) {
488 return r;
489 }
490
491 if (object_set == current_minimum_set) {
492 return 0;
493 } else if (object_set < current_minimum_set) {
494 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
495 object_set, current_minimum_set);
496 return -ESTALE;
497 }
498
499 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
500 if (r < 0) {
501 return r;
502 }
503 return 0;
504}
505
506/**
507 * Input:
508 * none
509 *
510 * Output:
511 * object set (uint64_t)
512 * @returns 0 on success, negative error code on failure
513 */
514int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
515 bufferlist *out) {
516 uint64_t active_set;
517 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
518 if (r < 0) {
519 return r;
520 }
521
11fdf7f2 522 encode(active_set, *out);
7c673cae
FG
523 return 0;
524}
525
526/**
527 * Input:
528 * @param object set (uint64_t)
529 *
530 * Output:
531 * @returns 0 on success, negative error code on failure
532 */
533int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
534 bufferlist *out) {
535 uint64_t object_set;
536 try {
11fdf7f2
TL
537 auto iter = in->cbegin();
538 decode(object_set, iter);
f67539c2 539 } catch (const ceph::buffer::error &err) {
7c673cae
FG
540 CLS_ERR("failed to decode input parameters: %s", err.what());
541 return -EINVAL;
542 }
543
544 uint64_t current_minimum_set;
545 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
546 if (r < 0) {
547 return r;
548 }
549
550 if (current_minimum_set > object_set) {
551 CLS_ERR("minimum object set later than active: %" PRIu64
552 " > %" PRIu64, current_minimum_set, object_set);
553 return -EINVAL;
554 }
555
556 uint64_t current_active_set;
557 r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
558 if (r < 0) {
559 return r;
560 }
561
562 if (object_set == current_active_set) {
563 return 0;
564 } else if (object_set < current_active_set) {
565 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
566 object_set, current_active_set);
567 return -ESTALE;
568 }
569
570 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
571 if (r < 0) {
572 return r;
573 }
574 return 0;
575}
576
577/**
578 * Input:
579 * @param id (string) - unique client id
580 *
581 * Output:
582 * cls::journal::Client
583 * @returns 0 on success, negative error code on failure
584 */
585int journal_get_client(cls_method_context_t hctx, bufferlist *in,
586 bufferlist *out) {
587 std::string id;
588 try {
11fdf7f2
TL
589 auto iter = in->cbegin();
590 decode(id, iter);
f67539c2 591 } catch (const ceph::buffer::error &err) {
7c673cae
FG
592 CLS_ERR("failed to decode input parameters: %s", err.what());
593 return -EINVAL;
594 }
595
596 std::string key(key_from_client_id(id));
597 cls::journal::Client client;
598 int r = read_key(hctx, key, &client);
599 if (r < 0) {
600 return r;
601 }
602
11fdf7f2 603 encode(client, *out);
7c673cae
FG
604 return 0;
605}
606
607/**
608 * Input:
609 * @param id (string) - unique client id
610 * @param data (bufferlist) - opaque data associated to client
611 *
612 * Output:
613 * @returns 0 on success, negative error code on failure
614 */
615int journal_client_register(cls_method_context_t hctx, bufferlist *in,
616 bufferlist *out) {
617 std::string id;
618 bufferlist data;
619 try {
11fdf7f2
TL
620 auto iter = in->cbegin();
621 decode(id, iter);
622 decode(data, iter);
f67539c2 623 } catch (const ceph::buffer::error &err) {
7c673cae
FG
624 CLS_ERR("failed to decode input parameters: %s", err.what());
625 return -EINVAL;
626 }
627
628 uint8_t order;
629 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
630 if (r < 0) {
631 return r;
632 }
633
634 std::string key(key_from_client_id(id));
635 bufferlist stored_clientbl;
636 r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
637 if (r >= 0) {
638 CLS_ERR("duplicate client id: %s", id.c_str());
639 return -EEXIST;
640 } else if (r != -ENOENT) {
641 return r;
642 }
643
644 cls::journal::ObjectSetPosition minset;
645 r = find_min_commit_position(hctx, &minset);
646 if (r < 0)
647 return r;
648
649 cls::journal::Client client(id, data, minset);
650 r = write_key(hctx, key, client);
651 if (r < 0) {
652 return r;
653 }
654 return 0;
655}
656
657/**
658 * Input:
659 * @param id (string) - unique client id
660 * @param data (bufferlist) - opaque data associated to client
661 *
662 * Output:
663 * @returns 0 on success, negative error code on failure
664 */
665int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
666 bufferlist *out) {
667 std::string id;
668 bufferlist data;
669 try {
11fdf7f2
TL
670 auto iter = in->cbegin();
671 decode(id, iter);
672 decode(data, iter);
f67539c2 673 } catch (const ceph::buffer::error &err) {
7c673cae
FG
674 CLS_ERR("failed to decode input parameters: %s", err.what());
675 return -EINVAL;
676 }
677
678 std::string key(key_from_client_id(id));
679 cls::journal::Client client;
680 int r = read_key(hctx, key, &client);
681 if (r < 0) {
682 return r;
683 }
684
685 client.data = data;
686 r = write_key(hctx, key, client);
687 if (r < 0) {
688 return r;
689 }
690 return 0;
691}
692
693/**
694 * Input:
695 * @param id (string) - unique client id
696 * @param state (uint8_t) - client state
697 *
698 * Output:
699 * @returns 0 on success, negative error code on failure
700 */
701int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
702 bufferlist *out) {
703 std::string id;
704 cls::journal::ClientState state;
705 bufferlist data;
706 try {
11fdf7f2
TL
707 auto iter = in->cbegin();
708 decode(id, iter);
7c673cae 709 uint8_t state_raw;
11fdf7f2 710 decode(state_raw, iter);
7c673cae 711 state = static_cast<cls::journal::ClientState>(state_raw);
f67539c2 712 } catch (const ceph::buffer::error &err) {
7c673cae
FG
713 CLS_ERR("failed to decode input parameters: %s", err.what());
714 return -EINVAL;
715 }
716
717 std::string key(key_from_client_id(id));
718 cls::journal::Client client;
719 int r = read_key(hctx, key, &client);
720 if (r < 0) {
721 return r;
722 }
723
724 client.state = state;
725 r = write_key(hctx, key, client);
726 if (r < 0) {
727 return r;
728 }
729 return 0;
730}
731
732/**
733 * Input:
734 * @param id (string) - unique client id
735 *
736 * Output:
737 * @returns 0 on success, negative error code on failure
738 */
739int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
740 bufferlist *out) {
741 std::string id;
742 try {
11fdf7f2
TL
743 auto iter = in->cbegin();
744 decode(id, iter);
f67539c2 745 } catch (const ceph::buffer::error &err) {
7c673cae
FG
746 CLS_ERR("failed to decode input parameters: %s", err.what());
747 return -EINVAL;
748 }
749
750 std::string key(key_from_client_id(id));
751 bufferlist bl;
752 int r = cls_cxx_map_get_val(hctx, key, &bl);
753 if (r < 0) {
754 CLS_ERR("client is not registered: %s", id.c_str());
755 return r;
756 }
757
758 r = cls_cxx_map_remove_key(hctx, key);
759 if (r < 0) {
760 CLS_ERR("failed to remove omap key: %s", key.c_str());
761 return r;
762 }
763
764 // prune expired tags
765 r = expire_tags(hctx, &id);
766 if (r < 0) {
767 return r;
768 }
769 return 0;
770}
771
772/**
773 * Input:
774 * @param client_id (uint64_t) - unique client id
775 * @param commit_position (ObjectSetPosition)
776 *
777 * Output:
778 * @returns 0 on success, negative error code on failure
779 */
780int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
781 bufferlist *out) {
782 std::string id;
783 cls::journal::ObjectSetPosition commit_position;
784 try {
11fdf7f2
TL
785 auto iter = in->cbegin();
786 decode(id, iter);
787 decode(commit_position, iter);
f67539c2 788 } catch (const ceph::buffer::error &err) {
7c673cae
FG
789 CLS_ERR("failed to decode input parameters: %s", err.what());
790 return -EINVAL;
791 }
792
793 uint8_t splay_width;
794 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
795 if (r < 0) {
796 return r;
797 }
798 if (commit_position.object_positions.size() > splay_width) {
799 CLS_ERR("too many object positions");
800 return -EINVAL;
801 }
802
803 std::string key(key_from_client_id(id));
804 cls::journal::Client client;
805 r = read_key(hctx, key, &client);
806 if (r < 0) {
807 return r;
808 }
809
810 if (client.commit_position == commit_position) {
811 return 0;
812 }
813
814 client.commit_position = commit_position;
815 r = write_key(hctx, key, client);
816 if (r < 0) {
817 return r;
818 }
819 return 0;
820}
821
822/**
823 * Input:
824 * @param start_after (string)
825 * @param max_return (uint64_t)
826 *
827 * Output:
828 * clients (set<cls::journal::Client>) - collection of registered clients
829 * @returns 0 on success, negative error code on failure
830 */
831int journal_client_list(cls_method_context_t hctx, bufferlist *in,
832 bufferlist *out) {
833 std::string start_after;
834 uint64_t max_return;
835 try {
11fdf7f2
TL
836 auto iter = in->cbegin();
837 decode(start_after, iter);
838 decode(max_return, iter);
f67539c2 839 } catch (const ceph::buffer::error &err) {
7c673cae
FG
840 CLS_ERR("failed to decode input parameters: %s", err.what());
841 return -EINVAL;
842 }
843
844 std::set<cls::journal::Client> clients;
845 int r = get_client_list_range(hctx, &clients, start_after, max_return);
846 if (r < 0)
847 return r;
848
11fdf7f2 849 encode(clients, *out);
7c673cae
FG
850 return 0;
851}
852
853/**
854 * Input:
855 * none
856 *
857 * Output:
858 * @returns 0 on success, negative error code on failure
859 */
860int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
861 bufferlist *out) {
862 uint64_t tag_tid;
863 int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
864 if (r < 0) {
865 return r;
866 }
867
11fdf7f2 868 encode(tag_tid, *out);
7c673cae
FG
869 return 0;
870}
871
872/**
873 * Input:
874 * @param tag_tid (uint64_t)
875 *
876 * Output:
877 * cls::journal::Tag
878 * @returns 0 on success, negative error code on failure
879 */
880int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
881 bufferlist *out) {
882 uint64_t tag_tid;
883 try {
11fdf7f2
TL
884 auto iter = in->cbegin();
885 decode(tag_tid, iter);
f67539c2 886 } catch (const ceph::buffer::error &err) {
7c673cae
FG
887 CLS_ERR("failed to decode input parameters: %s", err.what());
888 return -EINVAL;
889 }
890
891 std::string key(key_from_tag_tid(tag_tid));
892 cls::journal::Tag tag;
893 int r = read_key(hctx, key, &tag);
894 if (r < 0) {
895 return r;
896 }
897
11fdf7f2 898 encode(tag, *out);
7c673cae
FG
899 return 0;
900}
901
902/**
903 * Input:
904 * @param tag_tid (uint64_t)
905 * @param tag_class (uint64_t)
906 * @param data (bufferlist)
907 *
908 * Output:
909 * @returns 0 on success, negative error code on failure
910 */
911int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
912 bufferlist *out) {
913 uint64_t tag_tid;
914 uint64_t tag_class;
915 bufferlist data;
916 try {
11fdf7f2
TL
917 auto iter = in->cbegin();
918 decode(tag_tid, iter);
919 decode(tag_class, iter);
920 decode(data, iter);
f67539c2 921 } catch (const ceph::buffer::error &err) {
7c673cae
FG
922 CLS_ERR("failed to decode input parameters: %s", err.what());
923 return -EINVAL;
924 }
925
926 std::string key(key_from_tag_tid(tag_tid));
927 bufferlist stored_tag_bl;
928 int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
929 if (r >= 0) {
930 CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
931 return -EEXIST;
932 } else if (r != -ENOENT) {
933 return r;
934 }
935
936 // verify tag tid ordering
937 uint64_t next_tag_tid;
938 r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
939 if (r < 0) {
940 return r;
941 }
942 if (tag_tid != next_tag_tid) {
943 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
944 return -ESTALE;
945 }
946
947 uint64_t next_tag_class;
948 r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
949 if (r < 0) {
950 return r;
951 }
952
953 if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
954 // allocate a new tag class
955 tag_class = next_tag_class;
956 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
957 if (r < 0) {
958 return r;
959 }
960 } else {
961 // verify tag class range
962 if (tag_class >= next_tag_class) {
963 CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
964 return -EINVAL;
965 }
966 }
967
968 // prune expired tags
969 r = expire_tags(hctx, nullptr);
970 if (r < 0) {
971 return r;
972 }
973
974 // update tag tid sequence
975 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
976 if (r < 0) {
977 return r;
978 }
979
980 // write tag structure
981 cls::journal::Tag tag(tag_tid, tag_class, data);
982 key = key_from_tag_tid(tag_tid);
983 r = write_key(hctx, key, tag);
984 if (r < 0) {
985 return r;
986 }
987 return 0;
988}
989
990/**
991 * Input:
992 * @param start_after_tag_tid (uint64_t) - first tag tid
993 * @param max_return (uint64_t) - max tags to return
994 * @param client_id (std::string) - client id filter
995 * @param tag_class (boost::optional<uint64_t> - optional tag class filter
996 *
997 * Output:
998 * std::set<cls::journal::Tag> - collection of tags
999 * @returns 0 on success, negative error code on failure
1000 */
1001int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
1002 bufferlist *out) {
1003 uint64_t start_after_tag_tid;
1004 uint64_t max_return;
1005 std::string client_id;
1006 boost::optional<uint64_t> tag_class(0);
1007
1008 // handle compiler false positive about use-before-init
1009 tag_class = boost::none;
1010 try {
11fdf7f2
TL
1011 auto iter = in->cbegin();
1012 decode(start_after_tag_tid, iter);
1013 decode(max_return, iter);
1014 decode(client_id, iter);
1015 decode(tag_class, iter);
f67539c2 1016 } catch (const ceph::buffer::error &err) {
7c673cae
FG
1017 CLS_ERR("failed to decode input parameters: %s", err.what());
1018 return -EINVAL;
1019 }
1020
1021 // calculate the minimum tag within client's commit position
1022 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
1023 cls::journal::Client client;
1024 int r = read_key(hctx, key_from_client_id(client_id), &client);
1025 if (r < 0) {
1026 return r;
1027 }
1028
1029 for (auto object_position : client.commit_position.object_positions) {
11fdf7f2 1030 minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid);
7c673cae
FG
1031 }
1032
1033 // compute minimum tags in use per-class
1034 std::set<cls::journal::Tag> tags;
1035 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
1036 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
1037 TAG_PASS_LIST,
1038 TAG_PASS_DONE } TagPass;
1039 int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
1040 TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
1041 std::string last_read = HEADER_KEY_TAG_PREFIX;
1042 do {
1043 std::map<std::string, bufferlist> vals;
c07f9fc5 1044 bool more;
7c673cae 1045 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
c07f9fc5 1046 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
1047 if (r < 0 && r != -ENOENT) {
1048 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
1049 return r;
1050 }
1051
1052 for (auto &val : vals) {
1053 cls::journal::Tag tag;
11fdf7f2 1054 auto iter = val.second.cbegin();
7c673cae 1055 try {
11fdf7f2 1056 decode(tag, iter);
f67539c2 1057 } catch (const ceph::buffer::error &err) {
7c673cae
FG
1058 CLS_ERR("error decoding tag: %s", val.first.c_str());
1059 return -EIO;
1060 }
1061
1062 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
1063 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
1064
1065 // completed calculation of tag class minimums
1066 if (tag.tid >= minimum_tag_tid) {
1067 vals.clear();
3efd9988 1068 more = false;
7c673cae
FG
1069 break;
1070 }
1071 } else if (tag_pass == TAG_PASS_LIST) {
1072 if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
1073 continue;
1074 }
1075
1076 if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
1077 (!tag_class || *tag_class == tag.tag_class)) {
1078 tags.insert(tag);
1079 }
1080 if (tags.size() >= max_return) {
1081 tag_pass = TAG_PASS_DONE;
1082 }
1083 }
1084 }
1085
c07f9fc5 1086 if (tag_pass != TAG_PASS_DONE && !more) {
7c673cae
FG
1087 last_read = HEADER_KEY_TAG_PREFIX;
1088 ++tag_pass;
1089 } else if (!vals.empty()) {
1090 last_read = vals.rbegin()->first;
1091 }
1092 } while (tag_pass != TAG_PASS_DONE);
1093
11fdf7f2 1094 encode(tags, *out);
7c673cae
FG
1095 return 0;
1096}
1097
1098/**
1099 * Input:
1100 * @param soft_max_size (uint64_t)
1101 *
1102 * Output:
1103 * @returns 0 if object size less than max, negative error code otherwise
1104 */
1105int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
1106 bufferlist *out) {
1107 uint64_t soft_max_size;
1108 try {
11fdf7f2
TL
1109 auto iter = in->cbegin();
1110 decode(soft_max_size, iter);
f67539c2 1111 } catch (const ceph::buffer::error &err) {
7c673cae
FG
1112 CLS_ERR("failed to decode input parameters: %s", err.what());
1113 return -EINVAL;
1114 }
1115
1116 uint64_t size;
1117 time_t mtime;
1118 int r = cls_cxx_stat(hctx, &size, &mtime);
1119 if (r == -ENOENT) {
1120 return 0;
1121 } else if (r < 0) {
1122 CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
1123 return r;
1124 }
1125
1126 if (size >= soft_max_size) {
1127 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1128 size, soft_max_size);
1129 return -EOVERFLOW;
1130 }
1131 return 0;
1132}
1133
9f95a23c
TL
1134/**
1135 * Input:
1136 * @param soft_max_size (uint64_t)
1137 * @param data (bufferlist) data to append
1138 *
1139 * Output:
1140 * @returns 0 on success, negative error code on failure
1141 * @returns -EOVERFLOW if object size is equal or more than soft_max_size
1142 */
1143int journal_object_append(cls_method_context_t hctx, bufferlist *in,
1144 bufferlist *out) {
1145 uint64_t soft_max_size;
1146 bufferlist data;
1147 try {
1148 auto iter = in->cbegin();
1149 decode(soft_max_size, iter);
1150 decode(data, iter);
f67539c2 1151 } catch (const ceph::buffer::error &err) {
9f95a23c
TL
1152 CLS_ERR("failed to decode input parameters: %s", err.what());
1153 return -EINVAL;
1154 }
1155
1156 uint64_t size = 0;
1157 int r = cls_cxx_stat(hctx, &size, nullptr);
1158 if (r < 0 && r != -ENOENT) {
1159 CLS_ERR("append: failed to stat object: %s", cpp_strerror(r).c_str());
1160 return r;
1161 }
1162
1163 if (size >= soft_max_size) {
1164 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1165 size, soft_max_size);
1166 return -EOVERFLOW;
1167 }
1168
1169 auto offset = size;
1170 r = cls_cxx_write2(hctx, offset, data.length(), &data,
1171 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1172 if (r < 0) {
1173 CLS_ERR("append: error when writing: %s", cpp_strerror(r).c_str());
1174 return r;
1175 }
1176
1177 if (cls_get_min_compatible_client(hctx) < ceph_release_t::octopus) {
1178 return 0;
1179 }
1180
1181 auto min_alloc_size = cls_get_osd_min_alloc_size(hctx);
1182 if (min_alloc_size == 0) {
1183 min_alloc_size = 8;
1184 }
1185
f67539c2
TL
1186 auto stripe_width = cls_get_pool_stripe_width(hctx);
1187 if (stripe_width > 0) {
1188 min_alloc_size = round_up_to(min_alloc_size, stripe_width);
1189 }
1190
9f95a23c
TL
1191 CLS_LOG(20, "pad to %" PRIu64, min_alloc_size);
1192
1193 auto end = offset + data.length();
1194 auto new_end = round_up_to(end, min_alloc_size);
1195 if (new_end == end) {
1196 return 0;
1197 }
1198
1199 r = cls_cxx_truncate(hctx, new_end);
1200 if (r < 0) {
1201 CLS_ERR("append: error when truncating: %s", cpp_strerror(r).c_str());
1202 return r;
1203 }
1204
1205 return 0;
1206}
1207
7c673cae
FG
1208CLS_INIT(journal)
1209{
1210 CLS_LOG(20, "Loaded journal class!");
1211
1212 cls_handle_t h_class;
1213 cls_method_handle_t h_journal_create;
1214 cls_method_handle_t h_journal_get_order;
1215 cls_method_handle_t h_journal_get_splay_width;
1216 cls_method_handle_t h_journal_get_pool_id;
1217 cls_method_handle_t h_journal_get_minimum_set;
1218 cls_method_handle_t h_journal_set_minimum_set;
1219 cls_method_handle_t h_journal_get_active_set;
1220 cls_method_handle_t h_journal_set_active_set;
1221 cls_method_handle_t h_journal_get_client;
1222 cls_method_handle_t h_journal_client_register;
1223 cls_method_handle_t h_journal_client_update_data;
1224 cls_method_handle_t h_journal_client_update_state;
1225 cls_method_handle_t h_journal_client_unregister;
1226 cls_method_handle_t h_journal_client_commit;
1227 cls_method_handle_t h_journal_client_list;
1228 cls_method_handle_t h_journal_get_next_tag_tid;
1229 cls_method_handle_t h_journal_get_tag;
1230 cls_method_handle_t h_journal_tag_create;
1231 cls_method_handle_t h_journal_tag_list;
1232 cls_method_handle_t h_journal_object_guard_append;
9f95a23c 1233 cls_method_handle_t h_journal_object_append;
7c673cae
FG
1234
1235 cls_register("journal", &h_class);
1236
1237 /// methods for journal.$journal_id objects
1238 cls_register_cxx_method(h_class, "create",
1239 CLS_METHOD_RD | CLS_METHOD_WR,
1240 journal_create, &h_journal_create);
1241 cls_register_cxx_method(h_class, "get_order",
1242 CLS_METHOD_RD,
1243 journal_get_order, &h_journal_get_order);
1244 cls_register_cxx_method(h_class, "get_splay_width",
1245 CLS_METHOD_RD,
1246 journal_get_splay_width, &h_journal_get_splay_width);
1247 cls_register_cxx_method(h_class, "get_pool_id",
1248 CLS_METHOD_RD,
1249 journal_get_pool_id, &h_journal_get_pool_id);
1250 cls_register_cxx_method(h_class, "get_minimum_set",
1251 CLS_METHOD_RD,
1252 journal_get_minimum_set,
1253 &h_journal_get_minimum_set);
1254 cls_register_cxx_method(h_class, "set_minimum_set",
1255 CLS_METHOD_RD | CLS_METHOD_WR,
1256 journal_set_minimum_set,
1257 &h_journal_set_minimum_set);
1258 cls_register_cxx_method(h_class, "get_active_set",
1259 CLS_METHOD_RD,
1260 journal_get_active_set,
1261 &h_journal_get_active_set);
1262 cls_register_cxx_method(h_class, "set_active_set",
1263 CLS_METHOD_RD | CLS_METHOD_WR,
1264 journal_set_active_set,
1265 &h_journal_set_active_set);
1266
1267 cls_register_cxx_method(h_class, "get_client",
1268 CLS_METHOD_RD,
1269 journal_get_client, &h_journal_get_client);
1270 cls_register_cxx_method(h_class, "client_register",
1271 CLS_METHOD_RD | CLS_METHOD_WR,
1272 journal_client_register, &h_journal_client_register);
1273 cls_register_cxx_method(h_class, "client_update_data",
1274 CLS_METHOD_RD | CLS_METHOD_WR,
1275 journal_client_update_data,
1276 &h_journal_client_update_data);
1277 cls_register_cxx_method(h_class, "client_update_state",
1278 CLS_METHOD_RD | CLS_METHOD_WR,
1279 journal_client_update_state,
1280 &h_journal_client_update_state);
1281 cls_register_cxx_method(h_class, "client_unregister",
1282 CLS_METHOD_RD | CLS_METHOD_WR,
1283 journal_client_unregister,
1284 &h_journal_client_unregister);
1285 cls_register_cxx_method(h_class, "client_commit",
1286 CLS_METHOD_RD | CLS_METHOD_WR,
1287 journal_client_commit, &h_journal_client_commit);
1288 cls_register_cxx_method(h_class, "client_list",
1289 CLS_METHOD_RD,
1290 journal_client_list, &h_journal_client_list);
1291
1292 cls_register_cxx_method(h_class, "get_next_tag_tid",
1293 CLS_METHOD_RD,
1294 journal_get_next_tag_tid,
1295 &h_journal_get_next_tag_tid);
1296 cls_register_cxx_method(h_class, "get_tag",
1297 CLS_METHOD_RD,
1298 journal_get_tag, &h_journal_get_tag);
1299 cls_register_cxx_method(h_class, "tag_create",
1300 CLS_METHOD_RD | CLS_METHOD_WR,
1301 journal_tag_create, &h_journal_tag_create);
1302 cls_register_cxx_method(h_class, "tag_list",
1303 CLS_METHOD_RD,
1304 journal_tag_list, &h_journal_tag_list);
1305
1306 /// methods for journal_data.$journal_id.$object_id objects
1307 cls_register_cxx_method(h_class, "guard_append",
1308 CLS_METHOD_RD | CLS_METHOD_WR,
1309 journal_object_guard_append,
1310 &h_journal_object_guard_append);
9f95a23c
TL
1311 cls_register_cxx_method(h_class, "append", CLS_METHOD_RD | CLS_METHOD_WR,
1312 journal_object_append, &h_journal_object_append);
7c673cae 1313}