1 """
2 Copyright (C) 2015 Red Hat, Inc.
3
4 LGPL-2.1 or LGPL-3.0. See file COPYING.
5 """
6
7 from contextlib import contextmanager
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import struct
15 import sys
16 import threading
17 import time
18 import uuid
19
20 from ceph_argparse import json_command
21
22 import cephfs
23 import rados
24
25 def to_bytes(param):
26 '''
27 Helper method that returns byte representation of the given parameter.
28 '''
29 if isinstance(param, str):
30 return param.encode('utf-8')
31 elif param is None:
32 return param
33 else:
34 return str(param).encode('utf-8')
35
36 class RadosError(Exception):
37 """
38 Something went wrong talking to Ceph with librados
39 """
40 pass
41
42
43 RADOS_TIMEOUT = 10
44
45 log = logging.getLogger(__name__)
46
47 # Reserved volume group name which we use in paths for volumes
48 # that are not assigned to a group (i.e. created with group=None)
49 NO_GROUP_NAME = "_nogroup"
50
51 # Filename extensions for meta files.
52 META_FILE_EXT = ".meta"
53
54 class VolumePath(object):
55 """
56 Identify a volume's path as group->volume
57 The Volume ID is a unique identifier, but this is a much more
58 helpful thing to pass around.
59 """
60 def __init__(self, group_id, volume_id):
61 self.group_id = group_id
62 self.volume_id = volume_id
63 assert self.group_id != NO_GROUP_NAME
64 assert self.volume_id != "" and self.volume_id is not None
65
66 def __str__(self):
67 return "{0}/{1}".format(self.group_id, self.volume_id)
68
69
70 class ClusterTimeout(Exception):
71 """
72 Exception indicating that we timed out trying to talk to the Ceph cluster,
73 either to the mons, or to any individual daemon that the mons indicate ought
74 to be up but isn't responding to us.
75 """
76 pass
77
78
79 class ClusterError(Exception):
80 """
81 Exception indicating that the cluster returned an error to a command that
82 we thought should be successful based on our last knowledge of the cluster
83 state.
84 """
85 def __init__(self, action, result_code, result_str):
86 self._action = action
87 self._result_code = result_code
88 self._result_str = result_str
89
90 def __str__(self):
91 return "Error {0} (\"{1}\") while {2}".format(
92 self._result_code, self._result_str, self._action)
93
94
95 class RankEvicter(threading.Thread):
96 """
97 Thread for evicting client(s) from a particular MDS daemon instance.
98
99 This is more complex than simply sending a command, because we have to
100 handle cases where MDS daemons might not be fully up yet, and/or might
101 be transiently unresponsive to commands.
102 """
103 class GidGone(Exception):
104 pass
105
106 POLL_PERIOD = 5
107
108 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
109 """
110 :param client_spec: list of strings, used as filter arguments to "session evict";
111 pass ["id=123"] to evict a single client with session id 123.
112 """
113 self.rank = rank
114 self.gid = gid
115 self._mds_map = mds_map
116 self._client_spec = client_spec
117 self._volume_client = volume_client
118 self._ready_timeout = ready_timeout
119 self._ready_waited = 0
120
121 self.success = False
122 self.exception = None
123
124 super(RankEvicter, self).__init__()
125
126 def _ready_to_evict(self):
127 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
128 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
129 self._client_spec, self.rank, self.gid
130 ))
131 raise RankEvicter.GidGone()
132
133 info = self._mds_map['info']["gid_{0}".format(self.gid)]
134 log.debug("_ready_to_evict: state={0}".format(info['state']))
135 return info['state'] in ["up:active", "up:clientreplay"]
136
137 def _wait_for_ready(self):
138 """
139 Wait for that MDS rank to reach an active or clientreplay state, and
140 not be laggy.
141 """
142 while not self._ready_to_evict():
143 if self._ready_waited > self._ready_timeout:
144 raise ClusterTimeout()
145
146 time.sleep(self.POLL_PERIOD)
147 self._ready_waited += self.POLL_PERIOD
148
149 self._mds_map = self._volume_client.get_mds_map()
150
151 def _evict(self):
152 """
153 Run the eviction procedure. Return true on success, false on errors.
154 """
155
156 # Wait until the MDS is believed by the mon to be available for commands
157 try:
158 self._wait_for_ready()
159 except self.GidGone:
160 return True
161
162 # Then send it an evict
163 ret = errno.ETIMEDOUT
164 while ret == errno.ETIMEDOUT:
165 log.debug("mds_command: {0}, {1}".format(
166 "%s" % self.gid, ["session", "evict"] + self._client_spec
167 ))
168 ret, outb, outs = self._volume_client.fs.mds_command(
169 "%s" % self.gid,
170 [json.dumps({
171 "prefix": "session evict",
172 "filters": self._client_spec
173 })], "")
174 log.debug("mds_command: complete {0} {1}".format(ret, outs))
175
176 # If we get a clean response, great, it's gone from that rank.
177 if ret == 0:
178 return True
179 elif ret == errno.ETIMEDOUT:
180 # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
181 self._mds_map = self._volume_client.get_mds_map()
182 try:
183 self._wait_for_ready()
184 except self.GidGone:
185 return True
186 else:
187 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
188
189 def run(self):
190 try:
191 self._evict()
192 except Exception as e:
193 self.success = False
194 self.exception = e
195 else:
196 self.success = True
197
198
199 class EvictionError(Exception):
200 pass
201
202
203 class CephFSVolumeClientError(Exception):
204 """
205 Something went wrong talking to Ceph using CephFSVolumeClient.
206 """
207 pass
208
209
210 CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
211
212 CephFSVolumeClient Version History:
213
214 * 1 - Initial version
215 * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
216 * 3 - Allow volumes to be created without RADOS namespace isolation
217 * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
218 * 5 - Disallow authorize API for users not created by CephFSVolumeClient
219 """
220
221
222 class CephFSVolumeClient(object):
223 """
224 Combine libcephfs and librados interfaces to implement a
225 'Volume' concept implemented as a cephfs directory and
226 client capabilities which restrict mount access to this
227 directory.
228
229 Additionally, volumes may be in a 'Group'. Conveniently,
230 volumes are a lot like manila shares, and groups are a lot
231 like manila consistency groups.
232
233 Refer to volumes with VolumePath, which specifies the
234 volume and group IDs (both strings). The group ID may
235 be None.
236
237 In general, functions in this class are allowed to raise rados.Error
238 or cephfs.Error exceptions in unexpected situations.
239 """
240
241 # Current version
242 version = 5
243
244 # Where shall we create our volumes?
245 POOL_PREFIX = "fsvolume_"
246 DEFAULT_VOL_PREFIX = "/volumes"
247 DEFAULT_NS_PREFIX = "fsvolumens_"
248
249 def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
250 volume_prefix=None, pool_ns_prefix=None, rados=None,
251 fs_name=None):
252 """
253 Either set all three of ``auth_id``, ``conf_path`` and
254 ``cluster_name`` (rados constructed on connect), or
255 set ``rados`` (existing rados instance).
256 """
257 self.fs = None
258 self.fs_name = fs_name
259 self.connected = False
260
261 self.conf_path = conf_path
262 self.cluster_name = cluster_name
263 self.auth_id = auth_id
264
265 self.rados = rados
266 if self.rados:
267 # Using an externally owned rados, so we won't tear it down
268 # on disconnect
269 self.own_rados = False
270 else:
271 # self.rados will be constructed in connect
272 self.own_rados = True
273
274 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
275 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
276 # For flock'ing in cephfs, I want a unique ID to distinguish me
277 # from any other manila-share services that are loading this module.
278 # We could use pid, but that's unnecessarily weak: generate a
279 # UUID
280 self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
281
282 # TODO: version the on-disk structures
283
284 def recover(self):
285 # Scan all auth keys to see if they're dirty: if they are, they have
286 # state that might not have propagated to Ceph or to the related
287 # volumes yet.
288
289 # Important: we *always* acquire locks in the order auth->volume
290 # That means a volume can never be dirty without the auth key
291 # we're updating it with being dirty at the same time.
292
293 # First list the auth IDs that have potentially dirty on-disk metadata
294 log.debug("Recovering from partial auth updates (if any)...")
295
296 try:
297 dir_handle = self.fs.opendir(self.volume_prefix)
298 except cephfs.ObjectNotFound:
299 log.debug("Nothing to recover. No auth meta files.")
300 return
301
302 d = self.fs.readdir(dir_handle)
303 auth_ids = []
304
305 if not d:
306 log.debug("Nothing to recover. No auth meta files.")
307
308 while d:
309 # Identify auth IDs from auth meta filenames. The auth meta files
310 # are named as, "$<auth_id><meta filename extension>"
311 regex = "^\$(.*){0}$".format(re.escape(META_FILE_EXT))
312 match = re.search(regex, d.d_name.decode(encoding='utf-8'))
313 if match:
314 auth_ids.append(match.group(1))
315
316 d = self.fs.readdir(dir_handle)
317
318 self.fs.closedir(dir_handle)
319
320 # Key points based on ordering:
321 # * Anything added in VMeta is already added in AMeta
322 # * Anything added in Ceph is already added in VMeta
323 # * Anything removed in VMeta is already removed in Ceph
324 # * Anything removed in AMeta is already removed in VMeta
325
326 # Deauthorization: because I only update metadata AFTER the
327 # update of the next level down, I have the same ordering guarantee:
328 # things which exist in the AMeta should also exist
329 # in the VMeta and should also exist in Ceph, so the same
330 # recovery procedure that gets me consistent after crashes
331 # during authorization will also work during deauthorization.
332
333 # Now for each auth ID, check for dirty flag and apply updates
334 # if dirty flag is found
335 for auth_id in auth_ids:
336 with self._auth_lock(auth_id):
337 auth_meta = self._auth_metadata_get(auth_id)
338 if not auth_meta or not auth_meta['volumes']:
339 # Clean up auth meta file
340 self.fs.unlink(self._auth_metadata_path(auth_id))
341 continue
342 if not auth_meta['dirty']:
343 continue
344 self._recover_auth_meta(auth_id, auth_meta)
345
346 log.debug("Recovered from partial auth updates (if any).")
347
348 def _recover_auth_meta(self, auth_id, auth_meta):
349 """
350 Call me after locking the auth meta file.
351 """
352 remove_volumes = []
353
354 for volume, volume_data in auth_meta['volumes'].items():
355 if not volume_data['dirty']:
356 continue
357
358 (group_id, volume_id) = volume.split('/')
359 group_id = group_id if group_id != 'None' else None
360 volume_path = VolumePath(group_id, volume_id)
361 access_level = volume_data['access_level']
362
363 with self._volume_lock(volume_path):
364 vol_meta = self._volume_metadata_get(volume_path)
365
366 # No VMeta update indicates that there was no auth update
367 # in Ceph either. So it's safe to remove corresponding
368 # partial update in AMeta.
369 if not vol_meta or auth_id not in vol_meta['auths']:
370 remove_volumes.append(volume)
371 continue
372
373 want_auth = {
374 'access_level': access_level,
375 'dirty': False,
376 }
377 # VMeta update looks clean. Ceph auth update must have been
378 # clean.
379 if vol_meta['auths'][auth_id] == want_auth:
380 continue
381
382 readonly = access_level == 'r'
383 client_entity = "client.{0}".format(auth_id)
384 try:
385 existing_caps = self._rados_command(
386 'auth get',
387 {
388 'entity': client_entity
389 }
390 )
391 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
392 except rados.Error:
393 existing_caps = None
394 self._authorize_volume(volume_path, auth_id, readonly, existing_caps)
395
396 # Recovered from partial auth updates for the auth ID's access
397 # to a volume.
398 auth_meta['volumes'][volume]['dirty'] = False
399 self._auth_metadata_set(auth_id, auth_meta)
400
401 for volume in remove_volumes:
402 del auth_meta['volumes'][volume]
403
404 if not auth_meta['volumes']:
405 # Clean up auth meta file
406 self.fs.unlink(self._auth_metadata_path(auth_id))
407 return
408
409 # Recovered from all partial auth updates for the auth ID.
410 auth_meta['dirty'] = False
411 self._auth_metadata_set(auth_id, auth_meta)
412
413 def get_mds_map(self):
414 fs_map = self._rados_command("fs dump", {})
415 return fs_map['filesystems'][0]['mdsmap']
416
417 def evict(self, auth_id, timeout=30, volume_path=None):
418 """
419 Evict all clients based on the authorization ID and optionally based on
420 the volume path mounted. Assumes that the authorization key has been
421 revoked prior to calling this function.
422
423 This operation can throw an exception if the mon cluster is unresponsive, or
424 any individual MDS daemon is unresponsive for longer than the timeout passed in.
425 """
426
427 client_spec = ["auth_name={0}".format(auth_id), ]
428 if volume_path:
429 client_spec.append("client_metadata.root={0}".
430 format(self._get_path(volume_path)))
431
432 log.info("evict clients with {0}".format(', '.join(client_spec)))
433
434 mds_map = self.get_mds_map()
435 up = {}
436 for name, gid in mds_map['up'].items():
437 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
438 assert name.startswith("mds_")
439 up[int(name[4:])] = gid
440
441 # For all MDS ranks held by a daemon
442 # Do the parallelism in python instead of using "tell mds.*", because
443 # the latter doesn't give us per-mds output
444 threads = []
445 for rank, gid in up.items():
446 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
447 timeout)
448 thread.start()
449 threads.append(thread)
450
451 for t in threads:
452 t.join()
453
454 log.info("evict: joined all")
455
456 for t in threads:
457 if not t.success:
458 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
459 format(', '.join(client_spec), t.rank, t.gid, t.exception)
460 )
461 log.error(msg)
462 raise EvictionError(msg)
463
464 def _get_path(self, volume_path):
465 """
466 Determine the path within CephFS where this volume will live
467 :return: absolute path (string)
468 """
469 return os.path.join(
470 self.volume_prefix,
471 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
472 volume_path.volume_id)
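# Example of the resulting layout (with the default "/volumes" prefix):
#     VolumePath("grp1", "vol1") -> /volumes/grp1/vol1
#     VolumePath(None, "vol1")   -> /volumes/_nogroup/vol1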
473
474 def _get_group_path(self, group_id):
475 if group_id is None:
476 raise ValueError("group_id may not be None")
477
478 return os.path.join(
479 self.volume_prefix,
480 group_id
481 )
482
483 def _connect(self, premount_evict):
484 log.debug("Connecting to cephfs...")
485 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
486 log.debug("CephFS initializing...")
487 self.fs.init()
488 if premount_evict is not None:
489 log.debug("Premount eviction of {0} starting".format(premount_evict))
490 self.evict(premount_evict)
491 log.debug("Premount eviction of {0} completes".format(premount_evict))
492 log.debug("CephFS mounting...")
493 self.fs.mount(filesystem_name=to_bytes(self.fs_name))
494 log.debug("Connection to cephfs complete")
495
496 # Recover from partial auth updates due to a previous
497 # crash.
498 self.recover()
499
500 def connect(self, premount_evict=None):
501 """
502
503 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
504 may want to use this to specify their own auth ID if they expect
505 to be a unique instance and don't want to wait for caps to time
506 out after failure of another instance of themselves.
507 """
508 if self.own_rados:
509 log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
510 self.rados = rados.Rados(
511 name="client.{0}".format(self.auth_id),
512 clustername=self.cluster_name,
513 conffile=self.conf_path,
514 conf={}
515 )
516 if self.rados.state != "connected":
517 log.debug("Connecting to RADOS...")
518 self.rados.connect()
519 log.debug("Connection to RADOS complete")
520 self._connect(premount_evict)
521
522 def get_mon_addrs(self):
523 log.info("get_mon_addrs")
524 result = []
525 mon_map = self._rados_command("mon dump")
526 for mon in mon_map['mons']:
527 ip_port = mon['addr'].split("/")[0]
528 result.append(ip_port)
529
530 return result
531
532 def disconnect(self):
533 log.info("disconnect")
534 if self.fs:
535 log.debug("Disconnecting cephfs...")
536 self.fs.shutdown()
537 self.fs = None
538 log.debug("Disconnecting cephfs complete")
539
540 if self.rados and self.own_rados:
541 log.debug("Disconnecting rados...")
542 self.rados.shutdown()
543 self.rados = None
544 log.debug("Disconnecting rados complete")
545
546 def __enter__(self):
547 self.connect()
548 return self
549
550 def __exit__(self, exc_type, exc_val, exc_tb):
551 self.disconnect()
552
553 def __del__(self):
554 self.disconnect()
555
556 def _get_pool_id(self, osd_map, pool_name):
557 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
558 # like this are needed.
559 for pool in osd_map['pools']:
560 if pool['pool_name'] == pool_name:
561 return pool['pool']
562
563 return None
564
565 def _create_volume_pool(self, pool_name):
566 """
567 Idempotently create a pool for use as a CephFS data pool, with the given name
568
569 :return: The ID of the created pool
570 """
571 osd_map = self._rados_command('osd dump', {})
572
573 existing_id = self._get_pool_id(osd_map, pool_name)
574 if existing_id is not None:
575 log.info("Pool {0} already exists".format(pool_name))
576 return existing_id
577
578 self._rados_command(
579 'osd pool create',
580 {
581 'pool': pool_name,
582 }
583 )
584
585 osd_map = self._rados_command('osd dump', {})
586 pool_id = self._get_pool_id(osd_map, pool_name)
587
588 if pool_id is None:
589 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
590 # removing it right after we created it.
591 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
592 pool_name, json.dumps(osd_map, indent=2)
593 ))
594 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
595 else:
596 return pool_id
597
598 def create_group(self, group_id, mode=0o755):
599 # Prevent craftily-named volume groups from colliding with the meta
600 # files.
601 if group_id.endswith(META_FILE_EXT):
602 raise ValueError("group ID cannot end with '{0}'.".format(
603 META_FILE_EXT))
604 path = self._get_group_path(group_id)
605 self._mkdir_p(path, mode)
606
607 def destroy_group(self, group_id):
608 path = self._get_group_path(group_id)
609 try:
610 self.fs.stat(self.volume_prefix)
611 except cephfs.ObjectNotFound:
612 pass
613 else:
614 self.fs.rmdir(path)
615
616 def _mkdir_p(self, path, mode=0o755):
617 try:
618 self.fs.stat(path)
619 except cephfs.ObjectNotFound:
620 pass
621 else:
622 return
623
624 parts = path.split(os.path.sep)
625
626 for i in range(1, len(parts) + 1):
627 subpath = os.path.join(*parts[0:i])
628 try:
629 self.fs.stat(subpath)
630 except cephfs.ObjectNotFound:
631 self.fs.mkdir(subpath, mode)
632
633 def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
634 mode=0o755):
635 """
636 Set up metadata, pools and auth for a volume.
637
638 This function is idempotent. It is safe to call this again
639 for an already-created volume, even if it is in use.
640
641 :param volume_path: VolumePath instance
642 :param size: In bytes, or None for no size limit
643 :param data_isolated: If true, create a separate OSD pool for this volume
644 :param namespace_isolated: If true, use separate RADOS namespace for this volume
645 :return:
646 """
647 path = self._get_path(volume_path)
648 log.info("create_volume: {0}".format(path))
649
650 self._mkdir_p(path, mode)
651
652 if size is not None:
653 self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)
654
655 # data_isolated means create a separate pool for this volume
656 if data_isolated:
657 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
658 log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
659 pool_id = self._create_volume_pool(pool_name)
660 mds_map = self.get_mds_map()
661 if pool_id not in mds_map['data_pools']:
662 self._rados_command("fs add_data_pool", {
663 'fs_name': mds_map['fs_name'],
664 'pool': pool_name
665 })
666 time.sleep(5) # time for MDSMap to be distributed
667 self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)
668
669 # enforce security isolation, use separate namespace for this volume
670 if namespace_isolated:
671 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
672 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
673 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
674 to_bytes(namespace), 0)
675 else:
676 # If volume's namespace layout is not set, then the volume's pool
677 # layout remains unset and will undesirably change with ancestor's
678 # pool layout changes.
679 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
680 self.fs.setxattr(path, 'ceph.dir.layout.pool',
681 to_bytes(pool_name), 0)
682
683 # Create a volume meta file, if it does not already exist, to store
684 # data about auth ids having access to the volume
685 fd = self.fs.open(self._volume_metadata_path(volume_path),
686 os.O_CREAT, 0o755)
687 self.fs.close(fd)
688
689 return {
690 'mount_path': path
691 }
692
693 def delete_volume(self, volume_path, data_isolated=False):
694 """
695 Make a volume inaccessible to guests. This function is
696 idempotent. This is the fast part of tearing down a volume: you must
697 also later call purge_volume, which is the slow part.
698
699 :param volume_path: Same identifier used in create_volume
700 :return:
701 """
702
703 path = self._get_path(volume_path)
704 log.info("delete_volume: {0}".format(path))
705
706 # Create the trash folder if it doesn't already exist
707 trash = os.path.join(self.volume_prefix, "_deleting")
708 self._mkdir_p(trash)
709
710 # We'll move it to here
711 trashed_volume = os.path.join(trash, volume_path.volume_id)
712
713 # Move the volume's data to the trash folder
714 try:
715 self.fs.stat(path)
716 except cephfs.ObjectNotFound:
717 log.warning("Trying to delete volume '{0}' but it's already gone".format(
718 path))
719 else:
720 self.fs.rename(path, trashed_volume)
721
722 # Delete the volume meta file, if it's not already deleted
723 vol_meta_path = self._volume_metadata_path(volume_path)
724 try:
725 self.fs.unlink(vol_meta_path)
726 except cephfs.ObjectNotFound:
727 pass
728
729 def purge_volume(self, volume_path, data_isolated=False):
730 """
731 Finish clearing up a volume that was previously passed to delete_volume. This
732 function is idempotent.
733 """
734
735 trash = os.path.join(self.volume_prefix, "_deleting")
736 trashed_volume = os.path.join(trash, volume_path.volume_id)
737
738 try:
739 self.fs.stat(trashed_volume)
740 except cephfs.ObjectNotFound:
741 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
742 trashed_volume))
743 return
744
745 def rmtree(root_path):
746 log.debug("rmtree {0}".format(root_path))
747 dir_handle = self.fs.opendir(root_path)
748 d = self.fs.readdir(dir_handle)
749 while d:
750 d_name = d.d_name.decode(encoding='utf-8')
751 if d_name not in [".", ".."]:
752 # Do not use os.path.join because it is sensitive
753 # to string encoding, we just pass through dnames
754 # as byte arrays
755 d_full = u"{0}/{1}".format(root_path, d_name)
756 if d.is_dir():
757 rmtree(d_full)
758 else:
759 self.fs.unlink(d_full)
760
761 d = self.fs.readdir(dir_handle)
762 self.fs.closedir(dir_handle)
763
764 self.fs.rmdir(root_path)
765
766 rmtree(trashed_volume)
767
768 if data_isolated:
769 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
770 osd_map = self._rados_command("osd dump", {})
771 pool_id = self._get_pool_id(osd_map, pool_name)
772 mds_map = self.get_mds_map()
773 if pool_id in mds_map['data_pools']:
774 self._rados_command("fs rm_data_pool", {
775 'fs_name': mds_map['fs_name'],
776 'pool': pool_name
777 })
778 self._rados_command("osd pool delete",
779 {
780 "pool": pool_name,
781 "pool2": pool_name,
782 "yes_i_really_really_mean_it": True
783 })
784
785 def _get_ancestor_xattr(self, path, attr):
786 """
787 Helper for reading layout information: if this xattr is missing
788 on the requested path, keep checking parents until we find it.
789 """
790 try:
791 result = self.fs.getxattr(path, attr).decode()
792 if result == "":
793 # Annoying! cephfs gives us empty instead of an error when attr not found
794 raise cephfs.NoData()
795 else:
796 return result
797 except cephfs.NoData:
798 if path == "/":
799 raise
800 else:
801 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
802
803 def _check_compat_version(self, compat_version):
804 if self.version < compat_version:
805 msg = ("The current version of CephFSVolumeClient, version {0} "
806 "does not support the required feature. Need version {1} "
807 "or greater".format(self.version, compat_version)
808 )
809 log.error(msg)
810 raise CephFSVolumeClientError(msg)
811
812 def _metadata_get(self, path):
813 """
814 Return a deserialized JSON object, or None
815 """
816 fd = self.fs.open(path, "r")
817 # TODO iterate instead of assuming file < 4MB
818 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
819 self.fs.close(fd)
820 if read_bytes:
821 return json.loads(read_bytes.decode())
822 else:
823 return None
824
825 def _metadata_set(self, path, data):
826 serialized = json.dumps(data)
827 fd = self.fs.open(path, "w")
828 try:
829 self.fs.write(fd, to_bytes(serialized), 0)
830 self.fs.fsync(fd, 0)
831 finally:
832 self.fs.close(fd)
833
834 def _lock(self, path):
835 @contextmanager
836 def fn():
837 while True:
838 fd = self.fs.open(path, os.O_CREAT, 0o755)
839 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
840
841 # The locked file will be cleaned up sometime. It could be
842 # unlinked, e.g. by another manila share instance, before the
843 # lock was applied to it. Perform checks to ensure that this
844 # does not happen.
845 try:
846 statbuf = self.fs.stat(path)
847 except cephfs.ObjectNotFound:
848 self.fs.close(fd)
849 continue
850
851 fstatbuf = self.fs.fstat(fd)
852 if statbuf.st_ino == fstatbuf.st_ino:
853 break
854
855 try:
856 yield
857 finally:
858 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
859 self.fs.close(fd)
860
861 return fn()
862
863 def _auth_metadata_path(self, auth_id):
864 return os.path.join(self.volume_prefix, "${0}{1}".format(
865 auth_id, META_FILE_EXT))
866
867 def _auth_lock(self, auth_id):
868 return self._lock(self._auth_metadata_path(auth_id))
869
870 def _auth_metadata_get(self, auth_id):
871 """
872 Call me with the metadata locked!
873
874 Check whether an auth metadata structure can be decoded by the current
875 version of CephFSVolumeClient.
876
877 Return auth metadata that the current version of CephFSVolumeClient
878 can decode.
879 """
880 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
881
882 if auth_metadata:
883 self._check_compat_version(auth_metadata['compat_version'])
884
885 return auth_metadata
886
887 def _auth_metadata_set(self, auth_id, data):
888 """
889 Call me with the metadata locked!
890
891 Fsync the auth metadata.
892
893 Add two version attributes to the auth metadata,
894 'compat_version', the minimum CephFSVolumeClient version that can
895 decode the metadata, and 'version', the CephFSVolumeClient version
896 that encoded the metadata.
897 """
898 data['compat_version'] = 1
899 data['version'] = self.version
900 return self._metadata_set(self._auth_metadata_path(auth_id), data)
901
902 def _volume_metadata_path(self, volume_path):
903 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
904 volume_path.group_id if volume_path.group_id else "",
905 volume_path.volume_id,
906 META_FILE_EXT
907 ))
908
909 def _volume_lock(self, volume_path):
910 """
911 Return a ContextManager which locks the authorization metadata for
912 a particular volume, and persists a flag to the metadata indicating
913 that it is currently locked, so that we can detect dirty situations
914 during recovery.
915
916 This lock isn't just to make access to the metadata safe: it's also
917 designed to be used over the two-step process of checking the
918 metadata and then responding to an authorization request, to
919 ensure that at the point we respond the metadata hasn't changed
920 in the background. It's key to how we avoid security holes
921 resulting from races during that two-step process.
922 """
923 return self._lock(self._volume_metadata_path(volume_path))
924
925 def _volume_metadata_get(self, volume_path):
926 """
927 Call me with the metadata locked!
928
929 Check whether a volume metadata structure can be decoded by the current
930 version of CephFSVolumeClient.
931
932 Return a volume_metadata structure that the current version of
933 CephFSVolumeClient can decode.
934 """
935 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
936
937 if volume_metadata:
938 self._check_compat_version(volume_metadata['compat_version'])
939
940 return volume_metadata
941
942 def _volume_metadata_set(self, volume_path, data):
943 """
944 Call me with the metadata locked!
945
946 Add two version attributes to the volume metadata,
947 'compat_version', the minimum CephFSVolumeClient version that can
948 decode the metadata and 'version', the CephFSVolumeClient version
949 that encoded the metadata.
950 """
951 data['compat_version'] = 1
952 data['version'] = self.version
953 return self._metadata_set(self._volume_metadata_path(volume_path), data)
954
955 def _prepare_updated_caps_list(self, existing_caps, mds_cap_str, osd_cap_str, authorize=True):
956 caps_list = []
957 for k, v in existing_caps['caps'].items():
958 if k == 'mds' or k == 'osd':
959 continue
960 elif k == 'mon':
961 if not authorize and v == 'allow r':
962 continue
963 caps_list.extend((k,v))
964
965 if mds_cap_str:
966 caps_list.extend(('mds', mds_cap_str))
967 if osd_cap_str:
968 caps_list.extend(('osd', osd_cap_str))
969
970 if authorize and 'mon' not in caps_list:
971 caps_list.extend(('mon', 'allow r'))
972
973 return caps_list
974
975 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None, allow_existing_id=False):
976 """
977 Get-or-create a Ceph auth identity for `auth_id` and grant them access
978 to the volume at `volume_path`.
979 :param volume_path:
980 :param auth_id:
981 :param readonly:
982 :param tenant_id: Optionally provide a stringizable object to
983 restrict any created cephx IDs to other callers
984 passing the same tenant ID.
985 :param allow_existing_id: Optionally authorize existing auth IDs not
986 created by ceph_volume_client
987 :return:
988 """
989
990 with self._auth_lock(auth_id):
991 client_entity = "client.{0}".format(auth_id)
992 try:
993 existing_caps = self._rados_command(
994 'auth get',
995 {
996 'entity': client_entity
997 }
998 )
999 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1000 except rados.Error:
1001 existing_caps = None
1002
1003 # Existing meta, or None, to be updated
1004 auth_meta = self._auth_metadata_get(auth_id)
1005
1006 # volume data to be inserted
1007 volume_path_str = str(volume_path)
1008 volume = {
1009 volume_path_str : {
1010 # The access level at which the auth_id is authorized to
1011 # access the volume.
1012 'access_level': 'r' if readonly else 'rw',
1013 'dirty': True,
1014 }
1015 }
1016
1017 if auth_meta is None:
1018 if not allow_existing_id and existing_caps is not None:
1019 msg = "auth ID: {0} exists and not created by ceph_volume_client. Not allowed to modify".format(auth_id)
1020 log.error(msg)
1021 raise CephFSVolumeClientError(msg)
1022
1023 # non-existent auth IDs
1024 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
1025 auth_id, tenant_id
1026 ))
1027 log.debug("Authorize: no existing meta")
1028 auth_meta = {
1029 'dirty': True,
1030 'tenant_id': tenant_id.__str__() if tenant_id else None,
1031 'volumes': volume
1032 }
1033 else:
1034 # Disallow tenants to share auth IDs
1035 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
1036 msg = "auth ID: {0} is already in use".format(auth_id)
1037 log.error(msg)
1038 raise CephFSVolumeClientError(msg)
1039
1040 if auth_meta['dirty']:
1041 self._recover_auth_meta(auth_id, auth_meta)
1042
1043 log.debug("Authorize: existing tenant {tenant}".format(
1044 tenant=auth_meta['tenant_id']
1045 ))
1046 auth_meta['dirty'] = True
1047 auth_meta['volumes'].update(volume)
1048
1049 self._auth_metadata_set(auth_id, auth_meta)
1050
1051 with self._volume_lock(volume_path):
1052 key = self._authorize_volume(volume_path, auth_id, readonly, existing_caps)
1053
1054 auth_meta['dirty'] = False
1055 auth_meta['volumes'][volume_path_str]['dirty'] = False
1056 self._auth_metadata_set(auth_id, auth_meta)
1057
1058 if tenant_id:
1059 return {
1060 'auth_key': key
1061 }
1062 else:
1063 # Caller wasn't multi-tenant aware: be safe and don't give
1064 # them a key
1065 return {
1066 'auth_key': None
1067 }
1068
1069 def _authorize_volume(self, volume_path, auth_id, readonly, existing_caps):
1070 vol_meta = self._volume_metadata_get(volume_path)
1071
1072 access_level = 'r' if readonly else 'rw'
1073 auth = {
1074 auth_id: {
1075 'access_level': access_level,
1076 'dirty': True,
1077 }
1078 }
1079
1080 if vol_meta is None:
1081 vol_meta = {
1082 'auths': auth
1083 }
1084 else:
1085 vol_meta['auths'].update(auth)
1086 self._volume_metadata_set(volume_path, vol_meta)
1087
1088 key = self._authorize_ceph(volume_path, auth_id, readonly, existing_caps)
1089
1090 vol_meta['auths'][auth_id]['dirty'] = False
1091 self._volume_metadata_set(volume_path, vol_meta)
1092
1093 return key
1094
1095 def _authorize_ceph(self, volume_path, auth_id, readonly, existing_caps):
1096 path = self._get_path(volume_path)
1097 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1098 auth_id, path
1099 ))
1100
1101 # First I need to work out what the data pool is for this share:
1102 # read the layout
1103 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1104
1105 try:
1106 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1107 "namespace").decode()
1108 except cephfs.NoData:
1109 namespace = None
1110
1111 # Now construct auth capabilities that give the guest just enough
1112 # permissions to access the share
1113 client_entity = "client.{0}".format(auth_id)
1114 want_access_level = 'r' if readonly else 'rw'
1115 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1116 if namespace:
1117 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1118 want_access_level, pool_name, namespace)
1119 else:
1120 want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
1121 pool_name)
1122
1123 if existing_caps is None:
1124 caps = self._rados_command(
1125 'auth get-or-create',
1126 {
1127 'entity': client_entity,
1128 'caps': [
1129 'mds', want_mds_cap,
1130 'osd', want_osd_cap,
1131 'mon', 'allow r']
1132 })
1133 else:
1134 # entity exists, update it
1135 cap = existing_caps[0]
1136
1137 # Construct auth caps that if present might conflict with the desired
1138 # auth caps.
1139 unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1140 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1141 if namespace:
1142 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1143 unwanted_access_level, pool_name, namespace)
1144 else:
1145 unwanted_osd_cap = 'allow {0} pool={1}'.format(
1146 unwanted_access_level, pool_name)
1147
1148 def cap_update(
1149 orig_mds_caps, orig_osd_caps, want_mds_cap,
1150 want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
1151
1152 if not orig_mds_caps:
1153 return want_mds_cap, want_osd_cap
1154
1155 mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
1156 osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
1157
1158 if want_mds_cap in mds_cap_tokens:
1159 return orig_mds_caps, orig_osd_caps
1160
1161 if unwanted_mds_cap in mds_cap_tokens:
1162 mds_cap_tokens.remove(unwanted_mds_cap)
1163 osd_cap_tokens.remove(unwanted_osd_cap)
1164
1165 mds_cap_tokens.append(want_mds_cap)
1166 osd_cap_tokens.append(want_osd_cap)
1167
1168 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1169
1170 orig_mds_caps = cap['caps'].get('mds', "")
1171 orig_osd_caps = cap['caps'].get('osd', "")
1172
1173 mds_cap_str, osd_cap_str = cap_update(
1174 orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
1175 unwanted_mds_cap, unwanted_osd_cap)
1176
1177 caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str)
1178 caps = self._rados_command(
1179 'auth caps',
1180 {
1181 'entity': client_entity,
1182 'caps': caps_list
1183 })
1184
1185 caps = self._rados_command(
1186 'auth get',
1187 {
1188 'entity': client_entity
1189 }
1190 )
1191
1192 # Result expected like this:
1193 # [
1194 # {
1195 # "entity": "client.foobar",
1196 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1197 # "caps": {
1198 # "mds": "allow *",
1199 # "mon": "allow *"
1200 # }
1201 # }
1202 # ]
1203 assert len(caps) == 1
1204 assert caps[0]['entity'] == client_entity
1205 return caps[0]['key']
1206
1207 def deauthorize(self, volume_path, auth_id):
1208 with self._auth_lock(auth_id):
1209 # Existing meta, or None, to be updated
1210 auth_meta = self._auth_metadata_get(auth_id)
1211
1212 volume_path_str = str(volume_path)
1213 if (auth_meta is None) or (not auth_meta['volumes']):
1214 log.warning("deauthorized called for already-removed auth"
1215 "ID '{auth_id}' for volume ID '{volume}'".format(
1216 auth_id=auth_id, volume=volume_path.volume_id
1217 ))
1218 # Clean up the auth meta file of an auth ID
1219 self.fs.unlink(self._auth_metadata_path(auth_id))
1220 return
1221
1222 if volume_path_str not in auth_meta['volumes']:
1223 log.warning("deauthorized called for already-removed auth"
1224 "ID '{auth_id}' for volume ID '{volume}'".format(
1225 auth_id=auth_id, volume=volume_path.volume_id
1226 ))
1227 return
1228
1229 if auth_meta['dirty']:
1230 self._recover_auth_meta(auth_id, auth_meta)
1231
1232 auth_meta['dirty'] = True
1233 auth_meta['volumes'][volume_path_str]['dirty'] = True
1234 self._auth_metadata_set(auth_id, auth_meta)
1235
1236 self._deauthorize_volume(volume_path, auth_id)
1237
1238 # Filter out the volume we're deauthorizing
1239 del auth_meta['volumes'][volume_path_str]
1240
1241 # Clean up auth meta file
1242 if not auth_meta['volumes']:
1243 self.fs.unlink(self._auth_metadata_path(auth_id))
1244 return
1245
1246 auth_meta['dirty'] = False
1247 self._auth_metadata_set(auth_id, auth_meta)
1248
1249 def _deauthorize_volume(self, volume_path, auth_id):
1250 with self._volume_lock(volume_path):
1251 vol_meta = self._volume_metadata_get(volume_path)
1252
1253 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1254 log.warning("deauthorized called for already-removed auth"
1255 "ID '{auth_id}' for volume ID '{volume}'".format(
1256 auth_id=auth_id, volume=volume_path.volume_id
1257 ))
1258 return
1259
1260 vol_meta['auths'][auth_id]['dirty'] = True
1261 self._volume_metadata_set(volume_path, vol_meta)
1262
1263 self._deauthorize(volume_path, auth_id)
1264
1265 # Remove the auth_id from the metadata *after* removing it
1266 # from ceph, so that if we crashed here, we would actually
1267 # recreate the auth ID during recovery (i.e. end up with
1268 # a consistent state).
1269
1270 # Filter out the auth we're removing
1271 del vol_meta['auths'][auth_id]
1272 self._volume_metadata_set(volume_path, vol_meta)
1273
1274 def _deauthorize(self, volume_path, auth_id):
1275 """
1276 The volume must still exist.
1277 """
1278 client_entity = "client.{0}".format(auth_id)
1279 path = self._get_path(volume_path)
1280 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1281 try:
1282 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1283 "namespace").decode()
1284 except cephfs.NoData:
1285 namespace = None
1286
1287 # The auth_id might have read-only or read-write mount access for the
1288 # volume path.
1289 access_levels = ('r', 'rw')
1290 want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
1291 for access_level in access_levels]
1292 if namespace:
1293 want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
1294 for access_level in access_levels]
1295 else:
1296 want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
1297 for access_level in access_levels]
1298
1299
1300 try:
1301 existing = self._rados_command(
1302 'auth get',
1303 {
1304 'entity': client_entity
1305 }
1306 )
1307
1308 def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
1309 mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
1310 osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
1311
1312 for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
1313 if want_mds_cap in mds_cap_tokens:
1314 mds_cap_tokens.remove(want_mds_cap)
1315 osd_cap_tokens.remove(want_osd_cap)
1316 break
1317
1318 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1319
1320 cap = existing[0]
1321 orig_mds_caps = cap['caps'].get('mds', "")
1322 orig_osd_caps = cap['caps'].get('osd', "")
1323 mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
1324 want_mds_caps, want_osd_caps)
1325
1326 caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False)
1327 if not caps_list:
1328 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1329 else:
1330 self._rados_command(
1331 'auth caps',
1332 {
1333 'entity': client_entity,
1334 'caps': caps_list
1335 })
1336
1337 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1338 except rados.Error:
1339 # Already gone, great.
1340 return
1341
1342 def get_authorized_ids(self, volume_path):
1343 """
1344 Expose a list of auth IDs that have access to a volume.
1345
1346 return: a list of (auth_id, access_level) tuples, where
1347 the access_level can be 'r' or 'rw'.
1348 None if no auth ID is given access to the volume.
1349 """
1350 with self._volume_lock(volume_path):
1351 meta = self._volume_metadata_get(volume_path)
1352 auths = []
1353 if not meta or not meta['auths']:
1354 return None
1355
1356 for auth, auth_data in meta['auths'].items():
1357 # Skip partial auth updates.
1358 if not auth_data['dirty']:
1359 auths.append((auth, auth_data['access_level']))
1360
1361 return auths
1362
1363 def _rados_command(self, prefix, args=None, decode=True):
1364 """
1365 Safer wrapper for ceph_argparse.json_command, which raises
1366 Error exception instead of relying on caller to check return
1367 codes.
1368
1369 Error exception can result from:
1370 * Timeout
1371 * Actual legitimate errors
1372 * Malformed JSON output
1373
1374 return: Decoded object from ceph, or None if empty string returned.
1375 If decode is False, return a string (the data returned by
1376 ceph command)
1377 """
1378 if args is None:
1379 args = {}
1380
1381 argdict = args.copy()
1382 argdict['format'] = 'json'
1383
1384 ret, outbuf, outs = json_command(self.rados,
1385 prefix=prefix,
1386 argdict=argdict,
1387 timeout=RADOS_TIMEOUT)
1388 if ret != 0:
1389 raise rados.Error(outs)
1390 else:
1391 if decode:
1392 if outbuf:
1393 try:
1394 return json.loads(outbuf.decode())
1395 except (ValueError, TypeError):
1396 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1397 else:
1398 return None
1399 else:
1400 return outbuf
1401
1402 def get_used_bytes(self, volume_path):
1403 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
1404 "rbytes").decode())
1405
1406 def set_max_bytes(self, volume_path, max_bytes):
1407 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1408 to_bytes(max_bytes if max_bytes else 0), 0)
1409
1410 def _snapshot_path(self, dir_path, snapshot_name):
1411 return os.path.join(
1412 dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1413 )
1414
1415 def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
1416 # TODO: raise intelligible exception for clusters where snaps are disabled
1417 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)
1418
1419 def _snapshot_destroy(self, dir_path, snapshot_name):
1420 """
1421 Remove a snapshot, or do nothing if it already doesn't exist.
1422 """
1423 try:
1424 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1425 except cephfs.ObjectNotFound:
1426 log.warning("Snapshot was already gone: {0}".format(snapshot_name))
1427
1428 def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
1429 self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)
1430
1431 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1432 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1433
1434 def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
1435 if group_id is None:
1436 raise RuntimeError("Group ID may not be None")
1437
1438 return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
1439 mode)
1440
1441 def destroy_snapshot_group(self, group_id, snapshot_name):
1442 if group_id is None:
1443 raise RuntimeError("Group ID may not be None")
1444 if snapshot_name is None:
1445 raise RuntimeError("Snapshot name may not be None")
1446
1447 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1448
1449 def _cp_r(self, src, dst):
1450 # TODO
1451 raise NotImplementedError()
1452
1453 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1454 dest_fs_path = self._get_path(dest_volume_path)
1455 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1456
1457 self._cp_r(src_snapshot_path, dest_fs_path)
1458
1459 def put_object(self, pool_name, object_name, data):
1460 """
1461 Synchronously write data to an object.
1462
1463 :param pool_name: name of the pool
1464 :type pool_name: str
1465 :param object_name: name of the object
1466 :type object_name: str
1467 :param data: data to write
1468 :type data: bytes
1469 """
1470 return self.put_object_versioned(pool_name, object_name, data)
1471
1472 def put_object_versioned(self, pool_name, object_name, data, version=None):
1473 """
1474 Synchronously write data to an object only if the current version of
1475 the object matches the expected version.
1476
1477 :param pool_name: name of the pool
1478 :type pool_name: str
1479 :param object_name: name of the object
1480 :type object_name: str
1481 :param data: data to write
1482 :type data: bytes
1483 :param version: expected version of the object to write
1484 :type version: int
1485 """
1486 ioctx = self.rados.open_ioctx(pool_name)
1487
1488 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1489 if len(data) > max_size:
1490 msg = ("Data to be written to object '{0}' exceeds "
1491 "{1} bytes".format(object_name, max_size))
1492 log.error(msg)
1493 raise CephFSVolumeClientError(msg)
1494
1495 try:
1496 with rados.WriteOpCtx() as wop:
1497 if version is not None:
1498 wop.assert_version(version)
1499 wop.write_full(data)
1500 ioctx.operate_write_op(wop, object_name)
1501 except rados.OSError as e:
1502 log.error(e)
1503 raise e
1504 finally:
1505 ioctx.close()
1506
1507 def get_object(self, pool_name, object_name):
1508 """
1509 Synchronously read data from object.
1510
1511 :param pool_name: name of the pool
1512 :type pool_name: str
1513 :param object_name: name of the object
1514 :type object_name: str
1515
1516 :returns: bytes - data read from object
1517 """
1518 return self.get_object_and_version(pool_name, object_name)[0]
1519
1520 def get_object_and_version(self, pool_name, object_name):
1521 """
1522 Synchronously read data from object and get its version.
1523
1524 :param pool_name: name of the pool
1525 :type pool_name: str
1526 :param object_name: name of the object
1527 :type object_name: str
1528
1529 :returns: tuple of object data and version
1530 """
1531 ioctx = self.rados.open_ioctx(pool_name)
1532 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1533 try:
1534 bytes_read = ioctx.read(object_name, max_size)
1535 if ((len(bytes_read) == max_size) and
1536 (ioctx.read(object_name, 1, offset=max_size))):
1537 log.warning("Size of object {0} exceeds '{1}' bytes "
1538 "read".format(object_name, max_size))
1539 obj_version = ioctx.get_last_version()
1540 finally:
1541 ioctx.close()
1542 return (bytes_read, obj_version)
1543
1544 def delete_object(self, pool_name, object_name):
1545 ioctx = self.rados.open_ioctx(pool_name)
1546 try:
1547 ioctx.remove_object(object_name)
1548 except rados.ObjectNotFound:
1549 log.warning("Object '{0}' was already removed".format(object_name))
1550 finally:
1551 ioctx.close()