]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/volume.py
8edaca6def12901b414b7db66c6efeb2efbba153
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / volume.py
1 import json
2 import errno
3 import logging
4 from threading import Event
5
6 import cephfs
7
8 from .fs_util import listdir
9
10 from .operations.volume import ConnectionPool, open_volume, create_volume, \
11 delete_volume, list_volumes, get_pool_names
12 from .operations.group import open_group, create_group, remove_group
13 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
14 create_clone
15
16 from .vol_spec import VolSpec
17 from .exception import VolumeException
18 from .async_cloner import Cloner
19 from .purge_queue import ThreadPoolPurgeQueueMixin
20
21 log = logging.getLogger(__name__)
22
def octal_str_to_decimal_int(mode):
    """
    parse an octal string (e.g. '755') into its integer value

    :param mode: string holding a base-8 number
    :return: integer value of mode
    :raises VolumeException: built with -errno.EINVAL when int() rejects
                             the string
    """
    try:
        decimal_mode = int(mode, 8)
    except ValueError:
        raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
    else:
        return decimal_mode
28
def name_to_json(names):
    """
    convert an iterable of utf-8 encoded names to json

    :param names: iterable of bytes objects; any iterable is accepted, it
                  does not need to support len() or indexing
    :return: json string (4-space indent, sorted keys) holding one
             {"name": <decoded name>} object per entry, in input order
    """
    entries = [{'name': name.decode('utf-8')} for name in names]
    return json.dumps(entries, indent=4, sort_keys=True)
37
38 class VolumeClient(object):
def __init__(self, mgr):
    """
    set up the connection pool, the cloner and the purge queue, then queue
    an initial clone job and purge job for every filesystem in the fs map.

    :param mgr: manager handle; rados.conf_get() and get() are used here
    """
    self.mgr = mgr
    self.stopping = Event()
    # volume specification, built from the 'client_snapdir' config value
    snapdir = mgr.rados.conf_get('client_snapdir')
    self.volspec = VolSpec(snapdir)
    self.connection_pool = ConnectionPool(self.mgr)
    # TODO: make thread pool size configurable
    self.cloner = Cloner(self, 4)
    self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
    # on startup, queue jobs for available volumes to kickstart purge for
    # leftover subvolume entries in trash (and, likewise, a clone job per
    # volume). note that, if the trash directory does not exist or if
    # there are no purge entries available for a volume, the volume is
    # removed from the purge job list.
    for fs in self.mgr.get('fs_map')['filesystems']:
        fs_name = fs['mdsmap']['fs_name']
        self.cloner.queue_job(fs_name)
        self.purge_queue.queue_job(fs_name)
57
def is_stopping(self):
    """
    tell whether shutdown has been flagged

    :return: True once the `stopping` event is set, False otherwise
    """
    stop_event = self.stopping
    return stop_event.is_set()
60
def shutdown(self):
    """
    flag shutdown, cancel purge jobs and drop all pooled handles -- in
    that order.
    """
    log.info("shutting down")
    # step 1: raise the stopping flag so is_stopping() reports shutdown
    self.stopping.set()
    # step 2: ask purge threads to quit
    self.purge_queue.cancel_all_jobs()
    # step 3: delete all libcephfs handles from connection pool
    self.connection_pool.del_all_handles()
69
def cluster_log(self, msg, lvl=None):
    """
    log to cluster log with default log level as WARN.

    :param msg: message to log on the "cluster" channel
    :param lvl: log priority. only None selects the default
                (self.mgr.CLUSTER_LOG_PRIO_WARN); an explicit but falsy
                priority such as 0 is passed through unchanged instead
                of being silently replaced by WARN.
    """
    if lvl is None:
        lvl = self.mgr.CLUSTER_LOG_PRIO_WARN
    self.mgr.cluster_log("cluster", lvl, msg)
77
def volume_exception_to_retval(self, ve):
    """
    translate a volume exception into its tuple form

    :param ve: exception object exposing to_tuple()
    :return: whatever ve.to_tuple() yields
    """
    retval = ve.to_tuple()
    return retval
83
84 ### volume operations -- create, rm, ls
85
def create_fs_volume(self, volname, placement):
    """
    create a volume unless shutdown is in progress

    :param volname: name of the volume to create
    :param placement: placement argument handed to create_volume()
    :return: (-ESHUTDOWN, "", message) while stopping, else the result of
             create_volume()
    """
    if not self.is_stopping():
        return create_volume(self.mgr, volname, placement)
    return -errno.ESHUTDOWN, "", "shutdown in progress"
90
def delete_fs_volume(self, volname, confirm):
    """
    delete a volume after a series of safety checks

    :param volname: name of the volume to delete
    :param confirm: must equal "--yes-i-really-mean-it" to proceed
    :return: a 3-tuple (return code, output, message):
             -ESHUTDOWN while stopping; -EPERM when confirmation is
             missing or the mon_allow_pool_delete option reads as false;
             -ENOENT when no metadata pool is found for the volume;
             otherwise the result of delete_volume()
    """
    if self.is_stopping():
        return -errno.ESHUTDOWN, "", "shutdown in progress"

    if confirm != "--yes-i-really-mean-it":
        warning = "WARNING: this will *PERMANENTLY DESTROY* all data " \
            "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
            "that is what you want, re-issue the command followed by " \
            "--yes-i-really-mean-it."
        return -errno.EPERM, "", warning.format(volname)

    # ask for the current value of the pool deletion guard
    config_get_cmd = {
        'prefix': 'config get',
        'key': 'mon_allow_pool_delete',
        'who': 'mon',
        'format': 'json',
    }
    _, out, _ = self.mgr.check_mon_command(config_get_cmd)
    if not json.loads(out):
        return -errno.EPERM, "", "pool deletion is disabled; you must first " \
            "set the mon_allow_pool_delete config option to true before volumes " \
            "can be deleted"

    metadata_pool, data_pools = get_pool_names(self.mgr, volname)
    if not metadata_pool:
        return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)

    # stop purge jobs and drop the pooled handle before the actual removal
    self.purge_queue.cancel_jobs(volname)
    self.connection_pool.del_fs_handle(volname, wait=True)
    return delete_volume(self.mgr, volname, metadata_pool, data_pools)
119
def list_fs_volumes(self):
    """
    list volumes as json

    :return: (-ESHUTDOWN, "", message) while stopping, else
             (0, json list from list_volumes(), "")
    """
    # go through is_stopping() like the other volume operations do,
    # rather than poking at the event directly
    if self.is_stopping():
        return -errno.ESHUTDOWN, "", "shutdown in progress"
    volumes = list_volumes(self.mgr)
    return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
125
126 ### subvolume operations
127
128 def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
129 size = kwargs['size']
130 pool = kwargs['pool_layout']
131 uid = kwargs['uid']
132 gid = kwargs['gid']
133 mode = kwargs['mode']
134 isolate_nspace = kwargs['namespace_isolated']
135
136 oct_mode = octal_str_to_decimal_int(mode)
137 try:
138 create_subvol(
139 fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid)
140 except VolumeException as ve:
141 # kick the purge threads for async removal -- note that this
142 # assumes that the subvolume is moved to trashcan for cleanup on error.
143 self.purge_queue.queue_job(volname)
144 raise ve
145
def create_subvolume(self, **kwargs):
    """
    create a subvolume, or update its attributes when it already exists

    required kwargs: vol_name, sub_name, group_name, size, pool_layout,
    uid, gid, namespace_isolated (mode is additionally read by
    _create_subvolume() when the subvolume has to be created).

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException that was raised
    """
    ret = 0, "", ""
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']
    size = kwargs['size']
    pool = kwargs['pool_layout']
    uid = kwargs['uid']
    gid = kwargs['gid']
    isolate_nspace = kwargs['namespace_isolated']

    try:
        with open_volume(self, volname) as fs_handle:
            with open_group(fs_handle, self.volspec, groupname) as group:
                try:
                    with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
                        # idempotent creation -- valid. Attributes set is supported.
                        # only a missing (None) uid/gid falls back to the
                        # subvolume's current owner: 0 is a legitimate id
                        # (root) and must not be mistaken for "not given".
                        if uid is None:
                            uid = subvolume.uid
                        if gid is None:
                            gid = subvolume.gid
                        subvolume.set_attrs(subvolume.path, size, isolate_nspace, pool, uid, gid)
                except VolumeException as ve:
                    if ve.errno == -errno.ENOENT:
                        self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
                    else:
                        raise
    except VolumeException as ve:
        # volume/group does not exist or subvolume creation failed
        ret = self.volume_exception_to_retval(ve)
    return ret
175
def remove_subvolume(self, **kwargs):
    """
    remove a subvolume and queue a purge job for its volume

    required kwargs: vol_name, sub_name, group_name, force.

    :return: (0, "", "") on success, or when the failure is -ENOENT and
             force is set; otherwise the tuple form of the VolumeException
             (an -EAGAIN message gets a "--force" hint appended)
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']
    force = kwargs['force']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            remove_subvol(fs_handle, self.volspec, group, subvolname, force)
            # kick the purge threads for async removal -- note that this
            # assumes that the subvolume is moved to trash can.
            # TODO: make purge queue as singleton so that trash can kicks
            # the purge threads on dump.
            self.purge_queue.queue_job(volname)
    except VolumeException as ve:
        if ve.errno == -errno.EAGAIN:
            hinted = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
            result = self.volume_exception_to_retval(hinted)
        elif ve.errno != -errno.ENOENT or not force:
            result = self.volume_exception_to_retval(ve)
    return result
199
def resize_subvolume(self, **kwargs):
    """
    resize a subvolume and report usage against the new size

    required kwargs: vol_name, sub_name, new_size, no_shrink, group_name.

    :return: (0, json, "") where json lists bytes_used, bytes_quota and
             bytes_pcent ("undefined" when the resulting size is 0);
             otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    newsize = kwargs['new_size']
    noshrink = kwargs['no_shrink']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            nsize, usedbytes = subvolume.resize(newsize, noshrink)
            # a size of 0 has no meaningful usage percentage
            if nsize == 0:
                pcent = "undefined"
            else:
                pcent = '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)
            report = [{'bytes_used': usedbytes}, {'bytes_quota': nsize},
                      {'bytes_pcent': pcent}]
            result = 0, json.dumps(report, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
220
def subvolume_pin(self, **kwargs):
    """
    apply a pin setting to a subvolume

    required kwargs: vol_name, sub_name, pin_type, pin_setting, group_name.

    :return: (0, "{}", "") on success, else the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    pin_type = kwargs['pin_type']
    pin_setting = kwargs['pin_setting']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            subvolume.pin(pin_type, pin_setting)
            result = 0, json.dumps({}), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
238
def subvolume_getpath(self, **kwargs):
    """
    fetch the path of a subvolume

    required kwargs: vol_name, sub_name, group_name.

    :return: (0, utf-8 decoded subvolume path, "") on success, else the
             tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    result = None
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            raw_path = subvolume.path
            result = 0, raw_path.decode("utf-8"), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
254
def subvolume_info(self, **kwargs):
    """
    report subvolume details together with monitor addresses

    required kwargs: vol_name, sub_name, group_name.

    :return: (0, json, "") where json is subvolume.info() extended with a
             "mon_addrs" list; otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    result = None
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            monitors = self.mgr.get('mon_map')['mons']
            # keep only the part of each address in front of the first "/"
            mon_addrs = [mon['addr'].split("/")[0] for mon in monitors]

            info = subvolume.info()
            info["mon_addrs"] = mon_addrs
            result = 0, json.dumps(info, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
277
def list_subvolumes(self, **kwargs):
    """
    list the subvolumes of a group as json

    required kwargs: vol_name, group_name.

    :return: (0, json name list, "") on success, else the tuple form of
             the VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            names = group.list_subvolumes()
            result = 0, name_to_json(names), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
291
292 ### subvolume snapshot
293
def create_subvolume_snapshot(self, **kwargs):
    """
    snapshot a subvolume

    required kwargs: vol_name, sub_name, snap_name, group_name.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            subvolume.create_snapshot(snapname)
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
309
def remove_subvolume_snapshot(self, **kwargs):
    """
    remove a subvolume snapshot

    required kwargs: vol_name, sub_name, snap_name, group_name, force.

    :return: (0, "", "") on success, or when the failure is -ENOENT and
             force is set; otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']
    force = kwargs['force']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            subvolume.remove_snapshot(snapname)
    except VolumeException as ve:
        # a missing entry is tolerated only when force was requested
        if ve.errno != -errno.ENOENT or not force:
            result = self.volume_exception_to_retval(ve)
    return result
327
def subvolume_snapshot_info(self, **kwargs):
    """
    report details of a subvolume snapshot as json

    required kwargs: vol_name, sub_name, snap_name, group_name.

    :return: (0, json of subvolume.snapshot_info(), "") on success, else
             the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    snapname = kwargs['snap_name']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            details = subvolume.snapshot_info(snapname)
            result = 0, json.dumps(details, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
344
def list_subvolume_snapshots(self, **kwargs):
    """
    list the snapshots of a subvolume as json

    required kwargs: vol_name, sub_name, group_name.

    :return: (0, json name list, "") on success, else the tuple form of
             the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            snap_names = subvolume.list_snapshots()
            result = 0, name_to_json(snap_names), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
360
def protect_subvolume_snapshot(self, **kwargs):
    """
    deprecated no-op: opens the subvolume and logs a deprecation warning

    required kwargs: vol_name, sub_name, group_name.

    :return: (0, "", deprecation message) when the subvolume can be
             opened, else the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    result = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
    try:
        # opening the subvolume is all that happens here; nothing is done
        # with it afterwards
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname):
            log.warning("snapshot protect call is deprecated and will be removed in a future release")
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
375
def unprotect_subvolume_snapshot(self, **kwargs):
    """
    deprecated no-op: opens the subvolume and logs a deprecation warning

    required kwargs: vol_name, sub_name, group_name.

    :return: (0, "", deprecation message) when the subvolume can be
             opened, else the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    result = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
    try:
        # opening the subvolume is all that happens here; nothing is done
        # with it afterwards
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname):
            log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
390
def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool):
    """
    create the clone target, attach the source snapshot to it and queue a
    clone job for the volume.

    if attaching or queueing raises a VolumeException, removal of the
    target is attempted (a failure there is only logged) and the original
    exception is raised again.
    """
    create_clone(fs_handle, self.volspec, target_group, target_subvolname,
                 target_pool, volname, subvolume, snapname)
    with open_subvol(fs_handle, self.volspec, target_group, target_subvolname,
                     need_complete=False) as clone_target:
        try:
            subvolume.attach_snapshot(snapname, clone_target)
            self.cloner.queue_job(volname)
        except VolumeException as ve:
            # best-effort rollback of the half-prepared clone
            try:
                clone_target.remove()
                self.purge_queue.queue_job(volname)
            except Exception as cleanup_err:
                log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, cleanup_err))
            raise ve
404
405 def _clone_subvolume_snapshot(self, fs_handle, volname, subvolume, **kwargs):
406 snapname = kwargs['snap_name']
407 target_pool = kwargs['pool_layout']
408 target_subvolname = kwargs['target_sub_name']
409 target_groupname = kwargs['target_group_name']
410
411 if not snapname.encode('utf-8') in subvolume.list_snapshots():
412 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
413
414 # TODO: when the target group is same as source, reuse group object.
415 with open_group(fs_handle, self.volspec, target_groupname) as target_group:
416 try:
417 with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False):
418 raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
419 except VolumeException as ve:
420 if ve.errno == -errno.ENOENT:
421 self._prepare_clone_subvolume(fs_handle, volname, subvolume, snapname,
422 target_group, target_subvolname, target_pool)
423 else:
424 raise
425
def clone_subvolume_snapshot(self, **kwargs):
    """
    start cloning a subvolume snapshot into a new subvolume

    required kwargs: vol_name, sub_name, group_name, plus those read by
    _clone_subvolume_snapshot().

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
            self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs)
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
440
def clone_status(self, **kwargs):
    """
    report the status of a clone as json

    required kwargs: vol_name, clone_name, group_name.

    :return: (0, json holding the clone's status, "") on success, else
             the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    clonename = kwargs['clone_name']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group, \
                open_subvol(fs_handle, self.volspec, group, clonename,
                            need_complete=False, expected_types=["clone"]) as clone:
            status_doc = {'status' : clone.status}
            result = 0, json.dumps(status_doc, indent=2), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
456
def clone_cancel(self, **kwargs):
    """
    cancel a clone job

    required kwargs: vol_name, clone_name, group_name.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    clonename = kwargs['clone_name']
    groupname = kwargs['group_name']

    # the job is addressed by its (clone name, group name) pair
    job_key = (clonename, groupname)
    try:
        self.cloner.cancel_job(volname, job_key)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
468
469 ### group operations
470
def create_subvolume_group(self, **kwargs):
    """
    create a subvolume group; an already existing group is left untouched

    required kwargs: vol_name, group_name, pool_layout, uid, gid, mode
    (octal string).

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    pool = kwargs['pool_layout']
    uid = kwargs['uid']
    gid = kwargs['gid']
    mode = kwargs['mode']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle:
            try:
                with open_group(fs_handle, self.volspec, groupname):
                    # idempotent creation -- valid.
                    pass
            except VolumeException as ve:
                if ve.errno != -errno.ENOENT:
                    raise
                numeric_mode = octal_str_to_decimal_int(mode)
                create_group(fs_handle, self.volspec, groupname, pool, numeric_mode, uid, gid)
    except VolumeException as ve:
        # volume does not exist or subvolume group creation failed
        result = self.volume_exception_to_retval(ve)
    return result
496
def remove_subvolume_group(self, **kwargs):
    """
    remove a subvolume group

    required kwargs: vol_name, group_name, force.

    :return: (0, "", "") on success, or when the failure is -ENOENT and
             force is set; otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    force = kwargs['force']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle:
            remove_group(fs_handle, self.volspec, groupname)
    except VolumeException as ve:
        # a missing entry is tolerated only when force was requested
        if ve.errno != -errno.ENOENT or not force:
            result = self.volume_exception_to_retval(ve)
    return result
510
def getpath_subvolume_group(self, **kwargs):
    """
    fetch the path of a subvolume group

    required kwargs: vol_name, group_name.

    :return: (0, utf-8 decoded group path, "") on success, else the tuple
             form of the VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group_path = group.path.decode('utf-8')
            return 0, group_path, ""
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
521
def list_subvolume_groups(self, **kwargs):
    """
    list the subvolume groups of a volume as json

    required kwargs: vol_name.

    :return: (0, json name list, "") on success; (0, '[]', "") when the
             failure is -ENOENT; otherwise the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']

    result = 0, '[]', ""
    try:
        with open_volume(self, volname) as fs_handle:
            group_names = listdir(fs_handle, self.volspec.base_dir)
            result = 0, name_to_json(group_names), ""
    except VolumeException as ve:
        # -ENOENT is reported as an empty listing, not as an error
        if ve.errno != -errno.ENOENT:
            result = self.volume_exception_to_retval(ve)
    return result
533
def pin_subvolume_group(self, **kwargs):
    """
    apply a pin setting to a subvolume group

    required kwargs: vol_name, group_name, pin_type, pin_setting.

    :return: (0, "{}", "") on success, else the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    pin_type = kwargs['pin_type']
    pin_setting = kwargs['pin_setting']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group.pin(pin_type, pin_setting)
            result = 0, json.dumps({}), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
549
550 ### group snapshot
551
def create_subvolume_group_snapshot(self, **kwargs):
    """
    snapshot a subvolume group

    required kwargs: vol_name, group_name, snap_name.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    snapname = kwargs['snap_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group.create_snapshot(snapname)
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
565
def remove_subvolume_group_snapshot(self, **kwargs):
    """
    remove a subvolume group snapshot

    required kwargs: vol_name, group_name, snap_name, force.

    :return: (0, "", "") on success, or when the failure is -ENOENT and
             force is set; otherwise the tuple form of the VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']
    snapname = kwargs['snap_name']
    force = kwargs['force']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            group.remove_snapshot(snapname)
    except VolumeException as ve:
        # a missing entry is tolerated only when force was requested
        if ve.errno != -errno.ENOENT or not force:
            result = self.volume_exception_to_retval(ve)
    return result
581
def list_subvolume_group_snapshots(self, **kwargs):
    """
    list the snapshots of a subvolume group as json

    required kwargs: vol_name, group_name.

    :return: (0, json name list, "") on success, else the tuple form of
             the VolumeException
    """
    volname = kwargs['vol_name']
    groupname = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, volname) as fs_handle, \
                open_group(fs_handle, self.volspec, groupname) as group:
            snap_names = group.list_snapshots()
            result = 0, name_to_json(snap_names), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result