]>
Commit | Line | Data |
---|---|---|
39ae355f TL |
1 | import argparse |
2 | import collections | |
3 | import functools | |
4 | import json | |
5 | import logging | |
6 | import math | |
7 | import os | |
8 | import prettytable | |
9 | import random | |
10 | import subprocess | |
11 | import time | |
12 | import threading | |
13 | import typing | |
14 | import uuid | |
15 | from concurrent import futures | |
16 | ||
# Module level logger; handlers are attached later by setup_logging().
LOG = logging.getLogger()

# Command line interface. Every option maps onto a TestRunner or test
# class constructor parameter in the __main__ block below.
parser = argparse.ArgumentParser(description='rbd-wnbd tests')
parser.add_argument('--test-name',
                    help='The test to be run.',
                    default="RbdFioTest")
parser.add_argument('--iterations',
                    help='Total number of test iterations',
                    default=1, type=int)
parser.add_argument('--concurrency',
                    help='The number of tests to run in parallel',
                    default=4, type=int)
parser.add_argument('--fio-iterations',
                    help='Total number of benchmark iterations per disk.',
                    default=1, type=int)
parser.add_argument('--fio-workers',
                    help='Total number of fio workers per disk.',
                    default=1, type=int)
parser.add_argument('--fio-depth',
                    help='The number of concurrent asynchronous operations '
                         'executed per disk',
                    default=64, type=int)
parser.add_argument('--fio-verify',
                    help='The mechanism used to validate the written '
                         'data. Examples: crc32c, md5, sha1, null, etc. '
                         'If set to null, the written data will not be '
                         'verified.',
                    default='crc32c')
parser.add_argument('--bs',
                    help='Benchmark block size.',
                    default="2M")
parser.add_argument('--op',
                    help='Benchmark operation. '
                         'Examples: read, randwrite, rw, etc.',
                    default="rw")
parser.add_argument('--image-prefix',
                    help='The image name prefix.',
                    default="cephTest-")
parser.add_argument('--image-size-mb',
                    help='The image size in megabytes.',
                    default=1024, type=int)
parser.add_argument('--map-timeout',
                    help='Image map timeout.',
                    default=60, type=int)
parser.add_argument('--skip-enabling-disk', action='store_true',
                    help='If set, the disk will not be turned online and the '
                         'read-only flag will not be removed. Useful when '
                         'the SAN policy is set to "onlineAll".')
parser.add_argument('--verbose', action='store_true',
                    help='Print info messages.')
parser.add_argument('--debug', action='store_true',
                    help='Print debug messages.')
parser.add_argument('--stop-on-error', action='store_true',
                    help='Stop testing when hitting errors.')
parser.add_argument('--skip-cleanup-on-error', action='store_true',
                    help='Skip cleanup when hitting errors.')
73 | ||
74 | ||
class CephTestException(Exception):
    """Base exception for this test module.

    Subclasses override ``msg_fmt``, a %-style template expanded with the
    keyword arguments passed to the constructor. An explicit ``message``
    bypasses the template.
    """

    msg_fmt = "An exception has been encountered."

    def __init__(self, message: typing.Optional[str] = None, **kwargs):
        # Fix: the parameter was annotated "str = None" (implicit
        # Optional), which is invalid typing; behavior is unchanged.
        # Keep the raw kwargs for callers wanting structured details.
        self.kwargs = kwargs
        if not message:
            message = self.msg_fmt % kwargs
        self.message = message
        super(CephTestException, self).__init__(message)
84 | ||
85 | ||
class CommandFailed(CephTestException):
    # Raised by execute() for non-zero exit codes; expects the
    # "command", "returncode", "stdout" and "stderr" kwargs.
    msg_fmt = (
        "Command failed: %(command)s. "
        "Return code: %(returncode)s. "
        "Stdout: %(stdout)s. Stderr: %(stderr)s.")
91 | ||
92 | ||
class CephTestTimeout(CephTestException):
    # Raised by retry_decorator() once the retry budget is exhausted.
    msg_fmt = "Operation timeout."
95 | ||
96 | ||
def setup_logging(log_level: int = logging.INFO):
    """Attach a stream handler filtering at *log_level* to the module logger."""
    formatter = logging.Formatter(
        '[%(asctime)s] %(levelname)s - %(message)s')

    handler = logging.StreamHandler()
    handler.setLevel(log_level)
    handler.setFormatter(formatter)

    LOG.addHandler(handler)
    # The logger itself stays wide open; filtering happens per handler.
    LOG.setLevel(logging.DEBUG)
107 | ||
108 | ||
def retry_decorator(timeout: int = 60,
                    retry_interval: int = 2,
                    silent_interval: int = 10,
                    additional_details: str = "",
                    retried_exceptions:
                        typing.Union[
                            typing.Type[Exception],
                            collections.abc.Iterable[
                                typing.Type[Exception]]] = Exception):
    """Retry the decorated callable on *retried_exceptions*.

    Retries every *retry_interval* seconds until *timeout* seconds have
    elapsed (a falsy timeout retries forever). Failures within the first
    *silent_interval* seconds are logged at DEBUG, later ones at WARNING.
    Raises CephTestTimeout once the budget is exhausted.
    """
    def wrapper(f: typing.Callable[..., typing.Any]):
        @functools.wraps(f)
        def inner(*args, **kwargs):
            started: float = time.time()
            elapsed: float = 0
            last_exc = None
            details = additional_details or "%s failed" % f.__qualname__

            while not timeout or elapsed < timeout:
                try:
                    return f(*args, **kwargs)
                except retried_exceptions as ex:
                    last_exc = ex
                    elapsed = time.time() - started
                    level = (logging.WARNING if elapsed > silent_interval
                             else logging.DEBUG)
                    LOG.log(level,
                            "Exception: %s. Additional details: %s. "
                            "Time elapsed: %d. Timeout: %d",
                            ex, details, elapsed, timeout)

                    time.sleep(retry_interval)
                    elapsed = time.time() - started

            raise CephTestTimeout(
                "Operation timed out. Exception: %s. Additional details: %s. "
                "Time elapsed: %d. Timeout: %d."
                % (last_exc, details, elapsed, timeout))
        return inner
    return wrapper
151 | ||
152 | ||
def execute(*args, **kwargs):
    """Run *args* as a subprocess, capturing output.

    Returns the CompletedProcess on success; raises CommandFailed (after
    logging it) when the command exits non-zero.
    """
    LOG.debug("Executing: %s", args)
    result = subprocess.run(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        **kwargs)
    LOG.debug("Command %s returned %d.", args, result.returncode)
    if not result.returncode:
        return result

    exc = CommandFailed(
        command=args, returncode=result.returncode,
        stdout=result.stdout, stderr=result.stderr)
    LOG.error(exc)
    raise exc
168 | ||
169 | ||
def ps_execute(*args, **kwargs):
    """Run a PowerShell command through execute()."""
    # Disable PS progress bar, causes issues when invoked remotely.
    return execute(
        "powershell.exe", "-NonInteractive",
        "-Command",
        "$global:ProgressPreference = 'SilentlyContinue' ; ",
        *args, **kwargs)
176 | ||
177 | ||
def array_stats(array: list):
    """Return min/max/sum/mean/median/percentile/variance stats for *array*.

    An empty input yields all-zero statistics rather than raising.
    Improvement over the original: a single guard clause for the empty
    case and one pass each for len()/sum() instead of repeated calls.
    """
    count = len(array)
    if not count:
        # Degenerate case: report zeros instead of dividing by zero.
        return {
            'min': 0, 'max': 0, 'sum': 0, 'mean': 0,
            'median': 0, 'max_90': 0, 'min_90': 0,
            'variance': 0, 'std_dev': 0.0, 'count': 0,
        }

    mean = sum(array) / count
    variance = sum((i - mean) ** 2 for i in array) / count
    std_dev = math.sqrt(variance)
    sorted_array = sorted(array)

    return {
        'min': sorted_array[0],
        'max': sorted_array[-1],
        'sum': sum(array),
        'mean': mean,
        'median': sorted_array[count // 2],
        # Approximate percentiles by index (matches the original).
        'max_90': sorted_array[int(count * 0.9)],
        'min_90': sorted_array[int(count * 0.1)],
        'variance': variance,
        'std_dev': std_dev,
        'count': count
    }
197 | ||
198 | ||
class Tracer:
    """Records per-function call durations and errors for later reporting."""

    # Maps function qualname -> list of {"duration": float, "error": str|None},
    # in first-call order.
    data: collections.OrderedDict = collections.OrderedDict()
    lock = threading.Lock()

    @classmethod
    def trace(cls, func):
        """Decorator: time every call of *func* and record failures."""
        def wrapper(*args, **kwargs):
            started = time.time()
            error = None

            # Register the function up front so the report preserves the
            # order in which functions were first invoked.
            with cls.lock:
                cls.data.setdefault(func.__qualname__, [])

            try:
                return func(*args, **kwargs)
            except Exception as exc:
                error = str(exc)
                raise
            finally:
                finished = time.time()

                with cls.lock:
                    cls.data[func.__qualname__].append({
                        "duration": finished - started,
                        "error": error,
                    })

        return wrapper

    @classmethod
    def get_results(cls):
        """Aggregate the recorded durations per function via array_stats."""
        stats = collections.OrderedDict()
        for name, calls in cls.data.items():
            entry = array_stats([call['duration'] for call in calls])
            entry['errors'] = [call['error'] for call in calls
                               if call['error']]
            stats[name] = entry
        return stats

    @classmethod
    def print_results(cls):
        """Print a PrettyTable summary of all traced functions."""
        table = prettytable.PrettyTable(title="Duration (s)")
        table.field_names = [
            "function", "min", "max", "total",
            "mean", "median", "std_dev",
            "max 90%", "min 90%", "count", "errors"]
        table.float_format = ".4"
        for name, entry in cls.get_results().items():
            table.add_row([name, entry['min'], entry['max'], entry['sum'],
                           entry['mean'], entry['median'], entry['std_dev'],
                           entry['max_90'], entry['min_90'],
                           entry['count'], len(entry['errors'])])
        print(table)
259 | ||
260 | ||
class RbdImage(object):
    """Manage an RBD image exposed as a Windows disk through rbd-wnbd.

    Wraps the "rbd" / "rbd-wnbd" CLIs plus the PowerShell disk cmdlets
    used to bring the mapped disk online, partition, format and resize it.
    """

    def __init__(self,
                 name: str,
                 size_mb: int,
                 is_shared: bool = True,
                 disk_number: int = -1,
                 mapped: bool = False):
        self.name = name
        self.size_mb = size_mb
        self.is_shared = is_shared
        # Windows disk number; -1 until the image gets mapped.
        self.disk_number = disk_number
        self.mapped = mapped
        self.removed = False
        # Populated by _get_drive_letter() after init_fs().
        self.drive_letter = ""

    @classmethod
    @Tracer.trace
    def create(cls,
               name: str,
               size_mb: int = 1024,
               is_shared: bool = True):
        """Create the RBD image and return a wrapping RbdImage object."""
        LOG.info("Creating image: %s. Size: %s.", name, "%sM" % size_mb)
        cmd = ["rbd", "create", name, "--size", "%sM" % size_mb]
        if is_shared:
            cmd += ["--image-shared"]
        execute(*cmd)

        return RbdImage(name, size_mb, is_shared)

    @Tracer.trace
    def get_disk_number(self,
                        timeout: int = 60,
                        retry_interval: int = 2):
        """Poll "rbd-wnbd show" until it reports a valid disk number."""
        @retry_decorator(
            retried_exceptions=CephTestException,
            timeout=timeout,
            retry_interval=retry_interval)
        def _get_disk_number():
            LOG.info("Retrieving disk number: %s", self.name)

            result = execute("rbd-wnbd", "show", self.name, "--format=json")
            disk_info = json.loads(result.stdout)
            disk_number = disk_info["disk_number"]
            if disk_number > 0:
                LOG.debug("Image %s disk number: %d", self.name, disk_number)
                return disk_number

            raise CephTestException(
                f"Could not get disk number: {self.name}.")

        return _get_disk_number()

    @Tracer.trace
    def _wait_for_disk(self,
                       timeout: int = 60,
                       retry_interval: int = 2):
        """Block until the mapped physical drive can actually be opened."""
        @retry_decorator(
            retried_exceptions=(FileNotFoundError, OSError),
            additional_details="the mapped disk isn't available yet",
            timeout=timeout,
            retry_interval=retry_interval)
        def wait_for_disk():
            LOG.debug("Waiting for disk to be accessible: %s %s",
                      self.name, self.path)

            # Opening the raw device is the accessibility check itself.
            with open(self.path, 'rb'):
                pass

        return wait_for_disk()

    @property
    def path(self):
        # Raw device path of the mapped disk, e.g. \\.\PhysicalDrive2.
        return f"\\\\.\\PhysicalDrive{self.disk_number}"

    @Tracer.trace
    @retry_decorator(additional_details="couldn't clear disk read-only flag")
    def set_writable(self):
        """Clear the Windows read-only flag on the mapped disk."""
        ps_execute(
            "Set-Disk", "-Number", str(self.disk_number),
            "-IsReadOnly", "$false")

    @Tracer.trace
    @retry_decorator(additional_details="couldn't bring the disk online")
    def set_online(self):
        """Bring the mapped disk online."""
        ps_execute(
            "Set-Disk", "-Number", str(self.disk_number),
            "-IsOffline", "$false")

    @Tracer.trace
    def map(self, timeout: int = 60):
        """Map the image through rbd-wnbd and wait for the disk to appear.

        The timeout budget is shared between retrieving the disk number
        and waiting for the device path to become accessible.
        """
        LOG.info("Mapping image: %s", self.name)
        tstart = time.time()

        execute("rbd-wnbd", "map", self.name)
        self.mapped = True

        self.disk_number = self.get_disk_number(timeout=timeout)

        elapsed = time.time() - tstart
        # NOTE(review): if get_disk_number consumed the whole budget,
        # "timeout - elapsed" can be <= 0, making _wait_for_disk fail
        # immediately -- confirm that fail-fast behavior is intended.
        self._wait_for_disk(timeout=timeout - elapsed)

    @Tracer.trace
    def unmap(self):
        """Unmap the image if currently mapped (idempotent)."""
        if self.mapped:
            LOG.info("Unmapping image: %s", self.name)
            execute("rbd-wnbd", "unmap", self.name)
            self.mapped = False

    @Tracer.trace
    def remove(self):
        """Remove the RBD image if not already removed (idempotent)."""
        if not self.removed:
            LOG.info("Removing image: %s", self.name)
            execute("rbd", "rm", self.name)
            self.removed = True

    def cleanup(self):
        """Unmap and remove the image; remove runs even if unmap fails."""
        try:
            self.unmap()
        finally:
            self.remove()

    @Tracer.trace
    @retry_decorator()
    def _init_disk(self):
        # Initialize the disk (i.e. write a partition table).
        cmd = f"Get-Disk -Number {self.disk_number} | Initialize-Disk"
        ps_execute(cmd)

    @Tracer.trace
    @retry_decorator()
    def _create_partition(self):
        # One partition spanning the whole disk; Windows assigns a letter.
        cmd = (f"Get-Disk -Number {self.disk_number} | "
               "New-Partition -AssignDriveLetter -UseMaximumSize")
        ps_execute(cmd)

    @Tracer.trace
    @retry_decorator()
    def _format_volume(self):
        # Format the partition that received a drive letter.
        cmd = (
            f"(Get-Partition -DiskNumber {self.disk_number}"
            " | ? { $_.DriveLetter }) | Format-Volume -Force -Confirm:$false")
        ps_execute(cmd)

    @Tracer.trace
    @retry_decorator()
    def _get_drive_letter(self):
        # Fetch and validate the drive letter of the freshly created volume.
        cmd = (f"(Get-Partition -DiskNumber {self.disk_number}"
               " | ? { $_.DriveLetter }).DriveLetter")
        result = ps_execute(cmd)

        # The PowerShell command will place a null character if no drive letter
        # is available. For example, we can receive "\x00\r\n".
        self.drive_letter = result.stdout.decode().strip()
        if not self.drive_letter.isalpha() or len(self.drive_letter) != 1:
            raise CephTestException(
                "Invalid drive letter received: %s" % self.drive_letter)

    @Tracer.trace
    def init_fs(self):
        """Partition and format the mapped disk, recording its drive letter."""
        if not self.mapped:
            raise CephTestException("Unable to create fs, image not mapped.")

        LOG.info("Initializing fs, image: %s.", self.name)

        self._init_disk()
        self._create_partition()
        self._format_volume()
        self._get_drive_letter()

    @Tracer.trace
    def get_fs_capacity(self):
        """Return the filesystem capacity in bytes."""
        if not self.drive_letter:
            raise CephTestException("No drive letter available")

        cmd = f"(Get-Volume -DriveLetter {self.drive_letter}).Size"
        result = ps_execute(cmd)

        return int(result.stdout.decode().strip())

    @Tracer.trace
    def resize(self, new_size_mb, allow_shrink=False):
        """Resize the RBD image to *new_size_mb* megabytes."""
        LOG.info(
            "Resizing image: %s. New size: %s MB, old size: %s MB",
            self.name, new_size_mb, self.size_mb)

        cmd = ["rbd", "resize", self.name,
               "--size", f"{new_size_mb}M", "--no-progress"]
        if allow_shrink:
            cmd.append("--allow-shrink")

        execute(*cmd)

        self.size_mb = new_size_mb

    @Tracer.trace
    def get_disk_size(self):
        """Retrieve the virtual disk size (bytes) reported by Windows."""
        cmd = f"(Get-Disk -Number {self.disk_number}).Size"
        result = ps_execute(cmd)

        disk_size = result.stdout.decode().strip()
        if not disk_size.isdigit():
            raise CephTestException(
                "Invalid disk size received: %s" % disk_size)

        return int(disk_size)

    @Tracer.trace
    @retry_decorator(timeout=30)
    def wait_for_disk_resize(self):
        """Poll until Windows reports a disk size matching self.size_mb."""
        # After resizing the rbd image, the daemon is expected to receive
        # the notification, inform the WNBD driver and then trigger a disk
        # rescan (IOCTL_DISK_UPDATE_PROPERTIES). This might take a few seconds,
        # so we'll need to do some polling.
        disk_size = self.get_disk_size()
        disk_size_mb = disk_size // (1 << 20)

        if disk_size_mb != self.size_mb:
            raise CephTestException(
                "The disk size hasn't been updated yet. Retrieved size: "
                f"{disk_size_mb}MB. Expected size: {self.size_mb}MB.")
481 | ||
482 | ||
class RbdTest(object):
    """Base test: create, map and optionally enable a uniquely named image."""

    # Set by initialize(); annotated only, so it is missing until then.
    image: RbdImage

    requires_disk_online = False
    requires_disk_write = False

    def __init__(self,
                 image_prefix: str = "cephTest-",
                 image_size_mb: int = 1024,
                 map_timeout: int = 60,
                 **kwargs):
        self.image_size_mb = image_size_mb
        # A random suffix keeps concurrent test images from colliding.
        self.image_name = image_prefix + str(uuid.uuid4())
        self.map_timeout = map_timeout
        self.skip_enabling_disk = kwargs.get("skip_enabling_disk")

    @Tracer.trace
    def initialize(self):
        """Create and map the image, then enable the disk as required."""
        self.image = RbdImage.create(
            self.image_name,
            self.image_size_mb)
        self.image.map(timeout=self.map_timeout)

        if self.skip_enabling_disk:
            return

        if self.requires_disk_write:
            self.image.set_writable()
        if self.requires_disk_online:
            self.image.set_online()

    def run(self):
        """Hook: subclasses implement the actual test logic."""

    def cleanup(self):
        """Unmap and remove the test image, if one was created."""
        if self.image:
            self.image.cleanup()

    @classmethod
    def print_results(cls,
                      title: str = "Test results",
                      description: str = None):
        """Hook: subclasses with aggregated results print them here."""
525 | ||
526 | ||
class RbdFsTestMixin(object):
    """Mixin that puts a filesystem on top of the mapped image."""

    # Windows disks must be turned online before accessing partitions.
    requires_disk_online = True
    requires_disk_write = True

    @Tracer.trace
    def initialize(self):
        """Run the base initialization, then create the filesystem."""
        super(RbdFsTestMixin, self).initialize()
        self.image.init_fs()

    def get_subpath(self, *args):
        """Join *args* onto the image's drive root (e.g. "X:\\")."""
        root = f"{self.image.drive_letter}:\\"
        return os.path.join(root, *args)
541 | ||
542 | ||
class RbdFsTest(RbdFsTestMixin, RbdTest):
    """Plain FS test: map the image and initialize a filesystem on it."""
545 | ||
546 | ||
class RbdFioTest(RbdTest):
    """FIO benchmark run against the raw mapped rbd-wnbd disk.

    Results are aggregated at class level so that print_results() can
    summarize an entire multi-instance run.
    """

    # Aggregated fio job results keyed by operation ("read" / "write").
    data: typing.DefaultDict[str, typing.List[typing.Dict[str, str]]] = (
        collections.defaultdict(list))
    # Guards concurrent "data" updates from parallel test instances.
    lock = threading.Lock()

    def __init__(self,
                 *args,
                 fio_size_mb: typing.Optional[int] = None,
                 iterations: int = 1,
                 workers: int = 1,
                 bs: str = "2M",
                 iodepth: int = 64,
                 op: str = "rw",
                 verify: str = "crc32c",
                 **kwargs):

        super(RbdFioTest, self).__init__(*args, **kwargs)

        # Default to benchmarking the entire image.
        self.fio_size_mb = fio_size_mb or self.image_size_mb
        self.iterations = iterations
        self.workers = workers
        self.bs = bs
        self.iodepth = iodepth
        self.op = op
        if op not in ("read", "randread"):
            # Write workloads need the disk read-only flag cleared.
            self.requires_disk_write = True
        self.verify = verify

    def process_result(self, raw_fio_output: str):
        """Parse fio's JSON output and accumulate per-job statistics."""
        result = json.loads(raw_fio_output)
        with self.lock:
            for job in result["jobs"]:
                # Fio doesn't support trim on Windows
                for op in ['read', 'write']:
                    if op in job:
                        self.data[op].append({
                            'error': job['error'],
                            'io_bytes': job[op]['io_bytes'],
                            'bw_bytes': job[op]['bw_bytes'],
                            'runtime': job[op]['runtime'] / 1000,  # seconds
                            # Bug fix: "total_ios" and "dropped_ios" were
                            # both populated from fio's "short_ios" field;
                            # use the dedicated fio JSON fields instead.
                            'total_ios': job[op]['total_ios'],
                            'short_ios': job[op]['short_ios'],
                            'dropped_ios': job[op]['drop_ios'],
                            'clat_ns_min': job[op]['clat_ns']['min'],
                            'clat_ns_max': job[op]['clat_ns']['max'],
                            'clat_ns_mean': job[op]['clat_ns']['mean'],
                            'clat_ns_stddev': job[op]['clat_ns']['stddev'],
                            'clat_ns_10': job[op].get('clat_ns', {})
                                                 .get('percentile', {})
                                                 .get('10.000000', 0),
                            'clat_ns_90': job[op].get('clat_ns', {})
                                                 .get('percentile', {})
                                                 .get('90.000000', 0)
                        })

    def _get_fio_path(self):
        # Raw disk path; filesystem based subclasses override this.
        return self.image.path

    @Tracer.trace
    def _run_fio(self, fio_size_mb=None):
        """Invoke fio against the image and collect its JSON results."""
        LOG.info("Starting FIO test.")
        cmd = [
            "fio", "--thread", "--output-format=json",
            "--randrepeat=%d" % self.iterations,
            "--direct=1", "--name=test",
            "--bs=%s" % self.bs, "--iodepth=%s" % self.iodepth,
            "--size=%sM" % (fio_size_mb or self.fio_size_mb),
            "--readwrite=%s" % self.op,
            "--numjobs=%s" % self.workers,
            "--filename=%s" % self._get_fio_path(),
        ]
        if self.verify:
            cmd += ["--verify=%s" % self.verify]
        result = execute(*cmd)
        LOG.info("Completed FIO test.")
        self.process_result(result.stdout)

    @Tracer.trace
    def run(self):
        self._run_fio()

    @classmethod
    def print_results(cls,
                      title: str = "Benchmark results",
                      description: typing.Optional[str] = None):
        """Print one PrettyTable summary per benchmarked operation."""
        if description:
            title = "%s (%s)" % (title, description)

        for op in cls.data.keys():
            op_title = "%s op=%s" % (title, op)

            table = prettytable.PrettyTable(title=op_title)
            table.field_names = ["stat", "min", "max", "mean",
                                 "median", "std_dev",
                                 "max 90%", "min 90%", "total"]
            table.float_format = ".4"

            op_data = cls.data[op]

            s = array_stats([float(i["bw_bytes"]) / 1_000_000
                             for i in op_data])
            table.add_row(["bandwidth (MB/s)",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], 'N/A'])

            s = array_stats([float(i["runtime"]) for i in op_data])
            table.add_row(["duration (s)",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], s['sum']])

            s = array_stats([i["error"] for i in op_data])
            table.add_row(["errors",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], s['sum']])

            s = array_stats([i["short_ios"] for i in op_data])
            table.add_row(["incomplete IOs",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], s['sum']])

            s = array_stats([i["dropped_ios"] for i in op_data])
            table.add_row(["dropped IOs",
                           s['min'], s['max'], s['mean'],
                           s['median'], s['std_dev'],
                           s['max_90'], s['min_90'], s['sum']])

            clat_min = array_stats([i["clat_ns_min"] for i in op_data])
            clat_max = array_stats([i["clat_ns_max"] for i in op_data])
            clat_mean = array_stats([i["clat_ns_mean"] for i in op_data])
            # Pooled standard deviation across jobs.
            clat_stddev = math.sqrt(
                sum([float(i["clat_ns_stddev"]) ** 2 for i in op_data])
                / len(op_data)
                if len(op_data) else 0)
            clat_10 = array_stats([i["clat_ns_10"] for i in op_data])
            clat_90 = array_stats([i["clat_ns_90"] for i in op_data])
            # For convenience, we'll convert it from ns to seconds.
            table.add_row(["completion latency (s)",
                           clat_min['min'] / 1e+9,
                           clat_max['max'] / 1e+9,
                           clat_mean['mean'] / 1e+9,
                           clat_mean['median'] / 1e+9,
                           clat_stddev / 1e+9,
                           clat_10['mean'] / 1e+9,
                           clat_90['mean'] / 1e+9,
                           clat_mean['sum'] / 1e+9])
            print(table)
695 | ||
696 | ||
class RbdResizeFioTest(RbdFioTest):
    """Image resize test.

    This test extends and then shrinks the image, performing FIO tests to
    validate the resized image.
    """

    @Tracer.trace
    def run(self):
        # Grow the image to double its size and benchmark the full extent.
        self.image.resize(self.image_size_mb * 2)
        self.image.wait_for_disk_resize()

        self._run_fio(fio_size_mb=self.image_size_mb * 2)

        # Shrink below the original size and benchmark again.
        self.image.resize(self.image_size_mb // 2, allow_shrink=True)
        self.image.wait_for_disk_resize()

        self._run_fio(fio_size_mb=self.image_size_mb // 2)

        # Just like rbd-nbd, rbd-wnbd is masking out-of-bounds errors.
        # For this reason, we don't have a negative test that writes
        # passed the disk boundary.
719 | ||
720 | ||
class RbdFsFioTest(RbdFsTestMixin, RbdFioTest):
    """FIO benchmark run against a file on the image's filesystem."""

    def initialize(self):
        super(RbdFsFioTest, self).initialize()

        if self.fio_size_mb and self.fio_size_mb != self.image_size_mb:
            # An explicit fio size was requested, keep it as-is.
            return

        # Out of caution, we'll use up to 80% of the FS by default
        capacity_bytes = self.image.get_fs_capacity()
        self.fio_size_mb = int(capacity_bytes * 0.8 / (1024 * 1024))

    @staticmethod
    def _fio_escape_path(path):
        # FIO allows specifying multiple files separated by colon.
        # This means that ":" has to be escaped, so
        # F:\filename becomes F\:\filename.
        return path.replace(":", "\\:")

    def _get_fio_path(self):
        return self._fio_escape_path(self.get_subpath("test-fio"))
739 | ||
740 | ||
class RbdStampTest(RbdTest):
    """Writes a unique 512 byte stamp to the disk and reads it back.

    The stamp embeds the unique image name, so concurrent tests can
    detect writes landing on the wrong disk.
    """

    requires_disk_write = True

    # Subclasses writing to a fresh file override these.
    _write_open_mode = "rb+"
    _read_open_mode = "rb"
    _expect_path_exists = True

    @staticmethod
    def _rand_float(min_val: float, max_val: float):
        """Return a uniform random float in [min_val, max_val).

        Bug fix: the original expression
        "min_val + (random.random() * max_val - min_val)" mis-placed the
        parentheses and collapsed to "random.random() * max_val"; scale
        by the interval width instead. (Unchanged for min_val == 0.)
        """
        return min_val + (random.random() * (max_val - min_val))

    def _get_stamp(self):
        # The unique image name, zero padded to a full 512 byte sector.
        buff = self.image_name.encode()
        padding = 512 - len(buff)
        buff += b'\0' * padding
        return buff

    def _get_stamp_path(self):
        # Raw disk path; filesystem based subclasses override this.
        return self.image.path

    @Tracer.trace
    def _write_stamp(self):
        with open(self._get_stamp_path(), self._write_open_mode) as disk:
            stamp = self._get_stamp()
            disk.write(stamp)

    @Tracer.trace
    def _read_stamp(self):
        with open(self._get_stamp_path(), self._read_open_mode) as disk:
            return disk.read(len(self._get_stamp()))

    @Tracer.trace
    def run(self):
        if self._expect_path_exists:
            # Wait up to 5 seconds and then check the disk, ensuring that
            # nobody else wrote to it. This is particularly useful when
            # running a high number of tests in parallel, ensuring that
            # we aren't writing to the wrong disk.
            time.sleep(self._rand_float(0, 5))

            stamp = self._read_stamp()
            assert stamp == b'\0' * len(self._get_stamp())

        self._write_stamp()

        stamp = self._read_stamp()
        assert stamp == self._get_stamp()
788 | ||
789 | ||
class RbdFsStampTest(RbdFsTestMixin, RbdStampTest):
    """Stamp test variant writing to a file on the image's filesystem."""

    # The stamp file is created from scratch, so there is nothing to
    # pre-check before writing.
    _write_open_mode = "wb"
    _expect_path_exists = False

    def _get_stamp_path(self):
        return self.get_subpath("test-stamp")
796 | ||
797 | ||
class TestRunner(object):
    """Runs a test class *iterations* times across a thread pool.

    Bug fixes over the original:
    * "test_params" used a mutable default argument ({}).
    * the local "test" could be referenced while unbound in the cleanup
      path when the test constructor raised, masking the real error with
      an UnboundLocalError.
    """

    def __init__(self,
                 test_cls: typing.Type[RbdTest],
                 test_params: typing.Optional[dict] = None,
                 iterations: int = 1,
                 workers: int = 1,
                 stop_on_error: bool = False,
                 cleanup_on_error: bool = True):
        self.test_cls = test_cls
        # Avoid sharing a mutable default between instances.
        self.test_params = test_params or {}
        self.iterations = iterations
        self.workers = workers
        self.executor = futures.ThreadPoolExecutor(max_workers=workers)
        self.lock = threading.Lock()
        self.completed = 0
        self.errors = 0
        self.stopped = False
        self.stop_on_error = stop_on_error
        self.cleanup_on_error = cleanup_on_error

    @Tracer.trace
    def run(self):
        """Submit all iterations and block until they finish."""
        tasks = []
        for i in range(self.iterations):
            task = self.executor.submit(self.run_single_test)
            tasks.append(task)

        LOG.info("Waiting for %d tests to complete.", self.iterations)
        for task in tasks:
            task.result()

    def run_single_test(self):
        """Run one test instance, tracking errors and cleaning up."""
        failed = False
        if self.stopped:
            return

        test = None
        try:
            test = self.test_cls(**self.test_params)
            test.initialize()
            test.run()
        except KeyboardInterrupt:
            LOG.warning("Received Ctrl-C.")
            self.stopped = True
        except Exception as ex:
            failed = True
            if self.stop_on_error:
                self.stopped = True
            with self.lock:
                self.errors += 1
                LOG.exception(
                    "Test exception: %s. Total exceptions: %d",
                    ex, self.errors)
        finally:
            # Only clean up a test that was actually constructed.
            if test and (not failed or self.cleanup_on_error):
                try:
                    test.cleanup()
                except KeyboardInterrupt:
                    LOG.warning("Received Ctrl-C.")
                    self.stopped = True
                    # Retry the cleanup
                    test.cleanup()
                except Exception:
                    LOG.exception("Test cleanup failed.")

            with self.lock:
                self.completed += 1
                LOG.info("Completed tests: %d. Pending: %d",
                         self.completed, self.iterations - self.completed)
866 | ||
867 | ||
# Registry of runnable tests; the --test-name option selects one key.
TESTS: typing.Dict[str, typing.Type[RbdTest]] = {
    'RbdTest': RbdTest,
    'RbdFioTest': RbdFioTest,
    'RbdResizeFioTest': RbdResizeFioTest,
    'RbdStampTest': RbdStampTest,
    # FS tests
    'RbdFsTest': RbdFsTest,
    'RbdFsFioTest': RbdFsFioTest,
    'RbdFsStampTest': RbdFsStampTest,
}
878 | ||
if __name__ == '__main__':
    args = parser.parse_args()

    # Default to warnings only; --verbose / --debug raise the verbosity.
    log_level = logging.WARNING
    if args.verbose:
        log_level = logging.INFO
    if args.debug:
        log_level = logging.DEBUG
    setup_logging(log_level)

    test_params = dict(
        image_size_mb=args.image_size_mb,
        image_prefix=args.image_prefix,
        bs=args.bs,
        op=args.op,
        verify=args.fio_verify,
        iodepth=args.fio_depth,
        map_timeout=args.map_timeout,
        skip_enabling_disk=args.skip_enabling_disk,
        # Bug fix: these fio flags were parsed but never forwarded to the
        # tests. Non-fio test classes simply ignore the extra kwargs.
        iterations=args.fio_iterations,
        workers=args.fio_workers,
    )

    try:
        test_cls = TESTS[args.test_name]
    except KeyError:
        # Typo fix: "Unkown" -> "Unknown".
        raise CephTestException("Unknown test: {}".format(args.test_name))

    runner = TestRunner(
        test_cls,
        test_params=test_params,
        iterations=args.iterations,
        workers=args.concurrency,
        stop_on_error=args.stop_on_error,
        cleanup_on_error=not args.skip_cleanup_on_error)
    runner.run()

    Tracer.print_results()
    test_cls.print_results(
        description="count: %d, concurrency: %d" %
        (args.iterations, args.concurrency))

    assert runner.errors == 0, f"encountered {runner.errors} error(s)."