import json
import time
import logging
from textwrap import dedent
import datetime
import gevent

from teuthology.orchestra.run import CommandFailedError, Raw
from tasks.cephfs.cephfs_test_case import CephFSTestCase, for_teuthology

log = logging.getLogger(__name__)


class TestStrays(CephFSTestCase):
    MDSS_REQUIRED = 2

    OPS_THROTTLE = 1
    FILES_THROTTLE = 2

    # Range of different file sizes used in throttle test's workload
    throttle_workload_size_range = 16

    @for_teuthology
    def test_ops_throttle(self):
        self._test_throttling(self.OPS_THROTTLE)

    @for_teuthology
    def test_files_throttle(self):
        self._test_throttling(self.FILES_THROTTLE)

    def test_dir_deletion(self):
        """
        That when deleting a bunch of dentries and the containing
        directory, everything gets purged.
        Catches cases where the client might e.g. fail to trim
        the unlinked dir from its cache.
        """
        file_count = 1000
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size = {size}
            file_count = {file_count}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_count):
                filename = "{{0}}_{{1}}.bin".format(i, size)
                f = open(os.path.join(mount_path, subdir, filename), 'w')
                f.write(size * 'x')
                f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size=1024,
            file_count=file_count
        ))

        self.mount_a.run_python(create_script)

        # That the dirfrag object is created
        self.fs.mds_asok(["flush", "journal"])
        dir_ino = self.mount_a.path_to_ino("delete_me")
        self.assertTrue(self.fs.dirfrag_exists(dir_ino, 0))

        # Remove everything
        self.mount_a.run_shell(["rm", "-rf", "delete_me"])
        self.fs.mds_asok(["flush", "journal"])

        # That all the removed files get created as strays
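        # (The containing "delete_me" directory inode also becomes a stray
        # when it is removed, hence the expected count of file_count + 1.)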
        strays = self.get_mdc_stat("strays_created")
        self.assertEqual(strays, file_count + 1)

        # That the strays all get enqueued for purge
        self.wait_until_equal(
            lambda: self.get_mdc_stat("strays_enqueued"),
            strays,
            timeout=600
        )

        # That all the purge operations execute
        self.wait_until_equal(
            lambda: self.get_stat("purge_queue", "pq_executed"),
            strays,
            timeout=600
        )

        # That finally, the directory metadata object is gone
        self.assertFalse(self.fs.dirfrag_exists(dir_ino, 0))

        # That finally, the data objects are all gone
        self.await_data_pool_empty()

    def _test_throttling(self, throttle_type):
        self.data_log = []
        try:
            return self._do_test_throttling(throttle_type)
        except:
            for l in self.data_log:
                log.info(",".join([l_.__str__() for l_ in l]))
            raise

    def _do_test_throttling(self, throttle_type):
        """
        That the mds_max_purge_ops setting is respected
        """

        def set_throttles(files, ops):
            """
            Helper for updating ops/files limits, and calculating effective
            ops_per_pg setting to give the same ops limit.
            """
            self.set_conf('mds', 'mds_max_purge_files', "%d" % files)
            self.set_conf('mds', 'mds_max_purge_ops', "%d" % ops)

            pgs = self.fs.mon_manager.get_pool_property(
                self.fs.get_data_pool_name(),
                "pg_num"
            )
            ops_per_pg = float(ops) / pgs
            self.set_conf('mds', 'mds_max_purge_ops_per_pg', "%s" % ops_per_pg)
123 | ||
124 | # Test conditions depend on what we're going to be exercising. | |
125 | # * Lift the threshold on whatever throttle we are *not* testing, so | |
126 | # that the throttle of interest is the one that will be the bottleneck | |
127 | # * Create either many small files (test file count throttling) or fewer | |
128 | # large files (test op throttling) | |
129 | if throttle_type == self.OPS_THROTTLE: | |
130 | set_throttles(files=100000000, ops=16) | |
131 | size_unit = 1024 * 1024 # big files, generate lots of ops | |
132 | file_multiplier = 100 | |
133 | elif throttle_type == self.FILES_THROTTLE: | |
134 | # The default value of file limit is pretty permissive, so to avoid | |
135 | # the test running too fast, create lots of files and set the limit | |
136 | # pretty low. | |
137 | set_throttles(ops=100000000, files=6) | |
138 | size_unit = 1024 # small, numerous files | |
139 | file_multiplier = 200 | |
140 | else: | |
141 | raise NotImplemented(throttle_type) | |
142 | ||
143 | # Pick up config changes | |
144 | self.fs.mds_fail_restart() | |
145 | self.fs.wait_for_daemons() | |
146 | ||
        create_script = dedent("""
            import os

            mount_path = "{mount_path}"
            subdir = "delete_me"
            size_unit = {size_unit}
            file_multiplier = {file_multiplier}
            os.mkdir(os.path.join(mount_path, subdir))
            for i in xrange(0, file_multiplier):
                for size in xrange(0, {size_range}*size_unit, size_unit):
                    filename = "{{0}}_{{1}}.bin".format(i, size / size_unit)
                    f = open(os.path.join(mount_path, subdir, filename), 'w')
                    f.write(size * 'x')
                    f.close()
        """.format(
            mount_path=self.mount_a.mountpoint,
            size_unit=size_unit,
            file_multiplier=file_multiplier,
            size_range=self.throttle_workload_size_range
        ))

        self.mount_a.run_python(create_script)

        # We will run the deletion in the background, to reduce the risk of it completing before
        # we have started monitoring the stray statistics.
        def background():
            self.mount_a.run_shell(["rm", "-rf", "delete_me"])
            self.fs.mds_asok(["flush", "journal"])

        background_thread = gevent.spawn(background)

        total_inodes = file_multiplier * self.throttle_workload_size_range + 1
        mds_max_purge_ops = int(self.fs.get_config("mds_max_purge_ops", 'mds'))
        mds_max_purge_files = int(self.fs.get_config("mds_max_purge_files", 'mds'))

        # During this phase we look for the concurrent ops to exceed half
        # the limit (a heuristic) and not exceed the limit (a correctness
        # condition).
        purge_timeout = 600
        elapsed = 0
        files_high_water = 0
        ops_high_water = 0

        while True:
            stats = self.fs.mds_asok(['perf', 'dump'])
            mdc_stats = stats['mds_cache']
            pq_stats = stats['purge_queue']
            if elapsed >= purge_timeout:
                raise RuntimeError("Timeout waiting for {0} inodes to purge, stats:{1}".format(total_inodes, mdc_stats))

            num_strays = mdc_stats['num_strays']
            num_strays_purging = pq_stats['pq_executing']
            num_purge_ops = pq_stats['pq_executing_ops']

            self.data_log.append([datetime.datetime.now(), num_strays, num_strays_purging, num_purge_ops])

            files_high_water = max(files_high_water, num_strays_purging)
            ops_high_water = max(ops_high_water, num_purge_ops)

            total_strays_created = mdc_stats['strays_created']
            total_strays_purged = pq_stats['pq_executed']

            if total_strays_purged == total_inodes:
                log.info("Complete purge in {0} seconds".format(elapsed))
                break
            elif total_strays_purged > total_inodes:
                raise RuntimeError("Saw more strays than expected, mdc stats: {0}".format(mdc_stats))
            else:
                if throttle_type == self.OPS_THROTTLE:
                    # 11 is filer_max_purge_ops plus one for the backtrace:
                    # limit is allowed to be overshot by this much.
                    if num_purge_ops > mds_max_purge_ops + 11:
                        raise RuntimeError("num_purge_ops violates threshold {0}/{1}".format(
                            num_purge_ops, mds_max_purge_ops
                        ))
                elif throttle_type == self.FILES_THROTTLE:
                    if num_strays_purging > mds_max_purge_files:
                        raise RuntimeError("num_strays_purging violates threshold {0}/{1}".format(
                            num_strays_purging, mds_max_purge_files
                        ))
                else:
                    raise NotImplementedError(throttle_type)

                log.info("Waiting for purge to complete {0}/{1}, {2}/{3}".format(
                    num_strays_purging, num_strays,
                    total_strays_purged, total_strays_created
                ))
                time.sleep(1)
                elapsed += 1

        background_thread.join()

        # Check that we got up to a respectable rate during the purge.  This is totally
        # racy, but should be safeish unless the cluster is pathologically slow, or
        # insanely fast such that the deletions all pass before we have polled the
        # statistics.
        if throttle_type == self.OPS_THROTTLE:
            if ops_high_water < mds_max_purge_ops / 2:
                raise RuntimeError("Ops in flight high water is unexpectedly low ({0} / {1})".format(
                    ops_high_water, mds_max_purge_ops
                ))
        elif throttle_type == self.FILES_THROTTLE:
            if files_high_water < mds_max_purge_files / 2:
                raise RuntimeError("Files in flight high water is unexpectedly low ({0} / {1})".format(
                    files_high_water, mds_max_purge_files
                ))

        # Sanity check all MDC stray stats
        stats = self.fs.mds_asok(['perf', 'dump'])
        mdc_stats = stats['mds_cache']
        pq_stats = stats['purge_queue']
        self.assertEqual(mdc_stats['num_strays'], 0)
        self.assertEqual(mdc_stats['num_strays_delayed'], 0)
        self.assertEqual(pq_stats['pq_executing'], 0)
        self.assertEqual(pq_stats['pq_executing_ops'], 0)
        self.assertEqual(mdc_stats['strays_created'], total_inodes)
        self.assertEqual(mdc_stats['strays_enqueued'], total_inodes)
        self.assertEqual(pq_stats['pq_executed'], total_inodes)

    def get_mdc_stat(self, name, mds_id=None):
        return self.get_stat("mds_cache", name, mds_id)

    def get_stat(self, subsys, name, mds_id=None):
        return self.fs.mds_asok(['perf', 'dump', subsys, name],
                                mds_id=mds_id)[subsys][name]

    def _wait_for_counter(self, subsys, counter, expect_val, timeout=60,
                          mds_id=None):
        self.wait_until_equal(
            lambda: self.get_stat(subsys, counter, mds_id),
            expect_val=expect_val, timeout=timeout,
            reject_fn=lambda x: x > expect_val
        )

    def test_open_inode(self):
        """
        That the case of a dentry unlinked while a client holds an
        inode open is handled correctly.

        The inode should be moved into a stray dentry, while the original
        dentry and directory should be purged.

        The inode's data should be purged when the client eventually closes
        it.
        """
        mount_a_client_id = self.mount_a.get_global_id()

        # Write some bytes to a file
        size_mb = 8

        # Hold the file open
        p = self.mount_a.open_background("open_file")
        self.mount_a.write_n_mb("open_file", size_mb)
        open_file_ino = self.mount_a.path_to_ino("open_file")

        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # Unlink the dentry
        self.mount_a.run_shell(["rm", "-f", "open_file"])

        # Wait to see the stray count increment
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=1, timeout=60, reject_fn=lambda x: x > 1)

        # See that while the stray count has incremented, none have passed
        # on to the purge queue
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that the client still holds 2 caps
        self.assertEqual(self.get_session(mount_a_client_id)['num_caps'], 2)

        # See that the data objects remain in the data pool
        self.assertTrue(self.fs.data_objects_present(open_file_ino, size_mb * 1024 * 1024))

        # Now close the file
        self.mount_a.kill_background(p)

        # Wait to see the client cap count decrement
        self.wait_until_equal(
            lambda: self.get_session(mount_a_client_id)['num_caps'],
            expect_val=1, timeout=60, reject_fn=lambda x: x > 2 or x < 1
        )
        # Wait to see the purge counter increment, stray count go to zero
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0, timeout=6, reject_fn=lambda x: x > 1
        )
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        # See that the data objects no longer exist
        self.assertTrue(self.fs.data_objects_absent(open_file_ino, size_mb * 1024 * 1024))

        self.await_data_pool_empty()

    def test_hardlink_reintegration(self):
        """
        That removal of primary dentry of hardlinked inode results
        in reintegration of inode into the previously-remote dentry,
        rather than lingering as a stray indefinitely.
        """
        # Write some bytes to file_a
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "dir_1"])
        self.mount_a.write_n_mb("dir_1/file_a", size_mb)
        ino = self.mount_a.path_to_ino("dir_1/file_a")

        # Create a hardlink named file_b
        self.mount_a.run_shell(["mkdir", "dir_2"])
        self.mount_a.run_shell(["ln", "dir_1/file_a", "dir_2/file_b"])
        self.assertEqual(self.mount_a.path_to_ino("dir_2/file_b"), ino)

        # Flush journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that backtrace for the file points to the file_a path
        pre_unlink_bt = self.fs.read_backtrace(ino)
        self.assertEqual(pre_unlink_bt['ancestors'][0]['dname'], "file_a")

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as soon
        # as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'])
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        self.mount_a.mount()

        # Unlink file_a
        self.mount_a.run_shell(["rm", "-f", "dir_1/file_a"])

        # See that a stray was created
        self.assertEqual(self.get_mdc_stat("num_strays"), 1)
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)

        # Wait, see that data objects are still present (i.e. that the
        # stray did not advance to purging given time)
        time.sleep(30)
        self.assertTrue(self.fs.data_objects_present(ino, size_mb * 1024 * 1024))
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 0)

        # See that before reintegration, the inode's backtrace points to a stray dir
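        # (Unlinked-but-still-referenced inodes are relinked under the MDS's
        # hidden stray directories, which is why the backtrace path is
        # expected to begin with a "stray" component here.)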
        self.fs.mds_asok(['flush', 'journal'])
        self.assertTrue(self.get_backtrace_path(ino).startswith("stray"))

        last_reintegrated = self.get_mdc_stat("strays_reintegrated")

        # Do a metadata operation on the remaining link (mv is heavy handed, but
        # others like touch may be satisfied from caps without poking MDS)
        self.mount_a.run_shell(["mv", "dir_2/file_b", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the eval_remote call
        # on responding to a client request.
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the remaining link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_c")

        # The MDS should also reintegrate the stray when the unlink finishes
        self.mount_a.run_shell(["ln", "dir_2/file_c", "dir_2/file_d"])
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_c"])

        # Stray reintegration should happen as a result of the notify_stray call
        # on completion of unlink
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        # See the reintegration counter increment
        curr_reintegrated = self.get_mdc_stat("strays_reintegrated")
        self.assertGreater(curr_reintegrated, last_reintegrated)
        last_reintegrated = curr_reintegrated

        # Flush the journal
        self.fs.mds_asok(['flush', 'journal'])

        # See that the backtrace for the file points to the newest link's path
        post_reint_bt = self.fs.read_backtrace(ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "file_d")

        # Now really delete it
        self.mount_a.run_shell(["rm", "-f", "dir_2/file_d"])
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()
        self.assertTrue(self.fs.data_objects_absent(ino, size_mb * 1024 * 1024))

        # We caused the inode to go stray 3 times
        self.assertEqual(self.get_mdc_stat("strays_created"), 3)
        # We purged it at the last
        self.assertEqual(self.get_mdc_stat("strays_enqueued"), 1)

    def test_mv_hardlink_cleanup(self):
        """
        That when doing a rename from A to B, and B has hardlinks,
        then we make a stray for B which is then reintegrated
        into one of its hardlinks.
        """
        # Create file_a, file_b, and a hardlink to file_b
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")

        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.mount_a.run_shell(["ln", "file_b", "linkto_b"])
        self.assertEqual(self.mount_a.path_to_ino("linkto_b"), file_b_ino)

        # mv file_a file_b
        self.mount_a.run_shell(["mv", "file_a", "file_b"])

        # Stray reintegration should happen as a result of the notify_stray call on
        # completion of rename
        self.wait_until_equal(
            lambda: self.get_mdc_stat("num_strays"),
            expect_val=0,
            timeout=60
        )

        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        self.assertGreaterEqual(self.get_mdc_stat("strays_reintegrated"), 1)

        # No data objects should have been deleted, as both files still have linkage.
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))
        self.assertTrue(self.fs.data_objects_present(file_b_ino, size_mb * 1024 * 1024))

        self.fs.mds_asok(['flush', 'journal'])

        post_reint_bt = self.fs.read_backtrace(file_b_ino)
        self.assertEqual(post_reint_bt['ancestors'][0]['dname'], "linkto_b")

    def _setup_two_ranks(self):
        # Set up two MDSs
        self.fs.set_max_mds(2)

        # See that we have two active MDSs
        self.wait_until_equal(lambda: len(self.fs.get_active_names()), 2, 30,
                              reject_fn=lambda v: v > 2 or v < 1)

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]
        log.info("Ranks 0 and 1 are {0} and {1}".format(
            rank_0_id, rank_1_id))

        # Get rid of other MDS daemons so that it's easier to know which
        # daemons to expect in which ranks after restarts
        for unneeded_mds in set(self.mds_cluster.mds_ids) - {rank_0_id, rank_1_id}:
            self.mds_cluster.mds_stop(unneeded_mds)
            self.mds_cluster.mds_fail(unneeded_mds)

        return rank_0_id, rank_1_id

    def _force_migrate(self, to_id, path, watch_ino):
        """
        :param to_id: MDS id to move it to
        :param path: Filesystem path (string) to move
        :param watch_ino: Inode number to look for at destination to confirm move
        :return: None
        """
524 | self.mount_a.run_shell(["setfattr", "-n", "ceph.dir.pin", "-v", "1", path]) | |
525 | ||
526 | # Poll the MDS cache dump to watch for the export completing | |
527 | migrated = False | |
528 | migrate_timeout = 60 | |
529 | migrate_elapsed = 0 | |
530 | while not migrated: | |
531 | data = self.fs.mds_asok(["dump", "cache"], to_id) | |
532 | for inode_data in data: | |
533 | if inode_data['ino'] == watch_ino: | |
534 | log.debug("Found ino in cache: {0}".format(json.dumps(inode_data, indent=2))) | |
535 | if inode_data['is_auth'] is True: | |
536 | migrated = True | |
537 | break | |
538 | ||
539 | if not migrated: | |
540 | if migrate_elapsed > migrate_timeout: | |
541 | raise RuntimeError("Migration hasn't happened after {0}s!".format(migrate_elapsed)) | |
542 | else: | |
543 | migrate_elapsed += 1 | |
544 | time.sleep(1) | |
545 | ||
    def _is_stopped(self, rank):
        mds_map = self.fs.get_mds_map()
        return rank not in [i['rank'] for i in mds_map['info'].values()]

    def test_purge_on_shutdown(self):
        """
        That when an MDS rank is shut down, its purge queue is
        drained in the process.
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

557 | self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0") | |
558 | self.mds_cluster.mds_fail_restart(rank_1_id) | |
559 | self.fs.wait_for_daemons() | |
560 | ||
561 | file_count = 5 | |
562 | ||
563 | self.mount_a.create_n_files("delete_me/file", file_count) | |
564 | ||
565 | self._force_migrate(rank_1_id, "delete_me", | |
566 | self.mount_a.path_to_ino("delete_me/file_0")) | |
567 | ||
568 | self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")]) | |
569 | self.mount_a.umount_wait() | |
570 | ||
571 | # See all the strays go into purge queue | |
572 | self._wait_for_counter("mds_cache", "strays_created", file_count, mds_id=rank_1_id) | |
573 | self._wait_for_counter("mds_cache", "strays_enqueued", file_count, mds_id=rank_1_id) | |
574 | self.assertEqual(self.get_stat("mds_cache", "num_strays", mds_id=rank_1_id), 0) | |
575 | ||
576 | # See nothing get purged from the purge queue (yet) | |
577 | time.sleep(10) | |
578 | self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) | |
579 | ||
580 | # Shut down rank 1 | |
581 | self.fs.set_max_mds(1) | |
582 | self.fs.deactivate(1) | |
583 | ||
584 | # It shouldn't proceed past stopping because its still not allowed | |
585 | # to purge | |
586 | time.sleep(10) | |
587 | self.assertEqual(self.get_stat("purge_queue", "pq_executed", mds_id=rank_1_id), 0) | |
588 | self.assertFalse(self._is_stopped(1)) | |
589 | ||
590 | # Permit the daemon to start purging again | |
591 | self.fs.mon_manager.raw_cluster_cmd('tell', 'mds.{0}'.format(rank_1_id), | |
592 | 'injectargs', | |
593 | "--mds_max_purge_files 100") | |
594 | ||
595 | # It should now proceed through shutdown | |
596 | self.wait_until_true( | |
597 | lambda: self._is_stopped(1), | |
598 | timeout=60 | |
599 | ) | |
600 | ||
601 | # ...and in the process purge all that data | |
602 | self.await_data_pool_empty() | |
603 | ||
    def test_migration_on_shutdown(self):
        """
        That when an MDS rank is shut down, any non-purgeable strays
        get migrated to another rank.
        """

        rank_0_id, rank_1_id = self._setup_two_ranks()

        # Create a non-purgeable stray in a ~mds1 stray directory
        # by doing a hard link and deleting the original file
        self.mount_a.run_shell(["mkdir", "dir_1", "dir_2"])
        self.mount_a.run_shell(["touch", "dir_1/original"])
        self.mount_a.run_shell(["ln", "dir_1/original", "dir_2/linkto"])

        self._force_migrate(rank_1_id, "dir_1",
                            self.mount_a.path_to_ino("dir_1/original"))

        # Empty the MDS cache; otherwise the MDS reintegrates the stray as soon
        # as the unlink finishes
        self.mount_a.umount_wait()
        self.fs.mds_asok(['flush', 'journal'], rank_0_id)
        self.fs.mds_asok(['flush', 'journal'], rank_1_id)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        active_mds_names = self.fs.get_active_names()
        rank_0_id = active_mds_names[0]
        rank_1_id = active_mds_names[1]

        self.mount_a.mount()

        self.mount_a.run_shell(["rm", "-f", "dir_1/original"])
        self.mount_a.umount_wait()

        self._wait_for_counter("mds_cache", "strays_created", 1,
                               mds_id=rank_1_id)

        # Shut down rank 1
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'set', "max_mds", "1")
        self.fs.mon_manager.raw_cluster_cmd_result('mds', 'deactivate', "1")

        # Wait until we get to a single active MDS mdsmap state
        self.wait_until_true(lambda: self._is_stopped(1), timeout=120)

        # See that the stray counter on rank 0 has incremented
        self.assertEqual(self.get_mdc_stat("strays_created", rank_0_id), 1)

    def assert_backtrace(self, ino, expected_path):
        """
        Assert that the backtrace in the data pool for an inode matches
        an expected /foo/bar path.
        """
        expected_elements = expected_path.strip("/").split("/")
        bt = self.fs.read_backtrace(ino)
        actual_elements = list(reversed([dn['dname'] for dn in bt['ancestors']]))
        self.assertListEqual(expected_elements, actual_elements)

    def get_backtrace_path(self, ino):
        bt = self.fs.read_backtrace(ino)
        elements = reversed([dn['dname'] for dn in bt['ancestors']])
        return "/".join(elements)

    def assert_purge_idle(self):
        """
        Assert that the MDS perf counters indicate no strays exist and
        no ongoing purge activity.  Sanity check for when PurgeQueue should
        be idle.
        """
        mdc_stats = self.fs.mds_asok(['perf', 'dump', "mds_cache"])['mds_cache']
        pq_stats = self.fs.mds_asok(['perf', 'dump', "purge_queue"])['purge_queue']
        self.assertEqual(mdc_stats["num_strays"], 0)
        self.assertEqual(mdc_stats["num_strays_delayed"], 0)
        self.assertEqual(pq_stats["pq_executing"], 0)
        self.assertEqual(pq_stats["pq_executing_ops"], 0)

    def test_mv_cleanup(self):
        """
        That when doing a rename from A to B, and B has no hardlinks,
        then we make a stray for B and purge it.
        """
        # Create file_a and file_b, write some to both
        size_mb = 8
        self.mount_a.write_n_mb("file_a", size_mb)
        file_a_ino = self.mount_a.path_to_ino("file_a")
        self.mount_a.write_n_mb("file_b", size_mb)
        file_b_ino = self.mount_a.path_to_ino("file_b")

        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_a")
        self.assert_backtrace(file_b_ino, "file_b")

        # mv file_a file_b
        self.mount_a.run_shell(['mv', 'file_a', 'file_b'])

        # See that stray counter increments
        self.assertEqual(self.get_mdc_stat("strays_created"), 1)
        # Wait for purge counter to increment
        self._wait_for_counter("mds_cache", "strays_enqueued", 1)
        self._wait_for_counter("purge_queue", "pq_executed", 1)

        self.assert_purge_idle()

        # file_b should have been purged
        self.assertTrue(self.fs.data_objects_absent(file_b_ino, size_mb * 1024 * 1024))

        # Backtrace should have updated from file_a to file_b
        self.fs.mds_asok(['flush', 'journal'])
        self.assert_backtrace(file_a_ino, "file_b")

        # file_a's data should still exist
        self.assertTrue(self.fs.data_objects_present(file_a_ino, size_mb * 1024 * 1024))

    def _pool_df(self, pool_name):
        """
        Return a dict like
            {
                "kb_used": 0,
                "bytes_used": 0,
                "max_avail": 19630292406,
                "objects": 0
            }

        :param pool_name: Which pool (must exist)
        """
        out = self.fs.mon_manager.raw_cluster_cmd("df", "--format=json-pretty")
        for p in json.loads(out)['pools']:
            if p['name'] == pool_name:
                return p['stats']

        raise RuntimeError("Pool '{0}' not found".format(pool_name))

    def await_data_pool_empty(self):
        self.wait_until_true(
            lambda: self._pool_df(
                self.fs.get_data_pool_name()
            )['objects'] == 0,
            timeout=60)

    def test_snapshot_remove(self):
        """
        That removal of a snapshot that references a now-unlinked file results
        in purging of the stray for the file.
        """
        # Enable snapshots
        self.fs.mon_manager.raw_cluster_cmd("mds", "set", "allow_new_snaps", "true",
                                            "--yes-i-really-mean-it")

        # Create a dir with a file in it
        size_mb = 8
        self.mount_a.run_shell(["mkdir", "snapdir"])
        self.mount_a.run_shell(["mkdir", "snapdir/subdir"])
        self.mount_a.write_test_pattern("snapdir/subdir/file_a", size_mb * 1024 * 1024)
        file_a_ino = self.mount_a.path_to_ino("snapdir/subdir/file_a")

        # Snapshot the dir
        self.mount_a.run_shell(["mkdir", "snapdir/.snap/snap1"])

        # Cause the head revision to deviate from the snapshot
        self.mount_a.write_n_mb("snapdir/subdir/file_a", size_mb)

        # Flush the journal so that backtraces, dirfrag objects will actually be written
        self.fs.mds_asok(["flush", "journal"])

        # Unlink the file
        self.mount_a.run_shell(["rm", "-f", "snapdir/subdir/file_a"])
        self.mount_a.run_shell(["rmdir", "snapdir/subdir"])

        # Unmount the client because when I come back to check the data is still
        # in the file I don't want to just see what's in the page cache.
        self.mount_a.umount_wait()

774 | self.assertEqual(self.get_mdc_stat("strays_created"), 2) | |
775 | ||
776 | # FIXME: at this stage we see a purge and the stray count drops to | |
777 | # zero, but there's actually still a stray, so at the very | |
778 | # least the StrayManager stats code is slightly off | |
779 | ||
780 | self.mount_a.mount() | |
781 | ||
782 | # See that the data from the snapshotted revision of the file is still present | |
783 | # and correct | |
784 | self.mount_a.validate_test_pattern("snapdir/.snap/snap1/subdir/file_a", size_mb * 1024 * 1024) | |
785 | ||
786 | # Remove the snapshot | |
787 | self.mount_a.run_shell(["rmdir", "snapdir/.snap/snap1"]) | |
788 | ||
789 | # Purging file_a doesn't happen until after we've flushed the journal, because | |
790 | # it is referenced by the snapshotted subdir, and the snapshot isn't really | |
791 | # gone until the journal references to it are gone | |
792 | self.fs.mds_asok(["flush", "journal"]) | |
793 | ||
794 | # Wait for purging to complete, which requires the OSDMap to propagate to the OSDs. | |
795 | # See also: http://tracker.ceph.com/issues/20072 | |
796 | self.wait_until_true( | |
797 | lambda: self.fs.data_objects_absent(file_a_ino, size_mb * 1024 * 1024), | |
798 | timeout=60 | |
799 | ) | |
800 | ||
801 | # See that a purge happens now | |
802 | self._wait_for_counter("mds_cache", "strays_enqueued", 2) | |
803 | self._wait_for_counter("purge_queue", "pq_executed", 2) | |
804 | ||
805 | self.await_data_pool_empty() | |
806 | ||
    def test_fancy_layout(self):
        """
        purge stray file with fancy layout
        """

        file_name = "fancy_layout_file"
        self.mount_a.run_shell(["touch", file_name])

        file_layout = "stripe_unit=1048576 stripe_count=4 object_size=8388608"
        self.mount_a.setfattr(file_name, "ceph.file.layout", file_layout)

        # 35MB requires 7 objects
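        # (With object_size=8MiB and stripe_count=4, the first object set of
        # 4 objects holds 32MiB; the remaining 3MiB stripes 1MiB into each of
        # the next 3 objects, giving 7 objects in total.)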
        size_mb = 35
        self.mount_a.write_n_mb(file_name, size_mb)

        self.mount_a.run_shell(["rm", "-f", file_name])
        self.fs.mds_asok(["flush", "journal"])

        # can't use self.fs.data_objects_absent here, it does not support fancy layout
        self.await_data_pool_empty()

    def test_dirfrag_limit(self):
        """
        That the directory fragment size cannot exceed mds_bal_fragment_size_max (using a limit of 50 in all configurations).

        That fragmentation (forced) will allow more entries to be created.

        That unlinking fails when the stray directory fragment becomes too large and that unlinking may continue once those strays are purged.
        """

        self.fs.set_allow_dirfrags(True)

        LOW_LIMIT = 50
        for mds in self.fs.get_daemon_names():
            self.fs.mds_asok(["config", "set", "mds_bal_fragment_size_max", str(LOW_LIMIT)], mds)

        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir")
                os.mkdir(path)
                for n in range(0, {file_count}):
                    open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT+1
            )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        # Now test that we can go beyond the limit if we fragment the directory

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            os.mkdir(path)
            for n in range(0, {file_count}):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            dfd = os.open(path, os.O_DIRECTORY)
            os.fsync(dfd)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Ensure that subdir2 is fragmented
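        # ("dirfrag split <path> <frag> <bits>" on the admin socket forcibly
        # splits fragment 0/0 of /subdir2 by one bit, so the size limit then
        # applies to each resulting fragment rather than the whole directory.)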
        mds_id = self.fs.get_active_names()[0]
        self.fs.mds_asok(["dirfrag", "split", "/subdir2", "0/0", "1"], mds_id)

        # remount+flush (release client caps)
        self.mount_a.umount_wait()
        self.fs.mds_asok(["flush", "journal"], mds_id)
        self.mount_a.mount()
        self.mount_a.wait_until_mounted()

        # Create 50% more files than the current fragment limit
        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir2")
            for n in range({file_count}, ({file_count}*3)//2):
                open(os.path.join(path, "%s" % n), 'w').write("%s" % n)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

        # Now test the stray directory size is limited and recovers
        strays_before = self.get_mdc_stat("strays_created")
        try:
            self.mount_a.run_python(dedent("""
                import os
                path = os.path.join("{path}", "subdir3")
                os.mkdir(path)
                for n in range({file_count}):
                    fpath = os.path.join(path, "%s" % n)
                    f = open(fpath, 'w')
                    f.write("%s" % n)
                    f.close()
                    os.unlink(fpath)
                """.format(
                path=self.mount_a.mountpoint,
                file_count=LOW_LIMIT*10  # 10 stray directories, should collide before this count
            )))
        except CommandFailedError:
            pass  # ENOSPC
        else:
            raise RuntimeError("fragment size exceeded")

        strays_after = self.get_mdc_stat("strays_created")
        self.assertGreaterEqual(strays_after-strays_before, LOW_LIMIT)

        self._wait_for_counter("mds_cache", "strays_enqueued", strays_after)
        self._wait_for_counter("purge_queue", "pq_executed", strays_after)

        self.mount_a.run_python(dedent("""
            import os
            path = os.path.join("{path}", "subdir4")
            os.mkdir(path)
            for n in range({file_count}):
                fpath = os.path.join(path, "%s" % n)
                f = open(fpath, 'w')
                f.write("%s" % n)
                f.close()
                os.unlink(fpath)
            """.format(
            path=self.mount_a.mountpoint,
            file_count=LOW_LIMIT
        )))

    def test_purge_queue_upgrade(self):
        """
        That when starting on a system with no purge queue in the metadata
        pool, we silently create one.
        :return:
        """

        self.mds_cluster.mds_stop()
        self.mds_cluster.mds_fail()
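        # Object 500.00000000 is the header object of the purge queue journal
        # (inode 0x500) in the metadata pool; removing it while the MDSs are
        # down simulates a filesystem created before the purge queue existed.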
947 | self.fs.rados(["rm", "500.00000000"]) | |
948 | self.mds_cluster.mds_restart() | |
949 | self.fs.wait_for_daemons() | |
950 | ||
    def test_purge_queue_op_rate(self):
        """
        A busy purge queue is meant to aggregate operations sufficiently
        that our RADOS ops to the metadata pool are not O(files).  Check
        that this is the case.
        :return:
        """

        # For low rates of deletion, the rate of metadata ops actually
        # will be O(files), so to see the desired behaviour we have to give
        # the system a significant quantity, i.e. an order of magnitude
        # more than the number of files it will purge at one time.

        max_purge_files = 2

        self.set_conf('mds', 'mds_bal_frag', 'false')
        self.set_conf('mds', 'mds_max_purge_files', "%d" % max_purge_files)
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()

        phase_1_files = 256
        phase_2_files = 512

        self.mount_a.run_shell(["mkdir", "phase1"])
        self.mount_a.create_n_files("phase1/file", phase_1_files)

        self.mount_a.run_shell(["mkdir", "phase2"])
        self.mount_a.create_n_files("phase2/file", phase_2_files)

        def unlink_and_count_ops(path, expected_deletions):
            initial_ops = self.get_stat("objecter", "op")
            initial_pq_executed = self.get_stat("purge_queue", "pq_executed")

            self.mount_a.run_shell(["rm", "-rf", path])

            self._wait_for_counter(
                "purge_queue", "pq_executed", initial_pq_executed + expected_deletions
            )

            final_ops = self.get_stat("objecter", "op")

            # Calculation of the *overhead* operations, i.e. do not include
            # the operations where we actually delete files.
            return final_ops - initial_ops - expected_deletions

        self.fs.mds_asok(['flush', 'journal'])
        phase1_ops = unlink_and_count_ops("phase1/", phase_1_files + 1)

        self.fs.mds_asok(['flush', 'journal'])
        phase2_ops = unlink_and_count_ops("phase2/", phase_2_files + 1)

        log.info("Phase 1: {0}".format(phase1_ops))
        log.info("Phase 2: {0}".format(phase2_ops))

        # The success criterion is that deleting double the number
        # of files doesn't generate double the number of overhead ops
        # -- this comparison is a rough approximation of that rule.
        self.assertTrue(phase2_ops < phase1_ops * 1.25)

        # Finally, check that our activity did include properly quiescing
        # the queue (i.e. call to Journaler::write_head in the right place),
        # by restarting the MDS and checking that it doesn't try re-executing
        # any of the work we did.
        self.fs.mds_asok(['flush', 'journal'])  # flush to ensure no strays
                                                # hanging around
        self.fs.mds_fail_restart()
        self.fs.wait_for_daemons()
        time.sleep(10)
        self.assertEqual(self.get_stat("purge_queue", "pq_executed"), 0)

    def test_replicated_delete_speed(self):
        """
        That deletions of replicated metadata are not pathologically slow
        """
        rank_0_id, rank_1_id = self._setup_two_ranks()

        self.set_conf("mds.{0}".format(rank_1_id), 'mds_max_purge_files', "0")
        self.mds_cluster.mds_fail_restart(rank_1_id)
        self.fs.wait_for_daemons()

        file_count = 10

        self.mount_a.create_n_files("delete_me/file", file_count)

        self._force_migrate(rank_1_id, "delete_me",
                            self.mount_a.path_to_ino("delete_me/file_0"))

        begin = datetime.datetime.now()
        self.mount_a.run_shell(["rm", "-rf", Raw("delete_me/*")])
        end = datetime.datetime.now()

        # What we're really checking here is that we are completing client
        # operations immediately rather than delaying until the next tick.
        tick_period = float(self.fs.get_config("mds_tick_interval",
                                               service_type="mds"))

        duration = (end - begin).total_seconds()
        self.assertLess(duration, (file_count * tick_period) * 0.25)