1 """
2 Copyright (C) 2015 Red Hat, Inc.
3
4 LGPL-2.1 or LGPL-3.0. See file COPYING.
5 """
6
7 from contextlib import contextmanager
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import struct
15 import sys
16 import threading
17 import time
18 import uuid
19
20 from ceph_argparse import json_command
21
22 import cephfs
23 import rados
24
25 def to_bytes(param):
26 '''
27 Helper method that returns byte representation of the given parameter.
28 '''
29 if isinstance(param, str):
30 return param.encode('utf-8')
31 elif param is None:
32 return param
33 else:
34 return str(param).encode('utf-8')
35
36 class RadosError(Exception):
37 """
38 Something went wrong talking to Ceph with librados
39 """
40 pass
41
42
43 RADOS_TIMEOUT = 10
44
45 log = logging.getLogger(__name__)
46
47 # Reserved volume group name which we use in paths for volumes
48 # that are not assigned to a group (i.e. created with group=None)
49 NO_GROUP_NAME = "_nogroup"
50
51 # Filename extensions for meta files.
52 META_FILE_EXT = ".meta"
53
54 class VolumePath(object):
55 """
56 Identify a volume's path as group->volume
57 The Volume ID is a unique identifier, but this is a much more
58 helpful thing to pass around.
59 """
60 def __init__(self, group_id, volume_id):
61 self.group_id = group_id
62 self.volume_id = volume_id
63 assert self.group_id != NO_GROUP_NAME
64 assert self.volume_id != "" and self.volume_id is not None
65
66 def __str__(self):
67 return "{0}/{1}".format(self.group_id, self.volume_id)
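    # For illustration (the names are assumptions): str(VolumePath("grp", "vol1"))
    # yields "grp/vol1", and an ungrouped volume, VolumePath(None, "vol1"),
    # yields "None/vol1".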
68
69
70 class ClusterTimeout(Exception):
71 """
72 Exception indicating that we timed out trying to talk to the Ceph cluster,
73 either to the mons, or to any individual daemon that the mons indicate ought
74 to be up but isn't responding to us.
75 """
76 pass
77
78
79 class ClusterError(Exception):
80 """
81 Exception indicating that the cluster returned an error to a command that
82 we thought should be successful based on our last knowledge of the cluster
83 state.
84 """
85 def __init__(self, action, result_code, result_str):
86 self._action = action
87 self._result_code = result_code
88 self._result_str = result_str
89
90 def __str__(self):
91 return "Error {0} (\"{1}\") while {2}".format(
92 self._result_code, self._result_str, self._action)
93
94
95 class RankEvicter(threading.Thread):
96 """
97 Thread for evicting client(s) from a particular MDS daemon instance.
98
99 This is more complex than simply sending a command, because we have to
100 handle cases where MDS daemons might not be fully up yet, and/or might
101 be transiently unresponsive to commands.
102 """
103 class GidGone(Exception):
104 pass
105
106 POLL_PERIOD = 5
107
108 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
109 """
110         :param client_spec: list of strings, used as filter arguments to "session evict";
111 pass ["id=123"] to evict a single client with session id 123.
112 """
113 self.rank = rank
114 self.gid = gid
115 self._mds_map = mds_map
116 self._client_spec = client_spec
117 self._volume_client = volume_client
118 self._ready_timeout = ready_timeout
119 self._ready_waited = 0
120
121 self.success = False
122 self.exception = None
123
124 super(RankEvicter, self).__init__()
125
126 def _ready_to_evict(self):
127 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
128 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
129 self._client_spec, self.rank, self.gid
130 ))
131 raise RankEvicter.GidGone()
132
133 info = self._mds_map['info']["gid_{0}".format(self.gid)]
134 log.debug("_ready_to_evict: state={0}".format(info['state']))
135 return info['state'] in ["up:active", "up:clientreplay"]
136
137 def _wait_for_ready(self):
138 """
139 Wait for that MDS rank to reach an active or clientreplay state, and
140 not be laggy.
141 """
142 while not self._ready_to_evict():
143 if self._ready_waited > self._ready_timeout:
144 raise ClusterTimeout()
145
146 time.sleep(self.POLL_PERIOD)
147 self._ready_waited += self.POLL_PERIOD
148
149 self._mds_map = self._volume_client.get_mds_map()
150
151 def _evict(self):
152 """
153 Run the eviction procedure. Return true on success, false on errors.
154 """
155
156         # Wait until the MDS is believed by the mon to be available for commands
157 try:
158 self._wait_for_ready()
159 except self.GidGone:
160 return True
161
162 # Then send it an evict
163 ret = errno.ETIMEDOUT
164 while ret == errno.ETIMEDOUT:
165 log.debug("mds_command: {0}, {1}".format(
166 "%s" % self.gid, ["session", "evict"] + self._client_spec
167 ))
168 ret, outb, outs = self._volume_client.fs.mds_command(
169 "%s" % self.gid,
170 json.dumps({
171 "prefix": "session evict",
172 "filters": self._client_spec
173 }), "")
174 log.debug("mds_command: complete {0} {1}".format(ret, outs))
175
176 # If we get a clean response, great, it's gone from that rank.
177 if ret == 0:
178 return True
179 elif ret == errno.ETIMEDOUT:
180                 # The MDS went laggy (libcephfs emits ETIMEDOUT in exactly that case)
181 self._mds_map = self._volume_client.get_mds_map()
182 try:
183 self._wait_for_ready()
184 except self.GidGone:
185 return True
186 else:
187 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
188
189 def run(self):
190 try:
191 self._evict()
192 except Exception as e:
193 self.success = False
194 self.exception = e
195 else:
196 self.success = True
197
198
199 class EvictionError(Exception):
200 pass
201
202
203 class CephFSVolumeClientError(Exception):
204 """
205 Something went wrong talking to Ceph using CephFSVolumeClient.
206 """
207 pass
208
209
210 CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
211
212 CephFSVolumeClient Version History:
213
214 * 1 - Initial version
215 * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
216 * 3 - Allow volumes to be created without RADOS namespace isolation
217 * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
218 * 5 - Disallow authorize API for users not created by CephFSVolumeClient
219 * 6 - The 'volumes' key in auth-metadata-file is changed to 'subvolumes'.
220 """
221
222
223 class CephFSVolumeClient(object):
224 """
225 Combine libcephfs and librados interfaces to implement a
226 'Volume' concept implemented as a cephfs directory and
227 client capabilities which restrict mount access to this
228 directory.
229
230 Additionally, volumes may be in a 'Group'. Conveniently,
231 volumes are a lot like manila shares, and groups are a lot
232 like manila consistency groups.
233
234 Refer to volumes with VolumePath, which specifies the
235 volume and group IDs (both strings). The group ID may
236 be None.
237
238     In general, functions in this class are allowed to raise rados.Error
239 or cephfs.Error exceptions in unexpected situations.
240 """
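    # A minimal usage sketch (illustrative only, not part of the original
    # source): the cluster name, ceph.conf path, auth IDs and the volume/group
    # names below are assumptions for the example.
    #
    #     vc = CephFSVolumeClient(auth_id="manila",
    #                             conf_path="/etc/ceph/ceph.conf",
    #                             cluster_name="ceph")
    #     vc.connect()
    #     try:
    #         vp = VolumePath("grp", "vol1")
    #         vc.create_volume(vp, size=10 * 1024**3)
    #         auth = vc.authorize(vp, "guest", tenant_id="tenant1")
    #         # ... hand auth['auth_key'] and the mount path to the guest ...
    #         vc.deauthorize(vp, "guest")
    #         vc.delete_volume(vp)
    #         vc.purge_volume(vp)
    #     finally:
    #         vc.disconnect()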
241
242 # Current version
243 version = 6
244
245 # Where shall we create our volumes?
246 POOL_PREFIX = "fsvolume_"
247 DEFAULT_VOL_PREFIX = "/volumes"
248 DEFAULT_NS_PREFIX = "fsvolumens_"
249
250 def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
251 volume_prefix=None, pool_ns_prefix=None, rados=None,
252 fs_name=None):
253 """
254 Either set all three of ``auth_id``, ``conf_path`` and
255 ``cluster_name`` (rados constructed on connect), or
256 set ``rados`` (existing rados instance).
257 """
258 self.fs = None
259 self.fs_name = fs_name
260 self.connected = False
261
262 self.conf_path = conf_path
263 self.cluster_name = cluster_name
264 self.auth_id = auth_id
265
266 self.rados = rados
267 if self.rados:
268 # Using an externally owned rados, so we won't tear it down
269 # on disconnect
270 self.own_rados = False
271 else:
272 # self.rados will be constructed in connect
273 self.own_rados = True
274
275 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
276 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
277 # For flock'ing in cephfs, I want a unique ID to distinguish me
278 # from any other manila-share services that are loading this module.
279     # We could use pid, but that's unnecessarily weak: generate a
280 # UUID
281 self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
282
283 # TODO: version the on-disk structures
284
285 def recover(self):
286 # Scan all auth keys to see if they're dirty: if they are, they have
287 # state that might not have propagated to Ceph or to the related
288 # volumes yet.
289
290 # Important: we *always* acquire locks in the order auth->volume
291 # That means a volume can never be dirty without the auth key
292 # we're updating it with being dirty at the same time.
293
294 # First list the auth IDs that have potentially dirty on-disk metadata
295 log.debug("Recovering from partial auth updates (if any)...")
296
297 try:
298 dir_handle = self.fs.opendir(self.volume_prefix)
299 except cephfs.ObjectNotFound:
300 log.debug("Nothing to recover. No auth meta files.")
301 return
302
303 d = self.fs.readdir(dir_handle)
304 auth_ids = []
305
306 if not d:
307 log.debug("Nothing to recover. No auth meta files.")
308
309 while d:
310 # Identify auth IDs from auth meta filenames. The auth meta files
311 # are named as, "$<auth_id><meta filename extension>"
312                 regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
313 match = re.search(regex, d.d_name.decode(encoding='utf-8'))
314 if match:
315 auth_ids.append(match.group(1))
316
317 d = self.fs.readdir(dir_handle)
318
319 self.fs.closedir(dir_handle)
320
321 # Key points based on ordering:
322 # * Anything added in VMeta is already added in AMeta
323 # * Anything added in Ceph is already added in VMeta
324 # * Anything removed in VMeta is already removed in Ceph
325 # * Anything removed in AMeta is already removed in VMeta
326
327 # Deauthorization: because I only update metadata AFTER the
328 # update of the next level down, I have the same ordering of
329 # -> things which exist in the AMeta should also exist
330 # in the VMeta, should also exist in Ceph, and the same
331 # recovery procedure that gets me consistent after crashes
332 # during authorization will also work during deauthorization
333
334 # Now for each auth ID, check for dirty flag and apply updates
335 # if dirty flag is found
336 for auth_id in auth_ids:
337 with self._auth_lock(auth_id):
338 auth_meta = self._auth_metadata_get(auth_id)
339 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
340 if auth_meta and 'volumes' in auth_meta:
341 auth_meta['subvolumes'] = auth_meta.pop('volumes')
342 if not auth_meta or not auth_meta['subvolumes']:
343 # Clean up auth meta file
344 self.fs.unlink(self._auth_metadata_path(auth_id))
345 continue
346 if not auth_meta['dirty']:
347 continue
348 self._recover_auth_meta(auth_id, auth_meta)
349
350 log.debug("Recovered from partial auth updates (if any).")
351
352 def _recover_auth_meta(self, auth_id, auth_meta):
353 """
354 Call me after locking the auth meta file.
355 """
356 remove_volumes = []
357
358 for volume, volume_data in auth_meta['subvolumes'].items():
359 if not volume_data['dirty']:
360 continue
361
362 (group_id, volume_id) = volume.split('/')
363 group_id = group_id if group_id != 'None' else None
364 volume_path = VolumePath(group_id, volume_id)
365 access_level = volume_data['access_level']
366
367 with self._volume_lock(volume_path):
368 vol_meta = self._volume_metadata_get(volume_path)
369
370 # No VMeta update indicates that there was no auth update
371 # in Ceph either. So it's safe to remove corresponding
372 # partial update in AMeta.
373 if not vol_meta or auth_id not in vol_meta['auths']:
374 remove_volumes.append(volume)
375 continue
376
377 want_auth = {
378 'access_level': access_level,
379 'dirty': False,
380 }
381 # VMeta update looks clean. Ceph auth update must have been
382 # clean.
383 if vol_meta['auths'][auth_id] == want_auth:
384 auth_meta['subvolumes'][volume]['dirty'] = False
385 self._auth_metadata_set(auth_id, auth_meta)
386 continue
387
388 readonly = access_level == 'r'
389 client_entity = "client.{0}".format(auth_id)
390 try:
391 existing_caps = self._rados_command(
392 'auth get',
393 {
394 'entity': client_entity
395 }
396 )
397 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
398 except rados.Error:
399 existing_caps = None
400 self._authorize_volume(volume_path, auth_id, readonly, existing_caps)
401
402 # Recovered from partial auth updates for the auth ID's access
403 # to a volume.
404 auth_meta['subvolumes'][volume]['dirty'] = False
405 self._auth_metadata_set(auth_id, auth_meta)
406
407 for volume in remove_volumes:
408 del auth_meta['subvolumes'][volume]
409
410 if not auth_meta['subvolumes']:
411 # Clean up auth meta file
412 self.fs.unlink(self._auth_metadata_path(auth_id))
413 return
414
415 # Recovered from all partial auth updates for the auth ID.
416 auth_meta['dirty'] = False
417 self._auth_metadata_set(auth_id, auth_meta)
418
419 def get_mds_map(self):
420 fs_map = self._rados_command("fs dump", {})
421 return fs_map['filesystems'][0]['mdsmap']
422
423 def evict(self, auth_id, timeout=30, volume_path=None):
424 """
425 Evict all clients based on the authorization ID and optionally based on
426 the volume path mounted. Assumes that the authorization key has been
427 revoked prior to calling this function.
428
429 This operation can throw an exception if the mon cluster is unresponsive, or
430 any individual MDS daemon is unresponsive for longer than the timeout passed in.
431 """
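        # Illustrative call (the auth ID and volume path are assumptions):
        #
        #     vc.evict("guest", timeout=30, volume_path=VolumePath("grp", "vol1"))
        #
        # This evicts sessions whose auth_name is "guest" and, because a volume
        # path was given, only those whose mount root is that volume's path.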
432
433 client_spec = ["auth_name={0}".format(auth_id), ]
434 if volume_path:
435 client_spec.append("client_metadata.root={0}".
436 format(self._get_path(volume_path)))
437
438 log.info("evict clients with {0}".format(', '.join(client_spec)))
439
440 mds_map = self.get_mds_map()
441 up = {}
442 for name, gid in mds_map['up'].items():
443 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
444 assert name.startswith("mds_")
445 up[int(name[4:])] = gid
446
447 # For all MDS ranks held by a daemon
448 # Do the parallelism in python instead of using "tell mds.*", because
449 # the latter doesn't give us per-mds output
450 threads = []
451 for rank, gid in up.items():
452 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
453 timeout)
454 thread.start()
455 threads.append(thread)
456
457 for t in threads:
458 t.join()
459
460 log.info("evict: joined all")
461
462 for t in threads:
463 if not t.success:
464 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
465 format(', '.join(client_spec), t.rank, t.gid, t.exception)
466 )
467 log.error(msg)
468 raise EvictionError(msg)
469
470 def _get_path(self, volume_path):
471 """
472 Determine the path within CephFS where this volume will live
473 :return: absolute path (string)
474 """
475 return os.path.join(
476 self.volume_prefix,
477 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
478 volume_path.volume_id)
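    # For illustration, with the default "/volumes" prefix:
    #   VolumePath("grp", "vol1")  -> "/volumes/grp/vol1"
    #   VolumePath(None, "vol1")   -> "/volumes/_nogroup/vol1"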
479
480 def _get_group_path(self, group_id):
481 if group_id is None:
482 raise ValueError("group_id may not be None")
483
484 return os.path.join(
485 self.volume_prefix,
486 group_id
487 )
488
489 def _connect(self, premount_evict):
490 log.debug("Connecting to cephfs...")
491 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
492 log.debug("CephFS initializing...")
493 self.fs.init()
494 if premount_evict is not None:
495 log.debug("Premount eviction of {0} starting".format(premount_evict))
496 self.evict(premount_evict)
497 log.debug("Premount eviction of {0} completes".format(premount_evict))
498 log.debug("CephFS mounting...")
499 self.fs.mount(filesystem_name=to_bytes(self.fs_name))
500 log.debug("Connection to cephfs complete")
501
502 # Recover from partial auth updates due to a previous
503 # crash.
504 self.recover()
505
506 def connect(self, premount_evict = None):
507 """
508
509 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
510 may want to use this to specify their own auth ID if they expect
511 to be a unique instance and don't want to wait for caps to time
512 out after failure of another instance of themselves.
513 """
514 if self.own_rados:
515             log.debug("Configuring RADOS with config {0}...".format(self.conf_path))
516 self.rados = rados.Rados(
517 name="client.{0}".format(self.auth_id),
518 clustername=self.cluster_name,
519 conffile=self.conf_path,
520 conf={}
521 )
522 if self.rados.state != "connected":
523 log.debug("Connecting to RADOS...")
524 self.rados.connect()
525 log.debug("Connection to RADOS complete")
526 self._connect(premount_evict)
527
528 def get_mon_addrs(self):
529 log.info("get_mon_addrs")
530 result = []
531 mon_map = self._rados_command("mon dump")
532 for mon in mon_map['mons']:
533 ip_port = mon['addr'].split("/")[0]
534 result.append(ip_port)
535
536 return result
537
538 def disconnect(self):
539 log.info("disconnect")
540 if self.fs:
541 log.debug("Disconnecting cephfs...")
542 self.fs.shutdown()
543 self.fs = None
544 log.debug("Disconnecting cephfs complete")
545
546 if self.rados and self.own_rados:
547 log.debug("Disconnecting rados...")
548 self.rados.shutdown()
549 self.rados = None
550 log.debug("Disconnecting rados complete")
551
552 def __enter__(self):
553 self.connect()
554 return self
555
556 def __exit__(self, exc_type, exc_val, exc_tb):
557 self.disconnect()
558
559 def __del__(self):
560 self.disconnect()
561
562 def _get_pool_id(self, osd_map, pool_name):
563 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
564 # like this are needed.
565 for pool in osd_map['pools']:
566 if pool['pool_name'] == pool_name:
567 return pool['pool']
568
569 return None
570
571 def _create_volume_pool(self, pool_name):
572 """
573 Idempotently create a pool for use as a CephFS data pool, with the given name
574
575 :return The ID of the created pool
576 """
577 osd_map = self._rados_command('osd dump', {})
578
579 existing_id = self._get_pool_id(osd_map, pool_name)
580 if existing_id is not None:
581 log.info("Pool {0} already exists".format(pool_name))
582 return existing_id
583
584 self._rados_command(
585 'osd pool create',
586 {
587 'pool': pool_name,
588 }
589 )
590
591 osd_map = self._rados_command('osd dump', {})
592 pool_id = self._get_pool_id(osd_map, pool_name)
593
594 if pool_id is None:
595 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
596 # removing it right after we created it.
597 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
598 pool_name, json.dumps(osd_map, indent=2)
599 ))
600 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
601 else:
602 return pool_id
603
604 def create_group(self, group_id, mode=0o755):
605 # Prevent craftily-named volume groups from colliding with the meta
606 # files.
607 if group_id.endswith(META_FILE_EXT):
608 raise ValueError("group ID cannot end with '{0}'.".format(
609 META_FILE_EXT))
610 path = self._get_group_path(group_id)
611 self._mkdir_p(path, mode)
612
613 def destroy_group(self, group_id):
614 path = self._get_group_path(group_id)
615 try:
616 self.fs.stat(self.volume_prefix)
617 except cephfs.ObjectNotFound:
618 pass
619 else:
620 self.fs.rmdir(path)
621
622 def _mkdir_p(self, path, mode=0o755):
623 try:
624 self.fs.stat(path)
625 except cephfs.ObjectNotFound:
626 pass
627 else:
628 return
629
630 parts = path.split(os.path.sep)
631
632 for i in range(1, len(parts) + 1):
633 subpath = os.path.join(*parts[0:i])
634 try:
635 self.fs.stat(subpath)
636 except cephfs.ObjectNotFound:
637 self.fs.mkdir(subpath, mode)
638
639 def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
640 mode=0o755):
641 """
642 Set up metadata, pools and auth for a volume.
643
644 This function is idempotent. It is safe to call this again
645 for an already-created volume, even if it is in use.
646
647 :param volume_path: VolumePath instance
648 :param size: In bytes, or None for no size limit
649 :param data_isolated: If true, create a separate OSD pool for this volume
650 :param namespace_isolated: If true, use separate RADOS namespace for this volume
651 :return:
652 """
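        # A sketch of a typical call (the values are assumptions):
        #
        #     vc.create_volume(VolumePath("grp", "vol1"),
        #                      size=1 * 1024**3,          # 1 GiB quota
        #                      namespace_isolated=True)
        #     # -> {'mount_path': '/volumes/grp/vol1'} with the default prefix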
653 path = self._get_path(volume_path)
654 log.info("create_volume: {0}".format(path))
655
656 self._mkdir_p(path, mode)
657
658 if size is not None:
659 self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)
660
661 # data_isolated means create a separate pool for this volume
662 if data_isolated:
663 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
664             log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
665 pool_id = self._create_volume_pool(pool_name)
666 mds_map = self.get_mds_map()
667 if pool_id not in mds_map['data_pools']:
668 self._rados_command("fs add_data_pool", {
669 'fs_name': mds_map['fs_name'],
670 'pool': pool_name
671 })
672 time.sleep(5) # time for MDSMap to be distributed
673 self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)
674
675 # enforce security isolation, use separate namespace for this volume
676 if namespace_isolated:
677 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
678 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
679 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
680 to_bytes(namespace), 0)
681 else:
682             # If the volume's namespace layout is not set, then the volume's
683             # pool layout remains unset and would undesirably change whenever
684             # an ancestor's pool layout changes.
685 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
686 self.fs.setxattr(path, 'ceph.dir.layout.pool',
687 to_bytes(pool_name), 0)
688
689 # Create a volume meta file, if it does not already exist, to store
690 # data about auth ids having access to the volume
691 fd = self.fs.open(self._volume_metadata_path(volume_path),
692 os.O_CREAT, 0o755)
693 self.fs.close(fd)
694
695 return {
696 'mount_path': path
697 }
698
699 def delete_volume(self, volume_path, data_isolated=False):
700 """
701 Make a volume inaccessible to guests. This function is
702 idempotent. This is the fast part of tearing down a volume: you must
703 also later call purge_volume, which is the slow part.
704
705 :param volume_path: Same identifier used in create_volume
706 :return:
707 """
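        # The two-step teardown, as a sketch (the volume path is an assumption):
        #
        #     vp = VolumePath("grp", "vol1")
        #     vc.delete_volume(vp)   # fast: move the data into the trash folder
        #     vc.purge_volume(vp)    # slow: recursively delete the trashed data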
708
709 path = self._get_path(volume_path)
710 log.info("delete_volume: {0}".format(path))
711
712 # Create the trash folder if it doesn't already exist
713 trash = os.path.join(self.volume_prefix, "_deleting")
714 self._mkdir_p(trash)
715
716 # We'll move it to here
717 trashed_volume = os.path.join(trash, volume_path.volume_id)
718
719 # Move the volume's data to the trash folder
720 try:
721 self.fs.stat(path)
722 except cephfs.ObjectNotFound:
723 log.warning("Trying to delete volume '{0}' but it's already gone".format(
724 path))
725 else:
726 self.fs.rename(path, trashed_volume)
727
728 # Delete the volume meta file, if it's not already deleted
729 vol_meta_path = self._volume_metadata_path(volume_path)
730 try:
731 self.fs.unlink(vol_meta_path)
732 except cephfs.ObjectNotFound:
733 pass
734
735 def purge_volume(self, volume_path, data_isolated=False):
736 """
737 Finish clearing up a volume that was previously passed to delete_volume. This
738 function is idempotent.
739 """
740
741 trash = os.path.join(self.volume_prefix, "_deleting")
742 trashed_volume = os.path.join(trash, volume_path.volume_id)
743
744 try:
745 self.fs.stat(trashed_volume)
746 except cephfs.ObjectNotFound:
747 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
748 trashed_volume))
749 return
750
751 def rmtree(root_path):
752 log.debug("rmtree {0}".format(root_path))
753 dir_handle = self.fs.opendir(root_path)
754 d = self.fs.readdir(dir_handle)
755 while d:
756 d_name = d.d_name.decode(encoding='utf-8')
757 if d_name not in [".", ".."]:
758 # Do not use os.path.join because it is sensitive
759 # to string encoding, we just pass through dnames
760 # as byte arrays
761 d_full = u"{0}/{1}".format(root_path, d_name)
762 if d.is_dir():
763 rmtree(d_full)
764 else:
765 self.fs.unlink(d_full)
766
767 d = self.fs.readdir(dir_handle)
768 self.fs.closedir(dir_handle)
769
770 self.fs.rmdir(root_path)
771
772 rmtree(trashed_volume)
773
774 if data_isolated:
775 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
776 osd_map = self._rados_command("osd dump", {})
777 pool_id = self._get_pool_id(osd_map, pool_name)
778 mds_map = self.get_mds_map()
779 if pool_id in mds_map['data_pools']:
780 self._rados_command("fs rm_data_pool", {
781 'fs_name': mds_map['fs_name'],
782 'pool': pool_name
783 })
784 self._rados_command("osd pool delete",
785 {
786 "pool": pool_name,
787 "pool2": pool_name,
788 "yes_i_really_really_mean_it": True
789 })
790
791 def _get_ancestor_xattr(self, path, attr):
792 """
793 Helper for reading layout information: if this xattr is missing
794 on the requested path, keep checking parents until we find it.
795 """
796 try:
797 result = self.fs.getxattr(path, attr).decode()
798 if result == "":
799 # Annoying! cephfs gives us empty instead of an error when attr not found
800 raise cephfs.NoData()
801 else:
802 return result
803 except cephfs.NoData:
804 if path == "/":
805 raise
806 else:
807 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
808
809 def _check_compat_version(self, compat_version):
810 if self.version < compat_version:
811 msg = ("The current version of CephFSVolumeClient, version {0} "
812 "does not support the required feature. Need version {1} "
813 "or greater".format(self.version, compat_version)
814 )
815 log.error(msg)
816 raise CephFSVolumeClientError(msg)
817
818 def _metadata_get(self, path):
819 """
820 Return a deserialized JSON object, or None
821 """
822 fd = self.fs.open(path, "r")
823 # TODO iterate instead of assuming file < 4MB
824 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
825 self.fs.close(fd)
826 if read_bytes:
827 return json.loads(read_bytes.decode())
828 else:
829 return None
830
831 def _metadata_set(self, path, data):
832 serialized = json.dumps(data)
833 fd = self.fs.open(path, "w")
834 try:
835 self.fs.write(fd, to_bytes(serialized), 0)
836 self.fs.fsync(fd, 0)
837 finally:
838 self.fs.close(fd)
839
840 def _lock(self, path):
841 @contextmanager
842 def fn():
843             while True:
844 fd = self.fs.open(path, os.O_CREAT, 0o755)
845 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
846
847 # The locked file will be cleaned up sometime. It could be
848             # unlinked, e.g. by another manila share instance, before the
849             # lock was applied on it. Perform checks to detect this and
850             # retry if it happens.
851 try:
852 statbuf = self.fs.stat(path)
853 except cephfs.ObjectNotFound:
854 self.fs.close(fd)
855 continue
856
857 fstatbuf = self.fs.fstat(fd)
858 if statbuf.st_ino == fstatbuf.st_ino:
859 break
860
861 try:
862 yield
863 finally:
864 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
865 self.fs.close(fd)
866
867 return fn()
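    # The returned object is used as a context manager, e.g. (illustrative):
    #
    #     with self._auth_lock(auth_id):
    #         auth_meta = self._auth_metadata_get(auth_id)
    #         ...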
868
869 def _auth_metadata_path(self, auth_id):
870 return os.path.join(self.volume_prefix, "${0}{1}".format(
871 auth_id, META_FILE_EXT))
872
873 def _auth_lock(self, auth_id):
874 return self._lock(self._auth_metadata_path(auth_id))
875
876 def _auth_metadata_get(self, auth_id):
877 """
878 Call me with the metadata locked!
879
880         Check whether an auth metadata structure can be decoded by the current
881 version of CephFSVolumeClient.
882
883 Return auth metadata that the current version of CephFSVolumeClient
884 can decode.
885 """
886 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
887
888 if auth_metadata:
889 self._check_compat_version(auth_metadata['compat_version'])
890
891 return auth_metadata
892
893 def _auth_metadata_set(self, auth_id, data):
894 """
895 Call me with the metadata locked!
896
897 Fsync the auth metadata.
898
899 Add two version attributes to the auth metadata,
900 'compat_version', the minimum CephFSVolumeClient version that can
901 decode the metadata, and 'version', the CephFSVolumeClient version
902 that encoded the metadata.
903 """
904 data['compat_version'] = 6
905 data['version'] = self.version
906 return self._metadata_set(self._auth_metadata_path(auth_id), data)
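    # For reference, the resulting auth metadata file looks roughly like this
    # (values are illustrative; the layout is inferred from the code in this
    # module):
    #
    #     {
    #         "compat_version": 6,
    #         "version": 6,
    #         "dirty": false,
    #         "tenant_id": "tenant1",
    #         "subvolumes": {
    #             "grp/vol1": {"access_level": "rw", "dirty": false}
    #         }
    #     }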
907
908 def _volume_metadata_path(self, volume_path):
909 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
910 volume_path.group_id if volume_path.group_id else "",
911 volume_path.volume_id,
912 META_FILE_EXT
913 ))
914
915 def _volume_lock(self, volume_path):
916 """
917 Return a ContextManager which locks the authorization metadata for
918 a particular volume, and persists a flag to the metadata indicating
919 that it is currently locked, so that we can detect dirty situations
920 during recovery.
921
922 This lock isn't just to make access to the metadata safe: it's also
923 designed to be used over the two-step process of checking the
924 metadata and then responding to an authorization request, to
925 ensure that at the point we respond the metadata hasn't changed
926 in the background. It's key to how we avoid security holes
927         resulting from races during that process.
928 """
929 return self._lock(self._volume_metadata_path(volume_path))
930
931 def _volume_metadata_get(self, volume_path):
932 """
933 Call me with the metadata locked!
934
935 Check whether a volume metadata structure can be decoded by the current
936 version of CephFSVolumeClient.
937
938 Return a volume_metadata structure that the current version of
939 CephFSVolumeClient can decode.
940 """
941 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
942
943 if volume_metadata:
944 self._check_compat_version(volume_metadata['compat_version'])
945
946 return volume_metadata
947
948 def _volume_metadata_set(self, volume_path, data):
949 """
950 Call me with the metadata locked!
951
952 Add two version attributes to the volume metadata,
953 'compat_version', the minimum CephFSVolumeClient version that can
954 decode the metadata and 'version', the CephFSVolumeClient version
955 that encoded the metadata.
956 """
957 data['compat_version'] = 1
958 data['version'] = self.version
959 return self._metadata_set(self._volume_metadata_path(volume_path), data)
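    # The volume metadata file has a similar shape (illustrative values; the
    # layout is inferred from _authorize_volume below):
    #
    #     {
    #         "compat_version": 1,
    #         "version": 6,
    #         "auths": {
    #             "guest": {"access_level": "rw", "dirty": false}
    #         }
    #     }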
960
961 def _prepare_updated_caps_list(self, existing_caps, mds_cap_str, osd_cap_str, authorize=True):
962 caps_list = []
963 for k, v in existing_caps['caps'].items():
964 if k == 'mds' or k == 'osd':
965 continue
966 elif k == 'mon':
967 if not authorize and v == 'allow r':
968 continue
969 caps_list.extend((k,v))
970
971 if mds_cap_str:
972 caps_list.extend(('mds', mds_cap_str))
973 if osd_cap_str:
974 caps_list.extend(('osd', osd_cap_str))
975
976 if authorize and 'mon' not in caps_list:
977 caps_list.extend(('mon', 'allow r'))
978
979 return caps_list
980
981 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None, allow_existing_id=False):
982 """
983 Get-or-create a Ceph auth identity for `auth_id` and grant them access
984 to
985 :param volume_path:
986 :param auth_id:
987 :param readonly:
988 :param tenant_id: Optionally provide a stringizable object to
989 restrict any created cephx IDs to other callers
990 passing the same tenant ID.
991 :allow_existing_id: Optionally authorize existing auth-ids not
992 created by ceph_volume_client
993 :return:
994 """
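        # Illustrative call ("guest", "tenant1" and the volume path are
        # assumptions):
        #
        #     result = vc.authorize(VolumePath("grp", "vol1"), "guest",
        #                           readonly=False, tenant_id="tenant1")
        #     key = result['auth_key']   # None when no tenant_id was supplied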
995
996 with self._auth_lock(auth_id):
997 client_entity = "client.{0}".format(auth_id)
998 try:
999 existing_caps = self._rados_command(
1000 'auth get',
1001 {
1002 'entity': client_entity
1003 }
1004 )
1005 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1006 except rados.Error:
1007 existing_caps = None
1008
1009 # Existing meta, or None, to be updated
1010 auth_meta = self._auth_metadata_get(auth_id)
1011
1012 # subvolume data to be inserted
1013 volume_path_str = str(volume_path)
1014 subvolume = {
1015 volume_path_str : {
1016 # The access level at which the auth_id is authorized to
1017 # access the volume.
1018 'access_level': 'r' if readonly else 'rw',
1019 'dirty': True,
1020 }
1021 }
1022
1023 if auth_meta is None:
1024 if not allow_existing_id and existing_caps is not None:
1025                     msg = "auth ID: {0} exists and was not created by ceph_volume_client. Not allowed to modify".format(auth_id)
1026 log.error(msg)
1027 raise CephFSVolumeClientError(msg)
1028
1029 # non-existent auth IDs
1030 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
1031 auth_id, tenant_id
1032 ))
1033 log.debug("Authorize: no existing meta")
1034 auth_meta = {
1035 'dirty': True,
1036 'tenant_id': tenant_id.__str__() if tenant_id else None,
1037 'subvolumes': subvolume
1038 }
1039 else:
1040 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
1041 if 'volumes' in auth_meta:
1042 auth_meta['subvolumes'] = auth_meta.pop('volumes')
1043
1044 # Disallow tenants to share auth IDs
1045 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
1046 msg = "auth ID: {0} is already in use".format(auth_id)
1047 log.error(msg)
1048 raise CephFSVolumeClientError(msg)
1049
1050 if auth_meta['dirty']:
1051 self._recover_auth_meta(auth_id, auth_meta)
1052
1053 log.debug("Authorize: existing tenant {tenant}".format(
1054 tenant=auth_meta['tenant_id']
1055 ))
1056 auth_meta['dirty'] = True
1057 auth_meta['subvolumes'].update(subvolume)
1058
1059 self._auth_metadata_set(auth_id, auth_meta)
1060
1061 with self._volume_lock(volume_path):
1062 key = self._authorize_volume(volume_path, auth_id, readonly, existing_caps)
1063
1064 auth_meta['dirty'] = False
1065 auth_meta['subvolumes'][volume_path_str]['dirty'] = False
1066 self._auth_metadata_set(auth_id, auth_meta)
1067
1068 if tenant_id:
1069 return {
1070 'auth_key': key
1071 }
1072 else:
1073 # Caller wasn't multi-tenant aware: be safe and don't give
1074 # them a key
1075 return {
1076 'auth_key': None
1077 }
1078
1079 def _authorize_volume(self, volume_path, auth_id, readonly, existing_caps):
1080 vol_meta = self._volume_metadata_get(volume_path)
1081
1082 access_level = 'r' if readonly else 'rw'
1083 auth = {
1084 auth_id: {
1085 'access_level': access_level,
1086 'dirty': True,
1087 }
1088 }
1089
1090 if vol_meta is None:
1091 vol_meta = {
1092 'auths': auth
1093 }
1094 else:
1095 vol_meta['auths'].update(auth)
1096 self._volume_metadata_set(volume_path, vol_meta)
1097
1098 key = self._authorize_ceph(volume_path, auth_id, readonly, existing_caps)
1099
1100 vol_meta['auths'][auth_id]['dirty'] = False
1101 self._volume_metadata_set(volume_path, vol_meta)
1102
1103 return key
1104
1105 def _authorize_ceph(self, volume_path, auth_id, readonly, existing_caps):
1106 path = self._get_path(volume_path)
1107 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1108 auth_id, path
1109 ))
1110
1111 # First I need to work out what the data pool is for this share:
1112 # read the layout
1113 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1114
1115 try:
1116 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1117 "namespace").decode()
1118 except cephfs.NoData:
1119 namespace = None
1120
1121 # Now construct auth capabilities that give the guest just enough
1122 # permissions to access the share
1123 client_entity = "client.{0}".format(auth_id)
1124 want_access_level = 'r' if readonly else 'rw'
1125 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1126 if namespace:
1127 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1128 want_access_level, pool_name, namespace)
1129 else:
1130 want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
1131 pool_name)
1132
1133 if existing_caps is None:
1134 caps = self._rados_command(
1135 'auth get-or-create',
1136 {
1137 'entity': client_entity,
1138 'caps': [
1139 'mds', want_mds_cap,
1140 'osd', want_osd_cap,
1141 'mon', 'allow r']
1142 })
1143 else:
1144 # entity exists, update it
1145 cap = existing_caps[0]
1146
1147 # Construct auth caps that if present might conflict with the desired
1148 # auth caps.
1149 unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1150 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1151 if namespace:
1152 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1153 unwanted_access_level, pool_name, namespace)
1154 else:
1155 unwanted_osd_cap = 'allow {0} pool={1}'.format(
1156 unwanted_access_level, pool_name)
1157
1158 def cap_update(
1159 orig_mds_caps, orig_osd_caps, want_mds_cap,
1160 want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
1161
1162 if not orig_mds_caps:
1163 return want_mds_cap, want_osd_cap
1164
1165 mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
1166 osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
1167
1168 if want_mds_cap in mds_cap_tokens:
1169 return orig_mds_caps, orig_osd_caps
1170
1171 if unwanted_mds_cap in mds_cap_tokens:
1172 mds_cap_tokens.remove(unwanted_mds_cap)
1173 osd_cap_tokens.remove(unwanted_osd_cap)
1174
1175 mds_cap_tokens.append(want_mds_cap)
1176 osd_cap_tokens.append(want_osd_cap)
1177
1178 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1179
1180 orig_mds_caps = cap['caps'].get('mds', "")
1181 orig_osd_caps = cap['caps'].get('osd', "")
1182
1183 mds_cap_str, osd_cap_str = cap_update(
1184 orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
1185 unwanted_mds_cap, unwanted_osd_cap)
1186
1187 caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str)
1188 caps = self._rados_command(
1189 'auth caps',
1190 {
1191 'entity': client_entity,
1192 'caps': caps_list
1193 })
1194
1195 caps = self._rados_command(
1196 'auth get',
1197 {
1198 'entity': client_entity
1199 }
1200 )
1201
1202 # Result expected like this:
1203 # [
1204 # {
1205 # "entity": "client.foobar",
1206 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1207 # "caps": {
1208 # "mds": "allow *",
1209 # "mon": "allow *"
1210 # }
1211 # }
1212 # ]
1213 assert len(caps) == 1
1214 assert caps[0]['entity'] == client_entity
1215 return caps[0]['key']
1216
1217 def deauthorize(self, volume_path, auth_id):
1218 with self._auth_lock(auth_id):
1219 # Existing meta, or None, to be updated
1220 auth_meta = self._auth_metadata_get(auth_id)
1221
1222 # Update 'volumes' key (old style auth metadata file) to 'subvolumes' key
1223 if auth_meta and 'volumes' in auth_meta:
1224 auth_meta['subvolumes'] = auth_meta.pop('volumes')
1225
1226 volume_path_str = str(volume_path)
1227 if (auth_meta is None) or (not auth_meta['subvolumes']):
1228                 log.warning("deauthorize called for already-removed auth "
1229 "ID '{auth_id}' for volume ID '{volume}'".format(
1230 auth_id=auth_id, volume=volume_path.volume_id
1231 ))
1232 # Clean up the auth meta file of an auth ID
1233 self.fs.unlink(self._auth_metadata_path(auth_id))
1234 return
1235
1236 if volume_path_str not in auth_meta['subvolumes']:
1237                 log.warning("deauthorize called for already-removed auth "
1238 "ID '{auth_id}' for volume ID '{volume}'".format(
1239 auth_id=auth_id, volume=volume_path.volume_id
1240 ))
1241 return
1242
1243 if auth_meta['dirty']:
1244 self._recover_auth_meta(auth_id, auth_meta)
1245
1246 auth_meta['dirty'] = True
1247 auth_meta['subvolumes'][volume_path_str]['dirty'] = True
1248 self._auth_metadata_set(auth_id, auth_meta)
1249
1250 self._deauthorize_volume(volume_path, auth_id)
1251
1252 # Filter out the volume we're deauthorizing
1253 del auth_meta['subvolumes'][volume_path_str]
1254
1255 # Clean up auth meta file
1256 if not auth_meta['subvolumes']:
1257 self.fs.unlink(self._auth_metadata_path(auth_id))
1258 return
1259
1260 auth_meta['dirty'] = False
1261 self._auth_metadata_set(auth_id, auth_meta)
1262
1263 def _deauthorize_volume(self, volume_path, auth_id):
1264 with self._volume_lock(volume_path):
1265 vol_meta = self._volume_metadata_get(volume_path)
1266
1267 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1268                 log.warning("deauthorize called for already-removed auth "
1269 "ID '{auth_id}' for volume ID '{volume}'".format(
1270 auth_id=auth_id, volume=volume_path.volume_id
1271 ))
1272 return
1273
1274 vol_meta['auths'][auth_id]['dirty'] = True
1275 self._volume_metadata_set(volume_path, vol_meta)
1276
1277 self._deauthorize(volume_path, auth_id)
1278
1279 # Remove the auth_id from the metadata *after* removing it
1280 # from ceph, so that if we crashed here, we would actually
1281 # recreate the auth ID during recovery (i.e. end up with
1282 # a consistent state).
1283
1284 # Filter out the auth we're removing
1285 del vol_meta['auths'][auth_id]
1286 self._volume_metadata_set(volume_path, vol_meta)
1287
1288 def _deauthorize(self, volume_path, auth_id):
1289 """
1290 The volume must still exist.
1291 """
1292 client_entity = "client.{0}".format(auth_id)
1293 path = self._get_path(volume_path)
1294 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1295 try:
1296 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1297 "namespace").decode()
1298 except cephfs.NoData:
1299 namespace = None
1300
1301 # The auth_id might have read-only or read-write mount access for the
1302 # volume path.
1303 access_levels = ('r', 'rw')
1304 want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
1305 for access_level in access_levels]
1306 if namespace:
1307 want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
1308 for access_level in access_levels]
1309 else:
1310 want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
1311 for access_level in access_levels]
1312
1313
1314 try:
1315 existing = self._rados_command(
1316 'auth get',
1317 {
1318 'entity': client_entity
1319 }
1320 )
1321
1322 def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
1323 mds_cap_tokens = [x.strip() for x in orig_mds_caps.split(",")]
1324 osd_cap_tokens = [x.strip() for x in orig_osd_caps.split(",")]
1325
1326 for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
1327 if want_mds_cap in mds_cap_tokens:
1328 mds_cap_tokens.remove(want_mds_cap)
1329 osd_cap_tokens.remove(want_osd_cap)
1330 break
1331
1332 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1333
1334 cap = existing[0]
1335 orig_mds_caps = cap['caps'].get('mds', "")
1336 orig_osd_caps = cap['caps'].get('osd', "")
1337 mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
1338 want_mds_caps, want_osd_caps)
1339
1340 caps_list = self._prepare_updated_caps_list(cap, mds_cap_str, osd_cap_str, authorize=False)
1341 if not caps_list:
1342 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1343 else:
1344 self._rados_command(
1345 'auth caps',
1346 {
1347 'entity': client_entity,
1348 'caps': caps_list
1349 })
1350
1351 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1352 except rados.Error:
1353 # Already gone, great.
1354 return
1355
1356 def get_authorized_ids(self, volume_path):
1357 """
1358 Expose a list of auth IDs that have access to a volume.
1359
1360 return: a list of (auth_id, access_level) tuples, where
1361         the access_level can be 'r' or 'rw'.
1362 None if no auth ID is given access to the volume.
1363 """
1364 with self._volume_lock(volume_path):
1365 meta = self._volume_metadata_get(volume_path)
1366 auths = []
1367 if not meta or not meta['auths']:
1368 return None
1369
1370 for auth, auth_data in meta['auths'].items():
1371 # Skip partial auth updates.
1372 if not auth_data['dirty']:
1373 auths.append((auth, auth_data['access_level']))
1374
1375 return auths
1376
1377 def _rados_command(self, prefix, args=None, decode=True):
1378 """
1379 Safer wrapper for ceph_argparse.json_command, which raises
1380 Error exception instead of relying on caller to check return
1381 codes.
1382
1383 Error exception can result from:
1384 * Timeout
1385 * Actual legitimate errors
1386 * Malformed JSON output
1387
1388 return: Decoded object from ceph, or None if empty string returned.
1389 If decode is False, return a string (the data returned by
1390 ceph command)
1391 """
1392 if args is None:
1393 args = {}
1394
1395 argdict = args.copy()
1396 argdict['format'] = 'json'
1397
1398 ret, outbuf, outs = json_command(self.rados,
1399 prefix=prefix,
1400 argdict=argdict,
1401 timeout=RADOS_TIMEOUT)
1402 if ret != 0:
1403 raise rados.Error(outs)
1404 else:
1405 if decode:
1406 if outbuf:
1407 try:
1408 return json.loads(outbuf.decode())
1409 except (ValueError, TypeError):
1410 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1411 else:
1412 return None
1413 else:
1414 return outbuf
1415
1416 def get_used_bytes(self, volume_path):
1417 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
1418 "rbytes").decode())
1419
1420 def set_max_bytes(self, volume_path, max_bytes):
1421 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1422 to_bytes(max_bytes if max_bytes else 0), 0)
1423
1424 def _snapshot_path(self, dir_path, snapshot_name):
1425 return os.path.join(
1426 dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1427 )
1428
1429 def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
1430 # TODO: raise intelligible exception for clusters where snaps are disabled
1431 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)
1432
1433 def _snapshot_destroy(self, dir_path, snapshot_name):
1434 """
1435 Remove a snapshot, or do nothing if it already doesn't exist.
1436 """
1437 try:
1438 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1439 except cephfs.ObjectNotFound:
1440 log.warning("Snapshot was already gone: {0}".format(snapshot_name))
1441
1442 def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
1443 self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)
1444
1445 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1446 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1447
1448 def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
1449 if group_id is None:
1450 raise RuntimeError("Group ID may not be None")
1451
1452 return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
1453 mode)
1454
1455 def destroy_snapshot_group(self, group_id, snapshot_name):
1456 if group_id is None:
1457 raise RuntimeError("Group ID may not be None")
1458 if snapshot_name is None:
1459 raise RuntimeError("Snapshot name may not be None")
1460
1461 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1462
1463 def _cp_r(self, src, dst):
1464 # TODO
1465 raise NotImplementedError()
1466
1467 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1468 dest_fs_path = self._get_path(dest_volume_path)
1469 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1470
1471 self._cp_r(src_snapshot_path, dest_fs_path)
1472
1473 def put_object(self, pool_name, object_name, data):
1474 """
1475 Synchronously write data to an object.
1476
1477 :param pool_name: name of the pool
1478 :type pool_name: str
1479 :param object_name: name of the object
1480 :type object_name: str
1481 :param data: data to write
1482 :type data: bytes
1483 """
1484 return self.put_object_versioned(pool_name, object_name, data)
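    # Example usage of the object helpers (the pool and object names are
    # assumptions):
    #
    #     vc.put_object("manila_data", "shares.json", b'{"shares": []}')
    #     data, version = vc.get_object_and_version("manila_data", "shares.json")
    #     vc.put_object_versioned("manila_data", "shares.json",
    #                             b'{"shares": ["vol1"]}', version=version)
    #     vc.delete_object("manila_data", "shares.json")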
1485
1486 def put_object_versioned(self, pool_name, object_name, data, version=None):
1487 """
1488         Synchronously write data to an object, but only if the object's
1489         current version matches the expected version.
1490
1491 :param pool_name: name of the pool
1492 :type pool_name: str
1493 :param object_name: name of the object
1494 :type object_name: str
1495 :param data: data to write
1496 :type data: bytes
1497 :param version: expected version of the object to write
1498 :type version: int
1499 """
1500 ioctx = self.rados.open_ioctx(pool_name)
1501
1502 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1503 if len(data) > max_size:
1504 msg = ("Data to be written to object '{0}' exceeds "
1505 "{1} bytes".format(object_name, max_size))
1506 log.error(msg)
1507 raise CephFSVolumeClientError(msg)
1508
1509 try:
1510 with rados.WriteOpCtx() as wop:
1511 if version is not None:
1512 wop.assert_version(version)
1513 wop.write_full(data)
1514 ioctx.operate_write_op(wop, object_name)
1515 except rados.OSError as e:
1516 log.error(e)
1517 raise e
1518 finally:
1519 ioctx.close()
1520
1521 def get_object(self, pool_name, object_name):
1522 """
1523 Synchronously read data from object.
1524
1525 :param pool_name: name of the pool
1526 :type pool_name: str
1527 :param object_name: name of the object
1528 :type object_name: str
1529
1530 :returns: bytes - data read from object
1531 """
1532 return self.get_object_and_version(pool_name, object_name)[0]
1533
1534 def get_object_and_version(self, pool_name, object_name):
1535 """
1536 Synchronously read data from object and get its version.
1537
1538 :param pool_name: name of the pool
1539 :type pool_name: str
1540 :param object_name: name of the object
1541 :type object_name: str
1542
1543 :returns: tuple of object data and version
1544 """
1545 ioctx = self.rados.open_ioctx(pool_name)
1546 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1547 try:
1548 bytes_read = ioctx.read(object_name, max_size)
1549 if ((len(bytes_read) == max_size) and
1550 (ioctx.read(object_name, 1, offset=max_size))):
1551                 log.warning("Size of object {0} exceeds the {1} bytes that "
1552                             "were read; returned data is truncated".format(object_name, max_size))
1553 obj_version = ioctx.get_last_version()
1554 finally:
1555 ioctx.close()
1556 return (bytes_read, obj_version)
1557
1558 def delete_object(self, pool_name, object_name):
1559 ioctx = self.rados.open_ioctx(pool_name)
1560 try:
1561 ioctx.remove_object(object_name)
1562 except rados.ObjectNotFound:
1563 log.warning("Object '{0}' was already removed".format(object_name))
1564 finally:
1565 ioctx.close()