]> git.proxmox.com Git - mirror_ifupdown2.git/blame - pkg/scheduler.py
More fixes and cleanup
[mirror_ifupdown2.git] / pkg / scheduler.py
CommitLineData
a6f80f0e 1#!/usr/bin/python
3e8ee54f 2#
3# Copyright 2013. Cumulus Networks, Inc.
4# Author: Roopa Prabhu, roopa@cumulusnetworks.com
5#
6# ifaceScheduler --
7# interface scheduler
8#
a6f80f0e 9
a6f80f0e 10from statemanager import *
11from iface import *
12from graph import *
13from collections import deque
14from collections import OrderedDict
a6f80f0e 15import logging
d08d5f54 16import traceback
a6f80f0e 17from graph import *
18from collections import deque
19from threading import *
20from ifupdownbase import *
21
d08d5f54 22class ifaceSchedulerFlags():
23 INORDER = 1
24 POSTORDER = 2
25
a6f80f0e 26class ifaceScheduler(ifupdownBase):
27 """ scheduler to schedule configuration of interfaces.
28
29
30 supports scheduling of interfaces serially in plain interface list
31 or dependency graph format.
32 """
33
d08d5f54 34
3e8ee54f 35 def __init__(self, force=False):
a6f80f0e 36 self.logger = logging.getLogger('ifupdown.' +
37 self.__class__.__name__)
3e8ee54f 38 self.FORCE = force
a6f80f0e 39
d08d5f54 40 def run_iface_op(self, ifupdownobj, ifaceobj, op, cenv):
a6f80f0e 41 """ Runs sub operation on an interface """
d08d5f54 42 ifacename = ifaceobj.get_name()
a6f80f0e 43
d08d5f54 44 if (ifaceobj.get_state() >= ifaceState.from_str(op) and
45 ifaceobj.get_status() == ifaceStatus.SUCCESS):
46 self.logger.debug('%s: already in state %s' %(ifacename, op))
47 return
a6f80f0e 48
d08d5f54 49 for mname in ifupdownobj.operations.get(op):
37c0543d 50 m = ifupdownobj.modules.get(mname)
a6f80f0e 51 err = 0
52 try:
d08d5f54 53 if hasattr(m, 'run'):
37c0543d 54 self.logger.debug('%s: %s : running module %s'
d08d5f54 55 %(ifacename, op, mname))
739f665b 56 if op == 'query-checkcurr':
d08d5f54 57 # Dont check curr if the interface object was
37c0543d 58 # auto generated
d08d5f54 59 if (ifaceobj.priv_flags & ifupdownobj.NOCONFIG):
37c0543d 60 continue
d08d5f54 61 m.run(ifaceobj, op,
62 query_ifaceobj=ifupdownobj.create_n_save_ifaceobjcurr(ifaceobj))
a6f80f0e 63 else:
d08d5f54 64 m.run(ifaceobj, op)
a6f80f0e 65 except Exception, e:
66 err = 1
3e8ee54f 67 self.log_error(str(e))
a6f80f0e 68 finally:
d08d5f54 69 if err == 1:
70 ifupdownobj.set_iface_state(ifaceobj,
71 ifaceState.from_str(op),
a6f80f0e 72 ifaceStatus.ERROR)
d08d5f54 73 else:
74 ifupdownobj.set_iface_state(ifaceobj,
75 ifaceState.from_str(op),
a6f80f0e 76 ifaceStatus.SUCCESS)
77
37c0543d 78 # execute /etc/network/ scripts
d08d5f54 79 mlist = ifupdownobj.operations_compat.get(op)
80 if not mlist:
81 return
82 for mname in mlist:
37c0543d 83 self.logger.debug('%s: %s : running script %s'
d08d5f54 84 %(ifacename, op, mname))
37c0543d 85 try:
86 self.exec_command(mname, cmdenv=cenv)
87 except Exception, e:
88 err = 1
89 self.log_error(str(e))
90
d08d5f54 91
92 def run_iface_ops(self, ifupdownobj, ifaceobj, ops):
a6f80f0e 93 """ Runs all sub operations on an interface """
94
95 # For backward compatibility execute scripts with
96 # environent set
d08d5f54 97 cenv = ifupdownobj.generate_running_env(ifaceobj, ops[0])
a6f80f0e 98
99 # Each sub operation has a module list
d08d5f54 100 [self.run_iface_op(ifupdownobj, ifaceobj, op, cenv)
101 for op in ops]
a6f80f0e 102
d08d5f54 103 def run_iface_graph(self, ifupdownobj, ifacename, ops,
104 order=ifaceSchedulerFlags.POSTORDER,
105 followdependents=True):
106 """ runs interface by traversing its dependents first """
a6f80f0e 107
d08d5f54 108 # Each ifacename can have a list of iface objects
a6f80f0e 109 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
d08d5f54 110 if ifaceobjs is None:
111 raise Exception('%s: not found' %ifacename)
a6f80f0e 112
113
d08d5f54 114 for ifaceobj in ifaceobjs:
115 if order == ifaceSchedulerFlags.INORDER:
116 # Run all sub operations sequentially
117 try:
118 self.run_iface_ops(ifupdownobj, ifaceobj, ops)
119 except Exception, e:
120 raise Exception(str(e))
121 # Run dependents
122 dlist = ifaceobj.get_dependents()
123 if dlist and len(dlist):
124 self.logger.debug('%s:' %ifacename +
125 ' found dependents: %s' %str(dlist))
126 try:
127 if not followdependents:
128 # XXX: this is yet another extra step,
129 # but is needed for interfaces that are
130 # implicit dependents
131 # up without dependents, but
132 new_dlist = [d for d in dlist
133 if ifupdownobj.is_iface_noconfig(d)]
134 if not new_dlist: continue
135 self.run_iface_list(ifupdownobj, new_dlist, ops,
136 order, followdependents)
137 else:
138 self.run_iface_list(ifupdownobj, dlist, ops,
139 order, followdependents)
140 except Exception, e:
141 if (self.ignore_error(str(e))):
142 pass
143 else:
144 # Dont bring the iface up if children did not come up
145 ifaceobj.set_state(ifaceState.NEW)
146 ifaceobj.set_status(ifaceStatus.ERROR)
147 raise
a6f80f0e 148
d08d5f54 149 if order == ifaceSchedulerFlags.POSTORDER:
150 try:
151 self.run_iface_ops(ifupdownobj, ifaceobj, ops)
152 except Exception, e:
153 raise Exception(str(e))
a6f80f0e 154
d08d5f54 155 def run_iface_list(self, ifupdownobj, ifacenames,
156 ops, order=ifaceSchedulerFlags.POSTORDER,
157 followdependents=True):
158 """ Runs interface list """
a6f80f0e 159
d08d5f54 160 for ifacename in ifacenames:
a6f80f0e 161 try:
d08d5f54 162 self.run_iface_graph(ifupdownobj, ifacename, ops,
163 order, followdependents)
a6f80f0e 164 except Exception, e:
d08d5f54 165 if (self.ignore_error(str(e))):
166 pass
167 else:
168 traceback.print_stack()
169 raise Exception('error running iface %s (%s)'
170 %(ifacename, str(e)))
171
172 def run_iface_dependency_graphs(self, ifupdownobj,
173 dependency_graph, ops, indegrees=None,
174 order=ifaceSchedulerFlags.POSTORDER,
175 followdependents=True):
176 """ Runs iface dependeny graph by visiting all the nodes
177
178 Parameters:
179 -----------
180 ifupdownobj : ifupdown object (used for getting and updating iface
181 object state)
182 dependency_graph : dependency graph in adjacency list
183 format (contains more than one dependency graph)
184 ops : list of operations to perform eg ['pre-up', 'up', 'post-up']
185
186 indegrees : indegree array if present is used to determine roots
187 of the graphs in the dependency_graph
188 """
189
190 self.logger.debug('running dependency graph serially ..')
a6f80f0e 191
d08d5f54 192 run_queue = []
193 # Build a list of ifaces that dont have any dependencies
194 if indegrees:
195 # use indegrees array if specified
196 for ifacename, degree in indegrees.items():
197 if not indegrees.get(ifacename):
198 run_queue.append(ifacename)
199 else:
200 for ifacename in dependency_graph.keys():
201 if not ifupdownobj.get_iface_refcnt(ifacename):
202 run_queue.append(ifacename)
203
204 self.logger.debug('graph roots (interfaces that dont have '
205 'dependents):' + ' %s' %str(run_queue))
206
207 return self.run_iface_list(ifupdownobj, run_queue, ops, order,
208 followdependents)
209
210
211 def run_iface(self, ifupdownobj, ifacename, ops):
212 """ Runs operation on an interface """
213
214 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
215 for i in ifaceobjs:
216 self.run_iface_ops(ifupdownobj, i, ops)
217
218 def run_iface_list_op(self, ifupdownobj, ifacenames, op,
a6f80f0e 219 sorted_by_dependency=False):
220 """ Runs interface list through sub operation handler. """
221
d08d5f54 222 self.logger.debug('running operation %s on all given interfaces'
223 %op)
a6f80f0e 224 iface_run_queue = deque(ifacenames)
225 for i in range(0, len(iface_run_queue)):
d08d5f54 226 if op.endswith('up'):
a6f80f0e 227 # XXX: simplify this
d08d5f54 228 if sorted_by_dependency:
a6f80f0e 229 ifacename = iface_run_queue.pop()
230 else:
231 ifacename = iface_run_queue.popleft()
232 else:
d08d5f54 233 if sorted_by_dependency:
a6f80f0e 234 ifacename = iface_run_queue.popleft()
235 else:
236 ifacename = iface_run_queue.pop()
237
238 try:
239 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
240 for ifaceobj in ifaceobjs:
a6f80f0e 241 cenv = ifupdownobj.generate_running_env(ifaceobj, op)
d08d5f54 242 self.run_iface_op(ifupdownobj, ifaceobj, op, cenv)
a6f80f0e 243 except Exception, e:
3e8ee54f 244 self.log_error(str(e))
a6f80f0e 245
d08d5f54 246 def run_iface_list_ops(self, ifupdownobj, ifacenames, ops,
a6f80f0e 247 sorted_by_dependency=False):
248 """ Runs interface list through sub operations handler
249
250 Unlike run_iface_list, this method executes a sub operation on the
251 entire interface list before proceeding to the next sub-operation.
252 ie operation 'pre-up' is run through the entire interface list before
253 'up'
254 """
255
a6f80f0e 256 # Each sub operation has a module list
d08d5f54 257 [self.run_iface_list_op(ifupdownobj, ifacenames, op,
258 sorted_by_dependency) for op in ops]
a6f80f0e 259
d08d5f54 260 def run_iface_dependency_graphs_sorted(self, ifupdownobj,
261 dependency_graphs,
262 ops, indegrees=None,
37c0543d 263 graphsortall=False):
d08d5f54 264 """ runs interface dependency graph by topologically sorting the interfaces """
a6f80f0e 265
37c0543d 266 if indegrees is None:
267 indegrees = OrderedDict()
268 for ifacename in dependency_graphs.keys():
269 indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)
a6f80f0e 270
d08d5f54 271 if self.logger.isEnabledFor(logging.DEBUG):
a6f80f0e 272 self.logger.debug('indegree array :')
37c0543d 273 self.logger.debug(ifupdownobj.pp.pformat(indegrees))
a6f80f0e 274
275 try:
276 self.logger.debug('calling topological sort on the graph ...')
d08d5f54 277 if graphsortall:
37c0543d 278 sorted_ifacenames = graph.topological_sort_graphs_all(
279 dependency_graphs, indegrees)
280 else:
281 sorted_ifacenames = graph.topological_sort_graphs(
282 dependency_graphs, indegrees)
cca03c30 283 except Exception:
a6f80f0e 284 raise
285
286 self.logger.debug('sorted iface list = %s' %sorted_ifacenames)
d08d5f54 287 self.run_iface_list_ops(ifupdownobj, sorted_ifacenames, ops,
288 sorted_by_dependency=True)
a6f80f0e 289
290
d08d5f54 291 """ Methods to execute interfaces in parallel """
a6f80f0e 292 def init_tokens(self, count):
293 self.token_pool = BoundedSemaphore(count)
294 self.logger.debug('initialized bounded semaphore with %d' %count)
295
296 def accquire_token(self, logprefix=''):
297 self.token_pool.acquire()
298 self.logger.debug('%s ' %logprefix + 'acquired token')
299
300 def release_token(self, logprefix=''):
301 self.token_pool.release()
302 self.logger.debug('%s ' %logprefix + 'release token')
303
304 def run_iface_parallel(self, ifupdownobj, ifacename, op):
305 """ Configures interface in parallel.
306
307 Executes all its direct dependents in parallel
308
309 """
310
311 self.logger.debug('%s:' %ifacename + ' %s' %op)
312 self.accquire_token(iface)
313
314 # Each iface can have a list of objects
315 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
316 if ifaceobjs is None:
317 self.logger.warning('%s: ' %ifacename + 'not found')
318 self.release_token(ifacename)
319 return -1
320
321 for ifaceobj in ifaceobjs:
322 # Run dependents
323 dlist = ifaceobj.get_dependents()
324 if dlist is not None and len(dlist) > 0:
325 self.logger.debug('%s:' %ifacename +
326 ' found dependents: %s' %str(dlist))
327 try:
328 self.release_token(ifacename)
329 self.run_iface_list_parallel(ifacename, ifupdownobj,
330 dlist, op)
331 self.accquire_token(ifacename)
332 except Exception, e:
d08d5f54 333 if self.ignore_error(str(e)):
a6f80f0e 334 pass
335 else:
336 # Dont bring the iface up if children did not come up
337 self.logger.debug('%s:' %ifacename +
338 ' there was an error bringing %s' %op +
339 ' dependents (%s)', str(e))
340 ifupdownobj.set_iface_state(ifaceobj,
d08d5f54 341 ifaceState.from_str(ops[0]),
a6f80f0e 342 ifaceStatus.ERROR)
343 return -1
344
a6f80f0e 345 # Run all sub operations sequentially
346 try:
347 self.logger.debug('%s:' %ifacename + ' running sub-operations')
d08d5f54 348 self.run_iface_ops(ifupdownobj, ifaceobj, op)
a6f80f0e 349 except Exception, e:
350 self.logger.error('%s:' %ifacename +
351 ' error running sub operations (%s)' %str(e))
352
353 self.release_token(ifacename)
354
355
356 def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op):
357 """ Runs interface list in parallel """
358
359 running_threads = OrderedDict()
360 err = 0
361
362 for ifacename in ifacenames:
363 try:
364 self.accquire_token(parent)
365 running_threads[ifacename] = Thread(None,
366 self.run_iface_parallel, ifacename,
367 args=(ifupdownobj, ifacename, op))
368 running_threads[ifacename].start()
369 self.release_token(parent)
370 except Exception, e:
371 self.release_token(parent)
d08d5f54 372 if ifupdownobj.ignore_error(str(e)):
a6f80f0e 373 pass
374 else:
375 raise Exception('error starting thread for iface %s'
376 %ifacename)
377
378
379 self.logger.debug('%s' %parent + 'waiting for all the threads ...')
380 for ifacename, t in running_threads.items():
381 t.join()
382 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
383 err += 1
384
385 return err
386
387 def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op):
388 """ Runs iface graphs in parallel """
389
390 running_threads = OrderedDict()
391 err = 0
392
393 for ifacename in ifacenames:
394 try:
395 self.accquire_graph_token(parent)
396 running_threads[ifacename] = Thread(None,
397 self.run_iface_parallel, ifacename,
398 args=(ifupdownobj, ifacename, op))
399 running_threads[ifacename].start()
400 self.release_graph_token(parent)
401 except Exception, e:
402 self.release_graph_token(parent)
d08d5f54 403 if ifupdownobj.ignore_error(str(e)):
a6f80f0e 404 pass
405 else:
406 raise Exception('error starting thread for iface %s'
407 %ifacename)
408
409 self.logger.info('%s' %parent + 'waiting for all the threads ...')
410 for ifacename, t in running_threads.items():
411 t.join()
412 # Check status of thread
413 # XXX: Check all objs
414 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
415 err += 1
416
417 return err
418
419 def run_iface_dependency_graph_parallel(self, ifupdownobj, dependency_graph,
420 operation):
421 """ Runs iface dependeny graph in parallel.
422
423 arguments:
424 ifupdownobj -- ifupdown object (used for getting and updating iface
425 object state)
426 dependency_graph -- dependency graph with
427 operation -- 'up' or 'down' or 'query'
428
429 """
430
431 self.logger.debug('running dependency graph in parallel ..')
432
433 run_queue = []
434
435 # Build a list of ifaces that dont have any dependencies
436 for ifacename in dependency_graph.keys():
437 if ifupdownobj.get_iface_refcnt(ifacename) == 0:
438 run_queue.append(ifacename)
439
d08d5f54 440 self.logger.debug('graph roots (interfaces that dont'
441 ' have dependents):' + ' %s' %str(run_queue))
a6f80f0e 442
443 self.init_tokens(ifupdownobj.get_njobs())
444
445 return self.run_iface_list_parallel('main', ifupdownobj, run_queue,
446 operation)
447
448 # OR
449 # Run one graph at a time
450 #for iface in run_queue:
451 # self.run_iface_list_parallel('main', ifupdownobj, [iface],
452 # operation)
453