]> git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/volumes/fs/fs_util.py
import quincy beta 17.1.0
[ceph.git] / ceph / src / pybind / mgr / volumes / fs / fs_util.py
1 import os
2 import errno
3 import logging
4
5 from ceph.deployment.service_spec import ServiceSpec, PlacementSpec
6
7 import cephfs
8 import orchestrator
9
10 from .exception import VolumeException
11
12 log = logging.getLogger(__name__)
13
def create_pool(mgr, pool_name):
    """
    Send an 'osd pool create' monitor command for @pool_name.

    Returns the value of mgr.mon_command() for that request unchanged.
    """
    return mgr.mon_command({'prefix': 'osd pool create', 'pool': pool_name})
18
def remove_pool(mgr, pool_name):
    """
    Send an 'osd pool rm' monitor command for @pool_name.

    The pool name is passed twice ('pool' and 'pool2') together with the
    'yes_i_really_really_mean_it' flag. Returns the value of
    mgr.mon_command() for that request unchanged.
    """
    request = dict(prefix='osd pool rm',
                   pool=pool_name,
                   pool2=pool_name,
                   yes_i_really_really_mean_it=True)
    return mgr.mon_command(request)
23
def create_filesystem(mgr, fs_name, metadata_pool, data_pool):
    """
    Send an 'fs new' monitor command for @fs_name.

    @metadata_pool and @data_pool are passed as the 'metadata' and 'data'
    arguments of the command. Returns the value of mgr.mon_command() for
    that request unchanged.
    """
    request = dict(prefix='fs new',
                   fs_name=fs_name,
                   metadata=metadata_pool,
                   data=data_pool)
    return mgr.mon_command(request)
28
def remove_filesystem(mgr, fs_name):
    """
    Fail the file system @fs_name and then remove it.

    Issues 'fs fail' first; if that command reports a non-zero return
    value its (retval, out, err) triple is returned and 'fs rm' is never
    sent. Otherwise the result of the 'fs rm' command is returned.
    """
    retval, out, err = mgr.mon_command({'prefix': 'fs fail', 'fs_name': fs_name})
    if retval != 0:
        return retval, out, err

    rm_request = dict(prefix='fs rm', fs_name=fs_name, yes_i_really_mean_it=True)
    return mgr.mon_command(rm_request)
37
def create_mds(mgr, fs_name, placement):
    """
    Apply an 'mds' service spec for @fs_name via mgr.apply().

    @placement is a placement string handed to PlacementSpec.from_string().

    Returns a (retval, out, err) triple:
      - (0, "", "") when the spec was applied without an exception,
      - (0, "", <notice>) when ImportError or orchestrator.OrchestratorError
        was raised (the volume is still reported as created),
      - (-EINVAL, "", str(exception)) for any other exception.
    """
    mds_placement = PlacementSpec.from_string(placement)
    mds_spec = ServiceSpec(service_type='mds',
                           service_id=fs_name,
                           placement=mds_placement)
    try:
        orchestrator.raise_if_exception(mgr.apply([mds_spec], no_overwrite=True))
    except (ImportError, orchestrator.OrchestratorError):
        return 0, "", "Volume created successfully (no MDS daemons created)"
    except Exception as err:
        # Keep detailed orchestrator exceptions (python backtraces) in the
        # log instead of handing them to the user.
        log.exception("Failed to create MDS daemons")
        return -errno.EINVAL, "", str(err)
    else:
        return 0, "", ""
53
def volume_exists(mgr, fs_name):
    """
    Return True if the 'fs_map' obtained from @mgr lists a file system
    whose mdsmap 'fs_name' equals @fs_name, else False.
    """
    filesystems = mgr.get('fs_map')['filesystems']
    return any(entry['mdsmap']['fs_name'] == fs_name for entry in filesystems)
60
def listdir(fs, dirpath):
    """
    Return the names of the sub-directories found directly in @dirpath.

    Entries "." and ".." and anything for which is_dir() is false are
    skipped. Names are returned exactly as reported by readdir (d_name).
    A cephfs.Error is re-raised as VolumeException.
    """
    names = []
    try:
        with fs.opendir(dirpath) as handle:
            while True:
                entry = fs.readdir(handle)
                if not entry:
                    break
                if entry.d_name in (b".", b".."):
                    continue
                if entry.is_dir():
                    names.append(entry.d_name)
    except cephfs.Error as err:
        raise VolumeException(-err.args[0], err.args[1])
    return names
76
def is_inherited_snap(snapname):
    """
    Tell whether @snapname denotes an inherited snapshot, i.e. whether the
    name begins with an underscore.
    """
    inherited_marker = "_"
    return snapname.startswith(inherited_marker)
82
def listsnaps(fs, volspec, snapdirpath, filter_inherited_snaps=False):
    """
    Return the snapshot names found in the snap directory @snapdirpath.

    The last component of @snapdirpath must equal the UTF-8 encoded
    volspec.snapshot_prefix, otherwise VolumeException(-EINVAL) is raised.
    Only directory entries other than "." and ".." are considered. When
    @filter_inherited_snaps is true, names for which is_inherited_snap()
    holds are left out. Names are returned as reported by readdir (d_name).
    A cephfs.Error is re-raised as VolumeException.
    """
    leaf = os.path.basename(snapdirpath)
    if leaf != volspec.snapshot_prefix.encode('utf-8'):
        raise VolumeException(-errno.EINVAL, "Not a snap directory: {0}".format(snapdirpath))

    found = []
    try:
        with fs.opendir(snapdirpath) as handle:
            while True:
                entry = fs.readdir(handle)
                if not entry:
                    break
                if entry.d_name in (b".", b".."):
                    continue
                if not entry.is_dir():
                    continue
                inherited = is_inherited_snap(entry.d_name.decode('utf-8'))
                # Inherited snapshots are dropped only when filtering is requested.
                if not (inherited and filter_inherited_snaps):
                    found.append(entry.d_name)
    except cephfs.Error as err:
        raise VolumeException(-err.args[0], err.args[1])
    return found
104
def list_one_entry_at_a_time(fs, dirpath):
    """
    Generator yielding the entries of @dirpath one by one.

    Every entry returned by readdir is yielded as-is, except "." and "..".
    A cephfs.Error is re-raised as VolumeException.
    """
    try:
        with fs.opendir(dirpath) as handle:
            while True:
                entry = fs.readdir(handle)
                if not entry:
                    break
                if entry.d_name in (b".", b".."):
                    continue
                yield entry
    except cephfs.Error as err:
        raise VolumeException(-err.args[0], err.args[1])
118
def copy_file(fs, src, dst, mode, cancel_check=None):
    """
    Copy a regular file from @src to @dst. @dst is overwritten if it exists.

    @dst is opened with O_CREAT | O_TRUNC | O_WRONLY and @mode. Data is
    moved in chunks of at most 8 MiB and @dst is fsync'ed once the source
    has been read completely.

    @cancel_check, when given, is called before every chunk is read; a true
    result aborts the copy with VolumeException(-EINTR).

    A cephfs.Error raised while opening, reading, writing or syncing is
    re-raised as VolumeException. Both descriptors are closed on every exit
    path once they have been opened.
    """
    src_fd = None
    try:
        src_fd = fs.open(src, os.O_RDONLY)
        dst_fd = fs.open(dst, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, mode)
    except cephfs.Error as e:
        # Only the source can be open at this point: had the destination
        # open succeeded, no exception would have been raised.
        if src_fd is not None:
            fs.close(src_fd)
        raise VolumeException(-e.args[0], e.args[1])

    IO_SIZE = 8 * 1024 * 1024
    try:
        while True:
            if cancel_check and cancel_check():
                raise VolumeException(-errno.EINTR, "copy operation interrupted")
            data = fs.read(src_fd, -1, IO_SIZE)
            if not len(data):
                break
            # A write may be short; keep going until the chunk is flushed.
            written = 0
            while written < len(data):
                written += fs.write(dst_fd, data[written:], -1)
        fs.fsync(dst_fd, 0)
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
    finally:
        # Close the destination even if closing the source fails, so a
        # failing close cannot leak the other descriptor.
        try:
            fs.close(src_fd)
        finally:
            fs.close(dst_fd)
151
def get_ancestor_xattr(fs, path, attr):
    """
    Helper for reading layout information: return the value of xattr @attr
    (decoded as UTF-8) from @path or, when it is missing there, from the
    closest ancestor directory that has it.

    Raises VolumeException when even "/" does not carry the xattr.
    """
    current = path
    while True:
        try:
            return fs.getxattr(current, attr).decode('utf-8')
        except cephfs.NoData as err:
            if current == "/":
                raise VolumeException(-err.args[0], err.args[1])
        # xattr not set here; retry one level up
        current = os.path.split(current)[0]
164
def create_base_dir(fs, path, mode):
    """
    Make sure the volspec base/group directory @path exists.

    The path is stat'ed first; when that fails with ENOENT the directory
    (including missing parents, via mkdirs) is created with @mode. Any
    other cephfs.Error from stat is re-raised as VolumeException.
    """
    try:
        fs.stat(path)
    except cephfs.Error as err:
        if err.args[0] != errno.ENOENT:
            raise VolumeException(-err.args[0], err.args[1])
        fs.mkdirs(path, mode)
173 fs.mkdirs(path, mode)
174 else:
175 raise VolumeException(-e.args[0], e.args[1])