]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/volume.py
add stop-gap to fix compat with CPUs not supporting SSE 4.1
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / volume.py
1 import json
2 import errno
3 import logging
4 import os
5 import mgr_util
6 from typing import TYPE_CHECKING
7
8 import cephfs
9
10 from mgr_util import CephfsClient
11
12 from .fs_util import listdir, has_subdir
13
14 from .operations.group import open_group, create_group, remove_group, \
15 open_group_unique, set_group_attrs
16 from .operations.volume import create_volume, delete_volume, rename_volume, \
17 list_volumes, open_volume, get_pool_names, get_pool_ids, get_pending_subvol_deletions_count
18 from .operations.subvolume import open_subvol, create_subvol, remove_subvol, \
19 create_clone
20 from .operations.trash import Trash
21
22 from .vol_spec import VolSpec
23 from .exception import VolumeException, ClusterError, ClusterTimeout, EvictionError
24 from .async_cloner import Cloner
25 from .purge_queue import ThreadPoolPurgeQueueMixin
26 from .operations.template import SubvolumeOpType
27
28 if TYPE_CHECKING:
29 from volumes import Module
30
# module-level logger, named after this module
log = logging.getLogger(__name__)

# access levels: read-only ('r') and read-write ('rw').
# NOTE(review): not referenced in the visible part of this file -- presumably
# consumed by the authorization code elsewhere; confirm before changing.
ALLOWED_ACCESS_LEVELS = ('r', 'rw')
34
35
def octal_str_to_decimal_int(mode):
    """
    convert an octal mode string (e.g. '755') to its integer value

    :param mode: mode bits written as a string of octal digits
    :return: integer value of ``mode`` interpreted in base 8
    :raises VolumeException: -EINVAL when ``mode`` cannot be parsed as an
                             octal string (including non-string input)
    """
    try:
        return int(mode, 8)
    except (TypeError, ValueError):
        # int(x, 8) raises TypeError (not ValueError) for non-string input
        # such as None; report both as an invalid mode instead of letting
        # the TypeError escape to the caller.
        raise VolumeException(-errno.EINVAL, "Invalid mode '{0}'".format(mode))
41
42
def name_to_json(names):
    """
    convert the list of names to json

    :param names: iterable of utf-8 encoded names (bytes)
    :return: json string holding a list of ``{"name": <decoded name>}``
             objects, in input order
    """
    # iterate directly instead of indexing, so any iterable is accepted
    entries = [{'name': name.decode('utf-8')} for name in names]
    return json.dumps(entries, indent=4, sort_keys=True)
51
52
53 class VolumeClient(CephfsClient["Module"]):
def __init__(self, mgr):
    """
    set up the volume specification, the cloner and the purge queue, then
    queue a clone job and a purge job for every file system in the fs map

    :param mgr: the mgr module instance this client belongs to
    """
    super().__init__(mgr)
    # volume specification, built from the 'client_snapdir' config value
    snapdir = mgr.rados.conf_get('client_snapdir')
    self.volspec = VolSpec(snapdir)
    self.cloner = Cloner(self, self.mgr.max_concurrent_clones, self.mgr.snapshot_clone_delay)
    self.purge_queue = ThreadPoolPurgeQueueMixin(self, 4)
    # on startup, queue purge job for available volumes to kickstart
    # purge for leftover subvolume entries in trash. note that, if the
    # trash directory does not exist or if there are no purge entries
    # available for a volume, the volume is removed from the purge
    # job list.
    for filesystem in self.mgr.get('fs_map')['filesystems']:
        self.cloner.queue_job(filesystem['mdsmap']['fs_name'])
        self.purge_queue.queue_job(filesystem['mdsmap']['fs_name'])
69
def shutdown(self):
    """
    stop the cloner and the purge queue, then drop all pooled connections

    Overrides CephfsClient.shutdown().
    """
    log.info("shutting down")
    # clones are stopped first ...
    self.cloner.shutdown()
    # ... then the purge threads ...
    self.purge_queue.shutdown()
    # ... and only then are the libcephfs handles removed from the
    # connection pool
    self.connection_pool.del_all_connections()
79
def cluster_log(self, msg, lvl=None):
    """
    log to cluster log with default log level as WARN.

    :param msg: message to write to the "cluster" channel
    :param lvl: log priority; when omitted (None), ``ClusterLogPrio.WARN``
                of the mgr module is used
    """
    # compare against None rather than truthiness: a priority whose value
    # is falsy (e.g. a zero-valued enum member) must not be silently
    # replaced by WARN.
    if lvl is None:
        lvl = self.mgr.ClusterLogPrio.WARN
    self.mgr.cluster_log("cluster", lvl, msg)
87
def volume_exception_to_retval(self, ve):
    """
    return a tuple representation from a volume exception

    :param ve: exception object providing ``to_tuple()``
    :return: whatever ``ve.to_tuple()`` returns
    """
    retval = ve.to_tuple()
    return retval
93
94 ### volume operations -- create, rm, ls
95
def create_fs_volume(self, volname, placement):
    """
    create a volume by delegating to create_volume()

    :param volname: name of the volume to create
    :param placement: placement argument, passed through unchanged
    :return: the result of create_volume()
    """
    result = create_volume(self.mgr, volname, placement)
    return result
98
def delete_fs_volume(self, volname, confirm):
    """
    delete a volume after checking confirmation and pool-delete permission

    :param volname: name of the volume to delete
    :param confirm: must be exactly "--yes-i-really-mean-it"
    :return: (-EPERM, "", message) when not confirmed or when
             mon_allow_pool_delete is not set, (-ENOENT, "", message) when
             no metadata pool is found for the volume, otherwise the
             result of delete_volume()
    """
    if confirm != "--yes-i-really-mean-it":
        warning = ("WARNING: this will *PERMANENTLY DESTROY* all data "
                   "stored in the filesystem '{0}'. If you are *ABSOLUTELY CERTAIN* "
                   "that is what you want, re-issue the command followed by "
                   "--yes-i-really-mean-it.").format(volname)
        return -errno.EPERM, "", warning

    # only the command output is of interest here
    _, out, _ = self.mgr.check_mon_command({
        'prefix': 'config get',
        'key': 'mon_allow_pool_delete',
        'who': 'mon',
        'format': 'json',
    })
    if not json.loads(out):
        return (-errno.EPERM, "",
                "pool deletion is disabled; you must first "
                "set the mon_allow_pool_delete config option to true before volumes "
                "can be deleted")

    metadata_pool, data_pools = get_pool_names(self.mgr, volname)
    if not metadata_pool:
        return -errno.ENOENT, "", "volume {0} doesn't exist".format(volname)

    # cancel purge jobs and drop connections for the volume before it is
    # deleted
    self.purge_queue.cancel_jobs(volname)
    self.connection_pool.del_connections(volname, wait=True)
    return delete_volume(self.mgr, volname, metadata_pool, data_pools)
124
def list_fs_volumes(self):
    """
    list volumes as a json document

    :return: (0, json dump of list_volumes(), "")
    """
    listing = json.dumps(list_volumes(self.mgr), indent=4, sort_keys=True)
    return 0, listing, ""
128
def rename_fs_volume(self, volname, newvolname, sure):
    """
    rename a volume once the caller has confirmed the operation

    :param volname: current volume name
    :param newvolname: new volume name
    :param sure: truthy when the caller confirmed the rename
    :return: (-EPERM, "", warning) when not confirmed, otherwise the
             result of rename_volume()
    """
    if sure:
        return rename_volume(self.mgr, volname, newvolname)

    warning = (
        "WARNING: This will rename the filesystem and possibly its "
        "pools. It is a potentially disruptive operation, clients' "
        "cephx credentials need reauthorized to access the file system "
        "and its pools with the new name. Add --yes-i-really-mean-it "
        "if you are sure you wish to continue.")
    return -errno.EPERM, "", warning
140
def volume_info(self, **kwargs):
    """
    report size, pool and monitor information of a volume as json

    expected kwargs:
      vol_name       -- name of the volume
      human_readable -- when truthy, byte counts are rendered with
                        mgr_util.format_bytes() instead of raw integers

    the json document holds the dict returned by
    get_pending_subvol_deletions_count() plus 'used_size' (both only when
    the volume's base directory could be stat'ed), 'pools' (metadata/data
    pool name, used and available bytes) and 'mon_addrs'.

    :return: (0, json, "") on success, else the tuple form of the
             VolumeException that was raised
    """
    ret = None
    volname = kwargs['vol_name']
    human_readable = kwargs['human_readable']

    def render(nbytes):
        # one place for the raw-vs-human-readable decision
        return mgr_util.format_bytes(nbytes, 5) if human_readable else nbytes

    try:
        with open_volume(self, volname) as fs_handle:
            path = self.volspec.base_dir
            vol_info_dict = {}
            try:
                st = fs_handle.statx(path.encode('utf-8'), cephfs.CEPH_STATX_SIZE,
                                     cephfs.AT_SYMLINK_NOFOLLOW)

                usedbytes = st['size']
                vol_info_dict = get_pending_subvol_deletions_count(path)
                vol_info_dict['used_size'] = render(int(usedbytes))
            except cephfs.Error as e:
                # a missing base directory is expected and simply leaves the
                # size fields out. any other error is still non-fatal for
                # the report, but is logged rather than silently dropped.
                if e.args[0] != errno.ENOENT:
                    log.warning("failed to stat '{0}' of volume '{1}' ({2})".format(
                        path, volname, e))

            pool_stats = {p['id']: p['stats'] for p in self.mgr.get("df")['pools']}
            pools = {p['pool']: p for p in self.mgr.get("osd_map")['pools']}
            metadata_pool_id, data_pool_ids = get_pool_ids(self.mgr, volname)
            vol_info_dict["pools"] = {"metadata": [], "data": []}
            for pool_id in [metadata_pool_id] + data_pool_ids:
                pool_type = "metadata" if pool_id == metadata_pool_id else "data"
                vol_info_dict["pools"][pool_type].append({
                    'name': pools[pool_id]['pool_name'],
                    'used': render(pool_stats[pool_id]['bytes_used']),
                    'avail': render(pool_stats[pool_id]['max_avail'])})

            # keep only the part of each monitor address before the first "/"
            vol_info_dict["mon_addrs"] = [
                mon['addr'].split("/")[0] for mon in self.mgr.get('mon_map')['mons']]
            ret = 0, json.dumps(vol_info_dict, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        ret = self.volume_exception_to_retval(ve)
    return ret
195
196 ### subvolume operations
197
def _create_subvolume(self, fs_handle, volname, group, subvolname, **kwargs):
    """
    create a subvolume inside an already opened group

    reads size, pool_layout, uid, gid, mode and namespace_isolated from
    kwargs; ``mode`` is an octal string.

    :raises VolumeException: on an invalid mode or when create_subvol()
                             fails (a purge job is queued for the volume
                             before the error is re-raised)
    """
    size, pool, uid, gid, mode, isolate_nspace = (
        kwargs[key] for key in
        ('size', 'pool_layout', 'uid', 'gid', 'mode', 'namespace_isolated'))

    oct_mode = octal_str_to_decimal_int(mode)
    try:
        create_subvol(
            self.mgr, fs_handle, self.volspec, group, subvolname, size,
            isolate_nspace, pool, oct_mode, uid, gid)
    except VolumeException:
        # kick the purge threads for async removal -- note that this
        # assumes that the subvolume is moved to trashcan for cleanup on error.
        self.purge_queue.queue_job(volname)
        raise
215
def create_subvolume(self, **kwargs):
    """
    create a subvolume, or update its attributes when it already exists

    expected kwargs: vol_name, sub_name, group_name, size, pool_layout,
    uid, gid, mode (octal string), namespace_isolated.

    when the subvolume can be opened, creation is treated as idempotent
    and the given attributes are applied to it; when opening fails with
    ENOENT the subvolume is created.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException that was raised
    """
    ret = 0, "", ""
    volname = kwargs['vol_name']
    subvolname = kwargs['sub_name']
    groupname = kwargs['group_name']
    size = kwargs['size']
    pool = kwargs['pool_layout']
    uid = kwargs['uid']
    gid = kwargs['gid']
    mode = kwargs['mode']
    isolate_nspace = kwargs['namespace_isolated']

    try:
        with open_volume(self, volname) as fs_handle:
            with open_group(fs_handle, self.volspec, groupname) as group:
                try:
                    with open_subvol(self.mgr, fs_handle, self.volspec, group, subvolname,
                                     SubvolumeOpType.CREATE) as subvolume:
                        # idempotent creation -- valid. Attributes set is supported.
                        attrs = {
                            # fall back to the current owner only when no
                            # uid/gid was given at all: 0 (root) is a valid,
                            # explicitly requested id and must not be
                            # treated as "not given".
                            # NOTE(review): assumes None is the "not given"
                            # sentinel for uid/gid -- confirm against caller.
                            'uid': uid if uid is not None else subvolume.uid,
                            'gid': gid if gid is not None else subvolume.gid,
                            'mode': octal_str_to_decimal_int(mode),
                            'data_pool': pool,
                            'pool_namespace': subvolume.namespace if isolate_nspace else None,
                            'quota': size
                        }
                        subvolume.set_attrs(subvolume.path, attrs)
                except VolumeException as ve:
                    if ve.errno == -errno.ENOENT:
                        self._create_subvolume(fs_handle, volname, group, subvolname, **kwargs)
                    else:
                        raise
    except VolumeException as ve:
        # volume/group does not exist or subvolume creation failed
        ret = self.volume_exception_to_retval(ve)
    return ret
252
def remove_subvolume(self, **kwargs):
    """
    remove a subvolume and queue a purge job for its volume

    expected kwargs: vol_name, sub_name, group_name, force,
    retain_snapshots.

    :return: (0, "", "") on success -- also when the removal failed with
             ENOENT and ``force`` is set; otherwise the tuple form of the
             VolumeException (an EAGAIN failure without ``force`` gets a
             " (use --force to override)" hint appended)
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']
    force = kwargs['force']
    retain = kwargs['retain_snapshots']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group:
            remove_subvol(self.mgr, fs_handle, self.volspec, group, subvol, force, retain)
            # kick the purge threads for async removal -- note that this
            # assumes that the subvolume is moved to trash can.
            # TODO: make purge queue as singleton so that trash can kicks
            # the purge threads on dump.
            self.purge_queue.queue_job(vol)
    except VolumeException as ve:
        if ve.errno == -errno.EAGAIN and not force:
            hinted = VolumeException(ve.errno, ve.error_str + " (use --force to override)")
            return self.volume_exception_to_retval(hinted)
        if ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
277
def authorize_subvolume(self, **kwargs):
    """
    authorize an auth id on a subvolume

    expected kwargs: vol_name, sub_name, auth_id, group_name,
    access_level, tenant_id, allow_existing_id.

    :return: (0, <value returned by subvolume.authorize()>, "") on
             success, else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    auth_id = kwargs['auth_id']
    grp = kwargs['group_name']
    access_level = kwargs['access_level']
    tenant_id = kwargs['tenant_id']
    allow_existing_id = kwargs['allow_existing_id']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.ALLOW_ACCESS) as subvolume:
            result = 0, subvolume.authorize(auth_id, access_level, tenant_id,
                                            allow_existing_id), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
297
def deauthorize_subvolume(self, **kwargs):
    """
    deauthorize an auth id on a subvolume

    expected kwargs: vol_name, sub_name, auth_id, group_name.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    auth_id = kwargs['auth_id']
    grp = kwargs['group_name']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.DENY_ACCESS) as subvolume:
            subvolume.deauthorize(auth_id)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
313
def authorized_list(self, **kwargs):
    """
    list the authorizations of a subvolume as json

    expected kwargs: vol_name, sub_name, group_name.

    :return: (0, json dump of subvolume.authorized_list(), "") on
             success, else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.AUTH_LIST) as subvolume:
            listing = subvolume.authorized_list()
            result = 0, json.dumps(listing, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
329
def evict(self, **kwargs):
    """
    evict an auth id from a subvolume

    expected kwargs: vol_name, sub_name, auth_id, group_name.

    :return: (0, "", "") on success; on failure a tuple derived from the
             exception: the tuple form of a VolumeException, -ETIMEDOUT
             for ClusterTimeout, the result code/string of a
             ClusterError, or -EINVAL with the message of an
             EvictionError
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    auth_id = kwargs['auth_id']
    grp = kwargs['group_name']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.EVICT) as subvolume:
            subvolume.evict(vol, auth_id)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    except ClusterTimeout:
        return -errno.ETIMEDOUT, "", "Timedout trying to talk to ceph cluster"
    except ClusterError as ce:
        return ce._result_code, "", ce._result_str
    except EvictionError as ee:
        return -errno.EINVAL, "", str(ee)
    return 0, "", ""
353
def resize_subvolume(self, **kwargs):
    """
    resize a subvolume and report its usage as json

    expected kwargs: vol_name, sub_name, new_size, no_shrink, group_name.

    the json document is a list of three single-key objects: bytes_used,
    bytes_quota and bytes_pcent ("undefined" when the new quota is 0,
    else used/quota as a percentage with two decimals).

    :return: (0, json, "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    new_size = kwargs['new_size']
    no_shrink = kwargs['no_shrink']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.RESIZE) as subvolume:
            nsize, usedbytes = subvolume.resize(new_size, no_shrink)
            if nsize == 0:
                # no quota to relate the usage to
                pcent = "undefined"
            else:
                pcent = '{0:.2f}'.format((float(usedbytes) / nsize) * 100.0)
            report = [{'bytes_used': usedbytes}, {'bytes_quota': nsize}, {'bytes_pcent': pcent}]
            result = 0, json.dumps(report, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
374
def subvolume_pin(self, **kwargs):
    """
    apply a pin setting to a subvolume

    expected kwargs: vol_name, sub_name, pin_type, pin_setting,
    group_name.

    :return: (0, "{}", "") on success (an empty json object), else the
             tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    pin_type = kwargs['pin_type']
    pin_setting = kwargs['pin_setting']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.PIN) as subvolume:
            subvolume.pin(pin_type, pin_setting)
            result = 0, json.dumps({}), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
392
def subvolume_getpath(self, **kwargs):
    """
    return the path of a subvolume

    expected kwargs: vol_name, sub_name, group_name.

    :return: (0, subvolume path decoded as utf-8, "") on success, else
             the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']

    result = None
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.GETPATH) as subvolume:
            result = 0, subvolume.path.decode("utf-8"), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
408
def subvolume_info(self, **kwargs):
    """
    report subvolume information as json

    expected kwargs: vol_name, sub_name, group_name.

    the document is the dict returned by subvolume.info() with an extra
    'mon_addrs' entry taken from the mon map.

    :return: (0, json, "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']

    result = None
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.INFO) as subvolume:
            # keep only the part of each monitor address before the first "/"
            mon_addrs = [mon['addr'].split("/")[0]
                         for mon in self.mgr.get('mon_map')['mons']]

            info = subvolume.info()
            info["mon_addrs"] = mon_addrs
            result = 0, json.dumps(info, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
431
def set_user_metadata(self, **kwargs):
    """
    set a user metadata key on a subvolume

    expected kwargs: vol_name, sub_name, group_name, key_name, value.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']
    key = kwargs['key_name']
    val = kwargs['value']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.USER_METADATA_SET) as subvolume:
            subvolume.set_user_metadata(key, val)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
448
def get_user_metadata(self, **kwargs):
    """
    fetch the value of a user metadata key of a subvolume

    expected kwargs: vol_name, sub_name, group_name, key_name.

    :return: (0, value, "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']
    key = kwargs['key_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.USER_METADATA_GET) as subvolume:
            result = 0, subvolume.get_user_metadata(key), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
465
def list_user_metadata(self, **kwargs):
    """
    list the user metadata of a subvolume as json

    expected kwargs: vol_name, sub_name, group_name.

    :return: (0, json dump of subvolume.list_user_metadata(), "") on
             success, else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.USER_METADATA_LIST) as subvolume:
            metadata = subvolume.list_user_metadata()
            result = 0, json.dumps(metadata, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
481
def remove_user_metadata(self, **kwargs):
    """
    remove a user metadata key from a subvolume

    expected kwargs: vol_name, sub_name, group_name, key_name, force.

    :return: (0, "", "") on success -- also when the operation failed
             with ENOENT and ``force`` is set; otherwise the tuple form
             of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']
    key = kwargs['key_name']
    force = kwargs['force']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.USER_METADATA_REMOVE) as subvolume:
            subvolume.remove_user_metadata(key)
    except VolumeException as ve:
        # with force, "does not exist" is not reported as a failure
        if ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
499
def list_subvolumes(self, **kwargs):
    """
    list the subvolumes of a group as json

    expected kwargs: vol_name, group_name.

    :return: (0, name_to_json() of group.list_subvolumes(), "") on
             success, else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group:
            result = 0, name_to_json(group.list_subvolumes()), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
513
def subvolume_exists(self, **kwargs):
    """
    tell whether a group of a volume has any subvolume

    expected kwargs: vol_name, group_name.

    :return: (0, "subvolume exists", "") or (0, "no subvolume exists", "");
             an ENOENT raised after the volume was opened also counts as
             "no subvolume exists". any other failure yields the tuple
             form of the VolumeException.
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']
    result = 0, "", ""
    # remembers whether the volume itself could be opened, so that a later
    # ENOENT is not mistaken for a missing volume
    volume_opened = False

    try:
        with open_volume(self, vol) as fs_handle:
            volume_opened = True
            with open_group(fs_handle, self.volspec, grp) as group:
                verdict = "subvolume exists" if group.has_subvolumes() else "no subvolume exists"
                result = 0, verdict, ""
    except VolumeException as ve:
        if volume_opened and ve.errno == -errno.ENOENT:
            result = 0, "no subvolume exists", ""
        else:
            result = self.volume_exception_to_retval(ve)
    return result
535
536 ### subvolume snapshot
537
def create_subvolume_snapshot(self, **kwargs):
    """
    create a snapshot of a subvolume

    expected kwargs: vol_name, sub_name, snap_name, group_name.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    snap = kwargs['snap_name']
    grp = kwargs['group_name']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_CREATE) as subvolume:
            subvolume.create_snapshot(snap)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
553
def remove_subvolume_snapshot(self, **kwargs):
    """
    remove a snapshot of a subvolume

    expected kwargs: vol_name, sub_name, snap_name, group_name, force.

    :return: (0, "", "") on success -- also when the operation failed
             with ESTALE (a purge job is queued instead) or with ENOENT
             while ``force`` is set; otherwise the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    snap = kwargs['snap_name']
    grp = kwargs['group_name']
    force = kwargs['force']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_REMOVE) as subvolume:
            subvolume.remove_snapshot(snap, force)
    except VolumeException as ve:
        if ve.errno == -errno.ESTALE:
            # ESTALE serves as an error to state that subvolume is currently
            # stale due to internal removal and, we should tickle the purge
            # jobs to purge the same
            self.purge_queue.queue_job(vol)
        elif ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
575
def subvolume_snapshot_info(self, **kwargs):
    """
    report information about a subvolume snapshot as json

    expected kwargs: vol_name, sub_name, snap_name, group_name.

    :return: (0, json dump of subvolume.snapshot_info(), "") on success,
             else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    snap = kwargs['snap_name']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_INFO) as subvolume:
            snap_info = subvolume.snapshot_info(snap)
            result = 0, json.dumps(snap_info, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
592
def set_subvolume_snapshot_metadata(self, **kwargs):
    """
    set a metadata key on a subvolume snapshot

    expected kwargs: vol_name, sub_name, snap_name, group_name, key_name,
    value.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException (ENOENT when the snapshot is not listed by
             the subvolume)
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    snap = kwargs['snap_name']
    grp = kwargs['group_name']
    key = kwargs['key_name']
    val = kwargs['value']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_METADATA_SET) as subvolume:
            # snapshot names are compared in their utf-8 encoded form
            if snap.encode('utf-8') not in subvolume.list_snapshots():
                raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snap))
            subvolume.set_snapshot_metadata(snap, key, val)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
612
def get_subvolume_snapshot_metadata(self, **kwargs):
    """
    fetch the value of a metadata key of a subvolume snapshot

    expected kwargs: vol_name, sub_name, snap_name, group_name, key_name.

    :return: (0, value, "") on success, else the tuple form of the
             VolumeException (ENOENT when the snapshot is not listed by
             the subvolume)
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    snap = kwargs['snap_name']
    grp = kwargs['group_name']
    key = kwargs['key_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_METADATA_GET) as subvolume:
            # snapshot names are compared in their utf-8 encoded form
            if snap.encode('utf-8') not in subvolume.list_snapshots():
                raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snap))
            result = 0, subvolume.get_snapshot_metadata(snap, key), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
632
def list_subvolume_snapshot_metadata(self, **kwargs):
    """
    list the metadata of a subvolume snapshot as json

    expected kwargs: vol_name, sub_name, snap_name, group_name.

    :return: (0, json dump of subvolume.list_snapshot_metadata(), "") on
             success, else the tuple form of the VolumeException (ENOENT
             when the snapshot is not listed by the subvolume)
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    snap = kwargs['snap_name']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_METADATA_LIST) as subvolume:
            # snapshot names are compared in their utf-8 encoded form
            if snap.encode('utf-8') not in subvolume.list_snapshots():
                raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snap))
            metadata = subvolume.list_snapshot_metadata(snap)
            result = 0, json.dumps(metadata, indent=4, sort_keys=True), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
651
def remove_subvolume_snapshot_metadata(self, **kwargs):
    """
    remove a metadata key from a subvolume snapshot

    expected kwargs: vol_name, sub_name, snap_name, group_name, key_name,
    force.

    :return: (0, "", "") on success -- also when the operation failed
             with ENOENT (including a snapshot that is not listed) and
             ``force`` is set; otherwise the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    snap = kwargs['snap_name']
    grp = kwargs['group_name']
    key = kwargs['key_name']
    force = kwargs['force']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_METADATA_REMOVE) as subvolume:
            # snapshot names are compared in their utf-8 encoded form
            if snap.encode('utf-8') not in subvolume.list_snapshots():
                raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(snap))
            subvolume.remove_snapshot_metadata(snap, key)
    except VolumeException as ve:
        # with force, "does not exist" is not reported as a failure
        if ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
672
def list_subvolume_snapshots(self, **kwargs):
    """
    list the snapshots of a subvolume as json

    expected kwargs: vol_name, sub_name, group_name.

    :return: (0, name_to_json() of subvolume.list_snapshots(), "") on
             success, else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_LIST) as subvolume:
            result = 0, name_to_json(subvolume.list_snapshots()), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
688
def protect_subvolume_snapshot(self, **kwargs):
    """
    deprecated 'snapshot protect' call

    only opens the subvolume (which validates volume, group and
    subvolume) and logs a deprecation warning.

    expected kwargs: vol_name, sub_name, group_name.

    :return: (0, "", deprecation warning) when the subvolume could be
             opened, else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_PROTECT):
            log.warning("snapshot protect call is deprecated and will be removed in a future release")
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", "Deprecation warning: 'snapshot protect' call is deprecated and will be removed in a future release"
703
def unprotect_subvolume_snapshot(self, **kwargs):
    """
    deprecated 'snapshot unprotect' call

    only opens the subvolume (which validates volume, group and
    subvolume) and logs a deprecation warning.

    expected kwargs: vol_name, sub_name, group_name.

    :return: (0, "", deprecation warning) when the subvolume could be
             opened, else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    subvol = kwargs['sub_name']
    grp = kwargs['group_name']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, subvol,
                            SubvolumeOpType.SNAP_UNPROTECT):
            log.warning("snapshot unprotect call is deprecated and will be removed in a future release")
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", "Deprecation warning: 'snapshot unprotect' call is deprecated and will be removed in a future release"
718
def _prepare_clone_subvolume(self, fs_handle, volname, s_subvolume, s_snapname, t_group, t_subvolname, **kwargs):
    """
    create the clone target, attach the source snapshot to it and queue
    a clone job for the volume

    reads pool_layout, sub_name, group_name and target_group_name from
    kwargs.

    :raises VolumeException: when attaching the snapshot or queueing the
                             job fails; removal of the freshly created
                             target is attempted first (failures of that
                             cleanup are only logged)
    """
    t_pool = kwargs['pool_layout']
    s_subvolname = kwargs['sub_name']
    s_groupname = kwargs['group_name']
    t_groupname = kwargs['target_group_name']

    create_clone(self.mgr, fs_handle, self.volspec, t_group, t_subvolname, t_pool,
                 volname, s_subvolume, s_snapname)
    with open_subvol(self.mgr, fs_handle, self.volspec, t_group, t_subvolname,
                     SubvolumeOpType.CLONE_INTERNAL) as t_subvolume:
        try:
            # when source and target name the same subvolume, the snapshot
            # is attached through the target handle itself
            same_subvolume = t_groupname == s_groupname and t_subvolname == s_subvolname
            snapshot_owner = t_subvolume if same_subvolume else s_subvolume
            snapshot_owner.attach_snapshot(s_snapname, t_subvolume)
            self.cloner.queue_job(volname)
        except VolumeException as ve:
            try:
                t_subvolume.remove()
                self.purge_queue.queue_job(volname)
            except Exception as e:
                log.warning("failed to cleanup clone subvolume '{0}' ({1})".format(t_subvolname, e))
            raise ve
740
def _clone_subvolume_snapshot(self, fs_handle, volname, s_group, s_subvolume, **kwargs):
    """
    validate a clone request and hand it to _prepare_clone_subvolume()

    reads snap_name, target_sub_name, target_group_name and group_name
    from kwargs.

    :raises VolumeException: ENOENT when the source snapshot is not
                             listed, EEXIST when the target subvolume can
                             already be opened, or whatever opening the
                             target / preparing the clone raises
    """
    s_snapname = kwargs['snap_name']
    target_subvolname = kwargs['target_sub_name']
    target_groupname = kwargs['target_group_name']
    s_groupname = kwargs['group_name']

    # snapshot names are compared in their utf-8 encoded form
    if s_snapname.encode('utf-8') not in s_subvolume.list_snapshots():
        raise VolumeException(-errno.ENOENT, "snapshot '{0}' does not exist".format(s_snapname))

    with open_group_unique(fs_handle, self.volspec, target_groupname, s_group, s_groupname) as target_group:
        try:
            # being able to open the target means it already exists
            with open_subvol(self.mgr, fs_handle, self.volspec, target_group, target_subvolname,
                             SubvolumeOpType.CLONE_CREATE):
                raise VolumeException(-errno.EEXIST, "subvolume '{0}' exists".format(target_subvolname))
        except VolumeException as ve:
            if ve.errno != -errno.ENOENT:
                raise
            # target is missing, as required -- go ahead with the clone
            self._prepare_clone_subvolume(fs_handle, volname, s_subvolume, s_snapname,
                                          target_group, target_subvolname, **kwargs)
760
def clone_subvolume_snapshot(self, **kwargs):
    """
    clone a subvolume snapshot into a target subvolume

    reads vol_name, sub_name and group_name here; all kwargs are passed
    on to _clone_subvolume_snapshot().

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    src_subvol = kwargs['sub_name']
    src_grp = kwargs['group_name']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, src_grp) as s_group, \
                open_subvol(self.mgr, fs_handle, self.volspec, s_group, src_subvol,
                            SubvolumeOpType.CLONE_SOURCE) as s_subvolume:
            self._clone_subvolume_snapshot(fs_handle, vol, s_group, s_subvolume, **kwargs)
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
775
def clone_status(self, **kwargs):
    """
    report the status of a clone as json

    expected kwargs: vol_name, clone_name, group_name.

    :return: (0, json object with a single 'status' key, "") on success,
             else the tuple form of the VolumeException
    """
    vol = kwargs['vol_name']
    clone = kwargs['clone_name']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group, \
                open_subvol(self.mgr, fs_handle, self.volspec, group, clone,
                            SubvolumeOpType.CLONE_STATUS) as subvolume:
            status_doc = {'status': subvolume.status}
            result = 0, json.dumps(status_doc, indent=2), ""
    except VolumeException as ve:
        result = self.volume_exception_to_retval(ve)
    return result
790
def clone_cancel(self, **kwargs):
    """
    cancel a clone job

    expected kwargs: vol_name, clone_name, group_name.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    clone = kwargs['clone_name']
    grp = kwargs['group_name']

    try:
        # the job is identified by the (clone name, group name) pair
        self.cloner.cancel_job(vol, (clone, grp))
    except VolumeException as ve:
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
802
803 ### group operations
804
def create_subvolume_group(self, **kwargs):
    """
    create a subvolume group, or update its attributes when it exists

    expected kwargs: vol_name, group_name, size, pool_layout, uid, gid,
    mode (octal string).

    when the group can be opened, creation is treated as idempotent and
    the given attributes are applied; when opening fails with ENOENT the
    group is created.

    :return: (0, "", "") on success, else the tuple form of the
             VolumeException
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']
    size = kwargs['size']
    pool = kwargs['pool_layout']
    uid = kwargs['uid']
    gid = kwargs['gid']
    mode = kwargs['mode']

    try:
        with open_volume(self, vol) as fs_handle:
            try:
                with open_group(fs_handle, self.volspec, grp) as group:
                    # idempotent creation -- valid.
                    attrs = dict(
                        uid=uid,
                        gid=gid,
                        mode=octal_str_to_decimal_int(mode),
                        data_pool=pool,
                        quota=size,
                    )
                    set_group_attrs(fs_handle, group.path, attrs)
            except VolumeException as ve:
                if ve.errno != -errno.ENOENT:
                    raise
                # group is missing -- create it
                oct_mode = octal_str_to_decimal_int(mode)
                create_group(fs_handle, self.volspec, grp, size, pool, oct_mode, uid, gid)
    except VolumeException as ve:
        # volume does not exist or subvolume group creation failed
        return self.volume_exception_to_retval(ve)
    return 0, "", ""
838
def remove_subvolume_group(self, **kwargs):
    """
    remove a subvolume group

    expected kwargs: vol_name, group_name, force.

    :return: (0, "", "") on success -- also when the operation failed
             with ENOENT and ``force`` is set; otherwise the tuple form
             of the VolumeException
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']
    force = kwargs['force']

    try:
        with open_volume(self, vol) as fs_handle:
            remove_group(fs_handle, self.volspec, grp)
    except VolumeException as ve:
        # with force, "does not exist" is not reported as a failure
        if ve.errno != -errno.ENOENT or not force:
            return self.volume_exception_to_retval(ve)
    return 0, "", ""
852
def subvolumegroup_info(self, **kwargs):
    """
    Return subvolume group information as JSON.

    Reads 'vol_name' and 'group_name' from kwargs. On success returns
    (0, <json>, "") where the JSON is the dict from group.info() extended
    with a "mon_addrs" list taken from the mgr's 'mon_map'. A
    VolumeException is translated through volume_exception_to_retval().
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']

    result = None
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group:
            # keep only the part of each monitor address before the first "/"
            mon_addrs = [mon['addr'].split("/")[0]
                         for mon in self.mgr.get('mon_map')['mons']]

            info = group.info()
            info["mon_addrs"] = mon_addrs
            result = 0, json.dumps(info, indent=4, sort_keys=True), ""
    except VolumeException as exc:
        result = self.volume_exception_to_retval(exc)
    return result
873
def resize_subvolume_group(self, **kwargs):
    """
    Resize a subvolume group and report usage as JSON.

    Reads 'vol_name', 'group_name', 'new_size' and 'no_shrink' from kwargs.
    On success returns (0, <json>, "") where the JSON is a list of three
    single-key objects: bytes_used, bytes_quota and bytes_pcent. The
    percentage is the string "undefined" when the resulting quota is 0.
    A VolumeException is translated through volume_exception_to_retval().
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']
    new_size = kwargs['new_size']
    no_shrink = kwargs['no_shrink']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group:
            quota, used = group.resize(new_size, no_shrink)
            if quota == 0:
                # avoid dividing by a zero quota
                pcent = "undefined"
            else:
                pcent = '{0:.2f}'.format((float(used) / quota) * 100.0)
            report = [
                {'bytes_used': used},
                {'bytes_quota': quota},
                {'bytes_pcent': pcent},
            ]
            result = 0, json.dumps(report, indent=4, sort_keys=True), ""
    except VolumeException as exc:
        result = self.volume_exception_to_retval(exc)
    return result
892
def getpath_subvolume_group(self, **kwargs):
    """
    Return the path of a subvolume group.

    Reads 'vol_name' and 'group_name' from kwargs. On success returns
    (0, <group path decoded as utf-8>, ""); a VolumeException is translated
    through volume_exception_to_retval().
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']

    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group:
            group_path = group.path.decode('utf-8')
            return 0, group_path, ""
    except VolumeException as exc:
        return self.volume_exception_to_retval(exc)
903
def list_subvolume_groups(self, **kwargs):
    """
    List the subvolume groups of a volume as JSON.

    Reads 'vol_name' from kwargs. Entries of the volume base directory are
    listed with the internal directories filtered out. Returns
    (0, <json>, ""); an ENOENT raised after the volume was opened yields
    the empty list '[]'. Any other VolumeException is translated through
    volume_exception_to_retval().
    """
    vol = kwargs['vol_name']
    result = 0, '[]', ""
    opened = False
    try:
        with open_volume(self, vol) as fs_handle:
            opened = True
            internal = [name.encode('utf-8')
                        for name in self.volspec.INTERNAL_DIRS]
            groups = listdir(fs_handle, self.volspec.base_dir,
                             filter_entries=internal)
            result = 0, name_to_json(groups), ""
    except VolumeException as exc:
        # ENOENT once the volume is open keeps the default empty listing
        nothing_to_list = opened and exc.errno == -errno.ENOENT
        if not nothing_to_list:
            result = self.volume_exception_to_retval(exc)
    return result
917
def pin_subvolume_group(self, **kwargs):
    """
    Apply a pin setting to a subvolume group.

    Reads 'vol_name', 'group_name', 'pin_type' and 'pin_setting' from
    kwargs and forwards the latter two to group.pin(). On success returns
    (0, "{}", ""); a VolumeException is translated through
    volume_exception_to_retval().
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']
    kind = kwargs['pin_type']
    setting = kwargs['pin_setting']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group:
            group.pin(kind, setting)
            result = 0, json.dumps({}), ""
    except VolumeException as exc:
        result = self.volume_exception_to_retval(exc)
    return result
933
def subvolume_group_exists(self, **kwargs):
    """
    Tell whether a volume has any subvolume group.

    Reads 'vol_name' from kwargs. Returns (0, <message>, "") where message
    is "subvolumegroup exists" or "no subvolumegroup exists". An ENOENT
    raised after the volume was opened is reported as "no subvolumegroup
    exists"; any other VolumeException is translated through
    volume_exception_to_retval().
    """
    vol = kwargs['vol_name']
    result = 0, "", ""
    opened = False

    try:
        with open_volume(self, vol) as fs_handle:
            opened = True
            internal = [name.encode('utf-8')
                        for name in self.volspec.INTERNAL_DIRS]
            found = has_subdir(fs_handle, self.volspec.base_dir,
                               filter_entries=internal)
            message = "subvolumegroup exists" if found else "no subvolumegroup exists"
            result = 0, message, ""
    except VolumeException as exc:
        if not opened or exc.errno != -errno.ENOENT:
            result = self.volume_exception_to_retval(exc)
        else:
            result = 0, "no subvolumegroup exists", ""
    return result
954
955 ### group snapshot
956
def create_subvolume_group_snapshot(self, **kwargs):
    """
    Reject subvolume group snapshot creation.

    Reads 'vol_name' and 'group_name' from kwargs; the volume and group are
    still opened, so a VolumeException from opening them is translated
    through volume_exception_to_retval(). Otherwise the call returns
    (-ENOSYS, "", "subvolume group snapshots are not supported").
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']

    result = -errno.ENOSYS, "", "subvolume group snapshots are not supported"
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp):
            # No snapshot is taken: subvolumes are marked with the vxattr
            # ceph.dir.subvolume, which denies snapshots at the subvolume
            # group level (see: https://tracker.ceph.com/issues/46074)
            pass
    except VolumeException as exc:
        result = self.volume_exception_to_retval(exc)
    return result
973
def remove_subvolume_group_snapshot(self, **kwargs):
    """
    Remove a snapshot of a subvolume group.

    Reads 'vol_name', 'group_name', 'snap_name' and 'force' from kwargs.
    Returns a (retcode, stdout, stderr) tuple. When 'force' is truthy an
    ENOENT VolumeException is ignored and success is reported; every other
    VolumeException is translated through volume_exception_to_retval().
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']
    snap = kwargs['snap_name']
    force = kwargs['force']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group:
            group.remove_snapshot(snap)
    except VolumeException as exc:
        # a forced removal tolerates "does not exist"
        if exc.errno != -errno.ENOENT or not force:
            result = self.volume_exception_to_retval(exc)
    return result
989
def list_subvolume_group_snapshots(self, **kwargs):
    """
    List the snapshots of a subvolume group as JSON.

    Reads 'vol_name' and 'group_name' from kwargs. On success returns
    (0, <json>, "") with the names from group.list_snapshots() rendered by
    name_to_json(); a VolumeException is translated through
    volume_exception_to_retval().
    """
    vol = kwargs['vol_name']
    grp = kwargs['group_name']

    result = 0, "", ""
    try:
        with open_volume(self, vol) as fs_handle, \
                open_group(fs_handle, self.volspec, grp) as group:
            snap_names = group.list_snapshots()
            result = 0, name_to_json(snap_names), ""
    except VolumeException as exc:
        result = self.volume_exception_to_retval(exc)
    return result