]> git.proxmox.com Git - ceph.git/blame - ceph/src/cls/journal/cls_journal.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / cls / journal / cls_journal.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "include/int_types.h"
5#include "include/buffer.h"
6#include "include/encoding.h"
7#include "common/errno.h"
8#include "objclass/objclass.h"
9#include "cls/journal/cls_journal_types.h"
10#include <errno.h>
11#include <map>
12#include <string>
13#include <sstream>
14
15CLS_VER(1, 0)
16CLS_NAME(journal)
17
f67539c2
TL
18using std::string;
19
20using ceph::bufferlist;
21using ceph::decode;
22using ceph::encode;
23
7c673cae
FG
24namespace {
25
// Maximum number of omap entries requested per cls_cxx_map_get_vals() call.
const uint64_t MAX_KEYS_READ = 64;

// omap keys under which the individual journal header fields are stored
const std::string HEADER_KEY_ORDER("order");
const std::string HEADER_KEY_SPLAY_WIDTH("splay_width");
const std::string HEADER_KEY_POOL_ID("pool_id");
const std::string HEADER_KEY_MINIMUM_SET("minimum_set");
const std::string HEADER_KEY_ACTIVE_SET("active_set");
const std::string HEADER_KEY_NEXT_TAG_TID("next_tag_tid");
const std::string HEADER_KEY_NEXT_TAG_CLASS("next_tag_class");

// key prefixes for per-client and per-tag omap entries
const std::string HEADER_KEY_CLIENT_PREFIX("client_");
const std::string HEADER_KEY_TAG_PREFIX("tag_");
37
// Format value as a 16 character, zero-padded, lowercase hexadecimal string.
// The fixed width makes the lexicographic order of the strings match the
// numeric order of the values.
std::string to_hex(uint64_t value) {
  std::ostringstream formatter;
  formatter << std::hex << std::setfill('0') << std::setw(16) << value;
  return formatter.str();
}
43
44std::string key_from_client_id(const std::string &client_id) {
45 return HEADER_KEY_CLIENT_PREFIX + client_id;
46}
47
48std::string key_from_tag_tid(uint64_t tag_tid) {
49 return HEADER_KEY_TAG_PREFIX + to_hex(tag_tid);
50}
51
52uint64_t tag_tid_from_key(const std::string &key) {
53 std::istringstream iss(key);
54 uint64_t id;
55 iss.ignore(HEADER_KEY_TAG_PREFIX.size()) >> std::hex >> id;
56 return id;
57}
58
59template <typename T>
60int read_key(cls_method_context_t hctx, const string &key, T *t,
61 bool ignore_enoent = false) {
62 bufferlist bl;
63 int r = cls_cxx_map_get_val(hctx, key, &bl);
494da23a
TL
64 if (r == -ENOENT) {
65 if (ignore_enoent) {
66 r = 0;
67 }
68 return r;
7c673cae
FG
69 } else if (r < 0) {
70 CLS_ERR("failed to get omap key: %s", key.c_str());
71 return r;
72 }
73
74 try {
11fdf7f2
TL
75 auto iter = bl.cbegin();
76 decode(*t, iter);
f67539c2 77 } catch (const ceph::buffer::error &err) {
7c673cae
FG
78 CLS_ERR("failed to decode input parameters: %s", err.what());
79 return -EINVAL;
80 }
81 return 0;
82}
83
84template <typename T>
85int write_key(cls_method_context_t hctx, const string &key, const T &t) {
86 bufferlist bl;
11fdf7f2 87 encode(t, bl);
7c673cae
FG
88
89 int r = cls_cxx_map_set_val(hctx, key, &bl);
90 if (r < 0) {
91 CLS_ERR("failed to set omap key: %s", key.c_str());
92 return r;
93 }
94 return 0;
95}
96
97int remove_key(cls_method_context_t hctx, const string &key) {
98 int r = cls_cxx_map_remove_key(hctx, key);
99 if (r < 0 && r != -ENOENT) {
100 CLS_ERR("failed to remove key: %s", key.c_str());
101 return r;
102 }
103 return 0;
104}
105
106int expire_tags(cls_method_context_t hctx, const std::string *skip_client_id) {
107
108 std::string skip_client_key;
109 if (skip_client_id != nullptr) {
110 skip_client_key = key_from_client_id(*skip_client_id);
111 }
112
7c673cae 113 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
b32b8144 114 std::string last_read = "";
c07f9fc5 115 bool more;
7c673cae
FG
116 do {
117 std::map<std::string, bufferlist> vals;
c07f9fc5
FG
118 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
119 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
120 if (r < 0 && r != -ENOENT) {
121 CLS_ERR("failed to retrieve registered clients: %s",
122 cpp_strerror(r).c_str());
123 return r;
124 }
125
126 for (auto &val : vals) {
127 // if we are removing a client, skip its commit positions
128 if (val.first == skip_client_key) {
129 continue;
130 }
131
132 cls::journal::Client client;
11fdf7f2 133 auto iter = val.second.cbegin();
7c673cae 134 try {
11fdf7f2 135 decode(client, iter);
f67539c2 136 } catch (const ceph::buffer::error &err) {
7c673cae
FG
137 CLS_ERR("error decoding registered client: %s",
138 val.first.c_str());
139 return -EIO;
140 }
141
b32b8144
FG
142 if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
143 // don't allow a disconnected client to prevent pruning
144 continue;
145 } else if (client.commit_position.object_positions.empty()) {
146 // cannot prune if one or more clients has an empty commit history
147 return 0;
148 }
149
7c673cae 150 for (auto object_position : client.commit_position.object_positions) {
11fdf7f2 151 minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid);
7c673cae
FG
152 }
153 }
154 if (!vals.empty()) {
155 last_read = vals.rbegin()->first;
156 }
c07f9fc5 157 } while (more);
7c673cae
FG
158
159 // cannot expire tags if a client hasn't committed yet
160 if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
161 return 0;
162 }
163
164 // compute the minimum in-use tag for each class
165 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
166 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
167 TAG_PASS_SCRUB,
168 TAG_PASS_DONE } TagPass;
169 int tag_pass = TAG_PASS_CALCULATE_MINIMUMS;
170 last_read = HEADER_KEY_TAG_PREFIX;
171 do {
172 std::map<std::string, bufferlist> vals;
c07f9fc5
FG
173 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
174 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
175 if (r < 0 && r != -ENOENT) {
176 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
177 return r;
178 }
179
180 for (auto &val : vals) {
181 cls::journal::Tag tag;
11fdf7f2 182 auto iter = val.second.cbegin();
7c673cae 183 try {
11fdf7f2 184 decode(tag, iter);
f67539c2 185 } catch (const ceph::buffer::error &err) {
7c673cae
FG
186 CLS_ERR("error decoding tag: %s", val.first.c_str());
187 return -EIO;
188 }
189
190 if (tag.tid != tag_tid_from_key(val.first)) {
191 CLS_ERR("tag tid mismatched: %s", val.first.c_str());
192 return -EINVAL;
193 }
194
195 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
196 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
197 } else if (tag_pass == TAG_PASS_SCRUB &&
198 tag.tid < minimum_tag_class_to_tids[tag.tag_class]) {
199 r = remove_key(hctx, val.first);
200 if (r < 0) {
201 return r;
202 }
203 }
204
205 if (tag.tid >= minimum_tag_tid) {
206 // no need to check for tag classes beyond this point
207 vals.clear();
3efd9988 208 more = false;
7c673cae
FG
209 break;
210 }
211 }
212
c07f9fc5 213 if (tag_pass != TAG_PASS_DONE && !more) {
7c673cae
FG
214 last_read = HEADER_KEY_TAG_PREFIX;
215 ++tag_pass;
216 } else if (!vals.empty()) {
217 last_read = vals.rbegin()->first;
218 }
219 } while (tag_pass != TAG_PASS_DONE);
220 return 0;
221}
222
223int get_client_list_range(cls_method_context_t hctx,
224 std::set<cls::journal::Client> *clients,
225 std::string start_after, uint64_t max_return) {
226 std::string last_read;
227 if (!start_after.empty()) {
228 last_read = key_from_client_id(start_after);
229 }
230
231 std::map<std::string, bufferlist> vals;
c07f9fc5 232 bool more;
7c673cae 233 int r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_CLIENT_PREFIX,
c07f9fc5 234 max_return, &vals, &more);
7c673cae
FG
235 if (r < 0) {
236 CLS_ERR("failed to retrieve omap values: %s", cpp_strerror(r).c_str());
237 return r;
238 }
239
240 for (std::map<std::string, bufferlist>::iterator it = vals.begin();
241 it != vals.end(); ++it) {
242 try {
11fdf7f2 243 auto iter = it->second.cbegin();
7c673cae
FG
244
245 cls::journal::Client client;
11fdf7f2 246 decode(client, iter);
7c673cae 247 clients->insert(client);
f67539c2 248 } catch (const ceph::buffer::error &err) {
7c673cae
FG
249 CLS_ERR("could not decode client '%s': %s", it->first.c_str(),
250 err.what());
251 return -EIO;
252 }
253 }
254
255 return 0;
256}
257
258int find_min_commit_position(cls_method_context_t hctx,
259 cls::journal::ObjectSetPosition *minset) {
260 int r;
261 bool valid = false;
262 std::string start_after = "";
263 uint64_t tag_tid = 0, entry_tid = 0;
264
265 while (true) {
266 std::set<cls::journal::Client> batch;
267
268 r = get_client_list_range(hctx, &batch, start_after, cls::journal::JOURNAL_MAX_RETURN);
269 if ((r < 0) || batch.empty()) {
270 break;
271 }
272
273 start_after = batch.rbegin()->id;
7c673cae 274 // update the (minimum) commit position from this batch of clients
20effc67
TL
275 for (const auto &client : batch) {
276 if (client.state == cls::journal::CLIENT_STATE_DISCONNECTED) {
277 continue;
278 }
279 const auto &object_set_position = client.commit_position;
7c673cae
FG
280 if (object_set_position.object_positions.empty()) {
281 *minset = cls::journal::ObjectSetPosition();
282 break;
283 }
284 cls::journal::ObjectPosition first = object_set_position.object_positions.front();
285
286 // least tag_tid (or least entry_tid for matching tag_tid)
287 if (!valid || (tag_tid > first.tag_tid) || ((tag_tid == first.tag_tid) && (entry_tid > first.entry_tid))) {
288 tag_tid = first.tag_tid;
289 entry_tid = first.entry_tid;
290 *minset = cls::journal::ObjectSetPosition(object_set_position);
291 valid = true;
292 }
293 }
294
295 // got the last batch, we're done
296 if (batch.size() < cls::journal::JOURNAL_MAX_RETURN) {
297 break;
298 }
299 }
300
301 return r;
302}
303
304} // anonymous namespace
305
306/**
307 * Input:
308 * @param order (uint8_t) - bits to shift to compute the object max size
309 * @param splay width (uint8_t) - number of active journal objects
310 *
311 * Output:
312 * @returns 0 on success, negative error code on failure
313 */
314int journal_create(cls_method_context_t hctx, bufferlist *in, bufferlist *out) {
315 uint8_t order;
316 uint8_t splay_width;
317 int64_t pool_id;
318 try {
11fdf7f2
TL
319 auto iter = in->cbegin();
320 decode(order, iter);
321 decode(splay_width, iter);
322 decode(pool_id, iter);
f67539c2 323 } catch (const ceph::buffer::error &err) {
7c673cae
FG
324 CLS_ERR("failed to decode input parameters: %s", err.what());
325 return -EINVAL;
326 }
327
328 bufferlist stored_orderbl;
329 int r = cls_cxx_map_get_val(hctx, HEADER_KEY_ORDER, &stored_orderbl);
330 if (r >= 0) {
331 CLS_ERR("journal already exists");
332 return -EEXIST;
333 } else if (r != -ENOENT) {
334 return r;
335 }
336
337 r = write_key(hctx, HEADER_KEY_ORDER, order);
338 if (r < 0) {
339 return r;
340 }
341
342 r = write_key(hctx, HEADER_KEY_SPLAY_WIDTH, splay_width);
343 if (r < 0) {
344 return r;
345 }
346
347 r = write_key(hctx, HEADER_KEY_POOL_ID, pool_id);
348 if (r < 0) {
349 return r;
350 }
351
352 uint64_t object_set = 0;
353 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
354 if (r < 0) {
355 return r;
356 }
357
358 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
359 if (r < 0) {
360 return r;
361 }
362
363 uint64_t tag_id = 0;
364 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_id);
365 if (r < 0) {
366 return r;
367 }
368
369 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_id);
370 if (r < 0) {
371 return r;
372 }
373 return 0;
374}
375
376/**
377 * Input:
378 * none
379 *
380 * Output:
381 * order (uint8_t)
382 * @returns 0 on success, negative error code on failure
383 */
384int journal_get_order(cls_method_context_t hctx, bufferlist *in,
385 bufferlist *out) {
386 uint8_t order;
387 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
388 if (r < 0) {
389 return r;
390 }
391
11fdf7f2 392 encode(order, *out);
7c673cae
FG
393 return 0;
394}
395
396/**
397 * Input:
398 * none
399 *
400 * Output:
11fdf7f2 401 * splay_width (uint8_t)
7c673cae
FG
402 * @returns 0 on success, negative error code on failure
403 */
404int journal_get_splay_width(cls_method_context_t hctx, bufferlist *in,
405 bufferlist *out) {
406 uint8_t splay_width;
407 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
408 if (r < 0) {
409 return r;
410 }
411
11fdf7f2 412 encode(splay_width, *out);
7c673cae
FG
413 return 0;
414}
415
416/**
417 * Input:
418 * none
419 *
420 * Output:
421 * pool_id (int64_t)
422 * @returns 0 on success, negative error code on failure
423 */
424int journal_get_pool_id(cls_method_context_t hctx, bufferlist *in,
425 bufferlist *out) {
11fdf7f2 426 int64_t pool_id = 0;
7c673cae
FG
427 int r = read_key(hctx, HEADER_KEY_POOL_ID, &pool_id);
428 if (r < 0) {
429 return r;
430 }
431
11fdf7f2 432 encode(pool_id, *out);
7c673cae
FG
433 return 0;
434}
435
436/**
437 * Input:
438 * none
439 *
440 * Output:
441 * object set (uint64_t)
442 * @returns 0 on success, negative error code on failure
443 */
444int journal_get_minimum_set(cls_method_context_t hctx, bufferlist *in,
445 bufferlist *out) {
446 uint64_t minimum_set;
447 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &minimum_set);
448 if (r < 0) {
449 return r;
450 }
451
11fdf7f2 452 encode(minimum_set, *out);
7c673cae
FG
453 return 0;
454}
455
456/**
457 * Input:
458 * @param object set (uint64_t)
459 *
460 * Output:
461 * @returns 0 on success, negative error code on failure
462 */
463int journal_set_minimum_set(cls_method_context_t hctx, bufferlist *in,
464 bufferlist *out) {
465 uint64_t object_set;
466 try {
11fdf7f2
TL
467 auto iter = in->cbegin();
468 decode(object_set, iter);
f67539c2 469 } catch (const ceph::buffer::error &err) {
7c673cae
FG
470 CLS_ERR("failed to decode input parameters: %s", err.what());
471 return -EINVAL;
472 }
473
474 uint64_t current_active_set;
475 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
476 if (r < 0) {
477 return r;
478 }
479
480 if (current_active_set < object_set) {
494da23a 481 CLS_LOG(10, "active object set earlier than minimum: %" PRIu64
7c673cae
FG
482 " < %" PRIu64, current_active_set, object_set);
483 return -EINVAL;
484 }
485
486 uint64_t current_minimum_set;
487 r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
488 if (r < 0) {
489 return r;
490 }
491
492 if (object_set == current_minimum_set) {
493 return 0;
494 } else if (object_set < current_minimum_set) {
495 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
496 object_set, current_minimum_set);
497 return -ESTALE;
498 }
499
500 r = write_key(hctx, HEADER_KEY_MINIMUM_SET, object_set);
501 if (r < 0) {
502 return r;
503 }
504 return 0;
505}
506
507/**
508 * Input:
509 * none
510 *
511 * Output:
512 * object set (uint64_t)
513 * @returns 0 on success, negative error code on failure
514 */
515int journal_get_active_set(cls_method_context_t hctx, bufferlist *in,
516 bufferlist *out) {
517 uint64_t active_set;
518 int r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &active_set);
519 if (r < 0) {
520 return r;
521 }
522
11fdf7f2 523 encode(active_set, *out);
7c673cae
FG
524 return 0;
525}
526
527/**
528 * Input:
529 * @param object set (uint64_t)
530 *
531 * Output:
532 * @returns 0 on success, negative error code on failure
533 */
534int journal_set_active_set(cls_method_context_t hctx, bufferlist *in,
535 bufferlist *out) {
536 uint64_t object_set;
537 try {
11fdf7f2
TL
538 auto iter = in->cbegin();
539 decode(object_set, iter);
f67539c2 540 } catch (const ceph::buffer::error &err) {
7c673cae
FG
541 CLS_ERR("failed to decode input parameters: %s", err.what());
542 return -EINVAL;
543 }
544
545 uint64_t current_minimum_set;
546 int r = read_key(hctx, HEADER_KEY_MINIMUM_SET, &current_minimum_set);
547 if (r < 0) {
548 return r;
549 }
550
551 if (current_minimum_set > object_set) {
552 CLS_ERR("minimum object set later than active: %" PRIu64
553 " > %" PRIu64, current_minimum_set, object_set);
554 return -EINVAL;
555 }
556
557 uint64_t current_active_set;
558 r = read_key(hctx, HEADER_KEY_ACTIVE_SET, &current_active_set);
559 if (r < 0) {
560 return r;
561 }
562
563 if (object_set == current_active_set) {
564 return 0;
565 } else if (object_set < current_active_set) {
566 CLS_ERR("object number earlier than current object: %" PRIu64 " < %" PRIu64,
567 object_set, current_active_set);
568 return -ESTALE;
569 }
570
571 r = write_key(hctx, HEADER_KEY_ACTIVE_SET, object_set);
572 if (r < 0) {
573 return r;
574 }
575 return 0;
576}
577
578/**
579 * Input:
580 * @param id (string) - unique client id
581 *
582 * Output:
583 * cls::journal::Client
584 * @returns 0 on success, negative error code on failure
585 */
586int journal_get_client(cls_method_context_t hctx, bufferlist *in,
587 bufferlist *out) {
588 std::string id;
589 try {
11fdf7f2
TL
590 auto iter = in->cbegin();
591 decode(id, iter);
f67539c2 592 } catch (const ceph::buffer::error &err) {
7c673cae
FG
593 CLS_ERR("failed to decode input parameters: %s", err.what());
594 return -EINVAL;
595 }
596
597 std::string key(key_from_client_id(id));
598 cls::journal::Client client;
599 int r = read_key(hctx, key, &client);
600 if (r < 0) {
601 return r;
602 }
603
11fdf7f2 604 encode(client, *out);
7c673cae
FG
605 return 0;
606}
607
608/**
609 * Input:
610 * @param id (string) - unique client id
611 * @param data (bufferlist) - opaque data associated to client
612 *
613 * Output:
614 * @returns 0 on success, negative error code on failure
615 */
616int journal_client_register(cls_method_context_t hctx, bufferlist *in,
617 bufferlist *out) {
618 std::string id;
619 bufferlist data;
620 try {
11fdf7f2
TL
621 auto iter = in->cbegin();
622 decode(id, iter);
623 decode(data, iter);
f67539c2 624 } catch (const ceph::buffer::error &err) {
7c673cae
FG
625 CLS_ERR("failed to decode input parameters: %s", err.what());
626 return -EINVAL;
627 }
628
629 uint8_t order;
630 int r = read_key(hctx, HEADER_KEY_ORDER, &order);
631 if (r < 0) {
632 return r;
633 }
634
635 std::string key(key_from_client_id(id));
636 bufferlist stored_clientbl;
637 r = cls_cxx_map_get_val(hctx, key, &stored_clientbl);
638 if (r >= 0) {
639 CLS_ERR("duplicate client id: %s", id.c_str());
640 return -EEXIST;
641 } else if (r != -ENOENT) {
642 return r;
643 }
644
645 cls::journal::ObjectSetPosition minset;
646 r = find_min_commit_position(hctx, &minset);
647 if (r < 0)
648 return r;
649
650 cls::journal::Client client(id, data, minset);
651 r = write_key(hctx, key, client);
652 if (r < 0) {
653 return r;
654 }
655 return 0;
656}
657
658/**
659 * Input:
660 * @param id (string) - unique client id
661 * @param data (bufferlist) - opaque data associated to client
662 *
663 * Output:
664 * @returns 0 on success, negative error code on failure
665 */
666int journal_client_update_data(cls_method_context_t hctx, bufferlist *in,
667 bufferlist *out) {
668 std::string id;
669 bufferlist data;
670 try {
11fdf7f2
TL
671 auto iter = in->cbegin();
672 decode(id, iter);
673 decode(data, iter);
f67539c2 674 } catch (const ceph::buffer::error &err) {
7c673cae
FG
675 CLS_ERR("failed to decode input parameters: %s", err.what());
676 return -EINVAL;
677 }
678
679 std::string key(key_from_client_id(id));
680 cls::journal::Client client;
681 int r = read_key(hctx, key, &client);
682 if (r < 0) {
683 return r;
684 }
685
686 client.data = data;
687 r = write_key(hctx, key, client);
688 if (r < 0) {
689 return r;
690 }
691 return 0;
692}
693
694/**
695 * Input:
696 * @param id (string) - unique client id
697 * @param state (uint8_t) - client state
698 *
699 * Output:
700 * @returns 0 on success, negative error code on failure
701 */
702int journal_client_update_state(cls_method_context_t hctx, bufferlist *in,
703 bufferlist *out) {
704 std::string id;
705 cls::journal::ClientState state;
706 bufferlist data;
707 try {
11fdf7f2
TL
708 auto iter = in->cbegin();
709 decode(id, iter);
7c673cae 710 uint8_t state_raw;
11fdf7f2 711 decode(state_raw, iter);
7c673cae 712 state = static_cast<cls::journal::ClientState>(state_raw);
f67539c2 713 } catch (const ceph::buffer::error &err) {
7c673cae
FG
714 CLS_ERR("failed to decode input parameters: %s", err.what());
715 return -EINVAL;
716 }
717
718 std::string key(key_from_client_id(id));
719 cls::journal::Client client;
720 int r = read_key(hctx, key, &client);
721 if (r < 0) {
722 return r;
723 }
724
725 client.state = state;
726 r = write_key(hctx, key, client);
727 if (r < 0) {
728 return r;
729 }
730 return 0;
731}
732
733/**
734 * Input:
735 * @param id (string) - unique client id
736 *
737 * Output:
738 * @returns 0 on success, negative error code on failure
739 */
740int journal_client_unregister(cls_method_context_t hctx, bufferlist *in,
741 bufferlist *out) {
742 std::string id;
743 try {
11fdf7f2
TL
744 auto iter = in->cbegin();
745 decode(id, iter);
f67539c2 746 } catch (const ceph::buffer::error &err) {
7c673cae
FG
747 CLS_ERR("failed to decode input parameters: %s", err.what());
748 return -EINVAL;
749 }
750
751 std::string key(key_from_client_id(id));
752 bufferlist bl;
753 int r = cls_cxx_map_get_val(hctx, key, &bl);
754 if (r < 0) {
755 CLS_ERR("client is not registered: %s", id.c_str());
756 return r;
757 }
758
759 r = cls_cxx_map_remove_key(hctx, key);
760 if (r < 0) {
761 CLS_ERR("failed to remove omap key: %s", key.c_str());
762 return r;
763 }
764
765 // prune expired tags
766 r = expire_tags(hctx, &id);
767 if (r < 0) {
768 return r;
769 }
770 return 0;
771}
772
773/**
774 * Input:
775 * @param client_id (uint64_t) - unique client id
776 * @param commit_position (ObjectSetPosition)
777 *
778 * Output:
779 * @returns 0 on success, negative error code on failure
780 */
781int journal_client_commit(cls_method_context_t hctx, bufferlist *in,
782 bufferlist *out) {
783 std::string id;
784 cls::journal::ObjectSetPosition commit_position;
785 try {
11fdf7f2
TL
786 auto iter = in->cbegin();
787 decode(id, iter);
788 decode(commit_position, iter);
f67539c2 789 } catch (const ceph::buffer::error &err) {
7c673cae
FG
790 CLS_ERR("failed to decode input parameters: %s", err.what());
791 return -EINVAL;
792 }
793
794 uint8_t splay_width;
795 int r = read_key(hctx, HEADER_KEY_SPLAY_WIDTH, &splay_width);
796 if (r < 0) {
797 return r;
798 }
799 if (commit_position.object_positions.size() > splay_width) {
800 CLS_ERR("too many object positions");
801 return -EINVAL;
802 }
803
804 std::string key(key_from_client_id(id));
805 cls::journal::Client client;
806 r = read_key(hctx, key, &client);
807 if (r < 0) {
808 return r;
809 }
810
811 if (client.commit_position == commit_position) {
812 return 0;
813 }
814
815 client.commit_position = commit_position;
816 r = write_key(hctx, key, client);
817 if (r < 0) {
818 return r;
819 }
820 return 0;
821}
822
823/**
824 * Input:
825 * @param start_after (string)
826 * @param max_return (uint64_t)
827 *
828 * Output:
829 * clients (set<cls::journal::Client>) - collection of registered clients
830 * @returns 0 on success, negative error code on failure
831 */
832int journal_client_list(cls_method_context_t hctx, bufferlist *in,
833 bufferlist *out) {
834 std::string start_after;
835 uint64_t max_return;
836 try {
11fdf7f2
TL
837 auto iter = in->cbegin();
838 decode(start_after, iter);
839 decode(max_return, iter);
f67539c2 840 } catch (const ceph::buffer::error &err) {
7c673cae
FG
841 CLS_ERR("failed to decode input parameters: %s", err.what());
842 return -EINVAL;
843 }
844
845 std::set<cls::journal::Client> clients;
846 int r = get_client_list_range(hctx, &clients, start_after, max_return);
847 if (r < 0)
848 return r;
849
11fdf7f2 850 encode(clients, *out);
7c673cae
FG
851 return 0;
852}
853
854/**
855 * Input:
856 * none
857 *
858 * Output:
859 * @returns 0 on success, negative error code on failure
860 */
861int journal_get_next_tag_tid(cls_method_context_t hctx, bufferlist *in,
862 bufferlist *out) {
863 uint64_t tag_tid;
864 int r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &tag_tid);
865 if (r < 0) {
866 return r;
867 }
868
11fdf7f2 869 encode(tag_tid, *out);
7c673cae
FG
870 return 0;
871}
872
873/**
874 * Input:
875 * @param tag_tid (uint64_t)
876 *
877 * Output:
878 * cls::journal::Tag
879 * @returns 0 on success, negative error code on failure
880 */
881int journal_get_tag(cls_method_context_t hctx, bufferlist *in,
882 bufferlist *out) {
883 uint64_t tag_tid;
884 try {
11fdf7f2
TL
885 auto iter = in->cbegin();
886 decode(tag_tid, iter);
f67539c2 887 } catch (const ceph::buffer::error &err) {
7c673cae
FG
888 CLS_ERR("failed to decode input parameters: %s", err.what());
889 return -EINVAL;
890 }
891
892 std::string key(key_from_tag_tid(tag_tid));
893 cls::journal::Tag tag;
894 int r = read_key(hctx, key, &tag);
895 if (r < 0) {
896 return r;
897 }
898
11fdf7f2 899 encode(tag, *out);
7c673cae
FG
900 return 0;
901}
902
903/**
904 * Input:
905 * @param tag_tid (uint64_t)
906 * @param tag_class (uint64_t)
907 * @param data (bufferlist)
908 *
909 * Output:
910 * @returns 0 on success, negative error code on failure
911 */
912int journal_tag_create(cls_method_context_t hctx, bufferlist *in,
913 bufferlist *out) {
914 uint64_t tag_tid;
915 uint64_t tag_class;
916 bufferlist data;
917 try {
11fdf7f2
TL
918 auto iter = in->cbegin();
919 decode(tag_tid, iter);
920 decode(tag_class, iter);
921 decode(data, iter);
f67539c2 922 } catch (const ceph::buffer::error &err) {
7c673cae
FG
923 CLS_ERR("failed to decode input parameters: %s", err.what());
924 return -EINVAL;
925 }
926
927 std::string key(key_from_tag_tid(tag_tid));
928 bufferlist stored_tag_bl;
929 int r = cls_cxx_map_get_val(hctx, key, &stored_tag_bl);
930 if (r >= 0) {
931 CLS_ERR("duplicate tag id: %" PRIu64, tag_tid);
932 return -EEXIST;
933 } else if (r != -ENOENT) {
934 return r;
935 }
936
937 // verify tag tid ordering
938 uint64_t next_tag_tid;
939 r = read_key(hctx, HEADER_KEY_NEXT_TAG_TID, &next_tag_tid);
940 if (r < 0) {
941 return r;
942 }
943 if (tag_tid != next_tag_tid) {
944 CLS_LOG(5, "out-of-order tag sequence: %" PRIu64, tag_tid);
945 return -ESTALE;
946 }
947
948 uint64_t next_tag_class;
949 r = read_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, &next_tag_class);
950 if (r < 0) {
951 return r;
952 }
953
954 if (tag_class == cls::journal::Tag::TAG_CLASS_NEW) {
955 // allocate a new tag class
956 tag_class = next_tag_class;
957 r = write_key(hctx, HEADER_KEY_NEXT_TAG_CLASS, tag_class + 1);
958 if (r < 0) {
959 return r;
960 }
961 } else {
962 // verify tag class range
963 if (tag_class >= next_tag_class) {
964 CLS_ERR("out-of-sequence tag class: %" PRIu64, tag_class);
965 return -EINVAL;
966 }
967 }
968
969 // prune expired tags
970 r = expire_tags(hctx, nullptr);
971 if (r < 0) {
972 return r;
973 }
974
975 // update tag tid sequence
976 r = write_key(hctx, HEADER_KEY_NEXT_TAG_TID, tag_tid + 1);
977 if (r < 0) {
978 return r;
979 }
980
981 // write tag structure
982 cls::journal::Tag tag(tag_tid, tag_class, data);
983 key = key_from_tag_tid(tag_tid);
984 r = write_key(hctx, key, tag);
985 if (r < 0) {
986 return r;
987 }
988 return 0;
989}
990
991/**
992 * Input:
993 * @param start_after_tag_tid (uint64_t) - first tag tid
994 * @param max_return (uint64_t) - max tags to return
995 * @param client_id (std::string) - client id filter
996 * @param tag_class (boost::optional<uint64_t> - optional tag class filter
997 *
998 * Output:
999 * std::set<cls::journal::Tag> - collection of tags
1000 * @returns 0 on success, negative error code on failure
1001 */
1002int journal_tag_list(cls_method_context_t hctx, bufferlist *in,
1003 bufferlist *out) {
1004 uint64_t start_after_tag_tid;
1005 uint64_t max_return;
1006 std::string client_id;
1007 boost::optional<uint64_t> tag_class(0);
1008
1009 // handle compiler false positive about use-before-init
1010 tag_class = boost::none;
1011 try {
11fdf7f2
TL
1012 auto iter = in->cbegin();
1013 decode(start_after_tag_tid, iter);
1014 decode(max_return, iter);
1015 decode(client_id, iter);
1016 decode(tag_class, iter);
f67539c2 1017 } catch (const ceph::buffer::error &err) {
7c673cae
FG
1018 CLS_ERR("failed to decode input parameters: %s", err.what());
1019 return -EINVAL;
1020 }
1021
1022 // calculate the minimum tag within client's commit position
1023 uint64_t minimum_tag_tid = std::numeric_limits<uint64_t>::max();
1024 cls::journal::Client client;
1025 int r = read_key(hctx, key_from_client_id(client_id), &client);
1026 if (r < 0) {
1027 return r;
1028 }
1029
1030 for (auto object_position : client.commit_position.object_positions) {
11fdf7f2 1031 minimum_tag_tid = std::min(minimum_tag_tid, object_position.tag_tid);
7c673cae
FG
1032 }
1033
1034 // compute minimum tags in use per-class
1035 std::set<cls::journal::Tag> tags;
1036 std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
1037 typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
1038 TAG_PASS_LIST,
1039 TAG_PASS_DONE } TagPass;
1040 int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
1041 TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
1042 std::string last_read = HEADER_KEY_TAG_PREFIX;
1043 do {
1044 std::map<std::string, bufferlist> vals;
c07f9fc5 1045 bool more;
7c673cae 1046 r = cls_cxx_map_get_vals(hctx, last_read, HEADER_KEY_TAG_PREFIX,
c07f9fc5 1047 MAX_KEYS_READ, &vals, &more);
7c673cae
FG
1048 if (r < 0 && r != -ENOENT) {
1049 CLS_ERR("failed to retrieve tags: %s", cpp_strerror(r).c_str());
1050 return r;
1051 }
1052
1053 for (auto &val : vals) {
1054 cls::journal::Tag tag;
11fdf7f2 1055 auto iter = val.second.cbegin();
7c673cae 1056 try {
11fdf7f2 1057 decode(tag, iter);
f67539c2 1058 } catch (const ceph::buffer::error &err) {
7c673cae
FG
1059 CLS_ERR("error decoding tag: %s", val.first.c_str());
1060 return -EIO;
1061 }
1062
1063 if (tag_pass == TAG_PASS_CALCULATE_MINIMUMS) {
1064 minimum_tag_class_to_tids[tag.tag_class] = tag.tid;
1065
1066 // completed calculation of tag class minimums
1067 if (tag.tid >= minimum_tag_tid) {
1068 vals.clear();
3efd9988 1069 more = false;
7c673cae
FG
1070 break;
1071 }
1072 } else if (tag_pass == TAG_PASS_LIST) {
1073 if (start_after_tag_tid != 0 && tag.tid <= start_after_tag_tid) {
1074 continue;
1075 }
1076
1077 if (tag.tid >= minimum_tag_class_to_tids[tag.tag_class] &&
1078 (!tag_class || *tag_class == tag.tag_class)) {
1079 tags.insert(tag);
1080 }
1081 if (tags.size() >= max_return) {
1082 tag_pass = TAG_PASS_DONE;
1083 }
1084 }
1085 }
1086
c07f9fc5 1087 if (tag_pass != TAG_PASS_DONE && !more) {
7c673cae
FG
1088 last_read = HEADER_KEY_TAG_PREFIX;
1089 ++tag_pass;
1090 } else if (!vals.empty()) {
1091 last_read = vals.rbegin()->first;
1092 }
1093 } while (tag_pass != TAG_PASS_DONE);
1094
11fdf7f2 1095 encode(tags, *out);
7c673cae
FG
1096 return 0;
1097}
1098
1099/**
1100 * Input:
1101 * @param soft_max_size (uint64_t)
1102 *
1103 * Output:
1104 * @returns 0 if object size less than max, negative error code otherwise
1105 */
1106int journal_object_guard_append(cls_method_context_t hctx, bufferlist *in,
1107 bufferlist *out) {
1108 uint64_t soft_max_size;
1109 try {
11fdf7f2
TL
1110 auto iter = in->cbegin();
1111 decode(soft_max_size, iter);
f67539c2 1112 } catch (const ceph::buffer::error &err) {
7c673cae
FG
1113 CLS_ERR("failed to decode input parameters: %s", err.what());
1114 return -EINVAL;
1115 }
1116
1117 uint64_t size;
1118 time_t mtime;
1119 int r = cls_cxx_stat(hctx, &size, &mtime);
1120 if (r == -ENOENT) {
1121 return 0;
1122 } else if (r < 0) {
1123 CLS_ERR("failed to stat object: %s", cpp_strerror(r).c_str());
1124 return r;
1125 }
1126
1127 if (size >= soft_max_size) {
1128 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1129 size, soft_max_size);
1130 return -EOVERFLOW;
1131 }
1132 return 0;
1133}
1134
9f95a23c
TL
1135/**
1136 * Input:
1137 * @param soft_max_size (uint64_t)
1138 * @param data (bufferlist) data to append
1139 *
1140 * Output:
1141 * @returns 0 on success, negative error code on failure
1142 * @returns -EOVERFLOW if object size is equal or more than soft_max_size
1143 */
1144int journal_object_append(cls_method_context_t hctx, bufferlist *in,
1145 bufferlist *out) {
1146 uint64_t soft_max_size;
1147 bufferlist data;
1148 try {
1149 auto iter = in->cbegin();
1150 decode(soft_max_size, iter);
1151 decode(data, iter);
f67539c2 1152 } catch (const ceph::buffer::error &err) {
9f95a23c
TL
1153 CLS_ERR("failed to decode input parameters: %s", err.what());
1154 return -EINVAL;
1155 }
1156
1157 uint64_t size = 0;
1158 int r = cls_cxx_stat(hctx, &size, nullptr);
1159 if (r < 0 && r != -ENOENT) {
1160 CLS_ERR("append: failed to stat object: %s", cpp_strerror(r).c_str());
1161 return r;
1162 }
1163
1164 if (size >= soft_max_size) {
1165 CLS_LOG(5, "journal object full: %" PRIu64 " >= %" PRIu64,
1166 size, soft_max_size);
1167 return -EOVERFLOW;
1168 }
1169
1170 auto offset = size;
1171 r = cls_cxx_write2(hctx, offset, data.length(), &data,
1172 CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
1173 if (r < 0) {
1174 CLS_ERR("append: error when writing: %s", cpp_strerror(r).c_str());
1175 return r;
1176 }
1177
1178 if (cls_get_min_compatible_client(hctx) < ceph_release_t::octopus) {
1179 return 0;
1180 }
1181
1182 auto min_alloc_size = cls_get_osd_min_alloc_size(hctx);
1183 if (min_alloc_size == 0) {
1184 min_alloc_size = 8;
1185 }
1186
f67539c2
TL
1187 auto stripe_width = cls_get_pool_stripe_width(hctx);
1188 if (stripe_width > 0) {
1189 min_alloc_size = round_up_to(min_alloc_size, stripe_width);
1190 }
1191
9f95a23c
TL
1192 CLS_LOG(20, "pad to %" PRIu64, min_alloc_size);
1193
1194 auto end = offset + data.length();
1195 auto new_end = round_up_to(end, min_alloc_size);
1196 if (new_end == end) {
1197 return 0;
1198 }
1199
1200 r = cls_cxx_truncate(hctx, new_end);
1201 if (r < 0) {
1202 CLS_ERR("append: error when truncating: %s", cpp_strerror(r).c_str());
1203 return r;
1204 }
1205
1206 return 0;
1207}
1208
7c673cae
FG
1209CLS_INIT(journal)
1210{
1211 CLS_LOG(20, "Loaded journal class!");
1212
1213 cls_handle_t h_class;
1214 cls_method_handle_t h_journal_create;
1215 cls_method_handle_t h_journal_get_order;
1216 cls_method_handle_t h_journal_get_splay_width;
1217 cls_method_handle_t h_journal_get_pool_id;
1218 cls_method_handle_t h_journal_get_minimum_set;
1219 cls_method_handle_t h_journal_set_minimum_set;
1220 cls_method_handle_t h_journal_get_active_set;
1221 cls_method_handle_t h_journal_set_active_set;
1222 cls_method_handle_t h_journal_get_client;
1223 cls_method_handle_t h_journal_client_register;
1224 cls_method_handle_t h_journal_client_update_data;
1225 cls_method_handle_t h_journal_client_update_state;
1226 cls_method_handle_t h_journal_client_unregister;
1227 cls_method_handle_t h_journal_client_commit;
1228 cls_method_handle_t h_journal_client_list;
1229 cls_method_handle_t h_journal_get_next_tag_tid;
1230 cls_method_handle_t h_journal_get_tag;
1231 cls_method_handle_t h_journal_tag_create;
1232 cls_method_handle_t h_journal_tag_list;
1233 cls_method_handle_t h_journal_object_guard_append;
9f95a23c 1234 cls_method_handle_t h_journal_object_append;
7c673cae
FG
1235
1236 cls_register("journal", &h_class);
1237
1238 /// methods for journal.$journal_id objects
1239 cls_register_cxx_method(h_class, "create",
1240 CLS_METHOD_RD | CLS_METHOD_WR,
1241 journal_create, &h_journal_create);
1242 cls_register_cxx_method(h_class, "get_order",
1243 CLS_METHOD_RD,
1244 journal_get_order, &h_journal_get_order);
1245 cls_register_cxx_method(h_class, "get_splay_width",
1246 CLS_METHOD_RD,
1247 journal_get_splay_width, &h_journal_get_splay_width);
1248 cls_register_cxx_method(h_class, "get_pool_id",
1249 CLS_METHOD_RD,
1250 journal_get_pool_id, &h_journal_get_pool_id);
1251 cls_register_cxx_method(h_class, "get_minimum_set",
1252 CLS_METHOD_RD,
1253 journal_get_minimum_set,
1254 &h_journal_get_minimum_set);
1255 cls_register_cxx_method(h_class, "set_minimum_set",
1256 CLS_METHOD_RD | CLS_METHOD_WR,
1257 journal_set_minimum_set,
1258 &h_journal_set_minimum_set);
1259 cls_register_cxx_method(h_class, "get_active_set",
1260 CLS_METHOD_RD,
1261 journal_get_active_set,
1262 &h_journal_get_active_set);
1263 cls_register_cxx_method(h_class, "set_active_set",
1264 CLS_METHOD_RD | CLS_METHOD_WR,
1265 journal_set_active_set,
1266 &h_journal_set_active_set);
1267
1268 cls_register_cxx_method(h_class, "get_client",
1269 CLS_METHOD_RD,
1270 journal_get_client, &h_journal_get_client);
1271 cls_register_cxx_method(h_class, "client_register",
1272 CLS_METHOD_RD | CLS_METHOD_WR,
1273 journal_client_register, &h_journal_client_register);
1274 cls_register_cxx_method(h_class, "client_update_data",
1275 CLS_METHOD_RD | CLS_METHOD_WR,
1276 journal_client_update_data,
1277 &h_journal_client_update_data);
1278 cls_register_cxx_method(h_class, "client_update_state",
1279 CLS_METHOD_RD | CLS_METHOD_WR,
1280 journal_client_update_state,
1281 &h_journal_client_update_state);
1282 cls_register_cxx_method(h_class, "client_unregister",
1283 CLS_METHOD_RD | CLS_METHOD_WR,
1284 journal_client_unregister,
1285 &h_journal_client_unregister);
1286 cls_register_cxx_method(h_class, "client_commit",
1287 CLS_METHOD_RD | CLS_METHOD_WR,
1288 journal_client_commit, &h_journal_client_commit);
1289 cls_register_cxx_method(h_class, "client_list",
1290 CLS_METHOD_RD,
1291 journal_client_list, &h_journal_client_list);
1292
1293 cls_register_cxx_method(h_class, "get_next_tag_tid",
1294 CLS_METHOD_RD,
1295 journal_get_next_tag_tid,
1296 &h_journal_get_next_tag_tid);
1297 cls_register_cxx_method(h_class, "get_tag",
1298 CLS_METHOD_RD,
1299 journal_get_tag, &h_journal_get_tag);
1300 cls_register_cxx_method(h_class, "tag_create",
1301 CLS_METHOD_RD | CLS_METHOD_WR,
1302 journal_tag_create, &h_journal_tag_create);
1303 cls_register_cxx_method(h_class, "tag_list",
1304 CLS_METHOD_RD,
1305 journal_tag_list, &h_journal_tag_list);
1306
1307 /// methods for journal_data.$journal_id.$object_id objects
1308 cls_register_cxx_method(h_class, "guard_append",
1309 CLS_METHOD_RD | CLS_METHOD_WR,
1310 journal_object_guard_append,
1311 &h_journal_object_guard_append);
9f95a23c
TL
1312 cls_register_cxx_method(h_class, "append", CLS_METHOD_RD | CLS_METHOD_WR,
1313 journal_object_append, &h_journal_object_append);
7c673cae 1314}