1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Author: Myoungwon Oh <ohmyoungwon@gmail.com>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "include/types.h"
16 #include "include/rados/buffer.h"
17 #include "include/rados/librados.hpp"
18 #include "include/rados/rados_types.hpp"
22 #include "common/Cond.h"
23 #include "common/Formatter.h"
24 #include "common/ceph_argparse.h"
25 #include "common/ceph_crypto.h"
26 #include "common/config.h"
27 #include "common/debug.h"
28 #include "common/errno.h"
29 #include "common/obj_bencher.h"
30 #include "global/global_init.h"
45 #include "tools/RadosDump.h"
46 #include "cls/cas/cls_cas_client.h"
47 #include "cls/cas/cls_cas_internal.h"
48 #include "include/stringify.h"
49 #include "global/signal_handler.h"
50 #include "common/CDC.h"
51 #include "common/Preforker.h"
53 #include <boost/program_options/variables_map.hpp>
54 #include <boost/program_options/parsers.hpp>
57 namespace po
= boost::program_options
;
59 struct EstimateResult
{
60 std::unique_ptr
<CDC
> cdc
;
64 ceph::mutex lock
= ceph::make_mutex("EstimateResult::lock");
66 // < key, <count, chunk_size> >
67 map
< string
, pair
<uint64_t, uint64_t> > chunk_statistics
;
68 uint64_t total_bytes
= 0;
69 std::atomic
<uint64_t> total_objects
= {0};
71 EstimateResult(std::string alg
, int chunk_size
)
72 : cdc(CDC::create(alg
, chunk_size
)),
73 chunk_size(1ull << chunk_size
) {}
75 void add_chunk(bufferlist
& chunk
, const std::string
& fp_algo
) {
77 if (fp_algo
== "sha1") {
78 sha1_digest_t sha1_val
= crypto::digest
<crypto::SHA1
>(chunk
);
79 fp
= sha1_val
.to_str();
80 } else if (fp_algo
== "sha256") {
81 sha256_digest_t sha256_val
= crypto::digest
<crypto::SHA256
>(chunk
);
82 fp
= sha256_val
.to_str();
83 } else if (fp_algo
== "sha512") {
84 sha512_digest_t sha512_val
= crypto::digest
<crypto::SHA512
>(chunk
);
85 fp
= sha512_val
.to_str();
87 ceph_assert(0 == "no support fingerperint algorithm");
90 std::lock_guard
l(lock
);
91 auto p
= chunk_statistics
.find(fp
);
92 if (p
!= chunk_statistics
.end()) {
94 if (p
->second
.second
!= chunk
.length()) {
95 cerr
<< "warning: hash collision on " << fp
96 << ": was " << p
->second
.second
97 << " now " << chunk
.length() << std::endl
;
100 chunk_statistics
[fp
] = make_pair(1, chunk
.length());
102 total_bytes
+= chunk
.length();
105 void dump(Formatter
*f
) const {
106 f
->dump_unsigned("target_chunk_size", chunk_size
);
108 uint64_t dedup_bytes
= 0;
109 uint64_t dedup_objects
= chunk_statistics
.size();
110 for (auto& j
: chunk_statistics
) {
111 dedup_bytes
+= j
.second
.second
;
113 //f->dump_unsigned("dedup_bytes", dedup_bytes);
114 //f->dump_unsigned("original_bytes", total_bytes);
115 f
->dump_float("dedup_bytes_ratio",
116 (double)dedup_bytes
/ (double)total_bytes
);
117 f
->dump_float("dedup_objects_ratio",
118 (double)dedup_objects
/ (double)total_objects
);
120 uint64_t avg
= total_bytes
/ dedup_objects
;
122 for (auto& j
: chunk_statistics
) {
123 sqsum
+= (avg
- j
.second
.second
) * (avg
- j
.second
.second
);
125 uint64_t stddev
= sqrt(sqsum
/ dedup_objects
);
126 f
->dump_unsigned("chunk_size_average", avg
);
127 f
->dump_unsigned("chunk_size_stddev", stddev
);
131 map
<uint64_t, EstimateResult
> dedup_estimates
; // chunk size -> result
133 using namespace librados
;
134 unsigned default_op_size
= 1 << 26;
135 unsigned default_max_thread
= 2;
136 int32_t default_report_period
= 10;
137 ceph::mutex glock
= ceph::make_mutex("glock");
139 po::options_description
make_usage() {
140 po::options_description
desc("Usage");
142 ("help,h", ": produce help message")
143 ("op estimate --pool <POOL> --chunk-size <CHUNK_SIZE> --chunk-algorithm <ALGO> --fingerprint-algorithm <FP_ALGO>",
144 ": estimate how many chunks are redundant")
145 ("op chunk-scrub --chunk-pool <POOL>",
146 ": perform chunk scrub")
147 ("op chunk-get-ref --chunk-pool <POOL> --object <OID> --target-ref <OID> --target-ref-pool-id <POOL_ID>",
148 ": get chunk object's reference")
149 ("op chunk-put-ref --chunk-pool <POOL> --object <OID> --target-ref <OID> --target-ref-pool-id <POOL_ID>",
150 ": put chunk object's reference")
151 ("op chunk-repair --chunk-pool <POOL> --object <OID> --target-ref <OID> --target-ref-pool-id <POOL_ID>",
152 ": fix mismatched references")
153 ("op dump-chunk-refs --chunk-pool <POOL> --object <OID>",
154 ": dump chunk object's references")
155 ("op chunk-dedup --pool <POOL> --object <OID> --chunk-pool <POOL> --fingerprint-algorithm <FP> --source-off <OFFSET> --source-length <LENGTH>",
156 ": perform a chunk dedup---deduplicate only a chunk, which is a part of object.")
157 ("op object-dedup --pool <POOL> --object <OID> --chunk-pool <POOL> --fingerprint-algorithm <FP> --dedup-cdc-chunk-size <CHUNK_SIZE> [--snap]",
158 ": perform a object dedup---deduplicate the entire object, not a chunk. Related snapshots are also deduplicated if --snap is given")
159 ("op sample-dedup --pool <POOL> --chunk-pool <POOL> --chunk-algorithm <ALGO> --fingerprint-algorithm <FP> --daemon --loop",
160 ": perform a sample dedup---make crawling threads which crawl objects in base pool and deduplicate them based on their deduplication efficiency")
162 po::options_description
op_desc("Opational arguments");
163 op_desc
.add_options()
164 ("op", po::value
<std::string
>(), ": estimate|chunk-scrub|chunk-get-ref|chunk-put-ref|chunk-repair|dump-chunk-refs|chunk-dedup|object-dedup")
165 ("target-ref", po::value
<std::string
>(), ": set target object")
166 ("target-ref-pool-id", po::value
<uint64_t>(), ": set target pool id")
167 ("object", po::value
<std::string
>(), ": set object name")
168 ("chunk-size", po::value
<int>(), ": chunk size (byte)")
169 ("chunk-algorithm", po::value
<std::string
>(), ": <fixed|fastcdc>, set chunk-algorithm")
170 ("fingerprint-algorithm", po::value
<std::string
>(), ": <sha1|sha256|sha512>, set fingerprint-algorithm")
171 ("chunk-pool", po::value
<std::string
>(), ": set chunk pool name")
172 ("max-thread", po::value
<int>(), ": set max thread")
173 ("report-period", po::value
<int>(), ": set report-period")
174 ("max-seconds", po::value
<int>(), ": set max runtime")
175 ("max-read-size", po::value
<int>(), ": set max read size")
176 ("pool", po::value
<std::string
>(), ": set pool name")
177 ("min-chunk-size", po::value
<int>(), ": min chunk size (byte)")
178 ("max-chunk-size", po::value
<int>(), ": max chunk size (byte)")
179 ("source-off", po::value
<uint64_t>(), ": set source offset")
180 ("source-length", po::value
<uint64_t>(), ": set source length")
181 ("dedup-cdc-chunk-size", po::value
<unsigned int>(), ": set dedup chunk size for cdc")
182 ("snap", ": deduplciate snapshotted object")
183 ("debug", ": enable debug")
184 ("pgid", ": set pgid")
185 ("chunk-dedup-threshold", po::value
<uint32_t>(), ": set the threshold for chunk dedup (number of duplication) ")
186 ("sampling-ratio", po::value
<int>(), ": set the sampling ratio (percentile)")
187 ("daemon", ": execute sample dedup in daemon mode")
188 ("loop", ": execute sample dedup in a loop until terminated. Sleeps 'wakeup-period' seconds between iterations")
189 ("wakeup-period", po::value
<int>(), ": set the wakeup period of crawler thread (sec)")
195 template <typename I
, typename T
>
196 static int rados_sistrtoll(I
&i
, T
*val
) {
198 *val
= strict_iecstrtoll(i
->second
, &err
);
200 cerr
<< "Invalid value for " << i
->first
<< ": " << err
<< std::endl
;
// Forward declaration: CrawlerThread below befriends EstimateDedupRatio.
207 class EstimateDedupRatio
;
// Base class for the worker threads that walk a pool's object list.
// It holds the IoCtx, this thread's index n out of m threads, the
// [begin, end) object cursor range and the progress counters read through
// the get_*() accessors. EstimateDedupRatio and ChunkScrub are friends so
// they can use these private members directly.
209 class CrawlerThread
: public Thread
// m_lock is taken by signal() and by ChunkScrub while it handles each object.
216 ceph::mutex m_lock
= ceph::make_mutex("CrawlerThread::Locker");
217 ceph::condition_variable m_cond
;
// Progress-report period in seconds (--report-period).
218 int32_t report_period
;
// Progress counters; total_objects starts as the num_objects ctor argument,
// the others are updated by the subclasses (e.g. examined_bytes in
// EstimateDedupRatio::estimate_dedup_ratio()).
220 uint64_t total_bytes
= 0;
221 uint64_t total_objects
= 0;
222 uint64_t examined_objects
= 0;
223 uint64_t examined_bytes
= 0;
224 uint64_t max_read_size
= 0;
// NOTE(review): a #define inside a class body is not scoped to the class.
226 #define COND_WAIT_INTERVAL 10
// n: this thread's index; m: number of threads. max_read_size defaults to
// default_op_size.
229 CrawlerThread(IoCtx
& io_ctx
, int n
, int m
,
230 ObjectCursor begin
, ObjectCursor end
, int32_t report_period
,
231 uint64_t num_objects
, uint64_t max_read_size
= default_op_size
):
232 io_ctx(io_ctx
), n(n
), m(m
), begin(begin
), end(end
),
233 report_period(report_period
), total_objects(num_objects
), max_read_size(max_read_size
)
// Stop hook; takes m_lock. Presumably it sets a stop flag and wakes m_cond
// — confirm.
236 void signal(int signum
) {
237 std::lock_guard l
{m_lock
};
// Progress-report hook: a no-op here, overridden by ChunkScrub.
241 virtual void print_status(Formatter
*f
, ostream
&out
) {}
242 uint64_t get_examined_objects() { return examined_objects
; }
243 uint64_t get_examined_bytes() { return examined_bytes
; }
244 uint64_t get_total_bytes() { return total_bytes
; }
245 uint64_t get_total_objects() { return total_objects
; }
246 void set_debug(const bool debug_
) { debug
= debug_
; }
247 friend class EstimateDedupRatio
;
248 friend class ChunkScrub
;
// Crawler thread for the estimate operation: splits the data of every object
// in its slice of the pool with each chunker in dedup_estimates and records
// the chunk fingerprints there (see estimate_dedup_ratio()).
251 class EstimateDedupRatio
: public CrawlerThread
// Runtime limit from --max-seconds; 0 disables the limit check.
256 uint64_t max_seconds
;
260 IoCtx
& io_ctx
, int n
, int m
, ObjectCursor begin
, ObjectCursor end
,
261 string chunk_algo
, string fp_algo
, uint64_t chunk_size
, int32_t report_period
,
262 uint64_t num_objects
, uint64_t max_read_size
,
263 uint64_t max_seconds
):
264 CrawlerThread(io_ctx
, n
, m
, begin
, end
, report_period
, num_objects
,
266 chunk_algo(chunk_algo
),
268 chunk_size(chunk_size
),
269 max_seconds(max_seconds
) {
273 estimate_dedup_ratio();
// Walks this thread's slice of the pool and fills dedup_estimates.
276 void estimate_dedup_ratio();
// Crawler thread for chunk scrubbing: walks its slice of the chunk pool and
// verifies each chunk object's recorded references against the objects
// they point at (see chunk_scrub_common()).
279 class ChunkScrub
: public CrawlerThread
// Reported as "damaged objects" by print_status() and summed by
// print_chunk_scrub().
282 int damaged_objects
= 0;
// chunk_io_ctx is the chunk pool being scrubbed; io_ctx is forwarded to
// CrawlerThread.
285 ChunkScrub(IoCtx
& io_ctx
, int n
, int m
, ObjectCursor begin
, ObjectCursor end
,
286 IoCtx
& chunk_io_ctx
, int32_t report_period
, uint64_t num_objects
):
287 CrawlerThread(io_ctx
, n
, m
, begin
, end
, report_period
, num_objects
), chunk_io_ctx(chunk_io_ctx
)
290 chunk_scrub_common();
293 void chunk_scrub_common();
294 int get_damaged_objects() { return damaged_objects
; }
295 void print_status(Formatter
*f
, ostream
&out
);
298 vector
<std::unique_ptr
<CrawlerThread
>> estimate_threads
;
300 static void print_dedup_estimate(std::ostream
& out
, std::string chunk_algo
)
303 uint64_t total_bytes = 0;
304 uint64_t total_objects = 0;
306 uint64_t examined_objects
= 0;
307 uint64_t examined_bytes
= 0;
309 for (auto &et
: estimate_threads
) {
310 examined_objects
+= et
->get_examined_objects();
311 examined_bytes
+= et
->get_examined_bytes();
314 auto f
= Formatter::create("json-pretty");
315 f
->open_object_section("results");
316 f
->dump_string("chunk_algo", chunk_algo
);
317 f
->open_array_section("chunk_sizes");
318 for (auto& i
: dedup_estimates
) {
319 f
->dump_object("chunker", i
.second
);
323 f
->open_object_section("summary");
324 f
->dump_unsigned("examined_objects", examined_objects
);
325 f
->dump_unsigned("examined_bytes", examined_bytes
);
327 f->dump_unsigned("total_objects", total_objects);
328 f->dump_unsigned("total_bytes", total_bytes);
329 f->dump_float("examined_ratio", (float)examined_bytes / (float)total_bytes);
336 static void handle_signal(int signum
)
338 std::lock_guard l
{glock
};
339 for (auto &p
: estimate_threads
) {
// Thread body for EstimateDedupRatio: lists the objects in this thread's
// slice of the pool, reads each one, splits the data with every chunker in
// dedup_estimates and records the chunk fingerprints there. Thread 0 also
// prints periodic progress reports.
344 void EstimateDedupRatio::estimate_dedup_ratio()
346 ObjectCursor shard_start
;
347 ObjectCursor shard_end
;
349 io_ctx
.object_list_slice(
357 utime_t start
= ceph_clock_now();
367 next_report
+= report_period
;
370 ObjectCursor
c(shard_start
);
371 while (c
< shard_end
)
373 std::vector
<ObjectItem
> result
;
// List the next batch (up to 12 entries) and advance the cursor c.
374 int r
= io_ctx
.object_list(c
, shard_end
, 12, {}, &result
, &c
);
376 cerr
<< "error object_list : " << cpp_strerror(r
) << std::endl
;
// NOTE(review): max_read_size is uint64_t; storing it in an unsigned
// truncates values of 4 GiB and above.
380 unsigned op_size
= max_read_size
;
382 for (const auto & i
: result
) {
383 const auto &oid
= i
.oid
;
385 utime_t now
= ceph_clock_now();
// `end` is compared with a utime_t here — presumably a local deadline
// derived from max_seconds that shadows the cursor member; confirm.
386 if (max_seconds
&& now
> end
) {
// Progress report. NOTE(review): this reads total_bytes and runs
// print_dedup_estimate() while other threads may still call add_chunk().
393 if (n
== 0 && // first thread only
394 next_report
!= utime_t() && now
> next_report
) {
395 cerr
<< (int)(now
- start
) << "s : read "
396 << dedup_estimates
.begin()->second
.total_bytes
<< " bytes so far..."
398 print_dedup_estimate(cerr
, chunk_algo
);
400 next_report
+= report_period
;
403 // read entire object
408 int ret
= io_ctx
.read(oid
, t
, op_size
, offset
);
416 examined_bytes
+= bl
.length();
// Feed the object's data to every configured chunker.
419 for (auto& i
: dedup_estimates
) {
420 vector
<pair
<uint64_t, uint64_t>> chunks
;
421 i
.second
.cdc
->calc_chunks(bl
, &chunks
);
422 for (auto& p
: chunks
) {
424 chunk
.substr_of(bl
, p
.first
, p
.second
);
425 i
.second
.add_chunk(chunk
, fp_algo
);
// Per-chunk trace (oid offset~length). NOTE(review): confirm this output
// is gated on `debug`.
427 cout
<< " " << oid
<< " " << p
.first
<< "~" << p
.second
<< std::endl
;
430 ++i
.second
.total_objects
;
// Thread body for ChunkScrub. Opens its own Rados connection, lists this
// thread's slice of the chunk pool, and for each chunk object decodes the
// CHUNK_REFCOUNT_ATTR xattr and asks every referencing object (through
// cls_cas_references_chunk()) whether it still references the chunk.
// Problems are printed to cerr; pool_missing / object_missing /
// does_not_ref presumably count them per chunk — confirm.
436 void ChunkScrub::chunk_scrub_common()
438 ObjectCursor shard_start
;
439 ObjectCursor shard_end
;
443 ret
= rados
.init_with_context(g_ceph_context
);
445 cerr
<< "couldn't initialize rados: " << cpp_strerror(ret
) << std::endl
;
448 ret
= rados
.connect();
450 cerr
<< "couldn't connect to cluster: " << cpp_strerror(ret
) << std::endl
;
454 chunk_io_ctx
.object_list_slice(
462 ObjectCursor
c(shard_start
);
465 std::vector
<ObjectItem
> result
;
466 int r
= chunk_io_ctx
.object_list(c
, shard_end
, 12, {}, &result
, &c
);
468 cerr
<< "error object_list : " << cpp_strerror(r
) << std::endl
;
472 for (const auto & i
: result
) {
// Each object is handled under m_lock, the lock signal() also takes.
473 std::unique_lock l
{m_lock
};
475 Formatter
*formatter
= Formatter::create("json-pretty");
476 print_status(formatter
, cout
);
481 cout
<< oid
<< std::endl
;
// Load the chunk's reference set.
485 ret
= chunk_io_ctx
.getxattr(oid
, CHUNK_REFCOUNT_ATTR
, t
);
// Only by-object reference sets can be checked.
494 if (refs
.get_type() != chunk_refs_t::TYPE_BY_OBJECT
) {
495 // we can't do anything here
500 chunk_refs_by_object_t
*byo
=
501 static_cast<chunk_refs_by_object_t
*>(refs
.r
.get());
502 set
<hobject_t
> real_refs
;
504 uint64_t pool_missing
= 0;
505 uint64_t object_missing
= 0;
506 uint64_t does_not_ref
= 0;
507 for (auto& pp
: byo
->by_object
) {
// Open the pool the reference points into.
509 ret
= rados
.ioctx_create2(pp
.pool
, target_io_ctx
);
511 cerr
<< oid
<< " ref " << pp
512 << ": referencing pool does not exist" << std::endl
;
// -ENOENT: the referencing object is gone;
// -ENOLINK: it no longer references this chunk.
517 ret
= cls_cas_references_chunk(target_io_ctx
, pp
.oid
.name
, oid
);
518 if (ret
== -ENOENT
) {
519 cerr
<< oid
<< " ref " << pp
520 << ": referencing object missing" << std::endl
;
522 } else if (ret
== -ENOLINK
) {
523 cerr
<< oid
<< " ref " << pp
524 << ": referencing object does not reference chunk"
529 if (pool_missing
|| object_missing
|| does_not_ref
) {
534 cout
<< "--done--" << std::endl
;
537 using AioCompRef
= unique_ptr
<AioCompletion
>;
// Worker thread for the sample-dedup operation: lists objects in the base
// pool, deduplicates a sampled subset of them chunk by chunk, and tracks
// chunk fingerprints in the FpStore shared through SampleDedupGlobal.
539 class SampleDedupWorkerThread
: public Thread
546 string fingerprint
= "";
552 using dup_count_t
= ssize_t
;
// True if fp has been recorded. Takes the lock shared, so lookups can run
// concurrently with each other.
554 bool find(string
& fp
) {
555 std::shared_lock
lock(fingerprint_lock
);
556 auto found_item
= fp_map
.find(fp
);
557 return found_item
!= fp_map
.end();
// Record one more occurrence of chunk.fingerprint (exclusive lock). Returns
// true once the occurrence count reaches dedup_threshold, and never while
// dedup_threshold is -1.
560 // return true if the chunk is duplicate
561 bool add(chunk_t
& chunk
) {
562 std::unique_lock
lock(fingerprint_lock
);
563 auto found_iter
= fp_map
.find(chunk
.fingerprint
);
564 ssize_t cur_reference
= 1;
565 if (found_iter
== fp_map
.end()) {
566 fp_map
.insert({chunk
.fingerprint
, 1});
568 cur_reference
= ++found_iter
->second
;
570 return cur_reference
>= dedup_threshold
&& dedup_threshold
!= -1;
// Updates dedup_threshold under the exclusive lock.
573 void init(size_t dedup_threshold_
) {
574 std::unique_lock
lock(fingerprint_lock
);
576 dedup_threshold
= dedup_threshold_
;
578 FpStore(size_t chunk_threshold
) : dedup_threshold(chunk_threshold
) { }
// -1 disables duplicate reporting in add().
581 ssize_t dedup_threshold
= -1;
// fingerprint -> number of occurrences seen so far.
582 std::unordered_map
<std::string
, dup_count_t
> fp_map
;
583 std::shared_mutex fingerprint_lock
;
// State shared by every sample-dedup worker. sampling_ratio defaults to -1;
// the constructor sets it to the sampling_ratio percentage divided by 100.
586 struct SampleDedupGlobal
{
588 const double sampling_ratio
= -1;
591 int sampling_ratio
) :
592 fp_store(chunk_threshold
),
593 sampling_ratio(static_cast<double>(sampling_ratio
) / 100) { }
596 SampleDedupWorkerThread(
602 std::string
&fp_algo
,
603 std::string
&chunk_algo
,
604 SampleDedupGlobal
&sample_dedup_global
) :
606 chunk_io_ctx(chunk_io_ctx
),
607 chunk_size(chunk_size
),
608 fp_type(pg_pool_t::get_fingerprint_from_str(fp_algo
)),
609 chunk_algo(chunk_algo
),
610 sample_dedup_global(sample_dedup_global
),
614 ~SampleDedupWorkerThread() { };
617 void* entry() override
{
624 std::tuple
<std::vector
<ObjectItem
>, ObjectCursor
> get_objects(
625 ObjectCursor current
,
627 size_t max_object_count
);
628 std::vector
<size_t> sample_object(size_t count
);
629 void try_dedup_and_accumulate_result(ObjectItem
&object
);
630 bool ok_to_dedup_all();
631 int do_chunk_dedup(chunk_t
&chunk
);
632 bufferlist
read_object(ObjectItem
&object
);
633 std::vector
<std::tuple
<bufferlist
, pair
<uint64_t, uint64_t>>> do_cdc(
636 std::string
generate_fingerprint(bufferlist chunk_data
);
637 AioCompRef
do_async_evict(string oid
);
// Running byte totals accumulated by try_dedup_and_accumulate_result().
641 size_t total_duplicated_size
= 0;
642 size_t total_object_size
= 0;
// Objects deduplicated by do_chunk_dedup(); crawl() evicts them at the end
// of a pass.
644 std::set
<std::string
> oid_for_evict
;
645 const size_t chunk_size
= 0;
646 pg_pool_t::fingerprint_t fp_type
= pg_pool_t::TYPE_FINGERPRINT_NONE
;
647 std::string chunk_algo
;
648 SampleDedupGlobal
&sample_dedup_global
;
653 void SampleDedupWorkerThread::crawl()
655 cout
<< "new iteration" << std::endl
;
657 ObjectCursor current_object
= begin
;
658 while (current_object
< end
) {
659 std::vector
<ObjectItem
> objects
;
660 // Get the list of object IDs to deduplicate
661 std::tie(objects
, current_object
) = get_objects(current_object
, end
, 100);
663 // Pick few objects to be processed. Sampling ratio decides how many
664 // objects to pick. Lower sampling ratio makes crawler have lower crawling
665 // overhead but find less duplication.
666 auto sampled_indexes
= sample_object(objects
.size());
667 for (size_t index
: sampled_indexes
) {
668 ObjectItem target
= objects
[index
];
669 try_dedup_and_accumulate_result(target
);
673 vector
<AioCompRef
> evict_completions(oid_for_evict
.size());
675 for (auto &oid
: oid_for_evict
) {
676 evict_completions
[i
] = do_async_evict(oid
);
679 for (auto &completion
: evict_completions
) {
680 completion
->wait_for_complete();
682 cout
<< "done iteration" << std::endl
;
// Build a tier_evict read operation for oid and return its completion so
// the caller (crawl()) can wait on it.
// NOTE(review): confirm the operation is submitted with aio_operate().
685 AioCompRef
SampleDedupWorkerThread::do_async_evict(string oid
)
688 ObjectReadOperation op_tier
;
689 AioCompRef
completion(rados
.aio_create_completion());
690 op_tier
.tier_evict();
// List up to max_object_count objects between `current` and `end`.
// Returns the listed objects together with the cursor to continue from;
// prints "error object_list" when listing fails.
699 std::tuple
<std::vector
<ObjectItem
>, ObjectCursor
> SampleDedupWorkerThread::get_objects(
700 ObjectCursor current
, ObjectCursor end
, size_t max_object_count
)
702 std::vector
<ObjectItem
> objects
;
704 int ret
= io_ctx
.object_list(
712 cerr
<< "error object_list" << std::endl
;
716 return std::make_tuple(objects
, next
);
719 std::vector
<size_t> SampleDedupWorkerThread::sample_object(size_t count
)
721 std::vector
<size_t> indexes(count
);
722 for (size_t i
= 0 ; i
< count
; i
++) {
725 default_random_engine generator
;
726 shuffle(indexes
.begin(), indexes
.end(), generator
);
727 size_t sampling_count
= static_cast<double>(count
) *
728 sample_dedup_global
.sampling_ratio
;
729 indexes
.resize(sampling_count
);
// Read the object and split it with do_cdc(). Empty reads are reported and
// skipped; a mismatch between the summed chunk lengths and the object size
// is reported. Each chunk's fingerprint is checked against and added to
// the shared FpStore: chunks already seen add to duplicated_size, and
// chunks whose count reaches the dedup threshold are collected in
// redundant_chunks for chunk-dedup. Finally the per-object sizes are added
// to total_duplicated_size / total_object_size.
734 void SampleDedupWorkerThread::try_dedup_and_accumulate_result(ObjectItem
&object
)
736 bufferlist data
= read_object(object
);
737 if (data
.length() == 0) {
738 cerr
<< __func__
<< " skip object " << object
.oid
739 << " read returned size 0" << std::endl
;
742 auto chunks
= do_cdc(object
, data
);
743 size_t chunk_total_amount
= 0;
745 // First, check total size of created chunks
746 for (auto &chunk
: chunks
) {
747 auto &chunk_data
= std::get
<0>(chunk
);
748 chunk_total_amount
+= chunk_data
.length();
750 if (chunk_total_amount
!= data
.length()) {
751 cerr
<< __func__
<< " sum of chunked length(" << chunk_total_amount
752 << ") is different from object data length(" << data
.length() << ")"
757 size_t duplicated_size
= 0;
758 list
<chunk_t
> redundant_chunks
;
759 for (auto &chunk
: chunks
) {
760 auto &chunk_data
= std::get
<0>(chunk
);
761 std::string fingerprint
= generate_fingerprint(chunk_data
);
762 std::pair
<uint64_t, uint64_t> chunk_boundary
= std::get
<1>(chunk
);
763 chunk_t chunk_info
= {
765 .start
= chunk_boundary
.first
,
766 .size
= chunk_boundary
.second
,
767 .fingerprint
= fingerprint
,
// NOTE(review): find() and add() lock the store separately, so another
// worker can record the same fingerprint in between.
771 if (sample_dedup_global
.fp_store
.find(fingerprint
)) {
772 duplicated_size
+= chunk_data
.length();
774 if (sample_dedup_global
.fp_store
.add(chunk_info
)) {
775 redundant_chunks
.push_back(chunk_info
);
779 size_t object_size
= data
.length();
781 // perform chunk-dedup
782 for (auto &p
: redundant_chunks
) {
785 total_duplicated_size
+= duplicated_size
;
786 total_object_size
+= object_size
;
// Read the whole object default_op_size bytes at a time, appending each
// piece to whole_data. On a read error it prints a diagnostic; empty_buf
// suggests an empty bufferlist is returned in that case — confirm.
789 bufferlist
SampleDedupWorkerThread::read_object(ObjectItem
&object
)
791 bufferlist whole_data
;
795 bufferlist partial_data
;
796 ret
= io_ctx
.read(object
.oid
, partial_data
, default_op_size
, offset
);
798 cerr
<< "read object error " << object
.oid
<< " offset " << offset
799 << " size " << default_op_size
<< " error(" << cpp_strerror(ret
)
801 bufferlist empty_buf
;
805 whole_data
.claim_append(partial_data
);
// Split data with a CDC of type chunk_algo and target size
// 2^(cbits(chunk_size) - 1); returns each chunk's bytes together with its
// (offset, length) pair.
// NOTE(review): a new CDC instance is built on every call, and
// cbits(chunk_size) - 1 is only meaningful for chunk_size > 0.
810 std::vector
<std::tuple
<bufferlist
, pair
<uint64_t, uint64_t>>> SampleDedupWorkerThread::do_cdc(
814 std::vector
<std::tuple
<bufferlist
, pair
<uint64_t, uint64_t>>> ret
;
816 unique_ptr
<CDC
> cdc
= CDC::create(chunk_algo
, cbits(chunk_size
) - 1);
817 vector
<pair
<uint64_t, uint64_t>> chunks
;
818 cdc
->calc_chunks(data
, &chunks
);
819 for (auto &p
: chunks
) {
821 chunk
.substr_of(data
, p
.first
, p
.second
);
822 ret
.push_back(make_tuple(chunk
, p
));
828 std::string
SampleDedupWorkerThread::generate_fingerprint(bufferlist chunk_data
)
833 case pg_pool_t::TYPE_FINGERPRINT_SHA1
:
834 ret
= crypto::digest
<crypto::SHA1
>(chunk_data
).to_str();
837 case pg_pool_t::TYPE_FINGERPRINT_SHA256
:
838 ret
= crypto::digest
<crypto::SHA256
>(chunk_data
).to_str();
841 case pg_pool_t::TYPE_FINGERPRINT_SHA512
:
842 ret
= crypto::digest
<crypto::SHA512
>(chunk_data
).to_str();
845 ceph_assert(0 == "Invalid fp type");
// Deduplicate one chunk. If no object named after the chunk's fingerprint
// exists in the chunk pool (stat() returns -ENOENT), one is written from
// chunk.data. Then an operation flagged CEPH_OSD_OP_FLAG_WITH_REFERENCE is
// run on the source object (presumably set_chunk — confirm), and the
// object is queued in oid_for_evict.
851 int SampleDedupWorkerThread::do_chunk_dedup(chunk_t
&chunk
)
856 int ret
= chunk_io_ctx
.stat(chunk
.fingerprint
, &size
, &mtime
);
858 if (ret
== -ENOENT
) {
860 bl
.append(chunk
.data
);
861 ObjectWriteOperation wop
;
// NOTE(review): the result of this operate() is not stored in ret; confirm
// what the ceph_assert(ret == 0) below is meant to check.
863 chunk_io_ctx
.operate(chunk
.fingerprint
, &wop
);
865 ceph_assert(ret
== 0);
868 ObjectReadOperation op
;
875 CEPH_OSD_OP_FLAG_WITH_REFERENCE
);
// NOTE(review): the object is queued for eviction whatever ret is here.
876 ret
= io_ctx
.operate(chunk
.oid
, &op
, nullptr);
877 oid_for_evict
.insert(chunk
.oid
);
// Write this thread's scrub progress (PID, total / examined / damaged
// object counts) into f under a "chunk_scrub" section.
881 void ChunkScrub::print_status(Formatter
*f
, ostream
&out
)
884 f
->open_array_section("chunk_scrub");
885 f
->dump_string("PID", stringify(get_pid()));
886 f
->open_object_section("Status");
887 f
->dump_string("Total object", stringify(total_objects
));
888 f
->dump_string("Examined objects", stringify(examined_objects
));
889 f
->dump_string("damaged objects", stringify(damaged_objects
));
// Return the --pool value; when the option is missing, prints
// "must specify pool name" to cerr.
896 string
get_opts_pool_name(const po::variables_map
&opts
) {
897 if (opts
.count("pool")) {
898 return opts
["pool"].as
<string
>();
900 cerr
<< "must specify pool name" << std::endl
;
// Return the --chunk-algorithm value after checking that CDC::create()
// accepts it. Prints an error when the name is unknown or the option is
// missing.
904 string
get_opts_chunk_algo(const po::variables_map
&opts
) {
905 if (opts
.count("chunk-algorithm")) {
906 string chunk_algo
= opts
["chunk-algorithm"].as
<string
>();
907 if (!CDC::create(chunk_algo
, 12)) {
908 cerr
<< "unrecognized chunk-algorithm " << chunk_algo
<< std::endl
;
913 cerr
<< "must specify chunk-algorithm" << std::endl
;
// Return the --fingerprint-algorithm value. Only sha1, sha256 and sha512
// are accepted; anything else prints an error. Returns "sha1" when the
// option is absent.
917 string
get_opts_fp_algo(const po::variables_map
&opts
) {
918 if (opts
.count("fingerprint-algorithm")) {
919 string fp_algo
= opts
["fingerprint-algorithm"].as
<string
>();
920 if (fp_algo
!= "sha1"
921 && fp_algo
!= "sha256" && fp_algo
!= "sha512") {
922 cerr
<< "unrecognized fingerprint-algorithm " << fp_algo
<< std::endl
;
927 cout
<< "SHA1 is set as fingerprint algorithm by default" << std::endl
;
928 return string("sha1");
// Return the --op value; when the option is missing, prints
// "must specify op" to cerr.
931 string
get_opts_op_name(const po::variables_map
&opts
) {
932 if (opts
.count("op")) {
933 return opts
["op"].as
<string
>();
935 cerr
<< "must specify op" << std::endl
;
// Return the --chunk-pool value; when the option is missing, prints
// "must specify --chunk-pool" to cerr.
940 string
get_opts_chunk_pool(const po::variables_map
&opts
) {
941 if (opts
.count("chunk-pool")) {
942 return opts
["chunk-pool"].as
<string
>();
944 cerr
<< "must specify --chunk-pool" << std::endl
;
// Return the --object value; when the option is missing, prints
// "must specify object" to cerr.
949 string
get_opts_object_name(const po::variables_map
&opts
) {
950 if (opts
.count("object")) {
951 return opts
["object"].as
<string
>();
953 cerr
<< "must specify object" << std::endl
;
// Return the --max-thread value; when absent, announces on cout that 2
// threads are used by default.
958 int get_opts_max_thread(const po::variables_map
&opts
) {
959 if (opts
.count("max-thread")) {
960 return opts
["max-thread"].as
<int>();
962 cout
<< "2 is set as the number of threads by default" << std::endl
;
// Return the --report-period value (seconds); when absent, announces on
// cout that a 10 second period is used by default.
967 int get_opts_report_period(const po::variables_map
&opts
) {
968 if (opts
.count("report-period")) {
969 return opts
["report-period"].as
<int>();
971 cout
<< "10 seconds is set as report period by default" << std::endl
;
// Driver for the estimate operation. It proceeds as follows:
// - parse the options;
// - connect to the cluster and open the pool;
// - register one EstimateResult for --chunk-size, plus one for every size
//   obtained by doubling min_chunk_size up to max_chunk_size;
// - start max_thread EstimateDedupRatio threads;
// - print the final report.
// Returns 1 if ret ends up negative, 0 otherwise.
976 int estimate_dedup_ratio(const po::variables_map
&opts
)
980 std::string chunk_algo
= "fastcdc";
981 string fp_algo
= "sha1";
983 uint64_t chunk_size
= 8192;
984 uint64_t min_chunk_size
= 8192;
985 uint64_t max_chunk_size
= 4*1024*1024;
986 unsigned max_thread
= default_max_thread
;
987 uint32_t report_period
= default_report_period
;
988 uint64_t max_read_size
= default_op_size
;
989 uint64_t max_seconds
= 0;
991 std::map
<std::string
, std::string
>::const_iterator i
;
995 librados::pool_stat_t s
;
996 list
<string
> pool_names
;
997 map
<string
, librados::pool_stat_t
> stats
;
999 pool_name
= get_opts_pool_name(opts
);
1000 if (opts
.count("chunk-algorithm")) {
1001 chunk_algo
= opts
["chunk-algorithm"].as
<string
>();
1002 if (!CDC::create(chunk_algo
, 12)) {
1003 cerr
<< "unrecognized chunk-algorithm " << chunk_algo
<< std::endl
;
// NOTE(review): chunk_algo already defaults to "fastcdc" above, yet a
// missing --chunk-algorithm is reported as an error here.
1007 cerr
<< "must specify chunk-algorithm" << std::endl
;
1010 fp_algo
= get_opts_fp_algo(opts
);
1011 if (opts
.count("chunk-size")) {
1012 chunk_size
= opts
["chunk-size"].as
<int>();
1014 cout
<< "8192 is set as chunk size by default" << std::endl
;
// BUG(review): --min-chunk-size and --max-chunk-size are stored into
// chunk_size instead of min_chunk_size / max_chunk_size. They override
// --chunk-size and never change the range used by the chunk-size loop
// below; they should assign min_chunk_size and max_chunk_size.
1016 if (opts
.count("min-chunk-size")) {
1017 chunk_size
= opts
["min-chunk-size"].as
<int>();
1019 cout
<< "8192 is set as min chunk size by default" << std::endl
;
1021 if (opts
.count("max-chunk-size")) {
1022 chunk_size
= opts
["max-chunk-size"].as
<int>();
1024 cout
<< "4MB is set as max chunk size by default" << std::endl
;
1026 max_thread
= get_opts_max_thread(opts
);
1027 report_period
= get_opts_report_period(opts
);
1028 if (opts
.count("max-seconds")) {
1029 max_seconds
= opts
["max-seconds"].as
<int>();
1031 cout
<< "max seconds is not set" << std::endl
;
1033 if (opts
.count("max-read-size")) {
1034 max_read_size
= opts
["max-read-size"].as
<int>();
1036 cout
<< default_op_size
<< " is set as max-read-size by default" << std::endl
;
1038 if (opts
.count("debug")) {
// NOTE(review): only the presence of --pgid is recorded; no pg id value is
// parsed (the option is declared without a value).
1041 boost::optional
<pg_t
> pgid(opts
.count("pgid"), pg_t());
1043 ret
= rados
.init_with_context(g_ceph_context
);
1045 cerr
<< "couldn't initialize rados: " << cpp_strerror(ret
) << std::endl
;
1048 ret
= rados
.connect();
1050 cerr
<< "couldn't connect to cluster: " << cpp_strerror(ret
) << std::endl
;
// NOTE(review): this tool declares no --create-pool option; the message
// below is misleading.
1054 if (pool_name
.empty()) {
1055 cerr
<< "--create-pool requested but pool_name was not specified!" << std::endl
;
1058 ret
= rados
.ioctx_create(pool_name
.c_str(), io_ctx
);
1060 cerr
<< "error opening pool "
1061 << pool_name
<< ": "
1062 << cpp_strerror(ret
) << std::endl
;
// One estimate for the requested chunk size (EstimateResult takes log2 of
// the size)...
1068 dedup_estimates
.emplace(std::piecewise_construct
,
1069 std::forward_as_tuple(chunk_size
),
1070 std::forward_as_tuple(chunk_algo
, cbits(chunk_size
)-1));
// ...plus one per size doubling from min_chunk_size up to max_chunk_size
// (emplace skips a size that is already present).
1072 for (size_t cs
= min_chunk_size
; cs
<= max_chunk_size
; cs
*= 2) {
1073 dedup_estimates
.emplace(std::piecewise_construct
,
1074 std::forward_as_tuple(cs
),
1075 std::forward_as_tuple(chunk_algo
, cbits(cs
)-1));
1080 begin
= io_ctx
.object_list_begin();
1081 end
= io_ctx
.object_list_end();
1082 pool_names
.push_back(pool_name
);
1083 ret
= rados
.get_pool_stats(pool_names
, stats
);
1085 cerr
<< "error fetching pool stats: " << cpp_strerror(ret
) << std::endl
;
1089 if (stats
.find(pool_name
) == stats
.end()) {
1090 cerr
<< "stats can not find pool name: " << pool_name
<< std::endl
;
1094 s
= stats
[pool_name
];
// Thread i crawls slice i of max_thread.
1096 for (unsigned i
= 0; i
< max_thread
; i
++) {
1097 std::unique_ptr
<CrawlerThread
> ptr (
1098 new EstimateDedupRatio(io_ctx
, i
, max_thread
, begin
, end
,
1099 chunk_algo
, fp_algo
, chunk_size
,
1100 report_period
, s
.num_objects
, max_read_size
,
1102 ptr
->create("estimate_thread");
1103 ptr
->set_debug(debug
);
1104 estimate_threads
.push_back(move(ptr
));
1108 for (auto &p
: estimate_threads
) {
1112 print_dedup_estimate(cout
, chunk_algo
);
1115 return (ret
< 0) ? 1 : 0;
1118 static void print_chunk_scrub()
1120 uint64_t total_objects
= 0;
1121 uint64_t examined_objects
= 0;
1122 int damaged_objects
= 0;
1124 for (auto &et
: estimate_threads
) {
1125 if (!total_objects
) {
1126 total_objects
= et
->get_total_objects();
1128 examined_objects
+= et
->get_examined_objects();
1129 ChunkScrub
*ptr
= static_cast<ChunkScrub
*>(et
.get());
1130 damaged_objects
+= ptr
->get_damaged_objects();
1133 cout
<< " Total object : " << total_objects
<< std::endl
;
1134 cout
<< " Examined object : " << examined_objects
<< std::endl
;
1135 cout
<< " Damaged object : " << damaged_objects
<< std::endl
;
1138 int chunk_scrub_common(const po::variables_map
&opts
)
1141 IoCtx io_ctx
, chunk_io_ctx
;
1142 std::string object_name
, target_object_name
;
1143 string chunk_pool_name
, op_name
;
1145 unsigned max_thread
= default_max_thread
;
1146 std::map
<std::string
, std::string
>::const_iterator i
;
1147 uint32_t report_period
= default_report_period
;
1150 librados::pool_stat_t s
;
1151 list
<string
> pool_names
;
1152 map
<string
, librados::pool_stat_t
> stats
;
1154 op_name
= get_opts_op_name(opts
);
1155 chunk_pool_name
= get_opts_chunk_pool(opts
);
1156 boost::optional
<pg_t
> pgid(opts
.count("pgid"), pg_t());
1158 ret
= rados
.init_with_context(g_ceph_context
);
1160 cerr
<< "couldn't initialize rados: " << cpp_strerror(ret
) << std::endl
;
1163 ret
= rados
.connect();
1165 cerr
<< "couldn't connect to cluster: " << cpp_strerror(ret
) << std::endl
;
1169 ret
= rados
.ioctx_create(chunk_pool_name
.c_str(), chunk_io_ctx
);
1171 cerr
<< "error opening pool "
1172 << chunk_pool_name
<< ": "
1173 << cpp_strerror(ret
) << std::endl
;
1177 if (op_name
== "chunk-get-ref" ||
1178 op_name
== "chunk-put-ref" ||
1179 op_name
== "chunk-repair") {
1180 string target_object_name
;
1182 object_name
= get_opts_object_name(opts
);
1183 if (opts
.count("target-ref")) {
1184 target_object_name
= opts
["target-ref"].as
<string
>();
1186 cerr
<< "must specify target ref" << std::endl
;
1189 if (opts
.count("target-ref-pool-id")) {
1190 pool_id
= opts
["target-ref-pool-id"].as
<uint64_t>();
1192 cerr
<< "must specify target-ref-pool-id" << std::endl
;
1197 ret
= chunk_io_ctx
.get_object_hash_position2(object_name
, &hash
);
1201 hobject_t
oid(sobject_t(target_object_name
, CEPH_NOSNAP
), "", hash
, pool_id
, "");
1203 auto run_op
= [] (ObjectWriteOperation
& op
, hobject_t
& oid
,
1204 string
& object_name
, IoCtx
& chunk_io_ctx
) -> int {
1205 int ret
= chunk_io_ctx
.operate(object_name
, &op
);
1207 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1212 ObjectWriteOperation op
;
1213 if (op_name
== "chunk-get-ref") {
1214 cls_cas_chunk_get_ref(op
, oid
);
1215 ret
= run_op(op
, oid
, object_name
, chunk_io_ctx
);
1216 } else if (op_name
== "chunk-put-ref") {
1217 cls_cas_chunk_put_ref(op
, oid
);
1218 ret
= run_op(op
, oid
, object_name
, chunk_io_ctx
);
1219 } else if (op_name
== "chunk-repair") {
1220 ret
= rados
.ioctx_create2(pool_id
, io_ctx
);
1222 cerr
<< oid
<< " ref " << pool_id
1223 << ": referencing pool does not exist" << std::endl
;
1226 int chunk_ref
= -1, base_ref
= -1;
1227 // read object on chunk pool to know how many reference the object has
1229 ret
= chunk_io_ctx
.getxattr(object_name
, CHUNK_REFCOUNT_ATTR
, t
);
1234 auto p
= t
.cbegin();
1236 if (refs
.get_type() != chunk_refs_t::TYPE_BY_OBJECT
) {
1237 cerr
<< " does not supported chunk type " << std::endl
;
1241 static_cast<chunk_refs_by_object_t
*>(refs
.r
.get())->by_object
.count(oid
);
1242 if (chunk_ref
< 0) {
1243 cerr
<< object_name
<< " has no reference of " << target_object_name
1247 cout
<< object_name
<< " has " << chunk_ref
<< " references for "
1248 << target_object_name
<< std::endl
;
1250 // read object on base pool to know the number of chunk object's references
1251 base_ref
= cls_cas_references_chunk(io_ctx
, target_object_name
, object_name
);
1253 if (base_ref
== -ENOENT
|| base_ref
== -ENOLINK
) {
1259 cout
<< target_object_name
<< " has " << base_ref
<< " references for "
1260 << object_name
<< std::endl
;
1261 if (chunk_ref
!= base_ref
) {
1262 if (base_ref
> chunk_ref
) {
1263 cerr
<< "error : " << target_object_name
<< "'s ref. < " << object_name
1264 << "' ref. " << std::endl
;
1267 cout
<< " fix dangling reference from " << chunk_ref
<< " to " << base_ref
1269 while (base_ref
!= chunk_ref
) {
1270 ObjectWriteOperation op
;
1271 cls_cas_chunk_put_ref(op
, oid
);
1273 ret
= run_op(op
, oid
, object_name
, chunk_io_ctx
);
1282 } else if (op_name
== "dump-chunk-refs") {
1283 object_name
= get_opts_object_name(opts
);
1285 ret
= chunk_io_ctx
.getxattr(object_name
, CHUNK_REFCOUNT_ATTR
, t
);
1290 auto p
= t
.cbegin();
1292 auto f
= Formatter::create("json-pretty");
1293 f
->dump_object("refs", refs
);
1298 max_thread
= get_opts_max_thread(opts
);
1299 report_period
= get_opts_report_period(opts
);
1301 begin
= chunk_io_ctx
.object_list_begin();
1302 end
= chunk_io_ctx
.object_list_end();
1303 pool_names
.push_back(chunk_pool_name
);
1304 ret
= rados
.get_pool_stats(pool_names
, stats
);
1306 cerr
<< "error fetching pool stats: " << cpp_strerror(ret
) << std::endl
;
1310 if (stats
.find(chunk_pool_name
) == stats
.end()) {
1311 cerr
<< "stats can not find pool name: " << chunk_pool_name
<< std::endl
;
1315 s
= stats
[chunk_pool_name
];
1317 for (unsigned i
= 0; i
< max_thread
; i
++) {
1318 std::unique_ptr
<CrawlerThread
> ptr (
1319 new ChunkScrub(io_ctx
, i
, max_thread
, begin
, end
, chunk_io_ctx
,
1320 report_period
, s
.num_objects
));
1321 ptr
->create("estimate_thread");
1322 estimate_threads
.push_back(move(ptr
));
1326 for (auto &p
: estimate_threads
) {
1327 cout
<< "join " << std::endl
;
1329 cout
<< "joined " << std::endl
;
1332 print_chunk_scrub();
1335 return (ret
< 0) ? 1 : 0;
/**
 * Escape @p in for embedding inside a JSON string literal (RFC 8259):
 * '"' and '\\' are backslash-escaped, and bytes below 0x20 become a
 * six-character JSON unicode escape (backslash, 'u', four hex digits).
 * All other bytes are copied unchanged.
 */
static std::string escape_pool_str_arg(const std::string& in)
{
  static const char hex_digits[] = "0123456789abcdef";
  std::string out;
  out.reserve(in.size());
  for (char c : in) {
    const unsigned char uc = static_cast<unsigned char>(c);
    if (c == '"' || c == '\\') {
      out += '\\';
      out += c;
    } else if (uc < 0x20) {
      out += "\\u00";
      out += hex_digits[uc >> 4];
      out += hex_digits[uc & 0x0f];
    } else {
      out += c;
    }
  }
  return out;
}

/**
 * Build the JSON body of an "osd pool set" mon command that sets pool
 * property @p var to @p val on pool @p pool.
 *
 * Each argument is JSON-escaped, so a name or value containing '"', '\\'
 * or a control character cannot produce a malformed (or differently
 * shaped) command. For ordinary names the output is byte-for-byte the
 * same as plain concatenation.
 *
 * @param pool  name of the pool to modify
 * @param var   pool property name (e.g. "dedup_tier")
 * @param val   property value, always sent as a JSON string
 * @return the command text, e.g.
 *         {"prefix": "osd pool set","pool":"p","var": "v","val": "x"}
 */
std::string make_pool_str(const std::string& pool, const std::string& var,
			  const std::string& val)
{
  std::string cmd("{\"prefix\": \"osd pool set\",\"pool\":\"");
  cmd += escape_pool_str_arg(pool);
  cmd += "\",\"var\": \"";
  cmd += escape_pool_str_arg(var);
  cmd += "\",\"val\": \"";
  cmd += escape_pool_str_arg(val);
  cmd += "\"}";
  return cmd;
}
1345 string
make_pool_str(string pool
, string var
, int val
)
1347 return make_pool_str(pool
, var
, stringify(val
));
// make_dedup_object: entry point for the "chunk-dedup" and "object-dedup" ops.
//
// Reads the op name, base pool, object name and chunk pool through the
// get_opts_* helpers, initializes/connects `rados`, and opens an IoCtx on the
// base pool (io_ctx) and on the chunk pool (chunk_io_ctx).
//  - "chunk-dedup": reads --source-length bytes at --source-off from the object
//    in the base pool, names a chunk object after the fingerprint of those
//    bytes (sha1 / sha256 / sha512, chosen by get_opts_fp_algo()), writes the
//    bytes to that chunk object in the chunk pool, then issues set_chunk with
//    CEPH_OSD_OP_FLAG_WITH_REFERENCE on the base object.
//  - "object-dedup": requires --dedup-cdc-chunk-size; sets
//    fingerprint_algorithm, dedup_tier, dedup_chunk_algorithm ("fastcdc") and
//    dedup_cdc_chunk_size on the base pool via "osd pool set" mon commands,
//    then runs tier_flush followed by tier_evict on the object (and, in the
//    visible code, on each clone reported by list_snaps).
// Returns 1 if the final `ret` is negative, otherwise 0.
//
// NOTE(review): several original lines of this function (declarations of
// `ret`, `rados`, `bl`, `snap_ret`, the error-exit paths, and the body of the
// --snap branch) are not visible in this view; the comments below describe
// only the statements that are visible.
1350 int make_dedup_object(const po::variables_map
&opts
)
1353 IoCtx io_ctx
, chunk_io_ctx
;
1354 std::string object_name
, chunk_pool_name
, op_name
, pool_name
, fp_algo
;
// NOTE(review): `i` (and `pgid` below) are not used in the visible code.
1356 std::map
<std::string
, std::string
>::const_iterator i
;
1358 op_name
= get_opts_op_name(opts
);
1359 pool_name
= get_opts_pool_name(opts
);
1360 object_name
= get_opts_object_name(opts
);
1361 chunk_pool_name
= get_opts_chunk_pool(opts
);
1362 boost::optional
<pg_t
> pgid(opts
.count("pgid"), pg_t());
1364 ret
= rados
.init_with_context(g_ceph_context
);
1366 cerr
<< "couldn't initialize rados: " << cpp_strerror(ret
) << std::endl
;
1369 ret
= rados
.connect();
1371 cerr
<< "couldn't connect to cluster: " << cpp_strerror(ret
) << std::endl
;
// Open the base pool (the pool holding the object being deduplicated).
1375 ret
= rados
.ioctx_create(pool_name
.c_str(), io_ctx
);
// NOTE(review): this message reports the failed ioctx_create(pool_name) above
// but prints chunk_pool_name; it most likely should print pool_name.
1377 cerr
<< "error opening pool "
1378 << chunk_pool_name
<< ": "
1379 << cpp_strerror(ret
) << std::endl
;
// Open the chunk pool that receives the content-addressed chunk objects.
1382 ret
= rados
.ioctx_create(chunk_pool_name
.c_str(), chunk_io_ctx
);
1384 cerr
<< "error opening pool "
1385 << chunk_pool_name
<< ": "
1386 << cpp_strerror(ret
) << std::endl
;
1389 fp_algo
= get_opts_fp_algo(opts
);
1391 if (op_name
== "chunk-dedup") {
// Byte range [offset, offset + length) of the base object to deduplicate;
// both --source-off and --source-length are required.
1392 uint64_t offset
, length
;
1393 string chunk_object
;
1394 if (opts
.count("source-off")) {
1395 offset
= opts
["source-off"].as
<uint64_t>();
1397 cerr
<< "must specify --source-off" << std::endl
;
1400 if (opts
.count("source-length")) {
1401 length
= opts
["source-length"].as
<uint64_t>();
1403 cerr
<< "must specify --source-length" << std::endl
;
1406 // 1. make a copy from manifest object to chunk object
1408 ret
= io_ctx
.read(object_name
, bl
, length
, offset
);
1410 cerr
<< " reading object in base pool fails : " << cpp_strerror(ret
) << std::endl
;
// The chunk object is named after the hex fingerprint of the bytes just read.
// NOTE(review): for an unrecognized fp_algo only the assert() is visible here;
// if nothing follows it, a build with NDEBUG would fall off the end of this
// value-returning lambda. Confirm against the full source.
1413 chunk_object
= [&fp_algo
, &bl
]() -> string
{
1414 if (fp_algo
== "sha1") {
1415 return ceph::crypto::digest
<ceph::crypto::SHA1
>(bl
).to_str();
1416 } else if (fp_algo
== "sha256") {
1417 return ceph::crypto::digest
<ceph::crypto::SHA256
>(bl
).to_str();
1418 } else if (fp_algo
== "sha512") {
1419 return ceph::crypto::digest
<ceph::crypto::SHA512
>(bl
).to_str();
1421 assert(0 == "unrecognized fingerprint type");
// NOTE(review): the data is written at `offset` inside the chunk object, while
// set_chunk below passes 0 as the chunk-side (target) offset. Confirm these
// are meant to agree.
1425 ret
= chunk_io_ctx
.write(chunk_object
, bl
, length
, offset
);
1427 cerr
<< " writing object in chunk pool fails : " << cpp_strerror(ret
) << std::endl
;
1430 // 2. call set_chunk
// Map [offset, offset + length) of the base object onto chunk_object in the
// chunk pool. CEPH_OSD_OP_FLAG_WITH_REFERENCE presumably asks the OSD to take
// the chunk reference as part of the op (per the flag name; confirm).
1431 ObjectReadOperation op
;
1432 op
.set_chunk(offset
, length
, chunk_io_ctx
, chunk_object
, 0,
1433 CEPH_OSD_OP_FLAG_WITH_REFERENCE
);
1434 ret
= io_ctx
.operate(object_name
, &op
, NULL
);
1436 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1439 } else if (op_name
== "object-dedup") {
// Value for the base pool's dedup_cdc_chunk_size property; required.
// NOTE(review): chunk_size is unsigned, but the make_pool_str overload it is
// passed to below takes int, so values above INT_MAX would not round-trip.
1440 unsigned chunk_size
= 0;
1442 if (opts
.count("dedup-cdc-chunk-size")) {
1443 chunk_size
= opts
["dedup-cdc-chunk-size"].as
<unsigned int>();
1445 cerr
<< "must specify --dedup-cdc-chunk-size" << std::endl
;
// --snap: the body of this branch is not visible in this view; presumably it
// records that the object's clones should be deduplicated too. Confirm.
1448 if (opts
.count("snap")) {
// Configure the base pool for deduplication through "osd pool set" mon
// commands (make_pool_str() builds each command's JSON; the remaining
// mon_command arguments are not visible here).
1453 ret
= rados
.mon_command(
1454 make_pool_str(pool_name
, "fingerprint_algorithm", fp_algo
),
1457 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1460 ret
= rados
.mon_command(
1461 make_pool_str(pool_name
, "dedup_tier", chunk_pool_name
),
1464 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1467 ret
= rados
.mon_command(
1468 make_pool_str(pool_name
, "dedup_chunk_algorithm", "fastcdc"),
1471 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1474 ret
= rados
.mon_command(
1475 make_pool_str(pool_name
, "dedup_cdc_chunk_size", chunk_size
),
1478 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
// Deduplicate one object through io_ctx: tier_flush followed by tier_evict,
// using whatever snap context io_ctx currently reads from. The lambda's return
// statements are not visible in this view.
1482 auto create_new_deduped_object
=
1483 [&io_ctx
](string object_name
) -> int {
1485 // tier-flush to perform deduplication
1486 ObjectReadOperation flush_op
;
1487 flush_op
.tier_flush();
1488 int ret
= io_ctx
.operate(object_name
, &flush_op
, NULL
);
1490 cerr
<< " tier_flush fail : " << cpp_strerror(ret
) << std::endl
;
1494 ObjectReadOperation evict_op
;
1495 evict_op
.tier_evict();
1496 ret
= io_ctx
.operate(object_name
, &evict_op
, NULL
);
1498 cerr
<< " tier_evict fail : " << cpp_strerror(ret
) << std::endl
;
// Read from SNAP_DIR and list the object's snapshot set (its clones).
// NOTE(review): the return value of this operate() call is discarded, and
// snap_ret is not checked in the visible code.
1505 io_ctx
.snap_set_read(librados::SNAP_DIR
);
1506 snap_set_t snap_set
;
1508 ObjectReadOperation op
;
1509 op
.list_snaps(&snap_set
, &snap_ret
);
1510 io_ctx
.operate(object_name
, &op
, NULL
);
// Deduplicate each clone by pointing io_ctx's reads at that clone's id.
1512 for (vector
<librados::clone_info_t
>::const_iterator r
= snap_set
.clones
.begin();
1513 r
!= snap_set
.clones
.end();
1515 io_ctx
.snap_set_read(r
->cloneid
);
1516 ret
= create_new_deduped_object(object_name
);
// Deduplicate the object as currently addressed by io_ctx (the enclosing
// branch structure is not visible in this view).
1522 ret
= create_new_deduped_object(object_name
);
// Map any negative ret to exit status 1, otherwise 0.
1527 return (ret
< 0) ? 1 : 0;
// make_crawling_daemon: entry point for the "sample-dedup" op.
//
// Reads the base pool, chunk pool, worker count (max_thread) and tuning options
// (sampling-ratio, chunk-size, chunk-dedup-threshold, wakeup-period) from
// `opts`, initializes/connects `rados`, and opens IoCtxs on both pools. It then
// sets fingerprint_algorithm, dedup_chunk_algorithm ("fastcdc"),
// dedup_cdc_chunk_size and dedup_tier on the base pool via "osd pool set" mon
// commands, prints the chosen parameters, and fetches the base pool's stats.
// Finally it slices the base pool's object list, starts one
// SampleDedupWorkerThread ("sample_dedup") per slice, iterates over the
// threads, and calls sleep(wakeup_period).
//
// NOTE(review): several original lines of this function (the body of the
// --loop branch, error-exit paths, the object_list_slice / emplace_back
// arguments, the thread loop body and the function's return) are not visible
// in this view; the comments below describe only the statements that are
// visible.
1530 int make_crawling_daemon(const po::variables_map
&opts
)
1532 string base_pool_name
= get_opts_pool_name(opts
);
1533 string chunk_pool_name
= get_opts_chunk_pool(opts
);
1534 unsigned max_thread
= get_opts_max_thread(opts
);
// --loop: the branch body is not visible here; presumably it makes the crawl
// repeat. Confirm.
1537 if (opts
.count("loop")) {
// -1 when --sampling-ratio is absent; how -1 is interpreted is up to
// SampleDedupGlobal, which is not visible here.
1541 int sampling_ratio
= -1;
1542 if (opts
.count("sampling-ratio")) {
1543 sampling_ratio
= opts
["sampling-ratio"].as
<int>();
// Defaults to 8192; the message below is presumably printed only when
// --chunk-size is absent (the `else` is not visible).
// NOTE(review): the option is read as int and stored in size_t, so a negative
// --chunk-size would wrap to a huge value.
1545 size_t chunk_size
= 8192;
1546 if (opts
.count("chunk-size")) {
1547 chunk_size
= opts
["chunk-size"].as
<int>();
1549 cout
<< "8192 is set as chunk size by default" << std::endl
;
// NOTE(review): -1 converted to uint32_t is 0xffffffff (UINT32_MAX); that is
// the value this default actually holds.
1552 uint32_t chunk_dedup_threshold
= -1;
1553 if (opts
.count("chunk-dedup-threshold")) {
1554 chunk_dedup_threshold
= opts
["chunk-dedup-threshold"].as
<uint32_t>();
// NOTE(review): chunk_algo is read here, but the dedup_chunk_algorithm mon
// command below hard-codes "fastcdc". Check whether chunk_algo is used in the
// worker-thread arguments, which are not visible here.
1557 std::string chunk_algo
= get_opts_chunk_algo(opts
);
1560 int ret
= rados
.init_with_context(g_ceph_context
);
1562 cerr
<< "couldn't initialize rados: " << cpp_strerror(ret
) << std::endl
;
1565 ret
= rados
.connect();
1567 cerr
<< "couldn't connect to cluster: " << cpp_strerror(ret
) << std::endl
;
// Seconds passed to sleep() between rounds; defaults to 100.
1570 int wakeup_period
= 100;
1571 if (opts
.count("wakeup-period")) {
1572 wakeup_period
= opts
["wakeup-period"].as
<int>();
1574 cout
<< "100 second is set as wakeup period by default" << std::endl
;
1577 std::string fp_algo
= get_opts_fp_algo(opts
);
// pool_names feeds get_pool_stats() below; only the base pool is queried.
1579 list
<string
> pool_names
;
1580 IoCtx io_ctx
, chunk_io_ctx
;
1581 pool_names
.push_back(base_pool_name
);
1582 ret
= rados
.ioctx_create(base_pool_name
.c_str(), io_ctx
);
1584 cerr
<< "error opening base pool "
1585 << base_pool_name
<< ": "
1586 << cpp_strerror(ret
) << std::endl
;
1590 ret
= rados
.ioctx_create(chunk_pool_name
.c_str(), chunk_io_ctx
);
1592 cerr
<< "error opening chunk pool "
1593 << chunk_pool_name
<< ": "
1594 << cpp_strerror(ret
) << std::endl
;
// Configure the base pool for deduplication through "osd pool set" mon
// commands (the remaining mon_command arguments are not visible here).
1598 ret
= rados
.mon_command(
1599 make_pool_str(base_pool_name
, "fingerprint_algorithm", fp_algo
),
1602 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1605 ret
= rados
.mon_command(
1606 make_pool_str(base_pool_name
, "dedup_chunk_algorithm", "fastcdc"),
1609 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1612 ret
= rados
.mon_command(
1613 make_pool_str(base_pool_name
, "dedup_cdc_chunk_size", chunk_size
),
1616 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1619 ret
= rados
.mon_command(
1620 make_pool_str(base_pool_name
, "dedup_tier", chunk_pool_name
),
1623 cerr
<< " operate fail : " << cpp_strerror(ret
) << std::endl
;
1627 cout
<< "SampleRatio : " << sampling_ratio
<< std::endl
1628 << "Chunk Dedup Threshold : " << chunk_dedup_threshold
<< std::endl
1629 << "Chunk Size : " << chunk_size
<< std::endl
// Take glock (declared elsewhere in this file) for the scope that begins here;
// the end of that scope is not visible in this view.
1633 lock_guard
lock(glock
);
1634 ObjectCursor begin
= io_ctx
.object_list_begin();
1635 ObjectCursor end
= io_ctx
.object_list_end();
// Fetch the base pool's stats; the result map is keyed by pool name.
1636 map
<string
, librados::pool_stat_t
> stats
;
1637 ret
= rados
.get_pool_stats(pool_names
, stats
);
1639 cerr
<< "error fetching pool stats: " << cpp_strerror(ret
) << std::endl
;
1642 if (stats
.find(base_pool_name
) == stats
.end()) {
1643 cerr
<< "stats can not find pool name: " << base_pool_name
<< std::endl
;
// State shared by all workers, built from the dedup threshold and sampling
// ratio.
1647 SampleDedupWorkerThread::SampleDedupGlobal
sample_dedup_global(
1648 chunk_dedup_threshold
, sampling_ratio
);
// std::list never relocates its elements, so already-started threads keep
// their addresses as more are emplaced (presumably the reason for list).
1650 std::list
<SampleDedupWorkerThread
> threads
;
1651 for (unsigned i
= 0; i
< max_thread
; i
++) {
1652 cout
<< " add thread.. " << std::endl
;
// Presumably carves slice i of max_thread out of [begin, end); the slice and
// constructor arguments are not visible in this view.
1653 ObjectCursor shard_start
;
1654 ObjectCursor shard_end
;
1655 io_ctx
.object_list_slice(
1663 threads
.emplace_back(
1671 sample_dedup_global
);
1672 threads
.back().create("sample_dedup");
// Iterate over the started workers (loop body not visible; presumably joins
// them).
1675 for (auto &p
: threads
) {
// Wait wakeup_period seconds.
1679 sleep(wakeup_period
);
1688 int main(int argc
, const char **argv
)
1690 auto args
= argv_to_vec(argc
, argv
);
1692 cerr
<< argv
[0] << ": -h or --help for usage" << std::endl
;
1696 po::variables_map opts
;
1697 po::positional_options_description p
;
1698 p
.add("command", 1);
1699 po::options_description desc
= make_usage();
1701 po::parsed_options parsed
=
1702 po::command_line_parser(argc
, argv
).options(desc
).positional(p
).allow_unregistered().run();
1703 po::store(parsed
, opts
);
1705 } catch(po::error
&e
) {
1706 std::cerr
<< e
.what() << std::endl
;
1709 if (opts
.count("help") || opts
.count("h")) {
1710 cout
<< desc
<< std::endl
;
1714 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
1715 CODE_ENVIRONMENT_DAEMON
,
1716 CINIT_FLAG_UNPRIVILEGED_DAEMON_DEFAULTS
);
1719 if (global_init_prefork(g_ceph_context
) >= 0) {
1721 int r
= forker
.prefork(err
);
1723 cerr
<< err
<< std::endl
;
1726 if (forker
.is_parent()) {
1727 g_ceph_context
->_log
->start();
1728 if (forker
.parent_wait(err
) != 0) {
1733 global_init_postfork_start(g_ceph_context
);
1735 common_init_finish(g_ceph_context
);
1736 if (opts
.count("daemon")) {
1737 global_init_postfork_finish(g_ceph_context
);
1740 init_async_signal_handler();
1741 register_async_signal_handler_oneshot(SIGINT
, handle_signal
);
1742 register_async_signal_handler_oneshot(SIGTERM
, handle_signal
);
1744 string op_name
= get_opts_op_name(opts
);
1746 if (op_name
== "estimate") {
1747 ret
= estimate_dedup_ratio(opts
);
1748 } else if (op_name
== "chunk-scrub" ||
1749 op_name
== "chunk-get-ref" ||
1750 op_name
== "chunk-put-ref" ||
1751 op_name
== "chunk-repair" ||
1752 op_name
== "dump-chunk-refs") {
1753 ret
= chunk_scrub_common(opts
);
1754 } else if (op_name
== "chunk-dedup" ||
1755 op_name
== "object-dedup") {
1758 * using a chunk generated by given source,
1759 * create a new object in the chunk pool or increase the reference
1760 * if the object exists
1763 * perform deduplication on the entire object, not a chunk.
1766 ret
= make_dedup_object(opts
);
1767 } else if (op_name
== "sample-dedup") {
1768 ret
= make_crawling_daemon(opts
);
1770 cerr
<< "unrecognized op " << op_name
<< std::endl
;
1774 unregister_async_signal_handler(SIGINT
, handle_signal
);
1775 unregister_async_signal_handler(SIGTERM
, handle_signal
);
1776 shutdown_async_signal_handler();
1778 return forker
.signal_exit(ret
);