]>
git.proxmox.com Git - ceph.git/blob - ceph/qa/tasks/cephfs/test_mds_metrics.py
727b80c6a91bf63e2c6df8a3ad980175b71c0616
8 from teuthology
.contextutil
import safe_while
, MaxWhileTries
9 from teuthology
.exceptions
import CommandFailedError
10 from tasks
.cephfs
.cephfs_test_case
import CephFSTestCase
# Module-level logger, keyed by module name per the standard logging convention.
log = logging.getLogger(__name__)
class TestMDSMetrics(CephFSTestCase):
    # Prefix used to name (and later clean up) per-rank pinned test directories.
    # NOTE(review): "PERFIX" is a typo for "PREFIX" — kept as-is because every
    # reference in this class uses this spelling; rename all together if fixed.
    TEST_DIR_PERFIX = "test_mds_metrics"
21 super(TestMDSMetrics
, self
).setUp()
22 self
._start
_with
_single
_active
_mds
()
23 self
._enable
_mgr
_stats
_plugin
()
26 self
._disable
_mgr
_stats
_plugin
()
27 super(TestMDSMetrics
, self
).tearDown()
29 def _start_with_single_active_mds(self
):
30 curr_max_mds
= self
.fs
.get_var('max_mds')
34 def verify_mds_metrics(self
, active_mds_count
=1, client_count
=1, ranks
=[]):
35 def verify_metrics_cbk(metrics
):
36 mds_metrics
= metrics
['metrics']
37 if not len(mds_metrics
) == active_mds_count
+ 1: # n active mdss + delayed set
39 fs_status
= self
.fs
.status()
42 ranks
= set([info
['rank'] for info
in fs_status
.get_ranks(self
.fs
.id)])
44 r
= mds_metrics
.get("mds.{}".format(rank
), None)
45 if not r
or not len(mds_metrics
['delayed_ranks']) == 0:
47 global_metrics
= metrics
['global_metrics']
48 client_metadata
= metrics
['client_metadata']
49 if not len(global_metrics
) >= client_count
or not len(client_metadata
) >= client_count
:
52 return verify_metrics_cbk
def _fs_perf_stats(self, *args):
    """Invoke `ceph fs perf stats` with optional filter arguments.

    Returns the command's raw output (a JSON string) for the caller to parse.
    """
    cmd = ["fs", "perf", "stats"] + list(args)
    return self.mgr_cluster.mon_manager.raw_cluster_cmd(*cmd)
def _enable_mgr_stats_plugin(self):
    """Enable the mgr `stats` module, which backs `ceph fs perf stats`."""
    mon = self.mgr_cluster.mon_manager
    return mon.raw_cluster_cmd("mgr", "module", "enable", "stats")
def _disable_mgr_stats_plugin(self):
    """Disable the mgr `stats` module (counterpart of the enable helper)."""
    mon = self.mgr_cluster.mon_manager
    return mon.raw_cluster_cmd("mgr", "module", "disable", "stats")
63 def _spread_directory_on_all_ranks(self
, fscid
):
64 fs_status
= self
.fs
.status()
65 ranks
= set([info
['rank'] for info
in fs_status
.get_ranks(fscid
)])
66 # create a per-rank pinned directory
68 dirname
= "{0}_{1}".format(TestMDSMetrics
.TEST_DIR_PERFIX
, rank
)
69 self
.mount_a
.run_shell(["mkdir", dirname
])
70 self
.mount_a
.setfattr(dirname
, "ceph.dir.pin", str(rank
))
71 log
.info("pinning directory {0} to rank {1}".format(dirname
, rank
))
73 filename
= "{0}.{1}".format("test", i
)
74 self
.mount_a
.write_n_mb(os
.path
.join(dirname
, filename
), 1)
def _do_spread_io(self, fscid):
    """Drive metadata I/O from the second client by walking the whole tree.

    `fscid` is accepted for signature parity with the other spread helpers;
    the walk itself touches every rank via the pinned per-rank directories.
    """
    client = self.mount_b
    client.run_shell(["find", "."])
def _do_spread_io_all_clients(self, fscid):
    """Drive metadata I/O from both mounted clients by walking the tree.

    Runs mount_a first, then mount_b — same order as issuing the two
    `find` commands individually.
    """
    for client in (self.mount_a, self.mount_b):
        client.run_shell(["find", "."])
def _cleanup_test_dirs(self):
    """Remove every test directory created under the mount point.

    Lists the mount root via the client and recursively deletes each entry
    whose name carries the class test-directory prefix.
    """
    listing = self.mount_a.run_shell(["ls"]).stdout.getvalue()
    victims = [entry for entry in listing.split("\n")
               if entry.startswith(TestMDSMetrics.TEST_DIR_PERFIX)]
    for victim in victims:
        log.info("cleaning directory {}".format(victim))
        self.mount_a.run_shell(["rm", "-rf", victim])
92 def _get_metrics(self
, verifier_callback
, trials
, *args
):
95 with
safe_while(sleep
=1, tries
=trials
, action
='wait for metrics') as proceed
:
97 metrics
= json
.loads(self
._fs
_perf
_stats
(*args
))
98 done
= verifier_callback(metrics
)
103 def _setup_fs(self
, fs_name
):
104 fs_a
= self
.mds_cluster
.newfs(name
=fs_name
)
106 self
.mds_cluster
.mds_restart()
108 # Wait for filesystem to go healthy
109 fs_a
.wait_for_daemons()
111 # Reconfigure client auth caps
112 for mount
in self
.mounts
:
113 self
.mds_cluster
.mon_manager
.raw_cluster_cmd_result(
114 'auth', 'caps', f
"client.{mount.client_id}",
117 'osd', f
'allow rw pool={fs_a.get_data_pool_name()}')
121 # basic check to verify if we get back metrics from each active mds rank
123 def test_metrics_from_rank(self
):
125 valid
, metrics
= self
._get
_metrics
(
126 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
127 log
.debug("metrics={0}".format(metrics
))
128 self
.assertTrue(valid
)
130 def test_metrics_post_client_disconnection(self
):
132 valid
, metrics
= self
._get
_metrics
(
133 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
134 log
.debug("metrics={0}".format(metrics
))
135 self
.assertTrue(valid
)
137 self
.mount_a
.umount_wait()
139 valid
, metrics
= self
._get
_metrics
(
140 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
- 1), 30)
141 log
.debug("metrics={0}".format(metrics
))
142 self
.assertTrue(valid
)
144 def test_metrics_mds_grow(self
):
146 valid
, metrics
= self
._get
_metrics
(
147 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
148 log
.debug("metrics={0}".format(metrics
))
149 self
.assertTrue(valid
)
151 # grow the mds cluster
155 # spread directory per rank
156 self
._spread
_directory
_on
_all
_ranks
(fscid
)
159 self
._do
_spread
_io
(fscid
)
161 # wait a bit for mgr to get updated metrics
165 valid
, metrics
= self
._get
_metrics
(self
.verify_mds_metrics(
166 active_mds_count
=2, client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
) , 30)
167 log
.debug("metrics={0}".format(metrics
))
168 self
.assertTrue(valid
)
170 # cleanup test directories
171 self
._cleanup
_test
_dirs
()
173 def test_metrics_mds_grow_and_shrink(self
):
175 valid
, metrics
= self
._get
_metrics
(
176 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
177 log
.debug("metrics={0}".format(metrics
))
178 self
.assertTrue(valid
)
180 # grow the mds cluster
184 # spread directory per rank
185 self
._spread
_directory
_on
_all
_ranks
(fscid
)
188 self
._do
_spread
_io
(fscid
)
190 # wait a bit for mgr to get updated metrics
194 valid
, metrics
= self
._get
_metrics
(
195 self
.verify_mds_metrics(active_mds_count
=2, client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
196 log
.debug("metrics={0}".format(metrics
))
197 self
.assertTrue(valid
)
202 # wait a bit for mgr to get updated metrics
206 valid
, metrics
= self
._get
_metrics
(
207 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
208 log
.debug("metrics={0}".format(metrics
))
209 self
.assertTrue(valid
)
211 # cleanup test directories
212 self
._cleanup
_test
_dirs
()
214 def test_delayed_metrics(self
):
216 valid
, metrics
= self
._get
_metrics
(
217 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
218 log
.debug("metrics={0}".format(metrics
))
219 self
.assertTrue(valid
)
221 # grow the mds cluster
225 # spread directory per rank
226 self
._spread
_directory
_on
_all
_ranks
(fscid
)
229 self
._do
_spread
_io
(fscid
)
231 # wait a bit for mgr to get updated metrics
235 valid
, metrics
= self
._get
_metrics
(
236 self
.verify_mds_metrics(active_mds_count
=2, client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
237 log
.debug("metrics={0}".format(metrics
))
238 self
.assertTrue(valid
)
240 # do not give this mds any chance
242 mds_id_rank0
= self
.fs
.get_rank(rank
=0)['name']
243 mds_id_rank1
= self
.fs
.get_rank(rank
=1)['name']
245 self
.fs
.set_inter_mds_block(True, mds_id_rank0
, mds_id_rank1
)
247 def verify_delayed_metrics(metrics
):
248 mds_metrics
= metrics
['metrics']
249 r
= mds_metrics
.get("mds.{}".format(delayed_rank
), None)
250 if not r
or not delayed_rank
in mds_metrics
['delayed_ranks']:
254 valid
, metrics
= self
._get
_metrics
(verify_delayed_metrics
, 30)
255 log
.debug("metrics={0}".format(metrics
))
257 self
.assertTrue(valid
)
258 self
.fs
.set_inter_mds_block(False, mds_id_rank0
, mds_id_rank1
)
261 valid
, metrics
= self
._get
_metrics
(
262 self
.verify_mds_metrics(active_mds_count
=2, client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
263 log
.debug("metrics={0}".format(metrics
))
264 self
.assertTrue(valid
)
266 # cleanup test directories
267 self
._cleanup
_test
_dirs
()
269 def test_query_mds_filter(self
):
271 valid
, metrics
= self
._get
_metrics
(
272 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
273 log
.debug("metrics={0}".format(metrics
))
274 self
.assertTrue(valid
)
276 # grow the mds cluster
280 # spread directory per rank
281 self
._spread
_directory
_on
_all
_ranks
(fscid
)
284 self
._do
_spread
_io
(fscid
)
286 # wait a bit for mgr to get updated metrics
290 valid
, metrics
= self
._get
_metrics
(
291 self
.verify_mds_metrics(active_mds_count
=2, client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
292 log
.debug("metrics={0}".format(metrics
))
293 self
.assertTrue(valid
)
295 # initiate a new query with `--mds_rank` filter and validate if
296 # we get metrics *only* from that mds.
298 valid
, metrics
= self
._get
_metrics
(
299 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
,
300 ranks
=[filtered_mds
]), 30, '--mds_rank={}'.format(filtered_mds
))
301 log
.debug("metrics={0}".format(metrics
))
302 self
.assertTrue(valid
)
304 def test_query_client_filter(self
):
306 valid
, metrics
= self
._get
_metrics
(
307 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
308 log
.debug("metrics={0}".format(metrics
))
309 self
.assertTrue(valid
)
311 mds_metrics
= metrics
['metrics']
312 # pick an random client
313 client
= random
.choice(list(mds_metrics
['mds.0'].keys()))
314 # could have used regex to extract client id
315 client_id
= (client
.split(' ')[0]).split('.')[-1]
317 valid
, metrics
= self
._get
_metrics
(
318 self
.verify_mds_metrics(client_count
=1), 30, '--client_id={}'.format(client_id
))
319 log
.debug("metrics={0}".format(metrics
))
320 self
.assertTrue(valid
)
322 def test_query_client_ip_filter(self
):
324 valid
, metrics
= self
._get
_metrics
(
325 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
326 log
.debug("metrics={0}".format(metrics
))
327 self
.assertTrue(valid
)
329 client_matadata
= metrics
['client_metadata']
330 # pick an random client
331 client
= random
.choice(list(client_matadata
.keys()))
332 # get IP of client to use in filter
333 client_ip
= client_matadata
[client
]['IP']
335 valid
, metrics
= self
._get
_metrics
(
336 self
.verify_mds_metrics(client_count
=1), 30, '--client_ip={}'.format(client_ip
))
337 log
.debug("metrics={0}".format(metrics
))
338 self
.assertTrue(valid
)
340 # verify IP from output with filter IP
341 for i
in metrics
['client_metadata']:
342 self
.assertEqual(client_ip
, metrics
['client_metadata'][i
]['IP'])
344 def test_query_mds_and_client_filter(self
):
346 valid
, metrics
= self
._get
_metrics
(
347 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
348 log
.debug("metrics={0}".format(metrics
))
349 self
.assertTrue(valid
)
351 # grow the mds cluster
355 # spread directory per rank
356 self
._spread
_directory
_on
_all
_ranks
(fscid
)
359 self
._do
_spread
_io
_all
_clients
(fscid
)
361 # wait a bit for mgr to get updated metrics
365 valid
, metrics
= self
._get
_metrics
(
366 self
.verify_mds_metrics(active_mds_count
=2, client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
367 log
.debug("metrics={0}".format(metrics
))
368 self
.assertTrue(valid
)
370 mds_metrics
= metrics
['metrics']
372 # pick an random client
373 client
= random
.choice(list(mds_metrics
['mds.1'].keys()))
374 # could have used regex to extract client id
375 client_id
= (client
.split(' ')[0]).split('.')[-1]
377 valid
, metrics
= self
._get
_metrics
(
378 self
.verify_mds_metrics(client_count
=1, ranks
=[filtered_mds
]),
379 30, '--mds_rank={}'.format(filtered_mds
), '--client_id={}'.format(client_id
))
380 log
.debug("metrics={0}".format(metrics
))
381 self
.assertTrue(valid
)
383 def test_for_invalid_mds_rank(self
):
384 invalid_mds_rank
= "1,"
385 # try, 'fs perf stat' command with invalid mds_rank
387 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "perf", "stats", "--mds_rank", invalid_mds_rank
)
388 except CommandFailedError
as ce
:
389 if ce
.exitstatus
!= errno
.EINVAL
:
392 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid mds_rank")
394 def test_for_invalid_client_id(self
):
395 invalid_client_id
= "abcd"
396 # try, 'fs perf stat' command with invalid client_id
398 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "perf", "stats", "--client_id", invalid_client_id
)
399 except CommandFailedError
as ce
:
400 if ce
.exitstatus
!= errno
.EINVAL
:
403 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid client_id")
405 def test_for_invalid_client_ip(self
):
406 invalid_client_ip
= "1.2.3"
407 # try, 'fs perf stat' command with invalid client_ip
409 self
.mgr_cluster
.mon_manager
.raw_cluster_cmd("fs", "perf", "stats", "--client_ip", invalid_client_ip
)
410 except CommandFailedError
as ce
:
411 if ce
.exitstatus
!= errno
.EINVAL
:
414 raise RuntimeError("expected the 'fs perf stat' command to fail for invalid client_ip")
416 def test_perf_stats_stale_metrics(self
):
418 That `ceph fs perf stats` doesn't output stale metrics after the rank0 MDS failover
421 valid
, metrics
= self
._get
_metrics
(self
.verify_mds_metrics(
422 active_mds_count
=1, client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
423 log
.debug(f
'metrics={metrics}')
424 self
.assertTrue(valid
)
426 #mount_a and mount_b are the clients mounted for TestMDSMetrics. So get their
427 #entries from the global_metrics.
428 client_a_name
= f
'client.{self.mount_a.get_global_id()}'
429 client_b_name
= f
'client.{self.mount_b.get_global_id()}'
431 global_metrics
= metrics
['global_metrics']
432 client_a_metrics
= global_metrics
[client_a_name
]
433 client_b_metrics
= global_metrics
[client_b_name
]
436 self
.fs
.rank_fail(rank
=0)
438 # Wait for 10 seconds for the failover to complete and
439 # the mgr to get initial metrics from the new rank0 mds.
444 # spread directory per rank
445 self
._spread
_directory
_on
_all
_ranks
(fscid
)
448 self
._do
_spread
_io
_all
_clients
(fscid
)
450 # wait a bit for mgr to get updated metrics
455 valid
, metrics_new
= self
._get
_metrics
(self
.verify_mds_metrics(
456 active_mds_count
=1, client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
457 log
.debug(f
'metrics={metrics_new}')
458 self
.assertTrue(valid
)
460 global_metrics
= metrics_new
['global_metrics']
461 client_a_metrics_new
= global_metrics
[client_a_name
]
462 client_b_metrics_new
= global_metrics
[client_b_name
]
464 #the metrics should be different for the test to succeed.
465 self
.assertNotEqual(client_a_metrics
, client_a_metrics_new
)
466 self
.assertNotEqual(client_b_metrics
, client_b_metrics_new
)
467 except MaxWhileTries
:
468 raise RuntimeError("Failed to fetch `ceph fs perf stats` metrics")
470 # cleanup test directories
471 self
._cleanup
_test
_dirs
()
473 def test_client_metrics_and_metadata(self
):
474 self
.mount_a
.umount_wait()
475 self
.mount_b
.umount_wait()
477 self
.mds_cluster
.mon_manager
.raw_cluster_cmd("fs", "flag", "set",
478 "enable_multiple", "true",
479 "--yes-i-really-mean-it")
482 fs_a
= self
._setup
_fs
(fs_name
= "fs1")
484 # Mount a client on fs_a
485 self
.mount_a
.mount_wait(cephfs_name
=fs_a
.name
)
486 self
.mount_a
.write_n_mb("pad.bin", 1)
487 self
.mount_a
.write_n_mb("test.bin", 2)
488 self
.mount_a
.path_to_ino("test.bin")
489 self
.mount_a
.create_files()
491 #creating another filesystem
492 fs_b
= self
._setup
_fs
(fs_name
= "fs2")
494 # Mount a client on fs_b
495 self
.mount_b
.mount_wait(cephfs_name
=fs_b
.name
)
496 self
.mount_b
.write_n_mb("test.bin", 1)
497 self
.mount_b
.path_to_ino("test.bin")
498 self
.mount_b
.create_files()
501 valid
, metrics
= self
._get
_metrics
(
502 self
.verify_mds_metrics(client_count
=TestMDSMetrics
.CLIENTS_REQUIRED
), 30)
503 log
.debug(f
"metrics={metrics}")
504 self
.assertTrue(valid
)
506 client_metadata
= metrics
['client_metadata']
508 for i
in client_metadata
:
509 if not (client_metadata
[i
]['hostname']):
510 raise RuntimeError("hostname not found!")
511 if not (client_metadata
[i
]['valid_metrics']):
512 raise RuntimeError("valid_metrics not found!")