8 from datetime
import datetime
9 from typing
import List
, Dict
13 from .metadata_manager
import MetadataManager
14 from .subvolume_attrs
import SubvolumeTypes
, SubvolumeStates
, SubvolumeFeatures
15 from .op_sm
import SubvolumeOpSm
16 from .subvolume_base
import SubvolumeBase
17 from ..template
import SubvolumeTemplate
18 from ..snapshot_util
import mksnap
, rmsnap
19 from ..access
import allow_access
, deny_access
20 from ...exception
import IndexException
, OpSmException
, VolumeException
, MetadataMgrException
, EvictionError
21 from ...fs_util
import listsnaps
, is_inherited_snap
22 from ..template
import SubvolumeOpType
23 from ..group
import Group
24 from ..rankevicter
import RankEvicter
25 from ..volume
import get_mds_map
27 from ..clone_index
import open_clone_index
, create_clone_index
29 log
= logging
.getLogger(__name__
)
class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
    """
    Version 1 subvolumes creates a subvolume with path as follows,
        volumes/<group-name>/<subvolume-name>/<uuid>/

    - The directory under which user data resides is <uuid>
    - Snapshots of the subvolume are taken within the <uuid> directory
    - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing,
        - global information about the subvolume (version, path, type, state)
        - snapshots attached to an ongoing clone operation
        - clone snapshot source if subvolume is a clone of a snapshot
    - It retains backward compatability with legacy subvolumes by creating the meta file for legacy subvolumes under
      /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
      component in the path.
    """

    # on-disk metadata version written by this implementation
    VERSION = 1

    @staticmethod
    def version():
        """Return the subvolume metadata version implemented by this class."""
        return SubvolumeV1.VERSION
55 # no need to stat the path -- open() does that
56 return self
.metadata_mgr
.get_global_option(MetadataManager
.GLOBAL_META_KEY_PATH
).encode('utf-8')
57 except MetadataMgrException
as me
:
58 raise VolumeException(-errno
.EINVAL
, "error fetching subvolume metadata")
62 return [SubvolumeFeatures
.FEATURE_SNAPSHOT_CLONE
.value
, SubvolumeFeatures
.FEATURE_SNAPSHOT_AUTOPROTECT
.value
]
64 def mark_subvolume(self
):
65 # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
66 # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes
68 # MDS treats this as a noop for already marked subvolume
69 self
.fs
.setxattr(self
.path
, 'ceph.dir.subvolume', b
'1', 0)
70 except cephfs
.InvalidValue
as e
:
71 raise VolumeException(-errno
.EINVAL
, "invalid value specified for ceph.dir.subvolume")
72 except cephfs
.Error
as e
:
73 raise VolumeException(-e
.args
[0], e
.args
[1])
75 def snapshot_base_path(self
):
76 """ Base path for all snapshots """
77 return os
.path
.join(self
.path
, self
.vol_spec
.snapshot_dir_prefix
.encode('utf-8'))
79 def snapshot_path(self
, snapname
):
80 """ Path to a specific snapshot named 'snapname' """
81 return os
.path
.join(self
.snapshot_base_path(), snapname
.encode('utf-8'))
83 def snapshot_data_path(self
, snapname
):
84 """ Path to user data directory within a subvolume snapshot named 'snapname' """
85 return self
.snapshot_path(snapname
)
87 def create(self
, size
, isolate_nspace
, pool
, mode
, uid
, gid
):
88 subvolume_type
= SubvolumeTypes
.TYPE_NORMAL
90 initial_state
= SubvolumeOpSm
.get_init_state(subvolume_type
)
91 except OpSmException
as oe
:
92 raise VolumeException(-errno
.EINVAL
, "subvolume creation failed: internal error")
94 subvol_path
= os
.path
.join(self
.base_path
, str(uuid
.uuid4()).encode('utf-8'))
96 # create directory and set attributes
97 self
.fs
.mkdirs(subvol_path
, mode
)
103 'pool_namespace': self
.namespace
if isolate_nspace
else None,
106 self
.set_attrs(subvol_path
, attrs
)
108 # persist subvolume metadata
109 qpath
= subvol_path
.decode('utf-8')
110 self
.init_config(SubvolumeV1
.VERSION
, subvolume_type
, qpath
, initial_state
)
111 except (VolumeException
, MetadataMgrException
, cephfs
.Error
) as e
:
113 log
.info("cleaning up subvolume with path: {0}".format(self
.subvolname
))
115 except VolumeException
as ve
:
116 log
.info("failed to cleanup subvolume '{0}' ({1})".format(self
.subvolname
, ve
))
118 if isinstance(e
, MetadataMgrException
):
119 log
.error("metadata manager exception: {0}".format(e
))
120 e
= VolumeException(-errno
.EINVAL
, "exception in subvolume metadata")
121 elif isinstance(e
, cephfs
.Error
):
122 e
= VolumeException(-e
.args
[0], e
.args
[1])
125 def add_clone_source(self
, volname
, subvolume
, snapname
, flush
=False):
126 self
.metadata_mgr
.add_section("source")
127 self
.metadata_mgr
.update_section("source", "volume", volname
)
128 if not subvolume
.group
.is_default_group():
129 self
.metadata_mgr
.update_section("source", "group", subvolume
.group_name
)
130 self
.metadata_mgr
.update_section("source", "subvolume", subvolume
.subvol_name
)
131 self
.metadata_mgr
.update_section("source", "snapshot", snapname
)
133 self
.metadata_mgr
.flush()
135 def remove_clone_source(self
, flush
=False):
136 self
.metadata_mgr
.remove_section("source")
138 self
.metadata_mgr
.flush()
140 def create_clone(self
, pool
, source_volname
, source_subvolume
, snapname
):
141 subvolume_type
= SubvolumeTypes
.TYPE_CLONE
143 initial_state
= SubvolumeOpSm
.get_init_state(subvolume_type
)
144 except OpSmException
as oe
:
145 raise VolumeException(-errno
.EINVAL
, "clone failed: internal error")
147 subvol_path
= os
.path
.join(self
.base_path
, str(uuid
.uuid4()).encode('utf-8'))
149 # source snapshot attrs are used to create clone subvolume.
150 # attributes of subvolume's content though, are synced during the cloning process.
151 attrs
= source_subvolume
.get_attrs(source_subvolume
.snapshot_data_path(snapname
))
153 # override snapshot pool setting, if one is provided for the clone
155 attrs
["data_pool"] = pool
156 attrs
["pool_namespace"] = None
158 # create directory and set attributes
159 self
.fs
.mkdirs(subvol_path
, attrs
.get("mode"))
160 self
.mark_subvolume()
161 self
.set_attrs(subvol_path
, attrs
)
163 # persist subvolume metadata and clone source
164 qpath
= subvol_path
.decode('utf-8')
165 self
.metadata_mgr
.init(SubvolumeV1
.VERSION
, subvolume_type
.value
, qpath
, initial_state
.value
)
166 self
.add_clone_source(source_volname
, source_subvolume
, snapname
)
167 self
.metadata_mgr
.flush()
168 except (VolumeException
, MetadataMgrException
, cephfs
.Error
) as e
:
170 log
.info("cleaning up subvolume with path: {0}".format(self
.subvolname
))
172 except VolumeException
as ve
:
173 log
.info("failed to cleanup subvolume '{0}' ({1})".format(self
.subvolname
, ve
))
175 if isinstance(e
, MetadataMgrException
):
176 log
.error("metadata manager exception: {0}".format(e
))
177 e
= VolumeException(-errno
.EINVAL
, "exception in subvolume metadata")
178 elif isinstance(e
, cephfs
.Error
):
179 e
= VolumeException(-e
.args
[0], e
.args
[1])
182 def allowed_ops_by_type(self
, vol_type
):
183 if vol_type
== SubvolumeTypes
.TYPE_CLONE
:
184 return {op_type
for op_type
in SubvolumeOpType
}
186 if vol_type
== SubvolumeTypes
.TYPE_NORMAL
:
187 return {op_type
for op_type
in SubvolumeOpType
} - {SubvolumeOpType
.CLONE_STATUS
,
188 SubvolumeOpType
.CLONE_CANCEL
,
189 SubvolumeOpType
.CLONE_INTERNAL
}
193 def allowed_ops_by_state(self
, vol_state
):
194 if vol_state
== SubvolumeStates
.STATE_COMPLETE
:
195 return {op_type
for op_type
in SubvolumeOpType
}
197 return {SubvolumeOpType
.REMOVE_FORCE
,
198 SubvolumeOpType
.CLONE_CREATE
,
199 SubvolumeOpType
.CLONE_STATUS
,
200 SubvolumeOpType
.CLONE_CANCEL
,
201 SubvolumeOpType
.CLONE_INTERNAL
}
203 def open(self
, op_type
):
204 if not isinstance(op_type
, SubvolumeOpType
):
205 raise VolumeException(-errno
.ENOTSUP
, "operation {0} not supported on subvolume '{1}'".format(
206 op_type
.value
, self
.subvolname
))
208 self
.metadata_mgr
.refresh()
210 etype
= self
.subvol_type
211 if op_type
not in self
.allowed_ops_by_type(etype
):
212 raise VolumeException(-errno
.ENOTSUP
, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
213 op_type
.value
, self
.subvolname
, etype
.value
))
216 if op_type
not in self
.allowed_ops_by_state(estate
):
217 raise VolumeException(-errno
.EAGAIN
, "subvolume '{0}' is not ready for operation {1}".format(
218 self
.subvolname
, op_type
.value
))
220 subvol_path
= self
.path
221 log
.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path
))
222 st
= self
.fs
.stat(subvol_path
)
223 # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
224 self
.mark_subvolume()
226 self
.uid
= int(st
.st_uid
)
227 self
.gid
= int(st
.st_gid
)
228 self
.mode
= int(st
.st_mode
& ~stat
.S_IFMT(st
.st_mode
))
229 except MetadataMgrException
as me
:
230 if me
.errno
== -errno
.ENOENT
:
231 raise VolumeException(-errno
.ENOENT
, "subvolume '{0}' does not exist".format(self
.subvolname
))
232 raise VolumeException(me
.args
[0], me
.args
[1])
233 except cephfs
.ObjectNotFound
:
234 log
.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path
, self
.subvolname
))
235 raise VolumeException(-errno
.ENOENT
, "mount path missing for subvolume '{0}'".format(self
.subvolname
))
236 except cephfs
.Error
as e
:
237 raise VolumeException(-e
.args
[0], e
.args
[1])
239 def _recover_auth_meta(self
, auth_id
, auth_meta
):
241 Call me after locking the auth meta file.
243 remove_subvolumes
= []
245 for subvol
, subvol_data
in auth_meta
['subvolumes'].items():
246 if not subvol_data
['dirty']:
249 (group_name
, subvol_name
) = subvol
.split('/')
250 group_name
= group_name
if group_name
!= 'None' else Group
.NO_GROUP_NAME
251 access_level
= subvol_data
['access_level']
253 with self
.auth_mdata_mgr
.subvol_metadata_lock(group_name
, subvol_name
):
254 subvol_meta
= self
.auth_mdata_mgr
.subvol_metadata_get(group_name
, subvol_name
)
256 # No SVMeta update indicates that there was no auth update
257 # in Ceph either. So it's safe to remove corresponding
258 # partial update in AMeta.
259 if not subvol_meta
or auth_id
not in subvol_meta
['auths']:
260 remove_subvolumes
.append(subvol
)
264 'access_level': access_level
,
267 # SVMeta update looks clean. Ceph auth update must have been
268 # clean. Update the dirty flag and continue
269 if subvol_meta
['auths'][auth_id
] == want_auth
:
270 auth_meta
['subvolumes'][subvol
]['dirty'] = False
271 self
.auth_mdata_mgr
.auth_metadata_set(auth_id
, auth_meta
)
274 client_entity
= "client.{0}".format(auth_id
)
275 ret
, out
, err
= self
.mgr
.mon_command(
277 'prefix': 'auth get',
278 'entity': client_entity
,
282 existing_caps
= json
.loads(out
)
283 elif ret
== -errno
.ENOENT
:
287 raise VolumeException(ret
, err
)
289 self
._authorize
_subvolume
(auth_id
, access_level
, existing_caps
)
291 # Recovered from partial auth updates for the auth ID's access
293 auth_meta
['subvolumes'][subvol
]['dirty'] = False
294 self
.auth_mdata_mgr
.auth_metadata_set(auth_id
, auth_meta
)
296 for subvol
in remove_subvolumes
:
297 del auth_meta
['subvolumes'][subvol
]
299 if not auth_meta
['subvolumes']:
300 # Clean up auth meta file
301 self
.fs
.unlink(self
.auth_mdata_mgr
._auth
_metadata
_path
(auth_id
))
304 # Recovered from all partial auth updates for the auth ID.
305 auth_meta
['dirty'] = False
306 self
.auth_mdata_mgr
.auth_metadata_set(auth_id
, auth_meta
)
308 def authorize(self
, auth_id
, access_level
, tenant_id
=None, allow_existing_id
=False):
310 Get-or-create a Ceph auth identity for `auth_id` and grant them access
314 :param tenant_id: Optionally provide a stringizable object to
315 restrict any created cephx IDs to other callers
316 passing the same tenant ID.
317 :allow_existing_id: Optionally authorize existing auth-ids not
318 created by ceph_volume_client.
322 with self
.auth_mdata_mgr
.auth_lock(auth_id
):
323 client_entity
= "client.{0}".format(auth_id
)
324 ret
, out
, err
= self
.mgr
.mon_command(
326 'prefix': 'auth get',
327 'entity': client_entity
,
332 existing_caps
= json
.loads(out
)
333 elif ret
== -errno
.ENOENT
:
337 raise VolumeException(ret
, err
)
339 # Existing meta, or None, to be updated
340 auth_meta
= self
.auth_mdata_mgr
.auth_metadata_get(auth_id
)
342 # subvolume data to be inserted
343 group_name
= self
.group
.groupname
if self
.group
.groupname
!= Group
.NO_GROUP_NAME
else None
344 group_subvol_id
= "{0}/{1}".format(group_name
, self
.subvolname
)
347 # The access level at which the auth_id is authorized to
349 'access_level': access_level
,
354 if auth_meta
is None:
355 if not allow_existing_id
and existing_caps
is not None:
356 msg
= "auth ID: {0} exists and not created by mgr plugin. Not allowed to modify".format(auth_id
)
358 raise VolumeException(-errno
.EPERM
, msg
)
360 # non-existent auth IDs
361 sys
.stderr
.write("Creating meta for ID {0} with tenant {1}\n".format(
364 log
.debug("Authorize: no existing meta")
367 'tenant_id': str(tenant_id
) if tenant_id
else None,
368 'subvolumes': subvolume
371 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
372 if 'volumes' in auth_meta
:
373 auth_meta
['subvolumes'] = auth_meta
.pop('volumes')
375 # Disallow tenants to share auth IDs
376 if str(auth_meta
['tenant_id']) != str(tenant_id
):
377 msg
= "auth ID: {0} is already in use".format(auth_id
)
379 raise VolumeException(-errno
.EPERM
, msg
)
381 if auth_meta
['dirty']:
382 self
._recover
_auth
_meta
(auth_id
, auth_meta
)
384 log
.debug("Authorize: existing tenant {tenant}".format(
385 tenant
=auth_meta
['tenant_id']
387 auth_meta
['dirty'] = True
388 auth_meta
['subvolumes'].update(subvolume
)
390 self
.auth_mdata_mgr
.auth_metadata_set(auth_id
, auth_meta
)
392 with self
.auth_mdata_mgr
.subvol_metadata_lock(self
.group
.groupname
, self
.subvolname
):
393 key
= self
._authorize
_subvolume
(auth_id
, access_level
, existing_caps
)
395 auth_meta
['dirty'] = False
396 auth_meta
['subvolumes'][group_subvol_id
]['dirty'] = False
397 self
.auth_mdata_mgr
.auth_metadata_set(auth_id
, auth_meta
)
402 # Caller wasn't multi-tenant aware: be safe and don't give
406 def _authorize_subvolume(self
, auth_id
, access_level
, existing_caps
):
407 subvol_meta
= self
.auth_mdata_mgr
.subvol_metadata_get(self
.group
.groupname
, self
.subvolname
)
411 'access_level': access_level
,
416 if subvol_meta
is None:
421 subvol_meta
['auths'].update(auth
)
422 self
.auth_mdata_mgr
.subvol_metadata_set(self
.group
.groupname
, self
.subvolname
, subvol_meta
)
424 key
= self
._authorize
(auth_id
, access_level
, existing_caps
)
426 subvol_meta
['auths'][auth_id
]['dirty'] = False
427 self
.auth_mdata_mgr
.subvol_metadata_set(self
.group
.groupname
, self
.subvolname
, subvol_meta
)
431 def _authorize(self
, auth_id
, access_level
, existing_caps
):
432 subvol_path
= self
.path
433 log
.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id
, subvol_path
))
435 # First I need to work out what the data pool is for this share:
438 pool
= self
.fs
.getxattr(subvol_path
, 'ceph.dir.layout.pool').decode('utf-8')
439 except cephfs
.Error
as e
:
440 raise VolumeException(-e
.args
[0], e
.args
[1])
443 namespace
= self
.fs
.getxattr(subvol_path
, 'ceph.dir.layout.pool_namespace').decode('utf-8')
444 except cephfs
.NoData
:
447 # Now construct auth capabilities that give the guest just enough
448 # permissions to access the share
449 client_entity
= "client.{0}".format(auth_id
)
450 want_mds_cap
= "allow {0} path={1}".format(access_level
, subvol_path
.decode('utf-8'))
451 want_osd_cap
= "allow {0} pool={1}{2}".format(
452 access_level
, pool
, " namespace={0}".format(namespace
) if namespace
else "")
454 # Construct auth caps that if present might conflict with the desired
456 unwanted_access_level
= 'r' if access_level
== 'rw' else 'rw'
457 unwanted_mds_cap
= 'allow {0} path={1}'.format(unwanted_access_level
, subvol_path
.decode('utf-8'))
458 unwanted_osd_cap
= "allow {0} pool={1}{2}".format(
459 unwanted_access_level
, pool
, " namespace={0}".format(namespace
) if namespace
else "")
461 return allow_access(self
.mgr
, client_entity
, want_mds_cap
, want_osd_cap
,
462 unwanted_mds_cap
, unwanted_osd_cap
, existing_caps
)
464 def deauthorize(self
, auth_id
):
465 with self
.auth_mdata_mgr
.auth_lock(auth_id
):
466 # Existing meta, or None, to be updated
467 auth_meta
= self
.auth_mdata_mgr
.auth_metadata_get(auth_id
)
469 if auth_meta
is None:
470 msg
= "auth ID: {0} doesn't exist".format(auth_id
)
472 raise VolumeException(-errno
.ENOENT
, msg
)
474 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
475 if 'volumes' in auth_meta
:
476 auth_meta
['subvolumes'] = auth_meta
.pop('volumes')
478 group_name
= self
.group
.groupname
if self
.group
.groupname
!= Group
.NO_GROUP_NAME
else None
479 group_subvol_id
= "{0}/{1}".format(group_name
, self
.subvolname
)
480 if (auth_meta
is None) or (not auth_meta
['subvolumes']):
481 log
.warning("deauthorized called for already-removed auth"
482 "ID '{auth_id}' for subvolume '{subvolume}'".format(
483 auth_id
=auth_id
, subvolume
=self
.subvolname
485 # Clean up the auth meta file of an auth ID
486 self
.fs
.unlink(self
.auth_mdata_mgr
._auth
_metadata
_path
(auth_id
))
489 if group_subvol_id
not in auth_meta
['subvolumes']:
490 log
.warning("deauthorized called for already-removed auth"
491 "ID '{auth_id}' for subvolume '{subvolume}'".format(
492 auth_id
=auth_id
, subvolume
=self
.subvolname
496 if auth_meta
['dirty']:
497 self
._recover
_auth
_meta
(auth_id
, auth_meta
)
499 auth_meta
['dirty'] = True
500 auth_meta
['subvolumes'][group_subvol_id
]['dirty'] = True
501 self
.auth_mdata_mgr
.auth_metadata_set(auth_id
, auth_meta
)
503 self
._deauthorize
_subvolume
(auth_id
)
505 # Filter out the volume we're deauthorizing
506 del auth_meta
['subvolumes'][group_subvol_id
]
508 # Clean up auth meta file
509 if not auth_meta
['subvolumes']:
510 self
.fs
.unlink(self
.auth_mdata_mgr
._auth
_metadata
_path
(auth_id
))
513 auth_meta
['dirty'] = False
514 self
.auth_mdata_mgr
.auth_metadata_set(auth_id
, auth_meta
)
516 def _deauthorize_subvolume(self
, auth_id
):
517 with self
.auth_mdata_mgr
.subvol_metadata_lock(self
.group
.groupname
, self
.subvolname
):
518 subvol_meta
= self
.auth_mdata_mgr
.subvol_metadata_get(self
.group
.groupname
, self
.subvolname
)
520 if (subvol_meta
is None) or (auth_id
not in subvol_meta
['auths']):
521 log
.warning("deauthorized called for already-removed auth"
522 "ID '{auth_id}' for subvolume '{subvolume}'".format(
523 auth_id
=auth_id
, subvolume
=self
.subvolname
527 subvol_meta
['auths'][auth_id
]['dirty'] = True
528 self
.auth_mdata_mgr
.subvol_metadata_set(self
.group
.groupname
, self
.subvolname
, subvol_meta
)
530 self
._deauthorize
(auth_id
)
532 # Remove the auth_id from the metadata *after* removing it
533 # from ceph, so that if we crashed here, we would actually
534 # recreate the auth ID during recovery (i.e. end up with
535 # a consistent state).
537 # Filter out the auth we're removing
538 del subvol_meta
['auths'][auth_id
]
539 self
.auth_mdata_mgr
.subvol_metadata_set(self
.group
.groupname
, self
.subvolname
, subvol_meta
)
541 def _deauthorize(self
, auth_id
):
543 The volume must still exist.
545 client_entity
= "client.{0}".format(auth_id
)
546 subvol_path
= self
.path
548 pool_name
= self
.fs
.getxattr(subvol_path
, 'ceph.dir.layout.pool').decode('utf-8')
549 except cephfs
.Error
as e
:
550 raise VolumeException(-e
.args
[0], e
.args
[1])
553 namespace
= self
.fs
.getxattr(subvol_path
, 'ceph.dir.layout.pool_namespace').decode('utf-8')
554 except cephfs
.NoData
:
557 # The auth_id might have read-only or read-write mount access for the
559 access_levels
= ('r', 'rw')
560 want_mds_caps
= ['allow {0} path={1}'.format(access_level
, subvol_path
.decode('utf-8'))
561 for access_level
in access_levels
]
562 want_osd_caps
= ['allow {0} pool={1}{2}'.format(
563 access_level
, pool_name
, " namespace={0}".format(namespace
) if namespace
else "")
564 for access_level
in access_levels
]
565 deny_access(self
.mgr
, client_entity
, want_mds_caps
, want_osd_caps
)
567 def authorized_list(self
):
569 Expose a list of auth IDs that have access to a subvolume.
571 return: a list of (auth_id, access_level) tuples, where
572 the access_level can be 'r' , or 'rw'.
573 None if no auth ID is given access to the subvolume.
575 with self
.auth_mdata_mgr
.subvol_metadata_lock(self
.group
.groupname
, self
.subvolname
):
576 meta
= self
.auth_mdata_mgr
.subvol_metadata_get(self
.group
.groupname
, self
.subvolname
)
577 auths
= [] # type: List[Dict[str,str]]
578 if not meta
or not meta
['auths']:
581 for auth
, auth_data
in meta
['auths'].items():
582 # Skip partial auth updates.
583 if not auth_data
['dirty']:
584 auths
.append({auth
: auth_data
['access_level']})
588 def evict(self
, volname
, auth_id
, timeout
=30):
590 Evict all clients based on the authorization ID and the subvolume path mounted.
591 Assumes that the authorization key has been revoked prior to calling this function.
593 This operation can throw an exception if the mon cluster is unresponsive, or
594 any individual MDS daemon is unresponsive for longer than the timeout passed in.
597 client_spec
= ["auth_name={0}".format(auth_id
), ]
598 client_spec
.append("client_metadata.root={0}".
599 format(self
.path
.decode('utf-8')))
601 log
.info("evict clients with {0}".format(', '.join(client_spec
)))
603 mds_map
= get_mds_map(self
.mgr
, volname
)
605 raise VolumeException(-errno
.ENOENT
, "mdsmap for volume {0} not found".format(volname
))
608 for name
, gid
in mds_map
['up'].items():
609 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
610 assert name
.startswith("mds_")
611 up
[int(name
[4:])] = gid
613 # For all MDS ranks held by a daemon
614 # Do the parallelism in python instead of using "tell mds.*", because
615 # the latter doesn't give us per-mds output
617 for rank
, gid
in up
.items():
618 thread
= RankEvicter(self
.mgr
, self
.fs
, client_spec
, volname
, rank
, gid
, mds_map
, timeout
)
620 threads
.append(thread
)
625 log
.info("evict: joined all")
629 msg
= ("Failed to evict client with {0} from mds {1}/{2}: {3}".
630 format(', '.join(client_spec
), t
.rank
, t
.gid
, t
.exception
)
633 raise EvictionError(msg
)
635 def _get_clone_source(self
):
638 'volume' : self
.metadata_mgr
.get_option("source", "volume"),
639 'subvolume': self
.metadata_mgr
.get_option("source", "subvolume"),
640 'snapshot' : self
.metadata_mgr
.get_option("source", "snapshot"),
644 clone_source
["group"] = self
.metadata_mgr
.get_option("source", "group")
645 except MetadataMgrException
as me
:
646 if me
.errno
== -errno
.ENOENT
:
650 except MetadataMgrException
as me
:
651 raise VolumeException(-errno
.EINVAL
, "error fetching subvolume metadata")
656 state
= SubvolumeStates
.from_value(self
.metadata_mgr
.get_global_option(MetadataManager
.GLOBAL_META_KEY_STATE
))
657 subvolume_type
= self
.subvol_type
659 'state' : state
.value
661 if not SubvolumeOpSm
.is_complete_state(state
) and subvolume_type
== SubvolumeTypes
.TYPE_CLONE
:
662 subvolume_status
["source"] = self
._get
_clone
_source
()
663 return subvolume_status
667 return SubvolumeStates
.from_value(self
.metadata_mgr
.get_global_option(MetadataManager
.GLOBAL_META_KEY_STATE
))
670 def state(self
, val
):
673 self
.metadata_mgr
.update_global_section(MetadataManager
.GLOBAL_META_KEY_STATE
, state
)
675 self
.metadata_mgr
.flush()
677 def remove(self
, retainsnaps
=False):
679 raise VolumeException(-errno
.EINVAL
, "subvolume '{0}' does not support snapshot retention on delete".format(self
.subvolname
))
680 if self
.list_snapshots():
681 raise VolumeException(-errno
.ENOTEMPTY
, "subvolume '{0}' has snapshots".format(self
.subvolname
))
682 self
.trash_base_dir()
684 def resize(self
, newsize
, noshrink
):
685 subvol_path
= self
.path
686 return self
._resize
(subvol_path
, newsize
, noshrink
)
688 def create_snapshot(self
, snapname
):
690 group_snapshot_path
= os
.path
.join(self
.group
.path
,
691 self
.vol_spec
.snapshot_dir_prefix
.encode('utf-8'),
692 snapname
.encode('utf-8'))
693 self
.fs
.stat(group_snapshot_path
)
694 except cephfs
.Error
as e
:
695 if e
.args
[0] == errno
.ENOENT
:
696 snappath
= self
.snapshot_path(snapname
)
697 mksnap(self
.fs
, snappath
)
699 raise VolumeException(-e
.args
[0], e
.args
[1])
701 raise VolumeException(-errno
.EINVAL
, "subvolumegroup and subvolume snapshot name can't be same")
703 def has_pending_clones(self
, snapname
):
705 return self
.metadata_mgr
.section_has_item('clone snaps', snapname
)
706 except MetadataMgrException
as me
:
707 if me
.errno
== -errno
.ENOENT
:
711 def remove_snapshot(self
, snapname
):
712 if self
.has_pending_clones(snapname
):
713 raise VolumeException(-errno
.EAGAIN
, "snapshot '{0}' has pending clones".format(snapname
))
714 snappath
= self
.snapshot_path(snapname
)
715 rmsnap(self
.fs
, snappath
)
717 def snapshot_info(self
, snapname
):
718 if is_inherited_snap(snapname
):
719 raise VolumeException(-errno
.EINVAL
,
720 "snapshot name '{0}' is invalid".format(snapname
))
721 snappath
= self
.snapshot_data_path(snapname
)
724 snap_attrs
= {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
725 'data_pool':'ceph.dir.layout.pool'}
726 for key
, val
in snap_attrs
.items():
727 snap_info
[key
] = self
.fs
.getxattr(snappath
, val
)
728 return {'size': int(snap_info
['size']),
729 'created_at': str(datetime
.fromtimestamp(float(snap_info
['created_at']))),
730 'data_pool': snap_info
['data_pool'].decode('utf-8'),
731 'has_pending_clones': "yes" if self
.has_pending_clones(snapname
) else "no"}
732 except cephfs
.Error
as e
:
733 if e
.errno
== errno
.ENOENT
:
734 raise VolumeException(-errno
.ENOENT
,
735 "snapshot '{0}' does not exist".format(snapname
))
736 raise VolumeException(-e
.args
[0], e
.args
[1])
738 def list_snapshots(self
):
740 dirpath
= self
.snapshot_base_path()
741 return listsnaps(self
.fs
, self
.vol_spec
, dirpath
, filter_inherited_snaps
=True)
742 except VolumeException
as ve
:
743 if ve
.errno
== -errno
.ENOENT
:
747 def _add_snap_clone(self
, track_id
, snapname
):
748 self
.metadata_mgr
.add_section("clone snaps")
749 self
.metadata_mgr
.update_section("clone snaps", track_id
, snapname
)
750 self
.metadata_mgr
.flush()
752 def _remove_snap_clone(self
, track_id
):
753 self
.metadata_mgr
.remove_option("clone snaps", track_id
)
754 self
.metadata_mgr
.flush()
756 def attach_snapshot(self
, snapname
, tgt_subvolume
):
757 if not snapname
.encode('utf-8') in self
.list_snapshots():
758 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
760 create_clone_index(self
.fs
, self
.vol_spec
)
761 with
open_clone_index(self
.fs
, self
.vol_spec
) as index
:
762 track_idx
= index
.track(tgt_subvolume
.base_path
)
763 self
._add
_snap
_clone
(track_idx
, snapname
)
764 except (IndexException
, MetadataMgrException
) as e
:
765 log
.warning("error creating clone index: {0}".format(e
))
766 raise VolumeException(-errno
.EINVAL
, "error cloning subvolume")
768 def detach_snapshot(self
, snapname
, track_id
):
769 if not snapname
.encode('utf-8') in self
.list_snapshots():
770 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
772 with
open_clone_index(self
.fs
, self
.vol_spec
) as index
:
773 index
.untrack(track_id
)
774 self
._remove
_snap
_clone
(track_id
)
775 except (IndexException
, MetadataMgrException
) as e
:
776 log
.warning("error delining snapshot from clone: {0}".format(e
))
777 raise VolumeException(-errno
.EINVAL
, "error delinking snapshot from clone")