git.proxmox.com Git - ovs.git / blob - python/ovs/db/idl.py
Commit: "Remove dependency on python3-six"
[ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import functools
16 import uuid
17
18 import ovs.db.data as data
19 import ovs.db.parser
20 import ovs.db.schema
21 import ovs.jsonrpc
22 import ovs.ovsuuid
23 import ovs.poller
24 import ovs.vlog
25 from ovs.db import custom_index
26 from ovs.db import error
27
28 vlog = ovs.vlog.Vlog("idl")
29
30 __pychecker__ = 'no-classattr no-objattrs'
31
32 ROW_CREATE = "create"
33 ROW_UPDATE = "update"
34 ROW_DELETE = "delete"
35
36 OVSDB_UPDATE = 0
37 OVSDB_UPDATE2 = 1
38
39 CLUSTERED = "clustered"
40
41
class Idl(object):
    """Open vSwitch Database Interface Definition Language (OVSDB IDL).

    The OVSDB IDL maintains an in-memory replica of a database.  It issues RPC
    requests to an OVSDB database server and parses the responses, converting
    raw JSON into data structures that are easier for clients to digest.

    The IDL also assists with issuing database transactions.  The client
    creates a transaction, manipulates the IDL data structures, and commits or
    aborts the transaction.  The IDL then composes and issues the necessary
    JSON-RPC requests and reports to the client whether the transaction
    completed successfully.

    The client is allowed to access the following attributes directly, in a
    read-only fashion:

    - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
      to the Idl constructor.  Each ovs.db.schema.TableSchema in the map is
      annotated with a new attribute 'rows', which is a dict from a uuid.UUID
      to a Row object.

      The client may directly read and write the Row objects referenced by the
      'rows' map values.  Refer to Row for more details.

    - 'change_seqno': A number that represents the IDL's state.  When the IDL
      is updated (by Idl.run()), its value changes.  The sequence number can
      occasionally change even if the database does not.  This happens if the
      connection to the database drops and reconnects, which causes the
      database contents to be reloaded even if they didn't change.  (It could
      also happen if the database server sends out a "change" that reflects
      what the IDL already thought was in the database.  The database server is
      not supposed to do that, but bugs could in theory cause it to do so.)

    - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
      if no lock is configured.

    - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
      lock, and False otherwise.

      Locking and unlocking happens asynchronously from the database client's
      point of view, so the information is only useful for optimization
      (e.g. if the client doesn't have the lock then there's no point in trying
      to write to the database).

    - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
      the database server has indicated that some other client already owns the
      requested lock, and False otherwise.

    - 'txn': The ovs.db.idl.Transaction object for the database transaction
      currently being constructed, if there is one, or None otherwise.
    """

    # Connection state machine states; see run() for the transitions.
    IDL_S_INITIAL = 0
    IDL_S_SERVER_SCHEMA_REQUESTED = 1
    IDL_S_SERVER_MONITOR_REQUESTED = 2
    IDL_S_DATA_MONITOR_REQUESTED = 3
    IDL_S_DATA_MONITOR_COND_REQUESTED = 4

    def __init__(self, remote, schema_helper, probe_interval=None,
                 leader_only=True):
        """Creates and returns a connection to the database on 'remote',
        which should be in a form acceptable to ovs.jsonrpc.session.open().
        The connection will maintain an in-memory replica of the remote
        database.

        'remote' can be comma separated multiple remotes and each remote
        should be in a form acceptable to ovs.jsonrpc.session.open().

        'schema_helper' should be an instance of the SchemaHelper class which
        generates schema for the remote database.  The caller may have cut it
        down by removing tables or columns that are not of interest.  The IDL
        will only replicate the tables and columns that remain.  The caller may
        also add an attribute named 'alert' to selected remaining columns,
        setting its value to False; if so, then changes to those columns will
        not be considered changes to the database for the purpose of the return
        value of Idl.run() and Idl.change_seqno.  This is useful for columns
        that the IDL's client will write but not read.

        The IDL uses and modifies the schema produced by 'schema_helper'
        directly.

        If 'leader_only' is set to True (default value) the IDL will only
        monitor and transact with the leader of the cluster.

        If "probe_interval" is zero it disables the connection keepalive
        feature.  If non-zero the value will be forced to at least 1000
        milliseconds.  If None it will just use the default value in OVS.
        """

        assert isinstance(schema_helper, SchemaHelper)
        schema = schema_helper.get_idl_schema()

        self.tables = schema.tables
        self.readonly = schema.readonly
        self._db = schema
        remotes = self._parse_remotes(remote)
        self._session = ovs.jsonrpc.Session.open_multiple(
            remotes, probe_interval=probe_interval)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        self.uuid = uuid.uuid1()

        # Server monitor.
        self._server_schema_request_id = None
        self._server_monitor_request_id = None
        self._db_change_aware_request_id = None
        self._server_db_name = '_Server'
        self._server_db_table = 'Database'
        self.server_tables = None
        self._server_db = None
        self.server_monitor_uuid = uuid.uuid1()
        self.leader_only = leader_only
        self.cluster_id = None
        self._min_index = 0

        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        # Annotate the schema in place with the per-table IDL bookkeeping
        # that the rest of this class relies on.
        for table in schema.tables.values():
            for column in table.columns.values():
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = custom_index.IndexedRows(table)
            table.idl = self
            table.condition = [True]
            table.cond_changed = False
182 def _parse_remotes(self, remote):
183 # If remote is -
184 # "tcp:10.0.0.1:6641,unix:/tmp/db.sock,t,s,tcp:10.0.0.2:6642"
185 # this function returns
186 # ["tcp:10.0.0.1:6641", "unix:/tmp/db.sock,t,s", tcp:10.0.0.2:6642"]
187 remotes = []
188 for r in remote.split(','):
189 if remotes and r.find(":") == -1:
190 remotes[-1] += "," + r
191 else:
192 remotes.append(r)
193 return remotes
194
195 def set_cluster_id(self, cluster_id):
196 """Set the id of the cluster that this idl must connect to."""
197 self.cluster_id = cluster_id
198 if self.state != self.IDL_S_INITIAL:
199 self.force_reconnect()
200
    def index_create(self, table, name):
        """Create a named multi-column index on a table.

        'table' is the table name; the returned object is the new index,
        on which the caller configures the indexed columns."""
        return self.tables[table].rows.index_create(name)

    def index_irange(self, table, name, start, end):
        """Return items in a named index between start/end inclusive."""
        return self.tables[table].rows.indexes[name].irange(start, end)

    def index_equal(self, table, name, value):
        """Return items in a named index matching a value."""
        # An equality match is an inclusive range of exactly one value.
        return self.tables[table].rows.indexes[name].irange(value, value)

    def close(self):
        """Closes the connection to the database.  The IDL will no longer
        update."""
        self._session.close()
217
    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno

        self.send_cond_change()
        self._session.run()
        # Process at most 50 messages per call so a busy server cannot
        # starve the caller's event loop.
        i = 0
        while i < 50:
            i += 1
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # The session reconnected: abandon in-flight transactions
                # and restart the handshake from the _Server schema.
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_server_schema_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break

            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update2"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and len(msg.params) == 2):
                # Database contents changed.
                if msg.params[0] == str(self.server_monitor_uuid):
                    # Update for the _Server monitor, not the data monitor:
                    # it must not count as a data change.
                    self.__parse_update(msg.params[1], OVSDB_UPDATE,
                                        tables=self.server_tables)
                    self.change_seqno = initial_change_seqno
                    if not self.__check_server_db():
                        self.force_reconnect()
                        break
                else:
                    self.__parse_update(msg.params[1], OVSDB_UPDATE)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._monitor_request_id is not None
                    and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    if self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED:
                        self.__parse_update(msg.result, OVSDB_UPDATE2)
                    else:
                        assert self.state == self.IDL_S_DATA_MONITOR_REQUESTED
                        self.__parse_update(msg.result, OVSDB_UPDATE)

                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._server_schema_request_id is not None
                    and self._server_schema_request_id == msg.id):
                # Reply to our "get_schema" of _Server request.
                try:
                    self._server_schema_request_id = None
                    sh = SchemaHelper(None, msg.result)
                    sh.register_table(self._server_db_table)
                    schema = sh.get_idl_schema()
                    self._server_db = schema
                    self.server_tables = schema.tables
                    self.__send_server_monitor_request()
                except error.Error as e:
                    vlog.err("%s: error receiving server schema: %s"
                             % (self._session.get_name(), e))
                    if self.cluster_id:
                        # Cluster checks need _Server; give up on this server.
                        self.__error()
                        break
                    else:
                        # Old servers lack _Server; fall back to a plain
                        # data monitor.
                        self.change_seqno = initial_change_seqno
                        self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._server_monitor_request_id is not None
                    and self._server_monitor_request_id == msg.id):
                # Reply to our "monitor" of _Server request.
                try:
                    self._server_monitor_request_id = None
                    self.__parse_update(msg.result, OVSDB_UPDATE,
                                        tables=self.server_tables)
                    self.change_seqno = initial_change_seqno
                    if self.__check_server_db():
                        self.__send_monitor_request()
                        self.__send_db_change_aware()
                    else:
                        self.force_reconnect()
                        break
                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    if self.cluster_id:
                        self.__error()
                        break
                    else:
                        self.change_seqno = initial_change_seqno
                        self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._db_change_aware_request_id is not None
                    and self._db_change_aware_request_id == msg.id):
                # Reply to us notifying the server of our change awareness.
                self._db_change_aware_request_id = None
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._lock_request_id is not None
                    and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request.  Ignore it.
                pass
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self.state == self.IDL_S_DATA_MONITOR_COND_REQUESTED and
                  self._monitor_request_id == msg.id):
                # Server does not support "monitor_cond"; retry with the
                # plain "monitor" method.
                if msg.error == "unknown method":
                    self.__send_monitor_request()
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self._server_schema_request_id is not None and
                  self._server_schema_request_id == msg.id):
                self._server_schema_request_id = None
                if self.cluster_id:
                    self.force_reconnect()
                    break
                else:
                    self.change_seqno = initial_change_seqno
                    self.__send_monitor_request()
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                    and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno
390
391 def send_cond_change(self):
392 if not self._session.is_connected():
393 return
394
395 for table in self.tables.values():
396 if table.cond_changed:
397 self.__send_cond_change(table, table.condition)
398 table.cond_changed = False
399
400 def cond_change(self, table_name, cond):
401 """Sets the condition for 'table_name' to 'cond', which should be a
402 conditional expression suitable for use directly in the OVSDB
403 protocol, with the exception that the empty condition []
404 matches no rows (instead of matching every row). That is, []
405 is equivalent to [False], not to [True].
406 """
407
408 table = self.tables.get(table_name)
409 if not table:
410 raise error.Error('Unknown table "%s"' % table_name)
411
412 if cond == []:
413 cond = [False]
414 if table.condition != cond:
415 table.condition = cond
416 table.cond_changed = True
417
    def wait(self, poller):
        """Arranges for poller.block() to wake up when self.run() has something
        to do or when activity occurs on a transaction on 'self'."""
        self._session.wait(poller)
        self._session.recv_wait(poller)

    def has_ever_connected(self):
        """Returns True, if the IDL successfully connected to the remote
        database and retrieved its contents (even if the connection
        subsequently dropped and is in the process of reconnecting).  If so,
        then the IDL contains an atomic snapshot of the database's contents
        (but it might be arbitrarily old if the connection dropped).

        Returns False if the IDL has never connected or retrieved the
        database's contents.  If so, the IDL is empty."""
        # change_seqno is bumped on the first monitor reply, so a nonzero
        # value implies at least one successful full fetch.
        return self.change_seqno != 0

    def force_reconnect(self):
        """Forces the IDL to drop its connection to the database and reconnect.
        In the meantime, the contents of the IDL will not change."""
        self._session.force_reconnect()

    def session_name(self):
        """Returns the name of the session's current remote (for logging)."""
        return self._session.get_name()
442
443 def set_lock(self, lock_name):
444 """If 'lock_name' is not None, configures the IDL to obtain the named
445 lock from the database server and to avoid modifying the database when
446 the lock cannot be acquired (that is, when another client has the same
447 lock).
448
449 If 'lock_name' is None, drops the locking requirement and releases the
450 lock."""
451 assert not self.txn
452 assert not self._outstanding_txns
453
454 if self.lock_name and (not lock_name or lock_name != self.lock_name):
455 # Release previous lock.
456 self.__send_unlock_request()
457 self.lock_name = None
458 self.is_lock_contended = False
459
460 if lock_name and not self.lock_name:
461 # Acquire new lock.
462 self.lock_name = lock_name
463 self.__send_lock_request()
464
    def notify(self, event, row, updates=None):
        """Hook for implementing create/update/delete notifications.

        The base implementation does nothing; subclasses override this to
        observe replica changes as they are applied.

        :param event: The event that was triggered
        :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
        :param row: The row as it is after the operation has occurred
        :type row: Row
        :param updates: For updates, row with only old values of the changed
                        columns
        :type updates: Row
        """
476
    def __send_cond_change(self, table, cond):
        """Sends a "monitor_cond_change" request updating 'table's monitor
        condition to 'cond', rotating self.uuid to a fresh monitor id (the
        request names both the old and the new id)."""
        monitor_cond_change = {table.name: [{"where": cond}]}
        old_uuid = str(self.uuid)
        self.uuid = uuid.uuid1()
        params = [old_uuid, str(self.uuid), monitor_cond_change]
        msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
        self._session.send(msg)
484
485 def __clear(self):
486 changed = False
487
488 for table in self.tables.values():
489 if table.rows:
490 changed = True
491 table.rows = custom_index.IndexedRows(table)
492
493 if changed:
494 self.change_seqno += 1
495
496 def __update_has_lock(self, new_has_lock):
497 if new_has_lock and not self.has_lock:
498 if self._monitor_request_id is None:
499 self.change_seqno += 1
500 else:
501 # We're waiting for a monitor reply, so don't signal that the
502 # database changed. The monitor reply will increment
503 # change_seqno anyhow.
504 pass
505 self.is_lock_contended = False
506 self.has_lock = new_has_lock
507
    def __do_send_lock_request(self, method):
        # Common helper for "lock"/"unlock": we conservatively drop any lock
        # we held the moment a request is issued.  Returns the JSON-RPC id
        # of the sent request, or None if the session is disconnected.
        self.__update_has_lock(False)
        self._lock_request_id = None
        if self._session.is_connected():
            msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
            msg_id = msg.id
            self._session.send(msg)
        else:
            msg_id = None
        return msg_id

    def __send_lock_request(self):
        # Remember the request id so run() can match the server's reply.
        self._lock_request_id = self.__do_send_lock_request("lock")

    def __send_unlock_request(self):
        # Unlock needs no reply matching.
        self.__do_send_lock_request("unlock")
524
525 def __parse_lock_reply(self, result):
526 self._lock_request_id = None
527 got_lock = isinstance(result, dict) and result.get("locked") is True
528 self.__update_has_lock(got_lock)
529 if not got_lock:
530 self.is_lock_contended = True
531
532 def __parse_lock_notify(self, params, new_has_lock):
533 if (self.lock_name is not None
534 and isinstance(params, (list, tuple))
535 and params
536 and params[0] == self.lock_name):
537 self.__update_has_lock(new_has_lock)
538 if not new_has_lock:
539 self.is_lock_contended = True
540
    def __send_db_change_aware(self):
        # Tell the server we understand database-change notifications.
        # NOTE(review): exact server-side effect (e.g. notification on db
        # removal) depends on ovsdb-server — confirm against its docs.
        msg = ovs.jsonrpc.Message.create_request("set_db_change_aware",
                                                 [True])
        self._db_change_aware_request_id = msg.id
        self._session.send(msg)
546
547 def __send_monitor_request(self):
548 if (self.state in [self.IDL_S_SERVER_MONITOR_REQUESTED,
549 self.IDL_S_INITIAL]):
550 self.state = self.IDL_S_DATA_MONITOR_COND_REQUESTED
551 method = "monitor_cond"
552 else:
553 self.state = self.IDL_S_DATA_MONITOR_REQUESTED
554 method = "monitor"
555
556 monitor_requests = {}
557 for table in self.tables.values():
558 columns = []
559 for column in table.columns.keys():
560 if ((table.name not in self.readonly) or
561 (table.name in self.readonly) and
562 (column not in self.readonly[table.name])):
563 columns.append(column)
564 monitor_request = {"columns": columns}
565 if method == "monitor_cond" and table.condition != [True]:
566 monitor_request["where"] = table.condition
567 table.cond_change = False
568 monitor_requests[table.name] = [monitor_request]
569
570 msg = ovs.jsonrpc.Message.create_request(
571 method, [self._db.name, str(self.uuid), monitor_requests])
572 self._monitor_request_id = msg.id
573 self._session.send(msg)
574
    def __send_server_schema_request(self):
        # Ask for the schema of the special "_Server" database, which lists
        # the databases this server hosts and is used for cluster checks.
        self.state = self.IDL_S_SERVER_SCHEMA_REQUESTED
        msg = ovs.jsonrpc.Message.create_request(
            "get_schema", [self._server_db_name, str(self.uuid)])
        self._server_schema_request_id = msg.id
        self._session.send(msg)

    def __send_server_monitor_request(self):
        # Monitor every column of the _Server "Database" table, under a
        # monitor uuid separate from the data monitor's.
        self.state = self.IDL_S_SERVER_MONITOR_REQUESTED
        monitor_requests = {}
        table = self.server_tables[self._server_db_table]
        columns = [column for column in table.columns.keys()]
        # Give the _Server table the same IDL annotations that __init__
        # applies to the data tables.
        for column in table.columns.values():
            if not hasattr(column, 'alert'):
                column.alert = True
        table.rows = custom_index.IndexedRows(table)
        table.need_table = False
        table.idl = self
        monitor_request = {"columns": columns}
        monitor_requests[table.name] = [monitor_request]
        msg = ovs.jsonrpc.Message.create_request(
            'monitor', [self._server_db.name,
                        str(self.server_monitor_uuid),
                        monitor_requests])
        self._server_monitor_request_id = msg.id
        self._session.send(msg)
601
602 def __parse_update(self, update, version, tables=None):
603 try:
604 if not tables:
605 self.__do_parse_update(update, version, self.tables)
606 else:
607 self.__do_parse_update(update, version, tables)
608 except error.Error as e:
609 vlog.err("%s: error parsing update: %s"
610 % (self._session.get_name(), e))
611
    def __do_parse_update(self, table_updates, version, tables):
        """Applies a <table-updates> JSON object to 'tables'.

        'version' selects the wire format: OVSDB_UPDATE (old/new rows from
        "update") or OVSDB_UPDATE2 (diff-based rows from "update2").
        Raises error.Error on malformed input; bumps self.change_seqno once
        per row that actually changed."""
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in table_updates.items():
            table = tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if not isinstance(table_update, dict):
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in table_update.items():
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                # NOTE: shadows the module-level "uuid" import inside this
                # loop body.
                uuid = ovs.ovsuuid.from_string(uuid_string)

                if not isinstance(row_update, dict):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                if version == OVSDB_UPDATE2:
                    if self.__process_update2(table, uuid, row_update):
                        self.change_seqno += 1
                    continue

                # OVSDB_UPDATE: each row carries optional "old"/"new" maps.
                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, uuid, old, new):
                    self.change_seqno += 1
657
658 def __process_update2(self, table, uuid, row_update):
659 row = table.rows.get(uuid)
660 changed = False
661 if "delete" in row_update:
662 if row:
663 del table.rows[uuid]
664 self.notify(ROW_DELETE, row)
665 changed = True
666 else:
667 # XXX rate-limit
668 vlog.warn("cannot delete missing row %s from table"
669 "%s" % (uuid, table.name))
670 elif "insert" in row_update or "initial" in row_update:
671 if row:
672 vlog.warn("cannot add existing row %s from table"
673 " %s" % (uuid, table.name))
674 del table.rows[uuid]
675 row = self.__create_row(table, uuid)
676 if "insert" in row_update:
677 row_update = row_update['insert']
678 else:
679 row_update = row_update['initial']
680 self.__add_default(table, row_update)
681 changed = self.__row_update(table, row, row_update)
682 table.rows[uuid] = row
683 if changed:
684 self.notify(ROW_CREATE, row)
685 elif "modify" in row_update:
686 if not row:
687 raise error.Error('Modify non-existing row')
688
689 old_row = self.__apply_diff(table, row, row_update['modify'])
690 self.notify(ROW_UPDATE, row, Row(self, table, uuid, old_row))
691 changed = True
692 else:
693 raise error.Error('<row-update> unknown operation',
694 row_update)
695 return changed
696
    def __process_update(self, table, uuid, old, new):
        """Applies a single old/new <row-update> ("update" protocol) to
        'table'.  Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
                self.notify(ROW_DELETE, row)
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            op = ROW_CREATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                op = ROW_UPDATE
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                # NOTE(review): this notifies ROW_CREATE even when 'op' was
                # downgraded to ROW_UPDATE above — confirm intended.
                self.notify(ROW_CREATE, row)
        else:
            # Modify row; treat a modify of a missing row as a create.
            op = ROW_UPDATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                op = ROW_CREATE
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(op, row, Row.from_json(self, table, uuid, old))
        return changed
742
    def __check_server_db(self):
        """Returns True if this is a valid server database, False otherwise.

        Uses the monitored _Server.Database table to verify that the server
        hosts our database and, for clustered databases with multiple
        remotes, that this server is connected, up to date, and (when
        leader_only is set) the cluster leader."""
        session_name = self.session_name()

        if self._server_db_table not in self.server_tables:
            vlog.info("%s: server does not have %s table in its %s database"
                      % (session_name, self._server_db_table,
                         self._server_db_name))
            return False

        rows = self.server_tables[self._server_db_table].rows

        database = None
        for row in rows.values():
            if self.cluster_id:
                # Match on the first four characters of each cluster id
                # (presumably the abbreviated cid form users supply —
                # confirm against ovsdb documentation).
                if self.cluster_id in \
                   map(lambda x: str(x)[:4], row.cid):
                    database = row
                    break
            elif row.name == self._db.name:
                database = row
                break

        if not database:
            vlog.info("%s: server does not have %s database"
                      % (session_name, self._db.name))
            return False

        # Cluster health checks only matter when we could try another server.
        if (database.model == CLUSTERED and
                self._session.get_num_of_remotes() > 1):
            if not database.schema:
                vlog.info('%s: clustered database server has not yet joined '
                          'cluster; trying another server' % session_name)
                return False
            if not database.connected:
                vlog.info('%s: clustered database server is disconnected '
                          'from cluster; trying another server' % session_name)
                return False
            if (self.leader_only and
                    not database.leader):
                vlog.info('%s: clustered database server is not cluster '
                          'leader; trying another server' % session_name)
                return False
            if database.index:
                # Never accept a server whose log index is behind the
                # highest one we have already seen (stale data).
                if database.index[0] < self._min_index:
                    vlog.warn('%s: clustered database server has stale data; '
                              'trying another server' % session_name)
                    return False
                self._min_index = database.index[0]

        return True
794
    def __column_name(self, column):
        # NOTE(review): despite the name, this returns the column's
        # *default value* (JSON form for UUIDs), not its name.
        # Assumes ovs.db.types is reachable via the ovs.db.* imports at the
        # top of this file — confirm.
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
        else:
            return column.type.key.type.default

    def __add_default(self, table, row_update):
        # Fills in defaults for required (n_min != 0), non-map columns that
        # the server omitted from an "insert"/"initial" row, skipping
        # columns registered as read-only for this table.
        for column in table.columns.values():
            if column.name not in row_update:
                if ((table.name not in self.readonly) or
                    (table.name in self.readonly) and
                    (column.name not in self.readonly[table.name])):
                    if column.type.n_min != 0 and not column.type.is_map():
                        row_update[column.name] = self.__column_name(column)
809
    def __apply_diff(self, table, row, row_diff):
        """Applies a <row-update2> "modify" diff to 'row' in place.

        Returns a dict mapping each diffed column name to a copy of its
        previous Datum, suitable for building the "old values" Row passed
        to notify().  Unknown columns and unparsable data are logged and
        skipped."""
        old_row = {}
        for column_name, datum_diff_json in row_diff.items():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum_diff = data.Datum.from_json(column.type, datum_diff_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            # Snapshot the old value before applying the diff.  (This
            # records the column even if the diff turns out to be a no-op.)
            old_row[column_name] = row._data[column_name].copy()
            datum = row._data[column_name].diff(datum_diff)
            if datum != row._data[column_name]:
                row._data[column_name] = datum

        return old_row
834
    def __row_update(self, table, row, row_json):
        """Applies 'row_json' (column name -> datum JSON) to 'row'.

        Returns True if at least one column whose 'alert' attribute is set
        changed value.  Unknown columns and unparsable data are logged and
        skipped."""
        changed = False
        for column_name, datum_json in row_json.items():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed
862
863 def __create_row(self, table, uuid):
864 data = {}
865 for column in table.columns.values():
866 data[column.name] = ovs.db.data.Datum.default(column.type)
867 return Row(self, table, uuid, data)
868
    def __error(self):
        # On a protocol error, drop the session; run() re-syncs the whole
        # replica from scratch after reconnecting.
        self._session.force_reconnect()

    def __txn_abort_all(self):
        # Mark every in-flight transaction for retry; called when the
        # session sequence number changes (i.e. on reconnect).
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.TRY_AGAIN

    def __txn_process_reply(self, msg):
        # Returns True (after processing) if 'msg' answers one of our
        # outstanding transactions; returns None otherwise so that run()'s
        # elif chain falls through to the next case.
        txn = self._outstanding_txns.pop(msg.id, None)
        if txn:
            txn._process_reply(msg)
            return True
882
883
def _uuid_to_row(atom, base):
    # Resolve a UUID atom to the referenced Row when the base type refers
    # to another table; otherwise pass the atom through unchanged.
    target_table = base.ref_table
    if target_table:
        return target_table.rows.get(atom)
    return atom
889
890
def _row_to_uuid(value):
    # Rows serialize as their UUID; any other value passes through as-is.
    return value.uuid if isinstance(value, Row) else value
896
897
@functools.total_ordering
class Row(object):
    """A row within an IDL.

    The client may access the following attributes directly:

    - 'uuid': a uuid.UUID object whose value is the row's database UUID.

    - An attribute for each column in the Row's table, named for the column,
      whose values are as returned by Datum.to_python() for the column's type.

      If some error occurs (e.g. the database server's idea of the column is
      different from the IDL's idea), then the attribute values is the
      "default" value return by Datum.default() for the column's type.  (It is
      important to know this because the default value may violate constraints
      for the column's type, e.g. the default integer value is 0 even if
      column constraints require the column's value to be positive.)

    When a transaction is active, column attributes may also be assigned new
    values.  Committing the transaction will then cause the new value to be
    stored into the database.

    *NOTE*: In the current implementation, the value of a column is a *copy*
    of the value in the database.  This means that modifying its value
    directly will have no useful effect.  For example, the following:
        row.mycolumn["a"] = "b"              # don't do this
    will not change anything in the database, even after commit.  To modify
    the column, instead assign the modified column value back to the column:
        d = row.mycolumn
        d["a"] = "b"
        row.mycolumn = d
    """
    def __init__(self, idl, table, uuid, data):
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__getattr__().
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     exists in the committed form of the database.
        #
        #   - None, if this row is newly inserted within the active
        #     transaction and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active
        # transaction.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been changed within this transaction.
        #
        #   - A dictionary that maps a column name to its new Datum, if an
        #     active transaction changes those columns' values.
        #
        #   - A dictionary that maps every column name to a Datum, if the row
        #     is newly inserted within the active transaction.
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire.  It takes the following values:
        #
        #   - {}, the empty dictionary, if no transaction is active or if the
        #     row has yet not been mutated within this transaction.
        #
        #   - A dictionary that contains two keys:
        #
        #     - "_inserts" contains a dictionary that maps column names to
        #       new keys/key-value pairs that should be inserted into the
        #       column
        #     - "_removes" contains a dictionary that maps column names to
        #       the keys/key-value pairs that should be removed from the
        #       column
        #
        #   - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}

    def __lt__(self, other):
        # Rows order by database UUID, enabling stable sorted() output;
        # functools.total_ordering derives the remaining comparisons.
        if not isinstance(other, Row):
            return NotImplemented
        return bool(self.__dict__['uuid'] < other.__dict__['uuid'])

    def __eq__(self, other):
        if not isinstance(other, Row):
            return NotImplemented
        return bool(self.__dict__['uuid'] == other.__dict__['uuid'])

    def __hash__(self):
        # Hash on the UUID only, consistent with __eq__.
        return int(self.__dict__['uuid'])

    def __str__(self):
        return "{table}({data})".format(
            table=self._table.name,
            data=", ".join("{col}={val}".format(col=c, val=getattr(self, c))
                           for c in sorted(self._table.columns)))

    def __getattr__(self, column_name):
        """Returns the effective value of column 'column_name': the committed
        Datum overlaid with any pending write, insert, or remove mutations
        recorded by the active transaction, converted with
        Datum.to_python()."""
        assert self._changes is not None
        assert self._mutations is not None

        try:
            column = self._table.columns[column_name]
        except KeyError:
            raise AttributeError("%s instance has no attribute '%s'" %
                                 (self.__class__.__name__, column_name))
        datum = self._changes.get(column_name)
        inserts = None
        if '_inserts' in self._mutations.keys():
            inserts = self._mutations['_inserts'].get(column_name)
        removes = None
        if '_removes' in self._mutations.keys():
            removes = self._mutations['_removes'].get(column_name)
        if datum is None:
            if self._data is None:
                # Newly inserted row: only pending inserts can provide a
                # value.
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = data.Datum.from_python(column.type,
                                                   inserts,
                                                   _row_to_uuid)
            elif column_name in self._data:
                datum = self._data[column_name]
                if column.type.is_set():
                    # Apply pending set mutations: additions first, then
                    # removals.
                    dlist = datum.as_list()
                    if inserts is not None:
                        dlist.extend(list(inserts))
                    if removes is not None:
                        removes_datum = data.Datum.from_python(column.type,
                                                               removes,
                                                               _row_to_uuid)
                        removes_list = removes_datum.as_list()
                        dlist = [x for x in dlist if x not in removes_list]
                    datum = data.Datum.from_python(column.type, dlist,
                                                   _row_to_uuid)
                elif column.type.is_map():
                    # Apply pending map mutations; a key both removed and
                    # re-inserted keeps its inserted value.
                    dmap = datum.to_python(_uuid_to_row)
                    if inserts is not None:
                        dmap.update(inserts)
                    if removes is not None:
                        for key in removes:
                            if key not in (inserts or {}):
                                dmap.pop(key, None)
                    datum = data.Datum.from_python(column.type, dmap,
                                                   _row_to_uuid)
            else:
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = inserts

        return datum.to_python(_uuid_to_row)

    def __setattr__(self, column_name, value):
        """Records, in the active transaction, a write of 'value' to
        'column_name', keeping any custom indexes on the table in sync.
        Writes to columns registered as readonly are dropped with a
        warning."""
        assert self._changes is not None
        assert self._idl.txn

        if ((self._table.name in self._idl.readonly) and
                (column_name in self._idl.readonly[self._table.name])):
            vlog.warn("attempting to write to readonly column %s"
                      % column_name)
            return

        column = self._table.columns[column_name]
        try:
            datum = data.Datum.from_python(column.type, value, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        # Remove prior version of the Row from the index if it has the
        # indexed column set, and the column changing is an indexed column.
        if hasattr(self, column_name):
            for idx in self._table.rows.indexes.values():
                if column_name in (c.column for c in idx.columns):
                    idx.remove(self)
        self._idl.txn._write(self, column, datum)
        for idx in self._table.rows.indexes.values():
            # Only update the index if indexed columns change.
            if column_name in (c.column for c in idx.columns):
                idx.add(self)

    def addvalue(self, column_name, key):
        """Schedules 'key' to be added to the set in column 'column_name'
        when the transaction commits (sent as a "mutate" operation)."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            # Validate 'key' against the column type before queueing it.
            data.Datum.from_python(column.type, key, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, set())
        column_value.add(key)

    def delvalue(self, column_name, key):
        """Schedules 'key' to be removed from the set in column 'column_name'
        when the transaction commits (sent as a "mutate" operation)."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            # Validate 'key' against the column type before queueing it.
            data.Datum.from_python(column.type, key, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to delete bad value from column %s (%s)"
                     % (column_name, e))
            return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)

    def setkey(self, column_name, key, value):
        """Schedules the map in column 'column_name' to map 'key' to 'value'
        when the transaction commits, replacing any committed binding of
        'key'."""
        self._idl.txn._txn_rows[self.uuid] = self
        column = self._table.columns[column_name]
        try:
            # Validate the pair against the column type before queueing it.
            data.Datum.from_python(column.type, {key: value}, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        if self._data and column_name in self._data:
            # Remove existing key/value before updating.
            removes = self._mutations.setdefault('_removes', {})
            column_value = removes.setdefault(column_name, set())
            column_value.add(key)
        inserts = self._mutations.setdefault('_inserts', {})
        column_value = inserts.setdefault(column_name, {})
        column_value[key] = value

    def delkey(self, column_name, key, value=None):
        """Schedules 'key' to be removed from the map in column 'column_name'
        when the transaction commits.  If 'value' is supplied, the removal is
        only queued when the committed data currently maps 'key' to exactly
        'value'; otherwise this call silently does nothing."""
        self._idl.txn._txn_rows[self.uuid] = self
        # Compare against None, not truthiness: a caller passing a falsy
        # value such as 0 or "" still wants the value to be checked.
        # (Previously "if value:" skipped the check and deleted the key
        # unconditionally in that case.)
        if value is not None:
            try:
                old_value = data.Datum.to_python(self._data[column_name],
                                                 _uuid_to_row)
            except error.Error:
                return
            if key not in old_value:
                return
            if old_value[key] != value:
                return
        removes = self._mutations.setdefault('_removes', {})
        column_value = removes.setdefault(column_name, set())
        column_value.add(key)
        return

    @classmethod
    def from_json(cls, idl, table, uuid, row_json):
        """Builds and returns a Row for 'table' from the JSON object
        'row_json', skipping (with a warning) unknown columns and values that
        fail to parse."""
        data = {}
        for column_name, datum_json in row_json.items():
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s in table %s"
                          % (column_name, table.name))
                continue
            try:
                datum = ovs.db.data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue
            data[column_name] = datum
        return cls(idl, table, uuid, data)

    def verify(self, column_name):
        """Causes the original contents of column 'column_name' in this row
        to be verified as a prerequisite to completing the transaction.  That
        is, if 'column_name' changed in this row (or if this row was deleted)
        between the time that the IDL originally read its contents and the
        time that the transaction commits, then the transaction aborts and
        Transaction.commit() returns Transaction.TRY_AGAIN.

        The intention is that, to ensure that no transaction commits based on
        dirty reads, an application should call Row.verify() on each data
        item read as part of a read-modify-write operation.

        In some cases Row.verify() reduces to a no-op, because the current
        value of the column is already known:

          - If this row is a row created by the current transaction (returned
            by Transaction.insert()).

          - If the column has already been modified within the current
            transaction.

        Because of the latter property, always call Row.verify() *before*
        modifying the column, for a given read-modify-write.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if not self._data or column_name in self._changes:
            return

        self._prereqs[column_name] = None

    def delete(self):
        """Deletes this row from its table.

        A transaction must be in progress."""
        assert self._idl.txn
        assert self._changes is not None
        if self._data is None:
            # Row inserted then deleted in the same transaction: it never
            # reached the server, so just forget it.
            del self._idl.txn._txn_rows[self.uuid]
        else:
            self._idl.txn._txn_rows[self.uuid] = self
        del self._table.rows[self.uuid]
        # _changes == None marks this row as deleted.
        self.__dict__["_changes"] = None

    def fetch(self, column_name):
        """Causes the transaction, when committed, to retrieve the current
        value of 'column_name' in this row from the database server."""
        self._idl.txn._fetch(self, column_name)

    def increment(self, column_name):
        """Causes the transaction, when committed, to increment the value of
        'column_name' within this row by 1.  'column_name' must have an
        integer type.  After the transaction commits successfully, the client
        may retrieve the final (incremented) value of 'column_name' with
        Transaction.get_increment_new_value().

        The client could accomplish something similar by reading and writing
        and verify()ing columns.  However, increment() will never (by itself)
        cause a transaction to fail because of a verify error.

        The intended use is for incrementing the "next_cfg" column in
        the Open_vSwitch table."""
        self._idl.txn._increment(self, column_name)
1240
1241
def _uuid_name_from_uuid(uuid):
    """Derives the "named-uuid" identifier used to refer to 'uuid' within a
    transaction; dashes become underscores to form a plain identifier."""
    return "row" + str(uuid).replace("-", "_")
1244
1245
def _where_uuid_equals(uuid):
    """Builds the OVSDB "where" clause selecting the row whose _uuid equals
    'uuid'."""
    clause = ["_uuid", "==", ["uuid", str(uuid)]]
    return [clause]
1248
1249
class _InsertedRow(object):
    """Bookkeeping for one "insert" operation in a pending transaction."""
    def __init__(self, op_index):
        # Index used to locate this insert's reply in the "transact"
        # response (see Transaction.commit()).
        self.op_index = op_index
        # Permanent UUID assigned by the server, filled in from the reply;
        # None until then (see Transaction.get_insert_uuid()).
        self.real = None
1254
1255
1256 class Transaction(object):
1257 """A transaction may modify the contents of a database by modifying the
1258 values of columns, deleting rows, inserting rows, or adding checks that
1259 columns in the database have not changed ("verify" operations), through
1260 Row methods.
1261
1262 Reading and writing columns and inserting and deleting rows are all
1263 straightforward. The reasons to verify columns are less obvious.
1264 Verification is the key to maintaining transactional integrity. Because
1265 OVSDB handles multiple clients, it can happen that between the time that
1266 OVSDB client A reads a column and writes a new value, OVSDB client B has
1267 written that column. Client A's write should not ordinarily overwrite
1268 client B's, especially if the column in question is a "map" column that
1269 contains several more or less independent data items. If client A adds a
1270 "verify" operation before it writes the column, then the transaction fails
1271 in case client B modifies it first. Client A will then see the new value
1272 of the column and compose a new transaction based on the new contents
1273 written by client B.
1274
1275 When a transaction is complete, which must be before the next call to
1276 Idl.run(), call Transaction.commit() or Transaction.abort().
1277
1278 The life-cycle of a transaction looks like this:
1279
1280 1. Create the transaction and record the initial sequence number:
1281
1282 seqno = idl.change_seqno(idl)
1283 txn = Transaction(idl)
1284
1285 2. Modify the database with Row and Transaction methods.
1286
1287 3. Commit the transaction by calling Transaction.commit(). The first call
1288 to this function probably returns Transaction.INCOMPLETE. The client
must keep calling again as long as this remains true, calling Idl.run() in
1290 between to let the IDL do protocol processing. (If the client doesn't
1291 have anything else to do in the meantime, it can use
1292 Transaction.commit_block() to avoid having to loop itself.)
1293
1294 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
1295 to change from the saved 'seqno' (it's possible that it's already
1296 changed, in which case the client should not wait at all), then start
1297 over from step 1. Only a call to Idl.run() will change the return value
1298 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
1299
1300 # Status values that Transaction.commit() can return.
1301
1302 # Not yet committed or aborted.
1303 UNCOMMITTED = "uncommitted"
1304 # Transaction didn't include any changes.
1305 UNCHANGED = "unchanged"
1306 # Commit in progress, please wait.
1307 INCOMPLETE = "incomplete"
1308 # ovsdb_idl_txn_abort() called.
1309 ABORTED = "aborted"
1310 # Commit successful.
1311 SUCCESS = "success"
1312 # Commit failed because a "verify" operation
1313 # reported an inconsistency, due to a network
1314 # problem, or other transient failure. Wait
1315 # for a change, then try again.
1316 TRY_AGAIN = "try again"
1317 # Server hasn't given us the lock yet.
1318 NOT_LOCKED = "not locked"
1319 # Commit failed due to a hard error.
1320 ERROR = "error"
1321
1322 @staticmethod
1323 def status_to_string(status):
1324 """Converts one of the status values that Transaction.commit() can
1325 return into a human-readable string.
1326
1327 (The status values are in fact such strings already, so
1328 there's nothing to do.)"""
1329 return status
1330
1331 def __init__(self, idl):
1332 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
1333 A given Idl may only have a single active transaction at a time.
1334
1335 A Transaction may modify the contents of a database by assigning new
1336 values to columns (attributes of Row), deleting rows (with
1337 Row.delete()), or inserting rows (with Transaction.insert()). It may
1338 also check that columns in the database have not changed with
1339 Row.verify().
1340
1341 When a transaction is complete (which must be before the next call to
1342 Idl.run()), call Transaction.commit() or Transaction.abort()."""
1343 assert idl.txn is None
1344
1345 idl.txn = self
1346 self._request_id = None
1347 self.idl = idl
1348 self.dry_run = False
1349 self._txn_rows = {}
1350 self._status = Transaction.UNCOMMITTED
1351 self._error = None
1352 self._comments = []
1353
1354 self._inc_row = None
1355 self._inc_column = None
1356
1357 self._fetch_requests = []
1358
1359 self._inserted_rows = {} # Map from UUID to _InsertedRow
1360
1361 def add_comment(self, comment):
1362 """Appends 'comment' to the comments that will be passed to the OVSDB
1363 server when this transaction is committed. (The comment will be
1364 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
1365 relatively human-readable form.)"""
1366 self._comments.append(comment)
1367
1368 def wait(self, poller):
1369 """Causes poll_block() to wake up if this transaction has completed
1370 committing."""
1371 if self._status not in (Transaction.UNCOMMITTED,
1372 Transaction.INCOMPLETE):
1373 poller.immediate_wake()
1374
1375 def _substitute_uuids(self, json):
1376 if isinstance(json, (list, tuple)):
1377 if (len(json) == 2
1378 and json[0] == 'uuid'
1379 and ovs.ovsuuid.is_valid_string(json[1])):
1380 uuid = ovs.ovsuuid.from_string(json[1])
1381 row = self._txn_rows.get(uuid, None)
1382 if row and row._data is None:
1383 return ["named-uuid", _uuid_name_from_uuid(uuid)]
1384 else:
1385 return [self._substitute_uuids(elem) for elem in json]
1386 return json
1387
    def __disassemble(self):
        """Detaches this transaction from the IDL and rolls the IDL's
        in-memory replica back to its pre-transaction state: deleted rows are
        restored to their tables, locally inserted rows are removed, and all
        per-row transaction state is cleared."""
        self.idl.txn = None

        for row in self._txn_rows.values():
            if row._changes is None:
                # Deleted row: put it back into its table.
                # If we add the deleted row back to rows with _changes == None
                # then __getattr__ will not work for the indexes
                row.__dict__["_changes"] = {}
                row.__dict__["_mutations"] = {}
                row._table.rows[row.uuid] = row
            elif row._data is None:
                # Locally inserted row: it never existed in the database.
                del row._table.rows[row.uuid]
            # Reset transaction-scoped state on every touched row.
            row.__dict__["_changes"] = {}
            row.__dict__["_mutations"] = {}
            row.__dict__["_prereqs"] = {}
        self._txn_rows = {}
1404
    def commit(self):
        """Attempts to commit 'txn'.  Returns the status of the commit
        operation, one of the following constants:

          Transaction.INCOMPLETE:

              The transaction is in progress, but not yet complete.  The
              caller should call again later, after calling Idl.run() to let
              the IDL do OVSDB protocol processing.

          Transaction.UNCHANGED:

              The transaction is complete.  (It didn't actually change the
              database, so the IDL didn't send any request to the database
              server.)

          Transaction.ABORTED:

              The caller previously called Transaction.abort().

          Transaction.SUCCESS:

              The transaction was successful.  The update made by the
              transaction (and possibly other changes made by other database
              clients) should already be visible in the IDL.

          Transaction.TRY_AGAIN:

              The transaction failed for some transient reason, e.g. because
              a "verify" operation reported an inconsistency or due to a
              network problem.  The caller should wait for a change to the
              database, then compose a new transaction, and commit the new
              transaction.

              Use Idl.change_seqno to wait for a change in the database.  It
              is important to use its value *before* the initial call to
              Transaction.commit() as the baseline for this purpose, because
              the change that one should wait for can happen after the
              initial call but before the call that returns
              Transaction.TRY_AGAIN, and using some other baseline value in
              that situation could cause an indefinite wait if the database
              rarely changes.

          Transaction.NOT_LOCKED:

              The transaction failed because the IDL has been configured to
              require a database lock (with Idl.set_lock()) but didn't
              get it yet or has already lost it.

        Committing a transaction rolls back all of the changes that it made
        to the IDL's copy of the database.  If the transaction commits
        successfully, then the database server will send an update and, thus,
        the IDL will be updated with the committed changes."""
        # The status can only change if we're the active transaction.
        # (Otherwise, our status will change only in Idl.run().)
        if self != self.idl.txn:
            return self._status

        # If we need a lock but don't have it, give up quickly.
        if self.idl.lock_name and not self.idl.has_lock:
            self._status = Transaction.NOT_LOCKED
            self.__disassemble()
            return self._status

        # The first element of a "transact" request is the database name.
        operations = [self.idl._db.name]

        # Assert that we have the required lock (avoiding a race).
        if self.idl.lock_name:
            operations.append({"op": "assert",
                               "lock": self.idl.lock_name})

        # Add prerequisites and declarations of new rows.
        for row in self._txn_rows.values():
            if row._prereqs:
                rows = {}
                columns = []
                for column_name in row._prereqs:
                    columns.append(column_name)
                    rows[column_name] = row._data[column_name].to_json()
                operations.append({"op": "wait",
                                   "table": row._table.name,
                                   "timeout": 0,
                                   "where": _where_uuid_equals(row.uuid),
                                   "until": "==",
                                   "columns": columns,
                                   "rows": [rows]})

        # Add updates.
        any_updates = False
        for row in self._txn_rows.values():
            if row._changes is None:
                # _changes == None means the row is deleted.
                if row._table.is_root:
                    operations.append({"op": "delete",
                                       "table": row._table.name,
                                       "where": _where_uuid_equals(row.uuid)})
                    any_updates = True
                else:
                    # Let ovsdb-server decide whether to really delete it.
                    pass
            elif row._changes:
                op = {"table": row._table.name}
                if row._data is None:
                    # _data == None means the row is newly inserted.
                    op["op"] = "insert"
                    op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                    any_updates = True

                    op_index = len(operations) - 1
                    self._inserted_rows[row.uuid] = _InsertedRow(op_index)
                else:
                    op["op"] = "update"
                    op["where"] = _where_uuid_equals(row.uuid)

                row_json = {}
                op["row"] = row_json

                for column_name, datum in row._changes.items():
                    if row._data is not None or not datum.is_default():
                        row_json[column_name] = (
                            self._substitute_uuids(datum.to_json()))

                        # If anything really changed, consider it an update.
                        # We can't suppress not-really-changed values earlier
                        # or transactions would become nonatomic (see the big
                        # comment inside Transaction._write()).
                        if (not any_updates and row._data is not None and
                                row._data[column_name] != datum):
                            any_updates = True

                if row._data is None or row_json:
                    operations.append(op)
            if row._mutations:
                # Queued Row.addvalue()/delvalue()/setkey()/delkey() calls
                # become a single "mutate" operation per row.
                addop = False
                op = {"table": row._table.name}
                op["op"] = "mutate"
                if row._data is None:
                    # New row
                    op["where"] = self._substitute_uuids(
                        _where_uuid_equals(row.uuid))
                else:
                    # Existing row
                    op["where"] = _where_uuid_equals(row.uuid)
                op["mutations"] = []
                if '_removes' in row._mutations.keys():
                    for col, dat in row._mutations['_removes'].items():
                        column = row._table.columns[col]
                        if column.type.is_map():
                            opdat = ["set"]
                            opdat.append(list(dat))
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in dat:
                                try:
                                    datum = data.Datum.from_python(column.type,
                                                                   ele, _row_to_uuid)
                                except error.Error:
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "delete", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if '_inserts' in row._mutations.keys():
                    for col, val in row._mutations['_inserts'].items():
                        column = row._table.columns[col]
                        if column.type.is_map():
                            opdat = ["map"]
                            datum = data.Datum.from_python(column.type, val,
                                                           _row_to_uuid)
                            opdat.append(datum.as_list())
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in val:
                                try:
                                    datum = data.Datum.from_python(column.type,
                                                                   ele, _row_to_uuid)
                                except error.Error:
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "insert", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if addop:
                    operations.append(op)
                    any_updates = True

        if self._fetch_requests:
            for fetch in self._fetch_requests:
                fetch["index"] = len(operations) - 1
                operations.append({"op": "select",
                                   "table": fetch["row"]._table.name,
                                   "where": self._substitute_uuids(
                                       _where_uuid_equals(fetch["row"].uuid)),
                                   "columns": [fetch["column_name"]]})
            any_updates = True

        # Add increment.
        if self._inc_row and any_updates:
            self._inc_index = len(operations) - 1

            operations.append({"op": "mutate",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "mutations": [[self._inc_column, "+=", 1]]})
            operations.append({"op": "select",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "columns": [self._inc_column]})

        # Add comment.
        if self._comments:
            operations.append({"op": "comment",
                               "comment": "\n".join(self._comments)})

        # Dry run?
        if self.dry_run:
            operations.append({"op": "abort"})

        if not any_updates:
            self._status = Transaction.UNCHANGED
        else:
            msg = ovs.jsonrpc.Message.create_request("transact", operations)
            self._request_id = msg.id
            # send() returns a nonzero error code on failure.
            if not self.idl._session.send(msg):
                self.idl._outstanding_txns[self._request_id] = self
                self._status = Transaction.INCOMPLETE
            else:
                self._status = Transaction.TRY_AGAIN

        self.__disassemble()
        return self._status
1640
1641 def commit_block(self):
1642 """Attempts to commit this transaction, blocking until the commit
1643 either succeeds or fails. Returns the final commit status, which may
1644 be any Transaction.* value other than Transaction.INCOMPLETE.
1645
1646 This function calls Idl.run() on this transaction'ss IDL, so it may
1647 cause Idl.change_seqno to change."""
1648 while True:
1649 status = self.commit()
1650 if status != Transaction.INCOMPLETE:
1651 return status
1652
1653 self.idl.run()
1654
1655 poller = ovs.poller.Poller()
1656 self.idl.wait(poller)
1657 self.wait(poller)
1658 poller.block()
1659
1660 def get_increment_new_value(self):
1661 """Returns the final (incremented) value of the column in this
1662 transaction that was set to be incremented by Row.increment. This
1663 transaction must have committed successfully."""
1664 assert self._status == Transaction.SUCCESS
1665 return self._inc_new_value
1666
1667 def abort(self):
1668 """Aborts this transaction. If Transaction.commit() has already been
1669 called then the transaction might get committed anyhow."""
1670 self.__disassemble()
1671 if self._status in (Transaction.UNCOMMITTED,
1672 Transaction.INCOMPLETE):
1673 self._status = Transaction.ABORTED
1674
1675 def get_error(self):
1676 """Returns a string representing this transaction's current status,
1677 suitable for use in log messages."""
1678 if self._status != Transaction.ERROR:
1679 return Transaction.status_to_string(self._status)
1680 elif self._error:
1681 return self._error
1682 else:
1683 return "no error details available"
1684
    def __set_error_json(self, json):
        # Record only the first error reported for this transaction, so that
        # get_error() describes the original cause rather than knock-on
        # failures.
        if self._error is None:
            self._error = ovs.json.to_string(json)
1688
1689 def get_insert_uuid(self, uuid):
1690 """Finds and returns the permanent UUID that the database assigned to a
1691 newly inserted row, given the UUID that Transaction.insert() assigned
1692 locally to that row.
1693
1694 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
1695 or if it was assigned by that function and then deleted by Row.delete()
1696 within the same transaction. (Rows that are inserted and then deleted
1697 within a single transaction are never sent to the database server, so
1698 it never assigns them a permanent UUID.)
1699
1700 This transaction must have completed successfully."""
1701 assert self._status in (Transaction.SUCCESS,
1702 Transaction.UNCHANGED)
1703 inserted_row = self._inserted_rows.get(uuid)
1704 if inserted_row:
1705 return inserted_row.real
1706 return None
1707
1708 def _increment(self, row, column):
1709 assert not self._inc_row
1710 self._inc_row = row
1711 self._inc_column = column
1712
1713 def _fetch(self, row, column_name):
1714 self._fetch_requests.append({"row": row, "column_name": column_name})
1715
    def _write(self, row, column, datum):
        """Records, within the active transaction, a write of 'datum' to
        'column' of 'row', superseding any pending insert/remove mutations of
        that column.  Called by Row.__setattr__()."""
        assert row._changes is not None
        assert row._mutations is not None

        txn = row._idl.txn

        # If this is a write-only column and the datum being written is the
        # same as the one already there, just skip the update entirely.  This
        # is worth optimizing because we have a lot of columns that get
        # periodically refreshed into the database but don't actually change
        # that often.
        #
        # We don't do this for read/write columns because that would break
        # atomicity of transactions--some other client might have written a
        # different value in that column since we read it.  (But if a whole
        # transaction only does writes of existing values, without making any
        # real changes, we will drop the whole transaction later in
        # ovsdb_idl_txn_commit().)
        if (not column.alert and row._data and
                row._data.get(column.name) == datum):
            new_value = row._changes.get(column.name)
            if new_value is None or new_value == datum:
                return

        txn._txn_rows[row.uuid] = row
        # A plain write replaces any queued addvalue/delvalue/setkey/delkey
        # mutations for the same column.
        if '_inserts' in row._mutations:
            row._mutations['_inserts'].pop(column.name, None)
        if '_removes' in row._mutations:
            row._mutations['_removes'].pop(column.name, None)
        row._changes[column.name] = datum.copy()
1746
1747 def insert(self, table, new_uuid=None):
1748 """Inserts and returns a new row in 'table', which must be one of the
1749 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1750
1751 The new row is assigned a provisional UUID. If 'uuid' is None then one
1752 is randomly generated; otherwise 'uuid' should specify a randomly
1753 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1754 different UUID when 'txn' is committed, but the IDL will replace any
1755 uses of the provisional UUID in the data to be to be committed by the
1756 UUID assigned by ovsdb-server."""
1757 assert self._status == Transaction.UNCOMMITTED
1758 if new_uuid is None:
1759 new_uuid = uuid.uuid4()
1760 row = Row(self.idl, table, new_uuid, None)
1761 table.rows[row.uuid] = row
1762 self._txn_rows[row.uuid] = row
1763 return row
1764
    def _process_reply(self, msg):
        """Handles the JSON-RPC reply 'msg' to this transaction's "transact"
        request, classifying per-operation errors and setting the
        transaction's final status accordingly."""
        if msg.type == ovs.jsonrpc.Message.T_ERROR:
            self._status = Transaction.ERROR
        elif not isinstance(msg.result, (list, tuple)):
            # XXX rate-limit
            vlog.warn('reply to "transact" is not JSON array')
        else:
            hard_errors = False
            soft_errors = False
            lock_errors = False

            ops = msg.result
            for op in ops:
                if op is None:
                    # This isn't an error in itself but indicates that some
                    # prior operation failed, so make sure that we know about
                    # it.
                    soft_errors = True
                elif isinstance(op, dict):
                    # NOTE(review): this local name shadows the imported
                    # ovs.db.error module; harmless here since the module is
                    # not referenced in this method.
                    error = op.get("error")
                    if error is not None:
                        if error == "timed out":
                            soft_errors = True
                        elif error == "not owner":
                            lock_errors = True
                        elif error == "aborted":
                            # Expected result of a dry_run transaction.
                            pass
                        else:
                            hard_errors = True
                            self.__set_error_json(op)
                else:
                    hard_errors = True
                    self.__set_error_json(op)
                    # XXX rate-limit
                    vlog.warn("operation reply is not JSON null or object")

            if not soft_errors and not hard_errors and not lock_errors:
                # Post-process increment, fetch, and insert replies only when
                # the transaction as a whole succeeded.
                if self._inc_row and not self.__process_inc_reply(ops):
                    hard_errors = True
                if self._fetch_requests:
                    if self.__process_fetch_reply(ops):
                        self.idl.change_seqno += 1
                    else:
                        hard_errors = True

                for insert in self._inserted_rows.values():
                    if not self.__process_insert_reply(insert, ops):
                        hard_errors = True

            if hard_errors:
                self._status = Transaction.ERROR
            elif lock_errors:
                self._status = Transaction.NOT_LOCKED
            elif soft_errors:
                self._status = Transaction.TRY_AGAIN
            else:
                self._status = Transaction.SUCCESS
1822
1823 @staticmethod
1824 def __check_json_type(json, types, name):
1825 if not json:
1826 # XXX rate-limit
1827 vlog.warn("%s is missing" % name)
1828 return False
1829 elif not isinstance(json, tuple(types)):
1830 # XXX rate-limit
1831 vlog.warn("%s has unexpected type %s" % (name, type(json)))
1832 return False
1833 else:
1834 return True
1835
1836 def __process_fetch_reply(self, ops):
1837 update = False
1838 for fetch_request in self._fetch_requests:
1839 row = fetch_request["row"]
1840 column_name = fetch_request["column_name"]
1841 index = fetch_request["index"]
1842 table = row._table
1843
1844 select = ops[index]
1845 fetched_rows = select.get("rows")
1846 if not Transaction.__check_json_type(fetched_rows, (list, tuple),
1847 '"select" reply "rows"'):
1848 return False
1849 if len(fetched_rows) != 1:
1850 # XXX rate-limit
1851 vlog.warn('"select" reply "rows" has %d elements '
1852 'instead of 1' % len(fetched_rows))
1853 continue
1854 fetched_row = fetched_rows[0]
1855 if not Transaction.__check_json_type(fetched_row, (dict,),
1856 '"select" reply row'):
1857 continue
1858
1859 column = table.columns.get(column_name)
1860 datum_json = fetched_row.get(column_name)
1861 datum = data.Datum.from_json(column.type, datum_json)
1862
1863 row._data[column_name] = datum
1864 update = True
1865
1866 return update
1867
1868 def __process_inc_reply(self, ops):
1869 if self._inc_index + 2 > len(ops):
1870 # XXX rate-limit
1871 vlog.warn("reply does not contain enough operations for "
1872 "increment (has %d, needs %d)" %
1873 (len(ops), self._inc_index + 2))
1874
1875 # We know that this is a JSON object because the loop in
1876 # __process_reply() already checked.
1877 mutate = ops[self._inc_index]
1878 count = mutate.get("count")
1879 if not Transaction.__check_json_type(count, (int,),
1880 '"mutate" reply "count"'):
1881 return False
1882 if count != 1:
1883 # XXX rate-limit
1884 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
1885 return False
1886
1887 select = ops[self._inc_index + 1]
1888 rows = select.get("rows")
1889 if not Transaction.__check_json_type(rows, (list, tuple),
1890 '"select" reply "rows"'):
1891 return False
1892 if len(rows) != 1:
1893 # XXX rate-limit
1894 vlog.warn('"select" reply "rows" has %d elements '
1895 'instead of 1' % len(rows))
1896 return False
1897 row = rows[0]
1898 if not Transaction.__check_json_type(row, (dict,),
1899 '"select" reply row'):
1900 return False
1901 column = row.get(self._inc_column)
1902 if not Transaction.__check_json_type(column, (int,),
1903 '"select" reply inc column'):
1904 return False
1905 self._inc_new_value = column
1906 return True
1907
1908 def __process_insert_reply(self, insert, ops):
1909 if insert.op_index >= len(ops):
1910 # XXX rate-limit
1911 vlog.warn("reply does not contain enough operations "
1912 "for insert (has %d, needs %d)"
1913 % (len(ops), insert.op_index))
1914 return False
1915
1916 # We know that this is a JSON object because the loop in
1917 # __process_reply() already checked.
1918 reply = ops[insert.op_index]
1919 json_uuid = reply.get("uuid")
1920 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1921 '"insert" reply "uuid"'):
1922 return False
1923
1924 try:
1925 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1926 except error.Error:
1927 # XXX rate-limit
1928 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
1929 return False
1930
1931 insert.real = uuid_
1932 return True
1933
1934
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
    they are interested in using register_columns().  When finished, the
    get_idl_schema() function may be called.

    The location on disk of the schema used may be found in the
    'schema_location' variable."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema. None means default location
        'schema_json' schema in JSON representation in memory

        Raises ValueError if both 'location' and 'schema_json' are given.
        """

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        self._tables = {}     # table name -> set of column names (empty
                              # set means every column in the table)
        self._readonly = {}   # table name -> list of read-only columns
        self._all = False     # True once register_all() has been called

    def register_columns(self, table, columns, readonly=None):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'.  This function automatically avoids adding duplicate entries
        to the schema.
        A subset of 'columns' can be specified as 'readonly'.  The readonly
        columns are not replicated but can be fetched on-demand by the user
        with Row.fetch().

        'table' must be a string.
        'columns' must be a list of strings.
        'readonly' must be a list of strings (default: no readonly columns).
        """

        assert isinstance(table, str)
        assert isinstance(columns, list)

        # Merge with any previously registered columns for this table.
        columns = set(columns) | self._tables.get(table, set())
        self._tables[table] = columns
        # NOTE: a repeated call replaces (does not merge) the readonly list.
        # Avoid a mutable default argument: create a fresh list when the
        # caller did not supply one, so callers can never share/pollute it.
        self._readonly[table] = readonly if readonly is not None else []

    def register_table(self, table):
        """Registers interest in all columns of the given 'table'.  Future
        calls to get_idl_schema() will include all columns of 'table'.

        'table' must be a string
        """
        assert isinstance(table, str)
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
        object based on columns registered using the register_columns()
        function."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        # The JSON is consumed: this helper can produce only one schema.
        self.schema_json = None

        if not self._all:
            schema_tables = {}
            for table, columns in self._tables.items():
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        schema.readonly = self._readonly
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        """Returns the TableSchema for 'table_name' from 'schema', narrowed
        in place to only the column names in 'columns' (an empty set keeps
        every column)."""
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert isinstance(column_name, str)
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table