]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | """ |
2 | Scrub osds | |
3 | """ | |
4 | import contextlib | |
5 | import gevent | |
6 | import logging | |
7 | import random | |
8 | import time | |
9 | ||
9f95a23c | 10 | import tasks.ceph_manager |
7c673cae FG |
11 | from teuthology import misc as teuthology |
12 | ||
13 | log = logging.getLogger(__name__) | |
14 | ||
@contextlib.contextmanager
def task(ctx, config):
    """
    Run scrub periodically. Randomly chooses an OSD to scrub.

    The config should be as follows:

    scrub:
        frequency: <seconds between scrubs>
        deep: <bool for deepness>

    example:

    tasks:
    - ceph:
    - scrub:
        frequency: 30
        deep: 0
    """
    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'scrub task only accepts a dict for configuration'

    log.info('Beginning scrub...')

    first_mon = teuthology.get_first_mon(ctx, config)
    (mon,) = ctx.cluster.only(first_mon).remotes.keys()

    # The file imports ``tasks.ceph_manager`` (not ``from tasks import
    # ceph_manager``), so the bare name ``ceph_manager`` is undefined here
    # and would raise NameError; use the fully-qualified name instead.
    manager = tasks.ceph_manager.CephManager(
        mon,
        ctx=ctx,
        logger=log.getChild('ceph_manager'),
        )

    # Wait until every configured OSD reports "up" before starting to scrub.
    num_osds = teuthology.num_instances_of_type(ctx.cluster, 'osd')
    while len(manager.get_osd_status()['up']) < num_osds:
        time.sleep(10)

    scrub_proc = Scrubber(
        manager,
        config,
        )
    try:
        yield
    finally:
        # Always stop the background scrubbing thread, even if a nested
        # task raised.
        log.info('joining scrub')
        scrub_proc.do_join()
63 | ||
class Scrubber:
    """
    Background scrubber: repeatedly asks a random up OSD to (deep-)scrub.

    The worker greenlet is spawned during initialization and runs until
    do_join() is called.
    """

    def __init__(self, manager, config):
        """
        Wait for a clean cluster, then spawn the scrubbing greenlet.

        :param manager: CephManager-like object used to query OSD status
                        and issue cluster commands
        :param config: optional dict; recognized keys are 'frequency'
                       (seconds between scrubs) and 'deep' (truthy for
                       deep-scrub)
        """
        self.ceph_manager = manager
        self.ceph_manager.wait_for_clean()

        # Only OSDs currently reported "up" are eligible targets.
        self.osds = self.ceph_manager.get_osd_status()['up']

        self.config = config
        if self.config is None:
            self.config = dict()
        else:
            # NOTE(review): self.log is only bound when a config dict was
            # supplied, and is never read anywhere in this file — looks like
            # a leftover debugging hook; behavior preserved as-is.
            def tmp(x):
                """Local display"""
                print(x)
            self.log = tmp

        self.stopping = False

        log.info("spawning thread")

        self.thread = gevent.spawn(self.do_scrub)

    def do_join(self):
        """Signal the worker to stop, then wait for it to finish."""
        self.stopping = True
        self.thread.get()

    def do_scrub(self):
        """Scrub a randomly chosen OSD every `frequency` seconds until stopped."""
        frequency = self.config.get("frequency", 30)
        deep = self.config.get("deep", 0)

        log.info("stopping %s" % self.stopping)

        while not self.stopping:
            target = str(random.choice(self.osds))
            cmd = 'deep-scrub' if deep else 'scrub'
            log.info('%sbing %s' % (cmd, target))
            self.ceph_manager.raw_cluster_cmd('osd', cmd, target)
            time.sleep(frequency)