]>
git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
3 # Copyright 2013. Cumulus Networks, Inc.
4 # Author: Roopa Prabhu, roopa@cumulusnetworks.com
10 from statemanager
import *
13 from collections
import deque
14 from collections
import OrderedDict
19 from collections
import deque
20 from threading
import *
21 from ifupdownbase
import *
23 class ifaceSchedulerFlags():
27 class ifaceScheduler():
28 """ scheduler functions to schedule configuration of interfaces.
31 supports scheduling of interfaces serially in plain interface list
32 or dependency graph format.
38 def run_iface_op(cls
, ifupdownobj
, ifaceobj
, op
, cenv
):
39 """ Runs sub operation on an interface """
40 ifacename
= ifaceobj
.get_name()
42 if (ifaceobj
.get_state() >= ifaceState
.from_str(op
) and
43 ifaceobj
.get_status() == ifaceStatus
.SUCCESS
):
44 ifupdownobj
.logger
.debug('%s: already in state %s' %(ifacename
, op
))
47 # first run ifupdownobj handlers
48 handler
= ifupdownobj
.ops_handlers
.get(op
)
50 addr_method
= ifaceobj
.get_addr_method()
51 if not addr_method
or (addr_method
and addr_method
!= 'manual'):
52 handler(ifupdownobj
, ifaceobj
)
54 for mname
in ifupdownobj
.module_ops
.get(op
):
55 m
= ifupdownobj
.modules
.get(mname
)
59 ifupdownobj
.logger
.debug('%s: %s : running module %s'
60 %(ifacename
, op
, mname
))
61 if op
== 'query-checkcurr':
62 # Dont check curr if the interface object was
64 if (ifaceobj
.priv_flags
& ifupdownobj
.NOCONFIG
):
67 query_ifaceobj
=ifupdownobj
.create_n_save_ifaceobjcurr(ifaceobj
))
72 ifupdownobj
.log_error(str(e
))
75 ifupdownobj
.set_iface_state(ifaceobj
,
76 ifaceState
.from_str(op
),
79 ifupdownobj
.set_iface_state(ifaceobj
,
80 ifaceState
.from_str(op
),
84 if ifupdownobj
.COMPAT_EXEC_SCRIPTS
:
85 # execute /etc/network/ scripts
86 for mname
in ifupdownobj
.script_ops
.get(op
, []):
87 ifupdownobj
.logger
.debug('%s: %s : running script %s'
88 %(ifacename
, op
, mname
))
90 ifupdownobj
.exec_command(mname
, cmdenv
=cenv
)
92 ifupdownobj
.log_error(str(e
))
95 def run_iface_ops(cls
, ifupdownobj
, ifaceobj
, ops
):
96 """ Runs all sub operations on an interface """
99 if ifupdownobj
.COMPAT_EXEC_SCRIPTS
:
100 # For backward compatibility generate env variables
102 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, ops
[0])
104 # Each sub operation has a module list
105 map(lambda op
: cls
.run_iface_op(ifupdownobj
, ifaceobj
, op
, cenv
), ops
)
108 def run_iface_graph(cls
, ifupdownobj
, ifacename
, ops
, parent
=None,
109 order
=ifaceSchedulerFlags
.POSTORDER
,
110 followdependents
=True):
111 """ runs interface by traversing all nodes rooted at itself """
113 # minor optimization. If operation is 'down', proceed only
114 # if interface exists in the system
115 if 'down' in ops
[0] and not ifupdownobj
.link_exists(ifacename
):
116 ifupdownobj
.logger
.info('%s: does not exist' %ifacename
)
119 # Each ifacename can have a list of iface objects
120 ifaceobjs
= ifupdownobj
.get_iface_objs(ifacename
)
121 if ifaceobjs
is None:
122 raise Exception('%s: not found' %ifacename
)
124 for ifaceobj
in ifaceobjs
:
125 # Deal with upperdevs first
126 ulist
= ifaceobj
.get_upperifaces()
128 tmpulist
= ([u
for u
in ulist
if u
!= parent
] if parent
132 # XXX: This is expensive. Find a cheaper way to do this
133 # if any of the upperdevs are present,
134 # dont down this interface
136 if ifupdownobj
.link_exists(u
):
137 if not ifupdownobj
.ALL
:
138 ifupdownobj
.logger
.warn('%s: ' %ifacename
+
139 ' skip interface down,' +
140 ' upperiface %s still around' %u)
142 elif 'up' in ops
[0] and not ifupdownobj
.ALL
:
143 # For 'up', just warn that there is an upperdev which is
146 if not ifupdownobj
.link_exists(u
):
147 ifupdownobj
.logger
.warn('%s: upper iface %s '
148 %(ifacename
, u
) + 'does not exist')
150 if order
== ifaceSchedulerFlags
.INORDER
:
151 # If inorder, run the iface first and then its dependents
152 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
154 # Run lowerifaces or dependents
155 dlist
= ifaceobj
.get_lowerifaces()
157 ifupdownobj
.logger
.info('%s:' %ifacename
+
158 ' found dependents: %s' %str
(dlist
))
160 if not followdependents
:
161 # XXX: this is yet another extra step,
162 # but is needed for interfaces that are
163 # implicit dependents. even though we are asked to
164 # not follow dependents, we must follow the ones
165 # that dont have user given config. Because we own them
166 new_dlist
= [d
for d
in dlist
167 if ifupdownobj
.is_iface_noconfig(d
)]
169 cls
.run_iface_list(ifupdownobj
, new_dlist
, ops
,
172 continueonfailure
=False)
174 cls
.run_iface_list(ifupdownobj
, dlist
, ops
,
177 continueonfailure
=False)
179 if (ifupdownobj
.ignore_error(str(e
))):
182 # Dont bring the iface up if children did not come up
183 ifaceobj
.set_state(ifaceState
.NEW
)
184 ifaceobj
.set_status(ifaceStatus
.ERROR
)
186 if order
== ifaceSchedulerFlags
.POSTORDER
:
187 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
190 def run_iface_list(cls
, ifupdownobj
, ifacenames
,
191 ops
, parent
=None, order
=ifaceSchedulerFlags
.POSTORDER
,
192 followdependents
=True, continueonfailure
=True):
193 """ Runs interface list """
195 for ifacename
in ifacenames
:
197 cls
.run_iface_graph(ifupdownobj
, ifacename
, ops
, parent
,
198 order
, followdependents
)
200 if continueonfailure
:
201 if ifupdownobj
.logger
.isEnabledFor(logging
.DEBUG
):
202 traceback
.print_tb(sys
.exc_info()[2])
203 ifupdownobj
.logger
.error('%s : %s' %(ifacename
, str(e
)))
206 if (ifupdownobj
.ignore_error(str(e
))):
209 raise Exception('error running iface %s (%s)'
210 %(ifacename
, str(e
)))
213 def run_iface_dependency_graphs(cls
, ifupdownobj
,
214 dependency_graph
, ops
, indegrees
=None,
215 order
=ifaceSchedulerFlags
.POSTORDER
,
216 followdependents
=True):
217 """ Runs iface dependeny graph by visiting all the nodes
221 ifupdownobj : ifupdown object (used for getting and updating iface
223 dependency_graph : dependency graph in adjacency list
224 format (contains more than one dependency graph)
225 ops : list of operations to perform eg ['pre-up', 'up', 'post-up']
227 indegrees : indegree array if present is used to determine roots
228 of the graphs in the dependency_graph
232 if indegrees
is None:
233 indegrees
= OrderedDict()
234 for ifacename
in dependency_graph
.keys():
235 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
237 sorted_ifacenames
= graph
.topological_sort_graphs_all(dependency_graph
,
239 ifupdownobj
.logger
.debug('sorted ifacenames %s : '
240 %str
(sorted_ifacenames
))
242 # Build a list of ifaces that dont have any dependencies
243 for ifacename
in sorted_ifacenames
:
244 if not indegrees
.get(ifacename
):
245 run_queue
.append(ifacename
)
247 ifupdownobj
.logger
.info('graph roots (interfaces that dont have '
248 'dependents):' + ' %s' %str
(run_queue
))
250 return cls
.run_iface_list(ifupdownobj
, run_queue
, ops
,
251 parent
=None,order
=order
,
252 followdependents
=followdependents
)
255 def run_iface(cls
, ifupdownobj
, ifacename
, ops
):
256 """ Runs operation on an interface """
258 ifaceobjs
= ifupdownobj
.get_iface_objs(ifacename
)
260 cls
.run_iface_ops(ifupdownobj
, i
, ops
)
263 def run_iface_list_op(cls
, ifupdownobj
, ifacenames
, op
,
264 sorted_by_dependency
=False):
265 """ Runs interface list through sub operation handler. """
267 ifupdownobj
.logger
.debug('running operation %s on all given interfaces'
269 iface_run_queue
= deque(ifacenames
)
270 for i
in range(0, len(iface_run_queue
)):
271 if op
.endswith('up'):
273 if sorted_by_dependency
:
274 ifacename
= iface_run_queue
.pop()
276 ifacename
= iface_run_queue
.popleft()
278 if sorted_by_dependency
:
279 ifacename
= iface_run_queue
.popleft()
281 ifacename
= iface_run_queue
.pop()
284 ifaceobjs
= ifupdownobj
.get_iface_objs(ifacename
)
285 for ifaceobj
in ifaceobjs
:
286 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, op
)
287 cls
.run_iface_op(ifupdownobj
, ifaceobj
, op
, cenv
)
289 ifupdownobj
.log_error(str(e
))
292 def run_iface_list_ops(cls
, ifupdownobj
, ifacenames
, ops
,
293 sorted_by_dependency
=False):
294 """ Runs interface list through sub operations handler
296 Unlike run_iface_list, this method executes a sub operation on the
297 entire interface list before proceeding to the next sub-operation.
298 ie operation 'pre-up' is run through the entire interface list before
301 # Each sub operation has a module list
302 [cls
.run_iface_list_op(ifupdownobj
, ifacenames
, op
,
303 sorted_by_dependency
) for op
in ops
]
306 def run_iface_dependency_graphs_sorted(cls
, ifupdownobj
,
310 """ runs interface dependency graph by topologically sorting the interfaces """
312 if indegrees
is None:
313 indegrees
= OrderedDict()
314 for ifacename
in dependency_graphs
.keys():
315 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
317 ifupdownobj
.logger
.debug('indegree array :')
318 ifupdownobj
.logger
.debug(ifupdownobj
.pp
.pformat(indegrees
))
321 ifupdownobj
.logger
.debug('calling topological sort on the graph ' +
324 sorted_ifacenames
= graph
.topological_sort_graphs_all(
325 dependency_graphs
, indegrees
)
327 sorted_ifacenames
= graph
.topological_sort_graphs(
328 dependency_graphs
, indegrees
)
332 ifupdownobj
.logger
.debug('sorted iface list = %s' %sorted
_ifacenames
)
333 cls
.run_iface_list_ops(ifupdownobj
, sorted_ifacenames
, ops
,
334 sorted_by_dependency
=True)
337 """ Methods to execute interfaces in parallel """
339 def init_tokens(cls
, count
):
340 cls
.token_pool
= BoundedSemaphore(count
)
343 def accquire_token(cls
, logprefix
=''):
344 cls
.token_pool
.acquire()
347 def release_token(cls
, logprefix
=''):
348 cls
.token_pool
.release()
351 def run_iface_parallel(cls
, ifupdownobj
, ifacename
, op
):
352 """ Configures interface in parallel.
354 Executes all its direct dependents in parallel
358 ifupdownobj
.logger
.debug('%s:' %ifacename
+ ' %s' %op
)
359 cls
.accquire_token(iface
)
361 # Each iface can have a list of objects
362 ifaceobjs
= ifupdownobj
.get_iface_objs(ifacename
)
363 if ifaceobjs
is None:
364 ifupdownobj
.logger
.warning('%s: ' %ifacename
+ 'not found')
365 cls
.release_token(ifacename
)
368 for ifaceobj
in ifaceobjs
:
370 dlist
= ifaceobj
.get_lowerifaces()
372 ifupdownobj
.logger
.debug('%s:' %ifacename
+
373 ' found dependents: %s' %str
(dlist
))
375 cls
.release_token(ifacename
)
376 cls
.run_iface_list_parallel(ifacename
, ifupdownobj
,
378 cls
.accquire_token(ifacename
)
380 if ifupdownobj
.ignore_error(str(e
)):
383 # Dont bring the iface up if children did not come up
384 ifupdownobj
.logger
.debug('%s:' %ifacename
+
385 ' there was an error bringing %s' %op
+
386 ' dependents (%s)', str(e
))
387 ifupdownobj
.set_iface_state(ifaceobj
,
388 ifaceState
.from_str(ops
[0]),
392 # Run all sub operations sequentially
394 ifupdownobj
.logger
.debug('%s:' %ifacename
+
395 ' running sub-operations')
396 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, op
)
398 ifupdownobj
.logger
.error('%s:' %ifacename
+
399 ' error running sub operations (%s)' %str
(e
))
401 cls
.release_token(ifacename
)
404 def run_iface_list_parallel(cls
, parent
, ifupdownobj
, ifacenames
, op
):
405 """ Runs interface list in parallel """
407 running_threads
= OrderedDict()
410 for ifacename
in ifacenames
:
412 cls
.accquire_token(parent
)
413 running_threads
[ifacename
] = Thread(None,
414 cls
.run_iface_parallel
, ifacename
,
415 args
=(ifupdownobj
, ifacename
, op
))
416 running_threads
[ifacename
].start()
417 cls
.release_token(parent
)
419 cls
.release_token(parent
)
420 if ifupdownobj
.ignore_error(str(e
)):
423 raise Exception('error starting thread for iface %s'
427 ifupdownobj
.logger
.debug('%s ' %parent
+
428 'waiting for all the threads ...')
429 for ifacename
, t
in running_threads
.items():
431 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
437 def run_iface_graphs_parallel(cls
, parent
, ifupdownobj
, ifacenames
, op
):
438 """ Runs iface graphs in parallel """
440 running_threads
= OrderedDict()
443 for ifacename
in ifacenames
:
445 cls
.accquire_graph_token(parent
)
446 running_threads
[ifacename
] = Thread(None,
447 cls
.run_iface_parallel
, ifacename
,
448 args
=(ifupdownobj
, ifacename
, op
))
449 running_threads
[ifacename
].start()
450 cls
.release_graph_token(parent
)
452 cls
.release_graph_token(parent
)
453 if ifupdownobj
.ignore_error(str(e
)):
456 raise Exception('error starting thread for iface %s'
459 ifupdownobj
.logger
.info('%s ' %parent
+
460 'waiting for all the threads ...')
461 for ifacename
, t
in running_threads
.items():
463 # Check status of thread
464 # XXX: Check all objs
465 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
470 def run_iface_dependency_graph_parallel(cls
, ifupdownobj
, dependency_graph
,
472 """ Runs iface dependeny graph in parallel.
475 ifupdownobj -- ifupdown object (used for getting and updating iface
477 dependency_graph -- dependency graph with
478 operation -- 'up' or 'down' or 'query'
482 ifupdownobj
.logger
.debug('running dependency graph in parallel ..')
484 # Build a list of ifaces that dont have any dependencies
485 for ifacename
in dependency_graph
.keys():
486 if ifupdownobj
.get_iface_refcnt(ifacename
) == 0:
487 run_queue
.append(ifacename
)
489 ifupdownobj
.logger
.debug('graph roots (interfaces that dont'
490 ' have dependents):' + ' %s' %str
(run_queue
))
491 cls
.init_tokens(ifupdownobj
.get_njobs())
492 return cls
.run_iface_list_parallel('main', ifupdownobj
, run_queue
,
496 # Run one graph at a time
497 #for iface in run_queue:
498 # self.run_iface_list_parallel('main', ifupdownobj, [iface],