+++ /dev/null
-"""MH interface -- purely object-oriented (well, almost)\r
-\r
-Executive summary:\r
-\r
-import mhlib\r
-\r
-mh = mhlib.MH() # use default mailbox directory and profile\r
-mh = mhlib.MH(mailbox) # override mailbox location (default from profile)\r
-mh = mhlib.MH(mailbox, profile) # override mailbox and profile\r
-\r
-mh.error(format, ...) # print error message -- can be overridden\r
-s = mh.getprofile(key) # profile entry (None if not set)\r
-path = mh.getpath() # mailbox pathname\r
-name = mh.getcontext() # name of current folder\r
-mh.setcontext(name) # set name of current folder\r
-\r
-list = mh.listfolders() # names of top-level folders\r
-list = mh.listallfolders() # names of all folders, including subfolders\r
-list = mh.listsubfolders(name) # direct subfolders of given folder\r
-list = mh.listallsubfolders(name) # all subfolders of given folder\r
-\r
-mh.makefolder(name) # create new folder\r
-mh.deletefolder(name) # delete folder -- must have no subfolders\r
-\r
-f = mh.openfolder(name) # new open folder object\r
-\r
-f.error(format, ...) # same as mh.error(format, ...)\r
-path = f.getfullname() # folder's full pathname\r
-path = f.getsequencesfilename() # full pathname of folder's sequences file\r
-path = f.getmessagefilename(n) # full pathname of message n in folder\r
-\r
-list = f.listmessages() # list of messages in folder (as numbers)\r
-n = f.getcurrent() # get current message\r
-f.setcurrent(n) # set current message\r
-list = f.parsesequence(seq) # parse msgs syntax into list of messages\r
-n = f.getlast() # get last message (0 if no messagse)\r
-f.setlast(n) # set last message (internal use only)\r
-\r
-dict = f.getsequences() # dictionary of sequences in folder {name: list}\r
-f.putsequences(dict) # write sequences back to folder\r
-\r
-f.createmessage(n, fp) # add message from file f as number n\r
-f.removemessages(list) # remove messages in list from folder\r
-f.refilemessages(list, tofolder) # move messages in list to other folder\r
-f.movemessage(n, tofolder, ton) # move one message to a given destination\r
-f.copymessage(n, tofolder, ton) # copy one message to a given destination\r
-\r
-m = f.openmessage(n) # new open message object (costs a file descriptor)\r
-m is a derived class of mimetools.Message(rfc822.Message), with:\r
-s = m.getheadertext() # text of message's headers\r
-s = m.getheadertext(pred) # text of message's headers, filtered by pred\r
-s = m.getbodytext() # text of message's body, decoded\r
-s = m.getbodytext(0) # text of message's body, not decoded\r
-"""\r
-from warnings import warnpy3k\r
-warnpy3k("the mhlib module has been removed in Python 3.0; use the mailbox "\r
- "module instead", stacklevel=2)\r
-del warnpy3k\r
-\r
-# XXX To do, functionality:\r
-# - annotate messages\r
-# - send messages\r
-#\r
-# XXX To do, organization:\r
-# - move IntSet to separate file\r
-# - move most Message functionality to module mimetools\r
-\r
-\r
-# Customizable defaults\r
-\r
-MH_PROFILE = '~/.mh_profile'\r
-PATH = '~/Mail'\r
-MH_SEQUENCES = '.mh_sequences'\r
-FOLDER_PROTECT = 0700\r
-\r
-\r
-# Imported modules\r
-\r
-import os\r
-import sys\r
-import re\r
-import mimetools\r
-import multifile\r
-import shutil\r
-from bisect import bisect\r
-\r
-__all__ = ["MH","Error","Folder","Message"]\r
-\r
-# Exported constants\r
-\r
-class Error(Exception):\r
- pass\r
-\r
-\r
-class MH:\r
- """Class representing a particular collection of folders.\r
- Optional constructor arguments are the pathname for the directory\r
- containing the collection, and the MH profile to use.\r
- If either is omitted or empty a default is used; the default\r
- directory is taken from the MH profile if it is specified there."""\r
-\r
- def __init__(self, path = None, profile = None):\r
- """Constructor."""\r
- if profile is None: profile = MH_PROFILE\r
- self.profile = os.path.expanduser(profile)\r
- if path is None: path = self.getprofile('Path')\r
- if not path: path = PATH\r
- if not os.path.isabs(path) and path[0] != '~':\r
- path = os.path.join('~', path)\r
- path = os.path.expanduser(path)\r
- if not os.path.isdir(path): raise Error, 'MH() path not found'\r
- self.path = path\r
-\r
- def __repr__(self):\r
- """String representation."""\r
- return 'MH(%r, %r)' % (self.path, self.profile)\r
-\r
- def error(self, msg, *args):\r
- """Routine to print an error. May be overridden by a derived class."""\r
- sys.stderr.write('MH error: %s\n' % (msg % args))\r
-\r
- def getprofile(self, key):\r
- """Return a profile entry, None if not found."""\r
- return pickline(self.profile, key)\r
-\r
- def getpath(self):\r
- """Return the path (the name of the collection's directory)."""\r
- return self.path\r
-\r
- def getcontext(self):\r
- """Return the name of the current folder."""\r
- context = pickline(os.path.join(self.getpath(), 'context'),\r
- 'Current-Folder')\r
- if not context: context = 'inbox'\r
- return context\r
-\r
- def setcontext(self, context):\r
- """Set the name of the current folder."""\r
- fn = os.path.join(self.getpath(), 'context')\r
- f = open(fn, "w")\r
- f.write("Current-Folder: %s\n" % context)\r
- f.close()\r
-\r
- def listfolders(self):\r
- """Return the names of the top-level folders."""\r
- folders = []\r
- path = self.getpath()\r
- for name in os.listdir(path):\r
- fullname = os.path.join(path, name)\r
- if os.path.isdir(fullname):\r
- folders.append(name)\r
- folders.sort()\r
- return folders\r
-\r
- def listsubfolders(self, name):\r
- """Return the names of the subfolders in a given folder\r
- (prefixed with the given folder name)."""\r
- fullname = os.path.join(self.path, name)\r
- # Get the link count so we can avoid listing folders\r
- # that have no subfolders.\r
- nlinks = os.stat(fullname).st_nlink\r
- if nlinks <= 2:\r
- return []\r
- subfolders = []\r
- subnames = os.listdir(fullname)\r
- for subname in subnames:\r
- fullsubname = os.path.join(fullname, subname)\r
- if os.path.isdir(fullsubname):\r
- name_subname = os.path.join(name, subname)\r
- subfolders.append(name_subname)\r
- # Stop looking for subfolders when\r
- # we've seen them all\r
- nlinks = nlinks - 1\r
- if nlinks <= 2:\r
- break\r
- subfolders.sort()\r
- return subfolders\r
-\r
- def listallfolders(self):\r
- """Return the names of all folders and subfolders, recursively."""\r
- return self.listallsubfolders('')\r
-\r
- def listallsubfolders(self, name):\r
- """Return the names of subfolders in a given folder, recursively."""\r
- fullname = os.path.join(self.path, name)\r
- # Get the link count so we can avoid listing folders\r
- # that have no subfolders.\r
- nlinks = os.stat(fullname).st_nlink\r
- if nlinks <= 2:\r
- return []\r
- subfolders = []\r
- subnames = os.listdir(fullname)\r
- for subname in subnames:\r
- if subname[0] == ',' or isnumeric(subname): continue\r
- fullsubname = os.path.join(fullname, subname)\r
- if os.path.isdir(fullsubname):\r
- name_subname = os.path.join(name, subname)\r
- subfolders.append(name_subname)\r
- if not os.path.islink(fullsubname):\r
- subsubfolders = self.listallsubfolders(\r
- name_subname)\r
- subfolders = subfolders + subsubfolders\r
- # Stop looking for subfolders when\r
- # we've seen them all\r
- nlinks = nlinks - 1\r
- if nlinks <= 2:\r
- break\r
- subfolders.sort()\r
- return subfolders\r
-\r
- def openfolder(self, name):\r
- """Return a new Folder object for the named folder."""\r
- return Folder(self, name)\r
-\r
- def makefolder(self, name):\r
- """Create a new folder (or raise os.error if it cannot be created)."""\r
- protect = pickline(self.profile, 'Folder-Protect')\r
- if protect and isnumeric(protect):\r
- mode = int(protect, 8)\r
- else:\r
- mode = FOLDER_PROTECT\r
- os.mkdir(os.path.join(self.getpath(), name), mode)\r
-\r
- def deletefolder(self, name):\r
- """Delete a folder. This removes files in the folder but not\r
- subdirectories. Raise os.error if deleting the folder itself fails."""\r
- fullname = os.path.join(self.getpath(), name)\r
- for subname in os.listdir(fullname):\r
- fullsubname = os.path.join(fullname, subname)\r
- try:\r
- os.unlink(fullsubname)\r
- except os.error:\r
- self.error('%s not deleted, continuing...' %\r
- fullsubname)\r
- os.rmdir(fullname)\r
-\r
-\r
-numericprog = re.compile('^[1-9][0-9]*$')\r
-def isnumeric(str):\r
- return numericprog.match(str) is not None\r
-\r
-class Folder:\r
- """Class representing a particular folder."""\r
-\r
- def __init__(self, mh, name):\r
- """Constructor."""\r
- self.mh = mh\r
- self.name = name\r
- if not os.path.isdir(self.getfullname()):\r
- raise Error, 'no folder %s' % name\r
-\r
- def __repr__(self):\r
- """String representation."""\r
- return 'Folder(%r, %r)' % (self.mh, self.name)\r
-\r
- def error(self, *args):\r
- """Error message handler."""\r
- self.mh.error(*args)\r
-\r
- def getfullname(self):\r
- """Return the full pathname of the folder."""\r
- return os.path.join(self.mh.path, self.name)\r
-\r
- def getsequencesfilename(self):\r
- """Return the full pathname of the folder's sequences file."""\r
- return os.path.join(self.getfullname(), MH_SEQUENCES)\r
-\r
- def getmessagefilename(self, n):\r
- """Return the full pathname of a message in the folder."""\r
- return os.path.join(self.getfullname(), str(n))\r
-\r
- def listsubfolders(self):\r
- """Return list of direct subfolders."""\r
- return self.mh.listsubfolders(self.name)\r
-\r
- def listallsubfolders(self):\r
- """Return list of all subfolders."""\r
- return self.mh.listallsubfolders(self.name)\r
-\r
- def listmessages(self):\r
- """Return the list of messages currently present in the folder.\r
- As a side effect, set self.last to the last message (or 0)."""\r
- messages = []\r
- match = numericprog.match\r
- append = messages.append\r
- for name in os.listdir(self.getfullname()):\r
- if match(name):\r
- append(name)\r
- messages = map(int, messages)\r
- messages.sort()\r
- if messages:\r
- self.last = messages[-1]\r
- else:\r
- self.last = 0\r
- return messages\r
-\r
- def getsequences(self):\r
- """Return the set of sequences for the folder."""\r
- sequences = {}\r
- fullname = self.getsequencesfilename()\r
- try:\r
- f = open(fullname, 'r')\r
- except IOError:\r
- return sequences\r
- while 1:\r
- line = f.readline()\r
- if not line: break\r
- fields = line.split(':')\r
- if len(fields) != 2:\r
- self.error('bad sequence in %s: %s' %\r
- (fullname, line.strip()))\r
- key = fields[0].strip()\r
- value = IntSet(fields[1].strip(), ' ').tolist()\r
- sequences[key] = value\r
- return sequences\r
-\r
- def putsequences(self, sequences):\r
- """Write the set of sequences back to the folder."""\r
- fullname = self.getsequencesfilename()\r
- f = None\r
- for key, seq in sequences.iteritems():\r
- s = IntSet('', ' ')\r
- s.fromlist(seq)\r
- if not f: f = open(fullname, 'w')\r
- f.write('%s: %s\n' % (key, s.tostring()))\r
- if not f:\r
- try:\r
- os.unlink(fullname)\r
- except os.error:\r
- pass\r
- else:\r
- f.close()\r
-\r
- def getcurrent(self):\r
- """Return the current message. Raise Error when there is none."""\r
- seqs = self.getsequences()\r
- try:\r
- return max(seqs['cur'])\r
- except (ValueError, KeyError):\r
- raise Error, "no cur message"\r
-\r
- def setcurrent(self, n):\r
- """Set the current message."""\r
- updateline(self.getsequencesfilename(), 'cur', str(n), 0)\r
-\r
- def parsesequence(self, seq):\r
- """Parse an MH sequence specification into a message list.\r
- Attempt to mimic mh-sequence(5) as close as possible.\r
- Also attempt to mimic observed behavior regarding which\r
- conditions cause which error messages."""\r
- # XXX Still not complete (see mh-format(5)).\r
- # Missing are:\r
- # - 'prev', 'next' as count\r
- # - Sequence-Negation option\r
- all = self.listmessages()\r
- # Observed behavior: test for empty folder is done first\r
- if not all:\r
- raise Error, "no messages in %s" % self.name\r
- # Common case first: all is frequently the default\r
- if seq == 'all':\r
- return all\r
- # Test for X:Y before X-Y because 'seq:-n' matches both\r
- i = seq.find(':')\r
- if i >= 0:\r
- head, dir, tail = seq[:i], '', seq[i+1:]\r
- if tail[:1] in '-+':\r
- dir, tail = tail[:1], tail[1:]\r
- if not isnumeric(tail):\r
- raise Error, "bad message list %s" % seq\r
- try:\r
- count = int(tail)\r
- except (ValueError, OverflowError):\r
- # Can't use sys.maxint because of i+count below\r
- count = len(all)\r
- try:\r
- anchor = self._parseindex(head, all)\r
- except Error, msg:\r
- seqs = self.getsequences()\r
- if not head in seqs:\r
- if not msg:\r
- msg = "bad message list %s" % seq\r
- raise Error, msg, sys.exc_info()[2]\r
- msgs = seqs[head]\r
- if not msgs:\r
- raise Error, "sequence %s empty" % head\r
- if dir == '-':\r
- return msgs[-count:]\r
- else:\r
- return msgs[:count]\r
- else:\r
- if not dir:\r
- if head in ('prev', 'last'):\r
- dir = '-'\r
- if dir == '-':\r
- i = bisect(all, anchor)\r
- return all[max(0, i-count):i]\r
- else:\r
- i = bisect(all, anchor-1)\r
- return all[i:i+count]\r
- # Test for X-Y next\r
- i = seq.find('-')\r
- if i >= 0:\r
- begin = self._parseindex(seq[:i], all)\r
- end = self._parseindex(seq[i+1:], all)\r
- i = bisect(all, begin-1)\r
- j = bisect(all, end)\r
- r = all[i:j]\r
- if not r:\r
- raise Error, "bad message list %s" % seq\r
- return r\r
- # Neither X:Y nor X-Y; must be a number or a (pseudo-)sequence\r
- try:\r
- n = self._parseindex(seq, all)\r
- except Error, msg:\r
- seqs = self.getsequences()\r
- if not seq in seqs:\r
- if not msg:\r
- msg = "bad message list %s" % seq\r
- raise Error, msg\r
- return seqs[seq]\r
- else:\r
- if n not in all:\r
- if isnumeric(seq):\r
- raise Error, "message %d doesn't exist" % n\r
- else:\r
- raise Error, "no %s message" % seq\r
- else:\r
- return [n]\r
-\r
- def _parseindex(self, seq, all):\r
- """Internal: parse a message number (or cur, first, etc.)."""\r
- if isnumeric(seq):\r
- try:\r
- return int(seq)\r
- except (OverflowError, ValueError):\r
- return sys.maxint\r
- if seq in ('cur', '.'):\r
- return self.getcurrent()\r
- if seq == 'first':\r
- return all[0]\r
- if seq == 'last':\r
- return all[-1]\r
- if seq == 'next':\r
- n = self.getcurrent()\r
- i = bisect(all, n)\r
- try:\r
- return all[i]\r
- except IndexError:\r
- raise Error, "no next message"\r
- if seq == 'prev':\r
- n = self.getcurrent()\r
- i = bisect(all, n-1)\r
- if i == 0:\r
- raise Error, "no prev message"\r
- try:\r
- return all[i-1]\r
- except IndexError:\r
- raise Error, "no prev message"\r
- raise Error, None\r
-\r
- def openmessage(self, n):\r
- """Open a message -- returns a Message object."""\r
- return Message(self, n)\r
-\r
- def removemessages(self, list):\r
- """Remove one or more messages -- may raise os.error."""\r
- errors = []\r
- deleted = []\r
- for n in list:\r
- path = self.getmessagefilename(n)\r
- commapath = self.getmessagefilename(',' + str(n))\r
- try:\r
- os.unlink(commapath)\r
- except os.error:\r
- pass\r
- try:\r
- os.rename(path, commapath)\r
- except os.error, msg:\r
- errors.append(msg)\r
- else:\r
- deleted.append(n)\r
- if deleted:\r
- self.removefromallsequences(deleted)\r
- if errors:\r
- if len(errors) == 1:\r
- raise os.error, errors[0]\r
- else:\r
- raise os.error, ('multiple errors:', errors)\r
-\r
- def refilemessages(self, list, tofolder, keepsequences=0):\r
- """Refile one or more messages -- may raise os.error.\r
- 'tofolder' is an open folder object."""\r
- errors = []\r
- refiled = {}\r
- for n in list:\r
- ton = tofolder.getlast() + 1\r
- path = self.getmessagefilename(n)\r
- topath = tofolder.getmessagefilename(ton)\r
- try:\r
- os.rename(path, topath)\r
- except os.error:\r
- # Try copying\r
- try:\r
- shutil.copy2(path, topath)\r
- os.unlink(path)\r
- except (IOError, os.error), msg:\r
- errors.append(msg)\r
- try:\r
- os.unlink(topath)\r
- except os.error:\r
- pass\r
- continue\r
- tofolder.setlast(ton)\r
- refiled[n] = ton\r
- if refiled:\r
- if keepsequences:\r
- tofolder._copysequences(self, refiled.items())\r
- self.removefromallsequences(refiled.keys())\r
- if errors:\r
- if len(errors) == 1:\r
- raise os.error, errors[0]\r
- else:\r
- raise os.error, ('multiple errors:', errors)\r
-\r
- def _copysequences(self, fromfolder, refileditems):\r
- """Helper for refilemessages() to copy sequences."""\r
- fromsequences = fromfolder.getsequences()\r
- tosequences = self.getsequences()\r
- changed = 0\r
- for name, seq in fromsequences.items():\r
- try:\r
- toseq = tosequences[name]\r
- new = 0\r
- except KeyError:\r
- toseq = []\r
- new = 1\r
- for fromn, ton in refileditems:\r
- if fromn in seq:\r
- toseq.append(ton)\r
- changed = 1\r
- if new and toseq:\r
- tosequences[name] = toseq\r
- if changed:\r
- self.putsequences(tosequences)\r
-\r
- def movemessage(self, n, tofolder, ton):\r
- """Move one message over a specific destination message,\r
- which may or may not already exist."""\r
- path = self.getmessagefilename(n)\r
- # Open it to check that it exists\r
- f = open(path)\r
- f.close()\r
- del f\r
- topath = tofolder.getmessagefilename(ton)\r
- backuptopath = tofolder.getmessagefilename(',%d' % ton)\r
- try:\r
- os.rename(topath, backuptopath)\r
- except os.error:\r
- pass\r
- try:\r
- os.rename(path, topath)\r
- except os.error:\r
- # Try copying\r
- ok = 0\r
- try:\r
- tofolder.setlast(None)\r
- shutil.copy2(path, topath)\r
- ok = 1\r
- finally:\r
- if not ok:\r
- try:\r
- os.unlink(topath)\r
- except os.error:\r
- pass\r
- os.unlink(path)\r
- self.removefromallsequences([n])\r
-\r
- def copymessage(self, n, tofolder, ton):\r
- """Copy one message over a specific destination message,\r
- which may or may not already exist."""\r
- path = self.getmessagefilename(n)\r
- # Open it to check that it exists\r
- f = open(path)\r
- f.close()\r
- del f\r
- topath = tofolder.getmessagefilename(ton)\r
- backuptopath = tofolder.getmessagefilename(',%d' % ton)\r
- try:\r
- os.rename(topath, backuptopath)\r
- except os.error:\r
- pass\r
- ok = 0\r
- try:\r
- tofolder.setlast(None)\r
- shutil.copy2(path, topath)\r
- ok = 1\r
- finally:\r
- if not ok:\r
- try:\r
- os.unlink(topath)\r
- except os.error:\r
- pass\r
-\r
- def createmessage(self, n, txt):\r
- """Create a message, with text from the open file txt."""\r
- path = self.getmessagefilename(n)\r
- backuppath = self.getmessagefilename(',%d' % n)\r
- try:\r
- os.rename(path, backuppath)\r
- except os.error:\r
- pass\r
- ok = 0\r
- BUFSIZE = 16*1024\r
- try:\r
- f = open(path, "w")\r
- while 1:\r
- buf = txt.read(BUFSIZE)\r
- if not buf:\r
- break\r
- f.write(buf)\r
- f.close()\r
- ok = 1\r
- finally:\r
- if not ok:\r
- try:\r
- os.unlink(path)\r
- except os.error:\r
- pass\r
-\r
- def removefromallsequences(self, list):\r
- """Remove one or more messages from all sequences (including last)\r
- -- but not from 'cur'!!!"""\r
- if hasattr(self, 'last') and self.last in list:\r
- del self.last\r
- sequences = self.getsequences()\r
- changed = 0\r
- for name, seq in sequences.items():\r
- if name == 'cur':\r
- continue\r
- for n in list:\r
- if n in seq:\r
- seq.remove(n)\r
- changed = 1\r
- if not seq:\r
- del sequences[name]\r
- if changed:\r
- self.putsequences(sequences)\r
-\r
- def getlast(self):\r
- """Return the last message number."""\r
- if not hasattr(self, 'last'):\r
- self.listmessages() # Set self.last\r
- return self.last\r
-\r
- def setlast(self, last):\r
- """Set the last message number."""\r
- if last is None:\r
- if hasattr(self, 'last'):\r
- del self.last\r
- else:\r
- self.last = last\r
-\r
-class Message(mimetools.Message):\r
-\r
- def __init__(self, f, n, fp = None):\r
- """Constructor."""\r
- self.folder = f\r
- self.number = n\r
- if fp is None:\r
- path = f.getmessagefilename(n)\r
- fp = open(path, 'r')\r
- mimetools.Message.__init__(self, fp)\r
-\r
- def __repr__(self):\r
- """String representation."""\r
- return 'Message(%s, %s)' % (repr(self.folder), self.number)\r
-\r
- def getheadertext(self, pred = None):\r
- """Return the message's header text as a string. If an\r
- argument is specified, it is used as a filter predicate to\r
- decide which headers to return (its argument is the header\r
- name converted to lower case)."""\r
- if pred is None:\r
- return ''.join(self.headers)\r
- headers = []\r
- hit = 0\r
- for line in self.headers:\r
- if not line[0].isspace():\r
- i = line.find(':')\r
- if i > 0:\r
- hit = pred(line[:i].lower())\r
- if hit: headers.append(line)\r
- return ''.join(headers)\r
-\r
- def getbodytext(self, decode = 1):\r
- """Return the message's body text as string. This undoes a\r
- Content-Transfer-Encoding, but does not interpret other MIME\r
- features (e.g. multipart messages). To suppress decoding,\r
- pass 0 as an argument."""\r
- self.fp.seek(self.startofbody)\r
- encoding = self.getencoding()\r
- if not decode or encoding in ('', '7bit', '8bit', 'binary'):\r
- return self.fp.read()\r
- try:\r
- from cStringIO import StringIO\r
- except ImportError:\r
- from StringIO import StringIO\r
- output = StringIO()\r
- mimetools.decode(self.fp, output, encoding)\r
- return output.getvalue()\r
-\r
- def getbodyparts(self):\r
- """Only for multipart messages: return the message's body as a\r
- list of SubMessage objects. Each submessage object behaves\r
- (almost) as a Message object."""\r
- if self.getmaintype() != 'multipart':\r
- raise Error, 'Content-Type is not multipart/*'\r
- bdry = self.getparam('boundary')\r
- if not bdry:\r
- raise Error, 'multipart/* without boundary param'\r
- self.fp.seek(self.startofbody)\r
- mf = multifile.MultiFile(self.fp)\r
- mf.push(bdry)\r
- parts = []\r
- while mf.next():\r
- n = "%s.%r" % (self.number, 1 + len(parts))\r
- part = SubMessage(self.folder, n, mf)\r
- parts.append(part)\r
- mf.pop()\r
- return parts\r
-\r
- def getbody(self):\r
- """Return body, either a string or a list of messages."""\r
- if self.getmaintype() == 'multipart':\r
- return self.getbodyparts()\r
- else:\r
- return self.getbodytext()\r
-\r
-\r
-class SubMessage(Message):\r
-\r
- def __init__(self, f, n, fp):\r
- """Constructor."""\r
- Message.__init__(self, f, n, fp)\r
- if self.getmaintype() == 'multipart':\r
- self.body = Message.getbodyparts(self)\r
- else:\r
- self.body = Message.getbodytext(self)\r
- self.bodyencoded = Message.getbodytext(self, decode=0)\r
- # XXX If this is big, should remember file pointers\r
-\r
- def __repr__(self):\r
- """String representation."""\r
- f, n, fp = self.folder, self.number, self.fp\r
- return 'SubMessage(%s, %s, %s)' % (f, n, fp)\r
-\r
- def getbodytext(self, decode = 1):\r
- if not decode:\r
- return self.bodyencoded\r
- if type(self.body) == type(''):\r
- return self.body\r
-\r
- def getbodyparts(self):\r
- if type(self.body) == type([]):\r
- return self.body\r
-\r
- def getbody(self):\r
- return self.body\r
-\r
-\r
-class IntSet:\r
- """Class implementing sets of integers.\r
-\r
- This is an efficient representation for sets consisting of several\r
- continuous ranges, e.g. 1-100,200-400,402-1000 is represented\r
- internally as a list of three pairs: [(1,100), (200,400),\r
- (402,1000)]. The internal representation is always kept normalized.\r
-\r
- The constructor has up to three arguments:\r
- - the string used to initialize the set (default ''),\r
- - the separator between ranges (default ',')\r
- - the separator between begin and end of a range (default '-')\r
- The separators must be strings (not regexprs) and should be different.\r
-\r
- The tostring() function yields a string that can be passed to another\r
- IntSet constructor; __repr__() is a valid IntSet constructor itself.\r
- """\r
-\r
- # XXX The default begin/end separator means that negative numbers are\r
- # not supported very well.\r
- #\r
- # XXX There are currently no operations to remove set elements.\r
-\r
- def __init__(self, data = None, sep = ',', rng = '-'):\r
- self.pairs = []\r
- self.sep = sep\r
- self.rng = rng\r
- if data: self.fromstring(data)\r
-\r
- def reset(self):\r
- self.pairs = []\r
-\r
- def __cmp__(self, other):\r
- return cmp(self.pairs, other.pairs)\r
-\r
- def __hash__(self):\r
- return hash(self.pairs)\r
-\r
- def __repr__(self):\r
- return 'IntSet(%r, %r, %r)' % (self.tostring(), self.sep, self.rng)\r
-\r
- def normalize(self):\r
- self.pairs.sort()\r
- i = 1\r
- while i < len(self.pairs):\r
- alo, ahi = self.pairs[i-1]\r
- blo, bhi = self.pairs[i]\r
- if ahi >= blo-1:\r
- self.pairs[i-1:i+1] = [(alo, max(ahi, bhi))]\r
- else:\r
- i = i+1\r
-\r
- def tostring(self):\r
- s = ''\r
- for lo, hi in self.pairs:\r
- if lo == hi: t = repr(lo)\r
- else: t = repr(lo) + self.rng + repr(hi)\r
- if s: s = s + (self.sep + t)\r
- else: s = t\r
- return s\r
-\r
- def tolist(self):\r
- l = []\r
- for lo, hi in self.pairs:\r
- m = range(lo, hi+1)\r
- l = l + m\r
- return l\r
-\r
- def fromlist(self, list):\r
- for i in list:\r
- self.append(i)\r
-\r
- def clone(self):\r
- new = IntSet()\r
- new.pairs = self.pairs[:]\r
- return new\r
-\r
- def min(self):\r
- return self.pairs[0][0]\r
-\r
- def max(self):\r
- return self.pairs[-1][-1]\r
-\r
- def contains(self, x):\r
- for lo, hi in self.pairs:\r
- if lo <= x <= hi: return True\r
- return False\r
-\r
- def append(self, x):\r
- for i in range(len(self.pairs)):\r
- lo, hi = self.pairs[i]\r
- if x < lo: # Need to insert before\r
- if x+1 == lo:\r
- self.pairs[i] = (x, hi)\r
- else:\r
- self.pairs.insert(i, (x, x))\r
- if i > 0 and x-1 == self.pairs[i-1][1]:\r
- # Merge with previous\r
- self.pairs[i-1:i+1] = [\r
- (self.pairs[i-1][0],\r
- self.pairs[i][1])\r
- ]\r
- return\r
- if x <= hi: # Already in set\r
- return\r
- i = len(self.pairs) - 1\r
- if i >= 0:\r
- lo, hi = self.pairs[i]\r
- if x-1 == hi:\r
- self.pairs[i] = lo, x\r
- return\r
- self.pairs.append((x, x))\r
-\r
- def addpair(self, xlo, xhi):\r
- if xlo > xhi: return\r
- self.pairs.append((xlo, xhi))\r
- self.normalize()\r
-\r
- def fromstring(self, data):\r
- new = []\r
- for part in data.split(self.sep):\r
- list = []\r
- for subp in part.split(self.rng):\r
- s = subp.strip()\r
- list.append(int(s))\r
- if len(list) == 1:\r
- new.append((list[0], list[0]))\r
- elif len(list) == 2 and list[0] <= list[1]:\r
- new.append((list[0], list[1]))\r
- else:\r
- raise ValueError, 'bad data passed to IntSet'\r
- self.pairs = self.pairs + new\r
- self.normalize()\r
-\r
-\r
-# Subroutines to read/write entries in .mh_profile and .mh_sequences\r
-\r
-def pickline(file, key, casefold = 1):\r
- try:\r
- f = open(file, 'r')\r
- except IOError:\r
- return None\r
- pat = re.escape(key) + ':'\r
- prog = re.compile(pat, casefold and re.IGNORECASE)\r
- while 1:\r
- line = f.readline()\r
- if not line: break\r
- if prog.match(line):\r
- text = line[len(key)+1:]\r
- while 1:\r
- line = f.readline()\r
- if not line or not line[0].isspace():\r
- break\r
- text = text + line\r
- return text.strip()\r
- return None\r
-\r
-def updateline(file, key, value, casefold = 1):\r
- try:\r
- f = open(file, 'r')\r
- lines = f.readlines()\r
- f.close()\r
- except IOError:\r
- lines = []\r
- pat = re.escape(key) + ':(.*)\n'\r
- prog = re.compile(pat, casefold and re.IGNORECASE)\r
- if value is None:\r
- newline = None\r
- else:\r
- newline = '%s: %s\n' % (key, value)\r
- for i in range(len(lines)):\r
- line = lines[i]\r
- if prog.match(line):\r
- if newline is None:\r
- del lines[i]\r
- else:\r
- lines[i] = newline\r
- break\r
- else:\r
- if newline is not None:\r
- lines.append(newline)\r
- tempfile = file + "~"\r
- f = open(tempfile, 'w')\r
- for line in lines:\r
- f.write(line)\r
- f.close()\r
- os.rename(tempfile, file)\r
-\r
-\r
-# Test program\r
-\r
-def test():\r
- global mh, f\r
- os.system('rm -rf $HOME/Mail/@test')\r
- mh = MH()\r
- def do(s): print s; print eval(s)\r
- do('mh.listfolders()')\r
- do('mh.listallfolders()')\r
- testfolders = ['@test', '@test/test1', '@test/test2',\r
- '@test/test1/test11', '@test/test1/test12',\r
- '@test/test1/test11/test111']\r
- for t in testfolders: do('mh.makefolder(%r)' % (t,))\r
- do('mh.listsubfolders(\'@test\')')\r
- do('mh.listallsubfolders(\'@test\')')\r
- f = mh.openfolder('@test')\r
- do('f.listsubfolders()')\r
- do('f.listallsubfolders()')\r
- do('f.getsequences()')\r
- seqs = f.getsequences()\r
- seqs['foo'] = IntSet('1-10 12-20', ' ').tolist()\r
- print seqs\r
- f.putsequences(seqs)\r
- do('f.getsequences()')\r
- for t in reversed(testfolders): do('mh.deletefolder(%r)' % (t,))\r
- do('mh.getcontext()')\r
- context = mh.getcontext()\r
- f = mh.openfolder(context)\r
- do('f.getcurrent()')\r
- for seq in ('first', 'last', 'cur', '.', 'prev', 'next',\r
- 'first:3', 'last:3', 'cur:3', 'cur:-3',\r
- 'prev:3', 'next:3',\r
- '1:3', '1:-3', '100:3', '100:-3', '10000:3', '10000:-3',\r
- 'all'):\r
- try:\r
- do('f.parsesequence(%r)' % (seq,))\r
- except Error, msg:\r
- print "Error:", msg\r
- stuff = os.popen("pick %r 2>/dev/null" % (seq,)).read()\r
- list = map(int, stuff.split())\r
- print list, "<-- pick"\r
- do('f.listmessages()')\r
-\r
-\r
-if __name__ == '__main__':\r
- test()\r