]>
git.proxmox.com Git - ceph.git/blob - ceph/src/pybind/mgr/mirroring/fs/dir_map/policy.py
5 from threading
import Lock
6 from typing
import Dict
8 from .state_transition
import ActionType
, PolicyAction
, Transition
, \
10 from ..exception
import MirrorException
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def __init__(self, instance_id=None, mapped_time=None):
    """Create the tracking record for a single directory.

    :param instance_id: instance the directory is currently mapped to,
                        or None when it is not mapped.
    :param mapped_time: time at which the mapping was made, or None.
    """
    self.instance_id = instance_id
    self.mapped_time = mapped_time
    self.state = State.UNASSOCIATED
    # NOTE(review): the extracted text drops a line here; `stalled` is
    # read and reset elsewhere in this file, so it is initialised to
    # False -- confirm against the original file.
    self.stalled = False
    self.transition = Transition(ActionType.NONE)
    self.next_state = None
    # NOTE(review): the extracted text drops a line here; `purging` is
    # read by the string formatter and by lookup(), so it is initialised
    # to False -- confirm against the original file.
    self.purging = False
25 return f
'[instance_id={self.instance_id}, mapped_time={self.mapped_time},'\
26 f
' state={self.state}, transition={self.transition}, next_state={self.next_state},'\
27 f
' purging={self.purging}]'
32 self
.instance_to_dir_map
= {}
33 self
.dead_instances
= []
# NOTE(review): the line before this definition is missing from the
# extracted text; the function takes no `self` and is invoked as
# `Policy.is_instance_action(...)`, so a staticmethod decorator is
# restored -- confirm.
@staticmethod
def is_instance_action(action_type):
    """Return True if `action_type` is an action performed on an instance."""
    # NOTE(review): the second tuple member is missing from the extracted
    # text (the tuple is left unclosed after ACQUIRE); RELEASE is assumed
    # as the counterpart of ACQUIRE -- confirm.
    return action_type in (ActionType.ACQUIRE,
                           ActionType.RELEASE)
def is_dead_instance(self, instance_id):
    """Return True if `instance_id` is listed in `self.dead_instances`."""
    return instance_id in self.dead_instances
def is_state_scheduled(self, dir_state, state):
    """Return True if `state` is the directory's current or queued state.

    :param dir_state: object exposing `state` and `next_state`.
    :param state: the state to look for.
    """
    return dir_state.state == state or dir_state.next_state == state
def is_shuffling(self, dir_path):
    """Return True if `dir_path` is in, or queued for, the SHUFFLING state.

    Raises KeyError if `dir_path` is not a key of `self.dir_states`.
    """
    log.debug(f'is_shuffling: {dir_path}')
    return self.is_state_scheduled(self.dir_states[dir_path], State.SHUFFLING)
def can_shuffle_dir(self, dir_path):
    """Right now, shuffle directories only based on idleness. Later, we
    probably want to avoid shuffling images that were recently shuffled.

    Raises KeyError if `dir_path` is not a key of `self.dir_states`.
    """
    log.debug(f'can_shuffle_dir: {dir_path}')
    dir_state = self.dir_states[dir_path]
    return StateTransition.is_idle(dir_state.state)
def set_state(self, dir_state, state, ignore_current_state=False):
    """Move `dir_state` to `state`, or queue it if the directory is busy.

    :param dir_state: the directory record to update.
    :param state: the requested state.
    :param ignore_current_state: when True, apply the request even if the
                                 directory is already in `state`.
    :return: True if the state was switched right away, False if nothing
             changed or the request was only queued in `next_state`.
    """
    # NOTE(review): the three return statements below are missing from the
    # extracted text; they are restored from the way callers test the
    # result (`if self.set_state(...)`) -- confirm.
    if not ignore_current_state and dir_state.state == state:
        return False
    if StateTransition.is_idle(dir_state.state):
        dir_state.state = state
        dir_state.next_state = None
        dir_state.transition = StateTransition.transit(
            dir_state.state, dir_state.transition.action_type)
        return True
    # Not idle: remember the request so it is applied once the current
    # transition has finished.
    dir_state.next_state = state
    return False
def init(self, dir_mapping):
    """Populate the policy from a stored directory mapping.

    :param dir_mapping: dict of dir_path -> dict with keys 'instance_id'
                        and 'last_shuffled', and optionally 'purging'.
    """
    # NOTE(review): the lock acquisition and the three `if` guards marked
    # below are missing from the extracted text and are restored by
    # inference -- confirm against the original file.
    with self.lock:
        for dir_path, dir_map in dir_mapping.items():
            instance_id = dir_map['instance_id']
            if instance_id:  # restored guard
                self.instance_to_dir_map.setdefault(instance_id, []).append(dir_path)
            dir_state = DirectoryState(instance_id, dir_map['last_shuffled'])
            self.dir_states[dir_path] = dir_state
            state = State.INITIALIZING if instance_id else State.ASSOCIATING
            purging = dir_map.get('purging', 0)
            if purging:  # restored guard
                dir_state.purging = True
                state = State.DISASSOCIATING
                if not instance_id:  # restored guard
                    dir_state.transition = StateTransition.transit(
                        state, dir_state.transition.action_type)
            log.debug(f'starting state: {dir_path} {state}: {dir_state}')
            self.set_state(dir_state, state)
            log.debug(f'init dir_state: {dir_state}')
def lookup(self, dir_path):
    """Return the mapping details recorded for `dir_path`.

    :return: dict with keys 'instance_id', 'mapped_time' and 'purging',
             or None when the directory is not tracked.
    """
    log.debug(f'looking up {dir_path}')
    # NOTE(review): the lock acquisition, the not-found guard and its
    # `return None` are missing from the extracted text and are restored
    # by inference -- confirm.
    with self.lock:
        dir_state = self.dir_states.get(dir_path)
        if not dir_state:
            return None
        return {'instance_id': dir_state.instance_id,
                'mapped_time': dir_state.mapped_time,
                'purging': dir_state.purging}
def map(self, dir_path, dir_state):
    """Map `dir_path` to the live instance holding the fewest directories.

    :return: True if the directory is mapped to a live instance on return,
             False if no live instance is available.
    """
    # NOTE(review): the `return True` / `continue` / `return False`
    # statements are missing from the extracted text; they are restored
    # from the caller, which treats a false result as "stalled" -- confirm.
    log.debug(f'mapping {dir_path}')
    current_instance_id = dir_state.instance_id
    if current_instance_id and not self.is_dead_instance(current_instance_id):
        # already mapped to a live instance
        return True
    if self.is_dead_instance(current_instance_id):
        self.unmap(dir_path, dir_state)
    min_instance_id = None
    for instance_id, dir_paths in self.instance_to_dir_map.items():
        if self.is_dead_instance(instance_id):
            continue
        if not min_instance_id or \
                len(dir_paths) < len(self.instance_to_dir_map[min_instance_id]):
            min_instance_id = instance_id
    if not min_instance_id:
        log.debug(f'instance unavailable for {dir_path}')
        return False
    log.debug(f'dir_path {dir_path} maps to instance {min_instance_id}')
    dir_state.instance_id = min_instance_id
    dir_state.mapped_time = time.time()
    self.instance_to_dir_map[min_instance_id].append(dir_path)
    return True
def unmap(self, dir_path, dir_state):
    """Detach `dir_path` from the instance it is currently mapped to.

    If that instance is dead and this was its last directory, the
    instance is dropped from both `instance_to_dir_map` and
    `dead_instances`.
    """
    instance_id = dir_state.instance_id
    log.debug(f'unmapping {dir_path} from instance {instance_id}')
    self.instance_to_dir_map[instance_id].remove(dir_path)
    dir_state.instance_id = None
    dir_state.mapped_time = None
    if self.is_dead_instance(instance_id) and not self.instance_to_dir_map[instance_id]:
        self.instance_to_dir_map.pop(instance_id)
        self.dead_instances.remove(instance_id)
def shuffle(self, dirs_per_instance, include_stalled_dirs):
    """Collect directories that should be moved to another instance.

    :param dirs_per_instance: target number of directories per instance.
    :param include_stalled_dirs: when True, also pick up (and un-stall)
                                 every directory flagged as stalled.
    :return: list of directory paths that are candidates for shuffling.
    """
    # NOTE(review): the accumulator initialisation, the cut-off checks and
    # decrements, and the final return are missing from the extracted text
    # and are restored by inference -- confirm against the original file.
    log.debug(f'directories per instance: {dirs_per_instance}')
    shuffle_dirs = []
    for instance_id, dir_paths in self.instance_to_dir_map.items():
        # number of directories this instance holds beyond its share
        cut_off = len(dir_paths) - dirs_per_instance
        if cut_off > 0:
            for dir_path in dir_paths:
                if cut_off == 0:
                    break
                if self.is_shuffling(dir_path):
                    # already on its way out; counts towards the excess
                    cut_off -= 1
                elif self.can_shuffle_dir(dir_path):
                    cut_off -= 1
                    shuffle_dirs.append(dir_path)
    if include_stalled_dirs:
        for dir_path, dir_state in self.dir_states.items():
            if dir_state.stalled:
                log.debug(f'{dir_path} is stalled: {dir_state} -- trigerring kick')
                dir_state.stalled = False
                shuffle_dirs.append(dir_path)
    return shuffle_dirs
def execute_policy_action(self, dir_path, dir_state, policy_action):
    """Carry out `policy_action` for `dir_path`.

    :return: the result of map() for PolicyAction.MAP, True otherwise.
    """
    log.debug(f'executing for directory {dir_path} policy_action {policy_action}')

    # NOTE(review): the initialisation of `done`, the fallback branch and
    # the final return are missing from the extracted text and are
    # restored by inference (start_action negates the result) -- confirm.
    done = True
    if policy_action == PolicyAction.MAP:
        done = self.map(dir_path, dir_state)
    elif policy_action == PolicyAction.UNMAP:
        self.unmap(dir_path, dir_state)
    elif policy_action == PolicyAction.REMOVE:
        if dir_state.state == State.UNASSOCIATED:
            self.dir_states.pop(dir_path)
    else:
        # presumably unknown actions are a programming error; verify
        raise Exception()
    return done
def start_action(self, dir_path):
    """Run the start-of-transition policy action for `dir_path`.

    :return: the action type to perform next for the directory.
    """
    log.debug(f'start action: {dir_path}')
    # NOTE(review): the lock acquisition, the not-tracked guard, the
    # `if stalled:` / `else:` lines and `return next_action` are missing
    # from the extracted text and are restored by inference -- confirm.
    with self.lock:
        dir_state = self.dir_states.get(dir_path)
        if not dir_state:
            raise Exception()
        log.debug(f'dir_state: {dir_state}')
        if dir_state.transition.start_policy_action:
            stalled = not self.execute_policy_action(dir_path, dir_state,
                                                     dir_state.transition.start_policy_action)
            if stalled:
                next_action = ActionType.NONE
                if dir_state.purging:
                    # directory is being removed: restart the state
                    # machine from UNASSOCIATED towards DISASSOCIATING
                    dir_state.next_state = None
                    dir_state.state = State.UNASSOCIATED
                    dir_state.transition = StateTransition.transit(
                        State.DISASSOCIATING, ActionType.NONE)
                    self.set_state(dir_state, State.DISASSOCIATING)
                    next_action = dir_state.transition.action_type
                else:
                    dir_state.stalled = True
                    log.debug('state machine stalled')
                return next_action
        return dir_state.transition.action_type
def finish_action(self, dir_path, r):
    """Advance the state machine for `dir_path` after an action completed.

    :param r: result of the completed action; negative means failure.
    :return: True if another action is pending for the directory.
    """
    log.debug(f'finish action {dir_path} r={r}')
    # NOTE(review): the lock acquisition, the not-tracked guard, the
    # `return True` of the failure branch and the final `return pending`
    # are missing from the extracted text and are restored by inference
    # -- confirm against the original file.
    with self.lock:
        dir_state = self.dir_states.get(dir_path)
        if not dir_state:
            raise Exception()
        if r < 0 and (not Policy.is_instance_action(dir_state.transition.action_type) or
                      not dir_state.instance_id or
                      dir_state.instance_id not in self.dead_instances):
            # presumably a failed action is reported as still pending so
            # that it gets retried -- verify against caller
            return True
        log.debug(f'dir_state: {dir_state}')
        finish_policy_action = dir_state.transition.finish_policy_action
        dir_state.transition = StateTransition.transit(
            dir_state.state, dir_state.transition.action_type)
        log.debug(f'transitioned to dir_state: {dir_state}')
        if dir_state.transition.final_state:
            log.debug('reached final state')
            dir_state.state = dir_state.transition.final_state
            dir_state.transition = Transition(ActionType.NONE)
            log.debug(f'final dir_state: {dir_state}')
        if StateTransition.is_idle(dir_state.state) and dir_state.next_state:
            # apply the state change that was queued while busy
            self.set_state(dir_state, dir_state.next_state)
        pending = dir_state.transition.action_type != ActionType.NONE
        if finish_policy_action:
            self.execute_policy_action(dir_path, dir_state, finish_policy_action)
        return pending
def find_tracked_ancestor_or_subtree(self, dir_path):
    """Find a tracked path that contains, or is contained in, `dir_path`.

    :return: tuple (tracked_path, what) for the first match, where `what`
             is 'subtree' when the common path equals the tracked path
             and 'ancestor' otherwise; None when there is no match.
    """
    # NOTE(review): the `if cpath in comp:` test and the final
    # `return None` are missing from the extracted text and are restored
    # by inference (add_dir tests the result for truthiness) -- confirm.
    for tracked_path in self.dir_states:
        comp = [dir_path, tracked_path]
        cpath = os.path.commonpath(comp)
        if cpath in comp:
            what = 'subtree' if cpath == tracked_path else 'ancestor'
            return (tracked_path, what)
    return None
def add_dir(self, dir_path):
    """Start tracking `dir_path`.

    :return: the result of set_state() for the new directory; False if
             the directory is already tracked.
    :raises MirrorException: -EINVAL when `dir_path` is an ancestor or a
                             subtree of an already tracked path.
    """
    log.debug(f'adding dir_path {dir_path}')
    # NOTE(review): the lock acquisition, both `return False` statements
    # and the `if as_info:` guard are missing from the extracted text and
    # are restored by inference -- confirm against the original file.
    with self.lock:
        if dir_path in self.dir_states:
            return False
        as_info = self.find_tracked_ancestor_or_subtree(dir_path)
        if as_info:
            raise MirrorException(-errno.EINVAL, f'{dir_path} is a {as_info[1]} of tracked path {as_info[0]}')
        dir_state = DirectoryState()
        self.dir_states[dir_path] = dir_state
        log.debug(f'add dir_state: {dir_state}')
        if dir_state.state == State.INITIALIZING:
            return False
        return self.set_state(dir_state, State.ASSOCIATING)
def remove_dir(self, dir_path):
    """Mark `dir_path` as purging and schedule its disassociation.

    :return: the result of set_state(); False if the directory is not
             tracked.
    """
    log.debug(f'removing dir_path {dir_path}')
    # NOTE(review): the lock acquisition, the not-tracked guard and the
    # final `return r` are missing from the extracted text and are
    # restored by inference -- confirm against the original file.
    with self.lock:
        dir_state = self.dir_states.get(dir_path)
        if not dir_state:
            return False
        log.debug(f'removing dir_state: {dir_state}')
        dir_state.purging = True
        # advance the state machine with DISASSOCIATING state for removal
        if dir_state.stalled:
            dir_state.state = State.UNASSOCIATED
            dir_state.transition = StateTransition.transit(
                State.DISASSOCIATING, ActionType.NONE)
        r = self.set_state(dir_state, State.DISASSOCIATING)
        log.debug(f'dir_state: {dir_state}')
        return r
def add_instances_initial(self, instance_ids):
    """Take care of figuring out instances which no longer exist
    and remove them. This is to be done only once on startup to
    identify instances which were previously removed but directories
    are still mapped (on-disk) to them.
    """
    for instance_id in instance_ids:
        self.instance_to_dir_map.setdefault(instance_id, [])
    # NOTE(review): the initialisation of `dead_instances` and the guard
    # before the removal call are missing from the extracted text and are
    # restored by inference -- confirm.
    dead_instances = [instance_id for instance_id in self.instance_to_dir_map
                      if instance_id not in instance_ids]
    if dead_instances:
        self._remove_instances(dead_instances)
def add_instances(self, instance_ids, initial_update=False):
    """Register instances and pick directories to rebalance onto them.

    :param instance_ids: instances to add.
    :param initial_update: when True, only reconcile the known instances
                           via add_instances_initial().
    :return: list of directories to be remapped; presumably None for an
             initial update -- verify against caller.
    """
    log.debug(f'adding instances: {instance_ids} initial_update {initial_update}')
    # NOTE(review): the lock acquisition, the `if initial_update:` /
    # `else:` lines, the `if nr_instances > 0:` guard, the initialisation
    # of `shuffle_dirs` and the final return are missing from the
    # extracted text and are restored by inference -- confirm.
    with self.lock:
        if initial_update:
            self.add_instances_initial(instance_ids)
        else:
            nr_instances = len(self.instance_to_dir_map)
            nr_dead_instances = len(self.dead_instances)
            if nr_instances > 0:
                # adjust dead instances
                nr_instances -= nr_dead_instances
            include_stalled_dirs = nr_instances == 0
            for instance_id in instance_ids:
                self.instance_to_dir_map.setdefault(instance_id, [])
            dirs_per_instance = int(len(self.dir_states) /
                                    (len(self.instance_to_dir_map) - nr_dead_instances))
            if dirs_per_instance == 0:
                dirs_per_instance += 1
            shuffle_dirs = []
            # super set of directories which are candidates for shuffling -- choose
            # those which can be shuffle rightaway (others will be shuffled when
            # they reach idle state).
            shuffle_dirs_ss = self.shuffle(dirs_per_instance, include_stalled_dirs)
            if include_stalled_dirs:
                return shuffle_dirs_ss
            for dir_path in shuffle_dirs_ss:
                dir_state = self.dir_states[dir_path]
                if self.set_state(dir_state, State.SHUFFLING):
                    shuffle_dirs.append(dir_path)
            log.debug(f'remapping directories: {shuffle_dirs}')
            return shuffle_dirs
def remove_instances(self, instance_ids):
    """Remove `instance_ids`; returns what _remove_instances() returns."""
    # NOTE(review): one line is missing from the extracted text between
    # the signature and the return; a lock acquisition is assumed, in
    # line with the other public entry points -- confirm.
    with self.lock:
        return self._remove_instances(instance_ids)
def _remove_instances(self, instance_ids):
    """Retire instances and force their directories to be shuffled.

    Instances without directories are dropped outright; the others are
    recorded in `dead_instances` until their directories are unmapped.

    :return: list of directories that were switched to SHUFFLING.
    """
    log.debug(f'removing instances: {instance_ids}')
    # NOTE(review): the initialisation of `shuffle_dirs`, the three
    # `continue` statements and the final return are missing from the
    # extracted text and are restored by inference -- confirm.
    shuffle_dirs = []
    for instance_id in instance_ids:
        if instance_id not in self.instance_to_dir_map:
            continue
        if not self.instance_to_dir_map[instance_id]:
            self.instance_to_dir_map.pop(instance_id)
            continue
        self.dead_instances.append(instance_id)
        dir_paths = self.instance_to_dir_map[instance_id]
        log.debug(f'force shuffling instance_id {instance_id}, directories {dir_paths}')
        for dir_path in dir_paths:
            dir_state = self.dir_states[dir_path]
            if self.is_state_scheduled(dir_state, State.DISASSOCIATING):
                log.debug(f'dir_path {dir_path} is disassociating, ignoring...')
                continue
            log.debug(f'shuffling dir_path {dir_path}')
            # ignore_current_state=True: re-shuffle even if already SHUFFLING
            if self.set_state(dir_state, State.SHUFFLING, True):
                shuffle_dirs.append(dir_path)
    log.debug(f'shuffling {shuffle_dirs}')
    return shuffle_dirs
def dir_status(self, dir_path):
    """Return a status summary for `dir_path`.

    :return: dict with key 'state', plus 'reason' for stalled directories
             or 'instance_id' and 'last_shuffled' otherwise.
    :raises MirrorException: -ENOENT when `dir_path` is not tracked.
    """
    # NOTE(review): the lock acquisition, the not-tracked guard, the
    # `else:` line, every `state = ...` assignment and the final return
    # are missing from the extracted text. Only the 'mapping' label is
    # visible there; the other four labels below are assumed -- confirm
    # each against the original file before relying on them.
    with self.lock:
        dir_state = self.dir_states.get(dir_path)
        if not dir_state:
            raise MirrorException(-errno.ENOENT, f'{dir_path} is not tracked')
        res = {}  # type: Dict
        if dir_state.stalled:
            res['state'] = 'stalled'
            res['reason'] = 'no mirror daemons running'
        elif dir_state.state == State.ASSOCIATING:
            res['state'] = 'mapping'
        else:
            state = None
            dstate = dir_state.state
            if dstate == State.ASSOCIATING:
                # not reachable: ASSOCIATING is handled by the branch above
                state = 'mapping'
            elif dstate == State.DISASSOCIATING:
                state = 'unmapping'
            elif dstate == State.SHUFFLING:
                state = 'shuffling'
            elif dstate == State.ASSOCIATED:
                state = 'mapped'
            elif dstate == State.INITIALIZING:
                state = 'resolving'
            res['state'] = state
            res['instance_id'] = dir_state.instance_id
            res['last_shuffled'] = dir_state.mapped_time
        return res
368 def instance_summary(self
):
373 for instance_id
, dir_paths
in self
.instance_to_dir_map
.items():
374 res
['mapping'][instance_id
] = f
'{len(dir_paths)} directories'