import json
import logging
import os
import re

from shlex import split as shlex_split

from tasks.ceph_test_case import CephTestCase

from teuthology import contextutil
from teuthology.exceptions import CommandFailedError
from teuthology.orchestra import run

log = logging.getLogger(__name__)
def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    # Must return the function: without this the decorator would rebind the
    # decorated name to None.
    return f
def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    # Must return the function: without this the decorator would rebind the
    # decorated name to None.
    return f
def __init__(self, mntobj):
    """Snapshot the mount-identifying attributes of *mntobj* so they can be
    restored later via restore()."""
    for name in ('client_id', 'client_keyring_path', 'client_remote',
                 'cephfs_name', 'cephfs_mntpt', 'hostfs_mntpt'):
        setattr(self, name, getattr(mntobj, name))
def restore(self, mntobj):
    """Copy the stashed mount-identifying attributes back onto *mntobj*."""
    for name in ('client_id', 'client_keyring_path', 'client_remote',
                 'cephfs_name', 'cephfs_mntpt', 'hostfs_mntpt'):
        setattr(mntobj, name, getattr(self, name))
class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional)

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    # NOTE(review): these attribute lines were lost in the damaged source;
    # the docstring above names mount_a/mount_b, and setUp() assigns
    # mount_<letter> attributes dynamically — confirm the exact set.
    mount_a = None
    mount_b = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    # NOTE(review): setUp() compares against both of these; defaults restored
    # here because the original assignment lines are missing from this view.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # create a backup filesystem if required.
    # required REQUIRE_FILESYSTEM enabled
    REQUIRE_BACKUP_FILESYSTEM = False

    # Names of config settings loaded into float attributes during setUp
    LOAD_SETTINGS = []  # type: ignore
def _save_mount_details(self):
    """
    XXX: Tests may change details of mount objects, so let's stash them so
    that these details are restored later to ensure smooth setUps and
    tearDowns for upcoming tests.
    """
    # Snapshot each mount's identifying attributes; tearDown() restores them.
    self._orig_mount_details = [MountDetails(m) for m in self.mounts]
    log.info(self._orig_mount_details)
def _remove_blocklist(self):
    # In case anything is in the OSD blocklist list, clear it out. This is to avoid
    # the OSD map changing in the background (due to blocklist expiry) while tests run.
    try:
        self.mds_cluster.mon_manager.run_cluster_cmd(args="osd blocklist clear")
    except CommandFailedError:
        # Fallback for older Ceph cluster
        try:
            blocklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
                "dump", "--format=json-pretty"))['blocklist']
            log.info(f"Removing {len(blocklist)} blocklist entries")
            for addr, blocklisted_at in blocklist.items():
                self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blocklist", "rm", addr)
        except KeyError:
            # NOTE(review): the inner exception type was lost in the damaged
            # source; KeyError matches the ['blocklist'] lookup failing on
            # very old clusters — confirm against upstream.
            # Fallback for more older Ceph clusters, who will use 'blacklist' instead.
            blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
                "dump", "--format=json-pretty"))['blacklist']
            log.info(f"Removing {len(blacklist)} blacklist entries")
            for addr, blocklisted_at in blacklist.items():
                self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)
def setUp(self):
    """
    Reset the cluster to a known state before each test: check requirements,
    unmount all clients, destroy and recreate the default filesystem (if
    required), reset client auth caps, and remount the requested clients.
    """
    super(CephFSTestCase, self).setUp()

    self.config_set('mon', 'mon_allow_pool_delete', True)

    if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
        self.skipTest("Only have {0} MDSs, require {1}".format(
            len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
        ))

    if len(self.mounts) < self.CLIENTS_REQUIRED:
        self.skipTest("Only have {0} clients, require {1}".format(
            len(self.mounts), self.CLIENTS_REQUIRED
        ))

    if self.REQUIRE_ONE_CLIENT_REMOTE:
        if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
            self.skipTest("Require first client to be on separate server from MDSs")

    # Create friendly mount_a, mount_b attrs
    for i in range(0, self.CLIENTS_REQUIRED):
        setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])

    self.mds_cluster.clear_firewall()

    # Unmount all clients, we are about to blow away the filesystem
    for mount in self.mounts:
        if mount.is_mounted():
            mount.umount_wait(force=True)
    self._save_mount_details()

    # To avoid any issues with e.g. unlink bugs, we destroy and recreate
    # the filesystem rather than just doing a rm -rf of files
    self.mds_cluster.delete_all_filesystems()
    self.mds_cluster.mds_restart()  # to reset any run-time configs, etc.
    self.fs = None  # is now invalid!
    self.backup_fs = None
    self.recovery_fs = None

    self._remove_blocklist()

    client_mount_ids = [m.client_id for m in self.mounts]
    # In case there were any extra auth identities around from a previous
    # test, delete them
    for entry in self.auth_list():
        ent_type, ent_id = entry['entity'].split(".")
        if ent_type == "client" and ent_id not in client_mount_ids \
                and not (ent_id == "admin" or ent_id[:6] == 'mirror'):
            self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])

    if self.REQUIRE_FILESYSTEM:
        self.fs = self.mds_cluster.newfs(create=True)

        # In case some test messed with auth caps, reset them
        for client_id in client_mount_ids:
            cmd = ['auth', 'caps', f'client.{client_id}', 'mon', 'allow r',
                   'osd', f'allow rw tag cephfs data={self.fs.name}',
                   'mds', 'allow']

            # 'auth caps' succeeds when the entity already exists; move on
            # to the next client in that case.
            if self.run_cluster_cmd_result(cmd) == 0:
                continue

            # Entity missing: create it with 'auth add' using the same caps.
            cmd[1] = 'add'
            if self.run_cluster_cmd_result(cmd) != 0:
                raise RuntimeError(f'Failed to create new client {cmd[2]}')

        # wait for ranks to become active
        self.fs.wait_for_daemons()

        # Mount the requested number of clients
        for i in range(0, self.CLIENTS_REQUIRED):
            self.mounts[i].mount_wait()

    if self.REQUIRE_BACKUP_FILESYSTEM:
        if not self.REQUIRE_FILESYSTEM:
            self.skipTest("backup filesystem requires a primary filesystem as well")
        self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                            'enable_multiple', 'true',
                                            '--yes-i-really-mean-it')
        self.backup_fs = self.mds_cluster.newfs(name="backup_fs")
        self.backup_fs.wait_for_daemons()

    # Load an config settings of interest
    for setting in self.LOAD_SETTINGS:
        setattr(self, setting, float(self.fs.mds_asok(
            ['config', 'get', setting], list(self.mds_cluster.mds_ids)[0]
        )[setting]))

    self.configs_set = set()
def tearDown(self):
    """
    Undo setUp: tear down all client mounts, remove filesystems (to prevent
    failover noise during ceph-task unwind), restore stashed mount details
    and clear any ceph.conf settings applied via set_conf().
    """
    self.mds_cluster.clear_firewall()
    for m in self.mounts:
        m.teardown()

    # To prevent failover messages during Unwind of ceph task
    self.mds_cluster.delete_all_filesystems()

    # Restore the mount attributes saved by _save_mount_details() in setUp.
    for m, md in zip(self.mounts, self._orig_mount_details):
        md.restore(m)

    for subsys, key in self.configs_set:
        self.mds_cluster.clear_ceph_conf(subsys, key)

    return super(CephFSTestCase, self).tearDown()
def set_conf(self, subsys, key, value):
    """Apply a ceph.conf setting, recording it so tearDown() can clear it."""
    entry = (subsys, key)
    self.configs_set.add(entry)
    self.mds_cluster.set_ceph_conf(subsys, key, value)
def auth_list(self):
    """
    Convenience wrapper on "ceph auth ls"

    Returns the decoded 'auth_dump' list of entity records.
    """
    return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
        "auth", "ls", "--format=json-pretty"
    ))['auth_dump']
def assert_session_count(self, expected, ls_data=None, mds_id=None):
    """
    Assert that the number of live (not 'killing') MDS sessions equals
    *expected*.  Fetches 'session ls' from *mds_id* when *ls_data* is
    not supplied by the caller.
    """
    if ls_data is None:
        ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id)

    alive_count = len([s for s in ls_data if s['state'] != 'killing'])

    self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format(
        expected, alive_count
    ))
def assert_session_state(self, client_id, expected_state):
    """
    Assert that the session for *client_id* is in *expected_state*;
    a missing session compares as state None.
    """
    self.assertEqual(
        self._session_by_id(
            self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
        expected_state)
def get_session_data(self, client_id):
    """Delegate to _session_by_id for *client_id*.

    NOTE(review): _session_by_id elsewhere takes a session *list*, yet this
    passes a client id — looks suspicious; confirm against callers.
    """
    result = self._session_by_id(client_id)
    return result
def _session_list(self):
    """Return the MDS sessions that are neither stale nor closed."""
    ls_data = self.fs.mds_asok(['session', 'ls'])
    ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
    # Without this return the method yields None, breaking callers such as
    # wait_until_evicted() that iterate the result.
    return ls_data
def get_session(self, client_id, session_ls=None):
    """Return the session entry for *client_id*, fetching 'session ls'
    from the MDS when *session_ls* is not supplied."""
    ls = self.fs.mds_asok(['session', 'ls']) if session_ls is None else session_ls
    return self._session_by_id(ls)[client_id]
def _session_by_id(self, session_ls):
    """Index a 'session ls' result by session id."""
    return {entry['id']: entry for entry in session_ls}
def perf_dump(self, rank=None, status=None):
    """Fetch 'perf dump' output from the given MDS rank."""
    command = ['perf', 'dump']
    return self.fs.rank_asok(command, rank=rank, status=status)
def wait_until_evicted(self, client_id, timeout=30):
    """
    Poll the MDS session list until *client_id* no longer appears in it,
    failing after *timeout* seconds.
    """
    def is_client_evicted():
        ls = self._session_list()
        for s in ls:
            if s['id'] == client_id:
                return False
        return True
    self.wait_until_true(is_client_evicted, timeout)
def wait_for_daemon_start(self, daemon_ids=None):
    """
    Wait until all the daemons appear in the FSMap, either assigned
    MDS ranks or in the list of standbys
    """
    def get_daemon_names():
        return [info['name'] for info in self.mds_cluster.status().get_all()]

    if daemon_ids is None:
        daemon_ids = self.mds_cluster.mds_ids

    try:
        self.wait_until_true(
            lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
            timeout=30
        )
    except RuntimeError:
        # NOTE(review): the except line was lost in the damaged source;
        # wait_until_true appears to signal timeout via an exception —
        # confirm the exact type upstream.
        log.warning("Timeout waiting for daemons {0}, while we have {1}".format(
            daemon_ids, get_daemon_names()
        ))
        raise
def delete_mds_coredump(self, daemon_id):
    # delete coredump file, otherwise teuthology.internal.coredump will
    # catch it later and treat it as a failure.
    core_pattern = self.mds_cluster.mds_daemons[daemon_id].remote.sh(
        "sudo sysctl -n kernel.core_pattern")
    core_dir = os.path.dirname(core_pattern.strip())
    if core_dir:  # Non-default core_pattern with a directory in it
        # We have seen a core_pattern that looks like it's from teuthology's coredump
        # task, so proceed to clear out the core file
        if core_dir[0] == '|':
            # A leading '|' pipes cores to a program; there is no file to remove.
            log.info("Piped core dumps to program {0}, skip cleaning".format(core_dir[1:]))
            return

        log.info("Clearing core from directory: {0}".format(core_dir))

        # Verify that we see the expected single coredump
        ls_output = self.mds_cluster.mds_daemons[daemon_id].remote.sh([
            "cd", core_dir, run.Raw('&&'),
            "sudo", "ls", run.Raw('|'), "sudo", "xargs", "file"
        ])
        cores = [l.partition(":")[0]
                 for l in ls_output.strip().split("\n")
                 if re.match(r'.*ceph-mds.* -i +{0}'.format(daemon_id), l)]

        log.info("Enumerated cores: {0}".format(cores))
        self.assertEqual(len(cores), 1)

        log.info("Found core file {0}, deleting it".format(cores[0]))

        self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
            "cd", core_dir, run.Raw('&&'), "sudo", "rm", "-f", cores[0]
        ])
    else:
        log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")
def _get_subtrees(self, status=None, rank=None, path=None):
    """
    Return the list of authoritative subtrees (under *path*) from the given
    rank, or from all ranks when rank == "all".  Retries transient failures.
    """
    if path is None:
        path = "/"
    try:
        with contextutil.safe_while(sleep=1, tries=3) as proceed:
            while proceed():
                try:
                    if rank == "all":
                        subtrees = []
                        for r in self.fs.get_ranks(status=status):
                            s = self.fs.rank_asok(["get", "subtrees"], status=status, rank=r['rank'])
                            # Keep only subtrees this rank is authoritative for.
                            s = filter(lambda s: s['auth_first'] == r['rank'] and s['auth_second'] == -2, s)
                            subtrees += s
                    else:
                        subtrees = self.fs.rank_asok(["get", "subtrees"], status=status, rank=rank)
                    subtrees = filter(lambda s: s['dir']['path'].startswith(path), subtrees)
                    return list(subtrees)
                except CommandFailedError as e:
                    # Sometimes we get transient errors
                    if e.exitstatus == 22:
                        continue
                    else:
                        raise
    except contextutil.MaxWhileTries as e:
        raise RuntimeError(f"could not get subtree state from rank {rank}") from e
def _wait_subtrees(self, test, status=None, rank=None, timeout=30, sleep=2, action=None, path=None):
    """
    Poll until the (path, auth_first) pairs of the rank's subtrees match
    *test* exactly; run *action* between polls when given.  Raises
    RuntimeError if the desired state is not reached within *timeout*.
    """
    test = sorted(test)
    try:
        with contextutil.safe_while(sleep=sleep, tries=timeout // sleep) as proceed:
            while proceed():
                subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                filtered = sorted([(s['dir']['path'], s['auth_first']) for s in subtrees])
                log.info("%s =?= %s", filtered, test)
                if filtered == test:
                    # Confirm export_pin in output is correct:
                    for s in subtrees:
                        if s['export_pin_target'] >= 0:
                            self.assertTrue(s['export_pin_target'] == s['auth_first'])
                    return subtrees
                if action is not None:
                    action()
    except contextutil.MaxWhileTries as e:
        raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e
def _wait_until_scrub_complete(self, path="/", recursive=True, timeout=100):
    """
    Start a scrub on *path* (recursively by default) and wait for it to
    finish, logging a message on timeout.

    Bug fix: the original expression
        ["start", path] + ["recursive"] if recursive else []
    parses as (["start", path] + ["recursive"]) if recursive else [],
    so a non-recursive call sent an *empty* command to run_scrub.  The
    conditional is now parenthesized so "start" is always included.
    """
    out_json = self.fs.run_scrub(["start", path] + (["recursive"] if recursive else []))
    if not self.fs.wait_until_scrub_complete(tag=out_json["scrub_tag"],
                                             sleep=10, timeout=timeout):
        log.info("timed out waiting for scrub to complete")
def _wait_distributed_subtrees(self, count, status=None, rank=None, path=None):
    """
    Poll until at least *count* subtrees are distributed-ephemerally pinned
    to their authoritative rank; raises RuntimeError on timeout.
    """
    try:
        with contextutil.safe_while(sleep=5, tries=20) as proceed:
            while proceed():
                subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                subtrees = list(filter(lambda s: s['distributed_ephemeral_pin'] == True and
                                       s['auth_first'] == s['export_pin_target'],
                                       subtrees))
                log.info(f"len={len(subtrees)} {subtrees}")
                if len(subtrees) >= count:
                    return subtrees
    except contextutil.MaxWhileTries as e:
        raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e
def _wait_random_subtrees(self, count, status=None, rank=None, path=None):
    """
    Poll until at least *count* subtrees are randomly-ephemerally pinned
    to their authoritative rank; raises RuntimeError on timeout.
    """
    try:
        with contextutil.safe_while(sleep=5, tries=20) as proceed:
            while proceed():
                subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                subtrees = list(filter(lambda s: s['random_ephemeral_pin'] == True and
                                       s['auth_first'] == s['export_pin_target'],
                                       subtrees))
                log.info(f"len={len(subtrees)} {subtrees}")
                if len(subtrees) >= count:
                    return subtrees
    except contextutil.MaxWhileTries as e:
        raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e
def run_cluster_cmd(self, cmd):
    """Run a cluster command; a string argument is tokenized shell-style."""
    args = shlex_split(cmd) if isinstance(cmd, str) else cmd
    return self.fs.mon_manager.raw_cluster_cmd(*args)
def run_cluster_cmd_result(self, cmd):
    """Run a cluster command and return its exit status; a string argument
    is tokenized shell-style."""
    args = shlex_split(cmd) if isinstance(cmd, str) else cmd
    return self.fs.mon_manager.raw_cluster_cmd_result(*args)
def create_client(self, client_id, moncap=None, osdcap=None, mdscap=None):
    """
    Create/authorize a client entity.

    With no caps given, authorize the client on the default FS (rw on '/');
    raises RuntimeError if the default FS does not exist yet.  Otherwise
    builds an 'auth add' command from whichever mon/osd/mds caps were given
    and returns the result of 'auth get' for the new entity.
    """
    if not (moncap or osdcap or mdscap):
        if self.fs:
            return self.fs.authorize(client_id, ('/', 'rw'))
        else:
            raise RuntimeError('no caps were passed and the default FS '
                               'is not created yet to allow client auth '
                               'for it.')

    cmd = ['auth', 'add', f'client.{client_id}']
    if moncap:
        cmd += ['mon', moncap]
    if osdcap:
        cmd += ['osd', osdcap]
    if mdscap:
        cmd += ['mds', mdscap]

    self.run_cluster_cmd(cmd)
    # NOTE(review): 'self.client_name' is not assigned anywhere visible in
    # this file; this may be intended to be f'client.{client_id}' — confirm.
    return self.run_cluster_cmd(f'auth get {self.client_name}')