]> git.proxmox.com Git - mirror_ifupdown2.git/blame - pkg/scheduler.py
minor init.d fixes
[mirror_ifupdown2.git] / pkg / scheduler.py
CommitLineData
a6f80f0e 1#!/usr/bin/python
3e8ee54f 2#
3# Copyright 2013. Cumulus Networks, Inc.
4# Author: Roopa Prabhu, roopa@cumulusnetworks.com
5#
6# ifaceScheduler --
7# interface scheduler
8#
a6f80f0e 9
10import os
11import re
12from statemanager import *
13from iface import *
14from graph import *
15from collections import deque
16from collections import OrderedDict
17import imp
18import pprint
19import logging
20from graph import *
21from collections import deque
22from threading import *
23from ifupdownbase import *
24
25class ifaceScheduler(ifupdownBase):
26 """ scheduler to schedule configuration of interfaces.
27
28
29 supports scheduling of interfaces serially in plain interface list
30 or dependency graph format.
31 """
32
3e8ee54f 33 def __init__(self, force=False):
a6f80f0e 34 self.logger = logging.getLogger('ifupdown.' +
35 self.__class__.__name__)
3e8ee54f 36 self.FORCE = force
a6f80f0e 37
38 def run_iface_subop(self, ifupdownobj, ifaceobj, op, subop, mdict, cenv):
39 """ Runs sub operation on an interface """
40
41 self.logger.debug('%s: ' %ifaceobj.get_name() + 'op %s' %op +
42 ' subop = %s' %subop)
43
44 for mname, mdata in mdict.items():
45 m = mdata.get('module')
46 err = 0
47 try:
48 if (mdata.get('ftype') == 'pmodule' and
49 hasattr(m, 'run') == True):
50 self.logger.debug('%s: ' %ifaceobj.get_name() +
51 'running module %s' %mname +
52 ' op %s' %op + ' subop %s' %subop)
739f665b 53 if op == 'query-checkcurr':
54 m.run(ifaceobj, subop, query_check=True,
a6f80f0e 55 query_ifaceobj=ifupdownobj.create_ifaceobjcurr(
739f665b 56 ifaceobj))
a6f80f0e 57 else:
58 m.run(ifaceobj, subop)
59 else:
60 self.logger.debug('%s: ' %ifaceobj.get_name() +
61 'running script %s' %mname +
62 ' op %s' %op + ' subop %s' %subop)
63 self.exec_command(m, cmdenv=cenv)
64 except Exception, e:
65 err = 1
3e8ee54f 66 self.log_error(str(e))
a6f80f0e 67 finally:
739f665b 68 if op[:5] != 'query':
a6f80f0e 69 if err == 1:
70 ifupdownobj.set_iface_state(ifaceobj,
71 ifaceState.from_str(subop),
72 ifaceStatus.ERROR)
73 else:
74 ifupdownobj.set_iface_state(ifaceobj,
75 ifaceState.from_str(subop),
76 ifaceStatus.SUCCESS)
77
78 def run_iface_subops(self, ifupdownobj, ifaceobj, op):
79 """ Runs all sub operations on an interface """
80
81 # For backward compatibility execute scripts with
82 # environent set
83 cenv = ifupdownobj.generate_running_env(ifaceobj, op)
84
85 # Each sub operation has a module list
86 subopdict = ifupdownobj.operations.get(op)
87 for subop, mdict in subopdict.items():
88 self.run_iface_subop(ifupdownobj, ifaceobj, op, subop, mdict, cenv)
89
90
91 def run_iface(self, ifupdownobj, ifacename, op):
92 """ Runs operation on an interface """
93
94 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
95 for i in ifaceobjs:
96 if (op != 'query' and ifupdownobj.STATE_CHECK == True and
97 ifupdownobj.is_valid_state_transition(i, op) == False and
98 ifupdownobj.FORCE == False):
99 self.logger.warning('%s' %ifacename +
100 ' already %s' %op)
101 continue
102
103 self.run_iface_subops(ifupdownobj, i, op)
104
105
106 def run_iface_list(self, ifupdownobj, ifacenames, operation,
107 sorted_by_dependency=False):
108 """ Runs interface list serially executing all sub operations on
109 each interface at a time. """
110
111 self.logger.debug('run_iface_list: running interface list for ' +
112 'operation %s' %operation)
113
114 iface_run_queue = deque(ifacenames)
115 for i in range(0, len(iface_run_queue)):
116 if operation == 'up':
117 # XXX: simplify this
118 if sorted_by_dependency == True:
119 ifacename = iface_run_queue.pop()
120 else:
121 ifacename = iface_run_queue.popleft()
122 else:
123 if sorted_by_dependency == True:
124 ifacename = iface_run_queue.popleft()
125 else:
126 ifacename = iface_run_queue.pop()
127
128 try:
129 self.run_iface(ifupdownobj, ifacename, operation)
130 except Exception, e:
3e8ee54f 131 self.log_error(str(e))
a6f80f0e 132
133 def run_iface_list_subop(self, ifupdownobj, ifacenames, op, subop, mdict,
134 sorted_by_dependency=False):
135 """ Runs interface list through sub operation handler. """
136
137 self.logger.debug('running sub operation %s on all given interfaces' %op)
a6f80f0e 138 iface_run_queue = deque(ifacenames)
139 for i in range(0, len(iface_run_queue)):
140 if op == 'up':
141 # XXX: simplify this
142 if sorted_by_dependency == True:
143 ifacename = iface_run_queue.pop()
144 else:
145 ifacename = iface_run_queue.popleft()
146 else:
147 if sorted_by_dependency == True:
148 ifacename = iface_run_queue.popleft()
149 else:
150 ifacename = iface_run_queue.pop()
151
152 try:
153 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
154 for ifaceobj in ifaceobjs:
155 if (op != 'query' and ifupdownobj.STATE_CHECK == True and
156 ifupdownobj.is_valid_state_transition(ifaceobj,
157 op) == False and ifupdownobj.FORCE == False):
158 if subop == 'post-down' or subop == 'post-up':
159 self.logger.warning('%s: ' %ifacename +
160 ' already %s' %op)
161 continue
162
163 cenv = ifupdownobj.generate_running_env(ifaceobj, op)
164 self.run_iface_subop(ifupdownobj, ifaceobj, op, subop,
165 mdict, cenv)
166 except Exception, e:
3e8ee54f 167 self.log_error(str(e))
a6f80f0e 168
169 def run_iface_list_stages(self, ifupdownobj, ifacenames, op,
170 sorted_by_dependency=False):
171 """ Runs interface list through sub operations handler
172
173 Unlike run_iface_list, this method executes a sub operation on the
174 entire interface list before proceeding to the next sub-operation.
175 ie operation 'pre-up' is run through the entire interface list before
176 'up'
177 """
178
179 self.logger.debug('run_iface_list_stages: running interface list for %s'
180 %op)
181
182 # Each sub operation has a module list
183 subopdict = ifupdownobj.operations.get(op)
184 for subop, mdict in subopdict.items():
185 self.run_iface_list_subop(ifupdownobj, ifacenames, op, subop, mdict,
186 sorted_by_dependency)
187
188
189 def run_iface_dependency_graph(self, ifupdownobj, dependency_graph,
190 operation):
191 """ runs interface dependency graph """
192
193 indegrees = OrderedDict()
194
195 self.logger.debug('creating indegree array ...')
196 for ifacename in dependency_graph.keys():
197 indegrees[ifacename] = ifupdownobj.get_iface_refcnt(ifacename)
198
199 if self.logger.isEnabledFor(logging.DEBUG) == True:
200 self.logger.debug('indegree array :')
201 ifupdownobj.pp.pprint(indegrees)
202
203 try:
204 self.logger.debug('calling topological sort on the graph ...')
cca03c30 205 sorted_ifacenames = graph.topological_sort_graphs(
206 dependency_graph, indegrees)
207 except Exception:
a6f80f0e 208 raise
209
210 self.logger.debug('sorted iface list = %s' %sorted_ifacenames)
211
212 #self.run_iface_list(ifupdownobj, sorted_ifacenames, operation,
213 # sorted_by_dependency=True)
214
215 self.run_iface_list_stages(ifupdownobj, sorted_ifacenames, operation,
216 sorted_by_dependency=True)
217
218
219 def init_tokens(self, count):
220 self.token_pool = BoundedSemaphore(count)
221 self.logger.debug('initialized bounded semaphore with %d' %count)
222
223 def accquire_token(self, logprefix=''):
224 self.token_pool.acquire()
225 self.logger.debug('%s ' %logprefix + 'acquired token')
226
227 def release_token(self, logprefix=''):
228 self.token_pool.release()
229 self.logger.debug('%s ' %logprefix + 'release token')
230
231 def run_iface_parallel(self, ifupdownobj, ifacename, op):
232 """ Configures interface in parallel.
233
234 Executes all its direct dependents in parallel
235
236 """
237
238 self.logger.debug('%s:' %ifacename + ' %s' %op)
239 self.accquire_token(iface)
240
241 # Each iface can have a list of objects
242 ifaceobjs = ifupdownobj.get_iface_objs(ifacename)
243 if ifaceobjs is None:
244 self.logger.warning('%s: ' %ifacename + 'not found')
245 self.release_token(ifacename)
246 return -1
247
248 for ifaceobj in ifaceobjs:
249 # Run dependents
250 dlist = ifaceobj.get_dependents()
251 if dlist is not None and len(dlist) > 0:
252 self.logger.debug('%s:' %ifacename +
253 ' found dependents: %s' %str(dlist))
254 try:
255 self.release_token(ifacename)
256 self.run_iface_list_parallel(ifacename, ifupdownobj,
257 dlist, op)
258 self.accquire_token(ifacename)
259 except Exception, e:
3e8ee54f 260 if (self.ignore_error(str(e)) == True):
a6f80f0e 261 pass
262 else:
263 # Dont bring the iface up if children did not come up
264 self.logger.debug('%s:' %ifacename +
265 ' there was an error bringing %s' %op +
266 ' dependents (%s)', str(e))
267 ifupdownobj.set_iface_state(ifaceobj,
268 ifaceState.from_str(
269 ifupdownobj.get_subops(op)[0]),
270 ifaceStatus.ERROR)
271 return -1
272
273 if (op != 'query' and ifupdownobj.STATE_CHECK == True and
274 ifupdownobj.is_valid_state_transition(ifaceobj,
275 op) == False and ifupdownobj.FORCE == False):
276 self.logger.warning('%s:' %ifacename + ' already %s' %op)
277 continue
278
279
280 # Run all sub operations sequentially
281 try:
282 self.logger.debug('%s:' %ifacename + ' running sub-operations')
283 self.run_iface_subops(ifupdownobj, ifaceobj, op)
284 except Exception, e:
285 self.logger.error('%s:' %ifacename +
286 ' error running sub operations (%s)' %str(e))
287
288 self.release_token(ifacename)
289
290
291 def run_iface_list_parallel(self, parent, ifupdownobj, ifacenames, op):
292 """ Runs interface list in parallel """
293
294 running_threads = OrderedDict()
295 err = 0
296
297 for ifacename in ifacenames:
298 try:
299 self.accquire_token(parent)
300 running_threads[ifacename] = Thread(None,
301 self.run_iface_parallel, ifacename,
302 args=(ifupdownobj, ifacename, op))
303 running_threads[ifacename].start()
304 self.release_token(parent)
305 except Exception, e:
306 self.release_token(parent)
307 if (ifupdownobj.ignore_error(str(e)) == True):
308 pass
309 else:
310 raise Exception('error starting thread for iface %s'
311 %ifacename)
312
313
314 self.logger.debug('%s' %parent + 'waiting for all the threads ...')
315 for ifacename, t in running_threads.items():
316 t.join()
317 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
318 err += 1
319
320 return err
321
322 def run_iface_graphs_parallel(self, parent, ifupdownobj, ifacenames, op):
323 """ Runs iface graphs in parallel """
324
325 running_threads = OrderedDict()
326 err = 0
327
328 for ifacename in ifacenames:
329 try:
330 self.accquire_graph_token(parent)
331 running_threads[ifacename] = Thread(None,
332 self.run_iface_parallel, ifacename,
333 args=(ifupdownobj, ifacename, op))
334 running_threads[ifacename].start()
335 self.release_graph_token(parent)
336 except Exception, e:
337 self.release_graph_token(parent)
338 if (ifupdownobj.ignore_error(str(e)) == True):
339 pass
340 else:
341 raise Exception('error starting thread for iface %s'
342 %ifacename)
343
344 self.logger.info('%s' %parent + 'waiting for all the threads ...')
345 for ifacename, t in running_threads.items():
346 t.join()
347 # Check status of thread
348 # XXX: Check all objs
349 if ifupdownobj.get_iface_status(ifacename) != ifaceStatus.SUCCESS:
350 err += 1
351
352 return err
353
354 def run_iface_dependency_graph_parallel(self, ifupdownobj, dependency_graph,
355 operation):
356 """ Runs iface dependeny graph in parallel.
357
358 arguments:
359 ifupdownobj -- ifupdown object (used for getting and updating iface
360 object state)
361 dependency_graph -- dependency graph with
362 operation -- 'up' or 'down' or 'query'
363
364 """
365
366 self.logger.debug('running dependency graph in parallel ..')
367
368 run_queue = []
369
370 # Build a list of ifaces that dont have any dependencies
371 for ifacename in dependency_graph.keys():
372 if ifupdownobj.get_iface_refcnt(ifacename) == 0:
373 run_queue.append(ifacename)
374
375 self.logger.debug('graph roots (interfaces that dont have dependents):' +
376 ' %s' %str(run_queue))
377
378 self.init_tokens(ifupdownobj.get_njobs())
379
380 return self.run_iface_list_parallel('main', ifupdownobj, run_queue,
381 operation)
382
383 # OR
384 # Run one graph at a time
385 #for iface in run_queue:
386 # self.run_iface_list_parallel('main', ifupdownobj, [iface],
387 # operation)
388