]>
Commit | Line | Data |
---|---|---|
3257aa99 DM |
1 | """Macintosh binhex compression/decompression.\r |
2 | \r | |
3 | easy interface:\r | |
4 | binhex(inputfilename, outputfilename)\r | |
5 | hexbin(inputfilename, outputfilename)\r | |
6 | """\r | |
7 | \r | |
8 | #\r | |
9 | # Jack Jansen, CWI, August 1995.\r | |
10 | #\r | |
11 | # The module is supposed to be as compatible as possible. Especially the\r | |
12 | # easy interface should work "as expected" on any platform.\r | |
13 | # XXXX Note: currently, textfiles appear in mac-form on all platforms.\r | |
14 | # We seem to lack a simple character-translate in python.\r | |
15 | # (we should probably use ISO-Latin-1 on all but the mac platform).\r | |
16 | # XXXX The simple routines are too simple: they expect to hold the complete\r | |
17 | # files in-core. Should be fixed.\r | |
18 | # XXXX It would be nice to handle AppleDouble format on unix\r | |
19 | # (for servers serving macs).\r | |
20 | # XXXX I don't understand what happens when you get 0x90 times the same byte on\r | |
21 | # input. The resulting code (xx 90 90) would appear to be interpreted as an\r | |
22 | # escaped *value* of 0x90. All coders I've seen appear to ignore this nicety...\r | |
23 | #\r | |
24 | import sys\r | |
25 | import os\r | |
26 | import struct\r | |
27 | import binascii\r | |
28 | \r | |
29 | __all__ = ["binhex","hexbin","Error"]\r | |
30 | \r | |
class Error(Exception):
    """Exception raised by this module's encoder and decoder classes."""
33 | \r | |
# Progress markers stored in the ``state`` attribute of BinHex / HexBin:
# the header has been handled, or the data fork has been handled as well.
_DID_HEADER = 0
_DID_DATA = 1

# Tunables and format constants.
REASONABLY_LARGE = 32 * 1024  # Minimal amount we pass the rle-coder
LINELEN = 64                  # Characters per encoded output line
RUNCHAR = '\x90'              # run-length introducer (byte 0x90)
42 | \r | |
43 | #\r | |
44 | # This code is no longer byte-order dependent\r | |
45 | \r | |
46 | #\r | |
47 | # Workarounds for non-mac machines.\r | |
try:
    from Carbon.File import FSSpec, FInfo
    from MacOS import openrf

    def getfileinfo(name):
        """Return (filename, finfo, data_fork_size, resource_fork_size).

        ``finfo`` comes from the file system; both sizes are found by
        seeking to the end of the respective fork.  Every handle opened
        here is closed again, even if seeking fails.
        """
        finfo = FSSpec(name).FSpGetFInfo()
        basename = os.path.split(name)[1]
        # XXX Get resource/data sizes
        fp = open(name, 'rb')
        try:
            fp.seek(0, 2)
            dlen = fp.tell()
        finally:
            fp.close()
        fp = openrf(name, '*rb')
        try:
            fp.seek(0, 2)
            rlen = fp.tell()
        finally:
            fp.close()
        return basename, finfo, dlen, rlen

    def openrsrc(name, *mode):
        """Open the resource fork of ``name``; the mode defaults to 'rb'."""
        if not mode:
            mode = '*rb'
        else:
            mode = '*' + mode[0]
        return openrf(name, mode)

except ImportError:
    #
    # Glue code for non-macintosh usage
    #

    class FInfo:
        """Stand-in finder-info record holding Type, Creator and Flags."""

        def __init__(self):
            self.Type = '????'
            self.Creator = '????'
            self.Flags = 0

    def getfileinfo(name):
        """Return (filename, finfo, data_size, 0) for the file ``name``.

        The file type is set to 'TEXT' when the first 256 characters
        contain no non-whitespace control characters and nothing above
        0x7f.  The resource fork size is always reported as 0.  The first
        ':' in the file name is replaced by '-'.
        """
        finfo = FInfo()
        # A single handle serves both the text sniff and the size probe,
        # and it is closed even if reading or seeking fails.
        fp = open(name)
        try:
            # Quick check for textfile
            data = fp.read(256)
            for c in data:
                if not c.isspace() and (c < ' ' or ord(c) > 0x7f):
                    break
            else:
                finfo.Type = 'TEXT'
            fp.seek(0, 2)
            dsize = fp.tell()
        finally:
            fp.close()
        basename = os.path.split(name)[1]
        basename = basename.replace(':', '-', 1)
        return basename, finfo, dsize, 0

    class openrsrc:
        """Dummy resource fork: reads return '' and writes are discarded."""

        def __init__(self, *args):
            pass

        def read(self, *args):
            return ''

        def write(self, *args):
            pass

        def close(self):
            pass
111 | \r | |
class _Hqxcoderengine:
    """Write data to the coder in 3-byte chunks"""

    def __init__(self, ofp):
        self.ofp = ofp
        self.data = ''       # raw input not yet encoded (fewer than 3 bytes)
        self.hqxdata = ''    # encoded text not yet written as a full line
        # The first output line is one character shorter than the rest.
        self.linelen = LINELEN - 1

    def write(self, data):
        """Buffer ``data`` and encode every complete 3-byte group."""
        pending = self.data + data
        usable = (len(pending) // 3) * 3
        chunk = pending[:usable]
        self.data = pending[usable:]
        if chunk:
            self.hqxdata += binascii.b2a_hqx(chunk)
            self._flush(0)

    def _flush(self, force):
        """Write all complete lines; if ``force``, also the ':'-ended tail."""
        start = 0
        while start <= len(self.hqxdata) - self.linelen:
            stop = start + self.linelen
            self.ofp.write(self.hqxdata[start:stop] + '\n')
            # Every line after the first one uses the full width.
            self.linelen = LINELEN
            start = stop
        self.hqxdata = self.hqxdata[start:]
        if force:
            self.ofp.write(self.hqxdata + ':\n')

    def close(self):
        """Encode the leftover bytes, emit the final line, close the output."""
        if self.data:
            self.hqxdata += binascii.b2a_hqx(self.data)
        self._flush(1)
        self.ofp.close()
        del self.ofp
150 | \r | |
class _Rlecoderengine:
    """Write data to the RLE-coder in suitably large chunks"""

    def __init__(self, ofp):
        self.ofp = ofp
        self.data = ''   # input collected until it is worth compressing

    def write(self, data):
        """Collect ``data``; compress and forward once enough has piled up."""
        self.data = self.data + data
        if len(self.data) >= REASONABLY_LARGE:
            self.ofp.write(binascii.rlecode_hqx(self.data))
            self.data = ''

    def close(self):
        """Compress whatever is still buffered and close the output."""
        if self.data:
            self.ofp.write(binascii.rlecode_hqx(self.data))
        self.ofp.close()
        del self.ofp
172 | \r | |
class BinHex:
    """Encoder writing a BinHex 4.0 stream.

    The stream consists of a header, the data fork and the resource fork;
    each part is followed by a 2-byte big-endian CRC.  Output passes through
    a run-length coder and then the hqx coder before it reaches ``ofp``.
    """

    def __init__(self, name_finfo_dlen_rlen, ofp):
        """Write the banner and the header.

        name_finfo_dlen_rlen -- (name, finfo, dlen, rlen) tuple, as returned
            by getfileinfo(); ``finfo`` may be None, in which case a default
            FInfo() is used.
        ofp -- output file name, or an object with write() and close().
        """
        name, finfo, dlen, rlen = name_finfo_dlen_rlen
        if isinstance(ofp, str):
            ofname = ofp
            ofp = open(ofname, 'w')
        ofp.write('(This file must be converted with BinHex 4.0)\n\n:')
        hqxer = _Hqxcoderengine(ofp)
        self.ofp = _Rlecoderengine(hqxer)
        self.crc = 0
        if finfo is None:
            finfo = FInfo()
        # dlen/rlen count down as data is written; both must reach zero.
        self.dlen = dlen
        self.rlen = rlen
        self._writeinfo(name, finfo)
        self.state = _DID_HEADER

    def _writeinfo(self, name, finfo):
        """Write the header (name, type, creator, flags, sizes) and its CRC.

        Raises Error if the name is longer than 63 characters.
        """
        nl = len(name)
        if nl > 63:
            raise Error('Filename too long')
        d = chr(nl) + name + '\0'
        d2 = finfo.Type + finfo.Creator

        # Force all structs to be packed with big-endian
        d3 = struct.pack('>h', finfo.Flags)
        d4 = struct.pack('>ii', self.dlen, self.rlen)
        info = d + d2 + d3 + d4
        self._write(info)
        self._writecrc()

    def _write(self, data):
        """Fold ``data`` into the running CRC and pass it to the coder chain."""
        self.crc = binascii.crc_hqx(data, self.crc)
        self.ofp.write(data)

    def _writecrc(self):
        """Write the running CRC as 2 big-endian bytes and reset it to 0."""
        # XXXX Should this be here??
        # self.crc = binascii.crc_hqx('\0\0', self.crc)
        # A negative value only fits the signed format code.
        if self.crc < 0:
            fmt = '>h'
        else:
            fmt = '>H'
        self.ofp.write(struct.pack(fmt, self.crc))
        self.crc = 0

    def write(self, data):
        """Write data-fork bytes; only allowed before close_data()."""
        if self.state != _DID_HEADER:
            raise Error('Writing data at the wrong time')
        self.dlen = self.dlen - len(data)
        self._write(data)

    def close_data(self):
        """Finish the data fork and write its CRC.

        Raises Error if the number of bytes written differs from the data
        size announced in the header.
        """
        if self.dlen != 0:
            # Report the data-fork discrepancy (not the resource-fork count).
            raise Error('Incorrect data size, diff=%r' % (self.dlen,))
        self._writecrc()
        self.state = _DID_DATA

    def write_rsrc(self, data):
        """Write resource-fork bytes, closing the data fork first if needed."""
        if self.state < _DID_DATA:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Writing resource data at the wrong time')
        self.rlen = self.rlen - len(data)
        self._write(data)

    def close(self):
        """Finish the resource fork, write its CRC and close the output.

        Calling close() again afterwards does nothing.  Raises Error if the
        resource byte count does not match the header; the output is closed
        in any case.
        """
        if self.state is None:
            return
        try:
            if self.state < _DID_DATA:
                self.close_data()
            if self.state != _DID_DATA:
                raise Error('Close at the wrong time')
            if self.rlen != 0:
                raise Error(
                    "Incorrect resource-datasize, diff=%r" % (self.rlen,))
            self._writecrc()
        finally:
            self.state = None
            ofp = self.ofp
            del self.ofp
            ofp.close()
255 | \r | |
def binhex(inp, out):
    """(infilename, outfilename) - Create binhex-encoded copy of a file"""
    finfo = getfileinfo(inp)
    ofp = BinHex(finfo, out)

    # Data fork.  The input handle is released even if encoding fails.
    ifp = open(inp, 'rb')
    try:
        # XXXX Do textfile translation on non-mac systems
        while 1:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write(d)
        ofp.close_data()
    finally:
        ifp.close()

    # Resource fork.
    ifp = openrsrc(inp, 'rb')
    try:
        while 1:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write_rsrc(d)
        ofp.close()
    finally:
        ifp.close()
277 | \r | |
class _Hqxdecoderengine:
    """Read data via the decoder in 4-byte chunks"""

    def __init__(self, ifp):
        self.ifp = ifp
        self.eof = 0   # set from the "done" flag returned by a2b_hqx

    def read(self, totalwtd):
        """Read at least wtd bytes (or until EOF)

        Raises Error when the input runs out before the decoder has
        reported the end of the hqx data.
        """
        decdata = ''
        wtd = totalwtd
        #
        # The loop here is convoluted, since we don't really know how
        # much to decode: there may be newlines in the incoming data.
        while wtd > 0:
            if self.eof:
                return decdata
            # Four encoded characters are requested per three decoded bytes.
            wtd = ((wtd + 2) // 3) * 4
            data = self.ifp.read(wtd)
            #
            # Next problem: there may not be a complete number of
            # bytes in what we pass to a2b. Solve by yet another
            # loop.
            #
            while 1:
                try:
                    decdatacur, self.eof = binascii.a2b_hqx(data)
                    break
                except binascii.Incomplete:
                    pass
                newdata = self.ifp.read(1)
                if not newdata:
                    raise Error('Premature EOF on binhex file')
                data = data + newdata
            decdata = decdata + decdatacur
            wtd = totalwtd - len(decdata)
            if not decdata and not self.eof:
                raise Error('Premature EOF on binhex file')
        return decdata

    def close(self):
        """Close the underlying input."""
        self.ifp.close()
321 | \r | |
class _Rledecoderengine:
    """Read data via the RLE-coder"""

    def __init__(self, ifp):
        self.ifp = ifp
        self.pre_buffer = ''    # input still run-length encoded
        self.post_buffer = ''   # expanded bytes not yet handed out
        self.eof = 0

    def read(self, wtd):
        """Return up to ``wtd`` expanded bytes, refilling when short."""
        shortfall = wtd - len(self.post_buffer)
        if shortfall > 0:
            self._fill(shortfall)
        result = self.post_buffer[:wtd]
        self.post_buffer = self.post_buffer[wtd:]
        return result

    def _fill(self, wtd):
        """Read more encoded input and expand as much of it as is safe."""
        self.pre_buffer = self.pre_buffer + self.ifp.read(wtd + 4)
        if self.ifp.eof:
            # Nothing more will arrive: expand everything that is left.
            self.post_buffer = self.post_buffer + \
                binascii.rledecode_hqx(self.pre_buffer)
            self.pre_buffer = ''
            return

        #
        # Obfuscated code ahead. We have to take care that we don't
        # end up with an orphaned RUNCHAR later on. So, we keep a couple
        # of bytes in the buffer, depending on what the end of
        # the buffer looks like:
        # '\220\0\220' - Keep 3 bytes: repeated \220 (escaped as \220\0)
        # '?\220' - Keep 2 bytes: repeated something-else
        # '\220\0' - Escaped \220: Keep 2 bytes.
        # '?\220?' - Complete repeat sequence: decode all
        # otherwise: keep 1 byte.
        #
        buf = self.pre_buffer
        if buf[-3:] == RUNCHAR + '\0' + RUNCHAR:
            keep = 3
        elif buf[-1] == RUNCHAR:
            keep = 2
        elif buf[-2:] == RUNCHAR + '\0':
            keep = 2
        elif buf[-2] == RUNCHAR:
            keep = 0  # Decode all
        else:
            keep = 1
        mark = len(buf) - keep

        self.post_buffer = self.post_buffer + \
            binascii.rledecode_hqx(buf[:mark])
        self.pre_buffer = buf[mark:]

    def close(self):
        """Close the underlying input."""
        self.ifp.close()
375 | \r | |
class HexBin:
    """Decoder reading a binhex stream.

    After construction the header has been parsed: ``FName`` holds the file
    name, ``FInfo`` the type/creator/flags record, and ``dlen`` / ``rlen``
    the number of data-fork and resource-fork bytes still to be read.
    """

    def __init__(self, ifp):
        """Locate the start of the binhex data in ``ifp`` and read the header.

        ifp -- input file name, or an object with read(), readline() and
            close().
        Raises Error if no line starting with ':' is found.
        """
        if isinstance(ifp, str):
            ifp = open(ifp)
        #
        # Find initial colon.
        #
        while 1:
            ch = ifp.read(1)
            if not ch:
                raise Error("No binhex data found")
            # Cater for \r\n terminated lines (which show up as \n\r, hence
            # all lines start with \r)
            if ch == '\r':
                continue
            if ch == ':':
                break
            if ch != '\n':
                # Not the start of the data: skip the rest of this line.
                ifp.readline()

        hqxifp = _Hqxdecoderengine(ifp)
        self.ifp = _Rledecoderengine(hqxifp)
        self.crc = 0
        self._readheader()

    def _read(self, len):
        """Read up to ``len`` decoded bytes and fold them into the CRC."""
        data = self.ifp.read(len)
        self.crc = binascii.crc_hqx(data, self.crc)
        return data

    def _checkcrc(self):
        """Compare the next 2 stream bytes with the running CRC, then reset it.

        Raises Error on a mismatch.
        """
        filecrc = struct.unpack('>h', self.ifp.read(2))[0] & 0xffff
        #self.crc = binascii.crc_hqx('\0\0', self.crc)
        # XXXX Is this needed??
        self.crc = self.crc & 0xffff
        if filecrc != self.crc:
            raise Error('CRC error, computed %x, read %x'
                        % (self.crc, filecrc))
        self.crc = 0

    def _readheader(self):
        """Parse the header and verify its CRC."""
        namelen = self._read(1)
        fname = self._read(ord(namelen))
        # One byte that is skipped, then type (4), creator (4), flags (2),
        # data length (4) and resource length (4).
        rest = self._read(1 + 4 + 4 + 2 + 4 + 4)
        self._checkcrc()

        ftype = rest[1:5]
        creator = rest[5:9]
        flags = struct.unpack('>h', rest[9:11])[0]
        self.dlen = struct.unpack('>l', rest[11:15])[0]
        self.rlen = struct.unpack('>l', rest[15:19])[0]

        self.FName = fname
        self.FInfo = FInfo()
        self.FInfo.Creator = creator
        self.FInfo.Type = ftype
        self.FInfo.Flags = flags

        self.state = _DID_HEADER

    def read(self, *n):
        """Read up to n bytes of the data fork (all that remains if omitted).

        Raises Error when called after the data fork has been closed, or
        when the stream ends before the requested bytes are available.
        """
        if self.state != _DID_HEADER:
            raise Error('Read data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.dlen)
        else:
            n = self.dlen
        rv = ''
        while len(rv) < n:
            chunk = self._read(n - len(rv))
            if not chunk:
                # The stream ended although the header promised more data;
                # without this check the loop would never terminate.
                raise Error('Premature EOF on binhex file')
            rv = rv + chunk
        self.dlen = self.dlen - n
        return rv

    def close_data(self):
        """Skip any unread data-fork bytes and verify the data-fork CRC."""
        if self.state != _DID_HEADER:
            raise Error('close_data at wrong time')
        if self.dlen:
            self._read(self.dlen)
        self._checkcrc()
        self.state = _DID_DATA

    def read_rsrc(self, *n):
        """Read up to n bytes of the resource fork (all left if omitted).

        Closes the data fork first if that has not been done yet.
        """
        if self.state == _DID_HEADER:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Read resource data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.rlen)
        else:
            n = self.rlen
        self.rlen = self.rlen - n
        return self._read(n)

    def close(self):
        """Skip unread resource bytes, verify the CRC and close the input.

        Calling close() again afterwards does nothing.  The input is closed
        even when the CRC check fails.
        """
        if self.state is None:
            return
        try:
            if self.rlen:
                self.read_rsrc(self.rlen)
            self._checkcrc()
        finally:
            self.state = None
            self.ifp.close()
481 | \r | |
def hexbin(inp, out):
    """(infilename, outfilename) - Decode binhexed file

    If ``out`` is empty, the file name stored in the binhex header is used.
    """
    ifp = HexBin(inp)
    if not out:
        out = ifp.FName

    # Data fork.  The output handle is released even if decoding fails.
    ofp = open(out, 'wb')
    try:
        # XXXX Do translation on non-mac systems
        while 1:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write(d)
    finally:
        ofp.close()
    ifp.close_data()

    # Resource fork: only opened when there is something to write.
    d = ifp.read_rsrc(128000)
    if d:
        ofp = openrsrc(out, 'wb')
        try:
            ofp.write(d)
            while 1:
                d = ifp.read_rsrc(128000)
                if not d:
                    break
                ofp.write(d)
        finally:
            ofp.close()

    ifp.close()
509 | \r | |
def _test():
    """Round-trip the file named by sys.argv[1] through binhex and hexbin.

    Writes <name>.hqx and then decodes it again into <name>.viahqx.
    """
    source = sys.argv[1]
    encoded = source + '.hqx'
    binhex(source, encoded)
    hexbin(encoded, source + '.viahqx')
    # NOTE(review): the exit status is 1 even after a successful round trip
    # -- confirm whether that is intended.
    sys.exit(1)


if __name__ == '__main__':
    _test()