"""
Copyright (C) 2015 Red Hat, Inc.

LGPL-2.1 or LGPL-3.0. See file COPYING.
"""
from contextlib import contextmanager
import errno
import fcntl
import json
import logging
import os
import re
import struct
import sys
import threading
import time
import uuid

from ceph_argparse import json_command

import cephfs
import rados
def to_bytes(param):
    """
    Helper method that returns byte representation of the given parameter.
    """
    if isinstance(param, str):
        return param.encode('utf-8')
    else:
        return str(param).encode('utf-8')
class RadosError(Exception):
    """
    Something went wrong talking to Ceph with librados
    """
    pass
log = logging.getLogger(__name__)

# Reserved volume group name which we use in paths for volumes
# that are not assigned to a group (i.e. created with group=None)
NO_GROUP_NAME = "_nogroup"

# Filename extensions for meta files.
META_FILE_EXT = ".meta"
class VolumePath(object):
    """
    Identify a volume's path as group->volume
    The Volume ID is a unique identifier, but this is a much more
    helpful thing to pass around.
    """
    def __init__(self, group_id, volume_id):
        self.group_id = group_id
        self.volume_id = volume_id
        assert self.group_id != NO_GROUP_NAME
        assert self.volume_id != "" and self.volume_id is not None

    def __str__(self):
        return "{0}/{1}".format(self.group_id, self.volume_id)
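# Illustrative sketch (not from the original module): how a VolumePath is
# typically built and rendered. The IDs below are made-up examples.
#
#     vp = VolumePath("grp1", "vol1")       # volume "vol1" in group "grp1"
#     str(vp)                               # -> "grp1/vol1"
#     ungrouped = VolumePath(None, "vol2")  # group may be None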
class ClusterTimeout(Exception):
    """
    Exception indicating that we timed out trying to talk to the Ceph cluster,
    either to the mons, or to any individual daemon that the mons indicate ought
    to be up but isn't responding to us.
    """
    pass
class ClusterError(Exception):
    """
    Exception indicating that the cluster returned an error to a command that
    we thought should be successful based on our last knowledge of the cluster
    state.
    """
    def __init__(self, action, result_code, result_str):
        self._action = action
        self._result_code = result_code
        self._result_str = result_str

    def __str__(self):
        return "Error {0} (\"{1}\") while {2}".format(
            self._result_code, self._result_str, self._action)
class RankEvicter(threading.Thread):
    """
    Thread for evicting client(s) from a particular MDS daemon instance.

    This is more complex than simply sending a command, because we have to
    handle cases where MDS daemons might not be fully up yet, and/or might
    be transiently unresponsive to commands.
    """
    class GidGone(Exception):
        pass
    def __init__(self, volume_client, client_spec, rank, gid, mds_map,
                 ready_timeout):
        """
        :param client_spec: list of strings, used as filter arguments to "session evict"
                            pass ["id=123"] to evict a single client with session id 123.
        """
        self._mds_map = mds_map
        self._client_spec = client_spec
        self._volume_client = volume_client
        self.rank = rank
        self.gid = gid
        self._ready_timeout = ready_timeout
        self._ready_waited = 0

        self.success = False
        self.exception = None

        super(RankEvicter, self).__init__()
    def _ready_to_evict(self):
        if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
            log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
                self._client_spec, self.rank, self.gid
            ))
            raise RankEvicter.GidGone()

        info = self._mds_map['info']["gid_{0}".format(self.gid)]
        log.debug("_ready_to_evict: state={0}".format(info['state']))
        return info['state'] in ["up:active", "up:clientreplay"]
    def _wait_for_ready(self):
        """
        Wait for that MDS rank to reach an active or clientreplay state, and
        not be laggy.
        """
        while not self._ready_to_evict():
            if self._ready_waited > self._ready_timeout:
                raise ClusterTimeout()

            time.sleep(self.POLL_PERIOD)
            self._ready_waited += self.POLL_PERIOD

            self._mds_map = self._volume_client.get_mds_map()
    def _evict(self):
        """
        Run the eviction procedure. Return true on success, false on errors.
        """

        # Wait til the MDS is believed by the mon to be available for commands
        try:
            self._wait_for_ready()
        except self.GidGone:
            return True

        # Then send it an evict
        ret = errno.ETIMEDOUT
        while ret == errno.ETIMEDOUT:
            log.debug("mds_command: {0}, {1}".format(
                "%s" % self.gid, ["session", "evict"] + self._client_spec
            ))
            ret, outb, outs = self._volume_client.fs.mds_command(
                "%s" % self.gid,
                [json.dumps({
                    "prefix": "session evict",
                    "filters": self._client_spec
                })], "")
            log.debug("mds_command: complete {0} {1}".format(ret, outs))

            # If we get a clean response, great, it's gone from that rank.
            if ret == 0:
                return True
            elif ret == errno.ETIMEDOUT:
                # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
                self._mds_map = self._volume_client.get_mds_map()
                try:
                    self._wait_for_ready()
                except self.GidGone:
                    return True
            else:
                raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)

    def run(self):
        try:
            self._evict()
        except Exception as e:
            self.success = False
            self.exception = e
        else:
            self.success = True
class EvictionError(Exception):
    pass
class CephFSVolumeClientError(Exception):
    """
    Something went wrong talking to Ceph using CephFSVolumeClient.
    """
    pass
CEPHFSVOLUMECLIENT_VERSION_HISTORY = """

    CephFSVolumeClient Version History:

    * 1 - Initial version
    * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
    * 3 - Allow volumes to be created without RADOS namespace isolation
    * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
    * 5 - Disallow authorize API for users not created by CephFSVolumeClient
"""
class CephFSVolumeClient(object):
    """
    Combine libcephfs and librados interfaces to implement a
    'Volume' concept implemented as a cephfs directory and
    client capabilities which restrict mount access to this
    directory.

    Additionally, volumes may be in a 'Group'. Conveniently,
    volumes are a lot like manila shares, and groups are a lot
    like manila consistency groups.

    Refer to volumes with VolumePath, which specifies the
    volume and group IDs (both strings). The group ID may
    be None.

    In general, functions in this class are allowed to raise rados.Error
    or cephfs.Error exceptions in unexpected situations.
    """

    # Where shall we create our volumes?
    POOL_PREFIX = "fsvolume_"
    DEFAULT_VOL_PREFIX = "/volumes"
    DEFAULT_NS_PREFIX = "fsvolumens_"
    def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
                 volume_prefix=None, pool_ns_prefix=None, rados=None,
                 fs_name=None):
        """
        Either set all three of ``auth_id``, ``conf_path`` and
        ``cluster_name`` (rados constructed on connect), or
        set ``rados`` (existing rados instance).
        """
        self.fs = None
        self.fs_name = fs_name
        self.connected = False

        self.conf_path = conf_path
        self.cluster_name = cluster_name
        self.auth_id = auth_id

        self.rados = rados
        if self.rados:
            # Using an externally owned rados, so we won't tear it down
            # on disconnect
            self.own_rados = False
        else:
            # self.rados will be constructed in connect
            self.own_rados = True

        self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
        self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
        # For flock'ing in cephfs, I want a unique ID to distinguish me
        # from any other manila-share services that are loading this module.
        # We could use pid, but that's unnecessarily weak: generate a
        # UUID
        self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]

        # TODO: version the on-disk structures
    def recover(self):
        # Scan all auth keys to see if they're dirty: if they are, they have
        # state that might not have propagated to Ceph or to the related
        # volumes yet.

        # Important: we *always* acquire locks in the order auth->volume
        # That means a volume can never be dirty without the auth key
        # we're updating it with being dirty at the same time.

        # First list the auth IDs that have potentially dirty on-disk metadata
        log.debug("Recovering from partial auth updates (if any)...")

        try:
            dir_handle = self.fs.opendir(self.volume_prefix)
        except cephfs.ObjectNotFound:
            log.debug("Nothing to recover. No auth meta files.")
            return

        d = self.fs.readdir(dir_handle)
        auth_ids = []

        if not d:
            log.debug("Nothing to recover. No auth meta files.")

        while d:
            # Identify auth IDs from auth meta filenames. The auth meta files
            # are named as, "$<auth_id><meta filename extension>"
            regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
            match = re.search(regex, d.d_name.decode(encoding='utf-8'))
            if match:
                auth_ids.append(match.group(1))

            d = self.fs.readdir(dir_handle)

        self.fs.closedir(dir_handle)

        # Key points based on ordering:
        # * Anything added in VMeta is already added in AMeta
        # * Anything added in Ceph is already added in VMeta
        # * Anything removed in VMeta is already removed in Ceph
        # * Anything removed in AMeta is already removed in VMeta

        # Deauthorization: because I only update metadata AFTER the
        # update of the next level down, I have the same ordering of
        # -> things which exist in the AMeta should also exist
        # in the VMeta, should also exist in Ceph, and the same
        # recovery procedure that gets me consistent after crashes
        # during authorization will also work during deauthorization

        # Now for each auth ID, check for dirty flag and apply updates
        # if dirty flag is found
        for auth_id in auth_ids:
            with self._auth_lock(auth_id):
                auth_meta = self._auth_metadata_get(auth_id)
                if not auth_meta or not auth_meta['volumes']:
                    # Clean up auth meta file
                    self.fs.unlink(self._auth_metadata_path(auth_id))
                    continue
                if not auth_meta['dirty']:
                    continue
                self._recover_auth_meta(auth_id, auth_meta)

        log.debug("Recovered from partial auth updates (if any).")
    def _recover_auth_meta(self, auth_id, auth_meta):
        """
        Call me after locking the auth meta file.
        """
        remove_volumes = []

        for volume, volume_data in auth_meta['volumes'].items():
            if not volume_data['dirty']:
                continue

            (group_id, volume_id) = volume.split('/')
            group_id = group_id if group_id != 'None' else None
            volume_path = VolumePath(group_id, volume_id)
            access_level = volume_data['access_level']

            with self._volume_lock(volume_path):
                vol_meta = self._volume_metadata_get(volume_path)

                # No VMeta update indicates that there was no auth update
                # in Ceph either. So it's safe to remove corresponding
                # partial update in AMeta.
                if not vol_meta or auth_id not in vol_meta['auths']:
                    remove_volumes.append(volume)
                    continue

                want_auth = {
                    'access_level': access_level,
                    'dirty': False,
                }
                # VMeta update looks clean. Ceph auth update must have been
                # clean.
                if vol_meta['auths'][auth_id] == want_auth:
                    continue

                readonly = access_level == 'r'
                client_entity = "client.{0}".format(auth_id)
                try:
                    existing_caps = self._rados_command(
                        'auth get',
                        {
                            'entity': client_entity
                        }
                    )
                # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
                except rados.Error:
                    existing_caps = None
                self._authorize_volume(volume_path, auth_id, readonly, existing_caps)

            # Recovered from partial auth updates for the auth ID's access
            # to a volume.
            auth_meta['volumes'][volume]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

        for volume in remove_volumes:
            del auth_meta['volumes'][volume]

        if not auth_meta['volumes']:
            # Clean up auth meta file
            self.fs.unlink(self._auth_metadata_path(auth_id))
            return

        # Recovered from all partial auth updates for the auth ID.
        auth_meta['dirty'] = False
        self._auth_metadata_set(auth_id, auth_meta)
    def get_mds_map(self):
        fs_map = self._rados_command("fs dump", {})
        return fs_map['filesystems'][0]['mdsmap']
    def evict(self, auth_id, timeout=30, volume_path=None):
        """
        Evict all clients based on the authorization ID and optionally based on
        the volume path mounted. Assumes that the authorization key has been
        revoked prior to calling this function.

        This operation can throw an exception if the mon cluster is unresponsive, or
        any individual MDS daemon is unresponsive for longer than the timeout passed in.
        """

        client_spec = ["auth_name={0}".format(auth_id), ]
        if volume_path:
            client_spec.append("client_metadata.root={0}".
                               format(self._get_path(volume_path)))

        log.info("evict clients with {0}".format(', '.join(client_spec)))

        mds_map = self.get_mds_map()
        up = {}
        for name, gid in mds_map['up'].items():
            # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
            assert name.startswith("mds_")
            up[int(name[4:])] = gid

        # For all MDS ranks held by a daemon
        # Do the parallelism in python instead of using "tell mds.*", because
        # the latter doesn't give us per-mds output
        threads = []
        for rank, gid in up.items():
            thread = RankEvicter(self, client_spec, rank, gid, mds_map,
                                 timeout)
            thread.start()
            threads.append(thread)

        for t in threads:
            t.join()

        log.info("evict: joined all")

        for t in threads:
            if not t.success:
                msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
                       format(', '.join(client_spec), t.rank, t.gid, t.exception)
                       )
                log.error(msg)
                raise EvictionError(msg)
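    # Illustrative sketch (not from the original module): evicting a guest's
    # CephFS sessions after its key has been revoked. The names are made up.
    #
    #     vc.deauthorize(vp, "guest1")                    # revoke caps first
    #     vc.evict("guest1", timeout=30, volume_path=vp)  # then drop sessions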
    def _get_path(self, volume_path):
        """
        Determine the path within CephFS where this volume will live
        :return: absolute path (string)
        """
        return os.path.join(
            self.volume_prefix,
            volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
            volume_path.volume_id)
    def _get_group_path(self, group_id):
        if group_id is None:
            raise ValueError("group_id may not be None")

        return os.path.join(
            self.volume_prefix,
            group_id
        )
    def _connect(self, premount_evict):
        log.debug("Connecting to cephfs...")
        self.fs = cephfs.LibCephFS(rados_inst=self.rados)
        log.debug("CephFS initializing...")
        self.fs.init()
        if premount_evict is not None:
            log.debug("Premount eviction of {0} starting".format(premount_evict))
            self.evict(premount_evict)
            log.debug("Premount eviction of {0} completes".format(premount_evict))
        log.debug("CephFS mounting...")
        self.fs.mount(filesystem_name=to_bytes(self.fs_name))
        log.debug("Connection to cephfs complete")

        # Recover from partial auth updates due to a previous
        # crash.
        self.recover()
    def connect(self, premount_evict=None):
        """
        :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
                               may want to use this to specify their own auth ID if they expect
                               to be a unique instance and don't want to wait for caps to time
                               out after failure of another instance of themselves.
        """
        if self.own_rados:
            log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
            self.rados = rados.Rados(
                name="client.{0}".format(self.auth_id),
                clustername=self.cluster_name,
                conffile=self.conf_path,
            )
            if self.rados.state != "connected":
                log.debug("Connecting to RADOS...")
                self.rados.connect()
                log.debug("Connection to RADOS complete")
        self._connect(premount_evict)
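    # Illustrative sketch (not from the original module): a minimal
    # connect/disconnect cycle, assuming a local ceph.conf and a pre-created
    # "manila" cephx identity.
    #
    #     vc = CephFSVolumeClient(auth_id="manila",
    #                             conf_path="/etc/ceph/ceph.conf",
    #                             cluster_name="ceph")
    #     vc.connect()
    #     try:
    #         ...  # create/authorize volumes here
    #     finally:
    #         vc.disconnect()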
    def get_mon_addrs(self):
        log.info("get_mon_addrs")
        result = []
        mon_map = self._rados_command("mon dump")
        for mon in mon_map['mons']:
            ip_port = mon['addr'].split("/")[0]
            result.append(ip_port)

        return result
    def disconnect(self):
        log.info("disconnect")
        if self.fs:
            log.debug("Disconnecting cephfs...")
            self.fs.shutdown()
            self.fs = None
            log.debug("Disconnecting cephfs complete")

        if self.rados and self.own_rados:
            log.debug("Disconnecting rados...")
            self.rados.shutdown()
            self.rados = None
            log.debug("Disconnecting rados complete")
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.disconnect()
    def _get_pool_id(self, osd_map, pool_name):
        # Maybe borrow the OSDMap wrapper class from calamari if more helpers
        # like this are needed.
        for pool in osd_map['pools']:
            if pool['pool_name'] == pool_name:
                return pool['pool']

        return None
    def _create_volume_pool(self, pool_name):
        """
        Idempotently create a pool for use as a CephFS data pool, with the given name

        :return The ID of the created pool
        """
        osd_map = self._rados_command('osd dump', {})

        existing_id = self._get_pool_id(osd_map, pool_name)
        if existing_id is not None:
            log.info("Pool {0} already exists".format(pool_name))
            return existing_id

        self._rados_command(
            'osd pool create',
            {
                'pool': pool_name,
            }
        )

        osd_map = self._rados_command('osd dump', {})
        pool_id = self._get_pool_id(osd_map, pool_name)

        if pool_id is None:
            # If the pool isn't there, that's either a ceph bug, or it's some outside influence
            # removing it right after we created it.
            log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
                pool_name, json.dumps(osd_map, indent=2)
            ))
            raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
        else:
            return pool_id
    def create_group(self, group_id, mode=0o755):
        # Prevent craftily-named volume groups from colliding with the meta
        # files.
        if group_id.endswith(META_FILE_EXT):
            raise ValueError("group ID cannot end with '{0}'.".format(
                META_FILE_EXT))
        path = self._get_group_path(group_id)
        self._mkdir_p(path, mode)
    def destroy_group(self, group_id):
        path = self._get_group_path(group_id)
        try:
            self.fs.stat(self.volume_prefix)
        except cephfs.ObjectNotFound:
            pass
        else:
            self.fs.rmdir(path)
    def _mkdir_p(self, path, mode=0o755):
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            pass
        else:
            return

        parts = path.split(os.path.sep)

        for i in range(1, len(parts) + 1):
            subpath = os.path.join(*parts[0:i])
            try:
                self.fs.stat(subpath)
            except cephfs.ObjectNotFound:
                self.fs.mkdir(subpath, mode)
    def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
                      mode=0o755):
        """
        Set up metadata, pools and auth for a volume.

        This function is idempotent. It is safe to call this again
        for an already-created volume, even if it is in use.

        :param volume_path: VolumePath instance
        :param size: In bytes, or None for no size limit
        :param data_isolated: If true, create a separate OSD pool for this volume
        :param namespace_isolated: If true, use separate RADOS namespace for this volume
        """
        path = self._get_path(volume_path)
        log.info("create_volume: {0}".format(path))

        self._mkdir_p(path, mode)

        if size is not None:
            self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)

        # data_isolated means create a separate pool for this volume
        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
            pool_id = self._create_volume_pool(pool_name)
            mds_map = self.get_mds_map()
            if pool_id not in mds_map['data_pools']:
                self._rados_command("fs add_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            time.sleep(5)  # time for MDSMap to be distributed
            self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)

        # enforce security isolation, use separate namespace for this volume
        if namespace_isolated:
            namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
            log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
            self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
                             to_bytes(namespace), 0)
        else:
            # If volume's namespace layout is not set, then the volume's pool
            # layout remains unset and will undesirably change with ancestor's
            # pool layout changes.
            pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
            self.fs.setxattr(path, 'ceph.dir.layout.pool',
                             to_bytes(pool_name), 0)

        # Create a volume meta file, if it does not already exist, to store
        # data about auth ids having access to the volume
        fd = self.fs.open(self._volume_metadata_path(volume_path),
                          os.O_CREAT, 0o755)
        self.fs.close(fd)

        return {
            'mount_path': path
        }
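    # Illustrative sketch (not from the original module): creating a 1 GiB
    # volume inside a group, assuming vc is a connected CephFSVolumeClient.
    #
    #     vp = VolumePath("grp1", "vol1")
    #     vc.create_group("grp1")
    #     vc.create_volume(vp, size=1024 * 1024 * 1024)  # quota in bytes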
    def delete_volume(self, volume_path, data_isolated=False):
        """
        Make a volume inaccessible to guests. This function is
        idempotent. This is the fast part of tearing down a volume: you must
        also later call purge_volume, which is the slow part.

        :param volume_path: Same identifier used in create_volume
        """

        path = self._get_path(volume_path)
        log.info("delete_volume: {0}".format(path))

        # Create the trash folder if it doesn't already exist
        trash = os.path.join(self.volume_prefix, "_deleting")
        self._mkdir_p(trash)

        # We'll move it to here
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        # Move the volume's data to the trash folder
        try:
            self.fs.stat(path)
        except cephfs.ObjectNotFound:
            log.warning("Trying to delete volume '{0}' but it's already gone".format(
                path))
        else:
            self.fs.rename(path, trashed_volume)

        # Delete the volume meta file, if it's not already deleted
        vol_meta_path = self._volume_metadata_path(volume_path)
        try:
            self.fs.unlink(vol_meta_path)
        except cephfs.ObjectNotFound:
            pass
    def purge_volume(self, volume_path, data_isolated=False):
        """
        Finish clearing up a volume that was previously passed to delete_volume. This
        function is idempotent.
        """

        trash = os.path.join(self.volume_prefix, "_deleting")
        trashed_volume = os.path.join(trash, volume_path.volume_id)

        try:
            self.fs.stat(trashed_volume)
        except cephfs.ObjectNotFound:
            log.warning("Trying to purge volume '{0}' but it's already been purged".format(
                trashed_volume))
            return

        def rmtree(root_path):
            log.debug("rmtree {0}".format(root_path))
            dir_handle = self.fs.opendir(root_path)
            d = self.fs.readdir(dir_handle)
            while d:
                d_name = d.d_name.decode(encoding='utf-8')
                if d_name not in [".", ".."]:
                    # Do not use os.path.join because it is sensitive
                    # to string encoding, we just pass through dnames
                    # as byte arrays
                    d_full = u"{0}/{1}".format(root_path, d_name)
                    if d.is_dir():
                        rmtree(d_full)
                    else:
                        self.fs.unlink(d_full)

                d = self.fs.readdir(dir_handle)
            self.fs.closedir(dir_handle)

            self.fs.rmdir(root_path)

        rmtree(trashed_volume)

        if data_isolated:
            pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
            osd_map = self._rados_command("osd dump", {})
            pool_id = self._get_pool_id(osd_map, pool_name)
            mds_map = self.get_mds_map()
            if pool_id in mds_map['data_pools']:
                self._rados_command("fs rm_data_pool", {
                    'fs_name': mds_map['fs_name'],
                    'pool': pool_name
                })
            self._rados_command("osd pool delete",
                                {
                                    "pool": pool_name,
                                    "pool2": pool_name,
                                    "yes_i_really_really_mean_it": True
                                })
    def _get_ancestor_xattr(self, path, attr):
        """
        Helper for reading layout information: if this xattr is missing
        on the requested path, keep checking parents until we find it.
        """
        try:
            result = self.fs.getxattr(path, attr).decode()
            if result == "":
                # Annoying! cephfs gives us empty instead of an error when attr not found
                raise cephfs.NoData()
            else:
                return result
        except cephfs.NoData:
            if path == "/":
                raise
            else:
                return self._get_ancestor_xattr(os.path.split(path)[0], attr)
    def _check_compat_version(self, compat_version):
        if self.version < compat_version:
            msg = ("The current version of CephFSVolumeClient, version {0} "
                   "does not support the required feature. Need version {1} "
                   "or greater".format(self.version, compat_version)
                   )
            raise CephFSVolumeClientError(msg)
    def _metadata_get(self, path):
        """
        Return a deserialized JSON object, or None
        """
        fd = self.fs.open(path, "r")
        # TODO iterate instead of assuming file < 4MB
        read_bytes = self.fs.read(fd, 0, 4096 * 1024)
        self.fs.close(fd)
        if read_bytes:
            return json.loads(read_bytes.decode())
        else:
            return None
    def _metadata_set(self, path, data):
        serialized = json.dumps(data)
        fd = self.fs.open(path, "w")
        try:
            self.fs.write(fd, to_bytes(serialized), 0)
            self.fs.fsync(fd, 0)
        finally:
            self.fs.close(fd)
    def _lock(self, path):
        @contextmanager
        def fn():
            while(1):
                fd = self.fs.open(path, os.O_CREAT, 0o755)
                self.fs.flock(fd, fcntl.LOCK_EX, self._id)

                # The locked file will be cleaned up sometime. It could be
                # unlinked e.g., by an another manila share instance, before
                # lock was applied on it. Perform checks to ensure that this
                # does not happen.
                try:
                    statbuf = self.fs.stat(path)
                except cephfs.ObjectNotFound:
                    self.fs.close(fd)
                    continue

                fstatbuf = self.fs.fstat(fd)
                if statbuf.st_ino == fstatbuf.st_ino:
                    break

            try:
                yield
            finally:
                self.fs.flock(fd, fcntl.LOCK_UN, self._id)
                self.fs.close(fd)

        return fn()
    def _auth_metadata_path(self, auth_id):
        return os.path.join(self.volume_prefix, "${0}{1}".format(
            auth_id, META_FILE_EXT))
    def _auth_lock(self, auth_id):
        return self._lock(self._auth_metadata_path(auth_id))
    def _auth_metadata_get(self, auth_id):
        """
        Call me with the metadata locked!

        Check whether an auth metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return auth metadata that the current version of CephFSVolumeClient
        can decode.
        """
        auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))

        if auth_metadata:
            self._check_compat_version(auth_metadata['compat_version'])

        return auth_metadata
    def _auth_metadata_set(self, auth_id, data):
        """
        Call me with the metadata locked!

        Fsync the auth metadata.

        Add two version attributes to the auth metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata, and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._auth_metadata_path(auth_id), data)
    def _volume_metadata_path(self, volume_path):
        return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
            volume_path.group_id if volume_path.group_id else "",
            volume_path.volume_id,
            META_FILE_EXT
        ))
    def _volume_lock(self, volume_path):
        """
        Return a ContextManager which locks the authorization metadata for
        a particular volume, and persists a flag to the metadata indicating
        that it is currently locked, so that we can detect dirty situations
        during recovery.

        This lock isn't just to make access to the metadata safe: it's also
        designed to be used over the two-step process of checking the
        metadata and then responding to an authorization request, to
        ensure that at the point we respond the metadata hasn't changed
        in the background. It's key to how we avoid security holes
        resulting from races during that problem.
        """
        return self._lock(self._volume_metadata_path(volume_path))
    def _volume_metadata_get(self, volume_path):
        """
        Call me with the metadata locked!

        Check whether a volume metadata structure can be decoded by the current
        version of CephFSVolumeClient.

        Return a volume_metadata structure that the current version of
        CephFSVolumeClient can decode.
        """
        volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))

        if volume_metadata:
            self._check_compat_version(volume_metadata['compat_version'])

        return volume_metadata
    def _volume_metadata_set(self, volume_path, data):
        """
        Call me with the metadata locked!

        Add two version attributes to the volume metadata,
        'compat_version', the minimum CephFSVolumeClient version that can
        decode the metadata and 'version', the CephFSVolumeClient version
        that encoded the metadata.
        """
        data['compat_version'] = 1
        data['version'] = self.version
        return self._metadata_set(self._volume_metadata_path(volume_path), data)
    def _prepare_updated_caps_list(self, existing_caps, mds_cap_str, osd_cap_str, authorize=True):
        caps_list = []
        for k, v in existing_caps['caps'].items():
            if k == 'mds' or k == 'osd':
                continue
            elif k == 'mon':
                if not authorize and v == 'allow r':
                    continue
            caps_list.extend((k, v))

        if mds_cap_str:
            caps_list.extend(('mds', mds_cap_str))
        if osd_cap_str:
            caps_list.extend(('osd', osd_cap_str))

        if authorize and 'mon' not in caps_list:
            caps_list.extend(('mon', 'allow r'))

        return caps_list
    def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None, allow_existing_id=False):
        """
        Get-or-create a Ceph auth identity for `auth_id` and grant them access
        to the given volume.

        :param tenant_id: Optionally provide a stringizable object to
                          restrict any created cephx IDs to other callers
                          passing the same tenant ID.
        :param allow_existing_id: Optionally authorize existing auth-ids not
                                  created by ceph_volume_client
        """

        with self._auth_lock(auth_id):
            client_entity = "client.{0}".format(auth_id)
            try:
                existing_caps = self._rados_command(
                    'auth get',
                    {
                        'entity': client_entity
                    }
                )
            # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
            except rados.Error:
                existing_caps = None

            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            # volume data to be inserted
            volume_path_str = str(volume_path)
            volume = {
                volume_path_str: {
                    # The access level at which the auth_id is authorized to
                    # access the volume.
                    'access_level': 'r' if readonly else 'rw',
                    'dirty': True,
                }
            }

            if auth_meta is None:
                if not allow_existing_id and existing_caps is not None:
                    msg = "auth ID: {0} exists and not created by ceph_volume_client. Not allowed to modify".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                # non-existent auth IDs
                sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
                    auth_id, tenant_id
                ))
                log.debug("Authorize: no existing meta")
                auth_meta = {
                    'dirty': True,
                    'tenant_id': tenant_id.__str__() if tenant_id else None,
                    'volumes': volume
                }
            else:
                # Disallow tenants to share auth IDs
                if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
                    msg = "auth ID: {0} is already in use".format(auth_id)
                    log.error(msg)
                    raise CephFSVolumeClientError(msg)

                if auth_meta['dirty']:
                    self._recover_auth_meta(auth_id, auth_meta)

                log.debug("Authorize: existing tenant {tenant}".format(
                    tenant=auth_meta['tenant_id']
                ))
                auth_meta['dirty'] = True
                auth_meta['volumes'].update(volume)

            self._auth_metadata_set(auth_id, auth_meta)

            with self._volume_lock(volume_path):
                key = self._authorize_volume(volume_path, auth_id, readonly, existing_caps)

            auth_meta['dirty'] = False
            auth_meta['volumes'][volume_path_str]['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)

            if tenant_id:
                return {
                    'auth_key': key
                }
            else:
                # Caller wasn't multi-tenant aware: be safe and don't give
                # them access
                return {
                    'auth_key': None
                }
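    # Illustrative sketch (not from the original module): granting and revoking
    # access for a guest ID, assuming vc and vp as in the earlier sketches.
    #
    #     auth = vc.authorize(vp, "guest1", readonly=False, tenant_id="tenant-a")
    #     auth['auth_key']               # cephx key for the guest mount
    #     vc.deauthorize(vp, "guest1")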
    def _authorize_volume(self, volume_path, auth_id, readonly, existing_caps):
        vol_meta = self._volume_metadata_get(volume_path)

        access_level = 'r' if readonly else 'rw'
        auth = {
            auth_id: {
                'access_level': access_level,
                'dirty': True,
            }
        }

        if vol_meta is None:
            vol_meta = {
                'auths': auth
            }
        else:
            vol_meta['auths'].update(auth)
        self._volume_metadata_set(volume_path, vol_meta)

        key = self._authorize_ceph(volume_path, auth_id, readonly, existing_caps)

        vol_meta['auths'][auth_id]['dirty'] = False
        self._volume_metadata_set(volume_path, vol_meta)

        return key
    def _authorize_ceph(self, volume_path, auth_id, readonly, existing_caps):
        path = self._get_path(volume_path)
        log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
            auth_id, path
        ))

        # First I need to work out what the data pool is for this share:
        # read the layout
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")

        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # Now construct auth capabilities that give the guest just enough
        # permissions to access the share
        client_entity = "client.{0}".format(auth_id)
        want_access_level = 'r' if readonly else 'rw'
        want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
        if namespace:
            want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                want_access_level, pool_name, namespace)
        else:
            want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
                                                       pool_name)

        if existing_caps is None:
            caps = self._rados_command(
                'auth get-or-create',
                {
                    'entity': client_entity,
                    'caps': [
                        'mds', want_mds_cap,
                        'osd', want_osd_cap,
                        'mon', 'allow r']
                })
        else:
            # entity exists, update it
            cap = existing_caps[0]

            # Construct auth caps that if present might conflict with the desired
            # auth caps.
            unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
            unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
            if namespace:
                unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
                    unwanted_access_level, pool_name, namespace)
            else:
                unwanted_osd_cap = 'allow {0} pool={1}'.format(
                    unwanted_access_level, pool_name)

            def cap_update(
                    orig_mds_caps, orig_osd_caps, want_mds_cap,
                    want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):

                if not orig_mds_caps:
                    return want_mds_cap, want_osd_cap

                mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
                osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]

                if want_mds_cap in mds_cap_tokens:
                    return orig_mds_caps, orig_osd_caps

                if unwanted_mds_cap in mds_cap_tokens:
                    mds_cap_tokens.remove(unwanted_mds_cap)
                    osd_cap_tokens.remove(unwanted_osd_cap)

                mds_cap_tokens.append(want_mds_cap)
                osd_cap_tokens.append(want_osd_cap)

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")

            mds_cap_str, osd_cap_str = cap_update(
                orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
                unwanted_mds_cap, unwanted_osd_cap)

            caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str)
            caps = self._rados_command(
                'auth caps',
                {
                    'entity': client_entity,
                    'caps': caps_list
                })
            caps = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

        # Result expected like this:
        # [
        #     {
        #         "entity": "client.foobar",
        #         "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
        #         "caps": {
        #             "mds": "allow *",
        #             "mon": "allow *"
        #         }
        #     }
        # ]
        assert len(caps) == 1
        assert caps[0]['entity'] == client_entity
        return caps[0]['key']
    def deauthorize(self, volume_path, auth_id):
        with self._auth_lock(auth_id):
            # Existing meta, or None, to be updated
            auth_meta = self._auth_metadata_get(auth_id)

            volume_path_str = str(volume_path)
            if (auth_meta is None) or (not auth_meta['volumes']):
                log.warning("deauthorized called for already-removed auth"
                            " ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                # Clean up the auth meta file of an auth ID
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            if volume_path_str not in auth_meta['volumes']:
                log.warning("deauthorized called for already-removed auth"
                            " ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                return

            if auth_meta['dirty']:
                self._recover_auth_meta(auth_id, auth_meta)

            auth_meta['dirty'] = True
            auth_meta['volumes'][volume_path_str]['dirty'] = True
            self._auth_metadata_set(auth_id, auth_meta)

            self._deauthorize_volume(volume_path, auth_id)

            # Filter out the volume we're deauthorizing
            del auth_meta['volumes'][volume_path_str]

            # Clean up auth meta file
            if not auth_meta['volumes']:
                self.fs.unlink(self._auth_metadata_path(auth_id))
                return

            auth_meta['dirty'] = False
            self._auth_metadata_set(auth_id, auth_meta)
    def _deauthorize_volume(self, volume_path, auth_id):
        with self._volume_lock(volume_path):
            vol_meta = self._volume_metadata_get(volume_path)

            if (vol_meta is None) or (auth_id not in vol_meta['auths']):
                log.warning("deauthorized called for already-removed auth"
                            " ID '{auth_id}' for volume ID '{volume}'".format(
                                auth_id=auth_id, volume=volume_path.volume_id
                            ))
                return

            vol_meta['auths'][auth_id]['dirty'] = True
            self._volume_metadata_set(volume_path, vol_meta)

            self._deauthorize(volume_path, auth_id)

            # Remove the auth_id from the metadata *after* removing it
            # from ceph, so that if we crashed here, we would actually
            # recreate the auth ID during recovery (i.e. end up with
            # a consistent state).

            # Filter out the auth we're removing
            del vol_meta['auths'][auth_id]
            self._volume_metadata_set(volume_path, vol_meta)
    def _deauthorize(self, volume_path, auth_id):
        """
        The volume must still exist.
        """
        client_entity = "client.{0}".format(auth_id)
        path = self._get_path(volume_path)
        pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
        try:
            namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
                                         "namespace").decode()
        except cephfs.NoData:
            namespace = None

        # The auth_id might have read-only or read-write mount access for the
        # volume path.
        access_levels = ('r', 'rw')
        want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
                         for access_level in access_levels]
        if namespace:
            want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
                             for access_level in access_levels]
        else:
            want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
                             for access_level in access_levels]

        try:
            existing = self._rados_command(
                'auth get',
                {
                    'entity': client_entity
                }
            )

            def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
                mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
                osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]

                for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
                    if want_mds_cap in mds_cap_tokens:
                        mds_cap_tokens.remove(want_mds_cap)
                        osd_cap_tokens.remove(want_osd_cap)
                        break

                return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)

            cap = existing[0]
            orig_mds_caps = cap['caps'].get('mds', "")
            orig_osd_caps = cap['caps'].get('osd', "")
            mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
                                                  want_mds_caps, want_osd_caps)

            caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False)
            if not caps_list:
                self._rados_command('auth del', {'entity': client_entity}, decode=False)
            else:
                self._rados_command(
                    'auth caps',
                    {
                        'entity': client_entity,
                        'caps': caps_list
                    })

        # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
        except rados.Error:
            # Already gone, great.
            return
, volume_path
):
1344 Expose a list of auth IDs that have access to a volume.
1346 return: a list of (auth_id, access_level) tuples, where
1347 the access_level can be 'r' , or 'rw'.
1348 None if no auth ID is given access to the volume.
1350 with self
._volume
_lock
(volume_path
):
1351 meta
= self
._volume
_metadata
_get
(volume_path
)
1353 if not meta
or not meta
['auths']:
1356 for auth
, auth_data
in meta
['auths'].items():
1357 # Skip partial auth updates.
1358 if not auth_data
['dirty']:
1359 auths
.append((auth
, auth_data
['access_level']))
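    # Illustrative sketch (not from the original module):
    #
    #     vc.get_authorized_ids(vp)   # e.g. [("guest1", "rw")], or None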
    def _rados_command(self, prefix, args=None, decode=True):
        """
        Safer wrapper for ceph_argparse.json_command, which raises
        Error exception instead of relying on caller to check return
        codes.

        Error exception can result from:
        * Actual legitimate errors
        * Malformed JSON output

        return: Decoded object from ceph, or None if empty string returned.
                If decode is False, return a string (the data returned by
                ceph command)
        """
        if args is None:
            args = {}

        argdict = args.copy()
        argdict['format'] = 'json'

        ret, outbuf, outs = json_command(self.rados,
                                         prefix=prefix,
                                         argdict=argdict,
                                         timeout=RADOS_TIMEOUT)
        if ret != 0:
            raise rados.Error(outs)
        else:
            if decode:
                if outbuf:
                    try:
                        return json.loads(outbuf.decode())
                    except (ValueError, TypeError):
                        raise RadosError("Invalid JSON output for command {0}".format(argdict))
                else:
                    return None
            else:
                return outbuf
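    # Illustrative sketch (not from the original module): how this wrapper is
    # used internally; the result is the JSON-decoded mon command output.
    #
    #     mon_map = vc._rados_command("mon dump")
    #     mon_map['mons'][0]['addr']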
    def get_used_bytes(self, volume_path):
        return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
                                    "rbytes").decode())
    def set_max_bytes(self, volume_path, max_bytes):
        self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
                         to_bytes(max_bytes if max_bytes else 0), 0)
    def _snapshot_path(self, dir_path, snapshot_name):
        return os.path.join(
            dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
        )
    def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
        # TODO: raise intelligible exception for clusters where snaps are disabled
        self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)
    def _snapshot_destroy(self, dir_path, snapshot_name):
        """
        Remove a snapshot, or do nothing if it already doesn't exist.
        """
        try:
            self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
        except cephfs.ObjectNotFound:
            log.warning("Snapshot was already gone: {0}".format(snapshot_name))
    def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
        self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)

    def destroy_snapshot_volume(self, volume_path, snapshot_name):
        self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
    def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")

        return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
                                     mode)

    def destroy_snapshot_group(self, group_id, snapshot_name):
        if group_id is None:
            raise RuntimeError("Group ID may not be None")
        if snapshot_name is None:
            raise RuntimeError("Snapshot name may not be None")

        return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
    def _cp_r(self, src, dst):
        raise NotImplementedError()
    def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
        dest_fs_path = self._get_path(dest_volume_path)
        src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)

        self._cp_r(src_snapshot_path, dest_fs_path)
    def put_object(self, pool_name, object_name, data):
        """
        Synchronously write data to an object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        """
        return self.put_object_versioned(pool_name, object_name, data)
    def put_object_versioned(self, pool_name, object_name, data, version=None):
        """
        Synchronously write data to an object only if the version of the
        object matches the expected version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str
        :param data: data to write
        :type data: bytes
        :param version: expected version of the object to write
        :type version: int
        """
        ioctx = self.rados.open_ioctx(pool_name)
        try:
            max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
            if len(data) > max_size:
                msg = ("Data to be written to object '{0}' exceeds "
                       "{1} bytes".format(object_name, max_size))
                log.error(msg)
                raise CephFSVolumeClientError(msg)

            try:
                with rados.WriteOpCtx() as wop:
                    if version is not None:
                        wop.assert_version(version)
                    wop.write_full(data)
                    ioctx.operate_write_op(wop, object_name)
            except rados.OSError as e:
                log.error(e)
                raise e
        finally:
            ioctx.close()
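    # Illustrative sketch (not from the original module): optimistic concurrency
    # with the versioned object helpers, assuming a pool named "manila_data".
    #
    #     data, version = vc.get_object_and_version("manila_data", "obj1")
    #     vc.put_object_versioned("manila_data", "obj1", b"new contents",
    #                             version=version)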
    def get_object(self, pool_name, object_name):
        """
        Synchronously read data from object.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: bytes - data read from object
        """
        return self.get_object_and_version(pool_name, object_name)[0]
    def get_object_and_version(self, pool_name, object_name):
        """
        Synchronously read data from object and get its version.

        :param pool_name: name of the pool
        :type pool_name: str
        :param object_name: name of the object
        :type object_name: str

        :returns: tuple of object data and version
        """
        ioctx = self.rados.open_ioctx(pool_name)
        max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
        try:
            bytes_read = ioctx.read(object_name, max_size)
            if ((len(bytes_read) == max_size) and
                    (ioctx.read(object_name, 1, offset=max_size))):
                log.warning("Size of object {0} exceeds '{1}' bytes "
                            "read".format(object_name, max_size))
            obj_version = ioctx.get_last_version()
        finally:
            ioctx.close()

        return (bytes_read, obj_version)
    def delete_object(self, pool_name, object_name):
        ioctx = self.rados.open_ioctx(pool_name)
        try:
            ioctx.remove_object(object_name)
        except rados.ObjectNotFound:
            log.warning("Object '{0}' was already removed".format(object_name))
        finally:
            ioctx.close()