]> git.proxmox.com Git - ovs.git/blame - ovsdb/ovsdbmonitor/OVEFetch.py
Global replace of Nicira Networks.
[ovs.git] / ovsdb / ovsdbmonitor / OVEFetch.py
CommitLineData
e0edde6f 1# Copyright (c) 2011 Nicira, Inc.
436f27dd
AS
2# Copyright (c) 2010 Citrix Systems, Inc.
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at:
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16from OVEStandard import *
17from OVEConfig import *
18from OVELogger import *
6aaf8f13 19import ovs.json
436f27dd
AS
20
21# This sequence installs the qt4reactor before twisted gets a chance to install its reactor
22import qt4reactor
23globalApp = QtGui.QApplication([])
24qt4reactor.install()
25
26try:
27 from twisted.conch.ssh import transport, userauth, connection, common, keys, channel
28 from twisted.internet import defer, protocol, reactor
29 from twisted.application import reactors
30except Exception, e:
31 print('+++ Python Twisted Conch module is required\n')
32 raise
33
34class OVEFetchUserAuth(userauth.SSHUserAuthClient):
35 def __init__(self, fetch, *params):
36 userauth.SSHUserAuthClient.__init__(self, *params)
37 self.fetch = fetch
38 self.authFails = 0
39
40 def getPassword(self):
41 return defer.succeed(self.fetch.config()['password'])
42
43 def ssh_USERAUTH_FAILURE(self, packet):
44 if self.authFails > 0: # We normally get one so ignore. Real failures send these repeatedly
45 OVELog('Authentication failure for '+self.fetch.config()['address'])
46 self.authFails += 1
47 userauth.SSHUserAuthClient.ssh_USERAUTH_FAILURE(self, packet)
48
49class OVEFetchConnection(connection.SSHConnection, QtCore.QObject):
50 def __init__(self, fetch, *params):
51 connection.SSHConnection.__init__(self, *params)
52 QtCore.QObject.__init__(self)
53 self.fetch = fetch
54 self._channel = None
55 self._oldChannels = []
56
57 def serviceStarted(self):
58 self.emit(QtCore.SIGNAL('connectionService(QObject)'), self)
59
60 def serviceStopped(self):
61 self.emit(QtCore.SIGNAL('connectionService(QObject)'), None)
62
63 def execCommand(self, requester, ref, command, commandType):
64 if self._channel is not None:
65 # Don't delete old channels immediately in case they're e.g. going to time out with a failure
66 self._oldChannels.append(self._channel)
67 if len(self._oldChannels) > 90:
68 # For 30 second timeouts at 1 second refresh interval and three windows open on a single host, need 90 channels
69 del self._oldChannels[1]
70 self._channel = OVECommandChannel(self.fetch, requester, ref, command, commandType, 2**16, 2**15, self)
71 self.openChannel(self._channel)
72
73 def connectionLost(self, reason):
74 if self._channel is not None:
75 self._channel.connectionLost(reason)
76
77class OVEFetchTransport(transport.SSHClientTransport, QtCore.QObject):
78 def __init__(self, fetch, *params):
79 # There is no __init__ method for this class
80 # transport.SSHClientTransport.__init__(self, *params)
81
82 QtCore.QObject.__init__(self)
83 self.fetch = fetch
84 self._connection = None
85 self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)
86
87 def verifyHostKey(self, hostKey, fingerprint):
88 return defer.succeed(1)
89
90 def connectionSecure(self):
91 self._connection = OVEFetchConnection(self.fetch)
92 QtCore.QObject.connect(self._connection, QtCore.SIGNAL('connectionService(QObject)'), self.fetch.xon_connectionService)
93 self.requestService(
94 OVEFetchUserAuth(self.fetch, self.fetch.config().get('username', 'root'),
95 self._connection))
96
97 def connectionLost(self, reason):
98 if self._connection is not None:
99 self._connection.connectionLost(reason)
100
101class OVEFetchWrapper:
102 def __init__(self, contents):
103 self.contents = contents
104
105class OVECommandChannel(channel.SSHChannel, QtCore.QObject):
106 name = 'session'
107 MSEC_TIMEOUT=10000
108 STATUS_CONNECTION_LOST = 100001
109 STATUS_TIMEOUT = 100002
110 END_MARKER='END-MARKER'
111 END_MARKER_RE=re.compile(r'^END-MARKER$', re.MULTILINE)
112
113 def __init__(self, fetch, requester, ref, command, commandType, *params):
114 channel.SSHChannel.__init__(self, *params)
115 QtCore.QObject.__init__(self)
116 self.fetch = fetch
117 self.requester = requester
118 self.ref = ref
119 self.command = command
120 self.commandType= commandType
121 self._data = ''
122 self._extData = ''
123 self._jsonValues = None
124 self._timerId = None
125 self._status = None
126 self.connect(self, QtCore.SIGNAL('channelData(QObject, int, QString)'), self.fetch.xon_channelData)
127 self.connect(self, QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.fetch.xon_channelExtData)
128 self.connect(self, QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.fetch.xon_channelSuccess)
129 self.connect(self, QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.fetch.xon_channelFailure)
130
131 def openFailed(self, reason):
132 if self._timerId is not None:
133 self.killTimer(self._timerId)
134 self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
135 'Open failed:'+str(reason), '', '')
136
137 def channelOpen(self, ignoredData):
138 try:
139 nsCommand = common.NS(str(self.command))
140 self._timerId = self.startTimer(self.MSEC_TIMEOUT)
141 self.conn.sendRequest(self, 'exec', nsCommand, wantReply=1)
142 except Exception, e:
143 self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref,
144 'Open failed:'+str(e), self._data, self._extData)
145
146 def dataReceived(self, data):
147 self._data += data
148 if OVEConfig.Inst().logTraffic:
149 self.emit(QtCore.SIGNAL('channelData(QObject, int, QString)'), self.requester, self.ref, data)
150 self.testIfDone()
151
152 def extDataReceived(self, extData):
153 self._extData += extData
154 if OVEConfig.Inst().logTraffic:
155 self.emit(QtCore.SIGNAL('channelExtData(QObject, int, QString)'), self.requester, self.ref, extData)
156
157 def request_exit_status(self, data):
158 # We can get the exit status before the data, so delay calling sendResult until we get both
159 self._status = struct.unpack('>L', data)[0]
160 self.testIfDone()
161
162 def testIfDone(self):
163 if self._status is not None:
164 if self._status != 0:
165 self.sendResult() # Failed, so send what we have
166 elif len(self._data) > 0:
167 # Status == success and we have some data
168 if self.commandType == 'JSON':
169 try:
170 # Decode the JSON data, to confirm that we have all of the data
6aaf8f13 171 self._jsonValues = ovs.json.from_string(str(self._data)) # FIXME: Should handle unicode
436f27dd
AS
172 self.sendResult()
173 except:
174 pass # Wait for more data
175 elif self.commandType == 'framed':
176 match = self.END_MARKER_RE.search(self._data)
177 if match:
178 self._data = self._data[:match.start()] # Remove end marker
179 self.sendResult()
180 else:
181 OVELog('Bad command type')
182
183 def sendResult(self):
184 if self._timerId is not None:
185 self.killTimer(self._timerId)
186 if self.commandType == 'JSON' and self._status == 0 and self._jsonValues is not None:
187 self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(OVEFetchWrapper(self._jsonValues)))
188 elif self.commandType != 'JSON' and self._status == 0:
189 self.emit(QtCore.SIGNAL('channelSuccess(QObject, int, QString, QString, QVariant)'), self.requester, self.ref, self._data, self._extData, QVariant(None))
190 else:
191 self.emit(QtCore.SIGNAL('channelFailure(QObject, int, QString, QString, QString)'), self.requester, self.ref, 'Remote command failed (rc='+str(self._status)+')', self._data, self._extData)
192 if self._status != self.STATUS_CONNECTION_LOST:
193 try:
194 self.loseConnection()
195 except Exception, e:
196 OVELog('OVECommandChannel.sendResult loseConnection error: '+str(e))
197
198 def connectionLost(self, reason):
199 self._extData += '+++ Connection lost'
200 self._status = self.STATUS_CONNECTION_LOST
201 self.sendResult()
202
203 def timerEvent(self, event):
204 if event.timerId() == self._timerId:
205 self._extData += '+++ Timeout'
206 self._status = self.STATUS_TIMEOUT
207 self.sendResult()
208 else:
209 QtCore.QObject.timerEvent(self, event)
210
211class OVEFetchEvent(QtCore.QEvent):
212 TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
213 def __init__(self, ref, data):
214 QtCore.QEvent.__init__(self, self.TYPE)
215 self.ref = ref
216 self.data = data
217
218class OVEFetchFailEvent(QtCore.QEvent):
219 TYPE = QtCore.QEvent.Type(QtCore.QEvent.registerEventType())
220 def __init__(self, ref, message):
221 QtCore.QEvent.__init__(self, self.TYPE)
222 self.ref = ref
223 self.message = str(message)
224
225class OVEFetch(QtCore.QObject):
226 instances = {}
227 SEC_TIMEOUT = 10.0
228
229 def __init__(self, uuid):
230 QtCore.QObject.__init__(self)
231 self._hostUuid = uuid
232 self._config = None
233 self._transport = None
234 self._connection = None
235 self._commandQueue = []
236 self._timerRef = 0
237 self.refs = {}
238 self.messages = {}
239 self.values = {}
240 self.connect(OVEConfig.Inst(), QtCore.SIGNAL("configUpdated()"), self.xon_configUpdated)
241
242 @classmethod
243 def Inst(cls, uuid):
244 if uuid not in cls.instances:
245 cls.instances[uuid] = OVEFetch(uuid)
246 return cls.instances[uuid]
247
248 @classmethod
249 def startReactor(cls):
250 reactor.runReturn()
251
252 def xon_configUpdated(self):
253 self._config = None
254 self.resetTransport()
255
256 def xon_connectionService(self, connection):
257 self._connection = connection
258 if self._connection is not None:
259 OVELog('SSH connection to '+self.config()['address'] +' established')
260 for command in self._commandQueue:
261 # OVELog('Unqueueing '+str(command))
262 self.execCommand2(*command)
263 self._commandQueue = []
264
265 def xon_channelData(self, requester, ref, data):
266 if OVEConfig.Inst().logTraffic:
267 OVELog('Channel data received: '+str(data))
268
269 def xon_channelExtData(self, requester, ref, data):
270 if OVEConfig.Inst().logTraffic:
271 OVELog('+++ Channel extData (stderr) received: '+str(data))
272
273 def xon_channelFailure(self, requester, ref, message, data, extData):
274 if OVEConfig.Inst().logTraffic:
275 OVELog('+++ Channel failure: '+str(message))
276 OVELog("Closing SSH session due to failure")
277
278 errMessage = message
279 if len(data) > 0:
280 errMessage += '\n+++ Failed command output: '+data
281 if len(extData) > 0:
282 errMessage += '\n+++ Failed command output (stderr): '+extData
283
284 self.refs[requester] = ref # For PySide workaround
285 self.messages[requester] = errMessage # For PySide workaround
286 event = OVEFetchFailEvent(ref, errMessage)
287 QtCore.QCoreApplication.postEvent(requester, event)
288 self.resetTransport()
289
290 def xon_channelSuccess(self, requester, ref, data, extData, jsonValueVariant):
291 jsonValues = jsonValueVariant.toPyObject()
292 if OVEConfig.Inst().logTraffic:
293 OVELog('--- Channel success')
294 try:
295 if jsonValues is not None:
296 values = jsonValues.contents
297 else:
298 values = str(data)
299
300 self.refs[requester] = ref # For PySide workaround
301 self.values[requester] = values # For PySide workaround
302 event = OVEFetchEvent(ref, values)
303 QtCore.QCoreApplication.postEvent(requester, event)
304 except Exception, e:
305 message = ('+++ Failed to decode JSON reply: '+str(e))
306 if len(data) > 0: message += "\n++++++ Data (stdout): "+str(data)
307 if len(extData) > 0: message += '\n++++++ Error (stderr): '+str(extData)
308 self.refs[requester] = ref # For PySide workaround
309 self.messages[requester] = message # For PySide workaround
310 event = OVEFetchFailEvent(ref, message)
311 QtCore.QCoreApplication.postEvent(requester, event)
312
313 # Use for workaround only
314 def snoopRef(self, requester):
315 return self.refs.get(requester, None)
316
317 # Use for workaround only
318 def snoopValues(self, requester):
319 return self.values.get(requester, None)
320
321 # Use for workaround only
322 def snoopMessage(self, requester):
323 return self.messages.get(requester, None)
324
325 def config(self):
326 if self._config is None:
327 self._config = OVEConfig.Inst().hostFromUuid(self._hostUuid)
328
329 return self._config
330
331 def resetTransport(self):
332 if OVEConfig.Inst().logTraffic:
333 OVELog('Transport reset for '+self.config()['address'])
334 del self._connection
335 del self._transport
336 self._connection = None
337 self._transport = None
338
339 def transportErrback(self, failure, requester, ref, address):
340 self._timerRef += 1 # Prevent timeout handling
341 self.resetTransport()
342 message = 'Failure connecting to '+address+': '+failure.getErrorMessage()
343 self.refs[requester] = ref # For PySide workaround
344 self.messages[requester] = message # For PySide workaround
345 event = OVEFetchFailEvent(ref, message)
346 QtCore.QCoreApplication.postEvent(requester, event)
347
348 def transportTimeout(self, timerRef, requester, ref, address):
349 if self._timerRef == timerRef and self._transport is not None and self._connection is None:
350 message = 'Connection attempt to ' +address+' timed out'
351 self.refs[requester] = ref # For PySide workaround
352 self.messages[requester] = message # For PySide workaround
353 event = OVEFetchFailEvent(ref, message)
354 QtCore.QCoreApplication.postEvent(requester, event)
355 self.resetTransport()
356
357 def execCommand(self, requester, ref, command, commandType):
358 if OVEConfig.Inst().logTraffic:
359 hostName = (self.config() or {}).get('address', '<Address not set>')
360 OVELog(str(QtCore.QTime.currentTime().toString())+' '+hostName+': Executing '+command)
361 if self._transport is None:
362 self._connection = None
363 self._commandQueue.append((requester, ref, command, commandType))
364 config = self.config()
365 creator = protocol.ClientCreator(reactor, OVEFetchTransport, self)
366 self._transport = creator.connectTCP(config['address'], config.get('port', 22), timeout = self.SEC_TIMEOUT)
367 self._transport.addErrback(self.transportErrback, requester, ref, config['address'])
368 self._timerRef += 1
369 # Set this timer slightly longer than the twisted.conch timeout, as transportErrback can cancel
370 # the timeout and prevent double handling
371 # lambda timerRef = self._timerRef: takes a copy of self._timerRef
372 QtCore.QTimer.singleShot(int((1+self.SEC_TIMEOUT) * 1000), lambda timerRef = self._timerRef: self.transportTimeout(timerRef, requester, ref, config['address']))
373 else:
374 self.execCommand2(requester, ref, command, commandType)
375
376 def execCommand2(self, requester, ref, command, commandType):
377 if self._connection is None:
378 self._commandQueue.append((requester, ref, command, commandType))
379 else:
380 self._connection.execCommand(requester, ref, command, commandType)
381
382 def getTable(self, requester, tableName, ref = QtCore.QObject()):
383 command = '/usr/bin/ovsdb-client transact '+self.config()['connectTarget']+' \'["Open_vSwitch", {"op":"select","table":"'+tableName+'", "where":[]}]\''
384
385 self.execCommand(requester, ref, command, 'JSON')
386
387 def execCommandFramed(self, requester, ref, command):
388 self.execCommand(requester, ref, command + ' && echo ' + OVECommandChannel.END_MARKER, 'framed')