1"""
2Workunit task -- Run ceph on sets of specific clients
3"""
4import logging
5import pipes
6import os
7
8from copy import deepcopy
9from util import get_remote_for_role
10
11from teuthology import misc
12from teuthology.config import config as teuth_config
13from teuthology.orchestra.run import CommandFailedError
14from teuthology.parallel import parallel
15from teuthology.orchestra import run
16
17log = logging.getLogger(__name__)
18
19
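
# The Refspec helpers below build the shell command lists used to fetch the
# workunit sources on each remote: a plain Refspec (a sha1 or tag) needs a
# full clone followed by a checkout, while Branch and Head can use shallow
# (--depth 1) clones.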
class Refspec:
    def __init__(self, refspec):
        self.refspec = refspec

    def __str__(self):
        return self.refspec

    def _clone(self, git_url, clonedir, opts=None):
        if opts is None:
            opts = []
        return (['rm', '-rf', clonedir] +
                [run.Raw('&&')] +
                ['git', 'clone'] + opts +
                [git_url, clonedir])

    def _cd(self, clonedir):
        return ['cd', clonedir]

    def _checkout(self):
        return ['git', 'checkout', self.refspec]

    def clone(self, git_url, clonedir):
        return (self._clone(git_url, clonedir) +
                [run.Raw('&&')] +
                self._cd(clonedir) +
                [run.Raw('&&')] +
                self._checkout())


class Branch(Refspec):
    def __init__(self, tag):
        Refspec.__init__(self, tag)

    def clone(self, git_url, clonedir):
        opts = ['--depth', '1',
                '--branch', self.refspec]
        return (self._clone(git_url, clonedir, opts) +
                [run.Raw('&&')] +
                self._cd(clonedir))


class Head(Refspec):
    def __init__(self):
        Refspec.__init__(self, 'HEAD')

    def clone(self, git_url, clonedir):
        opts = ['--depth', '1']
        return (self._clone(git_url, clonedir, opts) +
                [run.Raw('&&')] +
                self._cd(clonedir))
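
# Note on the classes above: as an illustration (paths hypothetical),
# Head().clone(git_url, '/tmp/clone') expands to roughly
#   rm -rf /tmp/clone && git clone --depth 1 <git_url> /tmp/clone && cd /tmp/clone
# while Refspec('v0.47').clone(...) performs a full clone and then runs
# 'git checkout v0.47' inside the clone directory.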


def task(ctx, config):
    """
    Run ceph on all workunits found under the specified path.

    For example::

        tasks:
        - ceph:
        - ceph-fuse: [client.0]
        - workunit:
            clients:
              client.0: [direct_io, xattrs.sh]
              client.1: [snaps]
            branch: foo

    You can also run a list of workunits on all clients:
        tasks:
        - ceph:
        - ceph-fuse:
        - workunit:
            tag: v0.47
            clients:
              all: [direct_io, xattrs.sh, snaps]

    If you have an "all" section it will run all the workunits
    on each client simultaneously, AFTER running any workunits specified
    for individual clients. (This prevents unintended simultaneous runs.)

    To customize tests, you can specify environment variables as a dict. You
    can also specify a time limit for each work unit (defaults to 3h):

        tasks:
        - ceph:
        - ceph-fuse:
        - workunit:
            sha1: 9b28948635b17165d17c1cf83d4a870bd138ddf6
            clients:
              all: [snaps]
            env:
              FOO: bar
              BAZ: quux
            timeout: 3h

    This task supports roles that include a ceph cluster, e.g.::

        tasks:
        - ceph:
        - workunit:
            clients:
              backup.client.0: [foo]
              client.1: [bar] # cluster is implicitly 'ceph'

    :param ctx: Context
    :param config: Configuration
    """
    assert isinstance(config, dict)
    assert isinstance(config.get('clients'), dict), \
        'configuration must contain a dictionary of clients'

    # mimic the behavior of the "install" task, where the "overrides" are
    # actually the defaults of that task. in other words, if none of "sha1",
    # "tag", or "branch" is specified by the "workunit" task, we will update
    # it with the information in the "workunit" sub-task nested in "overrides".
    overrides = deepcopy(ctx.config.get('overrides', {}).get('workunit', {}))
    refspecs = {'branch': Branch, 'tag': Refspec, 'sha1': Refspec}
    if any(map(lambda i: i in config, refspecs.iterkeys())):
        for i in refspecs.iterkeys():
            overrides.pop(i, None)
    misc.deep_merge(config, overrides)

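    # resolve the configured branch/tag/sha1 into a Refspec object; if none
    # of them is given, fall back to HEAD of the qa suite repository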
    for spec, cls in refspecs.iteritems():
        refspec = config.get(spec)
        if refspec:
            refspec = cls(refspec)
            break
    if refspec is None:
        refspec = Head()

    timeout = config.get('timeout', '3h')

    log.info('Pulling workunits from ref %s', refspec)

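    # per-role record of whether we had to create the mountpoint directory
    # ourselves, so cleanup knows whether to rmdir it afterwards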
    created_mountpoint = {}

    if config.get('env') is not None:
        assert isinstance(config['env'], dict), 'env must be a dictionary'
    clients = config['clients']

    # Create scratch dirs for any non-all workunits
    log.info('Making a separate scratch dir for every client...')
    for role in clients.iterkeys():
        assert isinstance(role, basestring)
        if role == "all":
            continue

        assert 'client' in role
        created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
        created_mountpoint[role] = created_mnt_dir

    # Execute any non-all workunits
    with parallel() as p:
        for role, tests in clients.iteritems():
            if role != "all":
                p.spawn(_run_tests, ctx, refspec, role, tests,
                        config.get('env'), timeout=timeout)

    # Clean up dirs from any non-all workunits
    for role, created in created_mountpoint.items():
        _delete_dir(ctx, role, created)

    # Execute any 'all' workunits
    if 'all' in clients:
        all_tasks = clients["all"]
        _spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
                              config.get('subdir'), timeout=timeout)


def _client_mountpoint(ctx, cluster, id_):
    """
    Returns the path to the expected mountpoint for workunits running
    on some kind of filesystem.
    """
    # for compatibility with tasks like ceph-fuse that aren't cluster-aware yet,
    # only include the cluster name in the dir if the cluster is not 'ceph'
    if cluster == 'ceph':
        dir_ = 'mnt.{0}'.format(id_)
    else:
        dir_ = 'mnt.{0}.{1}'.format(cluster, id_)
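    # e.g. <testdir>/mnt.0 for ceph/client.0, or <testdir>/mnt.backup.0
    # for backup.client.0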
    return os.path.join(misc.get_testdir(ctx), dir_)


def _delete_dir(ctx, role, created_mountpoint):
    """
    Delete file used by this role, and delete the directory that this
    role appeared in.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    """
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    client = os.path.join(mnt, 'client.{id}'.format(id=id_))

    # Remove the directory inside the mount where the workunit ran
    remote.run(
        args=[
            'sudo',
            'rm',
            '-rf',
            '--',
            client,
        ],
    )
    log.info("Deleted dir {dir}".format(dir=client))

    # If the mount was an artificially created dir, delete that too
    if created_mountpoint:
        remote.run(
            args=[
                'rmdir',
                '--',
                mnt,
            ],
        )
        log.info("Deleted artificial mount point {dir}".format(dir=mnt))


def _make_scratch_dir(ctx, role, subdir):
    """
    Make scratch directories for this role. This also makes the mount
    point if that directory does not exist.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    :param subdir: use this subdir (False if not used)
    """
    created_mountpoint = False
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    dir_owner = remote.user
    mnt = _client_mountpoint(ctx, cluster, id_)
    # if neither kclient nor ceph-fuse are required for a workunit,
    # mnt may not exist. Stat and create the directory if it doesn't.
    try:
        remote.run(
            args=[
                'stat',
                '--',
                mnt,
            ],
        )
        log.info('Did not need to create dir {dir}'.format(dir=mnt))
    except CommandFailedError:
        remote.run(
            args=[
                'mkdir',
                '--',
                mnt,
            ],
        )
        log.info('Created dir {dir}'.format(dir=mnt))
        created_mountpoint = True

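    # workunits run under <mnt>/<subdir>; the default subdir is client.<id>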
    if not subdir:
        subdir = 'client.{id}'.format(id=id_)

    if created_mountpoint:
        remote.run(
            args=[
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'mkdir',
                '--',
                subdir,
            ],
        )
    else:
        remote.run(
            args=[
                # cd first so this will fail if the mount point does
                # not exist; pure install -d will silently do the
                # wrong thing
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'sudo',
                'install',
                '-d',
                '-m', '0755',
                '--owner={user}'.format(user=dir_owner),
                '--',
                subdir,
            ],
        )

    return created_mountpoint


def _spawn_on_all_clients(ctx, refspec, tests, env, subdir, timeout=None):
    """
    Make a scratch directory for each client in the cluster, and then for each
    test spawn _run_tests() for each role.

    See _run_tests() for parameter documentation.
    """
    is_client = misc.is_type('client')
    client_remotes = {}
    created_mountpoint = {}
    for remote, roles_for_host in ctx.cluster.remotes.items():
        for role in roles_for_host:
            if is_client(role):
                client_remotes[role] = remote
                created_mountpoint[role] = _make_scratch_dir(ctx, role, subdir)

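    # run one workunit at a time, but across all clients in parallel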
    for unit in tests:
        with parallel() as p:
            for role, remote in client_remotes.items():
                p.spawn(_run_tests, ctx, refspec, role, [unit], env, subdir,
                        timeout=timeout)

    # cleanup the generated client directories
    for role, _ in client_remotes.items():
        _delete_dir(ctx, role, created_mountpoint[role])


def _run_tests(ctx, refspec, role, tests, env, subdir=None, timeout=None):
    """
    Run the individual test. Create a scratch directory and then extract the
    workunits from git. Make the executables, and then run the tests.
    Clean up (remove files created) after the tests are finished.

    :param ctx:     Context
    :param refspec: branch, sha1, or version tag used to identify this
                    build
    :param tests:   specific tests specified.
    :param env:     environment set in yaml file. Could be None.
    :param subdir:  subdirectory set in yaml file. Could be None
    :param timeout: If present, use the 'timeout' command on the remote host
                    to limit execution time. Must be specified by a number
                    followed by 's' for seconds, 'm' for minutes, 'h' for
                    hours, or 'd' for days. If '0' or anything that evaluates
                    to False is passed, the 'timeout' command is not used.
    """
    testdir = misc.get_testdir(ctx)
    assert isinstance(role, basestring)
    cluster, type_, id_ = misc.split_role(role)
    assert type_ == 'client'
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    # subdir so we can remove and recreate this a lot without sudo
    if subdir is None:
        scratch_tmp = os.path.join(mnt, 'client.{id}'.format(id=id_), 'tmp')
    else:
        scratch_tmp = os.path.join(mnt, subdir)
    clonedir = '{tdir}/clone.{role}'.format(tdir=testdir, role=role)
    srcdir = '{cdir}/qa/workunits'.format(cdir=clonedir)

    git_url = teuth_config.get_ceph_qa_suite_git_url()
    # if we are running an upgrade test, ceph-ci may not have branches like
    # `jewel`, so fall back to ceph.git as an alternative if the clone fails.
    try:
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(git_url, clonedir))
    except CommandFailedError:
        if not git_url.endswith('/ceph-ci.git'):
            raise
        alt_git_url = git_url.replace('/ceph-ci.git', '/ceph.git')
        log.info(
            "failed to check out '%s' from %s; will also try in %s",
            refspec,
            git_url,
            alt_git_url,
        )
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(alt_git_url, clonedir))
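    # run 'make' if the workunits directory has a Makefile, then record the
    # NUL-separated list of executable workunit scripts for matching below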
    remote.run(
        logger=log.getChild(role),
        args=[
            'cd', '--', srcdir,
            run.Raw('&&'),
            'if', 'test', '-e', 'Makefile', run.Raw(';'), 'then', 'make', run.Raw(';'), 'fi',
            run.Raw('&&'),
            'find', '-executable', '-type', 'f', '-printf', r'%P\0',
            run.Raw('>{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)),
        ],
    )

    workunits_file = '{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)
    workunits = sorted(misc.get_file(remote, workunits_file).split('\0'))
    assert workunits

    try:
        assert isinstance(tests, list)
        for spec in tests:
            log.info('Running workunits matching %s on %s...', spec, role)
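            # a spec matches either a single script (exact name) or every
            # script under a directory (prefix match)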
            prefix = '{spec}/'.format(spec=spec)
            to_run = [w for w in workunits if w == spec or w.startswith(prefix)]
            if not to_run:
                raise RuntimeError('Spec did not match any workunits: {spec!r}'.format(spec=spec))
            for workunit in to_run:
                log.info('Running workunit %s...', workunit)
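                # Each workunit script runs from scratch_tmp with the Ceph test
                # environment exported; the assembled command is roughly:
                #   mkdir -p -- <scratch_tmp> && cd -- <scratch_tmp> &&
                #   CEPH_CLI_TEST_DUP_COMMAND=1 CEPH_REF=<ref> TESTDIR=<testdir> ... \
                #   adjust-ulimits ceph-coverage <testdir>/archive/coverage \
                #   [timeout <timeout>] <srcdir>/<workunit>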
                args = [
                    'mkdir', '-p', '--', scratch_tmp,
                    run.Raw('&&'),
                    'cd', '--', scratch_tmp,
                    run.Raw('&&'),
                    run.Raw('CEPH_CLI_TEST_DUP_COMMAND=1'),
                    run.Raw('CEPH_REF={ref}'.format(ref=refspec)),
                    run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)),
                    run.Raw('CEPH_ARGS="--cluster {0}"'.format(cluster)),
                    run.Raw('CEPH_ID="{id}"'.format(id=id_)),
                    run.Raw('PATH=$PATH:/usr/sbin'),
                    run.Raw('CEPH_BASE={dir}'.format(dir=clonedir)),
                ]
                if env is not None:
                    for var, val in env.iteritems():
                        quoted_val = pipes.quote(val)
                        env_arg = '{var}={val}'.format(var=var, val=quoted_val)
                        args.append(run.Raw(env_arg))
                args.extend([
                    'adjust-ulimits',
                    'ceph-coverage',
                    '{tdir}/archive/coverage'.format(tdir=testdir)])
                if timeout and timeout != '0':
                    args.extend(['timeout', timeout])
                args.extend([
                    '{srcdir}/{workunit}'.format(
                        srcdir=srcdir,
                        workunit=workunit,
                    ),
                ])
                remote.run(
                    logger=log.getChild(role),
                    args=args,
                    label="workunit test {workunit}".format(workunit=workunit)
                )
                remote.run(
                    logger=log.getChild(role),
                    args=['sudo', 'rm', '-rf', '--', scratch_tmp],
                )
    finally:
        log.info('Stopping %s on %s...', tests, role)
        remote.run(
            logger=log.getChild(role),
            args=[
                'rm', '-rf', '--', workunits_file, clonedir,
            ],
        )