]> git.proxmox.com Git - mirror_edk2.git/blobdiff - AppPkg/Applications/Python/Python-2.7.2/Tools/ccbench/ccbench.py
edk2: Remove AppPkg, StdLib, StdLibPrivateInternalFiles
[mirror_edk2.git] / AppPkg / Applications / Python / Python-2.7.2 / Tools / ccbench / ccbench.py
diff --git a/AppPkg/Applications/Python/Python-2.7.2/Tools/ccbench/ccbench.py b/AppPkg/Applications/Python/Python-2.7.2/Tools/ccbench/ccbench.py
deleted file mode 100644 (file)
index a2cf720..0000000
+++ /dev/null
@@ -1,609 +0,0 @@
-# -*- coding: utf-8 -*-\r
-# This file should be kept compatible with both Python 2.6 and Python >= 3.0.\r
-\r
-from __future__ import division\r
-from __future__ import print_function\r
-\r
-"""\r
-ccbench, a Python concurrency benchmark.\r
-"""\r
-\r
-import time\r
-import os\r
-import sys\r
-import functools\r
-import itertools\r
-import threading\r
-import subprocess\r
-import socket\r
-from optparse import OptionParser, SUPPRESS_HELP\r
-import platform\r
-\r
-# Compatibility\r
-try:\r
-    xrange\r
-except NameError:\r
-    xrange = range\r
-\r
-try:\r
-    map = itertools.imap\r
-except AttributeError:\r
-    pass\r
-\r
-\r
-THROUGHPUT_DURATION = 2.0\r
-\r
-LATENCY_PING_INTERVAL = 0.1\r
-LATENCY_DURATION = 2.0\r
-\r
-BANDWIDTH_PACKET_SIZE = 1024\r
-BANDWIDTH_DURATION = 2.0\r
-\r
-\r
-def task_pidigits():\r
-    """Pi calculation (Python)"""\r
-    _map = map\r
-    _count = itertools.count\r
-    _islice = itertools.islice\r
-\r
-    def calc_ndigits(n):\r
-        # From http://shootout.alioth.debian.org/\r
-        def gen_x():\r
-            return _map(lambda k: (k, 4*k + 2, 0, 2*k + 1), _count(1))\r
-\r
-        def compose(a, b):\r
-            aq, ar, as_, at = a\r
-            bq, br, bs, bt = b\r
-            return (aq * bq,\r
-                    aq * br + ar * bt,\r
-                    as_ * bq + at * bs,\r
-                    as_ * br + at * bt)\r
-\r
-        def extract(z, j):\r
-            q, r, s, t = z\r
-            return (q*j + r) // (s*j + t)\r
-\r
-        def pi_digits():\r
-            z = (1, 0, 0, 1)\r
-            x = gen_x()\r
-            while 1:\r
-                y = extract(z, 3)\r
-                while y != extract(z, 4):\r
-                    z = compose(z, next(x))\r
-                    y = extract(z, 3)\r
-                z = compose((10, -10*y, 0, 1), z)\r
-                yield y\r
-\r
-        return list(_islice(pi_digits(), n))\r
-\r
-    return calc_ndigits, (50, )\r
-\r
-def task_regex():\r
-    """regular expression (C)"""\r
-    # XXX this task gives horrendous latency results.\r
-    import re\r
-    # Taken from the `inspect` module\r
-    pat = re.compile(r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)', re.MULTILINE)\r
-    with open(__file__, "r") as f:\r
-        arg = f.read(2000)\r
-\r
-    def findall(s):\r
-        t = time.time()\r
-        try:\r
-            return pat.findall(s)\r
-        finally:\r
-            print(time.time() - t)\r
-    return pat.findall, (arg, )\r
-\r
-def task_sort():\r
-    """list sorting (C)"""\r
-    def list_sort(l):\r
-        l = l[::-1]\r
-        l.sort()\r
-\r
-    return list_sort, (list(range(1000)), )\r
-\r
-def task_compress_zlib():\r
-    """zlib compression (C)"""\r
-    import zlib\r
-    with open(__file__, "rb") as f:\r
-        arg = f.read(5000) * 3\r
-\r
-    def compress(s):\r
-        zlib.decompress(zlib.compress(s, 5))\r
-    return compress, (arg, )\r
-\r
-def task_compress_bz2():\r
-    """bz2 compression (C)"""\r
-    import bz2\r
-    with open(__file__, "rb") as f:\r
-        arg = f.read(3000) * 2\r
-\r
-    def compress(s):\r
-        bz2.compress(s)\r
-    return compress, (arg, )\r
-\r
-def task_hashing():\r
-    """SHA1 hashing (C)"""\r
-    import hashlib\r
-    with open(__file__, "rb") as f:\r
-        arg = f.read(5000) * 30\r
-\r
-    def compute(s):\r
-        hashlib.sha1(s).digest()\r
-    return compute, (arg, )\r
-\r
-\r
-throughput_tasks = [task_pidigits, task_regex]\r
-for mod in 'bz2', 'hashlib':\r
-    try:\r
-        globals()[mod] = __import__(mod)\r
-    except ImportError:\r
-        globals()[mod] = None\r
-\r
-# For whatever reasons, zlib gives irregular results, so we prefer bz2 or\r
-# hashlib if available.\r
-# (NOTE: hashlib releases the GIL from 2.7 and 3.1 onwards)\r
-if bz2 is not None:\r
-    throughput_tasks.append(task_compress_bz2)\r
-elif hashlib is not None:\r
-    throughput_tasks.append(task_hashing)\r
-else:\r
-    throughput_tasks.append(task_compress_zlib)\r
-\r
-latency_tasks = throughput_tasks\r
-bandwidth_tasks = [task_pidigits]\r
-\r
-\r
-class TimedLoop:\r
-    def __init__(self, func, args):\r
-        self.func = func\r
-        self.args = args\r
-\r
-    def __call__(self, start_time, min_duration, end_event, do_yield=False):\r
-        step = 20\r
-        niters = 0\r
-        duration = 0.0\r
-        _time = time.time\r
-        _sleep = time.sleep\r
-        _func = self.func\r
-        _args = self.args\r
-        t1 = start_time\r
-        while True:\r
-            for i in range(step):\r
-                _func(*_args)\r
-            t2 = _time()\r
-            # If another thread terminated, the current measurement is invalid\r
-            # => return the previous one.\r
-            if end_event:\r
-                return niters, duration\r
-            niters += step\r
-            duration = t2 - start_time\r
-            if duration >= min_duration:\r
-                end_event.append(None)\r
-                return niters, duration\r
-            if t2 - t1 < 0.01:\r
-                # Minimize interference of measurement on overall runtime\r
-                step = step * 3 // 2\r
-            elif do_yield:\r
-                # OS scheduling of Python threads is sometimes so bad that we\r
-                # have to force thread switching ourselves, otherwise we get\r
-                # completely useless results.\r
-                _sleep(0.0001)\r
-            t1 = t2\r
-\r
-\r
-def run_throughput_test(func, args, nthreads):\r
-    assert nthreads >= 1\r
-\r
-    # Warm up\r
-    func(*args)\r
-\r
-    results = []\r
-    loop = TimedLoop(func, args)\r
-    end_event = []\r
-\r
-    if nthreads == 1:\r
-        # Pure single-threaded performance, without any switching or\r
-        # synchronization overhead.\r
-        start_time = time.time()\r
-        results.append(loop(start_time, THROUGHPUT_DURATION,\r
-                            end_event, do_yield=False))\r
-        return results\r
-\r
-    started = False\r
-    ready_cond = threading.Condition()\r
-    start_cond = threading.Condition()\r
-    ready = []\r
-\r
-    def run():\r
-        with ready_cond:\r
-            ready.append(None)\r
-            ready_cond.notify()\r
-        with start_cond:\r
-            while not started:\r
-                start_cond.wait()\r
-        results.append(loop(start_time, THROUGHPUT_DURATION,\r
-                            end_event, do_yield=True))\r
-\r
-    threads = []\r
-    for i in range(nthreads):\r
-        threads.append(threading.Thread(target=run))\r
-    for t in threads:\r
-        t.setDaemon(True)\r
-        t.start()\r
-    # We don't want measurements to include thread startup overhead,\r
-    # so we arrange for timing to start after all threads are ready.\r
-    with ready_cond:\r
-        while len(ready) < nthreads:\r
-            ready_cond.wait()\r
-    with start_cond:\r
-        start_time = time.time()\r
-        started = True\r
-        start_cond.notify(nthreads)\r
-    for t in threads:\r
-        t.join()\r
-\r
-    return results\r
-\r
-def run_throughput_tests(max_threads):\r
-    for task in throughput_tasks:\r
-        print(task.__doc__)\r
-        print()\r
-        func, args = task()\r
-        nthreads = 1\r
-        baseline_speed = None\r
-        while nthreads <= max_threads:\r
-            results = run_throughput_test(func, args, nthreads)\r
-            # Taking the max duration rather than average gives pessimistic\r
-            # results rather than optimistic.\r
-            speed = sum(r[0] for r in results) / max(r[1] for r in results)\r
-            print("threads=%d: %d" % (nthreads, speed), end="")\r
-            if baseline_speed is None:\r
-                print(" iterations/s.")\r
-                baseline_speed = speed\r
-            else:\r
-                print(" ( %d %%)" % (speed / baseline_speed * 100))\r
-            nthreads += 1\r
-        print()\r
-\r
-\r
-LAT_END = "END"\r
-\r
-def _sendto(sock, s, addr):\r
-    sock.sendto(s.encode('ascii'), addr)\r
-\r
-def _recv(sock, n):\r
-    return sock.recv(n).decode('ascii')\r
-\r
-def latency_client(addr, nb_pings, interval):\r
-    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
-    _time = time.time\r
-    _sleep = time.sleep\r
-    def _ping():\r
-        _sendto(sock, "%r\n" % _time(), addr)\r
-    # The first ping signals the parent process that we are ready.\r
-    _ping()\r
-    # We give the parent a bit of time to notice.\r
-    _sleep(1.0)\r
-    for i in range(nb_pings):\r
-        _sleep(interval)\r
-        _ping()\r
-    _sendto(sock, LAT_END + "\n", addr)\r
-\r
-def run_latency_client(**kwargs):\r
-    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]\r
-    cmd_line.extend(['--latclient', repr(kwargs)])\r
-    return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,\r
-                            #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)\r
-\r
-def run_latency_test(func, args, nthreads):\r
-    # Create a listening socket to receive the pings. We use UDP which should\r
-    # be painlessly cross-platform.\r
-    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
-    sock.bind(("127.0.0.1", 0))\r
-    addr = sock.getsockname()\r
-\r
-    interval = LATENCY_PING_INTERVAL\r
-    duration = LATENCY_DURATION\r
-    nb_pings = int(duration / interval)\r
-\r
-    results = []\r
-    threads = []\r
-    end_event = []\r
-    start_cond = threading.Condition()\r
-    started = False\r
-    if nthreads > 0:\r
-        # Warm up\r
-        func(*args)\r
-\r
-        results = []\r
-        loop = TimedLoop(func, args)\r
-        ready = []\r
-        ready_cond = threading.Condition()\r
-\r
-        def run():\r
-            with ready_cond:\r
-                ready.append(None)\r
-                ready_cond.notify()\r
-            with start_cond:\r
-                while not started:\r
-                    start_cond.wait()\r
-            loop(start_time, duration * 1.5, end_event, do_yield=False)\r
-\r
-        for i in range(nthreads):\r
-            threads.append(threading.Thread(target=run))\r
-        for t in threads:\r
-            t.setDaemon(True)\r
-            t.start()\r
-        # Wait for threads to be ready\r
-        with ready_cond:\r
-            while len(ready) < nthreads:\r
-                ready_cond.wait()\r
-\r
-    # Run the client and wait for the first ping(s) to arrive before\r
-    # unblocking the background threads.\r
-    chunks = []\r
-    process = run_latency_client(addr=sock.getsockname(),\r
-                                 nb_pings=nb_pings, interval=interval)\r
-    s = _recv(sock, 4096)\r
-    _time = time.time\r
-\r
-    with start_cond:\r
-        start_time = _time()\r
-        started = True\r
-        start_cond.notify(nthreads)\r
-\r
-    while LAT_END not in s:\r
-        s = _recv(sock, 4096)\r
-        t = _time()\r
-        chunks.append((t, s))\r
-\r
-    # Tell the background threads to stop.\r
-    end_event.append(None)\r
-    for t in threads:\r
-        t.join()\r
-    process.wait()\r
-\r
-    for recv_time, chunk in chunks:\r
-        # NOTE: it is assumed that a line sent by a client wasn't received\r
-        # in two chunks because the lines are very small.\r
-        for line in chunk.splitlines():\r
-            line = line.strip()\r
-            if line and line != LAT_END:\r
-                send_time = eval(line)\r
-                assert isinstance(send_time, float)\r
-                results.append((send_time, recv_time))\r
-\r
-    return results\r
-\r
-def run_latency_tests(max_threads):\r
-    for task in latency_tasks:\r
-        print("Background CPU task:", task.__doc__)\r
-        print()\r
-        func, args = task()\r
-        nthreads = 0\r
-        while nthreads <= max_threads:\r
-            results = run_latency_test(func, args, nthreads)\r
-            n = len(results)\r
-            # We print out milliseconds\r
-            lats = [1000 * (t2 - t1) for (t1, t2) in results]\r
-            #print(list(map(int, lats)))\r
-            avg = sum(lats) / n\r
-            dev = (sum((x - avg) ** 2 for x in lats) / n) ** 0.5\r
-            print("CPU threads=%d: %d ms. (std dev: %d ms.)" % (nthreads, avg, dev), end="")\r
-            print()\r
-            #print("    [... from %d samples]" % n)\r
-            nthreads += 1\r
-        print()\r
-\r
-\r
-BW_END = "END"\r
-\r
-def bandwidth_client(addr, packet_size, duration):\r
-    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
-    sock.bind(("127.0.0.1", 0))\r
-    local_addr = sock.getsockname()\r
-    _time = time.time\r
-    _sleep = time.sleep\r
-    def _send_chunk(msg):\r
-        _sendto(sock, ("%r#%s\n" % (local_addr, msg)).rjust(packet_size), addr)\r
-    # We give the parent some time to be ready.\r
-    _sleep(1.0)\r
-    try:\r
-        start_time = _time()\r
-        end_time = start_time + duration * 2.0\r
-        i = 0\r
-        while _time() < end_time:\r
-            _send_chunk(str(i))\r
-            s = _recv(sock, packet_size)\r
-            assert len(s) == packet_size\r
-            i += 1\r
-        _send_chunk(BW_END)\r
-    finally:\r
-        sock.close()\r
-\r
-def run_bandwidth_client(**kwargs):\r
-    cmd_line = [sys.executable, '-E', os.path.abspath(__file__)]\r
-    cmd_line.extend(['--bwclient', repr(kwargs)])\r
-    return subprocess.Popen(cmd_line) #, stdin=subprocess.PIPE,\r
-                            #stdout=subprocess.PIPE, stderr=subprocess.STDOUT)\r
-\r
-def run_bandwidth_test(func, args, nthreads):\r
-    # Create a listening socket to receive the packets. We use UDP which should\r
-    # be painlessly cross-platform.\r
-    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\r
-    sock.bind(("127.0.0.1", 0))\r
-    addr = sock.getsockname()\r
-\r
-    duration = BANDWIDTH_DURATION\r
-    packet_size = BANDWIDTH_PACKET_SIZE\r
-\r
-    results = []\r
-    threads = []\r
-    end_event = []\r
-    start_cond = threading.Condition()\r
-    started = False\r
-    if nthreads > 0:\r
-        # Warm up\r
-        func(*args)\r
-\r
-        results = []\r
-        loop = TimedLoop(func, args)\r
-        ready = []\r
-        ready_cond = threading.Condition()\r
-\r
-        def run():\r
-            with ready_cond:\r
-                ready.append(None)\r
-                ready_cond.notify()\r
-            with start_cond:\r
-                while not started:\r
-                    start_cond.wait()\r
-            loop(start_time, duration * 1.5, end_event, do_yield=False)\r
-\r
-        for i in range(nthreads):\r
-            threads.append(threading.Thread(target=run))\r
-        for t in threads:\r
-            t.setDaemon(True)\r
-            t.start()\r
-        # Wait for threads to be ready\r
-        with ready_cond:\r
-            while len(ready) < nthreads:\r
-                ready_cond.wait()\r
-\r
-    # Run the client and wait for the first packet to arrive before\r
-    # unblocking the background threads.\r
-    process = run_bandwidth_client(addr=addr,\r
-                                   packet_size=packet_size,\r
-                                   duration=duration)\r
-    _time = time.time\r
-    # This will also wait for the parent to be ready\r
-    s = _recv(sock, packet_size)\r
-    remote_addr = eval(s.partition('#')[0])\r
-\r
-    with start_cond:\r
-        start_time = _time()\r
-        started = True\r
-        start_cond.notify(nthreads)\r
-\r
-    n = 0\r
-    first_time = None\r
-    while not end_event and BW_END not in s:\r
-        _sendto(sock, s, remote_addr)\r
-        s = _recv(sock, packet_size)\r
-        if first_time is None:\r
-            first_time = _time()\r
-        n += 1\r
-    end_time = _time()\r
-\r
-    end_event.append(None)\r
-    for t in threads:\r
-        t.join()\r
-    process.kill()\r
-\r
-    return (n - 1) / (end_time - first_time)\r
-\r
-def run_bandwidth_tests(max_threads):\r
-    for task in bandwidth_tasks:\r
-        print("Background CPU task:", task.__doc__)\r
-        print()\r
-        func, args = task()\r
-        nthreads = 0\r
-        baseline_speed = None\r
-        while nthreads <= max_threads:\r
-            results = run_bandwidth_test(func, args, nthreads)\r
-            speed = results\r
-            #speed = len(results) * 1.0 / results[-1][0]\r
-            print("CPU threads=%d: %.1f" % (nthreads, speed), end="")\r
-            if baseline_speed is None:\r
-                print(" packets/s.")\r
-                baseline_speed = speed\r
-            else:\r
-                print(" ( %d %%)" % (speed / baseline_speed * 100))\r
-            nthreads += 1\r
-        print()\r
-\r
-\r
-def main():\r
-    usage = "usage: %prog [-h|--help] [options]"\r
-    parser = OptionParser(usage=usage)\r
-    parser.add_option("-t", "--throughput",\r
-                      action="store_true", dest="throughput", default=False,\r
-                      help="run throughput tests")\r
-    parser.add_option("-l", "--latency",\r
-                      action="store_true", dest="latency", default=False,\r
-                      help="run latency tests")\r
-    parser.add_option("-b", "--bandwidth",\r
-                      action="store_true", dest="bandwidth", default=False,\r
-                      help="run I/O bandwidth tests")\r
-    parser.add_option("-i", "--interval",\r
-                      action="store", type="int", dest="check_interval", default=None,\r
-                      help="sys.setcheckinterval() value")\r
-    parser.add_option("-I", "--switch-interval",\r
-                      action="store", type="float", dest="switch_interval", default=None,\r
-                      help="sys.setswitchinterval() value")\r
-    parser.add_option("-n", "--num-threads",\r
-                      action="store", type="int", dest="nthreads", default=4,\r
-                      help="max number of threads in tests")\r
-\r
-    # Hidden option to run the pinging and bandwidth clients\r
-    parser.add_option("", "--latclient",\r
-                      action="store", dest="latclient", default=None,\r
-                      help=SUPPRESS_HELP)\r
-    parser.add_option("", "--bwclient",\r
-                      action="store", dest="bwclient", default=None,\r
-                      help=SUPPRESS_HELP)\r
-\r
-    options, args = parser.parse_args()\r
-    if args:\r
-        parser.error("unexpected arguments")\r
-\r
-    if options.latclient:\r
-        kwargs = eval(options.latclient)\r
-        latency_client(**kwargs)\r
-        return\r
-\r
-    if options.bwclient:\r
-        kwargs = eval(options.bwclient)\r
-        bandwidth_client(**kwargs)\r
-        return\r
-\r
-    if not options.throughput and not options.latency and not options.bandwidth:\r
-        options.throughput = options.latency = options.bandwidth = True\r
-    if options.check_interval:\r
-        sys.setcheckinterval(options.check_interval)\r
-    if options.switch_interval:\r
-        sys.setswitchinterval(options.switch_interval)\r
-\r
-    print("== %s %s (%s) ==" % (\r
-        platform.python_implementation(),\r
-        platform.python_version(),\r
-        platform.python_build()[0],\r
-    ))\r
-    # Processor identification often has repeated spaces\r
-    cpu = ' '.join(platform.processor().split())\r
-    print("== %s %s on '%s' ==" % (\r
-        platform.machine(),\r
-        platform.system(),\r
-        cpu,\r
-    ))\r
-    print()\r
-\r
-    if options.throughput:\r
-        print("--- Throughput ---")\r
-        print()\r
-        run_throughput_tests(options.nthreads)\r
-\r
-    if options.latency:\r
-        print("--- Latency ---")\r
-        print()\r
-        run_latency_tests(options.nthreads)\r
-\r
-    if options.bandwidth:\r
-        print("--- I/O bandwidth ---")\r
-        print()\r
-        run_bandwidth_tests(options.nthreads)\r
-\r
-if __name__ == "__main__":\r
-    main()\r