"""
Copyright (C) 2015 Red Hat, Inc.

LGPL2.  See file COPYING.
"""

from contextlib import contextmanager
import errno
import fcntl
import json
import logging
import os
import re
import struct
import sys
import threading
import time
import uuid

from ceph_argparse import json_command

import cephfs
import rados


class RadosError(Exception):
    """
    Something went wrong talking to Ceph with librados
    """
    pass


RADOS_TIMEOUT = 10

log = logging.getLogger(__name__)

# Reserved volume group name which we use in paths for volumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"

# Filename extensions for meta files.
META_FILE_EXT = ".meta"


class VolumePath(object):
    """
    Identify a volume's path as group->volume
    The Volume ID is a unique identifier, but this is a much more
    helpful thing to pass around.
    """
    def __init__(self, group_id, volume_id):
        self.group_id = group_id
        self.volume_id = volume_id
        assert self.group_id != NO_GROUP_NAME
        assert self.volume_id != "" and self.volume_id is not None

    def __str__(self):
        return "{0}/{1}".format(self.group_id, self.volume_id)
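
# A minimal usage sketch (illustrative only; the group and volume IDs below
# are hypothetical):
#
#   vp = VolumePath("grp1", "vol1")
#   str(vp)                        # -> "grp1/vol1"
#   vp = VolumePath(None, "vol1")  # ungrouped; stored on disk under NO_GROUP_NAME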


class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.
    """
    pass


class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return "Error {0} (\"{1}\") while {2}".format(
            self._result_code, self._result_str, self._action)


class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        pass

    POLL_PERIOD = 5

    def __init__(self, volume_client, client_spec, rank, gid, mds_map,
                 ready_timeout):
        """
        :param client_spec: list of strings, used as filter arguments to
                            "session evict", e.g. pass ["id=123"] to evict a
                            single client with session id 123.
        """
        self.rank = rank
        self.gid = gid
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._volume_client = volume_client
        self._ready_timeout = ready_timeout
        self._ready_waited = 0

        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()

    def _ready_to_evict(self):
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        return info['state'] in ["up:active", "up:clientreplay"]

    def _wait_for_ready(self):
        """
        Wait for that MDS rank to reach an active or clientreplay state, and
        not be laggy.
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD

            self._mds_map = self._volume_client._rados_command("mds dump", {})

    def _evict(self):
        """
        Run the eviction procedure.  Return true on success, false on errors.
        """

        # Wait til the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except self.GidGone:
            return True

        # Then send it an evict
        ret = errno.ETIMEDOUT
        while ret == errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._volume_client.fs.mds_command(
                "%s" % self.gid,
                [json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                })], "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = self._volume_client._rados_command("mds dump", {})
                try:
                    self._wait_for_ready()
                except self.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True


class EvictionError(Exception):
    pass


class CephFSVolumeClientError(Exception):
    """
    Something went wrong talking to Ceph using CephFSVolumeClient.
    """
    pass


CEPHFSVOLUMECLIENT_VERSION_HISTORY = """

    CephFSVolumeClient Version History:

    * 1 - Initial version
    * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
    * 3 - Allow volumes to be created without RADOS namespace isolation
"""


class CephFSVolumeClient(object):
    """
    Combine libcephfs and librados interfaces to implement a
    'Volume' concept implemented as a cephfs directory and
    client capabilities which restrict mount access to this
    directory.

    Additionally, volumes may be in a 'Group'.  Conveniently,
    volumes are a lot like manila shares, and groups are a lot
    like manila consistency groups.

    Refer to volumes with VolumePath, which specifies the
    volume and group IDs (both strings).  The group ID may
    be None.

    In general, functions in this class are allowed to raise rados.Error
    or cephfs.Error exceptions in unexpected situations.
    """

    # Current version
    version = 3

    # Where shall we create our volumes?
    POOL_PREFIX = "fsvolume_"
    DEFAULT_VOL_PREFIX = "/volumes"
    DEFAULT_NS_PREFIX = "fsvolumens_"

    def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None,
                 pool_ns_prefix=None):
        self.fs = None
        self.rados = None
        self.connected = False
        self.conf_path = conf_path
        self.cluster_name = cluster_name
        self.auth_id = auth_id
        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
        # For flock'ing in cephfs, I want a unique ID to distinguish me
        # from any other manila-share services that are loading this module.
        # We could use pid, but that's unnecessarily weak: generate a
        # UUID instead.
        self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]

        # TODO: version the on-disk structures

    def recover(self):
        # Scan all auth keys to see if they're dirty: if they are, they have
        # state that might not have propagated to Ceph or to the related
        # volumes yet.

        # Important: we *always* acquire locks in the order auth->volume
        # That means a volume can never be dirty without the auth key
        # we're updating it with being dirty at the same time.

        # First list the auth IDs that have potentially dirty on-disk metadata
        log.debug("Recovering from partial auth updates (if any)...")

        try:
            dir_handle = self.fs.opendir(self.volume_prefix)
        except cephfs.ObjectNotFound:
            log.debug("Nothing to recover. No auth meta files.")
            return

        d = self.fs.readdir(dir_handle)
        auth_ids = []

        if not d:
            log.debug("Nothing to recover. No auth meta files.")

        while d:
            # Identify auth IDs from auth meta filenames. The auth meta files
            # are named as, "$<auth_id><meta filename extension>"
            regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
            match = re.search(regex, d.d_name)
            if match:
                auth_ids.append(match.group(1))

            d = self.fs.readdir(dir_handle)

        self.fs.closedir(dir_handle)

        # Key points based on ordering:
        # * Anything added in VMeta is already added in AMeta
        # * Anything added in Ceph is already added in VMeta
        # * Anything removed in VMeta is already removed in Ceph
        # * Anything removed in AMeta is already removed in VMeta

        # Deauthorization: because I only update metadata AFTER the
        # update of the next level down, I have the same ordering of
        # -> things which exist in the AMeta should also exist
        # in the VMeta, should also exist in Ceph, and the same
        # recovery procedure that gets me consistent after crashes
        # during authorization will also work during deauthorization

        # Now for each auth ID, check for dirty flag and apply updates
        # if dirty flag is found
        for auth_id in auth_ids:
            with self._auth_lock(auth_id):
                auth_meta = self._auth_metadata_get(auth_id)
                if not auth_meta or not auth_meta['volumes']:
                    # Clean up auth meta file
                    self.fs.unlink(self._auth_metadata_path(auth_id))
                    continue
                if not auth_meta['dirty']:
                    continue
                self._recover_auth_meta(auth_id, auth_meta)

        log.debug("Recovered from partial auth updates (if any).")

    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call me after locking the auth meta file.
        """
        remove_volumes = []

        for volume, volume_data in auth_meta['volumes'].items():
            if not volume_data['dirty']:
                continue

            (group_id, volume_id) = volume.split('/')
            group_id = group_id if group_id != 'None' else None
            volume_path = VolumePath(group_id, volume_id)
            access_level = volume_data['access_level']

            with self._volume_lock(volume_path):
                vol_meta = self._volume_metadata_get(volume_path)

                # No VMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove corresponding
                # partial update in AMeta.
                if not vol_meta or auth_id not in vol_meta['auths']:
                    remove_volumes.append(volume)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # VMeta update looks clean. Ceph auth update must have been
                # clean.
                if vol_meta['auths'][auth_id] == want_auth:
                    continue

                readonly = True if access_level == 'r' else False
                self._authorize_volume(volume_path, auth_id, readonly)

            # Recovered from partial auth updates for the auth ID's access
            # to a volume.
            auth_meta['volumes'][volume]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

        for volume in remove_volumes:
            del auth_meta['volumes'][volume]

        if not auth_meta['volumes']:
            # Clean up auth meta file
            self.fs.unlink(self._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self._auth_metadata_set(auth_id, auth_meta)

    def evict(self, auth_id, timeout=30, volume_path=None):
        """
        Evict all clients based on the authorization ID and optionally based on
        the volume path mounted.  Assumes that the authorization key has been
        revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """

        client_spec = ["auth_name={0}".format(auth_id), ]
        if volume_path:
            client_spec.append("client_metadata.root={0}".
                               format(self._get_path(volume_path)))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = self._rados_command("mds dump", {})

        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
                                 timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
                       )
                log.error(msg)
                raise EvictionError(msg)

    def _get_path(self, volume_path):
        """
        Determine the path within CephFS where this volume will live
        :return: absolute path (string)
        """
        return os.path.join(
            self.volume_prefix,
            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
            volume_path.volume_id)

    def _get_group_path(self, group_id):
        if group_id is None:
            raise ValueError("group_id may not be None")

        return os.path.join(
            self.volume_prefix,
            group_id
        )

    def connect(self, premount_evict=None):
        """
        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
                               may want to use this to specify their own auth ID if they expect
                               to be a unique instance and don't want to wait for caps to time
                               out after failure of another instance of themselves.
        """
        log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
        self.rados = rados.Rados(
            name="client.{0}".format(self.auth_id),
            clustername=self.cluster_name,
            conffile=self.conf_path,
            conf={}
        )
        self.rados.connect()

        log.debug("Connection to RADOS complete")

        log.debug("Connecting to cephfs...")
        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
        log.debug("CephFS initializing...")
        self.fs.init()
        if premount_evict is not None:
            log.debug("Premount eviction of {0} starting".format(premount_evict))
            self.evict(premount_evict)
            log.debug("Premount eviction of {0} completes".format(premount_evict))
        log.debug("CephFS mounting...")
        self.fs.mount()
        log.debug("Connection to cephfs complete")

        # Recover from partial auth updates due to a previous
        # crash.
        self.recover()
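
    # A typical client lifecycle, as a sketch (assumes a reachable cluster;
    # the auth ID and config path below are hypothetical):
    #
    #   vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
    #   vc.connect()
    #   try:
    #       ...  # create/authorize/delete volumes
    #   finally:
    #       vc.disconnect()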

    def get_mon_addrs(self):
        log.info("get_mon_addrs")
        result = []
        mon_map = self._rados_command("mon dump")
        for mon in mon_map['mons']:
            ip_port = mon['addr'].split("/")[0]
            result.append(ip_port)

        return result

    def disconnect(self):
        log.info("disconnect")
        if self.fs:
            log.debug("Disconnecting cephfs...")
            self.fs.shutdown()
            self.fs = None
            log.debug("Disconnecting cephfs complete")

        if self.rados:
            log.debug("Disconnecting rados...")
            self.rados.shutdown()
            self.rados = None
            log.debug("Disconnecting rados complete")

    def _get_pool_id(self, osd_map, pool_name):
        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
        # like this are needed.
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                return pool['pool']

        return None

    def _create_volume_pool(self, pool_name):
        """
        Idempotently create a pool for use as a CephFS data pool, with the given name

        :return The ID of the created pool
        """
        osd_map = self._rados_command('osd dump', {})

        existing_id = self._get_pool_id(osd_map, pool_name)
        if existing_id is not None:
            log.info("Pool {0} already exists".format(pool_name))
            return existing_id

        osd_count = len(osd_map['osds'])

        # We can't query the actual cluster config remotely, but since this is
        # just a heuristic we'll assume that the ceph.conf we have locally reflects
        # that in use in the rest of the cluster.
        pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))

        other_pgs = 0
        for pool in osd_map['pools']:
            if not pool['pool_name'].startswith(self.POOL_PREFIX):
                other_pgs += pool['pg_num']

        # A basic heuristic for picking pg_num: work out the max number of
        # PGs we can have without tripping a warning, then subtract the number
        # of PGs already created by non-manila pools, then divide by ten.  That'll
        # give you a reasonable result on a system where you have "a few" manila
        # shares.
        pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) // 10
        # TODO Alternatively, respect an override set by the user.

        self._rados_command(
            'osd pool create',
            {
                'pool': pool_name,
                'pg_num': int(pg_num),
            }
        )

        osd_map = self._rados_command('osd dump', {})
        pool_id = self._get_pool_id(osd_map, pool_name)

        if pool_id is None:
            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
            # removing it right after we created it.
            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
                pool_name, json.dumps(osd_map, indent=2)
            ))
            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
        else:
            return pool_id
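
    # Worked example of the pg_num heuristic above (numbers are illustrative):
    # with mon_max_pg_per_osd=250, 10 OSDs, and 1500 PGs in non-manila pools,
    # pg_num = ((250 * 10) - 1500) // 10 = 100 for each new manila pool.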

    def create_group(self, group_id):
        # Prevent craftily-named volume groups from colliding with the meta
        # files.
        if group_id.endswith(META_FILE_EXT):
            raise ValueError("group ID cannot end with '{0}'.".format(
                META_FILE_EXT))
        path = self._get_group_path(group_id)
        self._mkdir_p(path)

    def destroy_group(self, group_id):
        path = self._get_group_path(group_id)
        try:
            self.fs.stat(self.volume_prefix)
        except cephfs.ObjectNotFound:
            pass
        else:
            self.fs.rmdir(path)

    def _mkdir_p(self, path):
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            pass
        else:
            return

        parts = path.split(os.path.sep)

        for i in range(1, len(parts) + 1):
            subpath = os.path.join(*parts[0:i])
            try:
                self.fs.stat(subpath)
            except cephfs.ObjectNotFound:
                self.fs.mkdir(subpath, 0o755)

    def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True):
        """
        Set up metadata, pools and auth for a volume.

        This function is idempotent.  It is safe to call this again
        for an already-created volume, even if it is in use.

        :param volume_path: VolumePath instance
        :param size: In bytes, or None for no size limit
        :param data_isolated: If true, create a separate OSD pool for this volume
        :param namespace_isolated: If true, use separate RADOS namespace for this volume
        :return:
        """
        path = self._get_path(volume_path)
        log.info("create_volume: {0}".format(path))

        self._mkdir_p(path)

        if size is not None:
            self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)

        # data_isolated means create a separate pool for this volume
        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
            pool_id = self._create_volume_pool(pool_name)
            mds_map = self._rados_command("mds dump", {})
            if pool_id not in mds_map['data_pools']:
                self._rados_command("mds add_data_pool", {
                    'pool': pool_name
                })
            self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)

        # enforce security isolation, use separate namespace for this volume
        if namespace_isolated:
            namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
            log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
            self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0)
        else:
            # If volume's namespace layout is not set, then the volume's pool
            # layout remains unset and will undesirably change with ancestor's
            # pool layout changes.
            pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
            self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)

        # Create a volume meta file, if it does not already exist, to store
        # data about auth ids having access to the volume
        fd = self.fs.open(self._volume_metadata_path(volume_path),
                          os.O_CREAT)
        self.fs.close(fd)

        return {
            'mount_path': path
        }
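
    # Sketch of creating and sizing a volume (the IDs and the 10 GiB quota are
    # illustrative; assumes a connected client `vc`):
    #
    #   vp = VolumePath("grp1", "vol1")
    #   result = vc.create_volume(vp, size=10 * 1024 * 1024 * 1024)
    #   result['mount_path']   # -> "/volumes/grp1/vol1" with the default prefix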

    def delete_volume(self, volume_path, data_isolated=False):
        """
        Make a volume inaccessible to guests.  This function is
        idempotent.  This is the fast part of tearing down a volume: you must
        also later call purge_volume, which is the slow part.

        :param volume_path: Same identifier used in create_volume
        :return:
        """

        path = self._get_path(volume_path)
        log.info("delete_volume: {0}".format(path))

        # Create the trash folder if it doesn't already exist
        trash = os.path.join(self.volume_prefix, "_deleting")
        self._mkdir_p(trash)

        # We'll move it to here
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        # Move the volume's data to the trash folder
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            log.warning("Trying to delete volume '{0}' but it's already gone".format(
                path))
        else:
            self.fs.rename(path, trashed_volume)

        # Delete the volume meta file, if it's not already deleted
        vol_meta_path = self._volume_metadata_path(volume_path)
        try:
            self.fs.unlink(vol_meta_path)
        except cephfs.ObjectNotFound:
            pass

    def purge_volume(self, volume_path, data_isolated=False):
        """
        Finish clearing up a volume that was previously passed to delete_volume.  This
        function is idempotent.
        """

        trash = os.path.join(self.volume_prefix, "_deleting")
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        try:
            self.fs.stat(trashed_volume)
        except cephfs.ObjectNotFound:
            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
                trashed_volume))
            return

        def rmtree(root_path):
            log.debug("rmtree {0}".format(root_path))
            dir_handle = self.fs.opendir(root_path)
            d = self.fs.readdir(dir_handle)
            while d:
                if d.d_name not in [".", ".."]:
                    # Do not use os.path.join because it is sensitive
                    # to string encoding, we just pass through dnames
                    # as byte arrays
                    d_full = "{0}/{1}".format(root_path, d.d_name)
                    if d.is_dir():
                        rmtree(d_full)
                    else:
                        self.fs.unlink(d_full)

                d = self.fs.readdir(dir_handle)
            self.fs.closedir(dir_handle)

            self.fs.rmdir(root_path)

        rmtree(trashed_volume)

        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            osd_map = self._rados_command("osd dump", {})
            pool_id = self._get_pool_id(osd_map, pool_name)
            mds_map = self._rados_command("mds dump", {})
            if pool_id in mds_map['data_pools']:
                self._rados_command("mds remove_data_pool", {
                    'pool': pool_name
                })
            self._rados_command("osd pool delete",
                                {
                                    "pool": pool_name,
                                    "pool2": pool_name,
                                    "sure": "--yes-i-really-really-mean-it"
                                })

    def _get_ancestor_xattr(self, path, attr):
        """
        Helper for reading layout information: if this xattr is missing
        on the requested path, keep checking parents until we find it.
        """
        try:
            result = self.fs.getxattr(path, attr)
            if result == "":
                # Annoying!  cephfs gives us empty instead of an error when attr not found
                raise cephfs.NoData()
            else:
                return result
        except cephfs.NoData:
            if path == "/":
                raise
            else:
                return self._get_ancestor_xattr(os.path.split(path)[0], attr)

    def _check_compat_version(self, compat_version):
        if self.version < compat_version:
            msg = ("The current version of CephFSVolumeClient, version {0}, "
                   "does not support the required feature. Need version {1} "
                   "or greater".format(self.version, compat_version))
            log.error(msg)
            raise CephFSVolumeClientError(msg)

    def _metadata_get(self, path):
        """
        Return a deserialized JSON object, or None
        """
        fd = self.fs.open(path, "r")
        # TODO iterate instead of assuming file < 4MB
        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
        self.fs.close(fd)
        if read_bytes:
            return json.loads(read_bytes)
        else:
            return None

    def _metadata_set(self, path, data):
        serialized = json.dumps(data)
        fd = self.fs.open(path, "w")
        try:
            self.fs.write(fd, serialized, 0)
            self.fs.fsync(fd, 0)
        finally:
            self.fs.close(fd)

    def _lock(self, path):
        @contextmanager
        def fn():
            while 1:
                fd = self.fs.open(path, os.O_CREAT, 0o755)
                self.fs.flock(fd, fcntl.LOCK_EX, self._id)

                # The locked file will be cleaned up sometime. It could be
                # unlinked e.g., by an another manila share instance, before
                # lock was applied on it. Perform checks to ensure that this
                # does not happen.
                try:
                    statbuf = self.fs.stat(path)
                except cephfs.ObjectNotFound:
                    self.fs.close(fd)
                    continue

                fstatbuf = self.fs.fstat(fd)
                if statbuf.st_ino == fstatbuf.st_ino:
                    break

            try:
                yield
            finally:
                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
                self.fs.close(fd)

        return fn()

    def _auth_metadata_path(self, auth_id):
        return os.path.join(self.volume_prefix, "${0}{1}".format(
            auth_id, META_FILE_EXT))

    def _auth_lock(self, auth_id):
        return self._lock(self._auth_metadata_path(auth_id))

    def _auth_metadata_get(self, auth_id):
        """
        Call me with the metadata locked!

        Check whether an auth metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return auth metadata that the current version of CephFSVolumeClient
        can decode.
        """
        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))

        if auth_metadata:
            self._check_compat_version(auth_metadata['compat_version'])

        return auth_metadata

    def _auth_metadata_set(self, auth_id, data):
        """
        Call me with the metadata locked!

        Fsync the auth metadata.

        Add two version attributes to the auth metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._auth_metadata_path(auth_id), data)

    def _volume_metadata_path(self, volume_path):
        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
            volume_path.group_id if volume_path.group_id else "",
            volume_path.volume_id,
            META_FILE_EXT
        ))

    def _volume_lock(self, volume_path):
        """
        Return a ContextManager which locks the authorization metadata for
        a particular volume, and persists a flag to the metadata indicating
        that it is currently locked, so that we can detect dirty situations
        during recovery.

        This lock isn't just to make access to the metadata safe: it's also
        designed to be used over the two-step process of checking the
        metadata and then responding to an authorization request, to
        ensure that at the point we respond the metadata hasn't changed
        in the background.  It's key to how we avoid security holes
        resulting from races during that two-step process.
        """
        return self._lock(self._volume_metadata_path(volume_path))

    def _volume_metadata_get(self, volume_path):
        """
        Call me with the metadata locked!

        Check whether a volume metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return a volume_metadata structure that the current version of
        CephFSVolumeClient can decode.
        """
        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))

        if volume_metadata:
            self._check_compat_version(volume_metadata['compat_version'])

        return volume_metadata

    def _volume_metadata_set(self, volume_path, data):
        """
        Call me with the metadata locked!

        Add two version attributes to the volume metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._volume_metadata_path(volume_path), data)

    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
        """
        Get-or-create a Ceph auth identity for `auth_id` and grant them access
        to `volume_path`.

        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        """

        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            # volume data to be inserted
            volume_path_str = str(volume_path)
            volume = {
                volume_path_str: {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': 'r' if readonly else 'rw',
                    'dirty': True,
                }
            }
            if auth_meta is None:
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': tenant_id.__str__() if tenant_id else None,
                    'volumes': volume
                }

                # Note: this is *not* guaranteeing that the key doesn't already
                # exist in Ceph: we are allowing VolumeClient tenants to
                # 'claim' existing Ceph keys.  In order to prevent VolumeClient
                # tenants from reading e.g. client.admin keys, you need to
                # have configured your VolumeClient user (e.g. Manila) to
                # have mon auth caps that prevent it from accessing those keys
                # (e.g. limit it to only access keys with a manila.* prefix)
            else:
                # Disallow tenants to share auth IDs
                if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['volumes'].update(volume)

            self._auth_metadata_set(auth_id, auth_meta)

            with self._volume_lock(volume_path):
                key = self._authorize_volume(volume_path, auth_id, readonly)

            auth_meta['dirty'] = False
            auth_meta['volumes'][volume_path_str]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return {
                    'auth_key': key
                }
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # them a key.
                return {
                    'auth_key': None
                }
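
    # Sketch of granting and revoking access (the IDs are hypothetical;
    # assumes a connected client `vc` and an existing volume at `vp`):
    #
    #   grant = vc.authorize(vp, "guest", readonly=False, tenant_id="tenant1")
    #   grant['auth_key']               # cephx key the guest mounts with
    #   ...
    #   vc.deauthorize(vp, "guest")
    #   vc.evict("guest", volume_path=vp)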

    def _authorize_volume(self, volume_path, auth_id, readonly):
        vol_meta = self._volume_metadata_get(volume_path)

        access_level = 'r' if readonly else 'rw'
        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if vol_meta is None:
            vol_meta = {
                'auths': auth
            }
        else:
            vol_meta['auths'].update(auth)
            self._volume_metadata_set(volume_path, vol_meta)

        key = self._authorize_ceph(volume_path, auth_id, readonly)

        vol_meta['auths'][auth_id]['dirty'] = False
        self._volume_metadata_set(volume_path, vol_meta)

        return key

    def _authorize_ceph(self, volume_path, auth_id, readonly):
        path = self._get_path(volume_path)
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
            auth_id, path
        ))

        # First I need to work out what the data pool is for this share:
        # read the layout
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")

        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
        except cephfs.NoData:
            namespace = None

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_access_level = 'r' if readonly else 'rw'
        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
        if namespace:
            want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                want_access_level, pool_name, namespace)
        else:
            want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
                                                       pool_name)

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )
            # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            caps = self._rados_command(
                'auth get-or-create',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', want_mds_cap,
                        'osd', want_osd_cap,
                        'mon', 'allow r']
                })
        else:
            # entity exists, update it
            cap = existing[0]

            # Construct auth caps that if present might conflict with the desired
            # auth caps.
            unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
            if namespace:
                unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                    unwanted_access_level, pool_name, namespace)
            else:
                unwanted_osd_cap = 'allow {0} pool={1}'.format(
                    unwanted_access_level, pool_name)

            def cap_update(
                    orig_mds_caps, orig_osd_caps, want_mds_cap,
                    want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):

                if not orig_mds_caps:
                    return want_mds_cap, want_osd_cap

                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                if want_mds_cap in mds_cap_tokens:
                    return orig_mds_caps, orig_osd_caps

                if unwanted_mds_cap in mds_cap_tokens:
                    mds_cap_tokens.remove(unwanted_mds_cap)
                    osd_cap_tokens.remove(unwanted_osd_cap)

                mds_cap_tokens.append(want_mds_cap)
                osd_cap_tokens.append(want_osd_cap)

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")

            mds_cap_str, osd_cap_str = cap_update(
                orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
                unwanted_mds_cap, unwanted_osd_cap)

            caps = self._rados_command(
                'auth caps',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', mds_cap_str,
                        'osd', osd_cap_str,
                        'mon', cap['caps'].get('mon', 'allow r')]
                })
            caps = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

        # Result expected like this:
        # [
        #     {
        #         "entity": "client.foobar",
        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
        #         ...
        #     }
        # ]

        assert len(caps) == 1
        assert caps[0]['entity'] == client_entity
        return caps[0]['key']
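
    # Worked example of cap_update above (the cap strings are illustrative):
    # if the entity already holds the mds cap "allow r path=/volumes/grp1/vol1"
    # and we now want rw access to the same path, the unwanted read-only token
    # is removed and the rw token appended, yielding
    # "allow rw path=/volumes/grp1/vol1"; the osd cap string is rewritten in
    # lockstep so the two lists stay aligned.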

    def deauthorize(self, volume_path, auth_id):
        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            volume_path_str = str(volume_path)
            if (auth_meta is None) or (not auth_meta['volumes']):
                log.warn("deauthorized called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            if volume_path_str not in auth_meta['volumes']:
                log.warn("deauthorized called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['volumes'][volume_path_str]['dirty'] = True
            self._auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_volume(volume_path, auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['volumes'][volume_path_str]

            # Clean up auth meta file
            if not auth_meta['volumes']:
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

    def _deauthorize_volume(self, volume_path, auth_id):
        with self._volume_lock(volume_path):
            vol_meta = self._volume_metadata_get(volume_path)

            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
                log.warn("deauthorized called for already-removed auth "
                         "ID '{auth_id}' for volume ID '{volume}'".format(
                             auth_id=auth_id, volume=volume_path.volume_id
                         ))
                return

            vol_meta['auths'][auth_id]['dirty'] = True
            self._volume_metadata_set(volume_path, vol_meta)

            self._deauthorize(volume_path, auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del vol_meta['auths'][auth_id]
            self._volume_metadata_set(volume_path, vol_meta)

    def _deauthorize(self, volume_path, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        path = self._get_path(volume_path)
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
        except cephfs.NoData:
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # volume path.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
                         for access_level in access_levels]
        if namespace:
            want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
                             for access_level in access_levels]
        else:
            want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
                             for access_level in access_levels]

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

            def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
                    if want_mds_cap in mds_cap_tokens:
                        mds_cap_tokens.remove(want_mds_cap)
                        osd_cap_tokens.remove(want_osd_cap)
                        break

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            cap = existing[0]
            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")
            mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
                                                  want_mds_caps, want_osd_caps)

            if not mds_cap_str:
                self._rados_command('auth del', {'entity': client_entity}, decode=False)
            else:
                self._rados_command(
                    'auth caps',
                    {
                        'entity': client_entity,
                        'caps': [
                            'mds', mds_cap_str,
                            'osd', osd_cap_str,
                            'mon', cap['caps'].get('mon', 'allow r')]
                    })

        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            # Already gone, great.
            return

    def get_authorized_ids(self, volume_path):
        """
        Expose a list of auth IDs that have access to a volume.

        return: a list of (auth_id, access_level) tuples, where
                the access_level can be 'r' or 'rw', or
                None if no auth ID is given access to the volume.
        """
        with self._volume_lock(volume_path):
            meta = self._volume_metadata_get(volume_path)
            auths = []
            if not meta or not meta['auths']:
                return None

            for auth, auth_data in meta['auths'].items():
                # Skip partial auth updates.
                if not auth_data['dirty']:
                    auths.append((auth, auth_data['access_level']))

            return auths

    def _rados_command(self, prefix, args=None, decode=True):
        """
        Safer wrapper for ceph_argparse.json_command, which raises
        Error exception instead of relying on caller to check return
        codes.

        Error exception can result from:
        * Timeout
        * Actual legitimate errors
        * Malformed JSON output

        return: Decoded object from ceph, or None if empty string returned.
                If decode is False, return a string (the data returned by
                ceph command)
        """
        if args is None:
            args = {}

        argdict = args.copy()
        argdict['format'] = 'json'

        ret, outbuf, outs = json_command(self.rados,
                                         prefix=prefix,
                                         argdict=argdict,
                                         timeout=RADOS_TIMEOUT)
        if ret != 0:
            raise rados.Error(outs)
        else:
            if decode:
                if outbuf:
                    try:
                        return json.loads(outbuf)
                    except (ValueError, TypeError):
                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
                else:
                    return None
            else:
                return outbuf
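
    # Example call, as a sketch (assumes a connected client; "mds dump" is one
    # of the mon commands used throughout this class):
    #
    #   mds_map = self._rados_command("mds dump", {})
    #   mds_map['up']   # -> {"mds_0": <gid>, ...}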

    def get_used_bytes(self, volume_path):
        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes"))

    def set_max_bytes(self, volume_path, max_bytes):
        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
                         max_bytes.__str__() if max_bytes is not None else "0",
                         0)

    def _snapshot_path(self, dir_path, snapshot_name):
        return os.path.join(
            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
        )
, dir_path
, snapshot_name
):
1353 # TODO: raise intelligible exception for clusters where snaps are disabled
1354 self
.fs
.mkdir(self
._snapshot
_path
(dir_path
, snapshot_name
), 0o755)

    def _snapshot_destroy(self, dir_path, snapshot_name):
        """
        Remove a snapshot, or do nothing if it already doesn't exist.
        """
        try:
            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
        except cephfs.ObjectNotFound:
            log.warn("Snapshot was already gone: {0}".format(snapshot_name))

    def create_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_create(self._get_path(volume_path), snapshot_name)

    def destroy_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)

    def create_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")

        return self._snapshot_create(self._get_group_path(group_id), snapshot_name)

    def destroy_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")
        if snapshot_name is None:
            raise RuntimeError("Snapshot name may not be None")

        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
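
    # Snapshots map onto CephFS snapshot directories, so creating one is just
    # a mkdir under the client_snapdir (".snap" by default). A sketch, with
    # illustrative names:
    #
    #   vc.create_snapshot_volume(vp, "snap1")   # mkdir .snap/snap1 under vp
    #   vc.destroy_snapshot_volume(vp, "snap1")  # rmdir; no-op if already gone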

    def _cp_r(self, src, dst):
        raise NotImplementedError()

    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
        dest_fs_path = self._get_path(dest_volume_path)
        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)

        self._cp_r(src_snapshot_path, dest_fs_path)

    def put_object(self, pool_name, object_name, data):
        """
        Synchronously write data to an object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        if len(data) > max_size:
            msg = ("Data to be written to object '{0}' exceeds "
                   "{1} bytes".format(object_name, max_size))
            log.error(msg)
            raise CephFSVolumeClientError(msg)
        try:
            ioctx.write_full(object_name, data)
        finally:
            ioctx.close()

    def get_object(self, pool_name, object_name):
        """
        Synchronously read data from object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: bytes - data read from object
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        try:
            bytes_read = ioctx.read(object_name, max_size)
            if ((len(bytes_read) == max_size) and
                    (ioctx.read(object_name, 1, offset=max_size))):
                log.warning("Size of object {0} exceeds '{1}' bytes "
                            "read".format(object_name, max_size))
        finally:
            ioctx.close()
        return bytes_read

    def delete_object(self, pool_name, object_name):
        ioctx = self.rados.open_ioctx(pool_name)
        try:
            ioctx.remove_object(object_name)
        except rados.ObjectNotFound:
            log.warn("Object '{0}' was already removed".format(object_name))
        finally:
            ioctx.close()
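
# A short sketch of the object helpers above (the pool and object names are
# illustrative; assumes a connected client `vc`):
#
#   vc.put_object("manila_data", "shares.json", b"{}")
#   vc.get_object("manila_data", "shares.json")    # -> b"{}"
#   vc.delete_object("manila_data", "shares.json")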