"""
Copyright (C) 2015 Red Hat, Inc.

LGPL2.1.  See file COPYING.
"""
from contextlib import contextmanager
import errno
import fcntl
import json
import logging
import os
import re
import struct
import sys
import threading
import time
import uuid

from ceph_argparse import json_command

import cephfs
import rados


def to_bytes(param):
    """
    Helper method that returns byte representation of the given parameter.
    """
    if isinstance(param, str):
        return param.encode()
    else:
        return str(param).encode()


class RadosError(Exception):
    """
    Something went wrong talking to Ceph with librados
    """
    pass


RADOS_TIMEOUT = 10

log = logging.getLogger(__name__)

# Reserved volume group name which we use in paths for volumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"

# Filename extensions for meta files.
META_FILE_EXT = ".meta"


class VolumePath(object):
    """
    Identify a volume's path as group->volume
    The Volume ID is a unique identifier, but this is a much more
    helpful thing to pass around.
    """
    def __init__(self, group_id, volume_id):
        self.group_id = group_id
        self.volume_id = volume_id
        assert self.group_id != NO_GROUP_NAME
        assert self.volume_id != "" and self.volume_id is not None

    def __str__(self):
        return "{0}/{1}".format(self.group_id, self.volume_id)
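
# A minimal usage sketch (hypothetical IDs, for illustration only): a
# VolumePath pairs an optional group ID with a volume ID, and stringifies
# as "<group>/<volume>":
#
#     vp = VolumePath("grp1", "vol1")
#     assert str(vp) == "grp1/vol1"
#     orphan = VolumePath(None, "vol2")   # volume not assigned to a group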


class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.
    """
    pass


class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return "Error {0} (\"{1}\") while {2}".format(
            self._result_code, self._result_str, self._action)


class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        pass

    POLL_PERIOD = 5

    def __init__(self, volume_client, client_spec, rank, gid, mds_map,
                 ready_timeout):
        """
        :param client_spec: list of strings, used as filter arguments to "session evict";
                            pass ["id=123"] to evict a single client with session id 123.
        """
        self.rank = rank
        self.gid = gid
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._volume_client = volume_client
        self._ready_timeout = ready_timeout
        self._ready_waited = 0

        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()

    def _ready_to_evict(self):
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        return info['state'] in ["up:active", "up:clientreplay"]

    def _wait_for_ready(self):
        """
        Wait for that MDS rank to reach an active or clientreplay state, and
        not be laggy.
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD

            self._mds_map = self._volume_client.get_mds_map()

    def _evict(self):
        """
        Run the eviction procedure.  Return true on success, false on errors.
        """

        # Wait til the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except self.GidGone:
            return True

        # Then send it an evict
        ret = errno.ETIMEDOUT
        while ret == errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._volume_client.fs.mds_command(
                "%s" % self.gid,
                [json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                })], "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = self._volume_client.get_mds_map()
                try:
                    self._wait_for_ready()
                except self.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True


class EvictionError(Exception):
    pass


class CephFSVolumeClientError(Exception):
    """
    Something went wrong talking to Ceph using CephFSVolumeClient.
    """
    pass


CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
    CephFSVolumeClient Version History:

    * 1 - Initial version
    * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
    * 3 - Allow volumes to be created without RADOS namespace isolation
    * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
"""


class CephFSVolumeClient(object):
    """
    Combine libcephfs and librados interfaces to implement a
    'Volume' concept implemented as a cephfs directory and
    client capabilities which restrict mount access to this
    directory.

    Additionally, volumes may be in a 'Group'.  Conveniently,
    volumes are a lot like manila shares, and groups are a lot
    like manila consistency groups.

    Refer to volumes with VolumePath, which specifies the
    volume and group IDs (both strings).  The group ID may
    be None.

    In general, functions in this class are allowed to raise rados.Error
    or cephfs.Error exceptions in unexpected situations.
    """

    # Current version
    version = 4

    # Where shall we create our volumes?
    POOL_PREFIX = "fsvolume_"
    DEFAULT_VOL_PREFIX = "/volumes"
    DEFAULT_NS_PREFIX = "fsvolumens_"

    def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
                 volume_prefix=None, pool_ns_prefix=None, rados=None,
                 fs_name=None):
        """
        Either set all three of ``auth_id``, ``conf_path`` and
        ``cluster_name`` (rados constructed on connect), or
        set ``rados`` (existing rados instance).
        """
        self.fs = None
        self.fs_name = fs_name
        self.connected = False

        self.conf_path = conf_path
        self.cluster_name = cluster_name
        self.auth_id = auth_id

        self.rados = rados
        if self.rados:
            # Using an externally owned rados, so we won't tear it down
            # on disconnect
            self.own_rados = False
        else:
            # self.rados will be constructed in connect
            self.own_rados = True

        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX

        # For flock'ing in cephfs, I want a unique ID to distinguish me
        # from any other manila-share services that are loading this module.
        # We could use pid, but that's unnecessarily weak: generate a
        # UUID
        self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]

        # TODO: version the on-disk structures
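
    # A minimal connection sketch, assuming a reachable cluster and a local
    # ceph.conf; the "manila" auth ID and config path are hypothetical:
    #
    #     vc = CephFSVolumeClient(auth_id="manila",
    #                             conf_path="/etc/ceph/ceph.conf",
    #                             cluster_name="ceph")
    #     vc.connect()
    #     try:
    #         ...  # create/authorize volumes here
    #     finally:
    #         vc.disconnect()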

    def recover(self):
        # Scan all auth keys to see if they're dirty: if they are, they have
        # state that might not have propagated to Ceph or to the related
        # volumes yet.

        # Important: we *always* acquire locks in the order auth->volume
        # That means a volume can never be dirty without the auth key
        # we're updating it with being dirty at the same time.

        # First list the auth IDs that have potentially dirty on-disk metadata
        log.debug("Recovering from partial auth updates (if any)...")

        try:
            dir_handle = self.fs.opendir(self.volume_prefix)
        except cephfs.ObjectNotFound:
            log.debug("Nothing to recover. No auth meta files.")
            return

        d = self.fs.readdir(dir_handle)
        auth_ids = []

        if not d:
            log.debug("Nothing to recover. No auth meta files.")

        while d:
            # Identify auth IDs from auth meta filenames. The auth meta files
            # are named as, "$<auth_id><meta filename extension>"
            regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
            match = re.search(regex, d.d_name.decode(encoding='utf-8'))
            if match:
                auth_ids.append(match.group(1))

            d = self.fs.readdir(dir_handle)

        self.fs.closedir(dir_handle)

        # Key points based on ordering:
        # * Anything added in VMeta is already added in AMeta
        # * Anything added in Ceph is already added in VMeta
        # * Anything removed in VMeta is already removed in Ceph
        # * Anything removed in AMeta is already removed in VMeta

        # Deauthorization: because I only update metadata AFTER the
        # update of the next level down, I have the same ordering of
        # -> things which exist in the AMeta should also exist
        # in the VMeta, should also exist in Ceph, and the same
        # recovery procedure that gets me consistent after crashes
        # during authorization will also work during deauthorization

        # Now for each auth ID, check for dirty flag and apply updates
        # if dirty flag is found
        for auth_id in auth_ids:
            with self._auth_lock(auth_id):
                auth_meta = self._auth_metadata_get(auth_id)
                if not auth_meta or not auth_meta['volumes']:
                    # Clean up auth meta file
                    self.fs.unlink(self._auth_metadata_path(auth_id))
                    continue
                if not auth_meta['dirty']:
                    continue
                self._recover_auth_meta(auth_id, auth_meta)

        log.debug("Recovered from partial auth updates (if any).")

    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call me after locking the auth meta file.
        """
        remove_volumes = []

        for volume, volume_data in auth_meta['volumes'].items():
            if not volume_data['dirty']:
                continue

            (group_id, volume_id) = volume.split('/')
            group_id = group_id if group_id != 'None' else None
            volume_path = VolumePath(group_id, volume_id)
            access_level = volume_data['access_level']

            with self._volume_lock(volume_path):
                vol_meta = self._volume_metadata_get(volume_path)

                # No VMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove corresponding
                # partial update in AMeta.
                if not vol_meta or auth_id not in vol_meta['auths']:
                    remove_volumes.append(volume)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # VMeta update looks clean. Ceph auth update must have been
                # clean too.
                if vol_meta['auths'][auth_id] == want_auth:
                    continue

                readonly = access_level == 'r'
                self._authorize_volume(volume_path, auth_id, readonly)

            # Recovered from partial auth updates for the auth ID's access
            # to a volume.
            auth_meta['volumes'][volume]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

        for volume in remove_volumes:
            del auth_meta['volumes'][volume]

        if not auth_meta['volumes']:
            # Clean up auth meta file
            self.fs.unlink(self._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self._auth_metadata_set(auth_id, auth_meta)

    def get_mds_map(self):
        fs_map = self._rados_command("fs dump", {})
        return fs_map['filesystems'][0]['mdsmap']

    def evict(self, auth_id, timeout=30, volume_path=None):
        """
        Evict all clients based on the authorization ID and optionally based on
        the volume path mounted.  Assumes that the authorization key has been
        revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """

        client_spec = ["auth_name={0}".format(auth_id), ]
        if volume_path:
            client_spec.append("client_metadata.root={0}".
                               format(self._get_path(volume_path)))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = self.get_mds_map()
        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
                                 timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
                       )
                log.error(msg)
                raise EvictionError(msg)
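
    # Sketch of the intended call order (hypothetical IDs): after revoking a
    # guest's key with deauthorize(), evict any sessions it still holds:
    #
    #     vc.deauthorize(vp, "guest_id")
    #     vc.evict("guest_id", volume_path=vp)  # may raise EvictionError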

    def _get_path(self, volume_path):
        """
        Determine the path within CephFS where this volume will live
        :return: absolute path (string)
        """
        return os.path.join(
            self.volume_prefix,
            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
            volume_path.volume_id)

    def _get_group_path(self, group_id):
        if group_id is None:
            raise ValueError("group_id may not be None")

        return os.path.join(
            self.volume_prefix,
            group_id
        )

    def _connect(self, premount_evict):
        log.debug("Connecting to cephfs...")
        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
        log.debug("CephFS initializing...")
        self.fs.init()
        if premount_evict is not None:
            log.debug("Premount eviction of {0} starting".format(premount_evict))
            self.evict(premount_evict)
            log.debug("Premount eviction of {0} completes".format(premount_evict))
        log.debug("CephFS mounting...")
        self.fs.mount(filesystem_name=self.fs_name)
        log.debug("Connection to cephfs complete")

        # Recover from partial auth updates due to a previous
        # crash.
        self.recover()

    def connect(self, premount_evict=None):
        """
        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
                               may want to use this to specify their own auth ID if they expect
                               to be a unique instance and don't want to wait for caps to time
                               out after failure of another instance of themselves.
        """
        if self.own_rados:
            log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
            self.rados = rados.Rados(
                name="client.{0}".format(self.auth_id),
                clustername=self.cluster_name,
                conffile=self.conf_path,
                conf={}
            )
        if self.rados.state != "connected":
            log.debug("Connecting to RADOS...")
            self.rados.connect()
            log.debug("Connection to RADOS complete")
        self._connect(premount_evict)

    def get_mon_addrs(self):
        log.info("get_mon_addrs")
        result = []
        mon_map = self._rados_command("mon dump")
        for mon in mon_map['mons']:
            ip_port = mon['addr'].split("/")[0]
            result.append(ip_port)

        return result

    def disconnect(self):
        log.info("disconnect")
        if self.fs:
            log.debug("Disconnecting cephfs...")
            self.fs.shutdown()
            self.fs = None
            log.debug("Disconnecting cephfs complete")

        if self.rados and self.own_rados:
            log.debug("Disconnecting rados...")
            self.rados.shutdown()
            self.rados = None
            log.debug("Disconnecting rados complete")

    def __del__(self):
        self.disconnect()

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()

    def _get_pool_id(self, osd_map, pool_name):
        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
        # like this are needed.
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                return pool['pool']

        return None

    def _create_volume_pool(self, pool_name):
        """
        Idempotently create a pool for use as a CephFS data pool, with the given name

        :return The ID of the created pool
        """
        osd_map = self._rados_command('osd dump', {})

        existing_id = self._get_pool_id(osd_map, pool_name)
        if existing_id is not None:
            log.info("Pool {0} already exists".format(pool_name))
            return existing_id

        osd_count = len(osd_map['osds'])

        # We can't query the actual cluster config remotely, but since this is
        # just a heuristic we'll assume that the ceph.conf we have locally reflects
        # that in use in the rest of the cluster.
        pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))

        other_pgs = 0
        for pool in osd_map['pools']:
            if not pool['pool_name'].startswith(self.POOL_PREFIX):
                other_pgs += pool['pg_num']

        # A basic heuristic for picking pg_num: work out the max number of
        # PGs we can have without tripping a warning, then subtract the number
        # of PGs already created by non-manila pools, then divide by ten.  That'll
        # give you a reasonable result on a system where you have "a few" manila
        # shares.
        pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) // 10
        # TODO Alternatively, respect an override set by the user.

        self._rados_command(
            'osd pool create',
            {
                'pool': pool_name,
                'pg_num': int(pg_num),
            }
        )

        osd_map = self._rados_command('osd dump', {})
        pool_id = self._get_pool_id(osd_map, pool_name)

        if pool_id is None:
            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
            # removing it right after we created it.
            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
                pool_name, json.dumps(osd_map, indent=2)
            ))
            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
        else:
            return pool_id
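
    # Worked example of the pg_num heuristic above (illustrative numbers
    # only): with mon_max_pg_per_osd=250, 10 OSDs, and 2000 PGs already used
    # by non-manila pools, each new manila pool gets
    # ((250 * 10) - 2000) // 10 = 50 PGs.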

    def create_group(self, group_id, mode=0o755):
        # Prevent craftily-named volume groups from colliding with the meta
        # files.
        if group_id.endswith(META_FILE_EXT):
            raise ValueError("group ID cannot end with '{0}'.".format(
                META_FILE_EXT))
        path = self._get_group_path(group_id)
        self._mkdir_p(path, mode)

    def destroy_group(self, group_id):
        path = self._get_group_path(group_id)
        try:
            self.fs.stat(self.volume_prefix)
        except cephfs.ObjectNotFound:
            pass
        else:
            self.fs.rmdir(path)

    def _mkdir_p(self, path, mode=0o755):
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            pass
        else:
            return

        parts = path.split(os.path.sep)

        for i in range(1, len(parts) + 1):
            subpath = os.path.join(*parts[0:i])
            try:
                self.fs.stat(subpath)
            except cephfs.ObjectNotFound:
                self.fs.mkdir(subpath, mode)

    def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
                      mode=0o755):
        """
        Set up metadata, pools and auth for a volume.

        This function is idempotent.  It is safe to call this again
        for an already-created volume, even if it is in use.

        :param volume_path: VolumePath instance
        :param size: In bytes, or None for no size limit
        :param data_isolated: If true, create a separate OSD pool for this volume
        :param namespace_isolated: If true, use separate RADOS namespace for this volume
        :return:
        """
        path = self._get_path(volume_path)
        log.info("create_volume: {0}".format(path))

        self._mkdir_p(path, mode)

        if size is not None:
            self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)

        # data_isolated means create a separate pool for this volume
        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
            pool_id = self._create_volume_pool(pool_name)
            mds_map = self.get_mds_map()
            if pool_id not in mds_map['data_pools']:
                self._rados_command("fs add_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            time.sleep(5)  # time for MDSMap to be distributed
            self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)

        # enforce security isolation, use separate namespace for this volume
        if namespace_isolated:
            namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
            log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
            self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
                             to_bytes(namespace), 0)
        else:
            # If volume's namespace layout is not set, then the volume's pool
            # layout remains unset and will undesirably change with ancestor's
            # pool layout changes.
            pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
            self.fs.setxattr(path, 'ceph.dir.layout.pool',
                             to_bytes(pool_name), 0)

        # Create a volume meta file, if it does not already exist, to store
        # data about auth ids having access to the volume
        fd = self.fs.open(self._volume_metadata_path(volume_path),
                          os.O_CREAT, 0o755)
        self.fs.close(fd)

        return {
            'mount_path': path
        }
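
    # A minimal creation sketch (hypothetical IDs; assumes a connected
    # client): create a 10 GiB volume, then hand its path to a guest:
    #
    #     vp = VolumePath("grp1", "vol1")
    #     result = vc.create_volume(vp, size=10 * 1024 * 1024 * 1024)
    #     mount_path = result['mount_path']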

    def delete_volume(self, volume_path, data_isolated=False):
        """
        Make a volume inaccessible to guests.  This function is
        idempotent.  This is the fast part of tearing down a volume: you must
        also later call purge_volume, which is the slow part.

        :param volume_path: Same identifier used in create_volume
        :return:
        """

        path = self._get_path(volume_path)
        log.info("delete_volume: {0}".format(path))

        # Create the trash folder if it doesn't already exist
        trash = os.path.join(self.volume_prefix, "_deleting")
        self._mkdir_p(trash)

        # We'll move it to here
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        # Move the volume's data to the trash folder
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            log.warning("Trying to delete volume '{0}' but it's already gone".format(
                path))
        else:
            self.fs.rename(path, trashed_volume)

        # Delete the volume meta file, if it's not already deleted
        vol_meta_path = self._volume_metadata_path(volume_path)
        try:
            self.fs.unlink(vol_meta_path)
        except cephfs.ObjectNotFound:
            pass

    def purge_volume(self, volume_path, data_isolated=False):
        """
        Finish clearing up a volume that was previously passed to delete_volume.  This
        function is idempotent.
        """

        trash = os.path.join(self.volume_prefix, "_deleting")
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        try:
            self.fs.stat(trashed_volume)
        except cephfs.ObjectNotFound:
            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
                trashed_volume))
            return

        def rmtree(root_path):
            log.debug("rmtree {0}".format(root_path))
            dir_handle = self.fs.opendir(root_path)
            d = self.fs.readdir(dir_handle)
            while d:
                d_name = d.d_name.decode(encoding='utf-8')
                if d_name not in [".", ".."]:
                    # Do not use os.path.join because it is sensitive
                    # to string encoding, we just pass through dnames
                    # directly
                    d_full = u"{0}/{1}".format(root_path, d_name)
                    if d.is_dir():
                        rmtree(d_full)
                    else:
                        self.fs.unlink(d_full)

                d = self.fs.readdir(dir_handle)
            self.fs.closedir(dir_handle)

            self.fs.rmdir(root_path)

        rmtree(trashed_volume)

        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            osd_map = self._rados_command("osd dump", {})
            pool_id = self._get_pool_id(osd_map, pool_name)
            mds_map = self.get_mds_map()
            if pool_id in mds_map['data_pools']:
                self._rados_command("fs rm_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            self._rados_command("osd pool delete",
                                {
                                    "pool": pool_name,
                                    "pool2": pool_name,
                                    "yes_i_really_really_mean_it": True
                                })

    def _get_ancestor_xattr(self, path, attr):
        """
        Helper for reading layout information: if this xattr is missing
        on the requested path, keep checking parents until we find it.
        """
        try:
            result = self.fs.getxattr(path, attr).decode()
            if result == "":
                # Annoying!  cephfs gives us empty instead of an error when attr not found
                raise cephfs.NoData()
            else:
                return result
        except cephfs.NoData:
            if path == "/":
                raise
            else:
                return self._get_ancestor_xattr(os.path.split(path)[0], attr)

    def _check_compat_version(self, compat_version):
        if self.version < compat_version:
            msg = ("The current version of CephFSVolumeClient, version {0}, "
                   "does not support the required feature. Need version {1} "
                   "or greater".format(self.version, compat_version)
                   )
            log.error(msg)
            raise CephFSVolumeClientError(msg)

    def _metadata_get(self, path):
        """
        Return a deserialized JSON object, or None
        """
        fd = self.fs.open(path, "r")
        # TODO iterate instead of assuming file < 4MB
        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
        self.fs.close(fd)
        if read_bytes:
            return json.loads(read_bytes.decode())
        else:
            return None

    def _metadata_set(self, path, data):
        serialized = json.dumps(data)
        fd = self.fs.open(path, "w")
        try:
            self.fs.write(fd, to_bytes(serialized), 0)
            self.fs.fsync(fd, 0)
        finally:
            self.fs.close(fd)

    def _lock(self, path):
        @contextmanager
        def fn():
            while True:
                fd = self.fs.open(path, os.O_CREAT, 0o755)
                self.fs.flock(fd, fcntl.LOCK_EX, self._id)

                # The locked file will be cleaned up sometime. It could be
                # unlinked, e.g., by another manila-share service instance,
                # before the lock was applied on it. Perform checks to ensure
                # that this does not happen.
                try:
                    statbuf = self.fs.stat(path)
                except cephfs.ObjectNotFound:
                    self.fs.close(fd)
                    continue

                fstatbuf = self.fs.fstat(fd)
                if statbuf.st_ino == fstatbuf.st_ino:
                    break

            try:
                yield
            finally:
                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
                self.fs.close(fd)

        return fn()
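
    # Usage sketch: _lock() returns a context manager, so callers hold the
    # flock only inside a with-block (see _auth_lock/_volume_lock below;
    # some_meta_path is hypothetical):
    #
    #     with self._lock(some_meta_path):
    #         ...  # read-modify-write the metadata file safely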

    def _auth_metadata_path(self, auth_id):
        return os.path.join(self.volume_prefix, "${0}{1}".format(
            auth_id, META_FILE_EXT))

    def _auth_lock(self, auth_id):
        return self._lock(self._auth_metadata_path(auth_id))

    def _auth_metadata_get(self, auth_id):
        """
        Call me with the metadata locked!

        Check whether an auth metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return auth metadata that the current version of CephFSVolumeClient
        can decode.
        """
        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))

        if auth_metadata:
            self._check_compat_version(auth_metadata['compat_version'])

        return auth_metadata

    def _auth_metadata_set(self, auth_id, data):
        """
        Call me with the metadata locked!

        Fsync the auth metadata.

        Add two version attributes to the auth metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._auth_metadata_path(auth_id), data)

    def _volume_metadata_path(self, volume_path):
        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
            volume_path.group_id if volume_path.group_id else "",
            volume_path.volume_id,
            META_FILE_EXT
        ))

    def _volume_lock(self, volume_path):
        """
        Return a ContextManager which locks the authorization metadata for
        a particular volume, and persists a flag to the metadata indicating
        that it is currently locked, so that we can detect dirty situations
        during recovery.

        This lock isn't just to make access to the metadata safe: it's also
        designed to be used over the two-step process of checking the
        metadata and then responding to an authorization request, to
        ensure that at the point we respond the metadata hasn't changed
        in the background.  It's key to how we avoid security holes
        resulting from races during that process.
        """
        return self._lock(self._volume_metadata_path(volume_path))

    def _volume_metadata_get(self, volume_path):
        """
        Call me with the metadata locked!

        Check whether a volume metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return a volume_metadata structure that the current version of
        CephFSVolumeClient can decode.
        """
        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))

        if volume_metadata:
            self._check_compat_version(volume_metadata['compat_version'])

        return volume_metadata

    def _volume_metadata_set(self, volume_path, data):
        """
        Call me with the metadata locked!

        Add two version attributes to the volume metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._volume_metadata_path(volume_path), data)

    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
        """
        Get-or-create a Ceph auth identity for `auth_id` and grant them access
        to ``volume_path``.

        :param volume_path:
        :param auth_id:
        :param readonly:
        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        :return:
        """

        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            # volume data to be inserted
            volume_path_str = str(volume_path)
            volume = {
                volume_path_str: {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': 'r' if readonly else 'rw',
                    'dirty': True,
                }
            }
            if auth_meta is None:
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': tenant_id.__str__() if tenant_id else None,
                    'volumes': volume
                }

                # Note: this is *not* guaranteeing that the key doesn't already
                # exist in Ceph: we are allowing VolumeClient tenants to
                # 'claim' existing Ceph keys.  In order to prevent VolumeClient
                # tenants from reading e.g. client.admin keys, you need to
                # have configured your VolumeClient user (e.g. Manila) to
                # have mon auth caps that prevent it from accessing those keys
                # (e.g. limit it to only access keys with a manila.* prefix)
            else:
                # Disallow tenants to share auth IDs
                if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['volumes'].update(volume)

            self._auth_metadata_set(auth_id, auth_meta)

            with self._volume_lock(volume_path):
                key = self._authorize_volume(volume_path, auth_id, readonly)

            auth_meta['dirty'] = False
            auth_meta['volumes'][volume_path_str]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return {
                    'auth_key': key
                }
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # them a key
                return {
                    'auth_key': None
                }
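
    # Authorization sketch (hypothetical IDs): grant a guest read/write
    # access and receive its cephx key, scoped to a tenant so another
    # tenant cannot claim the same auth ID:
    #
    #     out = vc.authorize(vp, "guest_id", readonly=False,
    #                        tenant_id="tenant1")
    #     key = out['auth_key']   # None if no tenant_id was supplied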

    def _authorize_volume(self, volume_path, auth_id, readonly):
        vol_meta = self._volume_metadata_get(volume_path)

        access_level = 'r' if readonly else 'rw'
        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if vol_meta is None:
            vol_meta = {
                'auths': auth
            }
        else:
            vol_meta['auths'].update(auth)
            self._volume_metadata_set(volume_path, vol_meta)

        key = self._authorize_ceph(volume_path, auth_id, readonly)

        vol_meta['auths'][auth_id]['dirty'] = False
        self._volume_metadata_set(volume_path, vol_meta)

        return key

    def _authorize_ceph(self, volume_path, auth_id, readonly):
        path = self._get_path(volume_path)
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
            auth_id, path
        ))

        # First I need to work out what the data pool is for this share:
        # read the layout
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")

        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_access_level = 'r' if readonly else 'rw'
        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
        if namespace:
            want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                want_access_level, pool_name, namespace)
        else:
            want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
                                                       pool_name)

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )
            # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            caps = self._rados_command(
                'auth get-or-create',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', want_mds_cap,
                        'osd', want_osd_cap,
                        'mon', 'allow r']
                })
        else:
            # entity exists, update it
            cap = existing[0]

            # Construct auth caps that if present might conflict with the desired
            # auth caps.
            unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
            if namespace:
                unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                    unwanted_access_level, pool_name, namespace)
            else:
                unwanted_osd_cap = 'allow {0} pool={1}'.format(
                    unwanted_access_level, pool_name)

            def cap_update(
                    orig_mds_caps, orig_osd_caps, want_mds_cap,
                    want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):

                if not orig_mds_caps:
                    return want_mds_cap, want_osd_cap

                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                if want_mds_cap in mds_cap_tokens:
                    return orig_mds_caps, orig_osd_caps

                if unwanted_mds_cap in mds_cap_tokens:
                    mds_cap_tokens.remove(unwanted_mds_cap)
                    osd_cap_tokens.remove(unwanted_osd_cap)

                mds_cap_tokens.append(want_mds_cap)
                osd_cap_tokens.append(want_osd_cap)

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")

            mds_cap_str, osd_cap_str = cap_update(
                orig_mds_caps, orig_osd_caps, want_mds_cap,
                want_osd_cap, unwanted_mds_cap, unwanted_osd_cap)

            caps = self._rados_command(
                'auth caps',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', mds_cap_str,
                        'osd', osd_cap_str,
                        'mon', cap['caps'].get('mon', 'allow r')]
                })
            caps = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

        # Result expected like this:
        # [
        #     {
        #         "entity": "client.foobar",
        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
        #         "caps": {
        #             "mds": "allow *",
        #             "mon": "allow *"
        #         }
        #     }
        # ]
        assert len(caps) == 1
        assert caps[0]['entity'] == client_entity
        return caps[0]['key']
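
    # Worked example of cap_update above (illustrative strings): upgrading
    # a read-only guest to read/write on /volumes/grp1/vol1 turns
    #     mds "allow r path=/volumes/grp1/vol1"
    #     osd "allow r pool=data"
    # into
    #     mds "allow rw path=/volumes/grp1/vol1"
    #     osd "allow rw pool=data"
    # by removing the unwanted 'r' tokens and appending the 'rw' ones.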

    def deauthorize(self, volume_path, auth_id):
        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            volume_path_str = str(volume_path)
            if (auth_meta is None) or (not auth_meta['volumes']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            if volume_path_str not in auth_meta['volumes']:
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['volumes'][volume_path_str]['dirty'] = True
            self._auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_volume(volume_path, auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['volumes'][volume_path_str]

            # Clean up auth meta file
            if not auth_meta['volumes']:
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

    def _deauthorize_volume(self, volume_path, auth_id):
        with self._volume_lock(volume_path):
            vol_meta = self._volume_metadata_get(volume_path)

            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
                log.warning("deauthorized called for already-removed auth "
                            "ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                return

            vol_meta['auths'][auth_id]['dirty'] = True
            self._volume_metadata_set(volume_path, vol_meta)

            self._deauthorize(volume_path, auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del vol_meta['auths'][auth_id]
            self._volume_metadata_set(volume_path, vol_meta)

    def _deauthorize(self, volume_path, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        path = self._get_path(volume_path)
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # volume path.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
                         for access_level in access_levels]
        if namespace:
            want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
                             for access_level in access_levels]
        else:
            want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
                             for access_level in access_levels]

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

            def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
                mds_cap_tokens = orig_mds_caps.split(",")
                osd_cap_tokens = orig_osd_caps.split(",")

                for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
                    if want_mds_cap in mds_cap_tokens:
                        mds_cap_tokens.remove(want_mds_cap)
                        osd_cap_tokens.remove(want_osd_cap)
                        break

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            cap = existing[0]
            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")
            mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
                                                  want_mds_caps, want_osd_caps)

            if not mds_cap_str:
                self._rados_command('auth del', {'entity': client_entity}, decode=False)
            else:
                self._rados_command(
                    'auth caps',
                    {
                        'entity': client_entity,
                        'caps': [
                            'mds', mds_cap_str,
                            'osd', osd_cap_str,
                            'mon', cap['caps'].get('mon', 'allow r')]
                    })

        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            # Already gone, great.
            return

    def get_authorized_ids(self, volume_path):
        """
        Expose a list of auth IDs that have access to a volume.

        return: a list of (auth_id, access_level) tuples, where
                the access_level can be 'r', or 'rw'.
                None if no auth ID is given access to the volume.
        """
        with self._volume_lock(volume_path):
            meta = self._volume_metadata_get(volume_path)
            auths = []

            if not meta or not meta['auths']:
                return None

            for auth, auth_data in meta['auths'].items():
                # Skip partial auth updates.
                if not auth_data['dirty']:
                    auths.append((auth, auth_data['access_level']))

            return auths

    def _rados_command(self, prefix, args=None, decode=True):
        """
        Safer wrapper for ceph_argparse.json_command, which raises
        Error exception instead of relying on caller to check return
        codes.

        Error exception can result from:
        * Timeout
        * Actual legitimate errors
        * Malformed JSON output

        return: Decoded object from ceph, or None if empty string returned.
                If decode is False, return a string (the data returned by
                ceph command)
        """
        if args is None:
            args = {}

        argdict = args.copy()
        argdict['format'] = 'json'

        ret, outbuf, outs = json_command(self.rados,
                                         prefix=prefix,
                                         argdict=argdict,
                                         timeout=RADOS_TIMEOUT)
        if ret != 0:
            raise rados.Error(outs)
        else:
            if decode:
                if outbuf:
                    try:
                        return json.loads(outbuf.decode())
                    except (ValueError, TypeError):
                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
                else:
                    return None
            else:
                return outbuf
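
    # Usage sketch: mon commands go through _rados_command with a JSON
    # argument dict, e.g. (the 'client.guest' entity is hypothetical):
    #
    #     osd_map = self._rados_command('osd dump', {})
    #     self._rados_command('auth del', {'entity': 'client.guest'},
    #                         decode=False)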

    def get_used_bytes(self, volume_path):
        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
                                    "rbytes").decode())

    def set_max_bytes(self, volume_path, max_bytes):
        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
                         to_bytes(max_bytes if max_bytes else 0), 0)

    def _snapshot_path(self, dir_path, snapshot_name):
        return os.path.join(
            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
        )

    def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
        # TODO: raise intelligible exception for clusters where snaps are disabled
        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)

    def _snapshot_destroy(self, dir_path, snapshot_name):
        """
        Remove a snapshot, or do nothing if it already doesn't exist.
        """
        try:
            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
        except cephfs.ObjectNotFound:
            log.warning("Snapshot was already gone: {0}".format(snapshot_name))

    def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
        self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)

    def destroy_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)

    def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")

        return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
                                     mode)

    def destroy_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")
        if snapshot_name is None:
            raise RuntimeError("Snapshot name may not be None")

        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)

    def _cp_r(self, src, dst):
        raise NotImplementedError()

    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
        dest_fs_path = self._get_path(dest_volume_path)
        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)

        self._cp_r(src_snapshot_path, dest_fs_path)

    def put_object(self, pool_name, object_name, data):
        """
        Synchronously write data to an object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        """
        return self.put_object_versioned(pool_name, object_name, data)

    def put_object_versioned(self, pool_name, object_name, data, version=None):
        """
        Synchronously write data to an object only if the version of the
        object matches the expected version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        :param version: expected version of the object to write
        :type version: int
        """
        ioctx = self.rados.open_ioctx(pool_name)

        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        if len(data) > max_size:
            msg = ("Data to be written to object '{0}' exceeds "
                   "{1} bytes".format(object_name, max_size))
            log.error(msg)
            raise CephFSVolumeClientError(msg)

        try:
            with rados.WriteOpCtx() as wop:
                if version is not None:
                    wop.assert_version(version)
                wop.write_full(data)
                ioctx.operate_write_op(wop, object_name)
        except rados.OSError as e:
            log.error(e)
            raise e
        finally:
            ioctx.close()
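
    # Optimistic-concurrency sketch (hypothetical pool/object names): read an
    # object together with its version, then write back only if nobody else
    # has updated it in the meantime (the write raises if the version
    # changed):
    #
    #     data, version = vc.get_object_and_version("meta_pool", "obj")
    #     vc.put_object_versioned("meta_pool", "obj", data + b"!",
    #                             version=version)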

    def get_object(self, pool_name, object_name):
        """
        Synchronously read data from object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: bytes - data read from object
        """
        return self.get_object_and_version(pool_name, object_name)[0]

    def get_object_and_version(self, pool_name, object_name):
        """
        Synchronously read data from object and get its version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: tuple of object data and version
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        try:
            bytes_read = ioctx.read(object_name, max_size)
            if ((len(bytes_read) == max_size) and
                    (ioctx.read(object_name, 1, offset=max_size))):
                log.warning("Size of object {0} exceeds '{1}' bytes "
                            "read".format(object_name, max_size))
            obj_version = ioctx.get_last_version()
        finally:
            ioctx.close()
        return (bytes_read, obj_version)

    def delete_object(self, pool_name, object_name):
        ioctx = self.rados.open_ioctx(pool_name)
        try:
            ioctx.remove_object(object_name)
        except rados.ObjectNotFound:
            log.warning("Object '{0}' was already removed".format(object_name))
        finally:
            ioctx.close()
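

# Sketch of the plain object helpers above (hypothetical pool and object
# names; put_object/get_object wrap the versioned variants):
#
#     vc.put_object("meta_pool", "obj", b"hello")
#     assert vc.get_object("meta_pool", "obj") == b"hello"
#     vc.delete_object("meta_pool", "obj")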