-#!/usr/bin/env python2
+#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
+from __future__ import absolute_import, division, print_function, unicode_literals
+
import os
import sys
import time
import random
+import re
import tempfile
import subprocess
import shutil
# For txn mode, later sources override earlier ones:
# default_params < {blackbox,whitebox}_default_params < txn_params < args
-expected_values_file = tempfile.NamedTemporaryFile()
default_params = {
"acquire_snapshot_one_in": 10000,
+ "backup_max_size": 100 * 1024 * 1024,
+ # Consider a larger number when backups are considered more stable
+ "backup_one_in": 100000,
"block_size": 16384,
"bloom_bits": lambda: random.choice([random.randint(0,19),
random.lognormvariate(2.3, 1.3)]),
"checksum_type" : lambda: random.choice(["kCRC32c", "kxxHash", "kxxHash64"]),
"compression_max_dict_bytes": lambda: 16384 * random.randint(0, 1),
"compression_zstd_max_train_bytes": lambda: 65536 * random.randint(0, 1),
+ # Disabled compression_parallel_threads as the feature is not stable
+ # lambda: random.choice([1] * 9 + [4])
+ "compression_parallel_threads": 1,
"clear_column_family_one_in": 0,
"compact_files_one_in": 1000000,
"compact_range_one_in": 1000000,
"delrangepercent": 1,
"destroy_db_initially": 0,
"enable_pipelined_write": lambda: random.randint(0, 1),
- "expected_values_path": expected_values_file.name,
+ "enable_compaction_filter": lambda: random.choice([0, 0, 0, 1]),
+ "expected_values_path": lambda: setup_expected_values_file(),
"flush_one_in": 1000000,
- "get_live_files_and_wal_files_one_in": 1000000,
+ "file_checksum_impl": lambda: random.choice(["none", "crc32c", "xxh64", "big"]),
+ "get_live_files_one_in": 1000000,
+ # Note: the following two are intentionally disabled as the corresponding
+ # APIs are not guaranteed to succeed.
+ "get_sorted_wal_files_one_in": 0,
+ "get_current_wal_file_one_in": 0,
# Temporarily disable hash index
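+ # The weights favor 0 (binary search); 2 is the two-level index and 3 is
+ # binary search with first key; 1 (kHashSearch) remains excluded.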
- "index_type": lambda: random.choice([0,2]),
+ "index_type": lambda: random.choice([0, 0, 0, 2, 2, 3]),
+ "iterpercent": 10,
+ "mark_for_compaction_one_file_in": lambda: 10 * random.randint(0, 1),
"max_background_compactions": 20,
"max_bytes_for_level_base": 10485760,
"max_key": 100000000,
"max_write_buffer_number": 3,
"mmap_read": lambda: random.randint(0, 1),
"nooverwritepercent": 1,
- "open_files": lambda : random.choice([-1, 500000]),
+ "open_files": lambda : random.choice([-1, -1, 100, 500000]),
+ "optimize_filters_for_memory": lambda: random.randint(0, 1),
"partition_filters": lambda: random.randint(0, 1),
+ "partition_pinning": lambda: random.randint(0, 3),
"pause_background_one_in": 1000000,
"prefixpercent": 5,
"progress_reports": 0,
"recycle_log_file_num": lambda: random.randint(0, 1),
"reopen": 20,
"snapshot_hold_ops": 100000,
+ "sst_file_manager_bytes_per_sec": lambda: random.choice([0, 104857600]),
+ "sst_file_manager_bytes_per_truncate": lambda: random.choice([0, 1048576]),
"long_running_snapshots": lambda: random.randint(0, 1),
"subcompactions": lambda: random.randint(1, 4),
"target_file_size_base": 2097152,
"target_file_size_multiplier": 2,
+ "top_level_index_pinning": lambda: random.randint(0, 3),
+ "unpartitioned_pinning": lambda: random.randint(0, 3),
"use_direct_reads": lambda: random.randint(0, 1),
"use_direct_io_for_flush_and_compaction": lambda: random.randint(0, 1),
+ "mock_direct_io": False,
"use_full_merge_v1": lambda: random.randint(0, 1),
"use_merge": lambda: random.randint(0, 1),
+ "use_ribbon_filter": lambda: random.randint(0, 1),
"verify_checksum": 1,
"write_buffer_size": 4 * 1024 * 1024,
"writepercent": 35,
"db_write_buffer_size" : lambda: random.choice(
[0, 0, 0, 1024 * 1024, 8 * 1024 * 1024, 128 * 1024 * 1024]),
"avoid_unnecessary_blocking_io" : random.randint(0, 1),
"write_dbid_to_manifest" : random.randint(0, 1),
+ "avoid_flush_during_recovery" : random.choice(
+ [1 if t == 0 else 0 for t in range(0, 8)]),
"max_write_batch_group_size_bytes" : lambda: random.choice(
[16, 64, 1024 * 1024, 16 * 1024 * 1024]),
"level_compaction_dynamic_level_bytes" : True,
"verify_db_one_in": 100000,
"continuous_verification_interval" : 0,
"max_key_len": 3,
- "key_len_percent_dist": "1,30,69"
+ "key_len_percent_dist": "1,30,69",
+ "read_fault_one_in": lambda: random.choice([0, 1000]),
+ "sync_fault_injection": False,
+ "get_property_one_in": 1000000,
+ "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
+ "max_write_buffer_size_to_maintain": lambda: random.choice(
+ [0, 1024 * 1024, 2 * 1024 * 1024, 4 * 1024 * 1024, 8 * 1024 * 1024]),
}
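+# Each value above is either a constant or a zero-argument callable; callables
+# are re-invoked in finalize_and_sanitize(), so every db_stress run samples a
+# fresh random value.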
_TEST_DIR_ENV_VAR = 'TEST_TMPDIR'
+_DEBUG_LEVEL_ENV_VAR = 'DEBUG_LEVEL'
+
+
+def is_release_mode():
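+ # DEBUG_LEVEL=0 indicates a release build of db_stress.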
+ return os.environ.get(_DEBUG_LEVEL_ENV_VAR) == "0"
def get_dbname(test_name):
+ test_dir_name = "rocksdb_crashtest_" + test_name
test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
if test_tmpdir is None or test_tmpdir == "":
- dbname = tempfile.mkdtemp(prefix='rocksdb_crashtest_' + test_name)
+ dbname = tempfile.mkdtemp(prefix=test_dir_name)
else:
- dbname = test_tmpdir + "/rocksdb_crashtest_" + test_name
+ dbname = test_tmpdir + "/" + test_dir_name
shutil.rmtree(dbname, True)
os.mkdir(dbname)
return dbname
+expected_values_file = None
+def setup_expected_values_file():
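+ # Create the shared expected-values file lazily and only once; later calls
+ # return the same path so every run verifies against the same expected state.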
+ global expected_values_file
+ if expected_values_file is not None:
+ return expected_values_file
+ expected_file_name = "rocksdb_crashtest_expected"
+ test_tmpdir = os.environ.get(_TEST_DIR_ENV_VAR)
+ if test_tmpdir is None or test_tmpdir == "":
+ expected_values_file = tempfile.NamedTemporaryFile(
+ prefix=expected_file_name, delete=False).name
+ else:
+ # if tmpdir is specified, store the expected_values_file in the same dir
+ expected_values_file = test_tmpdir + "/" + expected_file_name
+ if os.path.exists(expected_values_file):
+ os.remove(expected_values_file)
+ open(expected_values_file, 'a').close()
+ return expected_values_file
+
def is_direct_io_supported(dbname):
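+ # Probe O_DIRECT support by opening a scratch file in the database
+ # directory; some filesystems (e.g. tmpfs) reject O_DIRECT.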
with tempfile.NamedTemporaryFile(dir=dbname) as f:
try:
os.open(f.name, os.O_DIRECT)
- except:
+ except BaseException:
return False
return True
"test_batches_snapshots": 0,
"write_buffer_size": 32 * 1024 * 1024,
"level_compaction_dynamic_level_bytes": False,
+ "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
}
blackbox_simple_default_params = {
# use a small write_buffer_size so that RocksDB triggers flushes
# more frequently
"write_buffer_size": 1024 * 1024,
"enable_pipelined_write": lambda: random.randint(0, 1),
+ # Snapshots are used heavily in this test mode, while they are incompatible
+ # with compaction filter.
+ "enable_compaction_filter": 0,
}
txn_params = {
"enable_pipelined_write": 0,
}
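+# Best-efforts recovery opens the DB even with missing or corrupted files, so
+# whole-DB verification must be disabled for this mode.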
+best_efforts_recovery_params = {
+ "best_efforts_recovery": True,
+ "skip_verifydb": True,
+ "verify_db_one_in": 0,
+ "continuous_verification_interval": 0,
+}
+
def finalize_and_sanitize(src_params):
dest_params = dict([(k, v() if callable(v) else v)
for (k, v) in src_params.items()])
dest_params["compression_zstd_max_train_bytes"] = 0
if dest_params.get("allow_concurrent_memtable_write", 1) == 1:
dest_params["memtablerep"] = "skip_list"
- if dest_params["mmap_read"] == 1 or not is_direct_io_supported(
- dest_params["db"]):
+ if dest_params["mmap_read"] == 1:
dest_params["use_direct_io_for_flush_and_compaction"] = 0
dest_params["use_direct_reads"] = 0
+ if (dest_params["use_direct_io_for_flush_and_compaction"] == 1
+ or dest_params["use_direct_reads"] == 1) and \
+ not is_direct_io_supported(dest_params["db"]):
+ if is_release_mode():
+ print("{} does not support direct IO. Disabling use_direct_reads and "
+ "use_direct_io_for_flush_and_compaction.\n".format(
+ dest_params["db"]))
+ dest_params["use_direct_reads"] = 0
+ dest_params["use_direct_io_for_flush_and_compaction"] = 0
+ else:
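+ # Debug builds emulate direct I/O instead, keeping the code path covered.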
+ dest_params["mock_direct_io"] = True
+
# DeleteRange is not currently compatible with Txns
if dest_params.get("test_batches_snapshots") == 1 or \
dest_params.get("use_txn") == 1:
dest_params["delpercent"] += dest_params["delrangepercent"]
dest_params["delrangepercent"] = 0
if dest_params.get("atomic_flush", 0) == 1:
# disable pipelined write when atomic flush is used.
dest_params["enable_pipelined_write"] = 0
+ if dest_params.get("sst_file_manager_bytes_per_sec", 0) == 0:
+ dest_params["sst_file_manager_bytes_per_truncate"] = 0
+ if dest_params.get("enable_compaction_filter", 0) == 1:
+ # Compaction filter is incompatible with snapshots. Need to avoid taking
+ # snapshots, as well as avoid operations that use snapshots for
+ # verification.
+ dest_params["acquire_snapshot_one_in"] = 0
+ dest_params["compact_range_one_in"] = 0
+ # Give the iterator ops away to reads.
+ dest_params["readpercent"] += dest_params.get("iterpercent", 10)
+ dest_params["iterpercent"] = 0
+ dest_params["test_batches_snapshots"] = 0
return dest_params
def gen_cmd_params(args):
params.update(cf_consistency_params)
if args.txn:
params.update(txn_params)
+ if args.test_best_efforts_recovery:
+ params.update(best_efforts_recovery_params)
for k, v in vars(args).items():
if v is not None:
'--{0}={1}'.format(k, v)
for k, v in [(k, finalzied_params[k]) for k in sorted(finalzied_params)]
if k not in set(['test_type', 'simple', 'duration', 'interval',
- 'random_kill_odd', 'cf_consistency', 'txn'])
+ 'random_kill_odd', 'cf_consistency', 'txn',
+ 'test_best_efforts_recovery'])
and v is not None] + unknown_params
return cmd
+# Inject inconsistencies into the db directory.
+def inject_inconsistencies_to_db_dir(dir_path):
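+ # Strategy: delete ~10% of the table files, append garbage to ~20% of them,
+ # then write bogus MANIFEST/SST files numbered past the current maximum.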
+ files = os.listdir(dir_path)
+ file_num_rgx = re.compile(r'(?P<number>[0-9]{6})')
+ largest_fnum = 0
+ for f in files:
+ m = file_num_rgx.search(f)
+ if m and not f.startswith('LOG'):
+ largest_fnum = max(largest_fnum, int(m.group('number')))
+
+ candidates = [
+ f for f in files if re.search(r'[0-9]+\.sst', f)
+ ]
+ deleted = 0
+ corrupted = 0
+ for f in candidates:
+ rnd = random.randint(0, 99)
+ f_path = os.path.join(dir_path, f)
+ if rnd < 10:
+ os.unlink(f_path)
+ deleted = deleted + 1
+ elif 10 <= rnd < 30:
+ with open(f_path, "a") as fd:
+ fd.write('12345678')
+ corrupted = corrupted + 1
+ print('Removed %d table files' % deleted)
+ print('Corrupted %d table files' % corrupted)
+
+ # Add corrupted MANIFEST and SST
+ for num in range(largest_fnum + 1, largest_fnum + 10):
+ rnd = random.randint(0, 1)
+ fname = ("MANIFEST-%06d" % num) if rnd == 0 else ("%06d.sst" % num)
+ print('Write %s' % fname)
+ with open(os.path.join(dir_path, fname), "w") as fd:
+ fd.write("garbage")
+
+
# This script runs and kills db_stress multiple times. It checks consistency
# in case of unsafe crashes in RocksDB.
def blackbox_crash_main(args, unknown_args):
killtime = time.time() + cmd_params['interval']
cmd = gen_cmd(dict(
- cmd_params.items() +
- {'db': dbname}.items()), unknown_args)
+ list(cmd_params.items())
+ + list({'db': dbname}.items())), unknown_args)
child = subprocess.Popen(cmd, stderr=subprocess.PIPE)
print("Running db_stress with pid=%d: %s\n\n"
time.sleep(1) # time to stabilize after a kill
while True:
- line = child.stderr.readline().strip()
+ line = child.stderr.readline().strip().decode('utf-8')
if line == '':
break
elif not line.startswith('WARNING'):
- time.sleep(1) # time to stabilize before the next run
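+ # Damage the DB dir so the next run has to open it via best-efforts recovery.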
+ if args.test_best_efforts_recovery:
+ inject_inconsistencies_to_db_dir(dbname)
+
+ time.sleep(1) # time to stabilize before the next run
+
# we need to clean up after ourselves -- only do this on test success
shutil.rmtree(dbname, True)
cur_time = time.time()
exit_time = cur_time + cmd_params['duration']
- half_time = cur_time + cmd_params['duration'] / 2
+ half_time = cur_time + cmd_params['duration'] // 2
print("Running whitebox-crash-test with \n"
+ "total-duration=" + str(cmd_params['duration']) + "\n")
})
elif kill_mode == 1:
if cmd_params.get('disable_wal', 0) == 1:
- my_kill_odd = kill_random_test / 50 + 1
+ my_kill_odd = kill_random_test // 50 + 1
else:
- my_kill_odd = kill_random_test / 10 + 1
+ my_kill_odd = kill_random_test // 10 + 1
additional_opts.update({
"kill_random_test": my_kill_odd,
- "kill_prefix_blacklist": "WritableFileWriter::Append,"
+ "kill_exclude_prefixes": "WritableFileWriter::Append,"
+ "WritableFileWriter::WriteBuffered",
})
elif kill_mode == 2:
# TODO: May need to adjust random odds if kill_random_test
# is too small.
additional_opts.update({
- "kill_random_test": (kill_random_test / 5000 + 1),
- "kill_prefix_blacklist": "WritableFileWriter::Append,"
+ "kill_random_test": (kill_random_test // 5000 + 1),
+ "kill_exclude_prefixes": "WritableFileWriter::Append,"
"WritableFileWriter::WriteBuffered,"
"PosixMmapFile::Allocate,WritableFileWriter::Flush",
})
"ops_per_thread": cmd_params['ops_per_thread'],
"compaction_style": 1,
}
+ # Single level universal has a lot of special logic. Ensure we cover
+ # it sometimes.
+ if random.randint(0, 1) == 1:
+ additional_opts.update({
+ "num_levels": 1,
+ })
elif check_mode == 2:
# normal run with FIFO compaction mode
# ops_per_thread is divided by 5 because FIFO compaction
# style is quite a bit slower on reads with a lot of files
additional_opts = {
"kill_random_test": None,
- "ops_per_thread": cmd_params['ops_per_thread'] / 5,
+ "ops_per_thread": cmd_params['ops_per_thread'] // 5,
"compaction_style": 2,
}
else:
"ops_per_thread": cmd_params['ops_per_thread'],
}
- cmd = gen_cmd(dict(cmd_params.items() + additional_opts.items()
- + {'db': dbname}.items()), unknown_args)
+ cmd = gen_cmd(dict(list(cmd_params.items())
+ + list(additional_opts.items())
+ + list({'db': dbname}.items())), unknown_args)
- print "Running:" + ' '.join(cmd) + "\n" # noqa: E999 T25377293 Grandfathered in
+ print("Running:" + ' '.join(cmd) + "\n") # noqa: E999 T25377293 Grandfathered in
popen = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
stdoutdata, stderrdata = popen.communicate()
+ if stdoutdata:
+ stdoutdata = stdoutdata.decode('utf-8')
+ if stderrdata:
+ stderrdata = stderrdata.decode('utf-8')
retncode = popen.returncode
msg = ("check_mode={0}, kill option={1}, exitcode={2}\n".format(
check_mode, additional_opts['kill_random_test'], retncode))
- print msg
- print stdoutdata
+ print(msg)
+ print(stdoutdata)
expected = False
if additional_opts['kill_random_test'] is None and (retncode == 0):
expected = True
if not expected:
- print "TEST FAILED. See kill option and exit code above!!!\n"
+ print("TEST FAILED. See kill option and exit code above!!!\n")
sys.exit(1)
stdoutdata = stdoutdata.lower()
errorcount = (stdoutdata.count('error') -
stdoutdata.count('got errors 0 times'))
- print "#times error occurred in output is " + str(errorcount) + "\n"
+ print("#times error occurred in output is " + str(errorcount) + "\n")
if (errorcount > 0):
- print "TEST FAILED. Output has 'error'!!!\n"
+ print("TEST FAILED. Output has 'error'!!!\n")
sys.exit(2)
if (stdoutdata.find('fail') >= 0):
- print "TEST FAILED. Output has 'fail'!!!\n"
+ print("TEST FAILED. Output has 'fail'!!!\n")
sys.exit(2)
# First half of the duration, keep doing kill test. For the next half,
parser.add_argument("--simple", action="store_true")
parser.add_argument("--cf_consistency", action='store_true')
parser.add_argument("--txn", action='store_true')
+ parser.add_argument("--test_best_efforts_recovery", action='store_true')
- all_params = dict(default_params.items()
- + blackbox_default_params.items()
- + whitebox_default_params.items()
- + simple_default_params.items()
- + blackbox_simple_default_params.items()
- + whitebox_simple_default_params.items())
+ all_params = dict(list(default_params.items())
+ + list(blackbox_default_params.items())
+ + list(whitebox_default_params.items())
+ + list(simple_default_params.items())
+ + list(blackbox_simple_default_params.items())
+ + list(whitebox_simple_default_params.items()))
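+ # Infer each flag's argparse type from a sampled default value.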
for k, v in all_params.items():
parser.add_argument("--" + k, type=type(v() if callable(v) else v))
blackbox_crash_main(args, unknown_args)
if args.test_type == 'whitebox':
whitebox_crash_main(args, unknown_args)
+ # Only delete the `expected_values_file` if test passes
+ if expected_values_file is not None and os.path.exists(expected_values_file):
+ os.remove(expected_values_file)
+
if __name__ == '__main__':
main()