]> git.proxmox.com Git - mirror_ifupdown2.git/blame - pkg/scheduler.py
Some fixes + enhancements
[mirror_ifupdown2.git] / pkg / scheduler.py
CommitLineData
a6f80f0e 1#!/usr/bin/python
2
3import os
4import re
5from statemanager import *
6from iface import *
7from graph import *
8from collections import deque
9from collections import OrderedDict
10import imp
11import pprint
12import logging
13from graph import *
14from collections import deque
15from threading import *
16from ifupdownbase import *
17
18class ifaceScheduler(ifupdownBase):
19 """ scheduler to schedule configuration of interfaces.
20
21
22 supports scheduling of interfaces serially in plain interface list
23 or dependency graph format.
24 """
25
26 def __init__(self):
27 self.logger = logging.getLogger('ifupdown.' +
28 self.__class__.__name__)
29
30 def run_iface_subop(self, ifupdownobj, ifaceobj, op, subop, mdict, cenv):
31 """ Runs sub operation on an interface """
32
33 self.logger.debug('%s: ' %ifaceobj.get_name() + 'op %s' %op +
34 ' subop = %s' %subop)
35
36 for mname, mdata in mdict.items():
37 m = mdata.get('module')
38 err = 0
39 try:
40 if (mdata.get('ftype') == 'pmodule' and
41 hasattr(m, 'run') == True):
42 self.logger.debug('%s: ' %ifaceobj.get_name() +
43 'running module %s' %mname +
44 ' op %s' %op + ' subop %s' %subop)
45 if op == 'query':
46 m.run(ifaceobj, subop, query=True,
47 query_ifaceobj=ifupdownobj.create_ifaceobjcurr(
48 ifaceobj.get_name()))
49 else:
50 m.run(ifaceobj, subop)
51 else:
52 self.logger.debug('%s: ' %ifaceobj.get_name() +
53 'running script %s' %mname +
54 ' op %s' %op + ' subop %s' %subop)
55 self.exec_command(m, cmdenv=cenv)
56 except Exception, e:
57 err = 1
58 if ifupdownobj.ignore_error(str(e)) == True:
59 pass
60 else:
61 raise
62 finally:
63 if op != 'query':
64 if err == 1:
65 ifupdownobj.set_iface_state(ifaceobj,
66 ifaceState.from_str(subop),
67 ifaceStatus.ERROR)
68 else:
69 ifupdownobj.set_iface_state(ifaceobj,
70 ifaceState.from_str(subop),
71 ifaceStatus.SUCCESS)
72
73 def run_iface_subops(self, ifupdownobj, ifaceobj, op):
74 """ Runs all sub operations on an interface """
75
76 # For backward compatibility execute scripts with
77 # environent set
78 cenv = ifupdownobj.generate_running_env(ifaceobj, op)
79
80 # Each sub operation has a module list
81 subopdict = ifupdownobj.operations.get(op)
82 for subop, mdict in subopdict.items():
83 self.run_iface_subop(ifupdownobj, ifaceobj, op, subop, mdict, cenv)
84
85
86 def run_iface(self, ifupdownobj, ifacename, op):
87 """ Runs operation on an interface """
88
89 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
90 for i in ifaceobjs:
91 if (op != 'query' and ifupdownobj.STATE_CHECK == True and
92 ifupdownobj.is_valid_state_transition(i, op) == False and
93 ifupdownobj.FORCE == False):
94 self.logger.warning('%s' %ifacename +
95 ' already %s' %op)
96 continue
97
98 self.run_iface_subops(ifupdownobj, i, op)
99
100
101 def run_iface_list(self, ifupdownobj, ifacenames, operation,
102 sorted_by_dependency=False):
103 """ Runs interface list serially executing all sub operations on
104 each interface at a time. """
105
106 self.logger.debug('run_iface_list: running interface list for ' +
107 'operation %s' %operation)
108
109 iface_run_queue = deque(ifacenames)
110 for i in range(0, len(iface_run_queue)):
111 if operation == 'up':
112 # XXX: simplify this
113 if sorted_by_dependency == True:
114 ifacename = iface_run_queue.pop()
115 else:
116 ifacename = iface_run_queue.popleft()
117 else:
118 if sorted_by_dependency == True:
119 ifacename = iface_run_queue.popleft()
120 else:
121 ifacename = iface_run_queue.pop()
122
123 try:
124 self.run_iface(ifupdownobj, ifacename, operation)
125 except Exception, e:
126 if (ifupdownobj.ignore_error(str(e)) == True):
127 pass
128 else:
129 raise
130
131
132 def run_iface_list_subop(self, ifupdownobj, ifacenames, op, subop, mdict,
133 sorted_by_dependency=False):
134 """ Runs interface list through sub operation handler. """
135
136 self.logger.debug('running sub operation %s on all given interfaces' %op)
137
138 iface_run_queue = deque(ifacenames)
139 for i in range(0, len(iface_run_queue)):
140 if op == 'up':
141 # XXX: simplify this
142 if sorted_by_dependency == True:
143 ifacename = iface_run_queue.pop()
144 else:
145 ifacename = iface_run_queue.popleft()
146 else:
147 if sorted_by_dependency == True:
148 ifacename = iface_run_queue.popleft()
149 else:
150 ifacename = iface_run_queue.pop()
151
152 try:
153 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
154 for ifaceobj in ifaceobjs:
155 if (op != 'query' and ifupdownobj.STATE_CHECK == True and
156 ifupdownobj.is_valid_state_transition(ifaceobj,
157 op) == False and ifupdownobj.FORCE == False):
158 if subop == 'post-down' or subop == 'post-up':
159 self.logger.warning('%s: ' %ifacename +
160 ' already %s' %op)
161 continue
162
163 cenv = ifupdownobj.generate_running_env(ifaceobj, op)
164 self.run_iface_subop(ifupdownobj, ifaceobj, op, subop,
165 mdict, cenv)
166 except Exception, e:
167 if (ifupdownobj.ignore_error(str(e)) == True):
168 pass
169 else:
170 raise
171
172
173 def run_iface_list_stages(self, ifupdownobj, ifacenames, op,
174 sorted_by_dependency=False):
175 """ Runs interface list through sub operations handler
176
177 Unlike run_iface_list, this method executes a sub operation on the
178 entire interface list before proceeding to the next sub-operation.
179 ie operation 'pre-up' is run through the entire interface list before
180 'up'
181 """
182
183 self.logger.debug('run_iface_list_stages: running interface list for %s'
184 %op)
185
186 # Each sub operation has a module list
187 subopdict = ifupdownobj.operations.get(op)
188 for subop, mdict in subopdict.items():
189 self.run_iface_list_subop(ifupdownobj, ifacenames, op, subop, mdict,
190 sorted_by_dependency)
191
192
193 def run_iface_dependency_graph(self, ifupdownobj, dependency_graph,
194 operation):
195 """ runs interface dependency graph """
196
197 indegrees = OrderedDict()
198
199 self.logger.debug('creating indegree array ...')
200 for ifacename in dependency_graph.keys():
201 indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)
202
203 if self.logger.isEnabledFor(logging.DEBUG) == True:
204 self.logger.debug('indegree array :')
205 ifupdownobj.pp.pprint(indegrees)
206
207 try:
208 self.logger.debug('calling topological sort on the graph ...')
209 sorted_ifacenames = graph.topological_sort(dependency_graph,
210 indegrees)
211 except Exception, e:
212 raise
213
214 self.logger.debug('sorted iface list = %s' %sorted_ifacenames)
215
216 #self.run_iface_list(ifupdownobj, sorted_ifacenames, operation,
217 # sorted_by_dependency=True)
218
219 self.run_iface_list_stages(ifupdownobj, sorted_ifacenames, operation,
220 sorted_by_dependency=True)
221
222
223 def init_tokens(self, count):
224 self.token_pool = BoundedSemaphore(count)
225 self.logger.debug('initialized bounded semaphore with %d' %count)
226
227 def accquire_token(self, logprefix=''):
228 self.token_pool.acquire()
229 self.logger.debug('%s ' %logprefix + 'acquired token')
230
231 def release_token(self, logprefix=''):
232 self.token_pool.release()
233 self.logger.debug('%s ' %logprefix + 'release token')
234
235 def run_iface_parallel(self, ifupdownobj, ifacename, op):
236 """ Configures interface in parallel.
237
238 Executes all its direct dependents in parallel
239
240 """
241
242 self.logger.debug('%s:' %ifacename + ' %s' %op)
243 self.accquire_token(iface)
244
245 # Each iface can have a list of objects
246 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
247 if ifaceobjs is None:
248 self.logger.warning('%s: ' %ifacename + 'not found')
249 self.release_token(ifacename)
250 return -1
251
252 for ifaceobj in ifaceobjs:
253 # Run dependents
254 dlist = ifaceobj.get_dependents()
255 if dlist is not None and len(dlist) > 0:
256 self.logger.debug('%s:' %ifacename +
257 ' found dependents: %s' %str(dlist))
258 try:
259 self.release_token(ifacename)
260 self.run_iface_list_parallel(ifacename, ifupdownobj,
261 dlist, op)
262 self.accquire_token(ifacename)
263 except Exception, e:
264 if (ifupdownobj.ignore_error(str(e)) == True):
265 pass
266 else:
267 # Dont bring the iface up if children did not come up
268 self.logger.debug('%s:' %ifacename +
269 ' there was an error bringing %s' %op +
270 ' dependents (%s)', str(e))
271 ifupdownobj.set_iface_state(ifaceobj,
272 ifaceState.from_str(
273 ifupdownobj.get_subops(op)[0]),
274 ifaceStatus.ERROR)
275 return -1
276
277 if (op != 'query' and ifupdownobj.STATE_CHECK == True and
278 ifupdownobj.is_valid_state_transition(ifaceobj,
279 op) == False and ifupdownobj.FORCE == False):
280 self.logger.warning('%s:' %ifacename + ' already %s' %op)
281 continue
282
283
284 # Run all sub operations sequentially
285 try:
286 self.logger.debug('%s:' %ifacename + ' running sub-operations')
287 self.run_iface_subops(ifupdownobj, ifaceobj, op)
288 except Exception, e:
289 self.logger.error('%s:' %ifacename +
290 ' error running sub operations (%s)' %str(e))
291
292 self.release_token(ifacename)
293
294
295 def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op):
296 """ Runs interface list in parallel """
297
298 running_threads = OrderedDict()
299 err = 0
300
301 for ifacename in ifacenames:
302 try:
303 self.accquire_token(parent)
304 running_threads[ifacename] = Thread(None,
305 self.run_iface_parallel, ifacename,
306 args=(ifupdownobj, ifacename, op))
307 running_threads[ifacename].start()
308 self.release_token(parent)
309 except Exception, e:
310 self.release_token(parent)
311 if (ifupdownobj.ignore_error(str(e)) == True):
312 pass
313 else:
314 raise Exception('error starting thread for iface %s'
315 %ifacename)
316
317
318 self.logger.debug('%s' %parent + 'waiting for all the threads ...')
319 for ifacename, t in running_threads.items():
320 t.join()
321 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
322 err += 1
323
324 return err
325
326 def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op):
327 """ Runs iface graphs in parallel """
328
329 running_threads = OrderedDict()
330 err = 0
331
332 for ifacename in ifacenames:
333 try:
334 self.accquire_graph_token(parent)
335 running_threads[ifacename] = Thread(None,
336 self.run_iface_parallel, ifacename,
337 args=(ifupdownobj, ifacename, op))
338 running_threads[ifacename].start()
339 self.release_graph_token(parent)
340 except Exception, e:
341 self.release_graph_token(parent)
342 if (ifupdownobj.ignore_error(str(e)) == True):
343 pass
344 else:
345 raise Exception('error starting thread for iface %s'
346 %ifacename)
347
348 self.logger.info('%s' %parent + 'waiting for all the threads ...')
349 for ifacename, t in running_threads.items():
350 t.join()
351 # Check status of thread
352 # XXX: Check all objs
353 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
354 err += 1
355
356 return err
357
358 def run_iface_dependency_graph_parallel(self, ifupdownobj, dependency_graph,
359 operation):
360 """ Runs iface dependeny graph in parallel.
361
362 arguments:
363 ifupdownobj -- ifupdown object (used for getting and updating iface
364 object state)
365 dependency_graph -- dependency graph with
366 operation -- 'up' or 'down' or 'query'
367
368 """
369
370 self.logger.debug('running dependency graph in parallel ..')
371
372 run_queue = []
373
374 # Build a list of ifaces that dont have any dependencies
375 for ifacename in dependency_graph.keys():
376 if ifupdownobj.get_iface_refcnt(ifacename) == 0:
377 run_queue.append(ifacename)
378
379 self.logger.debug('graph roots (interfaces that dont have dependents):' +
380 ' %s' %str(run_queue))
381
382 self.init_tokens(ifupdownobj.get_njobs())
383
384 return self.run_iface_list_parallel('main', ifupdownobj, run_queue,
385 operation)
386
387 # OR
388 # Run one graph at a time
389 #for iface in run_queue:
390 # self.run_iface_list_parallel('main', ifupdownobj, [iface],
391 # operation)
392