]> git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
cleanup (mostly cosmetic)
[mirror_ifupdown2.git] / pkg / scheduler.py
1 #!/usr/bin/python
2 #
3 # Copyright 2013. Cumulus Networks, Inc.
4 # Author: Roopa Prabhu, roopa@cumulusnetworks.com
5 #
6 # ifaceScheduler --
7 # interface scheduler
8 #
9
10 from statemanager import *
11 from iface import *
12 from graph import *
13 from collections import deque
14 from collections import OrderedDict
15 import logging
16 import traceback
17 import sys
18 from graph import *
19 from collections import deque
20 from threading import *
21 from ifupdownbase import *
22
23 class ifaceSchedulerFlags():
24 INORDER = 0x1
25 POSTORDER = 0x2
26
27 class ifaceScheduler():
28 """ scheduler functions to schedule configuration of interfaces.
29
30 supports scheduling of interfaces serially in plain interface list
31 or dependency graph format.
32 """
33
34 _STATE_CHECK = True
35
36 token_pool = None
37
38 @classmethod
39 def run_iface_op(cls, ifupdownobj, ifaceobj, op, cenv):
40 """ Runs sub operation on an interface """
41 ifacename = ifaceobj.name
42
43 if (cls._STATE_CHECK and
44 (ifaceobj.state >= ifaceState.from_str(op)) and
45 (ifaceobj.status == ifaceStatus.SUCCESS)):
46 ifupdownobj.logger.debug('%s: already in state %s' %(ifacename, op))
47 return
48
49 # first run ifupdownobj handlers
50 handler = ifupdownobj.ops_handlers.get(op)
51 if handler:
52 if not ifaceobj.addr_method or (ifaceobj.addr_method and
53 ifaceobj.addr_method != 'manual'):
54 handler(ifupdownobj, ifaceobj)
55
56 if not ifupdownobj.ADDONS_ENABLE: return
57
58 for mname in ifupdownobj.module_ops.get(op):
59 m = ifupdownobj.modules.get(mname)
60 err = 0
61 try:
62 if hasattr(m, 'run'):
63 msg = ('%s: %s : running module %s' %(ifacename, op, mname))
64 if op == 'query-checkcurr':
65 # Dont check curr if the interface object was
66 # auto generated
67 if (ifaceobj.priv_flags & ifupdownobj.NOCONFIG):
68 continue
69 ifupdownobj.logger.debug(msg)
70 m.run(ifaceobj, op,
71 query_ifaceobj=ifupdownobj.create_n_save_ifaceobjcurr(ifaceobj))
72 else:
73 ifupdownobj.logger.debug(msg)
74 m.run(ifaceobj, op)
75 except Exception, e:
76 err = 1
77 ifupdownobj.log_error(str(e))
78 finally:
79 if err:
80 ifaceobj.set_state_n_status(ifaceState.from_str(op),
81 ifaceStatus.ERROR)
82 else:
83 ifaceobj.set_state_n_status(ifaceState.from_str(op),
84 ifaceStatus.SUCCESS)
85
86 if ifupdownobj.COMPAT_EXEC_SCRIPTS:
87 # execute /etc/network/ scripts
88 for mname in ifupdownobj.script_ops.get(op, []):
89 ifupdownobj.logger.debug('%s: %s : running script %s'
90 %(ifacename, op, mname))
91 try:
92 ifupdownobj.exec_command(mname, cmdenv=cenv)
93 except Exception, e:
94 ifupdownobj.log_error(str(e))
95
96 @classmethod
97 def run_iface_ops(cls, ifupdownobj, ifaceobj, ops):
98 """ Runs all operations on an interface """
99 ifacename = ifaceobj.name
100 # minor optimization. If operation is 'down', proceed only
101 # if interface exists in the system
102 if 'down' in ops[0] and not ifupdownobj.link_exists(ifacename):
103 ifupdownobj.logger.info('%s: does not exist' %ifacename)
104 return
105 cenv=None
106 if ifupdownobj.COMPAT_EXEC_SCRIPTS:
107 # For backward compatibility generate env variables
108 # for attributes
109 cenv = ifupdownobj.generate_running_env(ifaceobj, ops[0])
110 map(lambda op: cls.run_iface_op(ifupdownobj, ifaceobj, op, cenv), ops)
111 posthookfunc = ifupdownobj.sched_hooks.get('posthook')
112 if posthookfunc:
113 posthookfunc(ifupdownobj, ifaceobj)
114
115
116 @classmethod
117 def _check_upperifaces(cls, ifupdownobj, ifaceobj, ops, parent, followdependents=False):
118 """ Check if conflicting upper ifaces are around and warn if required
119
120 Returns False if this interface needs to be skipped, else return True """
121
122 if 'up' in ops[0] and followdependents:
123 return True
124
125 ifacename = ifaceobj.name
126 # Deal with upperdevs first
127 ulist = ifaceobj.upperifaces
128 if ulist:
129 tmpulist = ([u for u in ulist if u != parent] if parent
130 else ulist)
131 if not tmpulist:
132 return True
133 if 'down' in ops[0]:
134 # XXX: This is expensive. Find a cheaper way to do this
135 # if any of the upperdevs are present,
136 # dont down this interface
137 for u in tmpulist:
138 if ifupdownobj.link_exists(u):
139 if not ifupdownobj.FORCE and not ifupdownobj.ALL:
140 ifupdownobj.logger.warn('%s: ' %ifacename +
141 ' skip interface down,' +
142 ' upperiface %s still around' %u)
143 return False
144 elif 'up' in ops[0] and not ifupdownobj.ALL:
145 # For 'up', just warn that there is an upperdev which is
146 # probably not up
147 for u in tmpulist:
148 if not ifupdownobj.link_exists(u):
149 ifupdownobj.logger.warn('%s: upper iface %s '
150 %(ifacename, u) + 'does not exist')
151 return True
152
153 @classmethod
154 def run_iface_graph(cls, ifupdownobj, ifacename, ops, parent=None,
155 order=ifaceSchedulerFlags.POSTORDER,
156 followdependents=True):
157 """ runs interface by traversing all nodes rooted at itself """
158
159 # Each ifacename can have a list of iface objects
160 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
161 if not ifaceobjs:
162 raise Exception('%s: not found' %ifacename)
163
164 for ifaceobj in ifaceobjs:
165 if not cls._check_upperifaces(ifupdownobj, ifaceobj, ops, parent,
166 followdependents):
167 return
168 if order == ifaceSchedulerFlags.INORDER:
169 # If inorder, run the iface first and then its dependents
170 cls.run_iface_ops(ifupdownobj, ifaceobj, ops)
171
172 # Run lowerifaces or dependents
173 dlist = ifaceobj.lowerifaces
174 if dlist:
175 ifupdownobj.logger.debug('%s:' %ifacename +
176 ' found dependents: %s' %str(dlist))
177 try:
178 if not followdependents:
179 # XXX: this is yet another extra step,
180 # but is needed for interfaces that are
181 # implicit dependents. even though we are asked to
182 # not follow dependents, we must follow the ones
183 # that dont have user given config. Because we own them
184 new_dlist = [d for d in dlist
185 if ifupdownobj.is_iface_noconfig(d)]
186 if new_dlist:
187 cls.run_iface_list(ifupdownobj, new_dlist, ops,
188 ifacename, order,
189 followdependents,
190 continueonfailure=False)
191 else:
192 cls.run_iface_list(ifupdownobj, dlist, ops,
193 ifacename, order,
194 followdependents,
195 continueonfailure=False)
196 except Exception, e:
197 if (ifupdownobj.ignore_error(str(e))):
198 pass
199 else:
200 # Dont bring the iface up if children did not come up
201 ifaceobj.set_state_n_status(ifaceState.NEW,
202 ifaceStatus.ERROR)
203 raise
204 if order == ifaceSchedulerFlags.POSTORDER:
205 cls.run_iface_ops(ifupdownobj, ifaceobj, ops)
206
207 @classmethod
208 def run_iface_list(cls, ifupdownobj, ifacenames,
209 ops, parent=None, order=ifaceSchedulerFlags.POSTORDER,
210 followdependents=True, continueonfailure=True):
211 """ Runs interface list """
212
213 for ifacename in ifacenames:
214 try:
215 cls.run_iface_graph(ifupdownobj, ifacename, ops, parent,
216 order, followdependents)
217 except Exception, e:
218 if continueonfailure:
219 if ifupdownobj.logger.isEnabledFor(logging.DEBUG):
220 traceback.print_tb(sys.exc_info()[2])
221 ifupdownobj.logger.error('%s : %s' %(ifacename, str(e)))
222 pass
223 else:
224 if (ifupdownobj.ignore_error(str(e))):
225 pass
226 else:
227 raise Exception('error running iface %s (%s)'
228 %(ifacename, str(e)))
229
230 @classmethod
231 def run_iface_graph_upper(cls, ifupdownobj, ifacename, ops, parent=None,
232 followdependents=True, skip_root=False):
233 """ runs interface by traversing all nodes rooted at itself """
234
235 # Each ifacename can have a list of iface objects
236 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
237 if not ifaceobjs:
238 raise Exception('%s: not found' %ifacename)
239
240 for ifaceobj in ifaceobjs:
241 if not skip_root:
242 # run the iface first and then its upperifaces
243 cls.run_iface_ops(ifupdownobj, ifaceobj, ops)
244
245 # Run upperifaces
246 ulist = ifaceobj.upperifaces
247 if ulist:
248 ifupdownobj.logger.debug('%s:' %ifacename +
249 ' found upperifaces: %s' %str(ulist))
250 try:
251 cls.run_iface_list_upper(ifupdownobj, ulist, ops,
252 ifacename,
253 followdependents,
254 continueonfailure=True)
255 except Exception, e:
256 if (ifupdownobj.ignore_error(str(e))):
257 pass
258 else:
259 raise
260
261 @classmethod
262 def run_iface_list_upper(cls, ifupdownobj, ifacenames,
263 ops, parent=None, followdependents=True,
264 continueonfailure=True, skip_root=False):
265 """ Runs interface list """
266
267 for ifacename in ifacenames:
268 try:
269 cls.run_iface_graph_upper(ifupdownobj, ifacename, ops, parent,
270 followdependents, skip_root)
271 except Exception, e:
272 if continueonfailure:
273 if ifupdownobj.logger.isEnabledFor(logging.DEBUG):
274 traceback.print_tb(sys.exc_info()[2])
275 ifupdownobj.logger.error('%s : %s' %(ifacename, str(e)))
276 pass
277 else:
278 if (ifupdownobj.ignore_error(str(e))):
279 pass
280 else:
281 raise Exception('error running iface %s (%s)'
282 %(ifacename, str(e)))
283
284 @classmethod
285 def sched_ifaces(cls, ifupdownobj, ifacenames, ops,
286 dependency_graph=None, indegrees=None,
287 order=ifaceSchedulerFlags.POSTORDER,
288 followdependents=True):
289 """ Runs iface dependeny graph by visiting all the nodes
290
291 Parameters:
292 -----------
293 ifupdownobj : ifupdown object (used for getting and updating iface
294 object state)
295 dependency_graph : dependency graph in adjacency list
296 format (contains more than one dependency graph)
297 ops : list of operations to perform eg ['pre-up', 'up', 'post-up']
298
299 indegrees : indegree array if present is used to determine roots
300 of the graphs in the dependency_graph
301 """
302
303 if not ifupdownobj.ALL or not followdependents or len(ifacenames) == 1:
304 cls.run_iface_list(ifupdownobj, ifacenames, ops,
305 parent=None,order=order,
306 followdependents=followdependents)
307 if not ifupdownobj.ALL and followdependents and 'up' in ops[0]:
308 # If user had given a set of interfaces to bring up
309 # try and execute 'up' on the upperifaces
310 ifupdownobj.logger.info('running upperifaces if available')
311 cls._STATE_CHECK = False
312 cls.run_iface_list_upper(ifupdownobj, ifacenames, ops,
313 skip_root=True)
314 cls._STATE_CHECK = True
315 return
316 run_queue = []
317
318 # Get a sorted list of all interfaces
319 if not indegrees:
320 indegrees = OrderedDict()
321 for ifacename in dependency_graph.keys():
322 indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)
323 sorted_ifacenames = graph.topological_sort_graphs_all(dependency_graph,
324 dict(indegrees))
325 ifupdownobj.logger.debug('sorted ifacenames %s : '
326 %str(sorted_ifacenames))
327
328 # From the sorted list, pick interfaces that user asked
329 # and those that dont have any dependents first
330 [run_queue.append(ifacename)
331 for ifacename in sorted_ifacenames
332 if ifacename in ifacenames and
333 not indegrees.get(ifacename)]
334
335 ifupdownobj.logger.debug('graph roots (interfaces that dont have '
336 'dependents):' + ' %s' %str(run_queue))
337 cls.run_iface_list(ifupdownobj, run_queue, ops,
338 parent=None,order=order,
339 followdependents=followdependents)
340
341 @classmethod
342 def run_iface(cls, ifupdownobj, ifacename, ops):
343 """ Runs operation on an interface """
344
345 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
346 for i in ifaceobjs:
347 cls.run_iface_ops(ifupdownobj, i, ops)
348
349 @classmethod
350 def run_iface_list_op(cls, ifupdownobj, ifacenames, op,
351 sorted_by_dependency=False):
352 """ Runs interface list through sub operation handler. """
353
354 ifupdownobj.logger.debug('running operation %s on all given interfaces'
355 %op)
356 iface_run_queue = deque(ifacenames)
357 for i in range(0, len(iface_run_queue)):
358 if op.endswith('up'):
359 # XXX: simplify this
360 if sorted_by_dependency:
361 ifacename = iface_run_queue.pop()
362 else:
363 ifacename = iface_run_queue.popleft()
364 else:
365 if sorted_by_dependency:
366 ifacename = iface_run_queue.popleft()
367 else:
368 ifacename = iface_run_queue.pop()
369
370 try:
371 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
372 for ifaceobj in ifaceobjs:
373 cenv = ifupdownobj.generate_running_env(ifaceobj, op)
374 cls.run_iface_op(ifupdownobj, ifaceobj, op, cenv)
375 except Exception, e:
376 ifupdownobj.log_error(str(e))
377
378 @classmethod
379 def run_iface_list_ops(cls, ifupdownobj, ifacenames, ops,
380 sorted_by_dependency=False):
381 """ Runs interface list through sub operations handler
382
383 Unlike run_iface_list, this method executes a sub operation on the
384 entire interface list before proceeding to the next sub-operation.
385 ie operation 'pre-up' is run through the entire interface list before
386 'up'
387 """
388 # Each sub operation has a module list
389 [cls.run_iface_list_op(ifupdownobj, ifacenames, op,
390 sorted_by_dependency) for op in ops]
391
392 @classmethod
393 def run_iface_dependency_graphs_sorted(cls, ifupdownobj,
394 dependency_graphs,
395 ops, indegrees=None,
396 graphsortall=False):
397 """ runs interface dependency graph by topologically sorting the interfaces """
398
399 if indegrees is None:
400 indegrees = OrderedDict()
401 for ifacename in dependency_graphs.keys():
402 indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)
403
404 ifupdownobj.logger.debug('indegree array :')
405 ifupdownobj.logger.debug(ifupdownobj.pp.pformat(indegrees))
406
407 try:
408 ifupdownobj.logger.debug('calling topological sort on the graph ' +
409 '...')
410 if graphsortall:
411 sorted_ifacenames = graph.topological_sort_graphs_all(
412 dependency_graphs, indegrees)
413 else:
414 sorted_ifacenames = graph.topological_sort_graphs(
415 dependency_graphs, indegrees)
416 except Exception:
417 raise
418
419 ifupdownobj.logger.debug('sorted iface list = %s' %sorted_ifacenames)
420 cls.run_iface_list_ops(ifupdownobj, sorted_ifacenames, ops,
421 sorted_by_dependency=True)
422
423
424 """ Methods to execute interfaces in parallel """
425 @classmethod
426 def init_tokens(cls, count):
427 cls.token_pool = BoundedSemaphore(count)
428
429 @classmethod
430 def accquire_token(cls, logprefix=''):
431 cls.token_pool.acquire()
432
433 @classmethod
434 def release_token(cls, logprefix=''):
435 cls.token_pool.release()
436
437 @classmethod
438 def run_iface_parallel(cls, ifupdownobj, ifacename, op):
439 """ Configures interface in parallel.
440
441 Executes all its direct dependents in parallel
442
443 """
444
445 ifupdownobj.logger.debug('%s:' %ifacename + ' %s' %op)
446 cls.accquire_token(iface)
447
448 # Each iface can have a list of objects
449 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
450 if ifaceobjs is None:
451 ifupdownobj.logger.warning('%s: ' %ifacename + 'not found')
452 cls.release_token(ifacename)
453 return -1
454
455 for ifaceobj in ifaceobjs:
456 # Run dependents
457 dlist = ifaceobj.lowerifaces
458 if dlist:
459 ifupdownobj.logger.debug('%s:' %ifacename +
460 ' found dependents: %s' %str(dlist))
461 try:
462 cls.release_token(ifacename)
463 cls.run_iface_list_parallel(ifacename, ifupdownobj,
464 dlist, op)
465 cls.accquire_token(ifacename)
466 except Exception, e:
467 if ifupdownobj.ignore_error(str(e)):
468 pass
469 else:
470 # Dont bring the iface up if children did not come up
471 ifupdownobj.logger.debug('%s:' %ifacename +
472 ' there was an error bringing %s' %op +
473 ' dependents (%s)', str(e))
474 ifupdownobj.set_iface_state(ifaceobj,
475 ifaceState.from_str(ops[0]),
476 ifaceStatus.ERROR)
477 return -1
478
479 # Run all sub operations sequentially
480 try:
481 ifupdownobj.logger.debug('%s:' %ifacename +
482 ' running sub-operations')
483 cls.run_iface_ops(ifupdownobj, ifaceobj, op)
484 except Exception, e:
485 ifupdownobj.logger.error('%s:' %ifacename +
486 ' error running sub operations (%s)' %str(e))
487
488 cls.release_token(ifacename)
489
490 @classmethod
491 def run_iface_list_parallel(cls, parent, ifupdownobj, ifacenames, op):
492 """ Runs interface list in parallel """
493
494 running_threads = OrderedDict()
495 err = 0
496
497 for ifacename in ifacenames:
498 try:
499 cls.accquire_token(parent)
500 running_threads[ifacename] = Thread(None,
501 cls.run_iface_parallel, ifacename,
502 args=(ifupdownobj, ifacename, op))
503 running_threads[ifacename].start()
504 cls.release_token(parent)
505 except Exception, e:
506 cls.release_token(parent)
507 if ifupdownobj.ignore_error(str(e)):
508 pass
509 else:
510 raise Exception('error starting thread for iface %s'
511 %ifacename)
512
513
514 ifupdownobj.logger.debug('%s ' %parent +
515 'waiting for all the threads ...')
516 for ifacename, t in running_threads.items():
517 t.join()
518 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
519 err += 1
520
521 return err
522
523 @classmethod
524 def run_iface_graphs_parallel(cls, parent, ifupdownobj, ifacenames, op):
525 """ Runs iface graphs in parallel """
526
527 running_threads = OrderedDict()
528 err = 0
529
530 for ifacename in ifacenames:
531 try:
532 cls.accquire_graph_token(parent)
533 running_threads[ifacename] = Thread(None,
534 cls.run_iface_parallel, ifacename,
535 args=(ifupdownobj, ifacename, op))
536 running_threads[ifacename].start()
537 cls.release_graph_token(parent)
538 except Exception, e:
539 cls.release_graph_token(parent)
540 if ifupdownobj.ignore_error(str(e)):
541 pass
542 else:
543 raise Exception('error starting thread for iface %s'
544 %ifacename)
545
546 ifupdownobj.logger.info('%s ' %parent +
547 'waiting for all the threads ...')
548 for ifacename, t in running_threads.items():
549 t.join()
550 # Check status of thread
551 # XXX: Check all objs
552 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
553 err += 1
554 return err
555
556 @classmethod
557 def run_iface_dependency_graph_parallel(cls, ifupdownobj, dependency_graph,
558 operation):
559 """ Runs iface dependeny graph in parallel.
560
561 arguments:
562 ifupdownobj -- ifupdown object (used for getting and updating iface
563 object state)
564 dependency_graph -- dependency graph with
565 operation -- 'up' or 'down' or 'query'
566
567 """
568
569 ifupdownobj.logger.debug('running dependency graph in parallel ..')
570 run_queue = []
571 # Build a list of ifaces that dont have any dependencies
572 for ifacename in dependency_graph.keys():
573 if ifupdownobj.get_iface_refcnt(ifacename) == 0:
574 run_queue.append(ifacename)
575
576 ifupdownobj.logger.debug('graph roots (interfaces that dont'
577 ' have dependents):' + ' %s' %str(run_queue))
578 cls.init_tokens(ifupdownobj.get_njobs())
579 return cls.run_iface_list_parallel('main', ifupdownobj, run_queue,
580 operation)
581
582 # OR
583 # Run one graph at a time
584 #for iface in run_queue:
585 # self.run_iface_list_parallel('main', ifupdownobj, [iface],
586 # operation)
587