]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "tools/rbd/ArgumentTypes.h" | |
5 | #include "tools/rbd/Shell.h" | |
6 | #include "tools/rbd/Utils.h" | |
7 | #include "common/Cond.h" | |
8 | #include "common/Formatter.h" | |
9 | #include "common/ceph_json.h" | |
10 | #include "common/errno.h" | |
11 | #include "common/safe_io.h" | |
12 | #include "include/stringify.h" | |
13 | #include <fstream> | |
14 | #include <sstream> | |
15 | #include <boost/program_options.hpp> | |
16 | #include "cls/rbd/cls_rbd_client.h" | |
17 | #include "cls/journal/cls_journal_types.h" | |
18 | #include "cls/journal/cls_journal_client.h" | |
19 | ||
20 | #include "journal/Journaler.h" | |
21 | #include "journal/ReplayEntry.h" | |
22 | #include "journal/ReplayHandler.h" | |
23 | #include "journal/Settings.h" | |
24 | #include "librbd/journal/Types.h" | |
25 | ||
26 | namespace rbd { | |
27 | namespace action { | |
28 | namespace journal { | |
29 | ||
30 | namespace at = argument_types; | |
31 | namespace po = boost::program_options; | |
32 | ||
33 | static int do_show_journal_info(librados::Rados& rados, librados::IoCtx& io_ctx, | |
34 | const std::string& journal_id, Formatter *f) | |
35 | { | |
36 | int r; | |
37 | C_SaferCond cond; | |
38 | ||
39 | std::string header_oid = ::journal::Journaler::header_oid(journal_id); | |
40 | std::string object_oid_prefix = ::journal::Journaler::object_oid_prefix( | |
41 | io_ctx.get_id(), journal_id); | |
42 | uint8_t order; | |
43 | uint8_t splay_width; | |
44 | int64_t pool_id; | |
45 | ||
46 | cls::journal::client::get_immutable_metadata(io_ctx, header_oid, &order, | |
47 | &splay_width, &pool_id, &cond); | |
48 | r = cond.wait(); | |
49 | if (r < 0) { | |
50 | std::cerr << "failed to get journal metadata: " << cpp_strerror(r) | |
51 | << std::endl; | |
52 | return r; | |
53 | } | |
54 | ||
55 | std::string object_pool_name; | |
56 | if (pool_id >= 0) { | |
57 | r = rados.pool_reverse_lookup(pool_id, &object_pool_name); | |
58 | if (r < 0) { | |
59 | std::cerr << "error looking up pool name for pool_id=" << pool_id << ": " | |
60 | << cpp_strerror(r) << std::endl; | |
61 | } | |
62 | } | |
63 | ||
64 | if (f) { | |
65 | f->open_object_section("journal"); | |
66 | f->dump_string("journal_id", journal_id); | |
67 | f->dump_string("header_oid", header_oid); | |
68 | f->dump_string("object_oid_prefix", object_oid_prefix); | |
69 | f->dump_int("order", order); | |
70 | f->dump_int("splay_width", splay_width); | |
71 | if (!object_pool_name.empty()) { | |
72 | f->dump_string("object_pool", object_pool_name); | |
73 | } | |
74 | f->close_section(); | |
75 | f->flush(std::cout); | |
76 | } else { | |
77 | std::cout << "rbd journal '" << journal_id << "':" << std::endl; | |
78 | std::cout << "\theader_oid: " << header_oid << std::endl; | |
79 | std::cout << "\tobject_oid_prefix: " << object_oid_prefix << std::endl; | |
80 | std::cout << "\torder: " << static_cast<int>(order) << " (" | |
1adf2230 | 81 | << byte_u_t(1ull << order) << " objects)"<< std::endl; |
7c673cae FG |
82 | std::cout << "\tsplay_width: " << static_cast<int>(splay_width) << std::endl; |
83 | if (!object_pool_name.empty()) { | |
84 | std::cout << "\tobject_pool: " << object_pool_name << std::endl; | |
85 | } | |
86 | } | |
87 | return 0; | |
88 | } | |
89 | ||
90 | static int do_show_journal_status(librados::IoCtx& io_ctx, | |
91 | const std::string& journal_id, Formatter *f) | |
92 | { | |
93 | int r; | |
94 | ||
95 | C_SaferCond cond; | |
96 | uint64_t minimum_set; | |
97 | uint64_t active_set; | |
98 | std::set<cls::journal::Client> registered_clients; | |
99 | std::string oid = ::journal::Journaler::header_oid(journal_id); | |
100 | ||
101 | cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set, | |
102 | &active_set, ®istered_clients, | |
103 | &cond); | |
104 | r = cond.wait(); | |
105 | if (r < 0) { | |
106 | std::cerr << "warning: failed to get journal metadata" << std::endl; | |
107 | return r; | |
108 | } | |
109 | ||
110 | if (f) { | |
111 | f->open_object_section("status"); | |
112 | f->dump_unsigned("minimum_set", minimum_set); | |
113 | f->dump_unsigned("active_set", active_set); | |
114 | f->open_array_section("registered_clients"); | |
115 | for (std::set<cls::journal::Client>::iterator c = | |
116 | registered_clients.begin(); c != registered_clients.end(); ++c) { | |
117 | f->open_object_section("client"); | |
118 | c->dump(f); | |
119 | f->close_section(); | |
120 | } | |
121 | f->close_section(); | |
122 | f->close_section(); | |
123 | f->flush(std::cout); | |
124 | } else { | |
125 | std::cout << "minimum_set: " << minimum_set << std::endl; | |
126 | std::cout << "active_set: " << active_set << std::endl; | |
127 | std::cout << "registered clients: " << std::endl; | |
128 | for (std::set<cls::journal::Client>::iterator c = | |
129 | registered_clients.begin(); c != registered_clients.end(); ++c) { | |
130 | std::cout << "\t" << *c << std::endl; | |
131 | } | |
132 | } | |
133 | return 0; | |
134 | } | |
135 | ||
136 | static int do_reset_journal(librados::IoCtx& io_ctx, | |
137 | const std::string& journal_id) | |
138 | { | |
139 | // disable/re-enable journaling to delete/re-create the journal | |
140 | // to properly handle mirroring constraints | |
141 | std::string image_name; | |
142 | int r = librbd::cls_client::dir_get_name(&io_ctx, RBD_DIRECTORY, journal_id, | |
143 | &image_name); | |
144 | if (r < 0) { | |
145 | std::cerr << "failed to locate journal's image: " << cpp_strerror(r) | |
146 | << std::endl; | |
147 | return r; | |
148 | } | |
149 | ||
150 | librbd::Image image; | |
151 | r = utils::open_image(io_ctx, image_name, false, &image); | |
152 | if (r < 0) { | |
153 | std::cerr << "failed to open image: " << cpp_strerror(r) << std::endl; | |
154 | return r; | |
155 | } | |
156 | ||
157 | r = image.update_features(RBD_FEATURE_JOURNALING, false); | |
158 | if (r < 0) { | |
159 | std::cerr << "failed to disable image journaling: " << cpp_strerror(r) | |
160 | << std::endl; | |
161 | return r; | |
162 | } | |
163 | ||
164 | r = image.update_features(RBD_FEATURE_JOURNALING, true); | |
165 | if (r < 0) { | |
166 | std::cerr << "failed to re-enable image journaling: " << cpp_strerror(r) | |
167 | << std::endl; | |
168 | return r; | |
169 | } | |
170 | return 0; | |
171 | } | |
172 | ||
173 | static int do_disconnect_journal_client(librados::IoCtx& io_ctx, | |
174 | const std::string& journal_id, | |
175 | const std::string& client_id) | |
176 | { | |
177 | int r; | |
178 | ||
179 | C_SaferCond cond; | |
180 | uint64_t minimum_set; | |
181 | uint64_t active_set; | |
182 | std::set<cls::journal::Client> registered_clients; | |
183 | std::string oid = ::journal::Journaler::header_oid(journal_id); | |
184 | ||
185 | cls::journal::client::get_mutable_metadata(io_ctx, oid, &minimum_set, | |
186 | &active_set, ®istered_clients, | |
187 | &cond); | |
188 | r = cond.wait(); | |
189 | if (r < 0) { | |
190 | std::cerr << "warning: failed to get journal metadata" << std::endl; | |
191 | return r; | |
192 | } | |
193 | ||
194 | static const std::string IMAGE_CLIENT_ID(""); | |
195 | ||
196 | bool found = false; | |
197 | for (auto &c : registered_clients) { | |
198 | if (c.id == IMAGE_CLIENT_ID || (!client_id.empty() && client_id != c.id)) { | |
199 | continue; | |
200 | } | |
201 | r = cls::journal::client::client_update_state(io_ctx, oid, c.id, | |
202 | cls::journal::CLIENT_STATE_DISCONNECTED); | |
203 | if (r < 0) { | |
204 | std::cerr << "warning: failed to disconnect client " << c.id << ": " | |
205 | << cpp_strerror(r) << std::endl; | |
206 | return r; | |
207 | } | |
208 | std::cout << "client " << c.id << " disconnected" << std::endl; | |
209 | found = true; | |
210 | } | |
211 | ||
212 | if (!found) { | |
213 | if (!client_id.empty()) { | |
214 | std::cerr << "warning: client " << client_id << " is not registered" | |
215 | << std::endl; | |
216 | } else { | |
217 | std::cerr << "no registered clients to disconnect" << std::endl; | |
218 | } | |
219 | return -ENOENT; | |
220 | } | |
221 | ||
222 | bufferlist bl; | |
223 | r = io_ctx.notify2(oid, bl, 5000, NULL); | |
224 | if (r < 0) { | |
225 | std::cerr << "warning: failed to notify state change:" << ": " | |
226 | << cpp_strerror(r) << std::endl; | |
227 | return r; | |
228 | } | |
229 | ||
230 | return 0; | |
231 | } | |
232 | ||
233 | class Journaler : public ::journal::Journaler { | |
234 | public: | |
235 | Journaler(librados::IoCtx& io_ctx, const std::string& journal_id, | |
236 | const std::string &client_id) : | |
237 | ::journal::Journaler(io_ctx, journal_id, client_id, {}) { | |
238 | } | |
239 | ||
240 | int init() { | |
241 | int r; | |
242 | ||
243 | // TODO register with librbd payload | |
244 | r = register_client(bufferlist()); | |
245 | if (r < 0) { | |
246 | std::cerr << "failed to register client: " << cpp_strerror(r) | |
247 | << std::endl; | |
248 | return r; | |
249 | } | |
250 | ||
251 | C_SaferCond cond; | |
252 | ||
253 | ::journal::Journaler::init(&cond); | |
254 | r = cond.wait(); | |
255 | if (r < 0) { | |
256 | std::cerr << "failed to initialize journal: " << cpp_strerror(r) | |
257 | << std::endl; | |
258 | (void) unregister_client(); | |
259 | return r; | |
260 | } | |
261 | ||
262 | return 0; | |
263 | } | |
264 | ||
265 | int shut_down() { | |
266 | int r = unregister_client(); | |
267 | if (r < 0) { | |
268 | std::cerr << "rbd: failed to unregister journal client: " | |
269 | << cpp_strerror(r) << std::endl; | |
270 | } | |
271 | ::journal::Journaler::shut_down(); | |
272 | ||
273 | return r; | |
274 | } | |
275 | }; | |
276 | ||
277 | class JournalPlayer { | |
278 | public: | |
279 | JournalPlayer(librados::IoCtx& io_ctx, const std::string& journal_id, | |
280 | const std::string &client_id) : | |
281 | m_journaler(io_ctx, journal_id, client_id), | |
282 | m_cond(), | |
283 | m_r(0) { | |
284 | } | |
285 | ||
286 | virtual ~JournalPlayer() {} | |
287 | ||
288 | virtual int exec() { | |
289 | int r; | |
290 | ||
291 | r = m_journaler.init(); | |
292 | if (r < 0) { | |
293 | return r; | |
294 | } | |
295 | ||
296 | ReplayHandler replay_handler(this); | |
297 | ||
298 | m_journaler.start_replay(&replay_handler); | |
299 | ||
300 | r = m_cond.wait(); | |
301 | if (r < 0) { | |
302 | std::cerr << "rbd: failed to process journal: " << cpp_strerror(r) | |
303 | << std::endl; | |
304 | if (m_r == 0) { | |
305 | m_r = r; | |
306 | } | |
307 | } | |
308 | return m_r; | |
309 | } | |
310 | ||
311 | int shut_down() { | |
312 | return m_journaler.shut_down(); | |
313 | } | |
314 | ||
315 | protected: | |
316 | struct ReplayHandler : public ::journal::ReplayHandler { | |
317 | JournalPlayer *journal; | |
318 | explicit ReplayHandler(JournalPlayer *_journal) : journal(_journal) {} | |
319 | ||
320 | void get() override {} | |
321 | void put() override {} | |
322 | ||
323 | void handle_entries_available() override { | |
324 | journal->handle_replay_ready(); | |
325 | } | |
326 | void handle_complete(int r) override { | |
327 | journal->handle_replay_complete(r); | |
328 | } | |
329 | }; | |
330 | ||
331 | void handle_replay_ready() { | |
332 | int r = 0; | |
333 | while (true) { | |
334 | ::journal::ReplayEntry replay_entry; | |
335 | uint64_t tag_id; | |
336 | if (!m_journaler.try_pop_front(&replay_entry, &tag_id)) { | |
337 | break; | |
338 | } | |
339 | ||
340 | r = process_entry(replay_entry, tag_id); | |
341 | if (r < 0) { | |
342 | break; | |
343 | } | |
344 | } | |
345 | } | |
346 | ||
347 | virtual int process_entry(::journal::ReplayEntry replay_entry, | |
348 | uint64_t tag_id) = 0; | |
349 | ||
350 | void handle_replay_complete(int r) { | |
351 | if (m_r == 0 && r < 0) { | |
352 | m_r = r; | |
353 | } | |
354 | m_journaler.stop_replay(&m_cond); | |
355 | } | |
356 | ||
357 | Journaler m_journaler; | |
358 | C_SaferCond m_cond; | |
359 | int m_r; | |
360 | }; | |
361 | ||
362 | static int inspect_entry(bufferlist& data, | |
363 | librbd::journal::EventEntry& event_entry, | |
364 | bool verbose) { | |
365 | try { | |
366 | bufferlist::iterator it = data.begin(); | |
367 | ::decode(event_entry, it); | |
368 | } catch (const buffer::error &err) { | |
369 | std::cerr << "failed to decode event entry: " << err.what() << std::endl; | |
370 | return -EINVAL; | |
371 | } | |
372 | if (verbose) { | |
373 | JSONFormatter f(true); | |
374 | f.open_object_section("event_entry"); | |
375 | event_entry.dump(&f); | |
376 | f.close_section(); | |
377 | f.flush(std::cout); | |
378 | } | |
379 | return 0; | |
380 | } | |
381 | ||
382 | class JournalInspector : public JournalPlayer { | |
383 | public: | |
384 | JournalInspector(librados::IoCtx& io_ctx, const std::string& journal_id, | |
385 | bool verbose) : | |
386 | JournalPlayer(io_ctx, journal_id, "INSPECT"), | |
387 | m_verbose(verbose), | |
388 | m_s() { | |
389 | } | |
390 | ||
391 | int exec() override { | |
392 | int r = JournalPlayer::exec(); | |
393 | m_s.print(); | |
394 | return r; | |
395 | } | |
396 | ||
397 | private: | |
398 | struct Stats { | |
399 | Stats() : total(0), error(0) {} | |
400 | ||
401 | void print() { | |
402 | std::cout << "Summary:" << std::endl | |
403 | << " " << total << " entries inspected, " << error << " errors" | |
404 | << std::endl; | |
405 | } | |
406 | ||
407 | int total; | |
408 | int error; | |
409 | }; | |
410 | ||
411 | int process_entry(::journal::ReplayEntry replay_entry, | |
412 | uint64_t tag_id) override { | |
413 | m_s.total++; | |
414 | if (m_verbose) { | |
415 | std::cout << "Entry: tag_id=" << tag_id << ", commit_tid=" | |
416 | << replay_entry.get_commit_tid() << std::endl; | |
417 | } | |
418 | bufferlist data = replay_entry.get_data(); | |
419 | librbd::journal::EventEntry event_entry; | |
420 | int r = inspect_entry(data, event_entry, m_verbose); | |
421 | if (r < 0) { | |
422 | m_r = r; | |
423 | m_s.error++; | |
424 | } | |
425 | return 0; | |
426 | } | |
427 | ||
428 | bool m_verbose; | |
429 | Stats m_s; | |
430 | }; | |
431 | ||
432 | static int do_inspect_journal(librados::IoCtx& io_ctx, | |
433 | const std::string& journal_id, | |
434 | bool verbose) { | |
435 | JournalInspector inspector(io_ctx, journal_id, verbose); | |
436 | int r = inspector.exec(); | |
437 | if (r < 0) { | |
438 | inspector.shut_down(); | |
439 | return r; | |
440 | } | |
441 | ||
442 | r = inspector.shut_down(); | |
443 | if (r < 0) { | |
444 | return r; | |
445 | } | |
446 | return 0; | |
447 | } | |
448 | ||
449 | struct ExportEntry { | |
450 | uint64_t tag_id; | |
451 | uint64_t commit_tid; | |
452 | int type; | |
453 | bufferlist entry; | |
454 | ||
455 | ExportEntry() : tag_id(0), commit_tid(0), type(0), entry() {} | |
456 | ||
457 | ExportEntry(uint64_t tag_id, uint64_t commit_tid, int type, | |
458 | const bufferlist& entry) | |
459 | : tag_id(tag_id), commit_tid(commit_tid), type(type), entry(entry) { | |
460 | } | |
461 | ||
462 | void dump(Formatter *f) const { | |
463 | ::encode_json("tag_id", tag_id, f); | |
464 | ::encode_json("commit_tid", commit_tid, f); | |
465 | ::encode_json("type", type, f); | |
466 | ::encode_json("entry", entry, f); | |
467 | } | |
468 | ||
469 | void decode_json(JSONObj *obj) { | |
470 | JSONDecoder::decode_json("tag_id", tag_id, obj); | |
471 | JSONDecoder::decode_json("commit_tid", commit_tid, obj); | |
472 | JSONDecoder::decode_json("type", type, obj); | |
473 | JSONDecoder::decode_json("entry", entry, obj); | |
474 | } | |
475 | }; | |
476 | ||
477 | class JournalExporter : public JournalPlayer { | |
478 | public: | |
479 | JournalExporter(librados::IoCtx& io_ctx, const std::string& journal_id, | |
480 | int fd, bool no_error, bool verbose) : | |
481 | JournalPlayer(io_ctx, journal_id, "EXPORT"), | |
482 | m_journal_id(journal_id), | |
483 | m_fd(fd), | |
484 | m_no_error(no_error), | |
485 | m_verbose(verbose), | |
486 | m_s() { | |
487 | } | |
488 | ||
489 | int exec() override { | |
490 | std::string header("# journal_id: " + m_journal_id + "\n"); | |
491 | int r; | |
492 | r = safe_write(m_fd, header.c_str(), header.size()); | |
493 | if (r < 0) { | |
494 | std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r) | |
495 | << std::endl; | |
496 | return r; | |
497 | } | |
498 | r = JournalPlayer::exec(); | |
499 | m_s.print(); | |
500 | return r; | |
501 | } | |
502 | ||
503 | private: | |
504 | struct Stats { | |
505 | Stats() : total(0), error(0) {} | |
506 | ||
507 | void print() { | |
508 | std::cout << total << " entries processed, " << error << " errors" | |
509 | << std::endl; | |
510 | } | |
511 | ||
512 | int total; | |
513 | int error; | |
514 | }; | |
515 | ||
516 | int process_entry(::journal::ReplayEntry replay_entry, | |
517 | uint64_t tag_id) override { | |
518 | m_s.total++; | |
519 | int type = -1; | |
520 | bufferlist entry = replay_entry.get_data(); | |
521 | librbd::journal::EventEntry event_entry; | |
522 | int r = inspect_entry(entry, event_entry, m_verbose); | |
523 | if (r < 0) { | |
524 | m_s.error++; | |
525 | m_r = r; | |
526 | return m_no_error ? 0 : r; | |
527 | } else { | |
528 | type = event_entry.get_event_type(); | |
529 | } | |
530 | ExportEntry export_entry(tag_id, replay_entry.get_commit_tid(), type, | |
531 | entry); | |
532 | JSONFormatter f; | |
533 | ::encode_json("event_entry", export_entry, &f); | |
534 | std::ostringstream oss; | |
535 | f.flush(oss); | |
536 | std::string objstr = oss.str(); | |
537 | std::string header = stringify(objstr.size()) + " "; | |
538 | r = safe_write(m_fd, header.c_str(), header.size()); | |
539 | if (r == 0) { | |
540 | r = safe_write(m_fd, objstr.c_str(), objstr.size()); | |
541 | } | |
542 | if (r == 0) { | |
543 | r = safe_write(m_fd, "\n", 1); | |
544 | } | |
545 | if (r < 0) { | |
546 | std::cerr << "rbd: failed to write to export file: " << cpp_strerror(r) | |
547 | << std::endl; | |
548 | m_s.error++; | |
549 | return r; | |
550 | } | |
551 | return 0; | |
552 | } | |
553 | ||
554 | std::string m_journal_id; | |
555 | int m_fd; | |
556 | bool m_no_error; | |
557 | bool m_verbose; | |
558 | Stats m_s; | |
559 | }; | |
560 | ||
561 | static int do_export_journal(librados::IoCtx& io_ctx, | |
562 | const std::string& journal_id, | |
563 | const std::string& path, | |
564 | bool no_error, bool verbose) { | |
565 | int r; | |
566 | int fd; | |
567 | bool to_stdout = path == "-"; | |
568 | if (to_stdout) { | |
569 | fd = STDOUT_FILENO; | |
570 | } else { | |
571 | fd = open(path.c_str(), O_WRONLY | O_CREAT | O_EXCL, 0644); | |
572 | if (fd < 0) { | |
573 | r = -errno; | |
574 | std::cerr << "rbd: error creating " << path << std::endl; | |
575 | return r; | |
576 | } | |
577 | #ifdef HAVE_POSIX_FADVISE | |
578 | posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); | |
579 | #endif | |
580 | } | |
581 | ||
582 | JournalExporter exporter(io_ctx, journal_id, fd, no_error, verbose); | |
583 | r = exporter.exec(); | |
584 | ||
585 | if (!to_stdout) { | |
586 | close(fd); | |
587 | } | |
588 | ||
589 | int shut_down_r = exporter.shut_down(); | |
590 | if (r == 0 && shut_down_r < 0) { | |
591 | r = shut_down_r; | |
592 | } | |
593 | ||
594 | return r; | |
595 | } | |
596 | ||
597 | class JournalImporter { | |
598 | public: | |
599 | JournalImporter(librados::IoCtx& io_ctx, const std::string& journal_id, | |
600 | int fd, bool no_error, bool verbose) : | |
601 | m_journaler(io_ctx, journal_id, "IMPORT"), | |
602 | m_fd(fd), | |
603 | m_no_error(no_error), | |
604 | m_verbose(verbose) { | |
605 | } | |
606 | ||
607 | bool read_entry(bufferlist& bl, int& r) { | |
608 | // Entries are storead in the file using the following format: | |
609 | // | |
610 | // # Optional comments | |
611 | // NNN {json encoded entry} | |
612 | // ... | |
613 | // | |
614 | // Where NNN is the encoded entry size. | |
615 | bl.clear(); | |
616 | char buf[80]; | |
617 | // Skip line feed and comments (lines started with #). | |
618 | while ((r = safe_read_exact(m_fd, buf, 1)) == 0) { | |
619 | if (buf[0] == '\n') { | |
620 | continue; | |
621 | } else if (buf[0] == '#') { | |
622 | while ((r = safe_read_exact(m_fd, buf, 1)) == 0) { | |
623 | if (buf[0] == '\n') { | |
624 | break; | |
625 | } | |
626 | } | |
627 | } else { | |
628 | break; | |
629 | } | |
630 | } | |
631 | if (r < 0) { | |
632 | if (r == -EDOM) { | |
633 | r = 0; | |
634 | } | |
635 | return false; | |
636 | } | |
637 | // Read entry size to buf. | |
638 | if (!isdigit(buf[0])) { | |
639 | r = -EINVAL; | |
640 | std::cerr << "rbd: import data invalid format (digit expected)" | |
641 | << std::endl; | |
642 | return false; | |
643 | } | |
644 | for (size_t i = 1; i < sizeof(buf); i++) { | |
645 | r = safe_read_exact(m_fd, buf + i, 1); | |
646 | if (r < 0) { | |
647 | std::cerr << "rbd: error reading import data" << std::endl; | |
648 | return false; | |
649 | } | |
650 | if (!isdigit(buf[i])) { | |
651 | if (buf[i] != ' ') { | |
652 | r = -EINVAL; | |
653 | std::cerr << "rbd: import data invalid format (space expected)" | |
654 | << std::endl; | |
655 | return false; | |
656 | } | |
657 | buf[i] = '\0'; | |
658 | break; | |
659 | } | |
660 | } | |
661 | int entry_size = atoi(buf); | |
662 | if (entry_size == 0) { | |
663 | r = -EINVAL; | |
664 | std::cerr << "rbd: import data invalid format (zero entry size)" | |
665 | << std::endl; | |
666 | return false; | |
667 | } | |
668 | assert(entry_size > 0); | |
669 | // Read entry. | |
670 | r = bl.read_fd(m_fd, entry_size); | |
671 | if (r < 0) { | |
672 | std::cerr << "rbd: error reading from stdin: " << cpp_strerror(r) | |
673 | << std::endl; | |
674 | return false; | |
675 | } | |
676 | if (r != entry_size) { | |
677 | std::cerr << "rbd: error reading from stdin: trucated" | |
678 | << std::endl; | |
679 | r = -EINVAL; | |
680 | return false; | |
681 | } | |
682 | r = 0; | |
683 | return true; | |
684 | } | |
685 | ||
686 | int exec() { | |
687 | int r = m_journaler.init(); | |
688 | if (r < 0) { | |
689 | return r; | |
690 | } | |
691 | m_journaler.start_append(0, 0, 0); | |
692 | ||
693 | int r1 = 0; | |
694 | bufferlist bl; | |
695 | int n = 0; | |
696 | int error_count = 0; | |
697 | while (read_entry(bl, r)) { | |
698 | n++; | |
699 | error_count++; | |
700 | JSONParser p; | |
701 | if (!p.parse(bl.c_str(), bl.length())) { | |
702 | std::cerr << "rbd: error parsing input (entry " << n << ")" | |
703 | << std::endl; | |
704 | r = -EINVAL; | |
705 | if (m_no_error) { | |
706 | r1 = r; | |
707 | continue; | |
708 | } else { | |
709 | break; | |
710 | } | |
711 | } | |
712 | ExportEntry e; | |
713 | try { | |
714 | decode_json_obj(e, &p); | |
715 | } catch (JSONDecoder::err& err) { | |
716 | std::cerr << "rbd: error json decoding import data (entry " << n << "):" | |
717 | << err.message << std::endl; | |
718 | r = -EINVAL; | |
719 | if (m_no_error) { | |
720 | r1 = r; | |
721 | continue; | |
722 | } else { | |
723 | break; | |
724 | } | |
725 | } | |
726 | librbd::journal::EventEntry event_entry; | |
727 | r = inspect_entry(e.entry, event_entry, m_verbose); | |
728 | if (r < 0) { | |
729 | std::cerr << "rbd: corrupted entry " << n << ": tag_tid=" << e.tag_id | |
730 | << ", commit_tid=" << e.commit_tid << std::endl; | |
731 | if (m_no_error) { | |
732 | r1 = r; | |
733 | continue; | |
734 | } else { | |
735 | break; | |
736 | } | |
737 | } | |
738 | m_journaler.append(e.tag_id, e.entry); | |
739 | error_count--; | |
740 | } | |
741 | ||
742 | std::cout << n << " entries processed, " << error_count << " errors" << std::endl; | |
743 | ||
744 | std::cout << "Waiting for journal append to complete..." << std::endl; | |
745 | ||
746 | C_SaferCond cond; | |
747 | m_journaler.stop_append(&cond); | |
748 | r = cond.wait(); | |
749 | ||
750 | if (r < 0) { | |
751 | std::cerr << "failed to append journal: " << cpp_strerror(r) << std::endl; | |
752 | } | |
753 | ||
754 | if (r1 < 0 && r == 0) { | |
755 | r = r1; | |
756 | } | |
757 | return r; | |
758 | } | |
759 | ||
760 | int shut_down() { | |
761 | return m_journaler.shut_down(); | |
762 | } | |
763 | ||
764 | private: | |
765 | Journaler m_journaler; | |
766 | int m_fd; | |
767 | bool m_no_error; | |
768 | bool m_verbose; | |
769 | }; | |
770 | ||
771 | static int do_import_journal(librados::IoCtx& io_ctx, | |
772 | const std::string& journal_id, | |
773 | const std::string& path, | |
774 | bool no_error, bool verbose) { | |
775 | int r; | |
776 | ||
777 | int fd; | |
778 | bool from_stdin = path == "-"; | |
779 | if (from_stdin) { | |
780 | fd = STDIN_FILENO; | |
781 | } else { | |
782 | if ((fd = open(path.c_str(), O_RDONLY)) < 0) { | |
783 | r = -errno; | |
784 | std::cerr << "rbd: error opening " << path << std::endl; | |
785 | return r; | |
786 | } | |
787 | #ifdef HAVE_POSIX_FADVISE | |
788 | posix_fadvise(fd, 0, 0, POSIX_FADV_SEQUENTIAL); | |
789 | #endif | |
790 | } | |
791 | ||
792 | JournalImporter importer(io_ctx, journal_id, fd, no_error, verbose); | |
793 | r = importer.exec(); | |
794 | ||
795 | if (!from_stdin) { | |
796 | close(fd); | |
797 | } | |
798 | ||
799 | int shut_down_r = importer.shut_down(); | |
800 | if (r == 0 && shut_down_r < 0) { | |
801 | r = shut_down_r; | |
802 | } | |
803 | ||
804 | return r; | |
805 | } | |
806 | ||
807 | void get_info_arguments(po::options_description *positional, | |
808 | po::options_description *options) { | |
809 | at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE); | |
810 | at::add_format_options(options); | |
811 | } | |
812 | ||
813 | int execute_info(const po::variables_map &vm) { | |
814 | size_t arg_index = 0; | |
815 | std::string pool_name; | |
816 | std::string journal_name; | |
817 | int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, | |
818 | &arg_index, &pool_name, &journal_name); | |
819 | if (r < 0) { | |
820 | return r; | |
821 | } | |
822 | ||
823 | at::Format::Formatter formatter; | |
824 | r = utils::get_formatter(vm, &formatter); | |
825 | if (r < 0) { | |
826 | return r; | |
827 | } | |
828 | ||
829 | librados::Rados rados; | |
830 | librados::IoCtx io_ctx; | |
831 | r = utils::init(pool_name, &rados, &io_ctx); | |
832 | if (r < 0) { | |
833 | return r; | |
834 | } | |
835 | ||
836 | r = do_show_journal_info(rados, io_ctx, journal_name, formatter.get()); | |
837 | if (r < 0) { | |
838 | std::cerr << "rbd: journal info: " << cpp_strerror(r) << std::endl; | |
839 | return r; | |
840 | } | |
841 | return 0; | |
842 | ||
843 | } | |
844 | ||
845 | void get_status_arguments(po::options_description *positional, | |
846 | po::options_description *options) { | |
847 | at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE); | |
848 | at::add_format_options(options); | |
849 | } | |
850 | ||
851 | int execute_status(const po::variables_map &vm) { | |
852 | size_t arg_index = 0; | |
853 | std::string pool_name; | |
854 | std::string journal_name; | |
855 | int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, | |
856 | &arg_index, &pool_name, &journal_name); | |
857 | if (r < 0) { | |
858 | return r; | |
859 | } | |
860 | ||
861 | at::Format::Formatter formatter; | |
862 | r = utils::get_formatter(vm, &formatter); | |
863 | if (r < 0) { | |
864 | return r; | |
865 | } | |
866 | ||
867 | librados::Rados rados; | |
868 | librados::IoCtx io_ctx; | |
869 | r = utils::init(pool_name, &rados, &io_ctx); | |
870 | if (r < 0) { | |
871 | return r; | |
872 | } | |
873 | ||
874 | r = do_show_journal_status(io_ctx, journal_name, formatter.get()); | |
875 | if (r < 0) { | |
876 | std::cerr << "rbd: journal status: " << cpp_strerror(r) << std::endl; | |
877 | return r; | |
878 | } | |
879 | return 0; | |
880 | } | |
881 | ||
882 | void get_reset_arguments(po::options_description *positional, | |
883 | po::options_description *options) { | |
884 | at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE); | |
885 | } | |
886 | ||
887 | int execute_reset(const po::variables_map &vm) { | |
888 | size_t arg_index = 0; | |
889 | std::string pool_name; | |
890 | std::string journal_name; | |
891 | int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, | |
892 | &arg_index, &pool_name, &journal_name); | |
893 | if (r < 0) { | |
894 | return r; | |
895 | } | |
896 | ||
897 | librados::Rados rados; | |
898 | librados::IoCtx io_ctx; | |
899 | r = utils::init(pool_name, &rados, &io_ctx); | |
900 | if (r < 0) { | |
901 | return r; | |
902 | } | |
903 | ||
904 | r = do_reset_journal(io_ctx, journal_name); | |
905 | if (r < 0) { | |
906 | std::cerr << "rbd: journal reset: " << cpp_strerror(r) << std::endl; | |
907 | return r; | |
908 | } | |
909 | return 0; | |
910 | } | |
911 | ||
912 | void get_client_disconnect_arguments(po::options_description *positional, | |
913 | po::options_description *options) { | |
914 | at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE); | |
915 | options->add_options() | |
916 | ("client-id", po::value<std::string>(), | |
917 | "client ID (or leave unspecified to disconnect all)"); | |
918 | } | |
919 | ||
920 | int execute_client_disconnect(const po::variables_map &vm) { | |
921 | size_t arg_index = 0; | |
922 | std::string pool_name; | |
923 | std::string journal_name; | |
924 | int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, | |
925 | &arg_index, &pool_name, &journal_name); | |
926 | if (r < 0) { | |
927 | return r; | |
928 | } | |
929 | ||
930 | std::string client_id; | |
931 | if (vm.count("client-id")) { | |
932 | client_id = vm["client-id"].as<std::string>(); | |
933 | } | |
934 | ||
935 | librados::Rados rados; | |
936 | librados::IoCtx io_ctx; | |
937 | r = utils::init(pool_name, &rados, &io_ctx); | |
938 | if (r < 0) { | |
939 | return r; | |
940 | } | |
941 | ||
942 | r = do_disconnect_journal_client(io_ctx, journal_name, client_id); | |
943 | if (r < 0) { | |
944 | std::cerr << "rbd: journal client disconnect: " << cpp_strerror(r) | |
945 | << std::endl; | |
946 | return r; | |
947 | } | |
948 | return 0; | |
949 | } | |
950 | ||
951 | void get_inspect_arguments(po::options_description *positional, | |
952 | po::options_description *options) { | |
953 | at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_NONE); | |
954 | at::add_verbose_option(options); | |
955 | } | |
956 | ||
957 | int execute_inspect(const po::variables_map &vm) { | |
958 | size_t arg_index = 0; | |
959 | std::string pool_name; | |
960 | std::string journal_name; | |
961 | int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_NONE, | |
962 | &arg_index, &pool_name, &journal_name); | |
963 | if (r < 0) { | |
964 | return r; | |
965 | } | |
966 | ||
967 | librados::Rados rados; | |
968 | librados::IoCtx io_ctx; | |
969 | r = utils::init(pool_name, &rados, &io_ctx); | |
970 | if (r < 0) { | |
971 | return r; | |
972 | } | |
973 | ||
974 | r = do_inspect_journal(io_ctx, journal_name, vm[at::VERBOSE].as<bool>()); | |
975 | if (r < 0) { | |
976 | std::cerr << "rbd: journal inspect: " << cpp_strerror(r) << std::endl; | |
977 | return r; | |
978 | } | |
979 | return 0; | |
980 | } | |
981 | ||
982 | void get_export_arguments(po::options_description *positional, | |
983 | po::options_description *options) { | |
984 | at::add_journal_spec_options(positional, options, | |
985 | at::ARGUMENT_MODIFIER_SOURCE); | |
986 | at::add_path_options(positional, options, | |
987 | "export file (or '-' for stdout)"); | |
988 | at::add_verbose_option(options); | |
989 | at::add_no_error_option(options); | |
990 | } | |
991 | ||
992 | int execute_export(const po::variables_map &vm) { | |
993 | size_t arg_index = 0; | |
994 | std::string pool_name; | |
995 | std::string journal_name; | |
996 | int r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_SOURCE, | |
997 | &arg_index, &pool_name, &journal_name); | |
998 | if (r < 0) { | |
999 | return r; | |
1000 | } | |
1001 | ||
1002 | std::string path; | |
1003 | r = utils::get_path(vm, utils::get_positional_argument(vm, 1), &path); | |
1004 | if (r < 0) { | |
1005 | return r; | |
1006 | } | |
1007 | ||
1008 | librados::Rados rados; | |
1009 | librados::IoCtx io_ctx; | |
1010 | r = utils::init(pool_name, &rados, &io_ctx); | |
1011 | if (r < 0) { | |
1012 | return r; | |
1013 | } | |
1014 | ||
1015 | r = do_export_journal(io_ctx, journal_name, path, vm[at::NO_ERROR].as<bool>(), | |
1016 | vm[at::VERBOSE].as<bool>()); | |
1017 | if (r < 0) { | |
1018 | std::cerr << "rbd: journal export: " << cpp_strerror(r) << std::endl; | |
1019 | return r; | |
1020 | } | |
1021 | return 0; | |
1022 | } | |
1023 | ||
1024 | void get_import_arguments(po::options_description *positional, | |
1025 | po::options_description *options) { | |
1026 | at::add_path_options(positional, options, | |
1027 | "import file (or '-' for stdin)"); | |
1028 | at::add_journal_spec_options(positional, options, at::ARGUMENT_MODIFIER_DEST); | |
1029 | at::add_verbose_option(options); | |
1030 | at::add_no_error_option(options); | |
1031 | } | |
1032 | ||
1033 | int execute_import(const po::variables_map &vm) { | |
1034 | std::string path; | |
1035 | int r = utils::get_path(vm, utils::get_positional_argument(vm, 0), &path); | |
1036 | if (r < 0) { | |
1037 | return r; | |
1038 | } | |
1039 | ||
1040 | size_t arg_index = 1; | |
1041 | std::string pool_name; | |
1042 | std::string journal_name; | |
1043 | r = utils::get_pool_journal_names(vm, at::ARGUMENT_MODIFIER_DEST, | |
1044 | &arg_index, &pool_name, &journal_name); | |
1045 | if (r < 0) { | |
1046 | return r; | |
1047 | } | |
1048 | ||
1049 | librados::Rados rados; | |
1050 | librados::IoCtx io_ctx; | |
1051 | r = utils::init(pool_name, &rados, &io_ctx); | |
1052 | if (r < 0) { | |
1053 | return r; | |
1054 | } | |
1055 | ||
1056 | r = do_import_journal(io_ctx, journal_name, path, vm[at::NO_ERROR].as<bool>(), | |
1057 | vm[at::VERBOSE].as<bool>()); | |
1058 | if (r < 0) { | |
1059 | std::cerr << "rbd: journal export: " << cpp_strerror(r) << std::endl; | |
1060 | return r; | |
1061 | } | |
1062 | return 0; | |
1063 | } | |
1064 | ||
1065 | Shell::Action action_info( | |
1066 | {"journal", "info"}, {}, "Show information about image journal.", "", | |
1067 | &get_info_arguments, &execute_info); | |
1068 | ||
1069 | Shell::Action action_status( | |
1070 | {"journal", "status"}, {}, "Show status of image journal.", "", | |
1071 | &get_status_arguments, &execute_status); | |
1072 | ||
1073 | Shell::Action action_reset( | |
1074 | {"journal", "reset"}, {}, "Reset image journal.", "", | |
1075 | &get_reset_arguments, &execute_reset); | |
1076 | ||
1077 | Shell::Action action_inspect( | |
1078 | {"journal", "inspect"}, {}, "Inspect image journal for structural errors.", "", | |
1079 | &get_inspect_arguments, &execute_inspect); | |
1080 | ||
1081 | Shell::Action action_export( | |
1082 | {"journal", "export"}, {}, "Export image journal.", "", | |
1083 | &get_export_arguments, &execute_export); | |
1084 | ||
1085 | Shell::Action action_import( | |
1086 | {"journal", "import"}, {}, "Import image journal.", "", | |
1087 | &get_import_arguments, &execute_import); | |
1088 | ||
1089 | Shell::Action action_disconnect( | |
1090 | {"journal", "client", "disconnect"}, {}, | |
1091 | "Flag image journal client as disconnected.", "", | |
1092 | &get_client_disconnect_arguments, &execute_client_disconnect); | |
1093 | ||
1094 | } // namespace journal | |
1095 | } // namespace action | |
1096 | } // namespace rbd |