# ceph/qa/workunits/windows/test_rbd_wnbd.py
# Imported from ceph quincy 17.2.6 (commit 39ae355f).
1import argparse
2import collections
3import functools
4import json
5import logging
6import math
7import os
8import prettytable
9import random
10import subprocess
11import time
12import threading
13import typing
14import uuid
15from concurrent import futures
16
# Root logger; handlers are attached later by setup_logging().
LOG = logging.getLogger()

# Command line arguments, shared by all the test classes defined below.
parser = argparse.ArgumentParser(description='rbd-wnbd tests')
parser.add_argument('--test-name',
                    help='The test to be run.',
                    default="RbdFioTest")
parser.add_argument('--iterations',
                    help='Total number of test iterations',
                    default=1, type=int)
parser.add_argument('--concurrency',
                    help='The number of tests to run in parallel',
                    default=4, type=int)
parser.add_argument('--fio-iterations',
                    help='Total number of benchmark iterations per disk.',
                    default=1, type=int)
parser.add_argument('--fio-workers',
                    help='Total number of fio workers per disk.',
                    default=1, type=int)
parser.add_argument('--fio-depth',
                    help='The number of concurrent asynchronous operations '
                         'executed per disk',
                    default=64, type=int)
parser.add_argument('--fio-verify',
                    help='The mechanism used to validate the written '
                         'data. Examples: crc32c, md5, sha1, null, etc. '
                         'If set to null, the written data will not be '
                         'verified.',
                    default='crc32c')
parser.add_argument('--bs',
                    help='Benchmark block size.',
                    default="2M")
parser.add_argument('--op',
                    help='Benchmark operation. '
                         'Examples: read, randwrite, rw, etc.',
                    default="rw")
parser.add_argument('--image-prefix',
                    help='The image name prefix.',
                    default="cephTest-")
parser.add_argument('--image-size-mb',
                    help='The image size in megabytes.',
                    default=1024, type=int)
parser.add_argument('--map-timeout',
                    help='Image map timeout.',
                    default=60, type=int)
parser.add_argument('--skip-enabling-disk', action='store_true',
                    help='If set, the disk will not be turned online and the '
                         'read-only flag will not be removed. Useful when '
                         'the SAN policy is set to "onlineAll".')
parser.add_argument('--verbose', action='store_true',
                    help='Print info messages.')
parser.add_argument('--debug', action='store_true',
                    help='Print debug messages.')
parser.add_argument('--stop-on-error', action='store_true',
                    help='Stop testing when hitting errors.')
parser.add_argument('--skip-cleanup-on-error', action='store_true',
                    help='Skip cleanup when hitting errors.')
73
74
class CephTestException(Exception):
    """Base class for the exceptions raised by this module.

    Subclasses customize ``msg_fmt``, which gets %-formatted with the
    keyword arguments when no explicit message is passed.
    """

    msg_fmt = "An exception has been encountered."

    def __init__(self, message: str = None, **kwargs):
        self.kwargs = kwargs
        # Fall back to the class message template when no explicit
        # message was provided.
        self.message = message or self.msg_fmt % kwargs
        super().__init__(self.message)
84
85
class CommandFailed(CephTestException):
    """An executed command returned a non-zero exit code."""
    msg_fmt = (
        "Command failed: %(command)s. "
        "Return code: %(returncode)s. "
        "Stdout: %(stdout)s. Stderr: %(stderr)s.")
91
92
class CephTestTimeout(CephTestException):
    """Raised by retry_decorator when an operation keeps failing
    until its timeout expires."""
    msg_fmt = "Operation timeout."
95
96
def setup_logging(log_level: int = logging.INFO):
    """Attach a stderr stream handler to the root logger.

    The handler filters at ``log_level`` while the logger itself stays
    at DEBUG, so additional handlers may use a different level.
    """
    formatter = logging.Formatter(
        '[%(asctime)s] %(levelname)s - %(message)s')

    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    handler.setLevel(log_level)

    LOG.addHandler(handler)
    LOG.setLevel(logging.DEBUG)
107
108
def retry_decorator(timeout: int = 60,
                    retry_interval: int = 2,
                    silent_interval: int = 10,
                    additional_details: str = "",
                    retried_exceptions:
                        typing.Union[
                            typing.Type[Exception],
                            collections.abc.Iterable[
                                typing.Type[Exception]]] = Exception):
    """Retry the decorated function while it raises ``retried_exceptions``.

    :param timeout: total time (seconds) to keep retrying. A falsy value
        (e.g. 0) retries indefinitely.
    :param retry_interval: time (seconds) to sleep between retries.
    :param silent_interval: failures within this interval (seconds) are
        logged at DEBUG level instead of WARNING.
    :param additional_details: extra details included in log messages.
    :param retried_exceptions: exception type (or iterable of types)
        that triggers a retry; other exceptions propagate immediately.
    :raises CephTestTimeout: once ``timeout`` expires without success.
    """
    def wrapper(f: typing.Callable[..., typing.Any]):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            tstart: float = time.time()
            elapsed: float = 0
            exc = None
            details = additional_details or "%s failed" % f.__qualname__

            # A falsy timeout means "retry forever".
            while elapsed < timeout or not timeout:
                try:
                    return f(*args, **kwargs)
                except retried_exceptions as ex:
                    exc = ex
                    elapsed = time.time() - tstart
                    # Stay quiet during the initial grace period to
                    # avoid log noise from expected transient failures.
                    if elapsed > silent_interval:
                        level = logging.WARNING
                    else:
                        level = logging.DEBUG
                    LOG.log(level,
                            "Exception: %s. Additional details: %s. "
                            "Time elapsed: %d. Timeout: %d",
                            ex, details, elapsed, timeout)

                    time.sleep(retry_interval)
                    elapsed = time.time() - tstart

            msg = (
                "Operation timed out. Exception: %s. Additional details: %s. "
                "Time elapsed: %d. Timeout: %d.")
            raise CephTestTimeout(
                msg % (exc, details, elapsed, timeout))
        return inner
    return wrapper
151
152
def execute(*args, **kwargs):
    """Run a command, returning the CompletedProcess on success.

    :raises CommandFailed: when the command exits with a non-zero code.
    """
    LOG.debug("Executing: %s", args)
    proc = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        **kwargs)
    LOG.debug("Command %s returned %d.", args, proc.returncode)
    if not proc.returncode:
        return proc

    error = CommandFailed(
        command=args, returncode=proc.returncode,
        stdout=proc.stdout, stderr=proc.stderr)
    LOG.error(error)
    raise error
168
169
def ps_execute(*args, **kwargs):
    """Run a PowerShell command non-interactively through execute()."""
    # Disable PS progress bar, causes issues when invoked remotely.
    ps_args = (
        "powershell.exe", "-NonInteractive", "-Command",
        "$global:ProgressPreference = 'SilentlyContinue' ; ") + args
    return execute(*ps_args, **kwargs)
176
177
def array_stats(array: list):
    """Return descriptive statistics for a list of numbers.

    An empty input yields all-zero statistics instead of raising.
    The median is the upper-middle element; "max_90"/"min_90" are the
    90th/10th percentile elements of the sorted input.
    """
    count = len(array)
    if not count:
        # Avoid repeating per-key emptiness checks: short-circuit once.
        return {
            'min': 0, 'max': 0, 'sum': 0, 'mean': 0, 'median': 0,
            'max_90': 0, 'min_90': 0, 'variance': 0, 'std_dev': 0,
            'count': 0,
        }

    total = sum(array)
    mean = total / count
    variance = sum((i - mean) ** 2 for i in array) / count
    sorted_array = sorted(array)

    return {
        'min': sorted_array[0],
        'max': sorted_array[-1],
        'sum': total,
        'mean': mean,
        'median': sorted_array[count // 2],
        'max_90': sorted_array[int(count * 0.9)],
        'min_90': sorted_array[int(count * 0.1)],
        'variance': variance,
        'std_dev': math.sqrt(variance),
        'count': count,
    }
197
198
class Tracer:
    """Records per-function call durations and errors.

    Use as a decorator (``@Tracer.trace``); aggregated statistics can be
    retrieved with get_results() or displayed with print_results().
    """

    # Maps function qualified names to lists of call records
    # ({"duration": float, "error": str or None}), preserving the order
    # in which functions were first called.
    data: collections.OrderedDict = collections.OrderedDict()
    lock = threading.Lock()

    @classmethod
    def trace(cls, func):
        # functools.wraps preserves the wrapped function's metadata
        # (__name__, __qualname__, docstring), which the previous
        # version dropped.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            tstart = time.time()
            exc_str = None

            # Preserve call order
            with cls.lock:
                if func.__qualname__ not in cls.data:
                    cls.data[func.__qualname__] = list()

            try:
                return func(*args, **kwargs)
            except Exception as exc:
                exc_str = str(exc)
                raise
            finally:
                tend = time.time()

                with cls.lock:
                    cls.data[func.__qualname__] += [{
                        "duration": tend - tstart,
                        "error": exc_str,
                    }]

        return wrapper

    @classmethod
    def get_results(cls):
        """Return per-function duration statistics plus error lists."""
        stats = collections.OrderedDict()
        for f in cls.data.keys():
            stats[f] = array_stats([i['duration'] for i in cls.data[f]])
            errors = []
            for i in cls.data[f]:
                if i['error']:
                    errors.append(i['error'])

            stats[f]['errors'] = errors
        return stats

    @classmethod
    def print_results(cls):
        """Print a table with the duration stats of all traced calls."""
        r = cls.get_results()

        table = prettytable.PrettyTable(title="Duration (s)")
        table.field_names = [
            "function", "min", "max", "total",
            "mean", "median", "std_dev",
            "max 90%", "min 90%", "count", "errors"]
        table.float_format = ".4"
        for f, s in r.items():
            table.add_row([f, s['min'], s['max'], s['sum'],
                           s['mean'], s['median'], s['std_dev'],
                           s['max_90'], s['min_90'],
                           s['count'], len(s['errors'])])
        print(table)
259
260
class RbdImage(object):
    """Wrapper around an RBD image mapped through rbd-wnbd on Windows.

    Wraps the "rbd" / "rbd-wnbd" CLIs along with the PowerShell commands
    used to manage the resulting Windows disk.
    """

    def __init__(self,
                 name: str,
                 size_mb: int,
                 is_shared: bool = True,
                 disk_number: int = -1,
                 mapped: bool = False):
        self.name = name
        self.size_mb = size_mb
        self.is_shared = is_shared
        # -1 until the image gets mapped and the Windows disk number
        # is retrieved.
        self.disk_number = disk_number
        self.mapped = mapped
        self.removed = False
        # Populated by init_fs() once a partition receives a drive letter.
        self.drive_letter = ""

    @classmethod
    @Tracer.trace
    def create(cls,
               name: str,
               size_mb: int = 1024,
               is_shared: bool = True):
        """Create an RBD image, returning an RbdImage wrapper."""
        LOG.info("Creating image: %s. Size: %s.", name, "%sM" % size_mb)
        cmd = ["rbd", "create", name, "--size", "%sM" % size_mb]
        if is_shared:
            cmd += ["--image-shared"]
        execute(*cmd)

        return RbdImage(name, size_mb, is_shared)

    @Tracer.trace
    def get_disk_number(self,
                        timeout: int = 60,
                        retry_interval: int = 2):
        """Poll rbd-wnbd until it reports a valid disk number."""
        @retry_decorator(
            retried_exceptions=CephTestException,
            timeout=timeout,
            retry_interval=retry_interval)
        def _get_disk_number():
            LOG.info("Retrieving disk number: %s", self.name)

            result = execute("rbd-wnbd", "show", self.name, "--format=json")
            disk_info = json.loads(result.stdout)
            disk_number = disk_info["disk_number"]
            if disk_number > 0:
                LOG.debug("Image %s disk number: %d", self.name, disk_number)
                return disk_number

            raise CephTestException(
                f"Could not get disk number: {self.name}.")

        return _get_disk_number()

    @Tracer.trace
    def _wait_for_disk(self,
                       timeout: int = 60,
                       retry_interval: int = 2):
        """Wait until the mapped disk device can actually be opened."""
        @retry_decorator(
            retried_exceptions=(FileNotFoundError, OSError),
            additional_details="the mapped disk isn't available yet",
            timeout=timeout,
            retry_interval=retry_interval)
        def wait_for_disk():
            LOG.debug("Waiting for disk to be accessible: %s %s",
                      self.name, self.path)

            # Successfully opening the device proves its availability.
            with open(self.path, 'rb'):
                pass

        return wait_for_disk()

    @property
    def path(self):
        # Raw Windows physical disk device path.
        return f"\\\\.\\PhysicalDrive{self.disk_number}"

    @Tracer.trace
    @retry_decorator(additional_details="couldn't clear disk read-only flag")
    def set_writable(self):
        """Clear the Windows disk read-only flag."""
        ps_execute(
            "Set-Disk", "-Number", str(self.disk_number),
            "-IsReadOnly", "$false")

    @Tracer.trace
    @retry_decorator(additional_details="couldn't bring the disk online")
    def set_online(self):
        """Bring the Windows disk online."""
        ps_execute(
            "Set-Disk", "-Number", str(self.disk_number),
            "-IsOffline", "$false")

    @Tracer.trace
    def map(self, timeout: int = 60):
        """Map the image through rbd-wnbd and wait for the disk.

        The timeout covers the whole operation: mapping, retrieving the
        disk number and waiting for the disk to become accessible.
        """
        LOG.info("Mapping image: %s", self.name)
        tstart = time.time()

        execute("rbd-wnbd", "map", self.name)
        self.mapped = True

        self.disk_number = self.get_disk_number(timeout=timeout)

        # Wait for the disk using only the remaining timeout.
        elapsed = time.time() - tstart
        self._wait_for_disk(timeout=timeout - elapsed)

    @Tracer.trace
    def unmap(self):
        """Unmap the image, if currently mapped."""
        if self.mapped:
            LOG.info("Unmapping image: %s", self.name)
            execute("rbd-wnbd", "unmap", self.name)
            self.mapped = False

    @Tracer.trace
    def remove(self):
        """Remove the RBD image, if not already removed."""
        if not self.removed:
            LOG.info("Removing image: %s", self.name)
            execute("rbd", "rm", self.name)
            self.removed = True

    def cleanup(self):
        """Unmap and remove the image.

        The removal is attempted even if the unmap fails.
        """
        try:
            self.unmap()
        finally:
            self.remove()

    @Tracer.trace
    @retry_decorator()
    def _init_disk(self):
        # Initialize the Windows disk (writes a partition table).
        cmd = f"Get-Disk -Number {self.disk_number} | Initialize-Disk"
        ps_execute(cmd)

    @Tracer.trace
    @retry_decorator()
    def _create_partition(self):
        # Create a partition spanning the whole disk, letting Windows
        # assign a drive letter.
        cmd = (f"Get-Disk -Number {self.disk_number} | "
               "New-Partition -AssignDriveLetter -UseMaximumSize")
        ps_execute(cmd)

    @Tracer.trace
    @retry_decorator()
    def _format_volume(self):
        # Format the partition that received a drive letter.
        cmd = (
            f"(Get-Partition -DiskNumber {self.disk_number}"
            " | ? {{ $_.DriveLetter }}) | Format-Volume -Force -Confirm:$false")
        ps_execute(cmd)

    @Tracer.trace
    @retry_decorator()
    def _get_drive_letter(self):
        # Retrieve and validate the partition's drive letter, storing
        # it on the instance.
        cmd = (f"(Get-Partition -DiskNumber {self.disk_number}"
               " | ? {{ $_.DriveLetter }}).DriveLetter")
        result = ps_execute(cmd)

        # The PowerShell command will place a null character if no drive letter
        # is available. For example, we can receive "\x00\r\n".
        self.drive_letter = result.stdout.decode().strip()
        if not self.drive_letter.isalpha() or len(self.drive_letter) != 1:
            raise CephTestException(
                "Invalid drive letter received: %s" % self.drive_letter)

    @Tracer.trace
    def init_fs(self):
        """Initialize the disk, create a partition and format it."""
        if not self.mapped:
            raise CephTestException("Unable to create fs, image not mapped.")

        LOG.info("Initializing fs, image: %s.", self.name)

        self._init_disk()
        self._create_partition()
        self._format_volume()
        self._get_drive_letter()

    @Tracer.trace
    def get_fs_capacity(self):
        """Return the filesystem size (bytes) reported by Windows."""
        if not self.drive_letter:
            raise CephTestException("No drive letter available")

        cmd = f"(Get-Volume -DriveLetter {self.drive_letter}).Size"
        result = ps_execute(cmd)

        return int(result.stdout.decode().strip())

    @Tracer.trace
    def resize(self, new_size_mb, allow_shrink=False):
        """Resize the RBD image (does not touch partitions)."""
        LOG.info(
            "Resizing image: %s. New size: %s MB, old size: %s MB",
            self.name, new_size_mb, self.size_mb)

        cmd = ["rbd", "resize", self.name,
               "--size", f"{new_size_mb}M", "--no-progress"]
        if allow_shrink:
            cmd.append("--allow-shrink")

        execute(*cmd)

        self.size_mb = new_size_mb

    @Tracer.trace
    def get_disk_size(self):
        """Retrieve the virtual disk size (bytes) reported by Windows."""
        cmd = f"(Get-Disk -Number {self.disk_number}).Size"
        result = ps_execute(cmd)

        disk_size = result.stdout.decode().strip()
        if not disk_size.isdigit():
            raise CephTestException(
                "Invalid disk size received: %s" % disk_size)

        return int(disk_size)

    @Tracer.trace
    @retry_decorator(timeout=30)
    def wait_for_disk_resize(self):
        """Poll until Windows reports the updated disk size."""
        # After resizing the rbd image, the daemon is expected to receive
        # the notification, inform the WNBD driver and then trigger a disk
        # rescan (IOCTL_DISK_UPDATE_PROPERTIES). This might take a few seconds,
        # so we'll need to do some polling.
        disk_size = self.get_disk_size()
        disk_size_mb = disk_size // (1 << 20)

        if disk_size_mb != self.size_mb:
            raise CephTestException(
                "The disk size hasn't been updated yet. Retrieved size: "
                f"{disk_size_mb}MB. Expected size: {self.size_mb}MB.")
481
482
class RbdTest(object):
    """Base class for the rbd-wnbd tests.

    Creates and maps an RBD image, optionally making the Windows disk
    writable and online, then cleans everything up afterwards.
    """

    # Only assigned once initialize() successfully creates the image.
    image: RbdImage

    # Subclasses override these when they need the underlying Windows
    # disk to be online and/or writable before running.
    requires_disk_online = False
    requires_disk_write = False

    def __init__(self,
                 image_prefix: str = "cephTest-",
                 image_size_mb: int = 1024,
                 map_timeout: int = 60,
                 **kwargs):
        self.image_size_mb = image_size_mb
        # Random suffix, avoiding collisions between concurrent tests.
        self.image_name = image_prefix + str(uuid.uuid4())
        self.map_timeout = map_timeout
        self.skip_enabling_disk = kwargs.get("skip_enabling_disk")

    @Tracer.trace
    def initialize(self):
        """Create and map the test image, enabling the disk if needed."""
        self.image = RbdImage.create(
            self.image_name,
            self.image_size_mb)
        self.image.map(timeout=self.map_timeout)

        if not self.skip_enabling_disk:
            if self.requires_disk_write:
                self.image.set_writable()

            if self.requires_disk_online:
                self.image.set_online()

    def run(self):
        pass

    def cleanup(self):
        """Remove the test image, if one was created."""
        # "image" is only set once initialize() managed to create it.
        # getattr avoids an AttributeError when cleanup runs after an
        # early initialization failure.
        image = getattr(self, "image", None)
        if image:
            image.cleanup()

    @classmethod
    def print_results(cls,
                      title: str = "Test results",
                      description: str = None):
        pass
525
526
class RbdFsTestMixin(object):
    """Mixin that creates a filesystem on the mapped image.

    Meant to be combined with RbdTest subclasses, providing file based
    access to the mapped image through get_subpath().
    """

    # Windows disks must be turned online before accessing partitions.
    requires_disk_online = True
    requires_disk_write = True

    @Tracer.trace
    def initialize(self):
        super(RbdFsTestMixin, self).initialize()

        self.image.init_fs()

    def get_subpath(self, *args):
        """Return a path relative to the image's filesystem root."""
        drive_path = f"{self.image.drive_letter}:\\"
        return os.path.join(drive_path, *args)
541
542
class RbdFsTest(RbdFsTestMixin, RbdTest):
    """Maps an image and initializes a filesystem on top of it."""
    pass
545
546
class RbdFioTest(RbdTest):
    """Runs fio benchmarks against the mapped image.

    The parsed fio results are aggregated at class level so that
    print_results() can display statistics covering every iteration.
    """

    # Per-operation ("read"/"write") lists of parsed fio job results,
    # shared by all instances of this test.
    data: typing.DefaultDict[str, typing.List[typing.Dict[str, str]]] = (
        collections.defaultdict(list))
    lock = threading.Lock()

    def __init__(self,
                 *args,
                 fio_size_mb: typing.Optional[int] = None,
                 iterations: int = 1,
                 workers: int = 1,
                 bs: str = "2M",
                 iodepth: int = 64,
                 op: str = "rw",
                 verify: str = "crc32c",
                 **kwargs):

        super(RbdFioTest, self).__init__(*args, **kwargs)

        self.fio_size_mb = fio_size_mb or self.image_size_mb
        self.iterations = iterations
        self.workers = workers
        self.bs = bs
        self.iodepth = iodepth
        self.op = op
        # Read-only benchmarks don't need a writable disk.
        if op not in ("read", "randread"):
            self.requires_disk_write = True
        self.verify = verify

    def process_result(self, raw_fio_output: str):
        """Parse raw fio json output, aggregating the results."""
        result = json.loads(raw_fio_output)
        with self.lock:
            for job in result["jobs"]:
                # Fio doesn't support trim on Windows
                for op in ['read', 'write']:
                    if op in job:
                        self.data[op].append({
                            'error': job['error'],
                            'io_bytes': job[op]['io_bytes'],
                            'bw_bytes': job[op]['bw_bytes'],
                            'runtime': job[op]['runtime'] / 1000,  # seconds
                            # Use fio's dedicated counters. The previous
                            # version reused "short_ios" for all three
                            # fields, misreporting totals and drops.
                            'total_ios': job[op]['total_ios'],
                            'short_ios': job[op]['short_ios'],
                            'dropped_ios': job[op]['drop_ios'],
                            'clat_ns_min': job[op]['clat_ns']['min'],
                            'clat_ns_max': job[op]['clat_ns']['max'],
                            'clat_ns_mean': job[op]['clat_ns']['mean'],
                            'clat_ns_stddev': job[op]['clat_ns']['stddev'],
                            'clat_ns_10': job[op].get('clat_ns', {})
                                                 .get('percentile', {})
                                                 .get('10.000000', 0),
                            'clat_ns_90': job[op].get('clat_ns', {})
                                                 .get('percentile', {})
                                                 .get('90.000000', 0)
                        })

    def _get_fio_path(self):
        return self.image.path

    @Tracer.trace
    def _run_fio(self, fio_size_mb=None):
        """Run a single fio benchmark, recording the parsed results."""
        LOG.info("Starting FIO test.")
        cmd = [
            "fio", "--thread", "--output-format=json",
            "--randrepeat=%d" % self.iterations,
            "--direct=1", "--name=test",
            "--bs=%s" % self.bs, "--iodepth=%s" % self.iodepth,
            "--size=%sM" % (fio_size_mb or self.fio_size_mb),
            "--readwrite=%s" % self.op,
            "--numjobs=%s" % self.workers,
            "--filename=%s" % self._get_fio_path(),
        ]
        if self.verify:
            cmd += ["--verify=%s" % self.verify]
        result = execute(*cmd)
        LOG.info("Completed FIO test.")
        self.process_result(result.stdout)

    @Tracer.trace
    def run(self):
        self._run_fio()

    @classmethod
    def print_results(cls,
                      title: str = "Benchmark results",
                      description: str = None):
        """Print one statistics table per benchmark operation."""
        if description:
            title = "%s (%s)" % (title, description)

        for op in cls.data.keys():
            op_title = "%s op=%s" % (title, op)

            table = prettytable.PrettyTable(title=op_title)
            table.field_names = ["stat", "min", "max", "mean",
                                 "median", "std_dev",
                                 "max 90%", "min 90%", "total"]
            table.float_format = ".4"

            op_data = cls.data[op]

            s = array_stats(
                [float(i["bw_bytes"]) / 1_000_000 for i in op_data])
            table.add_row(["bandwidth (MB/s)",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], 'N/A'])

            s = array_stats([float(i["runtime"]) for i in op_data])
            table.add_row(["duration (s)",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], s['sum']])

            s = array_stats([i["error"] for i in op_data])
            table.add_row(["errors",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], s['sum']])

            s = array_stats([i["short_ios"] for i in op_data])
            table.add_row(["incomplete IOs",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], s['sum']])

            s = array_stats([i["dropped_ios"] for i in op_data])
            table.add_row(["dropped IOs",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], s['sum']])

            clat_min = array_stats([i["clat_ns_min"] for i in op_data])
            clat_max = array_stats([i["clat_ns_max"] for i in op_data])
            clat_mean = array_stats([i["clat_ns_mean"] for i in op_data])
            # Pooled standard deviation across jobs.
            clat_stddev = math.sqrt(
                sum([float(i["clat_ns_stddev"]) ** 2 for i in op_data]) / len(op_data)
                if len(op_data) else 0)
            clat_10 = array_stats([i["clat_ns_10"] for i in op_data])
            clat_90 = array_stats([i["clat_ns_90"] for i in op_data])
            # For convenience, we'll convert it from ns to seconds.
            table.add_row(["completion latency (s)",
                           clat_min['min'] / 1e+9,
                           clat_max['max'] / 1e+9,
                           clat_mean['mean'] / 1e+9,
                           clat_mean['median'] / 1e+9,
                           clat_stddev / 1e+9,
                           clat_10['mean'] / 1e+9,
                           clat_90['mean'] / 1e+9,
                           clat_mean['sum'] / 1e+9])
            print(table)
695
696
class RbdResizeFioTest(RbdFioTest):
    """Image resize test.

    This test extends and then shrinks the image, performing FIO tests to
    validate the resized image.
    """

    @Tracer.trace
    def run(self):
        # Double the image size and wait for Windows to see it.
        self.image.resize(self.image_size_mb * 2)
        self.image.wait_for_disk_resize()

        self._run_fio(fio_size_mb=self.image_size_mb * 2)

        # Shrink below the original size and validate again.
        self.image.resize(self.image_size_mb // 2, allow_shrink=True)
        self.image.wait_for_disk_resize()

        self._run_fio(fio_size_mb=self.image_size_mb // 2)

    # Just like rbd-nbd, rbd-wnbd is masking out-of-bounds errors.
    # For this reason, we don't have a negative test that writes
    # passed the disk boundary.
719
720
class RbdFsFioTest(RbdFsTestMixin, RbdFioTest):
    """Runs fio against a file on a filesystem created on the image."""

    def initialize(self):
        super(RbdFsFioTest, self).initialize()

        if not self.fio_size_mb or self.fio_size_mb == self.image_size_mb:
            # Out of caution, we'll use up to 80% of the FS by default
            self.fio_size_mb = int(
                self.image.get_fs_capacity() * 0.8 / (1024 * 1024))

    @staticmethod
    def _fio_escape_path(path):
        # FIO allows specifying multiple files separated by colon.
        # This means that ":" has to be escaped, so
        # F:\filename becomes F\:\filename.
        return path.replace(":", "\\:")

    def _get_fio_path(self):
        return self._fio_escape_path(self.get_subpath("test-fio"))
739
740
class RbdStampTest(RbdTest):
    """Writes a recognizable stamp to the disk and reads it back.

    Ensures the disk being written is the one that was actually mapped,
    which matters when many tests run in parallel.
    """

    requires_disk_write = True

    _write_open_mode = "rb+"
    _read_open_mode = "rb"
    _expect_path_exists = True

    @staticmethod
    def _rand_float(min_val: float, max_val: float):
        """Return a random float in the [min_val, max_val) interval."""
        # Fixed operator precedence: the previous expression,
        # "random.random() * max_val - min_val", only behaved correctly
        # for min_val == 0.
        return min_val + (random.random() * (max_val - min_val))

    def _get_stamp(self):
        # The stamp is the image name, zero padded up to 512 bytes
        # (one sector).
        buff = self.image_name.encode()
        padding = 512 - len(buff)
        buff += b'\0' * padding
        return buff

    def _get_stamp_path(self):
        return self.image.path

    @Tracer.trace
    def _write_stamp(self):
        with open(self._get_stamp_path(), self._write_open_mode) as disk:
            stamp = self._get_stamp()
            disk.write(stamp)

    @Tracer.trace
    def _read_stamp(self):
        with open(self._get_stamp_path(), self._read_open_mode) as disk:
            return disk.read(len(self._get_stamp()))

    @Tracer.trace
    def run(self):
        if self._expect_path_exists:
            # Wait up to 5 seconds and then check the disk, ensuring that
            # nobody else wrote to it. This is particularly useful when
            # running a high number of tests in parallel, ensuring that
            # we aren't writing to the wrong disk.
            time.sleep(self._rand_float(0, 5))

            stamp = self._read_stamp()
            assert stamp == b'\0' * len(self._get_stamp())

        self._write_stamp()

        stamp = self._read_stamp()
        assert stamp == self._get_stamp()
788
789
class RbdFsStampTest(RbdFsTestMixin, RbdStampTest):
    """Stamp test variant using a file on the image's filesystem.

    The stamp file doesn't exist beforehand, hence the "wb" open mode
    and the disabled path existence check.
    """

    _write_open_mode = "wb"
    _expect_path_exists = False

    def _get_stamp_path(self):
        return self.get_subpath("test-stamp")
796
797
class TestRunner(object):
    """Runs a given test class a number of times using a thread pool."""

    def __init__(self,
                 test_cls: typing.Type[RbdTest],
                 test_params: typing.Optional[dict] = None,
                 iterations: int = 1,
                 workers: int = 1,
                 stop_on_error: bool = False,
                 cleanup_on_error: bool = True):
        self.test_cls = test_cls
        # Avoid the mutable default argument pitfall: a shared dict
        # default would leak state between TestRunner instances.
        self.test_params = test_params or {}
        self.iterations = iterations
        self.workers = workers
        self.executor = futures.ThreadPoolExecutor(max_workers=workers)
        self.lock = threading.Lock()
        self.completed = 0
        self.errors = 0
        self.stopped = False
        self.stop_on_error = stop_on_error
        self.cleanup_on_error = cleanup_on_error

    @Tracer.trace
    def run(self):
        """Submit all the test iterations and wait for them to finish."""
        tasks = []
        for i in range(self.iterations):
            task = self.executor.submit(self.run_single_test)
            tasks.append(task)

        LOG.info("Waiting for %d tests to complete.", self.iterations)
        for task in tasks:
            task.result()

    def run_single_test(self):
        """Run one test iteration, recording errors and cleaning up."""
        failed = False
        if self.stopped:
            return

        # "test" stays None if the constructor itself fails, in which
        # case there is nothing to clean up. The previous version hit a
        # NameError in the "finally" block in that situation.
        test = None
        try:
            test = self.test_cls(**self.test_params)
            test.initialize()
            test.run()
        except KeyboardInterrupt:
            LOG.warning("Received Ctrl-C.")
            self.stopped = True
        except Exception as ex:
            failed = True
            if self.stop_on_error:
                self.stopped = True
            with self.lock:
                self.errors += 1
                LOG.exception(
                    "Test exception: %s. Total exceptions: %d",
                    ex, self.errors)
        finally:
            if test and (not failed or self.cleanup_on_error):
                try:
                    test.cleanup()
                except KeyboardInterrupt:
                    LOG.warning("Received Ctrl-C.")
                    self.stopped = True
                    # Retry the cleanup
                    test.cleanup()
                except Exception:
                    LOG.exception("Test cleanup failed.")

            with self.lock:
                self.completed += 1
                LOG.info("Completed tests: %d. Pending: %d",
                         self.completed, self.iterations - self.completed)
866
867
# Maps the values accepted by the "--test-name" argument to the
# corresponding test classes.
TESTS: typing.Dict[str, typing.Type[RbdTest]] = {
    'RbdTest': RbdTest,
    'RbdFioTest': RbdFioTest,
    'RbdResizeFioTest': RbdResizeFioTest,
    'RbdStampTest': RbdStampTest,
    # FS tests
    'RbdFsTest': RbdFsTest,
    'RbdFsFioTest': RbdFsFioTest,
    'RbdFsStampTest': RbdFsStampTest,
}
878
if __name__ == '__main__':
    args = parser.parse_args()

    # Default to WARNING, bumping verbosity with --verbose / --debug.
    log_level = logging.WARNING
    if args.verbose:
        log_level = logging.INFO
    if args.debug:
        log_level = logging.DEBUG
    setup_logging(log_level)

    # Arguments forwarded to the test class constructor; tests ignore
    # the parameters they don't care about.
    test_params = dict(
        image_size_mb=args.image_size_mb,
        image_prefix=args.image_prefix,
        bs=args.bs,
        op=args.op,
        verify=args.fio_verify,
        iodepth=args.fio_depth,
        map_timeout=args.map_timeout,
        skip_enabling_disk=args.skip_enabling_disk,
    )

    try:
        test_cls = TESTS[args.test_name]
    except KeyError:
        # Fixed "Unkown" typo in the user-facing error message.
        raise CephTestException("Unknown test: {}".format(args.test_name))

    runner = TestRunner(
        test_cls,
        test_params=test_params,
        iterations=args.iterations,
        workers=args.concurrency,
        stop_on_error=args.stop_on_error,
        cleanup_on_error=not args.skip_cleanup_on_error)
    runner.run()

    Tracer.print_results()
    test_cls.print_results(
        description="count: %d, concurrency: %d" %
        (args.iterations, args.concurrency))

    assert runner.errors == 0, f"encountered {runner.errors} error(s)."