]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/volume.py
import 15.2.4
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / volume.py
1 import json
2 import errno
3 import logging
4 from threading import Event
5
6 import cephfs
7
8 from .fs_util import listdir
9
10 from .operations.volume import ConnectionPool, open_volume, create_volume, \
11 delete_volume, list_volumes
12 from .operations.group import open_group, create_group, remove_group
13 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
14 create_clone
15
16 from .vol_spec import VolSpec
17 from .exception import VolumeException
18 from .async_cloner import Cloner
19 from .purge_queue import ThreadPoolPurgeQueueMixin
20
21 log = logging.getLogger(__name__)
22
def octal_str_to_decimal_int(mode):
    """
    convert an octal mode string (e.g. "755") to its integer value.

    :param mode: string holding an octal number
    :return: integer value of `mode` interpreted in base 8
    :raises VolumeException: with -EINVAL when `mode` cannot be parsed as octal
    """
    try:
        return int(mode, 8)
    except (TypeError, ValueError):
        # int() with an explicit base raises TypeError (not ValueError) for
        # non-string input such as None or an int -- report both the same way.
        raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
28
def name_to_json(names):
    """
    convert the list of names to json

    :param names: iterable of utf-8 encoded byte strings
    :return: json array string holding one {"name": <decoded name>} object
             per entry, in input order
    """
    namedict = [{'name': name.decode('utf-8')} for name in names]
    return json.dumps(namedict, indent=4, sort_keys=True)
37
38 class VolumeClient(object):
def __init__(self, mgr):
    """
    set up the volume client: stop event, volume spec, connection pool and
    the cloner / purge worker pools, then queue jobs for every filesystem
    currently listed in the fs map.

    :param mgr: manager module handle (used for config, fs map and logging)
    """
    self.mgr = mgr
    self.stopping = Event()
    # volume specification
    self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
    self.connection_pool = ConnectionPool(self.mgr)
    # TODO: make thread pool size configurable
    self.cloner = Cloner(self, 4)
    self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
    # on startup, queue purge job for available volumes to kickstart
    # purge for leftover subvolume entries in trash. note that, if the
    # trash directory does not exist or if there are no purge entries
    # available for a volume, the volume is removed from the purge
    # job list.
    for filesystem in self.mgr.get('fs_map')['filesystems']:
        fs_name = filesystem['mdsmap']['fs_name']
        self.cloner.queue_job(fs_name)
        self.purge_queue.queue_job(fs_name)
57
def is_stopping(self):
    """
    tell whether shutdown has been requested.

    :return: True once the `stopping` event is set, False otherwise
    """
    shutdown_requested = self.stopping.is_set()
    return shutdown_requested
60
def shutdown(self):
    """
    stop the volume client: flag the shutdown, cancel purge jobs and drop
    all pooled filesystem handles -- in that order.
    """
    log.info("shutting down")
    # step 1: record that shutdown is in progress so new requests bail out
    self.stopping.set()
    # step 2: tell the purge threads to quit
    self.purge_queue.cancel_all_jobs()
    # step 3: release every libcephfs handle held by the connection pool
    self.connection_pool.del_all_handles()
69
def cluster_log(self, msg, lvl=None):
    """
    log to cluster log with default log level as WARN.

    :param msg: message to write to the "cluster" channel
    :param lvl: cluster log priority; when None, the manager's
                CLUSTER_LOG_PRIO_WARN is used
    """
    # compare against None rather than truthiness: a priority whose numeric
    # value is 0 is a legitimate explicit level and must not be replaced
    # by the WARN default.
    if lvl is None:
        lvl = self.mgr.CLUSTER_LOG_PRIO_WARN
    self.mgr.cluster_log("cluster", lvl, msg)
77
def volume_exception_to_retval(self, ve):
    """
    convert a volume exception into its tuple form.

    :param ve: volume exception to convert
    :return: whatever `ve.to_tuple()` yields
    """
    retval = ve.to_tuple()
    return retval
83
84 ### volume operations -- create, rm, ls
85
def create_fs_volume(self, volname, placement):
    """
    create a volume unless shutdown is in progress.

    :param volname: name of the volume to create
    :param placement: placement argument handed through to create_volume()
    :return: result of create_volume(), or (-ESHUTDOWN, "", message) while
             shutting down
    """
    if not self.is_stopping():
        return create_volume(self.mgr, volname, placement)
    return -errno.ESHUTDOWN, "", "shutdown in progress"
90
def delete_fs_volume(self, volname, confirm):
    """
    delete a volume after explicit confirmation.

    :param volname: name of the volume to delete
    :param confirm: must equal "--yes-i-really-mean-it" for deletion to proceed
    :return: result of delete_volume(); (-ESHUTDOWN, "", message) while
             shutting down; (-EPERM, "", warning) without confirmation
    """
    if self.is_stopping():
        return -errno.ESHUTDOWN, "", "shutdown in progress"

    if confirm == "--yes-i-really-mean-it":
        # cancel purge jobs and drop the pooled handle before deleting
        self.purge_queue.cancel_jobs(volname)
        self.connection_pool.del_fs_handle(volname, wait=True)
        return delete_volume(self.mgr, volname)

    warning = ("WARNING: this will *PERMANENTLY DESTROY* all data "
               "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* "
               "that is what you want, re-issue the command followed by "
               "--yes-i-really-mean-it.").format(volname)
    return -errno.EPERM, "", warning
104
def list_fs_volumes(self):
    """
    list volumes as json.

    :return: (0, json list of volumes, "") on success, or
             (-ESHUTDOWN, "", message) while shutting down
    """
    # go through is_stopping() like the other volume operations instead of
    # poking at the event object directly
    if self.is_stopping():
        return -errno.ESHUTDOWN, "", "shutdown in progress"
    volumes = list_volumes(self.mgr)
    return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
110
111 ### subvolume operations
112
def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
    """
    create a new subvolume inside `group`.

    reads 'size', 'pool_layout', 'uid', 'gid', 'mode' and
    'namespace_isolated' from kwargs; 'mode' is an octal string.

    :raises VolumeException: on an invalid mode or when creation fails
    """
    size, pool, uid, gid, mode, isolate_nspace = (
        kwargs[key] for key in
        ('size', 'pool_layout', 'uid', 'gid', 'mode', 'namespace_isolated'))

    oct_mode = octal_str_to_decimal_int(mode)
    try:
        create_subvol(
            fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid)
    except VolumeException:
        # kick the purge threads for async removal -- note that this
        # assumes that the subvolume is moved to trashcan for cleanup on error.
        self.purge_queue.queue_job(volname)
        raise
130
def create_subvolume(self, **kwargs):
    """
    create a subvolume, or update the attributes of an existing one.

    reads 'vol_name', 'sub_name', 'group_name', 'size', 'pool_layout',
    'uid', 'gid' and 'namespace_isolated' from kwargs (plus 'mode' when the
    subvolume has to be created).

    :return: (0, "", "") on success, otherwise the tuple form of the
             VolumeException that was raised
    """
    ret = 0, "", ""
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']
    size = kwargs['size']
    pool = kwargs['pool_layout']
    uid = kwargs['uid']
    gid = kwargs['gid']
    isolate_nspace = kwargs['namespace_isolated']

    try:
        with open_volume(self, volname) as fs_handle:
            with open_group(fs_handle, self.volspec, groupname) as group:
                try:
                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
                        # idempotent creation -- valid. Attributes set is supported.
                        # fall back to the current owner only when no id was
                        # supplied; an explicit 0 (root) is a real request and
                        # must not be mistaken for "unset".
                        # NOTE(review): assumes an omitted uid/gid arrives as None -- confirm with caller.
                        uid = subvolume.uid if uid is None else uid
                        gid = subvolume.gid if gid is None else gid
                        subvolume.set_attrs(subvolume.path, size, isolate_nspace, pool, uid, gid)
                except VolumeException as ve:
                    if ve.errno == -errno.ENOENT:
                        # subvolume does not exist yet -- create it
                        self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
                    else:
                        raise
    except VolumeException as ve:
        # volume/group does not exist or subvolume creation failed
        ret = self.volume_exception_to_retval(ve)
    return ret
160
def remove_subvolume(self, **kwargs):
    """
    remove a subvolume and kick the purge threads.

    reads 'vol_name', 'sub_name', 'group_name' and 'force' from kwargs.

    :return: (0, "", "") on success or when the subvolume is missing and
             'force' is set; otherwise the tuple form of the VolumeException
             (an -EAGAIN error gets a "use --force" hint appended)
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']
    force = kwargs['force']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            remove_subvol(fs_handle, self.volspec, group, subvolname, force)
            # kick the purge threads for async removal -- note that this
            # assumes that the subvolume is moved to trash can.
            # TODO: make purge queue as singleton so that trash can kicks
            # the purge threads on dump.
            self.purge_queue.queue_job(volname)
    except VolumeException as ve:
        if ve.errno == -errno.EAGAIN:
            hinted = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
            return self.volume_exception_to_retval(hinted)
        if ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
184
def resize_subvolume(self, **kwargs):
    """
    resize a subvolume and report usage against the new quota.

    reads 'vol_name', 'sub_name', 'new_size', 'no_shrink' and 'group_name'
    from kwargs.

    :return: (0, json, "") where json lists bytes_used, bytes_quota and
             bytes_pcent ("undefined" when the quota is 0); otherwise the
             tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    newsize = kwargs['new_size']
    noshrink = kwargs['no_shrink']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            nsize, usedbytes = subvolume.resize(newsize, noshrink)
            if nsize == 0:
                # a zero quota has no meaningful percentage
                pcent = "undefined"
            else:
                pcent = '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)
            report = [{'bytes_used': usedbytes}, {'bytes_quota': nsize}, {'bytes_pcent': pcent}]
            return 0, json.dumps(report, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
205
def subvolume_getpath(self, **kwargs):
    """
    fetch the path of a subvolume.

    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, utf-8 decoded subvolume path, "") on success, otherwise the
             tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            return 0, subvolume.path.decode("utf-8"), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
221
def subvolume_info(self, **kwargs):
    """
    fetch subvolume metadata, augmented with the monitor addresses.

    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, json of subvolume.info() plus a "mon_addrs" list, "") on
             success, otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            # keep only the part of each mon address before the first "/"
            mon_addrs = [mon['addr'].split("/")[0]
                         for mon in self.mgr.get('mon_map')['mons']]

            info = subvolume.info()
            info["mon_addrs"] = mon_addrs
            return 0, json.dumps(info, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
244
def list_subvolumes(self, **kwargs):
    """
    list the subvolumes of a group as json.

    reads 'vol_name' and 'group_name' from kwargs.

    :return: (0, json list of {"name": ...} entries, "") on success,
             otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            return 0, name_to_json(group.list_subvolumes()), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
258
259 ### subvolume snapshot
260
def create_subvolume_snapshot(self, **kwargs):
    """
    snapshot a subvolume.

    reads 'vol_name', 'sub_name', 'snap_name' and 'group_name' from kwargs.

    :return: (0, "", "") on success, otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            subvolume.create_snapshot(snapname)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
276
def remove_subvolume_snapshot(self, **kwargs):
    """
    remove a subvolume snapshot.

    reads 'vol_name', 'sub_name', 'snap_name', 'group_name' and 'force'
    from kwargs.

    :return: (0, "", "") on success or when something is missing (-ENOENT)
             and 'force' is set; otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']
    force = kwargs['force']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            subvolume.remove_snapshot(snapname)
    except VolumeException as ve:
        # with force, a missing entry is not treated as an error
        if ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
294
def subvolume_snapshot_info(self, **kwargs):
    """
    fetch metadata of a subvolume snapshot.

    reads 'vol_name', 'sub_name', 'snap_name' and 'group_name' from kwargs.

    :return: (0, json of subvolume.snapshot_info(), "") on success,
             otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            snap_info = subvolume.snapshot_info(snapname)
            return 0, json.dumps(snap_info, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
311
def list_subvolume_snapshots(self, **kwargs):
    """
    list the snapshots of a subvolume as json.

    reads 'vol_name', 'sub_name' and 'group_name' from kwargs.

    :return: (0, json list of {"name": ...} entries, "") on success,
             otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            return 0, name_to_json(subvolume.list_snapshots()), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
327
def protect_subvolume_snapshot(self, **kwargs):
    """
    protect a subvolume snapshot.

    reads 'vol_name', 'sub_name', 'snap_name' and 'group_name' from kwargs.

    :return: (0, "", "") on success, otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            subvolume.protect_snapshot(snapname)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
343
def unprotect_subvolume_snapshot(self, **kwargs):
    """
    unprotect a subvolume snapshot.

    reads 'vol_name', 'sub_name', 'snap_name' and 'group_name' from kwargs.

    :return: (0, "", "") on success, otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            subvolume.unprotect_snapshot(snapname)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
359
def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool):
    """
    create the clone target, attach the source snapshot to it and queue a
    clone job. if attaching or queueing fails, the target is removed again
    (best effort) before the error is re-raised.

    :raises VolumeException: when clone creation, attach or queueing fails
    """
    create_clone(fs_handle, self.volspec, target_group, target_subvolname, target_pool, volname, subvolume, snapname)
    with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False) as clone:
        try:
            subvolume.attach_snapshot(snapname, clone)
            self.cloner.queue_job(volname)
        except VolumeException as attach_err:
            # best-effort cleanup: a failure here is only logged so that the
            # original error is the one reported to the caller
            try:
                clone.remove()
                self.purge_queue.queue_job(volname)
            except Exception as cleanup_err:
                log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, cleanup_err))
            raise attach_err
373
def _clone_subvolume_snapshot(self, fs_handle, volname, subvolume, **kwargs):
    """
    validate a clone request and start the clone.

    reads 'snap_name', 'pool_layout', 'target_sub_name' and
    'target_group_name' from kwargs.

    :raises VolumeException: -ENOENT when the snapshot does not exist,
            -EINVAL when it is not protected, -EEXIST when the target
            subvolume already exists, or whatever the clone setup raises
    """
    snapname = kwargs['snap_name']
    target_pool = kwargs['pool_layout']
    target_subvolname = kwargs['target_sub_name']
    target_groupname = kwargs['target_group_name']

    if snapname.encode('utf-8') not in subvolume.list_snapshots():
        raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
    if not subvolume.is_snapshot_protected(snapname):
        raise VolumeException(-errno.EINVAL, "snapshot '{0}' is not protected".format(snapname))

    # TODO: when the target group is same as source, reuse group object.
    with open_group(fs_handle, self.volspec, target_groupname) as target_group:
        try:
            # being able to open the target means it already exists
            with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False):
                raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
        except VolumeException as ve:
            if ve.errno != -errno.ENOENT:
                raise
            self._prepare_clone_subvolume(fs_handle, volname, subvolume, snapname,
                                          target_group, target_subvolname, target_pool)
396
def clone_subvolume_snapshot(self, **kwargs):
    """
    clone a subvolume snapshot into a new subvolume.

    reads 'vol_name', 'sub_name' and 'group_name' from kwargs; all kwargs
    are forwarded to _clone_subvolume_snapshot().

    :return: (0, "", "") on success, otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
411
def clone_status(self, **kwargs):
    """
    report the status of a clone.

    reads 'vol_name', 'clone_name' and 'group_name' from kwargs.

    :return: (0, json {"status": ...}, "") on success, otherwise the tuple
             form of the VolumeException
    """
    volname = kwargs['vol_name']
    clonename = kwargs['clone_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, clonename,
                            need_complete=False, expected_types=["clone"]) as clone:
            status_doc = {'status' : clone.status}
            return 0, json.dumps(status_doc, indent=2), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
427
def clone_cancel(self, **kwargs):
    """
    cancel a clone job.

    reads 'vol_name', 'clone_name' and 'group_name' from kwargs; the job is
    identified to the cloner by the (clone name, group name) pair.

    :return: (0, "", "") on success, otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    clonename = kwargs['clone_name']
    groupname = kwargs['group_name']

    job_id = (clonename, groupname)
    try:
        self.cloner.cancel_job(volname, job_id)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
439
440 ### group operations
441
def create_subvolume_group(self, **kwargs):
    """
    create a subvolume group; a group that already exists is left as is.

    reads 'vol_name', 'group_name', 'pool_layout', 'uid', 'gid' and 'mode'
    (octal string) from kwargs.

    :return: (0, "", "") on success, otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    pool = kwargs['pool_layout']
    uid = kwargs['uid']
    gid = kwargs['gid']
    mode = kwargs['mode']

    try:
        with open_volume(self, volname) as fs_handle:
            try:
                with open_group(fs_handle, self.volspec, groupname):
                    # idempotent creation -- valid.
                    pass
            except VolumeException as ve:
                if ve.errno != -errno.ENOENT:
                    raise
                # group is missing -- create it now
                oct_mode = octal_str_to_decimal_int(mode)
                create_group(fs_handle, self.volspec, groupname, pool, oct_mode, uid, gid)
    except VolumeException as ve:
        # volume does not exist or subvolume group creation failed
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
467
def remove_subvolume_group(self, **kwargs):
    """
    remove a subvolume group.

    reads 'vol_name', 'group_name' and 'force' from kwargs.

    :return: (0, "", "") on success or when something is missing (-ENOENT)
             and 'force' is set; otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    force = kwargs['force']

    try:
        with open_volume(self, volname) as fs_handle:
            remove_group(fs_handle, self.volspec, groupname)
    except VolumeException as ve:
        # with force, a missing entry is not treated as an error
        if ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
481
def getpath_subvolume_group(self, **kwargs):
    """
    fetch the path of a subvolume group.

    reads 'vol_name' and 'group_name' from kwargs.

    :return: (0, utf-8 decoded group path, "") on success, otherwise the
             tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group_path = group.path.decode('utf-8')
            return 0, group_path, ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
492
def list_subvolume_groups(self, **kwargs):
    """
    list the subvolume groups of a volume as json.

    reads 'vol_name' from kwargs.

    :return: (0, json list of {"name": ...} entries, "") on success,
             (0, '[]', "") when the lookup fails with -ENOENT, otherwise the
             tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    try:
        with open_volume(self, volname) as fs_handle:
            groups = listdir(fs_handle, self.volspec.base_dir)
            return 0, name_to_json(groups), ""
    except VolumeException as ve:
        if ve.errno == -errno.ENOENT:
            # nothing to list -- report an empty list instead of an error
            return 0, '[]', ""
        return self.volume_exception_to_retval(ve)
504
505 ### group snapshot
506
def create_subvolume_group_snapshot(self, **kwargs):
    """
    snapshot a subvolume group.

    reads 'vol_name', 'group_name' and 'snap_name' from kwargs.

    :return: (0, "", "") on success, otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    snapname = kwargs['snap_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group.create_snapshot(snapname)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
520
def remove_subvolume_group_snapshot(self, **kwargs):
    """
    remove a subvolume group snapshot.

    reads 'vol_name', 'group_name', 'snap_name' and 'force' from kwargs.

    :return: (0, "", "") on success or when something is missing (-ENOENT)
             and 'force' is set; otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    snapname = kwargs['snap_name']
    force = kwargs['force']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group.remove_snapshot(snapname)
    except VolumeException as ve:
        # with force, a missing entry is not treated as an error
        if ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
536
def list_subvolume_group_snapshots(self, **kwargs):
    """
    list the snapshots of a subvolume group as json.

    reads 'vol_name' and 'group_name' from kwargs.

    :return: (0, json list of {"name": ...} entries, "") on success,
             otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            return 0, name_to_json(group.list_snapshots()), ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)