git.proxmox.com Git - ceph.git/blob - ceph/src/tools/rbd/action/Journal.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / tools / rbd / action / Journal.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "tools/rbd/ArgumentTypes.h"
5 #include "tools/rbd/Shell.h"
6 #include "tools/rbd/Utils.h"
7 #include "common/Cond.h"
8 #include "common/Formatter.h"
9 #include "common/ceph_json.h"
10 #include "common/errno.h"
11 #include "common/safe_io.h"
12 #include "include/stringify.h"
13 #include <fstream>
14 #include <sstream>
15 #include <boost/program_options.hpp>
16 #include "cls/rbd/cls_rbd_client.h"
17 #include "cls/journal/cls_journal_types.h"
18 #include "cls/journal/cls_journal_client.h"
19
20 #include "journal/Journaler.h"
21 #include "journal/ReplayEntry.h"
22 #include "journal/ReplayHandler.h"
23 #include "journal/Settings.h"
24 #include "librbd/journal/Types.h"
25
26 namespace rbd {
27 namespace action {
28 namespace journal {
29
30 namespace at = argument_types;
31 namespace po = boost::program_options;
32
// Names of the journal-related command line arguments registered below.
static const std::string JOURNAL_SPEC = "journal-spec";       // positional spec
static const std::string JOURNAL_NAME = "journal";            // journal option
static const std::string DEST_JOURNAL_NAME = "dest-journal";  // dest variant
36
37 void add_journal_option(po::options_description *opt,
38 at::ArgumentModifier modifier) {
39 std::string name = JOURNAL_NAME;
40 std::string description = at::get_description_prefix(modifier) +
41 "journal name";
42 switch (modifier) {
43 case at::ARGUMENT_MODIFIER_NONE:
44 case at::ARGUMENT_MODIFIER_SOURCE:
45 break;
46 case at::ARGUMENT_MODIFIER_DEST:
47 name = DEST_JOURNAL_NAME;
48 break;
49 }
50
51 // TODO add validator
52 opt->add_options()
53 (name.c_str(), po::value<std::string>(), description.c_str());
54 }
55
56 void add_journal_spec_options(po::options_description *pos,
57 po::options_description *opt,
58 at::ArgumentModifier modifier) {
59
60 pos->add_options()
61 ((get_name_prefix(modifier) + JOURNAL_SPEC).c_str(),
62 (get_description_prefix(modifier) + "journal specification\n" +
63 "(example: [<pool-name>/[<namespace>/]]<journal-name>)").c_str());
64 add_pool_option(opt, modifier);
65 add_namespace_option(opt, modifier);
66 add_image_option(opt, modifier);
67 add_journal_option(opt, modifier);
68 }
69
70 int get_pool_journal_names(const po::variables_map &vm,
71 at::ArgumentModifier mod,
72 size_t *spec_arg_index,
73 std::string *pool_name,
74 std::string *namespace_name,
75 std::string *journal_name) {
76 std::string pool_key = (mod == at::ARGUMENT_MODIFIER_DEST ?
77 at::DEST_POOL_NAME : at::POOL_NAME);
78 std::string namespace_key = (mod == at::ARGUMENT_MODIFIER_DEST ?
79 at::DEST_NAMESPACE_NAME : at::NAMESPACE_NAME);
80 std::string image_key = (mod == at::ARGUMENT_MODIFIER_DEST ?
81 at::DEST_IMAGE_NAME : at::IMAGE_NAME);
82 std::string journal_key = (mod == at::ARGUMENT_MODIFIER_DEST ?
83 DEST_JOURNAL_NAME : JOURNAL_NAME);
84
85 if (vm.count(pool_key) && pool_name != nullptr) {
86 *pool_name = vm[pool_key].as<std::string>();
87 }
88 if (vm.count(namespace_key) && namespace_name != nullptr) {
89 *namespace_name = vm[namespace_key].as<std::string>();
90 }
91 if (vm.count(journal_key) && journal_name != nullptr) {
92 *journal_name = vm[journal_key].as<std::string>();
93 }
94
95 std::string image_name;
96 if (vm.count(image_key)) {
97 image_name = vm[image_key].as<std::string>();
98 }
99
100 int r;
101 if (journal_name != nullptr && !journal_name->empty()) {
102 // despite the separate pool option,
103 // we can also specify them via the journal option
104 std::string journal_name_copy(*journal_name);
105 r = extract_spec(journal_name_copy, pool_name, namespace_name, journal_name,
106 nullptr, utils::SPEC_VALIDATION_FULL);
107 if (r < 0) {
108 return r;
109 }
110 }
111
112 if (!image_name.empty()) {
113 // despite the separate pool option,
114 // we can also specify them via the image option
115 std::string image_name_copy(image_name);
116 r = extract_spec(image_name_copy, pool_name, namespace_name, &image_name,
117 nullptr, utils::SPEC_VALIDATION_NONE);
118 if (r < 0) {
119 return r;
120 }
121 }
122
123 if (journal_name != nullptr && spec_arg_index != nullptr &&
124 journal_name->empty()) {
125 std::string spec = utils::get_positional_argument(vm, (*spec_arg_index)++);
126 if (!spec.empty()) {
127 r = extract_spec(spec, pool_name, namespace_name, journal_name, nullptr,
128 utils::SPEC_VALIDATION_FULL);
129 if (r < 0) {
130 return r;
131 }
132 }
133 }
134
135 if (pool_name != nullptr && pool_name->empty()) {
136 *pool_name = utils::get_default_pool_name();
137 }
138
139 if (pool_name != nullptr && namespace_name != nullptr &&
140 journal_name != nullptr && journal_name->empty() && !image_name.empty()) {
141 // Try to get journal name from image info.
142 librados::Rados rados;
143 librados::IoCtx io_ctx;
144 librbd::Image image;
145 int r = utils::init_and_open_image(*pool_name, *namespace_name, image_name,
146 "", "", true, &rados, &io_ctx, &image);
147 if (r < 0) {
148 std::cerr << "rbd: failed to open image " << image_name
149 << " to get journal name: " << cpp_strerror(r) << std::endl;
150 return r;
151 }
152
153 uint64_t features;
154 r = image.features(&features);
155 if (r < 0) {
156 return r;
157 }
158 if ((features & RBD_FEATURE_JOURNALING) == 0) {
159 std::cerr << "rbd: journaling is not enabled for image " << image_name
160 << std::endl;
161 return -EINVAL;
162 }
163 *journal_name = utils::image_id(image);
164 }
165
166 if (journal_name != nullptr && journal_name->empty()) {
167 std::string prefix = at::get_description_prefix(mod);
168 std::cerr << "rbd: "
169 << (mod == at::ARGUMENT_MODIFIER_DEST ? prefix : std::string())
170 << "journal was not specified" << std::endl;
171 return -EINVAL;
172 }
173
174 return 0;
175 }
176
177 static int do_show_journal_info(librados::Rados& rados, librados::IoCtx& io_ctx,
178 const std::string& journal_id, Formatter *f)
179 {
180 int r;
181 C_SaferCond cond;
182
183 std::string header_oid = ::journal::Journaler::header_oid(journal_id);
184 std::string object_oid_prefix = ::journal::Journaler::object_oid_prefix(
185 io_ctx.get_id(), journal_id);
186 uint8_t order;
187 uint8_t splay_width;
188 int64_t pool_id;
189
190 cls::journal::client::get_immutable_metadata(io_ctx, header_oid, &order,
191 &splay_width, &pool_id, &cond);
192 r = cond.wait();
193 if (r < 0) {
194 std::cerr << "failed to get journal metadata: " << cpp_strerror(r)
195 << std::endl;
196 return r;
197 }
198
199 std::string object_pool_name;
200 if (pool_id >= 0) {
201 r = rados.pool_reverse_lookup(pool_id, &object_pool_name);
202 if (r < 0) {
203 std::cerr << "error looking up pool name for pool_id=" << pool_id << ": "
204 << cpp_strerror(r) << std::endl;
205 }
206 }
207
208 if (f) {
209 f->open_object_section("journal");
210 f->dump_string("journal_id", journal_id);
211 f->dump_string("header_oid", header_oid);
212 f->dump_string("object_oid_prefix", object_oid_prefix);
213 f->dump_int("order", order);
214 f->dump_int("splay_width", splay_width);
215 if (!object_pool_name.empty()) {
216 f->dump_string("object_pool", object_pool_name);
217 }
218 f->close_section();
219 f->flush(std::cout);
220 } else {
221 std::cout << "rbd journal '" << journal_id << "':" << std::endl;
222 std::cout << "\theader_oid: " << header_oid << std::endl;
223 std::cout << "\tobject_oid_prefix: " << object_oid_prefix << std::endl;
224 std::cout << "\torder: " << static_cast<int>(order) << " ("
225 << byte_u_t(1ull << order) << " objects)"<< std::endl;
226 std::cout << "\tsplay_width: " << static_cast<int>(splay_width) << std::endl;
227 if (!object_pool_name.empty()) {
228 std::cout << "\tobject_pool: " << object_pool_name << std::endl;
229 }
230 }
231 return 0;
232 }
233
234 static int do_show_journal_status(librados::IoCtx& io_ctx,
235 const std::string& journal_id, Formatter *f)
236 {
237 int r;
238
239 C_SaferCond cond;
240 uint64_t minimum_set;
241 uint64_t active_set;
242 std::set<cls::journal::Client> registered_clients;
243 std::string oid = ::journal::Journaler::header_oid(journal_id);
244
245 cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
246 &active_set, &registered_clients,
247 &cond);
248 r = cond.wait();
249 if (r < 0) {
250 std::cerr << "warning: failed to get journal metadata" << std::endl;
251 return r;
252 }
253
254 if (f) {
255 f->open_object_section("status");
256 f->dump_unsigned("minimum_set", minimum_set);
257 f->dump_unsigned("active_set", active_set);
258 f->open_array_section("registered_clients");
259 for (std::set<cls::journal::Client>::iterator c =
260 registered_clients.begin(); c != registered_clients.end(); ++c) {
261 f->open_object_section("client");
262 c->dump(f);
263 f->close_section();
264 }
265 f->close_section();
266 f->close_section();
267 f->flush(std::cout);
268 } else {
269 std::cout << "minimum_set: " << minimum_set << std::endl;
270 std::cout << "active_set: " << active_set << std::endl;
271 std::cout << "registered clients: " << std::endl;
272 for (std::set<cls::journal::Client>::iterator c =
273 registered_clients.begin(); c != registered_clients.end(); ++c) {
274 std::cout << "\t" << *c << std::endl;
275 }
276 }
277 return 0;
278 }
279
280 static int do_reset_journal(librados::IoCtx& io_ctx,
281 const std::string& journal_id)
282 {
283 // disable/re-enable journaling to delete/re-create the journal
284 // to properly handle mirroring constraints
285 std::string image_name;
286 int r = librbd::cls_client::dir_get_name(&io_ctx, RBD_DIRECTORY, journal_id,
287 &image_name);
288 if (r < 0) {
289 std::cerr << "failed to locate journal's image: " << cpp_strerror(r)
290 << std::endl;
291 return r;
292 }
293
294 librbd::Image image;
295 r = utils::open_image(io_ctx, image_name, false, &image);
296 if (r < 0) {
297 std::cerr << "failed to open image: " << cpp_strerror(r) << std::endl;
298 return r;
299 }
300
301 r = image.update_features(RBD_FEATURE_JOURNALING, false);
302 if (r < 0) {
303 std::cerr << "failed to disable image journaling: " << cpp_strerror(r)
304 << std::endl;
305 return r;
306 }
307
308 r = image.update_features(RBD_FEATURE_JOURNALING, true);
309 if (r < 0) {
310 std::cerr << "failed to re-enable image journaling: " << cpp_strerror(r)
311 << std::endl;
312 return r;
313 }
314 return 0;
315 }
316
317 static int do_disconnect_journal_client(librados::IoCtx& io_ctx,
318 const std::string& journal_id,
319 const std::string& client_id)
320 {
321 int r;
322
323 C_SaferCond cond;
324 uint64_t minimum_set;
325 uint64_t active_set;
326 std::set<cls::journal::Client> registered_clients;
327 std::string oid = ::journal::Journaler::header_oid(journal_id);
328
329 cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set,
330 &active_set, &registered_clients,
331 &cond);
332 r = cond.wait();
333 if (r < 0) {
334 std::cerr << "warning: failed to get journal metadata" << std::endl;
335 return r;
336 }
337
338 static const std::string IMAGE_CLIENT_ID("");
339
340 bool found = false;
341 for (auto &c : registered_clients) {
342 if (c.id == IMAGE_CLIENT_ID || (!client_id.empty() && client_id != c.id)) {
343 continue;
344 }
345 r = cls::journal::client::client_update_state(io_ctx, oid, c.id,
346 cls::journal::CLIENT_STATE_DISCONNECTED);
347 if (r < 0) {
348 std::cerr << "warning: failed to disconnect client " << c.id << ": "
349 << cpp_strerror(r) << std::endl;
350 return r;
351 }
352 std::cout << "client " << c.id << " disconnected" << std::endl;
353 found = true;
354 }
355
356 if (!found) {
357 if (!client_id.empty()) {
358 std::cerr << "warning: client " << client_id << " is not registered"
359 << std::endl;
360 } else {
361 std::cerr << "no registered clients to disconnect" << std::endl;
362 }
363 return -ENOENT;
364 }
365
366 bufferlist bl;
367 r = io_ctx.notify2(oid, bl, 5000, NULL);
368 if (r < 0) {
369 std::cerr << "warning: failed to notify state change:" << ": "
370 << cpp_strerror(r) << std::endl;
371 return r;
372 }
373
374 return 0;
375 }
376
377 class Journaler : public ::journal::Journaler {
378 public:
379 Journaler(librados::IoCtx& io_ctx, const std::string& journal_id,
380 const std::string &client_id) :
381 ::journal::Journaler(io_ctx, journal_id, client_id, {}) {
382 }
383
384 int init() {
385 int r;
386
387 // TODO register with librbd payload
388 r = register_client(bufferlist());
389 if (r < 0) {
390 std::cerr << "failed to register client: " << cpp_strerror(r)
391 << std::endl;
392 return r;
393 }
394
395 C_SaferCond cond;
396
397 ::journal::Journaler::init(&cond);
398 r = cond.wait();
399 if (r < 0) {
400 std::cerr << "failed to initialize journal: " << cpp_strerror(r)
401 << std::endl;
402 (void) unregister_client();
403 return r;
404 }
405
406 return 0;
407 }
408
409 int shut_down() {
410 int r = unregister_client();
411 if (r < 0) {
412 std::cerr << "rbd: failed to unregister journal client: "
413 << cpp_strerror(r) << std::endl;
414 }
415 ::journal::Journaler::shut_down();
416
417 return r;
418 }
419 };
420
421 class JournalPlayer {
422 public:
423 JournalPlayer(librados::IoCtx& io_ctx, const std::string& journal_id,
424 const std::string &client_id) :
425 m_journaler(io_ctx, journal_id, client_id),
426 m_cond(),
427 m_r(0) {
428 }
429
430 virtual ~JournalPlayer() {}
431
432 virtual int exec() {
433 int r;
434
435 r = m_journaler.init();
436 if (r < 0) {
437 return r;
438 }
439
440 ReplayHandler replay_handler(this);
441
442 m_journaler.start_replay(&replay_handler);
443
444 r = m_cond.wait();
445 if (r < 0) {
446 std::cerr << "rbd: failed to process journal: " << cpp_strerror(r)
447 << std::endl;
448 if (m_r == 0) {
449 m_r = r;
450 }
451 }
452 return m_r;
453 }
454
455 int shut_down() {
456 return m_journaler.shut_down();
457 }
458
459 protected:
460 struct ReplayHandler : public ::journal::ReplayHandler {
461 JournalPlayer *journal;
462 explicit ReplayHandler(JournalPlayer *_journal) : journal(_journal) {}
463
464 void get() override {}
465 void put() override {}
466
467 void handle_entries_available() override {
468 journal->handle_replay_ready();
469 }
470 void handle_complete(int r) override {
471 journal->handle_replay_complete(r);
472 }
473 };
474
475 void handle_replay_ready() {
476 int r = 0;
477 while (true) {
478 ::journal::ReplayEntry replay_entry;
479 uint64_t tag_id;
480 if (!m_journaler.try_pop_front(&replay_entry, &tag_id)) {
481 break;
482 }
483
484 r = process_entry(replay_entry, tag_id);
485 if (r < 0) {
486 break;
487 }
488 }
489 }
490
491 virtual int process_entry(::journal::ReplayEntry replay_entry,
492 uint64_t tag_id) = 0;
493
494 void handle_replay_complete(int r) {
495 if (m_r == 0 && r < 0) {
496 m_r = r;
497 }
498 m_journaler.stop_replay(&m_cond);
499 }
500
501 Journaler m_journaler;
502 C_SaferCond m_cond;
503 int m_r;
504 };
505
506 static int inspect_entry(bufferlist& data,
507 librbd::journal::EventEntry& event_entry,
508 bool verbose) {
509 try {
510 auto it = data.cbegin();
511 decode(event_entry, it);
512 } catch (const buffer::error &err) {
513 std::cerr << "failed to decode event entry: " << err.what() << std::endl;
514 return -EINVAL;
515 }
516 if (verbose) {
517 JSONFormatter f(true);
518 f.open_object_section("event_entry");
519 event_entry.dump(&f);
520 f.close_section();
521 f.flush(std::cout);
522 }
523 return 0;
524 }
525
526 class JournalInspector : public JournalPlayer {
527 public:
528 JournalInspector(librados::IoCtx& io_ctx, const std::string& journal_id,
529 bool verbose) :
530 JournalPlayer(io_ctx, journal_id, "INSPECT"),
531 m_verbose(verbose),
532 m_s() {
533 }
534
535 int exec() override {
536 int r = JournalPlayer::exec();
537 m_s.print();
538 return r;
539 }
540
541 private:
542 struct Stats {
543 Stats() : total(0), error(0) {}
544
545 void print() {
546 std::cout << "Summary:" << std::endl
547 << " " << total << " entries inspected, " << error << " errors"
548 << std::endl;
549 }
550
551 int total;
552 int error;
553 };
554
555 int process_entry(::journal::ReplayEntry replay_entry,
556 uint64_t tag_id) override {
557 m_s.total++;
558 if (m_verbose) {
559 std::cout << "Entry: tag_id=" << tag_id << ", commit_tid="
560 << replay_entry.get_commit_tid() << std::endl;
561 }
562 bufferlist data = replay_entry.get_data();
563 librbd::journal::EventEntry event_entry;
564 int r = inspect_entry(data, event_entry, m_verbose);
565 if (r < 0) {
566 m_r = r;
567 m_s.error++;
568 }
569 return 0;
570 }
571
572 bool m_verbose;
573 Stats m_s;
574 };
575
576 static int do_inspect_journal(librados::IoCtx& io_ctx,
577 const std::string& journal_id,
578 bool verbose) {
579 JournalInspector inspector(io_ctx, journal_id, verbose);
580 int r = inspector.exec();
581 if (r < 0) {
582 inspector.shut_down();
583 return r;
584 }
585
586 r = inspector.shut_down();
587 if (r < 0) {
588 return r;
589 }
590 return 0;
591 }
592
593 struct ExportEntry {
594 uint64_t tag_id;
595 uint64_t commit_tid;
596 int type;
597 bufferlist entry;
598
599 ExportEntry() : tag_id(0), commit_tid(0), type(0), entry() {}
600
601 ExportEntry(uint64_t tag_id, uint64_t commit_tid, int type,
602 const bufferlist& entry)
603 : tag_id(tag_id), commit_tid(commit_tid), type(type), entry(entry) {
604 }
605
606 void dump(Formatter *f) const {
607 ::encode_json("tag_id", tag_id, f);
608 ::encode_json("commit_tid", commit_tid, f);
609 ::encode_json("type", type, f);
610 ::encode_json("entry", entry, f);
611 }
612
613 void decode_json(JSONObj *obj) {
614 JSONDecoder::decode_json("tag_id", tag_id, obj);
615 JSONDecoder::decode_json("commit_tid", commit_tid, obj);
616 JSONDecoder::decode_json("type", type, obj);
617 JSONDecoder::decode_json("entry", entry, obj);
618 }
619 };
620
621 class JournalExporter : public JournalPlayer {
622 public:
623 JournalExporter(librados::IoCtx& io_ctx, const std::string& journal_id,
624 int fd, bool no_error, bool verbose) :
625 JournalPlayer(io_ctx, journal_id, "EXPORT"),
626 m_journal_id(journal_id),
627 m_fd(fd),
628 m_no_error(no_error),
629 m_verbose(verbose),
630 m_s() {
631 }
632
633 int exec() override {
634 std::string header("# journal_id: " + m_journal_id + "\n");
635 int r;
636 r = safe_write(m_fd, header.c_str(), header.size());
637 if (r < 0) {
638 std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r)
639 << std::endl;
640 return r;
641 }
642 r = JournalPlayer::exec();
643 m_s.print();
644 return r;
645 }
646
647 private:
648 struct Stats {
649 Stats() : total(0), error(0) {}
650
651 void print() {
652 std::cout << total << " entries processed, " << error << " errors"
653 << std::endl;
654 }
655
656 int total;
657 int error;
658 };
659
660 int process_entry(::journal::ReplayEntry replay_entry,
661 uint64_t tag_id) override {
662 m_s.total++;
663 int type = -1;
664 bufferlist entry = replay_entry.get_data();
665 librbd::journal::EventEntry event_entry;
666 int r = inspect_entry(entry, event_entry, m_verbose);
667 if (r < 0) {
668 m_s.error++;
669 m_r = r;
670 return m_no_error ? 0 : r;
671 } else {
672 type = event_entry.get_event_type();
673 }
674 ExportEntry export_entry(tag_id, replay_entry.get_commit_tid(), type,
675 entry);
676 JSONFormatter f;
677 ::encode_json("event_entry", export_entry, &f);
678 std::ostringstream oss;
679 f.flush(oss);
680 std::string objstr = oss.str();
681 std::string header = stringify(objstr.size()) + " ";
682 r = safe_write(m_fd, header.c_str(), header.size());
683 if (r == 0) {
684 r = safe_write(m_fd, objstr.c_str(), objstr.size());
685 }
686 if (r == 0) {
687 r = safe_write(m_fd, "\n", 1);
688 }
689 if (r < 0) {
690 std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r)
691 << std::endl;
692 m_s.error++;
693 return r;
694 }
695 return 0;
696 }
697
698 std::string m_journal_id;
699 int m_fd;
700 bool m_no_error;
701 bool m_verbose;
702 Stats m_s;
703 };
704
705 static int do_export_journal(librados::IoCtx& io_ctx,
706 const std::string& journal_id,
707 const std::string& path,
708 bool no_error, bool verbose) {
709 int r;
710 int fd;
711 bool to_stdout = path == "-";
712 if (to_stdout) {
713 fd = STDOUT_FILENO;
714 } else {
715 fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644);
716 if (fd < 0) {
717 r = -errno;
718 std::cerr << "rbd: error creating " << path << std::endl;
719 return r;
720 }
721 #ifdef HAVE_POSIX_FADVISE
722 posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
723 #endif
724 }
725
726 JournalExporter exporter(io_ctx, journal_id, fd, no_error, verbose);
727 r = exporter.exec();
728
729 if (!to_stdout) {
730 close(fd);
731 }
732
733 int shut_down_r = exporter.shut_down();
734 if (r == 0 && shut_down_r < 0) {
735 r = shut_down_r;
736 }
737
738 return r;
739 }
740
741 class JournalImporter {
742 public:
743 JournalImporter(librados::IoCtx& io_ctx, const std::string& journal_id,
744 int fd, bool no_error, bool verbose) :
745 m_journaler(io_ctx, journal_id, "IMPORT"),
746 m_fd(fd),
747 m_no_error(no_error),
748 m_verbose(verbose) {
749 }
750
751 bool read_entry(bufferlist& bl, int& r) {
752 // Entries are stored in the file using the following format:
753 //
754 // # Optional comments
755 // NNN {json encoded entry}
756 // ...
757 //
758 // Where NNN is the encoded entry size.
759 bl.clear();
760 char buf[80];
761 // Skip line feed and comments (lines started with #).
762 while ((r = safe_read_exact(m_fd, buf, 1)) == 0) {
763 if (buf[0] == '\n') {
764 continue;
765 } else if (buf[0] == '#') {
766 while ((r = safe_read_exact(m_fd, buf, 1)) == 0) {
767 if (buf[0] == '\n') {
768 break;
769 }
770 }
771 } else {
772 break;
773 }
774 }
775 if (r < 0) {
776 if (r == -EDOM) {
777 r = 0;
778 }
779 return false;
780 }
781 // Read entry size to buf.
782 if (!isdigit(buf[0])) {
783 r = -EINVAL;
784 std::cerr << "rbd: import data invalid format (digit expected)"
785 << std::endl;
786 return false;
787 }
788 for (size_t i = 1; i < sizeof(buf); i++) {
789 r = safe_read_exact(m_fd, buf + i, 1);
790 if (r < 0) {
791 std::cerr << "rbd: error reading import data" << std::endl;
792 return false;
793 }
794 if (!isdigit(buf[i])) {
795 if (buf[i] != ' ') {
796 r = -EINVAL;
797 std::cerr << "rbd: import data invalid format (space expected)"
798 << std::endl;
799 return false;
800 }
801 buf[i] = '\0';
802 break;
803 }
804 }
805 int entry_size = atoi(buf);
806 if (entry_size == 0) {
807 r = -EINVAL;
808 std::cerr << "rbd: import data invalid format (zero entry size)"
809 << std::endl;
810 return false;
811 }
812 ceph_assert(entry_size > 0);
813 // Read entry.
814 r = bl.read_fd(m_fd, entry_size);
815 if (r < 0) {
816 std::cerr << "rbd: error reading from stdin: " << cpp_strerror(r)
817 << std::endl;
818 return false;
819 }
820 if (r != entry_size) {
821 std::cerr << "rbd: error reading from stdin: truncated"
822 << std::endl;
823 r = -EINVAL;
824 return false;
825 }
826 r = 0;
827 return true;
828 }
829
830 int exec() {
831 int r = m_journaler.init();
832 if (r < 0) {
833 return r;
834 }
835 m_journaler.start_append(0);
836
837 int r1 = 0;
838 bufferlist bl;
839 int n = 0;
840 int error_count = 0;
841 while (read_entry(bl, r)) {
842 n++;
843 error_count++;
844 JSONParser p;
845 if (!p.parse(bl.c_str(), bl.length())) {
846 std::cerr << "rbd: error parsing input (entry " << n << ")"
847 << std::endl;
848 r = -EINVAL;
849 if (m_no_error) {
850 r1 = r;
851 continue;
852 } else {
853 break;
854 }
855 }
856 ExportEntry e;
857 try {
858 decode_json_obj(e, &p);
859 } catch (JSONDecoder::err& err) {
860 std::cerr << "rbd: error json decoding import data (entry " << n << "):"
861 << err.message << std::endl;
862 r = -EINVAL;
863 if (m_no_error) {
864 r1 = r;
865 continue;
866 } else {
867 break;
868 }
869 }
870 librbd::journal::EventEntry event_entry;
871 r = inspect_entry(e.entry, event_entry, m_verbose);
872 if (r < 0) {
873 std::cerr << "rbd: corrupted entry " << n << ": tag_tid=" << e.tag_id
874 << ", commit_tid=" << e.commit_tid << std::endl;
875 if (m_no_error) {
876 r1 = r;
877 continue;
878 } else {
879 break;
880 }
881 }
882 m_journaler.append(e.tag_id, e.entry);
883 error_count--;
884 }
885
886 std::cout << n << " entries processed, " << error_count << " errors" << std::endl;
887
888 std::cout << "Waiting for journal append to complete..." << std::endl;
889
890 C_SaferCond cond;
891 m_journaler.stop_append(&cond);
892 r = cond.wait();
893
894 if (r < 0) {
895 std::cerr << "failed to append journal: " << cpp_strerror(r) << std::endl;
896 }
897
898 if (r1 < 0 && r == 0) {
899 r = r1;
900 }
901 return r;
902 }
903
904 int shut_down() {
905 return m_journaler.shut_down();
906 }
907
908 private:
909 Journaler m_journaler;
910 int m_fd;
911 bool m_no_error;
912 bool m_verbose;
913 };
914
915 static int do_import_journal(librados::IoCtx& io_ctx,
916 const std::string& journal_id,
917 const std::string& path,
918 bool no_error, bool verbose) {
919 int r;
920
921 int fd;
922 bool from_stdin = path == "-";
923 if (from_stdin) {
924 fd = STDIN_FILENO;
925 } else {
926 if ((fd = open(path.c_str(), O_RDONLY)) < 0) {
927 r = -errno;
928 std::cerr << "rbd: error opening " << path << std::endl;
929 return r;
930 }
931 #ifdef HAVE_POSIX_FADVISE
932 posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL);
933 #endif
934 }
935
936 JournalImporter importer(io_ctx, journal_id, fd, no_error, verbose);
937 r = importer.exec();
938
939 if (!from_stdin) {
940 close(fd);
941 }
942
943 int shut_down_r = importer.shut_down();
944 if (r == 0 && shut_down_r < 0) {
945 r = shut_down_r;
946 }
947
948 return r;
949 }
950
951 void get_info_arguments(po::options_description *positional,
952 po::options_description *options) {
953 add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
954 at::add_format_options(options);
955 }
956
957 int execute_info(const po::variables_map &vm,
958 const std::vector<std::string> &ceph_global_init_args) {
959 size_t arg_index = 0;
960 std::string pool_name;
961 std::string namespace_name;
962 std::string journal_name;
963 int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
964 &pool_name, &namespace_name, &journal_name);
965 if (r < 0) {
966 return r;
967 }
968
969 at::Format::Formatter formatter;
970 r = utils::get_formatter(vm, &formatter);
971 if (r < 0) {
972 return r;
973 }
974
975 librados::Rados rados;
976 librados::IoCtx io_ctx;
977 r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
978 if (r < 0) {
979 return r;
980 }
981
982 r = do_show_journal_info(rados, io_ctx, journal_name, formatter.get());
983 if (r < 0) {
984 std::cerr << "rbd: journal info: " << cpp_strerror(r) << std::endl;
985 return r;
986 }
987 return 0;
988
989 }
990
991 void get_status_arguments(po::options_description *positional,
992 po::options_description *options) {
993 add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
994 at::add_format_options(options);
995 }
996
997 int execute_status(const po::variables_map &vm,
998 const std::vector<std::string> &ceph_global_init_args) {
999 size_t arg_index = 0;
1000 std::string pool_name;
1001 std::string namespace_name;
1002 std::string journal_name;
1003 int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
1004 &pool_name, &namespace_name, &journal_name);
1005 if (r < 0) {
1006 return r;
1007 }
1008
1009 at::Format::Formatter formatter;
1010 r = utils::get_formatter(vm, &formatter);
1011 if (r < 0) {
1012 return r;
1013 }
1014
1015 librados::Rados rados;
1016 librados::IoCtx io_ctx;
1017 r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
1018 if (r < 0) {
1019 return r;
1020 }
1021
1022 r = do_show_journal_status(io_ctx, journal_name, formatter.get());
1023 if (r < 0) {
1024 std::cerr << "rbd: journal status: " << cpp_strerror(r) << std::endl;
1025 return r;
1026 }
1027 return 0;
1028 }
1029
1030 void get_reset_arguments(po::options_description *positional,
1031 po::options_description *options) {
1032 add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
1033 }
1034
1035 int execute_reset(const po::variables_map &vm,
1036 const std::vector<std::string> &ceph_global_init_args) {
1037 size_t arg_index = 0;
1038 std::string pool_name;
1039 std::string namespace_name;
1040 std::string journal_name;
1041 int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
1042 &pool_name, &namespace_name, &journal_name);
1043 if (r < 0) {
1044 return r;
1045 }
1046
1047 librados::Rados rados;
1048 librados::IoCtx io_ctx;
1049 r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
1050 if (r < 0) {
1051 return r;
1052 }
1053
1054 r = do_reset_journal(io_ctx, journal_name);
1055 if (r < 0) {
1056 std::cerr << "rbd: journal reset: " << cpp_strerror(r) << std::endl;
1057 return r;
1058 }
1059 return 0;
1060 }
1061
1062 void get_client_disconnect_arguments(po::options_description *positional,
1063 po::options_description *options) {
1064 add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
1065 options->add_options()
1066 ("client-id", po::value<std::string>(),
1067 "client ID (or leave unspecified to disconnect all)");
1068 }
1069
1070 int execute_client_disconnect(const po::variables_map &vm,
1071 const std::vector<std::string> &ceph_global_init_args) {
1072 size_t arg_index = 0;
1073 std::string pool_name;
1074 std::string namespace_name;
1075 std::string journal_name;
1076 int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
1077 &pool_name, &namespace_name, &journal_name);
1078 if (r < 0) {
1079 return r;
1080 }
1081
1082 std::string client_id;
1083 if (vm.count("client-id")) {
1084 client_id = vm["client-id"].as<std::string>();
1085 }
1086
1087 librados::Rados rados;
1088 librados::IoCtx io_ctx;
1089 r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
1090 if (r < 0) {
1091 return r;
1092 }
1093
1094 r = do_disconnect_journal_client(io_ctx, journal_name, client_id);
1095 if (r < 0) {
1096 std::cerr << "rbd: journal client disconnect: " << cpp_strerror(r)
1097 << std::endl;
1098 return r;
1099 }
1100 return 0;
1101 }
1102
1103 void get_inspect_arguments(po::options_description *positional,
1104 po::options_description *options) {
1105 add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE);
1106 at::add_verbose_option(options);
1107 }
1108
1109 int execute_inspect(const po::variables_map &vm,
1110 const std::vector<std::string> &ceph_global_init_args) {
1111 size_t arg_index = 0;
1112 std::string pool_name;
1113 std::string namespace_name;
1114 std::string journal_name;
1115 int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, &arg_index,
1116 &pool_name, &namespace_name, &journal_name);
1117 if (r < 0) {
1118 return r;
1119 }
1120
1121 librados::Rados rados;
1122 librados::IoCtx io_ctx;
1123 r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
1124 if (r < 0) {
1125 return r;
1126 }
1127
1128 r = do_inspect_journal(io_ctx, journal_name, vm[at::VERBOSE].as<bool>());
1129 if (r < 0) {
1130 std::cerr << "rbd: journal inspect: " << cpp_strerror(r) << std::endl;
1131 return r;
1132 }
1133 return 0;
1134 }
1135
1136 void get_export_arguments(po::options_description *positional,
1137 po::options_description *options) {
1138 add_journal_spec_options(positional, options,
1139 at::ARGUMENT_MODIFIER_SOURCE);
1140 at::add_path_options(positional, options,
1141 "export file (or '-' for stdout)");
1142 at::add_verbose_option(options);
1143 at::add_no_error_option(options);
1144 }
1145
1146 int execute_export(const po::variables_map &vm,
1147 const std::vector<std::string> &ceph_global_init_args) {
1148 size_t arg_index = 0;
1149 std::string pool_name;
1150 std::string namespace_name;
1151 std::string journal_name;
1152 int r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_SOURCE, &arg_index,
1153 &pool_name, &namespace_name, &journal_name);
1154 if (r < 0) {
1155 return r;
1156 }
1157
1158 std::string path;
1159 r = utils::get_path(vm, &arg_index, &path);
1160 if (r < 0) {
1161 return r;
1162 }
1163
1164 librados::Rados rados;
1165 librados::IoCtx io_ctx;
1166 r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
1167 if (r < 0) {
1168 return r;
1169 }
1170
1171 r = do_export_journal(io_ctx, journal_name, path, vm[at::NO_ERROR].as<bool>(),
1172 vm[at::VERBOSE].as<bool>());
1173 if (r < 0) {
1174 std::cerr << "rbd: journal export: " << cpp_strerror(r) << std::endl;
1175 return r;
1176 }
1177 return 0;
1178 }
1179
1180 void get_import_arguments(po::options_description *positional,
1181 po::options_description *options) {
1182 at::add_path_options(positional, options,
1183 "import file (or '-' for stdin)");
1184 add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_DEST);
1185 at::add_verbose_option(options);
1186 at::add_no_error_option(options);
1187 }
1188
1189 int execute_import(const po::variables_map &vm,
1190 const std::vector<std::string> &ceph_global_init_args) {
1191 std::string path;
1192 size_t arg_index = 0;
1193 int r = utils::get_path(vm, &arg_index, &path);
1194 if (r < 0) {
1195 return r;
1196 }
1197
1198 std::string pool_name;
1199 std::string namespace_name;
1200 std::string journal_name;
1201 r = get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_DEST, &arg_index,
1202 &pool_name, &namespace_name, &journal_name);
1203 if (r < 0) {
1204 return r;
1205 }
1206
1207 librados::Rados rados;
1208 librados::IoCtx io_ctx;
1209 r = utils::init(pool_name, namespace_name, &rados, &io_ctx);
1210 if (r < 0) {
1211 return r;
1212 }
1213
1214 r = do_import_journal(io_ctx, journal_name, path, vm[at::NO_ERROR].as<bool>(),
1215 vm[at::VERBOSE].as<bool>());
1216 if (r < 0) {
1217 std::cerr << "rbd: journal import: " << cpp_strerror(r) << std::endl;
1218 return r;
1219 }
1220 return 0;
1221 }
1222
1223 Shell::Action action_info(
1224 {"journal", "info"}, {}, "Show information about image journal.", "",
1225 &get_info_arguments, &execute_info);
1226
1227 Shell::Action action_status(
1228 {"journal", "status"}, {}, "Show status of image journal.", "",
1229 &get_status_arguments, &execute_status);
1230
1231 Shell::Action action_reset(
1232 {"journal", "reset"}, {}, "Reset image journal.", "",
1233 &get_reset_arguments, &execute_reset);
1234
1235 Shell::Action action_inspect(
1236 {"journal", "inspect"}, {}, "Inspect image journal for structural errors.", "",
1237 &get_inspect_arguments, &execute_inspect);
1238
1239 Shell::Action action_export(
1240 {"journal", "export"}, {}, "Export image journal.", "",
1241 &get_export_arguments, &execute_export);
1242
1243 Shell::Action action_import(
1244 {"journal", "import"}, {}, "Import image journal.", "",
1245 &get_import_arguments, &execute_import);
1246
1247 Shell::Action action_disconnect(
1248 {"journal", "client", "disconnect"}, {},
1249 "Flag image journal client as disconnected.", "",
1250 &get_client_disconnect_arguments, &execute_client_disconnect);
1251
1252 } // namespace journal
1253 } // namespace action
1254 } // namespace rbd