1"""
2Workunit task -- Run ceph on sets of specific clients
3"""
4import logging
5import pipes
6import os
224ce89b 7import re

from copy import deepcopy
from util import get_remote_for_role

from teuthology import misc
from teuthology.config import config as teuth_config
from teuthology.orchestra.run import CommandFailedError
from teuthology.parallel import parallel
from teuthology.orchestra import run

log = logging.getLogger(__name__)


class Refspec:
    def __init__(self, refspec):
        self.refspec = refspec

    def __str__(self):
        return self.refspec

    def _clone(self, git_url, clonedir, opts=None):
        if opts is None:
            opts = []
        return (['rm', '-rf', clonedir] +
                [run.Raw('&&')] +
                ['git', 'clone'] + opts +
                [git_url, clonedir])

    def _cd(self, clonedir):
        return ['cd', clonedir]

    def _checkout(self):
        return ['git', 'checkout', self.refspec]

    def clone(self, git_url, clonedir):
        return (self._clone(git_url, clonedir) +
                [run.Raw('&&')] +
                self._cd(clonedir) +
                [run.Raw('&&')] +
                self._checkout())


class Branch(Refspec):
    def __init__(self, tag):
        Refspec.__init__(self, tag)

    def clone(self, git_url, clonedir):
        opts = ['--depth', '1',
                '--branch', self.refspec]
        return (self._clone(git_url, clonedir, opts) +
                [run.Raw('&&')] +
                self._cd(clonedir))


class Head(Refspec):
    def __init__(self):
        Refspec.__init__(self, 'HEAD')

    def clone(self, git_url, clonedir):
        opts = ['--depth', '1']
        return (self._clone(git_url, clonedir, opts) +
                [run.Raw('&&')] +
                self._cd(clonedir))


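# Illustrative note (not executed by the task): the Refspec helpers above only
# build argv lists for teuthology's remote runner. Assuming a hypothetical
# branch name, Branch('luminous').clone(git_url, '/tmp/clone') expands to
# roughly:
#   rm -rf /tmp/clone && \
#   git clone --depth 1 --branch luminous <git_url> /tmp/clone && \
#   cd /tmp/clone
# while the plain Refspec used for tags and sha1s does a full clone followed by
# an explicit "git checkout <refspec>".

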
def task(ctx, config):
    """
    Run all workunits found under the specified path on the given clients.

    For example::

        tasks:
        - ceph:
        - ceph-fuse: [client.0]
        - workunit:
            clients:
              client.0: [direct_io, xattrs.sh]
              client.1: [snaps]
            branch: foo

    You can also run a list of workunits on all clients::

        tasks:
        - ceph:
        - ceph-fuse:
        - workunit:
            tag: v0.47
            clients:
              all: [direct_io, xattrs.sh, snaps]

    If you have an "all" section it will run all the workunits
    on each client simultaneously, AFTER running any workunits specified
    for individual clients. (This prevents unintended simultaneous runs.)

    To customize tests, you can specify environment variables as a dict. You
    can also specify a time limit for each work unit (defaults to 3h):

        tasks:
        - ceph:
        - ceph-fuse:
        - workunit:
            sha1: 9b28948635b17165d17c1cf83d4a870bd138ddf6
            clients:
              all: [snaps]
            env:
              FOO: bar
              BAZ: quux
            timeout: 3h

    This task supports roles that include a ceph cluster, e.g.::

        tasks:
        - ceph:
        - workunit:
            clients:
              backup.client.0: [foo]
              client.1: [bar] # cluster is implicitly 'ceph'

    :param ctx: Context
    :param config: Configuration
    """
    assert isinstance(config, dict)
    assert isinstance(config.get('clients'), dict), \
        'configuration must contain a dictionary of clients'

    # mimic the behavior of the "install" task, where the "overrides" are
    # actually the defaults of that task. in other words, if none of "sha1",
    # "tag", or "branch" is specified by the "workunit" task, we will update
    # it with the information in the "workunit" sub-task nested in "overrides".
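    # For example (illustrative): with
    #   overrides: {workunit: {branch: master}}
    # a workunit task that already sets "sha1" keeps its sha1 and the "branch"
    # override is discarded, while a workunit task that sets none of the three
    # refspec keys inherits "branch: master" from the overrides.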
    overrides = deepcopy(ctx.config.get('overrides', {}).get('workunit', {}))
    refspecs = {'branch': Branch, 'tag': Refspec, 'sha1': Refspec}
    if any(map(lambda i: i in config, refspecs.iterkeys())):
        for i in refspecs.iterkeys():
            overrides.pop(i, None)
    misc.deep_merge(config, overrides)

    for spec, cls in refspecs.iteritems():
        refspec = config.get(spec)
        if refspec:
            refspec = cls(refspec)
            break
    if refspec is None:
        refspec = Head()
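    # e.g. "tag: v0.47" becomes Refspec('v0.47') and "branch: foo" becomes
    # Branch('foo'); if no sha1/tag/branch is given, Head() leaves the shallow
    # clone at the default branch's HEAD.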

    timeout = config.get('timeout', '3h')

    log.info('Pulling workunits from ref %s', refspec)

    created_mountpoint = {}

    if config.get('env') is not None:
        assert isinstance(config['env'], dict), 'env must be a dictionary'
    clients = config['clients']

    # Create scratch dirs for any non-all workunits
    log.info('Making a separate scratch dir for every client...')
    for role in clients.iterkeys():
        assert isinstance(role, basestring)
        if role == "all":
            continue

        assert 'client' in role
        created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
        created_mountpoint[role] = created_mnt_dir

    # Execute any non-all workunits
    with parallel() as p:
        for role, tests in clients.iteritems():
            if role != "all":
                p.spawn(_run_tests, ctx, refspec, role, tests,
                        config.get('env'), timeout=timeout)

    # Clean up dirs from any non-all workunits
    for role, created in created_mountpoint.items():
        _delete_dir(ctx, role, created)

    # Execute any 'all' workunits
    if 'all' in clients:
        all_tasks = clients["all"]
        _spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
                              config.get('subdir'), timeout=timeout)


def _client_mountpoint(ctx, cluster, id_):
    """
    Returns the path to the expected mountpoint for workunits running
    on some kind of filesystem.
    """
    # for compatibility with tasks like ceph-fuse that aren't cluster-aware yet,
    # only include the cluster name in the dir if the cluster is not 'ceph'
    if cluster == 'ceph':
        dir_ = 'mnt.{0}'.format(id_)
    else:
        dir_ = 'mnt.{0}.{1}'.format(cluster, id_)
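    # For illustration (the test dir itself is chosen by teuthology):
    #   _client_mountpoint(ctx, 'ceph', '0')   -> <testdir>/mnt.0
    #   _client_mountpoint(ctx, 'backup', '0') -> <testdir>/mnt.backup.0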
    return os.path.join(misc.get_testdir(ctx), dir_)


def _delete_dir(ctx, role, created_mountpoint):
    """
    Delete the scratch directory used by this role and, if the mount point
    was created by this task, remove the mount point as well.

    :param ctx: Context
    :param role: "<cluster>.client.<id>" or "client.<id>" role string
    """
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    client = os.path.join(mnt, 'client.{id}'.format(id=id_))

    # Remove the directory inside the mount where the workunit ran
    remote.run(
        args=[
            'sudo',
            'rm',
            '-rf',
            '--',
            client,
        ],
    )
    log.info("Deleted dir {dir}".format(dir=client))

    # If the mount was an artificially created dir, delete that too
    if created_mountpoint:
        remote.run(
            args=[
                'rmdir',
                '--',
                mnt,
            ],
        )
        log.info("Deleted artificial mount point {dir}".format(dir=mnt))


def _make_scratch_dir(ctx, role, subdir):
    """
    Make scratch directories for this role. This also makes the mount
    point if that directory does not exist.

    :param ctx: Context
    :param role: "<cluster>.client.<id>" or "client.<id>" role string
    :param subdir: use this subdir (None/False if not used)
    """
    created_mountpoint = False
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    dir_owner = remote.user
    mnt = _client_mountpoint(ctx, cluster, id_)
    # if neither kclient nor ceph-fuse is required for a workunit,
    # mnt may not exist. Stat and create the directory if it doesn't.
    try:
        remote.run(
            args=[
                'stat',
                '--',
                mnt,
            ],
        )
        log.info('Did not need to create dir {dir}'.format(dir=mnt))
    except CommandFailedError:
        remote.run(
            args=[
                'mkdir',
                '--',
                mnt,
            ],
        )
        log.info('Created dir {dir}'.format(dir=mnt))
        created_mountpoint = True

    if not subdir:
        subdir = 'client.{id}'.format(id=id_)

    if created_mountpoint:
        remote.run(
            args=[
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'mkdir',
                '--',
                subdir,
            ],
        )
    else:
        remote.run(
            args=[
                # cd first so this will fail if the mount point does
                # not exist; pure install -d will silently do the
                # wrong thing
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'sudo',
                'install',
                '-d',
                '-m', '0755',
                '--owner={user}'.format(user=dir_owner),
                '--',
                subdir,
            ],
        )

    return created_mountpoint


def _spawn_on_all_clients(ctx, refspec, tests, env, subdir, timeout=None):
    """
    Make a scratch directory for each client in the cluster, and then for each
    test spawn _run_tests() for each role.

    See _run_tests() for parameter documentation.
    """
    is_client = misc.is_type('client')
    client_remotes = {}
    created_mountpoint = {}
    for remote, roles_for_host in ctx.cluster.remotes.items():
        for role in roles_for_host:
            if is_client(role):
                client_remotes[role] = remote
                created_mountpoint[role] = _make_scratch_dir(ctx, role, subdir)

    for unit in tests:
        with parallel() as p:
            for role, remote in client_remotes.items():
                p.spawn(_run_tests, ctx, refspec, role, [unit], env, subdir,
                        timeout=timeout)

    # cleanup the generated client directories
    for role, _ in client_remotes.items():
        _delete_dir(ctx, role, created_mountpoint[role])


def _run_tests(ctx, refspec, role, tests, env, subdir=None, timeout=None):
    """
    Run the individual tests. Create a scratch directory, check out the
    workunits from git, make the executables, and then run the tests.
    Clean up (remove files created) after the tests are finished.

    :param ctx: Context
    :param refspec: branch, sha1, or version tag used to identify this
                    build
    :param tests: list of workunit specs to run.
    :param env: environment set in yaml file. Could be None.
    :param subdir: subdirectory set in yaml file. Could be None
    :param timeout: If present, use the 'timeout' command on the remote host
                    to limit execution time. Must be specified by a number
                    followed by 's' for seconds, 'm' for minutes, 'h' for
                    hours, or 'd' for days. If '0' or anything that evaluates
                    to False is passed, the 'timeout' command is not used.
    """
    testdir = misc.get_testdir(ctx)
    assert isinstance(role, basestring)
    cluster, type_, id_ = misc.split_role(role)
    assert type_ == 'client'
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    # subdir so we can remove and recreate this a lot without sudo
    if subdir is None:
        scratch_tmp = os.path.join(mnt, 'client.{id}'.format(id=id_), 'tmp')
    else:
        scratch_tmp = os.path.join(mnt, subdir)
    clonedir = '{tdir}/clone.{role}'.format(tdir=testdir, role=role)
    srcdir = '{cdir}/qa/workunits'.format(cdir=clonedir)

    git_url = teuth_config.get_ceph_qa_suite_git_url()
    # if we are running an upgrade test, ceph-ci may not have branches like
    # `jewel`, so fall back to ceph.git as an alternative.
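    # For example, https://github.com/ceph/ceph-ci.git would be retried as
    # https://github.com/ceph/ceph.git (URLs shown for illustration only; the
    # actual URL comes from teuth_config).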
    try:
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(git_url, clonedir))
    except CommandFailedError:
        if git_url.endswith('/ceph-ci.git'):
            alt_git_url = git_url.replace('/ceph-ci.git', '/ceph.git')
        elif git_url.endswith('/ceph-ci'):
            alt_git_url = re.sub(r'/ceph-ci$', '/ceph.git', git_url)
        else:
            raise
        log.info(
            "failed to check out '%s' from %s; will also try in %s",
            refspec,
            git_url,
            alt_git_url,
        )
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(alt_git_url, clonedir))
    remote.run(
        logger=log.getChild(role),
        args=[
            'cd', '--', srcdir,
            run.Raw('&&'),
            'if', 'test', '-e', 'Makefile', run.Raw(';'), 'then', 'make', run.Raw(';'), 'fi',
            run.Raw('&&'),
            'find', '-executable', '-type', 'f', '-printf', r'%P\0',
            run.Raw('>{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)),
        ],
    )
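    # workunits.list.<role> now holds a NUL-separated list of every executable
    # file under qa/workunits, as paths relative to that directory
    # (e.g. "<dir>/<script>.sh"); the entries are matched against the test
    # specs below.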

    workunits_file = '{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)
    workunits = sorted(misc.get_file(remote, workunits_file).split('\0'))
    assert workunits

    try:
        assert isinstance(tests, list)
        for spec in tests:
            log.info('Running workunits matching %s on %s...', spec, role)
            prefix = '{spec}/'.format(spec=spec)
            to_run = [w for w in workunits if w == spec or w.startswith(prefix)]
            if not to_run:
                raise RuntimeError('Spec did not match any workunits: {spec!r}'.format(spec=spec))
            for workunit in to_run:
                log.info('Running workunit %s...', workunit)
                args = [
                    'mkdir', '-p', '--', scratch_tmp,
                    run.Raw('&&'),
                    'cd', '--', scratch_tmp,
                    run.Raw('&&'),
                    run.Raw('CEPH_CLI_TEST_DUP_COMMAND=1'),
                    run.Raw('CEPH_REF={ref}'.format(ref=refspec)),
                    run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)),
                    run.Raw('CEPH_ARGS="--cluster {0}"'.format(cluster)),
                    run.Raw('CEPH_ID="{id}"'.format(id=id_)),
                    run.Raw('PATH=$PATH:/usr/sbin'),
                    run.Raw('CEPH_BASE={dir}'.format(dir=clonedir)),
                ]
                if env is not None:
                    for var, val in env.iteritems():
                        quoted_val = pipes.quote(val)
                        env_arg = '{var}={val}'.format(var=var, val=quoted_val)
                        args.append(run.Raw(env_arg))
                args.extend([
                    'adjust-ulimits',
                    'ceph-coverage',
                    '{tdir}/archive/coverage'.format(tdir=testdir)])
                if timeout and timeout != '0':
                    args.extend(['timeout', timeout])
                args.extend([
                    '{srcdir}/{workunit}'.format(
                        srcdir=srcdir,
                        workunit=workunit,
                    ),
                ])
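                # The assembled command line is roughly (illustrative values):
                #   mkdir -p -- <scratch_tmp> && cd -- <scratch_tmp> &&
                #   CEPH_CLI_TEST_DUP_COMMAND=1 CEPH_REF=<refspec> TESTDIR="<testdir>"
                #   CEPH_ARGS="--cluster <cluster>" CEPH_ID="<id>" PATH=$PATH:/usr/sbin
                #   CEPH_BASE=<clonedir> [user env vars]
                #   adjust-ulimits ceph-coverage <testdir>/archive/coverage
                #   [timeout <timeout>] <srcdir>/<workunit>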
                remote.run(
                    logger=log.getChild(role),
                    args=args,
                    label="workunit test {workunit}".format(workunit=workunit)
                )
                remote.run(
                    logger=log.getChild(role),
                    args=['sudo', 'rm', '-rf', '--', scratch_tmp],
                )
    finally:
        log.info('Stopping %s on %s...', tests, role)
        remote.run(
            logger=log.getChild(role),
            args=[
                'rm', '-rf', '--', workunits_file, clonedir,
            ],
        )