]>
Commit | Line | Data |
---|---|---|
a6f80f0e | 1 | #!/usr/bin/python |
2 | ||
3 | import os | |
4 | import re | |
5 | from statemanager import * | |
6 | from iface import * | |
7 | from graph import * | |
8 | from collections import deque | |
9 | from collections import OrderedDict | |
10 | import imp | |
11 | import pprint | |
12 | import logging | |
13 | from graph import * | |
14 | from collections import deque | |
15 | from threading import * | |
16 | from ifupdownbase import * | |
17 | ||
class ifaceScheduler(ifupdownBase):
    """ scheduler to schedule configuration of interfaces.

    supports scheduling of interfaces serially in plain interface list
    or dependency graph format.
    """

    def __init__(self):
        """ Create a logger named 'ifupdown.<class name>'. """
        self.logger = logging.getLogger('ifupdown.' +
                                        self.__class__.__name__)

    # ------------------------------------------------------------------
    # private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _skip_op(ifupdownobj, ifaceobj, op):
        """ Return True when the state check says 'op' must be skipped.

        That is: op is not 'query', STATE_CHECK is on, the state transition
        is reported as not valid and FORCE is off.
        """
        # The explicit ==True / ==False comparisons are kept on purpose:
        # these values come from ifupdownobj and their exact types are not
        # visible here, so plain truthiness could change behaviour.
        return (op != 'query' and ifupdownobj.STATE_CHECK == True and
                ifupdownobj.is_valid_state_transition(ifaceobj, op) == False
                and ifupdownobj.FORCE == False)

    @staticmethod
    def _can_ignore(ifupdownobj, exc):
        """ Return True when ifupdownobj says the error may be ignored. """
        return ifupdownobj.ignore_error(str(exc)) == True

    @staticmethod
    def _ordered_ifacenames(ifacenames, op, sorted_by_dependency):
        """ Return a new list with ifacenames in the order they must be run.

        The given order is reversed for 'up' on a dependency sorted list
        and for any other operation on a plain list; otherwise it is kept.
        """
        names = list(ifacenames)
        if (op == 'up') == bool(sorted_by_dependency):
            names.reverse()
        return names

    # ------------------------------------------------------------------
    # serial scheduling
    # ------------------------------------------------------------------

    def run_iface_subop(self, ifupdownobj, ifaceobj, op, subop, mdict, cenv):
        """ Runs sub operation on an interface

        arguments:
        ifupdownobj -- ifupdown object (error policy and iface state)
        ifaceobj -- interface object the sub operation is run on
        op -- operation name; 'query' is special cased
        subop -- sub operation name handed to every module
        mdict -- dict of module name -> dict with 'module' and 'ftype' keys
        cenv -- environment passed to scripts via exec_command

        Re-raises a module/script exception unless ifupdownobj.ignore_error
        accepts it.
        """

        ifacename = ifaceobj.get_name()
        self.logger.debug('%s: op %s subop = %s', ifacename, op, subop)

        for mname, mdata in mdict.items():
            m = mdata.get('module')
            failed = False
            try:
                if mdata.get('ftype') == 'pmodule' and hasattr(m, 'run'):
                    self.logger.debug('%s: running module %s op %s subop %s',
                                      ifacename, mname, op, subop)
                    if op == 'query':
                        m.run(ifaceobj, subop, query=True,
                              query_ifaceobj=ifupdownobj.create_ifaceobjcurr(
                                  ifacename))
                    else:
                        m.run(ifaceobj, subop)
                else:
                    # Anything that is not a python module with a run()
                    # method is executed as an external command
                    self.logger.debug('%s: running script %s op %s subop %s',
                                      ifacename, mname, op, subop)
                    self.exec_command(m, cmdenv=cenv)
            except Exception as e:
                failed = True
                if not self._can_ignore(ifupdownobj, e):
                    raise
            finally:
                # State is recorded after every module, even when the
                # exception is about to propagate
                if op != 'query':
                    ifupdownobj.set_iface_state(
                        ifaceobj, ifaceState.from_str(subop),
                        ifaceStatus.ERROR if failed else ifaceStatus.SUCCESS)

    def run_iface_subops(self, ifupdownobj, ifaceobj, op):
        """ Runs all sub operations on an interface """

        # For backward compatibility execute scripts with
        # environment set
        cenv = ifupdownobj.generate_running_env(ifaceobj, op)

        # Each sub operation has a module list
        subopdict = ifupdownobj.operations.get(op)
        for subop, mdict in subopdict.items():
            self.run_iface_subop(ifupdownobj, ifaceobj, op, subop, mdict, cenv)

    def run_iface(self, ifupdownobj, ifacename, op):
        """ Runs operation on an interface

        Every interface object registered under ifacename is processed;
        objects rejected by the state check are skipped with a warning.
        """

        for ifaceobj in ifupdownobj.get_iface_objs(ifacename):
            if self._skip_op(ifupdownobj, ifaceobj, op):
                self.logger.warning('%s already %s', ifacename, op)
                continue

            self.run_iface_subops(ifupdownobj, ifaceobj, op)

    def run_iface_list(self, ifupdownobj, ifacenames, operation,
                       sorted_by_dependency=False):
        """ Runs interface list serially executing all sub operations on
        each interface at a time. """

        self.logger.debug('run_iface_list: running interface list for '
                          'operation %s', operation)

        for ifacename in self._ordered_ifacenames(ifacenames, operation,
                                                  sorted_by_dependency):
            try:
                self.run_iface(ifupdownobj, ifacename, operation)
            except Exception as e:
                if not self._can_ignore(ifupdownobj, e):
                    raise

    def run_iface_list_subop(self, ifupdownobj, ifacenames, op, subop, mdict,
                             sorted_by_dependency=False):
        """ Runs interface list through sub operation handler. """

        self.logger.debug('running sub operation %s on all given interfaces',
                          subop)

        for ifacename in self._ordered_ifacenames(ifacenames, op,
                                                  sorted_by_dependency):
            try:
                for ifaceobj in ifupdownobj.get_iface_objs(ifacename):
                    if self._skip_op(ifupdownobj, ifaceobj, op):
                        # Warn for the post-* sub operation only, so the
                        # message is not repeated for every sub operation
                        if subop == 'post-down' or subop == 'post-up':
                            self.logger.warning('%s: ' % ifacename +
                                                ' already %s' % op)
                        continue

                    cenv = ifupdownobj.generate_running_env(ifaceobj, op)
                    self.run_iface_subop(ifupdownobj, ifaceobj, op, subop,
                                         mdict, cenv)
            except Exception as e:
                if not self._can_ignore(ifupdownobj, e):
                    raise

    def run_iface_list_stages(self, ifupdownobj, ifacenames, op,
                              sorted_by_dependency=False):
        """ Runs interface list through sub operations handler

        Unlike run_iface_list, this method executes a sub operation on the
        entire interface list before proceeding to the next sub-operation.
        ie operation 'pre-up' is run through the entire interface list before
        'up'
        """

        self.logger.debug('run_iface_list_stages: running interface list '
                          'for %s', op)

        # Each sub operation has a module list
        subopdict = ifupdownobj.operations.get(op)
        for subop, mdict in subopdict.items():
            self.run_iface_list_subop(ifupdownobj, ifacenames, op, subop,
                                      mdict, sorted_by_dependency)

    def run_iface_dependency_graph(self, ifupdownobj, dependency_graph,
                                   operation):
        """ runs interface dependency graph

        Builds the indegree table from ifupdownobj.get_iface_refcnt, sorts
        the graph topologically and runs the sorted list stage by stage.
        Errors from the topological sort propagate to the caller.
        """

        indegrees = OrderedDict()

        self.logger.debug('creating indegree array ...')
        for ifacename in dependency_graph.keys():
            indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug('indegree array :')
            ifupdownobj.pp.pprint(indegrees)

        self.logger.debug('calling topological sort on the graph ...')
        sorted_ifacenames = graph.topological_sort(dependency_graph,
                                                   indegrees)

        self.logger.debug('sorted iface list = %s', sorted_ifacenames)

        # run_iface_list(..., sorted_by_dependency=True) is the
        # one-interface-at-a-time alternative to the staged run below
        self.run_iface_list_stages(ifupdownobj, sorted_ifacenames, operation,
                                   sorted_by_dependency=True)

    # ------------------------------------------------------------------
    # parallel scheduling
    # ------------------------------------------------------------------

    def init_tokens(self, count):
        """ Create the token pool: a bounded semaphore of 'count' tokens. """
        self.token_pool = BoundedSemaphore(count)
        self.logger.debug('initialized bounded semaphore with %d' % count)

    def accquire_token(self, logprefix=''):
        """ Take one token from the pool; blocks until one is free. """
        self.token_pool.acquire()
        self.logger.debug('%s ' % logprefix + 'acquired token')

    def release_token(self, logprefix=''):
        """ Give one token back to the pool. """
        self.token_pool.release()
        self.logger.debug('%s ' % logprefix + 'release token')

    def run_iface_parallel(self, ifupdownobj, ifacename, op):
        """ Configures interface in parallel.

        Executes all its direct dependents in parallel

        Returns -1 when the interface is not found or when running its
        dependents raised an error that may not be ignored; None otherwise.
        """

        self.logger.debug('%s: %s', ifacename, op)
        self.accquire_token(ifacename)
        # Tracks whether this thread currently owns a token, so that the
        # bounded semaphore is released exactly once per acquire
        holding_token = True

        try:
            # Each iface can have a list of objects
            ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
            if ifaceobjs is None:
                self.logger.warning('%s: ' % ifacename + 'not found')
                return -1

            for ifaceobj in ifaceobjs:
                # Run dependents
                dlist = ifaceobj.get_dependents()
                if dlist:
                    self.logger.debug('%s: found dependents: %s',
                                      ifacename, str(dlist))
                    try:
                        # Hand the token back while the dependents run so
                        # they can make progress
                        self.release_token(ifacename)
                        holding_token = False
                        self.run_iface_list_parallel(ifacename, ifupdownobj,
                                                     dlist, op)
                    except Exception as e:
                        if not self._can_ignore(ifupdownobj, e):
                            # Dont bring the iface up if children did not
                            # come up
                            self.logger.debug(
                                '%s: there was an error bringing %s'
                                ' dependents (%s)', ifacename, op, str(e))
                            ifupdownobj.set_iface_state(
                                ifaceobj,
                                ifaceState.from_str(
                                    ifupdownobj.get_subops(op)[0]),
                                ifaceStatus.ERROR)
                            return -1

                    if not holding_token:
                        self.accquire_token(ifacename)
                        holding_token = True

                if self._skip_op(ifupdownobj, ifaceobj, op):
                    self.logger.warning('%s: already %s', ifacename, op)
                    continue

                # Run all sub operations sequentially
                try:
                    self.logger.debug('%s: running sub-operations',
                                      ifacename)
                    self.run_iface_subops(ifupdownobj, ifaceobj, op)
                except Exception as e:
                    self.logger.error('%s: error running sub operations (%s)',
                                      ifacename, str(e))
        finally:
            if holding_token:
                self.release_token(ifacename)

    def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op):
        """ Runs interface list in parallel

        Starts one thread per interface name (each running
        run_iface_parallel), waits for all of them and returns the number
        of interfaces whose status is not SUCCESS afterwards.

        Raises Exception when a thread cannot be started and the error may
        not be ignored.
        """

        running_threads = OrderedDict()
        err = 0

        for ifacename in ifacenames:
            self.accquire_token(parent)
            try:
                worker = Thread(None, self.run_iface_parallel, ifacename,
                                args=(ifupdownobj, ifacename, op))
                worker.start()
                # Only started threads are recorded; join() on a thread
                # that never started would raise
                running_threads[ifacename] = worker
            except Exception as e:
                if not self._can_ignore(ifupdownobj, e):
                    raise Exception('error starting thread for iface %s'
                                    % ifacename)
                err += 1
            finally:
                self.release_token(parent)

        self.logger.debug('%s: waiting for all the threads ...', parent)
        for ifacename, t in running_threads.items():
            t.join()
            if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
                err += 1

        return err

    def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op):
        """ Runs iface graphs in parallel

        Same flow as run_iface_list_parallel but guarded by the graph token.
        """

        # NOTE(review): accquire_graph_token / release_graph_token are not
        # defined in this class; presumably they are expected from the base
        # class -- confirm before relying on this method.
        running_threads = OrderedDict()
        err = 0

        for ifacename in ifacenames:
            self.accquire_graph_token(parent)
            try:
                worker = Thread(None, self.run_iface_parallel, ifacename,
                                args=(ifupdownobj, ifacename, op))
                worker.start()
                running_threads[ifacename] = worker
            except Exception as e:
                if not self._can_ignore(ifupdownobj, e):
                    raise Exception('error starting thread for iface %s'
                                    % ifacename)
                err += 1
            finally:
                self.release_graph_token(parent)

        self.logger.info('%s: waiting for all the threads ...', parent)
        for ifacename, t in running_threads.items():
            t.join()
            # Check status of thread
            # XXX: Check all objs
            if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
                err += 1

        return err

    def run_iface_dependency_graph_parallel(self, ifupdownobj,
                                            dependency_graph, operation):
        """ Runs iface dependency graph in parallel.

        arguments:
        ifupdownobj -- ifupdown object (used for getting and updating iface
                       object state)
        dependency_graph -- dependency graph; its keys are interface names
        operation -- 'up' or 'down' or 'query'

        Returns the error count reported by run_iface_list_parallel.
        """

        self.logger.debug('running dependency graph in parallel ..')

        # Build a list of ifaces whose reference count is zero
        run_queue = [ifacename for ifacename in dependency_graph.keys()
                     if ifupdownobj.get_iface_refcnt(ifacename) == 0]

        self.logger.debug('graph roots (interfaces that dont have dependents):'
                          + ' %s' % str(run_queue))

        self.init_tokens(ifupdownobj.get_njobs())

        # Alternative kept from the original notes: run one graph at a time
        # by calling run_iface_list_parallel('main', ifupdownobj, [iface],
        # operation) for each root.
        return self.run_iface_list_parallel('main', ifupdownobj, run_queue,
                                            operation)
392 |