]>
git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
3 # Copyright 2013. Cumulus Networks, Inc.
4 # Author: Roopa Prabhu, roopa@cumulusnetworks.com
10 from statemanager
import *
13 from collections
import deque
14 from collections
import OrderedDict
19 from collections
import deque
20 from threading
import *
21 from ifupdownbase
import *
23 class ifaceSchedulerFlags():
27 class ifaceScheduler():
28 """ scheduler functions to schedule configuration of interfaces.
30 supports scheduling of interfaces serially in plain interface list
31 or dependency graph format.
37 def run_iface_op(cls
, ifupdownobj
, ifaceobj
, op
, cenv
):
38 """ Runs sub operation on an interface """
39 ifacename
= ifaceobj
.get_name()
41 if (ifaceobj
.get_state() >= ifaceState
.from_str(op
) and
42 ifaceobj
.get_status() == ifaceStatus
.SUCCESS
):
43 ifupdownobj
.logger
.debug('%s: already in state %s' %(ifacename
, op
))
46 # first run ifupdownobj handlers
47 handler
= ifupdownobj
.ops_handlers
.get(op
)
49 addr_method
= ifaceobj
.get_addr_method()
50 if not addr_method
or (addr_method
and addr_method
!= 'manual'):
51 handler(ifupdownobj
, ifaceobj
)
53 if not ifupdownobj
.ADDONS_ENABLE
: return
55 for mname
in ifupdownobj
.module_ops
.get(op
):
56 m
= ifupdownobj
.modules
.get(mname
)
60 msg
= ('%s: %s : running module %s' %(ifacename
, op
, mname
))
61 if op
== 'query-checkcurr':
62 # Dont check curr if the interface object was
64 if (ifaceobj
.priv_flags
& ifupdownobj
.NOCONFIG
):
66 ifupdownobj
.logger
.debug(msg
)
68 query_ifaceobj
=ifupdownobj
.create_n_save_ifaceobjcurr(ifaceobj
))
70 ifupdownobj
.logger
.debug(msg
)
74 ifupdownobj
.log_error(str(e
))
77 ifaceobj
.set_state_n_status(ifaceState
.from_str(op
),
80 ifaceobj
.set_state_n_status(ifaceState
.from_str(op
),
83 if ifupdownobj
.COMPAT_EXEC_SCRIPTS
:
84 # execute /etc/network/ scripts
85 for mname
in ifupdownobj
.script_ops
.get(op
, []):
86 ifupdownobj
.logger
.debug('%s: %s : running script %s'
87 %(ifacename
, op
, mname
))
89 ifupdownobj
.exec_command(mname
, cmdenv
=cenv
)
91 ifupdownobj
.log_error(str(e
))
94 def run_iface_ops(cls
, ifupdownobj
, ifaceobj
, ops
):
95 """ Runs all operations on an interface """
96 ifacename
= ifaceobj
.get_name()
97 # minor optimization. If operation is 'down', proceed only
98 # if interface exists in the system
99 if 'down' in ops
[0] and not ifupdownobj
.link_exists(ifacename
):
100 ifupdownobj
.logger
.info('%s: does not exist' %ifacename
)
103 if ifupdownobj
.COMPAT_EXEC_SCRIPTS
:
104 # For backward compatibility generate env variables
106 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, ops
[0])
107 map(lambda op
: cls
.run_iface_op(ifupdownobj
, ifaceobj
, op
, cenv
), ops
)
108 posthookfunc
= ifupdownobj
.sched_hooks
.get('posthook')
110 posthookfunc(ifupdownobj
, ifaceobj
)
114 def _check_upperifaces(cls
, ifupdownobj
, ifaceobj
, ops
, parent
):
115 """ Check if conflicting upper ifaces are around and warn if required
117 Returns False if this interface needs to be skipped, else return True """
119 ifacename
= ifaceobj
.get_name()
120 # Deal with upperdevs first
121 ulist
= ifaceobj
.get_upperifaces()
123 tmpulist
= ([u
for u
in ulist
if u
!= parent
] if parent
126 ifupdownobj
.logger
.info('Roopa: tmpulist = %s' %str
(tmpulist
))
129 # XXX: This is expensive. Find a cheaper way to do this
130 # if any of the upperdevs are present,
131 # dont down this interface
133 if ifupdownobj
.link_exists(u
):
134 if not ifupdownobj
.FORCE
and not ifupdownobj
.ALL
:
135 ifupdownobj
.logger
.warn('%s: ' %ifacename
+
136 ' skip interface down,' +
137 ' upperiface %s still around' %u)
139 elif 'up' in ops
[0] and not ifupdownobj
.ALL
:
140 # For 'up', just warn that there is an upperdev which is
143 if not ifupdownobj
.link_exists(u
):
144 ifupdownobj
.logger
.warn('%s: upper iface %s '
145 %(ifacename
, u
) + 'does not exist')
149 def run_iface_graph(cls
, ifupdownobj
, ifacename
, ops
, parent
=None,
150 order
=ifaceSchedulerFlags
.POSTORDER
,
151 followdependents
=True):
152 """ runs interface by traversing all nodes rooted at itself """
154 # Each ifacename can have a list of iface objects
155 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
157 raise Exception('%s: not found' %ifacename
)
159 for ifaceobj
in ifaceobjs
:
160 if not cls
._check
_upperifaces
(ifupdownobj
, ifaceobj
, ops
, parent
):
162 if order
== ifaceSchedulerFlags
.INORDER
:
163 # If inorder, run the iface first and then its dependents
164 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
166 # Run lowerifaces or dependents
167 dlist
= ifaceobj
.get_lowerifaces()
169 ifupdownobj
.logger
.debug('%s:' %ifacename
+
170 ' found dependents: %s' %str
(dlist
))
172 if not followdependents
:
173 # XXX: this is yet another extra step,
174 # but is needed for interfaces that are
175 # implicit dependents. even though we are asked to
176 # not follow dependents, we must follow the ones
177 # that dont have user given config. Because we own them
178 new_dlist
= [d
for d
in dlist
179 if ifupdownobj
.is_iface_noconfig(d
)]
181 cls
.run_iface_list(ifupdownobj
, new_dlist
, ops
,
184 continueonfailure
=False)
186 cls
.run_iface_list(ifupdownobj
, dlist
, ops
,
189 continueonfailure
=False)
191 if (ifupdownobj
.ignore_error(str(e
))):
194 # Dont bring the iface up if children did not come up
195 ifaceobj
.set_state_n_status(ifaceState
.NEW
,
198 if order
== ifaceSchedulerFlags
.POSTORDER
:
199 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, ops
)
202 def run_iface_list(cls
, ifupdownobj
, ifacenames
,
203 ops
, parent
=None, order
=ifaceSchedulerFlags
.POSTORDER
,
204 followdependents
=True, continueonfailure
=True):
205 """ Runs interface list """
207 for ifacename
in ifacenames
:
209 cls
.run_iface_graph(ifupdownobj
, ifacename
, ops
, parent
,
210 order
, followdependents
)
212 if continueonfailure
:
213 if ifupdownobj
.logger
.isEnabledFor(logging
.DEBUG
):
214 traceback
.print_tb(sys
.exc_info()[2])
215 ifupdownobj
.logger
.error('%s : %s' %(ifacename
, str(e
)))
218 if (ifupdownobj
.ignore_error(str(e
))):
221 raise Exception('error running iface %s (%s)'
222 %(ifacename
, str(e
)))
225 def run_iface_dependency_graphs(cls
, ifupdownobj
,
226 dependency_graph
, ops
, indegrees
=None,
227 order
=ifaceSchedulerFlags
.POSTORDER
,
228 followdependents
=True):
229 """ Runs iface dependency graph by visiting all the nodes
233 ifupdownobj : ifupdown object (used for getting and updating iface
235 dependency_graph : dependency graph in adjacency list
236 format (contains more than one dependency graph)
237 ops : list of operations to perform eg ['pre-up', 'up', 'post-up']
239 indegrees : indegree array if present is used to determine roots
240 of the graphs in the dependency_graph
245 indegrees
= OrderedDict()
246 for ifacename
in dependency_graph
.keys():
247 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
249 sorted_ifacenames
= graph
.topological_sort_graphs_all(dependency_graph
,
251 ifupdownobj
.logger
.debug('sorted ifacenames %s : '
252 %str
(sorted_ifacenames
))
254 # Build a list of ifaces that dont have any dependencies
255 for ifacename
in sorted_ifacenames
:
256 if not indegrees
.get(ifacename
):
257 run_queue
.append(ifacename
)
259 ifupdownobj
.logger
.debug('graph roots (interfaces that dont have '
260 'dependents):' + ' %s' %str
(run_queue
))
262 return cls
.run_iface_list(ifupdownobj
, run_queue
, ops
,
263 parent
=None,order
=order
,
264 followdependents
=followdependents
)
267 def run_iface(cls
, ifupdownobj
, ifacename
, ops
):
268 """ Runs operation on an interface """
270 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
272 cls
.run_iface_ops(ifupdownobj
, i
, ops
)
275 def run_iface_list_op(cls
, ifupdownobj
, ifacenames
, op
,
276 sorted_by_dependency
=False):
277 """ Runs interface list through sub operation handler. """
279 ifupdownobj
.logger
.debug('running operation %s on all given interfaces'
281 iface_run_queue
= deque(ifacenames
)
282 for i
in range(0, len(iface_run_queue
)):
283 if op
.endswith('up'):
285 if sorted_by_dependency
:
286 ifacename
= iface_run_queue
.pop()
288 ifacename
= iface_run_queue
.popleft()
290 if sorted_by_dependency
:
291 ifacename
= iface_run_queue
.popleft()
293 ifacename
= iface_run_queue
.pop()
296 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
297 for ifaceobj
in ifaceobjs
:
298 cenv
= ifupdownobj
.generate_running_env(ifaceobj
, op
)
299 cls
.run_iface_op(ifupdownobj
, ifaceobj
, op
, cenv
)
301 ifupdownobj
.log_error(str(e
))
def run_iface_list_ops(cls, ifupdownobj, ifacenames, ops,
                       sorted_by_dependency=False):
    """ Runs interface list through sub operations handler

    Unlike run_iface_list, this method executes a sub operation on the
    entire interface list before proceeding to the next sub-operation.
    ie one operation is run through the entire interface list before the
    next operation in ops is started.

    Arguments:
        ifupdownobj : passed through unchanged to run_iface_list_op
        ifacenames : interface names handed to run_iface_list_op for
                     every operation
        ops : iterable of sub operations, executed in the order given
        sorted_by_dependency : passed through unchanged to
                               run_iface_list_op

    Returns None. Any exception raised by run_iface_list_op propagates
    and stops the remaining operations.
    """

    # Each sub operation has a module list
    # A plain loop is used because the calls are made only for their
    # side effects; no result list is needed.
    for op in ops:
        cls.run_iface_list_op(ifupdownobj, ifacenames, op,
                              sorted_by_dependency)
318 def run_iface_dependency_graphs_sorted(cls
, ifupdownobj
,
322 """ runs interface dependency graph by topologically sorting the interfaces """
324 if indegrees
is None:
325 indegrees
= OrderedDict()
326 for ifacename
in dependency_graphs
.keys():
327 indegrees
[ifacename
] = ifupdownobj
.get_iface_refcnt(ifacename
)
329 ifupdownobj
.logger
.debug('indegree array :')
330 ifupdownobj
.logger
.debug(ifupdownobj
.pp
.pformat(indegrees
))
333 ifupdownobj
.logger
.debug('calling topological sort on the graph ' +
336 sorted_ifacenames
= graph
.topological_sort_graphs_all(
337 dependency_graphs
, indegrees
)
339 sorted_ifacenames
= graph
.topological_sort_graphs(
340 dependency_graphs
, indegrees
)
344 ifupdownobj
.logger
.debug('sorted iface list = %s' %sorted
_ifacenames
)
345 cls
.run_iface_list_ops(ifupdownobj
, sorted_ifacenames
, ops
,
346 sorted_by_dependency
=True)
349 """ Methods to execute interfaces in parallel """
def init_tokens(cls, count):
    """ Creates the token pool

    Builds a BoundedSemaphore initialised to count and stores it on
    cls.token_pool, replacing whatever was stored there before.
    """
    pool = BoundedSemaphore(count)
    cls.token_pool = pool
def accquire_token(cls, logprefix=''):
    """ Takes one token from the token pool

    Calls acquire() with no arguments on cls.token_pool.
    logprefix is accepted but not used.
    """
    pool = cls.token_pool
    pool.acquire()
def release_token(cls, logprefix=''):
    """ Returns one token to the token pool

    Calls release() on cls.token_pool.
    logprefix is accepted but not used.
    """
    pool = cls.token_pool
    pool.release()
363 def run_iface_parallel(cls
, ifupdownobj
, ifacename
, op
):
364 """ Configures interface in parallel.
366 Executes all its direct dependents in parallel
370 ifupdownobj
.logger
.debug('%s:' %ifacename
+ ' %s' %op
)
371 cls
.accquire_token(iface
)
373 # Each iface can have a list of objects
374 ifaceobjs
= ifupdownobj
.get_ifaceobjs(ifacename
)
375 if ifaceobjs
is None:
376 ifupdownobj
.logger
.warning('%s: ' %ifacename
+ 'not found')
377 cls
.release_token(ifacename
)
380 for ifaceobj
in ifaceobjs
:
382 dlist
= ifaceobj
.get_lowerifaces()
384 ifupdownobj
.logger
.debug('%s:' %ifacename
+
385 ' found dependents: %s' %str
(dlist
))
387 cls
.release_token(ifacename
)
388 cls
.run_iface_list_parallel(ifacename
, ifupdownobj
,
390 cls
.accquire_token(ifacename
)
392 if ifupdownobj
.ignore_error(str(e
)):
395 # Dont bring the iface up if children did not come up
396 ifupdownobj
.logger
.debug('%s:' %ifacename
+
397 ' there was an error bringing %s' %op
+
398 ' dependents (%s)', str(e
))
399 ifupdownobj
.set_iface_state(ifaceobj
,
400 ifaceState
.from_str(ops
[0]),
404 # Run all sub operations sequentially
406 ifupdownobj
.logger
.debug('%s:' %ifacename
+
407 ' running sub-operations')
408 cls
.run_iface_ops(ifupdownobj
, ifaceobj
, op
)
410 ifupdownobj
.logger
.error('%s:' %ifacename
+
411 ' error running sub operations (%s)' %str
(e
))
413 cls
.release_token(ifacename
)
416 def run_iface_list_parallel(cls
, parent
, ifupdownobj
, ifacenames
, op
):
417 """ Runs interface list in parallel """
419 running_threads
= OrderedDict()
422 for ifacename
in ifacenames
:
424 cls
.accquire_token(parent
)
425 running_threads
[ifacename
] = Thread(None,
426 cls
.run_iface_parallel
, ifacename
,
427 args
=(ifupdownobj
, ifacename
, op
))
428 running_threads
[ifacename
].start()
429 cls
.release_token(parent
)
431 cls
.release_token(parent
)
432 if ifupdownobj
.ignore_error(str(e
)):
435 raise Exception('error starting thread for iface %s'
439 ifupdownobj
.logger
.debug('%s ' %parent
+
440 'waiting for all the threads ...')
441 for ifacename
, t
in running_threads
.items():
443 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
449 def run_iface_graphs_parallel(cls
, parent
, ifupdownobj
, ifacenames
, op
):
450 """ Runs iface graphs in parallel """
452 running_threads
= OrderedDict()
455 for ifacename
in ifacenames
:
457 cls
.accquire_graph_token(parent
)
458 running_threads
[ifacename
] = Thread(None,
459 cls
.run_iface_parallel
, ifacename
,
460 args
=(ifupdownobj
, ifacename
, op
))
461 running_threads
[ifacename
].start()
462 cls
.release_graph_token(parent
)
464 cls
.release_graph_token(parent
)
465 if ifupdownobj
.ignore_error(str(e
)):
468 raise Exception('error starting thread for iface %s'
471 ifupdownobj
.logger
.info('%s ' %parent
+
472 'waiting for all the threads ...')
473 for ifacename
, t
in running_threads
.items():
475 # Check status of thread
476 # XXX: Check all objs
477 if ifupdownobj
.get_iface_status(ifacename
) != ifaceStatus
.SUCCESS
:
482 def run_iface_dependency_graph_parallel(cls
, ifupdownobj
, dependency_graph
,
484 """ Runs iface dependency graph in parallel.
487 ifupdownobj -- ifupdown object (used for getting and updating iface
489 dependency_graph -- dependency graph with
490 operation -- 'up' or 'down' or 'query'
494 ifupdownobj
.logger
.debug('running dependency graph in parallel ..')
496 # Build a list of ifaces that dont have any dependencies
497 for ifacename
in dependency_graph
.keys():
498 if ifupdownobj
.get_iface_refcnt(ifacename
) == 0:
499 run_queue
.append(ifacename
)
501 ifupdownobj
.logger
.debug('graph roots (interfaces that dont'
502 ' have dependents):' + ' %s' %str
(run_queue
))
503 cls
.init_tokens(ifupdownobj
.get_njobs())
504 return cls
.run_iface_list_parallel('main', ifupdownobj
, run_queue
,
508 # Run one graph at a time
509 #for iface in run_queue:
510 # self.run_iface_list_parallel('main', ifupdownobj, [iface],