1"""
2Copyright (C) 2015 Red Hat, Inc.
3
4LGPL2.1. See file COPYING.
5"""
6
7from contextlib import contextmanager
8import errno
9import fcntl
10import json
11import logging
12import os
13import re
14import struct
15import sys
16import threading
17import time
18import uuid
19
20from ceph_argparse import json_command
21
22import cephfs
23import rados
24
25def to_bytes(param):
26 '''
27 Helper method that returns byte representation of the given parameter.
28 '''
29 if isinstance(param, str):
30 return param.encode()
31 else:
32 return str(param).encode()
33
34class RadosError(Exception):
35 """
36 Something went wrong talking to Ceph with librados
37 """
38 pass
39
40
41RADOS_TIMEOUT = 10
42
43log = logging.getLogger(__name__)
44
45# Reserved volume group name which we use in paths for volumes
46# that are not assigned to a group (i.e. created with group=None)
47NO_GROUP_NAME = "_nogroup"
48
49# Filename extensions for meta files.
50META_FILE_EXT = ".meta"
51
52class VolumePath(object):
53 """
54 Identify a volume's path as group->volume
55 The Volume ID is a unique identifier, but this is a much more
56 helpful thing to pass around.
57 """
58 def __init__(self, group_id, volume_id):
59 self.group_id = group_id
60 self.volume_id = volume_id
61 assert self.group_id != NO_GROUP_NAME
62 assert self.volume_id != "" and self.volume_id is not None
63
64 def __str__(self):
65 return "{0}/{1}".format(self.group_id, self.volume_id)
66
67
68class ClusterTimeout(Exception):
69 """
70 Exception indicating that we timed out trying to talk to the Ceph cluster,
71 either to the mons, or to any individual daemon that the mons indicate ought
72 to be up but isn't responding to us.
73 """
74 pass
75
76
77class ClusterError(Exception):
78 """
79 Exception indicating that the cluster returned an error to a command that
80 we thought should be successful based on our last knowledge of the cluster
81 state.
82 """
83 def __init__(self, action, result_code, result_str):
84 self._action = action
85 self._result_code = result_code
86 self._result_str = result_str
87
88 def __str__(self):
89 return "Error {0} (\"{1}\") while {2}".format(
90 self._result_code, self._result_str, self._action)
91
92
93class RankEvicter(threading.Thread):
94 """
95 Thread for evicting client(s) from a particular MDS daemon instance.
96
97 This is more complex than simply sending a command, because we have to
98 handle cases where MDS daemons might not be fully up yet, and/or might
99 be transiently unresponsive to commands.
100 """
101 class GidGone(Exception):
102 pass
103
104 POLL_PERIOD = 5
105
106 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
107 """
108 :param client_spec: list of strings, used as filter arguments to "session evict"
109 pass ["id=123"] to evict a single client with session id 123.
110 """
111 self.rank = rank
112 self.gid = gid
113 self._mds_map = mds_map
114 self._client_spec = client_spec
115 self._volume_client = volume_client
116 self._ready_timeout = ready_timeout
117 self._ready_waited = 0
118
119 self.success = False
120 self.exception = None
121
122 super(RankEvicter, self).__init__()
123
124 def _ready_to_evict(self):
125 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
126 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
127 self._client_spec, self.rank, self.gid
128 ))
129 raise RankEvicter.GidGone()
130
131 info = self._mds_map['info']["gid_{0}".format(self.gid)]
132 log.debug("_ready_to_evict: state={0}".format(info['state']))
133 return info['state'] in ["up:active", "up:clientreplay"]
134
135 def _wait_for_ready(self):
136 """
137 Wait for that MDS rank to reach an active or clientreplay state, and
138 not be laggy.
139 """
140 while not self._ready_to_evict():
141 if self._ready_waited > self._ready_timeout:
142 raise ClusterTimeout()
143
144 time.sleep(self.POLL_PERIOD)
145 self._ready_waited += self.POLL_PERIOD
146
147 self._mds_map = self._volume_client.get_mds_map()
148
149 def _evict(self):
150 """
151 Run the eviction procedure. Return true on success, false on errors.
152 """
153
154 # Wait til the MDS is believed by the mon to be available for commands
155 try:
156 self._wait_for_ready()
157 except self.GidGone:
158 return True
159
160 # Then send it an evict
161 ret = errno.ETIMEDOUT
162 while ret == errno.ETIMEDOUT:
163 log.debug("mds_command: {0}, {1}".format(
164 "%s" % self.gid, ["session", "evict"] + self._client_spec
165 ))
166 ret, outb, outs = self._volume_client.fs.mds_command(
167 "%s" % self.gid,
168 [json.dumps({
169 "prefix": "session evict",
170 "filters": self._client_spec
171 })], "")
172 log.debug("mds_command: complete {0} {1}".format(ret, outs))
173
174 # If we get a clean response, great, it's gone from that rank.
175 if ret == 0:
176 return True
177 elif ret == errno.ETIMEDOUT:
178 # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
179 self._mds_map = self._volume_client.get_mds_map()
180 try:
181 self._wait_for_ready()
182 except self.GidGone:
183 return True
184 else:
185 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
186
187 def run(self):
188 try:
189 self._evict()
190 except Exception as e:
191 self.success = False
192 self.exception = e
193 else:
194 self.success = True
195
196
197class EvictionError(Exception):
198 pass
199
200
201class CephFSVolumeClientError(Exception):
202 """
203 Something went wrong talking to Ceph using CephFSVolumeClient.
204 """
205 pass
206
207
208CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
209
210 CephFSVolumeClient Version History:
211
212 * 1 - Initial version
213 * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
214 * 3 - Allow volumes to be created without RADOS namespace isolation
215 * 4 - Added get_object_and_version, put_object_versioned methods to CephFSVolumeClient
216"""
217
218
219class CephFSVolumeClient(object):
220 """
221 Combine libcephfs and librados interfaces to implement a
222 'Volume' concept implemented as a cephfs directory and
223 client capabilities which restrict mount access to this
224 directory.
225
226 Additionally, volumes may be in a 'Group'. Conveniently,
227 volumes are a lot like manila shares, and groups are a lot
228 like manila consistency groups.
229
230 Refer to volumes with VolumePath, which specifies the
231 volume and group IDs (both strings). The group ID may
232 be None.
233
234 In general, functions in this class are allowed to raise rados.Error
235 or cephfs.Error exceptions in unexpected situations.
236 """
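    # An illustrative usage sketch (not executed); the conf path, cluster,
    # auth IDs, tenant and volume names below are hypothetical:
    #
    #   vc = CephFSVolumeClient(auth_id='manila', conf_path='/etc/ceph/ceph.conf',
    #                           cluster_name='ceph')
    #   vc.connect()
    #   try:
    #       vp = VolumePath(None, 'share-0001')
    #       vc.create_volume(vp, size=10 * 2**30)
    #       auth = vc.authorize(vp, 'guest-client', readonly=False,
    #                           tenant_id='tenant-a')
    #       # auth['auth_key'] is the cephx key the guest mounts with
    #   finally:
    #       vc.disconnect()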
237
238 # Current version
239 version = 4
240
241 # Where shall we create our volumes?
242 POOL_PREFIX = "fsvolume_"
243 DEFAULT_VOL_PREFIX = "/volumes"
244 DEFAULT_NS_PREFIX = "fsvolumens_"
245
246 def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
247 volume_prefix=None, pool_ns_prefix=None, rados=None,
248 fs_name=None):
249 """
250 Either set all three of ``auth_id``, ``conf_path`` and
251 ``cluster_name`` (rados constructed on connect), or
252 set ``rados`` (existing rados instance).
253 """
254 self.fs = None
255 self.fs_name = fs_name
256 self.connected = False
257
258 self.conf_path = conf_path
259 self.cluster_name = cluster_name
260 self.auth_id = auth_id
261
262 self.rados = rados
263 if self.rados:
264 # Using an externally owned rados, so we won't tear it down
265 # on disconnect
266 self.own_rados = False
267 else:
268 # self.rados will be constructed in connect
269 self.own_rados = True
270
271 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
272 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
273 # For flock'ing in cephfs, I want a unique ID to distinguish me
274 # from any other manila-share services that are loading this module.
275 # We could use pid, but that's unnecessarily weak: generate a
276 # UUID
277 self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
278
279 # TODO: version the on-disk structures
280
281 def recover(self):
282 # Scan all auth keys to see if they're dirty: if they are, they have
283 # state that might not have propagated to Ceph or to the related
284 # volumes yet.
285
286 # Important: we *always* acquire locks in the order auth->volume
287 # That means a volume can never be dirty without the auth key
288 # we're updating it with being dirty at the same time.
289
290 # First list the auth IDs that have potentially dirty on-disk metadata
291 log.debug("Recovering from partial auth updates (if any)...")
292
293 try:
294 dir_handle = self.fs.opendir(self.volume_prefix)
295 except cephfs.ObjectNotFound:
296 log.debug("Nothing to recover. No auth meta files.")
297 return
298
299 d = self.fs.readdir(dir_handle)
300 auth_ids = []
301
302 if not d:
303 log.debug("Nothing to recover. No auth meta files.")
304
305 while d:
306 # Identify auth IDs from auth meta filenames. The auth meta files
307 # are named as, "$<auth_id><meta filename extension>"
308 regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
309 match = re.search(regex, d.d_name.decode(encoding='utf-8'))
310 if match:
311 auth_ids.append(match.group(1))
312
313 d = self.fs.readdir(dir_handle)
314
315 self.fs.closedir(dir_handle)
316
317 # Key points based on ordering:
318 # * Anything added in VMeta is already added in AMeta
319 # * Anything added in Ceph is already added in VMeta
320 # * Anything removed in VMeta is already removed in Ceph
321 # * Anything removed in AMeta is already removed in VMeta
322
323 # Deauthorization: because I only update metadata AFTER the
324 # update of the next level down, I have the same ordering of
325 # -> things which exist in the AMeta should also exist
326 # in the VMeta, should also exist in Ceph, and the same
327 # recovery procedure that gets me consistent after crashes
328 # during authorization will also work during deauthorization
329
330 # Now for each auth ID, check for dirty flag and apply updates
331 # if dirty flag is found
332 for auth_id in auth_ids:
333 with self._auth_lock(auth_id):
334 auth_meta = self._auth_metadata_get(auth_id)
335 if not auth_meta or not auth_meta['volumes']:
336 # Clean up auth meta file
337 self.fs.unlink(self._auth_metadata_path(auth_id))
338 continue
339 if not auth_meta['dirty']:
340 continue
341 self._recover_auth_meta(auth_id, auth_meta)
342
343 log.debug("Recovered from partial auth updates (if any).")
344
345 def _recover_auth_meta(self, auth_id, auth_meta):
346 """
347 Call me after locking the auth meta file.
348 """
349 remove_volumes = []
350
351 for volume, volume_data in auth_meta['volumes'].items():
352 if not volume_data['dirty']:
353 continue
354
355 (group_id, volume_id) = volume.split('/')
356 group_id = group_id if group_id != 'None' else None
357 volume_path = VolumePath(group_id, volume_id)
358 access_level = volume_data['access_level']
359
360 with self._volume_lock(volume_path):
361 vol_meta = self._volume_metadata_get(volume_path)
362
363 # No VMeta update indicates that there was no auth update
364 # in Ceph either. So it's safe to remove corresponding
365 # partial update in AMeta.
366 if not vol_meta or auth_id not in vol_meta['auths']:
367 remove_volumes.append(volume)
368 continue
369
370 want_auth = {
371 'access_level': access_level,
372 'dirty': False,
373 }
374 # VMeta update looks clean. Ceph auth update must have been
375 # clean.
376 if vol_meta['auths'][auth_id] == want_auth:
377 continue
378
379 readonly = True if access_level == 'r' else False
380 self._authorize_volume(volume_path, auth_id, readonly)
381
382 # Recovered from partial auth updates for the auth ID's access
383 # to a volume.
384 auth_meta['volumes'][volume]['dirty'] = False
385 self._auth_metadata_set(auth_id, auth_meta)
386
387 for volume in remove_volumes:
388 del auth_meta['volumes'][volume]
389
390 if not auth_meta['volumes']:
391 # Clean up auth meta file
392 self.fs.unlink(self._auth_metadata_path(auth_id))
393 return
394
395 # Recovered from all partial auth updates for the auth ID.
396 auth_meta['dirty'] = False
397 self._auth_metadata_set(auth_id, auth_meta)
398
399 def get_mds_map(self):
400 fs_map = self._rados_command("fs dump", {})
401 return fs_map['filesystems'][0]['mdsmap']
402
403 def evict(self, auth_id, timeout=30, volume_path=None):
404 """
405 Evict all clients based on the authorization ID and optionally based on
406 the volume path mounted. Assumes that the authorization key has been
407 revoked prior to calling this function.
408
409 This operation can throw an exception if the mon cluster is unresponsive, or
410 any individual MDS daemon is unresponsive for longer than the timeout passed in.
411 """
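        # Hypothetical example: after revoking the key for auth ID
        # 'guest-client', evict any of its clients still mounted on a
        # specific volume:
        #
        #   vc.evict('guest-client', timeout=30,
        #            volume_path=VolumePath(None, 'share-0001'))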
412
413 client_spec = ["auth_name={0}".format(auth_id), ]
414 if volume_path:
415 client_spec.append("client_metadata.root={0}".
416 format(self._get_path(volume_path)))
417
418 log.info("evict clients with {0}".format(', '.join(client_spec)))
419
420 mds_map = self.get_mds_map()
421 up = {}
422 for name, gid in mds_map['up'].items():
423 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
424 assert name.startswith("mds_")
425 up[int(name[4:])] = gid
426
427 # For all MDS ranks held by a daemon
428 # Do the parallelism in python instead of using "tell mds.*", because
429 # the latter doesn't give us per-mds output
430 threads = []
431 for rank, gid in up.items():
432 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
433 timeout)
434 thread.start()
435 threads.append(thread)
436
437 for t in threads:
438 t.join()
439
440 log.info("evict: joined all")
441
442 for t in threads:
443 if not t.success:
444 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
445 format(', '.join(client_spec), t.rank, t.gid, t.exception)
446 )
447 log.error(msg)
448 raise EvictionError(msg)
449
450 def _get_path(self, volume_path):
451 """
452 Determine the path within CephFS where this volume will live
453 :return: absolute path (string)
454 """
455 return os.path.join(
456 self.volume_prefix,
457 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
458 volume_path.volume_id)
459
460 def _get_group_path(self, group_id):
461 if group_id is None:
462 raise ValueError("group_id may not be None")
463
464 return os.path.join(
465 self.volume_prefix,
466 group_id
467 )
468
469 def _connect(self, premount_evict):
470 log.debug("Connecting to cephfs...")
471 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
472 log.debug("CephFS initializing...")
473 self.fs.init()
474 if premount_evict is not None:
475 log.debug("Premount eviction of {0} starting".format(premount_evict))
476 self.evict(premount_evict)
477 log.debug("Premount eviction of {0} completes".format(premount_evict))
478 log.debug("CephFS mounting...")
479 self.fs.mount(filesystem_name=self.fs_name)
480 log.debug("Connection to cephfs complete")
481
482 # Recover from partial auth updates due to a previous
483 # crash.
484 self.recover()
485
486 def connect(self, premount_evict = None):
487 """
488
489 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
490 may want to use this to specify their own auth ID if they expect
491 to be a unique instance and don't want to wait for caps to time
492 out after failure of another instance of themselves.
493 """
494 if self.own_rados:
495 log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
496 self.rados = rados.Rados(
497 name="client.{0}".format(self.auth_id),
498 clustername=self.cluster_name,
499 conffile=self.conf_path,
500 conf={}
501 )
502 if self.rados.state != "connected":
503 log.debug("Connecting to RADOS...")
504 self.rados.connect()
505 log.debug("Connection to RADOS complete")
506 self._connect(premount_evict)
507
508 def get_mon_addrs(self):
509 log.info("get_mon_addrs")
510 result = []
511 mon_map = self._rados_command("mon dump")
512 for mon in mon_map['mons']:
513 ip_port = mon['addr'].split("/")[0]
514 result.append(ip_port)
515
516 return result
517
518 def disconnect(self):
519 log.info("disconnect")
520 if self.fs:
521 log.debug("Disconnecting cephfs...")
522 self.fs.shutdown()
523 self.fs = None
524 log.debug("Disconnecting cephfs complete")
525
526 if self.rados and self.own_rados:
527 log.debug("Disconnecting rados...")
528 self.rados.shutdown()
529 self.rados = None
530 log.debug("Disconnecting rados complete")
531
532 def __enter__(self):
533 self.connect()
534 return self
535
536 def __exit__(self, exc_type, exc_val, exc_tb):
537 self.disconnect()
538
539 def __del__(self):
540 self.disconnect()
541
542 def _get_pool_id(self, osd_map, pool_name):
543 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
544 # like this are needed.
545 for pool in osd_map['pools']:
546 if pool['pool_name'] == pool_name:
547 return pool['pool']
548
549 return None
550
551 def _create_volume_pool(self, pool_name):
552 """
553 Idempotently create a pool for use as a CephFS data pool, with the given name
554
555 :return The ID of the created pool
556 """
557 osd_map = self._rados_command('osd dump', {})
558
559 existing_id = self._get_pool_id(osd_map, pool_name)
560 if existing_id is not None:
561 log.info("Pool {0} already exists".format(pool_name))
562 return existing_id
563
564 osd_count = len(osd_map['osds'])
565
566 # We can't query the actual cluster config remotely, but since this is
567 # just a heuristic we'll assume that the ceph.conf we have locally reflects
568 # that in use in the rest of the cluster.
569 pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))
570
571 other_pgs = 0
572 for pool in osd_map['pools']:
573 if not pool['pool_name'].startswith(self.POOL_PREFIX):
574 other_pgs += pool['pg_num']
575
576 # A basic heuristic for picking pg_num: work out the max number of
577 # PGs we can have without tripping a warning, then subtract the number
578 # of PGs already created by non-manila pools, then divide by ten. That'll
579 # give you a reasonable result on a system where you have "a few" manila
580 # shares.
581 pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) // 10
582 # TODO Alternatively, respect an override set by the user.
583
584 self._rados_command(
585 'osd pool create',
586 {
587 'pool': pool_name,
588 'pg_num': int(pg_num),
589 }
590 )
591
592 osd_map = self._rados_command('osd dump', {})
593 pool_id = self._get_pool_id(osd_map, pool_name)
594
595 if pool_id is None:
596 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
597 # removing it right after we created it.
598 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
599 pool_name, json.dumps(osd_map, indent=2)
600 ))
601 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
602 else:
603 return pool_id
604
605 def create_group(self, group_id, mode=0o755):
606 # Prevent craftily-named volume groups from colliding with the meta
607 # files.
608 if group_id.endswith(META_FILE_EXT):
609 raise ValueError("group ID cannot end with '{0}'.".format(
610 META_FILE_EXT))
611 path = self._get_group_path(group_id)
612 self._mkdir_p(path, mode)
613
614 def destroy_group(self, group_id):
615 path = self._get_group_path(group_id)
616 try:
617 self.fs.stat(self.volume_prefix)
618 except cephfs.ObjectNotFound:
619 pass
620 else:
621 self.fs.rmdir(path)
622
623 def _mkdir_p(self, path, mode=0o755):
624 try:
625 self.fs.stat(path)
626 except cephfs.ObjectNotFound:
627 pass
628 else:
629 return
630
631 parts = path.split(os.path.sep)
632
633 for i in range(1, len(parts) + 1):
634 subpath = os.path.join(*parts[0:i])
635 try:
636 self.fs.stat(subpath)
637 except cephfs.ObjectNotFound:
638 self.fs.mkdir(subpath, mode)
639
640 def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
641 mode=0o755):
642 """
643 Set up metadata, pools and auth for a volume.
644
645 This function is idempotent. It is safe to call this again
646 for an already-created volume, even if it is in use.
647
648 :param volume_path: VolumePath instance
649 :param size: In bytes, or None for no size limit
650 :param data_isolated: If true, create a separate OSD pool for this volume
651 :param namespace_isolated: If true, use separate RADOS namespace for this volume
652 :return:
653 """
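        # Hypothetical example: a 1 GiB volume in its own RADOS namespace,
        # sharing the filesystem's default data pool:
        #
        #   vc.create_volume(VolumePath('grp1', 'share-0001'), size=2**30,
        #                    data_isolated=False, namespace_isolated=True)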
654 path = self._get_path(volume_path)
655 log.info("create_volume: {0}".format(path))
656
657 self._mkdir_p(path, mode)
658
659 if size is not None:
660 self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)
661
662 # data_isolated means create a separate pool for this volume
663 if data_isolated:
664 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
665 log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
666 pool_id = self._create_volume_pool(pool_name)
667 mds_map = self.get_mds_map()
668 if pool_id not in mds_map['data_pools']:
669 self._rados_command("fs add_data_pool", {
670 'fs_name': mds_map['fs_name'],
671 'pool': pool_name
672 })
673 time.sleep(5) # time for MDSMap to be distributed
674 self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)
675
676 # enforce security isolation, use separate namespace for this volume
677 if namespace_isolated:
678 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
679 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
680 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
681 to_bytes(namespace), 0)
682 else:
683 # If volume's namespace layout is not set, then the volume's pool
684 # layout remains unset and will undesirably change with ancestor's
685 # pool layout changes.
686 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
687 self.fs.setxattr(path, 'ceph.dir.layout.pool',
688 to_bytes(pool_name), 0)
689
690 # Create a volume meta file, if it does not already exist, to store
691 # data about auth ids having access to the volume
692 fd = self.fs.open(self._volume_metadata_path(volume_path),
693 os.O_CREAT, 0o755)
694 self.fs.close(fd)
695
696 return {
697 'mount_path': path
698 }
699
700 def delete_volume(self, volume_path, data_isolated=False):
701 """
702 Make a volume inaccessible to guests. This function is
703 idempotent. This is the fast part of tearing down a volume: you must
704 also later call purge_volume, which is the slow part.
705
706 :param volume_path: Same identifier used in create_volume
707 :return:
708 """
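        # Hypothetical example of the two-step teardown described above:
        #
        #   vp = VolumePath(None, 'share-0001')
        #   vc.delete_volume(vp)   # fast: moves the data into the trash dir
        #   vc.purge_volume(vp)    # slow: recursively removes the trashed data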
709
710 path = self._get_path(volume_path)
711 log.info("delete_volume: {0}".format(path))
712
713 # Create the trash folder if it doesn't already exist
714 trash = os.path.join(self.volume_prefix, "_deleting")
715 self._mkdir_p(trash)
716
717 # We'll move it to here
718 trashed_volume = os.path.join(trash, volume_path.volume_id)
719
720 # Move the volume's data to the trash folder
721 try:
722 self.fs.stat(path)
723 except cephfs.ObjectNotFound:
724 log.warning("Trying to delete volume '{0}' but it's already gone".format(
725 path))
726 else:
727 self.fs.rename(path, trashed_volume)
728
729 # Delete the volume meta file, if it's not already deleted
730 vol_meta_path = self._volume_metadata_path(volume_path)
731 try:
732 self.fs.unlink(vol_meta_path)
733 except cephfs.ObjectNotFound:
734 pass
735
736 def purge_volume(self, volume_path, data_isolated=False):
737 """
738 Finish clearing up a volume that was previously passed to delete_volume. This
739 function is idempotent.
740 """
741
742 trash = os.path.join(self.volume_prefix, "_deleting")
743 trashed_volume = os.path.join(trash, volume_path.volume_id)
744
745 try:
746 self.fs.stat(trashed_volume)
747 except cephfs.ObjectNotFound:
748 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
749 trashed_volume))
750 return
751
752 def rmtree(root_path):
753 log.debug("rmtree {0}".format(root_path))
754 dir_handle = self.fs.opendir(root_path)
755 d = self.fs.readdir(dir_handle)
756 while d:
757 d_name = d.d_name.decode(encoding='utf-8')
758 if d_name not in [".", ".."]:
759 # Do not use os.path.join because it is sensitive
760 # to string encoding, we just pass through dnames
761 # as byte arrays
762 d_full = u"{0}/{1}".format(root_path, d_name)
763 if d.is_dir():
764 rmtree(d_full)
765 else:
766 self.fs.unlink(d_full)
767
768 d = self.fs.readdir(dir_handle)
769 self.fs.closedir(dir_handle)
770
771 self.fs.rmdir(root_path)
772
773 rmtree(trashed_volume)
774
775 if data_isolated:
776 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
777 osd_map = self._rados_command("osd dump", {})
778 pool_id = self._get_pool_id(osd_map, pool_name)
779 mds_map = self.get_mds_map()
780 if pool_id in mds_map['data_pools']:
781 self._rados_command("fs rm_data_pool", {
782 'fs_name': mds_map['fs_name'],
783 'pool': pool_name
784 })
785 self._rados_command("osd pool delete",
786 {
787 "pool": pool_name,
788 "pool2": pool_name,
789 "yes_i_really_really_mean_it": True
790 })
791
792 def _get_ancestor_xattr(self, path, attr):
793 """
794 Helper for reading layout information: if this xattr is missing
795 on the requested path, keep checking parents until we find it.
796 """
797 try:
798 result = self.fs.getxattr(path, attr).decode()
799 if result == "":
800 # Annoying! cephfs gives us empty instead of an error when attr not found
801 raise cephfs.NoData()
802 else:
803 return result
804 except cephfs.NoData:
805 if path == "/":
806 raise
807 else:
808 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
809
810 def _check_compat_version(self, compat_version):
811 if self.version < compat_version:
812 msg = ("The current version of CephFSVolumeClient, version {0} "
813 "does not support the required feature. Need version {1} "
814 "or greater".format(self.version, compat_version)
815 )
816 log.error(msg)
817 raise CephFSVolumeClientError(msg)
818
819 def _metadata_get(self, path):
820 """
821 Return a deserialized JSON object, or None
822 """
823 fd = self.fs.open(path, "r")
824 # TODO iterate instead of assuming file < 4MB
825 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
826 self.fs.close(fd)
827 if read_bytes:
828 return json.loads(read_bytes.decode())
829 else:
830 return None
831
832 def _metadata_set(self, path, data):
833 serialized = json.dumps(data)
834 fd = self.fs.open(path, "w")
835 try:
836 self.fs.write(fd, to_bytes(serialized), 0)
837 self.fs.fsync(fd, 0)
838 finally:
839 self.fs.close(fd)
840
841 def _lock(self, path):
842 @contextmanager
843 def fn():
844 while(1):
845 fd = self.fs.open(path, os.O_CREAT, 0o755)
846 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
847
848 # The locked file will be cleaned up sometime. It could be
849 # unlinked e.g., by another manila share instance, before
850 # lock was applied on it. Perform checks to ensure that this
851 # does not happen.
852 try:
853 statbuf = self.fs.stat(path)
854 except cephfs.ObjectNotFound:
855 self.fs.close(fd)
856 continue
857
858 fstatbuf = self.fs.fstat(fd)
859 if statbuf.st_ino == fstatbuf.st_ino:
860 break
861
862 try:
863 yield
864 finally:
865 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
866 self.fs.close(fd)
867
868 return fn()
869
870 def _auth_metadata_path(self, auth_id):
871 return os.path.join(self.volume_prefix, "${0}{1}".format(
872 auth_id, META_FILE_EXT))
873
874 def _auth_lock(self, auth_id):
875 return self._lock(self._auth_metadata_path(auth_id))
876
877 def _auth_metadata_get(self, auth_id):
878 """
879 Call me with the metadata locked!
880
881 Check whether an auth metadata structure can be decoded by the current
882 version of CephFSVolumeClient.
883
884 Return auth metadata that the current version of CephFSVolumeClient
885 can decode.
886 """
887 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
888
889 if auth_metadata:
890 self._check_compat_version(auth_metadata['compat_version'])
891
892 return auth_metadata
893
894 def _auth_metadata_set(self, auth_id, data):
895 """
896 Call me with the metadata locked!
897
898 Fsync the auth metadata.
899
900 Add two version attributes to the auth metadata,
901 'compat_version', the minimum CephFSVolumeClient version that can
902 decode the metadata, and 'version', the CephFSVolumeClient version
903 that encoded the metadata.
904 """
905 data['compat_version'] = 1
906 data['version'] = self.version
907 return self._metadata_set(self._auth_metadata_path(auth_id), data)
908
909 def _volume_metadata_path(self, volume_path):
910 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
911 volume_path.group_id if volume_path.group_id else "",
912 volume_path.volume_id,
913 META_FILE_EXT
914 ))
915
916 def _volume_lock(self, volume_path):
917 """
918 Return a ContextManager which locks the authorization metadata for
919 a particular volume, and persists a flag to the metadata indicating
920 that it is currently locked, so that we can detect dirty situations
921 during recovery.
922
923 This lock isn't just to make access to the metadata safe: it's also
924 designed to be used over the two-step process of checking the
925 metadata and then responding to an authorization request, to
926 ensure that at the point we respond the metadata hasn't changed
927 in the background. It's key to how we avoid security holes
928 resulting from races during that process.
929 """
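        # Sketch of the intended check-then-act pattern (illustrative only):
        #
        #   with self._volume_lock(volume_path):
        #       vol_meta = self._volume_metadata_get(volume_path)
        #       # ... decide based on vol_meta ...
        #       self._volume_metadata_set(volume_path, vol_meta)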
930 return self._lock(self._volume_metadata_path(volume_path))
931
932 def _volume_metadata_get(self, volume_path):
933 """
934 Call me with the metadata locked!
935
936 Check whether a volume metadata structure can be decoded by the current
937 version of CephFSVolumeClient.
938
939 Return a volume_metadata structure that the current version of
940 CephFSVolumeClient can decode.
941 """
942 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
943
944 if volume_metadata:
945 self._check_compat_version(volume_metadata['compat_version'])
946
947 return volume_metadata
948
949 def _volume_metadata_set(self, volume_path, data):
950 """
951 Call me with the metadata locked!
952
953 Add two version attributes to the volume metadata,
954 'compat_version', the minimum CephFSVolumeClient version that can
955 decode the metadata and 'version', the CephFSVolumeClient version
956 that encoded the metadata.
957 """
958 data['compat_version'] = 1
959 data['version'] = self.version
960 return self._metadata_set(self._volume_metadata_path(volume_path), data)
961
962 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
963 """
964 Get-or-create a Ceph auth identity for `auth_id` and grant them access
965 to
966 :param volume_path:
967 :param auth_id:
968 :param readonly:
969 :param tenant_id: Optionally provide a stringizable object to
970 restrict any created cephx IDs to other callers
971 passing the same tenant ID.
972 :return:
973 """
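        # Hypothetical example: grant read-write access and hand the returned
        # cephx key to the guest:
        #
        #   auth = vc.authorize(VolumePath(None, 'share-0001'), 'guest-client',
        #                       readonly=False, tenant_id='tenant-a')
        #   key = auth['auth_key']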
974
975 with self._auth_lock(auth_id):
976 # Existing meta, or None, to be updated
977 auth_meta = self._auth_metadata_get(auth_id)
978
979 # volume data to be inserted
980 volume_path_str = str(volume_path)
981 volume = {
982 volume_path_str : {
983 # The access level at which the auth_id is authorized to
984 # access the volume.
985 'access_level': 'r' if readonly else 'rw',
986 'dirty': True,
987 }
988 }
989 if auth_meta is None:
990 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
991 auth_id, tenant_id
992 ))
993 log.debug("Authorize: no existing meta")
994 auth_meta = {
995 'dirty': True,
996 'tenant_id': tenant_id.__str__() if tenant_id else None,
997 'volumes': volume
998 }
999
1000 # Note: this is *not* guaranteeing that the key doesn't already
1001 # exist in Ceph: we are allowing VolumeClient tenants to
1002 # 'claim' existing Ceph keys. In order to prevent VolumeClient
1003 # tenants from reading e.g. client.admin keys, you need to
1004 # have configured your VolumeClient user (e.g. Manila) to
1005 # have mon auth caps that prevent it from accessing those keys
1006 # (e.g. limit it to only access keys with a manila.* prefix)
1007 else:
1008 # Disallow tenants to share auth IDs
1009 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
1010 msg = "auth ID: {0} is already in use".format(auth_id)
1011 log.error(msg)
1012 raise CephFSVolumeClientError(msg)
1013
1014 if auth_meta['dirty']:
1015 self._recover_auth_meta(auth_id, auth_meta)
1016
1017 log.debug("Authorize: existing tenant {tenant}".format(
1018 tenant=auth_meta['tenant_id']
1019 ))
1020 auth_meta['dirty'] = True
1021 auth_meta['volumes'].update(volume)
1022
1023 self._auth_metadata_set(auth_id, auth_meta)
1024
1025 with self._volume_lock(volume_path):
1026 key = self._authorize_volume(volume_path, auth_id, readonly)
1027
1028 auth_meta['dirty'] = False
1029 auth_meta['volumes'][volume_path_str]['dirty'] = False
1030 self._auth_metadata_set(auth_id, auth_meta)
1031
1032 if tenant_id:
1033 return {
1034 'auth_key': key
1035 }
1036 else:
1037 # Caller wasn't multi-tenant aware: be safe and don't give
1038 # them a key
1039 return {
1040 'auth_key': None
1041 }
1042
1043 def _authorize_volume(self, volume_path, auth_id, readonly):
1044 vol_meta = self._volume_metadata_get(volume_path)
1045
1046 access_level = 'r' if readonly else 'rw'
1047 auth = {
1048 auth_id: {
1049 'access_level': access_level,
1050 'dirty': True,
1051 }
1052 }
1053
1054 if vol_meta is None:
1055 vol_meta = {
1056 'auths': auth
1057 }
1058 else:
1059 vol_meta['auths'].update(auth)
1060 self._volume_metadata_set(volume_path, vol_meta)
1061
1062 key = self._authorize_ceph(volume_path, auth_id, readonly)
1063
1064 vol_meta['auths'][auth_id]['dirty'] = False
1065 self._volume_metadata_set(volume_path, vol_meta)
1066
1067 return key
1068
1069 def _authorize_ceph(self, volume_path, auth_id, readonly):
1070 path = self._get_path(volume_path)
1071 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1072 auth_id, path
1073 ))
1074
1075 # First I need to work out what the data pool is for this share:
1076 # read the layout
1077 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1078
1079 try:
1080 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1081 "namespace").decode()
1082 except cephfs.NoData:
1083 namespace = None
1084
1085 # Now construct auth capabilities that give the guest just enough
1086 # permissions to access the share
1087 client_entity = "client.{0}".format(auth_id)
1088 want_access_level = 'r' if readonly else 'rw'
1089 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1090 if namespace:
1091 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1092 want_access_level, pool_name, namespace)
1093 else:
1094 want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
1095 pool_name)
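        # For a read-write guest on a namespace-isolated volume this yields
        # caps of roughly the following shape (values hypothetical):
        #   mds: 'allow rw path=/volumes/_nogroup/share-0001'
        #   osd: 'allow rw pool=cephfs_data namespace=fsvolumens_share-0001'
        #   mon: 'allow r'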
1096
1097 try:
1098 existing = self._rados_command(
1099 'auth get',
1100 {
1101 'entity': client_entity
1102 }
1103 )
1104 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1105 except rados.Error:
1106 caps = self._rados_command(
1107 'auth get-or-create',
1108 {
1109 'entity': client_entity,
1110 'caps': [
1111 'mds', want_mds_cap,
1112 'osd', want_osd_cap,
1113 'mon', 'allow r']
1114 })
1115 else:
1116 # entity exists, update it
1117 cap = existing[0]
1118
1119 # Construct auth caps that if present might conflict with the desired
1120 # auth caps.
1121 unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1122 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1123 if namespace:
1124 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1125 unwanted_access_level, pool_name, namespace)
1126 else:
1127 unwanted_osd_cap = 'allow {0} pool={1}'.format(
1128 unwanted_access_level, pool_name)
1129
1130 def cap_update(
1131 orig_mds_caps, orig_osd_caps, want_mds_cap,
1132 want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
1133
1134 if not orig_mds_caps:
1135 return want_mds_cap, want_osd_cap
1136
1137 mds_cap_tokens = orig_mds_caps.split(",")
1138 osd_cap_tokens = orig_osd_caps.split(",")
1139
1140 if want_mds_cap in mds_cap_tokens:
1141 return orig_mds_caps, orig_osd_caps
1142
1143 if unwanted_mds_cap in mds_cap_tokens:
1144 mds_cap_tokens.remove(unwanted_mds_cap)
1145 osd_cap_tokens.remove(unwanted_osd_cap)
1146
1147 mds_cap_tokens.append(want_mds_cap)
1148 osd_cap_tokens.append(want_osd_cap)
1149
1150 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1151
1152 orig_mds_caps = cap['caps'].get('mds', "")
1153 orig_osd_caps = cap['caps'].get('osd', "")
1154
1155 mds_cap_str, osd_cap_str = cap_update(
1156 orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
1157 unwanted_mds_cap, unwanted_osd_cap)
1158
1159 caps = self._rados_command(
1160 'auth caps',
1161 {
1162 'entity': client_entity,
1163 'caps': [
1164 'mds', mds_cap_str,
1165 'osd', osd_cap_str,
1166 'mon', cap['caps'].get('mon', 'allow r')]
1167 })
1168 caps = self._rados_command(
1169 'auth get',
1170 {
1171 'entity': client_entity
1172 }
1173 )
1174
1175 # Result expected like this:
1176 # [
1177 # {
1178 # "entity": "client.foobar",
1179 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1180 # "caps": {
1181 # "mds": "allow *",
1182 # "mon": "allow *"
1183 # }
1184 # }
1185 # ]
1186 assert len(caps) == 1
1187 assert caps[0]['entity'] == client_entity
1188 return caps[0]['key']
1189
1190 def deauthorize(self, volume_path, auth_id):
1191 with self._auth_lock(auth_id):
1192 # Existing meta, or None, to be updated
1193 auth_meta = self._auth_metadata_get(auth_id)
1194
1195 volume_path_str = str(volume_path)
1196 if (auth_meta is None) or (not auth_meta['volumes']):
1197 log.warning("deauthorized called for already-removed auth "
1198 "ID '{auth_id}' for volume ID '{volume}'".format(
1199 auth_id=auth_id, volume=volume_path.volume_id
1200 ))
1201 # Clean up the auth meta file of an auth ID
1202 self.fs.unlink(self._auth_metadata_path(auth_id))
1203 return
1204
1205 if volume_path_str not in auth_meta['volumes']:
1206 log.warning("deauthorized called for already-removed auth "
1207 "ID '{auth_id}' for volume ID '{volume}'".format(
1208 auth_id=auth_id, volume=volume_path.volume_id
1209 ))
1210 return
1211
1212 if auth_meta['dirty']:
1213 self._recover_auth_meta(auth_id, auth_meta)
1214
1215 auth_meta['dirty'] = True
1216 auth_meta['volumes'][volume_path_str]['dirty'] = True
1217 self._auth_metadata_set(auth_id, auth_meta)
1218
1219 self._deauthorize_volume(volume_path, auth_id)
1220
1221 # Filter out the volume we're deauthorizing
1222 del auth_meta['volumes'][volume_path_str]
1223
1224 # Clean up auth meta file
1225 if not auth_meta['volumes']:
1226 self.fs.unlink(self._auth_metadata_path(auth_id))
1227 return
1228
1229 auth_meta['dirty'] = False
1230 self._auth_metadata_set(auth_id, auth_meta)
1231
1232 def _deauthorize_volume(self, volume_path, auth_id):
1233 with self._volume_lock(volume_path):
1234 vol_meta = self._volume_metadata_get(volume_path)
1235
1236 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1237 log.warning("deauthorized called for already-removed auth "
1238 "ID '{auth_id}' for volume ID '{volume}'".format(
1239 auth_id=auth_id, volume=volume_path.volume_id
1240 ))
1241 return
1242
1243 vol_meta['auths'][auth_id]['dirty'] = True
1244 self._volume_metadata_set(volume_path, vol_meta)
1245
1246 self._deauthorize(volume_path, auth_id)
1247
1248 # Remove the auth_id from the metadata *after* removing it
1249 # from ceph, so that if we crashed here, we would actually
1250 # recreate the auth ID during recovery (i.e. end up with
1251 # a consistent state).
1252
1253 # Filter out the auth we're removing
1254 del vol_meta['auths'][auth_id]
1255 self._volume_metadata_set(volume_path, vol_meta)
1256
1257 def _deauthorize(self, volume_path, auth_id):
1258 """
1259 The volume must still exist.
1260 """
1261 client_entity = "client.{0}".format(auth_id)
1262 path = self._get_path(volume_path)
1263 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1264 try:
1265 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1266 "namespace").decode()
1267 except cephfs.NoData:
1268 namespace = None
1269
1270 # The auth_id might have read-only or read-write mount access for the
1271 # volume path.
1272 access_levels = ('r', 'rw')
1273 want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
1274 for access_level in access_levels]
1275 if namespace:
1276 want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
1277 for access_level in access_levels]
1278 else:
1279 want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
1280 for access_level in access_levels]
1281
1282
1283 try:
1284 existing = self._rados_command(
1285 'auth get',
1286 {
1287 'entity': client_entity
1288 }
1289 )
1290
1291 def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
1292 mds_cap_tokens = orig_mds_caps.split(",")
1293 osd_cap_tokens = orig_osd_caps.split(",")
1294
1295 for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
1296 if want_mds_cap in mds_cap_tokens:
1297 mds_cap_tokens.remove(want_mds_cap)
1298 osd_cap_tokens.remove(want_osd_cap)
1299 break
1300
1301 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1302
1303 cap = existing[0]
1304 orig_mds_caps = cap['caps'].get('mds', "")
1305 orig_osd_caps = cap['caps'].get('osd', "")
1306 mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
1307 want_mds_caps, want_osd_caps)
1308
1309 if not mds_cap_str:
1310 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1311 else:
1312 self._rados_command(
1313 'auth caps',
1314 {
1315 'entity': client_entity,
1316 'caps': [
1317 'mds', mds_cap_str,
1318 'osd', osd_cap_str,
1319 'mon', cap['caps'].get('mon', 'allow r')]
1320 })
1321
1322 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1323 except rados.Error:
1324 # Already gone, great.
1325 return
1326
1327 def get_authorized_ids(self, volume_path):
1328 """
1329 Expose a list of auth IDs that have access to a volume.
1330
1331 return: a list of (auth_id, access_level) tuples, where
1332 the access_level can be 'r' , or 'rw'.
1333 None if no auth ID is given access to the volume.
1334 """
1335 with self._volume_lock(volume_path):
1336 meta = self._volume_metadata_get(volume_path)
1337 auths = []
1338 if not meta or not meta['auths']:
1339 return None
1340
1341 for auth, auth_data in meta['auths'].items():
1342 # Skip partial auth updates.
1343 if not auth_data['dirty']:
1344 auths.append((auth, auth_data['access_level']))
1345
1346 return auths
1347
1348 def _rados_command(self, prefix, args=None, decode=True):
1349 """
1350 Safer wrapper for ceph_argparse.json_command, which raises
1351 Error exception instead of relying on caller to check return
1352 codes.
1353
1354 Error exception can result from:
1355 * Timeout
1356 * Actual legitimate errors
1357 * Malformed JSON output
1358
1359 return: Decoded object from ceph, or None if empty string returned.
1360 If decode is False, return a string (the data returned by
1361 ceph command)
1362 """
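        # For example, self._rados_command("osd dump", {}) returns the decoded
        # OSD map, while passing decode=False returns the raw output buffer.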
1363 if args is None:
1364 args = {}
1365
1366 argdict = args.copy()
1367 argdict['format'] = 'json'
1368
1369 ret, outbuf, outs = json_command(self.rados,
1370 prefix=prefix,
1371 argdict=argdict,
1372 timeout=RADOS_TIMEOUT)
1373 if ret != 0:
1374 raise rados.Error(outs)
1375 else:
1376 if decode:
1377 if outbuf:
1378 try:
1379 return json.loads(outbuf.decode())
1380 except (ValueError, TypeError):
1381 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1382 else:
1383 return None
1384 else:
1385 return outbuf
1386
1387 def get_used_bytes(self, volume_path):
1388 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
1389 "rbytes").decode())
1390
1391 def set_max_bytes(self, volume_path, max_bytes):
1392 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1393 to_bytes(max_bytes if max_bytes else 0), 0)
1394
1395 def _snapshot_path(self, dir_path, snapshot_name):
1396 return os.path.join(
1397 dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1398 )
1399
1400 def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
1401 # TODO: raise intelligible exception for clusters where snaps are disabled
1402 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)
1403
1404 def _snapshot_destroy(self, dir_path, snapshot_name):
1405 """
1406 Remove a snapshot, or do nothing if it already doesn't exist.
1407 """
1408 try:
1409 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1410 except cephfs.ObjectNotFound:
1411 log.warning("Snapshot was already gone: {0}".format(snapshot_name))
1412
1413 def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
1414 self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)
1415
1416 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1417 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1418
1419 def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
1420 if group_id is None:
1421 raise RuntimeError("Group ID may not be None")
1422
1423 return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
1424 mode)
1425
1426 def destroy_snapshot_group(self, group_id, snapshot_name):
1427 if group_id is None:
1428 raise RuntimeError("Group ID may not be None")
1429 if snapshot_name is None:
1430 raise RuntimeError("Snapshot name may not be None")
1431
1432 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1433
1434 def _cp_r(self, src, dst):
1435 # TODO
1436 raise NotImplementedError()
1437
1438 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1439 dest_fs_path = self._get_path(dest_volume_path)
1440 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1441
1442 self._cp_r(src_snapshot_path, dest_fs_path)
1443
1444 def put_object(self, pool_name, object_name, data):
1445 """
1446 Synchronously write data to an object.
1447
1448 :param pool_name: name of the pool
1449 :type pool_name: str
1450 :param object_name: name of the object
1451 :type object_name: str
1452 :param data: data to write
1453 :type data: bytes
1454 """
1455 return self.put_object_versioned(pool_name, object_name, data)
1456
1457 def put_object_versioned(self, pool_name, object_name, data, version=None):
1458 """
1459 Synchronously write data to an object, but only if the object's
1460 version matches the expected version.
1461
1462 :param pool_name: name of the pool
1463 :type pool_name: str
1464 :param object_name: name of the object
1465 :type object_name: str
1466 :param data: data to write
1467 :type data: bytes
1468 :param version: expected version of the object to write
1469 :type version: int
1470 """
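        # Hypothetical compare-and-swap usage together with
        # get_object_and_version():
        #
        #   data, version = vc.get_object_and_version('manila_data', 'obj-1')
        #   vc.put_object_versioned('manila_data', 'obj-1', new_data,
        #                           version=version)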
1471 ioctx = self.rados.open_ioctx(pool_name)
1472
1473 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1474 if len(data) > max_size:
1475 msg = ("Data to be written to object '{0}' exceeds "
1476 "{1} bytes".format(object_name, max_size))
1477 log.error(msg)
1478 raise CephFSVolumeClientError(msg)
1479
1480 try:
1481 with rados.WriteOpCtx() as wop:
1482 if version is not None:
1483 wop.assert_version(version)
1484 wop.write_full(data)
1485 ioctx.operate_write_op(wop, object_name)
1486 except rados.OSError as e:
1487 log.error(e)
1488 raise e
1489 finally:
1490 ioctx.close()
1491
1492 def get_object(self, pool_name, object_name):
1493 """
1494 Synchronously read data from object.
1495
1496 :param pool_name: name of the pool
1497 :type pool_name: str
1498 :param object_name: name of the object
1499 :type object_name: str
1500
1501 :returns: bytes - data read from object
1502 """
1503 return self.get_object_and_version(pool_name, object_name)[0]
1504
1505 def get_object_and_version(self, pool_name, object_name):
1506 """
1507 Synchronously read data from object and get its version.
1508
1509 :param pool_name: name of the pool
1510 :type pool_name: str
1511 :param object_name: name of the object
1512 :type object_name: str
1513
1514 :returns: tuple of object data and version
1515 """
1516 ioctx = self.rados.open_ioctx(pool_name)
1517 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1518 try:
1519 bytes_read = ioctx.read(object_name, max_size)
1520 if ((len(bytes_read) == max_size) and
1521 (ioctx.read(object_name, 1, offset=max_size))):
1522 log.warning("Size of object {0} exceeds '{1}' bytes "
1523 "read".format(object_name, max_size))
1524 obj_version = ioctx.get_last_version()
1525 finally:
1526 ioctx.close()
1527 return (bytes_read, obj_version)
1528
1529 def delete_object(self, pool_name, object_name):
1530 ioctx = self.rados.open_ioctx(pool_name)
1531 try:
1532 ioctx.remove_object(object_name)
1533 except rados.ObjectNotFound:
1534 log.warning("Object '{0}' was already removed".format(object_name))
1535 finally:
1536 ioctx.close()