import json
import logging
import os
import re

from tasks.ceph_test_case import CephTestCase
from tasks.cephfs.fuse_mount import FuseMount

from teuthology import contextutil
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError
from teuthology.contextutil import safe_while

log = logging.getLogger(__name__)


def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    return f


def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    return f
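

# A minimal usage sketch for the two decorators above (hypothetical test
# class, not part of this module).  Test runners can consult the attributes
# they set, e.g. to skip is_for_teuthology tests in quick local runs, or to
# skip needs_trimming tests when the client cannot run as root:
#
#   class TestExample(CephFSTestCase):
#       @for_teuthology
#       def test_full_suite_only(self):
#           ...
#
#       @needs_trimming
#       def test_cache_trimming(self):
#           ...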


class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS.  Requires the caller to populate the fs, mount_a
    and mount_b class attributes with a Filesystem and Mounts (setting
    mount_b is optional).

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    mount_a = None
    mount_b = None
    recovery_mount = None

    # Declarative test requirements: subclasses should override these to
    # indicate their special needs.  If not met, tests will be skipped.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_KCLIENT_REMOTE = False
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # Requires REQUIRE_FILESYSTEM = True
    REQUIRE_RECOVERY_FILESYSTEM = False

    # Names of MDS settings to load into same-named float attributes in setUp
    LOAD_SETTINGS = []  # type: ignore
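
    # A sketch of how a subclass typically declares its requirements
    # (hypothetical class; the values are illustrative).  setUp() below
    # skips the test when the running cluster cannot satisfy them:
    #
    #   class TestFailover(CephFSTestCase):
    #       MDSS_REQUIRED = 2
    #       CLIENTS_REQUIRED = 1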

    def setUp(self):
        super(CephFSTestCase, self).setUp()

        self.config_set('mon', 'mon_allow_pool_delete', True)

        if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
            self.skipTest("Only have {0} MDSs, require {1}".format(
                len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
            ))

        if len(self.mounts) < self.CLIENTS_REQUIRED:
            self.skipTest("Only have {0} clients, require {1}".format(
                len(self.mounts), self.CLIENTS_REQUIRED
            ))

        if self.REQUIRE_KCLIENT_REMOTE:
            if not isinstance(self.mounts[0], FuseMount) or not isinstance(self.mounts[1], FuseMount):
                # kclient kill() power cycles nodes, so requires clients to each be on
                # their own node
                if self.mounts[0].client_remote.hostname == self.mounts[1].client_remote.hostname:
                    self.skipTest("kclient clients must be on separate nodes")

        if self.REQUIRE_ONE_CLIENT_REMOTE:
            if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
                self.skipTest("Require first client to be on separate server from MDSs")

        # Create friendly mount_a, mount_b attrs
        for i in range(0, self.CLIENTS_REQUIRED):
            setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])

        self.mds_cluster.clear_firewall()

        # Unmount all clients, we are about to blow away the filesystem
        for mount in self.mounts:
            if mount.is_mounted():
                mount.umount_wait(force=True)

        # To avoid any issues with e.g. unlink bugs, we destroy and recreate
        # the filesystem rather than just doing a rm -rf of files
        self.mds_cluster.delete_all_filesystems()
        self.mds_cluster.mds_restart()  # to reset any run-time configs, etc.
        self.fs = None  # is now invalid!
        self.recovery_fs = None

        # In case anything is in the OSD blacklist, clear it out.  This is to
        # avoid the OSD map changing in the background (due to blacklist
        # expiry) while tests run.
        try:
            self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
        except CommandFailedError:
            # Fallback for older Ceph clusters without "osd blacklist clear"
            blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
                "osd", "dump", "--format=json-pretty"))['blacklist']
            log.info("Removing {0} blacklist entries".format(len(blacklist)))
            for addr, blacklisted_at in blacklist.items():
                self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)

        client_mount_ids = [m.client_id for m in self.mounts]
        # In case the test changes the IDs of clients, stash them so that we can
        # reset in tearDown
        self._original_client_ids = client_mount_ids
        log.info(client_mount_ids)

        # In case there were any extra auth identities around from a previous
        # test, delete them
        for entry in self.auth_list():
            ent_type, ent_id = entry['entity'].split(".")
            if ent_type == "client" and ent_id not in client_mount_ids and ent_id != "admin":
                self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])

        if self.REQUIRE_FILESYSTEM:
            self.fs = self.mds_cluster.newfs(create=True)

            # In case some test messed with auth caps, reset them
            for client_id in client_mount_ids:
                self.mds_cluster.mon_manager.raw_cluster_cmd_result(
                    'auth', 'caps', "client.{0}".format(client_id),
                    'mds', 'allow',
                    'mon', 'allow r',
                    'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))

            # wait for ranks to become active
            self.fs.wait_for_daemons()

            # Mount the requested number of clients
            for i in range(0, self.CLIENTS_REQUIRED):
                self.mounts[i].mount_wait()

        if self.REQUIRE_RECOVERY_FILESYSTEM:
            if not self.REQUIRE_FILESYSTEM:
                self.skipTest("Recovery filesystem requires a primary filesystem as well")
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.recovery_fs = self.mds_cluster.newfs(name="recovery_fs", create=False)
            self.recovery_fs.set_metadata_overlay(True)
            self.recovery_fs.set_data_pool_name(self.fs.get_data_pool_name())
            self.recovery_fs.create()
            self.recovery_fs.getinfo(refresh=True)
            self.recovery_fs.mds_restart()
            self.recovery_fs.wait_for_daemons()

        # Load any config settings of interest
        for setting in self.LOAD_SETTINGS:
            setattr(self, setting, float(self.fs.mds_asok(
                ['config', 'get', setting], list(self.mds_cluster.mds_ids)[0]
            )[setting]))

        self.configs_set = set()
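
    # LOAD_SETTINGS sketch (hypothetical setting name): a subclass declaring
    # LOAD_SETTINGS = ["mds_session_timeout"] ends up with a float
    # self.mds_session_timeout attribute read from the first MDS's config.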

    def tearDown(self):
        self.mds_cluster.clear_firewall()
        for m in self.mounts:
            m.teardown()

        for i, m in enumerate(self.mounts):
            m.client_id = self._original_client_ids[i]

        for subsys, key in self.configs_set:
            self.mds_cluster.clear_ceph_conf(subsys, key)

        return super(CephFSTestCase, self).tearDown()

    def set_conf(self, subsys, key, value):
        self.configs_set.add((subsys, key))
        self.mds_cluster.set_ceph_conf(subsys, key, value)
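
    # set_conf records every (subsys, key) it touches in self.configs_set so
    # that tearDown can revert it via clear_ceph_conf.  A sketch (the setting
    # name is illustrative):
    #
    #   self.set_conf('mds', 'mds_cache_memory_limit', '4294967296')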

    def auth_list(self):
        """
        Convenience wrapper on "ceph auth ls"
        """
        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
            "auth", "ls", "--format=json-pretty"
        ))['auth_dump']

    def assert_session_count(self, expected, ls_data=None, mds_id=None):
        if ls_data is None:
            ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id)

        alive_count = len([s for s in ls_data if s['state'] != 'killing'])

        self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format(
            expected, alive_count
        ))

    def assert_session_state(self, client_id, expected_state):
        self.assertEqual(
            self._session_by_id(
                self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
            expected_state)

    def get_session_data(self, client_id):
        return self._session_by_id(self._session_list())[client_id]

    def _session_list(self):
        ls_data = self.fs.mds_asok(['session', 'ls'])
        ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
        return ls_data

    def get_session(self, client_id, session_ls=None):
        if session_ls is None:
            session_ls = self.fs.mds_asok(['session', 'ls'])

        return self._session_by_id(session_ls)[client_id]

    def _session_by_id(self, session_ls):
        return dict([(s['id'], s) for s in session_ls])
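
    # The session helpers above all operate on the JSON list returned by the
    # MDS "session ls" admin socket command.  A sketch (the client id 4150 is
    # illustrative):
    #
    #   self.assert_session_state(4150, "open")
    #   session = self.get_session(4150)
    #   log.info(session['state'])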

    def wait_until_evicted(self, client_id, timeout=30):
        def is_client_evicted():
            ls = self._session_list()
            for s in ls:
                if s['id'] == client_id:
                    return False
            return True
        self.wait_until_true(is_client_evicted, timeout)

    def wait_for_daemon_start(self, daemon_ids=None):
        """
        Wait until all the daemons appear in the FSMap, either assigned
        MDS ranks or in the list of standbys
        """
        def get_daemon_names():
            return [info['name'] for info in self.mds_cluster.status().get_all()]

        if daemon_ids is None:
            daemon_ids = self.mds_cluster.mds_ids

        try:
            self.wait_until_true(
                lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
                timeout=30
            )
        except RuntimeError:
            log.warning("Timeout waiting for daemons {0}, while we have {1}".format(
                daemon_ids, get_daemon_names()
            ))
            raise

    def delete_mds_coredump(self, daemon_id):
        # Delete the coredump file, otherwise teuthology.internal.coredump will
        # catch it later and treat it as a failure.
        core_pattern = self.mds_cluster.mds_daemons[daemon_id].remote.sh(
            "sudo sysctl -n kernel.core_pattern")
        core_dir = os.path.dirname(core_pattern.strip())
        if core_dir:  # Non-default core_pattern with a directory in it
            # We have seen a core_pattern that looks like it's from teuthology's coredump
            # task, so proceed to clear out the core file
            if core_dir[0] == '|':
                log.info("Piped core dumps to program {0}, skip cleaning".format(core_dir[1:]))
                return

            log.info("Clearing core from directory: {0}".format(core_dir))

            # Verify that we see the expected single coredump
            ls_output = self.mds_cluster.mds_daemons[daemon_id].remote.sh([
                "cd", core_dir, run.Raw('&&'),
                "sudo", "ls", run.Raw('|'), "sudo", "xargs", "file"
            ])
            cores = [l.partition(":")[0]
                     for l in ls_output.strip().split("\n")
                     if re.match(r'.*ceph-mds.* -i +{0}'.format(daemon_id), l)]

            log.info("Enumerated cores: {0}".format(cores))
            self.assertEqual(len(cores), 1)

            log.info("Found core file {0}, deleting it".format(cores[0]))

            self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                "cd", core_dir, run.Raw('&&'), "sudo", "rm", "-f", cores[0]
            ])
        else:
            log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")

    def _get_subtrees(self, status=None, rank=None, path=None):
        if path is None:
            path = "/"
        try:
            with contextutil.safe_while(sleep=1, tries=3) as proceed:
                while proceed():
                    try:
                        if rank == "all":
                            subtrees = []
                            for r in self.fs.get_ranks(status=status):
                                s = self.fs.rank_asok(["get", "subtrees"], status=status, rank=r['rank'])
                                s = filter(lambda s: s['auth_first'] == r['rank'] and s['auth_second'] == -2, s)
                                subtrees += s
                        else:
                            subtrees = self.fs.rank_asok(["get", "subtrees"], status=status, rank=rank)
                        subtrees = filter(lambda s: s['dir']['path'].startswith(path), subtrees)
                        return list(subtrees)
                    except CommandFailedError as e:
                        # Sometimes we get transient errors
                        if e.exitstatus == 22:
                            pass
                        else:
                            raise
        except contextutil.MaxWhileTries as e:
            raise RuntimeError(f"could not get subtree state from rank {rank}") from e

    def _wait_subtrees(self, test, status=None, rank=None, timeout=30, sleep=2, action=None, path=None):
        test = sorted(test)
        try:
            with contextutil.safe_while(sleep=sleep, tries=timeout//sleep) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    filtered = sorted([(s['dir']['path'], s['auth_first']) for s in subtrees])
                    log.info("%s =?= %s", filtered, test)
                    if filtered == test:
                        # Confirm export_pin in output is correct:
                        for s in subtrees:
                            if s['export_pin'] >= 0:
                                self.assertTrue(s['export_pin'] == s['auth_first'])
                        return subtrees
                    if action is not None:
                        action()
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e
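
    # A hedged usage sketch for _wait_subtrees (paths and ranks illustrative):
    # after pinning two directories to different ranks, wait for the exports
    # to complete:
    #
    #   self.mount_a.setfattr("a", "ceph.dir.pin", "0")
    #   self.mount_a.setfattr("b", "ceph.dir.pin", "1")
    #   self._wait_subtrees([('/a', 0), ('/b', 1)], rank=0)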

    def _wait_until_scrub_complete(self, path="/", recursive=True):
        # Append "recursive" to the scrub command only when requested
        out_json = self.fs.rank_tell(["scrub", "start", path] + (["recursive"] if recursive else []))
        with safe_while(sleep=10, tries=10) as proceed:
            while proceed():
                out_json = self.fs.rank_tell(["scrub", "status"])
                if out_json['status'] == "no active scrubs running":
                    break

    def _wait_distributed_subtrees(self, count, status=None, rank=None, path=None):
        try:
            with contextutil.safe_while(sleep=5, tries=20) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    subtrees = list(filter(lambda s: s['distributed_ephemeral_pin'] == True, subtrees))
                    log.info(f"len={len(subtrees)} {subtrees}")
                    if len(subtrees) >= count:
                        return subtrees
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e

    def _wait_random_subtrees(self, count, status=None, rank=None, path=None):
        try:
            with contextutil.safe_while(sleep=5, tries=20) as proceed:
                while proceed():
                    subtrees = self._get_subtrees(status=status, rank=rank, path=path)
                    subtrees = list(filter(lambda s: s['random_ephemeral_pin'] == True, subtrees))
                    log.info(f"len={len(subtrees)} {subtrees}")
                    if len(subtrees) >= count:
                        return subtrees
        except contextutil.MaxWhileTries as e:
            raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank)) from e