# ceph/qa/tasks/deduplication.py
1 """
2 Run ceph-dedup-tool
3 """
4 import contextlib
5 import logging
6 import gevent
7 from teuthology import misc as teuthology
8 import json
9 import time
10 from io import StringIO
11
12 log = logging.getLogger(__name__)
13
14 @contextlib.contextmanager
15 def task(ctx, config):
16 """
17 Run ceph-dedup-tool.
18 The config should be as follows::
19 ceph-dedup-tool:
20 clients: [client list]
21 op: <operation name>
22 pool: <pool name>
23 chunk_pool: <chunk pool name>
24 chunk_size: <chunk size>
25 chunk_algorithm: <chunk algorithm, fixed|fastcdc>
26 fingerprint_algorithm: <fingerprint algorithm, sha1|sha256|sha512>
27 chunk_dedup_threashold: <the number of duplicate chunks to trigger chunk dedup>
28 max_thread: <the number of threads>
29 wakeup_period: <duration>
30 For example::
31 tasks:
32 - exec:
33 client.0:
34 - sudo ceph osd pool create low_tier 4
35 - deduplication:
36 clients: [client.0]
37 op: 'sample-dedup'
38 pool: 'default.rgw.buckets.data'
39 chunk_pool: 'low_tier'
40 chunk_size: 131072
41 chunk_algorithm: 'fastcdc'
42 fingerprint_algorithm: 'sha1'
43 chunk_dedup_threshold: 5
44 max_thread: 2
45 wakeup_period: 20
46 sampling_ratio: 100
47 """
    log.info('Beginning deduplication...')
    assert isinstance(config, dict), \
        "please list clients to run on"

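    # Translate config keys into ceph-dedup-tool command-line flags. For the
    # example config in the docstring, the assembled command would look roughly
    # like this (illustrative only):
    #   ceph-dedup-tool --op sample-dedup --chunk-pool low_tier \
    #     --chunk-size 131072 --chunk-algorithm fastcdc \
    #     --fingerprint-algorithm sha1 --chunk-dedup-threshold 5 \
    #     --max-thread 2 --sampling-ratio 100 --wakeup-period 20 \
    #     --pool default.rgw.buckets.data --debug --daemon --loop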
    args = [
        'ceph-dedup-tool']
    if config.get('op', None):
        args.extend(['--op', config.get('op', None)])
    if config.get('chunk_pool', None):
        args.extend(['--chunk-pool', config.get('chunk_pool', None)])
    if config.get('chunk_size', False):
        args.extend(['--chunk-size', str(config.get('chunk_size', 131072))])
    if config.get('chunk_algorithm', False):
        args.extend(['--chunk-algorithm', config.get('chunk_algorithm', None)])
    if config.get('fingerprint_algorithm', False):
        args.extend(['--fingerprint-algorithm', config.get('fingerprint_algorithm', None)])
    if config.get('chunk_dedup_threshold', False):
        args.extend(['--chunk-dedup-threshold', str(config.get('chunk_dedup_threshold', 1))])
    if config.get('max_thread', False):
        args.extend(['--max-thread', str(config.get('max_thread', 2))])
    if config.get('sampling_ratio', False):
        args.extend(['--sampling-ratio', str(config.get('sampling_ratio', 100))])
    if config.get('wakeup_period', False):
        args.extend(['--wakeup-period', str(config.get('wakeup_period', 20))])
    if config.get('pool', False):
        args.extend(['--pool', config.get('pool', None)])

    args.extend([
        '--debug',
        '--daemon',
        '--loop'])

    def thread():
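        # Launch ceph-dedup-tool on client.0 without waiting for it to exit;
        # --daemon and --loop keep the tool running in the background.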
        run_remote(args, False, 0)

    def run_remote(args, need_wait, client_num):
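        # Run the given command on client.<client_num> under adjust-ulimits and
        # ceph-coverage. If need_wait is set, block and retry until the command
        # exits with status 0; otherwise return right after spawning it.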
        clients = ['client.{id}'.format(id=id_) for id_ in teuthology.all_roles_of_type(ctx.cluster, 'client')]
        log.info('clients are %s' % clients)
        role = 'client.{id}'.format(id=client_num)
        if role not in clients:
            raise Exception('wrong client {c}'.format(c=role))
        assert isinstance(role, str)
        PREFIX = 'client.'
        assert role.startswith(PREFIX)
        testdir = teuthology.get_testdir(ctx)
        cmd_args = [
            'adjust-ulimits',
            'ceph-coverage',
            '{tdir}/archive/coverage'.format(tdir=testdir)]
        cmd_args.extend(args)
        log.info("cmd: %s", cmd_args)
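        # when need_wait is set, poll every 30 seconds for up to 30 attempts
        # (~15 minutes) until the command exits cleanly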
        tries = 0
        while True:
            (remote,) = ctx.cluster.only(role).remotes.keys()
            proc = remote.run(
                args=cmd_args,
                wait=need_wait, check_status=False,
                stdout=StringIO(),
            )
            log.info('exitstatus {r}'.format(r=proc.exitstatus))
            if proc.exitstatus == 0 or not need_wait:
                log.info('proc stdout %s', proc.stdout.getvalue())
                return proc.stdout.getvalue().strip()
            tries += 1
            if tries > 30:
                raise Exception('timed out getting correct exitstatus')
            time.sleep(30)

    def get_chunk_objs(chunk_pool):
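        # List the objects currently stored in the chunk pool via 'rados ls';
        # return None if the listing comes back empty.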
        chunk_obj_list = run_remote(('rados ls -p ' + chunk_pool).split(), True, 1).split()
        if not chunk_obj_list:
            return None
        else:
            return chunk_obj_list

    def get_ref_list(chunk_pool, chunk_obj):
        # get reference list of chunk object
        dump_str = run_remote(
            ('ceph-dedup-tool --op dump-chunk-refs --chunk-pool '
             + chunk_pool + ' --object ' + chunk_obj).split(),
            True, 1
        )
        # fail if the reference object has not been written yet
        assert len(dump_str) > 0
        log.info('{0} obj has {1} refs'
                 .format(chunk_obj, json.loads(dump_str)['count']))

        # chunk object's references, to be checked against the base tier
        ref_list = json.loads(dump_str)['refs']
        return ref_list

    # To validate that the sample-dedup operation works correctly, this function checks that
    # 1. sample-dedup has been started and
    # 2. the references of the chunk objects exist in the correct base pool
    def validate():
        log.info('start validating sample-dedup')
        base_pool = config.get('pool', None)
        chunk_pool = config.get('chunk_pool', None)
        max_validation_cnt = 15
        retry_cnt = 0
        # chunk objs for re-validation after chunk-repair
        retry_chunk_objs = list()

        # check whether sample-dedup has been started
        chunk_obj_list = get_chunk_objs(chunk_pool)
        while (chunk_obj_list is None or len(chunk_obj_list) == 0) and retry_cnt < max_validation_cnt:
            # retry getting chunk objs after 30 secs of sleep
            time.sleep(30)
            chunk_obj_list = get_chunk_objs(chunk_pool)
            retry_cnt += 1
            log.info('chunk pool empty. retry %d', retry_cnt)
        assert retry_cnt < max_validation_cnt

        log.info('sample-dedup started successfully')

        retry_cnt = 0
        max_validation_cnt = 5
        # validate chunk pool for max_validation_cnt times
        while retry_cnt < max_validation_cnt:
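            # For every chunk object, verify that each of its references exists
            # in the base pool via 'rados stat'; if a reference is missing, run
            # chunk-repair and re-check that chunk object afterwards.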
            for chunk_obj in chunk_obj_list:
                ref_list = get_ref_list(chunk_pool, chunk_obj)
                for ref in ref_list:
                    ret = run_remote(
                        ('rados -p ' + base_pool + ' stat ' + ref['oid'])
                        .split(), True, 1
                    )
                    # check if ref exists in base pool
                    if not ret:
                        # if the ref does not exist in the base pool, try a repair
                        # to avoid a false-positive inconsistent reference
                        ret = run_remote(('ceph osd pool stats ' + base_pool).split(), True, 1)
                        assert len(ret) > 0
                        # 'ceph osd pool stats <pool>' starts with "pool <name> id <id>";
                        # field 3 is the pool id
                        base_pool_id = ret.split()[3]
                        ret = run_remote(
                            ('ceph-dedup-tool --op chunk-repair --chunk-pool '
                             + chunk_pool + ' --object ' + chunk_obj + ' --target-ref '
                             + ref['oid'] + ' --target-ref-pool-id ' + base_pool_id)
                            .split(), True, 1
                        )
                        retry_chunk_objs.append(chunk_obj)
                    log.info('{0} obj exists in {1}'.format(ref['oid'], base_pool))

            # retry validation for repaired objects
            for chunk_obj in retry_chunk_objs:
                ref_list = get_ref_list(chunk_pool, chunk_obj)
                for ref in ref_list:
                    ret = run_remote(
                        ('rados -p ' + base_pool + ' stat ' + ref['oid'])
                        .split(), True, 1
                    )
                    assert len(ret) > 0
                    log.info(
                        '{0} obj exists in {1} after repair'.format(
                            ref['oid'], base_pool)
                    )
            retry_chunk_objs = list()

            # get chunk objects for the next loop
            chunk_obj_list = get_chunk_objs(chunk_pool)
            retry_cnt += 1
            time.sleep(30)
        return True

    running = gevent.spawn(thread)
    checker = gevent.spawn(validate)
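    # Yield control back to teuthology so subsequent tasks can run while the
    # dedup tool and the validator keep working in the background; on teardown,
    # get() joins both greenlets and re-raises any exception they hit.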

    try:
        yield
    finally:
        log.info('joining ceph-dedup-tool')
        running.get()
        checker.get()