]>
Commit | Line | Data |
---|---|---|
e0edde6f | 1 | # Copyright (c) 2011 Nicira, Inc. |
436f27dd AS |
2 | # Copyright (c) 2010 Citrix Systems, Inc. |
3 | # | |
4 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | # you may not use this file except in compliance with the License. | |
6 | # You may obtain a copy of the License at: | |
7 | # | |
8 | # http://www.apache.org/licenses/LICENSE-2.0 | |
9 | # | |
10 | # Unless required by applicable law or agreed to in writing, software | |
11 | # distributed under the License is distributed on an "AS IS" BASIS, | |
12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | # See the License for the specific language governing permissions and | |
14 | # limitations under the License. | |
15 | ||
16 | from OVEStandard import * | |
17 | from OVEConfig import * | |
18 | from OVELogger import * | |
6aaf8f13 | 19 | import ovs.json |
436f27dd AS |
20 | |
# The qt4reactor has to be installed before twisted gets a chance to install
# its own reactor, so this sequence must stay ahead of the twisted imports below.
import qt4reactor
globalApp = QtGui.QApplication([])
qt4reactor.install()

try:
    from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
    from twisted.internet import defer, protocol, reactor
    from twisted.application import reactors
except Exception:
    # Tell the user what is probably missing, then let the original error propagate.
    print('+++ Python Twisted Conch module is required\n')
    raise
33 | ||
class OVEFetchUserAuth(userauth.SSHUserAuthClient):
    """SSH user-auth service that answers password prompts from the fetch's host config."""

    def __init__(self, fetch, *params):
        """Remember the owning fetch object; *params go to SSHUserAuthClient unchanged."""
        userauth.SSHUserAuthClient.__init__(self, *params)
        self.fetch = fetch
        # Number of USERAUTH_FAILURE packets seen so far
        self.authFails = 0

    def getPassword(self):
        """Return an already-fired Deferred holding the configured 'password' entry."""
        hostConfig = self.fetch.config()
        return defer.succeed(hostConfig['password'])

    def ssh_USERAUTH_FAILURE(self, packet):
        """Log every failure after the first, then let the base class process the packet."""
        # We normally get one failure, so ignore it. Real failures send these repeatedly
        isRepeat = self.authFails > 0
        if isRepeat:
            OVELog('Authentication failure for '+self.fetch.config()['address'])
        self.authFails += 1
        userauth.SSHUserAuthClient.ssh_USERAUTH_FAILURE(self, packet)
48 | ||
class OVEFetchConnection(connection.SSHConnection, QtCore.QObject):
    """SSH connection service that opens one OVECommandChannel per remote command.

    Emits the Qt signal 'connectionService(QObject)' with itself when the
    service starts and with None when it stops.
    """

    # For 30 second timeouts at 1 second refresh interval and three windows
    # open on a single host, need 90 channels
    MAX_OLD_CHANNELS = 90

    def __init__(self, fetch, *params):
        """Store the owning fetch; *params go to SSHConnection unchanged."""
        connection.SSHConnection.__init__(self, *params)
        QtCore.QObject.__init__(self)
        self.fetch = fetch
        self._channel = None      # Channel of the most recently issued command
        self._oldChannels = []    # Superseded channels, oldest first

    def serviceStarted(self):
        self.emit(QtCore.SIGNAL('connectionService(QObject)'), self)

    def serviceStopped(self):
        self.emit(QtCore.SIGNAL('connectionService(QObject)'), None)

    def execCommand(self, requester, ref, command, commandType):
        """Open a new channel that runs `command`.

        requester, ref and commandType are handed to the OVECommandChannel
        together with the command. The previously current channel is parked
        in a bounded list rather than dropped.
        """
        if self._channel is not None:
            # Don't delete old channels immediately in case they're e.g. going to time out with a failure
            self._oldChannels.append(self._channel)
            if len(self._oldChannels) > self.MAX_OLD_CHANNELS:
                # Evict the OLDEST entry. Index 1 was used here before, which
                # kept the very first channel alive for the connection's lifetime.
                del self._oldChannels[0]
        self._channel = OVECommandChannel(self.fetch, requester, ref, command, commandType, 2**16, 2**15, self)
        self.openChannel(self._channel)

    def connectionLost(self, reason):
        """Forward loss of the connection to the current channel, if there is one."""
        if self._channel is not None:
            self._channel.connectionLost(reason)
76 | ||
class OVEFetchTransport(transport.SSHClientTransport, QtCore.QObject):
    """SSH client transport that starts authentication and the connection service once secure."""

    def __init__(self, fetch, *params):
        """Initialise the Qt side only; *params is accepted but not used."""
        # There is no __init__ method on SSHClientTransport, so it is not called:
        # transport.SSHClientTransport.__init__(self, *params)
        QtCore.QObject.__init__(self)
        self.fetch = fetch
        self._connection = None
        failureSignal = QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)')
        QtCore.QObject.connect(self, failureSignal, self.fetch.xon_channelFailure)

    def verifyHostKey(self, hostKey, fingerprint):
        """Accept the host key.

        NOTE(review): hostKey and fingerprint are never inspected, so every
        server key is accepted.
        """
        return defer.succeed(1)

    def connectionSecure(self):
        """Create the connection service and request user authentication for it."""
        conn = OVEFetchConnection(self.fetch)
        self._connection = conn
        serviceSignal = QtCore.SIGNAL('connectionService(QObject)')
        QtCore.QObject.connect(conn, serviceSignal, self.fetch.xon_connectionService)
        # Fall back to 'root' when the host config names no user
        username = self.fetch.config().get('username', 'root')
        self.requestService(OVEFetchUserAuth(self.fetch, username, conn))

    def connectionLost(self, reason):
        """Pass the loss on to the connection service, if one was created."""
        conn = self._connection
        if conn is not None:
            conn.connectionLost(reason)
100 | ||
class OVEFetchWrapper(object):
    """Plain holder for an arbitrary Python value, exposed as `.contents`.

    Declared as a new-style class so it behaves the same under Python 2 and 3.
    """

    def __init__(self, contents):
        self.contents = contents
104 | ||
class OVECommandChannel(channel.SSHChannel, QtCore.QObject):
    """SSH 'session' channel that runs one remote command and reports the outcome via Qt signals.

    Output is accumulated until both an exit status and enough data have
    arrived; the result is then emitted as 'channelSuccess' or
    'channelFailure'. commandType selects how completeness is detected:
    'JSON' (stdout must decode as JSON) or 'framed' (stdout must contain an
    END-MARKER line).
    """
    name = 'session'
    MSEC_TIMEOUT=10000
    # Pseudo exit statuses, used when no real status was received
    STATUS_CONNECTION_LOST = 100001
    STATUS_TIMEOUT = 100002
    END_MARKER='END-MARKER'
    END_MARKER_RE=re.compile(r'^END-MARKER$', re.MULTILINE)

    def __init__(self, fetch, requester, ref, command, commandType, *params):
        """Store the request details and wire this channel's signals to the fetch's handlers.

        *params go to SSHChannel unchanged.
        """
        channel.SSHChannel.__init__(self, *params)
        QtCore.QObject.__init__(self)
        self.fetch = fetch
        self.requester = requester
        self.ref = ref
        self.command = command
        self.commandType= commandType
        self._data = ''          # Accumulated stdout
        self._extData = ''       # Accumulated extended data (stderr)
        self._jsonValues = None  # Decoded reply for 'JSON' commands
        self._timerId = None     # Qt timer id of the running command timeout, if any
        self._status = None      # Exit status, or one of the STATUS_* pseudo values
        self.connect(self, QtCore.SIGNAL('channelData(QObject, int, QString)'), self.fetch.xon_channelData)
        self.connect(self, QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.fetch.xon_channelExtData)
        self.connect(self, QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.fetch.xon_channelSuccess)
        self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)

    def _stopTimer(self):
        """Kill the timeout timer if it is running and forget its id so it is never killed twice."""
        if self._timerId is not None:
            self.killTimer(self._timerId)
            self._timerId = None

    def openFailed(self, reason):
        """Report that the channel could not be opened."""
        self._stopTimer()
        self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
                  'Open failed:'+str(reason), '', '')

    def channelOpen(self, ignoredData):
        """Start the timeout timer and send the 'exec' request for the command."""
        try:
            nsCommand = common.NS(str(self.command))
            self._timerId = self.startTimer(self.MSEC_TIMEOUT)
            self.conn.sendRequest(self, 'exec', nsCommand, wantReply=1)
        except Exception as e:
            # The request never went out, so the timeout must not report a second failure later
            self._stopTimer()
            self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
                      'Open failed:'+str(e), self._data, self._extData)

    def dataReceived(self, data):
        """Accumulate stdout and check whether the reply is now complete."""
        self._data += data
        if OVEConfig.Inst().logTraffic:
            self.emit(QtCore.SIGNAL('channelData(QObject, int, QString)'), self.requester, self.ref, data)
        self.testIfDone()

    def extDataReceived(self, extData):
        """Accumulate extended data (stderr)."""
        self._extData += extData
        if OVEConfig.Inst().logTraffic:
            self.emit(QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.requester, self.ref, extData)

    def request_exit_status(self, data):
        """Record the remote exit status, unpacked as a big-endian unsigned 32-bit value."""
        # We can get the exit status before the data, so delay calling sendResult until we get both
        self._status = struct.unpack('>L', data)[0]
        self.testIfDone()

    def testIfDone(self):
        """Send the result once a status is known and, on success, the output is complete."""
        if self._status is None:
            return
        if self._status != 0:
            self.sendResult() # Failed, so send what we have
            return
        if len(self._data) == 0:
            return # Success status but no data yet
        # Status == success and we have some data
        if self.commandType == 'JSON':
            try:
                # Decode the JSON data, to confirm that we have all of the data
                # NOTE(review): this relies on from_string raising for truncated
                # input -- confirm against the ovs.json implementation
                jsonValues = ovs.json.from_string(str(self._data)) # FIXME: Should handle unicode
            except Exception:
                return # Wait for more data
            # Sent outside the try so that errors raised while delivering the
            # result are not mistaken for incomplete JSON
            self._jsonValues = jsonValues
            self.sendResult()
        elif self.commandType == 'framed':
            match = self.END_MARKER_RE.search(self._data)
            if match:
                self._data = self._data[:match.start()] # Remove end marker
                self.sendResult()
        else:
            OVELog('Bad command type')

    def sendResult(self):
        """Stop the timeout, emit channelSuccess or channelFailure, and close the channel.

        The channel is not closed when the status is STATUS_CONNECTION_LOST.
        """
        self._stopTimer()
        if self.commandType == 'JSON' and self._status == 0 and self._jsonValues is not None:
            self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(OVEFetchWrapper(self._jsonValues)))
        elif self.commandType != 'JSON' and self._status == 0:
            self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(None))
        else:
            self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref, 'Remote command failed (rc='+str(self._status)+')', self._data, self._extData)
        if self._status != self.STATUS_CONNECTION_LOST:
            try:
                self.loseConnection()
            except Exception as e:
                OVELog('OVECommandChannel.sendResult loseConnection error: '+str(e))

    def connectionLost(self, reason):
        """Report the command as failed because the connection went away."""
        self._extData += '+++ Connection lost'
        self._status = self.STATUS_CONNECTION_LOST
        self.sendResult()

    def timerEvent(self, event):
        """Report a timeout when this channel's own timer fires; defer other timers to QObject."""
        if event.timerId() == self._timerId:
            self._extData += '+++ Timeout'
            self._status = self.STATUS_TIMEOUT
            self.sendResult()
        else:
            QtCore.QObject.timerEvent(self, event)
210 | ||
class OVEFetchEvent(QtCore.QEvent):
    """Qt event posted to a requester when its command succeeded."""

    # Event type id obtained once from Qt's registry when the class is defined
    TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

    def __init__(self, ref, data):
        """Carry the caller's reference and the fetched data."""
        super(OVEFetchEvent, self).__init__(self.TYPE)
        self.data = data
        self.ref = ref
217 | ||
class OVEFetchFailEvent(QtCore.QEvent):
    """Qt event posted to a requester when its command failed."""

    # Event type id obtained once from Qt's registry when the class is defined
    TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())

    def __init__(self, ref, message):
        """Carry the caller's reference and the failure message, converted with str()."""
        super(OVEFetchFailEvent, self).__init__(self.TYPE)
        text = str(message)
        self.ref = ref
        self.message = text
224 | ||
class OVEFetch(QtCore.QObject):
    """Runs commands on one configured host over SSH and posts the results as Qt events.

    One instance exists per host uuid (see Inst). Commands issued before the
    SSH connection service is up are queued and run once it reports in.
    Successes are posted to the requester as OVEFetchEvent, failures as
    OVEFetchFailEvent.
    """
    instances = {}       # uuid -> OVEFetch
    SEC_TIMEOUT = 10.0   # TCP connect timeout, in seconds

    def __init__(self, uuid):
        QtCore.QObject.__init__(self)
        self._hostUuid = uuid
        self._config = None        # Cached host config; cleared on configUpdated()
        self._transport = None     # Result of connectTCP while a connection is wanted
        self._connection = None    # OVEFetchConnection once its service has started
        self._commandQueue = []    # (requester, ref, command, commandType) awaiting a connection
        self._timerRef = 0         # Generation counter used to ignore stale connect timeouts
        self.refs = {}             # The three dicts below back the snoop* PySide workaround
        self.messages = {}
        self.values = {}
        self.connect(OVEConfig.Inst(), QtCore.SIGNAL("configUpdated()"), self.xon_configUpdated)

    @classmethod
    def Inst(cls, uuid):
        """Return the OVEFetch for `uuid`, creating it on first use."""
        if uuid not in cls.instances:
            cls.instances[uuid] = OVEFetch(uuid)
        return cls.instances[uuid]

    @classmethod
    def startReactor(cls):
        reactor.runReturn()

    def _hostAddress(self):
        """Return the configured address, or a placeholder when there is no config or address."""
        return (self.config() or {}).get('address', '<Address not set>')

    def _postFailure(self, requester, ref, message):
        """Record the failure for the snoop* workaround and post an OVEFetchFailEvent."""
        self.refs[requester] = ref # For PySide workaround
        self.messages[requester] = message # For PySide workaround
        event = OVEFetchFailEvent(ref, message)
        QtCore.QCoreApplication.postEvent(requester, event)

    def xon_configUpdated(self):
        """Drop the cached config and the current transport after a config change."""
        self._config = None
        self.resetTransport()

    def xon_connectionService(self, connection):
        """Track the connection service; when one starts, run every queued command."""
        self._connection = connection
        if self._connection is not None:
            OVELog('SSH connection to '+self.config()['address'] +' established')
            for command in self._commandQueue:
                # OVELog('Unqueueing '+str(command))
                self.execCommand2(*command)
            self._commandQueue = []

    def xon_channelData(self, requester, ref, data):
        if OVEConfig.Inst().logTraffic:
            OVELog('Channel data received: '+str(data))

    def xon_channelExtData(self, requester, ref, data):
        if OVEConfig.Inst().logTraffic:
            OVELog('+++ Channel extData (stderr) received: '+str(data))

    def xon_channelFailure(self, requester, ref, message, data, extData):
        """Post a failure event containing the message plus any command output, then reset the transport."""
        if OVEConfig.Inst().logTraffic:
            OVELog('+++ Channel failure: '+str(message))
            OVELog("Closing SSH session due to failure")

        errMessage = message
        if len(data) > 0:
            errMessage += '\n+++ Failed command output: '+data
        if len(extData) > 0:
            errMessage += '\n+++ Failed command output (stderr): '+extData

        self._postFailure(requester, ref, errMessage)
        self.resetTransport()

    def xon_channelSuccess(self, requester, ref, data, extData, jsonValueVariant):
        """Post the decoded JSON values, or the raw output as a str, to the requester."""
        jsonValues = jsonValueVariant.toPyObject()
        if OVEConfig.Inst().logTraffic:
            OVELog('--- Channel success')
        try:
            if jsonValues is not None:
                values = jsonValues.contents
            else:
                values = str(data)

            self.refs[requester] = ref # For PySide workaround
            self.values[requester] = values # For PySide workaround
            event = OVEFetchEvent(ref, values)
            QtCore.QCoreApplication.postEvent(requester, event)
        except Exception as e:
            message = ('+++ Failed to decode JSON reply: '+str(e))
            if len(data) > 0: message += "\n++++++ Data (stdout): "+str(data)
            if len(extData) > 0: message += '\n++++++ Error (stderr): '+str(extData)
            self._postFailure(requester, ref, message)

    # Use for workaround only
    def snoopRef(self, requester):
        return self.refs.get(requester, None)

    # Use for workaround only
    def snoopValues(self, requester):
        return self.values.get(requester, None)

    # Use for workaround only
    def snoopMessage(self, requester):
        return self.messages.get(requester, None)

    def config(self):
        """Return this host's config, looked up by uuid on first use and then cached."""
        if self._config is None:
            self._config = OVEConfig.Inst().hostFromUuid(self._hostUuid)

        return self._config

    def resetTransport(self):
        """Forget the current transport and connection so the next command reconnects."""
        if OVEConfig.Inst().logTraffic:
            # Same tolerant lookup as execCommand: a reset must not fail just
            # because the host config is missing
            OVELog('Transport reset for '+self._hostAddress())
        # NOTE(review): this only drops the references; nothing visible here
        # closes the underlying connection -- confirm that is intended
        self._connection = None
        self._transport = None

    def transportErrback(self, failure, requester, ref, address):
        """Handle a failed connection attempt: reset and tell the requester."""
        self._timerRef += 1 # Prevent timeout handling
        self.resetTransport()
        message = 'Failure connecting to '+address+': '+failure.getErrorMessage()
        self._postFailure(requester, ref, message)

    def transportTimeout(self, timerRef, requester, ref, address):
        """Fail the attempt if it is still the current one and no connection service has started."""
        if self._timerRef == timerRef and self._transport is not None and self._connection is None:
            message = 'Connection attempt to ' +address+' timed out'
            self._postFailure(requester, ref, message)
            self.resetTransport()

    def execCommand(self, requester, ref, command, commandType):
        """Run `command` on the host, connecting first if there is no transport yet.

        The outcome is posted to `requester` as an OVEFetchEvent or
        OVEFetchFailEvent carrying `ref`.
        """
        if OVEConfig.Inst().logTraffic:
            hostName = self._hostAddress()
            OVELog(str(QtCore.QTime.currentTime().toString())+' '+hostName+': Executing '+command)
        if self._transport is None:
            config = self.config() or {}
            address = config.get('address')
            if address is None:
                # Nothing to connect to: report it instead of raising from a lookup
                self._postFailure(requester, ref, 'No address configured for host '+str(self._hostUuid))
                return
            self._connection = None
            self._commandQueue.append((requester, ref, command, commandType))
            creator = protocol.ClientCreator(reactor, OVEFetchTransport, self)
            self._transport = creator.connectTCP(address, config.get('port', 22), timeout = self.SEC_TIMEOUT)
            self._transport.addErrback(self.transportErrback, requester, ref, address)
            self._timerRef += 1
            # Set this timer slightly longer than the twisted.conch timeout, as transportErrback can cancel
            # the timeout and prevent double handling
            # lambda timerRef = self._timerRef: takes a copy of self._timerRef
            QtCore.QTimer.singleShot(int((1+self.SEC_TIMEOUT) * 1000), lambda timerRef = self._timerRef: self.transportTimeout(timerRef, requester, ref, address))
        else:
            self.execCommand2(requester, ref, command, commandType)

    def execCommand2(self, requester, ref, command, commandType):
        """Run the command on the live connection, or queue it until one exists."""
        if self._connection is None:
            self._commandQueue.append((requester, ref, command, commandType))
        else:
            self._connection.execCommand(requester, ref, command, commandType)

    # NOTE: the default `ref` is a single QObject created when this method is
    # defined, so it is shared by every call that omits `ref`.
    def getTable(self, requester, tableName, ref = QtCore.QObject()):
        """Select every row of `tableName` via ovsdb-client; the reply is decoded as JSON."""
        command = '/usr/bin/ovsdb-client transact '+self.config()['connectTarget']+' \'["Open_vSwitch", {"op":"select","table":"'+tableName+'", "where":[]}]\''

        self.execCommand(requester, ref, command, 'JSON')

    def execCommandFramed(self, requester, ref, command):
        """Run `command` with an END-MARKER echo appended so the end of its output can be detected."""
        self.execCommand(requester, ref, command + ' && echo ' + OVECommandChannel.END_MARKER, 'framed')