]>
Commit | Line | Data |
---|---|---|
a6f80f0e | 1 | #!/usr/bin/python |
3e8ee54f | 2 | # |
3 | # Copyright 2013. Cumulus Networks, Inc. | |
4 | # Author: Roopa Prabhu, roopa@cumulusnetworks.com | |
5 | # | |
6 | # ifaceScheduler -- | |
7 | # interface scheduler | |
8 | # | |
a6f80f0e | 9 | |
10 | import os | |
11 | import re | |
12 | from statemanager import * | |
13 | from iface import * | |
14 | from graph import * | |
15 | from collections import deque | |
16 | from collections import OrderedDict | |
17 | import imp | |
18 | import pprint | |
19 | import logging | |
20 | from graph import * | |
21 | from collections import deque | |
22 | from threading import * | |
23 | from ifupdownbase import * | |
24 | ||
25 | class ifaceScheduler(ifupdownBase): | |
26 | """ scheduler to schedule configuration of interfaces. | |
27 | ||
28 | ||
29 | supports scheduling of interfaces serially in plain interface list | |
30 | or dependency graph format. | |
31 | """ | |
32 | ||
3e8ee54f | 33 | def __init__(self, force=False): |
a6f80f0e | 34 | self.logger = logging.getLogger('ifupdown.' + |
35 | self.__class__.__name__) | |
3e8ee54f | 36 | self.FORCE = force |
a6f80f0e | 37 | |
37c0543d | 38 | def run_iface_subop(self, ifupdownobj, ifaceobj, op, subop, mlist, cenv): |
a6f80f0e | 39 | """ Runs sub operation on an interface """ |
40 | ||
41 | self.logger.debug('%s: ' %ifaceobj.get_name() + 'op %s' %op + | |
42 | ' subop = %s' %subop) | |
43 | ||
37c0543d | 44 | for mname in mlist: |
45 | m = ifupdownobj.modules.get(mname) | |
a6f80f0e | 46 | err = 0 |
47 | try: | |
37c0543d | 48 | if hasattr(m, 'run') == True: |
49 | self.logger.debug('%s: %s : running module %s' | |
50 | %(ifaceobj.get_name(), subop, mname)) | |
739f665b | 51 | if op == 'query-checkcurr': |
37c0543d | 52 | # Dont check state if the interface object was |
53 | # auto generated | |
54 | if ((ifaceobj.priv_flags & ifupdownobj.BUILTIN) != 0 or | |
55 | (ifaceobj.priv_flags & ifupdownobj.NOCONFIG) != 0): | |
56 | continue | |
57 | m.run(ifaceobj, subop, | |
a6f80f0e | 58 | query_ifaceobj=ifupdownobj.create_ifaceobjcurr( |
739f665b | 59 | ifaceobj)) |
a6f80f0e | 60 | else: |
61 | m.run(ifaceobj, subop) | |
a6f80f0e | 62 | except Exception, e: |
63 | err = 1 | |
3e8ee54f | 64 | self.log_error(str(e)) |
a6f80f0e | 65 | finally: |
739f665b | 66 | if op[:5] != 'query': |
a6f80f0e | 67 | if err == 1: |
68 | ifupdownobj.set_iface_state(ifaceobj, | |
69 | ifaceState.from_str(subop), | |
70 | ifaceStatus.ERROR) | |
71 | else: | |
72 | ifupdownobj.set_iface_state(ifaceobj, | |
73 | ifaceState.from_str(subop), | |
74 | ifaceStatus.SUCCESS) | |
75 | ||
37c0543d | 76 | # execute /etc/network/ scripts |
77 | subop_dict = ifupdownobj.operations_compat.get(op) | |
78 | if subop_dict is None: return | |
79 | for mname in subop_dict.get(subop): | |
80 | self.logger.debug('%s: %s : running script %s' | |
81 | %(ifaceobj.get_name(), subop, mname)) | |
82 | try: | |
83 | self.exec_command(mname, cmdenv=cenv) | |
84 | except Exception, e: | |
85 | err = 1 | |
86 | self.log_error(str(e)) | |
87 | ||
a6f80f0e | 88 | def run_iface_subops(self, ifupdownobj, ifaceobj, op): |
89 | """ Runs all sub operations on an interface """ | |
90 | ||
91 | # For backward compatibility execute scripts with | |
92 | # environent set | |
93 | cenv = ifupdownobj.generate_running_env(ifaceobj, op) | |
94 | ||
95 | # Each sub operation has a module list | |
96 | subopdict = ifupdownobj.operations.get(op) | |
37c0543d | 97 | for subop, mlist in subopdict.items(): |
98 | self.run_iface_subop(ifupdownobj, ifaceobj, op, subop, mlist, cenv) | |
a6f80f0e | 99 | |
100 | ||
101 | def run_iface(self, ifupdownobj, ifacename, op): | |
102 | """ Runs operation on an interface """ | |
103 | ||
104 | ifaceobjs = ifupdownobj.get_iface_objs(ifacename) | |
105 | for i in ifaceobjs: | |
106 | if (op != 'query' and ifupdownobj.STATE_CHECK == True and | |
107 | ifupdownobj.is_valid_state_transition(i, op) == False and | |
108 | ifupdownobj.FORCE == False): | |
109 | self.logger.warning('%s' %ifacename + | |
110 | ' already %s' %op) | |
111 | continue | |
112 | ||
113 | self.run_iface_subops(ifupdownobj, i, op) | |
114 | ||
115 | ||
116 | def run_iface_list(self, ifupdownobj, ifacenames, operation, | |
117 | sorted_by_dependency=False): | |
118 | """ Runs interface list serially executing all sub operations on | |
119 | each interface at a time. """ | |
120 | ||
121 | self.logger.debug('run_iface_list: running interface list for ' + | |
122 | 'operation %s' %operation) | |
123 | ||
124 | iface_run_queue = deque(ifacenames) | |
125 | for i in range(0, len(iface_run_queue)): | |
126 | if operation == 'up': | |
127 | # XXX: simplify this | |
128 | if sorted_by_dependency == True: | |
129 | ifacename = iface_run_queue.pop() | |
130 | else: | |
131 | ifacename = iface_run_queue.popleft() | |
132 | else: | |
133 | if sorted_by_dependency == True: | |
134 | ifacename = iface_run_queue.popleft() | |
135 | else: | |
136 | ifacename = iface_run_queue.pop() | |
137 | ||
138 | try: | |
139 | self.run_iface(ifupdownobj, ifacename, operation) | |
140 | except Exception, e: | |
3e8ee54f | 141 | self.log_error(str(e)) |
a6f80f0e | 142 | |
143 | def run_iface_list_subop(self, ifupdownobj, ifacenames, op, subop, mdict, | |
144 | sorted_by_dependency=False): | |
145 | """ Runs interface list through sub operation handler. """ | |
146 | ||
37c0543d | 147 | self.logger.debug('running sub operation %s on all given interfaces' |
148 | %subop) | |
a6f80f0e | 149 | iface_run_queue = deque(ifacenames) |
150 | for i in range(0, len(iface_run_queue)): | |
151 | if op == 'up': | |
152 | # XXX: simplify this | |
153 | if sorted_by_dependency == True: | |
154 | ifacename = iface_run_queue.pop() | |
155 | else: | |
156 | ifacename = iface_run_queue.popleft() | |
157 | else: | |
158 | if sorted_by_dependency == True: | |
159 | ifacename = iface_run_queue.popleft() | |
160 | else: | |
161 | ifacename = iface_run_queue.pop() | |
162 | ||
163 | try: | |
164 | ifaceobjs = ifupdownobj.get_iface_objs(ifacename) | |
165 | for ifaceobj in ifaceobjs: | |
166 | if (op != 'query' and ifupdownobj.STATE_CHECK == True and | |
167 | ifupdownobj.is_valid_state_transition(ifaceobj, | |
168 | op) == False and ifupdownobj.FORCE == False): | |
169 | if subop == 'post-down' or subop == 'post-up': | |
170 | self.logger.warning('%s: ' %ifacename + | |
171 | ' already %s' %op) | |
172 | continue | |
173 | ||
174 | cenv = ifupdownobj.generate_running_env(ifaceobj, op) | |
175 | self.run_iface_subop(ifupdownobj, ifaceobj, op, subop, | |
176 | mdict, cenv) | |
177 | except Exception, e: | |
3e8ee54f | 178 | self.log_error(str(e)) |
a6f80f0e | 179 | |
180 | def run_iface_list_stages(self, ifupdownobj, ifacenames, op, | |
181 | sorted_by_dependency=False): | |
182 | """ Runs interface list through sub operations handler | |
183 | ||
184 | Unlike run_iface_list, this method executes a sub operation on the | |
185 | entire interface list before proceeding to the next sub-operation. | |
186 | ie operation 'pre-up' is run through the entire interface list before | |
187 | 'up' | |
188 | """ | |
189 | ||
a6f80f0e | 190 | # Each sub operation has a module list |
191 | subopdict = ifupdownobj.operations.get(op) | |
192 | for subop, mdict in subopdict.items(): | |
193 | self.run_iface_list_subop(ifupdownobj, ifacenames, op, subop, mdict, | |
194 | sorted_by_dependency) | |
195 | ||
196 | ||
37c0543d | 197 | def run_iface_dependency_graph(self, ifupdownobj, dependency_graphs, |
198 | operation, indegrees=None, | |
199 | graphsortall=False): | |
a6f80f0e | 200 | """ runs interface dependency graph """ |
201 | ||
a6f80f0e | 202 | |
37c0543d | 203 | if indegrees is None: |
204 | indegrees = OrderedDict() | |
205 | for ifacename in dependency_graphs.keys(): | |
206 | indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename) | |
a6f80f0e | 207 | |
208 | if self.logger.isEnabledFor(logging.DEBUG) == True: | |
209 | self.logger.debug('indegree array :') | |
37c0543d | 210 | self.logger.debug(ifupdownobj.pp.pformat(indegrees)) |
a6f80f0e | 211 | |
212 | try: | |
213 | self.logger.debug('calling topological sort on the graph ...') | |
37c0543d | 214 | if graphsortall == True: |
215 | sorted_ifacenames = graph.topological_sort_graphs_all( | |
216 | dependency_graphs, indegrees) | |
217 | else: | |
218 | sorted_ifacenames = graph.topological_sort_graphs( | |
219 | dependency_graphs, indegrees) | |
cca03c30 | 220 | except Exception: |
a6f80f0e | 221 | raise |
222 | ||
223 | self.logger.debug('sorted iface list = %s' %sorted_ifacenames) | |
224 | ||
225 | #self.run_iface_list(ifupdownobj, sorted_ifacenames, operation, | |
226 | # sorted_by_dependency=True) | |
227 | ||
228 | self.run_iface_list_stages(ifupdownobj, sorted_ifacenames, operation, | |
229 | sorted_by_dependency=True) | |
230 | ||
231 | ||
232 | def init_tokens(self, count): | |
233 | self.token_pool = BoundedSemaphore(count) | |
234 | self.logger.debug('initialized bounded semaphore with %d' %count) | |
235 | ||
236 | def accquire_token(self, logprefix=''): | |
237 | self.token_pool.acquire() | |
238 | self.logger.debug('%s ' %logprefix + 'acquired token') | |
239 | ||
240 | def release_token(self, logprefix=''): | |
241 | self.token_pool.release() | |
242 | self.logger.debug('%s ' %logprefix + 'release token') | |
243 | ||
244 | def run_iface_parallel(self, ifupdownobj, ifacename, op): | |
245 | """ Configures interface in parallel. | |
246 | ||
247 | Executes all its direct dependents in parallel | |
248 | ||
249 | """ | |
250 | ||
251 | self.logger.debug('%s:' %ifacename + ' %s' %op) | |
252 | self.accquire_token(iface) | |
253 | ||
254 | # Each iface can have a list of objects | |
255 | ifaceobjs = ifupdownobj.get_iface_objs(ifacename) | |
256 | if ifaceobjs is None: | |
257 | self.logger.warning('%s: ' %ifacename + 'not found') | |
258 | self.release_token(ifacename) | |
259 | return -1 | |
260 | ||
261 | for ifaceobj in ifaceobjs: | |
262 | # Run dependents | |
263 | dlist = ifaceobj.get_dependents() | |
264 | if dlist is not None and len(dlist) > 0: | |
265 | self.logger.debug('%s:' %ifacename + | |
266 | ' found dependents: %s' %str(dlist)) | |
267 | try: | |
268 | self.release_token(ifacename) | |
269 | self.run_iface_list_parallel(ifacename, ifupdownobj, | |
270 | dlist, op) | |
271 | self.accquire_token(ifacename) | |
272 | except Exception, e: | |
3e8ee54f | 273 | if (self.ignore_error(str(e)) == True): |
a6f80f0e | 274 | pass |
275 | else: | |
276 | # Dont bring the iface up if children did not come up | |
277 | self.logger.debug('%s:' %ifacename + | |
278 | ' there was an error bringing %s' %op + | |
279 | ' dependents (%s)', str(e)) | |
280 | ifupdownobj.set_iface_state(ifaceobj, | |
281 | ifaceState.from_str( | |
282 | ifupdownobj.get_subops(op)[0]), | |
283 | ifaceStatus.ERROR) | |
284 | return -1 | |
285 | ||
286 | if (op != 'query' and ifupdownobj.STATE_CHECK == True and | |
287 | ifupdownobj.is_valid_state_transition(ifaceobj, | |
288 | op) == False and ifupdownobj.FORCE == False): | |
289 | self.logger.warning('%s:' %ifacename + ' already %s' %op) | |
290 | continue | |
291 | ||
292 | ||
293 | # Run all sub operations sequentially | |
294 | try: | |
295 | self.logger.debug('%s:' %ifacename + ' running sub-operations') | |
296 | self.run_iface_subops(ifupdownobj, ifaceobj, op) | |
297 | except Exception, e: | |
298 | self.logger.error('%s:' %ifacename + | |
299 | ' error running sub operations (%s)' %str(e)) | |
300 | ||
301 | self.release_token(ifacename) | |
302 | ||
303 | ||
304 | def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op): | |
305 | """ Runs interface list in parallel """ | |
306 | ||
307 | running_threads = OrderedDict() | |
308 | err = 0 | |
309 | ||
310 | for ifacename in ifacenames: | |
311 | try: | |
312 | self.accquire_token(parent) | |
313 | running_threads[ifacename] = Thread(None, | |
314 | self.run_iface_parallel, ifacename, | |
315 | args=(ifupdownobj, ifacename, op)) | |
316 | running_threads[ifacename].start() | |
317 | self.release_token(parent) | |
318 | except Exception, e: | |
319 | self.release_token(parent) | |
320 | if (ifupdownobj.ignore_error(str(e)) == True): | |
321 | pass | |
322 | else: | |
323 | raise Exception('error starting thread for iface %s' | |
324 | %ifacename) | |
325 | ||
326 | ||
327 | self.logger.debug('%s' %parent + 'waiting for all the threads ...') | |
328 | for ifacename, t in running_threads.items(): | |
329 | t.join() | |
330 | if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS: | |
331 | err += 1 | |
332 | ||
333 | return err | |
334 | ||
335 | def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op): | |
336 | """ Runs iface graphs in parallel """ | |
337 | ||
338 | running_threads = OrderedDict() | |
339 | err = 0 | |
340 | ||
341 | for ifacename in ifacenames: | |
342 | try: | |
343 | self.accquire_graph_token(parent) | |
344 | running_threads[ifacename] = Thread(None, | |
345 | self.run_iface_parallel, ifacename, | |
346 | args=(ifupdownobj, ifacename, op)) | |
347 | running_threads[ifacename].start() | |
348 | self.release_graph_token(parent) | |
349 | except Exception, e: | |
350 | self.release_graph_token(parent) | |
351 | if (ifupdownobj.ignore_error(str(e)) == True): | |
352 | pass | |
353 | else: | |
354 | raise Exception('error starting thread for iface %s' | |
355 | %ifacename) | |
356 | ||
357 | self.logger.info('%s' %parent + 'waiting for all the threads ...') | |
358 | for ifacename, t in running_threads.items(): | |
359 | t.join() | |
360 | # Check status of thread | |
361 | # XXX: Check all objs | |
362 | if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS: | |
363 | err += 1 | |
364 | ||
365 | return err | |
366 | ||
367 | def run_iface_dependency_graph_parallel(self, ifupdownobj, dependency_graph, | |
368 | operation): | |
369 | """ Runs iface dependeny graph in parallel. | |
370 | ||
371 | arguments: | |
372 | ifupdownobj -- ifupdown object (used for getting and updating iface | |
373 | object state) | |
374 | dependency_graph -- dependency graph with | |
375 | operation -- 'up' or 'down' or 'query' | |
376 | ||
377 | """ | |
378 | ||
379 | self.logger.debug('running dependency graph in parallel ..') | |
380 | ||
381 | run_queue = [] | |
382 | ||
383 | # Build a list of ifaces that dont have any dependencies | |
384 | for ifacename in dependency_graph.keys(): | |
385 | if ifupdownobj.get_iface_refcnt(ifacename) == 0: | |
386 | run_queue.append(ifacename) | |
387 | ||
388 | self.logger.debug('graph roots (interfaces that dont have dependents):' + | |
389 | ' %s' %str(run_queue)) | |
390 | ||
391 | self.init_tokens(ifupdownobj.get_njobs()) | |
392 | ||
393 | return self.run_iface_list_parallel('main', ifupdownobj, run_queue, | |
394 | operation) | |
395 | ||
396 | # OR | |
397 | # Run one graph at a time | |
398 | #for iface in run_queue: | |
399 | # self.run_iface_list_parallel('main', ifupdownobj, [iface], | |
400 | # operation) | |
401 |