]> git.proxmox.com Git - mirror_qemu.git/blob - tests/qemu-iotests/qcow2_format.py
qcow2_format.py: separate generic functionality of structure classes
[mirror_qemu.git] / tests / qemu-iotests / qcow2_format.py
1 # Library for manipulations with qcow2 image
2 #
3 # Copyright (c) 2020 Virtuozzo International GmbH.
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
17 #
18
19 import struct
20 import string
21
22
class Qcow2StructMeta(type):
    """Metaclass that derives a ``struct`` format string from ``fields``.

    A class created with this metaclass whose body defines ``fields`` gets a
    ``fmt`` class attribute: '>' (big-endian) followed by one struct format
    code per field, looked up from the field's c-type in ``ctypes``.
    """

    # Mapping from c types to python struct format
    ctypes = {
        'u8': 'B',
        'u16': 'H',
        'u32': 'I',
        'u64': 'Q'
    }

    def __init__(cls, name, bases, attrs):
        # Only classes that define ``fields`` in their own body get ``fmt``
        # assigned here.
        if 'fields' not in attrs:
            return

        # The first element of every field tuple is its c-type name.
        codes = [cls.ctypes[field[0]] for field in cls.fields]
        cls.fmt = '>' + ''.join(codes)
36
37
class Qcow2Struct(metaclass=Qcow2StructMeta):
    """Base class for qcow2 data structures unpacked with ``struct``.

    Subclasses define a ``fields`` class variable: a list of tuples, each of
    three elements:
        - c-type (one of 'u8', 'u16', 'u32', 'u64')
        - format (format_spec to use with .format() when dumping, or 'mask'
          to dump bitmasks)
        - field name
    """

    def __init__(self, fd=None, offset=None, data=None):
        """Unpack the structure from raw bytes or from a file object.

        Two variants:
            1. Specify data. fd and offset must be None.
            2. Specify fd and offset, data must be None. offset may be
               omitted, in which case the current position of fd is used.
        """
        if data is not None:
            assert fd is None and offset is None
        else:
            assert fd is not None
            size = struct.calcsize(self.fmt)
            if offset is not None:
                fd.seek(offset)
            data = fd.read(size)

        unpacked = struct.unpack(self.fmt, data)
        # Every unpacked value becomes an instance attribute named after the
        # third element of the matching field tuple.
        self.__dict__ = {
            field[2]: value for field, value in zip(self.fields, unpacked)
        }

    def dump(self):
        """Print one line per field: the name, then the formatted value."""
        for field in self.fields:
            spec, name = field[1], field[2]
            value = self.__dict__[name]

            if spec == 'mask':
                # A bitmask is shown as the list of its set bit positions.
                set_bits = [bit for bit in range(64) if value & (1 << bit)]
                text = str(set_bits)
            else:
                text = spec.format(value)

            print('{:<25} {}'.format(name, text))
83
84
class QcowHeaderExtension:
    """One qcow2 header extension: magic, payload length and payload bytes.

    Attributes set by the constructor:
        magic:  the extension magic passed in
        length: the payload length passed in (padding not included)
        data:   the payload, zero-padded to a multiple of 8 bytes
    """

    def __init__(self, magic, length, data):
        """Store the extension, zero-padding data to an 8-byte boundary.

        data may be the bare payload (len(data) == length) or may already
        carry its padding; in both cases the stored data ends up as long as
        length rounded up to a multiple of 8.
        """
        # Pad up to the rounded length rather than by a fixed amount, so data
        # that is already padded does not get padded a second time.
        padded_length = (length + 7) & ~7
        if len(data) < padded_length:
            # Build a new object instead of using +=, so that a caller's
            # mutable buffer is never modified in place.
            data = data + b'\0' * (padded_length - len(data))

        self.magic = magic
        self.length = length
        self.data = data

    @classmethod
    def create(cls, magic, data):
        """Build an extension whose length is len(data).

        Instantiates cls, so calling create() on a subclass returns an
        instance of that subclass.
        """
        return cls(magic, len(data), data)
99
100
class QcowHeader(Qcow2Struct):
    """The qcow2 image header with its header extensions and backing file.

    Besides the attributes named in ``fields``, an instance carries:
        cluster_size: 1 << cluster_bits
        extensions:   list of QcowHeaderExtension read from the image
        backing_file: bytes read at backing_file_offset, or None when
                      backing_file_offset is 0
    """

    fields = (
        # Version 2 header fields
        ('u32', '{:#x}', 'magic'),
        ('u32', '{}', 'version'),
        ('u64', '{:#x}', 'backing_file_offset'),
        ('u32', '{:#x}', 'backing_file_size'),
        ('u32', '{}', 'cluster_bits'),
        ('u64', '{}', 'size'),
        ('u32', '{}', 'crypt_method'),
        ('u32', '{}', 'l1_size'),
        ('u64', '{:#x}', 'l1_table_offset'),
        ('u64', '{:#x}', 'refcount_table_offset'),
        ('u32', '{}', 'refcount_table_clusters'),
        ('u32', '{}', 'nb_snapshots'),
        ('u64', '{:#x}', 'snapshot_offset'),

        # Version 3 header fields
        ('u64', 'mask', 'incompatible_features'),
        ('u64', 'mask', 'compatible_features'),
        ('u64', 'mask', 'autoclear_features'),
        ('u32', '{}', 'refcount_order'),
        ('u32', '{}', 'header_length'),
    )

    def __init__(self, fd):
        """Read header, header extensions and backing file name from fd.

        fd is a seekable file object returning bytes; the header is read from
        offset 0 regardless of the current position.
        """
        super().__init__(fd=fd, offset=0)

        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits

        # Header extensions start right after the header itself.
        fd.seek(self.header_length)
        self.load_extensions(fd)

        if self.backing_file_offset:
            fd.seek(self.backing_file_offset)
            self.backing_file = fd.read(self.backing_file_size)
        else:
            self.backing_file = None

    def set_defaults(self):
        """Overwrite the version 3 fields with fixed values for version 2.

        All fields are always unpacked, so for a version 2 image the version 3
        attributes hold whatever bytes follow the shorter header; they are
        replaced here.
        """
        if self.version == 2:
            self.incompatible_features = 0
            self.compatible_features = 0
            self.autoclear_features = 0
            self.refcount_order = 4
            self.header_length = 72

    def load_extensions(self, fd):
        """Read header extensions from the current position of fd.

        Reading stops at a zero magic, at the end of the first cluster, or at
        backing_file_offset when that is non-zero and comes first. The result
        is stored in self.extensions.
        """
        self.extensions = []

        if self.backing_file_offset != 0:
            end = min(self.cluster_size, self.backing_file_offset)
        else:
            end = self.cluster_size

        while fd.tell() < end:
            (magic, length) = struct.unpack('>II', fd.read(8))
            if magic == 0:
                break

            # Consume the payload together with its padding so that the next
            # extension header is read from an 8-byte aligned position.
            padded = (length + 7) & ~7
            data = fd.read(padded)

            # Hand over only the payload: the constructor adds the padding
            # itself, and passing padded data could get it padded twice.
            self.extensions.append(QcowHeaderExtension(magic, length,
                                                       data[:length]))

    def update_extensions(self, fd):
        """Write extensions, end marker and backing file name to fd.

        Writing starts at header_length. When a backing file name is present,
        backing_file_offset is set to the position it is written at.

        Raises:
            Exception: if the written data runs past the first cluster.
        """
        fd.seek(self.header_length)

        # Build a new list for writing: appending the end marker to
        # self.extensions itself would leave a bogus extension in memory and
        # add one more marker on every call.
        extensions = self.extensions + [QcowHeaderExtension(0, 0, b'')]
        for ex in extensions:
            buf = struct.pack('>II', ex.magic, ex.length)
            fd.write(buf)
            fd.write(ex.data)

        if self.backing_file is not None:
            self.backing_file_offset = fd.tell()
            fd.write(self.backing_file)

        if fd.tell() > self.cluster_size:
            raise Exception('I think I just broke the image...')

    def update(self, fd):
        """Write extensions and then the header fields back to fd."""
        header_bytes = self.header_length

        # Extensions go first: writing them may change backing_file_offset,
        # which is part of the header packed below.
        self.update_extensions(fd)

        fd.seek(0)
        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
        buf = struct.pack(QcowHeader.fmt, *header)
        # Write exactly header_length bytes (fewer only if the packed header
        # is shorter); slicing to header_bytes - 1 dropped the last byte.
        buf = buf[:header_bytes]
        fd.write(buf)

    def dump_extensions(self):
        """Print magic, length and data of every loaded header extension.

        Data is printed as quoted text when every payload byte is a printable
        ASCII character, and as '<binary>' otherwise.
        """
        # Computed once instead of once per payload byte.
        printable = frozenset(string.printable.encode('ascii'))

        for ex in self.extensions:

            data = ex.data[:ex.length]
            if all(c in printable for c in data):
                data = f"'{ data.decode('ascii') }'"
            else:
                data = '<binary>'

            print('Header extension:')
            print(f'{"magic":<25} {ex.magic:#x}')
            print(f'{"length":<25} {ex.length}')
            print(f'{"data":<25} {data}')
            print()