3 from tasks
.ceph_test_case
import CephTestCase
7 from tasks
.cephfs
.fuse_mount
import FuseMount
9 from teuthology
import contextutil
10 from teuthology
.orchestra
import run
11 from teuthology
.orchestra
.run
import CommandFailedError
12 from teuthology
.contextutil
import safe_while
15 log
= logging
.getLogger(__name__
)
def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    # BUG FIX: the decorator must return the wrapped function; without this,
    # decorated tests would be replaced by None.
    return f
def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    # BUG FIX: return the wrapped function so the decorator is usable.
    return f
class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional)

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    # NOTE(review): these placeholders were not visible in this chunk but are
    # required — setUp() sets mount_a/mount_b via setattr; confirm against upstream.
    mount_a = None
    mount_b = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    # NOTE(review): CLIENTS_REQUIRED/MDSS_REQUIRED are read by setUp() but their
    # definitions were dropped from this chunk; restored with the conventional
    # default of 1 — confirm.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_KCLIENT_REMOTE = False
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # requires REQUIRE_FILESYSTEM = True
    REQUIRE_RECOVERY_FILESYSTEM = False

    LOAD_SETTINGS = []  # type: ignore
64 super(CephFSTestCase
, self
).setUp()
66 if len(self
.mds_cluster
.mds_ids
) < self
.MDSS_REQUIRED
:
67 self
.skipTest("Only have {0} MDSs, require {1}".format(
68 len(self
.mds_cluster
.mds_ids
), self
.MDSS_REQUIRED
71 if len(self
.mounts
) < self
.CLIENTS_REQUIRED
:
72 self
.skipTest("Only have {0} clients, require {1}".format(
73 len(self
.mounts
), self
.CLIENTS_REQUIRED
76 if self
.REQUIRE_KCLIENT_REMOTE
:
77 if not isinstance(self
.mounts
[0], FuseMount
) or not isinstance(self
.mounts
[1], FuseMount
):
78 # kclient kill() power cycles nodes, so requires clients to each be on
80 if self
.mounts
[0].client_remote
.hostname
== self
.mounts
[1].client_remote
.hostname
:
81 self
.skipTest("kclient clients must be on separate nodes")
83 if self
.REQUIRE_ONE_CLIENT_REMOTE
:
84 if self
.mounts
[0].client_remote
.hostname
in self
.mds_cluster
.get_mds_hostnames():
85 self
.skipTest("Require first client to be on separate server from MDSs")
87 # Create friendly mount_a, mount_b attrs
88 for i
in range(0, self
.CLIENTS_REQUIRED
):
89 setattr(self
, "mount_{0}".format(chr(ord('a') + i
)), self
.mounts
[i
])
91 self
.mds_cluster
.clear_firewall()
93 # Unmount all clients, we are about to blow away the filesystem
94 for mount
in self
.mounts
:
95 if mount
.is_mounted():
96 mount
.umount_wait(force
=True)
98 # To avoid any issues with e.g. unlink bugs, we destroy and recreate
99 # the filesystem rather than just doing a rm -rf of files
100 self
.mds_cluster
.delete_all_filesystems()
101 self
.mds_cluster
.mds_restart() # to reset any run-time configs, etc.
102 self
.fs
= None # is now invalid!
103 self
.recovery_fs
= None
105 # In case anything is in the OSD blacklist list, clear it out. This is to avoid
106 # the OSD map changing in the background (due to blacklist expiry) while tests run.
108 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd", "blacklist", "clear")
109 except CommandFailedError
:
110 # Fallback for older Ceph cluster
111 blacklist
= json
.loads(self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd",
112 "dump", "--format=json-pretty"))['blacklist']
113 log
.info("Removing {0} blacklist entries".format(len(blacklist
)))
114 for addr
, blacklisted_at
in blacklist
.items():
115 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd", "blacklist", "rm", addr
)
117 client_mount_ids
= [m
.client_id
for m
in self
.mounts
]
118 # In case the test changes the IDs of clients, stash them so that we can
120 self
._original
_client
_ids
= client_mount_ids
121 log
.info(client_mount_ids
)
123 # In case there were any extra auth identities around from a previous
125 for entry
in self
.auth_list():
126 ent_type
, ent_id
= entry
['entity'].split(".")
127 if ent_type
== "client" and ent_id
not in client_mount_ids
and ent_id
!= "admin":
128 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("auth", "del", entry
['entity'])
130 if self
.REQUIRE_FILESYSTEM
:
131 self
.fs
= self
.mds_cluster
.newfs(create
=True)
133 # In case some test messed with auth caps, reset them
134 for client_id
in client_mount_ids
:
135 self
.mds_cluster
.mon_manager
.raw_cluster_cmd_result(
136 'auth', 'caps', "client.{0}".format(client_id
),
139 'osd', 'allow rw pool={0}'.format(self
.fs
.get_data_pool_name()))
141 # wait for ranks to become active
142 self
.fs
.wait_for_daemons()
144 # Mount the requested number of clients
145 for i
in range(0, self
.CLIENTS_REQUIRED
):
146 self
.mounts
[i
].mount_wait()
148 if self
.REQUIRE_RECOVERY_FILESYSTEM
:
149 if not self
.REQUIRE_FILESYSTEM
:
150 self
.skipTest("Recovery filesystem requires a primary filesystem as well")
151 self
.fs
.mon_manager
.raw_cluster_cmd('fs', 'flag', 'set',
152 'enable_multiple', 'true',
153 '--yes-i-really-mean-it')
154 self
.recovery_fs
= self
.mds_cluster
.newfs(name
="recovery_fs", create
=False)
155 self
.recovery_fs
.set_metadata_overlay(True)
156 self
.recovery_fs
.set_data_pool_name(self
.fs
.get_data_pool_name())
157 self
.recovery_fs
.create()
158 self
.recovery_fs
.getinfo(refresh
=True)
159 self
.recovery_fs
.mds_restart()
160 self
.recovery_fs
.wait_for_daemons()
162 # Load an config settings of interest
163 for setting
in self
.LOAD_SETTINGS
:
164 setattr(self
, setting
, float(self
.fs
.mds_asok(
165 ['config', 'get', setting
], list(self
.mds_cluster
.mds_ids
)[0]
168 self
.configs_set
= set()
171 self
.mds_cluster
.clear_firewall()
172 for m
in self
.mounts
:
175 for i
, m
in enumerate(self
.mounts
):
176 m
.client_id
= self
._original
_client
_ids
[i
]
178 for subsys
, key
in self
.configs_set
:
179 self
.mds_cluster
.clear_ceph_conf(subsys
, key
)
181 return super(CephFSTestCase
, self
).tearDown()
183 def set_conf(self
, subsys
, key
, value
):
184 self
.configs_set
.add((subsys
, key
))
185 self
.mds_cluster
.set_ceph_conf(subsys
, key
, value
)
189 Convenience wrapper on "ceph auth ls"
191 return json
.loads(self
.mds_cluster
.mon_manager
.raw_cluster_cmd(
192 "auth", "ls", "--format=json-pretty"
195 def assert_session_count(self
, expected
, ls_data
=None, mds_id
=None):
197 ls_data
= self
.fs
.mds_asok(['session', 'ls'], mds_id
=mds_id
)
199 alive_count
= len([s
for s
in ls_data
if s
['state'] != 'killing'])
201 self
.assertEqual(expected
, alive_count
, "Expected {0} sessions, found {1}".format(
202 expected
, alive_count
205 def assert_session_state(self
, client_id
, expected_state
):
208 self
.fs
.mds_asok(['session', 'ls'])).get(client_id
, {'state': None})['state'],
    def get_session_data(self, client_id):
        # NOTE(review): this forwards client_id as _session_by_id's session_ls
        # argument, which elsewhere receives a session *list* (see get_session).
        # Looks suspect — confirm intended semantics against callers.
        return self._session_by_id(client_id)
214 def _session_list(self
):
215 ls_data
= self
.fs
.mds_asok(['session', 'ls'])
216 ls_data
= [s
for s
in ls_data
if s
['state'] not in ['stale', 'closed']]
219 def get_session(self
, client_id
, session_ls
=None):
220 if session_ls
is None:
221 session_ls
= self
.fs
.mds_asok(['session', 'ls'])
223 return self
._session
_by
_id
(session_ls
)[client_id
]
225 def _session_by_id(self
, session_ls
):
226 return dict([(s
['id'], s
) for s
in session_ls
])
228 def wait_until_evicted(self
, client_id
, timeout
=30):
229 def is_client_evicted():
230 ls
= self
._session
_list
()
232 if s
['id'] == client_id
:
235 self
.wait_until_true(is_client_evicted
, timeout
)
237 def wait_for_daemon_start(self
, daemon_ids
=None):
239 Wait until all the daemons appear in the FSMap, either assigned
240 MDS ranks or in the list of standbys
242 def get_daemon_names():
243 return [info
['name'] for info
in self
.mds_cluster
.status().get_all()]
245 if daemon_ids
is None:
246 daemon_ids
= self
.mds_cluster
.mds_ids
249 self
.wait_until_true(
250 lambda: set(daemon_ids
) & set(get_daemon_names()) == set(daemon_ids
),
254 log
.warning("Timeout waiting for daemons {0}, while we have {1}".format(
255 daemon_ids
, get_daemon_names()
259 def delete_mds_coredump(self
, daemon_id
):
260 # delete coredump file, otherwise teuthology.internal.coredump will
261 # catch it later and treat it as a failure.
262 core_pattern
= self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.sh(
263 "sudo sysctl -n kernel.core_pattern")
264 core_dir
= os
.path
.dirname(core_pattern
.strip())
265 if core_dir
: # Non-default core_pattern with a directory in it
266 # We have seen a core_pattern that looks like it's from teuthology's coredump
267 # task, so proceed to clear out the core file
268 if core_dir
[0] == '|':
269 log
.info("Piped core dumps to program {0}, skip cleaning".format(core_dir
[1:]))
272 log
.info("Clearing core from directory: {0}".format(core_dir
))
274 # Verify that we see the expected single coredump
275 ls_output
= self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.sh([
276 "cd", core_dir
, run
.Raw('&&'),
277 "sudo", "ls", run
.Raw('|'), "sudo", "xargs", "file"
279 cores
= [l
.partition(":")[0]
280 for l
in ls_output
.strip().split("\n")
281 if re
.match(r
'.*ceph-mds.* -i +{0}'.format(daemon_id
), l
)]
283 log
.info("Enumerated cores: {0}".format(cores
))
284 self
.assertEqual(len(cores
), 1)
286 log
.info("Found core file {0}, deleting it".format(cores
[0]))
288 self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.run(args
=[
289 "cd", core_dir
, run
.Raw('&&'), "sudo", "rm", "-f", cores
[0]
292 log
.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")
294 def _get_subtrees(self
, status
=None, rank
=None, path
=None):
298 with contextutil
.safe_while(sleep
=1, tries
=3) as proceed
:
303 for r
in self
.fs
.get_ranks(status
=status
):
304 s
= self
.fs
.rank_asok(["get", "subtrees"], status
=status
, rank
=r
['rank'])
305 s
= filter(lambda s
: s
['auth_first'] == r
['rank'] and s
['auth_second'] == -2, s
)
308 subtrees
= self
.fs
.rank_asok(["get", "subtrees"], status
=status
, rank
=rank
)
309 subtrees
= filter(lambda s
: s
['dir']['path'].startswith(path
), subtrees
)
310 return list(subtrees
)
311 except CommandFailedError
as e
:
312 # Sometimes we get transient errors
313 if e
.exitstatus
== 22:
317 except contextutil
.MaxWhileTries
as e
:
318 raise RuntimeError(f
"could not get subtree state from rank {rank}") from e
320 def _wait_subtrees(self
, test
, status
=None, rank
=None, timeout
=30, sleep
=2, action
=None, path
=None):
323 with contextutil
.safe_while(sleep
=sleep
, tries
=timeout
//sleep
) as proceed
:
325 subtrees
= self
._get
_subtrees
(status
=status
, rank
=rank
, path
=path
)
326 filtered
= sorted([(s
['dir']['path'], s
['auth_first']) for s
in subtrees
])
327 log
.info("%s =?= %s", filtered
, test
)
329 # Confirm export_pin in output is correct:
331 if s
['export_pin'] >= 0:
332 self
.assertTrue(s
['export_pin'] == s
['auth_first'])
334 if action
is not None:
336 except contextutil
.MaxWhileTries
as e
:
337 raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank
)) from e
339 def _wait_until_scrub_complete(self
, path
="/", recursive
=True):
340 out_json
= self
.fs
.rank_tell(["scrub", "start", path
] + ["recursive"] if recursive
else [])
341 with
safe_while(sleep
=10, tries
=10) as proceed
:
343 out_json
= self
.fs
.rank_tell(["scrub", "status"])
344 if out_json
['status'] == "no active scrubs running":
347 def _wait_distributed_subtrees(self
, count
, status
=None, rank
=None, path
=None):
349 with contextutil
.safe_while(sleep
=5, tries
=20) as proceed
:
351 subtrees
= self
._get
_subtrees
(status
=status
, rank
=rank
, path
=path
)
352 subtrees
= list(filter(lambda s
: s
['distributed_ephemeral_pin'] == True, subtrees
))
353 log
.info(f
"len={len(subtrees)} {subtrees}")
354 if len(subtrees
) >= count
:
356 except contextutil
.MaxWhileTries
as e
:
357 raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank
)) from e
359 def _wait_random_subtrees(self
, count
, status
=None, rank
=None, path
=None):
361 with contextutil
.safe_while(sleep
=5, tries
=20) as proceed
:
363 subtrees
= self
._get
_subtrees
(status
=status
, rank
=rank
, path
=path
)
364 subtrees
= list(filter(lambda s
: s
['random_ephemeral_pin'] == True, subtrees
))
365 log
.info(f
"len={len(subtrees)} {subtrees}")
366 if len(subtrees
) >= count
:
368 except contextutil
.MaxWhileTries
as e
:
369 raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank
)) from e