1 """
2 Copyright (C) 2015 Red Hat, Inc.
3
4 LGPL2. See file COPYING.
5 """
6
7 from contextlib import contextmanager
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import struct
15 import sys
16 import threading
17 import time
18 import uuid
19
20 from ceph_argparse import json_command
21
22 import cephfs
23 import rados
24
25
26 class RadosError(Exception):
27 """
28 Something went wrong talking to Ceph with librados
29 """
30 pass
31
32
33 RADOS_TIMEOUT = 10
34
35 log = logging.getLogger(__name__)
36
37
38 # Reserved volume group name which we use in paths for volumes
39 # that are not assigned to a group (i.e. created with group=None)
40 NO_GROUP_NAME = "_nogroup"
41
42 # Filename extensions for meta files.
43 META_FILE_EXT = ".meta"
44
45 class VolumePath(object):
46 """
47 Identify a volume's path as group->volume
48 The Volume ID is a unique identifier, but this is a much more
49 helpful thing to pass around.
50 """
51 def __init__(self, group_id, volume_id):
52 self.group_id = group_id
53 self.volume_id = volume_id
54 assert self.group_id != NO_GROUP_NAME
55 assert self.volume_id != "" and self.volume_id is not None
56
57 def __str__(self):
58 return "{0}/{1}".format(self.group_id, self.volume_id)
59
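# Illustrative sketch (not part of the original module): how a VolumePath is
# constructed and how it renders; the IDs below are hypothetical.
#
#   vp = VolumePath("grp1", "vol1")   # str(vp) -> "grp1/vol1"
#   vp = VolumePath(None, "vol1")     # ungrouped; lives under NO_GROUP_NAME
#                                     # in the directory layout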
60
61 class ClusterTimeout(Exception):
62 """
63 Exception indicating that we timed out trying to talk to the Ceph cluster,
64 either to the mons, or to any individual daemon that the mons indicate ought
65 to be up but isn't responding to us.
66 """
67 pass
68
69
70 class ClusterError(Exception):
71 """
72 Exception indicating that the cluster returned an error to a command that
73 we thought should be successful based on our last knowledge of the cluster
74 state.
75 """
76 def __init__(self, action, result_code, result_str):
77 self._action = action
78 self._result_code = result_code
79 self._result_str = result_str
80
81 def __str__(self):
82 return "Error {0} (\"{1}\") while {2}".format(
83 self._result_code, self._result_str, self._action)
84
85
86 class RankEvicter(threading.Thread):
87 """
88 Thread for evicting client(s) from a particular MDS daemon instance.
89
90 This is more complex than simply sending a command, because we have to
91 handle cases where MDS daemons might not be fully up yet, and/or might
92 be transiently unresponsive to commands.
93 """
94 class GidGone(Exception):
95 pass
96
97 POLL_PERIOD = 5
98
99 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
100 """
101 :param client_spec: list of strings, used as filter arguments to "session evict";
102 e.g. pass ["id=123"] to evict a single client with session id 123.
103 """
104 self.rank = rank
105 self.gid = gid
106 self._mds_map = mds_map
107 self._client_spec = client_spec
108 self._volume_client = volume_client
109 self._ready_timeout = ready_timeout
110 self._ready_waited = 0
111
112 self.success = False
113 self.exception = None
114
115 super(RankEvicter, self).__init__()
116
117 def _ready_to_evict(self):
118 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
119 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
120 self._client_spec, self.rank, self.gid
121 ))
122 raise RankEvicter.GidGone()
123
124 info = self._mds_map['info']["gid_{0}".format(self.gid)]
125 log.debug("_ready_to_evict: state={0}".format(info['state']))
126 return info['state'] in ["up:active", "up:clientreplay"]
127
128 def _wait_for_ready(self):
129 """
130 Wait for that MDS rank to reach an active or clientreplay state, and
131 not be laggy.
132 """
133 while not self._ready_to_evict():
134 if self._ready_waited > self._ready_timeout:
135 raise ClusterTimeout()
136
137 time.sleep(self.POLL_PERIOD)
138 self._ready_waited += self.POLL_PERIOD
139
140 self._mds_map = self._volume_client._rados_command("mds dump", {})
141
142 def _evict(self):
143 """
144 Run the eviction procedure. Return True on success; raise ClusterTimeout or ClusterError on failure.
145 """
146
147 # Wait until the MDS is believed by the mon to be available for commands
148 try:
149 self._wait_for_ready()
150 except self.GidGone:
151 return True
152
153 # Then send it an evict
154 ret = errno.ETIMEDOUT
155 while ret == errno.ETIMEDOUT:
156 log.debug("mds_command: {0}, {1}".format(
157 "%s" % self.gid, ["session", "evict"] + self._client_spec
158 ))
159 ret, outb, outs = self._volume_client.fs.mds_command(
160 "%s" % self.gid,
161 [json.dumps({
162 "prefix": "session evict",
163 "filters": self._client_spec
164 })], "")
165 log.debug("mds_command: complete {0} {1}".format(ret, outs))
166
167 # If we get a clean response, great, it's gone from that rank.
168 if ret == 0:
169 return True
170 elif ret == errno.ETIMEDOUT:
171 # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
172 self._mds_map = self._volume_client._rados_command("mds dump", {})
173 try:
174 self._wait_for_ready()
175 except self.GidGone:
176 return True
177 else:
178 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
179
180 def run(self):
181 try:
182 self._evict()
183 except Exception as e:
184 self.success = False
185 self.exception = e
186 else:
187 self.success = True
188
189
190 class EvictionError(Exception):
191 pass
192
193
194 class CephFSVolumeClientError(Exception):
195 """
196 Something went wrong talking to Ceph using CephFSVolumeClient.
197 """
198 pass
199
200
201 CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
202
203 CephFSVolumeClient Version History:
204
205 * 1 - Initial version
206 * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
207 * 3 - Allow volumes to be created without RADOS namespace isolation
208 """
209
210
211 class CephFSVolumeClient(object):
212 """
213 Combine libcephfs and librados interfaces to implement a
214 'Volume' concept implemented as a cephfs directory and
215 client capabilities which restrict mount access to this
216 directory.
217
218 Additionally, volumes may be in a 'Group'. Conveniently,
219 volumes are a lot like manila shares, and groups are a lot
220 like manila consistency groups.
221
222 Refer to volumes with VolumePath, which specifies the
223 volume and group IDs (both strings). The group ID may
224 be None.
225
226 In general, functions in this class are allowed to raise rados.Error
227 or cephfs.Error exceptions in unexpected situations.
228 """
229
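# Connection lifecycle sketch (illustrative, not part of the original module);
# the auth ID, conf path and cluster name below are hypothetical:
#
#   vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
#   vc.connect(premount_evict="manila")   # optionally evict a stale instance
#   try:
#       ...                               # create_volume / authorize / etc.
#   finally:
#       vc.disconnect()
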
230 # Current version
231 version = 3
232
233 # Where shall we create our volumes?
234 POOL_PREFIX = "fsvolume_"
235 DEFAULT_VOL_PREFIX = "/volumes"
236 DEFAULT_NS_PREFIX = "fsvolumens_"
237
238 def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None):
239 self.fs = None
240 self.rados = None
241 self.connected = False
242 self.conf_path = conf_path
243 self.cluster_name = cluster_name
244 self.auth_id = auth_id
245 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
246 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
247 # For flock'ing in cephfs, I want a unique ID to distinguish me
248 # from any other manila-share services that are loading this module.
249 # We could use pid, but that's unnecessarily weak: generate a
250 # UUID
251 self._id = struct.unpack(">Q", uuid.uuid1().get_bytes()[0:8])[0]
252
253 # TODO: version the on-disk structures
254
255 def recover(self):
256 # Scan all auth keys to see if they're dirty: if they are, they have
257 # state that might not have propagated to Ceph or to the related
258 # volumes yet.
259
260 # Important: we *always* acquire locks in the order auth->volume
261 # That means a volume can never be dirty without the auth key
262 # we're updating it with being dirty at the same time.
263
264 # First list the auth IDs that have potentially dirty on-disk metadata
265 log.debug("Recovering from partial auth updates (if any)...")
266
267 try:
268 dir_handle = self.fs.opendir(self.volume_prefix)
269 except cephfs.ObjectNotFound:
270 log.debug("Nothing to recover. No auth meta files.")
271 return
272
273 d = self.fs.readdir(dir_handle)
274 auth_ids = []
275
276 if not d:
277 log.debug("Nothing to recover. No auth meta files.")
278
279 while d:
280 # Identify auth IDs from auth meta filenames. The auth meta files
281 # are named "$<auth_id><meta filename extension>"
282 regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
283 match = re.search(regex, d.d_name)
284 if match:
285 auth_ids.append(match.group(1))
286
287 d = self.fs.readdir(dir_handle)
288
289 self.fs.closedir(dir_handle)
290
291 # Key points based on ordering:
292 # * Anything added in VMeta is already added in AMeta
293 # * Anything added in Ceph is already added in VMeta
294 # * Anything removed in VMeta is already removed in Ceph
295 # * Anything removed in AMeta is already removed in VMeta
296
297 # Deauthorization: because I only update metadata AFTER the
298 # update of the next level down, the same ordering holds:
299 # things which exist in the AMeta should also exist in the
300 # VMeta and in Ceph, so the same recovery procedure that gets
301 # me consistent after crashes during authorization will also
302 # work during deauthorization.
303
304 # Now for each auth ID, check for dirty flag and apply updates
305 # if dirty flag is found
306 for auth_id in auth_ids:
307 with self._auth_lock(auth_id):
308 auth_meta = self._auth_metadata_get(auth_id)
309 if not auth_meta or not auth_meta['volumes']:
310 # Clean up auth meta file
311 self.fs.unlink(self._auth_metadata_path(auth_id))
312 continue
313 if not auth_meta['dirty']:
314 continue
315 self._recover_auth_meta(auth_id, auth_meta)
316
317 log.debug("Recovered from partial auth updates (if any).")
318
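# Recovery scan example (illustrative): the filename scan above recovers auth
# IDs from auth meta files, e.g. for a hypothetical file "$alice.meta":
#
#   re.search(r"^\$(.*)\.meta$", "$alice.meta").group(1)   -> "alice"
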
319 def _recover_auth_meta(self, auth_id, auth_meta):
320 """
321 Call me after locking the auth meta file.
322 """
323 remove_volumes = []
324
325 for volume, volume_data in auth_meta['volumes'].items():
326 if not volume_data['dirty']:
327 continue
328
329 (group_id, volume_id) = volume.split('/')
330 group_id = group_id if group_id != 'None' else None
331 volume_path = VolumePath(group_id, volume_id)
332 access_level = volume_data['access_level']
333
334 with self._volume_lock(volume_path):
335 vol_meta = self._volume_metadata_get(volume_path)
336
337 # No VMeta update indicates that there was no auth update
338 # in Ceph either. So it's safe to remove corresponding
339 # partial update in AMeta.
340 if not vol_meta or auth_id not in vol_meta['auths']:
341 remove_volumes.append(volume)
342 continue
343
344 want_auth = {
345 'access_level': access_level,
346 'dirty': False,
347 }
348 # VMeta update looks clean. Ceph auth update must have been
349 # clean.
350 if vol_meta['auths'][auth_id] == want_auth:
351 continue
352
353 readonly = access_level == 'r'
354 self._authorize_volume(volume_path, auth_id, readonly)
355
356 # Recovered from partial auth updates for the auth ID's access
357 # to a volume.
358 auth_meta['volumes'][volume]['dirty'] = False
359 self._auth_metadata_set(auth_id, auth_meta)
360
361 for volume in remove_volumes:
362 del auth_meta['volumes'][volume]
363
364 if not auth_meta['volumes']:
365 # Clean up auth meta file
366 self.fs.unlink(self._auth_metadata_path(auth_id))
367 return
368
369 # Recovered from all partial auth updates for the auth ID.
370 auth_meta['dirty'] = False
371 self._auth_metadata_set(auth_id, auth_meta)
372
373
374 def evict(self, auth_id, timeout=30, volume_path=None):
375 """
376 Evict all clients based on the authorization ID and optionally based on
377 the volume path mounted. Assumes that the authorization key has been
378 revoked prior to calling this function.
379
380 This operation can throw an exception if the mon cluster is unresponsive, or
381 any individual MDS daemon is unresponsive for longer than the timeout passed in.
382 """
383
384 client_spec = ["auth_name={0}".format(auth_id), ]
385 if volume_path:
386 client_spec.append("client_metadata.root={0}".
387 format(self._get_path(volume_path)))
388
389 log.info("evict clients with {0}".format(', '.join(client_spec)))
390
391 mds_map = self._rados_command("mds dump", {})
392
393 up = {}
394 for name, gid in mds_map['up'].items():
395 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
396 assert name.startswith("mds_")
397 up[int(name[4:])] = gid
398
399 # For all MDS ranks held by a daemon
400 # Do the parallelism in python instead of using "tell mds.*", because
401 # the latter doesn't give us per-mds output
402 threads = []
403 for rank, gid in up.items():
404 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
405 timeout)
406 thread.start()
407 threads.append(thread)
408
409 for t in threads:
410 t.join()
411
412 log.info("evict: joined all")
413
414 for t in threads:
415 if not t.success:
416 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
417 format(', '.join(client_spec), t.rank, t.gid, t.exception)
418 )
419 log.error(msg)
420 raise EvictionError(msg)
421
422 def _get_path(self, volume_path):
423 """
424 Determine the path within CephFS where this volume will live
425 :return: absolute path (string)
426 """
427 return os.path.join(
428 self.volume_prefix,
429 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
430 volume_path.volume_id)
431
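# Path layout sketch (illustrative): with the default "/volumes" prefix,
# _get_path maps
#
#   VolumePath("grp1", "vol1")  ->  /volumes/grp1/vol1
#   VolumePath(None, "vol1")    ->  /volumes/_nogroup/vol1
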
432 def _get_group_path(self, group_id):
433 if group_id is None:
434 raise ValueError("group_id may not be None")
435
436 return os.path.join(
437 self.volume_prefix,
438 group_id
439 )
440
441 def connect(self, premount_evict=None):
442 """
443
444 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
445 may want to use this to specify their own auth ID if they expect
446 to be a unique instance and don't want to wait for caps to time
447 out after failure of another instance of themselves.
448 """
449 log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
450 self.rados = rados.Rados(
451 name="client.{0}".format(self.auth_id),
452 clustername=self.cluster_name,
453 conffile=self.conf_path,
454 conf={}
455 )
456 self.rados.connect()
457
458 log.debug("Connection to RADOS complete")
459
460 log.debug("Connecting to cephfs...")
461 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
462 log.debug("CephFS initializing...")
463 self.fs.init()
464 if premount_evict is not None:
465 log.debug("Premount eviction of {0} starting".format(premount_evict))
466 self.evict(premount_evict)
467 log.debug("Premount eviction of {0} completes".format(premount_evict))
468 log.debug("CephFS mounting...")
469 self.fs.mount()
470 log.debug("Connection to cephfs complete")
471
472 # Recover from partial auth updates due to a previous
473 # crash.
474 self.recover()
475
476 def get_mon_addrs(self):
477 log.info("get_mon_addrs")
478 result = []
479 mon_map = self._rados_command("mon dump")
480 for mon in mon_map['mons']:
481 ip_port = mon['addr'].split("/")[0]
482 result.append(ip_port)
483
484 return result
485
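# get_mon_addrs example (illustrative): a mon map entry whose 'addr' is
# "192.168.0.1:6789/0" contributes "192.168.0.1:6789" to the result; the
# "/nonce" suffix is dropped by the split on "/".
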
486 def disconnect(self):
487 log.info("disconnect")
488 if self.fs:
489 log.debug("Disconnecting cephfs...")
490 self.fs.shutdown()
491 self.fs = None
492 log.debug("Disconnecting cephfs complete")
493
494 if self.rados:
495 log.debug("Disconnecting rados...")
496 self.rados.shutdown()
497 self.rados = None
498 log.debug("Disconnecting rados complete")
499
500 def __del__(self):
501 self.disconnect()
502
503 def _get_pool_id(self, osd_map, pool_name):
504 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
505 # like this are needed.
506 for pool in osd_map['pools']:
507 if pool['pool_name'] == pool_name:
508 return pool['pool']
509
510 return None
511
512 def _create_volume_pool(self, pool_name):
513 """
514 Idempotently create a pool for use as a CephFS data pool, with the given name
515
516 :return The ID of the created pool
517 """
518 osd_map = self._rados_command('osd dump', {})
519
520 existing_id = self._get_pool_id(osd_map, pool_name)
521 if existing_id is not None:
522 log.info("Pool {0} already exists".format(pool_name))
523 return existing_id
524
525 osd_count = len(osd_map['osds'])
526
527 # We can't query the actual cluster config remotely, but since this is
528 # just a heuristic we'll assume that the ceph.conf we have locally reflects
529 # that in use in the rest of the cluster.
530 pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))
531
532 other_pgs = 0
533 for pool in osd_map['pools']:
534 if not pool['pool_name'].startswith(self.POOL_PREFIX):
535 other_pgs += pool['pg_num']
536
537 # A basic heuristic for picking pg_num: work out the max number of
538 # PGs we can have without tripping a warning, then subtract the number
539 # of PGs already created by non-manila pools, then divide by ten. That'll
540 # give you a reasonable result on a system where you have "a few" manila
541 # shares.
542 pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) / 10
543 # TODO Alternatively, respect an override set by the user.
544
545 self._rados_command(
546 'osd pool create',
547 {
548 'pool': pool_name,
549 'pg_num': pg_num
550 }
551 )
552
553 osd_map = self._rados_command('osd dump', {})
554 pool_id = self._get_pool_id(osd_map, pool_name)
555
556 if pool_id is None:
557 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
558 # removing it right after we created it.
559 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
560 pool_name, json.dumps(osd_map, indent=2)
561 ))
562 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
563 else:
564 return pool_id
565
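# pg_num heuristic, worked example (illustrative numbers): with
# mon_max_pg_per_osd=250, 8 OSDs, and 1200 PGs already used by non-manila
# pools, pg_num = ((250 * 8) - 1200) / 10 = 80 for the new data pool.
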
566 def create_group(self, group_id):
567 # Prevent craftily-named volume groups from colliding with the meta
568 # files.
569 if group_id.endswith(META_FILE_EXT):
570 raise ValueError("group ID cannot end with '{0}'.".format(
571 META_FILE_EXT))
572 path = self._get_group_path(group_id)
573 self._mkdir_p(path)
574
575 def destroy_group(self, group_id):
576 path = self._get_group_path(group_id)
577 try:
578 self.fs.stat(self.volume_prefix)
579 except cephfs.ObjectNotFound:
580 pass
581 else:
582 self.fs.rmdir(path)
583
584 def _mkdir_p(self, path):
585 try:
586 self.fs.stat(path)
587 except cephfs.ObjectNotFound:
588 pass
589 else:
590 return
591
592 parts = path.split(os.path.sep)
593
594 for i in range(1, len(parts) + 1):
595 subpath = os.path.join(*parts[0:i])
596 try:
597 self.fs.stat(subpath)
598 except cephfs.ObjectNotFound:
599 self.fs.mkdir(subpath, 0o755)
600
601 def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True):
602 """
603 Set up metadata, pools and auth for a volume.
604
605 This function is idempotent. It is safe to call this again
606 for an already-created volume, even if it is in use.
607
608 :param volume_path: VolumePath instance
609 :param size: In bytes, or None for no size limit
610 :param data_isolated: If true, create a separate OSD pool for this volume
611 :param namespace_isolated: If true, use separate RADOS namespace for this volume
612 :return:
613 """
614 path = self._get_path(volume_path)
615 log.info("create_volume: {0}".format(path))
616
617 self._mkdir_p(path)
618
619 if size is not None:
620 self.fs.setxattr(path, 'ceph.quota.max_bytes', size.__str__(), 0)
621
622 # data_isolated means create a separate pool for this volume
623 if data_isolated:
624 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
625 log.info("create_volume: {0}, create pool {1} as data_isolated=True.".format(volume_path, pool_name))
626 pool_id = self._create_volume_pool(pool_name)
627 mds_map = self._rados_command("mds dump", {})
628 if pool_id not in mds_map['data_pools']:
629 self._rados_command("mds add_data_pool", {
630 'pool': pool_name
631 })
632 self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)
633
634 # enforce security isolation, use separate namespace for this volume
635 if namespace_isolated:
636 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
637 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
638 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace', namespace, 0)
639 else:
640 # If volume's namespace layout is not set, then the volume's pool
641 # layout remains unset and will undesirably change with ancestor's
642 # pool layout changes.
643 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
644 self.fs.setxattr(path, 'ceph.dir.layout.pool', pool_name, 0)
645
646 # Create a volume meta file, if it does not already exist, to store
647 # data about auth ids having access to the volume
648 fd = self.fs.open(self._volume_metadata_path(volume_path),
649 os.O_CREAT, 0o755)
650 self.fs.close(fd)
651
652 return {
653 'mount_path': path
654 }
655
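# create_volume effect sketch (illustrative): for VolumePath("grp1", "vol1")
# with size=10 GiB and default settings, /volumes/grp1/vol1 ends up with
# roughly these xattrs:
#
#   ceph.quota.max_bytes           = "10737418240"
#   ceph.dir.layout.pool_namespace = "fsvolumens_vol1"
#
# and an (initially empty) volume meta file /volumes/_grp1:vol1.meta.
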
656 def delete_volume(self, volume_path, data_isolated=False):
657 """
658 Make a volume inaccessible to guests. This function is
659 idempotent. This is the fast part of tearing down a volume: you must
660 also later call purge_volume, which is the slow part.
661
662 :param volume_path: Same identifier used in create_volume
663 :return:
664 """
665
666 path = self._get_path(volume_path)
667 log.info("delete_volume: {0}".format(path))
668
669 # Create the trash folder if it doesn't already exist
670 trash = os.path.join(self.volume_prefix, "_deleting")
671 self._mkdir_p(trash)
672
673 # We'll move it to here
674 trashed_volume = os.path.join(trash, volume_path.volume_id)
675
676 # Move the volume's data to the trash folder
677 try:
678 self.fs.stat(path)
679 except cephfs.ObjectNotFound:
680 log.warning("Trying to delete volume '{0}' but it's already gone".format(
681 path))
682 else:
683 self.fs.rename(path, trashed_volume)
684
685 # Delete the volume meta file, if it's not already deleted
686 vol_meta_path = self._volume_metadata_path(volume_path)
687 try:
688 self.fs.unlink(vol_meta_path)
689 except cephfs.ObjectNotFound:
690 pass
691
692 def purge_volume(self, volume_path, data_isolated=False):
693 """
694 Finish clearing up a volume that was previously passed to delete_volume. This
695 function is idempotent.
696 """
697
698 trash = os.path.join(self.volume_prefix, "_deleting")
699 trashed_volume = os.path.join(trash, volume_path.volume_id)
700
701 try:
702 self.fs.stat(trashed_volume)
703 except cephfs.ObjectNotFound:
704 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
705 trashed_volume))
706 return
707
708 def rmtree(root_path):
709 log.debug("rmtree {0}".format(root_path))
710 dir_handle = self.fs.opendir(root_path)
711 d = self.fs.readdir(dir_handle)
712 while d:
713 if d.d_name not in [".", ".."]:
714 # Do not use os.path.join because it is sensitive
715 # to string encoding, we just pass through dnames
716 # as byte arrays
717 d_full = "{0}/{1}".format(root_path, d.d_name)
718 if d.is_dir():
719 rmtree(d_full)
720 else:
721 self.fs.unlink(d_full)
722
723 d = self.fs.readdir(dir_handle)
724 self.fs.closedir(dir_handle)
725
726 self.fs.rmdir(root_path)
727
728 rmtree(trashed_volume)
729
730 if data_isolated:
731 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
732 osd_map = self._rados_command("osd dump", {})
733 pool_id = self._get_pool_id(osd_map, pool_name)
734 mds_map = self._rados_command("mds dump", {})
735 if pool_id in mds_map['data_pools']:
736 self._rados_command("mds remove_data_pool", {
737 'pool': pool_name
738 })
739 self._rados_command("osd pool delete",
740 {
741 "pool": pool_name,
742 "pool2": pool_name,
743 "sure": "--yes-i-really-really-mean-it"
744 })
745
746 def _get_ancestor_xattr(self, path, attr):
747 """
748 Helper for reading layout information: if this xattr is missing
749 on the requested path, keep checking parents until we find it.
750 """
751 try:
752 result = self.fs.getxattr(path, attr)
753 if result == "":
754 # Annoying! cephfs gives us empty instead of an error when attr not found
755 raise cephfs.NoData()
756 else:
757 return result
758 except cephfs.NoData:
759 if path == "/":
760 raise
761 else:
762 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
763
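# _get_ancestor_xattr example (illustrative): if /volumes/grp1/vol1 carries no
# "ceph.dir.layout.pool" xattr of its own, the lookup walks up through
# /volumes/grp1, /volumes and finally /, returning the first non-empty value
# (or re-raising cephfs.NoData at the root).
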
764 def _check_compat_version(self, compat_version):
765 if self.version < compat_version:
766 msg = ("The current version of CephFSVolumeClient, version {0} "
767 "does not support the required feature. Need version {1} "
768 "or greater".format(self.version, compat_version)
769 )
770 log.error(msg)
771 raise CephFSVolumeClientError(msg)
772
773 def _metadata_get(self, path):
774 """
775 Return a deserialized JSON object, or None
776 """
777 fd = self.fs.open(path, "r")
778 # TODO iterate instead of assuming file < 4MB
779 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
780 self.fs.close(fd)
781 if read_bytes:
782 return json.loads(read_bytes)
783 else:
784 return None
785
786 def _metadata_set(self, path, data):
787 serialized = json.dumps(data)
788 fd = self.fs.open(path, "w")
789 try:
790 self.fs.write(fd, serialized, 0)
791 self.fs.fsync(fd, 0)
792 finally:
793 self.fs.close(fd)
794
795 def _lock(self, path):
796 @contextmanager
797 def fn():
798 while True:
799 fd = self.fs.open(path, os.O_CREAT, 0o755)
800 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
801
802 # The locked file will be cleaned up sometime. It could be
803 # unlinked, e.g. by another manila share instance, before the
804 # lock was applied on it. Perform checks to ensure that this
805 # does not happen.
806 try:
807 statbuf = self.fs.stat(path)
808 except cephfs.ObjectNotFound:
809 self.fs.close(fd)
810 continue
811
812 fstatbuf = self.fs.fstat(fd)
813 if statbuf.st_ino == fstatbuf.st_ino:
814 break
815
816 try:
817 yield
818 finally:
819 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
820 self.fs.close(fd)
821
822 return fn()
823
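# _lock race-check sketch (illustrative): the stat/fstat inode comparison
# guards against this interleaving between two instances A and B:
#
#   A: open(path)                B: open(path), flock, ..., unlink(path)
#   A: flock succeeds, but on an inode that is no longer linked at `path`
#   A: stat(path) raises ObjectNotFound (or reports a different inode) -> retry
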
824 def _auth_metadata_path(self, auth_id):
825 return os.path.join(self.volume_prefix, "${0}{1}".format(
826 auth_id, META_FILE_EXT))
827
828 def _auth_lock(self, auth_id):
829 return self._lock(self._auth_metadata_path(auth_id))
830
831 def _auth_metadata_get(self, auth_id):
832 """
833 Call me with the metadata locked!
834
835 Check whether an auth metadata structure can be decoded by the current
836 version of CephFSVolumeClient.
837
838 Return auth metadata that the current version of CephFSVolumeClient
839 can decode.
840 """
841 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
842
843 if auth_metadata:
844 self._check_compat_version(auth_metadata['compat_version'])
845
846 return auth_metadata
847
848 def _auth_metadata_set(self, auth_id, data):
849 """
850 Call me with the metadata locked!
851
852 Fsync the auth metadata.
853
854 Add two version attributes to the auth metadata,
855 'compat_version', the minimum CephFSVolumeClient version that can
856 decode the metadata, and 'version', the CephFSVolumeClient version
857 that encoded the metadata.
858 """
859 data['compat_version'] = 1
860 data['version'] = self.version
861 return self._metadata_set(self._auth_metadata_path(auth_id), data)
862
863 def _volume_metadata_path(self, volume_path):
864 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
865 volume_path.group_id if volume_path.group_id else "",
866 volume_path.volume_id,
867 META_FILE_EXT
868 ))
869
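# Metadata file naming sketch (illustrative, with the default "/volumes"
# prefix and hypothetical IDs):
#
#   auth meta:          /volumes/$alice.meta      (_auth_metadata_path)
#   volume meta:        /volumes/_grp1:vol1.meta  (_volume_metadata_path)
#   ungrouped volume:   /volumes/_:vol1.meta      (group_id None -> "")
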
870 def _volume_lock(self, volume_path):
871 """
872 Return a ContextManager which locks the authorization metadata for
873 a particular volume, and persists a flag to the metadata indicating
874 that it is currently locked, so that we can detect dirty situations
875 during recovery.
876
877 This lock isn't just to make access to the metadata safe: it's also
878 designed to be used over the two-step process of checking the
879 metadata and then responding to an authorization request, to
880 ensure that at the point we respond the metadata hasn't changed
881 in the background. It's key to how we avoid security holes
882 resulting from races during that two-step process.
883 """
884 return self._lock(self._volume_metadata_path(volume_path))
885
886 def _volume_metadata_get(self, volume_path):
887 """
888 Call me with the metadata locked!
889
890 Check whether a volume metadata structure can be decoded by the current
891 version of CephFSVolumeClient.
892
893 Return a volume_metadata structure that the current version of
894 CephFSVolumeClient can decode.
895 """
896 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
897
898 if volume_metadata:
899 self._check_compat_version(volume_metadata['compat_version'])
900
901 return volume_metadata
902
903 def _volume_metadata_set(self, volume_path, data):
904 """
905 Call me with the metadata locked!
906
907 Add two version attributes to the volume metadata,
908 'compat_version', the minimum CephFSVolumeClient version that can
909 decode the metadata and 'version', the CephFSVolumeClient version
910 that encoded the metadata.
911 """
912 data['compat_version'] = 1
913 data['version'] = self.version
914 return self._metadata_set(self._volume_metadata_path(volume_path), data)
915
916 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
917 """
918 Get-or-create a Ceph auth identity for `auth_id` and grant them access
919 to the given volume.
920 :param volume_path:
921 :param auth_id:
922 :param readonly:
923 :param tenant_id: Optionally provide a stringizable object to
924 restrict any created cephx IDs to other callers
925 passing the same tenant ID.
926 :return:
927 """
928
929 with self._auth_lock(auth_id):
930 # Existing meta, or None, to be updated
931 auth_meta = self._auth_metadata_get(auth_id)
932
933 # volume data to be inserted
934 volume_path_str = str(volume_path)
935 volume = {
936 volume_path_str : {
937 # The access level at which the auth_id is authorized to
938 # access the volume.
939 'access_level': 'r' if readonly else 'rw',
940 'dirty': True,
941 }
942 }
943 if auth_meta is None:
944 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
945 auth_id, tenant_id
946 ))
947 log.debug("Authorize: no existing meta")
948 auth_meta = {
949 'dirty': True,
950 'tenant_id': tenant_id.__str__() if tenant_id else None,
951 'volumes': volume
952 }
953
954 # Note: this is *not* guaranteeing that the key doesn't already
955 # exist in Ceph: we are allowing VolumeClient tenants to
956 # 'claim' existing Ceph keys. In order to prevent VolumeClient
957 # tenants from reading e.g. client.admin keys, you need to
958 # have configured your VolumeClient user (e.g. Manila) to
959 # have mon auth caps that prevent it from accessing those keys
960 # (e.g. limit it to only access keys with a manila.* prefix)
961 else:
962 # Disallow tenants to share auth IDs
963 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
964 msg = "auth ID: {0} is already in use".format(auth_id)
965 log.error(msg)
966 raise CephFSVolumeClientError(msg)
967
968 if auth_meta['dirty']:
969 self._recover_auth_meta(auth_id, auth_meta)
970
971 log.debug("Authorize: existing tenant {tenant}".format(
972 tenant=auth_meta['tenant_id']
973 ))
974 auth_meta['dirty'] = True
975 auth_meta['volumes'].update(volume)
976
977 self._auth_metadata_set(auth_id, auth_meta)
978
979 with self._volume_lock(volume_path):
980 key = self._authorize_volume(volume_path, auth_id, readonly)
981
982 auth_meta['dirty'] = False
983 auth_meta['volumes'][volume_path_str]['dirty'] = False
984 self._auth_metadata_set(auth_id, auth_meta)
985
986 if tenant_id:
987 return {
988 'auth_key': key
989 }
990 else:
991 # Caller wasn't multi-tenant aware: be safe and don't give
992 # them a key
993 return {
994 'auth_key': None
995 }
996
997 def _authorize_volume(self, volume_path, auth_id, readonly):
998 vol_meta = self._volume_metadata_get(volume_path)
999
1000 access_level = 'r' if readonly else 'rw'
1001 auth = {
1002 auth_id: {
1003 'access_level': access_level,
1004 'dirty': True,
1005 }
1006 }
1007
1008 if vol_meta is None:
1009 vol_meta = {
1010 'auths': auth
1011 }
1012 else:
1013 vol_meta['auths'].update(auth)
1014 self._volume_metadata_set(volume_path, vol_meta)
1015
1016 key = self._authorize_ceph(volume_path, auth_id, readonly)
1017
1018 vol_meta['auths'][auth_id]['dirty'] = False
1019 self._volume_metadata_set(volume_path, vol_meta)
1020
1021 return key
1022
1023 def _authorize_ceph(self, volume_path, auth_id, readonly):
1024 path = self._get_path(volume_path)
1025 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1026 auth_id, path
1027 ))
1028
1029 # First I need to work out what the data pool is for this share:
1030 # read the layout
1031 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1032
1033 try:
1034 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
1035 except cephfs.NoData:
1036 namespace = None
1037
1038 # Now construct auth capabilities that give the guest just enough
1039 # permissions to access the share
1040 client_entity = "client.{0}".format(auth_id)
1041 want_access_level = 'r' if readonly else 'rw'
1042 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1043 if namespace:
1044 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1045 want_access_level, pool_name, namespace)
1046 else:
1047 want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
1048 pool_name)
1049
1050 try:
1051 existing = self._rados_command(
1052 'auth get',
1053 {
1054 'entity': client_entity
1055 }
1056 )
1057 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1058 except rados.Error:
1059 caps = self._rados_command(
1060 'auth get-or-create',
1061 {
1062 'entity': client_entity,
1063 'caps': [
1064 'mds', want_mds_cap,
1065 'osd', want_osd_cap,
1066 'mon', 'allow r']
1067 })
1068 else:
1069 # entity exists, update it
1070 cap = existing[0]
1071
1072 # Construct auth caps that if present might conflict with the desired
1073 # auth caps.
1074 unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1075 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1076 if namespace:
1077 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1078 unwanted_access_level, pool_name, namespace)
1079 else:
1080 unwanted_osd_cap = 'allow {0} pool={1}'.format(
1081 unwanted_access_level, pool_name)
1082
1083 def cap_update(
1084 orig_mds_caps, orig_osd_caps, want_mds_cap,
1085 want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
1086
1087 if not orig_mds_caps:
1088 return want_mds_cap, want_osd_cap
1089
1090 mds_cap_tokens = orig_mds_caps.split(",")
1091 osd_cap_tokens = orig_osd_caps.split(",")
1092
1093 if want_mds_cap in mds_cap_tokens:
1094 return orig_mds_caps, orig_osd_caps
1095
1096 if unwanted_mds_cap in mds_cap_tokens:
1097 mds_cap_tokens.remove(unwanted_mds_cap)
1098 osd_cap_tokens.remove(unwanted_osd_cap)
1099
1100 mds_cap_tokens.append(want_mds_cap)
1101 osd_cap_tokens.append(want_osd_cap)
1102
1103 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1104
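# cap_update example (illustrative): upgrading a read-only guest to
# read-write access on /volumes/grp1/vol1 rewrites the cap strings
#
#   mds: "allow r path=/volumes/grp1/vol1" -> "allow rw path=/volumes/grp1/vol1"
#   osd: "allow r pool=cephfs_data"        -> "allow rw pool=cephfs_data"
#
# while leaving any unrelated comma-separated cap tokens untouched.
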
1105 orig_mds_caps = cap['caps'].get('mds', "")
1106 orig_osd_caps = cap['caps'].get('osd', "")
1107
1108 mds_cap_str, osd_cap_str = cap_update(
1109 orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
1110 unwanted_mds_cap, unwanted_osd_cap)
1111
1112 caps = self._rados_command(
1113 'auth caps',
1114 {
1115 'entity': client_entity,
1116 'caps': [
1117 'mds', mds_cap_str,
1118 'osd', osd_cap_str,
1119 'mon', cap['caps'].get('mon', 'allow r')]
1120 })
1121 caps = self._rados_command(
1122 'auth get',
1123 {
1124 'entity': client_entity
1125 }
1126 )
1127
1128 # Result expected like this:
1129 # [
1130 # {
1131 # "entity": "client.foobar",
1132 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1133 # "caps": {
1134 # "mds": "allow *",
1135 # "mon": "allow *"
1136 # }
1137 # }
1138 # ]
1139 assert len(caps) == 1
1140 assert caps[0]['entity'] == client_entity
1141 return caps[0]['key']
1142
1143 def deauthorize(self, volume_path, auth_id):
1144 with self._auth_lock(auth_id):
1145 # Existing meta, or None, to be updated
1146 auth_meta = self._auth_metadata_get(auth_id)
1147
1148 volume_path_str = str(volume_path)
1149 if (auth_meta is None) or (not auth_meta['volumes']):
1150 log.warn("deauthorize called for already-removed auth "
1151 "ID '{auth_id}' for volume ID '{volume}'".format(
1152 auth_id=auth_id, volume=volume_path.volume_id
1153 ))
1154 # Clean up the auth meta file of an auth ID
1155 self.fs.unlink(self._auth_metadata_path(auth_id))
1156 return
1157
1158 if volume_path_str not in auth_meta['volumes']:
1159 log.warn("deauthorize called for already-removed auth "
1160 "ID '{auth_id}' for volume ID '{volume}'".format(
1161 auth_id=auth_id, volume=volume_path.volume_id
1162 ))
1163 return
1164
1165 if auth_meta['dirty']:
1166 self._recover_auth_meta(auth_id, auth_meta)
1167
1168 auth_meta['dirty'] = True
1169 auth_meta['volumes'][volume_path_str]['dirty'] = True
1170 self._auth_metadata_set(auth_id, auth_meta)
1171
1172 self._deauthorize_volume(volume_path, auth_id)
1173
1174 # Filter out the volume we're deauthorizing
1175 del auth_meta['volumes'][volume_path_str]
1176
1177 # Clean up auth meta file
1178 if not auth_meta['volumes']:
1179 self.fs.unlink(self._auth_metadata_path(auth_id))
1180 return
1181
1182 auth_meta['dirty'] = False
1183 self._auth_metadata_set(auth_id, auth_meta)
1184
1185 def _deauthorize_volume(self, volume_path, auth_id):
1186 with self._volume_lock(volume_path):
1187 vol_meta = self._volume_metadata_get(volume_path)
1188
1189 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1190 log.warn("deauthorize called for already-removed auth "
1191 "ID '{auth_id}' for volume ID '{volume}'".format(
1192 auth_id=auth_id, volume=volume_path.volume_id
1193 ))
1194 return
1195
1196 vol_meta['auths'][auth_id]['dirty'] = True
1197 self._volume_metadata_set(volume_path, vol_meta)
1198
1199 self._deauthorize(volume_path, auth_id)
1200
1201 # Remove the auth_id from the metadata *after* removing it
1202 # from ceph, so that if we crashed here, we would actually
1203 # recreate the auth ID during recovery (i.e. end up with
1204 # a consistent state).
1205
1206 # Filter out the auth we're removing
1207 del vol_meta['auths'][auth_id]
1208 self._volume_metadata_set(volume_path, vol_meta)
1209
1210 def _deauthorize(self, volume_path, auth_id):
1211 """
1212 The volume must still exist.
1213 """
1214 client_entity = "client.{0}".format(auth_id)
1215 path = self._get_path(volume_path)
1216 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1217 try:
1218 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
1219 except cephfs.NoData:
1220 namespace = None
1221
1222 # The auth_id might have read-only or read-write mount access for the
1223 # volume path.
1224 access_levels = ('r', 'rw')
1225 want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
1226 for access_level in access_levels]
1227 if namespace:
1228 want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
1229 for access_level in access_levels]
1230 else:
1231 want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
1232 for access_level in access_levels]
1233
1234
1235 try:
1236 existing = self._rados_command(
1237 'auth get',
1238 {
1239 'entity': client_entity
1240 }
1241 )
1242
1243 def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
1244 mds_cap_tokens = orig_mds_caps.split(",")
1245 osd_cap_tokens = orig_osd_caps.split(",")
1246
1247 for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
1248 if want_mds_cap in mds_cap_tokens:
1249 mds_cap_tokens.remove(want_mds_cap)
1250 osd_cap_tokens.remove(want_osd_cap)
1251 break
1252
1253 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1254
1255 cap = existing[0]
1256 orig_mds_caps = cap['caps'].get('mds', "")
1257 orig_osd_caps = cap['caps'].get('osd', "")
1258 mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
1259 want_mds_caps, want_osd_caps)
1260
1261 if not mds_cap_str:
1262 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1263 else:
1264 self._rados_command(
1265 'auth caps',
1266 {
1267 'entity': client_entity,
1268 'caps': [
1269 'mds', mds_cap_str,
1270 'osd', osd_cap_str,
1271 'mon', cap['caps'].get('mon', 'allow r')]
1272 })
1273
1274 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1275 except rados.Error:
1276 # Already gone, great.
1277 return
1278
1279 def get_authorized_ids(self, volume_path):
1280 """
1281 Expose a list of auth IDs that have access to a volume.
1282
1283 return: a list of (auth_id, access_level) tuples, where
1284 the access_level can be 'r' or 'rw'.
1285 None if no auth ID is given access to the volume.
1286 """
1287 with self._volume_lock(volume_path):
1288 meta = self._volume_metadata_get(volume_path)
1289 auths = []
1290 if not meta or not meta['auths']:
1291 return None
1292
1293 for auth, auth_data in meta['auths'].items():
1294 # Skip partial auth updates.
1295 if not auth_data['dirty']:
1296 auths.append((auth, auth_data['access_level']))
1297
1298 return auths
1299
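# get_authorized_ids example (illustrative): a volume with two clean auth
# entries yields [("alice", "rw"), ("bob", "r")]; entries still marked dirty
# (in-flight updates) are skipped, and None is returned if the volume has no
# auths at all.
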
1300 def _rados_command(self, prefix, args=None, decode=True):
1301 """
1302 Safer wrapper for ceph_argparse.json_command, which raises
1303 Error exception instead of relying on caller to check return
1304 codes.
1305
1306 Error exception can result from:
1307 * Timeout
1308 * Actual legitimate errors
1309 * Malformed JSON output
1310
1311 return: Decoded object from ceph, or None if empty string returned.
1312 If decode is False, return a string (the data returned by
1313 ceph command)
1314 """
1315 if args is None:
1316 args = {}
1317
1318 argdict = args.copy()
1319 argdict['format'] = 'json'
1320
1321 ret, outbuf, outs = json_command(self.rados,
1322 prefix=prefix,
1323 argdict=argdict,
1324 timeout=RADOS_TIMEOUT)
1325 if ret != 0:
1326 raise rados.Error(outs)
1327 else:
1328 if decode:
1329 if outbuf:
1330 try:
1331 return json.loads(outbuf)
1332 except (ValueError, TypeError):
1333 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1334 else:
1335 return None
1336 else:
1337 return outbuf
1338
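# _rados_command usage sketch (patterns already used in this class):
#
#   osd_map = self._rados_command("osd dump", {})   # decoded JSON object
#   self._rados_command("auth del", {"entity": "client.x"}, decode=False)
#
# A non-zero return code from json_command surfaces as rados.Error instead of
# being passed back to the caller.
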
1339 def get_used_bytes(self, volume_path):
1340 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir.rbytes"))
1341
1342 def set_max_bytes(self, volume_path, max_bytes):
1343 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1344 max_bytes.__str__() if max_bytes is not None else "0",
1345 0)
1346
1347 def _snapshot_path(self, dir_path, snapshot_name):
1348 return os.path.join(
1349 dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1350 )
1351
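# Snapshot path sketch (illustrative, assuming the usual client_snapdir
# default of ".snap"): snapshot "snap1" of /volumes/grp1/vol1 lives at
# /volumes/grp1/vol1/.snap/snap1.
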
1352 def _snapshot_create(self, dir_path, snapshot_name):
1353 # TODO: raise intelligible exception for clusters where snaps are disabled
1354 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755)
1355
1356 def _snapshot_destroy(self, dir_path, snapshot_name):
1357 """
1358 Remove a snapshot, or do nothing if it already doesn't exist.
1359 """
1360 try:
1361 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1362 except cephfs.ObjectNotFound:
1363 log.warn("Snapshot was already gone: {0}".format(snapshot_name))
1364
1365 def create_snapshot_volume(self, volume_path, snapshot_name):
1366 self._snapshot_create(self._get_path(volume_path), snapshot_name)
1367
1368 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1369 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1370
1371 def create_snapshot_group(self, group_id, snapshot_name):
1372 if group_id is None:
1373 raise RuntimeError("Group ID may not be None")
1374
1375 return self._snapshot_create(self._get_group_path(group_id), snapshot_name)
1376
1377 def destroy_snapshot_group(self, group_id, snapshot_name):
1378 if group_id is None:
1379 raise RuntimeError("Group ID may not be None")
1380 if snapshot_name is None:
1381 raise RuntimeError("Snapshot name may not be None")
1382
1383 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1384
1385 def _cp_r(self, src, dst):
1386 # TODO
1387 raise NotImplementedError()
1388
1389 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1390 dest_fs_path = self._get_path(dest_volume_path)
1391 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1392
1393 self._cp_r(src_snapshot_path, dest_fs_path)
1394
1395 def put_object(self, pool_name, object_name, data):
1396 """
1397 Synchronously write data to an object.
1398
1399 :param pool_name: name of the pool
1400 :type pool_name: str
1401 :param object_name: name of the object
1402 :type object_name: str
1403 :param data: data to write
1404 :type data: bytes
1405 """
1406 ioctx = self.rados.open_ioctx(pool_name)
1407 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1408 if len(data) > max_size:
1409 msg = ("Data to be written to object '{0}' exceeds "
1410 "{1} bytes".format(object_name, max_size))
1411 log.error(msg)
1412 raise CephFSVolumeClientError(msg)
1413 try:
1414 ioctx.write_full(object_name, data)
1415 finally:
1416 ioctx.close()
1417
1418 def get_object(self, pool_name, object_name):
1419 """
1420 Synchronously read data from object.
1421
1422 :param pool_name: name of the pool
1423 :type pool_name: str
1424 :param object_name: name of the object
1425 :type object_name: str
1426
1427 :returns: bytes - data read from object
1428 """
1429 ioctx = self.rados.open_ioctx(pool_name)
1430 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1431 try:
1432 bytes_read = ioctx.read(object_name, max_size)
1433 if ((len(bytes_read) == max_size) and
1434 (ioctx.read(object_name, 1, offset=max_size))):
1435 log.warning("Size of object {0} exceeds '{1}' bytes "
1436 "read".format(object_name, max_size))
1437 finally:
1438 ioctx.close()
1439 return bytes_read
1440
1441 def delete_object(self, pool_name, object_name):
1442 ioctx = self.rados.open_ioctx(pool_name)
1443 try:
1444 ioctx.remove_object(object_name)
1445 except rados.ObjectNotFound:
1446 log.warn("Object '{0}' was already removed".format(object_name))
1447 finally:
1448 ioctx.close()
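
# ---------------------------------------------------------------------------
# Illustrative end-to-end sketch of the volume lifecycle exposed above. This
# function is not part of the original module and is never invoked here; the
# auth IDs, group/volume names, conf path and size are hypothetical.
def _example_volume_lifecycle():
    vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
    vc.connect()
    try:
        vp = VolumePath("grp1", "vol1")
        vc.create_group("grp1")
        volume = vc.create_volume(vp, size=10 * 1024 * 1024 * 1024)
        log.info("mount path: %s", volume['mount_path'])

        # Grant a guest read/write access and obtain its cephx key.
        auth = vc.authorize(vp, "guest", readonly=False, tenant_id="tenant1")
        log.info("guest key: %s", auth['auth_key'])

        # Later: revoke access, evict any mounted clients, then tear down.
        vc.deauthorize(vp, "guest")
        vc.evict("guest", volume_path=vp)
        vc.delete_volume(vp)
        vc.purge_volume(vp)
        vc.destroy_group("grp1")
    finally:
        vc.disconnect()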