]> git.proxmox.com Git - qemu.git/blob - tests/qemu-iotests/qcow2.py
rtl8139: do not assume TxStatus[] and TxAddr[] are adjacent
[qemu.git] / tests / qemu-iotests / qcow2.py
1 #!/usr/bin/env python
2
3 import sys
4 import struct
5 import string
6
7 class QcowHeaderExtension:
8
9 def __init__(self, magic, length, data):
10 self.magic = magic
11 self.length = length
12 self.data = data
13
14 @classmethod
15 def create(cls, magic, data):
16 return QcowHeaderExtension(magic, len(data), data)
17
18 class QcowHeader:
19
20 uint32_t = 'I'
21 uint64_t = 'Q'
22
23 fields = [
24 # Version 2 header fields
25 [ uint32_t, '%#x', 'magic' ],
26 [ uint32_t, '%d', 'version' ],
27 [ uint64_t, '%#x', 'backing_file_offset' ],
28 [ uint32_t, '%#x', 'backing_file_size' ],
29 [ uint32_t, '%d', 'cluster_bits' ],
30 [ uint64_t, '%d', 'size' ],
31 [ uint32_t, '%d', 'crypt_method' ],
32 [ uint32_t, '%d', 'l1_size' ],
33 [ uint64_t, '%#x', 'l1_table_offset' ],
34 [ uint64_t, '%#x', 'refcount_table_offset' ],
35 [ uint32_t, '%d', 'refcount_table_clusters' ],
36 [ uint32_t, '%d', 'nb_snapshots' ],
37 [ uint64_t, '%#x', 'snapshot_offset' ],
38 ];
39
40 fmt = '>' + ''.join(field[0] for field in fields)
41
42 def __init__(self, fd):
43
44 buf_size = struct.calcsize(QcowHeader.fmt)
45
46 fd.seek(0)
47 buf = fd.read(buf_size)
48
49 header = struct.unpack(QcowHeader.fmt, buf)
50 self.__dict__ = dict((field[2], header[i])
51 for i, field in enumerate(QcowHeader.fields))
52
53 self.cluster_size = 1 << self.cluster_bits
54
55 fd.seek(self.get_header_length())
56 self.load_extensions(fd)
57
58 if self.backing_file_offset:
59 fd.seek(self.backing_file_offset)
60 self.backing_file = fd.read(self.backing_file_size)
61 else:
62 self.backing_file = None
63
64 def get_header_length(self):
65 if self.version == 2:
66 return 72
67 else:
68 raise Exception("version != 2 not supported")
69
70 def load_extensions(self, fd):
71 self.extensions = []
72
73 if self.backing_file_offset != 0:
74 end = min(self.cluster_size, self.backing_file_offset)
75 else:
76 end = self.cluster_size
77
78 while fd.tell() < end:
79 (magic, length) = struct.unpack('>II', fd.read(8))
80 if magic == 0:
81 break
82 else:
83 padded = (length + 7) & ~7
84 data = fd.read(padded)
85 self.extensions.append(QcowHeaderExtension(magic, length, data))
86
87 def update_extensions(self, fd):
88
89 fd.seek(self.get_header_length())
90 extensions = self.extensions
91 extensions.append(QcowHeaderExtension(0, 0, ""))
92 for ex in extensions:
93 buf = struct.pack('>II', ex.magic, ex.length)
94 fd.write(buf)
95 fd.write(ex.data)
96
97 if self.backing_file != None:
98 self.backing_file_offset = fd.tell()
99 fd.write(self.backing_file)
100
101 if fd.tell() > self.cluster_size:
102 raise Exception("I think I just broke the image...")
103
104
105 def update(self, fd):
106 header_bytes = self.get_header_length()
107
108 self.update_extensions(fd)
109
110 fd.seek(0)
111 header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
112 buf = struct.pack(QcowHeader.fmt, *header)
113 buf = buf[0:header_bytes-1]
114 fd.write(buf)
115
116 def dump(self):
117 for f in QcowHeader.fields:
118 print "%-25s" % f[2], f[1] % self.__dict__[f[2]]
119 print ""
120
121 def dump_extensions(self):
122 for ex in self.extensions:
123
124 data = ex.data[:ex.length]
125 if all(c in string.printable for c in data):
126 data = "'%s'" % data
127 else:
128 data = "<binary>"
129
130 print "Header extension:"
131 print "%-25s %#x" % ("magic", ex.magic)
132 print "%-25s %d" % ("length", ex.length)
133 print "%-25s %s" % ("data", data)
134 print ""
135
136
137 def cmd_dump_header(fd):
138 h = QcowHeader(fd)
139 h.dump()
140 h.dump_extensions()
141
142 def cmd_add_header_ext(fd, magic, data):
143 try:
144 magic = int(magic, 0)
145 except:
146 print "'%s' is not a valid magic number" % magic
147 sys.exit(1)
148
149 h = QcowHeader(fd)
150 h.extensions.append(QcowHeaderExtension.create(magic, data))
151 h.update(fd)
152
153 def cmd_del_header_ext(fd, magic):
154 try:
155 magic = int(magic, 0)
156 except:
157 print "'%s' is not a valid magic number" % magic
158 sys.exit(1)
159
160 h = QcowHeader(fd)
161 found = False
162
163 for ex in h.extensions:
164 if ex.magic == magic:
165 found = True
166 h.extensions.remove(ex)
167
168 if not found:
169 print "No such header extension"
170 return
171
172 h.update(fd)
173
174 cmds = [
175 [ 'dump-header', cmd_dump_header, 0, 'Dump image header and header extensions' ],
176 [ 'add-header-ext', cmd_add_header_ext, 2, 'Add a header extension' ],
177 [ 'del-header-ext', cmd_del_header_ext, 1, 'Delete a header extension' ],
178 ]
179
180 def main(filename, cmd, args):
181 fd = open(filename, "r+b")
182 try:
183 for name, handler, num_args, desc in cmds:
184 if name != cmd:
185 continue
186 elif len(args) != num_args:
187 usage()
188 return
189 else:
190 handler(fd, *args)
191 return
192 print "Unknown command '%s'" % cmd
193 finally:
194 fd.close()
195
196 def usage():
197 print "Usage: %s <file> <cmd> [<arg>, ...]" % sys.argv[0]
198 print ""
199 print "Supported commands:"
200 for name, handler, num_args, desc in cmds:
201 print " %-20s - %s" % (name, desc)
202
203 if len(sys.argv) < 3:
204 usage()
205 sys.exit(1)
206
207 main(sys.argv[1], sys.argv[2], sys.argv[3:])