# git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
# Disable exec of legacy /etc/network/<scripts> for now because of the
# [mirror_ifupdown2.git] / pkg / scheduler.py
1 #!/usr/bin/python
2 #
3 # Copyright 2013. Cumulus Networks, Inc.
4 # Author: Roopa Prabhu, roopa@cumulusnetworks.com
5 #
6 # ifaceScheduler --
7 # interface scheduler
8 #
9
10 from statemanager import *
11 from iface import *
12 from graph import *
13 from collections import deque
14 from collections import OrderedDict
15 import logging
16 import traceback
17 import sys
18 from graph import *
19 from collections import deque
20 from threading import *
21 from ifupdownbase import *
22
class ifaceSchedulerFlags():
    """ Traversal orders accepted by the ifaceScheduler graph runners. """

    # run an interface first and then its dependents
    INORDER = 1
    # run the dependents first and then the interface itself
    POSTORDER = 2


class ifaceScheduler():
    """ scheduler functions to schedule configuration of interfaces.

    supports scheduling of interfaces serially in plain interface list
    or dependency graph format.
    """

    # Semaphore handed out by accquire_token()/release_token();
    # created by init_tokens()
    token_pool = None

    @classmethod
    def run_iface_op(cls, ifupdownobj, ifaceobj, op, cenv):
        """ Runs sub operation on an interface

        Parameters:
        -----------
        ifupdownobj : ifupdown object providing handlers, modules, scripts
                      and the logger
        ifaceobj : interface object the sub operation is run on
        op : name of the sub operation (eg 'pre-up')
        cenv : environment passed to the legacy scripts (may be None)
        """
        ifacename = ifaceobj.get_name()
        target_state = ifaceState.from_str(op)

        if (ifaceobj.get_state() >= target_state and
                ifaceobj.get_status() == ifaceStatus.SUCCESS):
            ifupdownobj.logger.debug('%s: already in state %s'
                                     %(ifacename, op))
            return

        # first run ifupdownobj handlers; they are skipped for interfaces
        # whose address method is 'manual'
        handler = ifupdownobj.ops_handlers.get(op)
        if handler and ifaceobj.get_addr_method() != 'manual':
            handler(ifupdownobj, ifaceobj)

        # An op without a registered module list is treated as having no
        # modules (same convention as the script list below)
        for mname in ifupdownobj.module_ops.get(op) or []:
            m = ifupdownobj.modules.get(mname)
            status = ifaceStatus.SUCCESS
            try:
                if hasattr(m, 'run'):
                    ifupdownobj.logger.debug('%s: %s : running module %s'
                                             %(ifacename, op, mname))
                    if op == 'query-checkcurr':
                        # Dont check curr if the interface object was
                        # auto generated
                        if (ifaceobj.priv_flags & ifupdownobj.NOCONFIG):
                            continue
                        m.run(ifaceobj, op,
                              query_ifaceobj=ifupdownobj.
                              create_n_save_ifaceobjcurr(ifaceobj))
                    else:
                        m.run(ifaceobj, op)
            except Exception as e:
                # record the failure before logging, log_error may raise
                status = ifaceStatus.ERROR
                ifupdownobj.log_error(str(e))
            finally:
                # also reached through the 'continue' above
                ifupdownobj.set_iface_state(ifaceobj, target_state, status)

        if ifupdownobj.COMPAT_EXEC_SCRIPTS:
            # execute /etc/network/ scripts
            for mname in ifupdownobj.script_ops.get(op, []):
                ifupdownobj.logger.debug('%s: %s : running script %s'
                                         %(ifacename, op, mname))
                try:
                    ifupdownobj.exec_command(mname, cmdenv=cenv)
                except Exception as e:
                    ifupdownobj.log_error(str(e))

    @classmethod
    def run_iface_ops(cls, ifupdownobj, ifaceobj, ops):
        """ Runs all sub operations on an interface

        The sub operations in ops are run one after the other, in the
        given order, through run_iface_op.
        """
        cenv = None

        if ifupdownobj.COMPAT_EXEC_SCRIPTS:
            # For backward compatibility generate env variables
            # for attributes
            cenv = ifupdownobj.generate_running_env(ifaceobj, ops[0])

        # Each sub operation has a module list. An explicit loop is used
        # so that the operations run even where map() is lazy.
        for op in ops:
            cls.run_iface_op(ifupdownobj, ifaceobj, op, cenv)

    @classmethod
    def _upperifaces_allow_run(cls, ifupdownobj, ifacename, ifaceobj, ops,
                               parent):
        """ Checks the upper interfaces of ifaceobj before it is run

        Returns False if the interface must be skipped (a 'down' was
        requested while an upper interface other than parent still
        exists), True otherwise.
        """
        uppers = ifaceobj.get_upperifaces()
        if uppers and parent:
            uppers = [u for u in uppers if u != parent]
        if not uppers:
            return True

        if 'down' in ops[0]:
            # XXX: This is expensive. Find a cheaper way to do this
            # if any of the upperdevs are present,
            # dont down this interface
            for u in uppers:
                if ifupdownobj.link_exists(u):
                    if not ifupdownobj.ALL:
                        ifupdownobj.logger.warn('%s: ' %ifacename +
                                ' skip interface down,' +
                                ' upperiface %s still around' %u)
                    # NOTE(review): indentation was rebuilt from a
                    # flattened listing; the skip is assumed to apply
                    # whether or not the warning is logged -- confirm
                    # against upstream.
                    return False
        elif 'up' in ops[0] and not ifupdownobj.ALL:
            # For 'up', just warn that there is an upperdev which is
            # probably not up
            for u in uppers:
                if not ifupdownobj.link_exists(u):
                    ifupdownobj.logger.warn('%s: upper iface %s '
                            %(ifacename, u) + 'does not exist')
        return True

    @classmethod
    def run_iface_graph(cls, ifupdownobj, ifacename, ops, parent=None,
                        order=ifaceSchedulerFlags.POSTORDER,
                        followdependents=True):
        """ runs interface by traversing all nodes rooted at itself

        Parameters:
        -----------
        ifacename : name of the interface at the root of the traversal
        ops : list of sub operations to run on every visited interface
        parent : name of the interface this one was reached from
        order : ifaceSchedulerFlags.INORDER or POSTORDER
        followdependents : when False only dependents without user
                           given config are followed

        Raises an Exception if ifacename is not known to ifupdownobj or
        if a dependent failed with an error that is not ignorable.
        """

        # minor optimization. If operation is 'down', proceed only
        # if interface exists in the system
        if 'down' in ops[0] and not ifupdownobj.link_exists(ifacename):
            ifupdownobj.logger.info('%s: does not exist' %ifacename)
            return

        # Each ifacename can have a list of iface objects
        ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
        if ifaceobjs is None:
            raise Exception('%s: not found' %ifacename)

        for ifaceobj in ifaceobjs:
            # Deal with upperdevs first
            if not cls._upperifaces_allow_run(ifupdownobj, ifacename,
                                              ifaceobj, ops, parent):
                return

            if order == ifaceSchedulerFlags.INORDER:
                # If inorder, run the iface first and then its dependents
                cls.run_iface_ops(ifupdownobj, ifaceobj, ops)

            # Run lowerifaces or dependents
            dlist = ifaceobj.get_lowerifaces()
            if dlist:
                ifupdownobj.logger.info('%s:' %ifacename +
                        ' found dependents: %s' %str(dlist))
                try:
                    if followdependents:
                        todo = dlist
                    else:
                        # XXX: this is yet another extra step,
                        # but is needed for interfaces that are
                        # implicit dependents. even though we are asked to
                        # not follow dependents, we must follow the ones
                        # that dont have user given config. Because we
                        # own them
                        todo = [d for d in dlist
                                if ifupdownobj.is_iface_noconfig(d)]
                    if todo:
                        cls.run_iface_list(ifupdownobj, todo, ops,
                                           ifacename, order,
                                           followdependents,
                                           continueonfailure=False)
                except Exception as e:
                    if not ifupdownobj.ignore_error(str(e)):
                        # Dont bring the iface up if children did not
                        # come up
                        ifaceobj.set_state(ifaceState.NEW)
                        ifaceobj.set_status(ifaceStatus.ERROR)
                        raise

            if order == ifaceSchedulerFlags.POSTORDER:
                cls.run_iface_ops(ifupdownobj, ifaceobj, ops)

    @classmethod
    def run_iface_list(cls, ifupdownobj, ifacenames,
                       ops, parent=None, order=ifaceSchedulerFlags.POSTORDER,
                       followdependents=True, continueonfailure=True):
        """ Runs interface list

        Every name in ifacenames is run through run_iface_graph. With
        continueonfailure a failing interface is logged and the next one
        is run; without it the failure is re-raised as an Exception
        unless ifupdownobj.ignore_error() accepts it.
        """

        for ifacename in ifacenames:
            try:
                cls.run_iface_graph(ifupdownobj, ifacename, ops, parent,
                                    order, followdependents)
            except Exception as e:
                if continueonfailure:
                    if ifupdownobj.logger.isEnabledFor(logging.DEBUG):
                        traceback.print_tb(sys.exc_info()[2])
                    ifupdownobj.logger.error('%s : %s' %(ifacename, str(e)))
                elif not ifupdownobj.ignore_error(str(e)):
                    raise Exception('error running iface %s (%s)'
                                    %(ifacename, str(e)))

    @classmethod
    def run_iface_dependency_graphs(cls, ifupdownobj,
                                    dependency_graph, ops, indegrees=None,
                                    order=ifaceSchedulerFlags.POSTORDER,
                                    followdependents=True):
        """ Runs iface dependeny graph by visiting all the nodes

        Parameters:
        -----------
        ifupdownobj : ifupdown object (used for getting and updating iface
                                        object state)
        dependency_graph : dependency graph in adjacency list
                            format (contains more than one dependency graph)
        ops : list of operations to perform eg ['pre-up', 'up', 'post-up']

        indegrees : indegree array if present is used to determine roots
                    of the graphs in the dependency_graph
        """
        if indegrees is None:
            indegrees = OrderedDict()
            for ifacename in dependency_graph:
                indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)

        # the sort gets a copy, indegrees is still needed below
        sorted_ifacenames = graph.topological_sort_graphs_all(
                                    dependency_graph, dict(indegrees))
        ifupdownobj.logger.debug('sorted ifacenames %s : '
                                 %str(sorted_ifacenames))

        # Build a list of ifaces that dont have any dependencies
        run_queue = [ifacename for ifacename in sorted_ifacenames
                     if not indegrees.get(ifacename)]

        ifupdownobj.logger.info('graph roots (interfaces that dont have '
                                'dependents):' + ' %s' %str(run_queue))

        return cls.run_iface_list(ifupdownobj, run_queue, ops,
                                  parent=None, order=order,
                                  followdependents=followdependents)

    @classmethod
    def run_iface(cls, ifupdownobj, ifacename, ops):
        """ Runs operation on an interface

        Raises an Exception if ifacename is not known to ifupdownobj
        (same error as run_iface_graph).
        """

        ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
        if ifaceobjs is None:
            raise Exception('%s: not found' %ifacename)
        for ifaceobj in ifaceobjs:
            cls.run_iface_ops(ifupdownobj, ifaceobj, ops)

    @classmethod
    def run_iface_list_op(cls, ifupdownobj, ifacenames, op,
                          sorted_by_dependency=False):
        """ Runs interface list through sub operation handler.

        For ops ending in 'up' a plain list is walked front to back and
        a dependency sorted list back to front; for all other ops it is
        the other way around. Errors are reported through
        ifupdownobj.log_error.
        """

        ifupdownobj.logger.debug('running operation %s on all given interfaces'
                                 %op)
        run_order = list(ifacenames)
        if op.endswith('up') == bool(sorted_by_dependency):
            run_order.reverse()

        for ifacename in run_order:
            try:
                ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
                for ifaceobj in ifaceobjs:
                    cenv = ifupdownobj.generate_running_env(ifaceobj, op)
                    cls.run_iface_op(ifupdownobj, ifaceobj, op, cenv)
            except Exception as e:
                ifupdownobj.log_error(str(e))

    @classmethod
    def run_iface_list_ops(cls, ifupdownobj, ifacenames, ops,
                           sorted_by_dependency=False):
        """ Runs interface list through sub operations handler

        Unlike run_iface_list, this method executes a sub operation on the
        entire interface list before proceeding to the next sub-operation.
        ie operation 'pre-up' is run through the entire interface list before
        'up'
        """
        # Each sub operation has a module list
        for op in ops:
            cls.run_iface_list_op(ifupdownobj, ifacenames, op,
                                  sorted_by_dependency)

    @classmethod
    def run_iface_dependency_graphs_sorted(cls, ifupdownobj,
                                           dependency_graphs,
                                           ops, indegrees=None,
                                           graphsortall=False):
        """ runs interface dependency graph by topologically sorting the
        interfaces

        indegrees is computed from ifupdownobj.get_iface_refcnt() when
        not given. graphsortall selects topological_sort_graphs_all over
        topological_sort_graphs. Errors raised by the sort propagate to
        the caller.
        """

        if indegrees is None:
            indegrees = OrderedDict()
            for ifacename in dependency_graphs:
                indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)

        ifupdownobj.logger.debug('indegree array :')
        ifupdownobj.logger.debug(ifupdownobj.pp.pformat(indegrees))

        ifupdownobj.logger.debug('calling topological sort on the graph ' +
                                 '...')
        if graphsortall:
            sorted_ifacenames = graph.topological_sort_graphs_all(
                                        dependency_graphs, indegrees)
        else:
            sorted_ifacenames = graph.topological_sort_graphs(
                                        dependency_graphs, indegrees)

        ifupdownobj.logger.debug('sorted iface list = %s' %sorted_ifacenames)
        cls.run_iface_list_ops(ifupdownobj, sorted_ifacenames, ops,
                               sorted_by_dependency=True)

    # Methods to execute interfaces in parallel

    @classmethod
    def init_tokens(cls, count):
        """ Creates the token pool with count tokens """
        cls.token_pool = BoundedSemaphore(count)

    @classmethod
    def accquire_token(cls, logprefix=''):
        """ Takes a token from the pool (logprefix is unused) """
        cls.token_pool.acquire()

    @classmethod
    def release_token(cls, logprefix=''):
        """ Gives a token back to the pool (logprefix is unused) """
        cls.token_pool.release()

    @classmethod
    def run_iface_parallel(cls, ifupdownobj, ifacename, op):
        """ Configures interface in parallel.

        Executes all its direct dependents in parallel

        op is handed to run_iface_ops, ie it is expected to be the list
        of sub operations; a single operation name is accepted as well
        and treated as a one element list.
        NOTE(review): callers document op as 'up'/'down'/'query' --
        confirm which form they really pass.

        Returns -1 if the interface is unknown or its dependents failed,
        None otherwise.
        """
        ops = [op] if isinstance(op, str) else op

        ifupdownobj.logger.debug('%s: %s', ifacename, op)
        cls.accquire_token(ifacename)
        try:
            # Each iface can have a list of objects
            ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
            if ifaceobjs is None:
                ifupdownobj.logger.warning('%s: ' %ifacename + 'not found')
                return -1

            for ifaceobj in ifaceobjs:
                # Run dependents
                dlist = ifaceobj.get_lowerifaces()
                if dlist:
                    ifupdownobj.logger.debug('%s:' %ifacename +
                            ' found dependents: %s' %str(dlist))
                    failure = None
                    # hand the token to the dependents while waiting for
                    # them and always take it back, so that the release
                    # at the end stays balanced
                    cls.release_token(ifacename)
                    try:
                        cls.run_iface_list_parallel(ifacename, ifupdownobj,
                                                    dlist, op)
                    except Exception as e:
                        if not ifupdownobj.ignore_error(str(e)):
                            failure = e
                    finally:
                        cls.accquire_token(ifacename)

                    if failure is not None:
                        # Dont bring the iface up if children did not
                        # come up
                        ifupdownobj.logger.debug(
                            '%s: there was an error bringing %s'
                            ' dependents (%s)',
                            ifacename, op, str(failure))
                        ifupdownobj.set_iface_state(ifaceobj,
                                ifaceState.from_str(ops[0]),
                                ifaceStatus.ERROR)
                        return -1

                # Run all sub operations sequentially
                try:
                    ifupdownobj.logger.debug('%s:' %ifacename +
                                             ' running sub-operations')
                    cls.run_iface_ops(ifupdownobj, ifaceobj, ops)
                except Exception as e:
                    ifupdownobj.logger.error('%s:' %ifacename +
                            ' error running sub operations (%s)' %str(e))
        finally:
            cls.release_token(ifacename)

    @classmethod
    def _run_ifaces_in_threads(cls, parent, ifupdownobj, ifacenames, op, log):
        """ Starts one run_iface_parallel thread per interface and waits

        log is the logger method used for the 'waiting' message.
        Returns the number of interfaces that could not be started or
        whose status is not SUCCESS after their thread finished.
        """
        running_threads = OrderedDict()
        err = 0

        for ifacename in ifacenames:
            cls.accquire_token(parent)
            try:
                t = Thread(None, cls.run_iface_parallel, ifacename,
                           args=(ifupdownobj, ifacename, op))
                t.start()
                # only started threads are registered, joining a thread
                # that never started would raise
                running_threads[ifacename] = t
            except Exception as e:
                if not ifupdownobj.ignore_error(str(e)):
                    raise Exception('error starting thread for iface %s'
                                    %ifacename)
                err += 1
            finally:
                cls.release_token(parent)

        log('%s ' %parent + 'waiting for all the threads ...')
        for ifacename, t in running_threads.items():
            t.join()
            # Check status of thread
            # XXX: Check all objs
            if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
                err += 1

        return err

    @classmethod
    def run_iface_list_parallel(cls, parent, ifupdownobj, ifacenames, op):
        """ Runs interface list in parallel

        Returns the number of interfaces that did not end up in status
        SUCCESS. Raises an Exception if a thread cannot be started and
        the error is not ignorable.
        """
        return cls._run_ifaces_in_threads(parent, ifupdownobj, ifacenames,
                                          op, ifupdownobj.logger.debug)

    @classmethod
    def run_iface_graphs_parallel(cls, parent, ifupdownobj, ifacenames, op):
        """ Runs iface graphs in parallel

        Same as run_iface_list_parallel, except that the 'waiting'
        message is logged at info level. Uses the common token pool.
        """
        return cls._run_ifaces_in_threads(parent, ifupdownobj, ifacenames,
                                          op, ifupdownobj.logger.info)

    @classmethod
    def run_iface_dependency_graph_parallel(cls, ifupdownobj, dependency_graph,
                                            operation):
        """ Runs iface dependeny graph in parallel.

        arguments:
        ifupdownobj -- ifupdown object (used for getting and updating iface
                                        object state)
        dependency_graph -- dependency graph with
        operation -- 'up' or 'down' or 'query'

        Returns the error count reported by run_iface_list_parallel.
        """

        ifupdownobj.logger.debug('running dependency graph in parallel ..')
        # Build a list of ifaces that dont have any dependencies
        run_queue = [ifacename for ifacename in dependency_graph
                     if ifupdownobj.get_iface_refcnt(ifacename) == 0]

        ifupdownobj.logger.debug('graph roots (interfaces that dont'
                                 ' have dependents):' + ' %s' %str(run_queue))
        cls.init_tokens(ifupdownobj.get_njobs())
        # Alternative that was considered: run one graph at a time by
        # calling run_iface_list_parallel('main', ifupdownobj, [iface],
        # operation) for every iface in run_queue.
        return cls.run_iface_list_parallel('main', ifupdownobj, run_queue,
                                           operation)
500