]> git.proxmox.com Git - mirror_qemu.git/blob - tests/migration/guestperf/engine.py
python/qemu: split QEMUMachine out from underneath __init__.py
[mirror_qemu.git] / tests / migration / guestperf / engine.py
1 from __future__ import print_function
2 #
3 # Migration test main engine
4 #
5 # Copyright (c) 2016 Red Hat, Inc.
6 #
7 # This library is free software; you can redistribute it and/or
8 # modify it under the terms of the GNU Lesser General Public
9 # License as published by the Free Software Foundation; either
10 # version 2 of the License, or (at your option) any later version.
11 #
12 # This library is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 # Lesser General Public License for more details.
16 #
17 # You should have received a copy of the GNU Lesser General Public
18 # License along with this library; if not, see <http://www.gnu.org/licenses/>.
19 #
20
21
22 import os
23 import re
24 import sys
25 import time
26
27 from guestperf.progress import Progress, ProgressStats
28 from guestperf.report import Report
29 from guestperf.timings import TimingRecord, Timings
30
31 sys.path.append(os.path.join(os.path.dirname(__file__),
32 '..', '..', '..', 'python'))
33 from qemu.machine import QEMUMachine
34
35
class Engine(object):
    """Drive one guest migration performance run between two QEMU instances.

    The engine launches a source and a destination QEMU, lets the guest
    workload run for a while, configures and starts the migration over
    QMP, polls its progress, and finally bundles everything that was
    measured into a Report.
    """

    def __init__(self, binary, dst_host, kernel, initrd, transport="tcp",
                 sleep=15, verbose=False, debug=False):
        """Store the test configuration.

        :param binary: path to the QEMU binary
        :param dst_host: hostname of the migration target host
        :param kernel: path to the kernel image booted by the guest
        :param initrd: path to the stress initrd
        :param transport: 'unix', 'tcp' or 'rdma'
        :param sleep: seconds budgeted for the workload warm-up/cool-down
        :param verbose: print progress messages
        :param debug: print debugging output; also turns on verbose
        """
        self._binary = binary  # Path to QEMU binary
        self._dst_host = dst_host  # Hostname of target host
        self._kernel = kernel  # Path to kernel image
        self._initrd = initrd  # Path to stress initrd
        self._transport = transport  # 'unix' or 'tcp' or 'rdma'
        self._sleep = sleep
        self._verbose = verbose
        self._debug = debug

        # Debug output is a superset of verbose output.
        if debug:
            self._verbose = debug

    @staticmethod
    def _parse_stat_cpu_ticks(stat):
        """Return utime + stime, in clock ticks, from a /proc stat line.

        The second field (the command name) is wrapped in parentheses and
        may itself contain spaces, so a plain split of the whole line can
        shift every later field.  Everything after the *last* ')' is
        reliably whitespace separated, so parsing starts from there.
        """
        rest = stat.rpartition(")")[2].split()
        # rest[0] is field 3 (state); utime/stime are fields 14/15.
        utime = int(rest[11])
        stime = int(rest[12])
        return utime + stime

    @classmethod
    def _read_cpu_ms(cls, statfile, jiffies_per_sec):
        """Return the CPU time (ms) recorded in the given /proc stat file."""
        with open(statfile, "r") as fh:
            stat = fh.readline()
        return 1000 * cls._parse_stat_cpu_ticks(stat) / jiffies_per_sec

    def _vcpu_timing(self, pid, tid_list):
        """Sample the CPU time consumed by each listed thread of a process.

        :param pid: process ID owning the threads
        :param tid_list: thread IDs to sample
        :returns: list with one TimingRecord(tid, timestamp, cpu_ms) per
                  thread, all sharing the same timestamp
        """
        now = time.time()
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])

        records = []
        for tid in tid_list:
            statfile = "/proc/%d/task/%d/stat" % (pid, tid)
            records.append(TimingRecord(
                tid, now, self._read_cpu_ms(statfile, jiffies_per_sec)))
        return records

    def _cpu_timing(self, pid):
        """Sample the total CPU time consumed by a process.

        :param pid: process ID to sample
        :returns: TimingRecord(pid, timestamp, cpu_ms)
        """
        now = time.time()
        jiffies_per_sec = os.sysconf(os.sysconf_names['SC_CLK_TCK'])
        statfile = "/proc/%d/stat" % pid
        return TimingRecord(
            pid, now, self._read_cpu_ms(statfile, jiffies_per_sec))

    def _migrate_progress(self, vm):
        """Query the VM's migration status and wrap it in a Progress.

        Every value missing from the 'query-migrate' reply defaults to 0
        (the status defaults to "active").
        """
        info = vm.command("query-migrate")
        ram = info.get("ram", {})

        return Progress(
            info.get("status", "active"),
            ProgressStats(
                ram.get("transferred", 0),
                ram.get("remaining", 0),
                ram.get("total", 0),
                ram.get("duplicate", 0),
                ram.get("skipped", 0),
                ram.get("normal", 0),
                ram.get("normal-bytes", 0),
                ram.get("dirty-pages-rate", 0),
                ram.get("mbps", 0),
                ram.get("dirty-sync-count", 0)
            ),
            time.time(),
            info.get("total-time", 0),
            info.get("downtime", 0),
            info.get("expected-downtime", 0),
            info.get("setup-time", 0),
            info.get("x-cpu-throttle-percentage", 0),
        )

    @staticmethod
    def _enable_capability(vm, capability):
        """Switch on a single migration capability on the given VM."""
        vm.command("migrate-set-capabilities",
                   capabilities=[{"capability": capability,
                                  "state": True}])

    def _configure_migration(self, hardware, scenario, src, dst):
        """Apply the scenario's migration tunables to both QEMU instances."""
        if scenario._auto_converge:
            self._enable_capability(src, "auto-converge")
            src.command("migrate-set-parameters",
                        x_cpu_throttle_increment=scenario._auto_converge_step)

        if scenario._post_copy:
            self._enable_capability(src, "postcopy-ram")
            self._enable_capability(dst, "postcopy-ram")

        src.command("migrate_set_speed",
                    value=scenario._bandwidth * 1024 * 1024)

        # NOTE(review): if scenario._downtime is in milliseconds and the
        # command expects seconds, the divisor should presumably be 1000.0;
        # left unchanged until the units are confirmed.
        src.command("migrate_set_downtime",
                    value=scenario._downtime / 1024.0)

        if scenario._compression_mt:
            self._enable_capability(src, "compress")
            src.command("migrate-set-parameters",
                        compress_threads=scenario._compression_mt_threads)
            self._enable_capability(dst, "compress")
            dst.command("migrate-set-parameters",
                        decompress_threads=scenario._compression_mt_threads)

        if scenario._compression_xbzrle:
            self._enable_capability(src, "xbzrle")
            self._enable_capability(dst, "xbzrle")
            # The cache size is a byte count: use integer arithmetic so a
            # float is never sent under Python 3 (same value as Python 2).
            src.command("migrate-set-cache-size",
                        value=int(hardware._mem * 1024 * 1024 * 1024 // 100 *
                                  scenario._compression_xbzrle_cache))

    def _migrate(self, hardware, scenario, src, dst, connect_uri):
        """Run the migration and collect progress and CPU usage samples.

        :param hardware: hardware description (guest memory size is used)
        :param scenario: migration tunables and limits
        :param src: running source VM
        :param dst: running destination VM waiting for the migration
        :param connect_uri: migration URI handed to the source
        :returns: [progress_history, src_qemu_time, src_vcpu_time]
        """
        src_qemu_time = []
        src_vcpu_time = []
        src_pid = src.get_pid()

        src_threads = [vcpu["thread_id"]
                       for vcpu in src.command("query-cpus")]

        # XXX how to get dst timings on remote host ?

        if self._verbose:
            print("Sleeping %d seconds for initial guest workload run" % self._sleep)
        sleep_secs = self._sleep
        while sleep_secs > 1:
            src_qemu_time.append(self._cpu_timing(src_pid))
            src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
            time.sleep(1)
            sleep_secs -= 1

        if self._verbose:
            print("Starting migration")
        self._configure_migration(hardware, scenario, src, dst)
        src.command("migrate", uri=connect_uri)

        post_copy = False
        paused = False

        progress_history = []

        start = time.time()
        loop = 0
        while True:
            loop = loop + 1
            time.sleep(0.05)

            progress = self._migrate_progress(src)
            # 20 polls of 50ms each: sample CPU usage about once a second.
            if (loop % 20) == 0:
                src_qemu_time.append(self._cpu_timing(src_pid))
                src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))

            # Keep one progress record per pass over guest RAM.
            if (len(progress_history) == 0 or
                (progress_history[-1]._ram._iterations <
                 progress._ram._iterations)):
                progress_history.append(progress)

            if progress._status in ("completed", "failed", "cancelled"):
                if progress._status == "completed" and paused:
                    # The guest was stopped on the source; resume it on the
                    # destination now that it has been migrated.
                    dst.command("cont")
                if progress_history[-1] != progress:
                    progress_history.append(progress)

                if progress._status == "completed":
                    if self._verbose:
                        print("Sleeping %d seconds for final guest workload run" % self._sleep)
                    sleep_secs = self._sleep
                    while sleep_secs > 1:
                        time.sleep(1)
                        src_qemu_time.append(self._cpu_timing(src_pid))
                        src_vcpu_time.extend(self._vcpu_timing(src_pid, src_threads))
                        sleep_secs -= 1

                return [progress_history, src_qemu_time, src_vcpu_time]

            if self._verbose and (loop % 20) == 0:
                print("Iter %d: remain %5dMB of %5dMB (total %5dMB @ %5dMb/sec)" % (
                    progress._ram._iterations,
                    progress._ram._remaining_bytes / (1024 * 1024),
                    progress._ram._total_bytes / (1024 * 1024),
                    progress._ram._transferred_bytes / (1024 * 1024),
                    progress._ram._transfer_rate_mbs,
                ))

            # Give up once either limit is exceeded; the next poll is
            # expected to observe a terminal status and leave the loop.
            if progress._ram._iterations > scenario._max_iters:
                if self._verbose:
                    print("No completion after %d iterations over RAM" % scenario._max_iters)
                src.command("migrate_cancel")
                continue

            if time.time() > (start + scenario._max_time):
                if self._verbose:
                    print("No completion after %d seconds" % scenario._max_time)
                src.command("migrate_cancel")
                continue

            if (scenario._post_copy and
                progress._ram._iterations >= scenario._post_copy_iters and
                not post_copy):
                if self._verbose:
                    print("Switching to post-copy after %d iterations" % scenario._post_copy_iters)
                src.command("migrate-start-postcopy")
                post_copy = True

            if (scenario._pause and
                progress._ram._iterations >= scenario._pause_iters and
                not paused):
                if self._verbose:
                    print("Pausing VM after %d iterations" % scenario._pause_iters)
                src.command("stop")
                paused = True

    def _get_common_args(self, hardware, tunnelled=False):
        """Build the QEMU arguments shared by source and destination.

        :param hardware: hardware description (memory, CPUs, page options)
        :param tunnelled: wrap the kernel command line in single quotes so
                          that it survives being passed through a shell
        :returns: list of command line arguments
        """
        args = [
            "noapic",
            "edd=off",
            "printk.time=1",
            "noreplace-smp",
            "cgroup_disable=memory",
            "pci=noearly",
            "console=ttyS0",
        ]
        if self._debug:
            args.append("debug")
        else:
            args.append("quiet")

        args.append("ramsize=%s" % hardware._mem)

        cmdline = " ".join(args)
        if tunnelled:
            cmdline = "'" + cmdline + "'"

        argv = [
            "-machine", "accel=kvm",
            "-cpu", "host",
            "-kernel", self._kernel,
            "-initrd", self._initrd,
            "-append", cmdline,
            "-chardev", "stdio,id=cdev0",
            "-device", "isa-serial,chardev=cdev0",
            # Guest RAM in MB plus 512 MB of headroom.
            "-m", str((hardware._mem * 1024) + 512),
            "-smp", str(hardware._cpus),
        ]

        if self._debug:
            argv.extend(["-device", "sga"])

        if hardware._prealloc_pages:
            argv += ["-mem-path", "/dev/shm",
                     "-mem-prealloc"]
        if hardware._locked_pages:
            # NOTE(review): newer QEMU releases may need
            # "-overcommit mem-lock=on" instead - confirm target version.
            argv += ["-realtime", "mlock=on"]
        if hardware._huge_pages:
            # Huge pages are not implemented; the setting is ignored.
            pass

        return argv

    def _get_src_args(self, hardware):
        """Return the QEMU arguments for the source instance."""
        return self._get_common_args(hardware)

    def _get_dst_args(self, hardware, uri):
        """Return the QEMU arguments for the destination instance.

        The common arguments are extended with '-incoming <uri>'.  For any
        destination other than "localhost" the kernel command line is
        quoted, since the command is launched through ssh.
        """
        tunnelled = self._dst_host != "localhost"
        argv = self._get_common_args(hardware, tunnelled)
        return argv + ["-incoming", uri]

    @staticmethod
    def _get_common_wrapper(cpu_bind, mem_bind):
        """Return the numactl command prefix for the requested bindings.

        :param cpu_bind: physical CPU IDs (strings) to bind to, may be empty
        :param mem_bind: NUMA node IDs (strings) to bind to, may be empty
        :returns: argument list, empty when no binding is requested
        """
        wrapper = []
        if cpu_bind or mem_bind:
            wrapper.append("numactl")
            if cpu_bind:
                wrapper.append("--physcpubind=%s" % ",".join(cpu_bind))
            if mem_bind:
                wrapper.append("--membind=%s" % ",".join(mem_bind))

        return wrapper

    def _get_src_wrapper(self, hardware):
        """Return the command prefix used to launch the source QEMU."""
        return self._get_common_wrapper(hardware._src_cpu_bind, hardware._src_mem_bind)

    def _get_dst_wrapper(self, hardware):
        """Return the command prefix used to launch the destination QEMU.

        For a non-local destination the command is run through ssh, with
        port 9001 reverse-forwarded back to this host.
        """
        wrapper = self._get_common_wrapper(hardware._dst_cpu_bind, hardware._dst_mem_bind)
        if self._dst_host != "localhost":
            return ["ssh",
                    "-R", "9001:localhost:9001",
                    self._dst_host] + wrapper
        return wrapper

    def _get_timings(self, vm):
        """Extract the guest workload timing records from a VM's log.

        :param vm: VM whose log should be parsed
        :returns: list of TimingRecord built from the matching log lines,
                  empty when there is no log
        """
        log = vm.get_log()
        if not log:
            return []
        if self._debug:
            print(log)

        regex = r"[^\s]+\s\((\d+)\):\sINFO:\s(\d+)ms\scopied\s\d+\sGB\sin\s(\d+)ms"
        matcher = re.compile(regex)
        records = []
        for line in log.split("\n"):
            match = matcher.match(line)
            if match:
                records.append(TimingRecord(int(match.group(1)),
                                            int(match.group(2)) / 1000.0,
                                            int(match.group(3))))
        return records

    def run(self, hardware, scenario, result_dir=None):
        """Launch both QEMU instances, migrate the guest and build a Report.

        :param hardware: hardware description for the guest
        :param scenario: migration scenario to execute
        :param result_dir: accepted for interface compatibility; not used
        :returns: Report describing the run
        :raises ValueError: the configured transport is not supported
        :raises Exception: 'unix' transport requested for a non-local host;
                           any failure during the run is re-raised after
                           both VMs have been shut down
        """
        if self._transport == "tcp":
            uri = "tcp:%s:9000" % self._dst_host
        elif self._transport == "rdma":
            uri = "rdma:%s:9000" % self._dst_host
        elif self._transport == "unix":
            if self._dst_host != "localhost":
                raise Exception("Running use unix migration transport for non-local host")
            uri = "unix:/var/tmp/qemu-migrate-%d.migrate" % os.getpid()
            # Best effort removal of a stale socket from an earlier run.
            try:
                os.remove(uri[5:])
            except OSError:
                pass
        else:
            raise ValueError("Unsupported migration transport '%s'" %
                             self._transport)

        if self._dst_host != "localhost":
            # Matches the port forwarded by the ssh wrapper.
            dstmonaddr = ("localhost", 9001)
        else:
            dstmonaddr = "/var/tmp/qemu-dst-%d-monitor.sock" % os.getpid()
        srcmonaddr = "/var/tmp/qemu-src-%d-monitor.sock" % os.getpid()

        src = QEMUMachine(self._binary,
                          args=self._get_src_args(hardware),
                          wrapper=self._get_src_wrapper(hardware),
                          name="qemu-src-%d" % os.getpid(),
                          monitor_address=srcmonaddr)

        dst = QEMUMachine(self._binary,
                          args=self._get_dst_args(hardware, uri),
                          wrapper=self._get_dst_wrapper(hardware),
                          name="qemu-dst-%d" % os.getpid(),
                          monitor_address=dstmonaddr)

        try:
            src.launch()
            dst.launch()

            ret = self._migrate(hardware, scenario, src, dst, uri)
            progress_history = ret[0]
            qemu_timings = ret[1]
            vcpu_timings = ret[2]
            if uri[0:5] == "unix:":
                os.remove(uri[5:])
            if self._verbose:
                print("Finished migration")

            src.shutdown()
            dst.shutdown()

            return Report(hardware, scenario, progress_history,
                          Timings(self._get_timings(src) + self._get_timings(dst)),
                          Timings(qemu_timings),
                          Timings(vcpu_timings),
                          self._binary, self._dst_host, self._kernel,
                          self._initrd, self._transport, self._sleep)
        except Exception as e:
            if self._debug:
                print("Failed: %s" % str(e))
            # Best effort cleanup: a failing shutdown must not hide the
            # original error, which is re-raised below.
            try:
                src.shutdown()
            except Exception:
                pass
            try:
                dst.shutdown()
            except Exception:
                pass

            if self._debug:
                print(src.get_log())
                print(dst.get_log())
            raise
439