]>
git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
3 # Copyright 2013. Cumulus Networks, Inc.
4 # Author: Roopa Prabhu, roopa@cumulusnetworks.com
10 from statemanager
import *
13 from collections
import deque
14 from collections
import OrderedDict
19 from collections
import deque
20 from threading
import *
21 from ifupdownbase
import *
23 class ifaceSchedulerFlags():
27 class ifaceScheduler():
28 """ scheduler functions to schedule configuration of interfaces.
30 supports scheduling of interfaces serially in plain interface list
31 or dependency graph format.
39 def run_iface_op(cls
, ifupdownobj
, ifaceobj
, op
, cenv
):
40 """ Runs sub operation on an interface """
41 ifacename
= ifaceobj
.name
43 if (cls
._STATE
_CHECK
and
44 (ifaceobj
.state
>= ifaceState
.from_str(op
)) and
45 (ifaceobj
.status
== ifaceStatus
.SUCCESS
)):
46 ifupdownobj
.logger
.debug('%s: already in state %s' %(ifacename
, op
))
49 # first run ifupdownobj handlers
50 handler
= ifupdownobj
.ops_handlers
.get(op
)
52 if not ifaceobj
.addr_method
or (ifaceobj
.addr_method
and
53 ifaceobj
.addr_method
!= 'manual'):
54 handler(ifupdownobj
, ifaceobj
)
56 if not ifupdownobj
.ADDONS_ENABLE
: return
58 for mname
in ifupdownobj
.module_ops
.get(op
):
59 m
= ifupdownobj
.modules
.get(mname
)
63 msg
= ('%s: %s : running module %s' %(ifacename
, op
, mname
))
64 if op
== 'query-checkcurr':
65 # Dont check curr if the interface object was
67 if (ifaceobj
.priv_flags
& ifupdownobj
.NOCONFIG
):
69 ifupdownobj
.logger
.debug(msg
)
71 query_ifaceobj
=ifupdownobj
.create_n_save_ifaceobjcurr(ifaceobj
))
73 ifupdownobj
.logger
.debug(msg
)
77 ifupdownobj
.log_error(str(e
))
80 ifaceobj
.set_state_n_status(ifaceState
.from_str(op
),
83 ifaceobj
.set_state_n_status(ifaceState
.from_str(op
),
86 if ifupdownobj
.COMPAT_EXEC_SCRIPTS
:
87 # execute /etc/network/ scripts
88 for mname
in ifupdownobj
.script_ops
.get(op
, []):
89 ifupdownobj
.logger
.debug('%s: %s : running script %s'
90 %(ifacename
, op
, mname
))
92 ifupdownobj
.exec_command(mname
, cmdenv
=cenv
)
94 ifupdownobj
.log_error(str(e
))
97 def run_iface_ops(cls
, ifupdownobj
, ifaceobj
, ops
):
98 """ Runs all operations on an interface """
99 ifacename
= ifaceobj
.name
100 # minor optimization. If operation is 'down', proceed only
101 # if interface exists in the system
102 if 'down' in ops
[0] and not ifupdownobj
.link_exists(ifacename
):
103 ifupdownobj
.logger
.info('%s: does not exist' %ifacename
)
106 if ifupdownobj
.COMPAT_EXEC_SCRIPTS
:
107 # For backward compatibility generate env variables
109 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, ops
[0])
110 map(lambda op
: cls
.run_iface_op(ifupdownobj
, ifaceobj
, op
, cenv
), ops
)
111 posthookfunc
= ifupdownobj
.sched_hooks
.get('posthook')
113 posthookfunc(ifupdownobj
, ifaceobj
)
117 def _check_upperifaces(cls
, ifupdownobj
, ifaceobj
, ops
, parent
,
118 followdependents
=False):
119 """ Check if conflicting upper ifaces are around and warn if required
121 Returns False if this interface needs to be skipped, else return True """
123 if 'up' in ops
[0] and followdependents
:
126 # Deal with upperdevs first
127 ulist
= ifaceobj
.upperifaces
129 tmpulist
= ([u
for u
in ulist
if u
!= parent
] if parent
134 # XXX: This is expensive. Find a cheaper way to do this
135 # if any of the upperdevs are present,
136 # dont down this interface
138 if ifupdownobj
.link_exists(u
):
139 if not ifupdownobj
.FORCE
and not ifupdownobj
.ALL
:
140 ifupdownobj
.logger
.warn('%s: ' %ifaceobj
.name
+
141 'upperiface %s still around' %u)
143 elif 'up' in ops
[0] and not ifupdownobj
.ALL
:
144 # For 'up', just warn that there is an upperdev which is
147 if not ifupdownobj
.link_exists(u
):
148 ifupdownobj
.logger
.warn('%s: ' %ifaceobj
.name
+
149 'upper iface %s does not exist' %u)
153 def run_iface_graph(cls
, ifupdownobj
, ifacename
, ops
, parent
=None,
154 order
=ifaceSchedulerFlags
.POSTORDER
,
155 followdependents
=True):
156 """ runs interface by traversing all nodes rooted at itself """
158 # Each ifacename can have a list of iface objects
159 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
161 raise Exception('%s: not found' %ifacename
)
163 for ifaceobj
in ifaceobjs
:
164 if not cls
._check
_upperifaces
(ifupdownobj
, ifaceobj
, ops
, parent
,
167 if order
== ifaceSchedulerFlags
.INORDER
:
168 # If inorder, run the iface first and then its dependents
169 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
171 # Run lowerifaces or dependents
172 dlist
= ifaceobj
.lowerifaces
174 ifupdownobj
.logger
.debug('%s: found dependents %s'
175 %(ifacename
, str(dlist
)))
177 if not followdependents
:
178 # XXX: this is yet another extra step,
179 # but is needed for interfaces that are
180 # implicit dependents. even though we are asked to
181 # not follow dependents, we must follow the ones
182 # that dont have user given config. Because we own them
183 new_dlist
= [d
for d
in dlist
184 if ifupdownobj
.is_iface_noconfig(d
)]
186 cls
.run_iface_list(ifupdownobj
, new_dlist
, ops
,
189 continueonfailure
=False)
191 cls
.run_iface_list(ifupdownobj
, dlist
, ops
,
194 continueonfailure
=False)
196 if (ifupdownobj
.ignore_error(str(e
))):
199 # Dont bring the iface up if children did not come up
200 ifaceobj
.set_state_n_status(ifaceState
.NEW
,
203 if order
== ifaceSchedulerFlags
.POSTORDER
:
204 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
207 def run_iface_list(cls
, ifupdownobj
, ifacenames
,
208 ops
, parent
=None, order
=ifaceSchedulerFlags
.POSTORDER
,
209 followdependents
=True, continueonfailure
=True):
210 """ Runs interface list """
212 for ifacename
in ifacenames
:
214 cls
.run_iface_graph(ifupdownobj
, ifacename
, ops
, parent
,
215 order
, followdependents
)
217 if continueonfailure
:
218 if ifupdownobj
.logger
.isEnabledFor(logging
.DEBUG
):
219 traceback
.print_tb(sys
.exc_info()[2])
220 ifupdownobj
.logger
.error('%s : %s' %(ifacename
, str(e
)))
223 if (ifupdownobj
.ignore_error(str(e
))):
226 raise Exception('error running iface %s (%s)'
227 %(ifacename
, str(e
)))
230 def run_iface_graph_upper(cls
, ifupdownobj
, ifacename
, ops
, parent
=None,
231 followdependents
=True, skip_root
=False):
232 """ runs interface by traversing all nodes rooted at itself """
234 # Each ifacename can have a list of iface objects
235 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
237 raise Exception('%s: not found' %ifacename
)
239 for ifaceobj
in ifaceobjs
:
241 # run the iface first and then its upperifaces
242 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
245 ulist
= ifaceobj
.upperifaces
247 ifupdownobj
.logger
.debug('%s: found upperifaces %s'
248 %(ifacename
, str(ulist
)))
250 cls
.run_iface_list_upper(ifupdownobj
, ulist
, ops
,
253 continueonfailure
=True)
255 if (ifupdownobj
.ignore_error(str(e
))):
261 def run_iface_list_upper(cls
, ifupdownobj
, ifacenames
,
262 ops
, parent
=None, followdependents
=True,
263 continueonfailure
=True, skip_root
=False):
264 """ Runs interface list """
266 for ifacename
in ifacenames
:
268 cls
.run_iface_graph_upper(ifupdownobj
, ifacename
, ops
, parent
,
269 followdependents
, skip_root
)
271 if continueonfailure
:
272 if ifupdownobj
.logger
.isEnabledFor(logging
.DEBUG
):
273 traceback
.print_tb(sys
.exc_info()[2])
274 ifupdownobj
.logger
.error('%s : %s' %(ifacename
, str(e
)))
277 if (ifupdownobj
.ignore_error(str(e
))):
280 raise Exception('error running iface %s (%s)'
281 %(ifacename
, str(e
)))
284 def sched_ifaces(cls
, ifupdownobj
, ifacenames
, ops
,
285 dependency_graph
=None, indegrees
=None,
286 order
=ifaceSchedulerFlags
.POSTORDER
,
287 followdependents
=True):
288 """ Runs iface dependeny graph by visiting all the nodes
292 ifupdownobj : ifupdown object (used for getting and updating iface
294 dependency_graph : dependency graph in adjacency list
295 format (contains more than one dependency graph)
296 ops : list of operations to perform eg ['pre-up', 'up', 'post-up']
298 indegrees : indegree array if present is used to determine roots
299 of the graphs in the dependency_graph
302 if not ifupdownobj
.ALL
or not followdependents
or len(ifacenames
) == 1:
303 cls
.run_iface_list(ifupdownobj
, ifacenames
, ops
,
304 parent
=None,order
=order
,
305 followdependents
=followdependents
)
306 if not ifupdownobj
.ALL
and followdependents
and 'up' in ops
[0]:
307 # If user had given a set of interfaces to bring up
308 # try and execute 'up' on the upperifaces
309 ifupdownobj
.logger
.info('running upperifaces if available')
310 cls
._STATE
_CHECK
= False
311 cls
.run_iface_list_upper(ifupdownobj
, ifacenames
, ops
,
313 cls
._STATE
_CHECK
= True
317 # Get a sorted list of all interfaces
319 indegrees
= OrderedDict()
320 for ifacename
in dependency_graph
.keys():
321 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
322 sorted_ifacenames
= graph
.topological_sort_graphs_all(dependency_graph
,
324 ifupdownobj
.logger
.debug('sorted ifacenames %s : '
325 %str
(sorted_ifacenames
))
327 # From the sorted list, pick interfaces that user asked
328 # and those that dont have any dependents first
329 [run_queue
.append(ifacename
)
330 for ifacename
in sorted_ifacenames
331 if ifacename
in ifacenames
and
332 not indegrees
.get(ifacename
)]
334 ifupdownobj
.logger
.debug('graph roots (interfaces that dont have '
335 'dependents):' + ' %s' %str
(run_queue
))
336 cls
.run_iface_list(ifupdownobj
, run_queue
, ops
,
337 parent
=None,order
=order
,
338 followdependents
=followdependents
)
341 def run_iface(cls
, ifupdownobj
, ifacename
, ops
):
342 """ Runs operation on an interface """
344 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
346 cls
.run_iface_ops(ifupdownobj
, i
, ops
)
349 def run_iface_list_op(cls
, ifupdownobj
, ifacenames
, op
,
350 sorted_by_dependency
=False):
351 """ Runs interface list through sub operation handler. """
353 ifupdownobj
.logger
.debug('running operation %s on all given interfaces'
355 iface_run_queue
= deque(ifacenames
)
356 for i
in range(0, len(iface_run_queue
)):
357 if op
.endswith('up'):
359 if sorted_by_dependency
:
360 ifacename
= iface_run_queue
.pop()
362 ifacename
= iface_run_queue
.popleft()
364 if sorted_by_dependency
:
365 ifacename
= iface_run_queue
.popleft()
367 ifacename
= iface_run_queue
.pop()
370 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
371 for ifaceobj
in ifaceobjs
:
372 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, op
)
373 cls
.run_iface_op(ifupdownobj
, ifaceobj
, op
, cenv
)
375 ifupdownobj
.log_error(str(e
))
def run_iface_list_ops(cls, ifupdownobj, ifacenames, ops,
                       sorted_by_dependency=False):
    """Run every sub-operation in ``ops`` across the whole interface list.

    Each sub-operation is applied to all of ``ifacenames`` before the next
    one starts: with ``ops = ['pre-up', 'up']``, 'pre-up' is run through the
    entire list first, and only then 'up'.

    Args:
        cls: Object providing ``run_iface_list_op``.
        ifupdownobj: Passed through unchanged to ``run_iface_list_op``.
        ifacenames: Interface names, passed through unchanged for every op.
        ops: Iterable of sub-operation names, processed in order.
        sorted_by_dependency: Passed through unchanged to
            ``run_iface_list_op``.

    Returns:
        None.
    """
    # NOTE(review): the method takes ``cls`` but no decorator line is visible
    # in this extract -- presumably a @classmethod; confirm against upstream.
    #
    # A plain loop replaces the original side-effect-only list comprehension,
    # which built and discarded a list of return values.
    for op in ops:
        cls.run_iface_list_op(ifupdownobj, ifacenames, op,
                              sorted_by_dependency)
392 def run_iface_dependency_graphs_sorted(cls
, ifupdownobj
,
396 """ runs interface dependency graph by topologically sorting the interfaces """
398 if indegrees
is None:
399 indegrees
= OrderedDict()
400 for ifacename
in dependency_graphs
.keys():
401 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
403 ifupdownobj
.logger
.debug('indegree array :')
404 ifupdownobj
.logger
.debug(ifupdownobj
.pp
.pformat(indegrees
))
407 ifupdownobj
.logger
.debug('calling topological sort on the graph ' +
410 sorted_ifacenames
= graph
.topological_sort_graphs_all(
411 dependency_graphs
, indegrees
)
413 sorted_ifacenames
= graph
.topological_sort_graphs(
414 dependency_graphs
, indegrees
)
418 ifupdownobj
.logger
.debug('sorted iface list = %s' %sorted
_ifacenames
)
419 cls
.run_iface_list_ops(ifupdownobj
, sorted_ifacenames
, ops
,
420 sorted_by_dependency
=True)
423 """ Methods to execute interfaces in parallel """
def init_tokens(cls, count):
    """Create the token pool shared by the parallel runners.

    Stores a ``BoundedSemaphore`` initialised to ``count`` on
    ``cls.token_pool``, replacing any pool that was there before.

    Args:
        cls: Object on which ``token_pool`` is stored.
        count: Initial value of the semaphore, i.e. how many tokens can be
            held at once before ``acquire`` blocks.
    """
    # NOTE(review): the method takes ``cls`` but no decorator line is visible
    # in this extract -- presumably a @classmethod; confirm against upstream.
    cls.token_pool = BoundedSemaphore(count)
def accquire_token(cls, logprefix=''):
    """Take one token from ``cls.token_pool``, blocking until one is free.

    Args:
        cls: Object holding the ``token_pool`` semaphore.
        logprefix: Accepted for call-site symmetry with ``release_token``;
            not used by this method.
    """
    # NOTE(review): the method takes ``cls`` but no decorator line is visible
    # in this extract -- presumably a @classmethod; confirm against upstream.
    # The misspelled name is kept as-is because callers use it.
    cls.token_pool.acquire()
def release_token(cls, logprefix=''):
    """Return one token to ``cls.token_pool``.

    Args:
        cls: Object holding the ``token_pool`` semaphore.
        logprefix: Accepted for call-site symmetry with ``accquire_token``;
            not used by this method.
    """
    # NOTE(review): the method takes ``cls`` but no decorator line is visible
    # in this extract -- presumably a @classmethod; confirm against upstream.
    cls.token_pool.release()
437 def run_iface_parallel(cls
, ifupdownobj
, ifacename
, op
):
438 """ Configures interface in parallel.
440 Executes all its direct dependents in parallel
444 ifupdownobj
.logger
.debug('%s: %s' %(ifacename
, op
))
445 cls
.accquire_token(iface
)
447 # Each iface can have a list of objects
448 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
449 if ifaceobjs
is None:
450 ifupdownobj
.logger
.warning('%s: ' %ifacename
+ 'not found')
451 cls
.release_token(ifacename
)
454 for ifaceobj
in ifaceobjs
:
456 dlist
= ifaceobj
.lowerifaces
458 ifupdownobj
.logger
.debug('%s:' %ifacename
+
459 ' found dependents: %s' %str
(dlist
))
461 cls
.release_token(ifacename
)
462 cls
.run_iface_list_parallel(ifacename
, ifupdownobj
,
464 cls
.accquire_token(ifacename
)
466 if ifupdownobj
.ignore_error(str(e
)):
469 # Dont bring the iface up if children did not come up
470 ifupdownobj
.logger
.debug('%s:' %ifacename
+
471 ' there was an error bringing %s' %op
+
472 ' dependents (%s)', str(e
))
473 ifupdownobj
.set_iface_state(ifaceobj
,
474 ifaceState
.from_str(ops
[0]),
478 # Run all sub operations sequentially
480 ifupdownobj
.logger
.debug('%s:' %ifacename
+
481 ' running sub-operations')
482 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, op
)
484 ifupdownobj
.logger
.error('%s:' %ifacename
+
485 ' error running sub operations (%s)' %str
(e
))
487 cls
.release_token(ifacename
)
490 def run_iface_list_parallel(cls
, parent
, ifupdownobj
, ifacenames
, op
):
491 """ Runs interface list in parallel """
493 running_threads
= OrderedDict()
496 for ifacename
in ifacenames
:
498 cls
.accquire_token(parent
)
499 running_threads
[ifacename
] = Thread(None,
500 cls
.run_iface_parallel
, ifacename
,
501 args
=(ifupdownobj
, ifacename
, op
))
502 running_threads
[ifacename
].start()
503 cls
.release_token(parent
)
505 cls
.release_token(parent
)
506 if ifupdownobj
.ignore_error(str(e
)):
509 raise Exception('error starting thread for iface %s'
512 ifupdownobj
.logger
.debug('%s ' %parent
+
513 'waiting for all the threads ...')
514 for ifacename
, t
in running_threads
.items():
516 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
522 def run_iface_graphs_parallel(cls
, parent
, ifupdownobj
, ifacenames
, op
):
523 """ Runs iface graphs in parallel """
525 running_threads
= OrderedDict()
528 for ifacename
in ifacenames
:
530 cls
.accquire_graph_token(parent
)
531 running_threads
[ifacename
] = Thread(None,
532 cls
.run_iface_parallel
, ifacename
,
533 args
=(ifupdownobj
, ifacename
, op
))
534 running_threads
[ifacename
].start()
535 cls
.release_graph_token(parent
)
537 cls
.release_graph_token(parent
)
538 if ifupdownobj
.ignore_error(str(e
)):
541 raise Exception('error starting thread for iface %s'
544 ifupdownobj
.logger
.info('%s ' %parent
+
545 'waiting for all the threads ...')
546 for ifacename
, t
in running_threads
.items():
548 # Check status of thread
549 # XXX: Check all objs
550 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
555 def run_iface_dependency_graph_parallel(cls
, ifupdownobj
, dependency_graph
,
557 """ Runs iface dependeny graph in parallel.
560 ifupdownobj -- ifupdown object (used for getting and updating iface
562 dependency_graph -- dependency graph with
563 operation -- 'up' or 'down' or 'query'
567 ifupdownobj
.logger
.debug('running dependency graph in parallel ..')
569 # Build a list of ifaces that dont have any dependencies
570 for ifacename
in dependency_graph
.keys():
571 if ifupdownobj
.get_iface_refcnt(ifacename
) == 0:
572 run_queue
.append(ifacename
)
574 ifupdownobj
.logger
.debug('graph roots (interfaces that dont'
575 ' have dependents):' + ' %s' %str
(run_queue
))
576 cls
.init_tokens(ifupdownobj
.get_njobs())
577 return cls
.run_iface_list_parallel('main', ifupdownobj
, run_queue
,
581 # Run one graph at a time
582 #for iface in run_queue:
583 # self.run_iface_list_parallel('main', ifupdownobj, [iface],