]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/volume.py
import 15.2.0 Octopus source
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / volume.py
1 import json
2 import errno
3 import logging
4 from threading import Event
5
6 import cephfs
7
8 from .fs_util import listdir
9
10 from .operations.volume import ConnectionPool, open_volume, create_volume, \
11 delete_volume, list_volumes
12 from .operations.group import open_group, create_group, remove_group
13 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
14 create_clone
15
16 from .vol_spec import VolSpec
17 from .exception import VolumeException
18 from .async_cloner import Cloner
19 from .purge_queue import ThreadPoolPurgeQueueMixin
20
21 log = logging.getLogger(__name__)
22
23 def octal_str_to_decimal_int(mode):
24 try:
25 return int(mode, 8)
26 except ValueError:
27 raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
28
29 def name_to_json(names):
30 """
31 convert the list of names to json
32 """
33 namedict = []
34 for i in range(len(names)):
35 namedict.append({'name': names[i].decode('utf-8')})
36 return json.dumps(namedict, indent=4, sort_keys=True)
37
38 class VolumeClient(object):
39 def __init__(self, mgr):
40 self.mgr = mgr
41 self.stopping = Event()
42 # volume specification
43 self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
44 self.connection_pool = ConnectionPool(self.mgr)
45 # TODO: make thread pool size configurable
46 self.cloner = Cloner(self, 4)
47 self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
48 # on startup, queue purge job for available volumes to kickstart
49 # purge for leftover subvolume entries in trash. note that, if the
50 # trash directory does not exist or if there are no purge entries
51 # available for a volume, the volume is removed from the purge
52 # job list.
53 fs_map = self.mgr.get('fs_map')
54 for fs in fs_map['filesystems']:
55 self.cloner.queue_job(fs['mdsmap']['fs_name'])
56 self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
57
58 def is_stopping(self):
59 return self.stopping.is_set()
60
61 def shutdown(self):
62 log.info("shutting down")
63 # first, note that we're shutting down
64 self.stopping.set()
65 # second, ask purge threads to quit
66 self.purge_queue.cancel_all_jobs()
67 # third, delete all libcephfs handles from connection pool
68 self.connection_pool.del_all_handles()
69
70 def cluster_log(self, msg, lvl=None):
71 """
72 log to cluster log with default log level as WARN.
73 """
74 if not lvl:
75 lvl = self.mgr.CLUSTER_LOG_PRIO_WARN
76 self.mgr.cluster_log("cluster", lvl, msg)
77
78 def volume_exception_to_retval(self, ve):
79 """
80 return a tuple representation from a volume exception
81 """
82 return ve.to_tuple()
83
84 ### volume operations -- create, rm, ls
85
86 def create_fs_volume(self, volname, placement):
87 if self.is_stopping():
88 return -errno.ESHUTDOWN, "", "shutdown in progress"
89 return create_volume(self.mgr, volname, placement)
90
91 def delete_fs_volume(self, volname, confirm):
92 if self.is_stopping():
93 return -errno.ESHUTDOWN, "", "shutdown in progress"
94
95 if confirm != "--yes-i-really-mean-it":
96 return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
97 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
98 "that is what you want, re-issue the command followed by " \
99 "--yes-i-really-mean-it.".format(volname)
100
101 self.purge_queue.cancel_jobs(volname)
102 self.connection_pool.del_fs_handle(volname, wait=True)
103 return delete_volume(self.mgr, volname)
104
105 def list_fs_volumes(self):
106 if self.stopping.is_set():
107 return -errno.ESHUTDOWN, "", "shutdown in progress"
108 volumes = list_volumes(self.mgr)
109 return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
110
111 ### subvolume operations
112
113 def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
114 size = kwargs['size']
115 pool = kwargs['pool_layout']
116 uid = kwargs['uid']
117 gid = kwargs['gid']
118 mode = kwargs['mode']
119
120 oct_mode = octal_str_to_decimal_int(mode)
121 try:
122 create_subvol(
123 fs_handle, self.volspec, group, subvolname, size, False, pool, oct_mode, uid, gid)
124 except VolumeException as ve:
125 # kick the purge threads for async removal -- note that this
126 # assumes that the subvolume is moved to trashcan for cleanup on error.
127 self.purge_queue.queue_job(volname)
128 raise ve
129
130 def create_subvolume(self, **kwargs):
131 ret = 0, "", ""
132 volname = kwargs['vol_name']
133 subvolname = kwargs['sub_name']
134 groupname = kwargs['group_name']
135
136 try:
137 with open_volume(self, volname) as fs_handle:
138 with open_group(fs_handle, self.volspec, groupname) as group:
139 try:
140 with open_subvol(fs_handle, self.volspec, group, subvolname):
141 # idempotent creation -- valid.
142 pass
143 except VolumeException as ve:
144 if ve.errno == -errno.ENOENT:
145 self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
146 else:
147 raise
148 except VolumeException as ve:
149 # volume/group does not exist or subvolume creation failed
150 ret = self.volume_exception_to_retval(ve)
151 return ret
152
153 def remove_subvolume(self, **kwargs):
154 ret = 0, "", ""
155 volname = kwargs['vol_name']
156 subvolname = kwargs['sub_name']
157 groupname = kwargs['group_name']
158 force = kwargs['force']
159
160 try:
161 with open_volume(self, volname) as fs_handle:
162 with open_group(fs_handle, self.volspec, groupname) as group:
163 remove_subvol(fs_handle, self.volspec, group, subvolname, force)
164 # kick the purge threads for async removal -- note that this
165 # assumes that the subvolume is moved to trash can.
166 # TODO: make purge queue as singleton so that trash can kicks
167 # the purge threads on dump.
168 self.purge_queue.queue_job(volname)
169 except VolumeException as ve:
170 if ve.errno == -errno.EAGAIN:
171 ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
172 ret = self.volume_exception_to_retval(ve)
173 elif not (ve.errno == -errno.ENOENT and force):
174 ret = self.volume_exception_to_retval(ve)
175 return ret
176
177 def resize_subvolume(self, **kwargs):
178 ret = 0, "", ""
179 volname = kwargs['vol_name']
180 subvolname = kwargs['sub_name']
181 newsize = kwargs['new_size']
182 noshrink = kwargs['no_shrink']
183 groupname = kwargs['group_name']
184
185 try:
186 with open_volume(self, volname) as fs_handle:
187 with open_group(fs_handle, self.volspec, groupname) as group:
188 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
189 nsize, usedbytes = subvolume.resize(newsize, noshrink)
190 ret = 0, json.dumps(
191 [{'bytes_used': usedbytes},{'bytes_quota': nsize},
192 {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
193 indent=4, sort_keys=True), ""
194 except VolumeException as ve:
195 ret = self.volume_exception_to_retval(ve)
196 return ret
197
198 def subvolume_getpath(self, **kwargs):
199 ret = None
200 volname = kwargs['vol_name']
201 subvolname = kwargs['sub_name']
202 groupname = kwargs['group_name']
203
204 try:
205 with open_volume(self, volname) as fs_handle:
206 with open_group(fs_handle, self.volspec, groupname) as group:
207 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
208 subvolpath = subvolume.path
209 ret = 0, subvolpath.decode("utf-8"), ""
210 except VolumeException as ve:
211 ret = self.volume_exception_to_retval(ve)
212 return ret
213
214 def list_subvolumes(self, **kwargs):
215 ret = 0, "", ""
216 volname = kwargs['vol_name']
217 groupname = kwargs['group_name']
218
219 try:
220 with open_volume(self, volname) as fs_handle:
221 with open_group(fs_handle, self.volspec, groupname) as group:
222 subvolumes = group.list_subvolumes()
223 ret = 0, name_to_json(subvolumes), ""
224 except VolumeException as ve:
225 ret = self.volume_exception_to_retval(ve)
226 return ret
227
228 ### subvolume snapshot
229
230 def create_subvolume_snapshot(self, **kwargs):
231 ret = 0, "", ""
232 volname = kwargs['vol_name']
233 subvolname = kwargs['sub_name']
234 snapname = kwargs['snap_name']
235 groupname = kwargs['group_name']
236
237 try:
238 with open_volume(self, volname) as fs_handle:
239 with open_group(fs_handle, self.volspec, groupname) as group:
240 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
241 subvolume.create_snapshot(snapname)
242 except VolumeException as ve:
243 ret = self.volume_exception_to_retval(ve)
244 return ret
245
246 def remove_subvolume_snapshot(self, **kwargs):
247 ret = 0, "", ""
248 volname = kwargs['vol_name']
249 subvolname = kwargs['sub_name']
250 snapname = kwargs['snap_name']
251 groupname = kwargs['group_name']
252 force = kwargs['force']
253
254 try:
255 with open_volume(self, volname) as fs_handle:
256 with open_group(fs_handle, self.volspec, groupname) as group:
257 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
258 subvolume.remove_snapshot(snapname)
259 except VolumeException as ve:
260 if not (ve.errno == -errno.ENOENT and force):
261 ret = self.volume_exception_to_retval(ve)
262 return ret
263
264 def list_subvolume_snapshots(self, **kwargs):
265 ret = 0, "", ""
266 volname = kwargs['vol_name']
267 subvolname = kwargs['sub_name']
268 groupname = kwargs['group_name']
269
270 try:
271 with open_volume(self, volname) as fs_handle:
272 with open_group(fs_handle, self.volspec, groupname) as group:
273 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
274 snapshots = subvolume.list_snapshots()
275 ret = 0, name_to_json(snapshots), ""
276 except VolumeException as ve:
277 ret = self.volume_exception_to_retval(ve)
278 return ret
279
280 def protect_subvolume_snapshot(self, **kwargs):
281 ret = 0, "", ""
282 volname = kwargs['vol_name']
283 subvolname = kwargs['sub_name']
284 snapname = kwargs['snap_name']
285 groupname = kwargs['group_name']
286
287 try:
288 with open_volume(self, volname) as fs_handle:
289 with open_group(fs_handle, self.volspec, groupname) as group:
290 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
291 subvolume.protect_snapshot(snapname)
292 except VolumeException as ve:
293 ret = self.volume_exception_to_retval(ve)
294 return ret
295
296 def unprotect_subvolume_snapshot(self, **kwargs):
297 ret = 0, "", ""
298 volname = kwargs['vol_name']
299 subvolname = kwargs['sub_name']
300 snapname = kwargs['snap_name']
301 groupname = kwargs['group_name']
302
303 try:
304 with open_volume(self, volname) as fs_handle:
305 with open_group(fs_handle, self.volspec, groupname) as group:
306 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
307 subvolume.unprotect_snapshot(snapname)
308 except VolumeException as ve:
309 ret = self.volume_exception_to_retval(ve)
310 return ret
311
312 def _prepare_clone_subvolume(self, fs_handle, volname, subvolume, snapname, target_group, target_subvolname, target_pool):
313 create_clone(fs_handle, self.volspec, target_group, target_subvolname, target_pool, volname, subvolume, snapname)
314 with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False) as target_subvolume:
315 try:
316 subvolume.attach_snapshot(snapname, target_subvolume)
317 self.cloner.queue_job(volname)
318 except VolumeException as ve:
319 try:
320 target_subvolume.remove()
321 self.purge_queue.queue_job(volname)
322 except Exception as e:
323 log.warn("failed to cleanup clone subvolume '{0}' ({1})".format(target_subvolname, e))
324 raise ve
325
326 def _clone_subvolume_snapshot(self, fs_handle, volname, subvolume, **kwargs):
327 snapname = kwargs['snap_name']
328 target_pool = kwargs['pool_layout']
329 target_subvolname = kwargs['target_sub_name']
330 target_groupname = kwargs['target_group_name']
331
332 if not snapname.encode('utf-8') in subvolume.list_snapshots():
333 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
334 if not subvolume.is_snapshot_protected(snapname):
335 raise VolumeException(-errno.EINVAL, "snapshot '{0}' is not protected".format(snapname))
336
337 # TODO: when the target group is same as source, reuse group object.
338 with open_group(fs_handle, self.volspec, target_groupname) as target_group:
339 try:
340 with open_subvol(fs_handle, self.volspec, target_group, target_subvolname, need_complete=False):
341 raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
342 except VolumeException as ve:
343 if ve.errno == -errno.ENOENT:
344 self._prepare_clone_subvolume(fs_handle, volname, subvolume, snapname,
345 target_group, target_subvolname, target_pool)
346 else:
347 raise
348
349 def clone_subvolume_snapshot(self, **kwargs):
350 ret = 0, "", ""
351 volname = kwargs['vol_name']
352 subvolname = kwargs['sub_name']
353 groupname = kwargs['group_name']
354
355 try:
356 with open_volume(self, volname) as fs_handle:
357 with open_group(fs_handle, self.volspec, groupname) as group:
358 with open_subvol(fs_handle, self.volspec, group, subvolname) as subvolume:
359 self._clone_subvolume_snapshot(fs_handle, volname, subvolume, **kwargs)
360 except VolumeException as ve:
361 ret = self.volume_exception_to_retval(ve)
362 return ret
363
364 def clone_status(self, **kwargs):
365 ret = 0, "", ""
366 volname = kwargs['vol_name']
367 clonename = kwargs['clone_name']
368 groupname = kwargs['group_name']
369
370 try:
371 with open_volume(self, volname) as fs_handle:
372 with open_group(fs_handle, self.volspec, groupname) as group:
373 with open_subvol(fs_handle, self.volspec, group, clonename,
374 need_complete=False, expected_types=["clone"]) as subvolume:
375 ret = 0, json.dumps({'status' : subvolume.status}, indent=2), ""
376 except VolumeException as ve:
377 ret = self.volume_exception_to_retval(ve)
378 return ret
379
380 def clone_cancel(self, **kwargs):
381 ret = 0, "", ""
382 volname = kwargs['vol_name']
383 clonename = kwargs['clone_name']
384 groupname = kwargs['group_name']
385
386 try:
387 self.cloner.cancel_job(volname, (clonename, groupname))
388 except VolumeException as ve:
389 ret = self.volume_exception_to_retval(ve)
390 return ret
391
392 ### group operations
393
394 def create_subvolume_group(self, **kwargs):
395 ret = 0, "", ""
396 volname = kwargs['vol_name']
397 groupname = kwargs['group_name']
398 pool = kwargs['pool_layout']
399 uid = kwargs['uid']
400 gid = kwargs['gid']
401 mode = kwargs['mode']
402
403 try:
404 with open_volume(self, volname) as fs_handle:
405 try:
406 with open_group(fs_handle, self.volspec, groupname):
407 # idempotent creation -- valid.
408 pass
409 except VolumeException as ve:
410 if ve.errno == -errno.ENOENT:
411 oct_mode = octal_str_to_decimal_int(mode)
412 create_group(fs_handle, self.volspec, groupname, pool, oct_mode, uid, gid)
413 else:
414 raise
415 except VolumeException as ve:
416 # volume does not exist or subvolume group creation failed
417 ret = self.volume_exception_to_retval(ve)
418 return ret
419
420 def remove_subvolume_group(self, **kwargs):
421 ret = 0, "", ""
422 volname = kwargs['vol_name']
423 groupname = kwargs['group_name']
424 force = kwargs['force']
425
426 try:
427 with open_volume(self, volname) as fs_handle:
428 remove_group(fs_handle, self.volspec, groupname)
429 except VolumeException as ve:
430 if not (ve.errno == -errno.ENOENT and force):
431 ret = self.volume_exception_to_retval(ve)
432 return ret
433
434 def getpath_subvolume_group(self, **kwargs):
435 volname = kwargs['vol_name']
436 groupname = kwargs['group_name']
437
438 try:
439 with open_volume(self, volname) as fs_handle:
440 with open_group(fs_handle, self.volspec, groupname) as group:
441 return 0, group.path.decode('utf-8'), ""
442 except VolumeException as ve:
443 return self.volume_exception_to_retval(ve)
444
445 def list_subvolume_groups(self, **kwargs):
446 volname = kwargs['vol_name']
447 ret = 0, '[]', ""
448 try:
449 with open_volume(self, volname) as fs_handle:
450 groups = listdir(fs_handle, self.volspec.base_dir)
451 ret = 0, name_to_json(groups), ""
452 except VolumeException as ve:
453 if not ve.errno == -errno.ENOENT:
454 ret = self.volume_exception_to_retval(ve)
455 return ret
456
457 ### group snapshot
458
459 def create_subvolume_group_snapshot(self, **kwargs):
460 ret = 0, "", ""
461 volname = kwargs['vol_name']
462 groupname = kwargs['group_name']
463 snapname = kwargs['snap_name']
464
465 try:
466 with open_volume(self, volname) as fs_handle:
467 with open_group(fs_handle, self.volspec, groupname) as group:
468 group.create_snapshot(snapname)
469 except VolumeException as ve:
470 ret = self.volume_exception_to_retval(ve)
471 return ret
472
473 def remove_subvolume_group_snapshot(self, **kwargs):
474 ret = 0, "", ""
475 volname = kwargs['vol_name']
476 groupname = kwargs['group_name']
477 snapname = kwargs['snap_name']
478 force = kwargs['force']
479
480 try:
481 with open_volume(self, volname) as fs_handle:
482 with open_group(fs_handle, self.volspec, groupname) as group:
483 group.remove_snapshot(snapname)
484 except VolumeException as ve:
485 if not (ve.errno == -errno.ENOENT and force):
486 ret = self.volume_exception_to_retval(ve)
487 return ret
488
489 def list_subvolume_group_snapshots(self, **kwargs):
490 ret = 0, "", ""
491 volname = kwargs['vol_name']
492 groupname = kwargs['group_name']
493
494 try:
495 with open_volume(self, volname) as fs_handle:
496 with open_group(fs_handle, self.volspec, groupname) as group:
497 snapshots = group.list_snapshots()
498 ret = 0, name_to_json(snapshots), ""
499 except VolumeException as ve:
500 ret = self.volume_exception_to_retval(ve)
501 return ret