]>
Commit | Line | Data |
---|---|---|
8758e8a3 | 1 | # Copyright (c) 2010, 2011 Nicira Networks |
99155935 BP |
2 | # |
3 | # Licensed under the Apache License, Version 2.0 (the "License"); | |
4 | # you may not use this file except in compliance with the License. | |
5 | # You may obtain a copy of the License at: | |
6 | # | |
7 | # http://www.apache.org/licenses/LICENSE-2.0 | |
8 | # | |
9 | # Unless required by applicable law or agreed to in writing, software | |
10 | # distributed under the License is distributed on an "AS IS" BASIS, | |
11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
12 | # See the License for the specific language governing permissions and | |
13 | # limitations under the License. | |
14 | ||
15 | import errno | |
16 | import logging | |
17 | import os | |
18 | ||
da51646f | 19 | import ovs.json |
99155935 BP |
20 | import ovs.poller |
21 | import ovs.reconnect | |
22 | import ovs.stream | |
23 | import ovs.timeval | |
24 | ||
25 | EOF = -1 | |
26 | ||
26bb0f31 | 27 | |
99155935 BP |
28 | class Message(object): |
29 | T_REQUEST = 0 # Request. | |
30 | T_NOTIFY = 1 # Notification. | |
31 | T_REPLY = 2 # Successful reply. | |
32 | T_ERROR = 3 # Error reply. | |
33 | ||
34 | __types = {T_REQUEST: "request", | |
35 | T_NOTIFY: "notification", | |
36 | T_REPLY: "reply", | |
37 | T_ERROR: "error"} | |
99155935 | 38 | |
8758e8a3 BP |
39 | def __init__(self, type_, method, params, result, error, id): |
40 | self.type = type_ | |
99155935 BP |
41 | self.method = method |
42 | self.params = params | |
43 | self.result = result | |
44 | self.error = error | |
45 | self.id = id | |
46 | ||
47 | _next_id = 0 | |
26bb0f31 | 48 | |
99155935 BP |
49 | @staticmethod |
50 | def _create_id(): | |
51 | this_id = Message._next_id | |
52 | Message._next_id += 1 | |
53 | return this_id | |
54 | ||
55 | @staticmethod | |
56 | def create_request(method, params): | |
57 | return Message(Message.T_REQUEST, method, params, None, None, | |
58 | Message._create_id()) | |
59 | ||
60 | @staticmethod | |
61 | def create_notify(method, params): | |
62 | return Message(Message.T_NOTIFY, method, params, None, None, | |
63 | None) | |
64 | ||
65 | @staticmethod | |
66 | def create_reply(result, id): | |
67 | return Message(Message.T_REPLY, None, None, result, None, id) | |
68 | ||
69 | @staticmethod | |
70 | def create_error(error, id): | |
71 | return Message(Message.T_ERROR, None, None, None, error, id) | |
72 | ||
73 | @staticmethod | |
8758e8a3 BP |
74 | def type_to_string(type_): |
75 | return Message.__types[type_] | |
99155935 | 76 | |
b2a5856f | 77 | def __validate_arg(self, value, name, must_have): |
99155935 BP |
78 | if (value is not None) == (must_have != 0): |
79 | return None | |
80 | else: | |
81 | type_name = Message.type_to_string(self.type) | |
82 | if must_have: | |
83 | verb = "must" | |
84 | else: | |
85 | verb = "must not" | |
86 | return "%s %s have \"%s\"" % (type_name, verb, name) | |
87 | ||
88 | def is_valid(self): | |
89 | if self.params is not None and type(self.params) != list: | |
90 | return "\"params\" must be JSON array" | |
91 | ||
92 | pattern = {Message.T_REQUEST: 0x11001, | |
93 | Message.T_NOTIFY: 0x11000, | |
94 | Message.T_REPLY: 0x00101, | |
95 | Message.T_ERROR: 0x00011}.get(self.type) | |
96 | if pattern is None: | |
97 | return "invalid JSON-RPC message type %s" % self.type | |
98 | ||
99 | return ( | |
b2a5856f BP |
100 | self.__validate_arg(self.method, "method", pattern & 0x10000) or |
101 | self.__validate_arg(self.params, "params", pattern & 0x1000) or | |
102 | self.__validate_arg(self.result, "result", pattern & 0x100) or | |
103 | self.__validate_arg(self.error, "error", pattern & 0x10) or | |
104 | self.__validate_arg(self.id, "id", pattern & 0x1)) | |
99155935 BP |
105 | |
106 | @staticmethod | |
107 | def from_json(json): | |
108 | if type(json) != dict: | |
109 | return "message is not a JSON object" | |
110 | ||
111 | # Make a copy to avoid modifying the caller's dict. | |
112 | json = dict(json) | |
113 | ||
114 | if "method" in json: | |
115 | method = json.pop("method") | |
116 | if type(method) not in [str, unicode]: | |
117 | return "method is not a JSON string" | |
118 | else: | |
119 | method = None | |
120 | ||
121 | params = json.pop("params", None) | |
122 | result = json.pop("result", None) | |
123 | error = json.pop("error", None) | |
9b46cccc | 124 | id_ = json.pop("id", None) |
99155935 BP |
125 | if len(json): |
126 | return "message has unexpected member \"%s\"" % json.popitem()[0] | |
127 | ||
128 | if result is not None: | |
129 | msg_type = Message.T_REPLY | |
130 | elif error is not None: | |
131 | msg_type = Message.T_ERROR | |
9b46cccc | 132 | elif id_ is not None: |
99155935 BP |
133 | msg_type = Message.T_REQUEST |
134 | else: | |
135 | msg_type = Message.T_NOTIFY | |
26bb0f31 | 136 | |
9b46cccc | 137 | msg = Message(msg_type, method, params, result, error, id_) |
99155935 BP |
138 | validation_error = msg.is_valid() |
139 | if validation_error is not None: | |
140 | return validation_error | |
141 | else: | |
142 | return msg | |
143 | ||
144 | def to_json(self): | |
145 | json = {} | |
146 | ||
147 | if self.method is not None: | |
148 | json["method"] = self.method | |
149 | ||
150 | if self.params is not None: | |
151 | json["params"] = self.params | |
152 | ||
153 | if self.result is not None or self.type == Message.T_ERROR: | |
154 | json["result"] = self.result | |
155 | ||
156 | if self.error is not None or self.type == Message.T_REPLY: | |
157 | json["error"] = self.error | |
158 | ||
159 | if self.id is not None or self.type == Message.T_NOTIFY: | |
160 | json["id"] = self.id | |
161 | ||
162 | return json | |
163 | ||
164 | def __str__(self): | |
165 | s = [Message.type_to_string(self.type)] | |
166 | if self.method is not None: | |
167 | s.append("method=\"%s\"" % self.method) | |
168 | if self.params is not None: | |
169 | s.append("params=" + ovs.json.to_string(self.params)) | |
b2edc4e7 BP |
170 | if self.result is not None: |
171 | s.append("result=" + ovs.json.to_string(self.result)) | |
99155935 BP |
172 | if self.error is not None: |
173 | s.append("error=" + ovs.json.to_string(self.error)) | |
174 | if self.id is not None: | |
175 | s.append("id=" + ovs.json.to_string(self.id)) | |
176 | return ", ".join(s) | |
177 | ||
26bb0f31 | 178 | |
99155935 BP |
179 | class Connection(object): |
180 | def __init__(self, stream): | |
63b1a521 | 181 | self.name = stream.name |
99155935 BP |
182 | self.stream = stream |
183 | self.status = 0 | |
184 | self.input = "" | |
185 | self.output = "" | |
186 | self.parser = None | |
187 | ||
188 | def close(self): | |
189 | self.stream.close() | |
190 | self.stream = None | |
191 | ||
192 | def run(self): | |
193 | if self.status: | |
194 | return | |
195 | ||
196 | while len(self.output): | |
197 | retval = self.stream.send(self.output) | |
198 | if retval >= 0: | |
199 | self.output = self.output[retval:] | |
200 | else: | |
201 | if retval != -errno.EAGAIN: | |
202 | logging.warn("%s: send error: %s" % (self.name, | |
203 | os.strerror(-retval))) | |
204 | self.error(-retval) | |
205 | break | |
206 | ||
207 | def wait(self, poller): | |
208 | if not self.status: | |
209 | self.stream.run_wait(poller) | |
210 | if len(self.output): | |
211 | self.stream.send_wait() | |
212 | ||
213 | def get_status(self): | |
214 | return self.status | |
215 | ||
216 | def get_backlog(self): | |
217 | if self.status != 0: | |
218 | return 0 | |
219 | else: | |
220 | return len(self.output) | |
221 | ||
99155935 BP |
222 | def __log_msg(self, title, msg): |
223 | logging.debug("%s: %s %s" % (self.name, title, msg)) | |
224 | ||
225 | def send(self, msg): | |
226 | if self.status: | |
227 | return self.status | |
228 | ||
229 | self.__log_msg("send", msg) | |
230 | ||
231 | was_empty = len(self.output) == 0 | |
232 | self.output += ovs.json.to_string(msg.to_json()) | |
233 | if was_empty: | |
234 | self.run() | |
235 | return self.status | |
236 | ||
237 | def send_block(self, msg): | |
238 | error = self.send(msg) | |
239 | if error: | |
240 | return error | |
241 | ||
242 | while True: | |
243 | self.run() | |
244 | if not self.get_backlog() or self.get_status(): | |
245 | return self.status | |
246 | ||
247 | poller = ovs.poller.Poller() | |
248 | self.wait(poller) | |
249 | poller.block() | |
250 | ||
251 | def recv(self): | |
252 | if self.status: | |
253 | return self.status, None | |
254 | ||
255 | while True: | |
2ad4ef89 | 256 | if not self.input: |
99155935 BP |
257 | error, data = self.stream.recv(4096) |
258 | if error: | |
259 | if error == errno.EAGAIN: | |
260 | return error, None | |
261 | else: | |
262 | # XXX rate-limit | |
263 | logging.warning("%s: receive error: %s" | |
264 | % (self.name, os.strerror(error))) | |
265 | self.error(error) | |
266 | return self.status, None | |
2ad4ef89 | 267 | elif not data: |
99155935 BP |
268 | self.error(EOF) |
269 | return EOF, None | |
270 | else: | |
271 | self.input += data | |
272 | else: | |
273 | if self.parser is None: | |
274 | self.parser = ovs.json.Parser() | |
275 | self.input = self.input[self.parser.feed(self.input):] | |
276 | if self.parser.is_done(): | |
277 | msg = self.__process_msg() | |
278 | if msg: | |
279 | return 0, msg | |
280 | else: | |
281 | return self.status, None | |
282 | ||
283 | def recv_block(self): | |
284 | while True: | |
285 | error, msg = self.recv() | |
286 | if error != errno.EAGAIN: | |
287 | return error, msg | |
288 | ||
289 | self.run() | |
290 | ||
291 | poller = ovs.poller.Poller() | |
292 | self.wait(poller) | |
293 | self.recv_wait(poller) | |
294 | poller.block() | |
26bb0f31 | 295 | |
99155935 | 296 | def transact_block(self, request): |
9b46cccc | 297 | id_ = request.id |
99155935 BP |
298 | |
299 | error = self.send(request) | |
300 | reply = None | |
301 | while not error: | |
302 | error, reply = self.recv_block() | |
9b46cccc | 303 | if reply and reply.type == Message.T_REPLY and reply.id == id_: |
99155935 BP |
304 | break |
305 | return error, reply | |
306 | ||
307 | def __process_msg(self): | |
308 | json = self.parser.finish() | |
309 | self.parser = None | |
310 | if type(json) in [str, unicode]: | |
311 | # XXX rate-limit | |
312 | logging.warning("%s: error parsing stream: %s" % (self.name, json)) | |
313 | self.error(errno.EPROTO) | |
314 | return | |
315 | ||
316 | msg = Message.from_json(json) | |
317 | if not isinstance(msg, Message): | |
318 | # XXX rate-limit | |
319 | logging.warning("%s: received bad JSON-RPC message: %s" | |
320 | % (self.name, msg)) | |
321 | self.error(errno.EPROTO) | |
322 | return | |
323 | ||
324 | self.__log_msg("received", msg) | |
325 | return msg | |
26bb0f31 | 326 | |
99155935 | 327 | def recv_wait(self, poller): |
2ad4ef89 | 328 | if self.status or self.input: |
99155935 BP |
329 | poller.immediate_wake() |
330 | else: | |
331 | self.stream.recv_wait(poller) | |
332 | ||
333 | def error(self, error): | |
334 | if self.status == 0: | |
335 | self.status = error | |
336 | self.stream.close() | |
337 | self.output = "" | |
26bb0f31 EJ |
338 | |
339 | ||
99155935 BP |
340 | class Session(object): |
341 | """A JSON-RPC session with reconnection.""" | |
342 | ||
343 | def __init__(self, reconnect, rpc): | |
344 | self.reconnect = reconnect | |
345 | self.rpc = rpc | |
346 | self.stream = None | |
347 | self.pstream = None | |
348 | self.seqno = 0 | |
349 | ||
350 | @staticmethod | |
351 | def open(name): | |
352 | """Creates and returns a Session that maintains a JSON-RPC session to | |
353 | 'name', which should be a string acceptable to ovs.stream.Stream or | |
354 | ovs.stream.PassiveStream's initializer. | |
26bb0f31 | 355 | |
99155935 BP |
356 | If 'name' is an active connection method, e.g. "tcp:127.1.2.3", the new |
357 | session connects and reconnects, with back-off, to 'name'. | |
26bb0f31 | 358 | |
99155935 BP |
359 | If 'name' is a passive connection method, e.g. "ptcp:", the new session |
360 | listens for connections to 'name'. It maintains at most one connection | |
361 | at any given time. Any new connection causes the previous one (if any) | |
362 | to be dropped.""" | |
363 | reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec()) | |
364 | reconnect.set_name(name) | |
365 | reconnect.enable(ovs.timeval.msec()) | |
366 | ||
367 | if ovs.stream.PassiveStream.is_valid_name(name): | |
4071e24d | 368 | reconnect.set_passive(True, ovs.timeval.msec()) |
99155935 BP |
369 | |
370 | return Session(reconnect, None) | |
371 | ||
372 | @staticmethod | |
373 | def open_unreliably(jsonrpc): | |
374 | reconnect = ovs.reconnect.Reconnect(ovs.timeval.msec()) | |
375 | reconnect.set_quiet(True) | |
22bb61e9 | 376 | reconnect.set_name(jsonrpc.name) |
99155935 BP |
377 | reconnect.set_max_tries(0) |
378 | reconnect.connected(ovs.timeval.msec()) | |
379 | return Session(reconnect, jsonrpc) | |
380 | ||
381 | def close(self): | |
382 | if self.rpc is not None: | |
383 | self.rpc.close() | |
384 | self.rpc = None | |
385 | if self.stream is not None: | |
386 | self.stream.close() | |
387 | self.stream = None | |
388 | if self.pstream is not None: | |
389 | self.pstream.close() | |
390 | self.pstream = None | |
391 | ||
392 | def __disconnect(self): | |
393 | if self.rpc is not None: | |
394 | self.rpc.error(EOF) | |
395 | self.rpc.close() | |
396 | self.rpc = None | |
397 | self.seqno += 1 | |
398 | elif self.stream is not None: | |
399 | self.stream.close() | |
400 | self.stream = None | |
401 | self.seqno += 1 | |
26bb0f31 | 402 | |
99155935 BP |
403 | def __connect(self): |
404 | self.__disconnect() | |
405 | ||
406 | name = self.reconnect.get_name() | |
407 | if not self.reconnect.is_passive(): | |
408 | error, self.stream = ovs.stream.Stream.open(name) | |
409 | if not error: | |
410 | self.reconnect.connecting(ovs.timeval.msec()) | |
411 | else: | |
412 | self.reconnect.connect_failed(ovs.timeval.msec(), error) | |
413 | elif self.pstream is not None: | |
414 | error, self.pstream = ovs.stream.PassiveStream.open(name) | |
415 | if not error: | |
416 | self.reconnect.listening(ovs.timeval.msec()) | |
417 | else: | |
418 | self.reconnect.connect_failed(ovs.timeval.msec(), error) | |
419 | ||
420 | self.seqno += 1 | |
421 | ||
422 | def run(self): | |
423 | if self.pstream is not None: | |
424 | error, stream = self.pstream.accept() | |
425 | if error == 0: | |
426 | if self.rpc or self.stream: | |
427 | # XXX rate-limit | |
428 | logging.info("%s: new connection replacing active " | |
429 | "connection" % self.reconnect.get_name()) | |
430 | self.__disconnect() | |
431 | self.reconnect.connected(ovs.timeval.msec()) | |
432 | self.rpc = Connection(stream) | |
433 | elif error != errno.EAGAIN: | |
434 | self.reconnect.listen_error(ovs.timeval.msec(), error) | |
435 | self.pstream.close() | |
436 | self.pstream = None | |
437 | ||
438 | if self.rpc: | |
439 | self.rpc.run() | |
440 | error = self.rpc.get_status() | |
441 | if error != 0: | |
442 | self.reconnect.disconnected(ovs.timeval.msec(), error) | |
443 | self.__disconnect() | |
444 | elif self.stream is not None: | |
445 | self.stream.run() | |
446 | error = self.stream.connect() | |
447 | if error == 0: | |
448 | self.reconnect.connected(ovs.timeval.msec()) | |
449 | self.rpc = Connection(self.stream) | |
450 | self.stream = None | |
451 | elif error != errno.EAGAIN: | |
452 | self.reconnect.connect_failed(ovs.timeval.msec(), error) | |
453 | self.stream.close() | |
454 | self.stream = None | |
455 | ||
456 | action = self.reconnect.run(ovs.timeval.msec()) | |
457 | if action == ovs.reconnect.CONNECT: | |
458 | self.__connect() | |
459 | elif action == ovs.reconnect.DISCONNECT: | |
460 | self.reconnect.disconnected(ovs.timeval.msec(), 0) | |
461 | self.__disconnect() | |
462 | elif action == ovs.reconnect.PROBE: | |
463 | if self.rpc: | |
464 | request = Message.create_request("echo", []) | |
465 | request.id = "echo" | |
466 | self.rpc.send(request) | |
467 | else: | |
468 | assert action == None | |
469 | ||
470 | def wait(self, poller): | |
471 | if self.rpc is not None: | |
472 | self.rpc.wait(poller) | |
473 | elif self.stream is not None: | |
474 | self.stream.run_wait(poller) | |
475 | self.stream.connect_wait(poller) | |
476 | if self.pstream is not None: | |
477 | self.pstream.wait(poller) | |
478 | self.reconnect.wait(poller, ovs.timeval.msec()) | |
479 | ||
480 | def get_backlog(self): | |
481 | if self.rpc is not None: | |
482 | return self.rpc.get_backlog() | |
483 | else: | |
484 | return 0 | |
485 | ||
486 | def get_name(self): | |
487 | return self.reconnect.get_name() | |
488 | ||
489 | def send(self, msg): | |
490 | if self.rpc is not None: | |
491 | return self.rpc.send(msg) | |
492 | else: | |
493 | return errno.ENOTCONN | |
494 | ||
495 | def recv(self): | |
496 | if self.rpc is not None: | |
497 | error, msg = self.rpc.recv() | |
498 | if not error: | |
499 | self.reconnect.received(ovs.timeval.msec()) | |
500 | if msg.type == Message.T_REQUEST and msg.method == "echo": | |
501 | # Echo request. Send reply. | |
502 | self.send(Message.create_reply(msg.params, msg.id)) | |
503 | elif msg.type == Message.T_REPLY and msg.id == "echo": | |
504 | # It's a reply to our echo request. Suppress it. | |
505 | pass | |
506 | else: | |
507 | return msg | |
508 | return None | |
509 | ||
510 | def recv_wait(self, poller): | |
511 | if self.rpc is not None: | |
512 | self.rpc.recv_wait(poller) | |
513 | ||
514 | def is_alive(self): | |
515 | if self.rpc is not None or self.stream is not None: | |
516 | return True | |
517 | else: | |
518 | max_tries = self.reconnect.get_max_tries() | |
519 | return max_tries is None or max_tries > 0 | |
26bb0f31 | 520 | |
99155935 BP |
521 | def is_connected(self): |
522 | return self.rpc is not None | |
523 | ||
524 | def get_seqno(self): | |
525 | return self.seqno | |
526 | ||
527 | def force_reconnect(self): | |
528 | self.reconnect.force_reconnect(ovs.timeval.msec()) |