]> git.proxmox.com Git - mirror_ovs.git/blob - python/ovs/db/idl.py
26421f21458a668f377f355b9a9762b221731ce3
[mirror_ovs.git] / python / ovs / db / idl.py
1 # Copyright (c) 2009, 2010, 2011, 2012, 2013, 2016 Nicira, Inc.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 # http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import functools
16 import uuid
17
18 import ovs.db.data as data
19 import ovs.db.parser
20 import ovs.db.schema
21 import ovs.jsonrpc
22 import ovs.ovsuuid
23 import ovs.poller
24 import ovs.vlog
25 from ovs.db import custom_index
26 from ovs.db import error
27
28 import six
29
# Module-wide logger; messages appear under the "idl" module name.
vlog = ovs.vlog.Vlog("idl")

# Hints for the (legacy) pychecker static analyzer; harmless otherwise.
__pychecker__ = 'no-classattr no-objattrs'

# Event names passed to Idl.notify() describing what happened to a row.
ROW_CREATE = "create"
ROW_UPDATE = "update"
ROW_DELETE = "delete"

# Wire-format versions of OVSDB update notifications: "update" (RFC 7047)
# and the newer "update2" diff format used with monitor_cond.
OVSDB_UPDATE = 0
OVSDB_UPDATE2 = 1
41
42 class Idl(object):
43 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
44
45 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
46 requests to an OVSDB database server and parses the responses, converting
47 raw JSON into data structures that are easier for clients to digest.
48
49 The IDL also assists with issuing database transactions. The client
50 creates a transaction, manipulates the IDL data structures, and commits or
51 aborts the transaction. The IDL then composes and issues the necessary
52 JSON-RPC requests and reports to the client whether the transaction
53 completed successfully.
54
55 The client is allowed to access the following attributes directly, in a
56 read-only fashion:
57
58 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
59 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
60 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
61 to a Row object.
62
63 The client may directly read and write the Row objects referenced by the
64 'rows' map values. Refer to Row for more details.
65
66 - 'change_seqno': A number that represents the IDL's state. When the IDL
67 is updated (by Idl.run()), its value changes. The sequence number can
68 occasionally change even if the database does not. This happens if the
69 connection to the database drops and reconnects, which causes the
70 database contents to be reloaded even if they didn't change. (It could
71 also happen if the database server sends out a "change" that reflects
72 what the IDL already thought was in the database. The database server is
73 not supposed to do that, but bugs could in theory cause it to do so.)
74
75 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
76 if no lock is configured.
77
78 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
79 lock, and False otherwise.
80
81 Locking and unlocking happens asynchronously from the database client's
82 point of view, so the information is only useful for optimization
83 (e.g. if the client doesn't have the lock then there's no point in trying
84 to write to the database).
85
86 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
87 the database server has indicated that some other client already owns the
88 requested lock, and False otherwise.
89
90 - 'txn': The ovs.db.idl.Transaction object for the database transaction
91 currently being constructed, if there is one, or None otherwise.
92 """
93
94 IDL_S_INITIAL = 0
95 IDL_S_MONITOR_REQUESTED = 1
96 IDL_S_MONITOR_COND_REQUESTED = 2
97
    def __init__(self, remote, schema, probe_interval=None):
        """Creates and returns a connection to the database named 'db_name' on
        'remote', which should be in a form acceptable to
        ovs.jsonrpc.session.open().  The connection will maintain an in-memory
        replica of the remote database.

        'schema' should be the schema for the remote database.  The caller may
        have cut it down by removing tables or columns that are not of
        interest.  The IDL will only replicate the tables and columns that
        remain.  The caller may also add an attribute named 'alert' to
        selected remaining columns, setting its value to False; if so, then
        changes to those columns will not be considered changes to the
        database for the purpose of the return value of Idl.run() and
        Idl.change_seqno.  This is useful for columns that the IDL's client
        will write but not read.

        As a convenience to users, 'schema' may also be an instance of the
        SchemaHelper class.

        The IDL uses and modifies 'schema' directly.

        If "probe_interval" is zero it disables the connection keepalive
        feature.  If non-zero the value will be forced to at least 1000
        milliseconds.  If None it will just use the default value in OVS.
        """

        assert isinstance(schema, SchemaHelper)
        schema = schema.get_idl_schema()

        self.tables = schema.tables
        self.readonly = schema.readonly
        self._db = schema
        self._session = ovs.jsonrpc.Session.open(remote,
                                                 probe_interval=probe_interval)
        self._monitor_request_id = None
        self._last_seqno = None
        self.change_seqno = 0
        # Identifies this IDL's monitor to the server (sent with "monitor"
        # requests and replaced on each monitor_cond_change).
        self.uuid = uuid.uuid1()
        self.state = self.IDL_S_INITIAL

        # Database locking.
        self.lock_name = None          # Name of lock we need, None if none.
        self.has_lock = False          # Has db server said we have the lock?
        self.is_lock_contended = False  # Has db server said we can't get lock?
        self._lock_request_id = None   # JSON-RPC ID of in-flight lock request.

        # Transaction support.
        self.txn = None
        self._outstanding_txns = {}

        # Annotate each replicated table/column with IDL bookkeeping state.
        for table in six.itervalues(schema.tables):
            for column in six.itervalues(table.columns):
                if not hasattr(column, 'alert'):
                    column.alert = True
            table.need_table = False
            table.rows = custom_index.IndexedRows(table)
            table.idl = self
            table.condition = [True]
            table.cond_changed = False
156
157 def index_create(self, table, name):
158 """Create a named multi-column index on a table"""
159 return self.tables[table].rows.index_create(name)
160
161 def index_irange(self, table, name, start, end):
162 """Return items in a named index between start/end inclusive"""
163 return self.tables[table].rows.indexes[name].irange(start, end)
164
165 def index_equal(self, table, name, value):
166 """Return items in a named index matching a value"""
167 return self.tables[table].rows.indexes[name].irange(value, value)
168
169 def close(self):
170 """Closes the connection to the database. The IDL will no longer
171 update."""
172 self._session.close()
173
    def run(self):
        """Processes a batch of messages from the database server.  Returns
        True if the database as seen through the IDL changed, False if it did
        not change.  The initial fetch of the entire contents of the remote
        database is considered to be one kind of change.  If the IDL has been
        configured to acquire a database lock (with Idl.set_lock()), then
        successfully acquiring the lock is also considered to be a change.

        This function can return occasional false positives, that is, report
        that the database changed even though it didn't.  This happens if the
        connection to the database drops and reconnects, which causes the
        database contents to be reloaded even if they didn't change.  (It could
        also happen if the database server sends out a "change" that reflects
        what we already thought was in the database, but the database server is
        not supposed to do that.)

        As an alternative to checking the return value, the client may check
        for changes in self.change_seqno."""
        assert not self.txn
        initial_change_seqno = self.change_seqno

        self.send_cond_change()
        self._session.run()
        i = 0
        # Process at most 50 messages per call to bound time spent here.
        while i < 50:
            i += 1
            if not self._session.is_connected():
                break

            seqno = self._session.get_seqno()
            if seqno != self._last_seqno:
                # Session (re)connected: abort in-flight transactions and
                # re-issue the monitor (and, if configured, lock) requests.
                self._last_seqno = seqno
                self.__txn_abort_all()
                self.__send_monitor_request()
                if self.lock_name:
                    self.__send_lock_request()
                break

            msg = self._session.recv()
            if msg is None:
                break
            if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update2"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE2)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "update"
                    and len(msg.params) == 2):
                # Database contents changed.
                self.__parse_update(msg.params[1], OVSDB_UPDATE)
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._monitor_request_id is not None
                    and self._monitor_request_id == msg.id):
                # Reply to our "monitor" request: the atomic initial snapshot.
                try:
                    self.change_seqno += 1
                    self._monitor_request_id = None
                    self.__clear()
                    if self.state == self.IDL_S_MONITOR_COND_REQUESTED:
                        self.__parse_update(msg.result, OVSDB_UPDATE2)
                    else:
                        assert self.state == self.IDL_S_MONITOR_REQUESTED
                        self.__parse_update(msg.result, OVSDB_UPDATE)

                except error.Error as e:
                    vlog.err("%s: parse error in received schema: %s"
                             % (self._session.get_name(), e))
                    self.__error()
            elif (msg.type == ovs.jsonrpc.Message.T_REPLY
                    and self._lock_request_id is not None
                    and self._lock_request_id == msg.id):
                # Reply to our "lock" request.
                self.__parse_lock_reply(msg.result)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "locked"):
                # We got our lock.
                self.__parse_lock_notify(msg.params, True)
            elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
                    and msg.method == "stolen"):
                # Someone else stole our lock.
                self.__parse_lock_notify(msg.params, False)
            elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
                # Reply to our echo request.  Ignore it.
                pass
            elif (msg.type == ovs.jsonrpc.Message.T_ERROR and
                  self.state == self.IDL_S_MONITOR_COND_REQUESTED and
                  self._monitor_request_id == msg.id):
                # Server rejected "monitor_cond"; fall back to plain
                # "monitor" (state changes inside __send_monitor_request).
                if msg.error == "unknown method":
                    self.__send_monitor_request()
            elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
                               ovs.jsonrpc.Message.T_REPLY)
                    and self.__txn_process_reply(msg)):
                # __txn_process_reply() did everything needed.
                pass
            else:
                # This can happen if a transaction is destroyed before we
                # receive the reply, so keep the log level low.
                vlog.dbg("%s: received unexpected %s message"
                         % (self._session.get_name(),
                            ovs.jsonrpc.Message.type_to_string(msg.type)))

        return initial_change_seqno != self.change_seqno
277
278 def send_cond_change(self):
279 if not self._session.is_connected():
280 return
281
282 for table in six.itervalues(self.tables):
283 if table.cond_changed:
284 self.__send_cond_change(table, table.condition)
285 table.cond_changed = False
286
287 def cond_change(self, table_name, cond):
288 """Sets the condition for 'table_name' to 'cond', which should be a
289 conditional expression suitable for use directly in the OVSDB
290 protocol, with the exception that the empty condition []
291 matches no rows (instead of matching every row). That is, []
292 is equivalent to [False], not to [True].
293 """
294
295 table = self.tables.get(table_name)
296 if not table:
297 raise error.Error('Unknown table "%s"' % table_name)
298
299 if cond == []:
300 cond = [False]
301 if table.condition != cond:
302 table.condition = cond
303 table.cond_changed = True
304
305 def wait(self, poller):
306 """Arranges for poller.block() to wake up when self.run() has something
307 to do or when activity occurs on a transaction on 'self'."""
308 self._session.wait(poller)
309 self._session.recv_wait(poller)
310
311 def has_ever_connected(self):
312 """Returns True, if the IDL successfully connected to the remote
313 database and retrieved its contents (even if the connection
314 subsequently dropped and is in the process of reconnecting). If so,
315 then the IDL contains an atomic snapshot of the database's contents
316 (but it might be arbitrarily old if the connection dropped).
317
318 Returns False if the IDL has never connected or retrieved the
319 database's contents. If so, the IDL is empty."""
320 return self.change_seqno != 0
321
322 def force_reconnect(self):
323 """Forces the IDL to drop its connection to the database and reconnect.
324 In the meantime, the contents of the IDL will not change."""
325 self._session.force_reconnect()
326
    def set_lock(self, lock_name):
        """If 'lock_name' is not None, configures the IDL to obtain the named
        lock from the database server and to avoid modifying the database when
        the lock cannot be acquired (that is, when another client has the same
        lock).

        If 'lock_name' is None, drops the locking requirement and releases the
        lock."""
        # Switching locks while a transaction is pending is unsupported.
        assert not self.txn
        assert not self._outstanding_txns

        if self.lock_name and (not lock_name or lock_name != self.lock_name):
            # Release previous lock.
            self.__send_unlock_request()
            self.lock_name = None
            self.is_lock_contended = False

        if lock_name and not self.lock_name:
            # Acquire new lock.
            self.lock_name = lock_name
            self.__send_lock_request()
348
    def notify(self, event, row, updates=None):
        """Hook for implementing create/update/delete notifications.

        The base implementation does nothing; subclasses may override it to
        observe changes applied to the replica.

        :param event: The event that was triggered
        :type event: ROW_CREATE, ROW_UPDATE, or ROW_DELETE
        :param row: The row as it is after the operation has occurred
        :type row: Row
        :param updates: For updates, row with only old values of the changed
                        columns
        :type updates: Row
        """
360
361 def __send_cond_change(self, table, cond):
362 monitor_cond_change = {table.name: [{"where": cond}]}
363 old_uuid = str(self.uuid)
364 self.uuid = uuid.uuid1()
365 params = [old_uuid, str(self.uuid), monitor_cond_change]
366 msg = ovs.jsonrpc.Message.create_request("monitor_cond_change", params)
367 self._session.send(msg)
368
369 def __clear(self):
370 changed = False
371
372 for table in six.itervalues(self.tables):
373 if table.rows:
374 changed = True
375 table.rows = custom_index.IndexedRows(table)
376
377 if changed:
378 self.change_seqno += 1
379
380 def __update_has_lock(self, new_has_lock):
381 if new_has_lock and not self.has_lock:
382 if self._monitor_request_id is None:
383 self.change_seqno += 1
384 else:
385 # We're waiting for a monitor reply, so don't signal that the
386 # database changed. The monitor reply will increment
387 # change_seqno anyhow.
388 pass
389 self.is_lock_contended = False
390 self.has_lock = new_has_lock
391
392 def __do_send_lock_request(self, method):
393 self.__update_has_lock(False)
394 self._lock_request_id = None
395 if self._session.is_connected():
396 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
397 msg_id = msg.id
398 self._session.send(msg)
399 else:
400 msg_id = None
401 return msg_id
402
    def __send_lock_request(self):
        # Requests self.lock_name, remembering the JSON-RPC id so the reply
        # can be matched up in run().
        self._lock_request_id = self.__do_send_lock_request("lock")
405
    def __send_unlock_request(self):
        # Releases self.lock_name; the reply, if any, is not tracked.
        self.__do_send_lock_request("unlock")
408
409 def __parse_lock_reply(self, result):
410 self._lock_request_id = None
411 got_lock = isinstance(result, dict) and result.get("locked") is True
412 self.__update_has_lock(got_lock)
413 if not got_lock:
414 self.is_lock_contended = True
415
416 def __parse_lock_notify(self, params, new_has_lock):
417 if (self.lock_name is not None
418 and isinstance(params, (list, tuple))
419 and params
420 and params[0] == self.lock_name):
421 self.__update_has_lock(new_has_lock)
422 if not new_has_lock:
423 self.is_lock_contended = True
424
425 def __send_monitor_request(self):
426 if self.state == self.IDL_S_INITIAL:
427 self.state = self.IDL_S_MONITOR_COND_REQUESTED
428 method = "monitor_cond"
429 else:
430 self.state = self.IDL_S_MONITOR_REQUESTED
431 method = "monitor"
432
433 monitor_requests = {}
434 for table in six.itervalues(self.tables):
435 columns = []
436 for column in six.iterkeys(table.columns):
437 if ((table.name not in self.readonly) or
438 (table.name in self.readonly) and
439 (column not in self.readonly[table.name])):
440 columns.append(column)
441 monitor_request = {"columns": columns}
442 if method == "monitor_cond" and table.condition != [True]:
443 monitor_request["where"] = table.condition
444 table.cond_change = False
445 monitor_requests[table.name] = [monitor_request]
446
447 msg = ovs.jsonrpc.Message.create_request(
448 method, [self._db.name, str(self.uuid), monitor_requests])
449 self._monitor_request_id = msg.id
450 self._session.send(msg)
451
    def __parse_update(self, update, version):
        """Applies one <table-updates> object to the replica ('version' is
        OVSDB_UPDATE or OVSDB_UPDATE2), logging rather than propagating
        any parse error."""
        try:
            self.__do_parse_update(update, version)
        except error.Error as e:
            vlog.err("%s: error parsing update: %s"
                     % (self._session.get_name(), e))
458
    def __do_parse_update(self, table_updates, version):
        """Validates and applies a <table-updates> object, dispatching each
        <row-update> to __process_update2 (OVSDB_UPDATE2 diff format) or
        __process_update (original old/new format) and bumping change_seqno
        for every row that actually changed.  Raises error.Error on
        malformed input."""
        if not isinstance(table_updates, dict):
            raise error.Error("<table-updates> is not an object",
                              table_updates)

        for table_name, table_update in six.iteritems(table_updates):
            table = self.tables.get(table_name)
            if not table:
                raise error.Error('<table-updates> includes unknown '
                                  'table "%s"' % table_name)

            if not isinstance(table_update, dict):
                raise error.Error('<table-update> for table "%s" is not '
                                  'an object' % table_name, table_update)

            for uuid_string, row_update in six.iteritems(table_update):
                if not ovs.ovsuuid.is_valid_string(uuid_string):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains bad UUID "%s" as member '
                                      'name' % (table_name, uuid_string),
                                      table_update)
                uuid = ovs.ovsuuid.from_string(uuid_string)

                if not isinstance(row_update, dict):
                    raise error.Error('<table-update> for table "%s" '
                                      'contains <row-update> for %s that '
                                      'is not an object'
                                      % (table_name, uuid_string))

                if version == OVSDB_UPDATE2:
                    # "update2" rows carry insert/delete/modify diffs.
                    if self.__process_update2(table, uuid, row_update):
                        self.change_seqno += 1
                    continue

                # Original "update" format: explicit "old" and "new" rows.
                parser = ovs.db.parser.Parser(row_update, "row-update")
                old = parser.get_optional("old", [dict])
                new = parser.get_optional("new", [dict])
                parser.finish()

                if not old and not new:
                    raise error.Error('<row-update> missing "old" and '
                                      '"new" members', row_update)

                if self.__process_update(table, uuid, old, new):
                    self.change_seqno += 1
504
505 def __process_update2(self, table, uuid, row_update):
506 row = table.rows.get(uuid)
507 changed = False
508 if "delete" in row_update:
509 if row:
510 del table.rows[uuid]
511 self.notify(ROW_DELETE, row)
512 changed = True
513 else:
514 # XXX rate-limit
515 vlog.warn("cannot delete missing row %s from table"
516 "%s" % (uuid, table.name))
517 elif "insert" in row_update or "initial" in row_update:
518 if row:
519 vlog.warn("cannot add existing row %s from table"
520 " %s" % (uuid, table.name))
521 del table.rows[uuid]
522 row = self.__create_row(table, uuid)
523 if "insert" in row_update:
524 row_update = row_update['insert']
525 else:
526 row_update = row_update['initial']
527 self.__add_default(table, row_update)
528 changed = self.__row_update(table, row, row_update)
529 table.rows[uuid] = row
530 if changed:
531 self.notify(ROW_CREATE, row)
532 elif "modify" in row_update:
533 if not row:
534 raise error.Error('Modify non-existing row')
535
536 old_row = self.__apply_diff(table, row, row_update['modify'])
537 self.notify(ROW_UPDATE, row, Row(self, table, uuid, old_row))
538 changed = True
539 else:
540 raise error.Error('<row-update> unknown operation',
541 row_update)
542 return changed
543
    def __process_update(self, table, uuid, old, new):
        """Applies one original-format <row-update> (explicit 'old'/'new'
        row contents, either possibly None) for 'uuid' in 'table'.
        Returns True if a column changed, False otherwise."""
        row = table.rows.get(uuid)
        changed = False
        if not new:
            # Delete row.
            if row:
                del table.rows[uuid]
                changed = True
                self.notify(ROW_DELETE, row)
            else:
                # XXX rate-limit
                vlog.warn("cannot delete missing row %s from table %s"
                          % (uuid, table.name))
        elif not old:
            # Insert row.
            op = ROW_CREATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
            else:
                # XXX rate-limit
                # Row unexpectedly exists already; treat as an update.
                op = ROW_UPDATE
                vlog.warn("cannot add existing row %s to table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(ROW_CREATE, row)
        else:
            # Modify row (recreating it if we somehow lost track of it).
            op = ROW_UPDATE
            if not row:
                row = self.__create_row(table, uuid)
                changed = True
                op = ROW_CREATE
                # XXX rate-limit
                vlog.warn("cannot modify missing row %s in table %s"
                          % (uuid, table.name))
            changed |= self.__row_update(table, row, new)
            if op == ROW_CREATE:
                table.rows[uuid] = row
            if changed:
                self.notify(op, row, Row.from_json(self, table, uuid, old))
        return changed
589
    def __column_name(self, column):
        # Returns the default *value* for 'column' (despite the method's
        # name), in JSON form when the column's key is a UUID so it matches
        # the wire representation used by __add_default's row_update dict.
        # NOTE(review): relies on ovs.db.types being reachable through the
        # ovs.db package imports at the top of this file — confirm.
        if column.type.key.type == ovs.db.types.UuidType:
            return ovs.ovsuuid.to_json(column.type.key.type.default)
        else:
            return column.type.key.type.default
595
596 def __add_default(self, table, row_update):
597 for column in six.itervalues(table.columns):
598 if column.name not in row_update:
599 if ((table.name not in self.readonly) or
600 (table.name in self.readonly) and
601 (column.name not in self.readonly[table.name])):
602 if column.type.n_min != 0 and not column.type.is_map():
603 row_update[column.name] = self.__column_name(column)
604
    def __apply_diff(self, table, row, row_diff):
        """Merges an "update2" <row-diff> into 'row' in place.  Returns a
        dict mapping each touched column name to its previous Datum,
        suitable for building the "updates" Row given to notify().
        Unknown columns and unparseable datums are logged and skipped."""
        old_row = {}
        for column_name, datum_diff_json in six.iteritems(row_diff):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum_diff = data.Datum.from_json(column.type, datum_diff_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            # Save the pre-diff value, then fold the diff into the row.
            old_row[column_name] = row._data[column_name].copy()
            datum = row._data[column_name].diff(datum_diff)
            if datum != row._data[column_name]:
                row._data[column_name] = datum

        return old_row
629
    def __row_update(self, table, row, row_json):
        """Stores the column values in 'row_json' into 'row'.  Returns True
        if any column with alerting enabled ('column.alert') changed value;
        unknown columns and unparseable datums are logged and skipped."""
        changed = False
        for column_name, datum_json in six.iteritems(row_json):
            column = table.columns.get(column_name)
            if not column:
                # XXX rate-limit
                vlog.warn("unknown column %s updating table %s"
                          % (column_name, table.name))
                continue

            try:
                datum = data.Datum.from_json(column.type, datum_json)
            except error.Error as e:
                # XXX rate-limit
                vlog.warn("error parsing column %s in table %s: %s"
                          % (column_name, table.name, e))
                continue

            if datum != row._data[column_name]:
                row._data[column_name] = datum
                if column.alert:
                    changed = True
            else:
                # Didn't really change but the OVSDB monitor protocol always
                # includes every value in a row.
                pass
        return changed
657
658 def __create_row(self, table, uuid):
659 data = {}
660 for column in six.itervalues(table.columns):
661 data[column.name] = ovs.db.data.Datum.default(column.type)
662 return Row(self, table, uuid, data)
663
    def __error(self):
        # Recovers from a protocol-level error by forcing a reconnect; the
        # reconnect triggers a fresh monitor request and a full resync.
        self._session.force_reconnect()
666
    def __txn_abort_all(self):
        # The connection dropped or reset, so replies to outstanding
        # transactions will never arrive; mark them all TRY_AGAIN.
        while self._outstanding_txns:
            txn = self._outstanding_txns.popitem()[1]
            txn._status = Transaction.TRY_AGAIN
671
672 def __txn_process_reply(self, msg):
673 txn = self._outstanding_txns.pop(msg.id, None)
674 if txn:
675 txn._process_reply(msg)
676 return True
677
678
679 def _uuid_to_row(atom, base):
680 if base.ref_table:
681 return base.ref_table.rows.get(atom)
682 else:
683 return atom
684
685
def _row_to_uuid(value):
    """Maps a Row to its database UUID; any other value passes through
    unchanged."""
    return value.uuid if isinstance(value, Row) else value
691
692
693 @functools.total_ordering
694 class Row(object):
695 """A row within an IDL.
696
697 The client may access the following attributes directly:
698
699 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
700
701 - An attribute for each column in the Row's table, named for the column,
702 whose values are as returned by Datum.to_python() for the column's type.
703
704 If some error occurs (e.g. the database server's idea of the column is
705 different from the IDL's idea), then the attribute values is the
706 "default" value return by Datum.default() for the column's type. (It is
707 important to know this because the default value may violate constraints
708 for the column's type, e.g. the default integer value is 0 even if column
709 contraints require the column's value to be positive.)
710
711 When a transaction is active, column attributes may also be assigned new
712 values. Committing the transaction will then cause the new value to be
713 stored into the database.
714
715 *NOTE*: In the current implementation, the value of a column is a *copy*
716 of the value in the database. This means that modifying its value
717 directly will have no useful effect. For example, the following:
718 row.mycolumn["a"] = "b" # don't do this
719 will not change anything in the database, even after commit. To modify
720 the column, instead assign the modified column value back to the column:
721 d = row.mycolumn
722 d["a"] = "b"
723 row.mycolumn = d
724 """
    def __init__(self, idl, table, uuid, data):
        """Creates a Row belonging to 'idl' and 'table', with database UUID
        'uuid' and committed column data 'data' (a dict of column name to
        Datum, or None for a row newly inserted in a transaction)."""
        # All of the explicit references to self.__dict__ below are required
        # to set real attributes without invoking self.__getattr__().
        self.__dict__["uuid"] = uuid

        self.__dict__["_idl"] = idl
        self.__dict__["_table"] = table

        # _data is the committed data.  It takes the following values:
        #
        # - A dictionary that maps every column name to a Datum, if the row
        #   exists in the committed form of the database.
        #
        # - None, if this row is newly inserted within the active transaction
        #   and thus has no committed form.
        self.__dict__["_data"] = data

        # _changes describes changes to this row within the active transaction.
        # It takes the following values:
        #
        # - {}, the empty dictionary, if no transaction is active or if the
        #   row has yet not been changed within this transaction.
        #
        # - A dictionary that maps a column name to its new Datum, if an
        #   active transaction changes those columns' values.
        #
        # - A dictionary that maps every column name to a Datum, if the row
        #   is newly inserted within the active transaction.
        #
        # - None, if this transaction deletes this row.
        self.__dict__["_changes"] = {}

        # _mutations describes changes to this row to be handled via a
        # mutate operation on the wire.  It takes the following values:
        #
        # - {}, the empty dictionary, if no transaction is active or if the
        #   row has yet not been mutated within this transaction.
        #
        # - A dictionary that contains two keys:
        #
        #   - "_inserts" contains a dictionary that maps column names to
        #     new keys/key-value pairs that should be inserted into the
        #     column
        #   - "_removes" contains a dictionary that maps column names to
        #     the keys/key-value pairs that should be removed from the
        #     column
        #
        # - None, if this transaction deletes this row.
        self.__dict__["_mutations"] = {}

        # A dictionary whose keys are the names of columns that must be
        # verified as prerequisites when the transaction commits.  The values
        # in the dictionary are all None.
        self.__dict__["_prereqs"] = {}
779
780 def __lt__(self, other):
781 if not isinstance(other, Row):
782 return NotImplemented
783 return bool(self.__dict__['uuid'] < other.__dict__['uuid'])
784
785 def __eq__(self, other):
786 if not isinstance(other, Row):
787 return NotImplemented
788 return bool(self.__dict__['uuid'] == other.__dict__['uuid'])
789
790 def __hash__(self):
791 return int(self.__dict__['uuid'])
792
    def __getattr__(self, column_name):
        """Returns the effective value of column 'column_name': the committed
        Datum overlaid with any staged transaction writes (_changes) and
        staged mutate operations (_mutations' "_inserts"/"_removes"),
        converted to Python via Datum.to_python().  Raises AttributeError for
        unknown columns or when no value exists yet."""
        assert self._changes is not None
        assert self._mutations is not None

        try:
            column = self._table.columns[column_name]
        except KeyError:
            raise AttributeError("%s instance has no attribute '%s'" %
                                 (self.__class__.__name__, column_name))
        datum = self._changes.get(column_name)
        inserts = None
        if '_inserts' in self._mutations.keys():
            inserts = self._mutations['_inserts'].get(column_name)
        removes = None
        if '_removes' in self._mutations.keys():
            removes = self._mutations['_removes'].get(column_name)
        if datum is None:
            if self._data is None:
                # Newly inserted, uncommitted row: only staged inserts can
                # supply a value.
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = data.Datum.from_python(column.type,
                                                   inserts,
                                                   _row_to_uuid)
            elif column_name in self._data:
                datum = self._data[column_name]
                if column.type.is_set():
                    # Overlay staged set mutations: append inserts, then
                    # filter out removes.
                    dlist = datum.as_list()
                    if inserts is not None:
                        dlist.extend(list(inserts))
                    if removes is not None:
                        removes_datum = data.Datum.from_python(column.type,
                                                               removes,
                                                               _row_to_uuid)
                        removes_list = removes_datum.as_list()
                        dlist = [x for x in dlist if x not in removes_list]
                    datum = data.Datum.from_python(column.type, dlist,
                                                   _row_to_uuid)
                elif column.type.is_map():
                    # Overlay staged map mutations; a key that is both
                    # removed and re-inserted keeps its new value.
                    dmap = datum.to_python(_uuid_to_row)
                    if inserts is not None:
                        dmap.update(inserts)
                    if removes is not None:
                        for key in removes:
                            if key not in (inserts or {}):
                                del dmap[key]
                    datum = data.Datum.from_python(column.type, dmap,
                                                   _row_to_uuid)
            else:
                # Committed row without data for this column: fall back to
                # staged inserts, if any.
                if inserts is None:
                    raise AttributeError("%s instance has no attribute '%s'" %
                                         (self.__class__.__name__,
                                          column_name))
                else:
                    datum = inserts

        return datum.to_python(_uuid_to_row)
852
    def __setattr__(self, column_name, value):
        """Stages an assignment of 'value' to column 'column_name' within the
        active transaction, keeping the table's custom indexes in sync.
        Writes to readonly columns or type-invalid values are logged and
        dropped rather than raised."""
        assert self._changes is not None
        assert self._idl.txn

        if ((self._table.name in self._idl.readonly) and
                (column_name in self._idl.readonly[self._table.name])):
            vlog.warn("attempting to write to readonly column %s"
                      % column_name)
            return

        column = self._table.columns[column_name]
        try:
            datum = data.Datum.from_python(column.type, value, _row_to_uuid)
        except error.Error as e:
            # XXX rate-limit
            vlog.err("attempting to write bad value to column %s (%s)"
                     % (column_name, e))
            return
        # Remove prior version of the Row from the index if it has the indexed
        # column set, and the column changing is an indexed column
        if hasattr(self, column_name):
            for idx in self._table.rows.indexes.values():
                if column_name in (c.column for c in idx.columns):
                    idx.remove(self)
        self._idl.txn._write(self, column, datum)
        for idx in self._table.rows.indexes.values():
            # Only update the index if indexed columns change
            if column_name in (c.column for c in idx.columns):
                idx.add(self)
882
883 def addvalue(self, column_name, key):
884 self._idl.txn._txn_rows[self.uuid] = self
885 column = self._table.columns[column_name]
886 try:
887 data.Datum.from_python(column.type, key, _row_to_uuid)
888 except error.Error as e:
889 # XXX rate-limit
890 vlog.err("attempting to write bad value to column %s (%s)"
891 % (column_name, e))
892 return
893 inserts = self._mutations.setdefault('_inserts', {})
894 column_value = inserts.setdefault(column_name, set())
895 column_value.add(key)
896
897 def delvalue(self, column_name, key):
898 self._idl.txn._txn_rows[self.uuid] = self
899 column = self._table.columns[column_name]
900 try:
901 data.Datum.from_python(column.type, key, _row_to_uuid)
902 except error.Error as e:
903 # XXX rate-limit
904 vlog.err("attempting to delete bad value from column %s (%s)"
905 % (column_name, e))
906 return
907 removes = self._mutations.setdefault('_removes', {})
908 column_value = removes.setdefault(column_name, set())
909 column_value.add(key)
910
911 def setkey(self, column_name, key, value):
912 self._idl.txn._txn_rows[self.uuid] = self
913 column = self._table.columns[column_name]
914 try:
915 data.Datum.from_python(column.type, {key: value}, _row_to_uuid)
916 except error.Error as e:
917 # XXX rate-limit
918 vlog.err("attempting to write bad value to column %s (%s)"
919 % (column_name, e))
920 return
921 if self._data and column_name in self._data:
922 # Remove existing key/value before updating.
923 removes = self._mutations.setdefault('_removes', {})
924 column_value = removes.setdefault(column_name, set())
925 column_value.add(key)
926 inserts = self._mutations.setdefault('_inserts', {})
927 column_value = inserts.setdefault(column_name, {})
928 column_value[key] = value
929
930 def delkey(self, column_name, key, value=None):
931 self._idl.txn._txn_rows[self.uuid] = self
932 if value:
933 try:
934 old_value = data.Datum.to_python(self._data[column_name],
935 _uuid_to_row)
936 except error.Error:
937 return
938 if key not in old_value:
939 return
940 if old_value[key] != value:
941 return
942 removes = self._mutations.setdefault('_removes', {})
943 column_value = removes.setdefault(column_name, set())
944 column_value.add(key)
945 return
946
947 @classmethod
948 def from_json(cls, idl, table, uuid, row_json):
949 data = {}
950 for column_name, datum_json in six.iteritems(row_json):
951 column = table.columns.get(column_name)
952 if not column:
953 # XXX rate-limit
954 vlog.warn("unknown column %s in table %s"
955 % (column_name, table.name))
956 continue
957 try:
958 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
959 except error.Error as e:
960 # XXX rate-limit
961 vlog.warn("error parsing column %s in table %s: %s"
962 % (column_name, table.name, e))
963 continue
964 data[column_name] = datum
965 return cls(idl, table, uuid, data)
966
967 def verify(self, column_name):
968 """Causes the original contents of column 'column_name' in this row to
969 be verified as a prerequisite to completing the transaction. That is,
970 if 'column_name' changed in this row (or if this row was deleted)
971 between the time that the IDL originally read its contents and the time
972 that the transaction commits, then the transaction aborts and
973 Transaction.commit() returns Transaction.TRY_AGAIN.
974
975 The intention is that, to ensure that no transaction commits based on
976 dirty reads, an application should call Row.verify() on each data item
977 read as part of a read-modify-write operation.
978
979 In some cases Row.verify() reduces to a no-op, because the current
980 value of the column is already known:
981
982 - If this row is a row created by the current transaction (returned
983 by Transaction.insert()).
984
985 - If the column has already been modified within the current
986 transaction.
987
988 Because of the latter property, always call Row.verify() *before*
989 modifying the column, for a given read-modify-write.
990
991 A transaction must be in progress."""
992 assert self._idl.txn
993 assert self._changes is not None
994 if not self._data or column_name in self._changes:
995 return
996
997 self._prereqs[column_name] = None
998
999 def delete(self):
1000 """Deletes this row from its table.
1001
1002 A transaction must be in progress."""
1003 assert self._idl.txn
1004 assert self._changes is not None
1005 if self._data is None:
1006 del self._idl.txn._txn_rows[self.uuid]
1007 else:
1008 self._idl.txn._txn_rows[self.uuid] = self
1009 del self._table.rows[self.uuid]
1010 self.__dict__["_changes"] = None
1011
1012 def fetch(self, column_name):
1013 self._idl.txn._fetch(self, column_name)
1014
1015 def increment(self, column_name):
1016 """Causes the transaction, when committed, to increment the value of
1017 'column_name' within this row by 1. 'column_name' must have an integer
1018 type. After the transaction commits successfully, the client may
1019 retrieve the final (incremented) value of 'column_name' with
1020 Transaction.get_increment_new_value().
1021
1022 The client could accomplish something similar by reading and writing
1023 and verify()ing columns. However, increment() will never (by itself)
1024 cause a transaction to fail because of a verify error.
1025
1026 The intended use is for incrementing the "next_cfg" column in
1027 the Open_vSwitch table."""
1028 self._idl.txn._increment(self, column_name)
1029
1030
1031 def _uuid_name_from_uuid(uuid):
1032 return "row%s" % str(uuid).replace("-", "_")
1033
1034
1035 def _where_uuid_equals(uuid):
1036 return [["_uuid", "==", ["uuid", str(uuid)]]]
1037
1038
class _InsertedRow(object):
    """Bookkeeping for a row inserted by a transaction: where its "insert"
    operation sits in the "transact" reply and, once that reply has been
    processed, the permanent UUID assigned by the server."""
    def __init__(self, op_index):
        # Index of the "insert" operation within the "transact" reply.
        self.op_index = op_index
        # Permanent UUID assigned by ovsdb-server; None until the reply
        # is processed (see Transaction.__process_insert_reply()).
        self.real = None
1043
1044
class Transaction(object):
    """A transaction may modify the contents of a database by modifying the
    values of columns, deleting rows, inserting rows, or adding checks that
    columns in the database have not changed ("verify" operations), through
    Row methods.

    Reading and writing columns and inserting and deleting rows are all
    straightforward.  The reasons to verify columns are less obvious.
    Verification is the key to maintaining transactional integrity.  Because
    OVSDB handles multiple clients, it can happen that between the time that
    OVSDB client A reads a column and writes a new value, OVSDB client B has
    written that column.  Client A's write should not ordinarily overwrite
    client B's, especially if the column in question is a "map" column that
    contains several more or less independent data items.  If client A adds a
    "verify" operation before it writes the column, then the transaction fails
    in case client B modifies it first.  Client A will then see the new value
    of the column and compose a new transaction based on the new contents
    written by client B.

    When a transaction is complete, which must be before the next call to
    Idl.run(), call Transaction.commit() or Transaction.abort().

    The life-cycle of a transaction looks like this:

    1. Create the transaction and record the initial sequence number:

        seqno = idl.change_seqno
        txn = Transaction(idl)

    2. Modify the database with Row and Transaction methods.

    3. Commit the transaction by calling Transaction.commit().  The first call
       to this function probably returns Transaction.INCOMPLETE.  The client
       must keep calling again as long as this remains true, calling Idl.run()
       in between to let the IDL do protocol processing.  (If the client
       doesn't have anything else to do in the meantime, it can use
       Transaction.commit_block() to avoid having to loop itself.)

    4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
       to change from the saved 'seqno' (it's possible that it's already
       changed, in which case the client should not wait at all), then start
       over from step 1.  Only a call to Idl.run() will change the return
       value of Idl.change_seqno.  (Transaction.commit_block() calls
       Idl.run().)"""

    # Status values that Transaction.commit() can return.

    # Not yet committed or aborted.
    UNCOMMITTED = "uncommitted"
    # Transaction didn't include any changes.
    UNCHANGED = "unchanged"
    # Commit in progress, please wait.
    INCOMPLETE = "incomplete"
    # ovsdb_idl_txn_abort() called.
    ABORTED = "aborted"
    # Commit successful.
    SUCCESS = "success"
    # Commit failed because a "verify" operation
    # reported an inconsistency, due to a network
    # problem, or other transient failure.  Wait
    # for a change, then try again.
    TRY_AGAIN = "try again"
    # Server hasn't given us the lock yet.
    NOT_LOCKED = "not locked"
    # Commit failed due to a hard error.
    ERROR = "error"

    @staticmethod
    def status_to_string(status):
        """Converts one of the status values that Transaction.commit() can
        return into a human-readable string.

        (The status values are in fact such strings already, so
        there's nothing to do.)"""
        return status

    def __init__(self, idl):
        """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
        A given Idl may only have a single active transaction at a time.

        A Transaction may modify the contents of a database by assigning new
        values to columns (attributes of Row), deleting rows (with
        Row.delete()), or inserting rows (with Transaction.insert()).  It may
        also check that columns in the database have not changed with
        Row.verify().

        When a transaction is complete (which must be before the next call to
        Idl.run()), call Transaction.commit() or Transaction.abort()."""
        assert idl.txn is None

        idl.txn = self
        self._request_id = None
        self.idl = idl
        self.dry_run = False
        self._txn_rows = {}
        self._status = Transaction.UNCOMMITTED
        self._error = None
        self._comments = []

        self._inc_row = None
        self._inc_column = None

        self._fetch_requests = []

        self._inserted_rows = {}  # Map from UUID to _InsertedRow

    def add_comment(self, comment):
        """Appends 'comment' to the comments that will be passed to the OVSDB
        server when this transaction is committed.  (The comment will be
        committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
        relatively human-readable form.)"""
        self._comments.append(comment)

    def wait(self, poller):
        """Causes poll_block() to wake up if this transaction has completed
        committing."""
        if self._status not in (Transaction.UNCOMMITTED,
                                Transaction.INCOMPLETE):
            poller.immediate_wake()

    def _substitute_uuids(self, json):
        """Returns 'json' with every ["uuid", <uuid>] reference to a row
        inserted by this transaction replaced by the equivalent
        ["named-uuid", ...] form, recursing into lists and tuples."""
        if isinstance(json, (list, tuple)):
            if (len(json) == 2
                    and json[0] == 'uuid'
                    and ovs.ovsuuid.is_valid_string(json[1])):
                uuid = ovs.ovsuuid.from_string(json[1])
                row = self._txn_rows.get(uuid, None)
                if row and row._data is None:
                    return ["named-uuid", _uuid_name_from_uuid(uuid)]
            else:
                return [self._substitute_uuids(elem) for elem in json]
        return json

    def __disassemble(self):
        """Detaches this transaction from the IDL and rolls the local
        replica back to its pre-transaction contents."""
        self.idl.txn = None

        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                # If we add the deleted row back to rows with _changes == None
                # then __getattr__ will not work for the indexes
                row.__dict__["_changes"] = {}
                row.__dict__["_mutations"] = {}
                row._table.rows[row.uuid] = row
            elif row._data is None:
                del row._table.rows[row.uuid]
            # Write through __dict__ to avoid Row.__setattr__, which would
            # try to record these as database writes.
            row.__dict__["_changes"] = {}
            row.__dict__["_mutations"] = {}
            row.__dict__["_prereqs"] = {}
        self._txn_rows = {}

    def commit(self):
        """Attempts to commit 'txn'.  Returns the status of the commit
        operation, one of the following constants:

        Transaction.INCOMPLETE:

            The transaction is in progress, but not yet complete.  The caller
            should call again later, after calling Idl.run() to let the
            IDL do OVSDB protocol processing.

        Transaction.UNCHANGED:

            The transaction is complete.  (It didn't actually change the
            database, so the IDL didn't send any request to the database
            server.)

        Transaction.ABORTED:

            The caller previously called Transaction.abort().

        Transaction.SUCCESS:

            The transaction was successful.  The update made by the
            transaction (and possibly other changes made by other database
            clients) should already be visible in the IDL.

        Transaction.TRY_AGAIN:

            The transaction failed for some transient reason, e.g. because a
            "verify" operation reported an inconsistency or due to a network
            problem.  The caller should wait for a change to the database,
            then compose a new transaction, and commit the new transaction.

            Use Idl.change_seqno to wait for a change in the database.  It is
            important to use its value *before* the initial call to
            Transaction.commit() as the baseline for this purpose, because
            the change that one should wait for can happen after the initial
            call but before the call that returns Transaction.TRY_AGAIN, and
            using some other baseline value in that situation could cause an
            indefinite wait if the database rarely changes.

        Transaction.NOT_LOCKED:

            The transaction failed because the IDL has been configured to
            require a database lock (with Idl.set_lock()) but didn't
            get it yet or has already lost it.

        Committing a transaction rolls back all of the changes that it made to
        the IDL's copy of the database.  If the transaction commits
        successfully, then the database server will send an update and, thus,
        the IDL will be updated with the committed changes."""
        # The status can only change if we're the active transaction.
        # (Otherwise, our status will change only in Idl.run().)
        if self != self.idl.txn:
            return self._status

        # If we need a lock but don't have it, give up quickly.
        if self.idl.lock_name and not self.idl.has_lock:
            self._status = Transaction.NOT_LOCKED
            self.__disassemble()
            return self._status

        operations = [self.idl._db.name]

        # Assert that we have the required lock (avoiding a race).
        if self.idl.lock_name:
            operations.append({"op": "assert",
                               "lock": self.idl.lock_name})

        # Add prerequisites and declarations of new rows.
        for row in six.itervalues(self._txn_rows):
            if row._prereqs:
                rows = {}
                columns = []
                for column_name in row._prereqs:
                    columns.append(column_name)
                    rows[column_name] = row._data[column_name].to_json()
                operations.append({"op": "wait",
                                   "table": row._table.name,
                                   "timeout": 0,
                                   "where": _where_uuid_equals(row.uuid),
                                   "until": "==",
                                   "columns": columns,
                                   "rows": [rows]})

        # Add updates.
        any_updates = False
        for row in six.itervalues(self._txn_rows):
            if row._changes is None:
                if row._table.is_root:
                    operations.append({"op": "delete",
                                       "table": row._table.name,
                                       "where": _where_uuid_equals(row.uuid)})
                    any_updates = True
                else:
                    # Let ovsdb-server decide whether to really delete it.
                    pass
            elif row._changes:
                op = {"table": row._table.name}
                if row._data is None:
                    op["op"] = "insert"
                    op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                    any_updates = True

                    # Reply index of the op appended below: the leading
                    # database name is not part of the reply array.
                    op_index = len(operations) - 1
                    self._inserted_rows[row.uuid] = _InsertedRow(op_index)
                else:
                    op["op"] = "update"
                    op["where"] = _where_uuid_equals(row.uuid)

                row_json = {}
                op["row"] = row_json

                for column_name, datum in six.iteritems(row._changes):
                    if row._data is not None or not datum.is_default():
                        row_json[column_name] = (
                            self._substitute_uuids(datum.to_json()))

                        # If anything really changed, consider it an update.
                        # We can't suppress not-really-changed values earlier
                        # or transactions would become nonatomic (see the big
                        # comment inside Transaction._write()).
                        if (not any_updates and row._data is not None and
                                row._data[column_name] != datum):
                            any_updates = True

                if row._data is None or row_json:
                    operations.append(op)
            if row._mutations:
                addop = False
                op = {"table": row._table.name}
                op["op"] = "mutate"
                if row._data is None:
                    # New row
                    op["where"] = self._substitute_uuids(
                        _where_uuid_equals(row.uuid))
                else:
                    # Existing row
                    op["where"] = _where_uuid_equals(row.uuid)
                op["mutations"] = []
                if '_removes' in row._mutations.keys():
                    for col, dat in six.iteritems(row._mutations['_removes']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            opdat = ["set"]
                            opdat.append(list(dat))
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in dat:
                                try:
                                    datum = data.Datum.from_python(
                                        column.type, ele, _row_to_uuid)
                                except error.Error:
                                    # XXX returns None (not a status) and
                                    # leaves the transaction assembled.
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "delete", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if '_inserts' in row._mutations.keys():
                    for col, val in six.iteritems(row._mutations['_inserts']):
                        column = row._table.columns[col]
                        if column.type.is_map():
                            opdat = ["map"]
                            datum = data.Datum.from_python(column.type, val,
                                                           _row_to_uuid)
                            opdat.append(datum.as_list())
                        else:
                            opdat = ["set"]
                            inner_opdat = []
                            for ele in val:
                                try:
                                    datum = data.Datum.from_python(
                                        column.type, ele, _row_to_uuid)
                                except error.Error:
                                    # XXX returns None (not a status) and
                                    # leaves the transaction assembled.
                                    return
                                inner_opdat.append(
                                    self._substitute_uuids(datum.to_json()))
                            opdat.append(inner_opdat)
                        mutation = [col, "insert", opdat]
                        op["mutations"].append(mutation)
                        addop = True
                if addop:
                    operations.append(op)
                    any_updates = True

        if self._fetch_requests:
            for fetch in self._fetch_requests:
                fetch["index"] = len(operations) - 1
                operations.append({"op": "select",
                                   "table": fetch["row"]._table.name,
                                   "where": self._substitute_uuids(
                                       _where_uuid_equals(fetch["row"].uuid)),
                                   "columns": [fetch["column_name"]]})
            any_updates = True

        # Add increment.
        if self._inc_row and any_updates:
            self._inc_index = len(operations) - 1

            operations.append({"op": "mutate",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "mutations": [[self._inc_column, "+=", 1]]})
            operations.append({"op": "select",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "columns": [self._inc_column]})

        # Add comment.
        if self._comments:
            operations.append({"op": "comment",
                               "comment": "\n".join(self._comments)})

        # Dry run?
        if self.dry_run:
            operations.append({"op": "abort"})

        if not any_updates:
            self._status = Transaction.UNCHANGED
        else:
            msg = ovs.jsonrpc.Message.create_request("transact", operations)
            self._request_id = msg.id
            if not self.idl._session.send(msg):
                self.idl._outstanding_txns[self._request_id] = self
                self._status = Transaction.INCOMPLETE
            else:
                self._status = Transaction.TRY_AGAIN

        self.__disassemble()
        return self._status

    def commit_block(self):
        """Attempts to commit this transaction, blocking until the commit
        either succeeds or fails.  Returns the final commit status, which may
        be any Transaction.* value other than Transaction.INCOMPLETE.

        This function calls Idl.run() on this transaction's IDL, so it may
        cause Idl.change_seqno to change."""
        while True:
            status = self.commit()
            if status != Transaction.INCOMPLETE:
                return status

            self.idl.run()

            poller = ovs.poller.Poller()
            self.idl.wait(poller)
            self.wait(poller)
            poller.block()

    def get_increment_new_value(self):
        """Returns the final (incremented) value of the column in this
        transaction that was set to be incremented by Row.increment.  This
        transaction must have committed successfully."""
        assert self._status == Transaction.SUCCESS
        return self._inc_new_value

    def abort(self):
        """Aborts this transaction.  If Transaction.commit() has already been
        called then the transaction might get committed anyhow."""
        self.__disassemble()
        if self._status in (Transaction.UNCOMMITTED,
                            Transaction.INCOMPLETE):
            self._status = Transaction.ABORTED

    def get_error(self):
        """Returns a string representing this transaction's current status,
        suitable for use in log messages."""
        if self._status != Transaction.ERROR:
            return Transaction.status_to_string(self._status)
        elif self._error:
            return self._error
        else:
            return "no error details available"

    def __set_error_json(self, json):
        """Records 'json' as the transaction's error detail, keeping only
        the first error reported."""
        if self._error is None:
            self._error = ovs.json.to_string(json)

    def get_insert_uuid(self, uuid):
        """Finds and returns the permanent UUID that the database assigned to a
        newly inserted row, given the UUID that Transaction.insert() assigned
        locally to that row.

        Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
        or if it was assigned by that function and then deleted by Row.delete()
        within the same transaction.  (Rows that are inserted and then deleted
        within a single transaction are never sent to the database server, so
        it never assigns them a permanent UUID.)

        This transaction must have completed successfully."""
        assert self._status in (Transaction.SUCCESS,
                                Transaction.UNCHANGED)
        inserted_row = self._inserted_rows.get(uuid)
        if inserted_row:
            return inserted_row.real
        return None

    def _increment(self, row, column):
        """Records the single (row, column) pair to be incremented at
        commit time (see Row.increment())."""
        assert not self._inc_row
        self._inc_row = row
        self._inc_column = column

    def _fetch(self, row, column_name):
        """Records a request to fetch 'column_name' of 'row' from the
        server at commit time (see Row.fetch())."""
        self._fetch_requests.append({"row": row, "column_name": column_name})

    def _write(self, row, column, datum):
        """Records a write of 'datum' to 'column' of 'row' within this
        transaction (called from Row.__setattr__())."""
        assert row._changes is not None
        assert row._mutations is not None

        txn = row._idl.txn

        # If this is a write-only column and the datum being written is the
        # same as the one already there, just skip the update entirely.  This
        # is worth optimizing because we have a lot of columns that get
        # periodically refreshed into the database but don't actually change
        # that often.
        #
        # We don't do this for read/write columns because that would break
        # atomicity of transactions--some other client might have written a
        # different value in that column since we read it.  (But if a whole
        # transaction only does writes of existing values, without making any
        # real changes, we will drop the whole transaction later in
        # ovsdb_idl_txn_commit().)
        if (not column.alert and row._data and
                row._data.get(column.name) == datum):
            new_value = row._changes.get(column.name)
            if new_value is None or new_value == datum:
                return

        txn._txn_rows[row.uuid] = row
        if '_inserts' in row._mutations:
            row._mutations['_inserts'].pop(column.name, None)
        if '_removes' in row._mutations:
            row._mutations['_removes'].pop(column.name, None)
        row._changes[column.name] = datum.copy()

    def insert(self, table, new_uuid=None):
        """Inserts and returns a new row in 'table', which must be one of the
        ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.

        The new row is assigned a provisional UUID.  If 'new_uuid' is None
        then one is randomly generated; otherwise 'new_uuid' should specify a
        randomly generated uuid.UUID not otherwise in use.  ovsdb-server will
        assign a different UUID when 'txn' is committed, but the IDL will
        replace any uses of the provisional UUID in the data to be committed
        by the UUID assigned by ovsdb-server."""
        assert self._status == Transaction.UNCOMMITTED
        if new_uuid is None:
            new_uuid = uuid.uuid4()
        row = Row(self.idl, table, new_uuid, None)
        table.rows[row.uuid] = row
        self._txn_rows[row.uuid] = row
        return row

    def _process_reply(self, msg):
        """Updates the transaction's status from 'msg', the server's reply
        to our "transact" request."""
        if msg.type == ovs.jsonrpc.Message.T_ERROR:
            self._status = Transaction.ERROR
        elif not isinstance(msg.result, (list, tuple)):
            # XXX rate-limit
            vlog.warn('reply to "transact" is not JSON array')
        else:
            hard_errors = False
            soft_errors = False
            lock_errors = False

            ops = msg.result
            for op in ops:
                if op is None:
                    # This isn't an error in itself but indicates that some
                    # prior operation failed, so make sure that we know about
                    # it.
                    soft_errors = True
                elif isinstance(op, dict):
                    # 'op_error' (not 'error') so the ovs.db.error module
                    # is not shadowed.
                    op_error = op.get("error")
                    if op_error is not None:
                        if op_error == "timed out":
                            soft_errors = True
                        elif op_error == "not owner":
                            lock_errors = True
                        elif op_error == "aborted":
                            pass
                        else:
                            hard_errors = True
                            self.__set_error_json(op)
                else:
                    hard_errors = True
                    self.__set_error_json(op)
                    # XXX rate-limit
                    vlog.warn("operation reply is not JSON null or object")

            if not soft_errors and not hard_errors and not lock_errors:
                if self._inc_row and not self.__process_inc_reply(ops):
                    hard_errors = True
                if self._fetch_requests:
                    if self.__process_fetch_reply(ops):
                        self.idl.change_seqno += 1
                    else:
                        hard_errors = True

                for insert in six.itervalues(self._inserted_rows):
                    if not self.__process_insert_reply(insert, ops):
                        hard_errors = True

            if hard_errors:
                self._status = Transaction.ERROR
            elif lock_errors:
                self._status = Transaction.NOT_LOCKED
            elif soft_errors:
                self._status = Transaction.TRY_AGAIN
            else:
                self._status = Transaction.SUCCESS

    @staticmethod
    def __check_json_type(json, types, name):
        """Returns True if 'json' is truthy and an instance of one of
        'types'; otherwise logs a warning mentioning 'name' and returns
        False."""
        if not json:
            # XXX rate-limit
            vlog.warn("%s is missing" % name)
            return False
        elif not isinstance(json, tuple(types)):
            # XXX rate-limit
            vlog.warn("%s has unexpected type %s" % (name, type(json)))
            return False
        else:
            return True

    def __process_fetch_reply(self, ops):
        """Copies the values returned by the "select" operations added for
        Row.fetch() into the local replica.  Returns True if any column was
        updated, False on a malformed reply."""
        update = False
        for fetch_request in self._fetch_requests:
            row = fetch_request["row"]
            column_name = fetch_request["column_name"]
            index = fetch_request["index"]
            table = row._table

            select = ops[index]
            fetched_rows = select.get("rows")
            if not Transaction.__check_json_type(fetched_rows, (list, tuple),
                                                 '"select" reply "rows"'):
                return False
            if len(fetched_rows) != 1:
                # XXX rate-limit
                vlog.warn('"select" reply "rows" has %d elements '
                          'instead of 1' % len(fetched_rows))
                continue
            fetched_row = fetched_rows[0]
            if not Transaction.__check_json_type(fetched_row, (dict,),
                                                 '"select" reply row'):
                continue

            column = table.columns.get(column_name)
            datum_json = fetched_row.get(column_name)
            datum = data.Datum.from_json(column.type, datum_json)

            row._data[column_name] = datum
            update = True

        return update

    def __process_inc_reply(self, ops):
        """Extracts the incremented column's new value from the "mutate"
        and "select" replies added for Row.increment().  Returns True on
        success, False on a malformed reply."""
        if self._inc_index + 2 > len(ops):
            # XXX rate-limit
            vlog.warn("reply does not contain enough operations for "
                      "increment (has %d, needs %d)" %
                      (len(ops), self._inc_index + 2))
            # Bug fix: must bail out here; falling through would index
            # past the end of 'ops'.
            return False

        # We know that this is a JSON object because the loop in
        # __process_reply() already checked.
        mutate = ops[self._inc_index]
        count = mutate.get("count")
        if not Transaction.__check_json_type(count, six.integer_types,
                                             '"mutate" reply "count"'):
            return False
        if count != 1:
            # XXX rate-limit
            vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
            return False

        select = ops[self._inc_index + 1]
        rows = select.get("rows")
        if not Transaction.__check_json_type(rows, (list, tuple),
                                             '"select" reply "rows"'):
            return False
        if len(rows) != 1:
            # XXX rate-limit
            vlog.warn('"select" reply "rows" has %d elements '
                      'instead of 1' % len(rows))
            return False
        row = rows[0]
        if not Transaction.__check_json_type(row, (dict,),
                                             '"select" reply row'):
            return False
        column = row.get(self._inc_column)
        if not Transaction.__check_json_type(column, six.integer_types,
                                             '"select" reply inc column'):
            return False
        self._inc_new_value = column
        return True

    def __process_insert_reply(self, insert, ops):
        """Records in 'insert' (an _InsertedRow) the permanent UUID the
        server assigned to the corresponding "insert" operation.  Returns
        True on success, False on a malformed reply."""
        if insert.op_index >= len(ops):
            # XXX rate-limit
            vlog.warn("reply does not contain enough operations "
                      "for insert (has %d, needs %d)"
                      % (len(ops), insert.op_index))
            return False

        # We know that this is a JSON object because the loop in
        # __process_reply() already checked.
        reply = ops[insert.op_index]
        json_uuid = reply.get("uuid")
        if not Transaction.__check_json_type(json_uuid, (tuple, list),
                                             '"insert" reply "uuid"'):
            return False

        try:
            uuid_ = ovs.ovsuuid.from_json(json_uuid)
        except error.Error:
            # XXX rate-limit
            vlog.warn('"insert" reply "uuid" is not a JSON UUID')
            return False

        insert.real = uuid_
        return True
1722
1723
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register columns
    they are interested in using register_columns().  When finished, the
    get_idl_schema() function may be called.

    The location on disk of the schema used may be found in the
    'schema_location' variable."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema. None means default location
        'schema_json' schema in json representation in memory
        """

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        self._tables = {}       # Map from table name to set of column names.
        self._readonly = {}     # Map from table name to readonly columns.
        self._all = False       # True once register_all() has been called.

    def register_columns(self, table, columns, readonly=None):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'.  This function automatically avoids adding duplicate entries
        to the schema.
        A subset of 'columns' can be specified as 'readonly'.  The readonly
        columns are not replicated but can be fetched on-demand by the user
        with Row.fetch().

        'table' must be a string.
        'columns' must be a list of strings.
        'readonly' must be a list of strings or None (no readonly columns).
        """

        assert isinstance(table, six.string_types)
        assert isinstance(columns, list)

        if readonly is None:
            # Fresh list per call; a mutable default argument would be
            # shared across calls.
            readonly = []

        columns = set(columns) | self._tables.get(table, set())
        self._tables[table] = columns
        # NOTE(review): repeated calls for the same table replace (not
        # merge) the readonly columns; confirm callers rely on that before
        # changing it.
        self._readonly[table] = readonly

    def register_table(self, table):
        """Registers interest in all columns of 'table'.  Future calls
        to get_idl_schema() will include all columns of 'table'.

        'table' must be a string
        """
        assert isinstance(table, six.string_types)
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.id.IDL'
        object based on columns registered using the register_columns()
        function."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        self.schema_json = None

        if not self._all:
            schema_tables = {}
            for table, columns in six.iteritems(self._tables):
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        schema.readonly = self._readonly
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        """Returns the TableSchema for 'table_name', pruned in place down to
        just 'columns' (an empty set keeps every column)."""
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert isinstance(column_name, six.string_types)
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table