]> git.proxmox.com Git - mirror_ifupdown2.git/blob - pkg/scheduler.py
Fix upperiface check to not barf when --force is used
[mirror_ifupdown2.git] / pkg / scheduler.py
1 #!/usr/bin/python
2 #
3 # Copyright 2013. Cumulus Networks, Inc.
4 # Author: Roopa Prabhu, roopa@cumulusnetworks.com
5 #
6 # ifaceScheduler --
7 # interface scheduler
8 #
9
10 from statemanager import *
11 from iface import *
12 from graph import *
13 from collections import deque
14 from collections import OrderedDict
15 import logging
16 import traceback
17 import sys
18 from graph import *
19 from collections import deque
20 from threading import *
21 from ifupdownbase import *
22
class ifaceSchedulerFlags():
    """ Traversal orders accepted by ifaceScheduler.run_iface_graph """

    # run the interface first and then its dependents
    INORDER = 1
    # run the dependents first and then the interface
    POSTORDER = 2


class ifaceScheduler():
    """ scheduler functions to schedule configuration of interfaces.

    supports scheduling of interfaces serially in plain interface list
    or dependency graph format.

    Every method is a classmethod. The ifupdown object passed to each of
    them supplies the interface objects, handlers, modules, flags and
    logger that the scheduler works with.
    """

    # semaphore shared by the parallel runners; created by init_tokens()
    token_pool = None

    @classmethod
    def run_iface_op(cls, ifupdownobj, ifaceobj, op, cenv):
        """ Runs sub operation on an interface

        Parameters:
        -----------
        ifupdownobj : ifupdown object providing handlers, modules and flags
        ifaceobj : interface object the operation is run on
        op : sub operation name eg 'pre-up'
        cenv : environment handed to the compat scripts (may be None)

        Nothing is done if the interface already reached the state for op
        with status SUCCESS. State and status are recorded after every
        module, so the final status reflects the last module that ran.
        """
        ifacename = ifaceobj.get_name()

        if (ifaceobj.get_state() >= ifaceState.from_str(op) and
                ifaceobj.get_status() == ifaceStatus.SUCCESS):
            ifupdownobj.logger.debug('%s: already in state %s'
                                     %(ifacename, op))
            return

        # first run ifupdownobj handlers (skipped for address
        # method 'manual')
        handler = ifupdownobj.ops_handlers.get(op)
        if handler and ifaceobj.get_addr_method() != 'manual':
            handler(ifupdownobj, ifaceobj)

        if not ifupdownobj.ADDONS_ENABLE:
            return

        # An op without a registered module list simply has no modules to
        # run (same lookup style as script_ops below)
        for mname in ifupdownobj.module_ops.get(op, []):
            m = ifupdownobj.modules.get(mname)
            failed = False
            try:
                if hasattr(m, 'run'):
                    msg = ('%s: %s : running module %s'
                           %(ifacename, op, mname))
                    if op == 'query-checkcurr':
                        # Dont check curr if the interface object was
                        # auto generated
                        if not (ifaceobj.priv_flags & ifupdownobj.NOCONFIG):
                            ifupdownobj.logger.debug(msg)
                            m.run(ifaceobj, op,
                                  query_ifaceobj=ifupdownobj.
                                  create_n_save_ifaceobjcurr(ifaceobj))
                    else:
                        ifupdownobj.logger.debug(msg)
                        m.run(ifaceobj, op)
            except Exception as e:
                failed = True
                ifupdownobj.log_error(str(e))
            finally:
                # runs even if log_error re-raises, so a failure is
                # always recorded on the interface
                ifaceobj.set_state_n_status(
                    ifaceState.from_str(op),
                    ifaceStatus.ERROR if failed else ifaceStatus.SUCCESS)

        if ifupdownobj.COMPAT_EXEC_SCRIPTS:
            # execute /etc/network/ scripts
            for mname in ifupdownobj.script_ops.get(op, []):
                ifupdownobj.logger.debug('%s: %s : running script %s'
                                         %(ifacename, op, mname))
                try:
                    ifupdownobj.exec_command(mname, cmdenv=cenv)
                except Exception as e:
                    ifupdownobj.log_error(str(e))

    @classmethod
    def run_iface_ops(cls, ifupdownobj, ifaceobj, ops):
        """ Runs all operations on an interface

        ops is a list of sub operations eg ['pre-up', 'up', 'post-up'];
        they are run in list order and the 'posthook' scheduler hook, if
        registered, is called afterwards.
        """
        ifacename = ifaceobj.get_name()
        # minor optimization. If operation is 'down', proceed only
        # if interface exists in the system
        if 'down' in ops[0] and not ifupdownobj.link_exists(ifacename):
            ifupdownobj.logger.info('%s: does not exist' %ifacename)
            return
        cenv = None
        if ifupdownobj.COMPAT_EXEC_SCRIPTS:
            # For backward compatibility generate env variables
            # for attributes
            cenv = ifupdownobj.generate_running_env(ifaceobj, ops[0])
        # explicit loop: map() is lazy on python3 and would run nothing
        for op in ops:
            cls.run_iface_op(ifupdownobj, ifaceobj, op, cenv)
        posthookfunc = ifupdownobj.sched_hooks.get('posthook')
        if posthookfunc:
            posthookfunc(ifupdownobj, ifaceobj)

    @classmethod
    def _check_upperifaces(cls, ifupdownobj, ifaceobj, ops, parent):
        """ Check if conflicting upper ifaces are around and warn if required

        The upper iface named by parent (the iface we were reached from)
        is not considered.

        Returns False if this interface needs to be skipped, else return True
        """
        ifacename = ifaceobj.get_name()
        # Deal with upperdevs first
        ulist = ifaceobj.get_upperifaces()
        if ulist:
            tmpulist = ([u for u in ulist if u != parent] if parent
                        else ulist)
            if not tmpulist:
                return True
            if 'down' in ops[0]:
                # XXX: This is expensive. Find a cheaper way to do this
                # if any of the upperdevs are present,
                # dont down this interface
                for u in tmpulist:
                    if ifupdownobj.link_exists(u):
                        # NOTE(review): skip placed inside the FORCE/ALL
                        # check so that --force does not skip - confirm
                        if not ifupdownobj.FORCE and not ifupdownobj.ALL:
                            ifupdownobj.logger.warning(
                                '%s: ' %ifacename +
                                ' skip interface down,' +
                                ' upperiface %s still around' %u)
                            return False
            elif 'up' in ops[0] and not ifupdownobj.ALL:
                # For 'up', just warn that there is an upperdev which is
                # probably not up
                for u in tmpulist:
                    if not ifupdownobj.link_exists(u):
                        ifupdownobj.logger.warning(
                            '%s: upper iface %s ' %(ifacename, u) +
                            'does not exist')
        return True

    @classmethod
    def run_iface_graph(cls, ifupdownobj, ifacename, ops, parent=None,
                        order=ifaceSchedulerFlags.POSTORDER,
                        followdependents=True):
        """ runs interface by traversing all nodes rooted at itself

        Raises an Exception if no interface object exists for ifacename
        or if running the dependents failed and the error is not ignored.
        """

        # Each ifacename can have a list of iface objects
        ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
        if not ifaceobjs:
            raise Exception('%s: not found' %ifacename)

        for ifaceobj in ifaceobjs:
            if not cls._check_upperifaces(ifupdownobj, ifaceobj, ops, parent):
                return
            if order == ifaceSchedulerFlags.INORDER:
                # If inorder, run the iface first and then its dependents
                cls.run_iface_ops(ifupdownobj, ifaceobj, ops)

            # Run lowerifaces or dependents
            dlist = ifaceobj.get_lowerifaces()
            if dlist:
                ifupdownobj.logger.debug('%s:' %ifacename +
                                         ' found dependents: %s' %str(dlist))
                try:
                    if followdependents:
                        todo = dlist
                    else:
                        # XXX: this is yet another extra step,
                        # but is needed for interfaces that are
                        # implicit dependents. even though we are asked to
                        # not follow dependents, we must follow the ones
                        # that dont have user given config. Because we own
                        # them
                        todo = [d for d in dlist
                                if ifupdownobj.is_iface_noconfig(d)]
                    if todo:
                        cls.run_iface_list(ifupdownobj, todo, ops,
                                           ifacename, order,
                                           followdependents,
                                           continueonfailure=False)
                except Exception as e:
                    if not ifupdownobj.ignore_error(str(e)):
                        # Dont bring the iface up if children did not
                        # come up
                        ifaceobj.set_state_n_status(ifaceState.NEW,
                                                    ifaceStatus.ERROR)
                        raise
            if order == ifaceSchedulerFlags.POSTORDER:
                cls.run_iface_ops(ifupdownobj, ifaceobj, ops)

    @classmethod
    def run_iface_list(cls, ifupdownobj, ifacenames,
                       ops, parent=None, order=ifaceSchedulerFlags.POSTORDER,
                       followdependents=True, continueonfailure=True):
        """ Runs interface list

        With continueonfailure a failing interface is logged and the next
        one is tried; otherwise the failure is re-raised as an Exception
        unless ifupdownobj.ignore_error() accepts it.
        """

        for ifacename in ifacenames:
            try:
                cls.run_iface_graph(ifupdownobj, ifacename, ops, parent,
                                    order, followdependents)
            except Exception as e:
                if continueonfailure:
                    if ifupdownobj.logger.isEnabledFor(logging.DEBUG):
                        traceback.print_tb(sys.exc_info()[2])
                    ifupdownobj.logger.error('%s : %s' %(ifacename, str(e)))
                elif not ifupdownobj.ignore_error(str(e)):
                    raise Exception('error running iface %s (%s)'
                                    %(ifacename, str(e)))

    @classmethod
    def run_iface_dependency_graphs(cls, ifupdownobj,
                                    dependency_graph, ops, indegrees=None,
                                    order=ifaceSchedulerFlags.POSTORDER,
                                    followdependents=True):
        """ Runs iface dependency graph by visiting all the nodes

        Parameters:
        -----------
        ifupdownobj : ifupdown object (used for getting and updating iface
                        object state)
        dependency_graph : dependency graph in adjacency list
                        format (contains more than one dependency graph)
        ops : list of operations to perform eg ['pre-up', 'up', 'post-up']

        indegrees : indegree array if present is used to determine roots
                        of the graphs in the dependency_graph
        """
        if not indegrees:
            indegrees = OrderedDict()
            for ifacename in dependency_graph:
                indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)

        # the sort is handed a copy; the indegrees themselves are still
        # needed below to pick the roots
        sorted_ifacenames = graph.topological_sort_graphs_all(
            dependency_graph, dict(indegrees))
        ifupdownobj.logger.debug('sorted ifacenames %s : '
                                 %str(sorted_ifacenames))

        # Build a list of ifaces that dont have any dependencies
        run_queue = [ifacename for ifacename in sorted_ifacenames
                     if not indegrees.get(ifacename)]

        ifupdownobj.logger.debug('graph roots (interfaces that dont have '
                                 'dependents):' + ' %s' %str(run_queue))

        return cls.run_iface_list(ifupdownobj, run_queue, ops,
                                  parent=None, order=order,
                                  followdependents=followdependents)

    @classmethod
    def run_iface(cls, ifupdownobj, ifacename, ops):
        """ Runs operation on an interface """

        ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
        for i in ifaceobjs:
            cls.run_iface_ops(ifupdownobj, i, ops)

    @classmethod
    def run_iface_list_op(cls, ifupdownobj, ifacenames, op,
                          sorted_by_dependency=False):
        """ Runs interface list through sub operation handler.

        The list is walked back to front for an '...up' op on a dependency
        sorted list and for any other op on an unsorted list; in the two
        remaining cases it is walked front to back.
        """

        ifupdownobj.logger.debug('running operation %s on all given interfaces'
                                 %op)
        iface_run_queue = list(ifacenames)
        if op.endswith('up') == bool(sorted_by_dependency):
            iface_run_queue.reverse()

        for ifacename in iface_run_queue:
            try:
                ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
                for ifaceobj in ifaceobjs:
                    cenv = ifupdownobj.generate_running_env(ifaceobj, op)
                    cls.run_iface_op(ifupdownobj, ifaceobj, op, cenv)
            except Exception as e:
                ifupdownobj.log_error(str(e))

    @classmethod
    def run_iface_list_ops(cls, ifupdownobj, ifacenames, ops,
                           sorted_by_dependency=False):
        """ Runs interface list through sub operations handler

        Unlike run_iface_list, this method executes a sub operation on the
        entire interface list before proceeding to the next sub-operation.
        ie operation 'pre-up' is run through the entire interface list before
        'up'
        """
        # Each sub operation has a module list
        for op in ops:
            cls.run_iface_list_op(ifupdownobj, ifacenames, op,
                                  sorted_by_dependency)

    @classmethod
    def run_iface_dependency_graphs_sorted(cls, ifupdownobj,
                                           dependency_graphs,
                                           ops, indegrees=None,
                                           graphsortall=False):
        """ runs interface dependency graph by topologically sorting the
        interfaces

        Exceptions raised by the topological sort are propagated to the
        caller.
        """

        if indegrees is None:
            indegrees = OrderedDict()
            for ifacename in dependency_graphs:
                indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)

        ifupdownobj.logger.debug('indegree array :')
        ifupdownobj.logger.debug(ifupdownobj.pp.pformat(indegrees))

        ifupdownobj.logger.debug('calling topological sort on the graph ' +
                                 '...')
        if graphsortall:
            sorted_ifacenames = graph.topological_sort_graphs_all(
                dependency_graphs, indegrees)
        else:
            sorted_ifacenames = graph.topological_sort_graphs(
                dependency_graphs, indegrees)

        ifupdownobj.logger.debug('sorted iface list = %s' %sorted_ifacenames)
        cls.run_iface_list_ops(ifupdownobj, sorted_ifacenames, ops,
                               sorted_by_dependency=True)

    # Methods to execute interfaces in parallel

    @classmethod
    def init_tokens(cls, count):
        """ Creates the token pool; count tokens can be held at a time """
        cls.token_pool = BoundedSemaphore(count)

    @classmethod
    def accquire_token(cls, logprefix=''):
        """ Takes a token from the pool, blocking until one is free.

        logprefix is accepted but not used.
        """
        cls.token_pool.acquire()

    @classmethod
    def release_token(cls, logprefix=''):
        """ Returns a token to the pool. logprefix is accepted but not used """
        cls.token_pool.release()

    @classmethod
    def run_iface_parallel(cls, ifupdownobj, ifacename, op):
        """ Configures interface in parallel.

        Executes all its direct dependents in parallel

        op is passed unchanged to run_iface_ops, ie it is the list of
        sub operations to run.

        Returns -1 if the interface is not found or if its dependents
        failed, else None.
        """

        ifupdownobj.logger.debug('%s: %s' %(ifacename, op))
        cls.accquire_token(ifacename)

        # Each iface can have a list of objects
        ifaceobjs = ifupdownobj.get_ifaceobjs(ifacename)
        if ifaceobjs is None:
            ifupdownobj.logger.warning('%s: ' %ifacename + 'not found')
            cls.release_token(ifacename)
            return -1

        for ifaceobj in ifaceobjs:
            # Run dependents
            dlist = ifaceobj.get_lowerifaces()
            if dlist:
                ifupdownobj.logger.debug('%s:' %ifacename +
                                         ' found dependents: %s' %str(dlist))
                try:
                    # give our token back while the dependents run
                    cls.release_token(ifacename)
                    cls.run_iface_list_parallel(ifacename, ifupdownobj,
                                                dlist, op)
                    cls.accquire_token(ifacename)
                except Exception as e:
                    # NOTE(review): when the error is ignored the token may
                    # not have been re-acquired here - confirm the final
                    # release below is still balanced
                    if not ifupdownobj.ignore_error(str(e)):
                        # Dont bring the iface up if children did not
                        # come up
                        ifupdownobj.logger.debug(
                            '%s: there was an error bringing %s'
                            ' dependents (%s)', ifacename, op, str(e))
                        first_op = (op[0] if isinstance(op, (list, tuple))
                                    else op)
                        ifupdownobj.set_iface_state(
                            ifaceobj, ifaceState.from_str(first_op),
                            ifaceStatus.ERROR)
                        return -1

            # Run all sub operations sequentially
            try:
                ifupdownobj.logger.debug('%s:' %ifacename +
                                         ' running sub-operations')
                cls.run_iface_ops(ifupdownobj, ifaceobj, op)
            except Exception as e:
                ifupdownobj.logger.error(
                    '%s:' %ifacename +
                    ' error running sub operations (%s)' %str(e))

        cls.release_token(ifacename)

    @classmethod
    def _start_iface_threads(cls, parent, ifupdownobj, ifacenames, op):
        """ Starts one run_iface_parallel thread per interface name and
        returns the started threads keyed by interface name """
        running_threads = OrderedDict()
        for ifacename in ifacenames:
            try:
                cls.accquire_token(parent)
                t = Thread(None, cls.run_iface_parallel, ifacename,
                           args=(ifupdownobj, ifacename, op))
                t.start()
                # only remember threads that really started, a thread
                # that never started cannot be joined
                running_threads[ifacename] = t
                cls.release_token(parent)
            except Exception as e:
                cls.release_token(parent)
                if not ifupdownobj.ignore_error(str(e)):
                    raise Exception('error starting thread for iface %s'
                                    %ifacename)
        return running_threads

    @classmethod
    def _wait_iface_threads(cls, ifupdownobj, running_threads):
        """ Joins the given threads and returns the number of interfaces
        whose status is not SUCCESS """
        err = 0
        for ifacename, t in running_threads.items():
            t.join()
            # Check status of thread
            # XXX: Check all objs
            if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
                err += 1
        return err

    @classmethod
    def run_iface_list_parallel(cls, parent, ifupdownobj, ifacenames, op):
        """ Runs interface list in parallel

        Returns the number of interfaces that did not end in SUCCESS.
        """

        running_threads = cls._start_iface_threads(parent, ifupdownobj,
                                                   ifacenames, op)
        ifupdownobj.logger.debug('%s ' %parent +
                                 'waiting for all the threads ...')
        return cls._wait_iface_threads(ifupdownobj, running_threads)

    @classmethod
    def run_iface_graphs_parallel(cls, parent, ifupdownobj, ifacenames, op):
        """ Runs iface graphs in parallel

        Returns the number of interfaces that did not end in SUCCESS.
        """

        # there is a single token pool; the graph runner shares it with
        # run_iface_list_parallel
        running_threads = cls._start_iface_threads(parent, ifupdownobj,
                                                   ifacenames, op)
        ifupdownobj.logger.info('%s ' %parent +
                                'waiting for all the threads ...')
        return cls._wait_iface_threads(ifupdownobj, running_threads)

    @classmethod
    def run_iface_dependency_graph_parallel(cls, ifupdownobj, dependency_graph,
                                            operation):
        """ Runs iface dependency graph in parallel.

        arguments:
        ifupdownobj -- ifupdown object (used for getting and updating iface
                        object state)
        dependency_graph -- dependency graph; its keys are interface names
        operation -- passed unchanged to run_iface_list_parallel

        Returns the number of root interfaces that did not end in SUCCESS.
        """

        ifupdownobj.logger.debug('running dependency graph in parallel ..')
        # Build a list of ifaces that dont have any dependencies
        run_queue = [ifacename for ifacename in dependency_graph
                     if ifupdownobj.get_iface_refcnt(ifacename) == 0]

        ifupdownobj.logger.debug('graph roots (interfaces that dont'
                                 ' have dependents):' + ' %s' %str(run_queue))
        cls.init_tokens(ifupdownobj.get_njobs())
        return cls.run_iface_list_parallel('main', ifupdownobj, run_queue,
                                           operation)

        # OR
        # Run one graph at a time, ie call run_iface_list_parallel once
        # per root interface with a single element list
512