# Source: git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mirroring/fs/dir_map/policy.py
# Commit: import ceph pacific 16.2.5
# Path: [ceph.git] / ceph / src / pybind / mgr / mirroring / fs / dir_map / policy.py
import os
import errno
import logging
import time
from threading import Lock
from typing import Dict

from .state_transition import ActionType, PolicyAction, Transition, \
    State, StateTransition
from ..exception import MirrorException
11
# Module-level logger used by every debug trace in this file.
log = logging.getLogger(__name__)
13
class DirectoryState:
    """Per-directory bookkeeping record held by ``Policy``.

    Holds the instance the directory is mapped to, when it was mapped, the
    current state-machine state and transition, an optional queued state and
    the ``stalled`` / ``purging`` flags.
    """

    def __init__(self, instance_id=None, mapped_time=None):
        """Create a record in the ``UNASSOCIATED`` state with no action pending.

        :param instance_id: instance the directory is mapped to, or None.
        :param mapped_time: time at which the mapping was made, or None.
        """
        self.instance_id = instance_id
        self.mapped_time = mapped_time
        self.state = State.UNASSOCIATED
        # set when a start policy action could not complete (see Policy.start_action)
        self.stalled = False
        self.transition = Transition(ActionType.NONE)
        # state to switch to once the current state becomes idle
        self.next_state = None
        # set when the directory is being removed from tracking
        self.purging = False

    def __str__(self):
        """Return a one-line summary for debug logs (``stalled`` is not included)."""
        return (
            f'[instance_id={self.instance_id}, mapped_time={self.mapped_time},'
            f' state={self.state}, transition={self.transition}, next_state={self.next_state},'
            f' purging={self.purging}]'
        )
28
class Policy:
    """Assigns tracked directories to instances and drives their state machines.

    The object keeps three containers -- ``dir_states``, ``instance_to_dir_map``
    and ``dead_instances`` -- and a ``threading.Lock``.  The methods that take
    the lock are ``init``, ``lookup``, ``start_action``, ``finish_action``,
    ``add_dir``, ``remove_dir``, ``add_instances``, ``remove_instances``,
    ``dir_status`` and ``instance_summary``.  All other methods touch the
    containers without locking; since ``Lock`` is not re-entrant they must not
    be called from outside while the lock is free if concurrent access is
    possible.
    """

    def __init__(self):
        # dir_path -> DirectoryState for every tracked directory
        self.dir_states = {}
        # instance_id -> list of dir_paths mapped to that instance
        self.instance_to_dir_map = {}
        # instances that were removed while directories were still mapped to them
        self.dead_instances = []
        self.lock = Lock()

    @staticmethod
    def is_instance_action(action_type):
        """Return True if ``action_type`` is ``ACQUIRE`` or ``RELEASE``."""
        return action_type in (ActionType.ACQUIRE, ActionType.RELEASE)

    def is_dead_instance(self, instance_id):
        """Return True if ``instance_id`` is recorded in ``dead_instances``."""
        return instance_id in self.dead_instances

    def is_state_scheduled(self, dir_state, state):
        """Return True if ``state`` is either the current or the queued state."""
        return dir_state.state == state or dir_state.next_state == state

    def is_shuffling(self, dir_path):
        """Return True if ``dir_path`` is in, or queued for, ``SHUFFLING``.

        Raises ``KeyError`` if ``dir_path`` is not tracked.
        """
        log.debug(f'is_shuffling: {dir_path}')
        return self.is_state_scheduled(self.dir_states[dir_path], State.SHUFFLING)

    def can_shuffle_dir(self, dir_path):
        """Return True if ``dir_path`` may be shuffled right now.

        Right now directories are shuffled based on idleness only.  Later we
        probably want to avoid shuffling directories that were recently
        shuffled.  Raises ``KeyError`` if ``dir_path`` is not tracked.
        """
        log.debug(f'can_shuffle_dir: {dir_path}')
        return StateTransition.is_idle(self.dir_states[dir_path].state)

    def set_state(self, dir_state, state, ignore_current_state=False):
        """Move ``dir_state`` to ``state`` now, or queue it for later.

        :param ignore_current_state: when False, asking for the state the
            directory is already in is a no-op.
        :returns: True if the state was switched immediately (the current
            state was idle); False if nothing changed or ``state`` was only
            stored in ``next_state``.
        """
        if not ignore_current_state and dir_state.state == state:
            return False
        if StateTransition.is_idle(dir_state.state):
            dir_state.state = state
            dir_state.next_state = None
            dir_state.transition = StateTransition.transit(
                dir_state.state, dir_state.transition.action_type)
            return True
        # current state is busy -- remember the request
        dir_state.next_state = state
        return False

    def init(self, dir_mapping):
        """Load directory states from ``dir_mapping``.

        :param dir_mapping: dict of ``dir_path`` -> dict with the keys
            ``'instance_id'`` and ``'last_shuffled'`` and an optional
            ``'purging'`` key.
        """
        with self.lock:
            for dir_path, dir_map in dir_mapping.items():
                instance_id = dir_map['instance_id']
                if instance_id:
                    self.instance_to_dir_map.setdefault(instance_id, []).append(dir_path)
                dir_state = DirectoryState(instance_id, dir_map['last_shuffled'])
                self.dir_states[dir_path] = dir_state
                state = State.INITIALIZING if instance_id else State.ASSOCIATING
                if dir_map.get('purging', 0):
                    # a removal was in progress -- resume it
                    dir_state.purging = True
                    state = State.DISASSOCIATING
                    if not instance_id:
                        dir_state.transition = StateTransition.transit(
                            state, dir_state.transition.action_type)
                log.debug(f'starting state: {dir_path} {state}: {dir_state}')
                self.set_state(dir_state, state)
                log.debug(f'init dir_state: {dir_state}')

    def lookup(self, dir_path):
        """Return ``instance_id``/``mapped_time``/``purging`` for ``dir_path``.

        :returns: a dict with those three keys, or None if the path is not
            tracked.
        """
        log.debug(f'looking up {dir_path}')
        with self.lock:
            dir_state = self.dir_states.get(dir_path)
            if not dir_state:
                return None
            return {'instance_id': dir_state.instance_id,
                    'mapped_time': dir_state.mapped_time,
                    'purging': dir_state.purging}

    def map(self, dir_path, dir_state):
        """Map ``dir_path`` to the live instance holding the fewest directories.

        A directory already mapped to a live instance is left alone.  A
        directory mapped to a dead instance is unmapped first.

        :returns: True if the directory is mapped on return, False if no live
            instance is available.
        """
        log.debug(f'mapping {dir_path}')
        current_instance_id = dir_state.instance_id
        if current_instance_id and not self.is_dead_instance(current_instance_id):
            return True
        if self.is_dead_instance(current_instance_id):
            self.unmap(dir_path, dir_state)

        min_instance_id = None
        for instance_id, dir_paths in self.instance_to_dir_map.items():
            if self.is_dead_instance(instance_id):
                continue
            # strict '<' keeps the first instance seen on ties
            if not min_instance_id or \
               len(dir_paths) < len(self.instance_to_dir_map[min_instance_id]):
                min_instance_id = instance_id
        if not min_instance_id:
            log.debug(f'instance unavailable for {dir_path}')
            return False

        log.debug(f'dir_path {dir_path} maps to instance {min_instance_id}')
        dir_state.instance_id = min_instance_id
        dir_state.mapped_time = time.time()
        self.instance_to_dir_map[min_instance_id].append(dir_path)
        return True

    def unmap(self, dir_path, dir_state):
        """Detach ``dir_path`` from its instance.

        When the instance is dead and this was its last directory, the
        instance is dropped from both ``instance_to_dir_map`` and
        ``dead_instances``.  Raises ``KeyError``/``ValueError`` if the
        directory is not recorded under ``dir_state.instance_id``.
        """
        instance_id = dir_state.instance_id
        log.debug(f'unmapping {dir_path} from instance {instance_id}')
        self.instance_to_dir_map[instance_id].remove(dir_path)
        dir_state.instance_id = None
        dir_state.mapped_time = None
        if self.is_dead_instance(instance_id) and not self.instance_to_dir_map[instance_id]:
            self.instance_to_dir_map.pop(instance_id)
            self.dead_instances.remove(instance_id)

    def shuffle(self, dirs_per_instance, include_stalled_dirs):
        """Collect directories that are candidates for being remapped.

        For each instance holding more than ``dirs_per_instance`` directories
        the excess is walked in list order: directories already (scheduled
        for) shuffling count against the excess without being returned, idle
        directories count and are returned.

        :param include_stalled_dirs: when True, every stalled directory has
            its ``stalled`` flag cleared and is added to the result.
        :returns: list of directory paths.
        """
        log.debug(f'directories per instance: {dirs_per_instance}')
        shuffle_dirs = []
        for dir_paths in self.instance_to_dir_map.values():
            cut_off = len(dir_paths) - dirs_per_instance
            if cut_off <= 0:
                continue
            for dir_path in dir_paths:
                if cut_off == 0:
                    break
                if self.is_shuffling(dir_path):
                    cut_off -= 1
                elif self.can_shuffle_dir(dir_path):
                    cut_off -= 1
                    shuffle_dirs.append(dir_path)
        if include_stalled_dirs:
            for dir_path, dir_state in self.dir_states.items():
                if dir_state.stalled:
                    log.debug(f'{dir_path} is stalled: {dir_state} -- trigerring kick')
                    dir_state.stalled = False
                    shuffle_dirs.append(dir_path)
        return shuffle_dirs

    def execute_policy_action(self, dir_path, dir_state, policy_action):
        """Run ``policy_action`` (``MAP``, ``UNMAP`` or ``REMOVE``) for a directory.

        ``REMOVE`` drops the directory from ``dir_states`` only when its state
        is ``UNASSOCIATED``.

        :returns: the result of ``map()`` for ``MAP``; True otherwise.
        :raises Exception: for any other ``policy_action``.
        """
        log.debug(f'executing for directory {dir_path} policy_action {policy_action}')

        done = True
        if policy_action == PolicyAction.MAP:
            done = self.map(dir_path, dir_state)
        elif policy_action == PolicyAction.UNMAP:
            self.unmap(dir_path, dir_state)
        elif policy_action == PolicyAction.REMOVE:
            if dir_state.state == State.UNASSOCIATED:
                self.dir_states.pop(dir_path)
        else:
            raise Exception(f'unknown policy action {policy_action} for {dir_path}')
        return done

    def start_action(self, dir_path):
        """Run the start policy action of the current transition, if any.

        If that action cannot complete, a purging directory is forced
        straight into ``DISASSOCIATING``; any other directory is flagged
        ``stalled`` and ``ActionType.NONE`` is returned.

        :returns: the action type the caller should perform next.
        :raises Exception: if ``dir_path`` is not tracked.
        """
        log.debug(f'start action: {dir_path}')
        with self.lock:
            dir_state = self.dir_states.get(dir_path)
            if not dir_state:
                raise Exception(f'{dir_path} is not tracked')
            log.debug(f'dir_state: {dir_state}')

            start_policy_action = dir_state.transition.start_policy_action
            if start_policy_action:
                stalled = not self.execute_policy_action(
                    dir_path, dir_state, start_policy_action)
                if stalled:
                    next_action = ActionType.NONE
                    if dir_state.purging:
                        # reset to an idle state so that set_state() switches immediately
                        dir_state.next_state = None
                        dir_state.state = State.UNASSOCIATED
                        dir_state.transition = StateTransition.transit(
                            State.DISASSOCIATING, ActionType.NONE)
                        self.set_state(dir_state, State.DISASSOCIATING)
                        next_action = dir_state.transition.action_type
                    else:
                        dir_state.stalled = True
                        log.debug('state machine stalled')
                    return next_action
            return dir_state.transition.action_type

    def finish_action(self, dir_path, r):
        """Advance the state machine after the current action finished.

        A failure (``r < 0``) leaves the transition untouched and returns
        True, unless the action is an instance action (acquire/release)
        against an instance that is recorded as dead -- in that case the
        state machine advances as if the action had succeeded.

        :param r: result of the action; negative means failure.
        :returns: True if an action is still pending for the directory.
        :raises Exception: if ``dir_path`` is not tracked.
        """
        log.debug(f'finish action {dir_path} r={r}')
        with self.lock:
            dir_state = self.dir_states.get(dir_path)
            if not dir_state:
                raise Exception(f'{dir_path} is not tracked')
            if r < 0 and (not Policy.is_instance_action(dir_state.transition.action_type) or
                          not dir_state.instance_id or
                          dir_state.instance_id not in self.dead_instances):
                return True
            log.debug(f'dir_state: {dir_state}')

            # capture before transit() replaces the transition
            finish_policy_action = dir_state.transition.finish_policy_action
            dir_state.transition = StateTransition.transit(
                dir_state.state, dir_state.transition.action_type)
            log.debug(f'transitioned to dir_state: {dir_state}')
            if dir_state.transition.final_state:
                log.debug('reached final state')
                dir_state.state = dir_state.transition.final_state
                dir_state.transition = Transition(ActionType.NONE)
                log.debug(f'final dir_state: {dir_state}')
            if StateTransition.is_idle(dir_state.state) and dir_state.next_state:
                self.set_state(dir_state, dir_state.next_state)
            # computed before the finish policy action runs
            pending = dir_state.transition.action_type != ActionType.NONE
            if finish_policy_action:
                self.execute_policy_action(dir_path, dir_state, finish_policy_action)
            return pending

    def find_tracked_ancestor_or_subtree(self, dir_path):
        """Find a tracked path that contains, or is contained in, ``dir_path``.

        :returns: ``(tracked_path, 'subtree')`` when ``dir_path`` lies under
            (or equals) ``tracked_path``; ``(tracked_path, 'ancestor')`` when
            ``dir_path`` is a parent of ``tracked_path``; None when no tracked
            path is related.  Only the first match is reported.
        """
        for tracked_path in self.dir_states:
            comp = [dir_path, tracked_path]
            cpath = os.path.commonpath(comp)
            if cpath in comp:
                what = 'subtree' if cpath == tracked_path else 'ancestor'
                return (tracked_path, what)
        return None

    def add_dir(self, dir_path):
        """Start tracking ``dir_path``.

        :returns: False if the path is already tracked, otherwise the result
            of switching the new directory to ``ASSOCIATING``.
        :raises MirrorException: (``-EINVAL``) if the path is an ancestor or
            subtree of an already tracked path.
        """
        log.debug(f'adding dir_path {dir_path}')
        with self.lock:
            if dir_path in self.dir_states:
                return False
            as_info = self.find_tracked_ancestor_or_subtree(dir_path)
            if as_info:
                raise MirrorException(
                    -errno.EINVAL,
                    f'{dir_path} is a {as_info[1]} of tracked path {as_info[0]}')
            dir_state = DirectoryState()
            self.dir_states[dir_path] = dir_state
            log.debug(f'add dir_state: {dir_state}')
            if dir_state.state == State.INITIALIZING:
                return False
            return self.set_state(dir_state, State.ASSOCIATING)

    def remove_dir(self, dir_path):
        """Mark ``dir_path`` as purging and move it towards ``DISASSOCIATING``.

        :returns: False if the path is not tracked, otherwise the result of
            ``set_state()`` (True only if the switch happened immediately).
        """
        log.debug(f'removing dir_path {dir_path}')
        with self.lock:
            dir_state = self.dir_states.get(dir_path)
            if not dir_state:
                return False
            log.debug(f'removing dir_state: {dir_state}')
            dir_state.purging = True
            # advance the state machine with DISASSOCIATING state for removal
            if dir_state.stalled:
                dir_state.state = State.UNASSOCIATED
                dir_state.transition = StateTransition.transit(
                    State.DISASSOCIATING, ActionType.NONE)
            r = self.set_state(dir_state, State.DISASSOCIATING)
            log.debug(f'dir_state: {dir_state}')
            return r

    def add_instances_initial(self, instance_ids):
        """Take care of figuring out instances which no longer exist
        and remove them. This is to be done only once on startup to
        identify instances which were previously removed but directories
        are still mapped (on-disk) to them.
        """
        for instance_id in instance_ids:
            self.instance_to_dir_map.setdefault(instance_id, [])
        dead_instances = [instance_id for instance_id in self.instance_to_dir_map
                          if instance_id not in instance_ids]
        if dead_instances:
            self._remove_instances(dead_instances)

    def add_instances(self, instance_ids, initial_update=False):
        """Register instances and pick directories to rebalance onto them.

        :param initial_update: when True, only ``add_instances_initial()`` is
            run and None is returned.
        :returns: for a regular update, the list of directories to act on:
            every candidate from ``shuffle()`` when no live instance existed
            before this call, otherwise only the candidates that switched to
            ``SHUFFLING`` immediately.  An empty list is returned when no
            live instance exists after the update.
        """
        log.debug(f'adding instances: {instance_ids} initial_update {initial_update}')
        with self.lock:
            if initial_update:
                self.add_instances_initial(instance_ids)
                return None

            nr_instances = len(self.instance_to_dir_map)
            nr_dead_instances = len(self.dead_instances)
            if nr_instances > 0:
                # adjust dead instances
                nr_instances -= nr_dead_instances
            # stalled directories get kicked only if nothing was alive before
            include_stalled_dirs = nr_instances == 0

            for instance_id in instance_ids:
                self.instance_to_dir_map.setdefault(instance_id, [])

            nr_live_instances = len(self.instance_to_dir_map) - nr_dead_instances
            if nr_live_instances <= 0:
                # nothing to distribute onto; also avoids a division by zero
                log.debug('no live instances, nothing to shuffle')
                return []
            dirs_per_instance = max(1, len(self.dir_states) // nr_live_instances)

            # super set of directories which are candidates for shuffling -- choose
            # those which can be shuffled right away (others will be shuffled when
            # they reach idle state).
            shuffle_dirs_ss = self.shuffle(dirs_per_instance, include_stalled_dirs)
            if include_stalled_dirs:
                return shuffle_dirs_ss
            shuffle_dirs = [dir_path for dir_path in shuffle_dirs_ss
                            if self.set_state(self.dir_states[dir_path], State.SHUFFLING)]
            log.debug(f'remapping directories: {shuffle_dirs}')
            return shuffle_dirs

    def remove_instances(self, instance_ids):
        """Locked wrapper around ``_remove_instances()``."""
        with self.lock:
            return self._remove_instances(instance_ids)

    def _remove_instances(self, instance_ids):
        """Remove instances; the caller is expected to hold ``self.lock``.

        Unknown instances are ignored.  An instance with no directories is
        dropped outright.  Otherwise the instance is recorded as dead and
        each of its directories is switched to ``SHUFFLING``, except those
        that are in, or queued for, ``DISASSOCIATING``.

        :returns: directories that switched to ``SHUFFLING`` immediately.
        """
        log.debug(f'removing instances: {instance_ids}')
        shuffle_dirs = []
        for instance_id in instance_ids:
            if instance_id not in self.instance_to_dir_map:
                continue
            dir_paths = self.instance_to_dir_map[instance_id]
            if not dir_paths:
                self.instance_to_dir_map.pop(instance_id)
                continue
            # record once only: unmap() removes a single entry when the
            # instance is finally dropped
            if instance_id not in self.dead_instances:
                self.dead_instances.append(instance_id)
            log.debug(f'force shuffling instance_id {instance_id}, directories {dir_paths}')
            for dir_path in dir_paths:
                dir_state = self.dir_states[dir_path]
                if self.is_state_scheduled(dir_state, State.DISASSOCIATING):
                    log.debug(f'dir_path {dir_path} is disassociating, ignoring...')
                    continue
                log.debug(f'shuffling dir_path {dir_path}')
                if self.set_state(dir_state, State.SHUFFLING, True):
                    shuffle_dirs.append(dir_path)
        log.debug(f'shuffling {shuffle_dirs}')
        return shuffle_dirs

    def dir_status(self, dir_path):
        """Return a status dict for ``dir_path``.

        A stalled directory yields ``'state'`` and ``'reason'``; a directory
        in ``ASSOCIATING`` yields only ``'state': 'mapping'``; any other
        directory yields ``'state'``, ``'instance_id'`` and
        ``'last_shuffled'`` (``'state'`` is None for states without a name
        below).

        :raises MirrorException: (``-ENOENT``) if the path is not tracked.
        """
        with self.lock:
            dir_state = self.dir_states.get(dir_path)
            if not dir_state:
                raise MirrorException(-errno.ENOENT, f'{dir_path} is not tracked')
            res = {}  # type: Dict
            if dir_state.stalled:
                res['state'] = 'stalled'
                res['reason'] = 'no mirror daemons running'
            elif dir_state.state == State.ASSOCIATING:
                res['state'] = 'mapping'
            else:
                state = None
                dstate = dir_state.state
                if dstate == State.DISASSOCIATING:
                    state = 'unmapping'
                elif dstate == State.SHUFFLING:
                    state = 'shuffling'
                elif dstate == State.ASSOCIATED:
                    state = 'mapped'
                elif dstate == State.INITIALIZING:
                    state = 'resolving'
                res['state'] = state
                res['instance_id'] = dir_state.instance_id
                res['last_shuffled'] = dir_state.mapped_time
            return res

    def instance_summary(self):
        """Return ``{'mapping': {instance_id: '<n> directories'}}`` for all instances."""
        with self.lock:
            res = {
                'mapping': {
                    instance_id: f'{len(dir_paths)} directories'
                    for instance_id, dir_paths in self.instance_to_dir_map.items()
                }
            }  # type: Dict
            return res