]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/volume.py
5568d8b588dceb41f8538a3e6538db6030e27e2e
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / volume.py
1 import json
2 import errno
3 import logging
4 from threading import Event
5
6 import cephfs
7
8 from .fs_util import listdir
9
10 from .operations.volume import ConnectionPool, open_volume, create_volume, \
11 delete_volume, list_volumes
12 from .operations.group import open_group, create_group, remove_group
13 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
14 create_clone
15
16 from .vol_spec import VolSpec
17 from .exception import VolumeException
18 from .async_cloner import Cloner
19 from .purge_queue import ThreadPoolPurgeQueueMixin
20
21 log = logging.getLogger(__name__)
22
23 def octal_str_to_decimal_int(mode):
24 try:
25 return int(mode, 8)
26 except ValueError:
27 raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
28
29 def name_to_json(names):
30 """
31 convert the list of names to json
32 """
33 namedict = []
34 for i in range(len(names)):
35 namedict.append({'name': names[i].decode('utf-8')})
36 return json.dumps(namedict, indent=4, sort_keys=True)
37
38 class VolumeClient(object):
39 def __init__(self, mgr):
40 self.mgr = mgr
41 self.stopping = Event()
42 # volume specification
43 self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
44 self.connection_pool = ConnectionPool(self.mgr)
45 # TODO: make thread pool size configurable
46 self.cloner = Cloner(self, 4)
47 self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
48 # on startup, queue purge job for available volumes to kickstart
49 # purge for leftover subvolume entries in trash. note that, if the
50 # trash directory does not exist or if there are no purge entries
51 # available for a volume, the volume is removed from the purge
52 # job list.
53 fs_map = self.mgr.get('fs_map')
54 for fs in fs_map['filesystems']:
55 self.cloner.queue_job(fs['mdsmap']['fs_name'])
56 self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
57
58 def is_stopping(self):
59 return self.stopping.is_set()
60
61 def shutdown(self):
62 log.info("shutting down")
63 # first, note that we're shutting down
64 self.stopping.set()
65 # second, ask purge threads to quit
66 self.purge_queue.cancel_all_jobs()
67 # third, delete all libcephfs handles from connection pool
68 self.connection_pool.del_all_handles()
69
70 def cluster_log(self, msg, lvl=None):
71 """
72 log to cluster log with default log level as WARN.
73 """
74 if not lvl:
75 lvl = self.mgr.CLUSTER_LOG_PRIO_WARN
76 self.mgr.cluster_log("cluster", lvl, msg)
77
78 def volume_exception_to_retval(self, ve):
79 """
80 return a tuple representation from a volume exception
81 """
82 return ve.to_tuple()
83
84 ### volume operations -- create, rm, ls
85
86 def create_fs_volume(self, volname, placement):
87 if self.is_stopping():
88 return -errno.ESHUTDOWN, "", "shutdown in progress"
89 return create_volume(self.mgr, volname, placement)
90
91 def delete_fs_volume(self, volname, confirm):
92 if self.is_stopping():
93 return -errno.ESHUTDOWN, "", "shutdown in progress"
94
95 if confirm != "--yes-i-really-mean-it":
96 return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
97 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
98 "that is what you want, re-issue the command followed by " \
99 "--yes-i-really-mean-it.".format(volname)
100
101 self.purge_queue.cancel_jobs(volname)
102 self.connection_pool.del_fs_handle(volname, wait=True)
103 return delete_volume(self.mgr, volname)
104
105 def list_fs_volumes(self):
106 if self.stopping.is_set():
107 return -errno.ESHUTDOWN, "", "shutdown in progress"
108 volumes = list_volumes(self.mgr)
109 return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
110
111 ### subvolume operations
112
113 def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
114 size = kwargs['size']
115 pool = kwargs['pool_layout']
116 uid = kwargs['uid']
117 gid = kwargs['gid']
118 mode = kwargs['mode']
119
120 oct_mode = octal_str_to_decimal_int(mode)
121 try:
122 create_subvol(
123 fs_handle, self.volspec, group, subvolname, size, False, pool, oct_mode, uid, gid)
124 except VolumeException as ve:
125 # kick the purge threads for async removal -- note that this
126 # assumes that the subvolume is moved to trashcan for cleanup on error.
127 self.purge_queue.queue_job(volname)
128 raise ve
129
130 def create_subvolume(self, **kwargs):
131 ret = 0, "", ""
132 volname = kwargs['vol_name']
133 subvolname = kwargs['sub_name']
134 groupname = kwargs['group_name']
135
136 try:
137 with open_volume(self, volname) as fs_handle:
138 with open_group(fs_handle, self.volspec, groupname) as group:
139 try:
140 with open_subvol(fs_handle, self.volspec, group, subvolname):
141 # idempotent creation -- valid.
142 pass
143 except VolumeException as ve:
144 if ve.errno == -errno.ENOENT:
145 self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
146 else:
147 raise
148 except VolumeException as ve:
149 # volume/group does not exist or subvolume creation failed
150 ret = self.volume_exception_to_retval(ve)
151 return ret
152
153 def remove_subvolume(self, **kwargs):
154 ret = 0, "", ""
155 volname = kwargs['vol_name']
156 subvolname = kwargs['sub_name']
157 groupname = kwargs['group_name']
158 force = kwargs['force']
159
160 try:
161 with open_volume(self, volname) as fs_handle:
162 with open_group(fs_handle, self.volspec, groupname) as group:
163 remove_subvol(fs_handle, self.volspec, group, subvolname, force)
164 # kick the purge threads for async removal -- note that this
165 # assumes that the subvolume is moved to trash can.
166 # TODO: make purge queue as singleton so that trash can kicks
167 # the purge threads on dump.
168 self.purge_queue.queue_job(volname)
169 except VolumeException as ve:
170 if ve.errno == -errno.EAGAIN:
171 ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
172 ret = self.volume_exception_to_retval(ve)
173 elif not (ve.errno == -errno.ENOENT and force):
174 ret = self.volume_exception_to_retval(ve)
175 return ret
176
177 def resize_subvolume(self, **kwargs):
178 ret = 0, "", ""
179 volname = kwargs['vol_name']
180 subvolname = kwargs['sub_name']
181 newsize = kwargs['new_size']
182 noshrink = kwargs['no_shrink']
183 groupname = kwargs['group_name']
184
185 try:
186 with open_volume(self, volname) as fs_handle:
187 with open_group(fs_handle, self.volspec, groupname) as group:
188 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
189 nsize, usedbytes = subvolume.resize(newsize, noshrink)
190 ret = 0, json.dumps(
191 [{'bytes_used': usedbytes},{'bytes_quota': nsize},
192 {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
193 indent=4, sort_keys=True), ""
194 except VolumeException as ve:
195 ret = self.volume_exception_to_retval(ve)
196 return ret
197
198 def subvolume_getpath(self, **kwargs):
199 ret = None
200 volname = kwargs['vol_name']
201 subvolname = kwargs['sub_name']
202 groupname = kwargs['group_name']
203
204 try:
205 with open_volume(self, volname) as fs_handle:
206 with open_group(fs_handle, self.volspec, groupname) as group:
207 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
208 subvolpath = subvolume.path
209 ret = 0, subvolpath.decode("utf-8"), ""
210 except VolumeException as ve:
211 ret = self.volume_exception_to_retval(ve)
212 return ret
213
214 def subvolume_info(self, **kwargs):
215 ret = None
216 volname = kwargs['vol_name']
217 subvolname = kwargs['sub_name']
218 groupname = kwargs['group_name']
219
220 try:
221 with open_volume(self, volname) as fs_handle:
222 with open_group(fs_handle, self.volspec, groupname) as group:
223 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
224 mon_addr_lst = []
225 mon_map_mons = self.mgr.get('mon_map')['mons']
226 for mon in mon_map_mons:
227 ip_port = mon['addr'].split("/")[0]
228 mon_addr_lst.append(ip_port)
229
230 subvol_info_dict = subvolume.info()
231 subvol_info_dict["mon_addrs"] = mon_addr_lst
232 ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), ""
233 except VolumeException as ve:
234 ret = self.volume_exception_to_retval(ve)
235 return ret
236
237 def list_subvolumes(self, **kwargs):
238 ret = 0, "", ""
239 volname = kwargs['vol_name']
240 groupname = kwargs['group_name']
241
242 try:
243 with open_volume(self, volname) as fs_handle:
244 with open_group(fs_handle, self.volspec, groupname) as group:
245 subvolumes = group.list_subvolumes()
246 ret = 0, name_to_json(subvolumes), ""
247 except VolumeException as ve:
248 ret = self.volume_exception_to_retval(ve)
249 return ret
250
251 ### subvolume snapshot
252
253 def create_subvolume_snapshot(self, **kwargs):
254 ret = 0, "", ""
255 volname = kwargs['vol_name']
256 subvolname = kwargs['sub_name']
257 snapname = kwargs['snap_name']
258 groupname = kwargs['group_name']
259
260 try:
261 with open_volume(self, volname) as fs_handle:
262 with open_group(fs_handle, self.volspec, groupname) as group:
263 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
264 subvolume.create_snapshot(snapname)
265 except VolumeException as ve:
266 ret = self.volume_exception_to_retval(ve)
267 return ret
268
269 def remove_subvolume_snapshot(self, **kwargs):
270 ret = 0, "", ""
271 volname = kwargs['vol_name']
272 subvolname = kwargs['sub_name']
273 snapname = kwargs['snap_name']
274 groupname = kwargs['group_name']
275 force = kwargs['force']
276
277 try:
278 with open_volume(self, volname) as fs_handle:
279 with open_group(fs_handle, self.volspec, groupname) as group:
280 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
281 subvolume.remove_snapshot(snapname)
282 except VolumeException as ve:
283 if not (ve.errno == -errno.ENOENT and force):
284 ret = self.volume_exception_to_retval(ve)
285 return ret
286
287 def list_subvolume_snapshots(self, **kwargs):
288 ret = 0, "", ""
289 volname = kwargs['vol_name']
290 subvolname = kwargs['sub_name']
291 groupname = kwargs['group_name']
292
293 try:
294 with open_volume(self, volname) as fs_handle:
295 with open_group(fs_handle, self.volspec, groupname) as group:
296 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
297 snapshots = subvolume.list_snapshots()
298 ret = 0, name_to_json(snapshots), ""
299 except VolumeException as ve:
300 ret = self.volume_exception_to_retval(ve)
301 return ret
302
303 def protect_subvolume_snapshot(self, **kwargs):
304 ret = 0, "", ""
305 volname = kwargs['vol_name']
306 subvolname = kwargs['sub_name']
307 snapname = kwargs['snap_name']
308 groupname = kwargs['group_name']
309
310 try:
311 with open_volume(self, volname) as fs_handle:
312 with open_group(fs_handle, self.volspec, groupname) as group:
313 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
314 subvolume.protect_snapshot(snapname)
315 except VolumeException as ve:
316 ret = self.volume_exception_to_retval(ve)
317 return ret
318
319 def unprotect_subvolume_snapshot(self, **kwargs):
320 ret = 0, "", ""
321 volname = kwargs['vol_name']
322 subvolname = kwargs['sub_name']
323 snapname = kwargs['snap_name']
324 groupname = kwargs['group_name']
325
326 try:
327 with open_volume(self, volname) as fs_handle:
328 with open_group(fs_handle, self.volspec, groupname) as group:
329 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
330 subvolume.unprotect_snapshot(snapname)
331 except VolumeException as ve:
332 ret = self.volume_exception_to_retval(ve)
333 return ret
334
335 def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool):
336 create_clone(fs_handle, self.volspec, target_group, target_subvolname, target_pool, volname, subvolume, snapname)
337 with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False) as target_subvolume:
338 try:
339 subvolume.attach_snapshot(snapname, target_subvolume)
340 self.cloner.queue_job(volname)
341 except VolumeException as ve:
342 try:
343 target_subvolume.remove()
344 self.purge_queue.queue_job(volname)
345 except Exception as e:
346 log.warn("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, e))
347 raise ve
348
349 def _clone_subvolume_snapshot(self, fs_handle, volname, subvolume, **kwargs):
350 snapname = kwargs['snap_name']
351 target_pool = kwargs['pool_layout']
352 target_subvolname = kwargs['target_sub_name']
353 target_groupname = kwargs['target_group_name']
354
355 if not snapname.encode('utf-8') in subvolume.list_snapshots():
356 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
357 if not subvolume.is_snapshot_protected(snapname):
358 raise VolumeException(-errno.EINVAL, "snapshot '{0}' is not protected".format(snapname))
359
360 # TODO: when the target group is same as source, reuse group object.
361 with open_group(fs_handle, self.volspec, target_groupname) as target_group:
362 try:
363 with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False):
364 raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
365 except VolumeException as ve:
366 if ve.errno == -errno.ENOENT:
367 self._prepare_clone_subvolume(fs_handle, volname, subvolume, snapname,
368 target_group, target_subvolname, target_pool)
369 else:
370 raise
371
372 def clone_subvolume_snapshot(self, **kwargs):
373 ret = 0, "", ""
374 volname = kwargs['vol_name']
375 subvolname = kwargs['sub_name']
376 groupname = kwargs['group_name']
377
378 try:
379 with open_volume(self, volname) as fs_handle:
380 with open_group(fs_handle, self.volspec, groupname) as group:
381 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
382 self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs)
383 except VolumeException as ve:
384 ret = self.volume_exception_to_retval(ve)
385 return ret
386
387 def clone_status(self, **kwargs):
388 ret = 0, "", ""
389 volname = kwargs['vol_name']
390 clonename = kwargs['clone_name']
391 groupname = kwargs['group_name']
392
393 try:
394 with open_volume(self, volname) as fs_handle:
395 with open_group(fs_handle, self.volspec, groupname) as group:
396 with open_subvol(fs_handle, self.volspec, group, clonename,
397 need_complete=False, expected_types=["clone"]) as subvolume:
398 ret = 0, json.dumps({'status' : subvolume.status}, indent=2), ""
399 except VolumeException as ve:
400 ret = self.volume_exception_to_retval(ve)
401 return ret
402
403 def clone_cancel(self, **kwargs):
404 ret = 0, "", ""
405 volname = kwargs['vol_name']
406 clonename = kwargs['clone_name']
407 groupname = kwargs['group_name']
408
409 try:
410 self.cloner.cancel_job(volname, (clonename, groupname))
411 except VolumeException as ve:
412 ret = self.volume_exception_to_retval(ve)
413 return ret
414
415 ### group operations
416
417 def create_subvolume_group(self, **kwargs):
418 ret = 0, "", ""
419 volname = kwargs['vol_name']
420 groupname = kwargs['group_name']
421 pool = kwargs['pool_layout']
422 uid = kwargs['uid']
423 gid = kwargs['gid']
424 mode = kwargs['mode']
425
426 try:
427 with open_volume(self, volname) as fs_handle:
428 try:
429 with open_group(fs_handle, self.volspec, groupname):
430 # idempotent creation -- valid.
431 pass
432 except VolumeException as ve:
433 if ve.errno == -errno.ENOENT:
434 oct_mode = octal_str_to_decimal_int(mode)
435 create_group(fs_handle, self.volspec, groupname, pool, oct_mode, uid, gid)
436 else:
437 raise
438 except VolumeException as ve:
439 # volume does not exist or subvolume group creation failed
440 ret = self.volume_exception_to_retval(ve)
441 return ret
442
443 def remove_subvolume_group(self, **kwargs):
444 ret = 0, "", ""
445 volname = kwargs['vol_name']
446 groupname = kwargs['group_name']
447 force = kwargs['force']
448
449 try:
450 with open_volume(self, volname) as fs_handle:
451 remove_group(fs_handle, self.volspec, groupname)
452 except VolumeException as ve:
453 if not (ve.errno == -errno.ENOENT and force):
454 ret = self.volume_exception_to_retval(ve)
455 return ret
456
457 def getpath_subvolume_group(self, **kwargs):
458 volname = kwargs['vol_name']
459 groupname = kwargs['group_name']
460
461 try:
462 with open_volume(self, volname) as fs_handle:
463 with open_group(fs_handle, self.volspec, groupname) as group:
464 return 0, group.path.decode('utf-8'), ""
465 except VolumeException as ve:
466 return self.volume_exception_to_retval(ve)
467
468 def list_subvolume_groups(self, **kwargs):
469 volname = kwargs['vol_name']
470 ret = 0, '[]', ""
471 try:
472 with open_volume(self, volname) as fs_handle:
473 groups = listdir(fs_handle, self.volspec.base_dir)
474 ret = 0, name_to_json(groups), ""
475 except VolumeException as ve:
476 if not ve.errno == -errno.ENOENT:
477 ret = self.volume_exception_to_retval(ve)
478 return ret
479
480 ### group snapshot
481
482 def create_subvolume_group_snapshot(self, **kwargs):
483 ret = 0, "", ""
484 volname = kwargs['vol_name']
485 groupname = kwargs['group_name']
486 snapname = kwargs['snap_name']
487
488 try:
489 with open_volume(self, volname) as fs_handle:
490 with open_group(fs_handle, self.volspec, groupname) as group:
491 group.create_snapshot(snapname)
492 except VolumeException as ve:
493 ret = self.volume_exception_to_retval(ve)
494 return ret
495
496 def remove_subvolume_group_snapshot(self, **kwargs):
497 ret = 0, "", ""
498 volname = kwargs['vol_name']
499 groupname = kwargs['group_name']
500 snapname = kwargs['snap_name']
501 force = kwargs['force']
502
503 try:
504 with open_volume(self, volname) as fs_handle:
505 with open_group(fs_handle, self.volspec, groupname) as group:
506 group.remove_snapshot(snapname)
507 except VolumeException as ve:
508 if not (ve.errno == -errno.ENOENT and force):
509 ret = self.volume_exception_to_retval(ve)
510 return ret
511
512 def list_subvolume_group_snapshots(self, **kwargs):
513 ret = 0, "", ""
514 volname = kwargs['vol_name']
515 groupname = kwargs['group_name']
516
517 try:
518 with open_volume(self, volname) as fs_handle:
519 with open_group(fs_handle, self.volspec, groupname) as group:
520 snapshots = group.list_snapshots()
521 ret = 0, name_to_json(snapshots), ""
522 except VolumeException as ve:
523 ret = self.volume_exception_to_retval(ve)
524 return ret