+++ /dev/null
-"""A multi-producer, multi-consumer queue."""\r
-\r
-from time import time as _time\r
-try:\r
- import threading as _threading\r
-except ImportError:\r
- import dummy_threading as _threading\r
-from collections import deque\r
-import heapq\r
-\r
-__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']\r
-\r
-class Empty(Exception):\r
- "Exception raised by Queue.get(block=0)/get_nowait()."\r
- pass\r
-\r
-class Full(Exception):\r
- "Exception raised by Queue.put(block=0)/put_nowait()."\r
- pass\r
-\r
-class Queue:\r
- """Create a queue object with a given maximum size.\r
-\r
- If maxsize is <= 0, the queue size is infinite.\r
- """\r
- def __init__(self, maxsize=0):\r
- self.maxsize = maxsize\r
- self._init(maxsize)\r
- # mutex must be held whenever the queue is mutating. All methods\r
- # that acquire mutex must release it before returning. mutex\r
- # is shared between the three conditions, so acquiring and\r
- # releasing the conditions also acquires and releases mutex.\r
- self.mutex = _threading.Lock()\r
- # Notify not_empty whenever an item is added to the queue; a\r
- # thread waiting to get is notified then.\r
- self.not_empty = _threading.Condition(self.mutex)\r
- # Notify not_full whenever an item is removed from the queue;\r
- # a thread waiting to put is notified then.\r
- self.not_full = _threading.Condition(self.mutex)\r
- # Notify all_tasks_done whenever the number of unfinished tasks\r
- # drops to zero; thread waiting to join() is notified to resume\r
- self.all_tasks_done = _threading.Condition(self.mutex)\r
- self.unfinished_tasks = 0\r
-\r
- def task_done(self):\r
- """Indicate that a formerly enqueued task is complete.\r
-\r
- Used by Queue consumer threads. For each get() used to fetch a task,\r
- a subsequent call to task_done() tells the queue that the processing\r
- on the task is complete.\r
-\r
- If a join() is currently blocking, it will resume when all items\r
- have been processed (meaning that a task_done() call was received\r
- for every item that had been put() into the queue).\r
-\r
- Raises a ValueError if called more times than there were items\r
- placed in the queue.\r
- """\r
- self.all_tasks_done.acquire()\r
- try:\r
- unfinished = self.unfinished_tasks - 1\r
- if unfinished <= 0:\r
- if unfinished < 0:\r
- raise ValueError('task_done() called too many times')\r
- self.all_tasks_done.notify_all()\r
- self.unfinished_tasks = unfinished\r
- finally:\r
- self.all_tasks_done.release()\r
-\r
- def join(self):\r
- """Blocks until all items in the Queue have been gotten and processed.\r
-\r
- The count of unfinished tasks goes up whenever an item is added to the\r
- queue. The count goes down whenever a consumer thread calls task_done()\r
- to indicate the item was retrieved and all work on it is complete.\r
-\r
- When the count of unfinished tasks drops to zero, join() unblocks.\r
- """\r
- self.all_tasks_done.acquire()\r
- try:\r
- while self.unfinished_tasks:\r
- self.all_tasks_done.wait()\r
- finally:\r
- self.all_tasks_done.release()\r
-\r
- def qsize(self):\r
- """Return the approximate size of the queue (not reliable!)."""\r
- self.mutex.acquire()\r
- n = self._qsize()\r
- self.mutex.release()\r
- return n\r
-\r
- def empty(self):\r
- """Return True if the queue is empty, False otherwise (not reliable!)."""\r
- self.mutex.acquire()\r
- n = not self._qsize()\r
- self.mutex.release()\r
- return n\r
-\r
- def full(self):\r
- """Return True if the queue is full, False otherwise (not reliable!)."""\r
- self.mutex.acquire()\r
- n = 0 < self.maxsize == self._qsize()\r
- self.mutex.release()\r
- return n\r
-\r
- def put(self, item, block=True, timeout=None):\r
- """Put an item into the queue.\r
-\r
- If optional args 'block' is true and 'timeout' is None (the default),\r
- block if necessary until a free slot is available. If 'timeout' is\r
- a positive number, it blocks at most 'timeout' seconds and raises\r
- the Full exception if no free slot was available within that time.\r
- Otherwise ('block' is false), put an item on the queue if a free slot\r
- is immediately available, else raise the Full exception ('timeout'\r
- is ignored in that case).\r
- """\r
- self.not_full.acquire()\r
- try:\r
- if self.maxsize > 0:\r
- if not block:\r
- if self._qsize() == self.maxsize:\r
- raise Full\r
- elif timeout is None:\r
- while self._qsize() == self.maxsize:\r
- self.not_full.wait()\r
- elif timeout < 0:\r
- raise ValueError("'timeout' must be a positive number")\r
- else:\r
- endtime = _time() + timeout\r
- while self._qsize() == self.maxsize:\r
- remaining = endtime - _time()\r
- if remaining <= 0.0:\r
- raise Full\r
- self.not_full.wait(remaining)\r
- self._put(item)\r
- self.unfinished_tasks += 1\r
- self.not_empty.notify()\r
- finally:\r
- self.not_full.release()\r
-\r
- def put_nowait(self, item):\r
- """Put an item into the queue without blocking.\r
-\r
- Only enqueue the item if a free slot is immediately available.\r
- Otherwise raise the Full exception.\r
- """\r
- return self.put(item, False)\r
-\r
- def get(self, block=True, timeout=None):\r
- """Remove and return an item from the queue.\r
-\r
- If optional args 'block' is true and 'timeout' is None (the default),\r
- block if necessary until an item is available. If 'timeout' is\r
- a positive number, it blocks at most 'timeout' seconds and raises\r
- the Empty exception if no item was available within that time.\r
- Otherwise ('block' is false), return an item if one is immediately\r
- available, else raise the Empty exception ('timeout' is ignored\r
- in that case).\r
- """\r
- self.not_empty.acquire()\r
- try:\r
- if not block:\r
- if not self._qsize():\r
- raise Empty\r
- elif timeout is None:\r
- while not self._qsize():\r
- self.not_empty.wait()\r
- elif timeout < 0:\r
- raise ValueError("'timeout' must be a positive number")\r
- else:\r
- endtime = _time() + timeout\r
- while not self._qsize():\r
- remaining = endtime - _time()\r
- if remaining <= 0.0:\r
- raise Empty\r
- self.not_empty.wait(remaining)\r
- item = self._get()\r
- self.not_full.notify()\r
- return item\r
- finally:\r
- self.not_empty.release()\r
-\r
- def get_nowait(self):\r
- """Remove and return an item from the queue without blocking.\r
-\r
- Only get an item if one is immediately available. Otherwise\r
- raise the Empty exception.\r
- """\r
- return self.get(False)\r
-\r
- # Override these methods to implement other queue organizations\r
- # (e.g. stack or priority queue).\r
- # These will only be called with appropriate locks held\r
-\r
- # Initialize the queue representation\r
- def _init(self, maxsize):\r
- self.queue = deque()\r
-\r
- def _qsize(self, len=len):\r
- return len(self.queue)\r
-\r
- # Put a new item in the queue\r
- def _put(self, item):\r
- self.queue.append(item)\r
-\r
- # Get an item from the queue\r
- def _get(self):\r
- return self.queue.popleft()\r
-\r
-\r
-class PriorityQueue(Queue):\r
- '''Variant of Queue that retrieves open entries in priority order (lowest first).\r
-\r
- Entries are typically tuples of the form: (priority number, data).\r
- '''\r
-\r
- def _init(self, maxsize):\r
- self.queue = []\r
-\r
- def _qsize(self, len=len):\r
- return len(self.queue)\r
-\r
- def _put(self, item, heappush=heapq.heappush):\r
- heappush(self.queue, item)\r
-\r
- def _get(self, heappop=heapq.heappop):\r
- return heappop(self.queue)\r
-\r
-\r
-class LifoQueue(Queue):\r
- '''Variant of Queue that retrieves most recently added entries first.'''\r
-\r
- def _init(self, maxsize):\r
- self.queue = []\r
-\r
- def _qsize(self, len=len):\r
- return len(self.queue)\r
-\r
- def _put(self, item):\r
- self.queue.append(item)\r
-\r
- def _get(self):\r
- return self.queue.pop()\r