4 from threading
import Event
8 from .fs_util
import listdir
10 from .operations
.volume
import ConnectionPool
, open_volume
, create_volume
, \
11 delete_volume
, list_volumes
12 from .operations
.group
import open_group
, create_group
, remove_group
13 from .operations
.subvolume
import open_subvol
, create_subvol
, remove_subvol
, \
16 from .vol_spec
import VolSpec
17 from .exception
import VolumeException
18 from .async_cloner
import Cloner
19 from .purge_queue
import ThreadPoolPurgeQueueMixin
21 log
= logging
.getLogger(__name__
)
23 def octal_str_to_decimal_int(mode
):
27 raise VolumeException(-errno
.EINVAL
, "Invalid mode '{0}'".format(mode
))
29 def name_to_json(names
):
31 convert the list of names to json
34 for i
in range(len(names
)):
35 namedict
.append({'name': names
[i
].decode('utf-8')})
36 return json
.dumps(namedict
, indent
=4, sort_keys
=True)
38 class VolumeClient(object):
39 def __init__(self
, mgr
):
41 self
.stopping
= Event()
42 # volume specification
43 self
.volspec
= VolSpec(mgr
.rados
.conf_get('client_snapdir'))
44 self
.connection_pool
= ConnectionPool(self
.mgr
)
45 # TODO: make thread pool size configurable
46 self
.cloner
= Cloner(self
, 4)
47 self
.purge_queue
= ThreadPoolPurgeQueueMixin(self
, 4)
48 # on startup, queue purge job for available volumes to kickstart
49 # purge for leftover subvolume entries in trash. note that, if the
50 # trash directory does not exist or if there are no purge entries
51 # available for a volume, the volume is removed from the purge
53 fs_map
= self
.mgr
.get('fs_map')
54 for fs
in fs_map
['filesystems']:
55 self
.cloner
.queue_job(fs
['mdsmap']['fs_name'])
56 self
.purge_queue
.queue_job(fs
['mdsmap']['fs_name'])
58 def is_stopping(self
):
59 return self
.stopping
.is_set()
62 log
.info("shutting down")
63 # first, note that we're shutting down
65 # second, ask purge threads to quit
66 self
.purge_queue
.cancel_all_jobs()
67 # third, delete all libcephfs handles from connection pool
68 self
.connection_pool
.del_all_handles()
70 def cluster_log(self
, msg
, lvl
=None):
72 log to cluster log with default log level as WARN.
75 lvl
= self
.mgr
.CLUSTER_LOG_PRIO_WARN
76 self
.mgr
.cluster_log("cluster", lvl
, msg
)
78 def volume_exception_to_retval(self
, ve
):
80 return a tuple representation from a volume exception
84 ### volume operations -- create, rm, ls
86 def create_fs_volume(self
, volname
, placement
):
87 if self
.is_stopping():
88 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
89 return create_volume(self
.mgr
, volname
, placement
)
91 def delete_fs_volume(self
, volname
, confirm
):
92 if self
.is_stopping():
93 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
95 if confirm
!= "--yes-i-really-mean-it":
96 return -errno
.EPERM
, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
97 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
98 "that is what you want, re-issue the command followed by " \
99 "--yes-i-really-mean-it.".format(volname
)
101 self
.purge_queue
.cancel_jobs(volname
)
102 self
.connection_pool
.del_fs_handle(volname
, wait
=True)
103 return delete_volume(self
.mgr
, volname
)
105 def list_fs_volumes(self
):
106 if self
.stopping
.is_set():
107 return -errno
.ESHUTDOWN
, "", "shutdown in progress"
108 volumes
= list_volumes(self
.mgr
)
109 return 0, json
.dumps(volumes
, indent
=4, sort_keys
=True), ""
111 ### subvolume operations
113 def _create_subvolume(self
, fs_handle
, volname
, group
, subvolname
, **kwargs
):
114 size
= kwargs
['size']
115 pool
= kwargs
['pool_layout']
118 mode
= kwargs
['mode']
120 oct_mode
= octal_str_to_decimal_int(mode
)
123 fs_handle
, self
.volspec
, group
, subvolname
, size
, False, pool
, oct_mode
, uid
, gid
)
124 except VolumeException
as ve
:
125 # kick the purge threads for async removal -- note that this
126 # assumes that the subvolume is moved to trashcan for cleanup on error.
127 self
.purge_queue
.queue_job(volname
)
130 def create_subvolume(self
, **kwargs
):
132 volname
= kwargs
['vol_name']
133 subvolname
= kwargs
['sub_name']
134 groupname
= kwargs
['group_name']
137 with
open_volume(self
, volname
) as fs_handle
:
138 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
140 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
):
141 # idempotent creation -- valid.
143 except VolumeException
as ve
:
144 if ve
.errno
== -errno
.ENOENT
:
145 self
._create
_subvolume
(fs_handle
, volname
, group
, subvolname
, **kwargs
)
148 except VolumeException
as ve
:
149 # volume/group does not exist or subvolume creation failed
150 ret
= self
.volume_exception_to_retval(ve
)
153 def remove_subvolume(self
, **kwargs
):
155 volname
= kwargs
['vol_name']
156 subvolname
= kwargs
['sub_name']
157 groupname
= kwargs
['group_name']
158 force
= kwargs
['force']
161 with
open_volume(self
, volname
) as fs_handle
:
162 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
163 remove_subvol(fs_handle
, self
.volspec
, group
, subvolname
, force
)
164 # kick the purge threads for async removal -- note that this
165 # assumes that the subvolume is moved to trash can.
166 # TODO: make purge queue as singleton so that trash can kicks
167 # the purge threads on dump.
168 self
.purge_queue
.queue_job(volname
)
169 except VolumeException
as ve
:
170 if ve
.errno
== -errno
.EAGAIN
:
171 ve
= VolumeException(ve
.errno
, ve
.error_str
+ " (use --force to override)")
172 ret
= self
.volume_exception_to_retval(ve
)
173 elif not (ve
.errno
== -errno
.ENOENT
and force
):
174 ret
= self
.volume_exception_to_retval(ve
)
177 def resize_subvolume(self
, **kwargs
):
179 volname
= kwargs
['vol_name']
180 subvolname
= kwargs
['sub_name']
181 newsize
= kwargs
['new_size']
182 noshrink
= kwargs
['no_shrink']
183 groupname
= kwargs
['group_name']
186 with
open_volume(self
, volname
) as fs_handle
:
187 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
188 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
189 nsize
, usedbytes
= subvolume
.resize(newsize
, noshrink
)
191 [{'bytes_used': usedbytes
},{'bytes_quota': nsize
},
192 {'bytes_pcent': "undefined" if nsize
== 0 else '{0:.2f}'.format((float(usedbytes
) / nsize
) * 100.0)}],
193 indent
=4, sort_keys
=True), ""
194 except VolumeException
as ve
:
195 ret
= self
.volume_exception_to_retval(ve
)
198 def subvolume_getpath(self
, **kwargs
):
200 volname
= kwargs
['vol_name']
201 subvolname
= kwargs
['sub_name']
202 groupname
= kwargs
['group_name']
205 with
open_volume(self
, volname
) as fs_handle
:
206 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
207 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
208 subvolpath
= subvolume
.path
209 ret
= 0, subvolpath
.decode("utf-8"), ""
210 except VolumeException
as ve
:
211 ret
= self
.volume_exception_to_retval(ve
)
214 def subvolume_info(self
, **kwargs
):
216 volname
= kwargs
['vol_name']
217 subvolname
= kwargs
['sub_name']
218 groupname
= kwargs
['group_name']
221 with
open_volume(self
, volname
) as fs_handle
:
222 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
223 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
225 mon_map_mons
= self
.mgr
.get('mon_map')['mons']
226 for mon
in mon_map_mons
:
227 ip_port
= mon
['addr'].split("/")[0]
228 mon_addr_lst
.append(ip_port
)
230 subvol_info_dict
= subvolume
.info()
231 subvol_info_dict
["mon_addrs"] = mon_addr_lst
232 ret
= 0, json
.dumps(subvol_info_dict
, indent
=4, sort_keys
=True), ""
233 except VolumeException
as ve
:
234 ret
= self
.volume_exception_to_retval(ve
)
237 def list_subvolumes(self
, **kwargs
):
239 volname
= kwargs
['vol_name']
240 groupname
= kwargs
['group_name']
243 with
open_volume(self
, volname
) as fs_handle
:
244 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
245 subvolumes
= group
.list_subvolumes()
246 ret
= 0, name_to_json(subvolumes
), ""
247 except VolumeException
as ve
:
248 ret
= self
.volume_exception_to_retval(ve
)
251 ### subvolume snapshot
253 def create_subvolume_snapshot(self
, **kwargs
):
255 volname
= kwargs
['vol_name']
256 subvolname
= kwargs
['sub_name']
257 snapname
= kwargs
['snap_name']
258 groupname
= kwargs
['group_name']
261 with
open_volume(self
, volname
) as fs_handle
:
262 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
263 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
264 subvolume
.create_snapshot(snapname
)
265 except VolumeException
as ve
:
266 ret
= self
.volume_exception_to_retval(ve
)
269 def remove_subvolume_snapshot(self
, **kwargs
):
271 volname
= kwargs
['vol_name']
272 subvolname
= kwargs
['sub_name']
273 snapname
= kwargs
['snap_name']
274 groupname
= kwargs
['group_name']
275 force
= kwargs
['force']
278 with
open_volume(self
, volname
) as fs_handle
:
279 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
280 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
281 subvolume
.remove_snapshot(snapname
)
282 except VolumeException
as ve
:
283 if not (ve
.errno
== -errno
.ENOENT
and force
):
284 ret
= self
.volume_exception_to_retval(ve
)
287 def list_subvolume_snapshots(self
, **kwargs
):
289 volname
= kwargs
['vol_name']
290 subvolname
= kwargs
['sub_name']
291 groupname
= kwargs
['group_name']
294 with
open_volume(self
, volname
) as fs_handle
:
295 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
296 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
297 snapshots
= subvolume
.list_snapshots()
298 ret
= 0, name_to_json(snapshots
), ""
299 except VolumeException
as ve
:
300 ret
= self
.volume_exception_to_retval(ve
)
303 def protect_subvolume_snapshot(self
, **kwargs
):
305 volname
= kwargs
['vol_name']
306 subvolname
= kwargs
['sub_name']
307 snapname
= kwargs
['snap_name']
308 groupname
= kwargs
['group_name']
311 with
open_volume(self
, volname
) as fs_handle
:
312 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
313 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
314 subvolume
.protect_snapshot(snapname
)
315 except VolumeException
as ve
:
316 ret
= self
.volume_exception_to_retval(ve
)
319 def unprotect_subvolume_snapshot(self
, **kwargs
):
321 volname
= kwargs
['vol_name']
322 subvolname
= kwargs
['sub_name']
323 snapname
= kwargs
['snap_name']
324 groupname
= kwargs
['group_name']
327 with
open_volume(self
, volname
) as fs_handle
:
328 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
329 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
330 subvolume
.unprotect_snapshot(snapname
)
331 except VolumeException
as ve
:
332 ret
= self
.volume_exception_to_retval(ve
)
335 def _prepare_clone_subvolume(self
, fs_handle
, volname
, subvolume
, snapname
, target_group
, target_subvolname
, target_pool
):
336 create_clone(fs_handle
, self
.volspec
, target_group
, target_subvolname
, target_pool
, volname
, subvolume
, snapname
)
337 with
open_subvol(fs_handle
, self
.volspec
, target_group
, target_subvolname
, need_complete
=False) as target_subvolume
:
339 subvolume
.attach_snapshot(snapname
, target_subvolume
)
340 self
.cloner
.queue_job(volname
)
341 except VolumeException
as ve
:
343 target_subvolume
.remove()
344 self
.purge_queue
.queue_job(volname
)
345 except Exception as e
:
346 log
.warn("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname
, e
))
349 def _clone_subvolume_snapshot(self
, fs_handle
, volname
, subvolume
, **kwargs
):
350 snapname
= kwargs
['snap_name']
351 target_pool
= kwargs
['pool_layout']
352 target_subvolname
= kwargs
['target_sub_name']
353 target_groupname
= kwargs
['target_group_name']
355 if not snapname
.encode('utf-8') in subvolume
.list_snapshots():
356 raise VolumeException(-errno
.ENOENT
, "snapshot '{0}' does not exist".format(snapname
))
357 if not subvolume
.is_snapshot_protected(snapname
):
358 raise VolumeException(-errno
.EINVAL
, "snapshot '{0}' is not protected".format(snapname
))
360 # TODO: when the target group is same as source, reuse group object.
361 with
open_group(fs_handle
, self
.volspec
, target_groupname
) as target_group
:
363 with
open_subvol(fs_handle
, self
.volspec
, target_group
, target_subvolname
, need_complete
=False):
364 raise VolumeException(-errno
.EEXIST
, "subvolume '{0}' exists".format(target_subvolname
))
365 except VolumeException
as ve
:
366 if ve
.errno
== -errno
.ENOENT
:
367 self
._prepare
_clone
_subvolume
(fs_handle
, volname
, subvolume
, snapname
,
368 target_group
, target_subvolname
, target_pool
)
372 def clone_subvolume_snapshot(self
, **kwargs
):
374 volname
= kwargs
['vol_name']
375 subvolname
= kwargs
['sub_name']
376 groupname
= kwargs
['group_name']
379 with
open_volume(self
, volname
) as fs_handle
:
380 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
381 with
open_subvol(fs_handle
, self
.volspec
, group
, subvolname
) as subvolume
:
382 self
._clone
_subvolume
_snapshot
(fs_handle
, volname
, subvolume
, **kwargs
)
383 except VolumeException
as ve
:
384 ret
= self
.volume_exception_to_retval(ve
)
387 def clone_status(self
, **kwargs
):
389 volname
= kwargs
['vol_name']
390 clonename
= kwargs
['clone_name']
391 groupname
= kwargs
['group_name']
394 with
open_volume(self
, volname
) as fs_handle
:
395 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
396 with
open_subvol(fs_handle
, self
.volspec
, group
, clonename
,
397 need_complete
=False, expected_types
=["clone"]) as subvolume
:
398 ret
= 0, json
.dumps({'status' : subvolume
.status
}, indent
=2), ""
399 except VolumeException
as ve
:
400 ret
= self
.volume_exception_to_retval(ve
)
403 def clone_cancel(self
, **kwargs
):
405 volname
= kwargs
['vol_name']
406 clonename
= kwargs
['clone_name']
407 groupname
= kwargs
['group_name']
410 self
.cloner
.cancel_job(volname
, (clonename
, groupname
))
411 except VolumeException
as ve
:
412 ret
= self
.volume_exception_to_retval(ve
)
417 def create_subvolume_group(self
, **kwargs
):
419 volname
= kwargs
['vol_name']
420 groupname
= kwargs
['group_name']
421 pool
= kwargs
['pool_layout']
424 mode
= kwargs
['mode']
427 with
open_volume(self
, volname
) as fs_handle
:
429 with
open_group(fs_handle
, self
.volspec
, groupname
):
430 # idempotent creation -- valid.
432 except VolumeException
as ve
:
433 if ve
.errno
== -errno
.ENOENT
:
434 oct_mode
= octal_str_to_decimal_int(mode
)
435 create_group(fs_handle
, self
.volspec
, groupname
, pool
, oct_mode
, uid
, gid
)
438 except VolumeException
as ve
:
439 # volume does not exist or subvolume group creation failed
440 ret
= self
.volume_exception_to_retval(ve
)
443 def remove_subvolume_group(self
, **kwargs
):
445 volname
= kwargs
['vol_name']
446 groupname
= kwargs
['group_name']
447 force
= kwargs
['force']
450 with
open_volume(self
, volname
) as fs_handle
:
451 remove_group(fs_handle
, self
.volspec
, groupname
)
452 except VolumeException
as ve
:
453 if not (ve
.errno
== -errno
.ENOENT
and force
):
454 ret
= self
.volume_exception_to_retval(ve
)
457 def getpath_subvolume_group(self
, **kwargs
):
458 volname
= kwargs
['vol_name']
459 groupname
= kwargs
['group_name']
462 with
open_volume(self
, volname
) as fs_handle
:
463 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
464 return 0, group
.path
.decode('utf-8'), ""
465 except VolumeException
as ve
:
466 return self
.volume_exception_to_retval(ve
)
468 def list_subvolume_groups(self
, **kwargs
):
469 volname
= kwargs
['vol_name']
472 with
open_volume(self
, volname
) as fs_handle
:
473 groups
= listdir(fs_handle
, self
.volspec
.base_dir
)
474 ret
= 0, name_to_json(groups
), ""
475 except VolumeException
as ve
:
476 if not ve
.errno
== -errno
.ENOENT
:
477 ret
= self
.volume_exception_to_retval(ve
)
482 def create_subvolume_group_snapshot(self
, **kwargs
):
484 volname
= kwargs
['vol_name']
485 groupname
= kwargs
['group_name']
486 snapname
= kwargs
['snap_name']
489 with
open_volume(self
, volname
) as fs_handle
:
490 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
491 group
.create_snapshot(snapname
)
492 except VolumeException
as ve
:
493 ret
= self
.volume_exception_to_retval(ve
)
496 def remove_subvolume_group_snapshot(self
, **kwargs
):
498 volname
= kwargs
['vol_name']
499 groupname
= kwargs
['group_name']
500 snapname
= kwargs
['snap_name']
501 force
= kwargs
['force']
504 with
open_volume(self
, volname
) as fs_handle
:
505 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
506 group
.remove_snapshot(snapname
)
507 except VolumeException
as ve
:
508 if not (ve
.errno
== -errno
.ENOENT
and force
):
509 ret
= self
.volume_exception_to_retval(ve
)
512 def list_subvolume_group_snapshots(self
, **kwargs
):
514 volname
= kwargs
['vol_name']
515 groupname
= kwargs
['group_name']
518 with
open_volume(self
, volname
) as fs_handle
:
519 with
open_group(fs_handle
, self
.volspec
, groupname
) as group
:
520 snapshots
= group
.list_snapshots()
521 ret
= 0, name_to_json(snapshots
), ""
522 except VolumeException
as ve
:
523 ret
= self
.volume_exception_to_retval(ve
)