"""
Thrash mds by simulating failures
"""
import contextlib
import logging

from gevent import sleep, GreenletExit
from gevent.greenlet import Greenlet
from gevent.event import Event

from teuthology import misc as teuthology

from tasks import ceph_manager
from tasks.cephfs.filesystem import MDSCluster, Filesystem
from tasks.thrasher import Thrasher
16 log
= logging
.getLogger(__name__
)
class ForwardScrubber(Thrasher, Greenlet):
    """
    ForwardScrubber::

    The ForwardScrubber does forward scrubbing of file-systems during execution
    of other tasks (workunits, etc).
    """

    def __init__(self, fs, scrub_timeout=300, sleep_between_iterations=1):
        """
        :param fs: Filesystem to scrub (must expose ``name``, ``run_scrub``
                   and ``wait_until_scrub_complete``)
        :param scrub_timeout: seconds to wait for a single scrub to complete
        :param sleep_between_iterations: seconds to pause between scrubs
        """
        super(ForwardScrubber, self).__init__()

        self.logger = log.getChild('fs.[{f}]'.format(f=fs.name))
        # NOTE(review): the original extraction dropped this binding, but the
        # methods below read self.fs, so it must be stored here.
        self.fs = fs
        self.name = 'thrasher.fs.[{f}]'.format(f=fs.name)
        self.stopping = Event()
        self.scrub_timeout = scrub_timeout
        self.sleep_between_iterations = sleep_between_iterations

    def _run(self):
        # Greenlet entry point: record any failure on the Thrasher so the
        # task teardown can re-raise it, then return normally.
        try:
            self.do_scrub()
        except Exception as e:
            self.set_thrasher_exception(e)
            self.logger.exception("exception:")
            # allow successful completion so gevent doesn't see an exception...

    def stop(self):
        # Ask do_scrub() to exit its loop after the current iteration.
        self.stopping.set()

    def do_scrub(self):
        """
        Perform the file-system scrubbing
        """
        self.logger.info(f'start scrubbing fs: {self.fs.name}')

        try:
            # Keep scrubbing until stop() is called (or the greenlet is
            # killed, which raises GreenletExit inside sleep()).
            while not self.stopping.is_set():
                self._scrub()
                sleep(self.sleep_between_iterations)
        except GreenletExit:
            pass

        self.logger.info(f'end scrubbing fs: {self.fs.name}')

    def _scrub(self, path="/", recursive=True):
        """
        Start one forward scrub at *path* and block until it completes.

        :param path: filesystem path to scrub from
        :param recursive: scrub the whole subtree rather than just *path*
        :returns: result of Filesystem.wait_until_scrub_complete
        """
        self.logger.info(f"scrubbing fs: {self.fs.name}")
        recopt = ["recursive", "force"] if recursive else ["force"]
        out_json = self.fs.run_scrub(["start", path] + recopt)
        assert out_json is not None

        tag = out_json['scrub_tag']

        assert tag is not None
        assert out_json['return_code'] == 0
        assert out_json['mode'] == 'asynchronous'

        # Poll (every 30s) until the MDS reports the tagged scrub finished,
        # bounded by the configured scrub_timeout.
        return self.fs.wait_until_scrub_complete(tag=tag, sleep=30,
                                                 timeout=self.scrub_timeout)
def stop_all_fwd_scrubbers(thrashers):
    """
    Stop and join every ForwardScrubber found in *thrashers*, then re-raise
    any exception a scrubber recorded while it was running.

    :param thrashers: iterable of Thrasher instances; non-ForwardScrubber
                      entries are ignored
    :raises RuntimeError: if any scrubber captured an exception
    """
    for thrasher in thrashers:
        if not isinstance(thrasher, ForwardScrubber):
            continue
        # Signal the scrub loop to stop, then wait for the greenlet to exit.
        thrasher.stop()
        thrasher.join()
        if thrasher.exception is not None:
            raise RuntimeError(f"error during scrub thrashing: {thrasher.exception}")
@contextlib.contextmanager
def task(ctx, config):
    """
    Stress test the mds by running scrub iterations while another task/workunit
    is running.

    Example config::

        - fwd_scrub:
            scrub_timeout: 300
            sleep_between_iterations: 1
    """
    mds_cluster = MDSCluster(ctx)

    if config is None:
        config = {}
    assert isinstance(config, dict), \
        'fwd_scrub task only accepts a dict for configuration'
    mdslist = list(teuthology.all_roles_of_type(ctx.cluster, 'mds'))
    assert len(mdslist) > 0, \
        'fwd_scrub task requires at least 1 metadata server'

    # Build a CephManager against the first MDS remote so we can run
    # cluster-health checks below.
    (first,) = ctx.cluster.only(f'mds.{mdslist[0]}').remotes.keys()
    manager = ceph_manager.CephManager(
        first, ctx=ctx, logger=log.getChild('ceph_manager'),
    )

    # make sure everyone is in active, standby, or standby-replay
    log.info('Wait for all MDSs to reach steady state...')
    status = mds_cluster.status()
    while True:
        steady = True
        for info in status.get_all():
            state = info['state']
            if state not in ('up:active', 'up:standby', 'up:standby-replay'):
                steady = False
                break
        if steady:
            break
        sleep(2)
        status = mds_cluster.status()

    log.info('Ready to start scrub thrashing')

    manager.wait_for_clean()
    assert manager.is_clean()

    if 'cluster' not in config:
        config['cluster'] = 'ceph'

    # Start one ForwardScrubber greenlet per filesystem and register it so
    # other tasks (and the teardown below) can find it.
    for fs in status.get_filesystems():
        fwd_scrubber = ForwardScrubber(Filesystem(ctx, fscid=fs['id']),
                                       config['scrub_timeout'],
                                       config['sleep_between_iterations'])
        fwd_scrubber.start()
        ctx.ceph[config['cluster']].thrashers.append(fwd_scrubber)

    try:
        log.debug('Yielding')
        yield
    finally:
        # Always stop/join the scrubbers, even if the nested tasks failed.
        log.info('joining ForwardScrubbers')
        stop_all_fwd_scrubbers(ctx.ceph[config['cluster']].thrashers)
        log.info('done joining')