"""
Thrash mds by simulating failures
"""
import logging
import contextlib

from gevent import sleep, GreenletExit
from gevent.greenlet import Greenlet
from gevent.event import Event

from teuthology import misc as teuthology

from tasks import ceph_manager
from tasks.cephfs.filesystem import MDSCluster, Filesystem
from tasks.thrasher import Thrasher
16 log
= logging
.getLogger(__name__
)
18 class ForwardScrubber(Thrasher
, Greenlet
):
22 The ForwardScrubber does forward scrubbing of file-systems during execution
23 of other tasks (workunits, etc).
26 def __init__(self
, fs
, scrub_timeout
=300, sleep_between_iterations
=1):
27 super(ForwardScrubber
, self
).__init
__()
29 self
.logger
= log
.getChild('fs.[{f}]'.format(f
=fs
.name
))
31 self
.name
= 'thrasher.fs.[{f}]'.format(f
=fs
.name
)
32 self
.stopping
= Event()
33 self
.scrub_timeout
= scrub_timeout
34 self
.sleep_between_iterations
= sleep_between_iterations
39 except Exception as e
:
40 self
.set_thrasher_exception(e
)
41 self
.logger
.exception("exception:")
42 # allow successful completion so gevent doesn't see an exception...
49 Perform the file-system scrubbing
51 self
.logger
.info(f
'start scrubbing fs: {self.fs.name}')
54 while not self
.stopping
.is_set():
56 sleep(self
.sleep_between_iterations
)
60 self
.logger
.info(f
'end scrubbing fs: {self.fs.name}')
62 def _scrub(self
, path
="/", recursive
=True):
63 self
.logger
.info(f
"scrubbing fs: {self.fs.name}")
66 scrubopts
.append("recursive")
67 out_json
= self
.fs
.run_scrub(["start", path
, ",".join(scrubopts
)])
68 assert out_json
is not None
70 tag
= out_json
['scrub_tag']
72 assert tag
is not None
73 assert out_json
['return_code'] == 0
74 assert out_json
['mode'] == 'asynchronous'
76 return self
.fs
.wait_until_scrub_complete(tag
=tag
, sleep
=30,
77 timeout
=self
.scrub_timeout
)
79 def stop_all_fwd_scrubbers(thrashers
):
80 for thrasher
in thrashers
:
81 if not isinstance(thrasher
, ForwardScrubber
):
85 if thrasher
.exception
is not None:
86 raise RuntimeError(f
"error during scrub thrashing: {thrasher.exception}")
89 @contextlib.contextmanager
90 def task(ctx
, config
):
92 Stress test the mds by running scrub iterations while another task/workunit
98 sleep_between_iterations: 1
101 mds_cluster
= MDSCluster(ctx
)
105 assert isinstance(config
, dict), \
106 'fwd_scrub task only accepts a dict for configuration'
107 mdslist
= list(teuthology
.all_roles_of_type(ctx
.cluster
, 'mds'))
108 assert len(mdslist
) > 0, \
109 'fwd_scrub task requires at least 1 metadata server'
111 (first
,) = ctx
.cluster
.only(f
'mds.{mdslist[0]}').remotes
.keys()
112 manager
= ceph_manager
.CephManager(
113 first
, ctx
=ctx
, logger
=log
.getChild('ceph_manager'),
116 # make sure everyone is in active, standby, or standby-replay
117 log
.info('Wait for all MDSs to reach steady state...')
118 status
= mds_cluster
.status()
121 for info
in status
.get_all():
122 state
= info
['state']
123 if state
not in ('up:active', 'up:standby', 'up:standby-replay'):
129 status
= mds_cluster
.status()
131 log
.info('Ready to start scrub thrashing')
133 manager
.wait_for_clean()
134 assert manager
.is_clean()
136 if 'cluster' not in config
:
137 config
['cluster'] = 'ceph'
139 for fs
in status
.get_filesystems():
140 fwd_scrubber
= ForwardScrubber(Filesystem(ctx
, fscid
=fs
['id']),
141 config
['scrub_timeout'],
142 config
['sleep_between_iterations'])
144 ctx
.ceph
[config
['cluster']].thrashers
.append(fwd_scrubber
)
147 log
.debug('Yielding')
150 log
.info('joining ForwardScrubbers')
151 stop_all_fwd_scrubbers(ctx
.ceph
[config
['cluster']].thrashers
)
152 log
.info('done joining')