import json
import logging
import os
import re
import time

from unittest import case

from tasks.ceph_test_case import CephTestCase

from StringIO import StringIO

from tasks.cephfs.fuse_mount import FuseMount

from teuthology.orchestra import run
from teuthology.orchestra.run import CommandFailedError
# Module-level logger named after this module, per the standard logging convention.
log = logging.getLogger(__name__)
def for_teuthology(f):
    """
    Decorator that adds an "is_for_teuthology" attribute to the wrapped function
    """
    f.is_for_teuthology = True
    # Bug fix: the decorator must return the wrapped function, otherwise every
    # decorated test method would be replaced with None.
    return f
def needs_trimming(f):
    """
    Mark fn as requiring a client capable of trimming its cache (i.e. for ceph-fuse
    this means it needs to be able to run as root, currently)
    """
    f.needs_trimming = True
    # Bug fix: the decorator must return the wrapped function, otherwise every
    # decorated test method would be replaced with None.
    return f
class CephFSTestCase(CephTestCase):
    """
    Test case for Ceph FS, requires caller to populate Filesystem and Mounts,
    into the fs, mount_a, mount_b class attributes (setting mount_b is optional)

    Handles resetting the cluster under test between tests.
    """
    # FIXME weird explicit naming

    # Declarative test requirements: subclasses should override these to indicate
    # their special needs. If not met, tests will be skipped.
    #
    # NOTE(review): CLIENTS_REQUIRED, MDSS_REQUIRED and LOAD_SETTINGS are read
    # by setUp() but were missing from this copy of the file; defaults restored
    # here -- confirm the values against upstream.
    CLIENTS_REQUIRED = 1
    MDSS_REQUIRED = 1
    REQUIRE_KCLIENT_REMOTE = False
    REQUIRE_ONE_CLIENT_REMOTE = False

    # Whether to create the default filesystem during setUp
    REQUIRE_FILESYSTEM = True

    # requires REQUIRE_FILESYSTEM = True
    REQUIRE_RECOVERY_FILESYSTEM = False

    # Names of config settings that setUp() loads from the MDS and sets as
    # float attributes on the test instance; subclasses may override.
    LOAD_SETTINGS = []
65 super(CephFSTestCase
, self
).setUp()
67 if len(self
.mds_cluster
.mds_ids
) < self
.MDSS_REQUIRED
:
68 raise case
.SkipTest("Only have {0} MDSs, require {1}".format(
69 len(self
.mds_cluster
.mds_ids
), self
.MDSS_REQUIRED
72 if len(self
.mounts
) < self
.CLIENTS_REQUIRED
:
73 raise case
.SkipTest("Only have {0} clients, require {1}".format(
74 len(self
.mounts
), self
.CLIENTS_REQUIRED
77 if self
.REQUIRE_KCLIENT_REMOTE
:
78 if not isinstance(self
.mounts
[0], FuseMount
) or not isinstance(self
.mounts
[1], FuseMount
):
79 # kclient kill() power cycles nodes, so requires clients to each be on
81 if self
.mounts
[0].client_remote
.hostname
== self
.mounts
[1].client_remote
.hostname
:
82 raise case
.SkipTest("kclient clients must be on separate nodes")
84 if self
.REQUIRE_ONE_CLIENT_REMOTE
:
85 if self
.mounts
[0].client_remote
.hostname
in self
.mds_cluster
.get_mds_hostnames():
86 raise case
.SkipTest("Require first client to be on separate server from MDSs")
88 # Create friendly mount_a, mount_b attrs
89 for i
in range(0, self
.CLIENTS_REQUIRED
):
90 setattr(self
, "mount_{0}".format(chr(ord('a') + i
)), self
.mounts
[i
])
92 self
.mds_cluster
.clear_firewall()
94 # Unmount all clients, we are about to blow away the filesystem
95 for mount
in self
.mounts
:
96 if mount
.is_mounted():
97 mount
.umount_wait(force
=True)
99 # To avoid any issues with e.g. unlink bugs, we destroy and recreate
100 # the filesystem rather than just doing a rm -rf of files
101 self
.mds_cluster
.mds_stop()
102 self
.mds_cluster
.mds_fail()
103 self
.mds_cluster
.delete_all_filesystems()
104 self
.fs
= None # is now invalid!
105 self
.recovery_fs
= None
107 # In case anything is in the OSD blacklist list, clear it out. This is to avoid
108 # the OSD map changing in the background (due to blacklist expiry) while tests run.
110 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd", "blacklist", "clear")
111 except CommandFailedError
:
112 # Fallback for older Ceph cluster
113 blacklist
= json
.loads(self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd",
114 "dump", "--format=json-pretty"))['blacklist']
115 log
.info("Removing {0} blacklist entries".format(len(blacklist
)))
116 for addr
, blacklisted_at
in blacklist
.items():
117 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("osd", "blacklist", "rm", addr
)
119 client_mount_ids
= [m
.client_id
for m
in self
.mounts
]
120 # In case the test changes the IDs of clients, stash them so that we can
122 self
._original
_client
_ids
= client_mount_ids
123 log
.info(client_mount_ids
)
125 # In case there were any extra auth identities around from a previous
127 for entry
in self
.auth_list():
128 ent_type
, ent_id
= entry
['entity'].split(".")
129 if ent_type
== "client" and ent_id
not in client_mount_ids
and ent_id
!= "admin":
130 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("auth", "del", entry
['entity'])
132 if self
.REQUIRE_FILESYSTEM
:
133 self
.fs
= self
.mds_cluster
.newfs(create
=True)
134 self
.fs
.mds_restart()
136 # In case some test messed with auth caps, reset them
137 for client_id
in client_mount_ids
:
138 self
.mds_cluster
.mon_manager
.raw_cluster_cmd_result(
139 'auth', 'caps', "client.{0}".format(client_id
),
142 'osd', 'allow rw pool={0}'.format(self
.fs
.get_data_pool_name()))
144 # wait for mds restart to complete...
145 self
.fs
.wait_for_daemons()
147 # Mount the requested number of clients
148 for i
in range(0, self
.CLIENTS_REQUIRED
):
149 self
.mounts
[i
].mount()
150 self
.mounts
[i
].wait_until_mounted()
152 if self
.REQUIRE_RECOVERY_FILESYSTEM
:
153 if not self
.REQUIRE_FILESYSTEM
:
154 raise case
.SkipTest("Recovery filesystem requires a primary filesystem as well")
155 self
.fs
.mon_manager
.raw_cluster_cmd('fs', 'flag', 'set',
156 'enable_multiple', 'true',
157 '--yes-i-really-mean-it')
158 self
.recovery_fs
= self
.mds_cluster
.newfs(name
="recovery_fs", create
=False)
159 self
.recovery_fs
.set_metadata_overlay(True)
160 self
.recovery_fs
.set_data_pool_name(self
.fs
.get_data_pool_name())
161 self
.recovery_fs
.create()
162 self
.recovery_fs
.getinfo(refresh
=True)
163 self
.recovery_fs
.mds_restart()
164 self
.recovery_fs
.wait_for_daemons()
166 # Load an config settings of interest
167 for setting
in self
.LOAD_SETTINGS
:
168 setattr(self
, setting
, float(self
.fs
.mds_asok(
169 ['config', 'get', setting
], self
.mds_cluster
.mds_ids
[0]
172 self
.configs_set
= set()
175 super(CephFSTestCase
, self
).tearDown()
177 self
.mds_cluster
.clear_firewall()
178 for m
in self
.mounts
:
181 for i
, m
in enumerate(self
.mounts
):
182 m
.client_id
= self
._original
_client
_ids
[i
]
184 for subsys
, key
in self
.configs_set
:
185 self
.mds_cluster
.clear_ceph_conf(subsys
, key
)
187 def set_conf(self
, subsys
, key
, value
):
188 self
.configs_set
.add((subsys
, key
))
189 self
.mds_cluster
.set_ceph_conf(subsys
, key
, value
)
193 Convenience wrapper on "ceph auth ls"
195 return json
.loads(self
.mds_cluster
.mon_manager
.raw_cluster_cmd(
196 "auth", "ls", "--format=json-pretty"
199 def assert_session_count(self
, expected
, ls_data
=None, mds_id
=None):
201 ls_data
= self
.fs
.mds_asok(['session', 'ls'], mds_id
=mds_id
)
203 alive_count
= len([s
for s
in ls_data
if s
['state'] != 'killing'])
205 self
.assertEqual(expected
, alive_count
, "Expected {0} sessions, found {1}".format(
206 expected
, alive_count
209 def assert_session_state(self
, client_id
, expected_state
):
212 self
.fs
.mds_asok(['session', 'ls'])).get(client_id
, {'state': None})['state'],
215 def get_session_data(self
, client_id
):
216 return self
._session
_by
_id
(client_id
)
218 def _session_list(self
):
219 ls_data
= self
.fs
.mds_asok(['session', 'ls'])
220 ls_data
= [s
for s
in ls_data
if s
['state'] not in ['stale', 'closed']]
223 def get_session(self
, client_id
, session_ls
=None):
224 if session_ls
is None:
225 session_ls
= self
.fs
.mds_asok(['session', 'ls'])
227 return self
._session
_by
_id
(session_ls
)[client_id
]
229 def _session_by_id(self
, session_ls
):
230 return dict([(s
['id'], s
) for s
in session_ls
])
232 def wait_for_daemon_start(self
, daemon_ids
=None):
234 Wait until all the daemons appear in the FSMap, either assigned
235 MDS ranks or in the list of standbys
237 def get_daemon_names():
238 return [info
['name'] for info
in self
.mds_cluster
.status().get_all()]
240 if daemon_ids
is None:
241 daemon_ids
= self
.mds_cluster
.mds_ids
244 self
.wait_until_true(
245 lambda: set(daemon_ids
) & set(get_daemon_names()) == set(daemon_ids
),
249 log
.warn("Timeout waiting for daemons {0}, while we have {1}".format(
250 daemon_ids
, get_daemon_names()
254 def delete_mds_coredump(self
, daemon_id
):
255 # delete coredump file, otherwise teuthology.internal.coredump will
256 # catch it later and treat it as a failure.
257 p
= self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.run(args
=[
258 "sudo", "sysctl", "-n", "kernel.core_pattern"], stdout
=StringIO())
259 core_dir
= os
.path
.dirname(p
.stdout
.getvalue().strip())
260 if core_dir
: # Non-default core_pattern with a directory in it
261 # We have seen a core_pattern that looks like it's from teuthology's coredump
262 # task, so proceed to clear out the core file
263 log
.info("Clearing core from directory: {0}".format(core_dir
))
265 # Verify that we see the expected single coredump
266 ls_proc
= self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.run(args
=[
267 "cd", core_dir
, run
.Raw('&&'),
268 "sudo", "ls", run
.Raw('|'), "sudo", "xargs", "file"
269 ], stdout
=StringIO())
270 cores
= [l
.partition(":")[0]
271 for l
in ls_proc
.stdout
.getvalue().strip().split("\n")
272 if re
.match(r
'.*ceph-mds.* -i +{0}'.format(daemon_id
), l
)]
274 log
.info("Enumerated cores: {0}".format(cores
))
275 self
.assertEqual(len(cores
), 1)
277 log
.info("Found core file {0}, deleting it".format(cores
[0]))
279 self
.mds_cluster
.mds_daemons
[daemon_id
].remote
.run(args
=[
280 "cd", core_dir
, run
.Raw('&&'), "sudo", "rm", "-f", cores
[0]
283 log
.info("No core_pattern directory set, nothing to clear (internal.coredump not enabled?)")
285 def _wait_subtrees(self
, status
, rank
, test
):
289 for i
in range(timeout
/pause
):
290 subtrees
= self
.fs
.mds_asok(["get", "subtrees"], mds_id
=status
.get_rank(self
.fs
.id, rank
)['name'])
291 subtrees
= filter(lambda s
: s
['dir']['path'].startswith('/'), subtrees
)
292 filtered
= sorted([(s
['dir']['path'], s
['auth_first']) for s
in subtrees
])
293 log
.info("%s =?= %s", filtered
, test
)
295 # Confirm export_pin in output is correct:
297 self
.assertTrue(s
['export_pin'] == s
['auth_first'])
300 raise RuntimeError("rank {0} failed to reach desired subtree state", rank
)