# git.proxmox.com Git - mirror_qemu.git/blob - tests/qemu-iotests/qcow2_format.py
# Library for manipulations with qcow2 image
#
# Copyright (c) 2020 Virtuozzo International GmbH.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
class Qcow2StructMeta(type):
    """Metaclass that derives a ``struct`` format string from ``fields``.

    When a class body defines ``fields`` (a sequence of tuples whose first
    element is a c-type name), the class gets a ``fmt`` attribute: a
    big-endian (``'>'``) ``struct`` format with one code per field, in order.
    Classes that do not define ``fields`` themselves are left untouched and
    simply inherit ``fmt`` from a parent, if any.
    """

    # Mapping from c types to python struct format
    ctypes = {
        'u8': 'B',
        'u16': 'H',
        'u32': 'I',
        'u64': 'Q',
    }

    def __init__(self, name, bases, attrs):
        super().__init__(name, bases, attrs)
        # A layout-less base class has no 'fields' of its own; without this
        # guard creating such a class would fail on the attribute lookup.
        if 'fields' in attrs:
            # An unknown c-type name raises KeyError at class-creation time.
            self.fmt = '>' + ''.join(self.ctypes[f[0]] for f in self.fields)
class Qcow2Struct(metaclass=Qcow2StructMeta):

    """Qcow2Struct: base class for qcow2 data structures

    Successors should define fields class variable, which is: list of tuples,
    each of three elements:
        - c-type (one of 'u8', 'u16', 'u32', 'u64')
        - format (format_spec to use with .format() when dump or 'mask' to dump
                  bitmasks)
        - field name
    """

    def __init__(self, fd=None, offset=None, data=None):
        """
        Two variants:
            1. Specify data. fd and offset must be None.
            2. Specify fd and offset, data must be None. offset may be omitted
               in this case, then current position of fd is used.

        The raw bytes are unpacked with the class-level ``fmt`` and every
        value is stored as an instance attribute named after its field.
        """
        if data is None:
            assert fd is not None
            buf_size = struct.calcsize(self.fmt)
            if offset is not None:
                fd.seek(offset)
            data = fd.read(buf_size)
        else:
            assert fd is None and offset is None

        values = struct.unpack(self.fmt, data)
        # Replacing __dict__ wholesale makes the parsed fields the only
        # instance attributes at this point.
        self.__dict__ = {
            field[2]: value for field, value in zip(self.fields, values)
        }

    def dump(self):
        """Print every field as '<name padded to 25 columns> <value>'.

        Fields whose format is 'mask' are printed as the list of set bit
        positions (bits 0..63); all others use their format spec.
        """
        for _ctype, fmt, name in self.fields:
            value = self.__dict__[name]
            if fmt == 'mask':
                bits = [bit for bit in range(64) if value & (1 << bit)]
                value_str = str(bits)
            else:
                value_str = fmt.format(value)

            print('{:<25} {}'.format(name, value_str))
class QcowHeaderExtension:
    """One qcow2 header extension: a magic number plus a padded payload.

    Attributes:
        magic:  extension type identifier.
        length: payload length in bytes, without padding.
        data:   payload, zero-padded up to a multiple of 8 bytes.
    """

    def __init__(self, magic, length, data):
        """Store the extension, padding *data* to an 8-byte boundary.

        *data* may be the bare payload (``len(data) == length``) or may
        already include its padding; in the latter case it is kept as is
        instead of being padded a second time.
        """
        padded_length = (length + 7) & ~7
        # ljust only ever appends, and appends nothing when data is already
        # long enough, so pre-padded input is not grown again.
        data = data.ljust(padded_length, b'\0')

        self.magic = magic
        self.length = length
        self.data = data

    @classmethod
    def create(cls, magic, data):
        """Build an extension from a bare payload, deriving its length."""
        return cls(magic, len(data), data)
class QcowHeader(Qcow2Struct):
    """The qcow2 image header, its header extensions and backing file name.

    Besides the packed header fields listed in ``fields``, an instance
    carries ``cluster_size``, ``extensions`` (list of QcowHeaderExtension)
    and ``backing_file`` (bytes, or None when the image has none).
    """

    fields = (
        # Version 2 header fields
        ('u32', '{:#x}', 'magic'),
        ('u32', '{}', 'version'),
        ('u64', '{:#x}', 'backing_file_offset'),
        ('u32', '{:#x}', 'backing_file_size'),
        ('u32', '{}', 'cluster_bits'),
        ('u64', '{}', 'size'),
        ('u32', '{}', 'crypt_method'),
        ('u32', '{}', 'l1_size'),
        ('u64', '{:#x}', 'l1_table_offset'),
        ('u64', '{:#x}', 'refcount_table_offset'),
        ('u32', '{}', 'refcount_table_clusters'),
        ('u32', '{}', 'nb_snapshots'),
        ('u64', '{:#x}', 'snapshot_offset'),

        # Version 3 header fields
        ('u64', 'mask', 'incompatible_features'),
        ('u64', 'mask', 'compatible_features'),
        ('u64', 'mask', 'autoclear_features'),
        ('u32', '{}', 'refcount_order'),
        ('u32', '{}', 'header_length'),
    )

    def __init__(self, fd):
        """Parse the header, extensions and backing file name from *fd*.

        *fd* must be a seekable binary file object positioned anywhere; the
        header is always read from offset 0.
        """
        super().__init__(fd=fd, offset=0)

        # Must run before header_length is used: a version 2 header has no
        # version 3 fields, so the bytes unpacked into them are not valid.
        self.set_defaults()
        self.cluster_size = 1 << self.cluster_bits

        fd.seek(self.header_length)
        self.load_extensions(fd)

        if self.backing_file_offset:
            fd.seek(self.backing_file_offset)
            self.backing_file = fd.read(self.backing_file_size)
        else:
            self.backing_file = None

    def set_defaults(self):
        """Overwrite the version 3 fields with fixed values for version 2."""
        if self.version == 2:
            self.incompatible_features = 0
            self.compatible_features = 0
            self.autoclear_features = 0
            self.refcount_order = 4
            self.header_length = 72

    def load_extensions(self, fd):
        """Read header extensions from the current position of *fd*.

        Reading stops at the end marker (magic 0), at the end of the first
        cluster, at the backing file name, or at end of file, whichever
        comes first. The result is stored in ``self.extensions``.
        """
        self.extensions = []

        if self.backing_file_offset != 0:
            end = min(self.cluster_size, self.backing_file_offset)
        else:
            end = self.cluster_size

        while fd.tell() < end:
            ext_header = fd.read(8)
            if len(ext_header) < 8:
                # Truncated file: stop instead of failing inside unpack.
                break
            magic, length = struct.unpack('>II', ext_header)
            if magic == 0:
                break

            padded = (length + 7) & ~7
            data = fd.read(padded)
            # Hand over the bare payload; QcowHeaderExtension adds the
            # padding itself, so passing padded data would pad it twice.
            self.extensions.append(
                QcowHeaderExtension(magic, length, data[:length]))

    def update_extensions(self, fd):
        """Write all extensions, an end marker and the backing file name.

        Updates ``self.backing_file_offset`` to where the name was written.

        Raises:
            Exception: if the written data runs past the first cluster.
        """
        fd.seek(self.header_length)
        # Build a separate list: appending the end marker to self.extensions
        # would leave it in the in-memory list and add one more on each call.
        end_marker = QcowHeaderExtension(0, 0, b'')
        for ex in [*self.extensions, end_marker]:
            fd.write(struct.pack('>II', ex.magic, ex.length))
            fd.write(ex.data)

        if self.backing_file is not None:
            self.backing_file_offset = fd.tell()
            fd.write(self.backing_file)

        if fd.tell() > self.cluster_size:
            raise Exception('I think I just broke the image...')

    def update(self, fd):
        """Write the extensions and then the header itself back to *fd*."""
        header_bytes = self.header_length

        # Extensions go first because writing them may move the backing
        # file name, which changes backing_file_offset in the header.
        self.update_extensions(fd)

        fd.seek(0)
        header = tuple(self.__dict__[f] for t, p, f in QcowHeader.fields)
        buf = struct.pack(QcowHeader.fmt, *header)
        # Write exactly header_length bytes; slicing to header_bytes - 1
        # would leave the last header byte on disk stale.
        buf = buf[:header_bytes]
        fd.write(buf)

    def dump_extensions(self):
        """Print magic, length and payload of every loaded extension.

        A payload made only of printable ASCII is shown quoted; anything
        else is shown as '<binary>'.
        """
        # Built once rather than once per payload byte.
        printable = frozenset(string.printable.encode('ascii'))

        for ex in self.extensions:

            data = ex.data[:ex.length]
            if all(c in printable for c in data):
                data = f"'{ data.decode('ascii') }'"
            else:
                data = '<binary>'

            print('Header extension:')
            print(f'{"magic":<25} {ex.magic:#x}')
            print(f'{"length":<25} {ex.length}')
            print(f'{"data":<25} {data}')
            print()