+++ /dev/null
-\r
-"""\r
-csv.py - read/write/investigate CSV files\r
-"""\r
-\r
-import re\r
-from functools import reduce\r
-from _csv import Error, __version__, writer, reader, register_dialect, \\r
- unregister_dialect, get_dialect, list_dialects, \\r
- field_size_limit, \\r
- QUOTE_MINIMAL, QUOTE_ALL, QUOTE_NONNUMERIC, QUOTE_NONE, \\r
- __doc__\r
-from _csv import Dialect as _Dialect\r
-\r
-try:\r
- from cStringIO import StringIO\r
-except ImportError:\r
- from StringIO import StringIO\r
-\r
-__all__ = [ "QUOTE_MINIMAL", "QUOTE_ALL", "QUOTE_NONNUMERIC", "QUOTE_NONE",\r
- "Error", "Dialect", "__doc__", "excel", "excel_tab",\r
- "field_size_limit", "reader", "writer",\r
- "register_dialect", "get_dialect", "list_dialects", "Sniffer",\r
- "unregister_dialect", "__version__", "DictReader", "DictWriter" ]\r
-\r
-class Dialect:\r
- """Describe an Excel dialect.\r
-\r
- This must be subclassed (see csv.excel). Valid attributes are:\r
- delimiter, quotechar, escapechar, doublequote, skipinitialspace,\r
- lineterminator, quoting.\r
-\r
- """\r
- _name = ""\r
- _valid = False\r
- # placeholders\r
- delimiter = None\r
- quotechar = None\r
- escapechar = None\r
- doublequote = None\r
- skipinitialspace = None\r
- lineterminator = None\r
- quoting = None\r
-\r
- def __init__(self):\r
- if self.__class__ != Dialect:\r
- self._valid = True\r
- self._validate()\r
-\r
- def _validate(self):\r
- try:\r
- _Dialect(self)\r
- except TypeError, e:\r
- # We do this for compatibility with py2.3\r
- raise Error(str(e))\r
-\r
-class excel(Dialect):\r
- """Describe the usual properties of Excel-generated CSV files."""\r
- delimiter = ','\r
- quotechar = '"'\r
- doublequote = True\r
- skipinitialspace = False\r
- lineterminator = '\r\n'\r
- quoting = QUOTE_MINIMAL\r
-register_dialect("excel", excel)\r
-\r
-class excel_tab(excel):\r
- """Describe the usual properties of Excel-generated TAB-delimited files."""\r
- delimiter = '\t'\r
-register_dialect("excel-tab", excel_tab)\r
-\r
-\r
-class DictReader:\r
- def __init__(self, f, fieldnames=None, restkey=None, restval=None,\r
- dialect="excel", *args, **kwds):\r
- self._fieldnames = fieldnames # list of keys for the dict\r
- self.restkey = restkey # key to catch long rows\r
- self.restval = restval # default value for short rows\r
- self.reader = reader(f, dialect, *args, **kwds)\r
- self.dialect = dialect\r
- self.line_num = 0\r
-\r
- def __iter__(self):\r
- return self\r
-\r
- @property\r
- def fieldnames(self):\r
- if self._fieldnames is None:\r
- try:\r
- self._fieldnames = self.reader.next()\r
- except StopIteration:\r
- pass\r
- self.line_num = self.reader.line_num\r
- return self._fieldnames\r
-\r
- @fieldnames.setter\r
- def fieldnames(self, value):\r
- self._fieldnames = value\r
-\r
- def next(self):\r
- if self.line_num == 0:\r
- # Used only for its side effect.\r
- self.fieldnames\r
- row = self.reader.next()\r
- self.line_num = self.reader.line_num\r
-\r
- # unlike the basic reader, we prefer not to return blanks,\r
- # because we will typically wind up with a dict full of None\r
- # values\r
- while row == []:\r
- row = self.reader.next()\r
- d = dict(zip(self.fieldnames, row))\r
- lf = len(self.fieldnames)\r
- lr = len(row)\r
- if lf < lr:\r
- d[self.restkey] = row[lf:]\r
- elif lf > lr:\r
- for key in self.fieldnames[lr:]:\r
- d[key] = self.restval\r
- return d\r
-\r
-\r
-class DictWriter:\r
- def __init__(self, f, fieldnames, restval="", extrasaction="raise",\r
- dialect="excel", *args, **kwds):\r
- self.fieldnames = fieldnames # list of keys for the dict\r
- self.restval = restval # for writing short dicts\r
- if extrasaction.lower() not in ("raise", "ignore"):\r
- raise ValueError, \\r
- ("extrasaction (%s) must be 'raise' or 'ignore'" %\r
- extrasaction)\r
- self.extrasaction = extrasaction\r
- self.writer = writer(f, dialect, *args, **kwds)\r
-\r
- def writeheader(self):\r
- header = dict(zip(self.fieldnames, self.fieldnames))\r
- self.writerow(header)\r
-\r
- def _dict_to_list(self, rowdict):\r
- if self.extrasaction == "raise":\r
- wrong_fields = [k for k in rowdict if k not in self.fieldnames]\r
- if wrong_fields:\r
- raise ValueError("dict contains fields not in fieldnames: " +\r
- ", ".join(wrong_fields))\r
- return [rowdict.get(key, self.restval) for key in self.fieldnames]\r
-\r
- def writerow(self, rowdict):\r
- return self.writer.writerow(self._dict_to_list(rowdict))\r
-\r
- def writerows(self, rowdicts):\r
- rows = []\r
- for rowdict in rowdicts:\r
- rows.append(self._dict_to_list(rowdict))\r
- return self.writer.writerows(rows)\r
-\r
-# Guard Sniffer's type checking against builds that exclude complex()\r
-try:\r
- complex\r
-except NameError:\r
- complex = float\r
-\r
-class Sniffer:\r
- '''\r
- "Sniffs" the format of a CSV file (i.e. delimiter, quotechar)\r
- Returns a Dialect object.\r
- '''\r
- def __init__(self):\r
- # in case there is more than one possible delimiter\r
- self.preferred = [',', '\t', ';', ' ', ':']\r
-\r
-\r
- def sniff(self, sample, delimiters=None):\r
- """\r
- Returns a dialect (or None) corresponding to the sample\r
- """\r
-\r
- quotechar, doublequote, delimiter, skipinitialspace = \\r
- self._guess_quote_and_delimiter(sample, delimiters)\r
- if not delimiter:\r
- delimiter, skipinitialspace = self._guess_delimiter(sample,\r
- delimiters)\r
-\r
- if not delimiter:\r
- raise Error, "Could not determine delimiter"\r
-\r
- class dialect(Dialect):\r
- _name = "sniffed"\r
- lineterminator = '\r\n'\r
- quoting = QUOTE_MINIMAL\r
- # escapechar = ''\r
-\r
- dialect.doublequote = doublequote\r
- dialect.delimiter = delimiter\r
- # _csv.reader won't accept a quotechar of ''\r
- dialect.quotechar = quotechar or '"'\r
- dialect.skipinitialspace = skipinitialspace\r
-\r
- return dialect\r
-\r
-\r
- def _guess_quote_and_delimiter(self, data, delimiters):\r
- """\r
- Looks for text enclosed between two identical quotes\r
- (the probable quotechar) which are preceded and followed\r
- by the same character (the probable delimiter).\r
- For example:\r
- ,'some text',\r
- The quote with the most wins, same with the delimiter.\r
- If there is no quotechar the delimiter can't be determined\r
- this way.\r
- """\r
-\r
- matches = []\r
- for restr in ('(?P<delim>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?P=delim)', # ,".*?",\r
- '(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?P<delim>[^\w\n"\'])(?P<space> ?)', # ".*?",\r
- '(?P<delim>>[^\w\n"\'])(?P<space> ?)(?P<quote>["\']).*?(?P=quote)(?:$|\n)', # ,".*?"\r
- '(?:^|\n)(?P<quote>["\']).*?(?P=quote)(?:$|\n)'): # ".*?" (no delim, no space)\r
- regexp = re.compile(restr, re.DOTALL | re.MULTILINE)\r
- matches = regexp.findall(data)\r
- if matches:\r
- break\r
-\r
- if not matches:\r
- # (quotechar, doublequote, delimiter, skipinitialspace)\r
- return ('', False, None, 0)\r
- quotes = {}\r
- delims = {}\r
- spaces = 0\r
- for m in matches:\r
- n = regexp.groupindex['quote'] - 1\r
- key = m[n]\r
- if key:\r
- quotes[key] = quotes.get(key, 0) + 1\r
- try:\r
- n = regexp.groupindex['delim'] - 1\r
- key = m[n]\r
- except KeyError:\r
- continue\r
- if key and (delimiters is None or key in delimiters):\r
- delims[key] = delims.get(key, 0) + 1\r
- try:\r
- n = regexp.groupindex['space'] - 1\r
- except KeyError:\r
- continue\r
- if m[n]:\r
- spaces += 1\r
-\r
- quotechar = reduce(lambda a, b, quotes = quotes:\r
- (quotes[a] > quotes[b]) and a or b, quotes.keys())\r
-\r
- if delims:\r
- delim = reduce(lambda a, b, delims = delims:\r
- (delims[a] > delims[b]) and a or b, delims.keys())\r
- skipinitialspace = delims[delim] == spaces\r
- if delim == '\n': # most likely a file with a single column\r
- delim = ''\r
- else:\r
- # there is *no* delimiter, it's a single column of quoted data\r
- delim = ''\r
- skipinitialspace = 0\r
-\r
- # if we see an extra quote between delimiters, we've got a\r
- # double quoted format\r
- dq_regexp = re.compile(r"((%(delim)s)|^)\W*%(quote)s[^%(delim)s\n]*%(quote)s[^%(delim)s\n]*%(quote)s\W*((%(delim)s)|$)" % \\r
- {'delim':delim, 'quote':quotechar}, re.MULTILINE)\r
-\r
-\r
-\r
- if dq_regexp.search(data):\r
- doublequote = True\r
- else:\r
- doublequote = False\r
-\r
- return (quotechar, doublequote, delim, skipinitialspace)\r
-\r
-\r
- def _guess_delimiter(self, data, delimiters):\r
- """\r
- The delimiter /should/ occur the same number of times on\r
- each row. However, due to malformed data, it may not. We don't want\r
- an all or nothing approach, so we allow for small variations in this\r
- number.\r
- 1) build a table of the frequency of each character on every line.\r
- 2) build a table of frequencies of this frequency (meta-frequency?),\r
- e.g. 'x occurred 5 times in 10 rows, 6 times in 1000 rows,\r
- 7 times in 2 rows'\r
- 3) use the mode of the meta-frequency to determine the /expected/\r
- frequency for that character\r
- 4) find out how often the character actually meets that goal\r
- 5) the character that best meets its goal is the delimiter\r
- For performance reasons, the data is evaluated in chunks, so it can\r
- try and evaluate the smallest portion of the data possible, evaluating\r
- additional chunks as necessary.\r
- """\r
-\r
- data = filter(None, data.split('\n'))\r
-\r
- ascii = [chr(c) for c in range(127)] # 7-bit ASCII\r
-\r
- # build frequency tables\r
- chunkLength = min(10, len(data))\r
- iteration = 0\r
- charFrequency = {}\r
- modes = {}\r
- delims = {}\r
- start, end = 0, min(chunkLength, len(data))\r
- while start < len(data):\r
- iteration += 1\r
- for line in data[start:end]:\r
- for char in ascii:\r
- metaFrequency = charFrequency.get(char, {})\r
- # must count even if frequency is 0\r
- freq = line.count(char)\r
- # value is the mode\r
- metaFrequency[freq] = metaFrequency.get(freq, 0) + 1\r
- charFrequency[char] = metaFrequency\r
-\r
- for char in charFrequency.keys():\r
- items = charFrequency[char].items()\r
- if len(items) == 1 and items[0][0] == 0:\r
- continue\r
- # get the mode of the frequencies\r
- if len(items) > 1:\r
- modes[char] = reduce(lambda a, b: a[1] > b[1] and a or b,\r
- items)\r
- # adjust the mode - subtract the sum of all\r
- # other frequencies\r
- items.remove(modes[char])\r
- modes[char] = (modes[char][0], modes[char][1]\r
- - reduce(lambda a, b: (0, a[1] + b[1]),\r
- items)[1])\r
- else:\r
- modes[char] = items[0]\r
-\r
- # build a list of possible delimiters\r
- modeList = modes.items()\r
- total = float(chunkLength * iteration)\r
- # (rows of consistent data) / (number of rows) = 100%\r
- consistency = 1.0\r
- # minimum consistency threshold\r
- threshold = 0.9\r
- while len(delims) == 0 and consistency >= threshold:\r
- for k, v in modeList:\r
- if v[0] > 0 and v[1] > 0:\r
- if ((v[1]/total) >= consistency and\r
- (delimiters is None or k in delimiters)):\r
- delims[k] = v\r
- consistency -= 0.01\r
-\r
- if len(delims) == 1:\r
- delim = delims.keys()[0]\r
- skipinitialspace = (data[0].count(delim) ==\r
- data[0].count("%c " % delim))\r
- return (delim, skipinitialspace)\r
-\r
- # analyze another chunkLength lines\r
- start = end\r
- end += chunkLength\r
-\r
- if not delims:\r
- return ('', 0)\r
-\r
- # if there's more than one, fall back to a 'preferred' list\r
- if len(delims) > 1:\r
- for d in self.preferred:\r
- if d in delims.keys():\r
- skipinitialspace = (data[0].count(d) ==\r
- data[0].count("%c " % d))\r
- return (d, skipinitialspace)\r
-\r
- # nothing else indicates a preference, pick the character that\r
- # dominates(?)\r
- items = [(v,k) for (k,v) in delims.items()]\r
- items.sort()\r
- delim = items[-1][1]\r
-\r
- skipinitialspace = (data[0].count(delim) ==\r
- data[0].count("%c " % delim))\r
- return (delim, skipinitialspace)\r
-\r
-\r
- def has_header(self, sample):\r
- # Creates a dictionary of types of data in each column. If any\r
- # column is of a single type (say, integers), *except* for the first\r
- # row, then the first row is presumed to be labels. If the type\r
- # can't be determined, it is assumed to be a string in which case\r
- # the length of the string is the determining factor: if all of the\r
- # rows except for the first are the same length, it's a header.\r
- # Finally, a 'vote' is taken at the end for each column, adding or\r
- # subtracting from the likelihood of the first row being a header.\r
-\r
- rdr = reader(StringIO(sample), self.sniff(sample))\r
-\r
- header = rdr.next() # assume first row is header\r
-\r
- columns = len(header)\r
- columnTypes = {}\r
- for i in range(columns): columnTypes[i] = None\r
-\r
- checked = 0\r
- for row in rdr:\r
- # arbitrary number of rows to check, to keep it sane\r
- if checked > 20:\r
- break\r
- checked += 1\r
-\r
- if len(row) != columns:\r
- continue # skip rows that have irregular number of columns\r
-\r
- for col in columnTypes.keys():\r
-\r
- for thisType in [int, long, float, complex]:\r
- try:\r
- thisType(row[col])\r
- break\r
- except (ValueError, OverflowError):\r
- pass\r
- else:\r
- # fallback to length of string\r
- thisType = len(row[col])\r
-\r
- # treat longs as ints\r
- if thisType == long:\r
- thisType = int\r
-\r
- if thisType != columnTypes[col]:\r
- if columnTypes[col] is None: # add new column type\r
- columnTypes[col] = thisType\r
- else:\r
- # type is inconsistent, remove column from\r
- # consideration\r
- del columnTypes[col]\r
-\r
- # finally, compare results against first row and "vote"\r
- # on whether it's a header\r
- hasHeader = 0\r
- for col, colType in columnTypes.items():\r
- if type(colType) == type(0): # it's a length\r
- if len(header[col]) != colType:\r
- hasHeader += 1\r
- else:\r
- hasHeader -= 1\r
- else: # attempt typecast\r
- try:\r
- colType(header[col])\r
- except (ValueError, TypeError):\r
- hasHeader += 1\r
- else:\r
- hasHeader -= 1\r
-\r
- return hasHeader > 0\r