]> git.proxmox.com Git - mirror_ifupdown2.git/blame - pkg/scheduler.py
minor fixes + cleanup + update manpage
[mirror_ifupdown2.git] / pkg / scheduler.py
CommitLineData
#!/usr/bin/python
#
# Copyright 2013. Cumulus Networks, Inc.
# Author: Roopa Prabhu, roopa@cumulusnetworks.com
#
# ifaceScheduler --
#    interface scheduler
#
a6f80f0e 9
a6f80f0e 10from statemanager import *
11from iface import *
12from graph import *
13from collections import deque
14from collections import OrderedDict
a6f80f0e 15import logging
d08d5f54 16import traceback
a6f80f0e 17from graph import *
18from collections import deque
19from threading import *
20from ifupdownbase import *
21
class ifaceSchedulerFlags():
    """ Traversal orders accepted by the scheduler's 'order' arguments.

    INORDER   : an interface is run before its dependents
    POSTORDER : an interface is run after its dependents
    """

    INORDER, POSTORDER = range(1, 3)
25
class ifaceScheduler(ifupdownBase):
    """ scheduler to schedule configuration of interfaces.

    supports scheduling of interfaces serially in plain interface list
    or dependency graph format.
    """

    def __init__(self, force=False):
        """ Creates the scheduler logger and records the 'force' flag """
        self.logger = logging.getLogger('ifupdown.' +
                                        self.__class__.__name__)
        self.FORCE = force

    def run_iface_op(self, ifupdownobj, ifaceobj, op, cenv):
        """ Runs sub operation on an interface

        Every module listed in ifupdownobj.operations for 'op' that has a
        'run' attribute is run on ifaceobj, and the interface state is
        recorded after each module. The commands listed in
        ifupdownobj.operations_compat for 'op' are then executed with
        'cenv' as their environment.

        Nothing is done when the interface already reached the state for
        'op' with status SUCCESS.
        """
        ifacename = ifaceobj.get_name()

        if (ifaceobj.get_state() >= ifaceState.from_str(op) and
                ifaceobj.get_status() == ifaceStatus.SUCCESS):
            self.logger.debug('%s: already in state %s' %(ifacename, op))
            return

        # An op without any registered module must not abort the run
        for mname in ifupdownobj.operations.get(op) or []:
            m = ifupdownobj.modules.get(mname)
            status = ifaceStatus.SUCCESS
            try:
                if hasattr(m, 'run'):
                    self.logger.debug('%s: %s : running module %s'
                                      %(ifacename, op, mname))
                    if op == 'query-checkcurr':
                        # Dont check curr if the interface object was
                        # auto generated
                        if (ifaceobj.priv_flags & ifupdownobj.NOCONFIG):
                            continue
                        m.run(ifaceobj, op,
                              query_ifaceobj=
                              ifupdownobj.create_n_save_ifaceobjcurr(
                                  ifaceobj))
                    else:
                        m.run(ifaceobj, op)
            except Exception as e:
                status = ifaceStatus.ERROR
                self.log_error(str(e))
            finally:
                # Runs on every exit from the try block (including the
                # 'continue' above and an exception leaving log_error),
                # so the state is always recorded
                ifupdownobj.set_iface_state(ifaceobj,
                                            ifaceState.from_str(op),
                                            status)

        # execute /etc/network/ scripts
        for mname in ifupdownobj.operations_compat.get(op) or []:
            self.logger.debug('%s: %s : running script %s'
                              %(ifacename, op, mname))
            try:
                self.exec_command(mname, cmdenv=cenv)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_ops(self, ifupdownobj, ifaceobj, ops):
        """ Runs all sub operations on an interface

        'ops' is a list of sub operations; an empty list is a no-op.
        """
        if not ops:
            return

        # For backward compatibility execute scripts with
        # environment set
        cenv = ifupdownobj.generate_running_env(ifaceobj, ops[0])

        # Each sub operation has a module list
        for op in ops:
            self.run_iface_op(ifupdownobj, ifaceobj, op, cenv)

    def _upperifaces_permit_run(self, ifupdownobj, ifacename, ifaceobj,
                                ops, parent):
        """ Returns False when 'ops' must be skipped because of an upper
        interface (other than 'parent') of ifaceobj; True otherwise """
        ulist = ifaceobj.get_upperifaces()
        if not ulist:
            return True

        self.logger.debug('%s: parent = %s, ulist = %s'
                          %(ifacename, parent, ulist))
        tmpulist = ([u for u in ulist if u != parent] if parent
                    else ulist)
        if not tmpulist:
            return True

        self.logger.debug('%s: parent = %s, tmpulist = %s'
                          %(ifacename, parent, tmpulist))
        if 'down' in ops[0]:
            # XXX: This is expensive. Find a cheaper way to do this
            # if any of the upperdevs are present,
            # dont down this interface
            for u in tmpulist:
                if self.link_exists(u):
                    if not ifupdownobj.ALL:
                        self.logger.warning('%s: skip interface '
                                            'down upperiface %s still around'
                                            %(ifacename, u))
                    # NOTE(review): the nesting of this return was lost
                    # in the source dump; it is placed so the down is
                    # skipped whenever an upper iface exists, with only
                    # the warning depending on ALL -- confirm
                    return False
        elif 'up' in ops[0] and not ifupdownobj.ALL:
            # For 'up', just warn that there is an upperdev which is
            # probably not up
            for u in tmpulist:
                if not self.link_exists(u):
                    self.logger.warning('%s: upper iface %s does not'
                                        ' exist' %(ifacename, u))
        return True

    def run_iface_graph(self, ifupdownobj, ifacename, ops, parent=None,
                        order=ifaceSchedulerFlags.POSTORDER,
                        followdependents=True):
        """ runs interface by traversing all nodes rooted at itself

        Parameters:
        -----------
        ifupdownobj : ifupdown object used to look up iface objects
        ifacename : name of the interface at the root of the traversal
        ops : list of sub operations, eg ['pre-up', 'up', 'post-up']
        parent : name of the interface this one was reached from; it is
                 excluded from the upper interface checks
        order : ifaceSchedulerFlags.INORDER runs the interface before its
                dependents, POSTORDER runs it after them
        followdependents : when False only dependents for which
                ifupdownobj.is_iface_noconfig() is true are followed

        Raises an Exception when the interface is not known, or when
        running the dependents failed and the error cannot be ignored.
        """

        # minor optimization. If operation is 'down', proceed only
        # if interface exists in the system
        if 'down' in ops[0] and not self.link_exists(ifacename):
            self.logger.info('%s: does not exist' %ifacename)
            return

        # Each ifacename can have a list of iface objects
        ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
        if ifaceobjs is None:
            raise Exception('%s: not found' %ifacename)

        for ifaceobj in ifaceobjs:
            # Deal with upperdevs first
            if not self._upperifaces_permit_run(ifupdownobj, ifacename,
                                                ifaceobj, ops, parent):
                return

            if order == ifaceSchedulerFlags.INORDER:
                # If inorder, run the iface first and then its dependents
                self.run_iface_ops(ifupdownobj, ifaceobj, ops)

            # Run lowerifaces or dependents
            dlist = ifaceobj.get_lowerifaces()
            if dlist:
                self.logger.info('%s:' %ifacename +
                                 ' found dependents: %s' %str(dlist))
                try:
                    if not followdependents:
                        # XXX: this is yet another extra step,
                        # but is needed for interfaces that are
                        # implicit dependents. even though we are asked to
                        # not follow dependents, we must follow the ones
                        # that dont have user given config. Because we
                        # own them
                        dlist = [d for d in dlist
                                 if ifupdownobj.is_iface_noconfig(d)]
                    if dlist:
                        self.run_iface_list(ifupdownobj, dlist, ops,
                                            ifacename, order,
                                            followdependents,
                                            continueonfailure=False)
                except Exception as e:
                    if not self.ignore_error(str(e)):
                        # Dont bring the iface up if children did not
                        # come up
                        ifaceobj.set_state(ifaceState.NEW)
                        ifaceobj.set_status(ifaceStatus.ERROR)
                        raise

            if order == ifaceSchedulerFlags.POSTORDER:
                self.run_iface_ops(ifupdownobj, ifaceobj, ops)

    def run_iface_list(self, ifupdownobj, ifacenames,
                       ops, parent=None, order=ifaceSchedulerFlags.POSTORDER,
                       followdependents=True, continueonfailure=True):
        """ Runs interface list

        Each name in 'ifacenames' is run through run_iface_graph. With
        continueonfailure a failure is logged and the next interface is
        tried; otherwise a failure that cannot be ignored is re-raised
        as an Exception naming the interface.
        """

        for ifacename in ifacenames:
            try:
                self.run_iface_graph(ifupdownobj, ifacename, ops, parent,
                                     order, followdependents)
            except Exception as e:
                if continueonfailure:
                    self.logger.error('%s : %s' %(ifacename, str(e)))
                elif not self.ignore_error(str(e)):
                    raise Exception('error running iface %s (%s)'
                                    %(ifacename, str(e)))

    def run_iface_dependency_graphs(self, ifupdownobj,
                                    dependency_graph, ops, indegrees=None,
                                    order=ifaceSchedulerFlags.POSTORDER,
                                    followdependents=True):
        """ Runs iface dependency graph by visiting all the nodes

        Parameters:
        -----------
        ifupdownobj : ifupdown object (used for getting and updating iface
                                        object state)
        dependency_graph : dependency graph in adjacency list
                            format (contains more than one dependency graph)
        ops : list of operations to perform eg ['pre-up', 'up', 'post-up']

        indegrees : indegree array if present is used to determine roots
                    of the graphs in the dependency_graph
        order : traversal order, see ifaceSchedulerFlags
        followdependents : passed through to run_iface_list
        """
        if indegrees is None:
            indegrees = OrderedDict()
            for ifacename in dependency_graph.keys():
                indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)

        # The sort is handed a copy; 'indegrees' itself is read below
        sorted_ifacenames = graph.topological_sort_graphs_all(
            dependency_graph, dict(indegrees))
        self.logger.debug('sorted ifacenames %s : ' %str(sorted_ifacenames))

        # Build a list of ifaces that dont have any dependencies
        run_queue = [ifacename for ifacename in sorted_ifacenames
                     if not indegrees.get(ifacename)]

        self.logger.info('graph roots (interfaces that dont have '
                         'dependents):' + ' %s' %str(run_queue))

        return self.run_iface_list(ifupdownobj, run_queue, ops,
                                   parent=None, order=order,
                                   followdependents=followdependents)

    def run_iface(self, ifupdownobj, ifacename, ops):
        """ Runs operation on an interface

        Raises an Exception when no iface objects are known for
        'ifacename' (same behaviour as run_iface_graph).
        """

        ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
        if ifaceobjs is None:
            raise Exception('%s: not found' %ifacename)
        for ifaceobj in ifaceobjs:
            self.run_iface_ops(ifupdownobj, ifaceobj, ops)

    def run_iface_list_op(self, ifupdownobj, ifacenames, op,
                          sorted_by_dependency=False):
        """ Runs interface list through sub operation handler.

        The list is consumed from the right end when 'op' ends with 'up'
        and the list is sorted by dependency, or when neither holds;
        otherwise it is consumed from the left end.
        """

        self.logger.debug('running operation %s on all given interfaces'
                          %op)
        iface_run_queue = deque(ifacenames)
        take_from_right = (op.endswith('up') == bool(sorted_by_dependency))
        while iface_run_queue:
            if take_from_right:
                ifacename = iface_run_queue.pop()
            else:
                ifacename = iface_run_queue.popleft()

            try:
                ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
                for ifaceobj in ifaceobjs:
                    cenv = ifupdownobj.generate_running_env(ifaceobj, op)
                    self.run_iface_op(ifupdownobj, ifaceobj, op, cenv)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_list_ops(self, ifupdownobj, ifacenames, ops,
                           sorted_by_dependency=False):
        """ Runs interface list through sub operations handler

        Unlike run_iface_list, this method executes a sub operation on the
        entire interface list before proceeding to the next sub-operation.
        ie operation 'pre-up' is run through the entire interface list before
        'up'
        """

        # Each sub operation has a module list
        for op in ops:
            self.run_iface_list_op(ifupdownobj, ifacenames, op,
                                   sorted_by_dependency)

    def run_iface_dependency_graphs_sorted(self, ifupdownobj,
                                           dependency_graphs,
                                           ops, indegrees=None,
                                           graphsortall=False):
        """ runs interface dependency graph by topologically sorting the
        interfaces

        When 'indegrees' is not given it is built from
        ifupdownobj.get_iface_refcnt(). 'graphsortall' selects
        graph.topological_sort_graphs_all over
        graph.topological_sort_graphs. Errors from the sort propagate to
        the caller.
        """

        if indegrees is None:
            indegrees = OrderedDict()
            for ifacename in dependency_graphs.keys():
                indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug('indegree array :')
            self.logger.debug(ifupdownobj.pp.pformat(indegrees))

        self.logger.debug('calling topological sort on the graph ...')
        if graphsortall:
            sorted_ifacenames = graph.topological_sort_graphs_all(
                dependency_graphs, indegrees)
        else:
            sorted_ifacenames = graph.topological_sort_graphs(
                dependency_graphs, indegrees)

        self.logger.debug('sorted iface list = %s' %sorted_ifacenames)
        self.run_iface_list_ops(ifupdownobj, sorted_ifacenames, ops,
                                sorted_by_dependency=True)

    # Methods to execute interfaces in parallel

    def init_tokens(self, count):
        """ Creates the token pool that bounds concurrently running work """
        self.token_pool = BoundedSemaphore(count)
        self.logger.debug('initialized bounded semaphore with %d' %count)

    def accquire_token(self, logprefix=''):
        """ Takes a token from the pool, blocking until one is free.

        (method name spelling is kept as-is for existing callers)
        """
        self.token_pool.acquire()
        self.logger.debug('%s ' %logprefix + 'acquired token')

    def release_token(self, logprefix=''):
        """ Returns a token to the pool """
        self.token_pool.release()
        self.logger.debug('%s ' %logprefix + 'release token')

    def run_iface_parallel(self, ifupdownobj, ifacename, op):
        """ Configures interface in parallel.

        Executes all its direct dependents in parallel

        'op' may be a single sub operation or a list of them. Returns -1
        when the interface is not found or its dependents failed with an
        error that cannot be ignored; otherwise returns None.
        """

        self.logger.debug('%s: %s' %(ifacename, op))

        # run_iface_ops expects a list of sub operations
        ops = op if isinstance(op, (list, tuple)) else [op]

        self.accquire_token(ifacename)
        try:
            # Each iface can have a list of objects
            ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
            if ifaceobjs is None:
                self.logger.warning('%s: ' %ifacename + 'not found')
                return -1

            for ifaceobj in ifaceobjs:
                # Run dependents
                dlist = ifaceobj.get_lowerifaces()
                if dlist:
                    self.logger.debug('%s:' %ifacename +
                                      ' found dependents: %s' %str(dlist))
                    dep_error = None
                    # Give the token back while the dependents run and
                    # always take it again afterwards, so the release
                    # at the end of this method stays balanced
                    self.release_token(ifacename)
                    try:
                        self.run_iface_list_parallel(ifacename, ifupdownobj,
                                                     dlist, op)
                    except Exception as e:
                        dep_error = e
                    finally:
                        self.accquire_token(ifacename)

                    if (dep_error is not None and
                            not self.ignore_error(str(dep_error))):
                        # Dont bring the iface up if children did not
                        # come up
                        self.logger.debug('%s: there was an error bringing'
                                          ' %s dependents (%s)'
                                          %(ifacename, op, str(dep_error)))
                        ifupdownobj.set_iface_state(
                            ifaceobj, ifaceState.from_str(ops[0]),
                            ifaceStatus.ERROR)
                        return -1

                # Run all sub operations sequentially
                try:
                    self.logger.debug('%s:' %ifacename +
                                      ' running sub-operations')
                    self.run_iface_ops(ifupdownobj, ifaceobj, ops)
                except Exception as e:
                    self.logger.error('%s:' %ifacename +
                                      ' error running sub operations (%s)'
                                      %str(e))
        finally:
            self.release_token(ifacename)

    def _run_ifaces_in_threads(self, parent, ifupdownobj, ifacenames, op,
                               log):
        """ Starts one run_iface_parallel thread per interface name, waits
        for all of them and returns the number of interfaces whose status
        is not SUCCESS. 'log' is the logger method used for the wait
        message. """
        running_threads = OrderedDict()

        for ifacename in ifacenames:
            self.accquire_token(parent)
            try:
                t = Thread(None, self.run_iface_parallel, ifacename,
                           args=(ifupdownobj, ifacename, op))
                t.start()
                # Track only threads that really started; joining an
                # unstarted thread raises RuntimeError
                running_threads[ifacename] = t
            except Exception as e:
                if not ifupdownobj.ignore_error(str(e)):
                    raise Exception('error starting thread for iface %s'
                                    %ifacename)
            finally:
                self.release_token(parent)

        log('%s: waiting for all the threads ...' %parent)
        err = 0
        for ifacename, t in running_threads.items():
            t.join()
            # Check status of thread
            # XXX: Check all objs
            if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
                err += 1

        return err

    def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op):
        """ Runs interface list in parallel

        Returns the number of interfaces that did not end with status
        SUCCESS.
        """
        return self._run_ifaces_in_threads(parent, ifupdownobj, ifacenames,
                                           op, self.logger.debug)

    def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op):
        """ Runs iface graphs in parallel

        Uses the same token pool as run_iface_list_parallel. Returns the
        number of interfaces that did not end with status SUCCESS.
        """
        return self._run_ifaces_in_threads(parent, ifupdownobj, ifacenames,
                                           op, self.logger.info)

    def run_iface_dependency_graph_parallel(self, ifupdownobj, dependency_graph,
                                            operation):
        """ Runs iface dependency graph in parallel.

        arguments:
        ifupdownobj -- ifupdown object (used for getting and updating iface
                                        object state)
        dependency_graph -- dependency graph keyed by interface name
        operation -- 'up' or 'down' or 'query'

        Returns the error count reported by run_iface_list_parallel.
        """

        self.logger.debug('running dependency graph in parallel ..')

        # Build a list of ifaces that dont have any dependencies
        run_queue = [ifacename for ifacename in dependency_graph.keys()
                     if ifupdownobj.get_iface_refcnt(ifacename) == 0]

        self.logger.debug('graph roots (interfaces that dont'
                          ' have dependents):' + ' %s' %str(run_queue))

        self.init_tokens(ifupdownobj.get_njobs())

        # Alternative (not enabled): run one graph at a time by calling
        # run_iface_list_parallel('main', ifupdownobj, [iface], operation)
        # for each iface in run_queue
        return self.run_iface_list_parallel('main', ifupdownobj, run_queue,
                                            operation)
492