import json
import logging
import os
from textwrap import dedent
import time
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase


log = logging.getLogger(__name__)


class FullnessTestCase(CephFSTestCase):
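    """
    Exercise CephFS behaviour as a cluster or pool approaches and recovers
    from fullness: the MDS's OSD epoch barrier must propagate to clients,
    writes must fail with ENOSPC while full, and deleting files must
    restore a writable system.
    """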
    CLIENTS_REQUIRED = 2

    # Subclasses define whether they're filling the whole cluster or just
    # the data pool
    data_only = False

    # Subclasses define how many bytes should be written to achieve fullness
    pool_capacity = None
    fill_mb = None

    # Subclasses define what fullness means to them
    def is_full(self):
        raise NotImplementedError()

    def setUp(self):
        CephFSTestCase.setUp(self)

        # These tests just use a single active MDS throughout, so remember
        # its ID for use in mds_asok calls
        self.active_mds_id = self.fs.get_active_names()[0]

        # Capture the initial OSD map epoch for later use
        self.initial_osd_epoch = json.loads(
            self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json").strip()
        )['epoch']

        # Check the initial barrier epoch on the MDS: this should be
        # set to the latest map at MDS startup.  We do this check in
        # setUp to get in there before subclasses might touch things
        # in their own setUp functions.
        self.assertGreaterEqual(
            self.fs.mds_asok(["status"], mds_id=self.active_mds_id)['osdmap_epoch_barrier'],
            self.initial_osd_epoch)

    def test_barrier(self):
        """
        That when an OSD epoch barrier is set on an MDS, subsequently
        issued capabilities cause clients to update their OSD map to that
        epoch.
        """

        # Sync up clients with initial MDS OSD map barrier
        self.mount_a.open_no_data("foo")
        self.mount_b.open_no_data("bar")

        # Grab the mounts' initial OSD epochs: later we will check that
        # they haven't advanced beyond this point.
        mount_a_initial_epoch = self.mount_a.get_osd_epoch()[0]
        mount_b_initial_epoch = self.mount_b.get_osd_epoch()[0]

        # Freshly mounted at start of test, should be up to date with OSD map
        self.assertGreaterEqual(mount_a_initial_epoch, self.initial_osd_epoch)
        self.assertGreaterEqual(mount_b_initial_epoch, self.initial_osd_epoch)

        # Set and unset a flag to cause the OSD epoch to increment
        self.fs.mon_manager.raw_cluster_cmd("osd", "set", "pause")
        self.fs.mon_manager.raw_cluster_cmd("osd", "unset", "pause")

        out = self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json").strip()
        new_epoch = json.loads(out)['epoch']
        self.assertNotEqual(self.initial_osd_epoch, new_epoch)

        # Do a metadata operation on the clients, and witness that they
        # still have the old OSD map from startup time (nothing has
        # prompted them to update their maps)
        self.mount_a.open_no_data("alpha")
        self.mount_b.open_no_data("bravo1")

        # Sleep long enough that if the OSD map was propagating it would
        # have done so (this is arbitrary because we are 'waiting' for
        # something to *not* happen).
        time.sleep(30)

        mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
        self.assertEqual(mount_a_epoch, mount_a_initial_epoch)
        mount_b_epoch, mount_b_barrier = self.mount_b.get_osd_epoch()
        self.assertEqual(mount_b_epoch, mount_b_initial_epoch)

        # Set a barrier on the MDS
        self.fs.mds_asok(["osdmap", "barrier", str(new_epoch)],
                         mds_id=self.active_mds_id)

        # Do an operation on client B, and witness that it ends up with
        # the latest OSD map from the barrier.  This shouldn't generate any
        # cap revokes to A because B was already the last one to touch
        # a file in root.
        self.mount_b.run_shell(["touch", "bravo2"])
        self.mount_b.open_no_data("bravo2")

        # Some time passes here because the metadata part of the operation
        # completes immediately, while the resulting OSD map update happens
        # asynchronously (it's an Objecter::_maybe_request_map) as a result
        # of seeing the new epoch barrier.
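        # (The final lambda below acts as a rejection predicate: if either
        # the epoch or the barrier overshoots new_epoch, we fail fast
        # instead of waiting out the timeout.)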
        self.wait_until_equal(
            lambda: self.mount_b.get_osd_epoch(),
            (new_epoch, new_epoch),
            30,
            lambda x: x[0] > new_epoch or x[1] > new_epoch)

        # ...and none of this should have affected the oblivious mount A,
        # because it wasn't doing any data or metadata IO
        mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
        self.assertEqual(mount_a_epoch, mount_a_initial_epoch)

    def _data_pool_name(self):
        data_pool_names = self.fs.get_data_pool_names()
        if len(data_pool_names) > 1:
            raise RuntimeError("This test can't handle multiple data pools")
        else:
            return data_pool_names[0]

    def _test_full(self, easy_case):
        """
        - That a client trying to write data to a file is prevented
          from doing so with an -EFULL result
        - That the client is also prevented from creating new files by the MDS
        - That the client may delete another file to get the system healthy again

        :param easy_case: if true, delete a successfully written file to
                          free up space.  else, delete the file that
                          experienced the failed write.
        """

        osd_mon_report_interval_max = int(
            self.fs.get_config("osd_mon_report_interval_max", service_type='osd'))

        log.info("Writing {0}MB should fill this cluster".format(self.fill_mb))

        # Fill up the cluster.  This dd may or may not fail, as it depends
        # on how soon the cluster recognises its own fullness
        self.mount_a.write_n_mb("large_file_a", self.fill_mb / 2)
        try:
            self.mount_a.write_n_mb("large_file_b", self.fill_mb / 2)
        except CommandFailedError:
            log.info("Writing file B failed (full status happened already)")
            assert self.is_full()
        else:
            log.info("Writing file B succeeded (full status will happen soon)")
            self.wait_until_true(lambda: self.is_full(),
                                 timeout=osd_mon_report_interval_max * 5)

        # Attempting to write more data should give me ENOSPC
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.write_n_mb("large_file_b", 50, seek=self.fill_mb / 2)
        self.assertEqual(ar.exception.exitstatus, 1)  # dd returns 1 on "No space"

        # Wait for the MDS to see the latest OSD map so that it will reliably
        # be applying the policy of rejecting non-deletion metadata operations
        # while in the full state.
        osd_epoch = json.loads(
            self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['epoch']
        self.wait_until_true(
            lambda: self.fs.mds_asok(['status'], mds_id=self.active_mds_id)['osdmap_epoch'] >= osd_epoch,
            timeout=10)
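        # ('osdmap_epoch' above is the map epoch the MDS has actually
        # processed, as opposed to the 'osdmap_epoch_barrier' field
        # checked in setUp.)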

        if not self.data_only:
            with self.assertRaises(CommandFailedError):
                self.mount_a.write_n_mb("small_file_1", 0)

        # Clear out some space
        if easy_case:
            self.mount_a.run_shell(['rm', '-f', 'large_file_a'])
            self.mount_a.run_shell(['rm', '-f', 'large_file_b'])
        else:
            # In the hard case it is the file that filled the system.
            # Before the new #7317 (ENOSPC, epoch barrier) changes, this
            # would fail because the last objects written would be
            # stuck in the client cache as objecter operations.
            self.mount_a.run_shell(['rm', '-f', 'large_file_b'])
            self.mount_a.run_shell(['rm', '-f', 'large_file_a'])

        # Here we are waiting for two things to happen:
        # * The MDS to purge the stray folder and execute object deletions
        # * The OSDs to inform the mon that they are no longer full
        self.wait_until_true(lambda: not self.is_full(),
                             timeout=osd_mon_report_interval_max * 5)

        # Wait for the MDS to see the latest OSD map so that it will reliably
        # be applying the free space policy
        osd_epoch = json.loads(
            self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['epoch']
        self.wait_until_true(
            lambda: self.fs.mds_asok(['status'], mds_id=self.active_mds_id)['osdmap_epoch'] >= osd_epoch,
            timeout=10)

        # Now I should be able to write again
        self.mount_a.write_n_mb("large_file", 50, seek=0)

        # TODO: ensure that the MDS keeps its OSD epoch barrier across a restart
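        # (A hedged sketch of what that restart check might look like,
        # assuming Filesystem.mds_fail_restart() and the
        # 'osdmap_epoch_barrier' admin socket field behave as used above;
        # left commented out because the original test stops here.)
        # barrier = self.fs.mds_asok(
        #     ["status"], mds_id=self.active_mds_id)['osdmap_epoch_barrier']
        # self.fs.mds_fail_restart()
        # self.fs.wait_for_daemons()
        # mds_id = self.fs.get_active_names()[0]
        # self.assertGreaterEqual(
        #     self.fs.mds_asok(["status"], mds_id=mds_id)['osdmap_epoch_barrier'],
        #     barrier)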

    def test_full_different_file(self):
        self._test_full(True)

    def test_full_same_file(self):
        self._test_full(False)

    def _remote_write_test(self, template):
        """
        Run some remote python in a way that's useful for
        testing free space behaviour (see the test_* methods using this)
        """
        file_path = os.path.join(self.mount_a.mountpoint, "full_test_file")

        # Intervals that determine how quickly the full flag trips
        osd_mon_report_interval_max = int(
            self.fs.get_config("osd_mon_report_interval_max", service_type='osd'))
        mon_tick_interval = int(
            self.fs.get_config("mon_tick_interval", service_type="mon"))

        # Sufficient data to cause the RADOS cluster to go 'full'
        log.info("pool capacity {0}, {1}MB should be enough to fill it".format(
            self.pool_capacity, self.fill_mb))

        # Long enough for the RADOS cluster to notice it is full and set the
        # flag on the mons (report interval for the mon to learn PG stats,
        # tick interval for it to update the OSD map, factor of 1.5 for
        # I/O + network latency in committing the OSD map and distributing
        # it to the OSDs)
        full_wait = (osd_mon_report_interval_max + mon_tick_interval) * 1.5
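        # (For example, with osd_mon_report_interval_max lowered to the
        # suggested 5 and Ceph's default mon_tick_interval of 5 seconds,
        # full_wait works out to (5 + 5) * 1.5 = 15 seconds.)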

        # Configs for this test should bring this setting down in order to
        # run reasonably quickly
        if osd_mon_report_interval_max > 10:
            log.warn("This test may run rather slowly unless you decrease "
                     "osd_mon_report_interval_max (5 is a good setting)!")

        self.mount_a.run_python(template.format(
            fill_mb=self.fill_mb,
            file_path=file_path,
            full_wait=full_wait,
            is_fuse=isinstance(self.mount_a, FuseMount)
        ))
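        # (Note: str.format() interpolates is_fuse into the script as the
        # literal text 'True' or 'False', which test_full_fclose's script
        # then branches on with `if {is_fuse}:`.)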

    def test_full_fclose(self):
        # A remote script which opens a file handle, fills up the filesystem,
        # and then checks that ENOSPC errors on buffered writes appear
        # correctly as errors in fsync
        remote_script = dedent("""
            import time
            import datetime
            import subprocess
            import os

            # Write some buffered data through before going full, all should be well
            print "writing some data through which we expect to succeed"
            bytes = 0
            f = os.open("{file_path}", os.O_WRONLY | os.O_CREAT)
            bytes += os.write(f, 'a' * 4096)
            os.fsync(f)
            print "fsync'ed data successfully, will now attempt to fill fs"

            # Okay, now we're going to fill up the filesystem, and then keep
            # writing until we see an error from fsync.  As long as we're doing
            # buffered IO, the error should always only appear from fsync and not
            # from write
            full = False

            for n in range(0, {fill_mb}):
                bytes += os.write(f, 'x' * 1024 * 1024)
                print "wrote bytes via buffered write, may repeat"
            print "done writing bytes"

            # OK, now we should sneak in under the full condition
            # due to the time it takes the OSDs to report to the
            # mons, and get a successful fsync on our full-making data
            os.fsync(f)
            print "successfully fsync'ed prior to getting full state reported"

            # Now wait for the full flag to get set so that our
            # next flush IO will fail
            time.sleep(30)

            # A buffered IO, should succeed
            print "starting buffered write we expect to succeed"
            os.write(f, 'x' * 4096)
            print "wrote, now waiting 30s and then doing a close we expect to fail"

            # Wait long enough for a background flush that should fail
            time.sleep(30)

            if {is_fuse}:
                # ...and check that the failed background flush is reflected in fclose
                try:
                    os.close(f)
                except OSError:
                    print "close() returned an error as expected"
                else:
                    raise RuntimeError("close() failed to raise error")
            else:
                # The kernel cephfs client does not raise errors on fclose
                os.close(f)

            os.unlink("{file_path}")
            """)
        self._remote_write_test(remote_script)

    def test_full_fsync(self):
        """
        That when the full flag is encountered during asynchronous
        flushes, an fwrite() succeeds but the subsequent fsync/fclose()
        returns the ENOSPC error.
        """

        # A remote script which opens a file handle, fills up the filesystem,
        # and then checks that ENOSPC errors on buffered writes appear
        # correctly as errors in fsync
        remote_script = dedent("""
            import time
            import datetime
            import subprocess
            import os

            # Write some buffered data through before going full, all should be well
            print "writing some data through which we expect to succeed"
            bytes = 0
            f = os.open("{file_path}", os.O_WRONLY | os.O_CREAT)
            bytes += os.write(f, 'a' * 4096)
            os.fsync(f)
            print "fsync'ed data successfully, will now attempt to fill fs"

            # Okay, now we're going to fill up the filesystem, and then keep
            # writing until we see an error from fsync.  As long as we're doing
            # buffered IO, the error should always only appear from fsync and not
            # from write
            full = False

            for n in range(0, {fill_mb} + 1):
                try:
                    bytes += os.write(f, 'x' * 1024 * 1024)
                    print "wrote bytes via buffered write, moving on to fsync"
                except OSError as e:
                    print "Unexpected error %s from write() instead of fsync()" % e
                    raise

                try:
                    os.fsync(f)
                    print "fsync'ed successfully"
                except OSError as e:
                    print "Reached fullness after %.2f MB" % (bytes / (1024.0 * 1024.0))
                    full = True
                    break
                else:
                    print "Not full yet after %.2f MB" % (bytes / (1024.0 * 1024.0))

                if n > {fill_mb} * 0.8:
                    # Be cautious in the last region where we expect to hit
                    # the full condition, so that we don't overshoot too
                    # dramatically
                    print "sleeping a bit as we've exceeded 80% of our expected full ratio"
                    time.sleep({full_wait})

            if not full:
                raise RuntimeError("Failed to reach fullness after writing %d bytes" % bytes)

            # close() should not raise an error because we already caught it in
            # fsync.  There shouldn't have been any more writeback errors
            # since then because all IOs got cancelled on the full flag.
            print "calling close"
            os.close(f)
            print "close() did not raise error"

            os.unlink("{file_path}")
            """)

        self._remote_write_test(remote_script)


class TestQuotaFull(FullnessTestCase):
    """
    Test per-pool fullness, which indicates quota limits exceeded
    """
    pool_capacity = 1024 * 1024 * 32   # arbitrary low-ish limit
    fill_mb = pool_capacity / (1024 * 1024)

    # We are only testing quota handling on the data pool, not the metadata
    # pool.
    data_only = True

    def setUp(self):
        super(TestQuotaFull, self).setUp()

        pool_name = self.fs.get_data_pool_name()
        self.fs.mon_manager.raw_cluster_cmd("osd", "pool", "set-quota", pool_name,
                                            "max_bytes", "{0}".format(self.pool_capacity))

    def is_full(self):
        return self.fs.is_full()


class TestClusterFull(FullnessTestCase):
    """
    Test data pool fullness, which indicates that an OSD has become too full
    """
    pool_capacity = None
    REQUIRE_MEMSTORE = True
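    # (REQUIRE_MEMSTORE restricts this test to clusters whose OSDs use the
    # memstore backend: the test really does fill the cluster, which is
    # only quick and safe against memstore's small in-memory capacity.)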
399 | ||
400 | def setUp(self): | |
401 | super(TestClusterFull, self).setUp() | |
402 | ||
403 | if self.pool_capacity is None: | |
404 | # This is a hack to overcome weird fluctuations in the reported | |
405 | # `max_avail` attribute of pools that sometimes occurs in between | |
406 | # tests (reason as yet unclear, but this dodges the issue) | |
407 | TestClusterFull.pool_capacity = self.fs.get_pool_df(self._data_pool_name())['max_avail'] | |
408 | TestClusterFull.fill_mb = int(1.05 * (self.pool_capacity / (1024.0 * 1024.0))) | |
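            # (fill_mb is 105% of the pool's reported max_avail, so that
            # writing it is guaranteed to push the cluster over its full
            # threshold rather than stopping just short.)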
409 | ||
410 | def is_full(self): | |
411 | return self.fs.is_full() | |
412 | ||
413 | # Hide the parent class so that unittest.loader doesn't try to run it. | |
414 | del globals()['FullnessTestCase'] |