1 | """ |
2 | Workunit task -- Run ceph on sets of specific clients | |
3 | """ | |
4 | import logging | |
5 | import pipes | |
6 | import os | |
7 | ||
8 | from copy import deepcopy | |
9 | from util import get_remote_for_role | |
10 | ||
11 | from teuthology import misc | |
12 | from teuthology.config import config as teuth_config | |
13 | from teuthology.orchestra.run import CommandFailedError | |
14 | from teuthology.parallel import parallel | |
15 | from teuthology.orchestra import run | |
16 | ||
17 | log = logging.getLogger(__name__) | |
18 | ||
19 | ||
class Refspec:
    """A named git ref (branch, tag, or sha1) and how to clone it."""

    def __init__(self, refspec):
        self.refspec = refspec

    def __str__(self):
        return self.refspec

    def _clone(self, git_url, clonedir, opts=None):
        if opts is None:
            opts = []
        return (['rm', '-rf', clonedir] +
                [run.Raw('&&')] +
                ['git', 'clone'] + opts +
                [git_url, clonedir])

    def _cd(self, clonedir):
        return ['cd', clonedir]

    def _checkout(self):
        return ['git', 'checkout', self.refspec]

    def clone(self, git_url, clonedir):
        """Return the argument list for a full clone plus checkout."""
        return (self._clone(git_url, clonedir) +
                [run.Raw('&&')] +
                self._cd(clonedir) +
                [run.Raw('&&')] +
                self._checkout())


class Branch(Refspec):
    """A branch ref, cloned shallowly with --branch."""

    def __init__(self, branch):
        Refspec.__init__(self, branch)

    def clone(self, git_url, clonedir):
        opts = ['--depth', '1',
                '--branch', self.refspec]
        return (self._clone(git_url, clonedir, opts) +
                [run.Raw('&&')] +
                self._cd(clonedir))


class Head(Refspec):
    """The tip of the default branch, cloned shallowly."""

    def __init__(self):
        Refspec.__init__(self, 'HEAD')

    def clone(self, git_url, clonedir):
        opts = ['--depth', '1']
        return (self._clone(git_url, clonedir, opts) +
                [run.Raw('&&')] +
                self._cd(clonedir))


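# A rough sketch of the shell each variant produces (illustration only; the
# real argument lists are built piecewise with run.Raw('&&') above):
#
#   Refspec('abc123').clone(url, dir):
#       rm -rf dir && git clone url dir && cd dir && git checkout abc123
#   Branch('foo').clone(url, dir):
#       rm -rf dir && git clone --depth 1 --branch foo url dir && cd dir
#   Head().clone(url, dir):
#       rm -rf dir && git clone --depth 1 url dir && cd dir
#
# Branch and Head can clone shallowly because they never check out an
# arbitrary commit; a bare sha1 (and, as wired up in task() below, a tag)
# takes the full clone-plus-checkout path.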
def task(ctx, config):
    """
    Run ceph on all workunits found under the specified path.

    For example::

        tasks:
        - ceph:
        - ceph-fuse: [client.0]
        - workunit:
            clients:
                client.0: [direct_io, xattrs.sh]
                client.1: [snaps]
            branch: foo

    You can also run a list of workunits on all clients::

        tasks:
        - ceph:
        - ceph-fuse:
        - workunit:
            tag: v0.47
            clients:
                all: [direct_io, xattrs.sh, snaps]

    If you have an "all" section it will run all the workunits
    on each client simultaneously, AFTER running any workunits specified
    for individual clients. (This prevents unintended simultaneous runs.)

    To customize tests, you can specify environment variables as a dict. You
    can also specify a time limit for each work unit (defaults to 3h)::

        tasks:
        - ceph:
        - ceph-fuse:
        - workunit:
            sha1: 9b28948635b17165d17c1cf83d4a870bd138ddf6
            clients:
                all: [snaps]
            env:
                FOO: bar
                BAZ: quux
            timeout: 3h

    This task supports roles that include a ceph cluster, e.g.::

        tasks:
        - ceph:
        - workunit:
            clients:
                backup.client.0: [foo]
                client.1: [bar] # cluster is implicitly 'ceph'

    :param ctx: Context
    :param config: Configuration
    """
    assert isinstance(config, dict)
    assert isinstance(config.get('clients'), dict), \
        'configuration must contain a dictionary of clients'

    # mimic the behavior of the "install" task, where the "overrides" are
    # actually the defaults of that task. in other words, if none of "sha1",
    # "tag", or "branch" is specified by a "workunit" task, we update it
    # with the information in the "workunit" sub-task nested in "overrides".
    overrides = deepcopy(ctx.config.get('overrides', {}).get('workunit', {}))
    refspecs = {'branch': Branch, 'tag': Refspec, 'sha1': Refspec}
    if any(i in config for i in refspecs.iterkeys()):
        for i in refspecs.iterkeys():
            overrides.pop(i, None)
    misc.deep_merge(config, overrides)

    for spec, cls in refspecs.iteritems():
        refspec = config.get(spec)
        if refspec:
            refspec = cls(refspec)
            break
    if refspec is None:
        refspec = Head()

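    # For example, 'branch: wip-foo' yields Branch('wip-foo') (a shallow
    # clone of that branch), 'sha1: abc123' yields Refspec('abc123') (full
    # clone plus checkout), and a config naming no ref falls back to Head()
    # (shallow clone of the default branch). Note that refspecs is a plain
    # dict, so if several refs are given at once, which one wins is
    # unspecified.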
    timeout = config.get('timeout', '3h')

    log.info('Pulling workunits from ref %s', refspec)

    created_mountpoint = {}

    if config.get('env') is not None:
        assert isinstance(config['env'], dict), 'env must be a dictionary'
    clients = config['clients']

    # Create scratch dirs for any non-all workunits
    log.info('Making a separate scratch dir for every client...')
    for role in clients.iterkeys():
        assert isinstance(role, basestring)
        if role == "all":
            continue

        assert 'client' in role
        created_mnt_dir = _make_scratch_dir(ctx, role, config.get('subdir'))
        created_mountpoint[role] = created_mnt_dir

    # Execute any non-all workunits
    with parallel() as p:
        for role, tests in clients.iteritems():
            if role != "all":
                p.spawn(_run_tests, ctx, refspec, role, tests,
                        config.get('env'), timeout=timeout)

    # Clean up dirs from any non-all workunits
    for role, created in created_mountpoint.items():
        _delete_dir(ctx, role, created)

    # Execute any 'all' workunits
    if 'all' in clients:
        all_tasks = clients["all"]
        _spawn_on_all_clients(ctx, refspec, all_tasks, config.get('env'),
                              config.get('subdir'), timeout=timeout)


def _client_mountpoint(ctx, cluster, id_):
    """
    Returns the path to the expected mountpoint for workunits running
    on some kind of filesystem.
    """
    # for compatibility with tasks like ceph-fuse that aren't cluster-aware yet,
    # only include the cluster name in the dir if the cluster is not 'ceph'
    if cluster == 'ceph':
        dir_ = 'mnt.{0}'.format(id_)
    else:
        dir_ = 'mnt.{0}.{1}'.format(cluster, id_)
    return os.path.join(misc.get_testdir(ctx), dir_)
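# (Illustrative) in the default 'ceph' cluster, role 'client.0' maps to
# '<testdir>/mnt.0'; in a second cluster named 'backup', 'backup.client.0'
# maps to '<testdir>/mnt.backup.0'.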


def _delete_dir(ctx, role, created_mountpoint):
    """
    Delete the scratch directory that this role used inside its mount,
    and remove the mountpoint itself if it was created artificially.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    """
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    client = os.path.join(mnt, 'client.{id}'.format(id=id_))

    # Remove the directory inside the mount where the workunit ran
    remote.run(
        args=[
            'sudo',
            'rm',
            '-rf',
            '--',
            client,
        ],
    )
    log.info("Deleted dir {dir}".format(dir=client))

    # If the mount was an artificially created dir, delete that too
    if created_mountpoint:
        remote.run(
            args=[
                'rmdir',
                '--',
                mnt,
            ],
        )
        log.info("Deleted artificial mount point {dir}".format(dir=mnt))
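# (Illustrative) for role 'client.0' in the default cluster, this removes
# '<testdir>/mnt.0/client.0', and also rmdirs '<testdir>/mnt.0' itself when
# that mountpoint was created by _make_scratch_dir() rather than by a mount.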


def _make_scratch_dir(ctx, role, subdir):
    """
    Make scratch directories for this role. This also makes the mount
    point if that directory does not exist.

    :param ctx: Context
    :param role: "role.#" where # is used for the role id.
    :param subdir: use this subdir (None if not used)
    """
    created_mountpoint = False
    cluster, _, id_ = misc.split_role(role)
    remote = get_remote_for_role(ctx, role)
    dir_owner = remote.user
    mnt = _client_mountpoint(ctx, cluster, id_)
    # if neither kclient nor ceph-fuse is required for a workunit,
    # mnt may not exist. Stat and create the directory if it doesn't.
    try:
        remote.run(
            args=[
                'stat',
                '--',
                mnt,
            ],
        )
        log.info('Did not need to create dir {dir}'.format(dir=mnt))
    except CommandFailedError:
        remote.run(
            args=[
                'mkdir',
                '--',
                mnt,
            ],
        )
        log.info('Created dir {dir}'.format(dir=mnt))
        created_mountpoint = True

    if not subdir:
        subdir = 'client.{id}'.format(id=id_)

    if created_mountpoint:
        remote.run(
            args=[
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'mkdir',
                '--',
                subdir,
            ],
        )
    else:
        remote.run(
            args=[
                # cd first so this will fail if the mount point does
                # not exist; pure install -d will silently do the
                # wrong thing
                'cd',
                '--',
                mnt,
                run.Raw('&&'),
                'sudo',
                'install',
                '-d',
                '-m', '0755',
                '--owner={user}'.format(user=dir_owner),
                '--',
                subdir,
            ],
        )

    return created_mountpoint
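# (Illustrative) for role 'client.0' with no custom subdir, the resulting
# scratch path is '<testdir>/mnt.0/client.0'; it is created as (or chowned
# to) the remote user so workunits can write there without sudo.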


def _spawn_on_all_clients(ctx, refspec, tests, env, subdir, timeout=None):
    """
    Make a scratch directory for each client in the cluster, and then for each
    test spawn _run_tests() for each role.

    See _run_tests() for parameter documentation.
    """
    is_client = misc.is_type('client')
    client_remotes = {}
    created_mountpoint = {}
    for remote, roles_for_host in ctx.cluster.remotes.items():
        for role in roles_for_host:
            if is_client(role):
                client_remotes[role] = remote
                created_mountpoint[role] = _make_scratch_dir(ctx, role, subdir)

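    # Run one workunit at a time across all clients: each unit is spawned on
    # every client in parallel and we wait for all of them before starting
    # the next unit (e.g. with tests=['direct_io', 'snaps'] and two clients,
    # direct_io runs on both clients first, then snaps on both).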
    for unit in tests:
        with parallel() as p:
            for role, remote in client_remotes.items():
                p.spawn(_run_tests, ctx, refspec, role, [unit], env, subdir,
                        timeout=timeout)

    # cleanup the generated client directories
    for role, _ in client_remotes.items():
        _delete_dir(ctx, role, created_mountpoint[role])


def _run_tests(ctx, refspec, role, tests, env, subdir=None, timeout=None):
    """
    Run the individual tests. Create a scratch directory and then extract the
    workunits from git. Make the executables, and then run the tests.
    Clean up (remove files created) after the tests are finished.

    :param ctx: Context
    :param refspec: branch, sha1, or version tag used to identify this
                    build
    :param role: the client role to run the tests on, e.g. 'client.0'
    :param tests: the tests to run
    :param env: environment variables set in the yaml file. Could be None.
    :param subdir: subdirectory set in the yaml file. Could be None.
    :param timeout: If present, use the 'timeout' command on the remote host
                    to limit execution time. Must be specified by a number
                    followed by 's' for seconds, 'm' for minutes, 'h' for
                    hours, or 'd' for days. If '0' or anything that evaluates
                    to False is passed, the 'timeout' command is not used.
    """
    testdir = misc.get_testdir(ctx)
    assert isinstance(role, basestring)
    cluster, type_, id_ = misc.split_role(role)
    assert type_ == 'client'
    remote = get_remote_for_role(ctx, role)
    mnt = _client_mountpoint(ctx, cluster, id_)
    # subdir so we can remove and recreate this a lot without sudo
    if subdir is None:
        scratch_tmp = os.path.join(mnt, 'client.{id}'.format(id=id_), 'tmp')
    else:
        scratch_tmp = os.path.join(mnt, subdir)
    clonedir = '{tdir}/clone.{role}'.format(tdir=testdir, role=role)
    srcdir = '{cdir}/qa/workunits'.format(cdir=clonedir)

    git_url = teuth_config.get_ceph_qa_suite_git_url()
    # if we are running an upgrade test, ceph-ci may not have branches like
    # `jewel`, so use ceph.git as an alternative.
    try:
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(git_url, clonedir))
    except CommandFailedError:
        if not git_url.endswith('/ceph-ci.git'):
            raise
        alt_git_url = git_url.replace('/ceph-ci.git', '/ceph.git')
        log.info(
            "failed to check out '%s' from %s; will also try in %s",
            refspec,
            git_url,
            alt_git_url,
        )
        remote.run(logger=log.getChild(role),
                   args=refspec.clone(alt_git_url, clonedir))
    remote.run(
        logger=log.getChild(role),
        args=[
            'cd', '--', srcdir,
            run.Raw('&&'),
            'if', 'test', '-e', 'Makefile', run.Raw(';'), 'then', 'make', run.Raw(';'), 'fi',
            run.Raw('&&'),
            'find', '-executable', '-type', 'f', '-printf', r'%P\0',
            run.Raw('>{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)),
        ],
    )

    workunits_file = '{tdir}/workunits.list.{role}'.format(tdir=testdir, role=role)
    workunits = sorted(misc.get_file(remote, workunits_file).split('\0'))
    assert workunits
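    # (Illustrative, using the task docstring's examples) a spec like
    # 'xattrs.sh' matches that one script exactly, while 'snaps' matches
    # every executable under snaps/ via the prefix check below.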

    try:
        assert isinstance(tests, list)
        for spec in tests:
            log.info('Running workunits matching %s on %s...', spec, role)
            prefix = '{spec}/'.format(spec=spec)
            to_run = [w for w in workunits if w == spec or w.startswith(prefix)]
            if not to_run:
                raise RuntimeError('Spec did not match any workunits: {spec!r}'.format(spec=spec))
            for workunit in to_run:
                log.info('Running workunit %s...', workunit)
                args = [
                    'mkdir', '-p', '--', scratch_tmp,
                    run.Raw('&&'),
                    'cd', '--', scratch_tmp,
                    run.Raw('&&'),
                    run.Raw('CEPH_CLI_TEST_DUP_COMMAND=1'),
                    run.Raw('CEPH_REF={ref}'.format(ref=refspec)),
                    run.Raw('TESTDIR="{tdir}"'.format(tdir=testdir)),
                    run.Raw('CEPH_ARGS="--cluster {0}"'.format(cluster)),
                    run.Raw('CEPH_ID="{id}"'.format(id=id_)),
                    run.Raw('PATH=$PATH:/usr/sbin'),
                    run.Raw('CEPH_BASE={dir}'.format(dir=clonedir)),
                ]
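                # Extra env vars from the yaml (e.g. FOO: bar) are
                # shell-quoted and added as VAR=value assignments ahead of
                # the command itself.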
                if env is not None:
                    for var, val in env.iteritems():
                        quoted_val = pipes.quote(val)
                        env_arg = '{var}={val}'.format(var=var, val=quoted_val)
                        args.append(run.Raw(env_arg))
                args.extend([
                    'adjust-ulimits',
                    'ceph-coverage',
                    '{tdir}/archive/coverage'.format(tdir=testdir)])
                if timeout and timeout != '0':
                    args.extend(['timeout', timeout])
                args.extend([
                    '{srcdir}/{workunit}'.format(
                        srcdir=srcdir,
                        workunit=workunit,
                    ),
                ])
                remote.run(
                    logger=log.getChild(role),
                    args=args,
                    label="workunit test {workunit}".format(workunit=workunit)
                )
                remote.run(
                    logger=log.getChild(role),
                    args=['sudo', 'rm', '-rf', '--', scratch_tmp],
                )
    finally:
        log.info('Stopping %s on %s...', tests, role)
        remote.run(
            logger=log.getChild(role),
            args=[
                'rm', '-rf', '--', workunits_file, clonedir,
            ],
        )