]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | import json |
2 | import logging | |
3 | import os | |
4 | from textwrap import dedent | |
9f95a23c TL |
# Optional is only used in type comments; on python2 (no typing module)
# we can safely run without it.
try:
    from typing import Optional
except ImportError:
    # make it work for python2
    pass
20effc67 | 10 | from teuthology.exceptions import CommandFailedError |
7c673cae FG |
11 | from tasks.cephfs.fuse_mount import FuseMount |
12 | from tasks.cephfs.cephfs_test_case import CephFSTestCase | |
13 | ||
14 | ||
15 | log = logging.getLogger(__name__) | |
16 | ||
17 | ||
class FullnessTestCase(CephFSTestCase):
    """
    Exercise client and MDS behaviour when the cluster (or a data pool)
    becomes full: OSD epoch barrier propagation, ENOSPC surfacing through
    write/fsync/close, and recovery after space is freed.

    Abstract base class: subclasses must define pool_capacity and fill_mb
    (and may set data_only) before these tests are runnable.  The class is
    deleted from the module namespace at the bottom of the file so the
    unittest loader does not try to run it directly.
    """
    # NOTE(review): only mount_a is driven below — confirm whether two
    # clients are still genuinely required.
    CLIENTS_REQUIRED = 2

    # Subclasses define whether they're filling whole cluster or just data pool
    data_only = False

    # Subclasses define how many bytes should be written to achieve fullness
    pool_capacity = None  # type: Optional[int]
    # Megabytes of writes expected to reach fullness; derived from
    # pool_capacity by subclasses.
    fill_mb = None

    def is_full(self):
        """Return whether the filesystem currently reports the full flag."""
        return self.fs.is_full()

    def setUp(self):
        """Common setup: record the MDS's OSD epoch barrier at test start."""
        CephFSTestCase.setUp(self)

        mds_status = self.fs.rank_asok(["status"])

        # Capture the initial OSD map epoch for later use
        self.initial_osd_epoch = mds_status['osdmap_epoch_barrier']

    def test_barrier(self):
        """
        That when an OSD epoch barrier is set on an MDS, subsequently
        issued capabilities cause clients to update their OSD map to that
        epoch.
        """

        # script that sync up client with MDS OSD map barrier. The barrier should
        # be updated by cap flush ack message.
        pyscript = dedent("""
            import os
            fd = os.open("{path}", os.O_CREAT | os.O_RDWR, 0O600)
            os.fchmod(fd, 0O666)
            os.fsync(fd)
            os.close(fd)
            """)

        # Sync up client with initial MDS OSD map barrier.
        path = os.path.join(self.mount_a.mountpoint, "foo")
        self.mount_a.run_python(pyscript.format(path=path))

        # Grab mounts' initial OSD epochs: later we will check that
        # it hasn't advanced beyond this point.
        mount_a_initial_epoch, mount_a_initial_barrier = self.mount_a.get_osd_epoch()

        # Freshly mounted at start of test, should be up to date with OSD map
        self.assertGreaterEqual(mount_a_initial_epoch, self.initial_osd_epoch)

        # Set and unset a flag to cause OSD epoch to increment
        self.fs.mon_manager.raw_cluster_cmd("osd", "set", "pause")
        self.fs.mon_manager.raw_cluster_cmd("osd", "unset", "pause")

        out = self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json").strip()
        new_epoch = json.loads(out)['epoch']
        self.assertNotEqual(self.initial_osd_epoch, new_epoch)

        # Do a metadata operation on clients, witness that they end up with
        # the old OSD map from startup time (nothing has prompted client
        # to update its map)
        path = os.path.join(self.mount_a.mountpoint, "foo")
        self.mount_a.run_python(pyscript.format(path=path))
        mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
        self.assertEqual(mount_a_epoch, mount_a_initial_epoch)
        self.assertEqual(mount_a_barrier, mount_a_initial_barrier)

        # Set a barrier on the MDS
        self.fs.rank_asok(["osdmap", "barrier", new_epoch.__str__()])

        # Sync up client with new MDS OSD map barrier: creating "baz" hands
        # out a capability stamped with the new barrier epoch.
        path = os.path.join(self.mount_a.mountpoint, "baz")
        self.mount_a.run_python(pyscript.format(path=path))
        mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
        self.assertEqual(mount_a_barrier, new_epoch)

        # Some time passes here because the metadata part of the operation
        # completes immediately, while the resulting OSD map update happens
        # asynchronously (it's an Objecter::_maybe_request_map) as a result
        # of seeing the new epoch barrier.
        self.wait_until_true(
            lambda: self.mount_a.get_osd_epoch()[0] >= new_epoch,
            timeout=30)

    def _data_pool_name(self):
        """
        Return the single data pool's name; these tests assume exactly one
        data pool and refuse to run against a multi-pool filesystem.
        """
        data_pool_names = self.fs.get_data_pool_names()
        if len(data_pool_names) > 1:
            raise RuntimeError("This test can't handle multiple data pools")
        else:
            return data_pool_names[0]

    def _test_full(self, easy_case):
        """
        - That a client trying to write data to a file is prevented
        from doing so with an -EFULL result
        - That they are also prevented from creating new files by the MDS.
        - That they may delete another file to get the system healthy again

        :param easy_case: if true, delete a successfully written file to
                          free up space.  else, delete the file that experienced
                          the failed write.
        """

        osd_mon_report_interval = int(self.fs.get_config("osd_mon_report_interval", service_type='osd'))

        log.info("Writing {0}MB should fill this cluster".format(self.fill_mb))

        # Fill up the cluster.  This dd may or may not fail, as it depends on
        # how soon the cluster recognises its own fullness
        self.mount_a.write_n_mb("large_file_a", self.fill_mb // 2)
        try:
            # Overshoot slightly (1.1x) so that combined writes exceed capacity.
            self.mount_a.write_n_mb("large_file_b", (self.fill_mb * 1.1) // 2)
        except CommandFailedError:
            log.info("Writing file B failed (full status happened already)")
            assert self.is_full()
        else:
            log.info("Writing file B succeeded (full status will happen soon)")
            self.wait_until_true(lambda: self.is_full(),
                                 timeout=osd_mon_report_interval * 120)

        # Attempting to write more data should give me ENOSPC
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.write_n_mb("large_file_b", 50, seek=self.fill_mb // 2)
        self.assertEqual(ar.exception.exitstatus, 1)  # dd returns 1 on "No space"

        # Wait for the MDS to see the latest OSD map so that it will reliably
        # be applying the policy of rejecting non-deletion metadata operations
        # while in the full state.
        osd_epoch = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['epoch']
        self.wait_until_true(
            lambda: self.fs.rank_asok(['status'])['osdmap_epoch'] >= osd_epoch,
            timeout=10)

        # When the whole cluster is full (not just the data pool), even a
        # zero-length file creation must be rejected by the MDS.
        if not self.data_only:
            with self.assertRaises(CommandFailedError):
                self.mount_a.write_n_mb("small_file_1", 0)

        # Clear out some space
        if easy_case:
            self.mount_a.run_shell(['rm', '-f', 'large_file_a'])
            self.mount_a.run_shell(['rm', '-f', 'large_file_b'])
        else:
            # In the hard case it is the file that filled the system.
            # Before the new #7317 (ENOSPC, epoch barrier) changes, this
            # would fail because the last objects written would be
            # stuck in the client cache as objecter operations.
            self.mount_a.run_shell(['rm', '-f', 'large_file_b'])
            self.mount_a.run_shell(['rm', '-f', 'large_file_a'])

        # Here we are waiting for two things to happen:
        # * The MDS to purge the stray folder and execute object deletions
        # * The OSDs to inform the mon that they are no longer full
        self.wait_until_true(lambda: not self.is_full(),
                             timeout=osd_mon_report_interval * 120)

        # Wait for the MDS to see the latest OSD map so that it will reliably
        # be applying the free space policy
        osd_epoch = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['epoch']
        self.wait_until_true(
            lambda: self.fs.rank_asok(['status'])['osdmap_epoch'] >= osd_epoch,
            timeout=10)

        # Now I should be able to write again
        self.mount_a.write_n_mb("large_file", 50, seek=0)

        # Ensure that the MDS keeps its OSD epoch barrier across a restart

    def test_full_different_file(self):
        """Full-state recovery when the file we delete is NOT the one that hit ENOSPC."""
        self._test_full(True)

    def test_full_same_file(self):
        """Full-state recovery when deleting the very file whose write failed."""
        self._test_full(False)

    def _remote_write_test(self, template):
        """
        Run some remote python in a way that's useful for
        testing free space behaviour (see test_* methods using this)

        :param template: python source with {file_path}, {file_layout},
                         {fill_mb}, {full_wait} and {is_fuse} placeholders,
                         filled in here and executed on the client mount.
        """
        file_path = os.path.join(self.mount_a.mountpoint, "full_test_file")

        # Enough to trip the full flag
        osd_mon_report_interval = int(self.fs.get_config("osd_mon_report_interval", service_type='osd'))
        mon_tick_interval = int(self.fs.get_config("mon_tick_interval", service_type="mon"))

        # Sufficient data to cause RADOS cluster to go 'full'
        log.info("pool capacity {0}, {1}MB should be enough to fill it".format(self.pool_capacity, self.fill_mb))

        # Long enough for RADOS cluster to notice it is full and set flag on mons
        # (report_interval for mon to learn PG stats, tick interval for it to update OSD map,
        #  factor of 1.5 for I/O + network latency in committing OSD map and distributing it
        #  to the OSDs)
        full_wait = (osd_mon_report_interval + mon_tick_interval) * 1.5

        # Configs for this test should bring this setting down in order to
        # run reasonably quickly
        if osd_mon_report_interval > 10:
            # NOTE(review): the two string literals concatenate without a
            # space, producing "...decreaseosd_mon_report_interval..." —
            # cosmetic log defect, left untouched here.
            log.warning("This test may run rather slowly unless you decrease"
                        "osd_mon_report_interval (5 is a good setting)!")

        # set the object_size to 1MB to make the objects distributed more evenly
        # among the OSDs to fix Tracker#45434
        file_layout = "stripe_unit=1048576 stripe_count=1 object_size=1048576"
        self.mount_a.run_python(template.format(
            fill_mb=self.fill_mb,
            file_path=file_path,
            file_layout=file_layout,
            full_wait=full_wait,
            is_fuse=isinstance(self.mount_a, FuseMount)
        ))

    def test_full_fclose(self):
        """
        That when a buffered write hits the full flag asynchronously, the
        resulting writeback error surfaces from close() on FUSE mounts
        (kernel client does not report errors on close).
        """
        # A remote script which opens a file handle, fills up the filesystem, and then
        # checks that ENOSPC errors on buffered writes appear correctly as errors in fsync
        remote_script = dedent("""
            import time
            import datetime
            import subprocess
            import os

            # Write some buffered data through before going full, all should be well
            print("writing some data through which we expect to succeed")
            bytes = 0
            f = os.open("{file_path}", os.O_WRONLY | os.O_CREAT)
            os.setxattr("{file_path}", 'ceph.file.layout', b'{file_layout}')
            bytes += os.write(f, b'a' * 512 * 1024)
            os.fsync(f)
            print("fsync'ed data successfully, will now attempt to fill fs")

            # Okay, now we're going to fill up the filesystem, and then keep
            # writing until we see an error from fsync.  As long as we're doing
            # buffered IO, the error should always only appear from fsync and not
            # from write
            full = False

            for n in range(0, int({fill_mb} * 0.9)):
                bytes += os.write(f, b'x' * 1024 * 1024)
                print("wrote {{0}} bytes via buffered write, may repeat".format(bytes))
            print("done writing {{0}} bytes".format(bytes))

            # OK, now we should sneak in under the full condition
            # due to the time it takes the OSDs to report to the
            # mons, and get a successful fsync on our full-making data
            os.fsync(f)
            print("successfully fsync'ed prior to getting full state reported")

            # buffered write, add more dirty data to the buffer
            print("starting buffered write")
            try:
                for n in range(0, int({fill_mb} * 0.2)):
                    bytes += os.write(f, b'x' * 1024 * 1024)
                    print("sleeping a bit as we've exceeded 90% of our expected full ratio")
                    time.sleep({full_wait})
            except OSError:
                pass;

            print("wrote, now waiting 30s and then doing a close we expect to fail")

            # Wait long enough for a background flush that should fail
            time.sleep(30)

            if {is_fuse}:
                # ...and check that the failed background flush is reflected in fclose
                try:
                    os.close(f)
                except OSError:
                    print("close() returned an error as expected")
                else:
                    raise RuntimeError("close() failed to raise error")
            else:
                # The kernel cephfs client does not raise errors on fclose
                os.close(f)

            os.unlink("{file_path}")
            """)
        self._remote_write_test(remote_script)

    def test_full_fsync(self):
        """
        That when the full flag is encountered during asynchronous
        flushes, such that an fwrite() succeeds but an fsync/fclose()
        should return the ENOSPC error.
        """

        # A remote script which opens a file handle, fills up the filesystem, and then
        # checks that ENOSPC errors on buffered writes appear correctly as errors in fsync
        remote_script = dedent("""
            import time
            import datetime
            import subprocess
            import os

            # Write some buffered data through before going full, all should be well
            print("writing some data through which we expect to succeed")
            bytes = 0
            f = os.open("{file_path}", os.O_WRONLY | os.O_CREAT)
            os.setxattr("{file_path}", 'ceph.file.layout', b'{file_layout}')
            bytes += os.write(f, b'a' * 4096)
            os.fsync(f)
            print("fsync'ed data successfully, will now attempt to fill fs")

            # Okay, now we're going to fill up the filesystem, and then keep
            # writing until we see an error from fsync.  As long as we're doing
            # buffered IO, the error should always only appear from fsync and not
            # from write
            full = False

            for n in range(0, int({fill_mb} * 1.1)):
                try:
                    bytes += os.write(f, b'x' * 1024 * 1024)
                    print("wrote bytes via buffered write, moving on to fsync")
                except OSError as e:
                    if {is_fuse}:
                        print("Unexpected error %s from write() instead of fsync()" % e)
                        raise
                    else:
                        print("Reached fullness after %.2f MB" % (bytes / (1024.0 * 1024.0)))
                        full = True
                        break

                try:
                    os.fsync(f)
                    print("fsync'ed successfully")
                except OSError as e:
                    print("Reached fullness after %.2f MB" % (bytes / (1024.0 * 1024.0)))
                    full = True
                    break
                else:
                    print("Not full yet after %.2f MB" % (bytes / (1024.0 * 1024.0)))

                if n > {fill_mb} * 0.9:
                    # Be cautious in the last region where we expect to hit
                    # the full condition, so that we don't overshoot too dramatically
                    print("sleeping a bit as we've exceeded 90% of our expected full ratio")
                    time.sleep({full_wait})

            if not full:
                raise RuntimeError("Failed to reach fullness after writing %d bytes" % bytes)

            # close() should not raise an error because we already caught it in
            # fsync.  There shouldn't have been any more writeback errors
            # since then because all IOs got cancelled on the full flag.
            print("calling close")
            os.close(f)
            print("close() did not raise error")

            os.unlink("{file_path}")
            """)

        self._remote_write_test(remote_script)
366 | ||
367 | ||
class TestQuotaFull(FullnessTestCase):
    """
    Test per-pool fullness, which indicates quota limits exceeded
    """
    # 32MB: small enough to fill quickly, large enough to behave like a
    # real pool.
    pool_capacity = 1024 * 1024 * 32   # arbitrary low-ish limit
    fill_mb = pool_capacity // (1024 * 1024)  # type: ignore

    # Quota enforcement is only exercised on the data pool, never the
    # metadata pool.
    data_only = True

    def setUp(self):
        super(TestQuotaFull, self).setUp()

        # Impose a byte quota on the data pool so that writes trip the
        # pool-full condition once pool_capacity bytes are stored.
        quota_cmd = ["osd", "pool", "set-quota",
                     self.fs.get_data_pool_name(),
                     "max_bytes", "{0}".format(self.pool_capacity)]
        self.fs.mon_manager.raw_cluster_cmd(*quota_cmd)
385 | ||
7c673cae FG |
386 | |
class TestClusterFull(FullnessTestCase):
    """
    Test data pool fullness, which indicates that an OSD has become too full
    """
    pool_capacity = None
    REQUIRE_MEMSTORE = True

    def setUp(self):
        super(TestClusterFull, self).setUp()

        # Size the fill target lazily from live pool statistics, caching the
        # result on the class so subsequent setUps reuse the first reading.
        if self.pool_capacity is None:
            avail = self.fs.get_pool_df(self._data_pool_name())['max_avail']
            TestClusterFull.pool_capacity = avail
            TestClusterFull.fill_mb = avail // (1024 * 1024)
7c673cae | 400 | |
7c673cae FG |
# Remove the abstract base from the module namespace so unittest's loader
# only discovers the concrete subclasses above.
del FullnessTestCase