]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/volume.py
335d76a809ab13db72128cd83eabbf75b10c589e
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / volume.py
1 import json
2 import errno
3 import logging
4 import os
5 from typing import TYPE_CHECKING
6
7 import cephfs
8
9 from mgr_util import CephfsClient
10
11 from .fs_util import listdir, has_subdir
12
13 from .operations.group import open_group, create_group, remove_group, \
14 open_group_unique, set_group_attrs
15 from .operations.volume import create_volume, delete_volume, rename_volume, \
16 list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
17 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
18 create_clone
19 from .operations.trash import Trash
20
21 from .vol_spec import VolSpec
22 from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
23 from .async_cloner import Cloner
24 from .purge_queue import ThreadPoolPurgeQueueMixin
25 from .operations.template import SubvolumeOpType
26
27 if TYPE_CHECKING:
28 from volumes import Module
29
30 log = logging.getLogger(__name__)
31
32 ALLOWED_ACCESS_LEVELS = ('r', 'rw')
33
34
35 def octal_str_to_decimal_int(mode):
36 try:
37 return int(mode, 8)
38 except ValueError:
39 raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
40
41
42 def name_to_json(names):
43 """
44 convert the list of names to json
45 """
46 namedict = []
47 for i in range(len(names)):
48 namedict.append({'name': names[i].decode('utf-8')})
49 return json.dumps(namedict, indent=4, sort_keys=True)
50
51
52 class VolumeClient(CephfsClient["Module"]):
53 def __init__(self, mgr):
54 super().__init__(mgr)
55 # volume specification
56 self.volspec = VolSpec(mgr.rados.conf_get('client_snapdir'))
57 self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
58 self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
59 # on startup, queue purge job for available volumes to kickstart
60 # purge for leftover subvolume entries in trash. note that, if the
61 # trash directory does not exist or if there are no purge entries
62 # available for a volume, the volume is removed from the purge
63 # job list.
64 fs_map = self.mgr.get('fs_map')
65 for fs in fs_map['filesystems']:
66 self.cloner.queue_job(fs['mdsmap']['fs_name'])
67 self.purge_queue.queue_job(fs['mdsmap']['fs_name'])
68
69 def shutdown(self):
70 # Overrides CephfsClient.shutdown()
71 log.info("shutting down")
72 # first, note that we're shutting down
73 self.stopping.set()
74 # stop clones
75 self.cloner.shutdown()
76 # stop purge threads
77 self.purge_queue.shutdown()
78 # last, delete all libcephfs handles from connection pool
79 self.connection_pool.del_all_connections()
80
81 def cluster_log(self, msg, lvl=None):
82 """
83 log to cluster log with default log level as WARN.
84 """
85 if not lvl:
86 lvl = self.mgr.ClusterLogPrio.WARN
87 self.mgr.cluster_log("cluster", lvl, msg)
88
89 def volume_exception_to_retval(self, ve):
90 """
91 return a tuple representation from a volume exception
92 """
93 return ve.to_tuple()
94
95 ### volume operations -- create, rm, ls
96
97 def create_fs_volume(self, volname, placement):
98 if self.is_stopping():
99 return -errno.ESHUTDOWN, "", "shutdown in progress"
100 return create_volume(self.mgr, volname, placement)
101
102 def delete_fs_volume(self, volname, confirm):
103 if self.is_stopping():
104 return -errno.ESHUTDOWN, "", "shutdown in progress"
105
106 if confirm != "--yes-i-really-mean-it":
107 return -errno.EPERM, "", "WARNING: this will *PERMANENTLY DESTROY* all data " \
108 "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* " \
109 "that is what you want, re-issue the command followed by " \
110 "--yes-i-really-mean-it.".format(volname)
111
112 ret, out, err = self.mgr.check_mon_command({
113 'prefix': 'config get',
114 'key': 'mon_allow_pool_delete',
115 'who': 'mon',
116 'format': 'json',
117 })
118 mon_allow_pool_delete = json.loads(out)
119 if not mon_allow_pool_delete:
120 return -errno.EPERM, "", "pool deletion is disabled; you must first " \
121 "set the mon_allow_pool_delete config option to true before volumes " \
122 "can be deleted"
123
124 metadata_pool, data_pools = get_pool_names(self.mgr, volname)
125 if not metadata_pool:
126 return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)
127 self.purge_queue.cancel_jobs(volname)
128 self.connection_pool.del_connections(volname, wait=True)
129 return delete_volume(self.mgr, volname, metadata_pool, data_pools)
130
131 def list_fs_volumes(self):
132 if self.stopping.is_set():
133 return -errno.ESHUTDOWN, "", "shutdown in progress"
134 volumes = list_volumes(self.mgr)
135 return 0, json.dumps(volumes, indent=4, sort_keys=True), ""
136
137 def rename_fs_volume(self, volname, newvolname, sure):
138 if self.is_stopping():
139 return -errno.ESHUTDOWN, "", "shutdown in progress"
140
141 if not sure:
142 return (
143 -errno.EPERM, "",
144 "WARNING: This will rename the filesystem and possibly its "
145 "pools. It is a potentially disruptive operation, clients' "
146 "cephx credentials need reauthorized to access the file system "
147 "and its pools with the new name. Add --yes-i-really-mean-it "
148 "if you are sure you wish to continue.")
149
150 return rename_volume(self.mgr, volname, newvolname)
151
152 def volume_info(self, **kwargs):
153 ret = None
154 volname = kwargs['vol_name']
155
156 try:
157 with open_volume(self, volname) as fs_handle:
158 path = self.volspec.base_dir
159 vol_info_dict = {}
160 try:
161 st = fs_handle.statx(path.encode('utf-8'), cephfs.CEPH_STATX_SIZE,
162 cephfs.AT_SYMLINK_NOFOLLOW)
163
164 usedbytes = st['size']
165 vol_info_dict = get_pending_subvol_deletions_count(path)
166 vol_info_dict['used_size'] = int(usedbytes)
167 except cephfs.Error as e:
168 if e.args[0] == errno.ENOENT:
169 pass
170 df = self.mgr.get("df")
171 pool_stats = dict([(p['id'], p['stats']) for p in df['pools']])
172 osdmap = self.mgr.get("osd_map")
173 pools = dict([(p['pool'], p) for p in osdmap['pools']])
174 metadata_pool_id, data_pool_ids = get_pool_ids(self.mgr, volname)
175 vol_info_dict["pools"] = {"metadata": [], "data": []}
176 for pool_id in [metadata_pool_id] + data_pool_ids:
177 if pool_id == metadata_pool_id:
178 pool_type = "metadata"
179 else:
180 pool_type = "data"
181 vol_info_dict["pools"][pool_type].append({
182 'name': pools[pool_id]['pool_name'],
183 'used': pool_stats[pool_id]['bytes_used'],
184 'avail': pool_stats[pool_id]['max_avail']})
185
186 mon_addr_lst = []
187 mon_map_mons = self.mgr.get('mon_map')['mons']
188 for mon in mon_map_mons:
189 ip_port = mon['addr'].split("/")[0]
190 mon_addr_lst.append(ip_port)
191 vol_info_dict["mon_addrs"] = mon_addr_lst
192 ret = 0, json.dumps(vol_info_dict, indent=4, sort_keys=True), ""
193 except VolumeException as ve:
194 ret = self.volume_exception_to_retval(ve)
195 return ret
196
197 ### subvolume operations
198
199 def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
200 size = kwargs['size']
201 pool = kwargs['pool_layout']
202 uid = kwargs['uid']
203 gid = kwargs['gid']
204 mode = kwargs['mode']
205 isolate_nspace = kwargs['namespace_isolated']
206
207 oct_mode = octal_str_to_decimal_int(mode)
208 try:
209 create_subvol(
210 self.mgr, fs_handle, self.volspec, group, subvolname, size, isolate_nspace, pool, oct_mode, uid, gid)
211 except VolumeException as ve:
212 # kick the purge threads for async removal -- note that this
213 # assumes that the subvolume is moved to trashcan for cleanup on error.
214 self.purge_queue.queue_job(volname)
215 raise ve
216
217 def create_subvolume(self, **kwargs):
218 ret = 0, "", ""
219 volname = kwargs['vol_name']
220 subvolname = kwargs['sub_name']
221 groupname = kwargs['group_name']
222 size = kwargs['size']
223 pool = kwargs['pool_layout']
224 uid = kwargs['uid']
225 gid = kwargs['gid']
226 mode = kwargs['mode']
227 isolate_nspace = kwargs['namespace_isolated']
228
229 try:
230 with open_volume(self, volname) as fs_handle:
231 with open_group(fs_handle, self.volspec, groupname) as group:
232 try:
233 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.CREATE) as subvolume:
234 # idempotent creation -- valid. Attributes set is supported.
235 attrs = {
236 'uid': uid if uid else subvolume.uid,
237 'gid': gid if gid else subvolume.gid,
238 'mode': octal_str_to_decimal_int(mode),
239 'data_pool': pool,
240 'pool_namespace': subvolume.namespace if isolate_nspace else None,
241 'quota': size
242 }
243 subvolume.set_attrs(subvolume.path, attrs)
244 except VolumeException as ve:
245 if ve.errno == -errno.ENOENT:
246 self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
247 else:
248 raise
249 except VolumeException as ve:
250 # volume/group does not exist or subvolume creation failed
251 ret = self.volume_exception_to_retval(ve)
252 return ret
253
254 def remove_subvolume(self, **kwargs):
255 ret = 0, "", ""
256 volname = kwargs['vol_name']
257 subvolname = kwargs['sub_name']
258 groupname = kwargs['group_name']
259 force = kwargs['force']
260 retainsnaps = kwargs['retain_snapshots']
261
262 try:
263 with open_volume(self, volname) as fs_handle:
264 with open_group(fs_handle, self.volspec, groupname) as group:
265 remove_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, force, retainsnaps)
266 # kick the purge threads for async removal -- note that this
267 # assumes that the subvolume is moved to trash can.
268 # TODO: make purge queue as singleton so that trash can kicks
269 # the purge threads on dump.
270 self.purge_queue.queue_job(volname)
271 except VolumeException as ve:
272 if ve.errno == -errno.EAGAIN and not force:
273 ve = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
274 ret = self.volume_exception_to_retval(ve)
275 elif not (ve.errno == -errno.ENOENT and force):
276 ret = self.volume_exception_to_retval(ve)
277 return ret
278
279 def authorize_subvolume(self, **kwargs):
280 ret = 0, "", ""
281 volname = kwargs['vol_name']
282 subvolname = kwargs['sub_name']
283 authid = kwargs['auth_id']
284 groupname = kwargs['group_name']
285 accesslevel = kwargs['access_level']
286 tenant_id = kwargs['tenant_id']
287 allow_existing_id = kwargs['allow_existing_id']
288
289 try:
290 with open_volume(self, volname) as fs_handle:
291 with open_group(fs_handle, self.volspec, groupname) as group:
292 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.ALLOW_ACCESS) as subvolume:
293 key = subvolume.authorize(authid, accesslevel, tenant_id, allow_existing_id)
294 ret = 0, key, ""
295 except VolumeException as ve:
296 ret = self.volume_exception_to_retval(ve)
297 return ret
298
299 def deauthorize_subvolume(self, **kwargs):
300 ret = 0, "", ""
301 volname = kwargs['vol_name']
302 subvolname = kwargs['sub_name']
303 authid = kwargs['auth_id']
304 groupname = kwargs['group_name']
305
306 try:
307 with open_volume(self, volname) as fs_handle:
308 with open_group(fs_handle, self.volspec, groupname) as group:
309 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.DENY_ACCESS) as subvolume:
310 subvolume.deauthorize(authid)
311 except VolumeException as ve:
312 ret = self.volume_exception_to_retval(ve)
313 return ret
314
315 def authorized_list(self, **kwargs):
316 ret = 0, "", ""
317 volname = kwargs['vol_name']
318 subvolname = kwargs['sub_name']
319 groupname = kwargs['group_name']
320
321 try:
322 with open_volume(self, volname) as fs_handle:
323 with open_group(fs_handle, self.volspec, groupname) as group:
324 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.AUTH_LIST) as subvolume:
325 auths = subvolume.authorized_list()
326 ret = 0, json.dumps(auths, indent=4, sort_keys=True), ""
327 except VolumeException as ve:
328 ret = self.volume_exception_to_retval(ve)
329 return ret
330
331 def evict(self, **kwargs):
332 ret = 0, "", ""
333 volname = kwargs['vol_name']
334 subvolname = kwargs['sub_name']
335 authid = kwargs['auth_id']
336 groupname = kwargs['group_name']
337
338 try:
339 with open_volume(self, volname) as fs_handle:
340 with open_group(fs_handle, self.volspec, groupname) as group:
341 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.EVICT) as subvolume:
342 key = subvolume.evict(volname, authid)
343 ret = 0, "", ""
344 except (VolumeException, ClusterTimeout, ClusterError, EvictionError) as e:
345 if isinstance(e, VolumeException):
346 ret = self.volume_exception_to_retval(e)
347 elif isinstance(e, ClusterTimeout):
348 ret = -errno.ETIMEDOUT , "", "Timedout trying to talk to ceph cluster"
349 elif isinstance(e, ClusterError):
350 ret = e._result_code , "", e._result_str
351 elif isinstance(e, EvictionError):
352 ret = -errno.EINVAL, "", str(e)
353 return ret
354
355 def resize_subvolume(self, **kwargs):
356 ret = 0, "", ""
357 volname = kwargs['vol_name']
358 subvolname = kwargs['sub_name']
359 newsize = kwargs['new_size']
360 noshrink = kwargs['no_shrink']
361 groupname = kwargs['group_name']
362
363 try:
364 with open_volume(self, volname) as fs_handle:
365 with open_group(fs_handle, self.volspec, groupname) as group:
366 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.RESIZE) as subvolume:
367 nsize, usedbytes = subvolume.resize(newsize, noshrink)
368 ret = 0, json.dumps(
369 [{'bytes_used': usedbytes},{'bytes_quota': nsize},
370 {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
371 indent=4, sort_keys=True), ""
372 except VolumeException as ve:
373 ret = self.volume_exception_to_retval(ve)
374 return ret
375
376 def subvolume_pin(self, **kwargs):
377 ret = 0, "", ""
378 volname = kwargs['vol_name']
379 subvolname = kwargs['sub_name']
380 pin_type = kwargs['pin_type']
381 pin_setting = kwargs['pin_setting']
382 groupname = kwargs['group_name']
383
384 try:
385 with open_volume(self, volname) as fs_handle:
386 with open_group(fs_handle, self.volspec, groupname) as group:
387 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.PIN) as subvolume:
388 subvolume.pin(pin_type, pin_setting)
389 ret = 0, json.dumps({}), ""
390 except VolumeException as ve:
391 ret = self.volume_exception_to_retval(ve)
392 return ret
393
394 def subvolume_getpath(self, **kwargs):
395 ret = None
396 volname = kwargs['vol_name']
397 subvolname = kwargs['sub_name']
398 groupname = kwargs['group_name']
399
400 try:
401 with open_volume(self, volname) as fs_handle:
402 with open_group(fs_handle, self.volspec, groupname) as group:
403 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.GETPATH) as subvolume:
404 subvolpath = subvolume.path
405 ret = 0, subvolpath.decode("utf-8"), ""
406 except VolumeException as ve:
407 ret = self.volume_exception_to_retval(ve)
408 return ret
409
410 def subvolume_info(self, **kwargs):
411 ret = None
412 volname = kwargs['vol_name']
413 subvolname = kwargs['sub_name']
414 groupname = kwargs['group_name']
415
416 try:
417 with open_volume(self, volname) as fs_handle:
418 with open_group(fs_handle, self.volspec, groupname) as group:
419 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.INFO) as subvolume:
420 mon_addr_lst = []
421 mon_map_mons = self.mgr.get('mon_map')['mons']
422 for mon in mon_map_mons:
423 ip_port = mon['addr'].split("/")[0]
424 mon_addr_lst.append(ip_port)
425
426 subvol_info_dict = subvolume.info()
427 subvol_info_dict["mon_addrs"] = mon_addr_lst
428 ret = 0, json.dumps(subvol_info_dict, indent=4, sort_keys=True), ""
429 except VolumeException as ve:
430 ret = self.volume_exception_to_retval(ve)
431 return ret
432
433 def set_user_metadata(self, **kwargs):
434 ret = 0, "", ""
435 volname = kwargs['vol_name']
436 subvolname = kwargs['sub_name']
437 groupname = kwargs['group_name']
438 keyname = kwargs['key_name']
439 value = kwargs['value']
440
441 try:
442 with open_volume(self, volname) as fs_handle:
443 with open_group(fs_handle, self.volspec, groupname) as group:
444 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_SET) as subvolume:
445 subvolume.set_user_metadata(keyname, value)
446 except VolumeException as ve:
447 ret = self.volume_exception_to_retval(ve)
448 return ret
449
450 def get_user_metadata(self, **kwargs):
451 ret = 0, "", ""
452 volname = kwargs['vol_name']
453 subvolname = kwargs['sub_name']
454 groupname = kwargs['group_name']
455 keyname = kwargs['key_name']
456
457 try:
458 with open_volume(self, volname) as fs_handle:
459 with open_group(fs_handle, self.volspec, groupname) as group:
460 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_GET) as subvolume:
461 value = subvolume.get_user_metadata(keyname)
462 ret = 0, value, ""
463 except VolumeException as ve:
464 ret = self.volume_exception_to_retval(ve)
465 return ret
466
467 def list_user_metadata(self, **kwargs):
468 ret = 0, "", ""
469 volname = kwargs['vol_name']
470 subvolname = kwargs['sub_name']
471 groupname = kwargs['group_name']
472
473 try:
474 with open_volume(self, volname) as fs_handle:
475 with open_group(fs_handle, self.volspec, groupname) as group:
476 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_LIST) as subvolume:
477 subvol_metadata_dict = subvolume.list_user_metadata()
478 ret = 0, json.dumps(subvol_metadata_dict, indent=4, sort_keys=True), ""
479 except VolumeException as ve:
480 ret = self.volume_exception_to_retval(ve)
481 return ret
482
483 def remove_user_metadata(self, **kwargs):
484 ret = 0, "", ""
485 volname = kwargs['vol_name']
486 subvolname = kwargs['sub_name']
487 groupname = kwargs['group_name']
488 keyname = kwargs['key_name']
489 force = kwargs['force']
490
491 try:
492 with open_volume(self, volname) as fs_handle:
493 with open_group(fs_handle, self.volspec, groupname) as group:
494 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.USER_METADATA_REMOVE) as subvolume:
495 subvolume.remove_user_metadata(keyname)
496 except VolumeException as ve:
497 if not (ve.errno == -errno.ENOENT and force):
498 ret = self.volume_exception_to_retval(ve)
499 return ret
500
501 def list_subvolumes(self, **kwargs):
502 ret = 0, "", ""
503 volname = kwargs['vol_name']
504 groupname = kwargs['group_name']
505
506 try:
507 with open_volume(self, volname) as fs_handle:
508 with open_group(fs_handle, self.volspec, groupname) as group:
509 subvolumes = group.list_subvolumes()
510 ret = 0, name_to_json(subvolumes), ""
511 except VolumeException as ve:
512 ret = self.volume_exception_to_retval(ve)
513 return ret
514
515 def subvolume_exists(self, **kwargs):
516 volname = kwargs['vol_name']
517 groupname = kwargs['group_name']
518 ret = 0, "", ""
519 volume_exists = False
520
521 try:
522 with open_volume(self, volname) as fs_handle:
523 volume_exists = True
524 with open_group(fs_handle, self.volspec, groupname) as group:
525 res = group.has_subvolumes()
526 if res:
527 ret = 0, "subvolume exists", ""
528 else:
529 ret = 0, "no subvolume exists", ""
530 except VolumeException as ve:
531 if volume_exists and ve.errno == -errno.ENOENT:
532 ret = 0, "no subvolume exists", ""
533 else:
534 ret = self.volume_exception_to_retval(ve)
535 return ret
536
537 ### subvolume snapshot
538
539 def create_subvolume_snapshot(self, **kwargs):
540 ret = 0, "", ""
541 volname = kwargs['vol_name']
542 subvolname = kwargs['sub_name']
543 snapname = kwargs['snap_name']
544 groupname = kwargs['group_name']
545
546 try:
547 with open_volume(self, volname) as fs_handle:
548 with open_group(fs_handle, self.volspec, groupname) as group:
549 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_CREATE) as subvolume:
550 subvolume.create_snapshot(snapname)
551 except VolumeException as ve:
552 ret = self.volume_exception_to_retval(ve)
553 return ret
554
555 def remove_subvolume_snapshot(self, **kwargs):
556 ret = 0, "", ""
557 volname = kwargs['vol_name']
558 subvolname = kwargs['sub_name']
559 snapname = kwargs['snap_name']
560 groupname = kwargs['group_name']
561 force = kwargs['force']
562
563 try:
564 with open_volume(self, volname) as fs_handle:
565 with open_group(fs_handle, self.volspec, groupname) as group:
566 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_REMOVE) as subvolume:
567 subvolume.remove_snapshot(snapname, force)
568 except VolumeException as ve:
569 # ESTALE serves as an error to state that subvolume is currently stale due to internal removal and,
570 # we should tickle the purge jobs to purge the same
571 if ve.errno == -errno.ESTALE:
572 self.purge_queue.queue_job(volname)
573 elif not (ve.errno == -errno.ENOENT and force):
574 ret = self.volume_exception_to_retval(ve)
575 return ret
576
577 def subvolume_snapshot_info(self, **kwargs):
578 ret = 0, "", ""
579 volname = kwargs['vol_name']
580 subvolname = kwargs['sub_name']
581 snapname = kwargs['snap_name']
582 groupname = kwargs['group_name']
583
584 try:
585 with open_volume(self, volname) as fs_handle:
586 with open_group(fs_handle, self.volspec, groupname) as group:
587 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_INFO) as subvolume:
588 snap_info_dict = subvolume.snapshot_info(snapname)
589 ret = 0, json.dumps(snap_info_dict, indent=4, sort_keys=True), ""
590 except VolumeException as ve:
591 ret = self.volume_exception_to_retval(ve)
592 return ret
593
594 def set_subvolume_snapshot_metadata(self, **kwargs):
595 ret = 0, "", ""
596 volname = kwargs['vol_name']
597 subvolname = kwargs['sub_name']
598 snapname = kwargs['snap_name']
599 groupname = kwargs['group_name']
600 keyname = kwargs['key_name']
601 value = kwargs['value']
602
603 try:
604 with open_volume(self, volname) as fs_handle:
605 with open_group(fs_handle, self.volspec, groupname) as group:
606 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_SET) as subvolume:
607 if not snapname.encode('utf-8') in subvolume.list_snapshots():
608 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
609 subvolume.set_snapshot_metadata(snapname, keyname, value)
610 except VolumeException as ve:
611 ret = self.volume_exception_to_retval(ve)
612 return ret
613
614 def get_subvolume_snapshot_metadata(self, **kwargs):
615 ret = 0, "", ""
616 volname = kwargs['vol_name']
617 subvolname = kwargs['sub_name']
618 snapname = kwargs['snap_name']
619 groupname = kwargs['group_name']
620 keyname = kwargs['key_name']
621
622 try:
623 with open_volume(self, volname) as fs_handle:
624 with open_group(fs_handle, self.volspec, groupname) as group:
625 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_GET) as subvolume:
626 if not snapname.encode('utf-8') in subvolume.list_snapshots():
627 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
628 value = subvolume.get_snapshot_metadata(snapname, keyname)
629 ret = 0, value, ""
630 except VolumeException as ve:
631 ret = self.volume_exception_to_retval(ve)
632 return ret
633
634 def list_subvolume_snapshot_metadata(self, **kwargs):
635 ret = 0, "", ""
636 volname = kwargs['vol_name']
637 subvolname = kwargs['sub_name']
638 snapname = kwargs['snap_name']
639 groupname = kwargs['group_name']
640
641 try:
642 with open_volume(self, volname) as fs_handle:
643 with open_group(fs_handle, self.volspec, groupname) as group:
644 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_LIST) as subvolume:
645 if not snapname.encode('utf-8') in subvolume.list_snapshots():
646 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
647 snap_metadata_dict = subvolume.list_snapshot_metadata(snapname)
648 ret = 0, json.dumps(snap_metadata_dict, indent=4, sort_keys=True), ""
649 except VolumeException as ve:
650 ret = self.volume_exception_to_retval(ve)
651 return ret
652
653 def remove_subvolume_snapshot_metadata(self, **kwargs):
654 ret = 0, "", ""
655 volname = kwargs['vol_name']
656 subvolname = kwargs['sub_name']
657 snapname = kwargs['snap_name']
658 groupname = kwargs['group_name']
659 keyname = kwargs['key_name']
660 force = kwargs['force']
661
662 try:
663 with open_volume(self, volname) as fs_handle:
664 with open_group(fs_handle, self.volspec, groupname) as group:
665 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_METADATA_REMOVE) as subvolume:
666 if not snapname.encode('utf-8') in subvolume.list_snapshots():
667 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snapname))
668 subvolume.remove_snapshot_metadata(snapname, keyname)
669 except VolumeException as ve:
670 if not (ve.errno == -errno.ENOENT and force):
671 ret = self.volume_exception_to_retval(ve)
672 return ret
673
674 def list_subvolume_snapshots(self, **kwargs):
675 ret = 0, "", ""
676 volname = kwargs['vol_name']
677 subvolname = kwargs['sub_name']
678 groupname = kwargs['group_name']
679
680 try:
681 with open_volume(self, volname) as fs_handle:
682 with open_group(fs_handle, self.volspec, groupname) as group:
683 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_LIST) as subvolume:
684 snapshots = subvolume.list_snapshots()
685 ret = 0, name_to_json(snapshots), ""
686 except VolumeException as ve:
687 ret = self.volume_exception_to_retval(ve)
688 return ret
689
690 def protect_subvolume_snapshot(self, **kwargs):
691 ret = 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
692 volname = kwargs['vol_name']
693 subvolname = kwargs['sub_name']
694 groupname = kwargs['group_name']
695
696 try:
697 with open_volume(self, volname) as fs_handle:
698 with open_group(fs_handle, self.volspec, groupname) as group:
699 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_PROTECT) as subvolume:
700 log.warning("snapshot protect call is deprecated and will be removed in a future release")
701 except VolumeException as ve:
702 ret = self.volume_exception_to_retval(ve)
703 return ret
704
705 def unprotect_subvolume_snapshot(self, **kwargs):
706 ret = 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
707 volname = kwargs['vol_name']
708 subvolname = kwargs['sub_name']
709 groupname = kwargs['group_name']
710
711 try:
712 with open_volume(self, volname) as fs_handle:
713 with open_group(fs_handle, self.volspec, groupname) as group:
714 with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname, SubvolumeOpType.SNAP_UNPROTECT) as subvolume:
715 log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
716 except VolumeException as ve:
717 ret = self.volume_exception_to_retval(ve)
718 return ret
719
720 def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs):
721 t_pool = kwargs['pool_layout']
722 s_subvolname = kwargs['sub_name']
723 s_groupname = kwargs['group_name']
724 t_groupname = kwargs['target_group_name']
725
726 create_clone(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, t_pool, volname, s_subvolume, s_snapname)
727 with open_subvol(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, SubvolumeOpType.CLONE_INTERNAL) as t_subvolume:
728 try:
729 if t_groupname == s_groupname and t_subvolname == s_subvolname:
730 t_subvolume.attach_snapshot(s_snapname, t_subvolume)
731 else:
732 s_subvolume.attach_snapshot(s_snapname, t_subvolume)
733 self.cloner.queue_job(volname)
734 except VolumeException as ve:
735 try:
736 t_subvolume.remove()
737 self.purge_queue.queue_job(volname)
738 except Exception as e:
739 log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e))
740 raise ve
741
742 def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs):
743 s_snapname = kwargs['snap_name']
744 target_subvolname = kwargs['target_sub_name']
745 target_groupname = kwargs['target_group_name']
746 s_groupname = kwargs['group_name']
747
748 if not s_snapname.encode('utf-8') in s_subvolume.list_snapshots():
749 raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname))
750
751 with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group:
752 try:
753 with open_subvol(self.mgr, fs_handle, self.volspec, target_group, target_subvolname, SubvolumeOpType.CLONE_CREATE):
754 raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
755 except VolumeException as ve:
756 if ve.errno == -errno.ENOENT:
757 self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname,
758 target_group, target_subvolname, **kwargs)
759 else:
760 raise
761
762 def clone_subvolume_snapshot(self, **kwargs):
763 ret = 0, "", ""
764 volname = kwargs['vol_name']
765 s_subvolname = kwargs['sub_name']
766 s_groupname = kwargs['group_name']
767
768 try:
769 with open_volume(self, volname) as fs_handle:
770 with open_group(fs_handle, self.volspec, s_groupname) as s_group:
771 with open_subvol(self.mgr, fs_handle, self.volspec, s_group, s_subvolname, SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
772 self._clone_subvolume_snapshot(fs_handle, volname, s_group, s_subvolume, **kwargs)
773 except VolumeException as ve:
774 ret = self.volume_exception_to_retval(ve)
775 return ret
776
777 def clone_status(self, **kwargs):
778 ret = 0, "", ""
779 volname = kwargs['vol_name']
780 clonename = kwargs['clone_name']
781 groupname = kwargs['group_name']
782
783 try:
784 with open_volume(self, volname) as fs_handle:
785 with open_group(fs_handle, self.volspec, groupname) as group:
786 with open_subvol(self.mgr, fs_handle, self.volspec, group, clonename, SubvolumeOpType.CLONE_STATUS) as subvolume:
787 ret = 0, json.dumps({'status' : subvolume.status}, indent=2), ""
788 except VolumeException as ve:
789 ret = self.volume_exception_to_retval(ve)
790 return ret
791
792 def clone_cancel(self, **kwargs):
793 ret = 0, "", ""
794 volname = kwargs['vol_name']
795 clonename = kwargs['clone_name']
796 groupname = kwargs['group_name']
797
798 try:
799 self.cloner.cancel_job(volname, (clonename, groupname))
800 except VolumeException as ve:
801 ret = self.volume_exception_to_retval(ve)
802 return ret
803
804 ### group operations
805
806 def create_subvolume_group(self, **kwargs):
807 ret = 0, "", ""
808 volname = kwargs['vol_name']
809 groupname = kwargs['group_name']
810 size = kwargs['size']
811 pool = kwargs['pool_layout']
812 uid = kwargs['uid']
813 gid = kwargs['gid']
814 mode = kwargs['mode']
815
816 try:
817 with open_volume(self, volname) as fs_handle:
818 try:
819 with open_group(fs_handle, self.volspec, groupname) as group:
820 # idempotent creation -- valid.
821 attrs = {
822 'uid': uid,
823 'gid': gid,
824 'mode': octal_str_to_decimal_int(mode),
825 'data_pool': pool,
826 'quota': size
827 }
828 set_group_attrs(fs_handle, group.path, attrs)
829 except VolumeException as ve:
830 if ve.errno == -errno.ENOENT:
831 oct_mode = octal_str_to_decimal_int(mode)
832 create_group(fs_handle, self.volspec, groupname, size, pool, oct_mode, uid, gid)
833 else:
834 raise
835 except VolumeException as ve:
836 # volume does not exist or subvolume group creation failed
837 ret = self.volume_exception_to_retval(ve)
838 return ret
839
840 def remove_subvolume_group(self, **kwargs):
841 ret = 0, "", ""
842 volname = kwargs['vol_name']
843 groupname = kwargs['group_name']
844 force = kwargs['force']
845
846 try:
847 with open_volume(self, volname) as fs_handle:
848 remove_group(fs_handle, self.volspec, groupname)
849 except VolumeException as ve:
850 if not (ve.errno == -errno.ENOENT and force):
851 ret = self.volume_exception_to_retval(ve)
852 return ret
853
854 def subvolumegroup_info(self, **kwargs):
855 ret = None
856 volname = kwargs['vol_name']
857 groupname = kwargs['group_name']
858
859 try:
860 with open_volume(self, volname) as fs_handle:
861 with open_group(fs_handle, self.volspec, groupname) as group:
862 mon_addr_lst = []
863 mon_map_mons = self.mgr.get('mon_map')['mons']
864 for mon in mon_map_mons:
865 ip_port = mon['addr'].split("/")[0]
866 mon_addr_lst.append(ip_port)
867
868 group_info_dict = group.info()
869 group_info_dict["mon_addrs"] = mon_addr_lst
870 ret = 0, json.dumps(group_info_dict, indent=4, sort_keys=True), ""
871 except VolumeException as ve:
872 ret = self.volume_exception_to_retval(ve)
873 return ret
874
875 def resize_subvolume_group(self, **kwargs):
876 ret = 0, "", ""
877 volname = kwargs['vol_name']
878 groupname = kwargs['group_name']
879 newsize = kwargs['new_size']
880 noshrink = kwargs['no_shrink']
881
882 try:
883 with open_volume(self, volname) as fs_handle:
884 with open_group(fs_handle, self.volspec, groupname) as group:
885 nsize, usedbytes = group.resize(newsize, noshrink)
886 ret = 0, json.dumps(
887 [{'bytes_used': usedbytes},{'bytes_quota': nsize},
888 {'bytes_pcent': "undefined" if nsize == 0 else '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)}],
889 indent=4, sort_keys=True), ""
890 except VolumeException as ve:
891 ret = self.volume_exception_to_retval(ve)
892 return ret
893
894 def getpath_subvolume_group(self, **kwargs):
895 volname = kwargs['vol_name']
896 groupname = kwargs['group_name']
897
898 try:
899 with open_volume(self, volname) as fs_handle:
900 with open_group(fs_handle, self.volspec, groupname) as group:
901 return 0, group.path.decode('utf-8'), ""
902 except VolumeException as ve:
903 return self.volume_exception_to_retval(ve)
904
905 def list_subvolume_groups(self, **kwargs):
906 volname = kwargs['vol_name']
907 ret = 0, '[]', ""
908 volume_exists = False
909 try:
910 with open_volume(self, volname) as fs_handle:
911 volume_exists = True
912 groups = listdir(fs_handle, self.volspec.base_dir, filter_entries=[dir.encode('utf-8') for dir in self.volspec.INTERNAL_DIRS])
913 ret = 0, name_to_json(groups), ""
914 except VolumeException as ve:
915 if not ve.errno == -errno.ENOENT or not volume_exists:
916 ret = self.volume_exception_to_retval(ve)
917 return ret
918
919 def pin_subvolume_group(self, **kwargs):
920 ret = 0, "", ""
921 volname = kwargs['vol_name']
922 groupname = kwargs['group_name']
923 pin_type = kwargs['pin_type']
924 pin_setting = kwargs['pin_setting']
925
926 try:
927 with open_volume(self, volname) as fs_handle:
928 with open_group(fs_handle, self.volspec, groupname) as group:
929 group.pin(pin_type, pin_setting)
930 ret = 0, json.dumps({}), ""
931 except VolumeException as ve:
932 ret = self.volume_exception_to_retval(ve)
933 return ret
934
935 def subvolume_group_exists(self, **kwargs):
936 volname = kwargs['vol_name']
937 ret = 0, "", ""
938 volume_exists = False
939
940 try:
941 with open_volume(self, volname) as fs_handle:
942 volume_exists = True
943 res = has_subdir(fs_handle, self.volspec.base_dir, filter_entries=[
944 dir.encode('utf-8') for dir in self.volspec.INTERNAL_DIRS])
945 if res:
946 ret = 0, "subvolumegroup exists", ""
947 else:
948 ret = 0, "no subvolumegroup exists", ""
949 except VolumeException as ve:
950 if volume_exists and ve.errno == -errno.ENOENT:
951 ret = 0, "no subvolumegroup exists", ""
952 else:
953 ret = self.volume_exception_to_retval(ve)
954 return ret
955
956 ### group snapshot
957
958 def create_subvolume_group_snapshot(self, **kwargs):
959 ret = -errno.ENOSYS, "", "subvolume group snapshots are not supported"
960 volname = kwargs['vol_name']
961 groupname = kwargs['group_name']
962 # snapname = kwargs['snap_name']
963
964 try:
965 with open_volume(self, volname) as fs_handle:
966 with open_group(fs_handle, self.volspec, groupname) as group:
967 # as subvolumes are marked with the vxattr ceph.dir.subvolume deny snapshots
968 # at the subvolume group (see: https://tracker.ceph.com/issues/46074)
969 # group.create_snapshot(snapname)
970 pass
971 except VolumeException as ve:
972 ret = self.volume_exception_to_retval(ve)
973 return ret
974
975 def remove_subvolume_group_snapshot(self, **kwargs):
976 ret = 0, "", ""
977 volname = kwargs['vol_name']
978 groupname = kwargs['group_name']
979 snapname = kwargs['snap_name']
980 force = kwargs['force']
981
982 try:
983 with open_volume(self, volname) as fs_handle:
984 with open_group(fs_handle, self.volspec, groupname) as group:
985 group.remove_snapshot(snapname)
986 except VolumeException as ve:
987 if not (ve.errno == -errno.ENOENT and force):
988 ret = self.volume_exception_to_retval(ve)
989 return ret
990
991 def list_subvolume_group_snapshots(self, **kwargs):
992 ret = 0, "", ""
993 volname = kwargs['vol_name']
994 groupname = kwargs['group_name']
995
996 try:
997 with open_volume(self, volname) as fs_handle:
998 with open_group(fs_handle, self.volspec, groupname) as group:
999 snapshots = group.list_snapshots()
1000 ret = 0, name_to_json(snapshots), ""
1001 except VolumeException as ve:
1002 ret = self.volume_exception_to_retval(ve)
1003 return ret