[ceph.git] / ceph / src / pybind / ceph_volume_client.py
1 """
2 Copyright (C) 2015 Red Hat, Inc.
3
4 LGPL2. See file COPYING.
5 """
6
7 from contextlib import contextmanager
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import struct
15 import sys
16 import threading
17 import time
18 import uuid
19
20 from ceph_argparse import json_command
21
22 import cephfs
23 import rados
24
25 def to_bytes(param):
26 '''
27 Helper method that returns byte representation of the given parameter.
28 '''
29 if isinstance(param, str):
30 return param.encode()
31 else:
32 return str(param).encode()
33
34 class RadosError(Exception):
35 """
36 Something went wrong talking to Ceph with librados
37 """
38 pass
39
40
41 RADOS_TIMEOUT = 10
42
43 log = logging.getLogger(__name__)
44
45 # Reserved volume group name which we use in paths for volumes
46 # that are not assigned to a group (i.e. created with group=None)
47 NO_GROUP_NAME = "_nogroup"
48
49 # Filename extensions for meta files.
50 META_FILE_EXT = ".meta"
51
52 class VolumePath(object):
53 """
54 Identify a volume's path as group->volume.
55 The volume ID is a unique identifier, but this is a much more
56 helpful thing to pass around.
57 """
58 def __init__(self, group_id, volume_id):
59 self.group_id = group_id
60 self.volume_id = volume_id
61 assert self.group_id != NO_GROUP_NAME
62 assert self.volume_id != "" and self.volume_id is not None
63
64 def __str__(self):
65 return "{0}/{1}".format(self.group_id, self.volume_id)
66
67
68 class ClusterTimeout(Exception):
69 """
70 Exception indicating that we timed out trying to talk to the Ceph cluster,
71 either to the mons, or to any individual daemon that the mons indicate ought
72 to be up but isn't responding to us.
73 """
74 pass
75
76
77 class ClusterError(Exception):
78 """
79 Exception indicating that the cluster returned an error to a command that
80 we thought should be successful based on our last knowledge of the cluster
81 state.
82 """
83 def __init__(self, action, result_code, result_str):
84 self._action = action
85 self._result_code = result_code
86 self._result_str = result_str
87
88 def __str__(self):
89 return "Error {0} (\"{1}\") while {2}".format(
90 self._result_code, self._result_str, self._action)
91
92
93 class RankEvicter(threading.Thread):
94 """
95 Thread for evicting client(s) from a particular MDS daemon instance.
96
97 This is more complex than simply sending a command, because we have to
98 handle cases where MDS daemons might not be fully up yet, and/or might
99 be transiently unresponsive to commands.
100 """
101 class GidGone(Exception):
102 pass
103
104 POLL_PERIOD = 5
105
106 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
107 """
108 :param client_spec: list of strings, used as filter arguments to "session evict";
109 pass ["id=123"] to evict a single client with session id 123.
110 """
111 self.rank = rank
112 self.gid = gid
113 self._mds_map = mds_map
114 self._client_spec = client_spec
115 self._volume_client = volume_client
116 self._ready_timeout = ready_timeout
117 self._ready_waited = 0
118
119 self.success = False
120 self.exception = None
121
122 super(RankEvicter, self).__init__()
123
124 def _ready_to_evict(self):
125 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
126 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
127 self._client_spec, self.rank, self.gid
128 ))
129 raise RankEvicter.GidGone()
130
131 info = self._mds_map['info']["gid_{0}".format(self.gid)]
132 log.debug("_ready_to_evict: state={0}".format(info['state']))
133 return info['state'] in ["up:active", "up:clientreplay"]
134
135 def _wait_for_ready(self):
136 """
137 Wait for that MDS rank to reach an active or clientreplay state, and
138 not be laggy.
139 """
140 while not self._ready_to_evict():
141 if self._ready_waited > self._ready_timeout:
142 raise ClusterTimeout()
143
144 time.sleep(self.POLL_PERIOD)
145 self._ready_waited += self.POLL_PERIOD
146
147 self._mds_map = self._volume_client._rados_command("mds dump", {})
148
149 def _evict(self):
150 """
151 Run the eviction procedure. Return True on success; raise ClusterTimeout or ClusterError on errors.
152 """
153
154 # Wait until the MDS is believed by the mon to be available for commands
155 try:
156 self._wait_for_ready()
157 except self.GidGone:
158 return True
159
160 # Then send it an evict
161 ret = errno.ETIMEDOUT
162 while ret == errno.ETIMEDOUT:
163 log.debug("mds_command: {0}, {1}".format(
164 "%s" % self.gid, ["session", "evict"] + self._client_spec
165 ))
166 ret, outb, outs = self._volume_client.fs.mds_command(
167 "%s" % self.gid,
168 [json.dumps({
169 "prefix": "session evict",
170 "filters": self._client_spec
171 })], "")
172 log.debug("mds_command: complete {0} {1}".format(ret, outs))
173
174 # If we get a clean response, great, it's gone from that rank.
175 if ret == 0:
176 return True
177 elif ret == errno.ETIMEDOUT:
178 # Oh no, the MDS went laggy (that's how libcephfs knows to emit this error)
179 self._mds_map = self._volume_client._rados_command("mds dump", {})
180 try:
181 self._wait_for_ready()
182 except self.GidGone:
183 return True
184 else:
185 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
186
187 def run(self):
188 try:
189 self._evict()
190 except Exception as e:
191 self.success = False
192 self.exception = e
193 else:
194 self.success = True
195
196
197 class EvictionError(Exception):
198 pass
199
200
201 class CephFSVolumeClientError(Exception):
202 """
203 Something went wrong talking to Ceph using CephFSVolumeClient.
204 """
205 pass
206
207
208 CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
209
210 CephFSVolumeClient Version History:
211
212 * 1 - Initial version
213 * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
214 * 3 - Allow volumes to be created without RADOS namespace isolation
215 * 4 - Added get_object_and_version, put_object_versioned methods to CephFSVolumeClient
216 """
217
218
219 class CephFSVolumeClient(object):
220 """
221 Combine libcephfs and librados interfaces to implement a
222 'Volume' concept implemented as a cephfs directory and
223 client capabilities which restrict mount access to this
224 directory.
225
226 Additionally, volumes may be in a 'Group'. Conveniently,
227 volumes are a lot like manila shares, and groups are a lot
228 like manila consistency groups.
229
230 Refer to volumes with VolumePath, which specifies the
231 volume and group IDs (both strings). The group ID may
232 be None.
233
234 In general, functions in this class are allowed to raise rados.Error
235 or cephfs.Error exceptions in unexpected situations.
236 """
237
238 # Current version
239 version = 4
240
241 # Where shall we create our volumes?
242 POOL_PREFIX = "fsvolume_"
243 DEFAULT_VOL_PREFIX = "/volumes"
244 DEFAULT_NS_PREFIX = "fsvolumens_"
245
246 def __init__(self, auth_id, conf_path, cluster_name, volume_prefix=None, pool_ns_prefix=None):
247 self.fs = None
248 self.rados = None
249 self.connected = False
250 self.conf_path = conf_path
251 self.cluster_name = cluster_name
252 self.auth_id = auth_id
253 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
254 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
255 # For flock'ing in cephfs, I want a unique ID to distinguish me
256 # from any other manila-share services that are loading this module.
257 # We could use the pid, but that's unnecessarily weak: generate a
258 # UUID instead.
259 self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
260
261 # TODO: version the on-disk structures
262
263 def recover(self):
264 # Scan all auth keys to see if they're dirty: if they are, they have
265 # state that might not have propagated to Ceph or to the related
266 # volumes yet.
267
268 # Important: we *always* acquire locks in the order auth->volume
269 # That means a volume can never be dirty without the auth key
270 # we're updating it with being dirty at the same time.
271
272 # First list the auth IDs that have potentially dirty on-disk metadata
273 log.debug("Recovering from partial auth updates (if any)...")
274
275 try:
276 dir_handle = self.fs.opendir(self.volume_prefix)
277 except cephfs.ObjectNotFound:
278 log.debug("Nothing to recover. No auth meta files.")
279 return
280
281 d = self.fs.readdir(dir_handle)
282 auth_ids = []
283
284 if not d:
285 log.debug("Nothing to recover. No auth meta files.")
286
287 while d:
288 # Identify auth IDs from auth meta filenames. The auth meta files
289 # are named "$<auth_id><meta filename extension>"
290 regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
291 match = re.search(regex, d.d_name)
292 if match:
293 auth_ids.append(match.group(1))
294
295 d = self.fs.readdir(dir_handle)
296
297 self.fs.closedir(dir_handle)
298
299 # Key points based on ordering:
300 # * Anything added in VMeta is already added in AMeta
301 # * Anything added in Ceph is already added in VMeta
302 # * Anything removed in VMeta is already removed in Ceph
303 # * Anything removed in AMeta is already removed in VMeta
304
305 # Deauthorization: because I only update metadata AFTER updating
306 # the next level down, the same ordering holds: things which
307 # exist in the AMeta should also exist in the VMeta, and should
308 # also exist in Ceph; so the same recovery procedure that gets
309 # me consistent after crashes during authorization will also
310 # work during deauthorization.
311
312 # Now for each auth ID, check for dirty flag and apply updates
313 # if dirty flag is found
314 for auth_id in auth_ids:
315 with self._auth_lock(auth_id):
316 auth_meta = self._auth_metadata_get(auth_id)
317 if not auth_meta or not auth_meta['volumes']:
318 # Clean up auth meta file
319 self.fs.unlink(self._auth_metadata_path(auth_id))
320 continue
321 if not auth_meta['dirty']:
322 continue
323 self._recover_auth_meta(auth_id, auth_meta)
324
325 log.debug("Recovered from partial auth updates (if any).")
326
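# Illustrative sketch of the on-disk metadata this recovery logic reasons
# about (contents are hypothetical; the real files are written by
# _auth_metadata_set() / _volume_metadata_set() below):
#
#     /volumes/$guest1.meta        (AMeta, one per auth ID)
#         {"compat_version": 1, "version": 4, "dirty": false,
#          "tenant_id": "tenant1",
#          "volumes": {"grp1/vol1": {"access_level": "rw", "dirty": false}}}
#
#     /volumes/_grp1:vol1.meta     (VMeta, one per volume)
#         {"compat_version": 1, "version": 4,
#          "auths": {"guest1": {"access_level": "rw", "dirty": false}}}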
327 def _recover_auth_meta(self, auth_id, auth_meta):
328 """
329 Call me after locking the auth meta file.
330 """
331 remove_volumes = []
332
333 for volume, volume_data in auth_meta['volumes'].items():
334 if not volume_data['dirty']:
335 continue
336
337 (group_id, volume_id) = volume.split('/')
338 group_id = group_id if group_id != 'None' else None
339 volume_path = VolumePath(group_id, volume_id)
340 access_level = volume_data['access_level']
341
342 with self._volume_lock(volume_path):
343 vol_meta = self._volume_metadata_get(volume_path)
344
345 # No VMeta update indicates that there was no auth update
346 # in Ceph either. So it's safe to remove corresponding
347 # partial update in AMeta.
348 if not vol_meta or auth_id not in vol_meta['auths']:
349 remove_volumes.append(volume)
350 continue
351
352 want_auth = {
353 'access_level': access_level,
354 'dirty': False,
355 }
356 # VMeta update looks clean. Ceph auth update must have been
357 # clean.
358 if vol_meta['auths'][auth_id] == want_auth:
359 continue
360
361 readonly = True if access_level == 'r' else False
362 self._authorize_volume(volume_path, auth_id, readonly)
363
364 # Recovered from partial auth updates for the auth ID's access
365 # to a volume.
366 auth_meta['volumes'][volume]['dirty'] = False
367 self._auth_metadata_set(auth_id, auth_meta)
368
369 for volume in remove_volumes:
370 del auth_meta['volumes'][volume]
371
372 if not auth_meta['volumes']:
373 # Clean up auth meta file
374 self.fs.unlink(self._auth_metadata_path(auth_id))
375 return
376
377 # Recovered from all partial auth updates for the auth ID.
378 auth_meta['dirty'] = False
379 self._auth_metadata_set(auth_id, auth_meta)
380
381
382 def evict(self, auth_id, timeout=30, volume_path=None):
383 """
384 Evict all clients based on the authorization ID and optionally based on
385 the volume path mounted. Assumes that the authorization key has been
386 revoked prior to calling this function.
387
388 This operation can throw an exception if the mon cluster is unresponsive, or
389 any individual MDS daemon is unresponsive for longer than the timeout passed in.
390 """
391
392 client_spec = ["auth_name={0}".format(auth_id), ]
393 if volume_path:
394 client_spec.append("client_metadata.root={0}".
395 format(self._get_path(volume_path)))
396
397 log.info("evict clients with {0}".format(', '.join(client_spec)))
398
399 mds_map = self._rados_command("mds dump", {})
400
401 up = {}
402 for name, gid in mds_map['up'].items():
403 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
404 assert name.startswith("mds_")
405 up[int(name[4:])] = gid
406
407 # For all MDS ranks held by a daemon
408 # Do the parallelism in python instead of using "tell mds.*", because
409 # the latter doesn't give us per-mds output
410 threads = []
411 for rank, gid in up.items():
412 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
413 timeout)
414 thread.start()
415 threads.append(thread)
416
417 for t in threads:
418 t.join()
419
420 log.info("evict: joined all")
421
422 for t in threads:
423 if not t.success:
424 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
425 format(', '.join(client_spec), t.rank, t.gid, t.exception)
426 )
427 log.error(msg)
428 raise EvictionError(msg)
429
430 def _get_path(self, volume_path):
431 """
432 Determine the path within CephFS where this volume will live
433 :return: absolute path (string)
434 """
435 return os.path.join(
436 self.volume_prefix,
437 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
438 volume_path.volume_id)
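# For illustration (hypothetical IDs, default prefix "/volumes"):
#     _get_path(VolumePath("grp1", "vol1"))  -> "/volumes/grp1/vol1"
#     _get_path(VolumePath(None, "vol1"))    -> "/volumes/_nogroup/vol1"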
439
440 def _get_group_path(self, group_id):
441 if group_id is None:
442 raise ValueError("group_id may not be None")
443
444 return os.path.join(
445 self.volume_prefix,
446 group_id
447 )
448
449 def connect(self, premount_evict = None):
450 """
451
452 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
453 may want to use this to specify their own auth ID if they expect
454 to be a unique instance and don't want to wait for caps to time
455 out after failure of another instance of themselves.
456 """
457 log.debug("Connecting to RADOS with config {0}...".format(self.conf_path))
458 self.rados = rados.Rados(
459 name="client.{0}".format(self.auth_id),
460 clustername=self.cluster_name,
461 conffile=self.conf_path,
462 conf={}
463 )
464 self.rados.connect()
465
466 log.debug("Connection to RADOS complete")
467
468 log.debug("Connecting to cephfs...")
469 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
470 log.debug("CephFS initializing...")
471 self.fs.init()
472 if premount_evict is not None:
473 log.debug("Premount eviction of {0} starting".format(premount_evict))
474 self.evict(premount_evict)
475 log.debug("Premount eviction of {0} completes".format(premount_evict))
476 log.debug("CephFS mounting...")
477 self.fs.mount()
478 log.debug("Connection to cephfs complete")
479
480 # Recover from partial auth updates due to a previous
481 # crash.
482 self.recover()
483
484 def get_mon_addrs(self):
485 log.info("get_mon_addrs")
486 result = []
487 mon_map = self._rados_command("mon dump")
488 for mon in mon_map['mons']:
489 ip_port = mon['addr'].split("/")[0]
490 result.append(ip_port)
491
492 return result
493
494 def disconnect(self):
495 log.info("disconnect")
496 if self.fs:
497 log.debug("Disconnecting cephfs...")
498 self.fs.shutdown()
499 self.fs = None
500 log.debug("Disconnecting cephfs complete")
501
502 if self.rados:
503 log.debug("Disconnecting rados...")
504 self.rados.shutdown()
505 self.rados = None
506 log.debug("Disconnecting rados complete")
507
508 def __del__(self):
509 self.disconnect()
510
511 def _get_pool_id(self, osd_map, pool_name):
512 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
513 # like this are needed.
514 for pool in osd_map['pools']:
515 if pool['pool_name'] == pool_name:
516 return pool['pool']
517
518 return None
519
520 def _create_volume_pool(self, pool_name):
521 """
522 Idempotently create a pool for use as a CephFS data pool, with the given name
523
524 :return The ID of the created pool
525 """
526 osd_map = self._rados_command('osd dump', {})
527
528 existing_id = self._get_pool_id(osd_map, pool_name)
529 if existing_id is not None:
530 log.info("Pool {0} already exists".format(pool_name))
531 return existing_id
532
533 osd_count = len(osd_map['osds'])
534
535 # We can't query the actual cluster config remotely, but since this is
536 # just a heuristic we'll assume that the ceph.conf we have locally reflects
537 # that in use in the rest of the cluster.
538 pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))
539
540 other_pgs = 0
541 for pool in osd_map['pools']:
542 if not pool['pool_name'].startswith(self.POOL_PREFIX):
543 other_pgs += pool['pg_num']
544
545 # A basic heuristic for picking pg_num: work out the max number of
546 # PGs we can have without tripping a warning, then subtract the number
547 # of PGs already created by non-manila pools, then divide by ten. That'll
548 # give you a reasonable result on a system where you have "a few" manila
549 # shares.
550 pg_num = ((pg_warn_max_per_osd * osd_count) - other_pgs) // 10
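# Worked example with hypothetical numbers: mon_max_pg_per_osd=250,
# 8 OSDs and 400 PGs in non-manila pools gives
# ((250 * 8) - 400) // 10 = 160 PGs for the new pool.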
551 # TODO Alternatively, respect an override set by the user.
552
553 self._rados_command(
554 'osd pool create',
555 {
556 'pool': pool_name,
557 'pg_num': int(pg_num),
558 }
559 )
560
561 osd_map = self._rados_command('osd dump', {})
562 pool_id = self._get_pool_id(osd_map, pool_name)
563
564 if pool_id is None:
565 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
566 # removing it right after we created it.
567 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
568 pool_name, json.dumps(osd_map, indent=2)
569 ))
570 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
571 else:
572 return pool_id
573
574 def create_group(self, group_id):
575 # Prevent craftily-named volume groups from colliding with the meta
576 # files.
577 if group_id.endswith(META_FILE_EXT):
578 raise ValueError("group ID cannot end with '{0}'.".format(
579 META_FILE_EXT))
580 path = self._get_group_path(group_id)
581 self._mkdir_p(path)
582
583 def destroy_group(self, group_id):
584 path = self._get_group_path(group_id)
585 try:
586 self.fs.stat(self.volume_prefix)
587 except cephfs.ObjectNotFound:
588 pass
589 else:
590 self.fs.rmdir(path)
591
592 def _mkdir_p(self, path):
593 try:
594 self.fs.stat(path)
595 except cephfs.ObjectNotFound:
596 pass
597 else:
598 return
599
600 parts = path.split(os.path.sep)
601
602 for i in range(1, len(parts) + 1):
603 subpath = os.path.join(*parts[0:i])
604 try:
605 self.fs.stat(subpath)
606 except cephfs.ObjectNotFound:
607 self.fs.mkdir(subpath, 0o755)
608
609 def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True):
610 """
611 Set up metadata, pools and auth for a volume.
612
613 This function is idempotent. It is safe to call this again
614 for an already-created volume, even if it is in use.
615
616 :param volume_path: VolumePath instance
617 :param size: In bytes, or None for no size limit
618 :param data_isolated: If true, create a separate OSD pool for this volume
619 :param namespace_isolated: If true, use separate RADOS namespace for this volume
620 :return:
621 """
622 path = self._get_path(volume_path)
623 log.info("create_volume: {0}".format(path))
624
625 self._mkdir_p(path)
626
627 if size is not None:
628 self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)
629
630 # data_isolated means create a separate pool for this volume
631 if data_isolated:
632 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
633 log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
634 pool_id = self._create_volume_pool(pool_name)
635 mds_map = self._rados_command("mds dump", {})
636 if pool_id not in mds_map['data_pools']:
637 self._rados_command("mds add_data_pool", {
638 'pool': pool_name
639 })
640 time.sleep(5) # time for MDSMap to be distributed
641 self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)
642
643 # enforce security isolation, use separate namespace for this volume
644 if namespace_isolated:
645 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
646 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
647 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
648 to_bytes(namespace), 0)
649 else:
650 # If volume's namespace layout is not set, then the volume's pool
651 # layout remains unset and will undesirably change with ancestor's
652 # pool layout changes.
653 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
654 self.fs.setxattr(path, 'ceph.dir.layout.pool',
655 to_bytes(pool_name), 0)
656
657 # Create a volume meta file, if it does not already exist, to store
658 # data about auth ids having access to the volume
659 fd = self.fs.open(self._volume_metadata_path(volume_path),
660 os.O_CREAT, 0o755)
661 self.fs.close(fd)
662
663 return {
664 'mount_path': path
665 }
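# Illustrative call (hypothetical IDs; data_isolated creates a dedicated
# "fsvolume_<volume_id>" pool, namespace_isolated confines the volume to a
# "fsvolumens_<volume_id>" RADOS namespace):
#
#     vc.create_volume(VolumePath("grp1", "vol1"), size=2**30,
#                      data_isolated=False, namespace_isolated=True)
#     # -> {'mount_path': '/volumes/grp1/vol1'}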
666
667 def delete_volume(self, volume_path, data_isolated=False):
668 """
669 Make a volume inaccessible to guests. This function is
670 idempotent. This is the fast part of tearing down a volume: you must
671 also later call purge_volume, which is the slow part.
672
673 :param volume_path: Same identifier used in create_volume
674 :return:
675 """
676
677 path = self._get_path(volume_path)
678 log.info("delete_volume: {0}".format(path))
679
680 # Create the trash folder if it doesn't already exist
681 trash = os.path.join(self.volume_prefix, "_deleting")
682 self._mkdir_p(trash)
683
684 # We'll move it to here
685 trashed_volume = os.path.join(trash, volume_path.volume_id)
686
687 # Move the volume's data to the trash folder
688 try:
689 self.fs.stat(path)
690 except cephfs.ObjectNotFound:
691 log.warning("Trying to delete volume '{0}' but it's already gone".format(
692 path))
693 else:
694 self.fs.rename(path, trashed_volume)
695
696 # Delete the volume meta file, if it's not already deleted
697 vol_meta_path = self._volume_metadata_path(volume_path)
698 try:
699 self.fs.unlink(vol_meta_path)
700 except cephfs.ObjectNotFound:
701 pass
702
703 def purge_volume(self, volume_path, data_isolated=False):
704 """
705 Finish clearing up a volume that was previously passed to delete_volume. This
706 function is idempotent.
707 """
708
709 trash = os.path.join(self.volume_prefix, "_deleting")
710 trashed_volume = os.path.join(trash, volume_path.volume_id)
711
712 try:
713 self.fs.stat(trashed_volume)
714 except cephfs.ObjectNotFound:
715 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
716 trashed_volume))
717 return
718
719 def rmtree(root_path):
720 log.debug("rmtree {0}".format(root_path))
721 dir_handle = self.fs.opendir(root_path)
722 d = self.fs.readdir(dir_handle)
723 while d:
724 if d.d_name not in [".", ".."]:
725 # Do not use os.path.join because it is sensitive
726 # to string encoding, we just pass through dnames
727 # as byte arrays
728 d_full = "{0}/{1}".format(root_path, d.d_name)
729 if d.is_dir():
730 rmtree(d_full)
731 else:
732 self.fs.unlink(d_full)
733
734 d = self.fs.readdir(dir_handle)
735 self.fs.closedir(dir_handle)
736
737 self.fs.rmdir(root_path)
738
739 rmtree(trashed_volume)
740
741 if data_isolated:
742 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
743 osd_map = self._rados_command("osd dump", {})
744 pool_id = self._get_pool_id(osd_map, pool_name)
745 mds_map = self._rados_command("mds dump", {})
746 if pool_id in mds_map['data_pools']:
747 self._rados_command("mds remove_data_pool", {
748 'pool': pool_name
749 })
750 self._rados_command("osd pool delete",
751 {
752 "pool": pool_name,
753 "pool2": pool_name,
754 "sure": "--yes-i-really-really-mean-it"
755 })
756
757 def _get_ancestor_xattr(self, path, attr):
758 """
759 Helper for reading layout information: if this xattr is missing
760 on the requested path, keep checking parents until we find it.
761 """
762 try:
763 result = self.fs.getxattr(path, attr).decode()
764 if result == "":
765 # Annoying! cephfs gives us empty instead of an error when attr not found
766 raise cephfs.NoData()
767 else:
768 return result
769 except cephfs.NoData:
770 if path == "/":
771 raise
772 else:
773 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
774
775 def _check_compat_version(self, compat_version):
776 if self.version < compat_version:
777 msg = ("The current version of CephFSVolumeClient, version {0} "
778 "does not support the required feature. Need version {1} "
779 "or greater".format(self.version, compat_version)
780 )
781 log.error(msg)
782 raise CephFSVolumeClientError(msg)
783
784 def _metadata_get(self, path):
785 """
786 Return a deserialized JSON object, or None
787 """
788 fd = self.fs.open(path, "r")
789 # TODO iterate instead of assuming file < 4MB
790 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
791 self.fs.close(fd)
792 if read_bytes:
793 return json.loads(read_bytes.decode())
794 else:
795 return None
796
797 def _metadata_set(self, path, data):
798 serialized = json.dumps(data)
799 fd = self.fs.open(path, "w")
800 try:
801 self.fs.write(fd, to_bytes(serialized), 0)
802 self.fs.fsync(fd, 0)
803 finally:
804 self.fs.close(fd)
805
806 def _lock(self, path):
807 @contextmanager
808 def fn():
809 while True:
810 fd = self.fs.open(path, os.O_CREAT, 0o755)
811 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
812
813 # The locked file will be cleaned up at some point. It could be
814 # unlinked, e.g. by another manila share instance, before the
815 # lock was applied to it. Check for this and retry the open/lock
816 # if it happened.
817 try:
818 statbuf = self.fs.stat(path)
819 except cephfs.ObjectNotFound:
820 self.fs.close(fd)
821 continue
822
823 fstatbuf = self.fs.fstat(fd)
824 if statbuf.st_ino == fstatbuf.st_ino:
825 break
826
827 try:
828 yield
829 finally:
830 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
831 self.fs.close(fd)
832
833 return fn()
834
835 def _auth_metadata_path(self, auth_id):
836 return os.path.join(self.volume_prefix, "${0}{1}".format(
837 auth_id, META_FILE_EXT))
838
839 def _auth_lock(self, auth_id):
840 return self._lock(self._auth_metadata_path(auth_id))
841
842 def _auth_metadata_get(self, auth_id):
843 """
844 Call me with the metadata locked!
845
846 Check whether an auth metadata structure can be decoded by the current
847 version of CephFSVolumeClient.
848
849 Return auth metadata that the current version of CephFSVolumeClient
850 can decode.
851 """
852 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
853
854 if auth_metadata:
855 self._check_compat_version(auth_metadata['compat_version'])
856
857 return auth_metadata
858
859 def _auth_metadata_set(self, auth_id, data):
860 """
861 Call me with the metadata locked!
862
863 Fsync the auth metadata.
864
865 Add two version attributes to the auth metadata,
866 'compat_version', the minimum CephFSVolumeClient version that can
867 decode the metadata, and 'version', the CephFSVolumeClient version
868 that encoded the metadata.
869 """
870 data['compat_version'] = 1
871 data['version'] = self.version
872 return self._metadata_set(self._auth_metadata_path(auth_id), data)
873
874 def _volume_metadata_path(self, volume_path):
875 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
876 volume_path.group_id if volume_path.group_id else "",
877 volume_path.volume_id,
878 META_FILE_EXT
879 ))
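# For illustration (hypothetical IDs, default prefix "/volumes"):
#     _volume_metadata_path(VolumePath("grp1", "vol1"))  -> "/volumes/_grp1:vol1.meta"
#     _volume_metadata_path(VolumePath(None, "vol1"))    -> "/volumes/_:vol1.meta"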
880
881 def _volume_lock(self, volume_path):
882 """
883 Return a ContextManager which locks the authorization metadata for
884 a particular volume, and persists a flag to the metadata indicating
885 that it is currently locked, so that we can detect dirty situations
886 during recovery.
887
888 This lock isn't just to make access to the metadata safe: it's also
889 designed to be used over the two-step process of checking the
890 metadata and then responding to an authorization request, to
891 ensure that at the point we respond the metadata hasn't changed
892 in the background. It's key to how we avoid security holes
893 resulting from races during that process.
894 """
895 return self._lock(self._volume_metadata_path(volume_path))
896
897 def _volume_metadata_get(self, volume_path):
898 """
899 Call me with the metadata locked!
900
901 Check whether a volume metadata structure can be decoded by the current
902 version of CephFSVolumeClient.
903
904 Return a volume_metadata structure that the current version of
905 CephFSVolumeClient can decode.
906 """
907 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
908
909 if volume_metadata:
910 self._check_compat_version(volume_metadata['compat_version'])
911
912 return volume_metadata
913
914 def _volume_metadata_set(self, volume_path, data):
915 """
916 Call me with the metadata locked!
917
918 Add two version attributes to the volume metadata,
919 'compat_version', the minimum CephFSVolumeClient version that can
920 decode the metadata and 'version', the CephFSVolumeClient version
921 that encoded the metadata.
922 """
923 data['compat_version'] = 1
924 data['version'] = self.version
925 return self._metadata_set(self._volume_metadata_path(volume_path), data)
926
927 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
928 """
929 Get-or-create a Ceph auth identity for `auth_id` and grant them access
930 to `volume_path`.
931 :param volume_path:
932 :param auth_id:
933 :param readonly:
934 :param tenant_id: Optionally provide a stringizable object to
935 restrict any created cephx IDs to other callers
936 passing the same tenant ID.
937 :return:
938 """
939
940 with self._auth_lock(auth_id):
941 # Existing meta, or None, to be updated
942 auth_meta = self._auth_metadata_get(auth_id)
943
944 # volume data to be inserted
945 volume_path_str = str(volume_path)
946 volume = {
947 volume_path_str : {
948 # The access level at which the auth_id is authorized to
949 # access the volume.
950 'access_level': 'r' if readonly else 'rw',
951 'dirty': True,
952 }
953 }
954 if auth_meta is None:
955 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
956 auth_id, tenant_id
957 ))
958 log.debug("Authorize: no existing meta")
959 auth_meta = {
960 'dirty': True,
961 'tenant_id': str(tenant_id) if tenant_id else None,
962 'volumes': volume
963 }
964
965 # Note: this is *not* guaranteeing that the key doesn't already
966 # exist in Ceph: we are allowing VolumeClient tenants to
967 # 'claim' existing Ceph keys. In order to prevent VolumeClient
968 # tenants from reading e.g. client.admin keys, you need to
969 # have configured your VolumeClient user (e.g. Manila) to
970 # have mon auth caps that prevent it from accessing those keys
971 # (e.g. limit it to only access keys with a manila.* prefix)
972 else:
973 # Disallow tenants to share auth IDs
974 if str(auth_meta['tenant_id']) != str(tenant_id):
975 msg = "auth ID: {0} is already in use".format(auth_id)
976 log.error(msg)
977 raise CephFSVolumeClientError(msg)
978
979 if auth_meta['dirty']:
980 self._recover_auth_meta(auth_id, auth_meta)
981
982 log.debug("Authorize: existing tenant {tenant}".format(
983 tenant=auth_meta['tenant_id']
984 ))
985 auth_meta['dirty'] = True
986 auth_meta['volumes'].update(volume)
987
988 self._auth_metadata_set(auth_id, auth_meta)
989
990 with self._volume_lock(volume_path):
991 key = self._authorize_volume(volume_path, auth_id, readonly)
992
993 auth_meta['dirty'] = False
994 auth_meta['volumes'][volume_path_str]['dirty'] = False
995 self._auth_metadata_set(auth_id, auth_meta)
996
997 if tenant_id:
998 return {
999 'auth_key': key
1000 }
1001 else:
1002 # Caller wasn't multi-tenant aware: be safe and don't give
1003 # them a key
1004 return {
1005 'auth_key': None
1006 }
1007
1008 def _authorize_volume(self, volume_path, auth_id, readonly):
1009 vol_meta = self._volume_metadata_get(volume_path)
1010
1011 access_level = 'r' if readonly else 'rw'
1012 auth = {
1013 auth_id: {
1014 'access_level': access_level,
1015 'dirty': True,
1016 }
1017 }
1018
1019 if vol_meta is None:
1020 vol_meta = {
1021 'auths': auth
1022 }
1023 else:
1024 vol_meta['auths'].update(auth)
1025 self._volume_metadata_set(volume_path, vol_meta)
1026
1027 key = self._authorize_ceph(volume_path, auth_id, readonly)
1028
1029 vol_meta['auths'][auth_id]['dirty'] = False
1030 self._volume_metadata_set(volume_path, vol_meta)
1031
1032 return key
1033
1034 def _authorize_ceph(self, volume_path, auth_id, readonly):
1035 path = self._get_path(volume_path)
1036 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1037 auth_id, path
1038 ))
1039
1040 # First I need to work out what the data pool is for this share:
1041 # read the layout
1042 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1043
1044 try:
1045 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1046 "namespace").decode()
1047 except cephfs.NoData:
1048 namespace = None
1049
1050 # Now construct auth capabilities that give the guest just enough
1051 # permissions to access the share
1052 client_entity = "client.{0}".format(auth_id)
1053 want_access_level = 'r' if readonly else 'rw'
1054 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1055 if namespace:
1056 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1057 want_access_level, pool_name, namespace)
1058 else:
1059 want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
1060 pool_name)
1061
1062 try:
1063 existing = self._rados_command(
1064 'auth get',
1065 {
1066 'entity': client_entity
1067 }
1068 )
1069 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1070 except rados.Error:
1071 caps = self._rados_command(
1072 'auth get-or-create',
1073 {
1074 'entity': client_entity,
1075 'caps': [
1076 'mds', want_mds_cap,
1077 'osd', want_osd_cap,
1078 'mon', 'allow r']
1079 })
1080 else:
1081 # entity exists, update it
1082 cap = existing[0]
1083
1084 # Construct auth caps that, if present, might conflict with the desired
1085 # auth caps.
1086 unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1087 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1088 if namespace:
1089 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1090 unwanted_access_level, pool_name, namespace)
1091 else:
1092 unwanted_osd_cap = 'allow {0} pool={1}'.format(
1093 unwanted_access_level, pool_name)
1094
1095 def cap_update(
1096 orig_mds_caps, orig_osd_caps, want_mds_cap,
1097 want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
1098
1099 if not orig_mds_caps:
1100 return want_mds_cap, want_osd_cap
1101
1102 mds_cap_tokens = orig_mds_caps.split(",")
1103 osd_cap_tokens = orig_osd_caps.split(",")
1104
1105 if want_mds_cap in mds_cap_tokens:
1106 return orig_mds_caps, orig_osd_caps
1107
1108 if unwanted_mds_cap in mds_cap_tokens:
1109 mds_cap_tokens.remove(unwanted_mds_cap)
1110 osd_cap_tokens.remove(unwanted_osd_cap)
1111
1112 mds_cap_tokens.append(want_mds_cap)
1113 osd_cap_tokens.append(want_osd_cap)
1114
1115 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
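# Illustrative behaviour of cap_update (hypothetical path and pool names):
# upgrading an existing read-only grant to read-write replaces the old cap
# token rather than appending a second one:
#
#     cap_update("allow r path=/volumes/grp1/vol1",
#                "allow r pool=cephfs_data",
#                "allow rw path=/volumes/grp1/vol1",
#                "allow rw pool=cephfs_data",
#                "allow r path=/volumes/grp1/vol1",
#                "allow r pool=cephfs_data")
#     # -> ("allow rw path=/volumes/grp1/vol1", "allow rw pool=cephfs_data")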
1116
1117 orig_mds_caps = cap['caps'].get('mds', "")
1118 orig_osd_caps = cap['caps'].get('osd', "")
1119
1120 mds_cap_str, osd_cap_str = cap_update(
1121 orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
1122 unwanted_mds_cap, unwanted_osd_cap)
1123
1124 caps = self._rados_command(
1125 'auth caps',
1126 {
1127 'entity': client_entity,
1128 'caps': [
1129 'mds', mds_cap_str,
1130 'osd', osd_cap_str,
1131 'mon', cap['caps'].get('mon', 'allow r')]
1132 })
1133 caps = self._rados_command(
1134 'auth get',
1135 {
1136 'entity': client_entity
1137 }
1138 )
1139
1140 # Result expected like this:
1141 # [
1142 # {
1143 # "entity": "client.foobar",
1144 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1145 # "caps": {
1146 # "mds": "allow *",
1147 # "mon": "allow *"
1148 # }
1149 # }
1150 # ]
1151 assert len(caps) == 1
1152 assert caps[0]['entity'] == client_entity
1153 return caps[0]['key']
1154
1155 def deauthorize(self, volume_path, auth_id):
1156 with self._auth_lock(auth_id):
1157 # Existing meta, or None, to be updated
1158 auth_meta = self._auth_metadata_get(auth_id)
1159
1160 volume_path_str = str(volume_path)
1161 if (auth_meta is None) or (not auth_meta['volumes']):
1162 log.warn("deauthorized called for already-removed auth"
1163 "ID '{auth_id}' for volume ID '{volume}'".format(
1164 auth_id=auth_id, volume=volume_path.volume_id
1165 ))
1166 # Clean up the auth meta file of an auth ID
1167 self.fs.unlink(self._auth_metadata_path(auth_id))
1168 return
1169
1170 if volume_path_str not in auth_meta['volumes']:
1171 log.warn("deauthorized called for already-removed auth"
1172 "ID '{auth_id}' for volume ID '{volume}'".format(
1173 auth_id=auth_id, volume=volume_path.volume_id
1174 ))
1175 return
1176
1177 if auth_meta['dirty']:
1178 self._recover_auth_meta(auth_id, auth_meta)
1179
1180 auth_meta['dirty'] = True
1181 auth_meta['volumes'][volume_path_str]['dirty'] = True
1182 self._auth_metadata_set(auth_id, auth_meta)
1183
1184 self._deauthorize_volume(volume_path, auth_id)
1185
1186 # Filter out the volume we're deauthorizing
1187 del auth_meta['volumes'][volume_path_str]
1188
1189 # Clean up auth meta file
1190 if not auth_meta['volumes']:
1191 self.fs.unlink(self._auth_metadata_path(auth_id))
1192 return
1193
1194 auth_meta['dirty'] = False
1195 self._auth_metadata_set(auth_id, auth_meta)
1196
1197 def _deauthorize_volume(self, volume_path, auth_id):
1198 with self._volume_lock(volume_path):
1199 vol_meta = self._volume_metadata_get(volume_path)
1200
1201 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1202 log.warn("deauthorized called for already-removed auth"
1203 "ID '{auth_id}' for volume ID '{volume}'".format(
1204 auth_id=auth_id, volume=volume_path.volume_id
1205 ))
1206 return
1207
1208 vol_meta['auths'][auth_id]['dirty'] = True
1209 self._volume_metadata_set(volume_path, vol_meta)
1210
1211 self._deauthorize(volume_path, auth_id)
1212
1213 # Remove the auth_id from the metadata *after* removing it
1214 # from ceph, so that if we crashed here, we would actually
1215 # recreate the auth ID during recovery (i.e. end up with
1216 # a consistent state).
1217
1218 # Filter out the auth we're removing
1219 del vol_meta['auths'][auth_id]
1220 self._volume_metadata_set(volume_path, vol_meta)
1221
1222 def _deauthorize(self, volume_path, auth_id):
1223 """
1224 The volume must still exist.
1225 """
1226 client_entity = "client.{0}".format(auth_id)
1227 path = self._get_path(volume_path)
1228 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1229 try:
1230 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1231 "namespace").decode()
1232 except cephfs.NoData:
1233 namespace = None
1234
1235 # The auth_id might have read-only or read-write mount access for the
1236 # volume path.
1237 access_levels = ('r', 'rw')
1238 want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
1239 for access_level in access_levels]
1240 if namespace:
1241 want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
1242 for access_level in access_levels]
1243 else:
1244 want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
1245 for access_level in access_levels]
1246
1247
1248 try:
1249 existing = self._rados_command(
1250 'auth get',
1251 {
1252 'entity': client_entity
1253 }
1254 )
1255
1256 def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
1257 mds_cap_tokens = orig_mds_caps.split(",")
1258 osd_cap_tokens = orig_osd_caps.split(",")
1259
1260 for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
1261 if want_mds_cap in mds_cap_tokens:
1262 mds_cap_tokens.remove(want_mds_cap)
1263 osd_cap_tokens.remove(want_osd_cap)
1264 break
1265
1266 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1267
1268 cap = existing[0]
1269 orig_mds_caps = cap['caps'].get('mds', "")
1270 orig_osd_caps = cap['caps'].get('osd', "")
1271 mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
1272 want_mds_caps, want_osd_caps)
1273
1274 if not mds_cap_str:
1275 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1276 else:
1277 self._rados_command(
1278 'auth caps',
1279 {
1280 'entity': client_entity,
1281 'caps': [
1282 'mds', mds_cap_str,
1283 'osd', osd_cap_str,
1284 'mon', cap['caps'].get('mon', 'allow r')]
1285 })
1286
1287 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1288 except rados.Error:
1289 # Already gone, great.
1290 return
1291
1292 def get_authorized_ids(self, volume_path):
1293 """
1294 Expose a list of auth IDs that have access to a volume.
1295
1296 return: a list of (auth_id, access_level) tuples, where
1297 the access_level can be 'r' or 'rw'.
1298 None if no auth ID is given access to the volume.
1299 """
1300 with self._volume_lock(volume_path):
1301 meta = self._volume_metadata_get(volume_path)
1302 auths = []
1303 if not meta or not meta['auths']:
1304 return None
1305
1306 for auth, auth_data in meta['auths'].items():
1307 # Skip partial auth updates.
1308 if not auth_data['dirty']:
1309 auths.append((auth, auth_data['access_level']))
1310
1311 return auths
1312
1313 def _rados_command(self, prefix, args=None, decode=True):
1314 """
1315 Safer wrapper for ceph_argparse.json_command, which raises
1316 Error exception instead of relying on caller to check return
1317 codes.
1318
1319 Error exception can result from:
1320 * Timeout
1321 * Actual legitimate errors
1322 * Malformed JSON output
1323
1324 return: Decoded object from ceph, or None if empty string returned.
1325 If decode is False, return a string (the data returned by
1326 ceph command)
1327 """
1328 if args is None:
1329 args = {}
1330
1331 argdict = args.copy()
1332 argdict['format'] = 'json'
1333
1334 ret, outbuf, outs = json_command(self.rados,
1335 prefix=prefix,
1336 argdict=argdict,
1337 timeout=RADOS_TIMEOUT)
1338 if ret != 0:
1339 raise rados.Error(outs)
1340 else:
1341 if decode:
1342 if outbuf:
1343 try:
1344 return json.loads(outbuf.decode())
1345 except (ValueError, TypeError):
1346 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1347 else:
1348 return None
1349 else:
1350 return outbuf
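# Typical use within this class (the command is one the mons understand):
#     mds_map = self._rados_command("mds dump", {})   # -> dict parsed from JSON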
1351
1352 def get_used_bytes(self, volume_path):
1353 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
1354 "rbytes").decode())
1355
1356 def set_max_bytes(self, volume_path, max_bytes):
1357 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1358 to_bytes(max_bytes if max_bytes else 0), 0)
1359
1360 def _snapshot_path(self, dir_path, snapshot_name):
1361 return os.path.join(
1362 dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1363 )
1364
1365 def _snapshot_create(self, dir_path, snapshot_name):
1366 # TODO: raise intelligible exception for clusters where snaps are disabled
1367 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), 0o755)
1368
1369 def _snapshot_destroy(self, dir_path, snapshot_name):
1370 """
1371 Remove a snapshot, or do nothing if it already doesn't exist.
1372 """
1373 try:
1374 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1375 except cephfs.ObjectNotFound:
1376 log.warn("Snapshot was already gone: {0}".format(snapshot_name))
1377
1378 def create_snapshot_volume(self, volume_path, snapshot_name):
1379 self._snapshot_create(self._get_path(volume_path), snapshot_name)
1380
1381 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1382 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1383
1384 def create_snapshot_group(self, group_id, snapshot_name):
1385 if group_id is None:
1386 raise RuntimeError("Group ID may not be None")
1387
1388 return self._snapshot_create(self._get_group_path(group_id), snapshot_name)
1389
1390 def destroy_snapshot_group(self, group_id, snapshot_name):
1391 if group_id is None:
1392 raise RuntimeError("Group ID may not be None")
1393 if snapshot_name is None:
1394 raise RuntimeError("Snapshot name may not be None")
1395
1396 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1397
1398 def _cp_r(self, src, dst):
1399 # TODO
1400 raise NotImplementedError()
1401
1402 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1403 dest_fs_path = self._get_path(dest_volume_path)
1404 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1405
1406 self._cp_r(src_snapshot_path, dest_fs_path)
1407
1408 def put_object(self, pool_name, object_name, data):
1409 """
1410 Synchronously write data to an object.
1411
1412 :param pool_name: name of the pool
1413 :type pool_name: str
1414 :param object_name: name of the object
1415 :type object_name: str
1416 :param data: data to write
1417 :type data: bytes
1418 """
1419 return self.put_object_versioned(pool_name, object_name, data)
1420
1421 def put_object_versioned(self, pool_name, object_name, data, version=None):
1422 """
1423 Synchronously write data to an object only if the version of the
1424 object matches the expected version.
1425
1426 :param pool_name: name of the pool
1427 :type pool_name: str
1428 :param object_name: name of the object
1429 :type object_name: str
1430 :param data: data to write
1431 :type data: bytes
1432 :param version: expected version of the object to write
1433 :type version: int
1434 """
1435 ioctx = self.rados.open_ioctx(pool_name)
1436
1437 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1438 if len(data) > max_size:
1439 msg = ("Data to be written to object '{0}' exceeds "
1440 "{1} bytes".format(object_name, max_size))
1441 log.error(msg)
1442 raise CephFSVolumeClientError(msg)
1443
1444 try:
1445 with rados.WriteOpCtx(ioctx) as wop:
1446 if version is not None:
1447 wop.assert_version(version)
1448 wop.write_full(data)
1449 ioctx.operate_write_op(wop, object_name)
1450 except rados.OSError as e:
1451 log.error(e)
1452 raise e
1453 finally:
1454 ioctx.close()
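# Illustrative compare-and-set pattern built from these helpers (pool and
# object names are hypothetical; get_object_and_version is defined below):
#
#     data, version = vc.get_object_and_version("manila_data", "obj1")
#     new_data = data + b" more"
#     # The write fails (rados.OSError) if obj1 was modified in the meantime.
#     vc.put_object_versioned("manila_data", "obj1", new_data, version=version)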
1455
1456 def get_object(self, pool_name, object_name):
1457 """
1458 Synchronously read data from an object.
1459
1460 :param pool_name: name of the pool
1461 :type pool_name: str
1462 :param object_name: name of the object
1463 :type object_name: str
1464
1465 :returns: bytes - data read from object
1466 """
1467 return self.get_object_and_version(pool_name, object_name)[0]
1468
1469 def get_object_and_version(self, pool_name, object_name):
1470 """
1471 Synchronously read data from an object and get its version.
1472
1473 :param pool_name: name of the pool
1474 :type pool_name: str
1475 :param object_name: name of the object
1476 :type object_name: str
1477
1478 :returns: tuple of object data and version
1479 """
1480 ioctx = self.rados.open_ioctx(pool_name)
1481 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1482 try:
1483 bytes_read = ioctx.read(object_name, max_size)
1484 if ((len(bytes_read) == max_size) and
1485 (ioctx.read(object_name, 1, offset=max_size))):
1486 log.warning("Size of object {0} exceeds '{1}' bytes "
1487 "read".format(object_name, max_size))
1488 obj_version = ioctx.get_last_version()
1489 finally:
1490 ioctx.close()
1491 return (bytes_read, obj_version)
1492
1493 def delete_object(self, pool_name, object_name):
1494 ioctx = self.rados.open_ioctx(pool_name)
1495 try:
1496 ioctx.remove_object(object_name)
1497 except rados.ObjectNotFound:
1498 log.warn("Object '{0}' was already removed".format(object_name))
1499 finally:
1500 ioctx.close()