ceph/src/pybind/ceph_volume_client.py
1 """
2 Copyright (C) 2015 Red Hat, Inc.
3
4 LGPL2. See file COPYING.
5 """
6
7 from contextlib import contextmanager
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import struct
15 import sys
16 import threading
17 import time
18 import uuid
19
20 from ceph_argparse import json_command
21
22 import cephfs
23 import rados
24
25
26 class RadosError(Exception):
27 """
28 Something went wrong talking to Ceph with librados
29 """
30 pass
31
32
33 RADOS_TIMEOUT = 10
34
35 log = logging.getLogger(__name__)
36
37
38 # Reserved volume group name which we use in paths for volumes
39 # that are not assigned to a group (i.e. created with group=None)
40 NO_GROUP_NAME = "_nogroup"
41
42 # Filename extensions for meta files.
43 META_FILE_EXT = ".meta"
44
45 class VolumePath(object):
46 """
47 Identify a volume's path as group->volume
48 The Volume ID is a unique identifier, but this is a much more
49 helpful thing to pass around.
50 """
51 def __init__(self, group_id, volume_id):
52 self.group_id = group_id
53 self.volume_id = volume_id
54 assert self.group_id != NO_GROUP_NAME
55 assert self.volume_id != "" and self.volume_id is not None
56
57 def __str__(self):
58 return "{0}/{1}".format(self.group_id, self.volume_id)
59
60
61 class ClusterTimeout(Exception):
62 """
63 Exception indicating that we timed out trying to talk to the Ceph cluster,
64 either to the mons, or to any individual daemon that the mons indicate ought
65 to be up but isn't responding to us.
66 """
67 pass
68
69
70 class ClusterError(Exception):
71 """
72 Exception indicating that the cluster returned an error to a command that
73 we thought should be successful based on our last knowledge of the cluster
74 state.
75 """
76 def __init__(self, action, result_code, result_str):
77 self._action = action
78 self._result_code = result_code
79 self._result_str = result_str
80
81 def __str__(self):
82 return "Error {0} (\"{1}\") while {2}".format(
83 self._result_code, self._result_str, self._action)
84
85
86 class RankEvicter(threading.Thread):
87 """
88 Thread for evicting client(s) from a particular MDS daemon instance.
89
90 This is more complex than simply sending a command, because we have to
91 handle cases where MDS daemons might not be fully up yet, and/or might
92 be transiently unresponsive to commands.
93 """
94 class GidGone(Exception):
95 pass
96
97 POLL_PERIOD = 5
98
99 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
100 """
101 :param client_spec: list of strings, used as filter arguments to "session evict";
102 pass ["id=123"] to evict a single client with session id 123.
103 """
104 self.rank = rank
105 self.gid = gid
106 self._mds_map = mds_map
107 self._client_spec = client_spec
108 self._volume_client = volume_client
109 self._ready_timeout = ready_timeout
110 self._ready_waited = 0
111
112 self.success = False
113 self.exception = None
114
115 super(RankEvicter, self).__init__()
116
117 def _ready_to_evict(self):
118 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
119 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
120 self._client_spec, self.rank, self.gid
121 ))
122 raise RankEvicter.GidGone()
123
124 info = self._mds_map['info']["gid_{0}".format(self.gid)]
125 log.debug("_ready_to_evict: state={0}".format(info['state']))
126 return info['state'] in ["up:active", "up:clientreplay"]
127
128 def _wait_for_ready(self):
129 """
130 Wait for that MDS rank to reach an active or clientreplay state, and
131 not be laggy.
132 """
133 while not self._ready_to_evict():
134 if self._ready_waited > self._ready_timeout:
135 raise ClusterTimeout()
136
137 time.sleep(self.POLL_PERIOD)
138 self._ready_waited += self.POLL_PERIOD
139
140 self._mds_map = self._volume_client._rados_command("mds dump", {})
141
142 def _evict(self):
143 """
144 Run the eviction procedure. Return true on success, false on errors.
145 """
146
147 # Wait until the MDS is believed by the mon to be available for commands
148 try:
149 self._wait_for_ready()
150 except self.GidGone:
151 return True
152
153 # Then send it an evict
154 ret = errno.ETIMEDOUT
155 while ret == errno.ETIMEDOUT:
156 log.debug("mds_command: {0}, {1}".format(
157 "%s" % self.gid, ["session", "evict"] + self._client_spec
158 ))
159 ret, outb, outs = self._volume_client.fs.mds_command(
160 "%s" % self.gid,
161 [json.dumps({
162 "prefix": "session evict",
163 "filters": self._client_spec
164 })], "")
165 log.debug("mds_command: complete {0} {1}".format(ret, outs))
166
167 # If we get a clean response, great, it's gone from that rank.
168 if ret == 0:
169 return True
170 elif ret == errno.ETIMEDOUT:
171 # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
172 self._mds_map = self._volume_client._rados_command("mds dump", {})
173 try:
174 self._wait_for_ready()
175 except self.GidGone:
176 return True
177 else:
178 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
179
180 def run(self):
181 try:
182 self._evict()
183 except Exception as e:
184 self.success = False
185 self.exception = e
186 else:
187 self.success = True
188
189
190 class EvictionError(Exception):
191 pass
192
193
194 class CephFSVolumeClientError(Exception):
195 """
196 Something went wrong talking to Ceph using CephFSVolumeClient.
197 """
198 pass
199
200
201 CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
202
203 CephFSVolumeClient Version History:
204
205 * 1 - Initial version
206 * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
207
208 """
209
210
211 class CephFSVolumeClient(object):
212 """
213 Combine libcephfs and librados interfaces to implement a
214 'Volume' concept implemented as a cephfs directory and
215 client capabilities which restrict mount access to this
216 directory.
217
218 Additionally, volumes may be in a 'Group'. Conveniently,
219 volumes are a lot like manila shares, and groups are a lot
220 like manila consistency groups.
221
222 Refer to volumes with VolumePath, which specifies the
223 volume and group IDs (both strings). The group ID may
224 be None.
225
226 In general, functions in this class are allowed to raise rados.Error
227 or cephfs.Error exceptions in unexpected situations.
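
    A rough usage sketch (illustrative only; the "manila" cephx ID, the config
    path and the volume/tenant names are assumptions, not part of this module):

        vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
        vc.connect()
        vp = VolumePath(None, "vol1")
        vc.create_volume(vp, size=10 * 1024 * 1024 * 1024)
        auth = vc.authorize(vp, "guest", tenant_id="tenant1")
        # hand auth['auth_key'] and the returned mount_path to the guest
        vc.deauthorize(vp, "guest")
        vc.evict("guest")
        vc.delete_volume(vp)
        vc.purge_volume(vp)
        vc.disconnect()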
228 """
229
230 # Current version
231 version = 2
232
233 # Where shall we create our volumes?
234 POOL_PREFIX = "fsvolume_"
235 DEFAULT_VOL_PREFIX = "/volumes"
236 DEFAULT_NS_PREFIX = "fsvolumens_"
237
238 def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None):
239 self.fs = None
240 self.rados = None
241 self.connected = False
242 self.conf_path = conf_path
243 self.cluster_name = cluster_name
244 self.auth_id = auth_id
245 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
246 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
247 # For flock'ing in cephfs, I want a unique ID to distinguish me
248 # from any other manila-share services that are loading this module.
249 # We could use pid, but that's unnecessarily weak: generate a
250 # UUID
251 self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]
252
253 # TODO: version the on-disk structures
254
255 def recover(self):
256 # Scan all auth keys to see if they're dirty: if they are, they have
257 # state that might not have propagated to Ceph or to the related
258 # volumes yet.
259
260 # Important: we *always* acquire locks in the order auth->volume
261 # That means a volume can never be dirty without the auth key
262 # we're updating it with being dirty at the same time.
263
264 # First list the auth IDs that have potentially dirty on-disk metadata
265 log.debug("Recovering from partial auth updates (if any)...")
266
267 try:
268 dir_handle = self.fs.opendir(self.volume_prefix)
269 except cephfs.ObjectNotFound:
270 log.debug("Nothing to recover. No auth meta files.")
271 return
272
273 d = self.fs.readdir(dir_handle)
274 auth_ids = []
275
276 if not d:
277 log.debug("Nothing to recover. No auth meta files.")
278
279 while d:
280 # Identify auth IDs from auth meta filenames. The auth meta files
281 # are named as "$<auth_id><meta filename extension>"
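            # (e.g. an auth meta file named "$guest.meta" yields the auth ID "guest")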
282 regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT))
283 match = re.search(regex, d.d_name)
284 if match:
285 auth_ids.append(match.group(1))
286
287 d = self.fs.readdir(dir_handle)
288
289 self.fs.closedir(dir_handle)
290
291 # Key points based on ordering:
292 # * Anything added in VMeta is already added in AMeta
293 # * Anything added in Ceph is already added in VMeta
294 # * Anything removed in VMeta is already removed in Ceph
295 # * Anything removed in AMeta is already removed in VMeta
296
297 # Deauthorization: because I only update metadata AFTER the
298 # update of the next level down, I have the same ordering guarantee:
299 # things which exist in the AMeta should also exist
300 # in the VMeta and should also exist in Ceph, so the same
301 # recovery procedure that gets me consistent after crashes
302 # during authorization will also work during deauthorization.
303
304 # Now for each auth ID, check for dirty flag and apply updates
305 # if dirty flag is found
306 for auth_id in auth_ids:
307 with self._auth_lock(auth_id):
308 auth_meta = self._auth_metadata_get(auth_id)
309 if not auth_meta or not auth_meta['volumes']:
310 # Clean up auth meta file
311 self.fs.unlink(self._auth_metadata_path(auth_id))
312 continue
313 if not auth_meta['dirty']:
314 continue
315 self._recover_auth_meta(auth_id, auth_meta)
316
317 log.debug("Recovered from partial auth updates (if any).")
318
319 def _recover_auth_meta(self, auth_id, auth_meta):
320 """
321 Call me after locking the auth meta file.
322 """
323 remove_volumes = []
324
325 for volume, volume_data in auth_meta['volumes'].items():
326 if not volume_data['dirty']:
327 continue
328
329 (group_id, volume_id) = volume.split('/')
330 group_id = group_id if group_id != 'None' else None
331 volume_path = VolumePath(group_id, volume_id)
332 access_level = volume_data['access_level']
333
334 with self._volume_lock(volume_path):
335 vol_meta = self._volume_metadata_get(volume_path)
336
337 # No VMeta update indicates that there was no auth update
338 # in Ceph either. So it's safe to remove corresponding
339 # partial update in AMeta.
340 if not vol_meta or auth_id not in vol_meta['auths']:
341 remove_volumes.append(volume)
342 continue
343
344 want_auth = {
345 'access_level': access_level,
346 'dirty': False,
347 }
348 # VMeta update looks clean. Ceph auth update must have been
349 # clean.
350 if vol_meta['auths'][auth_id] == want_auth:
351 continue
352
353 readonly = access_level == 'r'
354 self._authorize_volume(volume_path, auth_id, readonly)
355
356 # Recovered from partial auth updates for the auth ID's access
357 # to a volume.
358 auth_meta['volumes'][volume]['dirty'] = False
359 self._auth_metadata_set(auth_id, auth_meta)
360
361 for volume in remove_volumes:
362 del auth_meta['volumes'][volume]
363
364 if not auth_meta['volumes']:
365 # Clean up auth meta file
366 self.fs.unlink(self._auth_metadata_path(auth_id))
367 return
368
369 # Recovered from all partial auth updates for the auth ID.
370 auth_meta['dirty'] = False
371 self._auth_metadata_set(auth_id, auth_meta)
372
373
374 def evict(self, auth_id, timeout=30, volume_path=None):
375 """
376 Evict all clients based on the authorization ID and optionally based on
377 the volume path mounted. Assumes that the authorization key has been
378 revoked prior to calling this function.
379
380 This operation can throw an exception if the mon cluster is unresponsive, or
381 any individual MDS daemon is unresponsive for longer than the timeout passed in.
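
        For example (auth ID and volume path are illustrative), after the key
        has been revoked via deauthorize():

            vc.deauthorize(vp, "guest")
            vc.evict("guest", volume_path=vp)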
382 """
383
384 client_spec = ["auth_name={0}".format(auth_id), ]
385 if volume_path:
386 client_spec.append("client_metadata.root={0}".
387 format(self._get_path(volume_path)))
388
389 log.info("evict clients with {0}".format(', '.join(client_spec)))
390
391 mds_map = self._rados_command("mds dump", {})
392
393 up = {}
394 for name, gid in mds_map['up'].items():
395 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
396 assert name.startswith("mds_")
397 up[int(name[4:])] = gid
398
399 # For all MDS ranks held by a daemon
400 # Do the parallelism in python instead of using "tell mds.*", because
401 # the latter doesn't give us per-mds output
402 threads = []
403 for rank, gid in up.items():
404 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
405 timeout)
406 thread.start()
407 threads.append(thread)
408
409 for t in threads:
410 t.join()
411
412 log.info("evict: joined all")
413
414 for t in threads:
415 if not t.success:
416 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
417 format(', '.join(client_spec), t.rank, t.gid, t.exception)
418 )
419 log.error(msg)
420 raise EvictionError(msg)
421
422 def _get_path(self, volume_path):
423 """
424 Determine the path within CephFS where this volume will live
425 :return: absolute path (string)
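
        e.g. VolumePath(None, "vol1") maps to "/volumes/_nogroup/vol1" with
        the default volume prefix.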
426 """
427 return os.path.join(
428 self.volume_prefix,
429 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
430 volume_path.volume_id)
431
432 def _get_group_path(self, group_id):
433 if group_id is None:
434 raise ValueError("group_id may not be None")
435
436 return os.path.join(
437 self.volume_prefix,
438 group_id
439 )
440
441 def connect(self, premount_evict = None):
442 """
443
444 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
445 may want to use this to specify their own auth ID if they expect
446 to be a unique instance and don't want to wait for caps to time
447 out after failure of another instance of themselves.
448 """
449 log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
450 self.rados = rados.Rados(
451 name="client.{0}".format(self.auth_id),
452 clustername=self.cluster_name,
453 conffile=self.conf_path,
454 conf={}
455 )
456 self.rados.connect()
457
458 log.debug("Connection to RADOS complete")
459
460 log.debug("Connecting to cephfs...")
461 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
462 log.debug("CephFS initializing...")
463 self.fs.init()
464 if premount_evict is not None:
465 log.debug("Premount eviction of {0} starting".format(premount_evict))
466 self.evict(premount_evict)
467 log.debug("Premount eviction of {0} completes".format(premount_evict))
468 log.debug("CephFS mounting...")
469 self.fs.mount()
470 log.debug("Connection to cephfs complete")
471
472 # Recover from partial auth updates due to a previous
473 # crash.
474 self.recover()
475
476 def get_mon_addrs(self):
477 log.info("get_mon_addrs")
478 result = []
479 mon_map = self._rados_command("mon dump")
480 for mon in mon_map['mons']:
481 ip_port = mon['addr'].split("/")[0]
482 result.append(ip_port)
483
484 return result
485
486 def disconnect(self):
487 log.info("disconnect")
488 if self.fs:
489 log.debug("Disconnecting cephfs...")
490 self.fs.shutdown()
491 self.fs = None
492 log.debug("Disconnecting cephfs complete")
493
494 if self.rados:
495 log.debug("Disconnecting rados...")
496 self.rados.shutdown()
497 self.rados = None
498 log.debug("Disconnecting rados complete")
499
500 def __del__(self):
501 self.disconnect()
502
503 def _get_pool_id(self, osd_map, pool_name):
504 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
505 # like this are needed.
506 for pool in osd_map['pools']:
507 if pool['pool_name'] == pool_name:
508 return pool['pool']
509
510 return None
511
512 def _create_volume_pool(self, pool_name):
513 """
514 Idempotently create a pool for use as a CephFS data pool, with the given name
515
516 :return The ID of the created pool
517 """
518 osd_map = self._rados_command('osd dump', {})
519
520 existing_id = self._get_pool_id(osd_map, pool_name)
521 if existing_id is not None:
522 log.info("Pool {0} already exists".format(pool_name))
523 return existing_id
524
525 osd_count = len(osd_map['osds'])
526
527 # We can't query the actual cluster config remotely, but since this is
528 # just a heuristic we'll assume that the ceph.conf we have locally reflects
529 # that in use in the rest of the cluster.
530 pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))
531
532 other_pgs = 0
533 for pool in osd_map['pools']:
534 if not pool['pool_name'].startswith(self.POOL_PREFIX):
535 other_pgs += pool['pg_num']
536
537 # A basic heuristic for picking pg_num: work out the max number of
538 # PGs we can have without tripping a warning, then subtract the number
539 # of PGs already created by non-manila pools, then divide by ten. That'll
540 # give you a reasonable result on a system where you have "a few" manila
541 # shares.
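        # Worked example (illustrative numbers only): with mon_max_pg_per_osd=250,
        # 10 OSDs and 500 PGs in other pools, pg_num = ((250 * 10) - 500) / 10 = 200.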
542 pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10
543 # TODO Alternatively, respect an override set by the user.
544
545 self._rados_command(
546 'osd pool create',
547 {
548 'pool': pool_name,
549 'pg_num': pg_num
550 }
551 )
552
553 osd_map = self._rados_command('osd dump', {})
554 pool_id = self._get_pool_id(osd_map, pool_name)
555
556 if pool_id is None:
557 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
558 # removing it right after we created it.
559 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
560 pool_name, json.dumps(osd_map, indent=2)
561 ))
562 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
563 else:
564 return pool_id
565
566 def create_group(self, group_id):
567 # Prevent craftily-named volume groups from colliding with the meta
568 # files.
569 if group_id.endswith(META_FILE_EXT):
570 raise ValueError("group ID cannot end with '{0}'.".format(
571 META_FILE_EXT))
572 path = self._get_group_path(group_id)
573 self._mkdir_p(path)
574
575 def destroy_group(self, group_id):
576 path = self._get_group_path(group_id)
577 try:
578 self.fs.stat(self.volume_prefix)
579 except cephfs.ObjectNotFound:
580 pass
581 else:
582 self.fs.rmdir(path)
583
584 def _mkdir_p(self, path):
585 try:
586 self.fs.stat(path)
587 except cephfs.ObjectNotFound:
588 pass
589 else:
590 return
591
592 parts = path.split(os.path.sep)
593
594 for i in range(1, len(parts) + 1):
595 subpath = os.path.join(*parts[0:i])
596 try:
597 self.fs.stat(subpath)
598 except cephfs.ObjectNotFound:
599 self.fs.mkdir(subpath, 0o755)
600
601 def create_volume(self, volume_path, size=None, data_isolated=False):
602 """
603 Set up metadata, pools and auth for a volume.
604
605 This function is idempotent. It is safe to call this again
606 for an already-created volume, even if it is in use.
607
608 :param volume_path: VolumePath instance
609 :param size: In bytes, or None for no size limit
610 :param data_isolated: If true, create a separate OSD pool for this volume
611 :return:
612 """
613 path = self._get_path(volume_path)
614 log.info("create_volume: {0}".format(path))
615
616 self._mkdir_p(path)
617
618 if size is not None:
619 self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)
620
621 # data_isolated means create a separate pool for this volume
622 if data_isolated:
623 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
624 log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
625 pool_id = self._create_volume_pool(pool_name)
626 mds_map = self._rados_command("mds dump", {})
627 if pool_id not in mds_map['data_pools']:
628 self._rados_command("mds add_data_pool", {
629 'pool': pool_name
630 })
631 self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)
632
633 # enforce security isolation, use a separate namespace for this volume
634 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
635 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
636 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0)
637
638 # Create a volume meta file, if it does not already exist, to store
639 # data about auth ids having access to the volume
640 fd = self.fs.open(self._volume_metadata_path(volume_path),
641 os.O_CREAT, 0o755)
642 self.fs.close(fd)
643
644 return {
645 'mount_path': path
646 }
647
648 def delete_volume(self, volume_path, data_isolated=False):
649 """
650 Make a volume inaccessible to guests. This function is
651 idempotent. This is the fast part of tearing down a volume: you must
652 also later call purge_volume, which is the slow part.
653
654 :param volume_path: Same identifier used in create_volume
655 :return:
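
        A typical two-step teardown looks like this (sketch):

            vc.delete_volume(vp)   # fast: moves the volume into the trash dir
            vc.purge_volume(vp)    # slow: recursively removes the data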
656 """
657
658 path = self._get_path(volume_path)
659 log.info("delete_volume: {0}".format(path))
660
661 # Create the trash folder if it doesn't already exist
662 trash = os.path.join(self.volume_prefix, "_deleting")
663 self._mkdir_p(trash)
664
665 # We'll move it to here
666 trashed_volume = os.path.join(trash, volume_path.volume_id)
667
668 # Move the volume's data to the trash folder
669 try:
670 self.fs.stat(path)
671 except cephfs.ObjectNotFound:
672 log.warning("Trying to delete volume '{0}' but it's already gone".format(
673 path))
674 else:
675 self.fs.rename(path, trashed_volume)
676
677 # Delete the volume meta file, if it's not already deleted
678 vol_meta_path = self._volume_metadata_path(volume_path)
679 try:
680 self.fs.unlink(vol_meta_path)
681 except cephfs.ObjectNotFound:
682 pass
683
684 def purge_volume(self, volume_path, data_isolated=False):
685 """
686 Finish clearing up a volume that was previously passed to delete_volume. This
687 function is idempotent.
688 """
689
690 trash = os.path.join(self.volume_prefix, "_deleting")
691 trashed_volume = os.path.join(trash, volume_path.volume_id)
692
693 try:
694 self.fs.stat(trashed_volume)
695 except cephfs.ObjectNotFound:
696 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
697 trashed_volume))
698 return
699
700 def rmtree(root_path):
701 log.debug("rmtree {0}".format(root_path))
702 dir_handle = self.fs.opendir(root_path)
703 d = self.fs.readdir(dir_handle)
704 while d:
705 if d.d_name not in [".", ".."]:
706 # Do not use os.path.join because it is sensitive
707 # to string encoding, we just pass through dnames
708 # as byte arrays
709 d_full = "{0}/{1}".format(root_path, d.d_name)
710 if d.is_dir():
711 rmtree(d_full)
712 else:
713 self.fs.unlink(d_full)
714
715 d = self.fs.readdir(dir_handle)
716 self.fs.closedir(dir_handle)
717
718 self.fs.rmdir(root_path)
719
720 rmtree(trashed_volume)
721
722 if data_isolated:
723 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
724 osd_map = self._rados_command("osd dump", {})
725 pool_id = self._get_pool_id(osd_map, pool_name)
726 mds_map = self._rados_command("mds dump", {})
727 if pool_id in mds_map['data_pools']:
728 self._rados_command("mds remove_data_pool", {
729 'pool': pool_name
730 })
731 self._rados_command("osd pool delete",
732 {
733 "pool": pool_name,
734 "pool2": pool_name,
735 "sure": "--yes-i-really-really-mean-it"
736 })
737
738 def _get_ancestor_xattr(self, path, attr):
739 """
740 Helper for reading layout information: if this xattr is missing
741 on the requested path, keep checking parents until we find it.
742 """
743 try:
744 result = self.fs.getxattr(path, attr)
745 if result == "":
746 # Annoying! cephfs gives us empty instead of an error when attr not found
747 raise cephfs.NoData()
748 else:
749 return result
750 except cephfs.NoData:
751 if path == "/":
752 raise
753 else:
754 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
755
756 def _check_compat_version(self, compat_version):
757 if self.version < compat_version:
758 msg = ("The current version of CephFSVolumeClient, version {0} "
759 "does not support the required feature. Need version {1} "
760 "or greater".format(self.version, compat_version)
761 )
762 log.error(msg)
763 raise CephFSVolumeClientError(msg)
764
765 def _metadata_get(self, path):
766 """
767 Return a deserialized JSON object, or None
768 """
769 fd = self.fs.open(path, "r")
770 # TODO iterate instead of assuming file < 4MB
771 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
772 self.fs.close(fd)
773 if read_bytes:
774 return json.loads(read_bytes)
775 else:
776 return None
777
778 def _metadata_set(self, path, data):
779 serialized = json.dumps(data)
780 fd = self.fs.open(path, "w")
781 try:
782 self.fs.write(fd, serialized, 0)
783 self.fs.fsync(fd, 0)
784 finally:
785 self.fs.close(fd)
786
787 def _lock(self, path):
788 @contextmanager
789 def fn():
790 while(1):
791 fd = self.fs.open(path, os.O_CREAT, 0o755)
792 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
793
794 # The locked file will be cleaned up sometime. It could be
795 # unlinked, e.g. by another manila share instance, before the
796 # lock was applied to it. Perform checks to ensure that this
797 # does not happen.
798 try:
799 statbuf = self.fs.stat(path)
800 except cephfs.ObjectNotFound:
801 self.fs.close(fd)
802 continue
803
804 fstatbuf = self.fs.fstat(fd)
805 if statbuf.st_ino == fstatbuf.st_ino:
806 break
807
808 try:
809 yield
810 finally:
811 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
812 self.fs.close(fd)
813
814 return fn()
815
816 def _auth_metadata_path(self, auth_id):
817 return os.path.join(self.volume_prefix, "${0}{1}".format(
818 auth_id, META_FILE_EXT))
819
820 def _auth_lock(self, auth_id):
821 return self._lock(self._auth_metadata_path(auth_id))
822
823 def _auth_metadata_get(self, auth_id):
824 """
825 Call me with the metadata locked!
826
827 Check whether an auth metadata structure can be decoded by the current
828 version of CephFSVolumeClient.
829
830 Return auth metadata that the current version of CephFSVolumeClient
831 can decode.
832 """
833 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
834
835 if auth_metadata:
836 self._check_compat_version(auth_metadata['compat_version'])
837
838 return auth_metadata
839
840 def _auth_metadata_set(self, auth_id, data):
841 """
842 Call me with the metadata locked!
843
844 Fsync the auth metadata.
845
846 Add two version attributes to the auth metadata,
847 'compat_version', the minimum CephFSVolumeClient version that can
848 decode the metadata, and 'version', the CephFSVolumeClient version
849 that encoded the metadata.
850 """
851 data['compat_version'] = 1
852 data['version'] = self.version
853 return self._metadata_set(self._auth_metadata_path(auth_id), data)
854
855 def _volume_metadata_path(self, volume_path):
856 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
857 volume_path.group_id if volume_path.group_id else "",
858 volume_path.volume_id,
859 META_FILE_EXT
860 ))
861
862 def _volume_lock(self, volume_path):
863 """
864 Return a ContextManager which locks the authorization metadata for
865 a particular volume, and persists a flag to the metadata indicating
866 that it is currently locked, so that we can detect dirty situations
867 during recovery.
868
869 This lock isn't just to make access to the metadata safe: it's also
870 designed to be used over the two-step process of checking the
871 metadata and then responding to an authorization request, to
872 ensure that at the point we respond the metadata hasn't changed
873 in the background. It's key to how we avoid security holes
874 resulting from races during that process.
875 """
876 return self._lock(self._volume_metadata_path(volume_path))
877
878 def _volume_metadata_get(self, volume_path):
879 """
880 Call me with the metadata locked!
881
882 Check whether a volume metadata structure can be decoded by the current
883 version of CephFSVolumeClient.
884
885 Return a volume_metadata structure that the current version of
886 CephFSVolumeClient can decode.
887 """
888 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
889
890 if volume_metadata:
891 self._check_compat_version(volume_metadata['compat_version'])
892
893 return volume_metadata
894
895 def _volume_metadata_set(self, volume_path, data):
896 """
897 Call me with the metadata locked!
898
899 Add two version attributes to the volume metadata,
900 'compat_version', the minimum CephFSVolumeClient version that can
901 decode the metadata and 'version', the CephFSVolumeClient version
902 that encoded the metadata.
903 """
904 data['compat_version'] = 1
905 data['version'] = self.version
906 return self._metadata_set(self._volume_metadata_path(volume_path), data)
907
908 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
909 """
910 Get-or-create a Ceph auth identity for `auth_id` and grant them
911 access to the volume at `volume_path`.
912 :param volume_path:
913 :param auth_id:
914 :param readonly:
915 :param tenant_id: Optionally provide a stringizable object to
916 restrict any created cephx IDs to other callers
917 passing the same tenant ID.
918 :return:
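
        Example (names are illustrative):

            auth = vc.authorize(vp, "guest", readonly=False, tenant_id="tenant1")
            key = auth['auth_key']  # None when no tenant_id is supplied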
919 """
920
921 with self._auth_lock(auth_id):
922 # Existing meta, or None, to be updated
923 auth_meta = self._auth_metadata_get(auth_id)
924
925 # volume data to be inserted
926 volume_path_str = str(volume_path)
927 volume = {
928 volume_path_str : {
929 # The access level at which the auth_id is authorized to
930 # access the volume.
931 'access_level': 'r' if readonly else 'rw',
932 'dirty': True,
933 }
934 }
935 if auth_meta is None:
936 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
937 auth_id, tenant_id
938 ))
939 log.debug("Authorize: no existing meta")
940 auth_meta = {
941 'dirty': True,
942 'tenant_id': tenant_id.__str__() if tenant_id else None,
943 'volumes': volume
944 }
945
946 # Note: this is *not* guaranteeing that the key doesn't already
947 # exist in Ceph: we are allowing VolumeClient tenants to
948 # 'claim' existing Ceph keys. In order to prevent VolumeClient
949 # tenants from reading e.g. client.admin keys, you need to
950 # have configured your VolumeClient user (e.g. Manila) to
951 # have mon auth caps that prevent it from accessing those keys
952 # (e.g. limit it to only access keys with a manila.* prefix)
953 else:
954 # Disallow tenants to share auth IDs
955 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
956 msg = "auth ID: {0} is already in use".format(auth_id)
957 log.error(msg)
958 raise CephFSVolumeClientError(msg)
959
960 if auth_meta['dirty']:
961 self._recover_auth_meta(auth_id, auth_meta)
962
963 log.debug("Authorize: existing tenant {tenant}".format(
964 tenant=auth_meta['tenant_id']
965 ))
966 auth_meta['dirty'] = True
967 auth_meta['volumes'].update(volume)
968
969 self._auth_metadata_set(auth_id, auth_meta)
970
971 with self._volume_lock(volume_path):
972 key = self._authorize_volume(volume_path, auth_id, readonly)
973
974 auth_meta['dirty'] = False
975 auth_meta['volumes'][volume_path_str]['dirty'] = False
976 self._auth_metadata_set(auth_id, auth_meta)
977
978 if tenant_id:
979 return {
980 'auth_key': key
981 }
982 else:
983 # Caller wasn't multi-tenant aware: be safe and don't give
984 # them a key
985 return {
986 'auth_key': None
987 }
988
989 def _authorize_volume(self, volume_path, auth_id, readonly):
990 vol_meta = self._volume_metadata_get(volume_path)
991
992 access_level = 'r' if readonly else 'rw'
993 auth = {
994 auth_id: {
995 'access_level': access_level,
996 'dirty': True,
997 }
998 }
999
1000 if vol_meta is None:
1001 vol_meta = {
1002 'auths': auth
1003 }
1004 else:
1005 vol_meta['auths'].update(auth)
1006 self._volume_metadata_set(volume_path, vol_meta)
1007
1008 key = self._authorize_ceph(volume_path, auth_id, readonly)
1009
1010 vol_meta['auths'][auth_id]['dirty'] = False
1011 self._volume_metadata_set(volume_path, vol_meta)
1012
1013 return key
1014
1015 def _authorize_ceph(self, volume_path, auth_id, readonly):
1016 path = self._get_path(volume_path)
1017 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1018 auth_id, path
1019 ))
1020
1021 # First I need to work out what the data pool is for this share:
1022 # read the layout
1023 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1024 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
1025
1026 # Now construct auth capabilities that give the guest just enough
1027 # permissions to access the share
1028 client_entity = "client.{0}".format(auth_id)
1029 want_access_level = 'r' if readonly else 'rw'
1030 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1031 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1032 want_access_level, pool_name, namespace)
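        # For example (values are illustrative): auth_id='guest', an 'rw'
        # request for path '/volumes/_nogroup/vol1' in pool 'cephfs_data'
        # with namespace 'fsvolumens_vol1' produces:
        #   mds cap: 'allow rw path=/volumes/_nogroup/vol1'
        #   osd cap: 'allow rw pool=cephfs_data namespace=fsvolumens_vol1'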
1033
1034 try:
1035 existing = self._rados_command(
1036 'auth get',
1037 {
1038 'entity': client_entity
1039 }
1040 )
1041 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1042 except rados.Error:
1043 caps = self._rados_command(
1044 'auth get-or-create',
1045 {
1046 'entity': client_entity,
1047 'caps': [
1048 'mds', want_mds_cap,
1049 'osd', want_osd_cap,
1050 'mon', 'allow r']
1051 })
1052 else:
1053 # entity exists, update it
1054 cap = existing[0]
1055
1056 # Construct auth caps that if present might conflict with the desired
1057 # auth caps.
1058 unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1059 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1060 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1061 unwanted_access_level, pool_name, namespace)
1062
1063 def cap_update(orig, want, unwanted):
1064 # Updates the existing auth caps such that there is a single
1065 # occurrence of wanted auth caps and no occurrence of
1066 # conflicting auth caps.
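                # e.g. cap_update('allow r path=/volumes/_nogroup/vol1',
                #                 'allow rw path=/volumes/_nogroup/vol1',
                #                 'allow r path=/volumes/_nogroup/vol1')
                # returns 'allow rw path=/volumes/_nogroup/vol1'.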
1067
1068 if not orig:
1069 return want
1070
1071 cap_tokens = set(orig.split(","))
1072
1073 cap_tokens.discard(unwanted)
1074 cap_tokens.add(want)
1075
1076 return ",".join(cap_tokens)
1077
1078 osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap)
1079 mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap)
1080
1081 caps = self._rados_command(
1082 'auth caps',
1083 {
1084 'entity': client_entity,
1085 'caps': [
1086 'mds', mds_cap_str,
1087 'osd', osd_cap_str,
1088 'mon', cap['caps'].get('mon', 'allow r')]
1089 })
1090 caps = self._rados_command(
1091 'auth get',
1092 {
1093 'entity': client_entity
1094 }
1095 )
1096
1097 # Result expected like this:
1098 # [
1099 # {
1100 # "entity": "client.foobar",
1101 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1102 # "caps": {
1103 # "mds": "allow *",
1104 # "mon": "allow *"
1105 # }
1106 # }
1107 # ]
1108 assert len(caps) == 1
1109 assert caps[0]['entity'] == client_entity
1110 return caps[0]['key']
1111
1112 def deauthorize(self, volume_path, auth_id):
1113 with self._auth_lock(auth_id):
1114 # Existing meta, or None, to be updated
1115 auth_meta = self._auth_metadata_get(auth_id)
1116
1117 volume_path_str = str(volume_path)
1118 if (auth_meta is None) or (not auth_meta['volumes']):
1119 log.warn("deauthorized called for already-removed auth"
1120 "ID '{auth_id}' for volume ID '{volume}'".format(
1121 auth_id=auth_id, volume=volume_path.volume_id
1122 ))
1123 # Clean up the auth meta file of an auth ID
1124 self.fs.unlink(self._auth_metadata_path(auth_id))
1125 return
1126
1127 if volume_path_str not in auth_meta['volumes']:
1128 log.warn("deauthorized called for already-removed auth"
1129 "ID '{auth_id}' for volume ID '{volume}'".format(
1130 auth_id=auth_id, volume=volume_path.volume_id
1131 ))
1132 return
1133
1134 if auth_meta['dirty']:
1135 self._recover_auth_meta(auth_id, auth_meta)
1136
1137 auth_meta['dirty'] = True
1138 auth_meta['volumes'][volume_path_str]['dirty'] = True
1139 self._auth_metadata_set(auth_id, auth_meta)
1140
1141 self._deauthorize_volume(volume_path, auth_id)
1142
1143 # Filter out the volume we're deauthorizing
1144 del auth_meta['volumes'][volume_path_str]
1145
1146 # Clean up auth meta file
1147 if not auth_meta['volumes']:
1148 self.fs.unlink(self._auth_metadata_path(auth_id))
1149 return
1150
1151 auth_meta['dirty'] = False
1152 self._auth_metadata_set(auth_id, auth_meta)
1153
1154 def _deauthorize_volume(self, volume_path, auth_id):
1155 with self._volume_lock(volume_path):
1156 vol_meta = self._volume_metadata_get(volume_path)
1157
1158 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1159 log.warn("deauthorized called for already-removed auth"
1160 "ID '{auth_id}' for volume ID '{volume}'".format(
1161 auth_id=auth_id, volume=volume_path.volume_id
1162 ))
1163 return
1164
1165 vol_meta['auths'][auth_id]['dirty'] = True
1166 self._volume_metadata_set(volume_path, vol_meta)
1167
1168 self._deauthorize(volume_path, auth_id)
1169
1170 # Remove the auth_id from the metadata *after* removing it
1171 # from ceph, so that if we crashed here, we would actually
1172 # recreate the auth ID during recovery (i.e. end up with
1173 # a consistent state).
1174
1175 # Filter out the auth we're removing
1176 del vol_meta['auths'][auth_id]
1177 self._volume_metadata_set(volume_path, vol_meta)
1178
1179 def _deauthorize(self, volume_path, auth_id):
1180 """
1181 The volume must still exist.
1182 """
1183 client_entity = "client.{0}".format(auth_id)
1184 path = self._get_path(volume_path)
1185 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1186 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
1187
1188 # The auth_id might have read-only or read-write mount access for the
1189 # volume path.
1190 access_levels = ('r', 'rw')
1191 want_mds_caps = {'allow {0} path={1}'.format(access_level, path)
1192 for access_level in access_levels}
1193 want_osd_caps = {'allow {0} pool={1} namespace={2}'.format(
1194 access_level, pool_name, namespace)
1195 for access_level in access_levels}
1196
1197 try:
1198 existing = self._rados_command(
1199 'auth get',
1200 {
1201 'entity': client_entity
1202 }
1203 )
1204
1205 def cap_remove(orig, want):
1206 cap_tokens = set(orig.split(","))
1207 return ",".join(cap_tokens.difference(want))
1208
1209 cap = existing[0]
1210 osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps)
1211 mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps)
1212 if (not osd_cap_str) and (not mds_cap_str):
1213 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1214 else:
1215 self._rados_command(
1216 'auth caps',
1217 {
1218 'entity': client_entity,
1219 'caps': [
1220 'mds', mds_cap_str,
1221 'osd', osd_cap_str,
1222 'mon', cap['caps'].get('mon', 'allow r')]
1223 })
1224
1225 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1226 except rados.Error:
1227 # Already gone, great.
1228 return
1229
1230 def get_authorized_ids(self, volume_path):
1231 """
1232 Expose a list of auth IDs that have access to a volume.
1233
1234 return: a list of (auth_id, access_level) tuples, where
1235 the access_level can be 'r' or 'rw'.
1236 None if no auth ID is given access to the volume.
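
        e.g. [("guest", "rw"), ("backup", "r")] (IDs are illustrative).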
1237 """
1238 with self._volume_lock(volume_path):
1239 meta = self._volume_metadata_get(volume_path)
1240 auths = []
1241 if not meta or not meta['auths']:
1242 return None
1243
1244 for auth, auth_data in meta['auths'].items():
1245 # Skip partial auth updates.
1246 if not auth_data['dirty']:
1247 auths.append((auth, auth_data['access_level']))
1248
1249 return auths
1250
1251 def _rados_command(self, prefix, args=None, decode=True):
1252 """
1253 Safer wrapper for ceph_argparse.json_command, which raises
1254 Error exception instead of relying on caller to check return
1255 codes.
1256
1257 Error exception can result from:
1258 * Timeout
1259 * Actual legitimate errors
1260 * Malformed JSON output
1261
1262 return: Decoded object from ceph, or None if empty string returned.
1263 If decode is False, return a string (the data returned by
1264 ceph command)
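
        e.g. self._rados_command("mon dump") returns the decoded mon map as a
        dict, while passing decode=False returns the raw output string.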
1265 """
1266 if args is None:
1267 args = {}
1268
1269 argdict = args.copy()
1270 argdict['format'] = 'json'
1271
1272 ret, outbuf, outs = json_command(self.rados,
1273 prefix=prefix,
1274 argdict=argdict,
1275 timeout=RADOS_TIMEOUT)
1276 if ret != 0:
1277 raise rados.Error(outs)
1278 else:
1279 if decode:
1280 if outbuf:
1281 try:
1282 return json.loads(outbuf)
1283 except (ValueError, TypeError):
1284 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1285 else:
1286 return None
1287 else:
1288 return outbuf
1289
1290 def get_used_bytes(self, volume_path):
1291 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes"))
1292
1293 def set_max_bytes(self, volume_path, max_bytes):
1294 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1295 max_bytes.__str__() if max_bytes is not None else "0",
1296 0)
1297
1298 def _snapshot_path(self, dir_path, snapshot_name):
1299 return os.path.join(
1300 dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1301 )
1302
1303 def _snapshot_create(self, dir_path, snapshot_name):
1304 # TODO: raise intelligible exception for clusters where snaps are disabled
1305 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755)
1306
1307 def _snapshot_destroy(self, dir_path, snapshot_name):
1308 """
1309 Remove a snapshot, or do nothing if it already doesn't exist.
1310 """
1311 try:
1312 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1313 except cephfs.ObjectNotFound:
1314 log.warn("Snapshot was already gone: {0}".format(snapshot_name))
1315
1316 def create_snapshot_volume(self, volume_path, snapshot_name):
1317 self._snapshot_create(self._get_path(volume_path), snapshot_name)
1318
1319 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1320 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1321
1322 def create_snapshot_group(self, group_id, snapshot_name):
1323 if group_id is None:
1324 raise RuntimeError("Group ID may not be None")
1325
1326 return self._snapshot_create(self._get_group_path(group_id), snapshot_name)
1327
1328 def destroy_snapshot_group(self, group_id, snapshot_name):
1329 if group_id is None:
1330 raise RuntimeError("Group ID may not be None")
1331 if snapshot_name is None:
1332 raise RuntimeError("Snapshot name may not be None")
1333
1334 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1335
1336 def _cp_r(self, src, dst):
1337 # TODO
1338 raise NotImplementedError()
1339
1340 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1341 dest_fs_path = self._get_path(dest_volume_path)
1342 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1343
1344 self._cp_r(src_snapshot_path, dest_fs_path)
1345
1346 def put_object(self, pool_name, object_name, data):
1347 """
1348 Synchronously write data to an object.
1349
1350 :param pool_name: name of the pool
1351 :type pool_name: str
1352 :param object_name: name of the object
1353 :type object_name: str
1354 :param data: data to write
1355 :type data: bytes
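
        Example (pool and object names are illustrative):

            vc.put_object("manila_data", "shares_metadata", b"{}")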
1356 """
1357 ioctx = self.rados.open_ioctx(pool_name)
1358 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1359 if len(data) > max_size:
1360 msg = ("Data to be written to object '{0}' exceeds "
1361 "{1} bytes".format(object_name, max_size))
1362 log.error(msg)
1363 raise CephFSVolumeClientError(msg)
1364 try:
1365 ioctx.write_full(object_name, data)
1366 finally:
1367 ioctx.close()
1368
1369 def get_object(self, pool_name, object_name):
1370 """
1371 Synchronously read data from object.
1372
1373 :param pool_name: name of the pool
1374 :type pool_name: str
1375 :param object_name: name of the object
1376 :type object_name: str
1377
1378 :returns: bytes - data read from object
1379 """
1380 ioctx = self.rados.open_ioctx(pool_name)
1381 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1382 try:
1383 bytes_read = ioctx.read(object_name, max_size)
1384 if ((len(bytes_read) == max_size) and
1385 (ioctx.read(object_name, 1, offset=max_size))):
1386 log.warning("Size of object {0} exceeds '{1}' bytes "
1387 "read".format(object_name, max_size))
1388 finally:
1389 ioctx.close()
1390 return bytes_read
1391
1392 def delete_object(self, pool_name, object_name):
1393 ioctx = self.rados.open_ioctx(pool_name)
1394 try:
1395 ioctx.remove_object(object_name)
1396 except rados.ObjectNotFound:
1397 log.warn("Object '{0}' was already removed".format(object_name))
1398 finally:
1399 ioctx.close()