ceph.git: ceph/src/pybind/ceph_volume_client.py (12.2.1-pve3)
1"""
2Copyright (C) 2015 Red Hat, Inc.
3
4LGPL2. See file COPYING.
5"""
6
7from contextlib import contextmanager
8import errno
9import fcntl
10import json
11import logging
12import os
13import re
14import struct
15import sys
16import threading
17import time
18import uuid
19
20from ceph_argparse import json_command
21
22import cephfs
23import rados
24
25
26class RadosError(Exception):
27 """
28 Something went wrong talking to Ceph with librados
29 """
30 pass
31
32
33RADOS_TIMEOUT = 10
34SNAP_DIR = ".snap"
35
36log = logging.getLogger(__name__)
37
38
39# Reserved volume group name which we use in paths for volumes
40# that are not assigned to a group (i.e. created with group=None)
41NO_GROUP_NAME = "_nogroup"
42
43# Filename extensions for meta files.
44META_FILE_EXT = ".meta"
45
46class VolumePath(object):
47 """
48 Identify a volume's path as group->volume
49 The Volume ID is a unique identifier, but this is a much more
50 helpful thing to pass around.
51 """
52 def __init__(self, group_id, volume_id):
53 self.group_id = group_id
54 self.volume_id = volume_id
55 assert self.group_id != NO_GROUP_NAME
56 assert self.volume_id != "" and self.volume_id is not None
57
58 def __str__(self):
59 return "{0}/{1}".format(self.group_id, self.volume_id)
60
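# Illustrative example: str(VolumePath("grp1", "vol1")) == "grp1/vol1".
# group_id may be None for volumes that belong to no group, but it may not
# be the reserved NO_GROUP_NAME value.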
61
62class ClusterTimeout(Exception):
63 """
64 Exception indicating that we timed out trying to talk to the Ceph cluster,
65 either to the mons, or to any individual daemon that the mons indicate ought
66 to be up but isn't responding to us.
67 """
68 pass
69
70
71class ClusterError(Exception):
72 """
73 Exception indicating that the cluster returned an error to a command that
74 we thought should be successful based on our last knowledge of the cluster
75 state.
76 """
77 def __init__(self, action, result_code, result_str):
78 self._action = action
79 self._result_code = result_code
80 self._result_str = result_str
81
82 def __str__(self):
83 return "Error {0} (\"{1}\") while {2}".format(
84 self._result_code, self._result_str, self._action)
85
86
87class RankEvicter(threading.Thread):
88 """
89 Thread for evicting client(s) from a particular MDS daemon instance.
90
91 This is more complex than simply sending a command, because we have to
92 handle cases where MDS daemons might not be fully up yet, and/or might
93 be transiently unresponsive to commands.
94 """
95 class GidGone(Exception):
96 pass
97
98 POLL_PERIOD = 5
99
100 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
101 """
102 :param client_spec: list of strings, used as filter arguments to "session evict"
103 pass ["id=123"] to evict a single client with session id 123.
104 """
105 self.rank = rank
106 self.gid = gid
107 self._mds_map = mds_map
108 self._client_spec = client_spec
109 self._volume_client = volume_client
110 self._ready_timeout = ready_timeout
111 self._ready_waited = 0
112
113 self.success = False
114 self.exception = None
115
116 super(RankEvicter, self).__init__()
117
118 def _ready_to_evict(self):
119 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
120 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
121 self._client_spec, self.rank, self.gid
122 ))
123 raise RankEvicter.GidGone()
124
125 info = self._mds_map['info']["gid_{0}".format(self.gid)]
126 log.debug("_ready_to_evict: state={0}".format(info['state']))
127 return info['state'] in ["up:active", "up:clientreplay"]
128
129 def _wait_for_ready(self):
130 """
131 Wait for that MDS rank to reach an active or clientreplay state, and
132 not be laggy.
133 """
134 while not self._ready_to_evict():
135 if self._ready_waited > self._ready_timeout:
136 raise ClusterTimeout()
137
138 time.sleep(self.POLL_PERIOD)
139 self._ready_waited += self.POLL_PERIOD
140
141 self._mds_map = self._volume_client._rados_command("mds dump", {})
142
143 def _evict(self):
144 """
145 Run the eviction procedure. Return True on success; raise ClusterTimeout or ClusterError on failure.
146 """
147
148 # Wait until the MDS is believed by the mon to be available for commands
149 try:
150 self._wait_for_ready()
151 except self.GidGone:
152 return True
153
154 # Then send it an evict
155 ret = errno.ETIMEDOUT
156 while ret == errno.ETIMEDOUT:
157 log.debug("mds_command: {0}, {1}".format(
158 "%s" % self.gid, ["session", "evict"] + self._client_spec
159 ))
160 ret, outb, outs = self._volume_client.fs.mds_command(
161 "%s" % self.gid,
162 [json.dumps({
163 "prefix": "session evict",
164 "filters": self._client_spec
165 })], "")
166 log.debug("mds_command: complete {0} {1}".format(ret, outs))
167
168 # If we get a clean response, great, it's gone from that rank.
169 if ret == 0:
170 return True
171 elif ret == errno.ETIMEDOUT:
172 # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
173 self._mds_map = self._volume_client._rados_command("mds dump", {})
174 try:
175 self._wait_for_ready()
176 except self.GidGone:
177 return True
178 else:
179 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
180
181 def run(self):
182 try:
183 self._evict()
184 except Exception as e:
185 self.success = False
186 self.exception = e
187 else:
188 self.success = True
189
190
191class EvictionError(Exception):
192 pass
193
194
195class CephFSVolumeClientError(Exception):
196 """
197 Something went wrong talking to Ceph using CephFSVolumeClient.
198 """
199 pass
200
201
202CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
203
204 CephFSVolumeClient Version History:
205
206 * 1 - Initial version
207
208"""
209
210
211class CephFSVolumeClient(object):
212 """
213 Combine libcephfs and librados interfaces to implement a
214 'Volume' concept implemented as a cephfs directory and
215 client capabilities which restrict mount access to this
216 directory.
217
218 Additionally, volumes may be in a 'Group'. Conveniently,
219 volumes are a lot like manila shares, and groups are a lot
220 like manila consistency groups.
221
222 Refer to volumes with VolumePath, which specifies the
223 volume and group IDs (both strings). The group ID may
224 be None.
225
226 In general, functions in this class are allowed to raise rados.Error
227 or cephfs.Error exceptions in unexpected situations.
228 """
229
230 # Current version
231 version = 1
232 # Earliest compatible version
233 compat_version = 1
234
235 # Where shall we create our volumes?
236 POOL_PREFIX = "fsvolume_"
237 DEFAULT_VOL_PREFIX = "/volumes"
238 DEFAULT_NS_PREFIX = "fsvolumens_"
239
240 def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None):
241 self.fs = None
242 self.rados = None
243 self.connected = False
244 self.conf_path = conf_path
245 self.cluster_name = cluster_name
246 self.auth_id = auth_id
247 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
248 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
249 # For flock'ing in cephfs, I want a unique ID to distinguish me
250 # from any other manila-share services that are loading this module.
251 # We could use pid, but that's unnecessarily weak: generate a
252 # UUID
253 self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]
254
255 # TODO: version the on-disk structures
256
257 def recover(self):
258 # Scan all auth keys to see if they're dirty: if they are, they have
259 # state that might not have propagated to Ceph or to the related
260 # volumes yet.
261
262 # Important: we *always* acquire locks in the order auth->volume
263 # That means a volume can never be dirty without the auth key
264 # we're updating it with being dirty at the same time.
265
266 # First list the auth IDs that have potentially dirty on-disk metadata
267 log.debug("Recovering from partial auth updates (if any)...")
268
269 try:
270 dir_handle = self.fs.opendir(self.volume_prefix)
271 except cephfs.ObjectNotFound:
272 log.debug("Nothing to recover. No auth meta files.")
273 return
274
275 d = self.fs.readdir(dir_handle)
276 auth_ids = []
277
278 if not d:
279 log.debug("Nothing to recover. No auth meta files.")
280
281 while d:
282 # Identify auth IDs from auth meta filenames. The auth meta files
283 # are named "$<auth_id><meta filename extension>"
284 regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
285 match = re.search(regex, d.d_name)
286 if match:
287 auth_ids.append(match.group(1))
288
289 d = self.fs.readdir(dir_handle)
290
291 self.fs.closedir(dir_handle)
292
293 # Key points based on ordering:
294 # * Anything added in VMeta is already added in AMeta
295 # * Anything added in Ceph is already added in VMeta
296 # * Anything removed in VMeta is already removed in Ceph
297 # * Anything removed in AMeta is already removed in VMeta
298
299 # Deauthorization: because I only update metadata AFTER the
300 # update of the next level down, I get the same ordering guarantee:
301 # things which exist in the AMeta should also exist in the VMeta,
302 # which should also exist in Ceph. The same recovery procedure
303 # that gets me consistent after crashes during authorization will
304 # also work during deauthorization.
305
306 # Now for each auth ID, check for dirty flag and apply updates
307 # if dirty flag is found
308 for auth_id in auth_ids:
309 with self._auth_lock(auth_id):
310 auth_meta = self._auth_metadata_get(auth_id)
311 if not auth_meta or not auth_meta['volumes']:
312 # Clean up auth meta file
313 self.fs.unlink(self._auth_metadata_path(auth_id))
314 continue
315 if not auth_meta['dirty']:
316 continue
317 self._recover_auth_meta(auth_id, auth_meta)
318
319 log.debug("Recovered from partial auth updates (if any).")
320
321 def _recover_auth_meta(self, auth_id, auth_meta):
322 """
323 Call me after locking the auth meta file.
324 """
325 remove_volumes = []
326
327 for volume, volume_data in auth_meta['volumes'].items():
328 if not volume_data['dirty']:
329 continue
330
331 (group_id, volume_id) = volume.split('/')
332 group_id = group_id if group_id != 'None' else None
333 volume_path = VolumePath(group_id, volume_id)
334 access_level = volume_data['access_level']
335
336 with self._volume_lock(volume_path):
337 vol_meta = self._volume_metadata_get(volume_path)
338
339 # No VMeta update indicates that there was no auth update
340 # in Ceph either. So it's safe to remove corresponding
341 # partial update in AMeta.
342 if not vol_meta or auth_id not in vol_meta['auths']:
343 remove_volumes.append(volume)
344 continue
345
346 want_auth = {
347 'access_level': access_level,
348 'dirty': False,
349 }
350 # VMeta update looks clean. Ceph auth update must have been
351 # clean.
352 if vol_meta['auths'][auth_id] == want_auth:
353 continue
354
355 readonly = True if access_level == 'r' else False
356 self._authorize_volume(volume_path, auth_id, readonly)
357
358 # Recovered from partial auth updates for the auth ID's access
359 # to a volume.
360 auth_meta['volumes'][volume]['dirty'] = False
361 self._auth_metadata_set(auth_id, auth_meta)
362
363 for volume in remove_volumes:
364 del auth_meta['volumes'][volume]
365
366 if not auth_meta['volumes']:
367 # Clean up auth meta file
368 self.fs.unlink(self._auth_metadata_path(auth_id))
369 return
370
371 # Recovered from all partial auth updates for the auth ID.
372 auth_meta['dirty'] = False
373 self._auth_metadata_set(auth_id, auth_meta)
374
375
376 def evict(self, auth_id, timeout=30, volume_path=None):
377 """
378 Evict all clients based on the authorization ID and optionally based on
379 the volume path mounted. Assumes that the authorization key has been
380 revoked prior to calling this function.
381
382 This operation can throw an exception if the mon cluster is unresponsive, or
383 any individual MDS daemon is unresponsive for longer than the timeout passed in.
384 """
385
386 client_spec = ["auth_name={0}".format(auth_id), ]
387 if volume_path:
388 client_spec.append("client_metadata.root={0}".
389 format(self._get_path(volume_path)))
390
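        # At this point client_spec looks like, e.g. (illustrative values):
        # ["auth_name=guest", "client_metadata.root=/volumes/grp1/vol1"]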
391 log.info("evict clients with {0}".format(', '.join(client_spec)))
392
393 mds_map = self._rados_command("mds dump", {})
394
395 up = {}
396 for name, gid in mds_map['up'].items():
397 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
398 assert name.startswith("mds_")
399 up[int(name[4:])] = gid
400
401 # For all MDS ranks held by a daemon
402 # Do the parallelism in python instead of using "tell mds.*", because
403 # the latter doesn't give us per-mds output
404 threads = []
405 for rank, gid in up.items():
406 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
407 timeout)
408 thread.start()
409 threads.append(thread)
410
411 for t in threads:
412 t.join()
413
414 log.info("evict: joined all")
415
416 for t in threads:
417 if not t.success:
418 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
419 format(', '.join(client_spec), t.rank, t.gid, t.exception)
420 )
421 log.error(msg)
422 raise EvictionError(msg)
423
424 def _get_path(self, volume_path):
425 """
426 Determine the path within CephFS where this volume will live
427 :return: absolute path (string)
428 """
429 return os.path.join(
430 self.volume_prefix,
431 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
432 volume_path.volume_id)
433
434 def _get_group_path(self, group_id):
435 if group_id is None:
436 raise ValueError("group_id may not be None")
437
438 return os.path.join(
439 self.volume_prefix,
440 group_id
441 )
442
443 def connect(self, premount_evict = None):
444 """
445
446 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
447 may want to use this to specify their own auth ID if they expect
448 to be a unique instance and don't want to wait for caps to time
449 out after failure of another instance of themselves.
450 """
451 log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
452 self.rados = rados.Rados(
453 name="client.{0}".format(self.auth_id),
454 clustername=self.cluster_name,
455 conffile=self.conf_path,
456 conf={}
457 )
458 self.rados.connect()
459
460 log.debug("Connection to RADOS complete")
461
462 log.debug("Connecting to cephfs...")
463 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
464 log.debug("CephFS initializing...")
465 self.fs.init()
466 if premount_evict is not None:
467 log.debug("Premount eviction of {0} starting".format(premount_evict))
468 self.evict(premount_evict)
469 log.debug("Premount eviction of {0} completes".format(premount_evict))
470 log.debug("CephFS mounting...")
471 self.fs.mount()
472 log.debug("Connection to cephfs complete")
473
474 # Recover from partial auth updates due to a previous
475 # crash.
476 self.recover()
477
478 def get_mon_addrs(self):
479 log.info("get_mon_addrs")
480 result = []
481 mon_map = self._rados_command("mon dump")
482 for mon in mon_map['mons']:
483 ip_port = mon['addr'].split("/")[0]
484 result.append(ip_port)
485
486 return result
487
488 def disconnect(self):
489 log.info("disconnect")
490 if self.fs:
491 log.debug("Disconnecting cephfs...")
492 self.fs.shutdown()
493 self.fs = None
494 log.debug("Disconnecting cephfs complete")
495
496 if self.rados:
497 log.debug("Disconnecting rados...")
498 self.rados.shutdown()
499 self.rados = None
500 log.debug("Disconnecting rados complete")
501
502 def __del__(self):
503 self.disconnect()
504
505 def _get_pool_id(self, osd_map, pool_name):
506 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
507 # like this are needed.
508 for pool in osd_map['pools']:
509 if pool['pool_name'] == pool_name:
510 return pool['pool']
511
512 return None
513
514 def _create_volume_pool(self, pool_name):
515 """
516 Idempotently create a pool for use as a CephFS data pool, with the given name
517
518 :return The ID of the created pool
519 """
520 osd_map = self._rados_command('osd dump', {})
521
522 existing_id = self._get_pool_id(osd_map, pool_name)
523 if existing_id is not None:
524 log.info("Pool {0} already exists".format(pool_name))
525 return existing_id
526
527 osd_count = len(osd_map['osds'])
528
529 # We can't query the actual cluster config remotely, but since this is
530 # just a heuristic we'll assume that the ceph.conf we have locally reflects
531 # the one in use in the rest of the cluster.
532 pg_warn_max_per_osd = int(self.rados.conf_get('mon_pg_warn_max_per_osd'))
533
534 other_pgs = 0
535 for pool in osd_map['pools']:
536 if not pool['pool_name'].startswith(self.POOL_PREFIX):
537 other_pgs += pool['pg_num']
538
539 # A basic heuristic for picking pg_num: work out the max number of
540 # PGs we can have without tripping a warning, then subtract the number
541 # of PGs already created by non-manila pools, then divide by ten. That'll
542 # give you a reasonable result on a system where you have "a few" manila
543 # shares.
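        # Hedged worked example (numbers are assumptions, not defaults):
        # with mon_pg_warn_max_per_osd=300, 10 OSDs and 500 PGs in
        # non-manila pools, pg_num = ((300 * 10) - 500) / 10 = 250.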
544 pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10
545 # TODO Alternatively, respect an override set by the user.
546
547 self._rados_command(
548 'osd pool create',
549 {
550 'pool': pool_name,
551 'pg_num': pg_num
552 }
553 )
554
555 osd_map = self._rados_command('osd dump', {})
556 pool_id = self._get_pool_id(osd_map, pool_name)
557
558 if pool_id is None:
559 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
560 # removing it right after we created it.
561 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
562 pool_name, json.dumps(osd_map, indent=2)
563 ))
564 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
565 else:
566 return pool_id
567
568 def create_group(self, group_id):
569 # Prevent craftily-named volume groups from colliding with the meta
570 # files.
571 if group_id.endswith(META_FILE_EXT):
572 raise ValueError("group ID cannot end with '{0}'.".format(
573 META_FILE_EXT))
574 path = self._get_group_path(group_id)
575 self._mkdir_p(path)
576
577 def destroy_group(self, group_id):
578 path = self._get_group_path(group_id)
579 try:
580 self.fs.stat(self.volume_prefix)
581 except cephfs.ObjectNotFound:
582 pass
583 else:
584 self.fs.rmdir(path)
585
586 def _mkdir_p(self, path):
587 try:
588 self.fs.stat(path)
589 except cephfs.ObjectNotFound:
590 pass
591 else:
592 return
593
594 parts = path.split(os.path.sep)
595
596 for i in range(1, len(parts) + 1):
597 subpath = os.path.join(*parts[0:i])
598 try:
599 self.fs.stat(subpath)
600 except cephfs.ObjectNotFound:
601 self.fs.mkdir(subpath, 0o755)
602
603 def create_volume(self, volume_path, size=None, data_isolated=False):
604 """
605 Set up metadata, pools and auth for a volume.
606
607 This function is idempotent. It is safe to call this again
608 for an already-created volume, even if it is in use.
609
610 :param volume_path: VolumePath instance
611 :param size: In bytes, or None for no size limit
612 :param data_isolated: If true, create a separate OSD pool for this volume
613 :return: dict containing the volume's 'mount_path'
614 """
615 path = self._get_path(volume_path)
616 log.info("create_volume: {0}".format(path))
617
618 self._mkdir_p(path)
619
620 if size is not None:
621 self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)
622
623 # data_isolated means create a separate pool for this volume
624 if data_isolated:
625 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
626 log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
627 pool_id = self._create_volume_pool(pool_name)
628 mds_map = self._rados_command("mds dump", {})
629 if pool_id not in mds_map['data_pools']:
630 self._rados_command("mds add_data_pool", {
631 'pool': pool_name
632 })
633 self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)
634
635 # enforce security isolation, use separate namespace for this volume
636 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
637 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
638 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0)
639
640 # Create a volume meta file, if it does not already exist, to store
641 # data about auth ids having access to the volume
642 fd = self.fs.open(self._volume_metadata_path(volume_path),
643 os.O_CREAT, 0o755)
644 self.fs.close(fd)
645
646 return {
647 'mount_path': path
648 }
649
650 def delete_volume(self, volume_path, data_isolated=False):
651 """
652 Make a volume inaccessible to guests. This function is
653 idempotent. This is the fast part of tearing down a volume: you must
654 also later call purge_volume, which is the slow part.
655
656 :param volume_path: Same identifier used in create_volume
657 :return:
658 """
659
660 path = self._get_path(volume_path)
661 log.info("delete_volume: {0}".format(path))
662
663 # Create the trash folder if it doesn't already exist
664 trash = os.path.join(self.volume_prefix, "_deleting")
665 self._mkdir_p(trash)
666
667 # We'll move it to here
668 trashed_volume = os.path.join(trash, volume_path.volume_id)
669
670 # Move the volume's data to the trash folder
671 try:
672 self.fs.stat(path)
673 except cephfs.ObjectNotFound:
674 log.warning("Trying to delete volume '{0}' but it's already gone".format(
675 path))
676 else:
677 self.fs.rename(path, trashed_volume)
678
679 # Delete the volume meta file, if it's not already deleted
680 vol_meta_path = self._volume_metadata_path(volume_path)
681 try:
682 self.fs.unlink(vol_meta_path)
683 except cephfs.ObjectNotFound:
684 pass
685
686 def purge_volume(self, volume_path, data_isolated=False):
687 """
688 Finish clearing up a volume that was previously passed to delete_volume. This
689 function is idempotent.
690 """
691
692 trash = os.path.join(self.volume_prefix, "_deleting")
693 trashed_volume = os.path.join(trash, volume_path.volume_id)
694
695 try:
696 self.fs.stat(trashed_volume)
697 except cephfs.ObjectNotFound:
698 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
699 trashed_volume))
700 return
701
702 def rmtree(root_path):
703 log.debug("rmtree {0}".format(root_path))
704 dir_handle = self.fs.opendir(root_path)
705 d = self.fs.readdir(dir_handle)
706 while d:
707 if d.d_name not in [".", ".."]:
708 # Do not use os.path.join because it is sensitive
709 # to string encoding; we just pass dnames through
710 # as byte arrays.
711 d_full = "{0}/{1}".format(root_path, d.d_name)
712 if d.is_dir():
713 rmtree(d_full)
714 else:
715 self.fs.unlink(d_full)
716
717 d = self.fs.readdir(dir_handle)
718 self.fs.closedir(dir_handle)
719
720 self.fs.rmdir(root_path)
721
722 rmtree(trashed_volume)
723
724 if data_isolated:
725 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
726 osd_map = self._rados_command("osd dump", {})
727 pool_id = self._get_pool_id(osd_map, pool_name)
728 mds_map = self._rados_command("mds dump", {})
729 if pool_id in mds_map['data_pools']:
730 self._rados_command("mds remove_data_pool", {
731 'pool': pool_name
732 })
733 self._rados_command("osd pool delete",
734 {
735 "pool": pool_name,
736 "pool2": pool_name,
737 "sure": "--yes-i-really-really-mean-it"
738 })
739
740 def _get_ancestor_xattr(self, path, attr):
741 """
742 Helper for reading layout information: if this xattr is missing
743 on the requested path, keep checking parents until we find it.
744 """
745 try:
746 result = self.fs.getxattr(path, attr)
747 if result == "":
748 # Annoying! cephfs gives us empty instead of an error when attr not found
749 raise cephfs.NoData()
750 else:
751 return result
752 except cephfs.NoData:
753 if path == "/":
754 raise
755 else:
756 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
757
758 def _check_compat_version(self, compat_version):
759 if self.version < compat_version:
760 msg = ("The current version of CephFSVolumeClient, version {0} "
761 "does not support the required feature. Need version {1} "
762 "or greater".format(self.version, compat_version)
763 )
764 log.error(msg)
765 raise CephFSVolumeClientError(msg)
766
767 def _metadata_get(self, path):
768 """
769 Return a deserialized JSON object, or None
770 """
771 fd = self.fs.open(path, "r")
772 # TODO iterate instead of assuming file < 4MB
773 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
774 self.fs.close(fd)
775 if read_bytes:
776 return json.loads(read_bytes)
777 else:
778 return None
779
780 def _metadata_set(self, path, data):
781 serialized = json.dumps(data)
782 fd = self.fs.open(path, "w")
783 try:
784 self.fs.write(fd, serialized, 0)
785 self.fs.fsync(fd, 0)
786 finally:
787 self.fs.close(fd)
788
789 def _lock(self, path):
790 @contextmanager
791 def fn():
792 while True:
793 fd = self.fs.open(path, os.O_CREAT, 0o755)
794 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
795
796 # The locked file will be cleaned up sometime. It could be
797 # unlinked, e.g. by another manila share instance, before the
798 # lock was applied on it. Perform checks to ensure that this
799 # does not happen.
800 try:
801 statbuf = self.fs.stat(path)
802 except cephfs.ObjectNotFound:
803 self.fs.close(fd)
804 continue
805
806 fstatbuf = self.fs.fstat(fd)
807 if statbuf.st_ino == fstatbuf.st_ino:
808 break
809
810 try:
811 yield
812 finally:
813 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
814 self.fs.close(fd)
815
816 return fn()
817
818 def _auth_metadata_path(self, auth_id):
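        # e.g. auth_id "guest" with the default volume prefix yields
        # "/volumes/$guest.meta" (illustrative).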
819 return os.path.join(self.volume_prefix, "${0}{1}".format(
820 auth_id, META_FILE_EXT))
821
822 def _auth_lock(self, auth_id):
823 return self._lock(self._auth_metadata_path(auth_id))
824
825 def _auth_metadata_get(self, auth_id):
826 """
827 Call me with the metadata locked!
828
829 Check whether an auth metadata structure can be decoded by the current
830 version of CephFSVolumeClient.
831
832 Return auth metadata that the current version of CephFSVolumeClient
833 can decode.
834 """
835 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
836
837 if auth_metadata:
838 self._check_compat_version(auth_metadata['compat_version'])
839
840 return auth_metadata
841
842 def _auth_metadata_set(self, auth_id, data):
843 """
844 Call me with the metadata locked!
845
846 Fsync the auth metadata.
847
848 Add two version attributes to the auth metadata,
849 'compat_version', the minimum CephFSVolumeClient version that can
850 decode the metadata, and 'version', the CephFSVolumeClient version
851 that encoded the metadata.
852 """
853 data['compat_version'] = 1
854 data['version'] = 1
855 return self._metadata_set(self._auth_metadata_path(auth_id), data)
856
857 def _volume_metadata_path(self, volume_path):
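        # e.g. VolumePath("grp1", "vol1") yields "/volumes/_grp1:vol1.meta";
        # an ungrouped volume yields "/volumes/_:vol1.meta" (illustrative).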
858 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
859 volume_path.group_id if volume_path.group_id else "",
860 volume_path.volume_id,
861 META_FILE_EXT
862 ))
863
864 def _volume_lock(self, volume_path):
865 """
866 Return a ContextManager which locks the authorization metadata for
867 a particular volume, and persists a flag to the metadata indicating
868 that it is currently locked, so that we can detect dirty situations
869 during recovery.
870
871 This lock isn't just to make access to the metadata safe: it's also
872 designed to be used over the two-step process of checking the
873 metadata and then responding to an authorization request, to
874 ensure that at the point we respond the metadata hasn't changed
875 in the background. It's key to how we avoid security holes
876 resulting from races during that two-step process.
877 """
878 return self._lock(self._volume_metadata_path(volume_path))
879
880 def _volume_metadata_get(self, volume_path):
881 """
882 Call me with the metadata locked!
883
884 Check whether a volume metadata structure can be decoded by the current
885 version of CephFSVolumeClient.
886
887 Return a volume_metadata structure that the current version of
888 CephFSVolumeClient can decode.
889 """
890 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
891
892 if volume_metadata:
893 self._check_compat_version(volume_metadata['compat_version'])
894
895 return volume_metadata
896
897 def _volume_metadata_set(self, volume_path, data):
898 """
899 Call me with the metadata locked!
900
901 Add two version attributes to the volume metadata,
902 'compat_version', the minimum CephFSVolumeClient version that can
903 decode the metadata and 'version', the CephFSVolumeClient version
904 that encoded the metadata.
905 """
906 data['compat_version'] = 1
907 data['version'] = 1
908 return self._metadata_set(self._volume_metadata_path(volume_path), data)
909
910 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
911 """
912 Get-or-create a Ceph auth identity for `auth_id` and grant them access
913 to `volume_path`.
914 :param volume_path:
915 :param auth_id:
916 :param readonly:
917 :param tenant_id: Optionally provide a stringizable object to
918 restrict any created cephx IDs to other callers
919 passing the same tenant ID.
920 :return: dict with an 'auth_key' entry (None unless tenant_id was given)
921 """
922
923 with self._auth_lock(auth_id):
924 # Existing meta, or None, to be updated
925 auth_meta = self._auth_metadata_get(auth_id)
926
927 # volume data to be inserted
928 volume_path_str = str(volume_path)
929 volume = {
930 volume_path_str : {
931 # The access level at which the auth_id is authorized to
932 # access the volume.
933 'access_level': 'r' if readonly else 'rw',
934 'dirty': True,
935 }
936 }
937 if auth_meta is None:
938 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
939 auth_id, tenant_id
940 ))
941 log.debug("Authorize: no existing meta")
942 auth_meta = {
943 'dirty': True,
944 'tenant_id': tenant_id.__str__() if tenant_id else None,
945 'volumes': volume
946 }
947
948 # Note: this is *not* guaranteeing that the key doesn't already
949 # exist in Ceph: we are allowing VolumeClient tenants to
950 # 'claim' existing Ceph keys. In order to prevent VolumeClient
951 # tenants from reading e.g. client.admin keys, you need to
952 # have configured your VolumeClient user (e.g. Manila) to
953 # have mon auth caps that prevent it from accessing those keys
954 # (e.g. limit it to only access keys with a manila.* prefix)
955 else:
956 # Disallow tenants from sharing auth IDs
957 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
958 msg = "auth ID: {0} is already in use".format(auth_id)
959 log.error(msg)
960 raise CephFSVolumeClientError(msg)
961
962 if auth_meta['dirty']:
963 self._recover_auth_meta(auth_id, auth_meta)
964
965 log.debug("Authorize: existing tenant {tenant}".format(
966 tenant=auth_meta['tenant_id']
967 ))
968 auth_meta['dirty'] = True
969 auth_meta['volumes'].update(volume)
970
971 self._auth_metadata_set(auth_id, auth_meta)
972
973 with self._volume_lock(volume_path):
974 key = self._authorize_volume(volume_path, auth_id, readonly)
975
976 auth_meta['dirty'] = False
977 auth_meta['volumes'][volume_path_str]['dirty'] = False
978 self._auth_metadata_set(auth_id, auth_meta)
979
980 if tenant_id:
981 return {
982 'auth_key': key
983 }
984 else:
985 # Caller wasn't multi-tenant aware: be safe and don't give
986 # them a key
987 return {
988 'auth_key': None
989 }
990
991 def _authorize_volume(self, volume_path, auth_id, readonly):
992 vol_meta = self._volume_metadata_get(volume_path)
993
994 access_level = 'r' if readonly else 'rw'
995 auth = {
996 auth_id: {
997 'access_level': access_level,
998 'dirty': True,
999 }
1000 }
1001
1002 if vol_meta is None:
1003 vol_meta = {
1004 'auths': auth
1005 }
1006 else:
1007 vol_meta['auths'].update(auth)
1008 self._volume_metadata_set(volume_path, vol_meta)
1009
1010 key = self._authorize_ceph(volume_path, auth_id, readonly)
1011
1012 vol_meta['auths'][auth_id]['dirty'] = False
1013 self._volume_metadata_set(volume_path, vol_meta)
1014
1015 return key
1016
1017 def _authorize_ceph(self, volume_path, auth_id, readonly):
1018 path = self._get_path(volume_path)
1019 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1020 auth_id, path
1021 ))
1022
1023 # First I need to work out what the data pool is for this share:
1024 # read the layout
1025 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1026 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
1027
1028 # Now construct auth capabilities that give the guest just enough
1029 # permissions to access the share
1030 client_entity = "client.{0}".format(auth_id)
1031 want_access_level = 'r' if readonly else 'rw'
1032 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1033 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1034 want_access_level, pool_name, namespace)
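        # For a read/write guest these look like, e.g. (pool name is an
        # assumption): mds "allow rw path=/volumes/grp1/vol1" and
        # osd "allow rw pool=cephfs_data namespace=fsvolumens_vol1".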
1035
1036 try:
1037 existing = self._rados_command(
1038 'auth get',
1039 {
1040 'entity': client_entity
1041 }
1042 )
1043 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1044 except rados.Error:
1045 caps = self._rados_command(
1046 'auth get-or-create',
1047 {
1048 'entity': client_entity,
1049 'caps': [
1050 'mds', want_mds_cap,
1051 'osd', want_osd_cap,
1052 'mon', 'allow r']
1053 })
1054 else:
1055 # entity exists, update it
1056 cap = existing[0]
1057
1058 # Construct auth caps that if present might conflict with the desired
1059 # auth caps.
1060 unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1061 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1062 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1063 unwanted_access_level, pool_name, namespace)
1064
1065 def cap_update(orig, want, unwanted):
1066 # Updates the existing auth caps such that there is a single
1067 # occurrence of wanted auth caps and no occurrence of
1068 # conflicting auth caps.
1069
1070 cap_tokens = set(orig.split(","))
1071
1072 cap_tokens.discard(unwanted)
1073 cap_tokens.add(want)
1074
1075 return ",".join(cap_tokens)
1076
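            # Illustrative: cap_update("allow r path=/volumes/grp1/vol1",
            #                          "allow rw path=/volumes/grp1/vol1",
            #                          "allow r path=/volumes/grp1/vol1")
            # returns "allow rw path=/volumes/grp1/vol1"; token order is not
            # preserved because a set is used internally.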
1077 osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap)
1078 mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap)
1079
1080 caps = self._rados_command(
1081 'auth caps',
1082 {
1083 'entity': client_entity,
1084 'caps': [
1085 'mds', mds_cap_str,
1086 'osd', osd_cap_str,
1087 'mon', cap['caps'].get('mon', 'allow r')]
1088 })
1089 caps = self._rados_command(
1090 'auth get',
1091 {
1092 'entity': client_entity
1093 }
1094 )
1095
1096 # Result expected like this:
1097 # [
1098 # {
1099 # "entity": "client.foobar",
1100 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1101 # "caps": {
1102 # "mds": "allow *",
1103 # "mon": "allow *"
1104 # }
1105 # }
1106 # ]
1107 assert len(caps) == 1
1108 assert caps[0]['entity'] == client_entity
1109 return caps[0]['key']
1110
1111 def deauthorize(self, volume_path, auth_id):
1112 with self._auth_lock(auth_id):
1113 # Existing meta, or None, to be updated
1114 auth_meta = self._auth_metadata_get(auth_id)
1115
1116 volume_path_str = str(volume_path)
1117 if (auth_meta is None) or (not auth_meta['volumes']):
1118 log.warn("deauthorized called for already-removed auth "
1119 "ID '{auth_id}' for volume ID '{volume}'".format(
1120 auth_id=auth_id, volume=volume_path.volume_id
1121 ))
1122 # Clean up the auth meta file of an auth ID
1123 self.fs.unlink(self._auth_metadata_path(auth_id))
1124 return
1125
1126 if volume_path_str not in auth_meta['volumes']:
1127 log.warn("deauthorized called for already-removed auth "
1128 "ID '{auth_id}' for volume ID '{volume}'".format(
1129 auth_id=auth_id, volume=volume_path.volume_id
1130 ))
1131 return
1132
1133 if auth_meta['dirty']:
1134 self._recover_auth_meta(auth_id, auth_meta)
1135
1136 auth_meta['dirty'] = True
1137 auth_meta['volumes'][volume_path_str]['dirty'] = True
1138 self._auth_metadata_set(auth_id, auth_meta)
1139
1140 self._deauthorize_volume(volume_path, auth_id)
1141
1142 # Filter out the volume we're deauthorizing
1143 del auth_meta['volumes'][volume_path_str]
1144
1145 # Clean up auth meta file
1146 if not auth_meta['volumes']:
1147 self.fs.unlink(self._auth_metadata_path(auth_id))
1148 return
1149
1150 auth_meta['dirty'] = False
1151 self._auth_metadata_set(auth_id, auth_meta)
1152
1153 def _deauthorize_volume(self, volume_path, auth_id):
1154 with self._volume_lock(volume_path):
1155 vol_meta = self._volume_metadata_get(volume_path)
1156
1157 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1158 log.warn("deauthorized called for already-removed auth "
1159 "ID '{auth_id}' for volume ID '{volume}'".format(
1160 auth_id=auth_id, volume=volume_path.volume_id
1161 ))
1162 return
1163
1164 vol_meta['auths'][auth_id]['dirty'] = True
1165 self._volume_metadata_set(volume_path, vol_meta)
1166
1167 self._deauthorize(volume_path, auth_id)
1168
1169 # Remove the auth_id from the metadata *after* removing it
1170 # from ceph, so that if we crashed here, we would actually
1171 # recreate the auth ID during recovery (i.e. end up with
1172 # a consistent state).
1173
1174 # Filter out the auth we're removing
1175 del vol_meta['auths'][auth_id]
1176 self._volume_metadata_set(volume_path, vol_meta)
1177
1178 def _deauthorize(self, volume_path, auth_id):
1179 """
1180 The volume must still exist.
1181 """
1182 client_entity = "client.{0}".format(auth_id)
1183 path = self._get_path(volume_path)
1184 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1185 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
1186
1187 # The auth_id might have read-only or read-write mount access for the
1188 # volume path.
1189 access_levels = ('r', 'rw')
1190 want_mds_caps = {'allow {0} path={1}'.format(access_level, path)
1191 for access_level in access_levels}
1192 want_osd_caps = {'allow {0} pool={1} namespace={2}'.format(
1193 access_level, pool_name, namespace)
1194 for access_level in access_levels}
1195
1196 try:
1197 existing = self._rados_command(
1198 'auth get',
1199 {
1200 'entity': client_entity
1201 }
1202 )
1203
1204 def cap_remove(orig, want):
1205 cap_tokens = set(orig.split(","))
1206 return ",".join(cap_tokens.difference(want))
1207
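            # Illustrative: if the entity's mds cap is exactly
            # "allow rw path=/volumes/grp1/vol1", cap_remove() strips it and
            # leaves an empty string, which triggers the 'auth del' branch.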
1208 cap = existing[0]
1209 osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps)
1210 mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps)
1211 if (not osd_cap_str) and (not mds_cap_str):
1212 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1213 else:
1214 self._rados_command(
1215 'auth caps',
1216 {
1217 'entity': client_entity,
1218 'caps': [
1219 'mds', mds_cap_str,
1220 'osd', osd_cap_str,
1221 'mon', cap['caps'].get('mon', 'allow r')]
1222 })
1223
1224 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1225 except rados.Error:
1226 # Already gone, great.
1227 return
1228
1229 def get_authorized_ids(self, volume_path):
1230 """
1231 Expose a list of auth IDs that have access to a volume.
1232
1233 return: a list of (auth_id, access_level) tuples, where
1234 the access_level can be 'r' or 'rw'.
1235 None if no auth ID is given access to the volume.
1236 """
1237 with self._volume_lock(volume_path):
1238 meta = self._volume_metadata_get(volume_path)
1239 auths = []
1240 if not meta or not meta['auths']:
1241 return None
1242
1243 for auth, auth_data in meta['auths'].items():
1244 # Skip partial auth updates.
1245 if not auth_data['dirty']:
1246 auths.append((auth, auth_data['access_level']))
1247
1248 return auths
1249
1250 def _rados_command(self, prefix, args=None, decode=True):
1251 """
1252 Safer wrapper for ceph_argparse.json_command, which raises
1253 Error exception instead of relying on caller to check return
1254 codes.
1255
1256 Error exception can result from:
1257 * Timeout
1258 * Actual legitimate errors
1259 * Malformed JSON output
1260
1261 return: Decoded object from ceph, or None if empty string returned.
1262 If decode is False, return a string (the data returned by
1263 ceph command)
1264 """
1265 if args is None:
1266 args = {}
1267
1268 argdict = args.copy()
1269 argdict['format'] = 'json'
1270
1271 ret, outbuf, outs = json_command(self.rados,
1272 prefix=prefix,
1273 argdict=argdict,
1274 timeout=RADOS_TIMEOUT)
1275 if ret != 0:
1276 raise rados.Error(outs)
1277 else:
1278 if decode:
1279 if outbuf:
1280 try:
1281 return json.loads(outbuf)
1282 except (ValueError, TypeError):
1283 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1284 else:
1285 return None
1286 else:
1287 return outbuf
1288
1289 def get_used_bytes(self, volume_path):
1290 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes"))
1291
1292 def set_max_bytes(self, volume_path, max_bytes):
1293 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1294 max_bytes.__str__() if max_bytes is not None else "0",
1295 0)
1296
1297 def _snapshot_path(self, dir_path, snapshot_name):
1298 return os.path.join(
1299 dir_path, SNAP_DIR, snapshot_name
1300 )
1301
1302 def _snapshot_create(self, dir_path, snapshot_name):
1303 # TODO: raise intelligible exception for clusters where snaps are disabled
1304 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755)
1305
1306 def _snapshot_destroy(self, dir_path, snapshot_name):
1307 """
1308 Remove a snapshot, or do nothing if it doesn't exist.
1309 """
1310 try:
1311 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1312 except cephfs.ObjectNotFound:
1313 log.warn("Snapshot was already gone: {0}".format(snapshot_name))
1314
1315 def create_snapshot_volume(self, volume_path, snapshot_name):
1316 self._snapshot_create(self._get_path(volume_path), snapshot_name)
1317
1318 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1319 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1320
1321 def create_snapshot_group(self, group_id, snapshot_name):
1322 if group_id is None:
1323 raise RuntimeError("Group ID may not be None")
1324
1325 return self._snapshot_create(self._get_group_path(group_id), snapshot_name)
1326
1327 def destroy_snapshot_group(self, group_id, snapshot_name):
1328 if group_id is None:
1329 raise RuntimeError("Group ID may not be None")
1330 if snapshot_name is None:
1331 raise RuntimeError("Snapshot name may not be None")
1332
1333 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1334
1335 def _cp_r(self, src, dst):
1336 # TODO
1337 raise NotImplementedError()
1338
1339 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1340 dest_fs_path = self._get_path(dest_volume_path)
1341 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1342
1343 self._cp_r(src_snapshot_path, dest_fs_path)