1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Author: Myoungwon Oh <ohmyoungwon@gmail.com>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #include "include/types.h"
15
16 #include "include/rados/buffer.h"
17 #include "include/rados/librados.hpp"
18 #include "include/rados/rados_types.hpp"
19
20 #include "acconfig.h"
21
22 #include "common/Cond.h"
23 #include "common/Formatter.h"
24 #include "common/ceph_argparse.h"
25 #include "common/ceph_crypto.h"
26 #include "common/config.h"
27 #include "common/debug.h"
28 #include "common/errno.h"
29 #include "common/obj_bencher.h"
30 #include "global/global_init.h"
31
32 #include <iostream>
33 #include <fstream>
34 #include <stdlib.h>
35 #include <time.h>
36 #include <sstream>
37 #include <errno.h>
38 #include <dirent.h>
39 #include <stdexcept>
40 #include <climits>
41 #include <locale>
42 #include <memory>
43 #include <math.h>
44
45 #include "tools/RadosDump.h"
46 #include "cls/cas/cls_cas_client.h"
47 #include "cls/cas/cls_cas_internal.h"
48 #include "include/stringify.h"
49 #include "global/signal_handler.h"
50 #include "common/CDC.h"
51 #include "common/Preforker.h"
52
53 #include <boost/program_options/variables_map.hpp>
54 #include <boost/program_options/parsers.hpp>
55
56 using namespace std;
57 namespace po = boost::program_options;
58
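// EstimateResult accumulates, for a single target chunk size, a map of
// fingerprint -> (duplicate count, chunk length); dump() derives the
// dedup_bytes_ratio and dedup_objects_ratio reported by 'op estimate'.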
59 struct EstimateResult {
60 std::unique_ptr<CDC> cdc;
61
62 uint64_t chunk_size;
63
64 ceph::mutex lock = ceph::make_mutex("EstimateResult::lock");
65
66 // < key, <count, chunk_size> >
67 map< string, pair <uint64_t, uint64_t> > chunk_statistics;
68 uint64_t total_bytes = 0;
69 std::atomic<uint64_t> total_objects = {0};
70
71 EstimateResult(std::string alg, int chunk_size)
72 : cdc(CDC::create(alg, chunk_size)),
73 chunk_size(1ull << chunk_size) {}
74
75 void add_chunk(bufferlist& chunk, const std::string& fp_algo) {
76 string fp;
77 if (fp_algo == "sha1") {
78 sha1_digest_t sha1_val = crypto::digest<crypto::SHA1>(chunk);
79 fp = sha1_val.to_str();
80 } else if (fp_algo == "sha256") {
81 sha256_digest_t sha256_val = crypto::digest<crypto::SHA256>(chunk);
82 fp = sha256_val.to_str();
83 } else if (fp_algo == "sha512") {
84 sha512_digest_t sha512_val = crypto::digest<crypto::SHA512>(chunk);
85 fp = sha512_val.to_str();
86 } else {
87 ceph_assert(0 == "unsupported fingerprint algorithm");
88 }
89
90 std::lock_guard l(lock);
91 auto p = chunk_statistics.find(fp);
92 if (p != chunk_statistics.end()) {
93 p->second.first++;
94 if (p->second.second != chunk.length()) {
95 cerr << "warning: hash collision on " << fp
96 << ": was " << p->second.second
97 << " now " << chunk.length() << std::endl;
98 }
99 } else {
100 chunk_statistics[fp] = make_pair(1, chunk.length());
101 }
102 total_bytes += chunk.length();
103 }
104
105 void dump(Formatter *f) const {
106 f->dump_unsigned("target_chunk_size", chunk_size);
107
108 uint64_t dedup_bytes = 0;
109 uint64_t dedup_objects = chunk_statistics.size();
110 for (auto& j : chunk_statistics) {
111 dedup_bytes += j.second.second;
112 }
113 //f->dump_unsigned("dedup_bytes", dedup_bytes);
114 //f->dump_unsigned("original_bytes", total_bytes);
115 f->dump_float("dedup_bytes_ratio",
116 (double)dedup_bytes / (double)total_bytes);
117 f->dump_float("dedup_objects_ratio",
118 (double)dedup_objects / (double)total_objects);
119
120 uint64_t avg = total_bytes / dedup_objects;
121 uint64_t sqsum = 0;
122 for (auto& j : chunk_statistics) {
123 sqsum += (avg - j.second.second) * (avg - j.second.second);
124 }
125 uint64_t stddev = sqrt(sqsum / dedup_objects);
126 f->dump_unsigned("chunk_size_average", avg);
127 f->dump_unsigned("chunk_size_stddev", stddev);
128 }
129 };
130
131 map<uint64_t, EstimateResult> dedup_estimates; // chunk size -> result
132
133 using namespace librados;
134 unsigned default_op_size = 1 << 26;
135 unsigned default_max_thread = 2;
136 int32_t default_report_period = 10;
137 ceph::mutex glock = ceph::make_mutex("glock");
138
139 po::options_description make_usage() {
140 po::options_description desc("Usage");
141 desc.add_options()
142 ("help,h", ": produce help message")
143 ("op estimate --pool <POOL> --chunk-size <CHUNK_SIZE> --chunk-algorithm <ALGO> --fingerprint-algorithm <FP_ALGO>",
144 ": estimate how many chunks are redundant")
145 ("op chunk-scrub --chunk-pool <POOL>",
146 ": perform chunk scrub")
147 ("op chunk-get-ref --chunk-pool <POOL> --object <OID> --target-ref <OID> --target-ref-pool-id <POOL_ID>",
148 ": get chunk object's reference")
149 ("op chunk-put-ref --chunk-pool <POOL> --object <OID> --target-ref <OID> --target-ref-pool-id <POOL_ID>",
150 ": put chunk object's reference")
151 ("op chunk-repair --chunk-pool <POOL> --object <OID> --target-ref <OID> --target-ref-pool-id <POOL_ID>",
152 ": fix mismatched references")
153 ("op dump-chunk-refs --chunk-pool <POOL> --object <OID>",
154 ": dump chunk object's references")
155 ("op chunk-dedup --pool <POOL> --object <OID> --chunk-pool <POOL> --fingerprint-algorithm <FP> --source-off <OFFSET> --source-length <LENGTH>",
156 ": perform a chunk dedup---deduplicate only a chunk, which is a part of object.")
157 ("op object-dedup --pool <POOL> --object <OID> --chunk-pool <POOL> --fingerprint-algorithm <FP> --dedup-cdc-chunk-size <CHUNK_SIZE> [--snap]",
158 ": perform a object dedup---deduplicate the entire object, not a chunk. Related snapshots are also deduplicated if --snap is given")
159 ("op sample-dedup --pool <POOL> --chunk-pool <POOL> --chunk-algorithm <ALGO> --fingerprint-algorithm <FP> --daemon --loop",
160 ": perform a sample dedup---make crawling threads which crawl objects in base pool and deduplicate them based on their deduplication efficiency")
161 ;
162 po::options_description op_desc("Optional arguments");
163 op_desc.add_options()
164 ("op", po::value<std::string>(), ": estimate|chunk-scrub|chunk-get-ref|chunk-put-ref|chunk-repair|dump-chunk-refs|chunk-dedup|object-dedup")
165 ("target-ref", po::value<std::string>(), ": set target object")
166 ("target-ref-pool-id", po::value<uint64_t>(), ": set target pool id")
167 ("object", po::value<std::string>(), ": set object name")
168 ("chunk-size", po::value<int>(), ": chunk size (byte)")
169 ("chunk-algorithm", po::value<std::string>(), ": <fixed|fastcdc>, set chunk-algorithm")
170 ("fingerprint-algorithm", po::value<std::string>(), ": <sha1|sha256|sha512>, set fingerprint-algorithm")
171 ("chunk-pool", po::value<std::string>(), ": set chunk pool name")
172 ("max-thread", po::value<int>(), ": set max thread")
173 ("report-period", po::value<int>(), ": set report-period")
174 ("max-seconds", po::value<int>(), ": set max runtime")
175 ("max-read-size", po::value<int>(), ": set max read size")
176 ("pool", po::value<std::string>(), ": set pool name")
177 ("min-chunk-size", po::value<int>(), ": min chunk size (byte)")
178 ("max-chunk-size", po::value<int>(), ": max chunk size (byte)")
179 ("source-off", po::value<uint64_t>(), ": set source offset")
180 ("source-length", po::value<uint64_t>(), ": set source length")
181 ("dedup-cdc-chunk-size", po::value<unsigned int>(), ": set dedup chunk size for cdc")
182 ("snap", ": deduplciate snapshotted object")
183 ("debug", ": enable debug")
184 ("pgid", ": set pgid")
185 ("chunk-dedup-threshold", po::value<uint32_t>(), ": set the threshold for chunk dedup (number of duplication) ")
186 ("sampling-ratio", po::value<int>(), ": set the sampling ratio (percentile)")
187 ("daemon", ": execute sample dedup in daemon mode")
188 ("loop", ": execute sample dedup in a loop until terminated. Sleeps 'wakeup-period' seconds between iterations")
189 ("wakeup-period", po::value<int>(), ": set the wakeup period of crawler thread (sec)")
190 ;
191 desc.add(op_desc);
192 return desc;
193 }
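// Example invocations (binary name and pool names are illustrative assumptions;
// the flags themselves come from make_usage() above):
//   ceph-dedup-tool --op estimate --pool base --chunk-size 16384 \
//     --chunk-algorithm fastcdc --fingerprint-algorithm sha1
//   ceph-dedup-tool --op sample-dedup --pool base --chunk-pool cas \
//     --chunk-algorithm fastcdc --fingerprint-algorithm sha1 --daemon --loop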
194
195 template <typename I, typename T>
196 static int rados_sistrtoll(I &i, T *val) {
197 std::string err;
198 *val = strict_iecstrtoll(i->second, &err);
199 if (err != "") {
200 cerr << "Invalid value for " << i->first << ": " << err << std::endl;
201 return -EINVAL;
202 } else {
203 return 0;
204 }
205 }
206
207 class EstimateDedupRatio;
208 class ChunkScrub;
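// CrawlerThread is the common base of the estimate and scrub workers: thread n
// of m walks its object_list_slice() shard of the pool and keeps per-thread
// counters that the print_* helpers below aggregate.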
209 class CrawlerThread : public Thread
210 {
211 IoCtx io_ctx;
212 int n;
213 int m;
214 ObjectCursor begin;
215 ObjectCursor end;
216 ceph::mutex m_lock = ceph::make_mutex("CrawlerThread::Locker");
217 ceph::condition_variable m_cond;
218 int32_t report_period;
219 bool m_stop = false;
220 uint64_t total_bytes = 0;
221 uint64_t total_objects = 0;
222 uint64_t examined_objects = 0;
223 uint64_t examined_bytes = 0;
224 uint64_t max_read_size = 0;
225 bool debug = false;
226 #define COND_WAIT_INTERVAL 10
227
228 public:
229 CrawlerThread(IoCtx& io_ctx, int n, int m,
230 ObjectCursor begin, ObjectCursor end, int32_t report_period,
231 uint64_t num_objects, uint64_t max_read_size = default_op_size):
232 io_ctx(io_ctx), n(n), m(m), begin(begin), end(end),
233 report_period(report_period), total_objects(num_objects), max_read_size(max_read_size)
234 {}
235
236 void signal(int signum) {
237 std::lock_guard l{m_lock};
238 m_stop = true;
239 m_cond.notify_all();
240 }
241 virtual void print_status(Formatter *f, ostream &out) {}
242 uint64_t get_examined_objects() { return examined_objects; }
243 uint64_t get_examined_bytes() { return examined_bytes; }
244 uint64_t get_total_bytes() { return total_bytes; }
245 uint64_t get_total_objects() { return total_objects; }
246 void set_debug(const bool debug_) { debug = debug_; }
247 friend class EstimateDedupRatio;
248 friend class ChunkScrub;
249 };
250
251 class EstimateDedupRatio : public CrawlerThread
252 {
253 string chunk_algo;
254 string fp_algo;
255 uint64_t chunk_size;
256 uint64_t max_seconds;
257
258 public:
259 EstimateDedupRatio(
260 IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
261 string chunk_algo, string fp_algo, uint64_t chunk_size, int32_t report_period,
262 uint64_t num_objects, uint64_t max_read_size,
263 uint64_t max_seconds):
264 CrawlerThread(io_ctx, n, m, begin, end, report_period, num_objects,
265 max_read_size),
266 chunk_algo(chunk_algo),
267 fp_algo(fp_algo),
268 chunk_size(chunk_size),
269 max_seconds(max_seconds) {
270 }
271
272 void* entry() {
273 estimate_dedup_ratio();
274 return NULL;
275 }
276 void estimate_dedup_ratio();
277 };
278
279 class ChunkScrub: public CrawlerThread
280 {
281 IoCtx chunk_io_ctx;
282 int damaged_objects = 0;
283
284 public:
285 ChunkScrub(IoCtx& io_ctx, int n, int m, ObjectCursor begin, ObjectCursor end,
286 IoCtx& chunk_io_ctx, int32_t report_period, uint64_t num_objects):
287 CrawlerThread(io_ctx, n, m, begin, end, report_period, num_objects), chunk_io_ctx(chunk_io_ctx)
288 { }
289 void* entry() {
290 chunk_scrub_common();
291 return NULL;
292 }
293 void chunk_scrub_common();
294 int get_damaged_objects() { return damaged_objects; }
295 void print_status(Formatter *f, ostream &out);
296 };
297
298 vector<std::unique_ptr<CrawlerThread>> estimate_threads;
299
300 static void print_dedup_estimate(std::ostream& out, std::string chunk_algo)
301 {
302 /*
303 uint64_t total_bytes = 0;
304 uint64_t total_objects = 0;
305 */
306 uint64_t examined_objects = 0;
307 uint64_t examined_bytes = 0;
308
309 for (auto &et : estimate_threads) {
310 examined_objects += et->get_examined_objects();
311 examined_bytes += et->get_examined_bytes();
312 }
313
314 auto f = Formatter::create("json-pretty");
315 f->open_object_section("results");
316 f->dump_string("chunk_algo", chunk_algo);
317 f->open_array_section("chunk_sizes");
318 for (auto& i : dedup_estimates) {
319 f->dump_object("chunker", i.second);
320 }
321 f->close_section();
322
323 f->open_object_section("summary");
324 f->dump_unsigned("examined_objects", examined_objects);
325 f->dump_unsigned("examined_bytes", examined_bytes);
326 /*
327 f->dump_unsigned("total_objects", total_objects);
328 f->dump_unsigned("total_bytes", total_bytes);
329 f->dump_float("examined_ratio", (float)examined_bytes / (float)total_bytes);
330 */
331 f->close_section();
332 f->close_section();
333 f->flush(out);
334 }
335
336 static void handle_signal(int signum)
337 {
338 std::lock_guard l{glock};
339 for (auto &p : estimate_threads) {
340 p->signal(signum);
341 }
342 }
343
344 void EstimateDedupRatio::estimate_dedup_ratio()
345 {
346 ObjectCursor shard_start;
347 ObjectCursor shard_end;
348
349 io_ctx.object_list_slice(
350 begin,
351 end,
352 n,
353 m,
354 &shard_start,
355 &shard_end);
356
357 utime_t start = ceph_clock_now();
358 utime_t end;
359 if (max_seconds) {
360 end = start;
361 end += max_seconds;
362 }
363
364 utime_t next_report;
365 if (report_period) {
366 next_report = start;
367 next_report += report_period;
368 }
369
370 ObjectCursor c(shard_start);
371 while (c < shard_end)
372 {
373 std::vector<ObjectItem> result;
374 int r = io_ctx.object_list(c, shard_end, 12, {}, &result, &c);
375 if (r < 0 ){
376 cerr << "error object_list : " << cpp_strerror(r) << std::endl;
377 return;
378 }
379
380 unsigned op_size = max_read_size;
381
382 for (const auto & i : result) {
383 const auto &oid = i.oid;
384
385 utime_t now = ceph_clock_now();
386 if (max_seconds && now > end) {
387 m_stop = true;
388 }
389 if (m_stop) {
390 return;
391 }
392
393 if (n == 0 && // first thread only
394 next_report != utime_t() && now > next_report) {
395 cerr << (int)(now - start) << "s : read "
396 << dedup_estimates.begin()->second.total_bytes << " bytes so far..."
397 << std::endl;
398 print_dedup_estimate(cerr, chunk_algo);
399 next_report = now;
400 next_report += report_period;
401 }
402
403 // read entire object
404 bufferlist bl;
405 uint64_t offset = 0;
406 while (true) {
407 bufferlist t;
408 int ret = io_ctx.read(oid, t, op_size, offset);
409 if (ret <= 0) {
410 break;
411 }
412 offset += ret;
413 bl.claim_append(t);
414 }
415 examined_objects++;
416 examined_bytes += bl.length();
417
418 // do the chunking
419 for (auto& i : dedup_estimates) {
420 vector<pair<uint64_t, uint64_t>> chunks;
421 i.second.cdc->calc_chunks(bl, &chunks);
422 for (auto& p : chunks) {
423 bufferlist chunk;
424 chunk.substr_of(bl, p.first, p.second);
425 i.second.add_chunk(chunk, fp_algo);
426 if (debug) {
427 cout << " " << oid << " " << p.first << "~" << p.second << std::endl;
428 }
429 }
430 ++i.second.total_objects;
431 }
432 }
433 }
434 }
435
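// Walk this thread's shard of the chunk pool, decode each chunk object's
// CHUNK_REFCOUNT_ATTR and, for TYPE_BY_OBJECT refs, verify that every
// referencing pool/object still exists and still references the chunk;
// anything inconsistent is counted as a damaged object.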
436 void ChunkScrub::chunk_scrub_common()
437 {
438 ObjectCursor shard_start;
439 ObjectCursor shard_end;
440 int ret;
441 Rados rados;
442
443 ret = rados.init_with_context(g_ceph_context);
444 if (ret < 0) {
445 cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
446 return;
447 }
448 ret = rados.connect();
449 if (ret) {
450 cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
451 return;
452 }
453
454 chunk_io_ctx.object_list_slice(
455 begin,
456 end,
457 n,
458 m,
459 &shard_start,
460 &shard_end);
461
462 ObjectCursor c(shard_start);
463 while(c < shard_end)
464 {
465 std::vector<ObjectItem> result;
466 int r = chunk_io_ctx.object_list(c, shard_end, 12, {}, &result, &c);
467 if (r < 0 ){
468 cerr << "error object_list : " << cpp_strerror(r) << std::endl;
469 return;
470 }
471
472 for (const auto & i : result) {
473 std::unique_lock l{m_lock};
474 if (m_stop) {
475 Formatter *formatter = Formatter::create("json-pretty");
476 print_status(formatter, cout);
477 delete formatter;
478 return;
479 }
480 auto oid = i.oid;
481 cout << oid << std::endl;
482 chunk_refs_t refs;
483 {
484 bufferlist t;
485 ret = chunk_io_ctx.getxattr(oid, CHUNK_REFCOUNT_ATTR, t);
486 if (ret < 0) {
487 continue;
488 }
489 auto p = t.cbegin();
490 decode(refs, p);
491 }
492
493 examined_objects++;
494 if (refs.get_type() != chunk_refs_t::TYPE_BY_OBJECT) {
495 // we can't do anything here
496 continue;
497 }
498
499 // check all objects
500 chunk_refs_by_object_t *byo =
501 static_cast<chunk_refs_by_object_t*>(refs.r.get());
502 set<hobject_t> real_refs;
503
504 uint64_t pool_missing = 0;
505 uint64_t object_missing = 0;
506 uint64_t does_not_ref = 0;
507 for (auto& pp : byo->by_object) {
508 IoCtx target_io_ctx;
509 ret = rados.ioctx_create2(pp.pool, target_io_ctx);
510 if (ret < 0) {
511 cerr << oid << " ref " << pp
512 << ": referencing pool does not exist" << std::endl;
513 ++pool_missing;
514 continue;
515 }
516
517 ret = cls_cas_references_chunk(target_io_ctx, pp.oid.name, oid);
518 if (ret == -ENOENT) {
519 cerr << oid << " ref " << pp
520 << ": referencing object missing" << std::endl;
521 ++object_missing;
522 } else if (ret == -ENOLINK) {
523 cerr << oid << " ref " << pp
524 << ": referencing object does not reference chunk"
525 << std::endl;
526 ++does_not_ref;
527 }
528 }
529 if (pool_missing || object_missing || does_not_ref) {
530 ++damaged_objects;
531 }
532 }
533 }
534 cout << "--done--" << std::endl;
535 }
536
537 using AioCompRef = unique_ptr<AioCompletion>;
538
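// SampleDedupWorkerThread crawls a shard of the base pool, samples objects
// according to sampling_ratio, CDC-chunks them, and chunk-dedups every chunk
// whose fingerprint has been seen at least dedup_threshold times (tracked in
// the shared FpStore); deduped objects are tier-evicted at the end of the
// iteration.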
539 class SampleDedupWorkerThread : public Thread
540 {
541 public:
542 struct chunk_t {
543 string oid = "";
544 size_t start = 0;
545 size_t size = 0;
546 string fingerprint = "";
547 bufferlist data;
548 };
549
550 class FpStore {
551 public:
552 using dup_count_t = ssize_t;
553
554 bool find(string& fp) {
555 std::shared_lock lock(fingerprint_lock);
556 auto found_item = fp_map.find(fp);
557 return found_item != fp_map.end();
558 }
559
560 // return true if the chunk is a duplicate
561 bool add(chunk_t& chunk) {
562 std::unique_lock lock(fingerprint_lock);
563 auto found_iter = fp_map.find(chunk.fingerprint);
564 ssize_t cur_reference = 1;
565 if (found_iter == fp_map.end()) {
566 fp_map.insert({chunk.fingerprint, 1});
567 } else {
568 cur_reference = ++found_iter->second;
569 }
570 return cur_reference >= dedup_threshold && dedup_threshold != -1;
571 }
572
573 void init(size_t dedup_threshold_) {
574 std::unique_lock lock(fingerprint_lock);
575 fp_map.clear();
576 dedup_threshold = dedup_threshold_;
577 }
578 FpStore(size_t chunk_threshold) : dedup_threshold(chunk_threshold) { }
579
580 private:
581 ssize_t dedup_threshold = -1;
582 std::unordered_map<std::string, dup_count_t> fp_map;
583 std::shared_mutex fingerprint_lock;
584 };
585
586 struct SampleDedupGlobal {
587 FpStore fp_store;
588 const double sampling_ratio = -1;
589 SampleDedupGlobal(
590 int chunk_threshold,
591 int sampling_ratio) :
592 fp_store(chunk_threshold),
593 sampling_ratio(static_cast<double>(sampling_ratio) / 100) { }
594 };
595
596 SampleDedupWorkerThread(
597 IoCtx &io_ctx,
598 IoCtx &chunk_io_ctx,
599 ObjectCursor begin,
600 ObjectCursor end,
601 size_t chunk_size,
602 std::string &fp_algo,
603 std::string &chunk_algo,
604 SampleDedupGlobal &sample_dedup_global) :
605 io_ctx(io_ctx),
606 chunk_io_ctx(chunk_io_ctx),
607 chunk_size(chunk_size),
608 fp_type(pg_pool_t::get_fingerprint_from_str(fp_algo)),
609 chunk_algo(chunk_algo),
610 sample_dedup_global(sample_dedup_global),
611 begin(begin),
612 end(end) { }
613
614 ~SampleDedupWorkerThread() { };
615
616 protected:
617 void* entry() override {
618 crawl();
619 return nullptr;
620 }
621
622 private:
623 void crawl();
624 std::tuple<std::vector<ObjectItem>, ObjectCursor> get_objects(
625 ObjectCursor current,
626 ObjectCursor end,
627 size_t max_object_count);
628 std::vector<size_t> sample_object(size_t count);
629 void try_dedup_and_accumulate_result(ObjectItem &object);
630 bool ok_to_dedup_all();
631 int do_chunk_dedup(chunk_t &chunk);
632 bufferlist read_object(ObjectItem &object);
633 std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> do_cdc(
634 ObjectItem &object,
635 bufferlist &data);
636 std::string generate_fingerprint(bufferlist chunk_data);
637 AioCompRef do_async_evict(string oid);
638
639 IoCtx io_ctx;
640 IoCtx chunk_io_ctx;
641 size_t total_duplicated_size = 0;
642 size_t total_object_size = 0;
643
644 std::set<std::string> oid_for_evict;
645 const size_t chunk_size = 0;
646 pg_pool_t::fingerprint_t fp_type = pg_pool_t::TYPE_FINGERPRINT_NONE;
647 std::string chunk_algo;
648 SampleDedupGlobal &sample_dedup_global;
649 ObjectCursor begin;
650 ObjectCursor end;
651 };
652
653 void SampleDedupWorkerThread::crawl()
654 {
655 cout << "new iteration" << std::endl;
656
657 ObjectCursor current_object = begin;
658 while (current_object < end) {
659 std::vector<ObjectItem> objects;
660 // Get the list of object IDs to deduplicate
661 std::tie(objects, current_object) = get_objects(current_object, end, 100);
662
663 // Pick a few objects to process. The sampling ratio decides how many
664 // objects to pick: a lower sampling ratio reduces crawling overhead but
665 // finds less duplication.
666 auto sampled_indexes = sample_object(objects.size());
667 for (size_t index : sampled_indexes) {
668 ObjectItem target = objects[index];
669 try_dedup_and_accumulate_result(target);
670 }
671 }
672
673 vector<AioCompRef> evict_completions(oid_for_evict.size());
674 int i = 0;
675 for (auto &oid : oid_for_evict) {
676 evict_completions[i] = do_async_evict(oid);
677 i++;
678 }
679 for (auto &completion : evict_completions) {
680 completion->wait_for_complete();
681 }
682 cout << "done iteration" << std::endl;
683 }
684
685 AioCompRef SampleDedupWorkerThread::do_async_evict(string oid)
686 {
687 Rados rados;
688 ObjectReadOperation op_tier;
689 AioCompRef completion(rados.aio_create_completion());
690 op_tier.tier_evict();
691 io_ctx.aio_operate(
692 oid,
693 completion.get(),
694 &op_tier,
695 NULL);
696 return completion;
697 }
698
699 std::tuple<std::vector<ObjectItem>, ObjectCursor> SampleDedupWorkerThread::get_objects(
700 ObjectCursor current, ObjectCursor end, size_t max_object_count)
701 {
702 std::vector<ObjectItem> objects;
703 ObjectCursor next;
704 int ret = io_ctx.object_list(
705 current,
706 end,
707 max_object_count,
708 {},
709 &objects,
710 &next);
711 if (ret < 0 ) {
712 cerr << "error object_list" << std::endl;
713 objects.clear();
714 }
715
716 return std::make_tuple(objects, next);
717 }
718
719 std::vector<size_t> SampleDedupWorkerThread::sample_object(size_t count)
720 {
721 std::vector<size_t> indexes(count);
722 for (size_t i = 0 ; i < count ; i++) {
723 indexes[i] = i;
724 }
725 default_random_engine generator;
726 shuffle(indexes.begin(), indexes.end(), generator);
727 size_t sampling_count = static_cast<double>(count) *
728 sample_dedup_global.sampling_ratio;
729 indexes.resize(sampling_count);
730
731 return indexes;
732 }
733
734 void SampleDedupWorkerThread::try_dedup_and_accumulate_result(ObjectItem &object)
735 {
736 bufferlist data = read_object(object);
737 if (data.length() == 0) {
738 cerr << __func__ << " skip object " << object.oid
739 << " read returned size 0" << std::endl;
740 return;
741 }
742 auto chunks = do_cdc(object, data);
743 size_t chunk_total_amount = 0;
744
745 // First, check total size of created chunks
746 for (auto &chunk : chunks) {
747 auto &chunk_data = std::get<0>(chunk);
748 chunk_total_amount += chunk_data.length();
749 }
750 if (chunk_total_amount != data.length()) {
751 cerr << __func__ << " sum of chunked length(" << chunk_total_amount
752 << ") is different from object data length(" << data.length() << ")"
753 << std::endl;
754 return;
755 }
756
757 size_t duplicated_size = 0;
758 list<chunk_t> redundant_chunks;
759 for (auto &chunk : chunks) {
760 auto &chunk_data = std::get<0>(chunk);
761 std::string fingerprint = generate_fingerprint(chunk_data);
762 std::pair<uint64_t, uint64_t> chunk_boundary = std::get<1>(chunk);
763 chunk_t chunk_info = {
764 .oid = object.oid,
765 .start = chunk_boundary.first,
766 .size = chunk_boundary.second,
767 .fingerprint = fingerprint,
768 .data = chunk_data
769 };
770
771 if (sample_dedup_global.fp_store.find(fingerprint)) {
772 duplicated_size += chunk_data.length();
773 }
774 if (sample_dedup_global.fp_store.add(chunk_info)) {
775 redundant_chunks.push_back(chunk_info);
776 }
777 }
778
779 size_t object_size = data.length();
780
781 // perform chunk-dedup
782 for (auto &p : redundant_chunks) {
783 do_chunk_dedup(p);
784 }
785 total_duplicated_size += duplicated_size;
786 total_object_size += object_size;
787 }
788
789 bufferlist SampleDedupWorkerThread::read_object(ObjectItem &object)
790 {
791 bufferlist whole_data;
792 size_t offset = 0;
793 int ret = -1;
794 while (ret != 0) {
795 bufferlist partial_data;
796 ret = io_ctx.read(object.oid, partial_data, default_op_size, offset);
797 if (ret < 0) {
798 cerr << "read object error " << object.oid << " offset " << offset
799 << " size " << default_op_size << " error(" << cpp_strerror(ret)
800 << std::endl;
801 bufferlist empty_buf;
802 return empty_buf;
803 }
804 offset += ret;
805 whole_data.claim_append(partial_data);
806 }
807 return whole_data;
808 }
809
810 std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> SampleDedupWorkerThread::do_cdc(
811 ObjectItem &object,
812 bufferlist &data)
813 {
814 std::vector<std::tuple<bufferlist, pair<uint64_t, uint64_t>>> ret;
815
816 unique_ptr<CDC> cdc = CDC::create(chunk_algo, cbits(chunk_size) - 1);
817 vector<pair<uint64_t, uint64_t>> chunks;
818 cdc->calc_chunks(data, &chunks);
819 for (auto &p : chunks) {
820 bufferlist chunk;
821 chunk.substr_of(data, p.first, p.second);
822 ret.push_back(make_tuple(chunk, p));
823 }
824
825 return ret;
826 }
827
828 std::string SampleDedupWorkerThread::generate_fingerprint(bufferlist chunk_data)
829 {
830 string ret;
831
832 switch (fp_type) {
833 case pg_pool_t::TYPE_FINGERPRINT_SHA1:
834 ret = crypto::digest<crypto::SHA1>(chunk_data).to_str();
835 break;
836
837 case pg_pool_t::TYPE_FINGERPRINT_SHA256:
838 ret = crypto::digest<crypto::SHA256>(chunk_data).to_str();
839 break;
840
841 case pg_pool_t::TYPE_FINGERPRINT_SHA512:
842 ret = crypto::digest<crypto::SHA512>(chunk_data).to_str();
843 break;
844 default:
845 ceph_assert(0 == "Invalid fp type");
846 break;
847 }
848 return ret;
849 }
850
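// Write the chunk into the chunk pool under its fingerprint if it does not
// exist yet, then turn the source range into a reference to it via set_chunk
// with CEPH_OSD_OP_FLAG_WITH_REFERENCE and remember the object for eviction.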
851 int SampleDedupWorkerThread::do_chunk_dedup(chunk_t &chunk)
852 {
853 uint64_t size;
854 time_t mtime;
855
856 int ret = chunk_io_ctx.stat(chunk.fingerprint, &size, &mtime);
857
858 if (ret == -ENOENT) {
859 bufferlist bl;
860 bl.append(chunk.data);
861 ObjectWriteOperation wop;
862 wop.write_full(bl);
863 chunk_io_ctx.operate(chunk.fingerprint, &wop);
864 } else {
865 ceph_assert(ret == 0);
866 }
867
868 ObjectReadOperation op;
869 op.set_chunk(
870 chunk.start,
871 chunk.size,
872 chunk_io_ctx,
873 chunk.fingerprint,
874 0,
875 CEPH_OSD_OP_FLAG_WITH_REFERENCE);
876 ret = io_ctx.operate(chunk.oid, &op, nullptr);
877 oid_for_evict.insert(chunk.oid);
878 return ret;
879 }
880
881 void ChunkScrub::print_status(Formatter *f, ostream &out)
882 {
883 if (f) {
884 f->open_array_section("chunk_scrub");
885 f->dump_string("PID", stringify(get_pid()));
886 f->open_object_section("Status");
887 f->dump_string("Total object", stringify(total_objects));
888 f->dump_string("Examined objects", stringify(examined_objects));
889 f->dump_string("damaged objects", stringify(damaged_objects));
890 f->close_section();
891 f->flush(out);
892 cout << std::endl;
893 }
894 }
895
896 string get_opts_pool_name(const po::variables_map &opts) {
897 if (opts.count("pool")) {
898 return opts["pool"].as<string>();
899 }
900 cerr << "must specify pool name" << std::endl;
901 exit(1);
902 }
903
904 string get_opts_chunk_algo(const po::variables_map &opts) {
905 if (opts.count("chunk-algorithm")) {
906 string chunk_algo = opts["chunk-algorithm"].as<string>();
907 if (!CDC::create(chunk_algo, 12)) {
908 cerr << "unrecognized chunk-algorithm " << chunk_algo << std::endl;
909 exit(1);
910 }
911 return chunk_algo;
912 }
913 cerr << "must specify chunk-algorithm" << std::endl;
914 exit(1);
915 }
916
917 string get_opts_fp_algo(const po::variables_map &opts) {
918 if (opts.count("fingerprint-algorithm")) {
919 string fp_algo = opts["fingerprint-algorithm"].as<string>();
920 if (fp_algo != "sha1"
921 && fp_algo != "sha256" && fp_algo != "sha512") {
922 cerr << "unrecognized fingerprint-algorithm " << fp_algo << std::endl;
923 exit(1);
924 }
925 return fp_algo;
926 }
927 cout << "SHA1 is set as fingerprint algorithm by default" << std::endl;
928 return string("sha1");
929 }
930
931 string get_opts_op_name(const po::variables_map &opts) {
932 if (opts.count("op")) {
933 return opts["op"].as<string>();
934 } else {
935 cerr << "must specify op" << std::endl;
936 exit(1);
937 }
938 }
939
940 string get_opts_chunk_pool(const po::variables_map &opts) {
941 if (opts.count("chunk-pool")) {
942 return opts["chunk-pool"].as<string>();
943 } else {
944 cerr << "must specify --chunk-pool" << std::endl;
945 exit(1);
946 }
947 }
948
949 string get_opts_object_name(const po::variables_map &opts) {
950 if (opts.count("object")) {
951 return opts["object"].as<string>();
952 } else {
953 cerr << "must specify object" << std::endl;
954 exit(1);
955 }
956 }
957
958 int get_opts_max_thread(const po::variables_map &opts) {
959 if (opts.count("max-thread")) {
960 return opts["max-thread"].as<int>();
961 } else {
962 cout << "2 is set as the number of threads by default" << std::endl;
963 return 2;
964 }
965 }
966
967 int get_opts_report_period(const po::variables_map &opts) {
968 if (opts.count("report-period")) {
969 return opts["report-period"].as<int>();
970 } else {
971 cout << "10 seconds is set as report period by default" << std::endl;
972 return 10;
973 }
974 }
975
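// 'estimate': read the objects in each shard, chunk them with the chosen CDC
// algorithm at the configured chunk size (or a power-of-two sweep from
// min-chunk-size to max-chunk-size when chunk_size is zero) and print the
// projected dedup ratios.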
976 int estimate_dedup_ratio(const po::variables_map &opts)
977 {
978 Rados rados;
979 IoCtx io_ctx;
980 std::string chunk_algo = "fastcdc";
981 string fp_algo = "sha1";
982 string pool_name;
983 uint64_t chunk_size = 8192;
984 uint64_t min_chunk_size = 8192;
985 uint64_t max_chunk_size = 4*1024*1024;
986 unsigned max_thread = default_max_thread;
987 uint32_t report_period = default_report_period;
988 uint64_t max_read_size = default_op_size;
989 uint64_t max_seconds = 0;
990 int ret;
991 std::map<std::string, std::string>::const_iterator i;
992 bool debug = false;
993 ObjectCursor begin;
994 ObjectCursor end;
995 librados::pool_stat_t s;
996 list<string> pool_names;
997 map<string, librados::pool_stat_t> stats;
998
999 pool_name = get_opts_pool_name(opts);
1000 if (opts.count("chunk-algorithm")) {
1001 chunk_algo = opts["chunk-algorithm"].as<string>();
1002 if (!CDC::create(chunk_algo, 12)) {
1003 cerr << "unrecognized chunk-algorithm " << chunk_algo << std::endl;
1004 exit(1);
1005 }
1006 } else {
1007 cerr << "must specify chunk-algorithm" << std::endl;
1008 exit(1);
1009 }
1010 fp_algo = get_opts_fp_algo(opts);
1011 if (opts.count("chunk-size")) {
1012 chunk_size = opts["chunk-size"].as<int>();
1013 } else {
1014 cout << "8192 is set as chunk size by default" << std::endl;
1015 }
1016 if (opts.count("min-chunk-size")) {
1017 chunk_size = opts["min-chunk-size"].as<int>();
1018 } else {
1019 cout << "8192 is set as min chunk size by default" << std::endl;
1020 }
1021 if (opts.count("max-chunk-size")) {
1022 chunk_size = opts["max-chunk-size"].as<int>();
1023 } else {
1024 cout << "4MB is set as max chunk size by default" << std::endl;
1025 }
1026 max_thread = get_opts_max_thread(opts);
1027 report_period = get_opts_report_period(opts);
1028 if (opts.count("max-seconds")) {
1029 max_seconds = opts["max-seconds"].as<int>();
1030 } else {
1031 cout << "max seconds is not set" << std::endl;
1032 }
1033 if (opts.count("max-read-size")) {
1034 max_read_size = opts["max-read-size"].as<int>();
1035 } else {
1036 cout << default_op_size << " is set as max-read-size by default" << std::endl;
1037 }
1038 if (opts.count("debug")) {
1039 debug = true;
1040 }
1041 boost::optional<pg_t> pgid(opts.count("pgid"), pg_t());
1042
1043 ret = rados.init_with_context(g_ceph_context);
1044 if (ret < 0) {
1045 cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
1046 goto out;
1047 }
1048 ret = rados.connect();
1049 if (ret) {
1050 cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
1051 ret = -1;
1052 goto out;
1053 }
1054 if (pool_name.empty()) {
1055 cerr << "--create-pool requested but pool_name was not specified!" << std::endl;
1056 exit(1);
1057 }
1058 ret = rados.ioctx_create(pool_name.c_str(), io_ctx);
1059 if (ret < 0) {
1060 cerr << "error opening pool "
1061 << pool_name << ": "
1062 << cpp_strerror(ret) << std::endl;
1063 goto out;
1064 }
1065
1066 // set up chunkers
1067 if (chunk_size) {
1068 dedup_estimates.emplace(std::piecewise_construct,
1069 std::forward_as_tuple(chunk_size),
1070 std::forward_as_tuple(chunk_algo, cbits(chunk_size)-1));
1071 } else {
1072 for (size_t cs = min_chunk_size; cs <= max_chunk_size; cs *= 2) {
1073 dedup_estimates.emplace(std::piecewise_construct,
1074 std::forward_as_tuple(cs),
1075 std::forward_as_tuple(chunk_algo, cbits(cs)-1));
1076 }
1077 }
1078
1079 glock.lock();
1080 begin = io_ctx.object_list_begin();
1081 end = io_ctx.object_list_end();
1082 pool_names.push_back(pool_name);
1083 ret = rados.get_pool_stats(pool_names, stats);
1084 if (ret < 0) {
1085 cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl;
1086 glock.unlock();
1087 return ret;
1088 }
1089 if (stats.find(pool_name) == stats.end()) {
1090 cerr << "stats can not find pool name: " << pool_name << std::endl;
1091 glock.unlock();
1092 return ret;
1093 }
1094 s = stats[pool_name];
1095
1096 for (unsigned i = 0; i < max_thread; i++) {
1097 std::unique_ptr<CrawlerThread> ptr (
1098 new EstimateDedupRatio(io_ctx, i, max_thread, begin, end,
1099 chunk_algo, fp_algo, chunk_size,
1100 report_period, s.num_objects, max_read_size,
1101 max_seconds));
1102 ptr->create("estimate_thread");
1103 ptr->set_debug(debug);
1104 estimate_threads.push_back(move(ptr));
1105 }
1106 glock.unlock();
1107
1108 for (auto &p : estimate_threads) {
1109 p->join();
1110 }
1111
1112 print_dedup_estimate(cout, chunk_algo);
1113
1114 out:
1115 return (ret < 0) ? 1 : 0;
1116 }
1117
1118 static void print_chunk_scrub()
1119 {
1120 uint64_t total_objects = 0;
1121 uint64_t examined_objects = 0;
1122 int damaged_objects = 0;
1123
1124 for (auto &et : estimate_threads) {
1125 if (!total_objects) {
1126 total_objects = et->get_total_objects();
1127 }
1128 examined_objects += et->get_examined_objects();
1129 ChunkScrub *ptr = static_cast<ChunkScrub*>(et.get());
1130 damaged_objects += ptr->get_damaged_objects();
1131 }
1132
1133 cout << " Total object : " << total_objects << std::endl;
1134 cout << " Examined object : " << examined_objects << std::endl;
1135 cout << " Damaged object : " << damaged_objects << std::endl;
1136 }
1137
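// Dispatcher for the chunk-pool ops: chunk-get-ref / chunk-put-ref /
// chunk-repair modify a single chunk object's reference list,
// dump-chunk-refs prints it, and chunk-scrub runs ChunkScrub threads over
// the whole chunk pool.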
1138 int chunk_scrub_common(const po::variables_map &opts)
1139 {
1140 Rados rados;
1141 IoCtx io_ctx, chunk_io_ctx;
1142 std::string object_name, target_object_name;
1143 string chunk_pool_name, op_name;
1144 int ret;
1145 unsigned max_thread = default_max_thread;
1146 std::map<std::string, std::string>::const_iterator i;
1147 uint32_t report_period = default_report_period;
1148 ObjectCursor begin;
1149 ObjectCursor end;
1150 librados::pool_stat_t s;
1151 list<string> pool_names;
1152 map<string, librados::pool_stat_t> stats;
1153
1154 op_name = get_opts_op_name(opts);
1155 chunk_pool_name = get_opts_chunk_pool(opts);
1156 boost::optional<pg_t> pgid(opts.count("pgid"), pg_t());
1157
1158 ret = rados.init_with_context(g_ceph_context);
1159 if (ret < 0) {
1160 cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
1161 goto out;
1162 }
1163 ret = rados.connect();
1164 if (ret) {
1165 cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
1166 ret = -1;
1167 goto out;
1168 }
1169 ret = rados.ioctx_create(chunk_pool_name.c_str(), chunk_io_ctx);
1170 if (ret < 0) {
1171 cerr << "error opening pool "
1172 << chunk_pool_name << ": "
1173 << cpp_strerror(ret) << std::endl;
1174 goto out;
1175 }
1176
1177 if (op_name == "chunk-get-ref" ||
1178 op_name == "chunk-put-ref" ||
1179 op_name == "chunk-repair") {
1180 string target_object_name;
1181 uint64_t pool_id;
1182 object_name = get_opts_object_name(opts);
1183 if (opts.count("target-ref")) {
1184 target_object_name = opts["target-ref"].as<string>();
1185 } else {
1186 cerr << "must specify target ref" << std::endl;
1187 exit(1);
1188 }
1189 if (opts.count("target-ref-pool-id")) {
1190 pool_id = opts["target-ref-pool-id"].as<uint64_t>();
1191 } else {
1192 cerr << "must specify target-ref-pool-id" << std::endl;
1193 exit(1);
1194 }
1195
1196 uint32_t hash;
1197 ret = chunk_io_ctx.get_object_hash_position2(object_name, &hash);
1198 if (ret < 0) {
1199 return ret;
1200 }
1201 hobject_t oid(sobject_t(target_object_name, CEPH_NOSNAP), "", hash, pool_id, "");
1202
1203 auto run_op = [] (ObjectWriteOperation& op, hobject_t& oid,
1204 string& object_name, IoCtx& chunk_io_ctx) -> int {
1205 int ret = chunk_io_ctx.operate(object_name, &op);
1206 if (ret < 0) {
1207 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1208 }
1209 return ret;
1210 };
1211
1212 ObjectWriteOperation op;
1213 if (op_name == "chunk-get-ref") {
1214 cls_cas_chunk_get_ref(op, oid);
1215 ret = run_op(op, oid, object_name, chunk_io_ctx);
1216 } else if (op_name == "chunk-put-ref") {
1217 cls_cas_chunk_put_ref(op, oid);
1218 ret = run_op(op, oid, object_name, chunk_io_ctx);
1219 } else if (op_name == "chunk-repair") {
1220 ret = rados.ioctx_create2(pool_id, io_ctx);
1221 if (ret < 0) {
1222 cerr << oid << " ref " << pool_id
1223 << ": referencing pool does not exist" << std::endl;
1224 return ret;
1225 }
1226 int chunk_ref = -1, base_ref = -1;
1227 // read the object in the chunk pool to learn how many references it holds
1228 bufferlist t;
1229 ret = chunk_io_ctx.getxattr(object_name, CHUNK_REFCOUNT_ATTR, t);
1230 if (ret < 0) {
1231 return ret;
1232 }
1233 chunk_refs_t refs;
1234 auto p = t.cbegin();
1235 decode(refs, p);
1236 if (refs.get_type() != chunk_refs_t::TYPE_BY_OBJECT) {
1237 cerr << " does not supported chunk type " << std::endl;
1238 return -1;
1239 }
1240 chunk_ref =
1241 static_cast<chunk_refs_by_object_t*>(refs.r.get())->by_object.count(oid);
1242 if (chunk_ref < 0) {
1243 cerr << object_name << " has no reference of " << target_object_name
1244 << std::endl;
1245 return chunk_ref;
1246 }
1247 cout << object_name << " has " << chunk_ref << " references for "
1248 << target_object_name << std::endl;
1249
1250 // read object on base pool to know the number of chunk object's references
1251 base_ref = cls_cas_references_chunk(io_ctx, target_object_name, object_name);
1252 if (base_ref < 0) {
1253 if (base_ref == -ENOENT || base_ref == -ENOLINK) {
1254 base_ref = 0;
1255 } else {
1256 return base_ref;
1257 }
1258 }
1259 cout << target_object_name << " has " << base_ref << " references for "
1260 << object_name << std::endl;
1261 if (chunk_ref != base_ref) {
1262 if (base_ref > chunk_ref) {
1263 cerr << "error : " << target_object_name << "'s ref. < " << object_name
1264 << "' ref. " << std::endl;
1265 return -EINVAL;
1266 }
1267 cout << " fix dangling reference from " << chunk_ref << " to " << base_ref
1268 << std::endl;
1269 while (base_ref != chunk_ref) {
1270 ObjectWriteOperation op;
1271 cls_cas_chunk_put_ref(op, oid);
1272 chunk_ref--;
1273 ret = run_op(op, oid, object_name, chunk_io_ctx);
1274 if (ret < 0) {
1275 return ret;
1276 }
1277 }
1278 }
1279 }
1280 return ret;
1281
1282 } else if (op_name == "dump-chunk-refs") {
1283 object_name = get_opts_object_name(opts);
1284 bufferlist t;
1285 ret = chunk_io_ctx.getxattr(object_name, CHUNK_REFCOUNT_ATTR, t);
1286 if (ret < 0) {
1287 return ret;
1288 }
1289 chunk_refs_t refs;
1290 auto p = t.cbegin();
1291 decode(refs, p);
1292 auto f = Formatter::create("json-pretty");
1293 f->dump_object("refs", refs);
1294 f->flush(cout);
1295 return 0;
1296 }
1297
1298 max_thread = get_opts_max_thread(opts);
1299 report_period = get_opts_report_period(opts);
1300 glock.lock();
1301 begin = chunk_io_ctx.object_list_begin();
1302 end = chunk_io_ctx.object_list_end();
1303 pool_names.push_back(chunk_pool_name);
1304 ret = rados.get_pool_stats(pool_names, stats);
1305 if (ret < 0) {
1306 cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl;
1307 glock.unlock();
1308 return ret;
1309 }
1310 if (stats.find(chunk_pool_name) == stats.end()) {
1311 cerr << "stats can not find pool name: " << chunk_pool_name << std::endl;
1312 glock.unlock();
1313 return ret;
1314 }
1315 s = stats[chunk_pool_name];
1316
1317 for (unsigned i = 0; i < max_thread; i++) {
1318 std::unique_ptr<CrawlerThread> ptr (
1319 new ChunkScrub(io_ctx, i, max_thread, begin, end, chunk_io_ctx,
1320 report_period, s.num_objects));
1321 ptr->create("estimate_thread");
1322 estimate_threads.push_back(move(ptr));
1323 }
1324 glock.unlock();
1325
1326 for (auto &p : estimate_threads) {
1327 cout << "join " << std::endl;
1328 p->join();
1329 cout << "joined " << std::endl;
1330 }
1331
1332 print_chunk_scrub();
1333
1334 out:
1335 return (ret < 0) ? 1 : 0;
1336 }
1337
1338 string make_pool_str(string pool, string var, string val)
1339 {
1340 return string("{\"prefix\": \"osd pool set\",\"pool\":\"") + pool
1341 + string("\",\"var\": \"") + var + string("\",\"val\": \"")
1342 + val + string("\"}");
1343 }
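// e.g. make_pool_str("base", "dedup_tier", "cas") yields
//   {"prefix": "osd pool set","pool":"base","var": "dedup_tier","val": "cas"}
// which is passed to rados.mon_command() below.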
1344
1345 string make_pool_str(string pool, string var, int val)
1346 {
1347 return make_pool_str(pool, var, stringify(val));
1348 }
1349
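// Handles 'chunk-dedup' (manually dedup one offset/length range of an object
// against the chunk pool) and 'object-dedup' (set the pool's dedup_* options,
// then tier_flush and tier_evict the object, and each clone when --snap is
// given).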
1350 int make_dedup_object(const po::variables_map &opts)
1351 {
1352 Rados rados;
1353 IoCtx io_ctx, chunk_io_ctx;
1354 std::string object_name, chunk_pool_name, op_name, pool_name, fp_algo;
1355 int ret;
1356 std::map<std::string, std::string>::const_iterator i;
1357
1358 op_name = get_opts_op_name(opts);
1359 pool_name = get_opts_pool_name(opts);
1360 object_name = get_opts_object_name(opts);
1361 chunk_pool_name = get_opts_chunk_pool(opts);
1362 boost::optional<pg_t> pgid(opts.count("pgid"), pg_t());
1363
1364 ret = rados.init_with_context(g_ceph_context);
1365 if (ret < 0) {
1366 cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
1367 goto out;
1368 }
1369 ret = rados.connect();
1370 if (ret) {
1371 cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
1372 ret = -1;
1373 goto out;
1374 }
1375 ret = rados.ioctx_create(pool_name.c_str(), io_ctx);
1376 if (ret < 0) {
1377 cerr << "error opening pool "
1378 << chunk_pool_name << ": "
1379 << cpp_strerror(ret) << std::endl;
1380 goto out;
1381 }
1382 ret = rados.ioctx_create(chunk_pool_name.c_str(), chunk_io_ctx);
1383 if (ret < 0) {
1384 cerr << "error opening pool "
1385 << chunk_pool_name << ": "
1386 << cpp_strerror(ret) << std::endl;
1387 goto out;
1388 }
1389 fp_algo = get_opts_fp_algo(opts);
1390
1391 if (op_name == "chunk-dedup") {
1392 uint64_t offset, length;
1393 string chunk_object;
1394 if (opts.count("source-off")) {
1395 offset = opts["source-off"].as<uint64_t>();
1396 } else {
1397 cerr << "must specify --source-off" << std::endl;
1398 exit(1);
1399 }
1400 if (opts.count("source-length")) {
1401 length = opts["source-length"].as<uint64_t>();
1402 } else {
1403 cerr << "must specify --source-length" << std::endl;
1404 exit(1);
1405 }
1406 // 1. make a copy from manifest object to chunk object
1407 bufferlist bl;
1408 ret = io_ctx.read(object_name, bl, length, offset);
1409 if (ret < 0) {
1410 cerr << " reading object in base pool fails : " << cpp_strerror(ret) << std::endl;
1411 goto out;
1412 }
1413 chunk_object = [&fp_algo, &bl]() -> string {
1414 if (fp_algo == "sha1") {
1415 return ceph::crypto::digest<ceph::crypto::SHA1>(bl).to_str();
1416 } else if (fp_algo == "sha256") {
1417 return ceph::crypto::digest<ceph::crypto::SHA256>(bl).to_str();
1418 } else if (fp_algo == "sha512") {
1419 return ceph::crypto::digest<ceph::crypto::SHA512>(bl).to_str();
1420 } else {
1421 assert(0 == "unrecognized fingerprint type");
1422 return {};
1423 }
1424 }();
1425 ret = chunk_io_ctx.write(chunk_object, bl, length, offset);
1426 if (ret < 0) {
1427 cerr << " writing object in chunk pool fails : " << cpp_strerror(ret) << std::endl;
1428 goto out;
1429 }
1430 // 2. call set_chunk
1431 ObjectReadOperation op;
1432 op.set_chunk(offset, length, chunk_io_ctx, chunk_object, 0,
1433 CEPH_OSD_OP_FLAG_WITH_REFERENCE);
1434 ret = io_ctx.operate(object_name, &op, NULL);
1435 if (ret < 0) {
1436 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1437 goto out;
1438 }
1439 } else if (op_name == "object-dedup") {
1440 unsigned chunk_size = 0;
1441 bool snap = false;
1442 if (opts.count("dedup-cdc-chunk-size")) {
1443 chunk_size = opts["dedup-cdc-chunk-size"].as<unsigned int>();
1444 } else {
1445 cerr << "must specify --dedup-cdc-chunk-size" << std::endl;
1446 exit(1);
1447 }
1448 if (opts.count("snap")) {
1449 snap = true;
1450 }
1451
1452 bufferlist inbl;
1453 ret = rados.mon_command(
1454 make_pool_str(pool_name, "fingerprint_algorithm", fp_algo),
1455 inbl, NULL, NULL);
1456 if (ret < 0) {
1457 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1458 return ret;
1459 }
1460 ret = rados.mon_command(
1461 make_pool_str(pool_name, "dedup_tier", chunk_pool_name),
1462 inbl, NULL, NULL);
1463 if (ret < 0) {
1464 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1465 return ret;
1466 }
1467 ret = rados.mon_command(
1468 make_pool_str(pool_name, "dedup_chunk_algorithm", "fastcdc"),
1469 inbl, NULL, NULL);
1470 if (ret < 0) {
1471 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1472 return ret;
1473 }
1474 ret = rados.mon_command(
1475 make_pool_str(pool_name, "dedup_cdc_chunk_size", chunk_size),
1476 inbl, NULL, NULL);
1477 if (ret < 0) {
1478 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1479 return ret;
1480 }
1481
1482 auto create_new_deduped_object =
1483 [&io_ctx](string object_name) -> int {
1484
1485 // tier-flush to perform deduplication
1486 ObjectReadOperation flush_op;
1487 flush_op.tier_flush();
1488 int ret = io_ctx.operate(object_name, &flush_op, NULL);
1489 if (ret < 0) {
1490 cerr << " tier_flush fail : " << cpp_strerror(ret) << std::endl;
1491 return ret;
1492 }
1493 // tier-evict
1494 ObjectReadOperation evict_op;
1495 evict_op.tier_evict();
1496 ret = io_ctx.operate(object_name, &evict_op, NULL);
1497 if (ret < 0) {
1498 cerr << " tier_evict fail : " << cpp_strerror(ret) << std::endl;
1499 return ret;
1500 }
1501 return ret;
1502 };
1503
1504 if (snap) {
1505 io_ctx.snap_set_read(librados::SNAP_DIR);
1506 snap_set_t snap_set;
1507 int snap_ret;
1508 ObjectReadOperation op;
1509 op.list_snaps(&snap_set, &snap_ret);
1510 io_ctx.operate(object_name, &op, NULL);
1511
1512 for (vector<librados::clone_info_t>::const_iterator r = snap_set.clones.begin();
1513 r != snap_set.clones.end();
1514 ++r) {
1515 io_ctx.snap_set_read(r->cloneid);
1516 ret = create_new_deduped_object(object_name);
1517 if (ret < 0) {
1518 goto out;
1519 }
1520 }
1521 } else {
1522 ret = create_new_deduped_object(object_name);
1523 }
1524 }
1525
1526 out:
1527 return (ret < 0) ? 1 : 0;
1528 }
1529
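// 'sample-dedup': configure the base pool for dedup, then repeatedly spawn
// max-thread SampleDedupWorkerThreads over disjoint object_list slices; with
// --loop, sleep wakeup-period seconds between iterations.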
1530 int make_crawling_daemon(const po::variables_map &opts)
1531 {
1532 string base_pool_name = get_opts_pool_name(opts);
1533 string chunk_pool_name = get_opts_chunk_pool(opts);
1534 unsigned max_thread = get_opts_max_thread(opts);
1535
1536 bool loop = false;
1537 if (opts.count("loop")) {
1538 loop = true;
1539 }
1540
1541 int sampling_ratio = -1;
1542 if (opts.count("sampling-ratio")) {
1543 sampling_ratio = opts["sampling-ratio"].as<int>();
1544 }
1545 size_t chunk_size = 8192;
1546 if (opts.count("chunk-size")) {
1547 chunk_size = opts["chunk-size"].as<int>();
1548 } else {
1549 cout << "8192 is set as chunk size by default" << std::endl;
1550 }
1551
1552 uint32_t chunk_dedup_threshold = -1;
1553 if (opts.count("chunk-dedup-threshold")) {
1554 chunk_dedup_threshold = opts["chunk-dedup-threshold"].as<uint32_t>();
1555 }
1556
1557 std::string chunk_algo = get_opts_chunk_algo(opts);
1558
1559 Rados rados;
1560 int ret = rados.init_with_context(g_ceph_context);
1561 if (ret < 0) {
1562 cerr << "couldn't initialize rados: " << cpp_strerror(ret) << std::endl;
1563 return -EINVAL;
1564 }
1565 ret = rados.connect();
1566 if (ret) {
1567 cerr << "couldn't connect to cluster: " << cpp_strerror(ret) << std::endl;
1568 return -EINVAL;
1569 }
1570 int wakeup_period = 100;
1571 if (opts.count("wakeup-period")) {
1572 wakeup_period = opts["wakeup-period"].as<int>();
1573 } else {
1574 cout << "100 second is set as wakeup period by default" << std::endl;
1575 }
1576
1577 std::string fp_algo = get_opts_fp_algo(opts);
1578
1579 list<string> pool_names;
1580 IoCtx io_ctx, chunk_io_ctx;
1581 pool_names.push_back(base_pool_name);
1582 ret = rados.ioctx_create(base_pool_name.c_str(), io_ctx);
1583 if (ret < 0) {
1584 cerr << "error opening base pool "
1585 << base_pool_name << ": "
1586 << cpp_strerror(ret) << std::endl;
1587 return -EINVAL;
1588 }
1589
1590 ret = rados.ioctx_create(chunk_pool_name.c_str(), chunk_io_ctx);
1591 if (ret < 0) {
1592 cerr << "error opening chunk pool "
1593 << chunk_pool_name << ": "
1594 << cpp_strerror(ret) << std::endl;
1595 return -EINVAL;
1596 }
1597 bufferlist inbl;
1598 ret = rados.mon_command(
1599 make_pool_str(base_pool_name, "fingerprint_algorithm", fp_algo),
1600 inbl, NULL, NULL);
1601 if (ret < 0) {
1602 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1603 return ret;
1604 }
1605 ret = rados.mon_command(
1606 make_pool_str(base_pool_name, "dedup_chunk_algorithm", "fastcdc"),
1607 inbl, NULL, NULL);
1608 if (ret < 0) {
1609 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1610 return ret;
1611 }
1612 ret = rados.mon_command(
1613 make_pool_str(base_pool_name, "dedup_cdc_chunk_size", chunk_size),
1614 inbl, NULL, NULL);
1615 if (ret < 0) {
1616 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1617 return ret;
1618 }
1619 ret = rados.mon_command(
1620 make_pool_str(base_pool_name, "dedup_tier", chunk_pool_name),
1621 inbl, NULL, NULL);
1622 if (ret < 0) {
1623 cerr << " operate fail : " << cpp_strerror(ret) << std::endl;
1624 return ret;
1625 }
1626
1627 cout << "SampleRatio : " << sampling_ratio << std::endl
1628 << "Chunk Dedup Threshold : " << chunk_dedup_threshold << std::endl
1629 << "Chunk Size : " << chunk_size << std::endl
1630 << std::endl;
1631
1632 while (true) {
1633 lock_guard lock(glock);
1634 ObjectCursor begin = io_ctx.object_list_begin();
1635 ObjectCursor end = io_ctx.object_list_end();
1636 map<string, librados::pool_stat_t> stats;
1637 ret = rados.get_pool_stats(pool_names, stats);
1638 if (ret < 0) {
1639 cerr << "error fetching pool stats: " << cpp_strerror(ret) << std::endl;
1640 return -EINVAL;
1641 }
1642 if (stats.find(base_pool_name) == stats.end()) {
1643 cerr << "stats can not find pool name: " << base_pool_name << std::endl;
1644 return -EINVAL;
1645 }
1646
1647 SampleDedupWorkerThread::SampleDedupGlobal sample_dedup_global(
1648 chunk_dedup_threshold, sampling_ratio);
1649
1650 std::list<SampleDedupWorkerThread> threads;
1651 for (unsigned i = 0; i < max_thread; i++) {
1652 cout << " add thread.. " << std::endl;
1653 ObjectCursor shard_start;
1654 ObjectCursor shard_end;
1655 io_ctx.object_list_slice(
1656 begin,
1657 end,
1658 i,
1659 max_thread,
1660 &shard_start,
1661 &shard_end);
1662
1663 threads.emplace_back(
1664 io_ctx,
1665 chunk_io_ctx,
1666 shard_start,
1667 shard_end,
1668 chunk_size,
1669 fp_algo,
1670 chunk_algo,
1671 sample_dedup_global);
1672 threads.back().create("sample_dedup");
1673 }
1674
1675 for (auto &p : threads) {
1676 p.join();
1677 }
1678 if (loop) {
1679 sleep(wakeup_period);
1680 } else {
1681 break;
1682 }
1683 }
1684
1685 return 0;
1686 }
1687
1688 int main(int argc, const char **argv)
1689 {
1690 auto args = argv_to_vec(argc, argv);
1691 if (args.empty()) {
1692 cerr << argv[0] << ": -h or --help for usage" << std::endl;
1693 exit(1);
1694 }
1695
1696 po::variables_map opts;
1697 po::positional_options_description p;
1698 p.add("command", 1);
1699 po::options_description desc = make_usage();
1700 try {
1701 po::parsed_options parsed =
1702 po::command_line_parser(argc, argv).options(desc).positional(p).allow_unregistered().run();
1703 po::store(parsed, opts);
1704 po::notify(opts);
1705 } catch(po::error &e) {
1706 std::cerr << e.what() << std::endl;
1707 return 1;
1708 }
1709 if (opts.count("help") || opts.count("h")) {
1710 cout<< desc << std::endl;
1711 exit(0);
1712 }
1713
1714 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
1715 CODE_ENVIRONMENT_DAEMON,
1716 CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS);
1717
1718 Preforker forker;
1719 if (global_init_prefork(g_ceph_context) >= 0) {
1720 std::string err;
1721 int r = forker.prefork(err);
1722 if (r < 0) {
1723 cerr << err << std::endl;
1724 return r;
1725 }
1726 if (forker.is_parent()) {
1727 g_ceph_context->_log->start();
1728 if (forker.parent_wait(err) != 0) {
1729 return -ENXIO;
1730 }
1731 return 0;
1732 }
1733 global_init_postfork_start(g_ceph_context);
1734 }
1735 common_init_finish(g_ceph_context);
1736 if (opts.count("daemon")) {
1737 global_init_postfork_finish(g_ceph_context);
1738 forker.daemonize();
1739 }
1740 init_async_signal_handler();
1741 register_async_signal_handler_oneshot(SIGINT, handle_signal);
1742 register_async_signal_handler_oneshot(SIGTERM, handle_signal);
1743
1744 string op_name = get_opts_op_name(opts);
1745 int ret = 0;
1746 if (op_name == "estimate") {
1747 ret = estimate_dedup_ratio(opts);
1748 } else if (op_name == "chunk-scrub" ||
1749 op_name == "chunk-get-ref" ||
1750 op_name == "chunk-put-ref" ||
1751 op_name == "chunk-repair" ||
1752 op_name == "dump-chunk-refs") {
1753 ret = chunk_scrub_common(opts);
1754 } else if (op_name == "chunk-dedup" ||
1755 op_name == "object-dedup") {
1756 /*
1757 * chunk-dedup:
1758 * using a chunk generated by given source,
1759 * create a new object in the chunk pool or increase the reference
1760 * if the object exists
1761 *
1762 * object-dedup:
1763 * perform deduplication on the entire object, not a chunk.
1764 *
1765 */
1766 ret = make_dedup_object(opts);
1767 } else if (op_name == "sample-dedup") {
1768 ret = make_crawling_daemon(opts);
1769 } else {
1770 cerr << "unrecognized op " << op_name << std::endl;
1771 exit(1);
1772 }
1773
1774 unregister_async_signal_handler(SIGINT, handle_signal);
1775 unregister_async_signal_handler(SIGTERM, handle_signal);
1776 shutdown_async_signal_handler();
1777
1778 return forker.signal_exit(ret);
1779 }