]>
Commit | Line | Data |
---|---|---|
92f5a8d4 TL |
1 | import os |
2 | import errno | |
3 | import logging | |
4 | ||
9f95a23c TL |
5 | from ceph.deployment.service_spec import ServiceSpec, PlacementSpec |
6 | ||
92f5a8d4 TL |
7 | import cephfs |
8 | import orchestrator | |
9 | ||
10 | from .exception import VolumeException | |
11 | ||
12 | log = logging.getLogger(__name__) | |
13 | ||
def create_pool(mgr, pool_name):
    """
    Create the pool named @pool_name.

    Issues an 'osd pool create' command through mgr.mon_command() and
    returns whatever that call returns.
    """
    return mgr.mon_command({'prefix': 'osd pool create', 'pool': pool_name})
18 | ||
def remove_pool(mgr, pool_name):
    """
    Remove the pool named @pool_name.

    The pool name is supplied twice ('pool' and 'pool2') along with the
    'yes_i_really_really_mean_it' flag. Returns the result of
    mgr.mon_command().
    """
    rm_request = {
        'prefix': 'osd pool rm',
        'pool': pool_name,
        'pool2': pool_name,
        'yes_i_really_really_mean_it': True,
    }
    return mgr.mon_command(rm_request)
23 | ||
def create_filesystem(mgr, fs_name, metadata_pool, data_pool):
    """
    Create the file system @fs_name on top of the given pools.

    Issues 'fs new' with @metadata_pool as 'metadata' and @data_pool as
    'data'. Returns the result of mgr.mon_command().
    """
    new_fs_request = {
        'prefix': 'fs new',
        'fs_name': fs_name,
        'metadata': metadata_pool,
        'data': data_pool,
    }
    return mgr.mon_command(new_fs_request)
28 | ||
def remove_filesystem(mgr, fs_name):
    """
    Fail the file system @fs_name and then remove it.

    'fs fail' is sent first. When it reports a non-zero return code, its
    (r, outb, outs) triple is handed back and 'fs rm' is never issued.
    Otherwise the result of the 'fs rm' command is returned.
    """
    rc, out_buf, out_status = mgr.mon_command(
        {'prefix': 'fs fail', 'fs_name': fs_name})
    if rc == 0:
        # the file system is failed; it can now be removed
        return mgr.mon_command(
            {'prefix': 'fs rm', 'fs_name': fs_name, 'yes_i_really_mean_it': True})
    return rc, out_buf, out_status
37 | ||
9f95a23c TL |
def create_mds(mgr, fs_name, placement):
    """
    Ask the orchestrator to apply an MDS service for @fs_name.

    @placement is parsed with PlacementSpec.from_string(). Returns a
    (retcode, "", message) triple:
      - (0, "", "") when the spec was applied without an exception,
      - (0, "", <notice>) when ImportError or OrchestratorError is raised;
        this is reported as success, with no MDS daemons created,
      - (-EINVAL, "", str(exception)) for any other exception.
    """
    mds_spec = ServiceSpec(
        service_type='mds',
        service_id=fs_name,
        placement=PlacementSpec.from_string(placement),
    )
    try:
        pending = mgr.apply_mds(mds_spec)
        mgr._orchestrator_wait([pending])
        orchestrator.raise_if_exception(pending)
    except (ImportError, orchestrator.OrchestratorError):
        return 0, "", "Volume created successfully (no MDS daemons created)"
    except Exception as err:
        # Keep detailed orchestrator exceptions (python backtraces) in the
        # log rather than letting them reach the user
        log.exception("Failed to create MDS daemons")
        return -errno.EINVAL, "", str(err)
    else:
        return 0, "", ""
54 | ||
def volume_exists(mgr, fs_name):
    """
    Return True if mgr's 'fs_map' lists a file system whose mdsmap
    'fs_name' equals @fs_name, False otherwise.
    """
    filesystems = mgr.get('fs_map')['filesystems']
    return any(entry['mdsmap']['fs_name'] == fs_name for entry in filesystems)
61 | ||
def listdir(fs, dirpath):
    """
    Return the names of the sub-directories (directories only) of @dirpath.

    Names are returned as stored in each entry's d_name; the b"." and b".."
    entries are left out. A cephfs.Error is re-raised as VolumeException.
    """
    subdirs = []
    try:
        with fs.opendir(dirpath) as handle:
            while True:
                entry = fs.readdir(handle)
                if not entry:
                    # readdir yields a falsy value once the listing is done
                    break
                if entry.d_name in (b".", b".."):
                    continue
                if entry.is_dir():
                    subdirs.append(entry.d_name)
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
    return subdirs
77 | ||
def list_one_entry_at_a_time(fs, dirpath):
    """
    Generate the entries of @dirpath lazily, one per iteration.

    Every entry except b"." and b".." is yielded as returned by
    fs.readdir(). A cephfs.Error is re-raised as VolumeException.
    """
    try:
        with fs.opendir(dirpath) as handle:
            while True:
                entry = fs.readdir(handle)
                if not entry:
                    break
                if entry.d_name in (b".", b".."):
                    continue
                yield entry
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
91 | ||
def copy_file(fs, src, dst, mode, cancel_check=None):
    """
    Copy a regular file from @src to @dst. @dst is overwritten if it exists.

    @dst is opened with O_CREAT | O_TRUNC | O_WRONLY and @mode. Data is
    moved in chunks of at most 8 MiB. If @cancel_check is given, it is
    called before every chunk and a truthy result aborts the copy with
    VolumeException(-EINTR); no cleanup of @dst is attempted in that case.

    A cephfs.Error raised while opening, reading, writing or syncing is
    re-raised as VolumeException. Once both files are open, a close is
    attempted on both descriptors on every exit path; an error raised by
    close itself propagates unchanged.
    """
    src_fd = dst_fd = None
    try:
        src_fd = fs.open(src, os.O_RDONLY)
        dst_fd = fs.open(dst, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, mode)
    except cephfs.Error as e:
        # release whatever was opened before the failure
        if src_fd is not None:
            fs.close(src_fd)
        if dst_fd is not None:
            fs.close(dst_fd)
        raise VolumeException(-e.args[0], e.args[1])

    IO_SIZE = 8 * 1024 * 1024
    try:
        while True:
            if cancel_check and cancel_check():
                raise VolumeException(-errno.EINTR, "copy operation interrupted")
            # offset -1: presumably "use the current file position" -- confirm
            # against the cephfs binding
            data = fs.read(src_fd, -1, IO_SIZE)
            if not data:
                break
            # fs.write may accept fewer bytes than offered; keep writing the
            # remainder until the whole chunk is out
            written = 0
            while written < len(data):
                written += fs.write(dst_fd, data[written:], -1)
        fs.fsync(dst_fd, 0)
    except cephfs.Error as e:
        raise VolumeException(-e.args[0], e.args[1])
    finally:
        # nested so that a failing close of src_fd cannot leak dst_fd
        try:
            fs.close(src_fd)
        finally:
            fs.close(dst_fd)
124 | ||
def get_ancestor_xattr(fs, path, attr):
    """
    Helper for reading layout information: if this xattr is missing
    on the requested path, keep checking parents until we find it.

    Returns the xattr value decoded as UTF-8. Raises VolumeException, built
    from the last cephfs.NoData error, when no ancestor carries @attr.

    The walk stops as soon as os.path.split() can no longer shorten the
    path. That covers "/" and also inputs such as "//", "" or a bytes
    b"/", for which comparing against the literal "/" never matches and
    the lookup would otherwise never terminate.
    """
    while True:
        try:
            return fs.getxattr(path, attr).decode('utf-8')
        except cephfs.NoData as e:
            parent = os.path.split(path)[0]
            if parent == path:
                # reached the top of the hierarchy without finding @attr
                raise VolumeException(-e.args[0], e.args[1])
            path = parent