import os
import sys
import stat
import uuid
import errno
import logging
import json
from datetime import datetime
from typing import List, Dict

import cephfs

from .metadata_manager import MetadataManager
from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
from .op_sm import SubvolumeOpSm
from .subvolume_base import SubvolumeBase
from ..template import SubvolumeTemplate, SubvolumeOpType
from ..snapshot_util import mksnap, rmsnap
from ..access import allow_access, deny_access
from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
from ...fs_util import listsnaps, is_inherited_snap, create_base_dir
from ..group import Group
from ..rankevicter import RankEvicter
from ..volume import get_mds_map

from ..clone_index import open_clone_index, create_clone_index

log = logging.getLogger(__name__)


class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
    """
    Version 1 subvolumes are created with a path of the following form:
        volumes/<group-name>/<subvolume-name>/<uuid>/

    - The directory under which user data resides is <uuid>
    - Snapshots of the subvolume are taken within the <uuid> directory
    - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing:
        - global information about the subvolume (version, path, type, state)
        - snapshots attached to an ongoing clone operation
        - the clone snapshot source, if the subvolume is a clone of a snapshot
    - It retains backward compatibility with legacy subvolumes by creating the meta file for legacy subvolumes under
      /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
      component in the path.
    """
    VERSION = 1

    @staticmethod
    def version():
        return SubvolumeV1.VERSION

    @property
    def path(self):
        try:
            # no need to stat the path -- open() does that
            return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
        except MetadataMgrException as me:
            raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")

    @property
    def features(self):
        return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]

    def mark_subvolume(self):
        # set the subvolume attr on the subvolume root, marking it as a CephFS subvolume;
        # the subvolume root is where snapshots are taken, and hence is the <uuid> dir for v1 subvolumes
        try:
            # the MDS treats this as a no-op for an already marked subvolume
            self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
        except cephfs.InvalidValue as e:
            raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

    def snapshot_base_path(self):
        """ Base path for all snapshots """
        return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))

    def snapshot_path(self, snapname):
        """ Path to a specific snapshot named 'snapname' """
        return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8'))

    def snapshot_data_path(self, snapname):
        """ Path to the user data directory within a subvolume snapshot named 'snapname' """
        return self.snapshot_path(snapname)

    def create(self, size, isolate_nspace, pool, mode, uid, gid):
        subvolume_type = SubvolumeTypes.TYPE_NORMAL
        try:
            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
        except OpSmException as oe:
            raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")

        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
        try:
            # create group directory with default mode (0o755) if it doesn't exist.
            create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE)
            # create directory and set attributes
            self.fs.mkdirs(subvol_path, mode)
            self.mark_subvolume()
            attrs = {
                'uid': uid,
                'gid': gid,
                'data_pool': pool,
                'pool_namespace': self.namespace if isolate_nspace else None,
                'quota': size
            }
            self.set_attrs(subvol_path, attrs)

            # persist subvolume metadata
            qpath = subvol_path.decode('utf-8')
            self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
            try:
                log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
                self.remove()
            except VolumeException as ve:
                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))

            if isinstance(e, MetadataMgrException):
                log.error("metadata manager exception: {0}".format(e))
                e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
            elif isinstance(e, cephfs.Error):
                e = VolumeException(-e.args[0], e.args[1])
            raise e

    def add_clone_source(self, volname, subvolume, snapname, flush=False):
        self.metadata_mgr.add_section("source")
        self.metadata_mgr.update_section("source", "volume", volname)
        if not subvolume.group.is_default_group():
            self.metadata_mgr.update_section("source", "group", subvolume.group_name)
        self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
        self.metadata_mgr.update_section("source", "snapshot", snapname)
        if flush:
            self.metadata_mgr.flush()

    def remove_clone_source(self, flush=False):
        self.metadata_mgr.remove_section("source")
        if flush:
            self.metadata_mgr.flush()

    def create_clone(self, pool, source_volname, source_subvolume, snapname):
        subvolume_type = SubvolumeTypes.TYPE_CLONE
        try:
            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
        except OpSmException as oe:
            raise VolumeException(-errno.EINVAL, "clone failed: internal error")

        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
        try:
            # source snapshot attrs are used to create the clone subvolume;
            # attributes of the subvolume's content, though, are synced during the cloning process.
            attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))

            # The source of the clone may have exceeded its quota limit as
            # CephFS quotas are imprecise. Cloning such a source may fail if
            # the quota on the destination is set before starting the clone
            # copy. So always set the quota on the destination after cloning is
            # successful.
            attrs["quota"] = None

            # override snapshot pool setting, if one is provided for the clone
            if pool is not None:
                attrs["data_pool"] = pool
                attrs["pool_namespace"] = None

            # create directory and set attributes
            self.fs.mkdirs(subvol_path, attrs.get("mode"))
            self.mark_subvolume()
            self.set_attrs(subvol_path, attrs)

            # persist subvolume metadata and clone source
            qpath = subvol_path.decode('utf-8')
            self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value)
            self.add_clone_source(source_volname, source_subvolume, snapname)
            self.metadata_mgr.flush()
        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
            try:
                log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
                self.remove()
            except VolumeException as ve:
                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))

            if isinstance(e, MetadataMgrException):
                log.error("metadata manager exception: {0}".format(e))
                e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
            elif isinstance(e, cephfs.Error):
                e = VolumeException(-e.args[0], e.args[1])
            raise e

    def allowed_ops_by_type(self, vol_type):
        if vol_type == SubvolumeTypes.TYPE_CLONE:
            return {op_type for op_type in SubvolumeOpType}

        if vol_type == SubvolumeTypes.TYPE_NORMAL:
            return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
                                                               SubvolumeOpType.CLONE_CANCEL,
                                                               SubvolumeOpType.CLONE_INTERNAL}

        # unknown subvolume type -- allow no operations
        return set()

    def allowed_ops_by_state(self, vol_state):
        if vol_state == SubvolumeStates.STATE_COMPLETE:
            return {op_type for op_type in SubvolumeOpType}

        return {SubvolumeOpType.REMOVE_FORCE,
                SubvolumeOpType.CLONE_CREATE,
                SubvolumeOpType.CLONE_STATUS,
                SubvolumeOpType.CLONE_CANCEL,
                SubvolumeOpType.CLONE_INTERNAL}

    def open(self, op_type):
        if not isinstance(op_type, SubvolumeOpType):
            raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
                                  op_type.value, self.subvolname))
        try:
            self.metadata_mgr.refresh()

            etype = self.subvol_type
            if op_type not in self.allowed_ops_by_type(etype):
                raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
                                      op_type.value, self.subvolname, etype.value))

            estate = self.state
            if op_type not in self.allowed_ops_by_state(estate):
                raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
                                      self.subvolname, op_type.value))

            subvol_path = self.path
            log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
            st = self.fs.stat(subvol_path)
            # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
            self.mark_subvolume()

            self.uid = int(st.st_uid)
            self.gid = int(st.st_gid)
            self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
        except MetadataMgrException as me:
            if me.errno == -errno.ENOENT:
                raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
            raise VolumeException(me.args[0], me.args[1])
        except cephfs.ObjectNotFound:
            log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
            raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call this only after locking the auth meta file.
        """
        remove_subvolumes = []

        for subvol, subvol_data in auth_meta['subvolumes'].items():
            if not subvol_data['dirty']:
                continue

            (group_name, subvol_name) = subvol.split('/')
            group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME
            access_level = subvol_data['access_level']

            with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name):
                subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name)

                # No SVMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove the corresponding
                # partial update in AMeta.
                if not subvol_meta or auth_id not in subvol_meta['auths']:
                    remove_subvolumes.append(subvol)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # SVMeta update looks clean. Ceph auth update must have been
                # clean. Update the dirty flag and continue.
                if subvol_meta['auths'][auth_id] == want_auth:
                    auth_meta['subvolumes'][subvol]['dirty'] = False
                    self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
                    continue

                client_entity = "client.{0}".format(auth_id)
                ret, out, err = self.mgr.mon_command(
                    {
                        'prefix': 'auth get',
                        'entity': client_entity,
                        'format': 'json'
                    })
                if ret == 0:
                    existing_caps = json.loads(out)
                elif ret == -errno.ENOENT:
                    existing_caps = None
                else:
                    log.error(err)
                    raise VolumeException(ret, err)

                self._authorize_subvolume(auth_id, access_level, existing_caps)

            # Recovered from partial auth updates for the auth ID's access
            # to a subvolume.
            auth_meta['subvolumes'][subvol]['dirty'] = False
            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

        for subvol in remove_subvolumes:
            del auth_meta['subvolumes'][subvol]

        if not auth_meta['subvolumes']:
            # Clean up auth meta file
            self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

    def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False):
        """
        Get-or-create a Ceph auth identity for `auth_id` and grant it access
        to this subvolume.

        :param auth_id: cephx auth ID to authorize
        :param access_level: access level to grant ('r' or 'rw')
        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        :param allow_existing_id: Optionally authorize existing auth IDs not
                                  created by ceph_volume_client.
        :return: the cephx key if tenant_id is set, else an empty string
        """

        with self.auth_mdata_mgr.auth_lock(auth_id):
            client_entity = "client.{0}".format(auth_id)
            ret, out, err = self.mgr.mon_command(
                {
                    'prefix': 'auth get',
                    'entity': client_entity,
                    'format': 'json'
                })

            if ret == 0:
                existing_caps = json.loads(out)
            elif ret == -errno.ENOENT:
                existing_caps = None
            else:
                log.error(err)
                raise VolumeException(ret, err)

            # Existing meta, or None, to be updated
            auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)

            # subvolume data to be inserted
            group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
            group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
            subvolume = {
                group_subvol_id : {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': access_level,
                    'dirty': True,
                }
            }

            if auth_meta is None:
                if not allow_existing_id and existing_caps is not None:
                    msg = "auth ID: {0} exists and was not created by the mgr plugin. Not allowed to modify".format(auth_id)
                    log.error(msg)
                    raise VolumeException(-errno.EPERM, msg)

                # non-existent auth IDs
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': str(tenant_id) if tenant_id else None,
                    'subvolumes': subvolume
                }
            else:
                # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
                if 'volumes' in auth_meta:
                    auth_meta['subvolumes'] = auth_meta.pop('volumes')

                # Disallow tenants to share auth IDs
                if str(auth_meta['tenant_id']) != str(tenant_id):
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise VolumeException(-errno.EPERM, msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['subvolumes'].update(subvolume)

            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

            with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
                key = self._authorize_subvolume(auth_id, access_level, existing_caps)

            auth_meta['dirty'] = False
            auth_meta['subvolumes'][group_subvol_id]['dirty'] = False
            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return key
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # them a key
                return ""

    def _authorize_subvolume(self, auth_id, access_level, existing_caps):
        subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)

        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if subvol_meta is None:
            subvol_meta = {
                'auths': auth
            }
        else:
            subvol_meta['auths'].update(auth)
            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)

        key = self._authorize(auth_id, access_level, existing_caps)

        subvol_meta['auths'][auth_id]['dirty'] = False
        self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)

        return key

    def _authorize(self, auth_id, access_level, existing_caps):
        subvol_path = self.path
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))

        # First we need to work out what the data pool is for this share:
        # read the layout
        try:
            pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

        try:
            namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
        except cephfs.NoData:
            namespace = None

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8'))
        want_osd_cap = "allow {0} pool={1}{2}".format(
            access_level, pool, " namespace={0}".format(namespace) if namespace else "")

        # Construct auth caps that if present might conflict with the desired
        # auth caps.
        unwanted_access_level = 'r' if access_level == 'rw' else 'rw'
        unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8'))
        unwanted_osd_cap = "allow {0} pool={1}{2}".format(
            unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "")

        return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap,
                            unwanted_mds_cap, unwanted_osd_cap, existing_caps)

    def deauthorize(self, auth_id):
        with self.auth_mdata_mgr.auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)

            if auth_meta is None:
                msg = "auth ID: {0} doesn't exist".format(auth_id)
                log.error(msg)
                raise VolumeException(-errno.ENOENT, msg)

            # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
            if 'volumes' in auth_meta:
                auth_meta['subvolumes'] = auth_meta.pop('volumes')

            group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
            group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
            if (auth_meta is None) or (not auth_meta['subvolumes']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
                                auth_id=auth_id, subvolume=self.subvolname
                            ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
                return

            if group_subvol_id not in auth_meta['subvolumes']:
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
                                auth_id=auth_id, subvolume=self.subvolname
                            ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['subvolumes'][group_subvol_id]['dirty'] = True
            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_subvolume(auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['subvolumes'][group_subvol_id]

            # Clean up auth meta file
            if not auth_meta['subvolumes']:
                self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

    def _deauthorize_subvolume(self, auth_id):
        with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
            subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)

            if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
                                auth_id=auth_id, subvolume=self.subvolname
                            ))
                return

            subvol_meta['auths'][auth_id]['dirty'] = True
            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)

            self._deauthorize(auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del subvol_meta['auths'][auth_id]
            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)

    def _deauthorize(self, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        subvol_path = self.path
        try:
            pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

        try:
            namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
        except cephfs.NoData:
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # subvolume path.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8'))
                         for access_level in access_levels]
        want_osd_caps = ['allow {0} pool={1}{2}'.format(
            access_level, pool_name, " namespace={0}".format(namespace) if namespace else "")
            for access_level in access_levels]
        deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps)

    def authorized_list(self):
        """
        Expose a list of auth IDs that have access to a subvolume.

        return: a list of {auth_id: access_level} entries, where the
                access_level can be 'r' or 'rw'; empty if no auth ID
                has been given access to the subvolume.
        """
        with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
            meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
            auths = []  # type: List[Dict[str, str]]
            if not meta or not meta['auths']:
                return auths

            for auth, auth_data in meta['auths'].items():
                # Skip partial auth updates.
                if not auth_data['dirty']:
                    auths.append({auth: auth_data['access_level']})

            return auths

    def evict(self, volname, auth_id, timeout=30):
        """
        Evict all clients based on the authorization ID and the mounted subvolume path.
        Assumes that the authorization key has been revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """

        client_spec = ["auth_name={0}".format(auth_id), ]
        client_spec.append("client_metadata.root={0}".
                           format(self.path.decode('utf-8')))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = get_mds_map(self.mgr, volname)
        if not mds_map:
            raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname))

        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid
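        # e.g. (illustrative) {"mds_0": 4107, "mds_1": 4112} becomes
        # {0: 4107, 1: 4112}, mapping MDS rank to daemon GID.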
621 | ||
622 | # For all MDS ranks held by a daemon | |
623 | # Do the parallelism in python instead of using "tell mds.*", because | |
624 | # the latter doesn't give us per-mds output | |
625 | threads = [] | |
626 | for rank, gid in up.items(): | |
627 | thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout) | |
628 | thread.start() | |
629 | threads.append(thread) | |
630 | ||
631 | for t in threads: | |
632 | t.join() | |
633 | ||
634 | log.info("evict: joined all") | |
635 | ||
636 | for t in threads: | |
637 | if not t.success: | |
638 | msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}". | |
639 | format(', '.join(client_spec), t.rank, t.gid, t.exception) | |
640 | ) | |
641 | log.error(msg) | |
642 | raise EvictionError(msg) | |
643 | ||
    def _get_clone_source(self):
        try:
            clone_source = {
                'volume'   : self.metadata_mgr.get_option("source", "volume"),
                'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
                'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
            }

            try:
                clone_source["group"] = self.metadata_mgr.get_option("source", "group")
            except MetadataMgrException as me:
                # the group is only recorded for non-default groups
                if me.errno == -errno.ENOENT:
                    pass
                else:
                    raise
        except MetadataMgrException as me:
            raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
        return clone_source

    @property
    def status(self):
        state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
        subvolume_type = self.subvol_type
        subvolume_status = {
            'state' : state.value
        }
        if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
            subvolume_status["source"] = self._get_clone_source()
        return subvolume_status

    @property
    def state(self):
        return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))

    @state.setter
    def state(self, val):
        state = val[0].value
        flush = val[1]
        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
        if flush:
            self.metadata_mgr.flush()

    def remove(self, retainsnaps=False):
        if retainsnaps:
            raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
        if self.list_snapshots():
            raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
        self.trash_base_dir()

    def resize(self, newsize, noshrink):
        subvol_path = self.path
        return self._resize(subvol_path, newsize, noshrink)

    def create_snapshot(self, snapname):
        try:
            group_snapshot_path = os.path.join(self.group.path,
                                               self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
                                               snapname.encode('utf-8'))
            self.fs.stat(group_snapshot_path)
        except cephfs.Error as e:
            if e.args[0] == errno.ENOENT:
                snappath = self.snapshot_path(snapname)
                mksnap(self.fs, snappath)
            else:
                raise VolumeException(-e.args[0], e.args[1])
        else:
            raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot names can't be the same")

    def has_pending_clones(self, snapname):
        try:
            return self.metadata_mgr.section_has_item('clone snaps', snapname)
        except MetadataMgrException as me:
            if me.errno == -errno.ENOENT:
                return False
            raise

    def remove_snapshot(self, snapname):
        if self.has_pending_clones(snapname):
            raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
        snappath = self.snapshot_path(snapname)
        rmsnap(self.fs, snappath)

    def snapshot_info(self, snapname):
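        # Illustrative return value (the numbers and names are hypothetical):
        #   {'size': 1024, 'created_at': '2021-01-01 12:34:56.789000',
        #    'data_pool': 'cephfs_data', 'has_pending_clones': 'no'}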
        if is_inherited_snap(snapname):
            raise VolumeException(-errno.EINVAL,
                                  "snapshot name '{0}' is invalid".format(snapname))
        snappath = self.snapshot_data_path(snapname)
        snap_info = {}
        try:
            snap_attrs = {'created_at': 'ceph.snap.btime', 'size': 'ceph.dir.rbytes',
                          'data_pool': 'ceph.dir.layout.pool'}
            for key, val in snap_attrs.items():
                snap_info[key] = self.fs.getxattr(snappath, val)
            return {'size': int(snap_info['size']),
                    'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
                    'data_pool': snap_info['data_pool'].decode('utf-8'),
                    'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
        except cephfs.Error as e:
            if e.errno == errno.ENOENT:
                raise VolumeException(-errno.ENOENT,
                                      "snapshot '{0}' does not exist".format(snapname))
            raise VolumeException(-e.args[0], e.args[1])

    def list_snapshots(self):
        try:
            dirpath = self.snapshot_base_path()
            return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
        except VolumeException as ve:
            if ve.errno == -errno.ENOENT:
                return []
            raise

    def _add_snap_clone(self, track_id, snapname):
        self.metadata_mgr.add_section("clone snaps")
        self.metadata_mgr.update_section("clone snaps", track_id, snapname)
        self.metadata_mgr.flush()

    def _remove_snap_clone(self, track_id):
        self.metadata_mgr.remove_option("clone snaps", track_id)
        self.metadata_mgr.flush()

    def attach_snapshot(self, snapname, tgt_subvolume):
        if snapname.encode('utf-8') not in self.list_snapshots():
            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
        try:
            create_clone_index(self.fs, self.vol_spec)
            with open_clone_index(self.fs, self.vol_spec) as index:
                track_idx = index.track(tgt_subvolume.base_path)
                self._add_snap_clone(track_idx, snapname)
        except (IndexException, MetadataMgrException) as e:
            log.warning("error creating clone index: {0}".format(e))
            raise VolumeException(-errno.EINVAL, "error cloning subvolume")

    def detach_snapshot(self, snapname, track_id):
        if snapname.encode('utf-8') not in self.list_snapshots():
            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
        try:
            with open_clone_index(self.fs, self.vol_spec) as index:
                index.untrack(track_id)
            self._remove_snap_clone(track_id)
        except (IndexException, MetadataMgrException) as e:
            log.warning("error delinking snapshot from clone: {0}".format(e))
            raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")