"""
Copyright (C) 2015 Red Hat, Inc.

LGPL2.  See file COPYING.
"""

from contextlib import contextmanager
import errno
import fcntl
import json
import logging
import os
import re
import struct
import sys
import threading
import time
import uuid

import rados
import cephfs

from ceph_argparse import json_command


class RadosError(Exception):
    """
    Something went wrong talking to Ceph with librados
    """
    pass


log = logging.getLogger(__name__)

# Reserved volume group name which we use in paths for volumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"

# Filename extensions for meta files.
META_FILE_EXT = ".meta"


class VolumePath(object):
    """
    Identify a volume's path as group->volume
    The Volume ID is a unique identifier, but this is a much more
    helpful thing to pass around.
    """
    def __init__(self, group_id, volume_id):
        self.group_id = group_id
        self.volume_id = volume_id
        assert self.group_id != NO_GROUP_NAME
        assert self.volume_id != "" and self.volume_id is not None
    def __str__(self):
        return "{0}/{1}".format(self.group_id, self.volume_id)
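
# Illustrative usage of VolumePath (a sketch; the group and volume IDs shown
# here are hypothetical, not part of this module):
#
#     vp = VolumePath("grp1", "vol1")       # volume "vol1" in group "grp1"
#     ungrouped = VolumePath(None, "vol2")  # group=None means "no group"
#     str(vp)                               # -> "grp1/vol1"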

class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.
    """
    pass


class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return "Error {0} (\"{1}\") while {2}".format(
            self._result_code, self._result_str, self._action)


class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        pass
    def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
        """
        :param client_spec: list of strings, used as filter arguments to "session evict"
                            pass ["id=123"] to evict a single client with session id 123.
        """
        self.rank = rank
        self.gid = gid
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._volume_client = volume_client
        self._ready_timeout = ready_timeout
        self._ready_waited = 0

        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()

    def _ready_to_evict(self):
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        return info['state'] in ["up:active", "up:clientreplay"]

    def _wait_for_ready(self):
        """
        Wait for that MDS rank to reach an active or clientreplay state, and
        not be laggy.
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD

            self._mds_map = self._volume_client._rados_command("mds dump", {})

    def _evict(self):
        """
        Run the eviction procedure.  Return true on success, false on errors.
        """

        # Wait til the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except RankEvicter.GidGone:
            return True

        # Then send it an evict
        ret = errno.ETIMEDOUT
        while ret == errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._volume_client.fs.mds_command(
                "%s" % self.gid,
                [json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                })], "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = self._volume_client._rados_command("mds dump", {})
                try:
                    self._wait_for_ready()
                except RankEvicter.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True

class EvictionError(Exception):
    pass


class CephFSVolumeClientError(Exception):
    """
    Something went wrong talking to Ceph using CephFSVolumeClient.
    """
    pass


CEPHFSVOLUMECLIENT_VERSION_HISTORY = """

    CephFSVolumeClient Version History:

    * 1 - Initial version
    * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
"""


class CephFSVolumeClient(object):
    """
    Combine libcephfs and librados interfaces to implement a
    'Volume' concept implemented as a cephfs directory and
    client capabilities which restrict mount access to this
    directory.

    Additionally, volumes may be in a 'Group'.  Conveniently,
    volumes are a lot like manila shares, and groups are a lot
    like manila consistency groups.

    Refer to volumes with VolumePath, which specifies the
    volume and group IDs (both strings).  The group ID may
    be None.

    In general, functions in this class are allowed to raise rados.Error
    or cephfs.Error exceptions in unexpected situations.
    """

    # Current version of the metadata format written by this class
    version = 2
    # Where shall we create our volumes?
    POOL_PREFIX = "fsvolume_"
    DEFAULT_VOL_PREFIX = "/volumes"
    DEFAULT_NS_PREFIX = "fsvolumens_"

    def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None):
        self.fs = None
        self.rados = None
        self.connected = False
        self.conf_path = conf_path
        self.cluster_name = cluster_name
        self.auth_id = auth_id
        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
        # For flock'ing in cephfs, I want a unique ID to distinguish me
        # from any other manila-share services that are loading this module.
        # We could use pid, but that's unnecessarily weak: generate a
        # UUID instead.
        self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]

        # TODO: version the on-disk structures

    def recover(self):
        # Scan all auth keys to see if they're dirty: if they are, they have
        # state that might not have propagated to Ceph or to the related
        # volumes yet.

        # Important: we *always* acquire locks in the order auth->volume
        # That means a volume can never be dirty without the auth key
        # we're updating it with being dirty at the same time.

        # First list the auth IDs that have potentially dirty on-disk metadata
        log.debug("Recovering from partial auth updates (if any)...")

        try:
            dir_handle = self.fs.opendir(self.volume_prefix)
        except cephfs.ObjectNotFound:
            log.debug("Nothing to recover. No auth meta files.")
            return

        d = self.fs.readdir(dir_handle)
        auth_ids = []

        if not d:
            log.debug("Nothing to recover. No auth meta files.")

        while d:
            # Identify auth IDs from auth meta filenames. The auth meta files
            # are named as, "$<auth_id><meta filename extension>"
            regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
            match = re.search(regex, d.d_name)
            if match:
                auth_ids.append(match.group(1))

            d = self.fs.readdir(dir_handle)

        self.fs.closedir(dir_handle)

        # Key points based on ordering:
        # * Anything added in VMeta is already added in AMeta
        # * Anything added in Ceph is already added in VMeta
        # * Anything removed in VMeta is already removed in Ceph
        # * Anything removed in AMeta is already removed in VMeta

        # Deauthorization: because I only update metadata AFTER the
        # update of the next level down, the same ordering holds:
        # things which exist in the AMeta should also exist in the
        # VMeta, and should also exist in Ceph. The same recovery
        # procedure that gets me consistent after crashes during
        # authorization will also work during deauthorization.

        # Now for each auth ID, check for dirty flag and apply updates
        # if dirty flag is found
        for auth_id in auth_ids:
            with self._auth_lock(auth_id):
                auth_meta = self._auth_metadata_get(auth_id)
                if not auth_meta or not auth_meta['volumes']:
                    # Clean up auth meta file
                    self.fs.unlink(self._auth_metadata_path(auth_id))
                    continue
                if not auth_meta['dirty']:
                    continue
                self._recover_auth_meta(auth_id, auth_meta)

        log.debug("Recovered from partial auth updates (if any).")

    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call me after locking the auth meta file.
        """
        remove_volumes = []

        for volume, volume_data in auth_meta['volumes'].items():
            if not volume_data['dirty']:
                continue

            (group_id, volume_id) = volume.split('/')
            group_id = group_id if group_id != 'None' else None
            volume_path = VolumePath(group_id, volume_id)
            access_level = volume_data['access_level']

            with self._volume_lock(volume_path):
                vol_meta = self._volume_metadata_get(volume_path)

                # No VMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove corresponding
                # partial update in AMeta.
                if not vol_meta or auth_id not in vol_meta['auths']:
                    remove_volumes.append(volume)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # VMeta update looks clean. Ceph auth update must have been
                # applied already, so there is nothing more to do here.
                if vol_meta['auths'][auth_id] == want_auth:
                    continue

                readonly = True if access_level == 'r' else False
                self._authorize_volume(volume_path, auth_id, readonly)

            # Recovered from partial auth updates for the auth ID's access
            # to a volume.
            auth_meta['volumes'][volume]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

        for volume in remove_volumes:
            del auth_meta['volumes'][volume]

        if not auth_meta['volumes']:
            # Clean up auth meta file
            self.fs.unlink(self._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self._auth_metadata_set(auth_id, auth_meta)

    def evict(self, auth_id, timeout=30, volume_path=None):
        """
        Evict all clients based on the authorization ID and optionally based on
        the volume path mounted. Assumes that the authorization key has been
        revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """
        client_spec = ["auth_name={0}".format(auth_id), ]
        if volume_path:
            client_spec.append("client_metadata.root={0}".
                               format(self._get_path(volume_path)))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = self._rados_command("mds dump", {})

        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
                                 timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception))
                log.error(msg)
                raise EvictionError(msg)

    def _get_path(self, volume_path):
        """
        Determine the path within CephFS where this volume will live
        :return: absolute path (string)
        """
        return os.path.join(
            self.volume_prefix,
            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
            volume_path.volume_id)
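
    # Illustrative results (a sketch assuming the default "/volumes" prefix):
    #
    #     self._get_path(VolumePath("grp1", "vol1"))  # -> "/volumes/grp1/vol1"
    #     self._get_path(VolumePath(None, "vol2"))    # -> "/volumes/_nogroup/vol2"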

    def _get_group_path(self, group_id):
        if group_id is None:
            raise ValueError("group_id may not be None")

        return os.path.join(self.volume_prefix, group_id)

    def connect(self, premount_evict=None):
        """
        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
                               may want to use this to specify their own auth ID if they expect
                               to be a unique instance and don't want to wait for caps to time
                               out after failure of another instance of themselves.
        """
        log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
        self.rados = rados.Rados(
            name="client.{0}".format(self.auth_id),
            clustername=self.cluster_name,
            conffile=self.conf_path,
            conf={}
        )
        self.rados.connect()
        log.debug("Connection to RADOS complete")

        log.debug("Connecting to cephfs...")
        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
        log.debug("CephFS initializing...")
        self.fs.init()
        if premount_evict is not None:
            log.debug("Premount eviction of {0} starting".format(premount_evict))
            self.evict(premount_evict)
            log.debug("Premount eviction of {0} completes".format(premount_evict))
        log.debug("CephFS mounting...")
        self.fs.mount()
        log.debug("Connection to cephfs complete")

        # Recover from partial auth updates due to a previous
        # crash of this client.
        self.recover()

    def get_mon_addrs(self):
        log.info("get_mon_addrs")
        result = []
        mon_map = self._rados_command("mon dump")
        for mon in mon_map['mons']:
            ip_port = mon['addr'].split("/")[0]
            result.append(ip_port)

        return result
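
    # Illustrative result (a sketch; addresses are hypothetical): the mon map
    # reports addresses like "10.0.0.1:6789/0", the trailing "/nonce" part is
    # stripped, so this returns e.g. ["10.0.0.1:6789", "10.0.0.2:6789"].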

    def disconnect(self):
        log.info("disconnect")
        if self.fs:
            log.debug("Disconnecting cephfs...")
            self.fs.shutdown()
            self.fs = None
            log.debug("Disconnecting cephfs complete")

        if self.rados:
            log.debug("Disconnecting rados...")
            self.rados.shutdown()
            self.rados = None
            log.debug("Disconnecting rados complete")

    def _get_pool_id(self, osd_map, pool_name):
        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
        # like this are needed.
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                return pool['pool']

        return None

    def _create_volume_pool(self, pool_name):
        """
        Idempotently create a pool for use as a CephFS data pool, with the given name

        :return The ID of the created pool
        """
        osd_map = self._rados_command('osd dump', {})

        existing_id = self._get_pool_id(osd_map, pool_name)
        if existing_id is not None:
            log.info("Pool {0} already exists".format(pool_name))
            return existing_id

        osd_count = len(osd_map['osds'])

        # We can't query the actual cluster config remotely, but since this is
        # just a heuristic we'll assume that the ceph.conf we have locally reflects
        # that in use in the rest of the cluster.
        pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))

        other_pgs = 0
        for pool in osd_map['pools']:
            if not pool['pool_name'].startswith(self.POOL_PREFIX):
                other_pgs += pool['pg_num']

        # A basic heuristic for picking pg_num: work out the max number of
        # PGs we can have without tripping a warning, then subtract the number
        # of PGs already created by non-manila pools, then divide by ten. That'll
        # give you a reasonable result on a system where you have "a few" manila
        # shares.
        pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10
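        # Worked example of the heuristic above (illustrative numbers only):
        # with mon_max_pg_per_osd=250, 8 OSDs and 1000 PGs already used by
        # non-manila pools, pg_num = ((250 * 8) - 1000) / 10 = 100.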
        # TODO Alternatively, respect an override set by the user.

        self._rados_command(
            'osd pool create',
            {
                'pool': pool_name,
                'pg_num': int(pg_num)
            }
        )

        osd_map = self._rados_command('osd dump', {})
        pool_id = self._get_pool_id(osd_map, pool_name)

        if pool_id is None:
            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
            # removing it right after we created it.
            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
                pool_name, json.dumps(osd_map, indent=2)
            ))
            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))

        return pool_id

    def create_group(self, group_id):
        # Prevent craftily-named volume groups from colliding with the meta
        # files.
        if group_id.endswith(META_FILE_EXT):
            raise ValueError("group ID cannot end with '{0}'.".format(
                META_FILE_EXT))
        path = self._get_group_path(group_id)
        self._mkdir_p(path)

    def destroy_group(self, group_id):
        path = self._get_group_path(group_id)
        try:
            self.fs.stat(self.volume_prefix)
        except cephfs.ObjectNotFound:
            pass
        else:
            self.fs.rmdir(path)

    def _mkdir_p(self, path):
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            pass
        else:
            return

        parts = path.split(os.path.sep)

        for i in range(1, len(parts) + 1):
            subpath = os.path.join(*parts[0:i])
            try:
                self.fs.stat(subpath)
            except cephfs.ObjectNotFound:
                self.fs.mkdir(subpath, 0o755)

    def create_volume(self, volume_path, size=None, data_isolated=False):
        """
        Set up metadata, pools and auth for a volume.

        This function is idempotent.  It is safe to call this again
        for an already-created volume, even if it is in use.

        :param volume_path: VolumePath instance
        :param size: In bytes, or None for no size limit
        :param data_isolated: If true, create a separate OSD pool for this volume
        """
        path = self._get_path(volume_path)
        log.info("create_volume: {0}".format(path))

        self._mkdir_p(path)

        if size is not None:
            self.fs.setxattr(path, 'ceph.quota.max_bytes', str(size), 0)

        # data_isolated means create a separate pool for this volume
        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
            pool_id = self._create_volume_pool(pool_name)
            mds_map = self._rados_command("mds dump", {})
            if pool_id not in mds_map['data_pools']:
                self._rados_command("mds add_data_pool", {
                    'pool': pool_name
                })
            self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)

        # enforce security isolation, use separate namespace for this volume
        namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
        log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
        self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0)

        # Create a volume meta file, if it does not already exist, to store
        # data about auth ids having access to the volume
        fd = self.fs.open(self._volume_metadata_path(volume_path),
                          os.O_CREAT, 0o755)
        self.fs.close(fd)

    def delete_volume(self, volume_path, data_isolated=False):
        """
        Make a volume inaccessible to guests.  This function is
        idempotent.  This is the fast part of tearing down a volume: you must
        also later call purge_volume, which is the slow part.

        :param volume_path: Same identifier used in create_volume
        """

        path = self._get_path(volume_path)
        log.info("delete_volume: {0}".format(path))

        # Create the trash folder if it doesn't already exist
        trash = os.path.join(self.volume_prefix, "_deleting")
        self._mkdir_p(trash)

        # We'll move it to here
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        # Move the volume's data to the trash folder
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            log.warning("Trying to delete volume '{0}' but it's already gone".format(
                path))
        else:
            self.fs.rename(path, trashed_volume)

        # Delete the volume meta file, if it's not already deleted
        vol_meta_path = self._volume_metadata_path(volume_path)
        try:
            self.fs.unlink(vol_meta_path)
        except cephfs.ObjectNotFound:
            pass

    def purge_volume(self, volume_path, data_isolated=False):
        """
        Finish clearing up a volume that was previously passed to delete_volume.  This
        function is idempotent.
        """

        trash = os.path.join(self.volume_prefix, "_deleting")
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        try:
            self.fs.stat(trashed_volume)
        except cephfs.ObjectNotFound:
            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
                trashed_volume))
            return

        def rmtree(root_path):
            log.debug("rmtree {0}".format(root_path))
            dir_handle = self.fs.opendir(root_path)
            d = self.fs.readdir(dir_handle)
            while d:
                if d.d_name not in [".", ".."]:
                    # Do not use os.path.join because it is sensitive
                    # to string encoding, we just pass through dnames
                    # as byte strings
                    d_full = "{0}/{1}".format(root_path, d.d_name)
                    if d.is_dir():
                        rmtree(d_full)
                    else:
                        self.fs.unlink(d_full)

                d = self.fs.readdir(dir_handle)
            self.fs.closedir(dir_handle)

            self.fs.rmdir(root_path)

        rmtree(trashed_volume)

        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            osd_map = self._rados_command("osd dump", {})
            pool_id = self._get_pool_id(osd_map, pool_name)
            mds_map = self._rados_command("mds dump", {})
            if pool_id in mds_map['data_pools']:
                self._rados_command("mds remove_data_pool", {
                    'pool': pool_name
                })
            self._rados_command("osd pool delete",
                                {
                                    "pool": pool_name,
                                    "pool2": pool_name,
                                    "sure": "--yes-i-really-really-mean-it"
                                })

    def _get_ancestor_xattr(self, path, attr):
        """
        Helper for reading layout information: if this xattr is missing
        on the requested path, keep checking parents until we find it.
        """
        try:
            result = self.fs.getxattr(path, attr)
            if result == "":
                # Annoying!  cephfs gives us empty instead of an error when attr not found
                raise cephfs.NoData()
            else:
                return result
        except cephfs.NoData:
            if path == "/":
                raise
            else:
                return self._get_ancestor_xattr(os.path.split(path)[0], attr)

    def _check_compat_version(self, compat_version):
        if self.version < compat_version:
            msg = ("The current version of CephFSVolumeClient, version {0} "
                   "does not support the required feature. Need version {1} "
                   "or greater".format(self.version, compat_version)
                   )
            log.error(msg)
            raise CephFSVolumeClientError(msg)
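
    # Illustrative check (a sketch): metadata written by this class carries
    # 'compat_version' = 1 and 'version' = 2 (see the *_metadata_set helpers
    # below), so a reader at version 1 can still decode it; metadata whose
    # 'compat_version' were greater than self.version would raise here.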

    def _metadata_get(self, path):
        """
        Return a deserialized JSON object, or None
        """
        fd = self.fs.open(path, "r")
        # TODO iterate instead of assuming file < 4MB
        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
        self.fs.close(fd)
        if read_bytes:
            return json.loads(read_bytes)
        else:
            return None

    def _metadata_set(self, path, data):
        serialized = json.dumps(data)
        fd = self.fs.open(path, "w")
        try:
            self.fs.write(fd, serialized, 0)
            self.fs.fsync(fd, 0)
        finally:
            self.fs.close(fd)

    def _lock(self, path):
        @contextmanager
        def fn():
            while True:
                fd = self.fs.open(path, os.O_CREAT, 0o755)
                self.fs.flock(fd, fcntl.LOCK_EX, self._id)

                # The locked file will be cleaned up sometime. It could be
                # unlinked e.g., by an another manila share instance, before
                # lock was applied on it. Perform checks to ensure that this
                # has not happened: the locked fd must still refer to the
                # file at `path`.
                try:
                    statbuf = self.fs.stat(path)
                except cephfs.ObjectNotFound:
                    self.fs.close(fd)
                    continue

                fstatbuf = self.fs.fstat(fd)
                if statbuf.st_ino == fstatbuf.st_ino:
                    break

            try:
                yield
            finally:
                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
                self.fs.close(fd)

        return fn()

    def _auth_metadata_path(self, auth_id):
        return os.path.join(self.volume_prefix, "${0}{1}".format(
            auth_id, META_FILE_EXT))
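
    # Illustrative example (a sketch assuming the default "/volumes" prefix and
    # a hypothetical auth ID "alice"):
    #
    #     self._auth_metadata_path("alice")  # -> "/volumes/$alice.meta"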

    def _auth_lock(self, auth_id):
        return self._lock(self._auth_metadata_path(auth_id))

    def _auth_metadata_get(self, auth_id):
        """
        Call me with the metadata locked!

        Check whether an auth metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return auth metadata that the current version of CephFSVolumeClient
        can decode.
        """
        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))

        if auth_metadata:
            self._check_compat_version(auth_metadata['compat_version'])

        return auth_metadata

    def _auth_metadata_set(self, auth_id, data):
        """
        Call me with the metadata locked!

        Fsync the auth metadata.

        Add two version attributes to the auth metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._auth_metadata_path(auth_id), data)

    def _volume_metadata_path(self, volume_path):
        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
            volume_path.group_id if volume_path.group_id else "",
            volume_path.volume_id,
            META_FILE_EXT
        ))
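
    # Illustrative example (a sketch assuming the default "/volumes" prefix):
    #
    #     self._volume_metadata_path(VolumePath("grp1", "vol1"))  # -> "/volumes/_grp1:vol1.meta"
    #     self._volume_metadata_path(VolumePath(None, "vol2"))    # -> "/volumes/_:vol2.meta"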

    def _volume_lock(self, volume_path):
        """
        Return a ContextManager which locks the authorization metadata for
        a particular volume, and persists a flag to the metadata indicating
        that it is currently locked, so that we can detect dirty situations
        during recovery.

        This lock isn't just to make access to the metadata safe: it's also
        designed to be used over the two-step process of checking the
        metadata and then responding to an authorization request, to
        ensure that at the point we respond the metadata hasn't changed
        in the background.  It's key to how we avoid security holes
        resulting from races during that process.
        """
        return self._lock(self._volume_metadata_path(volume_path))

    def _volume_metadata_get(self, volume_path):
        """
        Call me with the metadata locked!

        Check whether a volume metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return a volume_metadata structure that the current version of
        CephFSVolumeClient can decode.
        """
        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))

        if volume_metadata:
            self._check_compat_version(volume_metadata['compat_version'])

        return volume_metadata

    def _volume_metadata_set(self, volume_path, data):
        """
        Call me with the metadata locked!

        Add two version attributes to the volume metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._volume_metadata_path(volume_path), data)

    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
        """
        Get-or-create a Ceph auth identity for `auth_id` and grant them access
        to the volume at `volume_path`.

        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        """
        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            # volume data to be inserted
            volume_path_str = str(volume_path)
            volume = {
                volume_path_str: {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': 'r' if readonly else 'rw',
                    'dirty': True,
                }
            }

            if auth_meta is None:
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': str(tenant_id) if tenant_id else None,
                    'volumes': volume
                }

                # Note: this is *not* guaranteeing that the key doesn't already
                # exist in Ceph: we are allowing VolumeClient tenants to
                # 'claim' existing Ceph keys.  In order to prevent VolumeClient
                # tenants from reading e.g. client.admin keys, you need to
                # have configured your VolumeClient user (e.g. Manila) to
                # have mon auth caps that prevent it from accessing those keys
                # (e.g. limit it to only access keys with a manila.* prefix)
            else:
                # Disallow tenants to share auth IDs
                if str(auth_meta['tenant_id']) != str(tenant_id):
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['volumes'].update(volume)

            self._auth_metadata_set(auth_id, auth_meta)

            with self._volume_lock(volume_path):
                key = self._authorize_volume(volume_path, auth_id, readonly)

            auth_meta['dirty'] = False
            auth_meta['volumes'][volume_path_str]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return {
                    'auth_key': key
                }
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # out the auth key.
                return {
                    'auth_key': None
                }

    def _authorize_volume(self, volume_path, auth_id, readonly):
        vol_meta = self._volume_metadata_get(volume_path)

        access_level = 'r' if readonly else 'rw'
        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if vol_meta is None:
            vol_meta = {
                'auths': auth
            }
        else:
            vol_meta['auths'].update(auth)
        self._volume_metadata_set(volume_path, vol_meta)

        key = self._authorize_ceph(volume_path, auth_id, readonly)

        vol_meta['auths'][auth_id]['dirty'] = False
        self._volume_metadata_set(volume_path, vol_meta)

        return key

    def _authorize_ceph(self, volume_path, auth_id, readonly):
        path = self._get_path(volume_path)
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
            auth_id, path
        ))

        # First I need to work out what the data pool is for this share:
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_access_level = 'r' if readonly else 'rw'
        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
        want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
            want_access_level, pool_name, namespace)

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )
            # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            caps = self._rados_command(
                'auth get-or-create',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', want_mds_cap,
                        'osd', want_osd_cap,
                        'mon', 'allow r']
                })
        else:
            # entity exists, update it
            cap = existing[0]

            # Construct auth caps that if present might conflict with the desired
            # auth caps.
            unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
            unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                unwanted_access_level, pool_name, namespace)

            def cap_update(orig, want, unwanted):
                # Updates the existing auth caps such that there is a single
                # occurrence of wanted auth caps and no occurrence of
                # conflicting auth caps.

                if not orig:
                    return want

                cap_tokens = set(orig.split(","))

                cap_tokens.discard(unwanted)
                cap_tokens.add(want)

                return ",".join(cap_tokens)
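
            # Illustrative behaviour of cap_update (a sketch; the cap strings
            # are hypothetical). Given
            #     orig     = "allow rw path=/volumes/_nogroup/other"
            #     want     = "allow r path=/volumes/_nogroup/vol1"
            #     unwanted = "allow rw path=/volumes/_nogroup/vol1"
            # the result contains both the pre-existing cap and the wanted cap,
            # comma-joined (set ordering is not guaranteed), with any
            # conflicting cap for the same path dropped.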

            osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap)
            mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap)

            caps = self._rados_command(
                'auth caps',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', mds_cap_str,
                        'osd', osd_cap_str,
                        'mon', cap['caps'].get('mon', 'allow r')]
                })
            caps = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

        # Result expected like this:
        # [
        #     {
        #         "entity": "client.foobar",
        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
        #         ...
        #     }
        # ]
        assert len(caps) == 1
        assert caps[0]['entity'] == client_entity
        return caps[0]['key']
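
    # Illustrative caps produced for a read-write guest (a sketch; the path,
    # pool and namespace values are hypothetical):
    #
    #     mds: "allow rw path=/volumes/_nogroup/vol1"
    #     osd: "allow rw pool=cephfs_data namespace=fsvolumens_vol1"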

    def deauthorize(self, volume_path, auth_id):
        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            volume_path_str = str(volume_path)
            if (auth_meta is None) or (not auth_meta['volumes']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            if volume_path_str not in auth_meta['volumes']:
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['volumes'][volume_path_str]['dirty'] = True
            self._auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_volume(volume_path, auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['volumes'][volume_path_str]

            # Clean up auth meta file
            if not auth_meta['volumes']:
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

    def _deauthorize_volume(self, volume_path, auth_id):
        with self._volume_lock(volume_path):
            vol_meta = self._volume_metadata_get(volume_path)

            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                return

            vol_meta['auths'][auth_id]['dirty'] = True
            self._volume_metadata_set(volume_path, vol_meta)

            self._deauthorize(volume_path, auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del vol_meta['auths'][auth_id]
            self._volume_metadata_set(volume_path, vol_meta)

    def _deauthorize(self, volume_path, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        path = self._get_path(volume_path)
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")

        # The auth_id might have read-only or read-write mount access for the
        # volume path.
        access_levels = ('r', 'rw')
        want_mds_caps = {'allow {0} path={1}'.format(access_level, path)
                         for access_level in access_levels}
        want_osd_caps = {'allow {0} pool={1} namespace={2}'.format(
                         access_level, pool_name, namespace)
                         for access_level in access_levels}

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

            def cap_remove(orig, want):
                cap_tokens = set(orig.split(","))
                return ",".join(cap_tokens.difference(want))
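
            # Illustrative behaviour of cap_remove (a sketch; cap strings are
            # hypothetical). Given
            #     orig = "allow rw path=/volumes/_nogroup/vol1,allow rw path=/volumes/_nogroup/other"
            #     want = {"allow r path=/volumes/_nogroup/vol1",
            #             "allow rw path=/volumes/_nogroup/vol1"}
            # the tokens matching this volume are dropped and only
            # "allow rw path=/volumes/_nogroup/other" remains.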

            cap = existing[0]
            osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps)
            mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps)
            if (not osd_cap_str) and (not mds_cap_str):
                self._rados_command('auth del', {'entity': client_entity}, decode=False)
            else:
                self._rados_command(
                    'auth caps',
                    {
                        'entity': client_entity,
                        'caps': [
                            'mds', mds_cap_str,
                            'osd', osd_cap_str,
                            'mon', cap['caps'].get('mon', 'allow r')]
                    })

        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            # Already gone, great.
            return

    def get_authorized_ids(self, volume_path):
        """
        Expose a list of auth IDs that have access to a volume.

        return: a list of (auth_id, access_level) tuples, where
                the access_level can be 'r' or 'rw'.
                None if no auth ID is given access to the volume.
        """
        with self._volume_lock(volume_path):
            meta = self._volume_metadata_get(volume_path)
            auths = []
            if not meta or not meta['auths']:
                return None

            for auth, auth_data in meta['auths'].items():
                # Skip partial auth updates.
                if not auth_data['dirty']:
                    auths.append((auth, auth_data['access_level']))

            return auths

    def _rados_command(self, prefix, args=None, decode=True):
        """
        Safer wrapper for ceph_argparse.json_command, which raises
        Error exception instead of relying on caller to check return
        codes.

        Error exception can result from:
        * Timeout
        * Actual legitimate errors
        * Malformed JSON output

        return: Decoded object from ceph, or None if empty string returned.
                If decode is False, return a string (the data returned by
                ceph command)
        """
        if args is None:
            args = {}

        argdict = args.copy()
        argdict['format'] = 'json'

        ret, outbuf, outs = json_command(self.rados,
                                         prefix=prefix,
                                         argdict=argdict,
                                         timeout=RADOS_TIMEOUT)
        if ret != 0:
            raise rados.Error(outs)
        else:
            if decode:
                if outbuf:
                    try:
                        return json.loads(outbuf)
                    except (ValueError, TypeError):
                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
                else:
                    return None
            else:
                return outbuf
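
    # Illustrative usage of _rados_command (a sketch; the command and fields
    # are the ones already used elsewhere in this class):
    #
    #     mds_map = self._rados_command("mds dump", {})            # decoded JSON dict
    #     raw = self._rados_command("mds dump", {}, decode=False)  # raw string output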

    def get_used_bytes(self, volume_path):
        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes"))

    def set_max_bytes(self, volume_path, max_bytes):
        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
                         str(max_bytes) if max_bytes is not None else "0",
                         0)

    def _snapshot_path(self, dir_path, snapshot_name):
        return os.path.join(
            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
        )

    def _snapshot_create(self, dir_path, snapshot_name):
        # TODO: raise intelligible exception for clusters where snaps are disabled
        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755)

    def _snapshot_destroy(self, dir_path, snapshot_name):
        """
        Remove a snapshot, or do nothing if it already doesn't exist.
        """
        try:
            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
        except cephfs.ObjectNotFound:
            log.warning("Snapshot was already gone: {0}".format(snapshot_name))

    def create_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_create(self._get_path(volume_path), snapshot_name)

    def destroy_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)

    def create_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")

        return self._snapshot_create(self._get_group_path(group_id), snapshot_name)

    def destroy_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")
        if snapshot_name is None:
            raise RuntimeError("Snapshot name may not be None")

        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)

    def _cp_r(self, src, dst):
        # TODO
        raise NotImplementedError()

    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
        dest_fs_path = self._get_path(dest_volume_path)
        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)

        self._cp_r(src_snapshot_path, dest_fs_path)

    def put_object(self, pool_name, object_name, data):
        """
        Synchronously write data to an object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        if len(data) > max_size:
            msg = ("Data to be written to object '{0}' exceeds "
                   "{1} bytes".format(object_name, max_size))
            log.error(msg)
            raise CephFSVolumeClientError(msg)
        try:
            ioctx.write_full(object_name, data)
        finally:
            ioctx.close()
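
    # Illustrative usage (a sketch; the instance, pool and object names are
    # hypothetical):
    #
    #     vc.put_object("manila_data", "share_metadata", b"{\"foo\": \"bar\"}")
    #     blob = vc.get_object("manila_data", "share_metadata")
    #     vc.delete_object("manila_data", "share_metadata")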

    def get_object(self, pool_name, object_name):
        """
        Synchronously read data from object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: bytes - data read from object
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        try:
            bytes_read = ioctx.read(object_name, max_size)
            if ((len(bytes_read) == max_size) and
                    (ioctx.read(object_name, 1, offset=max_size))):
                log.warning("Size of object {0} exceeds '{1}' bytes "
                            "read".format(object_name, max_size))
        finally:
            ioctx.close()
        return bytes_read

    def delete_object(self, pool_name, object_name):
        ioctx = self.rados.open_ioctx(pool_name)
        try:
            ioctx.remove_object(object_name)
        except rados.ObjectNotFound:
            log.warning("Object '{0}' was already removed".format(object_name))
        finally:
            ioctx.close()