]> git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/test_mds_metrics.py
import ceph quincy 17.2.6
[ceph.git] / ceph / qa / tasks / cephfs / test_mds_metrics.py
1 import os
2 import json
3 import time
4 import random
5 import logging
6 import errno
7
8 from teuthology.contextutil import safe_while, MaxWhileTries
9 from teuthology.exceptions import CommandFailedError
10 from tasks.cephfs.cephfs_test_case import CephFSTestCase
11
12 log = logging.getLogger(__name__)
13
class TestMDSMetrics(CephFSTestCase):
    """
    Exercise the mgr `stats` module output (`ceph fs perf stats`): per-rank
    MDS metrics, per-client global metrics/metadata, filter options
    (--mds_rank, --client_id, --client_ip) and staleness after failover.
    """
    CLIENTS_REQUIRED = 2   # tests drive I/O from two mounted clients (mount_a, mount_b)
    MDSS_REQUIRED = 3      # enough daemons to grow to 2 actives plus a standby

    # Name prefix for the per-rank pinned test directories created by
    # _spread_directory_on_all_ranks().  NOTE(review): "PERFIX" is a typo for
    # "PREFIX" but the attribute is referenced by this name throughout.
    TEST_DIR_PERFIX = "test_mds_metrics"
19
20 def setUp(self):
21 super(TestMDSMetrics, self).setUp()
22 self._start_with_single_active_mds()
23 self._enable_mgr_stats_plugin()
24
25 def tearDown(self):
26 self._disable_mgr_stats_plugin()
27 super(TestMDSMetrics, self).tearDown()
28
29 def _start_with_single_active_mds(self):
30 curr_max_mds = self.fs.get_var('max_mds')
31 if curr_max_mds > 1:
32 self.fs.shrink(1)
33
34 def verify_mds_metrics(self, active_mds_count=1, client_count=1, ranks=[], mul_fs=[]):
35 def verify_metrics_cbk(metrics):
36 mds_metrics = metrics['metrics']
37 if not len(mds_metrics) == active_mds_count + 1: # n active mdss + delayed set
38 return False
39 fs_status = self.fs.status()
40 nonlocal ranks, mul_fs
41 if not ranks:
42 if not mul_fs:
43 mul_fs = [self.fs.id]
44 for filesystem in mul_fs:
45 ranks = set([info['rank'] for info in fs_status.get_ranks(filesystem)])
46 for rank in ranks:
47 r = mds_metrics.get("mds.{}".format(rank), None)
48 if not r or not len(mds_metrics['delayed_ranks']) == 0:
49 return False
50 for item in mul_fs:
51 key = fs_status.get_fsmap(item)['mdsmap']['fs_name']
52 global_metrics = metrics['global_metrics'].get(key, {})
53 client_metadata = metrics['client_metadata'].get(key, {})
54 if not len(global_metrics) >= client_count or not len(client_metadata) >= client_count:
55 return False
56 return True
57 return verify_metrics_cbk
58
59 def _fs_perf_stats(self, *args):
60 return self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", *args)
61
62 def _enable_mgr_stats_plugin(self):
63 return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "enable", "stats")
64
65 def _disable_mgr_stats_plugin(self):
66 return self.mgr_cluster.mon_manager.raw_cluster_cmd("mgr", "module", "disable", "stats")
67
68 def _spread_directory_on_all_ranks(self, fscid):
69 fs_status = self.fs.status()
70 ranks = set([info['rank'] for info in fs_status.get_ranks(fscid)])
71 # create a per-rank pinned directory
72 for rank in ranks:
73 dirname = "{0}_{1}".format(TestMDSMetrics.TEST_DIR_PERFIX, rank)
74 self.mount_a.run_shell(["mkdir", dirname])
75 self.mount_a.setfattr(dirname, "ceph.dir.pin", str(rank))
76 log.info("pinning directory {0} to rank {1}".format(dirname, rank))
77 for i in range(16):
78 filename = "{0}.{1}".format("test", i)
79 self.mount_a.write_n_mb(os.path.join(dirname, filename), 1)
80
    def _do_spread_io(self, fscid):
        # spread readdir I/O with a recursive directory walk from mount_b
        # (fscid is unused; kept for signature symmetry with the other
        # spread helpers)
        self.mount_b.run_shell(["find", "."])
84
85 def _do_spread_io_all_clients(self, fscid):
86 # spread readdir I/O
87 self.mount_a.run_shell(["find", "."])
88 self.mount_b.run_shell(["find", "."])
89
90 def _cleanup_test_dirs(self):
91 dirnames = self.mount_a.run_shell(["ls"]).stdout.getvalue()
92 for dirname in dirnames.split("\n"):
93 if dirname.startswith(TestMDSMetrics.TEST_DIR_PERFIX):
94 log.info("cleaning directory {}".format(dirname))
95 self.mount_a.run_shell(["rm", "-rf", dirname])
96
97 def _get_metrics(self, verifier_callback, trials, *args):
98 metrics = None
99 done = False
100 with safe_while(sleep=1, tries=trials, action='wait for metrics') as proceed:
101 while proceed():
102 metrics = json.loads(self._fs_perf_stats(*args))
103 done = verifier_callback(metrics)
104 if done:
105 break
106 return done, metrics
107
108 def _setup_fs(self, fs_name):
109 fs_a = self.mds_cluster.newfs(name=fs_name)
110
111 self.mds_cluster.mds_restart()
112
113 # Wait for filesystem to go healthy
114 fs_a.wait_for_daemons()
115
116 # Reconfigure client auth caps
117 for mount in self.mounts:
118 self.mds_cluster.mon_manager.raw_cluster_cmd_result(
119 'auth', 'caps', f"client.{mount.client_id}",
120 'mds', 'allow',
121 'mon', 'allow r',
122 'osd', f'allow rw pool={fs_a.get_data_pool_name()}')
123
124 return fs_a
125
126 # basic check to verify if we get back metrics from each active mds rank
127
128 def test_metrics_from_rank(self):
129 # validate
130 valid, metrics = self._get_metrics(
131 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
132 log.debug("metrics={0}".format(metrics))
133 self.assertTrue(valid)
134
135 def test_metrics_post_client_disconnection(self):
136 # validate
137 valid, metrics = self._get_metrics(
138 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
139 log.debug("metrics={0}".format(metrics))
140 self.assertTrue(valid)
141
142 self.mount_a.umount_wait()
143
144 valid, metrics = self._get_metrics(
145 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED - 1), 30)
146 log.debug("metrics={0}".format(metrics))
147 self.assertTrue(valid)
148
149 def test_metrics_mds_grow(self):
150 # validate
151 valid, metrics = self._get_metrics(
152 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
153 log.debug("metrics={0}".format(metrics))
154 self.assertTrue(valid)
155
156 # grow the mds cluster
157 self.fs.grow(2)
158
159 fscid = self.fs.id
160 # spread directory per rank
161 self._spread_directory_on_all_ranks(fscid)
162
163 # spread some I/O
164 self._do_spread_io(fscid)
165
166 # wait a bit for mgr to get updated metrics
167 time.sleep(5)
168
169 # validate
170 valid, metrics = self._get_metrics(self.verify_mds_metrics(
171 active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED) , 30)
172 log.debug("metrics={0}".format(metrics))
173 self.assertTrue(valid)
174
175 # cleanup test directories
176 self._cleanup_test_dirs()
177
178 def test_metrics_mds_grow_and_shrink(self):
179 # validate
180 valid, metrics = self._get_metrics(
181 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
182 log.debug("metrics={0}".format(metrics))
183 self.assertTrue(valid)
184
185 # grow the mds cluster
186 self.fs.grow(2)
187
188 fscid = self.fs.id
189 # spread directory per rank
190 self._spread_directory_on_all_ranks(fscid)
191
192 # spread some I/O
193 self._do_spread_io(fscid)
194
195 # wait a bit for mgr to get updated metrics
196 time.sleep(5)
197
198 # validate
199 valid, metrics = self._get_metrics(
200 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
201 log.debug("metrics={0}".format(metrics))
202 self.assertTrue(valid)
203
204 # shrink mds cluster
205 self.fs.shrink(1)
206
207 # wait a bit for mgr to get updated metrics
208 time.sleep(5)
209
210 # validate
211 valid, metrics = self._get_metrics(
212 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
213 log.debug("metrics={0}".format(metrics))
214 self.assertTrue(valid)
215
216 # cleanup test directories
217 self._cleanup_test_dirs()
218
219 def test_delayed_metrics(self):
220 # validate
221 valid, metrics = self._get_metrics(
222 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
223 log.debug("metrics={0}".format(metrics))
224 self.assertTrue(valid)
225
226 # grow the mds cluster
227 self.fs.grow(2)
228
229 fscid = self.fs.id
230 # spread directory per rank
231 self._spread_directory_on_all_ranks(fscid)
232
233 # spread some I/O
234 self._do_spread_io(fscid)
235
236 # wait a bit for mgr to get updated metrics
237 time.sleep(5)
238
239 # validate
240 valid, metrics = self._get_metrics(
241 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
242 log.debug("metrics={0}".format(metrics))
243 self.assertTrue(valid)
244
245 # do not give this mds any chance
246 delayed_rank = 1
247 mds_id_rank0 = self.fs.get_rank(rank=0)['name']
248 mds_id_rank1 = self.fs.get_rank(rank=1)['name']
249
250 self.fs.set_inter_mds_block(True, mds_id_rank0, mds_id_rank1)
251
252 def verify_delayed_metrics(metrics):
253 mds_metrics = metrics['metrics']
254 r = mds_metrics.get("mds.{}".format(delayed_rank), None)
255 if not r or not delayed_rank in mds_metrics['delayed_ranks']:
256 return False
257 return True
258 # validate
259 valid, metrics = self._get_metrics(verify_delayed_metrics, 30)
260 log.debug("metrics={0}".format(metrics))
261
262 self.assertTrue(valid)
263 self.fs.set_inter_mds_block(False, mds_id_rank0, mds_id_rank1)
264
265 # validate
266 valid, metrics = self._get_metrics(
267 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
268 log.debug("metrics={0}".format(metrics))
269 self.assertTrue(valid)
270
271 # cleanup test directories
272 self._cleanup_test_dirs()
273
274 def test_query_mds_filter(self):
275 # validate
276 valid, metrics = self._get_metrics(
277 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
278 log.debug("metrics={0}".format(metrics))
279 self.assertTrue(valid)
280
281 # grow the mds cluster
282 self.fs.grow(2)
283
284 fscid = self.fs.id
285 # spread directory per rank
286 self._spread_directory_on_all_ranks(fscid)
287
288 # spread some I/O
289 self._do_spread_io(fscid)
290
291 # wait a bit for mgr to get updated metrics
292 time.sleep(5)
293
294 # validate
295 valid, metrics = self._get_metrics(
296 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
297 log.debug("metrics={0}".format(metrics))
298 self.assertTrue(valid)
299
300 filtered_mds = 1
301 def verify_filtered_mds_rank_metrics(metrics):
302 # checks if the metrics has only client_metadata and
303 # global_metrics filtered using --mds_rank=1
304 global_metrics = metrics['global_metrics'].get(self.fs.name, {})
305 client_metadata = metrics['client_metadata'].get(self.fs.name, {})
306 mds_metrics = metrics['metrics']
307 if len(mds_metrics) != 2 or f"mds.{filtered_mds}" not in mds_metrics:
308 return False
309 if len(global_metrics) > TestMDSMetrics.CLIENTS_REQUIRED or\
310 len(client_metadata) > TestMDSMetrics.CLIENTS_REQUIRED:
311 return False
312 if len(set(global_metrics) - set(mds_metrics[f"mds.{filtered_mds}"])) or\
313 len(set(client_metadata) - set(mds_metrics[f"mds.{filtered_mds}"])):
314 return False
315 return True
316 # initiate a new query with `--mds_rank` filter and validate if
317 # we get metrics *only* from that mds.
318 valid, metrics = self._get_metrics(verify_filtered_mds_rank_metrics, 30,
319 f'--mds_rank={filtered_mds}')
320 log.debug(f"metrics={metrics}")
321 self.assertTrue(valid, "Incorrect 'ceph fs perf stats' output"
322 f" with filter '--mds_rank={filtered_mds}'")
323
324 def test_query_client_filter(self):
325 # validate
326 valid, metrics = self._get_metrics(
327 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
328 log.debug("metrics={0}".format(metrics))
329 self.assertTrue(valid)
330
331 mds_metrics = metrics['metrics']
332 # pick an random client
333 client = random.choice(list(mds_metrics['mds.0'].keys()))
334 # could have used regex to extract client id
335 client_id = (client.split(' ')[0]).split('.')[-1]
336
337 valid, metrics = self._get_metrics(
338 self.verify_mds_metrics(client_count=1), 30, '--client_id={}'.format(client_id))
339 log.debug("metrics={0}".format(metrics))
340 self.assertTrue(valid)
341
342 def test_query_client_ip_filter(self):
343 # validate
344 valid, metrics = self._get_metrics(
345 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
346 log.debug("metrics={0}".format(metrics))
347 self.assertTrue(valid)
348
349 client_matadata = metrics['client_metadata'][self.fs.name]
350 # pick an random client
351 client = random.choice(list(client_matadata.keys()))
352 # get IP of client to use in filter
353 client_ip = client_matadata[client]['IP']
354
355 valid, metrics = self._get_metrics(
356 self.verify_mds_metrics(client_count=1), 30, '--client_ip={}'.format(client_ip))
357 log.debug("metrics={0}".format(metrics))
358 self.assertTrue(valid)
359
360 # verify IP from output with filter IP
361 for i in metrics['client_metadata'][self.fs.name]:
362 self.assertEqual(client_ip, metrics['client_metadata'][self.fs.name][i]['IP'])
363
364 def test_query_mds_and_client_filter(self):
365 # validate
366 valid, metrics = self._get_metrics(
367 self.verify_mds_metrics(client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
368 log.debug("metrics={0}".format(metrics))
369 self.assertTrue(valid)
370
371 # grow the mds cluster
372 self.fs.grow(2)
373
374 fscid = self.fs.id
375 # spread directory per rank
376 self._spread_directory_on_all_ranks(fscid)
377
378 # spread some I/O
379 self._do_spread_io_all_clients(fscid)
380
381 # wait a bit for mgr to get updated metrics
382 time.sleep(5)
383
384 # validate
385 valid, metrics = self._get_metrics(
386 self.verify_mds_metrics(active_mds_count=2, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
387 log.debug("metrics={0}".format(metrics))
388 self.assertTrue(valid)
389
390 mds_metrics = metrics['metrics']
391
392 # pick an random client
393 client = random.choice(list(mds_metrics['mds.1'].keys()))
394 # could have used regex to extract client id
395 client_id = (client.split(' ')[0]).split('.')[-1]
396 filtered_mds = 1
397 valid, metrics = self._get_metrics(
398 self.verify_mds_metrics(client_count=1, ranks=[filtered_mds]),
399 30, '--mds_rank={}'.format(filtered_mds), '--client_id={}'.format(client_id))
400 log.debug("metrics={0}".format(metrics))
401 self.assertTrue(valid)
402
403 def test_for_invalid_mds_rank(self):
404 invalid_mds_rank = "1,"
405 # try, 'fs perf stat' command with invalid mds_rank
406 try:
407 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", "--mds_rank", invalid_mds_rank)
408 except CommandFailedError as ce:
409 if ce.exitstatus != errno.EINVAL:
410 raise
411 else:
412 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid mds_rank")
413
414 def test_for_invalid_client_id(self):
415 invalid_client_id = "abcd"
416 # try, 'fs perf stat' command with invalid client_id
417 try:
418 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", "--client_id", invalid_client_id)
419 except CommandFailedError as ce:
420 if ce.exitstatus != errno.EINVAL:
421 raise
422 else:
423 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid client_id")
424
425 def test_for_invalid_client_ip(self):
426 invalid_client_ip = "1.2.3"
427 # try, 'fs perf stat' command with invalid client_ip
428 try:
429 self.mgr_cluster.mon_manager.raw_cluster_cmd("fs", "perf", "stats", "--client_ip", invalid_client_ip)
430 except CommandFailedError as ce:
431 if ce.exitstatus != errno.EINVAL:
432 raise
433 else:
434 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid client_ip")
435
    def test_perf_stats_stale_metrics(self):
        """
        That `ceph fs perf stats` doesn't output stale metrics after the rank0 MDS failover
        """
        # validate
        valid, metrics = self._get_metrics(self.verify_mds_metrics(
            active_mds_count=1, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
        log.debug(f'metrics={metrics}')
        self.assertTrue(valid)

        # mount_a and mount_b are the clients mounted for TestMDSMetrics. So get their
        # entries from the global_metrics.
        client_a_name = f'client.{self.mount_a.get_global_id()}'
        client_b_name = f'client.{self.mount_b.get_global_id()}'

        # snapshot the pre-failover per-client metrics; compared against the
        # post-failover dump below to prove the numbers actually moved
        global_metrics = metrics['global_metrics']
        client_a_metrics = global_metrics[self.fs.name][client_a_name]
        client_b_metrics = global_metrics[self.fs.name][client_b_name]

        # fail rank0 mds
        self.fs.rank_fail(rank=0)

        # Wait for rank0 up:active state
        self.fs.wait_for_state('up:active', rank=0, timeout=30)

        fscid = self.fs.id

        # spread directory per rank
        self._spread_directory_on_all_ranks(fscid)

        # spread some I/O
        self._do_spread_io_all_clients(fscid)

        # wait a bit for mgr to get updated metrics
        time.sleep(5)

        # validate
        try:
            valid, metrics_new = self._get_metrics(self.verify_mds_metrics(
                active_mds_count=1, client_count=TestMDSMetrics.CLIENTS_REQUIRED), 30)
            log.debug(f'metrics={metrics_new}')
            self.assertTrue(valid)

            # both clients must still be present in the post-failover dump
            client_metadata = metrics_new['client_metadata']
            client_a_metadata = client_metadata.get(self.fs.name, {}).get(client_a_name, {})
            client_b_metadata = client_metadata.get(self.fs.name, {}).get(client_b_name, {})

            global_metrics = metrics_new['global_metrics']
            client_a_metrics_new = global_metrics.get(self.fs.name, {}).get(client_a_name, {})
            client_b_metrics_new = global_metrics.get(self.fs.name, {}).get(client_b_name, {})

            # the metrics should be different for the test to succeed.
            self.assertTrue(client_a_metadata and client_b_metadata and
                            client_a_metrics_new and client_b_metrics_new and
                            (client_a_metrics_new != client_a_metrics) and
                            (client_b_metrics_new != client_b_metrics),
                            "Invalid 'ceph fs perf stats' metrics after rank0 mds failover")
        except MaxWhileTries:
            raise RuntimeError("Failed to fetch 'ceph fs perf stats' metrics")
        finally:
            # cleanup test directories
            self._cleanup_test_dirs()
498
499 def test_client_metrics_and_metadata(self):
500 self.mount_a.umount_wait()
501 self.mount_b.umount_wait()
502 self.fs.delete_all_filesystems()
503
504 self.mds_cluster.mon_manager.raw_cluster_cmd("fs", "flag", "set",
505 "enable_multiple", "true", "--yes-i-really-mean-it")
506
507 # creating filesystem
508 fs_a = self._setup_fs(fs_name="fs1")
509
510 # Mount a client on fs_a
511 self.mount_a.mount_wait(cephfs_name=fs_a.name)
512 self.mount_a.write_n_mb("pad.bin", 1)
513 self.mount_a.write_n_mb("test.bin", 2)
514 self.mount_a.path_to_ino("test.bin")
515 self.mount_a.create_files()
516
517 # creating another filesystem
518 fs_b = self._setup_fs(fs_name="fs2")
519
520 # Mount a client on fs_b
521 self.mount_b.mount_wait(cephfs_name=fs_b.name)
522 self.mount_b.write_n_mb("test.bin", 1)
523 self.mount_b.path_to_ino("test.bin")
524 self.mount_b.create_files()
525
526 fscid_list = [fs_a.id, fs_b.id]
527
528 # validate
529 valid, metrics = self._get_metrics(
530 self.verify_mds_metrics(client_count=1, mul_fs=fscid_list), 30)
531 log.debug(f"metrics={metrics}")
532 self.assertTrue(valid)
533
534 client_metadata_a = metrics['client_metadata']['fs1']
535 client_metadata_b = metrics['client_metadata']['fs2']
536
537 for i in client_metadata_a:
538 if not (client_metadata_a[i]['hostname']):
539 raise RuntimeError("hostname of fs1 not found!")
540 if not (client_metadata_a[i]['valid_metrics']):
541 raise RuntimeError("valid_metrics of fs1 not found!")
542
543 for i in client_metadata_b:
544 if not (client_metadata_b[i]['hostname']):
545 raise RuntimeError("hostname of fs2 not found!")
546 if not (client_metadata_b[i]['valid_metrics']):
547 raise RuntimeError("valid_metrics of fs2 not found!")
548
549 def test_non_existing_mds_rank(self):
550 def verify_filtered_metrics(metrics):
551 # checks if the metrics has non empty client_metadata and global_metrics
552 if metrics['client_metadata'].get(self.fs.name, {})\
553 or metrics['global_metrics'].get(self.fs.name, {}):
554 return True
555 return False
556
557 try:
558 # validate
559 filter_rank = random.randint(1, 10)
560 valid, metrics = self._get_metrics(verify_filtered_metrics, 30,
561 '--mds_rank={}'.format(filter_rank))
562 log.info(f'metrics={metrics}')
563 self.assertFalse(valid, "Fetched 'ceph fs perf stats' metrics using nonexistent MDS rank")
564 except MaxWhileTries:
565 # success
566 pass
567
    def test_perf_stats_stale_metrics_with_multiple_filesystem(self):
        """
        That `ceph fs perf stats` doesn't output stale metrics after a rank0
        MDS failover when multiple filesystems exist.
        """
        self.mount_a.umount_wait()
        self.mount_b.umount_wait()

        # allow more than one filesystem in the cluster
        self.mds_cluster.mon_manager.raw_cluster_cmd("fs", "flag", "set",
            "enable_multiple", "true", "--yes-i-really-mean-it")

        # creating filesystem
        fs_b = self._setup_fs(fs_name="fs2")

        # Mount a client on fs_b
        self.mount_b.mount_wait(cephfs_name=fs_b.name)
        self.mount_b.write_n_mb("test.bin", 1)
        self.mount_b.path_to_ino("test.bin")
        self.mount_b.create_files()

        # creating another filesystem
        fs_a = self._setup_fs(fs_name="fs1")

        # Mount a client on fs_a
        self.mount_a.mount_wait(cephfs_name=fs_a.name)
        self.mount_a.write_n_mb("pad.bin", 1)
        self.mount_a.write_n_mb("test.bin", 2)
        self.mount_a.path_to_ino("test.bin")
        self.mount_a.create_files()

        # validate
        valid, metrics = self._get_metrics(
            self.verify_mds_metrics(client_count=1, mul_fs=[fs_a.id, fs_b.id]), 30)
        log.debug(f"metrics={metrics}")
        self.assertTrue(valid)

        # get mounted client's entries from the global_metrics.
        client_a_name = f'client.{self.mount_a.get_global_id()}'

        # snapshot of pre-failover metrics for the fs1 client, compared
        # against the post-failover dump below
        global_metrics = metrics['global_metrics']
        client_a_metrics = global_metrics.get("fs1", {}).get(client_a_name, {})

        # fail active mds of fs_a
        fs_a_mds = fs_a.get_active_names()[0]
        self.mds_cluster.mds_fail(fs_a_mds)
        fs_a.wait_for_state('up:active', rank=0, timeout=30)

        # spread directory per rank
        self._spread_directory_on_all_ranks(fs_a.id)

        # spread some I/O
        self._do_spread_io_all_clients(fs_a.id)

        # wait a bit for mgr to get updated metrics
        time.sleep(5)

        # validate
        try:
            valid, metrics_new = self._get_metrics(
                self.verify_mds_metrics(client_count=1, mul_fs=[fs_a.id, fs_b.id]), 30)
            log.debug(f'metrics={metrics_new}')
            self.assertTrue(valid)

            client_metadata = metrics_new['client_metadata']
            client_a_metadata = client_metadata.get("fs1", {}).get(client_a_name, {})

            global_metrics = metrics_new['global_metrics']
            client_a_metrics_new = global_metrics.get("fs1", {}).get(client_a_name, {})

            # the metrics should be different for the test to succeed.
            self.assertTrue(client_a_metadata and client_a_metrics_new
                            and (client_a_metrics_new != client_a_metrics),
                            "Invalid 'ceph fs perf stats' metrics after"
                            f" rank0 mds of {fs_a.name} failover")
        except MaxWhileTries:
            raise RuntimeError("Failed to fetch `ceph fs perf stats` metrics")
        finally:
            # cleanup test directories
            self._cleanup_test_dirs()
643