import json
import logging
from unittest import case
from tasks.ceph_test_case import CephTestCase
import os
import re
from StringIO import StringIO

from tasks.cephfs.fuse_mount import FuseMount

from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError


log = logging.getLogger(__name__)


def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    return f


def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    return f
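

# Illustrative sketch (an assumption about how these markers are consumed; not code
# from this module): a runner iterating over test methods could filter on the
# attributes set above, e.g.
#
#     if getattr(test_method, "is_for_teuthology", False):
#         continue  # too heavyweight for an interactive run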


class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional)

    Handles resetting the cluster under test between tests.
    """

    # FIXME weird explicit naming
    mount_a = None
    mount_b = None

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs.  If not met, tests will be skipped.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_KCLIENT_REMOTE = False
    REQUIRE_ONE_CLIENT_REMOTE = False
    REQUIRE_MEMSTORE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    LOAD_SETTINGS = []

    def setUp(self):
        super(CephFSTestCase, self).setUp()

        if len(self.mds_cluster.mds_ids) < self.MDSS_REQUIRED:
            raise case.SkipTest("Only have {0} MDSs, require {1}".format(
                len(self.mds_cluster.mds_ids), self.MDSS_REQUIRED
            ))

        if len(self.mounts) < self.CLIENTS_REQUIRED:
            raise case.SkipTest("Only have {0} clients, require {1}".format(
                len(self.mounts), self.CLIENTS_REQUIRED
            ))

        if self.REQUIRE_KCLIENT_REMOTE:
            if not isinstance(self.mounts[0], FuseMount) or not isinstance(self.mounts[1], FuseMount):
                # kclient kill() power cycles nodes, so requires clients to each be on
                # their own node
                if self.mounts[0].client_remote.hostname == self.mounts[1].client_remote.hostname:
                    raise case.SkipTest("kclient clients must be on separate nodes")

        if self.REQUIRE_ONE_CLIENT_REMOTE:
            if self.mounts[0].client_remote.hostname in self.mds_cluster.get_mds_hostnames():
                raise case.SkipTest("Require first client to be on separate server from MDSs")

        if self.REQUIRE_MEMSTORE:
            objectstore = self.mds_cluster.get_config("osd_objectstore", "osd")
            if objectstore != "memstore":
                # You certainly *could* run this on a real OSD, but you don't want to sit
                # here for hours waiting for the test to fill up a 1TB drive!
                raise case.SkipTest("Require `memstore` OSD backend to simulate full drives")

        # Create friendly mount_a, mount_b attrs
        for i in range(0, self.CLIENTS_REQUIRED):
            setattr(self, "mount_{0}".format(chr(ord('a') + i)), self.mounts[i])

        self.mds_cluster.clear_firewall()

        # Unmount all clients, we are about to blow away the filesystem
        for mount in self.mounts:
            if mount.is_mounted():
                mount.umount_wait(force=True)

        # To avoid any issues with e.g. unlink bugs, we destroy and recreate
        # the filesystem rather than just doing a rm -rf of files
        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
        self.mds_cluster.delete_all_filesystems()
        self.fs = None    # is now invalid!

        # In case the previous filesystem had filled up the RADOS cluster, wait for that
        # to take effect
        osd_mon_report_interval_max = int(self.mds_cluster.get_config(
            "osd_mon_report_interval_max", service_type='osd'))
        self.wait_until_true(lambda: not self.mds_cluster.is_full(),
                             timeout=osd_mon_report_interval_max * 5)

        # In case anything is in the OSD blacklist list, clear it out.  This is to avoid
        # the OSD map changing in the background (due to blacklist expiry) while tests run.
        try:
            self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "clear")
        except CommandFailedError:
            # Fallback for older Ceph cluster
            blacklist = json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd("osd",
                                   "dump", "--format=json-pretty"))['blacklist']
            log.info("Removing {0} blacklist entries".format(len(blacklist)))
            for addr, blacklisted_at in blacklist.items():
                self.mds_cluster.mon_manager.raw_cluster_cmd("osd", "blacklist", "rm", addr)

        client_mount_ids = [m.client_id for m in self.mounts]
        # In case the test changes the IDs of clients, stash them so that we can
        # reset in tearDown
        self._original_client_ids = client_mount_ids
        log.info(client_mount_ids)

        # In case there were any extra auth identities around from a previous
        # test, delete them
        for entry in self.auth_list():
            ent_type, ent_id = entry['entity'].split(".")
            if ent_type == "client" and ent_id not in client_mount_ids and ent_id != "admin":
                self.mds_cluster.mon_manager.raw_cluster_cmd("auth", "del", entry['entity'])

        if self.REQUIRE_FILESYSTEM:
            self.fs = self.mds_cluster.newfs(True)
            self.fs.mds_restart()

            # In case some test messed with auth caps, reset them
            for client_id in client_mount_ids:
                self.mds_cluster.mon_manager.raw_cluster_cmd_result(
                    'auth', 'caps', "client.{0}".format(client_id),
                    'mds', 'allow',
                    'mon', 'allow r',
                    'osd', 'allow rw pool={0}'.format(self.fs.get_data_pool_name()))

            # wait for mds restart to complete...
            self.fs.wait_for_daemons()

            # Mount the requested number of clients
            for i in range(0, self.CLIENTS_REQUIRED):
                self.mounts[i].mount()
                self.mounts[i].wait_until_mounted()

        # Load any config settings of interest
        for setting in self.LOAD_SETTINGS:
            setattr(self, setting, int(self.fs.mds_asok(
                ['config', 'get', setting], self.mds_cluster.mds_ids[0]
            )[setting]))

        self.configs_set = set()

    def tearDown(self):
        super(CephFSTestCase, self).tearDown()

        self.mds_cluster.clear_firewall()
        for m in self.mounts:
            m.teardown()

        for i, m in enumerate(self.mounts):
            m.client_id = self._original_client_ids[i]

        for subsys, key in self.configs_set:
            self.mds_cluster.clear_ceph_conf(subsys, key)

    def set_conf(self, subsys, key, value):
        self.configs_set.add((subsys, key))
        self.mds_cluster.set_ceph_conf(subsys, key, value)
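
    # Illustrative usage sketch (hypothetical call, not from this module): a test that
    # needs a temporary ceph.conf override can do e.g.
    #
    #     self.set_conf("mds", "mds_cache_size", "1000")
    #
    # tearDown then clears every (subsys, key) recorded in self.configs_set above.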

    def auth_list(self):
        """
        Convenience wrapper on "ceph auth list"
        """
        return json.loads(self.mds_cluster.mon_manager.raw_cluster_cmd(
            "auth", "list", "--format=json-pretty"
        ))['auth_dump']

    def assert_session_count(self, expected, ls_data=None, mds_id=None):
        if ls_data is None:
            ls_data = self.fs.mds_asok(['session', 'ls'], mds_id=mds_id)

        alive_count = len([s for s in ls_data if s['state'] != 'killing'])

        self.assertEqual(expected, alive_count, "Expected {0} sessions, found {1}".format(
            expected, alive_count
        ))

    def assert_session_state(self, client_id, expected_state):
        self.assertEqual(
            self._session_by_id(
                self.fs.mds_asok(['session', 'ls'])).get(client_id, {'state': None})['state'],
            expected_state)

    def get_session_data(self, client_id):
        # _session_by_id expects a list of session dicts, so fetch the full list first
        return self._session_by_id(self.fs.mds_asok(['session', 'ls']))[client_id]

    def _session_list(self):
        ls_data = self.fs.mds_asok(['session', 'ls'])
        ls_data = [s for s in ls_data if s['state'] not in ['stale', 'closed']]
        return ls_data

    def get_session(self, client_id, session_ls=None):
        if session_ls is None:
            session_ls = self.fs.mds_asok(['session', 'ls'])

        return self._session_by_id(session_ls)[client_id]

    def _session_by_id(self, session_ls):
        return dict([(s['id'], s) for s in session_ls])

    def wait_for_daemon_start(self, daemon_ids=None):
        """
        Wait until all the daemons appear in the FSMap, either assigned
        MDS ranks or in the list of standbys
        """
        def get_daemon_names():
            return [info['name'] for info in self.mds_cluster.status().get_all()]

        if daemon_ids is None:
            daemon_ids = self.mds_cluster.mds_ids

        try:
            self.wait_until_true(
                lambda: set(daemon_ids) & set(get_daemon_names()) == set(daemon_ids),
                timeout=30
            )
        except RuntimeError:
            log.warn("Timeout waiting for daemons {0}, while we have {1}".format(
                daemon_ids, get_daemon_names()
            ))
            raise

    def assert_mds_crash(self, daemon_id):
        """
        Assert that a particular MDS daemon crashes (block until
        it does)
        """
        try:
            self.mds_cluster.mds_daemons[daemon_id].proc.wait()
        except CommandFailedError as e:
            log.info("MDS '{0}' crashed with status {1} as expected".format(daemon_id, e.exitstatus))
            self.mds_cluster.mds_daemons[daemon_id].proc = None

            # Go remove the coredump from the crash, otherwise teuthology.internal.coredump will
            # catch it later and treat it as a failure.
            p = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                "sudo", "sysctl", "-n", "kernel.core_pattern"], stdout=StringIO())
            core_pattern = p.stdout.getvalue().strip()
            if os.path.dirname(core_pattern):  # Non-default core_pattern with a directory in it
                # We have seen a core_pattern that looks like it's from teuthology's coredump
                # task, so proceed to clear out the core file
                log.info("Clearing core from pattern: {0}".format(core_pattern))

                # Determine the PID of the crashed MDS by inspecting the MDSMap, it had
                # to talk to the mons to get assigned a rank to reach the point of crashing
                addr = self.mds_cluster.mon_manager.get_mds_status(daemon_id)['addr']
                pid_str = addr.split("/")[1]
                log.info("Determined crasher PID was {0}".format(pid_str))

                # Substitute PID into core_pattern to get a glob
                core_glob = core_pattern.replace("%p", pid_str)
                core_glob = re.sub("%[a-z]", "*", core_glob)  # Match all for all other % tokens

                # Verify that we see the expected single coredump matching the expected pattern
                ls_proc = self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                    "sudo", "ls", run.Raw(core_glob)
                ], stdout=StringIO())
                cores = [f for f in ls_proc.stdout.getvalue().strip().split("\n") if f]
                log.info("Enumerated cores: {0}".format(cores))
                self.assertEqual(len(cores), 1)

                log.info("Found core file {0}, deleting it".format(cores[0]))

                self.mds_cluster.mds_daemons[daemon_id].remote.run(args=[
                    "sudo", "rm", "-f", cores[0]
                ])
            else:
                log.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")
        else:
            raise AssertionError("MDS daemon '{0}' did not crash as expected".format(daemon_id))
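

# Illustrative sketch (not part of the original module): a hypothetical subclass showing
# how the declarative requirement attributes and the for_teuthology decorator above are
# typically used.  The class name, test name and body are assumptions for demonstration.
class ExampleFullClusterTest(CephFSTestCase):
    CLIENTS_REQUIRED = 2      # setUp skips the test unless two client mounts are available
    MDSS_REQUIRED = 1
    REQUIRE_MEMSTORE = True   # setUp skips the test unless OSDs use the memstore backend

    @for_teuthology           # marked as too long-running for interactive test runs
    def test_example(self):
        # A real test would exercise self.fs, self.mount_a and self.mount_b here
        pass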