]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / operations / versions / subvolume_v1.py
1 import os
2 import sys
3 import stat
4 import uuid
5 import errno
6 import logging
7 import json
8 from datetime import datetime
9 from typing import List, Dict
11 import cephfs
13 from .metadata_manager import MetadataManager
14 from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
15 from .op_sm import SubvolumeOpSm
16 from .subvolume_base import SubvolumeBase
17 from ..template import SubvolumeTemplate
18 from ..snapshot_util import mksnap, rmsnap
19 from ..access import allow_access, deny_access
20 from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
21 from ...fs_util import listsnaps, is_inherited_snap, create_base_dir
22 from ..template import SubvolumeOpType
23 from ..group import Group
24 from ..rankevicter import RankEvicter
25 from ..volume import get_mds_map
27 from ..clone_index import open_clone_index, create_clone_index
29 log = logging.getLogger(__name__)
31 class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
32 """
33 Version 1 subvolumes creates a subvolume with path as follows,
34 volumes/<group-name>/<subvolume-name>/<uuid>/
36 - The directory under which user data resides is <uuid>
37 - Snapshots of the subvolume are taken within the <uuid> directory
38 - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing,
39 - global information about the subvolume (version, path, type, state)
40 - snapshots attached to an ongoing clone operation
41 - clone snapshot source if subvolume is a clone of a snapshot
42 - It retains backward compatability with legacy subvolumes by creating the meta file for legacy subvolumes under
43 /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
44 component in the path.
45 """
46 VERSION = 1
48 @staticmethod
49 def version():
50 return SubvolumeV1.VERSION
52 @property
53 def path(self):
54 try:
55 # no need to stat the path -- open() does that
56 return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
57 except MetadataMgrException as me:
58 raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
60 @property
61 def features(self):
62 return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]
64 def mark_subvolume(self):
65 # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
66 # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes
67 try:
68 # MDS treats this as a noop for already marked subvolume
69 self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
70 except cephfs.InvalidValue as e:
71 raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
72 except cephfs.Error as e:
73 raise VolumeException(-e.args[0], e.args[1])
75 def snapshot_base_path(self):
76 """ Base path for all snapshots """
77 return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
79 def snapshot_path(self, snapname):
80 """ Path to a specific snapshot named 'snapname' """
81 return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8'))
83 def snapshot_data_path(self, snapname):
84 """ Path to user data directory within a subvolume snapshot named 'snapname' """
85 return self.snapshot_path(snapname)
87 def create(self, size, isolate_nspace, pool, mode, uid, gid):
88 subvolume_type = SubvolumeTypes.TYPE_NORMAL
89 try:
90 initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
91 except OpSmException as oe:
92 raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
94 subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
95 try:
96 # create group directory with default mode(0o755) if it doesn't exist.
97 create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE)
98 # create directory and set attributes
99 self.fs.mkdirs(subvol_path, mode)
100 self.mark_subvolume()
101 attrs = {
102 'uid': uid,
103 'gid': gid,
104 'data_pool': pool,
105 'pool_namespace': self.namespace if isolate_nspace else None,
106 'quota': size
107 }
108 self.set_attrs(subvol_path, attrs)
110 # persist subvolume metadata
111 qpath = subvol_path.decode('utf-8')
112 self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
113 except (VolumeException, MetadataMgrException, cephfs.Error) as e:
114 try:
115 log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
116 self.remove()
117 except VolumeException as ve:
118 log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
120 if isinstance(e, MetadataMgrException):
121 log.error("metadata manager exception: {0}".format(e))
122 e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
123 elif isinstance(e, cephfs.Error):
124 e = VolumeException(-e.args[0], e.args[1])
125 raise e
127 def add_clone_source(self, volname, subvolume, snapname, flush=False):
128 self.metadata_mgr.add_section("source")
129 self.metadata_mgr.update_section("source", "volume", volname)
130 if not subvolume.group.is_default_group():
131 self.metadata_mgr.update_section("source", "group", subvolume.group_name)
132 self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
133 self.metadata_mgr.update_section("source", "snapshot", snapname)
134 if flush:
135 self.metadata_mgr.flush()
137 def remove_clone_source(self, flush=False):
138 self.metadata_mgr.remove_section("source")
139 if flush:
140 self.metadata_mgr.flush()
142 def create_clone(self, pool, source_volname, source_subvolume, snapname):
143 subvolume_type = SubvolumeTypes.TYPE_CLONE
144 try:
145 initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
146 except OpSmException as oe:
147 raise VolumeException(-errno.EINVAL, "clone failed: internal error")
149 subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
150 try:
151 # source snapshot attrs are used to create clone subvolume.
152 # attributes of subvolume's content though, are synced during the cloning process.
153 attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
155 # The source of the clone may have exceeded its quota limit as
156 # CephFS quotas are imprecise. Cloning such a source may fail if
157 # the quota on the destination is set before starting the clone
158 # copy. So always set the quota on destination after cloning is
159 # successful.
160 attrs["quota"] = None
162 # override snapshot pool setting, if one is provided for the clone
163 if pool is not None:
164 attrs["data_pool"] = pool
165 attrs["pool_namespace"] = None
167 # create directory and set attributes
168 self.fs.mkdirs(subvol_path, attrs.get("mode"))
169 self.mark_subvolume()
170 self.set_attrs(subvol_path, attrs)
172 # persist subvolume metadata and clone source
173 qpath = subvol_path.decode('utf-8')
174 self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value)
175 self.add_clone_source(source_volname, source_subvolume, snapname)
176 self.metadata_mgr.flush()
177 except (VolumeException, MetadataMgrException, cephfs.Error) as e:
178 try:
179 log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
180 self.remove()
181 except VolumeException as ve:
182 log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
184 if isinstance(e, MetadataMgrException):
185 log.error("metadata manager exception: {0}".format(e))
186 e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
187 elif isinstance(e, cephfs.Error):
188 e = VolumeException(-e.args[0], e.args[1])
189 raise e
191 def allowed_ops_by_type(self, vol_type):
192 if vol_type == SubvolumeTypes.TYPE_CLONE:
193 return {op_type for op_type in SubvolumeOpType}
195 if vol_type == SubvolumeTypes.TYPE_NORMAL:
196 return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
197 SubvolumeOpType.CLONE_CANCEL,
198 SubvolumeOpType.CLONE_INTERNAL}
200 return {}
202 def allowed_ops_by_state(self, vol_state):
203 if vol_state == SubvolumeStates.STATE_COMPLETE:
204 return {op_type for op_type in SubvolumeOpType}
206 return {SubvolumeOpType.REMOVE_FORCE,
207 SubvolumeOpType.CLONE_CREATE,
208 SubvolumeOpType.CLONE_STATUS,
209 SubvolumeOpType.CLONE_CANCEL,
210 SubvolumeOpType.CLONE_INTERNAL}
212 def open(self, op_type):
213 if not isinstance(op_type, SubvolumeOpType):
214 raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
215 op_type.value, self.subvolname))
216 try:
217 self.metadata_mgr.refresh()
219 etype = self.subvol_type
220 if op_type not in self.allowed_ops_by_type(etype):
221 raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
222 op_type.value, self.subvolname, etype.value))
224 estate = self.state
225 if op_type not in self.allowed_ops_by_state(estate):
226 raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
227 self.subvolname, op_type.value))
229 subvol_path = self.path
230 log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
231 st = self.fs.stat(subvol_path)
232 # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
233 self.mark_subvolume()
235 self.uid = int(st.st_uid)
236 self.gid = int(st.st_gid)
237 self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
238 except MetadataMgrException as me:
239 if me.errno == -errno.ENOENT:
240 raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
241 raise VolumeException(me.args[0], me.args[1])
242 except cephfs.ObjectNotFound:
243 log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
244 raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
245 except cephfs.Error as e:
246 raise VolumeException(-e.args[0], e.args[1])
248 def _recover_auth_meta(self, auth_id, auth_meta):
249 """
250 Call me after locking the auth meta file.
251 """
252 remove_subvolumes = []
254 for subvol, subvol_data in auth_meta['subvolumes'].items():
255 if not subvol_data['dirty']:
256 continue
258 (group_name, subvol_name) = subvol.split('/')
259 group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME
260 access_level = subvol_data['access_level']
262 with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name):
263 subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name)
265 # No SVMeta update indicates that there was no auth update
266 # in Ceph either. So it's safe to remove corresponding
267 # partial update in AMeta.
268 if not subvol_meta or auth_id not in subvol_meta['auths']:
269 remove_subvolumes.append(subvol)
270 continue
272 want_auth = {
273 'access_level': access_level,
274 'dirty': False,
275 }
276 # SVMeta update looks clean. Ceph auth update must have been
277 # clean. Update the dirty flag and continue
278 if subvol_meta['auths'][auth_id] == want_auth:
279 auth_meta['subvolumes'][subvol]['dirty'] = False
280 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
281 continue
283 client_entity = "client.{0}".format(auth_id)
284 ret, out, err = self.mgr.mon_command(
285 {
286 'prefix': 'auth get',
287 'entity': client_entity,
288 'format': 'json'
289 })
290 if ret == 0:
291 existing_caps = json.loads(out)
292 elif ret == -errno.ENOENT:
293 existing_caps = None
294 else:
295 log.error(err)
296 raise VolumeException(ret, err)
298 self._authorize_subvolume(auth_id, access_level, existing_caps)
300 # Recovered from partial auth updates for the auth ID's access
301 # to a subvolume.
302 auth_meta['subvolumes'][subvol]['dirty'] = False
303 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
305 for subvol in remove_subvolumes:
306 del auth_meta['subvolumes'][subvol]
308 if not auth_meta['subvolumes']:
309 # Clean up auth meta file
310 self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
311 return
313 # Recovered from all partial auth updates for the auth ID.
314 auth_meta['dirty'] = False
315 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
317 def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False):
318 """
319 Get-or-create a Ceph auth identity for `auth_id` and grant them access
320 to
321 :param auth_id:
322 :param access_level:
323 :param tenant_id: Optionally provide a stringizable object to
324 restrict any created cephx IDs to other callers
325 passing the same tenant ID.
326 :allow_existing_id: Optionally authorize existing auth-ids not
327 created by ceph_volume_client.
328 :return:
329 """
331 with self.auth_mdata_mgr.auth_lock(auth_id):
332 client_entity = "client.{0}".format(auth_id)
333 ret, out, err = self.mgr.mon_command(
334 {
335 'prefix': 'auth get',
336 'entity': client_entity,
337 'format': 'json'
338 })
340 if ret == 0:
341 existing_caps = json.loads(out)
342 elif ret == -errno.ENOENT:
343 existing_caps = None
344 else:
345 log.error(err)
346 raise VolumeException(ret, err)
348 # Existing meta, or None, to be updated
349 auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
351 # subvolume data to be inserted
352 group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
353 group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
354 subvolume = {
355 group_subvol_id : {
356 # The access level at which the auth_id is authorized to
357 # access the volume.
358 'access_level': access_level,
359 'dirty': True,
360 }
361 }
363 if auth_meta is None:
364 if not allow_existing_id and existing_caps is not None:
365 msg = "auth ID: {0} exists and not created by mgr plugin. Not allowed to modify".format(auth_id)
366 log.error(msg)
367 raise VolumeException(-errno.EPERM, msg)
369 # non-existent auth IDs
370 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
371 auth_id, tenant_id
372 ))
373 log.debug("Authorize: no existing meta")
374 auth_meta = {
375 'dirty': True,
376 'tenant_id': str(tenant_id) if tenant_id else None,
377 'subvolumes': subvolume
378 }
379 else:
380 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
381 if 'volumes' in auth_meta:
382 auth_meta['subvolumes'] = auth_meta.pop('volumes')
384 # Disallow tenants to share auth IDs
385 if str(auth_meta['tenant_id']) != str(tenant_id):
386 msg = "auth ID: {0} is already in use".format(auth_id)
387 log.error(msg)
388 raise VolumeException(-errno.EPERM, msg)
390 if auth_meta['dirty']:
391 self._recover_auth_meta(auth_id, auth_meta)
393 log.debug("Authorize: existing tenant {tenant}".format(
394 tenant=auth_meta['tenant_id']
395 ))
396 auth_meta['dirty'] = True
397 auth_meta['subvolumes'].update(subvolume)
399 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
401 with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
402 key = self._authorize_subvolume(auth_id, access_level, existing_caps)
404 auth_meta['dirty'] = False
405 auth_meta['subvolumes'][group_subvol_id]['dirty'] = False
406 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
408 if tenant_id:
409 return key
410 else:
411 # Caller wasn't multi-tenant aware: be safe and don't give
412 # them a key
413 return ""
415 def _authorize_subvolume(self, auth_id, access_level, existing_caps):
416 subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
418 auth = {
419 auth_id: {
420 'access_level': access_level,
421 'dirty': True,
422 }
423 }
425 if subvol_meta is None:
426 subvol_meta = {
427 'auths': auth
428 }
429 else:
430 subvol_meta['auths'].update(auth)
431 self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
433 key = self._authorize(auth_id, access_level, existing_caps)
435 subvol_meta['auths'][auth_id]['dirty'] = False
436 self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
438 return key
440 def _authorize(self, auth_id, access_level, existing_caps):
441 subvol_path = self.path
442 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))
444 # First I need to work out what the data pool is for this share:
445 # read the layout
446 try:
447 pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
448 except cephfs.Error as e:
449 raise VolumeException(-e.args[0], e.args[1])
451 try:
452 namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
453 except cephfs.NoData:
454 namespace = None
456 # Now construct auth capabilities that give the guest just enough
457 # permissions to access the share
458 client_entity = "client.{0}".format(auth_id)
459 want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8'))
460 want_osd_cap = "allow {0} pool={1}{2}".format(
461 access_level, pool, " namespace={0}".format(namespace) if namespace else "")
463 # Construct auth caps that if present might conflict with the desired
464 # auth caps.
465 unwanted_access_level = 'r' if access_level == 'rw' else 'rw'
466 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8'))
467 unwanted_osd_cap = "allow {0} pool={1}{2}".format(
468 unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "")
470 return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap,
471 unwanted_mds_cap, unwanted_osd_cap, existing_caps)
473 def deauthorize(self, auth_id):
474 with self.auth_mdata_mgr.auth_lock(auth_id):
475 # Existing meta, or None, to be updated
476 auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
478 if auth_meta is None:
479 msg = "auth ID: {0} doesn't exist".format(auth_id)
480 log.error(msg)
481 raise VolumeException(-errno.ENOENT, msg)
483 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
484 if 'volumes' in auth_meta:
485 auth_meta['subvolumes'] = auth_meta.pop('volumes')
487 group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
488 group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
489 if (auth_meta is None) or (not auth_meta['subvolumes']):
490 log.warning("deauthorized called for already-removed auth"
491 "ID '{auth_id}' for subvolume '{subvolume}'".format(
492 auth_id=auth_id, subvolume=self.subvolname
493 ))
494 # Clean up the auth meta file of an auth ID
495 self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
496 return
498 if group_subvol_id not in auth_meta['subvolumes']:
499 log.warning("deauthorized called for already-removed auth"
500 "ID '{auth_id}' for subvolume '{subvolume}'".format(
501 auth_id=auth_id, subvolume=self.subvolname
502 ))
503 return
505 if auth_meta['dirty']:
506 self._recover_auth_meta(auth_id, auth_meta)
508 auth_meta['dirty'] = True
509 auth_meta['subvolumes'][group_subvol_id]['dirty'] = True
510 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
512 self._deauthorize_subvolume(auth_id)
514 # Filter out the volume we're deauthorizing
515 del auth_meta['subvolumes'][group_subvol_id]
517 # Clean up auth meta file
518 if not auth_meta['subvolumes']:
519 self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
520 return
522 auth_meta['dirty'] = False
523 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
525 def _deauthorize_subvolume(self, auth_id):
526 with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
527 subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
529 if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
530 log.warning("deauthorized called for already-removed auth"
531 "ID '{auth_id}' for subvolume '{subvolume}'".format(
532 auth_id=auth_id, subvolume=self.subvolname
533 ))
534 return
536 subvol_meta['auths'][auth_id]['dirty'] = True
537 self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
539 self._deauthorize(auth_id)
541 # Remove the auth_id from the metadata *after* removing it
542 # from ceph, so that if we crashed here, we would actually
543 # recreate the auth ID during recovery (i.e. end up with
544 # a consistent state).
546 # Filter out the auth we're removing
547 del subvol_meta['auths'][auth_id]
548 self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
550 def _deauthorize(self, auth_id):
551 """
552 The volume must still exist.
553 """
554 client_entity = "client.{0}".format(auth_id)
555 subvol_path = self.path
556 try:
557 pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
558 except cephfs.Error as e:
559 raise VolumeException(-e.args[0], e.args[1])
561 try:
562 namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
563 except cephfs.NoData:
564 namespace = None
566 # The auth_id might have read-only or read-write mount access for the
567 # subvolume path.
568 access_levels = ('r', 'rw')
569 want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8'))
570 for access_level in access_levels]
571 want_osd_caps = ['allow {0} pool={1}{2}'.format(
572 access_level, pool_name, " namespace={0}".format(namespace) if namespace else "")
573 for access_level in access_levels]
574 deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps)
576 def authorized_list(self):
577 """
578 Expose a list of auth IDs that have access to a subvolume.
580 return: a list of (auth_id, access_level) tuples, where
581 the access_level can be 'r' , or 'rw'.
582 None if no auth ID is given access to the subvolume.
583 """
584 with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
585 meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
586 auths = [] # type: List[Dict[str,str]]
587 if not meta or not meta['auths']:
588 return auths
590 for auth, auth_data in meta['auths'].items():
591 # Skip partial auth updates.
592 if not auth_data['dirty']:
593 auths.append({auth: auth_data['access_level']})
595 return auths
597 def evict(self, volname, auth_id, timeout=30):
598 """
599 Evict all clients based on the authorization ID and the subvolume path mounted.
600 Assumes that the authorization key has been revoked prior to calling this function.
602 This operation can throw an exception if the mon cluster is unresponsive, or
603 any individual MDS daemon is unresponsive for longer than the timeout passed in.
604 """
606 client_spec = ["auth_name={0}".format(auth_id), ]
607 client_spec.append("client_metadata.root={0}".
608 format(self.path.decode('utf-8')))
610 log.info("evict clients with {0}".format(', '.join(client_spec)))
612 mds_map = get_mds_map(self.mgr, volname)
613 if not mds_map:
614 raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname))
616 up = {}
617 for name, gid in mds_map['up'].items():
618 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
619 assert name.startswith("mds_")
620 up[int(name[4:])] = gid
622 # For all MDS ranks held by a daemon
623 # Do the parallelism in python instead of using "tell mds.*", because
624 # the latter doesn't give us per-mds output
625 threads = []
626 for rank, gid in up.items():
627 thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout)
628 thread.start()
629 threads.append(thread)
631 for t in threads:
632 t.join()
634 log.info("evict: joined all")
636 for t in threads:
637 if not t.success:
638 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
639 format(', '.join(client_spec), t.rank, t.gid, t.exception)
640 )
641 log.error(msg)
642 raise EvictionError(msg)
644 def _get_clone_source(self):
645 try:
646 clone_source = {
647 'volume' : self.metadata_mgr.get_option("source", "volume"),
648 'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
649 'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
650 }
652 try:
653 clone_source["group"] = self.metadata_mgr.get_option("source", "group")
654 except MetadataMgrException as me:
655 if me.errno == -errno.ENOENT:
656 pass
657 else:
658 raise
659 except MetadataMgrException as me:
660 raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
661 return clone_source
663 @property
664 def status(self):
665 state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
666 subvolume_type = self.subvol_type
667 subvolume_status = {
668 'state' : state.value
669 }
670 if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
671 subvolume_status["source"] = self._get_clone_source()
672 return subvolume_status
674 @property
675 def state(self):
676 return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
678 @state.setter
679 def state(self, val):
680 state = val[0].value
681 flush = val[1]
682 self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
683 if flush:
684 self.metadata_mgr.flush()
686 def remove(self, retainsnaps=False):
687 if retainsnaps:
688 raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
689 if self.list_snapshots():
690 raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
691 self.trash_base_dir()
693 def resize(self, newsize, noshrink):
694 subvol_path = self.path
695 return self._resize(subvol_path, newsize, noshrink)
697 def create_snapshot(self, snapname):
698 try:
699 group_snapshot_path = os.path.join(self.group.path,
700 self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
701 snapname.encode('utf-8'))
702 self.fs.stat(group_snapshot_path)
703 except cephfs.Error as e:
704 if e.args[0] == errno.ENOENT:
705 snappath = self.snapshot_path(snapname)
706 mksnap(self.fs, snappath)
707 else:
708 raise VolumeException(-e.args[0], e.args[1])
709 else:
710 raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot name can't be same")
712 def has_pending_clones(self, snapname):
713 try:
714 return self.metadata_mgr.section_has_item('clone snaps', snapname)
715 except MetadataMgrException as me:
716 if me.errno == -errno.ENOENT:
717 return False
718 raise
720 def remove_snapshot(self, snapname):
721 if self.has_pending_clones(snapname):
722 raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
723 snappath = self.snapshot_path(snapname)
724 rmsnap(self.fs, snappath)
726 def snapshot_info(self, snapname):
727 if is_inherited_snap(snapname):
728 raise VolumeException(-errno.EINVAL,
729 "snapshot name '{0}' is invalid".format(snapname))
730 snappath = self.snapshot_data_path(snapname)
731 snap_info = {}
732 try:
733 snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
734 'data_pool':'ceph.dir.layout.pool'}
735 for key, val in snap_attrs.items():
736 snap_info[key] = self.fs.getxattr(snappath, val)
737 return {'size': int(snap_info['size']),
738 'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
739 'data_pool': snap_info['data_pool'].decode('utf-8'),
740 'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
741 except cephfs.Error as e:
742 if e.errno == errno.ENOENT:
743 raise VolumeException(-errno.ENOENT,
744 "snapshot '{0}' does not exist".format(snapname))
745 raise VolumeException(-e.args[0], e.args[1])
747 def list_snapshots(self):
748 try:
749 dirpath = self.snapshot_base_path()
750 return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
751 except VolumeException as ve:
752 if ve.errno == -errno.ENOENT:
753 return []
754 raise
756 def _add_snap_clone(self, track_id, snapname):
757 self.metadata_mgr.add_section("clone snaps")
758 self.metadata_mgr.update_section("clone snaps", track_id, snapname)
759 self.metadata_mgr.flush()
761 def _remove_snap_clone(self, track_id):
762 self.metadata_mgr.remove_option("clone snaps", track_id)
763 self.metadata_mgr.flush()
765 def attach_snapshot(self, snapname, tgt_subvolume):
766 if not snapname.encode('utf-8') in self.list_snapshots():
767 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
768 try:
769 create_clone_index(self.fs, self.vol_spec)
770 with open_clone_index(self.fs, self.vol_spec) as index:
771 track_idx = index.track(tgt_subvolume.base_path)
772 self._add_snap_clone(track_idx, snapname)
773 except (IndexException, MetadataMgrException) as e:
774 log.warning("error creating clone index: {0}".format(e))
775 raise VolumeException(-errno.EINVAL, "error cloning subvolume")
777 def detach_snapshot(self, snapname, track_id):
778 if not snapname.encode('utf-8') in self.list_snapshots():
779 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
780 try:
781 with open_clone_index(self.fs, self.vol_spec) as index:
782 index.untrack(track_id)
783 self._remove_snap_clone(track_id)
784 except (IndexException, MetadataMgrException) as e:
785 log.warning("error delining snapshot from clone: {0}".format(e))
786 raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")