]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/test_mds_metrics.py
727b80c6a91bf63e2c6df8a3ad980175b71c0616
[ceph.git] / ceph / qa / tasks / cephfs / test_mds_metrics.py
1 import os
2 import json
3 import time
4 import random
5 import logging
6 import errno
7
8 from teuthology.contextutil import safe_while, MaxWhileTries
9 from teuthology.exceptions import CommandFailedError
10 from tasks.cephfs.cephfs_test_case import CephFSTestCase
11
# Module-level logger shared by all tests in this file.
log = logging.getLogger(__name__)
13
class TestMDSMetrics(CephFSTestCase):
    """Tests for the `fs perf stats` interface exposed by the mgr `stats` module."""

    # Two mounted clients and three MDS daemons are required by the scenarios below.
    CLIENTS_REQUIRED = 2
    MDSS_REQUIRED = 3

    # NOTE(review): "PERFIX" is a typo for "PREFIX"; kept as-is because sibling
    # methods reference this name.
    TEST_DIR_PERFIX = "test_mds_metrics"
19
20 def setUp(self):
21 super(TestMDSMetrics, self).setUp()
22 self._start_with_single_active_mds()
23 self._enable_mgr_stats_plugin()
24
25 def tearDown(self):
26 self._disable_mgr_stats_plugin()
27 super(TestMDSMetrics, self).tearDown()
28
29 def _start_with_single_active_mds(self):
30 curr_max_mds = self.fs.get_var('max_mds')
31 if curr_max_mds > 1:
32 self.fs.shrink(1)
33
34 def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[]):
35 def verify_metrics_cbk(metrics):
36 mds_metrics = metrics['metrics']
37 if not len(mds_metrics) == active_mds_count + 1: # n active mdss + delayed set
38 return False
39 fs_status = self.fs.status()
40 nonlocal ranks
41 if not ranks:
42 ranks = set([info['rank'] for info in fs_status.get_ranks(self.fs.id)])
43 for rank in ranks:
44 r = mds_metrics.get("mds.{}".format(rank), None)
45 if not r or not len(mds_metrics['delayed_ranks']) == 0:
46 return False
47 global_metrics = metrics['global_metrics']
48 client_metadata = metrics['client_metadata']
49 if not len(global_metrics) >= client_count or not len(client_metadata) >= client_count:
50 return False
51 return True
52 return verify_metrics_cbk
53
54 def _fs_perf_stats(self, *args):
55 return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", *args)
56
57 def _enable_mgr_stats_plugin(self):
58 return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", "stats")
59
60 def _disable_mgr_stats_plugin(self):
61 return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", "stats")
62
63 def _spread_directory_on_all_ranks(self, fscid):
64 fs_status = self.fs.status()
65 ranks = set([info['rank'] for info in fs_status.get_ranks(fscid)])
66 # create a per-rank pinned directory
67 for rank in ranks:
68 dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PERFIX, rank)
69 self.mount_a.run_shell(["mkdir", dirname])
70 self.mount_a.setfattr(dirname, "ceph.dir.pin", str(rank))
71 log.info("pinning directory {0} to rank {1}".format(dirname, rank))
72 for i in range(16):
73 filename = "{0}.{1}".format("test", i)
74 self.mount_a.write_n_mb(os.path.join(dirname, filename), 1)
75
76 def _do_spread_io(self, fscid):
77 # spread readdir I/O
78 self.mount_b.run_shell(["find", "."])
79
80 def _do_spread_io_all_clients(self, fscid):
81 # spread readdir I/O
82 self.mount_a.run_shell(["find", "."])
83 self.mount_b.run_shell(["find", "."])
84
85 def _cleanup_test_dirs(self):
86 dirnames = self.mount_a.run_shell(["ls"]).stdout.getvalue()
87 for dirname in dirnames.split("\n"):
88 if dirname.startswith(TestMDSMetrics.TEST_DIR_PERFIX):
89 log.info("cleaning directory {}".format(dirname))
90 self.mount_a.run_shell(["rm", "-rf", dirname])
91
92 def _get_metrics(self, verifier_callback, trials, *args):
93 metrics = None
94 done = False
95 with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
96 while proceed():
97 metrics = json.loads(self._fs_perf_stats(*args))
98 done = verifier_callback(metrics)
99 if done:
100 break
101 return done, metrics
102
103 def _setup_fs(self, fs_name):
104 fs_a = self.mds_cluster.newfs(name=fs_name)
105
106 self.mds_cluster.mds_restart()
107
108 # Wait for filesystem to go healthy
109 fs_a.wait_for_daemons()
110
111 # Reconfigure client auth caps
112 for mount in self.mounts:
113 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
114 'auth', 'caps', f"client.{mount.client_id}",
115 'mds', 'allow',
116 'mon', 'allow r',
117 'osd', f'allow rw pool={fs_a.get_data_pool_name()}')
118
119 return fs_a
120
121 # basic check to verify if we get back metrics from each active mds rank
122
123 def test_metrics_from_rank(self):
124 # validate
125 valid, metrics = self._get_metrics(
126 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
127 log.debug("metrics={0}".format(metrics))
128 self.assertTrue(valid)
129
130 def test_metrics_post_client_disconnection(self):
131 # validate
132 valid, metrics = self._get_metrics(
133 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
134 log.debug("metrics={0}".format(metrics))
135 self.assertTrue(valid)
136
137 self.mount_a.umount_wait()
138
139 valid, metrics = self._get_metrics(
140 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED - 1), 30)
141 log.debug("metrics={0}".format(metrics))
142 self.assertTrue(valid)
143
144 def test_metrics_mds_grow(self):
145 # validate
146 valid, metrics = self._get_metrics(
147 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
148 log.debug("metrics={0}".format(metrics))
149 self.assertTrue(valid)
150
151 # grow the mds cluster
152 self.fs.grow(2)
153
154 fscid = self.fs.id
155 # spread directory per rank
156 self._spread_directory_on_all_ranks(fscid)
157
158 # spread some I/O
159 self._do_spread_io(fscid)
160
161 # wait a bit for mgr to get updated metrics
162 time.sleep(5)
163
164 # validate
165 valid, metrics = self._get_metrics(self.verify_mds_metrics(
166 active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED) , 30)
167 log.debug("metrics={0}".format(metrics))
168 self.assertTrue(valid)
169
170 # cleanup test directories
171 self._cleanup_test_dirs()
172
173 def test_metrics_mds_grow_and_shrink(self):
174 # validate
175 valid, metrics = self._get_metrics(
176 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
177 log.debug("metrics={0}".format(metrics))
178 self.assertTrue(valid)
179
180 # grow the mds cluster
181 self.fs.grow(2)
182
183 fscid = self.fs.id
184 # spread directory per rank
185 self._spread_directory_on_all_ranks(fscid)
186
187 # spread some I/O
188 self._do_spread_io(fscid)
189
190 # wait a bit for mgr to get updated metrics
191 time.sleep(5)
192
193 # validate
194 valid, metrics = self._get_metrics(
195 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
196 log.debug("metrics={0}".format(metrics))
197 self.assertTrue(valid)
198
199 # shrink mds cluster
200 self.fs.shrink(1)
201
202 # wait a bit for mgr to get updated metrics
203 time.sleep(5)
204
205 # validate
206 valid, metrics = self._get_metrics(
207 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
208 log.debug("metrics={0}".format(metrics))
209 self.assertTrue(valid)
210
211 # cleanup test directories
212 self._cleanup_test_dirs()
213
214 def test_delayed_metrics(self):
215 # validate
216 valid, metrics = self._get_metrics(
217 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
218 log.debug("metrics={0}".format(metrics))
219 self.assertTrue(valid)
220
221 # grow the mds cluster
222 self.fs.grow(2)
223
224 fscid = self.fs.id
225 # spread directory per rank
226 self._spread_directory_on_all_ranks(fscid)
227
228 # spread some I/O
229 self._do_spread_io(fscid)
230
231 # wait a bit for mgr to get updated metrics
232 time.sleep(5)
233
234 # validate
235 valid, metrics = self._get_metrics(
236 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
237 log.debug("metrics={0}".format(metrics))
238 self.assertTrue(valid)
239
240 # do not give this mds any chance
241 delayed_rank = 1
242 mds_id_rank0 = self.fs.get_rank(rank=0)['name']
243 mds_id_rank1 = self.fs.get_rank(rank=1)['name']
244
245 self.fs.set_inter_mds_block(True, mds_id_rank0, mds_id_rank1)
246
247 def verify_delayed_metrics(metrics):
248 mds_metrics = metrics['metrics']
249 r = mds_metrics.get("mds.{}".format(delayed_rank), None)
250 if not r or not delayed_rank in mds_metrics['delayed_ranks']:
251 return False
252 return True
253 # validate
254 valid, metrics = self._get_metrics(verify_delayed_metrics, 30)
255 log.debug("metrics={0}".format(metrics))
256
257 self.assertTrue(valid)
258 self.fs.set_inter_mds_block(False, mds_id_rank0, mds_id_rank1)
259
260 # validate
261 valid, metrics = self._get_metrics(
262 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
263 log.debug("metrics={0}".format(metrics))
264 self.assertTrue(valid)
265
266 # cleanup test directories
267 self._cleanup_test_dirs()
268
269 def test_query_mds_filter(self):
270 # validate
271 valid, metrics = self._get_metrics(
272 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
273 log.debug("metrics={0}".format(metrics))
274 self.assertTrue(valid)
275
276 # grow the mds cluster
277 self.fs.grow(2)
278
279 fscid = self.fs.id
280 # spread directory per rank
281 self._spread_directory_on_all_ranks(fscid)
282
283 # spread some I/O
284 self._do_spread_io(fscid)
285
286 # wait a bit for mgr to get updated metrics
287 time.sleep(5)
288
289 # validate
290 valid, metrics = self._get_metrics(
291 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
292 log.debug("metrics={0}".format(metrics))
293 self.assertTrue(valid)
294
295 # initiate a new query with `--mds_rank` filter and validate if
296 # we get metrics *only* from that mds.
297 filtered_mds = 1
298 valid, metrics = self._get_metrics(
299 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED,
300 ranks=[filtered_mds]), 30, '--mds_rank={}'.format(filtered_mds))
301 log.debug("metrics={0}".format(metrics))
302 self.assertTrue(valid)
303
304 def test_query_client_filter(self):
305 # validate
306 valid, metrics = self._get_metrics(
307 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
308 log.debug("metrics={0}".format(metrics))
309 self.assertTrue(valid)
310
311 mds_metrics = metrics['metrics']
312 # pick an random client
313 client = random.choice(list(mds_metrics['mds.0'].keys()))
314 # could have used regex to extract client id
315 client_id = (client.split(' ')[0]).split('.')[-1]
316
317 valid, metrics = self._get_metrics(
318 self.verify_mds_metrics(client_count=1), 30, '--client_id={}'.format(client_id))
319 log.debug("metrics={0}".format(metrics))
320 self.assertTrue(valid)
321
322 def test_query_client_ip_filter(self):
323 # validate
324 valid, metrics = self._get_metrics(
325 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
326 log.debug("metrics={0}".format(metrics))
327 self.assertTrue(valid)
328
329 client_matadata = metrics['client_metadata']
330 # pick an random client
331 client = random.choice(list(client_matadata.keys()))
332 # get IP of client to use in filter
333 client_ip = client_matadata[client]['IP']
334
335 valid, metrics = self._get_metrics(
336 self.verify_mds_metrics(client_count=1), 30, '--client_ip={}'.format(client_ip))
337 log.debug("metrics={0}".format(metrics))
338 self.assertTrue(valid)
339
340 # verify IP from output with filter IP
341 for i in metrics['client_metadata']:
342 self.assertEqual(client_ip, metrics['client_metadata'][i]['IP'])
343
344 def test_query_mds_and_client_filter(self):
345 # validate
346 valid, metrics = self._get_metrics(
347 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
348 log.debug("metrics={0}".format(metrics))
349 self.assertTrue(valid)
350
351 # grow the mds cluster
352 self.fs.grow(2)
353
354 fscid = self.fs.id
355 # spread directory per rank
356 self._spread_directory_on_all_ranks(fscid)
357
358 # spread some I/O
359 self._do_spread_io_all_clients(fscid)
360
361 # wait a bit for mgr to get updated metrics
362 time.sleep(5)
363
364 # validate
365 valid, metrics = self._get_metrics(
366 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
367 log.debug("metrics={0}".format(metrics))
368 self.assertTrue(valid)
369
370 mds_metrics = metrics['metrics']
371
372 # pick an random client
373 client = random.choice(list(mds_metrics['mds.1'].keys()))
374 # could have used regex to extract client id
375 client_id = (client.split(' ')[0]).split('.')[-1]
376 filtered_mds = 1
377 valid, metrics = self._get_metrics(
378 self.verify_mds_metrics(client_count=1, ranks=[filtered_mds]),
379 30, '--mds_rank={}'.format(filtered_mds), '--client_id={}'.format(client_id))
380 log.debug("metrics={0}".format(metrics))
381 self.assertTrue(valid)
382
383 def test_for_invalid_mds_rank(self):
384 invalid_mds_rank = "1,"
385 # try, 'fs perf stat' command with invalid mds_rank
386 try:
387 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", "--mds_rank", invalid_mds_rank)
388 except CommandFailedError as ce:
389 if ce.exitstatus != errno.EINVAL:
390 raise
391 else:
392 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid mds_rank")
393
394 def test_for_invalid_client_id(self):
395 invalid_client_id = "abcd"
396 # try, 'fs perf stat' command with invalid client_id
397 try:
398 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", "--client_id", invalid_client_id)
399 except CommandFailedError as ce:
400 if ce.exitstatus != errno.EINVAL:
401 raise
402 else:
403 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid client_id")
404
405 def test_for_invalid_client_ip(self):
406 invalid_client_ip = "1.2.3"
407 # try, 'fs perf stat' command with invalid client_ip
408 try:
409 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", "--client_ip", invalid_client_ip)
410 except CommandFailedError as ce:
411 if ce.exitstatus != errno.EINVAL:
412 raise
413 else:
414 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid client_ip")
415
416 def test_perf_stats_stale_metrics(self):
417 """
418 That `ceph fs perf stats` doesn't output stale metrics after the rank0 MDS failover
419 """
420 # validate
421 valid, metrics = self._get_metrics(self.verify_mds_metrics(
422 active_mds_count=1, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
423 log.debug(f'metrics={metrics}')
424 self.assertTrue(valid)
425
426 #mount_a and mount_b are the clients mounted for TestMDSMetrics. So get their
427 #entries from the global_metrics.
428 client_a_name = f'client.{self.mount_a.get_global_id()}'
429 client_b_name = f'client.{self.mount_b.get_global_id()}'
430
431 global_metrics = metrics['global_metrics']
432 client_a_metrics = global_metrics[client_a_name]
433 client_b_metrics = global_metrics[client_b_name]
434
435 #fail rank0 mds
436 self.fs.rank_fail(rank=0)
437
438 # Wait for 10 seconds for the failover to complete and
439 # the mgr to get initial metrics from the new rank0 mds.
440 time.sleep(10)
441
442 fscid = self.fs.id
443
444 # spread directory per rank
445 self._spread_directory_on_all_ranks(fscid)
446
447 # spread some I/O
448 self._do_spread_io_all_clients(fscid)
449
450 # wait a bit for mgr to get updated metrics
451 time.sleep(5)
452
453 # validate
454 try:
455 valid, metrics_new = self._get_metrics(self.verify_mds_metrics(
456 active_mds_count=1, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
457 log.debug(f'metrics={metrics_new}')
458 self.assertTrue(valid)
459
460 global_metrics = metrics_new['global_metrics']
461 client_a_metrics_new = global_metrics[client_a_name]
462 client_b_metrics_new = global_metrics[client_b_name]
463
464 #the metrics should be different for the test to succeed.
465 self.assertNotEqual(client_a_metrics, client_a_metrics_new)
466 self.assertNotEqual(client_b_metrics, client_b_metrics_new)
467 except MaxWhileTries:
468 raise RuntimeError("Failed to fetch `ceph fs perf stats` metrics")
469 finally:
470 # cleanup test directories
471 self._cleanup_test_dirs()
472
473 def test_client_metrics_and_metadata(self):
474 self.mount_a.umount_wait()
475 self.mount_b.umount_wait()
476
477 self.mds_cluster.mon_manager.raw_cluster_cmd("fs", "flag", "set",
478 "enable_multiple", "true",
479 "--yes-i-really-mean-it")
480
481 #creating filesystem
482 fs_a = self._setup_fs(fs_name = "fs1")
483
484 # Mount a client on fs_a
485 self.mount_a.mount_wait(cephfs_name=fs_a.name)
486 self.mount_a.write_n_mb("pad.bin", 1)
487 self.mount_a.write_n_mb("test.bin", 2)
488 self.mount_a.path_to_ino("test.bin")
489 self.mount_a.create_files()
490
491 #creating another filesystem
492 fs_b = self._setup_fs(fs_name = "fs2")
493
494 # Mount a client on fs_b
495 self.mount_b.mount_wait(cephfs_name=fs_b.name)
496 self.mount_b.write_n_mb("test.bin", 1)
497 self.mount_b.path_to_ino("test.bin")
498 self.mount_b.create_files()
499
500 # validate
501 valid, metrics = self._get_metrics(
502 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
503 log.debug(f"metrics={metrics}")
504 self.assertTrue(valid)
505
506 client_metadata = metrics['client_metadata']
507
508 for i in client_metadata:
509 if not (client_metadata[i]['hostname']):
510 raise RuntimeError("hostname not found!")
511 if not (client_metadata[i]['valid_metrics']):
512 raise RuntimeError("valid_metrics not found!")
513