]>
Commit | Line | Data |
---|---|---|
a6f80f0e | 1 | #!/usr/bin/python |
3e8ee54f | 2 | # |
3 | # Copyright 2013. Cumulus Networks, Inc. | |
4 | # Author: Roopa Prabhu, roopa@cumulusnetworks.com | |
5 | # | |
6 | # ifaceScheduler -- | |
7 | # interface scheduler | |
8 | # | |
a6f80f0e | 9 | |
10 | import os | |
11 | import re | |
12 | from statemanager import * | |
13 | from iface import * | |
14 | from graph import * | |
15 | from collections import deque | |
16 | from collections import OrderedDict | |
17 | import imp | |
18 | import pprint | |
19 | import logging | |
20 | from graph import * | |
21 | from collections import deque | |
22 | from threading import * | |
23 | from ifupdownbase import * | |
24 | ||
class ifaceScheduler(ifupdownBase):
    """ scheduler to schedule configuration of interfaces.

    supports scheduling of interfaces serially in plain interface list
    or dependency graph format. The *_parallel methods run every interface
    in its own thread; the number of threads doing work at the same time is
    limited by a bounded semaphore (see init_tokens).
    """

    def __init__(self, force=False):
        """ Creates the class logger and stores the force flag.

        force -- stored as self.FORCE. Note that the state checks in this
                 class read ifupdownobj.FORCE, not this attribute.
        """
        self.logger = logging.getLogger('ifupdown.' +
                                        self.__class__.__name__)
        self.FORCE = force

    def _skip_on_state(self, ifupdownobj, ifaceobj, op):
        """ Returns a true value when the state check forbids running op.

        The explicit '== True' / '== False' comparisons are kept on purpose:
        a None returned by is_valid_state_transition (or a None flag) must
        not be treated like False.
        NOTE(review): only the exact op 'query' bypasses the check, while
        run_iface_subop treats every op starting with 'query' as a query --
        confirm whether 'query-checkcurr' should bypass it as well.
        """
        return (op != 'query' and ifupdownobj.STATE_CHECK == True and
                ifupdownobj.is_valid_state_transition(ifaceobj,
                                                      op) == False and
                ifupdownobj.FORCE == False)

    def _run_order(self, ifacenames, op, sorted_by_dependency):
        """ Returns the interface names in the order they have to be run.

        'up' on a dependency sorted list and any other operation on a plain
        list walk the list back to front; the two remaining combinations
        walk it front to back.
        """
        ordered = list(ifacenames)
        if (op == 'up') == (sorted_by_dependency == True):
            ordered.reverse()
        return ordered

    def run_iface_subop(self, ifupdownobj, ifaceobj, op, subop, mdict, cenv):
        """ Runs sub operation on an interface

        ifupdownobj -- object used to create query objects and to record
                       the resulting interface state
        ifaceobj    -- interface object the modules are run on
        op          -- operation name; names starting with 'query' never
                       update the interface state
        subop       -- sub operation name, also used as the recorded state
        mdict       -- module name -> dict with the keys 'module' and 'ftype'
        cenv        -- environment handed to scripts via exec_command
        """
        ifacename = ifaceobj.get_name()
        self.logger.debug('%s: op %s subop = %s' %(ifacename, op, subop))

        # Sticky for the whole sub operation: once a module has failed, a
        # later successful module must not flip the state back to SUCCESS.
        err = 0
        for mname, mdata in mdict.items():
            m = mdata.get('module')
            try:
                if mdata.get('ftype') == 'pmodule' and hasattr(m, 'run'):
                    self.logger.debug('%s: running module %s op %s subop %s'
                                      %(ifacename, mname, op, subop))
                    if op == 'query-checkcurr':
                        m.run(ifaceobj, subop, query_check=True,
                              query_ifaceobj=ifupdownobj.create_ifaceobjcurr(
                                                                ifaceobj))
                    else:
                        m.run(ifaceobj, subop)
                else:
                    # Anything that is not a python module with a run()
                    # method is executed as an external command.
                    self.logger.debug('%s: running script %s op %s subop %s'
                                      %(ifacename, mname, op, subop))
                    self.exec_command(m, cmdenv=cenv)
            except Exception as e:
                err = 1
                self.log_error(str(e))
            finally:
                if op[:5] != 'query':
                    if err == 1:
                        status = ifaceStatus.ERROR
                    else:
                        status = ifaceStatus.SUCCESS
                    ifupdownobj.set_iface_state(ifaceobj,
                                                ifaceState.from_str(subop),
                                                status)

    def run_iface_subops(self, ifupdownobj, ifaceobj, op):
        """ Runs all sub operations on an interface """

        # For backward compatibility execute scripts with
        # environment set
        cenv = ifupdownobj.generate_running_env(ifaceobj, op)

        # Each sub operation has a module list
        subopdict = ifupdownobj.operations.get(op)
        for subop, mdict in subopdict.items():
            self.run_iface_subop(ifupdownobj, ifaceobj, op, subop, mdict,
                                 cenv)

    def run_iface(self, ifupdownobj, ifacename, op):
        """ Runs operation on an interface

        Every interface object registered under ifacename is handled;
        objects rejected by the state check are skipped with a warning.
        """
        for ifaceobj in ifupdownobj.get_iface_objs(ifacename):
            if self._skip_on_state(ifupdownobj, ifaceobj, op):
                self.logger.warning('%s already %s' %(ifacename, op))
                continue

            self.run_iface_subops(ifupdownobj, ifaceobj, op)

    def run_iface_list(self, ifupdownobj, ifacenames, operation,
                       sorted_by_dependency=False):
        """ Runs interface list serially executing all sub operations on
        each interface at a time.

        Exceptions raised for one interface are passed to log_error so the
        remaining interfaces are still attempted (unless log_error raises).
        """
        self.logger.debug('run_iface_list: running interface list for ' +
                          'operation %s' %operation)

        for ifacename in self._run_order(ifacenames, operation,
                                         sorted_by_dependency):
            try:
                self.run_iface(ifupdownobj, ifacename, operation)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_list_subop(self, ifupdownobj, ifacenames, op, subop, mdict,
                             sorted_by_dependency=False):
        """ Runs interface list through sub operation handler.

        Runs the single sub operation subop (modules in mdict) on every
        interface of the list, in the order given by _run_order.
        """
        self.logger.debug('running sub operation %s on all given interfaces'
                          %subop)

        for ifacename in self._run_order(ifacenames, op,
                                         sorted_by_dependency):
            try:
                for ifaceobj in ifupdownobj.get_iface_objs(ifacename):
                    if self._skip_on_state(ifupdownobj, ifaceobj, op):
                        # The object is skipped for every sub operation;
                        # warn only once, on the last one.
                        if subop == 'post-down' or subop == 'post-up':
                            self.logger.warning('%s:  already %s'
                                                %(ifacename, op))
                        continue

                    cenv = ifupdownobj.generate_running_env(ifaceobj, op)
                    self.run_iface_subop(ifupdownobj, ifaceobj, op, subop,
                                         mdict, cenv)
            except Exception as e:
                self.log_error(str(e))

    def run_iface_list_stages(self, ifupdownobj, ifacenames, op,
                              sorted_by_dependency=False):
        """ Runs interface list through sub operations handler

        Unlike run_iface_list, this method executes a sub operation on the
        entire interface list before proceeding to the next sub-operation.
        ie operation 'pre-up' is run through the entire interface list before
        'up'
        """
        self.logger.debug('run_iface_list_stages: running interface list for %s'
                          %op)

        # Each sub operation has a module list
        subopdict = ifupdownobj.operations.get(op)
        for subop, mdict in subopdict.items():
            self.run_iface_list_subop(ifupdownobj, ifacenames, op, subop,
                                      mdict, sorted_by_dependency)

    def run_iface_dependency_graph(self, ifupdownobj, dependency_graph,
                                   operation):
        """ runs interface dependency graph

        The graph is sorted topologically using the reference count of each
        interface as its indegree, then run stage by stage through
        run_iface_list_stages. Errors raised by the sort propagate to the
        caller.
        """
        indegrees = OrderedDict()

        self.logger.debug('creating indegree array ...')
        for ifacename in dependency_graph.keys():
            indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)

        if self.logger.isEnabledFor(logging.DEBUG):
            self.logger.debug('indegree array :')
            ifupdownobj.pp.pprint(indegrees)

        self.logger.debug('calling topological sort on the graph ...')
        sorted_ifacenames = graph.topological_sort_graphs(dependency_graph,
                                                          indegrees)

        self.logger.debug('sorted iface list = %s' %str(sorted_ifacenames))

        self.run_iface_list_stages(ifupdownobj, sorted_ifacenames, operation,
                                   sorted_by_dependency=True)

    def init_tokens(self, count):
        """ Creates the token pool that limits concurrently working threads.

        count -- initial (and maximum) number of tokens
        """
        self.token_pool = BoundedSemaphore(count)
        self.logger.debug('initialized bounded semaphore with %d' %count)

    def accquire_token(self, logprefix=''):
        """ Blocks until a token could be taken from the pool. """
        self.token_pool.acquire()
        self.logger.debug('%s acquired token' %logprefix)

    def release_token(self, logprefix=''):
        """ Returns a token to the pool.

        The pool is a BoundedSemaphore, so releasing more tokens than were
        acquired raises ValueError.
        """
        self.token_pool.release()
        self.logger.debug('%s release token' %logprefix)

    def run_iface_parallel(self, ifupdownobj, ifacename, op):
        """ Configures interface in parallel.

        Executes all its direct dependents in parallel

        A token is held while this interface is being worked on; it is
        handed back while waiting for the dependents so they can use it.
        Returns -1 if the interface is unknown or its dependents raised an
        error that is not ignored, None otherwise.
        NOTE(review): the error count returned by run_iface_list_parallel is
        not evaluated here; only an exception stops this interface.
        """
        self.logger.debug('%s: %s' %(ifacename, op))
        self.accquire_token(ifacename)
        try:
            # Each iface can have a list of objects
            ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
            if ifaceobjs is None:
                self.logger.warning('%s: not found' %ifacename)
                return -1

            for ifaceobj in ifaceobjs:
                # Run dependents
                dlist = ifaceobj.get_dependents()
                if dlist is not None and len(dlist) > 0:
                    self.logger.debug('%s: found dependents: %s'
                                      %(ifacename, str(dlist)))
                    dep_error = None
                    self.release_token(ifacename)
                    try:
                        self.run_iface_list_parallel(ifacename, ifupdownobj,
                                                     dlist, op)
                    except Exception as e:
                        dep_error = e
                    finally:
                        # Take the token back on every path so that the
                        # single release at the end stays balanced.
                        self.accquire_token(ifacename)

                    if (dep_error is not None and
                            not self.ignore_error(str(dep_error))):
                        # Dont bring the iface up if children did not come up
                        self.logger.debug('%s: there was an error bringing '
                                          '%s dependents (%s)', ifacename,
                                          op, str(dep_error))
                        ifupdownobj.set_iface_state(ifaceobj,
                                ifaceState.from_str(
                                    ifupdownobj.get_subops(op)[0]),
                                ifaceStatus.ERROR)
                        return -1

                if self._skip_on_state(ifupdownobj, ifaceobj, op):
                    self.logger.warning('%s: already %s' %(ifacename, op))
                    continue

                # Run all sub operations sequentially
                try:
                    self.logger.debug('%s: running sub-operations'
                                      %ifacename)
                    self.run_iface_subops(ifupdownobj, ifaceobj, op)
                except Exception as e:
                    self.logger.error('%s: error running sub operations (%s)'
                                      %(ifacename, str(e)))
        finally:
            self.release_token(ifacename)

    def _run_ifaces_threaded(self, parent, ifupdownobj, ifacenames, op,
                             acquire, release, log):
        """ Starts one run_iface_parallel thread per interface and joins them.

        acquire/release -- token helpers called with parent around every
                           thread start
        log             -- logger method used for the 'waiting' message
        Returns the number of interfaces whose status is not SUCCESS after
        their thread finished.
        """
        running_threads = OrderedDict()

        for ifacename in ifacenames:
            acquire(parent)
            try:
                t = Thread(None, self.run_iface_parallel, ifacename,
                           args=(ifupdownobj, ifacename, op))
                t.start()
                # Register only threads that really started; joining a
                # thread that was never started raises RuntimeError.
                running_threads[ifacename] = t
            except Exception as e:
                if not ifupdownobj.ignore_error(str(e)):
                    raise Exception('error starting thread for iface %s'
                                    %ifacename)
            finally:
                release(parent)

        log('%s: waiting for all the threads ...' %parent)
        err = 0
        for ifacename, t in running_threads.items():
            t.join()
            # Check status of thread
            # XXX: Check all objs
            if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
                err += 1

        return err

    def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op):
        """ Runs interface list in parallel

        parent -- name used as log prefix for the token messages
        Returns the number of interfaces that did not end up in SUCCESS.
        Raises Exception if a thread cannot be started and the error is not
        ignored by ifupdownobj.ignore_error.
        """
        return self._run_ifaces_threaded(parent, ifupdownobj, ifacenames, op,
                                         self.accquire_token,
                                         self.release_token,
                                         self.logger.debug)

    def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op):
        """ Runs iface graphs in parallel

        Same contract as run_iface_list_parallel. This class does not define
        accquire_graph_token/release_graph_token, so the regular token
        helpers are used unless another class in the hierarchy provides the
        graph variants.
        """
        acquire = getattr(self, 'accquire_graph_token', self.accquire_token)
        release = getattr(self, 'release_graph_token', self.release_token)
        return self._run_ifaces_threaded(parent, ifupdownobj, ifacenames, op,
                                         acquire, release, self.logger.info)

    def run_iface_dependency_graph_parallel(self, ifupdownobj,
                                            dependency_graph, operation):
        """ Runs iface dependency graph in parallel.

        arguments:
        ifupdownobj -- ifupdown object (used for getting and updating iface
                       object state)
        dependency_graph -- mapping whose keys are the interface names
        operation -- 'up' or 'down' or 'query'

        Returns the error count of run_iface_list_parallel.
        """
        self.logger.debug('running dependency graph in parallel ..')

        # Build a list of ifaces that are not referenced by any other iface
        run_queue = [ifacename for ifacename in dependency_graph.keys()
                     if ifupdownobj.get_iface_refcnt(ifacename) == 0]

        self.logger.debug('graph roots (interfaces that dont have dependents):' +
                          ' %s' %str(run_queue))

        self.init_tokens(ifupdownobj.get_njobs())

        # Alternative considered by the original author: run one graph at
        # a time by calling run_iface_list_parallel once per root.
        return self.run_iface_list_parallel('main', ifupdownobj, run_queue,
                                            operation)
388 |