1 """
2 Copyright (C) 2015 Red Hat, Inc.
3
4 LGPL-2.1 or LGPL-3.0. See file COPYING.
5 """
6
7 from contextlib import contextmanager
8 import errno
9 import fcntl
10 import json
11 import logging
12 import os
13 import re
14 import struct
15 import sys
16 import threading
17 import time
18 import uuid
19
20 from ceph_argparse import json_command
21
22 import cephfs
23 import rados
24
25 def to_bytes(param):
26 '''
27 Helper method that returns byte representation of the given parameter.
28 '''
29 if isinstance(param, str):
30 return param.encode('utf-8')
31 elif param is None:
32 return param
33 else:
34 return str(param).encode('utf-8')
35
36 class RadosError(Exception):
37 """
38 Something went wrong talking to Ceph with librados
39 """
40 pass
41
42
43 RADOS_TIMEOUT = 10
44
45 log = logging.getLogger(__name__)
46
47 # Reserved volume group name which we use in paths for volumes
48 # that are not assigned to a group (i.e. created with group=None)
49 NO_GROUP_NAME = "_nogroup"
50
51 # Filename extensions for meta files.
52 META_FILE_EXT = ".meta"
53
54 class VolumePath(object):
55 """
56 Identify a volume's path as group->volume
57 The Volume ID is a unique identifier, but this is a much more
58 helpful thing to pass around.
59 """
60 def __init__(self, group_id, volume_id):
61 self.group_id = group_id
62 self.volume_id = volume_id
63 assert self.group_id != NO_GROUP_NAME
64 assert self.volume_id != "" and self.volume_id is not None
65
66 def __str__(self):
67 return "{0}/{1}".format(self.group_id, self.volume_id)
68
69
70 class ClusterTimeout(Exception):
71 """
72 Exception indicating that we timed out trying to talk to the Ceph cluster,
73 either to the mons, or to any individual daemon that the mons indicate ought
74 to be up but isn't responding to us.
75 """
76 pass
77
78
79 class ClusterError(Exception):
80 """
81 Exception indicating that the cluster returned an error to a command that
82 we thought should be successful based on our last knowledge of the cluster
83 state.
84 """
85 def __init__(self, action, result_code, result_str):
86 self._action = action
87 self._result_code = result_code
88 self._result_str = result_str
89
90 def __str__(self):
91 return "Error {0} (\"{1}\") while {2}".format(
92 self._result_code, self._result_str, self._action)
93
94
95 class RankEvicter(threading.Thread):
96 """
97 Thread for evicting client(s) from a particular MDS daemon instance.
98
99 This is more complex than simply sending a command, because we have to
100 handle cases where MDS daemons might not be fully up yet, and/or might
101 be transiently unresponsive to commands.
102 """
103 class GidGone(Exception):
104 pass
105
106 POLL_PERIOD = 5
107
108 def __init__(self, volume_client, client_spec, rank, gid, mds_map, ready_timeout):
109 """
110 :param client_spec: list of strings, used as filter arguments to "session evict"
111 pass ["id=123"] to evict a single client with session id 123.
112 """
113 self.rank = rank
114 self.gid = gid
115 self._mds_map = mds_map
116 self._client_spec = client_spec
117 self._volume_client = volume_client
118 self._ready_timeout = ready_timeout
119 self._ready_waited = 0
120
121 self.success = False
122 self.exception = None
123
124 super(RankEvicter, self).__init__()
125
126 def _ready_to_evict(self):
127 if self._mds_map['up'].get("mds_{0}".format(self.rank), None) != self.gid:
128 log.info("Evicting {0} from {1}/{2}: rank no longer associated with gid, done.".format(
129 self._client_spec, self.rank, self.gid
130 ))
131 raise RankEvicter.GidGone()
132
133 info = self._mds_map['info']["gid_{0}".format(self.gid)]
134 log.debug("_ready_to_evict: state={0}".format(info['state']))
135 return info['state'] in ["up:active", "up:clientreplay"]
136
137 def _wait_for_ready(self):
138 """
139 Wait for that MDS rank to reach an active or clientreplay state, and
140 not be laggy.
141 """
142 while not self._ready_to_evict():
143 if self._ready_waited > self._ready_timeout:
144 raise ClusterTimeout()
145
146 time.sleep(self.POLL_PERIOD)
147 self._ready_waited += self.POLL_PERIOD
148
149 self._mds_map = self._volume_client.get_mds_map()
150
151 def _evict(self):
152 """
153         Run the eviction procedure. Return True on success; raise on error.
154 """
155
156         # Wait until the MDS is believed by the mon to be available for commands
157 try:
158 self._wait_for_ready()
159 except self.GidGone:
160 return True
161
162 # Then send it an evict
163 ret = errno.ETIMEDOUT
164 while ret == errno.ETIMEDOUT:
165 log.debug("mds_command: {0}, {1}".format(
166 "%s" % self.gid, ["session", "evict"] + self._client_spec
167 ))
168 ret, outb, outs = self._volume_client.fs.mds_command(
169 "%s" % self.gid,
170 [json.dumps({
171 "prefix": "session evict",
172 "filters": self._client_spec
173 })], "")
174 log.debug("mds_command: complete {0} {1}".format(ret, outs))
175
176 # If we get a clean response, great, it's gone from that rank.
177 if ret == 0:
178 return True
179 elif ret == errno.ETIMEDOUT:
180                 # Oh no, the MDS went laggy (that's why libcephfs emits this error)
181 self._mds_map = self._volume_client.get_mds_map()
182 try:
183 self._wait_for_ready()
184 except self.GidGone:
185 return True
186 else:
187 raise ClusterError("Sending evict to mds.{0}".format(self.gid), ret, outs)
188
189 def run(self):
190 try:
191 self._evict()
192 except Exception as e:
193 self.success = False
194 self.exception = e
195 else:
196 self.success = True
197
198
199 class EvictionError(Exception):
200 pass
201
202
203 class CephFSVolumeClientError(Exception):
204 """
205 Something went wrong talking to Ceph using CephFSVolumeClient.
206 """
207 pass
208
209
210 CEPHFSVOLUMECLIENT_VERSION_HISTORY = """
211
212 CephFSVolumeClient Version History:
213
214 * 1 - Initial version
215 * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
216 * 3 - Allow volumes to be created without RADOS namespace isolation
217 * 4 - Added get_object_and_version, put_object_versioned method to CephFSVolumeClient
218 """
219
220
221 class CephFSVolumeClient(object):
222 """
223 Combine libcephfs and librados interfaces to implement a
224 'Volume' concept implemented as a cephfs directory and
225 client capabilities which restrict mount access to this
226 directory.
227
228 Additionally, volumes may be in a 'Group'. Conveniently,
229 volumes are a lot like manila shares, and groups are a lot
230 like manila consistency groups.
231
232 Refer to volumes with VolumePath, which specifies the
233 volume and group IDs (both strings). The group ID may
234 be None.
235
236         In general, functions in this class are allowed to raise rados.Error
237         or cephfs.Error exceptions in unexpected situations.
238 """
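    # A minimal lifecycle sketch (hypothetical names; assumes a reachable
    # cluster, a ceph.conf at the given path, and a pre-created cephx identity
    # such as "client.manila" with sufficient mon/mds/osd caps):
    #
    #     vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
    #     vc.connect()
    #     try:
    #         vp = VolumePath("grp1", "vol1")
    #         vc.create_volume(vp, size=10 * 2**30)
    #         auth = vc.authorize(vp, "guest1", tenant_id="tenant-a")
    #         # auth['auth_key'] is the cephx secret for client.guest1
    #     finally:
    #         vc.disconnect()
    #
    # The class is also usable as a context manager (see __enter__/__exit__),
    # which calls connect()/disconnect() automatically.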
239
240 # Current version
241 version = 4
242
243 # Where shall we create our volumes?
244 POOL_PREFIX = "fsvolume_"
245 DEFAULT_VOL_PREFIX = "/volumes"
246 DEFAULT_NS_PREFIX = "fsvolumens_"
247
248 def __init__(self, auth_id=None, conf_path=None, cluster_name=None,
249 volume_prefix=None, pool_ns_prefix=None, rados=None,
250 fs_name=None):
251 """
252 Either set all three of ``auth_id``, ``conf_path`` and
253 ``cluster_name`` (rados constructed on connect), or
254 set ``rados`` (existing rados instance).
255 """
256 self.fs = None
257 self.fs_name = fs_name
258 self.connected = False
259
260 self.conf_path = conf_path
261 self.cluster_name = cluster_name
262 self.auth_id = auth_id
263
264 self.rados = rados
265 if self.rados:
266 # Using an externally owned rados, so we won't tear it down
267 # on disconnect
268 self.own_rados = False
269 else:
270 # self.rados will be constructed in connect
271 self.own_rados = True
272
273 self.volume_prefix = volume_prefix if volume_prefix else self.DEFAULT_VOL_PREFIX
274 self.pool_ns_prefix = pool_ns_prefix if pool_ns_prefix else self.DEFAULT_NS_PREFIX
275 # For flock'ing in cephfs, I want a unique ID to distinguish me
276 # from any other manila-share services that are loading this module.
277         # We could use pid, but that's unnecessarily weak: generate a
278 # UUID
279 self._id = struct.unpack(">Q", uuid.uuid1().bytes[0:8])[0]
280
281 # TODO: version the on-disk structures
282
283 def recover(self):
284 # Scan all auth keys to see if they're dirty: if they are, they have
285 # state that might not have propagated to Ceph or to the related
286 # volumes yet.
287
288 # Important: we *always* acquire locks in the order auth->volume
289 # That means a volume can never be dirty without the auth key
290 # we're updating it with being dirty at the same time.
291
292 # First list the auth IDs that have potentially dirty on-disk metadata
293 log.debug("Recovering from partial auth updates (if any)...")
294
295 try:
296 dir_handle = self.fs.opendir(self.volume_prefix)
297 except cephfs.ObjectNotFound:
298 log.debug("Nothing to recover. No auth meta files.")
299 return
300
301 d = self.fs.readdir(dir_handle)
302 auth_ids = []
303
304 if not d:
305 log.debug("Nothing to recover. No auth meta files.")
306
307 while d:
308 # Identify auth IDs from auth meta filenames. The auth meta files
309 # are named as, "$<auth_id><meta filename extension>"
310             regex = r"^\$(.*){0}$".format(re.escape(META_FILE_EXT))
311 match = re.search(regex, d.d_name.decode(encoding='utf-8'))
312 if match:
313 auth_ids.append(match.group(1))
314
315 d = self.fs.readdir(dir_handle)
316
317 self.fs.closedir(dir_handle)
318
319 # Key points based on ordering:
320 # * Anything added in VMeta is already added in AMeta
321 # * Anything added in Ceph is already added in VMeta
322 # * Anything removed in VMeta is already removed in Ceph
323 # * Anything removed in AMeta is already removed in VMeta
324
325         # Deauthorization: because I only update metadata AFTER the
326         # update of the next level down, I have the same ordering in
327         # reverse: things which exist in the AMeta should also exist
328         # in the VMeta and in Ceph, so the same recovery procedure
329         # that gets me consistent after crashes during authorization
330         # will also work during deauthorization.
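        # Concretely, authorize() below applies updates in this order:
        #   1. AMeta: mark the auth ID dirty and record the volume (dirty)
        #   2. VMeta: record the auth ID (dirty)
        #   3. Ceph:  "auth get-or-create" / "auth caps"
        #   4. VMeta: clear the volume-side dirty flag
        #   5. AMeta: clear the auth-side dirty flags
        # A crash at any step leaves dirty flags behind, which is what the
        # per-auth-ID recovery below keys off.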
331
332 # Now for each auth ID, check for dirty flag and apply updates
333 # if dirty flag is found
334 for auth_id in auth_ids:
335 with self._auth_lock(auth_id):
336 auth_meta = self._auth_metadata_get(auth_id)
337 if not auth_meta or not auth_meta['volumes']:
338 # Clean up auth meta file
339 self.fs.unlink(self._auth_metadata_path(auth_id))
340 continue
341 if not auth_meta['dirty']:
342 continue
343 self._recover_auth_meta(auth_id, auth_meta)
344
345 log.debug("Recovered from partial auth updates (if any).")
346
347 def _recover_auth_meta(self, auth_id, auth_meta):
348 """
349 Call me after locking the auth meta file.
350 """
351 remove_volumes = []
352
353 for volume, volume_data in auth_meta['volumes'].items():
354 if not volume_data['dirty']:
355 continue
356
357 (group_id, volume_id) = volume.split('/')
358             group_id = group_id if group_id != 'None' else None
359 volume_path = VolumePath(group_id, volume_id)
360 access_level = volume_data['access_level']
361
362 with self._volume_lock(volume_path):
363 vol_meta = self._volume_metadata_get(volume_path)
364
365 # No VMeta update indicates that there was no auth update
366 # in Ceph either. So it's safe to remove corresponding
367 # partial update in AMeta.
368 if not vol_meta or auth_id not in vol_meta['auths']:
369 remove_volumes.append(volume)
370 continue
371
372 want_auth = {
373 'access_level': access_level,
374 'dirty': False,
375 }
376 # VMeta update looks clean. Ceph auth update must have been
377 # clean.
378 if vol_meta['auths'][auth_id] == want_auth:
379 continue
380
381                 readonly = True if access_level == 'r' else False
382 self._authorize_volume(volume_path, auth_id, readonly)
383
384 # Recovered from partial auth updates for the auth ID's access
385 # to a volume.
386 auth_meta['volumes'][volume]['dirty'] = False
387 self._auth_metadata_set(auth_id, auth_meta)
388
389 for volume in remove_volumes:
390 del auth_meta['volumes'][volume]
391
392 if not auth_meta['volumes']:
393 # Clean up auth meta file
394 self.fs.unlink(self._auth_metadata_path(auth_id))
395 return
396
397 # Recovered from all partial auth updates for the auth ID.
398 auth_meta['dirty'] = False
399 self._auth_metadata_set(auth_id, auth_meta)
400
401 def get_mds_map(self):
402 fs_map = self._rados_command("fs dump", {})
403 return fs_map['filesystems'][0]['mdsmap']
404
405 def evict(self, auth_id, timeout=30, volume_path=None):
406 """
407 Evict all clients based on the authorization ID and optionally based on
408 the volume path mounted. Assumes that the authorization key has been
409 revoked prior to calling this function.
410
411 This operation can throw an exception if the mon cluster is unresponsive, or
412 any individual MDS daemon is unresponsive for longer than the timeout passed in.
413 """
414
415 client_spec = ["auth_name={0}".format(auth_id), ]
416 if volume_path:
417 client_spec.append("client_metadata.root={0}".
418 format(self._get_path(volume_path)))
419
420 log.info("evict clients with {0}".format(', '.join(client_spec)))
421
422 mds_map = self.get_mds_map()
423 up = {}
424 for name, gid in mds_map['up'].items():
425 # Quirk of the MDSMap JSON dump: keys in the up dict are like "mds_0"
426 assert name.startswith("mds_")
427 up[int(name[4:])] = gid
428
429 # For all MDS ranks held by a daemon
430 # Do the parallelism in python instead of using "tell mds.*", because
431 # the latter doesn't give us per-mds output
432 threads = []
433 for rank, gid in up.items():
434 thread = RankEvicter(self, client_spec, rank, gid, mds_map,
435 timeout)
436 thread.start()
437 threads.append(thread)
438
439 for t in threads:
440 t.join()
441
442 log.info("evict: joined all")
443
444 for t in threads:
445 if not t.success:
446 msg = ("Failed to evict client with {0} from mds {1}/{2}: {3}".
447 format(', '.join(client_spec), t.rank, t.gid, t.exception)
448 )
449 log.error(msg)
450 raise EvictionError(msg)
451
452 def _get_path(self, volume_path):
453 """
454 Determine the path within CephFS where this volume will live
455 :return: absolute path (string)
456 """
457 return os.path.join(
458 self.volume_prefix,
459 volume_path.group_id if volume_path.group_id is not None else NO_GROUP_NAME,
460 volume_path.volume_id)
461
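    # For example, with the default "/volumes" prefix:
    #   VolumePath(None, "vol1")    -> /volumes/_nogroup/vol1
    #   VolumePath("grp1", "vol1")  -> /volumes/grp1/vol1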
462 def _get_group_path(self, group_id):
463 if group_id is None:
464 raise ValueError("group_id may not be None")
465
466 return os.path.join(
467 self.volume_prefix,
468 group_id
469 )
470
471 def _connect(self, premount_evict):
472 log.debug("Connecting to cephfs...")
473 self.fs = cephfs.LibCephFS(rados_inst=self.rados)
474 log.debug("CephFS initializing...")
475 self.fs.init()
476 if premount_evict is not None:
477 log.debug("Premount eviction of {0} starting".format(premount_evict))
478 self.evict(premount_evict)
479 log.debug("Premount eviction of {0} completes".format(premount_evict))
480 log.debug("CephFS mounting...")
481 self.fs.mount(filesystem_name=to_bytes(self.fs_name))
482 log.debug("Connection to cephfs complete")
483
484 # Recover from partial auth updates due to a previous
485 # crash.
486 self.recover()
487
488 def connect(self, premount_evict = None):
489 """
490
491 :param premount_evict: Optional auth_id to evict before mounting the filesystem: callers
492 may want to use this to specify their own auth ID if they expect
493 to be a unique instance and don't want to wait for caps to time
494 out after failure of another instance of themselves.
495 """
496 if self.own_rados:
497 log.debug("Configuring to RADOS with config {0}...".format(self.conf_path))
498 self.rados = rados.Rados(
499 name="client.{0}".format(self.auth_id),
500 clustername=self.cluster_name,
501 conffile=self.conf_path,
502 conf={}
503 )
504 if self.rados.state != "connected":
505 log.debug("Connecting to RADOS...")
506 self.rados.connect()
507 log.debug("Connection to RADOS complete")
508 self._connect(premount_evict)
509
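    # A sketch of the two construction modes (names are hypothetical):
    #
    #     # Own the RADOS handle: built and torn down by this class
    #     vc = CephFSVolumeClient("manila", "/etc/ceph/ceph.conf", "ceph")
    #     vc.connect()
    #
    #     # Reuse an externally owned rados.Rados handle; disconnect() will
    #     # then leave that handle untouched
    #     vc = CephFSVolumeClient(rados=existing_rados, fs_name="cephfs")
    #     vc.connect()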
510 def get_mon_addrs(self):
511 log.info("get_mon_addrs")
512 result = []
513 mon_map = self._rados_command("mon dump")
514 for mon in mon_map['mons']:
515 ip_port = mon['addr'].split("/")[0]
516 result.append(ip_port)
517
518 return result
519
520 def disconnect(self):
521 log.info("disconnect")
522 if self.fs:
523 log.debug("Disconnecting cephfs...")
524 self.fs.shutdown()
525 self.fs = None
526 log.debug("Disconnecting cephfs complete")
527
528 if self.rados and self.own_rados:
529 log.debug("Disconnecting rados...")
530 self.rados.shutdown()
531 self.rados = None
532 log.debug("Disconnecting rados complete")
533
534 def __enter__(self):
535 self.connect()
536 return self
537
538 def __exit__(self, exc_type, exc_val, exc_tb):
539 self.disconnect()
540
541 def __del__(self):
542 self.disconnect()
543
544 def _get_pool_id(self, osd_map, pool_name):
545 # Maybe borrow the OSDMap wrapper class from calamari if more helpers
546 # like this are needed.
547 for pool in osd_map['pools']:
548 if pool['pool_name'] == pool_name:
549 return pool['pool']
550
551 return None
552
553 def _create_volume_pool(self, pool_name):
554 """
555 Idempotently create a pool for use as a CephFS data pool, with the given name
556
557 :return The ID of the created pool
558 """
559 osd_map = self._rados_command('osd dump', {})
560
561 existing_id = self._get_pool_id(osd_map, pool_name)
562 if existing_id is not None:
563 log.info("Pool {0} already exists".format(pool_name))
564 return existing_id
565
566 self._rados_command(
567 'osd pool create',
568 {
569 'pool': pool_name,
570 }
571 )
572
573 osd_map = self._rados_command('osd dump', {})
574 pool_id = self._get_pool_id(osd_map, pool_name)
575
576 if pool_id is None:
577 # If the pool isn't there, that's either a ceph bug, or it's some outside influence
578 # removing it right after we created it.
579 log.error("OSD map doesn't contain expected pool '{0}':\n{1}".format(
580 pool_name, json.dumps(osd_map, indent=2)
581 ))
582 raise RuntimeError("Pool '{0}' not present in map after creation".format(pool_name))
583 else:
584 return pool_id
585
586 def create_group(self, group_id, mode=0o755):
587 # Prevent craftily-named volume groups from colliding with the meta
588 # files.
589 if group_id.endswith(META_FILE_EXT):
590 raise ValueError("group ID cannot end with '{0}'.".format(
591 META_FILE_EXT))
592 path = self._get_group_path(group_id)
593 self._mkdir_p(path, mode)
594
595 def destroy_group(self, group_id):
596 path = self._get_group_path(group_id)
597 try:
598 self.fs.stat(self.volume_prefix)
599 except cephfs.ObjectNotFound:
600 pass
601 else:
602 self.fs.rmdir(path)
603
604 def _mkdir_p(self, path, mode=0o755):
605 try:
606 self.fs.stat(path)
607 except cephfs.ObjectNotFound:
608 pass
609 else:
610 return
611
612 parts = path.split(os.path.sep)
613
614 for i in range(1, len(parts) + 1):
615 subpath = os.path.join(*parts[0:i])
616 try:
617 self.fs.stat(subpath)
618 except cephfs.ObjectNotFound:
619 self.fs.mkdir(subpath, mode)
620
621 def create_volume(self, volume_path, size=None, data_isolated=False, namespace_isolated=True,
622 mode=0o755):
623 """
624 Set up metadata, pools and auth for a volume.
625
626 This function is idempotent. It is safe to call this again
627 for an already-created volume, even if it is in use.
628
629 :param volume_path: VolumePath instance
630 :param size: In bytes, or None for no size limit
631 :param data_isolated: If true, create a separate OSD pool for this volume
632 :param namespace_isolated: If true, use separate RADOS namespace for this volume
633 :return:
634 """
635 path = self._get_path(volume_path)
636 log.info("create_volume: {0}".format(path))
637
638 self._mkdir_p(path, mode)
639
640 if size is not None:
641 self.fs.setxattr(path, 'ceph.quota.max_bytes', to_bytes(size), 0)
642
643 # data_isolated means create a separate pool for this volume
644 if data_isolated:
645 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
646 log.info("create_volume: {0}, create pool {1} as data_isolated =True.".format(volume_path, pool_name))
647 pool_id = self._create_volume_pool(pool_name)
648 mds_map = self.get_mds_map()
649 if pool_id not in mds_map['data_pools']:
650 self._rados_command("fs add_data_pool", {
651 'fs_name': mds_map['fs_name'],
652 'pool': pool_name
653 })
654 time.sleep(5) # time for MDSMap to be distributed
655 self.fs.setxattr(path, 'ceph.dir.layout.pool', to_bytes(pool_name), 0)
656
657 # enforce security isolation, use separate namespace for this volume
658 if namespace_isolated:
659 namespace = "{0}{1}".format(self.pool_ns_prefix, volume_path.volume_id)
660 log.info("create_volume: {0}, using rados namespace {1} to isolate data.".format(volume_path, namespace))
661 self.fs.setxattr(path, 'ceph.dir.layout.pool_namespace',
662 to_bytes(namespace), 0)
663 else:
664 # If volume's namespace layout is not set, then the volume's pool
665 # layout remains unset and will undesirably change with ancestor's
666 # pool layout changes.
667 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
668 self.fs.setxattr(path, 'ceph.dir.layout.pool',
669 to_bytes(pool_name), 0)
670
671 # Create a volume meta file, if it does not already exist, to store
672 # data about auth ids having access to the volume
673 fd = self.fs.open(self._volume_metadata_path(volume_path),
674 os.O_CREAT, 0o755)
675 self.fs.close(fd)
676
677 return {
678 'mount_path': path
679 }
680
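    # For example (a sketch; vp is a VolumePath as above):
    #
    #     vc.create_volume(vp, size=1 * 2**30, namespace_isolated=True)
    #     # -> {'mount_path': '/volumes/grp1/vol1'}, with ceph.quota.max_bytes
    #     #    and ceph.dir.layout.pool_namespace set on that directory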
681 def delete_volume(self, volume_path, data_isolated=False):
682 """
683 Make a volume inaccessible to guests. This function is
684 idempotent. This is the fast part of tearing down a volume: you must
685 also later call purge_volume, which is the slow part.
686
687 :param volume_path: Same identifier used in create_volume
688 :return:
689 """
690
691 path = self._get_path(volume_path)
692 log.info("delete_volume: {0}".format(path))
693
694 # Create the trash folder if it doesn't already exist
695 trash = os.path.join(self.volume_prefix, "_deleting")
696 self._mkdir_p(trash)
697
698 # We'll move it to here
699 trashed_volume = os.path.join(trash, volume_path.volume_id)
700
701 # Move the volume's data to the trash folder
702 try:
703 self.fs.stat(path)
704 except cephfs.ObjectNotFound:
705 log.warning("Trying to delete volume '{0}' but it's already gone".format(
706 path))
707 else:
708 self.fs.rename(path, trashed_volume)
709
710 # Delete the volume meta file, if it's not already deleted
711 vol_meta_path = self._volume_metadata_path(volume_path)
712 try:
713 self.fs.unlink(vol_meta_path)
714 except cephfs.ObjectNotFound:
715 pass
716
717 def purge_volume(self, volume_path, data_isolated=False):
718 """
719 Finish clearing up a volume that was previously passed to delete_volume. This
720 function is idempotent.
721 """
722
723 trash = os.path.join(self.volume_prefix, "_deleting")
724 trashed_volume = os.path.join(trash, volume_path.volume_id)
725
726 try:
727 self.fs.stat(trashed_volume)
728 except cephfs.ObjectNotFound:
729 log.warning("Trying to purge volume '{0}' but it's already been purged".format(
730 trashed_volume))
731 return
732
733 def rmtree(root_path):
734 log.debug("rmtree {0}".format(root_path))
735 dir_handle = self.fs.opendir(root_path)
736 d = self.fs.readdir(dir_handle)
737 while d:
738 d_name = d.d_name.decode(encoding='utf-8')
739 if d_name not in [".", ".."]:
740 # Do not use os.path.join because it is sensitive
741 # to string encoding, we just pass through dnames
742 # as byte arrays
743 d_full = u"{0}/{1}".format(root_path, d_name)
744 if d.is_dir():
745 rmtree(d_full)
746 else:
747 self.fs.unlink(d_full)
748
749 d = self.fs.readdir(dir_handle)
750 self.fs.closedir(dir_handle)
751
752 self.fs.rmdir(root_path)
753
754 rmtree(trashed_volume)
755
756 if data_isolated:
757 pool_name = "{0}{1}".format(self.POOL_PREFIX, volume_path.volume_id)
758 osd_map = self._rados_command("osd dump", {})
759 pool_id = self._get_pool_id(osd_map, pool_name)
760 mds_map = self.get_mds_map()
761 if pool_id in mds_map['data_pools']:
762 self._rados_command("fs rm_data_pool", {
763 'fs_name': mds_map['fs_name'],
764 'pool': pool_name
765 })
766 self._rados_command("osd pool delete",
767 {
768 "pool": pool_name,
769 "pool2": pool_name,
770 "yes_i_really_really_mean_it": True
771 })
772
773 def _get_ancestor_xattr(self, path, attr):
774 """
775 Helper for reading layout information: if this xattr is missing
776 on the requested path, keep checking parents until we find it.
777 """
778 try:
779 result = self.fs.getxattr(path, attr).decode()
780 if result == "":
781 # Annoying! cephfs gives us empty instead of an error when attr not found
782 raise cephfs.NoData()
783 else:
784 return result
785 except cephfs.NoData:
786 if path == "/":
787 raise
788 else:
789 return self._get_ancestor_xattr(os.path.split(path)[0], attr)
790
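    # For example, querying "ceph.dir.layout.pool" on /volumes/grp1/vol1 walks
    # /volumes/grp1/vol1 -> /volumes/grp1 -> /volumes -> / until a non-empty
    # value is found (or raises cephfs.NoData at the root).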
791 def _check_compat_version(self, compat_version):
792 if self.version < compat_version:
793 msg = ("The current version of CephFSVolumeClient, version {0} "
794 "does not support the required feature. Need version {1} "
795 "or greater".format(self.version, compat_version)
796 )
797 log.error(msg)
798 raise CephFSVolumeClientError(msg)
799
800 def _metadata_get(self, path):
801 """
802 Return a deserialized JSON object, or None
803 """
804 fd = self.fs.open(path, "r")
805 # TODO iterate instead of assuming file < 4MB
806 read_bytes = self.fs.read(fd, 0, 4096 * 1024)
807 self.fs.close(fd)
808 if read_bytes:
809 return json.loads(read_bytes.decode())
810 else:
811 return None
812
813 def _metadata_set(self, path, data):
814 serialized = json.dumps(data)
815 fd = self.fs.open(path, "w")
816 try:
817 self.fs.write(fd, to_bytes(serialized), 0)
818 self.fs.fsync(fd, 0)
819 finally:
820 self.fs.close(fd)
821
822 def _lock(self, path):
823 @contextmanager
824 def fn():
825             while True:
826 fd = self.fs.open(path, os.O_CREAT, 0o755)
827 self.fs.flock(fd, fcntl.LOCK_EX, self._id)
828
829 # The locked file will be cleaned up sometime. It could be
830             # unlinked, e.g. by another manila share instance, before the
831 # lock was applied on it. Perform checks to ensure that this
832 # does not happen.
833 try:
834 statbuf = self.fs.stat(path)
835 except cephfs.ObjectNotFound:
836 self.fs.close(fd)
837 continue
838
839 fstatbuf = self.fs.fstat(fd)
840 if statbuf.st_ino == fstatbuf.st_ino:
841 break
842
843 try:
844 yield
845 finally:
846 self.fs.flock(fd, fcntl.LOCK_UN, self._id)
847 self.fs.close(fd)
848
849 return fn()
850
851 def _auth_metadata_path(self, auth_id):
852 return os.path.join(self.volume_prefix, "${0}{1}".format(
853 auth_id, META_FILE_EXT))
854
855 def _auth_lock(self, auth_id):
856 return self._lock(self._auth_metadata_path(auth_id))
857
858 def _auth_metadata_get(self, auth_id):
859 """
860 Call me with the metadata locked!
861
862         Check whether an auth metadata structure can be decoded by the current
863 version of CephFSVolumeClient.
864
865 Return auth metadata that the current version of CephFSVolumeClient
866 can decode.
867 """
868 auth_metadata = self._metadata_get(self._auth_metadata_path(auth_id))
869
870 if auth_metadata:
871 self._check_compat_version(auth_metadata['compat_version'])
872
873 return auth_metadata
874
875 def _auth_metadata_set(self, auth_id, data):
876 """
877 Call me with the metadata locked!
878
879 Fsync the auth metadata.
880
881 Add two version attributes to the auth metadata,
882 'compat_version', the minimum CephFSVolumeClient version that can
883 decode the metadata, and 'version', the CephFSVolumeClient version
884 that encoded the metadata.
885 """
886 data['compat_version'] = 1
887 data['version'] = self.version
888 return self._metadata_set(self._auth_metadata_path(auth_id), data)
889
890 def _volume_metadata_path(self, volume_path):
891 return os.path.join(self.volume_prefix, "_{0}:{1}{2}".format(
892 volume_path.group_id if volume_path.group_id else "",
893 volume_path.volume_id,
894 META_FILE_EXT
895 ))
896
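    # Example metadata file names under self.volume_prefix (default /volumes):
    #   auth meta for auth ID "guest1":              $guest1.meta
    #   volume meta for VolumePath("grp1", "vol1"):  _grp1:vol1.meta
    #   volume meta for VolumePath(None, "vol1"):    _:vol1.meta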
897 def _volume_lock(self, volume_path):
898 """
899 Return a ContextManager which locks the authorization metadata for
900 a particular volume, and persists a flag to the metadata indicating
901 that it is currently locked, so that we can detect dirty situations
902 during recovery.
903
904 This lock isn't just to make access to the metadata safe: it's also
905 designed to be used over the two-step process of checking the
906 metadata and then responding to an authorization request, to
907 ensure that at the point we respond the metadata hasn't changed
908 in the background. It's key to how we avoid security holes
909                 resulting from races during that two-step process.
910 """
911 return self._lock(self._volume_metadata_path(volume_path))
912
913 def _volume_metadata_get(self, volume_path):
914 """
915 Call me with the metadata locked!
916
917 Check whether a volume metadata structure can be decoded by the current
918 version of CephFSVolumeClient.
919
920 Return a volume_metadata structure that the current version of
921 CephFSVolumeClient can decode.
922 """
923 volume_metadata = self._metadata_get(self._volume_metadata_path(volume_path))
924
925 if volume_metadata:
926 self._check_compat_version(volume_metadata['compat_version'])
927
928 return volume_metadata
929
930 def _volume_metadata_set(self, volume_path, data):
931 """
932 Call me with the metadata locked!
933
934 Add two version attributes to the volume metadata,
935 'compat_version', the minimum CephFSVolumeClient version that can
936 decode the metadata and 'version', the CephFSVolumeClient version
937 that encoded the metadata.
938 """
939 data['compat_version'] = 1
940 data['version'] = self.version
941 return self._metadata_set(self._volume_metadata_path(volume_path), data)
942
943 def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
944 """
945 Get-or-create a Ceph auth identity for `auth_id` and grant them access
946         to the volume identified by `volume_path`.
947 :param volume_path:
948 :param auth_id:
949 :param readonly:
950 :param tenant_id: Optionally provide a stringizable object to
951 restrict any created cephx IDs to other callers
952 passing the same tenant ID.
953 :return:
954 """
955
956 with self._auth_lock(auth_id):
957 # Existing meta, or None, to be updated
958 auth_meta = self._auth_metadata_get(auth_id)
959
960 # volume data to be inserted
961 volume_path_str = str(volume_path)
962 volume = {
963 volume_path_str : {
964 # The access level at which the auth_id is authorized to
965 # access the volume.
966 'access_level': 'r' if readonly else 'rw',
967 'dirty': True,
968 }
969 }
970 if auth_meta is None:
971 sys.stderr.write("Creating meta for ID {0} with tenant {1}\n".format(
972 auth_id, tenant_id
973 ))
974 log.debug("Authorize: no existing meta")
975 auth_meta = {
976 'dirty': True,
977 'tenant_id': tenant_id.__str__() if tenant_id else None,
978 'volumes': volume
979 }
980
981 # Note: this is *not* guaranteeing that the key doesn't already
982 # exist in Ceph: we are allowing VolumeClient tenants to
983 # 'claim' existing Ceph keys. In order to prevent VolumeClient
984 # tenants from reading e.g. client.admin keys, you need to
985 # have configured your VolumeClient user (e.g. Manila) to
986 # have mon auth caps that prevent it from accessing those keys
987 # (e.g. limit it to only access keys with a manila.* prefix)
988 else:
989 # Disallow tenants to share auth IDs
990 if auth_meta['tenant_id'].__str__() != tenant_id.__str__():
991 msg = "auth ID: {0} is already in use".format(auth_id)
992 log.error(msg)
993 raise CephFSVolumeClientError(msg)
994
995 if auth_meta['dirty']:
996 self._recover_auth_meta(auth_id, auth_meta)
997
998 log.debug("Authorize: existing tenant {tenant}".format(
999 tenant=auth_meta['tenant_id']
1000 ))
1001 auth_meta['dirty'] = True
1002 auth_meta['volumes'].update(volume)
1003
1004 self._auth_metadata_set(auth_id, auth_meta)
1005
1006 with self._volume_lock(volume_path):
1007 key = self._authorize_volume(volume_path, auth_id, readonly)
1008
1009 auth_meta['dirty'] = False
1010 auth_meta['volumes'][volume_path_str]['dirty'] = False
1011 self._auth_metadata_set(auth_id, auth_meta)
1012
1013 if tenant_id:
1014 return {
1015 'auth_key': key
1016 }
1017 else:
1018 # Caller wasn't multi-tenant aware: be safe and don't give
1019 # them a key
1020 return {
1021 'auth_key': None
1022 }
1023
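    # Example return values (a sketch, for two independent auth IDs):
    #   vc.authorize(vp, "guest1", tenant_id="tenant-a")  # {'auth_key': '<cephx key>'}
    #   vc.authorize(vp, "guest2")                        # {'auth_key': None}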
1024 def _authorize_volume(self, volume_path, auth_id, readonly):
1025 vol_meta = self._volume_metadata_get(volume_path)
1026
1027 access_level = 'r' if readonly else 'rw'
1028 auth = {
1029 auth_id: {
1030 'access_level': access_level,
1031 'dirty': True,
1032 }
1033 }
1034
1035 if vol_meta is None:
1036 vol_meta = {
1037 'auths': auth
1038 }
1039 else:
1040 vol_meta['auths'].update(auth)
1041 self._volume_metadata_set(volume_path, vol_meta)
1042
1043 key = self._authorize_ceph(volume_path, auth_id, readonly)
1044
1045 vol_meta['auths'][auth_id]['dirty'] = False
1046 self._volume_metadata_set(volume_path, vol_meta)
1047
1048 return key
1049
1050 def _authorize_ceph(self, volume_path, auth_id, readonly):
1051 path = self._get_path(volume_path)
1052 log.debug("Authorizing Ceph id '{0}' for path '{1}'".format(
1053 auth_id, path
1054 ))
1055
1056 # First I need to work out what the data pool is for this share:
1057 # read the layout
1058 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1059
1060 try:
1061 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1062 "namespace").decode()
1063 except cephfs.NoData:
1064 namespace = None
1065
1066 # Now construct auth capabilities that give the guest just enough
1067 # permissions to access the share
1068 client_entity = "client.{0}".format(auth_id)
1069 want_access_level = 'r' if readonly else 'rw'
1070 want_mds_cap = 'allow {0} path={1}'.format(want_access_level, path)
1071 if namespace:
1072 want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1073 want_access_level, pool_name, namespace)
1074 else:
1075 want_osd_cap = 'allow {0} pool={1}'.format(want_access_level,
1076 pool_name)
1077
1078 try:
1079 existing = self._rados_command(
1080 'auth get',
1081 {
1082 'entity': client_entity
1083 }
1084 )
1085 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1086 except rados.Error:
1087 caps = self._rados_command(
1088 'auth get-or-create',
1089 {
1090 'entity': client_entity,
1091 'caps': [
1092 'mds', want_mds_cap,
1093 'osd', want_osd_cap,
1094 'mon', 'allow r']
1095 })
1096 else:
1097 # entity exists, update it
1098 cap = existing[0]
1099
1100 # Construct auth caps that if present might conflict with the desired
1101 # auth caps.
1102             unwanted_access_level = 'r' if want_access_level == 'rw' else 'rw'
1103 unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
1104 if namespace:
1105 unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
1106 unwanted_access_level, pool_name, namespace)
1107 else:
1108 unwanted_osd_cap = 'allow {0} pool={1}'.format(
1109 unwanted_access_level, pool_name)
1110
1111 def cap_update(
1112 orig_mds_caps, orig_osd_caps, want_mds_cap,
1113 want_osd_cap, unwanted_mds_cap, unwanted_osd_cap):
1114
1115 if not orig_mds_caps:
1116 return want_mds_cap, want_osd_cap
1117
1118 mds_cap_tokens = orig_mds_caps.split(",")
1119 osd_cap_tokens = orig_osd_caps.split(",")
1120
1121 if want_mds_cap in mds_cap_tokens:
1122 return orig_mds_caps, orig_osd_caps
1123
1124 if unwanted_mds_cap in mds_cap_tokens:
1125 mds_cap_tokens.remove(unwanted_mds_cap)
1126 osd_cap_tokens.remove(unwanted_osd_cap)
1127
1128 mds_cap_tokens.append(want_mds_cap)
1129 osd_cap_tokens.append(want_osd_cap)
1130
1131 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1132
1133 orig_mds_caps = cap['caps'].get('mds', "")
1134 orig_osd_caps = cap['caps'].get('osd', "")
1135
1136 mds_cap_str, osd_cap_str = cap_update(
1137 orig_mds_caps, orig_osd_caps, want_mds_cap, want_osd_cap,
1138 unwanted_mds_cap, unwanted_osd_cap)
1139
1140 caps = self._rados_command(
1141 'auth caps',
1142 {
1143 'entity': client_entity,
1144 'caps': [
1145 'mds', mds_cap_str,
1146 'osd', osd_cap_str,
1147 'mon', cap['caps'].get('mon', 'allow r')]
1148 })
1149 caps = self._rados_command(
1150 'auth get',
1151 {
1152 'entity': client_entity
1153 }
1154 )
1155
1156 # Result expected like this:
1157 # [
1158 # {
1159 # "entity": "client.foobar",
1160 # "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
1161 # "caps": {
1162 # "mds": "allow *",
1163 # "mon": "allow *"
1164 # }
1165 # }
1166 # ]
1167 assert len(caps) == 1
1168 assert caps[0]['entity'] == client_entity
1169 return caps[0]['key']
1170
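    # For a volume at /volumes/grp1/vol1 whose data pool is, say, "cephfs_data"
    # and which uses namespace isolation, the resulting entity looks roughly
    # like:
    #   client.guest1
    #     caps mds = "allow rw path=/volumes/grp1/vol1"
    #     caps osd = "allow rw pool=cephfs_data namespace=fsvolumens_vol1"
    #     caps mon = "allow r"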
1171 def deauthorize(self, volume_path, auth_id):
1172 with self._auth_lock(auth_id):
1173 # Existing meta, or None, to be updated
1174 auth_meta = self._auth_metadata_get(auth_id)
1175
1176 volume_path_str = str(volume_path)
1177 if (auth_meta is None) or (not auth_meta['volumes']):
1178                 log.warning("deauthorize called for already-removed auth "
1179                             "ID '{auth_id}' for volume ID '{volume}'".format(
1180 auth_id=auth_id, volume=volume_path.volume_id
1181 ))
1182 # Clean up the auth meta file of an auth ID
1183 self.fs.unlink(self._auth_metadata_path(auth_id))
1184 return
1185
1186 if volume_path_str not in auth_meta['volumes']:
1187             log.warning("deauthorize called for already-removed auth "
1188                         "ID '{auth_id}' for volume ID '{volume}'".format(
1189 auth_id=auth_id, volume=volume_path.volume_id
1190 ))
1191 return
1192
1193 if auth_meta['dirty']:
1194 self._recover_auth_meta(auth_id, auth_meta)
1195
1196 auth_meta['dirty'] = True
1197 auth_meta['volumes'][volume_path_str]['dirty'] = True
1198 self._auth_metadata_set(auth_id, auth_meta)
1199
1200 self._deauthorize_volume(volume_path, auth_id)
1201
1202 # Filter out the volume we're deauthorizing
1203 del auth_meta['volumes'][volume_path_str]
1204
1205 # Clean up auth meta file
1206 if not auth_meta['volumes']:
1207 self.fs.unlink(self._auth_metadata_path(auth_id))
1208 return
1209
1210 auth_meta['dirty'] = False
1211 self._auth_metadata_set(auth_id, auth_meta)
1212
1213 def _deauthorize_volume(self, volume_path, auth_id):
1214 with self._volume_lock(volume_path):
1215 vol_meta = self._volume_metadata_get(volume_path)
1216
1217 if (vol_meta is None) or (auth_id not in vol_meta['auths']):
1218                 log.warning("deauthorize called for already-removed auth "
1219                             "ID '{auth_id}' for volume ID '{volume}'".format(
1220 auth_id=auth_id, volume=volume_path.volume_id
1221 ))
1222 return
1223
1224 vol_meta['auths'][auth_id]['dirty'] = True
1225 self._volume_metadata_set(volume_path, vol_meta)
1226
1227 self._deauthorize(volume_path, auth_id)
1228
1229 # Remove the auth_id from the metadata *after* removing it
1230 # from ceph, so that if we crashed here, we would actually
1231 # recreate the auth ID during recovery (i.e. end up with
1232 # a consistent state).
1233
1234 # Filter out the auth we're removing
1235 del vol_meta['auths'][auth_id]
1236 self._volume_metadata_set(volume_path, vol_meta)
1237
1238 def _deauthorize(self, volume_path, auth_id):
1239 """
1240 The volume must still exist.
1241 """
1242 client_entity = "client.{0}".format(auth_id)
1243 path = self._get_path(volume_path)
1244 pool_name = self._get_ancestor_xattr(path, "ceph.dir.layout.pool")
1245 try:
1246 namespace = self.fs.getxattr(path, "ceph.dir.layout.pool_"
1247 "namespace").decode()
1248 except cephfs.NoData:
1249 namespace = None
1250
1251 # The auth_id might have read-only or read-write mount access for the
1252 # volume path.
1253 access_levels = ('r', 'rw')
1254 want_mds_caps = ['allow {0} path={1}'.format(access_level, path)
1255 for access_level in access_levels]
1256 if namespace:
1257 want_osd_caps = ['allow {0} pool={1} namespace={2}'.format(access_level, pool_name, namespace)
1258 for access_level in access_levels]
1259 else:
1260 want_osd_caps = ['allow {0} pool={1}'.format(access_level, pool_name)
1261 for access_level in access_levels]
1262
1263
1264 try:
1265 existing = self._rados_command(
1266 'auth get',
1267 {
1268 'entity': client_entity
1269 }
1270 )
1271
1272 def cap_remove(orig_mds_caps, orig_osd_caps, want_mds_caps, want_osd_caps):
1273 mds_cap_tokens = orig_mds_caps.split(",")
1274 osd_cap_tokens = orig_osd_caps.split(",")
1275
1276 for want_mds_cap, want_osd_cap in zip(want_mds_caps, want_osd_caps):
1277 if want_mds_cap in mds_cap_tokens:
1278 mds_cap_tokens.remove(want_mds_cap)
1279 osd_cap_tokens.remove(want_osd_cap)
1280 break
1281
1282 return ",".join(mds_cap_tokens), ",".join(osd_cap_tokens)
1283
1284 cap = existing[0]
1285 orig_mds_caps = cap['caps'].get('mds', "")
1286 orig_osd_caps = cap['caps'].get('osd', "")
1287 mds_cap_str, osd_cap_str = cap_remove(orig_mds_caps, orig_osd_caps,
1288 want_mds_caps, want_osd_caps)
1289
1290 if not mds_cap_str:
1291 self._rados_command('auth del', {'entity': client_entity}, decode=False)
1292 else:
1293 self._rados_command(
1294 'auth caps',
1295 {
1296 'entity': client_entity,
1297 'caps': [
1298 'mds', mds_cap_str,
1299 'osd', osd_cap_str,
1300 'mon', cap['caps'].get('mon', 'allow r')]
1301 })
1302
1303 # FIXME: rados raising Error instead of ObjectNotFound in auth get failure
1304 except rados.Error:
1305 # Already gone, great.
1306 return
1307
1308 def get_authorized_ids(self, volume_path):
1309 """
1310 Expose a list of auth IDs that have access to a volume.
1311
1312 return: a list of (auth_id, access_level) tuples, where
1313             the access_level can be 'r' or 'rw'.
1314 None if no auth ID is given access to the volume.
1315 """
1316 with self._volume_lock(volume_path):
1317 meta = self._volume_metadata_get(volume_path)
1318 auths = []
1319 if not meta or not meta['auths']:
1320 return None
1321
1322 for auth, auth_data in meta['auths'].items():
1323 # Skip partial auth updates.
1324 if not auth_data['dirty']:
1325 auths.append((auth, auth_data['access_level']))
1326
1327 return auths
1328
1329 def _rados_command(self, prefix, args=None, decode=True):
1330 """
1331 Safer wrapper for ceph_argparse.json_command, which raises
1332 Error exception instead of relying on caller to check return
1333 codes.
1334
1335 Error exception can result from:
1336 * Timeout
1337 * Actual legitimate errors
1338 * Malformed JSON output
1339
1340 return: Decoded object from ceph, or None if empty string returned.
1341 If decode is False, return a string (the data returned by
1342 ceph command)
1343 """
1344 if args is None:
1345 args = {}
1346
1347 argdict = args.copy()
1348 argdict['format'] = 'json'
1349
1350 ret, outbuf, outs = json_command(self.rados,
1351 prefix=prefix,
1352 argdict=argdict,
1353 timeout=RADOS_TIMEOUT)
1354 if ret != 0:
1355 raise rados.Error(outs)
1356 else:
1357 if decode:
1358 if outbuf:
1359 try:
1360 return json.loads(outbuf.decode())
1361 except (ValueError, TypeError):
1362 raise RadosError("Invalid JSON output for command {0}".format(argdict))
1363 else:
1364 return None
1365 else:
1366 return outbuf
1367
1368 def get_used_bytes(self, volume_path):
1369 return int(self.fs.getxattr(self._get_path(volume_path), "ceph.dir."
1370 "rbytes").decode())
1371
1372 def set_max_bytes(self, volume_path, max_bytes):
1373 self.fs.setxattr(self._get_path(volume_path), 'ceph.quota.max_bytes',
1374 to_bytes(max_bytes if max_bytes else 0), 0)
1375
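    # For example, vc.set_max_bytes(vp, 10 * 2**30) applies a 10 GiB quota to
    # the volume directory, and vc.get_used_bytes(vp) reports recursive usage
    # via the ceph.dir.rbytes vxattr.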
1376 def _snapshot_path(self, dir_path, snapshot_name):
1377 return os.path.join(
1378 dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
1379 )
1380
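    # e.g. with the default client_snapdir of ".snap", snapshot "snap1" of
    # /volumes/grp1/vol1 lives at /volumes/grp1/vol1/.snap/snap1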
1381 def _snapshot_create(self, dir_path, snapshot_name, mode=0o755):
1382 # TODO: raise intelligible exception for clusters where snaps are disabled
1383 self.fs.mkdir(self._snapshot_path(dir_path, snapshot_name), mode)
1384
1385 def _snapshot_destroy(self, dir_path, snapshot_name):
1386 """
1387 Remove a snapshot, or do nothing if it already doesn't exist.
1388 """
1389 try:
1390 self.fs.rmdir(self._snapshot_path(dir_path, snapshot_name))
1391 except cephfs.ObjectNotFound:
1392 log.warn("Snapshot was already gone: {0}".format(snapshot_name))
1393
1394 def create_snapshot_volume(self, volume_path, snapshot_name, mode=0o755):
1395 self._snapshot_create(self._get_path(volume_path), snapshot_name, mode)
1396
1397 def destroy_snapshot_volume(self, volume_path, snapshot_name):
1398 self._snapshot_destroy(self._get_path(volume_path), snapshot_name)
1399
1400 def create_snapshot_group(self, group_id, snapshot_name, mode=0o755):
1401 if group_id is None:
1402 raise RuntimeError("Group ID may not be None")
1403
1404 return self._snapshot_create(self._get_group_path(group_id), snapshot_name,
1405 mode)
1406
1407 def destroy_snapshot_group(self, group_id, snapshot_name):
1408 if group_id is None:
1409 raise RuntimeError("Group ID may not be None")
1410 if snapshot_name is None:
1411 raise RuntimeError("Snapshot name may not be None")
1412
1413 return self._snapshot_destroy(self._get_group_path(group_id), snapshot_name)
1414
1415 def _cp_r(self, src, dst):
1416 # TODO
1417 raise NotImplementedError()
1418
1419 def clone_volume_to_existing(self, dest_volume_path, src_volume_path, src_snapshot_name):
1420 dest_fs_path = self._get_path(dest_volume_path)
1421 src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
1422
1423 self._cp_r(src_snapshot_path, dest_fs_path)
1424
1425 def put_object(self, pool_name, object_name, data):
1426 """
1427 Synchronously write data to an object.
1428
1429 :param pool_name: name of the pool
1430 :type pool_name: str
1431 :param object_name: name of the object
1432 :type object_name: str
1433 :param data: data to write
1434 :type data: bytes
1435 """
1436 return self.put_object_versioned(pool_name, object_name, data)
1437
1438 def put_object_versioned(self, pool_name, object_name, data, version=None):
1439 """
1440         Synchronously write data to an object, but only if the object's
1441         current version matches the expected version.
1442
1443 :param pool_name: name of the pool
1444 :type pool_name: str
1445 :param object_name: name of the object
1446 :type object_name: str
1447 :param data: data to write
1448 :type data: bytes
1449 :param version: expected version of the object to write
1450 :type version: int
1451 """
1452 ioctx = self.rados.open_ioctx(pool_name)
1453
1454 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1455 if len(data) > max_size:
1456 msg = ("Data to be written to object '{0}' exceeds "
1457 "{1} bytes".format(object_name, max_size))
1458 log.error(msg)
1459 raise CephFSVolumeClientError(msg)
1460
1461 try:
1462 with rados.WriteOpCtx() as wop:
1463 if version is not None:
1464 wop.assert_version(version)
1465 wop.write_full(data)
1466 ioctx.operate_write_op(wop, object_name)
1467 except rados.OSError as e:
1468 log.error(e)
1469 raise e
1470 finally:
1471 ioctx.close()
1472
1473 def get_object(self, pool_name, object_name):
1474 """
1475 Synchronously read data from object.
1476
1477 :param pool_name: name of the pool
1478 :type pool_name: str
1479 :param object_name: name of the object
1480 :type object_name: str
1481
1482 :returns: bytes - data read from object
1483 """
1484 return self.get_object_and_version(pool_name, object_name)[0]
1485
1486 def get_object_and_version(self, pool_name, object_name):
1487 """
1488 Synchronously read data from object and get its version.
1489
1490 :param pool_name: name of the pool
1491 :type pool_name: str
1492 :param object_name: name of the object
1493 :type object_name: str
1494
1495 :returns: tuple of object data and version
1496 """
1497 ioctx = self.rados.open_ioctx(pool_name)
1498 max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
1499 try:
1500 bytes_read = ioctx.read(object_name, max_size)
1501 if ((len(bytes_read) == max_size) and
1502 (ioctx.read(object_name, 1, offset=max_size))):
1503 log.warning("Size of object {0} exceeds '{1}' bytes "
1504 "read".format(object_name, max_size))
1505 obj_version = ioctx.get_last_version()
1506 finally:
1507 ioctx.close()
1508 return (bytes_read, obj_version)
1509
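    # A sketch of an optimistic read-modify-write loop built on the two calls
    # above ("update" is a hypothetical caller-supplied function):
    #
    #     while True:
    #         data, version = vc.get_object_and_version("pool", "obj")
    #         try:
    #             vc.put_object_versioned("pool", "obj", update(data), version)
    #             break
    #         except rados.OSError:
    #             continue  # object changed underneath us; re-read and retry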
1510 def delete_object(self, pool_name, object_name):
1511 ioctx = self.rados.open_ioctx(pool_name)
1512 try:
1513 ioctx.remove_object(object_name)
1514 except rados.ObjectNotFound:
1515 log.warn("Object '{0}' was already removed".format(object_name))
1516 finally:
1517 ioctx.close()