]>
git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
3 # Copyright 2013. Cumulus Networks, Inc.
4 # Author: Roopa Prabhu, roopa@cumulusnetworks.com
10 from statemanager
import *
13 from collections
import deque
14 from collections
import OrderedDict
19 from collections
import deque
20 from threading
import *
21 from ifupdownbase
import *
23 class ifaceSchedulerFlags():
27 class ifaceScheduler():
28 """ scheduler functions to schedule configuration of interfaces.
31 supports scheduling of interfaces serially in plain interface list
32 or dependency graph format.
38 def run_iface_op(cls
, ifupdownobj
, ifaceobj
, op
, cenv
):
39 """ Runs sub operation on an interface """
40 ifacename
= ifaceobj
.get_name()
42 if (ifaceobj
.get_state() >= ifaceState
.from_str(op
) and
43 ifaceobj
.get_status() == ifaceStatus
.SUCCESS
):
44 ifupdownobj
.logger
.debug('%s: already in state %s' %(ifacename
, op
))
47 # first run ifupdownobj handlers
48 handler
= ifupdownobj
.ops_handlers
.get(op
)
50 addr_method
= ifaceobj
.get_addr_method()
51 if not addr_method
or (addr_method
and addr_method
!= 'manual'):
52 handler(ifupdownobj
, ifaceobj
)
54 for mname
in ifupdownobj
.module_ops
.get(op
):
55 m
= ifupdownobj
.modules
.get(mname
)
59 ifupdownobj
.logger
.debug('%s: %s : running module %s'
60 %(ifacename
, op
, mname
))
61 if op
== 'query-checkcurr':
62 # Dont check curr if the interface object was
64 if (ifaceobj
.priv_flags
& ifupdownobj
.NOCONFIG
):
67 query_ifaceobj
=ifupdownobj
.create_n_save_ifaceobjcurr(ifaceobj
))
72 ifupdownobj
.log_error(str(e
))
75 ifaceobj
.set_state_n_status(ifaceState
.from_str(op
),
78 ifaceobj
.set_state_n_status(ifaceState
.from_str(op
),
81 if ifupdownobj
.COMPAT_EXEC_SCRIPTS
:
82 # execute /etc/network/ scripts
83 for mname
in ifupdownobj
.script_ops
.get(op
, []):
84 ifupdownobj
.logger
.debug('%s: %s : running script %s'
85 %(ifacename
, op
, mname
))
87 ifupdownobj
.exec_command(mname
, cmdenv
=cenv
)
89 ifupdownobj
.log_error(str(e
))
92 def run_iface_ops(cls
, ifupdownobj
, ifaceobj
, ops
):
93 """ Runs all operations on an interface """
95 if ifupdownobj
.COMPAT_EXEC_SCRIPTS
:
96 # For backward compatibility generate env variables
98 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, ops
[0])
99 map(lambda op
: cls
.run_iface_op(ifupdownobj
, ifaceobj
, op
, cenv
), ops
)
100 posthookfunc
= ifupdownobj
.sched_hooks
.get('posthook')
102 posthookfunc(ifupdownobj
, ifaceobj
)
105 def run_iface_graph(cls
, ifupdownobj
, ifacename
, ops
, parent
=None,
106 order
=ifaceSchedulerFlags
.POSTORDER
,
107 followdependents
=True):
108 """ runs interface by traversing all nodes rooted at itself """
110 # minor optimization. If operation is 'down', proceed only
111 # if interface exists in the system
112 if 'down' in ops
[0] and not ifupdownobj
.link_exists(ifacename
):
113 ifupdownobj
.logger
.info('%s: does not exist' %ifacename
)
116 # Each ifacename can have a list of iface objects
117 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
119 raise Exception('%s: not found' %ifacename
)
121 for ifaceobj
in ifaceobjs
:
122 # Deal with upperdevs first
123 ulist
= ifaceobj
.get_upperifaces()
125 tmpulist
= ([u
for u
in ulist
if u
!= parent
] if parent
129 # XXX: This is expensive. Find a cheaper way to do this
130 # if any of the upperdevs are present,
131 # dont down this interface
133 if ifupdownobj
.link_exists(u
):
134 if not ifupdownobj
.ALL
:
135 ifupdownobj
.logger
.warn('%s: ' %ifacename
+
136 ' skip interface down,' +
137 ' upperiface %s still around' %u)
139 elif 'up' in ops
[0] and not ifupdownobj
.ALL
:
140 # For 'up', just warn that there is an upperdev which is
143 if not ifupdownobj
.link_exists(u
):
144 ifupdownobj
.logger
.warn('%s: upper iface %s '
145 %(ifacename
, u
) + 'does not exist')
147 if order
== ifaceSchedulerFlags
.INORDER
:
148 # If inorder, run the iface first and then its dependents
149 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
151 # Run lowerifaces or dependents
152 dlist
= ifaceobj
.get_lowerifaces()
154 ifupdownobj
.logger
.info('%s:' %ifacename
+
155 ' found dependents: %s' %str
(dlist
))
157 if not followdependents
:
158 # XXX: this is yet another extra step,
159 # but is needed for interfaces that are
160 # implicit dependents. even though we are asked to
161 # not follow dependents, we must follow the ones
162 # that dont have user given config. Because we own them
163 new_dlist
= [d
for d
in dlist
164 if ifupdownobj
.is_iface_noconfig(d
)]
166 cls
.run_iface_list(ifupdownobj
, new_dlist
, ops
,
169 continueonfailure
=False)
171 cls
.run_iface_list(ifupdownobj
, dlist
, ops
,
174 continueonfailure
=False)
176 if (ifupdownobj
.ignore_error(str(e
))):
179 # Dont bring the iface up if children did not come up
180 ifaceobj
.set_state_n_sttaus(ifaceState
.NEW
,
183 if order
== ifaceSchedulerFlags
.POSTORDER
:
184 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
187 def run_iface_list(cls
, ifupdownobj
, ifacenames
,
188 ops
, parent
=None, order
=ifaceSchedulerFlags
.POSTORDER
,
189 followdependents
=True, continueonfailure
=True):
190 """ Runs interface list """
192 for ifacename
in ifacenames
:
194 cls
.run_iface_graph(ifupdownobj
, ifacename
, ops
, parent
,
195 order
, followdependents
)
197 if continueonfailure
:
198 if ifupdownobj
.logger
.isEnabledFor(logging
.DEBUG
):
199 traceback
.print_tb(sys
.exc_info()[2])
200 ifupdownobj
.logger
.error('%s : %s' %(ifacename
, str(e
)))
203 if (ifupdownobj
.ignore_error(str(e
))):
206 raise Exception('error running iface %s (%s)'
207 %(ifacename
, str(e
)))
210 def run_iface_dependency_graphs(cls
, ifupdownobj
,
211 dependency_graph
, ops
, indegrees
=None,
212 order
=ifaceSchedulerFlags
.POSTORDER
,
213 followdependents
=True):
214 """ Runs iface dependeny graph by visiting all the nodes
218 ifupdownobj : ifupdown object (used for getting and updating iface
220 dependency_graph : dependency graph in adjacency list
221 format (contains more than one dependency graph)
222 ops : list of operations to perform eg ['pre-up', 'up', 'post-up']
224 indegrees : indegree array if present is used to determine roots
225 of the graphs in the dependency_graph
229 if indegrees
is None:
230 indegrees
= OrderedDict()
231 for ifacename
in dependency_graph
.keys():
232 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
234 sorted_ifacenames
= graph
.topological_sort_graphs_all(dependency_graph
,
236 ifupdownobj
.logger
.debug('sorted ifacenames %s : '
237 %str
(sorted_ifacenames
))
239 # Build a list of ifaces that dont have any dependencies
240 for ifacename
in sorted_ifacenames
:
241 if not indegrees
.get(ifacename
):
242 run_queue
.append(ifacename
)
244 ifupdownobj
.logger
.info('graph roots (interfaces that dont have '
245 'dependents):' + ' %s' %str
(run_queue
))
247 return cls
.run_iface_list(ifupdownobj
, run_queue
, ops
,
248 parent
=None,order
=order
,
249 followdependents
=followdependents
)
252 def run_iface(cls
, ifupdownobj
, ifacename
, ops
):
253 """ Runs operation on an interface """
255 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
257 cls
.run_iface_ops(ifupdownobj
, i
, ops
)
260 def run_iface_list_op(cls
, ifupdownobj
, ifacenames
, op
,
261 sorted_by_dependency
=False):
262 """ Runs interface list through sub operation handler. """
264 ifupdownobj
.logger
.debug('running operation %s on all given interfaces'
266 iface_run_queue
= deque(ifacenames
)
267 for i
in range(0, len(iface_run_queue
)):
268 if op
.endswith('up'):
270 if sorted_by_dependency
:
271 ifacename
= iface_run_queue
.pop()
273 ifacename
= iface_run_queue
.popleft()
275 if sorted_by_dependency
:
276 ifacename
= iface_run_queue
.popleft()
278 ifacename
= iface_run_queue
.pop()
281 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
282 for ifaceobj
in ifaceobjs
:
283 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, op
)
284 cls
.run_iface_op(ifupdownobj
, ifaceobj
, op
, cenv
)
286 ifupdownobj
.log_error(str(e
))
289 def run_iface_list_ops(cls
, ifupdownobj
, ifacenames
, ops
,
290 sorted_by_dependency
=False):
291 """ Runs interface list through sub operations handler
293 Unlike run_iface_list, this method executes a sub operation on the
294 entire interface list before proceeding to the next sub-operation.
295 ie operation 'pre-up' is run through the entire interface list before
298 # Each sub operation has a module list
299 [cls
.run_iface_list_op(ifupdownobj
, ifacenames
, op
,
300 sorted_by_dependency
) for op
in ops
]
303 def run_iface_dependency_graphs_sorted(cls
, ifupdownobj
,
307 """ runs interface dependency graph by topologically sorting the interfaces """
309 if indegrees
is None:
310 indegrees
= OrderedDict()
311 for ifacename
in dependency_graphs
.keys():
312 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
314 ifupdownobj
.logger
.debug('indegree array :')
315 ifupdownobj
.logger
.debug(ifupdownobj
.pp
.pformat(indegrees
))
318 ifupdownobj
.logger
.debug('calling topological sort on the graph ' +
321 sorted_ifacenames
= graph
.topological_sort_graphs_all(
322 dependency_graphs
, indegrees
)
324 sorted_ifacenames
= graph
.topological_sort_graphs(
325 dependency_graphs
, indegrees
)
329 ifupdownobj
.logger
.debug('sorted iface list = %s' %sorted
_ifacenames
)
330 cls
.run_iface_list_ops(ifupdownobj
, sorted_ifacenames
, ops
,
331 sorted_by_dependency
=True)
334 """ Methods to execute interfaces in parallel """
336 def init_tokens(cls
, count
):
337 cls
.token_pool
= BoundedSemaphore(count
)
340 def accquire_token(cls
, logprefix
=''):
341 cls
.token_pool
.acquire()
344 def release_token(cls
, logprefix
=''):
345 cls
.token_pool
.release()
348 def run_iface_parallel(cls
, ifupdownobj
, ifacename
, op
):
349 """ Configures interface in parallel.
351 Executes all its direct dependents in parallel
355 ifupdownobj
.logger
.debug('%s:' %ifacename
+ ' %s' %op
)
356 cls
.accquire_token(iface
)
358 # Each iface can have a list of objects
359 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
360 if ifaceobjs
is None:
361 ifupdownobj
.logger
.warning('%s: ' %ifacename
+ 'not found')
362 cls
.release_token(ifacename
)
365 for ifaceobj
in ifaceobjs
:
367 dlist
= ifaceobj
.get_lowerifaces()
369 ifupdownobj
.logger
.debug('%s:' %ifacename
+
370 ' found dependents: %s' %str
(dlist
))
372 cls
.release_token(ifacename
)
373 cls
.run_iface_list_parallel(ifacename
, ifupdownobj
,
375 cls
.accquire_token(ifacename
)
377 if ifupdownobj
.ignore_error(str(e
)):
380 # Dont bring the iface up if children did not come up
381 ifupdownobj
.logger
.debug('%s:' %ifacename
+
382 ' there was an error bringing %s' %op
+
383 ' dependents (%s)', str(e
))
384 ifupdownobj
.set_iface_state(ifaceobj
,
385 ifaceState
.from_str(ops
[0]),
389 # Run all sub operations sequentially
391 ifupdownobj
.logger
.debug('%s:' %ifacename
+
392 ' running sub-operations')
393 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, op
)
395 ifupdownobj
.logger
.error('%s:' %ifacename
+
396 ' error running sub operations (%s)' %str
(e
))
398 cls
.release_token(ifacename
)
401 def run_iface_list_parallel(cls
, parent
, ifupdownobj
, ifacenames
, op
):
402 """ Runs interface list in parallel """
404 running_threads
= OrderedDict()
407 for ifacename
in ifacenames
:
409 cls
.accquire_token(parent
)
410 running_threads
[ifacename
] = Thread(None,
411 cls
.run_iface_parallel
, ifacename
,
412 args
=(ifupdownobj
, ifacename
, op
))
413 running_threads
[ifacename
].start()
414 cls
.release_token(parent
)
416 cls
.release_token(parent
)
417 if ifupdownobj
.ignore_error(str(e
)):
420 raise Exception('error starting thread for iface %s'
424 ifupdownobj
.logger
.debug('%s ' %parent
+
425 'waiting for all the threads ...')
426 for ifacename
, t
in running_threads
.items():
428 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
434 def run_iface_graphs_parallel(cls
, parent
, ifupdownobj
, ifacenames
, op
):
435 """ Runs iface graphs in parallel """
437 running_threads
= OrderedDict()
440 for ifacename
in ifacenames
:
442 cls
.accquire_graph_token(parent
)
443 running_threads
[ifacename
] = Thread(None,
444 cls
.run_iface_parallel
, ifacename
,
445 args
=(ifupdownobj
, ifacename
, op
))
446 running_threads
[ifacename
].start()
447 cls
.release_graph_token(parent
)
449 cls
.release_graph_token(parent
)
450 if ifupdownobj
.ignore_error(str(e
)):
453 raise Exception('error starting thread for iface %s'
456 ifupdownobj
.logger
.info('%s ' %parent
+
457 'waiting for all the threads ...')
458 for ifacename
, t
in running_threads
.items():
460 # Check status of thread
461 # XXX: Check all objs
462 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
467 def run_iface_dependency_graph_parallel(cls
, ifupdownobj
, dependency_graph
,
469 """ Runs iface dependeny graph in parallel.
472 ifupdownobj -- ifupdown object (used for getting and updating iface
474 dependency_graph -- dependency graph with
475 operation -- 'up' or 'down' or 'query'
479 ifupdownobj
.logger
.debug('running dependency graph in parallel ..')
481 # Build a list of ifaces that dont have any dependencies
482 for ifacename
in dependency_graph
.keys():
483 if ifupdownobj
.get_iface_refcnt(ifacename
) == 0:
484 run_queue
.append(ifacename
)
486 ifupdownobj
.logger
.debug('graph roots (interfaces that dont'
487 ' have dependents):' + ' %s' %str
(run_queue
))
488 cls
.init_tokens(ifupdownobj
.get_njobs())
489 return cls
.run_iface_list_parallel('main', ifupdownobj
, run_queue
,
493 # Run one graph at a time
494 #for iface in run_queue:
495 # self.run_iface_list_parallel('main', ifupdownobj, [iface],