]> git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
cleanup state manager + remove some dead code
[mirror_ifupdown2.git] / pkg / scheduler.py
1 #!/usr/bin/python
2 #
3 # Copyright 2013. Cumulus Networks, Inc.
4 # Author: Roopa Prabhu, roopa@cumulusnetworks.com
5 #
6 # ifaceScheduler --
7 # interface scheduler
8 #
9
10 from statemanager import *
11 from iface import *
12 from graph import *
13 from collections import deque
14 from collections import OrderedDict
15 import logging
16 import traceback
17 import sys
18 from graph import *
19 from collections import deque
20 from threading import *
21 from ifupdownbase import *
22
23 class ifaceSchedulerFlags():
24 INORDER = 1
25 POSTORDER = 2
26
27 class ifaceScheduler():
28 """ scheduler functions to schedule configuration of interfaces.
29
30
31 supports scheduling of interfaces serially in plain interface list
32 or dependency graph format.
33 """
34
35 token_pool = None
36
37 @classmethod
38 def run_iface_op(cls, ifupdownobj, ifaceobj, op, cenv):
39 """ Runs sub operation on an interface """
40 ifacename = ifaceobj.get_name()
41
42 if (ifaceobj.get_state() >= ifaceState.from_str(op) and
43 ifaceobj.get_status() == ifaceStatus.SUCCESS):
44 ifupdownobj.logger.debug('%s: already in state %s' %(ifacename, op))
45 return
46
47 # first run ifupdownobj handlers
48 handler = ifupdownobj.ops_handlers.get(op)
49 if handler:
50 addr_method = ifaceobj.get_addr_method()
51 if not addr_method or (addr_method and addr_method != 'manual'):
52 handler(ifupdownobj, ifaceobj)
53
54 for mname in ifupdownobj.module_ops.get(op):
55 m = ifupdownobj.modules.get(mname)
56 err = 0
57 try:
58 if hasattr(m, 'run'):
59 ifupdownobj.logger.debug('%s: %s : running module %s'
60 %(ifacename, op, mname))
61 if op == 'query-checkcurr':
62 # Dont check curr if the interface object was
63 # auto generated
64 if (ifaceobj.priv_flags & ifupdownobj.NOCONFIG):
65 continue
66 m.run(ifaceobj, op,
67 query_ifaceobj=ifupdownobj.create_n_save_ifaceobjcurr(ifaceobj))
68 else:
69 m.run(ifaceobj, op)
70 except Exception, e:
71 err = 1
72 ifupdownobj.log_error(str(e))
73 finally:
74 if err:
75 ifaceobj.set_state_n_status(ifaceState.from_str(op),
76 ifaceStatus.ERROR)
77 else:
78 ifaceobj.set_state_n_status(ifaceState.from_str(op),
79 ifaceStatus.SUCCESS)
80
81 if ifupdownobj.COMPAT_EXEC_SCRIPTS:
82 # execute /etc/network/ scripts
83 for mname in ifupdownobj.script_ops.get(op, []):
84 ifupdownobj.logger.debug('%s: %s : running script %s'
85 %(ifacename, op, mname))
86 try:
87 ifupdownobj.exec_command(mname, cmdenv=cenv)
88 except Exception, e:
89 ifupdownobj.log_error(str(e))
90
91 @classmethod
92 def run_iface_ops(cls, ifupdownobj, ifaceobj, ops):
93 """ Runs all operations on an interface """
94 cenv=None
95 if ifupdownobj.COMPAT_EXEC_SCRIPTS:
96 # For backward compatibility generate env variables
97 # for attributes
98 cenv = ifupdownobj.generate_running_env(ifaceobj, ops[0])
99 map(lambda op: cls.run_iface_op(ifupdownobj, ifaceobj, op, cenv), ops)
100 posthookfunc = ifupdownobj.sched_hooks.get('posthook')
101 if posthookfunc:
102 posthookfunc(ifupdownobj, ifaceobj)
103
104 @classmethod
105 def run_iface_graph(cls, ifupdownobj, ifacename, ops, parent=None,
106 order=ifaceSchedulerFlags.POSTORDER,
107 followdependents=True):
108 """ runs interface by traversing all nodes rooted at itself """
109
110 # minor optimization. If operation is 'down', proceed only
111 # if interface exists in the system
112 if 'down' in ops[0] and not ifupdownobj.link_exists(ifacename):
113 ifupdownobj.logger.info('%s: does not exist' %ifacename)
114 return
115
116 # Each ifacename can have a list of iface objects
117 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
118 if not ifaceobjs:
119 raise Exception('%s: not found' %ifacename)
120
121 for ifaceobj in ifaceobjs:
122 # Deal with upperdevs first
123 ulist = ifaceobj.get_upperifaces()
124 if ulist:
125 tmpulist = ([u for u in ulist if u != parent] if parent
126 else ulist)
127 if tmpulist:
128 if 'down' in ops[0]:
129 # XXX: This is expensive. Find a cheaper way to do this
130 # if any of the upperdevs are present,
131 # dont down this interface
132 for u in tmpulist:
133 if ifupdownobj.link_exists(u):
134 if not ifupdownobj.ALL:
135 ifupdownobj.logger.warn('%s: ' %ifacename +
136 ' skip interface down,' +
137 ' upperiface %s still around' %u)
138 return
139 elif 'up' in ops[0] and not ifupdownobj.ALL:
140 # For 'up', just warn that there is an upperdev which is
141 # probably not up
142 for u in tmpulist:
143 if not ifupdownobj.link_exists(u):
144 ifupdownobj.logger.warn('%s: upper iface %s '
145 %(ifacename, u) + 'does not exist')
146
147 if order == ifaceSchedulerFlags.INORDER:
148 # If inorder, run the iface first and then its dependents
149 cls.run_iface_ops(ifupdownobj, ifaceobj, ops)
150
151 # Run lowerifaces or dependents
152 dlist = ifaceobj.get_lowerifaces()
153 if dlist:
154 ifupdownobj.logger.info('%s:' %ifacename +
155 ' found dependents: %s' %str(dlist))
156 try:
157 if not followdependents:
158 # XXX: this is yet another extra step,
159 # but is needed for interfaces that are
160 # implicit dependents. even though we are asked to
161 # not follow dependents, we must follow the ones
162 # that dont have user given config. Because we own them
163 new_dlist = [d for d in dlist
164 if ifupdownobj.is_iface_noconfig(d)]
165 if new_dlist:
166 cls.run_iface_list(ifupdownobj, new_dlist, ops,
167 ifacename, order,
168 followdependents,
169 continueonfailure=False)
170 else:
171 cls.run_iface_list(ifupdownobj, dlist, ops,
172 ifacename, order,
173 followdependents,
174 continueonfailure=False)
175 except Exception, e:
176 if (ifupdownobj.ignore_error(str(e))):
177 pass
178 else:
179 # Dont bring the iface up if children did not come up
180 ifaceobj.set_state_n_sttaus(ifaceState.NEW,
181 ifacestatus.ERROR)
182 raise
183 if order == ifaceSchedulerFlags.POSTORDER:
184 cls.run_iface_ops(ifupdownobj, ifaceobj, ops)
185
186 @classmethod
187 def run_iface_list(cls, ifupdownobj, ifacenames,
188 ops, parent=None, order=ifaceSchedulerFlags.POSTORDER,
189 followdependents=True, continueonfailure=True):
190 """ Runs interface list """
191
192 for ifacename in ifacenames:
193 try:
194 cls.run_iface_graph(ifupdownobj, ifacename, ops, parent,
195 order, followdependents)
196 except Exception, e:
197 if continueonfailure:
198 if ifupdownobj.logger.isEnabledFor(logging.DEBUG):
199 traceback.print_tb(sys.exc_info()[2])
200 ifupdownobj.logger.error('%s : %s' %(ifacename, str(e)))
201 pass
202 else:
203 if (ifupdownobj.ignore_error(str(e))):
204 pass
205 else:
206 raise Exception('error running iface %s (%s)'
207 %(ifacename, str(e)))
208
209 @classmethod
210 def run_iface_dependency_graphs(cls, ifupdownobj,
211 dependency_graph, ops, indegrees=None,
212 order=ifaceSchedulerFlags.POSTORDER,
213 followdependents=True):
214 """ Runs iface dependeny graph by visiting all the nodes
215
216 Parameters:
217 -----------
218 ifupdownobj : ifupdown object (used for getting and updating iface
219 object state)
220 dependency_graph : dependency graph in adjacency list
221 format (contains more than one dependency graph)
222 ops : list of operations to perform eg ['pre-up', 'up', 'post-up']
223
224 indegrees : indegree array if present is used to determine roots
225 of the graphs in the dependency_graph
226 """
227 run_queue = []
228
229 if indegrees is None:
230 indegrees = OrderedDict()
231 for ifacename in dependency_graph.keys():
232 indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)
233
234 sorted_ifacenames = graph.topological_sort_graphs_all(dependency_graph,
235 dict(indegrees))
236 ifupdownobj.logger.debug('sorted ifacenames %s : '
237 %str(sorted_ifacenames))
238
239 # Build a list of ifaces that dont have any dependencies
240 for ifacename in sorted_ifacenames:
241 if not indegrees.get(ifacename):
242 run_queue.append(ifacename)
243
244 ifupdownobj.logger.info('graph roots (interfaces that dont have '
245 'dependents):' + ' %s' %str(run_queue))
246
247 return cls.run_iface_list(ifupdownobj, run_queue, ops,
248 parent=None,order=order,
249 followdependents=followdependents)
250
251 @classmethod
252 def run_iface(cls, ifupdownobj, ifacename, ops):
253 """ Runs operation on an interface """
254
255 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
256 for i in ifaceobjs:
257 cls.run_iface_ops(ifupdownobj, i, ops)
258
259 @classmethod
260 def run_iface_list_op(cls, ifupdownobj, ifacenames, op,
261 sorted_by_dependency=False):
262 """ Runs interface list through sub operation handler. """
263
264 ifupdownobj.logger.debug('running operation %s on all given interfaces'
265 %op)
266 iface_run_queue = deque(ifacenames)
267 for i in range(0, len(iface_run_queue)):
268 if op.endswith('up'):
269 # XXX: simplify this
270 if sorted_by_dependency:
271 ifacename = iface_run_queue.pop()
272 else:
273 ifacename = iface_run_queue.popleft()
274 else:
275 if sorted_by_dependency:
276 ifacename = iface_run_queue.popleft()
277 else:
278 ifacename = iface_run_queue.pop()
279
280 try:
281 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
282 for ifaceobj in ifaceobjs:
283 cenv = ifupdownobj.generate_running_env(ifaceobj, op)
284 cls.run_iface_op(ifupdownobj, ifaceobj, op, cenv)
285 except Exception, e:
286 ifupdownobj.log_error(str(e))
287
288 @classmethod
289 def run_iface_list_ops(cls, ifupdownobj, ifacenames, ops,
290 sorted_by_dependency=False):
291 """ Runs interface list through sub operations handler
292
293 Unlike run_iface_list, this method executes a sub operation on the
294 entire interface list before proceeding to the next sub-operation.
295 ie operation 'pre-up' is run through the entire interface list before
296 'up'
297 """
298 # Each sub operation has a module list
299 [cls.run_iface_list_op(ifupdownobj, ifacenames, op,
300 sorted_by_dependency) for op in ops]
301
302 @classmethod
303 def run_iface_dependency_graphs_sorted(cls, ifupdownobj,
304 dependency_graphs,
305 ops, indegrees=None,
306 graphsortall=False):
307 """ runs interface dependency graph by topologically sorting the interfaces """
308
309 if indegrees is None:
310 indegrees = OrderedDict()
311 for ifacename in dependency_graphs.keys():
312 indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)
313
314 ifupdownobj.logger.debug('indegree array :')
315 ifupdownobj.logger.debug(ifupdownobj.pp.pformat(indegrees))
316
317 try:
318 ifupdownobj.logger.debug('calling topological sort on the graph ' +
319 '...')
320 if graphsortall:
321 sorted_ifacenames = graph.topological_sort_graphs_all(
322 dependency_graphs, indegrees)
323 else:
324 sorted_ifacenames = graph.topological_sort_graphs(
325 dependency_graphs, indegrees)
326 except Exception:
327 raise
328
329 ifupdownobj.logger.debug('sorted iface list = %s' %sorted_ifacenames)
330 cls.run_iface_list_ops(ifupdownobj, sorted_ifacenames, ops,
331 sorted_by_dependency=True)
332
333
334 """ Methods to execute interfaces in parallel """
335 @classmethod
336 def init_tokens(cls, count):
337 cls.token_pool = BoundedSemaphore(count)
338
339 @classmethod
340 def accquire_token(cls, logprefix=''):
341 cls.token_pool.acquire()
342
343 @classmethod
344 def release_token(cls, logprefix=''):
345 cls.token_pool.release()
346
347 @classmethod
348 def run_iface_parallel(cls, ifupdownobj, ifacename, op):
349 """ Configures interface in parallel.
350
351 Executes all its direct dependents in parallel
352
353 """
354
355 ifupdownobj.logger.debug('%s:' %ifacename + ' %s' %op)
356 cls.accquire_token(iface)
357
358 # Each iface can have a list of objects
359 ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
360 if ifaceobjs is None:
361 ifupdownobj.logger.warning('%s: ' %ifacename + 'not found')
362 cls.release_token(ifacename)
363 return -1
364
365 for ifaceobj in ifaceobjs:
366 # Run dependents
367 dlist = ifaceobj.get_lowerifaces()
368 if dlist:
369 ifupdownobj.logger.debug('%s:' %ifacename +
370 ' found dependents: %s' %str(dlist))
371 try:
372 cls.release_token(ifacename)
373 cls.run_iface_list_parallel(ifacename, ifupdownobj,
374 dlist, op)
375 cls.accquire_token(ifacename)
376 except Exception, e:
377 if ifupdownobj.ignore_error(str(e)):
378 pass
379 else:
380 # Dont bring the iface up if children did not come up
381 ifupdownobj.logger.debug('%s:' %ifacename +
382 ' there was an error bringing %s' %op +
383 ' dependents (%s)', str(e))
384 ifupdownobj.set_iface_state(ifaceobj,
385 ifaceState.from_str(ops[0]),
386 ifaceStatus.ERROR)
387 return -1
388
389 # Run all sub operations sequentially
390 try:
391 ifupdownobj.logger.debug('%s:' %ifacename +
392 ' running sub-operations')
393 cls.run_iface_ops(ifupdownobj, ifaceobj, op)
394 except Exception, e:
395 ifupdownobj.logger.error('%s:' %ifacename +
396 ' error running sub operations (%s)' %str(e))
397
398 cls.release_token(ifacename)
399
400 @classmethod
401 def run_iface_list_parallel(cls, parent, ifupdownobj, ifacenames, op):
402 """ Runs interface list in parallel """
403
404 running_threads = OrderedDict()
405 err = 0
406
407 for ifacename in ifacenames:
408 try:
409 cls.accquire_token(parent)
410 running_threads[ifacename] = Thread(None,
411 cls.run_iface_parallel, ifacename,
412 args=(ifupdownobj, ifacename, op))
413 running_threads[ifacename].start()
414 cls.release_token(parent)
415 except Exception, e:
416 cls.release_token(parent)
417 if ifupdownobj.ignore_error(str(e)):
418 pass
419 else:
420 raise Exception('error starting thread for iface %s'
421 %ifacename)
422
423
424 ifupdownobj.logger.debug('%s ' %parent +
425 'waiting for all the threads ...')
426 for ifacename, t in running_threads.items():
427 t.join()
428 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
429 err += 1
430
431 return err
432
433 @classmethod
434 def run_iface_graphs_parallel(cls, parent, ifupdownobj, ifacenames, op):
435 """ Runs iface graphs in parallel """
436
437 running_threads = OrderedDict()
438 err = 0
439
440 for ifacename in ifacenames:
441 try:
442 cls.accquire_graph_token(parent)
443 running_threads[ifacename] = Thread(None,
444 cls.run_iface_parallel, ifacename,
445 args=(ifupdownobj, ifacename, op))
446 running_threads[ifacename].start()
447 cls.release_graph_token(parent)
448 except Exception, e:
449 cls.release_graph_token(parent)
450 if ifupdownobj.ignore_error(str(e)):
451 pass
452 else:
453 raise Exception('error starting thread for iface %s'
454 %ifacename)
455
456 ifupdownobj.logger.info('%s ' %parent +
457 'waiting for all the threads ...')
458 for ifacename, t in running_threads.items():
459 t.join()
460 # Check status of thread
461 # XXX: Check all objs
462 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
463 err += 1
464 return err
465
466 @classmethod
467 def run_iface_dependency_graph_parallel(cls, ifupdownobj, dependency_graph,
468 operation):
469 """ Runs iface dependeny graph in parallel.
470
471 arguments:
472 ifupdownobj -- ifupdown object (used for getting and updating iface
473 object state)
474 dependency_graph -- dependency graph with
475 operation -- 'up' or 'down' or 'query'
476
477 """
478
479 ifupdownobj.logger.debug('running dependency graph in parallel ..')
480 run_queue = []
481 # Build a list of ifaces that dont have any dependencies
482 for ifacename in dependency_graph.keys():
483 if ifupdownobj.get_iface_refcnt(ifacename) == 0:
484 run_queue.append(ifacename)
485
486 ifupdownobj.logger.debug('graph roots (interfaces that dont'
487 ' have dependents):' + ' %s' %str(run_queue))
488 cls.init_tokens(ifupdownobj.get_njobs())
489 return cls.run_iface_list_parallel('main', ifupdownobj, run_queue,
490 operation)
491
492 # OR
493 # Run one graph at a time
494 #for iface in run_queue:
495 # self.run_iface_list_parallel('main', ifupdownobj, [iface],
496 # operation)
497