[ceph.git] / ceph / qa / tasks / cephfs / test_full.py (15.2.0 Octopus source)
import json
import logging
import os
from textwrap import dedent
import time
try:
    from typing import Optional
except ImportError:
    # make it work for python2
    pass
from teuthology.orchestra.run import CommandFailedError
from tasks.cephfs.fuse_mount import FuseMount
from tasks.cephfs.cephfs_test_case import CephFSTestCase


log = logging.getLogger(__name__)


class FullnessTestCase(CephFSTestCase):
    CLIENTS_REQUIRED = 2

    # Subclasses define whether they're filling whole cluster or just data pool
    data_only = False

    # Subclasses define how many bytes should be written to achieve fullness
    pool_capacity = None  # type: Optional[int]
    fill_mb = None

    # Subclasses define what fullness means to them
    def is_full(self):
        raise NotImplementedError()

    def setUp(self):
        CephFSTestCase.setUp(self)

        mds_status = self.fs.rank_asok(["status"])

        # Capture the initial OSD map epoch for later use
        self.initial_osd_epoch = mds_status['osdmap_epoch_barrier']

    def test_barrier(self):
        """
        That when an OSD epoch barrier is set on an MDS, subsequently
        issued capabilities cause clients to update their OSD map to that
        epoch.
        """

        # Sync up clients with initial MDS OSD map barrier
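        # (open_no_data is, as I read it, a pure metadata operation: it
        # creates/opens the file without writing any objects, so the only
        # side effect is a cap grant carrying the MDS's current epoch barrier)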
        self.mount_a.open_no_data("foo")
        self.mount_b.open_no_data("bar")

        # Grab mounts' initial OSD epochs: later we will check that
        # they haven't advanced beyond this point.
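        # get_osd_epoch() returns an (osd_epoch, osd_epoch_barrier) pair as
        # seen by the client; only the epoch itself is needed here.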
        mount_a_initial_epoch = self.mount_a.get_osd_epoch()[0]
        mount_b_initial_epoch = self.mount_b.get_osd_epoch()[0]

        # Freshly mounted at start of test, should be up to date with OSD map
        self.assertGreaterEqual(mount_a_initial_epoch, self.initial_osd_epoch)
        self.assertGreaterEqual(mount_b_initial_epoch, self.initial_osd_epoch)

        # Set and unset a flag to cause OSD epoch to increment
        self.fs.mon_manager.raw_cluster_cmd("osd", "set", "pause")
        self.fs.mon_manager.raw_cluster_cmd("osd", "unset", "pause")

        out = self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json").strip()
        new_epoch = json.loads(out)['epoch']
        self.assertNotEqual(self.initial_osd_epoch, new_epoch)

        # Do a metadata operation on clients, witness that they end up with
        # the old OSD map from startup time (nothing has prompted client
        # to update its map)
        self.mount_a.open_no_data("alpha")
        self.mount_b.open_no_data("bravo1")

        # Sleep long enough that if the OSD map was propagating it would
        # have done so (this is arbitrary because we are 'waiting' for something
        # to *not* happen).
        time.sleep(30)

        mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
        self.assertEqual(mount_a_epoch, mount_a_initial_epoch)
        mount_b_epoch, mount_b_barrier = self.mount_b.get_osd_epoch()
        self.assertEqual(mount_b_epoch, mount_b_initial_epoch)

        # Set a barrier on the MDS
        self.fs.rank_asok(["osdmap", "barrier", str(new_epoch)])

        # Do an operation on client B, witness that it ends up with
        # the latest OSD map from the barrier.  This shouldn't generate any
        # cap revokes to A because B was already the last one to touch
        # a file in root.
        self.mount_b.run_shell(["touch", "bravo2"])
        self.mount_b.open_no_data("bravo2")

        # Some time passes here because the metadata part of the operation
        # completes immediately, while the resulting OSD map update happens
        # asynchronously (it's an Objecter::_maybe_request_map) as a result
        # of seeing the new epoch barrier.
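        # (the final lambda appears to act as a reject check for
        # wait_until_equal: if the client's epoch or barrier ever overshoots
        # new_epoch the wait is abandoned early rather than timing out)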
        self.wait_until_equal(
            lambda: self.mount_b.get_osd_epoch(),
            (new_epoch, new_epoch),
            30,
            lambda x: x[0] > new_epoch or x[1] > new_epoch)

        # ...and none of this should have affected the oblivious mount a,
        # because it wasn't doing any data or metadata IO
        mount_a_epoch, mount_a_barrier = self.mount_a.get_osd_epoch()
        self.assertEqual(mount_a_epoch, mount_a_initial_epoch)

    def _data_pool_name(self):
        data_pool_names = self.fs.get_data_pool_names()
        if len(data_pool_names) > 1:
            raise RuntimeError("This test can't handle multiple data pools")
        else:
            return data_pool_names[0]

    def _test_full(self, easy_case):
        """
        - That a client trying to write data to a file is prevented
          from doing so with an -EFULL result
        - That they are also prevented from creating new files by the MDS.
        - That they may delete another file to get the system healthy again

        :param easy_case: if true, delete a successfully written file to
                          free up space.  else, delete the file that experienced
                          the failed write.
        """

        osd_mon_report_interval = int(self.fs.get_config("osd_mon_report_interval", service_type='osd'))
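        # (osd_mon_report_interval bounds how quickly the mons learn about
        # fullness, so it is used below to scale the waits for the full flag
        # to be set and later cleared)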

        log.info("Writing {0}MB should fill this cluster".format(self.fill_mb))

        # Fill up the cluster.  This dd may or may not fail, as it depends on
        # how soon the cluster recognises its own fullness
        self.mount_a.write_n_mb("large_file_a", self.fill_mb // 2)
        try:
            self.mount_a.write_n_mb("large_file_b", self.fill_mb // 2)
        except CommandFailedError:
            log.info("Writing file B failed (full status happened already)")
            assert self.is_full()
        else:
            log.info("Writing file B succeeded (full status will happen soon)")
            self.wait_until_true(lambda: self.is_full(),
                                 timeout=osd_mon_report_interval * 5)

        # Attempting to write more data should give me ENOSPC
        with self.assertRaises(CommandFailedError) as ar:
            self.mount_a.write_n_mb("large_file_b", 50, seek=self.fill_mb // 2)
        self.assertEqual(ar.exception.exitstatus, 1)  # dd returns 1 on "No space"

        # Wait for the MDS to see the latest OSD map so that it will reliably
        # be applying the policy of rejecting non-deletion metadata operations
        # while in the full state.
        osd_epoch = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['epoch']
        self.wait_until_true(
            lambda: self.fs.rank_asok(['status'])['osdmap_epoch'] >= osd_epoch,
            timeout=10)

        if not self.data_only:
            with self.assertRaises(CommandFailedError):
                self.mount_a.write_n_mb("small_file_1", 0)

        # Clear out some space
        if easy_case:
            self.mount_a.run_shell(['rm', '-f', 'large_file_a'])
            self.mount_a.run_shell(['rm', '-f', 'large_file_b'])
        else:
            # In the hard case it is the file that filled the system.
            # Before the new #7317 (ENOSPC, epoch barrier) changes, this
            # would fail because the last objects written would be
            # stuck in the client cache as objecter operations.
            self.mount_a.run_shell(['rm', '-f', 'large_file_b'])
            self.mount_a.run_shell(['rm', '-f', 'large_file_a'])

        # Here we are waiting for two things to happen:
        # * The MDS to purge the stray folder and execute object deletions
        # * The OSDs to inform the mon that they are no longer full
        self.wait_until_true(lambda: not self.is_full(),
                             timeout=osd_mon_report_interval * 5)

        # Wait for the MDS to see the latest OSD map so that it will reliably
        # be applying the free space policy
        osd_epoch = json.loads(self.fs.mon_manager.raw_cluster_cmd("osd", "dump", "--format=json-pretty"))['epoch']
        self.wait_until_true(
            lambda: self.fs.rank_asok(['status'])['osdmap_epoch'] >= osd_epoch,
            timeout=10)

        # Now I should be able to write again
        self.mount_a.write_n_mb("large_file", 50, seek=0)

        # Ensure that the MDS keeps its OSD epoch barrier across a restart

    def test_full_different_file(self):
        self._test_full(True)

    def test_full_same_file(self):
        self._test_full(False)

    def _remote_write_test(self, template):
        """
        Run some remote python in a way that's useful for
        testing free space behaviour (see test_* methods using this)
        """
        file_path = os.path.join(self.mount_a.mountpoint, "full_test_file")

        # Enough to trip the full flag
        osd_mon_report_interval = int(self.fs.get_config("osd_mon_report_interval", service_type='osd'))
        mon_tick_interval = int(self.fs.get_config("mon_tick_interval", service_type="mon"))

        # Sufficient data to cause RADOS cluster to go 'full'
        log.info("pool capacity {0}, {1}MB should be enough to fill it".format(self.pool_capacity, self.fill_mb))

        # Long enough for RADOS cluster to notice it is full and set flag on mons
        # (report_interval for mon to learn PG stats, tick interval for it to update OSD map,
        #  factor of 1.5 for I/O + network latency in committing OSD map and distributing it
        #  to the OSDs)
        full_wait = (osd_mon_report_interval + mon_tick_interval) * 1.5
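        # e.g. with osd_mon_report_interval=5 (as suggested below) and
        # mon_tick_interval=5, full_wait = (5 + 5) * 1.5 = 15 seconds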

        # Configs for this test should bring this setting down in order to
        # run reasonably quickly
        if osd_mon_report_interval > 10:
            log.warning("This test may run rather slowly unless you decrease "
                        "osd_mon_report_interval (5 is a good setting)!")

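        # The script is rendered with str.format() here: {fill_mb}, {file_path},
        # {full_wait} and {is_fuse} are substituted, while doubled braces such
        # as {{0}} pass through as literal placeholders for the remote prints.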
        self.mount_a.run_python(template.format(
            fill_mb=self.fill_mb,
            file_path=file_path,
            full_wait=full_wait,
            is_fuse=isinstance(self.mount_a, FuseMount)
        ))

    def test_full_fclose(self):
        # A remote script which opens a file handle, fills up the filesystem, and then
        # checks that ENOSPC errors on buffered writes appear correctly as errors in fsync
        remote_script = dedent("""
            import time
            import datetime
            import subprocess
            import os

            # Write some buffered data through before going full, all should be well
            print("writing some data through which we expect to succeed")
            bytes = 0
            f = os.open("{file_path}", os.O_WRONLY | os.O_CREAT)
            bytes += os.write(f, b'a' * 512 * 1024)
            os.fsync(f)
            print("fsync'ed data successfully, will now attempt to fill fs")

            # Okay, now we're going to fill up the filesystem, and then keep
            # writing until we see an error from fsync.  As long as we're doing
            # buffered IO, the error should always only appear from fsync and not
            # from write
            full = False

            for n in range(0, int({fill_mb} * 0.9)):
                bytes += os.write(f, b'x' * 1024 * 1024)
                print("wrote {{0}} bytes via buffered write, may repeat".format(bytes))
            print("done writing {{0}} bytes".format(bytes))

            # OK, now we should sneak in under the full condition
            # due to the time it takes the OSDs to report to the
            # mons, and get a successful fsync on our full-making data
            os.fsync(f)
            print("successfully fsync'ed prior to getting full state reported")

            # buffered write, add more dirty data to the buffer
            print("starting buffered write")
            try:
                for n in range(0, int({fill_mb} * 0.2)):
                    bytes += os.write(f, b'x' * 1024 * 1024)
                print("sleeping a bit as we've exceeded 90% of our expected full ratio")
                time.sleep({full_wait})
            except OSError:
                pass

            print("wrote, now waiting 30s and then doing a close we expect to fail")

            # Wait long enough for a background flush that should fail
            time.sleep(30)

            if {is_fuse}:
                # ...and check that the failed background flush is reflected in fclose
                try:
                    os.close(f)
                except OSError:
                    print("close() returned an error as expected")
                else:
                    raise RuntimeError("close() failed to raise error")
            else:
                # The kernel cephfs client does not raise errors on fclose
                os.close(f)

            os.unlink("{file_path}")
            """)
        self._remote_write_test(remote_script)

    def test_full_fsync(self):
        """
        That when the full flag is encountered during asynchronous
        flushes, an fwrite() succeeds but the subsequent fsync()/fclose()
        returns the ENOSPC error.
        """

        # A remote script which opens a file handle, fills up the filesystem, and then
        # checks that ENOSPC errors on buffered writes appear correctly as errors in fsync
        remote_script = dedent("""
            import time
            import datetime
            import subprocess
            import os

            # Write some buffered data through before going full, all should be well
            print("writing some data through which we expect to succeed")
            bytes = 0
            f = os.open("{file_path}", os.O_WRONLY | os.O_CREAT)
            bytes += os.write(f, b'a' * 4096)
            os.fsync(f)
            print("fsync'ed data successfully, will now attempt to fill fs")

            # Okay, now we're going to fill up the filesystem, and then keep
            # writing until we see an error from fsync.  As long as we're doing
            # buffered IO, the error should always only appear from fsync and not
            # from write
            full = False

            for n in range(0, int({fill_mb} * 1.1)):
                try:
                    bytes += os.write(f, b'x' * 1024 * 1024)
                    print("wrote bytes via buffered write, moving on to fsync")
                except OSError as e:
                    print("Unexpected error %s from write() instead of fsync()" % e)
                    raise

                try:
                    os.fsync(f)
                    print("fsync'ed successfully")
                except OSError as e:
                    print("Reached fullness after %.2f MB" % (bytes / (1024.0 * 1024.0)))
                    full = True
                    break
                else:
                    print("Not full yet after %.2f MB" % (bytes / (1024.0 * 1024.0)))

                if n > {fill_mb} * 0.9:
                    # Be cautious in the last region where we expect to hit
                    # the full condition, so that we don't overshoot too dramatically
                    print("sleeping a bit as we've exceeded 90% of our expected full ratio")
                    time.sleep({full_wait})

            if not full:
                raise RuntimeError("Failed to reach fullness after writing %d bytes" % bytes)

            # close() should not raise an error because we already caught it in
            # fsync.  There shouldn't have been any more writeback errors
            # since then because all IOs got cancelled on the full flag.
            print("calling close")
            os.close(f)
            print("close() did not raise error")

            os.unlink("{file_path}")
            """)

        self._remote_write_test(remote_script)


class TestQuotaFull(FullnessTestCase):
    """
    Test per-pool fullness, which indicates quota limits exceeded
    """
    pool_capacity = 1024 * 1024 * 32   # arbitrary low-ish limit
    fill_mb = pool_capacity // (1024 * 1024)  # type: ignore
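    # i.e. a 32MiB quota divided by 1MiB works out to fill_mb == 32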

    # We are only testing quota handling on the data pool, not the metadata
    # pool.
    data_only = True

    def setUp(self):
        super(TestQuotaFull, self).setUp()

        pool_name = self.fs.get_data_pool_name()
        self.fs.mon_manager.raw_cluster_cmd("osd", "pool", "set-quota", pool_name,
                                            "max_bytes", "{0}".format(self.pool_capacity))

    def is_full(self):
        return self.fs.is_full()


class TestClusterFull(FullnessTestCase):
    """
    Test cluster-wide fullness, which indicates that an OSD has become too full
    """
    pool_capacity = None
    REQUIRE_MEMSTORE = True

    def setUp(self):
        super(TestClusterFull, self).setUp()

        if self.pool_capacity is None:
            max_avail = self.fs.get_pool_df(self._data_pool_name())['max_avail']
            full_ratio = float(self.fs.get_config("mon_osd_full_ratio", service_type="mon"))
            TestClusterFull.pool_capacity = int(max_avail * full_ratio)
            TestClusterFull.fill_mb = (self.pool_capacity // (1024 * 1024))
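        # Caching these on the class means every test in this class fills
        # against the same figure, measured once from the initial near-empty
        # cluster rather than re-measured after earlier tests have written data.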

    def is_full(self):
        return self.fs.is_full()

# Hide the parent class so that unittest.loader doesn't try to run it.
del globals()['FullnessTestCase']