]> git.proxmox.com Git - mirror_ifupdown2.git/blame - pkg/scheduler.py
More fixes and changes
[mirror_ifupdown2.git] / pkg / scheduler.py
CommitLineData
a6f80f0e 1#!/usr/bin/python
3e8ee54f 2#
3# Copyright 2013. Cumulus Networks, Inc.
4# Author: Roopa Prabhu, roopa@cumulusnetworks.com
5#
6# ifaceScheduler --
7# interface scheduler
8#
a6f80f0e 9
10import os
11import re
12from statemanager import *
13from iface import *
14from graph import *
15from collections import deque
16from collections import OrderedDict
17import imp
18import pprint
19import logging
20from graph import *
21from collections import deque
22from threading import *
23from ifupdownbase import *
24
class ifaceScheduler(ifupdownBase):
    """ scheduler to schedule configuration of interfaces.

    supports scheduling of interfaces serially in plain interface list
    or dependency graph format.

    Every run_* method takes the ifupdown object (ifupdownobj) that owns
    the interface objects, the module tables and the state bookkeeping;
    the scheduler itself only keeps a logger, the force flag and (for the
    parallel runners) a token pool.
    """

    def __init__(self, force=False):
        """ Sets up the class logger and records the force flag.

        force -- stored as self.FORCE. Note that the state transition
                 checks in this class consult ifupdownobj.FORCE, not this
                 attribute.
        """
        self.logger = logging.getLogger('ifupdown.' +
                                        self.__class__.__name__)
        self.FORCE = force

    def _skip_state_transition(self, ifupdownobj, ifaceobj, op):
        """ Returns a true value when op must be skipped for ifaceobj.

        An interface is skipped when op is not 'query', state checking is
        enabled, the transition to op is reported as invalid and force is
        not set on ifupdownobj.
        """
        # The explicit ==True/==False comparisons are kept on purpose: the
        # helpers live outside this file, so their exact return values are
        # not known here and plain truthiness could change behaviour.
        return (op != 'query' and ifupdownobj.STATE_CHECK == True and
                ifupdownobj.is_valid_state_transition(ifaceobj,
                                                      op) == False and
                ifupdownobj.FORCE == False)

    def _ordered_ifacenames(self, ifacenames, op, sorted_by_dependency):
        """ Returns ifacenames as a new list in the order they must run.

        op 'up' on a plain list and any other op on a dependency sorted
        list run front to back; op 'up' on a dependency sorted list and
        any other op on a plain list run back to front.
        """
        ordered = list(ifacenames)
        if (op == 'up') == bool(sorted_by_dependency):
            ordered.reverse()
        return ordered

    def run_iface_subop(self, ifupdownobj, ifaceobj, op, subop, mlist, cenv):
        """ Runs sub operation on an interface

        ifupdownobj -- ifupdown object (modules, compat scripts, state)
        ifaceobj    -- interface object the sub operation is applied to
        op          -- operation name
        subop       -- sub operation name handed to every module's run()
        mlist       -- names of the modules to run for this sub operation
        cenv        -- environment passed to the compat scripts

        Errors raised by a module or a script are reported through
        self.log_error() and do not stop the remaining modules/scripts.
        """

        self.logger.debug('%s: op %s subop = %s'
                          % (ifaceobj.get_name(), op, subop))

        for mname in mlist:
            m = ifupdownobj.modules.get(mname)
            status = ifaceStatus.SUCCESS
            try:
                if hasattr(m, 'run'):
                    self.logger.debug('%s: %s : running module %s'
                                      % (ifaceobj.get_name(), subop, mname))
                    if op == 'query-checkcurr':
                        # Dont check state if the interface object was
                        # auto generated
                        if ((ifaceobj.priv_flags &
                                ifupdownobj.BUILTIN) != 0 or
                                (ifaceobj.priv_flags &
                                 ifupdownobj.NOCONFIG) != 0):
                            continue
                        m.run(ifaceobj, subop,
                              query_ifaceobj=ifupdownobj.create_ifaceobjcurr(
                                  ifaceobj))
                    else:
                        m.run(ifaceobj, subop)
            except Exception as e:
                status = ifaceStatus.ERROR
                self.log_error(str(e))
            finally:
                # Operations whose name starts with 'query' never update
                # the recorded interface state.
                if op[:5] != 'query':
                    ifupdownobj.set_iface_state(ifaceobj,
                                                ifaceState.from_str(subop),
                                                status)

        # execute /etc/network/ scripts
        subop_dict = ifupdownobj.operations_compat.get(op)
        if subop_dict is None:
            return
        # A missing entry for this sub operation means there is nothing
        # to execute (iterating None would raise TypeError).
        for script in subop_dict.get(subop) or []:
            self.logger.debug('%s: %s : running script %s'
                              % (ifaceobj.get_name(), subop, script))
            try:
                self.exec_command(script, cmdenv=cenv)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_subops(self, ifupdownobj, ifaceobj, op):
        """ Runs all sub operations on an interface """

        # For backward compatibility execute scripts with
        # environent set
        cenv = ifupdownobj.generate_running_env(ifaceobj, op)

        # Each sub operation has a module list
        subopdict = ifupdownobj.operations.get(op)
        for subop, mlist in subopdict.items():
            self.run_iface_subop(ifupdownobj, ifaceobj, op, subop, mlist,
                                 cenv)

    def run_iface(self, ifupdownobj, ifacename, op):
        """ Runs operation on an interface

        Every interface object registered under ifacename is run through
        all sub operations of op, unless the state check says it must be
        skipped, in which case a warning is logged instead.
        """

        for ifaceobj in ifupdownobj.get_iface_objs(ifacename):
            if self._skip_state_transition(ifupdownobj, ifaceobj, op):
                self.logger.warning('%s already %s' % (ifacename, op))
                continue

            self.run_iface_subops(ifupdownobj, ifaceobj, op)

    def run_iface_list(self, ifupdownobj, ifacenames, operation,
                       sorted_by_dependency=False):
        """ Runs interface list serially executing all sub operations on
        each interface at a time.

        Exceptions raised while running one interface are reported via
        self.log_error() and the remaining interfaces are still run.
        """

        self.logger.debug('run_iface_list: running interface list for ' +
                          'operation %s' % operation)

        for ifacename in self._ordered_ifacenames(ifacenames, operation,
                                                  sorted_by_dependency):
            try:
                self.run_iface(ifupdownobj, ifacename, operation)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_list_subop(self, ifupdownobj, ifacenames, op, subop, mdict,
                             sorted_by_dependency=False):
        """ Runs interface list through sub operation handler.

        mdict is handed unchanged to run_iface_subop() as its module list.
        Exceptions raised for one interface are reported via
        self.log_error() and the remaining interfaces are still run.
        """

        self.logger.debug('running sub operation %s on all given interfaces'
                          % subop)

        for ifacename in self._ordered_ifacenames(ifacenames, op,
                                                  sorted_by_dependency):
            try:
                for ifaceobj in ifupdownobj.get_iface_objs(ifacename):
                    if self._skip_state_transition(ifupdownobj, ifaceobj,
                                                   op):
                        # The interface is skipped on every stage; warn
                        # only on the post-* stage so it is logged once.
                        if subop == 'post-down' or subop == 'post-up':
                            self.logger.warning('%s:  already %s'
                                                % (ifacename, op))
                        continue

                    cenv = ifupdownobj.generate_running_env(ifaceobj, op)
                    self.run_iface_subop(ifupdownobj, ifaceobj, op, subop,
                                         mdict, cenv)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_list_stages(self, ifupdownobj, ifacenames, op,
                              sorted_by_dependency=False):
        """ Runs interface list through sub operations handler

        Unlike run_iface_list, this method executes a sub operation on the
        entire interface list before proceeding to the next sub-operation.
        ie operation 'pre-up' is run through the entire interface list before
        'up'
        """

        # Each sub operation has a module list
        subopdict = ifupdownobj.operations.get(op)
        for subop, mdict in subopdict.items():
            self.run_iface_list_subop(ifupdownobj, ifacenames, op, subop,
                                      mdict, sorted_by_dependency)

    def run_iface_dependency_graph(self, ifupdownobj, dependency_graphs,
                                   operation, indegrees=None,
                                   graphsortall=False):
        """ runs interface dependency graph

        dependency_graphs -- mapping keyed by interface name
        operation         -- operation to run on the sorted interfaces
        indegrees         -- optional mapping of interface name to
                             indegree; built from
                             ifupdownobj.get_iface_refcnt() when None
        graphsortall      -- selects graph.topological_sort_graphs_all()
                             instead of graph.topological_sort_graphs()

        Exceptions raised by the topological sort propagate to the caller.
        """

        if indegrees is None:
            indegrees = OrderedDict()
            for ifacename in dependency_graphs.keys():
                indegrees[ifacename] = ifupdownobj.get_iface_refcnt(
                    ifacename)

        # pformat is only worth calling when the output will be logged
        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug('indegree array :')
            self.logger.debug(ifupdownobj.pp.pformat(indegrees))

        self.logger.debug('calling topological sort on the graph ...')
        if graphsortall:
            sorted_ifacenames = graph.topological_sort_graphs_all(
                dependency_graphs, indegrees)
        else:
            sorted_ifacenames = graph.topological_sort_graphs(
                dependency_graphs, indegrees)

        self.logger.debug('sorted iface list = %s' % sorted_ifacenames)

        # Staged run: each sub operation goes through the whole sorted
        # list before the next sub operation starts.
        self.run_iface_list_stages(ifupdownobj, sorted_ifacenames, operation,
                                   sorted_by_dependency=True)

    def init_tokens(self, count):
        """ Creates the token pool (a bounded semaphore of size count)
        used by the parallel runners. """
        self.token_pool = BoundedSemaphore(count)
        self.logger.debug('initialized bounded semaphore with %d' % count)

    def accquire_token(self, logprefix=''):
        """ Takes one token from the pool, blocking until one is free.

        logprefix only labels the debug message.
        """
        self.token_pool.acquire()
        self.logger.debug('%s acquired token' % logprefix)

    def release_token(self, logprefix=''):
        """ Returns one token to the pool.

        logprefix only labels the debug message.
        """
        self.token_pool.release()
        self.logger.debug('%s release token' % logprefix)

    def run_iface_parallel(self, ifupdownobj, ifacename, op):
        """ Configures interface in parallel.

        Executes all its direct dependents in parallel

        Returns -1 when the interface is not found or when running its
        dependents raised an error that is not ignored; returns None
        otherwise.
        """

        self.logger.debug('%s: %s' % (ifacename, op))
        self.accquire_token(ifacename)
        # Tracks whether this thread currently owns a token so that it is
        # released exactly once on every exit path.
        token_held = True

        try:
            # Each iface can have a list of objects
            ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
            if ifaceobjs is None:
                self.logger.warning('%s: ' % ifacename + 'not found')
                return -1

            for ifaceobj in ifaceobjs:
                # Run dependents
                dlist = ifaceobj.get_dependents()
                if dlist is not None and len(dlist) > 0:
                    self.logger.debug('%s: found dependents: %s'
                                      % (ifacename, str(dlist)))
                    # Hand the token back while waiting on the dependents
                    # so they can use it.
                    self.release_token(ifacename)
                    token_held = False
                    try:
                        # NOTE(review): the failure count returned here is
                        # not inspected; only exceptions stop this iface.
                        self.run_iface_list_parallel(ifacename, ifupdownobj,
                                                     dlist, op)
                    except Exception as e:
                        if self.ignore_error(str(e)) == True:
                            pass
                        else:
                            # Dont bring the iface up if children did not
                            # come up
                            self.logger.debug(
                                '%s: there was an error bringing %s'
                                ' dependents (%s)'
                                % (ifacename, op, str(e)))
                            ifupdownobj.set_iface_state(
                                ifaceobj,
                                ifaceState.from_str(
                                    ifupdownobj.get_subops(op)[0]),
                                ifaceStatus.ERROR)
                            return -1
                    # Take the token back before configuring this iface,
                    # also when a dependents error was ignored.
                    self.accquire_token(ifacename)
                    token_held = True

                if self._skip_state_transition(ifupdownobj, ifaceobj, op):
                    self.logger.warning('%s: already %s' % (ifacename, op))
                    continue

                # Run all sub operations sequentially
                try:
                    self.logger.debug('%s:' % ifacename +
                                      ' running sub-operations')
                    self.run_iface_subops(ifupdownobj, ifaceobj, op)
                except Exception as e:
                    self.logger.error('%s: error running sub operations (%s)'
                                      % (ifacename, str(e)))
        finally:
            if token_held:
                self.release_token(ifacename)

    def _run_threads(self, parent, ifupdownobj, ifacenames, op,
                     acquire, release, log):
        """ Starts one run_iface_parallel thread per name and joins them.

        acquire/release -- callables taking a log prefix; they bracket
                           the creation and start of every thread
        log             -- logger method used for the 'waiting' message

        Returns the number of joined interfaces whose status is not
        ifaceStatus.SUCCESS. Raises Exception when a thread cannot be
        started and the error is not ignored by ifupdownobj.
        """
        running_threads = OrderedDict()

        for ifacename in ifacenames:
            acquire(parent)
            try:
                t = Thread(None, self.run_iface_parallel, ifacename,
                           args=(ifupdownobj, ifacename, op))
                t.start()
                # Register only started threads: joining a thread that
                # was never started raises RuntimeError.
                running_threads[ifacename] = t
            except Exception as e:
                if ifupdownobj.ignore_error(str(e)) == True:
                    pass
                else:
                    raise Exception('error starting thread for iface %s'
                                    % ifacename)
            finally:
                release(parent)

        log('%s: waiting for all the threads ...' % parent)
        err = 0
        for ifacename, t in running_threads.items():
            t.join()
            # Check status of thread
            # XXX: Check all objs
            if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
                err += 1

        return err

    def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op):
        """ Runs interface list in parallel

        parent is only used as a log prefix. Returns the number of
        interfaces whose status is not ifaceStatus.SUCCESS after their
        threads have finished.
        """
        return self._run_threads(parent, ifupdownobj, ifacenames, op,
                                 self.accquire_token, self.release_token,
                                 self.logger.debug)

    def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op):
        """ Runs iface graphs in parallel

        Same as run_iface_list_parallel but guarded by the graph token
        methods. Returns the number of interfaces whose status is not
        ifaceStatus.SUCCESS after their threads have finished.
        """
        # NOTE(review): accquire_graph_token/release_graph_token are not
        # defined in this class; presumably the base class provides them
        # -- confirm. The lambdas defer the attribute lookup until a
        # thread is actually about to be started.
        return self._run_threads(
            parent, ifupdownobj, ifacenames, op,
            lambda prefix: self.accquire_graph_token(prefix),
            lambda prefix: self.release_graph_token(prefix),
            self.logger.info)

    def run_iface_dependency_graph_parallel(self, ifupdownobj,
                                            dependency_graph, operation):
        """ Runs iface dependeny graph in parallel.

        arguments:
        ifupdownobj -- ifupdown object (used for getting and updating iface
                                        object state)
        dependency_graph -- dependency graph keyed by interface name
        operation -- 'up' or 'down' or 'query'

        Returns the value of run_iface_list_parallel() for the graph
        roots.
        """

        self.logger.debug('running dependency graph in parallel ..')

        # The run starts from the interfaces whose reference count is 0
        run_queue = [ifacename for ifacename in dependency_graph.keys()
                     if ifupdownobj.get_iface_refcnt(ifacename) == 0]

        self.logger.debug('graph roots (interfaces that dont have dependents):' +
                          ' %s' % str(run_queue))

        self.init_tokens(ifupdownobj.get_njobs())

        return self.run_iface_list_parallel('main', ifupdownobj, run_queue,
                                            operation)