]> git.proxmox.com Git - ceph.git/blob - ceph/src/ceph-volume/ceph_volume/util/encryption.py
update sources to v12.2.3
[ceph.git] / ceph / src / ceph-volume / ceph_volume / util / encryption.py
1 import base64
2 import os
3 import logging
4 from ceph_volume import process, conf
5 from ceph_volume.util import constants, system
6 from .prepare import write_keyring
7 from .disk import lsblk, device_family, get_part_entry_type
8
9 logger = logging.getLogger(__name__)
10
11
def create_dmcrypt_key():
    """
    Create the secret dm-crypt key used to decrypt a device.

    The key size (in bits) is read from the ``osd_dmcrypt_key_size`` option of
    the ``osd`` section in ceph.conf, falling back to 1024 when it is not set.

    :returns: the random key, base64 encoded and decoded to a text string
    """
    # get the customizable dmcrypt key size (in bits) from ceph.conf fallback
    # to the default of 1024
    dmcrypt_key_size = conf.ceph.get_safe(
        'osd',
        'osd_dmcrypt_key_size',
        default=1024,
    )
    # The size of the key is defined in bits, so we must transform that
    # value to bytes (dividing by 8) because we read in bytes, not bits.
    # ``os.urandom`` requires an integer: floor division keeps the result an
    # int on Python 3 (where ``/`` would produce a float), and ``int()`` covers
    # a configured value that is not already an int (presumably a string when
    # it comes from ceph.conf -- confirm against ``get_safe``).
    random_string = os.urandom(int(dmcrypt_key_size) // 8)
    key = base64.b64encode(random_string).decode('utf-8')
    return key
28
29
def luks_format(key, device):
    """
    Format a device with ``cryptsetup luksFormat``, using ``key`` as the secret

    :param key: dmcrypt secret key, handed to cryptsetup through stdin
    :param device: Absolute path to device
    """
    # '--batch-mode' avoids prompting, and '--key-file -' tells cryptsetup to
    # read the key itself from stdin (the flag name is a misnomer here)
    cryptsetup_args = ['cryptsetup', '--batch-mode', '--key-file', '-', 'luksFormat', device]
    process.call(
        cryptsetup_args,
        stdin=key,
        terminal_verbose=True,
        show_command=True,
    )
46
47
def plain_open(key, device, mapping):
    """
    Open an encrypted device with ``cryptsetup open`` using the plain type

    .. note: ceph-disk will require an additional b64decode call for this to work

    :param key: dmcrypt secret key, handed to cryptsetup through stdin
    :param device: absolute path to device
    :param mapping: mapping name used to correlate device. Usually a UUID
    """
    # '--key-file -' makes cryptsetup read the key from stdin
    open_args = ['cryptsetup', '--key-file', '-', 'open', device, mapping]
    plain_options = ['--type', 'plain', '--key-size', '256']
    process.call(
        open_args + plain_options,
        stdin=key,
        terminal_verbose=True,
        show_command=True,
    )
70
71
def luks_open(key, device, mapping):
    """
    Open an encrypted device with ``cryptsetup luksOpen``

    .. note: ceph-disk will require an additional b64decode call for this to work

    :param key: dmcrypt secret key, handed to cryptsetup through stdin
    :param device: absolute path to device
    :param mapping: mapping name used to correlate device. Usually a UUID
    """
    # '--key-file -' makes cryptsetup read the key from stdin
    luks_open_args = ['cryptsetup', '--key-file', '-', 'luksOpen', device, mapping]
    process.call(
        luks_open_args,
        stdin=key,
        terminal_verbose=True,
        show_command=True,
    )
91
92
def dmcrypt_close(mapping):
    """
    Close a mapping by running ``cryptsetup remove`` on it

    :param mapping: name of the mapping to remove
    """
    remove_command = ['cryptsetup', 'remove']
    remove_command.append(mapping)
    process.run(remove_command)
100
101
def get_dmcrypt_key(osd_id, osd_fsid, lockbox_keyring=None):
    """
    Fetch the dmcrypt (secret) key that was initially stored on the monitor.
    The key is sent initially with JSON, and the Monitor then mangles the name
    to ``dm-crypt/osd/<fsid>/luks``

    This operation needs the ``lockbox.keyring`` file, which is assumed to
    live in the path of the same OSD that is being activated. A custom
    location can be passed in to support scanning (e.g. a lockbox partition
    mounted in a temporary location)

    :param osd_id: ID of the OSD, used to build the default keyring path
    :param osd_fsid: fsid of the OSD, used for the client name and config key
    :param lockbox_keyring: optional path to a ``lockbox.keyring`` file
    :raises RuntimeError: when the ``ceph`` command exits with a non-zero code
    """
    keyring_path = lockbox_keyring
    if keyring_path is None:
        keyring_path = '/var/lib/ceph/osd/%s-%s/lockbox.keyring' % (conf.cluster, osd_id)

    command = ['ceph', '--cluster', conf.cluster]
    command.extend(['--name', 'client.osd-lockbox.%s' % osd_fsid])
    command.extend(['--keyring', keyring_path])
    command.extend(['config-key', 'get', 'dm-crypt/osd/%s/luks' % osd_fsid])

    out, _err, exit_code = process.call(command, show_command=True)
    if exit_code != 0:
        raise RuntimeError('Unable to retrieve dmcrypt secret')
    # the captured output lines are joined back into a single string
    secret = ' '.join(out)
    return secret.strip()
133
134
def write_lockbox_keyring(osd_id, osd_fsid, secret):
    """
    Write the lockbox keyring for an OSD, unless the file is already present.
    The helper exists because a bluestore OSD will not persist the keyring,
    and for filestore it cannot live in the data device, since that device is
    still encrypted at the time the keyring is needed.

    For bluestore: a tmpfs filesystem is mounted, so the path is writable, but
    the files are ephemeral, so the keyring has to be created on every
    activation.
    For filestore: the OSD path exists at this point even with no OSD data
    device mounted, so the keyring is written to fetch the key, and then the
    data device gets mounted on that directory, making the keyring
    "disappear".

    :param osd_id: ID of the OSD, used to build the keyring path
    :param osd_fsid: fsid of the OSD, used to build the keyring entity name
    :param secret: secret to write into the keyring
    """
    keyring_path = '/var/lib/ceph/osd/%s-%s/lockbox.keyring' % (conf.cluster, osd_id)
    if not os.path.exists(keyring_path):
        write_keyring(
            osd_id,
            secret,
            keyring_name='lockbox.keyring',
            name='client.osd-lockbox.%s' % osd_fsid,
        )
159
160
def status(device):
    """
    Run ``cryptsetup status`` against a possibly encrypted device and collect
    whatever metadata it reports into a dictionary (empty if nothing is found).

    Example of a successful output for an encrypted device::

        $ cryptsetup status /dev/mapper/ed6b5a26-eafe-4cd4-87e3-422ff61e26c4
        /dev/mapper/ed6b5a26-eafe-4cd4-87e3-422ff61e26c4 is active and is in use.
          type:    LUKS1
          cipher:  aes-xts-plain64
          keysize: 256 bits
          device:  /dev/sdc2
          offset:  4096 sectors
          size:    20740063 sectors
          mode:    read/write

    The ``status`` call works for as long as the mapper device is in 'open'
    state.

    :param device: Absolute path or UUID of the device mapper
    :returns: dictionary mapping each reported field name to its value
    """
    out, _err, code = process.call(['cryptsetup', 'status', device], show_command=True)
    if code != 0:
        logger.warning('failed to detect device mapper information')
        return {}

    metadata = {}
    for line in out:
        # only lines starting with a space are useful to construct the report
        if not line.startswith(' '):
            continue
        fields = line.split(': ')
        # anything that is not exactly one "name: value" pair is ignored
        if len(fields) != 2:
            continue
        name, raw_value = fields
        metadata[name.strip()] = raw_value.strip().strip('"')
    return metadata
203
204
def legacy_encrypted(device):
    """
    Detect if a device was encrypted with ceph-disk or not. In the case of
    encrypted devices, include the type of encryption (LUKS, or PLAIN), and
    infer what the lockbox partition is.

    This function assumes that ``device`` will be a partition. A directory is
    also accepted, in which case the device mounted on it is looked up first.

    :param device: path to a partition, or to the directory it is mounted on
    :returns: dictionary with the ``encrypted``, ``type``, ``lockbox`` and
              ``device`` keys
    :raises RuntimeError: when ``device`` is a directory and no device can be
                          found mounted on it
    """
    if os.path.isdir(device):
        mounts = system.get_mounts(paths=True)
        # Keep the directory around for error reporting: ``device`` gets
        # rebound below because a directory isn't going to help with parsing
        mount_path = device
        # ``or [None]`` also covers an empty list of devices for the path, so
        # that case ends up in the RuntimeError below instead of an IndexError
        device = (mounts.get(mount_path) or [None])[0]
        if not device:
            raise RuntimeError('unable to determine the device mounted at %s' % mount_path)
    metadata = {'encrypted': False, 'type': None, 'lockbox': '', 'device': device}
    # check if the device is online/decrypted first
    active_mapper = status(device)
    if active_mapper:
        # normalize a bit to ensure same values regardless of source
        metadata['type'] = active_mapper['type'].lower().strip('12')  # turn LUKS1 or LUKS2 into luks
        metadata['encrypted'] = metadata['type'] in ('plain', 'luks')
        # The true device is now available to this function, so it gets
        # re-assigned here for the lockbox checks to succeed (it is not
        # possible to guess partitions from a device mapper device otherwise)
        device = active_mapper.get('device', device)
        metadata['device'] = device
    else:
        # not an open mapper: fall back to the partition entry type and look
        # it up in the known ceph-disk GUIDs
        uuid = get_part_entry_type(device)
        guid_match = constants.ceph_disk_guids.get(uuid, {})
        encrypted_guid = guid_match.get('encrypted', False)
        if encrypted_guid:
            metadata['encrypted'] = True
            metadata['type'] = guid_match['encryption_type']

    # Lets find the lockbox location now, to do this, we need to find out the
    # parent device name for the device so that we can query all of its
    # associated devices and *then* look for one that has the 'lockbox' label
    # on it.
    disk_meta = lsblk(device, abspath=True)
    if not disk_meta:
        return metadata
    parent_device = disk_meta['PKNAME']
    # With the parent device set, we can now look for the lockbox listing associated devices
    devices = device_family(parent_device)
    for i in devices:
        if 'lockbox' in i.get('PARTLABEL', ''):
            metadata['lockbox'] = i['NAME']
            break
    return metadata