2 Workunit task -- Run ceph on sets of specific clients
10 from tasks
.util
import get_remote_for_role
11 from tasks
.util
.workunit
import get_refspec_after_overrides
13 from teuthology
import misc
14 from teuthology
.config
import config
as teuth_config
15 from teuthology
.exceptions
import CommandFailedError
16 from teuthology
.parallel
import parallel
17 from teuthology
.orchestra
import run
# Module-level logger; child loggers per client role are derived from it
# via log.getChild(role) in _run_tests.
log = logging.getLogger(__name__)
def task(ctx, config):
    """
    Run ceph on all workunits found under the specified path.

    For example::

        tasks:
        - ceph:
        - ceph-fuse: [client.0]
        - workunit:
            clients:
              client.0: [direct_io, xattrs.sh]

    You can also run a list of workunits on all clients::

        tasks:
        - workunit:
            clients:
              all: [direct_io, xattrs.sh, snaps]

    If you have an "all" section it will run all the workunits
    on each client simultaneously, AFTER running any workunits specified
    for individual clients. (This prevents unintended simultaneous runs.)

    To customize tests, you can specify environment variables as a dict. You
    can also specify a time limit for each work unit (defaults to 3h), a
    specific sha1 to check out, optional arguments for the found workunits,
    roles that include a ceph cluster (e.g. ``backup.client.0: [foo]``;
    cluster is implicitly 'ceph' otherwise), and an alternative top-level
    dir to 'qa/workunits' (e.g. ``basedir: qa/standalone``).

    :param ctx: Context
    :param config: Configuration
    """
    assert isinstance(config, dict)
    assert isinstance(config.get('clients'), dict), \
        'configuration must contain a dictionary of clients'

    overrides = ctx.config.get('overrides', {})
    refspec = get_refspec_after_overrides(config, overrides)
    timeout = config.get('timeout', '3h')
    cleanup = config.get('cleanup', True)

    log.info('Pulling workunits from ref %s', refspec)

    created_mountpoint = {}

    if config.get('env') is not None:
        assert isinstance(config['env'], dict), 'env must be a dictionary'
    clients = config['clients']

    # Create scratch dirs for any non-all workunits
    log.info('Making a separate scratch dir for every client...')
    for role in clients.keys():
        assert isinstance(role, str)
        # 'all' is handled separately below; it is not a real role
        if role == "all":
            continue

        assert 'client' in role
        created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
        created_mountpoint[role] = created_mnt_dir

    # Execute any non-all workunits
    log.info("timeout={}".format(timeout))
    log.info("cleanup={}".format(cleanup))
    with parallel() as p:
        for role, tests in clients.items():
            if role != "all":
                p.spawn(_run_tests, ctx, refspec, role, tests,
                        config.get('env'),
                        basedir=config.get('basedir', 'qa/workunits'),
                        subdir=config.get('subdir'),
                        timeout=timeout,
                        cleanup=cleanup,
                        coverage_and_limits=not config.get('no_coverage_and_limits', None))

    if cleanup:
        # Clean up dirs from any non-all workunits
        for role, created in created_mountpoint.items():
            _delete_dir(ctx, role, created)

    # Execute any 'all' workunits
    if 'all' in clients:
        all_tasks = clients["all"]
        _spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
                              config.get('basedir', 'qa/workunits'),
                              config.get('subdir'), timeout=timeout,
                              cleanup=cleanup)
def _client_mountpoint(ctx, cluster, id_):
    """
    Returns the path to the expected mountpoint for workunits running
    on some kind of filesystem.

    :param ctx: Context
    :param cluster: cluster name (e.g. 'ceph' or 'backup')
    :param id_: client id (the '0' in 'client.0')
    """
    # for compatibility with tasks like ceph-fuse that aren't cluster-aware yet,
    # only include the cluster name in the dir if the cluster is not 'ceph'
    if cluster == 'ceph':
        dir_ = 'mnt.{0}'.format(id_)
    else:
        dir_ = 'mnt.{0}.{1}'.format(cluster, id_)
    return os.path.join(misc.get_testdir(ctx), dir_)
def _delete_dir(ctx, role, created_mountpoint):
    """
    Delete file used by this role, and delete the directory that this
    role appeared in.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    :param created_mountpoint: True if the mount point dir was created by
                               _make_scratch_dir and should be removed too.
    """
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    client = os.path.join(mnt, 'client.{id}'.format(id=id_))

    # Remove the directory inside the mount where the workunit ran
    remote.run(
        args=[
            'sudo',
            'rm',
            '-rf',
            '--',
            client,
        ],
    )
    log.info("Deleted dir {dir}".format(dir=client))

    # If the mount was an artificially created dir, delete that too
    if created_mountpoint:
        remote.run(
            args=[
                'rmdir',
                '--',
                mnt,
            ],
        )
        log.info("Deleted artificial mount point {dir}".format(dir=client))
def _make_scratch_dir(ctx, role, subdir):
    """
    Make scratch directories for this role.  This also makes the mount
    point if that directory does not exist.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    :param subdir: use this subdir (False if not used)
    :returns: True if the mount point dir had to be created here (the
              caller then knows to remove it in _delete_dir).
    """
    created_mountpoint = False
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    dir_owner = remote.user
    mnt = _client_mountpoint(ctx, cluster, id_)
    # if neither kclient nor ceph-fuse are required for a workunit,
    # mnt may not exist. Stat and create the directory if it doesn't.
    try:
        remote.run(
            args=[
                'stat',
                '--',
                mnt,
            ],
        )
        log.info('Did not need to create dir {dir}'.format(dir=mnt))
    except CommandFailedError:
        remote.run(
            args=[
                'mkdir',
                '--',
                mnt,
            ],
        )
        log.info('Created dir {dir}'.format(dir=mnt))
        created_mountpoint = True

    if not subdir:
        subdir = 'client.{id}'.format(id=id_)

    if created_mountpoint:
        # fresh dir we own; a plain mkdir suffices
        remote.run(
            args=[
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'mkdir',
                '--',
                subdir,
            ],
        )
    else:
        # cd first so this will fail if the mount point does
        # not exist; pure install -d will silently do the
        # wrong thing
        remote.run(
            args=[
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'sudo',
                'install',
                '-d',
                '-m', '0755',
                '--owner={user}'.format(user=dir_owner),
                '--',
                subdir,
            ],
        )

    return created_mountpoint
def _spawn_on_all_clients(ctx, refspec, tests, env, basedir, subdir,
                          timeout=None, cleanup=True):
    """
    Make a scratch directory for each client in the cluster, and then for each
    test spawn _run_tests() for each role.

    See _run_tests() for parameter documentation.
    """
    is_client = misc.is_type('client')
    client_remotes = {}
    created_mountpoint = {}
    for remote, roles_for_host in ctx.cluster.remotes.items():
        for role in roles_for_host:
            # only spawn workunits on actual client roles
            if is_client(role):
                client_remotes[role] = remote
                created_mountpoint[role] = _make_scratch_dir(ctx, role, subdir)

    # Run the tests one unit at a time so every client runs the same
    # workunit simultaneously.
    for unit in tests:
        with parallel() as p:
            for role, remote in client_remotes.items():
                p.spawn(_run_tests, ctx, refspec, role, [unit], env,
                        basedir,
                        subdir,
                        timeout=timeout)

    # cleanup the generated client directories
    if cleanup:
        for role, _ in client_remotes.items():
            _delete_dir(ctx, role, created_mountpoint[role])
def _run_tests(ctx, refspec, role, tests, env, basedir,
               subdir=None, timeout=None, cleanup=True,
               coverage_and_limits=True):
    """
    Run the individual test. Create a scratch directory and then extract the
    workunits from git. Make the executables, and then run the tests.
    Clean up (remove files created) after the tests are finished.

    :param ctx:     Context
    :param refspec: branch, sha1, or version tag used to identify this
                    build
    :param role:    "cluster.client.#" role to run on
    :param tests:   specific tests specified.
    :param env:     environment set in yaml file.  Could be None.
    :param basedir: dir under the clone containing the workunits
    :param subdir:  subdirectory set in yaml file.  Could be None
    :param timeout: If present, use the 'timeout' command on the remote host
                    to limit execution time. Must be specified by a number
                    followed by 's' for seconds, 'm' for minutes, 'h' for
                    hours, or 'd' for days. If '0' or anything that evaluates
                    to False is passed, the 'timeout' command is not used.
    :param cleanup: remove the scratch dir after each workunit when True
    :param coverage_and_limits: wrap workunits with adjust-ulimits and
                    ceph-coverage when True
    """
    testdir = misc.get_testdir(ctx)
    assert isinstance(role, str)
    cluster, type_, id_ = misc.split_role(role)
    assert type_ == 'client'
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    # subdir so we can remove and recreate this a lot without sudo
    if subdir is None:
        scratch_tmp = os.path.join(mnt, 'client.{id}'.format(id=id_), 'tmp')
    else:
        scratch_tmp = os.path.join(mnt, subdir)
    clonedir = '{tdir}/clone.{role}'.format(tdir=testdir, role=role)
    srcdir = '{cdir}/{basedir}'.format(cdir=clonedir,
                                       basedir=basedir)

    git_url = teuth_config.get_ceph_qa_suite_git_url()
    # if we are running an upgrade test, and ceph-ci does not have branches like
    # `jewel`, so should use ceph.git as an alternative.
    try:
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(git_url, clonedir))
    except CommandFailedError:
        if git_url.endswith('/ceph-ci.git'):
            alt_git_url = git_url.replace('/ceph-ci.git', '/ceph.git')
        elif git_url.endswith('/ceph-ci'):
            alt_git_url = re.sub(r'/ceph-ci$', '/ceph.git', git_url)
        else:
            # no known fallback URL; surface the original failure
            raise
        log.info(
            "failed to check out '%s' from %s; will also try in %s",
            refspec,
            git_url,
            alt_git_url,
        )
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(alt_git_url, clonedir))

    # Build (if there is a Makefile) and enumerate every executable file
    # under the workunit basedir into a NUL-separated list file.
    remote.run(
        logger=log.getChild(role),
        args=[
            'cd', '--', srcdir,
            run.Raw('&&'),
            'if', 'test', '-e', 'Makefile', run.Raw(';'), 'then', 'make', run.Raw(';'), 'fi',
            run.Raw('&&'),
            'find', '-executable', '-type', 'f', '-printf', r'%P\0',
            run.Raw('>{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)),
        ],
    )

    workunits_file = '{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)
    workunits = sorted(remote.read_file(workunits_file).decode().split('\0'))
    assert workunits

    try:
        assert isinstance(tests, list)
        for spec in tests:
            # a spec may carry extra shell-style arguments after the name
            dir_or_fname, *optional_args = shlex.split(spec)
            log.info('Running workunits matching %s on %s...', dir_or_fname, role)
            # match executables named "foo" or "foo/*" with workunit named
            # "foo"
            to_run = [w for w in workunits
                      if os.path.commonpath([w, dir_or_fname]) == dir_or_fname]
            if not to_run:
                raise RuntimeError('Spec did not match any workunits: {spec!r}'.format(spec=spec))
            for workunit in to_run:
                log.info('Running workunit %s...', workunit)
                args = [
                    'mkdir', '-p', '--', scratch_tmp,
                    run.Raw('&&'),
                    'cd', '--', scratch_tmp,
                    run.Raw('&&'),
                    run.Raw('CEPH_CLI_TEST_DUP_COMMAND=1'),
                    run.Raw('CEPH_REF={ref}'.format(ref=refspec)),
                    run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)),
                    run.Raw('CEPH_ARGS="--cluster {0}"'.format(cluster)),
                    run.Raw('CEPH_ID="{id}"'.format(id=id_)),
                    run.Raw('PATH=$PATH:/usr/sbin'),
                    run.Raw('CEPH_BASE={dir}'.format(dir=clonedir)),
                    run.Raw('CEPH_ROOT={dir}'.format(dir=clonedir)),
                    run.Raw('CEPH_MNT={dir}'.format(dir=mnt)),
                ]
                if env is not None:
                    for var, val in env.items():
                        # quote so values with spaces survive the shell
                        quoted_val = pipes.quote(val)
                        env_arg = '{var}={val}'.format(var=var, val=quoted_val)
                        args.append(run.Raw(env_arg))
                if coverage_and_limits:
                    args.extend([
                        'adjust-ulimits',
                        'ceph-coverage',
                        '{tdir}/archive/coverage'.format(tdir=testdir)])
                if timeout and timeout != '0':
                    args.extend(['timeout', timeout])
                args.extend([
                    '{srcdir}/{workunit}'.format(
                        srcdir=srcdir,
                        workunit=workunit,
                    ),
                ])
                remote.run(
                    logger=log.getChild(role),
                    args=args + optional_args,
                    label="workunit test {workunit}".format(workunit=workunit)
                )
                if cleanup:
                    args = ['sudo', 'rm', '-rf', '--', scratch_tmp]
                    remote.run(logger=log.getChild(role), args=args, timeout=(60*60))
    finally:
        log.info('Stopping %s on %s...', tests, role)
        # N.B. don't cleanup scratch_tmp! If the mount is broken then rm will hang.
        remote.run(
            logger=log.getChild(role),
            args=['sudo', 'rm', '-rf', '--', workunits_file, clonedir],
        )