]> git.proxmox.com Git - mirror_ifupdown2.git/blame - pkg/scheduler.py
Update TODO lists
[mirror_ifupdown2.git] / pkg / scheduler.py
CommitLineData
#!/usr/bin/python
#
# Copyright 2013. Cumulus Networks, Inc.
# Author: Roopa Prabhu, roopa@cumulusnetworks.com
#
# ifaceScheduler --
#    interface scheduler
#

10import os
11import re
12from statemanager import *
13from iface import *
14from graph import *
15from collections import deque
16from collections import OrderedDict
17import imp
18import pprint
19import logging
20from graph import *
21from collections import deque
22from threading import *
23from ifupdownbase import *
24
class ifaceScheduler(ifupdownBase):
    """Scheduler for running operations on interfaces.

    Interfaces can be run serially (from a plain interface list or a
    topologically sorted dependency graph) or in parallel, one thread per
    interface, throttled by a bounded semaphore whose permits are called
    "tokens" in this class.
    """

    def __init__(self, force=False):
        """Create the class logger and store ``force`` as ``self.FORCE``."""
        self.logger = logging.getLogger('ifupdown.' +
                                        self.__class__.__name__)
        self.FORCE = force

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    def _ordered_ifacenames(self, ifacenames, op, sorted_by_dependency):
        """Return a copy of ``ifacenames`` in the order they must be run.

        The list is reversed when ``op`` is 'up' and the list is sorted by
        dependency, or when ``op`` is not 'up' and the list is not sorted
        by dependency; otherwise the given order is kept.
        """
        names = list(ifacenames)
        if (op == 'up') == (sorted_by_dependency == True):
            names.reverse()
        return names

    def _skip_iface(self, ifupdownobj, ifaceobj, op):
        """Tell whether ``op`` must be skipped for ``ifaceobj``.

        The operation is skipped when it is not a query, state checking is
        enabled on ``ifupdownobj``, the state transition is reported as
        invalid and FORCE is not set. The explicit ``== True`` /
        ``== False`` comparisons are kept on purpose so the result does not
        change for non-boolean flag values.
        """
        return (op != 'query' and ifupdownobj.STATE_CHECK == True and
                ifupdownobj.is_valid_state_transition(ifaceobj,
                                                      op) == False and
                ifupdownobj.FORCE == False)

    def _run_ifaces_in_threads(self, parent, ifupdownobj, ifacenames, op,
                               acquire, release, log):
        """Start one thread per interface, wait for all, count failures.

        arguments:
        parent -- log prefix handed to ``acquire`` / ``release``
        acquire, release -- callables taking the log prefix; they guard the
                            creation and start of each thread
        log -- logger method used for the 'waiting' message

        Returns the number of interfaces whose thread could not be started
        (with the error ignored) or whose status is not SUCCESS after the
        thread finished. Raises Exception when a thread cannot be started
        and ``ifupdownobj.ignore_error`` does not ignore the error.
        """
        started = OrderedDict()
        err = 0

        for ifacename in ifacenames:
            try:
                acquire(parent)
                try:
                    thread = Thread(None, self.run_iface_parallel,
                                    ifacename,
                                    args=(ifupdownobj, ifacename, op))
                    thread.start()
                finally:
                    # Released exactly once, whether or not start() worked
                    release(parent)
            except Exception as e:
                if ifupdownobj.ignore_error(str(e)) == True:
                    # A thread that never started must not be joined;
                    # account for it as a failure instead.
                    err += 1
                    continue
                raise Exception('error starting thread for iface %s'
                                % ifacename)
            started[ifacename] = thread

        log('%swaiting for all the threads ...', parent)
        for ifacename, thread in started.items():
            thread.join()
            # XXX: only the status reported for the name is checked, not
            # every object of the interface
            if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
                err += 1

        return err

    # ------------------------------------------------------------------
    # serial execution
    # ------------------------------------------------------------------

    def run_iface_subop(self, ifupdownobj, ifaceobj, op, subop, mdict, cenv):
        """Run one sub operation on an interface.

        Every entry of ``mdict`` is executed in turn: entries whose 'ftype'
        is 'pmodule' and whose 'module' has a ``run`` attribute are called
        through ``run``; every other entry is executed with
        ``exec_command`` using ``cenv`` as environment. Unless ``op`` is
        'query', the interface state is set after each entry to ERROR or
        SUCCESS depending on whether that entry raised.
        """
        ifacename = ifaceobj.get_name()
        self.logger.debug('%s: op %s subop = %s', ifacename, op, subop)

        for mname, mdata in mdict.items():
            module = mdata.get('module')
            failed = False
            try:
                if (mdata.get('ftype') == 'pmodule' and
                        hasattr(module, 'run')):
                    self.logger.debug('%s: running module %s op %s subop %s',
                                      ifacename, mname, op, subop)
                    if op == 'query':
                        module.run(ifaceobj, subop, query=True,
                                   query_ifaceobj=(
                                       ifupdownobj.create_ifaceobjcurr(
                                           ifaceobj.get_name())))
                    else:
                        module.run(ifaceobj, subop)
                else:
                    self.logger.debug('%s: running script %s op %s subop %s',
                                      ifacename, mname, op, subop)
                    self.exec_command(module, cmdenv=cenv)
            except Exception as e:
                # Flag the failure first: the state below must be recorded
                # even if log_error itself raises.
                failed = True
                self.log_error(str(e))
            finally:
                if op != 'query':
                    ifupdownobj.set_iface_state(
                        ifaceobj,
                        ifaceState.from_str(subop),
                        ifaceStatus.ERROR if failed
                        else ifaceStatus.SUCCESS)

    def run_iface_subops(self, ifupdownobj, ifaceobj, op):
        """Run all sub operations of ``op`` on an interface."""

        # For backward compatibility execute scripts with
        # environment set
        cenv = ifupdownobj.generate_running_env(ifaceobj, op)

        # Each sub operation has a module list
        subopdict = ifupdownobj.operations.get(op)
        for subop, mdict in subopdict.items():
            self.run_iface_subop(ifupdownobj, ifaceobj, op, subop, mdict,
                                 cenv)

    def run_iface(self, ifupdownobj, ifacename, op):
        """Run ``op`` on every object registered under ``ifacename``.

        Objects for which the state check says the operation must be
        skipped only produce an 'already <op>' warning.
        """
        for ifaceobj in ifupdownobj.get_iface_objs(ifacename):
            if self._skip_iface(ifupdownobj, ifaceobj, op):
                self.logger.warning('%s already %s', ifacename, op)
                continue

            self.run_iface_subops(ifupdownobj, ifaceobj, op)

    def run_iface_list(self, ifupdownobj, ifacenames, operation,
                       sorted_by_dependency=False):
        """Run the interface list serially.

        All sub operations are executed on one interface before moving to
        the next. Errors raised for an interface are handed to
        ``log_error``.
        """
        self.logger.debug('run_iface_list: running interface list for '
                          'operation %s', operation)

        for ifacename in self._ordered_ifacenames(ifacenames, operation,
                                                  sorted_by_dependency):
            try:
                self.run_iface(ifupdownobj, ifacename, operation)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_list_subop(self, ifupdownobj, ifacenames, op, subop, mdict,
                             sorted_by_dependency=False):
        """Run a single sub operation on every interface of the list.

        Interfaces that must be skipped according to the state check are
        skipped for every sub operation, but the 'already <op>' warning is
        only emitted for the 'post-down' and 'post-up' sub operations.
        Errors raised for an interface are handed to ``log_error``.
        """
        self.logger.debug('running sub operation %s on all given interfaces',
                          subop)

        for ifacename in self._ordered_ifacenames(ifacenames, op,
                                                  sorted_by_dependency):
            try:
                for ifaceobj in ifupdownobj.get_iface_objs(ifacename):
                    if self._skip_iface(ifupdownobj, ifaceobj, op):
                        if subop in ('post-down', 'post-up'):
                            self.logger.warning('%s:  already %s',
                                                ifacename, op)
                        continue

                    cenv = ifupdownobj.generate_running_env(ifaceobj, op)
                    self.run_iface_subop(ifupdownobj, ifaceobj, op, subop,
                                         mdict, cenv)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_list_stages(self, ifupdownobj, ifacenames, op,
                              sorted_by_dependency=False):
        """Run the interface list one sub operation at a time.

        Unlike run_iface_list, this method executes a sub operation on the
        entire interface list before proceeding to the next sub operation,
        i.e. 'pre-up' is run through the entire interface list before 'up'.
        """
        self.logger.debug('run_iface_list_stages: running interface list '
                          'for %s', op)

        # Each sub operation has a module list
        subopdict = ifupdownobj.operations.get(op)
        for subop, mdict in subopdict.items():
            self.run_iface_list_subop(ifupdownobj, ifacenames, op, subop,
                                      mdict, sorted_by_dependency)

    def run_iface_dependency_graph(self, ifupdownobj, dependency_graph,
                                   operation):
        """Run the interface dependency graph serially, stage by stage.

        The graph is topologically sorted using the reference count of
        each interface as its indegree, then handed to
        run_iface_list_stages. Exceptions raised by the sort propagate to
        the caller.
        """
        self.logger.debug('creating indegree array ...')
        indegrees = OrderedDict(
            (ifacename, ifupdownobj.get_iface_refcnt(ifacename))
            for ifacename in dependency_graph.keys())

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug('indegree array :')
            ifupdownobj.pp.pprint(indegrees)

        self.logger.debug('calling topological sort on the graph ...')
        sorted_ifacenames = graph.topological_sort(dependency_graph,
                                                   indegrees)

        self.logger.debug('sorted iface list = %s', sorted_ifacenames)

        # Alternative (not enabled): run_iface_list() with
        # sorted_by_dependency=True runs all stages per interface instead.
        self.run_iface_list_stages(ifupdownobj, sorted_ifacenames, operation,
                                   sorted_by_dependency=True)

    # ------------------------------------------------------------------
    # parallel execution
    # ------------------------------------------------------------------

    def init_tokens(self, count):
        """Create the token pool as a bounded semaphore of ``count``."""
        self.token_pool = BoundedSemaphore(count)
        self.logger.debug('initialized bounded semaphore with %d', count)

    def accquire_token(self, logprefix=''):
        """Acquire a token from the pool; ``logprefix`` is only logged."""
        self.token_pool.acquire()
        self.logger.debug('%s acquired token', logprefix)

    def release_token(self, logprefix=''):
        """Release a token to the pool; ``logprefix`` is only logged."""
        self.token_pool.release()
        self.logger.debug('%s release token', logprefix)

    def run_iface_parallel(self, ifupdownobj, ifacename, op):
        """Run ``op`` on an interface, running its dependents in parallel.

        A token is held while the interface itself is worked on; it is
        handed back while the dependents run and taken again afterwards.

        Returns -1 when the interface is not found or when running its
        dependents failed with an error that is not ignored; returns None
        otherwise. Errors raised by the sub operations are logged.
        """
        self.logger.debug('%s: %s', ifacename, op)
        self.accquire_token(ifacename)
        try:
            # Each iface can have a list of objects
            ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
            if ifaceobjs is None:
                self.logger.warning('%s: not found', ifacename)
                return -1

            for ifaceobj in ifaceobjs:
                # Run dependents
                dlist = ifaceobj.get_dependents()
                if dlist:
                    self.logger.debug('%s: found dependents: %s',
                                      ifacename, str(dlist))
                    try:
                        self.release_token(ifacename)
                        try:
                            self.run_iface_list_parallel(ifacename,
                                                         ifupdownobj,
                                                         dlist, op)
                        finally:
                            # Take the token back even on failure so the
                            # final release below always matches an
                            # acquire.
                            self.accquire_token(ifacename)
                    except Exception as e:
                        if self.ignore_error(str(e)) == True:
                            pass
                        else:
                            # Dont bring the iface up if children did not
                            # come up
                            self.logger.debug(
                                '%s: there was an error bringing %s'
                                ' dependents (%s)', ifacename, op, str(e))
                            ifupdownobj.set_iface_state(
                                ifaceobj,
                                ifaceState.from_str(
                                    ifupdownobj.get_subops(op)[0]),
                                ifaceStatus.ERROR)
                            return -1

                if self._skip_iface(ifupdownobj, ifaceobj, op):
                    self.logger.warning('%s: already %s', ifacename, op)
                    continue

                # Run all sub operations sequentially
                try:
                    self.logger.debug('%s: running sub-operations',
                                      ifacename)
                    self.run_iface_subops(ifupdownobj, ifaceobj, op)
                except Exception as e:
                    self.logger.error('%s: error running sub operations'
                                      ' (%s)', ifacename, str(e))
        finally:
            self.release_token(ifacename)

    def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op):
        """Run the interface list in parallel, one thread per interface.

        Returns the number of interfaces that did not end with status
        SUCCESS (including threads that could not be started when that
        error is ignored).
        """
        return self._run_ifaces_in_threads(parent, ifupdownobj, ifacenames,
                                           op, self.accquire_token,
                                           self.release_token,
                                           self.logger.debug)

    def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op):
        """Run interface graphs in parallel, one thread per interface.

        Returns the number of interfaces that did not end with status
        SUCCESS (including threads that could not be started when that
        error is ignored).
        """
        # NOTE(review): accquire_graph_token / release_graph_token are not
        # defined in this class; presumably they are expected from the
        # base class - confirm, otherwise this raises AttributeError.
        return self._run_ifaces_in_threads(parent, ifupdownobj, ifacenames,
                                           op, self.accquire_graph_token,
                                           self.release_graph_token,
                                           self.logger.info)

    def run_iface_dependency_graph_parallel(self, ifupdownobj,
                                            dependency_graph, operation):
        """Run the interface dependency graph in parallel.

        arguments:
        ifupdownobj -- ifupdown object (used for getting and updating iface
                       object state)
        dependency_graph -- dependency graph; its keys are interface names
        operation -- 'up' or 'down' or 'query'

        Returns the error count reported by run_iface_list_parallel for
        the graph roots.
        """
        self.logger.debug('running dependency graph in parallel ..')

        # Graph roots: interfaces whose reference count is zero
        run_queue = [ifacename for ifacename in dependency_graph.keys()
                     if ifupdownobj.get_iface_refcnt(ifacename) == 0]

        self.logger.debug('graph roots (interfaces that dont have '
                          'dependents): %s', str(run_queue))

        self.init_tokens(ifupdownobj.get_njobs())

        # Alternative (not enabled): run one graph at a time by calling
        # run_iface_list_parallel('main', ifupdownobj, [iface], operation)
        # for each iface in run_queue.
        return self.run_iface_list_parallel('main', ifupdownobj, run_queue,
                                            operation)
389