"""
Workunit task -- Run ceph on sets of specific clients
"""
11 from tasks
.util
import get_remote_for_role
12 from tasks
.util
.workunit
import get_refspec_after_overrides
14 from teuthology
import misc
15 from teuthology
.config
import config
as teuth_config
16 from teuthology
.orchestra
.run
import CommandFailedError
17 from teuthology
.parallel
import parallel
18 from teuthology
.orchestra
import run
# Module-level logger; per-role child loggers (log.getChild(role)) are
# derived from it when running commands on remote hosts.
log = logging.getLogger(__name__)
def task(ctx, config):
    """
    Run ceph on all workunits found under the specified path.

    For example::

        tasks:
        - ceph:
        - ceph-fuse: [client.0]
        - workunit:
            clients:
              client.0: [direct_io, xattrs.sh]
              client.1: [snaps]
            branch: foo

    You can also run a list of workunits on all clients:
        tasks:
        - ceph:
        - ceph-fuse:
        - workunit:
            tag: v0.47
            clients:
              all: [direct_io, xattrs.sh, snaps]

    If you have an "all" section it will run all the workunits
    on each client simultaneously, AFTER running any workunits specified
    for individual clients. (This prevents unintended simultaneous runs.)

    To customize tests, you can specify environment variables as a dict. You
    can also specify a time limit for each work unit (defaults to 3h):

        tasks:
        - ceph:
        - ceph-fuse:
        - workunit:
            sha1: 9b28948635b17165d17c1cf83d4a870bd138ddf6
            clients:
              all: [snaps]
            env:
              FOO: bar
              BAZ: quux
            timeout: 3h

    This task supports roles that include a ceph cluster, e.g.::

        tasks:
        - ceph:
        - workunit:
            clients:
              backup.client.0: [foo]
              client.1: [bar] # cluster is implicitly 'ceph'

    You can also specify an alternative top-level dir to 'qa/workunits', like
    'qa/standalone', with::

        tasks:
        - install:
        - workunit:
            basedir: qa/standalone
            clients:
              client.0:
                - test-ceph-helpers.sh

    :param ctx: Context
    :param config: Configuration
    """
    assert isinstance(config, dict)
    assert isinstance(config.get('clients'), dict), \
        'configuration must contain a dictionary of clients'

    # refspec (branch/tag/sha1) may be overridden by the 'overrides' section
    overrides = ctx.config.get('overrides', {})
    refspec = get_refspec_after_overrides(config, overrides)
    timeout = config.get('timeout', '3h')
    cleanup = config.get('cleanup', True)

    log.info('Pulling workunits from ref %s', refspec)

    created_mountpoint = {}

    if config.get('env') is not None:
        assert isinstance(config['env'], dict), 'env must be a dictionary'
    clients = config['clients']

    # Create scratch dirs for any non-all workunits
    log.info('Making a separate scratch dir for every client...')
    for role in clients.keys():
        assert isinstance(role, six.string_types)
        if role == "all":
            continue

        assert 'client' in role
        created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
        # remember whether we created the mountpoint so we know to remove it
        created_mountpoint[role] = created_mnt_dir

    # Execute any non-all workunits
    log.info("timeout={}".format(timeout))
    log.info("cleanup={}".format(cleanup))
    with parallel() as p:
        for role, tests in clients.items():
            if role != "all":
                p.spawn(_run_tests, ctx, refspec, role, tests,
                        config.get('env'),
                        basedir=config.get('basedir','qa/workunits'),
                        timeout=timeout,
                        cleanup=cleanup,
                        coverage_and_limits=not config.get('no_coverage_and_limits', None))

    # Clean up dirs from any non-all workunits
    for role, created in created_mountpoint.items():
        _delete_dir(ctx, role, created)

    # Execute any 'all' workunits -- these run AFTER the per-client ones
    if 'all' in clients:
        all_tasks = clients["all"]
        _spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
                              config.get('basedir', 'qa/workunits'),
                              config.get('subdir'), timeout=timeout,
                              cleanup=cleanup)
def _client_mountpoint(ctx, cluster, id_):
    """
    Return the path where workunits for this client are expected to find
    their mounted filesystem.
    """
    # for compatibility with tasks like ceph-fuse that aren't cluster-aware yet,
    # only include the cluster name in the dir if the cluster is not 'ceph'
    mount_name = ('mnt.{0}'.format(id_) if cluster == 'ceph'
                  else 'mnt.{0}.{1}'.format(cluster, id_))
    return os.path.join(misc.get_testdir(ctx), mount_name)
def _delete_dir(ctx, role, created_mountpoint):
    """
    Delete file used by this role, and delete the directory that this
    role appeared in.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    :param created_mountpoint: True if _make_scratch_dir() created the
                               mount point itself, so it should be removed
                               here as well.
    """
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    client = os.path.join(mnt, 'client.{id}'.format(id=id_))

    # Remove the directory inside the mount where the workunit ran
    remote.run(
        args=[
            'sudo',
            'rm',
            '-rf',
            '--',
            client,
        ],
    )
    log.info("Deleted dir {dir}".format(dir=client))

    # If the mount was an artificially created dir, delete that too
    if created_mountpoint:
        remote.run(
            args=[
                'rmdir',
                '--',
                mnt,
            ],
        )
        log.info("Deleted artificial mount point {dir}".format(dir=client))
def _make_scratch_dir(ctx, role, subdir):
    """
    Make scratch directories for this role.  This also makes the mount
    point if that directory does not exist.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    :param subdir: use this subdir (False if not used)
    :returns: True if this call had to create the mount point directory
              itself (the caller passes that to _delete_dir() later).
    """
    created_mountpoint = False
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    dir_owner = remote.user
    mnt = _client_mountpoint(ctx, cluster, id_)
    # if neither kclient nor ceph-fuse are required for a workunit,
    # mnt may not exist. Stat and create the directory if it doesn't.
    try:
        remote.run(
            args=[
                'stat',
                '--',
                mnt,
            ],
        )
        log.info('Did not need to create dir {dir}'.format(dir=mnt))
    except CommandFailedError:
        remote.run(
            args=[
                'mkdir',
                '--',
                mnt,
            ],
        )
        log.info('Created dir {dir}'.format(dir=mnt))
        created_mountpoint = True

    if not subdir:
        subdir = 'client.{id}'.format(id=id_)

    if created_mountpoint:
        # fresh artificial mountpoint: plain mkdir is sufficient
        remote.run(
            args=[
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'mkdir',
                '--',
                subdir,
            ],
        )
    else:
        remote.run(
            args=[
                # cd first so this will fail if the mount point does
                # not exist; pure install -d will silently do the
                # wrong thing
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'sudo',
                'install',
                '-d',
                '-m', '0755',
                '--owner={user}'.format(user=dir_owner),
                '--',
                subdir,
            ],
        )

    return created_mountpoint
def _spawn_on_all_clients(ctx, refspec, tests, env, basedir, subdir, timeout=None, cleanup=True):
    """
    Make a scratch directory for each client in the cluster, and then for each
    test spawn _run_tests() for each role.

    See _run_tests() for parameter documentation.
    """
    is_client = misc.is_type('client')
    client_remotes = {}
    created_mountpoint = {}
    for remote, roles_for_host in ctx.cluster.remotes.items():
        for role in roles_for_host:
            if is_client(role):
                client_remotes[role] = remote
                created_mountpoint[role] = _make_scratch_dir(ctx, role, subdir)

    # run each test on every client in parallel; tests themselves run
    # sequentially so an "all" list never overlaps with itself
    for unit in tests:
        with parallel() as p:
            for role, remote in client_remotes.items():
                p.spawn(_run_tests, ctx, refspec, role, [unit], env,
                        basedir,
                        subdir,
                        timeout=timeout,
                        cleanup=cleanup)

    # cleanup the generated client directories
    if cleanup:
        for role, _ in client_remotes.items():
            _delete_dir(ctx, role, created_mountpoint[role])
def _run_tests(ctx, refspec, role, tests, env, basedir,
               subdir=None, timeout=None, cleanup=True,
               coverage_and_limits=True):
    """
    Run the individual test. Create a scratch directory and then extract the
    workunits from git. Make the executables, and then run the tests.
    Clean up (remove files created) after the tests are finished.

    :param ctx:     Context
    :param refspec: branch, sha1, or version tag used to identify this
                    build
    :param tests:   specific tests specified.
    :param env:     environment set in yaml file.  Could be None.
    :param subdir:  subdirectory set in yaml file.  Could be None
    :param timeout: If present, use the 'timeout' command on the remote host
                    to limit execution time. Must be specified by a number
                    followed by 's' for seconds, 'm' for minutes, 'h' for
                    hours, or 'd' for days. If '0' or anything that evaluates
                    to False is passed, the 'timeout' command is not used.
    """
    testdir = misc.get_testdir(ctx)
    assert isinstance(role, six.string_types)
    cluster, type_, id_ = misc.split_role(role)
    assert type_ == 'client'
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    # subdir so we can remove and recreate this a lot without sudo
    if subdir is None:
        scratch_tmp = os.path.join(mnt, 'client.{id}'.format(id=id_), 'tmp')
    else:
        scratch_tmp = os.path.join(mnt, subdir)
    clonedir = '{tdir}/clone.{role}'.format(tdir=testdir, role=role)
    srcdir = '{cdir}/{basedir}'.format(cdir=clonedir,
                                       basedir=basedir)

    git_url = teuth_config.get_ceph_qa_suite_git_url()
    # if we are running an upgrade test, and ceph-ci does not have branches like
    # `jewel`, so should use ceph.git as an alternative.
    try:
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(git_url, clonedir))
    except CommandFailedError:
        # fall back from ceph-ci.git to ceph.git before giving up
        if git_url.endswith('/ceph-ci.git'):
            alt_git_url = git_url.replace('/ceph-ci.git', '/ceph.git')
        elif git_url.endswith('/ceph-ci'):
            alt_git_url = re.sub(r'/ceph-ci$', '/ceph.git', git_url)
        else:
            raise
        log.info(
            "failed to check out '%s' from %s; will also try in %s",
            refspec,
            git_url,
            alt_git_url,
        )
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(alt_git_url, clonedir))
    # build (if a Makefile exists) and enumerate every executable workunit
    remote.run(
        logger=log.getChild(role),
        args=[
            'cd', '--', srcdir,
            run.Raw('&&'),
            'if', 'test', '-e', 'Makefile', run.Raw(';'), 'then', 'make', run.Raw(';'), 'fi',
            run.Raw('&&'),
            'find', '-executable', '-type', 'f', '-printf', r'%P\0',
            run.Raw('>{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)),
        ],
    )
    workunits_file = '{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)
    # the list is NUL-delimited (see -printf '%P\0' above)
    workunits = sorted(six.ensure_str(misc.get_file(remote, workunits_file)).split('\0'))
    assert workunits

    try:
        assert isinstance(tests, list)
        for spec in tests:
            log.info('Running workunits matching %s on %s...', spec, role)
            # a spec matches itself or any workunit under it as a directory
            prefix = '{spec}/'.format(spec=spec)
            to_run = [w for w in workunits if w == spec or w.startswith(prefix)]
            if not to_run:
                raise RuntimeError('Spec did not match any workunits: {spec!r}'.format(spec=spec))
            for workunit in to_run:
                log.info('Running workunit %s...', workunit)
                args = [
                    'mkdir', '-p', '--', scratch_tmp,
                    run.Raw('&&'),
                    'cd', '--', scratch_tmp,
                    run.Raw('&&'),
                    run.Raw('CEPH_CLI_TEST_DUP_COMMAND=1'),
                    run.Raw('CEPH_REF={ref}'.format(ref=refspec)),
                    run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)),
                    run.Raw('CEPH_ARGS="--cluster {0}"'.format(cluster)),
                    run.Raw('CEPH_ID="{id}"'.format(id=id_)),
                    run.Raw('PATH=$PATH:/usr/sbin'),
                    run.Raw('CEPH_BASE={dir}'.format(dir=clonedir)),
                    run.Raw('CEPH_ROOT={dir}'.format(dir=clonedir)),
                ]
                if env is not None:
                    # quote values so they survive the remote shell
                    for var, val in env.items():
                        quoted_val = pipes.quote(val)
                        env_arg = '{var}={val}'.format(var=var, val=quoted_val)
                        args.append(run.Raw(env_arg))
                if coverage_and_limits:
                    args.extend([
                        'adjust-ulimits',
                        'ceph-coverage',
                        '{tdir}/archive/coverage'.format(tdir=testdir)])
                if timeout and timeout != '0':
                    args.extend(['timeout', timeout])
                args.extend([
                    '{srcdir}/{workunit}'.format(
                        srcdir=srcdir,
                        workunit=workunit,
                    ),
                ])
                remote.run(
                    logger=log.getChild(role),
                    args=args,
                    label="workunit test {workunit}".format(workunit=workunit)
                )
                if cleanup:
                    args=['sudo', 'rm', '-rf', '--', scratch_tmp]
                    remote.run(logger=log.getChild(role), args=args, timeout=(60*60))
    finally:
        log.info('Stopping %s on %s...', tests, role)
        args=['sudo', 'rm', '-rf', '--', workunits_file, clonedir]
        # N.B. don't cleanup scratch_tmp! If the mount is broken then rm will hang.
        remote.run(
            logger=log.getChild(role),
            args=args,
        )
),