5 from typing
import TYPE_CHECKING
9 from mgr_util
import CephfsClient
11 from .fs_util
import listdir
, has_subdir
13 from .operations
.group
import open_group
, create_group
, remove_group
, \
14 open_group_unique
, set_group_attrs
15 from .operations
.volume
import create_volume
, delete_volume
, rename_volume
, \
16 list_volumes
, open_volume
, get_pool_names
, get_pool_ids
, get_pending_subvol_deletions_count
17 from .operations
.subvolume
import open_subvol
, create_subvol
, remove_subvol
, \
19 from .operations
.trash
import Trash
21 from .vol_spec
import VolSpec
22 from .exception
import VolumeException
, ClusterError
, ClusterTimeout
, EvictionError
23 from .async_cloner
import Cloner
24 from .purge_queue
import ThreadPoolPurgeQueueMixin
25 from .operations
.template
import SubvolumeOpType
28 from volumes
import Module
30 log
= logging
.getLogger(__name__
)
32 ALLOWED_ACCESS_LEVELS
= ('r', 'rw')
35 def octal_str_to_decimal_int(mode
):
39 raise VolumeException(-errno
.EINVAL
, "Invalid mode '{0}'".format(mode
))
42 def name_to_json(names
):
44 convert the list of names to json
47 for i
in range(len(names
)):
48 namedict
.append({'name': names
[i
].decode('utf-8')})
49 return json
.dumps(namedict
, indent
=4, sort_keys
=True)
52 class VolumeClient(CephfsClient
["Module"]):
53 def __init__(self
, mgr
):
55 # volume specification
56 self
.volspec
= VolSpec(mgr
.rados
.conf_get('client_snapdir'))
57 self
.cloner
= Cloner(self
, self
.mgr
.max_concurrent_clones
, self
.mgr
.snapshot_clone_delay
)
58 self
.purge_queue
= ThreadPoolPurgeQueueMixin(self
, 4)
59 # on startup, queue purge job for available volumes to kickstart
60 # purge for leftover subvolume entries in trash. note that, if the
61 # trash directory does not exist or if there are no purge entries
62 # available for a volume, the volume is removed from the purge
64 fs_map
= self
.mgr
.get('fs_map')
65 for fs
in fs_map
['filesystems']:
66 self
.cloner
.queue_job(fs
['mdsmap']['fs_name'])
67 self
.purge_queue
.queue_job(fs
['mdsmap']['fs_name'])
70 # Overrides CephfsClient.shutdown()
71 log
.info("shutting down")
72 # first, note that we're shutting down
75 self
.cloner
.shutdown()
77 self
.purge_queue
.shutdown()
78 # last, delete all libcephfs handles from connection pool
79 self
.connection_pool
.del_all_connections()
81 def cluster_log(self
, msg
, lvl
=None):
83 log to cluster log with default log level as WARN.
86 lvl
= self
.mgr
.ClusterLogPrio
.WARN
87 self
.mgr
.cluster_log("cluster", lvl
, msg
)
89 def volume_exception_to_retval(self
, ve
):
91 return a tuple representation from a volume exception
95 ### volume operations -- create, rm, ls
97 def create_fs_volume(self
, volname
, placement
):
98 if self
.is_stopping():
99 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
100 return create_volume(self
.mgr
, volname
, placement
)
102 def delete_fs_volume(self
, volname
, confirm
):
103 if self
.is_stopping():
104 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
106 if confirm
!= "--yes-i-really-mean-it":
107 return -errno
.EPERM
, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
108 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
109 "that is what you want, re-issue the command followed by " \
110 "--yes-i-really-mean-it.".format(volname
)
112 ret
, out
, err
= self
.mgr
.check_mon_command({
113 'prefix': 'config get',
114 'key': 'mon_allow_pool_delete',
118 mon_allow_pool_delete
= json
.loads(out
)
119 if not mon_allow_pool_delete
:
120 return -errno
.EPERM
, "", "pool deletion is disabled; you must first " \
121 "set the mon_allow_pool_delete config option to true before volumes " \
124 metadata_pool
, data_pools
= get_pool_names(self
.mgr
, volname
)
125 if not metadata_pool
:
126 return -errno
.ENOENT
, "", "volume {0} doesn't exist".format(volname
)
127 self
.purge_queue
.cancel_jobs(volname
)
128 self
.connection_pool
.del_connections(volname
, wait
=True)
129 return delete_volume(self
.mgr
, volname
, metadata_pool
, data_pools
)
131 def list_fs_volumes(self
):
132 if self
.stopping
.is_set():
133 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
134 volumes
= list_volumes(self
.mgr
)
135 return 0, json
.dumps(volumes
, indent
=4, sort_keys
=True), ""
137 def rename_fs_volume(self
, volname
, newvolname
, sure
):
138 if self
.is_stopping():
139 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
144 "WARNING: This will rename the filesystem and possibly its "
145 "pools. It is a potentially disruptive operation, clients' "
146 "cephx credentials need reauthorized to access the file system "
147 "and its pools with the new name. Add --yes-i-really-mean-it "
148 "if you are sure you wish to continue.")
150 return rename_volume(self
.mgr
, volname
, newvolname
)
152 def volume_info(self
, **kwargs
):
154 volname
= kwargs
['vol_name']
157 with
open_volume(self
, volname
) as fs_handle
:
158 path
= self
.volspec
.base_dir
161 st
= fs_handle
.statx(path
.encode('utf-8'), cephfs
.CEPH_STATX_SIZE
,
162 cephfs
.AT_SYMLINK_NOFOLLOW
)
164 usedbytes
= st
['size']
165 vol_info_dict
= get_pending_subvol_deletions_count(path
)
166 vol_info_dict
['used_size'] = int(usedbytes
)
167 except cephfs
.Error
as e
:
168 if e
.args
[0] == errno
.ENOENT
:
170 df
= self
.mgr
.get("df")
171 pool_stats
= dict([(p
['id'], p
['stats']) for p
in df
['pools']])
172 osdmap
= self
.mgr
.get("osd_map")
173 pools
= dict([(p
['pool'], p
) for p
in osdmap
['pools']])
174 metadata_pool_id
, data_pool_ids
= get_pool_ids(self
.mgr
, volname
)
175 vol_info_dict
["pools"] = {"metadata": [], "data": []}
176 for pool_id
in [metadata_pool_id
] + data_pool_ids
:
177 if pool_id
== metadata_pool_id
:
178 pool_type
= "metadata"
181 vol_info_dict
["pools"][pool_type
].append({
182 'name': pools
[pool_id
]['pool_name'],
183 'used': pool_stats
[pool_id
]['bytes_used'],
184 'avail': pool_stats
[pool_id
]['max_avail']})
187 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
188 for mon
in mon_map_mons
:
189 ip_port
= mon
['addr'].split("/")[0]
190 mon_addr_lst
.append(ip_port
)
191 vol_info_dict
["mon_addrs"] = mon_addr_lst
192 ret
= 0, json
.dumps(vol_info_dict
, indent
=4, sort_keys
=True), ""
193 except VolumeException
as ve
:
194 ret
= self
.volume_exception_to_retval(ve
)
197 ### subvolume operations
199 def _create_subvolume(self
, fs_handle
, volname
, group
, subvolname
, **kwargs
):
200 size
= kwargs
['size']
201 pool
= kwargs
['pool_layout']
204 mode
= kwargs
['mode']
205 isolate_nspace
= kwargs
['namespace_isolated']
207 oct_mode
= octal_str_to_decimal_int(mode
)
210 self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, size
, isolate_nspace
, pool
, oct_mode
, uid
, gid
)
211 except VolumeException
as ve
:
212 # kick the purge threads for async removal -- note that this
213 # assumes that the subvolume is moved to trashcan for cleanup on error.
214 self
.purge_queue
.queue_job(volname
)
217 def create_subvolume(self
, **kwargs
):
219 volname
= kwargs
['vol_name']
220 subvolname
= kwargs
['sub_name']
221 groupname
= kwargs
['group_name']
222 size
= kwargs
['size']
223 pool
= kwargs
['pool_layout']
226 mode
= kwargs
['mode']
227 isolate_nspace
= kwargs
['namespace_isolated']
230 with
open_volume(self
, volname
) as fs_handle
:
231 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
233 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.CREATE
) as subvolume
:
234 # idempotent creation -- valid. Attributes set is supported.
236 'uid': uid
if uid
else subvolume
.uid
,
237 'gid': gid
if gid
else subvolume
.gid
,
238 'mode': octal_str_to_decimal_int(mode
),
240 'pool_namespace': subvolume
.namespace
if isolate_nspace
else None,
243 subvolume
.set_attrs(subvolume
.path
, attrs
)
244 except VolumeException
as ve
:
245 if ve
.errno
== -errno
.ENOENT
:
246 self
._create
_subvolume
(fs_handle
, volname
, group
, subvolname
, **kwargs
)
249 except VolumeException
as ve
:
250 # volume/group does not exist or subvolume creation failed
251 ret
= self
.volume_exception_to_retval(ve
)
254 def remove_subvolume(self
, **kwargs
):
256 volname
= kwargs
['vol_name']
257 subvolname
= kwargs
['sub_name']
258 groupname
= kwargs
['group_name']
259 force
= kwargs
['force']
260 retainsnaps
= kwargs
['retain_snapshots']
263 with
open_volume(self
, volname
) as fs_handle
:
264 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
265 remove_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, force
, retainsnaps
)
266 # kick the purge threads for async removal -- note that this
267 # assumes that the subvolume is moved to trash can.
268 # TODO: make purge queue as singleton so that trash can kicks
269 # the purge threads on dump.
270 self
.purge_queue
.queue_job(volname
)
271 except VolumeException
as ve
:
272 if ve
.errno
== -errno
.EAGAIN
and not force
:
273 ve
= VolumeException(ve
.errno
, ve
.error_str
+ " (use --force to override)")
274 ret
= self
.volume_exception_to_retval(ve
)
275 elif not (ve
.errno
== -errno
.ENOENT
and force
):
276 ret
= self
.volume_exception_to_retval(ve
)
279 def authorize_subvolume(self
, **kwargs
):
281 volname
= kwargs
['vol_name']
282 subvolname
= kwargs
['sub_name']
283 authid
= kwargs
['auth_id']
284 groupname
= kwargs
['group_name']
285 accesslevel
= kwargs
['access_level']
286 tenant_id
= kwargs
['tenant_id']
287 allow_existing_id
= kwargs
['allow_existing_id']
290 with
open_volume(self
, volname
) as fs_handle
:
291 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
292 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.ALLOW_ACCESS
) as subvolume
:
293 key
= subvolume
.authorize(authid
, accesslevel
, tenant_id
, allow_existing_id
)
295 except VolumeException
as ve
:
296 ret
= self
.volume_exception_to_retval(ve
)
299 def deauthorize_subvolume(self
, **kwargs
):
301 volname
= kwargs
['vol_name']
302 subvolname
= kwargs
['sub_name']
303 authid
= kwargs
['auth_id']
304 groupname
= kwargs
['group_name']
307 with
open_volume(self
, volname
) as fs_handle
:
308 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
309 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.DENY_ACCESS
) as subvolume
:
310 subvolume
.deauthorize(authid
)
311 except VolumeException
as ve
:
312 ret
= self
.volume_exception_to_retval(ve
)
315 def authorized_list(self
, **kwargs
):
317 volname
= kwargs
['vol_name']
318 subvolname
= kwargs
['sub_name']
319 groupname
= kwargs
['group_name']
322 with
open_volume(self
, volname
) as fs_handle
:
323 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
324 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.AUTH_LIST
) as subvolume
:
325 auths
= subvolume
.authorized_list()
326 ret
= 0, json
.dumps(auths
, indent
=4, sort_keys
=True), ""
327 except VolumeException
as ve
:
328 ret
= self
.volume_exception_to_retval(ve
)
331 def evict(self
, **kwargs
):
333 volname
= kwargs
['vol_name']
334 subvolname
= kwargs
['sub_name']
335 authid
= kwargs
['auth_id']
336 groupname
= kwargs
['group_name']
339 with
open_volume(self
, volname
) as fs_handle
:
340 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
341 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.EVICT
) as subvolume
:
342 key
= subvolume
.evict(volname
, authid
)
344 except (VolumeException
, ClusterTimeout
, ClusterError
, EvictionError
) as e
:
345 if isinstance(e
, VolumeException
):
346 ret
= self
.volume_exception_to_retval(e
)
347 elif isinstance(e
, ClusterTimeout
):
348 ret
= -errno
.ETIMEDOUT
, "", "Timedout trying to talk to ceph cluster"
349 elif isinstance(e
, ClusterError
):
350 ret
= e
._result
_code
, "", e
._result
_str
351 elif isinstance(e
, EvictionError
):
352 ret
= -errno
.EINVAL
, "", str(e
)
355 def resize_subvolume(self
, **kwargs
):
357 volname
= kwargs
['vol_name']
358 subvolname
= kwargs
['sub_name']
359 newsize
= kwargs
['new_size']
360 noshrink
= kwargs
['no_shrink']
361 groupname
= kwargs
['group_name']
364 with
open_volume(self
, volname
) as fs_handle
:
365 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
366 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.RESIZE
) as subvolume
:
367 nsize
, usedbytes
= subvolume
.resize(newsize
, noshrink
)
369 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
370 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
371 indent
=4, sort_keys
=True), ""
372 except VolumeException
as ve
:
373 ret
= self
.volume_exception_to_retval(ve
)
376 def subvolume_pin(self
, **kwargs
):
378 volname
= kwargs
['vol_name']
379 subvolname
= kwargs
['sub_name']
380 pin_type
= kwargs
['pin_type']
381 pin_setting
= kwargs
['pin_setting']
382 groupname
= kwargs
['group_name']
385 with
open_volume(self
, volname
) as fs_handle
:
386 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
387 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.PIN
) as subvolume
:
388 subvolume
.pin(pin_type
, pin_setting
)
389 ret
= 0, json
.dumps({}), ""
390 except VolumeException
as ve
:
391 ret
= self
.volume_exception_to_retval(ve
)
394 def subvolume_getpath(self
, **kwargs
):
396 volname
= kwargs
['vol_name']
397 subvolname
= kwargs
['sub_name']
398 groupname
= kwargs
['group_name']
401 with
open_volume(self
, volname
) as fs_handle
:
402 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
403 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.GETPATH
) as subvolume
:
404 subvolpath
= subvolume
.path
405 ret
= 0, subvolpath
.decode("utf-8"), ""
406 except VolumeException
as ve
:
407 ret
= self
.volume_exception_to_retval(ve
)
410 def subvolume_info(self
, **kwargs
):
412 volname
= kwargs
['vol_name']
413 subvolname
= kwargs
['sub_name']
414 groupname
= kwargs
['group_name']
417 with
open_volume(self
, volname
) as fs_handle
:
418 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
419 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.INFO
) as subvolume
:
421 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
422 for mon
in mon_map_mons
:
423 ip_port
= mon
['addr'].split("/")[0]
424 mon_addr_lst
.append(ip_port
)
426 subvol_info_dict
= subvolume
.info()
427 subvol_info_dict
["mon_addrs"] = mon_addr_lst
428 ret
= 0, json
.dumps(subvol_info_dict
, indent
=4, sort_keys
=True), ""
429 except VolumeException
as ve
:
430 ret
= self
.volume_exception_to_retval(ve
)
433 def set_user_metadata(self
, **kwargs
):
435 volname
= kwargs
['vol_name']
436 subvolname
= kwargs
['sub_name']
437 groupname
= kwargs
['group_name']
438 keyname
= kwargs
['key_name']
439 value
= kwargs
['value']
442 with
open_volume(self
, volname
) as fs_handle
:
443 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
444 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.USER_METADATA_SET
) as subvolume
:
445 subvolume
.set_user_metadata(keyname
, value
)
446 except VolumeException
as ve
:
447 ret
= self
.volume_exception_to_retval(ve
)
450 def get_user_metadata(self
, **kwargs
):
452 volname
= kwargs
['vol_name']
453 subvolname
= kwargs
['sub_name']
454 groupname
= kwargs
['group_name']
455 keyname
= kwargs
['key_name']
458 with
open_volume(self
, volname
) as fs_handle
:
459 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
460 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.USER_METADATA_GET
) as subvolume
:
461 value
= subvolume
.get_user_metadata(keyname
)
463 except VolumeException
as ve
:
464 ret
= self
.volume_exception_to_retval(ve
)
467 def list_user_metadata(self
, **kwargs
):
469 volname
= kwargs
['vol_name']
470 subvolname
= kwargs
['sub_name']
471 groupname
= kwargs
['group_name']
474 with
open_volume(self
, volname
) as fs_handle
:
475 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
476 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.USER_METADATA_LIST
) as subvolume
:
477 subvol_metadata_dict
= subvolume
.list_user_metadata()
478 ret
= 0, json
.dumps(subvol_metadata_dict
, indent
=4, sort_keys
=True), ""
479 except VolumeException
as ve
:
480 ret
= self
.volume_exception_to_retval(ve
)
483 def remove_user_metadata(self
, **kwargs
):
485 volname
= kwargs
['vol_name']
486 subvolname
= kwargs
['sub_name']
487 groupname
= kwargs
['group_name']
488 keyname
= kwargs
['key_name']
489 force
= kwargs
['force']
492 with
open_volume(self
, volname
) as fs_handle
:
493 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
494 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.USER_METADATA_REMOVE
) as subvolume
:
495 subvolume
.remove_user_metadata(keyname
)
496 except VolumeException
as ve
:
497 if not (ve
.errno
== -errno
.ENOENT
and force
):
498 ret
= self
.volume_exception_to_retval(ve
)
501 def list_subvolumes(self
, **kwargs
):
503 volname
= kwargs
['vol_name']
504 groupname
= kwargs
['group_name']
507 with
open_volume(self
, volname
) as fs_handle
:
508 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
509 subvolumes
= group
.list_subvolumes()
510 ret
= 0, name_to_json(subvolumes
), ""
511 except VolumeException
as ve
:
512 ret
= self
.volume_exception_to_retval(ve
)
515 def subvolume_exists(self
, **kwargs
):
516 volname
= kwargs
['vol_name']
517 groupname
= kwargs
['group_name']
519 volume_exists
= False
522 with
open_volume(self
, volname
) as fs_handle
:
524 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
525 res
= group
.has_subvolumes()
527 ret
= 0, "subvolume exists", ""
529 ret
= 0, "no subvolume exists", ""
530 except VolumeException
as ve
:
531 if volume_exists
and ve
.errno
== -errno
.ENOENT
:
532 ret
= 0, "no subvolume exists", ""
534 ret
= self
.volume_exception_to_retval(ve
)
537 ### subvolume snapshot
539 def create_subvolume_snapshot(self
, **kwargs
):
541 volname
= kwargs
['vol_name']
542 subvolname
= kwargs
['sub_name']
543 snapname
= kwargs
['snap_name']
544 groupname
= kwargs
['group_name']
547 with
open_volume(self
, volname
) as fs_handle
:
548 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
549 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_CREATE
) as subvolume
:
550 subvolume
.create_snapshot(snapname
)
551 except VolumeException
as ve
:
552 ret
= self
.volume_exception_to_retval(ve
)
555 def remove_subvolume_snapshot(self
, **kwargs
):
557 volname
= kwargs
['vol_name']
558 subvolname
= kwargs
['sub_name']
559 snapname
= kwargs
['snap_name']
560 groupname
= kwargs
['group_name']
561 force
= kwargs
['force']
564 with
open_volume(self
, volname
) as fs_handle
:
565 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
566 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_REMOVE
) as subvolume
:
567 subvolume
.remove_snapshot(snapname
, force
)
568 except VolumeException
as ve
:
569 # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
570 # we should tickle the purge jobs to purge the same
571 if ve
.errno
== -errno
.ESTALE
:
572 self
.purge_queue
.queue_job(volname
)
573 elif not (ve
.errno
== -errno
.ENOENT
and force
):
574 ret
= self
.volume_exception_to_retval(ve
)
577 def subvolume_snapshot_info(self
, **kwargs
):
579 volname
= kwargs
['vol_name']
580 subvolname
= kwargs
['sub_name']
581 snapname
= kwargs
['snap_name']
582 groupname
= kwargs
['group_name']
585 with
open_volume(self
, volname
) as fs_handle
:
586 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
587 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_INFO
) as subvolume
:
588 snap_info_dict
= subvolume
.snapshot_info(snapname
)
589 ret
= 0, json
.dumps(snap_info_dict
, indent
=4, sort_keys
=True), ""
590 except VolumeException
as ve
:
591 ret
= self
.volume_exception_to_retval(ve
)
594 def set_subvolume_snapshot_metadata(self
, **kwargs
):
596 volname
= kwargs
['vol_name']
597 subvolname
= kwargs
['sub_name']
598 snapname
= kwargs
['snap_name']
599 groupname
= kwargs
['group_name']
600 keyname
= kwargs
['key_name']
601 value
= kwargs
['value']
604 with
open_volume(self
, volname
) as fs_handle
:
605 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
606 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_METADATA_SET
) as subvolume
:
607 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
608 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
609 subvolume
.set_snapshot_metadata(snapname
, keyname
, value
)
610 except VolumeException
as ve
:
611 ret
= self
.volume_exception_to_retval(ve
)
614 def get_subvolume_snapshot_metadata(self
, **kwargs
):
616 volname
= kwargs
['vol_name']
617 subvolname
= kwargs
['sub_name']
618 snapname
= kwargs
['snap_name']
619 groupname
= kwargs
['group_name']
620 keyname
= kwargs
['key_name']
623 with
open_volume(self
, volname
) as fs_handle
:
624 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
625 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_METADATA_GET
) as subvolume
:
626 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
627 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
628 value
= subvolume
.get_snapshot_metadata(snapname
, keyname
)
630 except VolumeException
as ve
:
631 ret
= self
.volume_exception_to_retval(ve
)
634 def list_subvolume_snapshot_metadata(self
, **kwargs
):
636 volname
= kwargs
['vol_name']
637 subvolname
= kwargs
['sub_name']
638 snapname
= kwargs
['snap_name']
639 groupname
= kwargs
['group_name']
642 with
open_volume(self
, volname
) as fs_handle
:
643 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
644 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_METADATA_LIST
) as subvolume
:
645 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
646 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
647 snap_metadata_dict
= subvolume
.list_snapshot_metadata(snapname
)
648 ret
= 0, json
.dumps(snap_metadata_dict
, indent
=4, sort_keys
=True), ""
649 except VolumeException
as ve
:
650 ret
= self
.volume_exception_to_retval(ve
)
653 def remove_subvolume_snapshot_metadata(self
, **kwargs
):
655 volname
= kwargs
['vol_name']
656 subvolname
= kwargs
['sub_name']
657 snapname
= kwargs
['snap_name']
658 groupname
= kwargs
['group_name']
659 keyname
= kwargs
['key_name']
660 force
= kwargs
['force']
663 with
open_volume(self
, volname
) as fs_handle
:
664 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
665 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_METADATA_REMOVE
) as subvolume
:
666 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
667 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
668 subvolume
.remove_snapshot_metadata(snapname
, keyname
)
669 except VolumeException
as ve
:
670 if not (ve
.errno
== -errno
.ENOENT
and force
):
671 ret
= self
.volume_exception_to_retval(ve
)
674 def list_subvolume_snapshots(self
, **kwargs
):
676 volname
= kwargs
['vol_name']
677 subvolname
= kwargs
['sub_name']
678 groupname
= kwargs
['group_name']
681 with
open_volume(self
, volname
) as fs_handle
:
682 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
683 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_LIST
) as subvolume
:
684 snapshots
= subvolume
.list_snapshots()
685 ret
= 0, name_to_json(snapshots
), ""
686 except VolumeException
as ve
:
687 ret
= self
.volume_exception_to_retval(ve
)
690 def protect_subvolume_snapshot(self
, **kwargs
):
691 ret
= 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
692 volname
= kwargs
['vol_name']
693 subvolname
= kwargs
['sub_name']
694 groupname
= kwargs
['group_name']
697 with
open_volume(self
, volname
) as fs_handle
:
698 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
699 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_PROTECT
) as subvolume
:
700 log
.warning("snapshot protect call is deprecated and will be removed in a future release")
701 except VolumeException
as ve
:
702 ret
= self
.volume_exception_to_retval(ve
)
705 def unprotect_subvolume_snapshot(self
, **kwargs
):
706 ret
= 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
707 volname
= kwargs
['vol_name']
708 subvolname
= kwargs
['sub_name']
709 groupname
= kwargs
['group_name']
712 with
open_volume(self
, volname
) as fs_handle
:
713 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
714 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, subvolname
, SubvolumeOpType
.SNAP_UNPROTECT
) as subvolume
:
715 log
.warning("snapshot unprotect call is deprecated and will be removed in a future release")
716 except VolumeException
as ve
:
717 ret
= self
.volume_exception_to_retval(ve
)
720 def _prepare_clone_subvolume(self
, fs_handle
, volname
, s_subvolume
, s_snapname
, t_group
, t_subvolname
, **kwargs
):
721 t_pool
= kwargs
['pool_layout']
722 s_subvolname
= kwargs
['sub_name']
723 s_groupname
= kwargs
['group_name']
724 t_groupname
= kwargs
['target_group_name']
726 create_clone(self
.mgr
, fs_handle
, self
.volspec
, t_group
, t_subvolname
, t_pool
, volname
, s_subvolume
, s_snapname
)
727 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, t_group
, t_subvolname
, SubvolumeOpType
.CLONE_INTERNAL
) as t_subvolume
:
729 if t_groupname
== s_groupname
and t_subvolname
== s_subvolname
:
730 t_subvolume
.attach_snapshot(s_snapname
, t_subvolume
)
732 s_subvolume
.attach_snapshot(s_snapname
, t_subvolume
)
733 self
.cloner
.queue_job(volname
)
734 except VolumeException
as ve
:
737 self
.purge_queue
.queue_job(volname
)
738 except Exception as e
:
739 log
.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname
, e
))
742 def _clone_subvolume_snapshot(self
, fs_handle
, volname
, s_group
, s_subvolume
, **kwargs
):
743 s_snapname
= kwargs
['snap_name']
744 target_subvolname
= kwargs
['target_sub_name']
745 target_groupname
= kwargs
['target_group_name']
746 s_groupname
= kwargs
['group_name']
748 if not s_snapname
.encode('utf-8') in s_subvolume
.list_snapshots():
749 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(s_snapname
))
751 with
open_group_unique(fs_handle
, self
.volspec
, target_groupname
, s_group
, s_groupname
) as target_group
:
753 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, target_group
, target_subvolname
, SubvolumeOpType
.CLONE_CREATE
):
754 raise VolumeException(-errno
.EEXIST
, "subvolume '{0}' exists".format(target_subvolname
))
755 except VolumeException
as ve
:
756 if ve
.errno
== -errno
.ENOENT
:
757 self
._prepare
_clone
_subvolume
(fs_handle
, volname
, s_subvolume
, s_snapname
,
758 target_group
, target_subvolname
, **kwargs
)
762 def clone_subvolume_snapshot(self
, **kwargs
):
764 volname
= kwargs
['vol_name']
765 s_subvolname
= kwargs
['sub_name']
766 s_groupname
= kwargs
['group_name']
769 with
open_volume(self
, volname
) as fs_handle
:
770 with
open_group(fs_handle
, self
.volspec
, s_groupname
) as s_group
:
771 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, s_group
, s_subvolname
, SubvolumeOpType
.CLONE_SOURCE
) as s_subvolume
:
772 self
._clone
_subvolume
_snapshot
(fs_handle
, volname
, s_group
, s_subvolume
, **kwargs
)
773 except VolumeException
as ve
:
774 ret
= self
.volume_exception_to_retval(ve
)
777 def clone_status(self
, **kwargs
):
779 volname
= kwargs
['vol_name']
780 clonename
= kwargs
['clone_name']
781 groupname
= kwargs
['group_name']
784 with
open_volume(self
, volname
) as fs_handle
:
785 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
786 with
open_subvol(self
.mgr
, fs_handle
, self
.volspec
, group
, clonename
, SubvolumeOpType
.CLONE_STATUS
) as subvolume
:
787 ret
= 0, json
.dumps({'status' : subvolume
.status
}, indent
=2), ""
788 except VolumeException
as ve
:
789 ret
= self
.volume_exception_to_retval(ve
)
792 def clone_cancel(self
, **kwargs
):
794 volname
= kwargs
['vol_name']
795 clonename
= kwargs
['clone_name']
796 groupname
= kwargs
['group_name']
799 self
.cloner
.cancel_job(volname
, (clonename
, groupname
))
800 except VolumeException
as ve
:
801 ret
= self
.volume_exception_to_retval(ve
)
806 def create_subvolume_group(self
, **kwargs
):
808 volname
= kwargs
['vol_name']
809 groupname
= kwargs
['group_name']
810 size
= kwargs
['size']
811 pool
= kwargs
['pool_layout']
814 mode
= kwargs
['mode']
817 with
open_volume(self
, volname
) as fs_handle
:
819 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
820 # idempotent creation -- valid.
824 'mode': octal_str_to_decimal_int(mode
),
828 set_group_attrs(fs_handle
, group
.path
, attrs
)
829 except VolumeException
as ve
:
830 if ve
.errno
== -errno
.ENOENT
:
831 oct_mode
= octal_str_to_decimal_int(mode
)
832 create_group(fs_handle
, self
.volspec
, groupname
, size
, pool
, oct_mode
, uid
, gid
)
835 except VolumeException
as ve
:
836 # volume does not exist or subvolume group creation failed
837 ret
= self
.volume_exception_to_retval(ve
)
840 def remove_subvolume_group(self
, **kwargs
):
842 volname
= kwargs
['vol_name']
843 groupname
= kwargs
['group_name']
844 force
= kwargs
['force']
847 with
open_volume(self
, volname
) as fs_handle
:
848 remove_group(fs_handle
, self
.volspec
, groupname
)
849 except VolumeException
as ve
:
850 if not (ve
.errno
== -errno
.ENOENT
and force
):
851 ret
= self
.volume_exception_to_retval(ve
)
854 def subvolumegroup_info(self
, **kwargs
):
856 volname
= kwargs
['vol_name']
857 groupname
= kwargs
['group_name']
860 with
open_volume(self
, volname
) as fs_handle
:
861 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
863 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
864 for mon
in mon_map_mons
:
865 ip_port
= mon
['addr'].split("/")[0]
866 mon_addr_lst
.append(ip_port
)
868 group_info_dict
= group
.info()
869 group_info_dict
["mon_addrs"] = mon_addr_lst
870 ret
= 0, json
.dumps(group_info_dict
, indent
=4, sort_keys
=True), ""
871 except VolumeException
as ve
:
872 ret
= self
.volume_exception_to_retval(ve
)
875 def resize_subvolume_group(self
, **kwargs
):
877 volname
= kwargs
['vol_name']
878 groupname
= kwargs
['group_name']
879 newsize
= kwargs
['new_size']
880 noshrink
= kwargs
['no_shrink']
883 with
open_volume(self
, volname
) as fs_handle
:
884 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
885 nsize
, usedbytes
= group
.resize(newsize
, noshrink
)
887 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
888 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
889 indent
=4, sort_keys
=True), ""
890 except VolumeException
as ve
:
891 ret
= self
.volume_exception_to_retval(ve
)
894 def getpath_subvolume_group(self
, **kwargs
):
895 volname
= kwargs
['vol_name']
896 groupname
= kwargs
['group_name']
899 with
open_volume(self
, volname
) as fs_handle
:
900 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
901 return 0, group
.path
.decode('utf-8'), ""
902 except VolumeException
as ve
:
903 return self
.volume_exception_to_retval(ve
)
905 def list_subvolume_groups(self
, **kwargs
):
906 volname
= kwargs
['vol_name']
908 volume_exists
= False
910 with
open_volume(self
, volname
) as fs_handle
:
912 groups
= listdir(fs_handle
, self
.volspec
.base_dir
, filter_entries
=[dir.encode('utf-8') for dir in self
.volspec
.INTERNAL_DIRS
])
913 ret
= 0, name_to_json(groups
), ""
914 except VolumeException
as ve
:
915 if not ve
.errno
== -errno
.ENOENT
or not volume_exists
:
916 ret
= self
.volume_exception_to_retval(ve
)
919 def pin_subvolume_group(self
, **kwargs
):
921 volname
= kwargs
['vol_name']
922 groupname
= kwargs
['group_name']
923 pin_type
= kwargs
['pin_type']
924 pin_setting
= kwargs
['pin_setting']
927 with
open_volume(self
, volname
) as fs_handle
:
928 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
929 group
.pin(pin_type
, pin_setting
)
930 ret
= 0, json
.dumps({}), ""
931 except VolumeException
as ve
:
932 ret
= self
.volume_exception_to_retval(ve
)
935 def subvolume_group_exists(self
, **kwargs
):
936 volname
= kwargs
['vol_name']
938 volume_exists
= False
941 with
open_volume(self
, volname
) as fs_handle
:
943 res
= has_subdir(fs_handle
, self
.volspec
.base_dir
, filter_entries
=[
944 dir.encode('utf-8') for dir in self
.volspec
.INTERNAL_DIRS
])
946 ret
= 0, "subvolumegroup exists", ""
948 ret
= 0, "no subvolumegroup exists", ""
949 except VolumeException
as ve
:
950 if volume_exists
and ve
.errno
== -errno
.ENOENT
:
951 ret
= 0, "no subvolumegroup exists", ""
953 ret
= self
.volume_exception_to_retval(ve
)
958 def create_subvolume_group_snapshot(self
, **kwargs
):
959 ret
= -errno
.ENOSYS
, "", "subvolume group snapshots are not supported"
960 volname
= kwargs
['vol_name']
961 groupname
= kwargs
['group_name']
962 # snapname = kwargs['snap_name']
965 with
open_volume(self
, volname
) as fs_handle
:
966 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
967 # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
968 # at the subvolume group (see: https://tracker.ceph.com/issues/46074)
969 # group.create_snapshot(snapname)
971 except VolumeException
as ve
:
972 ret
= self
.volume_exception_to_retval(ve
)
975 def remove_subvolume_group_snapshot(self
, **kwargs
):
977 volname
= kwargs
['vol_name']
978 groupname
= kwargs
['group_name']
979 snapname
= kwargs
['snap_name']
980 force
= kwargs
['force']
983 with
open_volume(self
, volname
) as fs_handle
:
984 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
985 group
.remove_snapshot(snapname
)
986 except VolumeException
as ve
:
987 if not (ve
.errno
== -errno
.ENOENT
and force
):
988 ret
= self
.volume_exception_to_retval(ve
)
991 def list_subvolume_group_snapshots(self
, **kwargs
):
993 volname
= kwargs
['vol_name']
994 groupname
= kwargs
['group_name']
997 with
open_volume(self
, volname
) as fs_handle
:
998 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
999 snapshots
= group
.list_snapshots()
1000 ret
= 0, name_to_json(snapshots
), ""
1001 except VolumeException
as ve
:
1002 ret
= self
.volume_exception_to_retval(ve
)