import json
import logging
import os
import re
import time

from StringIO import StringIO

from tasks.ceph_test_case import CephTestCase
from tasks.cephfs.fuse_mount import FuseMount
from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError
# Module-level logger, named after this module per the standard convention.
log = logging.getLogger(__name__)
def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    # Must return the function itself: a decorator that returns None would
    # silently replace the test with None.
    return f
def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    # Return the function so the decorator preserves the decorated callable.
    return f
class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional)

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    mount_a = None
    mount_b = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    # NOTE(review): setUp references CLIENTS_REQUIRED/MDSS_REQUIRED, so defaults
    # are restored here; confirm the historical default of 1 against upstream.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_KCLIENT_REMOTE = False
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # requires REQUIRE_FILESYSTEM = True
    REQUIRE_RECOVERY_FILESYSTEM = False

    # Names of MDS config settings to read into attributes of the same name
    # during setUp (values coerced to float).
    LOAD_SETTINGS = []  # type: ignore
64 super(CephFSTestCase
, self
).setUp()
66 if len(self
.mds_cluster
.mds_ids
) < self
.MDSS_REQUIRED
:
67 self
.skipTest("Only have {0} MDSs, require {1}".format(
68 len(self
.mds_cluster
.mds_ids
), self
.MDSS_REQUIRED
71 if len(self
.mounts
) < self
.CLIENTS_REQUIRED
:
72 self
.skipTest("Only have {0} clients, require {1}".format(
73 len(self
.mounts
), self
.CLIENTS_REQUIRED
76 if self
.REQUIRE_KCLIENT_REMOTE
:
77 if not isinstance(self
.mounts
[0], FuseMount
) or not isinstance(self
.mounts
[1], FuseMount
):
78 # kclient kill() power cycles nodes, so requires clients to each be on
80 if self
.mounts
[0].client_remote
.hostname
== self
.mounts
[1].client_remote
.hostname
:
81 self
.skipTest("kclient clients must be on separate nodes")
83 if self
.REQUIRE_ONE_CLIENT_REMOTE
:
84 if self
.mounts
[0].client_remote
.hostname
in self
.mds_cluster
.get_mds_hostnames():
85 self
.skipTest("Require first client to be on separate server from MDSs")
87 # Create friendly mount_a, mount_b attrs
88 for i
in range(0, self
.CLIENTS_REQUIRED
):
89 setattr(self
, "mount_{0}".format(chr(ord('a') + i
)), self
.mounts
[i
])
91 self
.mds_cluster
.clear_firewall()
93 # Unmount all clients, we are about to blow away the filesystem
94 for mount
in self
.mounts
:
95 if mount
.is_mounted():
96 mount
.umount_wait(force
=True)
98 # To avoid any issues with e.g. unlink bugs, we destroy and recreate
99 # the filesystem rather than just doing a rm -rf of files
100 self
.mds_cluster
.delete_all_filesystems()
101 self
.mds_cluster
.mds_restart() # to reset any run-time configs, etc.
102 self
.fs
= None # is now invalid!
103 self
.recovery_fs
= None
105 # In case anything is in the OSD blacklist list, clear it out. This is to avoid
106 # the OSD map changing in the background (due to blacklist expiry) while tests run.
108 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd", "blacklist", "clear")
109 except CommandFailedError
:
110 # Fallback for older Ceph cluster
111 blacklist
= json
.loads(self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd",
112 "dump", "--format=json-pretty"))['blacklist']
113 log
.info("Removing {0} blacklist entries".format(len(blacklist
)))
114 for addr
, blacklisted_at
in blacklist
.items():
115 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd", "blacklist", "rm", addr
)
117 client_mount_ids
= [m
.client_id
for m
in self
.mounts
]
118 # In case the test changes the IDs of clients, stash them so that we can
120 self
._original
_client
_ids
= client_mount_ids
121 log
.info(client_mount_ids
)
123 # In case there were any extra auth identities around from a previous
125 for entry
in self
.auth_list():
126 ent_type
, ent_id
= entry
['entity'].split(".")
127 if ent_type
== "client" and ent_id
not in client_mount_ids
and ent_id
!= "admin":
128 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("auth", "del", entry
['entity'])
130 if self
.REQUIRE_FILESYSTEM
:
131 self
.fs
= self
.mds_cluster
.newfs(create
=True)
133 # In case some test messed with auth caps, reset them
134 for client_id
in client_mount_ids
:
135 self
.mds_cluster
.mon_manager
.raw_cluster_cmd_result(
136 'auth', 'caps', "client.{0}".format(client_id
),
139 'osd', 'allow rw pool={0}'.format(self
.fs
.get_data_pool_name()))
141 # wait for ranks to become active
142 self
.fs
.wait_for_daemons()
144 # Mount the requested number of clients
145 for i
in range(0, self
.CLIENTS_REQUIRED
):
146 self
.mounts
[i
].mount()
147 self
.mounts
[i
].wait_until_mounted()
149 if self
.REQUIRE_RECOVERY_FILESYSTEM
:
150 if not self
.REQUIRE_FILESYSTEM
:
151 self
.skipTest("Recovery filesystem requires a primary filesystem as well")
152 self
.fs
.mon_manager
.raw_cluster_cmd('fs', 'flag', 'set',
153 'enable_multiple', 'true',
154 '--yes-i-really-mean-it')
155 self
.recovery_fs
= self
.mds_cluster
.newfs(name
="recovery_fs", create
=False)
156 self
.recovery_fs
.set_metadata_overlay(True)
157 self
.recovery_fs
.set_data_pool_name(self
.fs
.get_data_pool_name())
158 self
.recovery_fs
.create()
159 self
.recovery_fs
.getinfo(refresh
=True)
160 self
.recovery_fs
.mds_restart()
161 self
.recovery_fs
.wait_for_daemons()
163 # Load an config settings of interest
164 for setting
in self
.LOAD_SETTINGS
:
165 setattr(self
, setting
, float(self
.fs
.mds_asok(
166 ['config', 'get', setting
], self
.mds_cluster
.mds_ids
[0]
169 self
.configs_set
= set()
172 self
.mds_cluster
.clear_firewall()
173 for m
in self
.mounts
:
176 for i
, m
in enumerate(self
.mounts
):
177 m
.client_id
= self
._original
_client
_ids
[i
]
179 for subsys
, key
in self
.configs_set
:
180 self
.mds_cluster
.clear_ceph_conf(subsys
, key
)
182 return super(CephFSTestCase
, self
).tearDown()
    def set_conf(self, subsys, key, value):
        # Record the (subsys, key) pair so tearDown can clear it again via
        # clear_ceph_conf, then apply the setting to the cluster config.
        self.configs_set.add((subsys, key))
        self.mds_cluster.set_ceph_conf(subsys, key, value)
190 Convenience wrapper on "ceph auth ls"
192 return json
.loads(self
.mds_cluster
.mon_manager
.raw_cluster_cmd(
193 "auth", "ls", "--format=json-pretty"
196 def assert_session_count(self
, expected
, ls_data
=None, mds_id
=None):
198 ls_data
= self
.fs
.mds_asok(['session', 'ls'], mds_id
=mds_id
)
200 alive_count
= len([s
for s
in ls_data
if s
['state'] != 'killing'])
202 self
.assertEqual(expected
, alive_count
, "Expected {0} sessions, found {1}".format(
203 expected
, alive_count
206 def assert_session_state(self
, client_id
, expected_state
):
209 self
.fs
.mds_asok(['session', 'ls'])).get(client_id
, {'state': None})['state'],
    def get_session_data(self, client_id):
        # NOTE(review): this forwards client_id to _session_by_id(), which
        # iterates its argument as a session *list* -- looks like a latent
        # bug or dead code; confirm against callers before changing.
        return self._session_by_id(client_id)
215 def _session_list(self
):
216 ls_data
= self
.fs
.mds_asok(['session', 'ls'])
217 ls_data
= [s
for s
in ls_data
if s
['state'] not in ['stale', 'closed']]
    def get_session(self, client_id, session_ls=None):
        # Return the session entry for client_id.  Fetches 'session ls' from
        # the MDS unless session_ls is supplied; raises KeyError when no
        # session with that id exists.
        if session_ls is None:
            session_ls = self.fs.mds_asok(['session', 'ls'])

        return self._session_by_id(session_ls)[client_id]
226 def _session_by_id(self
, session_ls
):
227 return dict([(s
['id'], s
) for s
in session_ls
])
229 def wait_until_evicted(self
, client_id
, timeout
=30):
230 def is_client_evicted():
231 ls
= self
._session
_list
()
233 if s
['id'] == client_id
:
236 self
.wait_until_true(is_client_evicted
, timeout
)
238 def wait_for_daemon_start(self
, daemon_ids
=None):
240 Wait until all the daemons appear in the FSMap, either assigned
241 MDS ranks or in the list of standbys
243 def get_daemon_names():
244 return [info
['name'] for info
in self
.mds_cluster
.status().get_all()]
246 if daemon_ids
is None:
247 daemon_ids
= self
.mds_cluster
.mds_ids
250 self
.wait_until_true(
251 lambda: set(daemon_ids
) & set(get_daemon_names()) == set(daemon_ids
),
255 log
.warn("Timeout waiting for daemons {0}, while we have {1}".format(
256 daemon_ids
, get_daemon_names()
260 def delete_mds_coredump(self
, daemon_id
):
261 # delete coredump file, otherwise teuthology.internal.coredump will
262 # catch it later and treat it as a failure.
263 p
= self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.run(args
=[
264 "sudo", "sysctl", "-n", "kernel.core_pattern"], stdout
=StringIO())
265 core_dir
= os
.path
.dirname(p
.stdout
.getvalue().strip())
266 if core_dir
: # Non-default core_pattern with a directory in it
267 # We have seen a core_pattern that looks like it's from teuthology's coredump
268 # task, so proceed to clear out the core file
269 log
.info("Clearing core from directory: {0}".format(core_dir
))
271 # Verify that we see the expected single coredump
272 ls_proc
= self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.run(args
=[
273 "cd", core_dir
, run
.Raw('&&'),
274 "sudo", "ls", run
.Raw('|'), "sudo", "xargs", "file"
275 ], stdout
=StringIO())
276 cores
= [l
.partition(":")[0]
277 for l
in ls_proc
.stdout
.getvalue().strip().split("\n")
278 if re
.match(r
'.*ceph-mds.* -i +{0}'.format(daemon_id
), l
)]
280 log
.info("Enumerated cores: {0}".format(cores
))
281 self
.assertEqual(len(cores
), 1)
283 log
.info("Found core file {0}, deleting it".format(cores
[0]))
285 self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.run(args
=[
286 "cd", core_dir
, run
.Raw('&&'), "sudo", "rm", "-f", cores
[0]
289 log
.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")
291 def _wait_subtrees(self
, status
, rank
, test
):
295 for i
in range(timeout
/pause
):
296 subtrees
= self
.fs
.mds_asok(["get", "subtrees"], mds_id
=status
.get_rank(self
.fs
.id, rank
)['name'])
297 subtrees
= filter(lambda s
: s
['dir']['path'].startswith('/'), subtrees
)
298 filtered
= sorted([(s
['dir']['path'], s
['auth_first']) for s
in subtrees
])
299 log
.info("%s =?= %s", filtered
, test
)
301 # Confirm export_pin in output is correct:
303 self
.assertTrue(s
['export_pin'] == s
['auth_first'])
306 raise RuntimeError("rank {0} failed to reach desired subtree state".format(rank
))