]> git.proxmox.com Git - mirror_ovs.git/blame - python/ovs/db/idl.py
python/ovs/db/idl: make SchemaHelper accept schema in json form
[mirror_ovs.git] / python / ovs / db / idl.py
CommitLineData
e0edde6f 1# Copyright (c) 2009, 2010, 2011, 2012 Nicira, Inc.
99155935
BP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at:
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
8cdf0349 15import uuid
99155935
BP
16
17import ovs.jsonrpc
4c0f6271 18import ovs.db.parser
99155935
BP
19import ovs.db.schema
20from ovs.db import error
21import ovs.ovsuuid
8cdf0349 22import ovs.poller
3a656eaf
EJ
23import ovs.vlog
24
25vlog = ovs.vlog.Vlog("idl")
99155935 26
26bb0f31
EJ
27__pychecker__ = 'no-classattr no-objattrs'
28
29
99155935
BP
30class Idl:
31 """Open vSwitch Database Interface Definition Language (OVSDB IDL).
32
33 The OVSDB IDL maintains an in-memory replica of a database. It issues RPC
34 requests to an OVSDB database server and parses the responses, converting
35 raw JSON into data structures that are easier for clients to digest.
36
37 The IDL also assists with issuing database transactions. The client
38 creates a transaction, manipulates the IDL data structures, and commits or
39 aborts the transaction. The IDL then composes and issues the necessary
40 JSON-RPC requests and reports to the client whether the transaction
41 completed successfully.
42
8cdf0349
BP
43 The client is allowed to access the following attributes directly, in a
44 read-only fashion:
99155935 45
8cdf0349
BP
46 - 'tables': This is the 'tables' map in the ovs.db.schema.DbSchema provided
47 to the Idl constructor. Each ovs.db.schema.TableSchema in the map is
48 annotated with a new attribute 'rows', which is a dict from a uuid.UUID
49 to a Row object.
50
51 The client may directly read and write the Row objects referenced by the
52 'rows' map values. Refer to Row for more details.
53
54 - 'change_seqno': A number that represents the IDL's state. When the IDL
2f926787
BP
55 is updated (by Idl.run()), its value changes. The sequence number can
56 occasionally change even if the database does not. This happens if the
57 connection to the database drops and reconnects, which causes the
58 database contents to be reloaded even if they didn't change. (It could
59 also happen if the database server sends out a "change" that reflects
60 what the IDL already thought was in the database. The database server is
61 not supposed to do that, but bugs could in theory cause it to do so.)
8cdf0349
BP
62
63 - 'lock_name': The name of the lock configured with Idl.set_lock(), or None
64 if no lock is configured.
65
66 - 'has_lock': True, if the IDL is configured to obtain a lock and owns that
67 lock, and False otherwise.
68
69 Locking and unlocking happens asynchronously from the database client's
70 point of view, so the information is only useful for optimization
71 (e.g. if the client doesn't have the lock then there's no point in trying
72 to write to the database).
73
74 - 'is_lock_contended': True, if the IDL is configured to obtain a lock but
75 the database server has indicated that some other client already owns the
76 requested lock, and False otherwise.
77
78 - 'txn': The ovs.db.idl.Transaction object for the database transaction
79 currently being constructed, if there is one, or None otherwise.
80"""
81
82 def __init__(self, remote, schema):
99155935
BP
83 """Creates and returns a connection to the database named 'db_name' on
84 'remote', which should be in a form acceptable to
85 ovs.jsonrpc.session.open(). The connection will maintain an in-memory
8cdf0349
BP
86 replica of the remote database.
87
88 'schema' should be the schema for the remote database. The caller may
89 have cut it down by removing tables or columns that are not of
90 interest. The IDL will only replicate the tables and columns that
91 remain. The caller may also add a attribute named 'alert' to selected
92 remaining columns, setting its value to False; if so, then changes to
93 those columns will not be considered changes to the database for the
94 purpose of the return value of Idl.run() and Idl.change_seqno. This is
95 useful for columns that the IDL's client will write but not read.
96
ad0991e6
EJ
97 As a convenience to users, 'schema' may also be an instance of the
98 SchemaHelper class.
99
8cdf0349
BP
100 The IDL uses and modifies 'schema' directly."""
101
bf42f674
EJ
102 assert isinstance(schema, SchemaHelper)
103 schema = schema.get_idl_schema()
ad0991e6 104
8cdf0349
BP
105 self.tables = schema.tables
106 self._db = schema
107 self._session = ovs.jsonrpc.Session.open(remote)
108 self._monitor_request_id = None
109 self._last_seqno = None
99155935 110 self.change_seqno = 0
8cdf0349
BP
111
112 # Database locking.
113 self.lock_name = None # Name of lock we need, None if none.
114 self.has_lock = False # Has db server said we have the lock?
26bb0f31 115 self.is_lock_contended = False # Has db server said we can't get lock?
8cdf0349
BP
116 self._lock_request_id = None # JSON-RPC ID of in-flight lock request.
117
118 # Transaction support.
119 self.txn = None
120 self._outstanding_txns = {}
121
122 for table in schema.tables.itervalues():
123 for column in table.columns.itervalues():
124 if not hasattr(column, 'alert'):
125 column.alert = True
126 table.need_table = False
127 table.rows = {}
128 table.idl = self
99155935
BP
129
130 def close(self):
8cdf0349
BP
131 """Closes the connection to the database. The IDL will no longer
132 update."""
133 self._session.close()
99155935
BP
134
135 def run(self):
136 """Processes a batch of messages from the database server. Returns
137 True if the database as seen through the IDL changed, False if it did
138 not change. The initial fetch of the entire contents of the remote
8cdf0349
BP
139 database is considered to be one kind of change. If the IDL has been
140 configured to acquire a database lock (with Idl.set_lock()), then
141 successfully acquiring the lock is also considered to be a change.
99155935
BP
142
143 This function can return occasional false positives, that is, report
144 that the database changed even though it didn't. This happens if the
145 connection to the database drops and reconnects, which causes the
146 database contents to be reloaded even if they didn't change. (It could
147 also happen if the database server sends out a "change" that reflects
148 what we already thought was in the database, but the database server is
149 not supposed to do that.)
150
151 As an alternative to checking the return value, the client may check
8cdf0349
BP
152 for changes in self.change_seqno."""
153 assert not self.txn
99155935 154 initial_change_seqno = self.change_seqno
8cdf0349
BP
155 self._session.run()
156 i = 0
157 while i < 50:
158 i += 1
159 if not self._session.is_connected():
160 break
161
162 seqno = self._session.get_seqno()
163 if seqno != self._last_seqno:
164 self._last_seqno = seqno
165 self.__txn_abort_all()
166 self.__send_monitor_request()
167 if self.lock_name:
168 self.__send_lock_request()
169 break
170
171 msg = self._session.recv()
172 if msg is None:
173 break
174 if (msg.type == ovs.jsonrpc.Message.T_NOTIFY
175 and msg.method == "update"
176 and len(msg.params) == 2
177 and msg.params[0] == None):
178 # Database contents changed.
179 self.__parse_update(msg.params[1])
180 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
181 and self._monitor_request_id is not None
182 and self._monitor_request_id == msg.id):
183 # Reply to our "monitor" request.
184 try:
185 self.change_seqno += 1
186 self._monitor_request_id = None
187 self.__clear()
188 self.__parse_update(msg.result)
189 except error.Error, e:
3a656eaf
EJ
190 vlog.err("%s: parse error in received schema: %s"
191 % (self._session.get_name(), e))
8cdf0349
BP
192 self.__error()
193 elif (msg.type == ovs.jsonrpc.Message.T_REPLY
194 and self._lock_request_id is not None
195 and self._lock_request_id == msg.id):
196 # Reply to our "lock" request.
197 self.__parse_lock_reply(msg.result)
198 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
199 and msg.method == "locked"):
200 # We got our lock.
201 self.__parse_lock_notify(msg.params, True)
202 elif (msg.type == ovs.jsonrpc.Message.T_NOTIFY
203 and msg.method == "stolen"):
204 # Someone else stole our lock.
205 self.__parse_lock_notify(msg.params, False)
206 elif msg.type == ovs.jsonrpc.Message.T_NOTIFY and msg.id == "echo":
207 # Reply to our echo request. Ignore it.
208 pass
209 elif (msg.type in (ovs.jsonrpc.Message.T_ERROR,
210 ovs.jsonrpc.Message.T_REPLY)
211 and self.__txn_process_reply(msg)):
212 # __txn_process_reply() did everything needed.
213 pass
214 else:
215 # This can happen if a transaction is destroyed before we
216 # receive the reply, so keep the log level low.
3a656eaf
EJ
217 vlog.dbg("%s: received unexpected %s message"
218 % (self._session.get_name(),
219 ovs.jsonrpc.Message.type_to_string(msg.type)))
8cdf0349 220
99155935
BP
221 return initial_change_seqno != self.change_seqno
222
223 def wait(self, poller):
224 """Arranges for poller.block() to wake up when self.run() has something
225 to do or when activity occurs on a transaction on 'self'."""
8cdf0349
BP
226 self._session.wait(poller)
227 self._session.recv_wait(poller)
99155935 228
8cdf0349
BP
229 def has_ever_connected(self):
230 """Returns True, if the IDL successfully connected to the remote
231 database and retrieved its contents (even if the connection
232 subsequently dropped and is in the process of reconnecting). If so,
233 then the IDL contains an atomic snapshot of the database's contents
234 (but it might be arbitrarily old if the connection dropped).
235
236 Returns False if the IDL has never connected or retrieved the
237 database's contents. If so, the IDL is empty."""
238 return self.change_seqno != 0
239
240 def force_reconnect(self):
241 """Forces the IDL to drop its connection to the database and reconnect.
242 In the meantime, the contents of the IDL will not change."""
243 self._session.force_reconnect()
244
245 def set_lock(self, lock_name):
246 """If 'lock_name' is not None, configures the IDL to obtain the named
247 lock from the database server and to avoid modifying the database when
248 the lock cannot be acquired (that is, when another client has the same
249 lock).
250
251 If 'lock_name' is None, drops the locking requirement and releases the
252 lock."""
253 assert not self.txn
254 assert not self._outstanding_txns
255
256 if self.lock_name and (not lock_name or lock_name != self.lock_name):
257 # Release previous lock.
258 self.__send_unlock_request()
259 self.lock_name = None
260 self.is_lock_contended = False
261
262 if lock_name and not self.lock_name:
263 # Acquire new lock.
264 self.lock_name = lock_name
265 self.__send_lock_request()
266
267 def __clear(self):
268 changed = False
269
270 for table in self.tables.itervalues():
271 if table.rows:
272 changed = True
273 table.rows = {}
99155935 274
8cdf0349
BP
275 if changed:
276 self.change_seqno += 1
277
278 def __update_has_lock(self, new_has_lock):
279 if new_has_lock and not self.has_lock:
280 if self._monitor_request_id is None:
281 self.change_seqno += 1
282 else:
283 # We're waiting for a monitor reply, so don't signal that the
284 # database changed. The monitor reply will increment
285 # change_seqno anyhow.
286 pass
287 self.is_lock_contended = False
288 self.has_lock = new_has_lock
289
290 def __do_send_lock_request(self, method):
291 self.__update_has_lock(False)
292 self._lock_request_id = None
293 if self._session.is_connected():
294 msg = ovs.jsonrpc.Message.create_request(method, [self.lock_name])
295 msg_id = msg.id
296 self._session.send(msg)
297 else:
298 msg_id = None
299 return msg_id
300
301 def __send_lock_request(self):
302 self._lock_request_id = self.__do_send_lock_request("lock")
303
304 def __send_unlock_request(self):
305 self.__do_send_lock_request("unlock")
306
307 def __parse_lock_reply(self, result):
308 self._lock_request_id = None
309 got_lock = type(result) == dict and result.get("locked") is True
310 self.__update_has_lock(got_lock)
311 if not got_lock:
312 self.is_lock_contended = True
313
314 def __parse_lock_notify(self, params, new_has_lock):
315 if (self.lock_name is not None
316 and type(params) in (list, tuple)
317 and params
318 and params[0] == self.lock_name):
319 self.__update_has_lock(self, new_has_lock)
320 if not new_has_lock:
321 self.is_lock_contended = True
99155935
BP
322
323 def __send_monitor_request(self):
324 monitor_requests = {}
8cdf0349 325 for table in self.tables.itervalues():
99155935
BP
326 monitor_requests[table.name] = {"columns": table.columns.keys()}
327 msg = ovs.jsonrpc.Message.create_request(
8cdf0349
BP
328 "monitor", [self._db.name, None, monitor_requests])
329 self._monitor_request_id = msg.id
330 self._session.send(msg)
99155935
BP
331
332 def __parse_update(self, update):
333 try:
334 self.__do_parse_update(update)
335 except error.Error, e:
3a656eaf
EJ
336 vlog.err("%s: error parsing update: %s"
337 % (self._session.get_name(), e))
99155935
BP
338
339 def __do_parse_update(self, table_updates):
340 if type(table_updates) != dict:
341 raise error.Error("<table-updates> is not an object",
342 table_updates)
343
344 for table_name, table_update in table_updates.iteritems():
8cdf0349 345 table = self.tables.get(table_name)
99155935 346 if not table:
f2d8ad13
BP
347 raise error.Error('<table-updates> includes unknown '
348 'table "%s"' % table_name)
99155935
BP
349
350 if type(table_update) != dict:
f2d8ad13
BP
351 raise error.Error('<table-update> for table "%s" is not '
352 'an object' % table_name, table_update)
99155935
BP
353
354 for uuid_string, row_update in table_update.iteritems():
49c541dc 355 if not ovs.ovsuuid.is_valid_string(uuid_string):
f2d8ad13
BP
356 raise error.Error('<table-update> for table "%s" '
357 'contains bad UUID "%s" as member '
358 'name' % (table_name, uuid_string),
99155935 359 table_update)
49c541dc 360 uuid = ovs.ovsuuid.from_string(uuid_string)
99155935
BP
361
362 if type(row_update) != dict:
f2d8ad13
BP
363 raise error.Error('<table-update> for table "%s" '
364 'contains <row-update> for %s that '
365 'is not an object'
99155935
BP
366 % (table_name, uuid_string))
367
af1eba26 368 parser = ovs.db.parser.Parser(row_update, "row-update")
4c0f6271
BP
369 old = parser.get_optional("old", [dict])
370 new = parser.get_optional("new", [dict])
371 parser.finish()
99155935 372
99155935 373 if not old and not new:
f2d8ad13
BP
374 raise error.Error('<row-update> missing "old" and '
375 '"new" members', row_update)
99155935 376
8cdf0349 377 if self.__process_update(table, uuid, old, new):
99155935
BP
378 self.change_seqno += 1
379
8cdf0349 380 def __process_update(self, table, uuid, old, new):
99155935 381 """Returns True if a column changed, False otherwise."""
8cdf0349 382 row = table.rows.get(uuid)
7d48f8f8 383 changed = False
99155935
BP
384 if not new:
385 # Delete row.
386 if row:
8cdf0349 387 del table.rows[uuid]
7d48f8f8 388 changed = True
99155935
BP
389 else:
390 # XXX rate-limit
3a656eaf
EJ
391 vlog.warn("cannot delete missing row %s from table %s"
392 % (uuid, table.name))
99155935
BP
393 elif not old:
394 # Insert row.
395 if not row:
396 row = self.__create_row(table, uuid)
7d48f8f8 397 changed = True
99155935
BP
398 else:
399 # XXX rate-limit
3a656eaf
EJ
400 vlog.warn("cannot add existing row %s to table %s"
401 % (uuid, table.name))
8cdf0349 402 if self.__row_update(table, row, new):
7d48f8f8 403 changed = True
99155935
BP
404 else:
405 if not row:
406 row = self.__create_row(table, uuid)
7d48f8f8 407 changed = True
99155935 408 # XXX rate-limit
3a656eaf
EJ
409 vlog.warn("cannot modify missing row %s in table %s"
410 % (uuid, table.name))
8cdf0349 411 if self.__row_update(table, row, new):
7d48f8f8
BP
412 changed = True
413 return changed
99155935 414
8cdf0349 415 def __row_update(self, table, row, row_json):
99155935
BP
416 changed = False
417 for column_name, datum_json in row_json.iteritems():
418 column = table.columns.get(column_name)
419 if not column:
420 # XXX rate-limit
3a656eaf
EJ
421 vlog.warn("unknown column %s updating table %s"
422 % (column_name, table.name))
99155935
BP
423 continue
424
425 try:
426 datum = ovs.db.data.Datum.from_json(column.type, datum_json)
427 except error.Error, e:
428 # XXX rate-limit
3a656eaf
EJ
429 vlog.warn("error parsing column %s in table %s: %s"
430 % (column_name, table.name, e))
99155935
BP
431 continue
432
8cdf0349
BP
433 if datum != row._data[column_name]:
434 row._data[column_name] = datum
435 if column.alert:
436 changed = True
99155935
BP
437 else:
438 # Didn't really change but the OVSDB monitor protocol always
439 # includes every value in a row.
440 pass
441 return changed
442
99155935 443 def __create_row(self, table, uuid):
8cdf0349 444 data = {}
99155935 445 for column in table.columns.itervalues():
8cdf0349
BP
446 data[column.name] = ovs.db.data.Datum.default(column.type)
447 row = table.rows[uuid] = Row(self, table, uuid, data)
99155935
BP
448 return row
449
8cdf0349
BP
450 def __error(self):
451 self._session.force_reconnect()
452
453 def __txn_abort_all(self):
454 while self._outstanding_txns:
455 txn = self._outstanding_txns.popitem()[1]
854a94d9 456 txn._status = Transaction.TRY_AGAIN
8cdf0349
BP
457
458 def __txn_process_reply(self, msg):
459 txn = self._outstanding_txns.pop(msg.id, None)
460 if txn:
461 txn._process_reply(msg)
462
26bb0f31 463
8cdf0349
BP
def _uuid_to_row(atom, base):
    """Maps 'atom' to the object stored under it in base.ref_table.rows.

    When 'base' has no (truthy) ref_table, 'atom' is returned unchanged.
    When it has one, the result is ref_table.rows.get(atom), which is None if
    'atom' is not a key of that dict."""
    if not base.ref_table:
        return atom
    return base.ref_table.rows.get(atom)
469
26bb0f31 470
8cdf0349
BP
def _row_to_uuid(value):
    """Returns value.uuid if 'value' is exactly a Row instance; any other
    value (including instances of Row subclasses) is returned unchanged."""
    if type(value) != Row:
        return value
    return value.uuid
bf6ec045 476
26bb0f31 477
bf6ec045 478class Row(object):
8cdf0349
BP
479 """A row within an IDL.
480
481 The client may access the following attributes directly:
482
483 - 'uuid': a uuid.UUID object whose value is the row's database UUID.
484
485 - An attribute for each column in the Row's table, named for the column,
486 whose values are as returned by Datum.to_python() for the column's type.
487
488 If some error occurs (e.g. the database server's idea of the column is
489 different from the IDL's idea), then the attribute values is the
490 "default" value return by Datum.default() for the column's type. (It is
491 important to know this because the default value may violate constraints
492 for the column's type, e.g. the default integer value is 0 even if column
493 contraints require the column's value to be positive.)
494
495 When a transaction is active, column attributes may also be assigned new
496 values. Committing the transaction will then cause the new value to be
497 stored into the database.
498
499 *NOTE*: In the current implementation, the value of a column is a *copy*
500 of the value in the database. This means that modifying its value
501 directly will have no useful effect. For example, the following:
502 row.mycolumn["a"] = "b" # don't do this
503 will not change anything in the database, even after commit. To modify
504 the column, instead assign the modified column value back to the column:
505 d = row.mycolumn
506 d["a"] = "b"
507 row.mycolumn = d
508"""
509 def __init__(self, idl, table, uuid, data):
510 # All of the explicit references to self.__dict__ below are required
511 # to set real attributes with invoking self.__getattr__().
512 self.__dict__["uuid"] = uuid
513
514 self.__dict__["_idl"] = idl
515 self.__dict__["_table"] = table
516
517 # _data is the committed data. It takes the following values:
518 #
519 # - A dictionary that maps every column name to a Datum, if the row
520 # exists in the committed form of the database.
521 #
522 # - None, if this row is newly inserted within the active transaction
523 # and thus has no committed form.
524 self.__dict__["_data"] = data
525
526 # _changes describes changes to this row within the active transaction.
527 # It takes the following values:
528 #
529 # - {}, the empty dictionary, if no transaction is active or if the
530 # row has yet not been changed within this transaction.
531 #
532 # - A dictionary that maps a column name to its new Datum, if an
533 # active transaction changes those columns' values.
534 #
535 # - A dictionary that maps every column name to a Datum, if the row
536 # is newly inserted within the active transaction.
537 #
538 # - None, if this transaction deletes this row.
539 self.__dict__["_changes"] = {}
540
541 # A dictionary whose keys are the names of columns that must be
542 # verified as prerequisites when the transaction commits. The values
543 # in the dictionary are all None.
544 self.__dict__["_prereqs"] = {}
545
546 def __getattr__(self, column_name):
547 assert self._changes is not None
548
549 datum = self._changes.get(column_name)
550 if datum is None:
551 datum = self._data[column_name]
552
553 return datum.to_python(_uuid_to_row)
554
555 def __setattr__(self, column_name, value):
556 assert self._changes is not None
557 assert self._idl.txn
558
559 column = self._table.columns[column_name]
560 try:
561 datum = ovs.db.data.Datum.from_python(column.type, value,
562 _row_to_uuid)
563 except error.Error, e:
564 # XXX rate-limit
3a656eaf
EJ
565 vlog.err("attempting to write bad value to column %s (%s)"
566 % (column_name, e))
8cdf0349
BP
567 return
568 self._idl.txn._write(self, column, datum)
569
570 def verify(self, column_name):
571 """Causes the original contents of column 'column_name' in this row to
572 be verified as a prerequisite to completing the transaction. That is,
573 if 'column_name' changed in this row (or if this row was deleted)
574 between the time that the IDL originally read its contents and the time
575 that the transaction commits, then the transaction aborts and
854a94d9 576 Transaction.commit() returns Transaction.TRY_AGAIN.
8cdf0349
BP
577
578 The intention is that, to ensure that no transaction commits based on
579 dirty reads, an application should call Row.verify() on each data item
580 read as part of a read-modify-write operation.
581
582 In some cases Row.verify() reduces to a no-op, because the current
583 value of the column is already known:
584
585 - If this row is a row created by the current transaction (returned
586 by Transaction.insert()).
587
588 - If the column has already been modified within the current
589 transaction.
590
591 Because of the latter property, always call Row.verify() *before*
592 modifying the column, for a given read-modify-write.
593
594 A transaction must be in progress."""
595 assert self._idl.txn
596 assert self._changes is not None
597 if not self._data or column_name in self._changes:
598 return
599
600 self._prereqs[column_name] = None
601
602 def delete(self):
603 """Deletes this row from its table.
604
605 A transaction must be in progress."""
606 assert self._idl.txn
607 assert self._changes is not None
608 if self._data is None:
609 del self._idl.txn._txn_rows[self.uuid]
610 self.__dict__["_changes"] = None
611 del self._table.rows[self.uuid]
612
94fbe1aa
BP
613 def increment(self, column_name):
614 """Causes the transaction, when committed, to increment the value of
615 'column_name' within this row by 1. 'column_name' must have an integer
616 type. After the transaction commits successfully, the client may
617 retrieve the final (incremented) value of 'column_name' with
618 Transaction.get_increment_new_value().
619
620 The client could accomplish something similar by reading and writing
621 and verify()ing columns. However, increment() will never (by itself)
622 cause a transaction to fail because of a verify error.
623
624 The intended use is for incrementing the "next_cfg" column in
625 the Open_vSwitch table."""
626 self._idl.txn._increment(self, column_name)
627
26bb0f31 628
8cdf0349
BP
def _uuid_name_from_uuid(uuid):
    """Returns "row" followed by the string form of 'uuid' with every "-"
    replaced by "_"."""
    underscored = "_".join(str(uuid).split("-"))
    return "row" + underscored
631
26bb0f31 632
8cdf0349
BP
def _where_uuid_equals(uuid):
    """Returns a one-element list holding the condition
    ["_uuid", "==", ["uuid", <string form of 'uuid'>]]."""
    uuid_json = ["uuid", str(uuid)]
    condition = ["_uuid", "==", uuid_json]
    return [condition]
635
26bb0f31 636
8cdf0349
BP
class _InsertedRow(object):
    """Record holding two attributes: 'op_index', as given to the
    constructor, and 'real', which starts out as None."""

    def __init__(self, op_index):
        # NOTE(review): presumably 'op_index' is the position of the row's
        # insert operation and 'real' is filled in later -- confirm against
        # Transaction, which is not fully visible here.
        self.real = None
        self.op_index = op_index
641
26bb0f31 642
8cdf0349 643class Transaction(object):
2f926787
BP
644 """A transaction may modify the contents of a database by modifying the
645 values of columns, deleting rows, inserting rows, or adding checks that
646 columns in the database have not changed ("verify" operations), through
647 Row methods.
648
649 Reading and writing columns and inserting and deleting rows are all
650 straightforward. The reasons to verify columns are less obvious.
651 Verification is the key to maintaining transactional integrity. Because
652 OVSDB handles multiple clients, it can happen that between the time that
653 OVSDB client A reads a column and writes a new value, OVSDB client B has
654 written that column. Client A's write should not ordinarily overwrite
655 client B's, especially if the column in question is a "map" column that
656 contains several more or less independent data items. If client A adds a
657 "verify" operation before it writes the column, then the transaction fails
658 in case client B modifies it first. Client A will then see the new value
659 of the column and compose a new transaction based on the new contents
660 written by client B.
661
662 When a transaction is complete, which must be before the next call to
663 Idl.run(), call Transaction.commit() or Transaction.abort().
664
665 The life-cycle of a transaction looks like this:
666
667 1. Create the transaction and record the initial sequence number:
668
669 seqno = idl.change_seqno(idl)
670 txn = Transaction(idl)
671
672 2. Modify the database with Row and Transaction methods.
673
674 3. Commit the transaction by calling Transaction.commit(). The first call
675 to this function probably returns Transaction.INCOMPLETE. The client
676 must keep calling again along as this remains true, calling Idl.run() in
677 between to let the IDL do protocol processing. (If the client doesn't
678 have anything else to do in the meantime, it can use
679 Transaction.commit_block() to avoid having to loop itself.)
680
681 4. If the final status is Transaction.TRY_AGAIN, wait for Idl.change_seqno
682 to change from the saved 'seqno' (it's possible that it's already
683 changed, in which case the client should not wait at all), then start
684 over from step 1. Only a call to Idl.run() will change the return value
685 of Idl.change_seqno. (Transaction.commit_block() calls Idl.run().)"""
686
8cdf0349 687 # Status values that Transaction.commit() can return.
26bb0f31
EJ
688 UNCOMMITTED = "uncommitted" # Not yet committed or aborted.
689 UNCHANGED = "unchanged" # Transaction didn't include any changes.
690 INCOMPLETE = "incomplete" # Commit in progress, please wait.
691 ABORTED = "aborted" # ovsdb_idl_txn_abort() called.
692 SUCCESS = "success" # Commit successful.
854a94d9 693 TRY_AGAIN = "try again" # Commit failed because a "verify" operation
26bb0f31 694 # reported an inconsistency, due to a network
4fdfe5cc
BP
695 # problem, or other transient failure. Wait
696 # for a change, then try again.
26bb0f31
EJ
697 NOT_LOCKED = "not locked" # Server hasn't given us the lock yet.
698 ERROR = "error" # Commit failed due to a hard error.
8cdf0349
BP
699
700 @staticmethod
701 def status_to_string(status):
702 """Converts one of the status values that Transaction.commit() can
703 return into a human-readable string.
704
705 (The status values are in fact such strings already, so
706 there's nothing to do.)"""
707 return status
708
709 def __init__(self, idl):
710 """Starts a new transaction on 'idl' (an instance of ovs.db.idl.Idl).
711 A given Idl may only have a single active transaction at a time.
712
713 A Transaction may modify the contents of a database by assigning new
714 values to columns (attributes of Row), deleting rows (with
715 Row.delete()), or inserting rows (with Transaction.insert()). It may
716 also check that columns in the database have not changed with
717 Row.verify().
718
719 When a transaction is complete (which must be before the next call to
720 Idl.run()), call Transaction.commit() or Transaction.abort()."""
721 assert idl.txn is None
722
723 idl.txn = self
724 self._request_id = None
725 self.idl = idl
726 self.dry_run = False
727 self._txn_rows = {}
728 self._status = Transaction.UNCOMMITTED
729 self._error = None
730 self._comments = []
4fdfe5cc 731 self._commit_seqno = self.idl.change_seqno
8cdf0349 732
94fbe1aa 733 self._inc_row = None
8cdf0349 734 self._inc_column = None
8cdf0349 735
26bb0f31 736 self._inserted_rows = {} # Map from UUID to _InsertedRow
8cdf0349
BP
737
738 def add_comment(self, comment):
739 """Appens 'comment' to the comments that will be passed to the OVSDB
740 server when this transaction is committed. (The comment will be
741 committed to the OVSDB log, which "ovsdb-tool show-log" can print in a
742 relatively human-readable form.)"""
743 self._comments.append(comment)
744
8cdf0349 745 def wait(self, poller):
2f926787
BP
746 """Causes poll_block() to wake up if this transaction has completed
747 committing."""
8cdf0349
BP
748 if self._status not in (Transaction.UNCOMMITTED,
749 Transaction.INCOMPLETE):
750 poller.immediate_wake()
751
752 def _substitute_uuids(self, json):
753 if type(json) in (list, tuple):
754 if (len(json) == 2
755 and json[0] == 'uuid'
756 and ovs.ovsuuid.is_valid_string(json[1])):
757 uuid = ovs.ovsuuid.from_string(json[1])
758 row = self._txn_rows.get(uuid, None)
759 if row and row._data is None:
760 return ["named-uuid", _uuid_name_from_uuid(uuid)]
761 return json
762
763 def __disassemble(self):
764 self.idl.txn = None
765
766 for row in self._txn_rows.itervalues():
767 if row._changes is None:
768 row._table.rows[row.uuid] = row
769 elif row._data is None:
770 del row._table.rows[row.uuid]
771 row.__dict__["_changes"] = {}
772 row.__dict__["_prereqs"] = {}
773 self._txn_rows = {}
774
    def commit(self):
        """Attempts to commit 'txn'.  Returns the status of the commit
        operation, one of the following constants:

          Transaction.INCOMPLETE:

              The transaction is in progress, but not yet complete.  The
              caller should call again later, after calling Idl.run() to let
              the IDL do OVSDB protocol processing.

          Transaction.UNCHANGED:

              The transaction is complete.  (It didn't actually change the
              database, so the IDL didn't send any request to the database
              server.)

          Transaction.ABORTED:

              The caller previously called Transaction.abort().

          Transaction.SUCCESS:

              The transaction was successful.  The update made by the
              transaction (and possibly other changes made by other database
              clients) should already be visible in the IDL.

          Transaction.TRY_AGAIN:

              The transaction failed for some transient reason, e.g. because
              a "verify" operation reported an inconsistency or due to a
              network problem.  The caller should wait for a change to the
              database, then compose a new transaction, and commit the new
              transaction.

              Use Idl.change_seqno to wait for a change in the database.  It
              is important to use its value *before* the initial call to
              Transaction.commit() as the baseline for this purpose, because
              the change that one should wait for can happen after the
              initial call but before the call that returns
              Transaction.TRY_AGAIN, and using some other baseline value in
              that situation could cause an indefinite wait if the database
              rarely changes.

          Transaction.NOT_LOCKED:

              The transaction failed because the IDL has been configured to
              require a database lock (with Idl.set_lock()) but didn't
              get it yet or has already lost it.

        Committing a transaction rolls back all of the changes that it made to
        the IDL's copy of the database.  If the transaction commits
        successfully, then the database server will send an update and, thus,
        the IDL will be updated with the committed changes."""
        # The status can only change if we're the active transaction.
        # (Otherwise, our status will change only in Idl.run().)
        if self != self.idl.txn:
            return self._status

        # If we need a lock but don't have it, give up quickly.
        if self.idl.lock_name and not self.idl.has_lock():
            self._status = Transaction.NOT_LOCKED
            self.__disassemble()
            return self._status

        # Parameters of the "transact" request: the database name first,
        # followed by one JSON object per operation.
        operations = [self.idl._db.name]

        # Assert that we have the required lock (avoiding a race).
        if self.idl.lock_name:
            operations.append({"op": "assert",
                               "lock": self.idl.lock_name})

        # Add prerequisites and declarations of new rows.
        for row in self._txn_rows.itervalues():
            if row._prereqs:
                # A "wait" with a zero timeout that requires the listed
                # columns to still equal the values held in row._data.
                rows = {}
                columns = []
                for column_name in row._prereqs:
                    columns.append(column_name)
                    rows[column_name] = row._data[column_name].to_json()
                operations.append({"op": "wait",
                                   "table": row._table.name,
                                   "timeout": 0,
                                   "where": _where_uuid_equals(row.uuid),
                                   "until": "==",
                                   "columns": columns,
                                   "rows": [rows]})

        # Add updates.
        any_updates = False
        for row in self._txn_rows.itervalues():
            if row._changes is None:
                # _changes is None marks a row deleted in this transaction.
                if row._table.is_root:
                    operations.append({"op": "delete",
                                       "table": row._table.name,
                                       "where": _where_uuid_equals(row.uuid)})
                    any_updates = True
                else:
                    # Let ovsdb-server decide whether to really delete it.
                    pass
            elif row._changes:
                op = {"table": row._table.name}
                if row._data is None:
                    # _data is None marks a row inserted in this transaction.
                    op["op"] = "insert"
                    op["uuid-name"] = _uuid_name_from_uuid(row.uuid)
                    any_updates = True

                    # Position of this op counting operations only, i.e. not
                    # the database name in operations[0].
                    # __process_insert_reply() uses it to index the reply.
                    op_index = len(operations) - 1
                    self._inserted_rows[row.uuid] = _InsertedRow(op_index)
                else:
                    op["op"] = "update"
                    op["where"] = _where_uuid_equals(row.uuid)

                row_json = {}
                op["row"] = row_json

                for column_name, datum in row._changes.iteritems():
                    # For a new row, columns left at their default value are
                    # omitted from the request.
                    if row._data is not None or not datum.is_default():
                        row_json[column_name] = (
                                self._substitute_uuids(datum.to_json()))

                        # If anything really changed, consider it an update.
                        # We can't suppress not-really-changed values earlier
                        # or transactions would become nonatomic (see the big
                        # comment inside Transaction._write()).
                        if (not any_updates and row._data is not None and
                                row._data[column_name] != datum):
                            any_updates = True

                # An insert is always sent; an update only if it still has
                # at least one column to write.
                if row._data is None or row_json:
                    operations.append(op)

        # Add increment.
        if self._inc_row and any_updates:
            # Same indexing convention as op_index above: position of the
            # "mutate" op, with the "select" op right after it.
            self._inc_index = len(operations) - 1

            operations.append({"op": "mutate",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "mutations": [[self._inc_column, "+=", 1]]})
            operations.append({"op": "select",
                               "table": self._inc_row._table.name,
                               "where": self._substitute_uuids(
                                   _where_uuid_equals(self._inc_row.uuid)),
                               "columns": [self._inc_column]})

        # Add comment.
        if self._comments:
            operations.append({"op": "comment",
                               "comment": "\n".join(self._comments)})

        # Dry run?
        if self.dry_run:
            operations.append({"op": "abort"})

        if not any_updates:
            # Nothing would change, so no request is sent at all.
            self._status = Transaction.UNCHANGED
        else:
            msg = ovs.jsonrpc.Message.create_request("transact", operations)
            self._request_id = msg.id
            # A false return value from send() is treated as success here.
            if not self.idl._session.send(msg):
                self.idl._outstanding_txns[self._request_id] = self
                self._status = Transaction.INCOMPLETE
            else:
                self._status = Transaction.TRY_AGAIN

        # Whatever the outcome, undo the local changes (see the last
        # paragraph of the docstring).
        self.__disassemble()
        return self._status
941
942 def commit_block(self):
2f926787
BP
943 """Attempts to commit this transaction, blocking until the commit
944 either succeeds or fails. Returns the final commit status, which may
945 be any Transaction.* value other than Transaction.INCOMPLETE.
946
947 This function calls Idl.run() on this transaction'ss IDL, so it may
948 cause Idl.change_seqno to change."""
8cdf0349
BP
949 while True:
950 status = self.commit()
951 if status != Transaction.INCOMPLETE:
952 return status
953
954 self.idl.run()
955
956 poller = ovs.poller.Poller()
957 self.idl.wait(poller)
958 self.wait(poller)
959 poller.block()
960
961 def get_increment_new_value(self):
2f926787
BP
962 """Returns the final (incremented) value of the column in this
963 transaction that was set to be incremented by Row.increment. This
964 transaction must have committed successfully."""
8cdf0349
BP
965 assert self._status == Transaction.SUCCESS
966 return self._inc_new_value
967
968 def abort(self):
969 """Aborts this transaction. If Transaction.commit() has already been
970 called then the transaction might get committed anyhow."""
971 self.__disassemble()
972 if self._status in (Transaction.UNCOMMITTED,
973 Transaction.INCOMPLETE):
974 self._status = Transaction.ABORTED
975
976 def get_error(self):
977 """Returns a string representing this transaction's current status,
978 suitable for use in log messages."""
979 if self._status != Transaction.ERROR:
980 return Transaction.status_to_string(self._status)
981 elif self._error:
982 return self._error
983 else:
984 return "no error details available"
985
986 def __set_error_json(self, json):
987 if self._error is None:
988 self._error = ovs.json.to_string(json)
989
990 def get_insert_uuid(self, uuid):
991 """Finds and returns the permanent UUID that the database assigned to a
992 newly inserted row, given the UUID that Transaction.insert() assigned
993 locally to that row.
994
995 Returns None if 'uuid' is not a UUID assigned by Transaction.insert()
996 or if it was assigned by that function and then deleted by Row.delete()
997 within the same transaction. (Rows that are inserted and then deleted
998 within a single transaction are never sent to the database server, so
999 it never assigns them a permanent UUID.)
1000
1001 This transaction must have completed successfully."""
1002 assert self._status in (Transaction.SUCCESS,
1003 Transaction.UNCHANGED)
1004 inserted_row = self._inserted_rows.get(uuid)
1005 if inserted_row:
1006 return inserted_row.real
1007 return None
1008
94fbe1aa
BP
1009 def _increment(self, row, column):
1010 assert not self._inc_row
1011 self._inc_row = row
1012 self._inc_column = column
1013
8cdf0349
BP
1014 def _write(self, row, column, datum):
1015 assert row._changes is not None
1016
1017 txn = row._idl.txn
1018
1019 # If this is a write-only column and the datum being written is the
1020 # same as the one already there, just skip the update entirely. This
1021 # is worth optimizing because we have a lot of columns that get
1022 # periodically refreshed into the database but don't actually change
1023 # that often.
1024 #
1025 # We don't do this for read/write columns because that would break
1026 # atomicity of transactions--some other client might have written a
1027 # different value in that column since we read it. (But if a whole
1028 # transaction only does writes of existing values, without making any
1029 # real changes, we will drop the whole transaction later in
1030 # ovsdb_idl_txn_commit().)
1031 if not column.alert and row._data.get(column.name) == datum:
1032 new_value = row._changes.get(column.name)
1033 if new_value is None or new_value == datum:
1034 return
1035
1036 txn._txn_rows[row.uuid] = row
1037 row._changes[column.name] = datum.copy()
1038
1039 def insert(self, table, new_uuid=None):
1040 """Inserts and returns a new row in 'table', which must be one of the
1041 ovs.db.schema.TableSchema objects in the Idl's 'tables' dict.
1042
1043 The new row is assigned a provisional UUID. If 'uuid' is None then one
1044 is randomly generated; otherwise 'uuid' should specify a randomly
1045 generated uuid.UUID not otherwise in use. ovsdb-server will assign a
1046 different UUID when 'txn' is committed, but the IDL will replace any
1047 uses of the provisional UUID in the data to be to be committed by the
1048 UUID assigned by ovsdb-server."""
1049 assert self._status == Transaction.UNCOMMITTED
1050 if new_uuid is None:
1051 new_uuid = uuid.uuid4()
1052 row = Row(self.idl, table, new_uuid, None)
1053 table.rows[row.uuid] = row
1054 self._txn_rows[row.uuid] = row
1055 return row
1056
1057 def _process_reply(self, msg):
1058 if msg.type == ovs.jsonrpc.Message.T_ERROR:
1059 self._status = Transaction.ERROR
1060 elif type(msg.result) not in (list, tuple):
1061 # XXX rate-limit
3a656eaf 1062 vlog.warn('reply to "transact" is not JSON array')
8cdf0349
BP
1063 else:
1064 hard_errors = False
1065 soft_errors = False
1066 lock_errors = False
1067
1068 ops = msg.result
1069 for op in ops:
1070 if op is None:
1071 # This isn't an error in itself but indicates that some
1072 # prior operation failed, so make sure that we know about
1073 # it.
1074 soft_errors = True
1075 elif type(op) == dict:
1076 error = op.get("error")
1077 if error is not None:
1078 if error == "timed out":
1079 soft_errors = True
1080 elif error == "not owner":
1081 lock_errors = True
1082 elif error == "aborted":
1083 pass
1084 else:
1085 hard_errors = True
1086 self.__set_error_json(op)
1087 else:
1088 hard_errors = True
1089 self.__set_error_json(op)
1090 # XXX rate-limit
3a656eaf 1091 vlog.warn("operation reply is not JSON null or object")
8cdf0349
BP
1092
1093 if not soft_errors and not hard_errors and not lock_errors:
94fbe1aa 1094 if self._inc_row and not self.__process_inc_reply(ops):
8cdf0349
BP
1095 hard_errors = True
1096
1097 for insert in self._inserted_rows.itervalues():
1098 if not self.__process_insert_reply(insert, ops):
1099 hard_errors = True
1100
1101 if hard_errors:
1102 self._status = Transaction.ERROR
1103 elif lock_errors:
1104 self._status = Transaction.NOT_LOCKED
1105 elif soft_errors:
854a94d9 1106 self._status = Transaction.TRY_AGAIN
8cdf0349
BP
1107 else:
1108 self._status = Transaction.SUCCESS
1109
1110 @staticmethod
1111 def __check_json_type(json, types, name):
1112 if not json:
1113 # XXX rate-limit
3a656eaf 1114 vlog.warn("%s is missing" % name)
8cdf0349
BP
1115 return False
1116 elif type(json) not in types:
1117 # XXX rate-limit
3a656eaf 1118 vlog.warn("%s has unexpected type %s" % (name, type(json)))
8cdf0349
BP
1119 return False
1120 else:
1121 return True
1122
1123 def __process_inc_reply(self, ops):
1124 if self._inc_index + 2 > len(ops):
1125 # XXX rate-limit
3a656eaf
EJ
1126 vlog.warn("reply does not contain enough operations for "
1127 "increment (has %d, needs %d)" %
1128 (len(ops), self._inc_index + 2))
8cdf0349
BP
1129
1130 # We know that this is a JSON object because the loop in
1131 # __process_reply() already checked.
1132 mutate = ops[self._inc_index]
1133 count = mutate.get("count")
1134 if not Transaction.__check_json_type(count, (int, long),
1135 '"mutate" reply "count"'):
1136 return False
1137 if count != 1:
1138 # XXX rate-limit
3a656eaf 1139 vlog.warn('"mutate" reply "count" is %d instead of 1' % count)
8cdf0349
BP
1140 return False
1141
1142 select = ops[self._inc_index + 1]
1143 rows = select.get("rows")
1144 if not Transaction.__check_json_type(rows, (list, tuple),
1145 '"select" reply "rows"'):
1146 return False
1147 if len(rows) != 1:
1148 # XXX rate-limit
3a656eaf
EJ
1149 vlog.warn('"select" reply "rows" has %d elements '
1150 'instead of 1' % len(rows))
8cdf0349
BP
1151 return False
1152 row = rows[0]
1153 if not Transaction.__check_json_type(row, (dict,),
1154 '"select" reply row'):
1155 return False
1156 column = row.get(self._inc_column)
1157 if not Transaction.__check_json_type(column, (int, long),
1158 '"select" reply inc column'):
1159 return False
1160 self._inc_new_value = column
1161 return True
1162
1163 def __process_insert_reply(self, insert, ops):
1164 if insert.op_index >= len(ops):
1165 # XXX rate-limit
3a656eaf
EJ
1166 vlog.warn("reply does not contain enough operations "
1167 "for insert (has %d, needs %d)"
1168 % (len(ops), insert.op_index))
8cdf0349
BP
1169 return False
1170
1171 # We know that this is a JSON object because the loop in
1172 # __process_reply() already checked.
1173 reply = ops[insert.op_index]
1174 json_uuid = reply.get("uuid")
1175 if not Transaction.__check_json_type(json_uuid, (tuple, list),
1176 '"insert" reply "uuid"'):
1177 return False
1178
1179 try:
1180 uuid_ = ovs.ovsuuid.from_json(json_uuid)
1181 except error.Error:
1182 # XXX rate-limit
3a656eaf 1183 vlog.warn('"insert" reply "uuid" is not a JSON UUID')
8cdf0349 1184 return False
bf6ec045 1185
8cdf0349
BP
1186 insert.real = uuid_
1187 return True
ad0991e6
EJ
1188
1189
class SchemaHelper(object):
    """IDL Schema helper.

    This class encapsulates the logic required to generate schemas suitable
    for creating 'ovs.db.idl.Idl' objects.  Clients should register the
    columns they are interested in using register_columns(),
    register_table() or register_all().  When finished, the get_idl_schema()
    function may be called.

    The schema in JSON form is kept in the 'schema_json' attribute until
    get_idl_schema() consumes it."""

    def __init__(self, location=None, schema_json=None):
        """Creates a new Schema object.

        'location' file path to ovs schema.  None means default location.
        'schema_json' schema in JSON representation, already in memory.

        At most one of the two may be given; passing both raises
        ValueError.  The schema file is read only when 'schema_json' is
        None."""

        if location and schema_json:
            raise ValueError("both location and schema_json can't be "
                             "specified. it's ambiguous.")
        if schema_json is None:
            if location is None:
                location = "%s/vswitch.ovsschema" % ovs.dirs.PKGDATADIR
            schema_json = ovs.json.from_file(location)

        self.schema_json = schema_json
        # Maps a table name to the set of column names of interest.  An
        # empty set means every column of that table.
        self._tables = {}
        # True once register_all() has asked for the whole schema.
        self._all = False

    def register_columns(self, table, columns):
        """Registers interest in the given 'columns' of 'table'.  Future calls
        to get_idl_schema() will include 'table':column for each column in
        'columns'.  This function automatically avoids adding duplicate
        entries to the schema.

        Registration only ever widens what is included: if every column of
        'table' is already registered (see register_table()), the call
        leaves that registration untouched.

        'table' must be a string.
        'columns' must be a list of strings.
        """

        assert type(table) is str
        assert type(columns) is list

        registered = self._tables.get(table)
        if registered is not None and not registered:
            # An empty set means all columns of the table.  Merging
            # 'columns' into it would shrink the registration to just
            # those columns.
            return

        # NOTE: an empty 'columns' list for a table with no registration
        # stores the empty set, which get_idl_schema() treats as "all
        # columns".
        self._tables[table] = set(columns) | (registered or set())

    def register_table(self, table):
        """Registers interest in all columns of 'table'.  Future calls to
        get_idl_schema() will include all columns of 'table'.

        'table' must be a string
        """
        assert type(table) is str
        self._tables[table] = set()  # empty set means all columns in the table

    def register_all(self):
        """Registers interest in every column of every table."""
        self._all = True

    def get_idl_schema(self):
        """Gets a schema appropriate for the creation of an 'ovs.db.idl.Idl'
        object based on what was registered using register_columns(),
        register_table() and register_all().

        This consumes 'schema_json': the attribute is reset to None once the
        schema has been parsed."""

        schema = ovs.db.schema.DbSchema.from_json(self.schema_json)
        self.schema_json = None

        if not self._all:
            # Keep only the registered tables, each trimmed to its
            # registered columns.
            schema_tables = {}
            for table, columns in self._tables.iteritems():
                schema_tables[table] = (
                    self._keep_table_columns(schema, table, columns))

            schema.tables = schema_tables
        return schema

    def _keep_table_columns(self, schema, table_name, columns):
        """Returns the table named 'table_name' from 'schema', restricted to
        the columns named in 'columns'.  The table object is modified in
        place.  An empty 'columns' keeps every column."""
        assert table_name in schema.tables
        table = schema.tables[table_name]

        if not columns:
            # empty set means all columns in the table
            return table

        new_columns = {}
        for column_name in columns:
            assert type(column_name) is str
            assert column_name in table.columns

            new_columns[column_name] = table.columns[column_name]

        table.columns = new_columns
        return table
1282 return table