1 """
2 Workunit task -- Run ceph on sets of specific clients
3 """
4 import logging
5 import pipes
6 import os
7 import re
8
9 from copy import deepcopy
10 from util import get_remote_for_role
11
12 from teuthology import misc
13 from teuthology.config import config as teuth_config
14 from teuthology.orchestra.run import CommandFailedError
15 from teuthology.parallel import parallel
16 from teuthology.orchestra import run
17
18 log = logging.getLogger(__name__)
19
20
class Refspec:
    def __init__(self, refspec):
        self.refspec = refspec

    def __str__(self):
        return self.refspec

    def _clone(self, git_url, clonedir, opts=None):
        if opts is None:
            opts = []
        return (['rm', '-rf', clonedir] +
                [run.Raw('&&')] +
                ['git', 'clone'] + opts +
                [git_url, clonedir])

    def _cd(self, clonedir):
        return ['cd', clonedir]

    def _checkout(self):
        return ['git', 'checkout', self.refspec]

    def clone(self, git_url, clonedir):
        return (self._clone(git_url, clonedir) +
                [run.Raw('&&')] +
                self._cd(clonedir) +
                [run.Raw('&&')] +
                self._checkout())


class Branch(Refspec):
    def __init__(self, branch):
        Refspec.__init__(self, branch)

    def clone(self, git_url, clonedir):
        opts = ['--depth', '1',
                '--branch', self.refspec]
        return (self._clone(git_url, clonedir, opts) +
                [run.Raw('&&')] +
                self._cd(clonedir))


class Head(Refspec):
    def __init__(self):
        Refspec.__init__(self, 'HEAD')

    def clone(self, git_url, clonedir):
        opts = ['--depth', '1']
        return (self._clone(git_url, clonedir, opts) +
                [run.Raw('&&')] +
                self._cd(clonedir))


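# For orientation, a rough sketch of what Refspec.clone() expands to (the
# tag below is illustrative, borrowed from the task docstring, not from any
# particular job):
#
#   Refspec('v0.47').clone(git_url, clonedir) builds
#     ['rm', '-rf', clonedir, run.Raw('&&'),
#      'git', 'clone', git_url, clonedir, run.Raw('&&'),
#      'cd', clonedir, run.Raw('&&'),
#      'git', 'checkout', 'v0.47']
#
# Branch and Head skip the explicit checkout and instead pass --depth 1
# (plus --branch for Branch) to git clone, so only the needed history is
# fetched.

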
def task(ctx, config):
    """
    Run ceph on all workunits found under the specified path.

    For example::

      tasks:
      - ceph:
      - ceph-fuse: [client.0]
      - workunit:
          clients:
            client.0: [direct_io, xattrs.sh]
            client.1: [snaps]
          branch: foo

    You can also run a list of workunits on all clients::

      tasks:
      - ceph:
      - ceph-fuse:
      - workunit:
          tag: v0.47
          clients:
            all: [direct_io, xattrs.sh, snaps]

    If you have an "all" section it will run all the workunits
    on each client simultaneously, AFTER running any workunits specified
    for individual clients. (This prevents unintended simultaneous runs.)

    To customize tests, you can specify environment variables as a dict. You
    can also specify a time limit for each work unit (defaults to 3h)::

      tasks:
      - ceph:
      - ceph-fuse:
      - workunit:
          sha1: 9b28948635b17165d17c1cf83d4a870bd138ddf6
          clients:
            all: [snaps]
          env:
            FOO: bar
            BAZ: quux
          timeout: 3h

    This task supports roles that include a ceph cluster, e.g.::

      tasks:
      - ceph:
      - workunit:
          clients:
            backup.client.0: [foo]
            client.1: [bar] # cluster is implicitly 'ceph'

    You can also specify an alternative top-level dir to 'qa/workunits', like
    'qa/standalone', with::

      tasks:
      - install:
      - workunit:
          basedir: qa/standalone
          clients:
            client.0:
              - test-ceph-helpers.sh

    :param ctx: Context
    :param config: Configuration
    """
    assert isinstance(config, dict)
    assert isinstance(config.get('clients'), dict), \
        'configuration must contain a dictionary of clients'

    # mimic the behavior of the "install" task, where the "overrides" are
    # actually the defaults of that task. In other words, if none of "sha1",
    # "tag", or "branch" is specified by a "workunit" task, we update it with
    # the information in the "workunit" sub-task nested in "overrides".
    overrides = deepcopy(ctx.config.get('overrides', {}).get('workunit', {}))
    refspecs = {'branch': Branch, 'tag': Refspec, 'sha1': Refspec}
    if any(map(lambda i: i in config, refspecs.iterkeys())):
        for i in refspecs.iterkeys():
            overrides.pop(i, None)
    misc.deep_merge(config, overrides)

    for spec, cls in refspecs.iteritems():
        refspec = config.get(spec)
        if refspec:
            refspec = cls(refspec)
            break
    if refspec is None:
        refspec = Head()
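    # e.g. a config carrying {'sha1': '9b28948...'} yields Refspec('9b28948...'),
    # {'branch': 'foo'} yields Branch('foo'), and with none of the three keys
    # set we fall back to Head(), i.e. a shallow clone of the remote's default
    # branch (values shown are illustrative, echoing the docstring examples)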

    timeout = config.get('timeout', '3h')

    log.info('Pulling workunits from ref %s', refspec)

    created_mountpoint = {}

    if config.get('env') is not None:
        assert isinstance(config['env'], dict), 'env must be a dictionary'
    clients = config['clients']

    # Create scratch dirs for any non-all workunits
    log.info('Making a separate scratch dir for every client...')
    for role in clients.iterkeys():
        assert isinstance(role, basestring)
        if role == "all":
            continue

        assert 'client' in role
        created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
        created_mountpoint[role] = created_mnt_dir

    # Execute any non-all workunits
    with parallel() as p:
        for role, tests in clients.iteritems():
            if role != "all":
                p.spawn(_run_tests, ctx, refspec, role, tests,
                        config.get('env'),
                        basedir=config.get('basedir', 'qa/workunits'),
                        timeout=timeout)

    # Clean up dirs from any non-all workunits
    for role, created in created_mountpoint.items():
        _delete_dir(ctx, role, created)

    # Execute any 'all' workunits
    if 'all' in clients:
        all_tasks = clients["all"]
        _spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
                              config.get('basedir', 'qa/workunits'),
                              config.get('subdir'), timeout=timeout)


def _client_mountpoint(ctx, cluster, id_):
    """
    Returns the path to the expected mountpoint for workunits running
    on some kind of filesystem.
    """
    # for compatibility with tasks like ceph-fuse that aren't cluster-aware yet,
    # only include the cluster name in the dir if the cluster is not 'ceph'
    if cluster == 'ceph':
        dir_ = 'mnt.{0}'.format(id_)
    else:
        dir_ = 'mnt.{0}.{1}'.format(cluster, id_)
    return os.path.join(misc.get_testdir(ctx), dir_)


def _delete_dir(ctx, role, created_mountpoint):
    """
    Delete the scratch directory this role used inside its mount, and
    remove the mount point itself if it was artificially created.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    :param created_mountpoint: True if the mount point was created by
                               _make_scratch_dir() and should be removed too.
    """
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    client = os.path.join(mnt, 'client.{id}'.format(id=id_))

    # Remove the directory inside the mount where the workunit ran
    remote.run(
        args=[
            'sudo',
            'rm',
            '-rf',
            '--',
            client,
        ],
    )
    log.info("Deleted dir {dir}".format(dir=client))

    # If the mount was an artificially created dir, delete that too
    if created_mountpoint:
        remote.run(
            args=[
                'rmdir',
                '--',
                mnt,
            ],
        )
        log.info("Deleted artificial mount point {dir}".format(dir=mnt))


def _make_scratch_dir(ctx, role, subdir):
    """
    Make scratch directories for this role. This also makes the mount
    point if that directory does not exist.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    :param subdir: use this subdir (None to default to 'client.<id>')
    """
    created_mountpoint = False
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    dir_owner = remote.user
    mnt = _client_mountpoint(ctx, cluster, id_)
    # if neither kclient nor ceph-fuse is required for a workunit,
    # mnt may not exist. Stat and create the directory if it doesn't.
    try:
        remote.run(
            args=[
                'stat',
                '--',
                mnt,
            ],
        )
        log.info('Did not need to create dir {dir}'.format(dir=mnt))
    except CommandFailedError:
        remote.run(
            args=[
                'mkdir',
                '--',
                mnt,
            ],
        )
        log.info('Created dir {dir}'.format(dir=mnt))
        created_mountpoint = True

    if not subdir:
        subdir = 'client.{id}'.format(id=id_)
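    # the workunit scratch area will therefore live under <mnt>/<subdir>;
    # with the default subdir that is <mnt>/client.<id>, the same path that
    # _delete_dir() removes once the run is finished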

    if created_mountpoint:
        remote.run(
            args=[
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'mkdir',
                '--',
                subdir,
            ],
        )
    else:
        remote.run(
            args=[
                # cd first so this will fail if the mount point does
                # not exist; pure install -d will silently do the
                # wrong thing
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'sudo',
                'install',
                '-d',
                '-m', '0755',
                '--owner={user}'.format(user=dir_owner),
                '--',
                subdir,
            ],
        )

    return created_mountpoint


def _spawn_on_all_clients(ctx, refspec, tests, env, basedir, subdir, timeout=None):
    """
    Make a scratch directory for each client in the cluster, and then for each
    test spawn _run_tests() for each role.

    See _run_tests() for parameter documentation.
    """
    is_client = misc.is_type('client')
    client_remotes = {}
    created_mountpoint = {}
    for remote, roles_for_host in ctx.cluster.remotes.items():
        for role in roles_for_host:
            if is_client(role):
                client_remotes[role] = remote
                created_mountpoint[role] = _make_scratch_dir(ctx, role, subdir)

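    # each workunit runs on every client in parallel; since the parallel()
    # context joins all spawned calls on exit, the next workunit starts only
    # after every client has finished the current one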
    for unit in tests:
        with parallel() as p:
            for role, remote in client_remotes.items():
                p.spawn(_run_tests, ctx, refspec, role, [unit], env,
                        basedir,
                        subdir,
                        timeout=timeout)

    # cleanup the generated client directories
    for role, _ in client_remotes.items():
        _delete_dir(ctx, role, created_mountpoint[role])


def _run_tests(ctx, refspec, role, tests, env, basedir,
               subdir=None, timeout=None):
    """
    Run the individual test. Create a scratch directory and then extract the
    workunits from git. Make the executables, and then run the tests.
    Clean up (remove files created) after the tests are finished.

    :param ctx: Context
    :param refspec: branch, sha1, or version tag used to identify this
                    build
    :param role: client role to run the tests on, e.g. "client.0" or
                 "backup.client.0"
    :param tests: the workunit specs (files or directories) to run
    :param env: environment set in yaml file. Could be None.
    :param basedir: directory inside the clone that holds the workunits,
                    e.g. 'qa/workunits'
    :param subdir: subdirectory set in yaml file. Could be None
    :param timeout: If present, use the 'timeout' command on the remote host
                    to limit execution time. Must be specified by a number
                    followed by 's' for seconds, 'm' for minutes, 'h' for
                    hours, or 'd' for days. If '0' or anything that evaluates
                    to False is passed, the 'timeout' command is not used.
    """
    testdir = misc.get_testdir(ctx)
    assert isinstance(role, basestring)
    cluster, type_, id_ = misc.split_role(role)
    assert type_ == 'client'
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    # subdir so we can remove and recreate this a lot without sudo
    if subdir is None:
        scratch_tmp = os.path.join(mnt, 'client.{id}'.format(id=id_), 'tmp')
    else:
        scratch_tmp = os.path.join(mnt, subdir)
    clonedir = '{tdir}/clone.{role}'.format(tdir=testdir, role=role)
    srcdir = '{cdir}/{basedir}'.format(cdir=clonedir,
                                       basedir=basedir)

    git_url = teuth_config.get_ceph_qa_suite_git_url()
    # if we are running an upgrade test, ceph-ci may not have branches like
    # `jewel`, so fall back to ceph.git as an alternative.
    try:
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(git_url, clonedir))
    except CommandFailedError:
        if git_url.endswith('/ceph-ci.git'):
            alt_git_url = git_url.replace('/ceph-ci.git', '/ceph.git')
        elif git_url.endswith('/ceph-ci'):
            alt_git_url = re.sub(r'/ceph-ci$', '/ceph.git', git_url)
        else:
            raise
        log.info(
            "failed to check out '%s' from %s; will also try in %s",
            refspec,
            git_url,
            alt_git_url,
        )
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(alt_git_url, clonedir))
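    # (the fallback above rewrites e.g. https://github.com/ceph/ceph-ci.git
    # into https://github.com/ceph/ceph.git; URLs are shown for illustration
    # only, the real value comes from teuthology's configuration)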
    remote.run(
        logger=log.getChild(role),
        args=[
            'cd', '--', srcdir,
            run.Raw('&&'),
            'if', 'test', '-e', 'Makefile', run.Raw(';'), 'then', 'make', run.Raw(';'), 'fi',
            run.Raw('&&'),
            'find', '-executable', '-type', 'f', '-printf', r'%P\0',
            run.Raw('>{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)),
        ],
    )

    workunits_file = '{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)
    workunits = sorted(misc.get_file(remote, workunits_file).split('\0'))
    assert workunits
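    # each entry is a path relative to basedir, e.g. 'xattrs.sh' or something
    # under 'snaps/' (names borrowed from the docstring examples); a spec in
    # the loop below matches either an exact entry or everything under
    # '<spec>/'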

    try:
        assert isinstance(tests, list)
        for spec in tests:
            log.info('Running workunits matching %s on %s...', spec, role)
            prefix = '{spec}/'.format(spec=spec)
            to_run = [w for w in workunits if w == spec or w.startswith(prefix)]
            if not to_run:
                raise RuntimeError('Spec did not match any workunits: {spec!r}'.format(spec=spec))
            for workunit in to_run:
                log.info('Running workunit %s...', workunit)
                args = [
                    'mkdir', '-p', '--', scratch_tmp,
                    run.Raw('&&'),
                    'cd', '--', scratch_tmp,
                    run.Raw('&&'),
                    run.Raw('CEPH_CLI_TEST_DUP_COMMAND=1'),
                    run.Raw('CEPH_REF={ref}'.format(ref=refspec)),
                    run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)),
                    run.Raw('CEPH_ARGS="--cluster {0}"'.format(cluster)),
                    run.Raw('CEPH_ID="{id}"'.format(id=id_)),
                    run.Raw('PATH=$PATH:/usr/sbin'),
                    run.Raw('CEPH_BASE={dir}'.format(dir=clonedir)),
                    run.Raw('CEPH_ROOT={dir}'.format(dir=clonedir)),
                ]
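                # the run.Raw() entries are passed through unquoted, so on the
                # remote they act as per-command VAR=value environment
                # assignments for the workunit invocation assembled below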
                if env is not None:
                    for var, val in env.iteritems():
                        quoted_val = pipes.quote(val)
                        env_arg = '{var}={val}'.format(var=var, val=quoted_val)
                        args.append(run.Raw(env_arg))
                args.extend([
                    'adjust-ulimits',
                    'ceph-coverage',
                    '{tdir}/archive/coverage'.format(tdir=testdir)])
                if timeout and timeout != '0':
                    args.extend(['timeout', timeout])
                args.extend([
                    '{srcdir}/{workunit}'.format(
                        srcdir=srcdir,
                        workunit=workunit,
                    ),
                ])
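                # the assembled command now looks roughly like (paths
                # abbreviated, shown purely for illustration):
                #   mkdir -p -- <scratch_tmp> && cd -- <scratch_tmp> &&
                #   CEPH_REF=<ref> TESTDIR="<testdir>" CEPH_ID="<id>" ... \
                #   adjust-ulimits ceph-coverage <testdir>/archive/coverage \
                #   timeout <timeout> <clonedir>/<basedir>/<workunit>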
                remote.run(
                    logger=log.getChild(role),
                    args=args,
                    label="workunit test {workunit}".format(workunit=workunit)
                )
                remote.run(
                    logger=log.getChild(role),
                    args=['sudo', 'rm', '-rf', '--', scratch_tmp],
                )
    finally:
        log.info('Stopping %s on %s...', tests, role)
        remote.run(
            logger=log.getChild(role),
            args=[
                'rm', '-rf', '--', workunits_file, clonedir,
            ],
        )