4 from threading
import Event
8 from .fs_util
import listdir
10 from .operations
.volume
import ConnectionPool
, open_volume
, create_volume
, \
11 delete_volume
, list_volumes
, get_pool_names
12 from .operations
.group
import open_group
, create_group
, remove_group
13 from .operations
.subvolume
import open_subvol
, create_subvol
, remove_subvol
, \
16 from .vol_spec
import VolSpec
17 from .exception
import VolumeException
18 from .async_cloner
import Cloner
19 from .purge_queue
import ThreadPoolPurgeQueueMixin
21 log
= logging
.getLogger(__name__
)
23 def octal_str_to_decimal_int(mode
):
27 raise VolumeException(-errno
.EINVAL
, "Invalid mode '{0}'".format(mode
))
29 def name_to_json(names
):
31 convert the list of names to json
34 for i
in range(len(names
)):
35 namedict
.append({'name': names
[i
].decode('utf-8')})
36 return json
.dumps(namedict
, indent
=4, sort_keys
=True)
38 class VolumeClient(object):
39 def __init__(self
, mgr
):
41 self
.stopping
= Event()
42 # volume specification
43 self
.volspec
= VolSpec(mgr
.rados
.conf_get('client_snapdir'))
44 self
.connection_pool
= ConnectionPool(self
.mgr
)
45 # TODO: make thread pool size configurable
46 self
.cloner
= Cloner(self
, 4)
47 self
.purge_queue
= ThreadPoolPurgeQueueMixin(self
, 4)
48 # on startup, queue purge job for available volumes to kickstart
49 # purge for leftover subvolume entries in trash. note that, if the
50 # trash directory does not exist or if there are no purge entries
51 # available for a volume, the volume is removed from the purge
53 fs_map
= self
.mgr
.get('fs_map')
54 for fs
in fs_map
['filesystems']:
55 self
.cloner
.queue_job(fs
['mdsmap']['fs_name'])
56 self
.purge_queue
.queue_job(fs
['mdsmap']['fs_name'])
58 def is_stopping(self
):
59 return self
.stopping
.is_set()
62 log
.info("shutting down")
63 # first, note that we're shutting down
65 # second, ask purge threads to quit
66 self
.purge_queue
.cancel_all_jobs()
67 # third, delete all libcephfs handles from connection pool
68 self
.connection_pool
.del_all_handles()
70 def cluster_log(self
, msg
, lvl
=None):
72 log to cluster log with default log level as WARN.
75 lvl
= self
.mgr
.CLUSTER_LOG_PRIO_WARN
76 self
.mgr
.cluster_log("cluster", lvl
, msg
)
78 def volume_exception_to_retval(self
, ve
):
80 return a tuple representation from a volume exception
84 ### volume operations -- create, rm, ls
86 def create_fs_volume(self
, volname
, placement
):
87 if self
.is_stopping():
88 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
89 return create_volume(self
.mgr
, volname
, placement
)
91 def delete_fs_volume(self
, volname
, confirm
):
92 if self
.is_stopping():
93 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
95 if confirm
!= "--yes-i-really-mean-it":
96 return -errno
.EPERM
, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
97 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
98 "that is what you want, re-issue the command followed by " \
99 "--yes-i-really-mean-it.".format(volname
)
101 ret
, out
, err
= self
.mgr
.check_mon_command({
102 'prefix': 'config get',
103 'key': 'mon_allow_pool_delete',
107 mon_allow_pool_delete
= json
.loads(out
)
108 if not mon_allow_pool_delete
:
109 return -errno
.EPERM
, "", "pool deletion is disabled; you must first " \
110 "set the mon_allow_pool_delete config option to true before volumes " \
113 metadata_pool
, data_pools
= get_pool_names(self
.mgr
, volname
)
114 if not metadata_pool
:
115 return -errno
.ENOENT
, "", "volume {0} doesn't exist".format(volname
)
116 self
.purge_queue
.cancel_jobs(volname
)
117 self
.connection_pool
.del_fs_handle(volname
, wait
=True)
118 return delete_volume(self
.mgr
, volname
, metadata_pool
, data_pools
)
120 def list_fs_volumes(self
):
121 if self
.stopping
.is_set():
122 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
123 volumes
= list_volumes(self
.mgr
)
124 return 0, json
.dumps(volumes
, indent
=4, sort_keys
=True), ""
126 ### subvolume operations
128 def _create_subvolume(self
, fs_handle
, volname
, group
, subvolname
, **kwargs
):
129 size
= kwargs
['size']
130 pool
= kwargs
['pool_layout']
133 mode
= kwargs
['mode']
134 isolate_nspace
= kwargs
['namespace_isolated']
136 oct_mode
= octal_str_to_decimal_int(mode
)
139 fs_handle
, self
.volspec
, group
, subvolname
, size
, isolate_nspace
, pool
, oct_mode
, uid
, gid
)
140 except VolumeException
as ve
:
141 # kick the purge threads for async removal -- note that this
142 # assumes that the subvolume is moved to trashcan for cleanup on error.
143 self
.purge_queue
.queue_job(volname
)
146 def create_subvolume(self
, **kwargs
):
148 volname
= kwargs
['vol_name']
149 subvolname
= kwargs
['sub_name']
150 groupname
= kwargs
['group_name']
151 size
= kwargs
['size']
152 pool
= kwargs
['pool_layout']
155 isolate_nspace
= kwargs
['namespace_isolated']
158 with
open_volume(self
, volname
) as fs_handle
:
159 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
161 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
162 # idempotent creation -- valid. Attributes set is supported.
163 uid
= uid
if uid
else subvolume
.uid
164 gid
= gid
if gid
else subvolume
.gid
165 subvolume
.set_attrs(subvolume
.path
, size
, isolate_nspace
, pool
, uid
, gid
)
166 except VolumeException
as ve
:
167 if ve
.errno
== -errno
.ENOENT
:
168 self
._create
_subvolume
(fs_handle
, volname
, group
, subvolname
, **kwargs
)
171 except VolumeException
as ve
:
172 # volume/group does not exist or subvolume creation failed
173 ret
= self
.volume_exception_to_retval(ve
)
176 def remove_subvolume(self
, **kwargs
):
178 volname
= kwargs
['vol_name']
179 subvolname
= kwargs
['sub_name']
180 groupname
= kwargs
['group_name']
181 force
= kwargs
['force']
184 with
open_volume(self
, volname
) as fs_handle
:
185 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
186 remove_subvol(fs_handle
, self
.volspec
, group
, subvolname
, force
)
187 # kick the purge threads for async removal -- note that this
188 # assumes that the subvolume is moved to trash can.
189 # TODO: make purge queue as singleton so that trash can kicks
190 # the purge threads on dump.
191 self
.purge_queue
.queue_job(volname
)
192 except VolumeException
as ve
:
193 if ve
.errno
== -errno
.EAGAIN
:
194 ve
= VolumeException(ve
.errno
, ve
.error_str
+ " (use --force to override)")
195 ret
= self
.volume_exception_to_retval(ve
)
196 elif not (ve
.errno
== -errno
.ENOENT
and force
):
197 ret
= self
.volume_exception_to_retval(ve
)
200 def resize_subvolume(self
, **kwargs
):
202 volname
= kwargs
['vol_name']
203 subvolname
= kwargs
['sub_name']
204 newsize
= kwargs
['new_size']
205 noshrink
= kwargs
['no_shrink']
206 groupname
= kwargs
['group_name']
209 with
open_volume(self
, volname
) as fs_handle
:
210 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
211 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
212 nsize
, usedbytes
= subvolume
.resize(newsize
, noshrink
)
214 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
215 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
216 indent
=4, sort_keys
=True), ""
217 except VolumeException
as ve
:
218 ret
= self
.volume_exception_to_retval(ve
)
221 def subvolume_pin(self
, **kwargs
):
223 volname
= kwargs
['vol_name']
224 subvolname
= kwargs
['sub_name']
225 pin_type
= kwargs
['pin_type']
226 pin_setting
= kwargs
['pin_setting']
227 groupname
= kwargs
['group_name']
230 with
open_volume(self
, volname
) as fs_handle
:
231 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
232 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
233 subvolume
.pin(pin_type
, pin_setting
)
234 ret
= 0, json
.dumps({}), ""
235 except VolumeException
as ve
:
236 ret
= self
.volume_exception_to_retval(ve
)
239 def subvolume_getpath(self
, **kwargs
):
241 volname
= kwargs
['vol_name']
242 subvolname
= kwargs
['sub_name']
243 groupname
= kwargs
['group_name']
246 with
open_volume(self
, volname
) as fs_handle
:
247 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
248 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
249 subvolpath
= subvolume
.path
250 ret
= 0, subvolpath
.decode("utf-8"), ""
251 except VolumeException
as ve
:
252 ret
= self
.volume_exception_to_retval(ve
)
255 def subvolume_info(self
, **kwargs
):
257 volname
= kwargs
['vol_name']
258 subvolname
= kwargs
['sub_name']
259 groupname
= kwargs
['group_name']
262 with
open_volume(self
, volname
) as fs_handle
:
263 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
264 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
266 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
267 for mon
in mon_map_mons
:
268 ip_port
= mon
['addr'].split("/")[0]
269 mon_addr_lst
.append(ip_port
)
271 subvol_info_dict
= subvolume
.info()
272 subvol_info_dict
["mon_addrs"] = mon_addr_lst
273 ret
= 0, json
.dumps(subvol_info_dict
, indent
=4, sort_keys
=True), ""
274 except VolumeException
as ve
:
275 ret
= self
.volume_exception_to_retval(ve
)
278 def list_subvolumes(self
, **kwargs
):
280 volname
= kwargs
['vol_name']
281 groupname
= kwargs
['group_name']
284 with
open_volume(self
, volname
) as fs_handle
:
285 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
286 subvolumes
= group
.list_subvolumes()
287 ret
= 0, name_to_json(subvolumes
), ""
288 except VolumeException
as ve
:
289 ret
= self
.volume_exception_to_retval(ve
)
292 ### subvolume snapshot
294 def create_subvolume_snapshot(self
, **kwargs
):
296 volname
= kwargs
['vol_name']
297 subvolname
= kwargs
['sub_name']
298 snapname
= kwargs
['snap_name']
299 groupname
= kwargs
['group_name']
302 with
open_volume(self
, volname
) as fs_handle
:
303 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
304 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
305 subvolume
.create_snapshot(snapname
)
306 except VolumeException
as ve
:
307 ret
= self
.volume_exception_to_retval(ve
)
310 def remove_subvolume_snapshot(self
, **kwargs
):
312 volname
= kwargs
['vol_name']
313 subvolname
= kwargs
['sub_name']
314 snapname
= kwargs
['snap_name']
315 groupname
= kwargs
['group_name']
316 force
= kwargs
['force']
319 with
open_volume(self
, volname
) as fs_handle
:
320 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
321 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
322 subvolume
.remove_snapshot(snapname
)
323 except VolumeException
as ve
:
324 if not (ve
.errno
== -errno
.ENOENT
and force
):
325 ret
= self
.volume_exception_to_retval(ve
)
328 def subvolume_snapshot_info(self
, **kwargs
):
330 volname
= kwargs
['vol_name']
331 subvolname
= kwargs
['sub_name']
332 snapname
= kwargs
['snap_name']
333 groupname
= kwargs
['group_name']
336 with
open_volume(self
, volname
) as fs_handle
:
337 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
338 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
339 snap_info_dict
= subvolume
.snapshot_info(snapname
)
340 ret
= 0, json
.dumps(snap_info_dict
, indent
=4, sort_keys
=True), ""
341 except VolumeException
as ve
:
342 ret
= self
.volume_exception_to_retval(ve
)
345 def list_subvolume_snapshots(self
, **kwargs
):
347 volname
= kwargs
['vol_name']
348 subvolname
= kwargs
['sub_name']
349 groupname
= kwargs
['group_name']
352 with
open_volume(self
, volname
) as fs_handle
:
353 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
354 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
355 snapshots
= subvolume
.list_snapshots()
356 ret
= 0, name_to_json(snapshots
), ""
357 except VolumeException
as ve
:
358 ret
= self
.volume_exception_to_retval(ve
)
361 def protect_subvolume_snapshot(self
, **kwargs
):
362 ret
= 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
363 volname
= kwargs
['vol_name']
364 subvolname
= kwargs
['sub_name']
365 groupname
= kwargs
['group_name']
368 with
open_volume(self
, volname
) as fs_handle
:
369 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
370 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
371 log
.warning("snapshot protect call is deprecated and will be removed in a future release")
372 except VolumeException
as ve
:
373 ret
= self
.volume_exception_to_retval(ve
)
376 def unprotect_subvolume_snapshot(self
, **kwargs
):
377 ret
= 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
378 volname
= kwargs
['vol_name']
379 subvolname
= kwargs
['sub_name']
380 groupname
= kwargs
['group_name']
383 with
open_volume(self
, volname
) as fs_handle
:
384 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
385 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
386 log
.warning("snapshot unprotect call is deprecated and will be removed in a future release")
387 except VolumeException
as ve
:
388 ret
= self
.volume_exception_to_retval(ve
)
391 def _prepare_clone_subvolume(self
, fs_handle
, volname
, subvolume
, snapname
, target_group
, target_subvolname
, target_pool
):
392 create_clone(fs_handle
, self
.volspec
, target_group
, target_subvolname
, target_pool
, volname
, subvolume
, snapname
)
393 with
open_subvol(fs_handle
, self
.volspec
, target_group
, target_subvolname
, need_complete
=False) as target_subvolume
:
395 subvolume
.attach_snapshot(snapname
, target_subvolume
)
396 self
.cloner
.queue_job(volname
)
397 except VolumeException
as ve
:
399 target_subvolume
.remove()
400 self
.purge_queue
.queue_job(volname
)
401 except Exception as e
:
402 log
.warning("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname
, e
))
405 def _clone_subvolume_snapshot(self
, fs_handle
, volname
, subvolume
, **kwargs
):
406 snapname
= kwargs
['snap_name']
407 target_pool
= kwargs
['pool_layout']
408 target_subvolname
= kwargs
['target_sub_name']
409 target_groupname
= kwargs
['target_group_name']
411 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
412 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
414 # TODO: when the target group is same as source, reuse group object.
415 with
open_group(fs_handle
, self
.volspec
, target_groupname
) as target_group
:
417 with
open_subvol(fs_handle
, self
.volspec
, target_group
, target_subvolname
, need_complete
=False):
418 raise VolumeException(-errno
.EEXIST
, "subvolume '{0}' exists".format(target_subvolname
))
419 except VolumeException
as ve
:
420 if ve
.errno
== -errno
.ENOENT
:
421 self
._prepare
_clone
_subvolume
(fs_handle
, volname
, subvolume
, snapname
,
422 target_group
, target_subvolname
, target_pool
)
426 def clone_subvolume_snapshot(self
, **kwargs
):
428 volname
= kwargs
['vol_name']
429 subvolname
= kwargs
['sub_name']
430 groupname
= kwargs
['group_name']
433 with
open_volume(self
, volname
) as fs_handle
:
434 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
435 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
436 self
._clone
_subvolume
_snapshot
(fs_handle
, volname
, subvolume
, **kwargs
)
437 except VolumeException
as ve
:
438 ret
= self
.volume_exception_to_retval(ve
)
441 def clone_status(self
, **kwargs
):
443 volname
= kwargs
['vol_name']
444 clonename
= kwargs
['clone_name']
445 groupname
= kwargs
['group_name']
448 with
open_volume(self
, volname
) as fs_handle
:
449 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
450 with
open_subvol(fs_handle
, self
.volspec
, group
, clonename
,
451 need_complete
=False, expected_types
=["clone"]) as subvolume
:
452 ret
= 0, json
.dumps({'status' : subvolume
.status
}, indent
=2), ""
453 except VolumeException
as ve
:
454 ret
= self
.volume_exception_to_retval(ve
)
457 def clone_cancel(self
, **kwargs
):
459 volname
= kwargs
['vol_name']
460 clonename
= kwargs
['clone_name']
461 groupname
= kwargs
['group_name']
464 self
.cloner
.cancel_job(volname
, (clonename
, groupname
))
465 except VolumeException
as ve
:
466 ret
= self
.volume_exception_to_retval(ve
)
471 def create_subvolume_group(self
, **kwargs
):
473 volname
= kwargs
['vol_name']
474 groupname
= kwargs
['group_name']
475 pool
= kwargs
['pool_layout']
478 mode
= kwargs
['mode']
481 with
open_volume(self
, volname
) as fs_handle
:
483 with
open_group(fs_handle
, self
.volspec
, groupname
):
484 # idempotent creation -- valid.
486 except VolumeException
as ve
:
487 if ve
.errno
== -errno
.ENOENT
:
488 oct_mode
= octal_str_to_decimal_int(mode
)
489 create_group(fs_handle
, self
.volspec
, groupname
, pool
, oct_mode
, uid
, gid
)
492 except VolumeException
as ve
:
493 # volume does not exist or subvolume group creation failed
494 ret
= self
.volume_exception_to_retval(ve
)
497 def remove_subvolume_group(self
, **kwargs
):
499 volname
= kwargs
['vol_name']
500 groupname
= kwargs
['group_name']
501 force
= kwargs
['force']
504 with
open_volume(self
, volname
) as fs_handle
:
505 remove_group(fs_handle
, self
.volspec
, groupname
)
506 except VolumeException
as ve
:
507 if not (ve
.errno
== -errno
.ENOENT
and force
):
508 ret
= self
.volume_exception_to_retval(ve
)
511 def getpath_subvolume_group(self
, **kwargs
):
512 volname
= kwargs
['vol_name']
513 groupname
= kwargs
['group_name']
516 with
open_volume(self
, volname
) as fs_handle
:
517 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
518 return 0, group
.path
.decode('utf-8'), ""
519 except VolumeException
as ve
:
520 return self
.volume_exception_to_retval(ve
)
522 def list_subvolume_groups(self
, **kwargs
):
523 volname
= kwargs
['vol_name']
526 with
open_volume(self
, volname
) as fs_handle
:
527 groups
= listdir(fs_handle
, self
.volspec
.base_dir
)
528 ret
= 0, name_to_json(groups
), ""
529 except VolumeException
as ve
:
530 if not ve
.errno
== -errno
.ENOENT
:
531 ret
= self
.volume_exception_to_retval(ve
)
534 def pin_subvolume_group(self
, **kwargs
):
536 volname
= kwargs
['vol_name']
537 groupname
= kwargs
['group_name']
538 pin_type
= kwargs
['pin_type']
539 pin_setting
= kwargs
['pin_setting']
542 with
open_volume(self
, volname
) as fs_handle
:
543 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
544 group
.pin(pin_type
, pin_setting
)
545 ret
= 0, json
.dumps({}), ""
546 except VolumeException
as ve
:
547 ret
= self
.volume_exception_to_retval(ve
)
552 def create_subvolume_group_snapshot(self
, **kwargs
):
554 volname
= kwargs
['vol_name']
555 groupname
= kwargs
['group_name']
556 snapname
= kwargs
['snap_name']
559 with
open_volume(self
, volname
) as fs_handle
:
560 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
561 group
.create_snapshot(snapname
)
562 except VolumeException
as ve
:
563 ret
= self
.volume_exception_to_retval(ve
)
566 def remove_subvolume_group_snapshot(self
, **kwargs
):
568 volname
= kwargs
['vol_name']
569 groupname
= kwargs
['group_name']
570 snapname
= kwargs
['snap_name']
571 force
= kwargs
['force']
574 with
open_volume(self
, volname
) as fs_handle
:
575 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
576 group
.remove_snapshot(snapname
)
577 except VolumeException
as ve
:
578 if not (ve
.errno
== -errno
.ENOENT
and force
):
579 ret
= self
.volume_exception_to_retval(ve
)
582 def list_subvolume_group_snapshots(self
, **kwargs
):
584 volname
= kwargs
['vol_name']
585 groupname
= kwargs
['group_name']
588 with
open_volume(self
, volname
) as fs_handle
:
589 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
590 snapshots
= group
.list_snapshots()
591 ret
= 0, name_to_json(snapshots
), ""
592 except VolumeException
as ve
:
593 ret
= self
.volume_exception_to_retval(ve
)