6 from typing
import TYPE_CHECKING
10 from mgr_util
import CephfsClient
12 from .fs_util
import listdir
, has_subdir
14 from .operations
.group
import open_group
, create_group
, remove_group
, \
15 open_group_unique
, set_group_attrs
16 from .operations
.volume
import create_volume
, delete_volume
, rename_volume
, \
17 list_volumes
, open_volume
, get_pool_names
, get_pool_ids
, get_pending_subvol_deletions_count
18 from .operations
.subvolume
import open_subvol
, create_subvol
, remove_subvol
, \
20 from .operations
.trash
import Trash
22 from .vol_spec
import VolSpec
23 from .exception
import VolumeException
, ClusterError
, ClusterTimeout
, EvictionError
24 from .async_cloner
import Cloner
25 from .purge_queue
import ThreadPoolPurgeQueueMixin
26 from .operations
.template
import SubvolumeOpType
29 from volumes
import Module
# module-level logger, named after the importing module
log = logging.getLogger(__name__)

# NOTE(review): not referenced in the visible code -- presumably checked
# against the 'access_level' argument elsewhere; confirm.
ALLOWED_ACCESS_LEVELS = ('r', 'rw')
36 def octal_str_to_decimal_int(mode
):
40 raise VolumeException(-errno
.EINVAL
, "Invalid mode '{0}'".format(mode
))
def name_to_json(names):
    """
    convert the list of names to json

    :param names: iterable of utf-8 encoded byte strings
    :return: json text (indent=4, sorted keys) holding one
             {'name': <decoded name>} object per entry, in input order
    """
    # iterate the entries directly instead of indexing through
    # range(len(...)) -- this also accepts any iterable, not only sequences
    namedict = [{'name': name.decode('utf-8')} for name in names]
    return json.dumps(namedict, indent=4, sort_keys=True)
53 class VolumeClient(CephfsClient
["Module"]):
54 def __init__(self
, mgr
):
56 # volume specification
57 self
.volspec
= VolSpec(mgr
.rados
.conf_get('client_snapdir'))
58 self
.cloner
= Cloner(self
, self
.mgr
.max_concurrent_clones
, self
.mgr
.snapshot_clone_delay
)
59 self
.purge_queue
= ThreadPoolPurgeQueueMixin(self
, 4)
60 # on startup, queue purge job for available volumes to kickstart
61 # purge for leftover subvolume entries in trash. note that, if the
62 # trash directory does not exist or if there are no purge entries
63 # available for a volume, the volume is removed from the purge
65 fs_map
= self
.mgr
.get('fs_map')
66 for fs
in fs_map
['filesystems']:
67 self
.cloner
.queue_job(fs
['mdsmap']['fs_name'])
68 self
.purge_queue
.queue_job(fs
['mdsmap']['fs_name'])
71 # Overrides CephfsClient.shutdown()
72 log
.info("shutting down")
74 self
.cloner
.shutdown()
76 self
.purge_queue
.shutdown()
77 # last, delete all libcephfs handles from connection pool
78 self
.connection_pool
.del_all_connections()
80 def cluster_log(self
, msg
, lvl
=None):
82 log to cluster log with default log level as WARN.
85 lvl
= self
.mgr
.ClusterLogPrio
.WARN
86 self
.mgr
.cluster_log("cluster", lvl
, msg
)
88 def volume_exception_to_retval(self
, ve
):
90 return a tuple representation from a volume exception
94 ### volume operations -- create, rm, ls
def create_fs_volume(self, volname, placement):
    """
    create a volume by delegating to create_volume() with the mgr handle,
    the volume name and the placement; its result is returned unchanged.
    """
    mgr = self.mgr
    result = create_volume(mgr, volname, placement)
    return result
99 def delete_fs_volume(self
, volname
, confirm
):
100 if confirm
!= "--yes-i-really-mean-it":
101 return -errno
.EPERM
, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
102 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
103 "that is what you want, re-issue the command followed by " \
104 "--yes-i-really-mean-it.".format(volname
)
106 ret
, out
, err
= self
.mgr
.check_mon_command({
107 'prefix': 'config get',
108 'key': 'mon_allow_pool_delete',
112 mon_allow_pool_delete
= json
.loads(out
)
113 if not mon_allow_pool_delete
:
114 return -errno
.EPERM
, "", "pool deletion is disabled; you must first " \
115 "set the mon_allow_pool_delete config option to true before volumes " \
118 metadata_pool
, data_pools
= get_pool_names(self
.mgr
, volname
)
119 if not metadata_pool
:
120 return -errno
.ENOENT
, "", "volume {0} doesn't exist".format(volname
)
121 self
.purge_queue
.cancel_jobs(volname
)
122 self
.connection_pool
.del_connections(volname
, wait
=True)
123 return delete_volume(self
.mgr
, volname
, metadata_pool
, data_pools
)
def list_fs_volumes(self):
    """
    return a (0, <json>, "") tuple where <json> is the result of
    list_volumes() serialized with indent=4 and sorted keys.
    """
    listing = json.dumps(list_volumes(self.mgr), indent=4, sort_keys=True)
    return 0, listing, ""
129 def rename_fs_volume(self
, volname
, newvolname
, sure
):
133 "WARNING: This will rename the filesystem and possibly its "
134 "pools. It is a potentially disruptive operation, clients' "
135 "cephx credentials need reauthorized to access the file system "
136 "and its pools with the new name. Add --yes-i-really-mean-it "
137 "if you are sure you wish to continue.")
139 return rename_volume(self
.mgr
, volname
, newvolname
)
141 def volume_info(self
, **kwargs
):
143 volname
= kwargs
['vol_name']
144 human_readable
= kwargs
['human_readable']
147 with
open_volume(self
, volname
) as fs_handle
:
148 path
= self
.volspec
.base_dir
151 st
= fs_handle
.statx(path
.encode('utf-8'), cephfs
.CEPH_STATX_SIZE
,
152 cephfs
.AT_SYMLINK_NOFOLLOW
)
154 usedbytes
= st
['size']
155 vol_info_dict
= get_pending_subvol_deletions_count(path
)
157 vol_info_dict
['used_size'] = mgr_util
.format_bytes(int(usedbytes
), 5)
159 vol_info_dict
['used_size'] = int(usedbytes
)
160 except cephfs
.Error
as e
:
161 if e
.args
[0] == errno
.ENOENT
:
163 df
= self
.mgr
.get("df")
164 pool_stats
= dict([(p
['id'], p
['stats']) for p
in df
['pools']])
165 osdmap
= self
.mgr
.get("osd_map")
166 pools
= dict([(p
['pool'], p
) for p
in osdmap
['pools']])
167 metadata_pool_id
, data_pool_ids
= get_pool_ids(self
.mgr
, volname
)
168 vol_info_dict
["pools"] = {"metadata": [], "data": []}
169 for pool_id
in [metadata_pool_id
] + data_pool_ids
:
170 if pool_id
== metadata_pool_id
:
171 pool_type
= "metadata"
175 vol_info_dict
["pools"][pool_type
].append({
176 'name': pools
[pool_id
]['pool_name'],
177 'used': mgr_util
.format_bytes(pool_stats
[pool_id
]['bytes_used'], 5),
178 'avail': mgr_util
.format_bytes(pool_stats
[pool_id
]['max_avail'], 5)})
180 vol_info_dict
["pools"][pool_type
].append({
181 'name': pools
[pool_id
]['pool_name'],
182 'used': pool_stats
[pool_id
]['bytes_used'],
183 'avail': pool_stats
[pool_id
]['max_avail']})
186 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
187 for mon
in mon_map_mons
:
188 ip_port
= mon
['addr'].split("/")[0]
189 mon_addr_lst
.append(ip_port
)
190 vol_info_dict
["mon_addrs"] = mon_addr_lst
191 ret
= 0, json
.dumps(vol_info_dict
, indent
=4, sort_keys
=True), ""
192 except VolumeException
as ve
:
193 ret
= self
.volume_exception_to_retval(ve
)
196 ### subvolume operations
198 def _create_subvolume(self
, fs_handle
, volname
, group
, subvolname
, **kwargs
):
199 size
= kwargs
['size']
200 pool
= kwargs
['pool_layout']
203 mode
= kwargs
['mode']
204 isolate_nspace
= kwargs
['namespace_isolated']
206 oct_mode
= octal_str_to_decimal_int(mode
)
209 self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, size
, isolate_nspace
, pool
, oct_mode
, uid
, gid
)
210 except VolumeException
as ve
:
211 # kick the purge threads for async removal -- note that this
212 # assumes that the subvolume is moved to trashcan for cleanup on error.
213 self
.purge_queue
.queue_job(volname
)
216 def create_subvolume(self
, **kwargs
):
218 volname
= kwargs
['vol_name']
219 subvolname
= kwargs
['sub_name']
220 groupname
= kwargs
['group_name']
221 size
= kwargs
['size']
222 pool
= kwargs
['pool_layout']
225 mode
= kwargs
['mode']
226 isolate_nspace
= kwargs
['namespace_isolated']
229 with
open_volume(self
, volname
) as fs_handle
:
230 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
232 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.CREATE
) as subvolume
:
233 # idempotent creation -- valid. Attributes set is supported.
235 'uid': uid
if uid
else subvolume
.uid
,
236 'gid': gid
if gid
else subvolume
.gid
,
237 'mode': octal_str_to_decimal_int(mode
),
239 'pool_namespace': subvolume
.namespace
if isolate_nspace
else None,
242 subvolume
.set_attrs(subvolume
.path
, attrs
)
243 except VolumeException
as ve
:
244 if ve
.errno
== -errno
.ENOENT
:
245 self
._create
_subvolume
(fs_handle
, volname
, group
, subvolname
, **kwargs
)
248 except VolumeException
as ve
:
249 # volume/group does not exist or subvolume creation failed
250 ret
= self
.volume_exception_to_retval(ve
)
253 def remove_subvolume(self
, **kwargs
):
255 volname
= kwargs
['vol_name']
256 subvolname
= kwargs
['sub_name']
257 groupname
= kwargs
['group_name']
258 force
= kwargs
['force']
259 retainsnaps
= kwargs
['retain_snapshots']
262 with
open_volume(self
, volname
) as fs_handle
:
263 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
264 remove_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, force
, retainsnaps
)
265 # kick the purge threads for async removal -- note that this
266 # assumes that the subvolume is moved to trash can.
267 # TODO: make purge queue as singleton so that trash can kicks
268 # the purge threads on dump.
269 self
.purge_queue
.queue_job(volname
)
270 except VolumeException
as ve
:
271 if ve
.errno
== -errno
.EAGAIN
and not force
:
272 ve
= VolumeException(ve
.errno
, ve
.error_str
+ " (use --force to override)")
273 ret
= self
.volume_exception_to_retval(ve
)
274 elif not (ve
.errno
== -errno
.ENOENT
and force
):
275 ret
= self
.volume_exception_to_retval(ve
)
278 def authorize_subvolume(self
, **kwargs
):
280 volname
= kwargs
['vol_name']
281 subvolname
= kwargs
['sub_name']
282 authid
= kwargs
['auth_id']
283 groupname
= kwargs
['group_name']
284 accesslevel
= kwargs
['access_level']
285 tenant_id
= kwargs
['tenant_id']
286 allow_existing_id
= kwargs
['allow_existing_id']
289 with
open_volume(self
, volname
) as fs_handle
:
290 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
291 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.ALLOW_ACCESS
) as subvolume
:
292 key
= subvolume
.authorize(authid
, accesslevel
, tenant_id
, allow_existing_id
)
294 except VolumeException
as ve
:
295 ret
= self
.volume_exception_to_retval(ve
)
298 def deauthorize_subvolume(self
, **kwargs
):
300 volname
= kwargs
['vol_name']
301 subvolname
= kwargs
['sub_name']
302 authid
= kwargs
['auth_id']
303 groupname
= kwargs
['group_name']
306 with
open_volume(self
, volname
) as fs_handle
:
307 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
308 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.DENY_ACCESS
) as subvolume
:
309 subvolume
.deauthorize(authid
)
310 except VolumeException
as ve
:
311 ret
= self
.volume_exception_to_retval(ve
)
314 def authorized_list(self
, **kwargs
):
316 volname
= kwargs
['vol_name']
317 subvolname
= kwargs
['sub_name']
318 groupname
= kwargs
['group_name']
321 with
open_volume(self
, volname
) as fs_handle
:
322 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
323 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.AUTH_LIST
) as subvolume
:
324 auths
= subvolume
.authorized_list()
325 ret
= 0, json
.dumps(auths
, indent
=4, sort_keys
=True), ""
326 except VolumeException
as ve
:
327 ret
= self
.volume_exception_to_retval(ve
)
330 def evict(self
, **kwargs
):
332 volname
= kwargs
['vol_name']
333 subvolname
= kwargs
['sub_name']
334 authid
= kwargs
['auth_id']
335 groupname
= kwargs
['group_name']
338 with
open_volume(self
, volname
) as fs_handle
:
339 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
340 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.EVICT
) as subvolume
:
341 key
= subvolume
.evict(volname
, authid
)
343 except (VolumeException
, ClusterTimeout
, ClusterError
, EvictionError
) as e
:
344 if isinstance(e
, VolumeException
):
345 ret
= self
.volume_exception_to_retval(e
)
346 elif isinstance(e
, ClusterTimeout
):
347 ret
= -errno
.ETIMEDOUT
, "", "Timedout trying to talk to ceph cluster"
348 elif isinstance(e
, ClusterError
):
349 ret
= e
._result
_code
, "", e
._result
_str
350 elif isinstance(e
, EvictionError
):
351 ret
= -errno
.EINVAL
, "", str(e
)
354 def resize_subvolume(self
, **kwargs
):
356 volname
= kwargs
['vol_name']
357 subvolname
= kwargs
['sub_name']
358 newsize
= kwargs
['new_size']
359 noshrink
= kwargs
['no_shrink']
360 groupname
= kwargs
['group_name']
363 with
open_volume(self
, volname
) as fs_handle
:
364 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
365 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.RESIZE
) as subvolume
:
366 nsize
, usedbytes
= subvolume
.resize(newsize
, noshrink
)
368 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
369 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
370 indent
=4, sort_keys
=True), ""
371 except VolumeException
as ve
:
372 ret
= self
.volume_exception_to_retval(ve
)
375 def subvolume_pin(self
, **kwargs
):
377 volname
= kwargs
['vol_name']
378 subvolname
= kwargs
['sub_name']
379 pin_type
= kwargs
['pin_type']
380 pin_setting
= kwargs
['pin_setting']
381 groupname
= kwargs
['group_name']
384 with
open_volume(self
, volname
) as fs_handle
:
385 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
386 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.PIN
) as subvolume
:
387 subvolume
.pin(pin_type
, pin_setting
)
388 ret
= 0, json
.dumps({}), ""
389 except VolumeException
as ve
:
390 ret
= self
.volume_exception_to_retval(ve
)
393 def subvolume_getpath(self
, **kwargs
):
395 volname
= kwargs
['vol_name']
396 subvolname
= kwargs
['sub_name']
397 groupname
= kwargs
['group_name']
400 with
open_volume(self
, volname
) as fs_handle
:
401 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
402 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.GETPATH
) as subvolume
:
403 subvolpath
= subvolume
.path
404 ret
= 0, subvolpath
.decode("utf-8"), ""
405 except VolumeException
as ve
:
406 ret
= self
.volume_exception_to_retval(ve
)
409 def subvolume_info(self
, **kwargs
):
411 volname
= kwargs
['vol_name']
412 subvolname
= kwargs
['sub_name']
413 groupname
= kwargs
['group_name']
416 with
open_volume(self
, volname
) as fs_handle
:
417 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
418 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.INFO
) as subvolume
:
420 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
421 for mon
in mon_map_mons
:
422 ip_port
= mon
['addr'].split("/")[0]
423 mon_addr_lst
.append(ip_port
)
425 subvol_info_dict
= subvolume
.info()
426 subvol_info_dict
["mon_addrs"] = mon_addr_lst
427 ret
= 0, json
.dumps(subvol_info_dict
, indent
=4, sort_keys
=True), ""
428 except VolumeException
as ve
:
429 ret
= self
.volume_exception_to_retval(ve
)
432 def set_user_metadata(self
, **kwargs
):
434 volname
= kwargs
['vol_name']
435 subvolname
= kwargs
['sub_name']
436 groupname
= kwargs
['group_name']
437 keyname
= kwargs
['key_name']
438 value
= kwargs
['value']
441 with
open_volume(self
, volname
) as fs_handle
:
442 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
443 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.USER_METADATA_SET
) as subvolume
:
444 subvolume
.set_user_metadata(keyname
, value
)
445 except VolumeException
as ve
:
446 ret
= self
.volume_exception_to_retval(ve
)
449 def get_user_metadata(self
, **kwargs
):
451 volname
= kwargs
['vol_name']
452 subvolname
= kwargs
['sub_name']
453 groupname
= kwargs
['group_name']
454 keyname
= kwargs
['key_name']
457 with
open_volume(self
, volname
) as fs_handle
:
458 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
459 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.USER_METADATA_GET
) as subvolume
:
460 value
= subvolume
.get_user_metadata(keyname
)
462 except VolumeException
as ve
:
463 ret
= self
.volume_exception_to_retval(ve
)
466 def list_user_metadata(self
, **kwargs
):
468 volname
= kwargs
['vol_name']
469 subvolname
= kwargs
['sub_name']
470 groupname
= kwargs
['group_name']
473 with
open_volume(self
, volname
) as fs_handle
:
474 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
475 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.USER_METADATA_LIST
) as subvolume
:
476 subvol_metadata_dict
= subvolume
.list_user_metadata()
477 ret
= 0, json
.dumps(subvol_metadata_dict
, indent
=4, sort_keys
=True), ""
478 except VolumeException
as ve
:
479 ret
= self
.volume_exception_to_retval(ve
)
482 def remove_user_metadata(self
, **kwargs
):
484 volname
= kwargs
['vol_name']
485 subvolname
= kwargs
['sub_name']
486 groupname
= kwargs
['group_name']
487 keyname
= kwargs
['key_name']
488 force
= kwargs
['force']
491 with
open_volume(self
, volname
) as fs_handle
:
492 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
493 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.USER_METADATA_REMOVE
) as subvolume
:
494 subvolume
.remove_user_metadata(keyname
)
495 except VolumeException
as ve
:
496 if not (ve
.errno
== -errno
.ENOENT
and force
):
497 ret
= self
.volume_exception_to_retval(ve
)
500 def list_subvolumes(self
, **kwargs
):
502 volname
= kwargs
['vol_name']
503 groupname
= kwargs
['group_name']
506 with
open_volume(self
, volname
) as fs_handle
:
507 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
508 subvolumes
= group
.list_subvolumes()
509 ret
= 0, name_to_json(subvolumes
), ""
510 except VolumeException
as ve
:
511 ret
= self
.volume_exception_to_retval(ve
)
514 def subvolume_exists(self
, **kwargs
):
515 volname
= kwargs
['vol_name']
516 groupname
= kwargs
['group_name']
518 volume_exists
= False
521 with
open_volume(self
, volname
) as fs_handle
:
523 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
524 res
= group
.has_subvolumes()
526 ret
= 0, "subvolume exists", ""
528 ret
= 0, "no subvolume exists", ""
529 except VolumeException
as ve
:
530 if volume_exists
and ve
.errno
== -errno
.ENOENT
:
531 ret
= 0, "no subvolume exists", ""
533 ret
= self
.volume_exception_to_retval(ve
)
536 ### subvolume snapshot
538 def create_subvolume_snapshot(self
, **kwargs
):
540 volname
= kwargs
['vol_name']
541 subvolname
= kwargs
['sub_name']
542 snapname
= kwargs
['snap_name']
543 groupname
= kwargs
['group_name']
546 with
open_volume(self
, volname
) as fs_handle
:
547 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
548 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_CREATE
) as subvolume
:
549 subvolume
.create_snapshot(snapname
)
550 except VolumeException
as ve
:
551 ret
= self
.volume_exception_to_retval(ve
)
554 def remove_subvolume_snapshot(self
, **kwargs
):
556 volname
= kwargs
['vol_name']
557 subvolname
= kwargs
['sub_name']
558 snapname
= kwargs
['snap_name']
559 groupname
= kwargs
['group_name']
560 force
= kwargs
['force']
563 with
open_volume(self
, volname
) as fs_handle
:
564 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
565 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_REMOVE
) as subvolume
:
566 subvolume
.remove_snapshot(snapname
, force
)
567 except VolumeException
as ve
:
568 # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
569 # we should tickle the purge jobs to purge the same
570 if ve
.errno
== -errno
.ESTALE
:
571 self
.purge_queue
.queue_job(volname
)
572 elif not (ve
.errno
== -errno
.ENOENT
and force
):
573 ret
= self
.volume_exception_to_retval(ve
)
576 def subvolume_snapshot_info(self
, **kwargs
):
578 volname
= kwargs
['vol_name']
579 subvolname
= kwargs
['sub_name']
580 snapname
= kwargs
['snap_name']
581 groupname
= kwargs
['group_name']
584 with
open_volume(self
, volname
) as fs_handle
:
585 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
586 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_INFO
) as subvolume
:
587 snap_info_dict
= subvolume
.snapshot_info(snapname
)
588 ret
= 0, json
.dumps(snap_info_dict
, indent
=4, sort_keys
=True), ""
589 except VolumeException
as ve
:
590 ret
= self
.volume_exception_to_retval(ve
)
593 def set_subvolume_snapshot_metadata(self
, **kwargs
):
595 volname
= kwargs
['vol_name']
596 subvolname
= kwargs
['sub_name']
597 snapname
= kwargs
['snap_name']
598 groupname
= kwargs
['group_name']
599 keyname
= kwargs
['key_name']
600 value
= kwargs
['value']
603 with
open_volume(self
, volname
) as fs_handle
:
604 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
605 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_METADATA_SET
) as subvolume
:
606 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
607 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
608 subvolume
.set_snapshot_metadata(snapname
, keyname
, value
)
609 except VolumeException
as ve
:
610 ret
= self
.volume_exception_to_retval(ve
)
613 def get_subvolume_snapshot_metadata(self
, **kwargs
):
615 volname
= kwargs
['vol_name']
616 subvolname
= kwargs
['sub_name']
617 snapname
= kwargs
['snap_name']
618 groupname
= kwargs
['group_name']
619 keyname
= kwargs
['key_name']
622 with
open_volume(self
, volname
) as fs_handle
:
623 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
624 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_METADATA_GET
) as subvolume
:
625 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
626 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
627 value
= subvolume
.get_snapshot_metadata(snapname
, keyname
)
629 except VolumeException
as ve
:
630 ret
= self
.volume_exception_to_retval(ve
)
633 def list_subvolume_snapshot_metadata(self
, **kwargs
):
635 volname
= kwargs
['vol_name']
636 subvolname
= kwargs
['sub_name']
637 snapname
= kwargs
['snap_name']
638 groupname
= kwargs
['group_name']
641 with
open_volume(self
, volname
) as fs_handle
:
642 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
643 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_METADATA_LIST
) as subvolume
:
644 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
645 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
646 snap_metadata_dict
= subvolume
.list_snapshot_metadata(snapname
)
647 ret
= 0, json
.dumps(snap_metadata_dict
, indent
=4, sort_keys
=True), ""
648 except VolumeException
as ve
:
649 ret
= self
.volume_exception_to_retval(ve
)
652 def remove_subvolume_snapshot_metadata(self
, **kwargs
):
654 volname
= kwargs
['vol_name']
655 subvolname
= kwargs
['sub_name']
656 snapname
= kwargs
['snap_name']
657 groupname
= kwargs
['group_name']
658 keyname
= kwargs
['key_name']
659 force
= kwargs
['force']
662 with
open_volume(self
, volname
) as fs_handle
:
663 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
664 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_METADATA_REMOVE
) as subvolume
:
665 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
666 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
667 subvolume
.remove_snapshot_metadata(snapname
, keyname
)
668 except VolumeException
as ve
:
669 if not (ve
.errno
== -errno
.ENOENT
and force
):
670 ret
= self
.volume_exception_to_retval(ve
)
673 def list_subvolume_snapshots(self
, **kwargs
):
675 volname
= kwargs
['vol_name']
676 subvolname
= kwargs
['sub_name']
677 groupname
= kwargs
['group_name']
680 with
open_volume(self
, volname
) as fs_handle
:
681 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
682 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_LIST
) as subvolume
:
683 snapshots
= subvolume
.list_snapshots()
684 ret
= 0, name_to_json(snapshots
), ""
685 except VolumeException
as ve
:
686 ret
= self
.volume_exception_to_retval(ve
)
689 def protect_subvolume_snapshot(self
, **kwargs
):
690 ret
= 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
691 volname
= kwargs
['vol_name']
692 subvolname
= kwargs
['sub_name']
693 groupname
= kwargs
['group_name']
696 with
open_volume(self
, volname
) as fs_handle
:
697 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
698 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_PROTECT
) as subvolume
:
699 log
.warning("snapshot protect call is deprecated and will be removed in a future release")
700 except VolumeException
as ve
:
701 ret
= self
.volume_exception_to_retval(ve
)
704 def unprotect_subvolume_snapshot(self
, **kwargs
):
705 ret
= 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
706 volname
= kwargs
['vol_name']
707 subvolname
= kwargs
['sub_name']
708 groupname
= kwargs
['group_name']
711 with
open_volume(self
, volname
) as fs_handle
:
712 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
713 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_UNPROTECT
) as subvolume
:
714 log
.warning("snapshot unprotect call is deprecated and will be removed in a future release")
715 except VolumeException
as ve
:
716 ret
= self
.volume_exception_to_retval(ve
)
719 def _prepare_clone_subvolume(self
, fs_handle
, volname
, s_subvolume
, s_snapname
, t_group
, t_subvolname
, **kwargs
):
720 t_pool
= kwargs
['pool_layout']
721 s_subvolname
= kwargs
['sub_name']
722 s_groupname
= kwargs
['group_name']
723 t_groupname
= kwargs
['target_group_name']
725 create_clone(self
.mgr
, fs_handle
, self
.volspec
, t_group
, t_subvolname
, t_pool
, volname
, s_subvolume
, s_snapname
)
726 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, t_group
, t_subvolname
, SubvolumeOpType
.CLONE_INTERNAL
) as t_subvolume
:
728 if t_groupname
== s_groupname
and t_subvolname
== s_subvolname
:
729 t_subvolume
.attach_snapshot(s_snapname
, t_subvolume
)
731 s_subvolume
.attach_snapshot(s_snapname
, t_subvolume
)
732 self
.cloner
.queue_job(volname
)
733 except VolumeException
as ve
:
736 self
.purge_queue
.queue_job(volname
)
737 except Exception as e
:
738 log
.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname
, e
))
741 def _clone_subvolume_snapshot(self
, fs_handle
, volname
, s_group
, s_subvolume
, **kwargs
):
742 s_snapname
= kwargs
['snap_name']
743 target_subvolname
= kwargs
['target_sub_name']
744 target_groupname
= kwargs
['target_group_name']
745 s_groupname
= kwargs
['group_name']
747 if not s_snapname
.encode('utf-8') in s_subvolume
.list_snapshots():
748 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(s_snapname
))
750 with
open_group_unique(fs_handle
, self
.volspec
, target_groupname
, s_group
, s_groupname
) as target_group
:
752 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, target_group
, target_subvolname
, SubvolumeOpType
.CLONE_CREATE
):
753 raise VolumeException(-errno
.EEXIST
, "subvolume '{0}' exists".format(target_subvolname
))
754 except VolumeException
as ve
:
755 if ve
.errno
== -errno
.ENOENT
:
756 self
._prepare
_clone
_subvolume
(fs_handle
, volname
, s_subvolume
, s_snapname
,
757 target_group
, target_subvolname
, **kwargs
)
761 def clone_subvolume_snapshot(self
, **kwargs
):
763 volname
= kwargs
['vol_name']
764 s_subvolname
= kwargs
['sub_name']
765 s_groupname
= kwargs
['group_name']
768 with
open_volume(self
, volname
) as fs_handle
:
769 with
open_group(fs_handle
, self
.volspec
, s_groupname
) as s_group
:
770 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, s_group
, s_subvolname
, SubvolumeOpType
.CLONE_SOURCE
) as s_subvolume
:
771 self
._clone
_subvolume
_snapshot
(fs_handle
, volname
, s_group
, s_subvolume
, **kwargs
)
772 except VolumeException
as ve
:
773 ret
= self
.volume_exception_to_retval(ve
)
776 def clone_status(self
, **kwargs
):
778 volname
= kwargs
['vol_name']
779 clonename
= kwargs
['clone_name']
780 groupname
= kwargs
['group_name']
783 with
open_volume(self
, volname
) as fs_handle
:
784 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
785 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, clonename
, SubvolumeOpType
.CLONE_STATUS
) as subvolume
:
786 ret
= 0, json
.dumps({'status' : subvolume
.status
}, indent
=2), ""
787 except VolumeException
as ve
:
788 ret
= self
.volume_exception_to_retval(ve
)
791 def clone_cancel(self
, **kwargs
):
793 volname
= kwargs
['vol_name']
794 clonename
= kwargs
['clone_name']
795 groupname
= kwargs
['group_name']
798 self
.cloner
.cancel_job(volname
, (clonename
, groupname
))
799 except VolumeException
as ve
:
800 ret
= self
.volume_exception_to_retval(ve
)
805 def create_subvolume_group(self
, **kwargs
):
807 volname
= kwargs
['vol_name']
808 groupname
= kwargs
['group_name']
809 size
= kwargs
['size']
810 pool
= kwargs
['pool_layout']
813 mode
= kwargs
['mode']
816 with
open_volume(self
, volname
) as fs_handle
:
818 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
819 # idempotent creation -- valid.
823 'mode': octal_str_to_decimal_int(mode
),
827 set_group_attrs(fs_handle
, group
.path
, attrs
)
828 except VolumeException
as ve
:
829 if ve
.errno
== -errno
.ENOENT
:
830 oct_mode
= octal_str_to_decimal_int(mode
)
831 create_group(fs_handle
, self
.volspec
, groupname
, size
, pool
, oct_mode
, uid
, gid
)
834 except VolumeException
as ve
:
835 # volume does not exist or subvolume group creation failed
836 ret
= self
.volume_exception_to_retval(ve
)
839 def remove_subvolume_group(self
, **kwargs
):
841 volname
= kwargs
['vol_name']
842 groupname
= kwargs
['group_name']
843 force
= kwargs
['force']
846 with
open_volume(self
, volname
) as fs_handle
:
847 remove_group(fs_handle
, self
.volspec
, groupname
)
848 except VolumeException
as ve
:
849 if not (ve
.errno
== -errno
.ENOENT
and force
):
850 ret
= self
.volume_exception_to_retval(ve
)
853 def subvolumegroup_info(self
, **kwargs
):
855 volname
= kwargs
['vol_name']
856 groupname
= kwargs
['group_name']
859 with
open_volume(self
, volname
) as fs_handle
:
860 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
862 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
863 for mon
in mon_map_mons
:
864 ip_port
= mon
['addr'].split("/")[0]
865 mon_addr_lst
.append(ip_port
)
867 group_info_dict
= group
.info()
868 group_info_dict
["mon_addrs"] = mon_addr_lst
869 ret
= 0, json
.dumps(group_info_dict
, indent
=4, sort_keys
=True), ""
870 except VolumeException
as ve
:
871 ret
= self
.volume_exception_to_retval(ve
)
874 def resize_subvolume_group(self
, **kwargs
):
876 volname
= kwargs
['vol_name']
877 groupname
= kwargs
['group_name']
878 newsize
= kwargs
['new_size']
879 noshrink
= kwargs
['no_shrink']
882 with
open_volume(self
, volname
) as fs_handle
:
883 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
884 nsize
, usedbytes
= group
.resize(newsize
, noshrink
)
886 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
887 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
888 indent
=4, sort_keys
=True), ""
889 except VolumeException
as ve
:
890 ret
= self
.volume_exception_to_retval(ve
)
def getpath_subvolume_group(self, **kwargs):
    """
    return (0, <path>, "") where <path> is the utf-8 decoded path of the
    subvolume group kwargs['group_name'] in volume kwargs['vol_name'].

    a VolumeException raised while opening the volume or the group is
    converted to a return tuple via volume_exception_to_retval().
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            grouppath = group.path.decode('utf-8')
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, grouppath, ""
904 def list_subvolume_groups(self
, **kwargs
):
905 volname
= kwargs
['vol_name']
907 volume_exists
= False
909 with
open_volume(self
, volname
) as fs_handle
:
911 groups
= listdir(fs_handle
, self
.volspec
.base_dir
, filter_entries
=[dir.encode('utf-8') for dir in self
.volspec
.INTERNAL_DIRS
])
912 ret
= 0, name_to_json(groups
), ""
913 except VolumeException
as ve
:
914 if not ve
.errno
== -errno
.ENOENT
or not volume_exists
:
915 ret
= self
.volume_exception_to_retval(ve
)
918 def pin_subvolume_group(self
, **kwargs
):
920 volname
= kwargs
['vol_name']
921 groupname
= kwargs
['group_name']
922 pin_type
= kwargs
['pin_type']
923 pin_setting
= kwargs
['pin_setting']
926 with
open_volume(self
, volname
) as fs_handle
:
927 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
928 group
.pin(pin_type
, pin_setting
)
929 ret
= 0, json
.dumps({}), ""
930 except VolumeException
as ve
:
931 ret
= self
.volume_exception_to_retval(ve
)
934 def subvolume_group_exists(self
, **kwargs
):
935 volname
= kwargs
['vol_name']
937 volume_exists
= False
940 with
open_volume(self
, volname
) as fs_handle
:
942 res
= has_subdir(fs_handle
, self
.volspec
.base_dir
, filter_entries
=[
943 dir.encode('utf-8') for dir in self
.volspec
.INTERNAL_DIRS
])
945 ret
= 0, "subvolumegroup exists", ""
947 ret
= 0, "no subvolumegroup exists", ""
948 except VolumeException
as ve
:
949 if volume_exists
and ve
.errno
== -errno
.ENOENT
:
950 ret
= 0, "no subvolumegroup exists", ""
952 ret
= self
.volume_exception_to_retval(ve
)
957 def create_subvolume_group_snapshot(self
, **kwargs
):
958 ret
= -errno
.ENOSYS
, "", "subvolume group snapshots are not supported"
959 volname
= kwargs
['vol_name']
960 groupname
= kwargs
['group_name']
961 # snapname = kwargs['snap_name']
964 with
open_volume(self
, volname
) as fs_handle
:
965 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
966 # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
967 # at the subvolume group (see: https://tracker.ceph.com/issues/46074)
968 # group.create_snapshot(snapname)
970 except VolumeException
as ve
:
971 ret
= self
.volume_exception_to_retval(ve
)
974 def remove_subvolume_group_snapshot(self
, **kwargs
):
976 volname
= kwargs
['vol_name']
977 groupname
= kwargs
['group_name']
978 snapname
= kwargs
['snap_name']
979 force
= kwargs
['force']
982 with
open_volume(self
, volname
) as fs_handle
:
983 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
984 group
.remove_snapshot(snapname
)
985 except VolumeException
as ve
:
986 if not (ve
.errno
== -errno
.ENOENT
and force
):
987 ret
= self
.volume_exception_to_retval(ve
)
990 def list_subvolume_group_snapshots(self
, **kwargs
):
992 volname
= kwargs
['vol_name']
993 groupname
= kwargs
['group_name']
996 with
open_volume(self
, volname
) as fs_handle
:
997 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
998 snapshots
= group
.list_snapshots()
999 ret
= 0, name_to_json(snapshots
), ""
1000 except VolumeException
as ve
:
1001 ret
= self
.volume_exception_to_retval(ve
)