]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/fs_util.py
44e8d279a8a659272b506a17cad5f58674de3eec
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / fs_util.py
1 import os
2 import errno
3 import logging
4
5 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
6
7 import cephfs
8 import orchestrator
9
10 from .exception import VolumeException
11
12 log = logging.getLogger(__name__)
13
def create_pool(mgr, pool_name):
    """
    Send an 'osd pool create' monitor command for @pool_name.

    Returns whatever mgr.mon_command() returns for that request.
    """
    return mgr.mon_command({'prefix': 'osd pool create', 'pool': pool_name})
18
def remove_pool(mgr, pool_name):
    """
    Send an 'osd pool rm' monitor command for @pool_name.

    The request carries the pool name twice ('pool' and 'pool2') and
    sets 'yes_i_really_really_mean_it'. Returns whatever
    mgr.mon_command() returns for that request.
    """
    request = {
        'prefix': 'osd pool rm',
        'pool': pool_name,
        'pool2': pool_name,
        'yes_i_really_really_mean_it': True,
    }
    return mgr.mon_command(request)
23
def create_filesystem(mgr, fs_name, metadata_pool, data_pool):
    """
    Send an 'fs new' monitor command creating @fs_name on top of
    @metadata_pool and @data_pool.

    Returns whatever mgr.mon_command() returns for that request.
    """
    request = {
        'prefix': 'fs new',
        'fs_name': fs_name,
        'metadata': metadata_pool,
        'data': data_pool,
    }
    return mgr.mon_command(request)
28
def remove_filesystem(mgr, fs_name):
    """
    Fail the file system @fs_name, then remove it.

    'fs fail' is sent first. When it reports a non-zero return code its
    (retcode, stdout, stderr) triple is handed back and 'fs rm' is never
    sent; otherwise the result of the 'fs rm' command is returned.
    """
    rc, out, err = mgr.mon_command({'prefix': 'fs fail', 'fs_name': fs_name})
    if rc == 0:
        rm_request = {
            'prefix': 'fs rm',
            'fs_name': fs_name,
            'yes_i_really_mean_it': True,
        }
        return mgr.mon_command(rm_request)
    return rc, out, err
37
def create_mds(mgr, fs_name, placement):
    """
    Apply an 'mds' service spec for @fs_name through the orchestrator.

    @placement is a placement string parsed by PlacementSpec.from_string().

    Returns a (retcode, stdout, stderr) triple:
      - (0, "", "") when the spec was applied without an exception,
      - (0, "", <notice>) when ImportError or OrchestratorError is
        raised: the volume is still reported as created, just without
        MDS daemons,
      - (-EINVAL, "", <message>) for any other exception.
    """
    placement_spec = PlacementSpec.from_string(placement)
    mds_spec = ServiceSpec(service_type='mds',
                           service_id=fs_name,
                           placement=placement_spec)
    try:
        pending = mgr.apply_mds(mds_spec)
        mgr._orchestrator_wait([pending])
        orchestrator.raise_if_exception(pending)
    except (ImportError, orchestrator.OrchestratorError):
        return 0, "", "Volume created successfully (no MDS daemons created)"
    except Exception as e:
        # Keep detailed orchestrator exceptions (python backtraces) in the
        # log; only the message itself is passed back to the user.
        log.exception("Failed to create MDS daemons")
        return -errno.EINVAL, "", str(e)
    else:
        return 0, "", ""
54
def volume_exists(mgr, fs_name):
    """
    Tell whether the 'fs_map' obtained from @mgr lists a file system
    whose mdsmap 'fs_name' equals @fs_name.
    """
    filesystems = mgr.get('fs_map')['filesystems']
    return any(entry['mdsmap']['fs_name'] == fs_name for entry in filesystems)
61
def listdir(fs, dirpath):
    """
    Collect the names of the directories located directly under @dirpath.

    "." and ".." are skipped, and so is every entry whose is_dir() is
    false. Names are returned exactly as readdir reports them (d_name).
    A cephfs.Error is re-raised as a VolumeException.
    """
    subdirs = []
    try:
        with fs.opendir(dirpath) as handle:
            while True:
                entry = fs.readdir(handle)
                if not entry:
                    # readdir has nothing more to report
                    break
                if entry.d_name in (b".", b".."):
                    continue
                if entry.is_dir():
                    subdirs.append(entry.d_name)
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
    return subdirs
77
def list_one_entry_at_a_time(fs, dirpath):
    """
    Generator over the entries of @dirpath, handing them out one by one.

    Every entry reported by readdir is yielded, except "." and "..".
    A cephfs.Error is re-raised as a VolumeException.
    """
    try:
        with fs.opendir(dirpath) as handle:
            while True:
                entry = fs.readdir(handle)
                if not entry:
                    # readdir has nothing more to report
                    break
                if entry.d_name in (b".", b".."):
                    continue
                yield entry
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
91
def copy_file(fs, src, dst, mode, cancel_check=None):
    """
    Copy a regular file from @src to @dst. @dst is overwritten if it exists.

    @fs: file system handle providing open/read/write/fsync/close
    @mode: mode passed to open() when @dst is created
    @cancel_check: optional callable polled before every read; a true
                   result aborts the copy with VolumeException(-EINTR)

    Data is moved in chunks of at most 8 MiB and @dst is fsync'ed once
    all chunks are written. A cephfs.Error raised while opening, reading,
    writing or syncing is re-raised as a VolumeException. Once both files
    are open, both descriptors are closed on every exit path.
    """
    src_fd = None
    try:
        src_fd = fs.open(src, os.O_RDONLY)
        dst_fd = fs.open(dst, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, mode)
    except cephfs.Error as e:
        # Only the source can be open here: when opening @dst fails no
        # descriptor for it was ever returned.
        if src_fd is not None:
            fs.close(src_fd)
        raise VolumeException(-e.args[0], e.args[1])

    IO_SIZE = 8 * 1024 * 1024
    try:
        while True:
            if cancel_check and cancel_check():
                raise VolumeException(-errno.EINTR, "copy operation interrupted")
            # offset -1: read from / write at the current file position
            data = fs.read(src_fd, -1, IO_SIZE)
            if not data:
                break
            written = 0
            # write() may accept fewer bytes than offered; keep going until
            # the whole chunk is out
            while written < len(data):
                written += fs.write(dst_fd, data[written:], -1)
        fs.fsync(dst_fd, 0)
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
    finally:
        # Nested so that a failure closing the source cannot leak the
        # destination descriptor.
        try:
            fs.close(src_fd)
        finally:
            fs.close(dst_fd)
124
def get_ancestor_xattr(fs, path, attr):
    """
    Helper for reading layout information: if this xattr is missing
    on the requested path, keep checking parents until we find it.

    @fs: file system handle providing getxattr()
    @path: path to start from (str or bytes)
    @attr: name of the extended attribute

    Returns the attribute value decoded as UTF-8. Raises VolumeException
    when cephfs.NoData is reported all the way up to the top-most
    ancestor of @path.
    """
    while True:
        try:
            return fs.getxattr(path, attr).decode('utf-8')
        except cephfs.NoData as e:
            parent = os.path.split(path)[0]
            # os.path.split() maps the top-most path onto itself
            # ("/" -> "/", b"/" -> b"/", "" -> ""), so an unchanged parent
            # means there is no ancestor left to try. Comparing against
            # the parent instead of the literal "/" also terminates for
            # bytes and relative paths.
            if parent == path:
                raise VolumeException(-e.args[0], e.args[1])
            path = parent
134 raise VolumeException(-e.args[0], e.args[1])
135 else:
136 return get_ancestor_xattr(fs, os.path.split(path)[0], attr)