6 from shlex
import split
as shlex_split
8 from tasks
.ceph_test_case
import CephTestCase
10 from teuthology
import contextutil
11 from teuthology
.orchestra
import run
12 from teuthology
.exceptions
import CommandFailedError
14 log
= logging
.getLogger(__name__
)
def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    # BUG FIX: the decorator must return the function, otherwise every
    # decorated test method becomes None (the return was lost in extraction).
    return f
def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    # BUG FIX: return the function so the decorator does not replace the
    # wrapped test with None.
    return f
35 def __init__(self
, mntobj
):
36 self
.client_id
= mntobj
.client_id
37 self
.client_keyring_path
= mntobj
.client_keyring_path
38 self
.client_remote
= mntobj
.client_remote
39 self
.cephfs_name
= mntobj
.cephfs_name
40 self
.cephfs_mntpt
= mntobj
.cephfs_mntpt
41 self
.hostfs_mntpt
= mntobj
.hostfs_mntpt
43 def restore(self
, mntobj
):
44 mntobj
.client_id
= self
.client_id
45 mntobj
.client_keyring_path
= self
.client_keyring_path
46 mntobj
.client_remote
= self
.client_remote
47 mntobj
.cephfs_name
= self
.cephfs_name
48 mntobj
.cephfs_mntpt
= self
.cephfs_mntpt
49 mntobj
.hostfs_mntpt
= self
.hostfs_mntpt
class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional)

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    # NOTE(review): these defaults were lost in extraction and reconstructed;
    # setUp() populates mount_a/mount_b/... dynamically via setattr.
    mount_a = None
    mount_b = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    # NOTE(review): CLIENTS_REQUIRED and MDSS_REQUIRED were lost in extraction
    # but are referenced by setUp(); reconstructed with upstream defaults of 1.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # requires REQUIRE_FILESYSTEM = True
    REQUIRE_RECOVERY_FILESYSTEM = False

    # create a backup filesystem if required.
    # required REQUIRE_FILESYSTEM enabled
    REQUIRE_BACKUP_FILESYSTEM = False

    # Config options fetched from the MDS at setUp and attached as float
    # attributes of the test case (see the LOAD_SETTINGS loop in setUp).
    LOAD_SETTINGS = []  # type: ignore
83 def _save_mount_details(self
):
85 XXX: Tests may change details of mount objects, so let's stash them so
86 that these details are restored later to ensure smooth setUps and
87 tearDowns for upcoming tests.
89 self
._orig
_mount
_details
= [MountDetails(m
) for m
in self
.mounts
]
90 log
.info(self
._orig
_mount
_details
)
93 super(CephFSTestCase
, self
).setUp()
95 self
.config_set('mon', 'mon_allow_pool_delete', True)
97 if len(self
.mds_cluster
.mds_ids
) < self
.MDSS_REQUIRED
:
98 self
.skipTest("Only have {0} MDSs, require {1}".format(
99 len(self
.mds_cluster
.mds_ids
), self
.MDSS_REQUIRED
102 if len(self
.mounts
) < self
.CLIENTS_REQUIRED
:
103 self
.skipTest("Only have {0} clients, require {1}".format(
104 len(self
.mounts
), self
.CLIENTS_REQUIRED
107 if self
.REQUIRE_ONE_CLIENT_REMOTE
:
108 if self
.mounts
[0].client_remote
.hostname
in self
.mds_cluster
.get_mds_hostnames():
109 self
.skipTest("Require first client to be on separate server from MDSs")
111 # Create friendly mount_a, mount_b attrs
112 for i
in range(0, self
.CLIENTS_REQUIRED
):
113 setattr(self
, "mount_{0}".format(chr(ord('a') + i
)), self
.mounts
[i
])
115 self
.mds_cluster
.clear_firewall()
117 # Unmount all clients, we are about to blow away the filesystem
118 for mount
in self
.mounts
:
119 if mount
.is_mounted():
120 mount
.umount_wait(force
=True)
121 self
._save
_mount
_details
()
123 # To avoid any issues with e.g. unlink bugs, we destroy and recreate
124 # the filesystem rather than just doing a rm -rf of files
125 self
.mds_cluster
.delete_all_filesystems()
126 self
.mds_cluster
.mds_restart() # to reset any run-time configs, etc.
127 self
.fs
= None # is now invalid!
128 self
.backup_fs
= None
129 self
.recovery_fs
= None
131 # In case anything is in the OSD blocklist list, clear it out. This is to avoid
132 # the OSD map changing in the background (due to blocklist expiry) while tests run.
134 self
.mds_cluster
.mon_manager
.run_cluster_cmd(args
="osd blocklist clear")
135 except CommandFailedError
:
136 # Fallback for older Ceph cluster
137 blocklist
= json
.loads(self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd",
138 "dump", "--format=json-pretty"))['blocklist']
139 log
.info("Removing {0} blocklist entries".format(len(blocklist
)))
140 for addr
, blocklisted_at
in blocklist
.items():
141 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd", "blocklist", "rm", addr
)
143 client_mount_ids
= [m
.client_id
for m
in self
.mounts
]
144 # In case there were any extra auth identities around from a previous
146 for entry
in self
.auth_list():
147 ent_type
, ent_id
= entry
['entity'].split(".")
148 if ent_type
== "client" and ent_id
not in client_mount_ids
and not (ent_id
== "admin" or ent_id
[:6] == 'mirror'):
149 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("auth", "del", entry
['entity'])
151 if self
.REQUIRE_FILESYSTEM
:
152 self
.fs
= self
.mds_cluster
.newfs(create
=True)
154 # In case some test messed with auth caps, reset them
155 for client_id
in client_mount_ids
:
156 cmd
= ['auth', 'caps', f
'client.{client_id}', 'mon','allow r',
157 'osd', f
'allow rw pool={self.fs.get_data_pool_name()}',
160 if self
.run_cluster_cmd_result(cmd
) == 0:
164 if self
.run_cluster_cmd_result(cmd
) != 0:
165 raise RuntimeError(f
'Failed to create new client {cmd[2]}')
167 # wait for ranks to become active
168 self
.fs
.wait_for_daemons()
170 # Mount the requested number of clients
171 for i
in range(0, self
.CLIENTS_REQUIRED
):
172 self
.mounts
[i
].mount_wait()
174 if self
.REQUIRE_BACKUP_FILESYSTEM
:
175 if not self
.REQUIRE_FILESYSTEM
:
176 self
.skipTest("backup filesystem requires a primary filesystem as well")
177 self
.fs
.mon_manager
.raw_cluster_cmd('fs', 'flag', 'set',
178 'enable_multiple', 'true',
179 '--yes-i-really-mean-it')
180 self
.backup_fs
= self
.mds_cluster
.newfs(name
="backup_fs")
181 self
.backup_fs
.wait_for_daemons()
183 if self
.REQUIRE_RECOVERY_FILESYSTEM
:
184 if not self
.REQUIRE_FILESYSTEM
:
185 self
.skipTest("Recovery filesystem requires a primary filesystem as well")
186 # After Octopus is EOL, we can remove this setting:
187 self
.fs
.mon_manager
.raw_cluster_cmd('fs', 'flag', 'set',
188 'enable_multiple', 'true',
189 '--yes-i-really-mean-it')
190 self
.recovery_fs
= self
.mds_cluster
.newfs(name
="recovery_fs", create
=False)
191 self
.recovery_fs
.set_metadata_overlay(True)
192 self
.recovery_fs
.set_data_pool_name(self
.fs
.get_data_pool_name())
193 self
.recovery_fs
.create()
194 self
.recovery_fs
.getinfo(refresh
=True)
195 self
.recovery_fs
.wait_for_daemons()
197 # Load an config settings of interest
198 for setting
in self
.LOAD_SETTINGS
:
199 setattr(self
, setting
, float(self
.fs
.mds_asok(
200 ['config', 'get', setting
], list(self
.mds_cluster
.mds_ids
)[0]
203 self
.configs_set
= set()
206 self
.mds_cluster
.clear_firewall()
207 for m
in self
.mounts
:
210 # To prevent failover messages during Unwind of ceph task
211 self
.mds_cluster
.delete_all_filesystems()
213 for m
, md
in zip(self
.mounts
, self
._orig
_mount
_details
):
216 for subsys
, key
in self
.configs_set
:
217 self
.mds_cluster
.clear_ceph_conf(subsys
, key
)
219 return super(CephFSTestCase
, self
).tearDown()
221 def set_conf(self
, subsys
, key
, value
):
222 self
.configs_set
.add((subsys
, key
))
223 self
.mds_cluster
.set_ceph_conf(subsys
, key
, value
)
227 Convenience wrapper on "ceph auth ls"
229 return json
.loads(self
.mds_cluster
.mon_manager
.raw_cluster_cmd(
230 "auth", "ls", "--format=json-pretty"
233 def assert_session_count(self
, expected
, ls_data
=None, mds_id
=None):
235 ls_data
= self
.fs
.mds_asok(['session', 'ls'], mds_id
=mds_id
)
237 alive_count
= len([s
for s
in ls_data
if s
['state'] != 'killing'])
239 self
.assertEqual(expected
, alive_count
, "Expected {0} sessions, found {1}".format(
240 expected
, alive_count
243 def assert_session_state(self
, client_id
, expected_state
):
246 self
.fs
.mds_asok(['session', 'ls'])).get(client_id
, {'state': None})['state'],
249 def get_session_data(self
, client_id
):
250 return self
._session
_by
_id
(client_id
)
252 def _session_list(self
):
253 ls_data
= self
.fs
.mds_asok(['session', 'ls'])
254 ls_data
= [s
for s
in ls_data
if s
['state'] not in ['stale', 'closed']]
257 def get_session(self
, client_id
, session_ls
=None):
258 if session_ls
is None:
259 session_ls
= self
.fs
.mds_asok(['session', 'ls'])
261 return self
._session
_by
_id
(session_ls
)[client_id
]
263 def _session_by_id(self
, session_ls
):
264 return dict([(s
['id'], s
) for s
in session_ls
])
266 def perf_dump(self
, rank
=None, status
=None):
267 return self
.fs
.rank_asok(['perf', 'dump'], rank
=rank
, status
=status
)
269 def wait_until_evicted(self
, client_id
, timeout
=30):
270 def is_client_evicted():
271 ls
= self
._session
_list
()
273 if s
['id'] == client_id
:
276 self
.wait_until_true(is_client_evicted
, timeout
)
278 def wait_for_daemon_start(self
, daemon_ids
=None):
280 Wait until all the daemons appear in the FSMap, either assigned
281 MDS ranks or in the list of standbys
283 def get_daemon_names():
284 return [info
['name'] for info
in self
.mds_cluster
.status().get_all()]
286 if daemon_ids
is None:
287 daemon_ids
= self
.mds_cluster
.mds_ids
290 self
.wait_until_true(
291 lambda: set(daemon_ids
) & set(get_daemon_names()) == set(daemon_ids
),
295 log
.warning("Timeout waiting for daemons {0}, while we have {1}".format(
296 daemon_ids
, get_daemon_names()
300 def delete_mds_coredump(self
, daemon_id
):
301 # delete coredump file, otherwise teuthology.internal.coredump will
302 # catch it later and treat it as a failure.
303 core_pattern
= self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.sh(
304 "sudo sysctl -n kernel.core_pattern")
305 core_dir
= os
.path
.dirname(core_pattern
.strip())
306 if core_dir
: # Non-default core_pattern with a directory in it
307 # We have seen a core_pattern that looks like it's from teuthology's coredump
308 # task, so proceed to clear out the core file
309 if core_dir
[0] == '|':
310 log
.info("Piped core dumps to program {0}, skip cleaning".format(core_dir
[1:]))
313 log
.info("Clearing core from directory: {0}".format(core_dir
))
315 # Verify that we see the expected single coredump
316 ls_output
= self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.sh([
317 "cd", core_dir
, run
.Raw('&&'),
318 "sudo", "ls", run
.Raw('|'), "sudo", "xargs", "file"
320 cores
= [l
.partition(":")[0]
321 for l
in ls_output
.strip().split("\n")
322 if re
.match(r
'.*ceph-mds.* -i +{0}'.format(daemon_id
), l
)]
324 log
.info("Enumerated cores: {0}".format(cores
))
325 self
.assertEqual(len(cores
), 1)
327 log
.info("Found core file {0}, deleting it".format(cores
[0]))
329 self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.run(args
=[
330 "cd", core_dir
, run
.Raw('&&'), "sudo", "rm", "-f", cores
[0]
333 log
.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")
335 def _get_subtrees(self
, status
=None, rank
=None, path
=None):
339 with contextutil
.safe_while(sleep
=1, tries
=3) as proceed
:
344 for r
in self
.fs
.get_ranks(status
=status
):
345 s
= self
.fs
.rank_asok(["get", "subtrees"], status
=status
, rank
=r
['rank'])
346 s
= filter(lambda s
: s
['auth_first'] == r
['rank'] and s
['auth_second'] == -2, s
)
349 subtrees
= self
.fs
.rank_asok(["get", "subtrees"], status
=status
, rank
=rank
)
350 subtrees
= filter(lambda s
: s
['dir']['path'].startswith(path
), subtrees
)
351 return list(subtrees
)
352 except CommandFailedError
as e
:
353 # Sometimes we get transient errors
354 if e
.exitstatus
== 22:
358 except contextutil
.MaxWhileTries
as e
:
359 raise RuntimeError(f
"could not get subtree state from rank {rank}") from e
361 def _wait_subtrees(self
, test
, status
=None, rank
=None, timeout
=30, sleep
=2, action
=None, path
=None):
364 with contextutil
.safe_while(sleep
=sleep
, tries
=timeout
//sleep
) as proceed
:
366 subtrees
= self
._get
_subtrees
(status
=status
, rank
=rank
, path
=path
)
367 filtered
= sorted([(s
['dir']['path'], s
['auth_first']) for s
in subtrees
])
368 log
.info("%s =?= %s", filtered
, test
)
370 # Confirm export_pin in output is correct:
372 if s
['export_pin_target'] >= 0:
373 self
.assertTrue(s
['export_pin_target'] == s
['auth_first'])
375 if action
is not None:
377 except contextutil
.MaxWhileTries
as e
:
378 raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank
)) from e
380 def _wait_until_scrub_complete(self
, path
="/", recursive
=True, timeout
=100):
381 out_json
= self
.fs
.run_scrub(["start", path
] + ["recursive"] if recursive
else [])
382 if not self
.fs
.wait_until_scrub_complete(tag
=out_json
["scrub_tag"],
383 sleep
=10, timeout
=timeout
):
384 log
.info("timed out waiting for scrub to complete")
386 def _wait_distributed_subtrees(self
, count
, status
=None, rank
=None, path
=None):
388 with contextutil
.safe_while(sleep
=5, tries
=20) as proceed
:
390 subtrees
= self
._get
_subtrees
(status
=status
, rank
=rank
, path
=path
)
391 subtrees
= list(filter(lambda s
: s
['distributed_ephemeral_pin'] == True and
392 s
['auth_first'] == s
['export_pin_target'],
394 log
.info(f
"len={len(subtrees)} {subtrees}")
395 if len(subtrees
) >= count
:
397 except contextutil
.MaxWhileTries
as e
:
398 raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank
)) from e
400 def _wait_random_subtrees(self
, count
, status
=None, rank
=None, path
=None):
402 with contextutil
.safe_while(sleep
=5, tries
=20) as proceed
:
404 subtrees
= self
._get
_subtrees
(status
=status
, rank
=rank
, path
=path
)
405 subtrees
= list(filter(lambda s
: s
['random_ephemeral_pin'] == True and
406 s
['auth_first'] == s
['export_pin_target'],
408 log
.info(f
"len={len(subtrees)} {subtrees}")
409 if len(subtrees
) >= count
:
411 except contextutil
.MaxWhileTries
as e
:
412 raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank
)) from e
414 def run_cluster_cmd(self
, cmd
):
415 if isinstance(cmd
, str):
416 cmd
= shlex_split(cmd
)
417 return self
.fs
.mon_manager
.raw_cluster_cmd(*cmd
)
419 def run_cluster_cmd_result(self
, cmd
):
420 if isinstance(cmd
, str):
421 cmd
= shlex_split(cmd
)
422 return self
.fs
.mon_manager
.raw_cluster_cmd_result(*cmd
)
424 def create_client(self
, client_id
, moncap
=None, osdcap
=None, mdscap
=None):
425 if not (moncap
or osdcap
or mdscap
):
427 return self
.fs
.authorize(client_id
, ('/', 'rw'))
429 raise RuntimeError('no caps were passed and the default FS '
430 'is not created yet to allow client auth '
433 cmd
= ['auth', 'add', f
'client.{client_id}']
435 cmd
+= ['mon', moncap
]
437 cmd
+= ['osd', osdcap
]
439 cmd
+= ['mds', mdscap
]
441 self
.run_cluster_cmd(cmd
)
442 return self
.run_cluster_cmd(f
'auth get {self.client_name}')