]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rocksdb/tools/db_crashtest.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / rocksdb / tools / db_crashtest.py
index bf690b1eca1f74b7645d8b48ac9d79b614a748e2..722593caf5432baf18b13f701a983eec988e5247 100644 (file)
@@ -1,9 +1,12 @@
-#!/usr/bin/env python2
+#!/usr/bin/env python3
 # Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+from __future__ import absolute_import, division, print_function, unicode_literals
+
 import os
 import sys
 import time
 import random
+import re
 import tempfile
 import subprocess
 import shutil
@@ -22,10 +25,12 @@ import argparse
 #   for txn:
 #       default_params < {blackbox,whitebox}_default_params < txn_params < args
 
-expected_values_file = tempfile.NamedTemporaryFile()
 
 default_params = {
     "acquire_snapshot_one_in": 10000,
+    "backup_max_size": 100 * 1024 * 1024,
+    # Consider larger number when backups considered more stable
+    "backup_one_in": 100000,
     "block_size": 16384,
     "bloom_bits": lambda: random.choice([random.randint(0,19),
                                          random.lognormvariate(2.3, 1.3)]),
@@ -42,6 +47,9 @@ default_params = {
     "checksum_type" : lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64"]),
     "compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
     "compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
+    # Disabled compression_parallel_threads as the feature is not stable
+    # lambda: random.choice([1] * 9 + [4])
+    "compression_parallel_threads": 1,
     "clear_column_family_one_in": 0,
     "compact_files_one_in": 1000000,
     "compact_range_one_in": 1000000,
@@ -49,19 +57,29 @@ default_params = {
     "delrangepercent": 1,
     "destroy_db_initially": 0,
     "enable_pipelined_write": lambda: random.randint(0, 1),
-    "expected_values_path": expected_values_file.name,
+    "enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
+    "expected_values_path": lambda: setup_expected_values_file(),
     "flush_one_in": 1000000,
-    "get_live_files_and_wal_files_one_in": 1000000,
+    "file_checksum_impl": lambda: random.choice(["none", "crc32c", "xxh64", "big"]),
+    "get_live_files_one_in": 1000000,
+    # Note: the following two are intentionally disabled as the corresponding
+    # APIs are not guaranteed to succeed.
+    "get_sorted_wal_files_one_in": 0,
+    "get_current_wal_file_one_in": 0,
     # Temporarily disable hash index
-    "index_type": lambda: random.choice([0,2]),
+    "index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]),
+    "iterpercent": 10,
+    "mark_for_compaction_one_file_in": lambda: 10 * random.randint(0, 1),
     "max_background_compactions": 20,
     "max_bytes_for_level_base": 10485760,
     "max_key": 100000000,
     "max_write_buffer_number": 3,
     "mmap_read": lambda: random.randint(0, 1),
     "nooverwritepercent": 1,
-    "open_files": lambda : random.choice([-1, 500000]),
+    "open_files": lambda : random.choice([-1, -1, 100, 500000]),
+    "optimize_filters_for_memory": lambda: random.randint(0, 1),
     "partition_filters": lambda: random.randint(0, 1),
+    "partition_pinning": lambda: random.randint(0, 3),
     "pause_background_one_in": 1000000,
     "prefixpercent": 5,
     "progress_reports": 0,
@@ -69,14 +87,20 @@ default_params = {
     "recycle_log_file_num": lambda: random.randint(0, 1),
     "reopen": 20,
     "snapshot_hold_ops": 100000,
+    "sst_file_manager_bytes_per_sec": lambda: random.choice([0, 104857600]),
+    "sst_file_manager_bytes_per_truncate": lambda: random.choice([0, 1048576]),
     "long_running_snapshots": lambda: random.randint(0, 1),
     "subcompactions": lambda: random.randint(1, 4),
     "target_file_size_base": 2097152,
     "target_file_size_multiplier": 2,
+    "top_level_index_pinning": lambda: random.randint(0, 3),
+    "unpartitioned_pinning": lambda: random.randint(0, 3),
     "use_direct_reads": lambda: random.randint(0, 1),
     "use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
+    "mock_direct_io": False,
     "use_full_merge_v1": lambda: random.randint(0, 1),
     "use_merge": lambda: random.randint(0, 1),
+    "use_ribbon_filter": lambda: random.randint(0, 1),
     "verify_checksum": 1,
     "write_buffer_size": 4 * 1024 * 1024,
     "writepercent": 35,
@@ -100,6 +124,8 @@ default_params = {
         [0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]),
     "avoid_unnecessary_blocking_io" : random.randint(0, 1),
     "write_dbid_to_manifest" : random.randint(0, 1),
+    "avoid_flush_during_recovery" : random.choice(
+        [1 if t == 0 else 0 for t in range(0, 8)]),
     "max_write_batch_group_size_bytes" : lambda: random.choice(
         [16, 64, 1024 * 1024, 16 * 1024 * 1024]),
     "level_compaction_dynamic_level_bytes" : True,
@@ -107,28 +133,58 @@ default_params = {
     "verify_db_one_in": 100000,
     "continuous_verification_interval" : 0,
     "max_key_len": 3,
-    "key_len_percent_dist": "1,30,69"
+    "key_len_percent_dist": "1,30,69",
+    "read_fault_one_in": lambda: random.choice([0, 1000]),
+    "sync_fault_injection": False,
+    "get_property_one_in": 1000000,
+    "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
+    "max_write_buffer_size_to_maintain": lambda: random.choice(
+        [0, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024]),
 }
 
 _TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
+_DEBUG_LEVEL_ENV_VAR = 'DEBUG_LEVEL'
+
+
+def is_release_mode():
+    return os.environ.get(_DEBUG_LEVEL_ENV_VAR) == "0"
 
 
 def get_dbname(test_name):
+    test_dir_name = "rocksdb_crashtest_" + test_name
     test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
     if test_tmpdir is None or test_tmpdir == "":
-        dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name)
+        dbname = tempfile.mkdtemp(prefix=test_dir_name)
     else:
-        dbname = test_tmpdir + "/rocksdb_crashtest_" + test_name
+        dbname = test_tmpdir + "/" + test_dir_name
         shutil.rmtree(dbname, True)
         os.mkdir(dbname)
     return dbname
 
+expected_values_file = None
+def setup_expected_values_file():
+    global expected_values_file
+    if expected_values_file is not None:
+        return expected_values_file
+    expected_file_name = "rocksdb_crashtest_" + "expected"
+    test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
+    if test_tmpdir is None or test_tmpdir == "":
+        expected_values_file = tempfile.NamedTemporaryFile(
+            prefix=expected_file_name, delete=False).name
+    else:
+        # if tmpdir is specified, store the expected_values_file in the same dir
+        expected_values_file = test_tmpdir + "/" + expected_file_name
+        if os.path.exists(expected_values_file):
+            os.remove(expected_values_file)
+        open(expected_values_file, 'a').close()
+    return expected_values_file
+
 
 def is_direct_io_supported(dbname):
     with tempfile.NamedTemporaryFile(dir=dbname) as f:
         try:
             os.open(f.name, os.O_DIRECT)
-        except:
+        except BaseException:
             return False
         return True
 
@@ -166,6 +222,7 @@ simple_default_params = {
     "test_batches_snapshots": 0,
     "write_buffer_size": 32 * 1024 * 1024,
     "level_compaction_dynamic_level_bytes": False,
+    "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
 }
 
 blackbox_simple_default_params = {
@@ -183,6 +240,9 @@ cf_consistency_params = {
     # more frequently
     "write_buffer_size": 1024 * 1024,
     "enable_pipelined_write": lambda: random.randint(0, 1),
+    # Snapshots are used heavily in this test mode, while they are incompatible
+    # with compaction filter.
+    "enable_compaction_filter": 0,
 }
 
 txn_params = {
@@ -197,6 +257,13 @@ txn_params = {
     "enable_pipelined_write": 0,
 }
 
+best_efforts_recovery_params = {
+    "best_efforts_recovery": True,
+    "skip_verifydb": True,
+    "verify_db_one_in": 0,
+    "continuous_verification_interval": 0,
+}
+
 def finalize_and_sanitize(src_params):
     dest_params = dict([(k,  v() if callable(v) else v)
                         for (k, v) in src_params.items()])
@@ -205,10 +272,21 @@ def finalize_and_sanitize(src_params):
         dest_params["compression_zstd_max_train_bytes"] = 0
     if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
         dest_params["memtablerep"] = "skip_list"
-    if dest_params["mmap_read"] == 1 or not is_direct_io_supported(
-            dest_params["db"]):
+    if dest_params["mmap_read"] == 1:
         dest_params["use_direct_io_for_flush_and_compaction"] = 0
         dest_params["use_direct_reads"] = 0
+    if (dest_params["use_direct_io_for_flush_and_compaction"] == 1
+            or dest_params["use_direct_reads"] == 1) and \
+            not is_direct_io_supported(dest_params["db"]):
+        if is_release_mode():
+            print("{} does not support direct IO. Disabling use_direct_reads and "
+                    "use_direct_io_for_flush_and_compaction.\n".format(
+                        dest_params["db"]))
+            dest_params["use_direct_reads"] = 0
+            dest_params["use_direct_io_for_flush_and_compaction"] = 0
+        else:
+            dest_params["mock_direct_io"] = True
+
     # DeleteRange is not currnetly compatible with Txns
     if dest_params.get("test_batches_snapshots") == 1 or \
             dest_params.get("use_txn") == 1:
@@ -239,6 +317,18 @@ def finalize_and_sanitize(src_params):
     if dest_params.get("atomic_flush", 0) == 1:
         # disable pipelined write when atomic flush is used.
         dest_params["enable_pipelined_write"] = 0
+    if dest_params.get("sst_file_manager_bytes_per_sec", 0) == 0:
+        dest_params["sst_file_manager_bytes_per_truncate"] = 0
+    if dest_params.get("enable_compaction_filter", 0) == 1:
+        # Compaction filter is incompatible with snapshots. Need to avoid taking
+        # snapshots, as well as avoid operations that use snapshots for
+        # verification.
+        dest_params["acquire_snapshot_one_in"] = 0
+        dest_params["compact_range_one_in"] = 0
+        # Give the iterator ops away to reads.
+        dest_params["readpercent"] += dest_params.get("iterpercent", 10)
+        dest_params["iterpercent"] = 0
+        dest_params["test_batches_snapshots"] = 0
     return dest_params
 
 def gen_cmd_params(args):
@@ -259,6 +349,8 @@ def gen_cmd_params(args):
         params.update(cf_consistency_params)
     if args.txn:
         params.update(txn_params)
+    if args.test_best_efforts_recovery:
+        params.update(best_efforts_recovery_params)
 
     for k, v in vars(args).items():
         if v is not None:
@@ -272,11 +364,49 @@ def gen_cmd(params, unknown_params):
         '--{0}={1}'.format(k, v)
         for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)]
         if k not in set(['test_type', 'simple', 'duration', 'interval',
-                         'random_kill_odd', 'cf_consistency', 'txn'])
+                         'random_kill_odd', 'cf_consistency', 'txn',
+                         'test_best_efforts_recovery'])
         and v is not None] + unknown_params
     return cmd
 
 
+# Inject inconsistency to db directory.
+def inject_inconsistencies_to_db_dir(dir_path):
+    files = os.listdir(dir_path)
+    file_num_rgx = re.compile(r'(?P<number>[0-9]{6})')
+    largest_fnum = 0
+    for f in files:
+        m = file_num_rgx.search(f)
+        if m and not f.startswith('LOG'):
+            largest_fnum = max(largest_fnum, int(m.group('number')))
+
+    candidates = [
+        f for f in files if re.search(r'[0-9]+\.sst', f)
+    ]
+    deleted = 0
+    corrupted = 0
+    for f in candidates:
+        rnd = random.randint(0, 99)
+        f_path = os.path.join(dir_path, f)
+        if rnd < 10:
+            os.unlink(f_path)
+            deleted = deleted + 1
+        elif 10 <= rnd and rnd < 30:
+            with open(f_path, "a") as fd:
+                fd.write('12345678')
+            corrupted = corrupted + 1
+    print('Removed %d table files' % deleted)
+    print('Corrupted %d table files' % corrupted)
+
+    # Add corrupted MANIFEST and SST
+    for num in range(largest_fnum + 1, largest_fnum + 10):
+        rnd = random.randint(0, 1)
+        fname = ("MANIFEST-%06d" % num) if rnd == 0 else ("%06d.sst" % num)
+        print('Write %s' % fname)
+        with open(os.path.join(dir_path, fname), "w") as fd:
+            fd.write("garbage")
+
+
 # This script runs and kills db_stress multiple times. It checks consistency
 # in case of unsafe crashes in RocksDB.
 def blackbox_crash_main(args, unknown_args):
@@ -293,8 +423,8 @@ def blackbox_crash_main(args, unknown_args):
         killtime = time.time() + cmd_params['interval']
 
         cmd = gen_cmd(dict(
-            cmd_params.items() +
-            {'db': dbname}.items()), unknown_args)
+            list(cmd_params.items())
+            + list({'db': dbname}.items())), unknown_args)
 
         child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
         print("Running db_stress with pid=%d: %s\n\n"
@@ -319,7 +449,7 @@ def blackbox_crash_main(args, unknown_args):
                 time.sleep(1)  # time to stabilize after a kill
 
         while True:
-            line = child.stderr.readline().strip()
+            line = child.stderr.readline().strip().decode('utf-8')
             if line == '':
                 break
             elif not line.startswith('WARNING'):
@@ -332,6 +462,11 @@ def blackbox_crash_main(args, unknown_args):
 
         time.sleep(1)  # time to stabilize before the next run
 
+        if args.test_best_efforts_recovery:
+            inject_inconsistencies_to_db_dir(dbname)
+
+        time.sleep(1)  # time to stabilize before the next run
+
     # we need to clean up after ourselves -- only do this on test success
     shutil.rmtree(dbname, True)
 
@@ -344,7 +479,7 @@ def whitebox_crash_main(args, unknown_args):
 
     cur_time = time.time()
     exit_time = cur_time + cmd_params['duration']
-    half_time = cur_time + cmd_params['duration'] / 2
+    half_time = cur_time + cmd_params['duration'] // 2
 
     print("Running whitebox-crash-test with \n"
           + "total-duration=" + str(cmd_params['duration']) + "\n")
@@ -370,20 +505,20 @@ def whitebox_crash_main(args, unknown_args):
                 })
             elif kill_mode == 1:
                 if cmd_params.get('disable_wal', 0) == 1:
-                    my_kill_odd = kill_random_test / 50 + 1
+                    my_kill_odd = kill_random_test // 50 + 1
                 else:
-                    my_kill_odd = kill_random_test / 10 + 1
+                    my_kill_odd = kill_random_test // 10 + 1
                 additional_opts.update({
                     "kill_random_test": my_kill_odd,
-                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
+                    "kill_exclude_prefixes": "WritableFileWriter::Append,"
                     + "WritableFileWriter::WriteBuffered",
                 })
             elif kill_mode == 2:
                 # TODO: May need to adjust random odds if kill_random_test
                 # is too small.
                 additional_opts.update({
-                    "kill_random_test": (kill_random_test / 5000 + 1),
-                    "kill_prefix_blacklist": "WritableFileWriter::Append,"
+                    "kill_random_test": (kill_random_test // 5000 + 1),
+                    "kill_exclude_prefixes": "WritableFileWriter::Append,"
                     "WritableFileWriter::WriteBuffered,"
                     "PosixMmapFile::Allocate,WritableFileWriter::Flush",
                 })
@@ -396,13 +531,19 @@ def whitebox_crash_main(args, unknown_args):
                 "ops_per_thread": cmd_params['ops_per_thread'],
                 "compaction_style": 1,
             }
+            # Single level universal has a lot of special logic. Ensure we cover
+            # it sometimes.
+            if random.randint(0, 1) == 1:
+                additional_opts.update({
+                    "num_levels": 1,
+                })
         elif check_mode == 2:
             # normal run with FIFO compaction mode
             # ops_per_thread is divided by 5 because FIFO compaction
             # style is quite a bit slower on reads with lot of files
             additional_opts = {
                 "kill_random_test": None,
-                "ops_per_thread": cmd_params['ops_per_thread'] / 5,
+                "ops_per_thread": cmd_params['ops_per_thread'] // 5,
                 "compaction_style": 2,
             }
         else:
@@ -412,19 +553,24 @@ def whitebox_crash_main(args, unknown_args):
                 "ops_per_thread": cmd_params['ops_per_thread'],
             }
 
-        cmd = gen_cmd(dict(cmd_params.items() + additional_opts.items()
-                           + {'db': dbname}.items()), unknown_args)
+        cmd = gen_cmd(dict(list(cmd_params.items())
+            + list(additional_opts.items())
+            + list({'db': dbname}.items())), unknown_args)
 
-        print "Running:" + ' '.join(cmd) + "\n"  # noqa: E999 T25377293 Grandfathered in
+        print("Running:" + ' '.join(cmd) + "\n")  # noqa: E999 T25377293 Grandfathered in
 
         popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT)
         stdoutdata, stderrdata = popen.communicate()
+        if stdoutdata:
+            stdoutdata = stdoutdata.decode('utf-8')
+        if stderrdata:
+            stderrdata = stderrdata.decode('utf-8')
         retncode = popen.returncode
         msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
                check_mode, additional_opts['kill_random_test'], retncode))
-        print msg
-        print stdoutdata
+        print(msg)
+        print(stdoutdata)
 
         expected = False
         if additional_opts['kill_random_test'] is None and (retncode == 0):
@@ -436,19 +582,19 @@ def whitebox_crash_main(args, unknown_args):
             expected = True
 
         if not expected:
-            print "TEST FAILED. See kill option and exit code above!!!\n"
+            print("TEST FAILED. See kill option and exit code above!!!\n")
             sys.exit(1)
 
         stdoutdata = stdoutdata.lower()
         errorcount = (stdoutdata.count('error') -
                       stdoutdata.count('got errors 0 times'))
-        print "#times error occurred in output is " + str(errorcount) + "\n"
+        print("#times error occurred in output is " + str(errorcount) + "\n")
 
         if (errorcount > 0):
-            print "TEST FAILED. Output has 'error'!!!\n"
+            print("TEST FAILED. Output has 'error'!!!\n")
             sys.exit(2)
         if (stdoutdata.find('fail') >= 0):
-            print "TEST FAILED. Output has 'fail'!!!\n"
+            print("TEST FAILED. Output has 'fail'!!!\n")
             sys.exit(2)
 
         # First half of the duration, keep doing kill test. For the next half,
@@ -471,13 +617,14 @@ def main():
     parser.add_argument("--simple", action="store_true")
     parser.add_argument("--cf_consistency", action='store_true')
     parser.add_argument("--txn", action='store_true')
+    parser.add_argument("--test_best_efforts_recovery", action='store_true')
 
-    all_params = dict(default_params.items()
-                      + blackbox_default_params.items()
-                      + whitebox_default_params.items()
-                      + simple_default_params.items()
-                      + blackbox_simple_default_params.items()
-                      + whitebox_simple_default_params.items())
+    all_params = dict(list(default_params.items())
+                      + list(blackbox_default_params.items())
+                      + list(whitebox_default_params.items())
+                      + list(simple_default_params.items())
+                      + list(blackbox_simple_default_params.items())
+                      + list(whitebox_simple_default_params.items()))
 
     for k, v in all_params.items():
         parser.add_argument("--" + k, type=type(v() if callable(v) else v))
@@ -494,6 +641,10 @@ def main():
         blackbox_crash_main(args, unknown_args)
     if args.test_type == 'whitebox':
         whitebox_crash_main(args, unknown_args)
+    # Only delete the `expected_values_file` if test passes
+    if os.path.exists(expected_values_file):
+        os.remove(expected_values_file)
+
 
 if __name__ == '__main__':
     main()