+++ /dev/null
-# -*- coding: utf-8 -*-\r
-# This file should be kept compatible with both Python 2.6 and Python >= 3.0.\r
-\r
-from __future__ import division\r
-from __future__ import print_function\r
-\r
-"""\r
-ccbench, a Python concurrency benchmark.\r
-"""\r
-\r
-import time\r
-import os\r
-import sys\r
-import functools\r
-import itertools\r
-import threading\r
-import subprocess\r
-import socket\r
-from optparse import OptionParser, SUPPRESS_HELP\r
-import platform\r
-\r
-# Compatibility\r
-try:\r
- xrange\r
-except NameError:\r
- xrange = range\r
-\r
-try:\r
- map = itertools.imap\r
-except AttributeError:\r
- pass\r
-\r
-\r
-THROUGHPUT_DURATION = 2.0\r
-\r
-LATENCY_PING_INTERVAL = 0.1\r
-LATENCY_DURATION = 2.0\r
-\r
-BANDWIDTH_PACKET_SIZE = 1024\r
-BANDWIDTH_DURATION = 2.0\r
-\r
-\r
-def task_pidigits():\r
- """Pi calculation (Python)"""\r
- _map = map\r
- _count = itertools.count\r
- _islice = itertools.islice\r
-\r
- def calc_ndigits(n):\r
- # From http://shootout.alioth.debian.org/\r
- def gen_x():\r
- return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))\r
-\r
- def compose(a, b):\r
- aq, ar, as_, at = a\r
- bq, br, bs, bt = b\r
- return (aq * bq,\r
- aq * br + ar * bt,\r
- as_ * bq + at * bs,\r
- as_ * br + at * bt)\r
-\r
- def extract(z, j):\r
- q, r, s, t = z\r
- return (q*j + r) // (s*j + t)\r
-\r
- def pi_digits():\r
- z = (1, 0, 0, 1)\r
- x = gen_x()\r
- while 1:\r
- y = extract(z, 3)\r
- while y != extract(z, 4):\r
- z = compose(z, next(x))\r
- y = extract(z, 3)\r
- z = compose((10, -10*y, 0, 1), z)\r
- yield y\r
-\r
- return list(_islice(pi_digits(), n))\r
-\r
- return calc_ndigits, (50, )\r
-\r
-def task_regex():\r
- """regular expression (C)"""\r
- # XXX this task gives horrendous latency results.\r
- import re\r
- # Taken from the `inspect` module\r
- pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)\r
- with open(__file__, "r") as f:\r
- arg = f.read(2000)\r
-\r
- def findall(s):\r
- t = time.time()\r
- try:\r
- return pat.findall(s)\r
- finally:\r
- print(time.time() - t)\r
- return pat.findall, (arg, )\r
-\r
-def task_sort():\r
- """list sorting (C)"""\r
- def list_sort(l):\r
- l = l[::-1]\r
- l.sort()\r
-\r
- return list_sort, (list(range(1000)), )\r
-\r
-def task_compress_zlib():\r
- """zlib compression (C)"""\r
- import zlib\r
- with open(__file__, "rb") as f:\r
- arg = f.read(5000) * 3\r
-\r
- def compress(s):\r
- zlib.decompress(zlib.compress(s, 5))\r
- return compress, (arg, )\r
-\r
-def task_compress_bz2():\r
- """bz2 compression (C)"""\r
- import bz2\r
- with open(__file__, "rb") as f:\r
- arg = f.read(3000) * 2\r
-\r
- def compress(s):\r
- bz2.compress(s)\r
- return compress, (arg, )\r
-\r
-def task_hashing():\r
- """SHA1 hashing (C)"""\r
- import hashlib\r
- with open(__file__, "rb") as f:\r
- arg = f.read(5000) * 30\r
-\r
- def compute(s):\r
- hashlib.sha1(s).digest()\r
- return compute, (arg, )\r
-\r
-\r
-throughput_tasks = [task_pidigits, task_regex]\r
-for mod in 'bz2', 'hashlib':\r
- try:\r
- globals()[mod] = __import__(mod)\r
- except ImportError:\r
- globals()[mod] = None\r
-\r
-# For whatever reasons, zlib gives irregular results, so we prefer bz2 or\r
-# hashlib if available.\r
-# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)\r
-if bz2 is not None:\r
- throughput_tasks.append(task_compress_bz2)\r
-elif hashlib is not None:\r
- throughput_tasks.append(task_hashing)\r
-else:\r
- throughput_tasks.append(task_compress_zlib)\r
-\r
-latency_tasks = throughput_tasks\r
-bandwidth_tasks = [task_pidigits]\r
-\r
-\r
-class TimedLoop:\r
- def __init__(self, func, args):\r
- self.func = func\r
- self.args = args\r
-\r
- def __call__(self, start_time, min_duration, end_event, do_yield=False):\r
- step = 20\r
- niters = 0\r
- duration = 0.0\r
- _time = time.time\r
- _sleep = time.sleep\r
- _func = self.func\r
- _args = self.args\r
- t1 = start_time\r
- while True:\r
- for i in range(step):\r
- _func(*_args)\r
- t2 = _time()\r
- # If another thread terminated, the current measurement is invalid\r
- # => return the previous one.\r
- if end_event:\r
- return niters, duration\r
- niters += step\r
- duration = t2 - start_time\r
- if duration >= min_duration:\r
- end_event.append(None)\r
- return niters, duration\r
- if t2 - t1 < 0.01:\r
- # Minimize interference of measurement on overall runtime\r
- step = step * 3 // 2\r
- elif do_yield:\r
- # OS scheduling of Python threads is sometimes so bad that we\r
- # have to force thread switching ourselves, otherwise we get\r
- # completely useless results.\r
- _sleep(0.0001)\r
- t1 = t2\r
-\r
-\r
-def run_throughput_test(func, args, nthreads):\r
- assert nthreads >= 1\r
-\r
- # Warm up\r
- func(*args)\r
-\r
- results = []\r
- loop = TimedLoop(func, args)\r
- end_event = []\r
-\r
- if nthreads == 1:\r
- # Pure single-threaded performance, without any switching or\r
- # synchronization overhead.\r
- start_time = time.time()\r
- results.append(loop(start_time, THROUGHPUT_DURATION,\r
- end_event, do_yield=False))\r
- return results\r
-\r
- started = False\r
- ready_cond = threading.Condition()\r
- start_cond = threading.Condition()\r
- ready = []\r
-\r
- def run():\r
- with ready_cond:\r
- ready.append(None)\r
- ready_cond.notify()\r
- with start_cond:\r
- while not started:\r
- start_cond.wait()\r
- results.append(loop(start_time, THROUGHPUT_DURATION,\r
- end_event, do_yield=True))\r
-\r
- threads = []\r
- for i in range(nthreads):\r
- threads.append(threading.Thread(target=run))\r
- for t in threads:\r
- t.setDaemon(True)\r
- t.start()\r
- # We don't want measurements to include thread startup overhead,\r
- # so we arrange for timing to start after all threads are ready.\r
- with ready_cond:\r
- while len(ready) < nthreads:\r
- ready_cond.wait()\r
- with start_cond:\r
- start_time = time.time()\r
- started = True\r
- start_cond.notify(nthreads)\r
- for t in threads:\r
- t.join()\r
-\r
- return results\r
-\r
-def run_throughput_tests(max_threads):\r
- for task in throughput_tasks:\r
- print(task.__doc__)\r
- print()\r
- func, args = task()\r
- nthreads = 1\r
- baseline_speed = None\r
- while nthreads <= max_threads:\r
- results = run_throughput_test(func, args, nthreads)\r
- # Taking the max duration rather than average gives pessimistic\r
- # results rather than optimistic.\r
- speed = sum(r[0] for r in results) / max(r[1] for r in results)\r
- print("threads=%d: %d" % (nthreads, speed), end="")\r
- if baseline_speed is None:\r
- print(" iterations/s.")\r
- baseline_speed = speed\r
- else:\r
- print(" ( %d %%)" % (speed / baseline_speed * 100))\r
- nthreads += 1\r
- print()\r
-\r
-\r
-LAT_END = "END"\r
-\r
-def _sendto(sock, s, addr):\r
- sock.sendto(s.encode('ascii'), addr)\r
-\r
-def _recv(sock, n):\r
- return sock.recv(n).decode('ascii')\r
-\r
-def latency_client(addr, nb_pings, interval):\r
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
- _time = time.time\r
- _sleep = time.sleep\r
- def _ping():\r
- _sendto(sock, "%r\n" % _time(), addr)\r
- # The first ping signals the parent process that we are ready.\r
- _ping()\r
- # We give the parent a bit of time to notice.\r
- _sleep(1.0)\r
- for i in range(nb_pings):\r
- _sleep(interval)\r
- _ping()\r
- _sendto(sock, LAT_END + "\n", addr)\r
-\r
-def run_latency_client(**kwargs):\r
- cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]\r
- cmd_line.extend(['--latclient', repr(kwargs)])\r
- return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,\r
- #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)\r
-\r
-def run_latency_test(func, args, nthreads):\r
- # Create a listening socket to receive the pings. We use UDP which should\r
- # be painlessly cross-platform.\r
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
- sock.bind(("127.0.0.1", 0))\r
- addr = sock.getsockname()\r
-\r
- interval = LATENCY_PING_INTERVAL\r
- duration = LATENCY_DURATION\r
- nb_pings = int(duration / interval)\r
-\r
- results = []\r
- threads = []\r
- end_event = []\r
- start_cond = threading.Condition()\r
- started = False\r
- if nthreads > 0:\r
- # Warm up\r
- func(*args)\r
-\r
- results = []\r
- loop = TimedLoop(func, args)\r
- ready = []\r
- ready_cond = threading.Condition()\r
-\r
- def run():\r
- with ready_cond:\r
- ready.append(None)\r
- ready_cond.notify()\r
- with start_cond:\r
- while not started:\r
- start_cond.wait()\r
- loop(start_time, duration * 1.5, end_event, do_yield=False)\r
-\r
- for i in range(nthreads):\r
- threads.append(threading.Thread(target=run))\r
- for t in threads:\r
- t.setDaemon(True)\r
- t.start()\r
- # Wait for threads to be ready\r
- with ready_cond:\r
- while len(ready) < nthreads:\r
- ready_cond.wait()\r
-\r
- # Run the client and wait for the first ping(s) to arrive before\r
- # unblocking the background threads.\r
- chunks = []\r
- process = run_latency_client(addr=sock.getsockname(),\r
- nb_pings=nb_pings, interval=interval)\r
- s = _recv(sock, 4096)\r
- _time = time.time\r
-\r
- with start_cond:\r
- start_time = _time()\r
- started = True\r
- start_cond.notify(nthreads)\r
-\r
- while LAT_END not in s:\r
- s = _recv(sock, 4096)\r
- t = _time()\r
- chunks.append((t, s))\r
-\r
- # Tell the background threads to stop.\r
- end_event.append(None)\r
- for t in threads:\r
- t.join()\r
- process.wait()\r
-\r
- for recv_time, chunk in chunks:\r
- # NOTE: it is assumed that a line sent by a client wasn't received\r
- # in two chunks because the lines are very small.\r
- for line in chunk.splitlines():\r
- line = line.strip()\r
- if line and line != LAT_END:\r
- send_time = eval(line)\r
- assert isinstance(send_time, float)\r
- results.append((send_time, recv_time))\r
-\r
- return results\r
-\r
-def run_latency_tests(max_threads):\r
- for task in latency_tasks:\r
- print("Background CPU task:", task.__doc__)\r
- print()\r
- func, args = task()\r
- nthreads = 0\r
- while nthreads <= max_threads:\r
- results = run_latency_test(func, args, nthreads)\r
- n = len(results)\r
- # We print out milliseconds\r
- lats = [1000 * (t2 - t1) for (t1, t2) in results]\r
- #print(list(map(int, lats)))\r
- avg = sum(lats) / n\r
- dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5\r
- print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")\r
- print()\r
- #print(" [... from %d samples]" % n)\r
- nthreads += 1\r
- print()\r
-\r
-\r
-BW_END = "END"\r
-\r
-def bandwidth_client(addr, packet_size, duration):\r
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
- sock.bind(("127.0.0.1", 0))\r
- local_addr = sock.getsockname()\r
- _time = time.time\r
- _sleep = time.sleep\r
- def _send_chunk(msg):\r
- _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)\r
- # We give the parent some time to be ready.\r
- _sleep(1.0)\r
- try:\r
- start_time = _time()\r
- end_time = start_time + duration * 2.0\r
- i = 0\r
- while _time() < end_time:\r
- _send_chunk(str(i))\r
- s = _recv(sock, packet_size)\r
- assert len(s) == packet_size\r
- i += 1\r
- _send_chunk(BW_END)\r
- finally:\r
- sock.close()\r
-\r
-def run_bandwidth_client(**kwargs):\r
- cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]\r
- cmd_line.extend(['--bwclient', repr(kwargs)])\r
- return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,\r
- #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)\r
-\r
-def run_bandwidth_test(func, args, nthreads):\r
- # Create a listening socket to receive the packets. We use UDP which should\r
- # be painlessly cross-platform.\r
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
- sock.bind(("127.0.0.1", 0))\r
- addr = sock.getsockname()\r
-\r
- duration = BANDWIDTH_DURATION\r
- packet_size = BANDWIDTH_PACKET_SIZE\r
-\r
- results = []\r
- threads = []\r
- end_event = []\r
- start_cond = threading.Condition()\r
- started = False\r
- if nthreads > 0:\r
- # Warm up\r
- func(*args)\r
-\r
- results = []\r
- loop = TimedLoop(func, args)\r
- ready = []\r
- ready_cond = threading.Condition()\r
-\r
- def run():\r
- with ready_cond:\r
- ready.append(None)\r
- ready_cond.notify()\r
- with start_cond:\r
- while not started:\r
- start_cond.wait()\r
- loop(start_time, duration * 1.5, end_event, do_yield=False)\r
-\r
- for i in range(nthreads):\r
- threads.append(threading.Thread(target=run))\r
- for t in threads:\r
- t.setDaemon(True)\r
- t.start()\r
- # Wait for threads to be ready\r
- with ready_cond:\r
- while len(ready) < nthreads:\r
- ready_cond.wait()\r
-\r
- # Run the client and wait for the first packet to arrive before\r
- # unblocking the background threads.\r
- process = run_bandwidth_client(addr=addr,\r
- packet_size=packet_size,\r
- duration=duration)\r
- _time = time.time\r
- # This will also wait for the parent to be ready\r
- s = _recv(sock, packet_size)\r
- remote_addr = eval(s.partition('#')[0])\r
-\r
- with start_cond:\r
- start_time = _time()\r
- started = True\r
- start_cond.notify(nthreads)\r
-\r
- n = 0\r
- first_time = None\r
- while not end_event and BW_END not in s:\r
- _sendto(sock, s, remote_addr)\r
- s = _recv(sock, packet_size)\r
- if first_time is None:\r
- first_time = _time()\r
- n += 1\r
- end_time = _time()\r
-\r
- end_event.append(None)\r
- for t in threads:\r
- t.join()\r
- process.kill()\r
-\r
- return (n - 1) / (end_time - first_time)\r
-\r
-def run_bandwidth_tests(max_threads):\r
- for task in bandwidth_tasks:\r
- print("Background CPU task:", task.__doc__)\r
- print()\r
- func, args = task()\r
- nthreads = 0\r
- baseline_speed = None\r
- while nthreads <= max_threads:\r
- results = run_bandwidth_test(func, args, nthreads)\r
- speed = results\r
- #speed = len(results) * 1.0 / results[-1][0]\r
- print("CPU threads=%d: %.1f" % (nthreads, speed), end="")\r
- if baseline_speed is None:\r
- print(" packets/s.")\r
- baseline_speed = speed\r
- else:\r
- print(" ( %d %%)" % (speed / baseline_speed * 100))\r
- nthreads += 1\r
- print()\r
-\r
-\r
-def main():\r
- usage = "usage: %prog [-h|--help] [options]"\r
- parser = OptionParser(usage=usage)\r
- parser.add_option("-t", "--throughput",\r
- action="store_true", dest="throughput", default=False,\r
- help="run throughput tests")\r
- parser.add_option("-l", "--latency",\r
- action="store_true", dest="latency", default=False,\r
- help="run latency tests")\r
- parser.add_option("-b", "--bandwidth",\r
- action="store_true", dest="bandwidth", default=False,\r
- help="run I/O bandwidth tests")\r
- parser.add_option("-i", "--interval",\r
- action="store", type="int", dest="check_interval", default=None,\r
- help="sys.setcheckinterval() value")\r
- parser.add_option("-I", "--switch-interval",\r
- action="store", type="float", dest="switch_interval", default=None,\r
- help="sys.setswitchinterval() value")\r
- parser.add_option("-n", "--num-threads",\r
- action="store", type="int", dest="nthreads", default=4,\r
- help="max number of threads in tests")\r
-\r
- # Hidden option to run the pinging and bandwidth clients\r
- parser.add_option("", "--latclient",\r
- action="store", dest="latclient", default=None,\r
- help=SUPPRESS_HELP)\r
- parser.add_option("", "--bwclient",\r
- action="store", dest="bwclient", default=None,\r
- help=SUPPRESS_HELP)\r
-\r
- options, args = parser.parse_args()\r
- if args:\r
- parser.error("unexpected arguments")\r
-\r
- if options.latclient:\r
- kwargs = eval(options.latclient)\r
- latency_client(**kwargs)\r
- return\r
-\r
- if options.bwclient:\r
- kwargs = eval(options.bwclient)\r
- bandwidth_client(**kwargs)\r
- return\r
-\r
- if not options.throughput and not options.latency and not options.bandwidth:\r
- options.throughput = options.latency = options.bandwidth = True\r
- if options.check_interval:\r
- sys.setcheckinterval(options.check_interval)\r
- if options.switch_interval:\r
- sys.setswitchinterval(options.switch_interval)\r
-\r
- print("== %s %s (%s) ==" % (\r
- platform.python_implementation(),\r
- platform.python_version(),\r
- platform.python_build()[0],\r
- ))\r
- # Processor identification often has repeated spaces\r
- cpu = ' '.join(platform.processor().split())\r
- print("== %s %s on '%s' ==" % (\r
- platform.machine(),\r
- platform.system(),\r
- cpu,\r
- ))\r
- print()\r
-\r
- if options.throughput:\r
- print("--- Throughput ---")\r
- print()\r
- run_throughput_tests(options.nthreads)\r
-\r
- if options.latency:\r
- print("--- Latency ---")\r
- print()\r
- run_latency_tests(options.nthreads)\r
-\r
- if options.bandwidth:\r
- print("--- I/O bandwidth ---")\r
- print()\r
- run_bandwidth_tests(options.nthreads)\r
-\r
-if __name__ == "__main__":\r
- main()\r