#!/usr/bin/env python3
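# Usage: run_nvmf.py [config_file]
# When no config file path is given, "config.json" next to this script is used
# (see the argument handling in the __main__ block below).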

import os
import re
import sys
import json
import paramiko
import zipfile
import threading
import subprocess
import itertools
import time
import uuid
import rpc
import rpc.client
import pandas as pd
from collections import OrderedDict
from common import *
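# NOTE: "common" is the helper module shipped next to this script; judging by the
# calls below, it provides get_nvme_devices(), get_nvme_devices_count(),
# get_nvme_devices_bdf(), get_used_numa_nodes() and nvmet_command().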


class Server:
    def __init__(self, name, username, password, mode, nic_ips, transport):
        self.name = name
        self.mode = mode
        self.username = username
        self.password = password
        self.nic_ips = nic_ips
        self.transport = transport.lower()

        if not re.match("^[A-Za-z0-9]*$", name):
            self.log_print("Please use a name which contains only letters or numbers")
            sys.exit(1)

    def log_print(self, msg):
        print("[%s] %s" % (self.name, msg), flush=True)


class Target(Server):
    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None):

        super(Target, self).__init__(name, username, password, mode, nic_ips, transport)
        self.null_block = bool(use_null_block)
        self.enable_sar = False
        self.enable_pcm_memory = False
        self.enable_pcm = False
        self.enable_bandwidth = False

        if sar_settings:
            self.enable_sar, self.sar_delay, self.sar_interval, self.sar_count = sar_settings

        if pcm_settings:
            self.pcm_dir, self.enable_pcm, self.enable_pcm_memory, self.pcm_delay, self.pcm_interval, self.pcm_count = pcm_settings

        if bandwidth_settings:
            self.enable_bandwidth, self.bandwidth_count = bandwidth_settings

        self.script_dir = os.path.dirname(os.path.abspath(sys.argv[0]))
        self.spdk_dir = os.path.abspath(os.path.join(self.script_dir, "../../../"))

    def zip_spdk_sources(self, spdk_dir, dest_file):
        self.log_print("Zipping SPDK source directory")
        fh = zipfile.ZipFile(dest_file, "w", zipfile.ZIP_DEFLATED)
        for root, directories, files in os.walk(spdk_dir, followlinks=True):
            for file in files:
                fh.write(os.path.relpath(os.path.join(root, file)))
        fh.close()
        self.log_print("Done zipping")

    def read_json_stats(self, file):
        with open(file, "r") as json_data:
            data = json.load(json_data)
            job_pos = 0  # job_pos = 0 because we use aggregated results

            # Check if latency is in nano or microseconds to choose correct dict key
            def get_lat_unit(key_prefix, dict_section):
                # key_prefix - lat, clat or slat.
                # dict_section - portion of json containing latency bucket in question
                # Return dict key to access the bucket and unit as string
                for k, v in dict_section.items():
                    if k.startswith(key_prefix):
                        return k, k.split("_")[1]

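            # Example: recent fio versions report latency in nanoseconds, so
            # data["jobs"][0]["read"] typically contains a "lat_ns" bucket;
            # get_lat_unit("lat", ...) then returns ("lat_ns", "ns") and the
            # values get converted to microseconds below.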
            read_iops = float(data["jobs"][job_pos]["read"]["iops"])
            read_bw = float(data["jobs"][job_pos]["read"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["read"])
            read_avg_lat = float(data["jobs"][job_pos]["read"][lat_key]["mean"])
            read_min_lat = float(data["jobs"][job_pos]["read"][lat_key]["min"])
            read_max_lat = float(data["jobs"][job_pos]["read"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["read"])
            read_p99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.000000"])
            read_p99_9_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.900000"])
            read_p99_99_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.990000"])
            read_p99_999_lat = float(data["jobs"][job_pos]["read"][clat_key]["percentile"]["99.999000"])

            if "ns" in lat_unit:
                read_avg_lat, read_min_lat, read_max_lat = [x / 1000 for x in [read_avg_lat, read_min_lat, read_max_lat]]
            if "ns" in clat_unit:
                read_p99_lat = read_p99_lat / 1000
                read_p99_9_lat = read_p99_9_lat / 1000
                read_p99_99_lat = read_p99_99_lat / 1000
                read_p99_999_lat = read_p99_999_lat / 1000

            write_iops = float(data["jobs"][job_pos]["write"]["iops"])
            write_bw = float(data["jobs"][job_pos]["write"]["bw"])
            lat_key, lat_unit = get_lat_unit("lat", data["jobs"][job_pos]["write"])
            write_avg_lat = float(data["jobs"][job_pos]["write"][lat_key]["mean"])
            write_min_lat = float(data["jobs"][job_pos]["write"][lat_key]["min"])
            write_max_lat = float(data["jobs"][job_pos]["write"][lat_key]["max"])
            clat_key, clat_unit = get_lat_unit("clat", data["jobs"][job_pos]["write"])
            write_p99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.000000"])
            write_p99_9_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.900000"])
            write_p99_99_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.990000"])
            write_p99_999_lat = float(data["jobs"][job_pos]["write"][clat_key]["percentile"]["99.999000"])

            if "ns" in lat_unit:
                write_avg_lat, write_min_lat, write_max_lat = [x / 1000 for x in [write_avg_lat, write_min_lat, write_max_lat]]
            if "ns" in clat_unit:
                write_p99_lat = write_p99_lat / 1000
                write_p99_9_lat = write_p99_9_lat / 1000
                write_p99_99_lat = write_p99_99_lat / 1000
                write_p99_999_lat = write_p99_999_lat / 1000

        return [read_iops, read_bw, read_avg_lat, read_min_lat, read_max_lat,
                read_p99_lat, read_p99_9_lat, read_p99_99_lat, read_p99_999_lat,
                write_iops, write_bw, write_avg_lat, write_min_lat, write_max_lat,
                write_p99_lat, write_p99_9_lat, write_p99_99_lat, write_p99_999_lat]

    def parse_results(self, results_dir, initiator_count=None, run_num=None):
        files = os.listdir(results_dir)
        fio_files = filter(lambda x: ".fio" in x, files)
        json_files = [x for x in files if ".json" in x]

        headers = ["read_iops", "read_bw", "read_avg_lat_us", "read_min_lat_us", "read_max_lat_us",
                   "read_p99_lat_us", "read_p99.9_lat_us", "read_p99.99_lat_us", "read_p99.999_lat_us",
                   "write_iops", "write_bw", "write_avg_lat_us", "write_min_lat_us", "write_max_lat_us",
                   "write_p99_lat_us", "write_p99.9_lat_us", "write_p99.99_lat_us", "write_p99.999_lat_us"]

        aggr_headers = ["iops", "bw", "avg_lat_us", "min_lat_us", "max_lat_us",
                        "p99_lat_us", "p99.9_lat_us", "p99.99_lat_us", "p99.999_lat_us"]

        header_line = ",".join(["Name", *headers])
        aggr_header_line = ",".join(["Name", *aggr_headers])

        # Create empty results file
        csv_file = "nvmf_results.csv"
        with open(os.path.join(results_dir, csv_file), "w") as fh:
            fh.write(aggr_header_line + "\n")
        rows = set()

        for fio_config in fio_files:
            self.log_print("Getting FIO stats for %s" % fio_config)
            job_name, _ = os.path.splitext(fio_config)

            # Look in the filename for the rwmixread value; the function arguments
            # do not carry that information.
            # TODO: Improve this function by using workload params directly instead
            # of regexing through filenames.
            if "read" in job_name:
                rw_mixread = 1
            elif "write" in job_name:
                rw_mixread = 0
            else:
                rw_mixread = float(re.search(r"m_(\d+)", job_name).group(1)) / 100

            # If "_CPU" exists in the name - strip it, because initiators running
            # the same job could have different num_cores parameters
            job_name = re.sub(r"_\d+CPU", "", job_name)
            job_result_files = [x for x in json_files if job_name in x]
            self.log_print("Matching result files for current fio config:")
            for j in job_result_files:
                self.log_print("\t %s" % j)

            # More than one initiator may have been used in the test; result files
            # are named so that the string after the last "_" separator is the server name
            inits_names = set([os.path.splitext(x)[0].split("_")[-1] for x in job_result_files])
            inits_avg_results = []
            for i in inits_names:
                self.log_print("\tGetting stats for initiator %s" % i)
                # The job may have been run more than once; average the results
                # for this initiator over all runs
                i_results = [x for x in job_result_files if i in x]
                i_results_filename = re.sub(r"run_\d+_", "", i_results[0].replace("json", "csv"))

                separate_stats = []
                for r in i_results:
                    stats = self.read_json_stats(os.path.join(results_dir, r))
                    separate_stats.append(stats)
                    self.log_print(stats)

                init_results = [sum(x) for x in zip(*separate_stats)]
                init_results = [x / len(separate_stats) for x in init_results]
                inits_avg_results.append(init_results)

                self.log_print("\tAverage results for initiator %s" % i)
                self.log_print(init_results)
                with open(os.path.join(results_dir, i_results_filename), "w") as fh:
                    fh.write(header_line + "\n")
                    fh.write(",".join([job_name, *["{0:.3f}".format(x) for x in init_results]]) + "\n")

            # Sum the results of all initiators running this FIO job.
            # Latency results are an average of latencies from across all initiators.
            inits_avg_results = [sum(x) for x in zip(*inits_avg_results)]
            inits_avg_results = OrderedDict(zip(headers, inits_avg_results))
            for key in inits_avg_results:
                if "lat" in key:
                    inits_avg_results[key] /= len(inits_names)

            # Aggregate separate read/write values into common labels.
            # Take rw_mixread into consideration for mixed read/write workloads.
            aggregate_results = OrderedDict()
            for h in aggr_headers:
                read_stat, write_stat = [float(value) for key, value in inits_avg_results.items() if h in key]
                if "lat" in h:
                    aggr_stat = rw_mixread * read_stat + (1 - rw_mixread) * write_stat
                else:
                    aggr_stat = read_stat + write_stat
                aggregate_results[h] = "{0:.3f}".format(aggr_stat)

            rows.add(",".join([job_name, *aggregate_results.values()]))

        # Save results to file
        with open(os.path.join(results_dir, csv_file), "a") as fh:
            for row in rows:
                fh.write(row + "\n")
        self.log_print("You can find the test results in the file %s" % os.path.join(results_dir, csv_file))

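    # Example result file name this parser expects (produced by gen_fio_config()
    # and run_fio() below), with "init1" as an illustrative initiator name:
    #   4k_128_randrw_m_70_run_1_init1.json
    # -> fio job "4k_128_randrw_m_70" (rwmixread=70), run number 1, initiator "init1"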
    def measure_sar(self, results_dir, sar_file_name):
        self.log_print("Waiting %d seconds before starting SAR measurements" % self.sar_delay)
        time.sleep(self.sar_delay)
        # Sample all CPUs every self.sar_interval seconds, self.sar_count times
        out = subprocess.check_output("sar -P ALL %s %s" % (self.sar_interval, self.sar_count), shell=True).decode(encoding="utf-8")
        with open(os.path.join(results_dir, sar_file_name), "w") as fh:
            for line in out.split("\n"):
                if "Average" in line and "CPU" in line:
                    self.log_print("Summary CPU utilization from SAR:")
                    self.log_print(line)
                if "Average" in line and "all" in line:
                    self.log_print(line)
            fh.write(out)

    def measure_pcm_memory(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        pcm_memory = subprocess.Popen("%s/pcm-memory.x %s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval,
                                      results_dir, pcm_file_name), shell=True)
        time.sleep(self.pcm_count)
        pcm_memory.kill()

    def measure_pcm(self, results_dir, pcm_file_name):
        time.sleep(self.pcm_delay)
        subprocess.run("%s/pcm.x %s -i=%s -csv=%s/%s" % (self.pcm_dir, self.pcm_interval, self.pcm_count,
                       results_dir, pcm_file_name), shell=True, check=True)
        df = pd.read_csv(os.path.join(results_dir, pcm_file_name), header=[0, 1])
        df = df.rename(columns=lambda x: re.sub(r'Unnamed:[\w\s]*$', '', x))
        skt = df.loc[:, df.columns.get_level_values(1).isin({'UPI0', 'UPI1', 'UPI2'})]
        skt_pcm_file_name = "_".join(["skt", pcm_file_name])
        skt.to_csv(os.path.join(results_dir, skt_pcm_file_name), index=False)

    def measure_bandwidth(self, results_dir, bandwidth_file_name):
        subprocess.run("bwm-ng -o csv -F %s/%s -a 1 -t 1000 -c %s" % (results_dir, bandwidth_file_name,
                       self.bandwidth_count), shell=True, check=True)


class Initiator(Server):
    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma", cpu_frequency=None,
                 nvmecli_bin="nvme", workspace="/tmp/spdk", cpus_allowed=None,
                 cpus_allowed_policy="shared", fio_bin="/usr/src/fio/fio"):

        super(Initiator, self).__init__(name, username, password, mode, nic_ips, transport)

        self.ip = ip
        self.spdk_dir = workspace
        if os.getenv('SPDK_WORKSPACE'):
            self.spdk_dir = os.getenv('SPDK_WORKSPACE')
        self.fio_bin = fio_bin
        self.cpus_allowed = cpus_allowed
        self.cpus_allowed_policy = cpus_allowed_policy
        self.cpu_frequency = cpu_frequency
        self.nvmecli_bin = nvmecli_bin
        self.ssh_connection = paramiko.SSHClient()
        self.ssh_connection.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh_connection.connect(self.ip, username=self.username, password=self.password)
        self.remote_call("sudo rm -rf %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("mkdir -p %s" % self.spdk_dir)
        self.set_cpu_frequency()

    def __del__(self):
        self.ssh_connection.close()

    def put_file(self, local, remote_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.put(local, remote_dest)
        ftp.close()

    def get_file(self, remote, local_dest):
        ftp = self.ssh_connection.open_sftp()
        ftp.get(remote, local_dest)
        ftp.close()

    def remote_call(self, cmd):
        stdin, stdout, stderr = self.ssh_connection.exec_command(cmd)
        out = stdout.read().decode(encoding="utf-8")
        err = stderr.read().decode(encoding="utf-8")
        return out, err

    def copy_result_files(self, dest_dir):
        self.log_print("Copying results")

        if not os.path.exists(dest_dir):
            os.mkdir(dest_dir)

        # Get the list of result files from the initiator and copy them back to the target
        stdout, stderr = self.remote_call("ls %s/nvmf_perf" % self.spdk_dir)
        file_list = stdout.strip().split("\n")

        for file in file_list:
            self.get_file(os.path.join(self.spdk_dir, "nvmf_perf", file),
                          os.path.join(dest_dir, file))
        self.log_print("Done copying results")

    def discover_subsystems(self, address_list, subsys_no):
        num_nvmes = range(0, subsys_no)
        nvme_discover_output = ""
        # Renamed loop variable to avoid shadowing the subsys_no parameter
        for ip, subsys_offset in itertools.product(address_list, num_nvmes):
            self.log_print("Trying to discover: %s:%s" % (ip, 4420 + subsys_offset))
            nvme_discover_cmd = ["sudo",
                                 "%s" % self.nvmecli_bin,
                                 "discover", "-t %s" % self.transport,
                                 "-s %s" % (4420 + subsys_offset),
                                 "-a %s" % ip]
            nvme_discover_cmd = " ".join(nvme_discover_cmd)

            stdout, stderr = self.remote_call(nvme_discover_cmd)
            if stdout:
                nvme_discover_output = nvme_discover_output + stdout

        subsystems = re.findall(r'trsvcid:\s(\d+)\s+'  # get svcid number
                                r'subnqn:\s+([a-zA-Z0-9\.\-\:]+)\s+'  # get NQN id
                                r'traddr:\s+(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})',  # get IP address
                                nvme_discover_output)  # from nvme discovery output
        subsystems = filter(lambda x: x[-1] in address_list, subsystems)
        subsystems = list(set(subsystems))
        subsystems.sort(key=lambda x: x[1])
        self.log_print("Found matching subsystems on target side:")
        for s in subsystems:
            self.log_print(s)

        return subsystems

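    # Example "nvme discover" output fragment matched by the regex above
    # (address and NQN are illustrative):
    #   trsvcid: 4420
    #   subnqn:  nqn.2018-09.io.spdk:cnode1
    #   traddr:  192.168.100.8
    # -> yields the tuple ("4420", "nqn.2018-09.io.spdk:cnode1", "192.168.100.8")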
    def gen_fio_config(self, rw, rwmixread, block_size, io_depth, subsys_no, num_jobs=None, ramp_time=0, run_time=10):
        fio_conf_template = """
[global]
ioengine={ioengine}
{spdk_conf}
thread=1
group_reporting=1
direct=1
percentile_list=50:90:99:99.5:99.9:99.99:99.999

norandommap=1
rw={rw}
rwmixread={rwmixread}
bs={block_size}
time_based=1
ramp_time={ramp_time}
runtime={run_time}
"""
        if "spdk" in self.mode:
            subsystems = self.discover_subsystems(self.nic_ips, subsys_no)
            bdev_conf = self.gen_spdk_bdev_conf(subsystems)
            self.remote_call("echo '%s' > %s/bdev.conf" % (bdev_conf, self.spdk_dir))
            ioengine = "%s/build/fio/spdk_bdev" % self.spdk_dir
            spdk_conf = "spdk_conf=%s/bdev.conf" % self.spdk_dir
        else:
            ioengine = "libaio"
            spdk_conf = ""
            out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
            subsystems = [x for x in out.split("\n") if "nvme" in x]

        if self.cpus_allowed is not None:
            self.log_print("Limiting FIO workload execution to specific cores %s" % self.cpus_allowed)
            cpus_num = 0
            cpus = self.cpus_allowed.split(",")
            for cpu in cpus:
                if "-" in cpu:
                    a, b = cpu.split("-")
                    a = int(a)
                    b = int(b)
                    cpus_num += len(range(a, b + 1))  # "a-b" is an inclusive range
                else:
                    cpus_num += 1
            threads = range(0, cpus_num)
        elif hasattr(self, 'num_cores'):
            self.log_print("Limiting FIO workload execution to %s cores" % self.num_cores)
            threads = range(0, int(self.num_cores))
        else:
            threads = range(0, len(subsystems))

        if "spdk" in self.mode:
            filename_section = self.gen_fio_filename_conf(subsystems, threads, io_depth, num_jobs)
        else:
            filename_section = self.gen_fio_filename_conf(threads, io_depth, num_jobs)

        fio_config = fio_conf_template.format(ioengine=ioengine, spdk_conf=spdk_conf,
                                              rw=rw, rwmixread=rwmixread, block_size=block_size,
                                              ramp_time=ramp_time, run_time=run_time)
        if num_jobs:
            fio_config = fio_config + "numjobs=%s\n" % num_jobs
        if self.cpus_allowed is not None:
            fio_config = fio_config + "cpus_allowed=%s\n" % self.cpus_allowed
            fio_config = fio_config + "cpus_allowed_policy=%s\n" % self.cpus_allowed_policy
        fio_config = fio_config + filename_section

        fio_config_filename = "%s_%s_%s_m_%s" % (block_size, io_depth, rw, rwmixread)
        if hasattr(self, "num_cores"):
            fio_config_filename += "_%sCPU" % self.num_cores
        fio_config_filename += ".fio"

        self.remote_call("mkdir -p %s/nvmf_perf" % self.spdk_dir)
        self.remote_call("echo '%s' > %s/nvmf_perf/%s" % (fio_config, self.spdk_dir, fio_config_filename))
        self.log_print("Created FIO Config:")
        self.log_print(fio_config)

        return os.path.join(self.spdk_dir, "nvmf_perf", fio_config_filename)

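    # Example: gen_fio_config("randrw", 70, "4k", 128, subsys_no=12) produces a job
    # file named "4k_128_randrw_m_70.fio" (SPDK initiators add a "_<N>CPU" infix);
    # parse_results() on the target side relies on this naming convention.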
    def set_cpu_frequency(self):
        if self.cpu_frequency is not None:
            try:
                self.remote_call('sudo cpupower frequency-set -g userspace')
                self.remote_call('sudo cpupower frequency-set -f %s' % self.cpu_frequency)
                cmd = "sudo cpupower frequency-info"
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
            except Exception:
                self.log_print("ERROR: cpu_frequency will not work when intel_pstate is enabled!")
                sys.exit()
        else:
            self.log_print("WARNING: you have disabled intel_pstate and are using the default CPU governor.")

    def run_fio(self, fio_config_file, run_num=None):
        job_name, _ = os.path.splitext(fio_config_file)
        self.log_print("Starting FIO run for job: %s" % job_name)
        self.log_print("Using FIO: %s" % self.fio_bin)

        if run_num:
            for i in range(1, run_num + 1):
                output_filename = job_name + "_run_" + str(i) + "_" + self.name + ".json"
                cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
                output, error = self.remote_call(cmd)
                self.log_print(output)
                self.log_print(error)
        else:
            output_filename = job_name + "_" + self.name + ".json"
            cmd = "sudo %s %s --output-format=json --output=%s" % (self.fio_bin, fio_config_file, output_filename)
            output, error = self.remote_call(cmd)
            self.log_print(output)
            self.log_print(error)
        self.log_print("FIO run finished. Results in: %s" % output_filename)


class KernelTarget(Target):
    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, nvmet_bin="nvmetcli", **kwargs):

        super(KernelTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                           use_null_block, sar_settings, pcm_settings, bandwidth_settings)
        self.nvmet_bin = nvmet_bin

    def __del__(self):
        nvmet_command(self.nvmet_bin, "clear")

    def kernel_tgt_gen_nullblock_conf(self, address):
        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        nvmet_cfg["subsystems"].append({
            "allowed_hosts": [],
            "attr": {
                "allow_any_host": "1",
                "serial": "SPDK0001",
                "version": "1.3"
            },
            "namespaces": [
                {
                    "device": {
                        "path": "/dev/nullb0",
                        "uuid": "%s" % uuid.uuid4()
                    },
                    "enable": 1,
                    "nsid": 1
                }
            ],
            "nqn": "nqn.2018-09.io.spdk:cnode1"
        })

        nvmet_cfg["ports"].append({
            "addr": {
                "adrfam": "ipv4",
                "traddr": address,
                "trsvcid": "4420",
                "trtype": "%s" % self.transport,
            },
            "portid": 1,
            "referrals": [],
            "subsystems": ["nqn.2018-09.io.spdk:cnode1"]
        })
        with open("kernel.conf", 'w') as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

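    # NOTE: the JSON written above is in the format consumed by "nvmetcli restore"
    # (see tgt_start() below); the /dev/nullb0 namespace assumes the null_blk
    # kernel module is loaded on the target.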
    def kernel_tgt_gen_subsystem_conf(self, nvme_list, address_list):

        nvmet_cfg = {
            "ports": [],
            "hosts": [],
            "subsystems": [],
        }

        # Split disks between NIC IPs
        disks_per_ip = int(len(nvme_list) / len(address_list))
        disk_chunks = [nvme_list[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(address_list))]

        subsys_no = 1
        port_no = 0
        for ip, chunk in zip(address_list, disk_chunks):
            for disk in chunk:
                nvmet_cfg["subsystems"].append({
                    "allowed_hosts": [],
                    "attr": {
                        "allow_any_host": "1",
                        "serial": "SPDK00%s" % subsys_no,
                        "version": "1.3"
                    },
                    "namespaces": [
                        {
                            "device": {
                                "path": disk,
                                "uuid": "%s" % uuid.uuid4()
                            },
                            "enable": 1,
                            "nsid": subsys_no
                        }
                    ],
                    "nqn": "nqn.2018-09.io.spdk:cnode%s" % subsys_no
                })

                nvmet_cfg["ports"].append({
                    "addr": {
                        "adrfam": "ipv4",
                        "traddr": ip,
                        "trsvcid": "%s" % (4420 + port_no),
                        "trtype": "%s" % self.transport
                    },
                    "portid": subsys_no,
                    "referrals": [],
                    "subsystems": ["nqn.2018-09.io.spdk:cnode%s" % subsys_no]
                })
                subsys_no += 1
                port_no += 1

        with open("kernel.conf", "w") as fh:
            fh.write(json.dumps(nvmet_cfg, indent=2))

    def tgt_start(self):
        self.log_print("Configuring kernel NVMeOF Target")

        if self.null_block:
            print("Configuring with null block device.")
            if len(self.nic_ips) > 1:
                print("Testing with null block is limited to a single RDMA NIC.")
                print("Please specify only 1 IP address.")
                sys.exit(1)
            self.subsys_no = 1
            self.kernel_tgt_gen_nullblock_conf(self.nic_ips[0])
        else:
            print("Configuring with NVMe drives.")
            nvme_list = get_nvme_devices()
            self.kernel_tgt_gen_subsystem_conf(nvme_list, self.nic_ips)
            self.subsys_no = len(nvme_list)

        nvmet_command(self.nvmet_bin, "clear")
        nvmet_command(self.nvmet_bin, "restore kernel.conf")
        self.log_print("Done configuring kernel NVMeOF Target")


class SPDKTarget(Target):

    def __init__(self, name, username, password, mode, nic_ips, transport="rdma",
                 use_null_block=False, sar_settings=None, pcm_settings=None,
                 bandwidth_settings=None, num_shared_buffers=4096, num_cores=1, **kwargs):

        super(SPDKTarget, self).__init__(name, username, password, mode, nic_ips, transport,
                                         use_null_block, sar_settings, pcm_settings, bandwidth_settings)
        self.num_cores = num_cores
        self.num_shared_buffers = num_shared_buffers

    def spdk_tgt_configure(self):
        self.log_print("Configuring SPDK NVMeOF target via RPC")
        numa_list = get_used_numa_nodes()

        # Create the transport layer (type given by self.transport, e.g. RDMA)
        rpc.nvmf.nvmf_create_transport(self.client, trtype=self.transport, num_shared_buffers=self.num_shared_buffers)
        self.log_print("SPDK NVMeOF transport layer:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_transports(self.client))

        if self.null_block:
            self.spdk_tgt_add_nullblock()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips, req_num_disks=1)
        else:
            self.spdk_tgt_add_nvme_conf()
            self.spdk_tgt_add_subsystem_conf(self.nic_ips)
        self.log_print("Done configuring SPDK NVMeOF Target")

    def spdk_tgt_add_nullblock(self):
        self.log_print("Adding null block bdev to config via RPC")
        rpc.bdev.bdev_null_create(self.client, 102400, 4096, "Nvme0n1")
        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_nvme_conf(self, req_num_disks=None):
        self.log_print("Adding NVMe bdevs to config via RPC")

        bdfs = get_nvme_devices_bdf()
        bdfs = [b.replace(":", ".") for b in bdfs]

        if req_num_disks:
            if req_num_disks > len(bdfs):
                self.log_print("ERROR: Requested more disks than the %s available" % len(bdfs))
                sys.exit(1)
            else:
                bdfs = bdfs[0:req_num_disks]

        for i, bdf in enumerate(bdfs):
            rpc.bdev.bdev_nvme_attach_controller(self.client, name="Nvme%s" % i, trtype="PCIe", traddr=bdf)

        self.log_print("SPDK Bdevs configuration:")
        rpc.client.print_dict(rpc.bdev.bdev_get_bdevs(self.client))

    def spdk_tgt_add_subsystem_conf(self, ips=None, req_num_disks=None):
        self.log_print("Adding subsystems to config")
        if not req_num_disks:
            req_num_disks = get_nvme_devices_count()

        # Distribute bdevs between provided NICs
        num_disks = range(0, req_num_disks)
        if len(num_disks) == 1:
            disks_per_ip = 1
        else:
            disks_per_ip = int(len(num_disks) / len(ips))
        disk_chunks = [num_disks[i * disks_per_ip:(i + 1) * disks_per_ip] for i in range(0, len(ips))]

        # Create subsystems, add bdevs to namespaces, add listeners
        for ip, chunk in zip(ips, disk_chunks):
            for c in chunk:
                nqn = "nqn.2018-09.io.spdk:cnode%s" % c
                serial = "SPDK00%s" % c
                bdev_name = "Nvme%sn1" % c
                rpc.nvmf.nvmf_create_subsystem(self.client, nqn, serial,
                                               allow_any_host=True, max_namespaces=8)
                rpc.nvmf.nvmf_subsystem_add_ns(self.client, nqn, bdev_name)

                rpc.nvmf.nvmf_subsystem_add_listener(self.client, nqn,
                                                     trtype=self.transport,
                                                     traddr=ip,
                                                     trsvcid="4420",
                                                     adrfam="ipv4")

        self.log_print("SPDK NVMeOF subsystem configuration:")
        rpc.client.print_dict(rpc.nvmf.nvmf_get_subsystems(self.client))

    def tgt_start(self):
        if self.null_block:
            self.subsys_no = 1
        else:
            self.subsys_no = get_nvme_devices_count()
        self.log_print("Starting SPDK NVMeOF Target process")
        nvmf_app_path = os.path.join(self.spdk_dir, "build/bin/nvmf_tgt")
        # num_cores may come from the JSON config as an int; join() needs strings
        command = " ".join([nvmf_app_path, "-m", str(self.num_cores)])
        proc = subprocess.Popen(command, shell=True)
        self.pid = os.path.join(self.spdk_dir, "nvmf.pid")

        with open(self.pid, "w") as fh:
            fh.write(str(proc.pid))
        self.nvmf_proc = proc
        self.log_print("SPDK NVMeOF Target PID=%s" % proc.pid)
        self.log_print("Waiting for spdk to initialize...")
        while True:
            if os.path.exists("/var/tmp/spdk.sock"):
                break
            time.sleep(1)
        self.client = rpc.client.JSONRPCClient("/var/tmp/spdk.sock")

        self.spdk_tgt_configure()

    def __del__(self):
        if hasattr(self, "nvmf_proc"):
            try:
                self.nvmf_proc.terminate()
                self.nvmf_proc.wait()
            except Exception as e:
                self.log_print(e)
                self.nvmf_proc.kill()
                self.nvmf_proc.communicate()


class KernelInitiator(Initiator):
    def __init__(self, name, username, password, mode, nic_ips, ip, transport,
                 cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):

        super(KernelInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                              cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                              cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        # Use .get() so a config without "extra_params" does not raise KeyError
        self.extra_params = kwargs.get("extra_params", "")

    def __del__(self):
        self.ssh_connection.close()

    def kernel_init_connect(self, address_list, subsys_no):
        subsystems = self.discover_subsystems(address_list, subsys_no)
        self.log_print("The connection attempts below may produce error messages; this is expected!")
        for subsystem in subsystems:
            self.log_print("Trying to connect %s %s %s" % subsystem)
            self.remote_call("sudo %s connect -t %s -s %s -n %s -a %s %s" % (self.nvmecli_bin,
                                                                             self.transport,
                                                                             *subsystem,
                                                                             self.extra_params))
            time.sleep(2)

    def kernel_init_disconnect(self, address_list, subsys_no):
        subsystems = self.discover_subsystems(address_list, subsys_no)
        for subsystem in subsystems:
            self.remote_call("sudo %s disconnect -n %s" % (self.nvmecli_bin, subsystem[1]))
            time.sleep(1)

    def gen_fio_filename_conf(self, threads, io_depth, num_jobs=1):
        out, err = self.remote_call("sudo nvme list | grep -E 'SPDK|Linux' | awk '{print $1}'")
        nvme_list = [x for x in out.split("\n") if "nvme" in x]

        filename_section = ""
        num_jobs = num_jobs or 1  # guard against num_jobs=None passed down from the config
        nvme_per_split = int(len(nvme_list) / len(threads))
        remainder = len(nvme_list) % len(threads)
        iterator = iter(nvme_list)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section
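
    # Example: with 5 devices from "nvme list" and 2 threads, the split above yields
    #   [filename0] -> /dev/nvme0n1, /dev/nvme1n1, /dev/nvme2n1  (extra remainder device)
    #   [filename1] -> /dev/nvme3n1, /dev/nvme4n1
    # and each section gets iodepth = round(io_depth * <devices in section> / num_jobs),
    # floored at 1.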


class SPDKInitiator(Initiator):
    def __init__(self, name, username, password, mode, nic_ips, ip, transport="rdma",
                 num_cores=1, cpus_allowed=None, cpus_allowed_policy="shared",
                 cpu_frequency=None, fio_bin="/usr/src/fio/fio", **kwargs):
        super(SPDKInitiator, self).__init__(name, username, password, mode, nic_ips, ip, transport,
                                            cpus_allowed=cpus_allowed, cpus_allowed_policy=cpus_allowed_policy,
                                            cpu_frequency=cpu_frequency, fio_bin=fio_bin)

        self.num_cores = num_cores

    def install_spdk(self, local_spdk_zip):
        self.put_file(local_spdk_zip, "/tmp/spdk_drop.zip")
        self.log_print("Copied sources zip from target")
        self.remote_call("unzip -qo /tmp/spdk_drop.zip -d %s" % self.spdk_dir)

        self.log_print("Sources unpacked")
        self.log_print("Using fio binary %s" % self.fio_bin)
        self.remote_call("cd %s; git submodule update --init; make clean; ./configure --with-rdma --with-fio=%s;"
                         "make -j$(($(nproc)*2))" % (self.spdk_dir, os.path.dirname(self.fio_bin)))

        self.log_print("SPDK built")
        self.remote_call("sudo %s/scripts/setup.sh" % self.spdk_dir)

    def gen_spdk_bdev_conf(self, remote_subsystem_list):
        header = "[Nvme]"
        row_template = """  TransportId "trtype:{transport} adrfam:IPv4 traddr:{ip} trsvcid:{svc} subnqn:{nqn}" Nvme{i}"""

        bdev_rows = [row_template.format(transport=self.transport,
                                         svc=x[0],
                                         nqn=x[1],
                                         ip=x[2],
                                         i=i) for i, x in enumerate(remote_subsystem_list)]
        bdev_rows = "\n".join(bdev_rows)
        bdev_section = "\n".join([header, bdev_rows])
        return bdev_section
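
    # Example generated section (one TransportId line per discovered remote
    # subsystem; address and NQN are illustrative):
    #   [Nvme]
    #     TransportId "trtype:rdma adrfam:IPv4 traddr:192.168.100.8 trsvcid:4420 subnqn:nqn.2018-09.io.spdk:cnode1" Nvme0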

    def gen_fio_filename_conf(self, subsystems, threads, io_depth, num_jobs=1):
        filename_section = ""
        num_jobs = num_jobs or 1  # guard against num_jobs=None passed down from the config
        if len(threads) >= len(subsystems):
            threads = range(0, len(subsystems))
        filenames = ["Nvme%sn1" % x for x in range(0, len(subsystems))]
        nvme_per_split = int(len(subsystems) / len(threads))
        remainder = len(subsystems) % len(threads)
        iterator = iter(filenames)
        result = []
        for i in range(len(threads)):
            result.append([])
            for j in range(nvme_per_split):
                result[i].append(next(iterator))
            if remainder:
                result[i].append(next(iterator))
                remainder -= 1
        for i, r in enumerate(result):
            header = "[filename%s]" % i
            disks = "\n".join(["filename=%s" % x for x in r])
            job_section_qd = round((io_depth * len(r)) / num_jobs)
            if job_section_qd == 0:
                job_section_qd = 1
            iodepth = "iodepth=%s" % job_section_qd
            filename_section = "\n".join([filename_section, header, disks, iodepth])

        return filename_section


if __name__ == "__main__":
    spdk_zip_path = "/tmp/spdk.zip"
    target_results_dir = "/tmp/results"

    if len(sys.argv) > 1:
        config_file_path = sys.argv[1]
    else:
        script_full_dir = os.path.dirname(os.path.realpath(__file__))
        config_file_path = os.path.join(script_full_dir, "config.json")

    print("Using config file: %s" % config_file_path)
    with open(config_file_path, "r") as config:
        data = json.load(config)
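
    # Example config.json layout (illustrative only; keys inferred from the parsing
    # below -- "general" entries are passed to every object and section names must
    # contain "target", "initiator" or "fio"):
    # {
    #     "general": {"username": "root", "password": "...", "transport": "rdma"},
    #     "target": {"mode": "spdk", "nic_ips": ["192.168.100.8"], "num_cores": 1},
    #     "initiator1": {"mode": "kernel", "nic_ips": ["192.168.100.8"], "ip": "10.0.0.1"},
    #     "fio": {"bs": ["4k"], "qd": [128], "rw": ["randrw"], "rwmixread": 70,
    #             "run_time": 300, "ramp_time": 30, "run_num": 3, "num_jobs": 2}
    # }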

    initiators = []
    fio_cases = []

    for k, v in data.items():
        if "target" in k:
            if data[k]["mode"] == "spdk":
                target_obj = SPDKTarget(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                target_obj = KernelTarget(name=k, **data["general"], **v)
        elif "initiator" in k:
            if data[k]["mode"] == "spdk":
                init_obj = SPDKInitiator(name=k, **data["general"], **v)
            elif data[k]["mode"] == "kernel":
                init_obj = KernelInitiator(name=k, **data["general"], **v)
            initiators.append(init_obj)
        elif "fio" in k:
            fio_workloads = itertools.product(data[k]["bs"],
                                              data[k]["qd"],
                                              data[k]["rw"])

            fio_run_time = data[k]["run_time"]
            fio_ramp_time = data[k]["ramp_time"]
            fio_rw_mix_read = data[k]["rwmixread"]
            fio_run_num = data[k]["run_num"] if "run_num" in data[k].keys() else None
            fio_num_jobs = data[k]["num_jobs"] if "num_jobs" in data[k].keys() else None
        else:
            continue

    # Copy and install SPDK on remote initiators
    if "skip_spdk_install" not in data["general"]:
        target_obj.zip_spdk_sources(target_obj.spdk_dir, spdk_zip_path)
        threads = []
        for i in initiators:
            if i.mode == "spdk":
                t = threading.Thread(target=i.install_spdk, args=(spdk_zip_path,))
                threads.append(t)
                t.start()
        for t in threads:
            t.join()

    target_obj.tgt_start()

    # Poor man's threading
    # Run FIO tests
    for block_size, io_depth, rw in fio_workloads:
        threads = []
        configs = []
        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_connect(i.nic_ips, target_obj.subsys_no)

            cfg = i.gen_fio_config(rw, fio_rw_mix_read, block_size, io_depth, target_obj.subsys_no,
                                   fio_num_jobs, fio_ramp_time, fio_run_time)
            configs.append(cfg)

        for i, cfg in zip(initiators, configs):
            t = threading.Thread(target=i.run_fio, args=(cfg, fio_run_num))
            threads.append(t)
        if target_obj.enable_sar:
            sar_file_name = "_".join([str(block_size), str(rw), str(io_depth), "sar"])
            sar_file_name = ".".join([sar_file_name, "txt"])
            t = threading.Thread(target=target_obj.measure_sar, args=(target_results_dir, sar_file_name))
            threads.append(t)

        if target_obj.enable_pcm:
            pcm_file_name = "_".join(["pcm_cpu", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_pcm_memory:
            pcm_file_name = "_".join(["pcm_memory", str(block_size), str(rw), str(io_depth)])
            pcm_file_name = ".".join([pcm_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_pcm_memory, args=(target_results_dir, pcm_file_name,))
            threads.append(t)

        if target_obj.enable_bandwidth:
            bandwidth_file_name = "_".join(["bandwidth", str(block_size), str(rw), str(io_depth)])
            bandwidth_file_name = ".".join([bandwidth_file_name, "csv"])
            t = threading.Thread(target=target_obj.measure_bandwidth, args=(target_results_dir, bandwidth_file_name,))
            threads.append(t)

        for t in threads:
            t.start()
        for t in threads:
            t.join()

        for i in initiators:
            if i.mode == "kernel":
                i.kernel_init_disconnect(i.nic_ips, target_obj.subsys_no)
            i.copy_result_files(target_results_dir)

    target_obj.parse_results(target_results_dir)