import os
import sys
import stat
import uuid
import errno
import logging
import json
from datetime import datetime
from typing import List, Dict

import cephfs

from .metadata_manager import MetadataManager
from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
from .op_sm import SubvolumeOpSm
from .subvolume_base import SubvolumeBase
from ..template import SubvolumeTemplate
from ..snapshot_util import mksnap, rmsnap
from ..access import allow_access, deny_access
from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
from ...fs_util import listsnaps, is_inherited_snap, create_base_dir
from ..template import SubvolumeOpType
from ..group import Group
from ..rankevicter import RankEvicter
from ..volume import get_mds_map

from ..clone_index import open_clone_index, create_clone_index

log = logging.getLogger(__name__)
class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
    """
    Version 1 subvolumes create a subvolume with a path as follows,
        volumes/<group-name>/<subvolume-name>/<uuid>/

    - The directory under which user data resides is <uuid>
    - Snapshots of the subvolume are taken within the <uuid> directory
    - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing,
        - global information about the subvolume (version, path, type, state)
        - snapshots attached to an ongoing clone operation
        - clone snapshot source if subvolume is a clone of a snapshot
    - It retains backward compatibility with legacy subvolumes by creating the meta file for legacy subvolumes under
      /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
      component in the path.
    """
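    # Illustrative on-disk layout for the docstring above. All names are
    # hypothetical; the '.meta' file name and the '.snap' snapshot directory
    # prefix are assumptions (the actual values come from the metadata
    # manager and the volume spec, respectively):
    #
    #   /volumes/grp1/sv1/.meta                  <- subvolume metadata store
    #   /volumes/grp1/sv1/<uuid>/                <- user data directory
    #   /volumes/grp1/sv1/<uuid>/.snap/snap1/    <- snapshot 'snap1'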
    VERSION = 1

    @staticmethod
    def version():
        return SubvolumeV1.VERSION

    @property
    def path(self):
        try:
            # no need to stat the path -- open() does that
            return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
        except MetadataMgrException as me:
            raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")

    @property
    def features(self):
        return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]
    def mark_subvolume(self):
        # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
        # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes
        try:
            # MDS treats this as a noop for an already marked subvolume
            self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
        except cephfs.InvalidValue as e:
            raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])
    def snapshot_base_path(self):
        """ Base path for all snapshots """
        return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))

    def snapshot_path(self, snapname):
        """ Path to a specific snapshot named 'snapname' """
        return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8'))

    def snapshot_data_path(self, snapname):
        """ Path to user data directory within a subvolume snapshot named 'snapname' """
        return self.snapshot_path(snapname)
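    # A quick sketch of how these compose (hypothetical values): with
    # self.path == b'/volumes/grp1/sv1/<uuid>' and a snapshot dir prefix of
    # '.snap', snapshot_path('snap1') yields
    # b'/volumes/grp1/sv1/<uuid>/.snap/snap1'; for v1 subvolumes the snapshot
    # data path is that same directory.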
    def create(self, size, isolate_nspace, pool, mode, uid, gid):
        subvolume_type = SubvolumeTypes.TYPE_NORMAL
        try:
            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
        except OpSmException as oe:
            raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")

        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
        try:
            # create group directory with default mode(0o755) if it doesn't exist.
            create_base_dir(self.fs, self.group.path, self.vol_spec.DEFAULT_MODE)
            # create directory and set attributes
            self.fs.mkdirs(subvol_path, mode)
            self.mark_subvolume()
            attrs = {
                'uid': uid,
                'gid': gid,
                'data_pool': pool,
                'pool_namespace': self.namespace if isolate_nspace else None,
                'quota': size
            }
            self.set_attrs(subvol_path, attrs)

            # persist subvolume metadata
            qpath = subvol_path.decode('utf-8')
            self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
            try:
                log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
                self.remove()
            except VolumeException as ve:
                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))

            if isinstance(e, MetadataMgrException):
                log.error("metadata manager exception: {0}".format(e))
                e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
            elif isinstance(e, cephfs.Error):
                e = VolumeException(-e.args[0], e.args[1])
            raise e
    def add_clone_source(self, volname, subvolume, snapname, flush=False):
        self.metadata_mgr.add_section("source")
        self.metadata_mgr.update_section("source", "volume", volname)
        if not subvolume.group.is_default_group():
            self.metadata_mgr.update_section("source", "group", subvolume.group_name)
        self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
        self.metadata_mgr.update_section("source", "snapshot", snapname)
        if flush:
            self.metadata_mgr.flush()
    def remove_clone_source(self, flush=False):
        self.metadata_mgr.remove_section("source")
        if flush:
            self.metadata_mgr.flush()
    def create_clone(self, pool, source_volname, source_subvolume, snapname):
        subvolume_type = SubvolumeTypes.TYPE_CLONE
        try:
            initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
        except OpSmException as oe:
            raise VolumeException(-errno.EINVAL, "clone failed: internal error")

        subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
        try:
            # source snapshot attrs are used to create the clone subvolume.
            # attributes of the subvolume's content though, are synced during the cloning process.
            attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))

            # The source of the clone may have exceeded its quota limit as
            # CephFS quotas are imprecise. Cloning such a source may fail if
            # the quota on the destination is set before starting the clone
            # copy. So always set the quota on destination after cloning is
            # done.
            attrs["quota"] = None

            # override snapshot pool setting, if one is provided for the clone
            if pool:
                attrs["data_pool"] = pool
                attrs["pool_namespace"] = None

            # create directory and set attributes
            self.fs.mkdirs(subvol_path, attrs.get("mode"))
            self.mark_subvolume()
            self.set_attrs(subvol_path, attrs)

            # persist subvolume metadata and clone source
            qpath = subvol_path.decode('utf-8')
            self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value)
            self.add_clone_source(source_volname, source_subvolume, snapname)
            self.metadata_mgr.flush()
        except (VolumeException, MetadataMgrException, cephfs.Error) as e:
            try:
                log.info("cleaning up subvolume with path: {0}".format(self.subvolname))
                self.remove()
            except VolumeException as ve:
                log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))

            if isinstance(e, MetadataMgrException):
                log.error("metadata manager exception: {0}".format(e))
                e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
            elif isinstance(e, cephfs.Error):
                e = VolumeException(-e.args[0], e.args[1])
            raise e
    def allowed_ops_by_type(self, vol_type):
        if vol_type == SubvolumeTypes.TYPE_CLONE:
            return {op_type for op_type in SubvolumeOpType}

        if vol_type == SubvolumeTypes.TYPE_NORMAL:
            return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
                                                               SubvolumeOpType.CLONE_CANCEL,
                                                               SubvolumeOpType.CLONE_INTERNAL}

        return {}
    def allowed_ops_by_state(self, vol_state):
        if vol_state == SubvolumeStates.STATE_COMPLETE:
            return {op_type for op_type in SubvolumeOpType}

        return {SubvolumeOpType.REMOVE_FORCE,
                SubvolumeOpType.CLONE_CREATE,
                SubvolumeOpType.CLONE_STATUS,
                SubvolumeOpType.CLONE_CANCEL,
                SubvolumeOpType.CLONE_INTERNAL}
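    # Taken together, the two filters above mean, for example, that a clone
    # whose state machine has not reached STATE_COMPLETE only admits
    # force-remove and the clone-related operations, while a completed
    # TYPE_NORMAL subvolume admits every op except clone status/cancel/internal.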
    def open(self, op_type):
        if not isinstance(op_type, SubvolumeOpType):
            raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
                                  op_type.value, self.subvolname))
        try:
            self.metadata_mgr.refresh()

            etype = self.subvol_type
            if op_type not in self.allowed_ops_by_type(etype):
                raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
                                      op_type.value, self.subvolname, etype.value))

            estate = self.state
            if op_type not in self.allowed_ops_by_state(estate):
                raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
                                      self.subvolname, op_type.value))

            subvol_path = self.path
            log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
            st = self.fs.stat(subvol_path)
            # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
            self.mark_subvolume()

            self.uid = int(st.st_uid)
            self.gid = int(st.st_gid)
            self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
        except MetadataMgrException as me:
            if me.errno == -errno.ENOENT:
                raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
            raise VolumeException(me.args[0], me.args[1])
        except cephfs.ObjectNotFound:
            log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
            raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])
    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call me after locking the auth meta file.
        """
        remove_subvolumes = []

        for subvol, subvol_data in auth_meta['subvolumes'].items():
            if not subvol_data['dirty']:
                continue

            (group_name, subvol_name) = subvol.split('/')
            group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME
            access_level = subvol_data['access_level']

            with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name):
                subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name)

                # No SVMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove corresponding
                # partial update in AMeta.
                if not subvol_meta or auth_id not in subvol_meta['auths']:
                    remove_subvolumes.append(subvol)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # SVMeta update looks clean. Ceph auth update must have been
                # clean. Update the dirty flag and continue
                if subvol_meta['auths'][auth_id] == want_auth:
                    auth_meta['subvolumes'][subvol]['dirty'] = False
                    self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
                    continue

                client_entity = "client.{0}".format(auth_id)
                ret, out, err = self.mgr.mon_command(
                    {
                        'prefix': 'auth get',
                        'entity': client_entity,
                        'format': 'json'
                    })
                if ret == 0:
                    existing_caps = json.loads(out)
                elif ret == -errno.ENOENT:
                    existing_caps = None
                else:
                    log.error(err)
                    raise VolumeException(ret, err)

                self._authorize_subvolume(auth_id, access_level, existing_caps)

            # Recovered from partial auth updates for the auth ID's access
            # to a subvolume.
            auth_meta['subvolumes'][subvol]['dirty'] = False
            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

        for subvol in remove_subvolumes:
            del auth_meta['subvolumes'][subvol]

        if not auth_meta['subvolumes']:
            # Clean up auth meta file
            self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
    def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False):
        """
        Get-or-create a Ceph auth identity for `auth_id` and grant them access
        to the subvolume.

        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        :allow_existing_id: Optionally authorize existing auth-ids not
                            created by ceph_volume_client.
        """
        with self.auth_mdata_mgr.auth_lock(auth_id):
            client_entity = "client.{0}".format(auth_id)
            ret, out, err = self.mgr.mon_command(
                {
                    'prefix': 'auth get',
                    'entity': client_entity,
                    'format': 'json'
                })
            if ret == 0:
                existing_caps = json.loads(out)
            elif ret == -errno.ENOENT:
                existing_caps = None
            else:
                log.error(err)
                raise VolumeException(ret, err)

            # Existing meta, or None, to be updated
            auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)

            # subvolume data to be inserted
            group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
            group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
            subvolume = {
                group_subvol_id: {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': access_level,
                    'dirty': True,
                }
            }

            if auth_meta is None:
                if not allow_existing_id and existing_caps is not None:
                    msg = "auth ID: {0} exists and not created by mgr plugin. Not allowed to modify".format(auth_id)
                    log.error(msg)
                    raise VolumeException(-errno.EPERM, msg)

                # non-existent auth IDs
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': str(tenant_id) if tenant_id else None,
                    'subvolumes': subvolume
                }
            else:
                # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
                if 'volumes' in auth_meta:
                    auth_meta['subvolumes'] = auth_meta.pop('volumes')

                # Disallow tenants to share auth IDs
                if str(auth_meta['tenant_id']) != str(tenant_id):
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise VolumeException(-errno.EPERM, msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['subvolumes'].update(subvolume)

            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

            with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
                key = self._authorize_subvolume(auth_id, access_level, existing_caps)

            auth_meta['dirty'] = False
            auth_meta['subvolumes'][group_subvol_id]['dirty'] = False
            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return key
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # them access
                return ""
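    # A sketch of the auth metadata record maintained by authorize() and
    # deauthorize() (values are hypothetical; the serialization is owned by
    # auth_mdata_mgr):
    #
    #   {
    #       "dirty": False,
    #       "tenant_id": "tenant1",
    #       "subvolumes": {
    #           "grp1/sv1": {"access_level": "rw", "dirty": False}
    #       }
    #   }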
    def _authorize_subvolume(self, auth_id, access_level, existing_caps):
        subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)

        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if subvol_meta is None:
            subvol_meta = {
                'auths': auth
            }
        else:
            subvol_meta['auths'].update(auth)
        self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)

        key = self._authorize(auth_id, access_level, existing_caps)

        subvol_meta['auths'][auth_id]['dirty'] = False
        self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)

        return key
    def _authorize(self, auth_id, access_level, existing_caps):
        subvol_path = self.path
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))

        # First I need to work out what the data pool is for this share:
        try:
            pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

        try:
            namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
        except cephfs.NoData:
            namespace = None

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8'))
        want_osd_cap = "allow {0} pool={1}{2}".format(
            access_level, pool, " namespace={0}".format(namespace) if namespace else "")

        # Construct auth caps that if present might conflict with the desired
        # auth caps.
        unwanted_access_level = 'r' if access_level == 'rw' else 'rw'
        unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8'))
        unwanted_osd_cap = "allow {0} pool={1}{2}".format(
            unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "")

        return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap,
                            unwanted_mds_cap, unwanted_osd_cap, existing_caps)
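    # For example (hypothetical values): authorizing auth_id 'alice' with
    # access_level 'rw' on a subvolume backed by pool 'cephfs_data' and
    # namespace 'ns1' asks allow_access() to grant
    #   mds: "allow rw path=/volumes/grp1/sv1/<uuid>"
    #   osd: "allow rw pool=cephfs_data namespace=ns1"
    # while treating the corresponding 'r' caps as conflicting leftovers.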
    def deauthorize(self, auth_id):
        with self.auth_mdata_mgr.auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)

            if auth_meta is None:
                msg = "auth ID: {0} doesn't exist".format(auth_id)
                log.error(msg)
                raise VolumeException(-errno.ENOENT, msg)

            # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
            if 'volumes' in auth_meta:
                auth_meta['subvolumes'] = auth_meta.pop('volumes')

            group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
            group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
            if (auth_meta is None) or (not auth_meta['subvolumes']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
                                auth_id=auth_id, subvolume=self.subvolname
                            ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
                return

            if group_subvol_id not in auth_meta['subvolumes']:
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
                                auth_id=auth_id, subvolume=self.subvolname
                            ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['subvolumes'][group_subvol_id]['dirty'] = True
            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_subvolume(auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['subvolumes'][group_subvol_id]

            # Clean up auth meta file
            if not auth_meta['subvolumes']:
                self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
    def _deauthorize_subvolume(self, auth_id):
        with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
            subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)

            if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for subvolume '{subvolume}'".format(
                                auth_id=auth_id, subvolume=self.subvolname
                            ))
                return

            subvol_meta['auths'][auth_id]['dirty'] = True
            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)

            self._deauthorize(auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del subvol_meta['auths'][auth_id]
            self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
    def _deauthorize(self, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        subvol_path = self.path
        try:
            pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
        except cephfs.Error as e:
            raise VolumeException(-e.args[0], e.args[1])

        try:
            namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
        except cephfs.NoData:
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # subvolume path, so revoke caps for both access levels.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8'))
                         for access_level in access_levels]
        want_osd_caps = ['allow {0} pool={1}{2}'.format(
                         access_level, pool_name, " namespace={0}".format(namespace) if namespace else "")
                         for access_level in access_levels]
        deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps)
    def authorized_list(self):
        """
        Expose a list of auth IDs that have access to a subvolume.

        return: a list of {auth_id: access_level} entries, where the
                access_level can be 'r' or 'rw'; empty if no auth ID
                is given access to the subvolume.
        """
        with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
            meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
            auths = []  # type: List[Dict[str,str]]
            if not meta or not meta['auths']:
                return auths

            for auth, auth_data in meta['auths'].items():
                # Skip partial auth updates.
                if not auth_data['dirty']:
                    auths.append({auth: auth_data['access_level']})

            return auths
    def evict(self, volname, auth_id, timeout=30):
        """
        Evict all clients based on the authorization ID and the subvolume path mounted.
        Assumes that the authorization key has been revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """

        client_spec = ["auth_name={0}".format(auth_id), ]
        client_spec.append("client_metadata.root={0}".
                           format(self.path.decode('utf-8')))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = get_mds_map(self.mgr, volname)
        if not mds_map:
            raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname))

        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception))
                log.error(msg)
                raise EvictionError(msg)
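    # For example (hypothetical values): evicting auth_id 'alice' from a
    # subvolume rooted at /volumes/grp1/sv1/<uuid> builds the filter
    #   ["auth_name=alice", "client_metadata.root=/volumes/grp1/sv1/<uuid>"]
    # and runs one RankEvicter thread per active MDS rank in parallel.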
    def _get_clone_source(self):
        try:
            clone_source = {
                'volume'   : self.metadata_mgr.get_option("source", "volume"),
                'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
                'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
            }

            try:
                clone_source["group"] = self.metadata_mgr.get_option("source", "group")
            except MetadataMgrException as me:
                if me.errno != -errno.ENOENT:
                    raise
        except MetadataMgrException as me:
            raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
        return clone_source
    @property
    def status(self):
        state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
        subvolume_type = self.subvol_type
        subvolume_status = {
            'state' : state.value
        }
        if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
            subvolume_status["source"] = self._get_clone_source()
        return subvolume_status
    @property
    def state(self):
        return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))

    @state.setter
    def state(self, val):
        state = val[0].value
        flush = val[1]
        self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
        if flush:
            self.metadata_mgr.flush()
    def remove(self, retainsnaps=False):
        if retainsnaps:
            raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
        if self.list_snapshots():
            raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
        self.trash_base_dir()
    def resize(self, newsize, noshrink):
        subvol_path = self.path
        return self._resize(subvol_path, newsize, noshrink)
    def create_snapshot(self, snapname):
        try:
            group_snapshot_path = os.path.join(self.group.path,
                                               self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
                                               snapname.encode('utf-8'))
            self.fs.stat(group_snapshot_path)
        except cephfs.Error as e:
            if e.args[0] == errno.ENOENT:
                snappath = self.snapshot_path(snapname)
                mksnap(self.fs, snappath)
            else:
                raise VolumeException(-e.args[0], e.args[1])
        else:
            raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot name can't be same")
    def has_pending_clones(self, snapname):
        try:
            return self.metadata_mgr.section_has_item('clone snaps', snapname)
        except MetadataMgrException as me:
            if me.errno == -errno.ENOENT:
                return False
            raise
    def remove_snapshot(self, snapname):
        if self.has_pending_clones(snapname):
            raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
        snappath = self.snapshot_path(snapname)
        rmsnap(self.fs, snappath)
    def snapshot_info(self, snapname):
        if is_inherited_snap(snapname):
            raise VolumeException(-errno.EINVAL,
                                  "snapshot name '{0}' is invalid".format(snapname))
        snappath = self.snapshot_data_path(snapname)
        snap_info = {}
        try:
            snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
                          'data_pool':'ceph.dir.layout.pool'}
            for key, val in snap_attrs.items():
                snap_info[key] = self.fs.getxattr(snappath, val)
            return {'size': int(snap_info['size']),
                    'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
                    'data_pool': snap_info['data_pool'].decode('utf-8'),
                    'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
        except cephfs.Error as e:
            if e.errno == errno.ENOENT:
                raise VolumeException(-errno.ENOENT,
                                      "snapshot '{0}' does not exist".format(snapname))
            raise VolumeException(-e.args[0], e.args[1])
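    # A sketch of the dict returned by snapshot_info() (hypothetical values):
    #   {'size': 4096,
    #    'created_at': '2023-01-01 12:00:00',
    #    'data_pool': 'cephfs_data',
    #    'has_pending_clones': 'no'}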
    def list_snapshots(self):
        try:
            dirpath = self.snapshot_base_path()
            return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
        except VolumeException as ve:
            if ve.errno == -errno.ENOENT:
                return []
            raise
    def _add_snap_clone(self, track_id, snapname):
        self.metadata_mgr.add_section("clone snaps")
        self.metadata_mgr.update_section("clone snaps", track_id, snapname)
        self.metadata_mgr.flush()

    def _remove_snap_clone(self, track_id):
        self.metadata_mgr.remove_option("clone snaps", track_id)
        self.metadata_mgr.flush()
    def attach_snapshot(self, snapname, tgt_subvolume):
        if not snapname.encode('utf-8') in self.list_snapshots():
            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
        try:
            create_clone_index(self.fs, self.vol_spec)
            with open_clone_index(self.fs, self.vol_spec) as index:
                track_idx = index.track(tgt_subvolume.base_path)
                self._add_snap_clone(track_idx, snapname)
        except (IndexException, MetadataMgrException) as e:
            log.warning("error creating clone index: {0}".format(e))
            raise VolumeException(-errno.EINVAL, "error cloning subvolume")
    def detach_snapshot(self, snapname, track_id):
        if not snapname.encode('utf-8') in self.list_snapshots():
            raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
        try:
            with open_clone_index(self.fs, self.vol_spec) as index:
                index.untrack(track_id)
                self._remove_snap_clone(track_id)
        except (IndexException, MetadataMgrException) as e:
            log.warning("error delinking snapshot from clone: {0}".format(e))
            raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")