RADOS_TIMEOUT = 10
-SNAP_DIR = ".snap"
log = logging.getLogger(__name__)
CephFSVolumeClient Version History:
* 1 - Initial version
+ * 2 - Added get_object, put_object, delete_object methods to CephFSVolumeClient
"""
"""
# Current version
- version = 1
- # Earliest compatible version
- compat_version = 1
+ version = 2
# Where shall we create our volumes?
POOL_PREFIX = "fsvolume_"
# We can't query the actual cluster config remotely, but since this is
# just a heuristic we'll assume that the ceph.conf we have locally reflects
# that in use in the rest of the cluster.
- pg_warn_max_per_osd = int(self.rados.conf_get('mon_pg_warn_max_per_osd'))
+ pg_warn_max_per_osd = int(self.rados.conf_get('mon_max_pg_per_osd'))
other_pgs = 0
for pool in osd_map['pools']:
that encoded the metadata.
"""
data['compat_version'] = 1
- data['version'] = 1
+ data['version'] = self.version
return self._metadata_set(self._auth_metadata_path(auth_id), data)
def _volume_metadata_path(self, volume_path):
that encoded the metadata.
"""
data['compat_version'] = 1
- data['version'] = 1
+ data['version'] = self.version
return self._metadata_set(self._volume_metadata_path(volume_path), data)
def authorize(self, volume_path, auth_id, readonly=False, tenant_id=None):
# occurrence of wanted auth caps and no occurrence of
# conflicting auth caps.
+ if not orig:
+ return want
+
cap_tokens = set(orig.split(","))
cap_tokens.discard(unwanted)
def _snapshot_path(self, dir_path, snapshot_name):
return os.path.join(
- dir_path, SNAP_DIR, snapshot_name
+ dir_path, self.rados.conf_get('client_snapdir'), snapshot_name
)
def _snapshot_create(self, dir_path, snapshot_name):
src_snapshot_path = self._snapshot_path(self._get_path(src_volume_path), src_snapshot_name)
self._cp_r(src_snapshot_path, dest_fs_path)
+
+ def put_object(self, pool_name, object_name, data):
+ """
+ Synchronously write data to an object.
+
+ :param pool_name: name of the pool
+ :type pool_name: str
+ :param object_name: name of the object
+ :type object_name: str
+ :param data: data to write
+ :type data: bytes
+ """
+ ioctx = self.rados.open_ioctx(pool_name)
+ max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
+ if len(data) > max_size:
+ msg = ("Data to be written to object '{0}' exceeds "
+ "{1} bytes".format(object_name, max_size))
+ log.error(msg)
+ raise CephFSVolumeClientError(msg)
+ try:
+ ioctx.write_full(object_name, data)
+ finally:
+ ioctx.close()
+
+ def get_object(self, pool_name, object_name):
+ """
+ Synchronously read data from object.
+
+ :param pool_name: name of the pool
+ :type pool_name: str
+ :param object_name: name of the object
+ :type object_name: str
+
+ :returns: bytes - data read from object
+ """
+ ioctx = self.rados.open_ioctx(pool_name)
+ max_size = int(self.rados.conf_get('osd_max_write_size')) * 1024 * 1024
+ try:
+ bytes_read = ioctx.read(object_name, max_size)
+ if ((len(bytes_read) == max_size) and
+ (ioctx.read(object_name, 1, offset=max_size))):
+ log.warning("Size of object {0} exceeds '{1}' bytes "
+ "read".format(object_name, max_size))
+ finally:
+ ioctx.close()
+ return bytes_read
+
+ def delete_object(self, pool_name, object_name):
+ ioctx = self.rados.open_ioctx(pool_name)
+ try:
+ ioctx.remove_object(object_name)
+ except rados.ObjectNotFound:
+ log.warn("Object '{0}' was already removed".format(object_name))
+ finally:
+ ioctx.close()