]>
Commit | Line | Data |
---|---|---|
92f5a8d4 TL |
1 | import os |
2 | import errno | |
3 | import logging | |
9f95a23c | 4 | import sys |
2a845540 TL |
5 | import threading |
6 | import configparser | |
7 | import re | |
92f5a8d4 TL |
8 | |
9 | import cephfs | |
10 | ||
11 | from ...exception import MetadataMgrException | |
12 | ||
13 | log = logging.getLogger(__name__) | |
14 | ||
2a845540 TL |
15 | # _lock needs to be shared across all instances of MetadataManager. |
16 | # that is why we have a file level instance | |
17 | _lock = threading.Lock() | |
18 | ||
19 | ||
20 | def _conf_reader(fs, fd, offset=0, length=4096): | |
21 | while True: | |
22 | buf = fs.read(fd, offset, length) | |
23 | offset += len(buf) | |
24 | if not buf: | |
25 | return | |
26 | yield buf.decode('utf-8') | |
27 | ||
28 | ||
29 | class _ConfigWriter: | |
30 | def __init__(self, fs, fd): | |
31 | self._fs = fs | |
32 | self._fd = fd | |
33 | self._wrote = 0 | |
34 | ||
35 | def write(self, value): | |
36 | buf = value.encode('utf-8') | |
37 | wrote = self._fs.write(self._fd, buf, -1) | |
38 | self._wrote += wrote | |
39 | return wrote | |
40 | ||
41 | def fsync(self): | |
42 | self._fs.fsync(self._fd, 0) | |
43 | ||
44 | @property | |
45 | def wrote(self): | |
46 | return self._wrote | |
47 | ||
48 | def __enter__(self): | |
49 | return self | |
50 | ||
51 | def __exit__(self, exc_type, exc_value, tb): | |
52 | self._fs.close(self._fd) | |
53 | ||
54 | ||
92f5a8d4 TL |
55 | class MetadataManager(object): |
56 | GLOBAL_SECTION = "GLOBAL" | |
33c7a0ef | 57 | USER_METADATA_SECTION = "USER_METADATA" |
92f5a8d4 TL |
58 | GLOBAL_META_KEY_VERSION = "version" |
59 | GLOBAL_META_KEY_TYPE = "type" | |
60 | GLOBAL_META_KEY_PATH = "path" | |
61 | GLOBAL_META_KEY_STATE = "state" | |
62 | ||
33c7a0ef TL |
63 | CLONE_FAILURE_SECTION = "CLONE_FAILURE" |
64 | CLONE_FAILURE_META_KEY_ERRNO = "errno" | |
65 | CLONE_FAILURE_META_KEY_ERROR_MSG = "error_msg" | |
66 | ||
92f5a8d4 TL |
67 | def __init__(self, fs, config_path, mode): |
68 | self.fs = fs | |
69 | self.mode = mode | |
70 | self.config_path = config_path | |
2a845540 | 71 | self.config = configparser.ConfigParser() |
92f5a8d4 TL |
72 | |
73 | def refresh(self): | |
74 | fd = None | |
92f5a8d4 | 75 | try: |
2a845540 TL |
76 | log.debug("opening config {0}".format(self.config_path)) |
77 | with _lock: | |
78 | fd = self.fs.open(self.config_path, os.O_RDONLY) | |
79 | cfg = ''.join(_conf_reader(self.fs, fd)) | |
80 | self.config.read_string(cfg, source=self.config_path) | |
0948533f TL |
81 | except UnicodeDecodeError: |
82 | raise MetadataMgrException(-errno.EINVAL, | |
83 | "failed to decode, erroneous metadata config '{0}'".format(self.config_path)) | |
92f5a8d4 TL |
84 | except cephfs.ObjectNotFound: |
85 | raise MetadataMgrException(-errno.ENOENT, "metadata config '{0}' not found".format(self.config_path)) | |
86 | except cephfs.Error as e: | |
87 | raise MetadataMgrException(-e.args[0], e.args[1]) | |
0948533f TL |
88 | except configparser.Error: |
89 | raise MetadataMgrException(-errno.EINVAL, "failed to parse, erroneous metadata config " | |
90 | "'{0}'".format(self.config_path)) | |
2a845540 TL |
91 | finally: |
92 | if fd is not None: | |
93 | self.fs.close(fd) | |
92f5a8d4 TL |
94 | |
95 | def flush(self): | |
96 | # cull empty sections | |
97 | for section in list(self.config.sections()): | |
98 | if len(self.config.items(section)) == 0: | |
99 | self.config.remove_section(section) | |
100 | ||
92f5a8d4 | 101 | try: |
2a845540 TL |
102 | with _lock: |
103 | tmp_config_path = self.config_path + b'.tmp' | |
104 | fd = self.fs.open(tmp_config_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, self.mode) | |
105 | with _ConfigWriter(self.fs, fd) as cfg_writer: | |
106 | self.config.write(cfg_writer) | |
107 | cfg_writer.fsync() | |
108 | self.fs.rename(tmp_config_path, self.config_path) | |
109 | log.info(f"wrote {cfg_writer.wrote} bytes to config {tmp_config_path}") | |
110 | log.info(f"Renamed {tmp_config_path} to config {self.config_path}") | |
92f5a8d4 TL |
111 | except cephfs.Error as e: |
112 | raise MetadataMgrException(-e.args[0], e.args[1]) | |
92f5a8d4 TL |
113 | |
114 | def init(self, version, typ, path, state): | |
115 | # you may init just once before refresh (helps to overwrite conf) | |
116 | if self.config.has_section(MetadataManager.GLOBAL_SECTION): | |
117 | raise MetadataMgrException(-errno.EINVAL, "init called on an existing config") | |
118 | ||
119 | self.add_section(MetadataManager.GLOBAL_SECTION) | |
120 | self.update_section_multi( | |
121 | MetadataManager.GLOBAL_SECTION, {MetadataManager.GLOBAL_META_KEY_VERSION : str(version), | |
122 | MetadataManager.GLOBAL_META_KEY_TYPE : str(typ), | |
123 | MetadataManager.GLOBAL_META_KEY_PATH : str(path), | |
124 | MetadataManager.GLOBAL_META_KEY_STATE : str(state) | |
125 | }) | |
126 | ||
127 | def add_section(self, section): | |
128 | try: | |
129 | self.config.add_section(section) | |
130 | except configparser.DuplicateSectionError: | |
131 | return | |
132 | except: | |
133 | raise MetadataMgrException(-errno.EINVAL, "error adding section to config") | |
134 | ||
135 | def remove_option(self, section, key): | |
136 | if not self.config.has_section(section): | |
137 | raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) | |
33c7a0ef | 138 | return self.config.remove_option(section, key) |
92f5a8d4 TL |
139 | |
140 | def remove_section(self, section): | |
141 | self.config.remove_section(section) | |
142 | ||
143 | def update_section(self, section, key, value): | |
144 | if not self.config.has_section(section): | |
145 | raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) | |
146 | self.config.set(section, key, str(value)) | |
147 | ||
148 | def update_section_multi(self, section, dct): | |
149 | if not self.config.has_section(section): | |
150 | raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) | |
151 | for key,value in dct.items(): | |
152 | self.config.set(section, key, str(value)) | |
153 | ||
154 | def update_global_section(self, key, value): | |
155 | self.update_section(MetadataManager.GLOBAL_SECTION, key, str(value)) | |
156 | ||
157 | def get_option(self, section, key): | |
158 | if not self.config.has_section(section): | |
159 | raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) | |
160 | if not self.config.has_option(section, key): | |
161 | raise MetadataMgrException(-errno.ENOENT, "no config '{0}' in section '{1}'".format(key, section)) | |
162 | return self.config.get(section, key) | |
163 | ||
164 | def get_global_option(self, key): | |
165 | return self.get_option(MetadataManager.GLOBAL_SECTION, key) | |
166 | ||
33c7a0ef TL |
167 | def list_all_options_from_section(self, section): |
168 | metadata_dict = {} | |
169 | if self.config.has_section(section): | |
170 | options = self.config.options(section) | |
171 | for option in options: | |
172 | metadata_dict[option] = self.config.get(section,option) | |
173 | return metadata_dict | |
174 | ||
2a845540 TL |
175 | def list_all_keys_with_specified_values_from_section(self, section, value): |
176 | keys = [] | |
177 | if self.config.has_section(section): | |
178 | options = self.config.options(section) | |
179 | for option in options: | |
180 | if (value == self.config.get(section, option)) : | |
181 | keys.append(option) | |
182 | return keys | |
183 | ||
92f5a8d4 TL |
184 | def section_has_item(self, section, item): |
185 | if not self.config.has_section(section): | |
186 | raise MetadataMgrException(-errno.ENOENT, "section '{0}' does not exist".format(section)) | |
187 | return item in [v[1] for v in self.config.items(section)] | |
2a845540 TL |
188 | |
189 | def has_snap_metadata_section(self): | |
190 | sections = self.config.sections() | |
191 | r = re.compile('SNAP_METADATA_.*') | |
192 | for section in sections: | |
193 | if r.match(section): | |
194 | return True | |
195 | return False | |
196 | ||
197 | def list_snaps_with_metadata(self): | |
198 | sections = self.config.sections() | |
199 | r = re.compile('SNAP_METADATA_.*') | |
200 | return [section[len("SNAP_METADATA_"):] for section in sections if r.match(section)] |