import time
import json
import logging
from unittest import case
from tasks.ceph_test_case import CephTestCase
import os
import re
from StringIO import StringIO

from tasks.cephfs.fuse_mount import FuseMount

from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError


log = logging.getLogger(__name__)


def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    return f


def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    return f


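# A hypothetical illustration (not part of this file) of how test modules
# typically apply these markers:
#
#   class TestExample(CephFSTestCase):
#       @for_teuthology
#       def test_long_soak(self):
#           ...  # marked so lightweight runners can choose to skip it
#
#       @needs_trimming
#       def test_cache_trim(self):
#           ...  # declares that the client must be able to trim its cache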
class CephFSTestCase(CephTestCase):
    """
    Test case for CephFS: requires the caller to populate the Filesystem and Mounts
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional).

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    mount_a = None
    mount_b = None
    recovery_mount = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs.  If not met, tests will be skipped.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_KCLIENT_REMOTE = False
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # Whether to also create a "recovery" filesystem during setUp
    # (requires REQUIRE_FILESYSTEM = True)
    REQUIRE_RECOVERY_FILESYSTEM = False

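    # Names of MDS config settings to read (via the admin socket of the first
    # MDS) during setUp; each is stored as a float attribute of the same name.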
    LOAD_SETTINGS = []

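    # A hypothetical subclass (not part of this file) showing how the
    # declarative requirements and LOAD_SETTINGS are typically consumed:
    #
    #   class TestDemo(CephFSTestCase):
    #       CLIENTS_REQUIRED = 2
    #       MDSS_REQUIRED = 2
    #       LOAD_SETTINGS = ["mds_cache_memory_limit"]
    #
    #       def test_demo(self):
    #           self.mount_a.run_shell(["touch", "testfile"])
    #           self.assertGreater(self.mds_cache_memory_limit, 0)
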
    def setUp(self):
        super(CephFSTestCase, self).setUp()

        if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
            raise case.SkipTest("Only have {0} MDSs, require {1}".format(
                len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
            ))

        if len(self.mounts) < self.CLIENTS_REQUIRED:
            raise case.SkipTest("Only have {0} clients, require {1}".format(
                len(self.mounts), self.CLIENTS_REQUIRED
            ))

        if self.REQUIRE_KCLIENT_REMOTE:
            if not isinstance(self.mounts[0], FuseMount) or not isinstance(self.mounts[1], FuseMount):
                # kclient kill() power cycles nodes, so requires clients to each be on
                # their own node
                if self.mounts[0].client_remote.hostname == self.mounts[1].client_remote.hostname:
                    raise case.SkipTest("kclient clients must be on separate nodes")

        if self.REQUIRE_ONE_CLIENT_REMOTE:
            if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
                raise case.SkipTest("Require first client to be on separate server from MDSs")

        # Create friendly mount_a, mount_b attrs
        for i in range(0, self.CLIENTS_REQUIRED):
            setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])

        self.mds_cluster.clear_firewall()

        # Unmount all clients, we are about to blow away the filesystem
        for mount in self.mounts:
            if mount.is_mounted():
                mount.umount_wait(force=True)

        # To avoid any issues with e.g. unlink bugs, we destroy and recreate
        # the filesystem rather than just doing a rm -rf of files
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
        self.mds_cluster.delete_all_filesystems()
        self.fs = None  # is now invalid!
        self.recovery_fs = None

        # In case anything is in the OSD blacklist, clear it out.  This is to avoid
        # the OSD map changing in the background (due to blacklist expiry) while tests run.
        try:
            self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
        except CommandFailedError:
            # Fallback for older Ceph clusters that lack "osd blacklist clear":
            # remove each entry listed in "osd dump" individually
            blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
                "osd", "dump", "--format=json-pretty"))['blacklist']
            log.info("Removing {0} blacklist entries".format(len(blacklist)))
            for addr, blacklisted_at in blacklist.items():
                self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)

        client_mount_ids = [m.client_id for m in self.mounts]
        # In case the test changes the IDs of clients, stash them so that we can
        # reset in tearDown
        self._original_client_ids = client_mount_ids
        log.info(client_mount_ids)

        # In case there were any extra auth identities around from a previous
        # test, delete them
        for entry in self.auth_list():
            ent_type, ent_id = entry['entity'].split(".")
            if ent_type == "client" and ent_id not in client_mount_ids and ent_id != "admin":
                self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])

        if self.REQUIRE_FILESYSTEM:
            self.fs = self.mds_cluster.newfs(create=True)
            self.fs.mds_restart()

            # In case some test messed with auth caps, reset them
            for client_id in client_mount_ids:
                self.mds_cluster.mon_manager.raw_cluster_cmd_result(
                    'auth', 'caps', "client.{0}".format(client_id),
                    'mds', 'allow',
                    'mon', 'allow r',
                    'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))

            # wait for mds restart to complete...
            self.fs.wait_for_daemons()

            # Mount the requested number of clients
            for i in range(0, self.CLIENTS_REQUIRED):
                self.mounts[i].mount()
                self.mounts[i].wait_until_mounted()

        if self.REQUIRE_RECOVERY_FILESYSTEM:
            if not self.REQUIRE_FILESYSTEM:
                raise case.SkipTest("Recovery filesystem requires a primary filesystem as well")
            self.fs.mon_manager.raw_cluster_cmd('fs', 'flag', 'set',
                                                'enable_multiple', 'true',
                                                '--yes-i-really-mean-it')
            self.recovery_fs = self.mds_cluster.newfs(name="recovery_fs", create=False)
            self.recovery_fs.set_metadata_overlay(True)
            self.recovery_fs.set_data_pool_name(self.fs.get_data_pool_name())
            self.recovery_fs.create()
            self.recovery_fs.getinfo(refresh=True)
            self.recovery_fs.mds_restart()
            self.recovery_fs.wait_for_daemons()

        # Load any config settings of interest
        for setting in self.LOAD_SETTINGS:
            setattr(self, setting, float(self.fs.mds_asok(
                ['config', 'get', setting], self.mds_cluster.mds_ids[0]
            )[setting]))

        self.configs_set = set()

    def tearDown(self):
        super(CephFSTestCase, self).tearDown()

        self.mds_cluster.clear_firewall()
        for m in self.mounts:
            m.teardown()

        for i, m in enumerate(self.mounts):
            m.client_id = self._original_client_ids[i]

        for subsys, key in self.configs_set:
            self.mds_cluster.clear_ceph_conf(subsys, key)

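    # Record a ceph.conf override so that tearDown can clear it again afterwards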
    def set_conf(self, subsys, key, value):
        self.configs_set.add((subsys, key))
        self.mds_cluster.set_ceph_conf(subsys, key, value)

    def auth_list(self):
        """
        Convenience wrapper on "ceph auth ls"
        """
        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
            "auth", "ls", "--format=json-pretty"
        ))['auth_dump']

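    # Helpers for inspecting client sessions via "session ls" on the MDS admin socket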
    def assert_session_count(self, expected, ls_data=None, mds_id=None):
        if ls_data is None:
            ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id)

        alive_count = len([s for s in ls_data if s['state'] != 'killing'])

        self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format(
            expected, alive_count
        ))

    def assert_session_state(self, client_id, expected_state):
        self.assertEqual(
            self._session_by_id(
                self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
            expected_state)

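    # Return the full session dict for the given client id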
    def get_session_data(self, client_id):
        return self.get_session(client_id)

    def _session_list(self):
        ls_data = self.fs.mds_asok(['session', 'ls'])
        ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
        return ls_data

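    # Look up one client's session, fetching a fresh "session ls" if none is supplied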
    def get_session(self, client_id, session_ls=None):
        if session_ls is None:
            session_ls = self.fs.mds_asok(['session', 'ls'])

        return self._session_by_id(session_ls)[client_id]

    def _session_by_id(self, session_ls):
        return dict([(s['id'], s) for s in session_ls])

    def wait_for_daemon_start(self, daemon_ids=None):
        """
        Wait until all the daemons appear in the FSMap, either assigned
        MDS ranks or in the list of standbys
        """
        def get_daemon_names():
            return [info['name'] for info in self.mds_cluster.status().get_all()]

        if daemon_ids is None:
            daemon_ids = self.mds_cluster.mds_ids

        try:
            self.wait_until_true(
                lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
                timeout=30
            )
        except RuntimeError:
            log.warn("Timeout waiting for daemons {0}, while we have {1}".format(
                daemon_ids, get_daemon_names()
            ))
            raise

    def delete_mds_coredump(self, daemon_id):
        # delete coredump file, otherwise teuthology.internal.coredump will
        # catch it later and treat it as a failure.
        p = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
            "sudo", "sysctl", "-n", "kernel.core_pattern"], stdout=StringIO())
        core_dir = os.path.dirname(p.stdout.getvalue().strip())
        if core_dir:  # Non-default core_pattern with a directory in it
            # We have seen a core_pattern that looks like it's from teuthology's coredump
            # task, so proceed to clear out the core file
            log.info("Clearing core from directory: {0}".format(core_dir))

            # Verify that we see the expected single coredump
            ls_proc = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                "cd", core_dir, run.Raw('&&'),
                "sudo", "ls", run.Raw('|'), "sudo", "xargs", "file"
            ], stdout=StringIO())
            cores = [l.partition(":")[0]
                     for l in ls_proc.stdout.getvalue().strip().split("\n")
                     if re.match(r'.*ceph-mds.* -i +{0}'.format(daemon_id), l)]

            log.info("Enumerated cores: {0}".format(cores))
            self.assertEqual(len(cores), 1)

            log.info("Found core file {0}, deleting it".format(cores[0]))

            self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                "cd", core_dir, run.Raw('&&'), "sudo", "rm", "-f", cores[0]
            ])
        else:
            log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")

    def _wait_subtrees(self, status, rank, test):
        timeout = 30
        pause = 2
        test = sorted(test)
        for i in range(timeout / pause):
            subtrees = self.fs.mds_asok(["get", "subtrees"], mds_id=status.get_rank(self.fs.id, rank)['name'])
            subtrees = filter(lambda s: s['dir']['path'].startswith('/'), subtrees)
            filtered = sorted([(s['dir']['path'], s['auth_first']) for s in subtrees])
            log.info("%s =?= %s", filtered, test)
            if filtered == test:
                # Confirm export_pin in output is correct:
                for s in subtrees:
                    self.assertTrue(s['export_pin'] == s['auth_first'])
                return subtrees
            time.sleep(pause)
        raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank))