ceph/src/pybind/mgr/volumes/fs/operations/versions/subvolume_v1.py (Ceph Pacific 16.2.2)
1 import os
2 import sys
3 import stat
4 import uuid
5 import errno
6 import logging
7 import json
8 from datetime import datetime
9 from typing import List, Dict
10
11 import cephfs
12
13 from .metadata_manager import MetadataManager
14 from .subvolume_attrs import SubvolumeTypes, SubvolumeStates, SubvolumeFeatures
15 from .op_sm import SubvolumeOpSm
16 from .subvolume_base import SubvolumeBase
17 from ..template import SubvolumeTemplate
18 from ..snapshot_util import mksnap, rmsnap
19 from ..access import allow_access, deny_access
20 from ...exception import IndexException, OpSmException, VolumeException, MetadataMgrException, EvictionError
21 from ...fs_util import listsnaps, is_inherited_snap
22 from ..template import SubvolumeOpType
23 from ..group import Group
24 from ..rankevicter import RankEvicter
25 from ..volume import get_mds_map
26
27 from ..clone_index import open_clone_index, create_clone_index
28
29 log = logging.getLogger(__name__)
30
31 class SubvolumeV1(SubvolumeBase, SubvolumeTemplate):
32 """
33 A version 1 subvolume is created with a path as follows,
34 volumes/<group-name>/<subvolume-name>/<uuid>/
35
36 - The directory under which user data resides is <uuid>
37 - Snapshots of the subvolume are taken within the <uuid> directory
38 - A meta file is maintained under the <subvolume-name> directory as a metadata store, typically storing:
39 - global information about the subvolume (version, path, type, state)
40 - snapshots attached to an ongoing clone operation
41 - clone snapshot source if subvolume is a clone of a snapshot
42 - It retains backward compatibility with legacy subvolumes by creating the meta file for legacy subvolumes under
43 /volumes/_legacy/ (see legacy_config_path), thus allowing cloning of older legacy volumes that lack the <uuid>
44 component in the path.
45 """
46 VERSION = 1
47
48 @staticmethod
49 def version():
50 return SubvolumeV1.VERSION
51
52 @property
53 def path(self):
54 try:
55 # no need to stat the path -- open() does that
56 return self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_PATH).encode('utf-8')
57 except MetadataMgrException as me:
58 raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
59
60 @property
61 def features(self):
62 return [SubvolumeFeatures.FEATURE_SNAPSHOT_CLONE.value, SubvolumeFeatures.FEATURE_SNAPSHOT_AUTOPROTECT.value]
63
64 def mark_subvolume(self):
65 # set subvolume attr, on subvolume root, marking it as a CephFS subvolume
66 # subvolume root is where snapshots would be taken, and hence is the <uuid> dir for v1 subvolumes
67 try:
68 # MDS treats this as a noop for already marked subvolume
69 self.fs.setxattr(self.path, 'ceph.dir.subvolume', b'1', 0)
70 except cephfs.InvalidValue as e:
71 raise VolumeException(-errno.EINVAL, "invalid value specified for ceph.dir.subvolume")
72 except cephfs.Error as e:
73 raise VolumeException(-e.args[0], e.args[1])
74
75 def snapshot_base_path(self):
76 """ Base path for all snapshots """
77 return os.path.join(self.path, self.vol_spec.snapshot_dir_prefix.encode('utf-8'))
78
79 def snapshot_path(self, snapname):
80 """ Path to a specific snapshot named 'snapname' """
81 return os.path.join(self.snapshot_base_path(), snapname.encode('utf-8'))
82
83 def snapshot_data_path(self, snapname):
84 """ Path to user data directory within a subvolume snapshot named 'snapname' """
85 return self.snapshot_path(snapname)
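# A rough sketch of what the three helpers above produce, assuming the
# snapshot_dir_prefix is ".snap" and the snapshot is named "snap1" (both
# illustrative):
#
#   snapshot_base_path()        -> b"/volumes/<group>/<subvol>/<uuid>/.snap"
#   snapshot_path("snap1")      -> b"/volumes/<group>/<subvol>/<uuid>/.snap/snap1"
#   snapshot_data_path("snap1") -> same as snapshot_path(), since v1 snapshots
#                                  hold the user data directly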
86
87 def create(self, size, isolate_nspace, pool, mode, uid, gid):
88 subvolume_type = SubvolumeTypes.TYPE_NORMAL
89 try:
90 initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
91 except OpSmException as oe:
92 raise VolumeException(-errno.EINVAL, "subvolume creation failed: internal error")
93
94 subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
95 try:
96 # create directory and set attributes
97 self.fs.mkdirs(subvol_path, mode)
98 self.mark_subvolume()
99 attrs = {
100 'uid': uid,
101 'gid': gid,
102 'data_pool': pool,
103 'pool_namespace': self.namespace if isolate_nspace else None,
104 'quota': size
105 }
106 self.set_attrs(subvol_path, attrs)
107
108 # persist subvolume metadata
109 qpath = subvol_path.decode('utf-8')
110 self.init_config(SubvolumeV1.VERSION, subvolume_type, qpath, initial_state)
111 except (VolumeException, MetadataMgrException, cephfs.Error) as e:
112 try:
113 log.info("cleaning up subvolume '{0}'".format(self.subvolname))
114 self.remove()
115 except VolumeException as ve:
116 log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
117
118 if isinstance(e, MetadataMgrException):
119 log.error("metadata manager exception: {0}".format(e))
120 e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
121 elif isinstance(e, cephfs.Error):
122 e = VolumeException(-e.args[0], e.args[1])
123 raise e
124
125 def add_clone_source(self, volname, subvolume, snapname, flush=False):
126 self.metadata_mgr.add_section("source")
127 self.metadata_mgr.update_section("source", "volume", volname)
128 if not subvolume.group.is_default_group():
129 self.metadata_mgr.update_section("source", "group", subvolume.group_name)
130 self.metadata_mgr.update_section("source", "subvolume", subvolume.subvol_name)
131 self.metadata_mgr.update_section("source", "snapshot", snapname)
132 if flush:
133 self.metadata_mgr.flush()
134
135 def remove_clone_source(self, flush=False):
136 self.metadata_mgr.remove_section("source")
137 if flush:
138 self.metadata_mgr.flush()
139
140 def create_clone(self, pool, source_volname, source_subvolume, snapname):
141 subvolume_type = SubvolumeTypes.TYPE_CLONE
142 try:
143 initial_state = SubvolumeOpSm.get_init_state(subvolume_type)
144 except OpSmException as oe:
145 raise VolumeException(-errno.EINVAL, "clone failed: internal error")
146
147 subvol_path = os.path.join(self.base_path, str(uuid.uuid4()).encode('utf-8'))
148 try:
149 # source snapshot attrs are used to create the clone subvolume;
150 # the attributes of the subvolume's contents, though, are synced during the cloning process.
151 attrs = source_subvolume.get_attrs(source_subvolume.snapshot_data_path(snapname))
152
153 # override snapshot pool setting, if one is provided for the clone
154 if pool is not None:
155 attrs["data_pool"] = pool
156 attrs["pool_namespace"] = None
157
158 # create directory and set attributes
159 self.fs.mkdirs(subvol_path, attrs.get("mode"))
160 self.mark_subvolume()
161 self.set_attrs(subvol_path, attrs)
162
163 # persist subvolume metadata and clone source
164 qpath = subvol_path.decode('utf-8')
165 self.metadata_mgr.init(SubvolumeV1.VERSION, subvolume_type.value, qpath, initial_state.value)
166 self.add_clone_source(source_volname, source_subvolume, snapname)
167 self.metadata_mgr.flush()
168 except (VolumeException, MetadataMgrException, cephfs.Error) as e:
169 try:
170 log.info("cleaning up subvolume '{0}'".format(self.subvolname))
171 self.remove()
172 except VolumeException as ve:
173 log.info("failed to cleanup subvolume '{0}' ({1})".format(self.subvolname, ve))
174
175 if isinstance(e, MetadataMgrException):
176 log.error("metadata manager exception: {0}".format(e))
177 e = VolumeException(-errno.EINVAL, "exception in subvolume metadata")
178 elif isinstance(e, cephfs.Error):
179 e = VolumeException(-e.args[0], e.args[1])
180 raise e
181
182 def allowed_ops_by_type(self, vol_type):
183 if vol_type == SubvolumeTypes.TYPE_CLONE:
184 return {op_type for op_type in SubvolumeOpType}
185
186 if vol_type == SubvolumeTypes.TYPE_NORMAL:
187 return {op_type for op_type in SubvolumeOpType} - {SubvolumeOpType.CLONE_STATUS,
188 SubvolumeOpType.CLONE_CANCEL,
189 SubvolumeOpType.CLONE_INTERNAL}
190
191 return {}
192
193 def allowed_ops_by_state(self, vol_state):
194 if vol_state == SubvolumeStates.STATE_COMPLETE:
195 return {op_type for op_type in SubvolumeOpType}
196
197 return {SubvolumeOpType.REMOVE_FORCE,
198 SubvolumeOpType.CLONE_CREATE,
199 SubvolumeOpType.CLONE_STATUS,
200 SubvolumeOpType.CLONE_CANCEL,
201 SubvolumeOpType.CLONE_INTERNAL}
202
203 def open(self, op_type):
204 if not isinstance(op_type, SubvolumeOpType):
205 raise VolumeException(-errno.ENOTSUP, "operation {0} not supported on subvolume '{1}'".format(
206 op_type.value, self.subvolname))
207 try:
208 self.metadata_mgr.refresh()
209
210 etype = self.subvol_type
211 if op_type not in self.allowed_ops_by_type(etype):
212 raise VolumeException(-errno.ENOTSUP, "operation '{0}' is not allowed on subvolume '{1}' of type {2}".format(
213 op_type.value, self.subvolname, etype.value))
214
215 estate = self.state
216 if op_type not in self.allowed_ops_by_state(estate):
217 raise VolumeException(-errno.EAGAIN, "subvolume '{0}' is not ready for operation {1}".format(
218 self.subvolname, op_type.value))
219
220 subvol_path = self.path
221 log.debug("refreshed metadata, checking subvolume path '{0}'".format(subvol_path))
222 st = self.fs.stat(subvol_path)
223 # unconditionally mark as subvolume, to handle pre-existing subvolumes without the mark
224 self.mark_subvolume()
225
226 self.uid = int(st.st_uid)
227 self.gid = int(st.st_gid)
228 self.mode = int(st.st_mode & ~stat.S_IFMT(st.st_mode))
229 except MetadataMgrException as me:
230 if me.errno == -errno.ENOENT:
231 raise VolumeException(-errno.ENOENT, "subvolume '{0}' does not exist".format(self.subvolname))
232 raise VolumeException(me.args[0], me.args[1])
233 except cephfs.ObjectNotFound:
234 log.debug("missing subvolume path '{0}' for subvolume '{1}'".format(subvol_path, self.subvolname))
235 raise VolumeException(-errno.ENOENT, "mount path missing for subvolume '{0}'".format(self.subvolname))
236 except cephfs.Error as e:
237 raise VolumeException(-e.args[0], e.args[1])
238
239 def _recover_auth_meta(self, auth_id, auth_meta):
240 """
241 Call me after locking the auth meta file.
242 """
243 remove_subvolumes = []
244
245 for subvol, subvol_data in auth_meta['subvolumes'].items():
246 if not subvol_data['dirty']:
247 continue
248
249 (group_name, subvol_name) = subvol.split('/')
250 group_name = group_name if group_name != 'None' else Group.NO_GROUP_NAME
251 access_level = subvol_data['access_level']
252
253 with self.auth_mdata_mgr.subvol_metadata_lock(group_name, subvol_name):
254 subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(group_name, subvol_name)
255
256 # No SVMeta update indicates that there was no auth update
257 # in Ceph either. So it's safe to remove corresponding
258 # partial update in AMeta.
259 if not subvol_meta or auth_id not in subvol_meta['auths']:
260 remove_subvolumes.append(subvol)
261 continue
262
263 want_auth = {
264 'access_level': access_level,
265 'dirty': False,
266 }
267 # SVMeta update looks clean. Ceph auth update must have been
268 # clean. Update the dirty flag and continue
269 if subvol_meta['auths'][auth_id] == want_auth:
270 auth_meta['subvolumes'][subvol]['dirty'] = False
271 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
272 continue
273
274 client_entity = "client.{0}".format(auth_id)
275 ret, out, err = self.mgr.mon_command(
276 {
277 'prefix': 'auth get',
278 'entity': client_entity,
279 'format': 'json'
280 })
281 if ret == 0:
282 existing_caps = json.loads(out)
283 elif ret == -errno.ENOENT:
284 existing_caps = None
285 else:
286 log.error(err)
287 raise VolumeException(ret, err)
288
289 self._authorize_subvolume(auth_id, access_level, existing_caps)
290
291 # Recovered from partial auth updates for the auth ID's access
292 # to a subvolume.
293 auth_meta['subvolumes'][subvol]['dirty'] = False
294 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
295
296 for subvol in remove_subvolumes:
297 del auth_meta['subvolumes'][subvol]
298
299 if not auth_meta['subvolumes']:
300 # Clean up auth meta file
301 self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
302 return
303
304 # Recovered from all partial auth updates for the auth ID.
305 auth_meta['dirty'] = False
306 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
307
308 def authorize(self, auth_id, access_level, tenant_id=None, allow_existing_id=False):
309 """
310 Get-or-create a Ceph auth identity for `auth_id` and grant them access
311 to this subvolume.
312 :param auth_id: cephx ID (without the "client." prefix) to authorize
313 :param access_level: 'r' (read-only) or 'rw' (read-write)
314 :param tenant_id: Optionally provide a stringizable object to
315 restrict any created cephx IDs to other callers
316 passing the same tenant ID.
317 :allow_existing_id: Optionally authorize existing auth-ids not
318 created by ceph_volume_client.
319 :return: the cephx key if `tenant_id` is provided, otherwise an empty string
320 """
321
322 with self.auth_mdata_mgr.auth_lock(auth_id):
323 client_entity = "client.{0}".format(auth_id)
324 ret, out, err = self.mgr.mon_command(
325 {
326 'prefix': 'auth get',
327 'entity': client_entity,
328 'format': 'json'
329 })
330
331 if ret == 0:
332 existing_caps = json.loads(out)
333 elif ret == -errno.ENOENT:
334 existing_caps = None
335 else:
336 log.error(err)
337 raise VolumeException(ret, err)
338
339 # Existing meta, or None, to be updated
340 auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
341
342 # subvolume data to be inserted
343 group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
344 group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
345 subvolume = {
346 group_subvol_id : {
347 # The access level at which the auth_id is authorized to
348 # access the volume.
349 'access_level': access_level,
350 'dirty': True,
351 }
352 }
353
354 if auth_meta is None:
355 if not allow_existing_id and existing_caps is not None:
356 msg = "auth ID: {0} exists and not created by mgr plugin. Not allowed to modify".format(auth_id)
357 log.error(msg)
358 raise VolumeException(-errno.EPERM, msg)
359
360 # non-existent auth IDs
361 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
362 auth_id, tenant_id
363 ))
364 log.debug("Authorize: no existing meta")
365 auth_meta = {
366 'dirty': True,
367 'tenant_id': str(tenant_id) if tenant_id else None,
368 'subvolumes': subvolume
369 }
370 else:
371 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
372 if 'volumes' in auth_meta:
373 auth_meta['subvolumes'] = auth_meta.pop('volumes')
374
375 # Disallow tenants to share auth IDs
376 if str(auth_meta['tenant_id']) != str(tenant_id):
377 msg = "auth ID: {0} is already in use".format(auth_id)
378 log.error(msg)
379 raise VolumeException(-errno.EPERM, msg)
380
381 if auth_meta['dirty']:
382 self._recover_auth_meta(auth_id, auth_meta)
383
384 log.debug("Authorize: existing tenant {tenant}".format(
385 tenant=auth_meta['tenant_id']
386 ))
387 auth_meta['dirty'] = True
388 auth_meta['subvolumes'].update(subvolume)
389
390 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
391
392 with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
393 key = self._authorize_subvolume(auth_id, access_level, existing_caps)
394
395 auth_meta['dirty'] = False
396 auth_meta['subvolumes'][group_subvol_id]['dirty'] = False
397 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
398
399 if tenant_id:
400 return key
401 else:
402 # Caller wasn't multi-tenant aware: be safe and don't give
403 # them a key
404 return ""
405
406 def _authorize_subvolume(self, auth_id, access_level, existing_caps):
407 subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
408
409 auth = {
410 auth_id: {
411 'access_level': access_level,
412 'dirty': True,
413 }
414 }
415
416 if subvol_meta is None:
417 subvol_meta = {
418 'auths': auth
419 }
420 else:
421 subvol_meta['auths'].update(auth)
422 self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
423
424 key = self._authorize(auth_id, access_level, existing_caps)
425
426 subvol_meta['auths'][auth_id]['dirty'] = False
427 self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
428
429 return key
430
431 def _authorize(self, auth_id, access_level, existing_caps):
432 subvol_path = self.path
433 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(auth_id, subvol_path))
434
435 # First I need to work out what the data pool is for this share:
436 # read the layout
437 try:
438 pool = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
439 except cephfs.Error as e:
440 raise VolumeException(-e.args[0], e.args[1])
441
442 try:
443 namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
444 except cephfs.NoData:
445 namespace = None
446
447 # Now construct auth capabilities that give the guest just enough
448 # permissions to access the share
449 client_entity = "client.{0}".format(auth_id)
450 want_mds_cap = "allow {0} path={1}".format(access_level, subvol_path.decode('utf-8'))
451 want_osd_cap = "allow {0} pool={1}{2}".format(
452 access_level, pool, " namespace={0}".format(namespace) if namespace else "")
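# For example (values purely illustrative), authorizing 'rw' on a subvolume
# whose data pool is "cephfs_data" and whose pool namespace is "fsvolumens_sv1"
# would yield:
#
#   want_mds_cap: "allow rw path=/volumes/_nogroup/sv1/<uuid>"
#   want_osd_cap: "allow rw pool=cephfs_data namespace=fsvolumens_sv1"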
453
454 # Construct auth caps that if present might conflict with the desired
455 # auth caps.
456 unwanted_access_level = 'r' if access_level == 'rw' else 'rw'
457 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, subvol_path.decode('utf-8'))
458 unwanted_osd_cap = "allow {0} pool={1}{2}".format(
459 unwanted_access_level, pool, " namespace={0}".format(namespace) if namespace else "")
460
461 return allow_access(self.mgr, client_entity, want_mds_cap, want_osd_cap,
462 unwanted_mds_cap, unwanted_osd_cap, existing_caps)
463
464 def deauthorize(self, auth_id):
465 with self.auth_mdata_mgr.auth_lock(auth_id):
466 # Existing meta, or None, to be updated
467 auth_meta = self.auth_mdata_mgr.auth_metadata_get(auth_id)
468
469 if auth_meta is None:
470 msg = "auth ID: {0} doesn't exist".format(auth_id)
471 log.error(msg)
472 raise VolumeException(-errno.ENOENT, msg)
473
474 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
475 if 'volumes' in auth_meta:
476 auth_meta['subvolumes'] = auth_meta.pop('volumes')
477
478 group_name = self.group.groupname if self.group.groupname != Group.NO_GROUP_NAME else None
479 group_subvol_id = "{0}/{1}".format(group_name, self.subvolname)
480 if (auth_meta is None) or (not auth_meta['subvolumes']):
481 log.warning("deauthorized called for already-removed auth "
482 "ID '{auth_id}' for subvolume '{subvolume}'".format(
483 auth_id=auth_id, subvolume=self.subvolname
484 ))
485 # Clean up the auth meta file of an auth ID
486 self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
487 return
488
489 if group_subvol_id not in auth_meta['subvolumes']:
490 log.warning("deauthorized called for already-removed auth "
491 "ID '{auth_id}' for subvolume '{subvolume}'".format(
492 auth_id=auth_id, subvolume=self.subvolname
493 ))
494 return
495
496 if auth_meta['dirty']:
497 self._recover_auth_meta(auth_id, auth_meta)
498
499 auth_meta['dirty'] = True
500 auth_meta['subvolumes'][group_subvol_id]['dirty'] = True
501 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
502
503 self._deauthorize_subvolume(auth_id)
504
505 # Filter out the volume we're deauthorizing
506 del auth_meta['subvolumes'][group_subvol_id]
507
508 # Clean up auth meta file
509 if not auth_meta['subvolumes']:
510 self.fs.unlink(self.auth_mdata_mgr._auth_metadata_path(auth_id))
511 return
512
513 auth_meta['dirty'] = False
514 self.auth_mdata_mgr.auth_metadata_set(auth_id, auth_meta)
515
516 def _deauthorize_subvolume(self, auth_id):
517 with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
518 subvol_meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
519
520 if (subvol_meta is None) or (auth_id not in subvol_meta['auths']):
521 log.warning("deauthorized called for already-removed auth "
522 "ID '{auth_id}' for subvolume '{subvolume}'".format(
523 auth_id=auth_id, subvolume=self.subvolname
524 ))
525 return
526
527 subvol_meta['auths'][auth_id]['dirty'] = True
528 self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
529
530 self._deauthorize(auth_id)
531
532 # Remove the auth_id from the metadata *after* removing it
533 # from ceph, so that if we crashed here, we would actually
534 # recreate the auth ID during recovery (i.e. end up with
535 # a consistent state).
536
537 # Filter out the auth we're removing
538 del subvol_meta['auths'][auth_id]
539 self.auth_mdata_mgr.subvol_metadata_set(self.group.groupname, self.subvolname, subvol_meta)
540
541 def _deauthorize(self, auth_id):
542 """
543 The volume must still exist.
544 """
545 client_entity = "client.{0}".format(auth_id)
546 subvol_path = self.path
547 try:
548 pool_name = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool').decode('utf-8')
549 except cephfs.Error as e:
550 raise VolumeException(-e.args[0], e.args[1])
551
552 try:
553 namespace = self.fs.getxattr(subvol_path, 'ceph.dir.layout.pool_namespace').decode('utf-8')
554 except cephfs.NoData:
555 namespace = None
556
557 # The auth_id might have read-only or read-write mount access for the
558 # subvolume path.
559 access_levels = ('r', 'rw')
560 want_mds_caps = ['allow {0} path={1}'.format(access_level, subvol_path.decode('utf-8'))
561 for access_level in access_levels]
562 want_osd_caps = ['allow {0} pool={1}{2}'.format(
563 access_level, pool_name, " namespace={0}".format(namespace) if namespace else "")
564 for access_level in access_levels]
565 deny_access(self.mgr, client_entity, want_mds_caps, want_osd_caps)
566
567 def authorized_list(self):
568 """
569 Expose a list of auth IDs that have access to a subvolume.
570
571 return: a list of {auth_id: access_level} entries, where
572 the access_level can be 'r' or 'rw'.
573 An empty list is returned if no auth ID has access to the subvolume.
574 """
575 with self.auth_mdata_mgr.subvol_metadata_lock(self.group.groupname, self.subvolname):
576 meta = self.auth_mdata_mgr.subvol_metadata_get(self.group.groupname, self.subvolname)
577 auths = [] # type: List[Dict[str,str]]
578 if not meta or not meta['auths']:
579 return auths
580
581 for auth, auth_data in meta['auths'].items():
582 # Skip partial auth updates.
583 if not auth_data['dirty']:
584 auths.append({auth: auth_data['access_level']})
585
586 return auths
587
588 def evict(self, volname, auth_id, timeout=30):
589 """
590 Evict all clients based on the authorization ID and the subvolume path mounted.
591 Assumes that the authorization key has been revoked prior to calling this function.
592
593 This operation can throw an exception if the mon cluster is unresponsive, or
594 any individual MDS daemon is unresponsive for longer than the timeout passed in.
595 """
596
597 client_spec = ["auth_name={0}".format(auth_id), ]
598 client_spec.append("client_metadata.root={0}".
599 format(self.path.decode('utf-8')))
600
601 log.info("evict clients with {0}".format(', '.join(client_spec)))
602
603 mds_map = get_mds_map(self.mgr, volname)
604 if not mds_map:
605 raise VolumeException(-errno.ENOENT, "mdsmap for volume {0} not found".format(volname))
606
607 up = {}
608 for name, gid in mds_map['up'].items():
609 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
610 assert name.startswith("mds_")
611 up[int(name[4:])] = gid
612
613 # For all MDS ranks held by a daemon
614 # Do the parallelism in python instead of using "tell mds.*", because
615 # the latter doesn't give us per-mds output
616 threads = []
617 for rank, gid in up.items():
618 thread = RankEvicter(self.mgr, self.fs, client_spec, volname, rank, gid, mds_map, timeout)
619 thread.start()
620 threads.append(thread)
621
622 for t in threads:
623 t.join()
624
625 log.info("evict: joined all")
626
627 for t in threads:
628 if not t.success:
629 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
630 format(', '.join(client_spec), t.rank, t.gid, t.exception)
631 )
632 log.error(msg)
633 raise EvictionError(msg)
634
635 def _get_clone_source(self):
636 try:
637 clone_source = {
638 'volume' : self.metadata_mgr.get_option("source", "volume"),
639 'subvolume': self.metadata_mgr.get_option("source", "subvolume"),
640 'snapshot' : self.metadata_mgr.get_option("source", "snapshot"),
641 }
642
643 try:
644 clone_source["group"] = self.metadata_mgr.get_option("source", "group")
645 except MetadataMgrException as me:
646 if me.errno == -errno.ENOENT:
647 pass
648 else:
649 raise
650 except MetadataMgrException as me:
651 raise VolumeException(-errno.EINVAL, "error fetching subvolume metadata")
652 return clone_source
653
654 @property
655 def status(self):
656 state = SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
657 subvolume_type = self.subvol_type
658 subvolume_status = {
659 'state' : state.value
660 }
661 if not SubvolumeOpSm.is_complete_state(state) and subvolume_type == SubvolumeTypes.TYPE_CLONE:
662 subvolume_status["source"] = self._get_clone_source()
663 return subvolume_status
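# Illustrative status payloads (the state strings are examples of
# SubvolumeStates values): a regular subvolume reports only its state,
#
#   {'state': 'complete'}
#
# while a clone that is still being populated also carries its source, e.g.
#
#   {'state': 'in-progress',
#    'source': {'volume': 'vol1', 'group': 'grp1',
#               'subvolume': 'sv1', 'snapshot': 'snap1'}}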
664
665 @property
666 def state(self):
667 return SubvolumeStates.from_value(self.metadata_mgr.get_global_option(MetadataManager.GLOBAL_META_KEY_STATE))
668
669 @state.setter
670 def state(self, val):
671 state = val[0].value
672 flush = val[1]
673 self.metadata_mgr.update_global_section(MetadataManager.GLOBAL_META_KEY_STATE, state)
674 if flush:
675 self.metadata_mgr.flush()
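# The setter above expects a (SubvolumeStates member, flush flag) tuple; a
# hedged caller-side sketch:
#
#   subvolume.state = (SubvolumeStates.STATE_COMPLETE, True)   # update and persist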
676
677 def remove(self, retainsnaps=False):
678 if retainsnaps:
679 raise VolumeException(-errno.EINVAL, "subvolume '{0}' does not support snapshot retention on delete".format(self.subvolname))
680 if self.list_snapshots():
681 raise VolumeException(-errno.ENOTEMPTY, "subvolume '{0}' has snapshots".format(self.subvolname))
682 self.trash_base_dir()
683
684 def resize(self, newsize, noshrink):
685 subvol_path = self.path
686 return self._resize(subvol_path, newsize, noshrink)
687
688 def create_snapshot(self, snapname):
689 try:
690 group_snapshot_path = os.path.join(self.group.path,
691 self.vol_spec.snapshot_dir_prefix.encode('utf-8'),
692 snapname.encode('utf-8'))
693 self.fs.stat(group_snapshot_path)
694 except cephfs.Error as e:
695 if e.args[0] == errno.ENOENT:
696 snappath = self.snapshot_path(snapname)
697 mksnap(self.fs, snappath)
698 else:
699 raise VolumeException(-e.args[0], e.args[1])
700 else:
701 raise VolumeException(-errno.EINVAL, "subvolumegroup and subvolume snapshot name can't be the same")
702
703 def has_pending_clones(self, snapname):
704 try:
705 return self.metadata_mgr.section_has_item('clone snaps', snapname)
706 except MetadataMgrException as me:
707 if me.errno == -errno.ENOENT:
708 return False
709 raise
710
711 def remove_snapshot(self, snapname):
712 if self.has_pending_clones(snapname):
713 raise VolumeException(-errno.EAGAIN, "snapshot '{0}' has pending clones".format(snapname))
714 snappath = self.snapshot_path(snapname)
715 rmsnap(self.fs, snappath)
716
717 def snapshot_info(self, snapname):
718 if is_inherited_snap(snapname):
719 raise VolumeException(-errno.EINVAL,
720 "snapshot name '{0}' is invalid".format(snapname))
721 snappath = self.snapshot_data_path(snapname)
722 snap_info = {}
723 try:
724 snap_attrs = {'created_at':'ceph.snap.btime', 'size':'ceph.dir.rbytes',
725 'data_pool':'ceph.dir.layout.pool'}
726 for key, val in snap_attrs.items():
727 snap_info[key] = self.fs.getxattr(snappath, val)
728 return {'size': int(snap_info['size']),
729 'created_at': str(datetime.fromtimestamp(float(snap_info['created_at']))),
730 'data_pool': snap_info['data_pool'].decode('utf-8'),
731 'has_pending_clones': "yes" if self.has_pending_clones(snapname) else "no"}
732 except cephfs.Error as e:
733 if e.errno == errno.ENOENT:
734 raise VolumeException(-errno.ENOENT,
735 "snapshot '{0}' does not exist".format(snapname))
736 raise VolumeException(-e.args[0], e.args[1])
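# A successful snapshot_info() call returns a dict of this shape (values made up):
#
#   {'size': 4096,
#    'created_at': '2021-04-20 12:00:00.000000',
#    'data_pool': 'cephfs_data',
#    'has_pending_clones': 'no'}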
737
738 def list_snapshots(self):
739 try:
740 dirpath = self.snapshot_base_path()
741 return listsnaps(self.fs, self.vol_spec, dirpath, filter_inherited_snaps=True)
742 except VolumeException as ve:
743 if ve.errno == -errno.ENOENT:
744 return []
745 raise
746
747 def _add_snap_clone(self, track_id, snapname):
748 self.metadata_mgr.add_section("clone snaps")
749 self.metadata_mgr.update_section("clone snaps", track_id, snapname)
750 self.metadata_mgr.flush()
751
752 def _remove_snap_clone(self, track_id):
753 self.metadata_mgr.remove_option("clone snaps", track_id)
754 self.metadata_mgr.flush()
755
756 def attach_snapshot(self, snapname, tgt_subvolume):
757 if not snapname.encode('utf-8') in self.list_snapshots():
758 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
759 try:
760 create_clone_index(self.fs, self.vol_spec)
761 with open_clone_index(self.fs, self.vol_spec) as index:
762 track_idx = index.track(tgt_subvolume.base_path)
763 self._add_snap_clone(track_idx, snapname)
764 except (IndexException, MetadataMgrException) as e:
765 log.warning("error creating clone index: {0}".format(e))
766 raise VolumeException(-errno.EINVAL, "error cloning subvolume")
767
768 def detach_snapshot(self, snapname, track_id):
769 if not snapname.encode('utf-8') in self.list_snapshots():
770 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
771 try:
772 with open_clone_index(self.fs, self.vol_spec) as index:
773 index.untrack(track_id)
774 self._remove_snap_clone(track_id)
775 except (IndexException, MetadataMgrException) as e:
776 log.warning("error delinking snapshot from clone: {0}".format(e))
777 raise VolumeException(-errno.EINVAL, "error delinking snapshot from clone")