]>
git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
5 from statemanager
import *
8 from collections
import deque
9 from collections
import OrderedDict
14 from collections
import deque
15 from threading
import *
16 from ifupdownbase
import *
18 class ifaceScheduler(ifupdownBase
):
19 """ scheduler to schedule configuration of interfaces.
22 supports scheduling of interfaces serially in plain interface list
23 or dependency graph format.
27 self
.logger
= logging
.getLogger('ifupdown.' +
28 self
.__class
__.__name
__)
30 def run_iface_subop(self
, ifupdownobj
, ifaceobj
, op
, subop
, mdict
, cenv
):
31 """ Runs sub operation on an interface """
33 self
.logger
.debug('%s: ' %ifaceobj
.get_name() + 'op %s' %op
+
36 for mname
, mdata
in mdict
.items():
37 m
= mdata
.get('module')
40 if (mdata
.get('ftype') == 'pmodule' and
41 hasattr(m
, 'run') == True):
42 self
.logger
.debug('%s: ' %ifaceobj
.get_name() +
43 'running module %s' %mname
+
44 ' op %s' %op
+ ' subop %s' %subop
)
46 m
.run(ifaceobj
, subop
, query
=True,
47 query_ifaceobj
=ifupdownobj
.create_ifaceobjcurr(
50 m
.run(ifaceobj
, subop
)
52 self
.logger
.debug('%s: ' %ifaceobj
.get_name() +
53 'running script %s' %mname
+
54 ' op %s' %op
+ ' subop %s' %subop
)
55 self
.exec_command(m
, cmdenv
=cenv
)
58 if ifupdownobj
.ignore_error(str(e
)) == True:
65 ifupdownobj
.set_iface_state(ifaceobj
,
66 ifaceState
.from_str(subop
),
69 ifupdownobj
.set_iface_state(ifaceobj
,
70 ifaceState
.from_str(subop
),
73 def run_iface_subops(self
, ifupdownobj
, ifaceobj
, op
):
74 """ Runs all sub operations on an interface """
76 # For backward compatibility execute scripts with
78 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, op
)
80 # Each sub operation has a module list
81 subopdict
= ifupdownobj
.operations
.get(op
)
82 for subop
, mdict
in subopdict
.items():
83 self
.run_iface_subop(ifupdownobj
, ifaceobj
, op
, subop
, mdict
, cenv
)
86 def run_iface(self
, ifupdownobj
, ifacename
, op
):
87 """ Runs operation on an interface """
89 ifaceobjs
= ifupdownobj
.get_iface_objs(ifacename
)
91 if (op
!= 'query' and ifupdownobj
.STATE_CHECK
== True and
92 ifupdownobj
.is_valid_state_transition(i
, op
) == False and
93 ifupdownobj
.FORCE
== False):
94 self
.logger
.warning('%s' %ifacename
+
98 self
.run_iface_subops(ifupdownobj
, i
, op
)
101 def run_iface_list(self
, ifupdownobj
, ifacenames
, operation
,
102 sorted_by_dependency
=False):
103 """ Runs interface list serially executing all sub operations on
104 each interface at a time. """
106 self
.logger
.debug('run_iface_list: running interface list for ' +
107 'operation %s' %operation
)
109 iface_run_queue
= deque(ifacenames
)
110 for i
in range(0, len(iface_run_queue
)):
111 if operation
== 'up':
113 if sorted_by_dependency
== True:
114 ifacename
= iface_run_queue
.pop()
116 ifacename
= iface_run_queue
.popleft()
118 if sorted_by_dependency
== True:
119 ifacename
= iface_run_queue
.popleft()
121 ifacename
= iface_run_queue
.pop()
124 self
.run_iface(ifupdownobj
, ifacename
, operation
)
126 if (ifupdownobj
.ignore_error(str(e
)) == True):
132 def run_iface_list_subop(self
, ifupdownobj
, ifacenames
, op
, subop
, mdict
,
133 sorted_by_dependency
=False):
134 """ Runs interface list through sub operation handler. """
136 self
.logger
.debug('running sub operation %s on all given interfaces' %op
)
138 iface_run_queue
= deque(ifacenames
)
139 for i
in range(0, len(iface_run_queue
)):
142 if sorted_by_dependency
== True:
143 ifacename
= iface_run_queue
.pop()
145 ifacename
= iface_run_queue
.popleft()
147 if sorted_by_dependency
== True:
148 ifacename
= iface_run_queue
.popleft()
150 ifacename
= iface_run_queue
.pop()
153 ifaceobjs
= ifupdownobj
.get_iface_objs(ifacename
)
154 for ifaceobj
in ifaceobjs
:
155 if (op
!= 'query' and ifupdownobj
.STATE_CHECK
== True and
156 ifupdownobj
.is_valid_state_transition(ifaceobj
,
157 op
) == False and ifupdownobj
.FORCE
== False):
158 if subop
== 'post-down' or subop
== 'post-up':
159 self
.logger
.warning('%s: ' %ifacename
+
163 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, op
)
164 self
.run_iface_subop(ifupdownobj
, ifaceobj
, op
, subop
,
167 if (ifupdownobj
.ignore_error(str(e
)) == True):
173 def run_iface_list_stages(self
, ifupdownobj
, ifacenames
, op
,
174 sorted_by_dependency
=False):
175 """ Runs interface list through sub operations handler
177 Unlike run_iface_list, this method executes a sub operation on the
178 entire interface list before proceeding to the next sub-operation.
179 ie operation 'pre-up' is run through the entire interface list before
183 self
.logger
.debug('run_iface_list_stages: running interface list for %s'
186 # Each sub operation has a module list
187 subopdict
= ifupdownobj
.operations
.get(op
)
188 for subop
, mdict
in subopdict
.items():
189 self
.run_iface_list_subop(ifupdownobj
, ifacenames
, op
, subop
, mdict
,
190 sorted_by_dependency
)
193 def run_iface_dependency_graph(self
, ifupdownobj
, dependency_graph
,
195 """ runs interface dependency graph """
197 indegrees
= OrderedDict()
199 self
.logger
.debug('creating indegree array ...')
200 for ifacename
in dependency_graph
.keys():
201 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
203 if self
.logger
.isEnabledFor(logging
.DEBUG
) == True:
204 self
.logger
.debug('indegree array :')
205 ifupdownobj
.pp
.pprint(indegrees
)
208 self
.logger
.debug('calling topological sort on the graph ...')
209 sorted_ifacenames
= graph
.topological_sort(dependency_graph
,
214 self
.logger
.debug('sorted iface list = %s' %sorted
_ifacenames
)
216 #self.run_iface_list(ifupdownobj, sorted_ifacenames, operation,
217 # sorted_by_dependency=True)
219 self
.run_iface_list_stages(ifupdownobj
, sorted_ifacenames
, operation
,
220 sorted_by_dependency
=True)
223 def init_tokens(self
, count
):
224 self
.token_pool
= BoundedSemaphore(count
)
225 self
.logger
.debug('initialized bounded semaphore with %d' %count
)
227 def accquire_token(self
, logprefix
=''):
228 self
.token_pool
.acquire()
229 self
.logger
.debug('%s ' %logprefix
+ 'acquired token')
231 def release_token(self
, logprefix
=''):
232 self
.token_pool
.release()
233 self
.logger
.debug('%s ' %logprefix
+ 'release token')
235 def run_iface_parallel(self
, ifupdownobj
, ifacename
, op
):
236 """ Configures interface in parallel.
238 Executes all its direct dependents in parallel
242 self
.logger
.debug('%s:' %ifacename
+ ' %s' %op
)
243 self
.accquire_token(iface
)
245 # Each iface can have a list of objects
246 ifaceobjs
= ifupdownobj
.get_iface_objs(ifacename
)
247 if ifaceobjs
is None:
248 self
.logger
.warning('%s: ' %ifacename
+ 'not found')
249 self
.release_token(ifacename
)
252 for ifaceobj
in ifaceobjs
:
254 dlist
= ifaceobj
.get_dependents()
255 if dlist
is not None and len(dlist
) > 0:
256 self
.logger
.debug('%s:' %ifacename
+
257 ' found dependents: %s' %str
(dlist
))
259 self
.release_token(ifacename
)
260 self
.run_iface_list_parallel(ifacename
, ifupdownobj
,
262 self
.accquire_token(ifacename
)
264 if (ifupdownobj
.ignore_error(str(e
)) == True):
267 # Dont bring the iface up if children did not come up
268 self
.logger
.debug('%s:' %ifacename
+
269 ' there was an error bringing %s' %op
+
270 ' dependents (%s)', str(e
))
271 ifupdownobj
.set_iface_state(ifaceobj
,
273 ifupdownobj
.get_subops(op
)[0]),
277 if (op
!= 'query' and ifupdownobj
.STATE_CHECK
== True and
278 ifupdownobj
.is_valid_state_transition(ifaceobj
,
279 op
) == False and ifupdownobj
.FORCE
== False):
280 self
.logger
.warning('%s:' %ifacename
+ ' already %s' %op
)
284 # Run all sub operations sequentially
286 self
.logger
.debug('%s:' %ifacename
+ ' running sub-operations')
287 self
.run_iface_subops(ifupdownobj
, ifaceobj
, op
)
289 self
.logger
.error('%s:' %ifacename
+
290 ' error running sub operations (%s)' %str
(e
))
292 self
.release_token(ifacename
)
295 def run_iface_list_parallel(self
, parent
, ifupdownobj
, ifacenames
, op
):
296 """ Runs interface list in parallel """
298 running_threads
= OrderedDict()
301 for ifacename
in ifacenames
:
303 self
.accquire_token(parent
)
304 running_threads
[ifacename
] = Thread(None,
305 self
.run_iface_parallel
, ifacename
,
306 args
=(ifupdownobj
, ifacename
, op
))
307 running_threads
[ifacename
].start()
308 self
.release_token(parent
)
310 self
.release_token(parent
)
311 if (ifupdownobj
.ignore_error(str(e
)) == True):
314 raise Exception('error starting thread for iface %s'
318 self
.logger
.debug('%s' %parent
+ 'waiting for all the threads ...')
319 for ifacename
, t
in running_threads
.items():
321 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
326 def run_iface_graphs_parallel(self
, parent
, ifupdownobj
, ifacenames
, op
):
327 """ Runs iface graphs in parallel """
329 running_threads
= OrderedDict()
332 for ifacename
in ifacenames
:
334 self
.accquire_graph_token(parent
)
335 running_threads
[ifacename
] = Thread(None,
336 self
.run_iface_parallel
, ifacename
,
337 args
=(ifupdownobj
, ifacename
, op
))
338 running_threads
[ifacename
].start()
339 self
.release_graph_token(parent
)
341 self
.release_graph_token(parent
)
342 if (ifupdownobj
.ignore_error(str(e
)) == True):
345 raise Exception('error starting thread for iface %s'
348 self
.logger
.info('%s' %parent
+ 'waiting for all the threads ...')
349 for ifacename
, t
in running_threads
.items():
351 # Check status of thread
352 # XXX: Check all objs
353 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
358 def run_iface_dependency_graph_parallel(self
, ifupdownobj
, dependency_graph
,
360 """ Runs iface dependeny graph in parallel.
363 ifupdownobj -- ifupdown object (used for getting and updating iface
365 dependency_graph -- dependency graph with
366 operation -- 'up' or 'down' or 'query'
370 self
.logger
.debug('running dependency graph in parallel ..')
374 # Build a list of ifaces that dont have any dependencies
375 for ifacename
in dependency_graph
.keys():
376 if ifupdownobj
.get_iface_refcnt(ifacename
) == 0:
377 run_queue
.append(ifacename
)
379 self
.logger
.debug('graph roots (interfaces that dont have dependents):' +
380 ' %s' %str
(run_queue
))
382 self
.init_tokens(ifupdownobj
.get_njobs())
384 return self
.run_iface_list_parallel('main', ifupdownobj
, run_queue
,
388 # Run one graph at a time
389 #for iface in run_queue:
390 # self.run_iface_list_parallel('main', ifupdownobj, [iface],